[fix](trx-server): fix PSKReporter IPFIX packet format and batching

Rewrite the PSKReporter uplink to match the protocol spec exactly:

- Fix template FlowSetIDs: receiver uses 0x0003 (Options Template Set),
  sender uses 0x0002 (Template Set); previously both used 0x9992/0x9993
- Add missing enterprise numbers (0x0000768F = 30351) to all enterprise
  field specifiers in both template blocks
- Fix sender template field IDs: use correct attributes (senderCallsign
  30351.1, frequency 30351.5, sNR 30351.6, iMD 30351.7, mode 30351.10,
  informationSource 30351.11, senderLocator 30351.3, flowStartSeconds 150)
- Fix sender data field order to match the template declaration
- Add iMD byte (0) required by the 8-field template
- Add 4-byte null padding on receiver and sender data records
- Batch spots into one UDP packet per 5-minute window (spec requirement)
- Deduplicate by callsign within each window (keep most-recent spot)
- Send template descriptors only in first 3 packets then once per hour
- Increment sequence number by report count, not packet count
- Guard against history replays: drop any spot older than the flush
  window (live FT8/WSPR is seconds old; history can be 24 h old)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-14 08:33:36 +01:00
parent 0b0e86f496
commit 5ed3e29d90
+148 -48
View File
@@ -2,11 +2,12 @@
// //
// SPDX-License-Identifier: BSD-2-Clause // SPDX-License-Identifier: BSD-2-Clause
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::sync::{broadcast, watch}; use tokio::sync::{broadcast, watch};
use tokio::time::{self, Duration}; use tokio::time::{self, Duration, Instant};
use tracing::{info, warn}; use tracing::{info, warn};
use trx_core::decode::DecodedMessage; use trx_core::decode::DecodedMessage;
@@ -15,15 +16,43 @@ use trx_core::rig::state::RigState;
use crate::config::PskReporterConfig; use crate::config::PskReporterConfig;
const PSK_REPORTER_IDENTIFIER: u16 = 0x000A; const PSK_REPORTER_IDENTIFIER: u16 = 0x000A;
const RECEIVER_DESCRIPTOR: u16 = 0x9992; const RECEIVER_FLOWSET: u16 = 0x9992;
const SENDER_DESCRIPTOR: u16 = 0x9993; const SENDER_FLOWSET: u16 = 0x9993;
const RECEIVER_RECORD_FORMAT: &[u8] = &[0x00, 0x03, 0x00, 0x00, 0x80, 0x02, 0xFF, 0xFF]; // Receiver: Options Template Set (FlowSetID=0x0003, 36 bytes total)
const SENDER_RECORD_FORMAT: &[u8] = &[ // Template ID 0x9992 with 3 fields, scope count 1.
0x00, 0x06, 0x00, 0x00, 0x80, 0x01, 0xFF, 0xFF, 0x80, 0x04, 0xFF, 0xFF, 0x80, 0x08, 0xFF, 0xFF, // Fields: receiverCallsign (30351.2), receiverLocator (30351.4), decoderSoftware (30351.8)
0x00, 0x96, 0x00, 0x04, const RECEIVER_TEMPLATE: &[u8] = &[
0x00, 0x03, 0x00, 0x24, // FlowSetID=3, Length=36
0x99, 0x92, 0x00, 0x03, 0x00, 0x01, // TemplateID=0x9992, FieldCount=3, ScopeCount=1
0x80, 0x02, 0xFF, 0xFF, 0x00, 0x00, 0x76, 0x8F, // receiverCallsign (30351.2), variable
0x80, 0x04, 0xFF, 0xFF, 0x00, 0x00, 0x76, 0x8F, // receiverLocator (30351.4), variable
0x80, 0x08, 0xFF, 0xFF, 0x00, 0x00, 0x76, 0x8F, // decoderSoftware (30351.8), variable
0x00, 0x00, // padding to 4-byte boundary
]; ];
// Sender: Template Set (FlowSetID=0x0002, 68 bytes total)
// Template ID 0x9993 with 8 fields.
// Fields: senderCallsign, frequency, sNR, iMD, mode, informationSource, senderLocator,
// flowStartSeconds
const SENDER_TEMPLATE: &[u8] = &[
0x00, 0x02, 0x00, 0x44, // FlowSetID=2, Length=68
0x99, 0x93, 0x00, 0x08, // TemplateID=0x9993, FieldCount=8
0x80, 0x01, 0xFF, 0xFF, 0x00, 0x00, 0x76, 0x8F, // senderCallsign (30351.1), variable
0x80, 0x05, 0x00, 0x04, 0x00, 0x00, 0x76, 0x8F, // frequency (30351.5), 4 bytes
0x80, 0x06, 0x00, 0x01, 0x00, 0x00, 0x76, 0x8F, // sNR (30351.6), 1 byte
0x80, 0x07, 0x00, 0x01, 0x00, 0x00, 0x76, 0x8F, // iMD (30351.7), 1 byte
0x80, 0x0A, 0xFF, 0xFF, 0x00, 0x00, 0x76, 0x8F, // mode (30351.10), variable
0x80, 0x0B, 0x00, 0x01, 0x00, 0x00, 0x76, 0x8F, // informationSource (30351.11), 1 byte
0x80, 0x03, 0xFF, 0xFF, 0x00, 0x00, 0x76, 0x8F, // senderLocator (30351.3), variable
0x00, 0x96, 0x00, 0x04, // flowStartSeconds (150), 4 bytes
];
// Send at most one packet every 5 minutes per PSKReporter spec.
const FLUSH_INTERVAL_SECS: u64 = 300;
// Retransmit template descriptors once per hour (plus first 3 packets on startup).
const TEMPLATE_RESEND_SECS: u64 = 3600;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Spot { struct Spot {
sender_callsign: String, sender_callsign: String,
@@ -82,20 +111,41 @@ pub async fn run_pskreporter_uplink(
); );
let mut current_freq_hz = state_rx.borrow().status.freq.hz; let mut current_freq_hz = state_rx.borrow().status.freq.hz;
// Deduplicated pending spots: callsign → most-recent Spot.
let mut pending: HashMap<String, Spot> = HashMap::new();
let mut stats_received: u64 = 0; let mut stats_received: u64 = 0;
let mut stats_sent: u64 = 0; let mut stats_sent: u64 = 0;
let mut stats_skipped: u64 = 0; let mut stats_skipped: u64 = 0;
let mut stats_send_err: u64 = 0; let mut stats_send_err: u64 = 0;
let mut stats_tick = time::interval(Duration::from_secs(60)); let mut stats_tick = time::interval(Duration::from_secs(60));
// Delay first flush by FLUSH_INTERVAL_SECS so we accumulate a useful batch.
let mut flush_tick = time::interval_at(
Instant::now() + Duration::from_secs(FLUSH_INTERVAL_SECS),
Duration::from_secs(FLUSH_INTERVAL_SECS),
);
loop { loop {
tokio::select! { tokio::select! {
_ = stats_tick.tick() => { _ = stats_tick.tick() => {
info!( info!(
"PSK Reporter stats: received={}, sent={}, skipped={}, send_errors={}", "PSK Reporter stats: received={}, sent={}, skipped={}, \
stats_received, stats_sent, stats_skipped, stats_send_err send_errors={}, pending={}",
stats_received, stats_sent, stats_skipped, stats_send_err,
pending.len()
); );
} }
_ = flush_tick.tick() => {
if !pending.is_empty() {
let spots: Vec<Spot> = pending.drain().map(|(_, v)| v).collect();
let n = spots.len() as u64;
if let Err(err) = client.send_spots(&spots).await {
warn!("PSK Reporter send failed: {}", err);
stats_send_err += 1;
} else {
stats_sent += n;
}
}
}
changed = state_rx.changed() => { changed = state_rx.changed() => {
if changed.is_err() { if changed.is_err() {
break; break;
@@ -120,12 +170,16 @@ pub async fn run_pskreporter_uplink(
continue; continue;
} }
}; };
if let Err(err) = client.send_spot(&spot).await { // Guard against history replays: reject any message whose timestamp
warn!("PSK Reporter send failed: {}", err); // is older than the flush window. Live FT8/WSPR messages are at most
stats_send_err += 1; // a few seconds old; history items can be up to 24 hours old.
} else { let age = now_unix_seconds().saturating_sub(spot.flow_start_seconds);
stats_sent += 1; if age > FLUSH_INTERVAL_SECS as u32 {
stats_skipped += 1;
continue;
} }
// Keep only the most-recent spot per callsign within the window.
pending.insert(spot.sender_callsign.clone(), spot);
} }
} }
} }
@@ -324,6 +378,8 @@ struct PskReporterClient {
software: String, software: String,
sequence: u32, sequence: u32,
session: u32, session: u32,
packets_sent: u32,
last_template_instant: Option<Instant>,
} }
impl PskReporterClient { impl PskReporterClient {
@@ -354,54 +410,81 @@ impl PskReporterClient {
software, software,
sequence: 1, sequence: 1,
session, session,
packets_sent: 0,
last_template_instant: None,
}) })
} }
async fn send_spot(&mut self, spot: &Spot) -> Result<(), String> { async fn send_spots(&mut self, spots: &[Spot]) -> Result<(), String> {
let packet = self.make_packet(spot)?; // Include template descriptors in first 3 packets and once per hour thereafter.
let include_templates = self.packets_sent < 3
|| self
.last_template_instant
.map_or(true, |t| t.elapsed() >= Duration::from_secs(TEMPLATE_RESEND_SECS));
let packet = self.make_packet(spots, include_templates)?;
self.socket self.socket
.send(&packet) .send(&packet)
.await .await
.map_err(|e| format!("send failed: {e}"))?; .map_err(|e| format!("send failed: {e}"))?;
self.sequence = self.sequence.wrapping_add(1);
self.packets_sent += 1;
// Sequence number = count of reports submitted (not packets).
self.sequence = self.sequence.wrapping_add(spots.len() as u32);
if include_templates {
self.last_template_instant = Some(Instant::now());
}
Ok(()) Ok(())
} }
fn make_packet(&self, spot: &Spot) -> Result<Vec<u8>, String> { fn make_packet(&self, spots: &[Spot], include_templates: bool) -> Result<Vec<u8>, String> {
let now = now_unix_seconds(); let now = now_unix_seconds();
let mut out = Vec::with_capacity(256); let mut out = Vec::with_capacity(512);
push_u16_be(&mut out, PSK_REPORTER_IDENTIFIER); // IPFIX message header (16 bytes) — total length patched at the end.
push_u16_be(&mut out, 0); // patched later push_u16_be(&mut out, PSK_REPORTER_IDENTIFIER); // version 0x000A
push_u16_be(&mut out, 0); // length — patched later
push_u32_be(&mut out, now); push_u32_be(&mut out, now);
push_u32_be(&mut out, self.sequence); push_u32_be(&mut out, self.sequence);
push_u32_be(&mut out, self.session); push_u32_be(&mut out, self.session);
append_record(&mut out, RECEIVER_DESCRIPTOR, RECEIVER_RECORD_FORMAT); // Template descriptor blocks (optional after first 3 packets).
append_record(&mut out, SENDER_DESCRIPTOR, SENDER_RECORD_FORMAT); if include_templates {
out.extend_from_slice(RECEIVER_TEMPLATE);
out.extend_from_slice(SENDER_TEMPLATE);
}
let mut receiver_payload = Vec::new(); // Receiver information data record (FlowSetID 0x9992).
push_prefixed_string(&mut receiver_payload, &self.receiver_callsign)?; let mut rx_data: Vec<u8> = Vec::new();
push_prefixed_string(&mut receiver_payload, &self.receiver_locator)?; push_prefixed_string(&mut rx_data, &self.receiver_callsign)?;
push_prefixed_string(&mut receiver_payload, &self.software)?; push_prefixed_string(&mut rx_data, &self.receiver_locator)?;
append_record(&mut out, RECEIVER_DESCRIPTOR, &receiver_payload); push_prefixed_string(&mut rx_data, &self.software)?;
pad_to_4(&mut rx_data);
push_u16_be(&mut out, RECEIVER_FLOWSET);
push_u16_be(&mut out, (rx_data.len() + 4) as u16); // length includes 4-byte set header
out.extend_from_slice(&rx_data);
let mut sender_payload = Vec::new(); // Sender information data records (FlowSetID 0x9993).
push_prefixed_string(&mut sender_payload, &spot.sender_callsign)?; // Field order must match SENDER_TEMPLATE:
push_u32_be( // senderCallsign, frequency, sNR, iMD, mode, informationSource,
&mut sender_payload, // senderLocator, flowStartSeconds
spot.abs_freq_hz.min(u32::MAX as u64) as u32, let mut tx_data: Vec<u8> = Vec::new();
); for spot in spots {
sender_payload.push(spot.snr_db.round().clamp(-128.0, 127.0) as i8 as u8); push_prefixed_string(&mut tx_data, &spot.sender_callsign)?;
push_prefixed_string(&mut sender_payload, spot.mode)?; push_u32_be(&mut tx_data, spot.abs_freq_hz.min(u32::MAX as u64) as u32);
sender_payload.push(1); // information source = local tx_data.push(spot.snr_db.round().clamp(-128.0, 127.0) as i8 as u8);
push_u32_be(&mut sender_payload, spot.flow_start_seconds); tx_data.push(0u8); // iMD — not available from FT8/WSPR decoders
push_prefixed_string( push_prefixed_string(&mut tx_data, spot.mode)?;
&mut sender_payload, tx_data.push(1u8); // informationSource = 1 (automatically extracted)
spot.sender_locator.as_deref().unwrap_or(""), push_prefixed_string(&mut tx_data, spot.sender_locator.as_deref().unwrap_or(""))?;
)?; push_u32_be(&mut tx_data, spot.flow_start_seconds);
append_record(&mut out, SENDER_DESCRIPTOR, &sender_payload); }
pad_to_4(&mut tx_data);
push_u16_be(&mut out, SENDER_FLOWSET);
push_u16_be(&mut out, (tx_data.len() + 4) as u16);
out.extend_from_slice(&tx_data);
// Patch total packet length into header bytes [2..3].
let len = out.len(); let len = out.len();
if len > u16::MAX as usize { if len > u16::MAX as usize {
return Err("PSK Reporter packet too large".to_string()); return Err("PSK Reporter packet too large".to_string());
@@ -409,14 +492,17 @@ impl PskReporterClient {
let be = (len as u16).to_be_bytes(); let be = (len as u16).to_be_bytes();
out[2] = be[0]; out[2] = be[0];
out[3] = be[1]; out[3] = be[1];
Ok(out) Ok(out)
} }
} }
fn append_record(out: &mut Vec<u8>, descriptor: u16, payload: &[u8]) { /// Pad `buf` with null bytes until its length is a multiple of 4.
push_u16_be(out, descriptor); fn pad_to_4(buf: &mut Vec<u8>) {
push_u16_be(out, (payload.len() + 4) as u16); let rem = buf.len() % 4;
out.extend_from_slice(payload); if rem != 0 {
buf.extend(std::iter::repeat(0u8).take(4 - rem));
}
} }
fn push_u16_be(buf: &mut Vec<u8>, value: u16) { fn push_u16_be(buf: &mut Vec<u8>, value: u16) {
@@ -429,7 +515,7 @@ fn push_u32_be(buf: &mut Vec<u8>, value: u32) {
fn push_prefixed_string(buf: &mut Vec<u8>, value: &str) -> Result<(), String> { fn push_prefixed_string(buf: &mut Vec<u8>, value: &str) -> Result<(), String> {
let bytes = value.as_bytes(); let bytes = value.as_bytes();
if bytes.len() > u8::MAX as usize { if bytes.len() > 254 {
return Err(format!("string too long for PSK Reporter field: {}", value)); return Err(format!("string too long for PSK Reporter field: {}", value));
} }
buf.push(bytes.len() as u8); buf.push(bytes.len() as u8);
@@ -479,4 +565,18 @@ mod tests {
assert_eq!(offset_to_abs(14_074_000, 1_237.0), 14_075_237); assert_eq!(offset_to_abs(14_074_000, 1_237.0), 14_075_237);
assert_eq!(offset_to_abs(14_074_000, 14_075_237.0), 14_075_237); assert_eq!(offset_to_abs(14_074_000, 14_075_237.0), 14_075_237);
} }
#[test]
fn receiver_template_length_correct() {
assert_eq!(RECEIVER_TEMPLATE.len(), 36);
let len = u16::from_be_bytes([RECEIVER_TEMPLATE[2], RECEIVER_TEMPLATE[3]]);
assert_eq!(len as usize, RECEIVER_TEMPLATE.len());
}
#[test]
fn sender_template_length_correct() {
assert_eq!(SENDER_TEMPLATE.len(), 68);
let len = u16::from_be_bytes([SENDER_TEMPLATE[2], SENDER_TEMPLATE[3]]);
assert_eq!(len as usize, SENDER_TEMPLATE.len());
}
} }