From 6131d7a1d69aa0f53a16e457c53357c79916e49f Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Wed, 11 Mar 2026 08:06:18 +0100 Subject: [PATCH] [feat](trx-rs): per-virtual-channel audio streaming Add end-to-end audio routing for virtual DSP channels: Server (trx-server): - New wire protocol: AUDIO_MSG_RX_FRAME_CH (0x0b), VCHAN_ALLOCATED (0x0c), VCHAN_SUB (0x0d), VCHAN_UNSUB (0x0e), VCHAN_FREQ (0x0f), VCHAN_MODE (0x10), VCHAN_REMOVE (0x11) frame types in trx-core audio.rs - Add frame helpers: write_vchan_uuid_msg, write_vchan_audio_frame, parse_vchan_audio_frame, parse_vchan_uuid_msg - Add ensure_channel_pcm() to VirtualChannelManager trait; implement in SdrVirtualChannelManager with create-or-subscribe semantics using client UUID - Extend audio.rs handle_audio_client: VChanCmd dispatcher, per-channel Opus encoder tasks, VCHAN_SUB/UNSUB/FREQ/MODE/REMOVE reader loop handlers - Thread vchan_manager through run_audio_listener / spawn_rig_audio_stack Client (trx-client): - Add VChanAudioCmd enum to trx-frontend; add vchan_audio and vchan_audio_cmd fields to FrontendRuntimeContext - Extend audio_client: demux AUDIO_MSG_RX_FRAME_CH to per-channel broadcasters, handle VCHAN_ALLOCATED; forward VChanAudioCmd over TCP write loop - Wire vchan_cmd_tx/rx channel in main.rs; pass vchan_audio map to audio_client - ClientChannelManager.set_audio_cmd() / send_audio_cmd(): dispatch Subscribe/Remove/SetFreq/SetMode on allocate/delete/freq/mode operations - Wire audio_cmd sender in server.rs serve() after creating vchan_mgr HTTP frontend: - /audio?channel_id=: route WebSocket to per-channel Opus broadcaster - vchan.js: vchanReconnectAudio() stops/restarts RX audio on channel switch; _audioChannelOverride in app.js selects primary vs virtual WS endpoint - app.js: _audioChannelOverride variable; startRxAudio appends channel param Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- Cargo.lock | 3 + src/trx-client/Cargo.toml | 1 + src/trx-client/src/audio_client.rs | 96 +++++++- src/trx-client/src/main.rs | 7 + src/trx-client/trx-frontend/Cargo.toml | 1 + src/trx-client/trx-frontend/src/lib.rs | 32 +++ .../trx-frontend-http/assets/web/app.js | 8 +- .../assets/web/plugins/vchan.js | 23 +- .../trx-frontend-http/src/audio.rs | 31 ++- .../trx-frontend-http/src/server.rs | 9 + .../trx-frontend-http/src/vchan.rs | 33 +++ src/trx-core/src/audio.rs | 86 +++++++ src/trx-core/src/vchan.rs | 13 ++ src/trx-server/Cargo.toml | 1 + src/trx-server/src/audio.rs | 215 +++++++++++++++++- src/trx-server/src/main.rs | 8 + .../trx-backend-soapysdr/src/vchan_impl.rs | 56 +++++ 17 files changed, 606 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b892812..50ac034 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2535,6 +2535,7 @@ dependencies = [ "trx-frontend-http-json", "trx-frontend-rigctl", "trx-protocol", + "uuid", ] [[package]] @@ -2576,6 +2577,7 @@ dependencies = [ "serde_json", "tokio", "trx-core", + "uuid", ] [[package]] @@ -2678,6 +2680,7 @@ dependencies = [ "trx-protocol", "trx-vdes", "trx-wspr", + "uuid", ] [[package]] diff --git a/src/trx-client/Cargo.toml b/src/trx-client/Cargo.toml index 10c740c..56d8192 100644 --- a/src/trx-client/Cargo.toml +++ b/src/trx-client/Cargo.toml @@ -17,6 +17,7 @@ tracing = { workspace = true } clap = { workspace = true, features = ["derive"] } dirs = "6" bytes = "1" +uuid = { workspace = true } cpal = "0.15" opus = "0.3" trx-app = { path = "../trx-app" } diff --git a/src/trx-client/src/audio_client.rs b/src/trx-client/src/audio_client.rs index d6d60bf..54bd140 100644 --- a/src/trx-client/src/audio_client.rs +++ b/src/trx-client/src/audio_client.rs @@ -6,7 +6,7 @@ //! RX/TX Opus frames via broadcast/mpsc channels. use std::collections::HashMap; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; use bytes::Bytes; @@ -19,13 +19,19 @@ use tokio::time; use tracing::{info, warn}; use trx_frontend::RemoteRigEntry; +use uuid::Uuid; + use trx_core::audio::{ - read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, + parse_vchan_audio_frame, parse_vchan_uuid_msg, read_audio_msg, write_audio_msg, + write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE, - AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, + AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_RX_FRAME_CH, + AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, AUDIO_MSG_VCHAN_FREQ, + AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, }; use trx_core::decode::DecodedMessage; +use trx_frontend::VChanAudioCmd; /// Run the audio client with auto-reconnect. #[allow(clippy::too_many_arguments)] @@ -40,6 +46,8 @@ pub async fn run_audio_client( stream_info_tx: watch::Sender>, decode_tx: broadcast::Sender, mut shutdown_rx: watch::Receiver, + vchan_audio: Arc>>>, + mut vchan_cmd_rx: mpsc::Receiver, ) { let mut reconnect_delay = Duration::from_secs(1); @@ -77,6 +85,8 @@ pub async fn run_audio_client( &stream_info_tx, &decode_tx, &mut shutdown_rx, + &vchan_audio, + &mut vchan_cmd_rx, ) .await { @@ -120,6 +130,8 @@ async fn handle_audio_connection( stream_info_tx: &watch::Sender>, decode_tx: &broadcast::Sender, shutdown_rx: &mut watch::Receiver, + vchan_audio: &Arc>>>, + vchan_cmd_rx: &mut mpsc::Receiver, ) -> std::io::Result<()> { let (reader, writer) = stream.into_split(); let mut reader = BufReader::new(reader); @@ -144,12 +156,34 @@ async fn handle_audio_connection( // Spawn RX read task let rx_tx = rx_tx.clone(); let decode_tx = decode_tx.clone(); + let vchan_audio_rx: Arc>>> = Arc::clone(vchan_audio); let mut rx_handle = tokio::spawn(async move { loop { match read_audio_msg(&mut reader).await { Ok((AUDIO_MSG_RX_FRAME, payload)) => { let _ = rx_tx.send(Bytes::from(payload)); } + Ok((AUDIO_MSG_RX_FRAME_CH, payload)) => { + // Route per-channel Opus frame to the correct broadcaster. + if let Ok((uuid, opus)) = parse_vchan_audio_frame(&payload) { + let pkt = Bytes::copy_from_slice(opus); + if let Ok(map) = vchan_audio_rx.read() { + if let Some(tx) = map.get(&uuid) { + let _ = tx.send(pkt); + } + } + } + } + Ok((AUDIO_MSG_VCHAN_ALLOCATED, payload)) => { + // Server confirmed a virtual channel is ready; ensure a + // broadcaster entry exists in the shared map. + if let Ok(uuid) = parse_vchan_uuid_msg(&payload) { + if let Ok(mut map) = vchan_audio_rx.write() { + map.entry(uuid) + .or_insert_with(|| broadcast::channel::(64).0); + } + } + } Ok((AUDIO_MSG_HISTORY_COMPRESSED, payload)) => { // Decompress gzip blob, then iterate the embedded framed messages. let mut decompressed = Vec::new(); @@ -193,14 +227,14 @@ async fn handle_audio_connection( } } Ok((msg_type, _)) => { - warn!("Audio client: unexpected message type {}", msg_type); + warn!("Audio client: unexpected message type {:#04x}", msg_type); } Err(_) => break, } } }); - // Forward TX frames to server + // Forward TX frames and VChanAudioCmds to server. let mut rig_check = time::interval(Duration::from_millis(500)); loop { tokio::select! { @@ -228,6 +262,58 @@ async fn handle_audio_connection( None => break, } } + cmd = vchan_cmd_rx.recv() => { + match cmd { + Some(VChanAudioCmd::Subscribe { uuid, freq_hz, mode }) => { + let json = serde_json::json!({ + "uuid": uuid.to_string(), + "freq_hz": freq_hz, + "mode": mode, + }); + if let Ok(payload) = serde_json::to_vec(&json) { + if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await { + warn!("Audio vchan SUB write failed: {}", e); + break; + } + } + } + Some(VChanAudioCmd::Unsubscribe(uuid)) => { + if let Err(e) = write_vchan_uuid_msg(&mut writer, AUDIO_MSG_VCHAN_UNSUB, uuid).await { + warn!("Audio vchan UNSUB write failed: {}", e); + break; + } + } + Some(VChanAudioCmd::Remove(uuid)) => { + if let Err(e) = write_vchan_uuid_msg(&mut writer, AUDIO_MSG_VCHAN_REMOVE, uuid).await { + warn!("Audio vchan REMOVE write failed: {}", e); + break; + } + // Clean up local broadcaster. + if let Ok(mut map) = vchan_audio.write() { + map.remove(&uuid); + } + } + Some(VChanAudioCmd::SetFreq { uuid, freq_hz }) => { + let json = serde_json::json!({ "uuid": uuid.to_string(), "freq_hz": freq_hz }); + if let Ok(payload) = serde_json::to_vec(&json) { + if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_FREQ, &payload).await { + warn!("Audio vchan FREQ write failed: {}", e); + break; + } + } + } + Some(VChanAudioCmd::SetMode { uuid, mode }) => { + let json = serde_json::json!({ "uuid": uuid.to_string(), "mode": mode }); + if let Ok(payload) = serde_json::to_vec(&json) { + if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_MODE, &payload).await { + warn!("Audio vchan MODE write failed: {}", e); + break; + } + } + } + None => {} + } + } _ = &mut rx_handle => { break; } diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index 4bc0082..70cf641 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -299,6 +299,10 @@ async fn async_init() -> DynResult { frontend_runtime.audio_info = Some(stream_info_rx); frontend_runtime.decode_rx = Some(decode_tx.clone()); + // Virtual-channel audio: shared broadcaster map + command channel. + let (vchan_cmd_tx, vchan_cmd_rx) = mpsc::channel::(64); + *frontend_runtime.vchan_audio_cmd.lock().unwrap() = Some(vchan_cmd_tx); + info!( "Audio enabled: default port {}, decode channel set", cfg.frontends.audio.server_port @@ -306,6 +310,7 @@ async fn async_init() -> DynResult { let audio_rig_ports: HashMap = cfg.frontends.audio.rig_ports.clone(); let audio_shutdown_rx = shutdown_rx.clone(); + let vchan_audio_map = frontend_runtime.vchan_audio.clone(); pending_audio_client = Some(tokio::spawn(audio_client::run_audio_client( remote_host, cfg.frontends.audio.server_port, @@ -317,6 +322,8 @@ async fn async_init() -> DynResult { stream_info_tx, decode_tx, audio_shutdown_rx, + vchan_audio_map, + vchan_cmd_rx, ))); if cfg.frontends.audio.bridge.enabled { diff --git a/src/trx-client/trx-frontend/Cargo.toml b/src/trx-client/trx-frontend/Cargo.toml index cfd7010..1427f00 100644 --- a/src/trx-client/trx-frontend/Cargo.toml +++ b/src/trx-client/trx-frontend/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" [dependencies] bytes = "1" +uuid = { workspace = true } serde_json = { workspace = true } trx-core = { path = "../../trx-core" } tokio = { workspace = true, features = ["sync"] } diff --git a/src/trx-client/trx-frontend/src/lib.rs b/src/trx-client/trx-frontend/src/lib.rs index 0c713a4..3281971 100644 --- a/src/trx-client/trx-frontend/src/lib.rs +++ b/src/trx-client/trx-frontend/src/lib.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: BSD-2-Clause use std::collections::{HashMap, HashSet, VecDeque}; +use std::sync::RwLock; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{Arc, Mutex}; @@ -12,6 +13,8 @@ use bytes::Bytes; use tokio::sync::{broadcast, mpsc, watch}; use tokio::task::JoinHandle; +use uuid::Uuid; + use trx_core::audio::AudioStreamInfo; use trx_core::decode::{ AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage, @@ -19,6 +22,24 @@ use trx_core::decode::{ use trx_core::rig::state::{RigSnapshot, SpectrumData}; use trx_core::{DynResult, RigRequest, RigState}; +/// Command sent by the HTTP frontend to the audio-client task to manage a +/// virtual channel's audio stream over the server's audio TCP connection. +#[derive(Debug)] +pub enum VChanAudioCmd { + /// Create the server-side DSP channel (if it does not exist) and subscribe + /// to its Opus audio stream. `freq_hz` and `mode` are used if the server + /// needs to create the channel. + Subscribe { uuid: Uuid, freq_hz: u64, mode: String }, + /// Unsubscribe from audio (encoder task is stopped) but keep the DSP channel. + Unsubscribe(Uuid), + /// Unsubscribe and destroy the DSP channel. + Remove(Uuid), + /// Update the dial frequency of an existing virtual channel. + SetFreq { uuid: Uuid, freq_hz: u64 }, + /// Update the demodulation mode of an existing virtual channel. + SetMode { uuid: Uuid, mode: String }, +} + #[derive(Clone, Debug)] pub struct RemoteRigEntry { pub rig_id: String, @@ -206,6 +227,15 @@ pub struct FrontendRuntimeContext { pub ais_vessel_url_base: Option, /// Spectrum sender; SSE clients subscribe via `spectrum.subscribe()`. pub spectrum: Arc>, + /// Per-virtual-channel Opus audio senders. + /// Key: server-side virtual channel UUID. + /// Value: `broadcast::Sender` that receives per-channel Opus packets + /// forwarded by the audio-client task from `AUDIO_MSG_RX_FRAME_CH` frames. + pub vchan_audio: Arc>>>, + /// Channel to send `VChanAudioCmd` to the audio-client task, which in turn + /// forwards `VCHAN_SUB` / `VCHAN_UNSUB` frames over the audio TCP connection. + /// `None` when no audio connection is active. + pub vchan_audio_cmd: Arc>>>, } impl FrontendRuntimeContext { @@ -249,6 +279,8 @@ impl FrontendRuntimeContext { let (tx, _rx) = watch::channel(SharedSpectrum::default()); Arc::new(tx) }, + vchan_audio: Arc::new(RwLock::new(HashMap::new())), + vchan_audio_cmd: Arc::new(Mutex::new(None)), } } } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index 9ec0880..f16810b 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -5879,6 +5879,9 @@ function extractAudioFrameChannels(frame) { return out; } +// Optional channel_id injected by vchan.js when connecting to a virtual channel. +let _audioChannelOverride = null; + function startRxAudio() { if (rxActive) { stopRxAudio(); return; } if (!hasWebCodecs) { @@ -5886,7 +5889,10 @@ function startRxAudio() { return; } const proto = location.protocol === "https:" ? "wss:" : "ws:"; - audioWs = new WebSocket(`${proto}//${location.host}/audio`); + const audioPath = _audioChannelOverride + ? `/audio?channel_id=${encodeURIComponent(_audioChannelOverride)}` + : "/audio"; + audioWs = new WebSocket(`${proto}//${location.host}${audioPath}`); audioWs.binaryType = "arraybuffer"; audioStatus.textContent = "Connecting…"; diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/vchan.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/vchan.js index 3786c8a..9bea36f 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/vchan.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/vchan.js @@ -36,10 +36,11 @@ function vchanHandleChannels(data) { const d = JSON.parse(data); vchanRigId = d.rig_id || null; vchanChannels = d.channels || []; - // If the active channel was evicted, fall back to channel 0. + // If the active channel was evicted, fall back to channel 0 and reconnect audio. const ids = new Set(vchanChannels.map(c => c.id)); if (vchanActiveId && !ids.has(vchanActiveId)) { vchanActiveId = vchanChannels.length > 0 ? vchanChannels[0].id : null; + vchanReconnectAudio(); } vchanRender(); } catch (e) { @@ -120,6 +121,7 @@ async function vchanAllocate() { // The SSE `channels` event will trigger vchanRender(); optimistically // mark active so the picker feels responsive even before the event arrives. vchanRender(); + vchanReconnectAudio(); } catch (e) { console.error("vchan: allocate error", e); } @@ -159,11 +161,30 @@ async function vchanSubscribe(channelId) { vchanActiveId = channelId; vchanRender(); vchanSyncModeDisplay(); + vchanReconnectAudio(); } catch (e) { console.error("vchan: subscribe error", e); } } +// Reconnect the audio WebSocket to the appropriate endpoint: +// - virtual channel: /audio?channel_id= +// - primary channel: /audio (no param) +// Only reconnects if RX audio is currently active. +function vchanReconnectAudio() { + if (typeof rxActive === "undefined" || !rxActive) return; + // Set the channel override so startRxAudio picks up the right URL. + const ch = vchanIsOnVirtual() ? vchanActiveChannel() : null; + if (typeof _audioChannelOverride !== "undefined") { + _audioChannelOverride = ch ? ch.id : null; + } + if (typeof stopRxAudio === "function") stopRxAudio(); + // Small delay so the server has time to set up the per-channel encoder. + setTimeout(() => { + if (typeof startRxAudio === "function") startRxAudio(); + }, 200); +} + // Called by app.js from applyCapabilities(). // Shows the channel picker only for SDR rigs. function vchanApplyCapabilities(caps) { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs index 627513f..4e97d1c 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs @@ -17,8 +17,10 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use actix_web::{get, web, Error, HttpRequest, HttpResponse}; use actix_ws::Message; use bytes::Bytes; +use serde::Deserialize; use tokio::sync::broadcast; use tracing::warn; +use uuid::Uuid; use trx_core::decode::{ AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage, @@ -353,15 +355,18 @@ pub fn start_decode_history_collector(context: Arc) { }); } +#[derive(Deserialize)] +pub struct AudioQuery { + pub channel_id: Option, +} + #[get("/audio")] pub async fn audio_ws( req: HttpRequest, body: web::Payload, + query: web::Query, context: web::Data>, ) -> Result { - let Some(rx) = context.audio_rx.as_ref() else { - return Ok(HttpResponse::NotFound().body("audio not enabled")); - }; let Some(tx_sender) = context.audio_tx.as_ref().cloned() else { return Ok(HttpResponse::NotFound().body("audio not enabled")); }; @@ -374,7 +379,25 @@ pub async fn audio_ws( return Ok(HttpResponse::NoContent().finish()); } - let mut rx_sub = rx.subscribe(); + // If a channel_id is specified, subscribe to the per-channel broadcaster. + // Otherwise fall back to the primary RX broadcast. + let rx_sub: broadcast::Receiver = if let Some(ch_id) = query.channel_id { + match context.vchan_audio.read() { + Ok(map) => match map.get(&ch_id) { + Some(tx) => tx.subscribe(), + None => { + return Ok(HttpResponse::NotFound().body("channel not found")); + } + }, + Err(_) => return Ok(HttpResponse::InternalServerError().finish()), + } + } else { + let Some(rx) = context.audio_rx.as_ref() else { + return Ok(HttpResponse::NotFound().body("audio not enabled")); + }; + rx.subscribe() + }; + let mut rx_sub = rx_sub; let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?; diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs index 822f958..96d767e 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs @@ -83,6 +83,15 @@ async fn serve( ); let vchan_mgr = Arc::new(ClientChannelManager::new(4)); + + // Wire the audio-command sender so allocate/delete/freq/mode operations on + // virtual channels are forwarded to the audio-client task. + if let Ok(guard) = context.vchan_audio_cmd.lock() { + if let Some(tx) = guard.as_ref() { + vchan_mgr.set_audio_cmd(tx.clone()); + } + } + let server = build_server(addr, state_rx, rig_tx, callsign, context, scheduler_store, scheduler_status, vchan_mgr)?; let handle = server.handle(); tokio::spawn(async move { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs index a4d08d9..f12563e 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs @@ -22,6 +22,8 @@ use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; use uuid::Uuid; +use trx_frontend::VChanAudioCmd; + // --------------------------------------------------------------------------- // Types // --------------------------------------------------------------------------- @@ -90,6 +92,8 @@ pub struct ClientChannelManager { /// `":"` so subscribers can filter by rig. pub change_tx: broadcast::Sender, pub max_channels: usize, + /// Optional sender to the audio-client task for virtual-channel audio commands. + pub audio_cmd: std::sync::Mutex>>, } impl ClientChannelManager { @@ -100,6 +104,20 @@ impl ClientChannelManager { sessions: RwLock::new(HashMap::new()), change_tx, max_channels: max_channels.max(1), + audio_cmd: std::sync::Mutex::new(None), + } + } + + /// Wire the audio-command sender so the manager can dispatch + /// `VChanAudioCmd` messages when channels are allocated/deleted/changed. + pub fn set_audio_cmd(&self, tx: tokio::sync::mpsc::Sender) { + *self.audio_cmd.lock().unwrap() = Some(tx); + } + + /// Fire-and-forget: send a `VChanAudioCmd` to the audio-client task. + fn send_audio_cmd(&self, cmd: VChanAudioCmd) { + if let Some(tx) = self.audio_cmd.lock().unwrap().as_ref() { + let _ = tx.try_send(cmd); } } @@ -225,6 +243,13 @@ impl ClientChannelManager { .unwrap() .insert(session_id, (rig_id.to_string(), id)); + // Request server-side DSP channel + audio subscription. + self.send_audio_cmd(VChanAudioCmd::Subscribe { + uuid: id, + freq_hz, + mode: mode.to_string(), + }); + Ok(snapshot) } @@ -329,6 +354,10 @@ impl ClientChannelManager { for sid in evicted { sessions.remove(&sid); } + + // Remove server-side DSP channel and stop audio encoding. + self.send_audio_cmd(VChanAudioCmd::Remove(channel_id)); + Ok(()) } @@ -347,6 +376,8 @@ impl ClientChannelManager { .ok_or(VChanClientError::NotFound)?; ch.freq_hz = freq_hz; self.broadcast_change(rig_id, channels); + drop(rigs); + self.send_audio_cmd(VChanAudioCmd::SetFreq { uuid: channel_id, freq_hz }); Ok(()) } @@ -364,6 +395,8 @@ impl ClientChannelManager { .ok_or(VChanClientError::NotFound)?; ch.mode = mode.to_string(); self.broadcast_change(rig_id, channels); + drop(rigs); + self.send_audio_cmd(VChanAudioCmd::SetMode { uuid: channel_id, mode: mode.to_string() }); Ok(()) } diff --git a/src/trx-core/src/audio.rs b/src/trx-core/src/audio.rs index c591874..3e45b0a 100644 --- a/src/trx-core/src/audio.rs +++ b/src/trx-core/src/audio.rs @@ -6,6 +6,8 @@ //! //! Wire format: `[1 byte type][4 bytes BE length N][N bytes payload]` +use uuid::Uuid; + use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; pub const AUDIO_MSG_STREAM_INFO: u8 = 0x00; @@ -22,6 +24,39 @@ pub const AUDIO_MSG_HF_APRS_DECODE: u8 = 0x09; /// framed messages (each: `[1 byte type][4 bytes BE length][payload]`). pub const AUDIO_MSG_HISTORY_COMPRESSED: u8 = 0x0a; +// --------------------------------------------------------------------------- +// Virtual-channel audio multiplexing (server → client) +// --------------------------------------------------------------------------- + +/// Per-virtual-channel Opus frame: `[16 B UUID][opus_len B Opus]`. +/// Sent by the server for each virtual channel the client has subscribed to. +pub const AUDIO_MSG_RX_FRAME_CH: u8 = 0x0b; +/// Server → client: virtual channel audio subscription acknowledged. +/// Payload: 16-byte UUID of the newly activated channel slot. +pub const AUDIO_MSG_VCHAN_ALLOCATED: u8 = 0x0c; + +// --------------------------------------------------------------------------- +// Virtual-channel audio multiplexing (client → server) +// --------------------------------------------------------------------------- + +/// Client → server: create-or-subscribe to a virtual channel's audio. +/// Payload: JSON `{"uuid":"","freq_hz":,"mode":""}`. +/// If a channel with the given UUID already exists the server just subscribes; +/// otherwise it creates a new DSP pipeline at the given frequency/mode first. +pub const AUDIO_MSG_VCHAN_SUB: u8 = 0x0d; +/// Client → server: unsubscribe from a virtual channel's audio. +/// Payload: 16-byte UUID of the virtual channel on the server. +pub const AUDIO_MSG_VCHAN_UNSUB: u8 = 0x0e; +/// Client → server: update the dial frequency of a virtual channel. +/// Payload: JSON `{"uuid":"","freq_hz":}`. +pub const AUDIO_MSG_VCHAN_FREQ: u8 = 0x0f; +/// Client → server: update the demodulation mode of a virtual channel. +/// Payload: JSON `{"uuid":"","mode":""}`. +pub const AUDIO_MSG_VCHAN_MODE: u8 = 0x10; +/// Client → server: remove a virtual channel (stops encoding and destroys the DSP pipeline). +/// Payload: 16-byte UUID of the virtual channel on the server. +pub const AUDIO_MSG_VCHAN_REMOVE: u8 = 0x11; + /// Maximum payload size for normal messages (1 MB). const MAX_PAYLOAD_SIZE: u32 = 1_048_576; /// Maximum payload size for the compressed history blob (16 MB). @@ -80,3 +115,54 @@ pub async fn read_audio_msg( reader.read_exact(&mut payload).await?; Ok((msg_type, payload)) } + +// --------------------------------------------------------------------------- +// Virtual-channel frame helpers +// --------------------------------------------------------------------------- + +/// Write a virtual-channel control frame (16-byte UUID payload only). +/// Used for `AUDIO_MSG_VCHAN_SUB`, `AUDIO_MSG_VCHAN_UNSUB`, and +/// `AUDIO_MSG_VCHAN_ALLOCATED`. +pub async fn write_vchan_uuid_msg( + writer: &mut W, + msg_type: u8, + uuid: Uuid, +) -> std::io::Result<()> { + write_audio_msg(writer, msg_type, uuid.as_bytes()).await +} + +/// Write an `AUDIO_MSG_RX_FRAME_CH` frame: 16-byte UUID followed by Opus payload. +pub async fn write_vchan_audio_frame( + writer: &mut W, + uuid: Uuid, + opus: &[u8], +) -> std::io::Result<()> { + let mut payload = Vec::with_capacity(16 + opus.len()); + payload.extend_from_slice(uuid.as_bytes()); + payload.extend_from_slice(opus); + write_audio_msg(writer, AUDIO_MSG_RX_FRAME_CH, &payload).await +} + +/// Parse a virtual-channel audio frame payload (`AUDIO_MSG_RX_FRAME_CH`). +/// Returns `(uuid, opus_bytes)` or an error if the payload is too short. +pub fn parse_vchan_audio_frame(payload: &[u8]) -> std::io::Result<(Uuid, &[u8])> { + if payload.len() < 16 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "vchan audio frame payload too short", + )); + } + let uuid = Uuid::from_bytes(payload[..16].try_into().unwrap()); + Ok((uuid, &payload[16..])) +} + +/// Parse a 16-byte UUID control frame (SUB / UNSUB / ALLOCATED). +pub fn parse_vchan_uuid_msg(payload: &[u8]) -> std::io::Result { + if payload.len() < 16 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "vchan uuid frame payload too short", + )); + } + Ok(Uuid::from_bytes(payload[..16].try_into().unwrap())) +} diff --git a/src/trx-core/src/vchan.rs b/src/trx-core/src/vchan.rs index c1d9ebd..d1161fb 100644 --- a/src/trx-core/src/vchan.rs +++ b/src/trx-core/src/vchan.rs @@ -99,6 +99,19 @@ pub trait VirtualChannelManager: Send + Sync { /// Returns `None` if the channel UUID does not exist. fn subscribe_pcm(&self, id: Uuid) -> Option>>; + /// Return a PCM receiver for an existing channel, or create a new channel + /// with the given `id`, `freq_hz`, and `mode` and subscribe to it. + /// + /// Used by the audio-TCP server path where the client provides a stable UUID + /// (generated on the client side) so that both sides use the same identifier + /// without a separate round-trip to allocate a server UUID. + fn ensure_channel_pcm( + &self, + id: Uuid, + freq_hz: u64, + mode: &RigMode, + ) -> Result>, VChanError>; + /// Return a snapshot of all channels in display order. fn channels(&self) -> Vec; diff --git a/src/trx-server/Cargo.toml b/src/trx-server/Cargo.toml index b9d44d0..89d9d3a 100644 --- a/src/trx-server/Cargo.toml +++ b/src/trx-server/Cargo.toml @@ -25,6 +25,7 @@ dirs = "6" pickledb = "0.5" chrono = { version = "0.4", default-features = false, features = ["clock"] } bytes = "1" +uuid = { workspace = true } cpal = "0.15" num-complex = "0.4" opus = "0.3" diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 3912d24..3ce016d 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -22,11 +22,16 @@ use tracing::{error, info, warn}; use trx_ais::AisDecoder; use trx_aprs::AprsDecoder; use trx_core::audio::{ - read_audio_msg, write_audio_msg, AudioStreamInfo, + parse_vchan_uuid_msg, read_audio_msg, write_audio_msg, write_vchan_audio_frame, + write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, - AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, + AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, + AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, + AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, }; +use trx_core::vchan::SharedVChanManager; +use uuid::Uuid; use trx_core::decode::{ AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage, }; @@ -1752,6 +1757,22 @@ pub async fn run_wspr_decoder( } } +// --------------------------------------------------------------------------- +// Virtual-channel audio support +// --------------------------------------------------------------------------- + +/// Commands sent from the reader loop to the RX writer task to subscribe or +/// unsubscribe a virtual channel's Opus audio stream. +enum VChanCmd { + /// Start encoding the given channel's PCM and forwarding it to the client. + Subscribe { + uuid: Uuid, + pcm_rx: tokio::sync::broadcast::Receiver>, + }, + /// Stop forwarding audio for the given channel. + Unsubscribe(Uuid), +} + /// Run the audio TCP listener, accepting client connections. pub async fn run_audio_listener( addr: SocketAddr, @@ -1761,6 +1782,7 @@ pub async fn run_audio_listener( decode_tx: broadcast::Sender, mut shutdown_rx: watch::Receiver, histories: Arc, + vchan_manager: Option, ) -> std::io::Result<()> { let listener = TcpListener::bind(addr).await?; info!("Audio listener on {}", addr); @@ -1777,9 +1799,10 @@ pub async fn run_audio_listener( let decode_tx = decode_tx.clone(); let client_shutdown_rx = shutdown_rx.clone(); let client_histories = histories.clone(); + let client_vchan_mgr = vchan_manager.clone(); tokio::spawn(async move { - if let Err(e) = handle_audio_client(socket, peer, rx_audio, tx_audio, info, decode_tx, client_shutdown_rx, client_histories).await { + if let Err(e) = handle_audio_client(socket, peer, rx_audio, tx_audio, info, decode_tx, client_shutdown_rx, client_histories, client_vchan_mgr).await { warn!("Audio client {} error: {:?}", peer, e); } info!("Audio client {} disconnected", peer); @@ -1810,6 +1833,7 @@ async fn handle_audio_client( decode_tx: broadcast::Sender, mut shutdown_rx: watch::Receiver, histories: Arc, + vchan_manager: Option, ) -> std::io::Result<()> { let (reader, writer) = socket.into_split(); let mut reader = tokio::io::BufReader::new(reader); @@ -1882,11 +1906,26 @@ async fn handle_audio_client( ); } - // Spawn RX + decode forwarding task (shares the writer) + // Spawn RX + decode forwarding task (shares the writer). + // vchan audio frames are fed into this task via vchan_frame_rx so all + // writes share one BufWriter and stay ordered. let mut rx_sub = rx_audio.subscribe(); let mut decode_sub = decode_tx.subscribe(); let mut writer_for_rx = writer; + + // (uuid, opus_bytes) produced by per-channel encoder tasks. + let (vchan_frame_tx, mut vchan_frame_rx) = mpsc::channel::<(Uuid, Bytes)>(256); + // Commands from the reader loop: Subscribe / Unsubscribe. + let (vchan_cmd_tx, mut vchan_cmd_rx) = mpsc::channel::(32); + + let opus_sample_rate = stream_info.sample_rate; + let opus_channels = stream_info.channels; + let rx_handle = tokio::spawn(async move { + // UUID → JoinHandle of per-channel Opus encoder task. + let mut vchan_tasks: std::collections::HashMap> = + std::collections::HashMap::new(); + loop { tokio::select! { result = rx_sub.recv() => { @@ -1928,11 +1967,96 @@ async fn handle_audio_client( Err(broadcast::error::RecvError::Closed) => break, } } + // Virtual-channel audio frame produced by a per-channel encoder task. + Some((uuid, opus)) = vchan_frame_rx.recv() => { + if let Err(e) = write_vchan_audio_frame(&mut writer_for_rx, uuid, opus.as_ref()).await { + warn!("Audio vchan RX_FRAME_CH write to {} failed: {}", peer, e); + break; + } + } + // Commands from reader loop: subscribe / unsubscribe. + Some(cmd) = vchan_cmd_rx.recv() => { + match cmd { + VChanCmd::Subscribe { uuid, pcm_rx } => { + // Spin up an async Opus encoder task for this virtual channel. + let frame_tx = vchan_frame_tx.clone(); + let sr = opus_sample_rate; + let ch_count = opus_channels; + let mut pcm_rx = pcm_rx; + let handle = tokio::spawn(async move { + let opus_ch = match ch_count { + 1 => opus::Channels::Mono, + 2 => opus::Channels::Stereo, + _ => return, + }; + let mut encoder = match opus::Encoder::new( + sr, + opus_ch, + opus::Application::Audio, + ) { + Ok(e) => e, + Err(e) => { + warn!("vchan Opus encoder init failed: {}", e); + return; + } + }; + let _ = encoder.set_bitrate(opus::Bitrate::Bits(32_000)); + let _ = encoder.set_complexity(5); + let mut buf = vec![0u8; 4096]; + loop { + match pcm_rx.recv().await { + Ok(frame) => { + match encoder.encode_float(&frame, &mut buf) { + Ok(len) => { + let pkt = Bytes::copy_from_slice(&buf[..len]); + if frame_tx.send((uuid, pkt)).await.is_err() { + break; + } + } + Err(e) => { + warn!("vchan Opus encode error: {}", e); + } + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("vchan encoder: dropped {} PCM frames", n); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } + }); + + vchan_tasks.insert(uuid, handle); + + // Acknowledge to the client. + if let Err(e) = write_vchan_uuid_msg( + &mut writer_for_rx, + AUDIO_MSG_VCHAN_ALLOCATED, + uuid, + ) + .await + { + warn!("Audio vchan allocated write to {} failed: {}", peer, e); + break; + } + } + VChanCmd::Unsubscribe(uuid) => { + if let Some(h) = vchan_tasks.remove(&uuid) { + h.abort(); + } + } + } + } } } + + // Abort all per-channel encoder tasks on disconnect. + for (_, h) in vchan_tasks { + h.abort(); + } }); - // Read TX frames from client + // Read TX frames (and virtual-channel sub/unsub commands) from client. loop { let msg = tokio::select! { msg = read_audio_msg(&mut reader) => msg, @@ -1954,8 +2078,87 @@ async fn handle_audio_client( Ok((AUDIO_MSG_TX_FRAME, payload)) => { let _ = tx_audio.send(Bytes::from(payload)).await; } + Ok((AUDIO_MSG_VCHAN_SUB, payload)) => { + if let Some(ref mgr) = vchan_manager { + // Payload: JSON { "uuid": "...", "freq_hz": N, "mode": "..." } + match serde_json::from_slice::(&payload) { + Ok(v) => { + let uuid = v["uuid"] + .as_str() + .and_then(|s| s.parse::().ok()); + let freq_hz = v["freq_hz"].as_u64(); + let mode_str = v["mode"].as_str().unwrap_or("USB"); + let mode = trx_protocol::codec::parse_mode(mode_str); + if let (Some(uuid), Some(freq_hz)) = (uuid, freq_hz) { + match mgr.ensure_channel_pcm(uuid, freq_hz, &mode) { + Ok(pcm_rx) => { + let _ = vchan_cmd_tx + .send(VChanCmd::Subscribe { uuid, pcm_rx }) + .await; + } + Err(e) => warn!("Audio vchan SUB: {}", e), + } + } else { + warn!("Audio vchan SUB: missing uuid or freq_hz in payload"); + } + } + Err(e) => warn!("Audio vchan SUB: bad JSON payload: {}", e), + } + } + } + Ok((AUDIO_MSG_VCHAN_UNSUB, payload)) => { + match parse_vchan_uuid_msg(&payload) { + Ok(uuid) => { + let _ = vchan_cmd_tx.send(VChanCmd::Unsubscribe(uuid)).await; + } + Err(e) => warn!("Audio vchan UNSUB: bad payload: {}", e), + } + } + Ok((AUDIO_MSG_VCHAN_FREQ, payload)) => { + if let Some(ref mgr) = vchan_manager { + if let Ok(v) = serde_json::from_slice::(&payload) { + if let (Some(uuid), Some(freq_hz)) = ( + v["uuid"].as_str().and_then(|s| s.parse::().ok()), + v["freq_hz"].as_u64(), + ) { + if let Err(e) = mgr.set_channel_freq(uuid, freq_hz) { + warn!("Audio vchan FREQ: {}", e); + } + } + } + } + } + Ok((AUDIO_MSG_VCHAN_MODE, payload)) => { + if let Some(ref mgr) = vchan_manager { + if let Ok(v) = serde_json::from_slice::(&payload) { + if let Some(uuid) = v["uuid"].as_str().and_then(|s| s.parse::().ok()) { + let mode = trx_protocol::codec::parse_mode( + v["mode"].as_str().unwrap_or("USB"), + ); + if let Err(e) = mgr.set_channel_mode(uuid, &mode) { + warn!("Audio vchan MODE: {}", e); + } + } + } + } + } + Ok((AUDIO_MSG_VCHAN_REMOVE, payload)) => { + match parse_vchan_uuid_msg(&payload) { + Ok(uuid) => { + // Unsubscribe first. + let _ = vchan_cmd_tx.send(VChanCmd::Unsubscribe(uuid)).await; + // Then remove from the DSP pipeline. + if let Some(ref mgr) = vchan_manager { + if let Err(e) = mgr.remove_channel(uuid) { + warn!("Audio vchan REMOVE: {}", e); + } + } + } + Err(e) => warn!("Audio vchan REMOVE: bad payload: {}", e), + } + } Ok((msg_type, _)) => { - warn!("Audio: unexpected message type {} from {}", msg_type, peer); + warn!("Audio: unexpected message type {:#04x} from {}", msg_type, peer); } Err(_) => break, } diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index 4072821..f145dcf 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -450,6 +450,7 @@ fn spawn_rig_audio_stack( sdr_pcm_rx: OptionalSdrPcmRx, sdr_ais_pcm_rx: OptionalSdrAisPcmRx, sdr_vdes_iq_rx: OptionalSdrVdesIqRx, + vchan_manager: Option, ) -> Vec> { let mut handles: Vec> = Vec::new(); @@ -752,6 +753,7 @@ fn spawn_rig_audio_stack( decode_tx, audio_shutdown_rx, audio_histories, + vchan_manager, ) .await { @@ -999,6 +1001,11 @@ async fn main() -> DynResult<()> { // Spawn audio stack. // listen_override priority: --listen CLI flag > global [audio].listen > per-rig default. let audio_listen_override = cli.listen.or(Some(cfg.audio.listen)); + #[cfg(feature = "soapysdr")] + let audio_vchan_manager = sdr_vchan_manager.clone(); + #[cfg(not(feature = "soapysdr"))] + let audio_vchan_manager: Option = None; + let audio_handles = spawn_rig_audio_stack( rig_cfg, state_rx.clone(), @@ -1011,6 +1018,7 @@ async fn main() -> DynResult<()> { sdr_pcm_rx, sdr_ais_pcm_rx, sdr_vdes_iq_rx, + audio_vchan_manager, ); task_handles.extend(audio_handles); diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs index 12487ca..e626869 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs @@ -300,6 +300,62 @@ impl VirtualChannelManager for SdrVirtualChannelManager { fn max_channels(&self) -> usize { self.max_total } + + fn ensure_channel_pcm( + &self, + id: Uuid, + freq_hz: u64, + mode: &RigMode, + ) -> Result>, VChanError> { + // Fast path: channel already exists. + { + let channels = self.channels.read().unwrap(); + if let Some(ch) = channels.iter().find(|c| c.id == id) { + return Ok(ch.pcm_tx.subscribe()); + } + } + + // Slow path: create a new channel with the client-supplied UUID. + let half_span = self.half_span_hz(); + let center = self.center_hz.load(Ordering::Relaxed); + let if_hz = freq_hz as i64 - center; + if if_hz.unsigned_abs() as i64 > half_span { + return Err(VChanError::OutOfBandwidth { + half_span_hz: half_span, + }); + } + + let mut channels = self.channels.write().unwrap(); + if channels.len() >= self.max_total { + return Err(VChanError::CapReached { max: self.max_total }); + } + + let bandwidth_hz = default_bandwidth_hz(mode); + let (pcm_tx, iq_tx) = + self.pipeline + .add_virtual_channel(if_hz as f64, mode, bandwidth_hz, DEFAULT_FIR_TAPS); + + let pipeline_slot = self + .pipeline + .channel_dsps + .read() + .unwrap() + .len() + .saturating_sub(1); + + let pcm_rx = pcm_tx.subscribe(); + channels.push(ManagedChannel { + id, + freq_hz, + mode: mode.clone(), + pcm_tx, + iq_tx, + pipeline_slot, + permanent: false, + }); + + Ok(pcm_rx) + } } // ---------------------------------------------------------------------------