From c23f1a4b4de15f2249b6773381159b824a025be1 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Fri, 27 Feb 2026 22:53:22 +0100 Subject: [PATCH] [feat](trx-frontend-http): stream spectrum updates over SSE Signed-off-by: Stan Grams Co-authored-by: OpenAI Codex --- .../trx-frontend-http/assets/web/app.js | 66 +++++++++++++------ .../trx-frontend/trx-frontend-http/src/api.rs | 55 ++++++++++++---- .../trx-frontend-http/src/auth.rs | 3 + 3 files changed, 92 insertions(+), 32 deletions(-) diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index 4787b74..098b172 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -267,11 +267,11 @@ function applyCapabilities(caps) { if (caps.filter_controls) { spectrumPanel.style.display = ""; if (centerFreqField) centerFreqField.style.display = ""; - startSpectrumPolling(); + startSpectrumStreaming(); } else { spectrumPanel.style.display = "none"; if (centerFreqField) centerFreqField.style.display = "none"; - stopSpectrumPolling(); + stopSpectrumStreaming(); } } } @@ -1193,6 +1193,7 @@ function disconnect() { decodeSource.close(); decodeSource = null; } + stopSpectrumStreaming(); // Clear timers if (esHeartbeat) { clearInterval(esHeartbeat); @@ -2399,7 +2400,8 @@ window.addEventListener("beforeunload", () => { const spectrumCanvas = document.getElementById("spectrum-canvas"); const spectrumFreqAxis = document.getElementById("spectrum-freq-axis"); const spectrumTooltip = document.getElementById("spectrum-tooltip"); -let spectrumPollTimer = null; +let spectrumSource = null; +let spectrumReconnectTimer = null; let lastSpectrumData = null; // Zoom / pan state. zoom >= 1; panFrac in [0,1] is the fraction of the full @@ -2446,30 +2448,52 @@ function formatSpectrumFreq(hz) { return hz.toFixed(0) + " Hz"; } -// ── Polling ────────────────────────────────────────────────────────────────── -function startSpectrumPolling() { - if (spectrumPollTimer !== null) return; - spectrumPollTimer = setInterval(fetchSpectrum, 200); - fetchSpectrum(); +// ── Streaming ──────────────────────────────────────────────────────────────── +function scheduleSpectrumReconnect() { + if (spectrumReconnectTimer !== null) return; + spectrumReconnectTimer = setTimeout(() => { + spectrumReconnectTimer = null; + startSpectrumStreaming(); + }, 1000); } -function stopSpectrumPolling() { - if (spectrumPollTimer !== null) { clearInterval(spectrumPollTimer); spectrumPollTimer = null; } +function startSpectrumStreaming() { + if (spectrumSource !== null) return; + spectrumSource = new EventSource("/spectrum"); + spectrumSource.onmessage = (evt) => { + if (evt.data === "null") { + lastSpectrumData = null; + clearSpectrumCanvas(); + return; + } + try { + lastSpectrumData = JSON.parse(evt.data); + refreshCenterFreqDisplay(); + drawSpectrum(lastSpectrumData); + } catch (_) {} + }; + spectrumSource.onerror = () => { + if (spectrumSource) { + spectrumSource.close(); + spectrumSource = null; + } + scheduleSpectrumReconnect(); + }; +} + +function stopSpectrumStreaming() { + if (spectrumSource !== null) { + spectrumSource.close(); + spectrumSource = null; + } + if (spectrumReconnectTimer !== null) { + clearTimeout(spectrumReconnectTimer); + spectrumReconnectTimer = null; + } lastSpectrumData = null; clearSpectrumCanvas(); } -async function fetchSpectrum() { - try { - const resp = await fetch("/spectrum", { cache: "no-store" }); - if (resp.status === 204) { lastSpectrumData = null; clearSpectrumCanvas(); return; } - if (!resp.ok) return; - lastSpectrumData = await resp.json(); - refreshCenterFreqDisplay(); - drawSpectrum(lastSpectrumData); - } catch (_) {} -} - // ── Rendering ──────────────────────────────────────────────────────────────── function clearSpectrumCanvas() { if (!spectrumCanvas) return; diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index 6b1ed71..a209178 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -291,20 +291,53 @@ impl futures_util::Stream for DropStream { } } -/// Lightweight polling endpoint for spectrum data. -/// Returns the latest `SpectrumData` as JSON, or 204 No Content if unavailable. +/// SSE stream for spectrum data. +/// Emits JSON `SpectrumData` payloads when the latest frame changes. +/// Emits `null` when spectrum data becomes unavailable. #[get("/spectrum")] pub async fn spectrum( context: web::Data>, -) -> Result { - let data = context.spectrum.lock().ok().and_then(|g| g.clone()); - match data { - Some(s) => Ok(HttpResponse::Ok() - .insert_header((header::CONTENT_TYPE, "application/json")) - .insert_header((header::CACHE_CONTROL, "no-cache")) - .json(s)), - None => Ok(HttpResponse::NoContent().finish()), - } +) -> Result { + let context_updates = context.get_ref().clone(); + let updates = IntervalStream::new(time::interval(Duration::from_millis(200))).scan( + None::, + move |last_json, _| { + let context = context_updates.clone(); + std::future::ready({ + let next_json = context + .spectrum + .lock() + .ok() + .and_then(|g| g.as_ref().and_then(|s| serde_json::to_string(s).ok())); + + let payload = match (last_json.as_ref(), next_json) { + (Some(prev), Some(next)) if prev == &next => None, + (_, Some(next)) => { + *last_json = Some(next.clone()); + Some(next) + } + (Some(_), None) => { + *last_json = None; + Some("null".to_string()) + } + (None, None) => None, + }; + + payload.map(|json| Ok::(Bytes::from(format!("data: {json}\n\n")))) + }) + }, + ); + + let pings = IntervalStream::new(time::interval(Duration::from_secs(15))) + .map(|_| Ok::(Bytes::from(": ping\n\n"))); + + let stream = select(pings, updates); + + Ok(HttpResponse::Ok() + .insert_header((header::CONTENT_TYPE, "text/event-stream")) + .insert_header((header::CACHE_CONTROL, "no-cache")) + .insert_header((header::CONNECTION, "keep-alive")) + .streaming(stream)) } #[post("/toggle_power")] diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/auth.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/auth.rs index 965d781..b56fb10 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/auth.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/auth.rs @@ -423,11 +423,13 @@ impl RouteAccess { || path == "/rigs" || path == "/events" || path == "/decode" + || path == "/spectrum" || path == "/audio" || path.starts_with("/status?") || path.starts_with("/rigs?") || path.starts_with("/events?") || path.starts_with("/decode?") + || path.starts_with("/spectrum?") || path.starts_with("/audio?") { return Self::Read; @@ -585,6 +587,7 @@ mod tests { assert_eq!(RouteAccess::from_path("/rigs"), RouteAccess::Read); assert_eq!(RouteAccess::from_path("/events"), RouteAccess::Read); assert_eq!(RouteAccess::from_path("/decode"), RouteAccess::Read); + assert_eq!(RouteAccess::from_path("/spectrum"), RouteAccess::Read); assert_eq!(RouteAccess::from_path("/audio"), RouteAccess::Read); }