[feat](trx-server): implement multi-rig support
Enable N simultaneous rig backends in one server process:
- rig_handle.rs: new RigHandle { rig_id, rig_tx, state_rx } thin struct
- config.rs: RigInstanceConfig, rigs: Vec<RigInstanceConfig> in ServerConfig,
resolved_rigs() (synthesises legacy flat fields as id="default"),
validate() checks unique rig IDs and audio ports; MR-08 config tests
- audio.rs: replace four OnceLock<Mutex<VecDeque>> statics with
DecoderHistories { aprs, ft8, wspr } Arc struct; decoder/listener
functions now take Arc<DecoderHistories> for per-rig isolation
- rig_task.rs: add rig_id + histories: Arc<DecoderHistories> to
RigTaskConfig; clear_*_history calls use ctx.histories instance methods
- listener.rs: run_listener takes Arc<HashMap<String, RigHandle>> +
default_rig_id; routes envelope.rig_id to correct rig; GetRigs fast
path aggregates all rig states; all responses include rig_id field
- main.rs: loop over resolved_rigs(); spawn_rig_audio_stack() helper;
builds Arc<HashMap<String, RigHandle>> passed to run_listener
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
+71
-56
@@ -6,9 +6,9 @@
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{collections::VecDeque, sync::Mutex};
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use bytes::Bytes;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
@@ -118,12 +118,29 @@ fn classify_stream_error(err: &str) -> &'static str {
|
||||
}
|
||||
}
|
||||
|
||||
fn aprs_history() -> &'static Mutex<VecDeque<(Instant, AprsPacket)>> {
|
||||
static HISTORY: OnceLock<Mutex<VecDeque<(Instant, AprsPacket)>>> = OnceLock::new();
|
||||
HISTORY.get_or_init(|| Mutex::new(VecDeque::new()))
|
||||
/// Per-rig decoder history store.
|
||||
///
|
||||
/// Replaces the previous process-wide `OnceLock` statics so that each rig
|
||||
/// instance can maintain its own independent history. Pass an
|
||||
/// `Arc<DecoderHistories>` into every decoder task and into the audio listener.
|
||||
pub struct DecoderHistories {
|
||||
aprs: Mutex<VecDeque<(Instant, AprsPacket)>>,
|
||||
ft8: Mutex<VecDeque<(Instant, Ft8Message)>>,
|
||||
wspr: Mutex<VecDeque<(Instant, WsprMessage)>>,
|
||||
}
|
||||
|
||||
fn prune_aprs_history(history: &mut VecDeque<(Instant, AprsPacket)>) {
|
||||
impl DecoderHistories {
|
||||
pub fn new() -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
aprs: Mutex::new(VecDeque::new()),
|
||||
ft8: Mutex::new(VecDeque::new()),
|
||||
wspr: Mutex::new(VecDeque::new()),
|
||||
})
|
||||
}
|
||||
|
||||
// --- APRS ---
|
||||
|
||||
fn prune_aprs(history: &mut VecDeque<(Instant, AprsPacket)>) {
|
||||
let cutoff = Instant::now() - APRS_HISTORY_RETENTION;
|
||||
while let Some((ts, _)) = history.front() {
|
||||
if *ts < cutoff {
|
||||
@@ -134,29 +151,25 @@ fn prune_aprs_history(history: &mut VecDeque<(Instant, AprsPacket)>) {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record_aprs_packet(pkt: AprsPacket) {
|
||||
let mut history = aprs_history().lock().expect("aprs history mutex poisoned");
|
||||
history.push_back((Instant::now(), pkt));
|
||||
prune_aprs_history(&mut history);
|
||||
pub fn record_aprs_packet(&self, pkt: AprsPacket) {
|
||||
let mut h = self.aprs.lock().expect("aprs history mutex poisoned");
|
||||
h.push_back((Instant::now(), pkt));
|
||||
Self::prune_aprs(&mut h);
|
||||
}
|
||||
|
||||
pub fn snapshot_aprs_history() -> Vec<AprsPacket> {
|
||||
let mut history = aprs_history().lock().expect("aprs history mutex poisoned");
|
||||
prune_aprs_history(&mut history);
|
||||
history.iter().map(|(_, pkt)| pkt.clone()).collect()
|
||||
pub fn snapshot_aprs_history(&self) -> Vec<AprsPacket> {
|
||||
let mut h = self.aprs.lock().expect("aprs history mutex poisoned");
|
||||
Self::prune_aprs(&mut h);
|
||||
h.iter().map(|(_, pkt)| pkt.clone()).collect()
|
||||
}
|
||||
|
||||
pub fn clear_aprs_history() {
|
||||
let mut history = aprs_history().lock().expect("aprs history mutex poisoned");
|
||||
history.clear();
|
||||
pub fn clear_aprs_history(&self) {
|
||||
self.aprs.lock().expect("aprs history mutex poisoned").clear();
|
||||
}
|
||||
|
||||
fn ft8_history() -> &'static Mutex<VecDeque<(Instant, Ft8Message)>> {
|
||||
static HISTORY: OnceLock<Mutex<VecDeque<(Instant, Ft8Message)>>> = OnceLock::new();
|
||||
HISTORY.get_or_init(|| Mutex::new(VecDeque::new()))
|
||||
}
|
||||
// --- FT8 ---
|
||||
|
||||
fn prune_ft8_history(history: &mut VecDeque<(Instant, Ft8Message)>) {
|
||||
fn prune_ft8(history: &mut VecDeque<(Instant, Ft8Message)>) {
|
||||
let cutoff = Instant::now() - FT8_HISTORY_RETENTION;
|
||||
while let Some((ts, _)) = history.front() {
|
||||
if *ts < cutoff {
|
||||
@@ -167,29 +180,25 @@ fn prune_ft8_history(history: &mut VecDeque<(Instant, Ft8Message)>) {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record_ft8_message(msg: Ft8Message) {
|
||||
let mut history = ft8_history().lock().expect("ft8 history mutex poisoned");
|
||||
history.push_back((Instant::now(), msg));
|
||||
prune_ft8_history(&mut history);
|
||||
pub fn record_ft8_message(&self, msg: Ft8Message) {
|
||||
let mut h = self.ft8.lock().expect("ft8 history mutex poisoned");
|
||||
h.push_back((Instant::now(), msg));
|
||||
Self::prune_ft8(&mut h);
|
||||
}
|
||||
|
||||
pub fn snapshot_ft8_history() -> Vec<Ft8Message> {
|
||||
let mut history = ft8_history().lock().expect("ft8 history mutex poisoned");
|
||||
prune_ft8_history(&mut history);
|
||||
history.iter().map(|(_, msg)| msg.clone()).collect()
|
||||
pub fn snapshot_ft8_history(&self) -> Vec<Ft8Message> {
|
||||
let mut h = self.ft8.lock().expect("ft8 history mutex poisoned");
|
||||
Self::prune_ft8(&mut h);
|
||||
h.iter().map(|(_, msg)| msg.clone()).collect()
|
||||
}
|
||||
|
||||
pub fn clear_ft8_history() {
|
||||
let mut history = ft8_history().lock().expect("ft8 history mutex poisoned");
|
||||
history.clear();
|
||||
pub fn clear_ft8_history(&self) {
|
||||
self.ft8.lock().expect("ft8 history mutex poisoned").clear();
|
||||
}
|
||||
|
||||
fn wspr_history() -> &'static Mutex<VecDeque<(Instant, WsprMessage)>> {
|
||||
static HISTORY: OnceLock<Mutex<VecDeque<(Instant, WsprMessage)>>> = OnceLock::new();
|
||||
HISTORY.get_or_init(|| Mutex::new(VecDeque::new()))
|
||||
}
|
||||
// --- WSPR ---
|
||||
|
||||
fn prune_wspr_history(history: &mut VecDeque<(Instant, WsprMessage)>) {
|
||||
fn prune_wspr(history: &mut VecDeque<(Instant, WsprMessage)>) {
|
||||
let cutoff = Instant::now() - WSPR_HISTORY_RETENTION;
|
||||
while let Some((ts, _)) = history.front() {
|
||||
if *ts < cutoff {
|
||||
@@ -200,21 +209,21 @@ fn prune_wspr_history(history: &mut VecDeque<(Instant, WsprMessage)>) {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn snapshot_wspr_history() -> Vec<WsprMessage> {
|
||||
let mut history = wspr_history().lock().expect("wspr history mutex poisoned");
|
||||
prune_wspr_history(&mut history);
|
||||
history.iter().map(|(_, msg)| msg.clone()).collect()
|
||||
pub fn record_wspr_message(&self, msg: WsprMessage) {
|
||||
let mut h = self.wspr.lock().expect("wspr history mutex poisoned");
|
||||
h.push_back((Instant::now(), msg));
|
||||
Self::prune_wspr(&mut h);
|
||||
}
|
||||
|
||||
pub fn clear_wspr_history() {
|
||||
let mut history = wspr_history().lock().expect("wspr history mutex poisoned");
|
||||
history.clear();
|
||||
pub fn snapshot_wspr_history(&self) -> Vec<WsprMessage> {
|
||||
let mut h = self.wspr.lock().expect("wspr history mutex poisoned");
|
||||
Self::prune_wspr(&mut h);
|
||||
h.iter().map(|(_, msg)| msg.clone()).collect()
|
||||
}
|
||||
|
||||
pub fn record_wspr_message(msg: WsprMessage) {
|
||||
let mut history = wspr_history().lock().expect("wspr history mutex poisoned");
|
||||
history.push_back((Instant::now(), msg));
|
||||
prune_wspr_history(&mut history);
|
||||
pub fn clear_wspr_history(&self) {
|
||||
self.wspr.lock().expect("wspr history mutex poisoned").clear();
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn the audio capture thread.
|
||||
@@ -665,6 +674,7 @@ pub async fn run_aprs_decoder(
|
||||
mut state_rx: watch::Receiver<RigState>,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
decode_logs: Option<Arc<DecoderLoggers>>,
|
||||
histories: Arc<DecoderHistories>,
|
||||
) {
|
||||
info!("APRS decoder started ({}Hz, {} ch)", sample_rate, channels);
|
||||
let mut decoder = AprsDecoder::new(sample_rate);
|
||||
@@ -717,7 +727,7 @@ pub async fn run_aprs_decoder(
|
||||
|
||||
was_active = true;
|
||||
for pkt in decoder.process_samples(&mono) {
|
||||
record_aprs_packet(pkt.clone());
|
||||
histories.record_aprs_packet(pkt.clone());
|
||||
if let Some(logger) = decode_logs.as_ref() {
|
||||
logger.log_aprs(&pkt);
|
||||
}
|
||||
@@ -936,6 +946,7 @@ pub async fn run_ft8_decoder(
|
||||
mut state_rx: watch::Receiver<RigState>,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
decode_logs: Option<Arc<DecoderLoggers>>,
|
||||
histories: Arc<DecoderHistories>,
|
||||
) {
|
||||
info!("FT8 decoder started ({}Hz, {} ch)", sample_rate, channels);
|
||||
let mut decoder = match Ft8Decoder::new(FT8_SAMPLE_RATE) {
|
||||
@@ -1020,7 +1031,7 @@ pub async fn run_ft8_decoder(
|
||||
freq_hz: res.freq_hz,
|
||||
message: res.text,
|
||||
};
|
||||
record_ft8_message(msg.clone());
|
||||
histories.record_ft8_message(msg.clone());
|
||||
if let Some(logger) = decode_logs.as_ref() {
|
||||
logger.log_ft8(&msg);
|
||||
}
|
||||
@@ -1072,6 +1083,7 @@ pub async fn run_wspr_decoder(
|
||||
mut state_rx: watch::Receiver<RigState>,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
decode_logs: Option<Arc<DecoderLoggers>>,
|
||||
histories: Arc<DecoderHistories>,
|
||||
) {
|
||||
info!("WSPR decoder started ({}Hz, {} ch)", sample_rate, channels);
|
||||
let decoder = match WsprDecoder::new() {
|
||||
@@ -1136,7 +1148,7 @@ pub async fn run_wspr_decoder(
|
||||
freq_hz: res.freq_hz,
|
||||
message: res.message,
|
||||
};
|
||||
record_wspr_message(msg.clone());
|
||||
histories.record_wspr_message(msg.clone());
|
||||
if let Some(logger) = decode_logs.as_ref() {
|
||||
logger.log_wspr(&msg);
|
||||
}
|
||||
@@ -1209,6 +1221,7 @@ pub async fn run_audio_listener(
|
||||
stream_info: AudioStreamInfo,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
mut shutdown_rx: watch::Receiver<bool>,
|
||||
histories: Arc<DecoderHistories>,
|
||||
) -> std::io::Result<()> {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
info!("Audio listener on {}", addr);
|
||||
@@ -1224,9 +1237,10 @@ pub async fn run_audio_listener(
|
||||
let info = stream_info.clone();
|
||||
let decode_tx = decode_tx.clone();
|
||||
let client_shutdown_rx = shutdown_rx.clone();
|
||||
let client_histories = histories.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_audio_client(socket, peer, rx_audio, tx_audio, info, decode_tx, client_shutdown_rx).await {
|
||||
if let Err(e) = handle_audio_client(socket, peer, rx_audio, tx_audio, info, decode_tx, client_shutdown_rx, client_histories).await {
|
||||
warn!("Audio client {} error: {:?}", peer, e);
|
||||
}
|
||||
info!("Audio client {} disconnected", peer);
|
||||
@@ -1255,6 +1269,7 @@ async fn handle_audio_client(
|
||||
stream_info: AudioStreamInfo,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
mut shutdown_rx: watch::Receiver<bool>,
|
||||
histories: Arc<DecoderHistories>,
|
||||
) -> std::io::Result<()> {
|
||||
let (reader, writer) = socket.into_split();
|
||||
let mut reader = tokio::io::BufReader::new(reader);
|
||||
@@ -1265,7 +1280,7 @@ async fn handle_audio_client(
|
||||
write_audio_msg(&mut writer, AUDIO_MSG_STREAM_INFO, &info_json).await?;
|
||||
|
||||
// Send APRS history to newly connected client.
|
||||
let history = snapshot_aprs_history();
|
||||
let history = histories.snapshot_aprs_history();
|
||||
for pkt in history {
|
||||
let msg = DecodedMessage::Aprs(pkt);
|
||||
let msg_type = AUDIO_MSG_APRS_DECODE;
|
||||
@@ -1274,7 +1289,7 @@ async fn handle_audio_client(
|
||||
}
|
||||
}
|
||||
// Send FT8 history to newly connected client.
|
||||
let history = snapshot_ft8_history();
|
||||
let history = histories.snapshot_ft8_history();
|
||||
for msg in history {
|
||||
let msg = DecodedMessage::Ft8(msg);
|
||||
let msg_type = AUDIO_MSG_FT8_DECODE;
|
||||
@@ -1283,7 +1298,7 @@ async fn handle_audio_client(
|
||||
}
|
||||
}
|
||||
// Send WSPR history to newly connected client.
|
||||
let history = snapshot_wspr_history();
|
||||
let history = histories.snapshot_wspr_history();
|
||||
for msg in history {
|
||||
let msg = DecodedMessage::Wspr(msg);
|
||||
let msg_type = AUDIO_MSG_WSPR_DECODE;
|
||||
|
||||
@@ -20,28 +20,74 @@ pub use trx_decode_log::DecodeLogsConfig;
|
||||
|
||||
use trx_core::rig::state::RigMode;
|
||||
|
||||
/// Per-rig instance configuration for multi-rig setups.
|
||||
///
|
||||
/// Each entry in `[[rigs]]` becomes one of these. The flat top-level
|
||||
/// `[rig]` / `[audio]` / `[sdr]` / `[pskreporter]` / `[aprsfi]` /
|
||||
/// `[behavior]` / `[decode_logs]` fields are still supported via
|
||||
/// `ServerConfig::resolved_rigs()` which synthesises a single-element list
|
||||
/// with `id = "default"` when `rigs` is empty.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct RigInstanceConfig {
|
||||
/// Stable rig identifier used in protocol routing.
|
||||
pub id: String,
|
||||
/// Rig backend configuration.
|
||||
pub rig: RigConfig,
|
||||
/// Polling and retry behavior.
|
||||
pub behavior: BehaviorConfig,
|
||||
/// Audio streaming configuration for this rig.
|
||||
pub audio: AudioConfig,
|
||||
/// SDR pipeline configuration (only used when [rigs.rig.access] type = "sdr").
|
||||
pub sdr: SdrConfig,
|
||||
/// PSK Reporter uplink for this rig.
|
||||
pub pskreporter: PskReporterConfig,
|
||||
/// APRS-IS IGate uplink for this rig.
|
||||
pub aprsfi: AprsFiConfig,
|
||||
/// Decoder file logging for this rig.
|
||||
pub decode_logs: DecodeLogsConfig,
|
||||
}
|
||||
|
||||
impl Default for RigInstanceConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
id: "default".to_string(),
|
||||
rig: RigConfig::default(),
|
||||
behavior: BehaviorConfig::default(),
|
||||
audio: AudioConfig::default(),
|
||||
sdr: SdrConfig::default(),
|
||||
pskreporter: PskReporterConfig::default(),
|
||||
aprsfi: AprsFiConfig::default(),
|
||||
decode_logs: DecodeLogsConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Top-level server configuration structure.
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct ServerConfig {
|
||||
/// General settings
|
||||
pub general: GeneralConfig,
|
||||
/// Rig backend configuration
|
||||
/// Rig backend configuration (legacy flat; use [[rigs]] for multi-rig)
|
||||
pub rig: RigConfig,
|
||||
/// Polling and retry behavior
|
||||
/// Polling and retry behavior (legacy flat)
|
||||
pub behavior: BehaviorConfig,
|
||||
/// TCP listener configuration
|
||||
pub listen: ListenConfig,
|
||||
/// Audio streaming configuration
|
||||
/// Audio streaming configuration (legacy flat)
|
||||
pub audio: AudioConfig,
|
||||
/// PSK Reporter uplink configuration
|
||||
/// PSK Reporter uplink configuration (legacy flat)
|
||||
pub pskreporter: PskReporterConfig,
|
||||
/// APRS-IS IGate uplink configuration
|
||||
/// APRS-IS IGate uplink configuration (legacy flat)
|
||||
pub aprsfi: AprsFiConfig,
|
||||
/// Decoder file logging configuration
|
||||
/// Decoder file logging configuration (legacy flat)
|
||||
pub decode_logs: DecodeLogsConfig,
|
||||
/// SDR pipeline configuration (used when [rig.access] type = "sdr").
|
||||
/// SDR pipeline configuration (legacy flat; used when [rig.access] type = "sdr").
|
||||
pub sdr: SdrConfig,
|
||||
/// Multi-rig instance list. When non-empty, takes priority over the flat fields.
|
||||
#[serde(rename = "rigs", default)]
|
||||
pub rigs: Vec<RigInstanceConfig>,
|
||||
}
|
||||
|
||||
/// General application settings.
|
||||
@@ -441,6 +487,33 @@ impl ServerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
// Multi-rig uniqueness checks.
|
||||
if !self.rigs.is_empty() {
|
||||
let mut seen_ids: std::collections::HashSet<String> =
|
||||
std::collections::HashSet::new();
|
||||
let mut seen_ports: std::collections::HashSet<u16> =
|
||||
std::collections::HashSet::new();
|
||||
for rig in &self.rigs {
|
||||
if rig.id.trim().is_empty() {
|
||||
return Err("[[rigs]] entry has an empty id".to_string());
|
||||
}
|
||||
if !seen_ids.insert(rig.id.clone()) {
|
||||
return Err(format!(
|
||||
"[[rigs]] duplicate rig id: \"{}\"",
|
||||
rig.id
|
||||
));
|
||||
}
|
||||
if rig.audio.enabled {
|
||||
if !seen_ports.insert(rig.audio.port) {
|
||||
return Err(format!(
|
||||
"[[rigs]] duplicate audio port {} (rig id: \"{}\")",
|
||||
rig.audio.port, rig.id
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if self.decode_logs.enabled {
|
||||
if self.decode_logs.dir.trim().is_empty() {
|
||||
return Err("[decode_logs].dir must not be empty when enabled".to_string());
|
||||
@@ -535,6 +608,27 @@ impl ServerConfig {
|
||||
<Self as ConfigFile>::load_from_default_paths()
|
||||
}
|
||||
|
||||
/// Return the effective list of rig instances to spawn.
|
||||
///
|
||||
/// When `[[rigs]]` entries are present they are returned as-is.
|
||||
/// Otherwise the legacy flat `[rig]` / `[audio]` / … fields are synthesised
|
||||
/// into a single `RigInstanceConfig` with `id = "default"`.
|
||||
pub fn resolved_rigs(&self) -> Vec<RigInstanceConfig> {
|
||||
if !self.rigs.is_empty() {
|
||||
return self.rigs.clone();
|
||||
}
|
||||
vec![RigInstanceConfig {
|
||||
id: "default".to_string(),
|
||||
rig: self.rig.clone(),
|
||||
behavior: self.behavior.clone(),
|
||||
audio: self.audio.clone(),
|
||||
sdr: self.sdr.clone(),
|
||||
pskreporter: self.pskreporter.clone(),
|
||||
aprsfi: self.aprsfi.clone(),
|
||||
decode_logs: self.decode_logs.clone(),
|
||||
}]
|
||||
}
|
||||
|
||||
/// Generate an example configuration as a TOML string.
|
||||
pub fn example_toml() -> String {
|
||||
let example = ServerConfig {
|
||||
@@ -564,6 +658,7 @@ impl ServerConfig {
|
||||
aprsfi: AprsFiConfig::default(),
|
||||
decode_logs: DecodeLogsConfig::default(),
|
||||
sdr: SdrConfig::default(),
|
||||
rigs: Vec::new(),
|
||||
};
|
||||
|
||||
toml::to_string_pretty(&example).unwrap_or_default()
|
||||
@@ -1018,4 +1113,167 @@ tokens = ["secret123"]
|
||||
errors
|
||||
);
|
||||
}
|
||||
|
||||
// --- MR-08: multi-rig config tests ---
|
||||
|
||||
#[test]
|
||||
fn test_resolved_rigs_legacy_flat_fields() {
|
||||
let mut cfg = ServerConfig::default();
|
||||
cfg.rig.model = Some("ft817".to_string());
|
||||
cfg.rig.access.access_type = Some("serial".to_string());
|
||||
cfg.rig.access.port = Some("/dev/ttyUSB0".to_string());
|
||||
cfg.rig.access.baud = Some(9600);
|
||||
|
||||
let rigs = cfg.resolved_rigs();
|
||||
assert_eq!(rigs.len(), 1);
|
||||
assert_eq!(rigs[0].id, "default");
|
||||
assert_eq!(rigs[0].rig.model, Some("ft817".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_resolved_rigs_multi_rig_toml() {
|
||||
let toml_str = r#"
|
||||
[general]
|
||||
callsign = "W1AW"
|
||||
|
||||
[[rigs]]
|
||||
id = "hf"
|
||||
|
||||
[rigs.rig]
|
||||
model = "ft450d"
|
||||
initial_freq_hz = 14074000
|
||||
|
||||
[rigs.rig.access]
|
||||
type = "serial"
|
||||
port = "/dev/ttyUSB0"
|
||||
baud = 9600
|
||||
|
||||
[rigs.audio]
|
||||
port = 4531
|
||||
|
||||
[[rigs]]
|
||||
id = "sdr"
|
||||
|
||||
[rigs.rig]
|
||||
model = "soapysdr"
|
||||
|
||||
[rigs.rig.access]
|
||||
type = "sdr"
|
||||
args = "driver=rtlsdr"
|
||||
|
||||
[rigs.audio]
|
||||
port = 4532
|
||||
"#;
|
||||
let cfg: ServerConfig = toml::from_str(toml_str).unwrap();
|
||||
let rigs = cfg.resolved_rigs();
|
||||
assert_eq!(rigs.len(), 2);
|
||||
assert_eq!(rigs[0].id, "hf");
|
||||
assert_eq!(rigs[0].rig.model, Some("ft450d".to_string()));
|
||||
assert_eq!(rigs[0].audio.port, 4531);
|
||||
assert_eq!(rigs[1].id, "sdr");
|
||||
assert_eq!(rigs[1].rig.model, Some("soapysdr".to_string()));
|
||||
assert_eq!(rigs[1].audio.port, 4532);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_rejects_duplicate_rig_ids() {
|
||||
let toml_str = r#"
|
||||
[[rigs]]
|
||||
id = "rig1"
|
||||
[rigs.rig]
|
||||
model = "ft817"
|
||||
[rigs.rig.access]
|
||||
type = "serial"
|
||||
port = "/dev/ttyUSB0"
|
||||
baud = 9600
|
||||
[rigs.audio]
|
||||
port = 4531
|
||||
|
||||
[[rigs]]
|
||||
id = "rig1"
|
||||
[rigs.rig]
|
||||
model = "ft450d"
|
||||
[rigs.rig.access]
|
||||
type = "serial"
|
||||
port = "/dev/ttyUSB1"
|
||||
baud = 9600
|
||||
[rigs.audio]
|
||||
port = 4532
|
||||
"#;
|
||||
let cfg: ServerConfig = toml::from_str(toml_str).unwrap();
|
||||
let result = cfg.validate();
|
||||
assert!(result.is_err());
|
||||
assert!(
|
||||
result.unwrap_err().contains("duplicate rig id"),
|
||||
"expected error about duplicate rig id"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_rejects_duplicate_audio_ports() {
|
||||
let toml_str = r#"
|
||||
[[rigs]]
|
||||
id = "rig1"
|
||||
[rigs.rig]
|
||||
model = "ft817"
|
||||
[rigs.rig.access]
|
||||
type = "serial"
|
||||
port = "/dev/ttyUSB0"
|
||||
baud = 9600
|
||||
[rigs.audio]
|
||||
port = 4531
|
||||
|
||||
[[rigs]]
|
||||
id = "rig2"
|
||||
[rigs.rig]
|
||||
model = "ft450d"
|
||||
[rigs.rig.access]
|
||||
type = "serial"
|
||||
port = "/dev/ttyUSB1"
|
||||
baud = 9600
|
||||
[rigs.audio]
|
||||
port = 4531
|
||||
"#;
|
||||
let cfg: ServerConfig = toml::from_str(toml_str).unwrap();
|
||||
let result = cfg.validate();
|
||||
assert!(result.is_err());
|
||||
assert!(
|
||||
result.unwrap_err().contains("duplicate audio port"),
|
||||
"expected error about duplicate audio port"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_accepts_multi_rig_unique_ids_and_ports() {
|
||||
let toml_str = r#"
|
||||
[[rigs]]
|
||||
id = "hf"
|
||||
[rigs.rig]
|
||||
model = "ft450d"
|
||||
[rigs.rig.access]
|
||||
type = "serial"
|
||||
port = "/dev/ttyUSB0"
|
||||
baud = 9600
|
||||
[rigs.audio]
|
||||
port = 4531
|
||||
|
||||
[[rigs]]
|
||||
id = "sdr"
|
||||
[rigs.rig]
|
||||
model = "soapysdr"
|
||||
[rigs.rig.access]
|
||||
type = "sdr"
|
||||
args = "driver=rtlsdr"
|
||||
[rigs.audio]
|
||||
port = 4532
|
||||
"#;
|
||||
let cfg: ServerConfig = toml::from_str(toml_str).unwrap();
|
||||
// validate() uses the flat [rig] field for rig-level checks; multi-rig
|
||||
// validation focuses on ID/port uniqueness. The flat [rig] is default
|
||||
// (no model), so the access check is skipped when both fields are absent.
|
||||
assert!(
|
||||
cfg.validate().is_ok(),
|
||||
"expected Ok for valid multi-rig config"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
+149
-22
@@ -6,7 +6,11 @@
|
||||
//!
|
||||
//! Accepts client connections speaking the `ClientEnvelope`/`ClientResponse`
|
||||
//! protocol defined in `trx-protocol`.
|
||||
//!
|
||||
//! Multi-rig routing: `ClientEnvelope.rig_id` selects the target rig.
|
||||
//! When absent the first rig in the map is used (backward compat).
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
@@ -14,28 +18,34 @@ use std::time::Duration;
|
||||
|
||||
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::sync::{mpsc, oneshot, watch};
|
||||
use tokio::sync::{oneshot, watch};
|
||||
use tokio::time;
|
||||
use tracing::{error, info};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use trx_core::rig::command::RigCommand;
|
||||
use trx_core::rig::request::RigRequest;
|
||||
use trx_core::rig::state::RigState;
|
||||
use trx_protocol::auth::{SimpleTokenValidator, TokenValidator};
|
||||
use trx_protocol::codec::parse_envelope;
|
||||
use trx_protocol::mapping;
|
||||
use trx_protocol::types::{ClientCommand, RigEntry};
|
||||
use trx_protocol::ClientResponse;
|
||||
|
||||
use crate::rig_handle::RigHandle;
|
||||
|
||||
const IO_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
|
||||
const MAX_JSON_LINE_BYTES: usize = 16 * 1024;
|
||||
|
||||
/// Run the JSON TCP listener, accepting client connections.
|
||||
///
|
||||
/// `rigs` is a shared map from rig_id → `RigHandle`. The first entry (by
|
||||
/// insertion order — deterministic after MR-07 iterates `resolved_rigs()` in
|
||||
/// order) is the default rig for backward-compat clients that omit `rig_id`.
|
||||
pub async fn run_listener(
|
||||
addr: SocketAddr,
|
||||
rig_tx: mpsc::Sender<RigRequest>,
|
||||
rigs: Arc<HashMap<String, RigHandle>>,
|
||||
default_rig_id: String,
|
||||
auth_tokens: HashSet<String>,
|
||||
state_rx: watch::Receiver<RigState>,
|
||||
mut shutdown_rx: watch::Receiver<bool>,
|
||||
) -> std::io::Result<()> {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
@@ -49,12 +59,12 @@ pub async fn run_listener(
|
||||
let (socket, peer) = accept?;
|
||||
info!("Client connected: {}", peer);
|
||||
|
||||
let tx = rig_tx.clone();
|
||||
let srx = state_rx.clone();
|
||||
let rigs = Arc::clone(&rigs);
|
||||
let default_rig_id = default_rig_id.clone();
|
||||
let validator = Arc::clone(&validator);
|
||||
let client_shutdown_rx = shutdown_rx.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_client(socket, peer, tx, validator, srx, client_shutdown_rx).await {
|
||||
if let Err(e) = handle_client(socket, peer, rigs, default_rig_id, validator, client_shutdown_rx).await {
|
||||
error!("Client {} error: {:?}", peer, e);
|
||||
}
|
||||
});
|
||||
@@ -147,9 +157,9 @@ async fn send_response(
|
||||
async fn handle_client(
|
||||
socket: TcpStream,
|
||||
addr: SocketAddr,
|
||||
tx: mpsc::Sender<RigRequest>,
|
||||
rigs: Arc<HashMap<String, RigHandle>>,
|
||||
default_rig_id: String,
|
||||
validator: Arc<SimpleTokenValidator>,
|
||||
state_rx: watch::Receiver<RigState>,
|
||||
mut shutdown_rx: watch::Receiver<bool>,
|
||||
) -> std::io::Result<()> {
|
||||
let (reader, mut writer) = socket.into_split();
|
||||
@@ -196,7 +206,9 @@ async fn handle_client(
|
||||
error!("Invalid JSON from {}: {} / {:?}", addr, trimmed, e);
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
rig_id: None,
|
||||
state: None,
|
||||
rigs: None,
|
||||
error: Some(format!("Invalid JSON: {}", e)),
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
@@ -207,23 +219,74 @@ async fn handle_client(
|
||||
if let Err(err) = validator.as_ref().validate(&envelope.token) {
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
rig_id: None,
|
||||
state: None,
|
||||
rigs: None,
|
||||
error: Some(err),
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Resolve rig_id from the envelope (absent = default).
|
||||
let target_rig_id = envelope
|
||||
.rig_id
|
||||
.as_deref()
|
||||
.unwrap_or(&default_rig_id)
|
||||
.to_string();
|
||||
|
||||
// GetRigs: aggregate all rig states and return without hitting any task.
|
||||
if matches!(envelope.cmd, ClientCommand::GetRigs) {
|
||||
let mut entries: Vec<RigEntry> = Vec::new();
|
||||
for handle in rigs.values() {
|
||||
let state = handle.state_rx.borrow().clone();
|
||||
if let Some(snapshot) = state.snapshot() {
|
||||
entries.push(RigEntry {
|
||||
rig_id: handle.rig_id.clone(),
|
||||
state: snapshot,
|
||||
});
|
||||
}
|
||||
}
|
||||
let resp = ClientResponse {
|
||||
success: true,
|
||||
rig_id: Some("server".to_string()),
|
||||
state: None,
|
||||
rigs: Some(entries),
|
||||
error: None,
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Look up the target rig handle.
|
||||
let handle = match rigs.get(&target_rig_id) {
|
||||
Some(h) => h,
|
||||
None => {
|
||||
warn!("Unknown rig_id '{}' from {}", target_rig_id, addr);
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
rig_id: Some(target_rig_id.clone()),
|
||||
state: None,
|
||||
rigs: None,
|
||||
error: Some(format!("Unknown rig_id: {}", target_rig_id)),
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let rig_cmd = mapping::client_command_to_rig(envelope.cmd);
|
||||
|
||||
// Fast path: serve GetSnapshot directly from the watch channel
|
||||
// so clients get a response even while the rig task is initializing.
|
||||
if matches!(rig_cmd, RigCommand::GetSnapshot) {
|
||||
let state = state_rx.borrow().clone();
|
||||
let state = handle.state_rx.borrow().clone();
|
||||
if let Some(snapshot) = state.snapshot() {
|
||||
let resp = ClientResponse {
|
||||
success: true,
|
||||
rig_id: Some(target_rig_id.clone()),
|
||||
state: Some(snapshot),
|
||||
rigs: None,
|
||||
error: None,
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
@@ -237,13 +300,15 @@ async fn handle_client(
|
||||
respond_to: resp_tx,
|
||||
};
|
||||
|
||||
match time::timeout(IO_TIMEOUT, tx.send(req)).await {
|
||||
match time::timeout(IO_TIMEOUT, handle.rig_tx.send(req)).await {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(e)) => {
|
||||
error!("Failed to send request to rig_task: {:?}", e);
|
||||
error!("Failed to send request to rig_task for '{}': {:?}", target_rig_id, e);
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
rig_id: Some(target_rig_id.clone()),
|
||||
state: None,
|
||||
rigs: None,
|
||||
error: Some("Internal error: rig task not available".into()),
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
@@ -252,7 +317,9 @@ async fn handle_client(
|
||||
Err(_) => {
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
rig_id: Some(target_rig_id.clone()),
|
||||
state: None,
|
||||
rigs: None,
|
||||
error: Some("Internal error: request queue timeout".into()),
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
@@ -267,7 +334,9 @@ async fn handle_client(
|
||||
Err(_) => {
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
rig_id: Some(target_rig_id.clone()),
|
||||
state: None,
|
||||
rigs: None,
|
||||
error: Some("Request timed out waiting for rig response".into()),
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
@@ -289,7 +358,9 @@ async fn handle_client(
|
||||
Ok(Ok(snapshot)) => {
|
||||
let resp = ClientResponse {
|
||||
success: true,
|
||||
rig_id: Some(target_rig_id.clone()),
|
||||
state: Some(snapshot),
|
||||
rigs: None,
|
||||
error: None,
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
@@ -297,7 +368,9 @@ async fn handle_client(
|
||||
Ok(Err(err)) => {
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
rig_id: Some(target_rig_id.clone()),
|
||||
state: None,
|
||||
rigs: None,
|
||||
error: Some(err.message),
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
@@ -306,7 +379,9 @@ async fn handle_client(
|
||||
error!("Rig response oneshot recv error: {:?}", e);
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
rig_id: Some(target_rig_id.clone()),
|
||||
state: None,
|
||||
rigs: None,
|
||||
error: Some("Internal error waiting for rig response".into()),
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
@@ -325,9 +400,12 @@ mod tests {
|
||||
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::{mpsc, watch};
|
||||
|
||||
use trx_core::radio::freq::Band;
|
||||
use trx_core::rig::request::RigRequest;
|
||||
use trx_core::rig::{RigAccessMethod, RigCapabilities, RigInfo};
|
||||
use trx_core::rig::state::RigState;
|
||||
|
||||
fn loopback_addr() -> SocketAddr {
|
||||
let listener = std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).expect("bind");
|
||||
@@ -367,18 +445,30 @@ mod tests {
|
||||
state
|
||||
}
|
||||
|
||||
fn make_rigs(state: RigState) -> (Arc<HashMap<String, RigHandle>>, String) {
|
||||
let (rig_tx, _rig_rx) = mpsc::channel::<RigRequest>(8);
|
||||
let (state_tx, state_rx) = watch::channel(state);
|
||||
let _state_tx = state_tx;
|
||||
let handle = RigHandle {
|
||||
rig_id: "default".to_string(),
|
||||
rig_tx,
|
||||
state_rx,
|
||||
};
|
||||
let mut map = HashMap::new();
|
||||
map.insert("default".to_string(), handle);
|
||||
(Arc::new(map), "default".to_string())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore = "requires TCP bind permissions"]
|
||||
async fn listener_rejects_missing_token() {
|
||||
let addr = loopback_addr();
|
||||
let (rig_tx, _rig_rx) = mpsc::channel::<RigRequest>(8);
|
||||
let (state_tx, state_rx) = watch::channel(sample_state());
|
||||
let _state_tx = state_tx;
|
||||
let (rigs, default_id) = make_rigs(sample_state());
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel(false);
|
||||
|
||||
let mut auth = HashSet::new();
|
||||
auth.insert("secret".to_string());
|
||||
let handle = tokio::spawn(run_listener(addr, rig_tx, auth, state_rx, shutdown_rx));
|
||||
let handle = tokio::spawn(run_listener(addr, rigs, default_id, auth, shutdown_rx));
|
||||
|
||||
let stream = TcpStream::connect(addr).await.expect("connect");
|
||||
let (reader, mut writer) = stream.into_split();
|
||||
@@ -406,16 +496,14 @@ mod tests {
|
||||
#[ignore = "requires TCP bind permissions"]
|
||||
async fn listener_serves_get_state_snapshot() {
|
||||
let addr = loopback_addr();
|
||||
let (rig_tx, _rig_rx) = mpsc::channel::<RigRequest>(8);
|
||||
let (state_tx, state_rx) = watch::channel(sample_state());
|
||||
let _state_tx = state_tx;
|
||||
let (rigs, default_id) = make_rigs(sample_state());
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel(false);
|
||||
|
||||
let handle = tokio::spawn(run_listener(
|
||||
addr,
|
||||
rig_tx,
|
||||
rigs,
|
||||
default_id,
|
||||
HashSet::new(),
|
||||
state_rx,
|
||||
shutdown_rx,
|
||||
));
|
||||
|
||||
@@ -437,6 +525,45 @@ mod tests {
|
||||
let snapshot = resp.state.expect("snapshot");
|
||||
assert_eq!(snapshot.info.model, "Dummy");
|
||||
assert_eq!(snapshot.status.freq.hz, 144_300_000);
|
||||
// rig_id should be set in the response
|
||||
assert_eq!(resp.rig_id.as_deref(), Some("default"));
|
||||
|
||||
let _ = shutdown_tx.send(true);
|
||||
handle.abort();
|
||||
let _ = handle.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore = "requires TCP bind permissions"]
|
||||
async fn listener_routes_unknown_rig_id() {
|
||||
let addr = loopback_addr();
|
||||
let (rigs, default_id) = make_rigs(sample_state());
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel(false);
|
||||
|
||||
let handle = tokio::spawn(run_listener(
|
||||
addr,
|
||||
rigs,
|
||||
default_id,
|
||||
HashSet::new(),
|
||||
shutdown_rx,
|
||||
));
|
||||
|
||||
let stream = TcpStream::connect(addr).await.expect("connect");
|
||||
let (reader, mut writer) = stream.into_split();
|
||||
let mut reader = BufReader::new(reader);
|
||||
|
||||
writer
|
||||
.write_all(br#"{"rig_id":"nonexistent","cmd":"get_state"}"#)
|
||||
.await
|
||||
.expect("write");
|
||||
writer.write_all(b"\n").await.expect("newline");
|
||||
writer.flush().await.expect("flush");
|
||||
|
||||
let mut line = String::new();
|
||||
reader.read_line(&mut line).await.expect("read");
|
||||
let resp: ClientResponse = serde_json::from_str(line.trim_end()).expect("response json");
|
||||
assert!(!resp.success);
|
||||
assert!(resp.error.as_deref().unwrap_or("").contains("Unknown rig_id"));
|
||||
|
||||
let _ = shutdown_tx.send(true);
|
||||
handle.abort();
|
||||
|
||||
+468
-297
@@ -8,12 +8,15 @@ mod config;
|
||||
mod error;
|
||||
mod listener;
|
||||
mod pskreporter;
|
||||
mod rig_handle;
|
||||
mod rig_task;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::path::PathBuf;
|
||||
use std::ptr::NonNull;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::Bytes;
|
||||
@@ -32,7 +35,9 @@ use trx_core::rig::request::RigRequest;
|
||||
use trx_core::rig::state::RigState;
|
||||
use trx_core::DynResult;
|
||||
|
||||
use config::ServerConfig;
|
||||
use audio::DecoderHistories;
|
||||
use config::{RigInstanceConfig, ServerConfig};
|
||||
use rig_handle::RigHandle;
|
||||
use trx_decode_log::DecoderLoggers;
|
||||
|
||||
#[cfg(feature = "soapysdr")]
|
||||
@@ -101,7 +106,7 @@ fn parse_serial_addr(addr: &str) -> DynResult<(String, u32)> {
|
||||
Ok((path.to_string(), baud))
|
||||
}
|
||||
|
||||
/// Resolved configuration after merging config file and CLI arguments.
|
||||
/// Resolved configuration for the first/only rig (legacy single-rig CLI path).
|
||||
struct ResolvedConfig {
|
||||
rig: String,
|
||||
access: RigAccess,
|
||||
@@ -190,51 +195,35 @@ fn resolve_config(
|
||||
})
|
||||
}
|
||||
|
||||
fn build_rig_task_config(
|
||||
resolved: &ResolvedConfig,
|
||||
cfg: &ServerConfig,
|
||||
registry: std::sync::Arc<RegistrationContext>,
|
||||
) -> rig_task::RigTaskConfig {
|
||||
let pskreporter_status = if cfg.pskreporter.enabled {
|
||||
let has_locator = cfg.pskreporter.receiver_locator.is_some()
|
||||
|| (resolved.latitude.is_some() && resolved.longitude.is_some());
|
||||
if has_locator {
|
||||
Some(format!(
|
||||
"Enabled ({}:{})",
|
||||
cfg.pskreporter.host, cfg.pskreporter.port
|
||||
))
|
||||
} else {
|
||||
Some(format!(
|
||||
"Enabled but inactive (missing locator source) ({}:{})",
|
||||
cfg.pskreporter.host, cfg.pskreporter.port
|
||||
))
|
||||
/// Derive a `RigAccess` from a rig instance config's access fields.
|
||||
fn access_from_rig_instance(rig_cfg: &RigInstanceConfig) -> DynResult<RigAccess> {
|
||||
match rig_cfg.rig.access.access_type.as_deref() {
|
||||
Some("serial") | None => {
|
||||
let path = rig_cfg
|
||||
.rig
|
||||
.access
|
||||
.port
|
||||
.clone()
|
||||
.unwrap_or_else(|| "/dev/ttyUSB0".to_string());
|
||||
let baud = rig_cfg.rig.access.baud.unwrap_or(9600);
|
||||
Ok(RigAccess::Serial { path, baud })
|
||||
}
|
||||
} else {
|
||||
Some("Disabled".to_string())
|
||||
};
|
||||
|
||||
rig_task::RigTaskConfig {
|
||||
registry,
|
||||
rig_model: resolved.rig.clone(),
|
||||
access: resolved.access.clone(),
|
||||
polling: AdaptivePolling::new(
|
||||
Duration::from_millis(cfg.behavior.poll_interval_ms),
|
||||
Duration::from_millis(cfg.behavior.poll_interval_tx_ms),
|
||||
),
|
||||
retry: ExponentialBackoff::new(
|
||||
cfg.behavior.max_retries.max(1),
|
||||
Duration::from_millis(cfg.behavior.retry_base_delay_ms),
|
||||
Duration::from_secs(RETRY_MAX_DELAY_SECS),
|
||||
),
|
||||
initial_freq_hz: cfg.rig.initial_freq_hz,
|
||||
initial_mode: cfg.rig.initial_mode.clone(),
|
||||
server_callsign: resolved.callsign.clone(),
|
||||
server_version: Some(env!("CARGO_PKG_VERSION").to_string()),
|
||||
server_build_date: Some(env!("TRX_SERVER_BUILD_DATE").to_string()),
|
||||
server_latitude: resolved.latitude,
|
||||
server_longitude: resolved.longitude,
|
||||
pskreporter_status,
|
||||
prebuilt_rig: None,
|
||||
Some("tcp") => {
|
||||
let host = rig_cfg.rig.access.host.clone().unwrap_or_default();
|
||||
let port = rig_cfg.rig.access.tcp_port.unwrap_or(0);
|
||||
Ok(RigAccess::Tcp {
|
||||
addr: format!("{}:{}", host, port),
|
||||
})
|
||||
}
|
||||
Some("sdr") => {
|
||||
let args = rig_cfg.rig.access.args.clone().unwrap_or_default();
|
||||
Ok(RigAccess::Sdr { args })
|
||||
}
|
||||
Some(other) => Err(format!(
|
||||
"Unknown access type '{}' for rig '{}'",
|
||||
other, rig_cfg.id
|
||||
)
|
||||
.into()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -271,13 +260,10 @@ fn parse_rig_mode(
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a `SoapySdrRig` with full channel config from `ServerConfig` and
|
||||
/// return both the rig box and a PCM receiver subscribed to its primary channel.
|
||||
///
|
||||
/// Only compiled when the `soapysdr` feature is enabled.
|
||||
/// Build a `SoapySdrRig` with full channel config from a `RigInstanceConfig`.
|
||||
#[cfg(feature = "soapysdr")]
|
||||
fn build_sdr_rig(
|
||||
cfg: &ServerConfig,
|
||||
fn build_sdr_rig_from_instance(
|
||||
rig_cfg: &RigInstanceConfig,
|
||||
) -> DynResult<(
|
||||
Box<dyn trx_core::rig::RigCat>,
|
||||
tokio::sync::broadcast::Receiver<Vec<f32>>,
|
||||
@@ -285,14 +271,14 @@ fn build_sdr_rig(
|
||||
use trx_core::radio::freq::Freq;
|
||||
use trx_core::rig::AudioSource;
|
||||
|
||||
let args = cfg.rig.access.args.as_deref().unwrap_or("");
|
||||
let channels: Vec<(f64, trx_core::rig::state::RigMode, u32, usize)> = cfg
|
||||
let args = rig_cfg.rig.access.args.as_deref().unwrap_or("");
|
||||
let channels: Vec<(f64, trx_core::rig::state::RigMode, u32, usize)> = rig_cfg
|
||||
.sdr
|
||||
.channels
|
||||
.iter()
|
||||
.map(|ch| {
|
||||
let if_hz = (cfg.sdr.center_offset_hz + ch.offset_hz) as f64;
|
||||
let mode = parse_rig_mode(&ch.mode, &cfg.rig.initial_mode);
|
||||
let if_hz = (rig_cfg.sdr.center_offset_hz + ch.offset_hz) as f64;
|
||||
let mode = parse_rig_mode(&ch.mode, &rig_cfg.rig.initial_mode);
|
||||
(if_hz, mode, ch.audio_bandwidth_hz, ch.fir_taps)
|
||||
})
|
||||
.collect();
|
||||
@@ -300,28 +286,290 @@ fn build_sdr_rig(
|
||||
let sdr_rig = trx_backend_soapysdr::SoapySdrRig::new_with_config(
|
||||
args,
|
||||
&channels,
|
||||
&cfg.sdr.gain.mode,
|
||||
cfg.sdr.gain.value,
|
||||
cfg.audio.sample_rate,
|
||||
cfg.audio.frame_duration_ms,
|
||||
&rig_cfg.sdr.gain.mode,
|
||||
rig_cfg.sdr.gain.value,
|
||||
rig_cfg.audio.sample_rate,
|
||||
rig_cfg.audio.frame_duration_ms,
|
||||
Freq {
|
||||
hz: cfg.rig.initial_freq_hz,
|
||||
hz: rig_cfg.rig.initial_freq_hz,
|
||||
},
|
||||
cfg.rig.initial_mode.clone(),
|
||||
cfg.sdr.sample_rate,
|
||||
rig_cfg.rig.initial_mode.clone(),
|
||||
rig_cfg.sdr.sample_rate,
|
||||
)?;
|
||||
|
||||
// Subscribe to the primary channel's PCM broadcast before consuming the rig.
|
||||
let pcm_rx = sdr_rig.subscribe_pcm();
|
||||
Ok((Box::new(sdr_rig) as Box<dyn trx_core::rig::RigCat>, pcm_rx))
|
||||
}
|
||||
|
||||
/// Build a `RigTaskConfig` for a single rig instance.
|
||||
fn build_rig_task_config(
|
||||
rig_cfg: &RigInstanceConfig,
|
||||
rig_model: String,
|
||||
access: RigAccess,
|
||||
callsign: Option<String>,
|
||||
latitude: Option<f64>,
|
||||
longitude: Option<f64>,
|
||||
registry: Arc<RegistrationContext>,
|
||||
histories: Arc<DecoderHistories>,
|
||||
) -> rig_task::RigTaskConfig {
|
||||
let pskreporter_status = if rig_cfg.pskreporter.enabled {
|
||||
let has_locator = rig_cfg.pskreporter.receiver_locator.is_some()
|
||||
|| (latitude.is_some() && longitude.is_some());
|
||||
if has_locator {
|
||||
Some(format!(
|
||||
"Enabled ({}:{})",
|
||||
rig_cfg.pskreporter.host, rig_cfg.pskreporter.port
|
||||
))
|
||||
} else {
|
||||
Some(format!(
|
||||
"Enabled but inactive (missing locator source) ({}:{})",
|
||||
rig_cfg.pskreporter.host, rig_cfg.pskreporter.port
|
||||
))
|
||||
}
|
||||
} else {
|
||||
Some("Disabled".to_string())
|
||||
};
|
||||
|
||||
rig_task::RigTaskConfig {
|
||||
registry,
|
||||
rig_id: rig_cfg.id.clone(),
|
||||
rig_model,
|
||||
access,
|
||||
polling: AdaptivePolling::new(
|
||||
Duration::from_millis(rig_cfg.behavior.poll_interval_ms),
|
||||
Duration::from_millis(rig_cfg.behavior.poll_interval_tx_ms),
|
||||
),
|
||||
retry: ExponentialBackoff::new(
|
||||
rig_cfg.behavior.max_retries.max(1),
|
||||
Duration::from_millis(rig_cfg.behavior.retry_base_delay_ms),
|
||||
Duration::from_secs(RETRY_MAX_DELAY_SECS),
|
||||
),
|
||||
initial_freq_hz: rig_cfg.rig.initial_freq_hz,
|
||||
initial_mode: rig_cfg.rig.initial_mode.clone(),
|
||||
server_callsign: callsign,
|
||||
server_version: Some(env!("CARGO_PKG_VERSION").to_string()),
|
||||
server_build_date: Some(env!("TRX_SERVER_BUILD_DATE").to_string()),
|
||||
server_latitude: latitude,
|
||||
server_longitude: longitude,
|
||||
pskreporter_status,
|
||||
histories,
|
||||
prebuilt_rig: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn all audio-related tasks for one rig instance.
|
||||
///
|
||||
/// `sdr_pcm_rx` carries a live SDR PCM receiver when the rig uses the
|
||||
/// SoapySDR backend; `None` selects the cpal capture path.
|
||||
fn spawn_rig_audio_stack(
|
||||
rig_cfg: &RigInstanceConfig,
|
||||
state_rx: watch::Receiver<RigState>,
|
||||
shutdown_rx: &watch::Receiver<bool>,
|
||||
histories: Arc<DecoderHistories>,
|
||||
callsign: Option<String>,
|
||||
latitude: Option<f64>,
|
||||
longitude: Option<f64>,
|
||||
listen_override: Option<IpAddr>,
|
||||
sdr_pcm_rx: Option<broadcast::Receiver<Vec<f32>>>,
|
||||
) -> Vec<JoinHandle<()>> {
|
||||
let mut handles: Vec<JoinHandle<()>> = Vec::new();
|
||||
|
||||
if !rig_cfg.audio.enabled {
|
||||
return handles;
|
||||
}
|
||||
|
||||
let audio_listen = SocketAddr::from((
|
||||
listen_override.unwrap_or(rig_cfg.audio.listen),
|
||||
rig_cfg.audio.port,
|
||||
));
|
||||
let stream_info = AudioStreamInfo {
|
||||
sample_rate: rig_cfg.audio.sample_rate,
|
||||
channels: rig_cfg.audio.channels,
|
||||
frame_duration_ms: rig_cfg.audio.frame_duration_ms,
|
||||
};
|
||||
|
||||
let (rx_audio_tx, _) = broadcast::channel::<Bytes>(256);
|
||||
let (tx_audio_tx, tx_audio_rx) = mpsc::channel::<Bytes>(64);
|
||||
|
||||
// PCM tap for server-side decoders
|
||||
let (pcm_tx, _) = broadcast::channel::<Vec<f32>>(64);
|
||||
// Decoded messages broadcast
|
||||
let (decode_tx, _) = broadcast::channel::<trx_core::decode::DecodedMessage>(256);
|
||||
|
||||
if rig_cfg.pskreporter.enabled {
|
||||
let cs = callsign.clone().unwrap_or_default();
|
||||
if cs.trim().is_empty() {
|
||||
warn!(
|
||||
"[{}] PSK Reporter enabled but [general].callsign is empty; uplink disabled",
|
||||
rig_cfg.id
|
||||
);
|
||||
} else {
|
||||
let pr_cfg = rig_cfg.pskreporter.clone();
|
||||
let pr_state_rx = state_rx.clone();
|
||||
let pr_decode_rx = decode_tx.subscribe();
|
||||
let pr_shutdown_rx = shutdown_rx.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = pskreporter::run_pskreporter_uplink(
|
||||
pr_cfg,
|
||||
cs,
|
||||
latitude,
|
||||
longitude,
|
||||
pr_state_rx,
|
||||
pr_decode_rx
|
||||
) => {}
|
||||
_ = wait_for_shutdown(pr_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
if rig_cfg.aprsfi.enabled {
|
||||
let cs = callsign.clone().unwrap_or_default();
|
||||
if cs.trim().is_empty() {
|
||||
warn!(
|
||||
"[{}] APRS-IS IGate enabled but [general].callsign is empty; uplink disabled",
|
||||
rig_cfg.id
|
||||
);
|
||||
} else {
|
||||
let ai_cfg = rig_cfg.aprsfi.clone();
|
||||
let ai_decode_rx = decode_tx.subscribe();
|
||||
let ai_shutdown_rx = shutdown_rx.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = aprsfi::run_aprsfi_uplink(ai_cfg, cs, ai_decode_rx) => {}
|
||||
_ = wait_for_shutdown(ai_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
let decoder_logs = match DecoderLoggers::from_config(&rig_cfg.decode_logs) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!("[{}] Decoder file logging disabled: {}", rig_cfg.id, e);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
if rig_cfg.audio.rx_enabled {
|
||||
if let Some(mut sdr_rx) = sdr_pcm_rx {
|
||||
// SDR path: the backend pipeline provides demodulated PCM.
|
||||
info!("[{}] using SDR audio source — cpal capture disabled", rig_cfg.id);
|
||||
let pcm_tx_clone = pcm_tx.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
loop {
|
||||
match sdr_rx.recv().await {
|
||||
Ok(frame) => {
|
||||
let _ = pcm_tx_clone.send(frame);
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
tracing::warn!("SDR audio bridge: dropped {} frames", n);
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
}));
|
||||
} else {
|
||||
// cpal path (serial/TCP transceivers)
|
||||
let _capture_thread = audio::spawn_audio_capture(
|
||||
&rig_cfg.audio,
|
||||
rx_audio_tx.clone(),
|
||||
Some(pcm_tx.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
// Spawn APRS decoder task
|
||||
let aprs_pcm_rx = pcm_tx.subscribe();
|
||||
let aprs_state_rx = state_rx.clone();
|
||||
let aprs_decode_tx = decode_tx.clone();
|
||||
let aprs_sr = rig_cfg.audio.sample_rate;
|
||||
let aprs_ch = rig_cfg.audio.channels;
|
||||
let aprs_shutdown_rx = shutdown_rx.clone();
|
||||
let aprs_logs = decoder_logs.clone();
|
||||
let aprs_histories = histories.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_aprs_decoder(aprs_sr, aprs_ch as u16, aprs_pcm_rx, aprs_state_rx, aprs_decode_tx, aprs_logs, aprs_histories) => {}
|
||||
_ = wait_for_shutdown(aprs_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
|
||||
// Spawn CW decoder task (no histories needed — CW has no persistent history)
|
||||
let cw_pcm_rx = pcm_tx.subscribe();
|
||||
let cw_state_rx = state_rx.clone();
|
||||
let cw_decode_tx = decode_tx.clone();
|
||||
let cw_sr = rig_cfg.audio.sample_rate;
|
||||
let cw_ch = rig_cfg.audio.channels;
|
||||
let cw_shutdown_rx = shutdown_rx.clone();
|
||||
let cw_logs = decoder_logs.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_cw_decoder(cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx, cw_logs) => {}
|
||||
_ = wait_for_shutdown(cw_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
|
||||
// Spawn FT8 decoder task
|
||||
let ft8_pcm_rx = pcm_tx.subscribe();
|
||||
let ft8_state_rx = state_rx.clone();
|
||||
let ft8_decode_tx = decode_tx.clone();
|
||||
let ft8_sr = rig_cfg.audio.sample_rate;
|
||||
let ft8_ch = rig_cfg.audio.channels;
|
||||
let ft8_shutdown_rx = shutdown_rx.clone();
|
||||
let ft8_logs = decoder_logs.clone();
|
||||
let ft8_histories = histories.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_ft8_decoder(ft8_sr, ft8_ch as u16, ft8_pcm_rx, ft8_state_rx, ft8_decode_tx, ft8_logs, ft8_histories) => {}
|
||||
_ = wait_for_shutdown(ft8_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
|
||||
// Spawn WSPR decoder task
|
||||
let wspr_pcm_rx = pcm_tx.subscribe();
|
||||
let wspr_state_rx = state_rx.clone();
|
||||
let wspr_decode_tx = decode_tx.clone();
|
||||
let wspr_sr = rig_cfg.audio.sample_rate;
|
||||
let wspr_ch = rig_cfg.audio.channels;
|
||||
let wspr_shutdown_rx = shutdown_rx.clone();
|
||||
let wspr_logs = decoder_logs.clone();
|
||||
let wspr_histories = histories.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_wspr_decoder(wspr_sr, wspr_ch as u16, wspr_pcm_rx, wspr_state_rx, wspr_decode_tx, wspr_logs, wspr_histories) => {}
|
||||
_ = wait_for_shutdown(wspr_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
if rig_cfg.audio.tx_enabled {
|
||||
let _playback_thread = audio::spawn_audio_playback(&rig_cfg.audio, tx_audio_rx);
|
||||
}
|
||||
|
||||
let audio_shutdown_rx = shutdown_rx.clone();
|
||||
let audio_histories = histories;
|
||||
handles.push(tokio::spawn(async move {
|
||||
if let Err(e) = audio::run_audio_listener(
|
||||
audio_listen,
|
||||
rx_audio_tx,
|
||||
tx_audio_tx,
|
||||
stream_info,
|
||||
decode_tx,
|
||||
audio_shutdown_rx,
|
||||
audio_histories,
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("Audio listener error: {:?}", e);
|
||||
}
|
||||
}));
|
||||
|
||||
handles
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> DynResult<()> {
|
||||
// Phase 3B: Create bootstrap context for explicit initialization.
|
||||
// This replaces reliance on global mutable state, though currently
|
||||
// built-in backends still register on globals for plugin compatibility.
|
||||
// Full de-globalization would require threading context through rig_task and listener.
|
||||
let mut bootstrap_ctx = RegistrationContext::new();
|
||||
register_builtin_backends_on(&mut bootstrap_ctx);
|
||||
|
||||
@@ -341,7 +589,7 @@ async fn main() -> DynResult<()> {
|
||||
cfg.validate()
|
||||
.map_err(|e| format!("Invalid server configuration: {}", e))?;
|
||||
|
||||
// Validate SDR-specific configuration rules (see SDR.md §11).
|
||||
// Validate SDR-specific configuration rules.
|
||||
let sdr_errors = cfg.validate_sdr();
|
||||
if !sdr_errors.is_empty() {
|
||||
for e in &sdr_errors {
|
||||
@@ -359,46 +607,121 @@ async fn main() -> DynResult<()> {
|
||||
info!("Loaded configuration from {}", path.display());
|
||||
}
|
||||
|
||||
let resolved = resolve_config(&cli, &cfg, &bootstrap_ctx)?;
|
||||
let registry = Arc::new(bootstrap_ctx);
|
||||
|
||||
match &resolved.access {
|
||||
// --- Resolve the effective rig list ---
|
||||
//
|
||||
// Legacy path: no [[rigs]] → synthesise from flat fields + CLI overrides.
|
||||
// Multi-rig path: [[rigs]] entries are used as-is; CLI rig/access flags
|
||||
// are ignored (no unambiguous target).
|
||||
let mut resolved_rigs = cfg.resolved_rigs();
|
||||
|
||||
let (callsign, latitude, longitude) = if cfg.rigs.is_empty() {
|
||||
// Apply CLI overrides to the first (only) rig.
|
||||
let legacy = resolve_config(&cli, &cfg, ®istry)?;
|
||||
|
||||
let first = resolved_rigs
|
||||
.first_mut()
|
||||
.expect("resolved_rigs always has ≥1 entry");
|
||||
|
||||
first.rig.model = Some(legacy.rig.clone());
|
||||
match &legacy.access {
|
||||
RigAccess::Serial { path, baud } => {
|
||||
first.rig.access.access_type = Some("serial".to_string());
|
||||
first.rig.access.port = Some(path.clone());
|
||||
first.rig.access.baud = Some(*baud);
|
||||
}
|
||||
RigAccess::Tcp { addr } => {
|
||||
first.rig.access.access_type = Some("tcp".to_string());
|
||||
// Split "host:port" back into parts.
|
||||
if let Some(colon) = addr.rfind(':') {
|
||||
first.rig.access.host = Some(addr[..colon].to_string());
|
||||
first.rig.access.tcp_port = addr[colon + 1..].parse().ok();
|
||||
}
|
||||
}
|
||||
RigAccess::Sdr { args } => {
|
||||
first.rig.access.access_type = Some("sdr".to_string());
|
||||
first.rig.access.args = Some(args.clone());
|
||||
}
|
||||
}
|
||||
(legacy.callsign, legacy.latitude, legacy.longitude)
|
||||
} else {
|
||||
// Multi-rig path: validate all rig models are registered.
|
||||
for rig_cfg in &resolved_rigs {
|
||||
if let Some(ref model) = rig_cfg.rig.model {
|
||||
let norm = normalize_name(model);
|
||||
if !registry.is_backend_registered(&norm) {
|
||||
return Err(format!(
|
||||
"Unknown rig model '{}' for rig '{}' (available: {})",
|
||||
norm,
|
||||
rig_cfg.id,
|
||||
registry.registered_backends().join(", ")
|
||||
)
|
||||
.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
let callsign = cli.callsign.clone().or_else(|| cfg.general.callsign.clone());
|
||||
(callsign, cfg.general.latitude, cfg.general.longitude)
|
||||
};
|
||||
|
||||
info!(
|
||||
"Starting trx-server with {} rig(s): {}",
|
||||
resolved_rigs.len(),
|
||||
resolved_rigs
|
||||
.iter()
|
||||
.map(|r| r.id.as_str())
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
);
|
||||
if let Some(ref cs) = callsign {
|
||||
info!("Callsign: {}", cs);
|
||||
}
|
||||
|
||||
let mut task_handles: Vec<JoinHandle<()>> = Vec::new();
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel(false);
|
||||
|
||||
// The first rig id is the default for backward-compat clients that omit rig_id.
|
||||
let default_rig_id = resolved_rigs
|
||||
.first()
|
||||
.map(|r| r.id.clone())
|
||||
.unwrap_or_else(|| "default".to_string());
|
||||
|
||||
let mut rig_handles: HashMap<String, RigHandle> = HashMap::new();
|
||||
|
||||
for rig_cfg in &resolved_rigs {
|
||||
let rig_model = normalize_name(rig_cfg.rig.model.as_deref().unwrap_or(""));
|
||||
|
||||
let access = access_from_rig_instance(rig_cfg)?;
|
||||
|
||||
match &access {
|
||||
RigAccess::Serial { path, baud } => {
|
||||
info!(
|
||||
"Starting trx-server (rig: {}, access: serial {} @ {} baud)",
|
||||
resolved.rig, path, baud
|
||||
"[{}] Starting (rig: {}, access: serial {} @ {} baud)",
|
||||
rig_cfg.id, rig_model, path, baud
|
||||
);
|
||||
}
|
||||
RigAccess::Tcp { addr } => {
|
||||
info!(
|
||||
"Starting trx-server (rig: {}, access: tcp {})",
|
||||
resolved.rig, addr
|
||||
"[{}] Starting (rig: {}, access: tcp {})",
|
||||
rig_cfg.id, rig_model, addr
|
||||
);
|
||||
}
|
||||
RigAccess::Sdr { args } => {
|
||||
info!(
|
||||
"Starting trx-server (rig: {}, access: sdr {})",
|
||||
resolved.rig, args
|
||||
"[{}] Starting (rig: {}, access: sdr {})",
|
||||
rig_cfg.id, rig_model, args
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref cs) = resolved.callsign {
|
||||
info!("Callsign: {}", cs);
|
||||
}
|
||||
|
||||
// For the SDR access type: build the SoapySdrRig with full channel config
|
||||
// here in main so we can subscribe to its primary-channel PCM sender
|
||||
// before passing the rig to the rig task. The rig task skips its
|
||||
// registry factory when `prebuilt_rig` is set.
|
||||
//
|
||||
// When the `soapysdr` feature is disabled this block is elided and
|
||||
// `sdr_pcm_rx` is always `None`, preserving the cpal path.
|
||||
// Build SDR rig when applicable.
|
||||
#[cfg(feature = "soapysdr")]
|
||||
let (sdr_prebuilt_rig, sdr_pcm_rx): (
|
||||
Option<Box<dyn trx_core::rig::RigCat>>,
|
||||
Option<tokio::sync::broadcast::Receiver<Vec<f32>>>,
|
||||
) = if cfg.rig.access.access_type.as_deref() == Some("sdr") {
|
||||
let (rig, pcm_rx) = build_sdr_rig(&cfg)?;
|
||||
Option<broadcast::Receiver<Vec<f32>>>,
|
||||
) = if rig_cfg.rig.access.access_type.as_deref() == Some("sdr") {
|
||||
let (rig, pcm_rx) = build_sdr_rig_from_instance(rig_cfg)?;
|
||||
(Some(rig), Some(pcm_rx))
|
||||
} else {
|
||||
(None, None)
|
||||
@@ -407,50 +730,80 @@ async fn main() -> DynResult<()> {
|
||||
#[cfg(not(feature = "soapysdr"))]
|
||||
let (sdr_prebuilt_rig, sdr_pcm_rx): (
|
||||
Option<Box<dyn trx_core::rig::RigCat>>,
|
||||
Option<tokio::sync::broadcast::Receiver<Vec<f32>>>,
|
||||
Option<broadcast::Receiver<Vec<f32>>>,
|
||||
) = (None, None);
|
||||
|
||||
let (tx, rx) = mpsc::channel::<RigRequest>(RIG_TASK_CHANNEL_BUFFER);
|
||||
let mut task_handles: Vec<JoinHandle<()>> = Vec::new();
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel(false);
|
||||
let initial_state = RigState::new_with_metadata(
|
||||
resolved.callsign.clone(),
|
||||
let histories = DecoderHistories::new();
|
||||
|
||||
let (rig_tx, rig_rx) = mpsc::channel::<RigRequest>(RIG_TASK_CHANNEL_BUFFER);
|
||||
let mut initial_state = RigState::new_with_metadata(
|
||||
callsign.clone(),
|
||||
Some(env!("CARGO_PKG_VERSION").to_string()),
|
||||
Some(env!("TRX_SERVER_BUILD_DATE").to_string()),
|
||||
resolved.latitude,
|
||||
resolved.longitude,
|
||||
cfg.rig.initial_freq_hz,
|
||||
cfg.rig.initial_mode.clone(),
|
||||
latitude,
|
||||
longitude,
|
||||
rig_cfg.rig.initial_freq_hz,
|
||||
rig_cfg.rig.initial_mode.clone(),
|
||||
);
|
||||
let mut initial_state = initial_state;
|
||||
initial_state.pskreporter_status = if cfg.pskreporter.enabled {
|
||||
initial_state.pskreporter_status = if rig_cfg.pskreporter.enabled {
|
||||
Some(format!(
|
||||
"Enabled ({}:{})",
|
||||
cfg.pskreporter.host, cfg.pskreporter.port
|
||||
rig_cfg.pskreporter.host, rig_cfg.pskreporter.port
|
||||
))
|
||||
} else {
|
||||
Some("Disabled".to_string())
|
||||
};
|
||||
let (state_tx, state_rx) = watch::channel(initial_state);
|
||||
// Keep receivers alive so channels don't close prematurely
|
||||
let _state_rx = state_rx;
|
||||
|
||||
let mut rig_task_config =
|
||||
build_rig_task_config(&resolved, &cfg, std::sync::Arc::new(bootstrap_ctx));
|
||||
|
||||
// Pass pre-built SDR rig to the task so it skips the registry factory.
|
||||
let mut task_config = build_rig_task_config(
|
||||
rig_cfg,
|
||||
rig_model,
|
||||
access,
|
||||
callsign.clone(),
|
||||
latitude,
|
||||
longitude,
|
||||
Arc::clone(®istry),
|
||||
histories.clone(),
|
||||
);
|
||||
if let Some(prebuilt) = sdr_prebuilt_rig {
|
||||
rig_task_config.prebuilt_rig = Some(prebuilt);
|
||||
task_config.prebuilt_rig = Some(prebuilt);
|
||||
}
|
||||
|
||||
// Spawn rig task.
|
||||
let rig_shutdown_rx = shutdown_rx.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
if let Err(e) = rig_task::run_rig_task(rig_task_config, rx, state_tx, rig_shutdown_rx).await
|
||||
if let Err(e) =
|
||||
rig_task::run_rig_task(task_config, rig_rx, state_tx, rig_shutdown_rx).await
|
||||
{
|
||||
error!("Rig task error: {:?}", e);
|
||||
}
|
||||
}));
|
||||
|
||||
// Spawn audio stack.
|
||||
let audio_handles = spawn_rig_audio_stack(
|
||||
rig_cfg,
|
||||
state_rx.clone(),
|
||||
&shutdown_rx,
|
||||
histories.clone(),
|
||||
callsign.clone(),
|
||||
latitude,
|
||||
longitude,
|
||||
cli.listen,
|
||||
sdr_pcm_rx,
|
||||
);
|
||||
task_handles.extend(audio_handles);
|
||||
|
||||
rig_handles.insert(
|
||||
rig_cfg.id.clone(),
|
||||
RigHandle {
|
||||
rig_id: rig_cfg.id.clone(),
|
||||
rig_tx,
|
||||
state_rx,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Start JSON TCP listener.
|
||||
if cfg.listen.enabled {
|
||||
let listen_ip = cli.listen.unwrap_or(cfg.listen.listen);
|
||||
let listen_port = cli.port.unwrap_or(cfg.listen.port);
|
||||
@@ -463,15 +816,14 @@ async fn main() -> DynResult<()> {
|
||||
.filter(|t| !t.is_empty())
|
||||
.cloned()
|
||||
.collect();
|
||||
let rig_tx = tx.clone();
|
||||
let state_rx_listener = _state_rx.clone();
|
||||
let rigs_arc = Arc::new(rig_handles);
|
||||
let listener_shutdown_rx = shutdown_rx.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
if let Err(e) = listener::run_listener(
|
||||
listen_addr,
|
||||
rig_tx,
|
||||
rigs_arc,
|
||||
default_rig_id,
|
||||
auth_tokens,
|
||||
state_rx_listener,
|
||||
listener_shutdown_rx,
|
||||
)
|
||||
.await
|
||||
@@ -481,190 +833,9 @@ async fn main() -> DynResult<()> {
|
||||
}));
|
||||
}
|
||||
|
||||
if cfg.audio.enabled {
|
||||
let audio_listen =
|
||||
SocketAddr::from((cli.listen.unwrap_or(cfg.audio.listen), cfg.audio.port));
|
||||
let stream_info = AudioStreamInfo {
|
||||
sample_rate: cfg.audio.sample_rate,
|
||||
channels: cfg.audio.channels,
|
||||
frame_duration_ms: cfg.audio.frame_duration_ms,
|
||||
};
|
||||
|
||||
let (rx_audio_tx, _) = broadcast::channel::<Bytes>(256);
|
||||
let (tx_audio_tx, tx_audio_rx) = mpsc::channel::<Bytes>(64);
|
||||
|
||||
// PCM tap for server-side decoders
|
||||
let (pcm_tx, _) = broadcast::channel::<Vec<f32>>(64);
|
||||
// Decoded messages broadcast
|
||||
let (decode_tx, _) = broadcast::channel::<trx_core::decode::DecodedMessage>(256);
|
||||
|
||||
if cfg.pskreporter.enabled {
|
||||
let callsign = resolved.callsign.clone().unwrap_or_default();
|
||||
if callsign.trim().is_empty() {
|
||||
warn!("PSK Reporter enabled but [general].callsign is empty; uplink disabled");
|
||||
} else {
|
||||
let pr_cfg = cfg.pskreporter.clone();
|
||||
let pr_state_rx = _state_rx.clone();
|
||||
let pr_decode_rx = decode_tx.subscribe();
|
||||
let pr_shutdown_rx = shutdown_rx.clone();
|
||||
let latitude = resolved.latitude;
|
||||
let longitude = resolved.longitude;
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = pskreporter::run_pskreporter_uplink(
|
||||
pr_cfg,
|
||||
callsign,
|
||||
latitude,
|
||||
longitude,
|
||||
pr_state_rx,
|
||||
pr_decode_rx
|
||||
) => {}
|
||||
_ = wait_for_shutdown(pr_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.aprsfi.enabled {
|
||||
let callsign = resolved.callsign.clone().unwrap_or_default();
|
||||
if callsign.trim().is_empty() {
|
||||
warn!("APRS-IS IGate enabled but [general].callsign is empty; uplink disabled");
|
||||
} else {
|
||||
let ai_cfg = cfg.aprsfi.clone();
|
||||
let ai_decode_rx = decode_tx.subscribe();
|
||||
let ai_shutdown_rx = shutdown_rx.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = aprsfi::run_aprsfi_uplink(ai_cfg, callsign, ai_decode_rx) => {}
|
||||
_ = wait_for_shutdown(ai_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
let decoder_logs = match DecoderLoggers::from_config(&cfg.decode_logs) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!("Decoder file logging disabled: {}", e);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
if cfg.audio.rx_enabled {
|
||||
if let Some(mut sdr_rx) = sdr_pcm_rx {
|
||||
// SDR path: the backend pipeline provides demodulated PCM,
|
||||
// so cpal capture is skipped entirely.
|
||||
// The SDR PCM frames are bridged into pcm_tx so the existing
|
||||
// decoder spawn code below receives them unchanged.
|
||||
tracing::info!("using SDR audio source — cpal capture disabled");
|
||||
let pcm_tx_clone = pcm_tx.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
loop {
|
||||
match sdr_rx.recv().await {
|
||||
Ok(frame) => {
|
||||
let _ = pcm_tx_clone.send(frame);
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
tracing::warn!("SDR audio bridge: dropped {} frames", n);
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
}));
|
||||
} else {
|
||||
// cpal path (existing serial/TCP transceivers)
|
||||
let _capture_thread = audio::spawn_audio_capture(
|
||||
&cfg.audio,
|
||||
rx_audio_tx.clone(),
|
||||
Some(pcm_tx.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
// Spawn APRS decoder task
|
||||
let aprs_pcm_rx = pcm_tx.subscribe();
|
||||
let aprs_state_rx = _state_rx.clone();
|
||||
let aprs_decode_tx = decode_tx.clone();
|
||||
let aprs_sr = cfg.audio.sample_rate;
|
||||
let aprs_ch = cfg.audio.channels;
|
||||
let aprs_shutdown_rx = shutdown_rx.clone();
|
||||
let aprs_logs = decoder_logs.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_aprs_decoder(aprs_sr, aprs_ch as u16, aprs_pcm_rx, aprs_state_rx, aprs_decode_tx, aprs_logs) => {}
|
||||
_ = wait_for_shutdown(aprs_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
|
||||
// Spawn CW decoder task
|
||||
let cw_pcm_rx = pcm_tx.subscribe();
|
||||
let cw_state_rx = _state_rx.clone();
|
||||
let cw_decode_tx = decode_tx.clone();
|
||||
let cw_sr = cfg.audio.sample_rate;
|
||||
let cw_ch = cfg.audio.channels;
|
||||
let cw_shutdown_rx = shutdown_rx.clone();
|
||||
let cw_logs = decoder_logs.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_cw_decoder(cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx, cw_logs) => {}
|
||||
_ = wait_for_shutdown(cw_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
|
||||
// Spawn FT8 decoder task
|
||||
let ft8_pcm_rx = pcm_tx.subscribe();
|
||||
let ft8_state_rx = _state_rx.clone();
|
||||
let ft8_decode_tx = decode_tx.clone();
|
||||
let ft8_sr = cfg.audio.sample_rate;
|
||||
let ft8_ch = cfg.audio.channels;
|
||||
let ft8_shutdown_rx = shutdown_rx.clone();
|
||||
let ft8_logs = decoder_logs.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_ft8_decoder(ft8_sr, ft8_ch as u16, ft8_pcm_rx, ft8_state_rx, ft8_decode_tx, ft8_logs) => {}
|
||||
_ = wait_for_shutdown(ft8_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
|
||||
// Spawn WSPR decoder task
|
||||
let wspr_pcm_rx = pcm_tx.subscribe();
|
||||
let wspr_state_rx = _state_rx.clone();
|
||||
let wspr_decode_tx = decode_tx.clone();
|
||||
let wspr_sr = cfg.audio.sample_rate;
|
||||
let wspr_ch = cfg.audio.channels;
|
||||
let wspr_shutdown_rx = shutdown_rx.clone();
|
||||
let wspr_logs = decoder_logs.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_wspr_decoder(wspr_sr, wspr_ch as u16, wspr_pcm_rx, wspr_state_rx, wspr_decode_tx, wspr_logs) => {}
|
||||
_ = wait_for_shutdown(wspr_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
}
|
||||
if cfg.audio.tx_enabled {
|
||||
let _playback_thread = audio::spawn_audio_playback(&cfg.audio, tx_audio_rx);
|
||||
}
|
||||
|
||||
let audio_shutdown_rx = shutdown_rx.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
if let Err(e) = audio::run_audio_listener(
|
||||
audio_listen,
|
||||
rx_audio_tx,
|
||||
tx_audio_tx,
|
||||
stream_info,
|
||||
decode_tx,
|
||||
audio_shutdown_rx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("Audio listener error: {:?}", e);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
signal::ctrl_c().await?;
|
||||
info!("Ctrl+C received, shutting down");
|
||||
let _ = shutdown_tx.send(true);
|
||||
drop(tx);
|
||||
tokio::time::sleep(Duration::from_millis(400)).await;
|
||||
|
||||
for handle in &task_handles {
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
|
||||
//
|
||||
// SPDX-License-Identifier: BSD-2-Clause
|
||||
|
||||
//! Thin handle giving the listener access to one rig's task and state.
|
||||
|
||||
use tokio::sync::{mpsc, watch};
|
||||
|
||||
use trx_core::rig::request::RigRequest;
|
||||
use trx_core::rig::state::RigState;
|
||||
|
||||
/// A handle to a single running rig backend.
|
||||
///
|
||||
/// One `RigHandle` is created per rig in `main.rs` and stored in the shared
|
||||
/// `Arc<HashMap<String, RigHandle>>` passed to the listener.
|
||||
pub struct RigHandle {
|
||||
/// Stable rig identifier, matches the key in the HashMap.
|
||||
pub rig_id: String,
|
||||
/// Send commands to the rig task.
|
||||
pub rig_tx: mpsc::Sender<RigRequest>,
|
||||
/// Watch the latest rig state for fast GetState/GetRigs responses.
|
||||
pub state_rx: watch::Receiver<RigState>,
|
||||
}
|
||||
@@ -24,12 +24,14 @@ use trx_core::rig::state::{RigMode, RigSnapshot, RigState};
|
||||
use trx_core::rig::{RigCat, RigRxStatus, RigTxStatus};
|
||||
use trx_core::{DynResult, RigError, RigResult};
|
||||
|
||||
use crate::audio;
|
||||
use crate::audio::DecoderHistories;
|
||||
use crate::error::is_invalid_bcd_error;
|
||||
|
||||
/// Configuration for the rig task.
|
||||
pub struct RigTaskConfig {
|
||||
pub registry: Arc<RegistrationContext>,
|
||||
/// Stable rig identifier (matches the key in the listener's HashMap).
|
||||
pub rig_id: String,
|
||||
pub rig_model: String,
|
||||
pub access: RigAccess,
|
||||
pub polling: AdaptivePolling,
|
||||
@@ -42,6 +44,9 @@ pub struct RigTaskConfig {
|
||||
pub server_latitude: Option<f64>,
|
||||
pub server_longitude: Option<f64>,
|
||||
pub pskreporter_status: Option<String>,
|
||||
/// Per-rig decoder history store. Used by Reset* commands to clear the
|
||||
/// history and by the audio listener to serve history on connection.
|
||||
pub histories: Arc<DecoderHistories>,
|
||||
/// Pre-built rig backend. When `Some`, the registry factory is skipped.
|
||||
/// Used by the SDR path in `main.rs` to pass a fully-configured
|
||||
/// `SoapySdrRig` (built with channel config) without duplicating the
|
||||
@@ -55,6 +60,7 @@ impl Default for RigTaskConfig {
|
||||
trx_backend::register_builtin_backends_on(&mut registry);
|
||||
Self {
|
||||
registry: Arc::new(registry),
|
||||
rig_id: "default".to_string(),
|
||||
rig_model: "ft817".to_string(),
|
||||
access: RigAccess::Serial {
|
||||
path: "/dev/ttyUSB0".to_string(),
|
||||
@@ -70,6 +76,7 @@ impl Default for RigTaskConfig {
|
||||
server_latitude: None,
|
||||
server_longitude: None,
|
||||
pskreporter_status: None,
|
||||
histories: DecoderHistories::new(),
|
||||
prebuilt_rig: None,
|
||||
}
|
||||
}
|
||||
@@ -93,7 +100,8 @@ pub async fn run_rig_task(
|
||||
state_tx: watch::Sender<RigState>,
|
||||
mut shutdown_rx: watch::Receiver<bool>,
|
||||
) -> DynResult<()> {
|
||||
info!("Opening rig backend {}", config.rig_model);
|
||||
let histories = config.histories.clone();
|
||||
info!("[{}] Opening rig backend {}", config.rig_id, config.rig_model);
|
||||
match &config.access {
|
||||
RigAccess::Serial { path, baud } => info!("Serial: {} @ {} baud", path, baud),
|
||||
RigAccess::Tcp { addr } => info!("TCP CAT: {}", addr),
|
||||
@@ -317,6 +325,7 @@ pub async fn run_rig_task(
|
||||
last_power_on: &mut last_power_on,
|
||||
state_tx: &state_tx,
|
||||
retry,
|
||||
histories: &histories,
|
||||
};
|
||||
let result = process_command(cmd, &mut cmd_ctx).await;
|
||||
|
||||
@@ -347,6 +356,7 @@ struct CommandExecContext<'a> {
|
||||
last_power_on: &'a mut Option<Instant>,
|
||||
state_tx: &'a watch::Sender<RigState>,
|
||||
retry: &'a ExponentialBackoff,
|
||||
histories: &'a Arc<DecoderHistories>,
|
||||
}
|
||||
|
||||
async fn process_command(
|
||||
@@ -393,7 +403,7 @@ async fn process_command(
|
||||
return snapshot_from(ctx.state);
|
||||
}
|
||||
RigCommand::ResetAprsDecoder => {
|
||||
audio::clear_aprs_history();
|
||||
ctx.histories.clear_aprs_history();
|
||||
ctx.state.aprs_decode_reset_seq += 1;
|
||||
let _ = ctx.state_tx.send(ctx.state.clone());
|
||||
return snapshot_from(ctx.state);
|
||||
@@ -404,13 +414,13 @@ async fn process_command(
|
||||
return snapshot_from(ctx.state);
|
||||
}
|
||||
RigCommand::ResetFt8Decoder => {
|
||||
audio::clear_ft8_history();
|
||||
ctx.histories.clear_ft8_history();
|
||||
ctx.state.ft8_decode_reset_seq += 1;
|
||||
let _ = ctx.state_tx.send(ctx.state.clone());
|
||||
return snapshot_from(ctx.state);
|
||||
}
|
||||
RigCommand::ResetWsprDecoder => {
|
||||
audio::clear_wspr_history();
|
||||
ctx.histories.clear_wspr_history();
|
||||
ctx.state.wspr_decode_reset_seq += 1;
|
||||
let _ = ctx.state_tx.send(ctx.state.clone());
|
||||
return snapshot_from(ctx.state);
|
||||
|
||||
Reference in New Issue
Block a user