[fix](trx-rs): use per-message rig_id for map marker tagging

The map module was tagging all decode markers (APRS, AIS, VDES,
FT8/FT4/FT2/WSPR locators) with the global rig picker's active rig
instead of the actual source rig. This made the map's own rig filter
dropdown ineffective in multi-rig setups.

- Add rig_id field to all decode message structs (AisMessage,
  VdesMessage, AprsPacket, CwEvent, Ft8Message, WsprMessage)
- Set rig_id on messages in audio_client before broadcasting, using
  the actual rig connection identifier
- Update history collector to prefer message rig_id over the global
  active rig fallback
- Pass rig_id through plugin normalize functions (AIS, APRS, VDES,
  HF-APRS) so it reaches the map add functions
- Update all map marker functions (aprsMapAddStation, aisMapAddVessel,
  vdesMapAddPoint, mapAddLocator) to use the message's rig_id with
  fallback to the global picker for backward compatibility

https://claude.ai/code/session_015gC7axHk2jmp7HbFPdbivN
Signed-off-by: Claude <noreply@anthropic.com>
This commit is contained in:
Claude
2026-03-26 14:49:15 +00:00
committed by Stan Grams
parent a63f27971d
commit 8e6623b39e
14 changed files with 95 additions and 24 deletions
+1
View File
@@ -230,6 +230,7 @@ fn parse_frame(frame: RawFrame, channel: &str) -> Option<AisMessage> {
let mmsi = get_uint(&bits, 8, 30)? as u32;
let mut msg = AisMessage {
rig_id: None,
ts_ms: None,
channel: channel.to_string(),
message_type,
+1
View File
@@ -413,6 +413,7 @@ fn parse_aprs(ax25: &Ax25Frame) -> AprsPacket {
}
AprsPacket {
rig_id: None,
ts_ms: None,
src_call,
dest_call,
+1
View File
@@ -399,6 +399,7 @@ impl CwDecoder {
fn emit_event(&mut self, text: &str) {
self.events.push(CwEvent {
rig_id: None,
text: text.to_string(),
wpm: self.wpm,
tone_hz: self.tone_freq,
+2
View File
@@ -226,6 +226,7 @@ impl VdesDecoder {
});
Some(VdesMessage {
rig_id: None,
ts_ms: None,
channel: channel.to_string(),
message_type: parsed.message_id.unwrap_or(mode.message_type),
@@ -421,6 +422,7 @@ fn build_unsynced_message(
let raw_bytes = pack_dibits_msb(raw_symbols);
let sync_pct = framed.sync_score * 100.0;
VdesMessage {
rig_id: None,
ts_ms: None,
channel: channel.to_string(),
message_type: mode.message_type,
+4 -2
View File
@@ -548,7 +548,8 @@ async fn handle_single_rig_connection(
break;
}
let json = &decompressed[pos..pos + len];
if let Ok(msg) = serde_json::from_slice::<DecodedMessage>(json) {
if let Ok(mut msg) = serde_json::from_slice::<DecodedMessage>(json) {
msg.set_rig_id(rig_id_for_rx.clone());
if let Some(ref sink) = replay_history_sink {
sink(msg);
}
@@ -569,7 +570,8 @@ async fn handle_single_rig_connection(
| AUDIO_MSG_WSPR_DECODE,
payload,
)) => {
if let Ok(msg) = serde_json::from_slice::<DecodedMessage>(&payload) {
if let Ok(mut msg) = serde_json::from_slice::<DecodedMessage>(&payload) {
msg.set_rig_id(rig_id_for_rx.clone());
let _ = decode_tx_clone.send(msg);
}
}
@@ -6275,6 +6275,7 @@ function _aprsAddMarkerToMap(call, entry) {
window.aprsMapAddStation = function(call, lat, lon, info, symbolTable, symbolCode, pkt) {
const nextPoint = [lat, lon];
const tsMs = Number.isFinite(pkt?._tsMs) ? Number(pkt._tsMs) : Date.now();
const msgRigId = pkt?.rig_id || lastActiveRigId;
const existing = stationMarkers.get(call);
if (existing) {
existing.pkt = pkt;
@@ -6283,9 +6284,9 @@ window.aprsMapAddStation = function(call, lat, lon, info, symbolTable, symbolCod
existing.info = info;
existing.symbolTable = symbolTable;
existing.symbolCode = symbolCode;
if (lastActiveRigId) {
if (msgRigId) {
if (!existing.rigIds) existing.rigIds = new Set();
existing.rigIds.add(lastActiveRigId);
existing.rigIds.add(msgRigId);
}
if (!Array.isArray(existing.trackHistory)) existing.trackHistory = [];
const prevPoint = existing.trackHistory[existing.trackHistory.length - 1];
@@ -6312,7 +6313,7 @@ window.aprsMapAddStation = function(call, lat, lon, info, symbolTable, symbolCod
info,
symbolTable,
symbolCode,
rigIds: new Set(lastActiveRigId ? [lastActiveRigId] : []),
rigIds: new Set(msgRigId ? [msgRigId] : []),
};
stationMarkers.set(call, entry);
pruneAprsEntry(call, entry, mapHistoryCutoffMs());
@@ -6407,12 +6408,13 @@ window.aisMapAddVessel = function(msg) {
const popupHtml = buildAisPopupHtml(msg);
const nextPoint = [msg.lat, msg.lon];
const tsMs = Number.isFinite(msg?._tsMs) ? Number(msg._tsMs) : Date.now();
const msgRigId = msg?.rig_id || lastActiveRigId;
const existing = aisMarkers.get(key);
if (existing) {
existing.msg = msg;
if (lastActiveRigId) {
if (msgRigId) {
if (!existing.rigIds) existing.rigIds = new Set();
existing.rigIds.add(lastActiveRigId);
existing.rigIds.add(msgRigId);
}
if (!Array.isArray(existing.trackHistory)) existing.trackHistory = [];
const prevPoint = existing.trackHistory[existing.trackHistory.length - 1];
@@ -6433,7 +6435,7 @@ window.aisMapAddVessel = function(msg) {
trackHistory: [{ lat: msg.lat, lon: msg.lon, tsMs }],
trackPoints: [nextPoint],
msg,
rigIds: new Set(lastActiveRigId ? [lastActiveRigId] : []),
rigIds: new Set(msgRigId ? [msgRigId] : []),
});
pruneAisEntry(key, aisMarkers.get(key), mapHistoryCutoffMs());
if (aisMarkers.get(key)?.visibleInHistoryWindow) ensureAisMarker(key, aisMarkers.get(key));
@@ -6447,13 +6449,14 @@ window.vdesMapAddPoint = function(msg) {
const popupHtml = buildVdesPopupHtml(msg);
const visible = Number.isFinite(Number(msg?._tsMs))
&& Number(msg._tsMs) >= mapHistoryCutoffMs();
const msgRigId = msg?.rig_id || lastActiveRigId;
const existing = vdesMarkers.get(key);
if (existing) {
existing.msg = msg;
existing.visibleInHistoryWindow = visible;
if (lastActiveRigId) {
if (msgRigId) {
if (!existing.rigIds) existing.rigIds = new Set();
existing.rigIds.add(lastActiveRigId);
existing.rigIds.add(msgRigId);
}
if (!visible) {
if (!decodeHistoryMapRenderingDeferred()) {
@@ -6479,7 +6482,7 @@ window.vdesMapAddPoint = function(msg) {
marker: null,
msg,
visibleInHistoryWindow: visible,
rigIds: new Set(lastActiveRigId ? [lastActiveRigId] : []),
rigIds: new Set(msgRigId ? [msgRigId] : []),
};
vdesMarkers.set(key, entry);
if (!visible) return;
@@ -7169,6 +7172,7 @@ window.syncBookmarkMapLocators = function(bookmarks) {
window.mapAddLocator = function(message, grids, type = "ft8", station = null, details = null) {
if (!Array.isArray(grids) || grids.length === 0) return;
const markerType = type === "wspr" ? "wspr" : (type === "ft4" ? "ft4" : (type === "ft2" ? "ft2" : "ft8"));
const msgRigId = details?.rig_id || lastActiveRigId;
const unique = [...new Set(grids.map((g) => String(g).toUpperCase()))];
const stationId = station && String(station).trim() ? String(station).trim().toUpperCase() : "";
const locatorDetails = new Map();
@@ -7199,7 +7203,7 @@ window.mapAddLocator = function(message, grids, type = "ft8", station = null, de
dt_s: Number.isFinite(details?.dt_s) ? Number(details.dt_s) : null,
freq_hz: Number.isFinite(details?.freq_hz) ? Number(details.freq_hz) : null,
message: String(details?.message || message || "").trim() || null,
remote: lastActiveRigId || null,
remote: msgRigId || null,
};
const detailKey = detailStationId || `${targetId || "decode"}:${detailEntry.message || "decode"}:${detailEntry.ts_ms || Date.now()}`;
const key = `${markerType}:${grid}`;
@@ -7213,9 +7217,9 @@ window.mapAddLocator = function(message, grids, type = "ft8", station = null, de
}
existing.allStationDetails.set(detailKey, { ...detailEntry });
existing.sourceType = markerType;
if (lastActiveRigId) {
if (msgRigId) {
if (!existing.rigIds) existing.rigIds = new Set();
existing.rigIds.add(lastActiveRigId);
existing.rigIds.add(msgRigId);
}
pruneLocatorEntry(key, existing, mapHistoryCutoffMs());
if (existing.marker) sendLocatorOverlayToBack(existing.marker);
@@ -7233,7 +7237,7 @@ window.mapAddLocator = function(message, grids, type = "ft8", station = null, de
allStationDetails,
sourceType: markerType,
bandMeta: new Map(),
rigIds: new Set(lastActiveRigId ? [lastActiveRigId] : []),
rigIds: new Set(msgRigId ? [msgRigId] : []),
};
locatorMarkers.set(key, entry);
pruneLocatorEntry(key, entry, mapHistoryCutoffMs());
@@ -326,6 +326,7 @@ function addAisMessage(msg) {
function normalizeServerAisMessage(msg) {
return {
rig_id: msg.rig_id || null,
channel: msg.channel,
message_type: msg.message_type,
mmsi: msg.mmsi,
@@ -393,6 +393,7 @@ function addAprsPacket(pkt) {
function normalizeServerAprsPacket(pkt) {
return {
rig_id: pkt.rig_id || null,
receiver: window.getDecodeRigMeta ? window.getDecodeRigMeta() : null,
srcCall: pkt.src_call,
destCall: pkt.dest_call,
@@ -335,6 +335,7 @@ function addHfAprsPacket(pkt) {
function normalizeServerHfAprsPacket(pkt) {
return {
rig_id: pkt.rig_id || null,
receiver: window.getDecodeRigMeta ? window.getDecodeRigMeta() : null,
srcCall: pkt.src_call,
destCall: pkt.dest_call,
@@ -256,6 +256,7 @@ function addVdesMessage(msg) {
function normalizeServerVdesMessage(msg) {
return {
rig_id: msg.rig_id || null,
message_type: msg.message_type,
bit_len: msg.bit_len,
raw_bytes: msg.raw_bytes,
@@ -121,11 +121,12 @@ fn record_ais(context: &FrontendRuntimeContext, mut msg: AisMessage) {
if msg.ts_ms.is_none() {
msg.ts_ms = Some(current_timestamp_ms());
}
let rig_id = msg.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.ais_history
.lock()
.expect("ais history mutex poisoned");
history.push_back((Instant::now(), active_rig_id(context), msg));
history.push_back((Instant::now(), rig_id, msg));
prune_ais_history(context, &mut history);
}
@@ -133,11 +134,12 @@ fn record_vdes(context: &FrontendRuntimeContext, mut msg: VdesMessage) {
if msg.ts_ms.is_none() {
msg.ts_ms = Some(current_timestamp_ms());
}
let rig_id = msg.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.vdes_history
.lock()
.expect("vdes history mutex poisoned");
history.push_back((Instant::now(), active_rig_id(context), msg));
history.push_back((Instant::now(), rig_id, msg));
prune_vdes_history(context, &mut history);
}
@@ -210,11 +212,12 @@ fn record_aprs(context: &FrontendRuntimeContext, mut pkt: AprsPacket) {
if pkt.ts_ms.is_none() {
pkt.ts_ms = Some(current_timestamp_ms());
}
let rig_id = pkt.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.aprs_history
.lock()
.expect("aprs history mutex poisoned");
history.push_back((Instant::now(), active_rig_id(context), pkt));
history.push_back((Instant::now(), rig_id, pkt));
prune_aprs_history(context, &mut history);
}
@@ -222,56 +225,62 @@ fn record_hf_aprs(context: &FrontendRuntimeContext, mut pkt: AprsPacket) {
if pkt.ts_ms.is_none() {
pkt.ts_ms = Some(current_timestamp_ms());
}
let rig_id = pkt.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.hf_aprs_history
.lock()
.expect("hf_aprs history mutex poisoned");
history.push_back((Instant::now(), active_rig_id(context), pkt));
history.push_back((Instant::now(), rig_id, pkt));
prune_hf_aprs_history(context, &mut history);
}
fn record_cw(context: &FrontendRuntimeContext, event: CwEvent) {
let rig_id = event.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.cw_history
.lock()
.expect("cw history mutex poisoned");
history.push_back((Instant::now(), active_rig_id(context), event));
history.push_back((Instant::now(), rig_id, event));
prune_cw_history(context, &mut history);
}
fn record_ft8(context: &FrontendRuntimeContext, msg: Ft8Message) {
let rig_id = msg.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.ft8_history
.lock()
.expect("ft8 history mutex poisoned");
history.push_back((Instant::now(), active_rig_id(context), msg));
history.push_back((Instant::now(), rig_id, msg));
prune_ft8_history(context, &mut history);
}
fn record_ft4(context: &FrontendRuntimeContext, msg: Ft8Message) {
let rig_id = msg.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.ft4_history
.lock()
.expect("ft4 history mutex poisoned");
history.push_back((Instant::now(), active_rig_id(context), msg));
history.push_back((Instant::now(), rig_id, msg));
prune_ft4_history(context, &mut history);
}
fn record_ft2(context: &FrontendRuntimeContext, msg: Ft8Message) {
let rig_id = msg.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.ft2_history
.lock()
.expect("ft2 history mutex poisoned");
history.push_back((Instant::now(), active_rig_id(context), msg));
history.push_back((Instant::now(), rig_id, msg));
prune_ft2_history(context, &mut history);
}
fn record_wspr(context: &FrontendRuntimeContext, msg: WsprMessage) {
let rig_id = msg.rig_id.clone().or_else(|| active_rig_id(context));
let mut history = context
.wspr_history
.lock()
.expect("wspr history mutex poisoned");
history.push_back((Instant::now(), active_rig_id(context), msg));
history.push_back((Instant::now(), rig_id, msg));
prune_wspr_history(context, &mut history);
}
+38
View File
@@ -30,8 +30,36 @@ pub enum DecodedMessage {
Wspr(WsprMessage),
}
impl DecodedMessage {
/// Attach a rig identifier to the inner message variant.
pub fn set_rig_id(&mut self, id: String) {
match self {
Self::Ais(m) => m.rig_id = Some(id),
Self::Vdes(m) => m.rig_id = Some(id),
Self::Aprs(m) | Self::HfAprs(m) => m.rig_id = Some(id),
Self::Cw(m) => m.rig_id = Some(id),
Self::Ft8(m) | Self::Ft4(m) | Self::Ft2(m) => m.rig_id = Some(id),
Self::Wspr(m) => m.rig_id = Some(id),
}
}
/// Return the rig identifier from the inner message variant, if set.
pub fn rig_id(&self) -> Option<&str> {
match self {
Self::Ais(m) => m.rig_id.as_deref(),
Self::Vdes(m) => m.rig_id.as_deref(),
Self::Aprs(m) | Self::HfAprs(m) => m.rig_id.as_deref(),
Self::Cw(m) => m.rig_id.as_deref(),
Self::Ft8(m) | Self::Ft4(m) | Self::Ft2(m) => m.rig_id.as_deref(),
Self::Wspr(m) => m.rig_id.as_deref(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AisMessage {
#[serde(skip_serializing_if = "Option::is_none")]
pub rig_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ts_ms: Option<i64>,
pub channel: String,
@@ -63,6 +91,8 @@ pub struct AisMessage {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VdesMessage {
#[serde(skip_serializing_if = "Option::is_none")]
pub rig_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ts_ms: Option<i64>,
pub channel: String,
@@ -122,6 +152,8 @@ pub struct VdesMessage {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AprsPacket {
#[serde(skip_serializing_if = "Option::is_none")]
pub rig_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ts_ms: Option<i64>,
pub src_call: String,
@@ -143,6 +175,8 @@ pub struct AprsPacket {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CwEvent {
#[serde(skip_serializing_if = "Option::is_none")]
pub rig_id: Option<String>,
/// Decoded text fragment (one or more characters)
pub text: String,
/// Current detected WPM
@@ -155,6 +189,8 @@ pub struct CwEvent {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Ft8Message {
#[serde(skip_serializing_if = "Option::is_none")]
pub rig_id: Option<String>,
/// UTC timestamp (milliseconds since epoch)
pub ts_ms: i64,
/// Approximate SNR (dB)
@@ -169,6 +205,8 @@ pub struct Ft8Message {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsprMessage {
#[serde(skip_serializing_if = "Option::is_none")]
pub rig_id: Option<String>,
/// UTC timestamp (milliseconds since epoch)
pub ts_ms: i64,
/// Approximate SNR (dB)
+1
View File
@@ -380,6 +380,7 @@ mod tests {
fn make_pkt(src: &str, dest: &str, path: &str, info: &str, crc_ok: bool) -> AprsPacket {
AprsPacket {
rig_id: None,
ts_ms: None,
src_call: src.to_string(),
dest_call: dest.to_string(),
+8
View File
@@ -1806,6 +1806,7 @@ pub async fn run_ft8_decoder(
let base_freq_hz = state_rx.borrow().status.freq.hz as f64;
let abs_freq_hz = base_freq_hz + res.freq_hz as f64;
let msg = Ft8Message {
rig_id: None,
ts_ms,
snr_db: res.snr_db,
dt_s: res.dt_s,
@@ -1961,6 +1962,7 @@ pub async fn run_ft4_decoder(
let base_freq_hz = state_rx.borrow().status.freq.hz as f64;
let abs_freq_hz = base_freq_hz + res.freq_hz as f64;
let msg = Ft8Message {
rig_id: None,
ts_ms,
snr_db: res.snr_db,
dt_s: res.dt_s,
@@ -2110,6 +2112,7 @@ pub async fn run_ft2_decoder(
let base_freq_hz = state_rx.borrow().status.freq.hz as f64;
let abs_freq_hz = base_freq_hz + res.freq_hz as f64;
let msg = Ft8Message {
rig_id: None,
ts_ms,
snr_db: res.snr_db,
dt_s: res.dt_s,
@@ -2260,6 +2263,7 @@ pub async fn run_wspr_decoder(
Err(_) => 0,
};
let msg = WsprMessage {
rig_id: None,
ts_ms,
snr_db: res.snr_db,
dt_s: res.dt_s,
@@ -2514,6 +2518,7 @@ async fn run_background_ft8_decoder(
for res in results {
let abs_freq_hz = base_freq_hz as f64 + res.freq_hz as f64;
let msg = Ft8Message {
rig_id: None,
ts_ms: current_timestamp_ms(),
snr_db: res.snr_db,
dt_s: res.dt_s,
@@ -2593,6 +2598,7 @@ async fn run_background_ft4_decoder(
for res in results {
let abs_freq_hz = base_freq_hz as f64 + res.freq_hz as f64;
let msg = Ft8Message {
rig_id: None,
ts_ms: current_timestamp_ms(),
snr_db: res.snr_db,
dt_s: res.dt_s,
@@ -2663,6 +2669,7 @@ async fn run_background_ft2_decoder(
for res in results {
let abs_freq_hz = base_freq_hz as f64 + res.freq_hz as f64;
let msg = Ft8Message {
rig_id: None,
ts_ms: current_timestamp_ms(),
snr_db: res.snr_db,
dt_s: res.dt_s,
@@ -2727,6 +2734,7 @@ async fn run_background_wspr_decoder(
Ok(results) => {
for res in results {
let msg = WsprMessage {
rig_id: None,
ts_ms: current_timestamp_ms(),
snr_db: res.snr_db,
dt_s: res.dt_s,