From 8cb6292c1d3ef52a264e107b66edfadc2f2ae3ce Mon Sep 17 00:00:00 2001
From: Stan Grams
Date: Mon, 9 Mar 2026 22:13:53 +0100
Subject: [PATCH] [fix](trx-client): reduce events source latency to <1 s
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Three causes of >30 s SSE stalls:

1. SPECTRUM_IO_TIMEOUT was 300 ms — transient server jitter triggered
   false-positive spectrum failures and immediate TCP disconnects.
   Raised to 1 s to tolerate brief load spikes.

2. reconnect_delay was never reset after a successful TCP connect, so
   after a few spectrum-induced disconnects the backoff reached 10 s.
   Each new disconnect then cost 10 s of stale SSE state, and several
   cycles accumulated to >30 s. Reset to 1 s on every successful TCP
   connect so recovery stays fast.

3. SSE pings were emitted as comments (": ping"), which EventSource
   never exposes to onmessage. lastEventAt was therefore never updated
   by pings, causing the JS heartbeat to force-reconnect every ~20 s
   even on healthy, stable connections. Changed to a named "ping" event
   and added es.addEventListener("ping", …) to update lastEventAt.

Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- Cargo.lock | 3 +++ src/trx-client/src/remote_client.rs | 5 ++++- .../trx-frontend/trx-frontend-http/assets/web/app.js | 3 +++ src/trx-client/trx-frontend/trx-frontend-http/src/api.rs | 4 +++- 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6ccdfa6..52e095e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2460,6 +2460,7 @@ dependencies = [ "clap", "cpal", "dirs", + "flate2", "opus", "serde", "serde_json", @@ -2479,6 +2480,7 @@ dependencies = [ name = "trx-core" version = "0.1.0" dependencies = [ + "flate2", "serde", "serde_json", "tokio", @@ -2591,6 +2593,7 @@ dependencies = [ "clap", "cpal", "dirs", + "flate2", "num-complex", "opus", "pickledb", diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index a85cf7d..6474f42 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -22,7 +22,7 @@ use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse}; const DEFAULT_REMOTE_PORT: u16 = 4530; const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); const IO_TIMEOUT: Duration = Duration::from_secs(15); -const SPECTRUM_IO_TIMEOUT: Duration = Duration::from_millis(300); +const SPECTRUM_IO_TIMEOUT: Duration = Duration::from_millis(1000); const MAX_JSON_LINE_BYTES: usize = 16 * 1024; const MAX_CONSECUTIVE_POLL_FAILURES: u32 = 3; @@ -71,6 +71,9 @@ pub async fn run_remote_client( info!("Remote client: connecting to {}", config.addr); match time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&config.addr)).await { Ok(Ok(stream)) => { + // Reset backoff on successful TCP connect: server is reachable, so the + // next disconnect should retry quickly rather than waiting up to 10 s. 
+ reconnect_delay = Duration::from_secs(1); if let Err(e) = handle_connection(&config, stream, &mut rx, &state_tx, &mut shutdown_rx).await { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index 6f03bef..11a2cbf 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -2705,6 +2705,9 @@ function connect() { console.error("Bad event data", e); } }; + es.addEventListener("ping", () => { + lastEventAt = Date.now(); + }); es.onerror = () => { // Check if this is an auth error by looking at readyState if (es.readyState === EventSource.CLOSED) { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index ad747b0..3291af4 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -256,8 +256,10 @@ pub async fn events( } }); + // Send a named "ping" event so the JS heartbeat can observe it (SSE + // comments like ": ping" are not exposed by EventSource.onmessage). let pings = IntervalStream::new(time::interval(Duration::from_secs(5))) - .map(|_| Ok::(Bytes::from(": ping\n\n"))); + .map(|_| Ok::(Bytes::from("event: ping\ndata: \n\n"))); // Wrap stream to decrement counter on drop. let counter_drop = counter.clone();