[fix](trx-frontend-http): filter SSE channel events by rig and fix audio hang
Channel SSE events were broadcast to all tabs regardless of rig, causing tabs to display wrong rig's channels. Per-rig audio info_rx override caused WebSocket to hang waiting for stream info that never arrives. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -473,29 +473,41 @@ pub async fn events(
|
|||||||
});
|
});
|
||||||
|
|
||||||
// Channel-list change events from the virtual channel manager.
|
// Channel-list change events from the virtual channel manager.
|
||||||
|
// Only forward events for this SSE session's rig so tabs viewing
|
||||||
|
// different rigs don't see each other's channel lists.
|
||||||
let vchan_change_rx = vchan_mgr.change_tx.subscribe();
|
let vchan_change_rx = vchan_mgr.change_tx.subscribe();
|
||||||
let chan_updates = futures_util::stream::unfold(vchan_change_rx, |mut rx| async move {
|
let session_rig_for_chan = active_rig_id.clone();
|
||||||
loop {
|
let chan_updates = futures_util::stream::unfold(
|
||||||
match rx.recv().await {
|
(vchan_change_rx, session_rig_for_chan),
|
||||||
Ok(msg) => {
|
|(mut rx, srig)| async move {
|
||||||
if let Some(colon) = msg.find(':') {
|
loop {
|
||||||
let rig_id = &msg[..colon];
|
match rx.recv().await {
|
||||||
let channels_json = &msg[colon + 1..];
|
Ok(msg) => {
|
||||||
let payload =
|
if let Some(colon) = msg.find(':') {
|
||||||
format!("{{\"rig_id\":\"{rig_id}\",\"channels\":{channels_json}}}");
|
let rig_id = &msg[..colon];
|
||||||
return Some((
|
// Skip channel events that belong to a different rig.
|
||||||
Ok::<Bytes, Error>(Bytes::from(format!(
|
if let Some(ref expected) = srig {
|
||||||
"event: channels\ndata: {payload}\n\n"
|
if rig_id != expected.as_str() {
|
||||||
))),
|
continue;
|
||||||
rx,
|
}
|
||||||
));
|
}
|
||||||
|
let channels_json = &msg[colon + 1..];
|
||||||
|
let payload =
|
||||||
|
format!("{{\"rig_id\":\"{rig_id}\",\"channels\":{channels_json}}}");
|
||||||
|
return Some((
|
||||||
|
Ok::<Bytes, Error>(Bytes::from(format!(
|
||||||
|
"event: channels\ndata: {payload}\n\n"
|
||||||
|
))),
|
||||||
|
(rx, srig),
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Err(broadcast::error::RecvError::Lagged(_)) => continue,
|
||||||
|
Err(broadcast::error::RecvError::Closed) => return None,
|
||||||
}
|
}
|
||||||
Err(broadcast::error::RecvError::Lagged(_)) => continue,
|
|
||||||
Err(broadcast::error::RecvError::Closed) => return None,
|
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
});
|
);
|
||||||
|
|
||||||
// Send a named "ping" event so the JS heartbeat can observe it.
|
// Send a named "ping" event so the JS heartbeat can observe it.
|
||||||
let pings = IntervalStream::new(time::interval(Duration::from_secs(5)))
|
let pings = IntervalStream::new(time::interval(Duration::from_secs(5)))
|
||||||
|
|||||||
@@ -537,13 +537,6 @@ pub async fn audio_ws(
|
|||||||
};
|
};
|
||||||
let mut rx_sub = rx_sub;
|
let mut rx_sub = rx_sub;
|
||||||
|
|
||||||
// Use per-rig audio info if available and rig_id was specified.
|
|
||||||
if let Some(ref rig_id) = query.rig_id {
|
|
||||||
if let Some(rig_info_rx) = context.rig_audio_info_rx(rig_id) {
|
|
||||||
info_rx = rig_info_rx;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
|
let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
|
||||||
|
|
||||||
actix_web::rt::spawn(async move {
|
actix_web::rt::spawn(async move {
|
||||||
|
|||||||
Reference in New Issue
Block a user