diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index 00bb0cf..6f03bef 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -6396,6 +6396,7 @@ function scheduleSpectrumReconnect() { function startSpectrumStreaming() { if (spectrumSource !== null) return; spectrumSource = new EventSource("/spectrum"); + // Unnamed event = reset signal. spectrumSource.onmessage = (evt) => { if (evt.data === "null") { rejectPendingSpectrumFrameWaiters(new Error("Spectrum stream reset")); @@ -6408,25 +6409,45 @@ function startSpectrumStreaming() { scheduleOverviewDraw(); clearSpectrumCanvas(); updateRdsPsOverlay(null); - return; } + }; + // Named "b" event = compact binary frame: "{center_hz},{sample_rate},{base64_i8_bins}" + // Bins are i8 (1 dB/step), base64-encoded for ~5× size reduction vs JSON f32 array. + // Named "b" event = compact binary frame: "{center_hz},{sample_rate},{base64_i8_bins}" + // Bins are i8 (1 dB/step), base64-encoded for ~5× size reduction vs JSON f32 array. + spectrumSource.addEventListener("b", (evt) => { try { - lastSpectrumData = JSON.parse(evt.data); + const commaA = evt.data.indexOf(","); + const commaB = evt.data.indexOf(",", commaA + 1); + const centerHz = Number(evt.data.slice(0, commaA)); + const sampleRate = Number(evt.data.slice(commaA + 1, commaB)); + const b64 = evt.data.slice(commaB + 1); + const raw = atob(b64); + const bins = new Array(raw.length); + for (let i = 0; i < raw.length; i++) bins[i] = (raw.charCodeAt(i) << 24 >> 24); + // Preserve any RDS data from the last rds event. + const rds = lastSpectrumData?.rds; + lastSpectrumData = { bins, center_hz: centerHz, sample_rate: sampleRate, rds }; window.lastSpectrumData = lastSpectrumData; lastSpectrumRenderData = buildSpectrumRenderData(lastSpectrumData); settlePendingSpectrumFrameWaiters(lastSpectrumData); pushSpectrumPeakHoldFrame(lastSpectrumRenderData); pushOverviewWaterfallFrame(lastSpectrumData); refreshCenterFreqDisplay(); - if (window.refreshCwTonePicker) { - window.refreshCwTonePicker(); - } + if (window.refreshCwTonePicker) window.refreshCwTonePicker(); scheduleSpectrumDraw(); - if (lastModeName === "WFM") { - updateRdsPsOverlay(lastSpectrumData.rds); - } + if (lastModeName === "WFM") updateRdsPsOverlay(lastSpectrumData.rds); } catch (_) {} - }; + }); + // Named "rds" event = RDS metadata changed (emitted only when it changes). + spectrumSource.addEventListener("rds", (evt) => { + try { + const rds = evt.data === "null" ? undefined : JSON.parse(evt.data); + if (lastSpectrumData) lastSpectrumData.rds = rds; + if (lastModeName === "WFM") updateRdsPsOverlay(rds ?? null); + updateDocumentTitle(rds ?? null); + } catch (_) {} + }); spectrumSource.onerror = () => { rejectPendingSpectrumFrameWaiters(new Error("Spectrum stream disconnected")); if (spectrumSource) { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index 8a070da..6a889a7 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -30,6 +30,41 @@ const LOGO_BYTES: &[u8] = include_bytes!(concat!(env!("CARGO_MANIFEST_DIR"), "/assets/trx-logo.png")); const REQUEST_TIMEOUT: Duration = Duration::from_secs(15); +/// Base64-encode `data` using the standard alphabet (no line wrapping). +fn base64_encode(data: &[u8]) -> String { + const T: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + let mut out = Vec::with_capacity((data.len() + 2) / 3 * 4); + for chunk in data.chunks(3) { + let b0 = chunk[0] as u32; + let b1 = chunk.get(1).copied().unwrap_or(0) as u32; + let b2 = chunk.get(2).copied().unwrap_or(0) as u32; + let n = (b0 << 16) | (b1 << 8) | b2; + out.push(T[((n >> 18) & 63) as usize]); + out.push(T[((n >> 12) & 63) as usize]); + out.push(if chunk.len() > 1 { T[((n >> 6) & 63) as usize] } else { b'=' }); + out.push(if chunk.len() > 2 { T[(n & 63) as usize] } else { b'=' }); + } + // SAFETY: output contains only ASCII base64 characters. + unsafe { String::from_utf8_unchecked(out) } +} + +/// Encode spectrum bins as a compact base64 string of i8 values (1 dB/step). +/// +/// Wire format for the `b` SSE event: +/// `{center_hz},{sample_rate},{base64_i8_bins}` +/// +/// RDS is intentionally excluded — it changes rarely and is sent via the +/// `/events` state stream instead. +fn encode_spectrum_frame(frame: &trx_core::rig::state::SpectrumData) -> String { + let bytes: Vec = frame + .bins + .iter() + .map(|&v| v.round().clamp(-128.0, 127.0) as i8 as u8) + .collect(); + let b64 = base64_encode(&bytes); + format!("{},{},{b64}", frame.center_hz, frame.sample_rate) +} + struct FrontendMeta { http_clients: usize, rigctl_clients: usize, @@ -383,39 +418,56 @@ impl futures_util::Stream for DropStream { } /// SSE stream for spectrum data. -/// Emits JSON `SpectrumData` payloads when the latest frame changes. -/// Emits `null` when spectrum data becomes unavailable. +/// +/// Emits compact binary frames as named SSE event `b`: +/// `event: b\ndata: {center_hz},{sample_rate},{base64_i8_bins}[|{rds_json}]\n\n` +/// Bins are quantized to i8 (1 dB/step, −128…+127 dBFS) for ~5× bandwidth +/// reduction versus full-precision JSON. +/// +/// Emits an unnamed `data: null` event when spectrum data becomes unavailable. #[get("/spectrum")] pub async fn spectrum( context: web::Data>, ) -> Result { let context_updates = context.get_ref().clone(); let mut last_revision: Option = None; + let mut last_rds_json: Option = None; let updates = IntervalStream::new(time::interval(Duration::from_millis(40))).filter_map(move |_| { let context = context_updates.clone(); std::future::ready({ let next = context.spectrum.lock().ok().map(|g| g.snapshot()); - let payload = match next { + let sse_chunk: Option = match next { Some((revision, _frame)) if last_revision == Some(revision) => None, Some((revision, Some(frame))) => { last_revision = Some(revision); - serde_json::to_string(&frame).ok() + let mut chunk = + format!("event: b\ndata: {}\n\n", encode_spectrum_frame(&frame)); + // Append an `rds` event only when the RDS payload changes. + let rds_json = frame + .rds + .as_ref() + .and_then(|r| serde_json::to_string(r).ok()); + if rds_json != last_rds_json { + let data = rds_json.as_deref().unwrap_or("null"); + chunk.push_str(&format!("event: rds\ndata: {data}\n\n")); + last_rds_json = rds_json; + } + Some(chunk) } Some((revision, None)) => { last_revision = Some(revision); - Some("null".to_string()) + Some("data: null\n\n".to_string()) } None if last_revision.is_some() => { - // Lock poisoning is transient; retry instead of breaking stream semantics. last_revision = None; - Some("null".to_string()) + Some("data: null\n\n".to_string()) } None => None, }; - payload.map(|json| Ok::(Bytes::from(format!("data: {json}\n\n")))) + sse_chunk.map(|s| Ok::(Bytes::from(s))) }) });