From f9cf95705acd89869bc032420d8d61c5ab69a1a8 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sun, 19 Apr 2026 19:50:20 +0200 Subject: [PATCH] [feat](trx-frontend-http): /meter SSE endpoint for instant signal metering Adds a dedicated /meter SSE stream that wraps the per-rig meter watch and emits one compact JSON frame per update with no equality gating, so 30 Hz samples reach the browser unthrottled. Registered as a Read-access route. app.js opens a dedicated EventSource on /meter alongside /events, writing directly to the signal bar and value on each frame with no requestAnimationFrame coalescing, starts/stops with connect/disconnect, and reconnects on rig switch. Co-Authored-By: Claude Opus 4.7 Signed-off-by: Stan Grams --- Cargo.lock | 1 + .../trx-frontend-http/assets/web/app.js | 65 +++++++++++++++++++ .../trx-frontend-http/src/api/mod.rs | 1 + .../trx-frontend-http/src/api/sse.rs | 57 ++++++++++++++++ .../trx-frontend-http/src/auth.rs | 3 + 5 files changed, 127 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 6068fa7..108d00c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3168,6 +3168,7 @@ dependencies = [ "serde_json", "tokio", "trx-core", + "trx-protocol", "uuid", ] diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index 5fecd57..b1ed35d 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -3700,6 +3700,8 @@ function connect() { if (esHeartbeat) { clearInterval(esHeartbeat); } + stopMeterStreaming(); + startMeterStreaming(); pollFreshSnapshot(); const eventsUrl = lastActiveRigId ? `/events?remote=${encodeURIComponent(lastActiveRigId)}` @@ -3778,6 +3780,7 @@ function disconnect() { decodeSource = null; } stopSpectrumStreaming(); + stopMeterStreaming(); // Clear timers if (esHeartbeat) { clearInterval(esHeartbeat); @@ -3900,6 +3903,9 @@ async function switchRigFromSelect(selectEl) { // Reconnect spectrum SSE to the new rig's spectrum channel. stopSpectrumStreaming(); startSpectrumStreaming(); + // Reconnect meter SSE to the new rig's meter channel. + stopMeterStreaming(); + startMeterStreaming(); // Reconnect audio to the new rig if audio is active. if (rxActive) { stopRxAudio(); @@ -6476,6 +6482,8 @@ const spectrumCenterLeftBtn = document.getElementById("spectrum-center-left-btn" const spectrumCenterRightBtn = document.getElementById("spectrum-center-right-btn"); let spectrumSource = null; let spectrumReconnectTimer = null; +let meterSource = null; +let meterReconnectTimer = null; let spectrumDrawPending = false; let spectrumAxisKey = ""; let spectrumDbAxisKey = ""; @@ -6996,6 +7004,63 @@ function stopSpectrumStreaming() { clearSpectrumCanvas(); } +// ── /meter (fast signal-strength) streaming ───────────────────────────────── +// Dedicated SSE channel pushed at ~30 Hz by trx-server; bypasses /events so +// meter frames are never gated by full-RigState diffing. Synchronous DOM +// write per frame — no rAF coalescing, per user requirement that it "feel +// instant" on the frontend. +function scheduleMeterReconnect() { + if (meterReconnectTimer !== null) return; + meterReconnectTimer = setTimeout(() => { + meterReconnectTimer = null; + startMeterStreaming(); + }, 1000); +} + +function applyMeterSample(dbm) { + if (typeof dbm !== "number" || !Number.isFinite(dbm)) return; + prevRenderData.sigDbm = dbm; + const sUnits = dbmToSUnits(dbm); + sigLastSUnits = sUnits; + sigLastDbm = dbm; + const pct = sUnits <= 9 ? Math.max(0, Math.min(100, (sUnits / 9) * 100)) : 100; + if (signalBar) signalBar.style.width = `${pct}%`; + if (signalValue) signalValue.innerHTML = formatSignal(sUnits); + refreshSigStrengthDisplay(); +} + +function startMeterStreaming() { + if (meterSource !== null) return; + const url = lastActiveRigId + ? `/meter?remote=${encodeURIComponent(lastActiveRigId)}` + : "/meter"; + meterSource = new EventSource(url); + meterSource.onmessage = (evt) => { + try { + const { sig } = JSON.parse(evt.data); + applyMeterSample(sig); + } catch (_) {} + }; + meterSource.onerror = () => { + if (meterSource) { + meterSource.close(); + meterSource = null; + } + scheduleMeterReconnect(); + }; +} + +function stopMeterStreaming() { + if (meterSource !== null) { + meterSource.close(); + meterSource = null; + } + if (meterReconnectTimer !== null) { + clearTimeout(meterReconnectTimer); + meterReconnectTimer = null; + } +} + // ── Rendering ──────────────────────────────────────────────────────────────── function clearSpectrumCanvas() { if (!spectrumCanvas || !spectrumGl || !spectrumGl.ready) return; diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api/mod.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api/mod.rs index c6c90b0..b665d70 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api/mod.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api/mod.rs @@ -574,6 +574,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) { // SSE streams .service(sse::events) .service(sse::spectrum) + .service(sse::meter) // Decoder endpoints .service(decoder::decoder_registry) .service(decoder::decode_history) diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api/sse.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api/sse.rs index 2bfcb9e..58c604f 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api/sse.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api/sse.rs @@ -19,6 +19,7 @@ use uuid::Uuid; use trx_core::RigState; use trx_frontend::FrontendRuntimeContext; +use trx_protocol::MeterUpdate; use crate::server::vchan::ClientChannelManager; @@ -337,6 +338,62 @@ pub async fn events( .streaming(stream)) } +// ============================================================================ +// /meter SSE endpoint (fast signal-strength stream, ~30 Hz) +// ============================================================================ + +fn encode_meter_frame(update: &MeterUpdate) -> String { + // Compact JSON: one-line SSE frame, flushed immediately. + // Shape: {"sig":-72.3,"ts":12345} + format!( + "data: {{\"sig\":{:.2},\"ts\":{}}}\n\n", + update.sig_dbm, update.ts_ms + ) +} + +/// SSE stream for per-rig signal-strength updates. +/// +/// Pushed from the server's per-rig meter broadcast; intentionally bypasses +/// the `/events` RigState path so high-rate meter samples are never gated by +/// full-state diffing. Each watch update produces exactly one SSE frame. +#[get("/meter")] +pub async fn meter( + query: web::Query, + context: web::Data>, +) -> Result { + let rig_id = query.remote.clone().filter(|s| !s.is_empty()).or_else(|| { + context + .routing + .active_rig_id + .lock() + .ok() + .and_then(|g| g.clone()) + }); + + let rx = match rig_id.as_deref() { + Some(rid) => context.rig_meter_rx(rid), + None => return Ok(HttpResponse::NotFound().finish()), + }; + + let updates = WatchStream::new(rx).filter_map(|maybe| { + let chunk = maybe.as_ref().map(encode_meter_frame); + std::future::ready(chunk.map(|s| Ok::(Bytes::from(s)))) + }); + + // Infrequent keepalive comment; real meter frames carry the heartbeat. + let pings = IntervalStream::new(time::interval(Duration::from_secs(15))) + .map(|_| Ok::(Bytes::from(": ping\n\n"))); + + let stream = select(pings, updates); + + Ok(HttpResponse::Ok() + .insert_header((header::CONTENT_TYPE, "text/event-stream")) + .insert_header((header::CONTENT_ENCODING, "identity")) + .insert_header((header::CACHE_CONTROL, "no-cache")) + .insert_header((header::CONNECTION, "keep-alive")) + .streaming(stream)) +} + // ============================================================================ // /spectrum SSE endpoint // ============================================================================ diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/auth.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/auth.rs index d0ce147..d9a7076 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/auth.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/auth.rs @@ -526,6 +526,7 @@ impl RouteAccess { || path == "/decode" || path == "/decode/history" || path == "/spectrum" + || path == "/meter" || path == "/audio" || path == "/bookmarks" || path.starts_with("/status?") @@ -534,6 +535,7 @@ impl RouteAccess { || path.starts_with("/decode?") || path.starts_with("/decode/history?") || path.starts_with("/spectrum?") + || path.starts_with("/meter?") || path.starts_with("/audio?") || path.starts_with("/bookmarks?") || path.starts_with("/bookmarks/") @@ -703,6 +705,7 @@ mod tests { assert_eq!(RouteAccess::from_path("/events"), RouteAccess::Read); assert_eq!(RouteAccess::from_path("/decode"), RouteAccess::Read); assert_eq!(RouteAccess::from_path("/spectrum"), RouteAccess::Read); + assert_eq!(RouteAccess::from_path("/meter"), RouteAccess::Read); assert_eq!(RouteAccess::from_path("/audio"), RouteAccess::Read); }