From e2c568a98a0c15c52a0c700d9594a4d8af43a6c9 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sat, 7 Mar 2026 09:12:13 +0100 Subject: [PATCH] [fix](trx-frontend-http): async decode history replay to unblock UI Server emits an SSE sentinel event (history_done) after replaying stored history. Client buffers all incoming messages until the sentinel arrives, then drains the buffer in 30-event chunks via setTimeout so the browser can handle input between batches. Live events after the sentinel are dispatched immediately as before. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- .../trx-frontend-http/assets/web/app.js | 33 +++++++++++++++---- .../trx-frontend/trx-frontend-http/src/api.rs | 5 ++- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index d8c0d7e..5ddf03a 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -5982,6 +5982,22 @@ function updateDecodeStatus(text) { setModeBoundDecodeStatus(cw, ["CW", "CWR"], "Select CW mode to decode", cwText); if (ft8 && ft8.textContent !== "Receiving") ft8.textContent = text; } +function dispatchDecodeMessage(msg) { + if (msg.type === "ais" && window.onServerAis) window.onServerAis(msg); + if (msg.type === "vdes" && window.onServerVdes) window.onServerVdes(msg); + if (msg.type === "aprs" && window.onServerAprs) window.onServerAprs(msg); + if (msg.type === "cw" && window.onServerCw) window.onServerCw(msg); + if (msg.type === "ft8" && window.onServerFt8) window.onServerFt8(msg); + if (msg.type === "wspr" && window.onServerWspr) window.onServerWspr(msg); +} + +function drainDecodeHistory(buffer, index) { + const CHUNK = 30; + const end = Math.min(index + CHUNK, buffer.length); + for (let i = index; i < end; i++) dispatchDecodeMessage(buffer[i]); + if (end < buffer.length) setTimeout(() => drainDecodeHistory(buffer, end), 0); +} + function connectDecode() { if (decodeSource) { decodeSource.close(); } if (window.resetAisHistoryView) window.resetAisHistoryView(); @@ -5990,20 +6006,25 @@ function connectDecode() { if (window.resetCwHistoryView) window.resetCwHistoryView(); if (window.resetFt8HistoryView) window.resetFt8HistoryView(); if (window.resetWsprHistoryView) window.resetWsprHistoryView(); + const historyBuffer = []; + let historyDone = false; decodeSource = new EventSource("/decode"); decodeSource.onopen = () => { decodeConnected = true; updateDecodeStatus("Connected, listening for packets"); }; + decodeSource.addEventListener("history_done", () => { + historyDone = true; + drainDecodeHistory(historyBuffer, 0); + }); decodeSource.onmessage = (evt) => { try { const msg = JSON.parse(evt.data); - if (msg.type === "ais" && window.onServerAis) window.onServerAis(msg); - if (msg.type === "vdes" && window.onServerVdes) window.onServerVdes(msg); - if (msg.type === "aprs" && window.onServerAprs) window.onServerAprs(msg); - if (msg.type === "cw" && window.onServerCw) window.onServerCw(msg); - if (msg.type === "ft8" && window.onServerFt8) window.onServerFt8(msg); - if (msg.type === "wspr" && window.onServerWspr) window.onServerWspr(msg); + if (!historyDone) { + historyBuffer.push(msg); + } else { + dispatchDecodeMessage(msg); + } } catch (e) { // ignore parse errors } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index 5b15793..2d59188 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -307,6 +307,9 @@ pub async fn decode_events( .map(|json| Ok::(Bytes::from(format!("data: {json}\n\n")))) })); + let history_done = + once(async { Ok::(Bytes::from("event: history_done\ndata: {}\n\n")) }); + let decode_stream = futures_util::stream::unfold(decode_rx, |mut rx| async move { loop { match rx.recv().await { @@ -327,7 +330,7 @@ pub async fn decode_events( let pings = IntervalStream::new(time::interval(Duration::from_secs(15))) .map(|_| Ok::(Bytes::from(": ping\n\n"))); - let stream = history_stream.chain(select(pings, decode_stream)); + let stream = history_stream.chain(history_done).chain(select(pings, decode_stream)); Ok(HttpResponse::Ok() .insert_header((header::CONTENT_TYPE, "text/event-stream"))