[fix](trx-frontend-http): stop per-rig audio fallback to global broadcast
When ?rig_id= is specified on /audio, don't fall back to the global broadcast (which carries whichever rig is connected). Return 404 for rigs without an active audio connection instead of silently delivering the wrong rig's audio. Also create per-rig audio channels for all known rigs eagerly so connected rigs are instantly subscribable. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -570,22 +570,26 @@ pub async fn run_multi_rig_audio_manager(
|
|||||||
//
|
//
|
||||||
// On each audio connection, register the connected rig's per-rig channels
|
// On each audio connection, register the connected rig's per-rig channels
|
||||||
// so per-rig /audio?rig_id= subscribers get data.
|
// so per-rig /audio?rig_id= subscribers get data.
|
||||||
let selected_clone = selected_rig_id.clone();
|
|
||||||
let rig_audio_rx_clone = rig_audio_rx.clone();
|
let rig_audio_rx_clone = rig_audio_rx.clone();
|
||||||
let rig_audio_info_clone = rig_audio_info.clone();
|
let rig_audio_info_clone = rig_audio_info.clone();
|
||||||
|
|
||||||
// Spawn a task that keeps per-rig maps in sync with the selected rig.
|
// Spawn a task that keeps per-rig audio/info maps populated for ALL
|
||||||
|
// known rigs (not just the selected one). Non-connected rigs get valid
|
||||||
|
// but silent channels so `/audio?rig_id=X` can subscribe instantly
|
||||||
|
// instead of timing out.
|
||||||
|
let known_rigs_for_sync = known_rigs.clone();
|
||||||
let mut sync_shutdown = shutdown_rx.clone();
|
let mut sync_shutdown = shutdown_rx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut last_rig: Option<String> = None;
|
|
||||||
let mut interval = time::interval(Duration::from_millis(500));
|
let mut interval = time::interval(Duration::from_millis(500));
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = interval.tick() => {
|
_ = interval.tick() => {
|
||||||
let current = selected_clone.lock().ok().and_then(|v| v.clone());
|
let rig_ids: Vec<String> = known_rigs_for_sync
|
||||||
if current != last_rig {
|
.lock()
|
||||||
// Ensure per-rig broadcast exists for new rig.
|
.ok()
|
||||||
if let Some(ref rig_id) = current {
|
.map(|entries| entries.iter().map(|e| e.rig_id.clone()).collect())
|
||||||
|
.unwrap_or_default();
|
||||||
|
for rig_id in &rig_ids {
|
||||||
if let Ok(mut map) = rig_audio_rx_clone.write() {
|
if let Ok(mut map) = rig_audio_rx_clone.write() {
|
||||||
map.entry(rig_id.clone())
|
map.entry(rig_id.clone())
|
||||||
.or_insert_with(|| broadcast::channel::<Bytes>(256).0);
|
.or_insert_with(|| broadcast::channel::<Bytes>(256).0);
|
||||||
@@ -595,10 +599,6 @@ pub async fn run_multi_rig_audio_manager(
|
|||||||
.or_insert_with(|| watch::channel(None).0);
|
.or_insert_with(|| watch::channel(None).0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
last_rig = current;
|
|
||||||
}
|
|
||||||
// Mirror global audio data to the current rig's per-rig channel.
|
|
||||||
// (The actual mirroring happens in the RX read task below.)
|
|
||||||
}
|
}
|
||||||
changed = sync_shutdown.changed() => {
|
changed = sync_shutdown.changed() => {
|
||||||
if matches!(changed, Ok(()) | Err(_)) && *sync_shutdown.borrow() {
|
if matches!(changed, Ok(()) | Err(_)) && *sync_shutdown.borrow() {
|
||||||
|
|||||||
@@ -519,15 +519,20 @@ pub async fn audio_ws(
|
|||||||
}
|
}
|
||||||
} else if let Some(ref rig_id) = query.rig_id {
|
} else if let Some(ref rig_id) = query.rig_id {
|
||||||
// Per-rig audio: subscribe to the specific rig's broadcast.
|
// Per-rig audio: subscribe to the specific rig's broadcast.
|
||||||
match context.rig_audio_subscribe(rig_id) {
|
// Do NOT fall back to global — that would silently deliver the wrong
|
||||||
Some(rx) => rx,
|
// rig's audio. Wait briefly for the per-rig channel to appear (it is
|
||||||
None => {
|
// lazily created by the audio relay sync task every 500ms).
|
||||||
// Rig not yet connected; fall back to global.
|
let deadline = Instant::now() + Duration::from_secs(3);
|
||||||
let Some(rx) = context.audio_rx.as_ref() else {
|
loop {
|
||||||
return Ok(HttpResponse::NotFound().body("audio not enabled"));
|
if let Some(rx) = context.rig_audio_subscribe(rig_id) {
|
||||||
};
|
break rx;
|
||||||
rx.subscribe()
|
|
||||||
}
|
}
|
||||||
|
if Instant::now() >= deadline {
|
||||||
|
return Ok(
|
||||||
|
HttpResponse::NotFound().body(format!("audio not available for rig {rig_id}"))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let Some(rx) = context.audio_rx.as_ref() else {
|
let Some(rx) = context.audio_rx.as_ref() else {
|
||||||
|
|||||||
Reference in New Issue
Block a user