[feat](trx-frontend-rigctl): per-rig rigctl listeners for multi-rig setups

Add rig_ports map to RigctlFrontendConfig. When non-empty, one rigctl
TCP listener is spawned per entry instead of the single shared listener,
each routing commands to its assigned rig via rig_id_override on RigRequest.

Add rig_id_override: Option<String> to RigRequest so the remote client
can route individual requests to a specific rig without changing the
globally selected rig. build_envelope prefers the override when set.

Example config:
  [frontends.rigctl]
  enabled = true
  listen = "127.0.0.1"
  port = 4532
  rig_ports.ft817 = 4532
  rig_ports.airspyhf = 4533

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-02-28 08:41:39 +01:00
parent 2c2a0951a5
commit b5a1b4af33
10 changed files with 74 additions and 19 deletions
+7 -1
View File
@@ -247,8 +247,12 @@ pub struct RigctlFrontendConfig {
pub enabled: bool,
/// Listen address
pub listen: IpAddr,
/// Listen port
/// Listen port (used for single-rig setups or as the fallback base port)
pub port: u16,
/// Per-rig port overrides for multi-rig servers.
/// Maps rig ID → local rigctl port. When non-empty, one rigctl listener
/// is spawned per entry, each routing commands to its assigned rig.
pub rig_ports: HashMap<String, u16>,
}
impl Default for RigctlFrontendConfig {
@@ -257,6 +261,7 @@ impl Default for RigctlFrontendConfig {
enabled: false,
listen: IpAddr::from([127, 0, 0, 1]),
port: 4532,
rig_ports: HashMap::new(),
}
}
}
@@ -408,6 +413,7 @@ impl ClientConfig {
enabled: false,
listen: IpAddr::from([127, 0, 0, 1]),
port: 4532,
rig_ports: HashMap::new(),
},
http_json: HttpJsonFrontendConfig::default(),
audio: AudioClientConfig::default(),
+40
View File
@@ -336,6 +336,46 @@ async fn async_init() -> DynResult<AppState> {
// Spawn frontends with runtime context
for frontend in &frontends {
let frontend_state_rx = state_rx.clone();
// rigctl with per-rig port mapping: spawn one listener per rig entry.
if frontend == "rigctl" && !cfg.frontends.rigctl.rig_ports.is_empty() {
let mut first = true;
for (rig_id, &port) in &cfg.frontends.rigctl.rig_ports {
let addr = SocketAddr::from((rigctl_listen, port));
if first {
if let Ok(mut listen_addr) = frontend_runtime_ctx.rigctl_listen_addr.lock() {
*listen_addr = Some(addr);
}
first = false;
}
// Proxy channel: inject rig_id_override before forwarding to main tx.
let (proxy_tx, mut proxy_rx) =
mpsc::channel::<RigRequest>(RIG_TASK_CHANNEL_BUFFER);
let main_tx = tx.clone();
let rig_id_owned = rig_id.clone();
tokio::spawn(async move {
while let Some(req) = proxy_rx.recv().await {
let forwarded = RigRequest {
cmd: req.cmd,
respond_to: req.respond_to,
rig_id_override: Some(rig_id_owned.clone()),
};
let _ = main_tx.send(forwarded).await;
}
});
info!("rigctl frontend for rig '{}' on {}", rig_id, addr);
frontend_reg_ctx.spawn_frontend(
frontend,
state_rx.clone(),
proxy_tx,
callsign.clone(),
addr,
frontend_runtime_ctx.clone(),
)?;
}
continue;
}
let addr = match frontend.as_str() {
"http" => SocketAddr::from((http_listen, http_port)),
"rigctl" => SocketAddr::from((rigctl_listen, rigctl_port)),
+13 -7
View File
@@ -167,10 +167,11 @@ async fn handle_connection(
let Some(req) = req else {
return Ok(());
};
let rig_id_override = req.rig_id_override;
let cmd = req.cmd;
let result = {
let client_cmd = rig_command_to_client(cmd);
send_command(config, &mut writer, &mut reader, client_cmd, state_tx).await
send_command(config, &mut writer, &mut reader, client_cmd, rig_id_override, state_tx).await
};
let _ = req.respond_to.send(result);
@@ -184,9 +185,10 @@ async fn send_command(
writer: &mut tokio::net::tcp::OwnedWriteHalf,
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
cmd: ClientCommand,
rig_id_override: Option<String>,
state_tx: &watch::Sender<RigState>,
) -> RigResult<trx_core::RigSnapshot> {
let envelope = build_envelope(config, cmd);
let envelope = build_envelope(config, cmd, rig_id_override);
let payload = serde_json::to_string(&envelope)
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
@@ -233,7 +235,7 @@ async fn send_command_no_state_update(
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
cmd: ClientCommand,
) -> RigResult<trx_core::RigSnapshot> {
let envelope = build_envelope(config, cmd);
let envelope = build_envelope(config, cmd, None);
let payload = serde_json::to_string(&envelope)
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
time::timeout(
@@ -265,10 +267,14 @@ async fn send_command_no_state_update(
))
}
fn build_envelope(config: &RemoteClientConfig, cmd: ClientCommand) -> ClientEnvelope {
fn build_envelope(
config: &RemoteClientConfig,
cmd: ClientCommand,
rig_id_override: Option<String>,
) -> ClientEnvelope {
ClientEnvelope {
token: config.token.clone(),
rig_id: selected_rig_id(config),
rig_id: rig_id_override.or_else(|| selected_rig_id(config)),
cmd,
}
}
@@ -305,7 +311,7 @@ async fn send_get_rigs(
writer: &mut tokio::net::tcp::OwnedWriteHalf,
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
) -> RigResult<Vec<RigEntry>> {
let envelope = build_envelope(config, ClientCommand::GetRigs);
let envelope = build_envelope(config, ClientCommand::GetRigs, None);
let payload = serde_json::to_string(&envelope)
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
@@ -712,7 +718,7 @@ mod tests {
poll_interval: Duration::from_millis(500),
spectrum: Arc::new(Mutex::new(SharedSpectrum::default())),
};
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState);
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
assert_eq!(envelope.token.as_deref(), Some("secret"));
assert_eq!(envelope.rig_id.as_deref(), Some("sdr"));
}
@@ -156,6 +156,7 @@ async fn handle_client(
let req = RigRequest {
cmd: rig_cmd,
respond_to: resp_tx,
rig_id_override: None,
};
match time::timeout(IO_TIMEOUT, tx.send(req)).await {
@@ -812,6 +812,7 @@ async fn send_command(
.send(RigRequest {
cmd,
respond_to: resp_tx,
rig_id_override: None,
})
.await
.map_err(|e| {
@@ -337,6 +337,7 @@ async fn send_rig_command(
.send(RigRequest {
cmd,
respond_to: resp_tx,
rig_id_override: None,
})
.await
.map_err(|e| format!("failed to send to rig: {e:?}"))?;
+3
View File
@@ -11,4 +11,7 @@ use crate::{RigCommand, RigResult, RigSnapshot};
pub struct RigRequest {
pub cmd: RigCommand,
pub respond_to: oneshot::Sender<RigResult<RigSnapshot>>,
/// When set, the remote client routes this request to the specified rig
/// instead of the globally selected rig. Used for per-rig rigctl listeners.
pub rig_id_override: Option<String>,
}
+1
View File
@@ -300,6 +300,7 @@ async fn handle_client(
let req = RigRequest {
cmd: rig_cmd,
respond_to: resp_tx,
rig_id_override: None,
};
match time::timeout(IO_TIMEOUT, handle.rig_tx.send(req)).await {
+1 -1
View File
@@ -316,7 +316,7 @@ pub async fn run_rig_task(
}
// Process each request
while let Some(RigRequest { cmd, respond_to }) = batch.pop() {
while let Some(RigRequest { cmd, respond_to, .. }) = batch.pop() {
let cmd_label = format!("{:?}", cmd);
let log_command = !matches!(&cmd, RigCommand::GetSpectrum);
let started = Instant::now();