[test](trx-server): add Tier 3 unit tests for StreamErrorLogger

Split StreamErrorLogger::log into a thin tracing wrapper and a testable log_at(err, now) -> LogAction core. The LogAction enum surfaces what was emitted (Error / Suppressed / Recurring) and whether a prior class transition flushed a repeated-count summary, so tests can assert behaviour without reaching for a tracing subscriber.

Tests cover: first call emits Error; same class within the interval is Suppressed; same class past the interval emits Recurring with the accumulated count; the recurring summary restarts the suppression window; class transitions flush prior repeats only when there were any; and classify_stream_error keying really collapses different ALSA strings into one class.

7 new tests; trx-server suite now reports 94 passed (was 87).

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-05-03 20:10:15 +02:00
parent c0296fe257
commit ee7da5704f
+141 -6
View File
@@ -147,6 +147,21 @@ struct StreamErrorState {
suppressed: u64, suppressed: u64,
} }
/// Outcome of a `StreamErrorLogger::log_at` call. Tests assert on this; the
/// production path emits the corresponding tracing record(s) as a side effect.
#[derive(Debug, PartialEq, Eq)]
enum LogAction {
/// First sighting of this error class, or class transitioned. If the
/// previous class had been suppressed, `flushed_repeated` carries the
/// suppressed count emitted as a warn before the new error.
Error { flushed_repeated: Option<u64> },
/// Same class as the prior call, still inside the suppression window.
Suppressed,
/// Same class, but the suppression interval has elapsed — periodic
/// summary emitted with `repeats` accumulated since the last log.
Recurring { repeats: u64 },
}
impl StreamErrorLogger { impl StreamErrorLogger {
fn new(label: &'static str) -> Self { fn new(label: &'static str) -> Self {
Self { Self {
@@ -156,26 +171,35 @@ impl StreamErrorLogger {
} }
fn log(&self, err: &str) { fn log(&self, err: &str) {
let now = Instant::now(); let _ = self.log_at(err, Instant::now());
}
fn log_at(&self, err: &str, now: Instant) -> LogAction {
let kind = classify_stream_error(err); let kind = classify_stream_error(err);
let mut state = lock_or_recover(&self.state, self.label); let mut state = lock_or_recover(&self.state, self.label);
// First occurrence or changed error class: log as error once. // First occurrence or changed error class: log as error once.
if state.last_kind != Some(kind) { if state.last_kind != Some(kind) {
if state.suppressed > 0 { let flushed = if state.suppressed > 0 {
let n = state.suppressed;
warn!( warn!(
"{} repeated {} times: {}", "{} repeated {} times: {}",
self.label, self.label,
state.suppressed, n,
state.last_error.as_deref().unwrap_or("<unknown>") state.last_error.as_deref().unwrap_or("<unknown>")
); );
} Some(n)
} else {
None
};
error!("{}: {}", self.label, err); error!("{}: {}", self.label, err);
state.last_kind = Some(kind); state.last_kind = Some(kind);
state.last_error = Some(err.to_string()); state.last_error = Some(err.to_string());
state.last_logged_at = Some(now); state.last_logged_at = Some(now);
state.suppressed = 0; state.suppressed = 0;
return; return LogAction::Error {
flushed_repeated: flushed,
};
} }
// Same class: suppress aggressively and emit only periodic summaries. // Same class: suppress aggressively and emit only periodic summaries.
@@ -185,17 +209,20 @@ impl StreamErrorLogger {
.map(|ts| now.duration_since(ts) >= AUDIO_STREAM_ERROR_LOG_INTERVAL) .map(|ts| now.duration_since(ts) >= AUDIO_STREAM_ERROR_LOG_INTERVAL)
.unwrap_or(false); .unwrap_or(false);
if due { if due {
let repeats = state.suppressed;
warn!( warn!(
"{} recurring ({} repeats/{}s): {}", "{} recurring ({} repeats/{}s): {}",
self.label, self.label,
state.suppressed, repeats,
AUDIO_STREAM_ERROR_LOG_INTERVAL.as_secs(), AUDIO_STREAM_ERROR_LOG_INTERVAL.as_secs(),
state.last_error.as_deref().unwrap_or("<unknown>") state.last_error.as_deref().unwrap_or("<unknown>")
); );
state.last_logged_at = Some(now); state.last_logged_at = Some(now);
state.suppressed = 0; state.suppressed = 0;
LogAction::Recurring { repeats }
} else { } else {
state.last_error = Some(err.to_string()); state.last_error = Some(err.to_string());
LogAction::Suppressed
} }
} }
} }
@@ -4514,6 +4541,114 @@ mod tests {
assert_eq!(h.estimated_total_count(), 4); assert_eq!(h.estimated_total_count(), 4);
} }
// -------------------------------------------------------------------
// StreamErrorLogger
// -------------------------------------------------------------------
#[test]
fn stream_error_logger_first_call_emits_error_with_no_flush() {
let log = StreamErrorLogger::new("input");
let action = log.log_at("Input stream closed", Instant::now());
assert_eq!(
action,
LogAction::Error {
flushed_repeated: None
}
);
}
#[test]
fn stream_error_logger_repeated_within_interval_is_suppressed() {
let log = StreamErrorLogger::new("input");
let t0 = Instant::now();
log.log_at("Input stream closed", t0);
let action = log.log_at("Input stream closed", t0 + Duration::from_secs(1));
assert_eq!(action, LogAction::Suppressed);
}
#[test]
fn stream_error_logger_emits_recurring_after_interval() {
let log = StreamErrorLogger::new("input");
let t0 = Instant::now();
log.log_at("Input stream closed", t0);
// Suppress two more, then trigger a recurring summary at the interval.
log.log_at("Input stream closed", t0 + Duration::from_secs(1));
log.log_at("Input stream closed", t0 + Duration::from_secs(2));
let action = log.log_at(
"Input stream closed",
t0 + AUDIO_STREAM_ERROR_LOG_INTERVAL + Duration::from_millis(1),
);
assert_eq!(action, LogAction::Recurring { repeats: 3 });
}
#[test]
fn stream_error_logger_recurring_resets_suppression_window() {
// After a Recurring summary, immediate next call must be suppressed
// (window starts again from the recurring timestamp).
let log = StreamErrorLogger::new("input");
let t0 = Instant::now();
log.log_at("Input stream closed", t0);
log.log_at("Input stream closed", t0 + Duration::from_secs(1));
let recurring_at = t0 + AUDIO_STREAM_ERROR_LOG_INTERVAL + Duration::from_millis(1);
let _ = log.log_at("Input stream closed", recurring_at);
// 1 ms after the recurring summary => suppressed (counter = 1).
let action = log.log_at(
"Input stream closed",
recurring_at + Duration::from_millis(1),
);
assert_eq!(action, LogAction::Suppressed);
}
#[test]
fn stream_error_logger_class_change_flushes_prior_repeats() {
let log = StreamErrorLogger::new("input");
let t0 = Instant::now();
log.log_at("Input stream closed", t0);
// Suppress two of the same kind.
log.log_at("Input stream closed", t0 + Duration::from_secs(1));
log.log_at("Input stream closed", t0 + Duration::from_secs(2));
// Different error class — should flush the suppressed count and
// immediately emit a fresh error.
let action = log.log_at("Output stream xrun", t0 + Duration::from_secs(3));
assert_eq!(
action,
LogAction::Error {
flushed_repeated: Some(2)
}
);
}
#[test]
fn stream_error_logger_class_change_without_prior_suppression_does_not_flush() {
let log = StreamErrorLogger::new("input");
let t0 = Instant::now();
log.log_at("Input stream closed", t0);
// Different class with no prior suppression — flushed_repeated must be
// None (no "repeated N times" warn emitted).
let action = log.log_at("Output stream xrun", t0 + Duration::from_secs(1));
assert_eq!(
action,
LogAction::Error {
flushed_repeated: None
}
);
}
#[test]
fn stream_error_logger_classifies_via_classify_stream_error() {
// Two different strings that both classify_stream_error maps to
// "alsa_poll_failure" — second call must be suppressed because the
// class is unchanged even though the error text differs.
let log = StreamErrorLogger::new("input");
let t0 = Instant::now();
log.log_at("snd_pcm_poll_descriptors error A", t0);
let action = log.log_at(
"alsa::poll() returned POLLERR thing B",
t0 + Duration::from_secs(1),
);
assert_eq!(action, LogAction::Suppressed);
}
#[test] #[test]
fn estimated_total_count_consistent_under_concurrent_records() { fn estimated_total_count_consistent_under_concurrent_records() {
use std::thread; use std::thread;