[fix](trx-frontend-http): async decode history replay to unblock UI
Server emits an SSE sentinel event (history_done) after replaying stored history. Client buffers all incoming messages until the sentinel arrives, then drains the buffer in 30-event chunks via setTimeout so the browser can handle input between batches. Live events after the sentinel are dispatched immediately as before. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -5982,6 +5982,22 @@ function updateDecodeStatus(text) {
|
|||||||
setModeBoundDecodeStatus(cw, ["CW", "CWR"], "Select CW mode to decode", cwText);
|
setModeBoundDecodeStatus(cw, ["CW", "CWR"], "Select CW mode to decode", cwText);
|
||||||
if (ft8 && ft8.textContent !== "Receiving") ft8.textContent = text;
|
if (ft8 && ft8.textContent !== "Receiving") ft8.textContent = text;
|
||||||
}
|
}
|
||||||
|
function dispatchDecodeMessage(msg) {
|
||||||
|
if (msg.type === "ais" && window.onServerAis) window.onServerAis(msg);
|
||||||
|
if (msg.type === "vdes" && window.onServerVdes) window.onServerVdes(msg);
|
||||||
|
if (msg.type === "aprs" && window.onServerAprs) window.onServerAprs(msg);
|
||||||
|
if (msg.type === "cw" && window.onServerCw) window.onServerCw(msg);
|
||||||
|
if (msg.type === "ft8" && window.onServerFt8) window.onServerFt8(msg);
|
||||||
|
if (msg.type === "wspr" && window.onServerWspr) window.onServerWspr(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
function drainDecodeHistory(buffer, index) {
|
||||||
|
const CHUNK = 30;
|
||||||
|
const end = Math.min(index + CHUNK, buffer.length);
|
||||||
|
for (let i = index; i < end; i++) dispatchDecodeMessage(buffer[i]);
|
||||||
|
if (end < buffer.length) setTimeout(() => drainDecodeHistory(buffer, end), 0);
|
||||||
|
}
|
||||||
|
|
||||||
function connectDecode() {
|
function connectDecode() {
|
||||||
if (decodeSource) { decodeSource.close(); }
|
if (decodeSource) { decodeSource.close(); }
|
||||||
if (window.resetAisHistoryView) window.resetAisHistoryView();
|
if (window.resetAisHistoryView) window.resetAisHistoryView();
|
||||||
@@ -5990,20 +6006,25 @@ function connectDecode() {
|
|||||||
if (window.resetCwHistoryView) window.resetCwHistoryView();
|
if (window.resetCwHistoryView) window.resetCwHistoryView();
|
||||||
if (window.resetFt8HistoryView) window.resetFt8HistoryView();
|
if (window.resetFt8HistoryView) window.resetFt8HistoryView();
|
||||||
if (window.resetWsprHistoryView) window.resetWsprHistoryView();
|
if (window.resetWsprHistoryView) window.resetWsprHistoryView();
|
||||||
|
const historyBuffer = [];
|
||||||
|
let historyDone = false;
|
||||||
decodeSource = new EventSource("/decode");
|
decodeSource = new EventSource("/decode");
|
||||||
decodeSource.onopen = () => {
|
decodeSource.onopen = () => {
|
||||||
decodeConnected = true;
|
decodeConnected = true;
|
||||||
updateDecodeStatus("Connected, listening for packets");
|
updateDecodeStatus("Connected, listening for packets");
|
||||||
};
|
};
|
||||||
|
decodeSource.addEventListener("history_done", () => {
|
||||||
|
historyDone = true;
|
||||||
|
drainDecodeHistory(historyBuffer, 0);
|
||||||
|
});
|
||||||
decodeSource.onmessage = (evt) => {
|
decodeSource.onmessage = (evt) => {
|
||||||
try {
|
try {
|
||||||
const msg = JSON.parse(evt.data);
|
const msg = JSON.parse(evt.data);
|
||||||
if (msg.type === "ais" && window.onServerAis) window.onServerAis(msg);
|
if (!historyDone) {
|
||||||
if (msg.type === "vdes" && window.onServerVdes) window.onServerVdes(msg);
|
historyBuffer.push(msg);
|
||||||
if (msg.type === "aprs" && window.onServerAprs) window.onServerAprs(msg);
|
} else {
|
||||||
if (msg.type === "cw" && window.onServerCw) window.onServerCw(msg);
|
dispatchDecodeMessage(msg);
|
||||||
if (msg.type === "ft8" && window.onServerFt8) window.onServerFt8(msg);
|
}
|
||||||
if (msg.type === "wspr" && window.onServerWspr) window.onServerWspr(msg);
|
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
// ignore parse errors
|
// ignore parse errors
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -307,6 +307,9 @@ pub async fn decode_events(
|
|||||||
.map(|json| Ok::<Bytes, Error>(Bytes::from(format!("data: {json}\n\n"))))
|
.map(|json| Ok::<Bytes, Error>(Bytes::from(format!("data: {json}\n\n"))))
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
let history_done =
|
||||||
|
once(async { Ok::<Bytes, Error>(Bytes::from("event: history_done\ndata: {}\n\n")) });
|
||||||
|
|
||||||
let decode_stream = futures_util::stream::unfold(decode_rx, |mut rx| async move {
|
let decode_stream = futures_util::stream::unfold(decode_rx, |mut rx| async move {
|
||||||
loop {
|
loop {
|
||||||
match rx.recv().await {
|
match rx.recv().await {
|
||||||
@@ -327,7 +330,7 @@ pub async fn decode_events(
|
|||||||
let pings = IntervalStream::new(time::interval(Duration::from_secs(15)))
|
let pings = IntervalStream::new(time::interval(Duration::from_secs(15)))
|
||||||
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
|
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
|
||||||
|
|
||||||
let stream = history_stream.chain(select(pings, decode_stream));
|
let stream = history_stream.chain(history_done).chain(select(pings, decode_stream));
|
||||||
|
|
||||||
Ok(HttpResponse::Ok()
|
Ok(HttpResponse::Ok()
|
||||||
.insert_header((header::CONTENT_TYPE, "text/event-stream"))
|
.insert_header((header::CONTENT_TYPE, "text/event-stream"))
|
||||||
|
|||||||
Reference in New Issue
Block a user