[refactor](trx-frontend-http): replace /spectrum poll+mutex with WatchStream
Subscribe each SSE client to the watch::Sender<SharedSpectrum> rather than running an IntervalStream that locks a Mutex every 40 ms. Clients are now woken push-style exactly when new spectrum data arrives; the revision counter is no longer needed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -414,44 +414,31 @@ impl<I> futures_util::Stream for DropStream<I> {
|
|||||||
pub async fn spectrum(
|
pub async fn spectrum(
|
||||||
context: web::Data<Arc<FrontendRuntimeContext>>,
|
context: web::Data<Arc<FrontendRuntimeContext>>,
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let context_updates = context.get_ref().clone();
|
// Subscribe to the watch channel: each client gets its own receiver and is
|
||||||
let mut last_revision: Option<u64> = None;
|
// woken exactly when new spectrum data is pushed (no 40 ms polling needed).
|
||||||
|
let rx = context.spectrum.subscribe();
|
||||||
let mut last_rds_json: Option<String> = None;
|
let mut last_rds_json: Option<String> = None;
|
||||||
let updates =
|
let mut last_had_frame = false;
|
||||||
IntervalStream::new(time::interval(Duration::from_millis(40))).filter_map(move |_| {
|
let updates = WatchStream::new(rx).filter_map(move |snapshot| {
|
||||||
let context = context_updates.clone();
|
let sse_chunk: Option<String> = if let Some(ref frame) = snapshot.frame {
|
||||||
std::future::ready({
|
last_had_frame = true;
|
||||||
let next = context.spectrum.lock().ok().map(|g| g.snapshot());
|
let mut chunk = format!("event: b\ndata: {}\n\n", encode_spectrum_frame(frame));
|
||||||
|
// rds_json is pre-serialised at ingestion; append an `rds` event
|
||||||
let sse_chunk: Option<String> = match next {
|
// only when the payload changes for this particular client.
|
||||||
Some((revision, _frame, _rds)) if last_revision == Some(revision) => None,
|
if snapshot.rds_json != last_rds_json {
|
||||||
Some((revision, Some(frame), rds_json)) => {
|
let data = snapshot.rds_json.as_deref().unwrap_or("null");
|
||||||
last_revision = Some(revision);
|
chunk.push_str(&format!("event: rds\ndata: {data}\n\n"));
|
||||||
let mut chunk =
|
last_rds_json = snapshot.rds_json;
|
||||||
format!("event: b\ndata: {}\n\n", encode_spectrum_frame(&frame));
|
}
|
||||||
// rds_json is pre-serialised at ingestion; append an
|
Some(chunk)
|
||||||
// `rds` event only when the payload changed for this client.
|
} else if last_had_frame {
|
||||||
if rds_json != last_rds_json {
|
last_had_frame = false;
|
||||||
let data = rds_json.as_deref().unwrap_or("null");
|
Some("data: null\n\n".to_string())
|
||||||
chunk.push_str(&format!("event: rds\ndata: {data}\n\n"));
|
} else {
|
||||||
last_rds_json = rds_json;
|
None
|
||||||
}
|
};
|
||||||
Some(chunk)
|
std::future::ready(sse_chunk.map(|s| Ok::<Bytes, Error>(Bytes::from(s))))
|
||||||
}
|
});
|
||||||
Some((revision, None, _)) => {
|
|
||||||
last_revision = Some(revision);
|
|
||||||
Some("data: null\n\n".to_string())
|
|
||||||
}
|
|
||||||
None if last_revision.is_some() => {
|
|
||||||
last_revision = None;
|
|
||||||
Some("data: null\n\n".to_string())
|
|
||||||
}
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
sse_chunk.map(|s| Ok::<Bytes, Error>(Bytes::from(s)))
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
let pings = IntervalStream::new(time::interval(Duration::from_secs(15)))
|
let pings = IntervalStream::new(time::interval(Duration::from_secs(15)))
|
||||||
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
|
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
|
||||||
|
|||||||
Reference in New Issue
Block a user