[feat](trx-server): add optional per-decoder log files
Co-authored-by: OpenAI Codex <codex@openai.com> Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
@@ -16,6 +16,7 @@ toml = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
dirs = "6"
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock"] }
|
||||
bytes = "1"
|
||||
cpal = "0.15"
|
||||
opus = "0.3"
|
||||
|
||||
@@ -27,6 +27,7 @@ use trx_wspr::WsprDecoder;
|
||||
|
||||
use crate::config::AudioConfig;
|
||||
use crate::decode;
|
||||
use crate::decode_logs::DecoderLoggers;
|
||||
|
||||
const APRS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
@@ -609,6 +610,7 @@ pub async fn run_aprs_decoder(
|
||||
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
|
||||
mut state_rx: watch::Receiver<RigState>,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
decode_logs: Option<Arc<DecoderLoggers>>,
|
||||
) {
|
||||
info!("APRS decoder started ({}Hz, {} ch)", sample_rate, channels);
|
||||
let mut decoder = decode::aprs::AprsDecoder::new(sample_rate);
|
||||
@@ -662,6 +664,9 @@ pub async fn run_aprs_decoder(
|
||||
was_active = true;
|
||||
for pkt in decoder.process_samples(&mono) {
|
||||
record_aprs_packet(pkt.clone());
|
||||
if let Some(logger) = decode_logs.as_ref() {
|
||||
logger.log_aprs(&pkt);
|
||||
}
|
||||
let _ = decode_tx.send(DecodedMessage::Aprs(pkt));
|
||||
}
|
||||
}
|
||||
@@ -703,6 +708,7 @@ pub async fn run_cw_decoder(
|
||||
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
|
||||
mut state_rx: watch::Receiver<RigState>,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
decode_logs: Option<Arc<DecoderLoggers>>,
|
||||
) {
|
||||
info!("CW decoder started ({}Hz, {} ch)", sample_rate, channels);
|
||||
let mut decoder = decode::cw::CwDecoder::new(sample_rate);
|
||||
@@ -785,6 +791,9 @@ pub async fn run_cw_decoder(
|
||||
|
||||
was_active = true;
|
||||
for evt in decoder.process_samples(&mono) {
|
||||
if let Some(logger) = decode_logs.as_ref() {
|
||||
logger.log_cw(&evt);
|
||||
}
|
||||
let _ = decode_tx.send(DecodedMessage::Cw(evt));
|
||||
}
|
||||
}
|
||||
@@ -872,6 +881,7 @@ pub async fn run_ft8_decoder(
|
||||
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
|
||||
mut state_rx: watch::Receiver<RigState>,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
decode_logs: Option<Arc<DecoderLoggers>>,
|
||||
) {
|
||||
info!("FT8 decoder started ({}Hz, {} ch)", sample_rate, channels);
|
||||
let mut decoder = match Ft8Decoder::new(FT8_SAMPLE_RATE) {
|
||||
@@ -957,6 +967,9 @@ pub async fn run_ft8_decoder(
|
||||
message: res.text,
|
||||
};
|
||||
record_ft8_message(msg.clone());
|
||||
if let Some(logger) = decode_logs.as_ref() {
|
||||
logger.log_ft8(&msg);
|
||||
}
|
||||
let _ = decode_tx.send(DecodedMessage::Ft8(msg));
|
||||
}
|
||||
}
|
||||
@@ -1004,6 +1017,7 @@ pub async fn run_wspr_decoder(
|
||||
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
|
||||
mut state_rx: watch::Receiver<RigState>,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
decode_logs: Option<Arc<DecoderLoggers>>,
|
||||
) {
|
||||
info!("WSPR decoder started ({}Hz, {} ch)", sample_rate, channels);
|
||||
let decoder = match WsprDecoder::new() {
|
||||
@@ -1069,6 +1083,9 @@ pub async fn run_wspr_decoder(
|
||||
message: res.message,
|
||||
};
|
||||
record_wspr_message(msg.clone());
|
||||
if let Some(logger) = decode_logs.as_ref() {
|
||||
logger.log_wspr(&msg);
|
||||
}
|
||||
let _ = decode_tx.send(DecodedMessage::Wspr(msg));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,8 @@ pub struct ServerConfig {
|
||||
pub audio: AudioConfig,
|
||||
/// PSK Reporter uplink configuration
|
||||
pub pskreporter: PskReporterConfig,
|
||||
/// Decoder file logging configuration
|
||||
pub decode_logs: DecodeLogsConfig,
|
||||
}
|
||||
|
||||
/// General application settings.
|
||||
@@ -231,6 +233,48 @@ impl Default for PskReporterConfig {
|
||||
}
|
||||
}
|
||||
|
||||
fn default_decode_logs_dir() -> String {
|
||||
if let Some(data_dir) = dirs::data_dir() {
|
||||
return data_dir
|
||||
.join("trx-rs")
|
||||
.join("decoders")
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
}
|
||||
"logs/decoders".to_string()
|
||||
}
|
||||
|
||||
/// Server-side decoder file logging configuration.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct DecodeLogsConfig {
|
||||
/// Whether decoder file logging is enabled
|
||||
pub enabled: bool,
|
||||
/// Base directory for log files
|
||||
pub dir: String,
|
||||
/// APRS decoder log filename
|
||||
pub aprs_file: String,
|
||||
/// CW decoder log filename
|
||||
pub cw_file: String,
|
||||
/// FT8 decoder log filename
|
||||
pub ft8_file: String,
|
||||
/// WSPR decoder log filename
|
||||
pub wspr_file: String,
|
||||
}
|
||||
|
||||
impl Default for DecodeLogsConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enabled: false,
|
||||
dir: default_decode_logs_dir(),
|
||||
aprs_file: "TRXRS-APRS-%YYYY%-%MM%-%DD%.log".to_string(),
|
||||
cw_file: "TRXRS-CW-%YYYY%-%MM%-%DD%.log".to_string(),
|
||||
ft8_file: "TRXRS-FT8-%YYYY%-%MM%-%DD%.log".to_string(),
|
||||
wspr_file: "TRXRS-WSPR-%YYYY%-%MM%-%DD%.log".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ServerConfig {
|
||||
pub fn validate(&self) -> Result<(), String> {
|
||||
validate_log_level(self.general.log_level.as_deref())?;
|
||||
@@ -307,6 +351,21 @@ impl ServerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
if self.decode_logs.enabled {
|
||||
if self.decode_logs.dir.trim().is_empty() {
|
||||
return Err("[decode_logs].dir must not be empty when enabled".to_string());
|
||||
}
|
||||
if self.decode_logs.aprs_file.trim().is_empty()
|
||||
|| self.decode_logs.cw_file.trim().is_empty()
|
||||
|| self.decode_logs.ft8_file.trim().is_empty()
|
||||
|| self.decode_logs.wspr_file.trim().is_empty()
|
||||
{
|
||||
return Err(
|
||||
"[decode_logs] file names must not be empty when enabled".to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -346,6 +405,7 @@ impl ServerConfig {
|
||||
listen: ListenConfig::default(),
|
||||
audio: AudioConfig::default(),
|
||||
pskreporter: PskReporterConfig::default(),
|
||||
decode_logs: DecodeLogsConfig::default(),
|
||||
};
|
||||
|
||||
toml::to_string_pretty(&example).unwrap_or_default()
|
||||
@@ -478,6 +538,11 @@ mod tests {
|
||||
assert_eq!(config.audio.sample_rate, 48000);
|
||||
assert!(!config.pskreporter.enabled);
|
||||
assert_eq!(config.pskreporter.port, 4739);
|
||||
assert!(!config.decode_logs.enabled);
|
||||
assert!(
|
||||
std::path::Path::new(&config.decode_logs.dir)
|
||||
.ends_with(std::path::Path::new("decoders"))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -0,0 +1,151 @@
|
||||
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
|
||||
//
|
||||
// SPDX-License-Identifier: BSD-2-Clause
|
||||
|
||||
use std::fs::{create_dir_all, File, OpenOptions};
|
||||
use std::io::{BufWriter, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use chrono::Utc;
|
||||
use serde_json::json;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::config::DecodeLogsConfig;
|
||||
use trx_core::decode::{AprsPacket, CwEvent, Ft8Message, WsprMessage};
|
||||
|
||||
struct DecoderFileLogger {
|
||||
base_dir: PathBuf,
|
||||
file_template: String,
|
||||
state: Mutex<DecoderFileState>,
|
||||
label: &'static str,
|
||||
}
|
||||
|
||||
struct DecoderFileState {
|
||||
current_file_name: String,
|
||||
writer: BufWriter<File>,
|
||||
}
|
||||
|
||||
impl DecoderFileLogger {
|
||||
fn resolve_file_name(template: &str) -> String {
|
||||
let now = Utc::now();
|
||||
template
|
||||
.replace("%YYYY%", &now.format("%Y").to_string())
|
||||
.replace("%MM%", &now.format("%m").to_string())
|
||||
.replace("%DD%", &now.format("%d").to_string())
|
||||
}
|
||||
|
||||
fn open_writer(path: &Path, label: &'static str) -> Result<BufWriter<File>, String> {
|
||||
if let Some(parent) = path.parent() {
|
||||
create_dir_all(parent)
|
||||
.map_err(|e| format!("create {} log dir '{}': {}", label, parent.display(), e))?;
|
||||
}
|
||||
let file = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(path)
|
||||
.map_err(|e| format!("open {} log '{}': {}", label, path.display(), e))?;
|
||||
Ok(BufWriter::new(file))
|
||||
}
|
||||
|
||||
fn open(base_dir: &Path, template: &str, label: &'static str) -> Result<Self, String> {
|
||||
let file_name = Self::resolve_file_name(template);
|
||||
let path = base_dir.join(&file_name);
|
||||
let writer = Self::open_writer(&path, label)?;
|
||||
Ok(Self {
|
||||
base_dir: base_dir.to_path_buf(),
|
||||
file_template: template.to_string(),
|
||||
state: Mutex::new(DecoderFileState {
|
||||
current_file_name: file_name,
|
||||
writer,
|
||||
}),
|
||||
label,
|
||||
})
|
||||
}
|
||||
|
||||
fn write_payload<T: serde::Serialize>(&self, payload: &T) {
|
||||
let ts_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
|
||||
Ok(d) => d.as_millis() as u64,
|
||||
Err(_) => 0,
|
||||
};
|
||||
let line = json!({
|
||||
"ts_ms": ts_ms,
|
||||
"decoder": self.label,
|
||||
"payload": payload,
|
||||
});
|
||||
let Ok(mut state) = self.state.lock() else {
|
||||
warn!("decode log mutex poisoned for {}", self.label);
|
||||
return;
|
||||
};
|
||||
|
||||
let next_file_name = Self::resolve_file_name(&self.file_template);
|
||||
if next_file_name != state.current_file_name {
|
||||
let next_path = self.base_dir.join(&next_file_name);
|
||||
match Self::open_writer(&next_path, self.label) {
|
||||
Ok(next_writer) => {
|
||||
state.current_file_name = next_file_name;
|
||||
state.writer = next_writer;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("decode log reopen failed for {}: {}", self.label, e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if serde_json::to_writer(&mut state.writer, &line).is_err() {
|
||||
warn!("decode log serialization failed for {}", self.label);
|
||||
return;
|
||||
}
|
||||
if state.writer.write_all(b"\n").is_err() {
|
||||
warn!("decode log write failed for {}", self.label);
|
||||
return;
|
||||
}
|
||||
let _ = state.writer.flush();
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DecoderLoggers {
|
||||
aprs: DecoderFileLogger,
|
||||
cw: DecoderFileLogger,
|
||||
ft8: DecoderFileLogger,
|
||||
wspr: DecoderFileLogger,
|
||||
}
|
||||
|
||||
impl DecoderLoggers {
|
||||
pub fn from_config(cfg: &DecodeLogsConfig) -> Result<Option<Arc<Self>>, String> {
|
||||
if !cfg.enabled {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let base_dir = PathBuf::from(cfg.dir.trim());
|
||||
create_dir_all(&base_dir)
|
||||
.map_err(|e| format!("create decode log dir '{}': {}", base_dir.display(), e))?;
|
||||
|
||||
let loggers = Self {
|
||||
aprs: DecoderFileLogger::open(&base_dir, &cfg.aprs_file, "aprs")?,
|
||||
cw: DecoderFileLogger::open(&base_dir, &cfg.cw_file, "cw")?,
|
||||
ft8: DecoderFileLogger::open(&base_dir, &cfg.ft8_file, "ft8")?,
|
||||
wspr: DecoderFileLogger::open(&base_dir, &cfg.wspr_file, "wspr")?,
|
||||
};
|
||||
|
||||
Ok(Some(Arc::new(loggers)))
|
||||
}
|
||||
|
||||
pub fn log_aprs(&self, pkt: &AprsPacket) {
|
||||
self.aprs.write_payload(pkt);
|
||||
}
|
||||
|
||||
pub fn log_cw(&self, evt: &CwEvent) {
|
||||
self.cw.write_payload(evt);
|
||||
}
|
||||
|
||||
pub fn log_ft8(&self, msg: &Ft8Message) {
|
||||
self.ft8.write_payload(msg);
|
||||
}
|
||||
|
||||
pub fn log_wspr(&self, msg: &WsprMessage) {
|
||||
self.wspr.write_payload(msg);
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@
|
||||
mod audio;
|
||||
mod config;
|
||||
mod decode;
|
||||
mod decode_logs;
|
||||
mod error;
|
||||
mod listener;
|
||||
mod pskreporter;
|
||||
@@ -33,6 +34,7 @@ use trx_core::rig::state::RigState;
|
||||
use trx_core::DynResult;
|
||||
|
||||
use config::ServerConfig;
|
||||
use decode_logs::DecoderLoggers;
|
||||
|
||||
const PKG_DESCRIPTION: &str = concat!(env!("CARGO_PKG_NAME"), " - rig server daemon");
|
||||
const RIG_TASK_CHANNEL_BUFFER: usize = 32;
|
||||
@@ -402,6 +404,14 @@ async fn main() -> DynResult<()> {
|
||||
}
|
||||
}
|
||||
|
||||
let decoder_logs = match DecoderLoggers::from_config(&cfg.decode_logs) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!("Decoder file logging disabled: {}", e);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
if cfg.audio.rx_enabled {
|
||||
let _capture_thread =
|
||||
audio::spawn_audio_capture(&cfg.audio, rx_audio_tx.clone(), Some(pcm_tx.clone()));
|
||||
@@ -413,9 +423,10 @@ async fn main() -> DynResult<()> {
|
||||
let aprs_sr = cfg.audio.sample_rate;
|
||||
let aprs_ch = cfg.audio.channels;
|
||||
let aprs_shutdown_rx = shutdown_rx.clone();
|
||||
let aprs_logs = decoder_logs.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_aprs_decoder(aprs_sr, aprs_ch as u16, aprs_pcm_rx, aprs_state_rx, aprs_decode_tx) => {}
|
||||
_ = audio::run_aprs_decoder(aprs_sr, aprs_ch as u16, aprs_pcm_rx, aprs_state_rx, aprs_decode_tx, aprs_logs) => {}
|
||||
_ = wait_for_shutdown(aprs_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
@@ -427,9 +438,10 @@ async fn main() -> DynResult<()> {
|
||||
let cw_sr = cfg.audio.sample_rate;
|
||||
let cw_ch = cfg.audio.channels;
|
||||
let cw_shutdown_rx = shutdown_rx.clone();
|
||||
let cw_logs = decoder_logs.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_cw_decoder(cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx) => {}
|
||||
_ = audio::run_cw_decoder(cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx, cw_logs) => {}
|
||||
_ = wait_for_shutdown(cw_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
@@ -441,9 +453,10 @@ async fn main() -> DynResult<()> {
|
||||
let ft8_sr = cfg.audio.sample_rate;
|
||||
let ft8_ch = cfg.audio.channels;
|
||||
let ft8_shutdown_rx = shutdown_rx.clone();
|
||||
let ft8_logs = decoder_logs.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_ft8_decoder(ft8_sr, ft8_ch as u16, ft8_pcm_rx, ft8_state_rx, ft8_decode_tx) => {}
|
||||
_ = audio::run_ft8_decoder(ft8_sr, ft8_ch as u16, ft8_pcm_rx, ft8_state_rx, ft8_decode_tx, ft8_logs) => {}
|
||||
_ = wait_for_shutdown(ft8_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
@@ -455,9 +468,10 @@ async fn main() -> DynResult<()> {
|
||||
let wspr_sr = cfg.audio.sample_rate;
|
||||
let wspr_ch = cfg.audio.channels;
|
||||
let wspr_shutdown_rx = shutdown_rx.clone();
|
||||
let wspr_logs = decoder_logs.clone();
|
||||
task_handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_wspr_decoder(wspr_sr, wspr_ch as u16, wspr_pcm_rx, wspr_state_rx, wspr_decode_tx) => {}
|
||||
_ = audio::run_wspr_decoder(wspr_sr, wspr_ch as u16, wspr_pcm_rx, wspr_state_rx, wspr_decode_tx, wspr_logs) => {}
|
||||
_ = wait_for_shutdown(wspr_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
|
||||
Reference in New Issue
Block a user