[refactor](trx-server): supervise runtime tasks and shutdown
Add coordinated shutdown signaling and task supervision for long-running server and client tasks to avoid detached runtimes on Ctrl+C. Co-authored-by: OpenAI Codex <codex@openai.com> Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
@@ -15,9 +15,8 @@ use tokio::time;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use trx_core::audio::{
|
||||
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE,
|
||||
AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO,
|
||||
AUDIO_MSG_TX_FRAME,
|
||||
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE,
|
||||
AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME,
|
||||
};
|
||||
use trx_core::decode::DecodedMessage;
|
||||
|
||||
@@ -28,16 +27,29 @@ pub async fn run_audio_client(
|
||||
mut tx_rx: mpsc::Receiver<Bytes>,
|
||||
stream_info_tx: watch::Sender<Option<AudioStreamInfo>>,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
mut shutdown_rx: watch::Receiver<bool>,
|
||||
) {
|
||||
let mut reconnect_delay = Duration::from_secs(1);
|
||||
|
||||
loop {
|
||||
if *shutdown_rx.borrow() {
|
||||
info!("Audio client shutting down");
|
||||
return;
|
||||
}
|
||||
|
||||
info!("Audio client: connecting to {}", server_addr);
|
||||
match TcpStream::connect(&server_addr).await {
|
||||
Ok(stream) => {
|
||||
reconnect_delay = Duration::from_secs(1);
|
||||
if let Err(e) =
|
||||
handle_audio_connection(stream, &rx_tx, &mut tx_rx, &stream_info_tx, &decode_tx).await
|
||||
if let Err(e) = handle_audio_connection(
|
||||
stream,
|
||||
&rx_tx,
|
||||
&mut tx_rx,
|
||||
&stream_info_tx,
|
||||
&decode_tx,
|
||||
&mut shutdown_rx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("Audio connection dropped: {}", e);
|
||||
}
|
||||
@@ -48,7 +60,19 @@ pub async fn run_audio_client(
|
||||
}
|
||||
|
||||
let _ = stream_info_tx.send(None);
|
||||
time::sleep(reconnect_delay).await;
|
||||
tokio::select! {
|
||||
_ = time::sleep(reconnect_delay) => {}
|
||||
changed = shutdown_rx.changed() => {
|
||||
match changed {
|
||||
Ok(()) if *shutdown_rx.borrow() => {
|
||||
info!("Audio client shutting down");
|
||||
return;
|
||||
}
|
||||
Ok(()) => {}
|
||||
Err(_) => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
reconnect_delay = (reconnect_delay * 2).min(Duration::from_secs(10));
|
||||
}
|
||||
}
|
||||
@@ -59,6 +83,7 @@ async fn handle_audio_connection(
|
||||
tx_rx: &mut mpsc::Receiver<Bytes>,
|
||||
stream_info_tx: &watch::Sender<Option<AudioStreamInfo>>,
|
||||
decode_tx: &broadcast::Sender<DecodedMessage>,
|
||||
shutdown_rx: &mut watch::Receiver<bool>,
|
||||
) -> std::io::Result<()> {
|
||||
let (reader, writer) = stream.into_split();
|
||||
let mut reader = BufReader::new(reader);
|
||||
@@ -89,7 +114,10 @@ async fn handle_audio_connection(
|
||||
Ok((AUDIO_MSG_RX_FRAME, payload)) => {
|
||||
let _ = rx_tx.send(Bytes::from(payload));
|
||||
}
|
||||
Ok((AUDIO_MSG_APRS_DECODE | AUDIO_MSG_CW_DECODE | AUDIO_MSG_FT8_DECODE, payload)) => {
|
||||
Ok((
|
||||
AUDIO_MSG_APRS_DECODE | AUDIO_MSG_CW_DECODE | AUDIO_MSG_FT8_DECODE,
|
||||
payload,
|
||||
)) => {
|
||||
if let Ok(msg) = serde_json::from_slice::<DecodedMessage>(&payload) {
|
||||
let _ = decode_tx.send(msg);
|
||||
}
|
||||
@@ -105,6 +133,19 @@ async fn handle_audio_connection(
|
||||
// Forward TX frames to server
|
||||
loop {
|
||||
tokio::select! {
|
||||
changed = shutdown_rx.changed() => {
|
||||
match changed {
|
||||
Ok(()) if *shutdown_rx.borrow() => {
|
||||
rx_handle.abort();
|
||||
return Ok(());
|
||||
}
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
rx_handle.abort();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
packet = tx_rx.recv() => {
|
||||
match packet {
|
||||
Some(data) => {
|
||||
|
||||
+49
-24
@@ -14,16 +14,19 @@ use bytes::Bytes;
|
||||
use clap::Parser;
|
||||
use tokio::signal;
|
||||
use tokio::sync::{broadcast, mpsc, watch};
|
||||
use tracing::info;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{error, info};
|
||||
|
||||
use trx_app::{init_logging, load_plugins, normalize_name};
|
||||
use trx_core::audio::AudioStreamInfo;
|
||||
|
||||
use trx_core::decode::DecodedMessage;
|
||||
use trx_core::rig::request::RigRequest;
|
||||
use trx_core::rig::state::RigState;
|
||||
use trx_core::DynResult;
|
||||
use trx_frontend::{snapshot_bootstrap_context, FrontendRegistrationContext, FrontendRuntimeContext};
|
||||
use trx_core::decode::DecodedMessage;
|
||||
use trx_frontend::{
|
||||
snapshot_bootstrap_context, FrontendRegistrationContext, FrontendRuntimeContext,
|
||||
};
|
||||
use trx_frontend_http::register_frontend_on as register_http_frontend;
|
||||
use trx_frontend_http_json::register_frontend_on as register_http_json_frontend;
|
||||
use trx_frontend_rigctl::register_frontend_on as register_rigctl_frontend;
|
||||
@@ -81,20 +84,33 @@ struct Cli {
|
||||
callsign: Option<String>,
|
||||
}
|
||||
|
||||
fn main() -> DynResult<()> {
|
||||
let rt = tokio::runtime::Runtime::new()?;
|
||||
#[tokio::main]
|
||||
async fn main() -> DynResult<()> {
|
||||
let app_state = async_init().await?;
|
||||
signal::ctrl_c().await?;
|
||||
info!("Ctrl+C received, shutting down");
|
||||
|
||||
let _app_state = rt.block_on(async_init())?;
|
||||
let _ = app_state.shutdown_tx.send(true);
|
||||
drop(app_state.request_tx);
|
||||
tokio::time::sleep(Duration::from_millis(400)).await;
|
||||
|
||||
rt.block_on(async {
|
||||
signal::ctrl_c().await?;
|
||||
info!("Ctrl+C received, shutting down");
|
||||
Ok(())
|
||||
})
|
||||
for handle in &app_state.task_handles {
|
||||
if !handle.is_finished() {
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
for handle in app_state.task_handles {
|
||||
let _ = handle.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Holds the state needed after async initialization completes.
|
||||
struct AppState;
|
||||
struct AppState {
|
||||
shutdown_tx: watch::Sender<bool>,
|
||||
task_handles: Vec<JoinHandle<()>>,
|
||||
request_tx: mpsc::Sender<RigRequest>,
|
||||
}
|
||||
|
||||
async fn async_init() -> DynResult<AppState> {
|
||||
use std::sync::Arc;
|
||||
@@ -151,14 +167,9 @@ async fn async_init() -> DynResult<AppState> {
|
||||
let remote_addr =
|
||||
parse_remote_url(&remote_url).map_err(|e| format!("Invalid remote URL: {}", e))?;
|
||||
|
||||
let remote_token = cli
|
||||
.token
|
||||
.clone()
|
||||
.or_else(|| cfg.remote.auth.token.clone());
|
||||
let remote_token = cli.token.clone().or_else(|| cfg.remote.auth.token.clone());
|
||||
|
||||
let poll_interval_ms = cli
|
||||
.poll_interval_ms
|
||||
.unwrap_or(cfg.remote.poll_interval_ms);
|
||||
let poll_interval_ms = cli.poll_interval_ms.unwrap_or(cfg.remote.poll_interval_ms);
|
||||
|
||||
// Resolve frontends: CLI > config > default to http
|
||||
let frontends: Vec<String> = if let Some(ref fes) = cli.frontends {
|
||||
@@ -210,6 +221,8 @@ async fn async_init() -> DynResult<AppState> {
|
||||
);
|
||||
|
||||
let (tx, rx) = mpsc::channel::<RigRequest>(RIG_TASK_CHANNEL_BUFFER);
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel(false);
|
||||
let mut task_handles: Vec<JoinHandle<()>> = Vec::new();
|
||||
|
||||
let initial_state = RigState::new_uninitialized();
|
||||
let (state_tx, state_rx) = watch::channel(initial_state);
|
||||
@@ -226,8 +239,14 @@ async fn async_init() -> DynResult<AppState> {
|
||||
token: remote_token,
|
||||
poll_interval: Duration::from_millis(poll_interval_ms),
|
||||
};
|
||||
let _remote_handle =
|
||||
tokio::spawn(remote_client::run_remote_client(remote_cfg, rx, state_tx));
|
||||
let remote_shutdown_rx = shutdown_rx.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
if let Err(e) =
|
||||
remote_client::run_remote_client(remote_cfg, rx, state_tx, remote_shutdown_rx).await
|
||||
{
|
||||
error!("Remote client error: {}", e);
|
||||
}
|
||||
}));
|
||||
|
||||
// Audio streaming setup
|
||||
if cfg.frontends.audio.enabled {
|
||||
@@ -248,13 +267,15 @@ async fn async_init() -> DynResult<AppState> {
|
||||
audio_addr
|
||||
);
|
||||
|
||||
tokio::spawn(audio_client::run_audio_client(
|
||||
let audio_shutdown_rx = shutdown_rx.clone();
|
||||
task_handles.push(tokio::spawn(audio_client::run_audio_client(
|
||||
audio_addr,
|
||||
rx_audio_tx,
|
||||
tx_audio_rx,
|
||||
stream_info_tx,
|
||||
decode_tx,
|
||||
));
|
||||
audio_shutdown_rx,
|
||||
)));
|
||||
} else {
|
||||
info!("Audio disabled in config, decode will not be available");
|
||||
}
|
||||
@@ -282,5 +303,9 @@ async fn async_init() -> DynResult<AppState> {
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(AppState)
|
||||
Ok(AppState {
|
||||
shutdown_tx,
|
||||
task_handles,
|
||||
request_tx: tx,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -26,14 +26,22 @@ pub async fn run_remote_client(
|
||||
config: RemoteClientConfig,
|
||||
mut rx: mpsc::Receiver<RigRequest>,
|
||||
state_tx: watch::Sender<RigState>,
|
||||
mut shutdown_rx: watch::Receiver<bool>,
|
||||
) -> RigResult<()> {
|
||||
let mut reconnect_delay = Duration::from_secs(1);
|
||||
|
||||
loop {
|
||||
if *shutdown_rx.borrow() {
|
||||
info!("Remote client shutting down");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!("Remote client: connecting to {}", config.addr);
|
||||
match TcpStream::connect(&config.addr).await {
|
||||
Ok(stream) => {
|
||||
if let Err(e) = handle_connection(&config, stream, &mut rx, &state_tx).await {
|
||||
if let Err(e) =
|
||||
handle_connection(&config, stream, &mut rx, &state_tx, &mut shutdown_rx).await
|
||||
{
|
||||
warn!("Remote connection dropped: {}", e);
|
||||
}
|
||||
}
|
||||
@@ -42,7 +50,19 @@ pub async fn run_remote_client(
|
||||
}
|
||||
}
|
||||
|
||||
time::sleep(reconnect_delay).await;
|
||||
tokio::select! {
|
||||
_ = time::sleep(reconnect_delay) => {}
|
||||
changed = shutdown_rx.changed() => {
|
||||
match changed {
|
||||
Ok(()) if *shutdown_rx.borrow() => {
|
||||
info!("Remote client shutting down");
|
||||
return Ok(());
|
||||
}
|
||||
Ok(()) => {}
|
||||
Err(_) => return Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
reconnect_delay = (reconnect_delay * 2).min(Duration::from_secs(10));
|
||||
}
|
||||
}
|
||||
@@ -52,6 +72,7 @@ async fn handle_connection(
|
||||
stream: TcpStream,
|
||||
rx: &mut mpsc::Receiver<RigRequest>,
|
||||
state_tx: &watch::Sender<RigState>,
|
||||
shutdown_rx: &mut watch::Receiver<bool>,
|
||||
) -> RigResult<()> {
|
||||
let (reader, mut writer) = stream.into_split();
|
||||
let mut reader = BufReader::new(reader);
|
||||
@@ -60,6 +81,13 @@ async fn handle_connection(
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
changed = shutdown_rx.changed() => {
|
||||
match changed {
|
||||
Ok(()) if *shutdown_rx.borrow() => return Ok(()),
|
||||
Ok(()) => {}
|
||||
Err(_) => return Ok(()),
|
||||
}
|
||||
}
|
||||
_ = poll_interval.tick() => {
|
||||
if last_poll.elapsed() < config.poll_interval {
|
||||
continue;
|
||||
|
||||
Reference in New Issue
Block a user