From 73b1d5618d3684cbb3908a3e27fad23579586afb Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sat, 14 Mar 2026 18:11:09 +0100 Subject: [PATCH] [fix](trx-frontend-http): group decode history by decoder Serve grouped decode history payloads and restore each decoder through explicit history restore hooks instead of replaying a mixed message stream. This reduces replay overhead further by removing type regrouping and keeping history restoration on decoder-specific bulk paths. Co-authored-by: OpenAI Codex Signed-off-by: Stan Grams --- .../trx-frontend-http/assets/web/app.js | 148 ++++++++++-------- .../assets/web/decode-history-worker.js | 71 ++++----- .../assets/web/plugins/ais.js | 4 + .../assets/web/plugins/aprs.js | 4 + .../assets/web/plugins/cw.js | 8 + .../assets/web/plugins/ft8.js | 4 + .../assets/web/plugins/hf-aprs.js | 4 + .../assets/web/plugins/vdes.js | 4 + .../assets/web/plugins/wspr.js | 4 + .../trx-frontend/trx-frontend-http/src/api.rs | 63 ++++---- 10 files changed, 176 insertions(+), 138 deletions(-) diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index 6d60d05..52a3dce 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -7361,9 +7361,8 @@ function dispatchDecodeBatch(batch) { } } -const DECODE_HISTORY_MAX_BATCH = 256; const DECODE_HISTORY_TYPE_BATCH_LIMIT = 192; -const DECODE_HISTORY_SLICE_BUDGET_MS = 10; +const DECODE_HISTORY_WORKER_GROUP_LIMIT = 512; const DECODE_HISTORY_BATCH_DRAIN_BUDGET_MS = 8; function terminateDecodeHistoryWorker() { @@ -7381,52 +7380,53 @@ function scheduleDecodeHistoryDrainStep(callback) { } } -function drainDecodeHistory(buffer, index, onDone, onProgress) { - const startedAt = typeof performance !== "undefined" && typeof performance.now === "function" - ? performance.now() - : 0; - let nextIndex = index; - while (nextIndex < buffer.length) { - const batchStart = nextIndex; - const batchType = String(buffer[nextIndex]?.type || ""); - nextIndex += 1; - while ( - nextIndex < buffer.length - && (nextIndex - batchStart) < DECODE_HISTORY_TYPE_BATCH_LIMIT - && (nextIndex - index) < DECODE_HISTORY_MAX_BATCH - && String(buffer[nextIndex]?.type || "") === batchType - ) { - nextIndex += 1; - } - dispatchDecodeBatch(buffer.slice(batchStart, nextIndex)); - if ((nextIndex - index) >= DECODE_HISTORY_MAX_BATCH) break; - if (startedAt > 0 && (performance.now() - startedAt) >= DECODE_HISTORY_SLICE_BUDGET_MS) break; - } - if (typeof onProgress === "function") { - onProgress(nextIndex, buffer.length); - } - if (nextIndex < buffer.length) { - scheduleDecodeHistoryDrainStep(() => drainDecodeHistory(buffer, nextIndex, onDone, onProgress)); - } else if (typeof onDone === "function") { - onDone(); - } -} - function loadDecodeHistoryOnMainThread(onReady, onError) { fetch("/decode/history").then(async (resp) => { if (!resp.ok) return null; setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Receiving compressed history payload"); const payload = await resp.arrayBuffer(); - if (!payload || payload.byteLength === 0) return []; + if (!payload || payload.byteLength === 0) return {}; setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Decoding compressed history payload"); return decodeCborPayload(payload); - }).then((msgs) => { - if (typeof onReady === "function") onReady(Array.isArray(msgs) ? msgs : []); + }).then((groups) => { + if (typeof onReady === "function") onReady(groups && typeof groups === "object" ? groups : {}); }).catch((err) => { if (typeof onError === "function") onError(err); }); } +function restoreDecodeHistoryGroup(kind, messages) { + if (!Array.isArray(messages) || messages.length === 0) return; + if (kind === "ais" && window.restoreAisHistory) { + window.restoreAisHistory(messages); + return; + } + if (kind === "vdes" && window.restoreVdesHistory) { + window.restoreVdesHistory(messages); + return; + } + if (kind === "aprs" && window.restoreAprsHistory) { + window.restoreAprsHistory(messages); + return; + } + if (kind === "hf_aprs" && window.restoreHfAprsHistory) { + window.restoreHfAprsHistory(messages); + return; + } + if (kind === "cw" && window.restoreCwHistory) { + window.restoreCwHistory(messages); + return; + } + if (kind === "ft8" && window.restoreFt8History) { + window.restoreFt8History(messages); + return; + } + if (kind === "wspr" && window.restoreWsprHistory) { + window.restoreWsprHistory(messages); + return; + } +} + function connectDecode() { if (decodeSource) { decodeSource.close(); } terminateDecodeHistoryWorker(); @@ -7447,7 +7447,7 @@ function connectDecode() { let historyBatchDrainScheduled = false; let historyTotal = 0; let historyProcessed = 0; - const historyBatchQueue = []; + const historyGroupQueue = []; const liveBuffer = []; function flushLiveBuffer() { historySettled = true; @@ -7470,62 +7470,74 @@ function connectDecode() { function maybeFinishHistoryReplay() { if (historySettled) return; - if (historyWorkerDone && historyBatchQueue.length === 0) { + if (historyWorkerDone && historyGroupQueue.length === 0) { clearTimeout(historyTimeout); flushLiveBuffer(); } } - function pumpDecodeHistoryBatchQueue() { + function pumpDecodeHistoryGroupQueue() { historyBatchDrainScheduled = false; const startedAt = typeof performance !== "undefined" && typeof performance.now === "function" ? performance.now() : 0; - while (historyBatchQueue.length > 0) { - const batch = historyBatchQueue.shift(); - dispatchDecodeBatch(batch); - historyProcessed += Array.isArray(batch) ? batch.length : 0; + while (historyGroupQueue.length > 0) { + const next = historyGroupQueue.shift(); + restoreDecodeHistoryGroup(next.kind, next.messages); + historyProcessed += Array.isArray(next.messages) ? next.messages.length : 0; updateHistoryReplayOverlay(); if (startedAt > 0 && (performance.now() - startedAt) >= DECODE_HISTORY_BATCH_DRAIN_BUDGET_MS) { break; } } - if (historyBatchQueue.length > 0) { - scheduleDecodeHistoryDrainStep(pumpDecodeHistoryBatchQueue); + if (historyGroupQueue.length > 0) { + scheduleDecodeHistoryDrainStep(pumpDecodeHistoryGroupQueue); historyBatchDrainScheduled = true; return; } maybeFinishHistoryReplay(); } - function enqueueDecodeHistoryBatch(batch) { - if (!Array.isArray(batch) || batch.length === 0) return; - historyBatchQueue.push(batch); + function enqueueDecodeHistoryGroup(kind, messages) { + if (!Array.isArray(messages) || messages.length === 0) return; + historyGroupQueue.push({ kind, messages }); if (historyBatchDrainScheduled) return; historyBatchDrainScheduled = true; - scheduleDecodeHistoryDrainStep(pumpDecodeHistoryBatchQueue); + scheduleDecodeHistoryDrainStep(pumpDecodeHistoryGroupQueue); + } + + function totalDecodeHistoryMessages(groups) { + if (!groups || typeof groups !== "object") return 0; + return ["ais", "vdes", "aprs", "hf_aprs", "cw", "ft8", "wspr"] + .reduce((sum, key) => sum + (Array.isArray(groups[key]) ? groups[key].length : 0), 0); + } + + function enqueueDecodeHistoryGroups(groups) { + historyTotal = totalDecodeHistoryMessages(groups); + historyProcessed = 0; + if (historyTotal > 0) { + setDecodeHistoryReplayActive(true); + updateHistoryReplayOverlay(); + } + for (const kind of ["ais", "vdes", "aprs", "hf_aprs", "cw", "ft8", "wspr"]) { + const messages = groups && Array.isArray(groups[kind]) ? groups[kind] : []; + if (messages.length === 0) continue; + for (let index = 0; index < messages.length; index += DECODE_HISTORY_WORKER_GROUP_LIMIT) { + enqueueDecodeHistoryGroup(kind, messages.slice(index, index + DECODE_HISTORY_WORKER_GROUP_LIMIT)); + } + } + historyWorkerDone = true; + maybeFinishHistoryReplay(); } function startDecodeHistoryFallback() { if (historyFallbackStarted || historySettled) return; historyFallbackStarted = true; - loadDecodeHistoryOnMainThread((msgs) => { + loadDecodeHistoryOnMainThread((groups) => { clearTimeout(historyTimeout); - if (Array.isArray(msgs) && msgs.length > 0) { - setDecodeHistoryReplayActive(true); - setDecodeHistoryOverlayVisible(true, "Loading decode history…", `Replaying 0 / ${msgs.length} decoded messages`); - drainDecodeHistory( - msgs, - 0, - flushLiveBuffer, - (processed, total) => { - setDecodeHistoryOverlayVisible( - true, - "Loading decode history…", - `Replaying ${processed} / ${total} decoded messages` - ); - } - ); + const total = totalDecodeHistoryMessages(groups); + if (total > 0) { + enqueueDecodeHistoryGroups(groups); } else { flushLiveBuffer(); } @@ -7567,8 +7579,8 @@ function connectDecode() { } return; } - if (data.type === "batch") { - enqueueDecodeHistoryBatch(data.batch); + if (data.type === "group") { + enqueueDecodeHistoryGroup(String(data.kind || ""), data.messages); return; } if (data.type === "done") { @@ -7587,7 +7599,7 @@ function connectDecode() { worker.postMessage({ type: "fetch-history", url: "/decode/history", - batchLimit: DECODE_HISTORY_TYPE_BATCH_LIMIT, + batchLimit: DECODE_HISTORY_WORKER_GROUP_LIMIT, }); return true; } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/decode-history-worker.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/decode-history-worker.js index e2e6908..241ef51 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/decode-history-worker.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/decode-history-worker.js @@ -1,4 +1,5 @@ const textDecoder = typeof TextDecoder === "function" ? new TextDecoder() : null; +const HISTORY_GROUP_KEYS = ["ais", "vdes", "aprs", "hf_aprs", "cw", "ft8", "wspr"]; function decodeCborUint(view, bytes, state, additional) { const offset = state.offset; @@ -111,13 +112,15 @@ function decodeCborItem(view, bytes, state) { throw new Error("Unsupported CBOR major type"); } -function decodeTopLevelArrayLength(view, bytes, state) { - if (state.offset >= bytes.length) throw new Error("CBOR payload truncated"); - const initial = bytes[state.offset++]; - const major = initial >> 5; - const additional = initial & 0x1f; - if (major !== 4) throw new Error("Decode history payload is not a CBOR array"); - return decodeCborUint(view, bytes, state, additional); +function decodeCborPayload(buffer) { + const bytes = buffer instanceof Uint8Array ? buffer : new Uint8Array(buffer); + const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength); + const state = { offset: 0 }; + const value = decodeCborItem(view, bytes, state); + if (state.offset !== bytes.length) { + throw new Error("Unexpected trailing bytes in decode history payload"); + } + return value; } async function fetchAndDecodeHistory(url, batchLimit) { @@ -132,46 +135,30 @@ async function fetchAndDecodeHistory(url, batchLimit) { } self.postMessage({ type: "status", phase: "decoding" }); - const bytes = new Uint8Array(payload); - const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength); - const state = { offset: 0 }; - const total = decodeTopLevelArrayLength(view, bytes, state); + const history = decodeCborPayload(payload); + const total = HISTORY_GROUP_KEYS.reduce((sum, key) => { + const items = history && Array.isArray(history[key]) ? history[key] : []; + return sum + items.length; + }, 0); self.postMessage({ type: "start", total }); let processed = 0; - let currentType = ""; - let currentBatch = []; - const safeLimit = Math.max(1, Math.min(512, Number(batchLimit) || 192)); + const safeLimit = Math.max(1, Math.min(2048, Number(batchLimit) || 512)); - function flushBatch() { - if (currentBatch.length === 0) return; - self.postMessage({ - type: "batch", - batch: currentBatch, - processed, - total, - }); - currentBatch = []; - currentType = ""; - } - - for (let i = 0; i < total; i += 1) { - const item = decodeCborItem(view, bytes, state); - const itemType = String(item?.type || ""); - if ( - currentBatch.length > 0 - && (itemType !== currentType || currentBatch.length >= safeLimit) - ) { - flushBatch(); + for (const kind of HISTORY_GROUP_KEYS) { + const items = history && Array.isArray(history[kind]) ? history[kind] : []; + if (items.length === 0) continue; + for (let index = 0; index < items.length; index += safeLimit) { + const messages = items.slice(index, index + safeLimit); + processed += messages.length; + self.postMessage({ + type: "group", + kind, + messages, + processed, + total, + }); } - currentType = itemType; - currentBatch.push(item); - processed += 1; - } - flushBatch(); - - if (state.offset !== bytes.length) { - throw new Error("Unexpected trailing bytes in decode history payload"); } self.postMessage({ type: "done", total }); } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/ais.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/ais.js index 455933a..1359554 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/ais.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/ais.js @@ -390,6 +390,10 @@ window.onServerAisBatch = function(messages) { scheduleAisHistoryRender(); }; +window.restoreAisHistory = function(messages) { + window.onServerAisBatch(messages); +}; + window.pruneAisHistoryView = function() { pruneAisMessageHistory(); updateAisBar(); diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/aprs.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/aprs.js index 44802dc..0a2ba4a 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/aprs.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/aprs.js @@ -447,6 +447,10 @@ window.onServerAprsBatch = function(packets) { scheduleAprsHistoryRender(); }; +window.restoreAprsHistory = function(packets) { + window.onServerAprsBatch(packets); +}; + document.getElementById("aprs-clear-btn").addEventListener("click", async () => { try { await postPath("/clear_aprs_decode"); diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/cw.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/cw.js index 54f46e7..e407439 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/cw.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/cw.js @@ -426,6 +426,14 @@ window.onServerCw = function(evt) { }); }; +window.restoreCwHistory = function(events) { + if (!Array.isArray(events) || events.length === 0) return; + if (cwStatusEl) cwStatusEl.textContent = cwPaused ? "Paused" : "Receiving"; + for (const evt of events) { + window.onServerCw(evt); + } +}; + if (cwPauseBtn) { cwPauseBtn.addEventListener("click", () => { cwPaused = !cwPaused; diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/ft8.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/ft8.js index df139cc..3594381 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/ft8.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/ft8.js @@ -167,6 +167,10 @@ window.onServerFt8Batch = function(messages) { scheduleFt8HistoryRender(); }; +window.restoreFt8History = function(messages) { + window.onServerFt8Batch(messages); +}; + window.pruneFt8HistoryView = function() { pruneFt8MessageHistory(); updateFt8Bar(); diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/hf-aprs.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/hf-aprs.js index 4f22091..bda1e92 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/hf-aprs.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/hf-aprs.js @@ -393,6 +393,10 @@ window.onServerHfAprsBatch = function(packets) { scheduleHfAprsHistoryRender(); }; +window.restoreHfAprsHistory = function(packets) { + window.onServerHfAprsBatch(packets); +}; + document.getElementById("hf-aprs-decode-toggle-btn")?.addEventListener("click", async () => { try { await postPath("/toggle_hf_aprs_decode"); } catch (e) { console.error("HF APRS toggle failed", e); } }); diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/vdes.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/vdes.js index a32f0ef..7c08851 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/vdes.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/vdes.js @@ -331,6 +331,10 @@ window.onServerVdesBatch = function(messages) { scheduleVdesHistoryRender(); }; +window.restoreVdesHistory = function(messages) { + window.onServerVdesBatch(messages); +}; + if (vdesClearBtn) { vdesClearBtn.addEventListener("click", async () => { try { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/wspr.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/wspr.js index 3b32914..2449c0a 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/wspr.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/wspr.js @@ -144,6 +144,10 @@ window.onServerWsprBatch = function(messages) { scheduleWsprHistoryRender(); }; +window.restoreWsprHistory = function(messages) { + window.onServerWsprBatch(messages); +}; + window.pruneWsprHistoryView = function() { pruneWsprMessageHistory(); renderWsprHistory(); diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index 4661e6f..e0567d9 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -442,33 +442,40 @@ fn sync_scheduler_vchannels( vchan_mgr.sync_scheduler_channels(rig_id, &desired); } -/// Build the combined decode history vector from all per-decoder ring-buffers. -fn collect_decode_history(context: &FrontendRuntimeContext) -> Vec { - let ais = crate::server::audio::snapshot_ais_history(context); - let vdes = crate::server::audio::snapshot_vdes_history(context); - let aprs = crate::server::audio::snapshot_aprs_history(context); - let hf_aprs = crate::server::audio::snapshot_hf_aprs_history(context); - let cw = crate::server::audio::snapshot_cw_history(context); - let ft8 = crate::server::audio::snapshot_ft8_history(context); - let wspr = crate::server::audio::snapshot_wspr_history(context); +#[derive(serde::Serialize)] +struct DecodeHistoryPayload { + ais: Vec, + vdes: Vec, + aprs: Vec, + hf_aprs: Vec, + cw: Vec, + ft8: Vec, + wspr: Vec, +} - let mut out = Vec::with_capacity( - ais.len() - + vdes.len() - + aprs.len() - + hf_aprs.len() - + cw.len() - + ft8.len() - + wspr.len(), - ); - out.extend(ais.into_iter().map(trx_core::decode::DecodedMessage::Ais)); - out.extend(vdes.into_iter().map(trx_core::decode::DecodedMessage::Vdes)); - out.extend(aprs.into_iter().map(trx_core::decode::DecodedMessage::Aprs)); - out.extend(hf_aprs.into_iter().map(trx_core::decode::DecodedMessage::HfAprs)); - out.extend(cw.into_iter().map(trx_core::decode::DecodedMessage::Cw)); - out.extend(ft8.into_iter().map(trx_core::decode::DecodedMessage::Ft8)); - out.extend(wspr.into_iter().map(trx_core::decode::DecodedMessage::Wspr)); - out +impl DecodeHistoryPayload { + fn total_messages(&self) -> usize { + self.ais.len() + + self.vdes.len() + + self.aprs.len() + + self.hf_aprs.len() + + self.cw.len() + + self.ft8.len() + + self.wspr.len() + } +} + +/// Build the grouped decode history payload from all per-decoder ring-buffers. +fn collect_decode_history(context: &FrontendRuntimeContext) -> DecodeHistoryPayload { + DecodeHistoryPayload { + ais: crate::server::audio::snapshot_ais_history(context), + vdes: crate::server::audio::snapshot_vdes_history(context), + aprs: crate::server::audio::snapshot_aprs_history(context), + hf_aprs: crate::server::audio::snapshot_hf_aprs_history(context), + cw: crate::server::audio::snapshot_cw_history(context), + ft8: crate::server::audio::snapshot_ft8_history(context), + wspr: crate::server::audio::snapshot_wspr_history(context), + } } fn encode_cbor_length(out: &mut Vec, major: u8, value: u64) { @@ -537,10 +544,10 @@ fn encode_cbor_json_value(out: &mut Vec, value: &serde_json::Value) { } fn encode_decode_history_cbor( - history: &[trx_core::decode::DecodedMessage], + history: &DecodeHistoryPayload, ) -> Result, serde_json::Error> { let value = serde_json::to_value(history)?; - let mut out = Vec::with_capacity(history.len().saturating_mul(96)); + let mut out = Vec::with_capacity(history.total_messages().saturating_mul(96)); encode_cbor_json_value(&mut out, &value); Ok(out) }