diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 20b7c1b..c28545c 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -227,6 +227,92 @@ impl StreamErrorLogger { } } +/// Serialise every decoder's history into a single in-memory blob of length- +/// prefixed records (`[u8 type][u32 BE len][JSON payload]`). Returns the blob +/// and the number of records written. Callers chunk + gzip + send the blob +/// over the audio socket; tests inspect it directly. +fn build_history_blob(histories: &DecoderHistories) -> (Vec, usize) { + // Estimate ~256 bytes per message; pre-allocate to avoid repeated + // reallocation for large histories. + let estimated_msgs = histories.estimated_total_count().min(500_000); + let mut blob: Vec = Vec::with_capacity(estimated_msgs.saturating_mul(256)); + let mut count = 0usize; + + macro_rules! push_history { + ($msgs:expr, $variant:expr, $msg_type:expr) => { + for item in $msgs { + let wrapped = $variant(item); + if let Ok(json) = serde_json::to_vec(&wrapped) { + let len = json.len() as u32; + blob.push($msg_type); + blob.extend_from_slice(&len.to_be_bytes()); + blob.extend_from_slice(&json); + count += 1; + } + } + }; + } + + push_history!( + histories.snapshot_ais_history(), + DecodedMessage::Ais, + AUDIO_MSG_AIS_DECODE + ); + push_history!( + histories.snapshot_vdes_history(), + DecodedMessage::Vdes, + AUDIO_MSG_VDES_DECODE + ); + push_history!( + histories.snapshot_aprs_history(), + DecodedMessage::Aprs, + AUDIO_MSG_APRS_DECODE + ); + push_history!( + histories.snapshot_hf_aprs_history(), + DecodedMessage::HfAprs, + AUDIO_MSG_HF_APRS_DECODE + ); + push_history!( + histories.snapshot_ft8_history(), + DecodedMessage::Ft8, + AUDIO_MSG_FT8_DECODE + ); + push_history!( + histories.snapshot_ft4_history(), + DecodedMessage::Ft4, + AUDIO_MSG_FT4_DECODE + ); + #[cfg(feature = "ft2")] + push_history!( + histories.snapshot_ft2_history(), + DecodedMessage::Ft2, + AUDIO_MSG_FT2_DECODE + ); + push_history!( + histories.snapshot_wspr_history(), + DecodedMessage::Wspr, + AUDIO_MSG_WSPR_DECODE + ); + push_history!( + histories.snapshot_cw_history(), + DecodedMessage::Cw, + AUDIO_MSG_CW_DECODE + ); + push_history!( + histories.snapshot_lrpt_history(), + DecodedMessage::LrptImage, + AUDIO_MSG_LRPT_IMAGE + ); + push_history!( + histories.snapshot_wefax_history(), + DecodedMessage::Wefax, + AUDIO_MSG_WEFAX_DECODE + ); + + (blob, count) +} + /// Walk a length-prefixed history blob and return the byte ranges of each /// chunk to send. Each record is `[u8 type][u32 BE len][payload]`. A chunk is /// flushed when adding the next record would push it past `threshold`. A @@ -3399,88 +3485,7 @@ async fn handle_audio_client( // can exceed the 16 MiB MAX_HISTORY_PAYLOAD_SIZE the client enforces; // chunking keeps each frame well under the limit while still amortising // the gzip cost across many records per chunk. - let history_blob = { - // Estimate ~256 bytes per message; pre-allocate to avoid repeated - // reallocation for large histories. - let estimated_msgs = histories.estimated_total_count().min(500_000); - let mut blob: Vec = Vec::with_capacity(estimated_msgs.saturating_mul(256)); - let mut count = 0usize; - - macro_rules! push_history { - ($msgs:expr, $variant:expr, $msg_type:expr) => { - for item in $msgs { - let wrapped = $variant(item); - if let Ok(json) = serde_json::to_vec(&wrapped) { - let len = json.len() as u32; - blob.push($msg_type); - blob.extend_from_slice(&len.to_be_bytes()); - blob.extend_from_slice(&json); - count += 1; - } - } - }; - } - - push_history!( - histories.snapshot_ais_history(), - DecodedMessage::Ais, - AUDIO_MSG_AIS_DECODE - ); - push_history!( - histories.snapshot_vdes_history(), - DecodedMessage::Vdes, - AUDIO_MSG_VDES_DECODE - ); - push_history!( - histories.snapshot_aprs_history(), - DecodedMessage::Aprs, - AUDIO_MSG_APRS_DECODE - ); - push_history!( - histories.snapshot_hf_aprs_history(), - DecodedMessage::HfAprs, - AUDIO_MSG_HF_APRS_DECODE - ); - push_history!( - histories.snapshot_ft8_history(), - DecodedMessage::Ft8, - AUDIO_MSG_FT8_DECODE - ); - push_history!( - histories.snapshot_ft4_history(), - DecodedMessage::Ft4, - AUDIO_MSG_FT4_DECODE - ); - #[cfg(feature = "ft2")] - push_history!( - histories.snapshot_ft2_history(), - DecodedMessage::Ft2, - AUDIO_MSG_FT2_DECODE - ); - push_history!( - histories.snapshot_wspr_history(), - DecodedMessage::Wspr, - AUDIO_MSG_WSPR_DECODE - ); - push_history!( - histories.snapshot_cw_history(), - DecodedMessage::Cw, - AUDIO_MSG_CW_DECODE - ); - push_history!( - histories.snapshot_lrpt_history(), - DecodedMessage::LrptImage, - AUDIO_MSG_LRPT_IMAGE - ); - push_history!( - histories.snapshot_wefax_history(), - DecodedMessage::Wefax, - AUDIO_MSG_WEFAX_DECODE - ); - - (blob, count) - }; - let (blob, replayed_history_count) = history_blob; + let (blob, replayed_history_count) = build_history_blob(&histories); if !blob.is_empty() { // Target uncompressed bytes per chunk. JSON compresses ~10-20x so // 8 MiB uncompressed lands well under the 16 MiB compressed cap even @@ -4673,4 +4678,155 @@ mod tests { assert_eq!(snap_len, THREADS * PER_THREAD); assert_eq!(counter, THREADS * PER_THREAD); } + + // ------------------------------------------------------------------- + // build_history_blob (integration with DecoderHistories + framing) + // ------------------------------------------------------------------- + + /// Walk a length-prefixed blob and return `(type, payload)` for every + /// record. Mirrors the parsing the client does on the wire. + fn parse_blob_records(blob: &[u8]) -> Vec<(u8, Vec)> { + let mut out = Vec::new(); + let mut pos = 0usize; + while pos + 5 <= blob.len() { + let msg_type = blob[pos]; + let len = + u32::from_be_bytes([blob[pos + 1], blob[pos + 2], blob[pos + 3], blob[pos + 4]]) + as usize; + pos += 5; + assert!(pos + len <= blob.len(), "truncated blob"); + out.push((msg_type, blob[pos..pos + len].to_vec())); + pos += len; + } + assert_eq!(pos, blob.len(), "trailing bytes in blob"); + out + } + + fn gunzip(data: &[u8]) -> Vec { + use flate2::read::GzDecoder; + use std::io::Read; + let mut decoder = GzDecoder::new(data); + let mut out = Vec::new(); + decoder.read_to_end(&mut out).expect("gunzip"); + out + } + + #[test] + fn build_history_blob_empty_when_no_records() { + let h = DecoderHistories::new(); + let (blob, count) = build_history_blob(&h); + assert!(blob.is_empty()); + assert_eq!(count, 0); + } + + #[test] + fn build_history_blob_emits_records_with_correct_type_bytes() { + let h = DecoderHistories::new(); + h.record_ais_message(sample_ais(101)); + h.record_aprs_packet(sample_aprs(true)); + h.record_cw_event(sample_cw("CQ TEST")); + h.record_ft8_message(sample_ft8(42)); + h.record_ft4_message(sample_ft8(43)); + + let (blob, count) = build_history_blob(&h); + assert_eq!(count, 5); + + let records = parse_blob_records(&blob); + let types: Vec = records.iter().map(|(t, _)| *t).collect(); + // Decoder iteration order in build_history_blob is fixed: + // AIS, VDES, APRS, HF_APRS, FT8, FT4, [FT2], WSPR, CW, LRPT, WEFAX. + assert_eq!( + types, + vec![ + AUDIO_MSG_AIS_DECODE, + AUDIO_MSG_APRS_DECODE, + AUDIO_MSG_FT8_DECODE, + AUDIO_MSG_FT4_DECODE, + AUDIO_MSG_CW_DECODE, + ] + ); + + // Each record body is a JSON-serialised DecodedMessage variant. + for (_, payload) in &records { + let msg: DecodedMessage = serde_json::from_slice(payload).expect("valid JSON"); + // Smoke-check: every variant deserialises to one of the expected types. + match msg { + DecodedMessage::Ais(_) + | DecodedMessage::Aprs(_) + | DecodedMessage::Cw(_) + | DecodedMessage::Ft8(_) + | DecodedMessage::Ft4(_) => {} + other => panic!("unexpected variant: {:?}", other), + } + } + } + + #[test] + fn build_history_blob_chunk_split_then_decompress_recovers_all_records() { + // End-to-end pipeline: build → split → gzip per chunk → decompress + // each chunk → reassembly must match the original blob byte-for-byte. + let h = DecoderHistories::new(); + for i in 0..200 { + h.record_ft8_message(sample_ft8(i)); + } + let (blob, count) = build_history_blob(&h); + assert_eq!(count, 200); + + // Threshold small enough to force multiple chunks. + let threshold = blob.len() / 4; + let ranges = split_history_chunks(&blob, threshold); + assert!( + ranges.len() >= 2, + "expected multiple chunks, got {}", + ranges.len() + ); + + let mut reassembled: Vec = Vec::new(); + for r in &ranges { + // Gzip then immediately decompress — proves both directions of + // the on-the-wire transformation are intact. + let slice = &blob[r.clone()]; + let mut enc = GzEncoder::new(Vec::new(), Compression::fast()); + enc.write_all(slice).unwrap(); + let compressed = enc.finish().unwrap(); + let decompressed = gunzip(&compressed); + assert_eq!(decompressed, slice); + reassembled.extend_from_slice(&decompressed); + } + assert_eq!(reassembled, blob); + + // Each compressed chunk must fit under the wire-side history cap. + for r in &ranges { + let mut enc = GzEncoder::new(Vec::new(), Compression::fast()); + enc.write_all(&blob[r.clone()]).unwrap(); + let compressed = enc.finish().unwrap(); + assert!( + (compressed.len() as u32) < 16 * 1024 * 1024, + "chunk over cap: {} bytes", + compressed.len() + ); + } + } + + #[test] + fn build_history_blob_skips_decoders_with_no_history() { + let h = DecoderHistories::new(); + // Only one decoder gets a record. + h.record_aprs_packet(sample_aprs(true)); + let (blob, count) = build_history_blob(&h); + assert_eq!(count, 1); + let records = parse_blob_records(&blob); + assert_eq!(records.len(), 1); + assert_eq!(records[0].0, AUDIO_MSG_APRS_DECODE); + } + + #[test] + fn build_history_blob_count_matches_estimated_total_count() { + let h = DecoderHistories::new(); + h.record_ais_message(sample_ais(1)); + h.record_aprs_packet(sample_aprs(true)); + h.record_cw_event(sample_cw("X")); + let (_, count) = build_history_blob(&h); + assert_eq!(count, h.estimated_total_count()); + } }