From d085f96838eed839b672438da1b8933e1dbeb8cf Mon Sep 17 00:00:00 2001 From: Stanislaw Grams Date: Thu, 12 Feb 2026 23:59:39 +0100 Subject: [PATCH] [fix](trx-server): auto-recover audio streams and harden frontend reconnect Implement ALSA/CPAL stream auto-recovery by recreating input/output streams after backend callback failures with bounded retry delay. Also improve HTTP frontend resilience by polling /status on reconnect and after SSE errors to refresh snapshot state after broken pipes. Co-authored-by: Codex Signed-off-by: Stanislaw Grams --- .../trx-frontend-http/assets/web/app.js | 33 +- src/trx-server/src/audio.rs | 324 ++++++++++++------ 2 files changed, 252 insertions(+), 105 deletions(-) diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index 94e0233..70f56b3 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -74,6 +74,7 @@ let initialized = false; let lastEventAt = Date.now(); let es; let esHeartbeat; +let reconnectTimer = null; function formatFreq(hz) { if (!Number.isFinite(hz)) return "--"; @@ -446,6 +447,26 @@ function render(update) { } } +function scheduleReconnect(delayMs = 1000) { + if (reconnectTimer) return; + reconnectTimer = setTimeout(() => { + reconnectTimer = null; + connect(); + }, delayMs); +} + +async function pollFreshSnapshot() { + try { + const resp = await fetch("/status", { cache: "no-store" }); + if (!resp.ok) return; + const data = await resp.json(); + render(data); + lastEventAt = Date.now(); + } catch (e) { + // Ignore network errors; connect() retry loop handles reconnection. + } +} + function connect() { if (es) { es.close(); @@ -453,9 +474,13 @@ function connect() { if (esHeartbeat) { clearInterval(esHeartbeat); } + pollFreshSnapshot(); es = new EventSource("/events"); lastEventAt = Date.now(); -es.onmessage = (evt) => { + es.onopen = () => { + pollFreshSnapshot(); + }; + es.onmessage = (evt) => { try { if (evt.data === lastRendered) return; const data = JSON.parse(evt.data); @@ -472,14 +497,16 @@ es.onmessage = (evt) => { es.onerror = () => { powerHint.textContent = "Disconnected, retrying…"; es.close(); - setTimeout(connect, 1000); + pollFreshSnapshot(); + scheduleReconnect(1000); }; esHeartbeat = setInterval(() => { const now = Date.now(); if (now - lastEventAt > 15000) { es.close(); - connect(); + pollFreshSnapshot(); + scheduleReconnect(250); } }, 5000); } diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index beda264..5f48ec5 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -5,6 +5,7 @@ //! Audio capture, playback, and TCP streaming for trx-server. use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, OnceLock}; use std::time::{Duration, Instant}; use std::{collections::VecDeque, sync::Mutex}; @@ -31,7 +32,8 @@ const APRS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const FT8_SAMPLE_RATE: u32 = 12_000; -const AUDIO_STREAM_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(10); +const AUDIO_STREAM_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(60); +const AUDIO_STREAM_RECOVERY_DELAY: Duration = Duration::from_secs(1); struct StreamErrorLogger { label: &'static str, @@ -40,6 +42,7 @@ struct StreamErrorLogger { #[derive(Default)] struct StreamErrorState { + last_kind: Option<&'static str>, last_error: Option, last_logged_at: Option, suppressed: u64, @@ -55,19 +58,14 @@ impl StreamErrorLogger { fn log(&self, err: &str) { let now = Instant::now(); + let kind = classify_stream_error(err); let mut state = self .state .lock() .expect("stream error logger mutex poisoned"); - let should_log_now = match (&state.last_error, state.last_logged_at) { - (None, _) => true, - (Some(prev), Some(ts)) => { - prev != err || now.duration_since(ts) >= AUDIO_STREAM_ERROR_LOG_INTERVAL - } - (Some(_), None) => true, - }; - if should_log_now { + // First occurrence or changed error class: log as error once. + if state.last_kind != Some(kind) { if state.suppressed > 0 { warn!( "{} repeated {} times: {}", @@ -77,12 +75,44 @@ impl StreamErrorLogger { ); } error!("{}: {}", self.label, err); + state.last_kind = Some(kind); state.last_error = Some(err.to_string()); state.last_logged_at = Some(now); state.suppressed = 0; - } else { - state.suppressed += 1; + return; } + + // Same class: suppress aggressively and emit only periodic summaries. + state.suppressed += 1; + let due = state + .last_logged_at + .map(|ts| now.duration_since(ts) >= AUDIO_STREAM_ERROR_LOG_INTERVAL) + .unwrap_or(false); + if due { + warn!( + "{} recurring ({} repeats/{}s): {}", + self.label, + state.suppressed, + AUDIO_STREAM_ERROR_LOG_INTERVAL.as_secs(), + state.last_error.as_deref().unwrap_or("") + ); + state.last_logged_at = Some(now); + state.suppressed = 0; + } else { + state.last_error = Some(err.to_string()); + } + } +} + +fn classify_stream_error(err: &str) -> &'static str { + if err.contains("snd_pcm_poll_descriptors") || err.contains("alsa::poll() returned POLLERR") { + "alsa_poll_failure" + } else if err.contains("Input stream") { + "input_stream_error" + } else if err.contains("Output stream") { + "output_stream_error" + } else { + "other_stream_error" } } @@ -226,6 +256,7 @@ fn run_capture( pcm_tx: Option>>, ) -> Result<(), Box> { use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; + use std::sync::mpsc::RecvTimeoutError; let host = cpal::default_host(); let device = if let Some(ref name) = device_name { @@ -260,79 +291,110 @@ fn run_capture( let mut encoder = opus::Encoder::new(sample_rate, opus_channels, opus::Application::Audio)?; encoder.set_bitrate(opus::Bitrate::Bits(bitrate_bps as i32))?; - let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::>(64); - - let input_err_logger = Arc::new(StreamErrorLogger::new("Audio input stream error")); - let stream = device.build_input_stream( - &config, - move |data: &[f32], _: &cpal::InputCallbackInfo| { - let _ = sample_tx.try_send(data.to_vec()); - }, - { - let input_err_logger = input_err_logger.clone(); - move |err| { - input_err_logger.log(&err.to_string()); - } - }, - None, - )?; - // Start paused — only capture when clients are connected info!( "Audio capture: ready ({}Hz, {} ch, {}ms frames)", sample_rate, channels, frame_duration_ms ); + let input_err_logger = Arc::new(StreamErrorLogger::new("Audio input stream error")); let mut pcm_buf: Vec = Vec::with_capacity(frame_samples * 2); let mut opus_buf = vec![0u8; 4096]; let mut capturing = false; loop { - let has_receivers = - tx.receiver_count() > 0 || pcm_tx.as_ref().is_some_and(|p| p.receiver_count() > 0); + let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::>(64); + let stream_failed = Arc::new(AtomicBool::new(false)); + let stream = match device.build_input_stream( + &config, + move |data: &[f32], _: &cpal::InputCallbackInfo| { + let _ = sample_tx.try_send(data.to_vec()); + }, + { + let input_err_logger = input_err_logger.clone(); + let stream_failed = stream_failed.clone(); + move |err| { + input_err_logger.log(&err.to_string()); + stream_failed.store(true, Ordering::Relaxed); + } + }, + None, + ) { + Ok(stream) => stream, + Err(err) => { + warn!( + "Audio capture: failed to open input stream, retrying: {}", + err + ); + std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY); + continue; + } + }; - if has_receivers && !capturing { + if capturing { let _ = stream.play(); - capturing = true; - info!("Audio capture: started"); - } else if !has_receivers && capturing { - let _ = stream.pause(); - capturing = false; - pcm_buf.clear(); - // Drain any buffered samples - while sample_rx.try_recv().is_ok() {} - info!("Audio capture: paused (no listeners)"); } - if !capturing { - std::thread::sleep(std::time::Duration::from_millis(100)); - continue; - } + loop { + if stream_failed.load(Ordering::Relaxed) { + warn!("Audio capture: backend stream error, recreating"); + break; + } - match sample_rx.recv() { - Ok(samples) => { - pcm_buf.extend_from_slice(&samples); - while pcm_buf.len() >= frame_samples { - let frame: Vec = pcm_buf.drain(..frame_samples).collect(); - if let Some(ref pcm_tx) = pcm_tx { - let _ = pcm_tx.send(frame.clone()); - } - match encoder.encode_float(&frame, &mut opus_buf) { - Ok(len) => { - let packet = Bytes::copy_from_slice(&opus_buf[..len]); - let _ = tx.send(packet); + let has_receivers = + tx.receiver_count() > 0 || pcm_tx.as_ref().is_some_and(|p| p.receiver_count() > 0); + + if has_receivers && !capturing { + let _ = stream.play(); + capturing = true; + info!("Audio capture: started"); + } else if !has_receivers && capturing { + let _ = stream.pause(); + capturing = false; + pcm_buf.clear(); + while sample_rx.try_recv().is_ok() {} + info!("Audio capture: paused (no listeners)"); + } + + if !capturing { + std::thread::sleep(std::time::Duration::from_millis(100)); + continue; + } + + match sample_rx.recv_timeout(std::time::Duration::from_millis(200)) { + Ok(samples) => { + pcm_buf.extend_from_slice(&samples); + while pcm_buf.len() >= frame_samples { + let frame: Vec = pcm_buf.drain(..frame_samples).collect(); + if let Some(ref pcm_tx) = pcm_tx { + let _ = pcm_tx.send(frame.clone()); } - Err(e) => { - warn!("Opus encode error: {}", e); + match encoder.encode_float(&frame, &mut opus_buf) { + Ok(len) => { + let packet = Bytes::copy_from_slice(&opus_buf[..len]); + let _ = tx.send(packet); + } + Err(e) => { + warn!("Opus encode error: {}", e); + } } } } + Err(RecvTimeoutError::Timeout) => {} + Err(RecvTimeoutError::Disconnected) => { + warn!("Audio capture: callback channel disconnected, recreating"); + break; + } } - Err(_) => break, } - } - Ok(()) + if capturing { + let _ = stream.pause(); + capturing = false; + pcm_buf.clear(); + } + std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY); + } } /// Spawn the audio playback task. @@ -362,6 +424,7 @@ fn run_playback( mut rx: mpsc::Receiver, ) -> Result<(), Box> { use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; + use tokio::sync::mpsc::error::TryRecvError; let host = cpal::default_host(); let device = if let Some(ref name) = device_name { @@ -400,63 +463,120 @@ fn run_playback( )); let ring_writer = ring.clone(); - let output_err_logger = Arc::new(StreamErrorLogger::new("Audio output stream error")); - let stream = device.build_output_stream( - &config, - move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { - let mut ring = ring.lock().unwrap(); - for sample in data.iter_mut() { - *sample = ring.pop_front().unwrap_or(0.0); - } - }, - { - let output_err_logger = output_err_logger.clone(); - move |err| { - output_err_logger.log(&err.to_string()); - } - }, - None, - )?; - // Start paused — only play when TX packets arrive info!("Audio playback: ready ({}Hz, {} ch)", sample_rate, channels); + let output_err_logger = Arc::new(StreamErrorLogger::new("Audio output stream error")); let mut pcm_buf = vec![0f32; frame_samples]; let mut playing = false; + let mut channel_closed = false; + + loop { + let stream_failed = Arc::new(AtomicBool::new(false)); + let stream = match device.build_output_stream( + &config, + { + let ring = ring.clone(); + move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { + let mut ring = ring.lock().unwrap(); + for sample in data.iter_mut() { + *sample = ring.pop_front().unwrap_or(0.0); + } + } + }, + { + let output_err_logger = output_err_logger.clone(); + let stream_failed = stream_failed.clone(); + move |err| { + output_err_logger.log(&err.to_string()); + stream_failed.store(true, Ordering::Relaxed); + } + }, + None, + ) { + Ok(stream) => stream, + Err(err) => { + warn!( + "Audio playback: failed to open output stream, retrying: {}", + err + ); + std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY); + continue; + } + }; - while let Some(packet) = rx.blocking_recv() { if !playing { + // stay paused until packets arrive + } else { stream.play()?; - playing = true; - info!("Audio playback: started"); } - match decoder.decode_float(&packet, &mut pcm_buf, false) { - Ok(decoded) => { - let mut ring = ring_writer.lock().unwrap(); - ring.extend(&pcm_buf[..decoded * channels as usize]); + loop { + if stream_failed.load(Ordering::Relaxed) { + warn!("Audio playback: backend stream error, recreating"); + break; } - Err(e) => { - warn!("Opus decode error: {}", e); + + match rx.try_recv() { + Ok(packet) => { + if !playing { + stream.play()?; + playing = true; + info!("Audio playback: started"); + } + + match decoder.decode_float(&packet, &mut pcm_buf, false) { + Ok(decoded) => { + let mut ring = ring_writer.lock().unwrap(); + ring.extend(&pcm_buf[..decoded * channels as usize]); + } + Err(e) => { + warn!("Opus decode error: {}", e); + } + } + + if rx.is_empty() { + std::thread::sleep(std::time::Duration::from_millis( + frame_duration_ms as u64 * 2, + )); + if rx.is_empty() { + let _ = stream.pause(); + playing = false; + ring_writer.lock().unwrap().clear(); + info!("Audio playback: paused (idle)"); + if channel_closed { + return Ok(()); + } + } + } + } + Err(TryRecvError::Empty) => { + if channel_closed && !playing { + return Ok(()); + } + std::thread::sleep(std::time::Duration::from_millis(20)); + } + Err(TryRecvError::Disconnected) => { + channel_closed = true; + if !playing { + return Ok(()); + } + std::thread::sleep(std::time::Duration::from_millis(20)); + } } } - // Pause when no more packets are queued to avoid ALSA underruns - if rx.is_empty() { - // Drain remaining samples before pausing - std::thread::sleep(std::time::Duration::from_millis( - frame_duration_ms as u64 * 2, - )); - if rx.is_empty() { - let _ = stream.pause(); - playing = false; - ring_writer.lock().unwrap().clear(); - info!("Audio playback: paused (idle)"); - } + if playing { + let _ = stream.pause(); + playing = false; } + ring_writer.lock().unwrap().clear(); + + if channel_closed { + return Ok(()); + } + std::thread::sleep(AUDIO_STREAM_RECOVERY_DELAY); } - - Ok(()) } /// Run the APRS decoder task. Only processes PCM when rig mode is PKT.