[feat](trx-frontend-http): stream spectrum updates over SSE

Signed-off-by: Stan Grams <sjg@haxx.space>
Co-authored-by: OpenAI Codex <codex@openai.com>
This commit is contained in:
2026-02-27 22:53:22 +01:00
parent 54fb107d3b
commit c23f1a4b4d
3 changed files with 92 additions and 32 deletions
@@ -267,11 +267,11 @@ function applyCapabilities(caps) {
if (caps.filter_controls) {
spectrumPanel.style.display = "";
if (centerFreqField) centerFreqField.style.display = "";
startSpectrumPolling();
startSpectrumStreaming();
} else {
spectrumPanel.style.display = "none";
if (centerFreqField) centerFreqField.style.display = "none";
stopSpectrumPolling();
stopSpectrumStreaming();
}
}
}
@@ -1193,6 +1193,7 @@ function disconnect() {
decodeSource.close();
decodeSource = null;
}
stopSpectrumStreaming();
// Clear timers
if (esHeartbeat) {
clearInterval(esHeartbeat);
@@ -2399,7 +2400,8 @@ window.addEventListener("beforeunload", () => {
const spectrumCanvas = document.getElementById("spectrum-canvas");
const spectrumFreqAxis = document.getElementById("spectrum-freq-axis");
const spectrumTooltip = document.getElementById("spectrum-tooltip");
let spectrumPollTimer = null;
let spectrumSource = null;
let spectrumReconnectTimer = null;
let lastSpectrumData = null;
// Zoom / pan state. zoom >= 1; panFrac in [0,1] is the fraction of the full
@@ -2446,30 +2448,52 @@ function formatSpectrumFreq(hz) {
return hz.toFixed(0) + " Hz";
}
// ── Polling ──────────────────────────────────────────────────────────────────
function startSpectrumPolling() {
if (spectrumPollTimer !== null) return;
spectrumPollTimer = setInterval(fetchSpectrum, 200);
fetchSpectrum();
// ── Streaming ────────────────────────────────────────────────────────────────
function scheduleSpectrumReconnect() {
if (spectrumReconnectTimer !== null) return;
spectrumReconnectTimer = setTimeout(() => {
spectrumReconnectTimer = null;
startSpectrumStreaming();
}, 1000);
}
function stopSpectrumPolling() {
if (spectrumPollTimer !== null) { clearInterval(spectrumPollTimer); spectrumPollTimer = null; }
function startSpectrumStreaming() {
if (spectrumSource !== null) return;
spectrumSource = new EventSource("/spectrum");
spectrumSource.onmessage = (evt) => {
if (evt.data === "null") {
lastSpectrumData = null;
clearSpectrumCanvas();
return;
}
try {
lastSpectrumData = JSON.parse(evt.data);
refreshCenterFreqDisplay();
drawSpectrum(lastSpectrumData);
} catch (_) {}
};
spectrumSource.onerror = () => {
if (spectrumSource) {
spectrumSource.close();
spectrumSource = null;
}
scheduleSpectrumReconnect();
};
}
function stopSpectrumStreaming() {
if (spectrumSource !== null) {
spectrumSource.close();
spectrumSource = null;
}
if (spectrumReconnectTimer !== null) {
clearTimeout(spectrumReconnectTimer);
spectrumReconnectTimer = null;
}
lastSpectrumData = null;
clearSpectrumCanvas();
}
async function fetchSpectrum() {
try {
const resp = await fetch("/spectrum", { cache: "no-store" });
if (resp.status === 204) { lastSpectrumData = null; clearSpectrumCanvas(); return; }
if (!resp.ok) return;
lastSpectrumData = await resp.json();
refreshCenterFreqDisplay();
drawSpectrum(lastSpectrumData);
} catch (_) {}
}
// ── Rendering ────────────────────────────────────────────────────────────────
function clearSpectrumCanvas() {
if (!spectrumCanvas) return;
@@ -291,20 +291,53 @@ impl<I> futures_util::Stream for DropStream<I> {
}
}
/// Lightweight polling endpoint for spectrum data.
/// Returns the latest `SpectrumData` as JSON, or 204 No Content if unavailable.
/// SSE stream for spectrum data.
/// Emits JSON `SpectrumData` payloads when the latest frame changes.
/// Emits `null` when spectrum data becomes unavailable.
#[get("/spectrum")]
pub async fn spectrum(
context: web::Data<Arc<FrontendRuntimeContext>>,
) -> Result<impl Responder, Error> {
let data = context.spectrum.lock().ok().and_then(|g| g.clone());
match data {
Some(s) => Ok(HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "application/json"))
.insert_header((header::CACHE_CONTROL, "no-cache"))
.json(s)),
None => Ok(HttpResponse::NoContent().finish()),
}
) -> Result<HttpResponse, Error> {
let context_updates = context.get_ref().clone();
let updates = IntervalStream::new(time::interval(Duration::from_millis(200))).scan(
None::<String>,
move |last_json, _| {
let context = context_updates.clone();
std::future::ready({
let next_json = context
.spectrum
.lock()
.ok()
.and_then(|g| g.as_ref().and_then(|s| serde_json::to_string(s).ok()));
let payload = match (last_json.as_ref(), next_json) {
(Some(prev), Some(next)) if prev == &next => None,
(_, Some(next)) => {
*last_json = Some(next.clone());
Some(next)
}
(Some(_), None) => {
*last_json = None;
Some("null".to_string())
}
(None, None) => None,
};
payload.map(|json| Ok::<Bytes, Error>(Bytes::from(format!("data: {json}\n\n"))))
})
},
);
let pings = IntervalStream::new(time::interval(Duration::from_secs(15)))
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
let stream = select(pings, updates);
Ok(HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "text/event-stream"))
.insert_header((header::CACHE_CONTROL, "no-cache"))
.insert_header((header::CONNECTION, "keep-alive"))
.streaming(stream))
}
#[post("/toggle_power")]
@@ -423,11 +423,13 @@ impl RouteAccess {
|| path == "/rigs"
|| path == "/events"
|| path == "/decode"
|| path == "/spectrum"
|| path == "/audio"
|| path.starts_with("/status?")
|| path.starts_with("/rigs?")
|| path.starts_with("/events?")
|| path.starts_with("/decode?")
|| path.starts_with("/spectrum?")
|| path.starts_with("/audio?")
{
return Self::Read;
@@ -585,6 +587,7 @@ mod tests {
assert_eq!(RouteAccess::from_path("/rigs"), RouteAccess::Read);
assert_eq!(RouteAccess::from_path("/events"), RouteAccess::Read);
assert_eq!(RouteAccess::from_path("/decode"), RouteAccess::Read);
assert_eq!(RouteAccess::from_path("/spectrum"), RouteAccess::Read);
assert_eq!(RouteAccess::from_path("/audio"), RouteAccess::Read);
}