From daedd9182972c1a6a0b4ade1aed31d4e5e6d98c3 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sat, 14 Mar 2026 14:09:04 +0100 Subject: [PATCH] [fix](trx-client): restore decode history replay Write compressed audio-history replay directly into the local frontend history buffers so large APRS and AIS replays survive trx-client restart instead of overrunning the live decode broadcast channel. Verification: cargo test -p trx-client --no-run Verification: cargo test -p trx-frontend-http --no-run Co-authored-by: OpenAI Codex Signed-off-by: Stan Grams --- src/trx-client/src/audio_client.rs | 7 ++- src/trx-client/src/main.rs | 69 ++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/src/trx-client/src/audio_client.rs b/src/trx-client/src/audio_client.rs index e47a524..dc82b25 100644 --- a/src/trx-client/src/audio_client.rs +++ b/src/trx-client/src/audio_client.rs @@ -54,6 +54,7 @@ pub async fn run_audio_client( mut tx_rx: mpsc::Receiver, stream_info_tx: watch::Sender>, decode_tx: broadcast::Sender, + replay_history_sink: Option>, mut shutdown_rx: watch::Receiver, vchan_audio: Arc>>>, mut vchan_cmd_rx: mpsc::UnboundedReceiver, @@ -97,6 +98,7 @@ pub async fn run_audio_client( &mut tx_rx, &stream_info_tx, &decode_tx, + replay_history_sink.clone(), &mut shutdown_rx, &vchan_audio, &mut vchan_cmd_rx, @@ -144,6 +146,7 @@ async fn handle_audio_connection( tx_rx: &mut mpsc::Receiver, stream_info_tx: &watch::Sender>, decode_tx: &broadcast::Sender, + replay_history_sink: Option>, shutdown_rx: &mut watch::Receiver, vchan_audio: &Arc>>>, vchan_cmd_rx: &mut mpsc::UnboundedReceiver, @@ -269,7 +272,9 @@ async fn handle_audio_connection( } let json = &decompressed[pos..pos + len]; if let Ok(msg) = serde_json::from_slice::(json) { - let _ = decode_tx.send(msg); + if let Some(ref sink) = replay_history_sink { + sink(msg); + } } pos += len; } diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index 503ff2f..f1e73cd 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -306,6 +306,66 @@ async fn async_init() -> DynResult { let (vchan_destroyed_tx, _) = broadcast::channel::(64); frontend_runtime.vchan_destroyed = Some(vchan_destroyed_tx.clone()); + let ais_history = frontend_runtime.ais_history.clone(); + let vdes_history = frontend_runtime.vdes_history.clone(); + let aprs_history = frontend_runtime.aprs_history.clone(); + let hf_aprs_history = frontend_runtime.hf_aprs_history.clone(); + let cw_history = frontend_runtime.cw_history.clone(); + let ft8_history = frontend_runtime.ft8_history.clone(); + let wspr_history = frontend_runtime.wspr_history.clone(); + let replay_history_sink: Arc = + Arc::new(move |msg| { + let now = std::time::Instant::now(); + match msg { + DecodedMessage::Ais(mut message) => { + if message.ts_ms.is_none() { + message.ts_ms = Some(current_timestamp_ms()); + } + if let Ok(mut history) = ais_history.lock() { + history.push_back((now, message)); + } + } + DecodedMessage::Vdes(mut message) => { + if message.ts_ms.is_none() { + message.ts_ms = Some(current_timestamp_ms()); + } + if let Ok(mut history) = vdes_history.lock() { + history.push_back((now, message)); + } + } + DecodedMessage::Aprs(mut packet) => { + if packet.ts_ms.is_none() { + packet.ts_ms = Some(current_timestamp_ms()); + } + if let Ok(mut history) = aprs_history.lock() { + history.push_back((now, packet)); + } + } + DecodedMessage::HfAprs(mut packet) => { + if packet.ts_ms.is_none() { + packet.ts_ms = Some(current_timestamp_ms()); + } + if let Ok(mut history) = hf_aprs_history.lock() { + history.push_back((now, packet)); + } + } + DecodedMessage::Cw(event) => { + if let Ok(mut history) = cw_history.lock() { + history.push_back((now, event)); + } + } + DecodedMessage::Ft8(message) => { + if let Ok(mut history) = ft8_history.lock() { + history.push_back((now, message)); + } + } + DecodedMessage::Wspr(message) => { + if let Ok(mut history) = wspr_history.lock() { + history.push_back((now, message)); + } + } + } + }); info!( "Audio enabled: default port {}, decode channel set", @@ -325,6 +385,7 @@ async fn async_init() -> DynResult { tx_audio_rx, stream_info_tx, decode_tx, + Some(replay_history_sink), audio_shutdown_rx, vchan_audio_map, vchan_cmd_rx, @@ -442,3 +503,11 @@ async fn async_init() -> DynResult { request_tx: tx, }) } + +fn current_timestamp_ms() -> i64 { + let millis = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + i64::try_from(millis).unwrap_or(i64::MAX) +}