From c0296fe2578f330192b584cfb80d81a504808720 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sun, 3 May 2026 20:08:34 +0200 Subject: [PATCH] [test](trx-server): add Tier 2 unit tests for DecoderHistories Extract a generic prune_by_age helper from the 11 duplicated typed prune_* methods. The helper takes an explicit Instant so tests drive time deterministically, and it uses checked_sub so an early monotonic clock cannot panic on cutoff computation. Tests cover: prune_by_age (empty / fresh / stale-front / all-stale / out-of-order); record/snapshot round-trip; auto-assignment of ts_ms when None; CRC-failed APRS packets are dropped; capacity eviction at MAX_HISTORY_ENTRIES; estimated_total_count tracks records and survives clear; adjust_total_count saturates on underflow; concurrent recorders converge to a consistent count. 16 new tests; trx-server suite now reports 87 passed (was 71). Co-authored-by: Claude Opus 4.7 Signed-off-by: Stan Grams --- src/trx-server/src/audio.rs | 375 +++++++++++++++++++++++++++--------- 1 file changed, 287 insertions(+), 88 deletions(-) diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index c8385e2..2489968 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -287,6 +287,23 @@ fn enforce_capacity(deque: &mut VecDeque, max: usize) { } } +/// Drop entries older than `retention` from the front of a time-tagged deque. +/// Uses `checked_sub` so an early `now` (before `retention` elapses since the +/// monotonic clock origin) is treated as "no entries are old enough to prune" +/// rather than panicking. +fn prune_by_age(deque: &mut VecDeque<(Instant, T)>, retention: Duration, now: Instant) { + let Some(cutoff) = now.checked_sub(retention) else { + return; + }; + while let Some((ts, _)) = deque.front() { + if *ts < cutoff { + deque.pop_front(); + } else { + break; + } + } +} + impl DecoderHistories { pub fn new() -> Arc { Arc::new(Self { @@ -335,14 +352,7 @@ impl DecoderHistories { // --- AIS --- fn prune_ais(history: &mut VecDeque<(Instant, AisMessage)>) { - let cutoff = Instant::now() - AIS_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; - } - } + prune_by_age(history, AIS_HISTORY_RETENTION, Instant::now()); } pub fn record_ais_message(&self, mut msg: AisMessage) { @@ -368,14 +378,7 @@ impl DecoderHistories { // --- VDES --- fn prune_vdes(history: &mut VecDeque<(Instant, VdesMessage)>) { - let cutoff = Instant::now() - VDES_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; - } - } + prune_by_age(history, VDES_HISTORY_RETENTION, Instant::now()); } pub fn record_vdes_message(&self, mut msg: VdesMessage) { @@ -401,14 +404,7 @@ impl DecoderHistories { // --- APRS --- fn prune_aprs(history: &mut VecDeque<(Instant, AprsPacket)>) { - let cutoff = Instant::now() - APRS_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; - } - } + prune_by_age(history, APRS_HISTORY_RETENTION, Instant::now()); } pub fn record_aprs_packet(&self, mut pkt: AprsPacket) { @@ -446,14 +442,7 @@ impl DecoderHistories { // --- HF APRS --- fn prune_hf_aprs(history: &mut VecDeque<(Instant, AprsPacket)>) { - let cutoff = Instant::now() - HF_APRS_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; - } - } + prune_by_age(history, HF_APRS_HISTORY_RETENTION, Instant::now()); } pub fn record_hf_aprs_packet(&self, mut pkt: AprsPacket) { @@ -491,14 +480,7 @@ impl DecoderHistories { // --- CW --- fn prune_cw(history: &mut VecDeque<(Instant, CwEvent)>) { - let cutoff = Instant::now() - CW_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; - } - } + prune_by_age(history, CW_HISTORY_RETENTION, Instant::now()); } pub fn record_cw_event(&self, evt: CwEvent) { @@ -530,14 +512,7 @@ impl DecoderHistories { // --- FT8 --- fn prune_ft8(history: &mut VecDeque<(Instant, Ft8Message)>) { - let cutoff = Instant::now() - FT8_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; - } - } + prune_by_age(history, FT8_HISTORY_RETENTION, Instant::now()); } pub fn record_ft8_message(&self, msg: Ft8Message) { @@ -569,14 +544,7 @@ impl DecoderHistories { // --- FT4 --- fn prune_ft4(history: &mut VecDeque<(Instant, Ft8Message)>) { - let cutoff = Instant::now() - FT8_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; - } - } + prune_by_age(history, FT8_HISTORY_RETENTION, Instant::now()); } pub fn record_ft4_message(&self, msg: Ft8Message) { @@ -609,14 +577,7 @@ impl DecoderHistories { #[cfg_attr(not(feature = "ft2"), allow(dead_code))] fn prune_ft2(history: &mut VecDeque<(Instant, Ft8Message)>) { - let cutoff = Instant::now() - FT8_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; - } - } + prune_by_age(history, FT8_HISTORY_RETENTION, Instant::now()); } #[cfg_attr(not(feature = "ft2"), allow(dead_code))] @@ -650,14 +611,7 @@ impl DecoderHistories { // --- WSPR --- fn prune_wspr(history: &mut VecDeque<(Instant, WsprMessage)>) { - let cutoff = Instant::now() - WSPR_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; - } - } + prune_by_age(history, WSPR_HISTORY_RETENTION, Instant::now()); } pub fn record_wspr_message(&self, msg: WsprMessage) { @@ -689,14 +643,7 @@ impl DecoderHistories { // --- LRPT --- fn prune_lrpt(history: &mut VecDeque<(Instant, LrptImage)>) { - let cutoff = Instant::now() - LRPT_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; - } - } + prune_by_age(history, LRPT_HISTORY_RETENTION, Instant::now()); } pub fn record_lrpt_image(&self, mut img: LrptImage) { @@ -731,14 +678,7 @@ impl DecoderHistories { // --- WEFAX --- fn prune_wefax(history: &mut VecDeque<(Instant, WefaxMessage)>) { - let cutoff = Instant::now() - WEFAX_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; - } - } + prune_by_age(history, WEFAX_HISTORY_RETENTION, Instant::now()); } pub fn record_wefax_message(&self, mut msg: WefaxMessage) { @@ -4339,4 +4279,263 @@ mod tests { // Different text => emitted. assert!(should_emit_ft2_decode(&mut recent, "CQ ZZ", 14_074_000.4)); } + + // ------------------------------------------------------------------- + // prune_by_age (generic) + // ------------------------------------------------------------------- + + #[test] + fn prune_by_age_no_op_on_empty_deque() { + let mut q: VecDeque<(Instant, ())> = VecDeque::new(); + prune_by_age(&mut q, Duration::from_secs(60), Instant::now()); + assert!(q.is_empty()); + } + + #[test] + fn prune_by_age_keeps_fresh_entries() { + let now = Instant::now(); + let mut q: VecDeque<(Instant, u32)> = VecDeque::new(); + q.push_back((now - Duration::from_secs(1), 1)); + q.push_back((now - Duration::from_secs(2), 2)); + prune_by_age(&mut q, Duration::from_secs(60), now); + assert_eq!(q.len(), 2); + } + + #[test] + fn prune_by_age_drops_old_front_only() { + let now = Instant::now(); + let retention = Duration::from_secs(60); + let mut q: VecDeque<(Instant, u32)> = VecDeque::new(); + // Two stale entries followed by two fresh ones, in chronological order. + q.push_back((now - Duration::from_secs(120), 1)); + q.push_back((now - Duration::from_secs(90), 2)); + q.push_back((now - Duration::from_secs(30), 3)); + q.push_back((now - Duration::from_secs(5), 4)); + prune_by_age(&mut q, retention, now); + let kept: Vec = q.iter().map(|(_, v)| *v).collect(); + assert_eq!(kept, vec![3, 4]); + } + + #[test] + fn prune_by_age_drops_all_when_all_stale() { + let now = Instant::now(); + let mut q: VecDeque<(Instant, u32)> = VecDeque::new(); + q.push_back((now - Duration::from_secs(3600), 1)); + q.push_back((now - Duration::from_secs(3000), 2)); + prune_by_age(&mut q, Duration::from_secs(60), now); + assert!(q.is_empty()); + } + + #[test] + fn prune_by_age_stops_at_first_fresh_entry() { + // Out-of-order timestamps: a fresh entry in front blocks pruning of + // older entries behind it. Documents the front-only behaviour. + let now = Instant::now(); + let retention = Duration::from_secs(60); + let mut q: VecDeque<(Instant, u32)> = VecDeque::new(); + q.push_back((now - Duration::from_secs(5), 1)); + q.push_back((now - Duration::from_secs(120), 2)); + prune_by_age(&mut q, retention, now); + let kept: Vec = q.iter().map(|(_, v)| *v).collect(); + assert_eq!(kept, vec![1, 2]); + } + + // ------------------------------------------------------------------- + // DecoderHistories integration + // ------------------------------------------------------------------- + + fn sample_ais(mmsi: u32) -> AisMessage { + AisMessage { + rig_id: None, + ts_ms: None, + channel: "A".into(), + message_type: 1, + repeat: 0, + mmsi, + crc_ok: true, + bit_len: 168, + raw_bytes: vec![0u8; 21], + lat: None, + lon: None, + sog_knots: None, + cog_deg: None, + heading_deg: None, + nav_status: None, + vessel_name: None, + callsign: None, + destination: None, + } + } + + fn sample_aprs(crc_ok: bool) -> AprsPacket { + AprsPacket { + rig_id: None, + ts_ms: None, + src_call: "N0CALL".into(), + dest_call: "APRS".into(), + path: String::new(), + info: String::new(), + info_bytes: Vec::new(), + packet_type: "msg".into(), + crc_ok, + lat: None, + lon: None, + symbol_table: None, + symbol_code: None, + } + } + + fn sample_cw(text: &str) -> CwEvent { + CwEvent { + rig_id: None, + text: text.into(), + wpm: 20, + tone_hz: 700, + signal_on: true, + } + } + + fn sample_ft8(seq: i64) -> Ft8Message { + Ft8Message { + rig_id: None, + ts_ms: seq, + snr_db: -10.0, + dt_s: 0.5, + freq_hz: 1500.0, + message: format!("CQ {}", seq), + } + } + + #[test] + fn record_ais_then_snapshot_round_trip() { + let h = DecoderHistories::new(); + h.record_ais_message(sample_ais(1)); + h.record_ais_message(sample_ais(2)); + let snap = h.snapshot_ais_history(); + assert_eq!(snap.len(), 2); + assert_eq!(snap[0].mmsi, 1); + assert_eq!(snap[1].mmsi, 2); + } + + #[test] + fn record_ais_assigns_ts_ms_when_missing() { + let h = DecoderHistories::new(); + h.record_ais_message(sample_ais(42)); + let snap = h.snapshot_ais_history(); + assert!(snap[0].ts_ms.is_some()); + assert!(snap[0].ts_ms.unwrap() > 0); + } + + #[test] + fn record_ais_preserves_caller_supplied_ts_ms() { + let h = DecoderHistories::new(); + let mut msg = sample_ais(7); + msg.ts_ms = Some(1_700_000_000_000); + h.record_ais_message(msg); + let snap = h.snapshot_ais_history(); + assert_eq!(snap[0].ts_ms, Some(1_700_000_000_000)); + } + + #[test] + fn record_aprs_drops_packets_with_bad_crc() { + let h = DecoderHistories::new(); + h.record_aprs_packet(sample_aprs(false)); + h.record_aprs_packet(sample_aprs(false)); + assert!(h.snapshot_aprs_history().is_empty()); + assert_eq!(h.estimated_total_count(), 0); + } + + #[test] + fn record_aprs_keeps_packets_with_good_crc() { + let h = DecoderHistories::new(); + h.record_aprs_packet(sample_aprs(true)); + assert_eq!(h.snapshot_aprs_history().len(), 1); + assert_eq!(h.estimated_total_count(), 1); + } + + #[test] + fn record_evicts_oldest_when_at_capacity() { + let h = DecoderHistories::new(); + for i in 0..(MAX_HISTORY_ENTRIES + 5) { + let mut msg = sample_ais(i as u32); + msg.ts_ms = Some(i as i64); + h.record_ais_message(msg); + } + let snap = h.snapshot_ais_history(); + assert_eq!(snap.len(), MAX_HISTORY_ENTRIES); + // Oldest 5 (mmsi 0..5) should have been evicted; surviving front is mmsi 5. + assert_eq!(snap.first().unwrap().mmsi, 5); + assert_eq!(snap.last().unwrap().mmsi, (MAX_HISTORY_ENTRIES + 4) as u32); + } + + #[test] + fn estimated_total_count_starts_at_zero_and_tracks_records() { + let h = DecoderHistories::new(); + assert_eq!(h.estimated_total_count(), 0); + h.record_ais_message(sample_ais(1)); + h.record_aprs_packet(sample_aprs(true)); + h.record_cw_event(sample_cw("CQ")); + h.record_ft8_message(sample_ft8(1)); + h.record_ft4_message(sample_ft8(2)); + assert_eq!(h.estimated_total_count(), 5); + } + + #[test] + fn clear_aprs_history_resets_total_count_by_drained_amount() { + let h = DecoderHistories::new(); + h.record_aprs_packet(sample_aprs(true)); + h.record_aprs_packet(sample_aprs(true)); + h.record_ais_message(sample_ais(1)); + assert_eq!(h.estimated_total_count(), 3); + h.clear_aprs_history(); + assert!(h.snapshot_aprs_history().is_empty()); + // Only APRS entries are drained — AIS entry remains. + assert_eq!(h.estimated_total_count(), 1); + } + + #[test] + fn adjust_total_count_saturates_on_underflow() { + let h = DecoderHistories::new(); + // Counter is currently 0. Asking it to decrement by 100 must clamp to + // 0 instead of wrapping to usize::MAX (which would later panic when + // pre-allocating the history replay blob). + h.adjust_total_count(100, 0); + assert_eq!(h.estimated_total_count(), 0); + } + + #[test] + fn adjust_total_count_increments_then_decrements_correctly() { + let h = DecoderHistories::new(); + h.adjust_total_count(0, 7); + assert_eq!(h.estimated_total_count(), 7); + h.adjust_total_count(7, 4); + assert_eq!(h.estimated_total_count(), 4); + h.adjust_total_count(4, 4); // no-op + assert_eq!(h.estimated_total_count(), 4); + } + + #[test] + fn estimated_total_count_consistent_under_concurrent_records() { + use std::thread; + const THREADS: usize = 8; + const PER_THREAD: usize = 250; + let h = DecoderHistories::new(); + let handles: Vec<_> = (0..THREADS) + .map(|t| { + let h = h.clone(); + thread::spawn(move || { + for i in 0..PER_THREAD { + h.record_ais_message(sample_ais((t * PER_THREAD + i) as u32)); + } + }) + }) + .collect(); + for handle in handles { + handle.join().unwrap(); + } + let snap_len = h.snapshot_ais_history().len(); + let counter = h.estimated_total_count(); + assert_eq!(snap_len, THREADS * PER_THREAD); + assert_eq!(counter, THREADS * PER_THREAD); + } }