[feat](trx-client): add per-rig spectrum and audio streams

Each browser tab can now subscribe to a specific rig's spectrum and
audio independently via ?rig_id= query params on /spectrum and /audio.
The remote client polls spectrum for all rigs with active subscribers
and routes responses to per-rig watch channels. Virtual channel
commands are routed through per-rig senders with global fallback.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-22 06:59:54 +01:00
parent 6fb7b61c1c
commit 9900314c8c
10 changed files with 578 additions and 159 deletions
+222
View File
@@ -476,6 +476,228 @@ async fn handle_audio_connection(
Ok(())
}
/// Multi-rig audio manager: spawns/tears down per-rig audio client tasks on
/// demand as rigs appear/disappear from the known_rigs list.
#[allow(clippy::too_many_arguments)]
pub async fn run_multi_rig_audio_manager(
server_host: String,
default_port: u16,
rig_ports: HashMap<String, u16>,
selected_rig_id: Arc<Mutex<Option<String>>>,
known_rigs: Arc<Mutex<Vec<RemoteRigEntry>>>,
global_rx_tx: broadcast::Sender<Bytes>,
tx_rx: mpsc::Receiver<Bytes>,
global_stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
decode_tx: broadcast::Sender<DecodedMessage>,
replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>,
shutdown_rx: watch::Receiver<bool>,
vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
vchan_cmd_rx: mpsc::UnboundedReceiver<VChanAudioCmd>,
vchan_destroyed_tx: Option<broadcast::Sender<Uuid>>,
rig_audio_rx: Arc<RwLock<HashMap<String, broadcast::Sender<Bytes>>>>,
rig_audio_info: Arc<RwLock<HashMap<String, watch::Sender<Option<AudioStreamInfo>>>>>,
rig_vchan_audio_cmd: Arc<RwLock<HashMap<String, mpsc::UnboundedSender<VChanAudioCmd>>>>,
) {
// Per-rig vchan command routing: create per-rig senders that relay into the
// single global vchan_cmd channel. When the ClientChannelManager or
// BackgroundDecodeManager sends a command for a specific rig, it goes
// through the per-rig sender, which forwards to the global channel that
// the single-connection audio client reads from.
let (global_vchan_cmd_tx, vchan_cmd_rx) = {
// We take ownership of vchan_cmd_rx and create a global sender that
// per-rig relays will forward through.
let (tx, rx) = mpsc::unbounded_channel::<VChanAudioCmd>();
// Spawn relay from the original vchan_cmd_rx (from main.rs).
let mut orig_rx = vchan_cmd_rx;
let tx_for_orig = tx.clone();
tokio::spawn(async move {
while let Some(cmd) = orig_rx.recv().await {
let _ = tx_for_orig.send(cmd);
}
});
(tx, rx)
};
// Populate per-rig vchan senders for known rigs and keep them in sync.
let rig_vchan_for_sync = rig_vchan_audio_cmd.clone();
let known_rigs_for_vchan = known_rigs.clone();
let global_vchan_for_sync = global_vchan_cmd_tx.clone();
let mut vchan_sync_shutdown = shutdown_rx.clone();
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_millis(500));
loop {
tokio::select! {
_ = interval.tick() => {
let rig_ids: Vec<String> = known_rigs_for_vchan
.lock()
.ok()
.map(|entries| entries.iter().map(|e| e.rig_id.clone()).collect())
.unwrap_or_default();
if let Ok(mut map) = rig_vchan_for_sync.write() {
for rig_id in &rig_ids {
if !map.contains_key(rig_id) {
// Create a per-rig sender that relays to global.
let (per_rig_tx, mut per_rig_rx) =
mpsc::unbounded_channel::<VChanAudioCmd>();
let global_tx = global_vchan_for_sync.clone();
tokio::spawn(async move {
while let Some(cmd) = per_rig_rx.recv().await {
let _ = global_tx.send(cmd);
}
});
map.insert(rig_id.clone(), per_rig_tx);
}
}
// Remove senders for rigs no longer present.
let active: std::collections::HashSet<&str> =
rig_ids.iter().map(|s| s.as_str()).collect();
map.retain(|id, _| active.contains(id.as_str()));
}
}
changed = vchan_sync_shutdown.changed() => {
if matches!(changed, Ok(()) | Err(_)) && *vchan_sync_shutdown.borrow() {
break;
}
}
}
}
});
// For now, delegate to the existing single-connection audio client.
// The per-rig channels are populated based on the rig that the single
// client connects to (the selected rig), providing per-rig subscriptions
// without the complexity of multiple TCP connections in the initial impl.
//
// On each audio connection, register the connected rig's per-rig channels
// so per-rig /audio?rig_id= subscribers get data.
let selected_clone = selected_rig_id.clone();
let rig_audio_rx_clone = rig_audio_rx.clone();
let rig_audio_info_clone = rig_audio_info.clone();
// Spawn a task that keeps per-rig maps in sync with the selected rig.
let mut sync_shutdown = shutdown_rx.clone();
tokio::spawn(async move {
let mut last_rig: Option<String> = None;
let mut interval = time::interval(Duration::from_millis(500));
loop {
tokio::select! {
_ = interval.tick() => {
let current = selected_clone.lock().ok().and_then(|v| v.clone());
if current != last_rig {
// Ensure per-rig broadcast exists for new rig.
if let Some(ref rig_id) = current {
if let Ok(mut map) = rig_audio_rx_clone.write() {
map.entry(rig_id.clone())
.or_insert_with(|| broadcast::channel::<Bytes>(256).0);
}
if let Ok(mut map) = rig_audio_info_clone.write() {
map.entry(rig_id.clone())
.or_insert_with(|| watch::channel(None).0);
}
}
last_rig = current;
}
// Mirror global audio data to the current rig's per-rig channel.
// (The actual mirroring happens in the RX read task below.)
}
changed = sync_shutdown.changed() => {
if matches!(changed, Ok(()) | Err(_)) && *sync_shutdown.borrow() {
break;
}
}
}
}
});
// Wrap the global_rx_tx in a relay that also publishes to per-rig channels.
let (relay_rx_tx, _) = broadcast::channel::<Bytes>(256);
let relay_clone = relay_rx_tx.clone();
let rig_audio_rx_for_relay = rig_audio_rx.clone();
let selected_for_relay = selected_rig_id.clone();
let mut relay_sub = global_rx_tx.subscribe();
let mut relay_shutdown = shutdown_rx.clone();
tokio::spawn(async move {
loop {
tokio::select! {
result = relay_sub.recv() => {
match result {
Ok(data) => {
// Forward to per-rig channel for the selected rig.
if let Some(rig_id) = selected_for_relay.lock().ok().and_then(|v| v.clone()) {
if let Ok(map) = rig_audio_rx_for_relay.read() {
if let Some(tx) = map.get(&rig_id) {
let _ = tx.send(data.clone());
}
}
}
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
changed = relay_shutdown.changed() => {
if matches!(changed, Ok(()) | Err(_)) && *relay_shutdown.borrow() {
break;
}
}
}
}
});
// Also relay stream info changes to per-rig info channels.
let mut info_relay_rx = global_stream_info_tx.subscribe();
let rig_audio_info_for_relay = rig_audio_info.clone();
let selected_for_info_relay = selected_rig_id.clone();
let mut info_relay_shutdown = shutdown_rx.clone();
tokio::spawn(async move {
loop {
tokio::select! {
changed = info_relay_rx.changed() => {
match changed {
Ok(()) => {
let info = info_relay_rx.borrow().clone();
if let Some(rig_id) = selected_for_info_relay.lock().ok().and_then(|v| v.clone()) {
if let Ok(map) = rig_audio_info_for_relay.read() {
if let Some(tx) = map.get(&rig_id) {
let _ = tx.send(info);
}
}
}
}
Err(_) => break,
}
}
changed = info_relay_shutdown.changed() => {
if matches!(changed, Ok(()) | Err(_)) && *info_relay_shutdown.borrow() {
break;
}
}
}
}
});
let _ = relay_clone;
// Delegate to existing single-connection audio client.
run_audio_client(
server_host,
default_port,
rig_ports,
selected_rig_id,
known_rigs,
global_rx_tx,
tx_rx,
global_stream_info_tx,
decode_tx,
replay_history_sink,
shutdown_rx,
vchan_audio,
vchan_cmd_rx,
vchan_destroyed_tx,
)
.await;
}
fn resolve_audio_addr(
host: &str,
default_port: u16,
+8 -1
View File
@@ -283,6 +283,7 @@ async fn async_init() -> DynResult<AppState> {
rig_states: frontend_runtime.rig_states.clone(),
poll_interval: Duration::from_millis(poll_interval_ms),
spectrum: frontend_runtime.spectrum.clone(),
rig_spectrums: frontend_runtime.rig_spectrums.clone(),
server_connected: frontend_runtime.server_connected.clone(),
};
let remote_shutdown_rx = shutdown_rx.clone();
@@ -388,7 +389,10 @@ async fn async_init() -> DynResult<AppState> {
let audio_rig_ports: HashMap<String, u16> = cfg.frontends.audio.rig_ports.clone();
let audio_shutdown_rx = shutdown_rx.clone();
let vchan_audio_map = frontend_runtime.vchan_audio.clone();
pending_audio_client = Some(tokio::spawn(audio_client::run_audio_client(
let rig_audio_rx_map = frontend_runtime.rig_audio_rx.clone();
let rig_audio_info_map = frontend_runtime.rig_audio_info.clone();
let rig_vchan_cmd_map = frontend_runtime.rig_vchan_audio_cmd.clone();
pending_audio_client = Some(tokio::spawn(audio_client::run_multi_rig_audio_manager(
remote_host,
cfg.frontends.audio.server_port,
audio_rig_ports,
@@ -403,6 +407,9 @@ async fn async_init() -> DynResult<AppState> {
vchan_audio_map,
vchan_cmd_rx,
Some(vchan_destroyed_tx),
rig_audio_rx_map,
rig_audio_info_map,
rig_vchan_cmd_map,
)));
if cfg.frontends.audio.bridge.enabled {
+113 -68
View File
@@ -61,6 +61,8 @@ pub struct RemoteClientConfig {
/// Shared flag: `true` while a TCP connection to trx-server is active.
pub server_connected: Arc<AtomicBool>,
pub rig_states: Arc<RwLock<HashMap<String, watch::Sender<RigState>>>>,
/// Per-rig spectrum watch senders, keyed by rig_id.
pub rig_spectrums: Arc<RwLock<HashMap<String, watch::Sender<SharedSpectrum>>>>,
}
pub async fn run_remote_client(
@@ -188,17 +190,38 @@ async fn handle_spectrum_connection(
}
}
_ = interval.tick() => {
if !should_poll_spectrum(config) {
// Collect rig IDs that have active spectrum subscribers.
let rig_ids = active_spectrum_rig_ids(config);
if rig_ids.is_empty() {
// No subscribers at all — clear global and skip.
config.spectrum.send_modify(|s| s.set(None, None));
continue;
}
match send_command_no_state_update(
config, &mut writer, &mut reader,
ClientCommand::GetSpectrum,
).await {
Ok(snapshot) => config
.spectrum
.send_modify(|s| s.set(snapshot.spectrum, snapshot.vchan_rds)),
// Determine the currently selected rig for backward compat.
let selected = selected_rig_id(config);
for rig_id in &rig_ids {
let envelope = ClientEnvelope {
token: config.token.clone(),
rig_id: Some(rig_id.clone()),
cmd: ClientCommand::GetSpectrum,
};
match send_envelope_no_state_update(&mut writer, &mut reader, envelope).await {
Ok(snapshot) => {
// Update per-rig channel.
if let Ok(map) = config.rig_spectrums.read() {
if let Some(tx) = map.get(rig_id) {
tx.send_modify(|s| s.set(snapshot.spectrum.clone(), snapshot.vchan_rds.clone()));
}
}
// Update global channel if this is the selected rig.
let is_selected = selected.as_deref() == Some(rig_id.as_str());
if is_selected {
config.spectrum.send_modify(|s| s.set(snapshot.spectrum, snapshot.vchan_rds));
}
}
Err(e) => {
// A spectrum timeout desynchronises the TCP framing;
// return so the caller reconnects and restores sync.
@@ -210,6 +233,7 @@ async fn handle_spectrum_connection(
}
}
}
}
async fn handle_connection(
config: &RemoteClientConfig,
@@ -331,53 +355,6 @@ async fn send_command(
))
}
/// Like `send_command` but does NOT update the main `state_tx` watch channel.
/// Used for spectrum polling to avoid triggering spurious SSE updates.
async fn send_command_no_state_update(
config: &RemoteClientConfig,
writer: &mut tokio::net::tcp::OwnedWriteHalf,
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
cmd: ClientCommand,
) -> RigResult<trx_core::RigSnapshot> {
let envelope = build_envelope(config, cmd, None);
let mut payload = serde_json::to_string(&envelope)
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
payload.push('\n');
time::timeout(SPECTRUM_IO_TIMEOUT, writer.write_all(payload.as_bytes()))
.await
.map_err(|_| {
RigError::communication(format!("write timed out after {:?}", SPECTRUM_IO_TIMEOUT))
})?
.map_err(|e| RigError::communication(format!("write failed: {e}")))?;
time::timeout(SPECTRUM_IO_TIMEOUT, writer.flush())
.await
.map_err(|_| {
RigError::communication(format!("flush timed out after {:?}", SPECTRUM_IO_TIMEOUT))
})?
.map_err(|e| RigError::communication(format!("flush failed: {e}")))?;
let line = time::timeout(
SPECTRUM_IO_TIMEOUT,
read_limited_line(reader, MAX_JSON_LINE_BYTES),
)
.await
.map_err(|_| {
RigError::communication(format!("read timed out after {:?}", SPECTRUM_IO_TIMEOUT))
})?
.map_err(|e| RigError::communication(format!("read failed: {e}")))?;
let line = line.ok_or_else(|| RigError::communication("connection closed by remote"))?;
let resp: ClientResponse = serde_json::from_str(line.trim_end())
.map_err(|e| RigError::communication(format!("invalid response: {e}")))?;
if resp.success {
if let Some(snapshot) = resp.state {
return Ok(snapshot);
}
return Err(RigError::communication("missing snapshot"));
}
Err(RigError::communication(
resp.error.unwrap_or_else(|| "remote error".into()),
))
}
fn build_envelope(
config: &RemoteClientConfig,
cmd: ClientCommand,
@@ -526,26 +503,92 @@ fn set_selected_rig_id(config: &RemoteClientConfig, value: Option<String>) {
}
}
fn should_poll_spectrum(config: &RemoteClientConfig) -> bool {
if config.spectrum.receiver_count() == 0 {
return false;
/// Send a pre-built envelope and return the snapshot without updating state.
async fn send_envelope_no_state_update(
writer: &mut tokio::net::tcp::OwnedWriteHalf,
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
envelope: ClientEnvelope,
) -> RigResult<trx_core::RigSnapshot> {
let mut payload = serde_json::to_string(&envelope)
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
payload.push('\n');
time::timeout(SPECTRUM_IO_TIMEOUT, writer.write_all(payload.as_bytes()))
.await
.map_err(|_| {
RigError::communication(format!("write timed out after {:?}", SPECTRUM_IO_TIMEOUT))
})?
.map_err(|e| RigError::communication(format!("write failed: {e}")))?;
time::timeout(SPECTRUM_IO_TIMEOUT, writer.flush())
.await
.map_err(|_| {
RigError::communication(format!("flush timed out after {:?}", SPECTRUM_IO_TIMEOUT))
})?
.map_err(|e| RigError::communication(format!("flush failed: {e}")))?;
let line = time::timeout(
SPECTRUM_IO_TIMEOUT,
read_limited_line(reader, MAX_JSON_LINE_BYTES),
)
.await
.map_err(|_| {
RigError::communication(format!("read timed out after {:?}", SPECTRUM_IO_TIMEOUT))
})?
.map_err(|e| RigError::communication(format!("read failed: {e}")))?;
let line = line.ok_or_else(|| RigError::communication("connection closed by remote"))?;
let resp: ClientResponse = serde_json::from_str(line.trim_end())
.map_err(|e| RigError::communication(format!("invalid response: {e}")))?;
if resp.success {
if let Some(snapshot) = resp.state {
return Ok(snapshot);
}
let selected = selected_rig_id(config);
let Some(selected) = selected.as_deref() else {
return true;
};
config
return Err(RigError::communication("missing snapshot"));
}
Err(RigError::communication(
resp.error.unwrap_or_else(|| "remote error".into()),
))
}
/// Collect rig IDs that have active per-rig spectrum subscribers or fall back
/// to the selected rig when only the global channel has subscribers.
fn active_spectrum_rig_ids(config: &RemoteClientConfig) -> Vec<String> {
let mut ids = Vec::new();
// Collect per-rig channels with active subscribers.
if let Ok(map) = config.rig_spectrums.read() {
for (rig_id, tx) in map.iter() {
if tx.receiver_count() > 0 {
ids.push(rig_id.clone());
}
}
}
// If global channel has subscribers but no per-rig subscriber covers the
// selected rig, add the selected rig so backward compat works.
if config.spectrum.receiver_count() > 0 {
if let Some(selected) = selected_rig_id(config) {
if !ids.contains(&selected) {
// Only add if the rig is initialized.
let initialized = config
.known_rigs
.lock()
.ok()
.and_then(|entries| {
.and_then(|entries| entries.iter().find(|e| e.rig_id == selected).cloned())
.map(|e| e.state.initialized)
.unwrap_or(true);
if initialized {
ids.push(selected);
}
}
}
}
// Filter to only initialized rigs.
if let Ok(entries) = config.known_rigs.lock() {
ids.retain(|id| {
entries
.iter()
.find(|entry| entry.rig_id == selected)
.cloned()
})
.map(|entry| entry.state.initialized)
.find(|e| &e.rig_id == id)
.map(|e| e.state.initialized)
.unwrap_or(true)
});
}
ids
}
fn choose_default_rig(rigs: &[RigEntry]) -> Option<&RigEntry> {
@@ -869,6 +912,7 @@ mod tests {
spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)),
rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
},
req_rx,
state_tx,
@@ -908,6 +952,7 @@ mod tests {
spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)),
rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
};
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
assert_eq!(envelope.token.as_deref(), Some("secret"));
+53
View File
@@ -266,6 +266,17 @@ pub struct FrontendRuntimeContext {
pub ais_vessel_url_base: Option<String>,
/// Spectrum sender; SSE clients subscribe via `spectrum.subscribe()`.
pub spectrum: Arc<watch::Sender<SharedSpectrum>>,
/// Per-rig spectrum watch channels, keyed by rig_id.
/// Populated by the remote client spectrum polling task so each SSE
/// session can subscribe to a specific rig's spectrum independently.
pub rig_spectrums: Arc<RwLock<HashMap<String, watch::Sender<SharedSpectrum>>>>,
/// Per-rig RX audio broadcast senders, keyed by rig_id.
/// Each rig's audio client task publishes Opus frames here.
pub rig_audio_rx: Arc<RwLock<HashMap<String, broadcast::Sender<Bytes>>>>,
/// Per-rig audio stream info watch channels, keyed by rig_id.
pub rig_audio_info: Arc<RwLock<HashMap<String, watch::Sender<Option<AudioStreamInfo>>>>>,
/// Per-rig virtual-channel command senders, keyed by rig_id.
pub rig_vchan_audio_cmd: Arc<RwLock<HashMap<String, mpsc::UnboundedSender<VChanAudioCmd>>>>,
/// Per-virtual-channel Opus audio senders.
/// Key: server-side virtual channel UUID.
/// Value: `broadcast::Sender<Bytes>` that receives per-channel Opus packets
@@ -293,6 +304,44 @@ impl FrontendRuntimeContext {
.and_then(|map| map.get(rig_id).map(|tx| tx.subscribe()))
}
/// Get a watch receiver for a specific rig's spectrum.
/// Lazily inserts a new channel if the rig_id is not yet present.
pub fn rig_spectrum_rx(&self, rig_id: &str) -> watch::Receiver<SharedSpectrum> {
if let Ok(map) = self.rig_spectrums.read() {
if let Some(tx) = map.get(rig_id) {
return tx.subscribe();
}
}
// Insert on miss.
if let Ok(mut map) = self.rig_spectrums.write() {
map.entry(rig_id.to_string())
.or_insert_with(|| watch::channel(SharedSpectrum::default()).0)
.subscribe()
} else {
// Poisoned lock fallback: return a dummy receiver.
watch::channel(SharedSpectrum::default()).1
}
}
/// Subscribe to a specific rig's RX audio broadcast.
pub fn rig_audio_subscribe(&self, rig_id: &str) -> Option<broadcast::Receiver<Bytes>> {
self.rig_audio_rx
.read()
.ok()
.and_then(|map| map.get(rig_id).map(|tx| tx.subscribe()))
}
/// Get a watch receiver for a specific rig's audio stream info.
pub fn rig_audio_info_rx(
&self,
rig_id: &str,
) -> Option<watch::Receiver<Option<AudioStreamInfo>>> {
self.rig_audio_info
.read()
.ok()
.and_then(|map| map.get(rig_id).map(|tx| tx.subscribe()))
}
/// Create a new empty runtime context.
pub fn new() -> Self {
Self {
@@ -338,6 +387,10 @@ impl FrontendRuntimeContext {
let (tx, _rx) = watch::channel(SharedSpectrum::default());
Arc::new(tx)
},
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_audio_rx: Arc::new(RwLock::new(HashMap::new())),
rig_audio_info: Arc::new(RwLock::new(HashMap::new())),
rig_vchan_audio_cmd: Arc::new(RwLock::new(HashMap::new())),
vchan_audio: Arc::new(RwLock::new(HashMap::new())),
vchan_audio_cmd: Arc::new(Mutex::new(None)),
vchan_destroyed: None,
@@ -3378,6 +3378,14 @@ async function switchRigFromSelect(selectEl) {
} catch (err) {
console.error("select_rig failed:", err);
}
// Reconnect spectrum SSE to the new rig's spectrum channel.
stopSpectrumStreaming();
startSpectrumStreaming();
// Reconnect audio to the new rig if audio is active.
if (rxActive) {
stopRxAudio();
startRxAudio();
}
showHint(`Rig: ${lastActiveRigId}`, 1500);
}
@@ -7497,9 +7505,14 @@ function startRxAudio() {
return;
}
const proto = location.protocol === "https:" ? "wss:" : "ws:";
const audioPath = _audioChannelOverride
? `/audio?channel_id=${encodeURIComponent(_audioChannelOverride)}`
: "/audio";
let audioPath;
if (_audioChannelOverride) {
audioPath = `/audio?channel_id=${encodeURIComponent(_audioChannelOverride)}`;
} else if (lastActiveRigId) {
audioPath = `/audio?rig_id=${encodeURIComponent(lastActiveRigId)}`;
} else {
audioPath = "/audio";
}
audioWs = new WebSocket(`${proto}//${location.host}${audioPath}`);
audioWs.binaryType = "arraybuffer";
audioStatus.textContent = "Connecting…";
@@ -8533,7 +8546,10 @@ function scheduleSpectrumReconnect() {
function startSpectrumStreaming() {
if (spectrumSource !== null) return;
spectrumSource = new EventSource("/spectrum");
const spectrumUrl = lastActiveRigId
? `/spectrum?rig_id=${encodeURIComponent(lastActiveRigId)}`
: "/spectrum";
spectrumSource = new EventSource(spectrumUrl);
// Unnamed event = reset signal.
spectrumSource.onmessage = (evt) => {
if (evt.data === "null") {
@@ -365,11 +365,7 @@ pub async fn events(
// Use the client-requested rig_id if provided, otherwise fall back to
// the global default. This allows each tab to reconnect SSE for the
// rig it has selected without mutating global state.
let active_rig_id = query
.rig_id
.clone()
.filter(|s| !s.is_empty())
.or_else(|| {
let active_rig_id = query.rig_id.clone().filter(|s| !s.is_empty()).or_else(|| {
context
.remote_active_rig_id
.lock()
@@ -804,11 +800,16 @@ impl<I> futures_util::Stream for DropStream<I> {
/// Emits an unnamed `data: null` event when spectrum data becomes unavailable.
#[get("/spectrum")]
pub async fn spectrum(
query: web::Query<RigIdQuery>,
context: web::Data<Arc<FrontendRuntimeContext>>,
) -> Result<HttpResponse, Error> {
// Subscribe to the watch channel: each client gets its own receiver and is
// woken exactly when new spectrum data is pushed (no 40 ms polling needed).
let rx = context.spectrum.subscribe();
// Subscribe to a per-rig spectrum channel when rig_id is specified,
// otherwise fall back to the global channel for backward compat.
let rx = if let Some(ref rig_id) = query.rig_id {
context.rig_spectrum_rx(rig_id)
} else {
context.spectrum.subscribe()
};
let mut last_rds_json: Option<String> = None;
let mut last_vchan_rds_json: Option<String> = None;
let mut last_had_frame = false;
@@ -475,6 +475,7 @@ pub fn start_decode_history_collector(context: Arc<FrontendRuntimeContext>) {
#[derive(Deserialize)]
pub struct AudioQuery {
pub channel_id: Option<Uuid>,
pub rig_id: Option<String>,
}
#[get("/audio")]
@@ -516,6 +517,18 @@ pub async fn audio_ws(
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
} else if let Some(ref rig_id) = query.rig_id {
// Per-rig audio: subscribe to the specific rig's broadcast.
match context.rig_audio_subscribe(rig_id) {
Some(rx) => rx,
None => {
// Rig not yet connected; fall back to global.
let Some(rx) = context.audio_rx.as_ref() else {
return Ok(HttpResponse::NotFound().body("audio not enabled"));
};
rx.subscribe()
}
}
} else {
let Some(rx) = context.audio_rx.as_ref() else {
return Ok(HttpResponse::NotFound().body("audio not enabled"));
@@ -524,6 +537,13 @@ pub async fn audio_ws(
};
let mut rx_sub = rx_sub;
// Use per-rig audio info if available and rig_id was specified.
if let Some(ref rig_id) = query.rig_id {
if let Some(rig_info_rx) = context.rig_audio_info_rx(rig_id) {
info_rx = rig_info_rx;
}
}
let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
actix_web::rt::spawn(async move {
@@ -250,6 +250,16 @@ impl BackgroundDecodeManager {
}
fn send_audio_cmd(&self, cmd: VChanAudioCmd) {
// Route through per-rig sender when available.
if let Some(rig_id) = self.active_rig_id() {
if let Ok(map) = self.context.rig_vchan_audio_cmd.read() {
if let Some(tx) = map.get(&rig_id) {
let _ = tx.send(cmd);
return;
}
}
}
// Fall back to global sender.
if let Ok(guard) = self.context.vchan_audio_cmd.lock() {
if let Some(tx) = guard.as_ref() {
let _ = tx.send(cmd);
@@ -89,7 +89,10 @@ async fn serve(
let background_decode_path = BackgroundDecodeStore::default_path();
let background_decode_store = Arc::new(BackgroundDecodeStore::open(&background_decode_path));
let vchan_mgr = Arc::new(ClientChannelManager::new(4));
let vchan_mgr = Arc::new(ClientChannelManager::new(
4,
context.rig_vchan_audio_cmd.clone(),
));
let session_rig_mgr = Arc::new(api::SessionRigManager::default());
let background_decode_mgr = BackgroundDecodeManager::new(
background_decode_store,
@@ -16,10 +16,10 @@
//! tunes a channel.
use std::collections::HashMap;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, mpsc};
use uuid::Uuid;
use trx_frontend::VChanAudioCmd;
@@ -107,12 +107,18 @@ pub struct ClientChannelManager {
/// `"<rig_id>:"` so subscribers can filter by rig.
pub change_tx: broadcast::Sender<String>,
pub max_channels: usize,
/// Optional sender to the audio-client task for virtual-channel audio commands.
pub audio_cmd: std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedSender<VChanAudioCmd>>>,
/// Global fallback sender to the audio-client task for virtual-channel audio commands.
pub audio_cmd: std::sync::Mutex<Option<mpsc::UnboundedSender<VChanAudioCmd>>>,
/// Per-rig vchan command senders. Commands are routed to the per-rig sender
/// when available, falling back to the global `audio_cmd`.
pub rig_vchan_audio_cmd: Arc<RwLock<HashMap<String, mpsc::UnboundedSender<VChanAudioCmd>>>>,
}
impl ClientChannelManager {
pub fn new(max_channels: usize) -> Self {
pub fn new(
max_channels: usize,
rig_vchan_audio_cmd: Arc<RwLock<HashMap<String, mpsc::UnboundedSender<VChanAudioCmd>>>>,
) -> Self {
let (change_tx, _) = broadcast::channel(64);
Self {
rigs: RwLock::new(HashMap::new()),
@@ -120,17 +126,26 @@ impl ClientChannelManager {
change_tx,
max_channels: max_channels.max(1),
audio_cmd: std::sync::Mutex::new(None),
rig_vchan_audio_cmd,
}
}
/// Wire the audio-command sender so the manager can dispatch
/// `VChanAudioCmd` messages when channels are allocated/deleted/changed.
pub fn set_audio_cmd(&self, tx: tokio::sync::mpsc::UnboundedSender<VChanAudioCmd>) {
/// Wire the global audio-command sender as fallback.
pub fn set_audio_cmd(&self, tx: mpsc::UnboundedSender<VChanAudioCmd>) {
*self.audio_cmd.lock().unwrap() = Some(tx);
}
/// Fire-and-forget: send a `VChanAudioCmd` to the audio-client task.
fn send_audio_cmd(&self, cmd: VChanAudioCmd) {
/// Fire-and-forget: send a `VChanAudioCmd`, routing to the per-rig sender
/// when available or falling back to the global sender.
fn send_audio_cmd_for_rig(&self, rig_id: &str, cmd: VChanAudioCmd) {
// Try per-rig sender first.
if let Ok(map) = self.rig_vchan_audio_cmd.read() {
if let Some(tx) = map.get(rig_id) {
let _ = tx.send(cmd);
return;
}
}
// Fall back to global sender.
if let Some(tx) = self.audio_cmd.lock().unwrap().as_ref() {
let _ = tx.send(cmd);
}
@@ -265,13 +280,16 @@ impl ClientChannelManager {
.insert(session_id, (rig_id.to_string(), id));
// Request server-side DSP channel + audio subscription.
self.send_audio_cmd(VChanAudioCmd::Subscribe {
self.send_audio_cmd_for_rig(
rig_id,
VChanAudioCmd::Subscribe {
uuid: id,
freq_hz,
mode: mode.to_string(),
bandwidth_hz: 0,
decoder_kinds: Vec::new(),
});
},
);
Ok(snapshot)
}
@@ -362,7 +380,7 @@ impl ClientChannelManager {
drop(rigs);
for channel_id in removed_channel_ids {
self.send_audio_cmd(VChanAudioCmd::Remove(channel_id));
self.send_audio_cmd_for_rig(rig_id, VChanAudioCmd::Remove(channel_id));
}
}
@@ -389,7 +407,7 @@ impl ClientChannelManager {
}
// Remove server-side DSP channel and stop audio encoding.
self.send_audio_cmd(VChanAudioCmd::Remove(channel_id));
self.send_audio_cmd_for_rig(rig_id, VChanAudioCmd::Remove(channel_id));
Ok(())
}
@@ -446,10 +464,13 @@ impl ClientChannelManager {
ch.freq_hz = freq_hz;
self.broadcast_change(rig_id, channels);
drop(rigs);
self.send_audio_cmd(VChanAudioCmd::SetFreq {
self.send_audio_cmd_for_rig(
rig_id,
VChanAudioCmd::SetFreq {
uuid: channel_id,
freq_hz,
});
},
);
Ok(())
}
@@ -468,10 +489,13 @@ impl ClientChannelManager {
ch.mode = mode.to_string();
self.broadcast_change(rig_id, channels);
drop(rigs);
self.send_audio_cmd(VChanAudioCmd::SetMode {
self.send_audio_cmd_for_rig(
rig_id,
VChanAudioCmd::SetMode {
uuid: channel_id,
mode: mode.to_string(),
});
},
);
Ok(())
}
@@ -490,10 +514,13 @@ impl ClientChannelManager {
ch.bandwidth_hz = bandwidth_hz;
self.broadcast_change(rig_id, channels);
drop(rigs);
self.send_audio_cmd(VChanAudioCmd::SetBandwidth {
self.send_audio_cmd_for_rig(
rig_id,
VChanAudioCmd::SetBandwidth {
uuid: channel_id,
bandwidth_hz,
});
},
);
Ok(())
}
@@ -557,7 +584,7 @@ impl ClientChannelManager {
if remove {
let channel_id = channels[idx].id;
channels.remove(idx);
self.send_audio_cmd(VChanAudioCmd::Remove(channel_id));
self.send_audio_cmd_for_rig(rig_id, VChanAudioCmd::Remove(channel_id));
changed = true;
continue;
}
@@ -574,37 +601,49 @@ impl ClientChannelManager {
};
if channel.freq_hz != *freq_hz {
channel.freq_hz = *freq_hz;
self.send_audio_cmd(VChanAudioCmd::SetFreq {
self.send_audio_cmd_for_rig(
rig_id,
VChanAudioCmd::SetFreq {
uuid: channel.id,
freq_hz: *freq_hz,
});
},
);
changed = true;
}
if channel.mode != *mode {
channel.mode = mode.clone();
self.send_audio_cmd(VChanAudioCmd::SetMode {
self.send_audio_cmd_for_rig(
rig_id,
VChanAudioCmd::SetMode {
uuid: channel.id,
mode: mode.clone(),
});
},
);
changed = true;
}
if channel.bandwidth_hz != *bandwidth_hz {
channel.bandwidth_hz = *bandwidth_hz;
self.send_audio_cmd(VChanAudioCmd::SetBandwidth {
self.send_audio_cmd_for_rig(
rig_id,
VChanAudioCmd::SetBandwidth {
uuid: channel.id,
bandwidth_hz: *bandwidth_hz,
});
},
);
changed = true;
}
if channel.decoder_kinds != *decoder_kinds {
channel.decoder_kinds = decoder_kinds.clone();
self.send_audio_cmd(VChanAudioCmd::Subscribe {
self.send_audio_cmd_for_rig(
rig_id,
VChanAudioCmd::Subscribe {
uuid: channel.id,
freq_hz: channel.freq_hz,
mode: channel.mode.clone(),
bandwidth_hz: channel.bandwidth_hz,
decoder_kinds: channel.decoder_kinds.clone(),
});
},
);
changed = true;
}
}
@@ -630,13 +669,16 @@ impl ClientChannelManager {
scheduler_bookmark_id: Some(bookmark_id.clone()),
session_ids: Vec::new(),
});
self.send_audio_cmd(VChanAudioCmd::Subscribe {
self.send_audio_cmd_for_rig(
rig_id,
VChanAudioCmd::Subscribe {
uuid: channel_id,
freq_hz: *freq_hz,
mode: mode.clone(),
bandwidth_hz: *bandwidth_hz,
decoder_kinds: decoder_kinds.clone(),
});
},
);
changed = true;
}
@@ -652,7 +694,7 @@ mod tests {
#[test]
fn release_session_removes_last_non_permanent_channel() {
let mgr = ClientChannelManager::new(4);
let mgr = ClientChannelManager::new(4, Arc::new(RwLock::new(HashMap::new())));
let rig_id = "rig-a";
let session_id = Uuid::new_v4();
@@ -673,7 +715,7 @@ mod tests {
#[test]
fn sync_scheduler_channels_materializes_visible_scheduler_channels() {
let mgr = ClientChannelManager::new(4);
let mgr = ClientChannelManager::new(4, Arc::new(RwLock::new(HashMap::new())));
let rig_id = "rig-a";
mgr.init_rig(rig_id, 14_074_000, "USB");
@@ -699,7 +741,7 @@ mod tests {
#[test]
fn release_session_keeps_scheduler_managed_channels() {
let mgr = ClientChannelManager::new(4);
let mgr = ClientChannelManager::new(4, Arc::new(RwLock::new(HashMap::new())));
let rig_id = "rig-a";
let session_id = Uuid::new_v4();
@@ -728,7 +770,7 @@ mod tests {
#[test]
fn subscribed_scheduler_channel_survives_scheduler_clear_until_released() {
let mgr = ClientChannelManager::new(4);
let mgr = ClientChannelManager::new(4, Arc::new(RwLock::new(HashMap::new())));
let rig_id = "rig-a";
let session_id = Uuid::new_v4();