From 0b8c408c173ec60af3e7ec6d02f67f19a62cf9bb Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Wed, 25 Feb 2026 08:04:19 +0100 Subject: [PATCH] [feat](trx-server): implement multi-rig support Enable N simultaneous rig backends in one server process: - rig_handle.rs: new RigHandle { rig_id, rig_tx, state_rx } thin struct - config.rs: RigInstanceConfig, rigs: Vec in ServerConfig, resolved_rigs() (synthesises legacy flat fields as id="default"), validate() checks unique rig IDs and audio ports; MR-08 config tests - audio.rs: replace four OnceLock> statics with DecoderHistories { aprs, ft8, wspr } Arc struct; decoder/listener functions now take Arc for per-rig isolation - rig_task.rs: add rig_id + histories: Arc to RigTaskConfig; clear_*_history calls use ctx.histories instance methods - listener.rs: run_listener takes Arc> + default_rig_id; routes envelope.rig_id to correct rig; GetRigs fast path aggregates all rig states; all responses include rig_id field - main.rs: loop over resolved_rigs(); spawn_rig_audio_stack() helper; builds Arc> passed to run_listener Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- src/trx-server/src/audio.rs | 185 +++---- src/trx-server/src/config.rs | 272 +++++++++- src/trx-server/src/listener.rs | 171 ++++++- src/trx-server/src/main.rs | 841 +++++++++++++++++++------------ src/trx-server/src/rig_handle.rs | 23 + src/trx-server/src/rig_task.rs | 20 +- 6 files changed, 1058 insertions(+), 454 deletions(-) create mode 100644 src/trx-server/src/rig_handle.rs diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 3260988..f8fe4f5 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -6,9 +6,9 @@ use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, OnceLock}; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use std::{collections::VecDeque, sync::Mutex}; +use std::collections::VecDeque; use bytes::Bytes; use tokio::net::{TcpListener, TcpStream}; @@ -118,103 +118,112 @@ fn classify_stream_error(err: &str) -> &'static str { } } -fn aprs_history() -> &'static Mutex> { - static HISTORY: OnceLock>> = OnceLock::new(); - HISTORY.get_or_init(|| Mutex::new(VecDeque::new())) +/// Per-rig decoder history store. +/// +/// Replaces the previous process-wide `OnceLock` statics so that each rig +/// instance can maintain its own independent history. Pass an +/// `Arc` into every decoder task and into the audio listener. +pub struct DecoderHistories { + aprs: Mutex>, + ft8: Mutex>, + wspr: Mutex>, } -fn prune_aprs_history(history: &mut VecDeque<(Instant, AprsPacket)>) { - let cutoff = Instant::now() - APRS_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; +impl DecoderHistories { + pub fn new() -> Arc { + Arc::new(Self { + aprs: Mutex::new(VecDeque::new()), + ft8: Mutex::new(VecDeque::new()), + wspr: Mutex::new(VecDeque::new()), + }) + } + + // --- APRS --- + + fn prune_aprs(history: &mut VecDeque<(Instant, AprsPacket)>) { + let cutoff = Instant::now() - APRS_HISTORY_RETENTION; + while let Some((ts, _)) = history.front() { + if *ts < cutoff { + history.pop_front(); + } else { + break; + } } } -} -pub fn record_aprs_packet(pkt: AprsPacket) { - let mut history = aprs_history().lock().expect("aprs history mutex poisoned"); - history.push_back((Instant::now(), pkt)); - prune_aprs_history(&mut history); -} + pub fn record_aprs_packet(&self, pkt: AprsPacket) { + let mut h = self.aprs.lock().expect("aprs history mutex poisoned"); + h.push_back((Instant::now(), pkt)); + Self::prune_aprs(&mut h); + } -pub fn snapshot_aprs_history() -> Vec { - let mut history = aprs_history().lock().expect("aprs history mutex poisoned"); - prune_aprs_history(&mut history); - history.iter().map(|(_, pkt)| pkt.clone()).collect() -} + pub fn snapshot_aprs_history(&self) -> Vec { + let mut h = self.aprs.lock().expect("aprs history mutex poisoned"); + Self::prune_aprs(&mut h); + h.iter().map(|(_, pkt)| pkt.clone()).collect() + } -pub fn clear_aprs_history() { - let mut history = aprs_history().lock().expect("aprs history mutex poisoned"); - history.clear(); -} + pub fn clear_aprs_history(&self) { + self.aprs.lock().expect("aprs history mutex poisoned").clear(); + } -fn ft8_history() -> &'static Mutex> { - static HISTORY: OnceLock>> = OnceLock::new(); - HISTORY.get_or_init(|| Mutex::new(VecDeque::new())) -} + // --- FT8 --- -fn prune_ft8_history(history: &mut VecDeque<(Instant, Ft8Message)>) { - let cutoff = Instant::now() - FT8_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; + fn prune_ft8(history: &mut VecDeque<(Instant, Ft8Message)>) { + let cutoff = Instant::now() - FT8_HISTORY_RETENTION; + while let Some((ts, _)) = history.front() { + if *ts < cutoff { + history.pop_front(); + } else { + break; + } } } -} -pub fn record_ft8_message(msg: Ft8Message) { - let mut history = ft8_history().lock().expect("ft8 history mutex poisoned"); - history.push_back((Instant::now(), msg)); - prune_ft8_history(&mut history); -} + pub fn record_ft8_message(&self, msg: Ft8Message) { + let mut h = self.ft8.lock().expect("ft8 history mutex poisoned"); + h.push_back((Instant::now(), msg)); + Self::prune_ft8(&mut h); + } -pub fn snapshot_ft8_history() -> Vec { - let mut history = ft8_history().lock().expect("ft8 history mutex poisoned"); - prune_ft8_history(&mut history); - history.iter().map(|(_, msg)| msg.clone()).collect() -} + pub fn snapshot_ft8_history(&self) -> Vec { + let mut h = self.ft8.lock().expect("ft8 history mutex poisoned"); + Self::prune_ft8(&mut h); + h.iter().map(|(_, msg)| msg.clone()).collect() + } -pub fn clear_ft8_history() { - let mut history = ft8_history().lock().expect("ft8 history mutex poisoned"); - history.clear(); -} + pub fn clear_ft8_history(&self) { + self.ft8.lock().expect("ft8 history mutex poisoned").clear(); + } -fn wspr_history() -> &'static Mutex> { - static HISTORY: OnceLock>> = OnceLock::new(); - HISTORY.get_or_init(|| Mutex::new(VecDeque::new())) -} + // --- WSPR --- -fn prune_wspr_history(history: &mut VecDeque<(Instant, WsprMessage)>) { - let cutoff = Instant::now() - WSPR_HISTORY_RETENTION; - while let Some((ts, _)) = history.front() { - if *ts < cutoff { - history.pop_front(); - } else { - break; + fn prune_wspr(history: &mut VecDeque<(Instant, WsprMessage)>) { + let cutoff = Instant::now() - WSPR_HISTORY_RETENTION; + while let Some((ts, _)) = history.front() { + if *ts < cutoff { + history.pop_front(); + } else { + break; + } } } -} -pub fn snapshot_wspr_history() -> Vec { - let mut history = wspr_history().lock().expect("wspr history mutex poisoned"); - prune_wspr_history(&mut history); - history.iter().map(|(_, msg)| msg.clone()).collect() -} + pub fn record_wspr_message(&self, msg: WsprMessage) { + let mut h = self.wspr.lock().expect("wspr history mutex poisoned"); + h.push_back((Instant::now(), msg)); + Self::prune_wspr(&mut h); + } -pub fn clear_wspr_history() { - let mut history = wspr_history().lock().expect("wspr history mutex poisoned"); - history.clear(); -} + pub fn snapshot_wspr_history(&self) -> Vec { + let mut h = self.wspr.lock().expect("wspr history mutex poisoned"); + Self::prune_wspr(&mut h); + h.iter().map(|(_, msg)| msg.clone()).collect() + } -pub fn record_wspr_message(msg: WsprMessage) { - let mut history = wspr_history().lock().expect("wspr history mutex poisoned"); - history.push_back((Instant::now(), msg)); - prune_wspr_history(&mut history); + pub fn clear_wspr_history(&self) { + self.wspr.lock().expect("wspr history mutex poisoned").clear(); + } } /// Spawn the audio capture thread. @@ -665,6 +674,7 @@ pub async fn run_aprs_decoder( mut state_rx: watch::Receiver, decode_tx: broadcast::Sender, decode_logs: Option>, + histories: Arc, ) { info!("APRS decoder started ({}Hz, {} ch)", sample_rate, channels); let mut decoder = AprsDecoder::new(sample_rate); @@ -717,7 +727,7 @@ pub async fn run_aprs_decoder( was_active = true; for pkt in decoder.process_samples(&mono) { - record_aprs_packet(pkt.clone()); + histories.record_aprs_packet(pkt.clone()); if let Some(logger) = decode_logs.as_ref() { logger.log_aprs(&pkt); } @@ -936,6 +946,7 @@ pub async fn run_ft8_decoder( mut state_rx: watch::Receiver, decode_tx: broadcast::Sender, decode_logs: Option>, + histories: Arc, ) { info!("FT8 decoder started ({}Hz, {} ch)", sample_rate, channels); let mut decoder = match Ft8Decoder::new(FT8_SAMPLE_RATE) { @@ -1020,7 +1031,7 @@ pub async fn run_ft8_decoder( freq_hz: res.freq_hz, message: res.text, }; - record_ft8_message(msg.clone()); + histories.record_ft8_message(msg.clone()); if let Some(logger) = decode_logs.as_ref() { logger.log_ft8(&msg); } @@ -1072,6 +1083,7 @@ pub async fn run_wspr_decoder( mut state_rx: watch::Receiver, decode_tx: broadcast::Sender, decode_logs: Option>, + histories: Arc, ) { info!("WSPR decoder started ({}Hz, {} ch)", sample_rate, channels); let decoder = match WsprDecoder::new() { @@ -1136,7 +1148,7 @@ pub async fn run_wspr_decoder( freq_hz: res.freq_hz, message: res.message, }; - record_wspr_message(msg.clone()); + histories.record_wspr_message(msg.clone()); if let Some(logger) = decode_logs.as_ref() { logger.log_wspr(&msg); } @@ -1209,6 +1221,7 @@ pub async fn run_audio_listener( stream_info: AudioStreamInfo, decode_tx: broadcast::Sender, mut shutdown_rx: watch::Receiver, + histories: Arc, ) -> std::io::Result<()> { let listener = TcpListener::bind(addr).await?; info!("Audio listener on {}", addr); @@ -1224,9 +1237,10 @@ pub async fn run_audio_listener( let info = stream_info.clone(); let decode_tx = decode_tx.clone(); let client_shutdown_rx = shutdown_rx.clone(); + let client_histories = histories.clone(); tokio::spawn(async move { - if let Err(e) = handle_audio_client(socket, peer, rx_audio, tx_audio, info, decode_tx, client_shutdown_rx).await { + if let Err(e) = handle_audio_client(socket, peer, rx_audio, tx_audio, info, decode_tx, client_shutdown_rx, client_histories).await { warn!("Audio client {} error: {:?}", peer, e); } info!("Audio client {} disconnected", peer); @@ -1255,6 +1269,7 @@ async fn handle_audio_client( stream_info: AudioStreamInfo, decode_tx: broadcast::Sender, mut shutdown_rx: watch::Receiver, + histories: Arc, ) -> std::io::Result<()> { let (reader, writer) = socket.into_split(); let mut reader = tokio::io::BufReader::new(reader); @@ -1265,7 +1280,7 @@ async fn handle_audio_client( write_audio_msg(&mut writer, AUDIO_MSG_STREAM_INFO, &info_json).await?; // Send APRS history to newly connected client. - let history = snapshot_aprs_history(); + let history = histories.snapshot_aprs_history(); for pkt in history { let msg = DecodedMessage::Aprs(pkt); let msg_type = AUDIO_MSG_APRS_DECODE; @@ -1274,7 +1289,7 @@ async fn handle_audio_client( } } // Send FT8 history to newly connected client. - let history = snapshot_ft8_history(); + let history = histories.snapshot_ft8_history(); for msg in history { let msg = DecodedMessage::Ft8(msg); let msg_type = AUDIO_MSG_FT8_DECODE; @@ -1283,7 +1298,7 @@ async fn handle_audio_client( } } // Send WSPR history to newly connected client. - let history = snapshot_wspr_history(); + let history = histories.snapshot_wspr_history(); for msg in history { let msg = DecodedMessage::Wspr(msg); let msg_type = AUDIO_MSG_WSPR_DECODE; diff --git a/src/trx-server/src/config.rs b/src/trx-server/src/config.rs index 04b3b58..7a28283 100644 --- a/src/trx-server/src/config.rs +++ b/src/trx-server/src/config.rs @@ -20,28 +20,74 @@ pub use trx_decode_log::DecodeLogsConfig; use trx_core::rig::state::RigMode; +/// Per-rig instance configuration for multi-rig setups. +/// +/// Each entry in `[[rigs]]` becomes one of these. The flat top-level +/// `[rig]` / `[audio]` / `[sdr]` / `[pskreporter]` / `[aprsfi]` / +/// `[behavior]` / `[decode_logs]` fields are still supported via +/// `ServerConfig::resolved_rigs()` which synthesises a single-element list +/// with `id = "default"` when `rigs` is empty. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct RigInstanceConfig { + /// Stable rig identifier used in protocol routing. + pub id: String, + /// Rig backend configuration. + pub rig: RigConfig, + /// Polling and retry behavior. + pub behavior: BehaviorConfig, + /// Audio streaming configuration for this rig. + pub audio: AudioConfig, + /// SDR pipeline configuration (only used when [rigs.rig.access] type = "sdr"). + pub sdr: SdrConfig, + /// PSK Reporter uplink for this rig. + pub pskreporter: PskReporterConfig, + /// APRS-IS IGate uplink for this rig. + pub aprsfi: AprsFiConfig, + /// Decoder file logging for this rig. + pub decode_logs: DecodeLogsConfig, +} + +impl Default for RigInstanceConfig { + fn default() -> Self { + Self { + id: "default".to_string(), + rig: RigConfig::default(), + behavior: BehaviorConfig::default(), + audio: AudioConfig::default(), + sdr: SdrConfig::default(), + pskreporter: PskReporterConfig::default(), + aprsfi: AprsFiConfig::default(), + decode_logs: DecodeLogsConfig::default(), + } + } +} + /// Top-level server configuration structure. #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[serde(default)] pub struct ServerConfig { /// General settings pub general: GeneralConfig, - /// Rig backend configuration + /// Rig backend configuration (legacy flat; use [[rigs]] for multi-rig) pub rig: RigConfig, - /// Polling and retry behavior + /// Polling and retry behavior (legacy flat) pub behavior: BehaviorConfig, /// TCP listener configuration pub listen: ListenConfig, - /// Audio streaming configuration + /// Audio streaming configuration (legacy flat) pub audio: AudioConfig, - /// PSK Reporter uplink configuration + /// PSK Reporter uplink configuration (legacy flat) pub pskreporter: PskReporterConfig, - /// APRS-IS IGate uplink configuration + /// APRS-IS IGate uplink configuration (legacy flat) pub aprsfi: AprsFiConfig, - /// Decoder file logging configuration + /// Decoder file logging configuration (legacy flat) pub decode_logs: DecodeLogsConfig, - /// SDR pipeline configuration (used when [rig.access] type = "sdr"). + /// SDR pipeline configuration (legacy flat; used when [rig.access] type = "sdr"). pub sdr: SdrConfig, + /// Multi-rig instance list. When non-empty, takes priority over the flat fields. + #[serde(rename = "rigs", default)] + pub rigs: Vec, } /// General application settings. @@ -441,6 +487,33 @@ impl ServerConfig { } } + // Multi-rig uniqueness checks. + if !self.rigs.is_empty() { + let mut seen_ids: std::collections::HashSet = + std::collections::HashSet::new(); + let mut seen_ports: std::collections::HashSet = + std::collections::HashSet::new(); + for rig in &self.rigs { + if rig.id.trim().is_empty() { + return Err("[[rigs]] entry has an empty id".to_string()); + } + if !seen_ids.insert(rig.id.clone()) { + return Err(format!( + "[[rigs]] duplicate rig id: \"{}\"", + rig.id + )); + } + if rig.audio.enabled { + if !seen_ports.insert(rig.audio.port) { + return Err(format!( + "[[rigs]] duplicate audio port {} (rig id: \"{}\")", + rig.audio.port, rig.id + )); + } + } + } + } + if self.decode_logs.enabled { if self.decode_logs.dir.trim().is_empty() { return Err("[decode_logs].dir must not be empty when enabled".to_string()); @@ -535,6 +608,27 @@ impl ServerConfig { ::load_from_default_paths() } + /// Return the effective list of rig instances to spawn. + /// + /// When `[[rigs]]` entries are present they are returned as-is. + /// Otherwise the legacy flat `[rig]` / `[audio]` / … fields are synthesised + /// into a single `RigInstanceConfig` with `id = "default"`. + pub fn resolved_rigs(&self) -> Vec { + if !self.rigs.is_empty() { + return self.rigs.clone(); + } + vec![RigInstanceConfig { + id: "default".to_string(), + rig: self.rig.clone(), + behavior: self.behavior.clone(), + audio: self.audio.clone(), + sdr: self.sdr.clone(), + pskreporter: self.pskreporter.clone(), + aprsfi: self.aprsfi.clone(), + decode_logs: self.decode_logs.clone(), + }] + } + /// Generate an example configuration as a TOML string. pub fn example_toml() -> String { let example = ServerConfig { @@ -564,6 +658,7 @@ impl ServerConfig { aprsfi: AprsFiConfig::default(), decode_logs: DecodeLogsConfig::default(), sdr: SdrConfig::default(), + rigs: Vec::new(), }; toml::to_string_pretty(&example).unwrap_or_default() @@ -1018,4 +1113,167 @@ tokens = ["secret123"] errors ); } + + // --- MR-08: multi-rig config tests --- + + #[test] + fn test_resolved_rigs_legacy_flat_fields() { + let mut cfg = ServerConfig::default(); + cfg.rig.model = Some("ft817".to_string()); + cfg.rig.access.access_type = Some("serial".to_string()); + cfg.rig.access.port = Some("/dev/ttyUSB0".to_string()); + cfg.rig.access.baud = Some(9600); + + let rigs = cfg.resolved_rigs(); + assert_eq!(rigs.len(), 1); + assert_eq!(rigs[0].id, "default"); + assert_eq!(rigs[0].rig.model, Some("ft817".to_string())); + } + + #[test] + fn test_resolved_rigs_multi_rig_toml() { + let toml_str = r#" +[general] +callsign = "W1AW" + +[[rigs]] +id = "hf" + +[rigs.rig] +model = "ft450d" +initial_freq_hz = 14074000 + +[rigs.rig.access] +type = "serial" +port = "/dev/ttyUSB0" +baud = 9600 + +[rigs.audio] +port = 4531 + +[[rigs]] +id = "sdr" + +[rigs.rig] +model = "soapysdr" + +[rigs.rig.access] +type = "sdr" +args = "driver=rtlsdr" + +[rigs.audio] +port = 4532 +"#; + let cfg: ServerConfig = toml::from_str(toml_str).unwrap(); + let rigs = cfg.resolved_rigs(); + assert_eq!(rigs.len(), 2); + assert_eq!(rigs[0].id, "hf"); + assert_eq!(rigs[0].rig.model, Some("ft450d".to_string())); + assert_eq!(rigs[0].audio.port, 4531); + assert_eq!(rigs[1].id, "sdr"); + assert_eq!(rigs[1].rig.model, Some("soapysdr".to_string())); + assert_eq!(rigs[1].audio.port, 4532); + } + + #[test] + fn test_validate_rejects_duplicate_rig_ids() { + let toml_str = r#" +[[rigs]] +id = "rig1" +[rigs.rig] +model = "ft817" +[rigs.rig.access] +type = "serial" +port = "/dev/ttyUSB0" +baud = 9600 +[rigs.audio] +port = 4531 + +[[rigs]] +id = "rig1" +[rigs.rig] +model = "ft450d" +[rigs.rig.access] +type = "serial" +port = "/dev/ttyUSB1" +baud = 9600 +[rigs.audio] +port = 4532 +"#; + let cfg: ServerConfig = toml::from_str(toml_str).unwrap(); + let result = cfg.validate(); + assert!(result.is_err()); + assert!( + result.unwrap_err().contains("duplicate rig id"), + "expected error about duplicate rig id" + ); + } + + #[test] + fn test_validate_rejects_duplicate_audio_ports() { + let toml_str = r#" +[[rigs]] +id = "rig1" +[rigs.rig] +model = "ft817" +[rigs.rig.access] +type = "serial" +port = "/dev/ttyUSB0" +baud = 9600 +[rigs.audio] +port = 4531 + +[[rigs]] +id = "rig2" +[rigs.rig] +model = "ft450d" +[rigs.rig.access] +type = "serial" +port = "/dev/ttyUSB1" +baud = 9600 +[rigs.audio] +port = 4531 +"#; + let cfg: ServerConfig = toml::from_str(toml_str).unwrap(); + let result = cfg.validate(); + assert!(result.is_err()); + assert!( + result.unwrap_err().contains("duplicate audio port"), + "expected error about duplicate audio port" + ); + } + + #[test] + fn test_validate_accepts_multi_rig_unique_ids_and_ports() { + let toml_str = r#" +[[rigs]] +id = "hf" +[rigs.rig] +model = "ft450d" +[rigs.rig.access] +type = "serial" +port = "/dev/ttyUSB0" +baud = 9600 +[rigs.audio] +port = 4531 + +[[rigs]] +id = "sdr" +[rigs.rig] +model = "soapysdr" +[rigs.rig.access] +type = "sdr" +args = "driver=rtlsdr" +[rigs.audio] +port = 4532 +"#; + let cfg: ServerConfig = toml::from_str(toml_str).unwrap(); + // validate() uses the flat [rig] field for rig-level checks; multi-rig + // validation focuses on ID/port uniqueness. The flat [rig] is default + // (no model), so the access check is skipped when both fields are absent. + assert!( + cfg.validate().is_ok(), + "expected Ok for valid multi-rig config" + ); + } } diff --git a/src/trx-server/src/listener.rs b/src/trx-server/src/listener.rs index 6b96bcf..9281956 100644 --- a/src/trx-server/src/listener.rs +++ b/src/trx-server/src/listener.rs @@ -6,7 +6,11 @@ //! //! Accepts client connections speaking the `ClientEnvelope`/`ClientResponse` //! protocol defined in `trx-protocol`. +//! +//! Multi-rig routing: `ClientEnvelope.rig_id` selects the target rig. +//! When absent the first rig in the map is used (backward compat). +use std::collections::HashMap; use std::collections::HashSet; use std::net::SocketAddr; use std::sync::Arc; @@ -14,28 +18,34 @@ use std::time::Duration; use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{mpsc, oneshot, watch}; +use tokio::sync::{oneshot, watch}; use tokio::time; -use tracing::{error, info}; +use tracing::{error, info, warn}; use trx_core::rig::command::RigCommand; use trx_core::rig::request::RigRequest; -use trx_core::rig::state::RigState; use trx_protocol::auth::{SimpleTokenValidator, TokenValidator}; use trx_protocol::codec::parse_envelope; use trx_protocol::mapping; +use trx_protocol::types::{ClientCommand, RigEntry}; use trx_protocol::ClientResponse; +use crate::rig_handle::RigHandle; + const IO_TIMEOUT: Duration = Duration::from_secs(10); const REQUEST_TIMEOUT: Duration = Duration::from_secs(12); const MAX_JSON_LINE_BYTES: usize = 16 * 1024; /// Run the JSON TCP listener, accepting client connections. +/// +/// `rigs` is a shared map from rig_id → `RigHandle`. The first entry (by +/// insertion order — deterministic after MR-07 iterates `resolved_rigs()` in +/// order) is the default rig for backward-compat clients that omit `rig_id`. pub async fn run_listener( addr: SocketAddr, - rig_tx: mpsc::Sender, + rigs: Arc>, + default_rig_id: String, auth_tokens: HashSet, - state_rx: watch::Receiver, mut shutdown_rx: watch::Receiver, ) -> std::io::Result<()> { let listener = TcpListener::bind(addr).await?; @@ -49,12 +59,12 @@ pub async fn run_listener( let (socket, peer) = accept?; info!("Client connected: {}", peer); - let tx = rig_tx.clone(); - let srx = state_rx.clone(); + let rigs = Arc::clone(&rigs); + let default_rig_id = default_rig_id.clone(); let validator = Arc::clone(&validator); let client_shutdown_rx = shutdown_rx.clone(); tokio::spawn(async move { - if let Err(e) = handle_client(socket, peer, tx, validator, srx, client_shutdown_rx).await { + if let Err(e) = handle_client(socket, peer, rigs, default_rig_id, validator, client_shutdown_rx).await { error!("Client {} error: {:?}", peer, e); } }); @@ -147,9 +157,9 @@ async fn send_response( async fn handle_client( socket: TcpStream, addr: SocketAddr, - tx: mpsc::Sender, + rigs: Arc>, + default_rig_id: String, validator: Arc, - state_rx: watch::Receiver, mut shutdown_rx: watch::Receiver, ) -> std::io::Result<()> { let (reader, mut writer) = socket.into_split(); @@ -196,7 +206,9 @@ async fn handle_client( error!("Invalid JSON from {}: {} / {:?}", addr, trimmed, e); let resp = ClientResponse { success: false, + rig_id: None, state: None, + rigs: None, error: Some(format!("Invalid JSON: {}", e)), }; send_response(&mut writer, &resp).await?; @@ -207,23 +219,74 @@ async fn handle_client( if let Err(err) = validator.as_ref().validate(&envelope.token) { let resp = ClientResponse { success: false, + rig_id: None, state: None, + rigs: None, error: Some(err), }; send_response(&mut writer, &resp).await?; continue; } + // Resolve rig_id from the envelope (absent = default). + let target_rig_id = envelope + .rig_id + .as_deref() + .unwrap_or(&default_rig_id) + .to_string(); + + // GetRigs: aggregate all rig states and return without hitting any task. + if matches!(envelope.cmd, ClientCommand::GetRigs) { + let mut entries: Vec = Vec::new(); + for handle in rigs.values() { + let state = handle.state_rx.borrow().clone(); + if let Some(snapshot) = state.snapshot() { + entries.push(RigEntry { + rig_id: handle.rig_id.clone(), + state: snapshot, + }); + } + } + let resp = ClientResponse { + success: true, + rig_id: Some("server".to_string()), + state: None, + rigs: Some(entries), + error: None, + }; + send_response(&mut writer, &resp).await?; + continue; + } + + // Look up the target rig handle. + let handle = match rigs.get(&target_rig_id) { + Some(h) => h, + None => { + warn!("Unknown rig_id '{}' from {}", target_rig_id, addr); + let resp = ClientResponse { + success: false, + rig_id: Some(target_rig_id.clone()), + state: None, + rigs: None, + error: Some(format!("Unknown rig_id: {}", target_rig_id)), + }; + send_response(&mut writer, &resp).await?; + continue; + } + }; + let rig_cmd = mapping::client_command_to_rig(envelope.cmd); // Fast path: serve GetSnapshot directly from the watch channel // so clients get a response even while the rig task is initializing. if matches!(rig_cmd, RigCommand::GetSnapshot) { - let state = state_rx.borrow().clone(); + let state = handle.state_rx.borrow().clone(); if let Some(snapshot) = state.snapshot() { let resp = ClientResponse { success: true, + rig_id: Some(target_rig_id.clone()), state: Some(snapshot), + rigs: None, error: None, }; send_response(&mut writer, &resp).await?; @@ -237,13 +300,15 @@ async fn handle_client( respond_to: resp_tx, }; - match time::timeout(IO_TIMEOUT, tx.send(req)).await { + match time::timeout(IO_TIMEOUT, handle.rig_tx.send(req)).await { Ok(Ok(())) => {} Ok(Err(e)) => { - error!("Failed to send request to rig_task: {:?}", e); + error!("Failed to send request to rig_task for '{}': {:?}", target_rig_id, e); let resp = ClientResponse { success: false, + rig_id: Some(target_rig_id.clone()), state: None, + rigs: None, error: Some("Internal error: rig task not available".into()), }; send_response(&mut writer, &resp).await?; @@ -252,7 +317,9 @@ async fn handle_client( Err(_) => { let resp = ClientResponse { success: false, + rig_id: Some(target_rig_id.clone()), state: None, + rigs: None, error: Some("Internal error: request queue timeout".into()), }; send_response(&mut writer, &resp).await?; @@ -267,7 +334,9 @@ async fn handle_client( Err(_) => { let resp = ClientResponse { success: false, + rig_id: Some(target_rig_id.clone()), state: None, + rigs: None, error: Some("Request timed out waiting for rig response".into()), }; send_response(&mut writer, &resp).await?; @@ -289,7 +358,9 @@ async fn handle_client( Ok(Ok(snapshot)) => { let resp = ClientResponse { success: true, + rig_id: Some(target_rig_id.clone()), state: Some(snapshot), + rigs: None, error: None, }; send_response(&mut writer, &resp).await?; @@ -297,7 +368,9 @@ async fn handle_client( Ok(Err(err)) => { let resp = ClientResponse { success: false, + rig_id: Some(target_rig_id.clone()), state: None, + rigs: None, error: Some(err.message), }; send_response(&mut writer, &resp).await?; @@ -306,7 +379,9 @@ async fn handle_client( error!("Rig response oneshot recv error: {:?}", e); let resp = ClientResponse { success: false, + rig_id: Some(target_rig_id.clone()), state: None, + rigs: None, error: Some("Internal error waiting for rig response".into()), }; send_response(&mut writer, &resp).await?; @@ -325,9 +400,12 @@ mod tests { use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::TcpStream; + use tokio::sync::{mpsc, watch}; use trx_core::radio::freq::Band; + use trx_core::rig::request::RigRequest; use trx_core::rig::{RigAccessMethod, RigCapabilities, RigInfo}; + use trx_core::rig::state::RigState; fn loopback_addr() -> SocketAddr { let listener = std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).expect("bind"); @@ -367,18 +445,30 @@ mod tests { state } + fn make_rigs(state: RigState) -> (Arc>, String) { + let (rig_tx, _rig_rx) = mpsc::channel::(8); + let (state_tx, state_rx) = watch::channel(state); + let _state_tx = state_tx; + let handle = RigHandle { + rig_id: "default".to_string(), + rig_tx, + state_rx, + }; + let mut map = HashMap::new(); + map.insert("default".to_string(), handle); + (Arc::new(map), "default".to_string()) + } + #[tokio::test] #[ignore = "requires TCP bind permissions"] async fn listener_rejects_missing_token() { let addr = loopback_addr(); - let (rig_tx, _rig_rx) = mpsc::channel::(8); - let (state_tx, state_rx) = watch::channel(sample_state()); - let _state_tx = state_tx; + let (rigs, default_id) = make_rigs(sample_state()); let (shutdown_tx, shutdown_rx) = watch::channel(false); let mut auth = HashSet::new(); auth.insert("secret".to_string()); - let handle = tokio::spawn(run_listener(addr, rig_tx, auth, state_rx, shutdown_rx)); + let handle = tokio::spawn(run_listener(addr, rigs, default_id, auth, shutdown_rx)); let stream = TcpStream::connect(addr).await.expect("connect"); let (reader, mut writer) = stream.into_split(); @@ -406,16 +496,14 @@ mod tests { #[ignore = "requires TCP bind permissions"] async fn listener_serves_get_state_snapshot() { let addr = loopback_addr(); - let (rig_tx, _rig_rx) = mpsc::channel::(8); - let (state_tx, state_rx) = watch::channel(sample_state()); - let _state_tx = state_tx; + let (rigs, default_id) = make_rigs(sample_state()); let (shutdown_tx, shutdown_rx) = watch::channel(false); let handle = tokio::spawn(run_listener( addr, - rig_tx, + rigs, + default_id, HashSet::new(), - state_rx, shutdown_rx, )); @@ -437,6 +525,45 @@ mod tests { let snapshot = resp.state.expect("snapshot"); assert_eq!(snapshot.info.model, "Dummy"); assert_eq!(snapshot.status.freq.hz, 144_300_000); + // rig_id should be set in the response + assert_eq!(resp.rig_id.as_deref(), Some("default")); + + let _ = shutdown_tx.send(true); + handle.abort(); + let _ = handle.await; + } + + #[tokio::test] + #[ignore = "requires TCP bind permissions"] + async fn listener_routes_unknown_rig_id() { + let addr = loopback_addr(); + let (rigs, default_id) = make_rigs(sample_state()); + let (shutdown_tx, shutdown_rx) = watch::channel(false); + + let handle = tokio::spawn(run_listener( + addr, + rigs, + default_id, + HashSet::new(), + shutdown_rx, + )); + + let stream = TcpStream::connect(addr).await.expect("connect"); + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + + writer + .write_all(br#"{"rig_id":"nonexistent","cmd":"get_state"}"#) + .await + .expect("write"); + writer.write_all(b"\n").await.expect("newline"); + writer.flush().await.expect("flush"); + + let mut line = String::new(); + reader.read_line(&mut line).await.expect("read"); + let resp: ClientResponse = serde_json::from_str(line.trim_end()).expect("response json"); + assert!(!resp.success); + assert!(resp.error.as_deref().unwrap_or("").contains("Unknown rig_id")); let _ = shutdown_tx.send(true); handle.abort(); diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index 78451af..a54f7ab 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -8,12 +8,15 @@ mod config; mod error; mod listener; mod pskreporter; +mod rig_handle; mod rig_task; +use std::collections::HashMap; use std::collections::HashSet; use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::ptr::NonNull; +use std::sync::Arc; use std::time::Duration; use bytes::Bytes; @@ -32,7 +35,9 @@ use trx_core::rig::request::RigRequest; use trx_core::rig::state::RigState; use trx_core::DynResult; -use config::ServerConfig; +use audio::DecoderHistories; +use config::{RigInstanceConfig, ServerConfig}; +use rig_handle::RigHandle; use trx_decode_log::DecoderLoggers; #[cfg(feature = "soapysdr")] @@ -101,7 +106,7 @@ fn parse_serial_addr(addr: &str) -> DynResult<(String, u32)> { Ok((path.to_string(), baud)) } -/// Resolved configuration after merging config file and CLI arguments. +/// Resolved configuration for the first/only rig (legacy single-rig CLI path). struct ResolvedConfig { rig: String, access: RigAccess, @@ -190,51 +195,35 @@ fn resolve_config( }) } -fn build_rig_task_config( - resolved: &ResolvedConfig, - cfg: &ServerConfig, - registry: std::sync::Arc, -) -> rig_task::RigTaskConfig { - let pskreporter_status = if cfg.pskreporter.enabled { - let has_locator = cfg.pskreporter.receiver_locator.is_some() - || (resolved.latitude.is_some() && resolved.longitude.is_some()); - if has_locator { - Some(format!( - "Enabled ({}:{})", - cfg.pskreporter.host, cfg.pskreporter.port - )) - } else { - Some(format!( - "Enabled but inactive (missing locator source) ({}:{})", - cfg.pskreporter.host, cfg.pskreporter.port - )) +/// Derive a `RigAccess` from a rig instance config's access fields. +fn access_from_rig_instance(rig_cfg: &RigInstanceConfig) -> DynResult { + match rig_cfg.rig.access.access_type.as_deref() { + Some("serial") | None => { + let path = rig_cfg + .rig + .access + .port + .clone() + .unwrap_or_else(|| "/dev/ttyUSB0".to_string()); + let baud = rig_cfg.rig.access.baud.unwrap_or(9600); + Ok(RigAccess::Serial { path, baud }) } - } else { - Some("Disabled".to_string()) - }; - - rig_task::RigTaskConfig { - registry, - rig_model: resolved.rig.clone(), - access: resolved.access.clone(), - polling: AdaptivePolling::new( - Duration::from_millis(cfg.behavior.poll_interval_ms), - Duration::from_millis(cfg.behavior.poll_interval_tx_ms), - ), - retry: ExponentialBackoff::new( - cfg.behavior.max_retries.max(1), - Duration::from_millis(cfg.behavior.retry_base_delay_ms), - Duration::from_secs(RETRY_MAX_DELAY_SECS), - ), - initial_freq_hz: cfg.rig.initial_freq_hz, - initial_mode: cfg.rig.initial_mode.clone(), - server_callsign: resolved.callsign.clone(), - server_version: Some(env!("CARGO_PKG_VERSION").to_string()), - server_build_date: Some(env!("TRX_SERVER_BUILD_DATE").to_string()), - server_latitude: resolved.latitude, - server_longitude: resolved.longitude, - pskreporter_status, - prebuilt_rig: None, + Some("tcp") => { + let host = rig_cfg.rig.access.host.clone().unwrap_or_default(); + let port = rig_cfg.rig.access.tcp_port.unwrap_or(0); + Ok(RigAccess::Tcp { + addr: format!("{}:{}", host, port), + }) + } + Some("sdr") => { + let args = rig_cfg.rig.access.args.clone().unwrap_or_default(); + Ok(RigAccess::Sdr { args }) + } + Some(other) => Err(format!( + "Unknown access type '{}' for rig '{}'", + other, rig_cfg.id + ) + .into()), } } @@ -271,13 +260,10 @@ fn parse_rig_mode( } } -/// Build a `SoapySdrRig` with full channel config from `ServerConfig` and -/// return both the rig box and a PCM receiver subscribed to its primary channel. -/// -/// Only compiled when the `soapysdr` feature is enabled. +/// Build a `SoapySdrRig` with full channel config from a `RigInstanceConfig`. #[cfg(feature = "soapysdr")] -fn build_sdr_rig( - cfg: &ServerConfig, +fn build_sdr_rig_from_instance( + rig_cfg: &RigInstanceConfig, ) -> DynResult<( Box, tokio::sync::broadcast::Receiver>, @@ -285,14 +271,14 @@ fn build_sdr_rig( use trx_core::radio::freq::Freq; use trx_core::rig::AudioSource; - let args = cfg.rig.access.args.as_deref().unwrap_or(""); - let channels: Vec<(f64, trx_core::rig::state::RigMode, u32, usize)> = cfg + let args = rig_cfg.rig.access.args.as_deref().unwrap_or(""); + let channels: Vec<(f64, trx_core::rig::state::RigMode, u32, usize)> = rig_cfg .sdr .channels .iter() .map(|ch| { - let if_hz = (cfg.sdr.center_offset_hz + ch.offset_hz) as f64; - let mode = parse_rig_mode(&ch.mode, &cfg.rig.initial_mode); + let if_hz = (rig_cfg.sdr.center_offset_hz + ch.offset_hz) as f64; + let mode = parse_rig_mode(&ch.mode, &rig_cfg.rig.initial_mode); (if_hz, mode, ch.audio_bandwidth_hz, ch.fir_taps) }) .collect(); @@ -300,28 +286,290 @@ fn build_sdr_rig( let sdr_rig = trx_backend_soapysdr::SoapySdrRig::new_with_config( args, &channels, - &cfg.sdr.gain.mode, - cfg.sdr.gain.value, - cfg.audio.sample_rate, - cfg.audio.frame_duration_ms, + &rig_cfg.sdr.gain.mode, + rig_cfg.sdr.gain.value, + rig_cfg.audio.sample_rate, + rig_cfg.audio.frame_duration_ms, Freq { - hz: cfg.rig.initial_freq_hz, + hz: rig_cfg.rig.initial_freq_hz, }, - cfg.rig.initial_mode.clone(), - cfg.sdr.sample_rate, + rig_cfg.rig.initial_mode.clone(), + rig_cfg.sdr.sample_rate, )?; - // Subscribe to the primary channel's PCM broadcast before consuming the rig. let pcm_rx = sdr_rig.subscribe_pcm(); Ok((Box::new(sdr_rig) as Box, pcm_rx)) } +/// Build a `RigTaskConfig` for a single rig instance. +fn build_rig_task_config( + rig_cfg: &RigInstanceConfig, + rig_model: String, + access: RigAccess, + callsign: Option, + latitude: Option, + longitude: Option, + registry: Arc, + histories: Arc, +) -> rig_task::RigTaskConfig { + let pskreporter_status = if rig_cfg.pskreporter.enabled { + let has_locator = rig_cfg.pskreporter.receiver_locator.is_some() + || (latitude.is_some() && longitude.is_some()); + if has_locator { + Some(format!( + "Enabled ({}:{})", + rig_cfg.pskreporter.host, rig_cfg.pskreporter.port + )) + } else { + Some(format!( + "Enabled but inactive (missing locator source) ({}:{})", + rig_cfg.pskreporter.host, rig_cfg.pskreporter.port + )) + } + } else { + Some("Disabled".to_string()) + }; + + rig_task::RigTaskConfig { + registry, + rig_id: rig_cfg.id.clone(), + rig_model, + access, + polling: AdaptivePolling::new( + Duration::from_millis(rig_cfg.behavior.poll_interval_ms), + Duration::from_millis(rig_cfg.behavior.poll_interval_tx_ms), + ), + retry: ExponentialBackoff::new( + rig_cfg.behavior.max_retries.max(1), + Duration::from_millis(rig_cfg.behavior.retry_base_delay_ms), + Duration::from_secs(RETRY_MAX_DELAY_SECS), + ), + initial_freq_hz: rig_cfg.rig.initial_freq_hz, + initial_mode: rig_cfg.rig.initial_mode.clone(), + server_callsign: callsign, + server_version: Some(env!("CARGO_PKG_VERSION").to_string()), + server_build_date: Some(env!("TRX_SERVER_BUILD_DATE").to_string()), + server_latitude: latitude, + server_longitude: longitude, + pskreporter_status, + histories, + prebuilt_rig: None, + } +} + +/// Spawn all audio-related tasks for one rig instance. +/// +/// `sdr_pcm_rx` carries a live SDR PCM receiver when the rig uses the +/// SoapySDR backend; `None` selects the cpal capture path. +fn spawn_rig_audio_stack( + rig_cfg: &RigInstanceConfig, + state_rx: watch::Receiver, + shutdown_rx: &watch::Receiver, + histories: Arc, + callsign: Option, + latitude: Option, + longitude: Option, + listen_override: Option, + sdr_pcm_rx: Option>>, +) -> Vec> { + let mut handles: Vec> = Vec::new(); + + if !rig_cfg.audio.enabled { + return handles; + } + + let audio_listen = SocketAddr::from(( + listen_override.unwrap_or(rig_cfg.audio.listen), + rig_cfg.audio.port, + )); + let stream_info = AudioStreamInfo { + sample_rate: rig_cfg.audio.sample_rate, + channels: rig_cfg.audio.channels, + frame_duration_ms: rig_cfg.audio.frame_duration_ms, + }; + + let (rx_audio_tx, _) = broadcast::channel::(256); + let (tx_audio_tx, tx_audio_rx) = mpsc::channel::(64); + + // PCM tap for server-side decoders + let (pcm_tx, _) = broadcast::channel::>(64); + // Decoded messages broadcast + let (decode_tx, _) = broadcast::channel::(256); + + if rig_cfg.pskreporter.enabled { + let cs = callsign.clone().unwrap_or_default(); + if cs.trim().is_empty() { + warn!( + "[{}] PSK Reporter enabled but [general].callsign is empty; uplink disabled", + rig_cfg.id + ); + } else { + let pr_cfg = rig_cfg.pskreporter.clone(); + let pr_state_rx = state_rx.clone(); + let pr_decode_rx = decode_tx.subscribe(); + let pr_shutdown_rx = shutdown_rx.clone(); + handles.push(tokio::spawn(async move { + tokio::select! { + _ = pskreporter::run_pskreporter_uplink( + pr_cfg, + cs, + latitude, + longitude, + pr_state_rx, + pr_decode_rx + ) => {} + _ = wait_for_shutdown(pr_shutdown_rx) => {} + } + })); + } + } + + if rig_cfg.aprsfi.enabled { + let cs = callsign.clone().unwrap_or_default(); + if cs.trim().is_empty() { + warn!( + "[{}] APRS-IS IGate enabled but [general].callsign is empty; uplink disabled", + rig_cfg.id + ); + } else { + let ai_cfg = rig_cfg.aprsfi.clone(); + let ai_decode_rx = decode_tx.subscribe(); + let ai_shutdown_rx = shutdown_rx.clone(); + handles.push(tokio::spawn(async move { + tokio::select! { + _ = aprsfi::run_aprsfi_uplink(ai_cfg, cs, ai_decode_rx) => {} + _ = wait_for_shutdown(ai_shutdown_rx) => {} + } + })); + } + } + + let decoder_logs = match DecoderLoggers::from_config(&rig_cfg.decode_logs) { + Ok(v) => v, + Err(e) => { + warn!("[{}] Decoder file logging disabled: {}", rig_cfg.id, e); + None + } + }; + + if rig_cfg.audio.rx_enabled { + if let Some(mut sdr_rx) = sdr_pcm_rx { + // SDR path: the backend pipeline provides demodulated PCM. + info!("[{}] using SDR audio source — cpal capture disabled", rig_cfg.id); + let pcm_tx_clone = pcm_tx.clone(); + handles.push(tokio::spawn(async move { + loop { + match sdr_rx.recv().await { + Ok(frame) => { + let _ = pcm_tx_clone.send(frame); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!("SDR audio bridge: dropped {} frames", n); + } + Err(_) => break, + } + } + })); + } else { + // cpal path (serial/TCP transceivers) + let _capture_thread = audio::spawn_audio_capture( + &rig_cfg.audio, + rx_audio_tx.clone(), + Some(pcm_tx.clone()), + ); + } + + // Spawn APRS decoder task + let aprs_pcm_rx = pcm_tx.subscribe(); + let aprs_state_rx = state_rx.clone(); + let aprs_decode_tx = decode_tx.clone(); + let aprs_sr = rig_cfg.audio.sample_rate; + let aprs_ch = rig_cfg.audio.channels; + let aprs_shutdown_rx = shutdown_rx.clone(); + let aprs_logs = decoder_logs.clone(); + let aprs_histories = histories.clone(); + handles.push(tokio::spawn(async move { + tokio::select! { + _ = audio::run_aprs_decoder(aprs_sr, aprs_ch as u16, aprs_pcm_rx, aprs_state_rx, aprs_decode_tx, aprs_logs, aprs_histories) => {} + _ = wait_for_shutdown(aprs_shutdown_rx) => {} + } + })); + + // Spawn CW decoder task (no histories needed — CW has no persistent history) + let cw_pcm_rx = pcm_tx.subscribe(); + let cw_state_rx = state_rx.clone(); + let cw_decode_tx = decode_tx.clone(); + let cw_sr = rig_cfg.audio.sample_rate; + let cw_ch = rig_cfg.audio.channels; + let cw_shutdown_rx = shutdown_rx.clone(); + let cw_logs = decoder_logs.clone(); + handles.push(tokio::spawn(async move { + tokio::select! { + _ = audio::run_cw_decoder(cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx, cw_logs) => {} + _ = wait_for_shutdown(cw_shutdown_rx) => {} + } + })); + + // Spawn FT8 decoder task + let ft8_pcm_rx = pcm_tx.subscribe(); + let ft8_state_rx = state_rx.clone(); + let ft8_decode_tx = decode_tx.clone(); + let ft8_sr = rig_cfg.audio.sample_rate; + let ft8_ch = rig_cfg.audio.channels; + let ft8_shutdown_rx = shutdown_rx.clone(); + let ft8_logs = decoder_logs.clone(); + let ft8_histories = histories.clone(); + handles.push(tokio::spawn(async move { + tokio::select! { + _ = audio::run_ft8_decoder(ft8_sr, ft8_ch as u16, ft8_pcm_rx, ft8_state_rx, ft8_decode_tx, ft8_logs, ft8_histories) => {} + _ = wait_for_shutdown(ft8_shutdown_rx) => {} + } + })); + + // Spawn WSPR decoder task + let wspr_pcm_rx = pcm_tx.subscribe(); + let wspr_state_rx = state_rx.clone(); + let wspr_decode_tx = decode_tx.clone(); + let wspr_sr = rig_cfg.audio.sample_rate; + let wspr_ch = rig_cfg.audio.channels; + let wspr_shutdown_rx = shutdown_rx.clone(); + let wspr_logs = decoder_logs.clone(); + let wspr_histories = histories.clone(); + handles.push(tokio::spawn(async move { + tokio::select! { + _ = audio::run_wspr_decoder(wspr_sr, wspr_ch as u16, wspr_pcm_rx, wspr_state_rx, wspr_decode_tx, wspr_logs, wspr_histories) => {} + _ = wait_for_shutdown(wspr_shutdown_rx) => {} + } + })); + } + + if rig_cfg.audio.tx_enabled { + let _playback_thread = audio::spawn_audio_playback(&rig_cfg.audio, tx_audio_rx); + } + + let audio_shutdown_rx = shutdown_rx.clone(); + let audio_histories = histories; + handles.push(tokio::spawn(async move { + if let Err(e) = audio::run_audio_listener( + audio_listen, + rx_audio_tx, + tx_audio_tx, + stream_info, + decode_tx, + audio_shutdown_rx, + audio_histories, + ) + .await + { + error!("Audio listener error: {:?}", e); + } + })); + + handles +} + #[tokio::main] async fn main() -> DynResult<()> { - // Phase 3B: Create bootstrap context for explicit initialization. - // This replaces reliance on global mutable state, though currently - // built-in backends still register on globals for plugin compatibility. - // Full de-globalization would require threading context through rig_task and listener. let mut bootstrap_ctx = RegistrationContext::new(); register_builtin_backends_on(&mut bootstrap_ctx); @@ -341,7 +589,7 @@ async fn main() -> DynResult<()> { cfg.validate() .map_err(|e| format!("Invalid server configuration: {}", e))?; - // Validate SDR-specific configuration rules (see SDR.md §11). + // Validate SDR-specific configuration rules. let sdr_errors = cfg.validate_sdr(); if !sdr_errors.is_empty() { for e in &sdr_errors { @@ -359,98 +607,203 @@ async fn main() -> DynResult<()> { info!("Loaded configuration from {}", path.display()); } - let resolved = resolve_config(&cli, &cfg, &bootstrap_ctx)?; + let registry = Arc::new(bootstrap_ctx); - match &resolved.access { - RigAccess::Serial { path, baud } => { - info!( - "Starting trx-server (rig: {}, access: serial {} @ {} baud)", - resolved.rig, path, baud - ); - } - RigAccess::Tcp { addr } => { - info!( - "Starting trx-server (rig: {}, access: tcp {})", - resolved.rig, addr - ); - } - RigAccess::Sdr { args } => { - info!( - "Starting trx-server (rig: {}, access: sdr {})", - resolved.rig, args - ); - } - } + // --- Resolve the effective rig list --- + // + // Legacy path: no [[rigs]] → synthesise from flat fields + CLI overrides. + // Multi-rig path: [[rigs]] entries are used as-is; CLI rig/access flags + // are ignored (no unambiguous target). + let mut resolved_rigs = cfg.resolved_rigs(); - if let Some(ref cs) = resolved.callsign { + let (callsign, latitude, longitude) = if cfg.rigs.is_empty() { + // Apply CLI overrides to the first (only) rig. + let legacy = resolve_config(&cli, &cfg, ®istry)?; + + let first = resolved_rigs + .first_mut() + .expect("resolved_rigs always has ≥1 entry"); + + first.rig.model = Some(legacy.rig.clone()); + match &legacy.access { + RigAccess::Serial { path, baud } => { + first.rig.access.access_type = Some("serial".to_string()); + first.rig.access.port = Some(path.clone()); + first.rig.access.baud = Some(*baud); + } + RigAccess::Tcp { addr } => { + first.rig.access.access_type = Some("tcp".to_string()); + // Split "host:port" back into parts. + if let Some(colon) = addr.rfind(':') { + first.rig.access.host = Some(addr[..colon].to_string()); + first.rig.access.tcp_port = addr[colon + 1..].parse().ok(); + } + } + RigAccess::Sdr { args } => { + first.rig.access.access_type = Some("sdr".to_string()); + first.rig.access.args = Some(args.clone()); + } + } + (legacy.callsign, legacy.latitude, legacy.longitude) + } else { + // Multi-rig path: validate all rig models are registered. + for rig_cfg in &resolved_rigs { + if let Some(ref model) = rig_cfg.rig.model { + let norm = normalize_name(model); + if !registry.is_backend_registered(&norm) { + return Err(format!( + "Unknown rig model '{}' for rig '{}' (available: {})", + norm, + rig_cfg.id, + registry.registered_backends().join(", ") + ) + .into()); + } + } + } + let callsign = cli.callsign.clone().or_else(|| cfg.general.callsign.clone()); + (callsign, cfg.general.latitude, cfg.general.longitude) + }; + + info!( + "Starting trx-server with {} rig(s): {}", + resolved_rigs.len(), + resolved_rigs + .iter() + .map(|r| r.id.as_str()) + .collect::>() + .join(", ") + ); + if let Some(ref cs) = callsign { info!("Callsign: {}", cs); } - // For the SDR access type: build the SoapySdrRig with full channel config - // here in main so we can subscribe to its primary-channel PCM sender - // before passing the rig to the rig task. The rig task skips its - // registry factory when `prebuilt_rig` is set. - // - // When the `soapysdr` feature is disabled this block is elided and - // `sdr_pcm_rx` is always `None`, preserving the cpal path. - #[cfg(feature = "soapysdr")] - let (sdr_prebuilt_rig, sdr_pcm_rx): ( - Option>, - Option>>, - ) = if cfg.rig.access.access_type.as_deref() == Some("sdr") { - let (rig, pcm_rx) = build_sdr_rig(&cfg)?; - (Some(rig), Some(pcm_rx)) - } else { - (None, None) - }; - - #[cfg(not(feature = "soapysdr"))] - let (sdr_prebuilt_rig, sdr_pcm_rx): ( - Option>, - Option>>, - ) = (None, None); - - let (tx, rx) = mpsc::channel::(RIG_TASK_CHANNEL_BUFFER); let mut task_handles: Vec> = Vec::new(); let (shutdown_tx, shutdown_rx) = watch::channel(false); - let initial_state = RigState::new_with_metadata( - resolved.callsign.clone(), - Some(env!("CARGO_PKG_VERSION").to_string()), - Some(env!("TRX_SERVER_BUILD_DATE").to_string()), - resolved.latitude, - resolved.longitude, - cfg.rig.initial_freq_hz, - cfg.rig.initial_mode.clone(), - ); - let mut initial_state = initial_state; - initial_state.pskreporter_status = if cfg.pskreporter.enabled { - Some(format!( - "Enabled ({}:{})", - cfg.pskreporter.host, cfg.pskreporter.port - )) - } else { - Some("Disabled".to_string()) - }; - let (state_tx, state_rx) = watch::channel(initial_state); - // Keep receivers alive so channels don't close prematurely - let _state_rx = state_rx; - let mut rig_task_config = - build_rig_task_config(&resolved, &cfg, std::sync::Arc::new(bootstrap_ctx)); + // The first rig id is the default for backward-compat clients that omit rig_id. + let default_rig_id = resolved_rigs + .first() + .map(|r| r.id.clone()) + .unwrap_or_else(|| "default".to_string()); - // Pass pre-built SDR rig to the task so it skips the registry factory. - if let Some(prebuilt) = sdr_prebuilt_rig { - rig_task_config.prebuilt_rig = Some(prebuilt); + let mut rig_handles: HashMap = HashMap::new(); + + for rig_cfg in &resolved_rigs { + let rig_model = normalize_name(rig_cfg.rig.model.as_deref().unwrap_or("")); + + let access = access_from_rig_instance(rig_cfg)?; + + match &access { + RigAccess::Serial { path, baud } => { + info!( + "[{}] Starting (rig: {}, access: serial {} @ {} baud)", + rig_cfg.id, rig_model, path, baud + ); + } + RigAccess::Tcp { addr } => { + info!( + "[{}] Starting (rig: {}, access: tcp {})", + rig_cfg.id, rig_model, addr + ); + } + RigAccess::Sdr { args } => { + info!( + "[{}] Starting (rig: {}, access: sdr {})", + rig_cfg.id, rig_model, args + ); + } + } + + // Build SDR rig when applicable. + #[cfg(feature = "soapysdr")] + let (sdr_prebuilt_rig, sdr_pcm_rx): ( + Option>, + Option>>, + ) = if rig_cfg.rig.access.access_type.as_deref() == Some("sdr") { + let (rig, pcm_rx) = build_sdr_rig_from_instance(rig_cfg)?; + (Some(rig), Some(pcm_rx)) + } else { + (None, None) + }; + + #[cfg(not(feature = "soapysdr"))] + let (sdr_prebuilt_rig, sdr_pcm_rx): ( + Option>, + Option>>, + ) = (None, None); + + let histories = DecoderHistories::new(); + + let (rig_tx, rig_rx) = mpsc::channel::(RIG_TASK_CHANNEL_BUFFER); + let mut initial_state = RigState::new_with_metadata( + callsign.clone(), + Some(env!("CARGO_PKG_VERSION").to_string()), + Some(env!("TRX_SERVER_BUILD_DATE").to_string()), + latitude, + longitude, + rig_cfg.rig.initial_freq_hz, + rig_cfg.rig.initial_mode.clone(), + ); + initial_state.pskreporter_status = if rig_cfg.pskreporter.enabled { + Some(format!( + "Enabled ({}:{})", + rig_cfg.pskreporter.host, rig_cfg.pskreporter.port + )) + } else { + Some("Disabled".to_string()) + }; + let (state_tx, state_rx) = watch::channel(initial_state); + + let mut task_config = build_rig_task_config( + rig_cfg, + rig_model, + access, + callsign.clone(), + latitude, + longitude, + Arc::clone(®istry), + histories.clone(), + ); + if let Some(prebuilt) = sdr_prebuilt_rig { + task_config.prebuilt_rig = Some(prebuilt); + } + + // Spawn rig task. + let rig_shutdown_rx = shutdown_rx.clone(); + task_handles.push(tokio::spawn(async move { + if let Err(e) = + rig_task::run_rig_task(task_config, rig_rx, state_tx, rig_shutdown_rx).await + { + error!("Rig task error: {:?}", e); + } + })); + + // Spawn audio stack. + let audio_handles = spawn_rig_audio_stack( + rig_cfg, + state_rx.clone(), + &shutdown_rx, + histories.clone(), + callsign.clone(), + latitude, + longitude, + cli.listen, + sdr_pcm_rx, + ); + task_handles.extend(audio_handles); + + rig_handles.insert( + rig_cfg.id.clone(), + RigHandle { + rig_id: rig_cfg.id.clone(), + rig_tx, + state_rx, + }, + ); } - let rig_shutdown_rx = shutdown_rx.clone(); - task_handles.push(tokio::spawn(async move { - if let Err(e) = rig_task::run_rig_task(rig_task_config, rx, state_tx, rig_shutdown_rx).await - { - error!("Rig task error: {:?}", e); - } - })); - + // Start JSON TCP listener. if cfg.listen.enabled { let listen_ip = cli.listen.unwrap_or(cfg.listen.listen); let listen_port = cli.port.unwrap_or(cfg.listen.port); @@ -463,15 +816,14 @@ async fn main() -> DynResult<()> { .filter(|t| !t.is_empty()) .cloned() .collect(); - let rig_tx = tx.clone(); - let state_rx_listener = _state_rx.clone(); + let rigs_arc = Arc::new(rig_handles); let listener_shutdown_rx = shutdown_rx.clone(); task_handles.push(tokio::spawn(async move { if let Err(e) = listener::run_listener( listen_addr, - rig_tx, + rigs_arc, + default_rig_id, auth_tokens, - state_rx_listener, listener_shutdown_rx, ) .await @@ -481,190 +833,9 @@ async fn main() -> DynResult<()> { })); } - if cfg.audio.enabled { - let audio_listen = - SocketAddr::from((cli.listen.unwrap_or(cfg.audio.listen), cfg.audio.port)); - let stream_info = AudioStreamInfo { - sample_rate: cfg.audio.sample_rate, - channels: cfg.audio.channels, - frame_duration_ms: cfg.audio.frame_duration_ms, - }; - - let (rx_audio_tx, _) = broadcast::channel::(256); - let (tx_audio_tx, tx_audio_rx) = mpsc::channel::(64); - - // PCM tap for server-side decoders - let (pcm_tx, _) = broadcast::channel::>(64); - // Decoded messages broadcast - let (decode_tx, _) = broadcast::channel::(256); - - if cfg.pskreporter.enabled { - let callsign = resolved.callsign.clone().unwrap_or_default(); - if callsign.trim().is_empty() { - warn!("PSK Reporter enabled but [general].callsign is empty; uplink disabled"); - } else { - let pr_cfg = cfg.pskreporter.clone(); - let pr_state_rx = _state_rx.clone(); - let pr_decode_rx = decode_tx.subscribe(); - let pr_shutdown_rx = shutdown_rx.clone(); - let latitude = resolved.latitude; - let longitude = resolved.longitude; - task_handles.push(tokio::spawn(async move { - tokio::select! { - _ = pskreporter::run_pskreporter_uplink( - pr_cfg, - callsign, - latitude, - longitude, - pr_state_rx, - pr_decode_rx - ) => {} - _ = wait_for_shutdown(pr_shutdown_rx) => {} - } - })); - } - } - - if cfg.aprsfi.enabled { - let callsign = resolved.callsign.clone().unwrap_or_default(); - if callsign.trim().is_empty() { - warn!("APRS-IS IGate enabled but [general].callsign is empty; uplink disabled"); - } else { - let ai_cfg = cfg.aprsfi.clone(); - let ai_decode_rx = decode_tx.subscribe(); - let ai_shutdown_rx = shutdown_rx.clone(); - task_handles.push(tokio::spawn(async move { - tokio::select! { - _ = aprsfi::run_aprsfi_uplink(ai_cfg, callsign, ai_decode_rx) => {} - _ = wait_for_shutdown(ai_shutdown_rx) => {} - } - })); - } - } - - let decoder_logs = match DecoderLoggers::from_config(&cfg.decode_logs) { - Ok(v) => v, - Err(e) => { - warn!("Decoder file logging disabled: {}", e); - None - } - }; - - if cfg.audio.rx_enabled { - if let Some(mut sdr_rx) = sdr_pcm_rx { - // SDR path: the backend pipeline provides demodulated PCM, - // so cpal capture is skipped entirely. - // The SDR PCM frames are bridged into pcm_tx so the existing - // decoder spawn code below receives them unchanged. - tracing::info!("using SDR audio source — cpal capture disabled"); - let pcm_tx_clone = pcm_tx.clone(); - task_handles.push(tokio::spawn(async move { - loop { - match sdr_rx.recv().await { - Ok(frame) => { - let _ = pcm_tx_clone.send(frame); - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!("SDR audio bridge: dropped {} frames", n); - } - Err(_) => break, - } - } - })); - } else { - // cpal path (existing serial/TCP transceivers) - let _capture_thread = audio::spawn_audio_capture( - &cfg.audio, - rx_audio_tx.clone(), - Some(pcm_tx.clone()), - ); - } - - // Spawn APRS decoder task - let aprs_pcm_rx = pcm_tx.subscribe(); - let aprs_state_rx = _state_rx.clone(); - let aprs_decode_tx = decode_tx.clone(); - let aprs_sr = cfg.audio.sample_rate; - let aprs_ch = cfg.audio.channels; - let aprs_shutdown_rx = shutdown_rx.clone(); - let aprs_logs = decoder_logs.clone(); - task_handles.push(tokio::spawn(async move { - tokio::select! { - _ = audio::run_aprs_decoder(aprs_sr, aprs_ch as u16, aprs_pcm_rx, aprs_state_rx, aprs_decode_tx, aprs_logs) => {} - _ = wait_for_shutdown(aprs_shutdown_rx) => {} - } - })); - - // Spawn CW decoder task - let cw_pcm_rx = pcm_tx.subscribe(); - let cw_state_rx = _state_rx.clone(); - let cw_decode_tx = decode_tx.clone(); - let cw_sr = cfg.audio.sample_rate; - let cw_ch = cfg.audio.channels; - let cw_shutdown_rx = shutdown_rx.clone(); - let cw_logs = decoder_logs.clone(); - task_handles.push(tokio::spawn(async move { - tokio::select! { - _ = audio::run_cw_decoder(cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx, cw_logs) => {} - _ = wait_for_shutdown(cw_shutdown_rx) => {} - } - })); - - // Spawn FT8 decoder task - let ft8_pcm_rx = pcm_tx.subscribe(); - let ft8_state_rx = _state_rx.clone(); - let ft8_decode_tx = decode_tx.clone(); - let ft8_sr = cfg.audio.sample_rate; - let ft8_ch = cfg.audio.channels; - let ft8_shutdown_rx = shutdown_rx.clone(); - let ft8_logs = decoder_logs.clone(); - task_handles.push(tokio::spawn(async move { - tokio::select! { - _ = audio::run_ft8_decoder(ft8_sr, ft8_ch as u16, ft8_pcm_rx, ft8_state_rx, ft8_decode_tx, ft8_logs) => {} - _ = wait_for_shutdown(ft8_shutdown_rx) => {} - } - })); - - // Spawn WSPR decoder task - let wspr_pcm_rx = pcm_tx.subscribe(); - let wspr_state_rx = _state_rx.clone(); - let wspr_decode_tx = decode_tx.clone(); - let wspr_sr = cfg.audio.sample_rate; - let wspr_ch = cfg.audio.channels; - let wspr_shutdown_rx = shutdown_rx.clone(); - let wspr_logs = decoder_logs.clone(); - task_handles.push(tokio::spawn(async move { - tokio::select! { - _ = audio::run_wspr_decoder(wspr_sr, wspr_ch as u16, wspr_pcm_rx, wspr_state_rx, wspr_decode_tx, wspr_logs) => {} - _ = wait_for_shutdown(wspr_shutdown_rx) => {} - } - })); - } - if cfg.audio.tx_enabled { - let _playback_thread = audio::spawn_audio_playback(&cfg.audio, tx_audio_rx); - } - - let audio_shutdown_rx = shutdown_rx.clone(); - task_handles.push(tokio::spawn(async move { - if let Err(e) = audio::run_audio_listener( - audio_listen, - rx_audio_tx, - tx_audio_tx, - stream_info, - decode_tx, - audio_shutdown_rx, - ) - .await - { - error!("Audio listener error: {:?}", e); - } - })); - } - signal::ctrl_c().await?; info!("Ctrl+C received, shutting down"); let _ = shutdown_tx.send(true); - drop(tx); tokio::time::sleep(Duration::from_millis(400)).await; for handle in &task_handles { diff --git a/src/trx-server/src/rig_handle.rs b/src/trx-server/src/rig_handle.rs new file mode 100644 index 0000000..ccadad2 --- /dev/null +++ b/src/trx-server/src/rig_handle.rs @@ -0,0 +1,23 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Thin handle giving the listener access to one rig's task and state. + +use tokio::sync::{mpsc, watch}; + +use trx_core::rig::request::RigRequest; +use trx_core::rig::state::RigState; + +/// A handle to a single running rig backend. +/// +/// One `RigHandle` is created per rig in `main.rs` and stored in the shared +/// `Arc>` passed to the listener. +pub struct RigHandle { + /// Stable rig identifier, matches the key in the HashMap. + pub rig_id: String, + /// Send commands to the rig task. + pub rig_tx: mpsc::Sender, + /// Watch the latest rig state for fast GetState/GetRigs responses. + pub state_rx: watch::Receiver, +} diff --git a/src/trx-server/src/rig_task.rs b/src/trx-server/src/rig_task.rs index 510b3b8..9248177 100644 --- a/src/trx-server/src/rig_task.rs +++ b/src/trx-server/src/rig_task.rs @@ -24,12 +24,14 @@ use trx_core::rig::state::{RigMode, RigSnapshot, RigState}; use trx_core::rig::{RigCat, RigRxStatus, RigTxStatus}; use trx_core::{DynResult, RigError, RigResult}; -use crate::audio; +use crate::audio::DecoderHistories; use crate::error::is_invalid_bcd_error; /// Configuration for the rig task. pub struct RigTaskConfig { pub registry: Arc, + /// Stable rig identifier (matches the key in the listener's HashMap). + pub rig_id: String, pub rig_model: String, pub access: RigAccess, pub polling: AdaptivePolling, @@ -42,6 +44,9 @@ pub struct RigTaskConfig { pub server_latitude: Option, pub server_longitude: Option, pub pskreporter_status: Option, + /// Per-rig decoder history store. Used by Reset* commands to clear the + /// history and by the audio listener to serve history on connection. + pub histories: Arc, /// Pre-built rig backend. When `Some`, the registry factory is skipped. /// Used by the SDR path in `main.rs` to pass a fully-configured /// `SoapySdrRig` (built with channel config) without duplicating the @@ -55,6 +60,7 @@ impl Default for RigTaskConfig { trx_backend::register_builtin_backends_on(&mut registry); Self { registry: Arc::new(registry), + rig_id: "default".to_string(), rig_model: "ft817".to_string(), access: RigAccess::Serial { path: "/dev/ttyUSB0".to_string(), @@ -70,6 +76,7 @@ impl Default for RigTaskConfig { server_latitude: None, server_longitude: None, pskreporter_status: None, + histories: DecoderHistories::new(), prebuilt_rig: None, } } @@ -93,7 +100,8 @@ pub async fn run_rig_task( state_tx: watch::Sender, mut shutdown_rx: watch::Receiver, ) -> DynResult<()> { - info!("Opening rig backend {}", config.rig_model); + let histories = config.histories.clone(); + info!("[{}] Opening rig backend {}", config.rig_id, config.rig_model); match &config.access { RigAccess::Serial { path, baud } => info!("Serial: {} @ {} baud", path, baud), RigAccess::Tcp { addr } => info!("TCP CAT: {}", addr), @@ -317,6 +325,7 @@ pub async fn run_rig_task( last_power_on: &mut last_power_on, state_tx: &state_tx, retry, + histories: &histories, }; let result = process_command(cmd, &mut cmd_ctx).await; @@ -347,6 +356,7 @@ struct CommandExecContext<'a> { last_power_on: &'a mut Option, state_tx: &'a watch::Sender, retry: &'a ExponentialBackoff, + histories: &'a Arc, } async fn process_command( @@ -393,7 +403,7 @@ async fn process_command( return snapshot_from(ctx.state); } RigCommand::ResetAprsDecoder => { - audio::clear_aprs_history(); + ctx.histories.clear_aprs_history(); ctx.state.aprs_decode_reset_seq += 1; let _ = ctx.state_tx.send(ctx.state.clone()); return snapshot_from(ctx.state); @@ -404,13 +414,13 @@ async fn process_command( return snapshot_from(ctx.state); } RigCommand::ResetFt8Decoder => { - audio::clear_ft8_history(); + ctx.histories.clear_ft8_history(); ctx.state.ft8_decode_reset_seq += 1; let _ = ctx.state_tx.send(ctx.state.clone()); return snapshot_from(ctx.state); } RigCommand::ResetWsprDecoder => { - audio::clear_wspr_history(); + ctx.histories.clear_wspr_history(); ctx.state.wspr_decode_reset_seq += 1; let _ = ctx.state_tx.send(ctx.state.clone()); return snapshot_from(ctx.state);