From 2f1b0609fb0eb3995f03a9682016e0bf7d39417f Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sat, 7 Mar 2026 09:50:42 +0100 Subject: [PATCH] [feat](trx-server): persist 24h decode history across restarts Move persistent history from trx-client to trx-server where decode events originate. History for AIS, VDES, APRS, CW, FT8, and WSPR is loaded from ~/.local/cache/trx-rs/history.db at startup and flushed to disk every 60 seconds. CW events are now also stored in DecoderHistories and replayed to connecting clients, consistent with all other decoder types. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- Cargo.lock | 2 +- src/trx-client/Cargo.toml | 1 - src/trx-client/src/main.rs | 15 --- src/trx-server/Cargo.toml | 1 + src/trx-server/src/audio.rs | 61 +++++++++- src/trx-server/src/history_store.rs | 178 ++++++++++++++++++++++++++++ src/trx-server/src/main.rs | 21 +++- 7 files changed, 254 insertions(+), 25 deletions(-) create mode 100644 src/trx-server/src/history_store.rs diff --git a/Cargo.lock b/Cargo.lock index 5caefef..6ccdfa6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2461,7 +2461,6 @@ dependencies = [ "cpal", "dirs", "opus", - "pickledb", "serde", "serde_json", "tokio", @@ -2594,6 +2593,7 @@ dependencies = [ "dirs", "num-complex", "opus", + "pickledb", "serde", "serde_json", "tokio", diff --git a/src/trx-client/Cargo.toml b/src/trx-client/Cargo.toml index 783a400..052d9b2 100644 --- a/src/trx-client/Cargo.toml +++ b/src/trx-client/Cargo.toml @@ -15,7 +15,6 @@ toml = { workspace = true } tracing = { workspace = true } clap = { workspace = true, features = ["derive"] } dirs = "6" -pickledb = "0.5" bytes = "1" cpal = "0.15" opus = "0.3" diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index 211cca9..4bc0082 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -5,7 +5,6 @@ mod audio_bridge; mod audio_client; mod config; -mod history_store; mod remote_client; use std::collections::HashMap; @@ -125,17 +124,6 @@ async fn async_init() -> DynResult { let mut frontend_reg_ctx = FrontendRegistrationContext::new(); let mut frontend_runtime = FrontendRuntimeContext::new(); - // Load persisted decode history before frontends start. - let db = { - let db = history_store::open_db(); - history_store::load_all(&db, &mut frontend_runtime); - tracing::info!( - "Loaded decode history from {}", - history_store::db_path().display() - ); - std::sync::Arc::new(std::sync::Mutex::new(db)) - }; - register_http_frontend(&mut frontend_reg_ctx); register_http_json_frontend(&mut frontend_reg_ctx); register_rigctl_frontend(&mut frontend_reg_ctx); @@ -349,9 +337,6 @@ async fn async_init() -> DynResult { ); } - // Flush in-memory history to disk every 60 seconds. - history_store::spawn_flush_task(db.clone(), frontend_runtime_ctx.clone()); - // Spawn frontends with runtime context for frontend in &frontends { let frontend_state_rx = state_rx.clone(); diff --git a/src/trx-server/Cargo.toml b/src/trx-server/Cargo.toml index dd09d9f..863af41 100644 --- a/src/trx-server/Cargo.toml +++ b/src/trx-server/Cargo.toml @@ -21,6 +21,7 @@ toml = { workspace = true } tracing = { workspace = true } clap = { workspace = true, features = ["derive"] } dirs = "6" +pickledb = "0.5" chrono = { version = "0.4", default-features = false, features = ["clock"] } bytes = "1" cpal = "0.15" diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index b9b2adf..eb3b300 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -26,7 +26,7 @@ use trx_core::audio::{ AUDIO_MSG_WSPR_DECODE, }; use trx_core::decode::{ - AisMessage, AprsPacket, DecodedMessage, Ft8Message, VdesMessage, WsprMessage, + AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage, }; use trx_core::rig::state::{RigMode, RigState}; use trx_cw::CwDecoder; @@ -40,6 +40,7 @@ use trx_decode_log::DecoderLoggers; const APRS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const AIS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const VDES_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); +const CW_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const FT8_SAMPLE_RATE: u32 = 12_000; @@ -142,11 +143,12 @@ fn classify_stream_error(err: &str) -> &'static str { /// instance can maintain its own independent history. Pass an /// `Arc` into every decoder task and into the audio listener. pub struct DecoderHistories { - ais: Mutex>, - vdes: Mutex>, - aprs: Mutex>, - ft8: Mutex>, - wspr: Mutex>, + pub ais: Mutex>, + pub vdes: Mutex>, + pub aprs: Mutex>, + pub cw: Mutex>, + pub ft8: Mutex>, + pub wspr: Mutex>, } impl DecoderHistories { @@ -155,6 +157,7 @@ impl DecoderHistories { ais: Mutex::new(VecDeque::new()), vdes: Mutex::new(VecDeque::new()), aprs: Mutex::new(VecDeque::new()), + cw: Mutex::new(VecDeque::new()), ft8: Mutex::new(VecDeque::new()), wspr: Mutex::new(VecDeque::new()), }) @@ -254,6 +257,35 @@ impl DecoderHistories { .clear(); } + // --- CW --- + + fn prune_cw(history: &mut VecDeque<(Instant, CwEvent)>) { + let cutoff = Instant::now() - CW_HISTORY_RETENTION; + while let Some((ts, _)) = history.front() { + if *ts < cutoff { + history.pop_front(); + } else { + break; + } + } + } + + pub fn record_cw_event(&self, evt: CwEvent) { + let mut h = self.cw.lock().expect("cw history mutex poisoned"); + h.push_back((Instant::now(), evt)); + Self::prune_cw(&mut h); + } + + pub fn snapshot_cw_history(&self) -> Vec { + let mut h = self.cw.lock().expect("cw history mutex poisoned"); + Self::prune_cw(&mut h); + h.iter().map(|(_, evt)| evt.clone()).collect() + } + + pub fn clear_cw_history(&self) { + self.cw.lock().expect("cw history mutex poisoned").clear(); + } + // --- FT8 --- fn prune_ft8(history: &mut VecDeque<(Instant, Ft8Message)>) { @@ -1087,6 +1119,7 @@ pub async fn run_cw_decoder( mut state_rx: watch::Receiver, decode_tx: broadcast::Sender, decode_logs: Option>, + histories: Arc, ) { info!("CW decoder started ({}Hz, {} ch)", sample_rate, channels); let mut decoder = CwDecoder::new(sample_rate); @@ -1183,6 +1216,7 @@ pub async fn run_cw_decoder( if let Some(logger) = decode_logs.as_ref() { logger.log_cw(&evt); } + histories.record_cw_event(evt.clone()); let _ = decode_tx.send(DecodedMessage::Cw(evt)); } } @@ -1713,6 +1747,21 @@ async fn handle_audio_client( } } } + // Send CW history to newly connected client. + let history = histories.snapshot_cw_history(); + for evt in history { + let msg = DecodedMessage::Cw(evt); + let msg_type = AUDIO_MSG_CW_DECODE; + if let Ok(json) = serde_json::to_vec(&msg) { + write_audio_msg_buffered(&mut writer, msg_type, &json).await?; + replayed_history_count += 1; + pending_history_flush += 1; + if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL { + writer.flush().await?; + pending_history_flush = 0; + } + } + } if pending_history_flush > 0 { writer.flush().await?; } diff --git a/src/trx-server/src/history_store.rs b/src/trx-server/src/history_store.rs new file mode 100644 index 0000000..306b7f0 --- /dev/null +++ b/src/trx-server/src/history_store.rs @@ -0,0 +1,178 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Persistent decode history storage for trx-server using pickledb. +//! +//! History for all decoder types (AIS, VDES, APRS, CW, FT8, WSPR) is +//! serialised as JSON arrays to `~/.local/cache/trx-rs/history.db` and +//! loaded back on startup, preserving up to 24 hours of decodes across +//! trx-server restarts. Each rig's keys are prefixed with the rig id +//! (e.g. `"default.ais"`) so multi-rig setups don't collide. + +use std::collections::VecDeque; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +use pickledb::{PickleDb, PickleDbDumpPolicy, SerializationMethod}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +use trx_core::decode::{AisMessage, AprsPacket, CwEvent, Ft8Message, VdesMessage, WsprMessage}; + +use crate::audio::DecoderHistories; + +const HISTORY_RETENTION_MS: i64 = 24 * 60 * 60 * 1_000; + +#[derive(Serialize, Deserialize)] +struct StoredEntry { + ts_ms: i64, + data: T, +} + +fn now_unix_ms() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + .try_into() + .unwrap_or(i64::MAX) +} + +pub fn db_path() -> PathBuf { + let base = dirs::cache_dir().unwrap_or_else(|| PathBuf::from(".")); + base.join("trx-rs").join("history.db") +} + +/// Open (or create) the history database at the canonical cache path. +pub fn open_db() -> PickleDb { + let path = db_path(); + if let Some(parent) = path.parent() { + let _ = std::fs::create_dir_all(parent); + } + PickleDb::load( + &path, + PickleDbDumpPolicy::DumpUponRequest, + SerializationMethod::Json, + ) + .unwrap_or_else(|_| { + PickleDb::new( + path, + PickleDbDumpPolicy::DumpUponRequest, + SerializationMethod::Json, + ) + }) +} + +fn load_key(db: &PickleDb, key: &str) -> Vec<(Instant, T)> { + let now_ms = now_unix_ms(); + let cutoff_ms = now_ms - HISTORY_RETENTION_MS; + + let entries: Vec> = db.get(key).unwrap_or_default(); + + entries + .into_iter() + .filter(|e| e.ts_ms >= cutoff_ms) + .map(|e| { + let age_ms = now_ms.saturating_sub(e.ts_ms).max(0) as u64; + // checked_sub returns None when age exceeds system uptime; treat as + // brand-new so the entry stays visible for another 24 h. + let instant = Instant::now() + .checked_sub(Duration::from_millis(age_ms)) + .unwrap_or_else(Instant::now); + (instant, e.data) + }) + .collect() +} + +fn save_key(db: &mut PickleDb, key: &str, deque: &VecDeque<(Instant, T)>) { + let now_ms = now_unix_ms(); + let entries: Vec> = deque + .iter() + .map(|(inst, data)| StoredEntry { + ts_ms: now_ms - inst.elapsed().as_millis() as i64, + data: data.clone(), + }) + .collect(); + let _ = db.set(key, &entries); +} + +/// Populate `histories` from the database using `rig_id`-prefixed keys. +pub fn load_all(db: &PickleDb, rig_id: &str, histories: &Arc) { + let k = |suffix: &str| format!("{}.{}", rig_id, suffix); + + if let Ok(mut h) = histories.ais.lock() { + for e in load_key::(db, &k("ais")) { + h.push_back(e); + } + } + if let Ok(mut h) = histories.vdes.lock() { + for e in load_key::(db, &k("vdes")) { + h.push_back(e); + } + } + if let Ok(mut h) = histories.aprs.lock() { + for e in load_key::(db, &k("aprs")) { + h.push_back(e); + } + } + if let Ok(mut h) = histories.cw.lock() { + for e in load_key::(db, &k("cw")) { + h.push_back(e); + } + } + if let Ok(mut h) = histories.ft8.lock() { + for e in load_key::(db, &k("ft8")) { + h.push_back(e); + } + } + if let Ok(mut h) = histories.wspr.lock() { + for e in load_key::(db, &k("wspr")) { + h.push_back(e); + } + } +} + +/// Flush `histories` to the database under `rig_id`-prefixed keys and sync. +pub fn flush_all(db: &mut PickleDb, rig_id: &str, histories: &Arc) { + let k = |suffix: &str| format!("{}.{}", rig_id, suffix); + + if let Ok(h) = histories.ais.lock() { + save_key(db, &k("ais"), &h); + } + if let Ok(h) = histories.vdes.lock() { + save_key(db, &k("vdes"), &h); + } + if let Ok(h) = histories.aprs.lock() { + save_key(db, &k("aprs"), &h); + } + if let Ok(h) = histories.cw.lock() { + save_key(db, &k("cw"), &h); + } + if let Ok(h) = histories.ft8.lock() { + save_key(db, &k("ft8"), &h); + } + if let Ok(h) = histories.wspr.lock() { + save_key(db, &k("wspr"), &h); + } + let _ = db.dump(); +} + +/// Spawn a Tokio task that flushes all rigs' histories to disk every 60 seconds. +pub fn spawn_flush_task( + db: Arc>, + rig_histories: Vec<(String, Arc)>, +) { + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(60)); + interval.tick().await; // consume the immediate first tick + loop { + interval.tick().await; + if let Ok(mut guard) = db.lock() { + for (rig_id, histories) in &rig_histories { + flush_all(&mut guard, rig_id, histories); + } + } + } + }); +} diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index 234fe2c..c3ebbc6 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -6,6 +6,7 @@ mod aprsfi; mod audio; mod config; mod error; +mod history_store; mod listener; mod pskreporter; mod rig_handle; @@ -652,7 +653,7 @@ fn spawn_rig_audio_stack( })); } - // Spawn CW decoder task (no histories needed — CW has no persistent history) + // Spawn CW decoder task let cw_pcm_rx = pcm_tx.subscribe(); let cw_state_rx = state_rx.clone(); let cw_decode_tx = decode_tx.clone(); @@ -660,9 +661,10 @@ fn spawn_rig_audio_stack( let cw_ch = rig_cfg.audio.channels; let cw_shutdown_rx = shutdown_rx.clone(); let cw_logs = decoder_logs.clone(); + let cw_histories = histories.clone(); handles.push(tokio::spawn(async move { tokio::select! { - _ = audio::run_cw_decoder(cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx, cw_logs) => {} + _ = audio::run_cw_decoder(cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx, cw_logs, cw_histories) => {} _ = wait_for_shutdown(cw_shutdown_rx) => {} } })); @@ -842,6 +844,14 @@ async fn main() -> DynResult<()> { let mut task_handles: Vec> = Vec::new(); let (shutdown_tx, shutdown_rx) = watch::channel(false); + // Open persistent history DB once; each rig uses rig_id-prefixed keys. + let history_db = { + let db = history_store::open_db(); + info!("Decode history DB: {}", history_store::db_path().display()); + Arc::new(std::sync::Mutex::new(db)) + }; + let mut rig_histories_for_flush: Vec<(String, Arc)> = Vec::new(); + // The first rig id is the default for backward-compat clients that omit rig_id. let default_rig_id = resolved_rigs .first() @@ -899,6 +909,10 @@ async fn main() -> DynResult<()> { ) = (None, None, None, None); let histories = DecoderHistories::new(); + if let Ok(db_guard) = history_db.lock() { + history_store::load_all(&db_guard, &rig_cfg.id, &histories); + } + rig_histories_for_flush.push((rig_cfg.id.clone(), histories.clone())); let (rig_tx, rig_rx) = mpsc::channel::(RIG_TASK_CHANNEL_BUFFER); let mut initial_state = RigState::new_with_metadata( @@ -974,6 +988,9 @@ async fn main() -> DynResult<()> { ); } + // Spawn periodic flush of decode history to disk (every 60 s). + history_store::spawn_flush_task(history_db, rig_histories_for_flush); + // Start JSON TCP listener. if cfg.listen.enabled { let listen_ip = cli.listen.unwrap_or(cfg.listen.listen);