From 47a85d98328cec9ac714d5c24b7e2dea9c417aaa Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sat, 28 Mar 2026 17:39:02 +0100 Subject: [PATCH] [fix](trx-rs): add NOAA-15/18/19 TLEs and move sat pass refresh off main connection CelesTrak GROUP=weather does not include legacy NOAA POES satellites. Added GROUP=noaa fetch so NOAA-15/18/19 appear in predictions. Moved GetSatPasses to a dedicated TCP connection (client) and spawn_blocking (server) so pass computation never blocks state polling. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Stan Grams --- src/trx-client/src/remote_client.rs | 187 ++++++++++++++++++---------- src/trx-core/src/geo.rs | 22 +++- src/trx-server/src/listener.rs | 11 +- 3 files changed, 154 insertions(+), 66 deletions(-) diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index 5a2e3fb..23a6de7 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -212,6 +212,116 @@ async fn run_spectrum_connection( } } +/// Satellite pass prediction refresh runs on a dedicated TCP connection so it +/// never blocks state polls or user commands on the main connection. +/// Fetches immediately on connect, then every 5 minutes. +async fn run_sat_pass_connection( + config: RemoteClientConfig, + mut shutdown_rx: watch::Receiver, +) { + const SAT_PASS_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60); + // Allow extra time for server-side SGP4 computation. + const SAT_PASS_IO_TIMEOUT: Duration = Duration::from_secs(30); + + loop { + if *shutdown_rx.borrow() { + break; + } + + match time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&config.addr)).await { + Ok(Ok(stream)) => { + let _ = stream.set_nodelay(true); + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + let mut interval = time::interval(SAT_PASS_REFRESH_INTERVAL); + + 'inner: loop { + tokio::select! { + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => return, + Ok(()) => {} + Err(_) => return, + } + } + _ = interval.tick() => { + match send_get_sat_passes_on( + &config, + &mut writer, + &mut reader, + SAT_PASS_IO_TIMEOUT, + ) + .await + { + Ok(result) => { + if let Ok(mut guard) = config.sat_passes.write() { + *guard = Some(result); + } + } + Err(e) => { + warn!("Sat passes refresh failed: {}", e); + break 'inner; + } + } + } + } + } + } + Ok(Err(e)) => warn!("Sat-pass connect failed: {}", e), + Err(_) => warn!("Sat-pass connect timed out"), + } + + tokio::select! { + _ = time::sleep(Duration::from_secs(5)) => {} + changed = shutdown_rx.changed() => { + if matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow() { + break; + } + } + } + } +} + +/// Send a GetSatPasses request on the given connection with a custom timeout. +async fn send_get_sat_passes_on( + config: &RemoteClientConfig, + writer: &mut tokio::net::tcp::OwnedWriteHalf, + reader: &mut BufReader, + timeout: Duration, +) -> RigResult { + let envelope = build_envelope(config, ClientCommand::GetSatPasses, None); + let mut payload = serde_json::to_string(&envelope) + .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; + payload.push('\n'); + + time::timeout(timeout, writer.write_all(payload.as_bytes())) + .await + .map_err(|_| RigError::communication(format!("write timed out after {timeout:?}")))? + .map_err(|e| RigError::communication(format!("write failed: {e}")))?; + time::timeout(timeout, writer.flush()) + .await + .map_err(|_| RigError::communication(format!("flush timed out after {timeout:?}")))? + .map_err(|e| RigError::communication(format!("flush failed: {e}")))?; + + let line = time::timeout(timeout, read_limited_line(reader, MAX_JSON_LINE_BYTES)) + .await + .map_err(|_| RigError::communication(format!("read timed out after {timeout:?}")))? + .map_err(|e| RigError::communication(format!("read failed: {e}")))?; + let line = line.ok_or_else(|| RigError::communication("connection closed by remote"))?; + + let resp: ClientResponse = serde_json::from_str(line.trim_end()) + .map_err(|e| RigError::communication(format!("invalid response: {e}")))?; + if resp.success { + return resp + .sat_passes + .ok_or_else(|| RigError::communication("missing sat_passes in GetSatPasses response")); + } + + Err(RigError::communication( + resp.error.unwrap_or_else(|| "remote error".into()), + )) +} + async fn handle_spectrum_connection( config: &RemoteClientConfig, stream: TcpStream, @@ -303,25 +413,23 @@ async fn handle_connection( warn!("Initial remote snapshot refresh failed: {}", e); } - // Fetch satellite passes immediately and then every 5 minutes. - let sat_pass_interval = Duration::from_secs(5 * 60); - let mut last_sat_pass_refresh = Instant::now(); - match send_get_sat_passes(config, &mut writer, &mut reader).await { - Ok(result) => { - if let Ok(mut guard) = config.sat_passes.write() { - *guard = Some(result); - } - } - Err(e) => warn!("Initial sat passes fetch failed: {}", e), - } + // Satellite pass refresh runs on its own dedicated TCP connection so it + // never blocks state polls or user commands on the main connection. + let sat_pass_task = tokio::spawn(run_sat_pass_connection(config.clone(), shutdown_rx.clone())); loop { tokio::select! { changed = shutdown_rx.changed() => { match changed { - Ok(()) if *shutdown_rx.borrow() => return Ok(()), + Ok(()) if *shutdown_rx.borrow() => { + sat_pass_task.abort(); + return Ok(()); + } Ok(()) => {} - Err(_) => return Ok(()), + Err(_) => { + sat_pass_task.abort(); + return Ok(()); + } } } _ = poll_interval.tick() => { @@ -342,9 +450,11 @@ async fn handle_connection( e.message.contains("timed out") || e.message.contains("connection closed"); if timeout_or_disconnect { + sat_pass_task.abort(); return Err(e); } if poll_failure_streak >= MAX_CONSECUTIVE_POLL_FAILURES { + sat_pass_task.abort(); return Err(RigError::communication(format!( "remote poll failed {} consecutive times: {}", poll_failure_streak, e @@ -353,19 +463,6 @@ async fn handle_connection( } else { poll_failure_streak = 0; } - - // Refresh satellite passes periodically (every 5 minutes). - if last_sat_pass_refresh.elapsed() >= sat_pass_interval { - last_sat_pass_refresh = Instant::now(); - match send_get_sat_passes(config, &mut writer, &mut reader).await { - Ok(result) => { - if let Ok(mut guard) = config.sat_passes.write() { - *guard = Some(result); - } - } - Err(e) => warn!("Sat passes refresh failed: {}", e), - } - } } req = rx.recv() => { let Some(req) = req else { @@ -618,44 +715,6 @@ async fn send_get_rigs( )) } -async fn send_get_sat_passes( - config: &RemoteClientConfig, - writer: &mut tokio::net::tcp::OwnedWriteHalf, - reader: &mut BufReader, -) -> RigResult { - let envelope = build_envelope(config, ClientCommand::GetSatPasses, None); - let mut payload = serde_json::to_string(&envelope) - .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; - payload.push('\n'); - - time::timeout(IO_TIMEOUT, writer.write_all(payload.as_bytes())) - .await - .map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))? - .map_err(|e| RigError::communication(format!("write failed: {e}")))?; - time::timeout(IO_TIMEOUT, writer.flush()) - .await - .map_err(|_| RigError::communication(format!("flush timed out after {:?}", IO_TIMEOUT)))? - .map_err(|e| RigError::communication(format!("flush failed: {e}")))?; - - let line = time::timeout(IO_TIMEOUT, read_limited_line(reader, MAX_JSON_LINE_BYTES)) - .await - .map_err(|_| RigError::communication(format!("read timed out after {:?}", IO_TIMEOUT)))? - .map_err(|e| RigError::communication(format!("read failed: {e}")))?; - let line = line.ok_or_else(|| RigError::communication("connection closed by remote"))?; - - let resp: ClientResponse = serde_json::from_str(line.trim_end()) - .map_err(|e| RigError::communication(format!("invalid response: {e}")))?; - if resp.success { - return resp - .sat_passes - .ok_or_else(|| RigError::communication("missing sat_passes in GetSatPasses response")); - } - - Err(RigError::communication( - resp.error.unwrap_or_else(|| "remote error".into()), - )) -} - fn cache_remote_rigs( config: &RemoteClientConfig, _raw_rigs: &[RigEntry], diff --git a/src/trx-core/src/geo.rs b/src/trx-core/src/geo.rs index 591c058..b75d0c3 100644 --- a/src/trx-core/src/geo.rs +++ b/src/trx-core/src/geo.rs @@ -45,6 +45,10 @@ const EARTH_RADIUS_KM: f64 = 6371.0; const CELESTRAK_WEATHER_URL: &str = "https://celestrak.org/NORAD/elements/gp.php?GROUP=weather&FORMAT=tle"; +/// CelesTrak NOAA satellite TLE endpoint (includes NOAA-15/18/19 APT sats). +const CELESTRAK_NOAA_URL: &str = + "https://celestrak.org/NORAD/elements/gp.php?GROUP=noaa&FORMAT=tle"; + /// CelesTrak amateur satellite TLE endpoint. const CELESTRAK_HAM_URL: &str = "https://celestrak.org/NORAD/elements/gp.php?GROUP=amateur&FORMAT=tle"; @@ -280,7 +284,7 @@ pub async fn refresh_tles_from_celestrak() -> Result { /// do not stop the periodic refresh — hardcoded fallback TLEs remain usable. pub fn spawn_tle_refresh_task() { tokio::spawn(async { - // Initial fetch at startup: weather + amateur satellites. + // Initial fetch at startup: weather + NOAA + amateur satellites. match fetch_and_merge_tles(CELESTRAK_WEATHER_URL, SatCategory::Weather).await { Ok(n) => { tracing::info!("TLE refresh: loaded {n} weather satellite TLEs from CelesTrak") @@ -289,6 +293,14 @@ pub fn spawn_tle_refresh_task() { tracing::warn!("TLE refresh: weather fetch failed ({e}), using hardcoded TLEs") } } + match fetch_and_merge_tles(CELESTRAK_NOAA_URL, SatCategory::Weather).await { + Ok(n) => { + tracing::info!("TLE refresh: loaded {n} NOAA satellite TLEs from CelesTrak") + } + Err(e) => { + tracing::warn!("TLE refresh: NOAA fetch failed ({e})") + } + } match fetch_and_merge_tles(CELESTRAK_HAM_URL, SatCategory::Amateur).await { Ok(n) => { tracing::info!("TLE refresh: loaded {n} amateur satellite TLEs from CelesTrak") @@ -311,6 +323,14 @@ pub fn spawn_tle_refresh_task() { tracing::warn!("TLE refresh: weather fetch failed ({e}), keeping previous TLEs") } } + match fetch_and_merge_tles(CELESTRAK_NOAA_URL, SatCategory::Weather).await { + Ok(n) => { + tracing::info!("TLE refresh: updated {n} NOAA satellite TLEs from CelesTrak") + } + Err(e) => { + tracing::warn!("TLE refresh: NOAA fetch failed ({e}), keeping previous TLEs") + } + } match fetch_and_merge_tles(CELESTRAK_HAM_URL, SatCategory::Amateur).await { Ok(n) => { tracing::info!("TLE refresh: updated {n} amateur satellite TLEs from CelesTrak") diff --git a/src/trx-server/src/listener.rs b/src/trx-server/src/listener.rs index 8e3405d..4f7a997 100644 --- a/src/trx-server/src/listener.rs +++ b/src/trx-server/src/listener.rs @@ -271,6 +271,7 @@ async fn handle_client( } // GetSatPasses: compute satellite passes from the server-side TLE store. + // Runs on a blocking thread to avoid stalling the connection handler. if matches!(envelope.cmd, ClientCommand::GetSatPasses) { let result = if let Some((lat, lon)) = station_coords { let now_ms = std::time::SystemTime::now() @@ -278,7 +279,15 @@ async fn handle_client( .unwrap_or_default() .as_millis() as i64; let window_ms = 24 * 3600 * 1000; // 24 hours - trx_core::geo::compute_upcoming_passes(lat, lon, now_ms, window_ms) + tokio::task::spawn_blocking(move || { + trx_core::geo::compute_upcoming_passes(lat, lon, now_ms, window_ms) + }) + .await + .unwrap_or_else(|_| trx_core::geo::PassPredictionResult { + passes: vec![], + satellite_count: 0, + tle_source: trx_core::geo::TleSource::Unavailable, + }) } else { trx_core::geo::PassPredictionResult { passes: vec![],