diff --git a/src/trx-client/src/audio_client.rs b/src/trx-client/src/audio_client.rs index 5289943..e3b3297 100644 --- a/src/trx-client/src/audio_client.rs +++ b/src/trx-client/src/audio_client.rs @@ -43,17 +43,177 @@ struct ActiveVChanSub { decoder_kinds: Vec, } -/// Run the audio client with auto-reconnect. +/// Per-rig audio task state, tracked by the multi-rig manager. +struct PerRigAudioTask { + handle: tokio::task::JoinHandle<()>, + shutdown_tx: watch::Sender, + port: u16, +} + +/// Multi-rig audio manager: spawns/tears down per-rig audio client tasks on +/// demand as rigs appear/disappear from the known_rigs list. Each rig with +/// an `audio_port` gets its own TCP connection. #[allow(clippy::too_many_arguments)] -pub async fn run_audio_client( +pub async fn run_multi_rig_audio_manager( server_host: String, default_port: u16, rig_ports: HashMap, selected_rig_id: Arc>>, known_rigs: Arc>>, - rx_tx: broadcast::Sender, - mut tx_rx: mpsc::Receiver, - stream_info_tx: watch::Sender>, + global_rx_tx: broadcast::Sender, + tx_rx: mpsc::Receiver, + global_stream_info_tx: watch::Sender>, + decode_tx: broadcast::Sender, + replay_history_sink: Option>, + mut shutdown_rx: watch::Receiver, + vchan_audio: Arc>>>, + _vchan_cmd_rx: mpsc::UnboundedReceiver, + vchan_destroyed_tx: Option>, + rig_audio_rx: Arc>>>, + rig_audio_info: Arc>>>>, + rig_vchan_audio_cmd: Arc>>>, +) { + // TX frames from the microphone go to the selected rig only. + // We wrap the single tx_rx receiver so the per-rig task for the selected + // rig can consume it. + let tx_rx = Arc::new(tokio::sync::Mutex::new(tx_rx)); + + let mut active_tasks: HashMap = HashMap::new(); + let mut poll_interval = time::interval(Duration::from_millis(500)); + + loop { + tokio::select! { + _ = poll_interval.tick() => { + // Collect current known rigs and their audio ports. + let current_rigs: HashMap = known_rigs + .lock() + .ok() + .map(|entries| { + entries.iter().map(|e| { + let port = rig_ports.get(&e.rig_id).copied() + .or(e.audio_port) + .unwrap_or(default_port); + (e.rig_id.clone(), port) + }).collect() + }) + .unwrap_or_default(); + + // Tear down tasks for rigs that are no longer present or + // whose port has changed. + let to_remove: Vec = active_tasks.keys() + .filter(|id| { + match current_rigs.get(*id) { + None => true, + Some(port) => active_tasks.get(*id) + .is_none_or(|t| t.port != *port), + } + }) + .cloned() + .collect(); + for rig_id in &to_remove { + if let Some(task) = active_tasks.remove(rig_id) { + let _ = task.shutdown_tx.send(true); + task.handle.abort(); + info!("Audio client: stopped task for rig {}", rig_id); + } + } + + // Spawn tasks for new rigs. + for (rig_id, port) in ¤t_rigs { + if active_tasks.contains_key(rig_id) { + continue; + } + + let (per_rig_shutdown_tx, per_rig_shutdown_rx) = watch::channel(false); + + // Ensure per-rig broadcast and info channels exist. + let per_rig_rx_tx = { + let mut map = rig_audio_rx.write().unwrap(); + map.entry(rig_id.clone()) + .or_insert_with(|| broadcast::channel::(256).0) + .clone() + }; + let per_rig_info_tx = { + let mut map = rig_audio_info.write().unwrap(); + map.entry(rig_id.clone()) + .or_insert_with(|| watch::channel(None).0) + .clone() + }; + + // Create per-rig vchan cmd channel. + let (per_rig_vchan_tx, per_rig_vchan_rx) = + mpsc::unbounded_channel::(); + if let Ok(mut map) = rig_vchan_audio_cmd.write() { + map.insert(rig_id.clone(), per_rig_vchan_tx); + } + + let addr = format!("{}:{}", server_host, port); + let rig_id_clone = rig_id.clone(); + let global_rx_tx_clone = global_rx_tx.clone(); + let global_info_tx_clone = global_stream_info_tx.clone(); + let selected_clone = selected_rig_id.clone(); + let decode_tx_clone = decode_tx.clone(); + let replay_sink = replay_history_sink.clone(); + let vchan_audio_clone = vchan_audio.clone(); + let vchan_destroyed_clone = vchan_destroyed_tx.clone(); + let tx_rx_clone = tx_rx.clone(); + + let handle = tokio::spawn(async move { + run_single_rig_audio_client( + addr, + rig_id_clone, + selected_clone, + per_rig_rx_tx, + per_rig_info_tx, + global_rx_tx_clone, + global_info_tx_clone, + tx_rx_clone, + decode_tx_clone, + replay_sink, + per_rig_shutdown_rx, + vchan_audio_clone, + per_rig_vchan_rx, + vchan_destroyed_clone, + ) + .await; + }); + + info!("Audio client: started task for rig {} ({}:{})", rig_id, server_host, port); + active_tasks.insert(rig_id.clone(), PerRigAudioTask { + handle, + shutdown_tx: per_rig_shutdown_tx, + port: *port, + }); + } + } + changed = shutdown_rx.changed() => { + if matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow() { + // Shut down all per-rig tasks. + for (rig_id, task) in active_tasks.drain() { + let _ = task.shutdown_tx.send(true); + task.handle.abort(); + info!("Audio client: shutdown task for rig {}", rig_id); + } + return; + } + } + } + } +} + +/// Audio client for a single rig. Maintains its own TCP connection with +/// auto-reconnect, publishes RX frames to both per-rig and (if selected) +/// global broadcast channels. +#[allow(clippy::too_many_arguments)] +async fn run_single_rig_audio_client( + server_addr: String, + rig_id: String, + selected_rig_id: Arc>>, + per_rig_rx_tx: broadcast::Sender, + per_rig_info_tx: watch::Sender>, + global_rx_tx: broadcast::Sender, + global_info_tx: watch::Sender>, + tx_rx: Arc>>, decode_tx: broadcast::Sender, replay_history_sink: Option>, mut shutdown_rx: watch::Receiver, @@ -62,42 +222,34 @@ pub async fn run_audio_client( vchan_destroyed_tx: Option>, ) { let mut reconnect_delay = Duration::from_secs(1); - // Active virtual-channel subscriptions, keyed by UUID, re-sent to the - // server on every audio TCP reconnect. let mut active_subs: HashMap = HashMap::new(); + let is_selected = |sel: &Arc>>, rid: &str| -> bool { + sel.lock() + .ok() + .and_then(|v| v.clone()) + .is_some_and(|s| s == rid) + }; + loop { if *shutdown_rx.borrow() { - info!("Audio client shutting down"); + info!("Audio client [{}]: shutting down", rig_id); return; } - let server_addr = resolve_audio_addr( - &server_host, - default_port, - &rig_ports, - &known_rigs, - selected_rig_id - .lock() - .ok() - .and_then(|v| v.clone()) - .as_deref(), - ); - info!("Audio client: connecting to {}", server_addr); + info!("Audio client [{}]: connecting to {}", rig_id, server_addr); match TcpStream::connect(&server_addr).await { Ok(stream) => { reconnect_delay = Duration::from_secs(1); - if let Err(e) = handle_audio_connection( + if let Err(e) = handle_single_rig_connection( stream, - &server_host, - default_port, - &rig_ports, + &rig_id, &selected_rig_id, - &known_rigs, - &server_addr, - &rx_tx, - &mut tx_rx, - &stream_info_tx, + &per_rig_rx_tx, + &per_rig_info_tx, + &global_rx_tx, + &global_info_tx, + &tx_rx, &decode_tx, replay_history_sink.clone(), &mut shutdown_rx, @@ -108,21 +260,25 @@ pub async fn run_audio_client( ) .await { - warn!("Audio connection dropped: {}", e); + warn!("Audio connection [{}] dropped: {}", rig_id, e); } } Err(e) => { - warn!("Audio connect failed: {}", e); + warn!("Audio connect [{}] failed: {}", rig_id, e); } } - let _ = stream_info_tx.send(None); + let _ = per_rig_info_tx.send(None); + if is_selected(&selected_rig_id, &rig_id) { + let _ = global_info_tx.send(None); + } + tokio::select! { _ = time::sleep(reconnect_delay) => {} changed = shutdown_rx.changed() => { match changed { Ok(()) if *shutdown_rx.borrow() => { - info!("Audio client shutting down"); + info!("Audio client [{}]: shutting down", rig_id); return; } Ok(()) => {} @@ -134,18 +290,18 @@ pub async fn run_audio_client( } } +/// Handle a single TCP connection for one rig. Similar to `handle_audio_connection` +/// but publishes to per-rig channels directly and mirrors to global when selected. #[allow(clippy::too_many_arguments)] -async fn handle_audio_connection( +async fn handle_single_rig_connection( stream: TcpStream, - server_host: &str, - default_port: u16, - rig_ports: &HashMap, + rig_id: &str, selected_rig_id: &Arc>>, - known_rigs: &Arc>>, - connected_addr: &str, - rx_tx: &broadcast::Sender, - tx_rx: &mut mpsc::Receiver, - stream_info_tx: &watch::Sender>, + per_rig_rx_tx: &broadcast::Sender, + per_rig_info_tx: &watch::Sender>, + global_rx_tx: &broadcast::Sender, + global_info_tx: &watch::Sender>, + tx_rx: &Arc>>, decode_tx: &broadcast::Sender, replay_history_sink: Option>, shutdown_rx: &mut watch::Receiver, @@ -169,14 +325,22 @@ async fn handle_audio_connection( let info: AudioStreamInfo = serde_json::from_slice(&payload) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; info!( - "Audio stream info: {}Hz, {} ch, {}ms", - info.sample_rate, info.channels, info.frame_duration_ms + "Audio stream info [{}]: {}Hz, {} ch, {}ms", + rig_id, info.sample_rate, info.channels, info.frame_duration_ms ); - let _ = stream_info_tx.send(Some(info)); + let _ = per_rig_info_tx.send(Some(info.clone())); - // On reconnect: re-subscribe all previously active virtual channels. - // Track which UUIDs were pre-sent so we don't duplicate them when the - // same Subscribe command arrives from the mpsc queue. + // Mirror to global if this is the selected rig. + let is_selected = selected_rig_id + .lock() + .ok() + .and_then(|v| v.clone()) + .is_some_and(|s| s == rig_id); + if is_selected { + let _ = global_info_tx.send(Some(info)); + } + + // Re-subscribe active virtual channels on reconnect. let mut resubscribed: HashSet = HashSet::new(); for (&uuid, sub) in active_subs.iter() { let json = serde_json::json!({ @@ -189,17 +353,16 @@ async fn handle_audio_connection( }); if let Ok(payload) = serde_json::to_vec(&json) { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await { - warn!("Audio vchan reconnect SUB write failed: {}", e); + warn!("Audio vchan reconnect SUB write failed [{}]: {}", rig_id, e); return Err(e); } } - // Re-apply non-default bandwidth after re-subscribing. if sub.bandwidth_hz > 0 { let bw_json = serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": sub.bandwidth_hz }); if let Ok(payload) = serde_json::to_vec(&bw_json) { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_BW, &payload).await { - warn!("Audio vchan reconnect BW write failed: {}", e); + warn!("Audio vchan reconnect BW write failed [{}]: {}", rig_id, e); return Err(e); } } @@ -207,9 +370,12 @@ async fn handle_audio_connection( resubscribed.insert(uuid); } - // Spawn RX read task - let rx_tx = rx_tx.clone(); - let decode_tx = decode_tx.clone(); + // Spawn RX read task — publishes to per-rig and (when selected) global. + let per_rig_rx_clone = per_rig_rx_tx.clone(); + let global_rx_clone = global_rx_tx.clone(); + let selected_for_rx = selected_rig_id.clone(); + let rig_id_for_rx = rig_id.to_string(); + let decode_tx_clone = decode_tx.clone(); let vchan_audio_rx: Arc>>> = Arc::clone(vchan_audio); let vchan_destroyed_for_rx = vchan_destroyed_tx.clone(); @@ -217,10 +383,20 @@ async fn handle_audio_connection( loop { match read_audio_msg(&mut reader).await { Ok((AUDIO_MSG_RX_FRAME, payload)) => { - let _ = rx_tx.send(Bytes::from(payload)); + let data = Bytes::from(payload); + // Always publish to per-rig channel. + let _ = per_rig_rx_clone.send(data.clone()); + // Mirror to global if this rig is currently selected. + let sel = selected_for_rx + .lock() + .ok() + .and_then(|v| v.clone()) + .is_some_and(|s| s == rig_id_for_rx); + if sel { + let _ = global_rx_clone.send(data); + } } Ok((AUDIO_MSG_RX_FRAME_CH, payload)) => { - // Route per-channel Opus frame to the correct broadcaster. if let Ok((uuid, opus)) = parse_vchan_audio_frame(&payload) { let pkt = Bytes::copy_from_slice(opus); if let Ok(map) = vchan_audio_rx.read() { @@ -231,8 +407,6 @@ async fn handle_audio_connection( } } Ok((AUDIO_MSG_VCHAN_ALLOCATED, payload)) => { - // Server confirmed a virtual channel is ready; ensure a - // broadcaster entry exists in the shared map. if let Ok(uuid) = parse_vchan_uuid_msg(&payload) { if let Ok(mut map) = vchan_audio_rx.write() { map.entry(uuid) @@ -242,19 +416,15 @@ async fn handle_audio_connection( } Ok((AUDIO_MSG_VCHAN_DESTROYED, payload)) => { if let Ok(uuid) = parse_vchan_uuid_msg(&payload) { - // Remove the broadcaster so audio_ws gets no more frames. if let Ok(mut map) = vchan_audio_rx.write() { map.remove(&uuid); } - // Notify the HTTP frontend so it removes the channel from - // ClientChannelManager (triggers SSE channels event). if let Some(ref tx) = vchan_destroyed_for_rx { let _ = tx.send(uuid); } } } Ok((AUDIO_MSG_HISTORY_COMPRESSED, payload)) => { - // Decompress gzip blob, then iterate the embedded framed messages. let mut decompressed = Vec::new(); if GzDecoder::new(payload.as_slice()) .read_to_end(&mut decompressed) @@ -296,20 +466,30 @@ async fn handle_audio_connection( payload, )) => { if let Ok(msg) = serde_json::from_slice::(&payload) { - let _ = decode_tx.send(msg); + let _ = decode_tx_clone.send(msg); } } Ok((msg_type, _)) => { - warn!("Audio client: unexpected message type {:#04x}", msg_type); + warn!( + "Audio client [{}]: unexpected message type {:#04x}", + rig_id_for_rx, msg_type + ); } Err(_) => break, } } }); - // Forward TX frames and VChanAudioCmds to server. - let mut rig_check = time::interval(Duration::from_millis(500)); + // Forward TX frames (only when we are the selected rig) and vchan commands. + let rig_id_owned = rig_id.to_string(); loop { + // Only the selected rig should consume TX frames from the mic. + let is_sel = selected_rig_id + .lock() + .ok() + .and_then(|v| v.clone()) + .is_some_and(|s| s == rig_id_owned); + tokio::select! { changed = shutdown_rx.changed() => { match changed { @@ -324,11 +504,18 @@ async fn handle_audio_connection( } } } - packet = tx_rx.recv() => { + packet = async { + if is_sel { + tx_rx.lock().await.recv().await + } else { + // Not selected — don't consume TX frames; pend forever. + std::future::pending().await + } + } => { match packet { Some(data) => { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_TX_FRAME, &data).await { - warn!("Audio TX write failed: {}", e); + warn!("Audio TX write failed [{}]: {}", rig_id_owned, e); break; } } @@ -345,9 +532,7 @@ async fn handle_audio_connection( hidden: false, decoder_kinds: decoder_kinds.clone(), }); - // Skip if already re-sent during reconnect initialization. if resubscribed.remove(&uuid) { - // Already sent above; don't duplicate. } else { let json = serde_json::json!({ "uuid": uuid.to_string(), @@ -359,7 +544,7 @@ async fn handle_audio_connection( }); if let Ok(payload) = serde_json::to_vec(&json) { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await { - warn!("Audio vchan SUB write failed: {}", e); + warn!("Audio vchan SUB write failed [{}]: {}", rig_id_owned, e); break; } } @@ -374,7 +559,6 @@ async fn handle_audio_connection( decoder_kinds: decoder_kinds.clone(), }); if resubscribed.remove(&uuid) { - // Already sent above; don't duplicate. } else { let json = serde_json::json!({ "uuid": uuid.to_string(), @@ -386,7 +570,7 @@ async fn handle_audio_connection( }); if let Ok(payload) = serde_json::to_vec(&json) { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await { - warn!("Audio background SUB write failed: {}", e); + warn!("Audio background SUB write failed [{}]: {}", rig_id_owned, e); break; } } @@ -394,16 +578,15 @@ async fn handle_audio_connection( } Some(VChanAudioCmd::Unsubscribe(uuid)) => { if let Err(e) = write_vchan_uuid_msg(&mut writer, AUDIO_MSG_VCHAN_UNSUB, uuid).await { - warn!("Audio vchan UNSUB write failed: {}", e); + warn!("Audio vchan UNSUB write failed [{}]: {}", rig_id_owned, e); break; } } Some(VChanAudioCmd::Remove(uuid)) => { if let Err(e) = write_vchan_uuid_msg(&mut writer, AUDIO_MSG_VCHAN_REMOVE, uuid).await { - warn!("Audio vchan REMOVE write failed: {}", e); + warn!("Audio vchan REMOVE write failed [{}]: {}", rig_id_owned, e); break; } - // Clean up local broadcaster. if let Ok(mut map) = vchan_audio.write() { map.remove(&uuid); } @@ -416,7 +599,7 @@ async fn handle_audio_connection( let json = serde_json::json!({ "uuid": uuid.to_string(), "freq_hz": freq_hz }); if let Ok(payload) = serde_json::to_vec(&json) { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_FREQ, &payload).await { - warn!("Audio vchan FREQ write failed: {}", e); + warn!("Audio vchan FREQ write failed [{}]: {}", rig_id_owned, e); break; } } @@ -428,20 +611,19 @@ async fn handle_audio_connection( let json = serde_json::json!({ "uuid": uuid.to_string(), "mode": mode }); if let Ok(payload) = serde_json::to_vec(&json) { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_MODE, &payload).await { - warn!("Audio vchan MODE write failed: {}", e); + warn!("Audio vchan MODE write failed [{}]: {}", rig_id_owned, e); break; } } } Some(VChanAudioCmd::SetBandwidth { uuid, bandwidth_hz }) => { - // Persist for reconnect. if let Some(entry) = active_subs.get_mut(&uuid) { entry.bandwidth_hz = bandwidth_hz; } let json = serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": bandwidth_hz }); if let Ok(payload) = serde_json::to_vec(&json) { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_BW, &payload).await { - warn!("Audio vchan BW write failed: {}", e); + warn!("Audio vchan BW write failed [{}]: {}", rig_id_owned, e); break; } } @@ -452,270 +634,8 @@ async fn handle_audio_connection( _ = &mut rx_handle => { break; } - _ = rig_check.tick() => { - let current_rig = selected_rig_id.lock().ok().and_then(|v| v.clone()); - let desired_addr = resolve_audio_addr( - server_host, - default_port, - rig_ports, - known_rigs, - current_rig.as_deref(), - ); - if desired_addr != connected_addr { - info!( - "Audio client: active rig changed ({} -> {}), reconnecting audio", - connected_addr, - desired_addr - ); - break; - } - } } } Ok(()) } - -/// Multi-rig audio manager: spawns/tears down per-rig audio client tasks on -/// demand as rigs appear/disappear from the known_rigs list. -#[allow(clippy::too_many_arguments)] -pub async fn run_multi_rig_audio_manager( - server_host: String, - default_port: u16, - rig_ports: HashMap, - selected_rig_id: Arc>>, - known_rigs: Arc>>, - global_rx_tx: broadcast::Sender, - tx_rx: mpsc::Receiver, - global_stream_info_tx: watch::Sender>, - decode_tx: broadcast::Sender, - replay_history_sink: Option>, - shutdown_rx: watch::Receiver, - vchan_audio: Arc>>>, - vchan_cmd_rx: mpsc::UnboundedReceiver, - vchan_destroyed_tx: Option>, - rig_audio_rx: Arc>>>, - rig_audio_info: Arc>>>>, - rig_vchan_audio_cmd: Arc>>>, -) { - // Per-rig vchan command routing: create per-rig senders that relay into the - // single global vchan_cmd channel. When the ClientChannelManager or - // BackgroundDecodeManager sends a command for a specific rig, it goes - // through the per-rig sender, which forwards to the global channel that - // the single-connection audio client reads from. - let (global_vchan_cmd_tx, vchan_cmd_rx) = { - // We take ownership of vchan_cmd_rx and create a global sender that - // per-rig relays will forward through. - let (tx, rx) = mpsc::unbounded_channel::(); - // Spawn relay from the original vchan_cmd_rx (from main.rs). - let mut orig_rx = vchan_cmd_rx; - let tx_for_orig = tx.clone(); - tokio::spawn(async move { - while let Some(cmd) = orig_rx.recv().await { - let _ = tx_for_orig.send(cmd); - } - }); - (tx, rx) - }; - - // Populate per-rig vchan senders for known rigs and keep them in sync. - let rig_vchan_for_sync = rig_vchan_audio_cmd.clone(); - let known_rigs_for_vchan = known_rigs.clone(); - let global_vchan_for_sync = global_vchan_cmd_tx.clone(); - let mut vchan_sync_shutdown = shutdown_rx.clone(); - tokio::spawn(async move { - let mut interval = time::interval(Duration::from_millis(500)); - loop { - tokio::select! { - _ = interval.tick() => { - let rig_ids: Vec = known_rigs_for_vchan - .lock() - .ok() - .map(|entries| entries.iter().map(|e| e.rig_id.clone()).collect()) - .unwrap_or_default(); - if let Ok(mut map) = rig_vchan_for_sync.write() { - for rig_id in &rig_ids { - if !map.contains_key(rig_id) { - // Create a per-rig sender that relays to global. - let (per_rig_tx, mut per_rig_rx) = - mpsc::unbounded_channel::(); - let global_tx = global_vchan_for_sync.clone(); - tokio::spawn(async move { - while let Some(cmd) = per_rig_rx.recv().await { - let _ = global_tx.send(cmd); - } - }); - map.insert(rig_id.clone(), per_rig_tx); - } - } - // Remove senders for rigs no longer present. - let active: std::collections::HashSet<&str> = - rig_ids.iter().map(|s| s.as_str()).collect(); - map.retain(|id, _| active.contains(id.as_str())); - } - } - changed = vchan_sync_shutdown.changed() => { - if matches!(changed, Ok(()) | Err(_)) && *vchan_sync_shutdown.borrow() { - break; - } - } - } - } - }); - - // For now, delegate to the existing single-connection audio client. - // The per-rig channels are populated based on the rig that the single - // client connects to (the selected rig), providing per-rig subscriptions - // without the complexity of multiple TCP connections in the initial impl. - // - // On each audio connection, register the connected rig's per-rig channels - // so per-rig /audio?rig_id= subscribers get data. - let rig_audio_rx_clone = rig_audio_rx.clone(); - let rig_audio_info_clone = rig_audio_info.clone(); - - // Spawn a task that keeps per-rig audio/info maps populated for ALL - // known rigs (not just the selected one). Non-connected rigs get valid - // but silent channels so `/audio?rig_id=X` can subscribe instantly - // instead of timing out. - let known_rigs_for_sync = known_rigs.clone(); - let mut sync_shutdown = shutdown_rx.clone(); - tokio::spawn(async move { - let mut interval = time::interval(Duration::from_millis(500)); - loop { - tokio::select! { - _ = interval.tick() => { - let rig_ids: Vec = known_rigs_for_sync - .lock() - .ok() - .map(|entries| entries.iter().map(|e| e.rig_id.clone()).collect()) - .unwrap_or_default(); - for rig_id in &rig_ids { - if let Ok(mut map) = rig_audio_rx_clone.write() { - map.entry(rig_id.clone()) - .or_insert_with(|| broadcast::channel::(256).0); - } - if let Ok(mut map) = rig_audio_info_clone.write() { - map.entry(rig_id.clone()) - .or_insert_with(|| watch::channel(None).0); - } - } - } - changed = sync_shutdown.changed() => { - if matches!(changed, Ok(()) | Err(_)) && *sync_shutdown.borrow() { - break; - } - } - } - } - }); - - // Wrap the global_rx_tx in a relay that also publishes to per-rig channels. - let (relay_rx_tx, _) = broadcast::channel::(256); - let relay_clone = relay_rx_tx.clone(); - let rig_audio_rx_for_relay = rig_audio_rx.clone(); - let selected_for_relay = selected_rig_id.clone(); - let mut relay_sub = global_rx_tx.subscribe(); - let mut relay_shutdown = shutdown_rx.clone(); - tokio::spawn(async move { - loop { - tokio::select! { - result = relay_sub.recv() => { - match result { - Ok(data) => { - // Forward to per-rig channel for the selected rig. - if let Some(rig_id) = selected_for_relay.lock().ok().and_then(|v| v.clone()) { - if let Ok(map) = rig_audio_rx_for_relay.read() { - if let Some(tx) = map.get(&rig_id) { - let _ = tx.send(data.clone()); - } - } - } - } - Err(broadcast::error::RecvError::Lagged(_)) => continue, - Err(broadcast::error::RecvError::Closed) => break, - } - } - changed = relay_shutdown.changed() => { - if matches!(changed, Ok(()) | Err(_)) && *relay_shutdown.borrow() { - break; - } - } - } - } - }); - - // Also relay stream info changes to per-rig info channels. - let mut info_relay_rx = global_stream_info_tx.subscribe(); - let rig_audio_info_for_relay = rig_audio_info.clone(); - let selected_for_info_relay = selected_rig_id.clone(); - let mut info_relay_shutdown = shutdown_rx.clone(); - tokio::spawn(async move { - loop { - tokio::select! { - changed = info_relay_rx.changed() => { - match changed { - Ok(()) => { - let info = info_relay_rx.borrow().clone(); - if let Some(rig_id) = selected_for_info_relay.lock().ok().and_then(|v| v.clone()) { - if let Ok(map) = rig_audio_info_for_relay.read() { - if let Some(tx) = map.get(&rig_id) { - let _ = tx.send(info); - } - } - } - } - Err(_) => break, - } - } - changed = info_relay_shutdown.changed() => { - if matches!(changed, Ok(()) | Err(_)) && *info_relay_shutdown.borrow() { - break; - } - } - } - } - }); - - let _ = relay_clone; - - // Delegate to existing single-connection audio client. - run_audio_client( - server_host, - default_port, - rig_ports, - selected_rig_id, - known_rigs, - global_rx_tx, - tx_rx, - global_stream_info_tx, - decode_tx, - replay_history_sink, - shutdown_rx, - vchan_audio, - vchan_cmd_rx, - vchan_destroyed_tx, - ) - .await; -} - -fn resolve_audio_addr( - host: &str, - default_port: u16, - rig_ports: &HashMap, - known_rigs: &Arc>>, - selected_rig_id: Option<&str>, -) -> String { - let port = selected_rig_id - .and_then(|rig_id| { - rig_ports.get(rig_id).copied().or_else(|| { - known_rigs.lock().ok().and_then(|entries| { - entries - .iter() - .find(|entry| entry.rig_id == rig_id) - .and_then(|entry| entry.audio_port) - }) - }) - }) - .unwrap_or(default_port); - format!("{}:{}", host, port) -} diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/bookmarks.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/bookmarks.js index cea5b68..e5f3d4b 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/bookmarks.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/bookmarks.js @@ -324,62 +324,67 @@ async function bmDelete(id) { async function bmApply(bm) { try { - const onVirtual = typeof vchanInterceptMode === "function" - && await vchanInterceptMode(bm.mode); - if (!onVirtual) { - await postPath("/set_mode?mode=" + encodeURIComponent(bm.mode)); - } + // --- Optimistic UI updates (instant, before any network round-trips) --- if (typeof modeEl !== "undefined" && modeEl) { modeEl.value = String(bm.mode || "").toUpperCase(); } if (bm.bandwidth_hz) { + if (typeof currentBandwidthHz !== "undefined") { + currentBandwidthHz = bm.bandwidth_hz; + } + window.currentBandwidthHz = bm.bandwidth_hz; + if (typeof syncBandwidthInput === "function") { + syncBandwidthInput(bm.bandwidth_hz); + } + } + if (typeof applyLocalTunedFrequency === "function") { + applyLocalTunedFrequency(bm.freq_hz); + } + if (typeof scheduleSpectrumDraw === "function" && typeof lastSpectrumData !== "undefined" && lastSpectrumData) { + scheduleSpectrumDraw(); + } + + // --- Send mode, bandwidth, and frequency to server in parallel --- + const modePromise = (async () => { + const onVirtual = typeof vchanInterceptMode === "function" + && await vchanInterceptMode(bm.mode); + if (!onVirtual) { + await postPath("/set_mode?mode=" + encodeURIComponent(bm.mode)); + } + })(); + const bwPromise = bm.bandwidth_hz ? (async () => { const bwHandledByVchan = typeof vchanInterceptBandwidth === "function" && await vchanInterceptBandwidth(bm.bandwidth_hz); if (!bwHandledByVchan) { await postPath("/set_bandwidth?hz=" + bm.bandwidth_hz); } - if (typeof currentBandwidthHz !== "undefined") { - currentBandwidthHz = bm.bandwidth_hz; - } - if (typeof window !== "undefined") { - window.currentBandwidthHz = bm.bandwidth_hz; - } - if (typeof syncBandwidthInput === "function") { - syncBandwidthInput(bm.bandwidth_hz); - } - } + })() : Promise.resolve(); // setRigFrequency is wrapped by vchan.js to redirect to the channel API // when on a virtual channel, so this call works correctly in both cases. - if (typeof setRigFrequency === "function") { - await setRigFrequency(bm.freq_hz); - } else { - await postPath("/set_freq?hz=" + bm.freq_hz); - } - // Toggle decoders when in DIG mode. + // It also does its own optimistic update (applyLocalTunedFrequency) but + // that's a no-op since we already set the same value above. + const freqPromise = (async () => { + if (typeof setRigFrequency === "function") { + await setRigFrequency(bm.freq_hz); + } else { + await postPath("/set_freq?hz=" + bm.freq_hz); + } + })(); + await Promise.all([modePromise, bwPromise, freqPromise]); + + // --- Toggle decoders when in DIG mode --- if (bm.mode === "DIG" && Array.isArray(bm.decoders)) { const statusResp = await fetch("/status"); if (statusResp.ok) { const st = await statusResp.json(); - const wantFt8 = bm.decoders.includes("ft8"); - if (wantFt8 !== !!st.ft8_decode_enabled) { - await postPath("/toggle_ft8_decode"); - } - const wantFt4 = bm.decoders.includes("ft4"); - if (wantFt4 !== !!st.ft4_decode_enabled) { - await postPath("/toggle_ft4_decode"); - } - const wantFt2 = bm.decoders.includes("ft2"); - if (wantFt2 !== !!st.ft2_decode_enabled) { - await postPath("/toggle_ft2_decode"); - } - const wantWspr = bm.decoders.includes("wspr"); - if (wantWspr !== !!st.wspr_decode_enabled) { - await postPath("/toggle_wspr_decode"); - } - const wantHfAprs = bm.decoders.includes("hf-aprs"); - if (wantHfAprs !== !!st.hf_aprs_decode_enabled) { - await postPath("/toggle_hf_aprs_decode"); - } + const toggles = []; + const check = (key) => { + if (bm.decoders.includes(key) !== !!st[key.replace(/-/g, "_") + "_decode_enabled"]) { + toggles.push(postPath("/toggle_" + key.replace(/-/g, "_") + "_decode")); + } + }; + check("ft8"); check("ft4"); check("ft2"); check("wspr"); check("hf-aprs"); + if (toggles.length) await Promise.all(toggles); } } } catch (err) {