[feat](trx-backend): VirtualChannelManager trait + SdrVirtualChannelManager impl
Add VirtualChannelManager trait in trx-core::vchan with types VChannelInfo, VChanError, and SharedVChanManager alias. Re-export from trx-backend::vchan. Implement SdrVirtualChannelManager in trx-backend-soapysdr: - Wraps Arc<SdrPipeline> + shared AtomicI64 center_hz - add_channel / remove_channel / set_channel_freq / set_channel_mode - Slot-stability: on remove, shifts pipeline_slot for surviving channels - update_center_hz: recomputes IF offsets for all virtual channels on retune - update_primary_meta: keeps channel-0 freq/mode in sync for API consumers Wire into SoapySdrRig (holds Arc<SdrVirtualChannelManager>, exposes channel_manager()), SdrPipeline (shared_center_hz AtomicI64), and RigHandle (vchan_manager: Option<SharedVChanManager>). main.rs extracts the manager before boxing the SDR rig and stores it in the handle. Add max_virtual_channels to SdrConfig (default 4, TOML-configurable). Add 5 unit tests: add, remove, permanent guard, cap, out-of-bandwidth. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -345,6 +345,14 @@ pub struct SdrConfig {
|
||||
pub squelch: SdrSquelchConfig,
|
||||
/// Virtual receiver channels (at least one required when SDR backend is active).
|
||||
pub channels: Vec<SdrChannelConfig>,
|
||||
/// Maximum number of simultaneous virtual channels (including the primary).
|
||||
/// Default: 4.
|
||||
#[serde(default = "default_max_virtual_channels")]
|
||||
pub max_virtual_channels: usize,
|
||||
}
|
||||
|
||||
fn default_max_virtual_channels() -> usize {
|
||||
4
|
||||
}
|
||||
|
||||
impl Default for SdrConfig {
|
||||
@@ -357,6 +365,7 @@ impl Default for SdrConfig {
|
||||
gain: SdrGainConfig::default(),
|
||||
squelch: SdrSquelchConfig::default(),
|
||||
channels: Vec::new(),
|
||||
max_virtual_channels: default_max_virtual_channels(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -466,6 +466,7 @@ mod tests {
|
||||
rig_tx,
|
||||
state_rx,
|
||||
audio_port: 4531,
|
||||
vchan_manager: None,
|
||||
};
|
||||
let mut map = HashMap::new();
|
||||
map.insert("default".to_string(), handle);
|
||||
|
||||
@@ -285,6 +285,7 @@ type SdrRigBuildResult = DynResult<(
|
||||
tokio::sync::broadcast::Receiver<Vec<f32>>,
|
||||
),
|
||||
tokio::sync::broadcast::Receiver<Vec<num_complex::Complex<f32>>>,
|
||||
trx_core::vchan::SharedVChanManager,
|
||||
)>;
|
||||
|
||||
type OptionalSdrRig = Option<Box<dyn trx_core::rig::RigCat>>;
|
||||
@@ -348,6 +349,7 @@ fn build_sdr_rig_from_instance(rig_cfg: &RigInstanceConfig) -> SdrRigBuildResult
|
||||
rig_cfg.sdr.squelch.threshold_db,
|
||||
rig_cfg.sdr.squelch.hysteresis_db,
|
||||
rig_cfg.sdr.squelch.tail_ms,
|
||||
rig_cfg.sdr.max_virtual_channels,
|
||||
)?;
|
||||
|
||||
let pcm_rx = sdr_rig.subscribe_pcm();
|
||||
@@ -363,11 +365,14 @@ fn build_sdr_rig_from_instance(rig_cfg: &RigInstanceConfig) -> SdrRigBuildResult
|
||||
.position(|(_, mode, _, _)| matches!(mode, trx_core::rig::state::RigMode::VDES | trx_core::rig::state::RigMode::MARINE))
|
||||
.unwrap_or(0);
|
||||
let vdes_iq = sdr_rig.subscribe_iq_channel(vdes_channel_idx);
|
||||
// Extract the virtual channel manager before the rig is consumed by Box.
|
||||
let vchan_manager: trx_core::vchan::SharedVChanManager = sdr_rig.channel_manager();
|
||||
Ok((
|
||||
Box::new(sdr_rig) as Box<dyn trx_core::rig::RigCat>,
|
||||
pcm_rx,
|
||||
ais_pcm,
|
||||
vdes_iq,
|
||||
vchan_manager,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -917,13 +922,17 @@ async fn main() -> DynResult<()> {
|
||||
|
||||
// Build SDR rig when applicable.
|
||||
#[cfg(feature = "soapysdr")]
|
||||
let mut sdr_vchan_manager: Option<trx_core::vchan::SharedVChanManager> = None;
|
||||
#[cfg(feature = "soapysdr")]
|
||||
let (sdr_prebuilt_rig, sdr_pcm_rx, sdr_ais_pcm_rx, sdr_vdes_iq_rx): (
|
||||
OptionalSdrRig,
|
||||
OptionalSdrPcmRx,
|
||||
OptionalSdrAisPcmRx,
|
||||
OptionalSdrVdesIqRx,
|
||||
) = if rig_cfg.rig.access.access_type.as_deref() == Some("sdr") {
|
||||
let (rig, pcm_rx, ais_pcm_rx, vdes_iq_rx) = build_sdr_rig_from_instance(rig_cfg)?;
|
||||
let (rig, pcm_rx, ais_pcm_rx, vdes_iq_rx, vchan_mgr) =
|
||||
build_sdr_rig_from_instance(rig_cfg)?;
|
||||
sdr_vchan_manager = Some(vchan_mgr);
|
||||
(Some(rig), Some(pcm_rx), Some(ais_pcm_rx), Some(vdes_iq_rx))
|
||||
} else {
|
||||
(None, None, None, None)
|
||||
@@ -1005,6 +1014,11 @@ async fn main() -> DynResult<()> {
|
||||
);
|
||||
task_handles.extend(audio_handles);
|
||||
|
||||
#[cfg(feature = "soapysdr")]
|
||||
let vchan_manager_for_handle = sdr_vchan_manager;
|
||||
#[cfg(not(feature = "soapysdr"))]
|
||||
let vchan_manager_for_handle: Option<trx_core::vchan::SharedVChanManager> = None;
|
||||
|
||||
rig_handles.insert(
|
||||
rig_cfg.id.clone(),
|
||||
RigHandle {
|
||||
@@ -1013,6 +1027,7 @@ async fn main() -> DynResult<()> {
|
||||
rig_tx,
|
||||
state_rx,
|
||||
audio_port: rig_cfg.audio.port,
|
||||
vchan_manager: vchan_manager_for_handle,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
use tokio::sync::{mpsc, watch};
|
||||
|
||||
use trx_core::vchan::SharedVChanManager;
|
||||
use trx_core::rig::request::RigRequest;
|
||||
use trx_core::rig::state::RigState;
|
||||
|
||||
@@ -24,4 +25,6 @@ pub struct RigHandle {
|
||||
pub state_rx: watch::Receiver<RigState>,
|
||||
/// Per-rig audio listener TCP port.
|
||||
pub audio_port: u16,
|
||||
/// Virtual channel manager; `Some` only for SDR rigs.
|
||||
pub vchan_manager: Option<SharedVChanManager>,
|
||||
}
|
||||
|
||||
@@ -21,4 +21,6 @@ trx-backend-soapysdr = { path = "./trx-backend-soapysdr", optional = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
tokio-serial = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
@@ -8,6 +8,9 @@ use trx_core::rig::RigCat;
|
||||
use trx_core::DynResult;
|
||||
|
||||
mod dummy;
|
||||
pub mod vchan;
|
||||
|
||||
pub use vchan::{SharedVChanManager, VChanError, VChannelInfo, VirtualChannelManager};
|
||||
|
||||
#[cfg(feature = "ft450d")]
|
||||
use trx_backend_ft450d::Ft450d;
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
|
||||
//
|
||||
// SPDX-License-Identifier: BSD-2-Clause
|
||||
|
||||
// Re-export the trait and types from trx-core so crates that depend on
|
||||
// trx-backend can use them without a direct trx-core dependency.
|
||||
pub use trx_core::vchan::{SharedVChanManager, VChanError, VChannelInfo, VirtualChannelManager};
|
||||
@@ -13,6 +13,7 @@ trx-core = { path = "../../../trx-core" }
|
||||
trx-rds = { path = "../../../decoders/trx-rds" }
|
||||
tokio = { workspace = true, features = ["sync", "rt"] }
|
||||
serde = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
num-complex = "0.4"
|
||||
rustfft = "6"
|
||||
|
||||
@@ -15,6 +15,7 @@ mod channel;
|
||||
mod filter;
|
||||
mod spectrum;
|
||||
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
@@ -98,6 +99,9 @@ pub struct SdrPipeline {
|
||||
/// Write `Some(gain_db)` here to adjust the hardware RX gain.
|
||||
/// The IQ read loop picks it up on the next iteration.
|
||||
pub gain_cmd: Arc<std::sync::Mutex<Option<f64>>>,
|
||||
/// Current hardware center frequency in Hz, kept in sync by `SoapySdrRig`.
|
||||
/// Read by `SdrVirtualChannelManager` to validate and compute IF offsets.
|
||||
pub shared_center_hz: Arc<AtomicI64>,
|
||||
// Parameters cached here so `add_virtual_channel` can construct new
|
||||
// `ChannelDsp` instances without needing to be passed them again.
|
||||
audio_sample_rate: u32,
|
||||
@@ -192,6 +196,7 @@ impl SdrPipeline {
|
||||
sdr_sample_rate,
|
||||
retune_cmd,
|
||||
gain_cmd,
|
||||
shared_center_hz: Arc::new(AtomicI64::new(0)),
|
||||
audio_sample_rate,
|
||||
audio_channels: output_channels,
|
||||
frame_duration_ms,
|
||||
|
||||
@@ -5,8 +5,10 @@
|
||||
pub mod demod;
|
||||
pub mod dsp;
|
||||
pub mod real_iq_source;
|
||||
pub mod vchan_impl;
|
||||
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use trx_core::radio::freq::{Band, Freq};
|
||||
@@ -19,12 +21,14 @@ use trx_core::{DynResult, RigMode};
|
||||
|
||||
const AIS_CHANNEL_SPACING_HZ: i64 = 50_000;
|
||||
|
||||
pub use vchan_impl::SdrVirtualChannelManager;
|
||||
|
||||
/// RX-only backend for any SoapySDR-compatible device.
|
||||
pub struct SoapySdrRig {
|
||||
info: RigInfo,
|
||||
freq: Freq,
|
||||
mode: RigMode,
|
||||
pipeline: dsp::SdrPipeline,
|
||||
pipeline: Arc<dsp::SdrPipeline>,
|
||||
/// Index of the primary channel in `pipeline.channel_dsps`.
|
||||
primary_channel_idx: usize,
|
||||
/// Current filter state of the primary channel (for filter_controls support).
|
||||
@@ -55,6 +59,8 @@ pub struct SoapySdrRig {
|
||||
squelch_threshold_db: f32,
|
||||
/// Hidden AIS decoder channels (A and B) when available.
|
||||
ais_channel_indices: Option<(usize, usize)>,
|
||||
/// Virtual channel manager shared with external consumers (e.g. RigHandle).
|
||||
channel_manager: Arc<vchan_impl::SdrVirtualChannelManager>,
|
||||
}
|
||||
|
||||
impl SoapySdrRig {
|
||||
@@ -118,6 +124,7 @@ impl SoapySdrRig {
|
||||
squelch_threshold_db: f32,
|
||||
squelch_hysteresis_db: f32,
|
||||
squelch_tail_ms: u32,
|
||||
max_virtual_channels: usize,
|
||||
) -> DynResult<Self> {
|
||||
tracing::info!(
|
||||
"initialising SoapySDR backend (args={:?}, gain_mode={:?}, gain_db={}, max_gain_db={:?})",
|
||||
@@ -184,7 +191,7 @@ impl SoapySdrRig {
|
||||
(squelch_tail_ms as f64 / block_ms).ceil().max(0.0) as u32
|
||||
};
|
||||
|
||||
let pipeline = dsp::SdrPipeline::start(
|
||||
let pipeline = Arc::new(dsp::SdrPipeline::start(
|
||||
iq_source,
|
||||
sdr_sample_rate,
|
||||
audio_sample_rate,
|
||||
@@ -199,7 +206,7 @@ impl SoapySdrRig {
|
||||
tail_blocks: squelch_tail_blocks,
|
||||
},
|
||||
&all_channels,
|
||||
);
|
||||
));
|
||||
|
||||
let info = RigInfo {
|
||||
manufacturer: "SoapySDR".to_string(),
|
||||
@@ -254,6 +261,18 @@ impl SoapySdrRig {
|
||||
|
||||
let spectrum_buf = pipeline.spectrum_buf.clone();
|
||||
let retune_cmd = pipeline.retune_cmd.clone();
|
||||
// Initial center_hz stored in the pipeline's shared atomic so the
|
||||
// virtual channel manager can read it without a reference to SoapySdrRig.
|
||||
pipeline
|
||||
.shared_center_hz
|
||||
.store(hardware_center_hz, Ordering::Relaxed);
|
||||
// Fixed slots: user-configured channels + 2 AIS channels.
|
||||
let fixed_slot_count = all_channels.len();
|
||||
let channel_manager = Arc::new(vchan_impl::SdrVirtualChannelManager::new(
|
||||
pipeline.clone(),
|
||||
fixed_slot_count,
|
||||
max_virtual_channels,
|
||||
));
|
||||
|
||||
let rig = Self {
|
||||
info,
|
||||
@@ -275,6 +294,7 @@ impl SoapySdrRig {
|
||||
squelch_enabled,
|
||||
squelch_threshold_db,
|
||||
ais_channel_indices: Some((primary_channel_count, primary_channel_count + 1)),
|
||||
channel_manager,
|
||||
};
|
||||
rig.apply_ais_channel_activity();
|
||||
Ok(rig)
|
||||
@@ -303,9 +323,16 @@ impl SoapySdrRig {
|
||||
-65.0, // squelch_threshold_db
|
||||
3.0, // squelch_hysteresis_db
|
||||
180, // squelch_tail_ms
|
||||
4, // max_virtual_channels
|
||||
)
|
||||
}
|
||||
|
||||
/// Return the virtual channel manager for this SDR rig.
|
||||
/// Used by `RigHandle` to expose the channel API.
|
||||
pub fn channel_manager(&self) -> trx_core::vchan::SharedVChanManager {
|
||||
self.channel_manager.clone()
|
||||
}
|
||||
|
||||
fn update_ais_channel_offsets(&self) {
|
||||
let Some((ais_a_idx, ais_b_idx)) = self.ais_channel_indices else {
|
||||
return;
|
||||
@@ -354,85 +381,11 @@ impl SoapySdrRig {
|
||||
self.center_hz
|
||||
}
|
||||
|
||||
/// Half of the SDR capture bandwidth (Hz). A virtual channel's dial
|
||||
/// frequency must stay within `center_hz ± half_span_hz`.
|
||||
/// Half of the SDR capture bandwidth (Hz).
|
||||
pub fn half_span_hz(&self) -> i64 {
|
||||
i64::from(self.pipeline.sdr_sample_rate) / 2
|
||||
}
|
||||
|
||||
/// Allocate a new virtual DSP channel within the current SDR capture
|
||||
/// bandwidth. Returns `None` if `freq_hz` is outside the capture span.
|
||||
///
|
||||
/// The returned senders can be subscribed to for PCM audio frames. The
|
||||
/// `pipeline_slot` index identifies the slot for future
|
||||
/// `virtual_channel_set_freq`, `virtual_channel_set_mode`, and
|
||||
/// `virtual_channel_remove` calls.
|
||||
pub fn virtual_channel_add(
|
||||
&self,
|
||||
freq_hz: u64,
|
||||
mode: &RigMode,
|
||||
bandwidth_hz: u32,
|
||||
fir_taps: usize,
|
||||
) -> Option<(
|
||||
tokio::sync::broadcast::Sender<Vec<f32>>,
|
||||
tokio::sync::broadcast::Sender<Vec<num_complex::Complex<f32>>>,
|
||||
usize,
|
||||
)> {
|
||||
let channel_if_hz = freq_hz as i64 - self.center_hz;
|
||||
if channel_if_hz.unsigned_abs() as i64 > self.half_span_hz() {
|
||||
return None;
|
||||
}
|
||||
let (pcm_tx, iq_tx) =
|
||||
self.pipeline
|
||||
.add_virtual_channel(channel_if_hz as f64, mode, bandwidth_hz, fir_taps);
|
||||
let slot = self
|
||||
.pipeline
|
||||
.channel_dsps
|
||||
.read()
|
||||
.unwrap()
|
||||
.len()
|
||||
.saturating_sub(1);
|
||||
Some((pcm_tx, iq_tx, slot))
|
||||
}
|
||||
|
||||
/// Remove a virtual channel at the given pipeline slot index.
|
||||
/// Returns `false` if the slot is out of range.
|
||||
pub fn virtual_channel_remove(&self, pipeline_slot: usize) -> bool {
|
||||
self.pipeline.remove_virtual_channel(pipeline_slot)
|
||||
}
|
||||
|
||||
/// Update the dial frequency of a virtual channel.
|
||||
/// Returns `false` if the slot is out of range or the freq is outside
|
||||
/// the current capture bandwidth.
|
||||
pub fn virtual_channel_set_freq(&self, pipeline_slot: usize, freq_hz: u64) -> bool {
|
||||
let channel_if_hz = freq_hz as i64 - self.center_hz;
|
||||
if channel_if_hz.unsigned_abs() as i64 > self.half_span_hz() {
|
||||
return false;
|
||||
}
|
||||
let dsps = self.pipeline.channel_dsps.read().unwrap();
|
||||
if let Some(dsp_arc) = dsps.get(pipeline_slot) {
|
||||
dsp_arc
|
||||
.lock()
|
||||
.unwrap()
|
||||
.set_channel_if_hz(channel_if_hz as f64);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the demodulation mode of a virtual channel.
|
||||
/// Returns `false` if the slot is out of range.
|
||||
pub fn virtual_channel_set_mode(&self, pipeline_slot: usize, mode: &RigMode) -> bool {
|
||||
let dsps = self.pipeline.channel_dsps.read().unwrap();
|
||||
if let Some(dsp_arc) = dsps.get(pipeline_slot) {
|
||||
dsp_arc.lock().unwrap().set_mode(mode);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn subscribe_iq_channel(
|
||||
&self,
|
||||
channel_idx: usize,
|
||||
@@ -521,6 +474,7 @@ impl RigCat for SoapySdrRig {
|
||||
if let Ok(mut cmd) = self.retune_cmd.lock() {
|
||||
*cmd = Some(hardware_hz as f64);
|
||||
}
|
||||
self.channel_manager.update_center_hz(hardware_hz);
|
||||
}
|
||||
|
||||
{
|
||||
@@ -549,6 +503,7 @@ impl RigCat for SoapySdrRig {
|
||||
if let Ok(mut cmd) = self.retune_cmd.lock() {
|
||||
*cmd = Some(self.center_hz as f64);
|
||||
}
|
||||
self.channel_manager.update_center_hz(self.center_hz);
|
||||
{
|
||||
let dsps = self.pipeline.channel_dsps.read().unwrap();
|
||||
if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) {
|
||||
|
||||
@@ -0,0 +1,378 @@
|
||||
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
|
||||
//
|
||||
// SPDX-License-Identifier: BSD-2-Clause
|
||||
|
||||
//! Concrete `VirtualChannelManager` implementation for SoapySDR rigs.
|
||||
//!
|
||||
//! `SdrVirtualChannelManager` wraps an `Arc<SdrPipeline>` and maintains a list
|
||||
//! of managed channel entries. The primary channel (pipeline slot 0) is always
|
||||
//! present and marked permanent; additional virtual channels are appended
|
||||
//! dynamically.
|
||||
//!
|
||||
//! ## Slot stability
|
||||
//!
|
||||
//! Virtual channels occupy pipeline slots `fixed_slot_count..`. When a channel
|
||||
//! at slot *K* is removed, `Vec::remove(K)` shifts all entries K+1..end left by
|
||||
//! one; the manager updates every surviving entry's `pipeline_slot` accordingly.
|
||||
//!
|
||||
//! ## Center-frequency updates
|
||||
//!
|
||||
//! When the hardware retunes (changing `center_hz`), all channel IF offsets must
|
||||
//! be recomputed. The rig calls `update_center_hz()` after every retune; this
|
||||
//! updates every `ChannelDsp` in a single write-locked pass.
|
||||
|
||||
use std::sync::atomic::{AtomicI64, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use num_complex::Complex;
|
||||
use tokio::sync::broadcast;
|
||||
use trx_core::rig::state::RigMode;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::dsp::{SdrPipeline, VirtualSquelchConfig};
|
||||
use trx_core::vchan::{VChanError, VChannelInfo, VirtualChannelManager};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Default DSP parameters for virtual channels
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const DEFAULT_FIR_TAPS: usize = 64;
|
||||
|
||||
fn default_bandwidth_hz(mode: &RigMode) -> u32 {
|
||||
match mode {
|
||||
RigMode::CW | RigMode::CWR => 500,
|
||||
RigMode::LSB | RigMode::USB | RigMode::DIG => 3_000,
|
||||
RigMode::AM => 9_000,
|
||||
RigMode::FM => 12_500,
|
||||
RigMode::WFM => 180_000,
|
||||
RigMode::PKT | RigMode::AIS => 25_000,
|
||||
RigMode::VDES | RigMode::MARINE => 100_000,
|
||||
RigMode::Other(_) => 3_000,
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Internal channel record
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
struct ManagedChannel {
|
||||
id: Uuid,
|
||||
freq_hz: u64,
|
||||
mode: RigMode,
|
||||
/// `broadcast::Sender` kept alive so new subscribers can join at any time.
|
||||
pcm_tx: broadcast::Sender<Vec<f32>>,
|
||||
/// IQ tap sender (kept alive; external consumers may subscribe).
|
||||
#[allow(dead_code)]
|
||||
iq_tx: broadcast::Sender<Vec<Complex<f32>>>,
|
||||
/// Index of this channel in `pipeline.channel_dsps`.
|
||||
pipeline_slot: usize,
|
||||
/// True only for the primary channel; prevents removal.
|
||||
permanent: bool,
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// SdrVirtualChannelManager
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub struct SdrVirtualChannelManager {
|
||||
pipeline: Arc<SdrPipeline>,
|
||||
/// Current SDR hardware center frequency, updated on every retune.
|
||||
center_hz: Arc<AtomicI64>,
|
||||
/// Pipeline slots 0..fixed_slot_count are reserved (primary + AIS).
|
||||
/// Virtual channels occupy slots fixed_slot_count and above.
|
||||
fixed_slot_count: usize,
|
||||
/// Maximum total channels including the primary (enforced on `add_channel`).
|
||||
max_total: usize,
|
||||
channels: RwLock<Vec<ManagedChannel>>,
|
||||
}
|
||||
|
||||
impl SdrVirtualChannelManager {
|
||||
/// Create a new manager.
|
||||
///
|
||||
/// - `pipeline`: shared reference to the running `SdrPipeline`.
|
||||
/// - `fixed_slot_count`: number of fixed pipeline slots (primary + AIS),
|
||||
/// i.e. the index of the first slot available for virtual channels.
|
||||
/// - `max_total`: maximum total channels including primary (e.g. 4).
|
||||
pub fn new(
|
||||
pipeline: Arc<SdrPipeline>,
|
||||
fixed_slot_count: usize,
|
||||
max_total: usize,
|
||||
) -> Self {
|
||||
// Seed the channel list with a synthetic primary-channel entry.
|
||||
// We use the first PCM sender from the pipeline (index 0).
|
||||
let primary_pcm_tx = pipeline
|
||||
.pcm_senders
|
||||
.first()
|
||||
.cloned()
|
||||
.unwrap_or_else(|| broadcast::channel::<Vec<f32>>(1).0);
|
||||
let primary_iq_tx = pipeline
|
||||
.iq_senders
|
||||
.first()
|
||||
.cloned()
|
||||
.unwrap_or_else(|| broadcast::channel::<Vec<Complex<f32>>>(1).0);
|
||||
|
||||
let primary = ManagedChannel {
|
||||
id: Uuid::new_v4(),
|
||||
freq_hz: 0, // actual freq kept by SoapySdrRig; manager treats ch-0 as opaque
|
||||
mode: RigMode::USB,
|
||||
pcm_tx: primary_pcm_tx,
|
||||
iq_tx: primary_iq_tx,
|
||||
pipeline_slot: 0,
|
||||
permanent: true,
|
||||
};
|
||||
|
||||
Self {
|
||||
center_hz: pipeline.shared_center_hz.clone(),
|
||||
pipeline,
|
||||
fixed_slot_count,
|
||||
max_total: max_total.max(1),
|
||||
channels: RwLock::new(vec![primary]),
|
||||
}
|
||||
}
|
||||
|
||||
fn half_span_hz(&self) -> i64 {
|
||||
i64::from(self.pipeline.sdr_sample_rate) / 2
|
||||
}
|
||||
|
||||
/// Called by `SoapySdrRig` whenever the hardware center frequency changes.
|
||||
/// Recomputes the IF offset for every virtual channel.
|
||||
pub fn update_center_hz(&self, new_center_hz: i64) {
|
||||
self.center_hz.store(new_center_hz, Ordering::Relaxed);
|
||||
let channels = self.channels.read().unwrap();
|
||||
let dsps = self.pipeline.channel_dsps.read().unwrap();
|
||||
for ch in channels.iter().filter(|c| !c.permanent) {
|
||||
let new_if_hz = ch.freq_hz as i64 - new_center_hz;
|
||||
if let Some(dsp_arc) = dsps.get(ch.pipeline_slot) {
|
||||
dsp_arc
|
||||
.lock()
|
||||
.unwrap()
|
||||
.set_channel_if_hz(new_if_hz as f64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the primary channel's freq/mode metadata (called by SoapySdrRig
|
||||
/// on SetFreq/SetMode so channel-0 info stays current for API consumers).
|
||||
pub fn update_primary_meta(&self, freq_hz: u64, mode: &RigMode) {
|
||||
let mut channels = self.channels.write().unwrap();
|
||||
if let Some(ch) = channels.first_mut() {
|
||||
ch.freq_hz = freq_hz;
|
||||
ch.mode = mode.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl VirtualChannelManager for SdrVirtualChannelManager {
|
||||
fn add_channel(
|
||||
&self,
|
||||
freq_hz: u64,
|
||||
mode: &RigMode,
|
||||
) -> Result<(Uuid, broadcast::Receiver<Vec<f32>>), VChanError> {
|
||||
let half_span = self.half_span_hz();
|
||||
let center = self.center_hz.load(Ordering::Relaxed);
|
||||
let if_hz = freq_hz as i64 - center;
|
||||
if if_hz.unsigned_abs() as i64 > half_span {
|
||||
return Err(VChanError::OutOfBandwidth {
|
||||
half_span_hz: half_span,
|
||||
});
|
||||
}
|
||||
|
||||
let mut channels = self.channels.write().unwrap();
|
||||
if channels.len() >= self.max_total {
|
||||
return Err(VChanError::CapReached { max: self.max_total });
|
||||
}
|
||||
|
||||
let bandwidth_hz = default_bandwidth_hz(mode);
|
||||
let (pcm_tx, iq_tx) =
|
||||
self.pipeline
|
||||
.add_virtual_channel(if_hz as f64, mode, bandwidth_hz, DEFAULT_FIR_TAPS);
|
||||
|
||||
let pipeline_slot = self
|
||||
.pipeline
|
||||
.channel_dsps
|
||||
.read()
|
||||
.unwrap()
|
||||
.len()
|
||||
.saturating_sub(1);
|
||||
|
||||
let id = Uuid::new_v4();
|
||||
let pcm_rx = pcm_tx.subscribe();
|
||||
channels.push(ManagedChannel {
|
||||
id,
|
||||
freq_hz,
|
||||
mode: mode.clone(),
|
||||
pcm_tx,
|
||||
iq_tx,
|
||||
pipeline_slot,
|
||||
permanent: false,
|
||||
});
|
||||
|
||||
Ok((id, pcm_rx))
|
||||
}
|
||||
|
||||
fn remove_channel(&self, id: Uuid) -> Result<(), VChanError> {
|
||||
let mut channels = self.channels.write().unwrap();
|
||||
let pos = channels
|
||||
.iter()
|
||||
.position(|c| c.id == id)
|
||||
.ok_or(VChanError::NotFound)?;
|
||||
|
||||
if channels[pos].permanent {
|
||||
return Err(VChanError::Permanent);
|
||||
}
|
||||
|
||||
let slot = channels[pos].pipeline_slot;
|
||||
self.pipeline.remove_virtual_channel(slot);
|
||||
channels.remove(pos);
|
||||
|
||||
// Shift pipeline_slot for all channels that were after the removed one.
|
||||
for ch in channels.iter_mut().filter(|c| c.pipeline_slot > slot) {
|
||||
ch.pipeline_slot -= 1;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_channel_freq(&self, id: Uuid, freq_hz: u64) -> Result<(), VChanError> {
|
||||
let half_span = self.half_span_hz();
|
||||
let center = self.center_hz.load(Ordering::Relaxed);
|
||||
let if_hz = freq_hz as i64 - center;
|
||||
if if_hz.unsigned_abs() as i64 > half_span {
|
||||
return Err(VChanError::OutOfBandwidth {
|
||||
half_span_hz: half_span,
|
||||
});
|
||||
}
|
||||
|
||||
let mut channels = self.channels.write().unwrap();
|
||||
let ch = channels
|
||||
.iter_mut()
|
||||
.find(|c| c.id == id)
|
||||
.ok_or(VChanError::NotFound)?;
|
||||
|
||||
ch.freq_hz = freq_hz;
|
||||
let dsps = self.pipeline.channel_dsps.read().unwrap();
|
||||
if let Some(dsp_arc) = dsps.get(ch.pipeline_slot) {
|
||||
dsp_arc.lock().unwrap().set_channel_if_hz(if_hz as f64);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_channel_mode(&self, id: Uuid, mode: &RigMode) -> Result<(), VChanError> {
|
||||
let mut channels = self.channels.write().unwrap();
|
||||
let ch = channels
|
||||
.iter_mut()
|
||||
.find(|c| c.id == id)
|
||||
.ok_or(VChanError::NotFound)?;
|
||||
|
||||
ch.mode = mode.clone();
|
||||
let dsps = self.pipeline.channel_dsps.read().unwrap();
|
||||
if let Some(dsp_arc) = dsps.get(ch.pipeline_slot) {
|
||||
dsp_arc.lock().unwrap().set_mode(mode);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn subscribe_pcm(&self, id: Uuid) -> Option<broadcast::Receiver<Vec<f32>>> {
|
||||
let channels = self.channels.read().unwrap();
|
||||
channels
|
||||
.iter()
|
||||
.find(|c| c.id == id)
|
||||
.map(|c| c.pcm_tx.subscribe())
|
||||
}
|
||||
|
||||
fn channels(&self) -> Vec<VChannelInfo> {
|
||||
let channels = self.channels.read().unwrap();
|
||||
channels
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, ch)| VChannelInfo {
|
||||
id: ch.id,
|
||||
index: idx,
|
||||
freq_hz: ch.freq_hz,
|
||||
mode: format!("{:?}", ch.mode),
|
||||
permanent: ch.permanent,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn max_channels(&self) -> usize {
|
||||
self.max_total
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::dsp::{MockIqSource, SdrPipeline};
|
||||
|
||||
fn make_pipeline() -> Arc<SdrPipeline> {
|
||||
Arc::new(SdrPipeline::start(
|
||||
Box::new(MockIqSource),
|
||||
1_920_000,
|
||||
48_000,
|
||||
1,
|
||||
20,
|
||||
75,
|
||||
true,
|
||||
VirtualSquelchConfig::default(),
|
||||
&[(0.0, RigMode::USB, 3_000, 64)],
|
||||
))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_and_list() {
|
||||
let p = make_pipeline();
|
||||
let mgr = SdrVirtualChannelManager::new(p, 1, 4);
|
||||
// Set center to 14.1 MHz so that 14.074 MHz is within ±960 kHz.
|
||||
mgr.update_center_hz(14_100_000);
|
||||
assert_eq!(mgr.channels().len(), 1); // primary only
|
||||
|
||||
let (id, _rx) = mgr.add_channel(14_074_000, &RigMode::USB).unwrap();
|
||||
assert_eq!(mgr.channels().len(), 2);
|
||||
|
||||
let ch = mgr.channels().into_iter().find(|c| c.id == id).unwrap();
|
||||
assert_eq!(ch.freq_hz, 14_074_000);
|
||||
assert!(!ch.permanent);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_virtual_channel() {
|
||||
let p = make_pipeline();
|
||||
let mgr = SdrVirtualChannelManager::new(p, 1, 4);
|
||||
mgr.update_center_hz(14_100_000);
|
||||
let (id, _) = mgr.add_channel(14_074_000, &RigMode::USB).unwrap();
|
||||
mgr.remove_channel(id).unwrap();
|
||||
assert_eq!(mgr.channels().len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cannot_remove_primary() {
|
||||
let p = make_pipeline();
|
||||
let mgr = SdrVirtualChannelManager::new(p, 1, 4);
|
||||
let primary_id = mgr.channels()[0].id;
|
||||
let err = mgr.remove_channel(primary_id).unwrap_err();
|
||||
assert!(matches!(err, VChanError::Permanent));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cap_enforced() {
|
||||
let p = make_pipeline();
|
||||
let mgr = SdrVirtualChannelManager::new(p, 1, 2); // primary + 1 virtual max
|
||||
mgr.update_center_hz(14_100_000);
|
||||
mgr.add_channel(14_074_000, &RigMode::USB).unwrap();
|
||||
let err = mgr.add_channel(14_075_000, &RigMode::USB).unwrap_err();
|
||||
assert!(matches!(err, VChanError::CapReached { .. }));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn out_of_bandwidth() {
|
||||
let p = make_pipeline();
|
||||
let mgr = SdrVirtualChannelManager::new(p, 1, 4);
|
||||
// center_hz = 0, half_span = 960_000 Hz — 10 MHz is way out
|
||||
let err = mgr.add_channel(10_000_000, &RigMode::USB).unwrap_err();
|
||||
assert!(matches!(err, VChanError::OutOfBandwidth { .. }));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user