From aa079598bde1f92a8b5e1d4192b0ddabc34f0b58 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Mon, 9 Mar 2026 22:58:39 +0100 Subject: [PATCH] [refactor](trx-frontend-http): replace /spectrum poll+mutex with WatchStream Subscribe each SSE client to the watch::Sender rather than running an IntervalStream that locks a Mutex every 40 ms. Clients are now woken push-style exactly when new spectrum data arrives; the revision counter is no longer needed. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- .../trx-frontend/trx-frontend-http/src/api.rs | 61 ++++++++----------- 1 file changed, 24 insertions(+), 37 deletions(-) diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index e1fac4c..d9a149e 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -414,44 +414,31 @@ impl futures_util::Stream for DropStream { pub async fn spectrum( context: web::Data>, ) -> Result { - let context_updates = context.get_ref().clone(); - let mut last_revision: Option = None; + // Subscribe to the watch channel: each client gets its own receiver and is + // woken exactly when new spectrum data is pushed (no 40 ms polling needed). + let rx = context.spectrum.subscribe(); let mut last_rds_json: Option = None; - let updates = - IntervalStream::new(time::interval(Duration::from_millis(40))).filter_map(move |_| { - let context = context_updates.clone(); - std::future::ready({ - let next = context.spectrum.lock().ok().map(|g| g.snapshot()); - - let sse_chunk: Option = match next { - Some((revision, _frame, _rds)) if last_revision == Some(revision) => None, - Some((revision, Some(frame), rds_json)) => { - last_revision = Some(revision); - let mut chunk = - format!("event: b\ndata: {}\n\n", encode_spectrum_frame(&frame)); - // rds_json is pre-serialised at ingestion; append an - // `rds` event only when the payload changed for this client. - if rds_json != last_rds_json { - let data = rds_json.as_deref().unwrap_or("null"); - chunk.push_str(&format!("event: rds\ndata: {data}\n\n")); - last_rds_json = rds_json; - } - Some(chunk) - } - Some((revision, None, _)) => { - last_revision = Some(revision); - Some("data: null\n\n".to_string()) - } - None if last_revision.is_some() => { - last_revision = None; - Some("data: null\n\n".to_string()) - } - None => None, - }; - - sse_chunk.map(|s| Ok::(Bytes::from(s))) - }) - }); + let mut last_had_frame = false; + let updates = WatchStream::new(rx).filter_map(move |snapshot| { + let sse_chunk: Option = if let Some(ref frame) = snapshot.frame { + last_had_frame = true; + let mut chunk = format!("event: b\ndata: {}\n\n", encode_spectrum_frame(frame)); + // rds_json is pre-serialised at ingestion; append an `rds` event + // only when the payload changes for this particular client. + if snapshot.rds_json != last_rds_json { + let data = snapshot.rds_json.as_deref().unwrap_or("null"); + chunk.push_str(&format!("event: rds\ndata: {data}\n\n")); + last_rds_json = snapshot.rds_json; + } + Some(chunk) + } else if last_had_frame { + last_had_frame = false; + Some("data: null\n\n".to_string()) + } else { + None + }; + std::future::ready(sse_chunk.map(|s| Ok::(Bytes::from(s)))) + }); let pings = IntervalStream::new(time::interval(Duration::from_secs(15))) .map(|_| Ok::(Bytes::from(": ping\n\n")));