diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index dc8e89f..7f0061a 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -200,6 +200,41 @@ impl StreamErrorLogger { } } +/// Walk a length-prefixed history blob and return the byte ranges of each +/// chunk to send. Each record is `[u8 type][u32 BE len][payload]`. A chunk is +/// flushed when adding the next record would push it past `threshold`. A +/// single record larger than `threshold` becomes its own chunk — the wire-side +/// `MAX_HISTORY_PAYLOAD_SIZE` cap is generous enough for any realistic decode. +/// A malformed tail (truncated header or payload) is dropped silently; the +/// blob is self-built so this is defence in depth, not error reporting. +fn split_history_chunks(blob: &[u8], threshold: usize) -> Vec> { + let mut chunks = Vec::new(); + let mut chunk_start = 0usize; + let mut pos = 0usize; + while pos < blob.len() { + let len_end = pos.saturating_add(5); + if len_end > blob.len() { + break; + } + let payload_len = + u32::from_be_bytes([blob[pos + 1], blob[pos + 2], blob[pos + 3], blob[pos + 4]]) + as usize; + let msg_end = len_end.saturating_add(payload_len); + if msg_end > blob.len() { + break; + } + if pos > chunk_start && (msg_end - chunk_start) > threshold { + chunks.push(chunk_start..pos); + chunk_start = pos; + } + pos = msg_end; + } + if chunk_start < pos { + chunks.push(chunk_start..pos); + } + chunks +} + fn classify_stream_error(err: &str) -> &'static str { if err.contains("snd_pcm_poll_descriptors") || err.contains("alsa::poll() returned POLLERR") { "alsa_poll_failure" @@ -3392,9 +3427,11 @@ async fn handle_audio_client( let history_replay_started_at = Instant::now(); - // Serialize the entire history into a single in-memory blob so we can - // send it with one write_all + one flush, avoiding repeated partial - // flushes that dominate replay time for large histories. + // Build one in-memory blob of all history records, then split it into + // chunks at message boundaries before gzipping. A single oversized blob + // can exceed the 16 MiB MAX_HISTORY_PAYLOAD_SIZE the client enforces; + // chunking keeps each frame well under the limit while still amortising + // the gzip cost across many records per chunk. let history_blob = { // Estimate ~256 bytes per message; pre-allocate to avoid repeated // reallocation for large histories. @@ -3478,24 +3515,34 @@ async fn handle_audio_client( }; let (blob, replayed_history_count) = history_blob; if !blob.is_empty() { - // Gzip-compress the blob before sending. JSON history compresses very - // well (~10-20x) so this dramatically reduces both transfer size and - // the time the client spends waiting for data. - let compressed = { - let mut enc = GzEncoder::new(Vec::with_capacity(blob.len() / 8), Compression::fast()); - enc.write_all(&blob) + // Target uncompressed bytes per chunk. JSON compresses ~10-20x so + // 8 MiB uncompressed lands well under the 16 MiB compressed cap even + // in adversarial cases (image-heavy LRPT/WEFAX history, low ratio). + const CHUNK_THRESHOLD_BYTES: usize = 8 * 1024 * 1024; + + let mut total_compressed = 0usize; + let mut chunks_sent = 0usize; + for range in split_history_chunks(&blob, CHUNK_THRESHOLD_BYTES) { + let slice = &blob[range]; + let mut enc = GzEncoder::new(Vec::with_capacity(slice.len() / 8), Compression::fast()); + let compressed = enc + .write_all(slice) .and_then(|_| enc.finish()) - .unwrap_or(blob.clone()) - }; - write_audio_msg(&mut writer, AUDIO_MSG_HISTORY_COMPRESSED, &compressed).await?; + .unwrap_or_else(|_| slice.to_vec()); + write_audio_msg(&mut writer, AUDIO_MSG_HISTORY_COMPRESSED, &compressed).await?; + total_compressed += compressed.len(); + chunks_sent += 1; + } + info!( - "Audio client {} replayed {} history messages in {:?} ({} → {} bytes, {:.1}x)", + "Audio client {} replayed {} history messages in {} chunk(s) in {:?} ({} → {} bytes, {:.1}x)", peer, replayed_history_count, + chunks_sent, history_replay_started_at.elapsed(), blob.len(), - compressed.len(), - blob.len() as f64 / compressed.len().max(1) as f64, + total_compressed, + blob.len() as f64 / total_compressed.max(1) as f64, ); }