diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index 264de4f..e78f4fa 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -6054,26 +6054,16 @@ function connectDecode() { if (window.resetCwHistoryView) window.resetCwHistoryView(); if (window.resetFt8HistoryView) window.resetFt8HistoryView(); if (window.resetWsprHistoryView) window.resetWsprHistoryView(); - // Live messages arrive after the history event; gate on this flag so - // onmessage does not dispatch before drainDecodeHistory has started. - let historyReceived = false; + + // Open the live SSE stream first so real-time messages are never blocked by + // history replay. History is fetched separately via a plain HTTP request and + // drained in the background using the existing chunked helper. decodeSource = new EventSource("/decode"); decodeSource.onopen = () => { decodeConnected = true; updateDecodeStatus("Connected, listening for packets"); }; - // The server sends the entire history as one named "history" event (JSON - // array). A single JSON.parse + chunked drain is far cheaper than N - // individual EventSource callbacks each blocking the main thread. - decodeSource.addEventListener("history", (evt) => { - try { - const msgs = JSON.parse(evt.data); - if (Array.isArray(msgs)) drainDecodeHistory(msgs, 0); - } catch (e) { /* ignore parse errors */ } - historyReceived = true; - }); decodeSource.onmessage = (evt) => { - if (!historyReceived) return; // skip anything before history event try { dispatchDecodeMessage(JSON.parse(evt.data)); } catch (e) { /* ignore parse errors */ } @@ -6091,6 +6081,14 @@ function connectDecode() { setTimeout(connectDecode, 5000); } }; + + // Fetch history in parallel — does not block the live SSE stream. + fetch("/decode/history").then((resp) => { + if (!resp.ok) return; + return resp.json(); + }).then((msgs) => { + if (Array.isArray(msgs)) drainDecodeHistory(msgs, 0); + }).catch(() => { /* history unavailable, ignore */ }); } if (document.readyState === "complete") { connectDecode(); diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index ffdd63c..1fd8527 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -276,6 +276,63 @@ pub async fn events( .streaming(stream)) } +/// Build the combined decode history vector from all per-decoder ring-buffers. +fn collect_decode_history(context: &FrontendRuntimeContext) -> Vec { + let mut out = Vec::new(); + out.extend( + crate::server::audio::snapshot_ais_history(context) + .into_iter() + .map(trx_core::decode::DecodedMessage::Ais), + ); + out.extend( + crate::server::audio::snapshot_vdes_history(context) + .into_iter() + .map(trx_core::decode::DecodedMessage::Vdes), + ); + out.extend( + crate::server::audio::snapshot_aprs_history(context) + .into_iter() + .map(trx_core::decode::DecodedMessage::Aprs), + ); + out.extend( + crate::server::audio::snapshot_hf_aprs_history(context) + .into_iter() + .map(trx_core::decode::DecodedMessage::HfAprs), + ); + out.extend( + crate::server::audio::snapshot_cw_history(context) + .into_iter() + .map(trx_core::decode::DecodedMessage::Cw), + ); + out.extend( + crate::server::audio::snapshot_ft8_history(context) + .into_iter() + .map(trx_core::decode::DecodedMessage::Ft8), + ); + out.extend( + crate::server::audio::snapshot_wspr_history(context) + .into_iter() + .map(trx_core::decode::DecodedMessage::Wspr), + ); + out +} + +/// `GET /decode/history` — returns the full decode history as a JSON array. +/// +/// Separated from the live `/decode` SSE stream so that history replay does +/// not block real-time messages: the client fetches this endpoint in parallel +/// with opening the SSE connection and drains it in the background. +#[get("/decode/history")] +pub async fn decode_history( + context: web::Data>, +) -> impl Responder { + if context.decode_rx.is_none() { + return HttpResponse::NotFound().body("decode not enabled"); + } + let history = collect_decode_history(context.get_ref()); + HttpResponse::Ok().json(history) +} + #[get("/decode")] pub async fn decode_events( context: web::Data>, @@ -286,56 +343,6 @@ pub async fn decode_events( }; tracing::info!("/decode SSE client connected"); - let history = { - let mut out = Vec::new(); - out.extend( - crate::server::audio::snapshot_ais_history(context.get_ref()) - .into_iter() - .map(trx_core::decode::DecodedMessage::Ais), - ); - out.extend( - crate::server::audio::snapshot_vdes_history(context.get_ref()) - .into_iter() - .map(trx_core::decode::DecodedMessage::Vdes), - ); - out.extend( - crate::server::audio::snapshot_aprs_history(context.get_ref()) - .into_iter() - .map(trx_core::decode::DecodedMessage::Aprs), - ); - out.extend( - crate::server::audio::snapshot_hf_aprs_history(context.get_ref()) - .into_iter() - .map(trx_core::decode::DecodedMessage::HfAprs), - ); - out.extend( - crate::server::audio::snapshot_cw_history(context.get_ref()) - .into_iter() - .map(trx_core::decode::DecodedMessage::Cw), - ); - out.extend( - crate::server::audio::snapshot_ft8_history(context.get_ref()) - .into_iter() - .map(trx_core::decode::DecodedMessage::Ft8), - ); - out.extend( - crate::server::audio::snapshot_wspr_history(context.get_ref()) - .into_iter() - .map(trx_core::decode::DecodedMessage::Wspr), - ); - out - }; - - // Send the entire history as a single named "history" event (JSON array). - // Sending N individual events causes N EventSource callbacks in the browser, - // each blocking the main thread — for large histories this interrupts audio - // and spectrum rendering for tens of seconds. - let history_event = { - let json = serde_json::to_string(&history).unwrap_or_else(|_| "[]".to_string()); - Bytes::from(format!("event: history\ndata: {json}\n\n")) - }; - let history_stream = once(async move { Ok::(history_event) }); - let decode_stream = futures_util::stream::unfold(decode_rx, |mut rx| async move { loop { match rx.recv().await { @@ -356,7 +363,7 @@ pub async fn decode_events( let pings = IntervalStream::new(time::interval(Duration::from_secs(15))) .map(|_| Ok::(Bytes::from(": ping\n\n"))); - let stream = history_stream.chain(select(pings, decode_stream)); + let stream = select(pings, decode_stream); Ok(HttpResponse::Ok() .insert_header((header::CONTENT_TYPE, "text/event-stream")) @@ -1035,6 +1042,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(status_api) .service(list_rigs) .service(events) + .service(decode_history) .service(decode_events) .service(spectrum) .service(toggle_power)