[feat](trx-server): retain aprs history for new clients
Co-authored-by: Codex <codex@openai.com> Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
@@ -5,6 +5,9 @@
|
||||
//! Audio capture, playback, and TCP streaming for trx-server.
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{collections::VecDeque, sync::Mutex};
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use bytes::Bytes;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
@@ -15,12 +18,47 @@ use trx_core::audio::{
|
||||
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE,
|
||||
AUDIO_MSG_CW_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME,
|
||||
};
|
||||
use trx_core::decode::DecodedMessage;
|
||||
use trx_core::decode::{AprsPacket, DecodedMessage};
|
||||
use trx_core::rig::state::{RigMode, RigState};
|
||||
|
||||
use crate::config::AudioConfig;
|
||||
use crate::decode;
|
||||
|
||||
const APRS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
|
||||
fn aprs_history() -> &'static Mutex<VecDeque<(Instant, AprsPacket)>> {
|
||||
static HISTORY: OnceLock<Mutex<VecDeque<(Instant, AprsPacket)>>> = OnceLock::new();
|
||||
HISTORY.get_or_init(|| Mutex::new(VecDeque::new()))
|
||||
}
|
||||
|
||||
fn prune_aprs_history(history: &mut VecDeque<(Instant, AprsPacket)>) {
|
||||
let cutoff = Instant::now() - APRS_HISTORY_RETENTION;
|
||||
while let Some((ts, _)) = history.front() {
|
||||
if *ts < cutoff {
|
||||
history.pop_front();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record_aprs_packet(pkt: AprsPacket) {
|
||||
let mut history = aprs_history().lock().expect("aprs history mutex poisoned");
|
||||
history.push_back((Instant::now(), pkt));
|
||||
prune_aprs_history(&mut history);
|
||||
}
|
||||
|
||||
pub fn snapshot_aprs_history() -> Vec<AprsPacket> {
|
||||
let mut history = aprs_history().lock().expect("aprs history mutex poisoned");
|
||||
prune_aprs_history(&mut history);
|
||||
history.iter().map(|(_, pkt)| pkt.clone()).collect()
|
||||
}
|
||||
|
||||
pub fn clear_aprs_history() {
|
||||
let mut history = aprs_history().lock().expect("aprs history mutex poisoned");
|
||||
history.clear();
|
||||
}
|
||||
|
||||
/// Spawn the audio capture thread.
|
||||
///
|
||||
/// Opens the configured input device via cpal, accumulates PCM samples into
|
||||
@@ -322,6 +360,7 @@ pub async fn run_aprs_decoder(
|
||||
};
|
||||
|
||||
for pkt in decoder.process_samples(&mono) {
|
||||
record_aprs_packet(pkt.clone());
|
||||
let _ = decode_tx.send(DecodedMessage::Aprs(pkt));
|
||||
}
|
||||
}
|
||||
@@ -440,6 +479,16 @@ async fn handle_audio_client(
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
|
||||
write_audio_msg(&mut writer, AUDIO_MSG_STREAM_INFO, &info_json).await?;
|
||||
|
||||
// Send APRS history to newly connected client.
|
||||
let history = snapshot_aprs_history();
|
||||
for pkt in history {
|
||||
let msg = DecodedMessage::Aprs(pkt);
|
||||
let msg_type = AUDIO_MSG_APRS_DECODE;
|
||||
if let Ok(json) = serde_json::to_vec(&msg) {
|
||||
write_audio_msg(&mut writer, msg_type, &json).await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn RX + decode forwarding task (shares the writer)
|
||||
let mut rx_sub = rx_audio.subscribe();
|
||||
let mut decode_sub = decode_tx.subscribe();
|
||||
|
||||
@@ -23,6 +23,7 @@ use trx_core::rig::state::{RigMode, RigSnapshot, RigState};
|
||||
use trx_core::rig::{RigCat, RigControl, RigRxStatus, RigStatus, RigTxStatus};
|
||||
use trx_core::{DynResult, RigError, RigResult};
|
||||
|
||||
use crate::audio;
|
||||
use crate::error::is_invalid_bcd_error;
|
||||
|
||||
/// Configuration for the rig task.
|
||||
@@ -360,6 +361,7 @@ async fn process_command(
|
||||
return snapshot_from(ctx.state);
|
||||
}
|
||||
RigCommand::ResetAprsDecoder => {
|
||||
audio::clear_aprs_history();
|
||||
ctx.state.aprs_decode_reset_seq += 1;
|
||||
let _ = ctx.state_tx.send(ctx.state.clone());
|
||||
return snapshot_from(ctx.state);
|
||||
|
||||
Reference in New Issue
Block a user