diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index 6474f42..4f86fed 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -74,6 +74,11 @@ pub async fn run_remote_client( // Reset backoff on successful TCP connect: server is reachable, so the // next disconnect should retry quickly rather than waiting up to 10 s. reconnect_delay = Duration::from_secs(1); + // Disable Nagle's algorithm so each framed command is sent immediately + // rather than being held for up to 40 ms waiting for ACKs. + if let Err(e) = stream.set_nodelay(true) { + warn!("TCP_NODELAY failed: {}", e); + } if let Err(e) = handle_connection(&config, stream, &mut rx, &state_tx, &mut shutdown_rx).await { @@ -225,12 +230,13 @@ async fn send_command( ) -> RigResult { let envelope = build_envelope(config, cmd, rig_id_override); - let payload = serde_json::to_string(&envelope) + let mut payload = serde_json::to_string(&envelope) .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; + payload.push('\n'); time::timeout( IO_TIMEOUT, - writer.write_all(format!("{}\n", payload).as_bytes()), + writer.write_all(payload.as_bytes()), ) .await .map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))? @@ -271,11 +277,12 @@ async fn send_command_no_state_update( cmd: ClientCommand, ) -> RigResult { let envelope = build_envelope(config, cmd, None); - let payload = serde_json::to_string(&envelope) + let mut payload = serde_json::to_string(&envelope) .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; + payload.push('\n'); time::timeout( SPECTRUM_IO_TIMEOUT, - writer.write_all(format!("{}\n", payload).as_bytes()), + writer.write_all(payload.as_bytes()), ) .await .map_err(|_| { @@ -356,12 +363,13 @@ async fn send_get_rigs( reader: &mut BufReader, ) -> RigResult> { let envelope = build_envelope(config, ClientCommand::GetRigs, None); - let payload = serde_json::to_string(&envelope) + let mut payload = serde_json::to_string(&envelope) .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; + payload.push('\n'); time::timeout( IO_TIMEOUT, - writer.write_all(format!("{}\n", payload).as_bytes()), + writer.write_all(payload.as_bytes()), ) .await .map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))? diff --git a/src/trx-client/trx-frontend/src/lib.rs b/src/trx-client/trx-frontend/src/lib.rs index f796dc5..6414ca9 100644 --- a/src/trx-client/trx-frontend/src/lib.rs +++ b/src/trx-client/trx-frontend/src/lib.rs @@ -41,16 +41,18 @@ pub trait FrontendSpawner { #[derive(Debug, Default)] pub struct SharedSpectrum { revision: u64, - frame: Option, + // Arc so that each SSE client gets a cheap pointer clone instead of + // copying the entire bin vector (~8 KB for 2048 f32 bins). + frame: Option>, } impl SharedSpectrum { pub fn replace(&mut self, frame: Option) { self.revision = self.revision.wrapping_add(1); - self.frame = frame; + self.frame = frame.map(Arc::new); } - pub fn snapshot(&self) -> (u64, Option) { + pub fn snapshot(&self) -> (u64, Option>) { (self.revision, self.frame.clone()) } }