[fix](trx-frontend-http): speed up decode history replay

Serve decode history as gzipped CBOR and decode it in the frontend.
Defer map materialization until replay completes to avoid replay-time stutter,
and include the pending longest-QSO style adjustment.

Co-authored-by: OpenAI Codex <codex@openai.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-14 17:47:49 +01:00
parent a924902074
commit 4114e0b9fa
5 changed files with 374 additions and 86 deletions
Generated
+1
View File
@@ -2588,6 +2588,7 @@ dependencies = [
"actix-ws",
"bytes",
"dirs",
"flate2",
"futures-util",
"hex",
"pickledb",
@@ -21,6 +21,7 @@ actix-ws = "0.3"
tokio-stream = { version = "0.1", features = ["sync"] }
futures-util = "0.3"
bytes = "1"
flate2 = { workspace = true }
rand = "0.8"
hex = "0.4"
pickledb = "0.5"
@@ -396,6 +396,155 @@ function setDecodeHistoryOverlayVisible(visible, title = "", sub = "") {
if (decodeHistoryOverlaySubEl) decodeHistoryOverlaySubEl.textContent = sub || "";
decodeHistoryOverlayEl.classList.toggle("is-hidden", !visible);
}
const decodeHistoryTextDecoder = typeof TextDecoder === "function" ? new TextDecoder() : null;
let decodeHistoryReplayActive = false;
let decodeMapSyncPending = false;
function markDecodeMapSyncPending() {
decodeMapSyncPending = true;
}
function flushDeferredDecodeMapSync() {
if (!decodeMapSyncPending || decodeHistoryReplayActive || !aprsMap) return;
decodeMapSyncPending = false;
scheduleUiFrameJob("decode-map-maintenance", () => {
pruneMapHistory();
});
}
function setDecodeHistoryReplayActive(active) {
decodeHistoryReplayActive = !!active;
if (!decodeHistoryReplayActive) {
flushDeferredDecodeMapSync();
}
}
function decodeHistoryMapRenderingDeferred() {
return decodeHistoryReplayActive || !aprsMap;
}
function decodeCborUint(view, bytes, state, additional) {
const offset = state.offset;
if (additional < 24) return additional;
if (additional === 24) {
if (offset + 1 > bytes.length) throw new Error("CBOR payload truncated");
state.offset += 1;
return bytes[offset];
}
if (additional === 25) {
if (offset + 2 > bytes.length) throw new Error("CBOR payload truncated");
state.offset += 2;
return view.getUint16(offset);
}
if (additional === 26) {
if (offset + 4 > bytes.length) throw new Error("CBOR payload truncated");
state.offset += 4;
return view.getUint32(offset);
}
if (additional === 27) {
if (offset + 8 > bytes.length) throw new Error("CBOR payload truncated");
const value = view.getBigUint64(offset);
state.offset += 8;
const numeric = Number(value);
if (!Number.isSafeInteger(numeric)) throw new Error("CBOR integer exceeds JS safe range");
return numeric;
}
throw new Error("Unsupported CBOR additional info");
}
function decodeCborFloat16(bits) {
const sign = (bits & 0x8000) ? -1 : 1;
const exponent = (bits >> 10) & 0x1f;
const fraction = bits & 0x03ff;
if (exponent === 0) {
return fraction === 0 ? sign * 0 : sign * Math.pow(2, -14) * (fraction / 1024);
}
if (exponent === 0x1f) {
return fraction === 0 ? sign * Infinity : Number.NaN;
}
return sign * Math.pow(2, exponent - 15) * (1 + (fraction / 1024));
}
function decodeCborItem(view, bytes, state) {
if (state.offset >= bytes.length) throw new Error("CBOR payload truncated");
const initial = bytes[state.offset++];
const major = initial >> 5;
const additional = initial & 0x1f;
if (major === 0) return decodeCborUint(view, bytes, state, additional);
if (major === 1) return -1 - decodeCborUint(view, bytes, state, additional);
if (major === 2) {
const length = decodeCborUint(view, bytes, state, additional);
if (state.offset + length > bytes.length) throw new Error("CBOR payload truncated");
const chunk = bytes.slice(state.offset, state.offset + length);
state.offset += length;
return Array.from(chunk);
}
if (major === 3) {
const length = decodeCborUint(view, bytes, state, additional);
if (state.offset + length > bytes.length) throw new Error("CBOR payload truncated");
const chunk = bytes.subarray(state.offset, state.offset + length);
state.offset += length;
return decodeHistoryTextDecoder ? decodeHistoryTextDecoder.decode(chunk) : String.fromCharCode(...chunk);
}
if (major === 4) {
const length = decodeCborUint(view, bytes, state, additional);
const items = new Array(length);
for (let i = 0; i < length; i += 1) {
items[i] = decodeCborItem(view, bytes, state);
}
return items;
}
if (major === 5) {
const length = decodeCborUint(view, bytes, state, additional);
const value = {};
for (let i = 0; i < length; i += 1) {
const key = decodeCborItem(view, bytes, state);
value[String(key)] = decodeCborItem(view, bytes, state);
}
return value;
}
if (major === 6) {
decodeCborUint(view, bytes, state, additional);
return decodeCborItem(view, bytes, state);
}
if (major === 7) {
if (additional === 20) return false;
if (additional === 21) return true;
if (additional === 22) return null;
if (additional === 23) return undefined;
if (additional === 25) {
if (state.offset + 2 > bytes.length) throw new Error("CBOR payload truncated");
const bits = view.getUint16(state.offset);
state.offset += 2;
return decodeCborFloat16(bits);
}
if (additional === 26) {
if (state.offset + 4 > bytes.length) throw new Error("CBOR payload truncated");
const value = view.getFloat32(state.offset);
state.offset += 4;
return value;
}
if (additional === 27) {
if (state.offset + 8 > bytes.length) throw new Error("CBOR payload truncated");
const value = view.getFloat64(state.offset);
state.offset += 8;
return value;
}
}
throw new Error("Unsupported CBOR major type");
}
function decodeCborPayload(buffer) {
const bytes = buffer instanceof Uint8Array ? buffer : new Uint8Array(buffer);
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
const state = { offset: 0 };
const value = decodeCborItem(view, bytes, state);
if (state.offset !== bytes.length) {
throw new Error("Unexpected trailing bytes in CBOR payload");
}
return value;
}
let lastSpectrumData = null;
window.lastSpectrumData = null;
let lastControl;
@@ -4021,23 +4170,29 @@ function ensureDecodeLocatorMarker(entry) {
}
function pruneAprsEntry(call, entry, cutoffMs) {
const canRenderMap = !!aprsMap && !decodeHistoryReplayActive;
const pktTsMs = Number(entry?.pkt?._tsMs);
const visible = Number.isFinite(pktTsMs) && pktTsMs >= cutoffMs;
entry.visibleInHistoryWindow = visible;
entry.trackPoints = trimTrackHistory(entry.trackHistory, cutoffMs, APRS_TRACK_MAX_POINTS)
.map((point) => [point.lat, point.lon]);
if (canRenderMap) {
refreshAprsTrack(call, entry);
} else {
markDecodeMapSyncPending();
}
if (!visible) {
if (selectedAprsTrackCall && String(selectedAprsTrackCall) === String(call)) {
if (canRenderMap && selectedAprsTrackCall && String(selectedAprsTrackCall) === String(call)) {
selectedAprsTrackCall = null;
}
if (entry?.track) {
if (canRenderMap && entry?.track) {
entry.track.remove();
entry.track = null;
}
setRetainedMapMarkerVisible(entry?.marker, false);
if (canRenderMap) setRetainedMapMarkerVisible(entry?.marker, false);
return false;
}
if (!canRenderMap) return true;
ensureAprsMarker(call, entry);
setRetainedMapMarkerVisible(entry?.marker, true);
if (entry?.marker) {
@@ -4048,23 +4203,29 @@ function pruneAprsEntry(call, entry, cutoffMs) {
}
function pruneAisEntry(key, entry, cutoffMs) {
const canRenderMap = !!aprsMap && !decodeHistoryReplayActive;
const msgTsMs = Number(entry?.msg?._tsMs);
const visible = Number.isFinite(msgTsMs) && msgTsMs >= cutoffMs;
entry.visibleInHistoryWindow = visible;
entry.trackPoints = trimTrackHistory(entry.trackHistory, cutoffMs, AIS_TRACK_MAX_POINTS)
.map((point) => [point.lat, point.lon]);
if (canRenderMap) {
refreshAisTrack(key, entry);
} else {
markDecodeMapSyncPending();
}
if (!visible) {
if (selectedAisTrackMmsi && String(selectedAisTrackMmsi) === String(key)) {
if (canRenderMap && selectedAisTrackMmsi && String(selectedAisTrackMmsi) === String(key)) {
selectedAisTrackMmsi = null;
}
if (entry?.track) {
if (canRenderMap && entry?.track) {
entry.track.remove();
entry.track = null;
}
setRetainedMapMarkerVisible(entry?.marker, false);
if (canRenderMap) setRetainedMapMarkerVisible(entry?.marker, false);
return false;
}
if (!canRenderMap) return true;
ensureAisMarker(key, entry);
setRetainedMapMarkerVisible(entry?.marker, true);
if (entry?.marker) {
@@ -4074,6 +4235,7 @@ function pruneAisEntry(key, entry, cutoffMs) {
}
function pruneLocatorEntry(key, entry, cutoffMs) {
const canRenderMap = !!aprsMap && !decodeHistoryReplayActive;
if (!entry || (entry.sourceType !== "ft8" && entry.sourceType !== "wspr")) return true;
if (!(entry.allStationDetails instanceof Map)) {
entry.allStationDetails = entry.stationDetails instanceof Map
@@ -4092,7 +4254,8 @@ function pruneLocatorEntry(key, entry, cutoffMs) {
entry.stationDetails = new Map();
entry.stations = new Set();
entry.bandMeta = new Map();
setRetainedMapMarkerVisible(entry.marker, false);
if (canRenderMap) setRetainedMapMarkerVisible(entry.marker, false);
else markDecodeMapSyncPending();
return false;
}
const nextStations = new Set();
@@ -4106,6 +4269,10 @@ function pruneLocatorEntry(key, entry, cutoffMs) {
Array.from(nextDetails.values()).map((detail) => Number(detail?.freq_hz))
);
const count = Math.max(nextDetails.size, nextStations.size || 0, 1);
if (!canRenderMap) {
markDecodeMapSyncPending();
return true;
}
ensureDecodeLocatorMarker(entry);
setRetainedMapMarkerVisible(entry.marker, true);
if (entry.marker) {
@@ -4142,6 +4309,10 @@ function pruneMapHistory() {
for (const [key, entry] of locatorMarkers.entries()) {
pruneLocatorEntry(key, entry, cutoffMs);
}
if (!aprsMap || decodeHistoryReplayActive) {
markDecodeMapSyncPending();
return;
}
rebuildDecodeContactPaths();
rebuildMapLocatorFilters();
applyMapFilter();
@@ -5102,6 +5273,25 @@ document.addEventListener("keydown", (e) => {
}
});
function materializeBufferedMapLayers() {
if (!aprsMap) return;
for (const [key, entry] of locatorMarkers) {
if (!key.startsWith("bookmark:") || entry?.marker || !entry?.grid) continue;
const bounds = maidenheadToBounds(entry.grid);
if (!bounds) continue;
entry.sourceType = "bookmark";
entry.bandMeta = collectBandMeta((entry.bookmarks || []).map((bm) => Number(bm?.freq_hz)));
entry.marker = L.rectangle(bounds, locatorStyleForEntry(entry, entry.bookmarks?.length || 1))
.addTo(aprsMap)
.bindPopup(buildBookmarkLocatorPopupHtml(entry.grid, entry.bookmarks || []));
entry.marker.__trxType = "bookmark";
sendLocatorOverlayToBack(entry.marker);
assignLocatorMarkerMeta(entry.marker, entry.sourceType, entry.bandMeta);
mapMarkers.add(entry.marker);
}
pruneMapHistory();
}
function initAprsMap() {
const mapEl = document.getElementById("aprs-map");
if (!mapEl) return;
@@ -5203,30 +5393,7 @@ function initAprsMap() {
}
});
// Materialise any stations that were buffered before the map was ready
for (const [call, entry] of stationMarkers) {
if (entry.type === "aprs" && entry.visibleInHistoryWindow) {
ensureAprsMarker(call, entry);
}
}
for (const [key, entry] of locatorMarkers) {
if (!key.startsWith("bookmark:") || entry?.marker || !entry?.grid) continue;
const bounds = maidenheadToBounds(entry.grid);
if (!bounds) continue;
entry.sourceType = "bookmark";
entry.bandMeta = collectBandMeta((entry.bookmarks || []).map((bm) => Number(bm?.freq_hz)));
entry.marker = L.rectangle(bounds, locatorStyleForEntry(entry, entry.bookmarks?.length || 1))
.addTo(aprsMap)
.bindPopup(buildBookmarkLocatorPopupHtml(entry.grid, entry.bookmarks || []));
entry.marker.__trxType = "bookmark";
sendLocatorOverlayToBack(entry.marker);
assignLocatorMarkerMeta(entry.marker, entry.sourceType, entry.bandMeta);
mapMarkers.add(entry.marker);
}
pruneMapHistory();
rebuildDecodeContactPaths();
rebuildMapLocatorFilters();
applyMapFilter();
materializeBufferedMapLayers();
const locatorPhaseEl = document.getElementById("map-locator-phase");
const locatorChoiceEl = document.getElementById("map-locator-choice-filter");
@@ -5749,7 +5916,7 @@ window.aprsMapAddStation = function(call, lat, lon, info, symbolTable, symbolCod
prevPoint.tsMs = tsMs;
}
pruneAprsEntry(call, existing, mapHistoryCutoffMs());
if (aprsMap && existing.marker) {
if (aprsMap && existing.marker && !decodeHistoryReplayActive) {
existing.marker.setLatLng([lat, lon]);
existing.marker.setPopupContent(buildAprsPopupHtml(call, lat, lon, info, pkt));
}
@@ -5856,7 +6023,6 @@ function refreshAisMarkerColors() {
window.aisMapAddVessel = function(msg) {
if (msg == null || msg.lat == null || msg.lon == null || !Number.isFinite(msg.mmsi)) return;
if (!aprsMap) initAprsMap();
const key = String(msg.mmsi);
const popupHtml = buildAisPopupHtml(msg);
const nextPoint = [msg.lat, msg.lon];
@@ -5872,7 +6038,7 @@ window.aisMapAddVessel = function(msg) {
prevPoint.tsMs = tsMs;
}
pruneAisEntry(key, existing, mapHistoryCutoffMs());
if (existing.marker) {
if (aprsMap && existing.marker && !decodeHistoryReplayActive) {
updateAisMarker(existing.marker, msg, popupHtml);
}
return;
@@ -5893,7 +6059,6 @@ window.vdesMapAddPoint = function(msg) {
if (msg == null || msg.lat == null || msg.lon == null) return;
const key = vdesMarkerKey(msg);
if (!key) return;
if (!aprsMap) initAprsMap();
const popupHtml = buildVdesPopupHtml(msg);
const visible = Number.isFinite(Number(msg?._tsMs))
&& Number(msg._tsMs) >= mapHistoryCutoffMs();
@@ -5902,12 +6067,20 @@ window.vdesMapAddPoint = function(msg) {
existing.msg = msg;
existing.visibleInHistoryWindow = visible;
if (!visible) {
if (!decodeHistoryMapRenderingDeferred()) {
setRetainedMapMarkerVisible(existing.marker, false);
} else {
markDecodeMapSyncPending();
}
return;
}
if (!decodeHistoryMapRenderingDeferred()) {
ensureVdesMarker(key, existing);
setRetainedMapMarkerVisible(existing.marker, true);
if (existing.marker) {
} else {
markDecodeMapSyncPending();
}
if (aprsMap && existing.marker && !decodeHistoryReplayActive) {
existing.marker.setLatLng([msg.lat, msg.lon]);
existing.marker.setPopupContent(popupHtml);
}
@@ -5920,9 +6093,13 @@ window.vdesMapAddPoint = function(msg) {
};
vdesMarkers.set(key, entry);
if (!visible) return;
if (!decodeHistoryMapRenderingDeferred()) {
ensureVdesMarker(key, entry);
setRetainedMapMarkerVisible(entry.marker, true);
if (entry.marker) {
} else {
markDecodeMapSyncPending();
}
if (aprsMap && entry.marker && !decodeHistoryReplayActive) {
entry.marker.setPopupContent(popupHtml);
}
scheduleDecodeMapMaintenance();
@@ -5998,6 +6175,10 @@ function updateMapP2pPathsToggle() {
}
function scheduleDecodeMapMaintenance() {
if (decodeHistoryMapRenderingDeferred()) {
markDecodeMapSyncPending();
return;
}
scheduleUiFrameJob("decode-map-maintenance", () => {
rebuildDecodeContactPaths();
rebuildMapLocatorFilters();
@@ -6325,8 +6506,6 @@ window.syncBookmarkMapLocators = function(bookmarks) {
};
window.ft8MapAddLocator = function(message, grids, type = "ft8", station = null, details = null) {
if (!aprsMap) initAprsMap();
if (!aprsMap) return;
if (!Array.isArray(grids) || grids.length === 0) return;
const markerType = type === "wspr" ? "wspr" : "ft8";
const unique = [...new Set(grids.map((g) => String(g).toUpperCase()))];
@@ -7181,6 +7360,8 @@ function drainDecodeHistory(buffer, index, onDone, onProgress) {
function connectDecode() {
if (decodeSource) { decodeSource.close(); }
decodeHistoryReplayActive = false;
decodeMapSyncPending = false;
if (window.resetAisHistoryView) window.resetAisHistoryView();
if (window.resetVdesHistoryView) window.resetVdesHistoryView();
if (window.resetAprsHistoryView) window.resetAprsHistoryView();
@@ -7194,6 +7375,7 @@ function connectDecode() {
const liveBuffer = [];
function flushLiveBuffer() {
historySettled = true;
setDecodeHistoryReplayActive(false);
setDecodeHistoryOverlayVisible(false);
for (const msg of liveBuffer) {
try { dispatchDecodeMessage(msg); } catch (_) {}
@@ -7231,12 +7413,17 @@ function connectDecode() {
};
// Fetch history in parallel; drain it first, then flush buffered live msgs.
fetch("/decode/history").then((resp) => {
fetch("/decode/history").then(async (resp) => {
if (!resp.ok) return null;
return resp.json();
setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Receiving compressed history payload");
const payload = await resp.arrayBuffer();
if (!payload || payload.byteLength === 0) return [];
setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Decoding compressed history payload");
return decodeCborPayload(payload);
}).then((msgs) => {
clearTimeout(historyTimeout);
if (Array.isArray(msgs) && msgs.length > 0) {
setDecodeHistoryReplayActive(true);
setDecodeHistoryOverlayVisible(true, "Loading decode history…", `Replaying 0 / ${msgs.length} decoded messages`);
drainDecodeHistory(
msgs,
@@ -7253,7 +7440,11 @@ function connectDecode() {
} else {
flushLiveBuffer();
}
}).catch(() => { clearTimeout(historyTimeout); flushLiveBuffer(); });
}).catch((err) => {
console.error("Decode history fetch failed", err);
clearTimeout(historyTimeout);
flushLiveBuffer();
});
}
// connectDecode() is called from initializeApp() after auth succeeds,
// and from login/guest handlers — no standalone window.load call needed.
@@ -1330,14 +1330,21 @@ small { color: var(--text-muted); }
.map-qso-card {
display: flex;
appearance: none;
-webkit-appearance: none;
width: 100%;
flex-direction: column;
align-items: stretch;
gap: 0.5rem;
min-width: 0;
margin: 0;
padding: 0.8rem 0.85rem;
box-sizing: border-box;
border-radius: 0.75rem;
border: 1px solid color-mix(in srgb, var(--border-light) 74%, transparent);
background: color-mix(in srgb, var(--card-bg) 78%, transparent);
font: inherit;
color: inherit;
line-height: inherit;
text-align: left;
cursor: pointer;
transition: border-color 120ms ease, background 120ms ease, transform 120ms ease, box-shadow 120ms ease;
@@ -2,12 +2,15 @@
//
// SPDX-License-Identifier: BSD-2-Clause
use std::io::Write;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use actix_web::{delete, get, post, put, web, HttpRequest, HttpResponse, Responder};
use actix_web::{http::header, Error};
use bytes::Bytes;
use flate2::write::GzEncoder;
use flate2::Compression;
use futures_util::stream::{select, StreamExt};
use tokio::sync::{broadcast, mpsc, oneshot, watch};
use tokio::time::{self, Duration};
@@ -441,46 +444,114 @@ fn sync_scheduler_vchannels(
/// Build the combined decode history vector from all per-decoder ring-buffers.
fn collect_decode_history(context: &FrontendRuntimeContext) -> Vec<trx_core::decode::DecodedMessage> {
let mut out = Vec::new();
out.extend(
crate::server::audio::snapshot_ais_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::Ais),
);
out.extend(
crate::server::audio::snapshot_vdes_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::Vdes),
);
out.extend(
crate::server::audio::snapshot_aprs_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::Aprs),
);
out.extend(
crate::server::audio::snapshot_hf_aprs_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::HfAprs),
);
out.extend(
crate::server::audio::snapshot_cw_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::Cw),
);
out.extend(
crate::server::audio::snapshot_ft8_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::Ft8),
);
out.extend(
crate::server::audio::snapshot_wspr_history(context)
.into_iter()
.map(trx_core::decode::DecodedMessage::Wspr),
let ais = crate::server::audio::snapshot_ais_history(context);
let vdes = crate::server::audio::snapshot_vdes_history(context);
let aprs = crate::server::audio::snapshot_aprs_history(context);
let hf_aprs = crate::server::audio::snapshot_hf_aprs_history(context);
let cw = crate::server::audio::snapshot_cw_history(context);
let ft8 = crate::server::audio::snapshot_ft8_history(context);
let wspr = crate::server::audio::snapshot_wspr_history(context);
let mut out = Vec::with_capacity(
ais.len()
+ vdes.len()
+ aprs.len()
+ hf_aprs.len()
+ cw.len()
+ ft8.len()
+ wspr.len(),
);
out.extend(ais.into_iter().map(trx_core::decode::DecodedMessage::Ais));
out.extend(vdes.into_iter().map(trx_core::decode::DecodedMessage::Vdes));
out.extend(aprs.into_iter().map(trx_core::decode::DecodedMessage::Aprs));
out.extend(hf_aprs.into_iter().map(trx_core::decode::DecodedMessage::HfAprs));
out.extend(cw.into_iter().map(trx_core::decode::DecodedMessage::Cw));
out.extend(ft8.into_iter().map(trx_core::decode::DecodedMessage::Ft8));
out.extend(wspr.into_iter().map(trx_core::decode::DecodedMessage::Wspr));
out
}
/// `GET /decode/history` — returns the full decode history as a JSON array.
fn encode_cbor_length(out: &mut Vec<u8>, major: u8, value: u64) {
debug_assert!(major <= 7);
match value {
0..=23 => out.push((major << 5) | (value as u8)),
24..=0xff => {
out.push((major << 5) | 24);
out.push(value as u8);
}
0x100..=0xffff => {
out.push((major << 5) | 25);
out.extend_from_slice(&(value as u16).to_be_bytes());
}
0x1_0000..=0xffff_ffff => {
out.push((major << 5) | 26);
out.extend_from_slice(&(value as u32).to_be_bytes());
}
_ => {
out.push((major << 5) | 27);
out.extend_from_slice(&value.to_be_bytes());
}
}
}
fn encode_cbor_json_value(out: &mut Vec<u8>, value: &serde_json::Value) {
match value {
serde_json::Value::Null => out.push(0xf6),
serde_json::Value::Bool(false) => out.push(0xf4),
serde_json::Value::Bool(true) => out.push(0xf5),
serde_json::Value::Number(number) => {
if let Some(value) = number.as_u64() {
encode_cbor_length(out, 0, value);
} else if let Some(value) = number.as_i64() {
if value >= 0 {
encode_cbor_length(out, 0, value as u64);
} else {
encode_cbor_length(out, 1, value.unsigned_abs() - 1);
}
} else if let Some(value) = number.as_f64() {
out.push(0xfb);
out.extend_from_slice(&value.to_be_bytes());
} else {
out.push(0xf6);
}
}
serde_json::Value::String(text) => {
encode_cbor_length(out, 3, text.len() as u64);
out.extend_from_slice(text.as_bytes());
}
serde_json::Value::Array(items) => {
encode_cbor_length(out, 4, items.len() as u64);
for item in items {
encode_cbor_json_value(out, item);
}
}
serde_json::Value::Object(map) => {
encode_cbor_length(out, 5, map.len() as u64);
for (key, item) in map {
encode_cbor_length(out, 3, key.len() as u64);
out.extend_from_slice(key.as_bytes());
encode_cbor_json_value(out, item);
}
}
}
}
fn encode_decode_history_cbor(
history: &[trx_core::decode::DecodedMessage],
) -> Result<Vec<u8>, serde_json::Error> {
let value = serde_json::to_value(history)?;
let mut out = Vec::with_capacity(history.len().saturating_mul(96));
encode_cbor_json_value(&mut out, &value);
Ok(out)
}
fn gzip_bytes(payload: &[u8]) -> std::io::Result<Vec<u8>> {
let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
encoder.write_all(payload)?;
encoder.finish()
}
/// `GET /decode/history` — returns the full decode history as gzipped CBOR.
///
/// Separated from the live `/decode` SSE stream so that history replay does
/// not block real-time messages: the client fetches this endpoint in parallel
@@ -493,7 +564,24 @@ pub async fn decode_history(
return HttpResponse::NotFound().body("decode not enabled");
}
let history = collect_decode_history(context.get_ref());
HttpResponse::Ok().json(history)
let cbor = match encode_decode_history_cbor(&history) {
Ok(cbor) => cbor,
Err(err) => {
tracing::error!("failed to encode decode history as CBOR: {err}");
return HttpResponse::InternalServerError().finish();
}
};
let payload = match gzip_bytes(&cbor) {
Ok(payload) => payload,
Err(err) => {
tracing::error!("failed to gzip decode history payload: {err}");
return HttpResponse::InternalServerError().finish();
}
};
HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "application/cbor"))
.insert_header((header::CONTENT_ENCODING, "gzip"))
.body(payload)
}
#[get("/decode")]