Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
f9cf95705a
|
|||
|
fd0f1e43c0
|
|||
|
b12d93fb3c
|
|||
|
894b7c57be
|
Generated
+1
@@ -3168,6 +3168,7 @@ dependencies = [
|
|||||||
"serde_json",
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
"trx-core",
|
"trx-core",
|
||||||
|
"trx-protocol",
|
||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -386,6 +386,7 @@ async fn async_init() -> DynResult<AppState> {
|
|||||||
rig_id_to_short_name,
|
rig_id_to_short_name,
|
||||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||||
sat_passes: frontend_runtime.routing.sat_passes.clone(),
|
sat_passes: frontend_runtime.routing.sat_passes.clone(),
|
||||||
|
rig_meters: frontend_runtime.routing.rig_meters.clone(),
|
||||||
};
|
};
|
||||||
let state_tx = state_tx.clone();
|
let state_tx = state_tx.clone();
|
||||||
let remote_shutdown_rx = shutdown_rx.clone();
|
let remote_shutdown_rx = shutdown_rx.clone();
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ use trx_core::{RigError, RigResult};
|
|||||||
use trx_frontend::{RemoteRigEntry, SharedSpectrum};
|
use trx_frontend::{RemoteRigEntry, SharedSpectrum};
|
||||||
use trx_protocol::rig_command_to_client;
|
use trx_protocol::rig_command_to_client;
|
||||||
use trx_protocol::types::RigEntry;
|
use trx_protocol::types::RigEntry;
|
||||||
use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse};
|
use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse, MeterUpdate};
|
||||||
|
|
||||||
const DEFAULT_REMOTE_PORT: u16 = 4530;
|
const DEFAULT_REMOTE_PORT: u16 = 4530;
|
||||||
const DEFAULT_AUDIO_PORT: u16 = 4531;
|
const DEFAULT_AUDIO_PORT: u16 = 4531;
|
||||||
@@ -77,6 +77,9 @@ pub struct RemoteClientConfig {
|
|||||||
pub short_name_to_rig_id: Arc<RwLock<HashMap<String, String>>>,
|
pub short_name_to_rig_id: Arc<RwLock<HashMap<String, String>>>,
|
||||||
/// Cached satellite pass predictions from the server (GetSatPasses).
|
/// Cached satellite pass predictions from the server (GetSatPasses).
|
||||||
pub sat_passes: Arc<RwLock<Option<trx_core::geo::PassPredictionResult>>>,
|
pub sat_passes: Arc<RwLock<Option<trx_core::geo::PassPredictionResult>>>,
|
||||||
|
/// Per-rig meter watch senders, keyed by short name (or rig_id in legacy mode).
|
||||||
|
/// Populated lazily by the meter-connection supervisor.
|
||||||
|
pub rig_meters: Arc<RwLock<HashMap<String, watch::Sender<Option<MeterUpdate>>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run_remote_client(
|
pub async fn run_remote_client(
|
||||||
@@ -88,6 +91,12 @@ pub async fn run_remote_client(
|
|||||||
// Spectrum polling runs on its own dedicated TCP connection so it never
|
// Spectrum polling runs on its own dedicated TCP connection so it never
|
||||||
// blocks state polls or user commands on the main connection.
|
// blocks state polls or user commands on the main connection.
|
||||||
let spectrum_task = tokio::spawn(run_spectrum_connection(config.clone(), shutdown_rx.clone()));
|
let spectrum_task = tokio::spawn(run_spectrum_connection(config.clone(), shutdown_rx.clone()));
|
||||||
|
// Meter supervisor: spawns per-rig meter-streaming TCP connections as
|
||||||
|
// soon as short names are discovered. Runs independently so the meter
|
||||||
|
// bar in the UI updates at the full server-side 30 Hz without being
|
||||||
|
// gated on state polls or user commands.
|
||||||
|
let meter_supervisor =
|
||||||
|
tokio::spawn(run_meter_supervisor(config.clone(), shutdown_rx.clone()));
|
||||||
|
|
||||||
let mut reconnect_delay = Duration::from_secs(1);
|
let mut reconnect_delay = Duration::from_secs(1);
|
||||||
|
|
||||||
@@ -95,6 +104,7 @@ pub async fn run_remote_client(
|
|||||||
if *shutdown_rx.borrow() {
|
if *shutdown_rx.borrow() {
|
||||||
info!("Remote client shutting down");
|
info!("Remote client shutting down");
|
||||||
spectrum_task.abort();
|
spectrum_task.abort();
|
||||||
|
meter_supervisor.abort();
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -159,11 +169,13 @@ pub async fn run_remote_client(
|
|||||||
Ok(()) if *shutdown_rx.borrow() => {
|
Ok(()) if *shutdown_rx.borrow() => {
|
||||||
info!("Remote client shutting down");
|
info!("Remote client shutting down");
|
||||||
spectrum_task.abort();
|
spectrum_task.abort();
|
||||||
|
meter_supervisor.abort();
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
spectrum_task.abort();
|
spectrum_task.abort();
|
||||||
|
meter_supervisor.abort();
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -212,6 +224,177 @@ async fn run_spectrum_connection(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Meter stream supervisor. Watches the set of known short names and spawns
|
||||||
|
/// one dedicated TCP connection per rig that streams `MeterUpdate` JSON lines
|
||||||
|
/// (see `trx_protocol::MeterUpdate`). Each per-rig task owns its own watch
|
||||||
|
/// sender in `config.rig_meters` and reconnects on failure.
|
||||||
|
async fn run_meter_supervisor(
|
||||||
|
config: RemoteClientConfig,
|
||||||
|
mut shutdown_rx: watch::Receiver<bool>,
|
||||||
|
) {
|
||||||
|
let mut tasks: HashMap<String, tokio::task::JoinHandle<()>> = HashMap::new();
|
||||||
|
let mut poll = time::interval(Duration::from_millis(500));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = poll.tick() => {}
|
||||||
|
changed = shutdown_rx.changed() => {
|
||||||
|
if matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow() {
|
||||||
|
for (_, handle) in tasks.drain() {
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let known = collect_known_short_names(&config);
|
||||||
|
for name in &known {
|
||||||
|
if tasks.contains_key(name) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// Ensure a watch sender exists so SSE clients can subscribe
|
||||||
|
// before the first sample arrives.
|
||||||
|
{
|
||||||
|
if let Ok(mut map) = config.rig_meters.write() {
|
||||||
|
map.entry(name.clone())
|
||||||
|
.or_insert_with(|| watch::channel(None).0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let task = tokio::spawn(run_meter_connection(
|
||||||
|
config.clone(),
|
||||||
|
name.clone(),
|
||||||
|
shutdown_rx.clone(),
|
||||||
|
));
|
||||||
|
tasks.insert(name.clone(), task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn collect_known_short_names(config: &RemoteClientConfig) -> Vec<String> {
|
||||||
|
let mut names: Vec<String> = Vec::new();
|
||||||
|
if has_short_names(config) {
|
||||||
|
names.extend(config.rig_id_to_short_name.values().cloned());
|
||||||
|
}
|
||||||
|
if let Ok(map) = config.rig_states.read() {
|
||||||
|
for name in map.keys() {
|
||||||
|
if !names.iter().any(|n| n == name) {
|
||||||
|
names.push(name.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
names
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One dedicated TCP connection per short-name that sends `SubscribeMeter` and
|
||||||
|
/// pumps the server's `MeterUpdate` JSON stream into the per-rig watch sender.
|
||||||
|
async fn run_meter_connection(
|
||||||
|
config: RemoteClientConfig,
|
||||||
|
short_name: String,
|
||||||
|
mut shutdown_rx: watch::Receiver<bool>,
|
||||||
|
) {
|
||||||
|
loop {
|
||||||
|
if *shutdown_rx.borrow() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let stream = match time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&config.addr)).await {
|
||||||
|
Ok(Ok(s)) => s,
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
warn!("Meter[{}]: connect failed: {}", short_name, e);
|
||||||
|
if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
warn!("Meter[{}]: connect timed out", short_name);
|
||||||
|
if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let _ = stream.set_nodelay(true);
|
||||||
|
|
||||||
|
if let Err(e) = stream_meter(&config, &short_name, stream, &mut shutdown_rx).await {
|
||||||
|
warn!("Meter[{}]: stream ended: {}", short_name, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn wait_reconnect(shutdown_rx: &mut watch::Receiver<bool>, delay: Duration) -> bool {
|
||||||
|
tokio::select! {
|
||||||
|
_ = time::sleep(delay) => false,
|
||||||
|
changed = shutdown_rx.changed() => {
|
||||||
|
matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn stream_meter(
|
||||||
|
config: &RemoteClientConfig,
|
||||||
|
short_name: &str,
|
||||||
|
stream: TcpStream,
|
||||||
|
shutdown_rx: &mut watch::Receiver<bool>,
|
||||||
|
) -> RigResult<()> {
|
||||||
|
let (reader, mut writer) = stream.into_split();
|
||||||
|
let mut reader = BufReader::new(reader);
|
||||||
|
|
||||||
|
let envelope = build_envelope(config, ClientCommand::SubscribeMeter, Some(short_name.to_string()));
|
||||||
|
let mut payload = serde_json::to_string(&envelope)
|
||||||
|
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
|
||||||
|
payload.push('\n');
|
||||||
|
time::timeout(IO_TIMEOUT, writer.write_all(payload.as_bytes()))
|
||||||
|
.await
|
||||||
|
.map_err(|_| RigError::communication("meter subscribe write timed out".to_string()))?
|
||||||
|
.map_err(|e| RigError::communication(format!("meter subscribe write failed: {e}")))?;
|
||||||
|
time::timeout(IO_TIMEOUT, writer.flush())
|
||||||
|
.await
|
||||||
|
.map_err(|_| RigError::communication("meter subscribe flush timed out".to_string()))?
|
||||||
|
.map_err(|e| RigError::communication(format!("meter subscribe flush failed: {e}")))?;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
changed = shutdown_rx.changed() => {
|
||||||
|
match changed {
|
||||||
|
Ok(()) if *shutdown_rx.borrow() => return Ok(()),
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(_) => return Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
line = read_limited_line(&mut reader, MAX_JSON_LINE_BYTES) => {
|
||||||
|
let line = line
|
||||||
|
.map_err(|e| RigError::communication(format!("meter read failed: {e}")))?
|
||||||
|
.ok_or_else(|| RigError::communication("meter connection closed".to_string()))?;
|
||||||
|
let trimmed = line.trim_end();
|
||||||
|
if trimmed.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let update: MeterUpdate = match serde_json::from_str(trimmed) {
|
||||||
|
Ok(u) => u,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Meter[{}]: bad frame: {}", short_name, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Ok(map) = config.rig_meters.read() {
|
||||||
|
if let Some(tx) = map.get(short_name) {
|
||||||
|
// `watch` keeps only the latest value; slow SSE
|
||||||
|
// readers simply skip intermediate samples, which is
|
||||||
|
// exactly the desired behaviour for a meter display.
|
||||||
|
let _ = tx.send(Some(update));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Satellite pass prediction refresh runs on a dedicated TCP connection so it
|
/// Satellite pass prediction refresh runs on a dedicated TCP connection so it
|
||||||
/// never blocks state polls or user commands on the main connection.
|
/// never blocks state polls or user commands on the main connection.
|
||||||
/// Fetches immediately on connect, then every 5 minutes.
|
/// Fetches immediately on connect, then every 5 minutes.
|
||||||
@@ -1300,6 +1483,7 @@ mod tests {
|
|||||||
rig_id_to_short_name: HashMap::new(),
|
rig_id_to_short_name: HashMap::new(),
|
||||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||||
sat_passes: Arc::new(RwLock::new(None)),
|
sat_passes: Arc::new(RwLock::new(None)),
|
||||||
|
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||||
},
|
},
|
||||||
req_rx,
|
req_rx,
|
||||||
state_tx,
|
state_tx,
|
||||||
@@ -1344,6 +1528,7 @@ mod tests {
|
|||||||
rig_id_to_short_name: HashMap::new(),
|
rig_id_to_short_name: HashMap::new(),
|
||||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||||
sat_passes: Arc::new(RwLock::new(None)),
|
sat_passes: Arc::new(RwLock::new(None)),
|
||||||
|
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||||
};
|
};
|
||||||
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
|
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
|
||||||
assert_eq!(envelope.token.as_deref(), Some("secret"));
|
assert_eq!(envelope.token.as_deref(), Some("secret"));
|
||||||
@@ -1371,6 +1556,7 @@ mod tests {
|
|||||||
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "home-hf".to_string())]),
|
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "home-hf".to_string())]),
|
||||||
short_name_to_rig_id,
|
short_name_to_rig_id,
|
||||||
sat_passes: Arc::new(RwLock::new(None)),
|
sat_passes: Arc::new(RwLock::new(None)),
|
||||||
|
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||||
};
|
};
|
||||||
// selected_rig_id is "home-hf" (short name), envelope should translate to "hf"
|
// selected_rig_id is "home-hf" (short name), envelope should translate to "hf"
|
||||||
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
|
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
|
||||||
@@ -1402,6 +1588,7 @@ mod tests {
|
|||||||
rig_id_to_short_name: HashMap::new(),
|
rig_id_to_short_name: HashMap::new(),
|
||||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||||
sat_passes: Arc::new(RwLock::new(None)),
|
sat_passes: Arc::new(RwLock::new(None)),
|
||||||
|
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||||
};
|
};
|
||||||
// Legacy mode: rig_id passes through unchanged
|
// Legacy mode: rig_id passes through unchanged
|
||||||
assert!(!has_short_names(&config));
|
assert!(!has_short_names(&config));
|
||||||
@@ -1428,6 +1615,7 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||||
sat_passes: Arc::new(RwLock::new(None)),
|
sat_passes: Arc::new(RwLock::new(None)),
|
||||||
|
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||||
};
|
};
|
||||||
assert!(has_short_names(&config));
|
assert!(has_short_names(&config));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
@@ -1464,6 +1652,7 @@ mod tests {
|
|||||||
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
|
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
|
||||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||||
sat_passes: Arc::new(RwLock::new(None)),
|
sat_passes: Arc::new(RwLock::new(None)),
|
||||||
|
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||||
};
|
};
|
||||||
let snapshot = sample_snapshot();
|
let snapshot = sample_snapshot();
|
||||||
let rigs = vec![RigEntry {
|
let rigs = vec![RigEntry {
|
||||||
@@ -1536,6 +1725,7 @@ mod tests {
|
|||||||
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
|
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
|
||||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||||
sat_passes: Arc::new(RwLock::new(None)),
|
sat_passes: Arc::new(RwLock::new(None)),
|
||||||
|
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||||
};
|
};
|
||||||
|
|
||||||
let ids = super::active_spectrum_rig_ids(&config);
|
let ids = super::active_spectrum_rig_ids(&config);
|
||||||
|
|||||||
@@ -12,4 +12,5 @@ bytes = "1"
|
|||||||
uuid = { workspace = true }
|
uuid = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
trx-core = { path = "../../trx-core" }
|
trx-core = { path = "../../trx-core" }
|
||||||
|
trx-protocol = { path = "../../trx-protocol" }
|
||||||
tokio = { workspace = true, features = ["sync"] }
|
tokio = { workspace = true, features = ["sync"] }
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ use trx_core::decode::{
|
|||||||
};
|
};
|
||||||
use trx_core::rig::state::{RigSnapshot, SpectrumData};
|
use trx_core::rig::state::{RigSnapshot, SpectrumData};
|
||||||
use trx_core::{DynResult, RigRequest, RigState};
|
use trx_core::{DynResult, RigRequest, RigState};
|
||||||
|
use trx_protocol::MeterUpdate;
|
||||||
|
|
||||||
/// Shared, timestamped decode history for a single decoder type.
|
/// Shared, timestamped decode history for a single decoder type.
|
||||||
///
|
///
|
||||||
@@ -320,6 +321,10 @@ pub struct RigRoutingContext {
|
|||||||
pub server_connected: Arc<AtomicBool>,
|
pub server_connected: Arc<AtomicBool>,
|
||||||
/// Per-rig server connection state.
|
/// Per-rig server connection state.
|
||||||
pub rig_server_connected: Arc<RwLock<HashMap<String, bool>>>,
|
pub rig_server_connected: Arc<RwLock<HashMap<String, bool>>>,
|
||||||
|
/// Per-rig meter watch channels, keyed by rig_id. Populated lazily by
|
||||||
|
/// the meter-connection supervisor in `trx-client`; `None` on the sender
|
||||||
|
/// side means "no sample yet".
|
||||||
|
pub rig_meters: Arc<RwLock<HashMap<String, watch::Sender<Option<MeterUpdate>>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for RigRoutingContext {
|
impl Default for RigRoutingContext {
|
||||||
@@ -331,6 +336,7 @@ impl Default for RigRoutingContext {
|
|||||||
rig_states: Arc::new(RwLock::new(HashMap::new())),
|
rig_states: Arc::new(RwLock::new(HashMap::new())),
|
||||||
server_connected: Arc::new(AtomicBool::new(false)),
|
server_connected: Arc::new(AtomicBool::new(false)),
|
||||||
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
|
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -447,6 +453,25 @@ impl FrontendRuntimeContext {
|
|||||||
.and_then(|map| map.get(rig_id).map(|tx| tx.subscribe()))
|
.and_then(|map| map.get(rig_id).map(|tx| tx.subscribe()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a watch receiver for a specific rig's meter stream.
|
||||||
|
/// Lazily inserts a new channel if the rig_id is not yet present so
|
||||||
|
/// SSE clients can subscribe before the meter-connection supervisor
|
||||||
|
/// has produced a first sample.
|
||||||
|
pub fn rig_meter_rx(&self, rig_id: &str) -> watch::Receiver<Option<MeterUpdate>> {
|
||||||
|
if let Ok(map) = self.routing.rig_meters.read() {
|
||||||
|
if let Some(tx) = map.get(rig_id) {
|
||||||
|
return tx.subscribe();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Ok(mut map) = self.routing.rig_meters.write() {
|
||||||
|
map.entry(rig_id.to_string())
|
||||||
|
.or_insert_with(|| watch::channel(None).0)
|
||||||
|
.subscribe()
|
||||||
|
} else {
|
||||||
|
watch::channel(None).1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Get a watch receiver for a specific rig's spectrum.
|
/// Get a watch receiver for a specific rig's spectrum.
|
||||||
/// Lazily inserts a new channel if the rig_id is not yet present.
|
/// Lazily inserts a new channel if the rig_id is not yet present.
|
||||||
pub fn rig_spectrum_rx(&self, rig_id: &str) -> watch::Receiver<SharedSpectrum> {
|
pub fn rig_spectrum_rx(&self, rig_id: &str) -> watch::Receiver<SharedSpectrum> {
|
||||||
|
|||||||
@@ -3700,6 +3700,8 @@ function connect() {
|
|||||||
if (esHeartbeat) {
|
if (esHeartbeat) {
|
||||||
clearInterval(esHeartbeat);
|
clearInterval(esHeartbeat);
|
||||||
}
|
}
|
||||||
|
stopMeterStreaming();
|
||||||
|
startMeterStreaming();
|
||||||
pollFreshSnapshot();
|
pollFreshSnapshot();
|
||||||
const eventsUrl = lastActiveRigId
|
const eventsUrl = lastActiveRigId
|
||||||
? `/events?remote=${encodeURIComponent(lastActiveRigId)}`
|
? `/events?remote=${encodeURIComponent(lastActiveRigId)}`
|
||||||
@@ -3778,6 +3780,7 @@ function disconnect() {
|
|||||||
decodeSource = null;
|
decodeSource = null;
|
||||||
}
|
}
|
||||||
stopSpectrumStreaming();
|
stopSpectrumStreaming();
|
||||||
|
stopMeterStreaming();
|
||||||
// Clear timers
|
// Clear timers
|
||||||
if (esHeartbeat) {
|
if (esHeartbeat) {
|
||||||
clearInterval(esHeartbeat);
|
clearInterval(esHeartbeat);
|
||||||
@@ -3900,6 +3903,9 @@ async function switchRigFromSelect(selectEl) {
|
|||||||
// Reconnect spectrum SSE to the new rig's spectrum channel.
|
// Reconnect spectrum SSE to the new rig's spectrum channel.
|
||||||
stopSpectrumStreaming();
|
stopSpectrumStreaming();
|
||||||
startSpectrumStreaming();
|
startSpectrumStreaming();
|
||||||
|
// Reconnect meter SSE to the new rig's meter channel.
|
||||||
|
stopMeterStreaming();
|
||||||
|
startMeterStreaming();
|
||||||
// Reconnect audio to the new rig if audio is active.
|
// Reconnect audio to the new rig if audio is active.
|
||||||
if (rxActive) {
|
if (rxActive) {
|
||||||
stopRxAudio();
|
stopRxAudio();
|
||||||
@@ -6476,6 +6482,8 @@ const spectrumCenterLeftBtn = document.getElementById("spectrum-center-left-btn"
|
|||||||
const spectrumCenterRightBtn = document.getElementById("spectrum-center-right-btn");
|
const spectrumCenterRightBtn = document.getElementById("spectrum-center-right-btn");
|
||||||
let spectrumSource = null;
|
let spectrumSource = null;
|
||||||
let spectrumReconnectTimer = null;
|
let spectrumReconnectTimer = null;
|
||||||
|
let meterSource = null;
|
||||||
|
let meterReconnectTimer = null;
|
||||||
let spectrumDrawPending = false;
|
let spectrumDrawPending = false;
|
||||||
let spectrumAxisKey = "";
|
let spectrumAxisKey = "";
|
||||||
let spectrumDbAxisKey = "";
|
let spectrumDbAxisKey = "";
|
||||||
@@ -6996,6 +7004,63 @@ function stopSpectrumStreaming() {
|
|||||||
clearSpectrumCanvas();
|
clearSpectrumCanvas();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── /meter (fast signal-strength) streaming ─────────────────────────────────
|
||||||
|
// Dedicated SSE channel pushed at ~30 Hz by trx-server; bypasses /events so
|
||||||
|
// meter frames are never gated by full-RigState diffing. Synchronous DOM
|
||||||
|
// write per frame — no rAF coalescing, per user requirement that it "feel
|
||||||
|
// instant" on the frontend.
|
||||||
|
function scheduleMeterReconnect() {
|
||||||
|
if (meterReconnectTimer !== null) return;
|
||||||
|
meterReconnectTimer = setTimeout(() => {
|
||||||
|
meterReconnectTimer = null;
|
||||||
|
startMeterStreaming();
|
||||||
|
}, 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
function applyMeterSample(dbm) {
|
||||||
|
if (typeof dbm !== "number" || !Number.isFinite(dbm)) return;
|
||||||
|
prevRenderData.sigDbm = dbm;
|
||||||
|
const sUnits = dbmToSUnits(dbm);
|
||||||
|
sigLastSUnits = sUnits;
|
||||||
|
sigLastDbm = dbm;
|
||||||
|
const pct = sUnits <= 9 ? Math.max(0, Math.min(100, (sUnits / 9) * 100)) : 100;
|
||||||
|
if (signalBar) signalBar.style.width = `${pct}%`;
|
||||||
|
if (signalValue) signalValue.innerHTML = formatSignal(sUnits);
|
||||||
|
refreshSigStrengthDisplay();
|
||||||
|
}
|
||||||
|
|
||||||
|
function startMeterStreaming() {
|
||||||
|
if (meterSource !== null) return;
|
||||||
|
const url = lastActiveRigId
|
||||||
|
? `/meter?remote=${encodeURIComponent(lastActiveRigId)}`
|
||||||
|
: "/meter";
|
||||||
|
meterSource = new EventSource(url);
|
||||||
|
meterSource.onmessage = (evt) => {
|
||||||
|
try {
|
||||||
|
const { sig } = JSON.parse(evt.data);
|
||||||
|
applyMeterSample(sig);
|
||||||
|
} catch (_) {}
|
||||||
|
};
|
||||||
|
meterSource.onerror = () => {
|
||||||
|
if (meterSource) {
|
||||||
|
meterSource.close();
|
||||||
|
meterSource = null;
|
||||||
|
}
|
||||||
|
scheduleMeterReconnect();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function stopMeterStreaming() {
|
||||||
|
if (meterSource !== null) {
|
||||||
|
meterSource.close();
|
||||||
|
meterSource = null;
|
||||||
|
}
|
||||||
|
if (meterReconnectTimer !== null) {
|
||||||
|
clearTimeout(meterReconnectTimer);
|
||||||
|
meterReconnectTimer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ── Rendering ────────────────────────────────────────────────────────────────
|
// ── Rendering ────────────────────────────────────────────────────────────────
|
||||||
function clearSpectrumCanvas() {
|
function clearSpectrumCanvas() {
|
||||||
if (!spectrumCanvas || !spectrumGl || !spectrumGl.ready) return;
|
if (!spectrumCanvas || !spectrumGl || !spectrumGl.ready) return;
|
||||||
|
|||||||
@@ -574,6 +574,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
|||||||
// SSE streams
|
// SSE streams
|
||||||
.service(sse::events)
|
.service(sse::events)
|
||||||
.service(sse::spectrum)
|
.service(sse::spectrum)
|
||||||
|
.service(sse::meter)
|
||||||
// Decoder endpoints
|
// Decoder endpoints
|
||||||
.service(decoder::decoder_registry)
|
.service(decoder::decoder_registry)
|
||||||
.service(decoder::decode_history)
|
.service(decoder::decode_history)
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ use uuid::Uuid;
|
|||||||
|
|
||||||
use trx_core::RigState;
|
use trx_core::RigState;
|
||||||
use trx_frontend::FrontendRuntimeContext;
|
use trx_frontend::FrontendRuntimeContext;
|
||||||
|
use trx_protocol::MeterUpdate;
|
||||||
|
|
||||||
use crate::server::vchan::ClientChannelManager;
|
use crate::server::vchan::ClientChannelManager;
|
||||||
|
|
||||||
@@ -337,6 +338,62 @@ pub async fn events(
|
|||||||
.streaming(stream))
|
.streaming(stream))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// /meter SSE endpoint (fast signal-strength stream, ~30 Hz)
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
fn encode_meter_frame(update: &MeterUpdate) -> String {
|
||||||
|
// Compact JSON: one-line SSE frame, flushed immediately.
|
||||||
|
// Shape: {"sig":-72.3,"ts":12345}
|
||||||
|
format!(
|
||||||
|
"data: {{\"sig\":{:.2},\"ts\":{}}}\n\n",
|
||||||
|
update.sig_dbm, update.ts_ms
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// SSE stream for per-rig signal-strength updates.
|
||||||
|
///
|
||||||
|
/// Pushed from the server's per-rig meter broadcast; intentionally bypasses
|
||||||
|
/// the `/events` RigState path so high-rate meter samples are never gated by
|
||||||
|
/// full-state diffing. Each watch update produces exactly one SSE frame.
|
||||||
|
#[get("/meter")]
|
||||||
|
pub async fn meter(
|
||||||
|
query: web::Query<RemoteQuery>,
|
||||||
|
context: web::Data<Arc<FrontendRuntimeContext>>,
|
||||||
|
) -> Result<HttpResponse, Error> {
|
||||||
|
let rig_id = query.remote.clone().filter(|s| !s.is_empty()).or_else(|| {
|
||||||
|
context
|
||||||
|
.routing
|
||||||
|
.active_rig_id
|
||||||
|
.lock()
|
||||||
|
.ok()
|
||||||
|
.and_then(|g| g.clone())
|
||||||
|
});
|
||||||
|
|
||||||
|
let rx = match rig_id.as_deref() {
|
||||||
|
Some(rid) => context.rig_meter_rx(rid),
|
||||||
|
None => return Ok(HttpResponse::NotFound().finish()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let updates = WatchStream::new(rx).filter_map(|maybe| {
|
||||||
|
let chunk = maybe.as_ref().map(encode_meter_frame);
|
||||||
|
std::future::ready(chunk.map(|s| Ok::<Bytes, Error>(Bytes::from(s))))
|
||||||
|
});
|
||||||
|
|
||||||
|
// Infrequent keepalive comment; real meter frames carry the heartbeat.
|
||||||
|
let pings = IntervalStream::new(time::interval(Duration::from_secs(15)))
|
||||||
|
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
|
||||||
|
|
||||||
|
let stream = select(pings, updates);
|
||||||
|
|
||||||
|
Ok(HttpResponse::Ok()
|
||||||
|
.insert_header((header::CONTENT_TYPE, "text/event-stream"))
|
||||||
|
.insert_header((header::CONTENT_ENCODING, "identity"))
|
||||||
|
.insert_header((header::CACHE_CONTROL, "no-cache"))
|
||||||
|
.insert_header((header::CONNECTION, "keep-alive"))
|
||||||
|
.streaming(stream))
|
||||||
|
}
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// /spectrum SSE endpoint
|
// /spectrum SSE endpoint
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|||||||
@@ -526,6 +526,7 @@ impl RouteAccess {
|
|||||||
|| path == "/decode"
|
|| path == "/decode"
|
||||||
|| path == "/decode/history"
|
|| path == "/decode/history"
|
||||||
|| path == "/spectrum"
|
|| path == "/spectrum"
|
||||||
|
|| path == "/meter"
|
||||||
|| path == "/audio"
|
|| path == "/audio"
|
||||||
|| path == "/bookmarks"
|
|| path == "/bookmarks"
|
||||||
|| path.starts_with("/status?")
|
|| path.starts_with("/status?")
|
||||||
@@ -534,6 +535,7 @@ impl RouteAccess {
|
|||||||
|| path.starts_with("/decode?")
|
|| path.starts_with("/decode?")
|
||||||
|| path.starts_with("/decode/history?")
|
|| path.starts_with("/decode/history?")
|
||||||
|| path.starts_with("/spectrum?")
|
|| path.starts_with("/spectrum?")
|
||||||
|
|| path.starts_with("/meter?")
|
||||||
|| path.starts_with("/audio?")
|
|| path.starts_with("/audio?")
|
||||||
|| path.starts_with("/bookmarks?")
|
|| path.starts_with("/bookmarks?")
|
||||||
|| path.starts_with("/bookmarks/")
|
|| path.starts_with("/bookmarks/")
|
||||||
@@ -703,6 +705,7 @@ mod tests {
|
|||||||
assert_eq!(RouteAccess::from_path("/events"), RouteAccess::Read);
|
assert_eq!(RouteAccess::from_path("/events"), RouteAccess::Read);
|
||||||
assert_eq!(RouteAccess::from_path("/decode"), RouteAccess::Read);
|
assert_eq!(RouteAccess::from_path("/decode"), RouteAccess::Read);
|
||||||
assert_eq!(RouteAccess::from_path("/spectrum"), RouteAccess::Read);
|
assert_eq!(RouteAccess::from_path("/spectrum"), RouteAccess::Read);
|
||||||
|
assert_eq!(RouteAccess::from_path("/meter"), RouteAccess::Read);
|
||||||
assert_eq!(RouteAccess::from_path("/audio"), RouteAccess::Read);
|
assert_eq!(RouteAccess::from_path("/audio"), RouteAccess::Read);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -18,4 +18,4 @@ pub use auth::{NoAuthValidator, SimpleTokenValidator, TokenValidator};
|
|||||||
pub use codec::{mode_to_string, parse_envelope, parse_mode};
|
pub use codec::{mode_to_string, parse_envelope, parse_mode};
|
||||||
pub use decoders::{DecoderActivation, DecoderDescriptor, DECODER_REGISTRY};
|
pub use decoders::{DecoderActivation, DecoderDescriptor, DECODER_REGISTRY};
|
||||||
pub use mapping::{client_command_to_rig, rig_command_to_client};
|
pub use mapping::{client_command_to_rig, rig_command_to_client};
|
||||||
pub use types::{ClientCommand, ClientEnvelope, ClientResponse, RigEntry};
|
pub use types::{ClientCommand, ClientEnvelope, ClientResponse, MeterUpdate, RigEntry};
|
||||||
|
|||||||
@@ -102,7 +102,7 @@ macro_rules! define_command_mapping {
|
|||||||
|
|
||||||
define_command_mapping! {
|
define_command_mapping! {
|
||||||
// ── Client-only variants (no RigCommand counterpart) ─────────────
|
// ── Client-only variants (no RigCommand counterpart) ─────────────
|
||||||
client_only: GetRigs, GetSatPasses;
|
client_only: GetRigs, GetSatPasses, SubscribeMeter;
|
||||||
|
|
||||||
// ── Unit variants (no payload) ───────────────────────────────────
|
// ── Unit variants (no payload) ───────────────────────────────────
|
||||||
unit:
|
unit:
|
||||||
|
|||||||
@@ -61,6 +61,24 @@ pub enum ClientCommand {
|
|||||||
SetSamCarrierSync { enabled: bool },
|
SetSamCarrierSync { enabled: bool },
|
||||||
SetRecorderEnabled { enabled: bool },
|
SetRecorderEnabled { enabled: bool },
|
||||||
GetSpectrum,
|
GetSpectrum,
|
||||||
|
/// Subscribe to a per-rig meter stream on this connection. After the
|
||||||
|
/// server receives this command, the connection becomes a one-way flow of
|
||||||
|
/// newline-delimited `MeterUpdate` JSON frames and no further commands or
|
||||||
|
/// regular responses are sent. Intended for a dedicated TCP connection.
|
||||||
|
SubscribeMeter,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fast meter sample pushed by the server on a dedicated meter stream.
|
||||||
|
///
|
||||||
|
/// Emitted at ~30 Hz for SDR backends and ~6–7 Hz for CAT backends.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
|
pub struct MeterUpdate {
|
||||||
|
/// Rig identifier this sample belongs to.
|
||||||
|
pub rig_id: String,
|
||||||
|
/// Receive signal strength in dBm (rig/DSP-reported).
|
||||||
|
pub sig_dbm: f64,
|
||||||
|
/// Monotonic millisecond timestamp from the server's steady clock.
|
||||||
|
pub ts_ms: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Envelope for client commands with optional authentication token and rig routing.
|
/// Envelope for client commands with optional authentication token and rig routing.
|
||||||
|
|||||||
@@ -267,6 +267,10 @@ async fn handle_client(
|
|||||||
sat_pass_cache,
|
sat_pass_cache,
|
||||||
timeouts,
|
timeouts,
|
||||||
} = ctx;
|
} = ctx;
|
||||||
|
// Disable Nagle so small frames (command responses, meter samples) ship
|
||||||
|
// immediately instead of sitting in the kernel's send buffer for up to
|
||||||
|
// ~40 ms waiting for more payload.
|
||||||
|
let _ = socket.set_nodelay(true);
|
||||||
let (reader, mut writer) = socket.into_split();
|
let (reader, mut writer) = socket.into_split();
|
||||||
let mut reader = BufReader::new(reader);
|
let mut reader = BufReader::new(reader);
|
||||||
|
|
||||||
@@ -473,6 +477,55 @@ async fn handle_client(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// SubscribeMeter: turns this connection into a one-way meter stream.
|
||||||
|
// No regular responses are produced; the connection lives until the
|
||||||
|
// client disconnects or shutdown fires.
|
||||||
|
if matches!(envelope.cmd, ClientCommand::SubscribeMeter) {
|
||||||
|
let mut meter_rx = handle.meter_tx.subscribe();
|
||||||
|
let io_timeout = timeouts.io_timeout;
|
||||||
|
info!(
|
||||||
|
"Client {} subscribed to meter stream for rig '{}'",
|
||||||
|
addr, target_rig_id
|
||||||
|
);
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
sample = meter_rx.recv() => {
|
||||||
|
match sample {
|
||||||
|
Ok(update) => {
|
||||||
|
let Ok(mut line) = serde_json::to_string(&update) else { continue };
|
||||||
|
line.push('\n');
|
||||||
|
let write = time::timeout(
|
||||||
|
io_timeout,
|
||||||
|
writer.write_all(line.as_bytes()),
|
||||||
|
).await;
|
||||||
|
match write {
|
||||||
|
Ok(Ok(())) => {}
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
info!("Client {} meter write failed: {}", addr, e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
info!("Client {} meter write timed out", addr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
changed = shutdown_rx.changed() => {
|
||||||
|
match changed {
|
||||||
|
Ok(()) if *shutdown_rx.borrow() => break,
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
let rig_cmd = mapping::client_command_to_rig(envelope.cmd);
|
let rig_cmd = mapping::client_command_to_rig(envelope.cmd);
|
||||||
// Fast path: serve GetSnapshot directly from the watch channel
|
// Fast path: serve GetSnapshot directly from the watch channel
|
||||||
// so clients get a response even while the rig task is initializing.
|
// so clients get a response even while the rig task is initializing.
|
||||||
@@ -669,12 +722,14 @@ mod tests {
|
|||||||
let (rig_tx, _rig_rx) = mpsc::channel::<RigRequest>(8);
|
let (rig_tx, _rig_rx) = mpsc::channel::<RigRequest>(8);
|
||||||
let (state_tx, state_rx) = watch::channel(state);
|
let (state_tx, state_rx) = watch::channel(state);
|
||||||
let _state_tx = state_tx;
|
let _state_tx = state_tx;
|
||||||
|
let (meter_tx, _) = tokio::sync::broadcast::channel(8);
|
||||||
let handle = RigHandle {
|
let handle = RigHandle {
|
||||||
rig_id: "default".to_string(),
|
rig_id: "default".to_string(),
|
||||||
display_name: "Default Rig".to_string(),
|
display_name: "Default Rig".to_string(),
|
||||||
rig_tx,
|
rig_tx,
|
||||||
state_rx,
|
state_rx,
|
||||||
audio_port: 4531,
|
audio_port: 4531,
|
||||||
|
meter_tx,
|
||||||
};
|
};
|
||||||
let mut map = HashMap::new();
|
let mut map = HashMap::new();
|
||||||
map.insert("default".to_string(), handle);
|
map.insert("default".to_string(), handle);
|
||||||
@@ -858,33 +913,36 @@ mod tests {
|
|||||||
/// Build a multi-rig HashMap with two rigs having independent state and
|
/// Build a multi-rig HashMap with two rigs having independent state and
|
||||||
/// command channels. Returns the map, default rig id, and the mpsc
|
/// command channels. Returns the map, default rig id, and the mpsc
|
||||||
/// receivers for each rig so tests can inspect routed commands.
|
/// receivers for each rig so tests can inspect routed commands.
|
||||||
fn make_two_rigs(
|
type TwoRigs = (
|
||||||
state_a: RigState,
|
|
||||||
state_b: RigState,
|
|
||||||
) -> (
|
|
||||||
Arc<HashMap<String, RigHandle>>,
|
Arc<HashMap<String, RigHandle>>,
|
||||||
String,
|
String,
|
||||||
mpsc::Receiver<RigRequest>,
|
mpsc::Receiver<RigRequest>,
|
||||||
mpsc::Receiver<RigRequest>,
|
mpsc::Receiver<RigRequest>,
|
||||||
) {
|
);
|
||||||
|
|
||||||
|
fn make_two_rigs(state_a: RigState, state_b: RigState) -> TwoRigs {
|
||||||
let (tx_a, rx_a) = mpsc::channel::<RigRequest>(8);
|
let (tx_a, rx_a) = mpsc::channel::<RigRequest>(8);
|
||||||
let (_state_tx_a, state_rx_a) = watch::channel(state_a);
|
let (_state_tx_a, state_rx_a) = watch::channel(state_a);
|
||||||
|
let (meter_tx_a, _) = tokio::sync::broadcast::channel(8);
|
||||||
let handle_a = RigHandle {
|
let handle_a = RigHandle {
|
||||||
rig_id: "rig_hf".to_string(),
|
rig_id: "rig_hf".to_string(),
|
||||||
display_name: "HF Rig".to_string(),
|
display_name: "HF Rig".to_string(),
|
||||||
rig_tx: tx_a,
|
rig_tx: tx_a,
|
||||||
state_rx: state_rx_a,
|
state_rx: state_rx_a,
|
||||||
audio_port: 4531,
|
audio_port: 4531,
|
||||||
|
meter_tx: meter_tx_a,
|
||||||
};
|
};
|
||||||
|
|
||||||
let (tx_b, rx_b) = mpsc::channel::<RigRequest>(8);
|
let (tx_b, rx_b) = mpsc::channel::<RigRequest>(8);
|
||||||
let (_state_tx_b, state_rx_b) = watch::channel(state_b);
|
let (_state_tx_b, state_rx_b) = watch::channel(state_b);
|
||||||
|
let (meter_tx_b, _) = tokio::sync::broadcast::channel(8);
|
||||||
let handle_b = RigHandle {
|
let handle_b = RigHandle {
|
||||||
rig_id: "rig_vhf".to_string(),
|
rig_id: "rig_vhf".to_string(),
|
||||||
display_name: "VHF Rig".to_string(),
|
display_name: "VHF Rig".to_string(),
|
||||||
rig_tx: tx_b,
|
rig_tx: tx_b,
|
||||||
state_rx: state_rx_b,
|
state_rx: state_rx_b,
|
||||||
audio_port: 4532,
|
audio_port: 4532,
|
||||||
|
meter_tx: meter_tx_b,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut map = HashMap::new();
|
let mut map = HashMap::new();
|
||||||
|
|||||||
@@ -1075,6 +1075,8 @@ async fn main() -> DynResult<()> {
|
|||||||
Some("Disabled".to_string())
|
Some("Disabled".to_string())
|
||||||
};
|
};
|
||||||
let (state_tx, state_rx) = watch::channel(initial_state);
|
let (state_tx, state_rx) = watch::channel(initial_state);
|
||||||
|
let (meter_tx, _) =
|
||||||
|
broadcast::channel::<trx_protocol::MeterUpdate>(rig_handle::METER_BROADCAST_CAPACITY);
|
||||||
|
|
||||||
let mut task_config = build_rig_task_config(
|
let mut task_config = build_rig_task_config(
|
||||||
rig_cfg,
|
rig_cfg,
|
||||||
@@ -1101,9 +1103,15 @@ async fn main() -> DynResult<()> {
|
|||||||
// silently losing the rig.
|
// silently losing the rig.
|
||||||
let rig_shutdown_rx = shutdown_rx.clone();
|
let rig_shutdown_rx = shutdown_rx.clone();
|
||||||
let rig_id_supervisor = rig_cfg.id.clone();
|
let rig_id_supervisor = rig_cfg.id.clone();
|
||||||
|
let meter_tx_task = meter_tx.clone();
|
||||||
task_handles.push(tokio::spawn(async move {
|
task_handles.push(tokio::spawn(async move {
|
||||||
let result =
|
let result = rig_task::run_rig_task(
|
||||||
rig_task::run_rig_task(task_config, rig_rx, state_tx.clone(), rig_shutdown_rx)
|
task_config,
|
||||||
|
rig_rx,
|
||||||
|
state_tx.clone(),
|
||||||
|
meter_tx_task,
|
||||||
|
rig_shutdown_rx,
|
||||||
|
)
|
||||||
.await;
|
.await;
|
||||||
match result {
|
match result {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
@@ -1153,6 +1161,7 @@ async fn main() -> DynResult<()> {
|
|||||||
rig_tx,
|
rig_tx,
|
||||||
state_rx,
|
state_rx,
|
||||||
audio_port: rig_cfg.audio.port,
|
audio_port: rig_cfg.audio.port,
|
||||||
|
meter_tx,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,10 +4,16 @@
|
|||||||
|
|
||||||
//! Thin handle giving the listener access to one rig's task and state.
|
//! Thin handle giving the listener access to one rig's task and state.
|
||||||
|
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{broadcast, mpsc, watch};
|
||||||
|
|
||||||
use trx_core::rig::request::RigRequest;
|
use trx_core::rig::request::RigRequest;
|
||||||
use trx_core::rig::state::RigState;
|
use trx_core::rig::state::RigState;
|
||||||
|
use trx_protocol::MeterUpdate;
|
||||||
|
|
||||||
|
/// Bounded broadcast capacity for the meter stream. Keeps ~0.5 s of buffered
|
||||||
|
/// samples at 30 Hz — more than enough slack to tolerate a scheduling blip
|
||||||
|
/// without forcing the producer to block or drop silently.
|
||||||
|
pub const METER_BROADCAST_CAPACITY: usize = 16;
|
||||||
|
|
||||||
/// A handle to a single running rig backend.
|
/// A handle to a single running rig backend.
|
||||||
///
|
///
|
||||||
@@ -24,4 +30,8 @@ pub struct RigHandle {
|
|||||||
pub state_rx: watch::Receiver<RigState>,
|
pub state_rx: watch::Receiver<RigState>,
|
||||||
/// Per-rig audio listener TCP port.
|
/// Per-rig audio listener TCP port.
|
||||||
pub audio_port: u16,
|
pub audio_port: u16,
|
||||||
|
/// Fast per-rig meter samples published by `rig_task` at ~30 Hz (SDR) or
|
||||||
|
/// ~6–7 Hz (CAT). Consumed by `SubscribeMeter` clients; independent of
|
||||||
|
/// the slower `state_rx` snapshot path.
|
||||||
|
pub meter_tx: broadcast::Sender<MeterUpdate>,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,11 +7,12 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{broadcast, mpsc, watch};
|
||||||
use tokio::time::{self, Instant};
|
use tokio::time::{self, Instant};
|
||||||
use tracing::{debug, error, info, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
|
|
||||||
use trx_backend::{RegistrationContext, RigAccess};
|
use trx_backend::{RegistrationContext, RigAccess};
|
||||||
|
use trx_protocol::MeterUpdate;
|
||||||
use trx_core::radio::freq::Freq;
|
use trx_core::radio::freq::Freq;
|
||||||
use trx_core::rig::command::RigCommand;
|
use trx_core::rig::command::RigCommand;
|
||||||
use trx_core::rig::controller::{
|
use trx_core::rig::controller::{
|
||||||
@@ -113,6 +114,7 @@ pub async fn run_rig_task(
|
|||||||
config: RigTaskConfig,
|
config: RigTaskConfig,
|
||||||
mut rx: mpsc::Receiver<RigRequest>,
|
mut rx: mpsc::Receiver<RigRequest>,
|
||||||
state_tx: watch::Sender<RigState>,
|
state_tx: watch::Sender<RigState>,
|
||||||
|
meter_tx: broadcast::Sender<MeterUpdate>,
|
||||||
mut shutdown_rx: watch::Receiver<bool>,
|
mut shutdown_rx: watch::Receiver<bool>,
|
||||||
) -> DynResult<()> {
|
) -> DynResult<()> {
|
||||||
let histories = config.histories.clone();
|
let histories = config.histories.clone();
|
||||||
@@ -256,14 +258,24 @@ pub async fn run_rig_task(
|
|||||||
|
|
||||||
// Run a fast meter tick between full polls to keep the S-meter
|
// Run a fast meter tick between full polls to keep the S-meter
|
||||||
// responsive. SDR backends expose a cached DSP reading and can
|
// responsive. SDR backends expose a cached DSP reading and can
|
||||||
// refresh every 100 ms; CAT rigs poll the serial link, which is
|
// refresh every 33 ms (~30 Hz) for instant-feeling metering; CAT
|
||||||
// slower but still fine at ~150 ms.
|
// rigs poll the serial link, which is slower but still usable at
|
||||||
|
// ~150 ms.
|
||||||
|
//
|
||||||
|
// Every tick publishes to `meter_tx` (unconditional, for the fast
|
||||||
|
// `/meter` stream) and — gated by a small delta — to `state_tx` so
|
||||||
|
// other frontends that read `RigState.status.rx.sig` keep working
|
||||||
|
// without drowning the regular `/events` SSE in near-duplicate
|
||||||
|
// full-state frames at 30 Hz.
|
||||||
let is_sdr = rig.as_sdr_ref().is_some();
|
let is_sdr = rig.as_sdr_ref().is_some();
|
||||||
let meter_tick_duration = if is_sdr {
|
let meter_tick_duration = if is_sdr {
|
||||||
Duration::from_millis(100)
|
Duration::from_millis(33)
|
||||||
} else {
|
} else {
|
||||||
Duration::from_millis(150)
|
Duration::from_millis(150)
|
||||||
};
|
};
|
||||||
|
let meter_task_start = Instant::now();
|
||||||
|
let meter_state_delta_db: f64 = 0.25;
|
||||||
|
let rig_id = config.rig_id.clone();
|
||||||
let mut meter_tick: std::pin::Pin<Box<tokio::time::Sleep>> =
|
let mut meter_tick: std::pin::Pin<Box<tokio::time::Sleep>> =
|
||||||
Box::pin(tokio::time::sleep(meter_tick_duration));
|
Box::pin(tokio::time::sleep(meter_tick_duration));
|
||||||
|
|
||||||
@@ -308,8 +320,26 @@ pub async fn run_rig_task(
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
if let Some(db) = new_sig {
|
if let Some(db) = new_sig {
|
||||||
|
// Always publish to the fast broadcast: subscribers
|
||||||
|
// see every tick, including unchanged values, so the
|
||||||
|
// UI animation stays smooth. `send` only errors when
|
||||||
|
// no subscribers exist; that's fine.
|
||||||
|
let ts_ms = meter_task_start.elapsed().as_millis() as u64;
|
||||||
|
let _ = meter_tx.send(MeterUpdate {
|
||||||
|
rig_id: rig_id.clone(),
|
||||||
|
sig_dbm: db,
|
||||||
|
ts_ms,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Only push into `state_tx` when the sample moved by
|
||||||
|
// a meaningful amount — otherwise `/events` clients
|
||||||
|
// would re-serialize the entire RigState at 30 Hz.
|
||||||
let prev = state.status.rx.as_ref().and_then(|r| r.sig);
|
let prev = state.status.rx.as_ref().and_then(|r| r.sig);
|
||||||
if prev != Some(db) {
|
let significant = match prev {
|
||||||
|
Some(p) => (p - db).abs() >= meter_state_delta_db,
|
||||||
|
None => true,
|
||||||
|
};
|
||||||
|
if significant {
|
||||||
state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(db);
|
state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(db);
|
||||||
let _ = state_tx.send(state.clone());
|
let _ = state_tx.send(state.clone());
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user