fix(trx-client): route audio by selected rig with per-rig port map
This commit is contained in:
@@ -5,6 +5,8 @@
|
|||||||
//! Audio TCP client that connects to the server's audio port and relays
|
//! Audio TCP client that connects to the server's audio port and relays
|
||||||
//! RX/TX Opus frames via broadcast/mpsc channels.
|
//! RX/TX Opus frames via broadcast/mpsc channels.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
@@ -23,7 +25,10 @@ use trx_core::decode::DecodedMessage;
|
|||||||
|
|
||||||
/// Run the audio client with auto-reconnect.
|
/// Run the audio client with auto-reconnect.
|
||||||
pub async fn run_audio_client(
|
pub async fn run_audio_client(
|
||||||
server_addr: String,
|
server_host: String,
|
||||||
|
default_port: u16,
|
||||||
|
rig_ports: HashMap<String, u16>,
|
||||||
|
selected_rig_id: Arc<Mutex<Option<String>>>,
|
||||||
rx_tx: broadcast::Sender<Bytes>,
|
rx_tx: broadcast::Sender<Bytes>,
|
||||||
mut tx_rx: mpsc::Receiver<Bytes>,
|
mut tx_rx: mpsc::Receiver<Bytes>,
|
||||||
stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
|
stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
|
||||||
@@ -38,12 +43,27 @@ pub async fn run_audio_client(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let server_addr = resolve_audio_addr(
|
||||||
|
&server_host,
|
||||||
|
default_port,
|
||||||
|
&rig_ports,
|
||||||
|
selected_rig_id
|
||||||
|
.lock()
|
||||||
|
.ok()
|
||||||
|
.and_then(|v| v.clone())
|
||||||
|
.as_deref(),
|
||||||
|
);
|
||||||
info!("Audio client: connecting to {}", server_addr);
|
info!("Audio client: connecting to {}", server_addr);
|
||||||
match TcpStream::connect(&server_addr).await {
|
match TcpStream::connect(&server_addr).await {
|
||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
reconnect_delay = Duration::from_secs(1);
|
reconnect_delay = Duration::from_secs(1);
|
||||||
if let Err(e) = handle_audio_connection(
|
if let Err(e) = handle_audio_connection(
|
||||||
stream,
|
stream,
|
||||||
|
&server_host,
|
||||||
|
default_port,
|
||||||
|
&rig_ports,
|
||||||
|
&selected_rig_id,
|
||||||
|
&server_addr,
|
||||||
&rx_tx,
|
&rx_tx,
|
||||||
&mut tx_rx,
|
&mut tx_rx,
|
||||||
&stream_info_tx,
|
&stream_info_tx,
|
||||||
@@ -80,6 +100,11 @@ pub async fn run_audio_client(
|
|||||||
|
|
||||||
async fn handle_audio_connection(
|
async fn handle_audio_connection(
|
||||||
stream: TcpStream,
|
stream: TcpStream,
|
||||||
|
server_host: &str,
|
||||||
|
default_port: u16,
|
||||||
|
rig_ports: &HashMap<String, u16>,
|
||||||
|
selected_rig_id: &Arc<Mutex<Option<String>>>,
|
||||||
|
connected_addr: &str,
|
||||||
rx_tx: &broadcast::Sender<Bytes>,
|
rx_tx: &broadcast::Sender<Bytes>,
|
||||||
tx_rx: &mut mpsc::Receiver<Bytes>,
|
tx_rx: &mut mpsc::Receiver<Bytes>,
|
||||||
stream_info_tx: &watch::Sender<Option<AudioStreamInfo>>,
|
stream_info_tx: &watch::Sender<Option<AudioStreamInfo>>,
|
||||||
@@ -135,6 +160,7 @@ async fn handle_audio_connection(
|
|||||||
});
|
});
|
||||||
|
|
||||||
// Forward TX frames to server
|
// Forward TX frames to server
|
||||||
|
let mut rig_check = time::interval(Duration::from_millis(500));
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
changed = shutdown_rx.changed() => {
|
changed = shutdown_rx.changed() => {
|
||||||
@@ -164,8 +190,38 @@ async fn handle_audio_connection(
|
|||||||
_ = &mut rx_handle => {
|
_ = &mut rx_handle => {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
_ = rig_check.tick() => {
|
||||||
|
let current_rig = selected_rig_id.lock().ok().and_then(|v| v.clone());
|
||||||
|
let desired_addr = resolve_audio_addr(
|
||||||
|
server_host,
|
||||||
|
default_port,
|
||||||
|
rig_ports,
|
||||||
|
current_rig.as_deref(),
|
||||||
|
);
|
||||||
|
if desired_addr != connected_addr {
|
||||||
|
info!(
|
||||||
|
"Audio client: active rig changed ({} -> {}), reconnecting audio",
|
||||||
|
connected_addr,
|
||||||
|
desired_addr
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn resolve_audio_addr(
|
||||||
|
host: &str,
|
||||||
|
default_port: u16,
|
||||||
|
rig_ports: &HashMap<String, u16>,
|
||||||
|
selected_rig_id: Option<&str>,
|
||||||
|
) -> String {
|
||||||
|
let port = selected_rig_id
|
||||||
|
.and_then(|rig_id| rig_ports.get(rig_id))
|
||||||
|
.copied()
|
||||||
|
.unwrap_or(default_port);
|
||||||
|
format!("{}:{}", host, port)
|
||||||
|
}
|
||||||
|
|||||||
@@ -11,6 +11,7 @@
|
|||||||
//! 4. `~/.config/trx-rs/client.toml` (XDG config)
|
//! 4. `~/.config/trx-rs/client.toml` (XDG config)
|
||||||
//! 5. `/etc/trx-rs/client.toml` (system-wide)
|
//! 5. `/etc/trx-rs/client.toml` (system-wide)
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -104,6 +105,8 @@ pub struct AudioClientConfig {
|
|||||||
pub enabled: bool,
|
pub enabled: bool,
|
||||||
/// Audio TCP port on the remote server
|
/// Audio TCP port on the remote server
|
||||||
pub server_port: u16,
|
pub server_port: u16,
|
||||||
|
/// Optional per-rig audio port overrides for multi-rig servers.
|
||||||
|
pub rig_ports: HashMap<String, u16>,
|
||||||
/// Local audio bridge (virtual device integration)
|
/// Local audio bridge (virtual device integration)
|
||||||
pub bridge: AudioBridgeConfig,
|
pub bridge: AudioBridgeConfig,
|
||||||
}
|
}
|
||||||
@@ -113,6 +116,7 @@ impl Default for AudioClientConfig {
|
|||||||
Self {
|
Self {
|
||||||
enabled: true,
|
enabled: true,
|
||||||
server_port: 4531,
|
server_port: 4531,
|
||||||
|
rig_ports: HashMap::new(),
|
||||||
bridge: AudioBridgeConfig::default(),
|
bridge: AudioBridgeConfig::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -323,6 +327,17 @@ impl ClientConfig {
|
|||||||
if self.frontends.audio.enabled && self.frontends.audio.server_port == 0 {
|
if self.frontends.audio.enabled && self.frontends.audio.server_port == 0 {
|
||||||
return Err("[frontends.audio].server_port must be > 0 when enabled".to_string());
|
return Err("[frontends.audio].server_port must be > 0 when enabled".to_string());
|
||||||
}
|
}
|
||||||
|
for (rig_id, port) in &self.frontends.audio.rig_ports {
|
||||||
|
if rig_id.trim().is_empty() {
|
||||||
|
return Err("[frontends.audio].rig_ports keys must not be empty".to_string());
|
||||||
|
}
|
||||||
|
if *port == 0 {
|
||||||
|
return Err(format!(
|
||||||
|
"[frontends.audio].rig_ports[\"{}\"] must be > 0",
|
||||||
|
rig_id
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
if !self.frontends.audio.bridge.rx_gain.is_finite()
|
if !self.frontends.audio.bridge.rx_gain.is_finite()
|
||||||
|| self.frontends.audio.bridge.rx_gain < 0.0
|
|| self.frontends.audio.bridge.rx_gain < 0.0
|
||||||
{
|
{
|
||||||
@@ -492,6 +507,7 @@ mod tests {
|
|||||||
assert_eq!(config.remote.poll_interval_ms, 750);
|
assert_eq!(config.remote.poll_interval_ms, 750);
|
||||||
assert!(config.frontends.audio.enabled);
|
assert!(config.frontends.audio.enabled);
|
||||||
assert_eq!(config.frontends.audio.server_port, 4531);
|
assert_eq!(config.frontends.audio.server_port, 4531);
|
||||||
|
assert!(config.frontends.audio.rig_ports.is_empty());
|
||||||
assert!(!config.frontends.audio.bridge.enabled);
|
assert!(!config.frontends.audio.bridge.enabled);
|
||||||
assert_eq!(config.frontends.audio.bridge.rx_gain, 1.0);
|
assert_eq!(config.frontends.audio.bridge.rx_gain, 1.0);
|
||||||
assert_eq!(config.frontends.audio.bridge.tx_gain, 1.0);
|
assert_eq!(config.frontends.audio.bridge.tx_gain, 1.0);
|
||||||
@@ -552,6 +568,17 @@ port = 8080
|
|||||||
assert!(config.validate().is_err());
|
assert!(config.validate().is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_validate_rejects_zero_audio_rig_port() {
|
||||||
|
let mut config = ClientConfig::default();
|
||||||
|
config
|
||||||
|
.frontends
|
||||||
|
.audio
|
||||||
|
.rig_ports
|
||||||
|
.insert("ft817".to_string(), 0);
|
||||||
|
assert!(config.validate().is_err());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_validate_rejects_http_auth_enabled_without_passphrases() {
|
fn test_validate_rejects_http_auth_enabled_without_passphrases() {
|
||||||
let mut config = ClientConfig::default();
|
let mut config = ClientConfig::default();
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ mod audio_client;
|
|||||||
mod config;
|
mod config;
|
||||||
mod remote_client;
|
mod remote_client;
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::net::{IpAddr, SocketAddr};
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::ptr::NonNull;
|
use std::ptr::NonNull;
|
||||||
@@ -277,21 +278,23 @@ async fn async_init() -> DynResult<AppState> {
|
|||||||
let (stream_info_tx, stream_info_rx) = watch::channel::<Option<AudioStreamInfo>>(None);
|
let (stream_info_tx, stream_info_rx) = watch::channel::<Option<AudioStreamInfo>>(None);
|
||||||
let (decode_tx, _) = broadcast::channel::<DecodedMessage>(256);
|
let (decode_tx, _) = broadcast::channel::<DecodedMessage>(256);
|
||||||
|
|
||||||
let audio_addr = format!("{}:{}", remote_host, cfg.frontends.audio.server_port);
|
|
||||||
|
|
||||||
frontend_runtime.audio_rx = Some(rx_audio_tx.clone());
|
frontend_runtime.audio_rx = Some(rx_audio_tx.clone());
|
||||||
frontend_runtime.audio_tx = Some(tx_audio_tx);
|
frontend_runtime.audio_tx = Some(tx_audio_tx);
|
||||||
frontend_runtime.audio_info = Some(stream_info_rx);
|
frontend_runtime.audio_info = Some(stream_info_rx);
|
||||||
frontend_runtime.decode_rx = Some(decode_tx.clone());
|
frontend_runtime.decode_rx = Some(decode_tx.clone());
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Audio enabled: connecting to {}, decode channel set",
|
"Audio enabled: default port {}, decode channel set",
|
||||||
audio_addr
|
cfg.frontends.audio.server_port
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let audio_rig_ports: HashMap<String, u16> = cfg.frontends.audio.rig_ports.clone();
|
||||||
let audio_shutdown_rx = shutdown_rx.clone();
|
let audio_shutdown_rx = shutdown_rx.clone();
|
||||||
task_handles.push(tokio::spawn(audio_client::run_audio_client(
|
task_handles.push(tokio::spawn(audio_client::run_audio_client(
|
||||||
audio_addr,
|
remote_host,
|
||||||
|
cfg.frontends.audio.server_port,
|
||||||
|
audio_rig_ports,
|
||||||
|
frontend_runtime.remote_active_rig_id.clone(),
|
||||||
rx_audio_tx,
|
rx_audio_tx,
|
||||||
tx_audio_rx,
|
tx_audio_rx,
|
||||||
stream_info_tx,
|
stream_info_tx,
|
||||||
|
|||||||
@@ -85,6 +85,9 @@ port = 0
|
|||||||
enabled = true
|
enabled = true
|
||||||
# Remote trx-server audio port
|
# Remote trx-server audio port
|
||||||
server_port = 4531
|
server_port = 4531
|
||||||
|
# Optional per-rig audio ports for multi-rig servers:
|
||||||
|
# rig_ports.ft817 = 4531
|
||||||
|
# rig_ports.airspyhf = 4532
|
||||||
|
|
||||||
[frontends.audio.bridge]
|
[frontends.audio.bridge]
|
||||||
# Enable local cpal bridge for WSJT-X virtual audio routing
|
# Enable local cpal bridge for WSJT-X virtual audio routing
|
||||||
|
|||||||
Reference in New Issue
Block a user