diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index 077ce1d..039512d 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -103,6 +103,27 @@ pub async fn decode_events() -> Result { }; tracing::info!("/decode SSE client connected"); + let history = { + let mut out = Vec::new(); + out.extend( + crate::server::audio::snapshot_aprs_history() + .into_iter() + .map(trx_core::decode::DecodedMessage::Aprs), + ); + out.extend( + crate::server::audio::snapshot_ft8_history() + .into_iter() + .map(trx_core::decode::DecodedMessage::Ft8), + ); + out + }; + + let history_stream = futures_util::stream::iter(history.into_iter().filter_map(|msg| { + serde_json::to_string(&msg) + .ok() + .map(|json| Ok::(Bytes::from(format!("data: {json}\n\n")))) + })); + let decode_stream = futures_util::stream::unfold(decode_rx, |mut rx| async move { loop { match rx.recv().await { @@ -123,7 +144,7 @@ pub async fn decode_events() -> Result { let pings = IntervalStream::new(time::interval(Duration::from_secs(15))) .map(|_| Ok::(Bytes::from(": ping\n\n"))); - let stream = select(pings, decode_stream); + let stream = history_stream.chain(select(pings, decode_stream)); Ok(HttpResponse::Ok() .insert_header((header::CONTENT_TYPE, "text/event-stream"))