diff --git a/src/trx-server/src/pskreporter.rs b/src/trx-server/src/pskreporter.rs index 1188672..1de0680 100644 --- a/src/trx-server/src/pskreporter.rs +++ b/src/trx-server/src/pskreporter.rs @@ -2,11 +2,12 @@ // // SPDX-License-Identifier: BSD-2-Clause +use std::collections::HashMap; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::net::UdpSocket; use tokio::sync::{broadcast, watch}; -use tokio::time::{self, Duration}; +use tokio::time::{self, Duration, Instant}; use tracing::{info, warn}; use trx_core::decode::DecodedMessage; @@ -15,15 +16,43 @@ use trx_core::rig::state::RigState; use crate::config::PskReporterConfig; const PSK_REPORTER_IDENTIFIER: u16 = 0x000A; -const RECEIVER_DESCRIPTOR: u16 = 0x9992; -const SENDER_DESCRIPTOR: u16 = 0x9993; +const RECEIVER_FLOWSET: u16 = 0x9992; +const SENDER_FLOWSET: u16 = 0x9993; -const RECEIVER_RECORD_FORMAT: &[u8] = &[0x00, 0x03, 0x00, 0x00, 0x80, 0x02, 0xFF, 0xFF]; -const SENDER_RECORD_FORMAT: &[u8] = &[ - 0x00, 0x06, 0x00, 0x00, 0x80, 0x01, 0xFF, 0xFF, 0x80, 0x04, 0xFF, 0xFF, 0x80, 0x08, 0xFF, 0xFF, - 0x00, 0x96, 0x00, 0x04, +// Receiver: Options Template Set (FlowSetID=0x0003, 36 bytes total) +// Template ID 0x9992 with 3 fields, scope count 1. +// Fields: receiverCallsign (30351.2), receiverLocator (30351.4), decoderSoftware (30351.8) +const RECEIVER_TEMPLATE: &[u8] = &[ + 0x00, 0x03, 0x00, 0x24, // FlowSetID=3, Length=36 + 0x99, 0x92, 0x00, 0x03, 0x00, 0x01, // TemplateID=0x9992, FieldCount=3, ScopeCount=1 + 0x80, 0x02, 0xFF, 0xFF, 0x00, 0x00, 0x76, 0x8F, // receiverCallsign (30351.2), variable + 0x80, 0x04, 0xFF, 0xFF, 0x00, 0x00, 0x76, 0x8F, // receiverLocator (30351.4), variable + 0x80, 0x08, 0xFF, 0xFF, 0x00, 0x00, 0x76, 0x8F, // decoderSoftware (30351.8), variable + 0x00, 0x00, // padding to 4-byte boundary ]; +// Sender: Template Set (FlowSetID=0x0002, 68 bytes total) +// Template ID 0x9993 with 8 fields. +// Fields: senderCallsign, frequency, sNR, iMD, mode, informationSource, senderLocator, +// flowStartSeconds +const SENDER_TEMPLATE: &[u8] = &[ + 0x00, 0x02, 0x00, 0x44, // FlowSetID=2, Length=68 + 0x99, 0x93, 0x00, 0x08, // TemplateID=0x9993, FieldCount=8 + 0x80, 0x01, 0xFF, 0xFF, 0x00, 0x00, 0x76, 0x8F, // senderCallsign (30351.1), variable + 0x80, 0x05, 0x00, 0x04, 0x00, 0x00, 0x76, 0x8F, // frequency (30351.5), 4 bytes + 0x80, 0x06, 0x00, 0x01, 0x00, 0x00, 0x76, 0x8F, // sNR (30351.6), 1 byte + 0x80, 0x07, 0x00, 0x01, 0x00, 0x00, 0x76, 0x8F, // iMD (30351.7), 1 byte + 0x80, 0x0A, 0xFF, 0xFF, 0x00, 0x00, 0x76, 0x8F, // mode (30351.10), variable + 0x80, 0x0B, 0x00, 0x01, 0x00, 0x00, 0x76, 0x8F, // informationSource (30351.11), 1 byte + 0x80, 0x03, 0xFF, 0xFF, 0x00, 0x00, 0x76, 0x8F, // senderLocator (30351.3), variable + 0x00, 0x96, 0x00, 0x04, // flowStartSeconds (150), 4 bytes +]; + +// Send at most one packet every 5 minutes per PSKReporter spec. +const FLUSH_INTERVAL_SECS: u64 = 300; +// Retransmit template descriptors once per hour (plus first 3 packets on startup). +const TEMPLATE_RESEND_SECS: u64 = 3600; + #[derive(Debug, Clone)] struct Spot { sender_callsign: String, @@ -82,20 +111,41 @@ pub async fn run_pskreporter_uplink( ); let mut current_freq_hz = state_rx.borrow().status.freq.hz; + // Deduplicated pending spots: callsign → most-recent Spot. + let mut pending: HashMap = HashMap::new(); let mut stats_received: u64 = 0; let mut stats_sent: u64 = 0; let mut stats_skipped: u64 = 0; let mut stats_send_err: u64 = 0; let mut stats_tick = time::interval(Duration::from_secs(60)); + // Delay first flush by FLUSH_INTERVAL_SECS so we accumulate a useful batch. + let mut flush_tick = time::interval_at( + Instant::now() + Duration::from_secs(FLUSH_INTERVAL_SECS), + Duration::from_secs(FLUSH_INTERVAL_SECS), + ); loop { tokio::select! { _ = stats_tick.tick() => { info!( - "PSK Reporter stats: received={}, sent={}, skipped={}, send_errors={}", - stats_received, stats_sent, stats_skipped, stats_send_err + "PSK Reporter stats: received={}, sent={}, skipped={}, \ + send_errors={}, pending={}", + stats_received, stats_sent, stats_skipped, stats_send_err, + pending.len() ); } + _ = flush_tick.tick() => { + if !pending.is_empty() { + let spots: Vec = pending.drain().map(|(_, v)| v).collect(); + let n = spots.len() as u64; + if let Err(err) = client.send_spots(&spots).await { + warn!("PSK Reporter send failed: {}", err); + stats_send_err += 1; + } else { + stats_sent += n; + } + } + } changed = state_rx.changed() => { if changed.is_err() { break; @@ -120,12 +170,16 @@ pub async fn run_pskreporter_uplink( continue; } }; - if let Err(err) = client.send_spot(&spot).await { - warn!("PSK Reporter send failed: {}", err); - stats_send_err += 1; - } else { - stats_sent += 1; + // Guard against history replays: reject any message whose timestamp + // is older than the flush window. Live FT8/WSPR messages are at most + // a few seconds old; history items can be up to 24 hours old. + let age = now_unix_seconds().saturating_sub(spot.flow_start_seconds); + if age > FLUSH_INTERVAL_SECS as u32 { + stats_skipped += 1; + continue; } + // Keep only the most-recent spot per callsign within the window. + pending.insert(spot.sender_callsign.clone(), spot); } } } @@ -324,6 +378,8 @@ struct PskReporterClient { software: String, sequence: u32, session: u32, + packets_sent: u32, + last_template_instant: Option, } impl PskReporterClient { @@ -354,54 +410,81 @@ impl PskReporterClient { software, sequence: 1, session, + packets_sent: 0, + last_template_instant: None, }) } - async fn send_spot(&mut self, spot: &Spot) -> Result<(), String> { - let packet = self.make_packet(spot)?; + async fn send_spots(&mut self, spots: &[Spot]) -> Result<(), String> { + // Include template descriptors in first 3 packets and once per hour thereafter. + let include_templates = self.packets_sent < 3 + || self + .last_template_instant + .map_or(true, |t| t.elapsed() >= Duration::from_secs(TEMPLATE_RESEND_SECS)); + + let packet = self.make_packet(spots, include_templates)?; self.socket .send(&packet) .await .map_err(|e| format!("send failed: {e}"))?; - self.sequence = self.sequence.wrapping_add(1); + + self.packets_sent += 1; + // Sequence number = count of reports submitted (not packets). + self.sequence = self.sequence.wrapping_add(spots.len() as u32); + if include_templates { + self.last_template_instant = Some(Instant::now()); + } Ok(()) } - fn make_packet(&self, spot: &Spot) -> Result, String> { + fn make_packet(&self, spots: &[Spot], include_templates: bool) -> Result, String> { let now = now_unix_seconds(); - let mut out = Vec::with_capacity(256); + let mut out = Vec::with_capacity(512); - push_u16_be(&mut out, PSK_REPORTER_IDENTIFIER); - push_u16_be(&mut out, 0); // patched later + // IPFIX message header (16 bytes) — total length patched at the end. + push_u16_be(&mut out, PSK_REPORTER_IDENTIFIER); // version 0x000A + push_u16_be(&mut out, 0); // length — patched later push_u32_be(&mut out, now); push_u32_be(&mut out, self.sequence); push_u32_be(&mut out, self.session); - append_record(&mut out, RECEIVER_DESCRIPTOR, RECEIVER_RECORD_FORMAT); - append_record(&mut out, SENDER_DESCRIPTOR, SENDER_RECORD_FORMAT); + // Template descriptor blocks (optional after first 3 packets). + if include_templates { + out.extend_from_slice(RECEIVER_TEMPLATE); + out.extend_from_slice(SENDER_TEMPLATE); + } - let mut receiver_payload = Vec::new(); - push_prefixed_string(&mut receiver_payload, &self.receiver_callsign)?; - push_prefixed_string(&mut receiver_payload, &self.receiver_locator)?; - push_prefixed_string(&mut receiver_payload, &self.software)?; - append_record(&mut out, RECEIVER_DESCRIPTOR, &receiver_payload); + // Receiver information data record (FlowSetID 0x9992). + let mut rx_data: Vec = Vec::new(); + push_prefixed_string(&mut rx_data, &self.receiver_callsign)?; + push_prefixed_string(&mut rx_data, &self.receiver_locator)?; + push_prefixed_string(&mut rx_data, &self.software)?; + pad_to_4(&mut rx_data); + push_u16_be(&mut out, RECEIVER_FLOWSET); + push_u16_be(&mut out, (rx_data.len() + 4) as u16); // length includes 4-byte set header + out.extend_from_slice(&rx_data); - let mut sender_payload = Vec::new(); - push_prefixed_string(&mut sender_payload, &spot.sender_callsign)?; - push_u32_be( - &mut sender_payload, - spot.abs_freq_hz.min(u32::MAX as u64) as u32, - ); - sender_payload.push(spot.snr_db.round().clamp(-128.0, 127.0) as i8 as u8); - push_prefixed_string(&mut sender_payload, spot.mode)?; - sender_payload.push(1); // information source = local - push_u32_be(&mut sender_payload, spot.flow_start_seconds); - push_prefixed_string( - &mut sender_payload, - spot.sender_locator.as_deref().unwrap_or(""), - )?; - append_record(&mut out, SENDER_DESCRIPTOR, &sender_payload); + // Sender information data records (FlowSetID 0x9993). + // Field order must match SENDER_TEMPLATE: + // senderCallsign, frequency, sNR, iMD, mode, informationSource, + // senderLocator, flowStartSeconds + let mut tx_data: Vec = Vec::new(); + for spot in spots { + push_prefixed_string(&mut tx_data, &spot.sender_callsign)?; + push_u32_be(&mut tx_data, spot.abs_freq_hz.min(u32::MAX as u64) as u32); + tx_data.push(spot.snr_db.round().clamp(-128.0, 127.0) as i8 as u8); + tx_data.push(0u8); // iMD — not available from FT8/WSPR decoders + push_prefixed_string(&mut tx_data, spot.mode)?; + tx_data.push(1u8); // informationSource = 1 (automatically extracted) + push_prefixed_string(&mut tx_data, spot.sender_locator.as_deref().unwrap_or(""))?; + push_u32_be(&mut tx_data, spot.flow_start_seconds); + } + pad_to_4(&mut tx_data); + push_u16_be(&mut out, SENDER_FLOWSET); + push_u16_be(&mut out, (tx_data.len() + 4) as u16); + out.extend_from_slice(&tx_data); + // Patch total packet length into header bytes [2..3]. let len = out.len(); if len > u16::MAX as usize { return Err("PSK Reporter packet too large".to_string()); @@ -409,14 +492,17 @@ impl PskReporterClient { let be = (len as u16).to_be_bytes(); out[2] = be[0]; out[3] = be[1]; + Ok(out) } } -fn append_record(out: &mut Vec, descriptor: u16, payload: &[u8]) { - push_u16_be(out, descriptor); - push_u16_be(out, (payload.len() + 4) as u16); - out.extend_from_slice(payload); +/// Pad `buf` with null bytes until its length is a multiple of 4. +fn pad_to_4(buf: &mut Vec) { + let rem = buf.len() % 4; + if rem != 0 { + buf.extend(std::iter::repeat(0u8).take(4 - rem)); + } } fn push_u16_be(buf: &mut Vec, value: u16) { @@ -429,7 +515,7 @@ fn push_u32_be(buf: &mut Vec, value: u32) { fn push_prefixed_string(buf: &mut Vec, value: &str) -> Result<(), String> { let bytes = value.as_bytes(); - if bytes.len() > u8::MAX as usize { + if bytes.len() > 254 { return Err(format!("string too long for PSK Reporter field: {}", value)); } buf.push(bytes.len() as u8); @@ -479,4 +565,18 @@ mod tests { assert_eq!(offset_to_abs(14_074_000, 1_237.0), 14_075_237); assert_eq!(offset_to_abs(14_074_000, 14_075_237.0), 14_075_237); } + + #[test] + fn receiver_template_length_correct() { + assert_eq!(RECEIVER_TEMPLATE.len(), 36); + let len = u16::from_be_bytes([RECEIVER_TEMPLATE[2], RECEIVER_TEMPLATE[3]]); + assert_eq!(len as usize, RECEIVER_TEMPLATE.len()); + } + + #[test] + fn sender_template_length_correct() { + assert_eq!(SENDER_TEMPLATE.len(), 68); + let len = u16::from_be_bytes([SENDER_TEMPLATE[2], SENDER_TEMPLATE[3]]); + assert_eq!(len as usize, SENDER_TEMPLATE.len()); + } }