[fix](trx-rs): add NOAA-15/18/19 TLEs and move sat pass refresh off main connection

CelesTrak GROUP=weather does not include legacy NOAA POES satellites.
Added GROUP=noaa fetch so NOAA-15/18/19 appear in predictions. Moved
GetSatPasses to a dedicated TCP connection (client) and spawn_blocking
(server) so pass computation never blocks state polling.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-28 17:39:02 +01:00
parent 91f50ebb3f
commit 47a85d9832
3 changed files with 154 additions and 66 deletions
+123 -64
View File
@@ -212,6 +212,116 @@ async fn run_spectrum_connection(
} }
} }
/// Satellite pass prediction refresh runs on a dedicated TCP connection so it
/// never blocks state polls or user commands on the main connection.
/// Fetches immediately on connect, then every 5 minutes.
async fn run_sat_pass_connection(
config: RemoteClientConfig,
mut shutdown_rx: watch::Receiver<bool>,
) {
const SAT_PASS_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);
// Allow extra time for server-side SGP4 computation.
const SAT_PASS_IO_TIMEOUT: Duration = Duration::from_secs(30);
loop {
if *shutdown_rx.borrow() {
break;
}
match time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&config.addr)).await {
Ok(Ok(stream)) => {
let _ = stream.set_nodelay(true);
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut interval = time::interval(SAT_PASS_REFRESH_INTERVAL);
'inner: loop {
tokio::select! {
changed = shutdown_rx.changed() => {
match changed {
Ok(()) if *shutdown_rx.borrow() => return,
Ok(()) => {}
Err(_) => return,
}
}
_ = interval.tick() => {
match send_get_sat_passes_on(
&config,
&mut writer,
&mut reader,
SAT_PASS_IO_TIMEOUT,
)
.await
{
Ok(result) => {
if let Ok(mut guard) = config.sat_passes.write() {
*guard = Some(result);
}
}
Err(e) => {
warn!("Sat passes refresh failed: {}", e);
break 'inner;
}
}
}
}
}
}
Ok(Err(e)) => warn!("Sat-pass connect failed: {}", e),
Err(_) => warn!("Sat-pass connect timed out"),
}
tokio::select! {
_ = time::sleep(Duration::from_secs(5)) => {}
changed = shutdown_rx.changed() => {
if matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow() {
break;
}
}
}
}
}
/// Send a GetSatPasses request on the given connection with a custom timeout.
async fn send_get_sat_passes_on(
config: &RemoteClientConfig,
writer: &mut tokio::net::tcp::OwnedWriteHalf,
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
timeout: Duration,
) -> RigResult<trx_core::geo::PassPredictionResult> {
let envelope = build_envelope(config, ClientCommand::GetSatPasses, None);
let mut payload = serde_json::to_string(&envelope)
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
payload.push('\n');
time::timeout(timeout, writer.write_all(payload.as_bytes()))
.await
.map_err(|_| RigError::communication(format!("write timed out after {timeout:?}")))?
.map_err(|e| RigError::communication(format!("write failed: {e}")))?;
time::timeout(timeout, writer.flush())
.await
.map_err(|_| RigError::communication(format!("flush timed out after {timeout:?}")))?
.map_err(|e| RigError::communication(format!("flush failed: {e}")))?;
let line = time::timeout(timeout, read_limited_line(reader, MAX_JSON_LINE_BYTES))
.await
.map_err(|_| RigError::communication(format!("read timed out after {timeout:?}")))?
.map_err(|e| RigError::communication(format!("read failed: {e}")))?;
let line = line.ok_or_else(|| RigError::communication("connection closed by remote"))?;
let resp: ClientResponse = serde_json::from_str(line.trim_end())
.map_err(|e| RigError::communication(format!("invalid response: {e}")))?;
if resp.success {
return resp
.sat_passes
.ok_or_else(|| RigError::communication("missing sat_passes in GetSatPasses response"));
}
Err(RigError::communication(
resp.error.unwrap_or_else(|| "remote error".into()),
))
}
async fn handle_spectrum_connection( async fn handle_spectrum_connection(
config: &RemoteClientConfig, config: &RemoteClientConfig,
stream: TcpStream, stream: TcpStream,
@@ -303,25 +413,23 @@ async fn handle_connection(
warn!("Initial remote snapshot refresh failed: {}", e); warn!("Initial remote snapshot refresh failed: {}", e);
} }
// Fetch satellite passes immediately and then every 5 minutes. // Satellite pass refresh runs on its own dedicated TCP connection so it
let sat_pass_interval = Duration::from_secs(5 * 60); // never blocks state polls or user commands on the main connection.
let mut last_sat_pass_refresh = Instant::now(); let sat_pass_task = tokio::spawn(run_sat_pass_connection(config.clone(), shutdown_rx.clone()));
match send_get_sat_passes(config, &mut writer, &mut reader).await {
Ok(result) => {
if let Ok(mut guard) = config.sat_passes.write() {
*guard = Some(result);
}
}
Err(e) => warn!("Initial sat passes fetch failed: {}", e),
}
loop { loop {
tokio::select! { tokio::select! {
changed = shutdown_rx.changed() => { changed = shutdown_rx.changed() => {
match changed { match changed {
Ok(()) if *shutdown_rx.borrow() => return Ok(()), Ok(()) if *shutdown_rx.borrow() => {
sat_pass_task.abort();
return Ok(());
}
Ok(()) => {} Ok(()) => {}
Err(_) => return Ok(()), Err(_) => {
sat_pass_task.abort();
return Ok(());
}
} }
} }
_ = poll_interval.tick() => { _ = poll_interval.tick() => {
@@ -342,9 +450,11 @@ async fn handle_connection(
e.message.contains("timed out") e.message.contains("timed out")
|| e.message.contains("connection closed"); || e.message.contains("connection closed");
if timeout_or_disconnect { if timeout_or_disconnect {
sat_pass_task.abort();
return Err(e); return Err(e);
} }
if poll_failure_streak >= MAX_CONSECUTIVE_POLL_FAILURES { if poll_failure_streak >= MAX_CONSECUTIVE_POLL_FAILURES {
sat_pass_task.abort();
return Err(RigError::communication(format!( return Err(RigError::communication(format!(
"remote poll failed {} consecutive times: {}", "remote poll failed {} consecutive times: {}",
poll_failure_streak, e poll_failure_streak, e
@@ -353,19 +463,6 @@ async fn handle_connection(
} else { } else {
poll_failure_streak = 0; poll_failure_streak = 0;
} }
// Refresh satellite passes periodically (every 5 minutes).
if last_sat_pass_refresh.elapsed() >= sat_pass_interval {
last_sat_pass_refresh = Instant::now();
match send_get_sat_passes(config, &mut writer, &mut reader).await {
Ok(result) => {
if let Ok(mut guard) = config.sat_passes.write() {
*guard = Some(result);
}
}
Err(e) => warn!("Sat passes refresh failed: {}", e),
}
}
} }
req = rx.recv() => { req = rx.recv() => {
let Some(req) = req else { let Some(req) = req else {
@@ -618,44 +715,6 @@ async fn send_get_rigs(
)) ))
} }
async fn send_get_sat_passes(
config: &RemoteClientConfig,
writer: &mut tokio::net::tcp::OwnedWriteHalf,
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
) -> RigResult<trx_core::geo::PassPredictionResult> {
let envelope = build_envelope(config, ClientCommand::GetSatPasses, None);
let mut payload = serde_json::to_string(&envelope)
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
payload.push('\n');
time::timeout(IO_TIMEOUT, writer.write_all(payload.as_bytes()))
.await
.map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))?
.map_err(|e| RigError::communication(format!("write failed: {e}")))?;
time::timeout(IO_TIMEOUT, writer.flush())
.await
.map_err(|_| RigError::communication(format!("flush timed out after {:?}", IO_TIMEOUT)))?
.map_err(|e| RigError::communication(format!("flush failed: {e}")))?;
let line = time::timeout(IO_TIMEOUT, read_limited_line(reader, MAX_JSON_LINE_BYTES))
.await
.map_err(|_| RigError::communication(format!("read timed out after {:?}", IO_TIMEOUT)))?
.map_err(|e| RigError::communication(format!("read failed: {e}")))?;
let line = line.ok_or_else(|| RigError::communication("connection closed by remote"))?;
let resp: ClientResponse = serde_json::from_str(line.trim_end())
.map_err(|e| RigError::communication(format!("invalid response: {e}")))?;
if resp.success {
return resp
.sat_passes
.ok_or_else(|| RigError::communication("missing sat_passes in GetSatPasses response"));
}
Err(RigError::communication(
resp.error.unwrap_or_else(|| "remote error".into()),
))
}
fn cache_remote_rigs( fn cache_remote_rigs(
config: &RemoteClientConfig, config: &RemoteClientConfig,
_raw_rigs: &[RigEntry], _raw_rigs: &[RigEntry],
+21 -1
View File
@@ -45,6 +45,10 @@ const EARTH_RADIUS_KM: f64 = 6371.0;
const CELESTRAK_WEATHER_URL: &str = const CELESTRAK_WEATHER_URL: &str =
"https://celestrak.org/NORAD/elements/gp.php?GROUP=weather&FORMAT=tle"; "https://celestrak.org/NORAD/elements/gp.php?GROUP=weather&FORMAT=tle";
/// CelesTrak NOAA satellite TLE endpoint (includes NOAA-15/18/19 APT sats).
const CELESTRAK_NOAA_URL: &str =
"https://celestrak.org/NORAD/elements/gp.php?GROUP=noaa&FORMAT=tle";
/// CelesTrak amateur satellite TLE endpoint. /// CelesTrak amateur satellite TLE endpoint.
const CELESTRAK_HAM_URL: &str = const CELESTRAK_HAM_URL: &str =
"https://celestrak.org/NORAD/elements/gp.php?GROUP=amateur&FORMAT=tle"; "https://celestrak.org/NORAD/elements/gp.php?GROUP=amateur&FORMAT=tle";
@@ -280,7 +284,7 @@ pub async fn refresh_tles_from_celestrak() -> Result<usize, String> {
/// do not stop the periodic refresh — hardcoded fallback TLEs remain usable. /// do not stop the periodic refresh — hardcoded fallback TLEs remain usable.
pub fn spawn_tle_refresh_task() { pub fn spawn_tle_refresh_task() {
tokio::spawn(async { tokio::spawn(async {
// Initial fetch at startup: weather + amateur satellites. // Initial fetch at startup: weather + NOAA + amateur satellites.
match fetch_and_merge_tles(CELESTRAK_WEATHER_URL, SatCategory::Weather).await { match fetch_and_merge_tles(CELESTRAK_WEATHER_URL, SatCategory::Weather).await {
Ok(n) => { Ok(n) => {
tracing::info!("TLE refresh: loaded {n} weather satellite TLEs from CelesTrak") tracing::info!("TLE refresh: loaded {n} weather satellite TLEs from CelesTrak")
@@ -289,6 +293,14 @@ pub fn spawn_tle_refresh_task() {
tracing::warn!("TLE refresh: weather fetch failed ({e}), using hardcoded TLEs") tracing::warn!("TLE refresh: weather fetch failed ({e}), using hardcoded TLEs")
} }
} }
match fetch_and_merge_tles(CELESTRAK_NOAA_URL, SatCategory::Weather).await {
Ok(n) => {
tracing::info!("TLE refresh: loaded {n} NOAA satellite TLEs from CelesTrak")
}
Err(e) => {
tracing::warn!("TLE refresh: NOAA fetch failed ({e})")
}
}
match fetch_and_merge_tles(CELESTRAK_HAM_URL, SatCategory::Amateur).await { match fetch_and_merge_tles(CELESTRAK_HAM_URL, SatCategory::Amateur).await {
Ok(n) => { Ok(n) => {
tracing::info!("TLE refresh: loaded {n} amateur satellite TLEs from CelesTrak") tracing::info!("TLE refresh: loaded {n} amateur satellite TLEs from CelesTrak")
@@ -311,6 +323,14 @@ pub fn spawn_tle_refresh_task() {
tracing::warn!("TLE refresh: weather fetch failed ({e}), keeping previous TLEs") tracing::warn!("TLE refresh: weather fetch failed ({e}), keeping previous TLEs")
} }
} }
match fetch_and_merge_tles(CELESTRAK_NOAA_URL, SatCategory::Weather).await {
Ok(n) => {
tracing::info!("TLE refresh: updated {n} NOAA satellite TLEs from CelesTrak")
}
Err(e) => {
tracing::warn!("TLE refresh: NOAA fetch failed ({e}), keeping previous TLEs")
}
}
match fetch_and_merge_tles(CELESTRAK_HAM_URL, SatCategory::Amateur).await { match fetch_and_merge_tles(CELESTRAK_HAM_URL, SatCategory::Amateur).await {
Ok(n) => { Ok(n) => {
tracing::info!("TLE refresh: updated {n} amateur satellite TLEs from CelesTrak") tracing::info!("TLE refresh: updated {n} amateur satellite TLEs from CelesTrak")
+10 -1
View File
@@ -271,6 +271,7 @@ async fn handle_client(
} }
// GetSatPasses: compute satellite passes from the server-side TLE store. // GetSatPasses: compute satellite passes from the server-side TLE store.
// Runs on a blocking thread to avoid stalling the connection handler.
if matches!(envelope.cmd, ClientCommand::GetSatPasses) { if matches!(envelope.cmd, ClientCommand::GetSatPasses) {
let result = if let Some((lat, lon)) = station_coords { let result = if let Some((lat, lon)) = station_coords {
let now_ms = std::time::SystemTime::now() let now_ms = std::time::SystemTime::now()
@@ -278,7 +279,15 @@ async fn handle_client(
.unwrap_or_default() .unwrap_or_default()
.as_millis() as i64; .as_millis() as i64;
let window_ms = 24 * 3600 * 1000; // 24 hours let window_ms = 24 * 3600 * 1000; // 24 hours
trx_core::geo::compute_upcoming_passes(lat, lon, now_ms, window_ms) tokio::task::spawn_blocking(move || {
trx_core::geo::compute_upcoming_passes(lat, lon, now_ms, window_ms)
})
.await
.unwrap_or_else(|_| trx_core::geo::PassPredictionResult {
passes: vec![],
satellite_count: 0,
tle_source: trx_core::geo::TleSource::Unavailable,
})
} else { } else {
trx_core::geo::PassPredictionResult { trx_core::geo::PassPredictionResult {
passes: vec![], passes: vec![],