diff --git a/src/trx-server/src/listener.rs b/src/trx-server/src/listener.rs index 8dd08df..a9509cc 100644 --- a/src/trx-server/src/listener.rs +++ b/src/trx-server/src/listener.rs @@ -267,6 +267,10 @@ async fn handle_client( sat_pass_cache, timeouts, } = ctx; + // Disable Nagle so small frames (command responses, meter samples) ship + // immediately instead of sitting in the kernel's send buffer for up to + // ~40 ms waiting for more payload. + let _ = socket.set_nodelay(true); let (reader, mut writer) = socket.into_split(); let mut reader = BufReader::new(reader); @@ -473,6 +477,55 @@ async fn handle_client( } }; + // SubscribeMeter: turns this connection into a one-way meter stream. + // No regular responses are produced; the connection lives until the + // client disconnects or shutdown fires. + if matches!(envelope.cmd, ClientCommand::SubscribeMeter) { + let mut meter_rx = handle.meter_tx.subscribe(); + let io_timeout = timeouts.io_timeout; + info!( + "Client {} subscribed to meter stream for rig '{}'", + addr, target_rig_id + ); + loop { + tokio::select! { + sample = meter_rx.recv() => { + match sample { + Ok(update) => { + let Ok(mut line) = serde_json::to_string(&update) else { continue }; + line.push('\n'); + let write = time::timeout( + io_timeout, + writer.write_all(line.as_bytes()), + ).await; + match write { + Ok(Ok(())) => {} + Ok(Err(e)) => { + info!("Client {} meter write failed: {}", addr, e); + break; + } + Err(_) => { + info!("Client {} meter write timed out", addr); + break; + } + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => break, + Ok(()) => {} + Err(_) => break, + } + } + } + } + break; + } + let rig_cmd = mapping::client_command_to_rig(envelope.cmd); // Fast path: serve GetSnapshot directly from the watch channel // so clients get a response even while the rig task is initializing. @@ -669,12 +722,14 @@ mod tests { let (rig_tx, _rig_rx) = mpsc::channel::(8); let (state_tx, state_rx) = watch::channel(state); let _state_tx = state_tx; + let (meter_tx, _) = tokio::sync::broadcast::channel(8); let handle = RigHandle { rig_id: "default".to_string(), display_name: "Default Rig".to_string(), rig_tx, state_rx, audio_port: 4531, + meter_tx, }; let mut map = HashMap::new(); map.insert("default".to_string(), handle); @@ -858,33 +913,36 @@ mod tests { /// Build a multi-rig HashMap with two rigs having independent state and /// command channels. Returns the map, default rig id, and the mpsc /// receivers for each rig so tests can inspect routed commands. - fn make_two_rigs( - state_a: RigState, - state_b: RigState, - ) -> ( + type TwoRigs = ( Arc>, String, mpsc::Receiver, mpsc::Receiver, - ) { + ); + + fn make_two_rigs(state_a: RigState, state_b: RigState) -> TwoRigs { let (tx_a, rx_a) = mpsc::channel::(8); let (_state_tx_a, state_rx_a) = watch::channel(state_a); + let (meter_tx_a, _) = tokio::sync::broadcast::channel(8); let handle_a = RigHandle { rig_id: "rig_hf".to_string(), display_name: "HF Rig".to_string(), rig_tx: tx_a, state_rx: state_rx_a, audio_port: 4531, + meter_tx: meter_tx_a, }; let (tx_b, rx_b) = mpsc::channel::(8); let (_state_tx_b, state_rx_b) = watch::channel(state_b); + let (meter_tx_b, _) = tokio::sync::broadcast::channel(8); let handle_b = RigHandle { rig_id: "rig_vhf".to_string(), display_name: "VHF Rig".to_string(), rig_tx: tx_b, state_rx: state_rx_b, audio_port: 4532, + meter_tx: meter_tx_b, }; let mut map = HashMap::new(); diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index 1b14ce7..a1756c0 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -1075,6 +1075,8 @@ async fn main() -> DynResult<()> { Some("Disabled".to_string()) }; let (state_tx, state_rx) = watch::channel(initial_state); + let (meter_tx, _) = + broadcast::channel::(rig_handle::METER_BROADCAST_CAPACITY); let mut task_config = build_rig_task_config( rig_cfg, @@ -1101,10 +1103,16 @@ async fn main() -> DynResult<()> { // silently losing the rig. let rig_shutdown_rx = shutdown_rx.clone(); let rig_id_supervisor = rig_cfg.id.clone(); + let meter_tx_task = meter_tx.clone(); task_handles.push(tokio::spawn(async move { - let result = - rig_task::run_rig_task(task_config, rig_rx, state_tx.clone(), rig_shutdown_rx) - .await; + let result = rig_task::run_rig_task( + task_config, + rig_rx, + state_tx.clone(), + meter_tx_task, + rig_shutdown_rx, + ) + .await; match result { Ok(()) => { info!("[{}] Rig task exited cleanly", rig_id_supervisor); @@ -1153,6 +1161,7 @@ async fn main() -> DynResult<()> { rig_tx, state_rx, audio_port: rig_cfg.audio.port, + meter_tx, }, ); } diff --git a/src/trx-server/src/rig_handle.rs b/src/trx-server/src/rig_handle.rs index e12227a..c1952da 100644 --- a/src/trx-server/src/rig_handle.rs +++ b/src/trx-server/src/rig_handle.rs @@ -4,10 +4,16 @@ //! Thin handle giving the listener access to one rig's task and state. -use tokio::sync::{mpsc, watch}; +use tokio::sync::{broadcast, mpsc, watch}; use trx_core::rig::request::RigRequest; use trx_core::rig::state::RigState; +use trx_protocol::MeterUpdate; + +/// Bounded broadcast capacity for the meter stream. Keeps ~0.5 s of buffered +/// samples at 30 Hz — more than enough slack to tolerate a scheduling blip +/// without forcing the producer to block or drop silently. +pub const METER_BROADCAST_CAPACITY: usize = 16; /// A handle to a single running rig backend. /// @@ -24,4 +30,8 @@ pub struct RigHandle { pub state_rx: watch::Receiver, /// Per-rig audio listener TCP port. pub audio_port: u16, + /// Fast per-rig meter samples published by `rig_task` at ~30 Hz (SDR) or + /// ~6–7 Hz (CAT). Consumed by `SubscribeMeter` clients; independent of + /// the slower `state_rx` snapshot path. + pub meter_tx: broadcast::Sender, } diff --git a/src/trx-server/src/rig_task.rs b/src/trx-server/src/rig_task.rs index d565688..8b1d2a4 100644 --- a/src/trx-server/src/rig_task.rs +++ b/src/trx-server/src/rig_task.rs @@ -7,11 +7,12 @@ use std::sync::Arc; use std::time::Duration; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{broadcast, mpsc, watch}; use tokio::time::{self, Instant}; use tracing::{debug, error, info, warn}; use trx_backend::{RegistrationContext, RigAccess}; +use trx_protocol::MeterUpdate; use trx_core::radio::freq::Freq; use trx_core::rig::command::RigCommand; use trx_core::rig::controller::{ @@ -113,6 +114,7 @@ pub async fn run_rig_task( config: RigTaskConfig, mut rx: mpsc::Receiver, state_tx: watch::Sender, + meter_tx: broadcast::Sender, mut shutdown_rx: watch::Receiver, ) -> DynResult<()> { let histories = config.histories.clone(); @@ -255,15 +257,25 @@ pub async fn run_rig_task( let _ = state_tx.send(state.clone()); // Run a fast meter tick between full polls to keep the S-meter - // responsive. SDR backends expose a cached DSP reading and can - // refresh every 100 ms; CAT rigs poll the serial link, which is - // slower but still fine at ~150 ms. + // responsive. SDR backends expose a cached DSP reading and can + // refresh every 33 ms (~30 Hz) for instant-feeling metering; CAT + // rigs poll the serial link, which is slower but still usable at + // ~150 ms. + // + // Every tick publishes to `meter_tx` (unconditional, for the fast + // `/meter` stream) and — gated by a small delta — to `state_tx` so + // other frontends that read `RigState.status.rx.sig` keep working + // without drowning the regular `/events` SSE in near-duplicate + // full-state frames at 30 Hz. let is_sdr = rig.as_sdr_ref().is_some(); let meter_tick_duration = if is_sdr { - Duration::from_millis(100) + Duration::from_millis(33) } else { Duration::from_millis(150) }; + let meter_task_start = Instant::now(); + let meter_state_delta_db: f64 = 0.25; + let rig_id = config.rig_id.clone(); let mut meter_tick: std::pin::Pin> = Box::pin(tokio::time::sleep(meter_tick_duration)); @@ -308,8 +320,26 @@ pub async fn run_rig_task( None }; if let Some(db) = new_sig { + // Always publish to the fast broadcast: subscribers + // see every tick, including unchanged values, so the + // UI animation stays smooth. `send` only errors when + // no subscribers exist; that's fine. + let ts_ms = meter_task_start.elapsed().as_millis() as u64; + let _ = meter_tx.send(MeterUpdate { + rig_id: rig_id.clone(), + sig_dbm: db, + ts_ms, + }); + + // Only push into `state_tx` when the sample moved by + // a meaningful amount — otherwise `/events` clients + // would re-serialize the entire RigState at 30 Hz. let prev = state.status.rx.as_ref().and_then(|r| r.sig); - if prev != Some(db) { + let significant = match prev { + Some(p) => (p - db).abs() >= meter_state_delta_db, + None => true, + }; + if significant { state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(db); let _ = state_tx.send(state.clone()); }