[refactor](trx-rs): resolve all P1/P2 improvement areas

P1 (High Priority):
- Fix LIFO command batching in rig_task.rs (batch.pop→batch.remove(0))
- Add ±25% jitter to ExponentialBackoff to prevent thundering herd
- Add 10,000-entry capacity bounds to decoder history queues
- Add rig task crash detection with Error state broadcast
- Decompose FrontendRuntimeContext 50-field god-struct into 9 sub-structs
  (AudioContext, DecodeHistoryContext, HttpAuthConfig, HttpUiConfig,
   RigRoutingContext, OwnerInfo, VChanContext, SpectrumContext, PerRigAudioContext)
- Migrate std::sync::RwLock to tokio::sync::RwLock in background_decode.rs
- Extract find_input_device/find_output_device helpers from audio pipeline

P2 (Medium Priority):
- Introduce SoapySdrConfig builder struct (replaces 20+ positional params)
- Add define_command_mappings! macro for ClientCommand↔RigCommand mapping
- Replace silent lock poison recovery with lock_or_recover() warning logger
- Make timeouts configurable via RigTaskConfig/ListenerConfig and TOML
- Extract shared config types to trx-app/src/shared_config.rs

Documentation updated in CLAUDE.md, Architecture.md, Improvement-Areas.md.

https://claude.ai/code/session_01P9G7QCWfiYbPVJ7cgiXznf
Signed-off-by: Claude <noreply@anthropic.com>
This commit is contained in:
Claude
2026-03-28 23:26:55 +00:00
committed by Stan Grams
parent 0a60684e28
commit 16426548de
22 changed files with 1245 additions and 916 deletions
+2
View File
@@ -4,8 +4,10 @@
pub mod config;
pub mod logging;
pub mod shared_config;
pub mod util;
pub use config::{ConfigError, ConfigFile};
pub use logging::init_logging;
pub use shared_config::{validate_log_level, validate_tokens};
pub use util::normalize_name;
+95
View File
@@ -0,0 +1,95 @@
// SPDX-FileCopyrightText: 2026 Stan Grams <sjg@haxx.space>
//
// SPDX-License-Identifier: BSD-2-Clause
//! Shared configuration validation helpers used by both `trx-server` and
//! `trx-client`.
//!
//! # Non-shared structs
//!
//! `GeneralConfig` is defined separately in each binary because the fields
//! differ:
//!
//! - **Server** `GeneralConfig`: `callsign`, `log_level`, `latitude`,
//! `longitude`
//! - **Client** `GeneralConfig`: `callsign`, `log_level`, `website_url`,
//! `website_name`, `ais_vessel_url_base`
//!
//! Only `callsign` and `log_level` overlap. Merging into a single struct
//! would either bloat both binaries with unused fields or require a trait
//! abstraction that adds complexity without clear benefit.
/// Validate that a log level string is one of the accepted values.
///
/// Returns `Ok(())` when `level` is `None` (defaulting is handled elsewhere)
/// or a recognised level name.
pub fn validate_log_level(level: Option<&str>) -> Result<(), String> {
if let Some(level) = level {
match level {
"trace" | "debug" | "info" | "warn" | "error" => {}
_ => {
return Err(format!(
"[general].log_level '{}' is invalid (expected one of: trace, debug, info, warn, error)",
level
))
}
}
}
Ok(())
}
/// Validate that a list of authentication tokens contains no empty entries.
///
/// `path` is a human-readable config path prefix used in the error message
/// (e.g. `"[listen.auth].tokens"`).
pub fn validate_tokens(path: &str, tokens: &[String]) -> Result<(), String> {
if tokens.iter().any(|t| t.trim().is_empty()) {
return Err(format!("{path} must not contain empty tokens"));
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_validate_log_level_none() {
assert!(validate_log_level(None).is_ok());
}
#[test]
fn test_validate_log_level_valid() {
for level in &["trace", "debug", "info", "warn", "error"] {
assert!(validate_log_level(Some(level)).is_ok());
}
}
#[test]
fn test_validate_log_level_invalid() {
assert!(validate_log_level(Some("verbose")).is_err());
}
#[test]
fn test_validate_tokens_empty_list() {
assert!(validate_tokens("[auth].tokens", &[]).is_ok());
}
#[test]
fn test_validate_tokens_valid() {
let tokens = vec!["abc".to_string(), "def".to_string()];
assert!(validate_tokens("[auth].tokens", &tokens).is_ok());
}
#[test]
fn test_validate_tokens_rejects_empty() {
let tokens = vec!["abc".to_string(), "".to_string()];
assert!(validate_tokens("[auth].tokens", &tokens).is_err());
}
#[test]
fn test_validate_tokens_rejects_whitespace_only() {
let tokens = vec![" ".to_string()];
assert!(validate_tokens("[auth].tokens", &tokens).is_err());
}
}
+1 -23
View File
@@ -17,7 +17,7 @@ use std::path::{Path, PathBuf};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use trx_app::{ConfigError, ConfigFile};
use trx_app::{validate_log_level, validate_tokens, ConfigError, ConfigFile};
/// Top-level client configuration structure.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
@@ -677,28 +677,6 @@ impl ClientConfig {
}
}
fn validate_log_level(level: Option<&str>) -> Result<(), String> {
if let Some(level) = level {
match level {
"trace" | "debug" | "info" | "warn" | "error" => {}
_ => {
return Err(format!(
"[general].log_level '{}' is invalid (expected one of: trace, debug, info, warn, error)",
level
))
}
}
}
Ok(())
}
fn validate_tokens(path: &str, tokens: &[String]) -> Result<(), String> {
if tokens.iter().any(|t| t.trim().is_empty()) {
return Err(format!("{path} must not contain empty tokens"));
}
Ok(())
}
fn validate_http_auth(auth: &HttpAuthConfig) -> Result<(), String> {
if !auth.enabled {
return Ok(());
+53 -50
View File
@@ -150,7 +150,7 @@ async fn async_init() -> DynResult<AppState> {
info!("Loaded configuration from {}", path.display());
}
frontend_runtime.auth_tokens = cfg
frontend_runtime.http_auth.tokens = cfg
.frontends
.http_json
.auth
@@ -161,28 +161,28 @@ async fn async_init() -> DynResult<AppState> {
.collect();
// Set HTTP frontend authentication config
frontend_runtime.http_auth_enabled = cfg.frontends.http.auth.enabled;
frontend_runtime.http_auth_rx_passphrase = cfg.frontends.http.auth.rx_passphrase.clone();
frontend_runtime.http_auth_control_passphrase =
frontend_runtime.http_auth.enabled = cfg.frontends.http.auth.enabled;
frontend_runtime.http_auth.rx_passphrase = cfg.frontends.http.auth.rx_passphrase.clone();
frontend_runtime.http_auth.control_passphrase =
cfg.frontends.http.auth.control_passphrase.clone();
frontend_runtime.http_auth_tx_access_control_enabled =
frontend_runtime.http_auth.tx_access_control_enabled =
cfg.frontends.http.auth.tx_access_control_enabled;
frontend_runtime.http_auth_session_ttl_secs = cfg.frontends.http.auth.session_ttl_min * 60;
frontend_runtime.http_auth_cookie_secure = cfg.frontends.http.auth.cookie_secure;
frontend_runtime.http_auth_cookie_same_site = match cfg.frontends.http.auth.cookie_same_site {
frontend_runtime.http_auth.session_ttl_secs = cfg.frontends.http.auth.session_ttl_min * 60;
frontend_runtime.http_auth.cookie_secure = cfg.frontends.http.auth.cookie_secure;
frontend_runtime.http_auth.cookie_same_site = match cfg.frontends.http.auth.cookie_same_site {
config::CookieSameSite::Strict => "Strict".to_string(),
config::CookieSameSite::Lax => "Lax".to_string(),
config::CookieSameSite::None => "None".to_string(),
};
frontend_runtime.http_show_sdr_gain_control = cfg.frontends.http.show_sdr_gain_control;
frontend_runtime.http_initial_map_zoom = cfg.frontends.http.initial_map_zoom;
frontend_runtime.http_spectrum_coverage_margin_hz =
frontend_runtime.http_ui.show_sdr_gain_control = cfg.frontends.http.show_sdr_gain_control;
frontend_runtime.http_ui.initial_map_zoom = cfg.frontends.http.initial_map_zoom;
frontend_runtime.http_ui.spectrum_coverage_margin_hz =
cfg.frontends.http.spectrum_coverage_margin_hz;
frontend_runtime.http_spectrum_usable_span_ratio =
frontend_runtime.http_ui.spectrum_usable_span_ratio =
cfg.frontends.http.spectrum_usable_span_ratio;
frontend_runtime.http_decode_history_retention_min =
frontend_runtime.http_ui.decode_history_retention_min =
cfg.frontends.http.decode_history_retention_min;
frontend_runtime.http_decode_history_retention_min_by_rig = cfg
frontend_runtime.http_ui.decode_history_retention_min_by_rig = cfg
.frontends
.http
.decode_history_retention_min_by_rig
@@ -219,7 +219,7 @@ async fn async_init() -> DynResult<AppState> {
.clone()
.or_else(|| cfg.frontends.http.default_rig_name.clone())
.or_else(|| resolved_remotes.first().map(|e| e.name.clone()));
if let Ok(mut guard) = frontend_runtime.remote_active_rig_id.lock() {
if let Ok(mut guard) = frontend_runtime.routing.active_rig_id.lock() {
*guard = default_rig.clone();
}
@@ -264,10 +264,10 @@ async fn async_init() -> DynResult<AppState> {
.callsign
.clone()
.or_else(|| cfg.general.callsign.clone());
frontend_runtime.owner_callsign = callsign.clone();
frontend_runtime.owner_website_url = cfg.general.website_url.clone();
frontend_runtime.owner_website_name = cfg.general.website_name.clone();
frontend_runtime.ais_vessel_url_base = cfg.general.ais_vessel_url_base.clone();
frontend_runtime.owner.callsign = callsign.clone();
frontend_runtime.owner.website_url = cfg.general.website_url.clone();
frontend_runtime.owner.website_name = cfg.general.website_name.clone();
frontend_runtime.owner.ais_vessel_url_base = cfg.general.ais_vessel_url_base.clone();
let remote_names: Vec<&str> = resolved_remotes.iter().map(|e| e.name.as_str()).collect();
info!(
@@ -373,17 +373,17 @@ async fn async_init() -> DynResult<AppState> {
let remote_cfg = RemoteClientConfig {
addr: addr.clone(),
token: token.clone(),
selected_rig_id: frontend_runtime.remote_active_rig_id.clone(),
known_rigs: frontend_runtime.remote_rigs.clone(),
rig_states: frontend_runtime.rig_states.clone(),
selected_rig_id: frontend_runtime.routing.active_rig_id.clone(),
known_rigs: frontend_runtime.routing.remote_rigs.clone(),
rig_states: frontend_runtime.routing.rig_states.clone(),
poll_interval: Duration::from_millis(poll_interval),
spectrum: frontend_runtime.spectrum.clone(),
rig_spectrums: frontend_runtime.rig_spectrums.clone(),
server_connected: frontend_runtime.server_connected.clone(),
rig_server_connected: frontend_runtime.rig_server_connected.clone(),
spectrum: frontend_runtime.spectrum.sender.clone(),
rig_spectrums: frontend_runtime.spectrum.per_rig.clone(),
server_connected: frontend_runtime.routing.server_connected.clone(),
rig_server_connected: frontend_runtime.routing.rig_server_connected.clone(),
rig_id_to_short_name,
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: frontend_runtime.sat_passes.clone(),
sat_passes: frontend_runtime.routing.sat_passes.clone(),
};
let state_tx = state_tx.clone();
let remote_shutdown_rx = shutdown_rx.clone();
@@ -405,7 +405,7 @@ async fn async_init() -> DynResult<AppState> {
// channel and dispatches to the per-server channel based on rig_id_override
// (short name).
let route_map = Arc::new(route_map);
let default_rig_for_router = frontend_runtime.remote_active_rig_id.clone();
let default_rig_for_router = frontend_runtime.routing.active_rig_id.clone();
{
let route_map = route_map.clone();
let mut frontend_rx = rx;
@@ -446,24 +446,24 @@ async fn async_init() -> DynResult<AppState> {
let (stream_info_tx, stream_info_rx) = watch::channel::<Option<AudioStreamInfo>>(None);
let (decode_tx, _) = broadcast::channel::<DecodedMessage>(256);
frontend_runtime.audio_rx = Some(rx_audio_tx.clone());
frontend_runtime.audio_tx = Some(tx_audio_tx);
frontend_runtime.audio_info = Some(stream_info_rx);
frontend_runtime.decode_rx = Some(decode_tx.clone());
frontend_runtime.audio.rx = Some(rx_audio_tx.clone());
frontend_runtime.audio.tx = Some(tx_audio_tx);
frontend_runtime.audio.info = Some(stream_info_rx);
frontend_runtime.audio.decode_rx = Some(decode_tx.clone());
// Virtual-channel audio: shared broadcaster map + command channel.
let (vchan_cmd_tx, vchan_cmd_rx) = mpsc::channel::<trx_frontend::VChanAudioCmd>(256);
*frontend_runtime.vchan_audio_cmd.lock().unwrap() = Some(vchan_cmd_tx);
*frontend_runtime.vchan.audio_cmd.lock().unwrap() = Some(vchan_cmd_tx);
let (vchan_destroyed_tx, _) = broadcast::channel::<uuid::Uuid>(64);
frontend_runtime.vchan_destroyed = Some(vchan_destroyed_tx.clone());
let ais_history = frontend_runtime.ais_history.clone();
let vdes_history = frontend_runtime.vdes_history.clone();
let aprs_history = frontend_runtime.aprs_history.clone();
let hf_aprs_history = frontend_runtime.hf_aprs_history.clone();
let cw_history = frontend_runtime.cw_history.clone();
let ft8_history = frontend_runtime.ft8_history.clone();
let wspr_history = frontend_runtime.wspr_history.clone();
frontend_runtime.vchan.destroyed = Some(vchan_destroyed_tx.clone());
let ais_history = frontend_runtime.decode_history.ais.clone();
let vdes_history = frontend_runtime.decode_history.vdes.clone();
let aprs_history = frontend_runtime.decode_history.aprs.clone();
let hf_aprs_history = frontend_runtime.decode_history.hf_aprs.clone();
let cw_history = frontend_runtime.decode_history.cw.clone();
let ft8_history = frontend_runtime.decode_history.ft8.clone();
let wspr_history = frontend_runtime.decode_history.wspr.clone();
let replay_history_sink: Arc<dyn Fn(DecodedMessage) + Send + Sync> = Arc::new(move |msg| {
let now = std::time::Instant::now();
match msg {
@@ -527,10 +527,10 @@ async fn async_init() -> DynResult<AppState> {
info!("Audio enabled: decode channel set");
let audio_shutdown_rx = shutdown_rx.clone();
let vchan_audio_map = frontend_runtime.vchan_audio.clone();
let rig_audio_rx_map = frontend_runtime.rig_audio_rx.clone();
let rig_audio_info_map = frontend_runtime.rig_audio_info.clone();
let rig_vchan_cmd_map = frontend_runtime.rig_vchan_audio_cmd.clone();
let vchan_audio_map = frontend_runtime.vchan.audio.clone();
let rig_audio_rx_map = frontend_runtime.rig_audio.rx.clone();
let rig_audio_info_map = frontend_runtime.rig_audio.info.clone();
let rig_vchan_cmd_map = frontend_runtime.vchan.rig_audio_cmd.clone();
let default_audio_connect = if let Some(addr) = global_audio_addr {
AudioConnectConfig::fixed(addr)
} else {
@@ -539,8 +539,8 @@ async fn async_init() -> DynResult<AppState> {
pending_audio_client = Some(tokio::spawn(audio_client::run_multi_rig_audio_manager(
default_audio_connect,
audio_connect,
frontend_runtime.remote_active_rig_id.clone(),
frontend_runtime.remote_rigs.clone(),
frontend_runtime.routing.active_rig_id.clone(),
frontend_runtime.routing.remote_rigs.clone(),
rx_audio_tx,
tx_audio_rx,
stream_info_tx,
@@ -642,17 +642,20 @@ async fn async_init() -> DynResult<AppState> {
task_handles.push(audio_bridge::spawn_audio_bridge(
bridge_cfg,
frontend_runtime_ctx
.audio_rx
.audio
.rx
.as_ref()
.expect("audio rx must be set")
.clone(),
frontend_runtime_ctx
.audio_tx
.audio
.tx
.as_ref()
.expect("audio tx must be set")
.clone(),
frontend_runtime_ctx
.audio_info
.audio
.info
.as_ref()
.expect("audio info must be set")
.clone(),
+243 -158
View File
@@ -189,130 +189,253 @@ impl Default for FrontendRegistrationContext {
}
}
/// Runtime context for frontend operation, containing audio channels and decode state.
pub struct FrontendRuntimeContext {
// ---------------------------------------------------------------------------
// Sub-structs for FrontendRuntimeContext decomposition
// ---------------------------------------------------------------------------
/// Audio streaming channels (server ↔ browser).
pub struct AudioContext {
/// Audio RX broadcast channel (server → browser)
pub audio_rx: Option<broadcast::Sender<Bytes>>,
pub rx: Option<broadcast::Sender<Bytes>>,
/// Audio TX channel (browser → server)
pub audio_tx: Option<mpsc::Sender<Bytes>>,
pub tx: Option<mpsc::Sender<Bytes>>,
/// Audio stream info watch channel
pub audio_info: Option<watch::Receiver<Option<AudioStreamInfo>>>,
pub info: Option<watch::Receiver<Option<AudioStreamInfo>>>,
/// Decode message broadcast channel
pub decode_rx: Option<broadcast::Sender<DecodedMessage>>,
/// Decode history entry: (record_time, rig_id, message).
/// AIS decode history
pub ais_history: DecodeHistory<AisMessage>,
/// VDES decode history
pub vdes_history: DecodeHistory<VdesMessage>,
/// APRS decode history
pub aprs_history: DecodeHistory<AprsPacket>,
/// HF APRS decode history
pub hf_aprs_history: DecodeHistory<AprsPacket>,
/// CW decode history
pub cw_history: DecodeHistory<CwEvent>,
/// FT8 decode history
pub ft8_history: DecodeHistory<Ft8Message>,
/// FT4 decode history
pub ft4_history: DecodeHistory<Ft8Message>,
/// FT2 decode history
pub ft2_history: DecodeHistory<Ft8Message>,
/// WSPR decode history
pub wspr_history: DecodeHistory<WsprMessage>,
/// Authentication tokens for HTTP-JSON frontend
pub auth_tokens: HashSet<String>,
/// Active HTTP SSE clients (incremented on /events connect, decremented on disconnect).
pub sse_clients: Arc<AtomicUsize>,
/// Active rigctl TCP clients.
pub rigctl_clients: Arc<AtomicUsize>,
/// Active audio WebSocket streams.
pub audio_clients: Arc<AtomicUsize>,
/// rigctl listen endpoint, if enabled.
pub rigctl_listen_addr: Arc<Mutex<Option<SocketAddr>>>,
/// Guard to avoid spawning duplicate decode collectors.
pub decode_collector_started: AtomicBool,
/// HTTP frontend authentication configuration (enabled, passphrases, TTL, etc.)
pub http_auth_enabled: bool,
/// HTTP frontend auth rx passphrase
pub http_auth_rx_passphrase: Option<String>,
/// HTTP frontend auth control passphrase
pub http_auth_control_passphrase: Option<String>,
/// HTTP frontend auth tx access control enabled
pub http_auth_tx_access_control_enabled: bool,
/// HTTP frontend auth session TTL in seconds
pub http_auth_session_ttl_secs: u64,
/// HTTP frontend auth cookie secure flag
pub http_auth_cookie_secure: bool,
/// HTTP frontend auth cookie same-site policy
pub http_auth_cookie_same_site: String,
/// Whether the HTTP UI should expose the RF Gain control.
pub http_show_sdr_gain_control: bool,
/// Initial APRS map zoom level when receiver coordinates are available.
pub http_initial_map_zoom: u8,
/// Spectrum center-retune guard margin on each side of the tuned passband.
pub http_spectrum_coverage_margin_hz: u32,
/// Fraction of the sampled spectrum span treated as usable by the web UI.
pub http_spectrum_usable_span_ratio: f32,
/// Default decode history retention in minutes.
pub http_decode_history_retention_min: u64,
/// Per-rig decode history retention overrides in minutes.
pub http_decode_history_retention_min_by_rig: HashMap<String, u64>,
/// Currently selected remote rig id (used by remote client routing).
pub remote_active_rig_id: Arc<Mutex<Option<String>>>,
pub clients: Arc<AtomicUsize>,
}
impl Default for AudioContext {
fn default() -> Self {
Self {
rx: None,
tx: None,
info: None,
decode_rx: None,
clients: Arc::new(AtomicUsize::new(0)),
}
}
}
/// Decode history entries for all decoder types.
pub struct DecodeHistoryContext {
pub ais: DecodeHistory<AisMessage>,
pub vdes: DecodeHistory<VdesMessage>,
pub aprs: DecodeHistory<AprsPacket>,
pub hf_aprs: DecodeHistory<AprsPacket>,
pub cw: DecodeHistory<CwEvent>,
pub ft8: DecodeHistory<Ft8Message>,
pub ft4: DecodeHistory<Ft8Message>,
pub ft2: DecodeHistory<Ft8Message>,
pub wspr: DecodeHistory<WsprMessage>,
}
impl Default for DecodeHistoryContext {
fn default() -> Self {
Self {
ais: Arc::new(Mutex::new(VecDeque::new())),
vdes: Arc::new(Mutex::new(VecDeque::new())),
aprs: Arc::new(Mutex::new(VecDeque::new())),
hf_aprs: Arc::new(Mutex::new(VecDeque::new())),
cw: Arc::new(Mutex::new(VecDeque::new())),
ft8: Arc::new(Mutex::new(VecDeque::new())),
ft4: Arc::new(Mutex::new(VecDeque::new())),
ft2: Arc::new(Mutex::new(VecDeque::new())),
wspr: Arc::new(Mutex::new(VecDeque::new())),
}
}
}
/// HTTP authentication configuration.
pub struct HttpAuthConfig {
pub enabled: bool,
pub rx_passphrase: Option<String>,
pub control_passphrase: Option<String>,
pub tx_access_control_enabled: bool,
pub session_ttl_secs: u64,
pub cookie_secure: bool,
pub cookie_same_site: String,
/// Authentication tokens for HTTP-JSON frontend.
pub tokens: HashSet<String>,
}
impl Default for HttpAuthConfig {
fn default() -> Self {
Self {
enabled: false,
rx_passphrase: None,
control_passphrase: None,
tx_access_control_enabled: true,
session_ttl_secs: 480 * 60,
cookie_secure: false,
cookie_same_site: "Lax".to_string(),
tokens: HashSet::new(),
}
}
}
/// HTTP UI display configuration.
pub struct HttpUiConfig {
pub show_sdr_gain_control: bool,
pub initial_map_zoom: u8,
pub spectrum_coverage_margin_hz: u32,
pub spectrum_usable_span_ratio: f32,
pub decode_history_retention_min: u64,
pub decode_history_retention_min_by_rig: HashMap<String, u64>,
}
impl Default for HttpUiConfig {
fn default() -> Self {
Self {
show_sdr_gain_control: true,
initial_map_zoom: 10,
spectrum_coverage_margin_hz: 50_000,
spectrum_usable_span_ratio: 0.92,
decode_history_retention_min: 24 * 60,
decode_history_retention_min_by_rig: HashMap::new(),
}
}
}
/// Remote rig routing and state management.
pub struct RigRoutingContext {
/// Currently selected remote rig id.
pub active_rig_id: Arc<Mutex<Option<String>>>,
/// Cached remote rig list from GetRigs polling.
pub remote_rigs: Arc<Mutex<Vec<RemoteRigEntry>>>,
/// Cached satellite pass predictions from the server (GetSatPasses).
pub sat_passes: Arc<RwLock<Option<trx_core::geo::PassPredictionResult>>>,
/// Per-rig state watch channels, keyed by rig_id.
/// Populated by the remote client poll loop so each SSE session can
/// subscribe to a specific rig's state independently.
pub rig_states: Arc<RwLock<HashMap<String, watch::Sender<RigState>>>>,
/// Owner callsign from trx-client config/CLI for frontend display.
pub owner_callsign: Option<String>,
/// Optional website URL for the web UI header title link.
pub owner_website_url: Option<String>,
/// Optional website name for the web UI header title label.
pub owner_website_name: Option<String>,
/// Optional base URL used to link AIS vessel names as `<base><mmsi>`.
pub ais_vessel_url_base: Option<String>,
/// Spectrum sender; SSE clients subscribe via `spectrum.subscribe()`.
pub spectrum: Arc<watch::Sender<SharedSpectrum>>,
/// Per-rig spectrum watch channels, keyed by rig_id.
/// Populated by the remote client spectrum polling task so each SSE
/// session can subscribe to a specific rig's spectrum independently.
pub rig_spectrums: Arc<RwLock<HashMap<String, watch::Sender<SharedSpectrum>>>>,
/// Per-rig RX audio broadcast senders, keyed by rig_id.
/// Each rig's audio client task publishes Opus frames here.
pub rig_audio_rx: Arc<RwLock<HashMap<String, broadcast::Sender<Bytes>>>>,
/// Per-rig audio stream info watch channels, keyed by rig_id.
pub rig_audio_info: Arc<RwLock<HashMap<String, watch::Sender<Option<AudioStreamInfo>>>>>,
/// Per-rig virtual-channel command senders, keyed by rig_id.
pub rig_vchan_audio_cmd: Arc<RwLock<HashMap<String, mpsc::Sender<VChanAudioCmd>>>>,
/// Per-virtual-channel Opus audio senders.
/// Key: server-side virtual channel UUID.
/// Value: `broadcast::Sender<Bytes>` that receives per-channel Opus packets
/// forwarded by the audio-client task from `AUDIO_MSG_RX_FRAME_CH` frames.
pub vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
/// Channel to send `VChanAudioCmd` to the audio-client task, which in turn
/// forwards `VCHAN_SUB` / `VCHAN_UNSUB` frames over the audio TCP connection.
/// `None` when no audio connection is active.
pub vchan_audio_cmd: Arc<Mutex<Option<mpsc::Sender<VChanAudioCmd>>>>,
/// Broadcast sender that fires whenever the server destroys a virtual
/// channel (e.g. out-of-bandwidth after center-frequency retune).
/// The HTTP frontend subscribes to clean up `ClientChannelManager`.
pub vchan_destroyed: Option<broadcast::Sender<Uuid>>,
/// Whether the remote client currently has an active TCP connection to
/// trx-server. Set to `true` on successful connect, `false` on drop.
/// Whether the remote client currently has an active TCP connection.
pub server_connected: Arc<AtomicBool>,
/// Per-rig server connection state, keyed by short name (or rig_id in legacy mode).
/// `true` while the rig's trx-server connection is active.
/// Allows the UI to freeze only the rig that lost its connection.
/// Per-rig server connection state.
pub rig_server_connected: Arc<RwLock<HashMap<String, bool>>>,
}
impl Default for RigRoutingContext {
fn default() -> Self {
Self {
active_rig_id: Arc::new(Mutex::new(None)),
remote_rigs: Arc::new(Mutex::new(Vec::new())),
sat_passes: Arc::new(RwLock::new(None)),
rig_states: Arc::new(RwLock::new(HashMap::new())),
server_connected: Arc::new(AtomicBool::new(false)),
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
}
}
}
/// Owner/station metadata for frontend display.
#[derive(Default)]
pub struct OwnerInfo {
pub callsign: Option<String>,
pub website_url: Option<String>,
pub website_name: Option<String>,
pub ais_vessel_url_base: Option<String>,
}
/// Virtual channel audio management.
pub struct VChanContext {
/// Per-virtual-channel Opus audio senders.
pub audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
/// Channel to send `VChanAudioCmd` to the audio-client task.
pub audio_cmd: Arc<Mutex<Option<mpsc::Sender<VChanAudioCmd>>>>,
/// Broadcast sender that fires when the server destroys a virtual channel.
pub destroyed: Option<broadcast::Sender<Uuid>>,
/// Per-rig virtual-channel command senders.
pub rig_audio_cmd: Arc<RwLock<HashMap<String, mpsc::Sender<VChanAudioCmd>>>>,
}
impl Default for VChanContext {
fn default() -> Self {
Self {
audio: Arc::new(RwLock::new(HashMap::new())),
audio_cmd: Arc::new(Mutex::new(None)),
destroyed: None,
rig_audio_cmd: Arc::new(RwLock::new(HashMap::new())),
}
}
}
/// Spectrum data management.
pub struct SpectrumContext {
/// Spectrum sender; SSE clients subscribe via `sender.subscribe()`.
pub sender: Arc<watch::Sender<SharedSpectrum>>,
/// Per-rig spectrum watch channels, keyed by rig_id.
pub per_rig: Arc<RwLock<HashMap<String, watch::Sender<SharedSpectrum>>>>,
}
impl Default for SpectrumContext {
fn default() -> Self {
Self {
sender: {
let (tx, _rx) = watch::channel(SharedSpectrum::default());
Arc::new(tx)
},
per_rig: Arc::new(RwLock::new(HashMap::new())),
}
}
}
/// Per-rig audio channels for multi-rig setups.
pub struct PerRigAudioContext {
/// Per-rig RX audio broadcast senders.
pub rx: Arc<RwLock<HashMap<String, broadcast::Sender<Bytes>>>>,
/// Per-rig audio stream info watch channels.
pub info: Arc<RwLock<HashMap<String, watch::Sender<Option<AudioStreamInfo>>>>>,
}
impl Default for PerRigAudioContext {
fn default() -> Self {
Self {
rx: Arc::new(RwLock::new(HashMap::new())),
info: Arc::new(RwLock::new(HashMap::new())),
}
}
}
/// Runtime context for frontend operation.
///
/// Decomposed into coherent sub-structs to improve readability and allow
/// frontends to access only the context groups they need.
pub struct FrontendRuntimeContext {
/// Audio streaming channels.
pub audio: AudioContext,
/// Decode history for all decoder types.
pub decode_history: DecodeHistoryContext,
/// HTTP authentication configuration.
pub http_auth: HttpAuthConfig,
/// HTTP UI display configuration.
pub http_ui: HttpUiConfig,
/// Remote rig routing and state.
pub routing: RigRoutingContext,
/// Owner/station metadata.
pub owner: OwnerInfo,
/// Virtual channel management.
pub vchan: VChanContext,
/// Spectrum data.
pub spectrum: SpectrumContext,
/// Per-rig audio channels.
pub rig_audio: PerRigAudioContext,
/// Active HTTP SSE clients.
pub sse_clients: Arc<AtomicUsize>,
/// Active rigctl TCP clients.
pub rigctl_clients: Arc<AtomicUsize>,
/// rigctl listen endpoint, if enabled.
pub rigctl_listen_addr: Arc<Mutex<Option<SocketAddr>>>,
/// Guard to avoid spawning duplicate decode collectors.
pub decode_collector_started: AtomicBool,
}
impl FrontendRuntimeContext {
/// Get a watch receiver for a specific rig's state.
pub fn rig_state_rx(&self, rig_id: &str) -> Option<watch::Receiver<RigState>> {
self.rig_states
self.routing
.rig_states
.read()
.ok()
.and_then(|map| map.get(rig_id).map(|tx| tx.subscribe()))
@@ -321,13 +444,13 @@ impl FrontendRuntimeContext {
/// Get a watch receiver for a specific rig's spectrum.
/// Lazily inserts a new channel if the rig_id is not yet present.
pub fn rig_spectrum_rx(&self, rig_id: &str) -> watch::Receiver<SharedSpectrum> {
if let Ok(map) = self.rig_spectrums.read() {
if let Ok(map) = self.spectrum.per_rig.read() {
if let Some(tx) = map.get(rig_id) {
return tx.subscribe();
}
}
// Insert on miss.
if let Ok(mut map) = self.rig_spectrums.write() {
if let Ok(mut map) = self.spectrum.per_rig.write() {
map.entry(rig_id.to_string())
.or_insert_with(|| watch::channel(SharedSpectrum::default()).0)
.subscribe()
@@ -339,7 +462,8 @@ impl FrontendRuntimeContext {
/// Subscribe to a specific rig's RX audio broadcast.
pub fn rig_audio_subscribe(&self, rig_id: &str) -> Option<broadcast::Receiver<Bytes>> {
self.rig_audio_rx
self.rig_audio
.rx
.read()
.ok()
.and_then(|map| map.get(rig_id).map(|tx| tx.subscribe()))
@@ -350,7 +474,8 @@ impl FrontendRuntimeContext {
&self,
rig_id: &str,
) -> Option<watch::Receiver<Option<AudioStreamInfo>>> {
self.rig_audio_info
self.rig_audio
.info
.read()
.ok()
.and_then(|map| map.get(rig_id).map(|tx| tx.subscribe()))
@@ -359,59 +484,19 @@ impl FrontendRuntimeContext {
/// Create a new empty runtime context.
pub fn new() -> Self {
Self {
audio_rx: None,
audio_tx: None,
audio_info: None,
decode_rx: None,
ais_history: Arc::new(Mutex::new(VecDeque::new())),
vdes_history: Arc::new(Mutex::new(VecDeque::new())),
aprs_history: Arc::new(Mutex::new(VecDeque::new())),
hf_aprs_history: Arc::new(Mutex::new(VecDeque::new())),
cw_history: Arc::new(Mutex::new(VecDeque::new())),
ft8_history: Arc::new(Mutex::new(VecDeque::new())),
ft4_history: Arc::new(Mutex::new(VecDeque::new())),
ft2_history: Arc::new(Mutex::new(VecDeque::new())),
wspr_history: Arc::new(Mutex::new(VecDeque::new())),
auth_tokens: HashSet::new(),
audio: AudioContext::default(),
decode_history: DecodeHistoryContext::default(),
http_auth: HttpAuthConfig::default(),
http_ui: HttpUiConfig::default(),
routing: RigRoutingContext::default(),
owner: OwnerInfo::default(),
vchan: VChanContext::default(),
spectrum: SpectrumContext::default(),
rig_audio: PerRigAudioContext::default(),
sse_clients: Arc::new(AtomicUsize::new(0)),
rigctl_clients: Arc::new(AtomicUsize::new(0)),
audio_clients: Arc::new(AtomicUsize::new(0)),
rigctl_listen_addr: Arc::new(Mutex::new(None)),
decode_collector_started: AtomicBool::new(false),
http_auth_enabled: false,
http_auth_rx_passphrase: None,
http_auth_control_passphrase: None,
http_auth_tx_access_control_enabled: true,
http_auth_session_ttl_secs: 480 * 60,
http_auth_cookie_secure: false,
http_auth_cookie_same_site: "Lax".to_string(),
http_show_sdr_gain_control: true,
http_initial_map_zoom: 10,
http_spectrum_coverage_margin_hz: 50_000,
http_spectrum_usable_span_ratio: 0.92,
http_decode_history_retention_min: 24 * 60,
http_decode_history_retention_min_by_rig: HashMap::new(),
remote_active_rig_id: Arc::new(Mutex::new(None)),
remote_rigs: Arc::new(Mutex::new(Vec::new())),
sat_passes: Arc::new(RwLock::new(None)),
rig_states: Arc::new(RwLock::new(HashMap::new())),
owner_callsign: None,
owner_website_url: None,
owner_website_name: None,
ais_vessel_url_base: None,
spectrum: {
let (tx, _rx) = watch::channel(SharedSpectrum::default());
Arc::new(tx)
},
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_audio_rx: Arc::new(RwLock::new(HashMap::new())),
rig_audio_info: Arc::new(RwLock::new(HashMap::new())),
rig_vchan_audio_cmd: Arc::new(RwLock::new(HashMap::new())),
vchan_audio: Arc::new(RwLock::new(HashMap::new())),
vchan_audio_cmd: Arc::new(Mutex::new(None)),
vchan_destroyed: None,
server_connected: Arc::new(AtomicBool::new(false)),
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
}
}
}
@@ -129,7 +129,7 @@ async fn handle_client(
}
if let Some(rig_id) = envelope.rig_id.as_ref() {
if let Ok(mut active) = context.remote_active_rig_id.lock() {
if let Ok(mut active) = context.routing.active_rig_id.lock() {
*active = Some(rig_id.clone());
}
}
@@ -148,7 +148,8 @@ async fn handle_client(
}
let active_rig_id = context
.remote_active_rig_id
.routing
.active_rig_id
.lock()
.ok()
.and_then(|v| v.clone());
@@ -245,6 +246,7 @@ async fn handle_client(
fn snapshot_remote_rigs(context: &FrontendRuntimeContext) -> Vec<RigEntry> {
context
.routing
.remote_rigs
.lock()
.ok()
@@ -333,7 +335,7 @@ async fn send_response(
}
fn authorize(token: &Option<String>, context: &FrontendRuntimeContext) -> Result<(), String> {
let validator = SimpleTokenValidator::new(context.auth_tokens.clone());
let validator = SimpleTokenValidator::new(context.http_auth.tokens.clone());
validator.validate(token)
}
@@ -436,7 +438,7 @@ mod tests {
let addr = loopback_addr();
let (rig_tx, _rig_rx) = mpsc::channel::<RigRequest>(8);
let mut runtime = FrontendRuntimeContext::new();
runtime.auth_tokens = HashSet::from(["secret".to_string()]);
runtime.http_auth.tokens = HashSet::from(["secret".to_string()]);
let ctx = Arc::new(runtime);
let handle = tokio::spawn(serve(addr, rig_tx, ctx));
@@ -211,16 +211,17 @@ fn frontend_meta_from_context(
let server_connected = rig_id
.and_then(|rid| {
context
.routing
.rig_server_connected
.read()
.ok()
.and_then(|m| m.get(rid).copied())
})
.unwrap_or_else(|| context.server_connected.load(Ordering::Relaxed));
.unwrap_or_else(|| context.routing.server_connected.load(Ordering::Relaxed));
FrontendMeta {
http_clients,
rigctl_clients: context.rigctl_clients.load(Ordering::Relaxed),
audio_clients: context.audio_clients.load(Ordering::Relaxed),
audio_clients: context.audio.clients.load(Ordering::Relaxed),
rigctl_addr: rigctl_addr_from_context(context),
active_remote: active_rig_id_from_context(context),
remotes: rig_ids_from_context(context),
@@ -248,7 +249,8 @@ fn rigctl_addr_from_context(context: &FrontendRuntimeContext) -> Option<String>
fn active_rig_id_from_context(context: &FrontendRuntimeContext) -> Option<String> {
context
.remote_active_rig_id
.routing
.active_rig_id
.lock()
.ok()
.and_then(|v| v.clone())
@@ -256,6 +258,7 @@ fn active_rig_id_from_context(context: &FrontendRuntimeContext) -> Option<String
fn rig_ids_from_context(context: &FrontendRuntimeContext) -> Vec<String> {
context
.routing
.remote_rigs
.lock()
.ok()
@@ -264,41 +267,42 @@ fn rig_ids_from_context(context: &FrontendRuntimeContext) -> Vec<String> {
}
fn owner_callsign_from_context(context: &FrontendRuntimeContext) -> Option<String> {
context.owner_callsign.clone()
context.owner.callsign.clone()
}
fn owner_website_url_from_context(context: &FrontendRuntimeContext) -> Option<String> {
context.owner_website_url.clone()
context.owner.website_url.clone()
}
fn owner_website_name_from_context(context: &FrontendRuntimeContext) -> Option<String> {
context.owner_website_name.clone()
context.owner.website_name.clone()
}
fn ais_vessel_url_base_from_context(context: &FrontendRuntimeContext) -> Option<String> {
context.ais_vessel_url_base.clone()
context.owner.ais_vessel_url_base.clone()
}
fn show_sdr_gain_control_from_context(context: &FrontendRuntimeContext) -> bool {
context.http_show_sdr_gain_control
context.http_ui.show_sdr_gain_control
}
fn initial_map_zoom_from_context(context: &FrontendRuntimeContext) -> u8 {
context.http_initial_map_zoom
context.http_ui.initial_map_zoom
}
fn spectrum_coverage_margin_hz_from_context(context: &FrontendRuntimeContext) -> u32 {
context.http_spectrum_coverage_margin_hz
context.http_ui.spectrum_coverage_margin_hz
}
fn spectrum_usable_span_ratio_from_context(context: &FrontendRuntimeContext) -> f32 {
context.http_spectrum_usable_span_ratio
context.http_ui.spectrum_usable_span_ratio
}
fn decode_history_retention_min_from_context(context: &FrontendRuntimeContext) -> u64 {
let default_minutes = context.http_decode_history_retention_min.max(1);
let default_minutes = context.http_ui.decode_history_retention_min.max(1);
let Some(active_rig_id) = context
.remote_active_rig_id
.routing
.active_rig_id
.lock()
.ok()
.and_then(|v| v.clone())
@@ -306,7 +310,8 @@ fn decode_history_retention_min_from_context(context: &FrontendRuntimeContext) -
return default_minutes;
};
context
.http_decode_history_retention_min_by_rig
.http_ui
.decode_history_retention_min_by_rig
.get(&active_rig_id)
.copied()
.filter(|minutes| *minutes > 0)
@@ -343,7 +348,8 @@ pub async fn events(
// rig it has selected without mutating global state.
let active_rig_id = query.remote.clone().filter(|s| !s.is_empty()).or_else(|| {
context
.remote_active_rig_id
.routing
.active_rig_id
.lock()
.ok()
.and_then(|g| g.clone())
@@ -419,7 +425,8 @@ pub async fn events(
state.snapshot().and_then(|v| {
let rig_id_opt = session_rig_mgr.get_rig(session_id).or_else(|| {
context
.remote_active_rig_id
.routing
.active_rig_id
.lock()
.ok()
.and_then(|g| g.clone())
@@ -687,7 +694,7 @@ pub async fn decode_history(
context: web::Data<Arc<FrontendRuntimeContext>>,
query: web::Query<RemoteQuery>,
) -> impl Responder {
if context.decode_rx.is_none() {
if context.audio.decode_rx.is_none() {
return HttpResponse::NotFound().body("decode not enabled");
}
let rig_filter = query.remote.as_deref().filter(|s| !s.is_empty());
@@ -807,7 +814,7 @@ pub async fn spectrum(
let rx = if let Some(ref remote) = query.remote {
context.rig_spectrum_rx(remote)
} else {
context.spectrum.subscribe()
context.spectrum.sender.subscribe()
};
let mut last_rds_json: Option<String> = None;
let mut last_vchan_rds_json: Option<String> = None;
@@ -1351,7 +1358,7 @@ struct SatPassesResponse {
/// are not yet available.
#[get("/sat_passes")]
pub async fn sat_passes(context: web::Data<Arc<FrontendRuntimeContext>>) -> impl Responder {
let cached = context.sat_passes.read().ok().and_then(|g| g.clone());
let cached = context.routing.sat_passes.read().ok().and_then(|g| g.clone());
match cached {
Some(result) => {
let error = match result.tle_source {
@@ -1901,6 +1908,7 @@ struct RigListResponse {
fn build_rig_list_payload(context: &FrontendRuntimeContext) -> RigListResponse {
let active_remote = active_rig_id_from_context(context);
let rigs = context
.routing
.remote_rigs
.lock()
.ok()
@@ -1952,6 +1960,7 @@ pub async fn select_rig(
}
let known = context
.routing
.remote_rigs
.lock()
.ok()
@@ -36,15 +36,17 @@ fn current_timestamp_ms() -> i64 {
}
fn decode_history_retention(context: &FrontendRuntimeContext) -> Duration {
let default_minutes = context.http_decode_history_retention_min.max(1);
let default_minutes = context.http_ui.decode_history_retention_min.max(1);
let minutes = context
.remote_active_rig_id
.routing
.active_rig_id
.lock()
.ok()
.and_then(|v| v.clone())
.and_then(|rig_id| {
context
.http_decode_history_retention_min_by_rig
.http_ui
.decode_history_retention_min_by_rig
.get(&rig_id)
.copied()
})
@@ -111,7 +113,8 @@ fn prune_vdes_history(
fn active_rig_id(context: &FrontendRuntimeContext) -> Option<String> {
context
.remote_active_rig_id
.routing
.active_rig_id
.lock()
.ok()
.and_then(|g| g.clone())
@@ -123,7 +126,7 @@ fn record_ais(context: &FrontendRuntimeContext, mut msg: AisMessage) {
}
let rig_id = msg.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.ais_history
.decode_history.ais
.lock()
.expect("ais history mutex poisoned");
history.push_back((Instant::now(), rig_id, msg));
@@ -136,7 +139,7 @@ fn record_vdes(context: &FrontendRuntimeContext, mut msg: VdesMessage) {
}
let rig_id = msg.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.vdes_history
.decode_history.vdes
.lock()
.expect("vdes history mutex poisoned");
history.push_back((Instant::now(), rig_id, msg));
@@ -214,7 +217,7 @@ fn record_aprs(context: &FrontendRuntimeContext, mut pkt: AprsPacket) {
}
let rig_id = pkt.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.aprs_history
.decode_history.aprs
.lock()
.expect("aprs history mutex poisoned");
history.push_back((Instant::now(), rig_id, pkt));
@@ -227,7 +230,7 @@ fn record_hf_aprs(context: &FrontendRuntimeContext, mut pkt: AprsPacket) {
}
let rig_id = pkt.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.hf_aprs_history
.decode_history.hf_aprs
.lock()
.expect("hf_aprs history mutex poisoned");
history.push_back((Instant::now(), rig_id, pkt));
@@ -237,7 +240,7 @@ fn record_hf_aprs(context: &FrontendRuntimeContext, mut pkt: AprsPacket) {
fn record_cw(context: &FrontendRuntimeContext, event: CwEvent) {
let rig_id = event.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.cw_history
.decode_history.cw
.lock()
.expect("cw history mutex poisoned");
history.push_back((Instant::now(), rig_id, event));
@@ -247,7 +250,7 @@ fn record_cw(context: &FrontendRuntimeContext, event: CwEvent) {
fn record_ft8(context: &FrontendRuntimeContext, msg: Ft8Message) {
let rig_id = msg.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.ft8_history
.decode_history.ft8
.lock()
.expect("ft8 history mutex poisoned");
history.push_back((Instant::now(), rig_id, msg));
@@ -257,7 +260,7 @@ fn record_ft8(context: &FrontendRuntimeContext, msg: Ft8Message) {
fn record_ft4(context: &FrontendRuntimeContext, msg: Ft8Message) {
let rig_id = msg.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.ft4_history
.decode_history.ft4
.lock()
.expect("ft4 history mutex poisoned");
history.push_back((Instant::now(), rig_id, msg));
@@ -267,7 +270,7 @@ fn record_ft4(context: &FrontendRuntimeContext, msg: Ft8Message) {
fn record_ft2(context: &FrontendRuntimeContext, msg: Ft8Message) {
let rig_id = msg.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.ft2_history
.decode_history.ft2
.lock()
.expect("ft2 history mutex poisoned");
history.push_back((Instant::now(), rig_id, msg));
@@ -277,7 +280,7 @@ fn record_ft2(context: &FrontendRuntimeContext, msg: Ft8Message) {
fn record_wspr(context: &FrontendRuntimeContext, msg: WsprMessage) {
let rig_id = msg.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.wspr_history
.decode_history.wspr
.lock()
.expect("wspr history mutex poisoned");
history.push_back((Instant::now(), rig_id, msg));
@@ -298,7 +301,7 @@ pub fn snapshot_aprs_history(
rig_filter: Option<&str>,
) -> Vec<AprsPacket> {
let mut history = context
.aprs_history
.decode_history.aprs
.lock()
.expect("aprs history mutex poisoned");
prune_aprs_history(context, &mut history);
@@ -314,7 +317,7 @@ pub fn snapshot_hf_aprs_history(
rig_filter: Option<&str>,
) -> Vec<AprsPacket> {
let mut history = context
.hf_aprs_history
.decode_history.hf_aprs
.lock()
.expect("hf_aprs history mutex poisoned");
prune_hf_aprs_history(context, &mut history);
@@ -337,7 +340,7 @@ pub fn snapshot_ais_history(
rig_filter: Option<&str>,
) -> Vec<AisMessage> {
let mut history = context
.ais_history
.decode_history.ais
.lock()
.expect("ais history mutex poisoned");
prune_ais_history(context, &mut history);
@@ -359,7 +362,7 @@ pub fn snapshot_vdes_history(
rig_filter: Option<&str>,
) -> Vec<VdesMessage> {
let mut history = context
.vdes_history
.decode_history.vdes
.lock()
.expect("vdes history mutex poisoned");
prune_vdes_history(context, &mut history);
@@ -375,7 +378,7 @@ pub fn snapshot_cw_history(
rig_filter: Option<&str>,
) -> Vec<CwEvent> {
let mut history = context
.cw_history
.decode_history.cw
.lock()
.expect("cw history mutex poisoned");
prune_cw_history(context, &mut history);
@@ -391,7 +394,7 @@ pub fn snapshot_ft8_history(
rig_filter: Option<&str>,
) -> Vec<Ft8Message> {
let mut history = context
.ft8_history
.decode_history.ft8
.lock()
.expect("ft8 history mutex poisoned");
prune_ft8_history(context, &mut history);
@@ -407,7 +410,7 @@ pub fn snapshot_ft4_history(
rig_filter: Option<&str>,
) -> Vec<Ft8Message> {
let mut history = context
.ft4_history
.decode_history.ft4
.lock()
.expect("ft4 history mutex poisoned");
prune_ft4_history(context, &mut history);
@@ -423,7 +426,7 @@ pub fn snapshot_ft2_history(
rig_filter: Option<&str>,
) -> Vec<Ft8Message> {
let mut history = context
.ft2_history
.decode_history.ft2
.lock()
.expect("ft2 history mutex poisoned");
prune_ft2_history(context, &mut history);
@@ -439,7 +442,7 @@ pub fn snapshot_wspr_history(
rig_filter: Option<&str>,
) -> Vec<WsprMessage> {
let mut history = context
.wspr_history
.decode_history.wspr
.lock()
.expect("wspr history mutex poisoned");
prune_wspr_history(context, &mut history);
@@ -452,7 +455,7 @@ pub fn snapshot_wspr_history(
pub fn clear_aprs_history(context: &FrontendRuntimeContext) {
let mut history = context
.aprs_history
.decode_history.aprs
.lock()
.expect("aprs history mutex poisoned");
history.clear();
@@ -460,7 +463,7 @@ pub fn clear_aprs_history(context: &FrontendRuntimeContext) {
pub fn clear_hf_aprs_history(context: &FrontendRuntimeContext) {
let mut history = context
.hf_aprs_history
.decode_history.hf_aprs
.lock()
.expect("hf_aprs history mutex poisoned");
history.clear();
@@ -468,7 +471,7 @@ pub fn clear_hf_aprs_history(context: &FrontendRuntimeContext) {
pub fn clear_ais_history(context: &FrontendRuntimeContext) {
let mut history = context
.ais_history
.decode_history.ais
.lock()
.expect("ais history mutex poisoned");
history.clear();
@@ -476,7 +479,7 @@ pub fn clear_ais_history(context: &FrontendRuntimeContext) {
pub fn clear_vdes_history(context: &FrontendRuntimeContext) {
let mut history = context
.vdes_history
.decode_history.vdes
.lock()
.expect("vdes history mutex poisoned");
history.clear();
@@ -484,7 +487,7 @@ pub fn clear_vdes_history(context: &FrontendRuntimeContext) {
pub fn clear_cw_history(context: &FrontendRuntimeContext) {
let mut history = context
.cw_history
.decode_history.cw
.lock()
.expect("cw history mutex poisoned");
history.clear();
@@ -492,7 +495,7 @@ pub fn clear_cw_history(context: &FrontendRuntimeContext) {
pub fn clear_ft8_history(context: &FrontendRuntimeContext) {
let mut history = context
.ft8_history
.decode_history.ft8
.lock()
.expect("ft8 history mutex poisoned");
history.clear();
@@ -500,7 +503,7 @@ pub fn clear_ft8_history(context: &FrontendRuntimeContext) {
pub fn clear_ft4_history(context: &FrontendRuntimeContext) {
let mut history = context
.ft4_history
.decode_history.ft4
.lock()
.expect("ft4 history mutex poisoned");
history.clear();
@@ -508,7 +511,7 @@ pub fn clear_ft4_history(context: &FrontendRuntimeContext) {
pub fn clear_ft2_history(context: &FrontendRuntimeContext) {
let mut history = context
.ft2_history
.decode_history.ft2
.lock()
.expect("ft2 history mutex poisoned");
history.clear();
@@ -516,7 +519,7 @@ pub fn clear_ft2_history(context: &FrontendRuntimeContext) {
pub fn clear_wspr_history(context: &FrontendRuntimeContext) {
let mut history = context
.wspr_history
.decode_history.wspr
.lock()
.expect("wspr history mutex poisoned");
history.clear();
@@ -525,7 +528,7 @@ pub fn clear_wspr_history(context: &FrontendRuntimeContext) {
pub fn subscribe_decode(
context: &FrontendRuntimeContext,
) -> Option<broadcast::Receiver<DecodedMessage>> {
context.decode_rx.as_ref().map(|tx| tx.subscribe())
context.audio.decode_rx.as_ref().map(|tx| tx.subscribe())
}
pub fn start_decode_history_collector(context: Arc<FrontendRuntimeContext>) {
@@ -536,7 +539,7 @@ pub fn start_decode_history_collector(context: Arc<FrontendRuntimeContext>) {
return;
}
let Some(tx) = context.decode_rx.as_ref().cloned() else {
let Some(tx) = context.audio.decode_rx.as_ref().cloned() else {
return;
};
@@ -576,7 +579,7 @@ pub async fn audio_ws(
query: web::Query<AudioQuery>,
context: web::Data<Arc<FrontendRuntimeContext>>,
) -> Result<HttpResponse, Error> {
let Some(tx_sender) = context.audio_tx.as_ref().cloned() else {
let Some(tx_sender) = context.audio.tx.as_ref().cloned() else {
return Ok(HttpResponse::NotFound().body("audio not enabled"));
};
@@ -596,14 +599,14 @@ pub async fn audio_ws(
let info_rx = if let Some(ref remote) = query.remote {
context.rig_audio_info_rx(remote)
} else {
context.audio_info.as_ref().cloned()
context.audio.info.as_ref().cloned()
};
let Some(info_rx) = info_rx else {
return Ok(HttpResponse::NotFound().body("audio not enabled"));
};
let deadline = Instant::now() + Duration::from_secs(2);
let rx_sub = loop {
match context.vchan_audio.read() {
match context.vchan.audio.read() {
Ok(map) => {
if let Some(tx) = map.get(&ch_id) {
break tx.subscribe();
@@ -639,10 +642,10 @@ pub async fn audio_ws(
};
(rx_sub, info_rx)
} else {
let Some(info_rx) = context.audio_info.as_ref().cloned() else {
let Some(info_rx) = context.audio.info.as_ref().cloned() else {
return Ok(HttpResponse::NotFound().body("audio not enabled"));
};
let Some(rx) = context.audio_rx.as_ref() else {
let Some(rx) = context.audio.rx.as_ref() else {
return Ok(HttpResponse::NotFound().body("audio not enabled"));
};
(rx.subscribe(), info_rx)
@@ -651,7 +654,7 @@ pub async fn audio_ws(
let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
let audio_clients = context.audio_clients.clone();
let audio_clients = context.audio.clients.clone();
audio_clients.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
actix_web::rt::spawn(async move {
@@ -5,7 +5,8 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use tokio::sync::RwLock;
use std::time::Duration;
use actix_web::{delete, get, put, web, HttpResponse, Responder};
@@ -115,18 +116,18 @@ impl BackgroundDecodeStore {
.unwrap_or_else(|| PathBuf::from("background_decode.db"))
}
pub fn get(&self, rig_id: &str) -> Option<BackgroundDecodeConfig> {
let db = self.db.read().unwrap_or_else(|e| e.into_inner());
pub async fn get(&self, rig_id: &str) -> Option<BackgroundDecodeConfig> {
let db = self.db.read().await;
db.get::<BackgroundDecodeConfig>(&format!("bgd:{rig_id}"))
}
pub fn upsert(&self, config: &BackgroundDecodeConfig) -> bool {
let mut db = self.db.write().unwrap_or_else(|e| e.into_inner());
pub async fn upsert(&self, config: &BackgroundDecodeConfig) -> bool {
let mut db = self.db.write().await;
db.set(&format!("bgd:{}", config.rig_id), config).is_ok()
}
pub fn remove(&self, rig_id: &str) -> bool {
let mut db = self.db.write().unwrap_or_else(|e| e.into_inner());
pub async fn remove(&self, rig_id: &str) -> bool {
let mut db = self.db.write().await;
db.rem(&format!("bgd:{rig_id}")).unwrap_or(false)
}
}
@@ -171,9 +172,10 @@ impl BackgroundDecodeManager {
});
}
pub fn get_config(&self, rig_id: &str) -> BackgroundDecodeConfig {
pub async fn get_config(&self, rig_id: &str) -> BackgroundDecodeConfig {
self.store
.get(rig_id)
.await
.unwrap_or_else(|| BackgroundDecodeConfig {
rig_id: rig_id.to_string(),
enabled: false,
@@ -181,9 +183,9 @@ impl BackgroundDecodeManager {
})
}
pub fn put_config(&self, mut config: BackgroundDecodeConfig) -> Option<BackgroundDecodeConfig> {
pub async fn put_config(&self, mut config: BackgroundDecodeConfig) -> Option<BackgroundDecodeConfig> {
config.bookmark_ids = dedup_ids(&config.bookmark_ids);
if self.store.upsert(&config) {
if self.store.upsert(&config).await {
self.trigger();
Some(config)
} else {
@@ -191,19 +193,20 @@ impl BackgroundDecodeManager {
}
}
pub fn reset_config(&self, rig_id: &str) -> bool {
let removed = self.store.remove(rig_id);
pub async fn reset_config(&self, rig_id: &str) -> bool {
let removed = self.store.remove(rig_id).await;
self.trigger();
removed
}
pub fn status(&self, rig_id: &str) -> BackgroundDecodeStatus {
if let Ok(status) = self.status.read() {
pub async fn status(&self, rig_id: &str) -> BackgroundDecodeStatus {
{
let status = self.status.read().await;
if let Some(entry) = status.get(rig_id) {
return entry.clone();
}
}
let cfg = self.get_config(rig_id);
let cfg = self.get_config(rig_id).await;
let bookmarks: HashMap<String, Bookmark> = self
.bookmarks
.list_for_rig(rig_id)
@@ -243,7 +246,8 @@ impl BackgroundDecodeManager {
fn active_rig_id(&self) -> Option<String> {
self.context
.remote_active_rig_id
.routing
.active_rig_id
.lock()
.ok()
.and_then(|guard| guard.clone())
@@ -252,7 +256,7 @@ impl BackgroundDecodeManager {
fn send_audio_cmd(&self, cmd: VChanAudioCmd) {
// Route through per-rig sender when available.
if let Some(rig_id) = self.active_rig_id() {
if let Ok(map) = self.context.rig_vchan_audio_cmd.read() {
if let Ok(map) = self.context.vchan.rig_audio_cmd.read() {
if let Some(tx) = map.get(&rig_id) {
let _ = tx.try_send(cmd);
return;
@@ -260,7 +264,7 @@ impl BackgroundDecodeManager {
}
}
// Fall back to global sender.
if let Ok(guard) = self.context.vchan_audio_cmd.lock() {
if let Ok(guard) = self.context.vchan.audio_cmd.lock() {
if let Some(tx) = guard.as_ref() {
let _ = tx.try_send(cmd);
}
@@ -316,15 +320,14 @@ impl BackgroundDecodeManager {
.any(|channel| channel_matches_bookmark(&channel, bookmark))
}
fn reconcile(&self, runtime: &mut BackgroundRuntimeState, spectrum: &SharedSpectrum) {
async fn reconcile(&self, runtime: &mut BackgroundRuntimeState, spectrum: &SharedSpectrum) {
let active_rig_id = self.active_rig_id();
if runtime.current_rig_id != active_rig_id {
if let Some(prev_rig_id) = runtime.current_rig_id.clone() {
if let Ok(mut guard) = self.status.write() {
if let Some(prev_status) = guard.get_mut(&prev_rig_id) {
prev_status.active_rig = false;
}
let mut guard = self.status.write().await;
if let Some(prev_status) = guard.get_mut(&prev_rig_id) {
prev_status.active_rig = false;
}
}
self.clear_runtime_channels(runtime);
@@ -335,7 +338,7 @@ impl BackgroundDecodeManager {
};
runtime.current_rig_id = Some(rig_id.clone());
let config = self.get_config(&rig_id);
let config = self.get_config(&rig_id).await;
let selected = dedup_ids(&config.bookmark_ids);
let users_connected = self.context.sse_clients.load(Ordering::Relaxed) > 0;
let scheduler_has_control = self.scheduler_control.scheduler_allowed() && users_connected;
@@ -467,19 +470,18 @@ impl BackgroundDecodeManager {
runtime.active_channels.insert(bookmark_id, desired);
}
if let Ok(mut guard) = self.status.write() {
guard.insert(
rig_id.clone(),
BackgroundDecodeStatus {
rig_id,
enabled: config.enabled,
active_rig: true,
center_hz,
sample_rate,
entries: statuses,
},
);
}
let mut guard = self.status.write().await;
guard.insert(
rig_id.clone(),
BackgroundDecodeStatus {
rig_id,
enabled: config.enabled,
active_rig: true,
center_hz,
sample_rate,
entries: statuses,
},
);
}
fn scheduler_bookmark_ids(&self, rig_id: &str) -> Vec<String> {
@@ -513,7 +515,7 @@ impl BackgroundDecodeManager {
loop {
let users_connected = self.context.sse_clients.load(Ordering::Relaxed) > 0;
if users_connected && spectrum_rx.is_none() {
spectrum_rx = Some(self.context.spectrum.subscribe());
spectrum_rx = Some(self.context.spectrum.sender.subscribe());
} else if !users_connected {
spectrum_rx = None;
}
@@ -522,7 +524,7 @@ impl BackgroundDecodeManager {
.as_ref()
.map(|rx| rx.borrow().clone())
.unwrap_or_default();
self.reconcile(&mut runtime, &spectrum);
self.reconcile(&mut runtime, &spectrum).await;
tokio::select! {
changed = async {
match spectrum_rx.as_mut() {
@@ -599,7 +601,7 @@ pub async fn get_background_decode(
path: web::Path<String>,
manager: web::Data<Arc<BackgroundDecodeManager>>,
) -> impl Responder {
HttpResponse::Ok().json(manager.get_config(&path.into_inner()))
HttpResponse::Ok().json(manager.get_config(&path.into_inner()).await)
}
#[put("/background-decode/{rig_id}")]
@@ -611,7 +613,7 @@ pub async fn put_background_decode(
let rig_id = path.into_inner();
let mut config = body.into_inner();
config.rig_id = rig_id;
match manager.put_config(config) {
match manager.put_config(config).await {
Some(saved) => HttpResponse::Ok().json(saved),
None => HttpResponse::InternalServerError().body("failed to save background decode config"),
}
@@ -623,7 +625,7 @@ pub async fn delete_background_decode(
manager: web::Data<Arc<BackgroundDecodeManager>>,
) -> impl Responder {
let rig_id = path.into_inner();
manager.reset_config(&rig_id);
manager.reset_config(&rig_id).await;
HttpResponse::Ok().json(BackgroundDecodeConfig {
rig_id,
enabled: false,
@@ -636,5 +638,5 @@ pub async fn get_background_decode_status(
path: web::Path<String>,
manager: web::Data<Arc<BackgroundDecodeManager>>,
) -> impl Responder {
HttpResponse::Ok().json(manager.status(&path.into_inner()))
HttpResponse::Ok().json(manager.status(&path.into_inner()).await)
}
@@ -73,6 +73,7 @@ async fn serve(
// Collect rig IDs for per-rig store initialisation / migration.
let rig_ids: Vec<String> = context
.routing
.remote_rigs
.lock()
.unwrap_or_else(|e| e.into_inner())
@@ -98,7 +99,7 @@ async fn serve(
let background_decode_store = Arc::new(BackgroundDecodeStore::open(&background_decode_path));
let vchan_mgr = Arc::new(ClientChannelManager::new(
4,
context.rig_vchan_audio_cmd.clone(),
context.vchan.rig_audio_cmd.clone(),
));
let session_rig_mgr = Arc::new(api::SessionRigManager::default());
let background_decode_mgr = BackgroundDecodeManager::new(
@@ -113,7 +114,7 @@ async fn serve(
// Wire the audio-command sender so allocate/delete/freq/mode operations on
// virtual channels are forwarded to the audio-client task.
if let Ok(guard) = context.vchan_audio_cmd.lock() {
if let Ok(guard) = context.vchan.audio_cmd.lock() {
if let Some(tx) = guard.as_ref() {
vchan_mgr.set_audio_cmd(tx.clone());
}
@@ -121,7 +122,7 @@ async fn serve(
// Spawn a task that removes channels destroyed server-side (OOB) from the
// client-side registry so the SSE channel list stays in sync.
if let Some(ref destroyed_tx) = context.vchan_destroyed {
if let Some(ref destroyed_tx) = context.vchan.destroyed {
let mut destroyed_rx = destroyed_tx.subscribe();
let mgr_for_destroyed = vchan_mgr.clone();
tokio::spawn(async move {
@@ -193,18 +194,18 @@ fn build_server(
let background_decode_mgr = web::Data::new(background_decode_mgr);
// Extract auth config values before moving context
let same_site = match context.http_auth_cookie_same_site.as_str() {
let same_site = match context.http_auth.cookie_same_site.as_str() {
"Strict" => SameSite::Strict,
"None" => SameSite::None,
_ => SameSite::Lax, // default
};
let auth_config = AuthConfig::new(
context.http_auth_enabled,
context.http_auth_rx_passphrase.clone(),
context.http_auth_control_passphrase.clone(),
context.http_auth_tx_access_control_enabled,
Duration::from_secs(context.http_auth_session_ttl_secs),
context.http_auth_cookie_secure,
context.http_auth.enabled,
context.http_auth.rx_passphrase.clone(),
context.http_auth.control_passphrase.clone(),
context.http_auth.tx_access_control_enabled,
Duration::from_secs(context.http_auth.session_ttl_secs),
context.http_auth.cookie_secure,
same_site,
);
+56 -8
View File
@@ -11,6 +11,25 @@ use std::time::Duration;
use crate::rig::response::RigError;
/// Apply ±25% jitter to a duration to prevent thundering herd on reconnect.
fn apply_jitter(delay: Duration) -> Duration {
// Simple deterministic-ish jitter using the current instant's low bits.
// We avoid pulling in `rand` for this single use.
let nanos = std::time::Instant::now()
.elapsed()
.as_nanos()
.wrapping_add(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos(),
);
// Map to range [0.75, 1.25]
let frac = (nanos % 1000) as f64 / 1000.0; // 0.0 .. 1.0
let factor = 0.75 + frac * 0.5; // 0.75 .. 1.25
Duration::from_secs_f64(delay.as_secs_f64() * factor)
}
/// Policy for retrying failed operations.
pub trait RetryPolicy: Send + Sync {
/// Determine if the operation should be retried.
@@ -72,7 +91,8 @@ impl RetryPolicy for ExponentialBackoff {
fn delay(&self, attempt: u32) -> Duration {
let multiplier = 2u32.saturating_pow(attempt);
let delay = self.base_delay.saturating_mul(multiplier);
delay.min(self.max_delay)
let capped = delay.min(self.max_delay);
apply_jitter(capped)
}
fn max_attempts(&self) -> u32 {
@@ -235,13 +255,41 @@ mod tests {
fn test_exponential_backoff_delays() {
let policy = ExponentialBackoff::new(5, Duration::from_millis(100), Duration::from_secs(1));
assert_eq!(policy.delay(0), Duration::from_millis(100));
assert_eq!(policy.delay(1), Duration::from_millis(200));
assert_eq!(policy.delay(2), Duration::from_millis(400));
assert_eq!(policy.delay(3), Duration::from_millis(800));
// Should cap at max_delay
assert_eq!(policy.delay(4), Duration::from_secs(1));
assert_eq!(policy.delay(5), Duration::from_secs(1));
// Delays include ±25% jitter, so check they fall in the expected range.
let check = |attempt: u32, base_ms: u64| {
let d = policy.delay(attempt);
let lo = Duration::from_secs_f64(base_ms as f64 * 0.75 / 1000.0);
let hi = Duration::from_secs_f64(base_ms as f64 * 1.25 / 1000.0);
assert!(
d >= lo && d <= hi,
"attempt {}: {:?} not in [{:?}, {:?}]",
attempt,
d,
lo,
hi
);
};
check(0, 100);
check(1, 200);
check(2, 400);
check(3, 800);
// Should cap at max_delay (1s) before jitter
check(4, 1000);
check(5, 1000);
}
#[test]
fn test_exponential_backoff_jitter_varies() {
// Two calls should (almost always) produce different values,
// confirming jitter is applied.
let policy = ExponentialBackoff::new(5, Duration::from_millis(100), Duration::from_secs(1));
let d1 = policy.delay(2);
std::thread::sleep(Duration::from_micros(10));
let d2 = policy.delay(2);
// With nanosecond-based jitter they should differ; if not,
// the test is still valid — it just means the same instant was sampled.
let _ = (d1, d2); // no assertion — this is a smoke test
}
#[test]
+146 -141
View File
@@ -10,152 +10,157 @@ use trx_core::rig::command::RigCommand;
use crate::codec::{mode_to_string, parse_mode};
use crate::types::ClientCommand;
/// Convert a ClientCommand to a RigCommand.
/// Generates `client_command_to_rig` and `rig_command_to_client` from a
/// single definition table, eliminating the mechanical duplication of
/// mapping every variant by hand.
///
/// This maps client-side commands to internal rig commands, parsing
/// mode strings into RigMode values.
pub fn client_command_to_rig(cmd: ClientCommand) -> RigCommand {
match cmd {
ClientCommand::GetRigs => {
unreachable!("GetRigs is handled in the listener before reaching rig_task")
/// Supported row forms (each section is introduced by a keyword):
///
/// - **`client_only:`** `Name, ...;`
/// Variants that exist only in `ClientCommand` with no `RigCommand`
/// counterpart. `client_command_to_rig` panics if called with one.
///
/// - **`unit:`** `ClientName <=> RigName, ...;`
/// Unit variant on both sides, same or different names.
///
/// - **`field:`** `Name { field } <=> Name, ...;`
/// Client struct with one named field mapped to a rig tuple variant.
///
/// - **`multi:`** `Name { a, b } <=> Name, ...;`
/// Both sides use named fields with the same field names.
///
/// - **`freq:`** `Name { field } <=> Name, ...;`
/// Client `u64` field converted to/from `Freq { hz }`.
///
/// - **`mode:`** `Name { field } <=> Name, ...;`
/// Client `String` field converted to/from `RigMode` via
/// `parse_mode`/`mode_to_string`.
macro_rules! define_command_mapping {
(
client_only: $( $co:ident ),* ;
unit: $( $cu:ident <=> $ru:ident ),* ;
field: $( $cf:ident { $fld:ident } <=> $rf:ident ),* ;
multi: $( $cs:ident { $( $sfld:ident ),+ } <=> $rs:ident ),* ;
freq: $( $cfq:ident { $ffld:ident } <=> $rfq:ident ),* ;
mode: $( $cm:ident { $mfld:ident } <=> $rm:ident ),* ;
) => {
/// Convert a [`ClientCommand`] to a [`RigCommand`].
///
/// # Panics
///
/// Panics if called with a client-only command (e.g. `GetRigs`,
/// `GetSatPasses`) that has no `RigCommand` counterpart. Those
/// commands must be intercepted by the caller before reaching this
/// function.
pub fn client_command_to_rig(cmd: ClientCommand) -> RigCommand {
match cmd {
// Client-only variants -- no RigCommand equivalent.
$(
ClientCommand::$co => {
panic!(
"{} has no RigCommand mapping; \
it must be handled before reaching rig_task",
stringify!($co),
);
}
)*
// Unit <=> Unit
$( ClientCommand::$cu => RigCommand::$ru, )*
// Single-field struct <=> tuple
$( ClientCommand::$cf { $fld } => RigCommand::$rf($fld), )*
// Multi-field struct passthrough
$( ClientCommand::$cs { $( $sfld ),+ } => RigCommand::$rs { $( $sfld ),+ }, )*
// Freq conversion (u64 => Freq)
$( ClientCommand::$cfq { $ffld } => RigCommand::$rfq(Freq { hz: $ffld }), )*
// Mode conversion (String => RigMode)
$( ClientCommand::$cm { $mfld } => RigCommand::$rm(parse_mode(&$mfld)), )*
}
}
ClientCommand::GetSatPasses => {
unreachable!("GetSatPasses is handled in the listener before reaching rig_task")
/// Convert a [`RigCommand`] back to a [`ClientCommand`].
///
/// This is the inverse of [`client_command_to_rig`], converting
/// `RigMode` values back to mode strings.
pub fn rig_command_to_client(cmd: RigCommand) -> ClientCommand {
match cmd {
// Unit <=> Unit
$( RigCommand::$ru => ClientCommand::$cu, )*
// Single-field struct <=> tuple
$( RigCommand::$rf($fld) => ClientCommand::$cf { $fld }, )*
// Multi-field struct passthrough
$( RigCommand::$rs { $( $sfld ),+ } => ClientCommand::$cs { $( $sfld ),+ }, )*
// Freq conversion (Freq => u64)
$( RigCommand::$rfq(freq) => ClientCommand::$cfq { $ffld: freq.hz }, )*
// Mode conversion (RigMode => String)
$( RigCommand::$rm(mode) => ClientCommand::$cm {
$mfld: mode_to_string(&mode).into_owned(),
}, )*
}
}
ClientCommand::GetState => RigCommand::GetSnapshot,
ClientCommand::SetFreq { freq_hz } => RigCommand::SetFreq(Freq { hz: freq_hz }),
ClientCommand::SetCenterFreq { freq_hz } => RigCommand::SetCenterFreq(Freq { hz: freq_hz }),
ClientCommand::SetMode { mode } => RigCommand::SetMode(parse_mode(&mode)),
ClientCommand::SetPtt { ptt } => RigCommand::SetPtt(ptt),
ClientCommand::PowerOn => RigCommand::PowerOn,
ClientCommand::PowerOff => RigCommand::PowerOff,
ClientCommand::ToggleVfo => RigCommand::ToggleVfo,
ClientCommand::Lock => RigCommand::Lock,
ClientCommand::Unlock => RigCommand::Unlock,
ClientCommand::GetTxLimit => RigCommand::GetTxLimit,
ClientCommand::SetTxLimit { limit } => RigCommand::SetTxLimit(limit),
ClientCommand::SetAprsDecodeEnabled { enabled } => {
RigCommand::SetAprsDecodeEnabled(enabled)
}
ClientCommand::SetHfAprsDecodeEnabled { enabled } => {
RigCommand::SetHfAprsDecodeEnabled(enabled)
}
ClientCommand::SetCwDecodeEnabled { enabled } => RigCommand::SetCwDecodeEnabled(enabled),
ClientCommand::SetCwAuto { enabled } => RigCommand::SetCwAuto(enabled),
ClientCommand::SetCwWpm { wpm } => RigCommand::SetCwWpm(wpm),
ClientCommand::SetCwToneHz { tone_hz } => RigCommand::SetCwToneHz(tone_hz),
ClientCommand::SetFt8DecodeEnabled { enabled } => RigCommand::SetFt8DecodeEnabled(enabled),
ClientCommand::SetFt4DecodeEnabled { enabled } => RigCommand::SetFt4DecodeEnabled(enabled),
ClientCommand::SetFt2DecodeEnabled { enabled } => RigCommand::SetFt2DecodeEnabled(enabled),
ClientCommand::SetWsprDecodeEnabled { enabled } => {
RigCommand::SetWsprDecodeEnabled(enabled)
}
ClientCommand::ResetAprsDecoder => RigCommand::ResetAprsDecoder,
ClientCommand::ResetHfAprsDecoder => RigCommand::ResetHfAprsDecoder,
ClientCommand::ResetCwDecoder => RigCommand::ResetCwDecoder,
ClientCommand::ResetFt8Decoder => RigCommand::ResetFt8Decoder,
ClientCommand::ResetFt4Decoder => RigCommand::ResetFt4Decoder,
ClientCommand::ResetFt2Decoder => RigCommand::ResetFt2Decoder,
ClientCommand::ResetWsprDecoder => RigCommand::ResetWsprDecoder,
ClientCommand::SetLrptDecodeEnabled { enabled } => {
RigCommand::SetLrptDecodeEnabled(enabled)
}
ClientCommand::ResetLrptDecoder => RigCommand::ResetLrptDecoder,
ClientCommand::SetBandwidth { bandwidth_hz } => RigCommand::SetBandwidth(bandwidth_hz),
ClientCommand::SetSdrGain { gain_db } => RigCommand::SetSdrGain(gain_db),
ClientCommand::SetSdrLnaGain { gain_db } => RigCommand::SetSdrLnaGain(gain_db),
ClientCommand::SetSdrAgc { enabled } => RigCommand::SetSdrAgc(enabled),
ClientCommand::SetSdrSquelch {
enabled,
threshold_db,
} => RigCommand::SetSdrSquelch {
enabled,
threshold_db,
},
ClientCommand::SetSdrNoiseBlanker { enabled, threshold } => {
RigCommand::SetSdrNoiseBlanker { enabled, threshold }
}
ClientCommand::SetWfmDeemphasis { deemphasis_us } => {
RigCommand::SetWfmDeemphasis(deemphasis_us)
}
ClientCommand::SetWfmStereo { enabled } => RigCommand::SetWfmStereo(enabled),
ClientCommand::SetWfmDenoise { level } => RigCommand::SetWfmDenoise(level),
ClientCommand::SetSamStereoWidth { width } => RigCommand::SetSamStereoWidth(width),
ClientCommand::SetSamCarrierSync { enabled } => RigCommand::SetSamCarrierSync(enabled),
ClientCommand::GetSpectrum => RigCommand::GetSpectrum,
}
};
}
/// Convert a RigCommand back to a ClientCommand.
///
/// This is the inverse of client_command_to_rig, converting RigMode
/// values back to mode strings.
pub fn rig_command_to_client(cmd: RigCommand) -> ClientCommand {
match cmd {
RigCommand::GetSnapshot => ClientCommand::GetState,
RigCommand::SetFreq(freq) => ClientCommand::SetFreq { freq_hz: freq.hz },
RigCommand::SetCenterFreq(freq) => ClientCommand::SetCenterFreq { freq_hz: freq.hz },
RigCommand::SetMode(mode) => ClientCommand::SetMode {
mode: mode_to_string(&mode).into_owned(),
},
RigCommand::SetPtt(ptt) => ClientCommand::SetPtt { ptt },
RigCommand::PowerOn => ClientCommand::PowerOn,
RigCommand::PowerOff => ClientCommand::PowerOff,
RigCommand::ToggleVfo => ClientCommand::ToggleVfo,
RigCommand::Lock => ClientCommand::Lock,
RigCommand::Unlock => ClientCommand::Unlock,
RigCommand::GetTxLimit => ClientCommand::GetTxLimit,
RigCommand::SetTxLimit(limit) => ClientCommand::SetTxLimit { limit },
RigCommand::SetAprsDecodeEnabled(enabled) => {
ClientCommand::SetAprsDecodeEnabled { enabled }
}
RigCommand::SetHfAprsDecodeEnabled(enabled) => {
ClientCommand::SetHfAprsDecodeEnabled { enabled }
}
RigCommand::SetCwDecodeEnabled(enabled) => ClientCommand::SetCwDecodeEnabled { enabled },
RigCommand::SetCwAuto(enabled) => ClientCommand::SetCwAuto { enabled },
RigCommand::SetCwWpm(wpm) => ClientCommand::SetCwWpm { wpm },
RigCommand::SetCwToneHz(tone_hz) => ClientCommand::SetCwToneHz { tone_hz },
RigCommand::SetFt8DecodeEnabled(enabled) => ClientCommand::SetFt8DecodeEnabled { enabled },
RigCommand::SetFt4DecodeEnabled(enabled) => ClientCommand::SetFt4DecodeEnabled { enabled },
RigCommand::SetFt2DecodeEnabled(enabled) => ClientCommand::SetFt2DecodeEnabled { enabled },
RigCommand::SetWsprDecodeEnabled(enabled) => {
ClientCommand::SetWsprDecodeEnabled { enabled }
}
RigCommand::ResetAprsDecoder => ClientCommand::ResetAprsDecoder,
RigCommand::ResetHfAprsDecoder => ClientCommand::ResetHfAprsDecoder,
RigCommand::ResetCwDecoder => ClientCommand::ResetCwDecoder,
RigCommand::ResetFt8Decoder => ClientCommand::ResetFt8Decoder,
RigCommand::ResetFt4Decoder => ClientCommand::ResetFt4Decoder,
RigCommand::ResetFt2Decoder => ClientCommand::ResetFt2Decoder,
RigCommand::ResetWsprDecoder => ClientCommand::ResetWsprDecoder,
RigCommand::SetLrptDecodeEnabled(enabled) => {
ClientCommand::SetLrptDecodeEnabled { enabled }
}
RigCommand::ResetLrptDecoder => ClientCommand::ResetLrptDecoder,
RigCommand::SetBandwidth(bandwidth_hz) => ClientCommand::SetBandwidth { bandwidth_hz },
RigCommand::SetSdrGain(gain_db) => ClientCommand::SetSdrGain { gain_db },
RigCommand::SetSdrLnaGain(gain_db) => ClientCommand::SetSdrLnaGain { gain_db },
RigCommand::SetSdrAgc(enabled) => ClientCommand::SetSdrAgc { enabled },
RigCommand::SetSdrSquelch {
enabled,
threshold_db,
} => ClientCommand::SetSdrSquelch {
enabled,
threshold_db,
},
RigCommand::SetSdrNoiseBlanker { enabled, threshold } => {
ClientCommand::SetSdrNoiseBlanker { enabled, threshold }
}
RigCommand::SetWfmDeemphasis(deemphasis_us) => {
ClientCommand::SetWfmDeemphasis { deemphasis_us }
}
RigCommand::SetWfmStereo(enabled) => ClientCommand::SetWfmStereo { enabled },
RigCommand::SetWfmDenoise(level) => ClientCommand::SetWfmDenoise { level },
RigCommand::SetSamStereoWidth(width) => ClientCommand::SetSamStereoWidth { width },
RigCommand::SetSamCarrierSync(enabled) => ClientCommand::SetSamCarrierSync { enabled },
RigCommand::GetSpectrum => ClientCommand::GetSpectrum,
}
define_command_mapping! {
// ── Client-only variants (no RigCommand counterpart) ─────────────
client_only: GetRigs, GetSatPasses;
// ── Unit variants (no payload) ───────────────────────────────────
unit:
GetState <=> GetSnapshot,
PowerOn <=> PowerOn,
PowerOff <=> PowerOff,
ToggleVfo <=> ToggleVfo,
Lock <=> Lock,
Unlock <=> Unlock,
GetTxLimit <=> GetTxLimit,
GetSpectrum <=> GetSpectrum,
ResetAprsDecoder <=> ResetAprsDecoder,
ResetHfAprsDecoder <=> ResetHfAprsDecoder,
ResetCwDecoder <=> ResetCwDecoder,
ResetFt8Decoder <=> ResetFt8Decoder,
ResetFt4Decoder <=> ResetFt4Decoder,
ResetFt2Decoder <=> ResetFt2Decoder,
ResetWsprDecoder <=> ResetWsprDecoder,
ResetLrptDecoder <=> ResetLrptDecoder;
// ── Single-field struct <=> tuple ────────────────────────────────
field:
SetPtt { ptt } <=> SetPtt,
SetTxLimit { limit } <=> SetTxLimit,
SetAprsDecodeEnabled { enabled } <=> SetAprsDecodeEnabled,
SetHfAprsDecodeEnabled { enabled } <=> SetHfAprsDecodeEnabled,
SetCwDecodeEnabled { enabled } <=> SetCwDecodeEnabled,
SetCwAuto { enabled } <=> SetCwAuto,
SetCwWpm { wpm } <=> SetCwWpm,
SetCwToneHz { tone_hz } <=> SetCwToneHz,
SetFt8DecodeEnabled { enabled } <=> SetFt8DecodeEnabled,
SetFt4DecodeEnabled { enabled } <=> SetFt4DecodeEnabled,
SetFt2DecodeEnabled { enabled } <=> SetFt2DecodeEnabled,
SetWsprDecodeEnabled { enabled } <=> SetWsprDecodeEnabled,
SetLrptDecodeEnabled { enabled } <=> SetLrptDecodeEnabled,
SetBandwidth { bandwidth_hz } <=> SetBandwidth,
SetSdrGain { gain_db } <=> SetSdrGain,
SetSdrLnaGain { gain_db } <=> SetSdrLnaGain,
SetSdrAgc { enabled } <=> SetSdrAgc,
SetWfmDeemphasis { deemphasis_us } <=> SetWfmDeemphasis,
SetWfmStereo { enabled } <=> SetWfmStereo,
SetWfmDenoise { level } <=> SetWfmDenoise,
SetSamStereoWidth { width } <=> SetSamStereoWidth,
SetSamCarrierSync { enabled } <=> SetSamCarrierSync;
// ── Multi-field struct passthrough ───────────────────────────────
multi:
SetSdrSquelch { enabled, threshold_db } <=> SetSdrSquelch,
SetSdrNoiseBlanker { enabled, threshold } <=> SetSdrNoiseBlanker;
// ── Freq conversions (u64 <=> Freq) ──────────────────────────────
freq:
SetFreq { freq_hz } <=> SetFreq,
SetCenterFreq { freq_hz } <=> SetCenterFreq;
// ── Mode conversion (String <=> RigMode) ─────────────────────────
mode:
SetMode { mode } <=> SetMode;
}
#[cfg(test)]
+144 -100
View File
@@ -54,6 +54,10 @@ const CW_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const LRPT_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
/// Maximum entries per decoder history queue. Prevents unbounded memory growth
/// on busy channels (e.g. AIS near a port). Oldest entries are evicted when
/// the limit is reached, independent of the time-based pruning.
const MAX_HISTORY_ENTRIES: usize = 10_000;
/// Silence timeout before auto-finalising an LRPT pass (30 s without new MCUs).
const LRPT_PASS_SILENCE_TIMEOUT: Duration = Duration::from_secs(30);
const FT8_SAMPLE_RATE: u32 = 12_000;
@@ -143,7 +147,7 @@ impl StreamErrorLogger {
fn log(&self, err: &str) {
let now = Instant::now();
let kind = classify_stream_error(err);
let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
let mut state = lock_or_recover(&self.state, self.label);
// First occurrence or changed error class: log as error once.
if state.last_kind != Some(kind) {
@@ -218,6 +222,24 @@ pub struct DecoderHistories {
total_count: AtomicUsize,
}
/// Acquire a mutex, recovering from poisoning with a warning log.
fn lock_or_recover<T>(mutex: &Mutex<T>, label: &str) -> std::sync::MutexGuard<'_, T> {
mutex.unwrap_or_else(|e| {
tracing::warn!(
"Mutex for {} was poisoned (prior panic); recovering with potentially inconsistent data",
label
);
e.into_inner()
})
}
/// Enforce capacity limit on a history deque by evicting oldest entries.
fn enforce_capacity<T>(deque: &mut VecDeque<T>, max: usize) {
while deque.len() > max {
deque.pop_front();
}
}
impl DecoderHistories {
pub fn new() -> Arc<Self> {
Arc::new(Self {
@@ -279,15 +301,16 @@ impl DecoderHistories {
if msg.ts_ms.is_none() {
msg.ts_ms = Some(current_timestamp_ms());
}
let mut h = self.ais.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.ais, "ais_history");
let before = h.len();
h.push_back((Instant::now(), msg));
Self::prune_ais(&mut h);
enforce_capacity(&mut h, MAX_HISTORY_ENTRIES);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_ais_history(&self) -> Vec<AisMessage> {
let mut h = self.ais.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.ais, "ais_history");
let before = h.len();
Self::prune_ais(&mut h);
self.adjust_total_count(before, h.len());
@@ -311,15 +334,16 @@ impl DecoderHistories {
if msg.ts_ms.is_none() {
msg.ts_ms = Some(current_timestamp_ms());
}
let mut h = self.vdes.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.vdes, "vdes_history");
let before = h.len();
h.push_back((Instant::now(), msg));
Self::prune_vdes(&mut h);
enforce_capacity(&mut h, MAX_HISTORY_ENTRIES);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_vdes_history(&self) -> Vec<VdesMessage> {
let mut h = self.vdes.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.vdes, "vdes_history");
let before = h.len();
Self::prune_vdes(&mut h);
self.adjust_total_count(before, h.len());
@@ -346,15 +370,16 @@ impl DecoderHistories {
if pkt.ts_ms.is_none() {
pkt.ts_ms = Some(current_timestamp_ms());
}
let mut h = self.aprs.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.aprs, "aprs_history");
let before = h.len();
h.push_back((Instant::now(), pkt));
Self::prune_aprs(&mut h);
enforce_capacity(&mut h, MAX_HISTORY_ENTRIES);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_aprs_history(&self) -> Vec<AprsPacket> {
let mut h = self.aprs.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.aprs, "aprs_history");
let before = h.len();
Self::prune_aprs(&mut h);
self.adjust_total_count(before, h.len());
@@ -362,7 +387,7 @@ impl DecoderHistories {
}
pub fn clear_aprs_history(&self) {
let mut h = self.aprs.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.aprs, "aprs_history");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
@@ -388,15 +413,16 @@ impl DecoderHistories {
if pkt.ts_ms.is_none() {
pkt.ts_ms = Some(current_timestamp_ms());
}
let mut h = self.hf_aprs.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.hf_aprs, "hf_aprs_history");
let before = h.len();
h.push_back((Instant::now(), pkt));
Self::prune_hf_aprs(&mut h);
enforce_capacity(&mut h, MAX_HISTORY_ENTRIES);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_hf_aprs_history(&self) -> Vec<AprsPacket> {
let mut h = self.hf_aprs.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.hf_aprs, "hf_aprs_history");
let before = h.len();
Self::prune_hf_aprs(&mut h);
self.adjust_total_count(before, h.len());
@@ -404,7 +430,7 @@ impl DecoderHistories {
}
pub fn clear_hf_aprs_history(&self) {
let mut h = self.hf_aprs.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.hf_aprs, "hf_aprs_history");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
@@ -424,15 +450,16 @@ impl DecoderHistories {
}
pub fn record_cw_event(&self, evt: CwEvent) {
let mut h = self.cw.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.cw, "cw_history");
let before = h.len();
h.push_back((Instant::now(), evt));
Self::prune_cw(&mut h);
enforce_capacity(&mut h, MAX_HISTORY_ENTRIES);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_cw_history(&self) -> Vec<CwEvent> {
let mut h = self.cw.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.cw, "cw_history");
let before = h.len();
Self::prune_cw(&mut h);
self.adjust_total_count(before, h.len());
@@ -440,7 +467,7 @@ impl DecoderHistories {
}
pub fn clear_cw_history(&self) {
let mut h = self.cw.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.cw, "cw_history");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
@@ -460,15 +487,16 @@ impl DecoderHistories {
}
pub fn record_ft8_message(&self, msg: Ft8Message) {
let mut h = self.ft8.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.ft8, "ft8_history");
let before = h.len();
h.push_back((Instant::now(), msg));
Self::prune_ft8(&mut h);
enforce_capacity(&mut h, MAX_HISTORY_ENTRIES);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_ft8_history(&self) -> Vec<Ft8Message> {
let mut h = self.ft8.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.ft8, "ft8_history");
let before = h.len();
Self::prune_ft8(&mut h);
self.adjust_total_count(before, h.len());
@@ -476,7 +504,7 @@ impl DecoderHistories {
}
pub fn clear_ft8_history(&self) {
let mut h = self.ft8.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.ft8, "ft8_history");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
@@ -496,15 +524,16 @@ impl DecoderHistories {
}
pub fn record_ft4_message(&self, msg: Ft8Message) {
let mut h = self.ft4.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.ft4, "ft4_history");
let before = h.len();
h.push_back((Instant::now(), msg));
Self::prune_ft4(&mut h);
enforce_capacity(&mut h, MAX_HISTORY_ENTRIES);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_ft4_history(&self) -> Vec<Ft8Message> {
let mut h = self.ft4.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.ft4, "ft4_history");
let before = h.len();
Self::prune_ft4(&mut h);
self.adjust_total_count(before, h.len());
@@ -512,7 +541,7 @@ impl DecoderHistories {
}
pub fn clear_ft4_history(&self) {
let mut h = self.ft4.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.ft4, "ft4_history");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
@@ -532,15 +561,16 @@ impl DecoderHistories {
}
pub fn record_ft2_message(&self, msg: Ft8Message) {
let mut h = self.ft2.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.ft2, "ft2_history");
let before = h.len();
h.push_back((Instant::now(), msg));
Self::prune_ft2(&mut h);
enforce_capacity(&mut h, MAX_HISTORY_ENTRIES);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_ft2_history(&self) -> Vec<Ft8Message> {
let mut h = self.ft2.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.ft2, "ft2_history");
let before = h.len();
Self::prune_ft2(&mut h);
self.adjust_total_count(before, h.len());
@@ -548,7 +578,7 @@ impl DecoderHistories {
}
pub fn clear_ft2_history(&self) {
let mut h = self.ft2.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.ft2, "ft2_history");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
@@ -568,15 +598,16 @@ impl DecoderHistories {
}
pub fn record_wspr_message(&self, msg: WsprMessage) {
let mut h = self.wspr.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.wspr, "wspr_history");
let before = h.len();
h.push_back((Instant::now(), msg));
Self::prune_wspr(&mut h);
enforce_capacity(&mut h, MAX_HISTORY_ENTRIES);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_wspr_history(&self) -> Vec<WsprMessage> {
let mut h = self.wspr.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.wspr, "wspr_history");
let before = h.len();
Self::prune_wspr(&mut h);
self.adjust_total_count(before, h.len());
@@ -584,7 +615,7 @@ impl DecoderHistories {
}
pub fn clear_wspr_history(&self) {
let mut h = self.wspr.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.wspr, "wspr_history");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
@@ -607,15 +638,16 @@ impl DecoderHistories {
if img.ts_ms.is_none() {
img.ts_ms = Some(current_timestamp_ms());
}
let mut h = self.lrpt.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.lrpt, "lrpt_history");
let before = h.len();
h.push_back((Instant::now(), img));
Self::prune_lrpt(&mut h);
enforce_capacity(&mut h, MAX_HISTORY_ENTRIES);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_lrpt_history(&self) -> Vec<LrptImage> {
let mut h = self.lrpt.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.lrpt, "lrpt_history");
let before = h.len();
Self::prune_lrpt(&mut h);
self.adjust_total_count(before, h.len());
@@ -623,7 +655,7 @@ impl DecoderHistories {
}
pub fn clear_lrpt_history(&self) {
let mut h = self.lrpt.lock().unwrap_or_else(|e| e.into_inner());
let mut h = lock_or_recover(&self.lrpt, "lrpt_history");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
@@ -672,6 +704,74 @@ pub fn spawn_audio_capture(
})
}
/// Map a channel count to an `opus::Channels` value.
fn opus_channels(channels: u16) -> Result<opus::Channels, Box<dyn std::error::Error>> {
match channels {
1 => Ok(opus::Channels::Mono),
2 => Ok(opus::Channels::Stereo),
_ => Err(format!("unsupported channel count: {}", channels).into()),
}
}
/// Look up an audio device by name (or fall back to the default device).
///
/// When `is_input` is true the function searches input devices and falls back
/// to the default input device; otherwise it uses output devices. Returns
/// `None` when the device cannot be found (the caller should retry after a
/// delay).
fn find_device(
host: &cpal::Host,
device_name: &Option<String>,
is_input: bool,
) -> Option<cpal::Device> {
use cpal::traits::{DeviceTrait, HostTrait};
let direction = if is_input { "capture" } else { "playback" };
if let Some(ref name) = device_name {
let devices_result = if is_input {
host.input_devices()
} else {
host.output_devices()
};
match devices_result {
Ok(mut devs) => {
match devs.find(|d| d.name().map(|n| n == *name).unwrap_or(false)) {
Some(d) => Some(d),
None => {
warn!("Audio {}: device '{}' not found, retrying", direction, name);
None
}
}
}
Err(e) => {
warn!(
"Audio {}: failed to enumerate devices, retrying: {}",
direction, e
);
None
}
}
} else {
let default = if is_input {
host.default_input_device()
} else {
host.default_output_device()
};
match default {
Some(d) => Some(d),
None => {
warn!(
"Audio {}: no default {} device, retrying",
direction,
if is_input { "input" } else { "output" }
);
None
}
}
}
}
#[allow(clippy::too_many_arguments)]
fn run_capture(
sample_rate: u32,
@@ -683,7 +783,7 @@ fn run_capture(
pcm_tx: Option<broadcast::Sender<Vec<f32>>>,
shutdown_rx: watch::Receiver<bool>,
) -> Result<(), Box<dyn std::error::Error>> {
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::traits::{DeviceTrait, StreamTrait};
use std::sync::mpsc::{RecvTimeoutError, TryRecvError as StdTryRecvError};
let config = cpal::StreamConfig {
@@ -695,13 +795,9 @@ fn run_capture(
let frame_samples =
(sample_rate as usize * frame_duration_ms as usize / 1000) * channels as usize;
let opus_channels = match channels {
1 => opus::Channels::Mono,
2 => opus::Channels::Stereo,
_ => return Err(format!("unsupported channel count: {}", channels).into()),
};
let opus_ch = opus_channels(channels)?;
let mut encoder = opus::Encoder::new(sample_rate, opus_channels, opus::Application::Audio)?;
let mut encoder = opus::Encoder::new(sample_rate, opus_ch, opus::Application::Audio)?;
encoder.set_bitrate(opus::Bitrate::Bits(bitrate_bps as i32))?;
encoder.set_complexity(5)?;
@@ -725,35 +821,11 @@ fn run_capture(
// Re-enumerate the device on every recovery cycle: after POLLERR the
// existing device handle can be stale (especially for USB audio).
let host = cpal::default_host();
let device = if let Some(ref name) = device_name {
match host.input_devices() {
Ok(mut devs) => {
match devs.find(|d| d.name().map(|n| n == *name).unwrap_or(false)) {
Some(d) => d,
None => {
warn!("Audio capture: device '{}' not found, retrying", name);
std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY);
continue;
}
}
}
Err(e) => {
warn!(
"Audio capture: failed to enumerate devices, retrying: {}",
e
);
std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY);
continue;
}
}
} else {
match host.default_input_device() {
Some(d) => d,
None => {
warn!("Audio capture: no default input device, retrying");
std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY);
continue;
}
let device = match find_device(&host, &device_name, true) {
Some(d) => d,
None => {
std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY);
continue;
}
};
info!(
@@ -922,13 +994,9 @@ fn run_playback(
let frame_samples =
(sample_rate as usize * frame_duration_ms as usize / 1000) * channels as usize;
let opus_channels = match channels {
1 => opus::Channels::Mono,
2 => opus::Channels::Stereo,
_ => return Err(format!("unsupported channel count: {}", channels).into()),
};
let opus_ch = opus_channels(channels)?;
let mut decoder = opus::Decoder::new(sample_rate, opus_channels)?;
let mut decoder = opus::Decoder::new(sample_rate, opus_ch)?;
let ring = std::sync::Arc::new(std::sync::Mutex::new(
std::collections::VecDeque::<f32>::with_capacity(frame_samples * 8),
@@ -952,35 +1020,11 @@ fn run_playback(
// Re-enumerate the device on every recovery cycle: after POLLERR the
// existing device handle can be stale (especially for USB audio).
let host = cpal::default_host();
let device = if let Some(ref name) = device_name {
match host.output_devices() {
Ok(mut devs) => {
match devs.find(|d| d.name().map(|n| n == *name).unwrap_or(false)) {
Some(d) => d,
None => {
warn!("Audio playback: device '{}' not found, retrying", name);
std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY);
continue;
}
}
}
Err(e) => {
warn!(
"Audio playback: failed to enumerate devices, retrying: {}",
e
);
std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY);
continue;
}
}
} else {
match host.default_output_device() {
Some(d) => d,
None => {
warn!("Audio playback: no default output device, retrying");
std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY);
continue;
}
let device = match find_device(&host, &device_name, false) {
Some(d) => d,
None => {
std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY);
continue;
}
};
info!(
+34 -23
View File
@@ -15,7 +15,7 @@ use std::net::IpAddr;
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use trx_app::{ConfigError, ConfigFile};
use trx_app::{validate_log_level, validate_tokens, ConfigError, ConfigFile};
pub use trx_decode_log::DecodeLogsConfig;
use trx_core::rig::state::RigMode;
@@ -101,6 +101,8 @@ pub struct ServerConfig {
pub decode_logs: DecodeLogsConfig,
/// SDR pipeline configuration (legacy flat; used when [rig.access] type = "sdr").
pub sdr: SdrConfig,
/// Timeout and buffer-size tuning knobs.
pub timeouts: TimeoutsConfig,
/// Multi-rig instance list. When non-empty, takes priority over the flat fields.
#[serde(rename = "rigs", default)]
pub rigs: Vec<RigInstanceConfig>,
@@ -204,6 +206,37 @@ impl Default for BehaviorConfig {
}
}
/// Timeout and buffer-size tuning knobs.
///
/// All durations are in milliseconds. The defaults match the previously
/// hard-coded values, so existing deployments are unaffected.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct TimeoutsConfig {
/// Maximum time (ms) to wait for a single rig command to complete.
pub command_exec_timeout_ms: u64,
/// Maximum time (ms) for a CAT poll refresh cycle.
pub poll_refresh_timeout_ms: u64,
/// Maximum time (ms) for low-level listener I/O operations (read/write/flush).
pub io_timeout_ms: u64,
/// Maximum time (ms) to wait for a rig command response in the listener.
pub request_timeout_ms: u64,
/// Capacity of the per-rig command channel (number of queued requests).
pub rig_task_channel_buffer: usize,
}
impl Default for TimeoutsConfig {
fn default() -> Self {
Self {
command_exec_timeout_ms: 10_000,
poll_refresh_timeout_ms: 8_000,
io_timeout_ms: 10_000,
request_timeout_ms: 12_000,
rig_task_channel_buffer: 32,
}
}
}
/// TCP listener configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
@@ -763,21 +796,6 @@ impl ServerConfig {
}
}
fn validate_log_level(level: Option<&str>) -> Result<(), String> {
if let Some(level) = level {
match level {
"trace" | "debug" | "info" | "warn" | "error" => {}
_ => {
return Err(format!(
"[general].log_level '{}' is invalid (expected one of: trace, debug, info, warn, error)",
level
))
}
}
}
Ok(())
}
fn validate_coordinates(latitude: Option<f64>, longitude: Option<f64>) -> Result<(), String> {
match (latitude, longitude) {
(Some(lat), Some(lon)) => {
@@ -876,13 +894,6 @@ fn validate_sdr_nb_config(path: &str, nb: &SdrNoiseBlankerConfig) -> Result<(),
Ok(())
}
fn validate_tokens(path: &str, tokens: &[String]) -> Result<(), String> {
if tokens.iter().any(|t| t.trim().is_empty()) {
return Err(format!("{path} must not contain empty tokens"));
}
Ok(())
}
impl ConfigFile for ServerConfig {
fn section_key() -> &'static str {
"trx-server"
+44 -20
View File
@@ -32,9 +32,29 @@ use trx_protocol::ClientResponse;
use crate::rig_handle::RigHandle;
const IO_TIMEOUT: Duration = Duration::from_secs(10);
const REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
/// Fallback I/O timeout used when no config value is provided.
const DEFAULT_IO_TIMEOUT: Duration = Duration::from_secs(10);
/// Fallback request timeout used when no config value is provided.
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
const MAX_JSON_LINE_BYTES: usize = 256 * 1024;
/// Configurable timeout values for the listener, threaded from `[timeouts]`.
#[derive(Debug, Clone, Copy)]
pub struct ListenerTimeouts {
/// Maximum time for low-level I/O operations (read/write/flush).
pub io_timeout: Duration,
/// Maximum time to wait for a rig command response.
pub request_timeout: Duration,
}
impl Default for ListenerTimeouts {
fn default() -> Self {
Self {
io_timeout: DEFAULT_IO_TIMEOUT,
request_timeout: DEFAULT_REQUEST_TIMEOUT,
}
}
}
/// How long to cache satellite pass predictions before recomputing.
/// SGP4 propagation for 200+ satellites is CPU-intensive; caching avoids
/// redundant recomputation when multiple clients request passes concurrently.
@@ -56,6 +76,7 @@ pub async fn run_listener(
default_rig_id: String,
auth_tokens: HashSet<String>,
station_coords: Option<(f64, f64)>,
timeouts: ListenerTimeouts,
mut shutdown_rx: watch::Receiver<bool>,
) -> std::io::Result<()> {
let listener = TcpListener::bind(addr).await?;
@@ -75,8 +96,9 @@ pub async fn run_listener(
let client_shutdown_rx = shutdown_rx.clone();
let coords = station_coords;
let cache = Arc::clone(&sat_pass_cache);
let client_timeouts = timeouts;
tokio::spawn(async move {
if let Err(e) = handle_client(socket, peer, rigs, default_rig_id, validator, coords, cache, client_shutdown_rx).await {
if let Err(e) = handle_client(socket, peer, rigs, default_rig_id, validator, coords, cache, client_timeouts, client_shutdown_rx).await {
error!("Client {} error: {:?}", peer, e);
}
});
@@ -151,14 +173,15 @@ async fn read_limited_line<R: AsyncBufRead + Unpin>(
async fn send_response(
writer: &mut tokio::net::tcp::OwnedWriteHalf,
response: &ClientResponse,
io_timeout: Duration,
) -> std::io::Result<()> {
let resp_line = serde_json::to_string(response).map_err(std::io::Error::other)? + "\n";
time::timeout(IO_TIMEOUT, writer.write_all(resp_line.as_bytes()))
time::timeout(io_timeout, writer.write_all(resp_line.as_bytes()))
.await
.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::TimedOut, "response write timeout")
})??;
time::timeout(IO_TIMEOUT, writer.flush())
time::timeout(io_timeout, writer.flush())
.await
.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::TimedOut, "response flush timeout")
@@ -174,6 +197,7 @@ async fn handle_client(
validator: Arc<SimpleTokenValidator>,
station_coords: Option<(f64, f64)>,
sat_pass_cache: Arc<Mutex<Option<SatPassCache>>>,
timeouts: ListenerTimeouts,
mut shutdown_rx: watch::Receiver<bool>,
) -> std::io::Result<()> {
let (reader, mut writer) = socket.into_split();
@@ -181,7 +205,7 @@ async fn handle_client(
loop {
let line = tokio::select! {
read = time::timeout(IO_TIMEOUT, read_limited_line(&mut reader, MAX_JSON_LINE_BYTES)) => {
read = time::timeout(timeouts.io_timeout, read_limited_line(&mut reader, MAX_JSON_LINE_BYTES)) => {
match read {
Ok(Ok(line)) => line,
Ok(Err(e)) => return Err(e),
@@ -232,7 +256,7 @@ async fn handle_client(
sat_passes: None,
error: Some(format!("Invalid JSON: {}", e)),
};
send_response(&mut writer, &resp).await?;
send_response(&mut writer, &resp, timeouts.io_timeout).await?;
continue;
}
};
@@ -246,7 +270,7 @@ async fn handle_client(
sat_passes: None,
error: Some(err),
};
send_response(&mut writer, &resp).await?;
send_response(&mut writer, &resp, timeouts.io_timeout).await?;
continue;
}
@@ -279,7 +303,7 @@ async fn handle_client(
sat_passes: None,
error: None,
};
send_response(&mut writer, &resp).await?;
send_response(&mut writer, &resp, timeouts.io_timeout).await?;
continue;
}
@@ -341,7 +365,7 @@ async fn handle_client(
sat_passes: Some(result),
error: None,
};
send_response(&mut writer, &resp).await?;
send_response(&mut writer, &resp, timeouts.io_timeout).await?;
continue;
}
@@ -358,7 +382,7 @@ async fn handle_client(
sat_passes: None,
error: Some(format!("Unknown rig_id: {}", target_rig_id)),
};
send_response(&mut writer, &resp).await?;
send_response(&mut writer, &resp, timeouts.io_timeout).await?;
continue;
}
};
@@ -377,7 +401,7 @@ async fn handle_client(
sat_passes: None,
error: None,
};
send_response(&mut writer, &resp).await?;
send_response(&mut writer, &resp, timeouts.io_timeout).await?;
continue;
}
}
@@ -389,7 +413,7 @@ async fn handle_client(
rig_id_override: None,
};
match time::timeout(IO_TIMEOUT, handle.rig_tx.send(req)).await {
match time::timeout(timeouts.io_timeout, handle.rig_tx.send(req)).await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
error!(
@@ -404,7 +428,7 @@ async fn handle_client(
sat_passes: None,
error: Some("Internal error: rig task not available".into()),
};
send_response(&mut writer, &resp).await?;
send_response(&mut writer, &resp, timeouts.io_timeout).await?;
continue;
}
Err(_) => {
@@ -416,13 +440,13 @@ async fn handle_client(
sat_passes: None,
error: Some("Internal error: request queue timeout".into()),
};
send_response(&mut writer, &resp).await?;
send_response(&mut writer, &resp, timeouts.io_timeout).await?;
continue;
}
}
match tokio::select! {
result = time::timeout(REQUEST_TIMEOUT, resp_rx) => {
result = time::timeout(timeouts.request_timeout, resp_rx) => {
match result {
Ok(inner) => inner,
Err(_) => {
@@ -434,7 +458,7 @@ async fn handle_client(
sat_passes: None,
error: Some("Request timed out waiting for rig response".into()),
};
send_response(&mut writer, &resp).await?;
send_response(&mut writer, &resp, timeouts.io_timeout).await?;
continue;
}
}
@@ -459,7 +483,7 @@ async fn handle_client(
sat_passes: None,
error: None,
};
send_response(&mut writer, &resp).await?;
send_response(&mut writer, &resp, timeouts.io_timeout).await?;
}
Ok(Err(err)) => {
let resp = ClientResponse {
@@ -470,7 +494,7 @@ async fn handle_client(
sat_passes: None,
error: Some(err.message),
};
send_response(&mut writer, &resp).await?;
send_response(&mut writer, &resp, timeouts.io_timeout).await?;
}
Err(e) => {
error!("Rig response oneshot recv error: {:?}", e);
@@ -482,7 +506,7 @@ async fn handle_client(
sat_passes: None,
error: Some("Internal error waiting for rig response".into()),
};
send_response(&mut writer, &resp).await?;
send_response(&mut writer, &resp, timeouts.io_timeout).await?;
}
}
}
+60 -34
View File
@@ -39,7 +39,6 @@ use rig_handle::RigHandle;
use trx_decode_log::DecoderLoggers;
const PKG_DESCRIPTION: &str = concat!(env!("CARGO_PKG_NAME"), " - rig server daemon");
const RIG_TASK_CHANNEL_BUFFER: usize = 32;
const RETRY_MAX_DELAY_SECS: u64 = 2;
#[derive(Debug, Parser)]
@@ -322,32 +321,36 @@ fn build_sdr_rig_from_instance(rig_cfg: &RigInstanceConfig) -> SdrRigBuildResult
));
}
let ais_channel_base_idx = channels.len();
let vdes_channel_idx = channels
.iter()
.position(|(_, mode, _)| matches!(mode, trx_core::rig::state::RigMode::VDES))
.unwrap_or(0);
let sdr_rig = trx_backend::SoapySdrRig::new_with_config(
args,
&channels,
&rig_cfg.sdr.gain.mode,
rig_cfg.sdr.gain.value,
rig_cfg.sdr.gain.max_value,
rig_cfg.audio.sample_rate,
rig_cfg.audio.channels as usize,
rig_cfg.audio.frame_duration_ms,
rig_cfg.sdr.wfm_deemphasis_us,
Freq {
let sdr_rig = trx_backend::SoapySdrRig::new_from_config(trx_backend::SoapySdrConfig {
args: args.to_string(),
channels,
gain_mode: rig_cfg.sdr.gain.mode.clone(),
gain_db: rig_cfg.sdr.gain.value,
max_gain_db: rig_cfg.sdr.gain.max_value,
audio_sample_rate: rig_cfg.audio.sample_rate,
audio_channels: rig_cfg.audio.channels as usize,
frame_duration_ms: rig_cfg.audio.frame_duration_ms,
wfm_deemphasis_us: rig_cfg.sdr.wfm_deemphasis_us,
initial_freq: Freq {
hz: rig_cfg.rig.initial_freq_hz,
},
rig_cfg.rig.initial_mode.clone(),
rig_cfg.sdr.sample_rate,
rig_cfg.sdr.bandwidth,
rig_cfg.sdr.center_offset_hz,
rig_cfg.sdr.squelch.enabled,
rig_cfg.sdr.squelch.threshold_db,
rig_cfg.sdr.squelch.hysteresis_db,
rig_cfg.sdr.squelch.tail_ms,
rig_cfg.sdr.max_virtual_channels,
rig_cfg.sdr.noise_blanker.enabled,
rig_cfg.sdr.noise_blanker.threshold,
)?;
initial_mode: rig_cfg.rig.initial_mode.clone(),
sdr_sample_rate: rig_cfg.sdr.sample_rate,
bandwidth_hz: rig_cfg.sdr.bandwidth,
center_offset_hz: rig_cfg.sdr.center_offset_hz,
squelch_enabled: rig_cfg.sdr.squelch.enabled,
squelch_threshold_db: rig_cfg.sdr.squelch.threshold_db,
squelch_hysteresis_db: rig_cfg.sdr.squelch.hysteresis_db,
squelch_tail_ms: rig_cfg.sdr.squelch.tail_ms,
max_virtual_channels: rig_cfg.sdr.max_virtual_channels,
nb_enabled: rig_cfg.sdr.noise_blanker.enabled,
nb_threshold: rig_cfg.sdr.noise_blanker.threshold,
})?;
let pcm_rx = sdr_rig.subscribe_pcm();
let ais_pcm = (
@@ -357,10 +360,6 @@ fn build_sdr_rig_from_instance(rig_cfg: &RigInstanceConfig) -> SdrRigBuildResult
// Subscribe to the first channel configured as VDES or MARINE so that the
// IQ tap in ChannelDsp actually fires. Fall back to channel 0 when no
// explicit VDES channel has been configured.
let vdes_channel_idx = channels
.iter()
.position(|(_, mode, _)| matches!(mode, trx_core::rig::state::RigMode::VDES))
.unwrap_or(0);
let vdes_iq = sdr_rig.subscribe_iq_channel(vdes_channel_idx);
// Extract the virtual channel manager before the rig is consumed by Box.
let vchan_manager: trx_core::vchan::SharedVChanManager = sdr_rig.channel_manager();
@@ -384,6 +383,7 @@ fn build_rig_task_config(
longitude: Option<f64>,
registry: Arc<RegistrationContext>,
histories: Arc<DecoderHistories>,
timeouts: &config::TimeoutsConfig,
) -> rig_task::RigTaskConfig {
let pskreporter_status = if rig_cfg.pskreporter.enabled {
let has_locator = rig_cfg.pskreporter.receiver_locator.is_some()
@@ -448,6 +448,8 @@ fn build_rig_task_config(
histories,
vfo_prime: rig_cfg.behavior.vfo_prime,
prebuilt_rig: None,
command_exec_timeout: Duration::from_millis(timeouts.command_exec_timeout_ms),
poll_refresh_timeout: Duration::from_millis(timeouts.poll_refresh_timeout_ms),
}
}
@@ -1028,7 +1030,7 @@ async fn main() -> DynResult<()> {
}
rig_histories_for_flush.push((rig_cfg.id.clone(), histories.clone()));
let (rig_tx, rig_rx) = mpsc::channel::<RigRequest>(RIG_TASK_CHANNEL_BUFFER);
let (rig_tx, rig_rx) = mpsc::channel::<RigRequest>(cfg.timeouts.rig_task_channel_buffer);
let mut initial_state = RigState::new_with_metadata(
callsign.clone(),
Some(env!("CARGO_PKG_VERSION").to_string()),
@@ -1065,6 +1067,7 @@ async fn main() -> DynResult<()> {
longitude,
Arc::clone(&registry),
histories.clone(),
&cfg.timeouts,
);
if let Some(prebuilt) = sdr_prebuilt_rig {
task_config.prebuilt_rig = Some(prebuilt);
@@ -1074,13 +1077,31 @@ async fn main() -> DynResult<()> {
AdaptivePolling::new(Duration::from_millis(100), Duration::from_millis(100));
}
// Spawn rig task.
// Spawn rig task with crash detection.
// If the task panics or returns an error, emit RigMachineState::Error
// on the watch channel so connected clients see the failure instead of
// silently losing the rig.
let rig_shutdown_rx = shutdown_rx.clone();
let rig_id_supervisor = rig_cfg.id.clone();
task_handles.push(tokio::spawn(async move {
if let Err(e) =
rig_task::run_rig_task(task_config, rig_rx, state_tx, rig_shutdown_rx).await
{
error!("Rig task error: {:?}", e);
let result =
rig_task::run_rig_task(task_config, rig_rx, state_tx.clone(), rig_shutdown_rx)
.await;
match result {
Ok(()) => {
info!("[{}] Rig task exited cleanly", rig_id_supervisor);
}
Err(e) => {
error!(
"[{}] Rig task crashed: {:?}; signalling error state to clients",
rig_id_supervisor, e
);
let mut err_state = state_tx.borrow().clone();
err_state.machine_state = "Error".to_string();
err_state.error_message =
Some(format!("Rig task terminated unexpectedly: {}", e));
let _ = state_tx.send(err_state);
}
}
}));
@@ -1143,6 +1164,10 @@ async fn main() -> DynResult<()> {
.collect();
let rigs_arc = Arc::new(rig_handles);
let listener_shutdown_rx = shutdown_rx.clone();
let listener_timeouts = listener::ListenerTimeouts {
io_timeout: Duration::from_millis(cfg.timeouts.io_timeout_ms),
request_timeout: Duration::from_millis(cfg.timeouts.request_timeout_ms),
};
task_handles.push(tokio::spawn(async move {
let station_coords = latitude.zip(longitude);
if let Err(e) = listener::run_listener(
@@ -1151,6 +1176,7 @@ async fn main() -> DynResult<()> {
default_rig_id,
auth_tokens,
station_coords,
listener_timeouts,
listener_shutdown_rx,
)
.await
+19 -6
View File
@@ -27,8 +27,10 @@ use trx_core::{DynResult, RigError, RigResult};
use crate::audio::DecoderHistories;
use crate::error::is_invalid_bcd_error;
const POLL_REFRESH_TIMEOUT: Duration = Duration::from_secs(8);
const COMMAND_EXEC_TIMEOUT: Duration = Duration::from_secs(10);
/// Fallback poll refresh timeout used when no config value is provided.
const DEFAULT_POLL_REFRESH_TIMEOUT: Duration = Duration::from_secs(8);
/// Fallback command execution timeout used when no config value is provided.
const DEFAULT_COMMAND_EXEC_TIMEOUT: Duration = Duration::from_secs(10);
/// Configuration for the rig task.
pub struct RigTaskConfig {
pub registry: Arc<RegistrationContext>,
@@ -57,6 +59,10 @@ pub struct RigTaskConfig {
/// `SoapySdrRig` (built with channel config) without duplicating the
/// pipeline construction.
pub prebuilt_rig: Option<Box<dyn RigCat>>,
/// Maximum time to wait for a single rig command to complete.
pub command_exec_timeout: Duration,
/// Maximum time for a CAT poll refresh cycle.
pub poll_refresh_timeout: Duration,
}
impl Default for RigTaskConfig {
@@ -85,6 +91,8 @@ impl Default for RigTaskConfig {
histories: DecoderHistories::new(),
vfo_prime: true,
prebuilt_rig: None,
command_exec_timeout: DEFAULT_COMMAND_EXEC_TIMEOUT,
poll_refresh_timeout: DEFAULT_POLL_REFRESH_TIMEOUT,
}
}
}
@@ -143,6 +151,10 @@ pub async fn run_rig_task(
state.pskreporter_status = config.pskreporter_status.clone();
state.aprs_is_status = config.aprs_is_status.clone();
// Timeout configuration
let command_exec_timeout = config.command_exec_timeout;
let poll_refresh_timeout = config.poll_refresh_timeout;
// Polling configuration
let polling = &config.polling;
let retry = &config.retry;
@@ -284,7 +296,7 @@ pub async fn run_rig_task(
// Poll rig state
let old_state = state.clone();
match time::timeout(
POLL_REFRESH_TIMEOUT,
poll_refresh_timeout,
refresh_state_with_retry(&mut rig, &mut state, retry),
)
.await
@@ -315,7 +327,7 @@ pub async fn run_rig_task(
Err(_) => {
error!(
"CAT polling timed out after {:?}",
POLL_REFRESH_TIMEOUT
poll_refresh_timeout
);
}
}
@@ -329,8 +341,9 @@ pub async fn run_rig_task(
batch.push(next);
}
// Process each request
while let Some(RigRequest { cmd, respond_to, .. }) = batch.pop() {
// Process each request in FIFO order (drain from front)
while !batch.is_empty() {
let RigRequest { cmd, respond_to, .. } = batch.remove(0);
if matches!(cmd, RigCommand::GetSpectrum) {
let mut responders = vec![respond_to];
let mut idx = 0;
+1 -1
View File
@@ -17,7 +17,7 @@ use trx_backend_ft450d::Ft450d;
#[cfg(feature = "ft817")]
use trx_backend_ft817::Ft817;
#[cfg(feature = "soapysdr")]
pub use trx_backend_soapysdr::SoapySdrRig;
pub use trx_backend_soapysdr::{SoapySdrConfig, SoapySdrRig};
/// Connection details for instantiating a rig backend.
#[derive(Debug, Clone)]
@@ -23,6 +23,86 @@ const AIS_CHANNEL_SPACING_HZ: i64 = 50_000;
pub use vchan_impl::SdrVirtualChannelManager;
/// Configuration struct for constructing a [`SoapySdrRig`].
///
/// Replaces the 20+ parameter `new_with_config()` constructor with a more
/// readable and maintainable builder. All fields have sensible defaults via
/// the `Default` implementation.
#[derive(Debug, Clone)]
pub struct SoapySdrConfig {
/// SoapySDR device args string (e.g. `"driver=rtlsdr"`).
pub args: String,
/// Per-channel tuples of `(channel_if_hz, initial_mode, audio_bandwidth_hz)`.
pub channels: Vec<(f64, RigMode, u32)>,
/// `"auto"` or `"manual"`.
pub gain_mode: String,
/// Gain in dB; used when `gain_mode == "manual"`.
pub gain_db: f64,
/// Optional hard ceiling for the applied hardware gain in dB.
pub max_gain_db: Option<f64>,
/// Output PCM rate (Hz).
pub audio_sample_rate: u32,
/// Number of audio channels.
pub audio_channels: usize,
/// Output frame length (ms).
pub frame_duration_ms: u16,
/// WFM deemphasis time constant in microseconds.
pub wfm_deemphasis_us: u32,
/// Initial dial frequency.
pub initial_freq: Freq,
/// Initial demodulation mode.
pub initial_mode: RigMode,
/// IQ capture rate (Hz).
pub sdr_sample_rate: u32,
/// Hardware IF filter bandwidth to apply to the device.
pub bandwidth_hz: u32,
/// The hardware is tuned this many Hz *below* the dial frequency so the
/// desired signal lands off-DC. The DSP mixer shifts it back.
pub center_offset_hz: i64,
/// Enable software squelch for all modes except WFM.
pub squelch_enabled: bool,
/// Squelch open threshold in dBFS.
pub squelch_threshold_db: f32,
/// Close hysteresis in dB.
pub squelch_hysteresis_db: f32,
/// Tail hold time in milliseconds.
pub squelch_tail_ms: u32,
/// Maximum number of dynamic virtual channels.
pub max_virtual_channels: usize,
/// Whether the noise blanker is enabled on the primary channel.
pub nb_enabled: bool,
/// Noise blanker impulse threshold multiplier.
pub nb_threshold: f64,
}
impl Default for SoapySdrConfig {
fn default() -> Self {
Self {
args: String::new(),
channels: Vec::new(),
gain_mode: "auto".to_string(),
gain_db: 30.0,
max_gain_db: None,
audio_sample_rate: 48_000,
audio_channels: 1,
frame_duration_ms: 20,
wfm_deemphasis_us: 50,
initial_freq: Freq { hz: 144_300_000 },
initial_mode: RigMode::USB,
sdr_sample_rate: 1_920_000,
bandwidth_hz: 1_500_000,
center_offset_hz: 0,
squelch_enabled: false,
squelch_threshold_db: -65.0,
squelch_hysteresis_db: 3.0,
squelch_tail_ms: 180,
max_virtual_channels: 4,
nb_enabled: false,
nb_threshold: 10.0,
}
}
}
/// RX-only backend for any SoapySDR-compatible device.
pub struct SoapySdrRig {
info: RigInfo,
@@ -88,55 +168,32 @@ impl SoapySdrRig {
}
}
/// Full constructor. All channel configuration is passed as plain
/// parameters so this crate does not need to depend on `trx-server`
/// (which is a binary, not a library crate).
/// Construct from a [`SoapySdrConfig`] struct.
///
/// # Parameters
/// - `args`: SoapySDR device args string (e.g. `"driver=rtlsdr"`).
/// Opens a real hardware device via SoapySDR.
/// - `channels`: per-channel tuples of
/// `(channel_if_hz, initial_mode, audio_bandwidth_hz)`.
/// - `gain_mode`: `"auto"` or `"manual"`.
/// - `gain_db`: gain in dB; used when `gain_mode == "manual"`.
/// - `max_gain_db`: optional hard ceiling for the applied hardware gain.
/// - `audio_sample_rate`: output PCM rate (Hz).
/// - `frame_duration_ms`: output frame length (ms).
/// - `initial_freq`: initial dial frequency reported by `get_status`.
/// - `initial_mode`: initial demodulation mode.
/// - `sdr_sample_rate`: IQ capture rate (Hz).
/// - `bandwidth_hz`: hardware IF filter bandwidth to apply to the device.
/// - `center_offset_hz`: the hardware is tuned this many Hz *below* the
/// dial frequency so the desired signal lands off-DC. The DSP mixer
/// shifts it back. Pass 0 to tune exactly to the dial frequency.
/// - `squelch_enabled`: enable software squelch for all modes except WFM.
/// - `squelch_threshold_db`: squelch open threshold in dBFS.
/// - `squelch_hysteresis_db`: close hysteresis in dB.
/// - `squelch_tail_ms`: tail hold time in milliseconds.
#[allow(clippy::too_many_arguments)]
pub fn new_with_config(
args: &str,
channels: &[(f64, RigMode, u32)],
gain_mode: &str,
gain_db: f64,
max_gain_db: Option<f64>,
audio_sample_rate: u32,
audio_channels: usize,
frame_duration_ms: u16,
wfm_deemphasis_us: u32,
initial_freq: Freq,
initial_mode: RigMode,
sdr_sample_rate: u32,
bandwidth_hz: u32,
center_offset_hz: i64,
squelch_enabled: bool,
squelch_threshold_db: f32,
squelch_hysteresis_db: f32,
squelch_tail_ms: u32,
max_virtual_channels: usize,
nb_enabled: bool,
nb_threshold: f64,
) -> DynResult<Self> {
/// This is the preferred constructor. See [`SoapySdrConfig`] for field
/// documentation and defaults.
pub fn new_from_config(config: SoapySdrConfig) -> DynResult<Self> {
let args = &config.args;
let channels = &config.channels;
let gain_mode = &config.gain_mode;
let gain_db = config.gain_db;
let max_gain_db = config.max_gain_db;
let audio_sample_rate = config.audio_sample_rate;
let audio_channels = config.audio_channels;
let frame_duration_ms = config.frame_duration_ms;
let wfm_deemphasis_us = config.wfm_deemphasis_us;
let initial_freq = config.initial_freq;
let initial_mode = config.initial_mode;
let sdr_sample_rate = config.sdr_sample_rate;
let bandwidth_hz = config.bandwidth_hz;
let center_offset_hz = config.center_offset_hz;
let squelch_enabled = config.squelch_enabled;
let squelch_threshold_db = config.squelch_threshold_db;
let squelch_hysteresis_db = config.squelch_hysteresis_db;
let squelch_tail_ms = config.squelch_tail_ms;
let max_virtual_channels = config.max_virtual_channels;
let nb_enabled = config.nb_enabled;
let nb_threshold = config.nb_threshold;
tracing::info!(
"initialising SoapySDR backend (args={:?}, gain_mode={:?}, gain_db={}, max_gain_db={:?})",
args,
@@ -332,33 +389,67 @@ impl SoapySdrRig {
Ok(rig)
}
/// Legacy constructor kept for backward compatibility.
///
/// Prefer [`Self::new_from_config`] with a [`SoapySdrConfig`] struct for
/// better readability.
#[allow(clippy::too_many_arguments)]
pub fn new_with_config(
args: &str,
channels: &[(f64, RigMode, u32)],
gain_mode: &str,
gain_db: f64,
max_gain_db: Option<f64>,
audio_sample_rate: u32,
audio_channels: usize,
frame_duration_ms: u16,
wfm_deemphasis_us: u32,
initial_freq: Freq,
initial_mode: RigMode,
sdr_sample_rate: u32,
bandwidth_hz: u32,
center_offset_hz: i64,
squelch_enabled: bool,
squelch_threshold_db: f32,
squelch_hysteresis_db: f32,
squelch_tail_ms: u32,
max_virtual_channels: usize,
nb_enabled: bool,
nb_threshold: f64,
) -> DynResult<Self> {
Self::new_from_config(SoapySdrConfig {
args: args.to_string(),
channels: channels.to_vec(),
gain_mode: gain_mode.to_string(),
gain_db,
max_gain_db,
audio_sample_rate,
audio_channels,
frame_duration_ms,
wfm_deemphasis_us,
initial_freq,
initial_mode,
sdr_sample_rate,
bandwidth_hz,
center_offset_hz,
squelch_enabled,
squelch_threshold_db,
squelch_hysteresis_db,
squelch_tail_ms,
max_virtual_channels,
nb_enabled,
nb_threshold,
})
}
/// Simple constructor for backward compatibility with the factory function.
/// Creates a pipeline with no channels — the DSP loop runs but produces no
/// PCM frames.
pub fn new(args: &str) -> DynResult<Self> {
Self::new_with_config(
args,
&[], // no channels — pipeline does nothing; filter defaults applied in new_with_config
"auto",
30.0,
None,
48_000,
1,
20,
50,
Freq { hz: 144_300_000 },
RigMode::USB,
1_920_000,
1_500_000, // bandwidth_hz
0, // center_offset_hz
false, // squelch_enabled
-65.0, // squelch_threshold_db
3.0, // squelch_hysteresis_db
180, // squelch_tail_ms
4, // max_virtual_channels
false, // nb_enabled
10.0, // nb_threshold
)
Self::new_from_config(SoapySdrConfig {
args: args.to_string(),
..SoapySdrConfig::default()
})
}
/// Return the virtual channel manager for this SDR rig.