[perf](trx-client): three quick-win optimizations
State deduplication via PartialEq + send_if_modified: Derive PartialEq on the full RigState / RigSnapshot type tree (Freq, Band, RigInfo, RigCapabilities, RigStatus, RigTxStatus, RigRxStatus, RigControl, RigVfo, RigVfoEntry, RigFilterState, RdsData, SpectrumData, RigState, RigSnapshot). Use state_tx.send_if_modified() in refresh_remote_snapshot() so WatchStream only wakes SSE /events subscribers when state actually changed; with a stable rig this eliminates ~1.3 spurious JSON serialisations per second per connected client. Cache-remote-rigs skip on unchanged list: cache_remote_rigs() was rebuilding the Vec and cloning every field on every 750 ms poll. Add a structural check (rig_id, display_name, initialized, audio_port) and return early when nothing has changed — the common steady-state case. RDS JSON pre-serialised at ingestion: SharedSpectrum.replace() now serialises the optional RDS object once and stores it alongside the Arc<SpectrumData> frame. Each /spectrum SSE client's 40 ms tick reads the cached string instead of calling serde_json::to_string() per-client per-tick. Add serde_json to trx-frontend Cargo.toml to support this. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
Generated
+1
@@ -2511,6 +2511,7 @@ name = "trx-frontend"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
"trx-core",
|
"trx-core",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -353,7 +353,16 @@ async fn refresh_remote_snapshot(
|
|||||||
set_selected_rig_id(config, Some(target.rig_id.clone()));
|
set_selected_rig_id(config, Some(target.rig_id.clone()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let _ = state_tx.send(RigState::from_snapshot(target.state.clone()));
|
let new_state = RigState::from_snapshot(target.state.clone());
|
||||||
|
// Only wake SSE subscribers when something actually changed.
|
||||||
|
state_tx.send_if_modified(|old| {
|
||||||
|
if *old == new_state {
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
*old = new_state;
|
||||||
|
true
|
||||||
|
}
|
||||||
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -400,6 +409,19 @@ async fn send_get_rigs(
|
|||||||
|
|
||||||
fn cache_remote_rigs(config: &RemoteClientConfig, rigs: &[RigEntry]) {
|
fn cache_remote_rigs(config: &RemoteClientConfig, rigs: &[RigEntry]) {
|
||||||
if let Ok(mut guard) = config.known_rigs.lock() {
|
if let Ok(mut guard) = config.known_rigs.lock() {
|
||||||
|
// Skip the Vec rebuild when the rig list is structurally unchanged.
|
||||||
|
// We compare the fields surfaced in the UI rig picker; full state
|
||||||
|
// changes are propagated via the watch channel, not this cache.
|
||||||
|
let unchanged = guard.len() == rigs.len()
|
||||||
|
&& guard.iter().zip(rigs.iter()).all(|(cached, new)| {
|
||||||
|
cached.rig_id == new.rig_id
|
||||||
|
&& cached.display_name == new.display_name
|
||||||
|
&& cached.state.initialized == new.state.initialized
|
||||||
|
&& cached.audio_port == new.audio_port
|
||||||
|
});
|
||||||
|
if unchanged {
|
||||||
|
return;
|
||||||
|
}
|
||||||
*guard = rigs
|
*guard = rigs
|
||||||
.iter()
|
.iter()
|
||||||
.map(|entry| RemoteRigEntry {
|
.map(|entry| RemoteRigEntry {
|
||||||
|
|||||||
@@ -9,5 +9,6 @@ edition = "2021"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
|
serde_json = { workspace = true }
|
||||||
trx-core = { path = "../../trx-core" }
|
trx-core = { path = "../../trx-core" }
|
||||||
tokio = { workspace = true, features = ["sync"] }
|
tokio = { workspace = true, features = ["sync"] }
|
||||||
|
|||||||
@@ -44,16 +44,25 @@ pub struct SharedSpectrum {
|
|||||||
// Arc so that each SSE client gets a cheap pointer clone instead of
|
// Arc so that each SSE client gets a cheap pointer clone instead of
|
||||||
// copying the entire bin vector (~8 KB for 2048 f32 bins).
|
// copying the entire bin vector (~8 KB for 2048 f32 bins).
|
||||||
frame: Option<Arc<SpectrumData>>,
|
frame: Option<Arc<SpectrumData>>,
|
||||||
|
// RDS JSON serialised once at ingestion; avoids per-client serde work
|
||||||
|
// on every 40 ms tick for a field that changes at most once per second.
|
||||||
|
rds_json: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SharedSpectrum {
|
impl SharedSpectrum {
|
||||||
pub fn replace(&mut self, frame: Option<SpectrumData>) {
|
pub fn replace(&mut self, frame: Option<SpectrumData>) {
|
||||||
self.revision = self.revision.wrapping_add(1);
|
self.revision = self.revision.wrapping_add(1);
|
||||||
|
self.rds_json = frame
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|f| f.rds.as_ref())
|
||||||
|
.and_then(|r| serde_json::to_string(r).ok());
|
||||||
self.frame = frame.map(Arc::new);
|
self.frame = frame.map(Arc::new);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn snapshot(&self) -> (u64, Option<Arc<SpectrumData>>) {
|
/// Returns `(revision, frame, rds_json)`.
|
||||||
(self.revision, self.frame.clone())
|
/// `rds_json` is pre-serialised; `None` means no RDS data.
|
||||||
|
pub fn snapshot(&self) -> (u64, Option<Arc<SpectrumData>>, Option<String>) {
|
||||||
|
(self.revision, self.frame.clone(), self.rds_json.clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -424,16 +424,13 @@ pub async fn spectrum(
|
|||||||
let next = context.spectrum.lock().ok().map(|g| g.snapshot());
|
let next = context.spectrum.lock().ok().map(|g| g.snapshot());
|
||||||
|
|
||||||
let sse_chunk: Option<String> = match next {
|
let sse_chunk: Option<String> = match next {
|
||||||
Some((revision, _frame)) if last_revision == Some(revision) => None,
|
Some((revision, _frame, _rds)) if last_revision == Some(revision) => None,
|
||||||
Some((revision, Some(frame))) => {
|
Some((revision, Some(frame), rds_json)) => {
|
||||||
last_revision = Some(revision);
|
last_revision = Some(revision);
|
||||||
let mut chunk =
|
let mut chunk =
|
||||||
format!("event: b\ndata: {}\n\n", encode_spectrum_frame(&frame));
|
format!("event: b\ndata: {}\n\n", encode_spectrum_frame(&frame));
|
||||||
// Append an `rds` event only when the RDS payload changes.
|
// rds_json is pre-serialised at ingestion; append an
|
||||||
let rds_json = frame
|
// `rds` event only when the payload changed for this client.
|
||||||
.rds
|
|
||||||
.as_ref()
|
|
||||||
.and_then(|r| serde_json::to_string(r).ok());
|
|
||||||
if rds_json != last_rds_json {
|
if rds_json != last_rds_json {
|
||||||
let data = rds_json.as_deref().unwrap_or("null");
|
let data = rds_json.as_deref().unwrap_or("null");
|
||||||
chunk.push_str(&format!("event: rds\ndata: {data}\n\n"));
|
chunk.push_str(&format!("event: rds\ndata: {data}\n\n"));
|
||||||
@@ -441,7 +438,7 @@ pub async fn spectrum(
|
|||||||
}
|
}
|
||||||
Some(chunk)
|
Some(chunk)
|
||||||
}
|
}
|
||||||
Some((revision, None)) => {
|
Some((revision, None, _)) => {
|
||||||
last_revision = Some(revision);
|
last_revision = Some(revision);
|
||||||
Some("data: null\n\n".to_string())
|
Some("data: null\n\n".to_string())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
|
|||||||
const SPEED_OF_LIGHT_M_PER_S: f64 = 299_792_458.0;
|
const SPEED_OF_LIGHT_M_PER_S: f64 = 299_792_458.0;
|
||||||
|
|
||||||
/// Supported band range in Hz.
|
/// Supported band range in Hz.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
pub struct Band {
|
pub struct Band {
|
||||||
pub low_hz: u64,
|
pub low_hz: u64,
|
||||||
pub high_hz: u64,
|
pub high_hz: u64,
|
||||||
@@ -23,7 +23,7 @@ impl Band {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Frequency wrapper (Hz).
|
/// Frequency wrapper (Hz).
|
||||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
pub struct Freq {
|
pub struct Freq {
|
||||||
pub hz: u64,
|
pub hz: u64,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,14 +21,14 @@ pub mod response;
|
|||||||
pub mod state;
|
pub mod state;
|
||||||
|
|
||||||
/// How this backend communicates with the rig.
|
/// How this backend communicates with the rig.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
pub enum RigAccessMethod {
|
pub enum RigAccessMethod {
|
||||||
Serial { path: String, baud: u32 },
|
Serial { path: String, baud: u32 },
|
||||||
Tcp { addr: String },
|
Tcp { addr: String },
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Static info describing a rig backend.
|
/// Static info describing a rig backend.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct RigInfo {
|
pub struct RigInfo {
|
||||||
pub manufacturer: String,
|
pub manufacturer: String,
|
||||||
pub model: String,
|
pub model: String,
|
||||||
@@ -37,7 +37,7 @@ pub struct RigInfo {
|
|||||||
pub access: RigAccessMethod,
|
pub access: RigAccessMethod,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
pub struct RigCapabilities {
|
pub struct RigCapabilities {
|
||||||
#[serde(default = "default_min_freq_step_hz")]
|
#[serde(default = "default_min_freq_step_hz")]
|
||||||
pub min_freq_step_hz: u64,
|
pub min_freq_step_hz: u64,
|
||||||
@@ -233,7 +233,7 @@ pub trait RigCat: Rig + Send {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Snapshot of a rig's status that every backend can expose.
|
/// Snapshot of a rig's status that every backend can expose.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct RigStatus {
|
pub struct RigStatus {
|
||||||
pub freq: Freq,
|
pub freq: Freq,
|
||||||
pub mode: RigMode,
|
pub mode: RigMode,
|
||||||
@@ -249,21 +249,21 @@ pub trait RigStatusProvider {
|
|||||||
fn status(&self) -> RigStatus;
|
fn status(&self) -> RigStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct RigVfo {
|
pub struct RigVfo {
|
||||||
pub entries: Vec<RigVfoEntry>,
|
pub entries: Vec<RigVfoEntry>,
|
||||||
/// Index into `entries` for the active VFO, if known.
|
/// Index into `entries` for the active VFO, if known.
|
||||||
pub active: Option<usize>,
|
pub active: Option<usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct RigVfoEntry {
|
pub struct RigVfoEntry {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub freq: Freq,
|
pub freq: Freq,
|
||||||
pub mode: Option<RigMode>,
|
pub mode: Option<RigMode>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct RigTxStatus {
|
pub struct RigTxStatus {
|
||||||
pub power: Option<u8>,
|
pub power: Option<u8>,
|
||||||
pub limit: Option<u8>,
|
pub limit: Option<u8>,
|
||||||
@@ -271,13 +271,13 @@ pub struct RigTxStatus {
|
|||||||
pub alc: Option<u8>,
|
pub alc: Option<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
pub struct RigRxStatus {
|
pub struct RigRxStatus {
|
||||||
pub sig: Option<i32>,
|
pub sig: Option<i32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Configurable control settings that can be pushed to the rig.
|
/// Configurable control settings that can be pushed to the rig.
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize, PartialEq)]
|
||||||
pub struct RigControl {
|
pub struct RigControl {
|
||||||
pub enabled: Option<bool>,
|
pub enabled: Option<bool>,
|
||||||
pub lock: Option<bool>,
|
pub lock: Option<bool>,
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ use crate::radio::freq::Freq;
|
|||||||
use crate::rig::{RigControl, RigInfo, RigRxStatus, RigStatus, RigStatusProvider, RigTxStatus};
|
use crate::rig::{RigControl, RigInfo, RigRxStatus, RigStatus, RigStatusProvider, RigTxStatus};
|
||||||
|
|
||||||
/// Simple transceiver state representation held by the rig task.
|
/// Simple transceiver state representation held by the rig task.
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize, PartialEq)]
|
||||||
pub struct RigState {
|
pub struct RigState {
|
||||||
#[serde(skip_deserializing)]
|
#[serde(skip_deserializing)]
|
||||||
pub rig_info: Option<RigInfo>,
|
pub rig_info: Option<RigInfo>,
|
||||||
@@ -280,7 +280,7 @@ impl RigState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Current filter/DSP state for backends that support runtime filter adjustment.
|
/// Current filter/DSP state for backends that support runtime filter adjustment.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct RigFilterState {
|
pub struct RigFilterState {
|
||||||
pub bandwidth_hz: u32,
|
pub bandwidth_hz: u32,
|
||||||
pub fir_taps: u32,
|
pub fir_taps: u32,
|
||||||
@@ -324,7 +324,7 @@ fn default_wfm_denoise_level() -> WfmDenoiseLevel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Spectrum data from SDR backends (FFT magnitude over the full capture bandwidth).
|
/// Spectrum data from SDR backends (FFT magnitude over the full capture bandwidth).
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct SpectrumData {
|
pub struct SpectrumData {
|
||||||
/// FFT magnitude bins in dBFS, FFT-shifted so DC (centre frequency) is at index N/2.
|
/// FFT magnitude bins in dBFS, FFT-shifted so DC (centre frequency) is at index N/2.
|
||||||
pub bins: Vec<f32>,
|
pub bins: Vec<f32>,
|
||||||
@@ -338,7 +338,7 @@ pub struct SpectrumData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Live RDS metadata decoded from a WFM broadcast.
|
/// Live RDS metadata decoded from a WFM broadcast.
|
||||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct RdsData {
|
pub struct RdsData {
|
||||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
pub pi: Option<u16>,
|
pub pi: Option<u16>,
|
||||||
@@ -371,7 +371,7 @@ pub struct RdsData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Read-only projection of state shared with clients.
|
/// Read-only projection of state shared with clients.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct RigSnapshot {
|
pub struct RigSnapshot {
|
||||||
pub info: RigInfo,
|
pub info: RigInfo,
|
||||||
pub status: RigStatus,
|
pub status: RigStatus,
|
||||||
|
|||||||
Reference in New Issue
Block a user