feat(multi-rig): auto-discover per-rig audio ports via GetRigs
This commit is contained in:
@@ -15,6 +15,7 @@ use tokio::net::TcpStream;
|
|||||||
use tokio::sync::{broadcast, mpsc, watch};
|
use tokio::sync::{broadcast, mpsc, watch};
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
|
use trx_frontend::RemoteRigEntry;
|
||||||
|
|
||||||
use trx_core::audio::{
|
use trx_core::audio::{
|
||||||
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE,
|
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE,
|
||||||
@@ -29,6 +30,7 @@ pub async fn run_audio_client(
|
|||||||
default_port: u16,
|
default_port: u16,
|
||||||
rig_ports: HashMap<String, u16>,
|
rig_ports: HashMap<String, u16>,
|
||||||
selected_rig_id: Arc<Mutex<Option<String>>>,
|
selected_rig_id: Arc<Mutex<Option<String>>>,
|
||||||
|
known_rigs: Arc<Mutex<Vec<RemoteRigEntry>>>,
|
||||||
rx_tx: broadcast::Sender<Bytes>,
|
rx_tx: broadcast::Sender<Bytes>,
|
||||||
mut tx_rx: mpsc::Receiver<Bytes>,
|
mut tx_rx: mpsc::Receiver<Bytes>,
|
||||||
stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
|
stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
|
||||||
@@ -47,6 +49,7 @@ pub async fn run_audio_client(
|
|||||||
&server_host,
|
&server_host,
|
||||||
default_port,
|
default_port,
|
||||||
&rig_ports,
|
&rig_ports,
|
||||||
|
&known_rigs,
|
||||||
selected_rig_id
|
selected_rig_id
|
||||||
.lock()
|
.lock()
|
||||||
.ok()
|
.ok()
|
||||||
@@ -63,6 +66,7 @@ pub async fn run_audio_client(
|
|||||||
default_port,
|
default_port,
|
||||||
&rig_ports,
|
&rig_ports,
|
||||||
&selected_rig_id,
|
&selected_rig_id,
|
||||||
|
&known_rigs,
|
||||||
&server_addr,
|
&server_addr,
|
||||||
&rx_tx,
|
&rx_tx,
|
||||||
&mut tx_rx,
|
&mut tx_rx,
|
||||||
@@ -104,6 +108,7 @@ async fn handle_audio_connection(
|
|||||||
default_port: u16,
|
default_port: u16,
|
||||||
rig_ports: &HashMap<String, u16>,
|
rig_ports: &HashMap<String, u16>,
|
||||||
selected_rig_id: &Arc<Mutex<Option<String>>>,
|
selected_rig_id: &Arc<Mutex<Option<String>>>,
|
||||||
|
known_rigs: &Arc<Mutex<Vec<RemoteRigEntry>>>,
|
||||||
connected_addr: &str,
|
connected_addr: &str,
|
||||||
rx_tx: &broadcast::Sender<Bytes>,
|
rx_tx: &broadcast::Sender<Bytes>,
|
||||||
tx_rx: &mut mpsc::Receiver<Bytes>,
|
tx_rx: &mut mpsc::Receiver<Bytes>,
|
||||||
@@ -196,6 +201,7 @@ async fn handle_audio_connection(
|
|||||||
server_host,
|
server_host,
|
||||||
default_port,
|
default_port,
|
||||||
rig_ports,
|
rig_ports,
|
||||||
|
known_rigs,
|
||||||
current_rig.as_deref(),
|
current_rig.as_deref(),
|
||||||
);
|
);
|
||||||
if desired_addr != connected_addr {
|
if desired_addr != connected_addr {
|
||||||
@@ -217,11 +223,20 @@ fn resolve_audio_addr(
|
|||||||
host: &str,
|
host: &str,
|
||||||
default_port: u16,
|
default_port: u16,
|
||||||
rig_ports: &HashMap<String, u16>,
|
rig_ports: &HashMap<String, u16>,
|
||||||
|
known_rigs: &Arc<Mutex<Vec<RemoteRigEntry>>>,
|
||||||
selected_rig_id: Option<&str>,
|
selected_rig_id: Option<&str>,
|
||||||
) -> String {
|
) -> String {
|
||||||
let port = selected_rig_id
|
let port = selected_rig_id
|
||||||
.and_then(|rig_id| rig_ports.get(rig_id))
|
.and_then(|rig_id| {
|
||||||
.copied()
|
rig_ports.get(rig_id).copied().or_else(|| {
|
||||||
|
known_rigs.lock().ok().and_then(|entries| {
|
||||||
|
entries
|
||||||
|
.iter()
|
||||||
|
.find(|entry| entry.rig_id == rig_id)
|
||||||
|
.and_then(|entry| entry.audio_port)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
.unwrap_or(default_port);
|
.unwrap_or(default_port);
|
||||||
format!("{}:{}", host, port)
|
format!("{}:{}", host, port)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -295,6 +295,7 @@ async fn async_init() -> DynResult<AppState> {
|
|||||||
cfg.frontends.audio.server_port,
|
cfg.frontends.audio.server_port,
|
||||||
audio_rig_ports,
|
audio_rig_ports,
|
||||||
frontend_runtime.remote_active_rig_id.clone(),
|
frontend_runtime.remote_active_rig_id.clone(),
|
||||||
|
frontend_runtime.remote_rigs.clone(),
|
||||||
rx_audio_tx,
|
rx_audio_tx,
|
||||||
tx_audio_rx,
|
tx_audio_rx,
|
||||||
stream_info_tx,
|
stream_info_tx,
|
||||||
|
|||||||
@@ -278,6 +278,7 @@ fn cache_remote_rigs(config: &RemoteClientConfig, rigs: &[RigEntry]) {
|
|||||||
.map(|entry| RemoteRigEntry {
|
.map(|entry| RemoteRigEntry {
|
||||||
rig_id: entry.rig_id.clone(),
|
rig_id: entry.rig_id.clone(),
|
||||||
state: entry.state.clone(),
|
state: entry.state.clone(),
|
||||||
|
audio_port: entry.audio_port,
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
}
|
}
|
||||||
@@ -561,6 +562,7 @@ mod tests {
|
|||||||
rigs: Some(vec![RigEntry {
|
rigs: Some(vec![RigEntry {
|
||||||
rig_id: "default".to_string(),
|
rig_id: "default".to_string(),
|
||||||
state: snapshot.clone(),
|
state: snapshot.clone(),
|
||||||
|
audio_port: Some(4531),
|
||||||
}]),
|
}]),
|
||||||
error: None,
|
error: None,
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ use trx_core::{DynResult, RigRequest, RigState};
|
|||||||
pub struct RemoteRigEntry {
|
pub struct RemoteRigEntry {
|
||||||
pub rig_id: String,
|
pub rig_id: String,
|
||||||
pub state: RigSnapshot,
|
pub state: RigSnapshot,
|
||||||
|
pub audio_port: Option<u16>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Trait implemented by concrete frontends to expose a runner entrypoint.
|
/// Trait implemented by concrete frontends to expose a runner entrypoint.
|
||||||
|
|||||||
@@ -244,6 +244,7 @@ fn snapshot_remote_rigs(context: &FrontendRuntimeContext) -> Vec<RigEntry> {
|
|||||||
.map(|entry| RigEntry {
|
.map(|entry| RigEntry {
|
||||||
rig_id: entry.rig_id.clone(),
|
rig_id: entry.rig_id.clone(),
|
||||||
state: entry.state.clone(),
|
state: entry.state.clone(),
|
||||||
|
audio_port: entry.audio_port,
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -55,6 +55,8 @@ pub struct ClientEnvelope {
|
|||||||
pub struct RigEntry {
|
pub struct RigEntry {
|
||||||
pub rig_id: String,
|
pub rig_id: String,
|
||||||
pub state: RigSnapshot,
|
pub state: RigSnapshot,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub audio_port: Option<u16>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Response sent to network clients over TCP.
|
/// Response sent to network clients over TCP.
|
||||||
|
|||||||
@@ -244,6 +244,7 @@ async fn handle_client(
|
|||||||
entries.push(RigEntry {
|
entries.push(RigEntry {
|
||||||
rig_id: handle.rig_id.clone(),
|
rig_id: handle.rig_id.clone(),
|
||||||
state: snapshot,
|
state: snapshot,
|
||||||
|
audio_port: Some(handle.audio_port),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -303,7 +304,10 @@ async fn handle_client(
|
|||||||
match time::timeout(IO_TIMEOUT, handle.rig_tx.send(req)).await {
|
match time::timeout(IO_TIMEOUT, handle.rig_tx.send(req)).await {
|
||||||
Ok(Ok(())) => {}
|
Ok(Ok(())) => {}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
error!("Failed to send request to rig_task for '{}': {:?}", target_rig_id, e);
|
error!(
|
||||||
|
"Failed to send request to rig_task for '{}': {:?}",
|
||||||
|
target_rig_id, e
|
||||||
|
);
|
||||||
let resp = ClientResponse {
|
let resp = ClientResponse {
|
||||||
success: false,
|
success: false,
|
||||||
rig_id: Some(target_rig_id.clone()),
|
rig_id: Some(target_rig_id.clone()),
|
||||||
@@ -404,8 +408,8 @@ mod tests {
|
|||||||
|
|
||||||
use trx_core::radio::freq::Band;
|
use trx_core::radio::freq::Band;
|
||||||
use trx_core::rig::request::RigRequest;
|
use trx_core::rig::request::RigRequest;
|
||||||
use trx_core::rig::{RigAccessMethod, RigCapabilities, RigInfo};
|
|
||||||
use trx_core::rig::state::RigState;
|
use trx_core::rig::state::RigState;
|
||||||
|
use trx_core::rig::{RigAccessMethod, RigCapabilities, RigInfo};
|
||||||
|
|
||||||
fn loopback_addr() -> SocketAddr {
|
fn loopback_addr() -> SocketAddr {
|
||||||
let listener = std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).expect("bind");
|
let listener = std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).expect("bind");
|
||||||
@@ -458,6 +462,7 @@ mod tests {
|
|||||||
rig_id: "default".to_string(),
|
rig_id: "default".to_string(),
|
||||||
rig_tx,
|
rig_tx,
|
||||||
state_rx,
|
state_rx,
|
||||||
|
audio_port: 4531,
|
||||||
};
|
};
|
||||||
let mut map = HashMap::new();
|
let mut map = HashMap::new();
|
||||||
map.insert("default".to_string(), handle);
|
map.insert("default".to_string(), handle);
|
||||||
@@ -568,7 +573,11 @@ mod tests {
|
|||||||
reader.read_line(&mut line).await.expect("read");
|
reader.read_line(&mut line).await.expect("read");
|
||||||
let resp: ClientResponse = serde_json::from_str(line.trim_end()).expect("response json");
|
let resp: ClientResponse = serde_json::from_str(line.trim_end()).expect("response json");
|
||||||
assert!(!resp.success);
|
assert!(!resp.success);
|
||||||
assert!(resp.error.as_deref().unwrap_or("").contains("Unknown rig_id"));
|
assert!(resp
|
||||||
|
.error
|
||||||
|
.as_deref()
|
||||||
|
.unwrap_or("")
|
||||||
|
.contains("Unknown rig_id"));
|
||||||
|
|
||||||
let _ = shutdown_tx.send(true);
|
let _ = shutdown_tx.send(true);
|
||||||
handle.abort();
|
handle.abort();
|
||||||
|
|||||||
@@ -219,11 +219,9 @@ fn access_from_rig_instance(rig_cfg: &RigInstanceConfig) -> DynResult<RigAccess>
|
|||||||
let args = rig_cfg.rig.access.args.clone().unwrap_or_default();
|
let args = rig_cfg.rig.access.args.clone().unwrap_or_default();
|
||||||
Ok(RigAccess::Sdr { args })
|
Ok(RigAccess::Sdr { args })
|
||||||
}
|
}
|
||||||
Some(other) => Err(format!(
|
Some(other) => {
|
||||||
"Unknown access type '{}' for rig '{}'",
|
Err(format!("Unknown access type '{}' for rig '{}'", other, rig_cfg.id).into())
|
||||||
other, rig_cfg.id
|
}
|
||||||
)
|
|
||||||
.into()),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -455,7 +453,10 @@ fn spawn_rig_audio_stack(
|
|||||||
if rig_cfg.audio.rx_enabled {
|
if rig_cfg.audio.rx_enabled {
|
||||||
if let Some(mut sdr_rx) = sdr_pcm_rx {
|
if let Some(mut sdr_rx) = sdr_pcm_rx {
|
||||||
// SDR path: the backend pipeline provides demodulated PCM.
|
// SDR path: the backend pipeline provides demodulated PCM.
|
||||||
info!("[{}] using SDR audio source — cpal capture disabled", rig_cfg.id);
|
info!(
|
||||||
|
"[{}] using SDR audio source — cpal capture disabled",
|
||||||
|
rig_cfg.id
|
||||||
|
);
|
||||||
let pcm_tx_clone = pcm_tx.clone();
|
let pcm_tx_clone = pcm_tx.clone();
|
||||||
handles.push(tokio::spawn(async move {
|
handles.push(tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
@@ -661,7 +662,10 @@ async fn main() -> DynResult<()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let callsign = cli.callsign.clone().or_else(|| cfg.general.callsign.clone());
|
let callsign = cli
|
||||||
|
.callsign
|
||||||
|
.clone()
|
||||||
|
.or_else(|| cfg.general.callsign.clone());
|
||||||
(callsign, cfg.general.latitude, cfg.general.longitude)
|
(callsign, cfg.general.latitude, cfg.general.longitude)
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -799,6 +803,7 @@ async fn main() -> DynResult<()> {
|
|||||||
rig_id: rig_cfg.id.clone(),
|
rig_id: rig_cfg.id.clone(),
|
||||||
rig_tx,
|
rig_tx,
|
||||||
state_rx,
|
state_rx,
|
||||||
|
audio_port: rig_cfg.audio.port,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,4 +20,6 @@ pub struct RigHandle {
|
|||||||
pub rig_tx: mpsc::Sender<RigRequest>,
|
pub rig_tx: mpsc::Sender<RigRequest>,
|
||||||
/// Watch the latest rig state for fast GetState/GetRigs responses.
|
/// Watch the latest rig state for fast GetState/GetRigs responses.
|
||||||
pub state_rx: watch::Receiver<RigState>,
|
pub state_rx: watch::Receiver<RigState>,
|
||||||
|
/// Per-rig audio listener TCP port.
|
||||||
|
pub audio_port: u16,
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user