From 50e1c447220202e7af6fc0c247deccb79c11a851 Mon Sep 17 00:00:00 2001 From: Stanislaw Grams Date: Sun, 8 Feb 2026 22:28:48 +0100 Subject: [PATCH] [feat](trx-client): relay server-side decoded messages to frontends Handle APRS/CW decode message types (0x03/0x04) in audio_client, deserialize and forward via broadcast channel. Create decode channel and pass to audio client and HTTP frontend. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Stanislaw Grams --- src/trx-client/src/audio_client.rs | 15 ++++++++++++--- src/trx-client/src/main.rs | 6 +++++- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/trx-client/src/audio_client.rs b/src/trx-client/src/audio_client.rs index da496e2..c74259a 100644 --- a/src/trx-client/src/audio_client.rs +++ b/src/trx-client/src/audio_client.rs @@ -15,9 +15,10 @@ use tokio::time; use tracing::{info, warn}; use trx_core::audio::{ - read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, - AUDIO_MSG_TX_FRAME, + read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE, + AUDIO_MSG_CW_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, }; +use trx_core::decode::DecodedMessage; /// Run the audio client with auto-reconnect. pub async fn run_audio_client( @@ -25,6 +26,7 @@ pub async fn run_audio_client( rx_tx: broadcast::Sender, mut tx_rx: mpsc::Receiver, stream_info_tx: watch::Sender>, + decode_tx: broadcast::Sender, ) { let mut reconnect_delay = Duration::from_secs(1); @@ -34,7 +36,7 @@ pub async fn run_audio_client( Ok(stream) => { reconnect_delay = Duration::from_secs(1); if let Err(e) = - handle_audio_connection(stream, &rx_tx, &mut tx_rx, &stream_info_tx).await + handle_audio_connection(stream, &rx_tx, &mut tx_rx, &stream_info_tx, &decode_tx).await { warn!("Audio connection dropped: {}", e); } @@ -55,6 +57,7 @@ async fn handle_audio_connection( rx_tx: &broadcast::Sender, tx_rx: &mut mpsc::Receiver, stream_info_tx: &watch::Sender>, + decode_tx: &broadcast::Sender, ) -> std::io::Result<()> { let (reader, writer) = stream.into_split(); let mut reader = BufReader::new(reader); @@ -78,12 +81,18 @@ async fn handle_audio_connection( // Spawn RX read task let rx_tx = rx_tx.clone(); + let decode_tx = decode_tx.clone(); let mut rx_handle = tokio::spawn(async move { loop { match read_audio_msg(&mut reader).await { Ok((AUDIO_MSG_RX_FRAME, payload)) => { let _ = rx_tx.send(Bytes::from(payload)); } + Ok((AUDIO_MSG_APRS_DECODE | AUDIO_MSG_CW_DECODE, payload)) => { + if let Ok(msg) = serde_json::from_slice::(&payload) { + let _ = decode_tx.send(msg); + } + } Ok((msg_type, _)) => { warn!("Audio client: unexpected message type {}", msg_type); } diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index f39ee4e..e90f894 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -25,7 +25,8 @@ use trx_core::rig::{RigControl, RigRxStatus, RigStatus, RigTxStatus}; use trx_core::radio::freq::Freq; use trx_core::DynResult; use trx_frontend::{is_frontend_registered, registered_frontends}; -use trx_frontend_http::{register_frontend as register_http_frontend, set_audio_channels}; +use trx_core::decode::DecodedMessage; +use trx_frontend_http::{register_frontend as register_http_frontend, set_audio_channels, set_decode_channel}; use trx_frontend_http_json::{register_frontend as register_http_json_frontend, set_auth_tokens}; use trx_frontend_rigctl::register_frontend as register_rigctl_frontend; @@ -294,16 +295,19 @@ async fn async_init() -> DynResult { let (rx_audio_tx, _) = broadcast::channel::(256); let (tx_audio_tx, tx_audio_rx) = mpsc::channel::(64); let (stream_info_tx, stream_info_rx) = watch::channel::>(None); + let (decode_tx, _) = broadcast::channel::(256); let audio_addr = format!("{}:{}", remote_host, cfg.frontends.audio.server_port); set_audio_channels(rx_audio_tx.clone(), tx_audio_tx, stream_info_rx); + set_decode_channel(decode_tx.clone()); tokio::spawn(audio_client::run_audio_client( audio_addr, rx_audio_tx, tx_audio_rx, stream_info_tx, + decode_tx, )); }