[style](trx-rs): reformat codebase

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
2026-03-17 22:36:11 +01:00
parent a823e66816
commit b533d704a1
22 changed files with 348 additions and 252 deletions
+1 -5
View File
@@ -166,11 +166,7 @@ fn sync_correlation_score(signal: &[f32], base_hz: f32) -> f32 {
base_hz + 2.0 * TONE_SPACING_HZ, base_hz + 2.0 * TONE_SPACING_HZ,
WSPR_SAMPLE_RATE as f32, WSPR_SAMPLE_RATE as f32,
); );
let p1 = goertzel_power( let p1 = goertzel_power(frame, base_hz + TONE_SPACING_HZ, WSPR_SAMPLE_RATE as f32);
frame,
base_hz + TONE_SPACING_HZ,
WSPR_SAMPLE_RATE as f32,
);
let p3 = goertzel_power( let p3 = goertzel_power(
frame, frame,
base_hz + 3.0 * TONE_SPACING_HZ, base_hz + 3.0 * TONE_SPACING_HZ,
+9 -7
View File
@@ -25,11 +25,11 @@ use trx_core::audio::{
parse_vchan_audio_frame, parse_vchan_uuid_msg, read_audio_msg, write_audio_msg, parse_vchan_audio_frame, parse_vchan_uuid_msg, read_audio_msg, write_audio_msg,
write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE,
AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT2_DECODE, AUDIO_MSG_FT4_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT2_DECODE, AUDIO_MSG_FT4_DECODE, AUDIO_MSG_FT8_DECODE,
AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME,
AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_RX_FRAME_CH, AUDIO_MSG_RX_FRAME_CH, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED,
AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, AUDIO_MSG_VCHAN_BW, AUDIO_MSG_VCHAN_BW, AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE,
AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE,
AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, AUDIO_MSG_WSPR_DECODE,
}; };
use trx_core::decode::DecodedMessage; use trx_core::decode::DecodedMessage;
use trx_frontend::VChanAudioCmd; use trx_frontend::VChanAudioCmd;
@@ -195,7 +195,8 @@ async fn handle_audio_connection(
} }
// Re-apply non-default bandwidth after re-subscribing. // Re-apply non-default bandwidth after re-subscribing.
if sub.bandwidth_hz > 0 { if sub.bandwidth_hz > 0 {
let bw_json = serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": sub.bandwidth_hz }); let bw_json =
serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": sub.bandwidth_hz });
if let Ok(payload) = serde_json::to_vec(&bw_json) { if let Ok(payload) = serde_json::to_vec(&bw_json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_BW, &payload).await { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_BW, &payload).await {
warn!("Audio vchan reconnect BW write failed: {}", e); warn!("Audio vchan reconnect BW write failed: {}", e);
@@ -209,7 +210,8 @@ async fn handle_audio_connection(
// Spawn RX read task // Spawn RX read task
let rx_tx = rx_tx.clone(); let rx_tx = rx_tx.clone();
let decode_tx = decode_tx.clone(); let decode_tx = decode_tx.clone();
let vchan_audio_rx: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>> = Arc::clone(vchan_audio); let vchan_audio_rx: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>> =
Arc::clone(vchan_audio);
let vchan_destroyed_for_rx = vchan_destroyed_tx.clone(); let vchan_destroyed_for_rx = vchan_destroyed_tx.clone();
let mut rx_handle = tokio::spawn(async move { let mut rx_handle = tokio::spawn(async move {
loop { loop {
+3 -7
View File
@@ -395,9 +395,7 @@ impl ClientConfig {
); );
} }
if self.frontends.http.decode_history_retention_min == 0 { if self.frontends.http.decode_history_retention_min == 0 {
return Err( return Err("[frontends.http].decode_history_retention_min must be > 0".to_string());
"[frontends.http].decode_history_retention_min must be > 0".to_string(),
);
} }
for (rig_id, minutes) in &self.frontends.http.decode_history_retention_min_by_rig { for (rig_id, minutes) in &self.frontends.http.decode_history_retention_min_by_rig {
if rig_id.trim().is_empty() { if rig_id.trim().is_empty() {
@@ -616,13 +614,11 @@ mod tests {
assert_eq!(config.frontends.http.spectrum_coverage_margin_hz, 50_000); assert_eq!(config.frontends.http.spectrum_coverage_margin_hz, 50_000);
assert_eq!(config.frontends.http.spectrum_usable_span_ratio, 0.92); assert_eq!(config.frontends.http.spectrum_usable_span_ratio, 0.92);
assert_eq!(config.frontends.http.decode_history_retention_min, 1440); assert_eq!(config.frontends.http.decode_history_retention_min, 1440);
assert!( assert!(config
config
.frontends .frontends
.http .http
.decode_history_retention_min_by_rig .decode_history_retention_min_by_rig
.is_empty() .is_empty());
);
assert_eq!(config.frontends.rigctl.port, 4532); assert_eq!(config.frontends.rigctl.port, 4532);
assert!(config.frontends.http_json.enabled); assert!(config.frontends.http_json.enabled);
assert_eq!(config.frontends.http_json.port, 0); assert_eq!(config.frontends.http_json.port, 0);
+7 -6
View File
@@ -185,8 +185,11 @@ async fn async_init() -> DynResult<AppState> {
cfg.frontends.http.spectrum_usable_span_ratio; cfg.frontends.http.spectrum_usable_span_ratio;
frontend_runtime.http_decode_history_retention_min = frontend_runtime.http_decode_history_retention_min =
cfg.frontends.http.decode_history_retention_min; cfg.frontends.http.decode_history_retention_min;
frontend_runtime.http_decode_history_retention_min_by_rig = frontend_runtime.http_decode_history_retention_min_by_rig = cfg
cfg.frontends.http.decode_history_retention_min_by_rig.clone(); .frontends
.http
.decode_history_retention_min_by_rig
.clone();
// Resolve remote URL: CLI > config [remote] section > error // Resolve remote URL: CLI > config [remote] section > error
let remote_url = cli let remote_url = cli
@@ -305,8 +308,7 @@ async fn async_init() -> DynResult<AppState> {
frontend_runtime.decode_rx = Some(decode_tx.clone()); frontend_runtime.decode_rx = Some(decode_tx.clone());
// Virtual-channel audio: shared broadcaster map + command channel. // Virtual-channel audio: shared broadcaster map + command channel.
let (vchan_cmd_tx, vchan_cmd_rx) = let (vchan_cmd_tx, vchan_cmd_rx) = mpsc::unbounded_channel::<trx_frontend::VChanAudioCmd>();
mpsc::unbounded_channel::<trx_frontend::VChanAudioCmd>();
*frontend_runtime.vchan_audio_cmd.lock().unwrap() = Some(vchan_cmd_tx); *frontend_runtime.vchan_audio_cmd.lock().unwrap() = Some(vchan_cmd_tx);
let (vchan_destroyed_tx, _) = broadcast::channel::<uuid::Uuid>(64); let (vchan_destroyed_tx, _) = broadcast::channel::<uuid::Uuid>(64);
@@ -318,8 +320,7 @@ async fn async_init() -> DynResult<AppState> {
let cw_history = frontend_runtime.cw_history.clone(); let cw_history = frontend_runtime.cw_history.clone();
let ft8_history = frontend_runtime.ft8_history.clone(); let ft8_history = frontend_runtime.ft8_history.clone();
let wspr_history = frontend_runtime.wspr_history.clone(); let wspr_history = frontend_runtime.wspr_history.clone();
let replay_history_sink: Arc<dyn Fn(DecodedMessage) + Send + Sync> = let replay_history_sink: Arc<dyn Fn(DecodedMessage) + Send + Sync> = Arc::new(move |msg| {
Arc::new(move |msg| {
let now = std::time::Instant::now(); let now = std::time::Instant::now();
match msg { match msg {
DecodedMessage::Ais(mut message) => { DecodedMessage::Ais(mut message) => {
+5 -18
View File
@@ -68,10 +68,7 @@ pub async fn run_remote_client(
) -> RigResult<()> { ) -> RigResult<()> {
// Spectrum polling runs on its own dedicated TCP connection so it never // Spectrum polling runs on its own dedicated TCP connection so it never
// blocks state polls or user commands on the main connection. // blocks state polls or user commands on the main connection.
let spectrum_task = tokio::spawn(run_spectrum_connection( let spectrum_task = tokio::spawn(run_spectrum_connection(config.clone(), shutdown_rx.clone()));
config.clone(),
shutdown_rx.clone(),
));
let mut reconnect_delay = Duration::from_secs(1); let mut reconnect_delay = Duration::from_secs(1);
@@ -147,8 +144,7 @@ async fn run_spectrum_connection(
if let Err(e) = stream.set_nodelay(true) { if let Err(e) = stream.set_nodelay(true) {
warn!("Spectrum TCP_NODELAY failed: {}", e); warn!("Spectrum TCP_NODELAY failed: {}", e);
} }
if let Err(e) = if let Err(e) = handle_spectrum_connection(&config, stream, &mut shutdown_rx).await
handle_spectrum_connection(&config, stream, &mut shutdown_rx).await
{ {
warn!("Spectrum connection dropped: {}", e); warn!("Spectrum connection dropped: {}", e);
} }
@@ -301,10 +297,7 @@ async fn send_command(
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
payload.push('\n'); payload.push('\n');
time::timeout( time::timeout(IO_TIMEOUT, writer.write_all(payload.as_bytes()))
IO_TIMEOUT,
writer.write_all(payload.as_bytes()),
)
.await .await
.map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))? .map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))?
.map_err(|e| RigError::communication(format!("write failed: {e}")))?; .map_err(|e| RigError::communication(format!("write failed: {e}")))?;
@@ -347,10 +340,7 @@ async fn send_command_no_state_update(
let mut payload = serde_json::to_string(&envelope) let mut payload = serde_json::to_string(&envelope)
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
payload.push('\n'); payload.push('\n');
time::timeout( time::timeout(SPECTRUM_IO_TIMEOUT, writer.write_all(payload.as_bytes()))
SPECTRUM_IO_TIMEOUT,
writer.write_all(payload.as_bytes()),
)
.await .await
.map_err(|_| { .map_err(|_| {
RigError::communication(format!("write timed out after {:?}", SPECTRUM_IO_TIMEOUT)) RigError::communication(format!("write timed out after {:?}", SPECTRUM_IO_TIMEOUT))
@@ -443,10 +433,7 @@ async fn send_get_rigs(
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
payload.push('\n'); payload.push('\n');
time::timeout( time::timeout(IO_TIMEOUT, writer.write_all(payload.as_bytes()))
IO_TIMEOUT,
writer.write_all(payload.as_bytes()),
)
.await .await
.map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))? .map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))?
.map_err(|e| RigError::communication(format!("write failed: {e}")))?; .map_err(|e| RigError::communication(format!("write failed: {e}")))?;
+6 -2
View File
@@ -3,9 +3,9 @@
// SPDX-License-Identifier: BSD-2-Clause // SPDX-License-Identifier: BSD-2-Clause
use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::RwLock;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::RwLock;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Instant; use std::time::Instant;
@@ -95,7 +95,11 @@ pub struct SharedSpectrum {
impl SharedSpectrum { impl SharedSpectrum {
/// Replace the stored frame, pre-serialising RDS in one pass. /// Replace the stored frame, pre-serialising RDS in one pass.
pub fn set(&mut self, frame: Option<SpectrumData>, vchan_rds: Option<Vec<trx_core::rig::state::VchanRdsEntry>>) { pub fn set(
&mut self,
frame: Option<SpectrumData>,
vchan_rds: Option<Vec<trx_core::rig::state::VchanRdsEntry>>,
) {
self.rds_json = frame self.rds_json = frame
.as_ref() .as_ref()
.and_then(|f| f.rds.as_ref()) .and_then(|f| f.rds.as_ref())
@@ -47,8 +47,16 @@ fn base64_encode(data: &[u8]) -> String {
let n = (b0 << 16) | (b1 << 8) | b2; let n = (b0 << 16) | (b1 << 8) | b2;
out.push(T[((n >> 18) & 63) as usize]); out.push(T[((n >> 18) & 63) as usize]);
out.push(T[((n >> 12) & 63) as usize]); out.push(T[((n >> 12) & 63) as usize]);
out.push(if chunk.len() > 1 { T[((n >> 6) & 63) as usize] } else { b'=' }); out.push(if chunk.len() > 1 {
out.push(if chunk.len() > 2 { T[(n & 63) as usize] } else { b'=' }); T[((n >> 6) & 63) as usize]
} else {
b'='
});
out.push(if chunk.len() > 2 {
T[(n & 63) as usize]
} else {
b'='
});
} }
// SAFETY: output contains only ASCII base64 characters. // SAFETY: output contains only ASCII base64 characters.
unsafe { String::from_utf8_unchecked(out) } unsafe { String::from_utf8_unchecked(out) }
@@ -120,23 +128,53 @@ fn inject_frontend_meta(json: &str, meta: FrontendMeta) -> String {
// Build only the extra key-value pairs as a JSON fragment. // Build only the extra key-value pairs as a JSON fragment.
let mut extra = serde_json::Map::new(); let mut extra = serde_json::Map::new();
extra.insert("clients".into(), serde_json::json!(meta.http_clients)); extra.insert("clients".into(), serde_json::json!(meta.http_clients));
extra.insert("rigctl_clients".into(), serde_json::json!(meta.rigctl_clients)); extra.insert(
if let Some(v) = meta.rigctl_addr { extra.insert("rigctl_addr".into(), serde_json::json!(v)); } "rigctl_clients".into(),
if let Some(v) = meta.active_rig_id { extra.insert("active_rig_id".into(), serde_json::json!(v)); } serde_json::json!(meta.rigctl_clients),
);
if let Some(v) = meta.rigctl_addr {
extra.insert("rigctl_addr".into(), serde_json::json!(v));
}
if let Some(v) = meta.active_rig_id {
extra.insert("active_rig_id".into(), serde_json::json!(v));
}
extra.insert("rig_ids".into(), serde_json::json!(meta.rig_ids)); extra.insert("rig_ids".into(), serde_json::json!(meta.rig_ids));
if let Some(v) = meta.owner_callsign { extra.insert("owner_callsign".into(), serde_json::json!(v)); } if let Some(v) = meta.owner_callsign {
if let Some(v) = meta.owner_website_url { extra.insert("owner_website_url".into(), serde_json::json!(v)); } extra.insert("owner_callsign".into(), serde_json::json!(v));
if let Some(v) = meta.owner_website_name { extra.insert("owner_website_name".into(), serde_json::json!(v)); } }
if let Some(v) = meta.ais_vessel_url_base { extra.insert("ais_vessel_url_base".into(), serde_json::json!(v)); } if let Some(v) = meta.owner_website_url {
extra.insert("show_sdr_gain_control".into(), serde_json::json!(meta.show_sdr_gain_control)); extra.insert("owner_website_url".into(), serde_json::json!(v));
extra.insert("initial_map_zoom".into(), serde_json::json!(meta.initial_map_zoom)); }
extra.insert("spectrum_coverage_margin_hz".into(), serde_json::json!(meta.spectrum_coverage_margin_hz)); if let Some(v) = meta.owner_website_name {
extra.insert("spectrum_usable_span_ratio".into(), serde_json::json!(meta.spectrum_usable_span_ratio)); extra.insert("owner_website_name".into(), serde_json::json!(v));
}
if let Some(v) = meta.ais_vessel_url_base {
extra.insert("ais_vessel_url_base".into(), serde_json::json!(v));
}
extra.insert(
"show_sdr_gain_control".into(),
serde_json::json!(meta.show_sdr_gain_control),
);
extra.insert(
"initial_map_zoom".into(),
serde_json::json!(meta.initial_map_zoom),
);
extra.insert(
"spectrum_coverage_margin_hz".into(),
serde_json::json!(meta.spectrum_coverage_margin_hz),
);
extra.insert(
"spectrum_usable_span_ratio".into(),
serde_json::json!(meta.spectrum_usable_span_ratio),
);
extra.insert( extra.insert(
"decode_history_retention_min".into(), "decode_history_retention_min".into(),
serde_json::json!(meta.decode_history_retention_min), serde_json::json!(meta.decode_history_retention_min),
); );
extra.insert("server_connected".into(), serde_json::json!(meta.server_connected)); extra.insert(
"server_connected".into(),
serde_json::json!(meta.server_connected),
);
// Serialize the extra map, strip its outer braces, and splice in. // Serialize the extra map, strip its outer braces, and splice in.
let extra_json = match serde_json::to_string(&extra) { let extra_json = match serde_json::to_string(&extra) {
@@ -328,9 +366,7 @@ pub async fn events(
let scheduler_control = scheduler_control_updates.clone(); let scheduler_control = scheduler_control_updates.clone();
async move { async move {
state.snapshot().and_then(|v| { state.snapshot().and_then(|v| {
if let Ok(Some(rig_id)) = if let Ok(Some(rig_id)) = context.remote_active_rig_id.lock().map(|g| g.clone()) {
context.remote_active_rig_id.lock().map(|g| g.clone())
{
vchan.update_primary( vchan.update_primary(
&rig_id, &rig_id,
v.status.freq.hz, v.status.freq.hz,
@@ -367,9 +403,8 @@ pub async fn events(
if let Some(colon) = msg.find(':') { if let Some(colon) = msg.find(':') {
let rig_id = &msg[..colon]; let rig_id = &msg[..colon];
let channels_json = &msg[colon + 1..]; let channels_json = &msg[colon + 1..];
let payload = format!( let payload =
"{{\"rig_id\":\"{rig_id}\",\"channels\":{channels_json}}}" format!("{{\"rig_id\":\"{rig_id}\",\"channels\":{channels_json}}}");
);
return Some(( return Some((
Ok::<Bytes, Error>(Bytes::from(format!( Ok::<Bytes, Error>(Bytes::from(format!(
"event: channels\ndata: {payload}\n\n" "event: channels\ndata: {payload}\n\n"
@@ -573,9 +608,7 @@ fn gzip_bytes(payload: &[u8]) -> std::io::Result<Vec<u8>> {
/// not block real-time messages: the client fetches this endpoint in parallel /// not block real-time messages: the client fetches this endpoint in parallel
/// with opening the SSE connection and drains it in the background. /// with opening the SSE connection and drains it in the background.
#[get("/decode/history")] #[get("/decode/history")]
pub async fn decode_history( pub async fn decode_history(context: web::Data<Arc<FrontendRuntimeContext>>) -> impl Responder {
context: web::Data<Arc<FrontendRuntimeContext>>,
) -> impl Responder {
if context.decode_rx.is_none() { if context.decode_rx.is_none() {
return HttpResponse::NotFound().body("decode not enabled"); return HttpResponse::NotFound().body("decode not enabled");
} }
@@ -1414,9 +1447,7 @@ pub async fn delete_channel_route(
let (rig_id, channel_id) = path.into_inner(); let (rig_id, channel_id) = path.into_inner();
match vchan_mgr.delete_channel(&rig_id, channel_id) { match vchan_mgr.delete_channel(&rig_id, channel_id) {
Ok(()) => HttpResponse::Ok().finish(), Ok(()) => HttpResponse::Ok().finish(),
Err(crate::server::vchan::VChanClientError::NotFound) => { Err(crate::server::vchan::VChanClientError::NotFound) => HttpResponse::NotFound().finish(),
HttpResponse::NotFound().finish()
}
Err(crate::server::vchan::VChanClientError::Permanent) => { Err(crate::server::vchan::VChanClientError::Permanent) => {
HttpResponse::BadRequest().body("cannot remove the primary channel") HttpResponse::BadRequest().body("cannot remove the primary channel")
} }
@@ -1476,9 +1507,7 @@ pub async fn set_vchan_freq(
let (rig_id, channel_id) = path.into_inner(); let (rig_id, channel_id) = path.into_inner();
match vchan_mgr.set_channel_freq(&rig_id, channel_id, body.freq_hz) { match vchan_mgr.set_channel_freq(&rig_id, channel_id, body.freq_hz) {
Ok(()) => HttpResponse::Ok().finish(), Ok(()) => HttpResponse::Ok().finish(),
Err(crate::server::vchan::VChanClientError::NotFound) => { Err(crate::server::vchan::VChanClientError::NotFound) => HttpResponse::NotFound().finish(),
HttpResponse::NotFound().finish()
}
Err(e) => HttpResponse::BadRequest().body(e.to_string()), Err(e) => HttpResponse::BadRequest().body(e.to_string()),
} }
} }
@@ -1497,9 +1526,7 @@ pub async fn set_vchan_bw(
let (rig_id, channel_id) = path.into_inner(); let (rig_id, channel_id) = path.into_inner();
match vchan_mgr.set_channel_bandwidth(&rig_id, channel_id, body.bandwidth_hz) { match vchan_mgr.set_channel_bandwidth(&rig_id, channel_id, body.bandwidth_hz) {
Ok(()) => HttpResponse::Ok().finish(), Ok(()) => HttpResponse::Ok().finish(),
Err(crate::server::vchan::VChanClientError::NotFound) => { Err(crate::server::vchan::VChanClientError::NotFound) => HttpResponse::NotFound().finish(),
HttpResponse::NotFound().finish()
}
Err(e) => HttpResponse::BadRequest().body(e.to_string()), Err(e) => HttpResponse::BadRequest().body(e.to_string()),
} }
} }
@@ -1518,9 +1545,7 @@ pub async fn set_vchan_mode(
let (rig_id, channel_id) = path.into_inner(); let (rig_id, channel_id) = path.into_inner();
match vchan_mgr.set_channel_mode(&rig_id, channel_id, &body.mode) { match vchan_mgr.set_channel_mode(&rig_id, channel_id, &body.mode) {
Ok(()) => HttpResponse::Ok().finish(), Ok(()) => HttpResponse::Ok().finish(),
Err(crate::server::vchan::VChanClientError::NotFound) => { Err(crate::server::vchan::VChanClientError::NotFound) => HttpResponse::NotFound().finish(),
HttpResponse::NotFound().finish()
}
Err(e) => HttpResponse::BadRequest().body(e.to_string()), Err(e) => HttpResponse::BadRequest().body(e.to_string()),
} }
} }
@@ -1783,14 +1808,20 @@ async fn ft8_js() -> impl Responder {
#[get("/ft4.js")] #[get("/ft4.js")]
async fn ft4_js() -> impl Responder { async fn ft4_js() -> impl Responder {
HttpResponse::Ok() HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "application/javascript; charset=utf-8")) .insert_header((
header::CONTENT_TYPE,
"application/javascript; charset=utf-8",
))
.body(status::FT4_JS) .body(status::FT4_JS)
} }
#[get("/ft2.js")] #[get("/ft2.js")]
async fn ft2_js() -> impl Responder { async fn ft2_js() -> impl Responder {
HttpResponse::Ok() HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "application/javascript; charset=utf-8")) .insert_header((
header::CONTENT_TYPE,
"application/javascript; charset=utf-8",
))
.body(status::FT2_JS) .body(status::FT2_JS)
} }
@@ -1951,7 +1982,14 @@ fn bookmark_decoder_state(
} }
} }
(want_aprs, want_hf_aprs, want_ft8, want_ft4, want_ft2, want_wspr) (
want_aprs,
want_hf_aprs,
want_ft8,
want_ft4,
want_ft2,
want_wspr,
)
} }
fn bookmark_decoder_kinds(bookmark: &crate::server::bookmarks::Bookmark) -> Vec<String> { fn bookmark_decoder_kinds(bookmark: &crate::server::bookmarks::Bookmark) -> Vec<String> {
@@ -2018,7 +2056,8 @@ async fn apply_selected_channel(
let Some(bookmark) = bookmark_store.get(bookmark_id) else { let Some(bookmark) = bookmark_store.get(bookmark_id) else {
return Ok(()); return Ok(());
}; };
let (want_aprs, want_hf_aprs, want_ft8, want_ft4, want_ft2, want_wspr) = bookmark_decoder_state(&bookmark); let (want_aprs, want_hf_aprs, want_ft8, want_ft4, want_ft2, want_wspr) =
bookmark_decoder_state(&bookmark);
let desired = [ let desired = [
RigCommand::SetAprsDecodeEnabled(want_aprs), RigCommand::SetAprsDecodeEnabled(want_aprs),
RigCommand::SetHfAprsDecodeEnabled(want_hf_aprs), RigCommand::SetHfAprsDecodeEnabled(want_hf_aprs),
@@ -57,7 +57,10 @@ fn decode_history_cutoff(context: &FrontendRuntimeContext) -> Instant {
Instant::now() - decode_history_retention(context) Instant::now() - decode_history_retention(context)
} }
fn prune_aprs_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(Instant, AprsPacket)>) { fn prune_aprs_history(
context: &FrontendRuntimeContext,
history: &mut VecDeque<(Instant, AprsPacket)>,
) {
let cutoff = decode_history_cutoff(context); let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() { while let Some((ts, _)) = history.front() {
if *ts >= cutoff { if *ts >= cutoff {
@@ -80,7 +83,10 @@ fn prune_hf_aprs_history(
} }
} }
fn prune_ais_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(Instant, AisMessage)>) { fn prune_ais_history(
context: &FrontendRuntimeContext,
history: &mut VecDeque<(Instant, AisMessage)>,
) {
let cutoff = decode_history_cutoff(context); let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() { while let Some((ts, _)) = history.front() {
if *ts >= cutoff { if *ts >= cutoff {
@@ -137,7 +143,10 @@ fn prune_cw_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(In
} }
} }
fn prune_ft8_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(Instant, Ft8Message)>) { fn prune_ft8_history(
context: &FrontendRuntimeContext,
history: &mut VecDeque<(Instant, Ft8Message)>,
) {
let cutoff = decode_history_cutoff(context); let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() { while let Some((ts, _)) = history.front() {
if *ts >= cutoff { if *ts >= cutoff {
@@ -147,7 +156,10 @@ fn prune_ft8_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(I
} }
} }
fn prune_ft4_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(Instant, Ft8Message)>) { fn prune_ft4_history(
context: &FrontendRuntimeContext,
history: &mut VecDeque<(Instant, Ft8Message)>,
) {
let cutoff = decode_history_cutoff(context); let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() { while let Some((ts, _)) = history.front() {
if *ts >= cutoff { if *ts >= cutoff {
@@ -157,7 +169,10 @@ fn prune_ft4_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(I
} }
} }
fn prune_ft2_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(Instant, Ft8Message)>) { fn prune_ft2_history(
context: &FrontendRuntimeContext,
history: &mut VecDeque<(Instant, Ft8Message)>,
) {
let cutoff = decode_history_cutoff(context); let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() { while let Some((ts, _)) = history.front() {
if *ts >= cutoff { if *ts >= cutoff {
@@ -85,12 +85,24 @@ impl BackgroundDecodeStore {
let _ = std::fs::create_dir_all(parent); let _ = std::fs::create_dir_all(parent);
} }
let db = if path.exists() { let db = if path.exists() {
PickleDb::load(path, PickleDbDumpPolicy::AutoDump, SerializationMethod::Json) PickleDb::load(
path,
PickleDbDumpPolicy::AutoDump,
SerializationMethod::Json,
)
.unwrap_or_else(|_| { .unwrap_or_else(|_| {
PickleDb::new(path, PickleDbDumpPolicy::AutoDump, SerializationMethod::Json) PickleDb::new(
path,
PickleDbDumpPolicy::AutoDump,
SerializationMethod::Json,
)
}) })
} else { } else {
PickleDb::new(path, PickleDbDumpPolicy::AutoDump, SerializationMethod::Json) PickleDb::new(
path,
PickleDbDumpPolicy::AutoDump,
SerializationMethod::Json,
)
}; };
Self { Self {
db: Arc::new(RwLock::new(db)), db: Arc::new(RwLock::new(db)),
@@ -160,7 +172,9 @@ impl BackgroundDecodeManager {
} }
pub fn get_config(&self, rig_id: &str) -> BackgroundDecodeConfig { pub fn get_config(&self, rig_id: &str) -> BackgroundDecodeConfig {
self.store.get(rig_id).unwrap_or_else(|| BackgroundDecodeConfig { self.store
.get(rig_id)
.unwrap_or_else(|| BackgroundDecodeConfig {
rig_id: rig_id.to_string(), rig_id: rig_id.to_string(),
enabled: false, enabled: false,
bookmark_ids: Vec::new(), bookmark_ids: Vec::new(),
@@ -268,10 +282,7 @@ impl BackgroundDecodeManager {
bookmark_id: bookmark.id.clone(), bookmark_id: bookmark.id.clone(),
freq_hz: bookmark.freq_hz, freq_hz: bookmark.freq_hz,
mode: bookmark.mode.clone(), mode: bookmark.mode.clone(),
bandwidth_hz: bookmark bandwidth_hz: bookmark.bandwidth_hz.unwrap_or(0).min(u32::MAX as u64) as u32,
.bandwidth_hz
.unwrap_or(0)
.min(u32::MAX as u64) as u32,
decoder_kinds, decoder_kinds,
} }
} }
@@ -565,7 +576,8 @@ fn bookmark_supported_decoder_kinds(bookmark: &Bookmark) -> Vec<String> {
} }
fn channel_matches_bookmark(channel: &ClientChannel, bookmark: &Bookmark) -> bool { fn channel_matches_bookmark(channel: &ClientChannel, bookmark: &Bookmark) -> bool {
channel.freq_hz == bookmark.freq_hz && normalized_mode(&channel.mode) == normalized_mode(&bookmark.mode) channel.freq_hz == bookmark.freq_hz
&& normalized_mode(&channel.mode) == normalized_mode(&bookmark.mode)
} }
fn normalized_mode(mode: &str) -> String { fn normalized_mode(mode: &str) -> String {
@@ -117,12 +117,24 @@ impl SchedulerStore {
let _ = std::fs::create_dir_all(parent); let _ = std::fs::create_dir_all(parent);
} }
let db = if path.exists() { let db = if path.exists() {
PickleDb::load(path, PickleDbDumpPolicy::AutoDump, SerializationMethod::Json) PickleDb::load(
path,
PickleDbDumpPolicy::AutoDump,
SerializationMethod::Json,
)
.unwrap_or_else(|_| { .unwrap_or_else(|_| {
PickleDb::new(path, PickleDbDumpPolicy::AutoDump, SerializationMethod::Json) PickleDb::new(
path,
PickleDbDumpPolicy::AutoDump,
SerializationMethod::Json,
)
}) })
} else { } else {
PickleDb::new(path, PickleDbDumpPolicy::AutoDump, SerializationMethod::Json) PickleDb::new(
path,
PickleDbDumpPolicy::AutoDump,
SerializationMethod::Json,
)
}; };
Self { Self {
db: Arc::new(RwLock::new(db)), db: Arc::new(RwLock::new(db)),
@@ -206,10 +218,8 @@ fn sunrise_sunset_today(lat_deg: f64, lon_deg: f64) -> Option<(f64, f64)> {
let lambda = sun_lon - 0.00569 - 0.00478 * omega.to_radians().sin(); let lambda = sun_lon - 0.00569 - 0.00478 * omega.to_radians().sin();
// Obliquity of the ecliptic. // Obliquity of the ecliptic.
let eps0 = 23.0 let eps0 =
+ (26.0 23.0 + (26.0 + (21.448 - jc * (46.8150 + jc * (0.00059 - jc * 0.001813))) / 60.0) / 60.0;
+ (21.448 - jc * (46.8150 + jc * (0.00059 - jc * 0.001813))) / 60.0)
/ 60.0;
let eps = eps0 + 0.00256 * omega.to_radians().cos(); let eps = eps0 + 0.00256 * omega.to_radians().cos();
// Sun's declination. // Sun's declination.
@@ -219,8 +229,7 @@ fn sunrise_sunset_today(lat_deg: f64, lon_deg: f64) -> Option<(f64, f64)> {
let y = (eps.to_radians() / 2.0).tan().powi(2); let y = (eps.to_radians() / 2.0).tan().powi(2);
let l0_rad = l0.to_radians(); let l0_rad = l0.to_radians();
let eot = 4.0 let eot = 4.0
* (y * (2.0 * l0_rad).sin() * (y * (2.0 * l0_rad).sin() - 2.0 * m_rad.sin()
- 2.0 * m_rad.sin()
+ 4.0 * y * m_rad.sin() * (2.0 * l0_rad).cos() + 4.0 * y * m_rad.sin() * (2.0 * l0_rad).cos()
- 0.5 * y * y * (4.0 * l0_rad).sin() - 0.5 * y * y * (4.0 * l0_rad).sin()
- 1.25 * (2.0 * m_rad).sin()) - 1.25 * (2.0 * m_rad).sin())
@@ -228,8 +237,7 @@ fn sunrise_sunset_today(lat_deg: f64, lon_deg: f64) -> Option<(f64, f64)> {
// Hour angle for sunrise/sunset (zenith = 90.833°). // Hour angle for sunrise/sunset (zenith = 90.833°).
let lat_rad = lat_deg.to_radians(); let lat_rad = lat_deg.to_radians();
let cos_ha = ((PI / 2.0 + 0.833_f64.to_radians()).cos()) let cos_ha = ((PI / 2.0 + 0.833_f64.to_radians()).cos()) / (lat_rad.cos() * decl.cos())
/ (lat_rad.cos() * decl.cos())
- lat_rad.tan() * decl.tan(); - lat_rad.tan() * decl.tan();
if !(-1.0..=1.0).contains(&cos_ha) { if !(-1.0..=1.0).contains(&cos_ha) {
@@ -654,7 +662,10 @@ pub fn spawn_scheduler_task(
) )
.await .await
{ {
warn!("scheduler: failed to apply target for '{}': {e}", config.rig_id); warn!(
"scheduler: failed to apply target for '{}': {e}",
config.rig_id
);
continue; continue;
} }
@@ -678,7 +689,11 @@ async fn apply_scheduler_decoders(
let mut want_wspr = false; let mut want_wspr = false;
let mut update_from = |bm: &crate::server::bookmarks::Bookmark| { let mut update_from = |bm: &crate::server::bookmarks::Bookmark| {
for decoder in bm.decoders.iter().map(|item| item.trim().to_ascii_lowercase()) { for decoder in bm
.decoders
.iter()
.map(|item| item.trim().to_ascii_lowercase())
{
match decoder.as_str() { match decoder.as_str() {
"aprs" => want_aprs = true, "aprs" => want_aprs = true,
"hf-aprs" => want_hf_aprs = true, "hf-aprs" => want_hf_aprs = true,
@@ -707,7 +722,10 @@ async fn apply_scheduler_decoders(
for (label, cmd) in desired { for (label, cmd) in desired {
if let Err(e) = scheduler_send(rig_tx, cmd, rig_id.to_string()).await { if let Err(e) = scheduler_send(rig_tx, cmd, rig_id.to_string()).await {
warn!("scheduler: Set{label}DecodeEnabled failed for '{}': {:?}", rig_id, e); warn!(
"scheduler: Set{label}DecodeEnabled failed for '{}': {:?}",
rig_id, e
);
} }
} }
} }
@@ -931,7 +949,9 @@ pub async fn put_scheduler_control(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{timespan_active_entry, timespan_cycle_slot, timespan_active_entries, ScheduleEntry}; use super::{
timespan_active_entries, timespan_active_entry, timespan_cycle_slot, ScheduleEntry,
};
fn entry( fn entry(
id: &str, id: &str,
@@ -6,10 +6,10 @@
mod api; mod api;
#[path = "audio.rs"] #[path = "audio.rs"]
pub mod audio; pub mod audio;
#[path = "background_decode.rs"]
pub mod background_decode;
#[path = "auth.rs"] #[path = "auth.rs"]
pub mod auth; pub mod auth;
#[path = "background_decode.rs"]
pub mod background_decode;
#[path = "bookmarks.rs"] #[path = "bookmarks.rs"]
pub mod bookmarks; pub mod bookmarks;
#[path = "scheduler.rs"] #[path = "scheduler.rs"]
@@ -88,8 +88,7 @@ async fn serve(
); );
let background_decode_path = BackgroundDecodeStore::default_path(); let background_decode_path = BackgroundDecodeStore::default_path();
let background_decode_store = let background_decode_store = Arc::new(BackgroundDecodeStore::open(&background_decode_path));
Arc::new(BackgroundDecodeStore::open(&background_decode_path));
let vchan_mgr = Arc::new(ClientChannelManager::new(4)); let vchan_mgr = Arc::new(ClientChannelManager::new(4));
let background_decode_mgr = BackgroundDecodeManager::new( let background_decode_mgr = BackgroundDecodeManager::new(
background_decode_store, background_decode_store,
@@ -9,8 +9,7 @@ const CLIENT_BUILD_DATE: &str = env!("TRX_CLIENT_BUILD_DATE");
const INDEX_HTML: &str = include_str!("../assets/web/index.html"); const INDEX_HTML: &str = include_str!("../assets/web/index.html");
pub const STYLE_CSS: &str = include_str!("../assets/web/style.css"); pub const STYLE_CSS: &str = include_str!("../assets/web/style.css");
pub const APP_JS: &str = include_str!("../assets/web/app.js"); pub const APP_JS: &str = include_str!("../assets/web/app.js");
pub const DECODE_HISTORY_WORKER_JS: &str = pub const DECODE_HISTORY_WORKER_JS: &str = include_str!("../assets/web/decode-history-worker.js");
include_str!("../assets/web/decode-history-worker.js");
pub const WEBGL_RENDERER_JS: &str = include_str!("../assets/web/webgl-renderer.js"); pub const WEBGL_RENDERER_JS: &str = include_str!("../assets/web/webgl-renderer.js");
pub const LEAFLET_AIS_TRACKSYMBOL_JS: &str = pub const LEAFLET_AIS_TRACKSYMBOL_JS: &str =
include_str!("../assets/web/leaflet-ais-tracksymbol.js"); include_str!("../assets/web/leaflet-ais-tracksymbol.js");
@@ -25,8 +24,7 @@ pub const WSPR_JS: &str = include_str!("../assets/web/plugins/wspr.js");
pub const CW_JS: &str = include_str!("../assets/web/plugins/cw.js"); pub const CW_JS: &str = include_str!("../assets/web/plugins/cw.js");
pub const BOOKMARKS_JS: &str = include_str!("../assets/web/plugins/bookmarks.js"); pub const BOOKMARKS_JS: &str = include_str!("../assets/web/plugins/bookmarks.js");
pub const SCHEDULER_JS: &str = include_str!("../assets/web/plugins/scheduler.js"); pub const SCHEDULER_JS: &str = include_str!("../assets/web/plugins/scheduler.js");
pub const BACKGROUND_DECODE_JS: &str = pub const BACKGROUND_DECODE_JS: &str = include_str!("../assets/web/plugins/background-decode.js");
include_str!("../assets/web/plugins/background-decode.js");
pub const VCHAN_JS: &str = include_str!("../assets/web/plugins/vchan.js"); pub const VCHAN_JS: &str = include_str!("../assets/web/plugins/vchan.js");
pub fn index_html() -> String { pub fn index_html() -> String {
@@ -367,11 +367,7 @@ impl ClientChannelManager {
} }
/// Explicitly delete a channel by UUID (any session may do this). /// Explicitly delete a channel by UUID (any session may do this).
pub fn delete_channel( pub fn delete_channel(&self, rig_id: &str, channel_id: Uuid) -> Result<(), VChanClientError> {
&self,
rig_id: &str,
channel_id: Uuid,
) -> Result<(), VChanClientError> {
let mut rigs = self.rigs.write().unwrap(); let mut rigs = self.rigs.write().unwrap();
let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?; let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?;
let pos = channels let pos = channels
@@ -450,7 +446,10 @@ impl ClientChannelManager {
ch.freq_hz = freq_hz; ch.freq_hz = freq_hz;
self.broadcast_change(rig_id, channels); self.broadcast_change(rig_id, channels);
drop(rigs); drop(rigs);
self.send_audio_cmd(VChanAudioCmd::SetFreq { uuid: channel_id, freq_hz }); self.send_audio_cmd(VChanAudioCmd::SetFreq {
uuid: channel_id,
freq_hz,
});
Ok(()) Ok(())
} }
@@ -469,7 +468,10 @@ impl ClientChannelManager {
ch.mode = mode.to_string(); ch.mode = mode.to_string();
self.broadcast_change(rig_id, channels); self.broadcast_change(rig_id, channels);
drop(rigs); drop(rigs);
self.send_audio_cmd(VChanAudioCmd::SetMode { uuid: channel_id, mode: mode.to_string() }); self.send_audio_cmd(VChanAudioCmd::SetMode {
uuid: channel_id,
mode: mode.to_string(),
});
Ok(()) Ok(())
} }
@@ -488,7 +490,10 @@ impl ClientChannelManager {
ch.bandwidth_hz = bandwidth_hz; ch.bandwidth_hz = bandwidth_hz;
self.broadcast_change(rig_id, channels); self.broadcast_change(rig_id, channels);
drop(rigs); drop(rigs);
self.send_audio_cmd(VChanAudioCmd::SetBandwidth { uuid: channel_id, bandwidth_hz }); self.send_audio_cmd(VChanAudioCmd::SetBandwidth {
uuid: channel_id,
bandwidth_hz,
});
Ok(()) Ok(())
} }
@@ -530,12 +535,14 @@ impl ClientChannelManager {
let mut changed = false; let mut changed = false;
let desired_map: HashMap<String, (u64, String, u32, Vec<String>)> = desired let desired_map: HashMap<String, (u64, String, u32, Vec<String>)> = desired
.iter() .iter()
.map(|(bookmark_id, freq_hz, mode, bandwidth_hz, decoder_kinds)| { .map(
|(bookmark_id, freq_hz, mode, bandwidth_hz, decoder_kinds)| {
( (
bookmark_id.clone(), bookmark_id.clone(),
(*freq_hz, mode.clone(), *bandwidth_hz, decoder_kinds.clone()), (*freq_hz, mode.clone(), *bandwidth_hz, decoder_kinds.clone()),
) )
}) },
)
.collect(); .collect();
let desired_ids: std::collections::HashSet<&str> = let desired_ids: std::collections::HashSet<&str> =
desired_map.keys().map(String::as_str).collect(); desired_map.keys().map(String::as_str).collect();
@@ -561,7 +568,8 @@ impl ClientChannelManager {
let Some(bookmark_id) = channel.scheduler_bookmark_id.as_deref() else { let Some(bookmark_id) = channel.scheduler_bookmark_id.as_deref() else {
continue; continue;
}; };
let Some((freq_hz, mode, bandwidth_hz, decoder_kinds)) = desired_map.get(bookmark_id) else { let Some((freq_hz, mode, bandwidth_hz, decoder_kinds)) = desired_map.get(bookmark_id)
else {
continue; continue;
}; };
if channel.freq_hz != *freq_hz { if channel.freq_hz != *freq_hz {
+4 -1
View File
@@ -118,7 +118,10 @@ pub async fn read_audio_msg<R: AsyncRead + Unpin>(
if len > limit { if len > limit {
return Err(std::io::Error::new( return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData, std::io::ErrorKind::InvalidData,
format!("audio frame too large: {} bytes (type={:#04x})", len, msg_type), format!(
"audio frame too large: {} bytes (type={:#04x})",
len, msg_type
),
)); ));
} }
let mut payload = vec![0u8; len as usize]; let mut payload = vec![0u8; len as usize];
+7 -1
View File
@@ -127,7 +127,13 @@ pub async fn run_aprsfi_uplink(
// Pre-build the beacon packet (None if beaconing disabled or no coords). // Pre-build the beacon packet (None if beaconing disabled or no coords).
let beacon_packet: Option<String> = if cfg.beacon { let beacon_packet: Option<String> = if cfg.beacon {
match coords { match coords {
Some((lat, lon)) => Some(format_beacon(&callsign, lat, lon, cfg.beacon_symbol_table, cfg.beacon_symbol_code)), Some((lat, lon)) => Some(format_beacon(
&callsign,
lat,
lon,
cfg.beacon_symbol_table,
cfg.beacon_symbol_code,
)),
None => { None => {
warn!( warn!(
"APRS-IS IGate: beacon enabled but no coordinates available \ "APRS-IS IGate: beacon enabled but no coordinates available \
@@ -2,8 +2,8 @@
// //
// SPDX-License-Identifier: BSD-2-Clause // SPDX-License-Identifier: BSD-2-Clause
use num_complex::Complex;
use super::DcBlocker; use super::DcBlocker;
use num_complex::Complex;
/// C-QUAM (Compatible Quadrature AM) stereo demodulator. /// C-QUAM (Compatible Quadrature AM) stereo demodulator.
/// ///
@@ -57,8 +57,7 @@ impl CquamDemod {
self.carrier_im = alpha * self.carrier_im + one_minus_alpha * s.im; self.carrier_im = alpha * self.carrier_im + one_minus_alpha * s.im;
// Rotate s by −φ to phase-align I with (1 + m_s) and Q with m_d. // Rotate s by −φ to phase-align I with (1 + m_s) and Q with m_d.
let mag_sq = let mag_sq = self.carrier_re * self.carrier_re + self.carrier_im * self.carrier_im;
self.carrier_re * self.carrier_re + self.carrier_im * self.carrier_im;
let (i_corr, q_corr) = if mag_sq > 1e-8 { let (i_corr, q_corr) = if mag_sq > 1e-8 {
let inv = mag_sq.sqrt().recip(); let inv = mag_sq.sqrt().recip();
let cos_phi = self.carrier_re * inv; let cos_phi = self.carrier_re * inv;
@@ -94,7 +93,10 @@ mod tests {
let out = demod.demodulate_stereo(&samples); let out = demod.demodulate_stereo(&samples);
assert_eq!(out.len(), 512); assert_eq!(out.len(), 512);
for &s in &out { for &s in &out {
assert!(s.abs() < 1e-5, "silence should produce near-zero output, got {s}"); assert!(
s.abs() < 1e-5,
"silence should produce near-zero output, got {s}"
);
} }
} }
@@ -8,9 +8,7 @@ use num_complex::Complex;
/// 7th-order minimax atan approximation for |z| <= 1. /// 7th-order minimax atan approximation for |z| <= 1.
#[cfg(target_arch = "aarch64")] #[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")] #[target_feature(enable = "neon")]
unsafe fn atan_poly_neon( unsafe fn atan_poly_neon(z: std::arch::aarch64::float32x4_t) -> std::arch::aarch64::float32x4_t {
z: std::arch::aarch64::float32x4_t,
) -> std::arch::aarch64::float32x4_t {
use std::arch::aarch64::*; use std::arch::aarch64::*;
let c0 = vdupq_n_f32(0.999_999_5_f32); let c0 = vdupq_n_f32(0.999_999_5_f32);
let c1 = vdupq_n_f32(-0.333_326_1_f32); let c1 = vdupq_n_f32(-0.333_326_1_f32);
@@ -249,7 +249,10 @@ impl SdrPipeline {
channel_if_hz: f64, channel_if_hz: f64,
mode: &RigMode, mode: &RigMode,
bandwidth_hz: u32, bandwidth_hz: u32,
) -> (broadcast::Sender<Vec<f32>>, broadcast::Sender<Vec<Complex<f32>>>) { ) -> (
broadcast::Sender<Vec<f32>>,
broadcast::Sender<Vec<Complex<f32>>>,
) {
const PCM_BROADCAST_CAPACITY: usize = 32; const PCM_BROADCAST_CAPACITY: usize = 32;
const IQ_BROADCAST_CAPACITY: usize = 64; const IQ_BROADCAST_CAPACITY: usize = 64;
let (pcm_tx, _) = broadcast::channel::<Vec<f32>>(PCM_BROADCAST_CAPACITY); let (pcm_tx, _) = broadcast::channel::<Vec<f32>>(PCM_BROADCAST_CAPACITY);
@@ -456,9 +459,7 @@ fn iq_read_loop(
// Hold a read lock only for the duration of this block's DSP pass. // Hold a read lock only for the duration of this block's DSP pass.
// Write lock (add/remove channel) waits at most one block (~2 ms). // Write lock (add/remove channel) waits at most one block (~2 ms).
{ {
let dsps = channel_dsps let dsps = channel_dsps.read().expect("channel_dsps RwLock poisoned");
.read()
.expect("channel_dsps RwLock poisoned");
for dsp_arc in dsps.iter() { for dsp_arc in dsps.iter() {
match dsp_arc.lock() { match dsp_arc.lock() {
Ok(mut dsp) => dsp.process_block(samples), Ok(mut dsp) => dsp.process_block(samples),
@@ -272,7 +272,12 @@ impl ChannelDsp {
} else { } else {
(cutoff_hz / self.sdr_sample_rate as f32).min(0.499) (cutoff_hz / self.sdr_sample_rate as f32).min(0.499)
}; };
self.lpf_iq = BlockFirFilterPair::new(cutoff_norm, ssb_shift_norm(&self.mode, cutoff_norm), auto_taps(cutoff_norm), IQ_BLOCK_SIZE); self.lpf_iq = BlockFirFilterPair::new(
cutoff_norm,
ssb_shift_norm(&self.mode, cutoff_norm),
auto_taps(cutoff_norm),
IQ_BLOCK_SIZE,
);
let rate_changed = self.decim_factor != next_decim_factor; let rate_changed = self.decim_factor != next_decim_factor;
self.decim_factor = next_decim_factor; self.decim_factor = next_decim_factor;
self.decim_counter = 0; self.decim_counter = 0;
@@ -352,7 +357,12 @@ impl ChannelDsp {
channel_if_hz, channel_if_hz,
demodulator: Demodulator::for_mode(mode), demodulator: Demodulator::for_mode(mode),
mode: mode.clone(), mode: mode.clone(),
lpf_iq: BlockFirFilterPair::new(cutoff_norm, ssb_shift_norm(mode, cutoff_norm), auto_taps(cutoff_norm), IQ_BLOCK_SIZE), lpf_iq: BlockFirFilterPair::new(
cutoff_norm,
ssb_shift_norm(mode, cutoff_norm),
auto_taps(cutoff_norm),
IQ_BLOCK_SIZE,
),
sdr_sample_rate, sdr_sample_rate,
audio_sample_rate, audio_sample_rate,
audio_bandwidth_hz, audio_bandwidth_hz,
@@ -109,7 +109,12 @@ type FirKernel = (
/// Setting `shift_norm = +cutoff_norm` produces a one-sided USB filter /// Setting `shift_norm = +cutoff_norm` produces a one-sided USB filter
/// `[0, BW]`; `shift_norm = -cutoff_norm` produces a one-sided LSB filter /// `[0, BW]`; `shift_norm = -cutoff_norm` produces a one-sided LSB filter
/// `[-BW, 0]`; `shift_norm = 0` leaves the kernel symmetric (AM/FM/WFM). /// `[-BW, 0]`; `shift_norm = 0` leaves the kernel symmetric (AM/FM/WFM).
fn build_fir_kernel(cutoff_norm: f32, shift_norm: f32, taps: usize, block_size: usize) -> FirKernel { fn build_fir_kernel(
cutoff_norm: f32,
shift_norm: f32,
taps: usize,
block_size: usize,
) -> FirKernel {
let coeffs = windowed_sinc_coeffs(cutoff_norm, taps); let coeffs = windowed_sinc_coeffs(cutoff_norm, taps);
let fft_size = (block_size + taps - 1).next_power_of_two(); let fft_size = (block_size + taps - 1).next_power_of_two();
@@ -210,8 +215,14 @@ unsafe fn mul_freq_domain_neon(
let (h_re, h_im) = (h_ri.0, h_ri.1); let (h_re, h_im) = (h_ri.0, h_ri.1);
// Complex multiply: out.re = x.re*h.re - x.im*h.im, out.im = x.re*h.im + x.im*h.re // Complex multiply: out.re = x.re*h.re - x.im*h.im, out.im = x.re*h.im + x.im*h.re
let out_re = vmulq_f32(vsubq_f32(vmulq_f32(x_re, h_re), vmulq_f32(x_im, h_im)), scale_v); let out_re = vmulq_f32(
let out_im = vmulq_f32(vaddq_f32(vmulq_f32(x_re, h_im), vmulq_f32(x_im, h_re)), scale_v); vsubq_f32(vmulq_f32(x_re, h_re), vmulq_f32(x_im, h_im)),
scale_v,
);
let out_im = vmulq_f32(
vaddq_f32(vmulq_f32(x_re, h_im), vmulq_f32(x_im, h_re)),
scale_v,
);
// Reinterleave: .0 = [re0,im0,re1,im1], .1 = [re2,im2,re3,im3] // Reinterleave: .0 = [re0,im0,re1,im1], .1 = [re2,im2,re3,im3]
let out = vzipq_f32(out_re, out_im); let out = vzipq_f32(out_re, out_im);
@@ -313,7 +324,8 @@ impl BlockFirFilterPair {
/// `-cutoff_norm` for LSB/CWR. /// `-cutoff_norm` for LSB/CWR.
pub fn new(cutoff_norm: f32, shift_norm: f32, taps: usize, block_size: usize) -> Self { pub fn new(cutoff_norm: f32, shift_norm: f32, taps: usize, block_size: usize) -> Self {
let taps = taps.max(1); let taps = taps.max(1);
let (h_buf, fft_size, fft, ifft) = build_fir_kernel(cutoff_norm, shift_norm, taps, block_size); let (h_buf, fft_size, fft, ifft) =
build_fir_kernel(cutoff_norm, shift_norm, taps, block_size);
Self { Self {
h_freq: h_buf, h_freq: h_buf,
overlap: vec![FftComplex::new(0.0, 0.0); taps.saturating_sub(1)], overlap: vec![FftComplex::new(0.0, 0.0); taps.saturating_sub(1)],
@@ -7,10 +7,10 @@ pub mod dsp;
pub mod real_iq_source; pub mod real_iq_source;
pub mod vchan_impl; pub mod vchan_impl;
use dsp::IqSource as _;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use dsp::IqSource as _;
use trx_core::radio::freq::{Band, Freq}; use trx_core::radio::freq::{Band, Freq};
use trx_core::rig::response::RigError; use trx_core::rig::response::RigError;
use trx_core::rig::state::{RigFilterState, SpectrumData, VchanRdsEntry, WfmDenoiseLevel}; use trx_core::rig::state::{RigFilterState, SpectrumData, VchanRdsEntry, WfmDenoiseLevel};
@@ -257,10 +257,7 @@ impl SoapySdrRig {
}; };
// Initialise filter state from primary channel config (index 0), or defaults. // Initialise filter state from primary channel config (index 0), or defaults.
let bandwidth_hz = channels let bandwidth_hz = channels.first().map(|&(_, _, bw)| bw).unwrap_or(3000);
.first()
.map(|&(_, _, bw)| bw)
.unwrap_or(3000);
let spectrum_buf = pipeline.spectrum_buf.clone(); let spectrum_buf = pipeline.spectrum_buf.clone();
let retune_cmd = pipeline.retune_cmd.clone(); let retune_cmd = pipeline.retune_cmd.clone();
@@ -359,10 +356,7 @@ impl SoapySdrRig {
let dsps = self.pipeline.channel_dsps.read().unwrap(); let dsps = self.pipeline.channel_dsps.read().unwrap();
for idx in [ais_a_idx, ais_b_idx] { for idx in [ais_a_idx, ais_b_idx] {
if let Some(dsp_arc) = dsps.get(idx) { if let Some(dsp_arc) = dsps.get(idx) {
dsp_arc dsp_arc.lock().unwrap().set_filter(self.bandwidth_hz);
.lock()
.unwrap()
.set_filter(self.bandwidth_hz);
} }
} }
} }
@@ -749,10 +743,7 @@ impl RigCat for SoapySdrRig {
{ {
let dsps = self.pipeline.channel_dsps.read().unwrap(); let dsps = self.pipeline.channel_dsps.read().unwrap();
if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) { if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) {
dsp_arc dsp_arc.lock().unwrap().set_filter(bandwidth_hz);
.lock()
.unwrap()
.set_filter(bandwidth_hz);
} }
} }
self.apply_ais_channel_filters(); self.apply_ais_channel_filters();
@@ -100,11 +100,7 @@ impl SdrVirtualChannelManager {
/// - `fixed_slot_count`: number of fixed pipeline slots (primary + AIS), /// - `fixed_slot_count`: number of fixed pipeline slots (primary + AIS),
/// i.e. the index of the first slot available for virtual channels. /// i.e. the index of the first slot available for virtual channels.
/// - `max_total`: maximum total channels including primary (e.g. 4). /// - `max_total`: maximum total channels including primary (e.g. 4).
pub fn new( pub fn new(pipeline: Arc<SdrPipeline>, fixed_slot_count: usize, max_total: usize) -> Self {
pipeline: Arc<SdrPipeline>,
fixed_slot_count: usize,
max_total: usize,
) -> Self {
// Seed the channel list with a synthetic primary-channel entry. // Seed the channel list with a synthetic primary-channel entry.
// We use the first PCM sender from the pipeline (index 0). // We use the first PCM sender from the pipeline (index 0).
let primary_pcm_tx = pipeline let primary_pcm_tx = pipeline
@@ -177,8 +173,8 @@ impl SdrVirtualChannelManager {
} }
let bandwidth_hz = default_bandwidth_hz(mode); let bandwidth_hz = default_bandwidth_hz(mode);
let (pcm_tx, iq_tx) = let (pcm_tx, iq_tx) = self
self.pipeline .pipeline
.add_virtual_channel(if_hz as f64, mode, bandwidth_hz); .add_virtual_channel(if_hz as f64, mode, bandwidth_hz);
let pipeline_slot = self let pipeline_slot = self