From b533d704a1a3d5f369b6029e07e018e4c049a775 Mon Sep 17 00:00:00 2001 From: Stanislaw Grams Date: Tue, 17 Mar 2026 22:36:11 +0100 Subject: [PATCH] [style](trx-rs): reformat codebase Co-Authored-By: Claude Opus 4.6 Signed-off-by: Stanislaw Grams --- src/decoders/trx-wspr/src/decoder.rs | 6 +- src/trx-client/src/audio_client.rs | 16 ++- src/trx-client/src/config.rs | 16 +-- src/trx-client/src/main.rs | 121 +++++++++--------- src/trx-client/src/remote_client.rs | 45 +++---- src/trx-client/trx-frontend/src/lib.rs | 8 +- .../trx-frontend/trx-frontend-http/src/api.rs | 117 +++++++++++------ .../trx-frontend-http/src/audio.rs | 25 +++- .../src/background_decode.rs | 42 +++--- .../trx-frontend-http/src/scheduler.rs | 54 +++++--- .../trx-frontend-http/src/server.rs | 7 +- .../trx-frontend-http/src/status.rs | 6 +- .../trx-frontend-http/src/vchan.rs | 38 +++--- src/trx-core/src/audio.rs | 5 +- src/trx-reporting/src/aprsfi.rs | 8 +- .../trx-backend-soapysdr/src/demod/amcquam.rs | 10 +- .../src/demod/math_arm.rs | 4 +- .../trx-backend-soapysdr/src/dsp.rs | 9 +- .../trx-backend-soapysdr/src/dsp/channel.rs | 14 +- .../trx-backend-soapysdr/src/dsp/filter.rs | 20 ++- .../trx-backend-soapysdr/src/lib.rs | 17 +-- .../trx-backend-soapysdr/src/vchan_impl.rs | 12 +- 22 files changed, 348 insertions(+), 252 deletions(-) diff --git a/src/decoders/trx-wspr/src/decoder.rs b/src/decoders/trx-wspr/src/decoder.rs index 2535c5e..6cfc997 100644 --- a/src/decoders/trx-wspr/src/decoder.rs +++ b/src/decoders/trx-wspr/src/decoder.rs @@ -166,11 +166,7 @@ fn sync_correlation_score(signal: &[f32], base_hz: f32) -> f32 { base_hz + 2.0 * TONE_SPACING_HZ, WSPR_SAMPLE_RATE as f32, ); - let p1 = goertzel_power( - frame, - base_hz + TONE_SPACING_HZ, - WSPR_SAMPLE_RATE as f32, - ); + let p1 = goertzel_power(frame, base_hz + TONE_SPACING_HZ, WSPR_SAMPLE_RATE as f32); let p3 = goertzel_power( frame, base_hz + 3.0 * TONE_SPACING_HZ, diff --git a/src/trx-client/src/audio_client.rs b/src/trx-client/src/audio_client.rs index ff5141a..b2a8d06 100644 --- a/src/trx-client/src/audio_client.rs +++ b/src/trx-client/src/audio_client.rs @@ -25,11 +25,11 @@ use trx_core::audio::{ parse_vchan_audio_frame, parse_vchan_uuid_msg, read_audio_msg, write_audio_msg, write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT2_DECODE, AUDIO_MSG_FT4_DECODE, AUDIO_MSG_FT8_DECODE, - AUDIO_MSG_HF_APRS_DECODE, - AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_RX_FRAME_CH, - AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, AUDIO_MSG_VCHAN_BW, - AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, - AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, + AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, + AUDIO_MSG_RX_FRAME_CH, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, + AUDIO_MSG_VCHAN_BW, AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, + AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, + AUDIO_MSG_WSPR_DECODE, }; use trx_core::decode::DecodedMessage; use trx_frontend::VChanAudioCmd; @@ -195,7 +195,8 @@ async fn handle_audio_connection( } // Re-apply non-default bandwidth after re-subscribing. if sub.bandwidth_hz > 0 { - let bw_json = serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": sub.bandwidth_hz }); + let bw_json = + serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": sub.bandwidth_hz }); if let Ok(payload) = serde_json::to_vec(&bw_json) { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_BW, &payload).await { warn!("Audio vchan reconnect BW write failed: {}", e); @@ -209,7 +210,8 @@ async fn handle_audio_connection( // Spawn RX read task let rx_tx = rx_tx.clone(); let decode_tx = decode_tx.clone(); - let vchan_audio_rx: Arc>>> = Arc::clone(vchan_audio); + let vchan_audio_rx: Arc>>> = + Arc::clone(vchan_audio); let vchan_destroyed_for_rx = vchan_destroyed_tx.clone(); let mut rx_handle = tokio::spawn(async move { loop { diff --git a/src/trx-client/src/config.rs b/src/trx-client/src/config.rs index 8802642..3a8c2a8 100644 --- a/src/trx-client/src/config.rs +++ b/src/trx-client/src/config.rs @@ -395,9 +395,7 @@ impl ClientConfig { ); } if self.frontends.http.decode_history_retention_min == 0 { - return Err( - "[frontends.http].decode_history_retention_min must be > 0".to_string(), - ); + return Err("[frontends.http].decode_history_retention_min must be > 0".to_string()); } for (rig_id, minutes) in &self.frontends.http.decode_history_retention_min_by_rig { if rig_id.trim().is_empty() { @@ -616,13 +614,11 @@ mod tests { assert_eq!(config.frontends.http.spectrum_coverage_margin_hz, 50_000); assert_eq!(config.frontends.http.spectrum_usable_span_ratio, 0.92); assert_eq!(config.frontends.http.decode_history_retention_min, 1440); - assert!( - config - .frontends - .http - .decode_history_retention_min_by_rig - .is_empty() - ); + assert!(config + .frontends + .http + .decode_history_retention_min_by_rig + .is_empty()); assert_eq!(config.frontends.rigctl.port, 4532); assert!(config.frontends.http_json.enabled); assert_eq!(config.frontends.http_json.port, 0); diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index 67c11ff..841015d 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -185,8 +185,11 @@ async fn async_init() -> DynResult { cfg.frontends.http.spectrum_usable_span_ratio; frontend_runtime.http_decode_history_retention_min = cfg.frontends.http.decode_history_retention_min; - frontend_runtime.http_decode_history_retention_min_by_rig = - cfg.frontends.http.decode_history_retention_min_by_rig.clone(); + frontend_runtime.http_decode_history_retention_min_by_rig = cfg + .frontends + .http + .decode_history_retention_min_by_rig + .clone(); // Resolve remote URL: CLI > config [remote] section > error let remote_url = cli @@ -305,8 +308,7 @@ async fn async_init() -> DynResult { frontend_runtime.decode_rx = Some(decode_tx.clone()); // Virtual-channel audio: shared broadcaster map + command channel. - let (vchan_cmd_tx, vchan_cmd_rx) = - mpsc::unbounded_channel::(); + let (vchan_cmd_tx, vchan_cmd_rx) = mpsc::unbounded_channel::(); *frontend_runtime.vchan_audio_cmd.lock().unwrap() = Some(vchan_cmd_tx); let (vchan_destroyed_tx, _) = broadcast::channel::(64); @@ -318,65 +320,64 @@ async fn async_init() -> DynResult { let cw_history = frontend_runtime.cw_history.clone(); let ft8_history = frontend_runtime.ft8_history.clone(); let wspr_history = frontend_runtime.wspr_history.clone(); - let replay_history_sink: Arc = - Arc::new(move |msg| { - let now = std::time::Instant::now(); - match msg { - DecodedMessage::Ais(mut message) => { - if message.ts_ms.is_none() { - message.ts_ms = Some(current_timestamp_ms()); - } - if let Ok(mut history) = ais_history.lock() { - history.push_back((now, message)); - } + let replay_history_sink: Arc = Arc::new(move |msg| { + let now = std::time::Instant::now(); + match msg { + DecodedMessage::Ais(mut message) => { + if message.ts_ms.is_none() { + message.ts_ms = Some(current_timestamp_ms()); } - DecodedMessage::Vdes(mut message) => { - if message.ts_ms.is_none() { - message.ts_ms = Some(current_timestamp_ms()); - } - if let Ok(mut history) = vdes_history.lock() { - history.push_back((now, message)); - } - } - DecodedMessage::Aprs(mut packet) => { - if packet.ts_ms.is_none() { - packet.ts_ms = Some(current_timestamp_ms()); - } - if let Ok(mut history) = aprs_history.lock() { - history.push_back((now, packet)); - } - } - DecodedMessage::HfAprs(mut packet) => { - if packet.ts_ms.is_none() { - packet.ts_ms = Some(current_timestamp_ms()); - } - if let Ok(mut history) = hf_aprs_history.lock() { - history.push_back((now, packet)); - } - } - DecodedMessage::Cw(event) => { - if let Ok(mut history) = cw_history.lock() { - history.push_back((now, event)); - } - } - DecodedMessage::Ft8(message) => { - if let Ok(mut history) = ft8_history.lock() { - history.push_back((now, message)); - } - } - DecodedMessage::Ft4(_) => { - // FT4 history is managed by the frontend HTTP audio collector - } - DecodedMessage::Ft2(_) => { - // FT2 history is managed by the frontend HTTP audio collector - } - DecodedMessage::Wspr(message) => { - if let Ok(mut history) = wspr_history.lock() { - history.push_back((now, message)); - } + if let Ok(mut history) = ais_history.lock() { + history.push_back((now, message)); } } - }); + DecodedMessage::Vdes(mut message) => { + if message.ts_ms.is_none() { + message.ts_ms = Some(current_timestamp_ms()); + } + if let Ok(mut history) = vdes_history.lock() { + history.push_back((now, message)); + } + } + DecodedMessage::Aprs(mut packet) => { + if packet.ts_ms.is_none() { + packet.ts_ms = Some(current_timestamp_ms()); + } + if let Ok(mut history) = aprs_history.lock() { + history.push_back((now, packet)); + } + } + DecodedMessage::HfAprs(mut packet) => { + if packet.ts_ms.is_none() { + packet.ts_ms = Some(current_timestamp_ms()); + } + if let Ok(mut history) = hf_aprs_history.lock() { + history.push_back((now, packet)); + } + } + DecodedMessage::Cw(event) => { + if let Ok(mut history) = cw_history.lock() { + history.push_back((now, event)); + } + } + DecodedMessage::Ft8(message) => { + if let Ok(mut history) = ft8_history.lock() { + history.push_back((now, message)); + } + } + DecodedMessage::Ft4(_) => { + // FT4 history is managed by the frontend HTTP audio collector + } + DecodedMessage::Ft2(_) => { + // FT2 history is managed by the frontend HTTP audio collector + } + DecodedMessage::Wspr(message) => { + if let Ok(mut history) = wspr_history.lock() { + history.push_back((now, message)); + } + } + } + }); info!( "Audio enabled: default port {}, decode channel set", diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index cfe08f5..9419ce8 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -68,10 +68,7 @@ pub async fn run_remote_client( ) -> RigResult<()> { // Spectrum polling runs on its own dedicated TCP connection so it never // blocks state polls or user commands on the main connection. - let spectrum_task = tokio::spawn(run_spectrum_connection( - config.clone(), - shutdown_rx.clone(), - )); + let spectrum_task = tokio::spawn(run_spectrum_connection(config.clone(), shutdown_rx.clone())); let mut reconnect_delay = Duration::from_secs(1); @@ -147,8 +144,7 @@ async fn run_spectrum_connection( if let Err(e) = stream.set_nodelay(true) { warn!("Spectrum TCP_NODELAY failed: {}", e); } - if let Err(e) = - handle_spectrum_connection(&config, stream, &mut shutdown_rx).await + if let Err(e) = handle_spectrum_connection(&config, stream, &mut shutdown_rx).await { warn!("Spectrum connection dropped: {}", e); } @@ -301,13 +297,10 @@ async fn send_command( .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; payload.push('\n'); - time::timeout( - IO_TIMEOUT, - writer.write_all(payload.as_bytes()), - ) - .await - .map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))? - .map_err(|e| RigError::communication(format!("write failed: {e}")))?; + time::timeout(IO_TIMEOUT, writer.write_all(payload.as_bytes())) + .await + .map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))? + .map_err(|e| RigError::communication(format!("write failed: {e}")))?; time::timeout(IO_TIMEOUT, writer.flush()) .await .map_err(|_| RigError::communication(format!("flush timed out after {:?}", IO_TIMEOUT)))? @@ -347,15 +340,12 @@ async fn send_command_no_state_update( let mut payload = serde_json::to_string(&envelope) .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; payload.push('\n'); - time::timeout( - SPECTRUM_IO_TIMEOUT, - writer.write_all(payload.as_bytes()), - ) - .await - .map_err(|_| { - RigError::communication(format!("write timed out after {:?}", SPECTRUM_IO_TIMEOUT)) - })? - .map_err(|e| RigError::communication(format!("write failed: {e}")))?; + time::timeout(SPECTRUM_IO_TIMEOUT, writer.write_all(payload.as_bytes())) + .await + .map_err(|_| { + RigError::communication(format!("write timed out after {:?}", SPECTRUM_IO_TIMEOUT)) + })? + .map_err(|e| RigError::communication(format!("write failed: {e}")))?; time::timeout(SPECTRUM_IO_TIMEOUT, writer.flush()) .await .map_err(|_| { @@ -443,13 +433,10 @@ async fn send_get_rigs( .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; payload.push('\n'); - time::timeout( - IO_TIMEOUT, - writer.write_all(payload.as_bytes()), - ) - .await - .map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))? - .map_err(|e| RigError::communication(format!("write failed: {e}")))?; + time::timeout(IO_TIMEOUT, writer.write_all(payload.as_bytes())) + .await + .map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))? + .map_err(|e| RigError::communication(format!("write failed: {e}")))?; time::timeout(IO_TIMEOUT, writer.flush()) .await .map_err(|_| RigError::communication(format!("flush timed out after {:?}", IO_TIMEOUT)))? diff --git a/src/trx-client/trx-frontend/src/lib.rs b/src/trx-client/trx-frontend/src/lib.rs index 01536fd..acfabc6 100644 --- a/src/trx-client/trx-frontend/src/lib.rs +++ b/src/trx-client/trx-frontend/src/lib.rs @@ -3,9 +3,9 @@ // SPDX-License-Identifier: BSD-2-Clause use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::RwLock; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::sync::RwLock; use std::sync::{Arc, Mutex}; use std::time::Instant; @@ -95,7 +95,11 @@ pub struct SharedSpectrum { impl SharedSpectrum { /// Replace the stored frame, pre-serialising RDS in one pass. - pub fn set(&mut self, frame: Option, vchan_rds: Option>) { + pub fn set( + &mut self, + frame: Option, + vchan_rds: Option>, + ) { self.rds_json = frame .as_ref() .and_then(|f| f.rds.as_ref()) diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index 66336c3..6767462 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -47,8 +47,16 @@ fn base64_encode(data: &[u8]) -> String { let n = (b0 << 16) | (b1 << 8) | b2; out.push(T[((n >> 18) & 63) as usize]); out.push(T[((n >> 12) & 63) as usize]); - out.push(if chunk.len() > 1 { T[((n >> 6) & 63) as usize] } else { b'=' }); - out.push(if chunk.len() > 2 { T[(n & 63) as usize] } else { b'=' }); + out.push(if chunk.len() > 1 { + T[((n >> 6) & 63) as usize] + } else { + b'=' + }); + out.push(if chunk.len() > 2 { + T[(n & 63) as usize] + } else { + b'=' + }); } // SAFETY: output contains only ASCII base64 characters. unsafe { String::from_utf8_unchecked(out) } @@ -120,23 +128,53 @@ fn inject_frontend_meta(json: &str, meta: FrontendMeta) -> String { // Build only the extra key-value pairs as a JSON fragment. let mut extra = serde_json::Map::new(); extra.insert("clients".into(), serde_json::json!(meta.http_clients)); - extra.insert("rigctl_clients".into(), serde_json::json!(meta.rigctl_clients)); - if let Some(v) = meta.rigctl_addr { extra.insert("rigctl_addr".into(), serde_json::json!(v)); } - if let Some(v) = meta.active_rig_id { extra.insert("active_rig_id".into(), serde_json::json!(v)); } + extra.insert( + "rigctl_clients".into(), + serde_json::json!(meta.rigctl_clients), + ); + if let Some(v) = meta.rigctl_addr { + extra.insert("rigctl_addr".into(), serde_json::json!(v)); + } + if let Some(v) = meta.active_rig_id { + extra.insert("active_rig_id".into(), serde_json::json!(v)); + } extra.insert("rig_ids".into(), serde_json::json!(meta.rig_ids)); - if let Some(v) = meta.owner_callsign { extra.insert("owner_callsign".into(), serde_json::json!(v)); } - if let Some(v) = meta.owner_website_url { extra.insert("owner_website_url".into(), serde_json::json!(v)); } - if let Some(v) = meta.owner_website_name { extra.insert("owner_website_name".into(), serde_json::json!(v)); } - if let Some(v) = meta.ais_vessel_url_base { extra.insert("ais_vessel_url_base".into(), serde_json::json!(v)); } - extra.insert("show_sdr_gain_control".into(), serde_json::json!(meta.show_sdr_gain_control)); - extra.insert("initial_map_zoom".into(), serde_json::json!(meta.initial_map_zoom)); - extra.insert("spectrum_coverage_margin_hz".into(), serde_json::json!(meta.spectrum_coverage_margin_hz)); - extra.insert("spectrum_usable_span_ratio".into(), serde_json::json!(meta.spectrum_usable_span_ratio)); + if let Some(v) = meta.owner_callsign { + extra.insert("owner_callsign".into(), serde_json::json!(v)); + } + if let Some(v) = meta.owner_website_url { + extra.insert("owner_website_url".into(), serde_json::json!(v)); + } + if let Some(v) = meta.owner_website_name { + extra.insert("owner_website_name".into(), serde_json::json!(v)); + } + if let Some(v) = meta.ais_vessel_url_base { + extra.insert("ais_vessel_url_base".into(), serde_json::json!(v)); + } + extra.insert( + "show_sdr_gain_control".into(), + serde_json::json!(meta.show_sdr_gain_control), + ); + extra.insert( + "initial_map_zoom".into(), + serde_json::json!(meta.initial_map_zoom), + ); + extra.insert( + "spectrum_coverage_margin_hz".into(), + serde_json::json!(meta.spectrum_coverage_margin_hz), + ); + extra.insert( + "spectrum_usable_span_ratio".into(), + serde_json::json!(meta.spectrum_usable_span_ratio), + ); extra.insert( "decode_history_retention_min".into(), serde_json::json!(meta.decode_history_retention_min), ); - extra.insert("server_connected".into(), serde_json::json!(meta.server_connected)); + extra.insert( + "server_connected".into(), + serde_json::json!(meta.server_connected), + ); // Serialize the extra map, strip its outer braces, and splice in. let extra_json = match serde_json::to_string(&extra) { @@ -328,9 +366,7 @@ pub async fn events( let scheduler_control = scheduler_control_updates.clone(); async move { state.snapshot().and_then(|v| { - if let Ok(Some(rig_id)) = - context.remote_active_rig_id.lock().map(|g| g.clone()) - { + if let Ok(Some(rig_id)) = context.remote_active_rig_id.lock().map(|g| g.clone()) { vchan.update_primary( &rig_id, v.status.freq.hz, @@ -367,9 +403,8 @@ pub async fn events( if let Some(colon) = msg.find(':') { let rig_id = &msg[..colon]; let channels_json = &msg[colon + 1..]; - let payload = format!( - "{{\"rig_id\":\"{rig_id}\",\"channels\":{channels_json}}}" - ); + let payload = + format!("{{\"rig_id\":\"{rig_id}\",\"channels\":{channels_json}}}"); return Some(( Ok::(Bytes::from(format!( "event: channels\ndata: {payload}\n\n" @@ -573,9 +608,7 @@ fn gzip_bytes(payload: &[u8]) -> std::io::Result> { /// not block real-time messages: the client fetches this endpoint in parallel /// with opening the SSE connection and drains it in the background. #[get("/decode/history")] -pub async fn decode_history( - context: web::Data>, -) -> impl Responder { +pub async fn decode_history(context: web::Data>) -> impl Responder { if context.decode_rx.is_none() { return HttpResponse::NotFound().body("decode not enabled"); } @@ -1414,9 +1447,7 @@ pub async fn delete_channel_route( let (rig_id, channel_id) = path.into_inner(); match vchan_mgr.delete_channel(&rig_id, channel_id) { Ok(()) => HttpResponse::Ok().finish(), - Err(crate::server::vchan::VChanClientError::NotFound) => { - HttpResponse::NotFound().finish() - } + Err(crate::server::vchan::VChanClientError::NotFound) => HttpResponse::NotFound().finish(), Err(crate::server::vchan::VChanClientError::Permanent) => { HttpResponse::BadRequest().body("cannot remove the primary channel") } @@ -1476,9 +1507,7 @@ pub async fn set_vchan_freq( let (rig_id, channel_id) = path.into_inner(); match vchan_mgr.set_channel_freq(&rig_id, channel_id, body.freq_hz) { Ok(()) => HttpResponse::Ok().finish(), - Err(crate::server::vchan::VChanClientError::NotFound) => { - HttpResponse::NotFound().finish() - } + Err(crate::server::vchan::VChanClientError::NotFound) => HttpResponse::NotFound().finish(), Err(e) => HttpResponse::BadRequest().body(e.to_string()), } } @@ -1497,9 +1526,7 @@ pub async fn set_vchan_bw( let (rig_id, channel_id) = path.into_inner(); match vchan_mgr.set_channel_bandwidth(&rig_id, channel_id, body.bandwidth_hz) { Ok(()) => HttpResponse::Ok().finish(), - Err(crate::server::vchan::VChanClientError::NotFound) => { - HttpResponse::NotFound().finish() - } + Err(crate::server::vchan::VChanClientError::NotFound) => HttpResponse::NotFound().finish(), Err(e) => HttpResponse::BadRequest().body(e.to_string()), } } @@ -1518,9 +1545,7 @@ pub async fn set_vchan_mode( let (rig_id, channel_id) = path.into_inner(); match vchan_mgr.set_channel_mode(&rig_id, channel_id, &body.mode) { Ok(()) => HttpResponse::Ok().finish(), - Err(crate::server::vchan::VChanClientError::NotFound) => { - HttpResponse::NotFound().finish() - } + Err(crate::server::vchan::VChanClientError::NotFound) => HttpResponse::NotFound().finish(), Err(e) => HttpResponse::BadRequest().body(e.to_string()), } } @@ -1783,14 +1808,20 @@ async fn ft8_js() -> impl Responder { #[get("/ft4.js")] async fn ft4_js() -> impl Responder { HttpResponse::Ok() - .insert_header((header::CONTENT_TYPE, "application/javascript; charset=utf-8")) + .insert_header(( + header::CONTENT_TYPE, + "application/javascript; charset=utf-8", + )) .body(status::FT4_JS) } #[get("/ft2.js")] async fn ft2_js() -> impl Responder { HttpResponse::Ok() - .insert_header((header::CONTENT_TYPE, "application/javascript; charset=utf-8")) + .insert_header(( + header::CONTENT_TYPE, + "application/javascript; charset=utf-8", + )) .body(status::FT2_JS) } @@ -1951,7 +1982,14 @@ fn bookmark_decoder_state( } } - (want_aprs, want_hf_aprs, want_ft8, want_ft4, want_ft2, want_wspr) + ( + want_aprs, + want_hf_aprs, + want_ft8, + want_ft4, + want_ft2, + want_wspr, + ) } fn bookmark_decoder_kinds(bookmark: &crate::server::bookmarks::Bookmark) -> Vec { @@ -2018,7 +2056,8 @@ async fn apply_selected_channel( let Some(bookmark) = bookmark_store.get(bookmark_id) else { return Ok(()); }; - let (want_aprs, want_hf_aprs, want_ft8, want_ft4, want_ft2, want_wspr) = bookmark_decoder_state(&bookmark); + let (want_aprs, want_hf_aprs, want_ft8, want_ft4, want_ft2, want_wspr) = + bookmark_decoder_state(&bookmark); let desired = [ RigCommand::SetAprsDecodeEnabled(want_aprs), RigCommand::SetHfAprsDecodeEnabled(want_hf_aprs), diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs index e28d34a..39999b6 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs @@ -57,7 +57,10 @@ fn decode_history_cutoff(context: &FrontendRuntimeContext) -> Instant { Instant::now() - decode_history_retention(context) } -fn prune_aprs_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(Instant, AprsPacket)>) { +fn prune_aprs_history( + context: &FrontendRuntimeContext, + history: &mut VecDeque<(Instant, AprsPacket)>, +) { let cutoff = decode_history_cutoff(context); while let Some((ts, _)) = history.front() { if *ts >= cutoff { @@ -80,7 +83,10 @@ fn prune_hf_aprs_history( } } -fn prune_ais_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(Instant, AisMessage)>) { +fn prune_ais_history( + context: &FrontendRuntimeContext, + history: &mut VecDeque<(Instant, AisMessage)>, +) { let cutoff = decode_history_cutoff(context); while let Some((ts, _)) = history.front() { if *ts >= cutoff { @@ -137,7 +143,10 @@ fn prune_cw_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(In } } -fn prune_ft8_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(Instant, Ft8Message)>) { +fn prune_ft8_history( + context: &FrontendRuntimeContext, + history: &mut VecDeque<(Instant, Ft8Message)>, +) { let cutoff = decode_history_cutoff(context); while let Some((ts, _)) = history.front() { if *ts >= cutoff { @@ -147,7 +156,10 @@ fn prune_ft8_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(I } } -fn prune_ft4_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(Instant, Ft8Message)>) { +fn prune_ft4_history( + context: &FrontendRuntimeContext, + history: &mut VecDeque<(Instant, Ft8Message)>, +) { let cutoff = decode_history_cutoff(context); while let Some((ts, _)) = history.front() { if *ts >= cutoff { @@ -157,7 +169,10 @@ fn prune_ft4_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(I } } -fn prune_ft2_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(Instant, Ft8Message)>) { +fn prune_ft2_history( + context: &FrontendRuntimeContext, + history: &mut VecDeque<(Instant, Ft8Message)>, +) { let cutoff = decode_history_cutoff(context); while let Some((ts, _)) = history.front() { if *ts >= cutoff { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/background_decode.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/background_decode.rs index 21427fc..c0a0def 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/background_decode.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/background_decode.rs @@ -85,12 +85,24 @@ impl BackgroundDecodeStore { let _ = std::fs::create_dir_all(parent); } let db = if path.exists() { - PickleDb::load(path, PickleDbDumpPolicy::AutoDump, SerializationMethod::Json) - .unwrap_or_else(|_| { - PickleDb::new(path, PickleDbDumpPolicy::AutoDump, SerializationMethod::Json) - }) + PickleDb::load( + path, + PickleDbDumpPolicy::AutoDump, + SerializationMethod::Json, + ) + .unwrap_or_else(|_| { + PickleDb::new( + path, + PickleDbDumpPolicy::AutoDump, + SerializationMethod::Json, + ) + }) } else { - PickleDb::new(path, PickleDbDumpPolicy::AutoDump, SerializationMethod::Json) + PickleDb::new( + path, + PickleDbDumpPolicy::AutoDump, + SerializationMethod::Json, + ) }; Self { db: Arc::new(RwLock::new(db)), @@ -160,11 +172,13 @@ impl BackgroundDecodeManager { } pub fn get_config(&self, rig_id: &str) -> BackgroundDecodeConfig { - self.store.get(rig_id).unwrap_or_else(|| BackgroundDecodeConfig { - rig_id: rig_id.to_string(), - enabled: false, - bookmark_ids: Vec::new(), - }) + self.store + .get(rig_id) + .unwrap_or_else(|| BackgroundDecodeConfig { + rig_id: rig_id.to_string(), + enabled: false, + bookmark_ids: Vec::new(), + }) } pub fn put_config(&self, mut config: BackgroundDecodeConfig) -> Option { @@ -268,10 +282,7 @@ impl BackgroundDecodeManager { bookmark_id: bookmark.id.clone(), freq_hz: bookmark.freq_hz, mode: bookmark.mode.clone(), - bandwidth_hz: bookmark - .bandwidth_hz - .unwrap_or(0) - .min(u32::MAX as u64) as u32, + bandwidth_hz: bookmark.bandwidth_hz.unwrap_or(0).min(u32::MAX as u64) as u32, decoder_kinds, } } @@ -565,7 +576,8 @@ fn bookmark_supported_decoder_kinds(bookmark: &Bookmark) -> Vec { } fn channel_matches_bookmark(channel: &ClientChannel, bookmark: &Bookmark) -> bool { - channel.freq_hz == bookmark.freq_hz && normalized_mode(&channel.mode) == normalized_mode(&bookmark.mode) + channel.freq_hz == bookmark.freq_hz + && normalized_mode(&channel.mode) == normalized_mode(&bookmark.mode) } fn normalized_mode(mode: &str) -> String { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/scheduler.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/scheduler.rs index 58b0cee..8832712 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/scheduler.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/scheduler.rs @@ -117,12 +117,24 @@ impl SchedulerStore { let _ = std::fs::create_dir_all(parent); } let db = if path.exists() { - PickleDb::load(path, PickleDbDumpPolicy::AutoDump, SerializationMethod::Json) - .unwrap_or_else(|_| { - PickleDb::new(path, PickleDbDumpPolicy::AutoDump, SerializationMethod::Json) - }) + PickleDb::load( + path, + PickleDbDumpPolicy::AutoDump, + SerializationMethod::Json, + ) + .unwrap_or_else(|_| { + PickleDb::new( + path, + PickleDbDumpPolicy::AutoDump, + SerializationMethod::Json, + ) + }) } else { - PickleDb::new(path, PickleDbDumpPolicy::AutoDump, SerializationMethod::Json) + PickleDb::new( + path, + PickleDbDumpPolicy::AutoDump, + SerializationMethod::Json, + ) }; Self { db: Arc::new(RwLock::new(db)), @@ -206,10 +218,8 @@ fn sunrise_sunset_today(lat_deg: f64, lon_deg: f64) -> Option<(f64, f64)> { let lambda = sun_lon - 0.00569 - 0.00478 * omega.to_radians().sin(); // Obliquity of the ecliptic. - let eps0 = 23.0 - + (26.0 - + (21.448 - jc * (46.8150 + jc * (0.00059 - jc * 0.001813))) / 60.0) - / 60.0; + let eps0 = + 23.0 + (26.0 + (21.448 - jc * (46.8150 + jc * (0.00059 - jc * 0.001813))) / 60.0) / 60.0; let eps = eps0 + 0.00256 * omega.to_radians().cos(); // Sun's declination. @@ -219,8 +229,7 @@ fn sunrise_sunset_today(lat_deg: f64, lon_deg: f64) -> Option<(f64, f64)> { let y = (eps.to_radians() / 2.0).tan().powi(2); let l0_rad = l0.to_radians(); let eot = 4.0 - * (y * (2.0 * l0_rad).sin() - - 2.0 * m_rad.sin() + * (y * (2.0 * l0_rad).sin() - 2.0 * m_rad.sin() + 4.0 * y * m_rad.sin() * (2.0 * l0_rad).cos() - 0.5 * y * y * (4.0 * l0_rad).sin() - 1.25 * (2.0 * m_rad).sin()) @@ -228,8 +237,7 @@ fn sunrise_sunset_today(lat_deg: f64, lon_deg: f64) -> Option<(f64, f64)> { // Hour angle for sunrise/sunset (zenith = 90.833°). let lat_rad = lat_deg.to_radians(); - let cos_ha = ((PI / 2.0 + 0.833_f64.to_radians()).cos()) - / (lat_rad.cos() * decl.cos()) + let cos_ha = ((PI / 2.0 + 0.833_f64.to_radians()).cos()) / (lat_rad.cos() * decl.cos()) - lat_rad.tan() * decl.tan(); if !(-1.0..=1.0).contains(&cos_ha) { @@ -654,7 +662,10 @@ pub fn spawn_scheduler_task( ) .await { - warn!("scheduler: failed to apply target for '{}': {e}", config.rig_id); + warn!( + "scheduler: failed to apply target for '{}': {e}", + config.rig_id + ); continue; } @@ -678,7 +689,11 @@ async fn apply_scheduler_decoders( let mut want_wspr = false; let mut update_from = |bm: &crate::server::bookmarks::Bookmark| { - for decoder in bm.decoders.iter().map(|item| item.trim().to_ascii_lowercase()) { + for decoder in bm + .decoders + .iter() + .map(|item| item.trim().to_ascii_lowercase()) + { match decoder.as_str() { "aprs" => want_aprs = true, "hf-aprs" => want_hf_aprs = true, @@ -707,7 +722,10 @@ async fn apply_scheduler_decoders( for (label, cmd) in desired { if let Err(e) = scheduler_send(rig_tx, cmd, rig_id.to_string()).await { - warn!("scheduler: Set{label}DecodeEnabled failed for '{}': {:?}", rig_id, e); + warn!( + "scheduler: Set{label}DecodeEnabled failed for '{}': {:?}", + rig_id, e + ); } } } @@ -931,7 +949,9 @@ pub async fn put_scheduler_control( #[cfg(test)] mod tests { - use super::{timespan_active_entry, timespan_cycle_slot, timespan_active_entries, ScheduleEntry}; + use super::{ + timespan_active_entries, timespan_active_entry, timespan_cycle_slot, ScheduleEntry, + }; fn entry( id: &str, diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs index 222af9c..bb09f96 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs @@ -6,10 +6,10 @@ mod api; #[path = "audio.rs"] pub mod audio; -#[path = "background_decode.rs"] -pub mod background_decode; #[path = "auth.rs"] pub mod auth; +#[path = "background_decode.rs"] +pub mod background_decode; #[path = "bookmarks.rs"] pub mod bookmarks; #[path = "scheduler.rs"] @@ -88,8 +88,7 @@ async fn serve( ); let background_decode_path = BackgroundDecodeStore::default_path(); - let background_decode_store = - Arc::new(BackgroundDecodeStore::open(&background_decode_path)); + let background_decode_store = Arc::new(BackgroundDecodeStore::open(&background_decode_path)); let vchan_mgr = Arc::new(ClientChannelManager::new(4)); let background_decode_mgr = BackgroundDecodeManager::new( background_decode_store, diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/status.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/status.rs index 20f489a..d6cb1d4 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/status.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/status.rs @@ -9,8 +9,7 @@ const CLIENT_BUILD_DATE: &str = env!("TRX_CLIENT_BUILD_DATE"); const INDEX_HTML: &str = include_str!("../assets/web/index.html"); pub const STYLE_CSS: &str = include_str!("../assets/web/style.css"); pub const APP_JS: &str = include_str!("../assets/web/app.js"); -pub const DECODE_HISTORY_WORKER_JS: &str = - include_str!("../assets/web/decode-history-worker.js"); +pub const DECODE_HISTORY_WORKER_JS: &str = include_str!("../assets/web/decode-history-worker.js"); pub const WEBGL_RENDERER_JS: &str = include_str!("../assets/web/webgl-renderer.js"); pub const LEAFLET_AIS_TRACKSYMBOL_JS: &str = include_str!("../assets/web/leaflet-ais-tracksymbol.js"); @@ -25,8 +24,7 @@ pub const WSPR_JS: &str = include_str!("../assets/web/plugins/wspr.js"); pub const CW_JS: &str = include_str!("../assets/web/plugins/cw.js"); pub const BOOKMARKS_JS: &str = include_str!("../assets/web/plugins/bookmarks.js"); pub const SCHEDULER_JS: &str = include_str!("../assets/web/plugins/scheduler.js"); -pub const BACKGROUND_DECODE_JS: &str = - include_str!("../assets/web/plugins/background-decode.js"); +pub const BACKGROUND_DECODE_JS: &str = include_str!("../assets/web/plugins/background-decode.js"); pub const VCHAN_JS: &str = include_str!("../assets/web/plugins/vchan.js"); pub fn index_html() -> String { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs index ecbbe60..b6384b0 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs @@ -367,11 +367,7 @@ impl ClientChannelManager { } /// Explicitly delete a channel by UUID (any session may do this). - pub fn delete_channel( - &self, - rig_id: &str, - channel_id: Uuid, - ) -> Result<(), VChanClientError> { + pub fn delete_channel(&self, rig_id: &str, channel_id: Uuid) -> Result<(), VChanClientError> { let mut rigs = self.rigs.write().unwrap(); let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?; let pos = channels @@ -450,7 +446,10 @@ impl ClientChannelManager { ch.freq_hz = freq_hz; self.broadcast_change(rig_id, channels); drop(rigs); - self.send_audio_cmd(VChanAudioCmd::SetFreq { uuid: channel_id, freq_hz }); + self.send_audio_cmd(VChanAudioCmd::SetFreq { + uuid: channel_id, + freq_hz, + }); Ok(()) } @@ -469,7 +468,10 @@ impl ClientChannelManager { ch.mode = mode.to_string(); self.broadcast_change(rig_id, channels); drop(rigs); - self.send_audio_cmd(VChanAudioCmd::SetMode { uuid: channel_id, mode: mode.to_string() }); + self.send_audio_cmd(VChanAudioCmd::SetMode { + uuid: channel_id, + mode: mode.to_string(), + }); Ok(()) } @@ -488,7 +490,10 @@ impl ClientChannelManager { ch.bandwidth_hz = bandwidth_hz; self.broadcast_change(rig_id, channels); drop(rigs); - self.send_audio_cmd(VChanAudioCmd::SetBandwidth { uuid: channel_id, bandwidth_hz }); + self.send_audio_cmd(VChanAudioCmd::SetBandwidth { + uuid: channel_id, + bandwidth_hz, + }); Ok(()) } @@ -530,12 +535,14 @@ impl ClientChannelManager { let mut changed = false; let desired_map: HashMap)> = desired .iter() - .map(|(bookmark_id, freq_hz, mode, bandwidth_hz, decoder_kinds)| { - ( - bookmark_id.clone(), - (*freq_hz, mode.clone(), *bandwidth_hz, decoder_kinds.clone()), - ) - }) + .map( + |(bookmark_id, freq_hz, mode, bandwidth_hz, decoder_kinds)| { + ( + bookmark_id.clone(), + (*freq_hz, mode.clone(), *bandwidth_hz, decoder_kinds.clone()), + ) + }, + ) .collect(); let desired_ids: std::collections::HashSet<&str> = desired_map.keys().map(String::as_str).collect(); @@ -561,7 +568,8 @@ impl ClientChannelManager { let Some(bookmark_id) = channel.scheduler_bookmark_id.as_deref() else { continue; }; - let Some((freq_hz, mode, bandwidth_hz, decoder_kinds)) = desired_map.get(bookmark_id) else { + let Some((freq_hz, mode, bandwidth_hz, decoder_kinds)) = desired_map.get(bookmark_id) + else { continue; }; if channel.freq_hz != *freq_hz { diff --git a/src/trx-core/src/audio.rs b/src/trx-core/src/audio.rs index 9143d4c..a0e7945 100644 --- a/src/trx-core/src/audio.rs +++ b/src/trx-core/src/audio.rs @@ -118,7 +118,10 @@ pub async fn read_audio_msg( if len > limit { return Err(std::io::Error::new( std::io::ErrorKind::InvalidData, - format!("audio frame too large: {} bytes (type={:#04x})", len, msg_type), + format!( + "audio frame too large: {} bytes (type={:#04x})", + len, msg_type + ), )); } let mut payload = vec![0u8; len as usize]; diff --git a/src/trx-reporting/src/aprsfi.rs b/src/trx-reporting/src/aprsfi.rs index ff65b88..0f196c1 100644 --- a/src/trx-reporting/src/aprsfi.rs +++ b/src/trx-reporting/src/aprsfi.rs @@ -127,7 +127,13 @@ pub async fn run_aprsfi_uplink( // Pre-build the beacon packet (None if beaconing disabled or no coords). let beacon_packet: Option = if cfg.beacon { match coords { - Some((lat, lon)) => Some(format_beacon(&callsign, lat, lon, cfg.beacon_symbol_table, cfg.beacon_symbol_code)), + Some((lat, lon)) => Some(format_beacon( + &callsign, + lat, + lon, + cfg.beacon_symbol_table, + cfg.beacon_symbol_code, + )), None => { warn!( "APRS-IS IGate: beacon enabled but no coordinates available \ diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod/amcquam.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod/amcquam.rs index dc42a8a..45e7fe1 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod/amcquam.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod/amcquam.rs @@ -2,8 +2,8 @@ // // SPDX-License-Identifier: BSD-2-Clause -use num_complex::Complex; use super::DcBlocker; +use num_complex::Complex; /// C-QUAM (Compatible Quadrature AM) stereo demodulator. /// @@ -57,8 +57,7 @@ impl CquamDemod { self.carrier_im = alpha * self.carrier_im + one_minus_alpha * s.im; // Rotate s by −φ to phase-align I with (1 + m_s) and Q with m_d. - let mag_sq = - self.carrier_re * self.carrier_re + self.carrier_im * self.carrier_im; + let mag_sq = self.carrier_re * self.carrier_re + self.carrier_im * self.carrier_im; let (i_corr, q_corr) = if mag_sq > 1e-8 { let inv = mag_sq.sqrt().recip(); let cos_phi = self.carrier_re * inv; @@ -94,7 +93,10 @@ mod tests { let out = demod.demodulate_stereo(&samples); assert_eq!(out.len(), 512); for &s in &out { - assert!(s.abs() < 1e-5, "silence should produce near-zero output, got {s}"); + assert!( + s.abs() < 1e-5, + "silence should produce near-zero output, got {s}" + ); } } diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod/math_arm.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod/math_arm.rs index d979674..1c11357 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod/math_arm.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod/math_arm.rs @@ -8,9 +8,7 @@ use num_complex::Complex; /// 7th-order minimax atan approximation for |z| <= 1. #[cfg(target_arch = "aarch64")] #[target_feature(enable = "neon")] -unsafe fn atan_poly_neon( - z: std::arch::aarch64::float32x4_t, -) -> std::arch::aarch64::float32x4_t { +unsafe fn atan_poly_neon(z: std::arch::aarch64::float32x4_t) -> std::arch::aarch64::float32x4_t { use std::arch::aarch64::*; let c0 = vdupq_n_f32(0.999_999_5_f32); let c1 = vdupq_n_f32(-0.333_326_1_f32); diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs index 91ed2bd..0d9febf 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs @@ -249,7 +249,10 @@ impl SdrPipeline { channel_if_hz: f64, mode: &RigMode, bandwidth_hz: u32, - ) -> (broadcast::Sender>, broadcast::Sender>>) { + ) -> ( + broadcast::Sender>, + broadcast::Sender>>, + ) { const PCM_BROADCAST_CAPACITY: usize = 32; const IQ_BROADCAST_CAPACITY: usize = 64; let (pcm_tx, _) = broadcast::channel::>(PCM_BROADCAST_CAPACITY); @@ -456,9 +459,7 @@ fn iq_read_loop( // Hold a read lock only for the duration of this block's DSP pass. // Write lock (add/remove channel) waits at most one block (~2 ms). { - let dsps = channel_dsps - .read() - .expect("channel_dsps RwLock poisoned"); + let dsps = channel_dsps.read().expect("channel_dsps RwLock poisoned"); for dsp_arc in dsps.iter() { match dsp_arc.lock() { Ok(mut dsp) => dsp.process_block(samples), diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp/channel.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp/channel.rs index 3594cbd..2d5d2a7 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp/channel.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp/channel.rs @@ -272,7 +272,12 @@ impl ChannelDsp { } else { (cutoff_hz / self.sdr_sample_rate as f32).min(0.499) }; - self.lpf_iq = BlockFirFilterPair::new(cutoff_norm, ssb_shift_norm(&self.mode, cutoff_norm), auto_taps(cutoff_norm), IQ_BLOCK_SIZE); + self.lpf_iq = BlockFirFilterPair::new( + cutoff_norm, + ssb_shift_norm(&self.mode, cutoff_norm), + auto_taps(cutoff_norm), + IQ_BLOCK_SIZE, + ); let rate_changed = self.decim_factor != next_decim_factor; self.decim_factor = next_decim_factor; self.decim_counter = 0; @@ -352,7 +357,12 @@ impl ChannelDsp { channel_if_hz, demodulator: Demodulator::for_mode(mode), mode: mode.clone(), - lpf_iq: BlockFirFilterPair::new(cutoff_norm, ssb_shift_norm(mode, cutoff_norm), auto_taps(cutoff_norm), IQ_BLOCK_SIZE), + lpf_iq: BlockFirFilterPair::new( + cutoff_norm, + ssb_shift_norm(mode, cutoff_norm), + auto_taps(cutoff_norm), + IQ_BLOCK_SIZE, + ), sdr_sample_rate, audio_sample_rate, audio_bandwidth_hz, diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp/filter.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp/filter.rs index d99e0e5..1047f9a 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp/filter.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp/filter.rs @@ -109,7 +109,12 @@ type FirKernel = ( /// Setting `shift_norm = +cutoff_norm` produces a one-sided USB filter /// `[0, BW]`; `shift_norm = -cutoff_norm` produces a one-sided LSB filter /// `[-BW, 0]`; `shift_norm = 0` leaves the kernel symmetric (AM/FM/WFM). -fn build_fir_kernel(cutoff_norm: f32, shift_norm: f32, taps: usize, block_size: usize) -> FirKernel { +fn build_fir_kernel( + cutoff_norm: f32, + shift_norm: f32, + taps: usize, + block_size: usize, +) -> FirKernel { let coeffs = windowed_sinc_coeffs(cutoff_norm, taps); let fft_size = (block_size + taps - 1).next_power_of_two(); @@ -210,8 +215,14 @@ unsafe fn mul_freq_domain_neon( let (h_re, h_im) = (h_ri.0, h_ri.1); // Complex multiply: out.re = x.re*h.re - x.im*h.im, out.im = x.re*h.im + x.im*h.re - let out_re = vmulq_f32(vsubq_f32(vmulq_f32(x_re, h_re), vmulq_f32(x_im, h_im)), scale_v); - let out_im = vmulq_f32(vaddq_f32(vmulq_f32(x_re, h_im), vmulq_f32(x_im, h_re)), scale_v); + let out_re = vmulq_f32( + vsubq_f32(vmulq_f32(x_re, h_re), vmulq_f32(x_im, h_im)), + scale_v, + ); + let out_im = vmulq_f32( + vaddq_f32(vmulq_f32(x_re, h_im), vmulq_f32(x_im, h_re)), + scale_v, + ); // Reinterleave: .0 = [re0,im0,re1,im1], .1 = [re2,im2,re3,im3] let out = vzipq_f32(out_re, out_im); @@ -313,7 +324,8 @@ impl BlockFirFilterPair { /// `-cutoff_norm` for LSB/CWR. pub fn new(cutoff_norm: f32, shift_norm: f32, taps: usize, block_size: usize) -> Self { let taps = taps.max(1); - let (h_buf, fft_size, fft, ifft) = build_fir_kernel(cutoff_norm, shift_norm, taps, block_size); + let (h_buf, fft_size, fft, ifft) = + build_fir_kernel(cutoff_norm, shift_norm, taps, block_size); Self { h_freq: h_buf, overlap: vec![FftComplex::new(0.0, 0.0); taps.saturating_sub(1)], diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs index 9ecb6d9..75b5b0b 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs @@ -7,10 +7,10 @@ pub mod dsp; pub mod real_iq_source; pub mod vchan_impl; +use dsp::IqSource as _; use std::pin::Pin; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; -use dsp::IqSource as _; use trx_core::radio::freq::{Band, Freq}; use trx_core::rig::response::RigError; use trx_core::rig::state::{RigFilterState, SpectrumData, VchanRdsEntry, WfmDenoiseLevel}; @@ -257,10 +257,7 @@ impl SoapySdrRig { }; // Initialise filter state from primary channel config (index 0), or defaults. - let bandwidth_hz = channels - .first() - .map(|&(_, _, bw)| bw) - .unwrap_or(3000); + let bandwidth_hz = channels.first().map(|&(_, _, bw)| bw).unwrap_or(3000); let spectrum_buf = pipeline.spectrum_buf.clone(); let retune_cmd = pipeline.retune_cmd.clone(); @@ -359,10 +356,7 @@ impl SoapySdrRig { let dsps = self.pipeline.channel_dsps.read().unwrap(); for idx in [ais_a_idx, ais_b_idx] { if let Some(dsp_arc) = dsps.get(idx) { - dsp_arc - .lock() - .unwrap() - .set_filter(self.bandwidth_hz); + dsp_arc.lock().unwrap().set_filter(self.bandwidth_hz); } } } @@ -749,10 +743,7 @@ impl RigCat for SoapySdrRig { { let dsps = self.pipeline.channel_dsps.read().unwrap(); if let Some(dsp_arc) = dsps.get(self.primary_channel_idx) { - dsp_arc - .lock() - .unwrap() - .set_filter(bandwidth_hz); + dsp_arc.lock().unwrap().set_filter(bandwidth_hz); } } self.apply_ais_channel_filters(); diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs index f54af88..f9ef1a3 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs @@ -100,11 +100,7 @@ impl SdrVirtualChannelManager { /// - `fixed_slot_count`: number of fixed pipeline slots (primary + AIS), /// i.e. the index of the first slot available for virtual channels. /// - `max_total`: maximum total channels including primary (e.g. 4). - pub fn new( - pipeline: Arc, - fixed_slot_count: usize, - max_total: usize, - ) -> Self { + pub fn new(pipeline: Arc, fixed_slot_count: usize, max_total: usize) -> Self { // Seed the channel list with a synthetic primary-channel entry. // We use the first PCM sender from the pipeline (index 0). let primary_pcm_tx = pipeline @@ -177,9 +173,9 @@ impl SdrVirtualChannelManager { } let bandwidth_hz = default_bandwidth_hz(mode); - let (pcm_tx, iq_tx) = - self.pipeline - .add_virtual_channel(if_hz as f64, mode, bandwidth_hz); + let (pcm_tx, iq_tx) = self + .pipeline + .add_virtual_channel(if_hz as f64, mode, bandwidth_hz); let pipeline_slot = self .pipeline