[feat](trx-server): fast per-rig meter broadcast at 30 Hz

Adds a per-rig meter broadcast channel on RigHandle and threads it
through run_rig_task. SDR meter tick drops from 100 ms to 33 ms; every
tick publishes a MeterUpdate while RigState is only updated on
>=0.25 dB deltas so rigctl/JSON-TCP frontends keep working without
amplifying state churn. Listener handles SubscribeMeter by converting
the TCP connection into a one-way JSON-line stream; TCP_NODELAY is
enabled on every accepted socket for low-latency frame delivery.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-04-19 19:50:06 +02:00
parent 894b7c57be
commit b12d93fb3c
4 changed files with 122 additions and 15 deletions
+63 -5
View File
@@ -267,6 +267,10 @@ async fn handle_client(
sat_pass_cache,
timeouts,
} = ctx;
// Disable Nagle so small frames (command responses, meter samples) ship
// immediately instead of sitting in the kernel's send buffer for up to
// ~40 ms waiting for more payload.
let _ = socket.set_nodelay(true);
let (reader, mut writer) = socket.into_split();
let mut reader = BufReader::new(reader);
@@ -473,6 +477,55 @@ async fn handle_client(
}
};
// SubscribeMeter: turns this connection into a one-way meter stream.
// No regular responses are produced; the connection lives until the
// client disconnects or shutdown fires.
if matches!(envelope.cmd, ClientCommand::SubscribeMeter) {
let mut meter_rx = handle.meter_tx.subscribe();
let io_timeout = timeouts.io_timeout;
info!(
"Client {} subscribed to meter stream for rig '{}'",
addr, target_rig_id
);
loop {
tokio::select! {
sample = meter_rx.recv() => {
match sample {
Ok(update) => {
let Ok(mut line) = serde_json::to_string(&update) else { continue };
line.push('\n');
let write = time::timeout(
io_timeout,
writer.write_all(line.as_bytes()),
).await;
match write {
Ok(Ok(())) => {}
Ok(Err(e)) => {
info!("Client {} meter write failed: {}", addr, e);
break;
}
Err(_) => {
info!("Client {} meter write timed out", addr);
break;
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
changed = shutdown_rx.changed() => {
match changed {
Ok(()) if *shutdown_rx.borrow() => break,
Ok(()) => {}
Err(_) => break,
}
}
}
}
break;
}
let rig_cmd = mapping::client_command_to_rig(envelope.cmd);
// Fast path: serve GetSnapshot directly from the watch channel
// so clients get a response even while the rig task is initializing.
@@ -669,12 +722,14 @@ mod tests {
let (rig_tx, _rig_rx) = mpsc::channel::<RigRequest>(8);
let (state_tx, state_rx) = watch::channel(state);
let _state_tx = state_tx;
let (meter_tx, _) = tokio::sync::broadcast::channel(8);
let handle = RigHandle {
rig_id: "default".to_string(),
display_name: "Default Rig".to_string(),
rig_tx,
state_rx,
audio_port: 4531,
meter_tx,
};
let mut map = HashMap::new();
map.insert("default".to_string(), handle);
@@ -858,33 +913,36 @@ mod tests {
/// Build a multi-rig HashMap with two rigs having independent state and
/// command channels. Returns the map, default rig id, and the mpsc
/// receivers for each rig so tests can inspect routed commands.
fn make_two_rigs(
state_a: RigState,
state_b: RigState,
) -> (
type TwoRigs = (
Arc<HashMap<String, RigHandle>>,
String,
mpsc::Receiver<RigRequest>,
mpsc::Receiver<RigRequest>,
) {
);
fn make_two_rigs(state_a: RigState, state_b: RigState) -> TwoRigs {
let (tx_a, rx_a) = mpsc::channel::<RigRequest>(8);
let (_state_tx_a, state_rx_a) = watch::channel(state_a);
let (meter_tx_a, _) = tokio::sync::broadcast::channel(8);
let handle_a = RigHandle {
rig_id: "rig_hf".to_string(),
display_name: "HF Rig".to_string(),
rig_tx: tx_a,
state_rx: state_rx_a,
audio_port: 4531,
meter_tx: meter_tx_a,
};
let (tx_b, rx_b) = mpsc::channel::<RigRequest>(8);
let (_state_tx_b, state_rx_b) = watch::channel(state_b);
let (meter_tx_b, _) = tokio::sync::broadcast::channel(8);
let handle_b = RigHandle {
rig_id: "rig_vhf".to_string(),
display_name: "VHF Rig".to_string(),
rig_tx: tx_b,
state_rx: state_rx_b,
audio_port: 4532,
meter_tx: meter_tx_b,
};
let mut map = HashMap::new();
+12 -3
View File
@@ -1075,6 +1075,8 @@ async fn main() -> DynResult<()> {
Some("Disabled".to_string())
};
let (state_tx, state_rx) = watch::channel(initial_state);
let (meter_tx, _) =
broadcast::channel::<trx_protocol::MeterUpdate>(rig_handle::METER_BROADCAST_CAPACITY);
let mut task_config = build_rig_task_config(
rig_cfg,
@@ -1101,10 +1103,16 @@ async fn main() -> DynResult<()> {
// silently losing the rig.
let rig_shutdown_rx = shutdown_rx.clone();
let rig_id_supervisor = rig_cfg.id.clone();
let meter_tx_task = meter_tx.clone();
task_handles.push(tokio::spawn(async move {
let result =
rig_task::run_rig_task(task_config, rig_rx, state_tx.clone(), rig_shutdown_rx)
.await;
let result = rig_task::run_rig_task(
task_config,
rig_rx,
state_tx.clone(),
meter_tx_task,
rig_shutdown_rx,
)
.await;
match result {
Ok(()) => {
info!("[{}] Rig task exited cleanly", rig_id_supervisor);
@@ -1153,6 +1161,7 @@ async fn main() -> DynResult<()> {
rig_tx,
state_rx,
audio_port: rig_cfg.audio.port,
meter_tx,
},
);
}
+11 -1
View File
@@ -4,10 +4,16 @@
//! Thin handle giving the listener access to one rig's task and state.
use tokio::sync::{mpsc, watch};
use tokio::sync::{broadcast, mpsc, watch};
use trx_core::rig::request::RigRequest;
use trx_core::rig::state::RigState;
use trx_protocol::MeterUpdate;
/// Bounded broadcast capacity for the meter stream. Keeps ~0.5 s of buffered
/// samples at 30 Hz — more than enough slack to tolerate a scheduling blip
/// without forcing the producer to block or drop silently.
pub const METER_BROADCAST_CAPACITY: usize = 16;
/// A handle to a single running rig backend.
///
@@ -24,4 +30,8 @@ pub struct RigHandle {
pub state_rx: watch::Receiver<RigState>,
/// Per-rig audio listener TCP port.
pub audio_port: u16,
/// Fast per-rig meter samples published by `rig_task` at ~30 Hz (SDR) or
/// ~67 Hz (CAT). Consumed by `SubscribeMeter` clients; independent of
/// the slower `state_rx` snapshot path.
pub meter_tx: broadcast::Sender<MeterUpdate>,
}
+36 -6
View File
@@ -7,11 +7,12 @@
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, watch};
use tokio::sync::{broadcast, mpsc, watch};
use tokio::time::{self, Instant};
use tracing::{debug, error, info, warn};
use trx_backend::{RegistrationContext, RigAccess};
use trx_protocol::MeterUpdate;
use trx_core::radio::freq::Freq;
use trx_core::rig::command::RigCommand;
use trx_core::rig::controller::{
@@ -113,6 +114,7 @@ pub async fn run_rig_task(
config: RigTaskConfig,
mut rx: mpsc::Receiver<RigRequest>,
state_tx: watch::Sender<RigState>,
meter_tx: broadcast::Sender<MeterUpdate>,
mut shutdown_rx: watch::Receiver<bool>,
) -> DynResult<()> {
let histories = config.histories.clone();
@@ -255,15 +257,25 @@ pub async fn run_rig_task(
let _ = state_tx.send(state.clone());
// Run a fast meter tick between full polls to keep the S-meter
// responsive. SDR backends expose a cached DSP reading and can
// refresh every 100 ms; CAT rigs poll the serial link, which is
// slower but still fine at ~150 ms.
// responsive. SDR backends expose a cached DSP reading and can
// refresh every 33 ms (~30 Hz) for instant-feeling metering; CAT
// rigs poll the serial link, which is slower but still usable at
// ~150 ms.
//
// Every tick publishes to `meter_tx` (unconditional, for the fast
// `/meter` stream) and — gated by a small delta — to `state_tx` so
// other frontends that read `RigState.status.rx.sig` keep working
// without drowning the regular `/events` SSE in near-duplicate
// full-state frames at 30 Hz.
let is_sdr = rig.as_sdr_ref().is_some();
let meter_tick_duration = if is_sdr {
Duration::from_millis(100)
Duration::from_millis(33)
} else {
Duration::from_millis(150)
};
let meter_task_start = Instant::now();
let meter_state_delta_db: f64 = 0.25;
let rig_id = config.rig_id.clone();
let mut meter_tick: std::pin::Pin<Box<tokio::time::Sleep>> =
Box::pin(tokio::time::sleep(meter_tick_duration));
@@ -308,8 +320,26 @@ pub async fn run_rig_task(
None
};
if let Some(db) = new_sig {
// Always publish to the fast broadcast: subscribers
// see every tick, including unchanged values, so the
// UI animation stays smooth. `send` only errors when
// no subscribers exist; that's fine.
let ts_ms = meter_task_start.elapsed().as_millis() as u64;
let _ = meter_tx.send(MeterUpdate {
rig_id: rig_id.clone(),
sig_dbm: db,
ts_ms,
});
// Only push into `state_tx` when the sample moved by
// a meaningful amount — otherwise `/events` clients
// would re-serialize the entire RigState at 30 Hz.
let prev = state.status.rx.as_ref().and_then(|r| r.sig);
if prev != Some(db) {
let significant = match prev {
Some(p) => (p - db).abs() >= meter_state_delta_db,
None => true,
};
if significant {
state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(db);
let _ = state_tx.send(state.clone());
}