[fix](trx-server): chunk audio history replay to stay under 16 MiB cap
The compressed history blob sent on each audio client connect could exceed the client's MAX_HISTORY_PAYLOAD_SIZE (16 MiB) once enough decoder records accumulated, causing the client to reject the frame, drop the connection, and reconnect — producing a 1 Hz reconnect storm. Walk the framed blob at message boundaries and emit one AUDIO_MSG_HISTORY_COMPRESSED per ~8 MiB uncompressed chunk. The split is wire-compatible: clients already loop on read_audio_msg and process each history frame independently. Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
+61
-14
@@ -200,6 +200,41 @@ impl StreamErrorLogger {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Walk a length-prefixed history blob and return the byte ranges of each
|
||||||
|
/// chunk to send. Each record is `[u8 type][u32 BE len][payload]`. A chunk is
|
||||||
|
/// flushed when adding the next record would push it past `threshold`. A
|
||||||
|
/// single record larger than `threshold` becomes its own chunk — the wire-side
|
||||||
|
/// `MAX_HISTORY_PAYLOAD_SIZE` cap is generous enough for any realistic decode.
|
||||||
|
/// A malformed tail (truncated header or payload) is dropped silently; the
|
||||||
|
/// blob is self-built so this is defence in depth, not error reporting.
|
||||||
|
fn split_history_chunks(blob: &[u8], threshold: usize) -> Vec<std::ops::Range<usize>> {
|
||||||
|
let mut chunks = Vec::new();
|
||||||
|
let mut chunk_start = 0usize;
|
||||||
|
let mut pos = 0usize;
|
||||||
|
while pos < blob.len() {
|
||||||
|
let len_end = pos.saturating_add(5);
|
||||||
|
if len_end > blob.len() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let payload_len =
|
||||||
|
u32::from_be_bytes([blob[pos + 1], blob[pos + 2], blob[pos + 3], blob[pos + 4]])
|
||||||
|
as usize;
|
||||||
|
let msg_end = len_end.saturating_add(payload_len);
|
||||||
|
if msg_end > blob.len() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if pos > chunk_start && (msg_end - chunk_start) > threshold {
|
||||||
|
chunks.push(chunk_start..pos);
|
||||||
|
chunk_start = pos;
|
||||||
|
}
|
||||||
|
pos = msg_end;
|
||||||
|
}
|
||||||
|
if chunk_start < pos {
|
||||||
|
chunks.push(chunk_start..pos);
|
||||||
|
}
|
||||||
|
chunks
|
||||||
|
}
|
||||||
|
|
||||||
fn classify_stream_error(err: &str) -> &'static str {
|
fn classify_stream_error(err: &str) -> &'static str {
|
||||||
if err.contains("snd_pcm_poll_descriptors") || err.contains("alsa::poll() returned POLLERR") {
|
if err.contains("snd_pcm_poll_descriptors") || err.contains("alsa::poll() returned POLLERR") {
|
||||||
"alsa_poll_failure"
|
"alsa_poll_failure"
|
||||||
@@ -3392,9 +3427,11 @@ async fn handle_audio_client(
|
|||||||
|
|
||||||
let history_replay_started_at = Instant::now();
|
let history_replay_started_at = Instant::now();
|
||||||
|
|
||||||
// Serialize the entire history into a single in-memory blob so we can
|
// Build one in-memory blob of all history records, then split it into
|
||||||
// send it with one write_all + one flush, avoiding repeated partial
|
// chunks at message boundaries before gzipping. A single oversized blob
|
||||||
// flushes that dominate replay time for large histories.
|
// can exceed the 16 MiB MAX_HISTORY_PAYLOAD_SIZE the client enforces;
|
||||||
|
// chunking keeps each frame well under the limit while still amortising
|
||||||
|
// the gzip cost across many records per chunk.
|
||||||
let history_blob = {
|
let history_blob = {
|
||||||
// Estimate ~256 bytes per message; pre-allocate to avoid repeated
|
// Estimate ~256 bytes per message; pre-allocate to avoid repeated
|
||||||
// reallocation for large histories.
|
// reallocation for large histories.
|
||||||
@@ -3478,24 +3515,34 @@ async fn handle_audio_client(
|
|||||||
};
|
};
|
||||||
let (blob, replayed_history_count) = history_blob;
|
let (blob, replayed_history_count) = history_blob;
|
||||||
if !blob.is_empty() {
|
if !blob.is_empty() {
|
||||||
// Gzip-compress the blob before sending. JSON history compresses very
|
// Target uncompressed bytes per chunk. JSON compresses ~10-20x so
|
||||||
// well (~10-20x) so this dramatically reduces both transfer size and
|
// 8 MiB uncompressed lands well under the 16 MiB compressed cap even
|
||||||
// the time the client spends waiting for data.
|
// in adversarial cases (image-heavy LRPT/WEFAX history, low ratio).
|
||||||
let compressed = {
|
const CHUNK_THRESHOLD_BYTES: usize = 8 * 1024 * 1024;
|
||||||
let mut enc = GzEncoder::new(Vec::with_capacity(blob.len() / 8), Compression::fast());
|
|
||||||
enc.write_all(&blob)
|
let mut total_compressed = 0usize;
|
||||||
|
let mut chunks_sent = 0usize;
|
||||||
|
for range in split_history_chunks(&blob, CHUNK_THRESHOLD_BYTES) {
|
||||||
|
let slice = &blob[range];
|
||||||
|
let mut enc = GzEncoder::new(Vec::with_capacity(slice.len() / 8), Compression::fast());
|
||||||
|
let compressed = enc
|
||||||
|
.write_all(slice)
|
||||||
.and_then(|_| enc.finish())
|
.and_then(|_| enc.finish())
|
||||||
.unwrap_or(blob.clone())
|
.unwrap_or_else(|_| slice.to_vec());
|
||||||
};
|
|
||||||
write_audio_msg(&mut writer, AUDIO_MSG_HISTORY_COMPRESSED, &compressed).await?;
|
write_audio_msg(&mut writer, AUDIO_MSG_HISTORY_COMPRESSED, &compressed).await?;
|
||||||
|
total_compressed += compressed.len();
|
||||||
|
chunks_sent += 1;
|
||||||
|
}
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Audio client {} replayed {} history messages in {:?} ({} → {} bytes, {:.1}x)",
|
"Audio client {} replayed {} history messages in {} chunk(s) in {:?} ({} → {} bytes, {:.1}x)",
|
||||||
peer,
|
peer,
|
||||||
replayed_history_count,
|
replayed_history_count,
|
||||||
|
chunks_sent,
|
||||||
history_replay_started_at.elapsed(),
|
history_replay_started_at.elapsed(),
|
||||||
blob.len(),
|
blob.len(),
|
||||||
compressed.len(),
|
total_compressed,
|
||||||
blob.len() as f64 / compressed.len().max(1) as f64,
|
blob.len() as f64 / total_compressed.max(1) as f64,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user