[feat](trx-server): wire SoapySDR backend and AudioSource into server startup
- Call validate_sdr() at startup and abort on errors (SDR.md §11) - Build SoapySdrRig with full [[sdr.channels]] config when access_type is "sdr"; subscribe to its primary-channel PCM sender before handing the pre-built rig to the rig task via RigTaskConfig.prebuilt_rig - Skip cpal capture when an SDR audio source is available; bridge the SdrPipeline PCM broadcast into pcm_tx so all decoder tasks are unchanged - Add trx-backend-soapysdr optional dep and soapysdr feature to trx-server - Add prebuilt_rig field to RigTaskConfig so rig_task skips the registry factory when a pre-built rig is supplied by main Marks SDR-08 complete. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
Generated
+1
@@ -2525,6 +2525,7 @@ dependencies = [
|
||||
"trx-app",
|
||||
"trx-aprs",
|
||||
"trx-backend",
|
||||
"trx-backend-soapysdr",
|
||||
"trx-core",
|
||||
"trx-cw",
|
||||
"trx-decode-log",
|
||||
|
||||
@@ -33,7 +33,7 @@ This document specifies the requirements for a SoapySDR-based RX-only backend (`
|
||||
|
||||
| ID | Status | Task | Touches | Needs |
|
||||
|----|--------|------|---------|-------|
|
||||
| SDR-08 | `[ ]` | `main.rs`: after building rig, if `as_audio_source()` is `Some` skip cpal, subscribe each decoder and the Opus encoder to the appropriate channel PCM senders; validate `stream_opus` count ≤ 1 | `src/trx-server/src/main.rs` | SDR-03, SDR-07 |
|
||||
| SDR-08 | `[x]` | `main.rs`: after building rig, if `as_audio_source()` is `Some` skip cpal, subscribe each decoder and the Opus encoder to the appropriate channel PCM senders; validate `stream_opus` count ≤ 1 | `src/trx-server/src/main.rs` | SDR-03, SDR-07 |
|
||||
| SDR-09 | `[x]` | Add `trx-backend-soapysdr` to workspace `Cargo.toml`; update `CONFIGURATION.md` with new `[sdr]` / `[[sdr.channels]]` options | `Cargo.toml`, `CONFIGURATION.md` | SDR-04 |
|
||||
|
||||
### Validation & tests
|
||||
|
||||
@@ -8,6 +8,9 @@ version = "0.1.0"
|
||||
edition = "2021"
|
||||
build = "build.rs"
|
||||
|
||||
[features]
|
||||
soapysdr = ["dep:trx-backend-soapysdr", "trx-backend/soapysdr"]
|
||||
|
||||
[dependencies]
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
tokio-serial = { workspace = true }
|
||||
@@ -23,6 +26,7 @@ cpal = "0.15"
|
||||
opus = "0.3"
|
||||
trx-app = { path = "../trx-app" }
|
||||
trx-backend = { path = "trx-backend" }
|
||||
trx-backend-soapysdr = { path = "trx-backend/trx-backend-soapysdr", optional = true }
|
||||
trx-core = { path = "../trx-core" }
|
||||
trx-aprs = { path = "../decoders/trx-aprs" }
|
||||
trx-cw = { path = "../decoders/trx-cw" }
|
||||
|
||||
+143
-3
@@ -35,6 +35,9 @@ use trx_core::DynResult;
|
||||
use config::ServerConfig;
|
||||
use trx_decode_log::DecoderLoggers;
|
||||
|
||||
#[cfg(feature = "soapysdr")]
|
||||
use trx_backend_soapysdr;
|
||||
|
||||
const PKG_DESCRIPTION: &str = concat!(env!("CARGO_PKG_NAME"), " - rig server daemon");
|
||||
const RIG_TASK_CHANNEL_BUFFER: usize = 32;
|
||||
const RETRY_MAX_DELAY_SECS: u64 = 2;
|
||||
@@ -162,6 +165,10 @@ fn resolve_config(
|
||||
};
|
||||
RigAccess::Tcp { addr }
|
||||
}
|
||||
Some("sdr") => {
|
||||
let args = cfg.rig.access.args.clone().unwrap_or_default();
|
||||
RigAccess::Sdr { args }
|
||||
}
|
||||
Some(other) => return Err(format!("Unknown access type: {}", other).into()),
|
||||
}
|
||||
};
|
||||
@@ -227,6 +234,7 @@ fn build_rig_task_config(
|
||||
server_latitude: resolved.latitude,
|
||||
server_longitude: resolved.longitude,
|
||||
pskreporter_status,
|
||||
prebuilt_rig: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -241,6 +249,73 @@ async fn wait_for_shutdown(mut shutdown_rx: watch::Receiver<bool>) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a `RigMode` from a string slice.
|
||||
/// Falls back to `initial_mode` when the string is "auto" or unrecognised.
|
||||
#[cfg(feature = "soapysdr")]
|
||||
fn parse_rig_mode(
|
||||
s: &str,
|
||||
initial_mode: &trx_core::rig::state::RigMode,
|
||||
) -> trx_core::rig::state::RigMode {
|
||||
use trx_core::rig::state::RigMode;
|
||||
match s {
|
||||
"LSB" => RigMode::LSB,
|
||||
"USB" => RigMode::USB,
|
||||
"CW" => RigMode::CW,
|
||||
"CWR" => RigMode::CWR,
|
||||
"AM" => RigMode::AM,
|
||||
"WFM" => RigMode::WFM,
|
||||
"FM" => RigMode::FM,
|
||||
"DIG" => RigMode::DIG,
|
||||
"PKT" => RigMode::PKT,
|
||||
_ => initial_mode.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a `SoapySdrRig` with full channel config from `ServerConfig` and
|
||||
/// return both the rig box and a PCM receiver subscribed to its primary channel.
|
||||
///
|
||||
/// Only compiled when the `soapysdr` feature is enabled.
|
||||
#[cfg(feature = "soapysdr")]
|
||||
fn build_sdr_rig(
|
||||
cfg: &ServerConfig,
|
||||
) -> DynResult<(
|
||||
Box<dyn trx_core::rig::RigCat>,
|
||||
tokio::sync::broadcast::Receiver<Vec<f32>>,
|
||||
)> {
|
||||
use trx_core::radio::freq::Freq;
|
||||
use trx_core::rig::AudioSource;
|
||||
|
||||
let args = cfg.rig.access.args.as_deref().unwrap_or("");
|
||||
let channels: Vec<(f64, trx_core::rig::state::RigMode, u32, usize)> = cfg
|
||||
.sdr
|
||||
.channels
|
||||
.iter()
|
||||
.map(|ch| {
|
||||
let if_hz = (cfg.sdr.center_offset_hz + ch.offset_hz) as f64;
|
||||
let mode = parse_rig_mode(&ch.mode, &cfg.rig.initial_mode);
|
||||
(if_hz, mode, ch.audio_bandwidth_hz, ch.fir_taps)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let sdr_rig = trx_backend_soapysdr::SoapySdrRig::new_with_config(
|
||||
args,
|
||||
&channels,
|
||||
&cfg.sdr.gain.mode,
|
||||
cfg.sdr.gain.value,
|
||||
cfg.audio.sample_rate,
|
||||
cfg.audio.frame_duration_ms,
|
||||
Freq {
|
||||
hz: cfg.rig.initial_freq_hz,
|
||||
},
|
||||
cfg.rig.initial_mode.clone(),
|
||||
cfg.sdr.sample_rate,
|
||||
)?;
|
||||
|
||||
// Subscribe to the primary channel's PCM broadcast before consuming the rig.
|
||||
let pcm_rx = sdr_rig.subscribe_pcm();
|
||||
Ok((Box::new(sdr_rig) as Box<dyn trx_core::rig::RigCat>, pcm_rx))
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> DynResult<()> {
|
||||
// Phase 3B: Create bootstrap context for explicit initialization.
|
||||
@@ -266,6 +341,15 @@ async fn main() -> DynResult<()> {
|
||||
cfg.validate()
|
||||
.map_err(|e| format!("Invalid server configuration: {}", e))?;
|
||||
|
||||
// Validate SDR-specific configuration rules (see SDR.md §11).
|
||||
let sdr_errors = cfg.validate_sdr();
|
||||
if !sdr_errors.is_empty() {
|
||||
for e in &sdr_errors {
|
||||
tracing::error!("SDR config error: {}", e);
|
||||
}
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
init_logging(cfg.general.log_level.as_deref());
|
||||
|
||||
let bootstrap_ctx_ptr = NonNull::from(&mut bootstrap_ctx).cast();
|
||||
@@ -302,6 +386,30 @@ async fn main() -> DynResult<()> {
|
||||
info!("Callsign: {}", cs);
|
||||
}
|
||||
|
||||
// For the SDR access type: build the SoapySdrRig with full channel config
|
||||
// here in main so we can subscribe to its primary-channel PCM sender
|
||||
// before passing the rig to the rig task. The rig task skips its
|
||||
// registry factory when `prebuilt_rig` is set.
|
||||
//
|
||||
// When the `soapysdr` feature is disabled this block is elided and
|
||||
// `sdr_pcm_rx` is always `None`, preserving the cpal path.
|
||||
#[cfg(feature = "soapysdr")]
|
||||
let (sdr_prebuilt_rig, sdr_pcm_rx): (
|
||||
Option<Box<dyn trx_core::rig::RigCat>>,
|
||||
Option<tokio::sync::broadcast::Receiver<Vec<f32>>>,
|
||||
) = if cfg.rig.access.access_type.as_deref() == Some("sdr") {
|
||||
let (rig, pcm_rx) = build_sdr_rig(&cfg)?;
|
||||
(Some(rig), Some(pcm_rx))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
#[cfg(not(feature = "soapysdr"))]
|
||||
let (sdr_prebuilt_rig, sdr_pcm_rx): (
|
||||
Option<Box<dyn trx_core::rig::RigCat>>,
|
||||
Option<tokio::sync::broadcast::Receiver<Vec<f32>>>,
|
||||
) = (None, None);
|
||||
|
||||
let (tx, rx) = mpsc::channel::<RigRequest>(RIG_TASK_CHANNEL_BUFFER);
|
||||
let mut task_handles: Vec<JoinHandle<()>> = Vec::new();
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel(false);
|
||||
@@ -327,8 +435,14 @@ async fn main() -> DynResult<()> {
|
||||
// Keep receivers alive so channels don't close prematurely
|
||||
let _state_rx = state_rx;
|
||||
|
||||
let rig_task_config =
|
||||
let mut rig_task_config =
|
||||
build_rig_task_config(&resolved, &cfg, std::sync::Arc::new(bootstrap_ctx));
|
||||
|
||||
// Pass pre-built SDR rig to the task so it skips the registry factory.
|
||||
if let Some(prebuilt) = sdr_prebuilt_rig {
|
||||
rig_task_config.prebuilt_rig = Some(prebuilt);
|
||||
}
|
||||
|
||||
let rig_shutdown_rx = shutdown_rx.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
if let Err(e) = rig_task::run_rig_task(rig_task_config, rx, state_tx, rig_shutdown_rx).await
|
||||
@@ -437,8 +551,34 @@ async fn main() -> DynResult<()> {
|
||||
};
|
||||
|
||||
if cfg.audio.rx_enabled {
|
||||
let _capture_thread =
|
||||
audio::spawn_audio_capture(&cfg.audio, rx_audio_tx.clone(), Some(pcm_tx.clone()));
|
||||
if let Some(mut sdr_rx) = sdr_pcm_rx {
|
||||
// SDR path: the backend pipeline provides demodulated PCM,
|
||||
// so cpal capture is skipped entirely.
|
||||
// The SDR PCM frames are bridged into pcm_tx so the existing
|
||||
// decoder spawn code below receives them unchanged.
|
||||
tracing::info!("using SDR audio source — cpal capture disabled");
|
||||
let pcm_tx_clone = pcm_tx.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
loop {
|
||||
match sdr_rx.recv().await {
|
||||
Ok(frame) => {
|
||||
let _ = pcm_tx_clone.send(frame);
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
tracing::warn!("SDR audio bridge: dropped {} frames", n);
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
}));
|
||||
} else {
|
||||
// cpal path (existing serial/TCP transceivers)
|
||||
let _capture_thread = audio::spawn_audio_capture(
|
||||
&cfg.audio,
|
||||
rx_audio_tx.clone(),
|
||||
Some(pcm_tx.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
// Spawn APRS decoder task
|
||||
let aprs_pcm_rx = pcm_tx.subscribe();
|
||||
|
||||
@@ -42,6 +42,11 @@ pub struct RigTaskConfig {
|
||||
pub server_latitude: Option<f64>,
|
||||
pub server_longitude: Option<f64>,
|
||||
pub pskreporter_status: Option<String>,
|
||||
/// Pre-built rig backend. When `Some`, the registry factory is skipped.
|
||||
/// Used by the SDR path in `main.rs` to pass a fully-configured
|
||||
/// `SoapySdrRig` (built with channel config) without duplicating the
|
||||
/// pipeline construction.
|
||||
pub prebuilt_rig: Option<Box<dyn RigCat>>,
|
||||
}
|
||||
|
||||
impl Default for RigTaskConfig {
|
||||
@@ -65,6 +70,7 @@ impl Default for RigTaskConfig {
|
||||
server_latitude: None,
|
||||
server_longitude: None,
|
||||
pskreporter_status: None,
|
||||
prebuilt_rig: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -94,9 +100,14 @@ pub async fn run_rig_task(
|
||||
RigAccess::Sdr { args } => info!("SDR: {}", args),
|
||||
}
|
||||
|
||||
let mut rig: Box<dyn RigCat> = config
|
||||
.registry
|
||||
.build_rig(&config.rig_model, config.access)?;
|
||||
let mut rig: Box<dyn RigCat> = if let Some(prebuilt) = config.prebuilt_rig {
|
||||
info!("Using pre-built rig backend (SDR path)");
|
||||
prebuilt
|
||||
} else {
|
||||
config
|
||||
.registry
|
||||
.build_rig(&config.rig_model, config.access)?
|
||||
};
|
||||
info!("Rig backend ready");
|
||||
|
||||
// Initialize state machine and state
|
||||
|
||||
Reference in New Issue
Block a user