Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
f9cf95705a
|
|||
|
fd0f1e43c0
|
|||
|
b12d93fb3c
|
|||
|
894b7c57be
|
Generated
+1
@@ -3168,6 +3168,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"trx-core",
|
||||
"trx-protocol",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
|
||||
@@ -386,6 +386,7 @@ async fn async_init() -> DynResult<AppState> {
|
||||
rig_id_to_short_name,
|
||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||
sat_passes: frontend_runtime.routing.sat_passes.clone(),
|
||||
rig_meters: frontend_runtime.routing.rig_meters.clone(),
|
||||
};
|
||||
let state_tx = state_tx.clone();
|
||||
let remote_shutdown_rx = shutdown_rx.clone();
|
||||
|
||||
@@ -20,7 +20,7 @@ use trx_core::{RigError, RigResult};
|
||||
use trx_frontend::{RemoteRigEntry, SharedSpectrum};
|
||||
use trx_protocol::rig_command_to_client;
|
||||
use trx_protocol::types::RigEntry;
|
||||
use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse};
|
||||
use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse, MeterUpdate};
|
||||
|
||||
const DEFAULT_REMOTE_PORT: u16 = 4530;
|
||||
const DEFAULT_AUDIO_PORT: u16 = 4531;
|
||||
@@ -77,6 +77,9 @@ pub struct RemoteClientConfig {
|
||||
pub short_name_to_rig_id: Arc<RwLock<HashMap<String, String>>>,
|
||||
/// Cached satellite pass predictions from the server (GetSatPasses).
|
||||
pub sat_passes: Arc<RwLock<Option<trx_core::geo::PassPredictionResult>>>,
|
||||
/// Per-rig meter watch senders, keyed by short name (or rig_id in legacy mode).
|
||||
/// Populated lazily by the meter-connection supervisor.
|
||||
pub rig_meters: Arc<RwLock<HashMap<String, watch::Sender<Option<MeterUpdate>>>>>,
|
||||
}
|
||||
|
||||
pub async fn run_remote_client(
|
||||
@@ -88,6 +91,12 @@ pub async fn run_remote_client(
|
||||
// Spectrum polling runs on its own dedicated TCP connection so it never
|
||||
// blocks state polls or user commands on the main connection.
|
||||
let spectrum_task = tokio::spawn(run_spectrum_connection(config.clone(), shutdown_rx.clone()));
|
||||
// Meter supervisor: spawns per-rig meter-streaming TCP connections as
|
||||
// soon as short names are discovered. Runs independently so the meter
|
||||
// bar in the UI updates at the full server-side 30 Hz without being
|
||||
// gated on state polls or user commands.
|
||||
let meter_supervisor =
|
||||
tokio::spawn(run_meter_supervisor(config.clone(), shutdown_rx.clone()));
|
||||
|
||||
let mut reconnect_delay = Duration::from_secs(1);
|
||||
|
||||
@@ -95,6 +104,7 @@ pub async fn run_remote_client(
|
||||
if *shutdown_rx.borrow() {
|
||||
info!("Remote client shutting down");
|
||||
spectrum_task.abort();
|
||||
meter_supervisor.abort();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -159,11 +169,13 @@ pub async fn run_remote_client(
|
||||
Ok(()) if *shutdown_rx.borrow() => {
|
||||
info!("Remote client shutting down");
|
||||
spectrum_task.abort();
|
||||
meter_supervisor.abort();
|
||||
return Ok(());
|
||||
}
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
spectrum_task.abort();
|
||||
meter_supervisor.abort();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
@@ -212,6 +224,177 @@ async fn run_spectrum_connection(
|
||||
}
|
||||
}
|
||||
|
||||
/// Meter stream supervisor. Watches the set of known short names and spawns
|
||||
/// one dedicated TCP connection per rig that streams `MeterUpdate` JSON lines
|
||||
/// (see `trx_protocol::MeterUpdate`). Each per-rig task owns its own watch
|
||||
/// sender in `config.rig_meters` and reconnects on failure.
|
||||
async fn run_meter_supervisor(
|
||||
config: RemoteClientConfig,
|
||||
mut shutdown_rx: watch::Receiver<bool>,
|
||||
) {
|
||||
let mut tasks: HashMap<String, tokio::task::JoinHandle<()>> = HashMap::new();
|
||||
let mut poll = time::interval(Duration::from_millis(500));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = poll.tick() => {}
|
||||
changed = shutdown_rx.changed() => {
|
||||
if matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow() {
|
||||
for (_, handle) in tasks.drain() {
|
||||
handle.abort();
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let known = collect_known_short_names(&config);
|
||||
for name in &known {
|
||||
if tasks.contains_key(name) {
|
||||
continue;
|
||||
}
|
||||
// Ensure a watch sender exists so SSE clients can subscribe
|
||||
// before the first sample arrives.
|
||||
{
|
||||
if let Ok(mut map) = config.rig_meters.write() {
|
||||
map.entry(name.clone())
|
||||
.or_insert_with(|| watch::channel(None).0);
|
||||
}
|
||||
}
|
||||
let task = tokio::spawn(run_meter_connection(
|
||||
config.clone(),
|
||||
name.clone(),
|
||||
shutdown_rx.clone(),
|
||||
));
|
||||
tasks.insert(name.clone(), task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn collect_known_short_names(config: &RemoteClientConfig) -> Vec<String> {
|
||||
let mut names: Vec<String> = Vec::new();
|
||||
if has_short_names(config) {
|
||||
names.extend(config.rig_id_to_short_name.values().cloned());
|
||||
}
|
||||
if let Ok(map) = config.rig_states.read() {
|
||||
for name in map.keys() {
|
||||
if !names.iter().any(|n| n == name) {
|
||||
names.push(name.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
names
|
||||
}
|
||||
|
||||
/// One dedicated TCP connection per short-name that sends `SubscribeMeter` and
|
||||
/// pumps the server's `MeterUpdate` JSON stream into the per-rig watch sender.
|
||||
async fn run_meter_connection(
|
||||
config: RemoteClientConfig,
|
||||
short_name: String,
|
||||
mut shutdown_rx: watch::Receiver<bool>,
|
||||
) {
|
||||
loop {
|
||||
if *shutdown_rx.borrow() {
|
||||
return;
|
||||
}
|
||||
|
||||
let stream = match time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&config.addr)).await {
|
||||
Ok(Ok(s)) => s,
|
||||
Ok(Err(e)) => {
|
||||
warn!("Meter[{}]: connect failed: {}", short_name, e);
|
||||
if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await {
|
||||
return;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("Meter[{}]: connect timed out", short_name);
|
||||
if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await {
|
||||
return;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let _ = stream.set_nodelay(true);
|
||||
|
||||
if let Err(e) = stream_meter(&config, &short_name, stream, &mut shutdown_rx).await {
|
||||
warn!("Meter[{}]: stream ended: {}", short_name, e);
|
||||
}
|
||||
|
||||
if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_reconnect(shutdown_rx: &mut watch::Receiver<bool>, delay: Duration) -> bool {
|
||||
tokio::select! {
|
||||
_ = time::sleep(delay) => false,
|
||||
changed = shutdown_rx.changed() => {
|
||||
matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn stream_meter(
|
||||
config: &RemoteClientConfig,
|
||||
short_name: &str,
|
||||
stream: TcpStream,
|
||||
shutdown_rx: &mut watch::Receiver<bool>,
|
||||
) -> RigResult<()> {
|
||||
let (reader, mut writer) = stream.into_split();
|
||||
let mut reader = BufReader::new(reader);
|
||||
|
||||
let envelope = build_envelope(config, ClientCommand::SubscribeMeter, Some(short_name.to_string()));
|
||||
let mut payload = serde_json::to_string(&envelope)
|
||||
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
|
||||
payload.push('\n');
|
||||
time::timeout(IO_TIMEOUT, writer.write_all(payload.as_bytes()))
|
||||
.await
|
||||
.map_err(|_| RigError::communication("meter subscribe write timed out".to_string()))?
|
||||
.map_err(|e| RigError::communication(format!("meter subscribe write failed: {e}")))?;
|
||||
time::timeout(IO_TIMEOUT, writer.flush())
|
||||
.await
|
||||
.map_err(|_| RigError::communication("meter subscribe flush timed out".to_string()))?
|
||||
.map_err(|e| RigError::communication(format!("meter subscribe flush failed: {e}")))?;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
changed = shutdown_rx.changed() => {
|
||||
match changed {
|
||||
Ok(()) if *shutdown_rx.borrow() => return Ok(()),
|
||||
Ok(()) => {}
|
||||
Err(_) => return Ok(()),
|
||||
}
|
||||
}
|
||||
line = read_limited_line(&mut reader, MAX_JSON_LINE_BYTES) => {
|
||||
let line = line
|
||||
.map_err(|e| RigError::communication(format!("meter read failed: {e}")))?
|
||||
.ok_or_else(|| RigError::communication("meter connection closed".to_string()))?;
|
||||
let trimmed = line.trim_end();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let update: MeterUpdate = match serde_json::from_str(trimmed) {
|
||||
Ok(u) => u,
|
||||
Err(e) => {
|
||||
warn!("Meter[{}]: bad frame: {}", short_name, e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if let Ok(map) = config.rig_meters.read() {
|
||||
if let Some(tx) = map.get(short_name) {
|
||||
// `watch` keeps only the latest value; slow SSE
|
||||
// readers simply skip intermediate samples, which is
|
||||
// exactly the desired behaviour for a meter display.
|
||||
let _ = tx.send(Some(update));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Satellite pass prediction refresh runs on a dedicated TCP connection so it
|
||||
/// never blocks state polls or user commands on the main connection.
|
||||
/// Fetches immediately on connect, then every 5 minutes.
|
||||
@@ -1300,6 +1483,7 @@ mod tests {
|
||||
rig_id_to_short_name: HashMap::new(),
|
||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||
sat_passes: Arc::new(RwLock::new(None)),
|
||||
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||
},
|
||||
req_rx,
|
||||
state_tx,
|
||||
@@ -1344,6 +1528,7 @@ mod tests {
|
||||
rig_id_to_short_name: HashMap::new(),
|
||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||
sat_passes: Arc::new(RwLock::new(None)),
|
||||
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||
};
|
||||
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
|
||||
assert_eq!(envelope.token.as_deref(), Some("secret"));
|
||||
@@ -1371,6 +1556,7 @@ mod tests {
|
||||
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "home-hf".to_string())]),
|
||||
short_name_to_rig_id,
|
||||
sat_passes: Arc::new(RwLock::new(None)),
|
||||
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||
};
|
||||
// selected_rig_id is "home-hf" (short name), envelope should translate to "hf"
|
||||
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
|
||||
@@ -1402,6 +1588,7 @@ mod tests {
|
||||
rig_id_to_short_name: HashMap::new(),
|
||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||
sat_passes: Arc::new(RwLock::new(None)),
|
||||
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||
};
|
||||
// Legacy mode: rig_id passes through unchanged
|
||||
assert!(!has_short_names(&config));
|
||||
@@ -1428,6 +1615,7 @@ mod tests {
|
||||
]),
|
||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||
sat_passes: Arc::new(RwLock::new(None)),
|
||||
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||
};
|
||||
assert!(has_short_names(&config));
|
||||
assert_eq!(
|
||||
@@ -1464,6 +1652,7 @@ mod tests {
|
||||
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
|
||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||
sat_passes: Arc::new(RwLock::new(None)),
|
||||
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||
};
|
||||
let snapshot = sample_snapshot();
|
||||
let rigs = vec![RigEntry {
|
||||
@@ -1536,6 +1725,7 @@ mod tests {
|
||||
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
|
||||
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
|
||||
sat_passes: Arc::new(RwLock::new(None)),
|
||||
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||
};
|
||||
|
||||
let ids = super::active_spectrum_rig_ids(&config);
|
||||
|
||||
@@ -12,4 +12,5 @@ bytes = "1"
|
||||
uuid = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
trx-core = { path = "../../trx-core" }
|
||||
trx-protocol = { path = "../../trx-protocol" }
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
|
||||
@@ -22,6 +22,7 @@ use trx_core::decode::{
|
||||
};
|
||||
use trx_core::rig::state::{RigSnapshot, SpectrumData};
|
||||
use trx_core::{DynResult, RigRequest, RigState};
|
||||
use trx_protocol::MeterUpdate;
|
||||
|
||||
/// Shared, timestamped decode history for a single decoder type.
|
||||
///
|
||||
@@ -320,6 +321,10 @@ pub struct RigRoutingContext {
|
||||
pub server_connected: Arc<AtomicBool>,
|
||||
/// Per-rig server connection state.
|
||||
pub rig_server_connected: Arc<RwLock<HashMap<String, bool>>>,
|
||||
/// Per-rig meter watch channels, keyed by rig_id. Populated lazily by
|
||||
/// the meter-connection supervisor in `trx-client`; `None` on the sender
|
||||
/// side means "no sample yet".
|
||||
pub rig_meters: Arc<RwLock<HashMap<String, watch::Sender<Option<MeterUpdate>>>>>,
|
||||
}
|
||||
|
||||
impl Default for RigRoutingContext {
|
||||
@@ -331,6 +336,7 @@ impl Default for RigRoutingContext {
|
||||
rig_states: Arc::new(RwLock::new(HashMap::new())),
|
||||
server_connected: Arc::new(AtomicBool::new(false)),
|
||||
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
|
||||
rig_meters: Arc::new(RwLock::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -447,6 +453,25 @@ impl FrontendRuntimeContext {
|
||||
.and_then(|map| map.get(rig_id).map(|tx| tx.subscribe()))
|
||||
}
|
||||
|
||||
/// Get a watch receiver for a specific rig's meter stream.
|
||||
/// Lazily inserts a new channel if the rig_id is not yet present so
|
||||
/// SSE clients can subscribe before the meter-connection supervisor
|
||||
/// has produced a first sample.
|
||||
pub fn rig_meter_rx(&self, rig_id: &str) -> watch::Receiver<Option<MeterUpdate>> {
|
||||
if let Ok(map) = self.routing.rig_meters.read() {
|
||||
if let Some(tx) = map.get(rig_id) {
|
||||
return tx.subscribe();
|
||||
}
|
||||
}
|
||||
if let Ok(mut map) = self.routing.rig_meters.write() {
|
||||
map.entry(rig_id.to_string())
|
||||
.or_insert_with(|| watch::channel(None).0)
|
||||
.subscribe()
|
||||
} else {
|
||||
watch::channel(None).1
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a watch receiver for a specific rig's spectrum.
|
||||
/// Lazily inserts a new channel if the rig_id is not yet present.
|
||||
pub fn rig_spectrum_rx(&self, rig_id: &str) -> watch::Receiver<SharedSpectrum> {
|
||||
|
||||
@@ -3700,6 +3700,8 @@ function connect() {
|
||||
if (esHeartbeat) {
|
||||
clearInterval(esHeartbeat);
|
||||
}
|
||||
stopMeterStreaming();
|
||||
startMeterStreaming();
|
||||
pollFreshSnapshot();
|
||||
const eventsUrl = lastActiveRigId
|
||||
? `/events?remote=${encodeURIComponent(lastActiveRigId)}`
|
||||
@@ -3778,6 +3780,7 @@ function disconnect() {
|
||||
decodeSource = null;
|
||||
}
|
||||
stopSpectrumStreaming();
|
||||
stopMeterStreaming();
|
||||
// Clear timers
|
||||
if (esHeartbeat) {
|
||||
clearInterval(esHeartbeat);
|
||||
@@ -3900,6 +3903,9 @@ async function switchRigFromSelect(selectEl) {
|
||||
// Reconnect spectrum SSE to the new rig's spectrum channel.
|
||||
stopSpectrumStreaming();
|
||||
startSpectrumStreaming();
|
||||
// Reconnect meter SSE to the new rig's meter channel.
|
||||
stopMeterStreaming();
|
||||
startMeterStreaming();
|
||||
// Reconnect audio to the new rig if audio is active.
|
||||
if (rxActive) {
|
||||
stopRxAudio();
|
||||
@@ -6476,6 +6482,8 @@ const spectrumCenterLeftBtn = document.getElementById("spectrum-center-left-btn"
|
||||
const spectrumCenterRightBtn = document.getElementById("spectrum-center-right-btn");
|
||||
let spectrumSource = null;
|
||||
let spectrumReconnectTimer = null;
|
||||
let meterSource = null;
|
||||
let meterReconnectTimer = null;
|
||||
let spectrumDrawPending = false;
|
||||
let spectrumAxisKey = "";
|
||||
let spectrumDbAxisKey = "";
|
||||
@@ -6996,6 +7004,63 @@ function stopSpectrumStreaming() {
|
||||
clearSpectrumCanvas();
|
||||
}
|
||||
|
||||
// ── /meter (fast signal-strength) streaming ─────────────────────────────────
|
||||
// Dedicated SSE channel pushed at ~30 Hz by trx-server; bypasses /events so
|
||||
// meter frames are never gated by full-RigState diffing. Synchronous DOM
|
||||
// write per frame — no rAF coalescing, per user requirement that it "feel
|
||||
// instant" on the frontend.
|
||||
function scheduleMeterReconnect() {
|
||||
if (meterReconnectTimer !== null) return;
|
||||
meterReconnectTimer = setTimeout(() => {
|
||||
meterReconnectTimer = null;
|
||||
startMeterStreaming();
|
||||
}, 1000);
|
||||
}
|
||||
|
||||
function applyMeterSample(dbm) {
|
||||
if (typeof dbm !== "number" || !Number.isFinite(dbm)) return;
|
||||
prevRenderData.sigDbm = dbm;
|
||||
const sUnits = dbmToSUnits(dbm);
|
||||
sigLastSUnits = sUnits;
|
||||
sigLastDbm = dbm;
|
||||
const pct = sUnits <= 9 ? Math.max(0, Math.min(100, (sUnits / 9) * 100)) : 100;
|
||||
if (signalBar) signalBar.style.width = `${pct}%`;
|
||||
if (signalValue) signalValue.innerHTML = formatSignal(sUnits);
|
||||
refreshSigStrengthDisplay();
|
||||
}
|
||||
|
||||
function startMeterStreaming() {
|
||||
if (meterSource !== null) return;
|
||||
const url = lastActiveRigId
|
||||
? `/meter?remote=${encodeURIComponent(lastActiveRigId)}`
|
||||
: "/meter";
|
||||
meterSource = new EventSource(url);
|
||||
meterSource.onmessage = (evt) => {
|
||||
try {
|
||||
const { sig } = JSON.parse(evt.data);
|
||||
applyMeterSample(sig);
|
||||
} catch (_) {}
|
||||
};
|
||||
meterSource.onerror = () => {
|
||||
if (meterSource) {
|
||||
meterSource.close();
|
||||
meterSource = null;
|
||||
}
|
||||
scheduleMeterReconnect();
|
||||
};
|
||||
}
|
||||
|
||||
function stopMeterStreaming() {
|
||||
if (meterSource !== null) {
|
||||
meterSource.close();
|
||||
meterSource = null;
|
||||
}
|
||||
if (meterReconnectTimer !== null) {
|
||||
clearTimeout(meterReconnectTimer);
|
||||
meterReconnectTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
// ── Rendering ────────────────────────────────────────────────────────────────
|
||||
function clearSpectrumCanvas() {
|
||||
if (!spectrumCanvas || !spectrumGl || !spectrumGl.ready) return;
|
||||
|
||||
@@ -574,6 +574,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||
// SSE streams
|
||||
.service(sse::events)
|
||||
.service(sse::spectrum)
|
||||
.service(sse::meter)
|
||||
// Decoder endpoints
|
||||
.service(decoder::decoder_registry)
|
||||
.service(decoder::decode_history)
|
||||
|
||||
@@ -19,6 +19,7 @@ use uuid::Uuid;
|
||||
|
||||
use trx_core::RigState;
|
||||
use trx_frontend::FrontendRuntimeContext;
|
||||
use trx_protocol::MeterUpdate;
|
||||
|
||||
use crate::server::vchan::ClientChannelManager;
|
||||
|
||||
@@ -337,6 +338,62 @@ pub async fn events(
|
||||
.streaming(stream))
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// /meter SSE endpoint (fast signal-strength stream, ~30 Hz)
|
||||
// ============================================================================
|
||||
|
||||
fn encode_meter_frame(update: &MeterUpdate) -> String {
|
||||
// Compact JSON: one-line SSE frame, flushed immediately.
|
||||
// Shape: {"sig":-72.3,"ts":12345}
|
||||
format!(
|
||||
"data: {{\"sig\":{:.2},\"ts\":{}}}\n\n",
|
||||
update.sig_dbm, update.ts_ms
|
||||
)
|
||||
}
|
||||
|
||||
/// SSE stream for per-rig signal-strength updates.
|
||||
///
|
||||
/// Pushed from the server's per-rig meter broadcast; intentionally bypasses
|
||||
/// the `/events` RigState path so high-rate meter samples are never gated by
|
||||
/// full-state diffing. Each watch update produces exactly one SSE frame.
|
||||
#[get("/meter")]
|
||||
pub async fn meter(
|
||||
query: web::Query<RemoteQuery>,
|
||||
context: web::Data<Arc<FrontendRuntimeContext>>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let rig_id = query.remote.clone().filter(|s| !s.is_empty()).or_else(|| {
|
||||
context
|
||||
.routing
|
||||
.active_rig_id
|
||||
.lock()
|
||||
.ok()
|
||||
.and_then(|g| g.clone())
|
||||
});
|
||||
|
||||
let rx = match rig_id.as_deref() {
|
||||
Some(rid) => context.rig_meter_rx(rid),
|
||||
None => return Ok(HttpResponse::NotFound().finish()),
|
||||
};
|
||||
|
||||
let updates = WatchStream::new(rx).filter_map(|maybe| {
|
||||
let chunk = maybe.as_ref().map(encode_meter_frame);
|
||||
std::future::ready(chunk.map(|s| Ok::<Bytes, Error>(Bytes::from(s))))
|
||||
});
|
||||
|
||||
// Infrequent keepalive comment; real meter frames carry the heartbeat.
|
||||
let pings = IntervalStream::new(time::interval(Duration::from_secs(15)))
|
||||
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
|
||||
|
||||
let stream = select(pings, updates);
|
||||
|
||||
Ok(HttpResponse::Ok()
|
||||
.insert_header((header::CONTENT_TYPE, "text/event-stream"))
|
||||
.insert_header((header::CONTENT_ENCODING, "identity"))
|
||||
.insert_header((header::CACHE_CONTROL, "no-cache"))
|
||||
.insert_header((header::CONNECTION, "keep-alive"))
|
||||
.streaming(stream))
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// /spectrum SSE endpoint
|
||||
// ============================================================================
|
||||
|
||||
@@ -526,6 +526,7 @@ impl RouteAccess {
|
||||
|| path == "/decode"
|
||||
|| path == "/decode/history"
|
||||
|| path == "/spectrum"
|
||||
|| path == "/meter"
|
||||
|| path == "/audio"
|
||||
|| path == "/bookmarks"
|
||||
|| path.starts_with("/status?")
|
||||
@@ -534,6 +535,7 @@ impl RouteAccess {
|
||||
|| path.starts_with("/decode?")
|
||||
|| path.starts_with("/decode/history?")
|
||||
|| path.starts_with("/spectrum?")
|
||||
|| path.starts_with("/meter?")
|
||||
|| path.starts_with("/audio?")
|
||||
|| path.starts_with("/bookmarks?")
|
||||
|| path.starts_with("/bookmarks/")
|
||||
@@ -703,6 +705,7 @@ mod tests {
|
||||
assert_eq!(RouteAccess::from_path("/events"), RouteAccess::Read);
|
||||
assert_eq!(RouteAccess::from_path("/decode"), RouteAccess::Read);
|
||||
assert_eq!(RouteAccess::from_path("/spectrum"), RouteAccess::Read);
|
||||
assert_eq!(RouteAccess::from_path("/meter"), RouteAccess::Read);
|
||||
assert_eq!(RouteAccess::from_path("/audio"), RouteAccess::Read);
|
||||
}
|
||||
|
||||
|
||||
@@ -18,4 +18,4 @@ pub use auth::{NoAuthValidator, SimpleTokenValidator, TokenValidator};
|
||||
pub use codec::{mode_to_string, parse_envelope, parse_mode};
|
||||
pub use decoders::{DecoderActivation, DecoderDescriptor, DECODER_REGISTRY};
|
||||
pub use mapping::{client_command_to_rig, rig_command_to_client};
|
||||
pub use types::{ClientCommand, ClientEnvelope, ClientResponse, RigEntry};
|
||||
pub use types::{ClientCommand, ClientEnvelope, ClientResponse, MeterUpdate, RigEntry};
|
||||
|
||||
@@ -102,7 +102,7 @@ macro_rules! define_command_mapping {
|
||||
|
||||
define_command_mapping! {
|
||||
// ── Client-only variants (no RigCommand counterpart) ─────────────
|
||||
client_only: GetRigs, GetSatPasses;
|
||||
client_only: GetRigs, GetSatPasses, SubscribeMeter;
|
||||
|
||||
// ── Unit variants (no payload) ───────────────────────────────────
|
||||
unit:
|
||||
|
||||
@@ -61,6 +61,24 @@ pub enum ClientCommand {
|
||||
SetSamCarrierSync { enabled: bool },
|
||||
SetRecorderEnabled { enabled: bool },
|
||||
GetSpectrum,
|
||||
/// Subscribe to a per-rig meter stream on this connection. After the
|
||||
/// server receives this command, the connection becomes a one-way flow of
|
||||
/// newline-delimited `MeterUpdate` JSON frames and no further commands or
|
||||
/// regular responses are sent. Intended for a dedicated TCP connection.
|
||||
SubscribeMeter,
|
||||
}
|
||||
|
||||
/// Fast meter sample pushed by the server on a dedicated meter stream.
|
||||
///
|
||||
/// Emitted at ~30 Hz for SDR backends and ~6–7 Hz for CAT backends.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct MeterUpdate {
|
||||
/// Rig identifier this sample belongs to.
|
||||
pub rig_id: String,
|
||||
/// Receive signal strength in dBm (rig/DSP-reported).
|
||||
pub sig_dbm: f64,
|
||||
/// Monotonic millisecond timestamp from the server's steady clock.
|
||||
pub ts_ms: u64,
|
||||
}
|
||||
|
||||
/// Envelope for client commands with optional authentication token and rig routing.
|
||||
|
||||
@@ -267,6 +267,10 @@ async fn handle_client(
|
||||
sat_pass_cache,
|
||||
timeouts,
|
||||
} = ctx;
|
||||
// Disable Nagle so small frames (command responses, meter samples) ship
|
||||
// immediately instead of sitting in the kernel's send buffer for up to
|
||||
// ~40 ms waiting for more payload.
|
||||
let _ = socket.set_nodelay(true);
|
||||
let (reader, mut writer) = socket.into_split();
|
||||
let mut reader = BufReader::new(reader);
|
||||
|
||||
@@ -473,6 +477,55 @@ async fn handle_client(
|
||||
}
|
||||
};
|
||||
|
||||
// SubscribeMeter: turns this connection into a one-way meter stream.
|
||||
// No regular responses are produced; the connection lives until the
|
||||
// client disconnects or shutdown fires.
|
||||
if matches!(envelope.cmd, ClientCommand::SubscribeMeter) {
|
||||
let mut meter_rx = handle.meter_tx.subscribe();
|
||||
let io_timeout = timeouts.io_timeout;
|
||||
info!(
|
||||
"Client {} subscribed to meter stream for rig '{}'",
|
||||
addr, target_rig_id
|
||||
);
|
||||
loop {
|
||||
tokio::select! {
|
||||
sample = meter_rx.recv() => {
|
||||
match sample {
|
||||
Ok(update) => {
|
||||
let Ok(mut line) = serde_json::to_string(&update) else { continue };
|
||||
line.push('\n');
|
||||
let write = time::timeout(
|
||||
io_timeout,
|
||||
writer.write_all(line.as_bytes()),
|
||||
).await;
|
||||
match write {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(e)) => {
|
||||
info!("Client {} meter write failed: {}", addr, e);
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
info!("Client {} meter write timed out", addr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
changed = shutdown_rx.changed() => {
|
||||
match changed {
|
||||
Ok(()) if *shutdown_rx.borrow() => break,
|
||||
Ok(()) => {}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
let rig_cmd = mapping::client_command_to_rig(envelope.cmd);
|
||||
// Fast path: serve GetSnapshot directly from the watch channel
|
||||
// so clients get a response even while the rig task is initializing.
|
||||
@@ -669,12 +722,14 @@ mod tests {
|
||||
let (rig_tx, _rig_rx) = mpsc::channel::<RigRequest>(8);
|
||||
let (state_tx, state_rx) = watch::channel(state);
|
||||
let _state_tx = state_tx;
|
||||
let (meter_tx, _) = tokio::sync::broadcast::channel(8);
|
||||
let handle = RigHandle {
|
||||
rig_id: "default".to_string(),
|
||||
display_name: "Default Rig".to_string(),
|
||||
rig_tx,
|
||||
state_rx,
|
||||
audio_port: 4531,
|
||||
meter_tx,
|
||||
};
|
||||
let mut map = HashMap::new();
|
||||
map.insert("default".to_string(), handle);
|
||||
@@ -858,33 +913,36 @@ mod tests {
|
||||
/// Build a multi-rig HashMap with two rigs having independent state and
|
||||
/// command channels. Returns the map, default rig id, and the mpsc
|
||||
/// receivers for each rig so tests can inspect routed commands.
|
||||
fn make_two_rigs(
|
||||
state_a: RigState,
|
||||
state_b: RigState,
|
||||
) -> (
|
||||
type TwoRigs = (
|
||||
Arc<HashMap<String, RigHandle>>,
|
||||
String,
|
||||
mpsc::Receiver<RigRequest>,
|
||||
mpsc::Receiver<RigRequest>,
|
||||
) {
|
||||
);
|
||||
|
||||
fn make_two_rigs(state_a: RigState, state_b: RigState) -> TwoRigs {
|
||||
let (tx_a, rx_a) = mpsc::channel::<RigRequest>(8);
|
||||
let (_state_tx_a, state_rx_a) = watch::channel(state_a);
|
||||
let (meter_tx_a, _) = tokio::sync::broadcast::channel(8);
|
||||
let handle_a = RigHandle {
|
||||
rig_id: "rig_hf".to_string(),
|
||||
display_name: "HF Rig".to_string(),
|
||||
rig_tx: tx_a,
|
||||
state_rx: state_rx_a,
|
||||
audio_port: 4531,
|
||||
meter_tx: meter_tx_a,
|
||||
};
|
||||
|
||||
let (tx_b, rx_b) = mpsc::channel::<RigRequest>(8);
|
||||
let (_state_tx_b, state_rx_b) = watch::channel(state_b);
|
||||
let (meter_tx_b, _) = tokio::sync::broadcast::channel(8);
|
||||
let handle_b = RigHandle {
|
||||
rig_id: "rig_vhf".to_string(),
|
||||
display_name: "VHF Rig".to_string(),
|
||||
rig_tx: tx_b,
|
||||
state_rx: state_rx_b,
|
||||
audio_port: 4532,
|
||||
meter_tx: meter_tx_b,
|
||||
};
|
||||
|
||||
let mut map = HashMap::new();
|
||||
|
||||
@@ -1075,6 +1075,8 @@ async fn main() -> DynResult<()> {
|
||||
Some("Disabled".to_string())
|
||||
};
|
||||
let (state_tx, state_rx) = watch::channel(initial_state);
|
||||
let (meter_tx, _) =
|
||||
broadcast::channel::<trx_protocol::MeterUpdate>(rig_handle::METER_BROADCAST_CAPACITY);
|
||||
|
||||
let mut task_config = build_rig_task_config(
|
||||
rig_cfg,
|
||||
@@ -1101,9 +1103,15 @@ async fn main() -> DynResult<()> {
|
||||
// silently losing the rig.
|
||||
let rig_shutdown_rx = shutdown_rx.clone();
|
||||
let rig_id_supervisor = rig_cfg.id.clone();
|
||||
let meter_tx_task = meter_tx.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
let result =
|
||||
rig_task::run_rig_task(task_config, rig_rx, state_tx.clone(), rig_shutdown_rx)
|
||||
let result = rig_task::run_rig_task(
|
||||
task_config,
|
||||
rig_rx,
|
||||
state_tx.clone(),
|
||||
meter_tx_task,
|
||||
rig_shutdown_rx,
|
||||
)
|
||||
.await;
|
||||
match result {
|
||||
Ok(()) => {
|
||||
@@ -1153,6 +1161,7 @@ async fn main() -> DynResult<()> {
|
||||
rig_tx,
|
||||
state_rx,
|
||||
audio_port: rig_cfg.audio.port,
|
||||
meter_tx,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
@@ -4,10 +4,16 @@
|
||||
|
||||
//! Thin handle giving the listener access to one rig's task and state.
|
||||
|
||||
use tokio::sync::{mpsc, watch};
|
||||
use tokio::sync::{broadcast, mpsc, watch};
|
||||
|
||||
use trx_core::rig::request::RigRequest;
|
||||
use trx_core::rig::state::RigState;
|
||||
use trx_protocol::MeterUpdate;
|
||||
|
||||
/// Bounded broadcast capacity for the meter stream. Keeps ~0.5 s of buffered
|
||||
/// samples at 30 Hz — more than enough slack to tolerate a scheduling blip
|
||||
/// without forcing the producer to block or drop silently.
|
||||
pub const METER_BROADCAST_CAPACITY: usize = 16;
|
||||
|
||||
/// A handle to a single running rig backend.
|
||||
///
|
||||
@@ -24,4 +30,8 @@ pub struct RigHandle {
|
||||
pub state_rx: watch::Receiver<RigState>,
|
||||
/// Per-rig audio listener TCP port.
|
||||
pub audio_port: u16,
|
||||
/// Fast per-rig meter samples published by `rig_task` at ~30 Hz (SDR) or
|
||||
/// ~6–7 Hz (CAT). Consumed by `SubscribeMeter` clients; independent of
|
||||
/// the slower `state_rx` snapshot path.
|
||||
pub meter_tx: broadcast::Sender<MeterUpdate>,
|
||||
}
|
||||
|
||||
@@ -7,11 +7,12 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::sync::{mpsc, watch};
|
||||
use tokio::sync::{broadcast, mpsc, watch};
|
||||
use tokio::time::{self, Instant};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use trx_backend::{RegistrationContext, RigAccess};
|
||||
use trx_protocol::MeterUpdate;
|
||||
use trx_core::radio::freq::Freq;
|
||||
use trx_core::rig::command::RigCommand;
|
||||
use trx_core::rig::controller::{
|
||||
@@ -113,6 +114,7 @@ pub async fn run_rig_task(
|
||||
config: RigTaskConfig,
|
||||
mut rx: mpsc::Receiver<RigRequest>,
|
||||
state_tx: watch::Sender<RigState>,
|
||||
meter_tx: broadcast::Sender<MeterUpdate>,
|
||||
mut shutdown_rx: watch::Receiver<bool>,
|
||||
) -> DynResult<()> {
|
||||
let histories = config.histories.clone();
|
||||
@@ -256,14 +258,24 @@ pub async fn run_rig_task(
|
||||
|
||||
// Run a fast meter tick between full polls to keep the S-meter
|
||||
// responsive. SDR backends expose a cached DSP reading and can
|
||||
// refresh every 100 ms; CAT rigs poll the serial link, which is
|
||||
// slower but still fine at ~150 ms.
|
||||
// refresh every 33 ms (~30 Hz) for instant-feeling metering; CAT
|
||||
// rigs poll the serial link, which is slower but still usable at
|
||||
// ~150 ms.
|
||||
//
|
||||
// Every tick publishes to `meter_tx` (unconditional, for the fast
|
||||
// `/meter` stream) and — gated by a small delta — to `state_tx` so
|
||||
// other frontends that read `RigState.status.rx.sig` keep working
|
||||
// without drowning the regular `/events` SSE in near-duplicate
|
||||
// full-state frames at 30 Hz.
|
||||
let is_sdr = rig.as_sdr_ref().is_some();
|
||||
let meter_tick_duration = if is_sdr {
|
||||
Duration::from_millis(100)
|
||||
Duration::from_millis(33)
|
||||
} else {
|
||||
Duration::from_millis(150)
|
||||
};
|
||||
let meter_task_start = Instant::now();
|
||||
let meter_state_delta_db: f64 = 0.25;
|
||||
let rig_id = config.rig_id.clone();
|
||||
let mut meter_tick: std::pin::Pin<Box<tokio::time::Sleep>> =
|
||||
Box::pin(tokio::time::sleep(meter_tick_duration));
|
||||
|
||||
@@ -308,8 +320,26 @@ pub async fn run_rig_task(
|
||||
None
|
||||
};
|
||||
if let Some(db) = new_sig {
|
||||
// Always publish to the fast broadcast: subscribers
|
||||
// see every tick, including unchanged values, so the
|
||||
// UI animation stays smooth. `send` only errors when
|
||||
// no subscribers exist; that's fine.
|
||||
let ts_ms = meter_task_start.elapsed().as_millis() as u64;
|
||||
let _ = meter_tx.send(MeterUpdate {
|
||||
rig_id: rig_id.clone(),
|
||||
sig_dbm: db,
|
||||
ts_ms,
|
||||
});
|
||||
|
||||
// Only push into `state_tx` when the sample moved by
|
||||
// a meaningful amount — otherwise `/events` clients
|
||||
// would re-serialize the entire RigState at 30 Hz.
|
||||
let prev = state.status.rx.as_ref().and_then(|r| r.sig);
|
||||
if prev != Some(db) {
|
||||
let significant = match prev {
|
||||
Some(p) => (p - db).abs() >= meter_state_delta_db,
|
||||
None => true,
|
||||
};
|
||||
if significant {
|
||||
state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(db);
|
||||
let _ = state_tx.send(state.clone());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user