[fix](trx-frontend-http): move history decode off main thread

Serve a dedicated decode-history worker and move compressed history fetch
and CBOR parsing into that worker.

The main thread now drains ready-made decode batches within a frame budget,
which further reduces UI disruption during large history restores.

Co-authored-by: OpenAI Codex <codex@openai.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-14 18:04:11 +01:00
parent e9cb2852be
commit cba3751f0e
4 changed files with 377 additions and 35 deletions
@@ -7294,6 +7294,7 @@ document.getElementById("copyright-year").textContent = new Date().getFullYear()
// --- Server-side decode SSE --- // --- Server-side decode SSE ---
let decodeSource = null; let decodeSource = null;
let decodeConnected = false; let decodeConnected = false;
let decodeHistoryWorker = null;
function setModeBoundDecodeStatus(el, activeModes, inactiveText, connectedText) { function setModeBoundDecodeStatus(el, activeModes, inactiveText, connectedText) {
if (!el) return; if (!el) return;
const modeUpper = (document.getElementById("mode")?.value || "").toUpperCase(); const modeUpper = (document.getElementById("mode")?.value || "").toUpperCase();
@@ -7363,6 +7364,13 @@ function dispatchDecodeBatch(batch) {
const DECODE_HISTORY_MAX_BATCH = 256; const DECODE_HISTORY_MAX_BATCH = 256;
const DECODE_HISTORY_TYPE_BATCH_LIMIT = 192; const DECODE_HISTORY_TYPE_BATCH_LIMIT = 192;
const DECODE_HISTORY_SLICE_BUDGET_MS = 10; const DECODE_HISTORY_SLICE_BUDGET_MS = 10;
const DECODE_HISTORY_BATCH_DRAIN_BUDGET_MS = 8;
function terminateDecodeHistoryWorker() {
if (!decodeHistoryWorker) return;
try { decodeHistoryWorker.terminate(); } catch (_) {}
decodeHistoryWorker = null;
}
function scheduleDecodeHistoryDrainStep(callback) { function scheduleDecodeHistoryDrainStep(callback) {
if (typeof callback !== "function") return; if (typeof callback !== "function") return;
@@ -7404,8 +7412,24 @@ function drainDecodeHistory(buffer, index, onDone, onProgress) {
} }
} }
function loadDecodeHistoryOnMainThread(onReady, onError) {
fetch("/decode/history").then(async (resp) => {
if (!resp.ok) return null;
setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Receiving compressed history payload");
const payload = await resp.arrayBuffer();
if (!payload || payload.byteLength === 0) return [];
setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Decoding compressed history payload");
return decodeCborPayload(payload);
}).then((msgs) => {
if (typeof onReady === "function") onReady(Array.isArray(msgs) ? msgs : []);
}).catch((err) => {
if (typeof onError === "function") onError(err);
});
}
function connectDecode() { function connectDecode() {
if (decodeSource) { decodeSource.close(); } if (decodeSource) { decodeSource.close(); }
terminateDecodeHistoryWorker();
decodeHistoryReplayActive = false; decodeHistoryReplayActive = false;
decodeMapSyncPending = false; decodeMapSyncPending = false;
if (window.resetAisHistoryView) window.resetAisHistoryView(); if (window.resetAisHistoryView) window.resetAisHistoryView();
@@ -7418,9 +7442,16 @@ function connectDecode() {
// Buffer live messages until history fetch settles so history always appears // Buffer live messages until history fetch settles so history always appears
// before any live updates, regardless of network ordering. // before any live updates, regardless of network ordering.
let historySettled = false; let historySettled = false;
let historyWorkerDone = false;
let historyFallbackStarted = false;
let historyBatchDrainScheduled = false;
let historyTotal = 0;
let historyProcessed = 0;
const historyBatchQueue = [];
const liveBuffer = []; const liveBuffer = [];
function flushLiveBuffer() { function flushLiveBuffer() {
historySettled = true; historySettled = true;
terminateDecodeHistoryWorker();
setDecodeHistoryReplayActive(false); setDecodeHistoryReplayActive(false);
setDecodeHistoryOverlayVisible(false); setDecodeHistoryOverlayVisible(false);
for (const msg of liveBuffer) { for (const msg of liveBuffer) {
@@ -7428,45 +7459,57 @@ function connectDecode() {
} }
liveBuffer.length = 0; liveBuffer.length = 0;
} }
// Safety valve: if the history fetch hangs, unblock after 8 s.
const historyTimeout = setTimeout(() => { if (!historySettled) flushLiveBuffer(); }, 8000);
setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Fetching recent decodes from the client buffer");
decodeSource = new EventSource("/decode"); function updateHistoryReplayOverlay() {
decodeSource.onopen = () => { setDecodeHistoryOverlayVisible(
decodeConnected = true; true,
updateDecodeStatus("Connected, listening for packets"); "Loading decode history…",
}; `Replaying ${historyProcessed} / ${historyTotal} decoded messages`
decodeSource.onmessage = (evt) => { );
try {
const msg = JSON.parse(evt.data);
if (historySettled) dispatchDecodeMessage(msg);
else liveBuffer.push(msg);
} catch (e) { /* ignore parse errors */ }
};
decodeSource.onerror = () => {
// readyState CLOSED (2) = server rejected (404/error), CONNECTING (0) = temporary drop
const wasClosed = decodeSource.readyState === 2;
decodeSource.close();
decodeConnected = false;
if (wasClosed) {
updateDecodeStatus("Decode not available (check client audio config)");
setTimeout(connectDecode, 10000);
} else {
updateDecodeStatus("Decode disconnected, retrying…");
setTimeout(connectDecode, 5000);
} }
};
// Fetch history in parallel; drain it first, then flush buffered live msgs. function maybeFinishHistoryReplay() {
fetch("/decode/history").then(async (resp) => { if (historySettled) return;
if (!resp.ok) return null; if (historyWorkerDone && historyBatchQueue.length === 0) {
setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Receiving compressed history payload"); clearTimeout(historyTimeout);
const payload = await resp.arrayBuffer(); flushLiveBuffer();
if (!payload || payload.byteLength === 0) return []; }
setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Decoding compressed history payload"); }
return decodeCborPayload(payload);
}).then((msgs) => { function pumpDecodeHistoryBatchQueue() {
historyBatchDrainScheduled = false;
const startedAt = typeof performance !== "undefined" && typeof performance.now === "function"
? performance.now()
: 0;
while (historyBatchQueue.length > 0) {
const batch = historyBatchQueue.shift();
dispatchDecodeBatch(batch);
historyProcessed += Array.isArray(batch) ? batch.length : 0;
updateHistoryReplayOverlay();
if (startedAt > 0 && (performance.now() - startedAt) >= DECODE_HISTORY_BATCH_DRAIN_BUDGET_MS) {
break;
}
}
if (historyBatchQueue.length > 0) {
scheduleDecodeHistoryDrainStep(pumpDecodeHistoryBatchQueue);
historyBatchDrainScheduled = true;
return;
}
maybeFinishHistoryReplay();
}
function enqueueDecodeHistoryBatch(batch) {
if (!Array.isArray(batch) || batch.length === 0) return;
historyBatchQueue.push(batch);
if (historyBatchDrainScheduled) return;
historyBatchDrainScheduled = true;
scheduleDecodeHistoryDrainStep(pumpDecodeHistoryBatchQueue);
}
function startDecodeHistoryFallback() {
if (historyFallbackStarted || historySettled) return;
historyFallbackStarted = true;
loadDecodeHistoryOnMainThread((msgs) => {
clearTimeout(historyTimeout); clearTimeout(historyTimeout);
if (Array.isArray(msgs) && msgs.length > 0) { if (Array.isArray(msgs) && msgs.length > 0) {
setDecodeHistoryReplayActive(true); setDecodeHistoryReplayActive(true);
@@ -7486,11 +7529,108 @@ function connectDecode() {
} else { } else {
flushLiveBuffer(); flushLiveBuffer();
} }
}).catch((err) => { }, (err) => {
console.error("Decode history fetch failed", err); console.error("Decode history fallback failed", err);
clearTimeout(historyTimeout); clearTimeout(historyTimeout);
flushLiveBuffer(); flushLiveBuffer();
}); });
}
function startDecodeHistoryWorkerReplay() {
if (typeof Worker !== "function") return false;
let worker;
try {
worker = new Worker("/decode-history-worker.js");
} catch (err) {
console.error("Decode history worker startup failed", err);
return false;
}
decodeHistoryWorker = worker;
worker.onmessage = (evt) => {
if (historySettled || worker !== decodeHistoryWorker) return;
const data = evt?.data || {};
if (data.type === "status") {
const phase = String(data.phase || "");
if (phase === "fetching") {
setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Fetching recent decodes from the client buffer");
} else if (phase === "decoding") {
setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Decoding compressed history in background");
}
return;
}
if (data.type === "start") {
historyTotal = Math.max(0, Number(data.total) || 0);
historyProcessed = 0;
if (historyTotal > 0) {
setDecodeHistoryReplayActive(true);
updateHistoryReplayOverlay();
}
return;
}
if (data.type === "batch") {
enqueueDecodeHistoryBatch(data.batch);
return;
}
if (data.type === "done") {
historyWorkerDone = true;
clearTimeout(historyTimeout);
terminateDecodeHistoryWorker();
maybeFinishHistoryReplay();
return;
}
if (data.type === "error") {
console.error("Decode history worker failed", data.message || "unknown worker failure");
terminateDecodeHistoryWorker();
startDecodeHistoryFallback();
}
};
worker.postMessage({
type: "fetch-history",
url: "/decode/history",
batchLimit: DECODE_HISTORY_TYPE_BATCH_LIMIT,
});
return true;
}
// Safety valve: if the history fetch hangs, unblock after 20 s.
const historyTimeout = setTimeout(() => {
if (!historySettled) {
terminateDecodeHistoryWorker();
flushLiveBuffer();
}
}, 20000);
setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Fetching recent decodes from the client buffer");
decodeSource = new EventSource("/decode");
decodeSource.onopen = () => {
decodeConnected = true;
updateDecodeStatus("Connected, listening for packets");
};
decodeSource.onmessage = (evt) => {
try {
const msg = JSON.parse(evt.data);
if (historySettled) dispatchDecodeMessage(msg);
else liveBuffer.push(msg);
} catch (e) { /* ignore parse errors */ }
};
decodeSource.onerror = () => {
// readyState CLOSED (2) = server rejected (404/error), CONNECTING (0) = temporary drop
const wasClosed = decodeSource.readyState === 2;
decodeSource.close();
decodeConnected = false;
terminateDecodeHistoryWorker();
if (wasClosed) {
updateDecodeStatus("Decode not available (check client audio config)");
setTimeout(connectDecode, 10000);
} else {
updateDecodeStatus("Decode disconnected, retrying…");
setTimeout(connectDecode, 5000);
}
};
if (!startDecodeHistoryWorkerReplay()) {
startDecodeHistoryFallback();
}
} }
// connectDecode() is called from initializeApp() after auth succeeds, // connectDecode() is called from initializeApp() after auth succeeds,
// and from login/guest handlers — no standalone window.load call needed. // and from login/guest handlers — no standalone window.load call needed.
@@ -0,0 +1,189 @@
const textDecoder = typeof TextDecoder === "function" ? new TextDecoder() : null;
function decodeCborUint(view, bytes, state, additional) {
const offset = state.offset;
if (additional < 24) return additional;
if (additional === 24) {
if (offset + 1 > bytes.length) throw new Error("CBOR payload truncated");
state.offset += 1;
return bytes[offset];
}
if (additional === 25) {
if (offset + 2 > bytes.length) throw new Error("CBOR payload truncated");
state.offset += 2;
return view.getUint16(offset);
}
if (additional === 26) {
if (offset + 4 > bytes.length) throw new Error("CBOR payload truncated");
state.offset += 4;
return view.getUint32(offset);
}
if (additional === 27) {
if (offset + 8 > bytes.length) throw new Error("CBOR payload truncated");
const value = view.getBigUint64(offset);
state.offset += 8;
const numeric = Number(value);
if (!Number.isSafeInteger(numeric)) throw new Error("CBOR integer exceeds JS safe range");
return numeric;
}
throw new Error("Unsupported CBOR additional info");
}
function decodeCborFloat16(bits) {
const sign = (bits & 0x8000) ? -1 : 1;
const exponent = (bits >> 10) & 0x1f;
const fraction = bits & 0x03ff;
if (exponent === 0) {
return fraction === 0 ? sign * 0 : sign * Math.pow(2, -14) * (fraction / 1024);
}
if (exponent === 0x1f) {
return fraction === 0 ? sign * Infinity : Number.NaN;
}
return sign * Math.pow(2, exponent - 15) * (1 + (fraction / 1024));
}
function decodeCborItem(view, bytes, state) {
if (state.offset >= bytes.length) throw new Error("CBOR payload truncated");
const initial = bytes[state.offset++];
const major = initial >> 5;
const additional = initial & 0x1f;
if (major === 0) return decodeCborUint(view, bytes, state, additional);
if (major === 1) return -1 - decodeCborUint(view, bytes, state, additional);
if (major === 2) {
const length = decodeCborUint(view, bytes, state, additional);
if (state.offset + length > bytes.length) throw new Error("CBOR payload truncated");
const chunk = bytes.slice(state.offset, state.offset + length);
state.offset += length;
return Array.from(chunk);
}
if (major === 3) {
const length = decodeCborUint(view, bytes, state, additional);
if (state.offset + length > bytes.length) throw new Error("CBOR payload truncated");
const chunk = bytes.subarray(state.offset, state.offset + length);
state.offset += length;
return textDecoder ? textDecoder.decode(chunk) : String.fromCharCode(...chunk);
}
if (major === 4) {
const length = decodeCborUint(view, bytes, state, additional);
const items = new Array(length);
for (let i = 0; i < length; i += 1) {
items[i] = decodeCborItem(view, bytes, state);
}
return items;
}
if (major === 5) {
const length = decodeCborUint(view, bytes, state, additional);
const value = {};
for (let i = 0; i < length; i += 1) {
const key = decodeCborItem(view, bytes, state);
value[String(key)] = decodeCborItem(view, bytes, state);
}
return value;
}
if (major === 6) {
decodeCborUint(view, bytes, state, additional);
return decodeCborItem(view, bytes, state);
}
if (major === 7) {
if (additional === 20) return false;
if (additional === 21) return true;
if (additional === 22) return null;
if (additional === 23) return undefined;
if (additional === 25) {
if (state.offset + 2 > bytes.length) throw new Error("CBOR payload truncated");
const bits = view.getUint16(state.offset);
state.offset += 2;
return decodeCborFloat16(bits);
}
if (additional === 26) {
if (state.offset + 4 > bytes.length) throw new Error("CBOR payload truncated");
const value = view.getFloat32(state.offset);
state.offset += 4;
return value;
}
if (additional === 27) {
if (state.offset + 8 > bytes.length) throw new Error("CBOR payload truncated");
const value = view.getFloat64(state.offset);
state.offset += 8;
return value;
}
}
throw new Error("Unsupported CBOR major type");
}
function decodeTopLevelArrayLength(view, bytes, state) {
if (state.offset >= bytes.length) throw new Error("CBOR payload truncated");
const initial = bytes[state.offset++];
const major = initial >> 5;
const additional = initial & 0x1f;
if (major !== 4) throw new Error("Decode history payload is not a CBOR array");
return decodeCborUint(view, bytes, state, additional);
}
async function fetchAndDecodeHistory(url, batchLimit) {
self.postMessage({ type: "status", phase: "fetching" });
const resp = await fetch(url, { credentials: "same-origin" });
if (!resp.ok) throw new Error(`History fetch failed: ${resp.status}`);
const payload = await resp.arrayBuffer();
if (!payload || payload.byteLength === 0) {
self.postMessage({ type: "start", total: 0 });
self.postMessage({ type: "done", total: 0 });
return;
}
self.postMessage({ type: "status", phase: "decoding" });
const bytes = new Uint8Array(payload);
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
const state = { offset: 0 };
const total = decodeTopLevelArrayLength(view, bytes, state);
self.postMessage({ type: "start", total });
let processed = 0;
let currentType = "";
let currentBatch = [];
const safeLimit = Math.max(1, Math.min(512, Number(batchLimit) || 192));
function flushBatch() {
if (currentBatch.length === 0) return;
self.postMessage({
type: "batch",
batch: currentBatch,
processed,
total,
});
currentBatch = [];
currentType = "";
}
for (let i = 0; i < total; i += 1) {
const item = decodeCborItem(view, bytes, state);
const itemType = String(item?.type || "");
if (
currentBatch.length > 0
&& (itemType !== currentType || currentBatch.length >= safeLimit)
) {
flushBatch();
}
currentType = itemType;
currentBatch.push(item);
processed += 1;
}
flushBatch();
if (state.offset !== bytes.length) {
throw new Error("Unexpected trailing bytes in decode history payload");
}
self.postMessage({ type: "done", total });
}
self.onmessage = (event) => {
const data = event?.data || {};
if (data?.type !== "fetch-history") return;
fetchAndDecodeHistory(data.url || "/decode/history", data.batchLimit)
.catch((err) => {
self.postMessage({
type: "error",
message: err && err.message ? err.message : String(err || "unknown worker failure"),
});
});
};
@@ -1527,6 +1527,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(logo) .service(logo)
.service(style_css) .service(style_css)
.service(app_js) .service(app_js)
.service(decode_history_worker_js)
.service(webgl_renderer_js) .service(webgl_renderer_js)
.service(leaflet_ais_tracksymbol_js) .service(leaflet_ais_tracksymbol_js)
.service(ais_js) .service(ais_js)
@@ -1627,6 +1628,16 @@ async fn app_js() -> impl Responder {
.body(status::APP_JS) .body(status::APP_JS)
} }
#[get("/decode-history-worker.js")]
async fn decode_history_worker_js() -> impl Responder {
HttpResponse::Ok()
.insert_header((
header::CONTENT_TYPE,
"application/javascript; charset=utf-8",
))
.body(status::DECODE_HISTORY_WORKER_JS)
}
#[get("/webgl-renderer.js")] #[get("/webgl-renderer.js")]
async fn webgl_renderer_js() -> impl Responder { async fn webgl_renderer_js() -> impl Responder {
HttpResponse::Ok() HttpResponse::Ok()
@@ -9,6 +9,8 @@ const CLIENT_BUILD_DATE: &str = env!("TRX_CLIENT_BUILD_DATE");
const INDEX_HTML: &str = include_str!("../assets/web/index.html"); const INDEX_HTML: &str = include_str!("../assets/web/index.html");
pub const STYLE_CSS: &str = include_str!("../assets/web/style.css"); pub const STYLE_CSS: &str = include_str!("../assets/web/style.css");
pub const APP_JS: &str = include_str!("../assets/web/app.js"); pub const APP_JS: &str = include_str!("../assets/web/app.js");
pub const DECODE_HISTORY_WORKER_JS: &str =
include_str!("../assets/web/decode-history-worker.js");
pub const WEBGL_RENDERER_JS: &str = include_str!("../assets/web/webgl-renderer.js"); pub const WEBGL_RENDERER_JS: &str = include_str!("../assets/web/webgl-renderer.js");
pub const LEAFLET_AIS_TRACKSYMBOL_JS: &str = pub const LEAFLET_AIS_TRACKSYMBOL_JS: &str =
include_str!("../assets/web/leaflet-ais-tracksymbol.js"); include_str!("../assets/web/leaflet-ais-tracksymbol.js");