diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 2489968..20b7c1b 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -147,6 +147,21 @@ struct StreamErrorState { suppressed: u64, } +/// Outcome of a `StreamErrorLogger::log_at` call. Tests assert on this; the +/// production path emits the corresponding tracing record(s) as a side effect. +#[derive(Debug, PartialEq, Eq)] +enum LogAction { + /// First sighting of this error class, or class transitioned. If the + /// previous class had been suppressed, `flushed_repeated` carries the + /// suppressed count emitted as a warn before the new error. + Error { flushed_repeated: Option }, + /// Same class as the prior call, still inside the suppression window. + Suppressed, + /// Same class, but the suppression interval has elapsed — periodic + /// summary emitted with `repeats` accumulated since the last log. + Recurring { repeats: u64 }, +} + impl StreamErrorLogger { fn new(label: &'static str) -> Self { Self { @@ -156,26 +171,35 @@ impl StreamErrorLogger { } fn log(&self, err: &str) { - let now = Instant::now(); + let _ = self.log_at(err, Instant::now()); + } + + fn log_at(&self, err: &str, now: Instant) -> LogAction { let kind = classify_stream_error(err); let mut state = lock_or_recover(&self.state, self.label); // First occurrence or changed error class: log as error once. if state.last_kind != Some(kind) { - if state.suppressed > 0 { + let flushed = if state.suppressed > 0 { + let n = state.suppressed; warn!( "{} repeated {} times: {}", self.label, - state.suppressed, + n, state.last_error.as_deref().unwrap_or("") ); - } + Some(n) + } else { + None + }; error!("{}: {}", self.label, err); state.last_kind = Some(kind); state.last_error = Some(err.to_string()); state.last_logged_at = Some(now); state.suppressed = 0; - return; + return LogAction::Error { + flushed_repeated: flushed, + }; } // Same class: suppress aggressively and emit only periodic summaries. @@ -185,17 +209,20 @@ impl StreamErrorLogger { .map(|ts| now.duration_since(ts) >= AUDIO_STREAM_ERROR_LOG_INTERVAL) .unwrap_or(false); if due { + let repeats = state.suppressed; warn!( "{} recurring ({} repeats/{}s): {}", self.label, - state.suppressed, + repeats, AUDIO_STREAM_ERROR_LOG_INTERVAL.as_secs(), state.last_error.as_deref().unwrap_or("") ); state.last_logged_at = Some(now); state.suppressed = 0; + LogAction::Recurring { repeats } } else { state.last_error = Some(err.to_string()); + LogAction::Suppressed } } } @@ -4514,6 +4541,114 @@ mod tests { assert_eq!(h.estimated_total_count(), 4); } + // ------------------------------------------------------------------- + // StreamErrorLogger + // ------------------------------------------------------------------- + + #[test] + fn stream_error_logger_first_call_emits_error_with_no_flush() { + let log = StreamErrorLogger::new("input"); + let action = log.log_at("Input stream closed", Instant::now()); + assert_eq!( + action, + LogAction::Error { + flushed_repeated: None + } + ); + } + + #[test] + fn stream_error_logger_repeated_within_interval_is_suppressed() { + let log = StreamErrorLogger::new("input"); + let t0 = Instant::now(); + log.log_at("Input stream closed", t0); + let action = log.log_at("Input stream closed", t0 + Duration::from_secs(1)); + assert_eq!(action, LogAction::Suppressed); + } + + #[test] + fn stream_error_logger_emits_recurring_after_interval() { + let log = StreamErrorLogger::new("input"); + let t0 = Instant::now(); + log.log_at("Input stream closed", t0); + // Suppress two more, then trigger a recurring summary at the interval. + log.log_at("Input stream closed", t0 + Duration::from_secs(1)); + log.log_at("Input stream closed", t0 + Duration::from_secs(2)); + let action = log.log_at( + "Input stream closed", + t0 + AUDIO_STREAM_ERROR_LOG_INTERVAL + Duration::from_millis(1), + ); + assert_eq!(action, LogAction::Recurring { repeats: 3 }); + } + + #[test] + fn stream_error_logger_recurring_resets_suppression_window() { + // After a Recurring summary, immediate next call must be suppressed + // (window starts again from the recurring timestamp). + let log = StreamErrorLogger::new("input"); + let t0 = Instant::now(); + log.log_at("Input stream closed", t0); + log.log_at("Input stream closed", t0 + Duration::from_secs(1)); + let recurring_at = t0 + AUDIO_STREAM_ERROR_LOG_INTERVAL + Duration::from_millis(1); + let _ = log.log_at("Input stream closed", recurring_at); + // 1 ms after the recurring summary => suppressed (counter = 1). + let action = log.log_at( + "Input stream closed", + recurring_at + Duration::from_millis(1), + ); + assert_eq!(action, LogAction::Suppressed); + } + + #[test] + fn stream_error_logger_class_change_flushes_prior_repeats() { + let log = StreamErrorLogger::new("input"); + let t0 = Instant::now(); + log.log_at("Input stream closed", t0); + // Suppress two of the same kind. + log.log_at("Input stream closed", t0 + Duration::from_secs(1)); + log.log_at("Input stream closed", t0 + Duration::from_secs(2)); + // Different error class — should flush the suppressed count and + // immediately emit a fresh error. + let action = log.log_at("Output stream xrun", t0 + Duration::from_secs(3)); + assert_eq!( + action, + LogAction::Error { + flushed_repeated: Some(2) + } + ); + } + + #[test] + fn stream_error_logger_class_change_without_prior_suppression_does_not_flush() { + let log = StreamErrorLogger::new("input"); + let t0 = Instant::now(); + log.log_at("Input stream closed", t0); + // Different class with no prior suppression — flushed_repeated must be + // None (no "repeated N times" warn emitted). + let action = log.log_at("Output stream xrun", t0 + Duration::from_secs(1)); + assert_eq!( + action, + LogAction::Error { + flushed_repeated: None + } + ); + } + + #[test] + fn stream_error_logger_classifies_via_classify_stream_error() { + // Two different strings that both classify_stream_error maps to + // "alsa_poll_failure" — second call must be suppressed because the + // class is unchanged even though the error text differs. + let log = StreamErrorLogger::new("input"); + let t0 = Instant::now(); + log.log_at("snd_pcm_poll_descriptors error A", t0); + let action = log.log_at( + "alsa::poll() returned POLLERR thing B", + t0 + Duration::from_secs(1), + ); + assert_eq!(action, LogAction::Suppressed); + } + #[test] fn estimated_total_count_consistent_under_concurrent_records() { use std::thread;