From cba3751f0eca4a86bdac3c483b7ede66eb383f5c Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sat, 14 Mar 2026 18:04:11 +0100 Subject: [PATCH] [fix](trx-frontend-http): move history decode off main thread Serve a dedicated decode-history worker and move compressed history fetch and CBOR parsing into that worker. The main thread now drains ready-made decode batches within a frame budget, which further reduces UI disruption during large history restores. Co-authored-by: OpenAI Codex Signed-off-by: Stan Grams --- .../trx-frontend-http/assets/web/app.js | 210 +++++++++++++++--- .../assets/web/decode-history-worker.js | 189 ++++++++++++++++ .../trx-frontend/trx-frontend-http/src/api.rs | 11 + .../trx-frontend-http/src/status.rs | 2 + 4 files changed, 377 insertions(+), 35 deletions(-) create mode 100644 src/trx-client/trx-frontend/trx-frontend-http/assets/web/decode-history-worker.js diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index e7ea3ab..6d60d05 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -7294,6 +7294,7 @@ document.getElementById("copyright-year").textContent = new Date().getFullYear() // --- Server-side decode SSE --- let decodeSource = null; let decodeConnected = false; +let decodeHistoryWorker = null; function setModeBoundDecodeStatus(el, activeModes, inactiveText, connectedText) { if (!el) return; const modeUpper = (document.getElementById("mode")?.value || "").toUpperCase(); @@ -7363,6 +7364,13 @@ function dispatchDecodeBatch(batch) { const DECODE_HISTORY_MAX_BATCH = 256; const DECODE_HISTORY_TYPE_BATCH_LIMIT = 192; const DECODE_HISTORY_SLICE_BUDGET_MS = 10; +const DECODE_HISTORY_BATCH_DRAIN_BUDGET_MS = 8; + +function terminateDecodeHistoryWorker() { + if (!decodeHistoryWorker) return; + try { decodeHistoryWorker.terminate(); } catch (_) {} + decodeHistoryWorker = null; +} function scheduleDecodeHistoryDrainStep(callback) { if (typeof callback !== "function") return; @@ -7404,8 +7412,24 @@ function drainDecodeHistory(buffer, index, onDone, onProgress) { } } +function loadDecodeHistoryOnMainThread(onReady, onError) { + fetch("/decode/history").then(async (resp) => { + if (!resp.ok) return null; + setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Receiving compressed history payload"); + const payload = await resp.arrayBuffer(); + if (!payload || payload.byteLength === 0) return []; + setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Decoding compressed history payload"); + return decodeCborPayload(payload); + }).then((msgs) => { + if (typeof onReady === "function") onReady(Array.isArray(msgs) ? msgs : []); + }).catch((err) => { + if (typeof onError === "function") onError(err); + }); +} + function connectDecode() { if (decodeSource) { decodeSource.close(); } + terminateDecodeHistoryWorker(); decodeHistoryReplayActive = false; decodeMapSyncPending = false; if (window.resetAisHistoryView) window.resetAisHistoryView(); @@ -7418,9 +7442,16 @@ function connectDecode() { // Buffer live messages until history fetch settles so history always appears // before any live updates, regardless of network ordering. let historySettled = false; + let historyWorkerDone = false; + let historyFallbackStarted = false; + let historyBatchDrainScheduled = false; + let historyTotal = 0; + let historyProcessed = 0; + const historyBatchQueue = []; const liveBuffer = []; function flushLiveBuffer() { historySettled = true; + terminateDecodeHistoryWorker(); setDecodeHistoryReplayActive(false); setDecodeHistoryOverlayVisible(false); for (const msg of liveBuffer) { @@ -7428,8 +7459,146 @@ function connectDecode() { } liveBuffer.length = 0; } - // Safety valve: if the history fetch hangs, unblock after 8 s. - const historyTimeout = setTimeout(() => { if (!historySettled) flushLiveBuffer(); }, 8000); + + function updateHistoryReplayOverlay() { + setDecodeHistoryOverlayVisible( + true, + "Loading decode history…", + `Replaying ${historyProcessed} / ${historyTotal} decoded messages` + ); + } + + function maybeFinishHistoryReplay() { + if (historySettled) return; + if (historyWorkerDone && historyBatchQueue.length === 0) { + clearTimeout(historyTimeout); + flushLiveBuffer(); + } + } + + function pumpDecodeHistoryBatchQueue() { + historyBatchDrainScheduled = false; + const startedAt = typeof performance !== "undefined" && typeof performance.now === "function" + ? performance.now() + : 0; + while (historyBatchQueue.length > 0) { + const batch = historyBatchQueue.shift(); + dispatchDecodeBatch(batch); + historyProcessed += Array.isArray(batch) ? batch.length : 0; + updateHistoryReplayOverlay(); + if (startedAt > 0 && (performance.now() - startedAt) >= DECODE_HISTORY_BATCH_DRAIN_BUDGET_MS) { + break; + } + } + if (historyBatchQueue.length > 0) { + scheduleDecodeHistoryDrainStep(pumpDecodeHistoryBatchQueue); + historyBatchDrainScheduled = true; + return; + } + maybeFinishHistoryReplay(); + } + + function enqueueDecodeHistoryBatch(batch) { + if (!Array.isArray(batch) || batch.length === 0) return; + historyBatchQueue.push(batch); + if (historyBatchDrainScheduled) return; + historyBatchDrainScheduled = true; + scheduleDecodeHistoryDrainStep(pumpDecodeHistoryBatchQueue); + } + + function startDecodeHistoryFallback() { + if (historyFallbackStarted || historySettled) return; + historyFallbackStarted = true; + loadDecodeHistoryOnMainThread((msgs) => { + clearTimeout(historyTimeout); + if (Array.isArray(msgs) && msgs.length > 0) { + setDecodeHistoryReplayActive(true); + setDecodeHistoryOverlayVisible(true, "Loading decode history…", `Replaying 0 / ${msgs.length} decoded messages`); + drainDecodeHistory( + msgs, + 0, + flushLiveBuffer, + (processed, total) => { + setDecodeHistoryOverlayVisible( + true, + "Loading decode history…", + `Replaying ${processed} / ${total} decoded messages` + ); + } + ); + } else { + flushLiveBuffer(); + } + }, (err) => { + console.error("Decode history fallback failed", err); + clearTimeout(historyTimeout); + flushLiveBuffer(); + }); + } + + function startDecodeHistoryWorkerReplay() { + if (typeof Worker !== "function") return false; + let worker; + try { + worker = new Worker("/decode-history-worker.js"); + } catch (err) { + console.error("Decode history worker startup failed", err); + return false; + } + decodeHistoryWorker = worker; + worker.onmessage = (evt) => { + if (historySettled || worker !== decodeHistoryWorker) return; + const data = evt?.data || {}; + if (data.type === "status") { + const phase = String(data.phase || ""); + if (phase === "fetching") { + setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Fetching recent decodes from the client buffer"); + } else if (phase === "decoding") { + setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Decoding compressed history in background"); + } + return; + } + if (data.type === "start") { + historyTotal = Math.max(0, Number(data.total) || 0); + historyProcessed = 0; + if (historyTotal > 0) { + setDecodeHistoryReplayActive(true); + updateHistoryReplayOverlay(); + } + return; + } + if (data.type === "batch") { + enqueueDecodeHistoryBatch(data.batch); + return; + } + if (data.type === "done") { + historyWorkerDone = true; + clearTimeout(historyTimeout); + terminateDecodeHistoryWorker(); + maybeFinishHistoryReplay(); + return; + } + if (data.type === "error") { + console.error("Decode history worker failed", data.message || "unknown worker failure"); + terminateDecodeHistoryWorker(); + startDecodeHistoryFallback(); + } + }; + worker.postMessage({ + type: "fetch-history", + url: "/decode/history", + batchLimit: DECODE_HISTORY_TYPE_BATCH_LIMIT, + }); + return true; + } + + // Safety valve: if the history fetch hangs, unblock after 20 s. + const historyTimeout = setTimeout(() => { + if (!historySettled) { + terminateDecodeHistoryWorker(); + flushLiveBuffer(); + } + }, 20000); setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Fetching recent decodes from the client buffer"); decodeSource = new EventSource("/decode"); @@ -7449,6 +7618,7 @@ function connectDecode() { const wasClosed = decodeSource.readyState === 2; decodeSource.close(); decodeConnected = false; + terminateDecodeHistoryWorker(); if (wasClosed) { updateDecodeStatus("Decode not available (check client audio config)"); setTimeout(connectDecode, 10000); @@ -7458,39 +7628,9 @@ function connectDecode() { } }; - // Fetch history in parallel; drain it first, then flush buffered live msgs. - fetch("/decode/history").then(async (resp) => { - if (!resp.ok) return null; - setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Receiving compressed history payload"); - const payload = await resp.arrayBuffer(); - if (!payload || payload.byteLength === 0) return []; - setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Decoding compressed history payload"); - return decodeCborPayload(payload); - }).then((msgs) => { - clearTimeout(historyTimeout); - if (Array.isArray(msgs) && msgs.length > 0) { - setDecodeHistoryReplayActive(true); - setDecodeHistoryOverlayVisible(true, "Loading decode history…", `Replaying 0 / ${msgs.length} decoded messages`); - drainDecodeHistory( - msgs, - 0, - flushLiveBuffer, - (processed, total) => { - setDecodeHistoryOverlayVisible( - true, - "Loading decode history…", - `Replaying ${processed} / ${total} decoded messages` - ); - } - ); - } else { - flushLiveBuffer(); - } - }).catch((err) => { - console.error("Decode history fetch failed", err); - clearTimeout(historyTimeout); - flushLiveBuffer(); - }); + if (!startDecodeHistoryWorkerReplay()) { + startDecodeHistoryFallback(); + } } // connectDecode() is called from initializeApp() after auth succeeds, // and from login/guest handlers — no standalone window.load call needed. diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/decode-history-worker.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/decode-history-worker.js new file mode 100644 index 0000000..e2e6908 --- /dev/null +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/decode-history-worker.js @@ -0,0 +1,189 @@ +const textDecoder = typeof TextDecoder === "function" ? new TextDecoder() : null; + +function decodeCborUint(view, bytes, state, additional) { + const offset = state.offset; + if (additional < 24) return additional; + if (additional === 24) { + if (offset + 1 > bytes.length) throw new Error("CBOR payload truncated"); + state.offset += 1; + return bytes[offset]; + } + if (additional === 25) { + if (offset + 2 > bytes.length) throw new Error("CBOR payload truncated"); + state.offset += 2; + return view.getUint16(offset); + } + if (additional === 26) { + if (offset + 4 > bytes.length) throw new Error("CBOR payload truncated"); + state.offset += 4; + return view.getUint32(offset); + } + if (additional === 27) { + if (offset + 8 > bytes.length) throw new Error("CBOR payload truncated"); + const value = view.getBigUint64(offset); + state.offset += 8; + const numeric = Number(value); + if (!Number.isSafeInteger(numeric)) throw new Error("CBOR integer exceeds JS safe range"); + return numeric; + } + throw new Error("Unsupported CBOR additional info"); +} + +function decodeCborFloat16(bits) { + const sign = (bits & 0x8000) ? -1 : 1; + const exponent = (bits >> 10) & 0x1f; + const fraction = bits & 0x03ff; + if (exponent === 0) { + return fraction === 0 ? sign * 0 : sign * Math.pow(2, -14) * (fraction / 1024); + } + if (exponent === 0x1f) { + return fraction === 0 ? sign * Infinity : Number.NaN; + } + return sign * Math.pow(2, exponent - 15) * (1 + (fraction / 1024)); +} + +function decodeCborItem(view, bytes, state) { + if (state.offset >= bytes.length) throw new Error("CBOR payload truncated"); + const initial = bytes[state.offset++]; + const major = initial >> 5; + const additional = initial & 0x1f; + if (major === 0) return decodeCborUint(view, bytes, state, additional); + if (major === 1) return -1 - decodeCborUint(view, bytes, state, additional); + if (major === 2) { + const length = decodeCborUint(view, bytes, state, additional); + if (state.offset + length > bytes.length) throw new Error("CBOR payload truncated"); + const chunk = bytes.slice(state.offset, state.offset + length); + state.offset += length; + return Array.from(chunk); + } + if (major === 3) { + const length = decodeCborUint(view, bytes, state, additional); + if (state.offset + length > bytes.length) throw new Error("CBOR payload truncated"); + const chunk = bytes.subarray(state.offset, state.offset + length); + state.offset += length; + return textDecoder ? textDecoder.decode(chunk) : String.fromCharCode(...chunk); + } + if (major === 4) { + const length = decodeCborUint(view, bytes, state, additional); + const items = new Array(length); + for (let i = 0; i < length; i += 1) { + items[i] = decodeCborItem(view, bytes, state); + } + return items; + } + if (major === 5) { + const length = decodeCborUint(view, bytes, state, additional); + const value = {}; + for (let i = 0; i < length; i += 1) { + const key = decodeCborItem(view, bytes, state); + value[String(key)] = decodeCborItem(view, bytes, state); + } + return value; + } + if (major === 6) { + decodeCborUint(view, bytes, state, additional); + return decodeCborItem(view, bytes, state); + } + if (major === 7) { + if (additional === 20) return false; + if (additional === 21) return true; + if (additional === 22) return null; + if (additional === 23) return undefined; + if (additional === 25) { + if (state.offset + 2 > bytes.length) throw new Error("CBOR payload truncated"); + const bits = view.getUint16(state.offset); + state.offset += 2; + return decodeCborFloat16(bits); + } + if (additional === 26) { + if (state.offset + 4 > bytes.length) throw new Error("CBOR payload truncated"); + const value = view.getFloat32(state.offset); + state.offset += 4; + return value; + } + if (additional === 27) { + if (state.offset + 8 > bytes.length) throw new Error("CBOR payload truncated"); + const value = view.getFloat64(state.offset); + state.offset += 8; + return value; + } + } + throw new Error("Unsupported CBOR major type"); +} + +function decodeTopLevelArrayLength(view, bytes, state) { + if (state.offset >= bytes.length) throw new Error("CBOR payload truncated"); + const initial = bytes[state.offset++]; + const major = initial >> 5; + const additional = initial & 0x1f; + if (major !== 4) throw new Error("Decode history payload is not a CBOR array"); + return decodeCborUint(view, bytes, state, additional); +} + +async function fetchAndDecodeHistory(url, batchLimit) { + self.postMessage({ type: "status", phase: "fetching" }); + const resp = await fetch(url, { credentials: "same-origin" }); + if (!resp.ok) throw new Error(`History fetch failed: ${resp.status}`); + const payload = await resp.arrayBuffer(); + if (!payload || payload.byteLength === 0) { + self.postMessage({ type: "start", total: 0 }); + self.postMessage({ type: "done", total: 0 }); + return; + } + + self.postMessage({ type: "status", phase: "decoding" }); + const bytes = new Uint8Array(payload); + const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength); + const state = { offset: 0 }; + const total = decodeTopLevelArrayLength(view, bytes, state); + self.postMessage({ type: "start", total }); + + let processed = 0; + let currentType = ""; + let currentBatch = []; + const safeLimit = Math.max(1, Math.min(512, Number(batchLimit) || 192)); + + function flushBatch() { + if (currentBatch.length === 0) return; + self.postMessage({ + type: "batch", + batch: currentBatch, + processed, + total, + }); + currentBatch = []; + currentType = ""; + } + + for (let i = 0; i < total; i += 1) { + const item = decodeCborItem(view, bytes, state); + const itemType = String(item?.type || ""); + if ( + currentBatch.length > 0 + && (itemType !== currentType || currentBatch.length >= safeLimit) + ) { + flushBatch(); + } + currentType = itemType; + currentBatch.push(item); + processed += 1; + } + flushBatch(); + + if (state.offset !== bytes.length) { + throw new Error("Unexpected trailing bytes in decode history payload"); + } + self.postMessage({ type: "done", total }); +} + +self.onmessage = (event) => { + const data = event?.data || {}; + if (data?.type !== "fetch-history") return; + fetchAndDecodeHistory(data.url || "/decode/history", data.batchLimit) + .catch((err) => { + self.postMessage({ + type: "error", + message: err && err.message ? err.message : String(err || "unknown worker failure"), + }); + }); +}; diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index b7850d3..4661e6f 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -1527,6 +1527,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(logo) .service(style_css) .service(app_js) + .service(decode_history_worker_js) .service(webgl_renderer_js) .service(leaflet_ais_tracksymbol_js) .service(ais_js) @@ -1627,6 +1628,16 @@ async fn app_js() -> impl Responder { .body(status::APP_JS) } +#[get("/decode-history-worker.js")] +async fn decode_history_worker_js() -> impl Responder { + HttpResponse::Ok() + .insert_header(( + header::CONTENT_TYPE, + "application/javascript; charset=utf-8", + )) + .body(status::DECODE_HISTORY_WORKER_JS) +} + #[get("/webgl-renderer.js")] async fn webgl_renderer_js() -> impl Responder { HttpResponse::Ok() diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/status.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/status.rs index af98b42..8d90a88 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/status.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/status.rs @@ -9,6 +9,8 @@ const CLIENT_BUILD_DATE: &str = env!("TRX_CLIENT_BUILD_DATE"); const INDEX_HTML: &str = include_str!("../assets/web/index.html"); pub const STYLE_CSS: &str = include_str!("../assets/web/style.css"); pub const APP_JS: &str = include_str!("../assets/web/app.js"); +pub const DECODE_HISTORY_WORKER_JS: &str = + include_str!("../assets/web/decode-history-worker.js"); pub const WEBGL_RENDERER_JS: &str = include_str!("../assets/web/webgl-renderer.js"); pub const LEAFLET_AIS_TRACKSYMBOL_JS: &str = include_str!("../assets/web/leaflet-ais-tracksymbol.js");