[fix](trx-server): batch history replay into single TCP write

Replace per-message write + flush-every-64 with a single in-memory blob
that is sent via one write_all + one flush. Add estimated_total_count()
to DecoderHistories for pre-allocation. Eliminates N/64 partial flushes
and repeated small writes that dominated replay latency for large APRS
histories.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-08 18:39:50 +01:00
parent 6c4853c88c
commit 250300395f
+49 -93
View File
@@ -20,7 +20,7 @@ use tracing::{error, info, warn};
use trx_ais::AisDecoder;
use trx_aprs::AprsDecoder;
use trx_core::audio::{
read_audio_msg, write_audio_msg, write_audio_msg_buffered, AudioStreamInfo,
read_audio_msg, write_audio_msg, AudioStreamInfo,
AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE,
AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VDES_DECODE,
AUDIO_MSG_WSPR_DECODE,
@@ -47,7 +47,6 @@ const FT8_SAMPLE_RATE: u32 = 12_000;
const DECODE_AUDIO_GATE_RMS: f32 = 2.5e-4;
const AUDIO_STREAM_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(60);
const AUDIO_STREAM_RECOVERY_DELAY: Duration = Duration::from_secs(1);
const HISTORY_REPLAY_FLUSH_INTERVAL: usize = 64;
fn current_timestamp_ms() -> i64 {
match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
@@ -346,6 +345,18 @@ impl DecoderHistories {
.expect("wspr history mutex poisoned")
.clear();
}
/// Returns a quick (non-pruning) estimate of the total number of history
/// entries across all decoders, used for pre-allocating the replay blob.
pub fn estimated_total_count(&self) -> usize {
let ais = self.ais.lock().map(|h| h.len()).unwrap_or(0);
let vdes = self.vdes.lock().map(|h| h.len()).unwrap_or(0);
let aprs = self.aprs.lock().map(|h| h.len()).unwrap_or(0);
let cw = self.cw.lock().map(|h| h.len()).unwrap_or(0);
let ft8 = self.ft8.lock().map(|h| h.len()).unwrap_or(0);
let wspr = self.wspr.lock().map(|h| h.len()).unwrap_or(0);
ais + vdes + aprs + cw + ft8 + wspr
}
}
/// Spawn the audio capture thread.
@@ -1670,99 +1681,44 @@ async fn handle_audio_client(
write_audio_msg(&mut writer, AUDIO_MSG_STREAM_INFO, &info_json).await?;
let history_replay_started_at = Instant::now();
let mut replayed_history_count = 0usize;
let mut pending_history_flush = 0usize;
// Send APRS history to newly connected client.
let history = histories.snapshot_ais_history();
for msg in history {
let msg = DecodedMessage::Ais(msg);
let msg_type = AUDIO_MSG_AIS_DECODE;
if let Ok(json) = serde_json::to_vec(&msg) {
write_audio_msg_buffered(&mut writer, msg_type, &json).await?;
replayed_history_count += 1;
pending_history_flush += 1;
if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL {
writer.flush().await?;
pending_history_flush = 0;
}
// Serialize the entire history into a single in-memory blob so we can
// send it with one write_all + one flush, avoiding repeated partial
// flushes that dominate replay time for large histories.
let history_blob = {
// Estimate ~256 bytes per message; pre-allocate to avoid repeated
// reallocation for large histories.
let estimated_msgs = histories.estimated_total_count();
let mut blob: Vec<u8> = Vec::with_capacity(estimated_msgs.saturating_mul(256));
let mut count = 0usize;
macro_rules! push_history {
($msgs:expr, $variant:expr, $msg_type:expr) => {
for item in $msgs {
let wrapped = $variant(item);
if let Ok(json) = serde_json::to_vec(&wrapped) {
let len = json.len() as u32;
blob.push($msg_type);
blob.extend_from_slice(&len.to_be_bytes());
blob.extend_from_slice(&json);
count += 1;
}
}
};
}
}
let history = histories.snapshot_vdes_history();
for msg in history {
let msg = DecodedMessage::Vdes(msg);
let msg_type = AUDIO_MSG_VDES_DECODE;
if let Ok(json) = serde_json::to_vec(&msg) {
write_audio_msg_buffered(&mut writer, msg_type, &json).await?;
replayed_history_count += 1;
pending_history_flush += 1;
if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL {
writer.flush().await?;
pending_history_flush = 0;
}
}
}
// Send APRS history to newly connected client.
let history = histories.snapshot_aprs_history();
for pkt in history {
let msg = DecodedMessage::Aprs(pkt);
let msg_type = AUDIO_MSG_APRS_DECODE;
if let Ok(json) = serde_json::to_vec(&msg) {
write_audio_msg_buffered(&mut writer, msg_type, &json).await?;
replayed_history_count += 1;
pending_history_flush += 1;
if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL {
writer.flush().await?;
pending_history_flush = 0;
}
}
}
// Send FT8 history to newly connected client.
let history = histories.snapshot_ft8_history();
for msg in history {
let msg = DecodedMessage::Ft8(msg);
let msg_type = AUDIO_MSG_FT8_DECODE;
if let Ok(json) = serde_json::to_vec(&msg) {
write_audio_msg_buffered(&mut writer, msg_type, &json).await?;
replayed_history_count += 1;
pending_history_flush += 1;
if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL {
writer.flush().await?;
pending_history_flush = 0;
}
}
}
// Send WSPR history to newly connected client.
let history = histories.snapshot_wspr_history();
for msg in history {
let msg = DecodedMessage::Wspr(msg);
let msg_type = AUDIO_MSG_WSPR_DECODE;
if let Ok(json) = serde_json::to_vec(&msg) {
write_audio_msg_buffered(&mut writer, msg_type, &json).await?;
replayed_history_count += 1;
pending_history_flush += 1;
if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL {
writer.flush().await?;
pending_history_flush = 0;
}
}
}
// Send CW history to newly connected client.
let history = histories.snapshot_cw_history();
for evt in history {
let msg = DecodedMessage::Cw(evt);
let msg_type = AUDIO_MSG_CW_DECODE;
if let Ok(json) = serde_json::to_vec(&msg) {
write_audio_msg_buffered(&mut writer, msg_type, &json).await?;
replayed_history_count += 1;
pending_history_flush += 1;
if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL {
writer.flush().await?;
pending_history_flush = 0;
}
}
}
if pending_history_flush > 0 {
push_history!(histories.snapshot_ais_history(), DecodedMessage::Ais, AUDIO_MSG_AIS_DECODE);
push_history!(histories.snapshot_vdes_history(), DecodedMessage::Vdes, AUDIO_MSG_VDES_DECODE);
push_history!(histories.snapshot_aprs_history(), DecodedMessage::Aprs, AUDIO_MSG_APRS_DECODE);
push_history!(histories.snapshot_ft8_history(), DecodedMessage::Ft8, AUDIO_MSG_FT8_DECODE);
push_history!(histories.snapshot_wspr_history(), DecodedMessage::Wspr, AUDIO_MSG_WSPR_DECODE);
push_history!(histories.snapshot_cw_history(), DecodedMessage::Cw, AUDIO_MSG_CW_DECODE);
(blob, count)
};
let (blob, replayed_history_count) = history_blob;
if !blob.is_empty() {
writer.write_all(&blob).await?;
writer.flush().await?;
}
if replayed_history_count > 0 {