[perf](trx-client): TCP_NODELAY, Arc spectrum bins, eliminate format allocs
Three hot-path optimizations in the client polling loop and SSE
spectrum stream:
- Set TCP_NODELAY on the client→server connection so each framed
JSON command is sent immediately instead of being held up to 40 ms
by Nagle's algorithm.
- Wrap SpectrumData in Arc<> inside SharedSpectrum. snapshot() was
cloning the full bin vector (~8 KB for 2048 f32 bins) for every SSE
/spectrum client on every 40 ms tick. With N clients that is N×8 KB
per tick; now replace() pays one Arc::new() and each client gets an
O(1) pointer clone.
- Eliminate the format!("{}\n", payload) intermediate String in the
three send_command / send_command_no_state_update / send_get_rigs
call sites. Push '\n' in-place on the serialised payload String
instead of allocating a second buffer.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -74,6 +74,11 @@ pub async fn run_remote_client(
|
|||||||
// Reset backoff on successful TCP connect: server is reachable, so the
|
// Reset backoff on successful TCP connect: server is reachable, so the
|
||||||
// next disconnect should retry quickly rather than waiting up to 10 s.
|
// next disconnect should retry quickly rather than waiting up to 10 s.
|
||||||
reconnect_delay = Duration::from_secs(1);
|
reconnect_delay = Duration::from_secs(1);
|
||||||
|
// Disable Nagle's algorithm so each framed command is sent immediately
|
||||||
|
// rather than being held for up to 40 ms waiting for ACKs.
|
||||||
|
if let Err(e) = stream.set_nodelay(true) {
|
||||||
|
warn!("TCP_NODELAY failed: {}", e);
|
||||||
|
}
|
||||||
if let Err(e) =
|
if let Err(e) =
|
||||||
handle_connection(&config, stream, &mut rx, &state_tx, &mut shutdown_rx).await
|
handle_connection(&config, stream, &mut rx, &state_tx, &mut shutdown_rx).await
|
||||||
{
|
{
|
||||||
@@ -225,12 +230,13 @@ async fn send_command(
|
|||||||
) -> RigResult<trx_core::RigSnapshot> {
|
) -> RigResult<trx_core::RigSnapshot> {
|
||||||
let envelope = build_envelope(config, cmd, rig_id_override);
|
let envelope = build_envelope(config, cmd, rig_id_override);
|
||||||
|
|
||||||
let payload = serde_json::to_string(&envelope)
|
let mut payload = serde_json::to_string(&envelope)
|
||||||
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
|
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
|
||||||
|
payload.push('\n');
|
||||||
|
|
||||||
time::timeout(
|
time::timeout(
|
||||||
IO_TIMEOUT,
|
IO_TIMEOUT,
|
||||||
writer.write_all(format!("{}\n", payload).as_bytes()),
|
writer.write_all(payload.as_bytes()),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))?
|
.map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))?
|
||||||
@@ -271,11 +277,12 @@ async fn send_command_no_state_update(
|
|||||||
cmd: ClientCommand,
|
cmd: ClientCommand,
|
||||||
) -> RigResult<trx_core::RigSnapshot> {
|
) -> RigResult<trx_core::RigSnapshot> {
|
||||||
let envelope = build_envelope(config, cmd, None);
|
let envelope = build_envelope(config, cmd, None);
|
||||||
let payload = serde_json::to_string(&envelope)
|
let mut payload = serde_json::to_string(&envelope)
|
||||||
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
|
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
|
||||||
|
payload.push('\n');
|
||||||
time::timeout(
|
time::timeout(
|
||||||
SPECTRUM_IO_TIMEOUT,
|
SPECTRUM_IO_TIMEOUT,
|
||||||
writer.write_all(format!("{}\n", payload).as_bytes()),
|
writer.write_all(payload.as_bytes()),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
@@ -356,12 +363,13 @@ async fn send_get_rigs(
|
|||||||
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
|
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
|
||||||
) -> RigResult<Vec<RigEntry>> {
|
) -> RigResult<Vec<RigEntry>> {
|
||||||
let envelope = build_envelope(config, ClientCommand::GetRigs, None);
|
let envelope = build_envelope(config, ClientCommand::GetRigs, None);
|
||||||
let payload = serde_json::to_string(&envelope)
|
let mut payload = serde_json::to_string(&envelope)
|
||||||
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
|
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
|
||||||
|
payload.push('\n');
|
||||||
|
|
||||||
time::timeout(
|
time::timeout(
|
||||||
IO_TIMEOUT,
|
IO_TIMEOUT,
|
||||||
writer.write_all(format!("{}\n", payload).as_bytes()),
|
writer.write_all(payload.as_bytes()),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))?
|
.map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))?
|
||||||
|
|||||||
@@ -41,16 +41,18 @@ pub trait FrontendSpawner {
|
|||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct SharedSpectrum {
|
pub struct SharedSpectrum {
|
||||||
revision: u64,
|
revision: u64,
|
||||||
frame: Option<SpectrumData>,
|
// Arc so that each SSE client gets a cheap pointer clone instead of
|
||||||
|
// copying the entire bin vector (~8 KB for 2048 f32 bins).
|
||||||
|
frame: Option<Arc<SpectrumData>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SharedSpectrum {
|
impl SharedSpectrum {
|
||||||
pub fn replace(&mut self, frame: Option<SpectrumData>) {
|
pub fn replace(&mut self, frame: Option<SpectrumData>) {
|
||||||
self.revision = self.revision.wrapping_add(1);
|
self.revision = self.revision.wrapping_add(1);
|
||||||
self.frame = frame;
|
self.frame = frame.map(Arc::new);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn snapshot(&self) -> (u64, Option<SpectrumData>) {
|
pub fn snapshot(&self) -> (u64, Option<Arc<SpectrumData>>) {
|
||||||
(self.revision, self.frame.clone())
|
(self.revision, self.frame.clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user