[feat](trx-client): add audio TCP proxy with auto-reconnect
Connect to the server audio TCP port, relay RX Opus frames into a broadcast channel and TX frames from an mpsc channel. Pass audio channels to the HTTP frontend via set_audio_channels. Reconnects with exponential backoff on disconnect. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
@@ -17,6 +17,7 @@ tracing-subscriber = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
dirs = "6"
|
||||
libloading = "0.8"
|
||||
bytes = "1"
|
||||
trx-core = { path = "../trx-core" }
|
||||
trx-frontend = { path = "trx-frontend" }
|
||||
trx-frontend-http = { path = "trx-frontend/trx-frontend-http" }
|
||||
|
||||
@@ -0,0 +1,116 @@
|
||||
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
|
||||
//
|
||||
// SPDX-License-Identifier: BSD-2-Clause
|
||||
|
||||
//! Audio TCP client that connects to the server's audio port and relays
|
||||
//! RX/TX Opus frames via broadcast/mpsc channels.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::Bytes;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::{broadcast, mpsc, watch};
|
||||
use tokio::time;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use trx_core::audio::{
|
||||
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO,
|
||||
AUDIO_MSG_TX_FRAME,
|
||||
};
|
||||
|
||||
/// Run the audio client with auto-reconnect.
|
||||
pub async fn run_audio_client(
|
||||
server_addr: String,
|
||||
rx_tx: broadcast::Sender<Bytes>,
|
||||
mut tx_rx: mpsc::Receiver<Bytes>,
|
||||
stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
|
||||
) {
|
||||
let mut reconnect_delay = Duration::from_secs(1);
|
||||
|
||||
loop {
|
||||
info!("Audio client: connecting to {}", server_addr);
|
||||
match TcpStream::connect(&server_addr).await {
|
||||
Ok(stream) => {
|
||||
reconnect_delay = Duration::from_secs(1);
|
||||
if let Err(e) =
|
||||
handle_audio_connection(stream, &rx_tx, &mut tx_rx, &stream_info_tx).await
|
||||
{
|
||||
warn!("Audio connection dropped: {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Audio connect failed: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
let _ = stream_info_tx.send(None);
|
||||
time::sleep(reconnect_delay).await;
|
||||
reconnect_delay = (reconnect_delay * 2).min(Duration::from_secs(10));
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_audio_connection(
|
||||
stream: TcpStream,
|
||||
rx_tx: &broadcast::Sender<Bytes>,
|
||||
tx_rx: &mut mpsc::Receiver<Bytes>,
|
||||
stream_info_tx: &watch::Sender<Option<AudioStreamInfo>>,
|
||||
) -> std::io::Result<()> {
|
||||
let (reader, writer) = stream.into_split();
|
||||
let mut reader = BufReader::new(reader);
|
||||
let mut writer = tokio::io::BufWriter::new(writer);
|
||||
|
||||
// Read StreamInfo
|
||||
let (msg_type, payload) = read_audio_msg(&mut reader).await?;
|
||||
if msg_type != AUDIO_MSG_STREAM_INFO {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
"expected StreamInfo as first message",
|
||||
));
|
||||
}
|
||||
let info: AudioStreamInfo = serde_json::from_slice(&payload)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
|
||||
info!(
|
||||
"Audio stream info: {}Hz, {} ch, {}ms",
|
||||
info.sample_rate, info.channels, info.frame_duration_ms
|
||||
);
|
||||
let _ = stream_info_tx.send(Some(info));
|
||||
|
||||
// Spawn RX read task
|
||||
let rx_tx = rx_tx.clone();
|
||||
let mut rx_handle = tokio::spawn(async move {
|
||||
loop {
|
||||
match read_audio_msg(&mut reader).await {
|
||||
Ok((AUDIO_MSG_RX_FRAME, payload)) => {
|
||||
let _ = rx_tx.send(Bytes::from(payload));
|
||||
}
|
||||
Ok((msg_type, _)) => {
|
||||
warn!("Audio client: unexpected message type {}", msg_type);
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Forward TX frames to server
|
||||
loop {
|
||||
tokio::select! {
|
||||
packet = tx_rx.recv() => {
|
||||
match packet {
|
||||
Some(data) => {
|
||||
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_TX_FRAME, &data).await {
|
||||
warn!("Audio TX write failed: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
_ = &mut rx_handle => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -79,6 +79,27 @@ pub struct FrontendsConfig {
|
||||
pub http_json: HttpJsonFrontendConfig,
|
||||
/// AppKit (macOS) frontend settings
|
||||
pub appkit: AppKitFrontendConfig,
|
||||
/// Audio streaming settings
|
||||
pub audio: AudioClientConfig,
|
||||
}
|
||||
|
||||
/// Audio streaming client configuration.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct AudioClientConfig {
|
||||
/// Whether audio streaming is enabled
|
||||
pub enabled: bool,
|
||||
/// Audio TCP port on the remote server
|
||||
pub server_port: u16,
|
||||
}
|
||||
|
||||
impl Default for AudioClientConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enabled: false,
|
||||
server_port: 4533,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// HTTP frontend configuration.
|
||||
@@ -236,6 +257,7 @@ impl ClientConfig {
|
||||
},
|
||||
http_json: HttpJsonFrontendConfig::default(),
|
||||
appkit: AppKitFrontendConfig { enabled: false },
|
||||
audio: AudioClientConfig::default(),
|
||||
},
|
||||
};
|
||||
|
||||
@@ -292,6 +314,8 @@ mod tests {
|
||||
assert_eq!(config.frontends.http_json.port, 0);
|
||||
assert!(config.remote.url.is_none());
|
||||
assert_eq!(config.remote.poll_interval_ms, 750);
|
||||
assert!(!config.frontends.audio.enabled);
|
||||
assert_eq!(config.frontends.audio.server_port, 4533);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
//
|
||||
// SPDX-License-Identifier: BSD-2-Clause
|
||||
|
||||
mod audio_client;
|
||||
mod config;
|
||||
mod plugins;
|
||||
mod remote_client;
|
||||
@@ -10,18 +11,21 @@ use std::net::{IpAddr, SocketAddr};
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::Bytes;
|
||||
use clap::Parser;
|
||||
use tokio::signal;
|
||||
use tokio::sync::{mpsc, watch};
|
||||
use tokio::sync::{broadcast, mpsc, watch};
|
||||
use tracing::info;
|
||||
|
||||
use trx_core::audio::AudioStreamInfo;
|
||||
|
||||
use trx_core::rig::request::RigRequest;
|
||||
use trx_core::rig::state::RigState;
|
||||
use trx_core::rig::{RigControl, RigRxStatus, RigStatus, RigTxStatus};
|
||||
use trx_core::radio::freq::Freq;
|
||||
use trx_core::DynResult;
|
||||
use trx_frontend::{is_frontend_registered, registered_frontends};
|
||||
use trx_frontend_http::register_frontend as register_http_frontend;
|
||||
use trx_frontend_http::{register_frontend as register_http_frontend, set_audio_channels};
|
||||
use trx_frontend_http_json::{register_frontend as register_http_json_frontend, set_auth_tokens};
|
||||
use trx_frontend_rigctl::register_frontend as register_rigctl_frontend;
|
||||
|
||||
@@ -266,6 +270,13 @@ async fn async_init() -> DynResult<AppState> {
|
||||
};
|
||||
let (state_tx, state_rx) = watch::channel(initial_state);
|
||||
|
||||
// Extract host for audio before moving remote_addr
|
||||
let remote_host = remote_addr
|
||||
.split(':')
|
||||
.next()
|
||||
.unwrap_or("127.0.0.1")
|
||||
.to_string();
|
||||
|
||||
let remote_cfg = RemoteClientConfig {
|
||||
addr: remote_addr,
|
||||
token: remote_token,
|
||||
@@ -274,6 +285,24 @@ async fn async_init() -> DynResult<AppState> {
|
||||
let _remote_handle =
|
||||
tokio::spawn(remote_client::run_remote_client(remote_cfg, rx, state_tx));
|
||||
|
||||
// Audio streaming setup
|
||||
if cfg.frontends.audio.enabled {
|
||||
let (rx_audio_tx, _) = broadcast::channel::<Bytes>(256);
|
||||
let (tx_audio_tx, tx_audio_rx) = mpsc::channel::<Bytes>(64);
|
||||
let (stream_info_tx, stream_info_rx) = watch::channel::<Option<AudioStreamInfo>>(None);
|
||||
|
||||
let audio_addr = format!("{}:{}", remote_host, cfg.frontends.audio.server_port);
|
||||
|
||||
set_audio_channels(rx_audio_tx.clone(), tx_audio_tx, stream_info_rx);
|
||||
|
||||
tokio::spawn(audio_client::run_audio_client(
|
||||
audio_addr,
|
||||
rx_audio_tx,
|
||||
tx_audio_rx,
|
||||
stream_info_tx,
|
||||
));
|
||||
}
|
||||
|
||||
// Spawn frontends (skip appkit — it will be driven from main thread)
|
||||
for frontend in &frontends {
|
||||
if frontend == "appkit" {
|
||||
|
||||
Reference in New Issue
Block a user