diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index 3d5e1ef..8f7f673 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -253,7 +253,8 @@ function applyAuthRestrictions() { "settings-clear-ft4-history", "settings-clear-ft2-history", "settings-clear-wspr-history", - "settings-clear-sat-history" + "settings-clear-sat-history", + "header-rec-btn" ]; pluginToggleBtns.forEach(id => { const btn = document.getElementById(id); @@ -3305,6 +3306,10 @@ function render(update) { for (const [key, entry] of Object.entries(_decoderToggles)) { syncDecoderToggle(entry, !!update[key], entry.label); } + // Recorder state sync. + if (typeof update.recorder_enabled === "boolean" && window._syncRecorderState) { + window._syncRecorderState(update.recorder_enabled); + } if (window.updateSatLiveState) window.updateSatLiveState(update); const cwAutoEl = document.getElementById("cw-auto"); const cwWpmEl = document.getElementById("cw-wpm"); @@ -8852,6 +8857,31 @@ if (headerAudioToggle) { headerAudioToggle.addEventListener("click", startRxAudio); } +// ── Recorder button ──────────────────────────────────────────────────────── +const headerRecBtn = document.getElementById("header-rec-btn"); +let recorderActive = false; +function syncRecorderBtn() { + if (!headerRecBtn) return; + headerRecBtn.classList.toggle("rec-active", recorderActive); +} +if (headerRecBtn) { + headerRecBtn.addEventListener("click", async () => { + try { + if (recorderActive) { + await postPath("/api/recorder/stop"); + } else { + await postPath("/api/recorder/start"); + } + } catch (e) { + console.error("Recorder toggle failed", e); + } + }); +} +window._syncRecorderState = function (enabled) { + recorderActive = enabled; + syncRecorderBtn(); +}; + const rxVolPct = document.getElementById("rx-vol-pct"); const txVolPct = document.getElementById("tx-vol-pct"); diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/index.html b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/index.html index a851f78..57b273a 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/index.html +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/index.html @@ -65,6 +65,7 @@ +
@@ -1152,6 +1153,10 @@ +
@@ -1163,7 +1168,7 @@ Entry details - +
StartEndCenter freqPrimary bookmarkExtra channelsLabelInterleave (min)
StartEndCenter freqPrimary bookmarkExtra channelsLabelInterleave (min)REC
diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/scheduler.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/scheduler.js index e38fbf9..5efdf66 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/scheduler.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/scheduler.js @@ -502,6 +502,9 @@ if (ilEl) ilEl.value = entry && entry.interleave_min ? entry.interleave_min : ""; if (centerHzEl) centerHzEl.value = entry && entry.center_hz ? entry.center_hz : ""; + const recordEl = document.getElementById("scheduler-ts-entry-record"); + if (recordEl) recordEl.checked = !!(entry && entry.record); + pendingExtraBmIds = entry && Array.isArray(entry.bookmark_ids) ? entry.bookmark_ids.slice() : []; renderExtraBmList(); @@ -550,6 +553,9 @@ } if (!currentConfig.entries) currentConfig.entries = []; + const recordCb = document.getElementById("scheduler-ts-entry-record"); + const entryRecord = recordCb ? recordCb.checked : false; + const entryData = { start_min: startMin, end_min: endMin, @@ -558,6 +564,7 @@ interleave_min: entryInterleave, center_hz: centerHz, bookmark_ids: extraBmIds, + record: entryRecord, }; if (schEntryEditIdx !== null) { @@ -696,6 +703,7 @@ '' + extraCell + '' + '' + escHtml(entry.label || "") + '' + '' + il + '' + + '' + (entry.record ? 'Yes' : '') + '' + '' + '' + '' + diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/style.css b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/style.css index 00c46dd..26cb5c4 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/style.css +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/style.css @@ -1142,6 +1142,31 @@ small { color: var(--text-muted); } color: #00d17f; border-color: #00d17f; } +.header-bar-btn.header-rec-btn { + height: 2rem; + min-height: 0; + padding: 0 0.5rem; + font-size: 0.7rem; + font-weight: 700; + letter-spacing: 0.04em; + border: 1px solid var(--border-light); + border-radius: 6px; + background: var(--input-bg); + color: var(--text-muted); + cursor: pointer; + flex-shrink: 0; + transition: color 0.15s, border-color 0.15s, background 0.15s; +} +.header-rec-btn.rec-active { + color: #ff3b30; + border-color: #ff3b30; + background: rgba(255, 59, 48, 0.12); + animation: rec-pulse 1.5s ease-in-out infinite; +} +@keyframes rec-pulse { + 0%, 100% { opacity: 1; } + 50% { opacity: 0.6; } +} .header-rig-switch { display: flex; align-items: center; diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api/mod.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api/mod.rs index fffe885..447ad6b 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api/mod.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api/mod.rs @@ -7,6 +7,7 @@ mod assets; mod bookmarks; mod decoder; +pub mod recorder; mod rig; mod sse; mod vchan; @@ -606,6 +607,11 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(crate::server::background_decode::delete_background_decode) .service(crate::server::background_decode::get_background_decode_status) .service(crate::server::audio::audio_ws) + // Recorder + .service(recorder::recorder_start) + .service(recorder::recorder_stop) + .service(recorder::recorder_status) + .service(recorder::recorder_files) // Static assets .service(assets::index) .service(assets::map_index) diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api/recorder.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api/recorder.rs new file mode 100644 index 0000000..94b19d1 --- /dev/null +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api/recorder.rs @@ -0,0 +1,206 @@ +// SPDX-FileCopyrightText: 2026 Stan Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! HTTP API endpoints for audio recording. + +use std::sync::Arc; + +use actix_web::{get, post, web, Error, HttpResponse}; +use tokio::sync::{mpsc, watch}; + +use trx_core::{RigCommand, RigState}; +use trx_frontend::FrontendRuntimeContext; + +use super::send_command; +use crate::server::recorder::RecorderManager; + +// ============================================================================ +// Query types +// ============================================================================ + +#[derive(serde::Deserialize)] +pub struct RecorderStartQuery { + pub remote: Option, + pub vchan_id: Option, +} + +#[derive(serde::Deserialize)] +pub struct RecorderStopQuery { + pub remote: Option, + pub vchan_id: Option, +} + +// ============================================================================ +// Endpoints +// ============================================================================ + +/// Start recording audio for the active rig (or a specific vchan). +#[post("/api/recorder/start")] +pub async fn recorder_start( + query: web::Query, + context: web::Data>, + recorder_mgr: web::Data>, + state: web::Data>, + rig_tx: web::Data>, +) -> Result { + let rig_id = resolve_rig_id(&context, query.remote.as_deref()); + let vchan_id = query.vchan_id.as_deref(); + + // Resolve the audio broadcast sender for this rig/vchan. + let (audio_tx, sample_rate, channels, frame_duration_ms) = + resolve_audio_source(&context, &rig_id, vchan_id)?; + + let current_state = state.get_ref().borrow().clone(); + let freq_hz = Some(current_state.status.freq.hz); + let mode = Some(trx_protocol::mode_to_string(¤t_state.status.mode).into_owned()); + + let params = crate::server::recorder::AudioParams { + sample_rate, + channels, + frame_duration_ms, + }; + + match recorder_mgr.start( + &rig_id, + vchan_id, + audio_tx, + params, + freq_hz, + mode.as_deref(), + ) { + Ok(info) => { + // Sync recorder_enabled state to the rig. + let _ = send_command( + &rig_tx, + RigCommand::SetRecorderEnabled(true), + query.remote.clone(), + ) + .await; + Ok(HttpResponse::Ok().json(info)) + } + Err(e) => Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e }))), + } +} + +/// Stop recording. +#[post("/api/recorder/stop")] +pub async fn recorder_stop( + query: web::Query, + context: web::Data>, + recorder_mgr: web::Data>, + rig_tx: web::Data>, +) -> Result { + let rig_id = resolve_rig_id(&context, query.remote.as_deref()); + let vchan_id = query.vchan_id.as_deref(); + + match recorder_mgr.stop(&rig_id, vchan_id).await { + Ok(result) => { + // Check if any recordings remain active for this rig. + let still_recording = recorder_mgr + .list_active() + .iter() + .any(|r| r.rig_id == rig_id); + if !still_recording { + let _ = send_command( + &rig_tx, + RigCommand::SetRecorderEnabled(false), + query.remote.clone(), + ) + .await; + } + Ok(HttpResponse::Ok().json(result)) + } + Err(e) => Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e }))), + } +} + +/// Get the status of active recordings. +#[get("/api/recorder/status")] +pub async fn recorder_status( + recorder_mgr: web::Data>, +) -> Result { + let active = recorder_mgr.list_active(); + Ok(HttpResponse::Ok().json(active)) +} + +/// List recorded files in the output directory. +#[get("/api/recorder/files")] +pub async fn recorder_files( + recorder_mgr: web::Data>, +) -> Result { + let files = recorder_mgr.list_files(); + Ok(HttpResponse::Ok().json(files)) +} + +// ============================================================================ +// Helpers +// ============================================================================ + +fn resolve_rig_id(context: &FrontendRuntimeContext, remote: Option<&str>) -> String { + if let Some(r) = remote { + return r.to_string(); + } + context + .routing + .active_rig_id + .lock() + .ok() + .and_then(|v| v.clone()) + .unwrap_or_else(|| "default".to_string()) +} + +fn resolve_audio_source( + context: &FrontendRuntimeContext, + rig_id: &str, + vchan_id: Option<&str>, +) -> Result<(tokio::sync::broadcast::Sender, u32, u8, u16), Error> { + if let Some(vchan_uuid_str) = vchan_id { + // Virtual channel audio. + let uuid: uuid::Uuid = vchan_uuid_str + .parse() + .map_err(|_| actix_web::error::ErrorBadRequest("invalid vchan_id UUID"))?; + let audio = context + .vchan + .audio + .read() + .unwrap_or_else(|e| e.into_inner()); + let tx = audio + .get(&uuid) + .cloned() + .ok_or_else(|| actix_web::error::ErrorNotFound("vchan audio not found"))?; + // Virtual channels use the same stream info as the main rig. + let (sr, ch, fd) = stream_info_for_rig(context, rig_id); + Ok((tx, sr, ch, fd)) + } else { + // Main rig audio — try per-rig first, then default. + let tx = context + .rig_audio + .rx + .read() + .ok() + .and_then(|map| map.get(rig_id).cloned()) + .or_else(|| context.audio.rx.clone()) + .ok_or_else(|| actix_web::error::ErrorNotFound("no audio source for rig"))?; + + let (sr, ch, fd) = stream_info_for_rig(context, rig_id); + Ok((tx, sr, ch, fd)) + } +} + +fn stream_info_for_rig(context: &FrontendRuntimeContext, rig_id: &str) -> (u32, u8, u16) { + // Try per-rig stream info first. + if let Some(rx) = context.rig_audio_info_rx(rig_id) { + if let Some(info) = rx.borrow().as_ref() { + return (info.sample_rate, info.channels, info.frame_duration_ms); + } + } + // Fall back to the default audio info. + if let Some(ref info_rx) = context.audio.info { + if let Some(info) = info_rx.borrow().as_ref() { + return (info.sample_rate, info.channels, info.frame_duration_ms); + } + } + // Absolute fallback. + (48000, 2, 20) +} diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/recorder.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/recorder.rs new file mode 100644 index 0000000..2c306fd --- /dev/null +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/recorder.rs @@ -0,0 +1,560 @@ +// SPDX-FileCopyrightText: 2026 Stan Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Audio recorder — writes incoming Opus packets to OGG/Opus files. +//! +//! The recorder subscribes to the same `broadcast::Sender` channels +//! that feed the WebSocket audio endpoint, capturing pre-encoded Opus packets +//! without any re-encoding. + +use std::collections::HashMap; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::sync::Mutex; +use std::time::{SystemTime, UNIX_EPOCH}; + +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use tokio::sync::{broadcast, watch}; +use tracing::{error, info, warn}; + +// ============================================================================ +// Configuration +// ============================================================================ + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct RecorderConfig { + /// Whether the recorder feature is available. + pub enabled: bool, + /// Directory for recorded files. Default: `$XDG_DATA_HOME/trx-rs/recordings/`. + pub output_dir: Option, + /// Maximum duration of a single recording in seconds. None = unlimited. + pub max_duration_secs: Option, +} + +impl Default for RecorderConfig { + fn default() -> Self { + Self { + enabled: true, + output_dir: None, + max_duration_secs: None, + } + } +} + +impl RecorderConfig { + pub fn resolve_output_dir(&self) -> PathBuf { + if let Some(ref dir) = self.output_dir { + PathBuf::from(dir) + } else { + dirs::data_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join("trx-rs") + .join("recordings") + } + } +} + +// ============================================================================ +// Recording metadata +// ============================================================================ + +#[derive(Debug, Clone, Serialize)] +pub struct RecordingInfo { + pub key: String, + pub rig_id: String, + pub vchan_id: Option, + pub path: String, + pub started_at: i64, + pub sample_rate: u32, + pub channels: u8, +} + +#[derive(Debug, Clone, Serialize)] +pub struct RecordingResult { + pub key: String, + pub path: String, + pub duration_secs: f64, + pub bytes_written: u64, +} + +/// Audio stream parameters for a recording. +#[derive(Debug, Clone, Copy)] +pub struct AudioParams { + pub sample_rate: u32, + pub channels: u8, + pub frame_duration_ms: u16, +} + +// ============================================================================ +// OGG/Opus writer +// ============================================================================ + +/// Minimal OGG/Opus file writer. +/// +/// Writes the mandatory OpusHead and OpusTags pages, then wraps each incoming +/// Opus packet in its own OGG page. This produces a valid, seekable OGG Opus +/// stream without pulling in an external OGG crate. +struct OggOpusWriter { + file: std::fs::File, + serial: u32, + page_seq: u32, + granule_pos: u64, + samples_per_frame: u64, + bytes_written: u64, +} + +impl OggOpusWriter { + fn create( + path: &Path, + sample_rate: u32, + channels: u8, + frame_duration_ms: u16, + ) -> std::io::Result { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + let file = std::fs::File::create(path)?; + + let serial = { + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + (ts & 0xFFFF_FFFF) as u32 + }; + + let samples_per_frame = (sample_rate as u64) * (frame_duration_ms as u64) / 1000; + + let mut writer = Self { + file, + serial, + page_seq: 0, + granule_pos: 0, + samples_per_frame, + bytes_written: 0, + }; + + writer.write_opus_head(sample_rate, channels)?; + writer.write_opus_tags()?; + + Ok(writer) + } + + /// Write the OpusHead identification header (OGG page, BOS). + fn write_opus_head(&mut self, sample_rate: u32, channels: u8) -> std::io::Result<()> { + let mut head = Vec::with_capacity(19); + head.extend_from_slice(b"OpusHead"); + head.push(1); // version + head.push(channels); + head.extend_from_slice(&0u16.to_le_bytes()); // pre-skip + head.extend_from_slice(&sample_rate.to_le_bytes()); // input sample rate + head.extend_from_slice(&0u16.to_le_bytes()); // output gain + head.push(0); // channel mapping family + + // BOS flag = 0x02 + self.write_ogg_page(0x02, 0, &head) + } + + /// Write the OpusTags comment header. + fn write_opus_tags(&mut self) -> std::io::Result<()> { + let vendor = b"trx-rs"; + let mut tags = Vec::with_capacity(24); + tags.extend_from_slice(b"OpusTags"); + tags.extend_from_slice(&(vendor.len() as u32).to_le_bytes()); + tags.extend_from_slice(vendor); + tags.extend_from_slice(&0u32.to_le_bytes()); // no user comments + + self.write_ogg_page(0x00, 0, &tags) + } + + /// Write a single Opus audio packet as an OGG page. + fn write_audio_packet(&mut self, opus_data: &[u8]) -> std::io::Result<()> { + self.granule_pos += self.samples_per_frame; + self.write_ogg_page(0x00, self.granule_pos, opus_data) + } + + /// Finalize the stream by writing an EOS page. + fn finalize(mut self) -> std::io::Result { + // Write an empty EOS page. + self.write_ogg_page(0x04, self.granule_pos, &[])?; + self.file.flush()?; + Ok(self.bytes_written) + } + + /// Write a single OGG page. + fn write_ogg_page( + &mut self, + header_type: u8, + granule_position: u64, + data: &[u8], + ) -> std::io::Result<()> { + // OGG page header + let mut header = Vec::with_capacity(27 + 255); + header.extend_from_slice(b"OggS"); // capture pattern + header.push(0); // stream structure version + header.push(header_type); // header type flag + header.extend_from_slice(&granule_position.to_le_bytes()); // granule position + header.extend_from_slice(&self.serial.to_le_bytes()); // stream serial number + header.extend_from_slice(&self.page_seq.to_le_bytes()); // page sequence number + header.extend_from_slice(&0u32.to_le_bytes()); // CRC (placeholder) + self.page_seq += 1; + + // Segment table: split data into 255-byte segments. + let num_segments = if data.is_empty() { + 1 + } else { + data.len().div_ceil(255) + }; + // A single packet needs lacing values: full 255-byte segments + final remainder. + let mut segments = Vec::with_capacity(num_segments); + let mut remaining = data.len(); + while remaining >= 255 { + segments.push(255u8); + remaining -= 255; + } + segments.push(remaining as u8); + + header.push(segments.len() as u8); // number of page segments + header.extend_from_slice(&segments); + + // Compute CRC-32 over header + data + let crc = ogg_crc32(&header, data); + header[22..26].copy_from_slice(&crc.to_le_bytes()); + + self.file.write_all(&header)?; + self.file.write_all(data)?; + self.bytes_written += header.len() as u64 + data.len() as u64; + Ok(()) + } +} + +/// OGG CRC-32 (polynomial 0x04C11DB7, direct algorithm). +fn ogg_crc32(header: &[u8], data: &[u8]) -> u32 { + static TABLE: std::sync::OnceLock<[u32; 256]> = std::sync::OnceLock::new(); + let table = TABLE.get_or_init(|| { + let mut t = [0u32; 256]; + for i in 0..256u32 { + let mut r = i << 24; + for _ in 0..8 { + r = if r & 0x80000000 != 0 { + (r << 1) ^ 0x04C11DB7 + } else { + r << 1 + }; + } + t[i as usize] = r; + } + t + }); + + let mut crc = 0u32; + for &b in header.iter().chain(data.iter()) { + crc = (crc << 8) ^ table[((crc >> 24) ^ (b as u32)) as usize]; + } + crc +} + +// ============================================================================ +// RecorderHandle +// ============================================================================ + +struct RecorderHandle { + stop_tx: watch::Sender, + handle: tokio::task::JoinHandle>, + info: RecordingInfo, +} + +// ============================================================================ +// RecorderManager +// ============================================================================ + +pub struct RecorderManager { + recordings: Mutex>, + config: RecorderConfig, +} + +impl RecorderManager { + pub fn new(config: RecorderConfig) -> Self { + Self { + recordings: Mutex::new(HashMap::new()), + config, + } + } + + /// Build a recording key from rig_id and optional vchan_id. + fn make_key(rig_id: &str, vchan_id: Option<&str>) -> String { + match vchan_id { + Some(v) => format!("{rig_id}:{v}"), + None => rig_id.to_string(), + } + } + + /// Start recording the given audio stream. + pub fn start( + &self, + rig_id: &str, + vchan_id: Option<&str>, + audio_rx: broadcast::Sender, + params: AudioParams, + freq_hz: Option, + mode: Option<&str>, + ) -> Result { + if !self.config.enabled { + return Err("recorder is disabled".into()); + } + + let key = Self::make_key(rig_id, vchan_id); + + let mut recordings = self.recordings.lock().unwrap_or_else(|e| e.into_inner()); + if recordings.contains_key(&key) { + return Err(format!("already recording: {key}")); + } + + let output_dir = self.config.resolve_output_dir(); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default(); + let ts = chrono_timestamp(now.as_secs()); + + let filename = { + let freq_part = freq_hz.map(|f| format!("_{f}")).unwrap_or_default(); + let mode_part = mode.map(|m| format!("_{m}")).unwrap_or_default(); + let vchan_part = vchan_id.map(|v| format!("_vchan-{v}")).unwrap_or_default(); + format!("{rig_id}{freq_part}{mode_part}{vchan_part}_{ts}.ogg") + }; + let path = output_dir.join(&filename); + + let (stop_tx, stop_rx) = watch::channel(false); + let rx = audio_rx.subscribe(); + let path_clone = path.clone(); + let max_duration = self.config.max_duration_secs; + let key_clone = key.clone(); + + let handle = tokio::task::spawn_blocking(move || { + run_recorder(&key_clone, &path_clone, rx, stop_rx, params, max_duration) + }); + + let started_at = now.as_secs() as i64; + let info = RecordingInfo { + key: key.clone(), + rig_id: rig_id.to_string(), + vchan_id: vchan_id.map(str::to_string), + path: path.to_string_lossy().into_owned(), + started_at, + sample_rate: params.sample_rate, + channels: params.channels, + }; + + recordings.insert( + key, + RecorderHandle { + stop_tx, + handle, + info: info.clone(), + }, + ); + + Ok(info) + } + + /// Stop a recording and return the result. + pub async fn stop( + &self, + rig_id: &str, + vchan_id: Option<&str>, + ) -> Result { + let key = Self::make_key(rig_id, vchan_id); + let handle = { + let mut recordings = self.recordings.lock().unwrap_or_else(|e| e.into_inner()); + recordings.remove(&key) + }; + match handle { + Some(h) => { + let _ = h.stop_tx.send(true); + match h.handle.await { + Ok(Some(result)) => Ok(result), + Ok(None) => Err("recording failed".into()), + Err(e) => Err(format!("recorder task panicked: {e}")), + } + } + None => Err(format!("no active recording: {key}")), + } + } + + /// List active recordings. + pub fn list_active(&self) -> Vec { + let recordings = self.recordings.lock().unwrap_or_else(|e| e.into_inner()); + recordings.values().map(|h| h.info.clone()).collect() + } + + /// List recorded files in the output directory. + pub fn list_files(&self) -> Vec { + let dir = self.config.resolve_output_dir(); + let mut files = Vec::new(); + if let Ok(entries) = std::fs::read_dir(&dir) { + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().is_some_and(|e| e == "ogg") { + let name = path + .file_name() + .map(|n| n.to_string_lossy().into_owned()) + .unwrap_or_default(); + let size = entry.metadata().map(|m| m.len()).unwrap_or(0); + files.push(RecordedFile { name, size }); + } + } + } + files.sort_by(|a, b| b.name.cmp(&a.name)); // newest first + files + } + + /// Check if a recording is active for the given key. + pub fn is_recording(&self, rig_id: &str, vchan_id: Option<&str>) -> bool { + let key = Self::make_key(rig_id, vchan_id); + let recordings = self.recordings.lock().unwrap_or_else(|e| e.into_inner()); + recordings.contains_key(&key) + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct RecordedFile { + pub name: String, + pub size: u64, +} + +// ============================================================================ +// Recording task (runs in spawn_blocking) +// ============================================================================ + +fn run_recorder( + key: &str, + path: &Path, + mut rx: broadcast::Receiver, + mut stop_rx: watch::Receiver, + params: AudioParams, + max_duration_secs: Option, +) -> Option { + let mut writer = match OggOpusWriter::create( + path, + params.sample_rate, + params.channels, + params.frame_duration_ms, + ) { + Ok(w) => w, + Err(e) => { + error!("Recorder [{key}]: failed to create file {path:?}: {e}"); + return None; + } + }; + + info!("Recorder [{key}]: started → {}", path.display()); + + let start = std::time::Instant::now(); + let max_dur = max_duration_secs.map(std::time::Duration::from_secs); + let mut packets: u64 = 0; + + // Use a small runtime to bridge async broadcast → blocking writer. + let rt = tokio::runtime::Handle::current(); + + loop { + // Check stop signal. + if *stop_rx.borrow() { + break; + } + + // Check max duration. + if let Some(max) = max_dur { + if start.elapsed() >= max { + info!("Recorder [{key}]: max duration reached"); + break; + } + } + + // Receive next Opus packet (blocking in spawn_blocking context). + let packet = rt.block_on(async { + tokio::select! { + result = rx.recv() => Some(result), + _ = stop_rx.changed() => None, + } + }); + + match packet { + Some(Ok(data)) => { + if let Err(e) = writer.write_audio_packet(&data) { + error!("Recorder [{key}]: write error: {e}"); + break; + } + packets += 1; + } + Some(Err(broadcast::error::RecvError::Lagged(n))) => { + warn!("Recorder [{key}]: dropped {n} packets (lag)"); + // Continue recording despite lag. + } + Some(Err(broadcast::error::RecvError::Closed)) => { + info!("Recorder [{key}]: audio channel closed"); + break; + } + None => { + // Stop signal received. + break; + } + } + } + + let duration_secs = start.elapsed().as_secs_f64(); + let bytes_written = match writer.finalize() { + Ok(n) => n, + Err(e) => { + error!("Recorder [{key}]: finalize error: {e}"); + 0 + } + }; + + info!( + "Recorder [{key}]: stopped — {packets} packets, {duration_secs:.1}s, {} bytes", + bytes_written + ); + + Some(RecordingResult { + key: key.to_string(), + path: path.to_string_lossy().into_owned(), + duration_secs, + bytes_written, + }) +} + +// ============================================================================ +// Helpers +// ============================================================================ + +/// Format a Unix timestamp as `YYYY-MM-DD_HH-MM-SS`. +fn chrono_timestamp(epoch_secs: u64) -> String { + let secs = epoch_secs; + let days = secs / 86400; + let time = secs % 86400; + let hours = time / 3600; + let minutes = (time % 3600) / 60; + let seconds = time % 60; + + // Simple Gregorian calendar calculation from epoch days. + let (y, m, d) = epoch_days_to_ymd(days as i64); + format!("{y:04}-{m:02}-{d:02}_{hours:02}-{minutes:02}-{seconds:02}") +} + +fn epoch_days_to_ymd(days: i64) -> (i32, u32, u32) { + // Algorithm from http://howardhinnant.github.io/date_algorithms.html + let z = days + 719468; + let era = if z >= 0 { z } else { z - 146096 } / 146097; + let doe = (z - era * 146097) as u32; + let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365; + let y = yoe as i64 + era * 400; + let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); + let mp = (5 * doy + 2) / 153; + let d = doy - (153 * mp + 2) / 5 + 1; + let m = if mp < 10 { mp + 3 } else { mp - 9 }; + let y = if m <= 2 { y + 1 } else { y }; + (y as i32, m, d) +} diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/scheduler.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/scheduler.rs index 443e703..3170125 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/scheduler.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/scheduler.rs @@ -86,6 +86,9 @@ pub struct ScheduleEntry { /// frontend can allocate the corresponding virtual channels on connect. #[serde(default)] pub bookmark_ids: Vec, + /// Whether to auto-record audio when this entry is active. + #[serde(default)] + pub record: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -102,6 +105,9 @@ pub struct SatelliteEntry { pub center_hz: Option, #[serde(default)] pub bookmark_ids: Vec, + /// Whether to auto-record audio when this satellite pass is active. + #[serde(default)] + pub record: bool, } fn default_min_elevation() -> f64 { @@ -619,6 +625,7 @@ struct AppliedTarget { center_hz: Option, extra_bookmark_ids: Vec, satellite: Option, + record: bool, } #[derive(Debug, Clone, Serialize, Default)] @@ -692,6 +699,7 @@ pub fn spawn_scheduler_task( bookmarks: Arc, status_map: SchedulerStatusMap, control: SharedSchedulerControlManager, + recorder_mgr: Option>, ) { tokio::spawn(async move { let mut interval = time::interval(Duration::from_secs(30)); @@ -733,6 +741,7 @@ pub fn spawn_scheduler_task( center_hz: sat_target.center_hz, extra_bookmark_ids: sat_target.extra_bm_ids.clone(), satellite: Some(sat_target.satellite.clone()), + record: sat_target.record, }; if last_applied.get(&config.remote) == Some(&target) { @@ -773,6 +782,18 @@ pub fn spawn_scheduler_task( continue; } + // Manage scheduler-driven recording on target transition. + if let Some(ref mgr) = recorder_mgr { + manage_scheduler_recording( + mgr, + &context, + &config.remote, + last_applied.get(&config.remote), + &target, + ) + .await; + } + last_applied.insert(config.remote.clone(), target); continue; } @@ -797,7 +818,7 @@ pub fn spawn_scheduler_task( continue; } - let (entry_id, bm_id, center_hz, extra_bm_ids) = match &config.mode { + let (entry_id, bm_id, center_hz, extra_bm_ids, entry_record) = match &config.mode { SchedulerMode::Disabled => continue, SchedulerMode::Grayline => { let Some(bm_id) = config @@ -807,7 +828,7 @@ pub fn spawn_scheduler_task( else { continue; }; - (None, bm_id, None, Vec::new()) + (None, bm_id, None, Vec::new(), false) } SchedulerMode::TimeSpan => { let Some(entry) = @@ -820,6 +841,7 @@ pub fn spawn_scheduler_task( entry.bookmark_id.clone(), entry.center_hz, entry.bookmark_ids.clone(), + entry.record, ) } }; @@ -829,6 +851,7 @@ pub fn spawn_scheduler_task( center_hz, extra_bookmark_ids: extra_bm_ids.clone(), satellite: None, + record: entry_record, }; // Already at this exact scheduled target — skip. @@ -869,6 +892,18 @@ pub fn spawn_scheduler_task( continue; } + // Manage scheduler-driven recording on target transition. + if let Some(ref mgr) = recorder_mgr { + manage_scheduler_recording( + mgr, + &context, + &config.remote, + last_applied.get(&config.remote), + &target, + ) + .await; + } + last_applied.insert(config.remote.clone(), target); } } @@ -885,6 +920,7 @@ struct SatelliteTarget { bookmark_id: String, center_hz: Option, extra_bm_ids: Vec, + record: bool, } /// Check if any configured satellite has an active pass right now. @@ -957,6 +993,7 @@ fn find_active_satellite_target( bookmark_id: entry.bookmark_id.clone(), center_hz: entry.center_hz, extra_bm_ids: entry.bookmark_ids.clone(), + record: entry.record, }) } @@ -1082,6 +1119,83 @@ async fn scheduler_send( Ok(()) } +// ============================================================================ +// Scheduler-driven recording +// ============================================================================ + +/// Manage recording state when the scheduler transitions between targets. +/// +/// Stops any existing scheduler recording for the rig, then starts a new one +/// if the new target has `record: true`. +async fn manage_scheduler_recording( + mgr: &super::recorder::RecorderManager, + context: &FrontendRuntimeContext, + remote: &str, + prev: Option<&AppliedTarget>, + next: &AppliedTarget, +) { + // Stop any existing scheduler recording for this rig. + let was_recording = prev.is_some_and(|t| t.record); + if was_recording && mgr.is_recording(remote, None) { + match mgr.stop(remote, None).await { + Ok(result) => { + info!( + "scheduler: stopped recording for '{}' — {:.1}s, {} bytes", + remote, result.duration_secs, result.bytes_written + ); + } + Err(e) => { + warn!("scheduler: failed to stop recording for '{}': {e}", remote); + } + } + } + + // Start recording if the new target requests it. + if next.record { + let audio_tx = context + .rig_audio + .rx + .read() + .ok() + .and_then(|map| map.get(remote).cloned()) + .or_else(|| context.audio.rx.clone()); + + if let Some(tx) = audio_tx { + let (sr, ch, fd) = stream_info(context, remote); + let params = super::recorder::AudioParams { + sample_rate: sr, + channels: ch, + frame_duration_ms: fd, + }; + match mgr.start(remote, None, tx, params, None, None) { + Ok(info) => { + info!( + "scheduler: started recording for '{}' → {}", + remote, info.path + ); + } + Err(e) => { + warn!("scheduler: failed to start recording for '{}': {e}", remote); + } + } + } + } +} + +fn stream_info(context: &FrontendRuntimeContext, rig_id: &str) -> (u32, u8, u16) { + if let Some(rx) = context.rig_audio_info_rx(rig_id) { + if let Some(info) = rx.borrow().as_ref() { + return (info.sample_rate, info.channels, info.frame_duration_ms); + } + } + if let Some(ref info_rx) = context.audio.info { + if let Some(info) = info_rx.borrow().as_ref() { + return (info.sample_rate, info.channels, info.frame_duration_ms); + } + } + (48000, 2, 20) +} + // ============================================================================ // HTTP handlers // ============================================================================ @@ -1264,6 +1378,7 @@ mod tests { interleave_min, center_hz, bookmark_ids: Vec::new(), + record: false, } } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs index 9caa8be..daff87f 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs @@ -12,6 +12,8 @@ pub mod auth; pub mod background_decode; #[path = "bookmarks.rs"] pub mod bookmarks; +#[path = "recorder.rs"] +pub mod recorder; #[path = "scheduler.rs"] pub mod scheduler; #[path = "status.rs"] @@ -40,6 +42,7 @@ use trx_frontend::{FrontendRuntimeContext, FrontendSpawner}; use auth::{AuthConfig, AuthState, SameSite}; use background_decode::{BackgroundDecodeManager, BackgroundDecodeStore}; +use recorder::{RecorderConfig, RecorderManager}; use scheduler::{SchedulerControlManager, SchedulerStatusMap, SchedulerStoreMap}; use vchan::ClientChannelManager; @@ -86,6 +89,9 @@ async fn serve( let scheduler_status: SchedulerStatusMap = Arc::new(RwLock::new(HashMap::new())); let scheduler_control = Arc::new(SchedulerControlManager::default()); + let recorder_config = RecorderConfig::default(); + let recorder_mgr = Arc::new(RecorderManager::new(recorder_config)); + scheduler::spawn_scheduler_task( context.clone(), rig_tx.clone(), @@ -93,6 +99,7 @@ async fn serve( bookmark_store_map.clone(), scheduler_status.clone(), scheduler_control.clone(), + Some(recorder_mgr.clone()), ); let background_decode_path = BackgroundDecodeStore::default_path(); @@ -151,6 +158,7 @@ async fn serve( vchan_mgr, session_rig_mgr, background_decode_mgr, + recorder_mgr, )?; let handle = server.handle(); tokio::spawn(async move { @@ -177,6 +185,7 @@ fn build_server( vchan_mgr: Arc, session_rig_mgr: Arc, background_decode_mgr: Arc, + recorder_mgr: Arc, ) -> Result { let state_data = web::Data::new(state_rx); let rig_tx = web::Data::new(rig_tx); @@ -192,6 +201,7 @@ fn build_server( let vchan_mgr = web::Data::new(vchan_mgr); let session_rig_mgr = web::Data::new(session_rig_mgr); let background_decode_mgr = web::Data::new(background_decode_mgr); + let recorder_mgr = web::Data::new(recorder_mgr); // Extract auth config values before moving context let same_site = match context.http_auth.cookie_same_site.as_str() { @@ -248,6 +258,7 @@ fn build_server( .app_data(vchan_mgr.clone()) .app_data(session_rig_mgr.clone()) .app_data(background_decode_mgr.clone()) + .app_data(recorder_mgr.clone()) .wrap(Compress::default()) .wrap( DefaultHeaders::new() diff --git a/src/trx-core/src/rig/command.rs b/src/trx-core/src/rig/command.rs index a086d27..1c8b4a3 100644 --- a/src/trx-core/src/rig/command.rs +++ b/src/trx-core/src/rig/command.rs @@ -51,5 +51,6 @@ pub enum RigCommand { SetWfmDenoise(WfmDenoiseLevel), SetSamStereoWidth(f32), SetSamCarrierSync(bool), + SetRecorderEnabled(bool), GetSpectrum, } diff --git a/src/trx-core/src/rig/controller/handlers.rs b/src/trx-core/src/rig/controller/handlers.rs index 52515ee..19094ef 100644 --- a/src/trx-core/src/rig/controller/handlers.rs +++ b/src/trx-core/src/rig/controller/handlers.rs @@ -471,6 +471,7 @@ pub fn command_from_rig_command(cmd: RigCommand) -> Box { | RigCommand::SetWfmDenoise(_) | RigCommand::SetSamStereoWidth(_) | RigCommand::SetSamCarrierSync(_) + | RigCommand::SetRecorderEnabled(_) | RigCommand::GetSpectrum => Box::new(GetSnapshotCommand), } } diff --git a/src/trx-core/src/rig/state.rs b/src/trx-core/src/rig/state.rs index e42626b..743013e 100644 --- a/src/trx-core/src/rig/state.rs +++ b/src/trx-core/src/rig/state.rs @@ -30,6 +30,8 @@ pub struct DecoderConfig { pub wspr_decode_enabled: bool, #[serde(default)] pub lrpt_decode_enabled: bool, + #[serde(default)] + pub recorder_enabled: bool, } /// Decoder reset sequence counters for invalidating decoder windows. diff --git a/src/trx-protocol/src/mapping.rs b/src/trx-protocol/src/mapping.rs index 97759ec..925d1a2 100644 --- a/src/trx-protocol/src/mapping.rs +++ b/src/trx-protocol/src/mapping.rs @@ -146,7 +146,8 @@ define_command_mapping! { SetWfmStereo { enabled } <=> SetWfmStereo, SetWfmDenoise { level } <=> SetWfmDenoise, SetSamStereoWidth { width } <=> SetSamStereoWidth, - SetSamCarrierSync { enabled } <=> SetSamCarrierSync; + SetSamCarrierSync { enabled } <=> SetSamCarrierSync, + SetRecorderEnabled { enabled } <=> SetRecorderEnabled; // ── Multi-field struct passthrough ─────────────────────────────── multi: @@ -672,4 +673,37 @@ mod tests { panic!("Round trip failed"); } } + + #[test] + fn test_client_command_to_rig_set_recorder_enabled() { + let cmd = ClientCommand::SetRecorderEnabled { enabled: true }; + if let RigCommand::SetRecorderEnabled(enabled) = client_command_to_rig(cmd) { + assert!(enabled); + } else { + panic!("Expected SetRecorderEnabled"); + } + } + + #[test] + fn test_rig_command_to_client_set_recorder_enabled() { + let cmd = RigCommand::SetRecorderEnabled(true); + if let ClientCommand::SetRecorderEnabled { enabled } = rig_command_to_client(cmd) { + assert!(enabled); + } else { + panic!("Expected SetRecorderEnabled"); + } + } + + #[test] + fn test_round_trip_set_recorder_enabled() { + let original = ClientCommand::SetRecorderEnabled { enabled: false }; + let rig_cmd = client_command_to_rig(original); + let client_cmd = rig_command_to_client(rig_cmd); + + if let ClientCommand::SetRecorderEnabled { enabled } = client_cmd { + assert!(!enabled); + } else { + panic!("Round trip failed"); + } + } } diff --git a/src/trx-protocol/src/types.rs b/src/trx-protocol/src/types.rs index 8cab418..6cfd1a4 100644 --- a/src/trx-protocol/src/types.rs +++ b/src/trx-protocol/src/types.rs @@ -57,6 +57,7 @@ pub enum ClientCommand { SetWfmDenoise { level: WfmDenoiseLevel }, SetSamStereoWidth { width: f32 }, SetSamCarrierSync { enabled: bool }, + SetRecorderEnabled { enabled: bool }, GetSpectrum, } diff --git a/src/trx-server/src/rig_task.rs b/src/trx-server/src/rig_task.rs index 7fc2b48..30d7487 100644 --- a/src/trx-server/src/rig_task.rs +++ b/src/trx-server/src/rig_task.rs @@ -589,6 +589,12 @@ async fn process_command( let _ = ctx.state_tx.send(ctx.state.clone()); return snapshot_from(ctx.state); } + RigCommand::SetRecorderEnabled(en) => { + ctx.state.decoders.recorder_enabled = en; + info!("Recorder {}", if en { "enabled" } else { "disabled" }); + let _ = ctx.state_tx.send(ctx.state.clone()); + return snapshot_from(ctx.state); + } RigCommand::ResetLrptDecoder => { ctx.histories.clear_lrpt_history(); ctx.state.reset_seqs.lrpt_decode_reset_seq += 1;