Compare commits
2 Commits
2e3c36f776
...
5919d4d4c0
| Author | SHA1 | Date | |
|---|---|---|---|
|
5919d4d4c0
|
|||
|
81ec8191a1
|
@@ -193,3 +193,213 @@ pub fn parse_vchan_uuid_msg(payload: &[u8]) -> std::io::Result<Uuid> {
|
||||
}
|
||||
Ok(Uuid::from_bytes(payload[..16].try_into().unwrap()))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio::io::BufReader;
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_then_read_round_trip_preserves_type_and_payload() {
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
let payload = b"hello, world";
|
||||
write_audio_msg(&mut buf, AUDIO_MSG_FT8_DECODE, payload)
|
||||
.await
|
||||
.unwrap();
|
||||
// Wire bytes: 1 byte type + 4 bytes BE length + payload.
|
||||
assert_eq!(buf.len(), 1 + 4 + payload.len());
|
||||
assert_eq!(buf[0], AUDIO_MSG_FT8_DECODE);
|
||||
assert_eq!(
|
||||
u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]),
|
||||
payload.len() as u32
|
||||
);
|
||||
assert_eq!(&buf[5..], payload);
|
||||
|
||||
let mut reader = BufReader::new(&buf[..]);
|
||||
let (msg_type, got) = read_audio_msg(&mut reader).await.unwrap();
|
||||
assert_eq!(msg_type, AUDIO_MSG_FT8_DECODE);
|
||||
assert_eq!(got, payload);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_then_read_handles_empty_payload() {
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
write_audio_msg(&mut buf, AUDIO_MSG_RX_FRAME, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
let mut reader = BufReader::new(&buf[..]);
|
||||
let (msg_type, got) = read_audio_msg(&mut reader).await.unwrap();
|
||||
assert_eq!(msg_type, AUDIO_MSG_RX_FRAME);
|
||||
assert!(got.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_audio_msg_decodes_consecutive_frames() {
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
write_audio_msg(&mut buf, AUDIO_MSG_FT8_DECODE, b"a")
|
||||
.await
|
||||
.unwrap();
|
||||
write_audio_msg(&mut buf, AUDIO_MSG_FT4_DECODE, b"bb")
|
||||
.await
|
||||
.unwrap();
|
||||
write_audio_msg(&mut buf, AUDIO_MSG_AIS_DECODE, b"ccc")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut reader = BufReader::new(&buf[..]);
|
||||
let (t, p) = read_audio_msg(&mut reader).await.unwrap();
|
||||
assert_eq!((t, p.as_slice()), (AUDIO_MSG_FT8_DECODE, b"a".as_slice()));
|
||||
let (t, p) = read_audio_msg(&mut reader).await.unwrap();
|
||||
assert_eq!((t, p.as_slice()), (AUDIO_MSG_FT4_DECODE, b"bb".as_slice()));
|
||||
let (t, p) = read_audio_msg(&mut reader).await.unwrap();
|
||||
assert_eq!((t, p.as_slice()), (AUDIO_MSG_AIS_DECODE, b"ccc".as_slice()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_audio_msg_eof_before_header_returns_unexpected_eof() {
|
||||
let buf: Vec<u8> = Vec::new();
|
||||
let mut reader = BufReader::new(&buf[..]);
|
||||
let err = read_audio_msg(&mut reader).await.unwrap_err();
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_audio_msg_eof_mid_payload_returns_unexpected_eof() {
|
||||
// Header claims 16 bytes; only 4 follow.
|
||||
let mut buf: Vec<u8> = vec![AUDIO_MSG_FT8_DECODE];
|
||||
buf.extend_from_slice(&16u32.to_be_bytes());
|
||||
buf.extend_from_slice(&[0xAA; 4]);
|
||||
let mut reader = BufReader::new(&buf[..]);
|
||||
let err = read_audio_msg(&mut reader).await.unwrap_err();
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_audio_msg_rejects_oversize_normal_frame() {
|
||||
// Type ≠ HISTORY_COMPRESSED → cap is MAX_PAYLOAD_SIZE (1 MiB). Claim
|
||||
// 2 MiB and leave the body absent — we should fail before reading.
|
||||
let mut buf: Vec<u8> = vec![AUDIO_MSG_FT8_DECODE];
|
||||
buf.extend_from_slice(&(2 * 1024 * 1024u32).to_be_bytes());
|
||||
let mut reader = BufReader::new(&buf[..]);
|
||||
let err = read_audio_msg(&mut reader).await.unwrap_err();
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_audio_msg_history_frame_allows_larger_payload() {
|
||||
// 2 MiB exceeds MAX_PAYLOAD_SIZE but is below MAX_HISTORY_PAYLOAD_SIZE.
|
||||
// Reading should succeed when the type is HISTORY_COMPRESSED.
|
||||
let payload = vec![0xCDu8; 2 * 1024 * 1024];
|
||||
let mut buf: Vec<u8> = Vec::with_capacity(5 + payload.len());
|
||||
write_audio_msg(&mut buf, AUDIO_MSG_HISTORY_COMPRESSED, &payload)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut reader = BufReader::new(&buf[..]);
|
||||
let (msg_type, got) = read_audio_msg(&mut reader).await.unwrap();
|
||||
assert_eq!(msg_type, AUDIO_MSG_HISTORY_COMPRESSED);
|
||||
assert_eq!(got.len(), payload.len());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_audio_msg_history_frame_rejects_above_history_cap() {
|
||||
// Claim 32 MiB, body absent. Should still fail (cap enforced even for
|
||||
// history-compressed type).
|
||||
let mut buf: Vec<u8> = vec![AUDIO_MSG_HISTORY_COMPRESSED];
|
||||
buf.extend_from_slice(&(32 * 1024 * 1024u32).to_be_bytes());
|
||||
let mut reader = BufReader::new(&buf[..]);
|
||||
let err = read_audio_msg(&mut reader).await.unwrap_err();
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn vchan_uuid_msg_round_trip() {
|
||||
let uuid = Uuid::new_v4();
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
write_vchan_uuid_msg(&mut buf, AUDIO_MSG_VCHAN_SUB, uuid)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut reader = BufReader::new(&buf[..]);
|
||||
let (msg_type, payload) = read_audio_msg(&mut reader).await.unwrap();
|
||||
assert_eq!(msg_type, AUDIO_MSG_VCHAN_SUB);
|
||||
let got = parse_vchan_uuid_msg(&payload).unwrap();
|
||||
assert_eq!(got, uuid);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_vchan_uuid_msg_rejects_short_payload() {
|
||||
let err = parse_vchan_uuid_msg(&[0u8; 8]).unwrap_err();
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
|
||||
// Empty payload also rejected.
|
||||
let err = parse_vchan_uuid_msg(&[]).unwrap_err();
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn vchan_audio_frame_round_trip() {
|
||||
let uuid = Uuid::new_v4();
|
||||
let opus = b"\x80\x81\x82 fake opus payload";
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
write_vchan_audio_frame(&mut buf, uuid, opus).await.unwrap();
|
||||
let mut reader = BufReader::new(&buf[..]);
|
||||
let (msg_type, payload) = read_audio_msg(&mut reader).await.unwrap();
|
||||
assert_eq!(msg_type, AUDIO_MSG_RX_FRAME_CH);
|
||||
let (got_uuid, got_opus) = parse_vchan_audio_frame(&payload).unwrap();
|
||||
assert_eq!(got_uuid, uuid);
|
||||
assert_eq!(got_opus, opus);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_vchan_audio_frame_rejects_short_payload() {
|
||||
let err = parse_vchan_audio_frame(&[0u8; 8]).unwrap_err();
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_vchan_audio_frame_handles_empty_opus() {
|
||||
// Exactly 16 bytes: UUID with no opus payload.
|
||||
let uuid = Uuid::new_v4();
|
||||
let payload = uuid.as_bytes().to_vec();
|
||||
let (got_uuid, got_opus) = parse_vchan_audio_frame(&payload).unwrap();
|
||||
assert_eq!(got_uuid, uuid);
|
||||
assert!(got_opus.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn audio_stream_info_serialises_round_trip() {
|
||||
let info = AudioStreamInfo {
|
||||
sample_rate: 48_000,
|
||||
channels: 2,
|
||||
frame_duration_ms: 20,
|
||||
bitrate_bps: 64_000,
|
||||
};
|
||||
let json = serde_json::to_vec(&info).unwrap();
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
write_audio_msg(&mut buf, AUDIO_MSG_STREAM_INFO, &json)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut reader = BufReader::new(&buf[..]);
|
||||
let (msg_type, payload) = read_audio_msg(&mut reader).await.unwrap();
|
||||
assert_eq!(msg_type, AUDIO_MSG_STREAM_INFO);
|
||||
let parsed: AudioStreamInfo = serde_json::from_slice(&payload).unwrap();
|
||||
assert_eq!(parsed.sample_rate, info.sample_rate);
|
||||
assert_eq!(parsed.channels, info.channels);
|
||||
assert_eq!(parsed.frame_duration_ms, info.frame_duration_ms);
|
||||
assert_eq!(parsed.bitrate_bps, info.bitrate_bps);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_audio_msg_buffered_does_not_flush() {
|
||||
// Smoke test that the buffered variant produces equivalent bytes to
|
||||
// the flushed variant — Vec<u8> doesn't actually "buffer" so this is
|
||||
// mostly a behavioural check that no extra padding/headers slip in.
|
||||
let mut a: Vec<u8> = Vec::new();
|
||||
let mut b: Vec<u8> = Vec::new();
|
||||
write_audio_msg_buffered(&mut a, AUDIO_MSG_FT8_DECODE, b"x")
|
||||
.await
|
||||
.unwrap();
|
||||
write_audio_msg(&mut b, AUDIO_MSG_FT8_DECODE, b"x")
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(a, b);
|
||||
}
|
||||
}
|
||||
|
||||
+238
-82
@@ -227,6 +227,92 @@ impl StreamErrorLogger {
|
||||
}
|
||||
}
|
||||
|
||||
/// Serialise every decoder's history into a single in-memory blob of length-
|
||||
/// prefixed records (`[u8 type][u32 BE len][JSON payload]`). Returns the blob
|
||||
/// and the number of records written. Callers chunk + gzip + send the blob
|
||||
/// over the audio socket; tests inspect it directly.
|
||||
fn build_history_blob(histories: &DecoderHistories) -> (Vec<u8>, usize) {
|
||||
// Estimate ~256 bytes per message; pre-allocate to avoid repeated
|
||||
// reallocation for large histories.
|
||||
let estimated_msgs = histories.estimated_total_count().min(500_000);
|
||||
let mut blob: Vec<u8> = Vec::with_capacity(estimated_msgs.saturating_mul(256));
|
||||
let mut count = 0usize;
|
||||
|
||||
macro_rules! push_history {
|
||||
($msgs:expr, $variant:expr, $msg_type:expr) => {
|
||||
for item in $msgs {
|
||||
let wrapped = $variant(item);
|
||||
if let Ok(json) = serde_json::to_vec(&wrapped) {
|
||||
let len = json.len() as u32;
|
||||
blob.push($msg_type);
|
||||
blob.extend_from_slice(&len.to_be_bytes());
|
||||
blob.extend_from_slice(&json);
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
push_history!(
|
||||
histories.snapshot_ais_history(),
|
||||
DecodedMessage::Ais,
|
||||
AUDIO_MSG_AIS_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_vdes_history(),
|
||||
DecodedMessage::Vdes,
|
||||
AUDIO_MSG_VDES_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_aprs_history(),
|
||||
DecodedMessage::Aprs,
|
||||
AUDIO_MSG_APRS_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_hf_aprs_history(),
|
||||
DecodedMessage::HfAprs,
|
||||
AUDIO_MSG_HF_APRS_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_ft8_history(),
|
||||
DecodedMessage::Ft8,
|
||||
AUDIO_MSG_FT8_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_ft4_history(),
|
||||
DecodedMessage::Ft4,
|
||||
AUDIO_MSG_FT4_DECODE
|
||||
);
|
||||
#[cfg(feature = "ft2")]
|
||||
push_history!(
|
||||
histories.snapshot_ft2_history(),
|
||||
DecodedMessage::Ft2,
|
||||
AUDIO_MSG_FT2_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_wspr_history(),
|
||||
DecodedMessage::Wspr,
|
||||
AUDIO_MSG_WSPR_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_cw_history(),
|
||||
DecodedMessage::Cw,
|
||||
AUDIO_MSG_CW_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_lrpt_history(),
|
||||
DecodedMessage::LrptImage,
|
||||
AUDIO_MSG_LRPT_IMAGE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_wefax_history(),
|
||||
DecodedMessage::Wefax,
|
||||
AUDIO_MSG_WEFAX_DECODE
|
||||
);
|
||||
|
||||
(blob, count)
|
||||
}
|
||||
|
||||
/// Walk a length-prefixed history blob and return the byte ranges of each
|
||||
/// chunk to send. Each record is `[u8 type][u32 BE len][payload]`. A chunk is
|
||||
/// flushed when adding the next record would push it past `threshold`. A
|
||||
@@ -3399,88 +3485,7 @@ async fn handle_audio_client(
|
||||
// can exceed the 16 MiB MAX_HISTORY_PAYLOAD_SIZE the client enforces;
|
||||
// chunking keeps each frame well under the limit while still amortising
|
||||
// the gzip cost across many records per chunk.
|
||||
let history_blob = {
|
||||
// Estimate ~256 bytes per message; pre-allocate to avoid repeated
|
||||
// reallocation for large histories.
|
||||
let estimated_msgs = histories.estimated_total_count().min(500_000);
|
||||
let mut blob: Vec<u8> = Vec::with_capacity(estimated_msgs.saturating_mul(256));
|
||||
let mut count = 0usize;
|
||||
|
||||
macro_rules! push_history {
|
||||
($msgs:expr, $variant:expr, $msg_type:expr) => {
|
||||
for item in $msgs {
|
||||
let wrapped = $variant(item);
|
||||
if let Ok(json) = serde_json::to_vec(&wrapped) {
|
||||
let len = json.len() as u32;
|
||||
blob.push($msg_type);
|
||||
blob.extend_from_slice(&len.to_be_bytes());
|
||||
blob.extend_from_slice(&json);
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
push_history!(
|
||||
histories.snapshot_ais_history(),
|
||||
DecodedMessage::Ais,
|
||||
AUDIO_MSG_AIS_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_vdes_history(),
|
||||
DecodedMessage::Vdes,
|
||||
AUDIO_MSG_VDES_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_aprs_history(),
|
||||
DecodedMessage::Aprs,
|
||||
AUDIO_MSG_APRS_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_hf_aprs_history(),
|
||||
DecodedMessage::HfAprs,
|
||||
AUDIO_MSG_HF_APRS_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_ft8_history(),
|
||||
DecodedMessage::Ft8,
|
||||
AUDIO_MSG_FT8_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_ft4_history(),
|
||||
DecodedMessage::Ft4,
|
||||
AUDIO_MSG_FT4_DECODE
|
||||
);
|
||||
#[cfg(feature = "ft2")]
|
||||
push_history!(
|
||||
histories.snapshot_ft2_history(),
|
||||
DecodedMessage::Ft2,
|
||||
AUDIO_MSG_FT2_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_wspr_history(),
|
||||
DecodedMessage::Wspr,
|
||||
AUDIO_MSG_WSPR_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_cw_history(),
|
||||
DecodedMessage::Cw,
|
||||
AUDIO_MSG_CW_DECODE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_lrpt_history(),
|
||||
DecodedMessage::LrptImage,
|
||||
AUDIO_MSG_LRPT_IMAGE
|
||||
);
|
||||
push_history!(
|
||||
histories.snapshot_wefax_history(),
|
||||
DecodedMessage::Wefax,
|
||||
AUDIO_MSG_WEFAX_DECODE
|
||||
);
|
||||
|
||||
(blob, count)
|
||||
};
|
||||
let (blob, replayed_history_count) = history_blob;
|
||||
let (blob, replayed_history_count) = build_history_blob(&histories);
|
||||
if !blob.is_empty() {
|
||||
// Target uncompressed bytes per chunk. JSON compresses ~10-20x so
|
||||
// 8 MiB uncompressed lands well under the 16 MiB compressed cap even
|
||||
@@ -4673,4 +4678,155 @@ mod tests {
|
||||
assert_eq!(snap_len, THREADS * PER_THREAD);
|
||||
assert_eq!(counter, THREADS * PER_THREAD);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
// build_history_blob (integration with DecoderHistories + framing)
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
/// Walk a length-prefixed blob and return `(type, payload)` for every
|
||||
/// record. Mirrors the parsing the client does on the wire.
|
||||
fn parse_blob_records(blob: &[u8]) -> Vec<(u8, Vec<u8>)> {
|
||||
let mut out = Vec::new();
|
||||
let mut pos = 0usize;
|
||||
while pos + 5 <= blob.len() {
|
||||
let msg_type = blob[pos];
|
||||
let len =
|
||||
u32::from_be_bytes([blob[pos + 1], blob[pos + 2], blob[pos + 3], blob[pos + 4]])
|
||||
as usize;
|
||||
pos += 5;
|
||||
assert!(pos + len <= blob.len(), "truncated blob");
|
||||
out.push((msg_type, blob[pos..pos + len].to_vec()));
|
||||
pos += len;
|
||||
}
|
||||
assert_eq!(pos, blob.len(), "trailing bytes in blob");
|
||||
out
|
||||
}
|
||||
|
||||
fn gunzip(data: &[u8]) -> Vec<u8> {
|
||||
use flate2::read::GzDecoder;
|
||||
use std::io::Read;
|
||||
let mut decoder = GzDecoder::new(data);
|
||||
let mut out = Vec::new();
|
||||
decoder.read_to_end(&mut out).expect("gunzip");
|
||||
out
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_history_blob_empty_when_no_records() {
|
||||
let h = DecoderHistories::new();
|
||||
let (blob, count) = build_history_blob(&h);
|
||||
assert!(blob.is_empty());
|
||||
assert_eq!(count, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_history_blob_emits_records_with_correct_type_bytes() {
|
||||
let h = DecoderHistories::new();
|
||||
h.record_ais_message(sample_ais(101));
|
||||
h.record_aprs_packet(sample_aprs(true));
|
||||
h.record_cw_event(sample_cw("CQ TEST"));
|
||||
h.record_ft8_message(sample_ft8(42));
|
||||
h.record_ft4_message(sample_ft8(43));
|
||||
|
||||
let (blob, count) = build_history_blob(&h);
|
||||
assert_eq!(count, 5);
|
||||
|
||||
let records = parse_blob_records(&blob);
|
||||
let types: Vec<u8> = records.iter().map(|(t, _)| *t).collect();
|
||||
// Decoder iteration order in build_history_blob is fixed:
|
||||
// AIS, VDES, APRS, HF_APRS, FT8, FT4, [FT2], WSPR, CW, LRPT, WEFAX.
|
||||
assert_eq!(
|
||||
types,
|
||||
vec![
|
||||
AUDIO_MSG_AIS_DECODE,
|
||||
AUDIO_MSG_APRS_DECODE,
|
||||
AUDIO_MSG_FT8_DECODE,
|
||||
AUDIO_MSG_FT4_DECODE,
|
||||
AUDIO_MSG_CW_DECODE,
|
||||
]
|
||||
);
|
||||
|
||||
// Each record body is a JSON-serialised DecodedMessage variant.
|
||||
for (_, payload) in &records {
|
||||
let msg: DecodedMessage = serde_json::from_slice(payload).expect("valid JSON");
|
||||
// Smoke-check: every variant deserialises to one of the expected types.
|
||||
match msg {
|
||||
DecodedMessage::Ais(_)
|
||||
| DecodedMessage::Aprs(_)
|
||||
| DecodedMessage::Cw(_)
|
||||
| DecodedMessage::Ft8(_)
|
||||
| DecodedMessage::Ft4(_) => {}
|
||||
other => panic!("unexpected variant: {:?}", other),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_history_blob_chunk_split_then_decompress_recovers_all_records() {
|
||||
// End-to-end pipeline: build → split → gzip per chunk → decompress
|
||||
// each chunk → reassembly must match the original blob byte-for-byte.
|
||||
let h = DecoderHistories::new();
|
||||
for i in 0..200 {
|
||||
h.record_ft8_message(sample_ft8(i));
|
||||
}
|
||||
let (blob, count) = build_history_blob(&h);
|
||||
assert_eq!(count, 200);
|
||||
|
||||
// Threshold small enough to force multiple chunks.
|
||||
let threshold = blob.len() / 4;
|
||||
let ranges = split_history_chunks(&blob, threshold);
|
||||
assert!(
|
||||
ranges.len() >= 2,
|
||||
"expected multiple chunks, got {}",
|
||||
ranges.len()
|
||||
);
|
||||
|
||||
let mut reassembled: Vec<u8> = Vec::new();
|
||||
for r in &ranges {
|
||||
// Gzip then immediately decompress — proves both directions of
|
||||
// the on-the-wire transformation are intact.
|
||||
let slice = &blob[r.clone()];
|
||||
let mut enc = GzEncoder::new(Vec::new(), Compression::fast());
|
||||
enc.write_all(slice).unwrap();
|
||||
let compressed = enc.finish().unwrap();
|
||||
let decompressed = gunzip(&compressed);
|
||||
assert_eq!(decompressed, slice);
|
||||
reassembled.extend_from_slice(&decompressed);
|
||||
}
|
||||
assert_eq!(reassembled, blob);
|
||||
|
||||
// Each compressed chunk must fit under the wire-side history cap.
|
||||
for r in &ranges {
|
||||
let mut enc = GzEncoder::new(Vec::new(), Compression::fast());
|
||||
enc.write_all(&blob[r.clone()]).unwrap();
|
||||
let compressed = enc.finish().unwrap();
|
||||
assert!(
|
||||
(compressed.len() as u32) < 16 * 1024 * 1024,
|
||||
"chunk over cap: {} bytes",
|
||||
compressed.len()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_history_blob_skips_decoders_with_no_history() {
|
||||
let h = DecoderHistories::new();
|
||||
// Only one decoder gets a record.
|
||||
h.record_aprs_packet(sample_aprs(true));
|
||||
let (blob, count) = build_history_blob(&h);
|
||||
assert_eq!(count, 1);
|
||||
let records = parse_blob_records(&blob);
|
||||
assert_eq!(records.len(), 1);
|
||||
assert_eq!(records[0].0, AUDIO_MSG_APRS_DECODE);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_history_blob_count_matches_estimated_total_count() {
|
||||
let h = DecoderHistories::new();
|
||||
h.record_ais_message(sample_ais(1));
|
||||
h.record_aprs_packet(sample_aprs(true));
|
||||
h.record_cw_event(sample_cw("X"));
|
||||
let (_, count) = build_history_blob(&h);
|
||||
assert_eq!(count, h.estimated_total_count());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user