Compare commits

..

2 Commits

Author SHA1 Message Date
sjg 5919d4d4c0 [test](trx-server): add Tier 5 integration tests for history pipeline
Extract build_history_blob from handle_audio_client so the blob construction can be exercised in isolation. The function is called once per audio client connect and was previously buried inside an async TCP handler.

Integration tests assert: empty histories produce an empty blob; record types appear with the correct AUDIO_MSG_* type bytes in the documented decoder iteration order; build → split_history_chunks → gzip → decompress recovers the original blob byte-for-byte and every chunk fits under MAX_HISTORY_PAYLOAD_SIZE; decoders with no history are skipped; the returned count matches estimated_total_count().

5 new tests; trx-server suite now reports 130 passed (was 110).

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
2026-05-03 20:16:39 +02:00
sjg 81ec8191a1 [test](trx-core): add Tier 5 unit tests for audio framing helpers
Cover write_audio_msg / read_audio_msg / write_vchan_* / parse_vchan_* with round-trip and edge-case tests over an in-memory buffer (no sockets). Catches regressions in the wire format used by trx-server's audio listener and the trx-client audio reader.

Tests: type+length+payload round-trip, empty payload, consecutive frames, EOF before header / mid-payload, oversize normal frame rejected, history-compressed frame allowed up to its larger cap, history cap still enforced past 16 MiB, vchan UUID + audio frame round-trips, short-payload rejection, AudioStreamInfo JSON round-trip, write_audio_msg_buffered byte-equivalence.

15 new tests in trx-core.

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
2026-05-03 20:16:30 +02:00
2 changed files with 448 additions and 82 deletions
+210
View File
@@ -193,3 +193,213 @@ pub fn parse_vchan_uuid_msg(payload: &[u8]) -> std::io::Result<Uuid> {
} }
Ok(Uuid::from_bytes(payload[..16].try_into().unwrap())) Ok(Uuid::from_bytes(payload[..16].try_into().unwrap()))
} }
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::BufReader;
#[tokio::test]
async fn write_then_read_round_trip_preserves_type_and_payload() {
let mut buf: Vec<u8> = Vec::new();
let payload = b"hello, world";
write_audio_msg(&mut buf, AUDIO_MSG_FT8_DECODE, payload)
.await
.unwrap();
// Wire bytes: 1 byte type + 4 bytes BE length + payload.
assert_eq!(buf.len(), 1 + 4 + payload.len());
assert_eq!(buf[0], AUDIO_MSG_FT8_DECODE);
assert_eq!(
u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]),
payload.len() as u32
);
assert_eq!(&buf[5..], payload);
let mut reader = BufReader::new(&buf[..]);
let (msg_type, got) = read_audio_msg(&mut reader).await.unwrap();
assert_eq!(msg_type, AUDIO_MSG_FT8_DECODE);
assert_eq!(got, payload);
}
#[tokio::test]
async fn write_then_read_handles_empty_payload() {
let mut buf: Vec<u8> = Vec::new();
write_audio_msg(&mut buf, AUDIO_MSG_RX_FRAME, &[])
.await
.unwrap();
let mut reader = BufReader::new(&buf[..]);
let (msg_type, got) = read_audio_msg(&mut reader).await.unwrap();
assert_eq!(msg_type, AUDIO_MSG_RX_FRAME);
assert!(got.is_empty());
}
#[tokio::test]
async fn read_audio_msg_decodes_consecutive_frames() {
let mut buf: Vec<u8> = Vec::new();
write_audio_msg(&mut buf, AUDIO_MSG_FT8_DECODE, b"a")
.await
.unwrap();
write_audio_msg(&mut buf, AUDIO_MSG_FT4_DECODE, b"bb")
.await
.unwrap();
write_audio_msg(&mut buf, AUDIO_MSG_AIS_DECODE, b"ccc")
.await
.unwrap();
let mut reader = BufReader::new(&buf[..]);
let (t, p) = read_audio_msg(&mut reader).await.unwrap();
assert_eq!((t, p.as_slice()), (AUDIO_MSG_FT8_DECODE, b"a".as_slice()));
let (t, p) = read_audio_msg(&mut reader).await.unwrap();
assert_eq!((t, p.as_slice()), (AUDIO_MSG_FT4_DECODE, b"bb".as_slice()));
let (t, p) = read_audio_msg(&mut reader).await.unwrap();
assert_eq!((t, p.as_slice()), (AUDIO_MSG_AIS_DECODE, b"ccc".as_slice()));
}
#[tokio::test]
async fn read_audio_msg_eof_before_header_returns_unexpected_eof() {
let buf: Vec<u8> = Vec::new();
let mut reader = BufReader::new(&buf[..]);
let err = read_audio_msg(&mut reader).await.unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
}
#[tokio::test]
async fn read_audio_msg_eof_mid_payload_returns_unexpected_eof() {
// Header claims 16 bytes; only 4 follow.
let mut buf: Vec<u8> = vec![AUDIO_MSG_FT8_DECODE];
buf.extend_from_slice(&16u32.to_be_bytes());
buf.extend_from_slice(&[0xAA; 4]);
let mut reader = BufReader::new(&buf[..]);
let err = read_audio_msg(&mut reader).await.unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
}
#[tokio::test]
async fn read_audio_msg_rejects_oversize_normal_frame() {
// Type ≠ HISTORY_COMPRESSED → cap is MAX_PAYLOAD_SIZE (1 MiB). Claim
// 2 MiB and leave the body absent — we should fail before reading.
let mut buf: Vec<u8> = vec![AUDIO_MSG_FT8_DECODE];
buf.extend_from_slice(&(2 * 1024 * 1024u32).to_be_bytes());
let mut reader = BufReader::new(&buf[..]);
let err = read_audio_msg(&mut reader).await.unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
}
#[tokio::test]
async fn read_audio_msg_history_frame_allows_larger_payload() {
// 2 MiB exceeds MAX_PAYLOAD_SIZE but is below MAX_HISTORY_PAYLOAD_SIZE.
// Reading should succeed when the type is HISTORY_COMPRESSED.
let payload = vec![0xCDu8; 2 * 1024 * 1024];
let mut buf: Vec<u8> = Vec::with_capacity(5 + payload.len());
write_audio_msg(&mut buf, AUDIO_MSG_HISTORY_COMPRESSED, &payload)
.await
.unwrap();
let mut reader = BufReader::new(&buf[..]);
let (msg_type, got) = read_audio_msg(&mut reader).await.unwrap();
assert_eq!(msg_type, AUDIO_MSG_HISTORY_COMPRESSED);
assert_eq!(got.len(), payload.len());
}
#[tokio::test]
async fn read_audio_msg_history_frame_rejects_above_history_cap() {
// Claim 32 MiB, body absent. Should still fail (cap enforced even for
// history-compressed type).
let mut buf: Vec<u8> = vec![AUDIO_MSG_HISTORY_COMPRESSED];
buf.extend_from_slice(&(32 * 1024 * 1024u32).to_be_bytes());
let mut reader = BufReader::new(&buf[..]);
let err = read_audio_msg(&mut reader).await.unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
}
#[tokio::test]
async fn vchan_uuid_msg_round_trip() {
let uuid = Uuid::new_v4();
let mut buf: Vec<u8> = Vec::new();
write_vchan_uuid_msg(&mut buf, AUDIO_MSG_VCHAN_SUB, uuid)
.await
.unwrap();
let mut reader = BufReader::new(&buf[..]);
let (msg_type, payload) = read_audio_msg(&mut reader).await.unwrap();
assert_eq!(msg_type, AUDIO_MSG_VCHAN_SUB);
let got = parse_vchan_uuid_msg(&payload).unwrap();
assert_eq!(got, uuid);
}
#[test]
fn parse_vchan_uuid_msg_rejects_short_payload() {
let err = parse_vchan_uuid_msg(&[0u8; 8]).unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
// Empty payload also rejected.
let err = parse_vchan_uuid_msg(&[]).unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
}
#[tokio::test]
async fn vchan_audio_frame_round_trip() {
let uuid = Uuid::new_v4();
let opus = b"\x80\x81\x82 fake opus payload";
let mut buf: Vec<u8> = Vec::new();
write_vchan_audio_frame(&mut buf, uuid, opus).await.unwrap();
let mut reader = BufReader::new(&buf[..]);
let (msg_type, payload) = read_audio_msg(&mut reader).await.unwrap();
assert_eq!(msg_type, AUDIO_MSG_RX_FRAME_CH);
let (got_uuid, got_opus) = parse_vchan_audio_frame(&payload).unwrap();
assert_eq!(got_uuid, uuid);
assert_eq!(got_opus, opus);
}
#[test]
fn parse_vchan_audio_frame_rejects_short_payload() {
let err = parse_vchan_audio_frame(&[0u8; 8]).unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
}
#[test]
fn parse_vchan_audio_frame_handles_empty_opus() {
// Exactly 16 bytes: UUID with no opus payload.
let uuid = Uuid::new_v4();
let payload = uuid.as_bytes().to_vec();
let (got_uuid, got_opus) = parse_vchan_audio_frame(&payload).unwrap();
assert_eq!(got_uuid, uuid);
assert!(got_opus.is_empty());
}
#[tokio::test]
async fn audio_stream_info_serialises_round_trip() {
let info = AudioStreamInfo {
sample_rate: 48_000,
channels: 2,
frame_duration_ms: 20,
bitrate_bps: 64_000,
};
let json = serde_json::to_vec(&info).unwrap();
let mut buf: Vec<u8> = Vec::new();
write_audio_msg(&mut buf, AUDIO_MSG_STREAM_INFO, &json)
.await
.unwrap();
let mut reader = BufReader::new(&buf[..]);
let (msg_type, payload) = read_audio_msg(&mut reader).await.unwrap();
assert_eq!(msg_type, AUDIO_MSG_STREAM_INFO);
let parsed: AudioStreamInfo = serde_json::from_slice(&payload).unwrap();
assert_eq!(parsed.sample_rate, info.sample_rate);
assert_eq!(parsed.channels, info.channels);
assert_eq!(parsed.frame_duration_ms, info.frame_duration_ms);
assert_eq!(parsed.bitrate_bps, info.bitrate_bps);
}
#[tokio::test]
async fn write_audio_msg_buffered_does_not_flush() {
// Smoke test that the buffered variant produces equivalent bytes to
// the flushed variant — Vec<u8> doesn't actually "buffer" so this is
// mostly a behavioural check that no extra padding/headers slip in.
let mut a: Vec<u8> = Vec::new();
let mut b: Vec<u8> = Vec::new();
write_audio_msg_buffered(&mut a, AUDIO_MSG_FT8_DECODE, b"x")
.await
.unwrap();
write_audio_msg(&mut b, AUDIO_MSG_FT8_DECODE, b"x")
.await
.unwrap();
assert_eq!(a, b);
}
}
+238 -82
View File
@@ -227,6 +227,92 @@ impl StreamErrorLogger {
} }
} }
/// Serialise every decoder's history into a single in-memory blob of length-
/// prefixed records (`[u8 type][u32 BE len][JSON payload]`). Returns the blob
/// and the number of records written. Callers chunk + gzip + send the blob
/// over the audio socket; tests inspect it directly.
fn build_history_blob(histories: &DecoderHistories) -> (Vec<u8>, usize) {
// Estimate ~256 bytes per message; pre-allocate to avoid repeated
// reallocation for large histories.
let estimated_msgs = histories.estimated_total_count().min(500_000);
let mut blob: Vec<u8> = Vec::with_capacity(estimated_msgs.saturating_mul(256));
let mut count = 0usize;
macro_rules! push_history {
($msgs:expr, $variant:expr, $msg_type:expr) => {
for item in $msgs {
let wrapped = $variant(item);
if let Ok(json) = serde_json::to_vec(&wrapped) {
let len = json.len() as u32;
blob.push($msg_type);
blob.extend_from_slice(&len.to_be_bytes());
blob.extend_from_slice(&json);
count += 1;
}
}
};
}
push_history!(
histories.snapshot_ais_history(),
DecodedMessage::Ais,
AUDIO_MSG_AIS_DECODE
);
push_history!(
histories.snapshot_vdes_history(),
DecodedMessage::Vdes,
AUDIO_MSG_VDES_DECODE
);
push_history!(
histories.snapshot_aprs_history(),
DecodedMessage::Aprs,
AUDIO_MSG_APRS_DECODE
);
push_history!(
histories.snapshot_hf_aprs_history(),
DecodedMessage::HfAprs,
AUDIO_MSG_HF_APRS_DECODE
);
push_history!(
histories.snapshot_ft8_history(),
DecodedMessage::Ft8,
AUDIO_MSG_FT8_DECODE
);
push_history!(
histories.snapshot_ft4_history(),
DecodedMessage::Ft4,
AUDIO_MSG_FT4_DECODE
);
#[cfg(feature = "ft2")]
push_history!(
histories.snapshot_ft2_history(),
DecodedMessage::Ft2,
AUDIO_MSG_FT2_DECODE
);
push_history!(
histories.snapshot_wspr_history(),
DecodedMessage::Wspr,
AUDIO_MSG_WSPR_DECODE
);
push_history!(
histories.snapshot_cw_history(),
DecodedMessage::Cw,
AUDIO_MSG_CW_DECODE
);
push_history!(
histories.snapshot_lrpt_history(),
DecodedMessage::LrptImage,
AUDIO_MSG_LRPT_IMAGE
);
push_history!(
histories.snapshot_wefax_history(),
DecodedMessage::Wefax,
AUDIO_MSG_WEFAX_DECODE
);
(blob, count)
}
/// Walk a length-prefixed history blob and return the byte ranges of each /// Walk a length-prefixed history blob and return the byte ranges of each
/// chunk to send. Each record is `[u8 type][u32 BE len][payload]`. A chunk is /// chunk to send. Each record is `[u8 type][u32 BE len][payload]`. A chunk is
/// flushed when adding the next record would push it past `threshold`. A /// flushed when adding the next record would push it past `threshold`. A
@@ -3399,88 +3485,7 @@ async fn handle_audio_client(
// can exceed the 16 MiB MAX_HISTORY_PAYLOAD_SIZE the client enforces; // can exceed the 16 MiB MAX_HISTORY_PAYLOAD_SIZE the client enforces;
// chunking keeps each frame well under the limit while still amortising // chunking keeps each frame well under the limit while still amortising
// the gzip cost across many records per chunk. // the gzip cost across many records per chunk.
let history_blob = { let (blob, replayed_history_count) = build_history_blob(&histories);
// Estimate ~256 bytes per message; pre-allocate to avoid repeated
// reallocation for large histories.
let estimated_msgs = histories.estimated_total_count().min(500_000);
let mut blob: Vec<u8> = Vec::with_capacity(estimated_msgs.saturating_mul(256));
let mut count = 0usize;
macro_rules! push_history {
($msgs:expr, $variant:expr, $msg_type:expr) => {
for item in $msgs {
let wrapped = $variant(item);
if let Ok(json) = serde_json::to_vec(&wrapped) {
let len = json.len() as u32;
blob.push($msg_type);
blob.extend_from_slice(&len.to_be_bytes());
blob.extend_from_slice(&json);
count += 1;
}
}
};
}
push_history!(
histories.snapshot_ais_history(),
DecodedMessage::Ais,
AUDIO_MSG_AIS_DECODE
);
push_history!(
histories.snapshot_vdes_history(),
DecodedMessage::Vdes,
AUDIO_MSG_VDES_DECODE
);
push_history!(
histories.snapshot_aprs_history(),
DecodedMessage::Aprs,
AUDIO_MSG_APRS_DECODE
);
push_history!(
histories.snapshot_hf_aprs_history(),
DecodedMessage::HfAprs,
AUDIO_MSG_HF_APRS_DECODE
);
push_history!(
histories.snapshot_ft8_history(),
DecodedMessage::Ft8,
AUDIO_MSG_FT8_DECODE
);
push_history!(
histories.snapshot_ft4_history(),
DecodedMessage::Ft4,
AUDIO_MSG_FT4_DECODE
);
#[cfg(feature = "ft2")]
push_history!(
histories.snapshot_ft2_history(),
DecodedMessage::Ft2,
AUDIO_MSG_FT2_DECODE
);
push_history!(
histories.snapshot_wspr_history(),
DecodedMessage::Wspr,
AUDIO_MSG_WSPR_DECODE
);
push_history!(
histories.snapshot_cw_history(),
DecodedMessage::Cw,
AUDIO_MSG_CW_DECODE
);
push_history!(
histories.snapshot_lrpt_history(),
DecodedMessage::LrptImage,
AUDIO_MSG_LRPT_IMAGE
);
push_history!(
histories.snapshot_wefax_history(),
DecodedMessage::Wefax,
AUDIO_MSG_WEFAX_DECODE
);
(blob, count)
};
let (blob, replayed_history_count) = history_blob;
if !blob.is_empty() { if !blob.is_empty() {
// Target uncompressed bytes per chunk. JSON compresses ~10-20x so // Target uncompressed bytes per chunk. JSON compresses ~10-20x so
// 8 MiB uncompressed lands well under the 16 MiB compressed cap even // 8 MiB uncompressed lands well under the 16 MiB compressed cap even
@@ -4673,4 +4678,155 @@ mod tests {
assert_eq!(snap_len, THREADS * PER_THREAD); assert_eq!(snap_len, THREADS * PER_THREAD);
assert_eq!(counter, THREADS * PER_THREAD); assert_eq!(counter, THREADS * PER_THREAD);
} }
// -------------------------------------------------------------------
// build_history_blob (integration with DecoderHistories + framing)
// -------------------------------------------------------------------
/// Walk a length-prefixed blob and return `(type, payload)` for every
/// record. Mirrors the parsing the client does on the wire.
fn parse_blob_records(blob: &[u8]) -> Vec<(u8, Vec<u8>)> {
let mut out = Vec::new();
let mut pos = 0usize;
while pos + 5 <= blob.len() {
let msg_type = blob[pos];
let len =
u32::from_be_bytes([blob[pos + 1], blob[pos + 2], blob[pos + 3], blob[pos + 4]])
as usize;
pos += 5;
assert!(pos + len <= blob.len(), "truncated blob");
out.push((msg_type, blob[pos..pos + len].to_vec()));
pos += len;
}
assert_eq!(pos, blob.len(), "trailing bytes in blob");
out
}
fn gunzip(data: &[u8]) -> Vec<u8> {
use flate2::read::GzDecoder;
use std::io::Read;
let mut decoder = GzDecoder::new(data);
let mut out = Vec::new();
decoder.read_to_end(&mut out).expect("gunzip");
out
}
#[test]
fn build_history_blob_empty_when_no_records() {
let h = DecoderHistories::new();
let (blob, count) = build_history_blob(&h);
assert!(blob.is_empty());
assert_eq!(count, 0);
}
#[test]
fn build_history_blob_emits_records_with_correct_type_bytes() {
let h = DecoderHistories::new();
h.record_ais_message(sample_ais(101));
h.record_aprs_packet(sample_aprs(true));
h.record_cw_event(sample_cw("CQ TEST"));
h.record_ft8_message(sample_ft8(42));
h.record_ft4_message(sample_ft8(43));
let (blob, count) = build_history_blob(&h);
assert_eq!(count, 5);
let records = parse_blob_records(&blob);
let types: Vec<u8> = records.iter().map(|(t, _)| *t).collect();
// Decoder iteration order in build_history_blob is fixed:
// AIS, VDES, APRS, HF_APRS, FT8, FT4, [FT2], WSPR, CW, LRPT, WEFAX.
assert_eq!(
types,
vec![
AUDIO_MSG_AIS_DECODE,
AUDIO_MSG_APRS_DECODE,
AUDIO_MSG_FT8_DECODE,
AUDIO_MSG_FT4_DECODE,
AUDIO_MSG_CW_DECODE,
]
);
// Each record body is a JSON-serialised DecodedMessage variant.
for (_, payload) in &records {
let msg: DecodedMessage = serde_json::from_slice(payload).expect("valid JSON");
// Smoke-check: every variant deserialises to one of the expected types.
match msg {
DecodedMessage::Ais(_)
| DecodedMessage::Aprs(_)
| DecodedMessage::Cw(_)
| DecodedMessage::Ft8(_)
| DecodedMessage::Ft4(_) => {}
other => panic!("unexpected variant: {:?}", other),
}
}
}
#[test]
fn build_history_blob_chunk_split_then_decompress_recovers_all_records() {
// End-to-end pipeline: build → split → gzip per chunk → decompress
// each chunk → reassembly must match the original blob byte-for-byte.
let h = DecoderHistories::new();
for i in 0..200 {
h.record_ft8_message(sample_ft8(i));
}
let (blob, count) = build_history_blob(&h);
assert_eq!(count, 200);
// Threshold small enough to force multiple chunks.
let threshold = blob.len() / 4;
let ranges = split_history_chunks(&blob, threshold);
assert!(
ranges.len() >= 2,
"expected multiple chunks, got {}",
ranges.len()
);
let mut reassembled: Vec<u8> = Vec::new();
for r in &ranges {
// Gzip then immediately decompress — proves both directions of
// the on-the-wire transformation are intact.
let slice = &blob[r.clone()];
let mut enc = GzEncoder::new(Vec::new(), Compression::fast());
enc.write_all(slice).unwrap();
let compressed = enc.finish().unwrap();
let decompressed = gunzip(&compressed);
assert_eq!(decompressed, slice);
reassembled.extend_from_slice(&decompressed);
}
assert_eq!(reassembled, blob);
// Each compressed chunk must fit under the wire-side history cap.
for r in &ranges {
let mut enc = GzEncoder::new(Vec::new(), Compression::fast());
enc.write_all(&blob[r.clone()]).unwrap();
let compressed = enc.finish().unwrap();
assert!(
(compressed.len() as u32) < 16 * 1024 * 1024,
"chunk over cap: {} bytes",
compressed.len()
);
}
}
#[test]
fn build_history_blob_skips_decoders_with_no_history() {
let h = DecoderHistories::new();
// Only one decoder gets a record.
h.record_aprs_packet(sample_aprs(true));
let (blob, count) = build_history_blob(&h);
assert_eq!(count, 1);
let records = parse_blob_records(&blob);
assert_eq!(records.len(), 1);
assert_eq!(records[0].0, AUDIO_MSG_APRS_DECODE);
}
#[test]
fn build_history_blob_count_matches_estimated_total_count() {
let h = DecoderHistories::new();
h.record_ais_message(sample_ais(1));
h.record_aprs_packet(sample_aprs(true));
h.record_cw_event(sample_cw("X"));
let (_, count) = build_history_blob(&h);
assert_eq!(count, h.estimated_total_count());
}
} }