Compare commits

...

4 Commits

Author SHA1 Message Date
sjg f9cf95705a [feat](trx-frontend-http): /meter SSE endpoint for instant signal metering
Adds a dedicated /meter SSE stream that wraps the per-rig meter watch
and emits one compact JSON frame per update with no equality gating, so
30 Hz samples reach the browser unthrottled. Registered as a Read-access
route. app.js opens a dedicated EventSource on /meter alongside /events,
writing directly to the signal bar and value on each frame with no
requestAnimationFrame coalescing, starts/stops with connect/disconnect,
and reconnects on rig switch.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
2026-04-19 19:50:20 +02:00
sjg fd0f1e43c0 [feat](trx-client): per-rig meter supervisor with auto-reconnect
Adds rig_meters: map of per-rig watch::Sender<Option<MeterUpdate>> to
RigRoutingContext with a lazy rig_meter_rx helper. run_meter_supervisor
polls for known short names and spawns one SubscribeMeter TCP connection
per rig; reconnect loop sets TCP_NODELAY and pushes samples into the
per-rig watch so slow SSE readers automatically skip intermediate frames.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
2026-04-19 19:50:14 +02:00
sjg b12d93fb3c [feat](trx-server): fast per-rig meter broadcast at 30 Hz
Adds a per-rig meter broadcast channel on RigHandle and threads it
through run_rig_task. SDR meter tick drops from 100 ms to 33 ms; every
tick publishes a MeterUpdate while RigState is only updated on
>=0.25 dB deltas so rigctl/JSON-TCP frontends keep working without
amplifying state churn. Listener handles SubscribeMeter by converting
the TCP connection into a one-way JSON-line stream; TCP_NODELAY is
enabled on every accepted socket for low-latency frame delivery.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
2026-04-19 19:50:06 +02:00
sjg 894b7c57be [feat](trx-protocol): add SubscribeMeter command and MeterUpdate DTO
Adds a dedicated one-way JSON-line stream for high-rate signal-strength
samples so meter updates bypass full-RigState diffing in the control path.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
2026-04-19 19:49:57 +02:00
16 changed files with 487 additions and 18 deletions
Generated
+1
View File
@@ -3168,6 +3168,7 @@ dependencies = [
"serde_json",
"tokio",
"trx-core",
"trx-protocol",
"uuid",
]
+1
View File
@@ -386,6 +386,7 @@ async fn async_init() -> DynResult<AppState> {
rig_id_to_short_name,
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: frontend_runtime.routing.sat_passes.clone(),
rig_meters: frontend_runtime.routing.rig_meters.clone(),
};
let state_tx = state_tx.clone();
let remote_shutdown_rx = shutdown_rx.clone();
+191 -1
View File
@@ -20,7 +20,7 @@ use trx_core::{RigError, RigResult};
use trx_frontend::{RemoteRigEntry, SharedSpectrum};
use trx_protocol::rig_command_to_client;
use trx_protocol::types::RigEntry;
use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse};
use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse, MeterUpdate};
const DEFAULT_REMOTE_PORT: u16 = 4530;
const DEFAULT_AUDIO_PORT: u16 = 4531;
@@ -77,6 +77,9 @@ pub struct RemoteClientConfig {
pub short_name_to_rig_id: Arc<RwLock<HashMap<String, String>>>,
/// Cached satellite pass predictions from the server (GetSatPasses).
pub sat_passes: Arc<RwLock<Option<trx_core::geo::PassPredictionResult>>>,
/// Per-rig meter watch senders, keyed by short name (or rig_id in legacy mode).
/// Populated lazily by the meter-connection supervisor.
pub rig_meters: Arc<RwLock<HashMap<String, watch::Sender<Option<MeterUpdate>>>>>,
}
pub async fn run_remote_client(
@@ -88,6 +91,12 @@ pub async fn run_remote_client(
// Spectrum polling runs on its own dedicated TCP connection so it never
// blocks state polls or user commands on the main connection.
let spectrum_task = tokio::spawn(run_spectrum_connection(config.clone(), shutdown_rx.clone()));
// Meter supervisor: spawns per-rig meter-streaming TCP connections as
// soon as short names are discovered. Runs independently so the meter
// bar in the UI updates at the full server-side 30 Hz without being
// gated on state polls or user commands.
let meter_supervisor =
tokio::spawn(run_meter_supervisor(config.clone(), shutdown_rx.clone()));
let mut reconnect_delay = Duration::from_secs(1);
@@ -95,6 +104,7 @@ pub async fn run_remote_client(
if *shutdown_rx.borrow() {
info!("Remote client shutting down");
spectrum_task.abort();
meter_supervisor.abort();
return Ok(());
}
@@ -159,11 +169,13 @@ pub async fn run_remote_client(
Ok(()) if *shutdown_rx.borrow() => {
info!("Remote client shutting down");
spectrum_task.abort();
meter_supervisor.abort();
return Ok(());
}
Ok(()) => {}
Err(_) => {
spectrum_task.abort();
meter_supervisor.abort();
return Ok(());
}
}
@@ -212,6 +224,177 @@ async fn run_spectrum_connection(
}
}
/// Meter stream supervisor. Watches the set of known short names and spawns
/// one dedicated TCP connection per rig that streams `MeterUpdate` JSON lines
/// (see `trx_protocol::MeterUpdate`). Each per-rig task owns its own watch
/// sender in `config.rig_meters` and reconnects on failure.
async fn run_meter_supervisor(
config: RemoteClientConfig,
mut shutdown_rx: watch::Receiver<bool>,
) {
let mut tasks: HashMap<String, tokio::task::JoinHandle<()>> = HashMap::new();
let mut poll = time::interval(Duration::from_millis(500));
loop {
tokio::select! {
_ = poll.tick() => {}
changed = shutdown_rx.changed() => {
if matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow() {
for (_, handle) in tasks.drain() {
handle.abort();
}
return;
}
}
}
let known = collect_known_short_names(&config);
for name in &known {
if tasks.contains_key(name) {
continue;
}
// Ensure a watch sender exists so SSE clients can subscribe
// before the first sample arrives.
{
if let Ok(mut map) = config.rig_meters.write() {
map.entry(name.clone())
.or_insert_with(|| watch::channel(None).0);
}
}
let task = tokio::spawn(run_meter_connection(
config.clone(),
name.clone(),
shutdown_rx.clone(),
));
tasks.insert(name.clone(), task);
}
}
}
fn collect_known_short_names(config: &RemoteClientConfig) -> Vec<String> {
let mut names: Vec<String> = Vec::new();
if has_short_names(config) {
names.extend(config.rig_id_to_short_name.values().cloned());
}
if let Ok(map) = config.rig_states.read() {
for name in map.keys() {
if !names.iter().any(|n| n == name) {
names.push(name.clone());
}
}
}
names
}
/// One dedicated TCP connection per short-name that sends `SubscribeMeter` and
/// pumps the server's `MeterUpdate` JSON stream into the per-rig watch sender.
async fn run_meter_connection(
config: RemoteClientConfig,
short_name: String,
mut shutdown_rx: watch::Receiver<bool>,
) {
loop {
if *shutdown_rx.borrow() {
return;
}
let stream = match time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&config.addr)).await {
Ok(Ok(s)) => s,
Ok(Err(e)) => {
warn!("Meter[{}]: connect failed: {}", short_name, e);
if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await {
return;
}
continue;
}
Err(_) => {
warn!("Meter[{}]: connect timed out", short_name);
if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await {
return;
}
continue;
}
};
let _ = stream.set_nodelay(true);
if let Err(e) = stream_meter(&config, &short_name, stream, &mut shutdown_rx).await {
warn!("Meter[{}]: stream ended: {}", short_name, e);
}
if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await {
return;
}
}
}
async fn wait_reconnect(shutdown_rx: &mut watch::Receiver<bool>, delay: Duration) -> bool {
tokio::select! {
_ = time::sleep(delay) => false,
changed = shutdown_rx.changed() => {
matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow()
}
}
}
async fn stream_meter(
config: &RemoteClientConfig,
short_name: &str,
stream: TcpStream,
shutdown_rx: &mut watch::Receiver<bool>,
) -> RigResult<()> {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let envelope = build_envelope(config, ClientCommand::SubscribeMeter, Some(short_name.to_string()));
let mut payload = serde_json::to_string(&envelope)
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
payload.push('\n');
time::timeout(IO_TIMEOUT, writer.write_all(payload.as_bytes()))
.await
.map_err(|_| RigError::communication("meter subscribe write timed out".to_string()))?
.map_err(|e| RigError::communication(format!("meter subscribe write failed: {e}")))?;
time::timeout(IO_TIMEOUT, writer.flush())
.await
.map_err(|_| RigError::communication("meter subscribe flush timed out".to_string()))?
.map_err(|e| RigError::communication(format!("meter subscribe flush failed: {e}")))?;
loop {
tokio::select! {
changed = shutdown_rx.changed() => {
match changed {
Ok(()) if *shutdown_rx.borrow() => return Ok(()),
Ok(()) => {}
Err(_) => return Ok(()),
}
}
line = read_limited_line(&mut reader, MAX_JSON_LINE_BYTES) => {
let line = line
.map_err(|e| RigError::communication(format!("meter read failed: {e}")))?
.ok_or_else(|| RigError::communication("meter connection closed".to_string()))?;
let trimmed = line.trim_end();
if trimmed.is_empty() {
continue;
}
let update: MeterUpdate = match serde_json::from_str(trimmed) {
Ok(u) => u,
Err(e) => {
warn!("Meter[{}]: bad frame: {}", short_name, e);
continue;
}
};
if let Ok(map) = config.rig_meters.read() {
if let Some(tx) = map.get(short_name) {
// `watch` keeps only the latest value; slow SSE
// readers simply skip intermediate samples, which is
// exactly the desired behaviour for a meter display.
let _ = tx.send(Some(update));
}
}
}
}
}
}
/// Satellite pass prediction refresh runs on a dedicated TCP connection so it
/// never blocks state polls or user commands on the main connection.
/// Fetches immediately on connect, then every 5 minutes.
@@ -1300,6 +1483,7 @@ mod tests {
rig_id_to_short_name: HashMap::new(),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
},
req_rx,
state_tx,
@@ -1344,6 +1528,7 @@ mod tests {
rig_id_to_short_name: HashMap::new(),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
};
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
assert_eq!(envelope.token.as_deref(), Some("secret"));
@@ -1371,6 +1556,7 @@ mod tests {
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "home-hf".to_string())]),
short_name_to_rig_id,
sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
};
// selected_rig_id is "home-hf" (short name), envelope should translate to "hf"
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
@@ -1402,6 +1588,7 @@ mod tests {
rig_id_to_short_name: HashMap::new(),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
};
// Legacy mode: rig_id passes through unchanged
assert!(!has_short_names(&config));
@@ -1428,6 +1615,7 @@ mod tests {
]),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
};
assert!(has_short_names(&config));
assert_eq!(
@@ -1464,6 +1652,7 @@ mod tests {
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
};
let snapshot = sample_snapshot();
let rigs = vec![RigEntry {
@@ -1536,6 +1725,7 @@ mod tests {
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
};
let ids = super::active_spectrum_rig_ids(&config);
+1
View File
@@ -12,4 +12,5 @@ bytes = "1"
uuid = { workspace = true }
serde_json = { workspace = true }
trx-core = { path = "../../trx-core" }
trx-protocol = { path = "../../trx-protocol" }
tokio = { workspace = true, features = ["sync"] }
+25
View File
@@ -22,6 +22,7 @@ use trx_core::decode::{
};
use trx_core::rig::state::{RigSnapshot, SpectrumData};
use trx_core::{DynResult, RigRequest, RigState};
use trx_protocol::MeterUpdate;
/// Shared, timestamped decode history for a single decoder type.
///
@@ -320,6 +321,10 @@ pub struct RigRoutingContext {
pub server_connected: Arc<AtomicBool>,
/// Per-rig server connection state.
pub rig_server_connected: Arc<RwLock<HashMap<String, bool>>>,
/// Per-rig meter watch channels, keyed by rig_id. Populated lazily by
/// the meter-connection supervisor in `trx-client`; `None` on the sender
/// side means "no sample yet".
pub rig_meters: Arc<RwLock<HashMap<String, watch::Sender<Option<MeterUpdate>>>>>,
}
impl Default for RigRoutingContext {
@@ -331,6 +336,7 @@ impl Default for RigRoutingContext {
rig_states: Arc::new(RwLock::new(HashMap::new())),
server_connected: Arc::new(AtomicBool::new(false)),
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
}
}
}
@@ -447,6 +453,25 @@ impl FrontendRuntimeContext {
.and_then(|map| map.get(rig_id).map(|tx| tx.subscribe()))
}
/// Get a watch receiver for a specific rig's meter stream.
/// Lazily inserts a new channel if the rig_id is not yet present so
/// SSE clients can subscribe before the meter-connection supervisor
/// has produced a first sample.
pub fn rig_meter_rx(&self, rig_id: &str) -> watch::Receiver<Option<MeterUpdate>> {
if let Ok(map) = self.routing.rig_meters.read() {
if let Some(tx) = map.get(rig_id) {
return tx.subscribe();
}
}
if let Ok(mut map) = self.routing.rig_meters.write() {
map.entry(rig_id.to_string())
.or_insert_with(|| watch::channel(None).0)
.subscribe()
} else {
watch::channel(None).1
}
}
/// Get a watch receiver for a specific rig's spectrum.
/// Lazily inserts a new channel if the rig_id is not yet present.
pub fn rig_spectrum_rx(&self, rig_id: &str) -> watch::Receiver<SharedSpectrum> {
@@ -3700,6 +3700,8 @@ function connect() {
if (esHeartbeat) {
clearInterval(esHeartbeat);
}
stopMeterStreaming();
startMeterStreaming();
pollFreshSnapshot();
const eventsUrl = lastActiveRigId
? `/events?remote=${encodeURIComponent(lastActiveRigId)}`
@@ -3778,6 +3780,7 @@ function disconnect() {
decodeSource = null;
}
stopSpectrumStreaming();
stopMeterStreaming();
// Clear timers
if (esHeartbeat) {
clearInterval(esHeartbeat);
@@ -3900,6 +3903,9 @@ async function switchRigFromSelect(selectEl) {
// Reconnect spectrum SSE to the new rig's spectrum channel.
stopSpectrumStreaming();
startSpectrumStreaming();
// Reconnect meter SSE to the new rig's meter channel.
stopMeterStreaming();
startMeterStreaming();
// Reconnect audio to the new rig if audio is active.
if (rxActive) {
stopRxAudio();
@@ -6476,6 +6482,8 @@ const spectrumCenterLeftBtn = document.getElementById("spectrum-center-left-btn"
const spectrumCenterRightBtn = document.getElementById("spectrum-center-right-btn");
let spectrumSource = null;
let spectrumReconnectTimer = null;
let meterSource = null;
let meterReconnectTimer = null;
let spectrumDrawPending = false;
let spectrumAxisKey = "";
let spectrumDbAxisKey = "";
@@ -6996,6 +7004,63 @@ function stopSpectrumStreaming() {
clearSpectrumCanvas();
}
// ── /meter (fast signal-strength) streaming ─────────────────────────────────
// Dedicated SSE channel pushed at ~30 Hz by trx-server; bypasses /events so
// meter frames are never gated by full-RigState diffing. Synchronous DOM
// write per frame — no rAF coalescing, per user requirement that it "feel
// instant" on the frontend.
function scheduleMeterReconnect() {
if (meterReconnectTimer !== null) return;
meterReconnectTimer = setTimeout(() => {
meterReconnectTimer = null;
startMeterStreaming();
}, 1000);
}
function applyMeterSample(dbm) {
if (typeof dbm !== "number" || !Number.isFinite(dbm)) return;
prevRenderData.sigDbm = dbm;
const sUnits = dbmToSUnits(dbm);
sigLastSUnits = sUnits;
sigLastDbm = dbm;
const pct = sUnits <= 9 ? Math.max(0, Math.min(100, (sUnits / 9) * 100)) : 100;
if (signalBar) signalBar.style.width = `${pct}%`;
if (signalValue) signalValue.innerHTML = formatSignal(sUnits);
refreshSigStrengthDisplay();
}
function startMeterStreaming() {
if (meterSource !== null) return;
const url = lastActiveRigId
? `/meter?remote=${encodeURIComponent(lastActiveRigId)}`
: "/meter";
meterSource = new EventSource(url);
meterSource.onmessage = (evt) => {
try {
const { sig } = JSON.parse(evt.data);
applyMeterSample(sig);
} catch (_) {}
};
meterSource.onerror = () => {
if (meterSource) {
meterSource.close();
meterSource = null;
}
scheduleMeterReconnect();
};
}
function stopMeterStreaming() {
if (meterSource !== null) {
meterSource.close();
meterSource = null;
}
if (meterReconnectTimer !== null) {
clearTimeout(meterReconnectTimer);
meterReconnectTimer = null;
}
}
// ── Rendering ────────────────────────────────────────────────────────────────
function clearSpectrumCanvas() {
if (!spectrumCanvas || !spectrumGl || !spectrumGl.ready) return;
@@ -574,6 +574,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
// SSE streams
.service(sse::events)
.service(sse::spectrum)
.service(sse::meter)
// Decoder endpoints
.service(decoder::decoder_registry)
.service(decoder::decode_history)
@@ -19,6 +19,7 @@ use uuid::Uuid;
use trx_core::RigState;
use trx_frontend::FrontendRuntimeContext;
use trx_protocol::MeterUpdate;
use crate::server::vchan::ClientChannelManager;
@@ -337,6 +338,62 @@ pub async fn events(
.streaming(stream))
}
// ============================================================================
// /meter SSE endpoint (fast signal-strength stream, ~30 Hz)
// ============================================================================
fn encode_meter_frame(update: &MeterUpdate) -> String {
// Compact JSON: one-line SSE frame, flushed immediately.
// Shape: {"sig":-72.3,"ts":12345}
format!(
"data: {{\"sig\":{:.2},\"ts\":{}}}\n\n",
update.sig_dbm, update.ts_ms
)
}
/// SSE stream for per-rig signal-strength updates.
///
/// Pushed from the server's per-rig meter broadcast; intentionally bypasses
/// the `/events` RigState path so high-rate meter samples are never gated by
/// full-state diffing. Each watch update produces exactly one SSE frame.
#[get("/meter")]
pub async fn meter(
query: web::Query<RemoteQuery>,
context: web::Data<Arc<FrontendRuntimeContext>>,
) -> Result<HttpResponse, Error> {
let rig_id = query.remote.clone().filter(|s| !s.is_empty()).or_else(|| {
context
.routing
.active_rig_id
.lock()
.ok()
.and_then(|g| g.clone())
});
let rx = match rig_id.as_deref() {
Some(rid) => context.rig_meter_rx(rid),
None => return Ok(HttpResponse::NotFound().finish()),
};
let updates = WatchStream::new(rx).filter_map(|maybe| {
let chunk = maybe.as_ref().map(encode_meter_frame);
std::future::ready(chunk.map(|s| Ok::<Bytes, Error>(Bytes::from(s))))
});
// Infrequent keepalive comment; real meter frames carry the heartbeat.
let pings = IntervalStream::new(time::interval(Duration::from_secs(15)))
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
let stream = select(pings, updates);
Ok(HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "text/event-stream"))
.insert_header((header::CONTENT_ENCODING, "identity"))
.insert_header((header::CACHE_CONTROL, "no-cache"))
.insert_header((header::CONNECTION, "keep-alive"))
.streaming(stream))
}
// ============================================================================
// /spectrum SSE endpoint
// ============================================================================
@@ -526,6 +526,7 @@ impl RouteAccess {
|| path == "/decode"
|| path == "/decode/history"
|| path == "/spectrum"
|| path == "/meter"
|| path == "/audio"
|| path == "/bookmarks"
|| path.starts_with("/status?")
@@ -534,6 +535,7 @@ impl RouteAccess {
|| path.starts_with("/decode?")
|| path.starts_with("/decode/history?")
|| path.starts_with("/spectrum?")
|| path.starts_with("/meter?")
|| path.starts_with("/audio?")
|| path.starts_with("/bookmarks?")
|| path.starts_with("/bookmarks/")
@@ -703,6 +705,7 @@ mod tests {
assert_eq!(RouteAccess::from_path("/events"), RouteAccess::Read);
assert_eq!(RouteAccess::from_path("/decode"), RouteAccess::Read);
assert_eq!(RouteAccess::from_path("/spectrum"), RouteAccess::Read);
assert_eq!(RouteAccess::from_path("/meter"), RouteAccess::Read);
assert_eq!(RouteAccess::from_path("/audio"), RouteAccess::Read);
}
+1 -1
View File
@@ -18,4 +18,4 @@ pub use auth::{NoAuthValidator, SimpleTokenValidator, TokenValidator};
pub use codec::{mode_to_string, parse_envelope, parse_mode};
pub use decoders::{DecoderActivation, DecoderDescriptor, DECODER_REGISTRY};
pub use mapping::{client_command_to_rig, rig_command_to_client};
pub use types::{ClientCommand, ClientEnvelope, ClientResponse, RigEntry};
pub use types::{ClientCommand, ClientEnvelope, ClientResponse, MeterUpdate, RigEntry};
+1 -1
View File
@@ -102,7 +102,7 @@ macro_rules! define_command_mapping {
define_command_mapping! {
// ── Client-only variants (no RigCommand counterpart) ─────────────
client_only: GetRigs, GetSatPasses;
client_only: GetRigs, GetSatPasses, SubscribeMeter;
// ── Unit variants (no payload) ───────────────────────────────────
unit:
+18
View File
@@ -61,6 +61,24 @@ pub enum ClientCommand {
SetSamCarrierSync { enabled: bool },
SetRecorderEnabled { enabled: bool },
GetSpectrum,
/// Subscribe to a per-rig meter stream on this connection. After the
/// server receives this command, the connection becomes a one-way flow of
/// newline-delimited `MeterUpdate` JSON frames and no further commands or
/// regular responses are sent. Intended for a dedicated TCP connection.
SubscribeMeter,
}
/// Fast meter sample pushed by the server on a dedicated meter stream.
///
/// Emitted at ~30 Hz for SDR backends and ~67 Hz for CAT backends.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct MeterUpdate {
/// Rig identifier this sample belongs to.
pub rig_id: String,
/// Receive signal strength in dBm (rig/DSP-reported).
pub sig_dbm: f64,
/// Monotonic millisecond timestamp from the server's steady clock.
pub ts_ms: u64,
}
/// Envelope for client commands with optional authentication token and rig routing.
+63 -5
View File
@@ -267,6 +267,10 @@ async fn handle_client(
sat_pass_cache,
timeouts,
} = ctx;
// Disable Nagle so small frames (command responses, meter samples) ship
// immediately instead of sitting in the kernel's send buffer for up to
// ~40 ms waiting for more payload.
let _ = socket.set_nodelay(true);
let (reader, mut writer) = socket.into_split();
let mut reader = BufReader::new(reader);
@@ -473,6 +477,55 @@ async fn handle_client(
}
};
// SubscribeMeter: turns this connection into a one-way meter stream.
// No regular responses are produced; the connection lives until the
// client disconnects or shutdown fires.
if matches!(envelope.cmd, ClientCommand::SubscribeMeter) {
let mut meter_rx = handle.meter_tx.subscribe();
let io_timeout = timeouts.io_timeout;
info!(
"Client {} subscribed to meter stream for rig '{}'",
addr, target_rig_id
);
loop {
tokio::select! {
sample = meter_rx.recv() => {
match sample {
Ok(update) => {
let Ok(mut line) = serde_json::to_string(&update) else { continue };
line.push('\n');
let write = time::timeout(
io_timeout,
writer.write_all(line.as_bytes()),
).await;
match write {
Ok(Ok(())) => {}
Ok(Err(e)) => {
info!("Client {} meter write failed: {}", addr, e);
break;
}
Err(_) => {
info!("Client {} meter write timed out", addr);
break;
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
changed = shutdown_rx.changed() => {
match changed {
Ok(()) if *shutdown_rx.borrow() => break,
Ok(()) => {}
Err(_) => break,
}
}
}
}
break;
}
let rig_cmd = mapping::client_command_to_rig(envelope.cmd);
// Fast path: serve GetSnapshot directly from the watch channel
// so clients get a response even while the rig task is initializing.
@@ -669,12 +722,14 @@ mod tests {
let (rig_tx, _rig_rx) = mpsc::channel::<RigRequest>(8);
let (state_tx, state_rx) = watch::channel(state);
let _state_tx = state_tx;
let (meter_tx, _) = tokio::sync::broadcast::channel(8);
let handle = RigHandle {
rig_id: "default".to_string(),
display_name: "Default Rig".to_string(),
rig_tx,
state_rx,
audio_port: 4531,
meter_tx,
};
let mut map = HashMap::new();
map.insert("default".to_string(), handle);
@@ -858,33 +913,36 @@ mod tests {
/// Build a multi-rig HashMap with two rigs having independent state and
/// command channels. Returns the map, default rig id, and the mpsc
/// receivers for each rig so tests can inspect routed commands.
fn make_two_rigs(
state_a: RigState,
state_b: RigState,
) -> (
type TwoRigs = (
Arc<HashMap<String, RigHandle>>,
String,
mpsc::Receiver<RigRequest>,
mpsc::Receiver<RigRequest>,
) {
);
fn make_two_rigs(state_a: RigState, state_b: RigState) -> TwoRigs {
let (tx_a, rx_a) = mpsc::channel::<RigRequest>(8);
let (_state_tx_a, state_rx_a) = watch::channel(state_a);
let (meter_tx_a, _) = tokio::sync::broadcast::channel(8);
let handle_a = RigHandle {
rig_id: "rig_hf".to_string(),
display_name: "HF Rig".to_string(),
rig_tx: tx_a,
state_rx: state_rx_a,
audio_port: 4531,
meter_tx: meter_tx_a,
};
let (tx_b, rx_b) = mpsc::channel::<RigRequest>(8);
let (_state_tx_b, state_rx_b) = watch::channel(state_b);
let (meter_tx_b, _) = tokio::sync::broadcast::channel(8);
let handle_b = RigHandle {
rig_id: "rig_vhf".to_string(),
display_name: "VHF Rig".to_string(),
rig_tx: tx_b,
state_rx: state_rx_b,
audio_port: 4532,
meter_tx: meter_tx_b,
};
let mut map = HashMap::new();
+12 -3
View File
@@ -1075,6 +1075,8 @@ async fn main() -> DynResult<()> {
Some("Disabled".to_string())
};
let (state_tx, state_rx) = watch::channel(initial_state);
let (meter_tx, _) =
broadcast::channel::<trx_protocol::MeterUpdate>(rig_handle::METER_BROADCAST_CAPACITY);
let mut task_config = build_rig_task_config(
rig_cfg,
@@ -1101,10 +1103,16 @@ async fn main() -> DynResult<()> {
// silently losing the rig.
let rig_shutdown_rx = shutdown_rx.clone();
let rig_id_supervisor = rig_cfg.id.clone();
let meter_tx_task = meter_tx.clone();
task_handles.push(tokio::spawn(async move {
let result =
rig_task::run_rig_task(task_config, rig_rx, state_tx.clone(), rig_shutdown_rx)
.await;
let result = rig_task::run_rig_task(
task_config,
rig_rx,
state_tx.clone(),
meter_tx_task,
rig_shutdown_rx,
)
.await;
match result {
Ok(()) => {
info!("[{}] Rig task exited cleanly", rig_id_supervisor);
@@ -1153,6 +1161,7 @@ async fn main() -> DynResult<()> {
rig_tx,
state_rx,
audio_port: rig_cfg.audio.port,
meter_tx,
},
);
}
+11 -1
View File
@@ -4,10 +4,16 @@
//! Thin handle giving the listener access to one rig's task and state.
use tokio::sync::{mpsc, watch};
use tokio::sync::{broadcast, mpsc, watch};
use trx_core::rig::request::RigRequest;
use trx_core::rig::state::RigState;
use trx_protocol::MeterUpdate;
/// Bounded broadcast capacity for the meter stream. Keeps ~0.5 s of buffered
/// samples at 30 Hz — more than enough slack to tolerate a scheduling blip
/// without forcing the producer to block or drop silently.
pub const METER_BROADCAST_CAPACITY: usize = 16;
/// A handle to a single running rig backend.
///
@@ -24,4 +30,8 @@ pub struct RigHandle {
pub state_rx: watch::Receiver<RigState>,
/// Per-rig audio listener TCP port.
pub audio_port: u16,
/// Fast per-rig meter samples published by `rig_task` at ~30 Hz (SDR) or
/// ~67 Hz (CAT). Consumed by `SubscribeMeter` clients; independent of
/// the slower `state_rx` snapshot path.
pub meter_tx: broadcast::Sender<MeterUpdate>,
}
+36 -6
View File
@@ -7,11 +7,12 @@
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, watch};
use tokio::sync::{broadcast, mpsc, watch};
use tokio::time::{self, Instant};
use tracing::{debug, error, info, warn};
use trx_backend::{RegistrationContext, RigAccess};
use trx_protocol::MeterUpdate;
use trx_core::radio::freq::Freq;
use trx_core::rig::command::RigCommand;
use trx_core::rig::controller::{
@@ -113,6 +114,7 @@ pub async fn run_rig_task(
config: RigTaskConfig,
mut rx: mpsc::Receiver<RigRequest>,
state_tx: watch::Sender<RigState>,
meter_tx: broadcast::Sender<MeterUpdate>,
mut shutdown_rx: watch::Receiver<bool>,
) -> DynResult<()> {
let histories = config.histories.clone();
@@ -255,15 +257,25 @@ pub async fn run_rig_task(
let _ = state_tx.send(state.clone());
// Run a fast meter tick between full polls to keep the S-meter
// responsive. SDR backends expose a cached DSP reading and can
// refresh every 100 ms; CAT rigs poll the serial link, which is
// slower but still fine at ~150 ms.
// responsive. SDR backends expose a cached DSP reading and can
// refresh every 33 ms (~30 Hz) for instant-feeling metering; CAT
// rigs poll the serial link, which is slower but still usable at
// ~150 ms.
//
// Every tick publishes to `meter_tx` (unconditional, for the fast
// `/meter` stream) and — gated by a small delta — to `state_tx` so
// other frontends that read `RigState.status.rx.sig` keep working
// without drowning the regular `/events` SSE in near-duplicate
// full-state frames at 30 Hz.
let is_sdr = rig.as_sdr_ref().is_some();
let meter_tick_duration = if is_sdr {
Duration::from_millis(100)
Duration::from_millis(33)
} else {
Duration::from_millis(150)
};
let meter_task_start = Instant::now();
let meter_state_delta_db: f64 = 0.25;
let rig_id = config.rig_id.clone();
let mut meter_tick: std::pin::Pin<Box<tokio::time::Sleep>> =
Box::pin(tokio::time::sleep(meter_tick_duration));
@@ -308,8 +320,26 @@ pub async fn run_rig_task(
None
};
if let Some(db) = new_sig {
// Always publish to the fast broadcast: subscribers
// see every tick, including unchanged values, so the
// UI animation stays smooth. `send` only errors when
// no subscribers exist; that's fine.
let ts_ms = meter_task_start.elapsed().as_millis() as u64;
let _ = meter_tx.send(MeterUpdate {
rig_id: rig_id.clone(),
sig_dbm: db,
ts_ms,
});
// Only push into `state_tx` when the sample moved by
// a meaningful amount — otherwise `/events` clients
// would re-serialize the entire RigState at 30 Hz.
let prev = state.status.rx.as_ref().and_then(|r| r.sig);
if prev != Some(db) {
let significant = match prev {
Some(p) => (p - db).abs() >= meter_state_delta_db,
None => true,
};
if significant {
state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(db);
let _ = state_tx.send(state.clone());
}