From 5919d4d4c0b946bdb17b8d8fa6e6d82aee6ce858 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sun, 3 May 2026 20:16:39 +0200 Subject: [PATCH] [test](trx-server): add Tier 5 integration tests for history pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract build_history_blob from handle_audio_client so the blob construction can be exercised in isolation. The function is called once per audio client connect and was previously buried inside an async TCP handler. Integration tests assert: empty histories produce an empty blob; record types appear with the correct AUDIO_MSG_* type bytes in the documented decoder iteration order; build → split_history_chunks → gzip → decompress recovers the original blob byte-for-byte and every chunk fits under MAX_HISTORY_PAYLOAD_SIZE; decoders with no history are skipped; the returned count matches estimated_total_count(). 5 new tests; trx-server suite now reports 130 passed (was 110). Co-authored-by: Claude Opus 4.7 Signed-off-by: Stan Grams --- src/trx-server/src/audio.rs | 320 +++++++++++++++++++++++++++--------- 1 file changed, 238 insertions(+), 82 deletions(-) diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 20b7c1b..c28545c 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -227,6 +227,92 @@ impl StreamErrorLogger { } } +/// Serialise every decoder's history into a single in-memory blob of length- +/// prefixed records (`[u8 type][u32 BE len][JSON payload]`). Returns the blob +/// and the number of records written. Callers chunk + gzip + send the blob +/// over the audio socket; tests inspect it directly. +fn build_history_blob(histories: &DecoderHistories) -> (Vec, usize) { + // Estimate ~256 bytes per message; pre-allocate to avoid repeated + // reallocation for large histories. + let estimated_msgs = histories.estimated_total_count().min(500_000); + let mut blob: Vec = Vec::with_capacity(estimated_msgs.saturating_mul(256)); + let mut count = 0usize; + + macro_rules! push_history { + ($msgs:expr, $variant:expr, $msg_type:expr) => { + for item in $msgs { + let wrapped = $variant(item); + if let Ok(json) = serde_json::to_vec(&wrapped) { + let len = json.len() as u32; + blob.push($msg_type); + blob.extend_from_slice(&len.to_be_bytes()); + blob.extend_from_slice(&json); + count += 1; + } + } + }; + } + + push_history!( + histories.snapshot_ais_history(), + DecodedMessage::Ais, + AUDIO_MSG_AIS_DECODE + ); + push_history!( + histories.snapshot_vdes_history(), + DecodedMessage::Vdes, + AUDIO_MSG_VDES_DECODE + ); + push_history!( + histories.snapshot_aprs_history(), + DecodedMessage::Aprs, + AUDIO_MSG_APRS_DECODE + ); + push_history!( + histories.snapshot_hf_aprs_history(), + DecodedMessage::HfAprs, + AUDIO_MSG_HF_APRS_DECODE + ); + push_history!( + histories.snapshot_ft8_history(), + DecodedMessage::Ft8, + AUDIO_MSG_FT8_DECODE + ); + push_history!( + histories.snapshot_ft4_history(), + DecodedMessage::Ft4, + AUDIO_MSG_FT4_DECODE + ); + #[cfg(feature = "ft2")] + push_history!( + histories.snapshot_ft2_history(), + DecodedMessage::Ft2, + AUDIO_MSG_FT2_DECODE + ); + push_history!( + histories.snapshot_wspr_history(), + DecodedMessage::Wspr, + AUDIO_MSG_WSPR_DECODE + ); + push_history!( + histories.snapshot_cw_history(), + DecodedMessage::Cw, + AUDIO_MSG_CW_DECODE + ); + push_history!( + histories.snapshot_lrpt_history(), + DecodedMessage::LrptImage, + AUDIO_MSG_LRPT_IMAGE + ); + push_history!( + histories.snapshot_wefax_history(), + DecodedMessage::Wefax, + AUDIO_MSG_WEFAX_DECODE + ); + + (blob, count) +} + /// Walk a length-prefixed history blob and return the byte ranges of each /// chunk to send. Each record is `[u8 type][u32 BE len][payload]`. A chunk is /// flushed when adding the next record would push it past `threshold`. A @@ -3399,88 +3485,7 @@ async fn handle_audio_client( // can exceed the 16 MiB MAX_HISTORY_PAYLOAD_SIZE the client enforces; // chunking keeps each frame well under the limit while still amortising // the gzip cost across many records per chunk. - let history_blob = { - // Estimate ~256 bytes per message; pre-allocate to avoid repeated - // reallocation for large histories. - let estimated_msgs = histories.estimated_total_count().min(500_000); - let mut blob: Vec = Vec::with_capacity(estimated_msgs.saturating_mul(256)); - let mut count = 0usize; - - macro_rules! push_history { - ($msgs:expr, $variant:expr, $msg_type:expr) => { - for item in $msgs { - let wrapped = $variant(item); - if let Ok(json) = serde_json::to_vec(&wrapped) { - let len = json.len() as u32; - blob.push($msg_type); - blob.extend_from_slice(&len.to_be_bytes()); - blob.extend_from_slice(&json); - count += 1; - } - } - }; - } - - push_history!( - histories.snapshot_ais_history(), - DecodedMessage::Ais, - AUDIO_MSG_AIS_DECODE - ); - push_history!( - histories.snapshot_vdes_history(), - DecodedMessage::Vdes, - AUDIO_MSG_VDES_DECODE - ); - push_history!( - histories.snapshot_aprs_history(), - DecodedMessage::Aprs, - AUDIO_MSG_APRS_DECODE - ); - push_history!( - histories.snapshot_hf_aprs_history(), - DecodedMessage::HfAprs, - AUDIO_MSG_HF_APRS_DECODE - ); - push_history!( - histories.snapshot_ft8_history(), - DecodedMessage::Ft8, - AUDIO_MSG_FT8_DECODE - ); - push_history!( - histories.snapshot_ft4_history(), - DecodedMessage::Ft4, - AUDIO_MSG_FT4_DECODE - ); - #[cfg(feature = "ft2")] - push_history!( - histories.snapshot_ft2_history(), - DecodedMessage::Ft2, - AUDIO_MSG_FT2_DECODE - ); - push_history!( - histories.snapshot_wspr_history(), - DecodedMessage::Wspr, - AUDIO_MSG_WSPR_DECODE - ); - push_history!( - histories.snapshot_cw_history(), - DecodedMessage::Cw, - AUDIO_MSG_CW_DECODE - ); - push_history!( - histories.snapshot_lrpt_history(), - DecodedMessage::LrptImage, - AUDIO_MSG_LRPT_IMAGE - ); - push_history!( - histories.snapshot_wefax_history(), - DecodedMessage::Wefax, - AUDIO_MSG_WEFAX_DECODE - ); - - (blob, count) - }; - let (blob, replayed_history_count) = history_blob; + let (blob, replayed_history_count) = build_history_blob(&histories); if !blob.is_empty() { // Target uncompressed bytes per chunk. JSON compresses ~10-20x so // 8 MiB uncompressed lands well under the 16 MiB compressed cap even @@ -4673,4 +4678,155 @@ mod tests { assert_eq!(snap_len, THREADS * PER_THREAD); assert_eq!(counter, THREADS * PER_THREAD); } + + // ------------------------------------------------------------------- + // build_history_blob (integration with DecoderHistories + framing) + // ------------------------------------------------------------------- + + /// Walk a length-prefixed blob and return `(type, payload)` for every + /// record. Mirrors the parsing the client does on the wire. + fn parse_blob_records(blob: &[u8]) -> Vec<(u8, Vec)> { + let mut out = Vec::new(); + let mut pos = 0usize; + while pos + 5 <= blob.len() { + let msg_type = blob[pos]; + let len = + u32::from_be_bytes([blob[pos + 1], blob[pos + 2], blob[pos + 3], blob[pos + 4]]) + as usize; + pos += 5; + assert!(pos + len <= blob.len(), "truncated blob"); + out.push((msg_type, blob[pos..pos + len].to_vec())); + pos += len; + } + assert_eq!(pos, blob.len(), "trailing bytes in blob"); + out + } + + fn gunzip(data: &[u8]) -> Vec { + use flate2::read::GzDecoder; + use std::io::Read; + let mut decoder = GzDecoder::new(data); + let mut out = Vec::new(); + decoder.read_to_end(&mut out).expect("gunzip"); + out + } + + #[test] + fn build_history_blob_empty_when_no_records() { + let h = DecoderHistories::new(); + let (blob, count) = build_history_blob(&h); + assert!(blob.is_empty()); + assert_eq!(count, 0); + } + + #[test] + fn build_history_blob_emits_records_with_correct_type_bytes() { + let h = DecoderHistories::new(); + h.record_ais_message(sample_ais(101)); + h.record_aprs_packet(sample_aprs(true)); + h.record_cw_event(sample_cw("CQ TEST")); + h.record_ft8_message(sample_ft8(42)); + h.record_ft4_message(sample_ft8(43)); + + let (blob, count) = build_history_blob(&h); + assert_eq!(count, 5); + + let records = parse_blob_records(&blob); + let types: Vec = records.iter().map(|(t, _)| *t).collect(); + // Decoder iteration order in build_history_blob is fixed: + // AIS, VDES, APRS, HF_APRS, FT8, FT4, [FT2], WSPR, CW, LRPT, WEFAX. + assert_eq!( + types, + vec![ + AUDIO_MSG_AIS_DECODE, + AUDIO_MSG_APRS_DECODE, + AUDIO_MSG_FT8_DECODE, + AUDIO_MSG_FT4_DECODE, + AUDIO_MSG_CW_DECODE, + ] + ); + + // Each record body is a JSON-serialised DecodedMessage variant. + for (_, payload) in &records { + let msg: DecodedMessage = serde_json::from_slice(payload).expect("valid JSON"); + // Smoke-check: every variant deserialises to one of the expected types. + match msg { + DecodedMessage::Ais(_) + | DecodedMessage::Aprs(_) + | DecodedMessage::Cw(_) + | DecodedMessage::Ft8(_) + | DecodedMessage::Ft4(_) => {} + other => panic!("unexpected variant: {:?}", other), + } + } + } + + #[test] + fn build_history_blob_chunk_split_then_decompress_recovers_all_records() { + // End-to-end pipeline: build → split → gzip per chunk → decompress + // each chunk → reassembly must match the original blob byte-for-byte. + let h = DecoderHistories::new(); + for i in 0..200 { + h.record_ft8_message(sample_ft8(i)); + } + let (blob, count) = build_history_blob(&h); + assert_eq!(count, 200); + + // Threshold small enough to force multiple chunks. + let threshold = blob.len() / 4; + let ranges = split_history_chunks(&blob, threshold); + assert!( + ranges.len() >= 2, + "expected multiple chunks, got {}", + ranges.len() + ); + + let mut reassembled: Vec = Vec::new(); + for r in &ranges { + // Gzip then immediately decompress — proves both directions of + // the on-the-wire transformation are intact. + let slice = &blob[r.clone()]; + let mut enc = GzEncoder::new(Vec::new(), Compression::fast()); + enc.write_all(slice).unwrap(); + let compressed = enc.finish().unwrap(); + let decompressed = gunzip(&compressed); + assert_eq!(decompressed, slice); + reassembled.extend_from_slice(&decompressed); + } + assert_eq!(reassembled, blob); + + // Each compressed chunk must fit under the wire-side history cap. + for r in &ranges { + let mut enc = GzEncoder::new(Vec::new(), Compression::fast()); + enc.write_all(&blob[r.clone()]).unwrap(); + let compressed = enc.finish().unwrap(); + assert!( + (compressed.len() as u32) < 16 * 1024 * 1024, + "chunk over cap: {} bytes", + compressed.len() + ); + } + } + + #[test] + fn build_history_blob_skips_decoders_with_no_history() { + let h = DecoderHistories::new(); + // Only one decoder gets a record. + h.record_aprs_packet(sample_aprs(true)); + let (blob, count) = build_history_blob(&h); + assert_eq!(count, 1); + let records = parse_blob_records(&blob); + assert_eq!(records.len(), 1); + assert_eq!(records[0].0, AUDIO_MSG_APRS_DECODE); + } + + #[test] + fn build_history_blob_count_matches_estimated_total_count() { + let h = DecoderHistories::new(); + h.record_ais_message(sample_ais(1)); + h.record_aprs_packet(sample_aprs(true)); + h.record_cw_event(sample_cw("X")); + let (_, count) = build_history_blob(&h); + assert_eq!(count, h.estimated_total_count()); + } }