[fix](trx-server): fix satellite pass computation degrading spectrum performance
Two issues introduced with the wxsat/satellite support caused indirect performance degradation on the spectrum rendering path:

1. `spawn_tle_refresh_task()` was called inside `spawn_rig_audio_stack()`, which runs once per rig. With N rigs this spawned N redundant TLE refresh tasks, each making 3 concurrent HTTP requests to CelesTrak and competing for write locks on the global TLE store. The call was moved to a single global call after the per-rig loop.

2. `compute_upcoming_passes()` (SGP4 propagation for 200+ satellites over 24 h, roughly 300K propagation steps) ran on every `GetSatPasses` request with no caching. Multiple client connections could trigger concurrent CPU-heavy computations, causing cache pollution and tokio runtime contention that indirectly slowed spectrum frame processing. Added a 60-second server-side cache shared across all client connections.

https://claude.ai/code/session_017g7VNMb6CChaiWrfzVBhbR

Signed-off-by: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -13,8 +13,8 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader};
|
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
@@ -35,6 +35,16 @@ use crate::rig_handle::RigHandle;
|
|||||||
const IO_TIMEOUT: Duration = Duration::from_secs(10);
|
const IO_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
const REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
|
const REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
|
||||||
const MAX_JSON_LINE_BYTES: usize = 256 * 1024;
|
const MAX_JSON_LINE_BYTES: usize = 256 * 1024;
|
||||||
|
/// How long to cache satellite pass predictions before recomputing.
|
||||||
|
/// SGP4 propagation for 200+ satellites is CPU-intensive; caching avoids
|
||||||
|
/// redundant recomputation when multiple clients request passes concurrently.
|
||||||
|
const SAT_PASS_CACHE_TTL: Duration = Duration::from_secs(60);
|
||||||
|
|
||||||
|
/// Cached satellite pass prediction result shared across client connections.
|
||||||
|
struct SatPassCache {
|
||||||
|
result: trx_core::geo::PassPredictionResult,
|
||||||
|
computed_at: Instant,
|
||||||
|
}
|
||||||
/// Run the JSON TCP listener, accepting client connections.
|
/// Run the JSON TCP listener, accepting client connections.
|
||||||
///
|
///
|
||||||
/// `rigs` is a shared map from rig_id → `RigHandle`. The first entry (by
|
/// `rigs` is a shared map from rig_id → `RigHandle`. The first entry (by
|
||||||
@@ -51,6 +61,7 @@ pub async fn run_listener(
|
|||||||
let listener = TcpListener::bind(addr).await?;
|
let listener = TcpListener::bind(addr).await?;
|
||||||
info!("Listening on {}", addr);
|
info!("Listening on {}", addr);
|
||||||
let validator = Arc::new(SimpleTokenValidator::new(auth_tokens));
|
let validator = Arc::new(SimpleTokenValidator::new(auth_tokens));
|
||||||
|
let sat_pass_cache: Arc<Mutex<Option<SatPassCache>>> = Arc::new(Mutex::new(None));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@@ -63,8 +74,9 @@ pub async fn run_listener(
|
|||||||
let validator = Arc::clone(&validator);
|
let validator = Arc::clone(&validator);
|
||||||
let client_shutdown_rx = shutdown_rx.clone();
|
let client_shutdown_rx = shutdown_rx.clone();
|
||||||
let coords = station_coords;
|
let coords = station_coords;
|
||||||
|
let cache = Arc::clone(&sat_pass_cache);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = handle_client(socket, peer, rigs, default_rig_id, validator, coords, client_shutdown_rx).await {
|
if let Err(e) = handle_client(socket, peer, rigs, default_rig_id, validator, coords, cache, client_shutdown_rx).await {
|
||||||
error!("Client {} error: {:?}", peer, e);
|
error!("Client {} error: {:?}", peer, e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -161,6 +173,7 @@ async fn handle_client(
|
|||||||
default_rig_id: String,
|
default_rig_id: String,
|
||||||
validator: Arc<SimpleTokenValidator>,
|
validator: Arc<SimpleTokenValidator>,
|
||||||
station_coords: Option<(f64, f64)>,
|
station_coords: Option<(f64, f64)>,
|
||||||
|
sat_pass_cache: Arc<Mutex<Option<SatPassCache>>>,
|
||||||
mut shutdown_rx: watch::Receiver<bool>,
|
mut shutdown_rx: watch::Receiver<bool>,
|
||||||
) -> std::io::Result<()> {
|
) -> std::io::Result<()> {
|
||||||
let (reader, mut writer) = socket.into_split();
|
let (reader, mut writer) = socket.into_split();
|
||||||
@@ -271,15 +284,32 @@ async fn handle_client(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetSatPasses: compute satellite passes from the server-side TLE store.
|
// GetSatPasses: compute satellite passes from the server-side TLE store.
|
||||||
// Runs on a blocking thread to avoid stalling the connection handler.
|
// Results are cached for SAT_PASS_CACHE_TTL to avoid redundant CPU-heavy
|
||||||
|
// SGP4 propagation when multiple clients request passes concurrently.
|
||||||
if matches!(envelope.cmd, ClientCommand::GetSatPasses) {
|
if matches!(envelope.cmd, ClientCommand::GetSatPasses) {
|
||||||
let result = if let Some((lat, lon)) = station_coords {
|
// Check cache first.
|
||||||
|
let cached = sat_pass_cache
|
||||||
|
.lock()
|
||||||
|
.ok()
|
||||||
|
.and_then(|guard| {
|
||||||
|
guard.as_ref().and_then(|c| {
|
||||||
|
if c.computed_at.elapsed() < SAT_PASS_CACHE_TTL {
|
||||||
|
Some(c.result.clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let result = if let Some(cached_result) = cached {
|
||||||
|
cached_result
|
||||||
|
} else if let Some((lat, lon)) = station_coords {
|
||||||
let now_ms = std::time::SystemTime::now()
|
let now_ms = std::time::SystemTime::now()
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.as_millis() as i64;
|
.as_millis() as i64;
|
||||||
let window_ms = 24 * 3600 * 1000; // 24 hours
|
let window_ms = 24 * 3600 * 1000; // 24 hours
|
||||||
tokio::task::spawn_blocking(move || {
|
let fresh = tokio::task::spawn_blocking(move || {
|
||||||
trx_core::geo::compute_upcoming_passes(lat, lon, now_ms, window_ms)
|
trx_core::geo::compute_upcoming_passes(lat, lon, now_ms, window_ms)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
@@ -287,7 +317,15 @@ async fn handle_client(
|
|||||||
passes: vec![],
|
passes: vec![],
|
||||||
satellite_count: 0,
|
satellite_count: 0,
|
||||||
tle_source: trx_core::geo::TleSource::Unavailable,
|
tle_source: trx_core::geo::TleSource::Unavailable,
|
||||||
})
|
});
|
||||||
|
// Update cache.
|
||||||
|
if let Ok(mut guard) = sat_pass_cache.lock() {
|
||||||
|
*guard = Some(SatPassCache {
|
||||||
|
result: fresh.clone(),
|
||||||
|
computed_at: Instant::now(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
fresh
|
||||||
} else {
|
} else {
|
||||||
trx_core::geo::PassPredictionResult {
|
trx_core::geo::PassPredictionResult {
|
||||||
passes: vec![],
|
passes: vec![],
|
||||||
|
|||||||
@@ -794,9 +794,6 @@ fn spawn_rig_audio_stack(
|
|||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
// Start periodic TLE refresh from CelesTrak (on start + once/day).
|
|
||||||
trx_core::geo::spawn_tle_refresh_task();
|
|
||||||
|
|
||||||
// Spawn weather satellite APT decoder task
|
// Spawn weather satellite APT decoder task
|
||||||
let wxsat_pcm_rx = pcm_tx.subscribe();
|
let wxsat_pcm_rx = pcm_tx.subscribe();
|
||||||
let wxsat_state_rx = state_rx.clone();
|
let wxsat_state_rx = state_rx.clone();
|
||||||
@@ -1145,6 +1142,11 @@ async fn main() -> DynResult<()> {
|
|||||||
// Spawn periodic flush of decode history to disk (every 60 s).
|
// Spawn periodic flush of decode history to disk (every 60 s).
|
||||||
history_store::spawn_flush_task(history_db, rig_histories_for_flush);
|
history_store::spawn_flush_task(history_db, rig_histories_for_flush);
|
||||||
|
|
||||||
|
// Start periodic TLE refresh from CelesTrak (on start + once/day).
|
||||||
|
// Called once globally rather than per-rig to avoid redundant HTTP fetches
|
||||||
|
// and write-lock contention on the TLE store.
|
||||||
|
trx_core::geo::spawn_tle_refresh_task();
|
||||||
|
|
||||||
// Start JSON TCP listener.
|
// Start JSON TCP listener.
|
||||||
if cfg.listen.enabled {
|
if cfg.listen.enabled {
|
||||||
let listen_ip = cli.listen.unwrap_or(cfg.listen.listen);
|
let listen_ip = cli.listen.unwrap_or(cfg.listen.listen);
|
||||||
|
|||||||
Reference in New Issue
Block a user