[feat](trx-server): add audio capture and TCP streaming

Add AudioConfig to server configuration with support for RX capture
and TX playback via cpal and Opus encoding. Run a dedicated TCP
listener (default port 4533) that sends StreamInfo on connect, streams
RX Opus frames to clients, and receives TX frames back.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
2026-02-07 14:21:59 +01:00
parent 771cbd1987
commit eba13ac2c2
5 changed files with 1009 additions and 7 deletions
+3
View File
@@ -18,5 +18,8 @@ tracing-subscriber = { workspace = true }
clap = { workspace = true, features = ["derive"] }
dirs = "6"
libloading = "0.8"
bytes = "1"
cpal = "0.15"
opus = "0.3"
trx-backend = { path = "trx-backend" }
trx-core = { path = "../trx-core" }
+307
View File
@@ -0,0 +1,307 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
//! Audio capture, playback, and TCP streaming for trx-server.
use std::net::SocketAddr;
use bytes::Bytes;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc};
use tracing::{error, info, warn};
use trx_core::audio::{
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO,
AUDIO_MSG_TX_FRAME,
};
use crate::config::AudioConfig;
/// Spawn the audio capture thread.
///
/// Opens the configured input device via cpal, accumulates PCM samples into
/// frames of `frame_duration_ms` length, encodes each frame with Opus, and
/// broadcasts the resulting packets.
pub fn spawn_audio_capture(
cfg: &AudioConfig,
tx: broadcast::Sender<Bytes>,
) -> std::thread::JoinHandle<()> {
let sample_rate = cfg.sample_rate;
let channels = cfg.channels as u16;
let frame_duration_ms = cfg.frame_duration_ms;
let bitrate_bps = cfg.bitrate_bps;
let device_name = cfg.device.clone();
std::thread::spawn(move || {
if let Err(e) =
run_capture(sample_rate, channels, frame_duration_ms, bitrate_bps, device_name, tx)
{
error!("Audio capture thread error: {}", e);
}
})
}
fn run_capture(
sample_rate: u32,
channels: u16,
frame_duration_ms: u16,
bitrate_bps: u32,
device_name: Option<String>,
tx: broadcast::Sender<Bytes>,
) -> Result<(), Box<dyn std::error::Error>> {
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
let host = cpal::default_host();
let device = if let Some(ref name) = device_name {
host.input_devices()?
.find(|d| d.name().map(|n| n == *name).unwrap_or(false))
.ok_or_else(|| format!("audio input device '{}' not found", name))?
} else {
host.default_input_device()
.ok_or("no default audio input device")?
};
info!(
"Audio capture: using device '{}'",
device.name().unwrap_or_else(|_| "unknown".into())
);
let config = cpal::StreamConfig {
channels,
sample_rate: cpal::SampleRate(sample_rate),
buffer_size: cpal::BufferSize::Default,
};
let frame_samples = (sample_rate as usize * frame_duration_ms as usize / 1000) * channels as usize;
let opus_channels = match channels {
1 => opus::Channels::Mono,
2 => opus::Channels::Stereo,
_ => return Err(format!("unsupported channel count: {}", channels).into()),
};
let mut encoder = opus::Encoder::new(sample_rate, opus_channels, opus::Application::Audio)?;
encoder.set_bitrate(opus::Bitrate::Bits(bitrate_bps as i32))?;
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(64);
let stream = device.build_input_stream(
&config,
move |data: &[f32], _: &cpal::InputCallbackInfo| {
let _ = sample_tx.try_send(data.to_vec());
},
move |err| {
error!("Audio input stream error: {}", err);
},
None,
)?;
stream.play()?;
info!("Audio capture: started ({}Hz, {} ch, {}ms frames)", sample_rate, channels, frame_duration_ms);
let mut pcm_buf: Vec<f32> = Vec::with_capacity(frame_samples * 2);
let mut opus_buf = vec![0u8; 4096];
loop {
match sample_rx.recv() {
Ok(samples) => {
pcm_buf.extend_from_slice(&samples);
while pcm_buf.len() >= frame_samples {
let frame: Vec<f32> = pcm_buf.drain(..frame_samples).collect();
match encoder.encode_float(&frame, &mut opus_buf) {
Ok(len) => {
let packet = Bytes::copy_from_slice(&opus_buf[..len]);
let _ = tx.send(packet);
}
Err(e) => {
warn!("Opus encode error: {}", e);
}
}
}
}
Err(_) => break,
}
}
Ok(())
}
/// Spawn the audio playback task.
///
/// Receives Opus packets, decodes them, and plays through cpal output.
pub fn spawn_audio_playback(
cfg: &AudioConfig,
rx: mpsc::Receiver<Bytes>,
) -> std::thread::JoinHandle<()> {
let sample_rate = cfg.sample_rate;
let channels = cfg.channels as u16;
let frame_duration_ms = cfg.frame_duration_ms;
let device_name = cfg.device.clone();
std::thread::spawn(move || {
if let Err(e) = run_playback(sample_rate, channels, frame_duration_ms, device_name, rx) {
error!("Audio playback thread error: {}", e);
}
})
}
fn run_playback(
sample_rate: u32,
channels: u16,
frame_duration_ms: u16,
device_name: Option<String>,
mut rx: mpsc::Receiver<Bytes>,
) -> Result<(), Box<dyn std::error::Error>> {
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
let host = cpal::default_host();
let device = if let Some(ref name) = device_name {
host.output_devices()?
.find(|d| d.name().map(|n| n == *name).unwrap_or(false))
.ok_or_else(|| format!("audio output device '{}' not found", name))?
} else {
host.default_output_device()
.ok_or("no default audio output device")?
};
info!(
"Audio playback: using device '{}'",
device.name().unwrap_or_else(|_| "unknown".into())
);
let config = cpal::StreamConfig {
channels,
sample_rate: cpal::SampleRate(sample_rate),
buffer_size: cpal::BufferSize::Default,
};
let frame_samples = (sample_rate as usize * frame_duration_ms as usize / 1000) * channels as usize;
let opus_channels = match channels {
1 => opus::Channels::Mono,
2 => opus::Channels::Stereo,
_ => return Err(format!("unsupported channel count: {}", channels).into()),
};
let mut decoder = opus::Decoder::new(sample_rate, opus_channels)?;
let ring = std::sync::Arc::new(std::sync::Mutex::new(std::collections::VecDeque::<f32>::with_capacity(frame_samples * 8)));
let ring_writer = ring.clone();
let stream = device.build_output_stream(
&config,
move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
let mut ring = ring.lock().unwrap();
for sample in data.iter_mut() {
*sample = ring.pop_front().unwrap_or(0.0);
}
},
move |err| {
error!("Audio output stream error: {}", err);
},
None,
)?;
stream.play()?;
info!("Audio playback: started");
let rt = tokio::runtime::Handle::current();
let mut pcm_buf = vec![0f32; frame_samples];
rt.block_on(async {
while let Some(packet) = rx.recv().await {
match decoder.decode_float(&packet, &mut pcm_buf, false) {
Ok(decoded) => {
let mut ring = ring_writer.lock().unwrap();
ring.extend(&pcm_buf[..decoded * channels as usize]);
}
Err(e) => {
warn!("Opus decode error: {}", e);
}
}
}
});
Ok(())
}
/// Run the audio TCP listener, accepting client connections.
pub async fn run_audio_listener(
addr: SocketAddr,
rx_audio: broadcast::Sender<Bytes>,
tx_audio: mpsc::Sender<Bytes>,
stream_info: AudioStreamInfo,
) -> std::io::Result<()> {
let listener = TcpListener::bind(addr).await?;
info!("Audio listener on {}", addr);
loop {
let (socket, peer) = listener.accept().await?;
info!("Audio client connected: {}", peer);
let rx_audio = rx_audio.clone();
let tx_audio = tx_audio.clone();
let info = stream_info.clone();
tokio::spawn(async move {
if let Err(e) = handle_audio_client(socket, peer, rx_audio, tx_audio, info).await {
warn!("Audio client {} error: {:?}", peer, e);
}
info!("Audio client {} disconnected", peer);
});
}
}
async fn handle_audio_client(
socket: TcpStream,
peer: SocketAddr,
rx_audio: broadcast::Sender<Bytes>,
tx_audio: mpsc::Sender<Bytes>,
stream_info: AudioStreamInfo,
) -> std::io::Result<()> {
let (reader, writer) = socket.into_split();
let mut reader = tokio::io::BufReader::new(reader);
let mut writer = tokio::io::BufWriter::new(writer);
// Send stream info
let info_json = serde_json::to_vec(&stream_info)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
write_audio_msg(&mut writer, AUDIO_MSG_STREAM_INFO, &info_json).await?;
// Spawn RX forwarding task
let mut rx_sub = rx_audio.subscribe();
let mut writer_for_rx = writer;
let rx_handle = tokio::spawn(async move {
loop {
match rx_sub.recv().await {
Ok(packet) => {
if let Err(e) = write_audio_msg(&mut writer_for_rx, AUDIO_MSG_RX_FRAME, &packet).await {
warn!("Audio RX write to {} failed: {}", peer, e);
break;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("Audio RX: {} dropped {} frames", peer, n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
// Read TX frames from client
loop {
match read_audio_msg(&mut reader).await {
Ok((AUDIO_MSG_TX_FRAME, payload)) => {
let _ = tx_audio.send(Bytes::from(payload)).await;
}
Ok((msg_type, _)) => {
warn!("Audio: unexpected message type {} from {}", msg_type, peer);
}
Err(_) => break,
}
}
rx_handle.abort();
Ok(())
}
+49
View File
@@ -29,6 +29,8 @@ pub struct ServerConfig {
pub behavior: BehaviorConfig,
/// TCP listener configuration
pub listen: ListenConfig,
/// Audio streaming configuration
pub audio: AudioConfig,
}
/// General application settings.
@@ -141,6 +143,49 @@ pub struct AuthConfig {
pub tokens: Vec<String>,
}
/// Audio streaming configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AudioConfig {
/// Whether audio streaming is enabled
pub enabled: bool,
/// IP address to listen on for audio connections
pub listen: IpAddr,
/// TCP port for audio connections
pub port: u16,
/// Whether RX audio capture is enabled
pub rx_enabled: bool,
/// Whether TX audio playback is enabled
pub tx_enabled: bool,
/// Audio input device name (None = system default)
pub device: Option<String>,
/// Sample rate in Hz
pub sample_rate: u32,
/// Number of audio channels
pub channels: u8,
/// Opus frame duration in milliseconds
pub frame_duration_ms: u16,
/// Opus bitrate in bits per second
pub bitrate_bps: u32,
}
impl Default for AudioConfig {
fn default() -> Self {
Self {
enabled: false,
listen: IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
port: 4533,
rx_enabled: true,
tx_enabled: true,
device: None,
sample_rate: 48000,
channels: 1,
frame_duration_ms: 20,
bitrate_bps: 24000,
}
}
}
impl ServerConfig {
/// Load configuration from a specific file path.
pub fn load_from_file(path: &Path) -> Result<Self, ConfigError> {
@@ -205,6 +250,7 @@ impl ServerConfig {
},
behavior: BehaviorConfig::default(),
listen: ListenConfig::default(),
audio: AudioConfig::default(),
};
toml::to_string_pretty(&example).unwrap_or_default()
@@ -259,6 +305,9 @@ mod tests {
assert!(config.listen.enabled);
assert_eq!(config.listen.port, 4532);
assert!(config.listen.auth.tokens.is_empty());
assert!(!config.audio.enabled);
assert_eq!(config.audio.port, 4533);
assert_eq!(config.audio.sample_rate, 48000);
}
#[test]
+33 -1
View File
@@ -2,6 +2,7 @@
//
// SPDX-License-Identifier: BSD-2-Clause
mod audio;
mod config;
mod error;
mod listener;
@@ -13,11 +14,14 @@ use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::time::Duration;
use bytes::Bytes;
use clap::{Parser, ValueEnum};
use tokio::signal;
use tokio::sync::{mpsc, watch};
use tokio::sync::{broadcast, mpsc, watch};
use tracing::{error, info};
use trx_core::audio::AudioStreamInfo;
use trx_backend::{is_backend_registered, register_builtin_backends, registered_backends, RigAccess};
use trx_core::radio::freq::Freq;
use trx_core::rig::controller::{AdaptivePolling, ExponentialBackoff};
@@ -304,6 +308,34 @@ async fn main() -> DynResult<()> {
});
}
if cfg.audio.enabled {
let audio_listen = SocketAddr::from((cfg.audio.listen, cfg.audio.port));
let stream_info = AudioStreamInfo {
sample_rate: cfg.audio.sample_rate,
channels: cfg.audio.channels,
frame_duration_ms: cfg.audio.frame_duration_ms,
};
let (rx_audio_tx, _) = broadcast::channel::<Bytes>(256);
let (tx_audio_tx, tx_audio_rx) = mpsc::channel::<Bytes>(64);
if cfg.audio.rx_enabled {
let _capture_thread = audio::spawn_audio_capture(&cfg.audio, rx_audio_tx.clone());
}
if cfg.audio.tx_enabled {
let _playback_thread = audio::spawn_audio_playback(&cfg.audio, tx_audio_rx);
}
tokio::spawn(async move {
if let Err(e) =
audio::run_audio_listener(audio_listen, rx_audio_tx, tx_audio_tx, stream_info)
.await
{
error!("Audio listener error: {:?}", e);
}
});
}
let _tx = tx;
signal::ctrl_c().await?;