[feat](trx-server): persist 24h decode history across restarts
Move persistent history from trx-client to trx-server where decode events originate. History for AIS, VDES, APRS, CW, FT8, and WSPR is loaded from ~/.local/cache/trx-rs/history.db at startup and flushed to disk every 60 seconds. CW events are now also stored in DecoderHistories and replayed to connecting clients, consistent with all other decoder types. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
Generated
+1
-1
@@ -2461,7 +2461,6 @@ dependencies = [
|
||||
"cpal",
|
||||
"dirs",
|
||||
"opus",
|
||||
"pickledb",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
@@ -2594,6 +2593,7 @@ dependencies = [
|
||||
"dirs",
|
||||
"num-complex",
|
||||
"opus",
|
||||
"pickledb",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
|
||||
@@ -15,7 +15,6 @@ toml = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
dirs = "6"
|
||||
pickledb = "0.5"
|
||||
bytes = "1"
|
||||
cpal = "0.15"
|
||||
opus = "0.3"
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
mod audio_bridge;
|
||||
mod audio_client;
|
||||
mod config;
|
||||
mod history_store;
|
||||
mod remote_client;
|
||||
|
||||
use std::collections::HashMap;
|
||||
@@ -125,17 +124,6 @@ async fn async_init() -> DynResult<AppState> {
|
||||
let mut frontend_reg_ctx = FrontendRegistrationContext::new();
|
||||
let mut frontend_runtime = FrontendRuntimeContext::new();
|
||||
|
||||
// Load persisted decode history before frontends start.
|
||||
let db = {
|
||||
let db = history_store::open_db();
|
||||
history_store::load_all(&db, &mut frontend_runtime);
|
||||
tracing::info!(
|
||||
"Loaded decode history from {}",
|
||||
history_store::db_path().display()
|
||||
);
|
||||
std::sync::Arc::new(std::sync::Mutex::new(db))
|
||||
};
|
||||
|
||||
register_http_frontend(&mut frontend_reg_ctx);
|
||||
register_http_json_frontend(&mut frontend_reg_ctx);
|
||||
register_rigctl_frontend(&mut frontend_reg_ctx);
|
||||
@@ -349,9 +337,6 @@ async fn async_init() -> DynResult<AppState> {
|
||||
);
|
||||
}
|
||||
|
||||
// Flush in-memory history to disk every 60 seconds.
|
||||
history_store::spawn_flush_task(db.clone(), frontend_runtime_ctx.clone());
|
||||
|
||||
// Spawn frontends with runtime context
|
||||
for frontend in &frontends {
|
||||
let frontend_state_rx = state_rx.clone();
|
||||
|
||||
@@ -21,6 +21,7 @@ toml = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
dirs = "6"
|
||||
pickledb = "0.5"
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock"] }
|
||||
bytes = "1"
|
||||
cpal = "0.15"
|
||||
|
||||
@@ -26,7 +26,7 @@ use trx_core::audio::{
|
||||
AUDIO_MSG_WSPR_DECODE,
|
||||
};
|
||||
use trx_core::decode::{
|
||||
AisMessage, AprsPacket, DecodedMessage, Ft8Message, VdesMessage, WsprMessage,
|
||||
AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage,
|
||||
};
|
||||
use trx_core::rig::state::{RigMode, RigState};
|
||||
use trx_cw::CwDecoder;
|
||||
@@ -40,6 +40,7 @@ use trx_decode_log::DecoderLoggers;
|
||||
const APRS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const AIS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const VDES_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const CW_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const FT8_SAMPLE_RATE: u32 = 12_000;
|
||||
@@ -142,11 +143,12 @@ fn classify_stream_error(err: &str) -> &'static str {
|
||||
/// instance can maintain its own independent history. Pass an
|
||||
/// `Arc<DecoderHistories>` into every decoder task and into the audio listener.
|
||||
pub struct DecoderHistories {
|
||||
ais: Mutex<VecDeque<(Instant, AisMessage)>>,
|
||||
vdes: Mutex<VecDeque<(Instant, VdesMessage)>>,
|
||||
aprs: Mutex<VecDeque<(Instant, AprsPacket)>>,
|
||||
ft8: Mutex<VecDeque<(Instant, Ft8Message)>>,
|
||||
wspr: Mutex<VecDeque<(Instant, WsprMessage)>>,
|
||||
pub ais: Mutex<VecDeque<(Instant, AisMessage)>>,
|
||||
pub vdes: Mutex<VecDeque<(Instant, VdesMessage)>>,
|
||||
pub aprs: Mutex<VecDeque<(Instant, AprsPacket)>>,
|
||||
pub cw: Mutex<VecDeque<(Instant, CwEvent)>>,
|
||||
pub ft8: Mutex<VecDeque<(Instant, Ft8Message)>>,
|
||||
pub wspr: Mutex<VecDeque<(Instant, WsprMessage)>>,
|
||||
}
|
||||
|
||||
impl DecoderHistories {
|
||||
@@ -155,6 +157,7 @@ impl DecoderHistories {
|
||||
ais: Mutex::new(VecDeque::new()),
|
||||
vdes: Mutex::new(VecDeque::new()),
|
||||
aprs: Mutex::new(VecDeque::new()),
|
||||
cw: Mutex::new(VecDeque::new()),
|
||||
ft8: Mutex::new(VecDeque::new()),
|
||||
wspr: Mutex::new(VecDeque::new()),
|
||||
})
|
||||
@@ -254,6 +257,35 @@ impl DecoderHistories {
|
||||
.clear();
|
||||
}
|
||||
|
||||
// --- CW ---
|
||||
|
||||
fn prune_cw(history: &mut VecDeque<(Instant, CwEvent)>) {
|
||||
let cutoff = Instant::now() - CW_HISTORY_RETENTION;
|
||||
while let Some((ts, _)) = history.front() {
|
||||
if *ts < cutoff {
|
||||
history.pop_front();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record_cw_event(&self, evt: CwEvent) {
|
||||
let mut h = self.cw.lock().expect("cw history mutex poisoned");
|
||||
h.push_back((Instant::now(), evt));
|
||||
Self::prune_cw(&mut h);
|
||||
}
|
||||
|
||||
pub fn snapshot_cw_history(&self) -> Vec<CwEvent> {
|
||||
let mut h = self.cw.lock().expect("cw history mutex poisoned");
|
||||
Self::prune_cw(&mut h);
|
||||
h.iter().map(|(_, evt)| evt.clone()).collect()
|
||||
}
|
||||
|
||||
pub fn clear_cw_history(&self) {
|
||||
self.cw.lock().expect("cw history mutex poisoned").clear();
|
||||
}
|
||||
|
||||
// --- FT8 ---
|
||||
|
||||
fn prune_ft8(history: &mut VecDeque<(Instant, Ft8Message)>) {
|
||||
@@ -1087,6 +1119,7 @@ pub async fn run_cw_decoder(
|
||||
mut state_rx: watch::Receiver<RigState>,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
decode_logs: Option<Arc<DecoderLoggers>>,
|
||||
histories: Arc<DecoderHistories>,
|
||||
) {
|
||||
info!("CW decoder started ({}Hz, {} ch)", sample_rate, channels);
|
||||
let mut decoder = CwDecoder::new(sample_rate);
|
||||
@@ -1183,6 +1216,7 @@ pub async fn run_cw_decoder(
|
||||
if let Some(logger) = decode_logs.as_ref() {
|
||||
logger.log_cw(&evt);
|
||||
}
|
||||
histories.record_cw_event(evt.clone());
|
||||
let _ = decode_tx.send(DecodedMessage::Cw(evt));
|
||||
}
|
||||
}
|
||||
@@ -1713,6 +1747,21 @@ async fn handle_audio_client(
|
||||
}
|
||||
}
|
||||
}
|
||||
// Send CW history to newly connected client.
|
||||
let history = histories.snapshot_cw_history();
|
||||
for evt in history {
|
||||
let msg = DecodedMessage::Cw(evt);
|
||||
let msg_type = AUDIO_MSG_CW_DECODE;
|
||||
if let Ok(json) = serde_json::to_vec(&msg) {
|
||||
write_audio_msg_buffered(&mut writer, msg_type, &json).await?;
|
||||
replayed_history_count += 1;
|
||||
pending_history_flush += 1;
|
||||
if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL {
|
||||
writer.flush().await?;
|
||||
pending_history_flush = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
if pending_history_flush > 0 {
|
||||
writer.flush().await?;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,178 @@
|
||||
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
|
||||
//
|
||||
// SPDX-License-Identifier: BSD-2-Clause
|
||||
|
||||
//! Persistent decode history storage for trx-server using pickledb.
|
||||
//!
|
||||
//! History for all decoder types (AIS, VDES, APRS, CW, FT8, WSPR) is
|
||||
//! serialised as JSON arrays to `~/.local/cache/trx-rs/history.db` and
|
||||
//! loaded back on startup, preserving up to 24 hours of decodes across
|
||||
//! trx-server restarts. Each rig's keys are prefixed with the rig id
|
||||
//! (e.g. `"default.ais"`) so multi-rig setups don't collide.
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use pickledb::{PickleDb, PickleDbDumpPolicy, SerializationMethod};
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
|
||||
use trx_core::decode::{AisMessage, AprsPacket, CwEvent, Ft8Message, VdesMessage, WsprMessage};
|
||||
|
||||
use crate::audio::DecoderHistories;
|
||||
|
||||
const HISTORY_RETENTION_MS: i64 = 24 * 60 * 60 * 1_000;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct StoredEntry<T> {
|
||||
ts_ms: i64,
|
||||
data: T,
|
||||
}
|
||||
|
||||
fn now_unix_ms() -> i64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis()
|
||||
.try_into()
|
||||
.unwrap_or(i64::MAX)
|
||||
}
|
||||
|
||||
pub fn db_path() -> PathBuf {
|
||||
let base = dirs::cache_dir().unwrap_or_else(|| PathBuf::from("."));
|
||||
base.join("trx-rs").join("history.db")
|
||||
}
|
||||
|
||||
/// Open (or create) the history database at the canonical cache path.
|
||||
pub fn open_db() -> PickleDb {
|
||||
let path = db_path();
|
||||
if let Some(parent) = path.parent() {
|
||||
let _ = std::fs::create_dir_all(parent);
|
||||
}
|
||||
PickleDb::load(
|
||||
&path,
|
||||
PickleDbDumpPolicy::DumpUponRequest,
|
||||
SerializationMethod::Json,
|
||||
)
|
||||
.unwrap_or_else(|_| {
|
||||
PickleDb::new(
|
||||
path,
|
||||
PickleDbDumpPolicy::DumpUponRequest,
|
||||
SerializationMethod::Json,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn load_key<T: DeserializeOwned>(db: &PickleDb, key: &str) -> Vec<(Instant, T)> {
|
||||
let now_ms = now_unix_ms();
|
||||
let cutoff_ms = now_ms - HISTORY_RETENTION_MS;
|
||||
|
||||
let entries: Vec<StoredEntry<T>> = db.get(key).unwrap_or_default();
|
||||
|
||||
entries
|
||||
.into_iter()
|
||||
.filter(|e| e.ts_ms >= cutoff_ms)
|
||||
.map(|e| {
|
||||
let age_ms = now_ms.saturating_sub(e.ts_ms).max(0) as u64;
|
||||
// checked_sub returns None when age exceeds system uptime; treat as
|
||||
// brand-new so the entry stays visible for another 24 h.
|
||||
let instant = Instant::now()
|
||||
.checked_sub(Duration::from_millis(age_ms))
|
||||
.unwrap_or_else(Instant::now);
|
||||
(instant, e.data)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn save_key<T: Clone + Serialize>(db: &mut PickleDb, key: &str, deque: &VecDeque<(Instant, T)>) {
|
||||
let now_ms = now_unix_ms();
|
||||
let entries: Vec<StoredEntry<T>> = deque
|
||||
.iter()
|
||||
.map(|(inst, data)| StoredEntry {
|
||||
ts_ms: now_ms - inst.elapsed().as_millis() as i64,
|
||||
data: data.clone(),
|
||||
})
|
||||
.collect();
|
||||
let _ = db.set(key, &entries);
|
||||
}
|
||||
|
||||
/// Populate `histories` from the database using `rig_id`-prefixed keys.
|
||||
pub fn load_all(db: &PickleDb, rig_id: &str, histories: &Arc<DecoderHistories>) {
|
||||
let k = |suffix: &str| format!("{}.{}", rig_id, suffix);
|
||||
|
||||
if let Ok(mut h) = histories.ais.lock() {
|
||||
for e in load_key::<AisMessage>(db, &k("ais")) {
|
||||
h.push_back(e);
|
||||
}
|
||||
}
|
||||
if let Ok(mut h) = histories.vdes.lock() {
|
||||
for e in load_key::<VdesMessage>(db, &k("vdes")) {
|
||||
h.push_back(e);
|
||||
}
|
||||
}
|
||||
if let Ok(mut h) = histories.aprs.lock() {
|
||||
for e in load_key::<AprsPacket>(db, &k("aprs")) {
|
||||
h.push_back(e);
|
||||
}
|
||||
}
|
||||
if let Ok(mut h) = histories.cw.lock() {
|
||||
for e in load_key::<CwEvent>(db, &k("cw")) {
|
||||
h.push_back(e);
|
||||
}
|
||||
}
|
||||
if let Ok(mut h) = histories.ft8.lock() {
|
||||
for e in load_key::<Ft8Message>(db, &k("ft8")) {
|
||||
h.push_back(e);
|
||||
}
|
||||
}
|
||||
if let Ok(mut h) = histories.wspr.lock() {
|
||||
for e in load_key::<WsprMessage>(db, &k("wspr")) {
|
||||
h.push_back(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Flush `histories` to the database under `rig_id`-prefixed keys and sync.
|
||||
pub fn flush_all(db: &mut PickleDb, rig_id: &str, histories: &Arc<DecoderHistories>) {
|
||||
let k = |suffix: &str| format!("{}.{}", rig_id, suffix);
|
||||
|
||||
if let Ok(h) = histories.ais.lock() {
|
||||
save_key(db, &k("ais"), &h);
|
||||
}
|
||||
if let Ok(h) = histories.vdes.lock() {
|
||||
save_key(db, &k("vdes"), &h);
|
||||
}
|
||||
if let Ok(h) = histories.aprs.lock() {
|
||||
save_key(db, &k("aprs"), &h);
|
||||
}
|
||||
if let Ok(h) = histories.cw.lock() {
|
||||
save_key(db, &k("cw"), &h);
|
||||
}
|
||||
if let Ok(h) = histories.ft8.lock() {
|
||||
save_key(db, &k("ft8"), &h);
|
||||
}
|
||||
if let Ok(h) = histories.wspr.lock() {
|
||||
save_key(db, &k("wspr"), &h);
|
||||
}
|
||||
let _ = db.dump();
|
||||
}
|
||||
|
||||
/// Spawn a Tokio task that flushes all rigs' histories to disk every 60 seconds.
|
||||
pub fn spawn_flush_task(
|
||||
db: Arc<Mutex<PickleDb>>,
|
||||
rig_histories: Vec<(String, Arc<DecoderHistories>)>,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(60));
|
||||
interval.tick().await; // consume the immediate first tick
|
||||
loop {
|
||||
interval.tick().await;
|
||||
if let Ok(mut guard) = db.lock() {
|
||||
for (rig_id, histories) in &rig_histories {
|
||||
flush_all(&mut guard, rig_id, histories);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -6,6 +6,7 @@ mod aprsfi;
|
||||
mod audio;
|
||||
mod config;
|
||||
mod error;
|
||||
mod history_store;
|
||||
mod listener;
|
||||
mod pskreporter;
|
||||
mod rig_handle;
|
||||
@@ -652,7 +653,7 @@ fn spawn_rig_audio_stack(
|
||||
}));
|
||||
}
|
||||
|
||||
// Spawn CW decoder task (no histories needed — CW has no persistent history)
|
||||
// Spawn CW decoder task
|
||||
let cw_pcm_rx = pcm_tx.subscribe();
|
||||
let cw_state_rx = state_rx.clone();
|
||||
let cw_decode_tx = decode_tx.clone();
|
||||
@@ -660,9 +661,10 @@ fn spawn_rig_audio_stack(
|
||||
let cw_ch = rig_cfg.audio.channels;
|
||||
let cw_shutdown_rx = shutdown_rx.clone();
|
||||
let cw_logs = decoder_logs.clone();
|
||||
let cw_histories = histories.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_cw_decoder(cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx, cw_logs) => {}
|
||||
_ = audio::run_cw_decoder(cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx, cw_logs, cw_histories) => {}
|
||||
_ = wait_for_shutdown(cw_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
@@ -842,6 +844,14 @@ async fn main() -> DynResult<()> {
|
||||
let mut task_handles: Vec<JoinHandle<()>> = Vec::new();
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel(false);
|
||||
|
||||
// Open persistent history DB once; each rig uses rig_id-prefixed keys.
|
||||
let history_db = {
|
||||
let db = history_store::open_db();
|
||||
info!("Decode history DB: {}", history_store::db_path().display());
|
||||
Arc::new(std::sync::Mutex::new(db))
|
||||
};
|
||||
let mut rig_histories_for_flush: Vec<(String, Arc<audio::DecoderHistories>)> = Vec::new();
|
||||
|
||||
// The first rig id is the default for backward-compat clients that omit rig_id.
|
||||
let default_rig_id = resolved_rigs
|
||||
.first()
|
||||
@@ -899,6 +909,10 @@ async fn main() -> DynResult<()> {
|
||||
) = (None, None, None, None);
|
||||
|
||||
let histories = DecoderHistories::new();
|
||||
if let Ok(db_guard) = history_db.lock() {
|
||||
history_store::load_all(&db_guard, &rig_cfg.id, &histories);
|
||||
}
|
||||
rig_histories_for_flush.push((rig_cfg.id.clone(), histories.clone()));
|
||||
|
||||
let (rig_tx, rig_rx) = mpsc::channel::<RigRequest>(RIG_TASK_CHANNEL_BUFFER);
|
||||
let mut initial_state = RigState::new_with_metadata(
|
||||
@@ -974,6 +988,9 @@ async fn main() -> DynResult<()> {
|
||||
);
|
||||
}
|
||||
|
||||
// Spawn periodic flush of decode history to disk (every 60 s).
|
||||
history_store::spawn_flush_task(history_db, rig_histories_for_flush);
|
||||
|
||||
// Start JSON TCP listener.
|
||||
if cfg.listen.enabled {
|
||||
let listen_ip = cli.listen.unwrap_or(cfg.listen.listen);
|
||||
|
||||
Reference in New Issue
Block a user