diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 5f48ec5..0cccad1 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -256,7 +256,7 @@ fn run_capture( pcm_tx: Option>>, ) -> Result<(), Box> { use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; - use std::sync::mpsc::RecvTimeoutError; + use std::sync::mpsc::{RecvTimeoutError, TryRecvError as StdTryRecvError}; let host = cpal::default_host(); let device = if let Some(ref name) = device_name { @@ -304,6 +304,7 @@ fn run_capture( loop { let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::>(64); + let (stream_err_tx, stream_err_rx) = std::sync::mpsc::sync_channel::<()>(1); let stream_failed = Arc::new(AtomicBool::new(false)); let stream = match device.build_input_stream( &config, @@ -313,9 +314,11 @@ fn run_capture( { let input_err_logger = input_err_logger.clone(); let stream_failed = stream_failed.clone(); + let stream_err_tx = stream_err_tx.clone(); move |err| { input_err_logger.log(&err.to_string()); - stream_failed.store(true, Ordering::Relaxed); + stream_failed.store(true, Ordering::SeqCst); + let _ = stream_err_tx.try_send(()); } }, None, @@ -336,7 +339,15 @@ fn run_capture( } loop { - if stream_failed.load(Ordering::Relaxed) { + match stream_err_rx.try_recv() { + Ok(()) | Err(StdTryRecvError::Disconnected) => { + warn!("Audio capture: backend stream error, recreating"); + break; + } + Err(StdTryRecvError::Empty) => {} + } + + if stream_failed.load(Ordering::SeqCst) { warn!("Audio capture: backend stream error, recreating"); break; } @@ -424,7 +435,8 @@ fn run_playback( mut rx: mpsc::Receiver, ) -> Result<(), Box> { use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; - use tokio::sync::mpsc::error::TryRecvError; + use std::sync::mpsc::TryRecvError as StdTryRecvError; + use tokio::sync::mpsc::error::TryRecvError as TokioTryRecvError; let host = cpal::default_host(); let device = if let Some(ref name) = device_name { @@ -472,6 +484,7 @@ fn run_playback( let mut channel_closed = false; loop { + let (stream_err_tx, stream_err_rx) = std::sync::mpsc::sync_channel::<()>(1); let stream_failed = Arc::new(AtomicBool::new(false)); let stream = match device.build_output_stream( &config, @@ -487,9 +500,11 @@ fn run_playback( { let output_err_logger = output_err_logger.clone(); let stream_failed = stream_failed.clone(); + let stream_err_tx = stream_err_tx.clone(); move |err| { output_err_logger.log(&err.to_string()); - stream_failed.store(true, Ordering::Relaxed); + stream_failed.store(true, Ordering::SeqCst); + let _ = stream_err_tx.try_send(()); } }, None, @@ -512,7 +527,15 @@ fn run_playback( } loop { - if stream_failed.load(Ordering::Relaxed) { + match stream_err_rx.try_recv() { + Ok(()) | Err(StdTryRecvError::Disconnected) => { + warn!("Audio playback: backend stream error, recreating"); + break; + } + Err(StdTryRecvError::Empty) => {} + } + + if stream_failed.load(Ordering::SeqCst) { warn!("Audio playback: backend stream error, recreating"); break; } @@ -550,13 +573,13 @@ fn run_playback( } } } - Err(TryRecvError::Empty) => { + Err(TokioTryRecvError::Empty) => { if channel_closed && !playing { return Ok(()); } std::thread::sleep(std::time::Duration::from_millis(20)); } - Err(TryRecvError::Disconnected) => { + Err(TokioTryRecvError::Disconnected) => { channel_closed = true; if !playing { return Ok(());