diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs index deaa459..9dfdd65 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs @@ -15,7 +15,7 @@ mod channel; mod filter; mod spectrum; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; use num_complex::Complex; @@ -84,7 +84,10 @@ impl IqSource for MockIqSource { pub struct SdrPipeline { pub pcm_senders: Vec>>, pub iq_senders: Vec>>>, - pub channel_dsps: Vec>>, + /// All DSP channel slots, including fixed (primary, AIS) and dynamic + /// (user virtual) channels. Shared with the IQ read thread via RwLock. + /// Virtual channels are appended beyond the fixed slots. + pub channel_dsps: Arc>>>>, /// Latest FFT magnitude bins (dBFS, FFT-shifted), updated ~10 Hz. pub spectrum_buf: Arc>>>, /// SDR capture sample rate, needed by `SoapySdrRig::get_spectrum`. @@ -95,6 +98,13 @@ pub struct SdrPipeline { /// Write `Some(gain_db)` here to adjust the hardware RX gain. /// The IQ read loop picks it up on the next iteration. pub gain_cmd: Arc>>, + // Parameters cached here so `add_virtual_channel` can construct new + // `ChannelDsp` instances without needing to be passed them again. + audio_sample_rate: u32, + audio_channels: usize, + frame_duration_ms: u16, + wfm_deemphasis_us: u32, + wfm_stereo: bool, } impl SdrPipeline { @@ -117,7 +127,7 @@ impl SdrPipeline { let mut pcm_senders = Vec::with_capacity(channels.len()); let mut iq_senders = Vec::with_capacity(channels.len()); - let mut channel_dsps: Vec>> = Vec::with_capacity(channels.len()); + let mut channel_dsps_vec: Vec>> = Vec::with_capacity(channels.len()); for (channel_idx, &(channel_if_hz, ref mode, audio_bandwidth_hz, fir_taps)) in channels.iter().enumerate() @@ -146,10 +156,12 @@ impl SdrPipeline { ); pcm_senders.push(pcm_tx); iq_senders.push(iq_tx); - channel_dsps.push(Arc::new(Mutex::new(dsp))); + channel_dsps_vec.push(Arc::new(Mutex::new(dsp))); } - let thread_dsps: Vec>> = channel_dsps.clone(); + let channel_dsps: Arc>>>> = + Arc::new(RwLock::new(channel_dsps_vec)); + let thread_dsps = channel_dsps.clone(); let spectrum_buf: Arc>>> = Arc::new(Mutex::new(None)); let thread_spectrum_buf = spectrum_buf.clone(); let retune_cmd: Arc>> = Arc::new(std::sync::Mutex::new(None)); @@ -180,8 +192,67 @@ impl SdrPipeline { sdr_sample_rate, retune_cmd, gain_cmd, + audio_sample_rate, + audio_channels: output_channels, + frame_duration_ms, + wfm_deemphasis_us, + wfm_stereo, } } + + /// Allocate a new virtual DSP channel. + /// + /// Returns the PCM and IQ broadcast senders so the caller can subscribe to + /// audio frames and raw decimated IQ respectively. The channel is appended + /// beyond all fixed slots and is immediately visible to the IQ read thread. + pub fn add_virtual_channel( + &self, + channel_if_hz: f64, + mode: &RigMode, + bandwidth_hz: u32, + fir_taps: usize, + ) -> (broadcast::Sender>, broadcast::Sender>>) { + const PCM_BROADCAST_CAPACITY: usize = 32; + const IQ_BROADCAST_CAPACITY: usize = 64; + let (pcm_tx, _) = broadcast::channel::>(PCM_BROADCAST_CAPACITY); + let (iq_tx, _) = broadcast::channel::>>(IQ_BROADCAST_CAPACITY); + let dsp = ChannelDsp::new( + channel_if_hz, + mode, + self.sdr_sample_rate, + self.audio_sample_rate, + self.audio_channels, + self.frame_duration_ms, + bandwidth_hz, + self.wfm_deemphasis_us, + self.wfm_stereo, + fir_taps.max(1), + VirtualSquelchConfig::default(), + pcm_tx.clone(), + iq_tx.clone(), + ); + self.channel_dsps + .write() + .expect("channel_dsps RwLock poisoned") + .push(Arc::new(Mutex::new(dsp))); + (pcm_tx, iq_tx) + } + + /// Remove a DSP channel slot by its index in the full `channel_dsps` list. + /// + /// Returns `false` when the index is out of range. Callers are responsible + /// for not removing fixed slots (primary, AIS). + pub fn remove_virtual_channel(&self, idx: usize) -> bool { + let mut dsps = self + .channel_dsps + .write() + .expect("channel_dsps RwLock poisoned"); + if idx >= dsps.len() { + return false; + } + dsps.remove(idx); + true + } } // --------------------------------------------------------------------------- @@ -193,7 +264,7 @@ pub const IQ_BLOCK_SIZE: usize = 4096; fn iq_read_loop( mut source: Box, sdr_sample_rate: u32, - channel_dsps: Vec>>, + channel_dsps: Arc>>>>, iq_tx: broadcast::Sender>>, spectrum_buf: Arc>>>, retune_cmd: Arc>>, @@ -326,11 +397,18 @@ fn iq_read_loop( let _ = iq_tx.send(samples.to_vec()); } - for dsp_arc in &channel_dsps { - match dsp_arc.lock() { - Ok(mut dsp) => dsp.process_block(samples), - Err(e) => { - tracing::error!("channel DSP mutex poisoned: {}", e); + // Hold a read lock only for the duration of this block's DSP pass. + // Write lock (add/remove channel) waits at most one block (~2 ms). + { + let dsps = channel_dsps + .read() + .expect("channel_dsps RwLock poisoned"); + for dsp_arc in dsps.iter() { + match dsp_arc.lock() { + Ok(mut dsp) => dsp.process_block(samples), + Err(e) => { + tracing::error!("channel DSP mutex poisoned: {}", e); + } } } } @@ -425,7 +503,7 @@ mod tests { &[(200_000.0, RigMode::USB, 3000, 64)], ); assert_eq!(pipeline.pcm_senders.len(), 1); - assert_eq!(pipeline.channel_dsps.len(), 1); + assert_eq!(pipeline.channel_dsps.read().unwrap().len(), 1); } #[test] @@ -442,6 +520,6 @@ mod tests { &[], ); assert_eq!(pipeline.pcm_senders.len(), 0); - assert_eq!(pipeline.channel_dsps.len(), 0); + assert_eq!(pipeline.channel_dsps.read().unwrap().len(), 0); } } diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs index 4c3db44..7dc524a 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs @@ -310,12 +310,12 @@ impl SoapySdrRig { let Some((ais_a_idx, ais_b_idx)) = self.ais_channel_indices else { return; }; - - if let Some(dsp_arc) = self.pipeline.channel_dsps.get(ais_a_idx) { + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(ais_a_idx) { let if_hz = (self.freq.hz as i64 - self.center_hz) as f64; dsp_arc.lock().unwrap().set_channel_if_hz(if_hz); } - if let Some(dsp_arc) = self.pipeline.channel_dsps.get(ais_b_idx) { + if let Some(dsp_arc) = dsps.get(ais_b_idx) { let if_hz = (self.freq.hz as i64 + AIS_CHANNEL_SPACING_HZ - self.center_hz) as f64; dsp_arc.lock().unwrap().set_channel_if_hz(if_hz); } @@ -325,9 +325,9 @@ impl SoapySdrRig { let Some((ais_a_idx, ais_b_idx)) = self.ais_channel_indices else { return; }; - + let dsps = self.pipeline.channel_dsps.read().unwrap(); for idx in [ais_a_idx, ais_b_idx] { - if let Some(dsp_arc) = self.pipeline.channel_dsps.get(idx) { + if let Some(dsp_arc) = dsps.get(idx) { dsp_arc .lock() .unwrap() @@ -341,17 +341,103 @@ impl SoapySdrRig { return; }; let enabled = matches!(self.mode, RigMode::AIS | RigMode::MARINE); + let dsps = self.pipeline.channel_dsps.read().unwrap(); for idx in [ais_a_idx, ais_b_idx] { - if let Some(dsp_arc) = self.pipeline.channel_dsps.get(idx) { + if let Some(dsp_arc) = dsps.get(idx) { dsp_arc.lock().unwrap().set_processing_enabled(enabled); } } } + /// Current hardware center frequency (Hz). + pub fn center_hz(&self) -> i64 { + self.center_hz + } + + /// Half of the SDR capture bandwidth (Hz). A virtual channel's dial + /// frequency must stay within `center_hz ± half_span_hz`. + pub fn half_span_hz(&self) -> i64 { + i64::from(self.pipeline.sdr_sample_rate) / 2 + } + + /// Allocate a new virtual DSP channel within the current SDR capture + /// bandwidth. Returns `None` if `freq_hz` is outside the capture span. + /// + /// The returned senders can be subscribed to for PCM audio frames. The + /// `pipeline_slot` index identifies the slot for future + /// `virtual_channel_set_freq`, `virtual_channel_set_mode`, and + /// `virtual_channel_remove` calls. + pub fn virtual_channel_add( + &self, + freq_hz: u64, + mode: &RigMode, + bandwidth_hz: u32, + fir_taps: usize, + ) -> Option<( + tokio::sync::broadcast::Sender>, + tokio::sync::broadcast::Sender>>, + usize, + )> { + let channel_if_hz = freq_hz as i64 - self.center_hz; + if channel_if_hz.unsigned_abs() as i64 > self.half_span_hz() { + return None; + } + let (pcm_tx, iq_tx) = + self.pipeline + .add_virtual_channel(channel_if_hz as f64, mode, bandwidth_hz, fir_taps); + let slot = self + .pipeline + .channel_dsps + .read() + .unwrap() + .len() + .saturating_sub(1); + Some((pcm_tx, iq_tx, slot)) + } + + /// Remove a virtual channel at the given pipeline slot index. + /// Returns `false` if the slot is out of range. + pub fn virtual_channel_remove(&self, pipeline_slot: usize) -> bool { + self.pipeline.remove_virtual_channel(pipeline_slot) + } + + /// Update the dial frequency of a virtual channel. + /// Returns `false` if the slot is out of range or the freq is outside + /// the current capture bandwidth. + pub fn virtual_channel_set_freq(&self, pipeline_slot: usize, freq_hz: u64) -> bool { + let channel_if_hz = freq_hz as i64 - self.center_hz; + if channel_if_hz.unsigned_abs() as i64 > self.half_span_hz() { + return false; + } + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(pipeline_slot) { + dsp_arc + .lock() + .unwrap() + .set_channel_if_hz(channel_if_hz as f64); + true + } else { + false + } + } + + /// Update the demodulation mode of a virtual channel. + /// Returns `false` if the slot is out of range. + pub fn virtual_channel_set_mode(&self, pipeline_slot: usize, mode: &RigMode) -> bool { + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(pipeline_slot) { + dsp_arc.lock().unwrap().set_mode(mode); + true + } else { + false + } + } + pub fn subscribe_iq_channel( &self, channel_idx: usize, ) -> tokio::sync::broadcast::Receiver>> { + // iq_senders covers fixed channels only (primary + AIS). if let Some(sender) = self.pipeline.iq_senders.get(channel_idx) { sender.subscribe() } else { @@ -437,12 +523,15 @@ impl RigCat for SoapySdrRig { } } - if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) { - let channel_if_hz = (self.freq.hz as i64 - self.center_hz) as f64; - let mut dsp = dsp_arc.lock().unwrap(); - dsp.set_channel_if_hz(channel_if_hz); - if freq_changed { - dsp.reset_wfm_state(); + { + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) { + let channel_if_hz = (self.freq.hz as i64 - self.center_hz) as f64; + let mut dsp = dsp_arc.lock().unwrap(); + dsp.set_channel_if_hz(channel_if_hz); + if freq_changed { + dsp.reset_wfm_state(); + } } } self.update_ais_channel_offsets(); @@ -460,9 +549,12 @@ impl RigCat for SoapySdrRig { if let Ok(mut cmd) = self.retune_cmd.lock() { *cmd = Some(self.center_hz as f64); } - if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) { - let channel_if_hz = (self.freq.hz as i64 - self.center_hz) as f64; - dsp_arc.lock().unwrap().set_channel_if_hz(channel_if_hz); + { + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) { + let channel_if_hz = (self.freq.hz as i64 - self.center_hz) as f64; + dsp_arc.lock().unwrap().set_channel_if_hz(channel_if_hz); + } } self.update_ais_channel_offsets(); Ok(()) @@ -478,10 +570,13 @@ impl RigCat for SoapySdrRig { self.mode = mode.clone(); self.bandwidth_hz = Self::default_bandwidth_for_mode(&mode); // Update the primary channel's demodulator in the live pipeline. - if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) { - let mut dsp = dsp_arc.lock().unwrap(); - dsp.set_mode(&mode); - dsp.set_filter(self.bandwidth_hz, self.fir_taps as usize); + { + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) { + let mut dsp = dsp_arc.lock().unwrap(); + dsp.set_mode(&mode); + dsp.set_filter(self.bandwidth_hz, self.fir_taps as usize); + } } self.apply_ais_channel_activity(); self.apply_ais_channel_filters(); @@ -501,8 +596,11 @@ impl RigCat for SoapySdrRig { } }; self.wfm_deemphasis_us = deemphasis_us; - if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) { - dsp_arc.lock().unwrap().set_wfm_deemphasis(deemphasis_us); + { + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) { + dsp_arc.lock().unwrap().set_wfm_deemphasis(deemphasis_us); + } } Ok(()) }) @@ -545,11 +643,14 @@ impl RigCat for SoapySdrRig { } self.squelch_enabled = enabled; self.squelch_threshold_db = threshold_db as f32; - if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) { - dsp_arc - .lock() - .unwrap() - .set_squelch(enabled, self.squelch_threshold_db); + { + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) { + dsp_arc + .lock() + .unwrap() + .set_squelch(enabled, self.squelch_threshold_db); + } } Ok(()) }) @@ -654,11 +755,14 @@ impl RigCat for SoapySdrRig { Box::pin(async move { tracing::debug!("SoapySdrRig: set_bandwidth -> {} Hz", bandwidth_hz); self.bandwidth_hz = bandwidth_hz; - if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) { - dsp_arc - .lock() - .unwrap() - .set_filter(bandwidth_hz, self.fir_taps as usize); + { + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) { + dsp_arc + .lock() + .unwrap() + .set_filter(bandwidth_hz, self.fir_taps as usize); + } } self.apply_ais_channel_filters(); Ok(()) @@ -672,11 +776,14 @@ impl RigCat for SoapySdrRig { Box::pin(async move { tracing::debug!("SoapySdrRig: set_fir_taps -> {}", taps); self.fir_taps = taps; - if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) { - dsp_arc - .lock() - .unwrap() - .set_filter(self.bandwidth_hz, taps as usize); + { + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) { + dsp_arc + .lock() + .unwrap() + .set_filter(self.bandwidth_hz, taps as usize); + } } self.apply_ais_channel_filters(); Ok(()) @@ -689,8 +796,11 @@ impl RigCat for SoapySdrRig { ) -> Pin> + Send + 'a>> { Box::pin(async move { self.wfm_stereo = enabled; - if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) { - dsp_arc.lock().unwrap().set_wfm_stereo(enabled); + { + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) { + dsp_arc.lock().unwrap().set_wfm_stereo(enabled); + } } Ok(()) }) @@ -702,8 +812,11 @@ impl RigCat for SoapySdrRig { ) -> Pin> + Send + 'a>> { Box::pin(async move { self.wfm_denoise = level; - if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) { - dsp_arc.lock().unwrap().set_wfm_denoise(level); + { + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) { + dsp_arc.lock().unwrap().set_wfm_denoise(level); + } } Ok(()) }) @@ -713,6 +826,8 @@ impl RigCat for SoapySdrRig { let wfm_stereo_detected = self .pipeline .channel_dsps + .read() + .unwrap() .get(self.primary_channel_idx) .and_then(|dsp| dsp.lock().ok().map(|d| d.wfm_stereo_detected())) .unwrap_or(false); @@ -739,6 +854,8 @@ impl RigCat for SoapySdrRig { let rds = self .pipeline .channel_dsps + .read() + .unwrap() .get(self.primary_channel_idx) .and_then(|dsp| dsp.lock().ok().and_then(|d| d.rds_data())); Some(SpectrumData {