[test](trx-server): add Tier 2 unit tests for DecoderHistories

Extract a generic prune_by_age<T> helper from the 11 duplicated typed prune_* methods. The helper takes an explicit Instant so tests drive time deterministically, and it uses checked_sub so an early monotonic clock cannot panic on cutoff computation.

Tests cover: prune_by_age (empty / fresh / stale-front / all-stale / out-of-order); record/snapshot round-trip; auto-assignment of ts_ms when None; CRC-failed APRS packets are dropped; capacity eviction at MAX_HISTORY_ENTRIES; estimated_total_count tracks records and survives clear; adjust_total_count saturates on underflow; concurrent recorders converge to a consistent count.

16 new tests; trx-server suite now reports 87 passed (was 71).

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-05-03 20:08:34 +02:00
parent 105d9955df
commit c0296fe257
+287 -88
View File
@@ -287,6 +287,23 @@ fn enforce_capacity<T>(deque: &mut VecDeque<T>, max: usize) {
} }
} }
/// Drop entries older than `retention` from the front of a time-tagged deque.
/// Uses `checked_sub` so an early `now` (before `retention` elapses since the
/// monotonic clock origin) is treated as "no entries are old enough to prune"
/// rather than panicking.
fn prune_by_age<T>(deque: &mut VecDeque<(Instant, T)>, retention: Duration, now: Instant) {
let Some(cutoff) = now.checked_sub(retention) else {
return;
};
while let Some((ts, _)) = deque.front() {
if *ts < cutoff {
deque.pop_front();
} else {
break;
}
}
}
impl DecoderHistories { impl DecoderHistories {
pub fn new() -> Arc<Self> { pub fn new() -> Arc<Self> {
Arc::new(Self { Arc::new(Self {
@@ -335,14 +352,7 @@ impl DecoderHistories {
// --- AIS --- // --- AIS ---
fn prune_ais(history: &mut VecDeque<(Instant, AisMessage)>) { fn prune_ais(history: &mut VecDeque<(Instant, AisMessage)>) {
let cutoff = Instant::now() - AIS_HISTORY_RETENTION; prune_by_age(history, AIS_HISTORY_RETENTION, Instant::now());
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
} }
pub fn record_ais_message(&self, mut msg: AisMessage) { pub fn record_ais_message(&self, mut msg: AisMessage) {
@@ -368,14 +378,7 @@ impl DecoderHistories {
// --- VDES --- // --- VDES ---
fn prune_vdes(history: &mut VecDeque<(Instant, VdesMessage)>) { fn prune_vdes(history: &mut VecDeque<(Instant, VdesMessage)>) {
let cutoff = Instant::now() - VDES_HISTORY_RETENTION; prune_by_age(history, VDES_HISTORY_RETENTION, Instant::now());
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
} }
pub fn record_vdes_message(&self, mut msg: VdesMessage) { pub fn record_vdes_message(&self, mut msg: VdesMessage) {
@@ -401,14 +404,7 @@ impl DecoderHistories {
// --- APRS --- // --- APRS ---
fn prune_aprs(history: &mut VecDeque<(Instant, AprsPacket)>) { fn prune_aprs(history: &mut VecDeque<(Instant, AprsPacket)>) {
let cutoff = Instant::now() - APRS_HISTORY_RETENTION; prune_by_age(history, APRS_HISTORY_RETENTION, Instant::now());
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
} }
pub fn record_aprs_packet(&self, mut pkt: AprsPacket) { pub fn record_aprs_packet(&self, mut pkt: AprsPacket) {
@@ -446,14 +442,7 @@ impl DecoderHistories {
// --- HF APRS --- // --- HF APRS ---
fn prune_hf_aprs(history: &mut VecDeque<(Instant, AprsPacket)>) { fn prune_hf_aprs(history: &mut VecDeque<(Instant, AprsPacket)>) {
let cutoff = Instant::now() - HF_APRS_HISTORY_RETENTION; prune_by_age(history, HF_APRS_HISTORY_RETENTION, Instant::now());
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
} }
pub fn record_hf_aprs_packet(&self, mut pkt: AprsPacket) { pub fn record_hf_aprs_packet(&self, mut pkt: AprsPacket) {
@@ -491,14 +480,7 @@ impl DecoderHistories {
// --- CW --- // --- CW ---
fn prune_cw(history: &mut VecDeque<(Instant, CwEvent)>) { fn prune_cw(history: &mut VecDeque<(Instant, CwEvent)>) {
let cutoff = Instant::now() - CW_HISTORY_RETENTION; prune_by_age(history, CW_HISTORY_RETENTION, Instant::now());
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
} }
pub fn record_cw_event(&self, evt: CwEvent) { pub fn record_cw_event(&self, evt: CwEvent) {
@@ -530,14 +512,7 @@ impl DecoderHistories {
// --- FT8 --- // --- FT8 ---
fn prune_ft8(history: &mut VecDeque<(Instant, Ft8Message)>) { fn prune_ft8(history: &mut VecDeque<(Instant, Ft8Message)>) {
let cutoff = Instant::now() - FT8_HISTORY_RETENTION; prune_by_age(history, FT8_HISTORY_RETENTION, Instant::now());
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
} }
pub fn record_ft8_message(&self, msg: Ft8Message) { pub fn record_ft8_message(&self, msg: Ft8Message) {
@@ -569,14 +544,7 @@ impl DecoderHistories {
// --- FT4 --- // --- FT4 ---
fn prune_ft4(history: &mut VecDeque<(Instant, Ft8Message)>) { fn prune_ft4(history: &mut VecDeque<(Instant, Ft8Message)>) {
let cutoff = Instant::now() - FT8_HISTORY_RETENTION; prune_by_age(history, FT8_HISTORY_RETENTION, Instant::now());
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
} }
pub fn record_ft4_message(&self, msg: Ft8Message) { pub fn record_ft4_message(&self, msg: Ft8Message) {
@@ -609,14 +577,7 @@ impl DecoderHistories {
#[cfg_attr(not(feature = "ft2"), allow(dead_code))] #[cfg_attr(not(feature = "ft2"), allow(dead_code))]
fn prune_ft2(history: &mut VecDeque<(Instant, Ft8Message)>) { fn prune_ft2(history: &mut VecDeque<(Instant, Ft8Message)>) {
let cutoff = Instant::now() - FT8_HISTORY_RETENTION; prune_by_age(history, FT8_HISTORY_RETENTION, Instant::now());
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
} }
#[cfg_attr(not(feature = "ft2"), allow(dead_code))] #[cfg_attr(not(feature = "ft2"), allow(dead_code))]
@@ -650,14 +611,7 @@ impl DecoderHistories {
// --- WSPR --- // --- WSPR ---
fn prune_wspr(history: &mut VecDeque<(Instant, WsprMessage)>) { fn prune_wspr(history: &mut VecDeque<(Instant, WsprMessage)>) {
let cutoff = Instant::now() - WSPR_HISTORY_RETENTION; prune_by_age(history, WSPR_HISTORY_RETENTION, Instant::now());
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
} }
pub fn record_wspr_message(&self, msg: WsprMessage) { pub fn record_wspr_message(&self, msg: WsprMessage) {
@@ -689,14 +643,7 @@ impl DecoderHistories {
// --- LRPT --- // --- LRPT ---
fn prune_lrpt(history: &mut VecDeque<(Instant, LrptImage)>) { fn prune_lrpt(history: &mut VecDeque<(Instant, LrptImage)>) {
let cutoff = Instant::now() - LRPT_HISTORY_RETENTION; prune_by_age(history, LRPT_HISTORY_RETENTION, Instant::now());
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
} }
pub fn record_lrpt_image(&self, mut img: LrptImage) { pub fn record_lrpt_image(&self, mut img: LrptImage) {
@@ -731,14 +678,7 @@ impl DecoderHistories {
// --- WEFAX --- // --- WEFAX ---
fn prune_wefax(history: &mut VecDeque<(Instant, WefaxMessage)>) { fn prune_wefax(history: &mut VecDeque<(Instant, WefaxMessage)>) {
let cutoff = Instant::now() - WEFAX_HISTORY_RETENTION; prune_by_age(history, WEFAX_HISTORY_RETENTION, Instant::now());
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
} }
pub fn record_wefax_message(&self, mut msg: WefaxMessage) { pub fn record_wefax_message(&self, mut msg: WefaxMessage) {
@@ -4339,4 +4279,263 @@ mod tests {
// Different text => emitted. // Different text => emitted.
assert!(should_emit_ft2_decode(&mut recent, "CQ ZZ", 14_074_000.4)); assert!(should_emit_ft2_decode(&mut recent, "CQ ZZ", 14_074_000.4));
} }
// -------------------------------------------------------------------
// prune_by_age (generic)
// -------------------------------------------------------------------
#[test]
fn prune_by_age_no_op_on_empty_deque() {
let mut q: VecDeque<(Instant, ())> = VecDeque::new();
prune_by_age(&mut q, Duration::from_secs(60), Instant::now());
assert!(q.is_empty());
}
#[test]
fn prune_by_age_keeps_fresh_entries() {
let now = Instant::now();
let mut q: VecDeque<(Instant, u32)> = VecDeque::new();
q.push_back((now - Duration::from_secs(1), 1));
q.push_back((now - Duration::from_secs(2), 2));
prune_by_age(&mut q, Duration::from_secs(60), now);
assert_eq!(q.len(), 2);
}
#[test]
fn prune_by_age_drops_old_front_only() {
let now = Instant::now();
let retention = Duration::from_secs(60);
let mut q: VecDeque<(Instant, u32)> = VecDeque::new();
// Two stale entries followed by two fresh ones, in chronological order.
q.push_back((now - Duration::from_secs(120), 1));
q.push_back((now - Duration::from_secs(90), 2));
q.push_back((now - Duration::from_secs(30), 3));
q.push_back((now - Duration::from_secs(5), 4));
prune_by_age(&mut q, retention, now);
let kept: Vec<u32> = q.iter().map(|(_, v)| *v).collect();
assert_eq!(kept, vec![3, 4]);
}
#[test]
fn prune_by_age_drops_all_when_all_stale() {
let now = Instant::now();
let mut q: VecDeque<(Instant, u32)> = VecDeque::new();
q.push_back((now - Duration::from_secs(3600), 1));
q.push_back((now - Duration::from_secs(3000), 2));
prune_by_age(&mut q, Duration::from_secs(60), now);
assert!(q.is_empty());
}
#[test]
fn prune_by_age_stops_at_first_fresh_entry() {
// Out-of-order timestamps: a fresh entry in front blocks pruning of
// older entries behind it. Documents the front-only behaviour.
let now = Instant::now();
let retention = Duration::from_secs(60);
let mut q: VecDeque<(Instant, u32)> = VecDeque::new();
q.push_back((now - Duration::from_secs(5), 1));
q.push_back((now - Duration::from_secs(120), 2));
prune_by_age(&mut q, retention, now);
let kept: Vec<u32> = q.iter().map(|(_, v)| *v).collect();
assert_eq!(kept, vec![1, 2]);
}
// -------------------------------------------------------------------
// DecoderHistories integration
// -------------------------------------------------------------------
fn sample_ais(mmsi: u32) -> AisMessage {
AisMessage {
rig_id: None,
ts_ms: None,
channel: "A".into(),
message_type: 1,
repeat: 0,
mmsi,
crc_ok: true,
bit_len: 168,
raw_bytes: vec![0u8; 21],
lat: None,
lon: None,
sog_knots: None,
cog_deg: None,
heading_deg: None,
nav_status: None,
vessel_name: None,
callsign: None,
destination: None,
}
}
fn sample_aprs(crc_ok: bool) -> AprsPacket {
AprsPacket {
rig_id: None,
ts_ms: None,
src_call: "N0CALL".into(),
dest_call: "APRS".into(),
path: String::new(),
info: String::new(),
info_bytes: Vec::new(),
packet_type: "msg".into(),
crc_ok,
lat: None,
lon: None,
symbol_table: None,
symbol_code: None,
}
}
fn sample_cw(text: &str) -> CwEvent {
CwEvent {
rig_id: None,
text: text.into(),
wpm: 20,
tone_hz: 700,
signal_on: true,
}
}
fn sample_ft8(seq: i64) -> Ft8Message {
Ft8Message {
rig_id: None,
ts_ms: seq,
snr_db: -10.0,
dt_s: 0.5,
freq_hz: 1500.0,
message: format!("CQ {}", seq),
}
}
#[test]
fn record_ais_then_snapshot_round_trip() {
let h = DecoderHistories::new();
h.record_ais_message(sample_ais(1));
h.record_ais_message(sample_ais(2));
let snap = h.snapshot_ais_history();
assert_eq!(snap.len(), 2);
assert_eq!(snap[0].mmsi, 1);
assert_eq!(snap[1].mmsi, 2);
}
#[test]
fn record_ais_assigns_ts_ms_when_missing() {
let h = DecoderHistories::new();
h.record_ais_message(sample_ais(42));
let snap = h.snapshot_ais_history();
assert!(snap[0].ts_ms.is_some());
assert!(snap[0].ts_ms.unwrap() > 0);
}
#[test]
fn record_ais_preserves_caller_supplied_ts_ms() {
let h = DecoderHistories::new();
let mut msg = sample_ais(7);
msg.ts_ms = Some(1_700_000_000_000);
h.record_ais_message(msg);
let snap = h.snapshot_ais_history();
assert_eq!(snap[0].ts_ms, Some(1_700_000_000_000));
}
#[test]
fn record_aprs_drops_packets_with_bad_crc() {
let h = DecoderHistories::new();
h.record_aprs_packet(sample_aprs(false));
h.record_aprs_packet(sample_aprs(false));
assert!(h.snapshot_aprs_history().is_empty());
assert_eq!(h.estimated_total_count(), 0);
}
#[test]
fn record_aprs_keeps_packets_with_good_crc() {
let h = DecoderHistories::new();
h.record_aprs_packet(sample_aprs(true));
assert_eq!(h.snapshot_aprs_history().len(), 1);
assert_eq!(h.estimated_total_count(), 1);
}
#[test]
fn record_evicts_oldest_when_at_capacity() {
let h = DecoderHistories::new();
for i in 0..(MAX_HISTORY_ENTRIES + 5) {
let mut msg = sample_ais(i as u32);
msg.ts_ms = Some(i as i64);
h.record_ais_message(msg);
}
let snap = h.snapshot_ais_history();
assert_eq!(snap.len(), MAX_HISTORY_ENTRIES);
// Oldest 5 (mmsi 0..5) should have been evicted; surviving front is mmsi 5.
assert_eq!(snap.first().unwrap().mmsi, 5);
assert_eq!(snap.last().unwrap().mmsi, (MAX_HISTORY_ENTRIES + 4) as u32);
}
#[test]
fn estimated_total_count_starts_at_zero_and_tracks_records() {
let h = DecoderHistories::new();
assert_eq!(h.estimated_total_count(), 0);
h.record_ais_message(sample_ais(1));
h.record_aprs_packet(sample_aprs(true));
h.record_cw_event(sample_cw("CQ"));
h.record_ft8_message(sample_ft8(1));
h.record_ft4_message(sample_ft8(2));
assert_eq!(h.estimated_total_count(), 5);
}
#[test]
fn clear_aprs_history_resets_total_count_by_drained_amount() {
let h = DecoderHistories::new();
h.record_aprs_packet(sample_aprs(true));
h.record_aprs_packet(sample_aprs(true));
h.record_ais_message(sample_ais(1));
assert_eq!(h.estimated_total_count(), 3);
h.clear_aprs_history();
assert!(h.snapshot_aprs_history().is_empty());
// Only APRS entries are drained — AIS entry remains.
assert_eq!(h.estimated_total_count(), 1);
}
#[test]
fn adjust_total_count_saturates_on_underflow() {
let h = DecoderHistories::new();
// Counter is currently 0. Asking it to decrement by 100 must clamp to
// 0 instead of wrapping to usize::MAX (which would later panic when
// pre-allocating the history replay blob).
h.adjust_total_count(100, 0);
assert_eq!(h.estimated_total_count(), 0);
}
#[test]
fn adjust_total_count_increments_then_decrements_correctly() {
let h = DecoderHistories::new();
h.adjust_total_count(0, 7);
assert_eq!(h.estimated_total_count(), 7);
h.adjust_total_count(7, 4);
assert_eq!(h.estimated_total_count(), 4);
h.adjust_total_count(4, 4); // no-op
assert_eq!(h.estimated_total_count(), 4);
}
#[test]
fn estimated_total_count_consistent_under_concurrent_records() {
use std::thread;
const THREADS: usize = 8;
const PER_THREAD: usize = 250;
let h = DecoderHistories::new();
let handles: Vec<_> = (0..THREADS)
.map(|t| {
let h = h.clone();
thread::spawn(move || {
for i in 0..PER_THREAD {
h.record_ais_message(sample_ais((t * PER_THREAD + i) as u32));
}
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
let snap_len = h.snapshot_ais_history().len();
let counter = h.estimated_total_count();
assert_eq!(snap_len, THREADS * PER_THREAD);
assert_eq!(counter, THREADS * PER_THREAD);
}
} }