From c24d5d01526054b6cebe78624b6c0611bf0dc9ca Mon Sep 17 00:00:00 2001 From: Stanislaw Grams Date: Mon, 9 Feb 2026 23:22:35 +0100 Subject: [PATCH] [fix](trx-frontend-http): persist decode history and fix ft8 locators Co-authored-by: OpenAI Signed-off-by: Stanislaw Grams --- .../assets/web/plugins/ft8.js | 6 +- .../trx-frontend/trx-frontend-http/src/api.rs | 8 ++ .../trx-frontend-http/src/audio.rs | 132 +++++++++++++++++- 3 files changed, 142 insertions(+), 4 deletions(-) diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/ft8.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/ft8.js index 34b7082..61a6f1c 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/ft8.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/ft8.js @@ -38,8 +38,8 @@ function renderFt8Message(message) { while (j < message.length && isAlphaNum(message[j])) j++; const token = message.slice(i, j); const grid = token.toUpperCase(); - if (/^[A-R]{2}\\d{2}(?:[A-X]{2})?$/.test(grid)) { - out += `[${grid}]`; + if (/^[A-R]{2}\d{2}(?:[A-X]{2})?$/.test(grid)) { + out += `${grid}`; } else { out += escapeHtml(token); } @@ -60,7 +60,7 @@ function extractFirstGrid(message) { while (j < message.length && isAlphaNum(message[j])) j++; const token = message.slice(i, j); const grid = token.toUpperCase(); - if (/^[A-R]{2}\\d{2}(?:[A-X]{2})?$/.test(grid)) return grid; + if (/^[A-R]{2}\d{2}(?:[A-X]{2})?$/.test(grid)) return grid; i = j; } else { i += 1; diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index 039512d..5bc4239 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -110,6 +110,11 @@ pub async fn decode_events() -> Result { .into_iter() .map(trx_core::decode::DecodedMessage::Aprs), ); + out.extend( + crate::server::audio::snapshot_cw_history() + .into_iter() + .map(trx_core::decode::DecodedMessage::Cw), + ); out.extend( crate::server::audio::snapshot_ft8_history() .into_iter() @@ -355,6 +360,7 @@ pub async fn toggle_ft8_decode( pub async fn clear_ft8_decode( rig_tx: web::Data>, ) -> Result { + crate::server::audio::clear_ft8_history(); send_command(&rig_tx, RigCommand::ResetFt8Decoder).await } @@ -362,6 +368,7 @@ pub async fn clear_ft8_decode( pub async fn clear_aprs_decode( rig_tx: web::Data>, ) -> Result { + crate::server::audio::clear_aprs_history(); send_command(&rig_tx, RigCommand::ResetAprsDecoder).await } @@ -369,6 +376,7 @@ pub async fn clear_aprs_decode( pub async fn clear_cw_decode( rig_tx: web::Data>, ) -> Result { + crate::server::audio::clear_cw_history(); send_command(&rig_tx, RigCommand::ResetCwDecoder).await } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs index a65b3c7..e2ba45a 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs @@ -9,7 +9,9 @@ //! - Subsequent binary messages: raw Opus packets (RX) //! - Browser sends binary messages: raw Opus packets (TX) +use std::collections::VecDeque; use std::sync::{Mutex, OnceLock}; +use std::time::{Duration, Instant}; use actix_web::{get, web, Error, HttpRequest, HttpResponse}; use actix_ws::Message; @@ -18,7 +20,7 @@ use tokio::sync::{broadcast, mpsc, watch}; use tracing::warn; use trx_core::audio::AudioStreamInfo; -use trx_core::decode::DecodedMessage; +use trx_core::decode::{AprsPacket, CwEvent, DecodedMessage, Ft8Message}; struct AudioChannels { rx: broadcast::Sender, @@ -49,12 +51,108 @@ fn decode_channel() -> &'static Mutex>> CHANNEL.get_or_init(|| Mutex::new(None)) } +const HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); + +fn aprs_history() -> &'static Mutex> { + static HISTORY: OnceLock>> = OnceLock::new(); + HISTORY.get_or_init(|| Mutex::new(VecDeque::new())) +} + +fn cw_history() -> &'static Mutex> { + static HISTORY: OnceLock>> = OnceLock::new(); + HISTORY.get_or_init(|| Mutex::new(VecDeque::new())) +} + +fn ft8_history() -> &'static Mutex> { + static HISTORY: OnceLock>> = OnceLock::new(); + HISTORY.get_or_init(|| Mutex::new(VecDeque::new())) +} + +fn prune_aprs_history(history: &mut VecDeque<(Instant, AprsPacket)>) { + while let Some((ts, _)) = history.front() { + if ts.elapsed() <= HISTORY_RETENTION { + break; + } + history.pop_front(); + } +} + +fn prune_cw_history(history: &mut VecDeque<(Instant, CwEvent)>) { + while let Some((ts, _)) = history.front() { + if ts.elapsed() <= HISTORY_RETENTION { + break; + } + history.pop_front(); + } +} + +fn prune_ft8_history(history: &mut VecDeque<(Instant, Ft8Message)>) { + while let Some((ts, _)) = history.front() { + if ts.elapsed() <= HISTORY_RETENTION { + break; + } + history.pop_front(); + } +} + +fn record_aprs(pkt: AprsPacket) { + let mut history = aprs_history().lock().expect("aprs history mutex poisoned"); + history.push_back((Instant::now(), pkt)); + prune_aprs_history(&mut history); +} + +fn record_cw(event: CwEvent) { + let mut history = cw_history().lock().expect("cw history mutex poisoned"); + history.push_back((Instant::now(), event)); + prune_cw_history(&mut history); +} + +fn record_ft8(msg: Ft8Message) { + let mut history = ft8_history().lock().expect("ft8 history mutex poisoned"); + history.push_back((Instant::now(), msg)); + prune_ft8_history(&mut history); +} + +pub fn snapshot_aprs_history() -> Vec { + let mut history = aprs_history().lock().expect("aprs history mutex poisoned"); + prune_aprs_history(&mut history); + history.iter().map(|(_, pkt)| pkt.clone()).collect() +} + +pub fn snapshot_cw_history() -> Vec { + let mut history = cw_history().lock().expect("cw history mutex poisoned"); + prune_cw_history(&mut history); + history.iter().map(|(_, evt)| evt.clone()).collect() +} + +pub fn snapshot_ft8_history() -> Vec { + let mut history = ft8_history().lock().expect("ft8 history mutex poisoned"); + prune_ft8_history(&mut history); + history.iter().map(|(_, msg)| msg.clone()).collect() +} + +pub fn clear_aprs_history() { + let mut history = aprs_history().lock().expect("aprs history mutex poisoned"); + history.clear(); +} + +pub fn clear_cw_history() { + let mut history = cw_history().lock().expect("cw history mutex poisoned"); + history.clear(); +} + +pub fn clear_ft8_history() { + let mut history = ft8_history().lock().expect("ft8 history mutex poisoned"); + history.clear(); +} + /// Set the decode broadcast channel from the client main. pub fn set_decode_channel(tx: broadcast::Sender) { let mut ch = decode_channel() .lock() .expect("decode channel mutex poisoned"); *ch = Some(tx); + start_decode_history_collector(); } /// Subscribe to the decode broadcast channel, if available. @@ -65,6 +163,38 @@ pub fn subscribe_decode() -> Option> { ch.as_ref().map(|tx| tx.subscribe()) } +fn start_decode_history_collector() { + static STARTED: OnceLock> = OnceLock::new(); + let started = STARTED.get_or_init(|| Mutex::new(false)); + let mut started_guard = started.lock().expect("decode history start mutex poisoned"); + if *started_guard { + return; + } + *started_guard = true; + + let ch = decode_channel() + .lock() + .expect("decode channel mutex poisoned"); + let Some(tx) = ch.as_ref().cloned() else { + return; + }; + + tokio::spawn(async move { + let mut rx = tx.subscribe(); + loop { + match rx.recv().await { + Ok(msg) => match msg { + DecodedMessage::Aprs(pkt) => record_aprs(pkt), + DecodedMessage::Cw(evt) => record_cw(evt), + DecodedMessage::Ft8(msg) => record_ft8(msg), + }, + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, + } + } + }); +} + #[get("/audio")] pub async fn audio_ws(req: HttpRequest, body: web::Payload) -> Result { let channels = audio_channels().lock().expect("audio channels mutex poisoned");