[feat](trx-rs): add GetSatPasses protocol command for server-side TLE management

TLE refresh now happens only on trx-server (once at startup, then every
24h). Client fetches satellite predictions from server via new
GetSatPasses fast-path command and caches them locally, refreshing
every 5 minutes. Removes spawn_tle_refresh_task from trx-client.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-28 15:38:39 +01:00
parent e831dff85d
commit 2f7adf05c8
10 changed files with 167 additions and 37 deletions
+1 -3
View File
@@ -383,6 +383,7 @@ async fn async_init() -> DynResult<AppState> {
rig_server_connected: frontend_runtime.rig_server_connected.clone(),
rig_id_to_short_name,
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: frontend_runtime.sat_passes.clone(),
};
let state_tx = state_tx.clone();
let remote_shutdown_rx = shutdown_rx.clone();
@@ -564,9 +565,6 @@ async fn async_init() -> DynResult<AppState> {
let frontend_runtime_ctx = Arc::new(frontend_runtime);
// Fetch satellite TLEs from CelesTrak for pass predictions.
trx_core::geo::spawn_tle_refresh_task();
// Start decode history collector before audio client starts replay.
// Frontend tasks are spawned asynchronously, so starting the collector
// here avoids missing the initial server-side history burst.
+73
View File
@@ -75,6 +75,8 @@ pub struct RemoteClientConfig {
/// Dynamically resolved reverse mapping: short_name → server rig_id.
/// Populated during `refresh_remote_snapshot` when short-name mode is active.
pub short_name_to_rig_id: Arc<RwLock<HashMap<String, String>>>,
/// Cached satellite pass predictions from the server (GetSatPasses).
pub sat_passes: Arc<RwLock<Option<trx_core::geo::PassPredictionResult>>>,
}
pub async fn run_remote_client(
@@ -301,6 +303,18 @@ async fn handle_connection(
warn!("Initial remote snapshot refresh failed: {}", e);
}
// Fetch satellite passes immediately and then every 5 minutes.
let sat_pass_interval = Duration::from_secs(5 * 60);
let mut last_sat_pass_refresh = Instant::now();
match send_get_sat_passes(config, &mut writer, &mut reader).await {
Ok(result) => {
if let Ok(mut guard) = config.sat_passes.write() {
*guard = Some(result);
}
}
Err(e) => warn!("Initial sat passes fetch failed: {}", e),
}
loop {
tokio::select! {
changed = shutdown_rx.changed() => {
@@ -339,6 +353,19 @@ async fn handle_connection(
} else {
poll_failure_streak = 0;
}
// Refresh satellite passes periodically (every 5 minutes).
if last_sat_pass_refresh.elapsed() >= sat_pass_interval {
last_sat_pass_refresh = Instant::now();
match send_get_sat_passes(config, &mut writer, &mut reader).await {
Ok(result) => {
if let Ok(mut guard) = config.sat_passes.write() {
*guard = Some(result);
}
}
Err(e) => warn!("Sat passes refresh failed: {}", e),
}
}
}
req = rx.recv() => {
let Some(req) = req else {
@@ -591,6 +618,44 @@ async fn send_get_rigs(
))
}
async fn send_get_sat_passes(
config: &RemoteClientConfig,
writer: &mut tokio::net::tcp::OwnedWriteHalf,
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
) -> RigResult<trx_core::geo::PassPredictionResult> {
let envelope = build_envelope(config, ClientCommand::GetSatPasses, None);
let mut payload = serde_json::to_string(&envelope)
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
payload.push('\n');
time::timeout(IO_TIMEOUT, writer.write_all(payload.as_bytes()))
.await
.map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))?
.map_err(|e| RigError::communication(format!("write failed: {e}")))?;
time::timeout(IO_TIMEOUT, writer.flush())
.await
.map_err(|_| RigError::communication(format!("flush timed out after {:?}", IO_TIMEOUT)))?
.map_err(|e| RigError::communication(format!("flush failed: {e}")))?;
let line = time::timeout(IO_TIMEOUT, read_limited_line(reader, MAX_JSON_LINE_BYTES))
.await
.map_err(|_| RigError::communication(format!("read timed out after {:?}", IO_TIMEOUT)))?
.map_err(|e| RigError::communication(format!("read failed: {e}")))?;
let line = line.ok_or_else(|| RigError::communication("connection closed by remote"))?;
let resp: ClientResponse = serde_json::from_str(line.trim_end())
.map_err(|e| RigError::communication(format!("invalid response: {e}")))?;
if resp.success {
return resp
.sat_passes
.ok_or_else(|| RigError::communication("missing sat_passes in GetSatPasses response"));
}
Err(RigError::communication(
resp.error.unwrap_or_else(|| "remote error".into()),
))
}
fn cache_remote_rigs(
config: &RemoteClientConfig,
_raw_rigs: &[RigEntry],
@@ -1135,6 +1200,7 @@ mod tests {
state: snapshot.clone(),
audio_port: Some(4531),
}]),
sat_passes: None,
error: None,
})
.expect("serialize response")
@@ -1181,6 +1247,7 @@ mod tests {
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::new(),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)),
},
req_rx,
state_tx,
@@ -1224,6 +1291,7 @@ mod tests {
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::new(),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)),
};
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
assert_eq!(envelope.token.as_deref(), Some("secret"));
@@ -1250,6 +1318,7 @@ mod tests {
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "home-hf".to_string())]),
short_name_to_rig_id,
sat_passes: Arc::new(RwLock::new(None)),
};
// selected_rig_id is "home-hf" (short name), envelope should translate to "hf"
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
@@ -1280,6 +1349,7 @@ mod tests {
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::new(),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)),
};
// Legacy mode: rig_id passes through unchanged
assert!(!has_short_names(&config));
@@ -1305,6 +1375,7 @@ mod tests {
(None, "default-rig".to_string()),
]),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)),
};
assert!(has_short_names(&config));
assert_eq!(
@@ -1340,6 +1411,7 @@ mod tests {
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)),
};
let snapshot = sample_snapshot();
let rigs = vec![RigEntry {
@@ -1411,6 +1483,7 @@ mod tests {
rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)),
};
let ids = super::active_spectrum_rig_ids(&config);
+3
View File
@@ -260,6 +260,8 @@ pub struct FrontendRuntimeContext {
pub remote_active_rig_id: Arc<Mutex<Option<String>>>,
/// Cached remote rig list from GetRigs polling.
pub remote_rigs: Arc<Mutex<Vec<RemoteRigEntry>>>,
/// Cached satellite pass predictions from the server (GetSatPasses).
pub sat_passes: Arc<RwLock<Option<trx_core::geo::PassPredictionResult>>>,
/// Per-rig state watch channels, keyed by rig_id.
/// Populated by the remote client poll loop so each SSE session can
/// subscribe to a specific rig's state independently.
@@ -391,6 +393,7 @@ impl FrontendRuntimeContext {
http_decode_history_retention_min_by_rig: HashMap::new(),
remote_active_rig_id: Arc::new(Mutex::new(None)),
remote_rigs: Arc::new(Mutex::new(Vec::new())),
sat_passes: Arc::new(RwLock::new(None)),
rig_states: Arc::new(RwLock::new(HashMap::new())),
owner_callsign: None,
owner_website_url: None,
@@ -107,6 +107,7 @@ async fn handle_client(
rig_id: None,
state: None,
rigs: None,
sat_passes: None,
error: Some(format!("Invalid JSON: {}", e)),
};
send_response(&mut writer, &resp).await?;
@@ -120,6 +121,7 @@ async fn handle_client(
rig_id: None,
state: None,
rigs: None,
sat_passes: None,
error: Some(err),
};
send_response(&mut writer, &resp).await?;
@@ -138,6 +140,7 @@ async fn handle_client(
rig_id: Some("client".to_string()),
state: None,
rigs: Some(snapshot_remote_rigs(context.as_ref())),
sat_passes: None,
error: None,
};
send_response(&mut writer, &resp).await?;
@@ -168,6 +171,7 @@ async fn handle_client(
rig_id: active_rig_id.clone(),
state: None,
rigs: None,
sat_passes: None,
error: Some("Internal error: rig task not available".into()),
};
send_response(&mut writer, &resp).await?;
@@ -179,6 +183,7 @@ async fn handle_client(
rig_id: active_rig_id.clone(),
state: None,
rigs: None,
sat_passes: None,
error: Some("Internal error: request queue timeout".into()),
};
send_response(&mut writer, &resp).await?;
@@ -193,6 +198,7 @@ async fn handle_client(
rig_id: active_rig_id.clone(),
state: Some(snapshot),
rigs: None,
sat_passes: None,
error: None,
};
send_response(&mut writer, &resp).await?;
@@ -203,6 +209,7 @@ async fn handle_client(
rig_id: active_rig_id.clone(),
state: None,
rigs: None,
sat_passes: None,
error: Some(err.message),
};
send_response(&mut writer, &resp).await?;
@@ -214,6 +221,7 @@ async fn handle_client(
rig_id: active_rig_id.clone(),
state: None,
rigs: None,
sat_passes: None,
error: Some("Internal error waiting for rig response".into()),
};
send_response(&mut writer, &resp).await?;
@@ -224,6 +232,7 @@ async fn handle_client(
rig_id: active_rig_id.clone(),
state: None,
rigs: None,
sat_passes: None,
error: Some("Request timed out waiting for rig response".into()),
};
send_response(&mut writer, &resp).await?;
@@ -1384,31 +1384,14 @@ struct SatPassesResponse {
/// Return predicted passes for all known satellites over the next 24 h.
///
/// Requires the server station location to be configured. Returns an empty
/// `passes` array with an `error` field if the location is missing or TLE
/// data has not been fetched yet.
/// Reads cached predictions from the server (fetched via GetSatPasses).
/// Returns an empty `passes` array with an `error` field if predictions
/// are not yet available.
#[get("/sat_passes")]
pub async fn sat_passes(state: web::Data<watch::Receiver<RigState>>) -> impl Responder {
let rig_state = state.get_ref().borrow().clone();
let lat = rig_state.server_latitude;
let lon = rig_state.server_longitude;
let (Some(lat), Some(lon)) = (lat, lon) else {
return web::Json(SatPassesResponse {
passes: vec![],
error: Some("No station location configured".to_string()),
satellite_count: 0,
tle_source: trx_core::geo::TleSource::Unavailable,
});
};
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
let window_ms = 24 * 60 * 60 * 1000_i64;
let result = trx_core::geo::compute_upcoming_passes(lat, lon, now_ms, window_ms);
pub async fn sat_passes(context: web::Data<Arc<FrontendRuntimeContext>>) -> impl Responder {
let cached = context.sat_passes.read().ok().and_then(|g| g.clone());
match cached {
Some(result) => {
let error = match result.tle_source {
trx_core::geo::TleSource::Unavailable => {
Some("TLE data not yet available — waiting for CelesTrak fetch".to_string())
@@ -1421,6 +1404,14 @@ pub async fn sat_passes(state: web::Data<watch::Receiver<RigState>>) -> impl Res
satellite_count: result.satellite_count,
tle_source: result.tle_source,
})
}
None => web::Json(SatPassesResponse {
passes: vec![],
error: Some("Satellite predictions not yet available from server".to_string()),
satellite_count: 0,
tle_source: trx_core::geo::TleSource::Unavailable,
}),
}
}
#[post("/clear_ft8_decode")]
@@ -2400,6 +2391,7 @@ async fn send_command(
rig_id: None,
state: Some(snapshot),
rigs: None,
sat_passes: None,
error: None,
})),
Ok(Err(err)) => Ok(HttpResponse::BadRequest().json(ClientResponse {
@@ -2407,6 +2399,7 @@ async fn send_command(
rig_id: None,
state: None,
rigs: None,
sat_passes: None,
error: Some(err.message),
})),
Err(e) => Err(actix_web::error::ErrorInternalServerError(format!(
+3
View File
@@ -263,6 +263,7 @@ mod tests {
rig_id: Some("hf".to_string()),
state: None,
rigs: None,
sat_passes: None,
error: None,
};
let json = serde_json::to_string(&resp).unwrap();
@@ -279,6 +280,7 @@ mod tests {
rig_id: None,
state: None,
rigs: None,
sat_passes: None,
error: Some("bad".to_string()),
};
let json = serde_json::to_string(&resp).unwrap();
@@ -296,6 +298,7 @@ mod tests {
rig_id: Some("server".to_string()),
state: None,
rigs: None,
sat_passes: None,
error: None,
};
let json = serde_json::to_string(&resp).unwrap();
+3
View File
@@ -19,6 +19,9 @@ pub fn client_command_to_rig(cmd: ClientCommand) -> RigCommand {
ClientCommand::GetRigs => {
unreachable!("GetRigs is handled in the listener before reaching rig_task")
}
ClientCommand::GetSatPasses => {
unreachable!("GetSatPasses is handled in the listener before reaching rig_task")
}
ClientCommand::GetState => RigCommand::GetSnapshot,
ClientCommand::SetFreq { freq_hz } => RigCommand::SetFreq(Freq { hz: freq_hz }),
ClientCommand::SetCenterFreq { freq_hz } => RigCommand::SetCenterFreq(Freq { hz: freq_hz }),
+4
View File
@@ -15,6 +15,7 @@ use trx_core::WfmDenoiseLevel;
pub enum ClientCommand {
GetState,
GetRigs,
GetSatPasses,
SetFreq { freq_hz: u64 },
SetCenterFreq { freq_hz: u64 },
SetMode { mode: String },
@@ -95,5 +96,8 @@ pub struct ClientResponse {
/// Populated only for GetRigs responses.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub rigs: Option<Vec<RigEntry>>,
/// Populated only for GetSatPasses responses.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sat_passes: Option<trx_core::geo::PassPredictionResult>,
pub error: Option<String>,
}
+43 -1
View File
@@ -45,6 +45,7 @@ pub async fn run_listener(
rigs: Arc<HashMap<String, RigHandle>>,
default_rig_id: String,
auth_tokens: HashSet<String>,
station_coords: Option<(f64, f64)>,
mut shutdown_rx: watch::Receiver<bool>,
) -> std::io::Result<()> {
let listener = TcpListener::bind(addr).await?;
@@ -61,8 +62,9 @@ pub async fn run_listener(
let default_rig_id = default_rig_id.clone();
let validator = Arc::clone(&validator);
let client_shutdown_rx = shutdown_rx.clone();
let coords = station_coords;
tokio::spawn(async move {
if let Err(e) = handle_client(socket, peer, rigs, default_rig_id, validator, client_shutdown_rx).await {
if let Err(e) = handle_client(socket, peer, rigs, default_rig_id, validator, coords, client_shutdown_rx).await {
error!("Client {} error: {:?}", peer, e);
}
});
@@ -158,6 +160,7 @@ async fn handle_client(
rigs: Arc<HashMap<String, RigHandle>>,
default_rig_id: String,
validator: Arc<SimpleTokenValidator>,
station_coords: Option<(f64, f64)>,
mut shutdown_rx: watch::Receiver<bool>,
) -> std::io::Result<()> {
let (reader, mut writer) = socket.into_split();
@@ -213,6 +216,7 @@ async fn handle_client(
rig_id: None,
state: None,
rigs: None,
sat_passes: None,
error: Some(format!("Invalid JSON: {}", e)),
};
send_response(&mut writer, &resp).await?;
@@ -226,6 +230,7 @@ async fn handle_client(
rig_id: None,
state: None,
rigs: None,
sat_passes: None,
error: Some(err),
};
send_response(&mut writer, &resp).await?;
@@ -258,6 +263,35 @@ async fn handle_client(
rig_id: Some("server".to_string()),
state: None,
rigs: Some(entries),
sat_passes: None,
error: None,
};
send_response(&mut writer, &resp).await?;
continue;
}
// GetSatPasses: compute satellite passes from the server-side TLE store.
if matches!(envelope.cmd, ClientCommand::GetSatPasses) {
let result = if let Some((lat, lon)) = station_coords {
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
let window_ms = 24 * 3600 * 1000; // 24 hours
trx_core::geo::compute_upcoming_passes(lat, lon, now_ms, window_ms)
} else {
trx_core::geo::PassPredictionResult {
passes: vec![],
satellite_count: 0,
tle_source: trx_core::geo::TleSource::Unavailable,
}
};
let resp = ClientResponse {
success: true,
rig_id: Some("server".to_string()),
state: None,
rigs: None,
sat_passes: Some(result),
error: None,
};
send_response(&mut writer, &resp).await?;
@@ -274,6 +308,7 @@ async fn handle_client(
rig_id: Some(target_rig_id.clone()),
state: None,
rigs: None,
sat_passes: None,
error: Some(format!("Unknown rig_id: {}", target_rig_id)),
};
send_response(&mut writer, &resp).await?;
@@ -292,6 +327,7 @@ async fn handle_client(
rig_id: Some(target_rig_id.clone()),
state: Some(snapshot),
rigs: None,
sat_passes: None,
error: None,
};
send_response(&mut writer, &resp).await?;
@@ -318,6 +354,7 @@ async fn handle_client(
rig_id: Some(target_rig_id.clone()),
state: None,
rigs: None,
sat_passes: None,
error: Some("Internal error: rig task not available".into()),
};
send_response(&mut writer, &resp).await?;
@@ -329,6 +366,7 @@ async fn handle_client(
rig_id: Some(target_rig_id.clone()),
state: None,
rigs: None,
sat_passes: None,
error: Some("Internal error: request queue timeout".into()),
};
send_response(&mut writer, &resp).await?;
@@ -346,6 +384,7 @@ async fn handle_client(
rig_id: Some(target_rig_id.clone()),
state: None,
rigs: None,
sat_passes: None,
error: Some("Request timed out waiting for rig response".into()),
};
send_response(&mut writer, &resp).await?;
@@ -370,6 +409,7 @@ async fn handle_client(
rig_id: Some(target_rig_id.clone()),
state: Some(snapshot),
rigs: None,
sat_passes: None,
error: None,
};
send_response(&mut writer, &resp).await?;
@@ -380,6 +420,7 @@ async fn handle_client(
rig_id: Some(target_rig_id.clone()),
state: None,
rigs: None,
sat_passes: None,
error: Some(err.message),
};
send_response(&mut writer, &resp).await?;
@@ -391,6 +432,7 @@ async fn handle_client(
rig_id: Some(target_rig_id.clone()),
state: None,
rigs: None,
sat_passes: None,
error: Some("Internal error waiting for rig response".into()),
};
send_response(&mut writer, &resp).await?;
+2
View File
@@ -1161,11 +1161,13 @@ async fn main() -> DynResult<()> {
let rigs_arc = Arc::new(rig_handles);
let listener_shutdown_rx = shutdown_rx.clone();
task_handles.push(tokio::spawn(async move {
let station_coords = latitude.zip(longitude);
if let Err(e) = listener::run_listener(
listen_addr,
rigs_arc,
default_rig_id,
auth_tokens,
station_coords,
listener_shutdown_rx,
)
.await