[fix](trx-server): optimize replay and stamp APRS history time

Co-authored-by: OpenAI Codex <codex@openai.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-06 11:58:02 +01:00
parent e5b9fb7aca
commit 18d3a00c07
+67 -9
View File
@@ -12,6 +12,7 @@ use std::time::{Duration, Instant};
use bytes::Bytes;
use num_complex::Complex;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc, watch};
use tracing::{error, info, warn};
@@ -19,9 +20,10 @@ use tracing::{error, info, warn};
use trx_ais::AisDecoder;
use trx_aprs::AprsDecoder;
use trx_core::audio::{
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE,
AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO,
AUDIO_MSG_TX_FRAME, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE,
read_audio_msg, write_audio_msg, write_audio_msg_buffered, AudioStreamInfo,
AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE,
AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VDES_DECODE,
AUDIO_MSG_WSPR_DECODE,
};
use trx_core::decode::{
AisMessage, AprsPacket, DecodedMessage, Ft8Message, VdesMessage, WsprMessage,
@@ -44,6 +46,14 @@ const FT8_SAMPLE_RATE: u32 = 12_000;
const DECODE_AUDIO_GATE_RMS: f32 = 2.5e-4;
const AUDIO_STREAM_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(60);
const AUDIO_STREAM_RECOVERY_DELAY: Duration = Duration::from_secs(1);
const HISTORY_REPLAY_FLUSH_INTERVAL: usize = 64;
fn current_timestamp_ms() -> i64 {
match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
Ok(dur) => dur.as_millis() as i64,
Err(_) => 0,
}
}
struct StreamErrorLogger {
label: &'static str,
@@ -838,13 +848,16 @@ pub async fn run_aprs_decoder(
apply_decode_audio_gate(&mut mono);
was_active = true;
for pkt in decoder.process_samples(&mono) {
for mut pkt in decoder.process_samples(&mono) {
if let Some(logger) = decode_logs.as_ref() {
logger.log_aprs(&pkt);
}
if !pkt.crc_ok {
continue;
}
if pkt.ts_ms.is_none() {
pkt.ts_ms = Some(current_timestamp_ms());
}
histories.record_aprs_packet(pkt.clone());
let _ = decode_tx.send(DecodedMessage::Aprs(pkt));
}
@@ -1604,13 +1617,23 @@ async fn handle_audio_client(
let info_json = serde_json::to_vec(&stream_info).map_err(std::io::Error::other)?;
write_audio_msg(&mut writer, AUDIO_MSG_STREAM_INFO, &info_json).await?;
let history_replay_started_at = Instant::now();
let mut replayed_history_count = 0usize;
let mut pending_history_flush = 0usize;
// Send APRS history to newly connected client.
let history = histories.snapshot_ais_history();
for msg in history {
let msg = DecodedMessage::Ais(msg);
let msg_type = AUDIO_MSG_AIS_DECODE;
if let Ok(json) = serde_json::to_vec(&msg) {
write_audio_msg(&mut writer, msg_type, &json).await?;
write_audio_msg_buffered(&mut writer, msg_type, &json).await?;
replayed_history_count += 1;
pending_history_flush += 1;
if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL {
writer.flush().await?;
pending_history_flush = 0;
}
}
}
let history = histories.snapshot_vdes_history();
@@ -1618,7 +1641,13 @@ async fn handle_audio_client(
let msg = DecodedMessage::Vdes(msg);
let msg_type = AUDIO_MSG_VDES_DECODE;
if let Ok(json) = serde_json::to_vec(&msg) {
write_audio_msg(&mut writer, msg_type, &json).await?;
write_audio_msg_buffered(&mut writer, msg_type, &json).await?;
replayed_history_count += 1;
pending_history_flush += 1;
if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL {
writer.flush().await?;
pending_history_flush = 0;
}
}
}
// Send APRS history to newly connected client.
@@ -1627,7 +1656,13 @@ async fn handle_audio_client(
let msg = DecodedMessage::Aprs(pkt);
let msg_type = AUDIO_MSG_APRS_DECODE;
if let Ok(json) = serde_json::to_vec(&msg) {
write_audio_msg(&mut writer, msg_type, &json).await?;
write_audio_msg_buffered(&mut writer, msg_type, &json).await?;
replayed_history_count += 1;
pending_history_flush += 1;
if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL {
writer.flush().await?;
pending_history_flush = 0;
}
}
}
// Send FT8 history to newly connected client.
@@ -1636,7 +1671,13 @@ async fn handle_audio_client(
let msg = DecodedMessage::Ft8(msg);
let msg_type = AUDIO_MSG_FT8_DECODE;
if let Ok(json) = serde_json::to_vec(&msg) {
write_audio_msg(&mut writer, msg_type, &json).await?;
write_audio_msg_buffered(&mut writer, msg_type, &json).await?;
replayed_history_count += 1;
pending_history_flush += 1;
if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL {
writer.flush().await?;
pending_history_flush = 0;
}
}
}
// Send WSPR history to newly connected client.
@@ -1645,9 +1686,26 @@ async fn handle_audio_client(
let msg = DecodedMessage::Wspr(msg);
let msg_type = AUDIO_MSG_WSPR_DECODE;
if let Ok(json) = serde_json::to_vec(&msg) {
write_audio_msg(&mut writer, msg_type, &json).await?;
write_audio_msg_buffered(&mut writer, msg_type, &json).await?;
replayed_history_count += 1;
pending_history_flush += 1;
if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL {
writer.flush().await?;
pending_history_flush = 0;
}
}
}
if pending_history_flush > 0 {
writer.flush().await?;
}
if replayed_history_count > 0 {
info!(
"Audio client {} replayed {} history messages in {:?}",
peer,
replayed_history_count,
history_replay_started_at.elapsed()
);
}
// Spawn RX + decode forwarding task (shares the writer)
let mut rx_sub = rx_audio.subscribe();