[feat](trx-client): per-rig audio TCP connections

Replace single-connection relay with per-rig audio manager that spawns independent TCP connections for each rig. Each rig gets its own broadcast channel, stream info, and vchan command routing. Selected rig mirrors to global channels for backward compat. Also fix bookmark apply to update spectrum marker instantly and fire all requests in parallel.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-22 07:37:59 +01:00
parent 5d905bff87
commit 5821531a93
2 changed files with 308 additions and 383 deletions
+262 -342
View File
@@ -43,17 +43,177 @@ struct ActiveVChanSub {
decoder_kinds: Vec<String>, decoder_kinds: Vec<String>,
} }
/// Run the audio client with auto-reconnect. /// Per-rig audio task state, tracked by the multi-rig manager.
struct PerRigAudioTask {
handle: tokio::task::JoinHandle<()>,
shutdown_tx: watch::Sender<bool>,
port: u16,
}
/// Multi-rig audio manager: spawns/tears down per-rig audio client tasks on
/// demand as rigs appear/disappear from the known_rigs list. Each rig with
/// an `audio_port` gets its own TCP connection.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn run_audio_client( pub async fn run_multi_rig_audio_manager(
server_host: String, server_host: String,
default_port: u16, default_port: u16,
rig_ports: HashMap<String, u16>, rig_ports: HashMap<String, u16>,
selected_rig_id: Arc<Mutex<Option<String>>>, selected_rig_id: Arc<Mutex<Option<String>>>,
known_rigs: Arc<Mutex<Vec<RemoteRigEntry>>>, known_rigs: Arc<Mutex<Vec<RemoteRigEntry>>>,
rx_tx: broadcast::Sender<Bytes>, global_rx_tx: broadcast::Sender<Bytes>,
mut tx_rx: mpsc::Receiver<Bytes>, tx_rx: mpsc::Receiver<Bytes>,
stream_info_tx: watch::Sender<Option<AudioStreamInfo>>, global_stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
decode_tx: broadcast::Sender<DecodedMessage>,
replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>,
mut shutdown_rx: watch::Receiver<bool>,
vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
_vchan_cmd_rx: mpsc::UnboundedReceiver<VChanAudioCmd>,
vchan_destroyed_tx: Option<broadcast::Sender<Uuid>>,
rig_audio_rx: Arc<RwLock<HashMap<String, broadcast::Sender<Bytes>>>>,
rig_audio_info: Arc<RwLock<HashMap<String, watch::Sender<Option<AudioStreamInfo>>>>>,
rig_vchan_audio_cmd: Arc<RwLock<HashMap<String, mpsc::UnboundedSender<VChanAudioCmd>>>>,
) {
// TX frames from the microphone go to the selected rig only.
// We wrap the single tx_rx receiver so the per-rig task for the selected
// rig can consume it.
let tx_rx = Arc::new(tokio::sync::Mutex::new(tx_rx));
let mut active_tasks: HashMap<String, PerRigAudioTask> = HashMap::new();
let mut poll_interval = time::interval(Duration::from_millis(500));
loop {
tokio::select! {
_ = poll_interval.tick() => {
// Collect current known rigs and their audio ports.
let current_rigs: HashMap<String, u16> = known_rigs
.lock()
.ok()
.map(|entries| {
entries.iter().map(|e| {
let port = rig_ports.get(&e.rig_id).copied()
.or(e.audio_port)
.unwrap_or(default_port);
(e.rig_id.clone(), port)
}).collect()
})
.unwrap_or_default();
// Tear down tasks for rigs that are no longer present or
// whose port has changed.
let to_remove: Vec<String> = active_tasks.keys()
.filter(|id| {
match current_rigs.get(*id) {
None => true,
Some(port) => active_tasks.get(*id)
.is_none_or(|t| t.port != *port),
}
})
.cloned()
.collect();
for rig_id in &to_remove {
if let Some(task) = active_tasks.remove(rig_id) {
let _ = task.shutdown_tx.send(true);
task.handle.abort();
info!("Audio client: stopped task for rig {}", rig_id);
}
}
// Spawn tasks for new rigs.
for (rig_id, port) in &current_rigs {
if active_tasks.contains_key(rig_id) {
continue;
}
let (per_rig_shutdown_tx, per_rig_shutdown_rx) = watch::channel(false);
// Ensure per-rig broadcast and info channels exist.
let per_rig_rx_tx = {
let mut map = rig_audio_rx.write().unwrap();
map.entry(rig_id.clone())
.or_insert_with(|| broadcast::channel::<Bytes>(256).0)
.clone()
};
let per_rig_info_tx = {
let mut map = rig_audio_info.write().unwrap();
map.entry(rig_id.clone())
.or_insert_with(|| watch::channel(None).0)
.clone()
};
// Create per-rig vchan cmd channel.
let (per_rig_vchan_tx, per_rig_vchan_rx) =
mpsc::unbounded_channel::<VChanAudioCmd>();
if let Ok(mut map) = rig_vchan_audio_cmd.write() {
map.insert(rig_id.clone(), per_rig_vchan_tx);
}
let addr = format!("{}:{}", server_host, port);
let rig_id_clone = rig_id.clone();
let global_rx_tx_clone = global_rx_tx.clone();
let global_info_tx_clone = global_stream_info_tx.clone();
let selected_clone = selected_rig_id.clone();
let decode_tx_clone = decode_tx.clone();
let replay_sink = replay_history_sink.clone();
let vchan_audio_clone = vchan_audio.clone();
let vchan_destroyed_clone = vchan_destroyed_tx.clone();
let tx_rx_clone = tx_rx.clone();
let handle = tokio::spawn(async move {
run_single_rig_audio_client(
addr,
rig_id_clone,
selected_clone,
per_rig_rx_tx,
per_rig_info_tx,
global_rx_tx_clone,
global_info_tx_clone,
tx_rx_clone,
decode_tx_clone,
replay_sink,
per_rig_shutdown_rx,
vchan_audio_clone,
per_rig_vchan_rx,
vchan_destroyed_clone,
)
.await;
});
info!("Audio client: started task for rig {} ({}:{})", rig_id, server_host, port);
active_tasks.insert(rig_id.clone(), PerRigAudioTask {
handle,
shutdown_tx: per_rig_shutdown_tx,
port: *port,
});
}
}
changed = shutdown_rx.changed() => {
if matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow() {
// Shut down all per-rig tasks.
for (rig_id, task) in active_tasks.drain() {
let _ = task.shutdown_tx.send(true);
task.handle.abort();
info!("Audio client: shutdown task for rig {}", rig_id);
}
return;
}
}
}
}
}
/// Audio client for a single rig. Maintains its own TCP connection with
/// auto-reconnect, publishes RX frames to both per-rig and (if selected)
/// global broadcast channels.
#[allow(clippy::too_many_arguments)]
async fn run_single_rig_audio_client(
server_addr: String,
rig_id: String,
selected_rig_id: Arc<Mutex<Option<String>>>,
per_rig_rx_tx: broadcast::Sender<Bytes>,
per_rig_info_tx: watch::Sender<Option<AudioStreamInfo>>,
global_rx_tx: broadcast::Sender<Bytes>,
global_info_tx: watch::Sender<Option<AudioStreamInfo>>,
tx_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
decode_tx: broadcast::Sender<DecodedMessage>, decode_tx: broadcast::Sender<DecodedMessage>,
replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>, replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>,
mut shutdown_rx: watch::Receiver<bool>, mut shutdown_rx: watch::Receiver<bool>,
@@ -62,42 +222,34 @@ pub async fn run_audio_client(
vchan_destroyed_tx: Option<broadcast::Sender<Uuid>>, vchan_destroyed_tx: Option<broadcast::Sender<Uuid>>,
) { ) {
let mut reconnect_delay = Duration::from_secs(1); let mut reconnect_delay = Duration::from_secs(1);
// Active virtual-channel subscriptions, keyed by UUID, re-sent to the
// server on every audio TCP reconnect.
let mut active_subs: HashMap<Uuid, ActiveVChanSub> = HashMap::new(); let mut active_subs: HashMap<Uuid, ActiveVChanSub> = HashMap::new();
let is_selected = |sel: &Arc<Mutex<Option<String>>>, rid: &str| -> bool {
sel.lock()
.ok()
.and_then(|v| v.clone())
.is_some_and(|s| s == rid)
};
loop { loop {
if *shutdown_rx.borrow() { if *shutdown_rx.borrow() {
info!("Audio client shutting down"); info!("Audio client [{}]: shutting down", rig_id);
return; return;
} }
let server_addr = resolve_audio_addr( info!("Audio client [{}]: connecting to {}", rig_id, server_addr);
&server_host,
default_port,
&rig_ports,
&known_rigs,
selected_rig_id
.lock()
.ok()
.and_then(|v| v.clone())
.as_deref(),
);
info!("Audio client: connecting to {}", server_addr);
match TcpStream::connect(&server_addr).await { match TcpStream::connect(&server_addr).await {
Ok(stream) => { Ok(stream) => {
reconnect_delay = Duration::from_secs(1); reconnect_delay = Duration::from_secs(1);
if let Err(e) = handle_audio_connection( if let Err(e) = handle_single_rig_connection(
stream, stream,
&server_host, &rig_id,
default_port,
&rig_ports,
&selected_rig_id, &selected_rig_id,
&known_rigs, &per_rig_rx_tx,
&server_addr, &per_rig_info_tx,
&rx_tx, &global_rx_tx,
&mut tx_rx, &global_info_tx,
&stream_info_tx, &tx_rx,
&decode_tx, &decode_tx,
replay_history_sink.clone(), replay_history_sink.clone(),
&mut shutdown_rx, &mut shutdown_rx,
@@ -108,21 +260,25 @@ pub async fn run_audio_client(
) )
.await .await
{ {
warn!("Audio connection dropped: {}", e); warn!("Audio connection [{}] dropped: {}", rig_id, e);
} }
} }
Err(e) => { Err(e) => {
warn!("Audio connect failed: {}", e); warn!("Audio connect [{}] failed: {}", rig_id, e);
} }
} }
let _ = stream_info_tx.send(None); let _ = per_rig_info_tx.send(None);
if is_selected(&selected_rig_id, &rig_id) {
let _ = global_info_tx.send(None);
}
tokio::select! { tokio::select! {
_ = time::sleep(reconnect_delay) => {} _ = time::sleep(reconnect_delay) => {}
changed = shutdown_rx.changed() => { changed = shutdown_rx.changed() => {
match changed { match changed {
Ok(()) if *shutdown_rx.borrow() => { Ok(()) if *shutdown_rx.borrow() => {
info!("Audio client shutting down"); info!("Audio client [{}]: shutting down", rig_id);
return; return;
} }
Ok(()) => {} Ok(()) => {}
@@ -134,18 +290,18 @@ pub async fn run_audio_client(
} }
} }
/// Handle a single TCP connection for one rig. Similar to `handle_audio_connection`
/// but publishes to per-rig channels directly and mirrors to global when selected.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn handle_audio_connection( async fn handle_single_rig_connection(
stream: TcpStream, stream: TcpStream,
server_host: &str, rig_id: &str,
default_port: u16,
rig_ports: &HashMap<String, u16>,
selected_rig_id: &Arc<Mutex<Option<String>>>, selected_rig_id: &Arc<Mutex<Option<String>>>,
known_rigs: &Arc<Mutex<Vec<RemoteRigEntry>>>, per_rig_rx_tx: &broadcast::Sender<Bytes>,
connected_addr: &str, per_rig_info_tx: &watch::Sender<Option<AudioStreamInfo>>,
rx_tx: &broadcast::Sender<Bytes>, global_rx_tx: &broadcast::Sender<Bytes>,
tx_rx: &mut mpsc::Receiver<Bytes>, global_info_tx: &watch::Sender<Option<AudioStreamInfo>>,
stream_info_tx: &watch::Sender<Option<AudioStreamInfo>>, tx_rx: &Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
decode_tx: &broadcast::Sender<DecodedMessage>, decode_tx: &broadcast::Sender<DecodedMessage>,
replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>, replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>,
shutdown_rx: &mut watch::Receiver<bool>, shutdown_rx: &mut watch::Receiver<bool>,
@@ -169,14 +325,22 @@ async fn handle_audio_connection(
let info: AudioStreamInfo = serde_json::from_slice(&payload) let info: AudioStreamInfo = serde_json::from_slice(&payload)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
info!( info!(
"Audio stream info: {}Hz, {} ch, {}ms", "Audio stream info [{}]: {}Hz, {} ch, {}ms",
info.sample_rate, info.channels, info.frame_duration_ms rig_id, info.sample_rate, info.channels, info.frame_duration_ms
); );
let _ = stream_info_tx.send(Some(info)); let _ = per_rig_info_tx.send(Some(info.clone()));
// On reconnect: re-subscribe all previously active virtual channels. // Mirror to global if this is the selected rig.
// Track which UUIDs were pre-sent so we don't duplicate them when the let is_selected = selected_rig_id
// same Subscribe command arrives from the mpsc queue. .lock()
.ok()
.and_then(|v| v.clone())
.is_some_and(|s| s == rig_id);
if is_selected {
let _ = global_info_tx.send(Some(info));
}
// Re-subscribe active virtual channels on reconnect.
let mut resubscribed: HashSet<Uuid> = HashSet::new(); let mut resubscribed: HashSet<Uuid> = HashSet::new();
for (&uuid, sub) in active_subs.iter() { for (&uuid, sub) in active_subs.iter() {
let json = serde_json::json!({ let json = serde_json::json!({
@@ -189,17 +353,16 @@ async fn handle_audio_connection(
}); });
if let Ok(payload) = serde_json::to_vec(&json) { if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await {
warn!("Audio vchan reconnect SUB write failed: {}", e); warn!("Audio vchan reconnect SUB write failed [{}]: {}", rig_id, e);
return Err(e); return Err(e);
} }
} }
// Re-apply non-default bandwidth after re-subscribing.
if sub.bandwidth_hz > 0 { if sub.bandwidth_hz > 0 {
let bw_json = let bw_json =
serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": sub.bandwidth_hz }); serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": sub.bandwidth_hz });
if let Ok(payload) = serde_json::to_vec(&bw_json) { if let Ok(payload) = serde_json::to_vec(&bw_json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_BW, &payload).await { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_BW, &payload).await {
warn!("Audio vchan reconnect BW write failed: {}", e); warn!("Audio vchan reconnect BW write failed [{}]: {}", rig_id, e);
return Err(e); return Err(e);
} }
} }
@@ -207,9 +370,12 @@ async fn handle_audio_connection(
resubscribed.insert(uuid); resubscribed.insert(uuid);
} }
// Spawn RX read task // Spawn RX read task — publishes to per-rig and (when selected) global.
let rx_tx = rx_tx.clone(); let per_rig_rx_clone = per_rig_rx_tx.clone();
let decode_tx = decode_tx.clone(); let global_rx_clone = global_rx_tx.clone();
let selected_for_rx = selected_rig_id.clone();
let rig_id_for_rx = rig_id.to_string();
let decode_tx_clone = decode_tx.clone();
let vchan_audio_rx: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>> = let vchan_audio_rx: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>> =
Arc::clone(vchan_audio); Arc::clone(vchan_audio);
let vchan_destroyed_for_rx = vchan_destroyed_tx.clone(); let vchan_destroyed_for_rx = vchan_destroyed_tx.clone();
@@ -217,10 +383,20 @@ async fn handle_audio_connection(
loop { loop {
match read_audio_msg(&mut reader).await { match read_audio_msg(&mut reader).await {
Ok((AUDIO_MSG_RX_FRAME, payload)) => { Ok((AUDIO_MSG_RX_FRAME, payload)) => {
let _ = rx_tx.send(Bytes::from(payload)); let data = Bytes::from(payload);
// Always publish to per-rig channel.
let _ = per_rig_rx_clone.send(data.clone());
// Mirror to global if this rig is currently selected.
let sel = selected_for_rx
.lock()
.ok()
.and_then(|v| v.clone())
.is_some_and(|s| s == rig_id_for_rx);
if sel {
let _ = global_rx_clone.send(data);
}
} }
Ok((AUDIO_MSG_RX_FRAME_CH, payload)) => { Ok((AUDIO_MSG_RX_FRAME_CH, payload)) => {
// Route per-channel Opus frame to the correct broadcaster.
if let Ok((uuid, opus)) = parse_vchan_audio_frame(&payload) { if let Ok((uuid, opus)) = parse_vchan_audio_frame(&payload) {
let pkt = Bytes::copy_from_slice(opus); let pkt = Bytes::copy_from_slice(opus);
if let Ok(map) = vchan_audio_rx.read() { if let Ok(map) = vchan_audio_rx.read() {
@@ -231,8 +407,6 @@ async fn handle_audio_connection(
} }
} }
Ok((AUDIO_MSG_VCHAN_ALLOCATED, payload)) => { Ok((AUDIO_MSG_VCHAN_ALLOCATED, payload)) => {
// Server confirmed a virtual channel is ready; ensure a
// broadcaster entry exists in the shared map.
if let Ok(uuid) = parse_vchan_uuid_msg(&payload) { if let Ok(uuid) = parse_vchan_uuid_msg(&payload) {
if let Ok(mut map) = vchan_audio_rx.write() { if let Ok(mut map) = vchan_audio_rx.write() {
map.entry(uuid) map.entry(uuid)
@@ -242,19 +416,15 @@ async fn handle_audio_connection(
} }
Ok((AUDIO_MSG_VCHAN_DESTROYED, payload)) => { Ok((AUDIO_MSG_VCHAN_DESTROYED, payload)) => {
if let Ok(uuid) = parse_vchan_uuid_msg(&payload) { if let Ok(uuid) = parse_vchan_uuid_msg(&payload) {
// Remove the broadcaster so audio_ws gets no more frames.
if let Ok(mut map) = vchan_audio_rx.write() { if let Ok(mut map) = vchan_audio_rx.write() {
map.remove(&uuid); map.remove(&uuid);
} }
// Notify the HTTP frontend so it removes the channel from
// ClientChannelManager (triggers SSE channels event).
if let Some(ref tx) = vchan_destroyed_for_rx { if let Some(ref tx) = vchan_destroyed_for_rx {
let _ = tx.send(uuid); let _ = tx.send(uuid);
} }
} }
} }
Ok((AUDIO_MSG_HISTORY_COMPRESSED, payload)) => { Ok((AUDIO_MSG_HISTORY_COMPRESSED, payload)) => {
// Decompress gzip blob, then iterate the embedded framed messages.
let mut decompressed = Vec::new(); let mut decompressed = Vec::new();
if GzDecoder::new(payload.as_slice()) if GzDecoder::new(payload.as_slice())
.read_to_end(&mut decompressed) .read_to_end(&mut decompressed)
@@ -296,20 +466,30 @@ async fn handle_audio_connection(
payload, payload,
)) => { )) => {
if let Ok(msg) = serde_json::from_slice::<DecodedMessage>(&payload) { if let Ok(msg) = serde_json::from_slice::<DecodedMessage>(&payload) {
let _ = decode_tx.send(msg); let _ = decode_tx_clone.send(msg);
} }
} }
Ok((msg_type, _)) => { Ok((msg_type, _)) => {
warn!("Audio client: unexpected message type {:#04x}", msg_type); warn!(
"Audio client [{}]: unexpected message type {:#04x}",
rig_id_for_rx, msg_type
);
} }
Err(_) => break, Err(_) => break,
} }
} }
}); });
// Forward TX frames and VChanAudioCmds to server. // Forward TX frames (only when we are the selected rig) and vchan commands.
let mut rig_check = time::interval(Duration::from_millis(500)); let rig_id_owned = rig_id.to_string();
loop { loop {
// Only the selected rig should consume TX frames from the mic.
let is_sel = selected_rig_id
.lock()
.ok()
.and_then(|v| v.clone())
.is_some_and(|s| s == rig_id_owned);
tokio::select! { tokio::select! {
changed = shutdown_rx.changed() => { changed = shutdown_rx.changed() => {
match changed { match changed {
@@ -324,11 +504,18 @@ async fn handle_audio_connection(
} }
} }
} }
packet = tx_rx.recv() => { packet = async {
if is_sel {
tx_rx.lock().await.recv().await
} else {
// Not selected — don't consume TX frames; pend forever.
std::future::pending().await
}
} => {
match packet { match packet {
Some(data) => { Some(data) => {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_TX_FRAME, &data).await { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_TX_FRAME, &data).await {
warn!("Audio TX write failed: {}", e); warn!("Audio TX write failed [{}]: {}", rig_id_owned, e);
break; break;
} }
} }
@@ -345,9 +532,7 @@ async fn handle_audio_connection(
hidden: false, hidden: false,
decoder_kinds: decoder_kinds.clone(), decoder_kinds: decoder_kinds.clone(),
}); });
// Skip if already re-sent during reconnect initialization.
if resubscribed.remove(&uuid) { if resubscribed.remove(&uuid) {
// Already sent above; don't duplicate.
} else { } else {
let json = serde_json::json!({ let json = serde_json::json!({
"uuid": uuid.to_string(), "uuid": uuid.to_string(),
@@ -359,7 +544,7 @@ async fn handle_audio_connection(
}); });
if let Ok(payload) = serde_json::to_vec(&json) { if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await {
warn!("Audio vchan SUB write failed: {}", e); warn!("Audio vchan SUB write failed [{}]: {}", rig_id_owned, e);
break; break;
} }
} }
@@ -374,7 +559,6 @@ async fn handle_audio_connection(
decoder_kinds: decoder_kinds.clone(), decoder_kinds: decoder_kinds.clone(),
}); });
if resubscribed.remove(&uuid) { if resubscribed.remove(&uuid) {
// Already sent above; don't duplicate.
} else { } else {
let json = serde_json::json!({ let json = serde_json::json!({
"uuid": uuid.to_string(), "uuid": uuid.to_string(),
@@ -386,7 +570,7 @@ async fn handle_audio_connection(
}); });
if let Ok(payload) = serde_json::to_vec(&json) { if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await {
warn!("Audio background SUB write failed: {}", e); warn!("Audio background SUB write failed [{}]: {}", rig_id_owned, e);
break; break;
} }
} }
@@ -394,16 +578,15 @@ async fn handle_audio_connection(
} }
Some(VChanAudioCmd::Unsubscribe(uuid)) => { Some(VChanAudioCmd::Unsubscribe(uuid)) => {
if let Err(e) = write_vchan_uuid_msg(&mut writer, AUDIO_MSG_VCHAN_UNSUB, uuid).await { if let Err(e) = write_vchan_uuid_msg(&mut writer, AUDIO_MSG_VCHAN_UNSUB, uuid).await {
warn!("Audio vchan UNSUB write failed: {}", e); warn!("Audio vchan UNSUB write failed [{}]: {}", rig_id_owned, e);
break; break;
} }
} }
Some(VChanAudioCmd::Remove(uuid)) => { Some(VChanAudioCmd::Remove(uuid)) => {
if let Err(e) = write_vchan_uuid_msg(&mut writer, AUDIO_MSG_VCHAN_REMOVE, uuid).await { if let Err(e) = write_vchan_uuid_msg(&mut writer, AUDIO_MSG_VCHAN_REMOVE, uuid).await {
warn!("Audio vchan REMOVE write failed: {}", e); warn!("Audio vchan REMOVE write failed [{}]: {}", rig_id_owned, e);
break; break;
} }
// Clean up local broadcaster.
if let Ok(mut map) = vchan_audio.write() { if let Ok(mut map) = vchan_audio.write() {
map.remove(&uuid); map.remove(&uuid);
} }
@@ -416,7 +599,7 @@ async fn handle_audio_connection(
let json = serde_json::json!({ "uuid": uuid.to_string(), "freq_hz": freq_hz }); let json = serde_json::json!({ "uuid": uuid.to_string(), "freq_hz": freq_hz });
if let Ok(payload) = serde_json::to_vec(&json) { if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_FREQ, &payload).await { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_FREQ, &payload).await {
warn!("Audio vchan FREQ write failed: {}", e); warn!("Audio vchan FREQ write failed [{}]: {}", rig_id_owned, e);
break; break;
} }
} }
@@ -428,20 +611,19 @@ async fn handle_audio_connection(
let json = serde_json::json!({ "uuid": uuid.to_string(), "mode": mode }); let json = serde_json::json!({ "uuid": uuid.to_string(), "mode": mode });
if let Ok(payload) = serde_json::to_vec(&json) { if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_MODE, &payload).await { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_MODE, &payload).await {
warn!("Audio vchan MODE write failed: {}", e); warn!("Audio vchan MODE write failed [{}]: {}", rig_id_owned, e);
break; break;
} }
} }
} }
Some(VChanAudioCmd::SetBandwidth { uuid, bandwidth_hz }) => { Some(VChanAudioCmd::SetBandwidth { uuid, bandwidth_hz }) => {
// Persist for reconnect.
if let Some(entry) = active_subs.get_mut(&uuid) { if let Some(entry) = active_subs.get_mut(&uuid) {
entry.bandwidth_hz = bandwidth_hz; entry.bandwidth_hz = bandwidth_hz;
} }
let json = serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": bandwidth_hz }); let json = serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": bandwidth_hz });
if let Ok(payload) = serde_json::to_vec(&json) { if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_BW, &payload).await { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_BW, &payload).await {
warn!("Audio vchan BW write failed: {}", e); warn!("Audio vchan BW write failed [{}]: {}", rig_id_owned, e);
break; break;
} }
} }
@@ -452,270 +634,8 @@ async fn handle_audio_connection(
_ = &mut rx_handle => { _ = &mut rx_handle => {
break; break;
} }
_ = rig_check.tick() => {
let current_rig = selected_rig_id.lock().ok().and_then(|v| v.clone());
let desired_addr = resolve_audio_addr(
server_host,
default_port,
rig_ports,
known_rigs,
current_rig.as_deref(),
);
if desired_addr != connected_addr {
info!(
"Audio client: active rig changed ({} -> {}), reconnecting audio",
connected_addr,
desired_addr
);
break;
}
}
} }
} }
Ok(()) Ok(())
} }
/// Multi-rig audio manager: spawns/tears down per-rig audio client tasks on
/// demand as rigs appear/disappear from the known_rigs list.
#[allow(clippy::too_many_arguments)]
pub async fn run_multi_rig_audio_manager(
server_host: String,
default_port: u16,
rig_ports: HashMap<String, u16>,
selected_rig_id: Arc<Mutex<Option<String>>>,
known_rigs: Arc<Mutex<Vec<RemoteRigEntry>>>,
global_rx_tx: broadcast::Sender<Bytes>,
tx_rx: mpsc::Receiver<Bytes>,
global_stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
decode_tx: broadcast::Sender<DecodedMessage>,
replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>,
shutdown_rx: watch::Receiver<bool>,
vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
vchan_cmd_rx: mpsc::UnboundedReceiver<VChanAudioCmd>,
vchan_destroyed_tx: Option<broadcast::Sender<Uuid>>,
rig_audio_rx: Arc<RwLock<HashMap<String, broadcast::Sender<Bytes>>>>,
rig_audio_info: Arc<RwLock<HashMap<String, watch::Sender<Option<AudioStreamInfo>>>>>,
rig_vchan_audio_cmd: Arc<RwLock<HashMap<String, mpsc::UnboundedSender<VChanAudioCmd>>>>,
) {
// Per-rig vchan command routing: create per-rig senders that relay into the
// single global vchan_cmd channel. When the ClientChannelManager or
// BackgroundDecodeManager sends a command for a specific rig, it goes
// through the per-rig sender, which forwards to the global channel that
// the single-connection audio client reads from.
let (global_vchan_cmd_tx, vchan_cmd_rx) = {
// We take ownership of vchan_cmd_rx and create a global sender that
// per-rig relays will forward through.
let (tx, rx) = mpsc::unbounded_channel::<VChanAudioCmd>();
// Spawn relay from the original vchan_cmd_rx (from main.rs).
let mut orig_rx = vchan_cmd_rx;
let tx_for_orig = tx.clone();
tokio::spawn(async move {
while let Some(cmd) = orig_rx.recv().await {
let _ = tx_for_orig.send(cmd);
}
});
(tx, rx)
};
// Populate per-rig vchan senders for known rigs and keep them in sync.
let rig_vchan_for_sync = rig_vchan_audio_cmd.clone();
let known_rigs_for_vchan = known_rigs.clone();
let global_vchan_for_sync = global_vchan_cmd_tx.clone();
let mut vchan_sync_shutdown = shutdown_rx.clone();
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_millis(500));
loop {
tokio::select! {
_ = interval.tick() => {
let rig_ids: Vec<String> = known_rigs_for_vchan
.lock()
.ok()
.map(|entries| entries.iter().map(|e| e.rig_id.clone()).collect())
.unwrap_or_default();
if let Ok(mut map) = rig_vchan_for_sync.write() {
for rig_id in &rig_ids {
if !map.contains_key(rig_id) {
// Create a per-rig sender that relays to global.
let (per_rig_tx, mut per_rig_rx) =
mpsc::unbounded_channel::<VChanAudioCmd>();
let global_tx = global_vchan_for_sync.clone();
tokio::spawn(async move {
while let Some(cmd) = per_rig_rx.recv().await {
let _ = global_tx.send(cmd);
}
});
map.insert(rig_id.clone(), per_rig_tx);
}
}
// Remove senders for rigs no longer present.
let active: std::collections::HashSet<&str> =
rig_ids.iter().map(|s| s.as_str()).collect();
map.retain(|id, _| active.contains(id.as_str()));
}
}
changed = vchan_sync_shutdown.changed() => {
if matches!(changed, Ok(()) | Err(_)) && *vchan_sync_shutdown.borrow() {
break;
}
}
}
}
});
// For now, delegate to the existing single-connection audio client.
// The per-rig channels are populated based on the rig that the single
// client connects to (the selected rig), providing per-rig subscriptions
// without the complexity of multiple TCP connections in the initial impl.
//
// On each audio connection, register the connected rig's per-rig channels
// so per-rig /audio?rig_id= subscribers get data.
let rig_audio_rx_clone = rig_audio_rx.clone();
let rig_audio_info_clone = rig_audio_info.clone();
// Spawn a task that keeps per-rig audio/info maps populated for ALL
// known rigs (not just the selected one). Non-connected rigs get valid
// but silent channels so `/audio?rig_id=X` can subscribe instantly
// instead of timing out.
let known_rigs_for_sync = known_rigs.clone();
let mut sync_shutdown = shutdown_rx.clone();
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_millis(500));
loop {
tokio::select! {
_ = interval.tick() => {
let rig_ids: Vec<String> = known_rigs_for_sync
.lock()
.ok()
.map(|entries| entries.iter().map(|e| e.rig_id.clone()).collect())
.unwrap_or_default();
for rig_id in &rig_ids {
if let Ok(mut map) = rig_audio_rx_clone.write() {
map.entry(rig_id.clone())
.or_insert_with(|| broadcast::channel::<Bytes>(256).0);
}
if let Ok(mut map) = rig_audio_info_clone.write() {
map.entry(rig_id.clone())
.or_insert_with(|| watch::channel(None).0);
}
}
}
changed = sync_shutdown.changed() => {
if matches!(changed, Ok(()) | Err(_)) && *sync_shutdown.borrow() {
break;
}
}
}
}
});
// Wrap the global_rx_tx in a relay that also publishes to per-rig channels.
let (relay_rx_tx, _) = broadcast::channel::<Bytes>(256);
let relay_clone = relay_rx_tx.clone();
let rig_audio_rx_for_relay = rig_audio_rx.clone();
let selected_for_relay = selected_rig_id.clone();
let mut relay_sub = global_rx_tx.subscribe();
let mut relay_shutdown = shutdown_rx.clone();
tokio::spawn(async move {
loop {
tokio::select! {
result = relay_sub.recv() => {
match result {
Ok(data) => {
// Forward to per-rig channel for the selected rig.
if let Some(rig_id) = selected_for_relay.lock().ok().and_then(|v| v.clone()) {
if let Ok(map) = rig_audio_rx_for_relay.read() {
if let Some(tx) = map.get(&rig_id) {
let _ = tx.send(data.clone());
}
}
}
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
changed = relay_shutdown.changed() => {
if matches!(changed, Ok(()) | Err(_)) && *relay_shutdown.borrow() {
break;
}
}
}
}
});
// Also relay stream info changes to per-rig info channels.
let mut info_relay_rx = global_stream_info_tx.subscribe();
let rig_audio_info_for_relay = rig_audio_info.clone();
let selected_for_info_relay = selected_rig_id.clone();
let mut info_relay_shutdown = shutdown_rx.clone();
tokio::spawn(async move {
loop {
tokio::select! {
changed = info_relay_rx.changed() => {
match changed {
Ok(()) => {
let info = info_relay_rx.borrow().clone();
if let Some(rig_id) = selected_for_info_relay.lock().ok().and_then(|v| v.clone()) {
if let Ok(map) = rig_audio_info_for_relay.read() {
if let Some(tx) = map.get(&rig_id) {
let _ = tx.send(info);
}
}
}
}
Err(_) => break,
}
}
changed = info_relay_shutdown.changed() => {
if matches!(changed, Ok(()) | Err(_)) && *info_relay_shutdown.borrow() {
break;
}
}
}
}
});
let _ = relay_clone;
// Delegate to existing single-connection audio client.
run_audio_client(
server_host,
default_port,
rig_ports,
selected_rig_id,
known_rigs,
global_rx_tx,
tx_rx,
global_stream_info_tx,
decode_tx,
replay_history_sink,
shutdown_rx,
vchan_audio,
vchan_cmd_rx,
vchan_destroyed_tx,
)
.await;
}
fn resolve_audio_addr(
host: &str,
default_port: u16,
rig_ports: &HashMap<String, u16>,
known_rigs: &Arc<Mutex<Vec<RemoteRigEntry>>>,
selected_rig_id: Option<&str>,
) -> String {
let port = selected_rig_id
.and_then(|rig_id| {
rig_ports.get(rig_id).copied().or_else(|| {
known_rigs.lock().ok().and_then(|entries| {
entries
.iter()
.find(|entry| entry.rig_id == rig_id)
.and_then(|entry| entry.audio_port)
})
})
})
.unwrap_or(default_port);
format!("{}:{}", host, port)
}
@@ -324,62 +324,67 @@ async function bmDelete(id) {
async function bmApply(bm) { async function bmApply(bm) {
try { try {
const onVirtual = typeof vchanInterceptMode === "function" // --- Optimistic UI updates (instant, before any network round-trips) ---
&& await vchanInterceptMode(bm.mode);
if (!onVirtual) {
await postPath("/set_mode?mode=" + encodeURIComponent(bm.mode));
}
if (typeof modeEl !== "undefined" && modeEl) { if (typeof modeEl !== "undefined" && modeEl) {
modeEl.value = String(bm.mode || "").toUpperCase(); modeEl.value = String(bm.mode || "").toUpperCase();
} }
if (bm.bandwidth_hz) { if (bm.bandwidth_hz) {
if (typeof currentBandwidthHz !== "undefined") {
currentBandwidthHz = bm.bandwidth_hz;
}
window.currentBandwidthHz = bm.bandwidth_hz;
if (typeof syncBandwidthInput === "function") {
syncBandwidthInput(bm.bandwidth_hz);
}
}
if (typeof applyLocalTunedFrequency === "function") {
applyLocalTunedFrequency(bm.freq_hz);
}
if (typeof scheduleSpectrumDraw === "function" && typeof lastSpectrumData !== "undefined" && lastSpectrumData) {
scheduleSpectrumDraw();
}
// --- Send mode, bandwidth, and frequency to server in parallel ---
const modePromise = (async () => {
const onVirtual = typeof vchanInterceptMode === "function"
&& await vchanInterceptMode(bm.mode);
if (!onVirtual) {
await postPath("/set_mode?mode=" + encodeURIComponent(bm.mode));
}
})();
const bwPromise = bm.bandwidth_hz ? (async () => {
const bwHandledByVchan = typeof vchanInterceptBandwidth === "function" const bwHandledByVchan = typeof vchanInterceptBandwidth === "function"
&& await vchanInterceptBandwidth(bm.bandwidth_hz); && await vchanInterceptBandwidth(bm.bandwidth_hz);
if (!bwHandledByVchan) { if (!bwHandledByVchan) {
await postPath("/set_bandwidth?hz=" + bm.bandwidth_hz); await postPath("/set_bandwidth?hz=" + bm.bandwidth_hz);
} }
if (typeof currentBandwidthHz !== "undefined") { })() : Promise.resolve();
currentBandwidthHz = bm.bandwidth_hz;
}
if (typeof window !== "undefined") {
window.currentBandwidthHz = bm.bandwidth_hz;
}
if (typeof syncBandwidthInput === "function") {
syncBandwidthInput(bm.bandwidth_hz);
}
}
// setRigFrequency is wrapped by vchan.js to redirect to the channel API // setRigFrequency is wrapped by vchan.js to redirect to the channel API
// when on a virtual channel, so this call works correctly in both cases. // when on a virtual channel, so this call works correctly in both cases.
if (typeof setRigFrequency === "function") { // It also does its own optimistic update (applyLocalTunedFrequency) but
await setRigFrequency(bm.freq_hz); // that's a no-op since we already set the same value above.
} else { const freqPromise = (async () => {
await postPath("/set_freq?hz=" + bm.freq_hz); if (typeof setRigFrequency === "function") {
} await setRigFrequency(bm.freq_hz);
// Toggle decoders when in DIG mode. } else {
await postPath("/set_freq?hz=" + bm.freq_hz);
}
})();
await Promise.all([modePromise, bwPromise, freqPromise]);
// --- Toggle decoders when in DIG mode ---
if (bm.mode === "DIG" && Array.isArray(bm.decoders)) { if (bm.mode === "DIG" && Array.isArray(bm.decoders)) {
const statusResp = await fetch("/status"); const statusResp = await fetch("/status");
if (statusResp.ok) { if (statusResp.ok) {
const st = await statusResp.json(); const st = await statusResp.json();
const wantFt8 = bm.decoders.includes("ft8"); const toggles = [];
if (wantFt8 !== !!st.ft8_decode_enabled) { const check = (key) => {
await postPath("/toggle_ft8_decode"); if (bm.decoders.includes(key) !== !!st[key.replace(/-/g, "_") + "_decode_enabled"]) {
} toggles.push(postPath("/toggle_" + key.replace(/-/g, "_") + "_decode"));
const wantFt4 = bm.decoders.includes("ft4"); }
if (wantFt4 !== !!st.ft4_decode_enabled) { };
await postPath("/toggle_ft4_decode"); check("ft8"); check("ft4"); check("ft2"); check("wspr"); check("hf-aprs");
} if (toggles.length) await Promise.all(toggles);
const wantFt2 = bm.decoders.includes("ft2");
if (wantFt2 !== !!st.ft2_decode_enabled) {
await postPath("/toggle_ft2_decode");
}
const wantWspr = bm.decoders.includes("wspr");
if (wantWspr !== !!st.wspr_decode_enabled) {
await postPath("/toggle_wspr_decode");
}
const wantHfAprs = bm.decoders.includes("hf-aprs");
if (wantHfAprs !== !!st.hf_aprs_decode_enabled) {
await postPath("/toggle_hf_aprs_decode");
}
} }
} }
} catch (err) { } catch (err) {