[fix](trx-server): auto-recover audio streams and harden frontend reconnect

Implement ALSA/CPAL stream auto-recovery by recreating input/output
streams after backend callback failures with bounded retry delay.

Also improve HTTP frontend resilience by polling /status on reconnect
and after SSE errors to refresh snapshot state after broken pipes.

Co-authored-by: Codex <codex@openai.com>
Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
2026-02-12 23:59:39 +01:00
parent 273283708e
commit d085f96838
2 changed files with 252 additions and 105 deletions
+222 -102
View File
@@ -5,6 +5,7 @@
//! Audio capture, playback, and TCP streaming for trx-server.
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use std::{collections::VecDeque, sync::Mutex};
@@ -31,7 +32,8 @@ const APRS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const FT8_SAMPLE_RATE: u32 = 12_000;
const AUDIO_STREAM_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(10);
const AUDIO_STREAM_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(60);
const AUDIO_STREAM_RECOVERY_DELAY: Duration = Duration::from_secs(1);
struct StreamErrorLogger {
label: &'static str,
@@ -40,6 +42,7 @@ struct StreamErrorLogger {
#[derive(Default)]
struct StreamErrorState {
last_kind: Option<&'static str>,
last_error: Option<String>,
last_logged_at: Option<Instant>,
suppressed: u64,
@@ -55,19 +58,14 @@ impl StreamErrorLogger {
fn log(&self, err: &str) {
let now = Instant::now();
let kind = classify_stream_error(err);
let mut state = self
.state
.lock()
.expect("stream error logger mutex poisoned");
let should_log_now = match (&state.last_error, state.last_logged_at) {
(None, _) => true,
(Some(prev), Some(ts)) => {
prev != err || now.duration_since(ts) >= AUDIO_STREAM_ERROR_LOG_INTERVAL
}
(Some(_), None) => true,
};
if should_log_now {
// First occurrence or changed error class: log as error once.
if state.last_kind != Some(kind) {
if state.suppressed > 0 {
warn!(
"{} repeated {} times: {}",
@@ -77,12 +75,44 @@ impl StreamErrorLogger {
);
}
error!("{}: {}", self.label, err);
state.last_kind = Some(kind);
state.last_error = Some(err.to_string());
state.last_logged_at = Some(now);
state.suppressed = 0;
} else {
state.suppressed += 1;
return;
}
// Same class: suppress aggressively and emit only periodic summaries.
state.suppressed += 1;
let due = state
.last_logged_at
.map(|ts| now.duration_since(ts) >= AUDIO_STREAM_ERROR_LOG_INTERVAL)
.unwrap_or(false);
if due {
warn!(
"{} recurring ({} repeats/{}s): {}",
self.label,
state.suppressed,
AUDIO_STREAM_ERROR_LOG_INTERVAL.as_secs(),
state.last_error.as_deref().unwrap_or("<unknown>")
);
state.last_logged_at = Some(now);
state.suppressed = 0;
} else {
state.last_error = Some(err.to_string());
}
}
}
/// Maps a raw stream error message onto a coarse, static category label.
///
/// Classification is a case-sensitive substring match, checked in this
/// order (the first match wins):
///
/// 1. ALSA poll failures (`snd_pcm_poll_descriptors` or
///    `alsa::poll() returned POLLERR`) -> `"alsa_poll_failure"`
/// 2. messages containing `Input stream` -> `"input_stream_error"`
/// 3. messages containing `Output stream` -> `"output_stream_error"`
/// 4. anything else (including the empty string) -> `"other_stream_error"`
fn classify_stream_error(err: &str) -> &'static str {
    // Substrings that identify an ALSA poll failure.
    const ALSA_POLL_MARKERS: [&str; 2] = [
        "snd_pcm_poll_descriptors",
        "alsa::poll() returned POLLERR",
    ];

    // The ALSA check comes first so that a poll failure reported on an
    // input/output stream is still categorised as a poll failure.
    if ALSA_POLL_MARKERS.iter().any(|marker| err.contains(marker)) {
        return "alsa_poll_failure";
    }
    if err.contains("Input stream") {
        return "input_stream_error";
    }
    if err.contains("Output stream") {
        return "output_stream_error";
    }
    "other_stream_error"
}
@@ -226,6 +256,7 @@ fn run_capture(
pcm_tx: Option<broadcast::Sender<Vec<f32>>>,
) -> Result<(), Box<dyn std::error::Error>> {
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use std::sync::mpsc::RecvTimeoutError;
let host = cpal::default_host();
let device = if let Some(ref name) = device_name {
@@ -260,79 +291,110 @@ fn run_capture(
let mut encoder = opus::Encoder::new(sample_rate, opus_channels, opus::Application::Audio)?;
encoder.set_bitrate(opus::Bitrate::Bits(bitrate_bps as i32))?;
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(64);
let input_err_logger = Arc::new(StreamErrorLogger::new("Audio input stream error"));
let stream = device.build_input_stream(
&config,
move |data: &[f32], _: &cpal::InputCallbackInfo| {
let _ = sample_tx.try_send(data.to_vec());
},
{
let input_err_logger = input_err_logger.clone();
move |err| {
input_err_logger.log(&err.to_string());
}
},
None,
)?;
// Start paused — only capture when clients are connected
info!(
"Audio capture: ready ({}Hz, {} ch, {}ms frames)",
sample_rate, channels, frame_duration_ms
);
let input_err_logger = Arc::new(StreamErrorLogger::new("Audio input stream error"));
let mut pcm_buf: Vec<f32> = Vec::with_capacity(frame_samples * 2);
let mut opus_buf = vec![0u8; 4096];
let mut capturing = false;
loop {
let has_receivers =
tx.receiver_count() > 0 || pcm_tx.as_ref().is_some_and(|p| p.receiver_count() > 0);
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(64);
let stream_failed = Arc::new(AtomicBool::new(false));
let stream = match device.build_input_stream(
&config,
move |data: &[f32], _: &cpal::InputCallbackInfo| {
let _ = sample_tx.try_send(data.to_vec());
},
{
let input_err_logger = input_err_logger.clone();
let stream_failed = stream_failed.clone();
move |err| {
input_err_logger.log(&err.to_string());
stream_failed.store(true, Ordering::Relaxed);
}
},
None,
) {
Ok(stream) => stream,
Err(err) => {
warn!(
"Audio capture: failed to open input stream, retrying: {}",
err
);
std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY);
continue;
}
};
if has_receivers && !capturing {
if capturing {
let _ = stream.play();
capturing = true;
info!("Audio capture: started");
} else if !has_receivers && capturing {
let _ = stream.pause();
capturing = false;
pcm_buf.clear();
// Drain any buffered samples
while sample_rx.try_recv().is_ok() {}
info!("Audio capture: paused (no listeners)");
}
if !capturing {
std::thread::sleep(std::time::Duration::from_millis(100));
continue;
}
loop {
if stream_failed.load(Ordering::Relaxed) {
warn!("Audio capture: backend stream error, recreating");
break;
}
match sample_rx.recv() {
Ok(samples) => {
pcm_buf.extend_from_slice(&samples);
while pcm_buf.len() >= frame_samples {
let frame: Vec<f32> = pcm_buf.drain(..frame_samples).collect();
if let Some(ref pcm_tx) = pcm_tx {
let _ = pcm_tx.send(frame.clone());
}
match encoder.encode_float(&frame, &mut opus_buf) {
Ok(len) => {
let packet = Bytes::copy_from_slice(&opus_buf[..len]);
let _ = tx.send(packet);
let has_receivers =
tx.receiver_count() > 0 || pcm_tx.as_ref().is_some_and(|p| p.receiver_count() > 0);
if has_receivers && !capturing {
let _ = stream.play();
capturing = true;
info!("Audio capture: started");
} else if !has_receivers && capturing {
let _ = stream.pause();
capturing = false;
pcm_buf.clear();
while sample_rx.try_recv().is_ok() {}
info!("Audio capture: paused (no listeners)");
}
if !capturing {
std::thread::sleep(std::time::Duration::from_millis(100));
continue;
}
match sample_rx.recv_timeout(std::time::Duration::from_millis(200)) {
Ok(samples) => {
pcm_buf.extend_from_slice(&samples);
while pcm_buf.len() >= frame_samples {
let frame: Vec<f32> = pcm_buf.drain(..frame_samples).collect();
if let Some(ref pcm_tx) = pcm_tx {
let _ = pcm_tx.send(frame.clone());
}
Err(e) => {
warn!("Opus encode error: {}", e);
match encoder.encode_float(&frame, &mut opus_buf) {
Ok(len) => {
let packet = Bytes::copy_from_slice(&opus_buf[..len]);
let _ = tx.send(packet);
}
Err(e) => {
warn!("Opus encode error: {}", e);
}
}
}
}
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => {
warn!("Audio capture: callback channel disconnected, recreating");
break;
}
}
Err(_) => break,
}
}
Ok(())
if capturing {
let _ = stream.pause();
capturing = false;
pcm_buf.clear();
}
std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY);
}
}
/// Spawn the audio playback task.
@@ -362,6 +424,7 @@ fn run_playback(
mut rx: mpsc::Receiver<Bytes>,
) -> Result<(), Box<dyn std::error::Error>> {
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use tokio::sync::mpsc::error::TryRecvError;
let host = cpal::default_host();
let device = if let Some(ref name) = device_name {
@@ -400,63 +463,120 @@ fn run_playback(
));
let ring_writer = ring.clone();
let output_err_logger = Arc::new(StreamErrorLogger::new("Audio output stream error"));
let stream = device.build_output_stream(
&config,
move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
let mut ring = ring.lock().unwrap();
for sample in data.iter_mut() {
*sample = ring.pop_front().unwrap_or(0.0);
}
},
{
let output_err_logger = output_err_logger.clone();
move |err| {
output_err_logger.log(&err.to_string());
}
},
None,
)?;
// Start paused — only play when TX packets arrive
info!("Audio playback: ready ({}Hz, {} ch)", sample_rate, channels);
let output_err_logger = Arc::new(StreamErrorLogger::new("Audio output stream error"));
let mut pcm_buf = vec![0f32; frame_samples];
let mut playing = false;
let mut channel_closed = false;
loop {
let stream_failed = Arc::new(AtomicBool::new(false));
let stream = match device.build_output_stream(
&config,
{
let ring = ring.clone();
move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
let mut ring = ring.lock().unwrap();
for sample in data.iter_mut() {
*sample = ring.pop_front().unwrap_or(0.0);
}
}
},
{
let output_err_logger = output_err_logger.clone();
let stream_failed = stream_failed.clone();
move |err| {
output_err_logger.log(&err.to_string());
stream_failed.store(true, Ordering::Relaxed);
}
},
None,
) {
Ok(stream) => stream,
Err(err) => {
warn!(
"Audio playback: failed to open output stream, retrying: {}",
err
);
std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY);
continue;
}
};
while let Some(packet) = rx.blocking_recv() {
if !playing {
// stay paused until packets arrive
} else {
stream.play()?;
playing = true;
info!("Audio playback: started");
}
match decoder.decode_float(&packet, &mut pcm_buf, false) {
Ok(decoded) => {
let mut ring = ring_writer.lock().unwrap();
ring.extend(&pcm_buf[..decoded * channels as usize]);
loop {
if stream_failed.load(Ordering::Relaxed) {
warn!("Audio playback: backend stream error, recreating");
break;
}
Err(e) => {
warn!("Opus decode error: {}", e);
match rx.try_recv() {
Ok(packet) => {
if !playing {
stream.play()?;
playing = true;
info!("Audio playback: started");
}
match decoder.decode_float(&packet, &mut pcm_buf, false) {
Ok(decoded) => {
let mut ring = ring_writer.lock().unwrap();
ring.extend(&pcm_buf[..decoded * channels as usize]);
}
Err(e) => {
warn!("Opus decode error: {}", e);
}
}
if rx.is_empty() {
std::thread::sleep(std::time::Duration::from_millis(
frame_duration_ms as u64 * 2,
));
if rx.is_empty() {
let _ = stream.pause();
playing = false;
ring_writer.lock().unwrap().clear();
info!("Audio playback: paused (idle)");
if channel_closed {
return Ok(());
}
}
}
}
Err(TryRecvError::Empty) => {
if channel_closed && !playing {
return Ok(());
}
std::thread::sleep(std::time::Duration::from_millis(20));
}
Err(TryRecvError::Disconnected) => {
channel_closed = true;
if !playing {
return Ok(());
}
std::thread::sleep(std::time::Duration::from_millis(20));
}
}
}
// Pause when no more packets are queued to avoid ALSA underruns
if rx.is_empty() {
// Drain remaining samples before pausing
std::thread::sleep(std::time::Duration::from_millis(
frame_duration_ms as u64 * 2,
));
if rx.is_empty() {
let _ = stream.pause();
playing = false;
ring_writer.lock().unwrap().clear();
info!("Audio playback: paused (idle)");
}
if playing {
let _ = stream.pause();
playing = false;
}
ring_writer.lock().unwrap().clear();
if channel_closed {
return Ok(());
}
std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY);
}
Ok(())
}
/// Run the APRS decoder task. Only processes PCM when rig mode is PKT.