[feat](trx-client): relay server-side decoded messages to frontends
Handle APRS/CW decode message types (0x03/0x04) in audio_client, deserialize and forward via broadcast channel. Create decode channel and pass to audio client and HTTP frontend. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
@@ -15,9 +15,10 @@ use tokio::time;
|
|||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
|
|
||||||
use trx_core::audio::{
|
use trx_core::audio::{
|
||||||
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO,
|
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE,
|
||||||
AUDIO_MSG_TX_FRAME,
|
AUDIO_MSG_CW_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME,
|
||||||
};
|
};
|
||||||
|
use trx_core::decode::DecodedMessage;
|
||||||
|
|
||||||
/// Run the audio client with auto-reconnect.
|
/// Run the audio client with auto-reconnect.
|
||||||
pub async fn run_audio_client(
|
pub async fn run_audio_client(
|
||||||
@@ -25,6 +26,7 @@ pub async fn run_audio_client(
|
|||||||
rx_tx: broadcast::Sender<Bytes>,
|
rx_tx: broadcast::Sender<Bytes>,
|
||||||
mut tx_rx: mpsc::Receiver<Bytes>,
|
mut tx_rx: mpsc::Receiver<Bytes>,
|
||||||
stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
|
stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
|
||||||
|
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||||
) {
|
) {
|
||||||
let mut reconnect_delay = Duration::from_secs(1);
|
let mut reconnect_delay = Duration::from_secs(1);
|
||||||
|
|
||||||
@@ -34,7 +36,7 @@ pub async fn run_audio_client(
|
|||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
reconnect_delay = Duration::from_secs(1);
|
reconnect_delay = Duration::from_secs(1);
|
||||||
if let Err(e) =
|
if let Err(e) =
|
||||||
handle_audio_connection(stream, &rx_tx, &mut tx_rx, &stream_info_tx).await
|
handle_audio_connection(stream, &rx_tx, &mut tx_rx, &stream_info_tx, &decode_tx).await
|
||||||
{
|
{
|
||||||
warn!("Audio connection dropped: {}", e);
|
warn!("Audio connection dropped: {}", e);
|
||||||
}
|
}
|
||||||
@@ -55,6 +57,7 @@ async fn handle_audio_connection(
|
|||||||
rx_tx: &broadcast::Sender<Bytes>,
|
rx_tx: &broadcast::Sender<Bytes>,
|
||||||
tx_rx: &mut mpsc::Receiver<Bytes>,
|
tx_rx: &mut mpsc::Receiver<Bytes>,
|
||||||
stream_info_tx: &watch::Sender<Option<AudioStreamInfo>>,
|
stream_info_tx: &watch::Sender<Option<AudioStreamInfo>>,
|
||||||
|
decode_tx: &broadcast::Sender<DecodedMessage>,
|
||||||
) -> std::io::Result<()> {
|
) -> std::io::Result<()> {
|
||||||
let (reader, writer) = stream.into_split();
|
let (reader, writer) = stream.into_split();
|
||||||
let mut reader = BufReader::new(reader);
|
let mut reader = BufReader::new(reader);
|
||||||
@@ -78,12 +81,18 @@ async fn handle_audio_connection(
|
|||||||
|
|
||||||
// Spawn RX read task
|
// Spawn RX read task
|
||||||
let rx_tx = rx_tx.clone();
|
let rx_tx = rx_tx.clone();
|
||||||
|
let decode_tx = decode_tx.clone();
|
||||||
let mut rx_handle = tokio::spawn(async move {
|
let mut rx_handle = tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
match read_audio_msg(&mut reader).await {
|
match read_audio_msg(&mut reader).await {
|
||||||
Ok((AUDIO_MSG_RX_FRAME, payload)) => {
|
Ok((AUDIO_MSG_RX_FRAME, payload)) => {
|
||||||
let _ = rx_tx.send(Bytes::from(payload));
|
let _ = rx_tx.send(Bytes::from(payload));
|
||||||
}
|
}
|
||||||
|
Ok((AUDIO_MSG_APRS_DECODE | AUDIO_MSG_CW_DECODE, payload)) => {
|
||||||
|
if let Ok(msg) = serde_json::from_slice::<DecodedMessage>(&payload) {
|
||||||
|
let _ = decode_tx.send(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok((msg_type, _)) => {
|
Ok((msg_type, _)) => {
|
||||||
warn!("Audio client: unexpected message type {}", msg_type);
|
warn!("Audio client: unexpected message type {}", msg_type);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,7 +25,8 @@ use trx_core::rig::{RigControl, RigRxStatus, RigStatus, RigTxStatus};
|
|||||||
use trx_core::radio::freq::Freq;
|
use trx_core::radio::freq::Freq;
|
||||||
use trx_core::DynResult;
|
use trx_core::DynResult;
|
||||||
use trx_frontend::{is_frontend_registered, registered_frontends};
|
use trx_frontend::{is_frontend_registered, registered_frontends};
|
||||||
use trx_frontend_http::{register_frontend as register_http_frontend, set_audio_channels};
|
use trx_core::decode::DecodedMessage;
|
||||||
|
use trx_frontend_http::{register_frontend as register_http_frontend, set_audio_channels, set_decode_channel};
|
||||||
use trx_frontend_http_json::{register_frontend as register_http_json_frontend, set_auth_tokens};
|
use trx_frontend_http_json::{register_frontend as register_http_json_frontend, set_auth_tokens};
|
||||||
use trx_frontend_rigctl::register_frontend as register_rigctl_frontend;
|
use trx_frontend_rigctl::register_frontend as register_rigctl_frontend;
|
||||||
|
|
||||||
@@ -294,16 +295,19 @@ async fn async_init() -> DynResult<AppState> {
|
|||||||
let (rx_audio_tx, _) = broadcast::channel::<Bytes>(256);
|
let (rx_audio_tx, _) = broadcast::channel::<Bytes>(256);
|
||||||
let (tx_audio_tx, tx_audio_rx) = mpsc::channel::<Bytes>(64);
|
let (tx_audio_tx, tx_audio_rx) = mpsc::channel::<Bytes>(64);
|
||||||
let (stream_info_tx, stream_info_rx) = watch::channel::<Option<AudioStreamInfo>>(None);
|
let (stream_info_tx, stream_info_rx) = watch::channel::<Option<AudioStreamInfo>>(None);
|
||||||
|
let (decode_tx, _) = broadcast::channel::<DecodedMessage>(256);
|
||||||
|
|
||||||
let audio_addr = format!("{}:{}", remote_host, cfg.frontends.audio.server_port);
|
let audio_addr = format!("{}:{}", remote_host, cfg.frontends.audio.server_port);
|
||||||
|
|
||||||
set_audio_channels(rx_audio_tx.clone(), tx_audio_tx, stream_info_rx);
|
set_audio_channels(rx_audio_tx.clone(), tx_audio_tx, stream_info_rx);
|
||||||
|
set_decode_channel(decode_tx.clone());
|
||||||
|
|
||||||
tokio::spawn(audio_client::run_audio_client(
|
tokio::spawn(audio_client::run_audio_client(
|
||||||
audio_addr,
|
audio_addr,
|
||||||
rx_audio_tx,
|
rx_audio_tx,
|
||||||
tx_audio_rx,
|
tx_audio_rx,
|
||||||
stream_info_tx,
|
stream_info_tx,
|
||||||
|
decode_tx,
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user