From de11c7ff51216c0c28b2b685bc9cf2a6d9d2b14e Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Mon, 9 Mar 2026 22:26:24 +0100 Subject: [PATCH] [perf](trx-client): TCP_NODELAY, Arc spectrum bins, eliminate format allocs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three hot-path optimizations in the client polling loop and SSE spectrum stream: - Set TCP_NODELAY on the client→server connection so each framed JSON command is sent immediately instead of being held up to 40 ms by Nagle's algorithm. - Wrap SpectrumData in Arc<> inside SharedSpectrum. snapshot() was cloning the full bin vector (~8 KB for 2048 f32 bins) for every SSE /spectrum client on every 40 ms tick. With N clients that is N×8 KB per tick; now replace() pays one Arc::new() and each client gets an O(1) pointer clone. - Eliminate the format!("{}\n", payload) intermediate String in the three send_command / send_command_no_state_update / send_get_rigs call sites. Push '\n' in-place on the serialised payload String instead of allocating a second buffer. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- src/trx-client/src/remote_client.rs | 20 ++++++++++++++------ src/trx-client/trx-frontend/src/lib.rs | 8 +++++--- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index 6474f42..4f86fed 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -74,6 +74,11 @@ pub async fn run_remote_client( // Reset backoff on successful TCP connect: server is reachable, so the // next disconnect should retry quickly rather than waiting up to 10 s. reconnect_delay = Duration::from_secs(1); + // Disable Nagle's algorithm so each framed command is sent immediately + // rather than being held for up to 40 ms waiting for ACKs. + if let Err(e) = stream.set_nodelay(true) { + warn!("TCP_NODELAY failed: {}", e); + } if let Err(e) = handle_connection(&config, stream, &mut rx, &state_tx, &mut shutdown_rx).await { @@ -225,12 +230,13 @@ async fn send_command( ) -> RigResult { let envelope = build_envelope(config, cmd, rig_id_override); - let payload = serde_json::to_string(&envelope) + let mut payload = serde_json::to_string(&envelope) .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; + payload.push('\n'); time::timeout( IO_TIMEOUT, - writer.write_all(format!("{}\n", payload).as_bytes()), + writer.write_all(payload.as_bytes()), ) .await .map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))? @@ -271,11 +277,12 @@ async fn send_command_no_state_update( cmd: ClientCommand, ) -> RigResult { let envelope = build_envelope(config, cmd, None); - let payload = serde_json::to_string(&envelope) + let mut payload = serde_json::to_string(&envelope) .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; + payload.push('\n'); time::timeout( SPECTRUM_IO_TIMEOUT, - writer.write_all(format!("{}\n", payload).as_bytes()), + writer.write_all(payload.as_bytes()), ) .await .map_err(|_| { @@ -356,12 +363,13 @@ async fn send_get_rigs( reader: &mut BufReader, ) -> RigResult> { let envelope = build_envelope(config, ClientCommand::GetRigs, None); - let payload = serde_json::to_string(&envelope) + let mut payload = serde_json::to_string(&envelope) .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; + payload.push('\n'); time::timeout( IO_TIMEOUT, - writer.write_all(format!("{}\n", payload).as_bytes()), + writer.write_all(payload.as_bytes()), ) .await .map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))? diff --git a/src/trx-client/trx-frontend/src/lib.rs b/src/trx-client/trx-frontend/src/lib.rs index f796dc5..6414ca9 100644 --- a/src/trx-client/trx-frontend/src/lib.rs +++ b/src/trx-client/trx-frontend/src/lib.rs @@ -41,16 +41,18 @@ pub trait FrontendSpawner { #[derive(Debug, Default)] pub struct SharedSpectrum { revision: u64, - frame: Option, + // Arc so that each SSE client gets a cheap pointer clone instead of + // copying the entire bin vector (~8 KB for 2048 f32 bins). + frame: Option>, } impl SharedSpectrum { pub fn replace(&mut self, frame: Option) { self.revision = self.revision.wrapping_add(1); - self.frame = frame; + self.frame = frame.map(Arc::new); } - pub fn snapshot(&self) -> (u64, Option) { + pub fn snapshot(&self) -> (u64, Option>) { (self.revision, self.frame.clone()) } }