fix(trx-server): add shutdown signal to audio capture/playback threads
Pass `watch::Receiver<bool>` into `run_capture` and `run_playback` so both threads check for shutdown at the top of their outer recovery loop and inner monitoring loop. Without this, restarting the process (e.g. via systemd after an ALSA error) left the old threads stalled forever while new ones were created alongside them. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -241,6 +241,7 @@ pub fn spawn_audio_capture(
|
|||||||
cfg: &AudioConfig,
|
cfg: &AudioConfig,
|
||||||
tx: broadcast::Sender<Bytes>,
|
tx: broadcast::Sender<Bytes>,
|
||||||
pcm_tx: Option<broadcast::Sender<Vec<f32>>>,
|
pcm_tx: Option<broadcast::Sender<Vec<f32>>>,
|
||||||
|
shutdown_rx: watch::Receiver<bool>,
|
||||||
) -> std::thread::JoinHandle<()> {
|
) -> std::thread::JoinHandle<()> {
|
||||||
let sample_rate = cfg.sample_rate;
|
let sample_rate = cfg.sample_rate;
|
||||||
let channels = cfg.channels as u16;
|
let channels = cfg.channels as u16;
|
||||||
@@ -257,6 +258,7 @@ pub fn spawn_audio_capture(
|
|||||||
device_name,
|
device_name,
|
||||||
tx,
|
tx,
|
||||||
pcm_tx,
|
pcm_tx,
|
||||||
|
shutdown_rx,
|
||||||
) {
|
) {
|
||||||
error!("Audio capture thread error: {}", e);
|
error!("Audio capture thread error: {}", e);
|
||||||
}
|
}
|
||||||
@@ -271,6 +273,7 @@ fn run_capture(
|
|||||||
device_name: Option<String>,
|
device_name: Option<String>,
|
||||||
tx: broadcast::Sender<Bytes>,
|
tx: broadcast::Sender<Bytes>,
|
||||||
pcm_tx: Option<broadcast::Sender<Vec<f32>>>,
|
pcm_tx: Option<broadcast::Sender<Vec<f32>>>,
|
||||||
|
shutdown_rx: watch::Receiver<bool>,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
||||||
use std::sync::mpsc::{RecvTimeoutError, TryRecvError as StdTryRecvError};
|
use std::sync::mpsc::{RecvTimeoutError, TryRecvError as StdTryRecvError};
|
||||||
@@ -305,6 +308,11 @@ fn run_capture(
|
|||||||
let mut capturing = false;
|
let mut capturing = false;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
if *shutdown_rx.borrow() {
|
||||||
|
info!("Audio capture: shutdown signal received, exiting");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
// Re-enumerate the device on every recovery cycle: after POLLERR the
|
// Re-enumerate the device on every recovery cycle: after POLLERR the
|
||||||
// existing device handle can be stale (especially for USB audio).
|
// existing device handle can be stale (especially for USB audio).
|
||||||
let host = cpal::default_host();
|
let host = cpal::default_host();
|
||||||
@@ -383,6 +391,11 @@ fn run_capture(
|
|||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
if *shutdown_rx.borrow() {
|
||||||
|
info!("Audio capture: shutdown signal received, exiting");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
match stream_err_rx.try_recv() {
|
match stream_err_rx.try_recv() {
|
||||||
Ok(()) | Err(StdTryRecvError::Disconnected) => {
|
Ok(()) | Err(StdTryRecvError::Disconnected) => {
|
||||||
warn!("Audio capture: backend stream error, recreating");
|
warn!("Audio capture: backend stream error, recreating");
|
||||||
@@ -458,6 +471,7 @@ fn run_capture(
|
|||||||
pub fn spawn_audio_playback(
|
pub fn spawn_audio_playback(
|
||||||
cfg: &AudioConfig,
|
cfg: &AudioConfig,
|
||||||
rx: mpsc::Receiver<Bytes>,
|
rx: mpsc::Receiver<Bytes>,
|
||||||
|
shutdown_rx: watch::Receiver<bool>,
|
||||||
) -> std::thread::JoinHandle<()> {
|
) -> std::thread::JoinHandle<()> {
|
||||||
let sample_rate = cfg.sample_rate;
|
let sample_rate = cfg.sample_rate;
|
||||||
let channels = cfg.channels as u16;
|
let channels = cfg.channels as u16;
|
||||||
@@ -465,7 +479,9 @@ pub fn spawn_audio_playback(
|
|||||||
let device_name = cfg.device.clone();
|
let device_name = cfg.device.clone();
|
||||||
|
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
if let Err(e) = run_playback(sample_rate, channels, frame_duration_ms, device_name, rx) {
|
if let Err(e) =
|
||||||
|
run_playback(sample_rate, channels, frame_duration_ms, device_name, rx, shutdown_rx)
|
||||||
|
{
|
||||||
error!("Audio playback thread error: {}", e);
|
error!("Audio playback thread error: {}", e);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -477,6 +493,7 @@ fn run_playback(
|
|||||||
frame_duration_ms: u16,
|
frame_duration_ms: u16,
|
||||||
device_name: Option<String>,
|
device_name: Option<String>,
|
||||||
mut rx: mpsc::Receiver<Bytes>,
|
mut rx: mpsc::Receiver<Bytes>,
|
||||||
|
shutdown_rx: watch::Receiver<bool>,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
||||||
use std::sync::mpsc::TryRecvError as StdTryRecvError;
|
use std::sync::mpsc::TryRecvError as StdTryRecvError;
|
||||||
@@ -513,6 +530,11 @@ fn run_playback(
|
|||||||
let mut channel_closed = false;
|
let mut channel_closed = false;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
if *shutdown_rx.borrow() {
|
||||||
|
info!("Audio playback: shutdown signal received, exiting");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
// Re-enumerate the device on every recovery cycle: after POLLERR the
|
// Re-enumerate the device on every recovery cycle: after POLLERR the
|
||||||
// existing device handle can be stale (especially for USB audio).
|
// existing device handle can be stale (especially for USB audio).
|
||||||
let host = cpal::default_host();
|
let host = cpal::default_host();
|
||||||
@@ -600,6 +622,11 @@ fn run_playback(
|
|||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
if *shutdown_rx.borrow() {
|
||||||
|
info!("Audio playback: shutdown signal received, exiting");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
match stream_err_rx.try_recv() {
|
match stream_err_rx.try_recv() {
|
||||||
Ok(()) | Err(StdTryRecvError::Disconnected) => {
|
Ok(()) | Err(StdTryRecvError::Disconnected) => {
|
||||||
warn!("Audio playback: backend stream error, recreating");
|
warn!("Audio playback: backend stream error, recreating");
|
||||||
|
|||||||
@@ -474,6 +474,7 @@ fn spawn_rig_audio_stack(
|
|||||||
&rig_cfg.audio,
|
&rig_cfg.audio,
|
||||||
rx_audio_tx.clone(),
|
rx_audio_tx.clone(),
|
||||||
Some(pcm_tx.clone()),
|
Some(pcm_tx.clone()),
|
||||||
|
shutdown_rx.clone(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -542,7 +543,8 @@ fn spawn_rig_audio_stack(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if rig_cfg.audio.tx_enabled {
|
if rig_cfg.audio.tx_enabled {
|
||||||
let _playback_thread = audio::spawn_audio_playback(&rig_cfg.audio, tx_audio_rx);
|
let _playback_thread =
|
||||||
|
audio::spawn_audio_playback(&rig_cfg.audio, tx_audio_rx, shutdown_rx.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
let audio_shutdown_rx = shutdown_rx.clone();
|
let audio_shutdown_rx = shutdown_rx.clone();
|
||||||
|
|||||||
Reference in New Issue
Block a user