[fix](trx-client): reduce events source latency to <1 s
Three causes of >30 s SSE stalls:
1. SPECTRUM_IO_TIMEOUT was 300 ms — transient server jitter triggered
false-positive spectrum failures and immediate TCP disconnects.
Raised to 1 s to tolerate brief load spikes.
2. reconnect_delay was never reset after a successful TCP connect, so
after a few spectrum-induced disconnects the backoff reached 10 s.
Each new disconnect then cost 10 s of stale SSE state, and several
cycles accumulated to >30 s. Reset to 1 s on every successful
TCP connect so recovery stays fast.
3. SSE pings were emitted as comments (": ping"), which EventSource
discards without dispatching any event, so onmessage never saw them.
lastEventAt was therefore never updated by pings, causing the JS
heartbeat to force-reconnect every ~20 s even on healthy, stable
connections. Changed to a named "ping" event; because named events are
delivered only to listeners registered for that event name (not to
onmessage), also added es.addEventListener("ping", …) to update
lastEventAt.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
Generated
+3
@@ -2460,6 +2460,7 @@ dependencies = [
|
|||||||
"clap",
|
"clap",
|
||||||
"cpal",
|
"cpal",
|
||||||
"dirs",
|
"dirs",
|
||||||
|
"flate2",
|
||||||
"opus",
|
"opus",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
@@ -2479,6 +2480,7 @@ dependencies = [
|
|||||||
name = "trx-core"
|
name = "trx-core"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"flate2",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
@@ -2591,6 +2593,7 @@ dependencies = [
|
|||||||
"clap",
|
"clap",
|
||||||
"cpal",
|
"cpal",
|
||||||
"dirs",
|
"dirs",
|
||||||
|
"flate2",
|
||||||
"num-complex",
|
"num-complex",
|
||||||
"opus",
|
"opus",
|
||||||
"pickledb",
|
"pickledb",
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse};
|
|||||||
const DEFAULT_REMOTE_PORT: u16 = 4530;
|
const DEFAULT_REMOTE_PORT: u16 = 4530;
|
||||||
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
|
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
const IO_TIMEOUT: Duration = Duration::from_secs(15);
|
const IO_TIMEOUT: Duration = Duration::from_secs(15);
|
||||||
const SPECTRUM_IO_TIMEOUT: Duration = Duration::from_millis(300);
|
const SPECTRUM_IO_TIMEOUT: Duration = Duration::from_millis(1000);
|
||||||
const MAX_JSON_LINE_BYTES: usize = 16 * 1024;
|
const MAX_JSON_LINE_BYTES: usize = 16 * 1024;
|
||||||
const MAX_CONSECUTIVE_POLL_FAILURES: u32 = 3;
|
const MAX_CONSECUTIVE_POLL_FAILURES: u32 = 3;
|
||||||
|
|
||||||
@@ -71,6 +71,9 @@ pub async fn run_remote_client(
|
|||||||
info!("Remote client: connecting to {}", config.addr);
|
info!("Remote client: connecting to {}", config.addr);
|
||||||
match time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&config.addr)).await {
|
match time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&config.addr)).await {
|
||||||
Ok(Ok(stream)) => {
|
Ok(Ok(stream)) => {
|
||||||
|
// Reset backoff on successful TCP connect: server is reachable, so the
|
||||||
|
// next disconnect should retry quickly rather than waiting up to 10 s.
|
||||||
|
reconnect_delay = Duration::from_secs(1);
|
||||||
if let Err(e) =
|
if let Err(e) =
|
||||||
handle_connection(&config, stream, &mut rx, &state_tx, &mut shutdown_rx).await
|
handle_connection(&config, stream, &mut rx, &state_tx, &mut shutdown_rx).await
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -2705,6 +2705,9 @@ function connect() {
|
|||||||
console.error("Bad event data", e);
|
console.error("Bad event data", e);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
es.addEventListener("ping", () => {
|
||||||
|
lastEventAt = Date.now();
|
||||||
|
});
|
||||||
es.onerror = () => {
|
es.onerror = () => {
|
||||||
// Check if this is an auth error by looking at readyState
|
// Check if this is an auth error by looking at readyState
|
||||||
if (es.readyState === EventSource.CLOSED) {
|
if (es.readyState === EventSource.CLOSED) {
|
||||||
|
|||||||
@@ -256,8 +256,10 @@ pub async fn events(
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Send a named "ping" event so the JS heartbeat can observe it (SSE
|
||||||
|
// comments like ": ping" are not exposed by EventSource.onmessage).
|
||||||
let pings = IntervalStream::new(time::interval(Duration::from_secs(5)))
|
let pings = IntervalStream::new(time::interval(Duration::from_secs(5)))
|
||||||
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
|
.map(|_| Ok::<Bytes, Error>(Bytes::from("event: ping\ndata: \n\n")));
|
||||||
|
|
||||||
// Wrap stream to decrement counter on drop.
|
// Wrap stream to decrement counter on drop.
|
||||||
let counter_drop = counter.clone();
|
let counter_drop = counter.clone();
|
||||||
|
|||||||
Reference in New Issue
Block a user