diff --git a/Cargo.lock b/Cargo.lock index d3c108b..9ec450c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -320,6 +320,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + [[package]] name = "audiopus_sys" version = "0.2.2" @@ -941,10 +947,23 @@ checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", "libc", - "r-efi", + "r-efi 5.3.0", "wasip2", ] +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi 6.0.0", + "wasip2", + "wasip3", +] + [[package]] name = "glob" version = "0.3.3" @@ -970,6 +989,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] + [[package]] name = "hashbrown" version = "0.16.1" @@ -1116,6 +1144,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + [[package]] name = "idna" version = "1.1.0" @@ -1144,7 +1178,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.16.1", + "serde", + "serde_core", ] [[package]] @@ -1238,6 +1274,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "libc" version = "0.2.177" @@ -1638,6 +1680,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "primal-check" version = "0.3.4" @@ -1680,6 +1732,12 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + [[package]] name = "rand" version = "0.8.5" @@ -2407,6 +2465,7 @@ name = "trx-backend" version = "0.1.0" dependencies = [ "serde", + "serde_json", "tokio", "tokio-serial", "tracing", @@ -2414,6 +2473,7 @@ dependencies = [ "trx-backend-ft817", "trx-backend-soapysdr", "trx-core", + "uuid", ] [[package]] @@ -2450,6 +2510,7 @@ dependencies = [ "tracing", "trx-core", "trx-rds", + "uuid", ] [[package]] @@ -2485,6 +2546,7 @@ dependencies = [ "serde_json", "tokio", "tracing", + "uuid", ] [[package]] @@ -2680,6 +2742,18 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" +dependencies = [ + "getrandom 0.4.2", + "js-sys", + "serde_core", + "wasm-bindgen", +] + [[package]] name = "valuable" version = "0.1.1" @@ -2714,7 +2788,16 @@ version = "1.0.1+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.46.0", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen 0.51.0", ] [[package]] @@ -2776,6 +2859,40 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags 2.10.0", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + [[package]] name = "web-sys" version = "0.3.85" @@ -3142,6 +3259,94 @@ version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags 2.10.0", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + [[package]] name = "writeable" version = "0.6.2" diff --git a/Cargo.toml b/Cargo.toml index c37cae1..16bd918 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ resolver = "2" [workspace.dependencies] flate2 = "1" tokio = "1" +uuid = { version = "1", features = ["v4", "serde"] } tokio-serial = "5" serde = "1" serde_json = "1" diff --git a/src/trx-core/Cargo.toml b/src/trx-core/Cargo.toml index 8d5d5c2..9e6cdc5 100644 --- a/src/trx-core/Cargo.toml +++ b/src/trx-core/Cargo.toml @@ -13,3 +13,4 @@ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tracing = { workspace = true } flate2 = { workspace = true } +uuid = { workspace = true } diff --git a/src/trx-core/src/lib.rs b/src/trx-core/src/lib.rs index 2fe326b..e1305f8 100644 --- a/src/trx-core/src/lib.rs +++ b/src/trx-core/src/lib.rs @@ -7,6 +7,7 @@ pub mod decode; pub mod math; pub mod radio; pub mod rig; +pub mod vchan; pub type DynResult = Result>; diff --git a/src/trx-core/src/vchan.rs b/src/trx-core/src/vchan.rs new file mode 100644 index 0000000..c1d9ebd --- /dev/null +++ b/src/trx-core/src/vchan.rs @@ -0,0 +1,110 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Virtual channel management trait and shared types. +//! +//! A *virtual channel* is an independent DSP slice within the capture bandwidth +//! of an SDR rig. Each has its own frequency offset, demodulation mode, and +//! PCM audio broadcast. Traditional (non-SDR) rigs do not support virtual +//! channels; their `RigHandle::vchan_manager` field will be `None`. + +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; +use uuid::Uuid; + +use crate::rig::state::RigMode; + +// --------------------------------------------------------------------------- +// Shared types +// --------------------------------------------------------------------------- + +/// Snapshot of one virtual channel's state (HTTP-serialisable). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VChannelInfo { + /// Stable UUID identifier. + pub id: Uuid, + /// Display index in the ordered channel list (0 = primary). + pub index: usize, + /// Dial frequency in Hz. + pub freq_hz: u64, + /// Demodulation mode name (e.g. "USB", "FM"). + pub mode: String, + /// `true` for the primary channel (index 0), which cannot be removed. + pub permanent: bool, +} + +/// Errors returned by virtual channel management operations. +#[derive(Debug, Clone)] +pub enum VChanError { + /// The configured channel cap would be exceeded. + CapReached { max: usize }, + /// The requested frequency lies outside the current SDR capture bandwidth. + OutOfBandwidth { half_span_hz: i64 }, + /// No channel with the given UUID exists. + NotFound, + /// Attempted to remove the permanent primary channel. + Permanent, +} + +impl std::fmt::Display for VChanError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + VChanError::CapReached { max } => { + write!(f, "virtual channel cap reached (max {})", max) + } + VChanError::OutOfBandwidth { half_span_hz } => write!( + f, + "frequency outside SDR capture bandwidth (±{} Hz)", + half_span_hz + ), + VChanError::NotFound => write!(f, "virtual channel not found"), + VChanError::Permanent => write!(f, "cannot remove the primary channel"), + } + } +} + +// --------------------------------------------------------------------------- +// Trait +// --------------------------------------------------------------------------- + +/// Manages virtual DSP channels for an SDR rig. +/// +/// Implementations are `Send + Sync` so the manager can be shared across +/// tokio tasks and actix-web handlers. +pub trait VirtualChannelManager: Send + Sync { + /// Add a new virtual channel tuned to `freq_hz` with `mode`. + /// + /// Returns the new channel UUID and a PCM broadcast receiver that delivers + /// decoded audio frames for this channel. + fn add_channel( + &self, + freq_hz: u64, + mode: &RigMode, + ) -> Result<(Uuid, broadcast::Receiver>), VChanError>; + + /// Remove a virtual channel by UUID. The primary channel (index 0) cannot + /// be removed and returns `VChanError::Permanent`. + fn remove_channel(&self, id: Uuid) -> Result<(), VChanError>; + + /// Update the dial frequency of an existing channel. + fn set_channel_freq(&self, id: Uuid, freq_hz: u64) -> Result<(), VChanError>; + + /// Update the demodulation mode of an existing channel. + fn set_channel_mode(&self, id: Uuid, mode: &RigMode) -> Result<(), VChanError>; + + /// Subscribe to decoded PCM audio from a channel. + /// Returns `None` if the channel UUID does not exist. + fn subscribe_pcm(&self, id: Uuid) -> Option>>; + + /// Return a snapshot of all channels in display order. + fn channels(&self) -> Vec; + + /// Maximum number of channels (including the primary channel). + fn max_channels(&self) -> usize; +} + +/// Convenience alias used in `RigHandle`. +pub type SharedVChanManager = Arc; diff --git a/src/trx-server/src/config.rs b/src/trx-server/src/config.rs index df33daf..8675f69 100644 --- a/src/trx-server/src/config.rs +++ b/src/trx-server/src/config.rs @@ -345,6 +345,14 @@ pub struct SdrConfig { pub squelch: SdrSquelchConfig, /// Virtual receiver channels (at least one required when SDR backend is active). pub channels: Vec, + /// Maximum number of simultaneous virtual channels (including the primary). + /// Default: 4. + #[serde(default = "default_max_virtual_channels")] + pub max_virtual_channels: usize, +} + +fn default_max_virtual_channels() -> usize { + 4 } impl Default for SdrConfig { @@ -357,6 +365,7 @@ impl Default for SdrConfig { gain: SdrGainConfig::default(), squelch: SdrSquelchConfig::default(), channels: Vec::new(), + max_virtual_channels: default_max_virtual_channels(), } } } diff --git a/src/trx-server/src/listener.rs b/src/trx-server/src/listener.rs index 72279ee..c47c978 100644 --- a/src/trx-server/src/listener.rs +++ b/src/trx-server/src/listener.rs @@ -466,6 +466,7 @@ mod tests { rig_tx, state_rx, audio_port: 4531, + vchan_manager: None, }; let mut map = HashMap::new(); map.insert("default".to_string(), handle); diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index 84bbcc3..4072821 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -285,6 +285,7 @@ type SdrRigBuildResult = DynResult<( tokio::sync::broadcast::Receiver>, ), tokio::sync::broadcast::Receiver>>, + trx_core::vchan::SharedVChanManager, )>; type OptionalSdrRig = Option>; @@ -348,6 +349,7 @@ fn build_sdr_rig_from_instance(rig_cfg: &RigInstanceConfig) -> SdrRigBuildResult rig_cfg.sdr.squelch.threshold_db, rig_cfg.sdr.squelch.hysteresis_db, rig_cfg.sdr.squelch.tail_ms, + rig_cfg.sdr.max_virtual_channels, )?; let pcm_rx = sdr_rig.subscribe_pcm(); @@ -363,11 +365,14 @@ fn build_sdr_rig_from_instance(rig_cfg: &RigInstanceConfig) -> SdrRigBuildResult .position(|(_, mode, _, _)| matches!(mode, trx_core::rig::state::RigMode::VDES | trx_core::rig::state::RigMode::MARINE)) .unwrap_or(0); let vdes_iq = sdr_rig.subscribe_iq_channel(vdes_channel_idx); + // Extract the virtual channel manager before the rig is consumed by Box. + let vchan_manager: trx_core::vchan::SharedVChanManager = sdr_rig.channel_manager(); Ok(( Box::new(sdr_rig) as Box, pcm_rx, ais_pcm, vdes_iq, + vchan_manager, )) } @@ -917,13 +922,17 @@ async fn main() -> DynResult<()> { // Build SDR rig when applicable. #[cfg(feature = "soapysdr")] + let mut sdr_vchan_manager: Option = None; + #[cfg(feature = "soapysdr")] let (sdr_prebuilt_rig, sdr_pcm_rx, sdr_ais_pcm_rx, sdr_vdes_iq_rx): ( OptionalSdrRig, OptionalSdrPcmRx, OptionalSdrAisPcmRx, OptionalSdrVdesIqRx, ) = if rig_cfg.rig.access.access_type.as_deref() == Some("sdr") { - let (rig, pcm_rx, ais_pcm_rx, vdes_iq_rx) = build_sdr_rig_from_instance(rig_cfg)?; + let (rig, pcm_rx, ais_pcm_rx, vdes_iq_rx, vchan_mgr) = + build_sdr_rig_from_instance(rig_cfg)?; + sdr_vchan_manager = Some(vchan_mgr); (Some(rig), Some(pcm_rx), Some(ais_pcm_rx), Some(vdes_iq_rx)) } else { (None, None, None, None) @@ -1005,6 +1014,11 @@ async fn main() -> DynResult<()> { ); task_handles.extend(audio_handles); + #[cfg(feature = "soapysdr")] + let vchan_manager_for_handle = sdr_vchan_manager; + #[cfg(not(feature = "soapysdr"))] + let vchan_manager_for_handle: Option = None; + rig_handles.insert( rig_cfg.id.clone(), RigHandle { @@ -1013,6 +1027,7 @@ async fn main() -> DynResult<()> { rig_tx, state_rx, audio_port: rig_cfg.audio.port, + vchan_manager: vchan_manager_for_handle, }, ); } diff --git a/src/trx-server/src/rig_handle.rs b/src/trx-server/src/rig_handle.rs index acef2db..0ce5897 100644 --- a/src/trx-server/src/rig_handle.rs +++ b/src/trx-server/src/rig_handle.rs @@ -6,6 +6,7 @@ use tokio::sync::{mpsc, watch}; +use trx_core::vchan::SharedVChanManager; use trx_core::rig::request::RigRequest; use trx_core::rig::state::RigState; @@ -24,4 +25,6 @@ pub struct RigHandle { pub state_rx: watch::Receiver, /// Per-rig audio listener TCP port. pub audio_port: u16, + /// Virtual channel manager; `Some` only for SDR rigs. + pub vchan_manager: Option, } diff --git a/src/trx-server/trx-backend/Cargo.toml b/src/trx-server/trx-backend/Cargo.toml index e04d8bc..9319414 100644 --- a/src/trx-server/trx-backend/Cargo.toml +++ b/src/trx-server/trx-backend/Cargo.toml @@ -21,4 +21,6 @@ trx-backend-soapysdr = { path = "./trx-backend-soapysdr", optional = true } tokio = { workspace = true, features = ["full"] } tokio-serial = { workspace = true } serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +uuid = { workspace = true } tracing = { workspace = true } diff --git a/src/trx-server/trx-backend/src/lib.rs b/src/trx-server/trx-backend/src/lib.rs index cdfcdf5..86c41a4 100644 --- a/src/trx-server/trx-backend/src/lib.rs +++ b/src/trx-server/trx-backend/src/lib.rs @@ -8,6 +8,9 @@ use trx_core::rig::RigCat; use trx_core::DynResult; mod dummy; +pub mod vchan; + +pub use vchan::{SharedVChanManager, VChanError, VChannelInfo, VirtualChannelManager}; #[cfg(feature = "ft450d")] use trx_backend_ft450d::Ft450d; diff --git a/src/trx-server/trx-backend/src/vchan.rs b/src/trx-server/trx-backend/src/vchan.rs new file mode 100644 index 0000000..53fbe2e --- /dev/null +++ b/src/trx-server/trx-backend/src/vchan.rs @@ -0,0 +1,7 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +// Re-export the trait and types from trx-core so crates that depend on +// trx-backend can use them without a direct trx-core dependency. +pub use trx_core::vchan::{SharedVChanManager, VChanError, VChannelInfo, VirtualChannelManager}; diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/Cargo.toml b/src/trx-server/trx-backend/trx-backend-soapysdr/Cargo.toml index 7f04edd..1f4f2c5 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/Cargo.toml +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/Cargo.toml @@ -13,6 +13,7 @@ trx-core = { path = "../../../trx-core" } trx-rds = { path = "../../../decoders/trx-rds" } tokio = { workspace = true, features = ["sync", "rt"] } serde = { workspace = true } +uuid = { workspace = true } tracing = { workspace = true } num-complex = "0.4" rustfft = "6" diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs index 9dfdd65..1d6bae4 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs @@ -15,6 +15,7 @@ mod channel; mod filter; mod spectrum; +use std::sync::atomic::AtomicI64; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; @@ -98,6 +99,9 @@ pub struct SdrPipeline { /// Write `Some(gain_db)` here to adjust the hardware RX gain. /// The IQ read loop picks it up on the next iteration. pub gain_cmd: Arc>>, + /// Current hardware center frequency in Hz, kept in sync by `SoapySdrRig`. + /// Read by `SdrVirtualChannelManager` to validate and compute IF offsets. + pub shared_center_hz: Arc, // Parameters cached here so `add_virtual_channel` can construct new // `ChannelDsp` instances without needing to be passed them again. audio_sample_rate: u32, @@ -192,6 +196,7 @@ impl SdrPipeline { sdr_sample_rate, retune_cmd, gain_cmd, + shared_center_hz: Arc::new(AtomicI64::new(0)), audio_sample_rate, audio_channels: output_channels, frame_duration_ms, diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs index 7dc524a..dedb1c1 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs @@ -5,8 +5,10 @@ pub mod demod; pub mod dsp; pub mod real_iq_source; +pub mod vchan_impl; use std::pin::Pin; +use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use trx_core::radio::freq::{Band, Freq}; @@ -19,12 +21,14 @@ use trx_core::{DynResult, RigMode}; const AIS_CHANNEL_SPACING_HZ: i64 = 50_000; +pub use vchan_impl::SdrVirtualChannelManager; + /// RX-only backend for any SoapySDR-compatible device. pub struct SoapySdrRig { info: RigInfo, freq: Freq, mode: RigMode, - pipeline: dsp::SdrPipeline, + pipeline: Arc, /// Index of the primary channel in `pipeline.channel_dsps`. primary_channel_idx: usize, /// Current filter state of the primary channel (for filter_controls support). @@ -55,6 +59,8 @@ pub struct SoapySdrRig { squelch_threshold_db: f32, /// Hidden AIS decoder channels (A and B) when available. ais_channel_indices: Option<(usize, usize)>, + /// Virtual channel manager shared with external consumers (e.g. RigHandle). + channel_manager: Arc, } impl SoapySdrRig { @@ -118,6 +124,7 @@ impl SoapySdrRig { squelch_threshold_db: f32, squelch_hysteresis_db: f32, squelch_tail_ms: u32, + max_virtual_channels: usize, ) -> DynResult { tracing::info!( "initialising SoapySDR backend (args={:?}, gain_mode={:?}, gain_db={}, max_gain_db={:?})", @@ -184,7 +191,7 @@ impl SoapySdrRig { (squelch_tail_ms as f64 / block_ms).ceil().max(0.0) as u32 }; - let pipeline = dsp::SdrPipeline::start( + let pipeline = Arc::new(dsp::SdrPipeline::start( iq_source, sdr_sample_rate, audio_sample_rate, @@ -199,7 +206,7 @@ impl SoapySdrRig { tail_blocks: squelch_tail_blocks, }, &all_channels, - ); + )); let info = RigInfo { manufacturer: "SoapySDR".to_string(), @@ -254,6 +261,18 @@ impl SoapySdrRig { let spectrum_buf = pipeline.spectrum_buf.clone(); let retune_cmd = pipeline.retune_cmd.clone(); + // Initial center_hz stored in the pipeline's shared atomic so the + // virtual channel manager can read it without a reference to SoapySdrRig. + pipeline + .shared_center_hz + .store(hardware_center_hz, Ordering::Relaxed); + // Fixed slots: user-configured channels + 2 AIS channels. + let fixed_slot_count = all_channels.len(); + let channel_manager = Arc::new(vchan_impl::SdrVirtualChannelManager::new( + pipeline.clone(), + fixed_slot_count, + max_virtual_channels, + )); let rig = Self { info, @@ -275,6 +294,7 @@ impl SoapySdrRig { squelch_enabled, squelch_threshold_db, ais_channel_indices: Some((primary_channel_count, primary_channel_count + 1)), + channel_manager, }; rig.apply_ais_channel_activity(); Ok(rig) @@ -303,9 +323,16 @@ impl SoapySdrRig { -65.0, // squelch_threshold_db 3.0, // squelch_hysteresis_db 180, // squelch_tail_ms + 4, // max_virtual_channels ) } + /// Return the virtual channel manager for this SDR rig. + /// Used by `RigHandle` to expose the channel API. + pub fn channel_manager(&self) -> trx_core::vchan::SharedVChanManager { + self.channel_manager.clone() + } + fn update_ais_channel_offsets(&self) { let Some((ais_a_idx, ais_b_idx)) = self.ais_channel_indices else { return; @@ -354,85 +381,11 @@ impl SoapySdrRig { self.center_hz } - /// Half of the SDR capture bandwidth (Hz). A virtual channel's dial - /// frequency must stay within `center_hz ± half_span_hz`. + /// Half of the SDR capture bandwidth (Hz). pub fn half_span_hz(&self) -> i64 { i64::from(self.pipeline.sdr_sample_rate) / 2 } - /// Allocate a new virtual DSP channel within the current SDR capture - /// bandwidth. Returns `None` if `freq_hz` is outside the capture span. - /// - /// The returned senders can be subscribed to for PCM audio frames. The - /// `pipeline_slot` index identifies the slot for future - /// `virtual_channel_set_freq`, `virtual_channel_set_mode`, and - /// `virtual_channel_remove` calls. - pub fn virtual_channel_add( - &self, - freq_hz: u64, - mode: &RigMode, - bandwidth_hz: u32, - fir_taps: usize, - ) -> Option<( - tokio::sync::broadcast::Sender>, - tokio::sync::broadcast::Sender>>, - usize, - )> { - let channel_if_hz = freq_hz as i64 - self.center_hz; - if channel_if_hz.unsigned_abs() as i64 > self.half_span_hz() { - return None; - } - let (pcm_tx, iq_tx) = - self.pipeline - .add_virtual_channel(channel_if_hz as f64, mode, bandwidth_hz, fir_taps); - let slot = self - .pipeline - .channel_dsps - .read() - .unwrap() - .len() - .saturating_sub(1); - Some((pcm_tx, iq_tx, slot)) - } - - /// Remove a virtual channel at the given pipeline slot index. - /// Returns `false` if the slot is out of range. - pub fn virtual_channel_remove(&self, pipeline_slot: usize) -> bool { - self.pipeline.remove_virtual_channel(pipeline_slot) - } - - /// Update the dial frequency of a virtual channel. - /// Returns `false` if the slot is out of range or the freq is outside - /// the current capture bandwidth. - pub fn virtual_channel_set_freq(&self, pipeline_slot: usize, freq_hz: u64) -> bool { - let channel_if_hz = freq_hz as i64 - self.center_hz; - if channel_if_hz.unsigned_abs() as i64 > self.half_span_hz() { - return false; - } - let dsps = self.pipeline.channel_dsps.read().unwrap(); - if let Some(dsp_arc) = dsps.get(pipeline_slot) { - dsp_arc - .lock() - .unwrap() - .set_channel_if_hz(channel_if_hz as f64); - true - } else { - false - } - } - - /// Update the demodulation mode of a virtual channel. - /// Returns `false` if the slot is out of range. - pub fn virtual_channel_set_mode(&self, pipeline_slot: usize, mode: &RigMode) -> bool { - let dsps = self.pipeline.channel_dsps.read().unwrap(); - if let Some(dsp_arc) = dsps.get(pipeline_slot) { - dsp_arc.lock().unwrap().set_mode(mode); - true - } else { - false - } - } - pub fn subscribe_iq_channel( &self, channel_idx: usize, @@ -521,6 +474,7 @@ impl RigCat for SoapySdrRig { if let Ok(mut cmd) = self.retune_cmd.lock() { *cmd = Some(hardware_hz as f64); } + self.channel_manager.update_center_hz(hardware_hz); } { @@ -549,6 +503,7 @@ impl RigCat for SoapySdrRig { if let Ok(mut cmd) = self.retune_cmd.lock() { *cmd = Some(self.center_hz as f64); } + self.channel_manager.update_center_hz(self.center_hz); { let dsps = self.pipeline.channel_dsps.read().unwrap(); if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) { diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs new file mode 100644 index 0000000..ea0e4d2 --- /dev/null +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs @@ -0,0 +1,378 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Concrete `VirtualChannelManager` implementation for SoapySDR rigs. +//! +//! `SdrVirtualChannelManager` wraps an `Arc` and maintains a list +//! of managed channel entries. The primary channel (pipeline slot 0) is always +//! present and marked permanent; additional virtual channels are appended +//! dynamically. +//! +//! ## Slot stability +//! +//! Virtual channels occupy pipeline slots `fixed_slot_count..`. When a channel +//! at slot *K* is removed, `Vec::remove(K)` shifts all entries K+1..end left by +//! one; the manager updates every surviving entry's `pipeline_slot` accordingly. +//! +//! ## Center-frequency updates +//! +//! When the hardware retunes (changing `center_hz`), all channel IF offsets must +//! be recomputed. The rig calls `update_center_hz()` after every retune; this +//! updates every `ChannelDsp` in a single write-locked pass. + +use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::{Arc, RwLock}; + +use num_complex::Complex; +use tokio::sync::broadcast; +use trx_core::rig::state::RigMode; +use uuid::Uuid; + +use crate::dsp::{SdrPipeline, VirtualSquelchConfig}; +use trx_core::vchan::{VChanError, VChannelInfo, VirtualChannelManager}; + +// --------------------------------------------------------------------------- +// Default DSP parameters for virtual channels +// --------------------------------------------------------------------------- + +const DEFAULT_FIR_TAPS: usize = 64; + +fn default_bandwidth_hz(mode: &RigMode) -> u32 { + match mode { + RigMode::CW | RigMode::CWR => 500, + RigMode::LSB | RigMode::USB | RigMode::DIG => 3_000, + RigMode::AM => 9_000, + RigMode::FM => 12_500, + RigMode::WFM => 180_000, + RigMode::PKT | RigMode::AIS => 25_000, + RigMode::VDES | RigMode::MARINE => 100_000, + RigMode::Other(_) => 3_000, + } +} + +// --------------------------------------------------------------------------- +// Internal channel record +// --------------------------------------------------------------------------- + +struct ManagedChannel { + id: Uuid, + freq_hz: u64, + mode: RigMode, + /// `broadcast::Sender` kept alive so new subscribers can join at any time. + pcm_tx: broadcast::Sender>, + /// IQ tap sender (kept alive; external consumers may subscribe). + #[allow(dead_code)] + iq_tx: broadcast::Sender>>, + /// Index of this channel in `pipeline.channel_dsps`. + pipeline_slot: usize, + /// True only for the primary channel; prevents removal. + permanent: bool, +} + +// --------------------------------------------------------------------------- +// SdrVirtualChannelManager +// --------------------------------------------------------------------------- + +pub struct SdrVirtualChannelManager { + pipeline: Arc, + /// Current SDR hardware center frequency, updated on every retune. + center_hz: Arc, + /// Pipeline slots 0..fixed_slot_count are reserved (primary + AIS). + /// Virtual channels occupy slots fixed_slot_count and above. + fixed_slot_count: usize, + /// Maximum total channels including the primary (enforced on `add_channel`). + max_total: usize, + channels: RwLock>, +} + +impl SdrVirtualChannelManager { + /// Create a new manager. + /// + /// - `pipeline`: shared reference to the running `SdrPipeline`. + /// - `fixed_slot_count`: number of fixed pipeline slots (primary + AIS), + /// i.e. the index of the first slot available for virtual channels. + /// - `max_total`: maximum total channels including primary (e.g. 4). + pub fn new( + pipeline: Arc, + fixed_slot_count: usize, + max_total: usize, + ) -> Self { + // Seed the channel list with a synthetic primary-channel entry. + // We use the first PCM sender from the pipeline (index 0). + let primary_pcm_tx = pipeline + .pcm_senders + .first() + .cloned() + .unwrap_or_else(|| broadcast::channel::>(1).0); + let primary_iq_tx = pipeline + .iq_senders + .first() + .cloned() + .unwrap_or_else(|| broadcast::channel::>>(1).0); + + let primary = ManagedChannel { + id: Uuid::new_v4(), + freq_hz: 0, // actual freq kept by SoapySdrRig; manager treats ch-0 as opaque + mode: RigMode::USB, + pcm_tx: primary_pcm_tx, + iq_tx: primary_iq_tx, + pipeline_slot: 0, + permanent: true, + }; + + Self { + center_hz: pipeline.shared_center_hz.clone(), + pipeline, + fixed_slot_count, + max_total: max_total.max(1), + channels: RwLock::new(vec![primary]), + } + } + + fn half_span_hz(&self) -> i64 { + i64::from(self.pipeline.sdr_sample_rate) / 2 + } + + /// Called by `SoapySdrRig` whenever the hardware center frequency changes. + /// Recomputes the IF offset for every virtual channel. + pub fn update_center_hz(&self, new_center_hz: i64) { + self.center_hz.store(new_center_hz, Ordering::Relaxed); + let channels = self.channels.read().unwrap(); + let dsps = self.pipeline.channel_dsps.read().unwrap(); + for ch in channels.iter().filter(|c| !c.permanent) { + let new_if_hz = ch.freq_hz as i64 - new_center_hz; + if let Some(dsp_arc) = dsps.get(ch.pipeline_slot) { + dsp_arc + .lock() + .unwrap() + .set_channel_if_hz(new_if_hz as f64); + } + } + } + + /// Update the primary channel's freq/mode metadata (called by SoapySdrRig + /// on SetFreq/SetMode so channel-0 info stays current for API consumers). + pub fn update_primary_meta(&self, freq_hz: u64, mode: &RigMode) { + let mut channels = self.channels.write().unwrap(); + if let Some(ch) = channels.first_mut() { + ch.freq_hz = freq_hz; + ch.mode = mode.clone(); + } + } +} + +impl VirtualChannelManager for SdrVirtualChannelManager { + fn add_channel( + &self, + freq_hz: u64, + mode: &RigMode, + ) -> Result<(Uuid, broadcast::Receiver>), VChanError> { + let half_span = self.half_span_hz(); + let center = self.center_hz.load(Ordering::Relaxed); + let if_hz = freq_hz as i64 - center; + if if_hz.unsigned_abs() as i64 > half_span { + return Err(VChanError::OutOfBandwidth { + half_span_hz: half_span, + }); + } + + let mut channels = self.channels.write().unwrap(); + if channels.len() >= self.max_total { + return Err(VChanError::CapReached { max: self.max_total }); + } + + let bandwidth_hz = default_bandwidth_hz(mode); + let (pcm_tx, iq_tx) = + self.pipeline + .add_virtual_channel(if_hz as f64, mode, bandwidth_hz, DEFAULT_FIR_TAPS); + + let pipeline_slot = self + .pipeline + .channel_dsps + .read() + .unwrap() + .len() + .saturating_sub(1); + + let id = Uuid::new_v4(); + let pcm_rx = pcm_tx.subscribe(); + channels.push(ManagedChannel { + id, + freq_hz, + mode: mode.clone(), + pcm_tx, + iq_tx, + pipeline_slot, + permanent: false, + }); + + Ok((id, pcm_rx)) + } + + fn remove_channel(&self, id: Uuid) -> Result<(), VChanError> { + let mut channels = self.channels.write().unwrap(); + let pos = channels + .iter() + .position(|c| c.id == id) + .ok_or(VChanError::NotFound)?; + + if channels[pos].permanent { + return Err(VChanError::Permanent); + } + + let slot = channels[pos].pipeline_slot; + self.pipeline.remove_virtual_channel(slot); + channels.remove(pos); + + // Shift pipeline_slot for all channels that were after the removed one. + for ch in channels.iter_mut().filter(|c| c.pipeline_slot > slot) { + ch.pipeline_slot -= 1; + } + Ok(()) + } + + fn set_channel_freq(&self, id: Uuid, freq_hz: u64) -> Result<(), VChanError> { + let half_span = self.half_span_hz(); + let center = self.center_hz.load(Ordering::Relaxed); + let if_hz = freq_hz as i64 - center; + if if_hz.unsigned_abs() as i64 > half_span { + return Err(VChanError::OutOfBandwidth { + half_span_hz: half_span, + }); + } + + let mut channels = self.channels.write().unwrap(); + let ch = channels + .iter_mut() + .find(|c| c.id == id) + .ok_or(VChanError::NotFound)?; + + ch.freq_hz = freq_hz; + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(ch.pipeline_slot) { + dsp_arc.lock().unwrap().set_channel_if_hz(if_hz as f64); + } + Ok(()) + } + + fn set_channel_mode(&self, id: Uuid, mode: &RigMode) -> Result<(), VChanError> { + let mut channels = self.channels.write().unwrap(); + let ch = channels + .iter_mut() + .find(|c| c.id == id) + .ok_or(VChanError::NotFound)?; + + ch.mode = mode.clone(); + let dsps = self.pipeline.channel_dsps.read().unwrap(); + if let Some(dsp_arc) = dsps.get(ch.pipeline_slot) { + dsp_arc.lock().unwrap().set_mode(mode); + } + Ok(()) + } + + fn subscribe_pcm(&self, id: Uuid) -> Option>> { + let channels = self.channels.read().unwrap(); + channels + .iter() + .find(|c| c.id == id) + .map(|c| c.pcm_tx.subscribe()) + } + + fn channels(&self) -> Vec { + let channels = self.channels.read().unwrap(); + channels + .iter() + .enumerate() + .map(|(idx, ch)| VChannelInfo { + id: ch.id, + index: idx, + freq_hz: ch.freq_hz, + mode: format!("{:?}", ch.mode), + permanent: ch.permanent, + }) + .collect() + } + + fn max_channels(&self) -> usize { + self.max_total + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use crate::dsp::{MockIqSource, SdrPipeline}; + + fn make_pipeline() -> Arc { + Arc::new(SdrPipeline::start( + Box::new(MockIqSource), + 1_920_000, + 48_000, + 1, + 20, + 75, + true, + VirtualSquelchConfig::default(), + &[(0.0, RigMode::USB, 3_000, 64)], + )) + } + + #[test] + fn add_and_list() { + let p = make_pipeline(); + let mgr = SdrVirtualChannelManager::new(p, 1, 4); + // Set center to 14.1 MHz so that 14.074 MHz is within ±960 kHz. + mgr.update_center_hz(14_100_000); + assert_eq!(mgr.channels().len(), 1); // primary only + + let (id, _rx) = mgr.add_channel(14_074_000, &RigMode::USB).unwrap(); + assert_eq!(mgr.channels().len(), 2); + + let ch = mgr.channels().into_iter().find(|c| c.id == id).unwrap(); + assert_eq!(ch.freq_hz, 14_074_000); + assert!(!ch.permanent); + } + + #[test] + fn remove_virtual_channel() { + let p = make_pipeline(); + let mgr = SdrVirtualChannelManager::new(p, 1, 4); + mgr.update_center_hz(14_100_000); + let (id, _) = mgr.add_channel(14_074_000, &RigMode::USB).unwrap(); + mgr.remove_channel(id).unwrap(); + assert_eq!(mgr.channels().len(), 1); + } + + #[test] + fn cannot_remove_primary() { + let p = make_pipeline(); + let mgr = SdrVirtualChannelManager::new(p, 1, 4); + let primary_id = mgr.channels()[0].id; + let err = mgr.remove_channel(primary_id).unwrap_err(); + assert!(matches!(err, VChanError::Permanent)); + } + + #[test] + fn cap_enforced() { + let p = make_pipeline(); + let mgr = SdrVirtualChannelManager::new(p, 1, 2); // primary + 1 virtual max + mgr.update_center_hz(14_100_000); + mgr.add_channel(14_074_000, &RigMode::USB).unwrap(); + let err = mgr.add_channel(14_075_000, &RigMode::USB).unwrap_err(); + assert!(matches!(err, VChanError::CapReached { .. })); + } + + #[test] + fn out_of_bandwidth() { + let p = make_pipeline(); + let mgr = SdrVirtualChannelManager::new(p, 1, 4); + // center_hz = 0, half_span = 960_000 Hz — 10 MHz is way out + let err = mgr.add_channel(10_000_000, &RigMode::USB).unwrap_err(); + assert!(matches!(err, VChanError::OutOfBandwidth { .. })); + } +}