diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 9cd2e16..48f8479 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -136,6 +136,16 @@ Notes: ### `[frontends.audio]` - `enabled` (`bool`, default: `true`) - `server_port` (`u16`, default: `4533`, must be `> 0` when enabled) +- `bridge.enabled` (`bool`, default: `false`): enables local `cpal` audio bridge +- `bridge.rx_output_device` (`string`, optional): exact local playback device name +- `bridge.tx_input_device` (`string`, optional): exact local capture device name +- `bridge.rx_gain` (`float`, default: `1.0`, must be finite and `>= 0`) +- `bridge.tx_gain` (`float`, default: `1.0`, must be finite and `>= 0`) + +Notes: +- The bridge is intended for local WSJT-X integration via virtual audio devices. +- Linux: typically use ALSA loopback (`snd-aloop`). +- macOS: install a virtual CoreAudio device (e.g. BlackHole), then set device names above. ## CLI Override Summary diff --git a/Cargo.lock b/Cargo.lock index 9ddc0e5..d809f3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2222,7 +2222,9 @@ version = "0.1.0" dependencies = [ "bytes", "clap", + "cpal", "dirs", + "opus", "serde", "serde_json", "tokio", diff --git a/src/trx-client/Cargo.toml b/src/trx-client/Cargo.toml index e5da131..052d9b2 100644 --- a/src/trx-client/Cargo.toml +++ b/src/trx-client/Cargo.toml @@ -16,6 +16,8 @@ tracing = { workspace = true } clap = { workspace = true, features = ["derive"] } dirs = "6" bytes = "1" +cpal = "0.15" +opus = "0.3" trx-app = { path = "../trx-app" } trx-core = { path = "../trx-core" } trx-protocol = { path = "../trx-protocol" } diff --git a/src/trx-client/src/audio_bridge.rs b/src/trx-client/src/audio_bridge.rs new file mode 100644 index 0000000..c26a769 --- /dev/null +++ b/src/trx-client/src/audio_bridge.rs @@ -0,0 +1,311 @@ +// SPDX-FileCopyrightText: 2026 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Local audio bridge for trx-client. +//! +//! Bridges remote Opus RX audio to a local output device and captures local +//! input device audio for upstream TX Opus frames. + +use std::collections::VecDeque; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{mpsc as std_mpsc, Arc, Mutex}; +use std::time::Duration; + +use bytes::Bytes; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use tokio::sync::{broadcast, mpsc, watch}; +use tracing::{info, warn}; + +use crate::config::AudioBridgeConfig; +use trx_core::audio::AudioStreamInfo; + +const BRIDGE_RETRY_DELAY: Duration = Duration::from_secs(2); + +pub fn spawn_audio_bridge( + cfg: AudioBridgeConfig, + rx_audio_tx: broadcast::Sender, + tx_audio_tx: mpsc::Sender, + mut stream_info_rx: watch::Receiver>, + mut shutdown_rx: watch::Receiver, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + while !*shutdown_rx.borrow() { + let info = match wait_for_stream_info(&mut stream_info_rx, &mut shutdown_rx).await { + Some(info) => info, + None => return, + }; + + info!( + "Audio bridge: starting with stream {}Hz {}ch {}ms", + info.sample_rate, info.channels, info.frame_duration_ms + ); + + let stop = Arc::new(AtomicBool::new(false)); + let playback_stop = stop.clone(); + let capture_stop = stop.clone(); + + let mut rx_packets = rx_audio_tx.subscribe(); + let (rx_bridge_tx, rx_bridge_rx) = std_mpsc::sync_channel::(128); + let rx_forward_stop = stop.clone(); + let rx_forward = tokio::spawn(async move { + while !rx_forward_stop.load(Ordering::Relaxed) { + match rx_packets.recv().await { + Ok(pkt) => { + let _ = rx_bridge_tx.try_send(pkt); + } + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(_)) => {} + } + } + }); + + let playback_cfg = cfg.clone(); + let playback_info = info.clone(); + let playback = std::thread::spawn(move || { + if let Err(e) = run_playback(playback_cfg, playback_info, rx_bridge_rx, playback_stop) + { + warn!("Audio bridge playback stopped: {}", e); + } + }); + + let capture_cfg = cfg.clone(); + let capture_info = info.clone(); + let tx_audio_tx_clone = tx_audio_tx.clone(); + let capture = std::thread::spawn(move || { + if let Err(e) = + run_capture(capture_cfg, capture_info, tx_audio_tx_clone, capture_stop) + { + warn!("Audio bridge capture stopped: {}", e); + } + }); + + tokio::select! { + _ = shutdown_rx.changed() => {} + changed = stream_info_rx.changed() => { + if changed.is_err() { + break; + } + } + } + + stop.store(true, Ordering::Relaxed); + rx_forward.abort(); + let _ = playback.join(); + let _ = capture.join(); + + if *shutdown_rx.borrow() { + break; + } + tokio::time::sleep(BRIDGE_RETRY_DELAY).await; + } + info!("Audio bridge stopped"); + }) +} + +async fn wait_for_stream_info( + stream_info_rx: &mut watch::Receiver>, + shutdown_rx: &mut watch::Receiver, +) -> Option { + loop { + if *shutdown_rx.borrow() { + return None; + } + if let Some(info) = stream_info_rx.borrow().clone() { + return Some(info); + } + tokio::select! { + changed = stream_info_rx.changed() => { + if changed.is_err() { + return None; + } + } + changed = shutdown_rx.changed() => { + if changed.is_err() || *shutdown_rx.borrow() { + return None; + } + } + } + } +} + +fn run_playback( + cfg: AudioBridgeConfig, + info: AudioStreamInfo, + rx_packets: std_mpsc::Receiver, + stop: Arc, +) -> Result<(), Box> { + let host = cpal::default_host(); + let device = select_output_device(&host, cfg.rx_output_device.as_deref())?; + let stream_cfg = cpal::StreamConfig { + channels: info.channels as u16, + sample_rate: cpal::SampleRate(info.sample_rate), + buffer_size: cpal::BufferSize::Default, + }; + let channels = stream_cfg.channels as usize; + let frame_samples = (info.sample_rate as usize * info.frame_duration_ms as usize / 1000) * channels; + + let opus_channels = match stream_cfg.channels { + 1 => opus::Channels::Mono, + 2 => opus::Channels::Stereo, + _ => return Err(format!("unsupported channel count {}", stream_cfg.channels).into()), + }; + let mut decoder = opus::Decoder::new(info.sample_rate, opus_channels)?; + let mut pcm_buf = vec![0f32; 5760 * channels]; + + let ring = Arc::new(Mutex::new(VecDeque::::with_capacity(frame_samples * 8))); + let ring_cb = ring.clone(); + let rx_gain = cfg.rx_gain.max(0.0); + + let err_stop = stop.clone(); + let stream = device.build_output_stream( + &stream_cfg, + move |data: &mut [f32], _| { + let mut rb = ring_cb.lock().expect("audio playback ring mutex poisoned"); + for sample in data.iter_mut() { + let v = rb.pop_front().unwrap_or(0.0) * rx_gain; + *sample = v.clamp(-1.0, 1.0); + } + }, + move |err| { + warn!("Audio bridge playback stream error: {}", err); + err_stop.store(true, Ordering::Relaxed); + }, + None, + )?; + + stream.play()?; + info!( + "Audio bridge playback active on '{}'", + device.name().unwrap_or_else(|_| "unknown".to_string()) + ); + + while !stop.load(Ordering::Relaxed) { + match rx_packets.recv_timeout(Duration::from_millis(200)) { + Ok(packet) => match decoder.decode_float(&packet, &mut pcm_buf, false) { + Ok(decoded_samples_per_channel) => { + let decoded_total = decoded_samples_per_channel * channels; + let mut rb = ring.lock().expect("audio playback ring mutex poisoned"); + rb.extend(pcm_buf[..decoded_total].iter().copied()); + let max_len = frame_samples * 16; + if rb.len() > max_len { + let drain = rb.len() - max_len; + rb.drain(..drain); + } + } + Err(e) => warn!("Audio bridge Opus RX decode error: {}", e), + }, + Err(std_mpsc::RecvTimeoutError::Timeout) => {} + Err(std_mpsc::RecvTimeoutError::Disconnected) => break, + } + } + + let _ = stream.pause(); + Ok(()) +} + +fn run_capture( + cfg: AudioBridgeConfig, + info: AudioStreamInfo, + tx_audio_tx: mpsc::Sender, + stop: Arc, +) -> Result<(), Box> { + let host = cpal::default_host(); + let device = select_input_device(&host, cfg.tx_input_device.as_deref())?; + let stream_cfg = cpal::StreamConfig { + channels: info.channels as u16, + sample_rate: cpal::SampleRate(info.sample_rate), + buffer_size: cpal::BufferSize::Default, + }; + let channels = stream_cfg.channels as usize; + let frame_samples = (info.sample_rate as usize * info.frame_duration_ms as usize / 1000) * channels; + + let opus_channels = match stream_cfg.channels { + 1 => opus::Channels::Mono, + 2 => opus::Channels::Stereo, + _ => return Err(format!("unsupported channel count {}", stream_cfg.channels).into()), + }; + let mut encoder = opus::Encoder::new(info.sample_rate, opus_channels, opus::Application::Audio)?; + encoder.set_bitrate(opus::Bitrate::Bits(24_000))?; + let mut opus_buf = vec![0u8; 4096]; + + let (sample_tx, sample_rx) = std_mpsc::sync_channel::>(64); + let err_stop = stop.clone(); + let stream = device.build_input_stream( + &stream_cfg, + move |data: &[f32], _| { + let _ = sample_tx.try_send(data.to_vec()); + }, + move |err| { + warn!("Audio bridge capture stream error: {}", err); + err_stop.store(true, Ordering::Relaxed); + }, + None, + )?; + + stream.play()?; + info!( + "Audio bridge capture active on '{}'", + device.name().unwrap_or_else(|_| "unknown".to_string()) + ); + + let tx_gain = cfg.tx_gain.max(0.0); + let mut pcm = Vec::::with_capacity(frame_samples * 2); + + while !stop.load(Ordering::Relaxed) { + match sample_rx.recv_timeout(Duration::from_millis(200)) { + Ok(samples) => { + pcm.extend(samples.into_iter().map(|s| (s * tx_gain).clamp(-1.0, 1.0))); + while pcm.len() >= frame_samples { + let frame: Vec = pcm.drain(..frame_samples).collect(); + match encoder.encode_float(&frame, &mut opus_buf) { + Ok(len) => { + let pkt = Bytes::copy_from_slice(&opus_buf[..len]); + let _ = tx_audio_tx.try_send(pkt); + } + Err(e) => warn!("Audio bridge Opus TX encode error: {}", e), + } + } + } + Err(std_mpsc::RecvTimeoutError::Timeout) => {} + Err(std_mpsc::RecvTimeoutError::Disconnected) => break, + } + } + + let _ = stream.pause(); + Ok(()) +} + +fn select_output_device( + host: &cpal::Host, + preferred_name: Option<&str>, +) -> Result> { + if let Some(name) = preferred_name { + if let Some(device) = host + .output_devices()? + .find(|d| d.name().map(|n| n == name).unwrap_or(false)) + { + return Ok(device); + } + return Err(format!("output device '{}' not found", name).into()); + } + host.default_output_device() + .ok_or_else(|| "no default output device".into()) +} + +fn select_input_device( + host: &cpal::Host, + preferred_name: Option<&str>, +) -> Result> { + if let Some(name) = preferred_name { + if let Some(device) = host + .input_devices()? + .find(|d| d.name().map(|n| n == name).unwrap_or(false)) + { + return Ok(device); + } + return Err(format!("input device '{}' not found", name).into()); + } + host.default_input_device() + .ok_or_else(|| "no default input device".into()) +} diff --git a/src/trx-client/src/config.rs b/src/trx-client/src/config.rs index e9567ff..cdc7ffb 100644 --- a/src/trx-client/src/config.rs +++ b/src/trx-client/src/config.rs @@ -99,6 +99,8 @@ pub struct AudioClientConfig { pub enabled: bool, /// Audio TCP port on the remote server pub server_port: u16, + /// Local audio bridge (virtual device integration) + pub bridge: AudioBridgeConfig, } impl Default for AudioClientConfig { @@ -106,6 +108,35 @@ impl Default for AudioClientConfig { Self { enabled: true, server_port: 4533, + bridge: AudioBridgeConfig::default(), + } + } +} + +/// Local audio bridge configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct AudioBridgeConfig { + /// Enable local cpal bridge between remote stream and local audio devices. + pub enabled: bool, + /// Local output device for remote RX playback. + pub rx_output_device: Option, + /// Local input device for TX uplink capture. + pub tx_input_device: Option, + /// RX playback gain multiplier. + pub rx_gain: f32, + /// TX capture gain multiplier. + pub tx_gain: f32, +} + +impl Default for AudioBridgeConfig { + fn default() -> Self { + Self { + enabled: false, + rx_output_device: None, + tx_input_device: None, + rx_gain: 1.0, + tx_gain: 1.0, } } } @@ -214,6 +245,14 @@ impl ClientConfig { if self.frontends.audio.enabled && self.frontends.audio.server_port == 0 { return Err("[frontends.audio].server_port must be > 0 when enabled".to_string()); } + if !self.frontends.audio.bridge.rx_gain.is_finite() || self.frontends.audio.bridge.rx_gain < 0.0 + { + return Err("[frontends.audio.bridge].rx_gain must be finite and >= 0".to_string()); + } + if !self.frontends.audio.bridge.tx_gain.is_finite() || self.frontends.audio.bridge.tx_gain < 0.0 + { + return Err("[frontends.audio.bridge].tx_gain must be finite and >= 0".to_string()); + } validate_tokens( "[frontends.http_json.auth].tokens", &self.frontends.http_json.auth.tokens, @@ -322,6 +361,9 @@ mod tests { assert_eq!(config.remote.poll_interval_ms, 750); assert!(config.frontends.audio.enabled); assert_eq!(config.frontends.audio.server_port, 4533); + assert!(!config.frontends.audio.bridge.enabled); + assert_eq!(config.frontends.audio.bridge.rx_gain, 1.0); + assert_eq!(config.frontends.audio.bridge.tx_gain, 1.0); } #[test] diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index 2a388dd..f565180 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: BSD-2-Clause mod audio_client; +mod audio_bridge; mod config; mod remote_client; @@ -273,6 +274,29 @@ async fn async_init() -> DynResult { decode_tx, audio_shutdown_rx, ))); + + if cfg.frontends.audio.bridge.enabled { + info!("Audio bridge enabled (local virtual-device integration)"); + task_handles.push(audio_bridge::spawn_audio_bridge( + cfg.frontends.audio.bridge.clone(), + frontend_runtime + .audio_rx + .as_ref() + .expect("audio rx must be set") + .clone(), + frontend_runtime + .audio_tx + .as_ref() + .expect("audio tx must be set") + .clone(), + frontend_runtime + .audio_info + .as_ref() + .expect("audio info must be set") + .clone(), + shutdown_rx.clone(), + )); + } } else { info!("Audio disabled in config, decode will not be available"); } diff --git a/src/trx-client/trx-frontend/trx-frontend-rigctl/src/server.rs b/src/trx-client/trx-frontend/trx-frontend-rigctl/src/server.rs index 47c6eb8..1e1a7b9 100644 --- a/src/trx-client/trx-frontend/trx-frontend-rigctl/src/server.rs +++ b/src/trx-client/trx-frontend/trx-frontend-rigctl/src/server.rs @@ -107,17 +107,18 @@ async fn process_command( rig_tx: &mpsc::Sender, ) -> CommandResult { let mut parts = cmd_line.split_whitespace(); - let Some(op) = parts.next() else { + let Some(raw_op) = parts.next() else { return CommandResult::Reply(err_response("empty command")); }; + let op = raw_op.trim_start_matches('+'); let resp = match op { "q" | "Q" | "\\q" | "\\quit" => return CommandResult::Close, - "f" => match request_snapshot(rig_tx).await { + "f" | "\\get_freq" => match request_snapshot(rig_tx).await { Ok(snapshot) => ok_response([snapshot.status.freq.hz.to_string()]), Err(e) => err_response(&e), }, - "F" => match parts.next().and_then(|s| s.parse::().ok()) { + "F" | "\\set_freq" => match parts.next().and_then(|s| s.parse::().ok()) { Some(freq) => { match send_rig_command(rig_tx, RigCommand::SetFreq(Freq { hz: freq })).await { Ok(_) => ok_only(), @@ -126,14 +127,14 @@ async fn process_command( } None => err_response("expected frequency in Hz"), }, - "m" => match request_snapshot(rig_tx).await { + "m" | "\\get_mode" => match request_snapshot(rig_tx).await { Ok(snapshot) => { let mode = rig_mode_to_str(&snapshot.status.mode); ok_response([mode, "0".to_string()]) } Err(e) => err_response(&e), }, - "M" => { + "M" | "\\set_mode" => { let Some(mode_str) = parts.next() else { return CommandResult::Reply(err_response("expected mode")); }; @@ -143,13 +144,13 @@ async fn process_command( Err(e) => err_response(&e), } } - "t" => match request_snapshot(rig_tx).await { + "t" | "\\get_ptt" => match request_snapshot(rig_tx).await { Ok(snapshot) => { ok_response([if snapshot.status.tx_en { "1" } else { "0" }.to_string()]) } Err(e) => err_response(&e), }, - "T" => match parts.next() { + "T" | "\\set_ptt" => match parts.next() { Some(v) if is_true(v) => match send_rig_command(rig_tx, RigCommand::SetPtt(true)).await { Ok(_) => ok_only(), @@ -163,6 +164,45 @@ async fn process_command( } _ => err_response("expected PTT state (0/1)"), }, + "v" | "\\get_vfo" => match request_snapshot(rig_tx).await { + Ok(snapshot) => ok_response([active_vfo_label(&snapshot)]), + Err(e) => err_response(&e), + }, + "V" | "\\set_vfo" => { + let Some(target) = parts.next() else { + return CommandResult::Reply(err_response("expected VFO (VFOA/VFOB)")); + }; + match set_vfo_target(target, rig_tx).await { + Ok(()) => ok_only(), + Err(e) => err_response(&e), + } + } + "s" | "\\get_split_vfo" => match request_snapshot(rig_tx).await { + Ok(snapshot) => { + // split state, tx vfo + ok_response(["0".to_string(), active_vfo_label(&snapshot)]) + } + Err(e) => err_response(&e), + }, + "S" | "\\set_split_vfo" => match parts.next() { + Some(v) if is_false(v) => ok_only(), + Some(v) if is_true(v) => err_response("split mode not supported"), + _ => err_response("expected split state (0/1)"), + }, + "\\get_info" => { + let snapshot = match current_snapshot(state_rx) { + Some(s) => s, + None => match request_snapshot(rig_tx).await { + Ok(s) => s, + Err(e) => return CommandResult::Reply(err_response(&e)), + }, + }; + let info = format!( + "Model: {} {}; Version: {}", + snapshot.info.manufacturer, snapshot.info.model, snapshot.info.revision + ); + ok_response([info]) + } "\\get_powerstat" | "get_powerstat" => match request_snapshot(rig_tx).await { Ok(snapshot) => { let val = snapshot.enabled.unwrap_or(false); @@ -318,6 +358,46 @@ fn active_vfo_label(snapshot: &RigSnapshot) -> String { }) .unwrap_or_else(|| "VFOA".to_string()) } + +async fn set_vfo_target(target: &str, rig_tx: &mpsc::Sender) -> Result<(), String> { + let desired = normalize_vfo_name(target).ok_or_else(|| "expected VFOA or VFOB".to_string())?; + let snapshot = request_snapshot(rig_tx).await?; + let current = active_vfo_label(&snapshot); + if current == desired { + return Ok(()); + } + + let supports_toggle = snapshot + .info + .capabilities + .num_vfos + >= 2 + && snapshot + .status + .vfo + .as_ref() + .is_some_and(|v| v.entries.len() >= 2); + if !supports_toggle { + return Err("VFO selection not supported".to_string()); + } + + send_rig_command(rig_tx, RigCommand::ToggleVfo).await?; + let after = request_snapshot(rig_tx).await?; + if active_vfo_label(&after) == desired { + Ok(()) + } else { + Err("failed to switch VFO".to_string()) + } +} + +fn normalize_vfo_name(v: &str) -> Option { + match v.trim().to_ascii_uppercase().as_str() { + "VFOA" | "A" => Some("VFOA".to_string()), + "VFOB" | "B" => Some("VFOB".to_string()), + _ => None, + } +} + fn is_true(s: &str) -> bool { matches!(s, "1" | "on" | "ON" | "true" | "True" | "TRUE") } diff --git a/trx-client.toml.example b/trx-client.toml.example index 330adbd..e9e13ea 100644 --- a/trx-client.toml.example +++ b/trx-client.toml.example @@ -47,3 +47,20 @@ listen = "127.0.0.1" port = 0 # List of accepted bearer tokens (empty = no auth) # auth.tokens = ["example-token"] + +[frontends.audio] +# Enable remote audio stream and decode transport +enabled = true +# Remote trx-server audio port +server_port = 4533 + +[frontends.audio.bridge] +# Enable local cpal bridge for WSJT-X virtual audio routing +enabled = false +# Optional exact output device name for RX playback +# rx_output_device = "BlackHole 2ch" +# Optional exact input device name for TX capture +# tx_input_device = "BlackHole 2ch" +# Playback/capture gain multipliers +rx_gain = 1.0 +tx_gain = 1.0