diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 7e807e8..2f2a04e 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -12,6 +12,7 @@ use std::time::{Duration, Instant}; use bytes::Bytes; use num_complex::Complex; +use tokio::io::AsyncWriteExt; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{broadcast, mpsc, watch}; use tracing::{error, info, warn}; @@ -19,9 +20,10 @@ use tracing::{error, info, warn}; use trx_ais::AisDecoder; use trx_aprs::AprsDecoder; use trx_core::audio::{ - read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, - AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, - AUDIO_MSG_TX_FRAME, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, + read_audio_msg, write_audio_msg, write_audio_msg_buffered, AudioStreamInfo, + AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, + AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VDES_DECODE, + AUDIO_MSG_WSPR_DECODE, }; use trx_core::decode::{ AisMessage, AprsPacket, DecodedMessage, Ft8Message, VdesMessage, WsprMessage, @@ -44,6 +46,14 @@ const FT8_SAMPLE_RATE: u32 = 12_000; const DECODE_AUDIO_GATE_RMS: f32 = 2.5e-4; const AUDIO_STREAM_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(60); const AUDIO_STREAM_RECOVERY_DELAY: Duration = Duration::from_secs(1); +const HISTORY_REPLAY_FLUSH_INTERVAL: usize = 64; + +fn current_timestamp_ms() -> i64 { + match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { + Ok(dur) => dur.as_millis() as i64, + Err(_) => 0, + } +} struct StreamErrorLogger { label: &'static str, @@ -838,13 +848,16 @@ pub async fn run_aprs_decoder( apply_decode_audio_gate(&mut mono); was_active = true; - for pkt in decoder.process_samples(&mono) { + for mut pkt in decoder.process_samples(&mono) { if let Some(logger) = decode_logs.as_ref() { logger.log_aprs(&pkt); } if !pkt.crc_ok { continue; } + if pkt.ts_ms.is_none() { + pkt.ts_ms = Some(current_timestamp_ms()); + } histories.record_aprs_packet(pkt.clone()); let _ = decode_tx.send(DecodedMessage::Aprs(pkt)); } @@ -1604,13 +1617,23 @@ async fn handle_audio_client( let info_json = serde_json::to_vec(&stream_info).map_err(std::io::Error::other)?; write_audio_msg(&mut writer, AUDIO_MSG_STREAM_INFO, &info_json).await?; + let history_replay_started_at = Instant::now(); + let mut replayed_history_count = 0usize; + let mut pending_history_flush = 0usize; + // Send APRS history to newly connected client. let history = histories.snapshot_ais_history(); for msg in history { let msg = DecodedMessage::Ais(msg); let msg_type = AUDIO_MSG_AIS_DECODE; if let Ok(json) = serde_json::to_vec(&msg) { - write_audio_msg(&mut writer, msg_type, &json).await?; + write_audio_msg_buffered(&mut writer, msg_type, &json).await?; + replayed_history_count += 1; + pending_history_flush += 1; + if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL { + writer.flush().await?; + pending_history_flush = 0; + } } } let history = histories.snapshot_vdes_history(); @@ -1618,7 +1641,13 @@ async fn handle_audio_client( let msg = DecodedMessage::Vdes(msg); let msg_type = AUDIO_MSG_VDES_DECODE; if let Ok(json) = serde_json::to_vec(&msg) { - write_audio_msg(&mut writer, msg_type, &json).await?; + write_audio_msg_buffered(&mut writer, msg_type, &json).await?; + replayed_history_count += 1; + pending_history_flush += 1; + if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL { + writer.flush().await?; + pending_history_flush = 0; + } } } // Send APRS history to newly connected client. @@ -1627,7 +1656,13 @@ async fn handle_audio_client( let msg = DecodedMessage::Aprs(pkt); let msg_type = AUDIO_MSG_APRS_DECODE; if let Ok(json) = serde_json::to_vec(&msg) { - write_audio_msg(&mut writer, msg_type, &json).await?; + write_audio_msg_buffered(&mut writer, msg_type, &json).await?; + replayed_history_count += 1; + pending_history_flush += 1; + if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL { + writer.flush().await?; + pending_history_flush = 0; + } } } // Send FT8 history to newly connected client. @@ -1636,7 +1671,13 @@ async fn handle_audio_client( let msg = DecodedMessage::Ft8(msg); let msg_type = AUDIO_MSG_FT8_DECODE; if let Ok(json) = serde_json::to_vec(&msg) { - write_audio_msg(&mut writer, msg_type, &json).await?; + write_audio_msg_buffered(&mut writer, msg_type, &json).await?; + replayed_history_count += 1; + pending_history_flush += 1; + if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL { + writer.flush().await?; + pending_history_flush = 0; + } } } // Send WSPR history to newly connected client. @@ -1645,9 +1686,26 @@ async fn handle_audio_client( let msg = DecodedMessage::Wspr(msg); let msg_type = AUDIO_MSG_WSPR_DECODE; if let Ok(json) = serde_json::to_vec(&msg) { - write_audio_msg(&mut writer, msg_type, &json).await?; + write_audio_msg_buffered(&mut writer, msg_type, &json).await?; + replayed_history_count += 1; + pending_history_flush += 1; + if pending_history_flush >= HISTORY_REPLAY_FLUSH_INTERVAL { + writer.flush().await?; + pending_history_flush = 0; + } } } + if pending_history_flush > 0 { + writer.flush().await?; + } + if replayed_history_count > 0 { + info!( + "Audio client {} replayed {} history messages in {:?}", + peer, + replayed_history_count, + history_replay_started_at.elapsed() + ); + } // Spawn RX + decode forwarding task (shares the writer) let mut rx_sub = rx_audio.subscribe();