[feat](trx-backend-soapysdr): dynamic virtual channel DSP slots

Replace the fixed channel_dsps Vec (thread-owned) with an
Arc<RwLock<Vec<...>>> shared between the IQ read thread and the async
side, enabling live add/remove of virtual DSP channels.

Cache audio construction params in SdrPipeline so add_virtual_channel()
can build ChannelDsp instances without being re-passed them. Add:
  - SdrPipeline::add_virtual_channel() / remove_virtual_channel()
  - SoapySdrRig::virtual_channel_add/remove/set_freq/set_mode()
  - SoapySdrRig::center_hz() / half_span_hz() accessors

The IQ read loop holds a brief read lock (~2 ms per block) while
processing all channels; write lock for add/remove waits at most one
block. All 27 existing tests continue to pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-11 00:26:23 +01:00
parent 2f115fbec3
commit 05169912b1
2 changed files with 248 additions and 53 deletions
@@ -15,7 +15,7 @@ mod channel;
mod filter;
mod spectrum;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
use num_complex::Complex;
@@ -84,7 +84,10 @@ impl IqSource for MockIqSource {
pub struct SdrPipeline {
pub pcm_senders: Vec<broadcast::Sender<Vec<f32>>>,
pub iq_senders: Vec<broadcast::Sender<Vec<Complex<f32>>>>,
pub channel_dsps: Vec<Arc<Mutex<ChannelDsp>>>,
/// All DSP channel slots, including fixed (primary, AIS) and dynamic
/// (user virtual) channels. Shared with the IQ read thread via RwLock.
/// Virtual channels are appended beyond the fixed slots.
pub channel_dsps: Arc<RwLock<Vec<Arc<Mutex<ChannelDsp>>>>>,
/// Latest FFT magnitude bins (dBFS, FFT-shifted), updated ~10 Hz.
pub spectrum_buf: Arc<Mutex<Option<Vec<f32>>>>,
/// SDR capture sample rate, needed by `SoapySdrRig::get_spectrum`.
@@ -95,6 +98,13 @@ pub struct SdrPipeline {
/// Write `Some(gain_db)` here to adjust the hardware RX gain.
/// The IQ read loop picks it up on the next iteration.
pub gain_cmd: Arc<std::sync::Mutex<Option<f64>>>,
// Parameters cached here so `add_virtual_channel` can construct new
// `ChannelDsp` instances without needing to be passed them again.
audio_sample_rate: u32,
audio_channels: usize,
frame_duration_ms: u16,
wfm_deemphasis_us: u32,
wfm_stereo: bool,
}
impl SdrPipeline {
@@ -117,7 +127,7 @@ impl SdrPipeline {
let mut pcm_senders = Vec::with_capacity(channels.len());
let mut iq_senders = Vec::with_capacity(channels.len());
let mut channel_dsps: Vec<Arc<Mutex<ChannelDsp>>> = Vec::with_capacity(channels.len());
let mut channel_dsps_vec: Vec<Arc<Mutex<ChannelDsp>>> = Vec::with_capacity(channels.len());
for (channel_idx, &(channel_if_hz, ref mode, audio_bandwidth_hz, fir_taps)) in
channels.iter().enumerate()
@@ -146,10 +156,12 @@ impl SdrPipeline {
);
pcm_senders.push(pcm_tx);
iq_senders.push(iq_tx);
channel_dsps.push(Arc::new(Mutex::new(dsp)));
channel_dsps_vec.push(Arc::new(Mutex::new(dsp)));
}
let thread_dsps: Vec<Arc<Mutex<ChannelDsp>>> = channel_dsps.clone();
let channel_dsps: Arc<RwLock<Vec<Arc<Mutex<ChannelDsp>>>>> =
Arc::new(RwLock::new(channel_dsps_vec));
let thread_dsps = channel_dsps.clone();
let spectrum_buf: Arc<Mutex<Option<Vec<f32>>>> = Arc::new(Mutex::new(None));
let thread_spectrum_buf = spectrum_buf.clone();
let retune_cmd: Arc<std::sync::Mutex<Option<f64>>> = Arc::new(std::sync::Mutex::new(None));
@@ -180,8 +192,67 @@ impl SdrPipeline {
sdr_sample_rate,
retune_cmd,
gain_cmd,
audio_sample_rate,
audio_channels: output_channels,
frame_duration_ms,
wfm_deemphasis_us,
wfm_stereo,
}
}
/// Allocate a new virtual DSP channel.
///
/// Returns the PCM and IQ broadcast senders so the caller can subscribe to
/// audio frames and raw decimated IQ respectively. The channel is appended
/// beyond all fixed slots and is immediately visible to the IQ read thread.
pub fn add_virtual_channel(
&self,
channel_if_hz: f64,
mode: &RigMode,
bandwidth_hz: u32,
fir_taps: usize,
) -> (broadcast::Sender<Vec<f32>>, broadcast::Sender<Vec<Complex<f32>>>) {
const PCM_BROADCAST_CAPACITY: usize = 32;
const IQ_BROADCAST_CAPACITY: usize = 64;
let (pcm_tx, _) = broadcast::channel::<Vec<f32>>(PCM_BROADCAST_CAPACITY);
let (iq_tx, _) = broadcast::channel::<Vec<Complex<f32>>>(IQ_BROADCAST_CAPACITY);
let dsp = ChannelDsp::new(
channel_if_hz,
mode,
self.sdr_sample_rate,
self.audio_sample_rate,
self.audio_channels,
self.frame_duration_ms,
bandwidth_hz,
self.wfm_deemphasis_us,
self.wfm_stereo,
fir_taps.max(1),
VirtualSquelchConfig::default(),
pcm_tx.clone(),
iq_tx.clone(),
);
self.channel_dsps
.write()
.expect("channel_dsps RwLock poisoned")
.push(Arc::new(Mutex::new(dsp)));
(pcm_tx, iq_tx)
}
/// Remove a DSP channel slot by its index in the full `channel_dsps` list.
///
/// Returns `false` when the index is out of range. Callers are responsible
/// for not removing fixed slots (primary, AIS).
pub fn remove_virtual_channel(&self, idx: usize) -> bool {
let mut dsps = self
.channel_dsps
.write()
.expect("channel_dsps RwLock poisoned");
if idx >= dsps.len() {
return false;
}
dsps.remove(idx);
true
}
}
// ---------------------------------------------------------------------------
@@ -193,7 +264,7 @@ pub const IQ_BLOCK_SIZE: usize = 4096;
fn iq_read_loop(
mut source: Box<dyn IqSource>,
sdr_sample_rate: u32,
channel_dsps: Vec<Arc<Mutex<ChannelDsp>>>,
channel_dsps: Arc<RwLock<Vec<Arc<Mutex<ChannelDsp>>>>>,
iq_tx: broadcast::Sender<Vec<Complex<f32>>>,
spectrum_buf: Arc<Mutex<Option<Vec<f32>>>>,
retune_cmd: Arc<std::sync::Mutex<Option<f64>>>,
@@ -326,11 +397,18 @@ fn iq_read_loop(
let _ = iq_tx.send(samples.to_vec());
}
for dsp_arc in &channel_dsps {
match dsp_arc.lock() {
Ok(mut dsp) => dsp.process_block(samples),
Err(e) => {
tracing::error!("channel DSP mutex poisoned: {}", e);
// Hold a read lock only for the duration of this block's DSP pass.
// Write lock (add/remove channel) waits at most one block (~2 ms).
{
let dsps = channel_dsps
.read()
.expect("channel_dsps RwLock poisoned");
for dsp_arc in dsps.iter() {
match dsp_arc.lock() {
Ok(mut dsp) => dsp.process_block(samples),
Err(e) => {
tracing::error!("channel DSP mutex poisoned: {}", e);
}
}
}
}
@@ -425,7 +503,7 @@ mod tests {
&[(200_000.0, RigMode::USB, 3000, 64)],
);
assert_eq!(pipeline.pcm_senders.len(), 1);
assert_eq!(pipeline.channel_dsps.len(), 1);
assert_eq!(pipeline.channel_dsps.read().unwrap().len(), 1);
}
#[test]
@@ -442,6 +520,6 @@ mod tests {
&[],
);
assert_eq!(pipeline.pcm_senders.len(), 0);
assert_eq!(pipeline.channel_dsps.len(), 0);
assert_eq!(pipeline.channel_dsps.read().unwrap().len(), 0);
}
}
@@ -310,12 +310,12 @@ impl SoapySdrRig {
let Some((ais_a_idx, ais_b_idx)) = self.ais_channel_indices else {
return;
};
if let Some(dsp_arc) = self.pipeline.channel_dsps.get(ais_a_idx) {
let dsps = self.pipeline.channel_dsps.read().unwrap();
if let Some(dsp_arc) = dsps.get(ais_a_idx) {
let if_hz = (self.freq.hz as i64 - self.center_hz) as f64;
dsp_arc.lock().unwrap().set_channel_if_hz(if_hz);
}
if let Some(dsp_arc) = self.pipeline.channel_dsps.get(ais_b_idx) {
if let Some(dsp_arc) = dsps.get(ais_b_idx) {
let if_hz = (self.freq.hz as i64 + AIS_CHANNEL_SPACING_HZ - self.center_hz) as f64;
dsp_arc.lock().unwrap().set_channel_if_hz(if_hz);
}
@@ -325,9 +325,9 @@ impl SoapySdrRig {
let Some((ais_a_idx, ais_b_idx)) = self.ais_channel_indices else {
return;
};
let dsps = self.pipeline.channel_dsps.read().unwrap();
for idx in [ais_a_idx, ais_b_idx] {
if let Some(dsp_arc) = self.pipeline.channel_dsps.get(idx) {
if let Some(dsp_arc) = dsps.get(idx) {
dsp_arc
.lock()
.unwrap()
@@ -341,17 +341,103 @@ impl SoapySdrRig {
return;
};
let enabled = matches!(self.mode, RigMode::AIS | RigMode::MARINE);
let dsps = self.pipeline.channel_dsps.read().unwrap();
for idx in [ais_a_idx, ais_b_idx] {
if let Some(dsp_arc) = self.pipeline.channel_dsps.get(idx) {
if let Some(dsp_arc) = dsps.get(idx) {
dsp_arc.lock().unwrap().set_processing_enabled(enabled);
}
}
}
/// Current hardware center frequency (Hz).
pub fn center_hz(&self) -> i64 {
self.center_hz
}
/// Half of the SDR capture bandwidth (Hz). A virtual channel's dial
/// frequency must stay within `center_hz ± half_span_hz`.
pub fn half_span_hz(&self) -> i64 {
i64::from(self.pipeline.sdr_sample_rate) / 2
}
/// Allocate a new virtual DSP channel within the current SDR capture
/// bandwidth. Returns `None` if `freq_hz` is outside the capture span.
///
/// The returned senders can be subscribed to for PCM audio frames. The
/// `pipeline_slot` index identifies the slot for future
/// `virtual_channel_set_freq`, `virtual_channel_set_mode`, and
/// `virtual_channel_remove` calls.
pub fn virtual_channel_add(
&self,
freq_hz: u64,
mode: &RigMode,
bandwidth_hz: u32,
fir_taps: usize,
) -> Option<(
tokio::sync::broadcast::Sender<Vec<f32>>,
tokio::sync::broadcast::Sender<Vec<num_complex::Complex<f32>>>,
usize,
)> {
let channel_if_hz = freq_hz as i64 - self.center_hz;
if channel_if_hz.unsigned_abs() as i64 > self.half_span_hz() {
return None;
}
let (pcm_tx, iq_tx) =
self.pipeline
.add_virtual_channel(channel_if_hz as f64, mode, bandwidth_hz, fir_taps);
let slot = self
.pipeline
.channel_dsps
.read()
.unwrap()
.len()
.saturating_sub(1);
Some((pcm_tx, iq_tx, slot))
}
/// Remove a virtual channel at the given pipeline slot index.
/// Returns `false` if the slot is out of range.
pub fn virtual_channel_remove(&self, pipeline_slot: usize) -> bool {
self.pipeline.remove_virtual_channel(pipeline_slot)
}
/// Update the dial frequency of a virtual channel.
/// Returns `false` if the slot is out of range or the freq is outside
/// the current capture bandwidth.
pub fn virtual_channel_set_freq(&self, pipeline_slot: usize, freq_hz: u64) -> bool {
let channel_if_hz = freq_hz as i64 - self.center_hz;
if channel_if_hz.unsigned_abs() as i64 > self.half_span_hz() {
return false;
}
let dsps = self.pipeline.channel_dsps.read().unwrap();
if let Some(dsp_arc) = dsps.get(pipeline_slot) {
dsp_arc
.lock()
.unwrap()
.set_channel_if_hz(channel_if_hz as f64);
true
} else {
false
}
}
/// Update the demodulation mode of a virtual channel.
/// Returns `false` if the slot is out of range.
pub fn virtual_channel_set_mode(&self, pipeline_slot: usize, mode: &RigMode) -> bool {
let dsps = self.pipeline.channel_dsps.read().unwrap();
if let Some(dsp_arc) = dsps.get(pipeline_slot) {
dsp_arc.lock().unwrap().set_mode(mode);
true
} else {
false
}
}
pub fn subscribe_iq_channel(
&self,
channel_idx: usize,
) -> tokio::sync::broadcast::Receiver<Vec<num_complex::Complex<f32>>> {
// iq_senders covers fixed channels only (primary + AIS).
if let Some(sender) = self.pipeline.iq_senders.get(channel_idx) {
sender.subscribe()
} else {
@@ -437,12 +523,15 @@ impl RigCat for SoapySdrRig {
}
}
if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) {
let channel_if_hz = (self.freq.hz as i64 - self.center_hz) as f64;
let mut dsp = dsp_arc.lock().unwrap();
dsp.set_channel_if_hz(channel_if_hz);
if freq_changed {
dsp.reset_wfm_state();
{
let dsps = self.pipeline.channel_dsps.read().unwrap();
if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) {
let channel_if_hz = (self.freq.hz as i64 - self.center_hz) as f64;
let mut dsp = dsp_arc.lock().unwrap();
dsp.set_channel_if_hz(channel_if_hz);
if freq_changed {
dsp.reset_wfm_state();
}
}
}
self.update_ais_channel_offsets();
@@ -460,9 +549,12 @@ impl RigCat for SoapySdrRig {
if let Ok(mut cmd) = self.retune_cmd.lock() {
*cmd = Some(self.center_hz as f64);
}
if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) {
let channel_if_hz = (self.freq.hz as i64 - self.center_hz) as f64;
dsp_arc.lock().unwrap().set_channel_if_hz(channel_if_hz);
{
let dsps = self.pipeline.channel_dsps.read().unwrap();
if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) {
let channel_if_hz = (self.freq.hz as i64 - self.center_hz) as f64;
dsp_arc.lock().unwrap().set_channel_if_hz(channel_if_hz);
}
}
self.update_ais_channel_offsets();
Ok(())
@@ -478,10 +570,13 @@ impl RigCat for SoapySdrRig {
self.mode = mode.clone();
self.bandwidth_hz = Self::default_bandwidth_for_mode(&mode);
// Update the primary channel's demodulator in the live pipeline.
if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) {
let mut dsp = dsp_arc.lock().unwrap();
dsp.set_mode(&mode);
dsp.set_filter(self.bandwidth_hz, self.fir_taps as usize);
{
let dsps = self.pipeline.channel_dsps.read().unwrap();
if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) {
let mut dsp = dsp_arc.lock().unwrap();
dsp.set_mode(&mode);
dsp.set_filter(self.bandwidth_hz, self.fir_taps as usize);
}
}
self.apply_ais_channel_activity();
self.apply_ais_channel_filters();
@@ -501,8 +596,11 @@ impl RigCat for SoapySdrRig {
}
};
self.wfm_deemphasis_us = deemphasis_us;
if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) {
dsp_arc.lock().unwrap().set_wfm_deemphasis(deemphasis_us);
{
let dsps = self.pipeline.channel_dsps.read().unwrap();
if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) {
dsp_arc.lock().unwrap().set_wfm_deemphasis(deemphasis_us);
}
}
Ok(())
})
@@ -545,11 +643,14 @@ impl RigCat for SoapySdrRig {
}
self.squelch_enabled = enabled;
self.squelch_threshold_db = threshold_db as f32;
if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) {
dsp_arc
.lock()
.unwrap()
.set_squelch(enabled, self.squelch_threshold_db);
{
let dsps = self.pipeline.channel_dsps.read().unwrap();
if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) {
dsp_arc
.lock()
.unwrap()
.set_squelch(enabled, self.squelch_threshold_db);
}
}
Ok(())
})
@@ -654,11 +755,14 @@ impl RigCat for SoapySdrRig {
Box::pin(async move {
tracing::debug!("SoapySdrRig: set_bandwidth -> {} Hz", bandwidth_hz);
self.bandwidth_hz = bandwidth_hz;
if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) {
dsp_arc
.lock()
.unwrap()
.set_filter(bandwidth_hz, self.fir_taps as usize);
{
let dsps = self.pipeline.channel_dsps.read().unwrap();
if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) {
dsp_arc
.lock()
.unwrap()
.set_filter(bandwidth_hz, self.fir_taps as usize);
}
}
self.apply_ais_channel_filters();
Ok(())
@@ -672,11 +776,14 @@ impl RigCat for SoapySdrRig {
Box::pin(async move {
tracing::debug!("SoapySdrRig: set_fir_taps -> {}", taps);
self.fir_taps = taps;
if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) {
dsp_arc
.lock()
.unwrap()
.set_filter(self.bandwidth_hz, taps as usize);
{
let dsps = self.pipeline.channel_dsps.read().unwrap();
if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) {
dsp_arc
.lock()
.unwrap()
.set_filter(self.bandwidth_hz, taps as usize);
}
}
self.apply_ais_channel_filters();
Ok(())
@@ -689,8 +796,11 @@ impl RigCat for SoapySdrRig {
) -> Pin<Box<dyn std::future::Future<Output = DynResult<()>> + Send + 'a>> {
Box::pin(async move {
self.wfm_stereo = enabled;
if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) {
dsp_arc.lock().unwrap().set_wfm_stereo(enabled);
{
let dsps = self.pipeline.channel_dsps.read().unwrap();
if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) {
dsp_arc.lock().unwrap().set_wfm_stereo(enabled);
}
}
Ok(())
})
@@ -702,8 +812,11 @@ impl RigCat for SoapySdrRig {
) -> Pin<Box<dyn std::future::Future<Output = DynResult<()>> + Send + 'a>> {
Box::pin(async move {
self.wfm_denoise = level;
if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) {
dsp_arc.lock().unwrap().set_wfm_denoise(level);
{
let dsps = self.pipeline.channel_dsps.read().unwrap();
if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) {
dsp_arc.lock().unwrap().set_wfm_denoise(level);
}
}
Ok(())
})
@@ -713,6 +826,8 @@ impl RigCat for SoapySdrRig {
let wfm_stereo_detected = self
.pipeline
.channel_dsps
.read()
.unwrap()
.get(self.primary_channel_idx)
.and_then(|dsp| dsp.lock().ok().map(|d| d.wfm_stereo_detected()))
.unwrap_or(false);
@@ -739,6 +854,8 @@ impl RigCat for SoapySdrRig {
let rds = self
.pipeline
.channel_dsps
.read()
.unwrap()
.get(self.primary_channel_idx)
.and_then(|dsp| dsp.lock().ok().and_then(|d| d.rds_data()));
Some(SpectrumData {