[feat](trx-rs): persistent multi-channel virtual channels with OOB eviction
Allow users to allocate multiple virtual channels independently of browser tab count. Channels survive SDR center-frequency retuning as long as they stay within the capture bandwidth; channels that fall outside the SDR span are automatically destroyed. Changes: - trx-core: add AUDIO_MSG_VCHAN_DESTROYED (0x12) wire constant; add default subscribe_destroyed() to VirtualChannelManager trait - trx-backend-soapysdr: update_center_hz() detects OOB channels, removes them, fires destroyed_tx broadcast; add destroyed_sender() and subscribe_destroyed() override - trx-server/audio: recv_destroyed() helper avoids select! busy-loop for non-SDR backends; send AUDIO_MSG_VCHAN_DESTROYED to client when a channel is evicted server-side - trx-client/audio_client: persist active_subs across TCP reconnects, re-subscribe on reconnect; handle AUDIO_MSG_VCHAN_DESTROYED by pruning vchan_audio map and forwarding UUID via vchan_destroyed_tx - trx-frontend/lib: add vchan_destroyed broadcast field to FrontendRuntimeContext - trx-client/main: wire vchan_destroyed_tx into audio client and frontend runtime context - trx-frontend-http/vchan: remove per-session one-channel limit in allocate(); replace auto-evict in release_session_on_rig() with subscriber-count-only update; add remove_by_uuid() for server- triggered OOB destruction (skips redundant VChanAudioCmd::Remove) - trx-frontend-http/server: spawn background task that forwards vchan_destroyed broadcast to ClientChannelManager.remove_by_uuid() Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -5,7 +5,7 @@
|
||||
//! Audio TCP client that connects to the server's audio port and relays
|
||||
//! RX/TX Opus frames via broadcast/mpsc channels.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -26,9 +26,9 @@ use trx_core::audio::{
|
||||
write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE,
|
||||
AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE,
|
||||
AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_RX_FRAME_CH,
|
||||
AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, AUDIO_MSG_VCHAN_FREQ,
|
||||
AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB,
|
||||
AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE,
|
||||
AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, AUDIO_MSG_VCHAN_DESTROYED,
|
||||
AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB,
|
||||
AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE,
|
||||
};
|
||||
use trx_core::decode::DecodedMessage;
|
||||
use trx_frontend::VChanAudioCmd;
|
||||
@@ -48,8 +48,12 @@ pub async fn run_audio_client(
|
||||
mut shutdown_rx: watch::Receiver<bool>,
|
||||
vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
|
||||
mut vchan_cmd_rx: mpsc::Receiver<VChanAudioCmd>,
|
||||
vchan_destroyed_tx: Option<broadcast::Sender<Uuid>>,
|
||||
) {
|
||||
let mut reconnect_delay = Duration::from_secs(1);
|
||||
// Active virtual-channel subscriptions, keyed by UUID.
|
||||
// Re-sent to the server on every audio TCP reconnect.
|
||||
let mut active_subs: HashMap<Uuid, (u64, String)> = HashMap::new();
|
||||
|
||||
loop {
|
||||
if *shutdown_rx.borrow() {
|
||||
@@ -87,6 +91,8 @@ pub async fn run_audio_client(
|
||||
&mut shutdown_rx,
|
||||
&vchan_audio,
|
||||
&mut vchan_cmd_rx,
|
||||
&mut active_subs,
|
||||
&vchan_destroyed_tx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -132,6 +138,8 @@ async fn handle_audio_connection(
|
||||
shutdown_rx: &mut watch::Receiver<bool>,
|
||||
vchan_audio: &Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
|
||||
vchan_cmd_rx: &mut mpsc::Receiver<VChanAudioCmd>,
|
||||
active_subs: &mut HashMap<Uuid, (u64, String)>,
|
||||
vchan_destroyed_tx: &Option<broadcast::Sender<Uuid>>,
|
||||
) -> std::io::Result<()> {
|
||||
let (reader, writer) = stream.into_split();
|
||||
let mut reader = BufReader::new(reader);
|
||||
@@ -153,10 +161,30 @@ async fn handle_audio_connection(
|
||||
);
|
||||
let _ = stream_info_tx.send(Some(info));
|
||||
|
||||
// On reconnect: re-subscribe all previously active virtual channels.
|
||||
// Track which UUIDs were pre-sent so we don't duplicate them when the
|
||||
// same Subscribe command arrives from the mpsc queue.
|
||||
let mut resubscribed: HashSet<Uuid> = HashSet::new();
|
||||
for (&uuid, &(freq_hz, ref mode)) in active_subs.iter() {
|
||||
let json = serde_json::json!({
|
||||
"uuid": uuid.to_string(),
|
||||
"freq_hz": freq_hz,
|
||||
"mode": mode,
|
||||
});
|
||||
if let Ok(payload) = serde_json::to_vec(&json) {
|
||||
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await {
|
||||
warn!("Audio vchan reconnect SUB write failed: {}", e);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
resubscribed.insert(uuid);
|
||||
}
|
||||
|
||||
// Spawn RX read task
|
||||
let rx_tx = rx_tx.clone();
|
||||
let decode_tx = decode_tx.clone();
|
||||
let vchan_audio_rx: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>> = Arc::clone(vchan_audio);
|
||||
let vchan_destroyed_for_rx = vchan_destroyed_tx.clone();
|
||||
let mut rx_handle = tokio::spawn(async move {
|
||||
loop {
|
||||
match read_audio_msg(&mut reader).await {
|
||||
@@ -184,6 +212,19 @@ async fn handle_audio_connection(
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok((AUDIO_MSG_VCHAN_DESTROYED, payload)) => {
|
||||
if let Ok(uuid) = parse_vchan_uuid_msg(&payload) {
|
||||
// Remove the broadcaster so audio_ws gets no more frames.
|
||||
if let Ok(mut map) = vchan_audio_rx.write() {
|
||||
map.remove(&uuid);
|
||||
}
|
||||
// Notify the HTTP frontend so it removes the channel from
|
||||
// ClientChannelManager (triggers SSE channels event).
|
||||
if let Some(ref tx) = vchan_destroyed_for_rx {
|
||||
let _ = tx.send(uuid);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok((AUDIO_MSG_HISTORY_COMPRESSED, payload)) => {
|
||||
// Decompress gzip blob, then iterate the embedded framed messages.
|
||||
let mut decompressed = Vec::new();
|
||||
@@ -265,15 +306,21 @@ async fn handle_audio_connection(
|
||||
cmd = vchan_cmd_rx.recv() => {
|
||||
match cmd {
|
||||
Some(VChanAudioCmd::Subscribe { uuid, freq_hz, mode }) => {
|
||||
let json = serde_json::json!({
|
||||
"uuid": uuid.to_string(),
|
||||
"freq_hz": freq_hz,
|
||||
"mode": mode,
|
||||
});
|
||||
if let Ok(payload) = serde_json::to_vec(&json) {
|
||||
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await {
|
||||
warn!("Audio vchan SUB write failed: {}", e);
|
||||
break;
|
||||
active_subs.insert(uuid, (freq_hz, mode.clone()));
|
||||
// Skip if already re-sent during reconnect initialization.
|
||||
if resubscribed.remove(&uuid) {
|
||||
// Already sent above; don't duplicate.
|
||||
} else {
|
||||
let json = serde_json::json!({
|
||||
"uuid": uuid.to_string(),
|
||||
"freq_hz": freq_hz,
|
||||
"mode": mode,
|
||||
});
|
||||
if let Ok(payload) = serde_json::to_vec(&json) {
|
||||
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await {
|
||||
warn!("Audio vchan SUB write failed: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -292,6 +339,7 @@ async fn handle_audio_connection(
|
||||
if let Ok(mut map) = vchan_audio.write() {
|
||||
map.remove(&uuid);
|
||||
}
|
||||
active_subs.remove(&uuid);
|
||||
}
|
||||
Some(VChanAudioCmd::SetFreq { uuid, freq_hz }) => {
|
||||
let json = serde_json::json!({ "uuid": uuid.to_string(), "freq_hz": freq_hz });
|
||||
|
||||
@@ -303,6 +303,9 @@ async fn async_init() -> DynResult<AppState> {
|
||||
let (vchan_cmd_tx, vchan_cmd_rx) = mpsc::channel::<trx_frontend::VChanAudioCmd>(64);
|
||||
*frontend_runtime.vchan_audio_cmd.lock().unwrap() = Some(vchan_cmd_tx);
|
||||
|
||||
let (vchan_destroyed_tx, _) = broadcast::channel::<uuid::Uuid>(64);
|
||||
frontend_runtime.vchan_destroyed = Some(vchan_destroyed_tx.clone());
|
||||
|
||||
info!(
|
||||
"Audio enabled: default port {}, decode channel set",
|
||||
cfg.frontends.audio.server_port
|
||||
@@ -324,6 +327,7 @@ async fn async_init() -> DynResult<AppState> {
|
||||
audio_shutdown_rx,
|
||||
vchan_audio_map,
|
||||
vchan_cmd_rx,
|
||||
Some(vchan_destroyed_tx),
|
||||
)));
|
||||
|
||||
if cfg.frontends.audio.bridge.enabled {
|
||||
|
||||
@@ -236,6 +236,10 @@ pub struct FrontendRuntimeContext {
|
||||
/// forwards `VCHAN_SUB` / `VCHAN_UNSUB` frames over the audio TCP connection.
|
||||
/// `None` when no audio connection is active.
|
||||
pub vchan_audio_cmd: Arc<Mutex<Option<mpsc::Sender<VChanAudioCmd>>>>,
|
||||
/// Broadcast sender that fires whenever the server destroys a virtual
|
||||
/// channel (e.g. out-of-bandwidth after center-frequency retune).
|
||||
/// The HTTP frontend subscribes to clean up `ClientChannelManager`.
|
||||
pub vchan_destroyed: Option<broadcast::Sender<Uuid>>,
|
||||
}
|
||||
|
||||
impl FrontendRuntimeContext {
|
||||
@@ -281,6 +285,7 @@ impl FrontendRuntimeContext {
|
||||
},
|
||||
vchan_audio: Arc::new(RwLock::new(HashMap::new())),
|
||||
vchan_audio_cmd: Arc::new(Mutex::new(None)),
|
||||
vchan_destroyed: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ use actix_web::{
|
||||
web, App, HttpServer,
|
||||
};
|
||||
use tokio::signal;
|
||||
use tokio::sync::{mpsc, watch};
|
||||
use tokio::sync::{broadcast, mpsc, watch};
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{error, info};
|
||||
|
||||
@@ -92,6 +92,24 @@ async fn serve(
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn a task that removes channels destroyed server-side (OOB) from the
|
||||
// client-side registry so the SSE channel list stays in sync.
|
||||
if let Some(ref destroyed_tx) = context.vchan_destroyed {
|
||||
let mut destroyed_rx = destroyed_tx.subscribe();
|
||||
let mgr_for_destroyed = vchan_mgr.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match destroyed_rx.recv().await {
|
||||
Ok(uuid) => {
|
||||
mgr_for_destroyed.remove_by_uuid(uuid);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let server = build_server(addr, state_rx, rig_tx, callsign, context, scheduler_store, scheduler_status, vchan_mgr)?;
|
||||
let handle = server.handle();
|
||||
tokio::spawn(async move {
|
||||
|
||||
@@ -203,9 +203,6 @@ impl ClientChannelManager {
|
||||
freq_hz: u64,
|
||||
mode: &str,
|
||||
) -> Result<ClientChannel, VChanClientError> {
|
||||
// Release any existing channel owned by this session on this rig.
|
||||
self.release_session_on_rig(session_id, rig_id);
|
||||
|
||||
let mut rigs = self.rigs.write().unwrap();
|
||||
let channels = rigs.entry(rig_id.to_string()).or_default();
|
||||
|
||||
@@ -318,27 +315,9 @@ impl ClientChannelManager {
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
// Collect IDs of non-permanent channels about to be evicted (0 subscribers).
|
||||
let to_remove: Vec<Uuid> = channels
|
||||
.iter()
|
||||
.filter(|c| !c.permanent && c.session_ids.is_empty())
|
||||
.map(|c| c.id)
|
||||
.collect();
|
||||
// Remove non-permanent channels with no subscribers.
|
||||
let before = channels.len();
|
||||
channels.retain(|c| c.permanent || !c.session_ids.is_empty());
|
||||
if channels.len() != before {
|
||||
changed = true;
|
||||
}
|
||||
if changed {
|
||||
self.broadcast_change(rig_id, channels);
|
||||
}
|
||||
drop(rigs);
|
||||
// Notify the audio-client task so it can tear down the server-side
|
||||
// DSP pipeline and Opus encoder for each evicted channel.
|
||||
for id in to_remove {
|
||||
self.send_audio_cmd(VChanAudioCmd::Remove(id));
|
||||
}
|
||||
}
|
||||
|
||||
/// Explicitly delete a channel by UUID (any session may do this).
|
||||
@@ -373,6 +352,42 @@ impl ClientChannelManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove a channel by UUID across all rigs (called when the server destroys
|
||||
/// it due to out-of-band center-frequency change). Does NOT send a
|
||||
/// `VChanAudioCmd::Remove` since the server-side channel is already gone.
|
||||
pub fn remove_by_uuid(&self, channel_id: Uuid) {
|
||||
let evicted_sessions: Vec<Uuid>;
|
||||
let rig_id_opt: Option<String>;
|
||||
{
|
||||
let mut rigs = self.rigs.write().unwrap();
|
||||
let mut found = false;
|
||||
let mut evicted = Vec::new();
|
||||
let mut found_rig = None;
|
||||
for (rig_id, channels) in rigs.iter_mut() {
|
||||
if let Some(pos) = channels.iter().position(|c| c.id == channel_id) {
|
||||
evicted = channels[pos].session_ids.clone();
|
||||
channels.remove(pos);
|
||||
self.broadcast_change(rig_id, channels);
|
||||
found_rig = Some(rig_id.clone());
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
evicted_sessions = evicted;
|
||||
rig_id_opt = found_rig;
|
||||
let _ = found; // suppress warning
|
||||
}
|
||||
// Clean up session → channel mapping for sessions that were subscribed.
|
||||
if rig_id_opt.is_some() {
|
||||
let mut sessions = self.sessions.write().unwrap();
|
||||
for sid in evicted_sessions {
|
||||
if matches!(sessions.get(&sid), Some((_, ch)) if *ch == channel_id) {
|
||||
sessions.remove(&sid);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Update freq/mode metadata for a channel.
|
||||
pub fn set_channel_freq(
|
||||
&self,
|
||||
|
||||
@@ -56,6 +56,9 @@ pub const AUDIO_MSG_VCHAN_MODE: u8 = 0x10;
|
||||
/// Client → server: remove a virtual channel (stops encoding and destroys the DSP pipeline).
|
||||
/// Payload: 16-byte UUID of the virtual channel on the server.
|
||||
pub const AUDIO_MSG_VCHAN_REMOVE: u8 = 0x11;
|
||||
/// Server → client: a virtual channel was destroyed server-side (e.g. went out of bandwidth).
|
||||
/// Payload: 16-byte UUID of the destroyed channel.
|
||||
pub const AUDIO_MSG_VCHAN_DESTROYED: u8 = 0x12;
|
||||
|
||||
/// Maximum payload size for normal messages (1 MB).
|
||||
const MAX_PAYLOAD_SIZE: u32 = 1_048_576;
|
||||
|
||||
@@ -117,6 +117,18 @@ pub trait VirtualChannelManager: Send + Sync {
|
||||
|
||||
/// Maximum number of channels (including the primary channel).
|
||||
fn max_channels(&self) -> usize;
|
||||
|
||||
/// Subscribe to server-side channel destruction events.
|
||||
///
|
||||
/// Returns a `broadcast::Receiver<Uuid>` that fires whenever the manager
|
||||
/// destroys a channel (e.g. because it went out of the SDR capture
|
||||
/// bandwidth). The default implementation returns an immediately-closed
|
||||
/// receiver so non-SDR backends do not need to override this.
|
||||
fn subscribe_destroyed(&self) -> broadcast::Receiver<Uuid> {
|
||||
// Drop the sender immediately; the receiver will resolve to
|
||||
// `Err(RecvError::Closed)` on first poll, signalling "no events".
|
||||
broadcast::channel::<Uuid>(1).1
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience alias used in `RigHandle`.
|
||||
|
||||
@@ -27,7 +27,7 @@ use trx_core::audio::{
|
||||
AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE,
|
||||
AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME,
|
||||
AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED,
|
||||
AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE,
|
||||
AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE,
|
||||
AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE,
|
||||
};
|
||||
use trx_core::vchan::SharedVChanManager;
|
||||
@@ -1823,6 +1823,23 @@ pub async fn run_audio_listener(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the next destroyed-channel UUID, or `pending()` when the receiver
|
||||
/// has been closed or is not present. Disables itself on close so the
|
||||
/// enclosing `select!` never busy-loops on a dead channel.
|
||||
async fn recv_destroyed(rx: &mut Option<broadcast::Receiver<Uuid>>) -> Option<Uuid> {
|
||||
match rx {
|
||||
None => std::future::pending::<Option<Uuid>>().await,
|
||||
Some(r) => match r.recv().await {
|
||||
Ok(uuid) => Some(uuid),
|
||||
Err(broadcast::error::RecvError::Lagged(_)) => None,
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
*rx = None;
|
||||
std::future::pending::<Option<Uuid>>().await
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn handle_audio_client(
|
||||
socket: TcpStream,
|
||||
@@ -1921,6 +1938,10 @@ async fn handle_audio_client(
|
||||
let opus_sample_rate = stream_info.sample_rate;
|
||||
let opus_channels = stream_info.channels;
|
||||
|
||||
// Subscribe to server-side channel destruction events (SDR rigs only).
|
||||
let mut destroyed_rx: Option<broadcast::Receiver<Uuid>> =
|
||||
vchan_manager.as_ref().map(|m| m.subscribe_destroyed());
|
||||
|
||||
let rx_handle = tokio::spawn(async move {
|
||||
// UUID → JoinHandle of per-channel Opus encoder task.
|
||||
let mut vchan_tasks: std::collections::HashMap<Uuid, tokio::task::JoinHandle<()>> =
|
||||
@@ -2047,6 +2068,25 @@ async fn handle_audio_client(
|
||||
}
|
||||
}
|
||||
}
|
||||
uuid = recv_destroyed(&mut destroyed_rx) => {
|
||||
if let Some(uuid) = uuid {
|
||||
// Stop encoding for this channel.
|
||||
if let Some(h) = vchan_tasks.remove(&uuid) {
|
||||
h.abort();
|
||||
}
|
||||
// Notify the client.
|
||||
if let Err(e) = write_vchan_uuid_msg(
|
||||
&mut writer_for_rx,
|
||||
AUDIO_MSG_VCHAN_DESTROYED,
|
||||
uuid,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("Audio vchan destroyed write to {} failed: {}", peer, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -87,6 +87,8 @@ pub struct SdrVirtualChannelManager {
|
||||
/// Maximum total channels including the primary (enforced on `add_channel`).
|
||||
max_total: usize,
|
||||
channels: RwLock<Vec<ManagedChannel>>,
|
||||
/// Fires whenever a channel is destroyed (e.g. went out of SDR bandwidth).
|
||||
destroyed_tx: broadcast::Sender<Uuid>,
|
||||
}
|
||||
|
||||
impl SdrVirtualChannelManager {
|
||||
@@ -124,15 +126,22 @@ impl SdrVirtualChannelManager {
|
||||
permanent: true,
|
||||
};
|
||||
|
||||
let (destroyed_tx, _) = broadcast::channel::<Uuid>(16);
|
||||
|
||||
Self {
|
||||
center_hz: pipeline.shared_center_hz.clone(),
|
||||
pipeline,
|
||||
fixed_slot_count,
|
||||
max_total: max_total.max(1),
|
||||
channels: RwLock::new(vec![primary]),
|
||||
destroyed_tx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn destroyed_sender(&self) -> broadcast::Sender<Uuid> {
|
||||
self.destroyed_tx.clone()
|
||||
}
|
||||
|
||||
fn half_span_hz(&self) -> i64 {
|
||||
i64::from(self.pipeline.sdr_sample_rate) / 2
|
||||
}
|
||||
@@ -141,16 +150,28 @@ impl SdrVirtualChannelManager {
|
||||
/// Recomputes the IF offset for every virtual channel.
|
||||
pub fn update_center_hz(&self, new_center_hz: i64) {
|
||||
self.center_hz.store(new_center_hz, Ordering::Relaxed);
|
||||
let channels = self.channels.read().unwrap();
|
||||
let dsps = self.pipeline.channel_dsps.read().unwrap();
|
||||
for ch in channels.iter().filter(|c| !c.permanent) {
|
||||
let new_if_hz = ch.freq_hz as i64 - new_center_hz;
|
||||
if let Some(dsp_arc) = dsps.get(ch.pipeline_slot) {
|
||||
dsp_arc
|
||||
.lock()
|
||||
.unwrap()
|
||||
.set_channel_if_hz(new_if_hz as f64);
|
||||
let half_span = self.half_span_hz();
|
||||
|
||||
// Single pass under read lock: update in-band IF offsets and collect OOB IDs.
|
||||
let oob_ids: Vec<Uuid> = {
|
||||
let channels = self.channels.read().unwrap();
|
||||
let dsps = self.pipeline.channel_dsps.read().unwrap();
|
||||
let mut oob = Vec::new();
|
||||
for ch in channels.iter().filter(|c| !c.permanent) {
|
||||
let new_if_hz = ch.freq_hz as i64 - new_center_hz;
|
||||
if new_if_hz.unsigned_abs() as i64 > half_span {
|
||||
oob.push(ch.id);
|
||||
} else if let Some(dsp_arc) = dsps.get(ch.pipeline_slot) {
|
||||
dsp_arc.lock().unwrap().set_channel_if_hz(new_if_hz as f64);
|
||||
}
|
||||
}
|
||||
oob
|
||||
}; // read locks released here
|
||||
|
||||
// Destroy OOB channels and notify subscribers.
|
||||
for id in oob_ids {
|
||||
let _ = self.remove_channel(id); // acquires write lock internally
|
||||
let _ = self.destroyed_tx.send(id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -301,6 +322,10 @@ impl VirtualChannelManager for SdrVirtualChannelManager {
|
||||
self.max_total
|
||||
}
|
||||
|
||||
fn subscribe_destroyed(&self) -> broadcast::Receiver<Uuid> {
|
||||
self.destroyed_tx.subscribe()
|
||||
}
|
||||
|
||||
fn ensure_channel_pcm(
|
||||
&self,
|
||||
id: Uuid,
|
||||
|
||||
Reference in New Issue
Block a user