[feat](trx-client): support multiple trx-servers from a single client

Introduce [[remotes]] config array where each entry maps a user-chosen
short name to a (server URL, rig_id) pair. Short names replace raw
rig_ids as the universal key throughout frontends, audio routing, and
state management, allowing rig_ids to safely collide across servers.

Entries sharing the same server URL and token share a single TCP
connection. A request routing dispatcher forwards frontend commands to
the correct per-server channel based on the short name.

Legacy [remote] config and CLI --url are preserved via automatic
fallback to a single-entry remotes list.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-23 21:00:31 +01:00
parent 1e438acaf7
commit 9cd172ce64
4 changed files with 714 additions and 109 deletions
+7 -2
View File
@@ -58,6 +58,8 @@ pub async fn run_multi_rig_audio_manager(
server_host: String,
default_port: u16,
rig_ports: HashMap<String, u16>,
// Per-rig server host overrides (short_name -> host) for multi-server mode.
rig_server_hosts: HashMap<String, String>,
selected_rig_id: Arc<Mutex<Option<String>>>,
known_rigs: Arc<Mutex<Vec<RemoteRigEntry>>>,
global_rx_tx: broadcast::Sender<Bytes>,
@@ -147,7 +149,10 @@ pub async fn run_multi_rig_audio_manager(
map.insert(rig_id.clone(), per_rig_vchan_tx);
}
let addr = format!("{}:{}", server_host, port);
let host = rig_server_hosts
.get(rig_id)
.unwrap_or(&server_host);
let addr = format!("{}:{}", host, port);
let rig_id_clone = rig_id.clone();
let global_rx_tx_clone = global_rx_tx.clone();
let global_info_tx_clone = global_stream_info_tx.clone();
@@ -178,7 +183,7 @@ pub async fn run_multi_rig_audio_manager(
.await;
});
info!("Audio client: started task for rig {} ({}:{})", rig_id, server_host, port);
info!("Audio client: started task for rig {} ({}:{})", rig_id, host, port);
active_tasks.insert(rig_id.clone(), PerRigAudioTask {
handle,
shutdown_tx: per_rig_shutdown_tx,
+293 -9
View File
@@ -25,8 +25,12 @@ use trx_app::{ConfigError, ConfigFile};
pub struct ClientConfig {
/// General settings
pub general: GeneralConfig,
/// Remote connection settings
/// Legacy single remote connection settings.
/// Kept for backward compatibility; prefer `[[remotes]]`.
pub remote: RemoteConfig,
/// Named remote connections (one per short-name / rig).
/// Each entry maps a user-chosen short name to a (server, rig_id) pair.
pub remotes: Vec<RemoteEntry>,
/// Frontend configurations
pub frontends: FrontendsConfig,
}
@@ -92,6 +96,33 @@ pub struct RemoteAuthConfig {
pub token: Option<String>,
}
/// A named remote connection entry.
///
/// Each entry maps a user-chosen **short name** to a `(server, rig_id)` pair.
/// The short name is used as the identifier throughout frontends (HTTP rig
/// picker, rigctl ports, audio routing, etc.) instead of the server-scoped
/// `rig_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteEntry {
/// Short name used to identify this remote in frontends and config.
pub name: String,
/// Remote server URL (`host:port` or `tcp://host:port`).
pub url: String,
/// Optional target rig ID on a multi-rig server.
/// When omitted, the server's default (or only) rig is used.
pub rig_id: Option<String>,
/// Authentication settings.
#[serde(default)]
pub auth: RemoteAuthConfig,
/// Poll interval in milliseconds. Defaults to 750.
#[serde(default = "default_poll_interval_ms")]
pub poll_interval_ms: u64,
}
fn default_poll_interval_ms() -> u64 {
750
}
/// Frontend configurations.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
@@ -334,9 +365,83 @@ pub struct HttpJsonAuthConfig {
}
impl ClientConfig {
/// Return the effective list of remote entries.
///
/// If `[[remotes]]` is non-empty, return it directly. Otherwise,
/// synthesize a single entry from the legacy `[remote]` section (if it
/// has a `url`). Returns an empty `Vec` when neither is configured
/// (caller should check CLI overrides).
pub fn resolved_remotes(&self) -> Vec<RemoteEntry> {
if !self.remotes.is_empty() {
return self.remotes.clone();
}
// Legacy fallback
if let Some(url) = &self.remote.url {
let name = self
.remote
.rig_id
.clone()
.unwrap_or_else(|| "default".to_string());
vec![RemoteEntry {
name,
url: url.clone(),
rig_id: self.remote.rig_id.clone(),
auth: self.remote.auth.clone(),
poll_interval_ms: self.remote.poll_interval_ms,
}]
} else {
Vec::new()
}
}
pub fn validate(&self) -> Result<(), String> {
validate_log_level(self.general.log_level.as_deref())?;
// Validate [[remotes]] entries
{
let mut seen_names = std::collections::HashSet::new();
for (i, entry) in self.remotes.iter().enumerate() {
if entry.name.trim().is_empty() {
return Err(format!("[[remotes]][{}].name must not be empty", i));
}
if !seen_names.insert(&entry.name) {
return Err(format!(
"[[remotes]] duplicate name \"{}\"",
entry.name
));
}
if entry.url.trim().is_empty() {
return Err(format!(
"[[remotes]][{}].url must not be empty (name \"{}\")",
i, entry.name
));
}
if let Some(rig_id) = &entry.rig_id {
if rig_id.trim().is_empty() {
return Err(format!(
"[[remotes]][{}].rig_id must not be empty when set (name \"{}\")",
i, entry.name
));
}
}
if let Some(token) = &entry.auth.token {
if token.trim().is_empty() {
return Err(format!(
"[[remotes]][{}].auth.token must not be empty when set (name \"{}\")",
i, entry.name
));
}
}
if entry.poll_interval_ms == 0 {
return Err(format!(
"[[remotes]][{}].poll_interval_ms must be > 0 (name \"{}\")",
i, entry.name
));
}
}
}
// Validate legacy [remote] (kept for backward compat)
if self.remote.poll_interval_ms == 0 {
return Err("[remote].poll_interval_ms must be > 0".to_string());
}
@@ -492,20 +597,33 @@ impl ClientConfig {
ais_vessel_url_base: Some("https://www.vesselfinder.com/?mmsi=".to_string()),
log_level: Some("info".to_string()),
},
remote: RemoteConfig {
url: Some("192.168.1.100:9000".to_string()),
rig_id: Some("hf".to_string()),
auth: RemoteAuthConfig {
token: Some("my-token".to_string()),
remote: RemoteConfig::default(),
remotes: vec![
RemoteEntry {
name: "home-hf".to_string(),
url: "192.168.1.100:4530".to_string(),
rig_id: Some("hf".to_string()),
auth: RemoteAuthConfig {
token: Some("my-token".to_string()),
},
poll_interval_ms: 750,
},
poll_interval_ms: 750,
},
RemoteEntry {
name: "home-vhf".to_string(),
url: "192.168.1.100:4530".to_string(),
rig_id: Some("vhf".to_string()),
auth: RemoteAuthConfig {
token: Some("my-token".to_string()),
},
poll_interval_ms: 750,
},
],
frontends: FrontendsConfig {
http: HttpFrontendConfig {
enabled: true,
listen: IpAddr::from([127, 0, 0, 1]),
port: 8080,
default_rig_id: Some("hf".to_string()),
default_rig_id: Some("home-hf".to_string()),
initial_map_zoom: 10,
spectrum_coverage_margin_hz: 50_000,
spectrum_usable_span_ratio: 0.92,
@@ -823,4 +941,170 @@ uhf = 60
};
assert_eq!(auth.session_ttl().as_secs(), 3600);
}
#[test]
fn test_parse_remotes_toml() {
let toml_str = r#"
[[remotes]]
name = "home-hf"
url = "192.168.1.10:4530"
rig_id = "hf"
poll_interval_ms = 500
[remotes.auth]
token = "secret"
[[remotes]]
name = "remote"
url = "remote.example.com:4530"
"#;
let config: ClientConfig = toml::from_str(toml_str).unwrap();
assert_eq!(config.remotes.len(), 2);
assert_eq!(config.remotes[0].name, "home-hf");
assert_eq!(config.remotes[0].url, "192.168.1.10:4530");
assert_eq!(config.remotes[0].rig_id, Some("hf".to_string()));
assert_eq!(config.remotes[0].auth.token, Some("secret".to_string()));
assert_eq!(config.remotes[0].poll_interval_ms, 500);
assert_eq!(config.remotes[1].name, "remote");
assert_eq!(config.remotes[1].url, "remote.example.com:4530");
assert!(config.remotes[1].rig_id.is_none());
assert!(config.remotes[1].auth.token.is_none());
assert_eq!(config.remotes[1].poll_interval_ms, 750); // default
assert!(config.validate().is_ok());
}
#[test]
fn test_resolved_remotes_from_remotes() {
let config = ClientConfig {
remotes: vec![RemoteEntry {
name: "hf".to_string(),
url: "host:4530".to_string(),
rig_id: None,
auth: RemoteAuthConfig::default(),
poll_interval_ms: 750,
}],
..Default::default()
};
let resolved = config.resolved_remotes();
assert_eq!(resolved.len(), 1);
assert_eq!(resolved[0].name, "hf");
}
#[test]
fn test_resolved_remotes_legacy_fallback() {
let config = ClientConfig {
remote: RemoteConfig {
url: Some("host:4530".to_string()),
rig_id: Some("hf".to_string()),
auth: RemoteAuthConfig {
token: Some("tok".to_string()),
},
poll_interval_ms: 750,
},
..Default::default()
};
let resolved = config.resolved_remotes();
assert_eq!(resolved.len(), 1);
assert_eq!(resolved[0].name, "hf");
assert_eq!(resolved[0].url, "host:4530");
assert_eq!(resolved[0].rig_id, Some("hf".to_string()));
assert_eq!(resolved[0].auth.token, Some("tok".to_string()));
}
#[test]
fn test_resolved_remotes_legacy_default_name() {
let config = ClientConfig {
remote: RemoteConfig {
url: Some("host:4530".to_string()),
rig_id: None,
..Default::default()
},
..Default::default()
};
let resolved = config.resolved_remotes();
assert_eq!(resolved[0].name, "default");
}
#[test]
fn test_resolved_remotes_prefers_remotes_over_legacy() {
let config = ClientConfig {
remote: RemoteConfig {
url: Some("old:4530".to_string()),
..Default::default()
},
remotes: vec![RemoteEntry {
name: "new".to_string(),
url: "new:4530".to_string(),
rig_id: None,
auth: RemoteAuthConfig::default(),
poll_interval_ms: 750,
}],
..Default::default()
};
let resolved = config.resolved_remotes();
assert_eq!(resolved.len(), 1);
assert_eq!(resolved[0].name, "new");
}
#[test]
fn test_validate_rejects_duplicate_remote_names() {
let mut config = ClientConfig::default();
config.remotes = vec![
RemoteEntry {
name: "dup".to_string(),
url: "a:4530".to_string(),
rig_id: None,
auth: RemoteAuthConfig::default(),
poll_interval_ms: 750,
},
RemoteEntry {
name: "dup".to_string(),
url: "b:4530".to_string(),
rig_id: None,
auth: RemoteAuthConfig::default(),
poll_interval_ms: 750,
},
];
assert!(config.validate().unwrap_err().contains("duplicate name"));
}
#[test]
fn test_validate_rejects_empty_remote_name() {
let mut config = ClientConfig::default();
config.remotes = vec![RemoteEntry {
name: "".to_string(),
url: "a:4530".to_string(),
rig_id: None,
auth: RemoteAuthConfig::default(),
poll_interval_ms: 750,
}];
assert!(config.validate().unwrap_err().contains("name must not be empty"));
}
#[test]
fn test_validate_rejects_empty_remote_url() {
let mut config = ClientConfig::default();
config.remotes = vec![RemoteEntry {
name: "hf".to_string(),
url: " ".to_string(),
rig_id: None,
auth: RemoteAuthConfig::default(),
poll_interval_ms: 750,
}];
assert!(config.validate().unwrap_err().contains("url must not be empty"));
}
#[test]
fn test_validate_rejects_zero_remote_poll_interval() {
let mut config = ClientConfig::default();
config.remotes = vec![RemoteEntry {
name: "hf".to_string(),
url: "a:4530".to_string(),
rig_id: None,
auth: RemoteAuthConfig::default(),
poll_interval_ms: 0,
}];
assert!(config.validate().unwrap_err().contains("poll_interval_ms must be > 0"));
}
}
+163 -38
View File
@@ -32,7 +32,7 @@ use trx_frontend_http::register_frontend_on as register_http_frontend;
use trx_frontend_http_json::register_frontend_on as register_http_json_frontend;
use trx_frontend_rigctl::register_frontend_on as register_rigctl_frontend;
use config::ClientConfig;
use config::{ClientConfig, RemoteEntry};
use remote_client::{parse_remote_url, RemoteClientConfig};
const PKG_DESCRIPTION: &str = concat!(env!("CARGO_PKG_NAME"), " - remote rig client");
@@ -191,28 +191,44 @@ async fn async_init() -> DynResult<AppState> {
.decode_history_retention_min_by_rig
.clone();
// Resolve remote URL: CLI > config [remote] section > error
let remote_url = cli
.url
.clone()
.or_else(|| cfg.remote.url.clone())
.ok_or("Remote URL not specified. Use --url or set [remote].url in config.")?;
// Resolve remote entries: CLI --url > [[remotes]] > legacy [remote] > error
let resolved_remotes: Vec<RemoteEntry> = if let Some(ref url) = cli.url {
// CLI --url creates a single implicit remote entry
let rig_id = cli
.rig_id
.clone()
.or_else(|| cfg.remote.rig_id.clone());
let name = rig_id.clone().unwrap_or_else(|| "default".to_string());
let token = cli.token.clone().or_else(|| cfg.remote.auth.token.clone());
let poll_interval_ms = cli.poll_interval_ms.unwrap_or(cfg.remote.poll_interval_ms);
vec![RemoteEntry {
name,
url: url.clone(),
rig_id,
auth: config::RemoteAuthConfig { token },
poll_interval_ms,
}]
} else {
let entries = cfg.resolved_remotes();
if entries.is_empty() {
return Err(
"No remote servers configured. Use --url or add [[remotes]] entries in config."
.into(),
);
}
entries
};
let remote_endpoint =
parse_remote_url(&remote_url).map_err(|e| format!("Invalid remote URL: {}", e))?;
let remote_token = cli.token.clone().or_else(|| cfg.remote.auth.token.clone());
let remote_rig_id = cli
// Set initial active rig to the configured default or first remote entry.
let default_rig = cli
.rig_id
.clone()
.or_else(|| cfg.frontends.http.default_rig_id.clone())
.or_else(|| cfg.remote.rig_id.clone());
.or_else(|| resolved_remotes.first().map(|e| e.name.clone()));
if let Ok(mut guard) = frontend_runtime.remote_active_rig_id.lock() {
*guard = remote_rig_id.clone();
*guard = default_rig.clone();
}
let poll_interval_ms = cli.poll_interval_ms.unwrap_or(cfg.remote.poll_interval_ms);
// Resolve frontends: CLI > config > default to http
let frontends: Vec<String> = if let Some(ref fes) = cli.frontends {
fes.iter().map(|f| normalize_name(f)).collect()
@@ -259,9 +275,10 @@ async fn async_init() -> DynResult<AppState> {
frontend_runtime.owner_website_name = cfg.general.website_name.clone();
frontend_runtime.ais_vessel_url_base = cfg.general.ais_vessel_url_base.clone();
let remote_names: Vec<&str> = resolved_remotes.iter().map(|e| e.name.as_str()).collect();
info!(
"Starting trx-client (remote: {}, frontends: {})",
remote_endpoint.connect_addr(),
"Starting trx-client (remotes: [{}], frontends: {})",
remote_names.join(", "),
frontends.join(", ")
);
@@ -272,28 +289,131 @@ async fn async_init() -> DynResult<AppState> {
let initial_state = RigState::new_uninitialized();
let (state_tx, state_rx) = watch::channel(initial_state);
// Extract host for audio before moving remote_addr
let remote_host = remote_endpoint.host.clone();
// Group remote entries by (addr, token) so entries sharing a server share
// one TCP connection. Each group gets its own run_remote_client task.
use std::collections::BTreeMap;
use std::sync::RwLock;
let remote_cfg = RemoteClientConfig {
addr: remote_endpoint.connect_addr(),
token: remote_token,
selected_rig_id: frontend_runtime.remote_active_rig_id.clone(),
known_rigs: frontend_runtime.remote_rigs.clone(),
rig_states: frontend_runtime.rig_states.clone(),
poll_interval: Duration::from_millis(poll_interval_ms),
spectrum: frontend_runtime.spectrum.clone(),
rig_spectrums: frontend_runtime.rig_spectrums.clone(),
server_connected: frontend_runtime.server_connected.clone(),
};
let remote_shutdown_rx = shutdown_rx.clone();
task_handles.push(tokio::spawn(async move {
if let Err(e) =
remote_client::run_remote_client(remote_cfg, rx, state_tx, remote_shutdown_rx).await
{
error!("Remote client error: {}", e);
// Parse all endpoints upfront.
let parsed_remotes: Vec<(RemoteEntry, remote_client::RemoteEndpoint)> = resolved_remotes
.iter()
.map(|entry| {
let ep = parse_remote_url(&entry.url)
.map_err(|e| format!("Invalid URL for remote '{}': {}", entry.name, e))?;
Ok((entry.clone(), ep))
})
.collect::<Result<Vec<_>, String>>()?;
// Build per-short-name server host map for audio routing.
let mut audio_server_hosts: HashMap<String, (String, u16)> = HashMap::new();
for (entry, ep) in &parsed_remotes {
let audio_port = cfg
.frontends
.audio
.rig_ports
.get(&entry.name)
.copied()
.unwrap_or(cfg.frontends.audio.server_port);
audio_server_hosts.insert(entry.name.clone(), (ep.host.clone(), audio_port));
}
// Group by (connect_addr, token).
let mut server_groups: BTreeMap<(String, Option<String>), Vec<&RemoteEntry>> = BTreeMap::new();
let mut endpoint_by_addr: HashMap<String, remote_client::RemoteEndpoint> = HashMap::new();
for (entry, ep) in &parsed_remotes {
let key = (ep.connect_addr(), entry.auth.token.clone());
endpoint_by_addr
.entry(ep.connect_addr())
.or_insert_with(|| ep.clone());
server_groups.entry(key).or_default().push(entry);
}
// Per-server request senders for the routing dispatcher.
let mut route_map: HashMap<String, mpsc::Sender<RigRequest>> = HashMap::new();
for ((addr, token), entries) in &server_groups {
// Build the rig_id → short_name mapping for this server group.
let mut rig_id_to_short_name: HashMap<Option<String>, String> = HashMap::new();
for entry in entries {
rig_id_to_short_name.insert(entry.rig_id.clone(), entry.name.clone());
}
}));
let poll_interval = entries
.iter()
.map(|e| e.poll_interval_ms)
.min()
.unwrap_or(750);
let (server_tx, server_rx) = mpsc::channel::<RigRequest>(RIG_TASK_CHANNEL_BUFFER);
for entry in entries {
route_map.insert(entry.name.clone(), server_tx.clone());
}
let remote_cfg = RemoteClientConfig {
addr: addr.clone(),
token: token.clone(),
selected_rig_id: frontend_runtime.remote_active_rig_id.clone(),
known_rigs: frontend_runtime.remote_rigs.clone(),
rig_states: frontend_runtime.rig_states.clone(),
poll_interval: Duration::from_millis(poll_interval),
spectrum: frontend_runtime.spectrum.clone(),
rig_spectrums: frontend_runtime.rig_spectrums.clone(),
server_connected: frontend_runtime.server_connected.clone(),
rig_id_to_short_name,
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
};
let state_tx = state_tx.clone();
let remote_shutdown_rx = shutdown_rx.clone();
task_handles.push(tokio::spawn(async move {
if let Err(e) =
remote_client::run_remote_client(remote_cfg, server_rx, state_tx, remote_shutdown_rx)
.await
{
error!("Remote client error: {}", e);
}
}));
}
// Request routing dispatcher: receives from the single frontend-facing
// channel and dispatches to the per-server channel based on rig_id_override
// (short name).
let route_map = Arc::new(route_map);
let default_rig_for_router = frontend_runtime.remote_active_rig_id.clone();
{
let route_map = route_map.clone();
let mut frontend_rx = rx;
task_handles.push(tokio::spawn(async move {
while let Some(req) = frontend_rx.recv().await {
let target = req
.rig_id_override
.as_deref()
.map(String::from)
.or_else(|| {
default_rig_for_router
.lock()
.ok()
.and_then(|g| g.clone())
});
let sender = target
.as_deref()
.and_then(|name| route_map.get(name))
.or_else(|| route_map.values().next());
if let Some(sender) = sender {
let _ = sender.send(req).await;
} else {
let _ = req.respond_to.send(Err(trx_core::RigError::communication(
"no remote server available for this rig",
)));
}
}
}));
}
// Extract first remote host for audio backward-compat fallback.
let remote_host = parsed_remotes
.first()
.map(|(_, ep)| ep.host.clone())
.unwrap_or_else(|| "127.0.0.1".to_string());
// Audio streaming setup
let mut pending_audio_client = None;
@@ -392,10 +512,15 @@ async fn async_init() -> DynResult<AppState> {
let rig_audio_rx_map = frontend_runtime.rig_audio_rx.clone();
let rig_audio_info_map = frontend_runtime.rig_audio_info.clone();
let rig_vchan_cmd_map = frontend_runtime.rig_vchan_audio_cmd.clone();
let audio_rig_server_hosts: HashMap<String, String> = audio_server_hosts
.iter()
.map(|(name, (host, _))| (name.clone(), host.clone()))
.collect();
pending_audio_client = Some(tokio::spawn(audio_client::run_multi_rig_audio_manager(
remote_host,
cfg.frontends.audio.server_port,
audio_rig_ports,
audio_rig_server_hosts,
frontend_runtime.remote_active_rig_id.clone(),
frontend_runtime.remote_rigs.clone(),
rx_audio_tx,
+251 -60
View File
@@ -61,8 +61,14 @@ pub struct RemoteClientConfig {
/// Shared flag: `true` while a TCP connection to trx-server is active.
pub server_connected: Arc<AtomicBool>,
pub rig_states: Arc<RwLock<HashMap<String, watch::Sender<RigState>>>>,
/// Per-rig spectrum watch senders, keyed by rig_id.
/// Per-rig spectrum watch senders, keyed by short name (or rig_id in legacy mode).
pub rig_spectrums: Arc<RwLock<HashMap<String, watch::Sender<SharedSpectrum>>>>,
/// Maps configured server rig_id (`Some`) or default/wildcard (`None`) to
/// a client-side short name. Empty in legacy single-remote mode.
pub rig_id_to_short_name: HashMap<Option<String>, String>,
/// Dynamically resolved reverse mapping: short_name → server rig_id.
/// Populated during `refresh_remote_snapshot` when short-name mode is active.
pub short_name_to_rig_id: Arc<RwLock<HashMap<String, String>>>,
}
pub async fn run_remote_client(
@@ -202,29 +208,33 @@ async fn handle_spectrum_connection(
// Determine the currently selected rig for backward compat.
let selected = selected_rig_id(config);
for rig_id in &rig_ids {
for short_name in &rig_ids {
// Resolve the server rig_id for the wire envelope.
let wire_rig_id = if has_short_names(config) {
resolve_server_rig_id(config, short_name)
} else {
Some(short_name.clone())
};
let envelope = ClientEnvelope {
token: config.token.clone(),
rig_id: Some(rig_id.clone()),
rig_id: wire_rig_id,
cmd: ClientCommand::GetSpectrum,
};
match send_envelope_no_state_update(&mut writer, &mut reader, envelope).await {
Ok(snapshot) => {
// Update per-rig channel.
// Update per-rig channel (keyed by short name).
if let Ok(map) = config.rig_spectrums.read() {
if let Some(tx) = map.get(rig_id) {
if let Some(tx) = map.get(short_name) {
tx.send_modify(|s| s.set(snapshot.spectrum.clone(), snapshot.vchan_rds.clone()));
}
}
// Update global channel if this is the selected rig.
let is_selected = selected.as_deref() == Some(rig_id.as_str());
let is_selected = selected.as_deref() == Some(short_name.as_str());
if is_selected {
config.spectrum.send_modify(|s| s.set(snapshot.spectrum, snapshot.vchan_rds));
}
}
Err(e) => {
// A spectrum timeout desynchronises the TCP framing;
// return so the caller reconnects and restores sync.
config.spectrum.send_modify(|s| s.set(None, None));
return Err(e);
}
@@ -318,6 +328,8 @@ async fn send_command(
rig_id_override: Option<String>,
state_tx: &watch::Sender<RigState>,
) -> RigResult<trx_core::RigSnapshot> {
// Keep the original short name for per-rig channel update after response.
let channel_key_override = rig_id_override.clone();
let envelope = build_envelope(config, cmd, rig_id_override);
let mut payload = serde_json::to_string(&envelope)
@@ -349,10 +361,15 @@ async fn send_command(
// Also update the per-rig watch channel so SSE sessions
// subscribed to a specific rig see the change immediately
// instead of waiting for the next poll cycle.
let rig_id = envelope.rig_id.as_deref();
if let Some(rid) = rig_id {
// The rig_id_override is a short name in multi-server mode;
// resolve accordingly for the per-rig channel key.
let channel_key = channel_key_override
.as_deref()
.map(String::from)
.or_else(|| selected_rig_id(config));
if let Some(key) = channel_key {
if let Ok(map) = config.rig_states.read() {
if let Some(tx) = map.get(rid) {
if let Some(tx) = map.get(&key) {
tx.send_if_modified(|old| {
if *old == new_state {
false
@@ -379,9 +396,17 @@ fn build_envelope(
cmd: ClientCommand,
rig_id_override: Option<String>,
) -> ClientEnvelope {
let rig_id = rig_id_override.or_else(|| selected_rig_id(config));
// In multi-server mode, the rig_id is actually a short name that needs to
// be translated back to the server-side rig_id for the wire envelope.
let wire_rig_id = if has_short_names(config) {
rig_id.and_then(|name| resolve_server_rig_id(config, &name))
} else {
rig_id
};
ClientEnvelope {
token: config.token.clone(),
rig_id: rig_id_override.or_else(|| selected_rig_id(config)),
rig_id: wire_rig_id,
cmd,
}
}
@@ -393,39 +418,82 @@ async fn refresh_remote_snapshot(
state_tx: &watch::Sender<RigState>,
) -> RigResult<()> {
let rigs = send_get_rigs(config, writer, reader).await?;
cache_remote_rigs(config, &rigs);
if rigs.is_empty() {
return Err(RigError::communication("GetRigs returned no rigs"));
}
let selected = selected_rig_id(config);
let target = selected
.as_deref()
.and_then(|id| rigs.iter().find(|entry| entry.rig_id == id))
.or_else(|| choose_default_rig(rigs.as_slice()))
.ok_or_else(|| RigError::communication("GetRigs returned no selectable rig"))?;
if selected.as_deref() != Some(target.rig_id.as_str()) {
set_selected_rig_id(config, Some(target.rig_id.clone()));
}
let new_state = RigState::from_snapshot(target.state.clone());
// Only wake SSE subscribers when something actually changed.
state_tx.send_if_modified(|old| {
if *old == new_state {
false
} else {
*old = new_state;
true
}
});
// Update per-rig watch channels so each SSE session can subscribe
// to a specific rig's state independently.
if let Ok(mut rig_map) = config.rig_states.write() {
// In multi-server mode, filter rigs to only those that have a short name
// mapping, and populate the reverse mapping (short_name → server rig_id).
let mapped_rigs: Vec<(String, &RigEntry)> = if has_short_names(config) {
let mut mapped = Vec::new();
// Track which wildcard (None-key) entry we've already resolved.
let mut wildcard_resolved = false;
for entry in &rigs {
if let Some(short_name) = config
.rig_id_to_short_name
.get(&Some(entry.rig_id.clone()))
{
// Update reverse map.
if let Ok(mut rev) = config.short_name_to_rig_id.write() {
rev.insert(short_name.clone(), entry.rig_id.clone());
}
mapped.push((short_name.clone(), entry));
} else if !wildcard_resolved {
if let Some(short_name) = config.rig_id_to_short_name.get(&None) {
// Wildcard: first unmatched rig gets the default short name.
// Prefer an initialized, TX-capable rig when possible.
let candidate = choose_default_rig(&rigs)
.filter(|r| {
!config
.rig_id_to_short_name
.contains_key(&Some(r.rig_id.clone()))
})
.unwrap_or(entry);
if let Ok(mut rev) = config.short_name_to_rig_id.write() {
rev.insert(short_name.clone(), candidate.rig_id.clone());
}
mapped.push((short_name.clone(), candidate));
wildcard_resolved = true;
}
}
}
mapped
} else {
rigs.iter()
.map(|e| (e.rig_id.clone(), e))
.collect()
};
cache_remote_rigs(config, &rigs, &mapped_rigs);
if mapped_rigs.is_empty() {
return Err(RigError::communication("GetRigs returned no mapped rigs"));
}
// Determine target for global state_tx (backward compat).
let selected = selected_rig_id(config);
let target_key = selected
.as_deref()
.and_then(|id| mapped_rigs.iter().find(|(key, _)| key == id))
.or_else(|| mapped_rigs.first());
if let Some((key, entry)) = target_key {
if selected.as_deref() != Some(key.as_str()) {
set_selected_rig_id(config, Some(key.clone()));
}
let new_state = RigState::from_snapshot(entry.state.clone());
state_tx.send_if_modified(|old| {
if *old == new_state {
false
} else {
*old = new_state;
true
}
});
}
// Update per-rig watch channels keyed by short name (or rig_id in legacy mode).
if let Ok(mut rig_map) = config.rig_states.write() {
for (key, entry) in &mapped_rigs {
let new_state = RigState::from_snapshot(entry.state.clone());
if let Some(tx) = rig_map.get(&entry.rig_id) {
if let Some(tx) = rig_map.get(key) {
tx.send_if_modified(|old| {
if *old == new_state {
false
@@ -436,13 +504,13 @@ async fn refresh_remote_snapshot(
});
} else {
let (tx, _rx) = watch::channel(new_state);
rig_map.insert(entry.rig_id.clone(), tx);
rig_map.insert(key.clone(), tx);
}
}
// Remove channels for rigs no longer reported by the server.
let active_ids: std::collections::HashSet<&str> =
rigs.iter().map(|e| e.rig_id.as_str()).collect();
rig_map.retain(|id, _| active_ids.contains(id.as_str()));
// Remove channels for keys no longer present.
let active_keys: std::collections::HashSet<&str> =
mapped_rigs.iter().map(|(k, _)| k.as_str()).collect();
rig_map.retain(|id, _| active_keys.contains(id.as_str()));
}
Ok(())
}
@@ -485,25 +553,30 @@ async fn send_get_rigs(
))
}
fn cache_remote_rigs(config: &RemoteClientConfig, rigs: &[RigEntry]) {
fn cache_remote_rigs(
config: &RemoteClientConfig,
_raw_rigs: &[RigEntry],
mapped_rigs: &[(String, &RigEntry)],
) {
if let Ok(mut guard) = config.known_rigs.lock() {
// Skip the Vec rebuild when the rig list is structurally unchanged.
// We compare the fields surfaced in the UI rig picker; full state
// changes are propagated via the watch channel, not this cache.
let unchanged = guard.len() == rigs.len()
&& guard.iter().zip(rigs.iter()).all(|(cached, new)| {
cached.rig_id == new.rig_id
&& cached.display_name == new.display_name
&& cached.state.initialized == new.state.initialized
&& cached.audio_port == new.audio_port
});
let unchanged = guard.len() == mapped_rigs.len()
&& guard
.iter()
.zip(mapped_rigs.iter())
.all(|(cached, (key, new))| {
cached.rig_id == *key
&& cached.display_name == new.display_name
&& cached.state.initialized == new.state.initialized
&& cached.audio_port == new.audio_port
});
if unchanged {
return;
}
*guard = rigs
*guard = mapped_rigs
.iter()
.map(|entry| RemoteRigEntry {
rig_id: entry.rig_id.clone(),
.map(|(key, entry)| RemoteRigEntry {
rig_id: key.clone(),
display_name: entry.display_name.clone(),
state: entry.state.clone(),
audio_port: entry.audio_port,
@@ -516,6 +589,38 @@ fn selected_rig_id(config: &RemoteClientConfig) -> Option<String> {
config.selected_rig_id.lock().ok().and_then(|g| g.clone())
}
/// Returns `true` when the config has short-name mappings (multi-server mode).
fn has_short_names(config: &RemoteClientConfig) -> bool {
!config.rig_id_to_short_name.is_empty()
}
/// Resolve a server rig_id to the client-side short name.
/// In legacy mode (no mappings), returns the rig_id unchanged.
#[cfg(test)]
fn resolve_short_name(config: &RemoteClientConfig, server_rig_id: &str) -> Option<String> {
if !has_short_names(config) {
return Some(server_rig_id.to_string());
}
// Try explicit rig_id mapping first.
if let Some(name) = config.rig_id_to_short_name.get(&Some(server_rig_id.to_string())) {
return Some(name.clone());
}
// Try wildcard (None key = "default rig on this server").
config.rig_id_to_short_name.get(&None).cloned()
}
/// Resolve a client-side short name back to a server rig_id for building envelopes.
fn resolve_server_rig_id(config: &RemoteClientConfig, short_name: &str) -> Option<String> {
if !has_short_names(config) {
return Some(short_name.to_string());
}
config
.short_name_to_rig_id
.read()
.ok()
.and_then(|map| map.get(short_name).cloned())
}
fn set_selected_rig_id(config: &RemoteClientConfig, value: Option<String>) {
if let Ok(mut guard) = config.selected_rig_id.lock() {
*guard = value;
@@ -743,6 +848,8 @@ fn parse_port(port_str: &str) -> Result<u16, String> {
#[cfg(test)]
mod tests {
use super::{parse_remote_url, RemoteClientConfig, RemoteEndpoint, SharedSpectrum};
#[allow(unused_imports)]
use super::{has_short_names, resolve_server_rig_id, resolve_short_name};
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, RwLock};
@@ -932,6 +1039,8 @@ mod tests {
server_connected: Arc::new(AtomicBool::new(false)),
rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::new(),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
},
req_rx,
state_tx,
@@ -972,9 +1081,91 @@ mod tests {
server_connected: Arc::new(AtomicBool::new(false)),
rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::new(),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
};
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
assert_eq!(envelope.token.as_deref(), Some("secret"));
assert_eq!(envelope.rig_id.as_deref(), Some("sdr"));
}
#[test]
fn build_envelope_translates_short_name_to_server_rig_id() {
let (spectrum_tx, _spectrum_rx) = watch::channel(SharedSpectrum::default());
let short_name_to_rig_id = Arc::new(RwLock::new(HashMap::from([
("home-hf".to_string(), "hf".to_string()),
])));
let config = RemoteClientConfig {
addr: "127.0.0.1:4530".to_string(),
token: None,
selected_rig_id: Arc::new(Mutex::new(Some("home-hf".to_string()))),
known_rigs: Arc::new(Mutex::new(Vec::new())),
poll_interval: Duration::from_millis(500),
spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)),
rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::from([
(Some("hf".to_string()), "home-hf".to_string()),
]),
short_name_to_rig_id,
};
// selected_rig_id is "home-hf" (short name), envelope should translate to "hf"
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
assert_eq!(envelope.rig_id.as_deref(), Some("hf"));
// Override with short name should also translate
let envelope = super::build_envelope(
&config,
trx_protocol::ClientCommand::GetState,
Some("home-hf".to_string()),
);
assert_eq!(envelope.rig_id.as_deref(), Some("hf"));
}
#[test]
fn resolve_short_name_legacy_passthrough() {
let (spectrum_tx, _spectrum_rx) = watch::channel(SharedSpectrum::default());
let config = RemoteClientConfig {
addr: "127.0.0.1:4530".to_string(),
token: None,
selected_rig_id: Arc::new(Mutex::new(None)),
known_rigs: Arc::new(Mutex::new(Vec::new())),
poll_interval: Duration::from_millis(500),
spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)),
rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::new(),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
};
// Legacy mode: rig_id passes through unchanged
assert!(!has_short_names(&config));
assert_eq!(resolve_short_name(&config, "hf"), Some("hf".to_string()));
}
#[test]
fn resolve_short_name_with_mapping() {
let (spectrum_tx, _spectrum_rx) = watch::channel(SharedSpectrum::default());
let config = RemoteClientConfig {
addr: "127.0.0.1:4530".to_string(),
token: None,
selected_rig_id: Arc::new(Mutex::new(None)),
known_rigs: Arc::new(Mutex::new(Vec::new())),
poll_interval: Duration::from_millis(500),
spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)),
rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::from([
(Some("hf".to_string()), "home-hf".to_string()),
(None, "default-rig".to_string()),
]),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
};
assert!(has_short_names(&config));
assert_eq!(resolve_short_name(&config, "hf"), Some("home-hf".to_string()));
// Unknown rig_id falls through to wildcard
assert_eq!(resolve_short_name(&config, "unknown"), Some("default-rig".to_string()));
}
}