[feat](trx-frontend-http): add audio WebSocket endpoint with auto-PTT
Add /audio WebSocket endpoint that streams RX Opus frames to the browser and accepts TX frames back. Browser UI includes RX/TX Audio toggle buttons with WebCodecs Opus decode/encode and a level indicator. TX audio automatically engages PTT on start and releases on stop or WebSocket disconnect. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
@@ -15,6 +15,7 @@ serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
actix-web = "=4.4.1"
|
||||
actix-ws = "0.3"
|
||||
tokio-stream = { version = "0.1", features = ["sync"] }
|
||||
futures-util = "0.3"
|
||||
bytes = "1"
|
||||
|
||||
@@ -167,6 +167,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||
.service(set_mode)
|
||||
.service(set_ptt)
|
||||
.service(set_tx_limit)
|
||||
.service(crate::server::audio::audio_ws)
|
||||
.service(favicon)
|
||||
.service(logo);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,118 @@
|
||||
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
|
||||
//
|
||||
// SPDX-License-Identifier: BSD-2-Clause
|
||||
|
||||
//! Audio WebSocket endpoint for the HTTP frontend.
|
||||
//!
|
||||
//! Exposes `/audio` which upgrades to a WebSocket:
|
||||
//! - First text message: JSON `AudioStreamInfo`
|
||||
//! - Subsequent binary messages: raw Opus packets (RX)
|
||||
//! - Browser sends binary messages: raw Opus packets (TX)
|
||||
|
||||
use std::sync::{Mutex, OnceLock};
|
||||
|
||||
use actix_web::{get, web, Error, HttpRequest, HttpResponse};
|
||||
use actix_ws::Message;
|
||||
use bytes::Bytes;
|
||||
use tokio::sync::{broadcast, mpsc, watch};
|
||||
use tracing::warn;
|
||||
|
||||
use trx_core::audio::AudioStreamInfo;
|
||||
|
||||
struct AudioChannels {
|
||||
rx: broadcast::Sender<Bytes>,
|
||||
tx: mpsc::Sender<Bytes>,
|
||||
info: watch::Receiver<Option<AudioStreamInfo>>,
|
||||
}
|
||||
|
||||
fn audio_channels() -> &'static Mutex<Option<AudioChannels>> {
|
||||
static CHANNELS: OnceLock<Mutex<Option<AudioChannels>>> = OnceLock::new();
|
||||
CHANNELS.get_or_init(|| Mutex::new(None))
|
||||
}
|
||||
|
||||
/// Set the audio channels from the client main. Must be called before the
|
||||
/// HTTP server starts if audio is enabled.
|
||||
pub fn set_audio_channels(
|
||||
rx: broadcast::Sender<Bytes>,
|
||||
tx: mpsc::Sender<Bytes>,
|
||||
info: watch::Receiver<Option<AudioStreamInfo>>,
|
||||
) {
|
||||
let mut ch = audio_channels()
|
||||
.lock()
|
||||
.expect("audio channels mutex poisoned");
|
||||
*ch = Some(AudioChannels { rx, tx, info });
|
||||
}
|
||||
|
||||
#[get("/audio")]
|
||||
pub async fn audio_ws(req: HttpRequest, body: web::Payload) -> Result<HttpResponse, Error> {
|
||||
let channels = audio_channels().lock().expect("audio channels mutex poisoned");
|
||||
let Some(ref ch) = *channels else {
|
||||
return Ok(HttpResponse::NotFound().body("audio not enabled"));
|
||||
};
|
||||
|
||||
let mut rx_sub = ch.rx.subscribe();
|
||||
let tx_sender = ch.tx.clone();
|
||||
let mut info_rx = ch.info.clone();
|
||||
drop(channels);
|
||||
|
||||
let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
|
||||
|
||||
// Spawn the WebSocket handler
|
||||
actix_web::rt::spawn(async move {
|
||||
// Wait for stream info and send as first text message
|
||||
let info = loop {
|
||||
if let Some(info) = info_rx.borrow().clone() {
|
||||
break info;
|
||||
}
|
||||
if info_rx.changed().await.is_err() {
|
||||
let _ = session.close(None).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let info_json = match serde_json::to_string(&info) {
|
||||
Ok(j) => j,
|
||||
Err(_) => {
|
||||
let _ = session.close(None).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
if session.text(info_json).await.is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Spawn RX forwarding task
|
||||
let mut rx_session = session.clone();
|
||||
let rx_handle = actix_web::rt::spawn(async move {
|
||||
loop {
|
||||
match rx_sub.recv().await {
|
||||
Ok(packet) => {
|
||||
if rx_session.binary(packet).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
warn!("Audio WS: dropped {} RX frames", n);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Read TX frames from browser
|
||||
while let Some(Ok(msg)) = msg_stream.recv().await {
|
||||
match msg {
|
||||
Message::Binary(data) => {
|
||||
let _ = tx_sender.send(data).await;
|
||||
}
|
||||
Message::Close(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
rx_handle.abort();
|
||||
let _ = session.close(None).await;
|
||||
});
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
@@ -4,6 +4,8 @@
|
||||
|
||||
pub mod server;
|
||||
|
||||
pub use server::audio::set_audio_channels;
|
||||
|
||||
pub fn register_frontend() {
|
||||
use trx_frontend::FrontendSpawner;
|
||||
trx_frontend::register_frontend("http", server::HttpFrontend::spawn_frontend);
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
|
||||
#[path = "api.rs"]
|
||||
mod api;
|
||||
#[path = "audio.rs"]
|
||||
pub mod audio;
|
||||
#[path = "status.rs"]
|
||||
pub mod status;
|
||||
|
||||
|
||||
@@ -123,6 +123,17 @@ const INDEX_HTML_TEMPLATE: &str = r##"<!DOCTYPE html>
|
||||
<small>Units depend on rig (percent/watts).</small>
|
||||
</div>
|
||||
</div>
|
||||
<div class="full-row" id="audio-row">
|
||||
<div class="label">Audio</div>
|
||||
<div class="inline" style="gap: 0.6rem; flex-wrap: wrap; align-items: center;">
|
||||
<button id="rx-audio-btn" type="button">RX Audio</button>
|
||||
<button id="tx-audio-btn" type="button">TX Audio</button>
|
||||
<div id="audio-level" style="flex: 1 1 auto; height: 12px; border-radius: 999px; background: #1f2937; border: 1px solid #2d3748; overflow: hidden; min-width: 80px;">
|
||||
<div id="audio-level-fill" style="height: 100%; width: 0%; background: linear-gradient(90deg, #00d17f, #f0ad4e); transition: width 100ms ease;"></div>
|
||||
</div>
|
||||
<small id="audio-status" style="min-width: 60px;">Off</small>
|
||||
</div>
|
||||
</div>
|
||||
<div class="footer">
|
||||
<div class="hint" id="power-hint">Connecting…</div>
|
||||
</div>
|
||||
@@ -582,6 +593,259 @@ const INDEX_HTML_TEMPLATE: &str = r##"<!DOCTYPE html>
|
||||
});
|
||||
|
||||
connect();
|
||||
|
||||
// --- Audio streaming ---
|
||||
const rxAudioBtn = document.getElementById("rx-audio-btn");
|
||||
const txAudioBtn = document.getElementById("tx-audio-btn");
|
||||
const audioStatus = document.getElementById("audio-status");
|
||||
const audioLevelFill = document.getElementById("audio-level-fill");
|
||||
|
||||
let audioWs = null;
|
||||
let audioCtx = null;
|
||||
let rxActive = false;
|
||||
let txActive = false;
|
||||
let txStream = null;
|
||||
let txProcessor = null;
|
||||
let streamInfo = null;
|
||||
|
||||
// Simple ring-buffer based audio player
|
||||
let playBuffer = [];
|
||||
let playNode = null;
|
||||
|
||||
function startRxAudio() {
|
||||
if (rxActive) { stopRxAudio(); return; }
|
||||
const proto = location.protocol === "https:" ? "wss:" : "ws:";
|
||||
audioWs = new WebSocket(`${proto}//${location.host}/audio`);
|
||||
audioWs.binaryType = "arraybuffer";
|
||||
audioStatus.textContent = "Connecting…";
|
||||
|
||||
audioWs.onopen = () => {
|
||||
audioStatus.textContent = "Connected";
|
||||
};
|
||||
|
||||
audioWs.onmessage = (evt) => {
|
||||
if (typeof evt.data === "string") {
|
||||
// Stream info JSON
|
||||
try {
|
||||
streamInfo = JSON.parse(evt.data);
|
||||
audioCtx = new AudioContext({ sampleRate: streamInfo.sample_rate || 48000 });
|
||||
rxActive = true;
|
||||
rxAudioBtn.style.borderColor = "#00d17f";
|
||||
rxAudioBtn.style.color = "#00d17f";
|
||||
audioStatus.textContent = "RX";
|
||||
} catch (e) {
|
||||
console.error("Audio stream info parse error", e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Binary Opus data — decode via WebCodecs AudioDecoder if available
|
||||
if (!audioCtx) return;
|
||||
const data = new Uint8Array(evt.data);
|
||||
|
||||
// Show level indicator from packet size (rough estimate)
|
||||
const level = Math.min(100, (data.length / 120) * 100);
|
||||
audioLevelFill.style.width = `${level}%`;
|
||||
|
||||
// Use WebCodecs AudioDecoder for Opus if available
|
||||
if (typeof AudioDecoder !== "undefined" && !window._opusDecoder) {
|
||||
try {
|
||||
const channels = (streamInfo && streamInfo.channels) || 1;
|
||||
const sampleRate = (streamInfo && streamInfo.sample_rate) || 48000;
|
||||
window._opusDecoder = new AudioDecoder({
|
||||
output: (frame) => {
|
||||
const buf = new Float32Array(frame.numberOfFrames * frame.numberOfChannels);
|
||||
frame.copyTo(buf, { planeIndex: 0 });
|
||||
const ab = audioCtx.createBuffer(frame.numberOfChannels, frame.numberOfFrames, frame.sampleRate);
|
||||
for (let ch = 0; ch < frame.numberOfChannels; ch++) {
|
||||
const chData = new Float32Array(frame.numberOfFrames);
|
||||
for (let i = 0; i < frame.numberOfFrames; i++) {
|
||||
chData[i] = buf[i * frame.numberOfChannels + ch];
|
||||
}
|
||||
ab.copyToChannel(chData, ch);
|
||||
}
|
||||
const src = audioCtx.createBufferSource();
|
||||
src.buffer = ab;
|
||||
src.connect(audioCtx.destination);
|
||||
const now = audioCtx.currentTime;
|
||||
const schedTime = Math.max(now, (window._nextPlayTime || now));
|
||||
src.start(schedTime);
|
||||
window._nextPlayTime = schedTime + ab.duration;
|
||||
frame.close();
|
||||
},
|
||||
error: (e) => { console.error("AudioDecoder error", e); }
|
||||
});
|
||||
window._opusDecoder.configure({
|
||||
codec: "opus",
|
||||
sampleRate: sampleRate,
|
||||
numberOfChannels: channels,
|
||||
});
|
||||
} catch (e) {
|
||||
console.warn("WebCodecs AudioDecoder not available for Opus", e);
|
||||
window._opusDecoder = null;
|
||||
}
|
||||
}
|
||||
if (window._opusDecoder) {
|
||||
try {
|
||||
window._opusDecoder.decode(new EncodedAudioChunk({
|
||||
type: "key",
|
||||
timestamp: performance.now() * 1000,
|
||||
data: data,
|
||||
}));
|
||||
} catch (e) {
|
||||
// Ignore decode errors for individual frames
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
audioWs.onclose = () => {
|
||||
// If TX was active when WS closed, release PTT
|
||||
if (txActive) { stopTxAudio(); }
|
||||
rxActive = false;
|
||||
rxAudioBtn.style.borderColor = "";
|
||||
rxAudioBtn.style.color = "";
|
||||
audioStatus.textContent = "Off";
|
||||
audioLevelFill.style.width = "0%";
|
||||
if (window._opusDecoder) {
|
||||
try { window._opusDecoder.close(); } catch(e) {}
|
||||
window._opusDecoder = null;
|
||||
}
|
||||
window._nextPlayTime = 0;
|
||||
};
|
||||
|
||||
audioWs.onerror = () => {
|
||||
audioStatus.textContent = "Error";
|
||||
};
|
||||
}
|
||||
|
||||
function stopRxAudio() {
|
||||
rxActive = false;
|
||||
if (audioWs) { audioWs.close(); audioWs = null; }
|
||||
if (audioCtx) { audioCtx.close(); audioCtx = null; }
|
||||
if (window._opusDecoder) {
|
||||
try { window._opusDecoder.close(); } catch(e) {}
|
||||
window._opusDecoder = null;
|
||||
}
|
||||
window._nextPlayTime = 0;
|
||||
rxAudioBtn.style.borderColor = "";
|
||||
rxAudioBtn.style.color = "";
|
||||
audioStatus.textContent = "Off";
|
||||
audioLevelFill.style.width = "0%";
|
||||
}
|
||||
|
||||
function startTxAudio() {
|
||||
if (txActive) { stopTxAudio(); return; }
|
||||
if (!audioWs || audioWs.readyState !== WebSocket.OPEN) {
|
||||
audioStatus.textContent = "RX first";
|
||||
return;
|
||||
}
|
||||
if (!streamInfo) return;
|
||||
|
||||
navigator.mediaDevices.getUserMedia({
|
||||
audio: { sampleRate: streamInfo.sample_rate || 48000, channelCount: streamInfo.channels || 1 }
|
||||
}).then(async (stream) => {
|
||||
txStream = stream;
|
||||
txActive = true;
|
||||
txAudioBtn.style.borderColor = "#e55353";
|
||||
txAudioBtn.style.color = "#e55353";
|
||||
audioStatus.textContent = "RX+TX";
|
||||
|
||||
// Engage PTT automatically
|
||||
try { await postPath("/set_ptt?ptt=true"); } catch (e) { console.error("PTT on failed", e); }
|
||||
|
||||
// If WebCodecs AudioEncoder is available, use it for Opus encoding
|
||||
if (typeof AudioEncoder !== "undefined") {
|
||||
const sampleRate = streamInfo.sample_rate || 48000;
|
||||
const channels = streamInfo.channels || 1;
|
||||
const encoder = new AudioEncoder({
|
||||
output: (chunk) => {
|
||||
const buf = new ArrayBuffer(chunk.byteLength);
|
||||
chunk.copyTo(buf);
|
||||
if (audioWs && audioWs.readyState === WebSocket.OPEN) {
|
||||
audioWs.send(buf);
|
||||
}
|
||||
},
|
||||
error: (e) => { console.error("AudioEncoder error", e); }
|
||||
});
|
||||
encoder.configure({
|
||||
codec: "opus",
|
||||
sampleRate: sampleRate,
|
||||
numberOfChannels: channels,
|
||||
bitrate: (streamInfo.bitrate_bps || 24000),
|
||||
});
|
||||
window._txEncoder = encoder;
|
||||
|
||||
// Use AudioWorklet or ScriptProcessor to feed encoder
|
||||
if (!audioCtx) audioCtx = new AudioContext({ sampleRate: sampleRate });
|
||||
const source = audioCtx.createMediaStreamSource(stream);
|
||||
const frameDuration = (streamInfo.frame_duration_ms || 20) / 1000;
|
||||
const frameSize = Math.floor(sampleRate * frameDuration);
|
||||
// Use ScriptProcessorNode (deprecated but widely supported)
|
||||
const processor = audioCtx.createScriptProcessor(frameSize, channels, channels);
|
||||
let tsCounter = 0;
|
||||
processor.onaudioprocess = (e) => {
|
||||
if (!txActive || !window._txEncoder) return;
|
||||
const input = e.inputBuffer;
|
||||
const data = new Float32Array(input.length * input.numberOfChannels);
|
||||
for (let ch = 0; ch < input.numberOfChannels; ch++) {
|
||||
const chData = input.getChannelData(ch);
|
||||
for (let i = 0; i < input.length; i++) {
|
||||
data[i * input.numberOfChannels + ch] = chData[i];
|
||||
}
|
||||
}
|
||||
try {
|
||||
const frame = new AudioData({
|
||||
format: "f32-planar",
|
||||
sampleRate: input.sampleRate,
|
||||
numberOfFrames: input.length,
|
||||
numberOfChannels: input.numberOfChannels,
|
||||
timestamp: tsCounter,
|
||||
data: input.getChannelData(0),
|
||||
});
|
||||
tsCounter += (input.length / input.sampleRate) * 1_000_000;
|
||||
window._txEncoder.encode(frame);
|
||||
frame.close();
|
||||
} catch (e) {
|
||||
// Ignore
|
||||
}
|
||||
};
|
||||
source.connect(processor);
|
||||
processor.connect(audioCtx.destination);
|
||||
txProcessor = { source, processor };
|
||||
}
|
||||
}).catch((err) => {
|
||||
console.error("getUserMedia failed:", err);
|
||||
audioStatus.textContent = "Mic denied";
|
||||
});
|
||||
}
|
||||
|
||||
async function stopTxAudio() {
|
||||
if (!txActive) return;
|
||||
txActive = false;
|
||||
|
||||
// Release PTT automatically
|
||||
try { await postPath("/set_ptt?ptt=false"); } catch (e) { console.error("PTT off failed", e); }
|
||||
|
||||
if (txStream) {
|
||||
txStream.getTracks().forEach(t => t.stop());
|
||||
txStream = null;
|
||||
}
|
||||
if (txProcessor) {
|
||||
txProcessor.source.disconnect();
|
||||
txProcessor.processor.disconnect();
|
||||
txProcessor = null;
|
||||
}
|
||||
if (window._txEncoder) {
|
||||
try { window._txEncoder.close(); } catch(e) {}
|
||||
window._txEncoder = null;
|
||||
}
|
||||
txAudioBtn.style.borderColor = "";
|
||||
txAudioBtn.style.color = "";
|
||||
audioStatus.textContent = rxActive ? "RX" : "Off";
|
||||
}
|
||||
|
||||
rxAudioBtn.addEventListener("click", startRxAudio);
|
||||
txAudioBtn.addEventListener("click", startTxAudio);
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
|
||||
Reference in New Issue
Block a user