From 319e935d9781e4836376f52a14f8e8276f63de8a Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Tue, 24 Feb 2026 22:27:15 +0100 Subject: [PATCH] [feat](trx-server): wire SoapySDR backend and AudioSource into server startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Call validate_sdr() at startup and abort on errors (SDR.md §11) - Build SoapySdrRig with full [[sdr.channels]] config when access_type is "sdr"; subscribe to its primary-channel PCM sender before handing the pre-built rig to the rig task via RigTaskConfig.prebuilt_rig - Skip cpal capture when an SDR audio source is available; bridge the SdrPipeline PCM broadcast into pcm_tx so all decoder tasks are unchanged - Add trx-backend-soapysdr optional dep and soapysdr feature to trx-server - Add prebuilt_rig field to RigTaskConfig so rig_task skips the registry factory when a pre-built rig is supplied by main Marks SDR-08 complete. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- Cargo.lock | 1 + SDR.md | 2 +- src/trx-server/Cargo.toml | 4 + src/trx-server/src/main.rs | 146 ++++++++++++++++++++++++++++++++- src/trx-server/src/rig_task.rs | 17 +++- 5 files changed, 163 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 826ef72..103ee7f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2525,6 +2525,7 @@ dependencies = [ "trx-app", "trx-aprs", "trx-backend", + "trx-backend-soapysdr", "trx-core", "trx-cw", "trx-decode-log", diff --git a/SDR.md b/SDR.md index 9a84ced..3451ae1 100644 --- a/SDR.md +++ b/SDR.md @@ -33,7 +33,7 @@ This document specifies the requirements for a SoapySDR-based RX-only backend (` | ID | Status | Task | Touches | Needs | |----|--------|------|---------|-------| -| SDR-08 | `[ ]` | `main.rs`: after building rig, if `as_audio_source()` is `Some` skip cpal, subscribe each decoder and the Opus encoder to the appropriate channel PCM senders; validate `stream_opus` count ≤ 1 | `src/trx-server/src/main.rs` | SDR-03, SDR-07 | +| SDR-08 | `[x]` | `main.rs`: after building rig, if `as_audio_source()` is `Some` skip cpal, subscribe each decoder and the Opus encoder to the appropriate channel PCM senders; validate `stream_opus` count ≤ 1 | `src/trx-server/src/main.rs` | SDR-03, SDR-07 | | SDR-09 | `[x]` | Add `trx-backend-soapysdr` to workspace `Cargo.toml`; update `CONFIGURATION.md` with new `[sdr]` / `[[sdr.channels]]` options | `Cargo.toml`, `CONFIGURATION.md` | SDR-04 | ### Validation & tests diff --git a/src/trx-server/Cargo.toml b/src/trx-server/Cargo.toml index 7388b57..880427d 100644 --- a/src/trx-server/Cargo.toml +++ b/src/trx-server/Cargo.toml @@ -8,6 +8,9 @@ version = "0.1.0" edition = "2021" build = "build.rs" +[features] +soapysdr = ["dep:trx-backend-soapysdr", "trx-backend/soapysdr"] + [dependencies] tokio = { workspace = true, features = ["full"] } tokio-serial = { workspace = true } @@ -23,6 +26,7 @@ cpal = "0.15" opus = "0.3" trx-app = { path = "../trx-app" } trx-backend = { path = "trx-backend" } +trx-backend-soapysdr = { path = "trx-backend/trx-backend-soapysdr", optional = true } trx-core = { path = "../trx-core" } trx-aprs = { path = "../decoders/trx-aprs" } trx-cw = { path = "../decoders/trx-cw" } diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index 5e8d211..78451af 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -35,6 +35,9 @@ use trx_core::DynResult; use config::ServerConfig; use trx_decode_log::DecoderLoggers; +#[cfg(feature = "soapysdr")] +use trx_backend_soapysdr; + const PKG_DESCRIPTION: &str = concat!(env!("CARGO_PKG_NAME"), " - rig server daemon"); const RIG_TASK_CHANNEL_BUFFER: usize = 32; const RETRY_MAX_DELAY_SECS: u64 = 2; @@ -162,6 +165,10 @@ fn resolve_config( }; RigAccess::Tcp { addr } } + Some("sdr") => { + let args = cfg.rig.access.args.clone().unwrap_or_default(); + RigAccess::Sdr { args } + } Some(other) => return Err(format!("Unknown access type: {}", other).into()), } }; @@ -227,6 +234,7 @@ fn build_rig_task_config( server_latitude: resolved.latitude, server_longitude: resolved.longitude, pskreporter_status, + prebuilt_rig: None, } } @@ -241,6 +249,73 @@ async fn wait_for_shutdown(mut shutdown_rx: watch::Receiver) { } } +/// Parse a `RigMode` from a string slice. +/// Falls back to `initial_mode` when the string is "auto" or unrecognised. +#[cfg(feature = "soapysdr")] +fn parse_rig_mode( + s: &str, + initial_mode: &trx_core::rig::state::RigMode, +) -> trx_core::rig::state::RigMode { + use trx_core::rig::state::RigMode; + match s { + "LSB" => RigMode::LSB, + "USB" => RigMode::USB, + "CW" => RigMode::CW, + "CWR" => RigMode::CWR, + "AM" => RigMode::AM, + "WFM" => RigMode::WFM, + "FM" => RigMode::FM, + "DIG" => RigMode::DIG, + "PKT" => RigMode::PKT, + _ => initial_mode.clone(), + } +} + +/// Build a `SoapySdrRig` with full channel config from `ServerConfig` and +/// return both the rig box and a PCM receiver subscribed to its primary channel. +/// +/// Only compiled when the `soapysdr` feature is enabled. +#[cfg(feature = "soapysdr")] +fn build_sdr_rig( + cfg: &ServerConfig, +) -> DynResult<( + Box, + tokio::sync::broadcast::Receiver>, +)> { + use trx_core::radio::freq::Freq; + use trx_core::rig::AudioSource; + + let args = cfg.rig.access.args.as_deref().unwrap_or(""); + let channels: Vec<(f64, trx_core::rig::state::RigMode, u32, usize)> = cfg + .sdr + .channels + .iter() + .map(|ch| { + let if_hz = (cfg.sdr.center_offset_hz + ch.offset_hz) as f64; + let mode = parse_rig_mode(&ch.mode, &cfg.rig.initial_mode); + (if_hz, mode, ch.audio_bandwidth_hz, ch.fir_taps) + }) + .collect(); + + let sdr_rig = trx_backend_soapysdr::SoapySdrRig::new_with_config( + args, + &channels, + &cfg.sdr.gain.mode, + cfg.sdr.gain.value, + cfg.audio.sample_rate, + cfg.audio.frame_duration_ms, + Freq { + hz: cfg.rig.initial_freq_hz, + }, + cfg.rig.initial_mode.clone(), + cfg.sdr.sample_rate, + )?; + + // Subscribe to the primary channel's PCM broadcast before consuming the rig. + let pcm_rx = sdr_rig.subscribe_pcm(); + Ok((Box::new(sdr_rig) as Box, pcm_rx)) +} + #[tokio::main] async fn main() -> DynResult<()> { // Phase 3B: Create bootstrap context for explicit initialization. @@ -266,6 +341,15 @@ async fn main() -> DynResult<()> { cfg.validate() .map_err(|e| format!("Invalid server configuration: {}", e))?; + // Validate SDR-specific configuration rules (see SDR.md §11). + let sdr_errors = cfg.validate_sdr(); + if !sdr_errors.is_empty() { + for e in &sdr_errors { + tracing::error!("SDR config error: {}", e); + } + std::process::exit(1); + } + init_logging(cfg.general.log_level.as_deref()); let bootstrap_ctx_ptr = NonNull::from(&mut bootstrap_ctx).cast(); @@ -302,6 +386,30 @@ async fn main() -> DynResult<()> { info!("Callsign: {}", cs); } + // For the SDR access type: build the SoapySdrRig with full channel config + // here in main so we can subscribe to its primary-channel PCM sender + // before passing the rig to the rig task. The rig task skips its + // registry factory when `prebuilt_rig` is set. + // + // When the `soapysdr` feature is disabled this block is elided and + // `sdr_pcm_rx` is always `None`, preserving the cpal path. + #[cfg(feature = "soapysdr")] + let (sdr_prebuilt_rig, sdr_pcm_rx): ( + Option>, + Option>>, + ) = if cfg.rig.access.access_type.as_deref() == Some("sdr") { + let (rig, pcm_rx) = build_sdr_rig(&cfg)?; + (Some(rig), Some(pcm_rx)) + } else { + (None, None) + }; + + #[cfg(not(feature = "soapysdr"))] + let (sdr_prebuilt_rig, sdr_pcm_rx): ( + Option>, + Option>>, + ) = (None, None); + let (tx, rx) = mpsc::channel::(RIG_TASK_CHANNEL_BUFFER); let mut task_handles: Vec> = Vec::new(); let (shutdown_tx, shutdown_rx) = watch::channel(false); @@ -327,8 +435,14 @@ async fn main() -> DynResult<()> { // Keep receivers alive so channels don't close prematurely let _state_rx = state_rx; - let rig_task_config = + let mut rig_task_config = build_rig_task_config(&resolved, &cfg, std::sync::Arc::new(bootstrap_ctx)); + + // Pass pre-built SDR rig to the task so it skips the registry factory. + if let Some(prebuilt) = sdr_prebuilt_rig { + rig_task_config.prebuilt_rig = Some(prebuilt); + } + let rig_shutdown_rx = shutdown_rx.clone(); task_handles.push(tokio::spawn(async move { if let Err(e) = rig_task::run_rig_task(rig_task_config, rx, state_tx, rig_shutdown_rx).await @@ -437,8 +551,34 @@ async fn main() -> DynResult<()> { }; if cfg.audio.rx_enabled { - let _capture_thread = - audio::spawn_audio_capture(&cfg.audio, rx_audio_tx.clone(), Some(pcm_tx.clone())); + if let Some(mut sdr_rx) = sdr_pcm_rx { + // SDR path: the backend pipeline provides demodulated PCM, + // so cpal capture is skipped entirely. + // The SDR PCM frames are bridged into pcm_tx so the existing + // decoder spawn code below receives them unchanged. + tracing::info!("using SDR audio source — cpal capture disabled"); + let pcm_tx_clone = pcm_tx.clone(); + task_handles.push(tokio::spawn(async move { + loop { + match sdr_rx.recv().await { + Ok(frame) => { + let _ = pcm_tx_clone.send(frame); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!("SDR audio bridge: dropped {} frames", n); + } + Err(_) => break, + } + } + })); + } else { + // cpal path (existing serial/TCP transceivers) + let _capture_thread = audio::spawn_audio_capture( + &cfg.audio, + rx_audio_tx.clone(), + Some(pcm_tx.clone()), + ); + } // Spawn APRS decoder task let aprs_pcm_rx = pcm_tx.subscribe(); diff --git a/src/trx-server/src/rig_task.rs b/src/trx-server/src/rig_task.rs index 8cc0738..510b3b8 100644 --- a/src/trx-server/src/rig_task.rs +++ b/src/trx-server/src/rig_task.rs @@ -42,6 +42,11 @@ pub struct RigTaskConfig { pub server_latitude: Option, pub server_longitude: Option, pub pskreporter_status: Option, + /// Pre-built rig backend. When `Some`, the registry factory is skipped. + /// Used by the SDR path in `main.rs` to pass a fully-configured + /// `SoapySdrRig` (built with channel config) without duplicating the + /// pipeline construction. + pub prebuilt_rig: Option>, } impl Default for RigTaskConfig { @@ -65,6 +70,7 @@ impl Default for RigTaskConfig { server_latitude: None, server_longitude: None, pskreporter_status: None, + prebuilt_rig: None, } } } @@ -94,9 +100,14 @@ pub async fn run_rig_task( RigAccess::Sdr { args } => info!("SDR: {}", args), } - let mut rig: Box = config - .registry - .build_rig(&config.rig_model, config.access)?; + let mut rig: Box = if let Some(prebuilt) = config.prebuilt_rig { + info!("Using pre-built rig backend (SDR path)"); + prebuilt + } else { + config + .registry + .build_rig(&config.rig_model, config.access)? + }; info!("Rig backend ready"); // Initialize state machine and state