diff --git a/src/trx-server/src/listener.rs b/src/trx-server/src/listener.rs index 4f7a997..507af81 100644 --- a/src/trx-server/src/listener.rs +++ b/src/trx-server/src/listener.rs @@ -13,8 +13,8 @@ use std::collections::HashMap; use std::collections::HashSet; use std::net::SocketAddr; -use std::sync::Arc; -use std::time::Duration; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; @@ -35,6 +35,16 @@ use crate::rig_handle::RigHandle; const IO_TIMEOUT: Duration = Duration::from_secs(10); const REQUEST_TIMEOUT: Duration = Duration::from_secs(12); const MAX_JSON_LINE_BYTES: usize = 256 * 1024; +/// How long to cache satellite pass predictions before recomputing. +/// SGP4 propagation for 200+ satellites is CPU-intensive; caching avoids +/// redundant recomputation when multiple clients request passes concurrently. +const SAT_PASS_CACHE_TTL: Duration = Duration::from_secs(60); + +/// Cached satellite pass prediction result shared across client connections. +struct SatPassCache { + result: trx_core::geo::PassPredictionResult, + computed_at: Instant, +} /// Run the JSON TCP listener, accepting client connections. /// /// `rigs` is a shared map from rig_id → `RigHandle`. The first entry (by @@ -51,6 +61,7 @@ pub async fn run_listener( let listener = TcpListener::bind(addr).await?; info!("Listening on {}", addr); let validator = Arc::new(SimpleTokenValidator::new(auth_tokens)); + let sat_pass_cache: Arc>> = Arc::new(Mutex::new(None)); loop { tokio::select! { @@ -63,8 +74,9 @@ pub async fn run_listener( let validator = Arc::clone(&validator); let client_shutdown_rx = shutdown_rx.clone(); let coords = station_coords; + let cache = Arc::clone(&sat_pass_cache); tokio::spawn(async move { - if let Err(e) = handle_client(socket, peer, rigs, default_rig_id, validator, coords, client_shutdown_rx).await { + if let Err(e) = handle_client(socket, peer, rigs, default_rig_id, validator, coords, cache, client_shutdown_rx).await { error!("Client {} error: {:?}", peer, e); } }); @@ -161,6 +173,7 @@ async fn handle_client( default_rig_id: String, validator: Arc, station_coords: Option<(f64, f64)>, + sat_pass_cache: Arc>>, mut shutdown_rx: watch::Receiver, ) -> std::io::Result<()> { let (reader, mut writer) = socket.into_split(); @@ -271,15 +284,32 @@ async fn handle_client( } // GetSatPasses: compute satellite passes from the server-side TLE store. - // Runs on a blocking thread to avoid stalling the connection handler. + // Results are cached for SAT_PASS_CACHE_TTL to avoid redundant CPU-heavy + // SGP4 propagation when multiple clients request passes concurrently. if matches!(envelope.cmd, ClientCommand::GetSatPasses) { - let result = if let Some((lat, lon)) = station_coords { + // Check cache first. + let cached = sat_pass_cache + .lock() + .ok() + .and_then(|guard| { + guard.as_ref().and_then(|c| { + if c.computed_at.elapsed() < SAT_PASS_CACHE_TTL { + Some(c.result.clone()) + } else { + None + } + }) + }); + + let result = if let Some(cached_result) = cached { + cached_result + } else if let Some((lat, lon)) = station_coords { let now_ms = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as i64; let window_ms = 24 * 3600 * 1000; // 24 hours - tokio::task::spawn_blocking(move || { + let fresh = tokio::task::spawn_blocking(move || { trx_core::geo::compute_upcoming_passes(lat, lon, now_ms, window_ms) }) .await @@ -287,7 +317,15 @@ async fn handle_client( passes: vec![], satellite_count: 0, tle_source: trx_core::geo::TleSource::Unavailable, - }) + }); + // Update cache. + if let Ok(mut guard) = sat_pass_cache.lock() { + *guard = Some(SatPassCache { + result: fresh.clone(), + computed_at: Instant::now(), + }); + } + fresh } else { trx_core::geo::PassPredictionResult { passes: vec![], diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index c7a9d94..2e5509e 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -794,9 +794,6 @@ fn spawn_rig_audio_stack( } })); - // Start periodic TLE refresh from CelesTrak (on start + once/day). - trx_core::geo::spawn_tle_refresh_task(); - // Spawn weather satellite APT decoder task let wxsat_pcm_rx = pcm_tx.subscribe(); let wxsat_state_rx = state_rx.clone(); @@ -1145,6 +1142,11 @@ async fn main() -> DynResult<()> { // Spawn periodic flush of decode history to disk (every 60 s). history_store::spawn_flush_task(history_db, rig_histories_for_flush); + // Start periodic TLE refresh from CelesTrak (on start + once/day). + // Called once globally rather than per-rig to avoid redundant HTTP fetches + // and write-lock contention on the TLE store. + trx_core::geo::spawn_tle_refresh_task(); + // Start JSON TCP listener. if cfg.listen.enabled { let listen_ip = cli.listen.unwrap_or(cfg.listen.listen);