[feat](trx-rs): add client-side Opus audio recorder

Record Opus audio streams to OGG files on the client. Includes manual start/stop via HTTP API, scheduler-driven auto-recording per schedule entry, and a header REC button in the web UI.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-30 23:37:09 +02:00
parent 2296a53916
commit f2048c583c
15 changed files with 1016 additions and 5 deletions
@@ -253,7 +253,8 @@ function applyAuthRestrictions() {
"settings-clear-ft4-history",
"settings-clear-ft2-history",
"settings-clear-wspr-history",
"settings-clear-sat-history"
"settings-clear-sat-history",
"header-rec-btn"
];
pluginToggleBtns.forEach(id => {
const btn = document.getElementById(id);
@@ -3305,6 +3306,10 @@ function render(update) {
for (const [key, entry] of Object.entries(_decoderToggles)) {
syncDecoderToggle(entry, !!update[key], entry.label);
}
// Recorder state sync.
if (typeof update.recorder_enabled === "boolean" && window._syncRecorderState) {
window._syncRecorderState(update.recorder_enabled);
}
if (window.updateSatLiveState) window.updateSatLiveState(update);
const cwAutoEl = document.getElementById("cw-auto");
const cwWpmEl = document.getElementById("cw-wpm");
@@ -8852,6 +8857,31 @@ if (headerAudioToggle) {
headerAudioToggle.addEventListener("click", startRxAudio);
}
// ── Recorder button ────────────────────────────────────────────────────────
const headerRecBtn = document.getElementById("header-rec-btn");
let recorderActive = false;
function syncRecorderBtn() {
if (!headerRecBtn) return;
headerRecBtn.classList.toggle("rec-active", recorderActive);
}
if (headerRecBtn) {
headerRecBtn.addEventListener("click", async () => {
try {
if (recorderActive) {
await postPath("/api/recorder/stop");
} else {
await postPath("/api/recorder/start");
}
} catch (e) {
console.error("Recorder toggle failed", e);
}
});
}
window._syncRecorderState = function (enabled) {
recorderActive = enabled;
syncRecorderBtn();
};
const rxVolPct = document.getElementById("rx-vol-pct");
const txVolPct = document.getElementById("tx-vol-pct");
@@ -65,6 +65,7 @@
<button id="header-audio-toggle" class="header-bar-btn header-audio-btn" aria-label="Toggle audio playback" title="Toggle audio playback">
<svg viewBox="0 0 16 16" fill="currentColor" aria-hidden="true"><path d="M5 3v10l8-5z"/></svg>
</button>
<button id="header-rec-btn" class="header-bar-btn header-rec-btn" type="button" aria-label="Toggle recording" title="Toggle recording">REC</button>
<div class="header-rig-switch">
<select id="header-rig-switch-select" aria-label="Select active rig"></select>
</div>
@@ -1152,6 +1153,10 @@
<label class="bm-label">Interleave (min, optional)
<input type="number" id="scheduler-ts-entry-interleave" class="status-input" min="1" max="60" placeholder="default" />
</label>
<label class="bm-label" style="flex-direction:row;align-items:center;gap:0.5rem;">
<input type="checkbox" id="scheduler-ts-entry-record" />
Record audio
</label>
</div>
<div class="bm-form-actions">
<button type="submit" class="bm-save-btn">Save</button>
@@ -1163,7 +1168,7 @@
<summary>Entry details</summary>
<table class="sch-ts-table">
<thead>
<tr><th>Start</th><th>End</th><th>Center freq</th><th>Primary bookmark</th><th>Extra channels</th><th>Label</th><th>Interleave (min)</th><th></th></tr>
<tr><th>Start</th><th>End</th><th>Center freq</th><th>Primary bookmark</th><th>Extra channels</th><th>Label</th><th>Interleave (min)</th><th>REC</th><th></th></tr>
</thead>
<tbody id="scheduler-ts-tbody"></tbody>
</table>
@@ -502,6 +502,9 @@
if (ilEl) ilEl.value = entry && entry.interleave_min ? entry.interleave_min : "";
if (centerHzEl) centerHzEl.value = entry && entry.center_hz ? entry.center_hz : "";
const recordEl = document.getElementById("scheduler-ts-entry-record");
if (recordEl) recordEl.checked = !!(entry && entry.record);
pendingExtraBmIds = entry && Array.isArray(entry.bookmark_ids) ? entry.bookmark_ids.slice() : [];
renderExtraBmList();
@@ -550,6 +553,9 @@
}
if (!currentConfig.entries) currentConfig.entries = [];
const recordCb = document.getElementById("scheduler-ts-entry-record");
const entryRecord = recordCb ? recordCb.checked : false;
const entryData = {
start_min: startMin,
end_min: endMin,
@@ -558,6 +564,7 @@
interleave_min: entryInterleave,
center_hz: centerHz,
bookmark_ids: extraBmIds,
record: entryRecord,
};
if (schEntryEditIdx !== null) {
@@ -696,6 +703,7 @@
'<td>' + extraCell + '</td>' +
'<td>' + escHtml(entry.label || "") + '</td>' +
'<td>' + il + '</td>' +
'<td>' + (entry.record ? 'Yes' : '') + '</td>' +
'<td>' +
'<button class="sch-write sch-edit-btn" data-idx="' + idx + '" type="button">Edit</button>' +
'<button class="sch-write sch-remove-btn" data-idx="' + idx + '" type="button">Remove</button>' +
@@ -1142,6 +1142,31 @@ small { color: var(--text-muted); }
color: #00d17f;
border-color: #00d17f;
}
.header-bar-btn.header-rec-btn {
height: 2rem;
min-height: 0;
padding: 0 0.5rem;
font-size: 0.7rem;
font-weight: 700;
letter-spacing: 0.04em;
border: 1px solid var(--border-light);
border-radius: 6px;
background: var(--input-bg);
color: var(--text-muted);
cursor: pointer;
flex-shrink: 0;
transition: color 0.15s, border-color 0.15s, background 0.15s;
}
.header-rec-btn.rec-active {
color: #ff3b30;
border-color: #ff3b30;
background: rgba(255, 59, 48, 0.12);
animation: rec-pulse 1.5s ease-in-out infinite;
}
@keyframes rec-pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.6; }
}
.header-rig-switch {
display: flex;
align-items: center;
@@ -7,6 +7,7 @@
mod assets;
mod bookmarks;
mod decoder;
pub mod recorder;
mod rig;
mod sse;
mod vchan;
@@ -606,6 +607,11 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(crate::server::background_decode::delete_background_decode)
.service(crate::server::background_decode::get_background_decode_status)
.service(crate::server::audio::audio_ws)
// Recorder
.service(recorder::recorder_start)
.service(recorder::recorder_stop)
.service(recorder::recorder_status)
.service(recorder::recorder_files)
// Static assets
.service(assets::index)
.service(assets::map_index)
@@ -0,0 +1,206 @@
// SPDX-FileCopyrightText: 2026 Stan Grams <sjg@haxx.space>
//
// SPDX-License-Identifier: BSD-2-Clause
//! HTTP API endpoints for audio recording.
use std::sync::Arc;
use actix_web::{get, post, web, Error, HttpResponse};
use tokio::sync::{mpsc, watch};
use trx_core::{RigCommand, RigState};
use trx_frontend::FrontendRuntimeContext;
use super::send_command;
use crate::server::recorder::RecorderManager;
// ============================================================================
// Query types
// ============================================================================
#[derive(serde::Deserialize)]
pub struct RecorderStartQuery {
pub remote: Option<String>,
pub vchan_id: Option<String>,
}
#[derive(serde::Deserialize)]
pub struct RecorderStopQuery {
pub remote: Option<String>,
pub vchan_id: Option<String>,
}
// ============================================================================
// Endpoints
// ============================================================================
/// Start recording audio for the active rig (or a specific vchan).
#[post("/api/recorder/start")]
pub async fn recorder_start(
query: web::Query<RecorderStartQuery>,
context: web::Data<Arc<FrontendRuntimeContext>>,
recorder_mgr: web::Data<Arc<RecorderManager>>,
state: web::Data<watch::Receiver<RigState>>,
rig_tx: web::Data<mpsc::Sender<trx_core::RigRequest>>,
) -> Result<HttpResponse, Error> {
let rig_id = resolve_rig_id(&context, query.remote.as_deref());
let vchan_id = query.vchan_id.as_deref();
// Resolve the audio broadcast sender for this rig/vchan.
let (audio_tx, sample_rate, channels, frame_duration_ms) =
resolve_audio_source(&context, &rig_id, vchan_id)?;
let current_state = state.get_ref().borrow().clone();
let freq_hz = Some(current_state.status.freq.hz);
let mode = Some(trx_protocol::mode_to_string(&current_state.status.mode).into_owned());
let params = crate::server::recorder::AudioParams {
sample_rate,
channels,
frame_duration_ms,
};
match recorder_mgr.start(
&rig_id,
vchan_id,
audio_tx,
params,
freq_hz,
mode.as_deref(),
) {
Ok(info) => {
// Sync recorder_enabled state to the rig.
let _ = send_command(
&rig_tx,
RigCommand::SetRecorderEnabled(true),
query.remote.clone(),
)
.await;
Ok(HttpResponse::Ok().json(info))
}
Err(e) => Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e }))),
}
}
/// Stop recording.
#[post("/api/recorder/stop")]
pub async fn recorder_stop(
query: web::Query<RecorderStopQuery>,
context: web::Data<Arc<FrontendRuntimeContext>>,
recorder_mgr: web::Data<Arc<RecorderManager>>,
rig_tx: web::Data<mpsc::Sender<trx_core::RigRequest>>,
) -> Result<HttpResponse, Error> {
let rig_id = resolve_rig_id(&context, query.remote.as_deref());
let vchan_id = query.vchan_id.as_deref();
match recorder_mgr.stop(&rig_id, vchan_id).await {
Ok(result) => {
// Check if any recordings remain active for this rig.
let still_recording = recorder_mgr
.list_active()
.iter()
.any(|r| r.rig_id == rig_id);
if !still_recording {
let _ = send_command(
&rig_tx,
RigCommand::SetRecorderEnabled(false),
query.remote.clone(),
)
.await;
}
Ok(HttpResponse::Ok().json(result))
}
Err(e) => Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e }))),
}
}
/// Get the status of active recordings.
#[get("/api/recorder/status")]
pub async fn recorder_status(
recorder_mgr: web::Data<Arc<RecorderManager>>,
) -> Result<HttpResponse, Error> {
let active = recorder_mgr.list_active();
Ok(HttpResponse::Ok().json(active))
}
/// List recorded files in the output directory.
#[get("/api/recorder/files")]
pub async fn recorder_files(
recorder_mgr: web::Data<Arc<RecorderManager>>,
) -> Result<HttpResponse, Error> {
let files = recorder_mgr.list_files();
Ok(HttpResponse::Ok().json(files))
}
// ============================================================================
// Helpers
// ============================================================================
fn resolve_rig_id(context: &FrontendRuntimeContext, remote: Option<&str>) -> String {
if let Some(r) = remote {
return r.to_string();
}
context
.routing
.active_rig_id
.lock()
.ok()
.and_then(|v| v.clone())
.unwrap_or_else(|| "default".to_string())
}
fn resolve_audio_source(
context: &FrontendRuntimeContext,
rig_id: &str,
vchan_id: Option<&str>,
) -> Result<(tokio::sync::broadcast::Sender<bytes::Bytes>, u32, u8, u16), Error> {
if let Some(vchan_uuid_str) = vchan_id {
// Virtual channel audio.
let uuid: uuid::Uuid = vchan_uuid_str
.parse()
.map_err(|_| actix_web::error::ErrorBadRequest("invalid vchan_id UUID"))?;
let audio = context
.vchan
.audio
.read()
.unwrap_or_else(|e| e.into_inner());
let tx = audio
.get(&uuid)
.cloned()
.ok_or_else(|| actix_web::error::ErrorNotFound("vchan audio not found"))?;
// Virtual channels use the same stream info as the main rig.
let (sr, ch, fd) = stream_info_for_rig(context, rig_id);
Ok((tx, sr, ch, fd))
} else {
// Main rig audio — try per-rig first, then default.
let tx = context
.rig_audio
.rx
.read()
.ok()
.and_then(|map| map.get(rig_id).cloned())
.or_else(|| context.audio.rx.clone())
.ok_or_else(|| actix_web::error::ErrorNotFound("no audio source for rig"))?;
let (sr, ch, fd) = stream_info_for_rig(context, rig_id);
Ok((tx, sr, ch, fd))
}
}
fn stream_info_for_rig(context: &FrontendRuntimeContext, rig_id: &str) -> (u32, u8, u16) {
// Try per-rig stream info first.
if let Some(rx) = context.rig_audio_info_rx(rig_id) {
if let Some(info) = rx.borrow().as_ref() {
return (info.sample_rate, info.channels, info.frame_duration_ms);
}
}
// Fall back to the default audio info.
if let Some(ref info_rx) = context.audio.info {
if let Some(info) = info_rx.borrow().as_ref() {
return (info.sample_rate, info.channels, info.frame_duration_ms);
}
}
// Absolute fallback.
(48000, 2, 20)
}
@@ -0,0 +1,560 @@
// SPDX-FileCopyrightText: 2026 Stan Grams <sjg@haxx.space>
//
// SPDX-License-Identifier: BSD-2-Clause
//! Audio recorder — writes incoming Opus packets to OGG/Opus files.
//!
//! The recorder subscribes to the same `broadcast::Sender<Bytes>` channels
//! that feed the WebSocket audio endpoint, capturing pre-encoded Opus packets
//! without any re-encoding.
use std::collections::HashMap;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, watch};
use tracing::{error, info, warn};
// ============================================================================
// Configuration
// ============================================================================
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RecorderConfig {
/// Whether the recorder feature is available.
pub enabled: bool,
/// Directory for recorded files. Default: `$XDG_DATA_HOME/trx-rs/recordings/`.
pub output_dir: Option<String>,
/// Maximum duration of a single recording in seconds. None = unlimited.
pub max_duration_secs: Option<u64>,
}
impl Default for RecorderConfig {
fn default() -> Self {
Self {
enabled: true,
output_dir: None,
max_duration_secs: None,
}
}
}
impl RecorderConfig {
pub fn resolve_output_dir(&self) -> PathBuf {
if let Some(ref dir) = self.output_dir {
PathBuf::from(dir)
} else {
dirs::data_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join("trx-rs")
.join("recordings")
}
}
}
// ============================================================================
// Recording metadata
// ============================================================================
#[derive(Debug, Clone, Serialize)]
pub struct RecordingInfo {
pub key: String,
pub rig_id: String,
pub vchan_id: Option<String>,
pub path: String,
pub started_at: i64,
pub sample_rate: u32,
pub channels: u8,
}
#[derive(Debug, Clone, Serialize)]
pub struct RecordingResult {
pub key: String,
pub path: String,
pub duration_secs: f64,
pub bytes_written: u64,
}
/// Audio stream parameters for a recording.
#[derive(Debug, Clone, Copy)]
pub struct AudioParams {
pub sample_rate: u32,
pub channels: u8,
pub frame_duration_ms: u16,
}
// ============================================================================
// OGG/Opus writer
// ============================================================================
/// Minimal OGG/Opus file writer.
///
/// Writes the mandatory OpusHead and OpusTags pages, then wraps each incoming
/// Opus packet in its own OGG page. This produces a valid, seekable OGG Opus
/// stream without pulling in an external OGG crate.
struct OggOpusWriter {
file: std::fs::File,
serial: u32,
page_seq: u32,
granule_pos: u64,
samples_per_frame: u64,
bytes_written: u64,
}
impl OggOpusWriter {
fn create(
path: &Path,
sample_rate: u32,
channels: u8,
frame_duration_ms: u16,
) -> std::io::Result<Self> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let file = std::fs::File::create(path)?;
let serial = {
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
(ts & 0xFFFF_FFFF) as u32
};
let samples_per_frame = (sample_rate as u64) * (frame_duration_ms as u64) / 1000;
let mut writer = Self {
file,
serial,
page_seq: 0,
granule_pos: 0,
samples_per_frame,
bytes_written: 0,
};
writer.write_opus_head(sample_rate, channels)?;
writer.write_opus_tags()?;
Ok(writer)
}
/// Write the OpusHead identification header (OGG page, BOS).
fn write_opus_head(&mut self, sample_rate: u32, channels: u8) -> std::io::Result<()> {
let mut head = Vec::with_capacity(19);
head.extend_from_slice(b"OpusHead");
head.push(1); // version
head.push(channels);
head.extend_from_slice(&0u16.to_le_bytes()); // pre-skip
head.extend_from_slice(&sample_rate.to_le_bytes()); // input sample rate
head.extend_from_slice(&0u16.to_le_bytes()); // output gain
head.push(0); // channel mapping family
// BOS flag = 0x02
self.write_ogg_page(0x02, 0, &head)
}
/// Write the OpusTags comment header.
fn write_opus_tags(&mut self) -> std::io::Result<()> {
let vendor = b"trx-rs";
let mut tags = Vec::with_capacity(24);
tags.extend_from_slice(b"OpusTags");
tags.extend_from_slice(&(vendor.len() as u32).to_le_bytes());
tags.extend_from_slice(vendor);
tags.extend_from_slice(&0u32.to_le_bytes()); // no user comments
self.write_ogg_page(0x00, 0, &tags)
}
/// Write a single Opus audio packet as an OGG page.
fn write_audio_packet(&mut self, opus_data: &[u8]) -> std::io::Result<()> {
self.granule_pos += self.samples_per_frame;
self.write_ogg_page(0x00, self.granule_pos, opus_data)
}
/// Finalize the stream by writing an EOS page.
fn finalize(mut self) -> std::io::Result<u64> {
// Write an empty EOS page.
self.write_ogg_page(0x04, self.granule_pos, &[])?;
self.file.flush()?;
Ok(self.bytes_written)
}
/// Write a single OGG page.
fn write_ogg_page(
&mut self,
header_type: u8,
granule_position: u64,
data: &[u8],
) -> std::io::Result<()> {
// OGG page header
let mut header = Vec::with_capacity(27 + 255);
header.extend_from_slice(b"OggS"); // capture pattern
header.push(0); // stream structure version
header.push(header_type); // header type flag
header.extend_from_slice(&granule_position.to_le_bytes()); // granule position
header.extend_from_slice(&self.serial.to_le_bytes()); // stream serial number
header.extend_from_slice(&self.page_seq.to_le_bytes()); // page sequence number
header.extend_from_slice(&0u32.to_le_bytes()); // CRC (placeholder)
self.page_seq += 1;
// Segment table: split data into 255-byte segments.
let num_segments = if data.is_empty() {
1
} else {
data.len().div_ceil(255)
};
// A single packet needs lacing values: full 255-byte segments + final remainder.
let mut segments = Vec::with_capacity(num_segments);
let mut remaining = data.len();
while remaining >= 255 {
segments.push(255u8);
remaining -= 255;
}
segments.push(remaining as u8);
header.push(segments.len() as u8); // number of page segments
header.extend_from_slice(&segments);
// Compute CRC-32 over header + data
let crc = ogg_crc32(&header, data);
header[22..26].copy_from_slice(&crc.to_le_bytes());
self.file.write_all(&header)?;
self.file.write_all(data)?;
self.bytes_written += header.len() as u64 + data.len() as u64;
Ok(())
}
}
/// OGG CRC-32 (polynomial 0x04C11DB7, direct algorithm).
fn ogg_crc32(header: &[u8], data: &[u8]) -> u32 {
static TABLE: std::sync::OnceLock<[u32; 256]> = std::sync::OnceLock::new();
let table = TABLE.get_or_init(|| {
let mut t = [0u32; 256];
for i in 0..256u32 {
let mut r = i << 24;
for _ in 0..8 {
r = if r & 0x80000000 != 0 {
(r << 1) ^ 0x04C11DB7
} else {
r << 1
};
}
t[i as usize] = r;
}
t
});
let mut crc = 0u32;
for &b in header.iter().chain(data.iter()) {
crc = (crc << 8) ^ table[((crc >> 24) ^ (b as u32)) as usize];
}
crc
}
// ============================================================================
// RecorderHandle
// ============================================================================
struct RecorderHandle {
stop_tx: watch::Sender<bool>,
handle: tokio::task::JoinHandle<Option<RecordingResult>>,
info: RecordingInfo,
}
// ============================================================================
// RecorderManager
// ============================================================================
pub struct RecorderManager {
recordings: Mutex<HashMap<String, RecorderHandle>>,
config: RecorderConfig,
}
impl RecorderManager {
pub fn new(config: RecorderConfig) -> Self {
Self {
recordings: Mutex::new(HashMap::new()),
config,
}
}
/// Build a recording key from rig_id and optional vchan_id.
fn make_key(rig_id: &str, vchan_id: Option<&str>) -> String {
match vchan_id {
Some(v) => format!("{rig_id}:{v}"),
None => rig_id.to_string(),
}
}
/// Start recording the given audio stream.
pub fn start(
&self,
rig_id: &str,
vchan_id: Option<&str>,
audio_rx: broadcast::Sender<Bytes>,
params: AudioParams,
freq_hz: Option<u64>,
mode: Option<&str>,
) -> Result<RecordingInfo, String> {
if !self.config.enabled {
return Err("recorder is disabled".into());
}
let key = Self::make_key(rig_id, vchan_id);
let mut recordings = self.recordings.lock().unwrap_or_else(|e| e.into_inner());
if recordings.contains_key(&key) {
return Err(format!("already recording: {key}"));
}
let output_dir = self.config.resolve_output_dir();
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
let ts = chrono_timestamp(now.as_secs());
let filename = {
let freq_part = freq_hz.map(|f| format!("_{f}")).unwrap_or_default();
let mode_part = mode.map(|m| format!("_{m}")).unwrap_or_default();
let vchan_part = vchan_id.map(|v| format!("_vchan-{v}")).unwrap_or_default();
format!("{rig_id}{freq_part}{mode_part}{vchan_part}_{ts}.ogg")
};
let path = output_dir.join(&filename);
let (stop_tx, stop_rx) = watch::channel(false);
let rx = audio_rx.subscribe();
let path_clone = path.clone();
let max_duration = self.config.max_duration_secs;
let key_clone = key.clone();
let handle = tokio::task::spawn_blocking(move || {
run_recorder(&key_clone, &path_clone, rx, stop_rx, params, max_duration)
});
let started_at = now.as_secs() as i64;
let info = RecordingInfo {
key: key.clone(),
rig_id: rig_id.to_string(),
vchan_id: vchan_id.map(str::to_string),
path: path.to_string_lossy().into_owned(),
started_at,
sample_rate: params.sample_rate,
channels: params.channels,
};
recordings.insert(
key,
RecorderHandle {
stop_tx,
handle,
info: info.clone(),
},
);
Ok(info)
}
/// Stop a recording and return the result.
pub async fn stop(
&self,
rig_id: &str,
vchan_id: Option<&str>,
) -> Result<RecordingResult, String> {
let key = Self::make_key(rig_id, vchan_id);
let handle = {
let mut recordings = self.recordings.lock().unwrap_or_else(|e| e.into_inner());
recordings.remove(&key)
};
match handle {
Some(h) => {
let _ = h.stop_tx.send(true);
match h.handle.await {
Ok(Some(result)) => Ok(result),
Ok(None) => Err("recording failed".into()),
Err(e) => Err(format!("recorder task panicked: {e}")),
}
}
None => Err(format!("no active recording: {key}")),
}
}
/// List active recordings.
pub fn list_active(&self) -> Vec<RecordingInfo> {
let recordings = self.recordings.lock().unwrap_or_else(|e| e.into_inner());
recordings.values().map(|h| h.info.clone()).collect()
}
/// List recorded files in the output directory.
pub fn list_files(&self) -> Vec<RecordedFile> {
let dir = self.config.resolve_output_dir();
let mut files = Vec::new();
if let Ok(entries) = std::fs::read_dir(&dir) {
for entry in entries.flatten() {
let path = entry.path();
if path.extension().is_some_and(|e| e == "ogg") {
let name = path
.file_name()
.map(|n| n.to_string_lossy().into_owned())
.unwrap_or_default();
let size = entry.metadata().map(|m| m.len()).unwrap_or(0);
files.push(RecordedFile { name, size });
}
}
}
files.sort_by(|a, b| b.name.cmp(&a.name)); // newest first
files
}
/// Check if a recording is active for the given key.
pub fn is_recording(&self, rig_id: &str, vchan_id: Option<&str>) -> bool {
let key = Self::make_key(rig_id, vchan_id);
let recordings = self.recordings.lock().unwrap_or_else(|e| e.into_inner());
recordings.contains_key(&key)
}
}
#[derive(Debug, Clone, Serialize)]
pub struct RecordedFile {
pub name: String,
pub size: u64,
}
// ============================================================================
// Recording task (runs in spawn_blocking)
// ============================================================================
fn run_recorder(
key: &str,
path: &Path,
mut rx: broadcast::Receiver<Bytes>,
mut stop_rx: watch::Receiver<bool>,
params: AudioParams,
max_duration_secs: Option<u64>,
) -> Option<RecordingResult> {
let mut writer = match OggOpusWriter::create(
path,
params.sample_rate,
params.channels,
params.frame_duration_ms,
) {
Ok(w) => w,
Err(e) => {
error!("Recorder [{key}]: failed to create file {path:?}: {e}");
return None;
}
};
info!("Recorder [{key}]: started → {}", path.display());
let start = std::time::Instant::now();
let max_dur = max_duration_secs.map(std::time::Duration::from_secs);
let mut packets: u64 = 0;
// Use a small runtime to bridge async broadcast → blocking writer.
let rt = tokio::runtime::Handle::current();
loop {
// Check stop signal.
if *stop_rx.borrow() {
break;
}
// Check max duration.
if let Some(max) = max_dur {
if start.elapsed() >= max {
info!("Recorder [{key}]: max duration reached");
break;
}
}
// Receive next Opus packet (blocking in spawn_blocking context).
let packet = rt.block_on(async {
tokio::select! {
result = rx.recv() => Some(result),
_ = stop_rx.changed() => None,
}
});
match packet {
Some(Ok(data)) => {
if let Err(e) = writer.write_audio_packet(&data) {
error!("Recorder [{key}]: write error: {e}");
break;
}
packets += 1;
}
Some(Err(broadcast::error::RecvError::Lagged(n))) => {
warn!("Recorder [{key}]: dropped {n} packets (lag)");
// Continue recording despite lag.
}
Some(Err(broadcast::error::RecvError::Closed)) => {
info!("Recorder [{key}]: audio channel closed");
break;
}
None => {
// Stop signal received.
break;
}
}
}
let duration_secs = start.elapsed().as_secs_f64();
let bytes_written = match writer.finalize() {
Ok(n) => n,
Err(e) => {
error!("Recorder [{key}]: finalize error: {e}");
0
}
};
info!(
"Recorder [{key}]: stopped — {packets} packets, {duration_secs:.1}s, {} bytes",
bytes_written
);
Some(RecordingResult {
key: key.to_string(),
path: path.to_string_lossy().into_owned(),
duration_secs,
bytes_written,
})
}
// ============================================================================
// Helpers
// ============================================================================
/// Format a Unix timestamp as `YYYY-MM-DD_HH-MM-SS`.
fn chrono_timestamp(epoch_secs: u64) -> String {
let secs = epoch_secs;
let days = secs / 86400;
let time = secs % 86400;
let hours = time / 3600;
let minutes = (time % 3600) / 60;
let seconds = time % 60;
// Simple Gregorian calendar calculation from epoch days.
let (y, m, d) = epoch_days_to_ymd(days as i64);
format!("{y:04}-{m:02}-{d:02}_{hours:02}-{minutes:02}-{seconds:02}")
}
fn epoch_days_to_ymd(days: i64) -> (i32, u32, u32) {
// Algorithm from http://howardhinnant.github.io/date_algorithms.html
let z = days + 719468;
let era = if z >= 0 { z } else { z - 146096 } / 146097;
let doe = (z - era * 146097) as u32;
let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
let y = yoe as i64 + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
let mp = (5 * doy + 2) / 153;
let d = doy - (153 * mp + 2) / 5 + 1;
let m = if mp < 10 { mp + 3 } else { mp - 9 };
let y = if m <= 2 { y + 1 } else { y };
(y as i32, m, d)
}
@@ -86,6 +86,9 @@ pub struct ScheduleEntry {
/// frontend can allocate the corresponding virtual channels on connect.
#[serde(default)]
pub bookmark_ids: Vec<String>,
/// Whether to auto-record audio when this entry is active.
#[serde(default)]
pub record: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -102,6 +105,9 @@ pub struct SatelliteEntry {
pub center_hz: Option<u64>,
#[serde(default)]
pub bookmark_ids: Vec<String>,
/// Whether to auto-record audio when this satellite pass is active.
#[serde(default)]
pub record: bool,
}
fn default_min_elevation() -> f64 {
@@ -619,6 +625,7 @@ struct AppliedTarget {
center_hz: Option<u64>,
extra_bookmark_ids: Vec<String>,
satellite: Option<String>,
record: bool,
}
#[derive(Debug, Clone, Serialize, Default)]
@@ -692,6 +699,7 @@ pub fn spawn_scheduler_task(
bookmarks: Arc<BookmarkStoreMap>,
status_map: SchedulerStatusMap,
control: SharedSchedulerControlManager,
recorder_mgr: Option<Arc<super::recorder::RecorderManager>>,
) {
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(30));
@@ -733,6 +741,7 @@ pub fn spawn_scheduler_task(
center_hz: sat_target.center_hz,
extra_bookmark_ids: sat_target.extra_bm_ids.clone(),
satellite: Some(sat_target.satellite.clone()),
record: sat_target.record,
};
if last_applied.get(&config.remote) == Some(&target) {
@@ -773,6 +782,18 @@ pub fn spawn_scheduler_task(
continue;
}
// Manage scheduler-driven recording on target transition.
if let Some(ref mgr) = recorder_mgr {
manage_scheduler_recording(
mgr,
&context,
&config.remote,
last_applied.get(&config.remote),
&target,
)
.await;
}
last_applied.insert(config.remote.clone(), target);
continue;
}
@@ -797,7 +818,7 @@ pub fn spawn_scheduler_task(
continue;
}
let (entry_id, bm_id, center_hz, extra_bm_ids) = match &config.mode {
let (entry_id, bm_id, center_hz, extra_bm_ids, entry_record) = match &config.mode {
SchedulerMode::Disabled => continue,
SchedulerMode::Grayline => {
let Some(bm_id) = config
@@ -807,7 +828,7 @@ pub fn spawn_scheduler_task(
else {
continue;
};
(None, bm_id, None, Vec::new())
(None, bm_id, None, Vec::new(), false)
}
SchedulerMode::TimeSpan => {
let Some(entry) =
@@ -820,6 +841,7 @@ pub fn spawn_scheduler_task(
entry.bookmark_id.clone(),
entry.center_hz,
entry.bookmark_ids.clone(),
entry.record,
)
}
};
@@ -829,6 +851,7 @@ pub fn spawn_scheduler_task(
center_hz,
extra_bookmark_ids: extra_bm_ids.clone(),
satellite: None,
record: entry_record,
};
// Already at this exact scheduled target — skip.
@@ -869,6 +892,18 @@ pub fn spawn_scheduler_task(
continue;
}
// Manage scheduler-driven recording on target transition.
if let Some(ref mgr) = recorder_mgr {
manage_scheduler_recording(
mgr,
&context,
&config.remote,
last_applied.get(&config.remote),
&target,
)
.await;
}
last_applied.insert(config.remote.clone(), target);
}
}
@@ -885,6 +920,7 @@ struct SatelliteTarget {
bookmark_id: String,
center_hz: Option<u64>,
extra_bm_ids: Vec<String>,
record: bool,
}
/// Check if any configured satellite has an active pass right now.
@@ -957,6 +993,7 @@ fn find_active_satellite_target(
bookmark_id: entry.bookmark_id.clone(),
center_hz: entry.center_hz,
extra_bm_ids: entry.bookmark_ids.clone(),
record: entry.record,
})
}
@@ -1082,6 +1119,83 @@ async fn scheduler_send(
Ok(())
}
// ============================================================================
// Scheduler-driven recording
// ============================================================================
/// Manage recording state when the scheduler transitions between targets.
///
/// Stops any existing scheduler recording for the rig, then starts a new one
/// if the new target has `record: true`.
async fn manage_scheduler_recording(
mgr: &super::recorder::RecorderManager,
context: &FrontendRuntimeContext,
remote: &str,
prev: Option<&AppliedTarget>,
next: &AppliedTarget,
) {
// Stop any existing scheduler recording for this rig.
let was_recording = prev.is_some_and(|t| t.record);
if was_recording && mgr.is_recording(remote, None) {
match mgr.stop(remote, None).await {
Ok(result) => {
info!(
"scheduler: stopped recording for '{}' — {:.1}s, {} bytes",
remote, result.duration_secs, result.bytes_written
);
}
Err(e) => {
warn!("scheduler: failed to stop recording for '{}': {e}", remote);
}
}
}
// Start recording if the new target requests it.
if next.record {
let audio_tx = context
.rig_audio
.rx
.read()
.ok()
.and_then(|map| map.get(remote).cloned())
.or_else(|| context.audio.rx.clone());
if let Some(tx) = audio_tx {
let (sr, ch, fd) = stream_info(context, remote);
let params = super::recorder::AudioParams {
sample_rate: sr,
channels: ch,
frame_duration_ms: fd,
};
match mgr.start(remote, None, tx, params, None, None) {
Ok(info) => {
info!(
"scheduler: started recording for '{}' → {}",
remote, info.path
);
}
Err(e) => {
warn!("scheduler: failed to start recording for '{}': {e}", remote);
}
}
}
}
}
fn stream_info(context: &FrontendRuntimeContext, rig_id: &str) -> (u32, u8, u16) {
if let Some(rx) = context.rig_audio_info_rx(rig_id) {
if let Some(info) = rx.borrow().as_ref() {
return (info.sample_rate, info.channels, info.frame_duration_ms);
}
}
if let Some(ref info_rx) = context.audio.info {
if let Some(info) = info_rx.borrow().as_ref() {
return (info.sample_rate, info.channels, info.frame_duration_ms);
}
}
(48000, 2, 20)
}
// ============================================================================
// HTTP handlers
// ============================================================================
@@ -1264,6 +1378,7 @@ mod tests {
interleave_min,
center_hz,
bookmark_ids: Vec::new(),
record: false,
}
}
@@ -12,6 +12,8 @@ pub mod auth;
pub mod background_decode;
#[path = "bookmarks.rs"]
pub mod bookmarks;
#[path = "recorder.rs"]
pub mod recorder;
#[path = "scheduler.rs"]
pub mod scheduler;
#[path = "status.rs"]
@@ -40,6 +42,7 @@ use trx_frontend::{FrontendRuntimeContext, FrontendSpawner};
use auth::{AuthConfig, AuthState, SameSite};
use background_decode::{BackgroundDecodeManager, BackgroundDecodeStore};
use recorder::{RecorderConfig, RecorderManager};
use scheduler::{SchedulerControlManager, SchedulerStatusMap, SchedulerStoreMap};
use vchan::ClientChannelManager;
@@ -86,6 +89,9 @@ async fn serve(
let scheduler_status: SchedulerStatusMap = Arc::new(RwLock::new(HashMap::new()));
let scheduler_control = Arc::new(SchedulerControlManager::default());
let recorder_config = RecorderConfig::default();
let recorder_mgr = Arc::new(RecorderManager::new(recorder_config));
scheduler::spawn_scheduler_task(
context.clone(),
rig_tx.clone(),
@@ -93,6 +99,7 @@ async fn serve(
bookmark_store_map.clone(),
scheduler_status.clone(),
scheduler_control.clone(),
Some(recorder_mgr.clone()),
);
let background_decode_path = BackgroundDecodeStore::default_path();
@@ -151,6 +158,7 @@ async fn serve(
vchan_mgr,
session_rig_mgr,
background_decode_mgr,
recorder_mgr,
)?;
let handle = server.handle();
tokio::spawn(async move {
@@ -177,6 +185,7 @@ fn build_server(
vchan_mgr: Arc<ClientChannelManager>,
session_rig_mgr: Arc<api::SessionRigManager>,
background_decode_mgr: Arc<BackgroundDecodeManager>,
recorder_mgr: Arc<RecorderManager>,
) -> Result<Server, actix_web::Error> {
let state_data = web::Data::new(state_rx);
let rig_tx = web::Data::new(rig_tx);
@@ -192,6 +201,7 @@ fn build_server(
let vchan_mgr = web::Data::new(vchan_mgr);
let session_rig_mgr = web::Data::new(session_rig_mgr);
let background_decode_mgr = web::Data::new(background_decode_mgr);
let recorder_mgr = web::Data::new(recorder_mgr);
// Extract auth config values before moving context
let same_site = match context.http_auth.cookie_same_site.as_str() {
@@ -248,6 +258,7 @@ fn build_server(
.app_data(vchan_mgr.clone())
.app_data(session_rig_mgr.clone())
.app_data(background_decode_mgr.clone())
.app_data(recorder_mgr.clone())
.wrap(Compress::default())
.wrap(
DefaultHeaders::new()
+1
View File
@@ -51,5 +51,6 @@ pub enum RigCommand {
SetWfmDenoise(WfmDenoiseLevel),
SetSamStereoWidth(f32),
SetSamCarrierSync(bool),
SetRecorderEnabled(bool),
GetSpectrum,
}
@@ -471,6 +471,7 @@ pub fn command_from_rig_command(cmd: RigCommand) -> Box<dyn RigCommandHandler> {
| RigCommand::SetWfmDenoise(_)
| RigCommand::SetSamStereoWidth(_)
| RigCommand::SetSamCarrierSync(_)
| RigCommand::SetRecorderEnabled(_)
| RigCommand::GetSpectrum => Box::new(GetSnapshotCommand),
}
}
+2
View File
@@ -30,6 +30,8 @@ pub struct DecoderConfig {
pub wspr_decode_enabled: bool,
#[serde(default)]
pub lrpt_decode_enabled: bool,
#[serde(default)]
pub recorder_enabled: bool,
}
/// Decoder reset sequence counters for invalidating decoder windows.
+35 -1
View File
@@ -146,7 +146,8 @@ define_command_mapping! {
SetWfmStereo { enabled } <=> SetWfmStereo,
SetWfmDenoise { level } <=> SetWfmDenoise,
SetSamStereoWidth { width } <=> SetSamStereoWidth,
SetSamCarrierSync { enabled } <=> SetSamCarrierSync;
SetSamCarrierSync { enabled } <=> SetSamCarrierSync,
SetRecorderEnabled { enabled } <=> SetRecorderEnabled;
// ── Multi-field struct passthrough ───────────────────────────────
multi:
@@ -672,4 +673,37 @@ mod tests {
panic!("Round trip failed");
}
}
#[test]
fn test_client_command_to_rig_set_recorder_enabled() {
let cmd = ClientCommand::SetRecorderEnabled { enabled: true };
if let RigCommand::SetRecorderEnabled(enabled) = client_command_to_rig(cmd) {
assert!(enabled);
} else {
panic!("Expected SetRecorderEnabled");
}
}
#[test]
fn test_rig_command_to_client_set_recorder_enabled() {
let cmd = RigCommand::SetRecorderEnabled(true);
if let ClientCommand::SetRecorderEnabled { enabled } = rig_command_to_client(cmd) {
assert!(enabled);
} else {
panic!("Expected SetRecorderEnabled");
}
}
#[test]
fn test_round_trip_set_recorder_enabled() {
let original = ClientCommand::SetRecorderEnabled { enabled: false };
let rig_cmd = client_command_to_rig(original);
let client_cmd = rig_command_to_client(rig_cmd);
if let ClientCommand::SetRecorderEnabled { enabled } = client_cmd {
assert!(!enabled);
} else {
panic!("Round trip failed");
}
}
}
+1
View File
@@ -57,6 +57,7 @@ pub enum ClientCommand {
SetWfmDenoise { level: WfmDenoiseLevel },
SetSamStereoWidth { width: f32 },
SetSamCarrierSync { enabled: bool },
SetRecorderEnabled { enabled: bool },
GetSpectrum,
}
+6
View File
@@ -589,6 +589,12 @@ async fn process_command(
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::SetRecorderEnabled(en) => {
ctx.state.decoders.recorder_enabled = en;
info!("Recorder {}", if en { "enabled" } else { "disabled" });
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::ResetLrptDecoder => {
ctx.histories.clear_lrpt_history();
ctx.state.reset_seqs.lrpt_decode_reset_seq += 1;