From 2f7adf05c8ae38ba80bf5e7855c59834888c2486 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sat, 28 Mar 2026 15:38:39 +0100 Subject: [PATCH] [feat](trx-rs): add GetSatPasses protocol command for server-side TLE management TLE refresh now happens only on trx-server (once at startup, then every 24h). Client fetches satellite predictions from server via new GetSatPasses fast-path command and caches them locally, refreshing every 5 minutes. Removes spawn_tle_refresh_task from trx-client. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Stan Grams --- src/trx-client/src/main.rs | 4 +- src/trx-client/src/remote_client.rs | 73 +++++++++++++++++++ src/trx-client/trx-frontend/src/lib.rs | 3 + .../trx-frontend-http-json/src/server.rs | 9 +++ .../trx-frontend/trx-frontend-http/src/api.rs | 59 +++++++-------- src/trx-protocol/src/codec.rs | 3 + src/trx-protocol/src/mapping.rs | 3 + src/trx-protocol/src/types.rs | 4 + src/trx-server/src/listener.rs | 44 ++++++++++- src/trx-server/src/main.rs | 2 + 10 files changed, 167 insertions(+), 37 deletions(-) diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index 526dab4..71c685a 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -383,6 +383,7 @@ async fn async_init() -> DynResult { rig_server_connected: frontend_runtime.rig_server_connected.clone(), rig_id_to_short_name, short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), + sat_passes: frontend_runtime.sat_passes.clone(), }; let state_tx = state_tx.clone(); let remote_shutdown_rx = shutdown_rx.clone(); @@ -564,9 +565,6 @@ async fn async_init() -> DynResult { let frontend_runtime_ctx = Arc::new(frontend_runtime); - // Fetch satellite TLEs from CelesTrak for pass predictions. - trx_core::geo::spawn_tle_refresh_task(); - // Start decode history collector before audio client starts replay. // Frontend tasks are spawned asynchronously, so starting the collector // here avoids missing the initial server-side history burst. diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index c1a0f0a..7b4f89c 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -75,6 +75,8 @@ pub struct RemoteClientConfig { /// Dynamically resolved reverse mapping: short_name → server rig_id. /// Populated during `refresh_remote_snapshot` when short-name mode is active. pub short_name_to_rig_id: Arc>>, + /// Cached satellite pass predictions from the server (GetSatPasses). + pub sat_passes: Arc>>, } pub async fn run_remote_client( @@ -301,6 +303,18 @@ async fn handle_connection( warn!("Initial remote snapshot refresh failed: {}", e); } + // Fetch satellite passes immediately and then every 5 minutes. + let sat_pass_interval = Duration::from_secs(5 * 60); + let mut last_sat_pass_refresh = Instant::now(); + match send_get_sat_passes(config, &mut writer, &mut reader).await { + Ok(result) => { + if let Ok(mut guard) = config.sat_passes.write() { + *guard = Some(result); + } + } + Err(e) => warn!("Initial sat passes fetch failed: {}", e), + } + loop { tokio::select! { changed = shutdown_rx.changed() => { @@ -339,6 +353,19 @@ async fn handle_connection( } else { poll_failure_streak = 0; } + + // Refresh satellite passes periodically (every 5 minutes). + if last_sat_pass_refresh.elapsed() >= sat_pass_interval { + last_sat_pass_refresh = Instant::now(); + match send_get_sat_passes(config, &mut writer, &mut reader).await { + Ok(result) => { + if let Ok(mut guard) = config.sat_passes.write() { + *guard = Some(result); + } + } + Err(e) => warn!("Sat passes refresh failed: {}", e), + } + } } req = rx.recv() => { let Some(req) = req else { @@ -591,6 +618,44 @@ async fn send_get_rigs( )) } +async fn send_get_sat_passes( + config: &RemoteClientConfig, + writer: &mut tokio::net::tcp::OwnedWriteHalf, + reader: &mut BufReader, +) -> RigResult { + let envelope = build_envelope(config, ClientCommand::GetSatPasses, None); + let mut payload = serde_json::to_string(&envelope) + .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; + payload.push('\n'); + + time::timeout(IO_TIMEOUT, writer.write_all(payload.as_bytes())) + .await + .map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))? + .map_err(|e| RigError::communication(format!("write failed: {e}")))?; + time::timeout(IO_TIMEOUT, writer.flush()) + .await + .map_err(|_| RigError::communication(format!("flush timed out after {:?}", IO_TIMEOUT)))? + .map_err(|e| RigError::communication(format!("flush failed: {e}")))?; + + let line = time::timeout(IO_TIMEOUT, read_limited_line(reader, MAX_JSON_LINE_BYTES)) + .await + .map_err(|_| RigError::communication(format!("read timed out after {:?}", IO_TIMEOUT)))? + .map_err(|e| RigError::communication(format!("read failed: {e}")))?; + let line = line.ok_or_else(|| RigError::communication("connection closed by remote"))?; + + let resp: ClientResponse = serde_json::from_str(line.trim_end()) + .map_err(|e| RigError::communication(format!("invalid response: {e}")))?; + if resp.success { + return resp + .sat_passes + .ok_or_else(|| RigError::communication("missing sat_passes in GetSatPasses response")); + } + + Err(RigError::communication( + resp.error.unwrap_or_else(|| "remote error".into()), + )) +} + fn cache_remote_rigs( config: &RemoteClientConfig, _raw_rigs: &[RigEntry], @@ -1135,6 +1200,7 @@ mod tests { state: snapshot.clone(), audio_port: Some(4531), }]), + sat_passes: None, error: None, }) .expect("serialize response") @@ -1181,6 +1247,7 @@ mod tests { rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_id_to_short_name: HashMap::new(), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), + sat_passes: Arc::new(RwLock::new(None)), }, req_rx, state_tx, @@ -1224,6 +1291,7 @@ mod tests { rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_id_to_short_name: HashMap::new(), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), + sat_passes: Arc::new(RwLock::new(None)), }; let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None); assert_eq!(envelope.token.as_deref(), Some("secret")); @@ -1250,6 +1318,7 @@ mod tests { rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "home-hf".to_string())]), short_name_to_rig_id, + sat_passes: Arc::new(RwLock::new(None)), }; // selected_rig_id is "home-hf" (short name), envelope should translate to "hf" let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None); @@ -1280,6 +1349,7 @@ mod tests { rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_id_to_short_name: HashMap::new(), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), + sat_passes: Arc::new(RwLock::new(None)), }; // Legacy mode: rig_id passes through unchanged assert!(!has_short_names(&config)); @@ -1305,6 +1375,7 @@ mod tests { (None, "default-rig".to_string()), ]), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), + sat_passes: Arc::new(RwLock::new(None)), }; assert!(has_short_names(&config)); assert_eq!( @@ -1340,6 +1411,7 @@ mod tests { rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), + sat_passes: Arc::new(RwLock::new(None)), }; let snapshot = sample_snapshot(); let rigs = vec![RigEntry { @@ -1411,6 +1483,7 @@ mod tests { rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), + sat_passes: Arc::new(RwLock::new(None)), }; let ids = super::active_spectrum_rig_ids(&config); diff --git a/src/trx-client/trx-frontend/src/lib.rs b/src/trx-client/trx-frontend/src/lib.rs index fb29f95..895f4d6 100644 --- a/src/trx-client/trx-frontend/src/lib.rs +++ b/src/trx-client/trx-frontend/src/lib.rs @@ -260,6 +260,8 @@ pub struct FrontendRuntimeContext { pub remote_active_rig_id: Arc>>, /// Cached remote rig list from GetRigs polling. pub remote_rigs: Arc>>, + /// Cached satellite pass predictions from the server (GetSatPasses). + pub sat_passes: Arc>>, /// Per-rig state watch channels, keyed by rig_id. /// Populated by the remote client poll loop so each SSE session can /// subscribe to a specific rig's state independently. @@ -391,6 +393,7 @@ impl FrontendRuntimeContext { http_decode_history_retention_min_by_rig: HashMap::new(), remote_active_rig_id: Arc::new(Mutex::new(None)), remote_rigs: Arc::new(Mutex::new(Vec::new())), + sat_passes: Arc::new(RwLock::new(None)), rig_states: Arc::new(RwLock::new(HashMap::new())), owner_callsign: None, owner_website_url: None, diff --git a/src/trx-client/trx-frontend/trx-frontend-http-json/src/server.rs b/src/trx-client/trx-frontend/trx-frontend-http-json/src/server.rs index 771b6a1..08fc058 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http-json/src/server.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http-json/src/server.rs @@ -107,6 +107,7 @@ async fn handle_client( rig_id: None, state: None, rigs: None, + sat_passes: None, error: Some(format!("Invalid JSON: {}", e)), }; send_response(&mut writer, &resp).await?; @@ -120,6 +121,7 @@ async fn handle_client( rig_id: None, state: None, rigs: None, + sat_passes: None, error: Some(err), }; send_response(&mut writer, &resp).await?; @@ -138,6 +140,7 @@ async fn handle_client( rig_id: Some("client".to_string()), state: None, rigs: Some(snapshot_remote_rigs(context.as_ref())), + sat_passes: None, error: None, }; send_response(&mut writer, &resp).await?; @@ -168,6 +171,7 @@ async fn handle_client( rig_id: active_rig_id.clone(), state: None, rigs: None, + sat_passes: None, error: Some("Internal error: rig task not available".into()), }; send_response(&mut writer, &resp).await?; @@ -179,6 +183,7 @@ async fn handle_client( rig_id: active_rig_id.clone(), state: None, rigs: None, + sat_passes: None, error: Some("Internal error: request queue timeout".into()), }; send_response(&mut writer, &resp).await?; @@ -193,6 +198,7 @@ async fn handle_client( rig_id: active_rig_id.clone(), state: Some(snapshot), rigs: None, + sat_passes: None, error: None, }; send_response(&mut writer, &resp).await?; @@ -203,6 +209,7 @@ async fn handle_client( rig_id: active_rig_id.clone(), state: None, rigs: None, + sat_passes: None, error: Some(err.message), }; send_response(&mut writer, &resp).await?; @@ -214,6 +221,7 @@ async fn handle_client( rig_id: active_rig_id.clone(), state: None, rigs: None, + sat_passes: None, error: Some("Internal error waiting for rig response".into()), }; send_response(&mut writer, &resp).await?; @@ -224,6 +232,7 @@ async fn handle_client( rig_id: active_rig_id.clone(), state: None, rigs: None, + sat_passes: None, error: Some("Request timed out waiting for rig response".into()), }; send_response(&mut writer, &resp).await?; diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index f1a3515..a195ac4 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -1384,43 +1384,34 @@ struct SatPassesResponse { /// Return predicted passes for all known satellites over the next 24 h. /// -/// Requires the server station location to be configured. Returns an empty -/// `passes` array with an `error` field if the location is missing or TLE -/// data has not been fetched yet. +/// Reads cached predictions from the server (fetched via GetSatPasses). +/// Returns an empty `passes` array with an `error` field if predictions +/// are not yet available. #[get("/sat_passes")] -pub async fn sat_passes(state: web::Data>) -> impl Responder { - let rig_state = state.get_ref().borrow().clone(); - let lat = rig_state.server_latitude; - let lon = rig_state.server_longitude; - - let (Some(lat), Some(lon)) = (lat, lon) else { - return web::Json(SatPassesResponse { +pub async fn sat_passes(context: web::Data>) -> impl Responder { + let cached = context.sat_passes.read().ok().and_then(|g| g.clone()); + match cached { + Some(result) => { + let error = match result.tle_source { + trx_core::geo::TleSource::Unavailable => { + Some("TLE data not yet available — waiting for CelesTrak fetch".to_string()) + } + trx_core::geo::TleSource::Celestrak => None, + }; + web::Json(SatPassesResponse { + passes: result.passes, + error, + satellite_count: result.satellite_count, + tle_source: result.tle_source, + }) + } + None => web::Json(SatPassesResponse { passes: vec![], - error: Some("No station location configured".to_string()), + error: Some("Satellite predictions not yet available from server".to_string()), satellite_count: 0, tle_source: trx_core::geo::TleSource::Unavailable, - }); - }; - - let now_ms = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as i64; - let window_ms = 24 * 60 * 60 * 1000_i64; - - let result = trx_core::geo::compute_upcoming_passes(lat, lon, now_ms, window_ms); - let error = match result.tle_source { - trx_core::geo::TleSource::Unavailable => { - Some("TLE data not yet available — waiting for CelesTrak fetch".to_string()) - } - trx_core::geo::TleSource::Celestrak => None, - }; - web::Json(SatPassesResponse { - passes: result.passes, - error, - satellite_count: result.satellite_count, - tle_source: result.tle_source, - }) + }), + } } #[post("/clear_ft8_decode")] @@ -2400,6 +2391,7 @@ async fn send_command( rig_id: None, state: Some(snapshot), rigs: None, + sat_passes: None, error: None, })), Ok(Err(err)) => Ok(HttpResponse::BadRequest().json(ClientResponse { @@ -2407,6 +2399,7 @@ async fn send_command( rig_id: None, state: None, rigs: None, + sat_passes: None, error: Some(err.message), })), Err(e) => Err(actix_web::error::ErrorInternalServerError(format!( diff --git a/src/trx-protocol/src/codec.rs b/src/trx-protocol/src/codec.rs index d4b0b77..61083ad 100644 --- a/src/trx-protocol/src/codec.rs +++ b/src/trx-protocol/src/codec.rs @@ -263,6 +263,7 @@ mod tests { rig_id: Some("hf".to_string()), state: None, rigs: None, + sat_passes: None, error: None, }; let json = serde_json::to_string(&resp).unwrap(); @@ -279,6 +280,7 @@ mod tests { rig_id: None, state: None, rigs: None, + sat_passes: None, error: Some("bad".to_string()), }; let json = serde_json::to_string(&resp).unwrap(); @@ -296,6 +298,7 @@ mod tests { rig_id: Some("server".to_string()), state: None, rigs: None, + sat_passes: None, error: None, }; let json = serde_json::to_string(&resp).unwrap(); diff --git a/src/trx-protocol/src/mapping.rs b/src/trx-protocol/src/mapping.rs index b984560..4bae01b 100644 --- a/src/trx-protocol/src/mapping.rs +++ b/src/trx-protocol/src/mapping.rs @@ -19,6 +19,9 @@ pub fn client_command_to_rig(cmd: ClientCommand) -> RigCommand { ClientCommand::GetRigs => { unreachable!("GetRigs is handled in the listener before reaching rig_task") } + ClientCommand::GetSatPasses => { + unreachable!("GetSatPasses is handled in the listener before reaching rig_task") + } ClientCommand::GetState => RigCommand::GetSnapshot, ClientCommand::SetFreq { freq_hz } => RigCommand::SetFreq(Freq { hz: freq_hz }), ClientCommand::SetCenterFreq { freq_hz } => RigCommand::SetCenterFreq(Freq { hz: freq_hz }), diff --git a/src/trx-protocol/src/types.rs b/src/trx-protocol/src/types.rs index e180036..cc4a059 100644 --- a/src/trx-protocol/src/types.rs +++ b/src/trx-protocol/src/types.rs @@ -15,6 +15,7 @@ use trx_core::WfmDenoiseLevel; pub enum ClientCommand { GetState, GetRigs, + GetSatPasses, SetFreq { freq_hz: u64 }, SetCenterFreq { freq_hz: u64 }, SetMode { mode: String }, @@ -95,5 +96,8 @@ pub struct ClientResponse { /// Populated only for GetRigs responses. #[serde(default, skip_serializing_if = "Option::is_none")] pub rigs: Option>, + /// Populated only for GetSatPasses responses. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub sat_passes: Option, pub error: Option, } diff --git a/src/trx-server/src/listener.rs b/src/trx-server/src/listener.rs index 1e4134c..f09f995 100644 --- a/src/trx-server/src/listener.rs +++ b/src/trx-server/src/listener.rs @@ -45,6 +45,7 @@ pub async fn run_listener( rigs: Arc>, default_rig_id: String, auth_tokens: HashSet, + station_coords: Option<(f64, f64)>, mut shutdown_rx: watch::Receiver, ) -> std::io::Result<()> { let listener = TcpListener::bind(addr).await?; @@ -61,8 +62,9 @@ pub async fn run_listener( let default_rig_id = default_rig_id.clone(); let validator = Arc::clone(&validator); let client_shutdown_rx = shutdown_rx.clone(); + let coords = station_coords; tokio::spawn(async move { - if let Err(e) = handle_client(socket, peer, rigs, default_rig_id, validator, client_shutdown_rx).await { + if let Err(e) = handle_client(socket, peer, rigs, default_rig_id, validator, coords, client_shutdown_rx).await { error!("Client {} error: {:?}", peer, e); } }); @@ -158,6 +160,7 @@ async fn handle_client( rigs: Arc>, default_rig_id: String, validator: Arc, + station_coords: Option<(f64, f64)>, mut shutdown_rx: watch::Receiver, ) -> std::io::Result<()> { let (reader, mut writer) = socket.into_split(); @@ -213,6 +216,7 @@ async fn handle_client( rig_id: None, state: None, rigs: None, + sat_passes: None, error: Some(format!("Invalid JSON: {}", e)), }; send_response(&mut writer, &resp).await?; @@ -226,6 +230,7 @@ async fn handle_client( rig_id: None, state: None, rigs: None, + sat_passes: None, error: Some(err), }; send_response(&mut writer, &resp).await?; @@ -258,6 +263,35 @@ async fn handle_client( rig_id: Some("server".to_string()), state: None, rigs: Some(entries), + sat_passes: None, + error: None, + }; + send_response(&mut writer, &resp).await?; + continue; + } + + // GetSatPasses: compute satellite passes from the server-side TLE store. + if matches!(envelope.cmd, ClientCommand::GetSatPasses) { + let result = if let Some((lat, lon)) = station_coords { + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64; + let window_ms = 24 * 3600 * 1000; // 24 hours + trx_core::geo::compute_upcoming_passes(lat, lon, now_ms, window_ms) + } else { + trx_core::geo::PassPredictionResult { + passes: vec![], + satellite_count: 0, + tle_source: trx_core::geo::TleSource::Unavailable, + } + }; + let resp = ClientResponse { + success: true, + rig_id: Some("server".to_string()), + state: None, + rigs: None, + sat_passes: Some(result), error: None, }; send_response(&mut writer, &resp).await?; @@ -274,6 +308,7 @@ async fn handle_client( rig_id: Some(target_rig_id.clone()), state: None, rigs: None, + sat_passes: None, error: Some(format!("Unknown rig_id: {}", target_rig_id)), }; send_response(&mut writer, &resp).await?; @@ -292,6 +327,7 @@ async fn handle_client( rig_id: Some(target_rig_id.clone()), state: Some(snapshot), rigs: None, + sat_passes: None, error: None, }; send_response(&mut writer, &resp).await?; @@ -318,6 +354,7 @@ async fn handle_client( rig_id: Some(target_rig_id.clone()), state: None, rigs: None, + sat_passes: None, error: Some("Internal error: rig task not available".into()), }; send_response(&mut writer, &resp).await?; @@ -329,6 +366,7 @@ async fn handle_client( rig_id: Some(target_rig_id.clone()), state: None, rigs: None, + sat_passes: None, error: Some("Internal error: request queue timeout".into()), }; send_response(&mut writer, &resp).await?; @@ -346,6 +384,7 @@ async fn handle_client( rig_id: Some(target_rig_id.clone()), state: None, rigs: None, + sat_passes: None, error: Some("Request timed out waiting for rig response".into()), }; send_response(&mut writer, &resp).await?; @@ -370,6 +409,7 @@ async fn handle_client( rig_id: Some(target_rig_id.clone()), state: Some(snapshot), rigs: None, + sat_passes: None, error: None, }; send_response(&mut writer, &resp).await?; @@ -380,6 +420,7 @@ async fn handle_client( rig_id: Some(target_rig_id.clone()), state: None, rigs: None, + sat_passes: None, error: Some(err.message), }; send_response(&mut writer, &resp).await?; @@ -391,6 +432,7 @@ async fn handle_client( rig_id: Some(target_rig_id.clone()), state: None, rigs: None, + sat_passes: None, error: Some("Internal error waiting for rig response".into()), }; send_response(&mut writer, &resp).await?; diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index 7d44a6b..c7a9d94 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -1161,11 +1161,13 @@ async fn main() -> DynResult<()> { let rigs_arc = Arc::new(rig_handles); let listener_shutdown_rx = shutdown_rx.clone(); task_handles.push(tokio::spawn(async move { + let station_coords = latitude.zip(longitude); if let Err(e) = listener::run_listener( listen_addr, rigs_arc, default_rig_id, auth_tokens, + station_coords, listener_shutdown_rx, ) .await