Merge pull request #42 from sgrams/fix/events-source-latency

[fix](trx-client): reduce events source latency to <1 s
This commit is contained in:
2026-03-09 22:15:19 +01:00
committed by GitHub
4 changed files with 13 additions and 2 deletions
Generated
+3
View File
@@ -2460,6 +2460,7 @@ dependencies = [
"clap",
"cpal",
"dirs",
"flate2",
"opus",
"serde",
"serde_json",
@@ -2479,6 +2480,7 @@ dependencies = [
name = "trx-core"
version = "0.1.0"
dependencies = [
"flate2",
"serde",
"serde_json",
"tokio",
@@ -2591,6 +2593,7 @@ dependencies = [
"clap",
"cpal",
"dirs",
"flate2",
"num-complex",
"opus",
"pickledb",
+4 -1
View File
@@ -22,7 +22,7 @@ use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse};
const DEFAULT_REMOTE_PORT: u16 = 4530;
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const IO_TIMEOUT: Duration = Duration::from_secs(15);
const SPECTRUM_IO_TIMEOUT: Duration = Duration::from_millis(300);
const SPECTRUM_IO_TIMEOUT: Duration = Duration::from_millis(1000);
const MAX_JSON_LINE_BYTES: usize = 16 * 1024;
const MAX_CONSECUTIVE_POLL_FAILURES: u32 = 3;
@@ -71,6 +71,9 @@ pub async fn run_remote_client(
info!("Remote client: connecting to {}", config.addr);
match time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&config.addr)).await {
Ok(Ok(stream)) => {
// Reset backoff on successful TCP connect: server is reachable, so the
// next disconnect should retry quickly rather than waiting up to 10 s.
reconnect_delay = Duration::from_secs(1);
if let Err(e) =
handle_connection(&config, stream, &mut rx, &state_tx, &mut shutdown_rx).await
{
@@ -2705,6 +2705,9 @@ function connect() {
console.error("Bad event data", e);
}
};
es.addEventListener("ping", () => {
lastEventAt = Date.now();
});
es.onerror = () => {
// Check if this is an auth error by looking at readyState
if (es.readyState === EventSource.CLOSED) {
@@ -256,8 +256,10 @@ pub async fn events(
}
});
// Send a named "ping" event so the JS heartbeat can observe it (SSE
// comments like ": ping" are not exposed by EventSource.onmessage).
let pings = IntervalStream::new(time::interval(Duration::from_secs(5)))
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
.map(|_| Ok::<Bytes, Error>(Bytes::from("event: ping\ndata: \n\n")));
// Wrap stream to decrement counter on drop.
let counter_drop = counter.clone();