[fix](trx-frontend-http): send decode history as single JSON array event
Previously the server emitted N individual SSE events (one per decoded message) followed by a history_done sentinel. With a 1.3 MB history this caused thousands of EventSource onmessage callbacks each blocking the JS main thread, interrupting audio playback and spectrum rendering for 50+ seconds. Server: serialize the entire history Vec as a single named "history" event containing a JSON array, then chain directly into the live decode stream. One serde_json::to_string call instead of N. JS: listen for the "history" event, parse the array once, pass it to the existing drainDecodeHistory() chunked dispatcher (30 msgs per setTimeout slice to stay off the main thread), then gate onmessage dispatching on historyReceived. Removes the historyBuffer accumulator and the history_done event entirely. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -6036,28 +6036,29 @@ function connectDecode() {
|
||||
if (window.resetCwHistoryView) window.resetCwHistoryView();
|
||||
if (window.resetFt8HistoryView) window.resetFt8HistoryView();
|
||||
if (window.resetWsprHistoryView) window.resetWsprHistoryView();
|
||||
const historyBuffer = [];
|
||||
let historyDone = false;
|
||||
// Live messages arrive after the history event; gate on this flag so
|
||||
// onmessage does not dispatch before drainDecodeHistory has started.
|
||||
let historyReceived = false;
|
||||
decodeSource = new EventSource("/decode");
|
||||
decodeSource.onopen = () => {
|
||||
decodeConnected = true;
|
||||
updateDecodeStatus("Connected, listening for packets");
|
||||
};
|
||||
decodeSource.addEventListener("history_done", () => {
|
||||
historyDone = true;
|
||||
drainDecodeHistory(historyBuffer, 0);
|
||||
// The server sends the entire history as one named "history" event (JSON
|
||||
// array). A single JSON.parse + chunked drain is far cheaper than N
|
||||
// individual EventSource callbacks each blocking the main thread.
|
||||
decodeSource.addEventListener("history", (evt) => {
|
||||
try {
|
||||
const msgs = JSON.parse(evt.data);
|
||||
if (Array.isArray(msgs)) drainDecodeHistory(msgs, 0);
|
||||
} catch (e) { /* ignore parse errors */ }
|
||||
historyReceived = true;
|
||||
});
|
||||
decodeSource.onmessage = (evt) => {
|
||||
if (!historyReceived) return; // skip anything before history event
|
||||
try {
|
||||
const msg = JSON.parse(evt.data);
|
||||
if (!historyDone) {
|
||||
historyBuffer.push(msg);
|
||||
} else {
|
||||
dispatchDecodeMessage(msg);
|
||||
}
|
||||
} catch (e) {
|
||||
// ignore parse errors
|
||||
}
|
||||
dispatchDecodeMessage(JSON.parse(evt.data));
|
||||
} catch (e) { /* ignore parse errors */ }
|
||||
};
|
||||
decodeSource.onerror = () => {
|
||||
// readyState CLOSED (2) = server rejected (404/error), CONNECTING (0) = temporary drop
|
||||
|
||||
@@ -326,14 +326,15 @@ pub async fn decode_events(
|
||||
out
|
||||
};
|
||||
|
||||
let history_stream = futures_util::stream::iter(history.into_iter().filter_map(|msg| {
|
||||
serde_json::to_string(&msg)
|
||||
.ok()
|
||||
.map(|json| Ok::<Bytes, Error>(Bytes::from(format!("data: {json}\n\n"))))
|
||||
}));
|
||||
|
||||
let history_done =
|
||||
once(async { Ok::<Bytes, Error>(Bytes::from("event: history_done\ndata: {}\n\n")) });
|
||||
// Send the entire history as a single named "history" event (JSON array).
|
||||
// Sending N individual events causes N EventSource callbacks in the browser,
|
||||
// each blocking the main thread — for large histories this interrupts audio
|
||||
// and spectrum rendering for tens of seconds.
|
||||
let history_event = {
|
||||
let json = serde_json::to_string(&history).unwrap_or_else(|_| "[]".to_string());
|
||||
Bytes::from(format!("event: history\ndata: {json}\n\n"))
|
||||
};
|
||||
let history_stream = once(async move { Ok::<Bytes, Error>(history_event) });
|
||||
|
||||
let decode_stream = futures_util::stream::unfold(decode_rx, |mut rx| async move {
|
||||
loop {
|
||||
@@ -355,7 +356,7 @@ pub async fn decode_events(
|
||||
let pings = IntervalStream::new(time::interval(Duration::from_secs(15)))
|
||||
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
|
||||
|
||||
let stream = history_stream.chain(history_done).chain(select(pings, decode_stream));
|
||||
let stream = history_stream.chain(select(pings, decode_stream));
|
||||
|
||||
Ok(HttpResponse::Ok()
|
||||
.insert_header((header::CONTENT_TYPE, "text/event-stream"))
|
||||
|
||||
Reference in New Issue
Block a user