From b65b85e13ce2ca5a56daa501a13e4cecb2bc6828 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sat, 7 Mar 2026 09:43:48 +0100 Subject: [PATCH] [feat](trx-client): persist 24h decode history across restarts Add pickledb-backed persistent history store for all decoder types (AIS, VDES, APRS, CW, FT8, WSPR). History is loaded from ~/.local/cache/trx-rs/history.db at startup and flushed to disk every 60 seconds. On load, entry timestamps are reconstructed from stored Unix ms values so 24h pruning continues to work correctly. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- Cargo.lock | 1 + src/trx-client/Cargo.toml | 1 + src/trx-client/src/history_store.rs | 169 ++++++++++++++++++++++++++++ src/trx-client/src/main.rs | 15 +++ 4 files changed, 186 insertions(+) create mode 100644 src/trx-client/src/history_store.rs diff --git a/Cargo.lock b/Cargo.lock index 7d5d87d..5caefef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2461,6 +2461,7 @@ dependencies = [ "cpal", "dirs", "opus", + "pickledb", "serde", "serde_json", "tokio", diff --git a/src/trx-client/Cargo.toml b/src/trx-client/Cargo.toml index 052d9b2..783a400 100644 --- a/src/trx-client/Cargo.toml +++ b/src/trx-client/Cargo.toml @@ -15,6 +15,7 @@ toml = { workspace = true } tracing = { workspace = true } clap = { workspace = true, features = ["derive"] } dirs = "6" +pickledb = "0.5" bytes = "1" cpal = "0.15" opus = "0.3" diff --git a/src/trx-client/src/history_store.rs b/src/trx-client/src/history_store.rs new file mode 100644 index 0000000..24f5b62 --- /dev/null +++ b/src/trx-client/src/history_store.rs @@ -0,0 +1,169 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Persistent decode history storage using pickledb. +//! +//! History for all decoder types (AIS, VDES, APRS, CW, FT8, WSPR) is +//! serialised as JSON arrays to `~/.local/cache/trx-rs/history.db` and +//! loaded back on startup, preserving up to 24 hours of decodes across +//! trx-client restarts. + +use std::collections::VecDeque; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +use pickledb::{PickleDb, PickleDbDumpPolicy, SerializationMethod}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +use trx_core::decode::{AisMessage, AprsPacket, CwEvent, Ft8Message, VdesMessage, WsprMessage}; +use trx_frontend::FrontendRuntimeContext; + +const HISTORY_RETENTION_MS: i64 = 24 * 60 * 60 * 1_000; + +#[derive(Serialize, Deserialize)] +struct StoredEntry { + ts_ms: i64, + data: T, +} + +fn now_unix_ms() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + .try_into() + .unwrap_or(i64::MAX) +} + +pub fn db_path() -> PathBuf { + let base = dirs::cache_dir().unwrap_or_else(|| PathBuf::from(".")); + base.join("trx-rs").join("history.db") +} + +/// Open (or create) the history database at the canonical cache path. +pub fn open_db() -> PickleDb { + let path = db_path(); + if let Some(parent) = path.parent() { + let _ = std::fs::create_dir_all(parent); + } + PickleDb::load( + &path, + PickleDbDumpPolicy::DumpUponRequest, + SerializationMethod::Json, + ) + .unwrap_or_else(|_| { + PickleDb::new( + path, + PickleDbDumpPolicy::DumpUponRequest, + SerializationMethod::Json, + ) + }) +} + +/// Deserialise entries for `key`, discarding anything older than 24 h. +fn load_key(db: &PickleDb, key: &str) -> Vec<(Instant, T)> { + let now_ms = now_unix_ms(); + let cutoff_ms = now_ms - HISTORY_RETENTION_MS; + + let entries: Vec> = db.get(key).unwrap_or_default(); + + entries + .into_iter() + .filter(|e| e.ts_ms >= cutoff_ms) + .map(|e| { + let age_ms = now_ms.saturating_sub(e.ts_ms).max(0) as u64; + // checked_sub returns None when age exceeds system uptime; in that + // case treat the entry as brand-new so it stays visible for 24 h. + let instant = Instant::now() + .checked_sub(Duration::from_millis(age_ms)) + .unwrap_or_else(Instant::now); + (instant, e.data) + }) + .collect() +} + +/// Serialise a VecDeque of history entries into the database under `key`. +fn save_key(db: &mut PickleDb, key: &str, deque: &VecDeque<(Instant, T)>) { + let now_ms = now_unix_ms(); + let entries: Vec> = deque + .iter() + .map(|(inst, data)| StoredEntry { + ts_ms: now_ms - inst.elapsed().as_millis() as i64, + data: data.clone(), + }) + .collect(); + let _ = db.set(key, &entries); +} + +/// Populate all history VecDeques in `ctx` from the database. +pub fn load_all(db: &PickleDb, ctx: &mut FrontendRuntimeContext) { + if let Ok(mut h) = ctx.ais_history.lock() { + for e in load_key::(db, "ais") { + h.push_back(e); + } + } + if let Ok(mut h) = ctx.vdes_history.lock() { + for e in load_key::(db, "vdes") { + h.push_back(e); + } + } + if let Ok(mut h) = ctx.aprs_history.lock() { + for e in load_key::(db, "aprs") { + h.push_back(e); + } + } + if let Ok(mut h) = ctx.cw_history.lock() { + for e in load_key::(db, "cw") { + h.push_back(e); + } + } + if let Ok(mut h) = ctx.ft8_history.lock() { + for e in load_key::(db, "ft8") { + h.push_back(e); + } + } + if let Ok(mut h) = ctx.wspr_history.lock() { + for e in load_key::(db, "wspr") { + h.push_back(e); + } + } +} + +/// Write all in-memory history VecDeques to the database and flush to disk. +pub fn flush_all(db: &mut PickleDb, ctx: &FrontendRuntimeContext) { + if let Ok(h) = ctx.ais_history.lock() { + save_key(db, "ais", &h); + } + if let Ok(h) = ctx.vdes_history.lock() { + save_key(db, "vdes", &h); + } + if let Ok(h) = ctx.aprs_history.lock() { + save_key(db, "aprs", &h); + } + if let Ok(h) = ctx.cw_history.lock() { + save_key(db, "cw", &h); + } + if let Ok(h) = ctx.ft8_history.lock() { + save_key(db, "ft8", &h); + } + if let Ok(h) = ctx.wspr_history.lock() { + save_key(db, "wspr", &h); + } + let _ = db.dump(); +} + +/// Spawn a Tokio task that flushes history to disk every 60 seconds. +pub fn spawn_flush_task(db: Arc>, ctx: Arc) { + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(60)); + interval.tick().await; // consume the immediate first tick + loop { + interval.tick().await; + if let Ok(mut guard) = db.lock() { + flush_all(&mut guard, &ctx); + } + } + }); +} diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index 4bc0082..211cca9 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -5,6 +5,7 @@ mod audio_bridge; mod audio_client; mod config; +mod history_store; mod remote_client; use std::collections::HashMap; @@ -124,6 +125,17 @@ async fn async_init() -> DynResult { let mut frontend_reg_ctx = FrontendRegistrationContext::new(); let mut frontend_runtime = FrontendRuntimeContext::new(); + // Load persisted decode history before frontends start. + let db = { + let db = history_store::open_db(); + history_store::load_all(&db, &mut frontend_runtime); + tracing::info!( + "Loaded decode history from {}", + history_store::db_path().display() + ); + std::sync::Arc::new(std::sync::Mutex::new(db)) + }; + register_http_frontend(&mut frontend_reg_ctx); register_http_json_frontend(&mut frontend_reg_ctx); register_rigctl_frontend(&mut frontend_reg_ctx); @@ -337,6 +349,9 @@ async fn async_init() -> DynResult { ); } + // Flush in-memory history to disk every 60 seconds. + history_store::spawn_flush_task(db.clone(), frontend_runtime_ctx.clone()); + // Spawn frontends with runtime context for frontend in &frontends { let frontend_state_rx = state_rx.clone();