[fix](trx-server): make ALSA stream recovery reliable

Signal backend stream callback failures directly to capture/playback loops and strengthen cross-thread failure flag visibility so recurring POLLERR conditions consistently trigger stream recreation.

Co-authored-by: Codex <codex@openai.com>
Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
2026-02-13 00:12:12 +01:00
parent c5b441f75b
commit a7719bd7a9
+31 -8
View File
@@ -256,7 +256,7 @@ fn run_capture(
pcm_tx: Option<broadcast::Sender<Vec<f32>>>, pcm_tx: Option<broadcast::Sender<Vec<f32>>>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::{RecvTimeoutError, TryRecvError as StdTryRecvError};
let host = cpal::default_host(); let host = cpal::default_host();
let device = if let Some(ref name) = device_name { let device = if let Some(ref name) = device_name {
@@ -304,6 +304,7 @@ fn run_capture(
loop { loop {
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(64); let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(64);
let (stream_err_tx, stream_err_rx) = std::sync::mpsc::sync_channel::<()>(1);
let stream_failed = Arc::new(AtomicBool::new(false)); let stream_failed = Arc::new(AtomicBool::new(false));
let stream = match device.build_input_stream( let stream = match device.build_input_stream(
&config, &config,
@@ -313,9 +314,11 @@ fn run_capture(
{ {
let input_err_logger = input_err_logger.clone(); let input_err_logger = input_err_logger.clone();
let stream_failed = stream_failed.clone(); let stream_failed = stream_failed.clone();
let stream_err_tx = stream_err_tx.clone();
move |err| { move |err| {
input_err_logger.log(&err.to_string()); input_err_logger.log(&err.to_string());
stream_failed.store(true, Ordering::Relaxed); stream_failed.store(true, Ordering::SeqCst);
let _ = stream_err_tx.try_send(());
} }
}, },
None, None,
@@ -336,7 +339,15 @@ fn run_capture(
} }
loop { loop {
if stream_failed.load(Ordering::Relaxed) { match stream_err_rx.try_recv() {
Ok(()) | Err(StdTryRecvError::Disconnected) => {
warn!("Audio capture: backend stream error, recreating");
break;
}
Err(StdTryRecvError::Empty) => {}
}
if stream_failed.load(Ordering::SeqCst) {
warn!("Audio capture: backend stream error, recreating"); warn!("Audio capture: backend stream error, recreating");
break; break;
} }
@@ -424,7 +435,8 @@ fn run_playback(
mut rx: mpsc::Receiver<Bytes>, mut rx: mpsc::Receiver<Bytes>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use tokio::sync::mpsc::error::TryRecvError; use std::sync::mpsc::TryRecvError as StdTryRecvError;
use tokio::sync::mpsc::error::TryRecvError as TokioTryRecvError;
let host = cpal::default_host(); let host = cpal::default_host();
let device = if let Some(ref name) = device_name { let device = if let Some(ref name) = device_name {
@@ -472,6 +484,7 @@ fn run_playback(
let mut channel_closed = false; let mut channel_closed = false;
loop { loop {
let (stream_err_tx, stream_err_rx) = std::sync::mpsc::sync_channel::<()>(1);
let stream_failed = Arc::new(AtomicBool::new(false)); let stream_failed = Arc::new(AtomicBool::new(false));
let stream = match device.build_output_stream( let stream = match device.build_output_stream(
&config, &config,
@@ -487,9 +500,11 @@ fn run_playback(
{ {
let output_err_logger = output_err_logger.clone(); let output_err_logger = output_err_logger.clone();
let stream_failed = stream_failed.clone(); let stream_failed = stream_failed.clone();
let stream_err_tx = stream_err_tx.clone();
move |err| { move |err| {
output_err_logger.log(&err.to_string()); output_err_logger.log(&err.to_string());
stream_failed.store(true, Ordering::Relaxed); stream_failed.store(true, Ordering::SeqCst);
let _ = stream_err_tx.try_send(());
} }
}, },
None, None,
@@ -512,7 +527,15 @@ fn run_playback(
} }
loop { loop {
if stream_failed.load(Ordering::Relaxed) { match stream_err_rx.try_recv() {
Ok(()) | Err(StdTryRecvError::Disconnected) => {
warn!("Audio playback: backend stream error, recreating");
break;
}
Err(StdTryRecvError::Empty) => {}
}
if stream_failed.load(Ordering::SeqCst) {
warn!("Audio playback: backend stream error, recreating"); warn!("Audio playback: backend stream error, recreating");
break; break;
} }
@@ -550,13 +573,13 @@ fn run_playback(
} }
} }
} }
Err(TryRecvError::Empty) => { Err(TokioTryRecvError::Empty) => {
if channel_closed && !playing { if channel_closed && !playing {
return Ok(()); return Ok(());
} }
std::thread::sleep(std::time::Duration::from_millis(20)); std::thread::sleep(std::time::Duration::from_millis(20));
} }
Err(TryRecvError::Disconnected) => { Err(TokioTryRecvError::Disconnected) => {
channel_closed = true; channel_closed = true;
if !playing { if !playing {
return Ok(()); return Ok(());