diff --git a/src/trx-client/src/audio_client.rs b/src/trx-client/src/audio_client.rs index 3b30263..36ce34c 100644 --- a/src/trx-client/src/audio_client.rs +++ b/src/trx-client/src/audio_client.rs @@ -15,9 +15,8 @@ use tokio::time; use tracing::{info, warn}; use trx_core::audio::{ - read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE, - AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, - AUDIO_MSG_TX_FRAME, + read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, + AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, }; use trx_core::decode::DecodedMessage; @@ -28,16 +27,29 @@ pub async fn run_audio_client( mut tx_rx: mpsc::Receiver, stream_info_tx: watch::Sender>, decode_tx: broadcast::Sender, + mut shutdown_rx: watch::Receiver, ) { let mut reconnect_delay = Duration::from_secs(1); loop { + if *shutdown_rx.borrow() { + info!("Audio client shutting down"); + return; + } + info!("Audio client: connecting to {}", server_addr); match TcpStream::connect(&server_addr).await { Ok(stream) => { reconnect_delay = Duration::from_secs(1); - if let Err(e) = - handle_audio_connection(stream, &rx_tx, &mut tx_rx, &stream_info_tx, &decode_tx).await + if let Err(e) = handle_audio_connection( + stream, + &rx_tx, + &mut tx_rx, + &stream_info_tx, + &decode_tx, + &mut shutdown_rx, + ) + .await { warn!("Audio connection dropped: {}", e); } @@ -48,7 +60,19 @@ pub async fn run_audio_client( } let _ = stream_info_tx.send(None); - time::sleep(reconnect_delay).await; + tokio::select! { + _ = time::sleep(reconnect_delay) => {} + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => { + info!("Audio client shutting down"); + return; + } + Ok(()) => {} + Err(_) => return, + } + } + } reconnect_delay = (reconnect_delay * 2).min(Duration::from_secs(10)); } } @@ -59,6 +83,7 @@ async fn handle_audio_connection( tx_rx: &mut mpsc::Receiver, stream_info_tx: &watch::Sender>, decode_tx: &broadcast::Sender, + shutdown_rx: &mut watch::Receiver, ) -> std::io::Result<()> { let (reader, writer) = stream.into_split(); let mut reader = BufReader::new(reader); @@ -89,7 +114,10 @@ async fn handle_audio_connection( Ok((AUDIO_MSG_RX_FRAME, payload)) => { let _ = rx_tx.send(Bytes::from(payload)); } - Ok((AUDIO_MSG_APRS_DECODE | AUDIO_MSG_CW_DECODE | AUDIO_MSG_FT8_DECODE, payload)) => { + Ok(( + AUDIO_MSG_APRS_DECODE | AUDIO_MSG_CW_DECODE | AUDIO_MSG_FT8_DECODE, + payload, + )) => { if let Ok(msg) = serde_json::from_slice::(&payload) { let _ = decode_tx.send(msg); } @@ -105,6 +133,19 @@ async fn handle_audio_connection( // Forward TX frames to server loop { tokio::select! { + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => { + rx_handle.abort(); + return Ok(()); + } + Ok(()) => {} + Err(_) => { + rx_handle.abort(); + return Ok(()); + } + } + } packet = tx_rx.recv() => { match packet { Some(data) => { diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index fd1f465..ec7ed26 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -14,16 +14,19 @@ use bytes::Bytes; use clap::Parser; use tokio::signal; use tokio::sync::{broadcast, mpsc, watch}; -use tracing::info; +use tokio::task::JoinHandle; +use tracing::{error, info}; use trx_app::{init_logging, load_plugins, normalize_name}; use trx_core::audio::AudioStreamInfo; +use trx_core::decode::DecodedMessage; use trx_core::rig::request::RigRequest; use trx_core::rig::state::RigState; use trx_core::DynResult; -use trx_frontend::{snapshot_bootstrap_context, FrontendRegistrationContext, FrontendRuntimeContext}; -use trx_core::decode::DecodedMessage; +use trx_frontend::{ + snapshot_bootstrap_context, FrontendRegistrationContext, FrontendRuntimeContext, +}; use trx_frontend_http::register_frontend_on as register_http_frontend; use trx_frontend_http_json::register_frontend_on as register_http_json_frontend; use trx_frontend_rigctl::register_frontend_on as register_rigctl_frontend; @@ -81,20 +84,33 @@ struct Cli { callsign: Option, } -fn main() -> DynResult<()> { - let rt = tokio::runtime::Runtime::new()?; +#[tokio::main] +async fn main() -> DynResult<()> { + let app_state = async_init().await?; + signal::ctrl_c().await?; + info!("Ctrl+C received, shutting down"); - let _app_state = rt.block_on(async_init())?; + let _ = app_state.shutdown_tx.send(true); + drop(app_state.request_tx); + tokio::time::sleep(Duration::from_millis(400)).await; - rt.block_on(async { - signal::ctrl_c().await?; - info!("Ctrl+C received, shutting down"); - Ok(()) - }) + for handle in &app_state.task_handles { + if !handle.is_finished() { + handle.abort(); + } + } + for handle in app_state.task_handles { + let _ = handle.await; + } + Ok(()) } /// Holds the state needed after async initialization completes. -struct AppState; +struct AppState { + shutdown_tx: watch::Sender, + task_handles: Vec>, + request_tx: mpsc::Sender, +} async fn async_init() -> DynResult { use std::sync::Arc; @@ -151,14 +167,9 @@ async fn async_init() -> DynResult { let remote_addr = parse_remote_url(&remote_url).map_err(|e| format!("Invalid remote URL: {}", e))?; - let remote_token = cli - .token - .clone() - .or_else(|| cfg.remote.auth.token.clone()); + let remote_token = cli.token.clone().or_else(|| cfg.remote.auth.token.clone()); - let poll_interval_ms = cli - .poll_interval_ms - .unwrap_or(cfg.remote.poll_interval_ms); + let poll_interval_ms = cli.poll_interval_ms.unwrap_or(cfg.remote.poll_interval_ms); // Resolve frontends: CLI > config > default to http let frontends: Vec = if let Some(ref fes) = cli.frontends { @@ -210,6 +221,8 @@ async fn async_init() -> DynResult { ); let (tx, rx) = mpsc::channel::(RIG_TASK_CHANNEL_BUFFER); + let (shutdown_tx, shutdown_rx) = watch::channel(false); + let mut task_handles: Vec> = Vec::new(); let initial_state = RigState::new_uninitialized(); let (state_tx, state_rx) = watch::channel(initial_state); @@ -226,8 +239,14 @@ async fn async_init() -> DynResult { token: remote_token, poll_interval: Duration::from_millis(poll_interval_ms), }; - let _remote_handle = - tokio::spawn(remote_client::run_remote_client(remote_cfg, rx, state_tx)); + let remote_shutdown_rx = shutdown_rx.clone(); + task_handles.push(tokio::spawn(async move { + if let Err(e) = + remote_client::run_remote_client(remote_cfg, rx, state_tx, remote_shutdown_rx).await + { + error!("Remote client error: {}", e); + } + })); // Audio streaming setup if cfg.frontends.audio.enabled { @@ -248,13 +267,15 @@ async fn async_init() -> DynResult { audio_addr ); - tokio::spawn(audio_client::run_audio_client( + let audio_shutdown_rx = shutdown_rx.clone(); + task_handles.push(tokio::spawn(audio_client::run_audio_client( audio_addr, rx_audio_tx, tx_audio_rx, stream_info_tx, decode_tx, - )); + audio_shutdown_rx, + ))); } else { info!("Audio disabled in config, decode will not be available"); } @@ -282,5 +303,9 @@ async fn async_init() -> DynResult { )?; } - Ok(AppState) + Ok(AppState { + shutdown_tx, + task_handles, + request_tx: tx, + }) } diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index a56302d..8248a0b 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -26,14 +26,22 @@ pub async fn run_remote_client( config: RemoteClientConfig, mut rx: mpsc::Receiver, state_tx: watch::Sender, + mut shutdown_rx: watch::Receiver, ) -> RigResult<()> { let mut reconnect_delay = Duration::from_secs(1); loop { + if *shutdown_rx.borrow() { + info!("Remote client shutting down"); + return Ok(()); + } + info!("Remote client: connecting to {}", config.addr); match TcpStream::connect(&config.addr).await { Ok(stream) => { - if let Err(e) = handle_connection(&config, stream, &mut rx, &state_tx).await { + if let Err(e) = + handle_connection(&config, stream, &mut rx, &state_tx, &mut shutdown_rx).await + { warn!("Remote connection dropped: {}", e); } } @@ -42,7 +50,19 @@ pub async fn run_remote_client( } } - time::sleep(reconnect_delay).await; + tokio::select! { + _ = time::sleep(reconnect_delay) => {} + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => { + info!("Remote client shutting down"); + return Ok(()); + } + Ok(()) => {} + Err(_) => return Ok(()), + } + } + } reconnect_delay = (reconnect_delay * 2).min(Duration::from_secs(10)); } } @@ -52,6 +72,7 @@ async fn handle_connection( stream: TcpStream, rx: &mut mpsc::Receiver, state_tx: &watch::Sender, + shutdown_rx: &mut watch::Receiver, ) -> RigResult<()> { let (reader, mut writer) = stream.into_split(); let mut reader = BufReader::new(reader); @@ -60,6 +81,13 @@ async fn handle_connection( loop { tokio::select! { + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => return Ok(()), + Ok(()) => {} + Err(_) => return Ok(()), + } + } _ = poll_interval.tick() => { if last_poll.elapsed() < config.poll_interval { continue; diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index f10faab..139bfbe 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -5,9 +5,9 @@ //! Audio capture, playback, and TCP streaming for trx-server. use std::net::SocketAddr; +use std::sync::OnceLock; use std::time::{Duration, Instant}; use std::{collections::VecDeque, sync::Mutex}; -use std::sync::OnceLock; use bytes::Bytes; use tokio::net::{TcpListener, TcpStream}; @@ -15,9 +15,8 @@ use tokio::sync::{broadcast, mpsc, watch}; use tracing::{error, info, warn}; use trx_core::audio::{ - read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE, - AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, - AUDIO_MSG_TX_FRAME, + read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, + AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, }; use trx_core::decode::{AprsPacket, DecodedMessage, Ft8Message}; use trx_core::rig::state::{RigMode, RigState}; @@ -113,9 +112,15 @@ pub fn spawn_audio_capture( let device_name = cfg.device.clone(); std::thread::spawn(move || { - if let Err(e) = - run_capture(sample_rate, channels, frame_duration_ms, bitrate_bps, device_name, tx, pcm_tx) - { + if let Err(e) = run_capture( + sample_rate, + channels, + frame_duration_ms, + bitrate_bps, + device_name, + tx, + pcm_tx, + ) { error!("Audio capture thread error: {}", e); } }) @@ -153,7 +158,8 @@ fn run_capture( buffer_size: cpal::BufferSize::Default, }; - let frame_samples = (sample_rate as usize * frame_duration_ms as usize / 1000) * channels as usize; + let frame_samples = + (sample_rate as usize * frame_duration_ms as usize / 1000) * channels as usize; let opus_channels = match channels { 1 => opus::Channels::Mono, @@ -178,15 +184,18 @@ fn run_capture( )?; // Start paused — only capture when clients are connected - info!("Audio capture: ready ({}Hz, {} ch, {}ms frames)", sample_rate, channels, frame_duration_ms); + info!( + "Audio capture: ready ({}Hz, {} ch, {}ms frames)", + sample_rate, channels, frame_duration_ms + ); let mut pcm_buf: Vec = Vec::with_capacity(frame_samples * 2); let mut opus_buf = vec![0u8; 4096]; let mut capturing = false; loop { - let has_receivers = tx.receiver_count() > 0 - || pcm_tx.as_ref().is_some_and(|p| p.receiver_count() > 0); + let has_receivers = + tx.receiver_count() > 0 || pcm_tx.as_ref().is_some_and(|p| p.receiver_count() > 0); if has_receivers && !capturing { let _ = stream.play(); @@ -281,7 +290,8 @@ fn run_playback( buffer_size: cpal::BufferSize::Default, }; - let frame_samples = (sample_rate as usize * frame_duration_ms as usize / 1000) * channels as usize; + let frame_samples = + (sample_rate as usize * frame_duration_ms as usize / 1000) * channels as usize; let opus_channels = match channels { 1 => opus::Channels::Mono, @@ -291,7 +301,9 @@ fn run_playback( let mut decoder = opus::Decoder::new(sample_rate, opus_channels)?; - let ring = std::sync::Arc::new(std::sync::Mutex::new(std::collections::VecDeque::::with_capacity(frame_samples * 8))); + let ring = std::sync::Arc::new(std::sync::Mutex::new( + std::collections::VecDeque::::with_capacity(frame_samples * 8), + )); let ring_writer = ring.clone(); let stream = device.build_output_stream( @@ -334,7 +346,9 @@ fn run_playback( // Pause when no more packets are queued to avoid ALSA underruns if rx.is_empty() { // Drain remaining samples before pausing - std::thread::sleep(std::time::Duration::from_millis(frame_duration_ms as u64 * 2)); + std::thread::sleep(std::time::Duration::from_millis( + frame_duration_ms as u64 * 2, + )); if rx.is_empty() { let _ = stream.pause(); playing = false; @@ -746,26 +760,43 @@ pub async fn run_audio_listener( tx_audio: mpsc::Sender, stream_info: AudioStreamInfo, decode_tx: broadcast::Sender, + mut shutdown_rx: watch::Receiver, ) -> std::io::Result<()> { let listener = TcpListener::bind(addr).await?; info!("Audio listener on {}", addr); loop { - let (socket, peer) = listener.accept().await?; - info!("Audio client connected: {}", peer); + tokio::select! { + accept = listener.accept() => { + let (socket, peer) = accept?; + info!("Audio client connected: {}", peer); - let rx_audio = rx_audio.clone(); - let tx_audio = tx_audio.clone(); - let info = stream_info.clone(); - let decode_tx = decode_tx.clone(); + let rx_audio = rx_audio.clone(); + let tx_audio = tx_audio.clone(); + let info = stream_info.clone(); + let decode_tx = decode_tx.clone(); + let client_shutdown_rx = shutdown_rx.clone(); - tokio::spawn(async move { - if let Err(e) = handle_audio_client(socket, peer, rx_audio, tx_audio, info, decode_tx).await { - warn!("Audio client {} error: {:?}", peer, e); + tokio::spawn(async move { + if let Err(e) = handle_audio_client(socket, peer, rx_audio, tx_audio, info, decode_tx, client_shutdown_rx).await { + warn!("Audio client {} error: {:?}", peer, e); + } + info!("Audio client {} disconnected", peer); + }); } - info!("Audio client {} disconnected", peer); - }); + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => { + info!("Audio listener shutting down"); + break; + } + Ok(()) => {} + Err(_) => break, + } + } + } } + Ok(()) } async fn handle_audio_client( @@ -775,14 +806,14 @@ async fn handle_audio_client( tx_audio: mpsc::Sender, stream_info: AudioStreamInfo, decode_tx: broadcast::Sender, + mut shutdown_rx: watch::Receiver, ) -> std::io::Result<()> { let (reader, writer) = socket.into_split(); let mut reader = tokio::io::BufReader::new(reader); let mut writer = tokio::io::BufWriter::new(writer); // Send stream info - let info_json = serde_json::to_vec(&stream_info) - .map_err(std::io::Error::other)?; + let info_json = serde_json::to_vec(&stream_info).map_err(std::io::Error::other)?; write_audio_msg(&mut writer, AUDIO_MSG_STREAM_INFO, &info_json).await?; // Send APRS history to newly connected client. @@ -852,7 +883,23 @@ async fn handle_audio_client( // Read TX frames from client loop { - match read_audio_msg(&mut reader).await { + let msg = tokio::select! { + msg = read_audio_msg(&mut reader) => msg, + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => { + rx_handle.abort(); + return Ok(()); + } + Ok(()) => continue, + Err(_) => { + rx_handle.abort(); + return Ok(()); + } + } + } + }; + match msg { Ok((AUDIO_MSG_TX_FRAME, payload)) => { let _ = tx_audio.send(Bytes::from(payload)).await; } diff --git a/src/trx-server/src/listener.rs b/src/trx-server/src/listener.rs index df83475..05024a1 100644 --- a/src/trx-server/src/listener.rs +++ b/src/trx-server/src/listener.rs @@ -30,6 +30,7 @@ pub async fn run_listener( rig_tx: mpsc::Sender, auth_tokens: HashSet, state_rx: watch::Receiver, + mut shutdown_rx: watch::Receiver, ) -> std::io::Result<()> { let listener = TcpListener::bind(addr).await?; info!("Listening on {}", addr); @@ -37,18 +38,34 @@ pub async fn run_listener( let validator = Arc::new(SimpleTokenValidator::new(auth_tokens)); loop { - let (socket, peer) = listener.accept().await?; - info!("Client connected: {}", peer); + tokio::select! { + accept = listener.accept() => { + let (socket, peer) = accept?; + info!("Client connected: {}", peer); - let tx = rig_tx.clone(); - let srx = state_rx.clone(); - let validator = Arc::clone(&validator); - tokio::spawn(async move { - if let Err(e) = handle_client(socket, peer, tx, validator, srx).await { - error!("Client {} error: {:?}", peer, e); + let tx = rig_tx.clone(); + let srx = state_rx.clone(); + let validator = Arc::clone(&validator); + let client_shutdown_rx = shutdown_rx.clone(); + tokio::spawn(async move { + if let Err(e) = handle_client(socket, peer, tx, validator, srx, client_shutdown_rx).await { + error!("Client {} error: {:?}", peer, e); + } + }); } - }); + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => { + info!("Listener shutting down"); + break; + } + Ok(()) => {} + Err(_) => break, + } + } + } } + Ok(()) } async fn handle_client( @@ -57,6 +74,7 @@ async fn handle_client( tx: mpsc::Sender, validator: Arc, state_rx: watch::Receiver, + mut shutdown_rx: watch::Receiver, ) -> std::io::Result<()> { let (reader, mut writer) = socket.into_split(); let mut reader = BufReader::new(reader); @@ -64,7 +82,19 @@ async fn handle_client( loop { line.clear(); - let bytes_read = reader.read_line(&mut line).await?; + let bytes_read = tokio::select! { + read = reader.read_line(&mut line) => read?, + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => { + info!("Client {} closing due to shutdown", addr); + break; + } + Ok(()) => continue, + Err(_) => break, + } + } + }; if bytes_read == 0 { info!("Client {} disconnected", addr); break; @@ -141,7 +171,19 @@ async fn handle_client( continue; } - match resp_rx.await { + match tokio::select! { + result = resp_rx => result, + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => { + info!("Client {} request canceled due to shutdown", addr); + break; + } + Ok(()) => continue, + Err(_) => break, + } + } + } { Ok(Ok(snapshot)) => { let resp = ClientResponse { success: true, diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index 18047d1..a5bae21 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -18,6 +18,7 @@ use bytes::Bytes; use clap::{Parser, ValueEnum}; use tokio::signal; use tokio::sync::{broadcast, mpsc, watch}; +use tokio::task::JoinHandle; use tracing::{error, info}; use trx_core::audio::AudioStreamInfo; @@ -114,9 +115,7 @@ fn resolve_config( let rig = match rig_str.as_deref() { Some(name) => normalize_name(name), None => { - return Err( - "Rig model not specified. Use --rig or set [rig].model in config.".into(), - ) + return Err("Rig model not specified. Use --rig or set [rig].model in config.".into()) } }; if !registry.is_backend_registered(&rig) { @@ -142,8 +141,7 @@ fn resolve_config( Some("serial") | None => { let (path, baud) = if let Some(ref addr) = cli.rig_addr { parse_serial_addr(addr)? - } else if let (Some(port), Some(baud)) = - (&cfg.rig.access.port, cfg.rig.access.baud) + } else if let (Some(port), Some(baud)) = (&cfg.rig.access.port, cfg.rig.access.baud) { (port.clone(), baud) } else { @@ -184,7 +182,6 @@ fn resolve_config( }) } - fn build_rig_task_config( resolved: &ResolvedConfig, cfg: &ServerConfig, @@ -212,6 +209,17 @@ fn build_rig_task_config( } } +async fn wait_for_shutdown(mut shutdown_rx: watch::Receiver) { + if *shutdown_rx.borrow() { + return; + } + while shutdown_rx.changed().await.is_ok() { + if *shutdown_rx.borrow() { + break; + } + } +} + #[tokio::main] async fn main() -> DynResult<()> { // Phase 3B: Create bootstrap context for explicit initialization. @@ -266,6 +274,8 @@ async fn main() -> DynResult<()> { } let (tx, rx) = mpsc::channel::(RIG_TASK_CHANNEL_BUFFER); + let mut task_handles: Vec> = Vec::new(); + let (shutdown_tx, shutdown_rx) = watch::channel(false); let initial_state = RigState::new_with_metadata( resolved.callsign.clone(), Some(env!("CARGO_PKG_VERSION").to_string()), @@ -278,8 +288,15 @@ async fn main() -> DynResult<()> { // Keep receivers alive so channels don't close prematurely let _state_rx = state_rx; - let rig_task_config = build_rig_task_config(&resolved, &cfg, std::sync::Arc::new(bootstrap_ctx)); - let _rig_handle = tokio::spawn(rig_task::run_rig_task(rig_task_config, rx, state_tx)); + let rig_task_config = + build_rig_task_config(&resolved, &cfg, std::sync::Arc::new(bootstrap_ctx)); + let rig_shutdown_rx = shutdown_rx.clone(); + task_handles.push(tokio::spawn(async move { + if let Err(e) = rig_task::run_rig_task(rig_task_config, rx, state_tx, rig_shutdown_rx).await + { + error!("Rig task error: {:?}", e); + } + })); if cfg.listen.enabled { let listen_ip = cli.listen.unwrap_or(cfg.listen.listen); @@ -295,15 +312,25 @@ async fn main() -> DynResult<()> { .collect(); let rig_tx = tx.clone(); let state_rx_listener = _state_rx.clone(); - tokio::spawn(async move { - if let Err(e) = listener::run_listener(listen_addr, rig_tx, auth_tokens, state_rx_listener).await { + let listener_shutdown_rx = shutdown_rx.clone(); + task_handles.push(tokio::spawn(async move { + if let Err(e) = listener::run_listener( + listen_addr, + rig_tx, + auth_tokens, + state_rx_listener, + listener_shutdown_rx, + ) + .await + { error!("Listener error: {:?}", e); } - }); + })); } if cfg.audio.enabled { - let audio_listen = SocketAddr::from((cli.listen.unwrap_or(cfg.audio.listen), cfg.audio.port)); + let audio_listen = + SocketAddr::from((cli.listen.unwrap_or(cfg.audio.listen), cfg.audio.port)); let stream_info = AudioStreamInfo { sample_rate: cfg.audio.sample_rate, channels: cfg.audio.channels, @@ -319,7 +346,8 @@ async fn main() -> DynResult<()> { let (decode_tx, _) = broadcast::channel::(256); if cfg.audio.rx_enabled { - let _capture_thread = audio::spawn_audio_capture(&cfg.audio, rx_audio_tx.clone(), Some(pcm_tx.clone())); + let _capture_thread = + audio::spawn_audio_capture(&cfg.audio, rx_audio_tx.clone(), Some(pcm_tx.clone())); // Spawn APRS decoder task let aprs_pcm_rx = pcm_tx.subscribe(); @@ -327,9 +355,13 @@ async fn main() -> DynResult<()> { let aprs_decode_tx = decode_tx.clone(); let aprs_sr = cfg.audio.sample_rate; let aprs_ch = cfg.audio.channels; - tokio::spawn(audio::run_aprs_decoder( - aprs_sr, aprs_ch as u16, aprs_pcm_rx, aprs_state_rx, aprs_decode_tx, - )); + let aprs_shutdown_rx = shutdown_rx.clone(); + task_handles.push(tokio::spawn(async move { + tokio::select! { + _ = audio::run_aprs_decoder(aprs_sr, aprs_ch as u16, aprs_pcm_rx, aprs_state_rx, aprs_decode_tx) => {} + _ = wait_for_shutdown(aprs_shutdown_rx) => {} + } + })); // Spawn CW decoder task let cw_pcm_rx = pcm_tx.subscribe(); @@ -337,9 +369,13 @@ async fn main() -> DynResult<()> { let cw_decode_tx = decode_tx.clone(); let cw_sr = cfg.audio.sample_rate; let cw_ch = cfg.audio.channels; - tokio::spawn(audio::run_cw_decoder( - cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx, - )); + let cw_shutdown_rx = shutdown_rx.clone(); + task_handles.push(tokio::spawn(async move { + tokio::select! { + _ = audio::run_cw_decoder(cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx) => {} + _ = wait_for_shutdown(cw_shutdown_rx) => {} + } + })); // Spawn FT8 decoder task let ft8_pcm_rx = pcm_tx.subscribe(); @@ -347,27 +383,48 @@ async fn main() -> DynResult<()> { let ft8_decode_tx = decode_tx.clone(); let ft8_sr = cfg.audio.sample_rate; let ft8_ch = cfg.audio.channels; - tokio::spawn(audio::run_ft8_decoder( - ft8_sr, ft8_ch as u16, ft8_pcm_rx, ft8_state_rx, ft8_decode_tx, - )); + let ft8_shutdown_rx = shutdown_rx.clone(); + task_handles.push(tokio::spawn(async move { + tokio::select! { + _ = audio::run_ft8_decoder(ft8_sr, ft8_ch as u16, ft8_pcm_rx, ft8_state_rx, ft8_decode_tx) => {} + _ = wait_for_shutdown(ft8_shutdown_rx) => {} + } + })); } if cfg.audio.tx_enabled { let _playback_thread = audio::spawn_audio_playback(&cfg.audio, tx_audio_rx); } - tokio::spawn(async move { - if let Err(e) = - audio::run_audio_listener(audio_listen, rx_audio_tx, tx_audio_tx, stream_info, decode_tx) - .await + let audio_shutdown_rx = shutdown_rx.clone(); + task_handles.push(tokio::spawn(async move { + if let Err(e) = audio::run_audio_listener( + audio_listen, + rx_audio_tx, + tx_audio_tx, + stream_info, + decode_tx, + audio_shutdown_rx, + ) + .await { error!("Audio listener error: {:?}", e); } - }); + })); } - let _tx = tx; - signal::ctrl_c().await?; info!("Ctrl+C received, shutting down"); + let _ = shutdown_tx.send(true); + drop(tx); + tokio::time::sleep(Duration::from_millis(400)).await; + + for handle in &task_handles { + if !handle.is_finished() { + handle.abort(); + } + } + for handle in task_handles { + let _ = handle.await; + } Ok(()) } diff --git a/src/trx-server/src/rig_task.rs b/src/trx-server/src/rig_task.rs index 3b20143..1b889ef 100644 --- a/src/trx-server/src/rig_task.rs +++ b/src/trx-server/src/rig_task.rs @@ -4,8 +4,8 @@ //! Rig task implementation using controller components. -use std::time::Duration; use std::sync::Arc; +use std::time::Duration; use tokio::sync::{mpsc, watch}; use tokio::time::{self, Instant}; @@ -81,6 +81,7 @@ pub async fn run_rig_task( config: RigTaskConfig, mut rx: mpsc::Receiver, state_tx: watch::Sender, + mut shutdown_rx: watch::Receiver, ) -> DynResult<()> { info!("Opening rig backend {}", config.rig_model); match &config.access { @@ -88,7 +89,9 @@ pub async fn run_rig_task( RigAccess::Tcp { addr } => info!("TCP CAT: {}", addr), } - let mut rig: Box = config.registry.build_rig(&config.rig_model, config.access)?; + let mut rig: Box = config + .registry + .build_rig(&config.rig_model, config.access)?; info!("Rig backend ready"); // Initialize state machine and state @@ -218,6 +221,16 @@ pub async fn run_rig_task( } tokio::select! { + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => { + info!("rig_task shutting down (signal)"); + break; + } + Ok(()) => {} + Err(_) => break, + } + } _ = &mut poll_sleep => { poll_sleep = Box::pin(tokio::time::sleep(current_poll_duration)); // Check if polling is paused