From 9cd172ce64ce0c56e6757f4aefa19997fd627771 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Mon, 23 Mar 2026 21:00:31 +0100 Subject: [PATCH] [feat](trx-client): support multiple trx-servers from a single client Introduce [[remotes]] config array where each entry maps a user-chosen short name to a (server URL, rig_id) pair. Short names replace raw rig_ids as the universal key throughout frontends, audio routing, and state management, allowing rig_ids to safely collide across servers. Entries sharing the same server URL and token share a single TCP connection. A request routing dispatcher forwards frontend commands to the correct per-server channel based on the short name. Legacy [remote] config and CLI --url are preserved via automatic fallback to a single-entry remotes list. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Stan Grams --- src/trx-client/src/audio_client.rs | 9 +- src/trx-client/src/config.rs | 302 ++++++++++++++++++++++++++- src/trx-client/src/main.rs | 201 ++++++++++++++---- src/trx-client/src/remote_client.rs | 311 ++++++++++++++++++++++------ 4 files changed, 714 insertions(+), 109 deletions(-) diff --git a/src/trx-client/src/audio_client.rs b/src/trx-client/src/audio_client.rs index e3b3297..6bf899e 100644 --- a/src/trx-client/src/audio_client.rs +++ b/src/trx-client/src/audio_client.rs @@ -58,6 +58,8 @@ pub async fn run_multi_rig_audio_manager( server_host: String, default_port: u16, rig_ports: HashMap, + // Per-rig server host overrides (short_name -> host) for multi-server mode. + rig_server_hosts: HashMap, selected_rig_id: Arc>>, known_rigs: Arc>>, global_rx_tx: broadcast::Sender, @@ -147,7 +149,10 @@ pub async fn run_multi_rig_audio_manager( map.insert(rig_id.clone(), per_rig_vchan_tx); } - let addr = format!("{}:{}", server_host, port); + let host = rig_server_hosts + .get(rig_id) + .unwrap_or(&server_host); + let addr = format!("{}:{}", host, port); let rig_id_clone = rig_id.clone(); let global_rx_tx_clone = global_rx_tx.clone(); let global_info_tx_clone = global_stream_info_tx.clone(); @@ -178,7 +183,7 @@ pub async fn run_multi_rig_audio_manager( .await; }); - info!("Audio client: started task for rig {} ({}:{})", rig_id, server_host, port); + info!("Audio client: started task for rig {} ({}:{})", rig_id, host, port); active_tasks.insert(rig_id.clone(), PerRigAudioTask { handle, shutdown_tx: per_rig_shutdown_tx, diff --git a/src/trx-client/src/config.rs b/src/trx-client/src/config.rs index 10ad274..93f1e0a 100644 --- a/src/trx-client/src/config.rs +++ b/src/trx-client/src/config.rs @@ -25,8 +25,12 @@ use trx_app::{ConfigError, ConfigFile}; pub struct ClientConfig { /// General settings pub general: GeneralConfig, - /// Remote connection settings + /// Legacy single remote connection settings. + /// Kept for backward compatibility; prefer `[[remotes]]`. pub remote: RemoteConfig, + /// Named remote connections (one per short-name / rig). + /// Each entry maps a user-chosen short name to a (server, rig_id) pair. + pub remotes: Vec, /// Frontend configurations pub frontends: FrontendsConfig, } @@ -92,6 +96,33 @@ pub struct RemoteAuthConfig { pub token: Option, } +/// A named remote connection entry. +/// +/// Each entry maps a user-chosen **short name** to a `(server, rig_id)` pair. +/// The short name is used as the identifier throughout frontends (HTTP rig +/// picker, rigctl ports, audio routing, etc.) instead of the server-scoped +/// `rig_id`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RemoteEntry { + /// Short name used to identify this remote in frontends and config. + pub name: String, + /// Remote server URL (`host:port` or `tcp://host:port`). + pub url: String, + /// Optional target rig ID on a multi-rig server. + /// When omitted, the server's default (or only) rig is used. + pub rig_id: Option, + /// Authentication settings. + #[serde(default)] + pub auth: RemoteAuthConfig, + /// Poll interval in milliseconds. Defaults to 750. + #[serde(default = "default_poll_interval_ms")] + pub poll_interval_ms: u64, +} + +fn default_poll_interval_ms() -> u64 { + 750 +} + /// Frontend configurations. #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[serde(default)] @@ -334,9 +365,83 @@ pub struct HttpJsonAuthConfig { } impl ClientConfig { + /// Return the effective list of remote entries. + /// + /// If `[[remotes]]` is non-empty, return it directly. Otherwise, + /// synthesize a single entry from the legacy `[remote]` section (if it + /// has a `url`). Returns an empty `Vec` when neither is configured + /// (caller should check CLI overrides). + pub fn resolved_remotes(&self) -> Vec { + if !self.remotes.is_empty() { + return self.remotes.clone(); + } + // Legacy fallback + if let Some(url) = &self.remote.url { + let name = self + .remote + .rig_id + .clone() + .unwrap_or_else(|| "default".to_string()); + vec![RemoteEntry { + name, + url: url.clone(), + rig_id: self.remote.rig_id.clone(), + auth: self.remote.auth.clone(), + poll_interval_ms: self.remote.poll_interval_ms, + }] + } else { + Vec::new() + } + } + pub fn validate(&self) -> Result<(), String> { validate_log_level(self.general.log_level.as_deref())?; + // Validate [[remotes]] entries + { + let mut seen_names = std::collections::HashSet::new(); + for (i, entry) in self.remotes.iter().enumerate() { + if entry.name.trim().is_empty() { + return Err(format!("[[remotes]][{}].name must not be empty", i)); + } + if !seen_names.insert(&entry.name) { + return Err(format!( + "[[remotes]] duplicate name \"{}\"", + entry.name + )); + } + if entry.url.trim().is_empty() { + return Err(format!( + "[[remotes]][{}].url must not be empty (name \"{}\")", + i, entry.name + )); + } + if let Some(rig_id) = &entry.rig_id { + if rig_id.trim().is_empty() { + return Err(format!( + "[[remotes]][{}].rig_id must not be empty when set (name \"{}\")", + i, entry.name + )); + } + } + if let Some(token) = &entry.auth.token { + if token.trim().is_empty() { + return Err(format!( + "[[remotes]][{}].auth.token must not be empty when set (name \"{}\")", + i, entry.name + )); + } + } + if entry.poll_interval_ms == 0 { + return Err(format!( + "[[remotes]][{}].poll_interval_ms must be > 0 (name \"{}\")", + i, entry.name + )); + } + } + } + + // Validate legacy [remote] (kept for backward compat) if self.remote.poll_interval_ms == 0 { return Err("[remote].poll_interval_ms must be > 0".to_string()); } @@ -492,20 +597,33 @@ impl ClientConfig { ais_vessel_url_base: Some("https://www.vesselfinder.com/?mmsi=".to_string()), log_level: Some("info".to_string()), }, - remote: RemoteConfig { - url: Some("192.168.1.100:9000".to_string()), - rig_id: Some("hf".to_string()), - auth: RemoteAuthConfig { - token: Some("my-token".to_string()), + remote: RemoteConfig::default(), + remotes: vec![ + RemoteEntry { + name: "home-hf".to_string(), + url: "192.168.1.100:4530".to_string(), + rig_id: Some("hf".to_string()), + auth: RemoteAuthConfig { + token: Some("my-token".to_string()), + }, + poll_interval_ms: 750, }, - poll_interval_ms: 750, - }, + RemoteEntry { + name: "home-vhf".to_string(), + url: "192.168.1.100:4530".to_string(), + rig_id: Some("vhf".to_string()), + auth: RemoteAuthConfig { + token: Some("my-token".to_string()), + }, + poll_interval_ms: 750, + }, + ], frontends: FrontendsConfig { http: HttpFrontendConfig { enabled: true, listen: IpAddr::from([127, 0, 0, 1]), port: 8080, - default_rig_id: Some("hf".to_string()), + default_rig_id: Some("home-hf".to_string()), initial_map_zoom: 10, spectrum_coverage_margin_hz: 50_000, spectrum_usable_span_ratio: 0.92, @@ -823,4 +941,170 @@ uhf = 60 }; assert_eq!(auth.session_ttl().as_secs(), 3600); } + + #[test] + fn test_parse_remotes_toml() { + let toml_str = r#" +[[remotes]] +name = "home-hf" +url = "192.168.1.10:4530" +rig_id = "hf" +poll_interval_ms = 500 + +[remotes.auth] +token = "secret" + +[[remotes]] +name = "remote" +url = "remote.example.com:4530" +"#; + + let config: ClientConfig = toml::from_str(toml_str).unwrap(); + assert_eq!(config.remotes.len(), 2); + assert_eq!(config.remotes[0].name, "home-hf"); + assert_eq!(config.remotes[0].url, "192.168.1.10:4530"); + assert_eq!(config.remotes[0].rig_id, Some("hf".to_string())); + assert_eq!(config.remotes[0].auth.token, Some("secret".to_string())); + assert_eq!(config.remotes[0].poll_interval_ms, 500); + assert_eq!(config.remotes[1].name, "remote"); + assert_eq!(config.remotes[1].url, "remote.example.com:4530"); + assert!(config.remotes[1].rig_id.is_none()); + assert!(config.remotes[1].auth.token.is_none()); + assert_eq!(config.remotes[1].poll_interval_ms, 750); // default + assert!(config.validate().is_ok()); + } + + #[test] + fn test_resolved_remotes_from_remotes() { + let config = ClientConfig { + remotes: vec![RemoteEntry { + name: "hf".to_string(), + url: "host:4530".to_string(), + rig_id: None, + auth: RemoteAuthConfig::default(), + poll_interval_ms: 750, + }], + ..Default::default() + }; + let resolved = config.resolved_remotes(); + assert_eq!(resolved.len(), 1); + assert_eq!(resolved[0].name, "hf"); + } + + #[test] + fn test_resolved_remotes_legacy_fallback() { + let config = ClientConfig { + remote: RemoteConfig { + url: Some("host:4530".to_string()), + rig_id: Some("hf".to_string()), + auth: RemoteAuthConfig { + token: Some("tok".to_string()), + }, + poll_interval_ms: 750, + }, + ..Default::default() + }; + let resolved = config.resolved_remotes(); + assert_eq!(resolved.len(), 1); + assert_eq!(resolved[0].name, "hf"); + assert_eq!(resolved[0].url, "host:4530"); + assert_eq!(resolved[0].rig_id, Some("hf".to_string())); + assert_eq!(resolved[0].auth.token, Some("tok".to_string())); + } + + #[test] + fn test_resolved_remotes_legacy_default_name() { + let config = ClientConfig { + remote: RemoteConfig { + url: Some("host:4530".to_string()), + rig_id: None, + ..Default::default() + }, + ..Default::default() + }; + let resolved = config.resolved_remotes(); + assert_eq!(resolved[0].name, "default"); + } + + #[test] + fn test_resolved_remotes_prefers_remotes_over_legacy() { + let config = ClientConfig { + remote: RemoteConfig { + url: Some("old:4530".to_string()), + ..Default::default() + }, + remotes: vec![RemoteEntry { + name: "new".to_string(), + url: "new:4530".to_string(), + rig_id: None, + auth: RemoteAuthConfig::default(), + poll_interval_ms: 750, + }], + ..Default::default() + }; + let resolved = config.resolved_remotes(); + assert_eq!(resolved.len(), 1); + assert_eq!(resolved[0].name, "new"); + } + + #[test] + fn test_validate_rejects_duplicate_remote_names() { + let mut config = ClientConfig::default(); + config.remotes = vec![ + RemoteEntry { + name: "dup".to_string(), + url: "a:4530".to_string(), + rig_id: None, + auth: RemoteAuthConfig::default(), + poll_interval_ms: 750, + }, + RemoteEntry { + name: "dup".to_string(), + url: "b:4530".to_string(), + rig_id: None, + auth: RemoteAuthConfig::default(), + poll_interval_ms: 750, + }, + ]; + assert!(config.validate().unwrap_err().contains("duplicate name")); + } + + #[test] + fn test_validate_rejects_empty_remote_name() { + let mut config = ClientConfig::default(); + config.remotes = vec![RemoteEntry { + name: "".to_string(), + url: "a:4530".to_string(), + rig_id: None, + auth: RemoteAuthConfig::default(), + poll_interval_ms: 750, + }]; + assert!(config.validate().unwrap_err().contains("name must not be empty")); + } + + #[test] + fn test_validate_rejects_empty_remote_url() { + let mut config = ClientConfig::default(); + config.remotes = vec![RemoteEntry { + name: "hf".to_string(), + url: " ".to_string(), + rig_id: None, + auth: RemoteAuthConfig::default(), + poll_interval_ms: 750, + }]; + assert!(config.validate().unwrap_err().contains("url must not be empty")); + } + + #[test] + fn test_validate_rejects_zero_remote_poll_interval() { + let mut config = ClientConfig::default(); + config.remotes = vec![RemoteEntry { + name: "hf".to_string(), + url: "a:4530".to_string(), + rig_id: None, + auth: RemoteAuthConfig::default(), + poll_interval_ms: 0, + }]; + assert!(config.validate().unwrap_err().contains("poll_interval_ms must be > 0")); + } } diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index f2afd82..c981afa 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -32,7 +32,7 @@ use trx_frontend_http::register_frontend_on as register_http_frontend; use trx_frontend_http_json::register_frontend_on as register_http_json_frontend; use trx_frontend_rigctl::register_frontend_on as register_rigctl_frontend; -use config::ClientConfig; +use config::{ClientConfig, RemoteEntry}; use remote_client::{parse_remote_url, RemoteClientConfig}; const PKG_DESCRIPTION: &str = concat!(env!("CARGO_PKG_NAME"), " - remote rig client"); @@ -191,28 +191,44 @@ async fn async_init() -> DynResult { .decode_history_retention_min_by_rig .clone(); - // Resolve remote URL: CLI > config [remote] section > error - let remote_url = cli - .url - .clone() - .or_else(|| cfg.remote.url.clone()) - .ok_or("Remote URL not specified. Use --url or set [remote].url in config.")?; + // Resolve remote entries: CLI --url > [[remotes]] > legacy [remote] > error + let resolved_remotes: Vec = if let Some(ref url) = cli.url { + // CLI --url creates a single implicit remote entry + let rig_id = cli + .rig_id + .clone() + .or_else(|| cfg.remote.rig_id.clone()); + let name = rig_id.clone().unwrap_or_else(|| "default".to_string()); + let token = cli.token.clone().or_else(|| cfg.remote.auth.token.clone()); + let poll_interval_ms = cli.poll_interval_ms.unwrap_or(cfg.remote.poll_interval_ms); + vec![RemoteEntry { + name, + url: url.clone(), + rig_id, + auth: config::RemoteAuthConfig { token }, + poll_interval_ms, + }] + } else { + let entries = cfg.resolved_remotes(); + if entries.is_empty() { + return Err( + "No remote servers configured. Use --url or add [[remotes]] entries in config." + .into(), + ); + } + entries + }; - let remote_endpoint = - parse_remote_url(&remote_url).map_err(|e| format!("Invalid remote URL: {}", e))?; - - let remote_token = cli.token.clone().or_else(|| cfg.remote.auth.token.clone()); - let remote_rig_id = cli + // Set initial active rig to the configured default or first remote entry. + let default_rig = cli .rig_id .clone() .or_else(|| cfg.frontends.http.default_rig_id.clone()) - .or_else(|| cfg.remote.rig_id.clone()); + .or_else(|| resolved_remotes.first().map(|e| e.name.clone())); if let Ok(mut guard) = frontend_runtime.remote_active_rig_id.lock() { - *guard = remote_rig_id.clone(); + *guard = default_rig.clone(); } - let poll_interval_ms = cli.poll_interval_ms.unwrap_or(cfg.remote.poll_interval_ms); - // Resolve frontends: CLI > config > default to http let frontends: Vec = if let Some(ref fes) = cli.frontends { fes.iter().map(|f| normalize_name(f)).collect() @@ -259,9 +275,10 @@ async fn async_init() -> DynResult { frontend_runtime.owner_website_name = cfg.general.website_name.clone(); frontend_runtime.ais_vessel_url_base = cfg.general.ais_vessel_url_base.clone(); + let remote_names: Vec<&str> = resolved_remotes.iter().map(|e| e.name.as_str()).collect(); info!( - "Starting trx-client (remote: {}, frontends: {})", - remote_endpoint.connect_addr(), + "Starting trx-client (remotes: [{}], frontends: {})", + remote_names.join(", "), frontends.join(", ") ); @@ -272,28 +289,131 @@ async fn async_init() -> DynResult { let initial_state = RigState::new_uninitialized(); let (state_tx, state_rx) = watch::channel(initial_state); - // Extract host for audio before moving remote_addr - let remote_host = remote_endpoint.host.clone(); + // Group remote entries by (addr, token) so entries sharing a server share + // one TCP connection. Each group gets its own run_remote_client task. + use std::collections::BTreeMap; + use std::sync::RwLock; - let remote_cfg = RemoteClientConfig { - addr: remote_endpoint.connect_addr(), - token: remote_token, - selected_rig_id: frontend_runtime.remote_active_rig_id.clone(), - known_rigs: frontend_runtime.remote_rigs.clone(), - rig_states: frontend_runtime.rig_states.clone(), - poll_interval: Duration::from_millis(poll_interval_ms), - spectrum: frontend_runtime.spectrum.clone(), - rig_spectrums: frontend_runtime.rig_spectrums.clone(), - server_connected: frontend_runtime.server_connected.clone(), - }; - let remote_shutdown_rx = shutdown_rx.clone(); - task_handles.push(tokio::spawn(async move { - if let Err(e) = - remote_client::run_remote_client(remote_cfg, rx, state_tx, remote_shutdown_rx).await - { - error!("Remote client error: {}", e); + // Parse all endpoints upfront. + let parsed_remotes: Vec<(RemoteEntry, remote_client::RemoteEndpoint)> = resolved_remotes + .iter() + .map(|entry| { + let ep = parse_remote_url(&entry.url) + .map_err(|e| format!("Invalid URL for remote '{}': {}", entry.name, e))?; + Ok((entry.clone(), ep)) + }) + .collect::, String>>()?; + + // Build per-short-name server host map for audio routing. + let mut audio_server_hosts: HashMap = HashMap::new(); + for (entry, ep) in &parsed_remotes { + let audio_port = cfg + .frontends + .audio + .rig_ports + .get(&entry.name) + .copied() + .unwrap_or(cfg.frontends.audio.server_port); + audio_server_hosts.insert(entry.name.clone(), (ep.host.clone(), audio_port)); + } + + // Group by (connect_addr, token). + let mut server_groups: BTreeMap<(String, Option), Vec<&RemoteEntry>> = BTreeMap::new(); + let mut endpoint_by_addr: HashMap = HashMap::new(); + for (entry, ep) in &parsed_remotes { + let key = (ep.connect_addr(), entry.auth.token.clone()); + endpoint_by_addr + .entry(ep.connect_addr()) + .or_insert_with(|| ep.clone()); + server_groups.entry(key).or_default().push(entry); + } + + // Per-server request senders for the routing dispatcher. + let mut route_map: HashMap> = HashMap::new(); + + for ((addr, token), entries) in &server_groups { + // Build the rig_id → short_name mapping for this server group. + let mut rig_id_to_short_name: HashMap, String> = HashMap::new(); + for entry in entries { + rig_id_to_short_name.insert(entry.rig_id.clone(), entry.name.clone()); } - })); + + let poll_interval = entries + .iter() + .map(|e| e.poll_interval_ms) + .min() + .unwrap_or(750); + + let (server_tx, server_rx) = mpsc::channel::(RIG_TASK_CHANNEL_BUFFER); + for entry in entries { + route_map.insert(entry.name.clone(), server_tx.clone()); + } + + let remote_cfg = RemoteClientConfig { + addr: addr.clone(), + token: token.clone(), + selected_rig_id: frontend_runtime.remote_active_rig_id.clone(), + known_rigs: frontend_runtime.remote_rigs.clone(), + rig_states: frontend_runtime.rig_states.clone(), + poll_interval: Duration::from_millis(poll_interval), + spectrum: frontend_runtime.spectrum.clone(), + rig_spectrums: frontend_runtime.rig_spectrums.clone(), + server_connected: frontend_runtime.server_connected.clone(), + rig_id_to_short_name, + short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), + }; + let state_tx = state_tx.clone(); + let remote_shutdown_rx = shutdown_rx.clone(); + task_handles.push(tokio::spawn(async move { + if let Err(e) = + remote_client::run_remote_client(remote_cfg, server_rx, state_tx, remote_shutdown_rx) + .await + { + error!("Remote client error: {}", e); + } + })); + } + + // Request routing dispatcher: receives from the single frontend-facing + // channel and dispatches to the per-server channel based on rig_id_override + // (short name). + let route_map = Arc::new(route_map); + let default_rig_for_router = frontend_runtime.remote_active_rig_id.clone(); + { + let route_map = route_map.clone(); + let mut frontend_rx = rx; + task_handles.push(tokio::spawn(async move { + while let Some(req) = frontend_rx.recv().await { + let target = req + .rig_id_override + .as_deref() + .map(String::from) + .or_else(|| { + default_rig_for_router + .lock() + .ok() + .and_then(|g| g.clone()) + }); + let sender = target + .as_deref() + .and_then(|name| route_map.get(name)) + .or_else(|| route_map.values().next()); + if let Some(sender) = sender { + let _ = sender.send(req).await; + } else { + let _ = req.respond_to.send(Err(trx_core::RigError::communication( + "no remote server available for this rig", + ))); + } + } + })); + } + + // Extract first remote host for audio backward-compat fallback. + let remote_host = parsed_remotes + .first() + .map(|(_, ep)| ep.host.clone()) + .unwrap_or_else(|| "127.0.0.1".to_string()); // Audio streaming setup let mut pending_audio_client = None; @@ -392,10 +512,15 @@ async fn async_init() -> DynResult { let rig_audio_rx_map = frontend_runtime.rig_audio_rx.clone(); let rig_audio_info_map = frontend_runtime.rig_audio_info.clone(); let rig_vchan_cmd_map = frontend_runtime.rig_vchan_audio_cmd.clone(); + let audio_rig_server_hosts: HashMap = audio_server_hosts + .iter() + .map(|(name, (host, _))| (name.clone(), host.clone())) + .collect(); pending_audio_client = Some(tokio::spawn(audio_client::run_multi_rig_audio_manager( remote_host, cfg.frontends.audio.server_port, audio_rig_ports, + audio_rig_server_hosts, frontend_runtime.remote_active_rig_id.clone(), frontend_runtime.remote_rigs.clone(), rx_audio_tx, diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index cce6e5d..beee214 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -61,8 +61,14 @@ pub struct RemoteClientConfig { /// Shared flag: `true` while a TCP connection to trx-server is active. pub server_connected: Arc, pub rig_states: Arc>>>, - /// Per-rig spectrum watch senders, keyed by rig_id. + /// Per-rig spectrum watch senders, keyed by short name (or rig_id in legacy mode). pub rig_spectrums: Arc>>>, + /// Maps configured server rig_id (`Some`) or default/wildcard (`None`) to + /// a client-side short name. Empty in legacy single-remote mode. + pub rig_id_to_short_name: HashMap, String>, + /// Dynamically resolved reverse mapping: short_name → server rig_id. + /// Populated during `refresh_remote_snapshot` when short-name mode is active. + pub short_name_to_rig_id: Arc>>, } pub async fn run_remote_client( @@ -202,29 +208,33 @@ async fn handle_spectrum_connection( // Determine the currently selected rig for backward compat. let selected = selected_rig_id(config); - for rig_id in &rig_ids { + for short_name in &rig_ids { + // Resolve the server rig_id for the wire envelope. + let wire_rig_id = if has_short_names(config) { + resolve_server_rig_id(config, short_name) + } else { + Some(short_name.clone()) + }; let envelope = ClientEnvelope { token: config.token.clone(), - rig_id: Some(rig_id.clone()), + rig_id: wire_rig_id, cmd: ClientCommand::GetSpectrum, }; match send_envelope_no_state_update(&mut writer, &mut reader, envelope).await { Ok(snapshot) => { - // Update per-rig channel. + // Update per-rig channel (keyed by short name). if let Ok(map) = config.rig_spectrums.read() { - if let Some(tx) = map.get(rig_id) { + if let Some(tx) = map.get(short_name) { tx.send_modify(|s| s.set(snapshot.spectrum.clone(), snapshot.vchan_rds.clone())); } } // Update global channel if this is the selected rig. - let is_selected = selected.as_deref() == Some(rig_id.as_str()); + let is_selected = selected.as_deref() == Some(short_name.as_str()); if is_selected { config.spectrum.send_modify(|s| s.set(snapshot.spectrum, snapshot.vchan_rds)); } } Err(e) => { - // A spectrum timeout desynchronises the TCP framing; - // return so the caller reconnects and restores sync. config.spectrum.send_modify(|s| s.set(None, None)); return Err(e); } @@ -318,6 +328,8 @@ async fn send_command( rig_id_override: Option, state_tx: &watch::Sender, ) -> RigResult { + // Keep the original short name for per-rig channel update after response. + let channel_key_override = rig_id_override.clone(); let envelope = build_envelope(config, cmd, rig_id_override); let mut payload = serde_json::to_string(&envelope) @@ -349,10 +361,15 @@ async fn send_command( // Also update the per-rig watch channel so SSE sessions // subscribed to a specific rig see the change immediately // instead of waiting for the next poll cycle. - let rig_id = envelope.rig_id.as_deref(); - if let Some(rid) = rig_id { + // The rig_id_override is a short name in multi-server mode; + // resolve accordingly for the per-rig channel key. + let channel_key = channel_key_override + .as_deref() + .map(String::from) + .or_else(|| selected_rig_id(config)); + if let Some(key) = channel_key { if let Ok(map) = config.rig_states.read() { - if let Some(tx) = map.get(rid) { + if let Some(tx) = map.get(&key) { tx.send_if_modified(|old| { if *old == new_state { false @@ -379,9 +396,17 @@ fn build_envelope( cmd: ClientCommand, rig_id_override: Option, ) -> ClientEnvelope { + let rig_id = rig_id_override.or_else(|| selected_rig_id(config)); + // In multi-server mode, the rig_id is actually a short name that needs to + // be translated back to the server-side rig_id for the wire envelope. + let wire_rig_id = if has_short_names(config) { + rig_id.and_then(|name| resolve_server_rig_id(config, &name)) + } else { + rig_id + }; ClientEnvelope { token: config.token.clone(), - rig_id: rig_id_override.or_else(|| selected_rig_id(config)), + rig_id: wire_rig_id, cmd, } } @@ -393,39 +418,82 @@ async fn refresh_remote_snapshot( state_tx: &watch::Sender, ) -> RigResult<()> { let rigs = send_get_rigs(config, writer, reader).await?; - cache_remote_rigs(config, &rigs); - if rigs.is_empty() { - return Err(RigError::communication("GetRigs returned no rigs")); - } - let selected = selected_rig_id(config); - let target = selected - .as_deref() - .and_then(|id| rigs.iter().find(|entry| entry.rig_id == id)) - .or_else(|| choose_default_rig(rigs.as_slice())) - .ok_or_else(|| RigError::communication("GetRigs returned no selectable rig"))?; - - if selected.as_deref() != Some(target.rig_id.as_str()) { - set_selected_rig_id(config, Some(target.rig_id.clone())); - } - - let new_state = RigState::from_snapshot(target.state.clone()); - // Only wake SSE subscribers when something actually changed. - state_tx.send_if_modified(|old| { - if *old == new_state { - false - } else { - *old = new_state; - true - } - }); - - // Update per-rig watch channels so each SSE session can subscribe - // to a specific rig's state independently. - if let Ok(mut rig_map) = config.rig_states.write() { + // In multi-server mode, filter rigs to only those that have a short name + // mapping, and populate the reverse mapping (short_name → server rig_id). + let mapped_rigs: Vec<(String, &RigEntry)> = if has_short_names(config) { + let mut mapped = Vec::new(); + // Track which wildcard (None-key) entry we've already resolved. + let mut wildcard_resolved = false; for entry in &rigs { + if let Some(short_name) = config + .rig_id_to_short_name + .get(&Some(entry.rig_id.clone())) + { + // Update reverse map. + if let Ok(mut rev) = config.short_name_to_rig_id.write() { + rev.insert(short_name.clone(), entry.rig_id.clone()); + } + mapped.push((short_name.clone(), entry)); + } else if !wildcard_resolved { + if let Some(short_name) = config.rig_id_to_short_name.get(&None) { + // Wildcard: first unmatched rig gets the default short name. + // Prefer an initialized, TX-capable rig when possible. + let candidate = choose_default_rig(&rigs) + .filter(|r| { + !config + .rig_id_to_short_name + .contains_key(&Some(r.rig_id.clone())) + }) + .unwrap_or(entry); + if let Ok(mut rev) = config.short_name_to_rig_id.write() { + rev.insert(short_name.clone(), candidate.rig_id.clone()); + } + mapped.push((short_name.clone(), candidate)); + wildcard_resolved = true; + } + } + } + mapped + } else { + rigs.iter() + .map(|e| (e.rig_id.clone(), e)) + .collect() + }; + + cache_remote_rigs(config, &rigs, &mapped_rigs); + + if mapped_rigs.is_empty() { + return Err(RigError::communication("GetRigs returned no mapped rigs")); + } + + // Determine target for global state_tx (backward compat). + let selected = selected_rig_id(config); + let target_key = selected + .as_deref() + .and_then(|id| mapped_rigs.iter().find(|(key, _)| key == id)) + .or_else(|| mapped_rigs.first()); + + if let Some((key, entry)) = target_key { + if selected.as_deref() != Some(key.as_str()) { + set_selected_rig_id(config, Some(key.clone())); + } + let new_state = RigState::from_snapshot(entry.state.clone()); + state_tx.send_if_modified(|old| { + if *old == new_state { + false + } else { + *old = new_state; + true + } + }); + } + + // Update per-rig watch channels keyed by short name (or rig_id in legacy mode). + if let Ok(mut rig_map) = config.rig_states.write() { + for (key, entry) in &mapped_rigs { let new_state = RigState::from_snapshot(entry.state.clone()); - if let Some(tx) = rig_map.get(&entry.rig_id) { + if let Some(tx) = rig_map.get(key) { tx.send_if_modified(|old| { if *old == new_state { false @@ -436,13 +504,13 @@ async fn refresh_remote_snapshot( }); } else { let (tx, _rx) = watch::channel(new_state); - rig_map.insert(entry.rig_id.clone(), tx); + rig_map.insert(key.clone(), tx); } } - // Remove channels for rigs no longer reported by the server. - let active_ids: std::collections::HashSet<&str> = - rigs.iter().map(|e| e.rig_id.as_str()).collect(); - rig_map.retain(|id, _| active_ids.contains(id.as_str())); + // Remove channels for keys no longer present. + let active_keys: std::collections::HashSet<&str> = + mapped_rigs.iter().map(|(k, _)| k.as_str()).collect(); + rig_map.retain(|id, _| active_keys.contains(id.as_str())); } Ok(()) } @@ -485,25 +553,30 @@ async fn send_get_rigs( )) } -fn cache_remote_rigs(config: &RemoteClientConfig, rigs: &[RigEntry]) { +fn cache_remote_rigs( + config: &RemoteClientConfig, + _raw_rigs: &[RigEntry], + mapped_rigs: &[(String, &RigEntry)], +) { if let Ok(mut guard) = config.known_rigs.lock() { // Skip the Vec rebuild when the rig list is structurally unchanged. - // We compare the fields surfaced in the UI rig picker; full state - // changes are propagated via the watch channel, not this cache. - let unchanged = guard.len() == rigs.len() - && guard.iter().zip(rigs.iter()).all(|(cached, new)| { - cached.rig_id == new.rig_id - && cached.display_name == new.display_name - && cached.state.initialized == new.state.initialized - && cached.audio_port == new.audio_port - }); + let unchanged = guard.len() == mapped_rigs.len() + && guard + .iter() + .zip(mapped_rigs.iter()) + .all(|(cached, (key, new))| { + cached.rig_id == *key + && cached.display_name == new.display_name + && cached.state.initialized == new.state.initialized + && cached.audio_port == new.audio_port + }); if unchanged { return; } - *guard = rigs + *guard = mapped_rigs .iter() - .map(|entry| RemoteRigEntry { - rig_id: entry.rig_id.clone(), + .map(|(key, entry)| RemoteRigEntry { + rig_id: key.clone(), display_name: entry.display_name.clone(), state: entry.state.clone(), audio_port: entry.audio_port, @@ -516,6 +589,38 @@ fn selected_rig_id(config: &RemoteClientConfig) -> Option { config.selected_rig_id.lock().ok().and_then(|g| g.clone()) } +/// Returns `true` when the config has short-name mappings (multi-server mode). +fn has_short_names(config: &RemoteClientConfig) -> bool { + !config.rig_id_to_short_name.is_empty() +} + +/// Resolve a server rig_id to the client-side short name. +/// In legacy mode (no mappings), returns the rig_id unchanged. +#[cfg(test)] +fn resolve_short_name(config: &RemoteClientConfig, server_rig_id: &str) -> Option { + if !has_short_names(config) { + return Some(server_rig_id.to_string()); + } + // Try explicit rig_id mapping first. + if let Some(name) = config.rig_id_to_short_name.get(&Some(server_rig_id.to_string())) { + return Some(name.clone()); + } + // Try wildcard (None key = "default rig on this server"). + config.rig_id_to_short_name.get(&None).cloned() +} + +/// Resolve a client-side short name back to a server rig_id for building envelopes. +fn resolve_server_rig_id(config: &RemoteClientConfig, short_name: &str) -> Option { + if !has_short_names(config) { + return Some(short_name.to_string()); + } + config + .short_name_to_rig_id + .read() + .ok() + .and_then(|map| map.get(short_name).cloned()) +} + fn set_selected_rig_id(config: &RemoteClientConfig, value: Option) { if let Ok(mut guard) = config.selected_rig_id.lock() { *guard = value; @@ -743,6 +848,8 @@ fn parse_port(port_str: &str) -> Result { #[cfg(test)] mod tests { use super::{parse_remote_url, RemoteClientConfig, RemoteEndpoint, SharedSpectrum}; + #[allow(unused_imports)] + use super::{has_short_names, resolve_server_rig_id, resolve_short_name}; use std::collections::HashMap; use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex, RwLock}; @@ -932,6 +1039,8 @@ mod tests { server_connected: Arc::new(AtomicBool::new(false)), rig_states: Arc::new(RwLock::new(HashMap::new())), rig_spectrums: Arc::new(RwLock::new(HashMap::new())), + rig_id_to_short_name: HashMap::new(), + short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), }, req_rx, state_tx, @@ -972,9 +1081,91 @@ mod tests { server_connected: Arc::new(AtomicBool::new(false)), rig_states: Arc::new(RwLock::new(HashMap::new())), rig_spectrums: Arc::new(RwLock::new(HashMap::new())), + rig_id_to_short_name: HashMap::new(), + short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), }; let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None); assert_eq!(envelope.token.as_deref(), Some("secret")); assert_eq!(envelope.rig_id.as_deref(), Some("sdr")); } + + #[test] + fn build_envelope_translates_short_name_to_server_rig_id() { + let (spectrum_tx, _spectrum_rx) = watch::channel(SharedSpectrum::default()); + let short_name_to_rig_id = Arc::new(RwLock::new(HashMap::from([ + ("home-hf".to_string(), "hf".to_string()), + ]))); + let config = RemoteClientConfig { + addr: "127.0.0.1:4530".to_string(), + token: None, + selected_rig_id: Arc::new(Mutex::new(Some("home-hf".to_string()))), + known_rigs: Arc::new(Mutex::new(Vec::new())), + poll_interval: Duration::from_millis(500), + spectrum: Arc::new(spectrum_tx), + server_connected: Arc::new(AtomicBool::new(false)), + rig_states: Arc::new(RwLock::new(HashMap::new())), + rig_spectrums: Arc::new(RwLock::new(HashMap::new())), + rig_id_to_short_name: HashMap::from([ + (Some("hf".to_string()), "home-hf".to_string()), + ]), + short_name_to_rig_id, + }; + // selected_rig_id is "home-hf" (short name), envelope should translate to "hf" + let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None); + assert_eq!(envelope.rig_id.as_deref(), Some("hf")); + + // Override with short name should also translate + let envelope = super::build_envelope( + &config, + trx_protocol::ClientCommand::GetState, + Some("home-hf".to_string()), + ); + assert_eq!(envelope.rig_id.as_deref(), Some("hf")); + } + + #[test] + fn resolve_short_name_legacy_passthrough() { + let (spectrum_tx, _spectrum_rx) = watch::channel(SharedSpectrum::default()); + let config = RemoteClientConfig { + addr: "127.0.0.1:4530".to_string(), + token: None, + selected_rig_id: Arc::new(Mutex::new(None)), + known_rigs: Arc::new(Mutex::new(Vec::new())), + poll_interval: Duration::from_millis(500), + spectrum: Arc::new(spectrum_tx), + server_connected: Arc::new(AtomicBool::new(false)), + rig_states: Arc::new(RwLock::new(HashMap::new())), + rig_spectrums: Arc::new(RwLock::new(HashMap::new())), + rig_id_to_short_name: HashMap::new(), + short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), + }; + // Legacy mode: rig_id passes through unchanged + assert!(!has_short_names(&config)); + assert_eq!(resolve_short_name(&config, "hf"), Some("hf".to_string())); + } + + #[test] + fn resolve_short_name_with_mapping() { + let (spectrum_tx, _spectrum_rx) = watch::channel(SharedSpectrum::default()); + let config = RemoteClientConfig { + addr: "127.0.0.1:4530".to_string(), + token: None, + selected_rig_id: Arc::new(Mutex::new(None)), + known_rigs: Arc::new(Mutex::new(Vec::new())), + poll_interval: Duration::from_millis(500), + spectrum: Arc::new(spectrum_tx), + server_connected: Arc::new(AtomicBool::new(false)), + rig_states: Arc::new(RwLock::new(HashMap::new())), + rig_spectrums: Arc::new(RwLock::new(HashMap::new())), + rig_id_to_short_name: HashMap::from([ + (Some("hf".to_string()), "home-hf".to_string()), + (None, "default-rig".to_string()), + ]), + short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), + }; + assert!(has_short_names(&config)); + assert_eq!(resolve_short_name(&config, "hf"), Some("home-hf".to_string())); + // Unknown rig_id falls through to wildcard + assert_eq!(resolve_short_name(&config, "unknown"), Some("default-rig".to_string())); + } }