[fix](trx-frontend-http): group decode history by decoder

Serve grouped decode history payloads and restore each decoder through
explicit history restore hooks instead of replaying a mixed message stream.

This reduces replay overhead further by removing type regrouping and keeping
history restoration on decoder-specific bulk paths.

Co-authored-by: OpenAI Codex <codex@openai.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-14 18:11:09 +01:00
parent cba3751f0e
commit 73b1d5618d
10 changed files with 176 additions and 138 deletions
@@ -7361,9 +7361,8 @@ function dispatchDecodeBatch(batch) {
}
}
const DECODE_HISTORY_MAX_BATCH = 256;
const DECODE_HISTORY_TYPE_BATCH_LIMIT = 192;
const DECODE_HISTORY_SLICE_BUDGET_MS = 10;
const DECODE_HISTORY_WORKER_GROUP_LIMIT = 512;
const DECODE_HISTORY_BATCH_DRAIN_BUDGET_MS = 8;
function terminateDecodeHistoryWorker() {
@@ -7381,52 +7380,53 @@ function scheduleDecodeHistoryDrainStep(callback) {
}
}
function drainDecodeHistory(buffer, index, onDone, onProgress) {
const startedAt = typeof performance !== "undefined" && typeof performance.now === "function"
? performance.now()
: 0;
let nextIndex = index;
while (nextIndex < buffer.length) {
const batchStart = nextIndex;
const batchType = String(buffer[nextIndex]?.type || "");
nextIndex += 1;
while (
nextIndex < buffer.length
&& (nextIndex - batchStart) < DECODE_HISTORY_TYPE_BATCH_LIMIT
&& (nextIndex - index) < DECODE_HISTORY_MAX_BATCH
&& String(buffer[nextIndex]?.type || "") === batchType
) {
nextIndex += 1;
}
dispatchDecodeBatch(buffer.slice(batchStart, nextIndex));
if ((nextIndex - index) >= DECODE_HISTORY_MAX_BATCH) break;
if (startedAt > 0 && (performance.now() - startedAt) >= DECODE_HISTORY_SLICE_BUDGET_MS) break;
}
if (typeof onProgress === "function") {
onProgress(nextIndex, buffer.length);
}
if (nextIndex < buffer.length) {
scheduleDecodeHistoryDrainStep(() => drainDecodeHistory(buffer, nextIndex, onDone, onProgress));
} else if (typeof onDone === "function") {
onDone();
}
}
function loadDecodeHistoryOnMainThread(onReady, onError) {
fetch("/decode/history").then(async (resp) => {
if (!resp.ok) return null;
setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Receiving compressed history payload");
const payload = await resp.arrayBuffer();
if (!payload || payload.byteLength === 0) return [];
if (!payload || payload.byteLength === 0) return {};
setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Decoding compressed history payload");
return decodeCborPayload(payload);
}).then((msgs) => {
if (typeof onReady === "function") onReady(Array.isArray(msgs) ? msgs : []);
}).then((groups) => {
if (typeof onReady === "function") onReady(groups && typeof groups === "object" ? groups : {});
}).catch((err) => {
if (typeof onError === "function") onError(err);
});
}
function restoreDecodeHistoryGroup(kind, messages) {
if (!Array.isArray(messages) || messages.length === 0) return;
if (kind === "ais" && window.restoreAisHistory) {
window.restoreAisHistory(messages);
return;
}
if (kind === "vdes" && window.restoreVdesHistory) {
window.restoreVdesHistory(messages);
return;
}
if (kind === "aprs" && window.restoreAprsHistory) {
window.restoreAprsHistory(messages);
return;
}
if (kind === "hf_aprs" && window.restoreHfAprsHistory) {
window.restoreHfAprsHistory(messages);
return;
}
if (kind === "cw" && window.restoreCwHistory) {
window.restoreCwHistory(messages);
return;
}
if (kind === "ft8" && window.restoreFt8History) {
window.restoreFt8History(messages);
return;
}
if (kind === "wspr" && window.restoreWsprHistory) {
window.restoreWsprHistory(messages);
return;
}
}
function connectDecode() {
if (decodeSource) { decodeSource.close(); }
terminateDecodeHistoryWorker();
@@ -7447,7 +7447,7 @@ function connectDecode() {
let historyBatchDrainScheduled = false;
let historyTotal = 0;
let historyProcessed = 0;
const historyBatchQueue = [];
const historyGroupQueue = [];
const liveBuffer = [];
function flushLiveBuffer() {
historySettled = true;
@@ -7470,62 +7470,74 @@ function connectDecode() {
function maybeFinishHistoryReplay() {
if (historySettled) return;
if (historyWorkerDone && historyBatchQueue.length === 0) {
if (historyWorkerDone && historyGroupQueue.length === 0) {
clearTimeout(historyTimeout);
flushLiveBuffer();
}
}
function pumpDecodeHistoryBatchQueue() {
function pumpDecodeHistoryGroupQueue() {
historyBatchDrainScheduled = false;
const startedAt = typeof performance !== "undefined" && typeof performance.now === "function"
? performance.now()
: 0;
while (historyBatchQueue.length > 0) {
const batch = historyBatchQueue.shift();
dispatchDecodeBatch(batch);
historyProcessed += Array.isArray(batch) ? batch.length : 0;
while (historyGroupQueue.length > 0) {
const next = historyGroupQueue.shift();
restoreDecodeHistoryGroup(next.kind, next.messages);
historyProcessed += Array.isArray(next.messages) ? next.messages.length : 0;
updateHistoryReplayOverlay();
if (startedAt > 0 && (performance.now() - startedAt) >= DECODE_HISTORY_BATCH_DRAIN_BUDGET_MS) {
break;
}
}
if (historyBatchQueue.length > 0) {
scheduleDecodeHistoryDrainStep(pumpDecodeHistoryBatchQueue);
if (historyGroupQueue.length > 0) {
scheduleDecodeHistoryDrainStep(pumpDecodeHistoryGroupQueue);
historyBatchDrainScheduled = true;
return;
}
maybeFinishHistoryReplay();
}
function enqueueDecodeHistoryBatch(batch) {
if (!Array.isArray(batch) || batch.length === 0) return;
historyBatchQueue.push(batch);
function enqueueDecodeHistoryGroup(kind, messages) {
if (!Array.isArray(messages) || messages.length === 0) return;
historyGroupQueue.push({ kind, messages });
if (historyBatchDrainScheduled) return;
historyBatchDrainScheduled = true;
scheduleDecodeHistoryDrainStep(pumpDecodeHistoryBatchQueue);
scheduleDecodeHistoryDrainStep(pumpDecodeHistoryGroupQueue);
}
function totalDecodeHistoryMessages(groups) {
if (!groups || typeof groups !== "object") return 0;
return ["ais", "vdes", "aprs", "hf_aprs", "cw", "ft8", "wspr"]
.reduce((sum, key) => sum + (Array.isArray(groups[key]) ? groups[key].length : 0), 0);
}
function enqueueDecodeHistoryGroups(groups) {
historyTotal = totalDecodeHistoryMessages(groups);
historyProcessed = 0;
if (historyTotal > 0) {
setDecodeHistoryReplayActive(true);
updateHistoryReplayOverlay();
}
for (const kind of ["ais", "vdes", "aprs", "hf_aprs", "cw", "ft8", "wspr"]) {
const messages = groups && Array.isArray(groups[kind]) ? groups[kind] : [];
if (messages.length === 0) continue;
for (let index = 0; index < messages.length; index += DECODE_HISTORY_WORKER_GROUP_LIMIT) {
enqueueDecodeHistoryGroup(kind, messages.slice(index, index + DECODE_HISTORY_WORKER_GROUP_LIMIT));
}
}
historyWorkerDone = true;
maybeFinishHistoryReplay();
}
function startDecodeHistoryFallback() {
if (historyFallbackStarted || historySettled) return;
historyFallbackStarted = true;
loadDecodeHistoryOnMainThread((msgs) => {
loadDecodeHistoryOnMainThread((groups) => {
clearTimeout(historyTimeout);
if (Array.isArray(msgs) && msgs.length > 0) {
setDecodeHistoryReplayActive(true);
setDecodeHistoryOverlayVisible(true, "Loading decode history…", `Replaying 0 / ${msgs.length} decoded messages`);
drainDecodeHistory(
msgs,
0,
flushLiveBuffer,
(processed, total) => {
setDecodeHistoryOverlayVisible(
true,
"Loading decode history…",
`Replaying ${processed} / ${total} decoded messages`
);
}
);
const total = totalDecodeHistoryMessages(groups);
if (total > 0) {
enqueueDecodeHistoryGroups(groups);
} else {
flushLiveBuffer();
}
@@ -7567,8 +7579,8 @@ function connectDecode() {
}
return;
}
if (data.type === "batch") {
enqueueDecodeHistoryBatch(data.batch);
if (data.type === "group") {
enqueueDecodeHistoryGroup(String(data.kind || ""), data.messages);
return;
}
if (data.type === "done") {
@@ -7587,7 +7599,7 @@ function connectDecode() {
worker.postMessage({
type: "fetch-history",
url: "/decode/history",
batchLimit: DECODE_HISTORY_TYPE_BATCH_LIMIT,
batchLimit: DECODE_HISTORY_WORKER_GROUP_LIMIT,
});
return true;
}
@@ -1,4 +1,5 @@
const textDecoder = typeof TextDecoder === "function" ? new TextDecoder() : null;
const HISTORY_GROUP_KEYS = ["ais", "vdes", "aprs", "hf_aprs", "cw", "ft8", "wspr"];
function decodeCborUint(view, bytes, state, additional) {
const offset = state.offset;
@@ -111,13 +112,15 @@ function decodeCborItem(view, bytes, state) {
throw new Error("Unsupported CBOR major type");
}
function decodeTopLevelArrayLength(view, bytes, state) {
if (state.offset >= bytes.length) throw new Error("CBOR payload truncated");
const initial = bytes[state.offset++];
const major = initial >> 5;
const additional = initial & 0x1f;
if (major !== 4) throw new Error("Decode history payload is not a CBOR array");
return decodeCborUint(view, bytes, state, additional);
function decodeCborPayload(buffer) {
const bytes = buffer instanceof Uint8Array ? buffer : new Uint8Array(buffer);
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
const state = { offset: 0 };
const value = decodeCborItem(view, bytes, state);
if (state.offset !== bytes.length) {
throw new Error("Unexpected trailing bytes in decode history payload");
}
return value;
}
async function fetchAndDecodeHistory(url, batchLimit) {
@@ -132,46 +135,30 @@ async function fetchAndDecodeHistory(url, batchLimit) {
}
self.postMessage({ type: "status", phase: "decoding" });
const bytes = new Uint8Array(payload);
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
const state = { offset: 0 };
const total = decodeTopLevelArrayLength(view, bytes, state);
const history = decodeCborPayload(payload);
const total = HISTORY_GROUP_KEYS.reduce((sum, key) => {
const items = history && Array.isArray(history[key]) ? history[key] : [];
return sum + items.length;
}, 0);
self.postMessage({ type: "start", total });
let processed = 0;
let currentType = "";
let currentBatch = [];
const safeLimit = Math.max(1, Math.min(512, Number(batchLimit) || 192));
const safeLimit = Math.max(1, Math.min(2048, Number(batchLimit) || 512));
function flushBatch() {
if (currentBatch.length === 0) return;
self.postMessage({
type: "batch",
batch: currentBatch,
processed,
total,
});
currentBatch = [];
currentType = "";
}
for (let i = 0; i < total; i += 1) {
const item = decodeCborItem(view, bytes, state);
const itemType = String(item?.type || "");
if (
currentBatch.length > 0
&& (itemType !== currentType || currentBatch.length >= safeLimit)
) {
flushBatch();
for (const kind of HISTORY_GROUP_KEYS) {
const items = history && Array.isArray(history[kind]) ? history[kind] : [];
if (items.length === 0) continue;
for (let index = 0; index < items.length; index += safeLimit) {
const messages = items.slice(index, index + safeLimit);
processed += messages.length;
self.postMessage({
type: "group",
kind,
messages,
processed,
total,
});
}
currentType = itemType;
currentBatch.push(item);
processed += 1;
}
flushBatch();
if (state.offset !== bytes.length) {
throw new Error("Unexpected trailing bytes in decode history payload");
}
self.postMessage({ type: "done", total });
}
@@ -390,6 +390,10 @@ window.onServerAisBatch = function(messages) {
scheduleAisHistoryRender();
};
window.restoreAisHistory = function(messages) {
window.onServerAisBatch(messages);
};
window.pruneAisHistoryView = function() {
pruneAisMessageHistory();
updateAisBar();
@@ -447,6 +447,10 @@ window.onServerAprsBatch = function(packets) {
scheduleAprsHistoryRender();
};
window.restoreAprsHistory = function(packets) {
window.onServerAprsBatch(packets);
};
document.getElementById("aprs-clear-btn").addEventListener("click", async () => {
try {
await postPath("/clear_aprs_decode");
@@ -426,6 +426,14 @@ window.onServerCw = function(evt) {
});
};
window.restoreCwHistory = function(events) {
if (!Array.isArray(events) || events.length === 0) return;
if (cwStatusEl) cwStatusEl.textContent = cwPaused ? "Paused" : "Receiving";
for (const evt of events) {
window.onServerCw(evt);
}
};
if (cwPauseBtn) {
cwPauseBtn.addEventListener("click", () => {
cwPaused = !cwPaused;
@@ -167,6 +167,10 @@ window.onServerFt8Batch = function(messages) {
scheduleFt8HistoryRender();
};
window.restoreFt8History = function(messages) {
window.onServerFt8Batch(messages);
};
window.pruneFt8HistoryView = function() {
pruneFt8MessageHistory();
updateFt8Bar();
@@ -393,6 +393,10 @@ window.onServerHfAprsBatch = function(packets) {
scheduleHfAprsHistoryRender();
};
window.restoreHfAprsHistory = function(packets) {
window.onServerHfAprsBatch(packets);
};
document.getElementById("hf-aprs-decode-toggle-btn")?.addEventListener("click", async () => {
try { await postPath("/toggle_hf_aprs_decode"); } catch (e) { console.error("HF APRS toggle failed", e); }
});
@@ -331,6 +331,10 @@ window.onServerVdesBatch = function(messages) {
scheduleVdesHistoryRender();
};
window.restoreVdesHistory = function(messages) {
window.onServerVdesBatch(messages);
};
if (vdesClearBtn) {
vdesClearBtn.addEventListener("click", async () => {
try {
@@ -144,6 +144,10 @@ window.onServerWsprBatch = function(messages) {
scheduleWsprHistoryRender();
};
window.restoreWsprHistory = function(messages) {
window.onServerWsprBatch(messages);
};
window.pruneWsprHistoryView = function() {
pruneWsprMessageHistory();
renderWsprHistory();
@@ -442,33 +442,40 @@ fn sync_scheduler_vchannels(
vchan_mgr.sync_scheduler_channels(rig_id, &desired);
}
/// Build the combined decode history vector from all per-decoder ring-buffers.
fn collect_decode_history(context: &FrontendRuntimeContext) -> Vec<trx_core::decode::DecodedMessage> {
let ais = crate::server::audio::snapshot_ais_history(context);
let vdes = crate::server::audio::snapshot_vdes_history(context);
let aprs = crate::server::audio::snapshot_aprs_history(context);
let hf_aprs = crate::server::audio::snapshot_hf_aprs_history(context);
let cw = crate::server::audio::snapshot_cw_history(context);
let ft8 = crate::server::audio::snapshot_ft8_history(context);
let wspr = crate::server::audio::snapshot_wspr_history(context);
#[derive(serde::Serialize)]
struct DecodeHistoryPayload {
ais: Vec<trx_core::decode::AisMessage>,
vdes: Vec<trx_core::decode::VdesMessage>,
aprs: Vec<trx_core::decode::AprsPacket>,
hf_aprs: Vec<trx_core::decode::AprsPacket>,
cw: Vec<trx_core::decode::CwEvent>,
ft8: Vec<trx_core::decode::Ft8Message>,
wspr: Vec<trx_core::decode::WsprMessage>,
}
let mut out = Vec::with_capacity(
ais.len()
+ vdes.len()
+ aprs.len()
+ hf_aprs.len()
+ cw.len()
+ ft8.len()
+ wspr.len(),
);
out.extend(ais.into_iter().map(trx_core::decode::DecodedMessage::Ais));
out.extend(vdes.into_iter().map(trx_core::decode::DecodedMessage::Vdes));
out.extend(aprs.into_iter().map(trx_core::decode::DecodedMessage::Aprs));
out.extend(hf_aprs.into_iter().map(trx_core::decode::DecodedMessage::HfAprs));
out.extend(cw.into_iter().map(trx_core::decode::DecodedMessage::Cw));
out.extend(ft8.into_iter().map(trx_core::decode::DecodedMessage::Ft8));
out.extend(wspr.into_iter().map(trx_core::decode::DecodedMessage::Wspr));
out
impl DecodeHistoryPayload {
fn total_messages(&self) -> usize {
self.ais.len()
+ self.vdes.len()
+ self.aprs.len()
+ self.hf_aprs.len()
+ self.cw.len()
+ self.ft8.len()
+ self.wspr.len()
}
}
/// Build the grouped decode history payload from all per-decoder ring-buffers.
fn collect_decode_history(context: &FrontendRuntimeContext) -> DecodeHistoryPayload {
DecodeHistoryPayload {
ais: crate::server::audio::snapshot_ais_history(context),
vdes: crate::server::audio::snapshot_vdes_history(context),
aprs: crate::server::audio::snapshot_aprs_history(context),
hf_aprs: crate::server::audio::snapshot_hf_aprs_history(context),
cw: crate::server::audio::snapshot_cw_history(context),
ft8: crate::server::audio::snapshot_ft8_history(context),
wspr: crate::server::audio::snapshot_wspr_history(context),
}
}
fn encode_cbor_length(out: &mut Vec<u8>, major: u8, value: u64) {
@@ -537,10 +544,10 @@ fn encode_cbor_json_value(out: &mut Vec<u8>, value: &serde_json::Value) {
}
fn encode_decode_history_cbor(
history: &[trx_core::decode::DecodedMessage],
history: &DecodeHistoryPayload,
) -> Result<Vec<u8>, serde_json::Error> {
let value = serde_json::to_value(history)?;
let mut out = Vec::with_capacity(history.len().saturating_mul(96));
let mut out = Vec::with_capacity(history.total_messages().saturating_mul(96));
encode_cbor_json_value(&mut out, &value);
Ok(out)
}