[feat](trx-server): support hidden background decode channels
Co-authored-by: OpenAI Codex <codex@openai.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
+366
-54
@@ -1769,11 +1769,210 @@ enum VChanCmd {
|
|||||||
Subscribe {
|
Subscribe {
|
||||||
uuid: Uuid,
|
uuid: Uuid,
|
||||||
pcm_rx: tokio::sync::broadcast::Receiver<Vec<f32>>,
|
pcm_rx: tokio::sync::broadcast::Receiver<Vec<f32>>,
|
||||||
|
send_audio: bool,
|
||||||
|
background_decode: Option<BackgroundDecodeSpec>,
|
||||||
},
|
},
|
||||||
/// Stop forwarding audio for the given channel.
|
/// Stop forwarding audio for the given channel.
|
||||||
Unsubscribe(Uuid),
|
Unsubscribe(Uuid),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
struct BackgroundDecodeSpec {
|
||||||
|
base_freq_hz: u64,
|
||||||
|
decoder_kinds: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_background_hf_aprs_decoder(
|
||||||
|
sample_rate: u32,
|
||||||
|
channels: u16,
|
||||||
|
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
|
||||||
|
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||||
|
) {
|
||||||
|
info!(
|
||||||
|
"Background HF APRS decoder started ({}Hz, {} ch)",
|
||||||
|
sample_rate, channels
|
||||||
|
);
|
||||||
|
let mut decoder = AprsDecoder::new_hf(sample_rate);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match pcm_rx.recv().await {
|
||||||
|
Ok(frame) => {
|
||||||
|
let mut mono = downmix_if_needed(frame, channels);
|
||||||
|
apply_decode_audio_gate(&mut mono);
|
||||||
|
for mut pkt in decoder.process_samples(&mono) {
|
||||||
|
if !pkt.crc_ok {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if pkt.ts_ms.is_none() {
|
||||||
|
pkt.ts_ms = Some(current_timestamp_ms());
|
||||||
|
}
|
||||||
|
let _ = decode_tx.send(DecodedMessage::HfAprs(pkt));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
warn!("Background HF APRS decoder: dropped {} PCM frames", n);
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Closed) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_background_ft8_decoder(
|
||||||
|
sample_rate: u32,
|
||||||
|
channels: u16,
|
||||||
|
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
|
||||||
|
base_freq_hz: u64,
|
||||||
|
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||||
|
) {
|
||||||
|
info!(
|
||||||
|
"Background FT8 decoder started ({}Hz, {} ch @ {} Hz)",
|
||||||
|
sample_rate, channels, base_freq_hz
|
||||||
|
);
|
||||||
|
let mut decoder = match Ft8Decoder::new(FT8_SAMPLE_RATE) {
|
||||||
|
Ok(decoder) => decoder,
|
||||||
|
Err(err) => {
|
||||||
|
warn!("Background FT8 decoder init failed: {}", err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let mut ft8_buf: Vec<f32> = Vec::new();
|
||||||
|
let mut last_slot: i64 = -1;
|
||||||
|
let slot_len_s: i64 = 15;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match pcm_rx.recv().await {
|
||||||
|
Ok(frame) => {
|
||||||
|
let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
|
||||||
|
{
|
||||||
|
Ok(dur) => dur.as_secs() as i64,
|
||||||
|
Err(_) => 0,
|
||||||
|
};
|
||||||
|
let slot = now / slot_len_s;
|
||||||
|
if slot != last_slot {
|
||||||
|
last_slot = slot;
|
||||||
|
decoder.reset();
|
||||||
|
ft8_buf.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut mono = downmix_mono(frame, channels);
|
||||||
|
apply_decode_audio_gate(&mut mono);
|
||||||
|
let Some(resampled) = resample_to_12k(&mono, sample_rate) else {
|
||||||
|
warn!(
|
||||||
|
"Background FT8 decoder: unsupported sample rate {}",
|
||||||
|
sample_rate
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
ft8_buf.extend_from_slice(&resampled);
|
||||||
|
|
||||||
|
while ft8_buf.len() >= decoder.block_size() {
|
||||||
|
let block: Vec<f32> = ft8_buf.drain(..decoder.block_size()).collect();
|
||||||
|
decoder.process_block(&block);
|
||||||
|
let results = decoder.decode_if_ready(100);
|
||||||
|
for res in results {
|
||||||
|
let abs_freq_hz = base_freq_hz as f64 + res.freq_hz as f64;
|
||||||
|
let msg = Ft8Message {
|
||||||
|
ts_ms: current_timestamp_ms(),
|
||||||
|
snr_db: res.snr_db,
|
||||||
|
dt_s: res.dt_s,
|
||||||
|
freq_hz: if abs_freq_hz.is_finite() && abs_freq_hz > 0.0 {
|
||||||
|
abs_freq_hz as f32
|
||||||
|
} else {
|
||||||
|
res.freq_hz
|
||||||
|
},
|
||||||
|
message: res.text,
|
||||||
|
};
|
||||||
|
let _ = decode_tx.send(DecodedMessage::Ft8(msg));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
warn!("Background FT8 decoder: dropped {} PCM frames", n);
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Closed) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_background_wspr_decoder(
|
||||||
|
sample_rate: u32,
|
||||||
|
channels: u16,
|
||||||
|
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
|
||||||
|
base_freq_hz: u64,
|
||||||
|
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||||
|
) {
|
||||||
|
info!(
|
||||||
|
"Background WSPR decoder started ({}Hz, {} ch @ {} Hz)",
|
||||||
|
sample_rate, channels, base_freq_hz
|
||||||
|
);
|
||||||
|
let decoder = match WsprDecoder::new() {
|
||||||
|
Ok(decoder) => decoder,
|
||||||
|
Err(err) => {
|
||||||
|
warn!("Background WSPR decoder init failed: {}", err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let mut slot_buf: Vec<f32> = Vec::new();
|
||||||
|
let mut last_slot: i64 = -1;
|
||||||
|
let slot_len_s: i64 = 120;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match pcm_rx.recv().await {
|
||||||
|
Ok(frame) => {
|
||||||
|
let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
|
||||||
|
{
|
||||||
|
Ok(dur) => dur.as_secs() as i64,
|
||||||
|
Err(_) => 0,
|
||||||
|
};
|
||||||
|
let slot = now / slot_len_s;
|
||||||
|
if last_slot == -1 {
|
||||||
|
last_slot = slot;
|
||||||
|
} else if slot != last_slot {
|
||||||
|
match decoder.decode_slot(&slot_buf, Some(base_freq_hz)) {
|
||||||
|
Ok(results) => {
|
||||||
|
for res in results {
|
||||||
|
let msg = WsprMessage {
|
||||||
|
ts_ms: current_timestamp_ms(),
|
||||||
|
snr_db: res.snr_db,
|
||||||
|
dt_s: res.dt_s,
|
||||||
|
freq_hz: res.freq_hz,
|
||||||
|
message: res.message,
|
||||||
|
};
|
||||||
|
let _ = decode_tx.send(DecodedMessage::Wspr(msg));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => warn!("Background WSPR decode failed: {}", err),
|
||||||
|
}
|
||||||
|
slot_buf.clear();
|
||||||
|
last_slot = slot;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut mono = downmix_mono(frame, channels);
|
||||||
|
apply_decode_audio_gate(&mut mono);
|
||||||
|
let Some(resampled) = resample_to_12k(&mono, sample_rate) else {
|
||||||
|
warn!(
|
||||||
|
"Background WSPR decoder: unsupported sample rate {}",
|
||||||
|
sample_rate
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
slot_buf.extend_from_slice(&resampled);
|
||||||
|
if slot_buf.len() > decoder.slot_samples() {
|
||||||
|
let keep = decoder.slot_samples();
|
||||||
|
let drain = slot_buf.len().saturating_sub(keep);
|
||||||
|
if drain > 0 {
|
||||||
|
slot_buf.drain(..drain);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
warn!("Background WSPR decoder: dropped {} PCM frames", n);
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Closed) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Run the audio TCP listener, accepting client connections.
|
/// Run the audio TCP listener, accepting client connections.
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub async fn run_audio_listener(
|
pub async fn run_audio_listener(
|
||||||
@@ -1936,6 +2135,7 @@ async fn handle_audio_client(
|
|||||||
let (vchan_frame_tx, mut vchan_frame_rx) = mpsc::channel::<(Uuid, Bytes)>(256);
|
let (vchan_frame_tx, mut vchan_frame_rx) = mpsc::channel::<(Uuid, Bytes)>(256);
|
||||||
// Commands from the reader loop: Subscribe / Unsubscribe.
|
// Commands from the reader loop: Subscribe / Unsubscribe.
|
||||||
let (vchan_cmd_tx, mut vchan_cmd_rx) = mpsc::channel::<VChanCmd>(32);
|
let (vchan_cmd_tx, mut vchan_cmd_rx) = mpsc::channel::<VChanCmd>(32);
|
||||||
|
let (bg_decode_tx, mut bg_decode_rx) = broadcast::channel::<DecodedMessage>(128);
|
||||||
|
|
||||||
let opus_sample_rate = stream_info.sample_rate;
|
let opus_sample_rate = stream_info.sample_rate;
|
||||||
let opus_channels = stream_info.channels;
|
let opus_channels = stream_info.channels;
|
||||||
@@ -1945,8 +2145,8 @@ async fn handle_audio_client(
|
|||||||
vchan_manager.as_ref().map(|m| m.subscribe_destroyed());
|
vchan_manager.as_ref().map(|m| m.subscribe_destroyed());
|
||||||
|
|
||||||
let rx_handle = tokio::spawn(async move {
|
let rx_handle = tokio::spawn(async move {
|
||||||
// UUID → JoinHandle of per-channel Opus encoder task.
|
// UUID → JoinHandles of per-channel encoder/decoder tasks.
|
||||||
let mut vchan_tasks: std::collections::HashMap<Uuid, tokio::task::JoinHandle<()>> =
|
let mut vchan_tasks: std::collections::HashMap<Uuid, Vec<tokio::task::JoinHandle<()>>> =
|
||||||
std::collections::HashMap::new();
|
std::collections::HashMap::new();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
@@ -1990,6 +2190,31 @@ async fn handle_audio_client(
|
|||||||
Err(broadcast::error::RecvError::Closed) => break,
|
Err(broadcast::error::RecvError::Closed) => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
result = bg_decode_rx.recv() => {
|
||||||
|
match result {
|
||||||
|
Ok(msg) => {
|
||||||
|
let msg_type = match &msg {
|
||||||
|
DecodedMessage::Ais(_) => AUDIO_MSG_AIS_DECODE,
|
||||||
|
DecodedMessage::Vdes(_) => AUDIO_MSG_VDES_DECODE,
|
||||||
|
DecodedMessage::Aprs(_) => AUDIO_MSG_APRS_DECODE,
|
||||||
|
DecodedMessage::HfAprs(_) => AUDIO_MSG_HF_APRS_DECODE,
|
||||||
|
DecodedMessage::Cw(_) => AUDIO_MSG_CW_DECODE,
|
||||||
|
DecodedMessage::Ft8(_) => AUDIO_MSG_FT8_DECODE,
|
||||||
|
DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE,
|
||||||
|
};
|
||||||
|
if let Ok(json) = serde_json::to_vec(&msg) {
|
||||||
|
if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await {
|
||||||
|
warn!("Audio background decode write to {} failed: {}", peer, e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
warn!("Audio background decode: {} dropped {} messages", peer, n);
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Closed) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
// Virtual-channel audio frame produced by a per-channel encoder task.
|
// Virtual-channel audio frame produced by a per-channel encoder task.
|
||||||
Some((uuid, opus)) = vchan_frame_rx.recv() => {
|
Some((uuid, opus)) = vchan_frame_rx.recv() => {
|
||||||
if let Err(e) = write_vchan_audio_frame(&mut writer_for_rx, uuid, opus.as_ref()).await {
|
if let Err(e) = write_vchan_audio_frame(&mut writer_for_rx, uuid, opus.as_ref()).await {
|
||||||
@@ -2000,56 +2225,109 @@ async fn handle_audio_client(
|
|||||||
// Commands from reader loop: subscribe / unsubscribe.
|
// Commands from reader loop: subscribe / unsubscribe.
|
||||||
Some(cmd) = vchan_cmd_rx.recv() => {
|
Some(cmd) = vchan_cmd_rx.recv() => {
|
||||||
match cmd {
|
match cmd {
|
||||||
VChanCmd::Subscribe { uuid, pcm_rx } => {
|
VChanCmd::Subscribe { uuid, pcm_rx, send_audio, background_decode } => {
|
||||||
// Spin up an async Opus encoder task for this virtual channel.
|
let mut handles = Vec::new();
|
||||||
let frame_tx = vchan_frame_tx.clone();
|
|
||||||
let sr = opus_sample_rate;
|
if send_audio {
|
||||||
let ch_count = opus_channels;
|
// Spin up an async Opus encoder task for this virtual channel.
|
||||||
let mut pcm_rx = pcm_rx;
|
let frame_tx = vchan_frame_tx.clone();
|
||||||
let handle = tokio::spawn(async move {
|
let sr = opus_sample_rate;
|
||||||
let opus_ch = match ch_count {
|
let ch_count = opus_channels;
|
||||||
1 => opus::Channels::Mono,
|
let mut pcm_rx_audio = pcm_rx.resubscribe();
|
||||||
2 => opus::Channels::Stereo,
|
handles.push(tokio::spawn(async move {
|
||||||
_ => return,
|
let opus_ch = match ch_count {
|
||||||
};
|
1 => opus::Channels::Mono,
|
||||||
let mut encoder = match opus::Encoder::new(
|
2 => opus::Channels::Stereo,
|
||||||
sr,
|
_ => return,
|
||||||
opus_ch,
|
};
|
||||||
opus::Application::Audio,
|
let mut encoder = match opus::Encoder::new(
|
||||||
) {
|
sr,
|
||||||
Ok(e) => e,
|
opus_ch,
|
||||||
Err(e) => {
|
opus::Application::Audio,
|
||||||
warn!("vchan Opus encoder init failed: {}", e);
|
) {
|
||||||
return;
|
Ok(e) => e,
|
||||||
}
|
Err(e) => {
|
||||||
};
|
warn!("vchan Opus encoder init failed: {}", e);
|
||||||
let _ = encoder.set_bitrate(opus::Bitrate::Bits(32_000));
|
return;
|
||||||
let _ = encoder.set_complexity(5);
|
}
|
||||||
let mut buf = vec![0u8; 4096];
|
};
|
||||||
loop {
|
let _ = encoder.set_bitrate(opus::Bitrate::Bits(32_000));
|
||||||
match pcm_rx.recv().await {
|
let _ = encoder.set_complexity(5);
|
||||||
Ok(frame) => {
|
let mut buf = vec![0u8; 4096];
|
||||||
match encoder.encode_float(&frame, &mut buf) {
|
loop {
|
||||||
Ok(len) => {
|
match pcm_rx_audio.recv().await {
|
||||||
let pkt = Bytes::copy_from_slice(&buf[..len]);
|
Ok(frame) => {
|
||||||
if frame_tx.send((uuid, pkt)).await.is_err() {
|
match encoder.encode_float(&frame, &mut buf) {
|
||||||
break;
|
Ok(len) => {
|
||||||
|
let pkt = Bytes::copy_from_slice(&buf[..len]);
|
||||||
|
if frame_tx.send((uuid, pkt)).await.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("vchan Opus encode error: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
|
||||||
warn!("vchan Opus encode error: {}", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
warn!("vchan encoder: dropped {} PCM frames", n);
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Closed) => break,
|
||||||
}
|
}
|
||||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
|
||||||
warn!("vchan encoder: dropped {} PCM frames", n);
|
|
||||||
}
|
|
||||||
Err(broadcast::error::RecvError::Closed) => break,
|
|
||||||
}
|
}
|
||||||
}
|
}));
|
||||||
});
|
}
|
||||||
|
|
||||||
vchan_tasks.insert(uuid, handle);
|
if let Some(spec) = background_decode {
|
||||||
|
let base_freq_hz = spec.base_freq_hz;
|
||||||
|
for kind in spec.decoder_kinds {
|
||||||
|
let decode_tx = bg_decode_tx.clone();
|
||||||
|
let task_rx = pcm_rx.resubscribe();
|
||||||
|
let sr = opus_sample_rate;
|
||||||
|
let ch_count = opus_channels as u16;
|
||||||
|
let kind = kind.to_ascii_lowercase();
|
||||||
|
let handle = match kind.as_str() {
|
||||||
|
"ft8" => tokio::spawn(async move {
|
||||||
|
run_background_ft8_decoder(
|
||||||
|
sr,
|
||||||
|
ch_count,
|
||||||
|
task_rx,
|
||||||
|
base_freq_hz,
|
||||||
|
decode_tx,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}),
|
||||||
|
"wspr" => tokio::spawn(async move {
|
||||||
|
run_background_wspr_decoder(
|
||||||
|
sr,
|
||||||
|
ch_count,
|
||||||
|
task_rx,
|
||||||
|
base_freq_hz,
|
||||||
|
decode_tx,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}),
|
||||||
|
"hf-aprs" => tokio::spawn(async move {
|
||||||
|
run_background_hf_aprs_decoder(
|
||||||
|
sr,
|
||||||
|
ch_count,
|
||||||
|
task_rx,
|
||||||
|
decode_tx,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}),
|
||||||
|
other => {
|
||||||
|
warn!("Unsupported background decoder kind '{}'", other);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
handles.push(handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !handles.is_empty() {
|
||||||
|
vchan_tasks.insert(uuid, handles);
|
||||||
|
}
|
||||||
|
|
||||||
// Acknowledge to the client.
|
// Acknowledge to the client.
|
||||||
if let Err(e) = write_vchan_uuid_msg(
|
if let Err(e) = write_vchan_uuid_msg(
|
||||||
@@ -2064,8 +2342,10 @@ async fn handle_audio_client(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
VChanCmd::Unsubscribe(uuid) => {
|
VChanCmd::Unsubscribe(uuid) => {
|
||||||
if let Some(h) = vchan_tasks.remove(&uuid) {
|
if let Some(handles) = vchan_tasks.remove(&uuid) {
|
||||||
h.abort();
|
for handle in handles {
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -2073,8 +2353,10 @@ async fn handle_audio_client(
|
|||||||
uuid = recv_destroyed(&mut destroyed_rx) => {
|
uuid = recv_destroyed(&mut destroyed_rx) => {
|
||||||
if let Some(uuid) = uuid {
|
if let Some(uuid) = uuid {
|
||||||
// Stop encoding for this channel.
|
// Stop encoding for this channel.
|
||||||
if let Some(h) = vchan_tasks.remove(&uuid) {
|
if let Some(handles) = vchan_tasks.remove(&uuid) {
|
||||||
h.abort();
|
for handle in handles {
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Notify the client.
|
// Notify the client.
|
||||||
if let Err(e) = write_vchan_uuid_msg(
|
if let Err(e) = write_vchan_uuid_msg(
|
||||||
@@ -2093,8 +2375,10 @@ async fn handle_audio_client(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Abort all per-channel encoder tasks on disconnect.
|
// Abort all per-channel encoder tasks on disconnect.
|
||||||
for (_, h) in vchan_tasks {
|
for (_, handles) in vchan_tasks {
|
||||||
h.abort();
|
for handle in handles {
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -2130,12 +2414,40 @@ async fn handle_audio_client(
|
|||||||
.and_then(|s| s.parse::<Uuid>().ok());
|
.and_then(|s| s.parse::<Uuid>().ok());
|
||||||
let freq_hz = v["freq_hz"].as_u64();
|
let freq_hz = v["freq_hz"].as_u64();
|
||||||
let mode_str = v["mode"].as_str().unwrap_or("USB");
|
let mode_str = v["mode"].as_str().unwrap_or("USB");
|
||||||
|
let hidden = v["hidden"].as_bool().unwrap_or(false);
|
||||||
|
let bandwidth_hz = v["bandwidth_hz"].as_u64().map(|bw| bw as u32);
|
||||||
|
let decoder_kinds = v["decoder_kinds"]
|
||||||
|
.as_array()
|
||||||
|
.map(|arr| {
|
||||||
|
arr.iter()
|
||||||
|
.filter_map(|item| item.as_str().map(str::to_string))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
})
|
||||||
|
.unwrap_or_default();
|
||||||
let mode = trx_protocol::codec::parse_mode(mode_str);
|
let mode = trx_protocol::codec::parse_mode(mode_str);
|
||||||
if let (Some(uuid), Some(freq_hz)) = (uuid, freq_hz) {
|
if let (Some(uuid), Some(freq_hz)) = (uuid, freq_hz) {
|
||||||
match mgr.ensure_channel_pcm(uuid, freq_hz, &mode) {
|
let ensure_result = if hidden {
|
||||||
|
mgr.ensure_background_channel_pcm(uuid, freq_hz, &mode)
|
||||||
|
} else {
|
||||||
|
mgr.ensure_channel_pcm(uuid, freq_hz, &mode)
|
||||||
|
};
|
||||||
|
match ensure_result {
|
||||||
Ok(pcm_rx) => {
|
Ok(pcm_rx) => {
|
||||||
|
if let Some(bandwidth_hz) = bandwidth_hz.filter(|bw| *bw > 0) {
|
||||||
|
if let Err(e) = mgr.set_channel_bandwidth(uuid, bandwidth_hz) {
|
||||||
|
warn!("Audio vchan SUB bandwidth apply failed: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
let _ = vchan_cmd_tx
|
let _ = vchan_cmd_tx
|
||||||
.send(VChanCmd::Subscribe { uuid, pcm_rx })
|
.send(VChanCmd::Subscribe {
|
||||||
|
uuid,
|
||||||
|
pcm_rx,
|
||||||
|
send_audio: !hidden,
|
||||||
|
background_decode: hidden.then_some(BackgroundDecodeSpec {
|
||||||
|
base_freq_hz: freq_hz,
|
||||||
|
decoder_kinds,
|
||||||
|
}),
|
||||||
|
})
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
Err(e) => warn!("Audio vchan SUB: {}", e),
|
Err(e) => warn!("Audio vchan SUB: {}", e),
|
||||||
|
|||||||
@@ -70,6 +70,9 @@ struct ManagedChannel {
|
|||||||
pipeline_slot: usize,
|
pipeline_slot: usize,
|
||||||
/// True only for the primary channel; prevents removal.
|
/// True only for the primary channel; prevents removal.
|
||||||
permanent: bool,
|
permanent: bool,
|
||||||
|
/// Hidden background-decode channels are omitted from the normal virtual
|
||||||
|
/// channel listing and do not count against the visible channel cap.
|
||||||
|
hidden: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
@@ -124,6 +127,7 @@ impl SdrVirtualChannelManager {
|
|||||||
iq_tx: primary_iq_tx,
|
iq_tx: primary_iq_tx,
|
||||||
pipeline_slot: 0,
|
pipeline_slot: 0,
|
||||||
permanent: true,
|
permanent: true,
|
||||||
|
hidden: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
let (destroyed_tx, _) = broadcast::channel::<Uuid>(16);
|
let (destroyed_tx, _) = broadcast::channel::<Uuid>(16);
|
||||||
@@ -146,6 +150,61 @@ impl SdrVirtualChannelManager {
|
|||||||
i64::from(self.pipeline.sdr_sample_rate) / 2
|
i64::from(self.pipeline.sdr_sample_rate) / 2
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn visible_channel_count(channels: &[ManagedChannel]) -> usize {
|
||||||
|
channels.iter().filter(|ch| !ch.hidden).count()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_channel(
|
||||||
|
&self,
|
||||||
|
channels: &mut Vec<ManagedChannel>,
|
||||||
|
id: Uuid,
|
||||||
|
freq_hz: u64,
|
||||||
|
mode: &RigMode,
|
||||||
|
hidden: bool,
|
||||||
|
) -> Result<broadcast::Receiver<Vec<f32>>, VChanError> {
|
||||||
|
let half_span = self.half_span_hz();
|
||||||
|
let center = self.center_hz.load(Ordering::Relaxed);
|
||||||
|
let if_hz = freq_hz as i64 - center;
|
||||||
|
if if_hz.unsigned_abs() as i64 > half_span {
|
||||||
|
return Err(VChanError::OutOfBandwidth {
|
||||||
|
half_span_hz: half_span,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if !hidden && Self::visible_channel_count(channels) >= self.max_total {
|
||||||
|
return Err(VChanError::CapReached {
|
||||||
|
max: self.max_total,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let bandwidth_hz = default_bandwidth_hz(mode);
|
||||||
|
let (pcm_tx, iq_tx) =
|
||||||
|
self.pipeline
|
||||||
|
.add_virtual_channel(if_hz as f64, mode, bandwidth_hz, DEFAULT_FIR_TAPS);
|
||||||
|
|
||||||
|
let pipeline_slot = self
|
||||||
|
.pipeline
|
||||||
|
.channel_dsps
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.len()
|
||||||
|
.saturating_sub(1);
|
||||||
|
|
||||||
|
let pcm_rx = pcm_tx.subscribe();
|
||||||
|
channels.push(ManagedChannel {
|
||||||
|
id,
|
||||||
|
freq_hz,
|
||||||
|
mode: mode.clone(),
|
||||||
|
pcm_tx,
|
||||||
|
iq_tx,
|
||||||
|
pipeline_slot,
|
||||||
|
permanent: false,
|
||||||
|
hidden,
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(pcm_rx)
|
||||||
|
}
|
||||||
|
|
||||||
/// Called by `SoapySdrRig` whenever the hardware center frequency changes.
|
/// Called by `SoapySdrRig` whenever the hardware center frequency changes.
|
||||||
/// Recomputes the IF offset for every virtual channel.
|
/// Recomputes the IF offset for every virtual channel.
|
||||||
pub fn update_center_hz(&self, new_center_hz: i64) {
|
pub fn update_center_hz(&self, new_center_hz: i64) {
|
||||||
@@ -208,45 +267,9 @@ impl VirtualChannelManager for SdrVirtualChannelManager {
|
|||||||
freq_hz: u64,
|
freq_hz: u64,
|
||||||
mode: &RigMode,
|
mode: &RigMode,
|
||||||
) -> Result<(Uuid, broadcast::Receiver<Vec<f32>>), VChanError> {
|
) -> Result<(Uuid, broadcast::Receiver<Vec<f32>>), VChanError> {
|
||||||
let half_span = self.half_span_hz();
|
|
||||||
let center = self.center_hz.load(Ordering::Relaxed);
|
|
||||||
let if_hz = freq_hz as i64 - center;
|
|
||||||
if if_hz.unsigned_abs() as i64 > half_span {
|
|
||||||
return Err(VChanError::OutOfBandwidth {
|
|
||||||
half_span_hz: half_span,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut channels = self.channels.write().unwrap();
|
let mut channels = self.channels.write().unwrap();
|
||||||
if channels.len() >= self.max_total {
|
|
||||||
return Err(VChanError::CapReached { max: self.max_total });
|
|
||||||
}
|
|
||||||
|
|
||||||
let bandwidth_hz = default_bandwidth_hz(mode);
|
|
||||||
let (pcm_tx, iq_tx) =
|
|
||||||
self.pipeline
|
|
||||||
.add_virtual_channel(if_hz as f64, mode, bandwidth_hz, DEFAULT_FIR_TAPS);
|
|
||||||
|
|
||||||
let pipeline_slot = self
|
|
||||||
.pipeline
|
|
||||||
.channel_dsps
|
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.len()
|
|
||||||
.saturating_sub(1);
|
|
||||||
|
|
||||||
let id = Uuid::new_v4();
|
let id = Uuid::new_v4();
|
||||||
let pcm_rx = pcm_tx.subscribe();
|
let pcm_rx = self.create_channel(&mut channels, id, freq_hz, mode, false)?;
|
||||||
channels.push(ManagedChannel {
|
|
||||||
id,
|
|
||||||
freq_hz,
|
|
||||||
mode: mode.clone(),
|
|
||||||
pcm_tx,
|
|
||||||
iq_tx,
|
|
||||||
pipeline_slot,
|
|
||||||
permanent: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok((id, pcm_rx))
|
Ok((id, pcm_rx))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -337,6 +360,7 @@ impl VirtualChannelManager for SdrVirtualChannelManager {
|
|||||||
let channels = self.channels.read().unwrap();
|
let channels = self.channels.read().unwrap();
|
||||||
channels
|
channels
|
||||||
.iter()
|
.iter()
|
||||||
|
.filter(|ch| !ch.hidden)
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|(idx, ch)| VChannelInfo {
|
.map(|(idx, ch)| VChannelInfo {
|
||||||
id: ch.id,
|
id: ch.id,
|
||||||
@@ -370,46 +394,25 @@ impl VirtualChannelManager for SdrVirtualChannelManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Slow path: create a new channel with the client-supplied UUID.
|
let mut channels = self.channels.write().unwrap();
|
||||||
let half_span = self.half_span_hz();
|
self.create_channel(&mut channels, id, freq_hz, mode, false)
|
||||||
let center = self.center_hz.load(Ordering::Relaxed);
|
}
|
||||||
let if_hz = freq_hz as i64 - center;
|
|
||||||
if if_hz.unsigned_abs() as i64 > half_span {
|
fn ensure_background_channel_pcm(
|
||||||
return Err(VChanError::OutOfBandwidth {
|
&self,
|
||||||
half_span_hz: half_span,
|
id: Uuid,
|
||||||
});
|
freq_hz: u64,
|
||||||
|
mode: &RigMode,
|
||||||
|
) -> Result<broadcast::Receiver<Vec<f32>>, VChanError> {
|
||||||
|
{
|
||||||
|
let channels = self.channels.read().unwrap();
|
||||||
|
if let Some(ch) = channels.iter().find(|c| c.id == id) {
|
||||||
|
return Ok(ch.pcm_tx.subscribe());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut channels = self.channels.write().unwrap();
|
let mut channels = self.channels.write().unwrap();
|
||||||
if channels.len() >= self.max_total {
|
self.create_channel(&mut channels, id, freq_hz, mode, true)
|
||||||
return Err(VChanError::CapReached { max: self.max_total });
|
|
||||||
}
|
|
||||||
|
|
||||||
let bandwidth_hz = default_bandwidth_hz(mode);
|
|
||||||
let (pcm_tx, iq_tx) =
|
|
||||||
self.pipeline
|
|
||||||
.add_virtual_channel(if_hz as f64, mode, bandwidth_hz, DEFAULT_FIR_TAPS);
|
|
||||||
|
|
||||||
let pipeline_slot = self
|
|
||||||
.pipeline
|
|
||||||
.channel_dsps
|
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.len()
|
|
||||||
.saturating_sub(1);
|
|
||||||
|
|
||||||
let pcm_rx = pcm_tx.subscribe();
|
|
||||||
channels.push(ManagedChannel {
|
|
||||||
id,
|
|
||||||
freq_hz,
|
|
||||||
mode: mode.clone(),
|
|
||||||
pcm_tx,
|
|
||||||
iq_tx,
|
|
||||||
pipeline_slot,
|
|
||||||
permanent: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(pcm_rx)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -489,4 +492,20 @@ mod tests {
|
|||||||
let err = mgr.add_channel(10_000_000, &RigMode::USB).unwrap_err();
|
let err = mgr.add_channel(10_000_000, &RigMode::USB).unwrap_err();
|
||||||
assert!(matches!(err, VChanError::OutOfBandwidth { .. }));
|
assert!(matches!(err, VChanError::OutOfBandwidth { .. }));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn hidden_background_channels_are_outside_visible_cap() {
|
||||||
|
let p = make_pipeline();
|
||||||
|
let mgr = SdrVirtualChannelManager::new(p, 1, 2); // primary + 1 visible max
|
||||||
|
mgr.update_center_hz(14_100_000);
|
||||||
|
|
||||||
|
mgr.add_channel(14_074_000, &RigMode::USB).unwrap();
|
||||||
|
let hidden_id = Uuid::new_v4();
|
||||||
|
mgr.ensure_background_channel_pcm(hidden_id, 14_075_000, &RigMode::DIG)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let visible = mgr.channels();
|
||||||
|
assert_eq!(visible.len(), 2);
|
||||||
|
assert!(visible.iter().all(|channel| channel.id != hidden_id));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user