diff --git a/src/trx-client/Cargo.toml b/src/trx-client/Cargo.toml index 5d739ab..3672dbe 100644 --- a/src/trx-client/Cargo.toml +++ b/src/trx-client/Cargo.toml @@ -17,6 +17,7 @@ tracing-subscriber = { workspace = true } clap = { workspace = true, features = ["derive"] } dirs = "6" libloading = "0.8" +bytes = "1" trx-core = { path = "../trx-core" } trx-frontend = { path = "trx-frontend" } trx-frontend-http = { path = "trx-frontend/trx-frontend-http" } diff --git a/src/trx-client/src/audio_client.rs b/src/trx-client/src/audio_client.rs new file mode 100644 index 0000000..da496e2 --- /dev/null +++ b/src/trx-client/src/audio_client.rs @@ -0,0 +1,116 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Audio TCP client that connects to the server's audio port and relays +//! RX/TX Opus frames via broadcast/mpsc channels. + +use std::time::Duration; + +use bytes::Bytes; +use tokio::io::BufReader; +use tokio::net::TcpStream; +use tokio::sync::{broadcast, mpsc, watch}; +use tokio::time; +use tracing::{info, warn}; + +use trx_core::audio::{ + read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, + AUDIO_MSG_TX_FRAME, +}; + +/// Run the audio client with auto-reconnect. +pub async fn run_audio_client( + server_addr: String, + rx_tx: broadcast::Sender, + mut tx_rx: mpsc::Receiver, + stream_info_tx: watch::Sender>, +) { + let mut reconnect_delay = Duration::from_secs(1); + + loop { + info!("Audio client: connecting to {}", server_addr); + match TcpStream::connect(&server_addr).await { + Ok(stream) => { + reconnect_delay = Duration::from_secs(1); + if let Err(e) = + handle_audio_connection(stream, &rx_tx, &mut tx_rx, &stream_info_tx).await + { + warn!("Audio connection dropped: {}", e); + } + } + Err(e) => { + warn!("Audio connect failed: {}", e); + } + } + + let _ = stream_info_tx.send(None); + time::sleep(reconnect_delay).await; + reconnect_delay = (reconnect_delay * 2).min(Duration::from_secs(10)); + } +} + +async fn handle_audio_connection( + stream: TcpStream, + rx_tx: &broadcast::Sender, + tx_rx: &mut mpsc::Receiver, + stream_info_tx: &watch::Sender>, +) -> std::io::Result<()> { + let (reader, writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + let mut writer = tokio::io::BufWriter::new(writer); + + // Read StreamInfo + let (msg_type, payload) = read_audio_msg(&mut reader).await?; + if msg_type != AUDIO_MSG_STREAM_INFO { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "expected StreamInfo as first message", + )); + } + let info: AudioStreamInfo = serde_json::from_slice(&payload) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + info!( + "Audio stream info: {}Hz, {} ch, {}ms", + info.sample_rate, info.channels, info.frame_duration_ms + ); + let _ = stream_info_tx.send(Some(info)); + + // Spawn RX read task + let rx_tx = rx_tx.clone(); + let mut rx_handle = tokio::spawn(async move { + loop { + match read_audio_msg(&mut reader).await { + Ok((AUDIO_MSG_RX_FRAME, payload)) => { + let _ = rx_tx.send(Bytes::from(payload)); + } + Ok((msg_type, _)) => { + warn!("Audio client: unexpected message type {}", msg_type); + } + Err(_) => break, + } + } + }); + + // Forward TX frames to server + loop { + tokio::select! { + packet = tx_rx.recv() => { + match packet { + Some(data) => { + if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_TX_FRAME, &data).await { + warn!("Audio TX write failed: {}", e); + break; + } + } + None => break, + } + } + _ = &mut rx_handle => { + break; + } + } + } + + Ok(()) +} diff --git a/src/trx-client/src/config.rs b/src/trx-client/src/config.rs index 884a1cc..be3468a 100644 --- a/src/trx-client/src/config.rs +++ b/src/trx-client/src/config.rs @@ -79,6 +79,27 @@ pub struct FrontendsConfig { pub http_json: HttpJsonFrontendConfig, /// AppKit (macOS) frontend settings pub appkit: AppKitFrontendConfig, + /// Audio streaming settings + pub audio: AudioClientConfig, +} + +/// Audio streaming client configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct AudioClientConfig { + /// Whether audio streaming is enabled + pub enabled: bool, + /// Audio TCP port on the remote server + pub server_port: u16, +} + +impl Default for AudioClientConfig { + fn default() -> Self { + Self { + enabled: false, + server_port: 4533, + } + } } /// HTTP frontend configuration. @@ -236,6 +257,7 @@ impl ClientConfig { }, http_json: HttpJsonFrontendConfig::default(), appkit: AppKitFrontendConfig { enabled: false }, + audio: AudioClientConfig::default(), }, }; @@ -292,6 +314,8 @@ mod tests { assert_eq!(config.frontends.http_json.port, 0); assert!(config.remote.url.is_none()); assert_eq!(config.remote.poll_interval_ms, 750); + assert!(!config.frontends.audio.enabled); + assert_eq!(config.frontends.audio.server_port, 4533); } #[test] diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index b617c8f..46e957d 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: BSD-2-Clause +mod audio_client; mod config; mod plugins; mod remote_client; @@ -10,18 +11,21 @@ use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::time::Duration; +use bytes::Bytes; use clap::Parser; use tokio::signal; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{broadcast, mpsc, watch}; use tracing::info; +use trx_core::audio::AudioStreamInfo; + use trx_core::rig::request::RigRequest; use trx_core::rig::state::RigState; use trx_core::rig::{RigControl, RigRxStatus, RigStatus, RigTxStatus}; use trx_core::radio::freq::Freq; use trx_core::DynResult; use trx_frontend::{is_frontend_registered, registered_frontends}; -use trx_frontend_http::register_frontend as register_http_frontend; +use trx_frontend_http::{register_frontend as register_http_frontend, set_audio_channels}; use trx_frontend_http_json::{register_frontend as register_http_json_frontend, set_auth_tokens}; use trx_frontend_rigctl::register_frontend as register_rigctl_frontend; @@ -266,6 +270,13 @@ async fn async_init() -> DynResult { }; let (state_tx, state_rx) = watch::channel(initial_state); + // Extract host for audio before moving remote_addr + let remote_host = remote_addr + .split(':') + .next() + .unwrap_or("127.0.0.1") + .to_string(); + let remote_cfg = RemoteClientConfig { addr: remote_addr, token: remote_token, @@ -274,6 +285,24 @@ async fn async_init() -> DynResult { let _remote_handle = tokio::spawn(remote_client::run_remote_client(remote_cfg, rx, state_tx)); + // Audio streaming setup + if cfg.frontends.audio.enabled { + let (rx_audio_tx, _) = broadcast::channel::(256); + let (tx_audio_tx, tx_audio_rx) = mpsc::channel::(64); + let (stream_info_tx, stream_info_rx) = watch::channel::>(None); + + let audio_addr = format!("{}:{}", remote_host, cfg.frontends.audio.server_port); + + set_audio_channels(rx_audio_tx.clone(), tx_audio_tx, stream_info_rx); + + tokio::spawn(audio_client::run_audio_client( + audio_addr, + rx_audio_tx, + tx_audio_rx, + stream_info_tx, + )); + } + // Spawn frontends (skip appkit — it will be driven from main thread) for frontend in &frontends { if frontend == "appkit" {