[feat](trx-frontend-http): encode spectrum bins as compact i8/base64 SSE

Replace the JSON f32 array (~7.5 KB/frame) with a named SSE event "b"
carrying base64-encoded i8 bins (~1.4 KB/frame, ~5x reduction):

  event: b
  data: {center_hz},{sample_rate},{base64_i8_bins}

1 dB per step covers the -128…+127 dBFS display range, sufficient for
visualization. RDS is stripped from the spectrum frame and emitted as a
separate named "event: rds" only when the payload changes. The JS
decoder uses atob() + sign-extension to reconstruct the float bin array.
A minimal inline base64 encoder is added server-side (no new crate).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-09 21:34:11 +01:00
parent 1d8b77ae44
commit 75355c75a5
2 changed files with 90 additions and 17 deletions
@@ -6396,6 +6396,7 @@ function scheduleSpectrumReconnect() {
function startSpectrumStreaming() { function startSpectrumStreaming() {
if (spectrumSource !== null) return; if (spectrumSource !== null) return;
spectrumSource = new EventSource("/spectrum"); spectrumSource = new EventSource("/spectrum");
// Unnamed event = reset signal.
spectrumSource.onmessage = (evt) => { spectrumSource.onmessage = (evt) => {
if (evt.data === "null") { if (evt.data === "null") {
rejectPendingSpectrumFrameWaiters(new Error("Spectrum stream reset")); rejectPendingSpectrumFrameWaiters(new Error("Spectrum stream reset"));
@@ -6408,25 +6409,45 @@ function startSpectrumStreaming() {
scheduleOverviewDraw(); scheduleOverviewDraw();
clearSpectrumCanvas(); clearSpectrumCanvas();
updateRdsPsOverlay(null); updateRdsPsOverlay(null);
return;
} }
};
// Named "b" event = compact binary frame: "{center_hz},{sample_rate},{base64_i8_bins}"
// Bins are i8 (1 dB/step), base64-encoded for ~5× size reduction vs JSON f32 array.
// Named "b" event = compact binary frame: "{center_hz},{sample_rate},{base64_i8_bins}"
// Bins are i8 (1 dB/step), base64-encoded for ~5× size reduction vs JSON f32 array.
spectrumSource.addEventListener("b", (evt) => {
try { try {
lastSpectrumData = JSON.parse(evt.data); const commaA = evt.data.indexOf(",");
const commaB = evt.data.indexOf(",", commaA + 1);
const centerHz = Number(evt.data.slice(0, commaA));
const sampleRate = Number(evt.data.slice(commaA + 1, commaB));
const b64 = evt.data.slice(commaB + 1);
const raw = atob(b64);
const bins = new Array(raw.length);
for (let i = 0; i < raw.length; i++) bins[i] = (raw.charCodeAt(i) << 24 >> 24);
// Preserve any RDS data from the last rds event.
const rds = lastSpectrumData?.rds;
lastSpectrumData = { bins, center_hz: centerHz, sample_rate: sampleRate, rds };
window.lastSpectrumData = lastSpectrumData; window.lastSpectrumData = lastSpectrumData;
lastSpectrumRenderData = buildSpectrumRenderData(lastSpectrumData); lastSpectrumRenderData = buildSpectrumRenderData(lastSpectrumData);
settlePendingSpectrumFrameWaiters(lastSpectrumData); settlePendingSpectrumFrameWaiters(lastSpectrumData);
pushSpectrumPeakHoldFrame(lastSpectrumRenderData); pushSpectrumPeakHoldFrame(lastSpectrumRenderData);
pushOverviewWaterfallFrame(lastSpectrumData); pushOverviewWaterfallFrame(lastSpectrumData);
refreshCenterFreqDisplay(); refreshCenterFreqDisplay();
if (window.refreshCwTonePicker) { if (window.refreshCwTonePicker) window.refreshCwTonePicker();
window.refreshCwTonePicker();
}
scheduleSpectrumDraw(); scheduleSpectrumDraw();
if (lastModeName === "WFM") { if (lastModeName === "WFM") updateRdsPsOverlay(lastSpectrumData.rds);
updateRdsPsOverlay(lastSpectrumData.rds);
}
} catch (_) {} } catch (_) {}
}; });
// Named "rds" event = RDS metadata changed (emitted only when it changes).
spectrumSource.addEventListener("rds", (evt) => {
try {
const rds = evt.data === "null" ? undefined : JSON.parse(evt.data);
if (lastSpectrumData) lastSpectrumData.rds = rds;
if (lastModeName === "WFM") updateRdsPsOverlay(rds ?? null);
updateDocumentTitle(rds ?? null);
} catch (_) {}
});
spectrumSource.onerror = () => { spectrumSource.onerror = () => {
rejectPendingSpectrumFrameWaiters(new Error("Spectrum stream disconnected")); rejectPendingSpectrumFrameWaiters(new Error("Spectrum stream disconnected"));
if (spectrumSource) { if (spectrumSource) {
@@ -30,6 +30,41 @@ const LOGO_BYTES: &[u8] =
include_bytes!(concat!(env!("CARGO_MANIFEST_DIR"), "/assets/trx-logo.png")); include_bytes!(concat!(env!("CARGO_MANIFEST_DIR"), "/assets/trx-logo.png"));
const REQUEST_TIMEOUT: Duration = Duration::from_secs(15); const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// Base64-encode `data` using the standard alphabet (no line wrapping).
fn base64_encode(data: &[u8]) -> String {
const T: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
let mut out = Vec::with_capacity((data.len() + 2) / 3 * 4);
for chunk in data.chunks(3) {
let b0 = chunk[0] as u32;
let b1 = chunk.get(1).copied().unwrap_or(0) as u32;
let b2 = chunk.get(2).copied().unwrap_or(0) as u32;
let n = (b0 << 16) | (b1 << 8) | b2;
out.push(T[((n >> 18) & 63) as usize]);
out.push(T[((n >> 12) & 63) as usize]);
out.push(if chunk.len() > 1 { T[((n >> 6) & 63) as usize] } else { b'=' });
out.push(if chunk.len() > 2 { T[(n & 63) as usize] } else { b'=' });
}
// SAFETY: output contains only ASCII base64 characters.
unsafe { String::from_utf8_unchecked(out) }
}
/// Encode spectrum bins as a compact base64 string of i8 values (1 dB/step).
///
/// Wire format for the `b` SSE event:
/// `{center_hz},{sample_rate},{base64_i8_bins}`
///
/// RDS is intentionally excluded — it changes rarely and is sent via the
/// `/events` state stream instead.
fn encode_spectrum_frame(frame: &trx_core::rig::state::SpectrumData) -> String {
let bytes: Vec<u8> = frame
.bins
.iter()
.map(|&v| v.round().clamp(-128.0, 127.0) as i8 as u8)
.collect();
let b64 = base64_encode(&bytes);
format!("{},{},{b64}", frame.center_hz, frame.sample_rate)
}
struct FrontendMeta { struct FrontendMeta {
http_clients: usize, http_clients: usize,
rigctl_clients: usize, rigctl_clients: usize,
@@ -383,39 +418,56 @@ impl<I> futures_util::Stream for DropStream<I> {
} }
/// SSE stream for spectrum data. /// SSE stream for spectrum data.
/// Emits JSON `SpectrumData` payloads when the latest frame changes. ///
/// Emits `null` when spectrum data becomes unavailable. /// Emits compact binary frames as named SSE event `b`:
/// `event: b\ndata: {center_hz},{sample_rate},{base64_i8_bins}[|{rds_json}]\n\n`
/// Bins are quantized to i8 (1 dB/step, 128…+127 dBFS) for ~5× bandwidth
/// reduction versus full-precision JSON.
///
/// Emits an unnamed `data: null` event when spectrum data becomes unavailable.
#[get("/spectrum")] #[get("/spectrum")]
pub async fn spectrum( pub async fn spectrum(
context: web::Data<Arc<FrontendRuntimeContext>>, context: web::Data<Arc<FrontendRuntimeContext>>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let context_updates = context.get_ref().clone(); let context_updates = context.get_ref().clone();
let mut last_revision: Option<u64> = None; let mut last_revision: Option<u64> = None;
let mut last_rds_json: Option<String> = None;
let updates = let updates =
IntervalStream::new(time::interval(Duration::from_millis(40))).filter_map(move |_| { IntervalStream::new(time::interval(Duration::from_millis(40))).filter_map(move |_| {
let context = context_updates.clone(); let context = context_updates.clone();
std::future::ready({ std::future::ready({
let next = context.spectrum.lock().ok().map(|g| g.snapshot()); let next = context.spectrum.lock().ok().map(|g| g.snapshot());
let payload = match next { let sse_chunk: Option<String> = match next {
Some((revision, _frame)) if last_revision == Some(revision) => None, Some((revision, _frame)) if last_revision == Some(revision) => None,
Some((revision, Some(frame))) => { Some((revision, Some(frame))) => {
last_revision = Some(revision); last_revision = Some(revision);
serde_json::to_string(&frame).ok() let mut chunk =
format!("event: b\ndata: {}\n\n", encode_spectrum_frame(&frame));
// Append an `rds` event only when the RDS payload changes.
let rds_json = frame
.rds
.as_ref()
.and_then(|r| serde_json::to_string(r).ok());
if rds_json != last_rds_json {
let data = rds_json.as_deref().unwrap_or("null");
chunk.push_str(&format!("event: rds\ndata: {data}\n\n"));
last_rds_json = rds_json;
}
Some(chunk)
} }
Some((revision, None)) => { Some((revision, None)) => {
last_revision = Some(revision); last_revision = Some(revision);
Some("null".to_string()) Some("data: null\n\n".to_string())
} }
None if last_revision.is_some() => { None if last_revision.is_some() => {
// Lock poisoning is transient; retry instead of breaking stream semantics.
last_revision = None; last_revision = None;
Some("null".to_string()) Some("data: null\n\n".to_string())
} }
None => None, None => None,
}; };
payload.map(|json| Ok::<Bytes, Error>(Bytes::from(format!("data: {json}\n\n")))) sse_chunk.map(|s| Ok::<Bytes, Error>(Bytes::from(s)))
}) })
}); });