[feat](trx-client): support audio URL overrides

Use full audio endpoint URLs for trx-server audio connections while
preserving server-advertised ports and legacy port-based fallback for
backward compatibility.

Add `server_url` and per-remote `rig_urls` config entries, plus
validation and tests for audio URL parsing and address resolution.

Co-authored-by: OpenAI Codex <noreply@openai.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-23 22:04:14 +01:00
parent d8444f35f6
commit e09f14d2d3
4 changed files with 317 additions and 104 deletions
+118 -23
View File
@@ -21,6 +21,7 @@ use trx_frontend::RemoteRigEntry;
use uuid::Uuid; use uuid::Uuid;
use crate::remote_client::RemoteEndpoint;
use trx_core::audio::{ use trx_core::audio::{
parse_vchan_audio_frame, parse_vchan_uuid_msg, read_audio_msg, write_audio_msg, parse_vchan_audio_frame, parse_vchan_uuid_msg, read_audio_msg, write_audio_msg,
write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE,
@@ -43,11 +44,36 @@ struct ActiveVChanSub {
decoder_kinds: Vec<String>, decoder_kinds: Vec<String>,
} }
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AudioConnectConfig {
pub server_host: String,
pub default_port: u16,
pub fixed_addr: Option<String>,
}
impl AudioConnectConfig {
pub fn from_host_port(server_host: String, default_port: u16) -> Self {
Self {
server_host,
default_port,
fixed_addr: None,
}
}
pub fn fixed(addr: String) -> Self {
Self {
server_host: String::new(),
default_port: 0,
fixed_addr: Some(addr),
}
}
}
/// Per-rig audio task state, tracked by the multi-rig manager. /// Per-rig audio task state, tracked by the multi-rig manager.
struct PerRigAudioTask { struct PerRigAudioTask {
handle: tokio::task::JoinHandle<()>, handle: tokio::task::JoinHandle<()>,
shutdown_tx: watch::Sender<bool>, shutdown_tx: watch::Sender<bool>,
port: u16, addr: String,
} }
/// Multi-rig audio manager: spawns/tears down per-rig audio client tasks on /// Multi-rig audio manager: spawns/tears down per-rig audio client tasks on
@@ -55,11 +81,8 @@ struct PerRigAudioTask {
/// an `audio_port` gets its own TCP connection. /// an `audio_port` gets its own TCP connection.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn run_multi_rig_audio_manager( pub async fn run_multi_rig_audio_manager(
server_host: String, default_connect: AudioConnectConfig,
default_port: u16, rig_connect: HashMap<String, AudioConnectConfig>,
rig_ports: HashMap<String, u16>,
// Per-rig server host overrides (short_name -> host) for multi-server mode.
rig_server_hosts: HashMap<String, String>,
selected_rig_id: Arc<Mutex<Option<String>>>, selected_rig_id: Arc<Mutex<Option<String>>>,
known_rigs: Arc<Mutex<Vec<RemoteRigEntry>>>, known_rigs: Arc<Mutex<Vec<RemoteRigEntry>>>,
global_rx_tx: broadcast::Sender<Bytes>, global_rx_tx: broadcast::Sender<Bytes>,
@@ -86,28 +109,31 @@ pub async fn run_multi_rig_audio_manager(
loop { loop {
tokio::select! { tokio::select! {
_ = poll_interval.tick() => { _ = poll_interval.tick() => {
// Collect current known rigs and their audio ports. // Collect current known rigs and their audio endpoints.
let current_rigs: HashMap<String, u16> = known_rigs let current_rigs: HashMap<String, String> = known_rigs
.lock() .lock()
.ok() .ok()
.map(|entries| { .map(|entries| {
entries.iter().map(|e| { entries.iter().map(|e| {
let port = rig_ports.get(&e.rig_id).copied() let addr = resolve_audio_addr(
.or(e.audio_port) &e.rig_id,
.unwrap_or(default_port); e.audio_port,
(e.rig_id.clone(), port) &rig_connect,
&default_connect,
);
(e.rig_id.clone(), addr)
}).collect() }).collect()
}) })
.unwrap_or_default(); .unwrap_or_default();
// Tear down tasks for rigs that are no longer present or // Tear down tasks for rigs that are no longer present or
// whose port has changed. // whose audio endpoint has changed.
let to_remove: Vec<String> = active_tasks.keys() let to_remove: Vec<String> = active_tasks.keys()
.filter(|id| { .filter(|id| {
match current_rigs.get(*id) { match current_rigs.get(*id) {
None => true, None => true,
Some(port) => active_tasks.get(*id) Some(addr) => active_tasks.get(*id)
.is_none_or(|t| t.port != *port), .is_none_or(|t| t.addr != *addr),
} }
}) })
.cloned() .cloned()
@@ -121,7 +147,7 @@ pub async fn run_multi_rig_audio_manager(
} }
// Spawn tasks for new rigs. // Spawn tasks for new rigs.
for (rig_id, port) in &current_rigs { for (rig_id, addr) in &current_rigs {
if active_tasks.contains_key(rig_id) { if active_tasks.contains_key(rig_id) {
continue; continue;
} }
@@ -149,10 +175,6 @@ pub async fn run_multi_rig_audio_manager(
map.insert(rig_id.clone(), per_rig_vchan_tx); map.insert(rig_id.clone(), per_rig_vchan_tx);
} }
let host = rig_server_hosts
.get(rig_id)
.unwrap_or(&server_host);
let addr = format!("{}:{}", host, port);
let rig_id_clone = rig_id.clone(); let rig_id_clone = rig_id.clone();
let global_rx_tx_clone = global_rx_tx.clone(); let global_rx_tx_clone = global_rx_tx.clone();
let global_info_tx_clone = global_stream_info_tx.clone(); let global_info_tx_clone = global_stream_info_tx.clone();
@@ -162,10 +184,12 @@ pub async fn run_multi_rig_audio_manager(
let vchan_audio_clone = vchan_audio.clone(); let vchan_audio_clone = vchan_audio.clone();
let vchan_destroyed_clone = vchan_destroyed_tx.clone(); let vchan_destroyed_clone = vchan_destroyed_tx.clone();
let tx_rx_clone = tx_rx.clone(); let tx_rx_clone = tx_rx.clone();
let addr = addr.clone();
let task_addr = addr.clone();
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
run_single_rig_audio_client( run_single_rig_audio_client(
addr, task_addr,
rig_id_clone, rig_id_clone,
selected_clone, selected_clone,
per_rig_rx_tx, per_rig_rx_tx,
@@ -183,11 +207,11 @@ pub async fn run_multi_rig_audio_manager(
.await; .await;
}); });
info!("Audio client: started task for rig {} ({}:{})", rig_id, host, port); info!("Audio client: started task for rig {} ({})", rig_id, addr);
active_tasks.insert(rig_id.clone(), PerRigAudioTask { active_tasks.insert(rig_id.clone(), PerRigAudioTask {
handle, handle,
shutdown_tx: per_rig_shutdown_tx, shutdown_tx: per_rig_shutdown_tx,
port: *port, addr: addr.clone(),
}); });
} }
} }
@@ -206,6 +230,24 @@ pub async fn run_multi_rig_audio_manager(
} }
} }
fn resolve_audio_addr(
rig_id: &str,
advertised_port: Option<u16>,
rig_connect: &HashMap<String, AudioConnectConfig>,
default_connect: &AudioConnectConfig,
) -> String {
let connect = rig_connect.get(rig_id).unwrap_or(default_connect);
if let Some(addr) = &connect.fixed_addr {
return addr.clone();
}
RemoteEndpoint {
host: connect.server_host.clone(),
port: advertised_port.unwrap_or(connect.default_port),
}
.connect_addr()
}
/// Audio client for a single rig. Maintains its own TCP connection with /// Audio client for a single rig. Maintains its own TCP connection with
/// auto-reconnect, publishes RX frames to both per-rig and (if selected) /// auto-reconnect, publishes RX frames to both per-rig and (if selected)
/// global broadcast channels. /// global broadcast channels.
@@ -295,6 +337,59 @@ async fn run_single_rig_audio_client(
} }
} }
#[cfg(test)]
mod tests {
use super::{resolve_audio_addr, AudioConnectConfig};
use std::collections::HashMap;
#[test]
fn resolve_audio_addr_prefers_fixed_url() {
let mut rig_connect = HashMap::new();
rig_connect.insert(
"home-hf".to_string(),
AudioConnectConfig::fixed("audio.example.com:4700".to_string()),
);
let addr = resolve_audio_addr(
"home-hf",
Some(4531),
&rig_connect,
&AudioConnectConfig::from_host_port("control.example.com".to_string(), 4531),
);
assert_eq!(addr, "audio.example.com:4700");
}
#[test]
fn resolve_audio_addr_uses_advertised_port_with_remote_host() {
let mut rig_connect = HashMap::new();
rig_connect.insert(
"home-hf".to_string(),
AudioConnectConfig::from_host_port("control.example.com".to_string(), 4531),
);
let addr = resolve_audio_addr(
"home-hf",
Some(4600),
&rig_connect,
&AudioConnectConfig::from_host_port("fallback.example.com".to_string(), 4531),
);
assert_eq!(addr, "control.example.com:4600");
}
#[test]
fn resolve_audio_addr_falls_back_to_default_port() {
let rig_connect = HashMap::new();
let addr = resolve_audio_addr(
"home-hf",
None,
&rig_connect,
&AudioConnectConfig::from_host_port("fallback.example.com".to_string(), 4531),
);
assert_eq!(addr, "fallback.example.com:4531");
}
}
/// Handle a single TCP connection for one rig. Similar to `handle_audio_connection` /// Handle a single TCP connection for one rig. Similar to `handle_audio_connection`
/// but publishes to per-rig channels directly and mirrors to global when selected. /// but publishes to per-rig channels directly and mirrors to global when selected.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
+81 -11
View File
@@ -143,9 +143,18 @@ pub struct FrontendsConfig {
pub struct AudioClientConfig { pub struct AudioClientConfig {
/// Whether audio streaming is enabled /// Whether audio streaming is enabled
pub enabled: bool, pub enabled: bool,
/// Audio TCP port on the remote server /// Optional exact audio TCP URL override applied to all remotes.
/// When set, this takes precedence over `server_port`.
pub server_url: Option<String>,
/// Optional per-rig audio URL overrides keyed by remote short name.
/// These take precedence over `server_url`, server-advertised ports, and
/// the legacy `rig_ports` map.
pub rig_urls: HashMap<String, String>,
/// Legacy audio TCP port fallback on the remote server when no URL override
/// is configured and the server does not advertise a per-rig audio port.
pub server_port: u16, pub server_port: u16,
/// Optional per-rig audio port overrides for multi-rig servers. /// Legacy per-rig audio port overrides for multi-rig servers.
/// Prefer `rig_urls` when the audio endpoint differs by host as well.
pub rig_ports: HashMap<String, u16>, pub rig_ports: HashMap<String, u16>,
/// Local audio bridge (virtual device integration) /// Local audio bridge (virtual device integration)
pub bridge: AudioBridgeConfig, pub bridge: AudioBridgeConfig,
@@ -155,6 +164,8 @@ impl Default for AudioClientConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
enabled: true, enabled: true,
server_url: None,
rig_urls: HashMap::new(),
server_port: 4531, server_port: 4531,
rig_ports: HashMap::new(), rig_ports: HashMap::new(),
bridge: AudioBridgeConfig::default(), bridge: AudioBridgeConfig::default(),
@@ -406,10 +417,7 @@ impl ClientConfig {
return Err(format!("[[remotes]][{}].name must not be empty", i)); return Err(format!("[[remotes]][{}].name must not be empty", i));
} }
if !seen_names.insert(&entry.name) { if !seen_names.insert(&entry.name) {
return Err(format!( return Err(format!("[[remotes]] duplicate name \"{}\"", entry.name));
"[[remotes]] duplicate name \"{}\"",
entry.name
));
} }
if entry.url.trim().is_empty() { if entry.url.trim().is_empty() {
return Err(format!( return Err(format!(
@@ -483,7 +491,7 @@ impl ClientConfig {
if let Some(rig_id) = &self.frontends.http.default_rig_name { if let Some(rig_id) = &self.frontends.http.default_rig_name {
if rig_id.trim().is_empty() { if rig_id.trim().is_empty() {
return Err( return Err(
"[frontends.http].default_rig_name must not be empty when set".to_string() "[frontends.http].default_rig_name must not be empty when set".to_string(),
); );
} }
} }
@@ -534,9 +542,23 @@ impl ClientConfig {
)); ));
} }
} }
if self.frontends.audio.enabled && self.frontends.audio.server_port == 0 { if let Some(url) = &self.frontends.audio.server_url {
crate::remote_client::parse_audio_url(url)
.map_err(|e| format!("[frontends.audio].server_url {e}"))?;
}
if self.frontends.audio.enabled
&& self.frontends.audio.server_url.is_none()
&& self.frontends.audio.server_port == 0
{
return Err("[frontends.audio].server_port must be > 0 when enabled".to_string()); return Err("[frontends.audio].server_port must be > 0 when enabled".to_string());
} }
for (rig_id, url) in &self.frontends.audio.rig_urls {
if rig_id.trim().is_empty() {
return Err("[frontends.audio].rig_urls keys must not be empty".to_string());
}
crate::remote_client::parse_audio_url(url)
.map_err(|e| format!("[frontends.audio].rig_urls[\"{rig_id}\"] {e}"))?;
}
for (rig_id, port) in &self.frontends.audio.rig_ports { for (rig_id, port) in &self.frontends.audio.rig_ports {
if rig_id.trim().is_empty() { if rig_id.trim().is_empty() {
return Err("[frontends.audio].rig_ports keys must not be empty".to_string()); return Err("[frontends.audio].rig_ports keys must not be empty".to_string());
@@ -750,6 +772,8 @@ mod tests {
); );
assert_eq!(config.remote.poll_interval_ms, 750); assert_eq!(config.remote.poll_interval_ms, 750);
assert!(config.frontends.audio.enabled); assert!(config.frontends.audio.enabled);
assert!(config.frontends.audio.server_url.is_none());
assert!(config.frontends.audio.rig_urls.is_empty());
assert_eq!(config.frontends.audio.server_port, 4531); assert_eq!(config.frontends.audio.server_port, 4531);
assert!(config.frontends.audio.rig_ports.is_empty()); assert!(config.frontends.audio.rig_ports.is_empty());
assert!(!config.frontends.audio.bridge.enabled); assert!(!config.frontends.audio.bridge.enabled);
@@ -825,6 +849,28 @@ uhf = 60
); );
} }
#[test]
fn test_parse_client_toml_with_audio_urls() {
let toml_str = r#"
[frontends.audio]
enabled = true
server_url = "tcp://audio.example.com"
[frontends.audio.rig_urls]
home-hf = "audio://10.0.0.5:4600"
"#;
let config: ClientConfig = toml::from_str(toml_str).unwrap();
assert_eq!(
config.frontends.audio.server_url,
Some("tcp://audio.example.com".to_string())
);
assert_eq!(
config.frontends.audio.rig_urls.get("home-hf"),
Some(&"audio://10.0.0.5:4600".to_string())
);
}
#[test] #[test]
fn test_example_combined_toml_parses() { fn test_example_combined_toml_parses() {
let example = ClientConfig::example_combined_toml(); let example = ClientConfig::example_combined_toml();
@@ -865,6 +911,21 @@ uhf = 60
assert!(config.validate().is_err()); assert!(config.validate().is_err());
} }
#[test]
fn test_validate_rejects_invalid_audio_url() {
let mut config = ClientConfig::default();
config.frontends.audio.server_url = Some("tcp://:4531".to_string());
assert!(config.validate().is_err());
}
#[test]
fn test_validate_accepts_audio_url_without_server_port() {
let mut config = ClientConfig::default();
config.frontends.audio.server_url = Some("audio.example.com".to_string());
config.frontends.audio.server_port = 0;
assert!(config.validate().is_ok());
}
#[test] #[test]
fn test_validate_rejects_http_auth_enabled_without_passphrases() { fn test_validate_rejects_http_auth_enabled_without_passphrases() {
let mut config = ClientConfig::default(); let mut config = ClientConfig::default();
@@ -1080,7 +1141,10 @@ url = "remote.example.com:4530"
auth: RemoteAuthConfig::default(), auth: RemoteAuthConfig::default(),
poll_interval_ms: 750, poll_interval_ms: 750,
}]; }];
assert!(config.validate().unwrap_err().contains("name must not be empty")); assert!(config
.validate()
.unwrap_err()
.contains("name must not be empty"));
} }
#[test] #[test]
@@ -1093,7 +1157,10 @@ url = "remote.example.com:4530"
auth: RemoteAuthConfig::default(), auth: RemoteAuthConfig::default(),
poll_interval_ms: 750, poll_interval_ms: 750,
}]; }];
assert!(config.validate().unwrap_err().contains("url must not be empty")); assert!(config
.validate()
.unwrap_err()
.contains("url must not be empty"));
} }
#[test] #[test]
@@ -1106,6 +1173,9 @@ url = "remote.example.com:4530"
auth: RemoteAuthConfig::default(), auth: RemoteAuthConfig::default(),
poll_interval_ms: 0, poll_interval_ms: 0,
}]; }];
assert!(config.validate().unwrap_err().contains("poll_interval_ms must be > 0")); assert!(config
.validate()
.unwrap_err()
.contains("poll_interval_ms must be > 0"));
} }
} }
+48 -29
View File
@@ -32,8 +32,9 @@ use trx_frontend_http::register_frontend_on as register_http_frontend;
use trx_frontend_http_json::register_frontend_on as register_http_json_frontend; use trx_frontend_http_json::register_frontend_on as register_http_json_frontend;
use trx_frontend_rigctl::register_frontend_on as register_rigctl_frontend; use trx_frontend_rigctl::register_frontend_on as register_rigctl_frontend;
use audio_client::AudioConnectConfig;
use config::{ClientConfig, RemoteEntry}; use config::{ClientConfig, RemoteEntry};
use remote_client::{parse_remote_url, RemoteClientConfig}; use remote_client::{parse_audio_url, parse_remote_url, RemoteClientConfig};
const PKG_DESCRIPTION: &str = concat!(env!("CARGO_PKG_NAME"), " - remote rig client"); const PKG_DESCRIPTION: &str = concat!(env!("CARGO_PKG_NAME"), " - remote rig client");
const RIG_TASK_CHANNEL_BUFFER: usize = 32; const RIG_TASK_CHANNEL_BUFFER: usize = 32;
@@ -194,10 +195,7 @@ async fn async_init() -> DynResult<AppState> {
// Resolve remote entries: CLI --url > [[remotes]] > legacy [remote] > error // Resolve remote entries: CLI --url > [[remotes]] > legacy [remote] > error
let resolved_remotes: Vec<RemoteEntry> = if let Some(ref url) = cli.url { let resolved_remotes: Vec<RemoteEntry> = if let Some(ref url) = cli.url {
// CLI --url creates a single implicit remote entry // CLI --url creates a single implicit remote entry
let rig_id = cli let rig_id = cli.rig_id.clone().or_else(|| cfg.remote.rig_id.clone());
.rig_id
.clone()
.or_else(|| cfg.remote.rig_id.clone());
let name = rig_id.clone().unwrap_or_else(|| "default".to_string()); let name = rig_id.clone().unwrap_or_else(|| "default".to_string());
let token = cli.token.clone().or_else(|| cfg.remote.auth.token.clone()); let token = cli.token.clone().or_else(|| cfg.remote.auth.token.clone());
let poll_interval_ms = cli.poll_interval_ms.unwrap_or(cfg.remote.poll_interval_ms); let poll_interval_ms = cli.poll_interval_ms.unwrap_or(cfg.remote.poll_interval_ms);
@@ -304,9 +302,34 @@ async fn async_init() -> DynResult<AppState> {
}) })
.collect::<Result<Vec<_>, String>>()?; .collect::<Result<Vec<_>, String>>()?;
// Build per-short-name server host map for audio routing. let global_audio_addr = cfg
let mut audio_server_hosts: HashMap<String, (String, u16)> = HashMap::new(); .frontends
.audio
.server_url
.as_deref()
.map(|url| {
parse_audio_url(url)
.map(|endpoint| endpoint.connect_addr())
.map_err(|e| format!("Invalid audio URL override '{}': {}", url, e))
})
.transpose()?;
// Build per-short-name audio connection defaults.
let mut audio_connect: HashMap<String, AudioConnectConfig> = HashMap::new();
for (entry, ep) in &parsed_remotes { for (entry, ep) in &parsed_remotes {
let connect = if let Some(url) = cfg.frontends.audio.rig_urls.get(&entry.name) {
let addr = parse_audio_url(url)
.map(|endpoint| endpoint.connect_addr())
.map_err(|e| {
format!(
"Invalid audio URL override for remote '{}': {}",
entry.name, e
)
})?;
AudioConnectConfig::fixed(addr)
} else if let Some(addr) = global_audio_addr.clone() {
AudioConnectConfig::fixed(addr)
} else {
let audio_port = cfg let audio_port = cfg
.frontends .frontends
.audio .audio
@@ -314,7 +337,9 @@ async fn async_init() -> DynResult<AppState> {
.get(&entry.name) .get(&entry.name)
.copied() .copied()
.unwrap_or(cfg.frontends.audio.server_port); .unwrap_or(cfg.frontends.audio.server_port);
audio_server_hosts.insert(entry.name.clone(), (ep.host.clone(), audio_port)); AudioConnectConfig::from_host_port(ep.host.clone(), audio_port)
};
audio_connect.insert(entry.name.clone(), connect);
} }
// Group by (connect_addr, token). // Group by (connect_addr, token).
@@ -365,8 +390,12 @@ async fn async_init() -> DynResult<AppState> {
let state_tx = state_tx.clone(); let state_tx = state_tx.clone();
let remote_shutdown_rx = shutdown_rx.clone(); let remote_shutdown_rx = shutdown_rx.clone();
task_handles.push(tokio::spawn(async move { task_handles.push(tokio::spawn(async move {
if let Err(e) = if let Err(e) = remote_client::run_remote_client(
remote_client::run_remote_client(remote_cfg, server_rx, state_tx, remote_shutdown_rx) remote_cfg,
server_rx,
state_tx,
remote_shutdown_rx,
)
.await .await
{ {
error!("Remote client error: {}", e); error!("Remote client error: {}", e);
@@ -388,12 +417,7 @@ async fn async_init() -> DynResult<AppState> {
.rig_id_override .rig_id_override
.as_deref() .as_deref()
.map(String::from) .map(String::from)
.or_else(|| { .or_else(|| default_rig_for_router.lock().ok().and_then(|g| g.clone()));
default_rig_for_router
.lock()
.ok()
.and_then(|g| g.clone())
});
let sender = target let sender = target
.as_deref() .as_deref()
.and_then(|name| route_map.get(name)) .and_then(|name| route_map.get(name))
@@ -501,26 +525,21 @@ async fn async_init() -> DynResult<AppState> {
} }
}); });
info!( info!("Audio enabled: decode channel set");
"Audio enabled: default port {}, decode channel set",
cfg.frontends.audio.server_port
);
let audio_rig_ports: HashMap<String, u16> = cfg.frontends.audio.rig_ports.clone();
let audio_shutdown_rx = shutdown_rx.clone(); let audio_shutdown_rx = shutdown_rx.clone();
let vchan_audio_map = frontend_runtime.vchan_audio.clone(); let vchan_audio_map = frontend_runtime.vchan_audio.clone();
let rig_audio_rx_map = frontend_runtime.rig_audio_rx.clone(); let rig_audio_rx_map = frontend_runtime.rig_audio_rx.clone();
let rig_audio_info_map = frontend_runtime.rig_audio_info.clone(); let rig_audio_info_map = frontend_runtime.rig_audio_info.clone();
let rig_vchan_cmd_map = frontend_runtime.rig_vchan_audio_cmd.clone(); let rig_vchan_cmd_map = frontend_runtime.rig_vchan_audio_cmd.clone();
let audio_rig_server_hosts: HashMap<String, String> = audio_server_hosts let default_audio_connect = if let Some(addr) = global_audio_addr {
.iter() AudioConnectConfig::fixed(addr)
.map(|(name, (host, _))| (name.clone(), host.clone())) } else {
.collect(); AudioConnectConfig::from_host_port(remote_host.clone(), cfg.frontends.audio.server_port)
};
pending_audio_client = Some(tokio::spawn(audio_client::run_multi_rig_audio_manager( pending_audio_client = Some(tokio::spawn(audio_client::run_multi_rig_audio_manager(
remote_host, default_audio_connect,
cfg.frontends.audio.server_port, audio_connect,
audio_rig_ports,
audio_rig_server_hosts,
frontend_runtime.remote_active_rig_id.clone(), frontend_runtime.remote_active_rig_id.clone(),
frontend_runtime.remote_rigs.clone(), frontend_runtime.remote_rigs.clone(),
rx_audio_tx, rx_audio_tx,
+62 -33
View File
@@ -23,6 +23,7 @@ use trx_protocol::types::RigEntry;
use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse}; use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse};
const DEFAULT_REMOTE_PORT: u16 = 4530; const DEFAULT_REMOTE_PORT: u16 = 4530;
const DEFAULT_AUDIO_PORT: u16 = 4531;
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const IO_TIMEOUT: Duration = Duration::from_secs(15); const IO_TIMEOUT: Duration = Duration::from_secs(15);
const SPECTRUM_IO_TIMEOUT: Duration = Duration::from_secs(3); const SPECTRUM_IO_TIMEOUT: Duration = Duration::from_secs(3);
@@ -426,10 +427,7 @@ async fn refresh_remote_snapshot(
// Track which wildcard (None-key) entry we've already resolved. // Track which wildcard (None-key) entry we've already resolved.
let mut wildcard_resolved = false; let mut wildcard_resolved = false;
for entry in &rigs { for entry in &rigs {
if let Some(short_name) = config if let Some(short_name) = config.rig_id_to_short_name.get(&Some(entry.rig_id.clone())) {
.rig_id_to_short_name
.get(&Some(entry.rig_id.clone()))
{
// Update reverse map. // Update reverse map.
if let Ok(mut rev) = config.short_name_to_rig_id.write() { if let Ok(mut rev) = config.short_name_to_rig_id.write() {
rev.insert(short_name.clone(), entry.rig_id.clone()); rev.insert(short_name.clone(), entry.rig_id.clone());
@@ -456,9 +454,7 @@ async fn refresh_remote_snapshot(
} }
mapped mapped
} else { } else {
rigs.iter() rigs.iter().map(|e| (e.rig_id.clone(), e)).collect()
.map(|e| (e.rig_id.clone(), e))
.collect()
}; };
cache_remote_rigs(config, &rigs, &mapped_rigs); cache_remote_rigs(config, &rigs, &mapped_rigs);
@@ -602,7 +598,10 @@ fn resolve_short_name(config: &RemoteClientConfig, server_rig_id: &str) -> Optio
return Some(server_rig_id.to_string()); return Some(server_rig_id.to_string());
} }
// Try explicit rig_id mapping first. // Try explicit rig_id mapping first.
if let Some(name) = config.rig_id_to_short_name.get(&Some(server_rig_id.to_string())) { if let Some(name) = config
.rig_id_to_short_name
.get(&Some(server_rig_id.to_string()))
{
return Some(name.clone()); return Some(name.clone());
} }
// Try wildcard (None key = "default rig on this server"). // Try wildcard (None key = "default rig on this server").
@@ -777,35 +776,44 @@ async fn read_limited_line<R: AsyncBufRead + Unpin>(
} }
pub fn parse_remote_url(url: &str) -> Result<RemoteEndpoint, String> { pub fn parse_remote_url(url: &str) -> Result<RemoteEndpoint, String> {
parse_endpoint_url(url, DEFAULT_REMOTE_PORT, "remote")
}
pub fn parse_audio_url(url: &str) -> Result<RemoteEndpoint, String> {
parse_endpoint_url(url, DEFAULT_AUDIO_PORT, "audio")
}
fn parse_endpoint_url(url: &str, default_port: u16, kind: &str) -> Result<RemoteEndpoint, String> {
let trimmed = url.trim(); let trimmed = url.trim();
if trimmed.is_empty() { if trimmed.is_empty() {
return Err("remote url is empty".into()); return Err(format!("{kind} url is empty"));
} }
let addr = trimmed let addr = trimmed
.strip_prefix("tcp://") .strip_prefix("tcp://")
.or_else(|| trimmed.strip_prefix("http-json://")) .or_else(|| trimmed.strip_prefix("http-json://"))
.or_else(|| trimmed.strip_prefix("audio://"))
.unwrap_or(trimmed); .unwrap_or(trimmed);
parse_host_port(addr) parse_host_port(addr, default_port, kind)
} }
fn parse_host_port(input: &str) -> Result<RemoteEndpoint, String> { fn parse_host_port(input: &str, default_port: u16, kind: &str) -> Result<RemoteEndpoint, String> {
if let Some(rest) = input.strip_prefix('[') { if let Some(rest) = input.strip_prefix('[') {
let closing = rest let closing = rest
.find(']') .find(']')
.ok_or("invalid remote url: missing closing ']' for IPv6 host")?; .ok_or_else(|| format!("invalid {kind} url: missing closing ']' for IPv6 host"))?;
let host = &rest[..closing]; let host = &rest[..closing];
let remainder = &rest[closing + 1..]; let remainder = &rest[closing + 1..];
if host.is_empty() { if host.is_empty() {
return Err("invalid remote url: host is empty".into()); return Err(format!("invalid {kind} url: host is empty"));
} }
let port = if remainder.is_empty() { let port = if remainder.is_empty() {
DEFAULT_REMOTE_PORT default_port
} else if let Some(port_str) = remainder.strip_prefix(':') { } else if let Some(port_str) = remainder.strip_prefix(':') {
parse_port(port_str)? parse_port(port_str, kind)?
} else { } else {
return Err("invalid remote url: expected ':<port>' after ']'".into()); return Err(format!("invalid {kind} url: expected ':<port>' after ']'"));
}; };
return Ok(RemoteEndpoint { return Ok(RemoteEndpoint {
host: host.to_string(), host: host.to_string(),
@@ -815,41 +823,45 @@ fn parse_host_port(input: &str) -> Result<RemoteEndpoint, String> {
if input.contains(':') { if input.contains(':') {
if input.matches(':').count() > 1 { if input.matches(':').count() > 1 {
return Err("invalid remote url: IPv6 host must be bracketed like [::1]:4532".into()); return Err(format!(
"invalid {kind} url: IPv6 host must be bracketed like [::1]:4532"
));
} }
let (host, port_str) = input let (host, port_str) = input
.rsplit_once(':') .rsplit_once(':')
.ok_or("invalid remote url: expected host:port")?; .ok_or_else(|| format!("invalid {kind} url: expected host:port"))?;
if host.is_empty() { if host.is_empty() {
return Err("invalid remote url: host is empty".into()); return Err(format!("invalid {kind} url: host is empty"));
} }
return Ok(RemoteEndpoint { return Ok(RemoteEndpoint {
host: host.to_string(), host: host.to_string(),
port: parse_port(port_str)?, port: parse_port(port_str, kind)?,
}); });
} }
Ok(RemoteEndpoint { Ok(RemoteEndpoint {
host: input.to_string(), host: input.to_string(),
port: DEFAULT_REMOTE_PORT, port: default_port,
}) })
} }
fn parse_port(port_str: &str) -> Result<u16, String> { fn parse_port(port_str: &str, kind: &str) -> Result<u16, String> {
let port: u16 = port_str let port: u16 = port_str
.parse() .parse()
.map_err(|_| format!("invalid remote port: '{port_str}'"))?; .map_err(|_| format!("invalid {kind} port: '{port_str}'"))?;
if port == 0 { if port == 0 {
return Err("invalid remote port: 0".into()); return Err(format!("invalid {kind} port: 0"));
} }
Ok(port) Ok(port)
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{parse_remote_url, RemoteClientConfig, RemoteEndpoint, SharedSpectrum};
#[allow(unused_imports)] #[allow(unused_imports)]
use super::{has_short_names, resolve_server_rig_id, resolve_short_name}; use super::{has_short_names, resolve_server_rig_id, resolve_short_name};
use super::{
parse_audio_url, parse_remote_url, RemoteClientConfig, RemoteEndpoint, SharedSpectrum,
};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
@@ -878,6 +890,18 @@ mod tests {
); );
} }
#[test]
fn parse_audio_host_default_port() {
let parsed = parse_audio_url("audio.example.local").expect("must parse");
assert_eq!(
parsed,
RemoteEndpoint {
host: "audio.example.local".to_string(),
port: 4531
}
);
}
#[test] #[test]
fn parse_ipv4_with_port() { fn parse_ipv4_with_port() {
let parsed = parse_remote_url("tcp://127.0.0.1:9000").expect("must parse"); let parsed = parse_remote_url("tcp://127.0.0.1:9000").expect("must parse");
@@ -1092,9 +1116,10 @@ mod tests {
#[test] #[test]
fn build_envelope_translates_short_name_to_server_rig_id() { fn build_envelope_translates_short_name_to_server_rig_id() {
let (spectrum_tx, _spectrum_rx) = watch::channel(SharedSpectrum::default()); let (spectrum_tx, _spectrum_rx) = watch::channel(SharedSpectrum::default());
let short_name_to_rig_id = Arc::new(RwLock::new(HashMap::from([ let short_name_to_rig_id = Arc::new(RwLock::new(HashMap::from([(
("home-hf".to_string(), "hf".to_string()), "home-hf".to_string(),
]))); "hf".to_string(),
)])));
let config = RemoteClientConfig { let config = RemoteClientConfig {
addr: "127.0.0.1:4530".to_string(), addr: "127.0.0.1:4530".to_string(),
token: None, token: None,
@@ -1105,9 +1130,7 @@ mod tests {
server_connected: Arc::new(AtomicBool::new(false)), server_connected: Arc::new(AtomicBool::new(false)),
rig_states: Arc::new(RwLock::new(HashMap::new())), rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::from([ rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "home-hf".to_string())]),
(Some("hf".to_string()), "home-hf".to_string()),
]),
short_name_to_rig_id, short_name_to_rig_id,
}; };
// selected_rig_id is "home-hf" (short name), envelope should translate to "hf" // selected_rig_id is "home-hf" (short name), envelope should translate to "hf"
@@ -1164,8 +1187,14 @@ mod tests {
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
}; };
assert!(has_short_names(&config)); assert!(has_short_names(&config));
assert_eq!(resolve_short_name(&config, "hf"), Some("home-hf".to_string())); assert_eq!(
resolve_short_name(&config, "hf"),
Some("home-hf".to_string())
);
// Unknown rig_id falls through to wildcard // Unknown rig_id falls through to wildcard
assert_eq!(resolve_short_name(&config, "unknown"), Some("default-rig".to_string())); assert_eq!(
resolve_short_name(&config, "unknown"),
Some("default-rig".to_string())
);
} }
} }