[feat](trx-frontend-http): /meter SSE endpoint for instant signal metering

Adds a dedicated /meter SSE stream that wraps the per-rig meter watch
and emits one compact JSON frame per update with no equality gating, so
30 Hz samples reach the browser unthrottled. Registered as a Read-access
route. app.js opens a dedicated EventSource on /meter alongside /events,
writing directly to the signal bar and value on each frame with no
requestAnimationFrame coalescing, starts/stops with connect/disconnect,
and reconnects on rig switch.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-04-19 19:50:20 +02:00
parent fd0f1e43c0
commit f9cf95705a
5 changed files with 127 additions and 0 deletions
Generated
+1
View File
@@ -3168,6 +3168,7 @@ dependencies = [
"serde_json", "serde_json",
"tokio", "tokio",
"trx-core", "trx-core",
"trx-protocol",
"uuid", "uuid",
] ]
@@ -3700,6 +3700,8 @@ function connect() {
if (esHeartbeat) { if (esHeartbeat) {
clearInterval(esHeartbeat); clearInterval(esHeartbeat);
} }
stopMeterStreaming();
startMeterStreaming();
pollFreshSnapshot(); pollFreshSnapshot();
const eventsUrl = lastActiveRigId const eventsUrl = lastActiveRigId
? `/events?remote=${encodeURIComponent(lastActiveRigId)}` ? `/events?remote=${encodeURIComponent(lastActiveRigId)}`
@@ -3778,6 +3780,7 @@ function disconnect() {
decodeSource = null; decodeSource = null;
} }
stopSpectrumStreaming(); stopSpectrumStreaming();
stopMeterStreaming();
// Clear timers // Clear timers
if (esHeartbeat) { if (esHeartbeat) {
clearInterval(esHeartbeat); clearInterval(esHeartbeat);
@@ -3900,6 +3903,9 @@ async function switchRigFromSelect(selectEl) {
// Reconnect spectrum SSE to the new rig's spectrum channel. // Reconnect spectrum SSE to the new rig's spectrum channel.
stopSpectrumStreaming(); stopSpectrumStreaming();
startSpectrumStreaming(); startSpectrumStreaming();
// Reconnect meter SSE to the new rig's meter channel.
stopMeterStreaming();
startMeterStreaming();
// Reconnect audio to the new rig if audio is active. // Reconnect audio to the new rig if audio is active.
if (rxActive) { if (rxActive) {
stopRxAudio(); stopRxAudio();
@@ -6476,6 +6482,8 @@ const spectrumCenterLeftBtn = document.getElementById("spectrum-center-left-btn"
const spectrumCenterRightBtn = document.getElementById("spectrum-center-right-btn"); const spectrumCenterRightBtn = document.getElementById("spectrum-center-right-btn");
let spectrumSource = null; let spectrumSource = null;
let spectrumReconnectTimer = null; let spectrumReconnectTimer = null;
let meterSource = null;
let meterReconnectTimer = null;
let spectrumDrawPending = false; let spectrumDrawPending = false;
let spectrumAxisKey = ""; let spectrumAxisKey = "";
let spectrumDbAxisKey = ""; let spectrumDbAxisKey = "";
@@ -6996,6 +7004,63 @@ function stopSpectrumStreaming() {
clearSpectrumCanvas(); clearSpectrumCanvas();
} }
// ── /meter (fast signal-strength) streaming ─────────────────────────────────
// Dedicated SSE channel pushed at ~30 Hz by trx-server; bypasses /events so
// meter frames are never gated by full-RigState diffing. Synchronous DOM
// write per frame — no rAF coalescing, per user requirement that it "feel
// instant" on the frontend.
function scheduleMeterReconnect() {
if (meterReconnectTimer !== null) return;
meterReconnectTimer = setTimeout(() => {
meterReconnectTimer = null;
startMeterStreaming();
}, 1000);
}
function applyMeterSample(dbm) {
if (typeof dbm !== "number" || !Number.isFinite(dbm)) return;
prevRenderData.sigDbm = dbm;
const sUnits = dbmToSUnits(dbm);
sigLastSUnits = sUnits;
sigLastDbm = dbm;
const pct = sUnits <= 9 ? Math.max(0, Math.min(100, (sUnits / 9) * 100)) : 100;
if (signalBar) signalBar.style.width = `${pct}%`;
if (signalValue) signalValue.innerHTML = formatSignal(sUnits);
refreshSigStrengthDisplay();
}
function startMeterStreaming() {
if (meterSource !== null) return;
const url = lastActiveRigId
? `/meter?remote=${encodeURIComponent(lastActiveRigId)}`
: "/meter";
meterSource = new EventSource(url);
meterSource.onmessage = (evt) => {
try {
const { sig } = JSON.parse(evt.data);
applyMeterSample(sig);
} catch (_) {}
};
meterSource.onerror = () => {
if (meterSource) {
meterSource.close();
meterSource = null;
}
scheduleMeterReconnect();
};
}
function stopMeterStreaming() {
if (meterSource !== null) {
meterSource.close();
meterSource = null;
}
if (meterReconnectTimer !== null) {
clearTimeout(meterReconnectTimer);
meterReconnectTimer = null;
}
}
// ── Rendering ──────────────────────────────────────────────────────────────── // ── Rendering ────────────────────────────────────────────────────────────────
function clearSpectrumCanvas() { function clearSpectrumCanvas() {
if (!spectrumCanvas || !spectrumGl || !spectrumGl.ready) return; if (!spectrumCanvas || !spectrumGl || !spectrumGl.ready) return;
@@ -574,6 +574,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
// SSE streams // SSE streams
.service(sse::events) .service(sse::events)
.service(sse::spectrum) .service(sse::spectrum)
.service(sse::meter)
// Decoder endpoints // Decoder endpoints
.service(decoder::decoder_registry) .service(decoder::decoder_registry)
.service(decoder::decode_history) .service(decoder::decode_history)
@@ -19,6 +19,7 @@ use uuid::Uuid;
use trx_core::RigState; use trx_core::RigState;
use trx_frontend::FrontendRuntimeContext; use trx_frontend::FrontendRuntimeContext;
use trx_protocol::MeterUpdate;
use crate::server::vchan::ClientChannelManager; use crate::server::vchan::ClientChannelManager;
@@ -337,6 +338,62 @@ pub async fn events(
.streaming(stream)) .streaming(stream))
} }
// ============================================================================
// /meter SSE endpoint (fast signal-strength stream, ~30 Hz)
// ============================================================================
fn encode_meter_frame(update: &MeterUpdate) -> String {
// Compact JSON: one-line SSE frame, flushed immediately.
// Shape: {"sig":-72.3,"ts":12345}
format!(
"data: {{\"sig\":{:.2},\"ts\":{}}}\n\n",
update.sig_dbm, update.ts_ms
)
}
/// SSE stream for per-rig signal-strength updates.
///
/// Pushed from the server's per-rig meter broadcast; intentionally bypasses
/// the `/events` RigState path so high-rate meter samples are never gated by
/// full-state diffing. Each watch update produces exactly one SSE frame.
#[get("/meter")]
pub async fn meter(
query: web::Query<RemoteQuery>,
context: web::Data<Arc<FrontendRuntimeContext>>,
) -> Result<HttpResponse, Error> {
let rig_id = query.remote.clone().filter(|s| !s.is_empty()).or_else(|| {
context
.routing
.active_rig_id
.lock()
.ok()
.and_then(|g| g.clone())
});
let rx = match rig_id.as_deref() {
Some(rid) => context.rig_meter_rx(rid),
None => return Ok(HttpResponse::NotFound().finish()),
};
let updates = WatchStream::new(rx).filter_map(|maybe| {
let chunk = maybe.as_ref().map(encode_meter_frame);
std::future::ready(chunk.map(|s| Ok::<Bytes, Error>(Bytes::from(s))))
});
// Infrequent keepalive comment; real meter frames carry the heartbeat.
let pings = IntervalStream::new(time::interval(Duration::from_secs(15)))
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
let stream = select(pings, updates);
Ok(HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "text/event-stream"))
.insert_header((header::CONTENT_ENCODING, "identity"))
.insert_header((header::CACHE_CONTROL, "no-cache"))
.insert_header((header::CONNECTION, "keep-alive"))
.streaming(stream))
}
// ============================================================================ // ============================================================================
// /spectrum SSE endpoint // /spectrum SSE endpoint
// ============================================================================ // ============================================================================
@@ -526,6 +526,7 @@ impl RouteAccess {
|| path == "/decode" || path == "/decode"
|| path == "/decode/history" || path == "/decode/history"
|| path == "/spectrum" || path == "/spectrum"
|| path == "/meter"
|| path == "/audio" || path == "/audio"
|| path == "/bookmarks" || path == "/bookmarks"
|| path.starts_with("/status?") || path.starts_with("/status?")
@@ -534,6 +535,7 @@ impl RouteAccess {
|| path.starts_with("/decode?") || path.starts_with("/decode?")
|| path.starts_with("/decode/history?") || path.starts_with("/decode/history?")
|| path.starts_with("/spectrum?") || path.starts_with("/spectrum?")
|| path.starts_with("/meter?")
|| path.starts_with("/audio?") || path.starts_with("/audio?")
|| path.starts_with("/bookmarks?") || path.starts_with("/bookmarks?")
|| path.starts_with("/bookmarks/") || path.starts_with("/bookmarks/")
@@ -703,6 +705,7 @@ mod tests {
assert_eq!(RouteAccess::from_path("/events"), RouteAccess::Read); assert_eq!(RouteAccess::from_path("/events"), RouteAccess::Read);
assert_eq!(RouteAccess::from_path("/decode"), RouteAccess::Read); assert_eq!(RouteAccess::from_path("/decode"), RouteAccess::Read);
assert_eq!(RouteAccess::from_path("/spectrum"), RouteAccess::Read); assert_eq!(RouteAccess::from_path("/spectrum"), RouteAccess::Read);
assert_eq!(RouteAccess::from_path("/meter"), RouteAccess::Read);
assert_eq!(RouteAccess::from_path("/audio"), RouteAccess::Read); assert_eq!(RouteAccess::from_path("/audio"), RouteAccess::Read);
} }