diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index 54aa54a..e478882 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -250,31 +250,13 @@ pub async fn events( initial.status.freq.hz, &format!("{:?}", initial.status.mode), ); - if scheduler_control.scheduler_allowed() { - let desired = { - let map = scheduler_status.read().unwrap_or_else(|e| e.into_inner()); - map.get(rid) - .filter(|status| status.active) - .map(|status| { - status - .last_bookmark_ids - .iter() - .filter_map(|bookmark_id| { - bookmark_store.get(bookmark_id).map(|bookmark| { - ( - bookmark_id.clone(), - bookmark.freq_hz, - bookmark.mode, - bookmark.bandwidth_hz.unwrap_or(0) as u32, - ) - }) - }) - .collect::>() - }) - .unwrap_or_default() - }; - vchan_mgr.sync_scheduler_channels(rid, &desired); - } + sync_scheduler_vchannels( + vchan_mgr.get_ref().as_ref(), + bookmark_store.get_ref().as_ref(), + scheduler_status.get_ref(), + scheduler_control.get_ref().as_ref(), + rid, + ); } // Build the prefix burst: rig state → session UUID → initial channels. @@ -304,10 +286,16 @@ pub async fn events( let counter_updates = counter.clone(); let context_updates = context.get_ref().clone(); let vchan_updates = vchan_mgr.get_ref().clone(); + let bookmark_store_updates = bookmark_store.get_ref().clone(); + let scheduler_status_updates = scheduler_status.get_ref().clone(); + let scheduler_control_updates = scheduler_control.get_ref().clone(); let updates = WatchStream::new(rx).filter_map(move |state| { let counter = counter_updates.clone(); let context = context_updates.clone(); let vchan = vchan_updates.clone(); + let bookmark_store = bookmark_store_updates.clone(); + let scheduler_status = scheduler_status_updates.clone(); + let scheduler_control = scheduler_control_updates.clone(); async move { state.snapshot().and_then(|v| { if let Ok(Some(rig_id)) = @@ -318,6 +306,13 @@ pub async fn events( v.status.freq.hz, &format!("{:?}", v.status.mode), ); + sync_scheduler_vchannels( + vchan.as_ref(), + bookmark_store.as_ref(), + &scheduler_status, + scheduler_control.as_ref(), + &rig_id, + ); } serde_json::to_string(&v).ok().map(|json| { let json = inject_frontend_meta( @@ -382,6 +377,43 @@ pub async fn events( .streaming(stream)) } +fn sync_scheduler_vchannels( + vchan_mgr: &ClientChannelManager, + bookmark_store: &crate::server::bookmarks::BookmarkStore, + scheduler_status: &crate::server::scheduler::SchedulerStatusMap, + scheduler_control: &crate::server::scheduler::SchedulerControlManager, + rig_id: &str, +) { + if !scheduler_control.scheduler_allowed() { + vchan_mgr.sync_scheduler_channels(rig_id, &[]); + return; + } + + let desired = { + let map = scheduler_status.read().unwrap_or_else(|e| e.into_inner()); + map.get(rig_id) + .filter(|status| status.active) + .map(|status| { + status + .last_bookmark_ids + .iter() + .filter_map(|bookmark_id| { + bookmark_store.get(bookmark_id).map(|bookmark| { + ( + bookmark_id.clone(), + bookmark.freq_hz, + bookmark.mode, + bookmark.bandwidth_hz.unwrap_or(0) as u32, + ) + }) + }) + .collect::>() + }) + .unwrap_or_default() + }; + vchan_mgr.sync_scheduler_channels(rig_id, &desired); +} + /// Build the combined decode history vector from all per-decoder ring-buffers. fn collect_decode_history(context: &FrontendRuntimeContext) -> Vec { let mut out = Vec::new();