From 250300395f4ed015caf1dd7bf93d8a7866c2f382 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sun, 8 Mar 2026 18:39:50 +0100 Subject: [PATCH] [fix](trx-server): batch history replay into single TCP write Replace per-message write + flush-every-64 with a single in-memory blob that is sent via one write_all + one flush. Add estimated_total_count() to DecoderHistories for pre-allocation. Eliminates N/64 partial flushes and repeated small writes that dominated replay latency for large APRS histories. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- src/trx-server/src/audio.rs | 142 +++++++++++++----------------------- 1 file changed, 49 insertions(+), 93 deletions(-) diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index e375c5b..52f0a74 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -20,7 +20,7 @@ use tracing::{error, info, warn}; use trx_ais::AisDecoder; use trx_aprs::AprsDecoder; use trx_core::audio::{ - read_audio_msg, write_audio_msg, write_audio_msg_buffered, AudioStreamInfo, + read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, @@ -47,7 +47,6 @@ const FT8_SAMPLE_RATE: u32 = 12_000; const DECODE_AUDIO_GATE_RMS: f32 = 2.5e-4; const AUDIO_STREAM_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(60); const AUDIO_STREAM_RECOVERY_DELAY: Duration = Duration::from_secs(1); -const HISTORY_REPLAY_FLUSH_INTERVAL: usize = 64; fn current_timestamp_ms() -> i64 { match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { @@ -346,6 +345,18 @@ impl DecoderHistories { .expect("wspr history mutex poisoned") .clear(); } + + /// Returns a quick (non-pruning) estimate of the total number of history + /// entries across all decoders, used for pre-allocating the replay blob. + pub fn estimated_total_count(&self) -> usize { + let ais = self.ais.lock().map(|h| h.len()).unwrap_or(0); + let vdes = self.vdes.lock().map(|h| h.len()).unwrap_or(0); + let aprs = self.aprs.lock().map(|h| h.len()).unwrap_or(0); + let cw = self.cw.lock().map(|h| h.len()).unwrap_or(0); + let ft8 = self.ft8.lock().map(|h| h.len()).unwrap_or(0); + let wspr = self.wspr.lock().map(|h| h.len()).unwrap_or(0); + ais + vdes + aprs + cw + ft8 + wspr + } } /// Spawn the audio capture thread. @@ -1670,99 +1681,44 @@ async fn handle_audio_client( write_audio_msg(&mut writer, AUDIO_MSG_STREAM_INFO, &info_json).await?; let history_replay_started_at = Instant::now(); - let mut replayed_history_count = 0usize; - let mut pending_history_flush = 0usize; - // Send APRS history to newly connected client. - let history = histories.snapshot_ais_history(); - for msg in history { - let msg = DecodedMessage::Ais(msg); - let msg_type = AUDIO_MSG_AIS_DECODE; - if let Ok(json) = serde_json::to_vec(&msg) { - write_audio_msg_buffered(&mut writer, msg_type, &json).await?; - replayed_history_count += 1; - pending_history_flush += 1; - if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL { - writer.flush().await?; - pending_history_flush = 0; - } + // Serialize the entire history into a single in-memory blob so we can + // send it with one write_all + one flush, avoiding repeated partial + // flushes that dominate replay time for large histories. + let history_blob = { + // Estimate ~256 bytes per message; pre-allocate to avoid repeated + // reallocation for large histories. + let estimated_msgs = histories.estimated_total_count(); + let mut blob: Vec = Vec::with_capacity(estimated_msgs.saturating_mul(256)); + let mut count = 0usize; + + macro_rules! push_history { + ($msgs:expr, $variant:expr, $msg_type:expr) => { + for item in $msgs { + let wrapped = $variant(item); + if let Ok(json) = serde_json::to_vec(&wrapped) { + let len = json.len() as u32; + blob.push($msg_type); + blob.extend_from_slice(&len.to_be_bytes()); + blob.extend_from_slice(&json); + count += 1; + } + } + }; } - } - let history = histories.snapshot_vdes_history(); - for msg in history { - let msg = DecodedMessage::Vdes(msg); - let msg_type = AUDIO_MSG_VDES_DECODE; - if let Ok(json) = serde_json::to_vec(&msg) { - write_audio_msg_buffered(&mut writer, msg_type, &json).await?; - replayed_history_count += 1; - pending_history_flush += 1; - if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL { - writer.flush().await?; - pending_history_flush = 0; - } - } - } - // Send APRS history to newly connected client. - let history = histories.snapshot_aprs_history(); - for pkt in history { - let msg = DecodedMessage::Aprs(pkt); - let msg_type = AUDIO_MSG_APRS_DECODE; - if let Ok(json) = serde_json::to_vec(&msg) { - write_audio_msg_buffered(&mut writer, msg_type, &json).await?; - replayed_history_count += 1; - pending_history_flush += 1; - if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL { - writer.flush().await?; - pending_history_flush = 0; - } - } - } - // Send FT8 history to newly connected client. - let history = histories.snapshot_ft8_history(); - for msg in history { - let msg = DecodedMessage::Ft8(msg); - let msg_type = AUDIO_MSG_FT8_DECODE; - if let Ok(json) = serde_json::to_vec(&msg) { - write_audio_msg_buffered(&mut writer, msg_type, &json).await?; - replayed_history_count += 1; - pending_history_flush += 1; - if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL { - writer.flush().await?; - pending_history_flush = 0; - } - } - } - // Send WSPR history to newly connected client. - let history = histories.snapshot_wspr_history(); - for msg in history { - let msg = DecodedMessage::Wspr(msg); - let msg_type = AUDIO_MSG_WSPR_DECODE; - if let Ok(json) = serde_json::to_vec(&msg) { - write_audio_msg_buffered(&mut writer, msg_type, &json).await?; - replayed_history_count += 1; - pending_history_flush += 1; - if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL { - writer.flush().await?; - pending_history_flush = 0; - } - } - } - // Send CW history to newly connected client. - let history = histories.snapshot_cw_history(); - for evt in history { - let msg = DecodedMessage::Cw(evt); - let msg_type = AUDIO_MSG_CW_DECODE; - if let Ok(json) = serde_json::to_vec(&msg) { - write_audio_msg_buffered(&mut writer, msg_type, &json).await?; - replayed_history_count += 1; - pending_history_flush += 1; - if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL { - writer.flush().await?; - pending_history_flush = 0; - } - } - } - if pending_history_flush > 0 { + + push_history!(histories.snapshot_ais_history(), DecodedMessage::Ais, AUDIO_MSG_AIS_DECODE); + push_history!(histories.snapshot_vdes_history(), DecodedMessage::Vdes, AUDIO_MSG_VDES_DECODE); + push_history!(histories.snapshot_aprs_history(), DecodedMessage::Aprs, AUDIO_MSG_APRS_DECODE); + push_history!(histories.snapshot_ft8_history(), DecodedMessage::Ft8, AUDIO_MSG_FT8_DECODE); + push_history!(histories.snapshot_wspr_history(), DecodedMessage::Wspr, AUDIO_MSG_WSPR_DECODE); + push_history!(histories.snapshot_cw_history(), DecodedMessage::Cw, AUDIO_MSG_CW_DECODE); + + (blob, count) + }; + let (blob, replayed_history_count) = history_blob; + if !blob.is_empty() { + writer.write_all(&blob).await?; writer.flush().await?; } if replayed_history_count > 0 {