[fix](trx-client): restore decode history replay
Write compressed audio-history replay directly into the local frontend history buffers so large APRS and AIS replays survive trx-client restart instead of overrunning the live decode broadcast channel. Verification: cargo test -p trx-client --no-run Verification: cargo test -p trx-frontend-http --no-run Co-authored-by: OpenAI Codex <codex@openai.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -54,6 +54,7 @@ pub async fn run_audio_client(
|
|||||||
mut tx_rx: mpsc::Receiver<Bytes>,
|
mut tx_rx: mpsc::Receiver<Bytes>,
|
||||||
stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
|
stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
|
||||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||||
|
replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>,
|
||||||
mut shutdown_rx: watch::Receiver<bool>,
|
mut shutdown_rx: watch::Receiver<bool>,
|
||||||
vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
|
vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
|
||||||
mut vchan_cmd_rx: mpsc::UnboundedReceiver<VChanAudioCmd>,
|
mut vchan_cmd_rx: mpsc::UnboundedReceiver<VChanAudioCmd>,
|
||||||
@@ -97,6 +98,7 @@ pub async fn run_audio_client(
|
|||||||
&mut tx_rx,
|
&mut tx_rx,
|
||||||
&stream_info_tx,
|
&stream_info_tx,
|
||||||
&decode_tx,
|
&decode_tx,
|
||||||
|
replay_history_sink.clone(),
|
||||||
&mut shutdown_rx,
|
&mut shutdown_rx,
|
||||||
&vchan_audio,
|
&vchan_audio,
|
||||||
&mut vchan_cmd_rx,
|
&mut vchan_cmd_rx,
|
||||||
@@ -144,6 +146,7 @@ async fn handle_audio_connection(
|
|||||||
tx_rx: &mut mpsc::Receiver<Bytes>,
|
tx_rx: &mut mpsc::Receiver<Bytes>,
|
||||||
stream_info_tx: &watch::Sender<Option<AudioStreamInfo>>,
|
stream_info_tx: &watch::Sender<Option<AudioStreamInfo>>,
|
||||||
decode_tx: &broadcast::Sender<DecodedMessage>,
|
decode_tx: &broadcast::Sender<DecodedMessage>,
|
||||||
|
replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>,
|
||||||
shutdown_rx: &mut watch::Receiver<bool>,
|
shutdown_rx: &mut watch::Receiver<bool>,
|
||||||
vchan_audio: &Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
|
vchan_audio: &Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
|
||||||
vchan_cmd_rx: &mut mpsc::UnboundedReceiver<VChanAudioCmd>,
|
vchan_cmd_rx: &mut mpsc::UnboundedReceiver<VChanAudioCmd>,
|
||||||
@@ -269,7 +272,9 @@ async fn handle_audio_connection(
|
|||||||
}
|
}
|
||||||
let json = &decompressed[pos..pos + len];
|
let json = &decompressed[pos..pos + len];
|
||||||
if let Ok(msg) = serde_json::from_slice::<DecodedMessage>(json) {
|
if let Ok(msg) = serde_json::from_slice::<DecodedMessage>(json) {
|
||||||
let _ = decode_tx.send(msg);
|
if let Some(ref sink) = replay_history_sink {
|
||||||
|
sink(msg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pos += len;
|
pos += len;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -306,6 +306,66 @@ async fn async_init() -> DynResult<AppState> {
|
|||||||
|
|
||||||
let (vchan_destroyed_tx, _) = broadcast::channel::<uuid::Uuid>(64);
|
let (vchan_destroyed_tx, _) = broadcast::channel::<uuid::Uuid>(64);
|
||||||
frontend_runtime.vchan_destroyed = Some(vchan_destroyed_tx.clone());
|
frontend_runtime.vchan_destroyed = Some(vchan_destroyed_tx.clone());
|
||||||
|
let ais_history = frontend_runtime.ais_history.clone();
|
||||||
|
let vdes_history = frontend_runtime.vdes_history.clone();
|
||||||
|
let aprs_history = frontend_runtime.aprs_history.clone();
|
||||||
|
let hf_aprs_history = frontend_runtime.hf_aprs_history.clone();
|
||||||
|
let cw_history = frontend_runtime.cw_history.clone();
|
||||||
|
let ft8_history = frontend_runtime.ft8_history.clone();
|
||||||
|
let wspr_history = frontend_runtime.wspr_history.clone();
|
||||||
|
let replay_history_sink: Arc<dyn Fn(DecodedMessage) + Send + Sync> =
|
||||||
|
Arc::new(move |msg| {
|
||||||
|
let now = std::time::Instant::now();
|
||||||
|
match msg {
|
||||||
|
DecodedMessage::Ais(mut message) => {
|
||||||
|
if message.ts_ms.is_none() {
|
||||||
|
message.ts_ms = Some(current_timestamp_ms());
|
||||||
|
}
|
||||||
|
if let Ok(mut history) = ais_history.lock() {
|
||||||
|
history.push_back((now, message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DecodedMessage::Vdes(mut message) => {
|
||||||
|
if message.ts_ms.is_none() {
|
||||||
|
message.ts_ms = Some(current_timestamp_ms());
|
||||||
|
}
|
||||||
|
if let Ok(mut history) = vdes_history.lock() {
|
||||||
|
history.push_back((now, message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DecodedMessage::Aprs(mut packet) => {
|
||||||
|
if packet.ts_ms.is_none() {
|
||||||
|
packet.ts_ms = Some(current_timestamp_ms());
|
||||||
|
}
|
||||||
|
if let Ok(mut history) = aprs_history.lock() {
|
||||||
|
history.push_back((now, packet));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DecodedMessage::HfAprs(mut packet) => {
|
||||||
|
if packet.ts_ms.is_none() {
|
||||||
|
packet.ts_ms = Some(current_timestamp_ms());
|
||||||
|
}
|
||||||
|
if let Ok(mut history) = hf_aprs_history.lock() {
|
||||||
|
history.push_back((now, packet));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DecodedMessage::Cw(event) => {
|
||||||
|
if let Ok(mut history) = cw_history.lock() {
|
||||||
|
history.push_back((now, event));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DecodedMessage::Ft8(message) => {
|
||||||
|
if let Ok(mut history) = ft8_history.lock() {
|
||||||
|
history.push_back((now, message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DecodedMessage::Wspr(message) => {
|
||||||
|
if let Ok(mut history) = wspr_history.lock() {
|
||||||
|
history.push_back((now, message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Audio enabled: default port {}, decode channel set",
|
"Audio enabled: default port {}, decode channel set",
|
||||||
@@ -325,6 +385,7 @@ async fn async_init() -> DynResult<AppState> {
|
|||||||
tx_audio_rx,
|
tx_audio_rx,
|
||||||
stream_info_tx,
|
stream_info_tx,
|
||||||
decode_tx,
|
decode_tx,
|
||||||
|
Some(replay_history_sink),
|
||||||
audio_shutdown_rx,
|
audio_shutdown_rx,
|
||||||
vchan_audio_map,
|
vchan_audio_map,
|
||||||
vchan_cmd_rx,
|
vchan_cmd_rx,
|
||||||
@@ -442,3 +503,11 @@ async fn async_init() -> DynResult<AppState> {
|
|||||||
request_tx: tx,
|
request_tx: tx,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn current_timestamp_ms() -> i64 {
|
||||||
|
let millis = std::time::SystemTime::now()
|
||||||
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.as_millis();
|
||||||
|
i64::try_from(millis).unwrap_or(i64::MAX)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user