[feat](trx-frontend-http): split decode history into separate HTTP endpoint

Move history replay out of the /decode SSE stream into a new
GET /decode/history JSON endpoint. The JS client now opens /decode
immediately for live packets (no gating) and fetches history in
parallel via fetch(), draining it in the background with the existing
chunked drainDecodeHistory() helper.

This ensures real-time decode messages are never blocked by a large
history payload, and removes the historyReceived gate entirely.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-10 18:33:40 +01:00
parent 5a3c013db5
commit 35983eb971
2 changed files with 71 additions and 65 deletions
@@ -6054,26 +6054,16 @@ function connectDecode() {
if (window.resetCwHistoryView) window.resetCwHistoryView();
if (window.resetFt8HistoryView) window.resetFt8HistoryView();
if (window.resetWsprHistoryView) window.resetWsprHistoryView();
// Live messages arrive after the history event; gate on this flag so
// onmessage does not dispatch before drainDecodeHistory has started.
let historyReceived = false;
// Open the live SSE stream first so real-time messages are never blocked by
// history replay. History is fetched separately via a plain HTTP request and
// drained in the background using the existing chunked helper.
decodeSource = new EventSource("/decode");
decodeSource.onopen = () => {
decodeConnected = true;
updateDecodeStatus("Connected, listening for packets");
};
// The server sends the entire history as one named "history" event (JSON
// array). A single JSON.parse + chunked drain is far cheaper than N
// individual EventSource callbacks each blocking the main thread.
decodeSource.addEventListener("history", (evt) => {
try {
const msgs = JSON.parse(evt.data);
if (Array.isArray(msgs)) drainDecodeHistory(msgs, 0);
} catch (e) { /* ignore parse errors */ }
historyReceived = true;
});
decodeSource.onmessage = (evt) => {
if (!historyReceived) return; // skip anything before history event
try {
dispatchDecodeMessage(JSON.parse(evt.data));
} catch (e) { /* ignore parse errors */ }
@@ -6091,6 +6081,14 @@ function connectDecode() {
setTimeout(connectDecode, 5000);
}
};
// Fetch history in parallel — does not block the live SSE stream.
fetch("/decode/history").then((resp) => {
if (!resp.ok) return;
return resp.json();
}).then((msgs) => {
if (Array.isArray(msgs)) drainDecodeHistory(msgs, 0);
}).catch(() => { /* history unavailable, ignore */ });
}
if (document.readyState === "complete") {
connectDecode();
@@ -276,6 +276,63 @@ pub async fn events(
.streaming(stream))
}
/// Build the combined decode history vector from all per-decoder ring-buffers.
fn collect_decode_history(context: &FrontendRuntimeContext) -> Vec<trx_core::decode::DecodedMessage> {
let mut out = Vec::new();
out.extend(
crate::server::audio::snapshot_ais_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::Ais),
);
out.extend(
crate::server::audio::snapshot_vdes_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::Vdes),
);
out.extend(
crate::server::audio::snapshot_aprs_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::Aprs),
);
out.extend(
crate::server::audio::snapshot_hf_aprs_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::HfAprs),
);
out.extend(
crate::server::audio::snapshot_cw_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::Cw),
);
out.extend(
crate::server::audio::snapshot_ft8_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::Ft8),
);
out.extend(
crate::server::audio::snapshot_wspr_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::Wspr),
);
out
}
/// `GET /decode/history` — returns the full decode history as a JSON array.
///
/// Separated from the live `/decode` SSE stream so that history replay does
/// not block real-time messages: the client fetches this endpoint in parallel
/// with opening the SSE connection and drains it in the background.
#[get("/decode/history")]
pub async fn decode_history(
context: web::Data<Arc<FrontendRuntimeContext>>,
) -> impl Responder {
if context.decode_rx.is_none() {
return HttpResponse::NotFound().body("decode not enabled");
}
let history = collect_decode_history(context.get_ref());
HttpResponse::Ok().json(history)
}
#[get("/decode")]
pub async fn decode_events(
context: web::Data<Arc<FrontendRuntimeContext>>,
@@ -286,56 +343,6 @@ pub async fn decode_events(
};
tracing::info!("/decode SSE client connected");
let history = {
let mut out = Vec::new();
out.extend(
crate::server::audio::snapshot_ais_history(context.get_ref())
.into_iter()
.map(trx_core::decode::DecodedMessage::Ais),
);
out.extend(
crate::server::audio::snapshot_vdes_history(context.get_ref())
.into_iter()
.map(trx_core::decode::DecodedMessage::Vdes),
);
out.extend(
crate::server::audio::snapshot_aprs_history(context.get_ref())
.into_iter()
.map(trx_core::decode::DecodedMessage::Aprs),
);
out.extend(
crate::server::audio::snapshot_hf_aprs_history(context.get_ref())
.into_iter()
.map(trx_core::decode::DecodedMessage::HfAprs),
);
out.extend(
crate::server::audio::snapshot_cw_history(context.get_ref())
.into_iter()
.map(trx_core::decode::DecodedMessage::Cw),
);
out.extend(
crate::server::audio::snapshot_ft8_history(context.get_ref())
.into_iter()
.map(trx_core::decode::DecodedMessage::Ft8),
);
out.extend(
crate::server::audio::snapshot_wspr_history(context.get_ref())
.into_iter()
.map(trx_core::decode::DecodedMessage::Wspr),
);
out
};
// Send the entire history as a single named "history" event (JSON array).
// Sending N individual events causes N EventSource callbacks in the browser,
// each blocking the main thread — for large histories this interrupts audio
// and spectrum rendering for tens of seconds.
let history_event = {
let json = serde_json::to_string(&history).unwrap_or_else(|_| "[]".to_string());
Bytes::from(format!("event: history\ndata: {json}\n\n"))
};
let history_stream = once(async move { Ok::<Bytes, Error>(history_event) });
let decode_stream = futures_util::stream::unfold(decode_rx, |mut rx| async move {
loop {
match rx.recv().await {
@@ -356,7 +363,7 @@ pub async fn decode_events(
let pings = IntervalStream::new(time::interval(Duration::from_secs(15)))
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
let stream = history_stream.chain(select(pings, decode_stream));
let stream = select(pings, decode_stream);
Ok(HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "text/event-stream"))
@@ -1035,6 +1042,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(status_api)
.service(list_rigs)
.service(events)
.service(decode_history)
.service(decode_events)
.service(spectrum)
.service(toggle_power)