[feat](trx-rs): per-virtual-channel audio streaming

Add end-to-end audio routing for virtual DSP channels:

Server (trx-server):
- New wire protocol: AUDIO_MSG_RX_FRAME_CH (0x0b), VCHAN_ALLOCATED (0x0c),
  VCHAN_SUB (0x0d), VCHAN_UNSUB (0x0e), VCHAN_FREQ (0x0f), VCHAN_MODE (0x10),
  VCHAN_REMOVE (0x11) frame types in trx-core audio.rs
- Add frame helpers: write_vchan_uuid_msg, write_vchan_audio_frame,
  parse_vchan_audio_frame, parse_vchan_uuid_msg
- Add ensure_channel_pcm() to VirtualChannelManager trait; implement in
  SdrVirtualChannelManager with create-or-subscribe semantics using client UUID
- Extend audio.rs handle_audio_client: VChanCmd dispatcher, per-channel Opus
  encoder tasks, VCHAN_SUB/UNSUB/FREQ/MODE/REMOVE reader loop handlers
- Thread vchan_manager through run_audio_listener / spawn_rig_audio_stack

Client (trx-client):
- Add VChanAudioCmd enum to trx-frontend; add vchan_audio and vchan_audio_cmd
  fields to FrontendRuntimeContext
- Extend audio_client: demux AUDIO_MSG_RX_FRAME_CH to per-channel broadcasters,
  handle VCHAN_ALLOCATED; forward VChanAudioCmd over TCP write loop
- Wire vchan_cmd_tx/rx channel in main.rs; pass vchan_audio map to audio_client
- ClientChannelManager.set_audio_cmd() / send_audio_cmd(): dispatch
  Subscribe/Remove/SetFreq/SetMode on allocate/delete/freq/mode operations
- Wire audio_cmd sender in server.rs serve() after creating vchan_mgr

HTTP frontend:
- /audio?channel_id=<uuid>: route WebSocket to per-channel Opus broadcaster
- vchan.js: vchanReconnectAudio() stops/restarts RX audio on channel switch;
  _audioChannelOverride in app.js selects primary vs virtual WS endpoint
- app.js: _audioChannelOverride variable; startRxAudio appends channel param

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-11 08:06:18 +01:00
parent 28036ab589
commit 6131d7a1d6
17 changed files with 606 additions and 17 deletions
+1
View File
@@ -17,6 +17,7 @@ tracing = { workspace = true }
clap = { workspace = true, features = ["derive"] }
dirs = "6"
bytes = "1"
uuid = { workspace = true }
cpal = "0.15"
opus = "0.3"
trx-app = { path = "../trx-app" }
+91 -5
View File
@@ -6,7 +6,7 @@
//! RX/TX Opus frames via broadcast/mpsc channels.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use bytes::Bytes;
@@ -19,13 +19,19 @@ use tokio::time;
use tracing::{info, warn};
use trx_frontend::RemoteRigEntry;
use uuid::Uuid;
use trx_core::audio::{
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE,
parse_vchan_audio_frame, parse_vchan_uuid_msg, read_audio_msg, write_audio_msg,
write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE,
AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE,
AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME,
AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_RX_FRAME_CH,
AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, AUDIO_MSG_VCHAN_FREQ,
AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB,
AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE,
};
use trx_core::decode::DecodedMessage;
use trx_frontend::VChanAudioCmd;
/// Run the audio client with auto-reconnect.
#[allow(clippy::too_many_arguments)]
@@ -40,6 +46,8 @@ pub async fn run_audio_client(
stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
decode_tx: broadcast::Sender<DecodedMessage>,
mut shutdown_rx: watch::Receiver<bool>,
vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
mut vchan_cmd_rx: mpsc::Receiver<VChanAudioCmd>,
) {
let mut reconnect_delay = Duration::from_secs(1);
@@ -77,6 +85,8 @@ pub async fn run_audio_client(
&stream_info_tx,
&decode_tx,
&mut shutdown_rx,
&vchan_audio,
&mut vchan_cmd_rx,
)
.await
{
@@ -120,6 +130,8 @@ async fn handle_audio_connection(
stream_info_tx: &watch::Sender<Option<AudioStreamInfo>>,
decode_tx: &broadcast::Sender<DecodedMessage>,
shutdown_rx: &mut watch::Receiver<bool>,
vchan_audio: &Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
vchan_cmd_rx: &mut mpsc::Receiver<VChanAudioCmd>,
) -> std::io::Result<()> {
let (reader, writer) = stream.into_split();
let mut reader = BufReader::new(reader);
@@ -144,12 +156,34 @@ async fn handle_audio_connection(
// Spawn RX read task
let rx_tx = rx_tx.clone();
let decode_tx = decode_tx.clone();
let vchan_audio_rx: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>> = Arc::clone(vchan_audio);
let mut rx_handle = tokio::spawn(async move {
loop {
match read_audio_msg(&mut reader).await {
Ok((AUDIO_MSG_RX_FRAME, payload)) => {
let _ = rx_tx.send(Bytes::from(payload));
}
Ok((AUDIO_MSG_RX_FRAME_CH, payload)) => {
// Route per-channel Opus frame to the correct broadcaster.
if let Ok((uuid, opus)) = parse_vchan_audio_frame(&payload) {
let pkt = Bytes::copy_from_slice(opus);
if let Ok(map) = vchan_audio_rx.read() {
if let Some(tx) = map.get(&uuid) {
let _ = tx.send(pkt);
}
}
}
}
Ok((AUDIO_MSG_VCHAN_ALLOCATED, payload)) => {
// Server confirmed a virtual channel is ready; ensure a
// broadcaster entry exists in the shared map.
if let Ok(uuid) = parse_vchan_uuid_msg(&payload) {
if let Ok(mut map) = vchan_audio_rx.write() {
map.entry(uuid)
.or_insert_with(|| broadcast::channel::<Bytes>(64).0);
}
}
}
Ok((AUDIO_MSG_HISTORY_COMPRESSED, payload)) => {
// Decompress gzip blob, then iterate the embedded framed messages.
let mut decompressed = Vec::new();
@@ -193,14 +227,14 @@ async fn handle_audio_connection(
}
}
Ok((msg_type, _)) => {
warn!("Audio client: unexpected message type {}", msg_type);
warn!("Audio client: unexpected message type {:#04x}", msg_type);
}
Err(_) => break,
}
}
});
// Forward TX frames to server
// Forward TX frames and VChanAudioCmds to server.
let mut rig_check = time::interval(Duration::from_millis(500));
loop {
tokio::select! {
@@ -228,6 +262,58 @@ async fn handle_audio_connection(
None => break,
}
}
cmd = vchan_cmd_rx.recv() => {
match cmd {
Some(VChanAudioCmd::Subscribe { uuid, freq_hz, mode }) => {
let json = serde_json::json!({
"uuid": uuid.to_string(),
"freq_hz": freq_hz,
"mode": mode,
});
if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await {
warn!("Audio vchan SUB write failed: {}", e);
break;
}
}
}
Some(VChanAudioCmd::Unsubscribe(uuid)) => {
if let Err(e) = write_vchan_uuid_msg(&mut writer, AUDIO_MSG_VCHAN_UNSUB, uuid).await {
warn!("Audio vchan UNSUB write failed: {}", e);
break;
}
}
Some(VChanAudioCmd::Remove(uuid)) => {
if let Err(e) = write_vchan_uuid_msg(&mut writer, AUDIO_MSG_VCHAN_REMOVE, uuid).await {
warn!("Audio vchan REMOVE write failed: {}", e);
break;
}
// Clean up local broadcaster.
if let Ok(mut map) = vchan_audio.write() {
map.remove(&uuid);
}
}
Some(VChanAudioCmd::SetFreq { uuid, freq_hz }) => {
let json = serde_json::json!({ "uuid": uuid.to_string(), "freq_hz": freq_hz });
if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_FREQ, &payload).await {
warn!("Audio vchan FREQ write failed: {}", e);
break;
}
}
}
Some(VChanAudioCmd::SetMode { uuid, mode }) => {
let json = serde_json::json!({ "uuid": uuid.to_string(), "mode": mode });
if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_MODE, &payload).await {
warn!("Audio vchan MODE write failed: {}", e);
break;
}
}
}
None => {}
}
}
_ = &mut rx_handle => {
break;
}
+7
View File
@@ -299,6 +299,10 @@ async fn async_init() -> DynResult<AppState> {
frontend_runtime.audio_info = Some(stream_info_rx);
frontend_runtime.decode_rx = Some(decode_tx.clone());
// Virtual-channel audio: shared broadcaster map + command channel.
let (vchan_cmd_tx, vchan_cmd_rx) = mpsc::channel::<trx_frontend::VChanAudioCmd>(64);
*frontend_runtime.vchan_audio_cmd.lock().unwrap() = Some(vchan_cmd_tx);
info!(
"Audio enabled: default port {}, decode channel set",
cfg.frontends.audio.server_port
@@ -306,6 +310,7 @@ async fn async_init() -> DynResult<AppState> {
let audio_rig_ports: HashMap<String, u16> = cfg.frontends.audio.rig_ports.clone();
let audio_shutdown_rx = shutdown_rx.clone();
let vchan_audio_map = frontend_runtime.vchan_audio.clone();
pending_audio_client = Some(tokio::spawn(audio_client::run_audio_client(
remote_host,
cfg.frontends.audio.server_port,
@@ -317,6 +322,8 @@ async fn async_init() -> DynResult<AppState> {
stream_info_tx,
decode_tx,
audio_shutdown_rx,
vchan_audio_map,
vchan_cmd_rx,
)));
if cfg.frontends.audio.bridge.enabled {
+1
View File
@@ -9,6 +9,7 @@ edition = "2021"
[dependencies]
bytes = "1"
uuid = { workspace = true }
serde_json = { workspace = true }
trx-core = { path = "../../trx-core" }
tokio = { workspace = true, features = ["sync"] }
+32
View File
@@ -3,6 +3,7 @@
// SPDX-License-Identifier: BSD-2-Clause
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::RwLock;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex};
@@ -12,6 +13,8 @@ use bytes::Bytes;
use tokio::sync::{broadcast, mpsc, watch};
use tokio::task::JoinHandle;
use uuid::Uuid;
use trx_core::audio::AudioStreamInfo;
use trx_core::decode::{
AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage,
@@ -19,6 +22,24 @@ use trx_core::decode::{
use trx_core::rig::state::{RigSnapshot, SpectrumData};
use trx_core::{DynResult, RigRequest, RigState};
/// Command sent by the HTTP frontend to the audio-client task to manage a
/// virtual channel's audio stream over the server's audio TCP connection.
#[derive(Debug)]
pub enum VChanAudioCmd {
/// Create the server-side DSP channel (if it does not exist) and subscribe
/// to its Opus audio stream. `freq_hz` and `mode` are used if the server
/// needs to create the channel.
Subscribe { uuid: Uuid, freq_hz: u64, mode: String },
/// Unsubscribe from audio (encoder task is stopped) but keep the DSP channel.
Unsubscribe(Uuid),
/// Unsubscribe and destroy the DSP channel.
Remove(Uuid),
/// Update the dial frequency of an existing virtual channel.
SetFreq { uuid: Uuid, freq_hz: u64 },
/// Update the demodulation mode of an existing virtual channel.
SetMode { uuid: Uuid, mode: String },
}
#[derive(Clone, Debug)]
pub struct RemoteRigEntry {
pub rig_id: String,
@@ -206,6 +227,15 @@ pub struct FrontendRuntimeContext {
pub ais_vessel_url_base: Option<String>,
/// Spectrum sender; SSE clients subscribe via `spectrum.subscribe()`.
pub spectrum: Arc<watch::Sender<SharedSpectrum>>,
/// Per-virtual-channel Opus audio senders.
/// Key: server-side virtual channel UUID.
/// Value: `broadcast::Sender<Bytes>` that receives per-channel Opus packets
/// forwarded by the audio-client task from `AUDIO_MSG_RX_FRAME_CH` frames.
pub vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
/// Channel to send `VChanAudioCmd` to the audio-client task, which in turn
/// forwards `VCHAN_SUB` / `VCHAN_UNSUB` frames over the audio TCP connection.
/// `None` when no audio connection is active.
pub vchan_audio_cmd: Arc<Mutex<Option<mpsc::Sender<VChanAudioCmd>>>>,
}
impl FrontendRuntimeContext {
@@ -249,6 +279,8 @@ impl FrontendRuntimeContext {
let (tx, _rx) = watch::channel(SharedSpectrum::default());
Arc::new(tx)
},
vchan_audio: Arc::new(RwLock::new(HashMap::new())),
vchan_audio_cmd: Arc::new(Mutex::new(None)),
}
}
}
@@ -5879,6 +5879,9 @@ function extractAudioFrameChannels(frame) {
return out;
}
// Optional channel_id injected by vchan.js when connecting to a virtual channel.
let _audioChannelOverride = null;
function startRxAudio() {
if (rxActive) { stopRxAudio(); return; }
if (!hasWebCodecs) {
@@ -5886,7 +5889,10 @@ function startRxAudio() {
return;
}
const proto = location.protocol === "https:" ? "wss:" : "ws:";
audioWs = new WebSocket(`${proto}//${location.host}/audio`);
const audioPath = _audioChannelOverride
? `/audio?channel_id=${encodeURIComponent(_audioChannelOverride)}`
: "/audio";
audioWs = new WebSocket(`${proto}//${location.host}${audioPath}`);
audioWs.binaryType = "arraybuffer";
audioStatus.textContent = "Connecting…";
@@ -36,10 +36,11 @@ function vchanHandleChannels(data) {
const d = JSON.parse(data);
vchanRigId = d.rig_id || null;
vchanChannels = d.channels || [];
// If the active channel was evicted, fall back to channel 0.
// If the active channel was evicted, fall back to channel 0 and reconnect audio.
const ids = new Set(vchanChannels.map(c => c.id));
if (vchanActiveId && !ids.has(vchanActiveId)) {
vchanActiveId = vchanChannels.length > 0 ? vchanChannels[0].id : null;
vchanReconnectAudio();
}
vchanRender();
} catch (e) {
@@ -120,6 +121,7 @@ async function vchanAllocate() {
// The SSE `channels` event will trigger vchanRender(); optimistically
// mark active so the picker feels responsive even before the event arrives.
vchanRender();
vchanReconnectAudio();
} catch (e) {
console.error("vchan: allocate error", e);
}
@@ -159,11 +161,30 @@ async function vchanSubscribe(channelId) {
vchanActiveId = channelId;
vchanRender();
vchanSyncModeDisplay();
vchanReconnectAudio();
} catch (e) {
console.error("vchan: subscribe error", e);
}
}
// Reconnect the audio WebSocket to the appropriate endpoint:
// - virtual channel: /audio?channel_id=<uuid>
// - primary channel: /audio (no param)
// Only reconnects if RX audio is currently active.
function vchanReconnectAudio() {
if (typeof rxActive === "undefined" || !rxActive) return;
// Set the channel override so startRxAudio picks up the right URL.
const ch = vchanIsOnVirtual() ? vchanActiveChannel() : null;
if (typeof _audioChannelOverride !== "undefined") {
_audioChannelOverride = ch ? ch.id : null;
}
if (typeof stopRxAudio === "function") stopRxAudio();
// Small delay so the server has time to set up the per-channel encoder.
setTimeout(() => {
if (typeof startRxAudio === "function") startRxAudio();
}, 200);
}
// Called by app.js from applyCapabilities().
// Shows the channel picker only for SDR rigs.
function vchanApplyCapabilities(caps) {
@@ -17,8 +17,10 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use actix_web::{get, web, Error, HttpRequest, HttpResponse};
use actix_ws::Message;
use bytes::Bytes;
use serde::Deserialize;
use tokio::sync::broadcast;
use tracing::warn;
use uuid::Uuid;
use trx_core::decode::{
AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage,
@@ -353,15 +355,18 @@ pub fn start_decode_history_collector(context: Arc<FrontendRuntimeContext>) {
});
}
#[derive(Deserialize)]
pub struct AudioQuery {
pub channel_id: Option<Uuid>,
}
#[get("/audio")]
pub async fn audio_ws(
req: HttpRequest,
body: web::Payload,
query: web::Query<AudioQuery>,
context: web::Data<Arc<FrontendRuntimeContext>>,
) -> Result<HttpResponse, Error> {
let Some(rx) = context.audio_rx.as_ref() else {
return Ok(HttpResponse::NotFound().body("audio not enabled"));
};
let Some(tx_sender) = context.audio_tx.as_ref().cloned() else {
return Ok(HttpResponse::NotFound().body("audio not enabled"));
};
@@ -374,7 +379,25 @@ pub async fn audio_ws(
return Ok(HttpResponse::NoContent().finish());
}
let mut rx_sub = rx.subscribe();
// If a channel_id is specified, subscribe to the per-channel broadcaster.
// Otherwise fall back to the primary RX broadcast.
let rx_sub: broadcast::Receiver<Bytes> = if let Some(ch_id) = query.channel_id {
match context.vchan_audio.read() {
Ok(map) => match map.get(&ch_id) {
Some(tx) => tx.subscribe(),
None => {
return Ok(HttpResponse::NotFound().body("channel not found"));
}
},
Err(_) => return Ok(HttpResponse::InternalServerError().finish()),
}
} else {
let Some(rx) = context.audio_rx.as_ref() else {
return Ok(HttpResponse::NotFound().body("audio not enabled"));
};
rx.subscribe()
};
let mut rx_sub = rx_sub;
let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
@@ -83,6 +83,15 @@ async fn serve(
);
let vchan_mgr = Arc::new(ClientChannelManager::new(4));
// Wire the audio-command sender so allocate/delete/freq/mode operations on
// virtual channels are forwarded to the audio-client task.
if let Ok(guard) = context.vchan_audio_cmd.lock() {
if let Some(tx) = guard.as_ref() {
vchan_mgr.set_audio_cmd(tx.clone());
}
}
let server = build_server(addr, state_rx, rig_tx, callsign, context, scheduler_store, scheduler_status, vchan_mgr)?;
let handle = server.handle();
tokio::spawn(async move {
@@ -22,6 +22,8 @@ use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use uuid::Uuid;
use trx_frontend::VChanAudioCmd;
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
@@ -90,6 +92,8 @@ pub struct ClientChannelManager {
/// `"<rig_id>:"` so subscribers can filter by rig.
pub change_tx: broadcast::Sender<String>,
pub max_channels: usize,
/// Optional sender to the audio-client task for virtual-channel audio commands.
pub audio_cmd: std::sync::Mutex<Option<tokio::sync::mpsc::Sender<VChanAudioCmd>>>,
}
impl ClientChannelManager {
@@ -100,6 +104,20 @@ impl ClientChannelManager {
sessions: RwLock::new(HashMap::new()),
change_tx,
max_channels: max_channels.max(1),
audio_cmd: std::sync::Mutex::new(None),
}
}
/// Wire the audio-command sender so the manager can dispatch
/// `VChanAudioCmd` messages when channels are allocated/deleted/changed.
pub fn set_audio_cmd(&self, tx: tokio::sync::mpsc::Sender<VChanAudioCmd>) {
*self.audio_cmd.lock().unwrap() = Some(tx);
}
/// Fire-and-forget: send a `VChanAudioCmd` to the audio-client task.
fn send_audio_cmd(&self, cmd: VChanAudioCmd) {
if let Some(tx) = self.audio_cmd.lock().unwrap().as_ref() {
let _ = tx.try_send(cmd);
}
}
@@ -225,6 +243,13 @@ impl ClientChannelManager {
.unwrap()
.insert(session_id, (rig_id.to_string(), id));
// Request server-side DSP channel + audio subscription.
self.send_audio_cmd(VChanAudioCmd::Subscribe {
uuid: id,
freq_hz,
mode: mode.to_string(),
});
Ok(snapshot)
}
@@ -329,6 +354,10 @@ impl ClientChannelManager {
for sid in evicted {
sessions.remove(&sid);
}
// Remove server-side DSP channel and stop audio encoding.
self.send_audio_cmd(VChanAudioCmd::Remove(channel_id));
Ok(())
}
@@ -347,6 +376,8 @@ impl ClientChannelManager {
.ok_or(VChanClientError::NotFound)?;
ch.freq_hz = freq_hz;
self.broadcast_change(rig_id, channels);
drop(rigs);
self.send_audio_cmd(VChanAudioCmd::SetFreq { uuid: channel_id, freq_hz });
Ok(())
}
@@ -364,6 +395,8 @@ impl ClientChannelManager {
.ok_or(VChanClientError::NotFound)?;
ch.mode = mode.to_string();
self.broadcast_change(rig_id, channels);
drop(rigs);
self.send_audio_cmd(VChanAudioCmd::SetMode { uuid: channel_id, mode: mode.to_string() });
Ok(())
}