[fix](trx-frontend-http): sync scheduler channels live

Keep scheduler-managed virtual channels reconciled while\nclients remain connected, instead of only materializing\nthem during the initial connect path.\n\nVerification: cargo test -p trx-frontend-http vchan\n\nCo-authored-by: OpenAI Codex <codex@openai.com>

Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-14 13:33:57 +01:00
parent 9e8003afb6
commit 8603cee7be
@@ -250,31 +250,13 @@ pub async fn events(
initial.status.freq.hz, initial.status.freq.hz,
&format!("{:?}", initial.status.mode), &format!("{:?}", initial.status.mode),
); );
if scheduler_control.scheduler_allowed() { sync_scheduler_vchannels(
let desired = { vchan_mgr.get_ref().as_ref(),
let map = scheduler_status.read().unwrap_or_else(|e| e.into_inner()); bookmark_store.get_ref().as_ref(),
map.get(rid) scheduler_status.get_ref(),
.filter(|status| status.active) scheduler_control.get_ref().as_ref(),
.map(|status| { rid,
status );
.last_bookmark_ids
.iter()
.filter_map(|bookmark_id| {
bookmark_store.get(bookmark_id).map(|bookmark| {
(
bookmark_id.clone(),
bookmark.freq_hz,
bookmark.mode,
bookmark.bandwidth_hz.unwrap_or(0) as u32,
)
})
})
.collect::<Vec<_>>()
})
.unwrap_or_default()
};
vchan_mgr.sync_scheduler_channels(rid, &desired);
}
} }
// Build the prefix burst: rig state → session UUID → initial channels. // Build the prefix burst: rig state → session UUID → initial channels.
@@ -304,10 +286,16 @@ pub async fn events(
let counter_updates = counter.clone(); let counter_updates = counter.clone();
let context_updates = context.get_ref().clone(); let context_updates = context.get_ref().clone();
let vchan_updates = vchan_mgr.get_ref().clone(); let vchan_updates = vchan_mgr.get_ref().clone();
let bookmark_store_updates = bookmark_store.get_ref().clone();
let scheduler_status_updates = scheduler_status.get_ref().clone();
let scheduler_control_updates = scheduler_control.get_ref().clone();
let updates = WatchStream::new(rx).filter_map(move |state| { let updates = WatchStream::new(rx).filter_map(move |state| {
let counter = counter_updates.clone(); let counter = counter_updates.clone();
let context = context_updates.clone(); let context = context_updates.clone();
let vchan = vchan_updates.clone(); let vchan = vchan_updates.clone();
let bookmark_store = bookmark_store_updates.clone();
let scheduler_status = scheduler_status_updates.clone();
let scheduler_control = scheduler_control_updates.clone();
async move { async move {
state.snapshot().and_then(|v| { state.snapshot().and_then(|v| {
if let Ok(Some(rig_id)) = if let Ok(Some(rig_id)) =
@@ -318,6 +306,13 @@ pub async fn events(
v.status.freq.hz, v.status.freq.hz,
&format!("{:?}", v.status.mode), &format!("{:?}", v.status.mode),
); );
sync_scheduler_vchannels(
vchan.as_ref(),
bookmark_store.as_ref(),
&scheduler_status,
scheduler_control.as_ref(),
&rig_id,
);
} }
serde_json::to_string(&v).ok().map(|json| { serde_json::to_string(&v).ok().map(|json| {
let json = inject_frontend_meta( let json = inject_frontend_meta(
@@ -382,6 +377,43 @@ pub async fn events(
.streaming(stream)) .streaming(stream))
} }
fn sync_scheduler_vchannels(
vchan_mgr: &ClientChannelManager,
bookmark_store: &crate::server::bookmarks::BookmarkStore,
scheduler_status: &crate::server::scheduler::SchedulerStatusMap,
scheduler_control: &crate::server::scheduler::SchedulerControlManager,
rig_id: &str,
) {
if !scheduler_control.scheduler_allowed() {
vchan_mgr.sync_scheduler_channels(rig_id, &[]);
return;
}
let desired = {
let map = scheduler_status.read().unwrap_or_else(|e| e.into_inner());
map.get(rig_id)
.filter(|status| status.active)
.map(|status| {
status
.last_bookmark_ids
.iter()
.filter_map(|bookmark_id| {
bookmark_store.get(bookmark_id).map(|bookmark| {
(
bookmark_id.clone(),
bookmark.freq_hz,
bookmark.mode,
bookmark.bandwidth_hz.unwrap_or(0) as u32,
)
})
})
.collect::<Vec<_>>()
})
.unwrap_or_default()
};
vchan_mgr.sync_scheduler_channels(rig_id, &desired);
}
/// Build the combined decode history vector from all per-decoder ring-buffers. /// Build the combined decode history vector from all per-decoder ring-buffers.
fn collect_decode_history(context: &FrontendRuntimeContext) -> Vec<trx_core::decode::DecodedMessage> { fn collect_decode_history(context: &FrontendRuntimeContext) -> Vec<trx_core::decode::DecodedMessage> {
let mut out = Vec::new(); let mut out = Vec::new();