diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index f8bb378..2aff68a 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -5,6 +5,9 @@ //! Audio capture, playback, and TCP streaming for trx-server. use std::net::SocketAddr; +use std::time::{Duration, Instant}; +use std::{collections::VecDeque, sync::Mutex}; +use std::sync::OnceLock; use bytes::Bytes; use tokio::net::{TcpListener, TcpStream}; @@ -15,12 +18,47 @@ use trx_core::audio::{ read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, }; -use trx_core::decode::DecodedMessage; +use trx_core::decode::{AprsPacket, DecodedMessage}; use trx_core::rig::state::{RigMode, RigState}; use crate::config::AudioConfig; use crate::decode; +const APRS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); + +fn aprs_history() -> &'static Mutex> { + static HISTORY: OnceLock>> = OnceLock::new(); + HISTORY.get_or_init(|| Mutex::new(VecDeque::new())) +} + +fn prune_aprs_history(history: &mut VecDeque<(Instant, AprsPacket)>) { + let cutoff = Instant::now() - APRS_HISTORY_RETENTION; + while let Some((ts, _)) = history.front() { + if *ts < cutoff { + history.pop_front(); + } else { + break; + } + } +} + +pub fn record_aprs_packet(pkt: AprsPacket) { + let mut history = aprs_history().lock().expect("aprs history mutex poisoned"); + history.push_back((Instant::now(), pkt)); + prune_aprs_history(&mut history); +} + +pub fn snapshot_aprs_history() -> Vec { + let mut history = aprs_history().lock().expect("aprs history mutex poisoned"); + prune_aprs_history(&mut history); + history.iter().map(|(_, pkt)| pkt.clone()).collect() +} + +pub fn clear_aprs_history() { + let mut history = aprs_history().lock().expect("aprs history mutex poisoned"); + history.clear(); +} + /// Spawn the audio capture thread. /// /// Opens the configured input device via cpal, accumulates PCM samples into @@ -322,6 +360,7 @@ pub async fn run_aprs_decoder( }; for pkt in decoder.process_samples(&mono) { + record_aprs_packet(pkt.clone()); let _ = decode_tx.send(DecodedMessage::Aprs(pkt)); } } @@ -440,6 +479,16 @@ async fn handle_audio_client( .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; write_audio_msg(&mut writer, AUDIO_MSG_STREAM_INFO, &info_json).await?; + // Send APRS history to newly connected client. + let history = snapshot_aprs_history(); + for pkt in history { + let msg = DecodedMessage::Aprs(pkt); + let msg_type = AUDIO_MSG_APRS_DECODE; + if let Ok(json) = serde_json::to_vec(&msg) { + write_audio_msg(&mut writer, msg_type, &json).await?; + } + } + // Spawn RX + decode forwarding task (shares the writer) let mut rx_sub = rx_audio.subscribe(); let mut decode_sub = decode_tx.subscribe(); diff --git a/src/trx-server/src/rig_task.rs b/src/trx-server/src/rig_task.rs index eeef560..8c8c27a 100644 --- a/src/trx-server/src/rig_task.rs +++ b/src/trx-server/src/rig_task.rs @@ -23,6 +23,7 @@ use trx_core::rig::state::{RigMode, RigSnapshot, RigState}; use trx_core::rig::{RigCat, RigControl, RigRxStatus, RigStatus, RigTxStatus}; use trx_core::{DynResult, RigError, RigResult}; +use crate::audio; use crate::error::is_invalid_bcd_error; /// Configuration for the rig task. @@ -360,6 +361,7 @@ async fn process_command( return snapshot_from(ctx.state); } RigCommand::ResetAprsDecoder => { + audio::clear_aprs_history(); ctx.state.aprs_decode_reset_seq += 1; let _ = ctx.state_tx.send(ctx.state.clone()); return snapshot_from(ctx.state);