[feat](trx-frontend-http): per-rig bookmarks, scheduler, and decode filtering

Two-tier bookmark system: general bookmarks shared across all rigs plus
rig-specific bookmarks with scope picker in the Bookmarks tab. Scheduler
storage split into per-rig files with migration from legacy single file.
Decode history tagged with rig_id and filterable via ?remote= on
/decode/history endpoint. Decode SSE reconnects on rig switch to refresh
filtered history.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-24 20:24:49 +01:00
parent b50c6bca96
commit 55688a27b2
11 changed files with 436 additions and 156 deletions
+7 -7
View File
@@ -474,7 +474,7 @@ async fn async_init() -> DynResult<AppState> {
message.ts_ms = Some(current_timestamp_ms());
}
if let Ok(mut history) = ais_history.lock() {
history.push_back((now, message));
history.push_back((now, None, message));
}
}
DecodedMessage::Vdes(mut message) => {
@@ -482,7 +482,7 @@ async fn async_init() -> DynResult<AppState> {
message.ts_ms = Some(current_timestamp_ms());
}
if let Ok(mut history) = vdes_history.lock() {
history.push_back((now, message));
history.push_back((now, None, message));
}
}
DecodedMessage::Aprs(mut packet) => {
@@ -490,7 +490,7 @@ async fn async_init() -> DynResult<AppState> {
packet.ts_ms = Some(current_timestamp_ms());
}
if let Ok(mut history) = aprs_history.lock() {
history.push_back((now, packet));
history.push_back((now, None, packet));
}
}
DecodedMessage::HfAprs(mut packet) => {
@@ -498,17 +498,17 @@ async fn async_init() -> DynResult<AppState> {
packet.ts_ms = Some(current_timestamp_ms());
}
if let Ok(mut history) = hf_aprs_history.lock() {
history.push_back((now, packet));
history.push_back((now, None, packet));
}
}
DecodedMessage::Cw(event) => {
if let Ok(mut history) = cw_history.lock() {
history.push_back((now, event));
history.push_back((now, None, event));
}
}
DecodedMessage::Ft8(message) => {
if let Ok(mut history) = ft8_history.lock() {
history.push_back((now, message));
history.push_back((now, None, message));
}
}
DecodedMessage::Ft4(_) => {
@@ -519,7 +519,7 @@ async fn async_init() -> DynResult<AppState> {
}
DecodedMessage::Wspr(message) => {
if let Ok(mut history) = wspr_history.lock() {
history.push_back((now, message));
history.push_back((now, None, message));
}
}
}
+19 -18
View File
@@ -194,24 +194,25 @@ pub struct FrontendRuntimeContext {
pub audio_info: Option<watch::Receiver<Option<AudioStreamInfo>>>,
/// Decode message broadcast channel
pub decode_rx: Option<broadcast::Sender<DecodedMessage>>,
/// AIS decode history (timestamp, message)
pub ais_history: Arc<Mutex<VecDeque<(Instant, AisMessage)>>>,
/// VDES decode history (timestamp, message)
pub vdes_history: Arc<Mutex<VecDeque<(Instant, VdesMessage)>>>,
/// APRS decode history (timestamp, packet)
pub aprs_history: Arc<Mutex<VecDeque<(Instant, AprsPacket)>>>,
/// HF APRS decode history (timestamp, packet)
pub hf_aprs_history: Arc<Mutex<VecDeque<(Instant, AprsPacket)>>>,
/// CW decode history (timestamp, event)
pub cw_history: Arc<Mutex<VecDeque<(Instant, CwEvent)>>>,
/// FT8 decode history (timestamp, message)
pub ft8_history: Arc<Mutex<VecDeque<(Instant, Ft8Message)>>>,
/// FT4 decode history (timestamp, message)
pub ft4_history: Arc<Mutex<VecDeque<(Instant, Ft8Message)>>>,
/// FT2 decode history (timestamp, message)
pub ft2_history: Arc<Mutex<VecDeque<(Instant, Ft8Message)>>>,
/// WSPR decode history (timestamp, message)
pub wspr_history: Arc<Mutex<VecDeque<(Instant, WsprMessage)>>>,
/// Decode history entry: (record_time, rig_id, message).
/// AIS decode history
pub ais_history: Arc<Mutex<VecDeque<(Instant, Option<String>, AisMessage)>>>,
/// VDES decode history
pub vdes_history: Arc<Mutex<VecDeque<(Instant, Option<String>, VdesMessage)>>>,
/// APRS decode history
pub aprs_history: Arc<Mutex<VecDeque<(Instant, Option<String>, AprsPacket)>>>,
/// HF APRS decode history
pub hf_aprs_history: Arc<Mutex<VecDeque<(Instant, Option<String>, AprsPacket)>>>,
/// CW decode history
pub cw_history: Arc<Mutex<VecDeque<(Instant, Option<String>, CwEvent)>>>,
/// FT8 decode history
pub ft8_history: Arc<Mutex<VecDeque<(Instant, Option<String>, Ft8Message)>>>,
/// FT4 decode history
pub ft4_history: Arc<Mutex<VecDeque<(Instant, Option<String>, Ft8Message)>>>,
/// FT2 decode history
pub ft2_history: Arc<Mutex<VecDeque<(Instant, Option<String>, Ft8Message)>>>,
/// WSPR decode history
pub wspr_history: Arc<Mutex<VecDeque<(Instant, Option<String>, WsprMessage)>>>,
/// Authentication tokens for HTTP-JSON frontend
pub auth_tokens: HashSet<String>,
/// Active HTTP SSE clients (incremented on /events connect, decremented on disconnect).
@@ -973,6 +973,8 @@ function applyRigList(activeRigId, rigIds, displayNames) {
updateRigSubtitle(lastActiveRigId);
if (typeof setSchedulerRig === "function") setSchedulerRig(lastActiveRigId);
if (typeof setBackgroundDecodeRig === "function") setBackgroundDecodeRig(lastActiveRigId);
if (typeof bmPopulateScopePicker === "function") bmPopulateScopePicker();
if (typeof bmFetch === "function") bmFetch(document.getElementById("bm-category-filter")?.value || "");
updateMapRigFilter();
}
@@ -3498,6 +3500,9 @@ async function switchRigFromSelect(selectEl) {
updateRigSubtitle(lastActiveRigId);
if (typeof setSchedulerRig === "function") setSchedulerRig(lastActiveRigId);
if (typeof setBackgroundDecodeRig === "function") setBackgroundDecodeRig(lastActiveRigId);
if (typeof bmFetch === "function") bmFetch(document.getElementById("bm-category-filter")?.value || "");
// Reconnect decode SSE so history is re-fetched with the new rig filter.
connectDecode();
// Switch this session's rig and reconnect SSE to the new rig's
// state channel.
try {
@@ -8101,8 +8106,14 @@ function scheduleDecodeHistoryDrainStep(callback) {
}
}
function decodeHistoryUrl() {
let url = "/decode/history";
if (lastActiveRigId) url += "?remote=" + encodeURIComponent(lastActiveRigId);
return url;
}
function loadDecodeHistoryOnMainThread(onReady, onError) {
fetch("/decode/history").then(async (resp) => {
fetch(decodeHistoryUrl()).then(async (resp) => {
if (!resp.ok) return null;
setDecodeHistoryOverlayVisible(true, "Loading decode history…", "Receiving compressed history payload");
const payload = await resp.arrayBuffer();
@@ -8328,7 +8339,7 @@ function connectDecode() {
};
worker.postMessage({
type: "fetch-history",
url: "/decode/history",
url: decodeHistoryUrl(),
batchLimit: DECODE_HISTORY_WORKER_GROUP_LIMIT,
});
return true;
@@ -368,6 +368,9 @@
</div>
<div id="tab-bookmarks" class="tab-panel" style="display:none;">
<div class="bm-toolbar">
<select id="bm-scope-picker" class="status-input" aria-label="Bookmark scope">
<option value="general">General</option>
</select>
<select id="bm-category-filter" class="status-input" aria-label="Filter by category">
<option value="">All categories</option>
</select>
@@ -1,5 +1,14 @@
// --- Bookmarks Tab ---
/** Current bookmark scope: "general" or a rig remote name. */
let bmScope = "general";
/** Build the ?scope= query string for the current bookmark scope. */
function bmScopeParam(prefix) {
const sep = prefix ? "&" : "?";
return sep + "scope=" + encodeURIComponent(bmScope);
}
var bmList = [];
var bmRevision = 0;
let bmFilteredList = [];
@@ -37,9 +46,12 @@ function bmSyncAccess() {
async function bmFetch(categoryFilter) {
let url = "/bookmarks";
let hasQuery = false;
if (categoryFilter && categoryFilter !== "") {
url += "?category=" + encodeURIComponent(categoryFilter);
hasQuery = true;
}
url += bmScopeParam(hasQuery);
try {
const resp = await fetch(url);
if (!resp.ok) throw new Error("HTTP " + resp.status);
@@ -84,7 +96,7 @@ async function bmRefreshCategoryFilter(keepValue) {
const modeSel = document.getElementById("bm-mode-filter");
if (!sel && !modeSel) return;
try {
const resp = await fetch("/bookmarks");
const resp = await fetch("/bookmarks" + bmScopeParam(false));
if (!resp.ok) return;
const all = await resp.json();
if (sel) {
@@ -281,13 +293,13 @@ async function bmSave(e) {
try {
let resp;
if (id) {
resp = await fetch("/bookmarks/" + encodeURIComponent(id), {
resp = await fetch("/bookmarks/" + encodeURIComponent(id) + bmScopeParam(false), {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
});
} else {
resp = await fetch("/bookmarks", {
resp = await fetch("/bookmarks" + bmScopeParam(false), {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
@@ -311,7 +323,7 @@ async function bmSave(e) {
async function bmDelete(id) {
if (!confirm("Delete this bookmark?")) return;
try {
const resp = await fetch("/bookmarks/" + encodeURIComponent(id), {
const resp = await fetch("/bookmarks/" + encodeURIComponent(id) + bmScopeParam(false), {
method: "DELETE",
});
if (!resp.ok) throw new Error("HTTP " + resp.status);
@@ -429,7 +441,7 @@ async function bmDeleteSelected() {
if (ids.length === 0) return;
if (!confirm(`Delete ${ids.length} selected bookmark${ids.length > 1 ? "s" : ""}?`)) return;
try {
const resp = await fetch("/bookmarks/batch_delete", {
const resp = await fetch("/bookmarks/batch_delete" + bmScopeParam(false), {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ ids }),
@@ -444,12 +456,45 @@ async function bmDeleteSelected() {
}
}
/** Populate the scope picker with "General" + one option per rig. */
function bmPopulateScopePicker() {
const picker = document.getElementById("bm-scope-picker");
if (!picker) return;
const rigIds = (typeof lastRigIds !== "undefined" && Array.isArray(lastRigIds)) ? lastRigIds : [];
const displayNames = (typeof lastRigDisplayNames !== "undefined") ? lastRigDisplayNames : {};
// Preserve current selection if still valid.
const prev = picker.value;
while (picker.options.length > 1) picker.remove(1);
rigIds.forEach((id) => {
const opt = document.createElement("option");
opt.value = id;
opt.textContent = displayNames[id] || id;
picker.appendChild(opt);
});
if (prev && (prev === "general" || rigIds.includes(prev))) {
picker.value = prev;
} else {
picker.value = "general";
}
bmScope = picker.value;
}
// --- Event wiring ---
(function initBookmarks() {
// Set initial button visibility (auth may already be resolved by the time
// scripts run if auth is disabled; otherwise bmFetch() will sync it).
bmSyncAccess();
// Scope picker
bmPopulateScopePicker();
const scopePicker = document.getElementById("bm-scope-picker");
if (scopePicker) {
scopePicker.addEventListener("change", (e) => {
bmScope = e.target.value;
bmFetch(document.getElementById("bm-category-filter")?.value || "");
});
}
// Refresh list and sync access when the Bookmarks tab is activated
document.querySelector(".tab-bar").addEventListener("click", (e) => {
const btn = e.target.closest('.tab[data-tab="bookmarks"]');
@@ -350,7 +350,7 @@ pub async fn events(
clients: web::Data<Arc<AtomicUsize>>,
context: web::Data<Arc<FrontendRuntimeContext>>,
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
bookmark_store: web::Data<Arc<crate::server::bookmarks::BookmarkStore>>,
bookmark_store_map: web::Data<Arc<crate::server::bookmarks::BookmarkStoreMap>>,
scheduler_status: web::Data<crate::server::scheduler::SchedulerStatusMap>,
scheduler_control: web::Data<crate::server::scheduler::SharedSchedulerControlManager>,
session_rig_mgr: web::Data<Arc<SessionRigManager>>,
@@ -389,7 +389,7 @@ pub async fn events(
);
sync_scheduler_vchannels(
vchan_mgr.get_ref().as_ref(),
bookmark_store.get_ref().as_ref(),
bookmark_store_map.get_ref().as_ref(),
scheduler_status.get_ref(),
scheduler_control.get_ref().as_ref(),
rid,
@@ -423,7 +423,7 @@ pub async fn events(
let counter_updates = counter.clone();
let context_updates = context.get_ref().clone();
let vchan_updates = vchan_mgr.get_ref().clone();
let bookmark_store_updates = bookmark_store.get_ref().clone();
let bookmark_store_map_updates = bookmark_store_map.get_ref().clone();
let scheduler_status_updates = scheduler_status.get_ref().clone();
let scheduler_control_updates = scheduler_control.get_ref().clone();
let session_rig_mgr_updates = session_rig_mgr.get_ref().clone();
@@ -431,7 +431,7 @@ pub async fn events(
let counter = counter_updates.clone();
let context = context_updates.clone();
let vchan = vchan_updates.clone();
let bookmark_store = bookmark_store_updates.clone();
let bookmark_store_map = bookmark_store_map_updates.clone();
let scheduler_status = scheduler_status_updates.clone();
let scheduler_control = scheduler_control_updates.clone();
let session_rig_mgr = session_rig_mgr_updates.clone();
@@ -452,7 +452,7 @@ pub async fn events(
);
sync_scheduler_vchannels(
vchan.as_ref(),
bookmark_store.as_ref(),
bookmark_store_map.as_ref(),
&scheduler_status,
scheduler_control.as_ref(),
&rig_id,
@@ -536,7 +536,7 @@ pub async fn events(
fn sync_scheduler_vchannels(
vchan_mgr: &ClientChannelManager,
bookmark_store: &crate::server::bookmarks::BookmarkStore,
bookmark_store_map: &crate::server::bookmarks::BookmarkStoreMap,
scheduler_status: &crate::server::scheduler::SchedulerStatusMap,
scheduler_control: &crate::server::scheduler::SchedulerControlManager,
rig_id: &str,
@@ -555,7 +555,7 @@ fn sync_scheduler_vchannels(
.last_bookmark_ids
.iter()
.filter_map(|bookmark_id| {
bookmark_store.get(bookmark_id).map(|bookmark| {
bookmark_store_map.get_for_rig(rig_id, bookmark_id).map(|bookmark| {
(
bookmark_id.clone(),
bookmark.freq_hz,
@@ -600,17 +600,18 @@ impl DecodeHistoryPayload {
}
/// Build the grouped decode history payload from all per-decoder ring-buffers.
fn collect_decode_history(context: &FrontendRuntimeContext) -> DecodeHistoryPayload {
/// When `rig_filter` is `Some`, only entries recorded for that rig are included.
fn collect_decode_history(context: &FrontendRuntimeContext, rig_filter: Option<&str>) -> DecodeHistoryPayload {
DecodeHistoryPayload {
ais: crate::server::audio::snapshot_ais_history(context),
vdes: crate::server::audio::snapshot_vdes_history(context),
aprs: crate::server::audio::snapshot_aprs_history(context),
hf_aprs: crate::server::audio::snapshot_hf_aprs_history(context),
cw: crate::server::audio::snapshot_cw_history(context),
ft8: crate::server::audio::snapshot_ft8_history(context),
ft4: crate::server::audio::snapshot_ft4_history(context),
ft2: crate::server::audio::snapshot_ft2_history(context),
wspr: crate::server::audio::snapshot_wspr_history(context),
ais: crate::server::audio::snapshot_ais_history(context, rig_filter),
vdes: crate::server::audio::snapshot_vdes_history(context, rig_filter),
aprs: crate::server::audio::snapshot_aprs_history(context, rig_filter),
hf_aprs: crate::server::audio::snapshot_hf_aprs_history(context, rig_filter),
cw: crate::server::audio::snapshot_cw_history(context, rig_filter),
ft8: crate::server::audio::snapshot_ft8_history(context, rig_filter),
ft4: crate::server::audio::snapshot_ft4_history(context, rig_filter),
ft2: crate::server::audio::snapshot_ft2_history(context, rig_filter),
wspr: crate::server::audio::snapshot_wspr_history(context, rig_filter),
}
}
@@ -700,11 +701,15 @@ fn gzip_bytes(payload: &[u8]) -> std::io::Result<Vec<u8>> {
/// not block real-time messages: the client fetches this endpoint in parallel
/// with opening the SSE connection and drains it in the background.
#[get("/decode/history")]
pub async fn decode_history(context: web::Data<Arc<FrontendRuntimeContext>>) -> impl Responder {
pub async fn decode_history(
context: web::Data<Arc<FrontendRuntimeContext>>,
query: web::Query<RemoteQuery>,
) -> impl Responder {
if context.decode_rx.is_none() {
return HttpResponse::NotFound().body("decode not enabled");
}
let history = collect_decode_history(context.get_ref());
let rig_filter = query.remote.as_deref().filter(|s| !s.is_empty());
let history = collect_decode_history(context.get_ref(), rig_filter);
let cbor = match encode_decode_history_cbor(&history) {
Ok(cbor) => cbor,
Err(err) => {
@@ -1416,6 +1421,28 @@ pub async fn clear_cw_decode(
#[derive(serde::Deserialize)]
pub struct BookmarkQuery {
pub category: Option<String>,
/// `"general"` for the shared store, or a rig remote name for
/// the per-rig store. Omitting defaults to the general store.
pub scope: Option<String>,
}
/// Resolve which `BookmarkStore` to use based on the `scope` parameter.
///
/// - `scope` absent or `"general"` → general store
/// - `scope` = `"{remote}"` → per-rig store for that remote
fn resolve_bookmark_store(
scope: Option<&str>,
store_map: &crate::server::bookmarks::BookmarkStoreMap,
) -> std::sync::Arc<crate::server::bookmarks::BookmarkStore> {
match scope.filter(|s| !s.is_empty() && *s != "general") {
Some(remote) => store_map.store_for(remote),
None => store_map.general().clone(),
}
}
#[derive(serde::Deserialize)]
pub struct BookmarkScopeQuery {
pub scope: Option<String>,
}
#[derive(serde::Deserialize)]
@@ -1481,7 +1508,7 @@ where
#[get("/bookmarks")]
pub async fn list_bookmarks(
req: HttpRequest,
store: web::Data<Arc<crate::server::bookmarks::BookmarkStore>>,
store_map: web::Data<Arc<crate::server::bookmarks::BookmarkStoreMap>>,
query: web::Query<BookmarkQuery>,
) -> Result<HttpResponse, Error> {
if request_accepts_html(&req) {
@@ -1490,6 +1517,7 @@ pub async fn list_bookmarks(
status::index_html(),
));
}
let store = resolve_bookmark_store(query.scope.as_deref(), store_map.get_ref());
let mut list = store.list();
if let Some(ref cat) = query.category {
if !cat.is_empty() {
@@ -1504,11 +1532,13 @@ pub async fn list_bookmarks(
#[post("/bookmarks")]
pub async fn create_bookmark(
req: HttpRequest,
store: web::Data<Arc<crate::server::bookmarks::BookmarkStore>>,
store_map: web::Data<Arc<crate::server::bookmarks::BookmarkStoreMap>>,
query: web::Query<BookmarkScopeQuery>,
body: web::Json<BookmarkInput>,
auth_state: web::Data<crate::server::auth::AuthState>,
) -> Result<HttpResponse, Error> {
require_control(&req, &auth_state)?;
let store = resolve_bookmark_store(query.scope.as_deref(), store_map.get_ref());
if store.freq_taken(body.freq_hz, None) {
return Err(actix_web::error::ErrorConflict(
"a bookmark for that frequency already exists",
@@ -1538,11 +1568,13 @@ pub async fn create_bookmark(
pub async fn update_bookmark(
req: HttpRequest,
path: web::Path<String>,
store: web::Data<Arc<crate::server::bookmarks::BookmarkStore>>,
store_map: web::Data<Arc<crate::server::bookmarks::BookmarkStoreMap>>,
query: web::Query<BookmarkScopeQuery>,
body: web::Json<BookmarkInput>,
auth_state: web::Data<crate::server::auth::AuthState>,
) -> Result<HttpResponse, Error> {
require_control(&req, &auth_state)?;
let store = resolve_bookmark_store(query.scope.as_deref(), store_map.get_ref());
let id = path.into_inner();
if store.freq_taken(body.freq_hz, Some(&id)) {
return Err(actix_web::error::ErrorConflict(
@@ -1571,10 +1603,12 @@ pub async fn update_bookmark(
pub async fn delete_bookmark(
req: HttpRequest,
path: web::Path<String>,
store: web::Data<Arc<crate::server::bookmarks::BookmarkStore>>,
store_map: web::Data<Arc<crate::server::bookmarks::BookmarkStoreMap>>,
query: web::Query<BookmarkScopeQuery>,
auth_state: web::Data<crate::server::auth::AuthState>,
) -> Result<HttpResponse, Error> {
require_control(&req, &auth_state)?;
let store = resolve_bookmark_store(query.scope.as_deref(), store_map.get_ref());
let id = path.into_inner();
if store.remove(&id) {
Ok(HttpResponse::Ok().json(serde_json::json!({ "deleted": true })))
@@ -1592,10 +1626,12 @@ struct BatchDeleteRequest {
pub async fn batch_delete_bookmarks(
req: HttpRequest,
body: web::Json<BatchDeleteRequest>,
store: web::Data<Arc<crate::server::bookmarks::BookmarkStore>>,
store_map: web::Data<Arc<crate::server::bookmarks::BookmarkStoreMap>>,
query: web::Query<BookmarkScopeQuery>,
auth_state: web::Data<crate::server::auth::AuthState>,
) -> Result<HttpResponse, Error> {
require_control(&req, &auth_state)?;
let store = resolve_bookmark_store(query.scope.as_deref(), store_map.get_ref());
let mut deleted = 0usize;
for id in &body.ids {
if store.remove(id) {
@@ -1761,7 +1797,7 @@ pub async fn subscribe_channel(
body: web::Json<SubscribeBody>,
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
rig_tx: web::Data<mpsc::Sender<RigRequest>>,
bookmark_store: web::Data<Arc<crate::server::bookmarks::BookmarkStore>>,
bookmark_store_map: web::Data<Arc<crate::server::bookmarks::BookmarkStoreMap>>,
scheduler_control: web::Data<crate::server::scheduler::SharedSchedulerControlManager>,
) -> impl Responder {
let body = body.into_inner();
@@ -1776,7 +1812,7 @@ pub async fn subscribe_channel(
rig_tx.get_ref(),
&remote,
&selected,
bookmark_store.get_ref().as_ref(),
bookmark_store_map.get_ref().as_ref(),
)
.await
{
@@ -2248,7 +2284,7 @@ async fn apply_selected_channel(
rig_tx: &mpsc::Sender<RigRequest>,
remote: &str,
channel: &crate::server::vchan::SelectedChannel,
bookmark_store: &crate::server::bookmarks::BookmarkStore,
bookmark_store_map: &crate::server::bookmarks::BookmarkStoreMap,
) -> Result<(), Error> {
send_command_to_rig(
rig_tx,
@@ -2278,7 +2314,7 @@ async fn apply_selected_channel(
let Some(bookmark_id) = channel.scheduler_bookmark_id.as_deref() else {
return Ok(());
};
let Some(bookmark) = bookmark_store.get(bookmark_id) else {
let Some(bookmark) = bookmark_store_map.get_for_rig(remote, bookmark_id) else {
return Ok(());
};
let (want_aprs, want_hf_aprs, want_ft8, want_ft4, want_ft2, want_wspr) =
@@ -59,10 +59,10 @@ fn decode_history_cutoff(context: &FrontendRuntimeContext) -> Instant {
fn prune_aprs_history(
context: &FrontendRuntimeContext,
history: &mut VecDeque<(Instant, AprsPacket)>,
history: &mut VecDeque<(Instant, Option<String>, AprsPacket)>,
) {
let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() {
while let Some((ts, _, _)) = history.front() {
if *ts >= cutoff {
break;
}
@@ -72,10 +72,10 @@ fn prune_aprs_history(
fn prune_hf_aprs_history(
context: &FrontendRuntimeContext,
history: &mut VecDeque<(Instant, AprsPacket)>,
history: &mut VecDeque<(Instant, Option<String>, AprsPacket)>,
) {
let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() {
while let Some((ts, _, _)) = history.front() {
if *ts >= cutoff {
break;
}
@@ -85,10 +85,10 @@ fn prune_hf_aprs_history(
fn prune_ais_history(
context: &FrontendRuntimeContext,
history: &mut VecDeque<(Instant, AisMessage)>,
history: &mut VecDeque<(Instant, Option<String>, AisMessage)>,
) {
let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() {
while let Some((ts, _, _)) = history.front() {
if *ts >= cutoff {
break;
}
@@ -98,10 +98,10 @@ fn prune_ais_history(
fn prune_vdes_history(
context: &FrontendRuntimeContext,
history: &mut VecDeque<(Instant, VdesMessage)>,
history: &mut VecDeque<(Instant, Option<String>, VdesMessage)>,
) {
let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() {
while let Some((ts, _, _)) = history.front() {
if *ts >= cutoff {
break;
}
@@ -109,6 +109,14 @@ fn prune_vdes_history(
}
}
fn active_rig_id(context: &FrontendRuntimeContext) -> Option<String> {
context
.remote_active_rig_id
.lock()
.ok()
.and_then(|g| g.clone())
}
fn record_ais(context: &FrontendRuntimeContext, mut msg: AisMessage) {
if msg.ts_ms.is_none() {
msg.ts_ms = Some(current_timestamp_ms());
@@ -117,7 +125,7 @@ fn record_ais(context: &FrontendRuntimeContext, mut msg: AisMessage) {
.ais_history
.lock()
.expect("ais history mutex poisoned");
history.push_back((Instant::now(), msg));
history.push_back((Instant::now(), active_rig_id(context), msg));
prune_ais_history(context, &mut history);
}
@@ -129,13 +137,13 @@ fn record_vdes(context: &FrontendRuntimeContext, mut msg: VdesMessage) {
.vdes_history
.lock()
.expect("vdes history mutex poisoned");
history.push_back((Instant::now(), msg));
history.push_back((Instant::now(), active_rig_id(context), msg));
prune_vdes_history(context, &mut history);
}
fn prune_cw_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(Instant, CwEvent)>) {
fn prune_cw_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(Instant, Option<String>, CwEvent)>) {
let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() {
while let Some((ts, _, _)) = history.front() {
if *ts >= cutoff {
break;
}
@@ -145,10 +153,10 @@ fn prune_cw_history(context: &FrontendRuntimeContext, history: &mut VecDeque<(In
fn prune_ft8_history(
context: &FrontendRuntimeContext,
history: &mut VecDeque<(Instant, Ft8Message)>,
history: &mut VecDeque<(Instant, Option<String>, Ft8Message)>,
) {
let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() {
while let Some((ts, _, _)) = history.front() {
if *ts >= cutoff {
break;
}
@@ -158,10 +166,10 @@ fn prune_ft8_history(
fn prune_ft4_history(
context: &FrontendRuntimeContext,
history: &mut VecDeque<(Instant, Ft8Message)>,
history: &mut VecDeque<(Instant, Option<String>, Ft8Message)>,
) {
let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() {
while let Some((ts, _, _)) = history.front() {
if *ts >= cutoff {
break;
}
@@ -171,10 +179,10 @@ fn prune_ft4_history(
fn prune_ft2_history(
context: &FrontendRuntimeContext,
history: &mut VecDeque<(Instant, Ft8Message)>,
history: &mut VecDeque<(Instant, Option<String>, Ft8Message)>,
) {
let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() {
while let Some((ts, _, _)) = history.front() {
if *ts >= cutoff {
break;
}
@@ -184,10 +192,10 @@ fn prune_ft2_history(
fn prune_wspr_history(
context: &FrontendRuntimeContext,
history: &mut VecDeque<(Instant, WsprMessage)>,
history: &mut VecDeque<(Instant, Option<String>, WsprMessage)>,
) {
let cutoff = decode_history_cutoff(context);
while let Some((ts, _)) = history.front() {
while let Some((ts, _, _)) = history.front() {
if *ts >= cutoff {
break;
}
@@ -203,7 +211,7 @@ fn record_aprs(context: &FrontendRuntimeContext, mut pkt: AprsPacket) {
.aprs_history
.lock()
.expect("aprs history mutex poisoned");
history.push_back((Instant::now(), pkt));
history.push_back((Instant::now(), active_rig_id(context), pkt));
prune_aprs_history(context, &mut history);
}
@@ -215,7 +223,7 @@ fn record_hf_aprs(context: &FrontendRuntimeContext, mut pkt: AprsPacket) {
.hf_aprs_history
.lock()
.expect("hf_aprs history mutex poisoned");
history.push_back((Instant::now(), pkt));
history.push_back((Instant::now(), active_rig_id(context), pkt));
prune_hf_aprs_history(context, &mut history);
}
@@ -224,7 +232,7 @@ fn record_cw(context: &FrontendRuntimeContext, event: CwEvent) {
.cw_history
.lock()
.expect("cw history mutex poisoned");
history.push_back((Instant::now(), event));
history.push_back((Instant::now(), active_rig_id(context), event));
prune_cw_history(context, &mut history);
}
@@ -233,7 +241,7 @@ fn record_ft8(context: &FrontendRuntimeContext, msg: Ft8Message) {
.ft8_history
.lock()
.expect("ft8 history mutex poisoned");
history.push_back((Instant::now(), msg));
history.push_back((Instant::now(), active_rig_id(context), msg));
prune_ft8_history(context, &mut history);
}
@@ -242,7 +250,7 @@ fn record_ft4(context: &FrontendRuntimeContext, msg: Ft8Message) {
.ft4_history
.lock()
.expect("ft4 history mutex poisoned");
history.push_back((Instant::now(), msg));
history.push_back((Instant::now(), active_rig_id(context), msg));
prune_ft4_history(context, &mut history);
}
@@ -251,7 +259,7 @@ fn record_ft2(context: &FrontendRuntimeContext, msg: Ft8Message) {
.ft2_history
.lock()
.expect("ft2 history mutex poisoned");
history.push_back((Instant::now(), msg));
history.push_back((Instant::now(), active_rig_id(context), msg));
prune_ft2_history(context, &mut history);
}
@@ -260,26 +268,41 @@ fn record_wspr(context: &FrontendRuntimeContext, msg: WsprMessage) {
.wspr_history
.lock()
.expect("wspr history mutex poisoned");
history.push_back((Instant::now(), msg));
history.push_back((Instant::now(), active_rig_id(context), msg));
prune_wspr_history(context, &mut history);
}
pub fn snapshot_aprs_history(context: &FrontendRuntimeContext) -> Vec<AprsPacket> {
/// Returns `true` if the entry's rig_id matches the optional filter.
/// `None` filter means "all rigs".
fn matches_rig_filter(entry_rig: Option<&str>, filter: Option<&str>) -> bool {
match filter {
None => true,
Some(f) => entry_rig == Some(f),
}
}
pub fn snapshot_aprs_history(context: &FrontendRuntimeContext, rig_filter: Option<&str>) -> Vec<AprsPacket> {
let mut history = context
.aprs_history
.lock()
.expect("aprs history mutex poisoned");
prune_aprs_history(context, &mut history);
history.iter().map(|(_, pkt)| pkt.clone()).collect()
history.iter()
.filter(|(_, rid, _)| matches_rig_filter(rid.as_deref(), rig_filter))
.map(|(_, _, pkt)| pkt.clone())
.collect()
}
pub fn snapshot_hf_aprs_history(context: &FrontendRuntimeContext) -> Vec<AprsPacket> {
pub fn snapshot_hf_aprs_history(context: &FrontendRuntimeContext, rig_filter: Option<&str>) -> Vec<AprsPacket> {
let mut history = context
.hf_aprs_history
.lock()
.expect("hf_aprs history mutex poisoned");
prune_hf_aprs_history(context, &mut history);
history.iter().map(|(_, pkt)| pkt.clone()).collect()
history.iter()
.filter(|(_, rid, _)| matches_rig_filter(rid.as_deref(), rig_filter))
.map(|(_, _, pkt)| pkt.clone())
.collect()
}
/// Return the latest message per MMSI seen within the retention window.
@@ -289,7 +312,7 @@ pub fn snapshot_hf_aprs_history(context: &FrontendRuntimeContext) -> Vec<AprsPac
/// what the map shows (current position/state) and keeps the response compact.
/// The returned vec is sorted ascending by `ts_ms` so the client can replay
/// in chronological order.
pub fn snapshot_ais_history(context: &FrontendRuntimeContext) -> Vec<AisMessage> {
pub fn snapshot_ais_history(context: &FrontendRuntimeContext, rig_filter: Option<&str>) -> Vec<AisMessage> {
let mut history = context
.ais_history
.lock()
@@ -298,66 +321,86 @@ pub fn snapshot_ais_history(context: &FrontendRuntimeContext) -> Vec<AisMessage>
// Iterate oldest-first; later entries overwrite earlier ones so the
// HashMap always holds the newest message per MMSI.
let mut latest: HashMap<u32, AisMessage> = HashMap::new();
for (_, msg) in history.iter() {
latest.insert(msg.mmsi, msg.clone());
for (_, rid, msg) in history.iter() {
if matches_rig_filter(rid.as_deref(), rig_filter) {
latest.insert(msg.mmsi, msg.clone());
}
}
let mut out: Vec<AisMessage> = latest.into_values().collect();
out.sort_by_key(|m| m.ts_ms.unwrap_or(0));
out
}
pub fn snapshot_vdes_history(context: &FrontendRuntimeContext) -> Vec<VdesMessage> {
pub fn snapshot_vdes_history(context: &FrontendRuntimeContext, rig_filter: Option<&str>) -> Vec<VdesMessage> {
let mut history = context
.vdes_history
.lock()
.expect("vdes history mutex poisoned");
prune_vdes_history(context, &mut history);
history.iter().map(|(_, msg)| msg.clone()).collect()
history.iter()
.filter(|(_, rid, _)| matches_rig_filter(rid.as_deref(), rig_filter))
.map(|(_, _, msg)| msg.clone())
.collect()
}
pub fn snapshot_cw_history(context: &FrontendRuntimeContext) -> Vec<CwEvent> {
pub fn snapshot_cw_history(context: &FrontendRuntimeContext, rig_filter: Option<&str>) -> Vec<CwEvent> {
let mut history = context
.cw_history
.lock()
.expect("cw history mutex poisoned");
prune_cw_history(context, &mut history);
history.iter().map(|(_, evt)| evt.clone()).collect()
history.iter()
.filter(|(_, rid, _)| matches_rig_filter(rid.as_deref(), rig_filter))
.map(|(_, _, evt)| evt.clone())
.collect()
}
pub fn snapshot_ft8_history(context: &FrontendRuntimeContext) -> Vec<Ft8Message> {
pub fn snapshot_ft8_history(context: &FrontendRuntimeContext, rig_filter: Option<&str>) -> Vec<Ft8Message> {
let mut history = context
.ft8_history
.lock()
.expect("ft8 history mutex poisoned");
prune_ft8_history(context, &mut history);
history.iter().map(|(_, msg)| msg.clone()).collect()
history.iter()
.filter(|(_, rid, _)| matches_rig_filter(rid.as_deref(), rig_filter))
.map(|(_, _, msg)| msg.clone())
.collect()
}
pub fn snapshot_ft4_history(context: &FrontendRuntimeContext) -> Vec<Ft8Message> {
pub fn snapshot_ft4_history(context: &FrontendRuntimeContext, rig_filter: Option<&str>) -> Vec<Ft8Message> {
let mut history = context
.ft4_history
.lock()
.expect("ft4 history mutex poisoned");
prune_ft4_history(context, &mut history);
history.iter().map(|(_, msg)| msg.clone()).collect()
history.iter()
.filter(|(_, rid, _)| matches_rig_filter(rid.as_deref(), rig_filter))
.map(|(_, _, msg)| msg.clone())
.collect()
}
pub fn snapshot_ft2_history(context: &FrontendRuntimeContext) -> Vec<Ft8Message> {
pub fn snapshot_ft2_history(context: &FrontendRuntimeContext, rig_filter: Option<&str>) -> Vec<Ft8Message> {
let mut history = context
.ft2_history
.lock()
.expect("ft2 history mutex poisoned");
prune_ft2_history(context, &mut history);
history.iter().map(|(_, msg)| msg.clone()).collect()
history.iter()
.filter(|(_, rid, _)| matches_rig_filter(rid.as_deref(), rig_filter))
.map(|(_, _, msg)| msg.clone())
.collect()
}
pub fn snapshot_wspr_history(context: &FrontendRuntimeContext) -> Vec<WsprMessage> {
pub fn snapshot_wspr_history(context: &FrontendRuntimeContext, rig_filter: Option<&str>) -> Vec<WsprMessage> {
let mut history = context
.wspr_history
.lock()
.expect("wspr history mutex poisoned");
prune_wspr_history(context, &mut history);
history.iter().map(|(_, msg)| msg.clone()).collect()
history.iter()
.filter(|(_, rid, _)| matches_rig_filter(rid.as_deref(), rig_filter))
.map(|(_, _, msg)| msg.clone())
.collect()
}
pub fn clear_aprs_history(context: &FrontendRuntimeContext) {
@@ -17,7 +17,7 @@ use tracing::warn;
use trx_frontend::{FrontendRuntimeContext, SharedSpectrum, VChanAudioCmd};
use uuid::Uuid;
use crate::server::bookmarks::{Bookmark, BookmarkStore};
use crate::server::bookmarks::{Bookmark, BookmarkStoreMap};
use crate::server::scheduler::{SchedulerStatusMap, SharedSchedulerControlManager};
use crate::server::vchan::{ClientChannel, ClientChannelManager};
@@ -133,7 +133,7 @@ impl BackgroundDecodeStore {
pub struct BackgroundDecodeManager {
store: Arc<BackgroundDecodeStore>,
bookmarks: Arc<BookmarkStore>,
bookmarks: Arc<BookmarkStoreMap>,
context: Arc<FrontendRuntimeContext>,
scheduler_status: SchedulerStatusMap,
scheduler_control: SharedSchedulerControlManager,
@@ -145,7 +145,7 @@ pub struct BackgroundDecodeManager {
impl BackgroundDecodeManager {
pub fn new(
store: Arc<BackgroundDecodeStore>,
bookmarks: Arc<BookmarkStore>,
bookmarks: Arc<BookmarkStoreMap>,
context: Arc<FrontendRuntimeContext>,
scheduler_status: SchedulerStatusMap,
scheduler_control: SharedSchedulerControlManager,
@@ -206,7 +206,7 @@ impl BackgroundDecodeManager {
let cfg = self.get_config(rig_id);
let bookmarks: HashMap<String, Bookmark> = self
.bookmarks
.list()
.list_for_rig(rig_id)
.into_iter()
.map(|bookmark| (bookmark.id.clone(), bookmark))
.collect();
@@ -346,7 +346,7 @@ impl BackgroundDecodeManager {
};
let selected_bookmarks: HashMap<String, Bookmark> = self
.bookmarks
.list()
.list_for_rig(&rig_id)
.into_iter()
.filter(|bookmark| selected.iter().any(|id| id == &bookmark.id))
.map(|bookmark| (bookmark.id.clone(), bookmark))
@@ -2,8 +2,9 @@
//
// SPDX-License-Identifier: BSD-2-Clause
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use pickledb::{PickleDb, PickleDbDumpPolicy, SerializationMethod};
use serde::{Deserialize, Serialize};
@@ -57,14 +58,20 @@ impl BookmarkStore {
}
}
/// Returns the platform default path: `~/.config/trx-rs/bookmarks.db`.
/// Falls back to `./bookmarks.db` when the config dir is unavailable.
pub fn default_path() -> PathBuf {
/// General (shared) bookmarks path: `~/.config/trx-rs/bookmarks.db`.
pub fn general_path() -> PathBuf {
dirs::config_dir()
.map(|p| p.join("trx-rs").join("bookmarks.db"))
.unwrap_or_else(|| PathBuf::from("bookmarks.db"))
}
/// Per-rig bookmarks path: `~/.config/trx-rs/bookmark.{remote}.db`.
pub fn rig_path(remote: &str) -> PathBuf {
dirs::config_dir()
.map(|p| p.join("trx-rs").join(format!("bookmark.{remote}.db")))
.unwrap_or_else(|| PathBuf::from(format!("bookmark.{remote}.db")))
}
pub fn list(&self) -> Vec<Bookmark> {
let db = self.db.read().unwrap_or_else(|e| e.into_inner());
db.iter()
@@ -113,3 +120,60 @@ impl BookmarkStore {
.any(|bm| bm.freq_hz == freq_hz && exclude_id.is_none_or(|ex| bm.id != ex))
}
}
/// Two-tier bookmark storage: a shared **general** store (`bookmarks.db`)
/// and lazily-opened per-rig stores (`bookmark.{remote}.db`).
pub struct BookmarkStoreMap {
general: Arc<BookmarkStore>,
rig_stores: Mutex<HashMap<String, Arc<BookmarkStore>>>,
}
impl BookmarkStoreMap {
pub fn new() -> Self {
let general_path = BookmarkStore::general_path();
Self {
general: Arc::new(BookmarkStore::open(&general_path)),
rig_stores: Mutex::new(HashMap::new()),
}
}
/// The shared general bookmark store.
pub fn general(&self) -> &Arc<BookmarkStore> {
&self.general
}
/// Return the per-rig store for `remote`, opening it on first access.
pub fn store_for(&self, remote: &str) -> Arc<BookmarkStore> {
let mut stores = self.rig_stores.lock().unwrap_or_else(|e| e.into_inner());
stores
.entry(remote.to_owned())
.or_insert_with(|| {
let path = BookmarkStore::rig_path(remote);
Arc::new(BookmarkStore::open(&path))
})
.clone()
}
/// Look up a bookmark by id, checking the rig-specific store first,
/// then falling back to the general store.
pub fn get_for_rig(&self, remote: &str, id: &str) -> Option<Bookmark> {
self.store_for(remote)
.get(id)
.or_else(|| self.general.get(id))
}
/// List all bookmarks visible to `remote`: rig-specific bookmarks merged
/// with general bookmarks (rig-specific wins on duplicate IDs).
pub fn list_for_rig(&self, remote: &str) -> Vec<Bookmark> {
let mut map: HashMap<String, Bookmark> = self
.general
.list()
.into_iter()
.map(|bm| (bm.id.clone(), bm))
.collect();
for bm in self.store_for(remote).list() {
map.insert(bm.id.clone(), bm);
}
map.into_values().collect()
}
}
@@ -27,7 +27,7 @@ use trx_core::rig::command::RigCommand;
use trx_core::RigRequest;
use trx_frontend::FrontendRuntimeContext;
use crate::server::bookmarks::BookmarkStore;
use crate::server::bookmarks::BookmarkStoreMap;
// ============================================================================
// Data model
@@ -141,30 +141,91 @@ impl SchedulerStore {
}
}
pub fn default_path() -> PathBuf {
/// Per-rig path: `~/.config/trx-rs/scheduler.{remote}.db`.
pub fn default_path_for(remote: &str) -> PathBuf {
dirs::config_dir()
.map(|p| p.join("trx-rs").join(format!("scheduler.{remote}.db")))
.unwrap_or_else(|| PathBuf::from(format!("scheduler.{remote}.db")))
}
/// Legacy (pre-per-rig) path.
pub fn legacy_path() -> PathBuf {
dirs::config_dir()
.map(|p| p.join("trx-rs").join("scheduler.db"))
.unwrap_or_else(|| PathBuf::from("scheduler.db"))
}
pub fn get(&self, remote: &str) -> Option<SchedulerConfig> {
pub fn get_config(&self) -> Option<SchedulerConfig> {
let db = self.db.read().unwrap_or_else(|e| e.into_inner());
db.get::<SchedulerConfig>(&format!("sch:{remote}"))
db.get::<SchedulerConfig>("config")
}
pub fn upsert(&self, config: &SchedulerConfig) -> bool {
pub fn upsert_config(&self, config: &SchedulerConfig) -> bool {
let mut db = self.db.write().unwrap_or_else(|e| e.into_inner());
db.set(&format!("sch:{}", config.remote), config).is_ok()
db.set("config", config).is_ok()
}
pub fn remove(&self, remote: &str) -> bool {
pub fn remove_config(&self) -> bool {
let mut db = self.db.write().unwrap_or_else(|e| e.into_inner());
db.rem(&format!("sch:{remote}")).unwrap_or(false)
db.rem("config").unwrap_or(false)
}
}
/// Manages per-rig scheduler stores, lazily opening them on first access.
pub struct SchedulerStoreMap {
stores: std::sync::Mutex<HashMap<String, Arc<SchedulerStore>>>,
}
impl SchedulerStoreMap {
/// Create a new map and run one-time migration from the legacy shared
/// `scheduler.db` if per-rig files do not yet exist.
pub fn new(rig_ids: &[&str]) -> Self {
let map = Self {
stores: std::sync::Mutex::new(HashMap::new()),
};
map.migrate_legacy(rig_ids);
map
}
/// Return the store for `remote`, opening it on first access.
pub fn store_for(&self, remote: &str) -> Arc<SchedulerStore> {
let mut stores = self.stores.lock().unwrap_or_else(|e| e.into_inner());
stores
.entry(remote.to_owned())
.or_insert_with(|| {
let path = SchedulerStore::default_path_for(remote);
Arc::new(SchedulerStore::open(&path))
})
.clone()
}
/// List configs from all known per-rig stores.
pub fn list_all(&self) -> Vec<SchedulerConfig> {
let db = self.db.read().unwrap_or_else(|e| e.into_inner());
db.iter()
let stores = self.stores.lock().unwrap_or_else(|e| e.into_inner());
stores
.values()
.filter_map(|s| s.get_config())
.collect()
}
/// One-time migration: extract `sch:{remote}` entries from legacy
/// `scheduler.db` into per-rig files.
fn migrate_legacy(&self, rig_ids: &[&str]) {
let legacy = SchedulerStore::legacy_path();
if !legacy.exists() || rig_ids.is_empty() {
return;
}
let any_exists = rig_ids
.iter()
.any(|id| SchedulerStore::default_path_for(id).exists());
if any_exists {
return;
}
info!("migrating legacy scheduler.db to per-rig files");
let legacy_store = SchedulerStore::open(&legacy);
let db = legacy_store.db.read().unwrap_or_else(|e| e.into_inner());
let configs: Vec<SchedulerConfig> = db
.iter()
.filter_map(|kv| {
if kv.get_key().starts_with("sch:") {
kv.get_value::<SchedulerConfig>()
@@ -172,7 +233,16 @@ impl SchedulerStore {
None
}
})
.collect()
.collect();
drop(db);
for config in &configs {
let store = self.store_for(&config.remote);
store.upsert_config(config);
info!(" migrated scheduler config for '{}'", config.remote);
}
let mut migrated = legacy.clone();
migrated.set_extension("db.migrated");
let _ = std::fs::rename(&legacy, &migrated);
}
}
@@ -424,19 +494,19 @@ async fn apply_scheduler_target(
rig_tx: &mpsc::Sender<RigRequest>,
remote: &str,
status_map: &SchedulerStatusMap,
bookmarks: &BookmarkStore,
bookmarks: &BookmarkStoreMap,
entry_id: Option<&str>,
bookmark_id: &str,
center_hz: Option<u64>,
extra_bm_ids: &[String],
) -> Result<SchedulerStatus, String> {
let bookmark = bookmarks
.get(bookmark_id)
.get_for_rig(remote, bookmark_id)
.ok_or_else(|| format!("bookmark '{bookmark_id}' not found for remote '{remote}'"))?;
let extra_bookmarks: Vec<_> = extra_bm_ids
.iter()
.filter_map(|id| bookmarks.get(id))
.filter_map(|id| bookmarks.get_for_rig(remote, id))
.collect();
if let Some(chz) = center_hz {
@@ -566,8 +636,8 @@ pub type SharedSchedulerControlManager = Arc<SchedulerControlManager>;
pub fn spawn_scheduler_task(
_context: Arc<FrontendRuntimeContext>,
rig_tx: mpsc::Sender<RigRequest>,
store: Arc<SchedulerStore>,
bookmarks: Arc<BookmarkStore>,
store: Arc<SchedulerStoreMap>,
bookmarks: Arc<BookmarkStoreMap>,
status_map: SchedulerStatusMap,
control: SharedSchedulerControlManager,
) {
@@ -637,7 +707,7 @@ pub fn spawn_scheduler_task(
continue;
}
let Some(bm) = bookmarks.get(&bm_id) else {
let Some(bm) = bookmarks.get_for_rig(&config.remote, &bm_id) else {
warn!(
"scheduler: bookmark '{}' not found for remote '{}'",
bm_id, config.remote
@@ -734,7 +804,7 @@ async fn apply_last_scheduler_cycle(
rig_tx: &mpsc::Sender<RigRequest>,
remote: &str,
status_map: &SchedulerStatusMap,
bookmarks: &BookmarkStore,
bookmarks: &BookmarkStoreMap,
) {
let status = {
let Ok(map) = status_map.read() else {
@@ -749,7 +819,7 @@ async fn apply_last_scheduler_cycle(
let Some(bookmark_id) = status.last_bookmark_id else {
return;
};
if bookmarks.get(&bookmark_id).is_none() {
if bookmarks.get_for_rig(remote, &bookmark_id).is_none() {
warn!(
"scheduler: last bookmark '{}' not found for remote '{}'",
bookmark_id, remote
@@ -801,10 +871,10 @@ async fn scheduler_send(
#[get("/scheduler/{remote}")]
pub async fn get_scheduler(
path: web::Path<String>,
store: web::Data<Arc<SchedulerStore>>,
store_map: web::Data<Arc<SchedulerStoreMap>>,
) -> impl Responder {
let remote = path.into_inner();
let config = store.get(&remote).unwrap_or(SchedulerConfig {
let config = store_map.store_for(&remote).get_config().unwrap_or(SchedulerConfig {
remote: remote.clone(),
mode: SchedulerMode::Disabled,
grayline: None,
@@ -819,12 +889,12 @@ pub async fn get_scheduler(
pub async fn put_scheduler(
path: web::Path<String>,
body: web::Json<SchedulerConfig>,
store: web::Data<Arc<SchedulerStore>>,
store_map: web::Data<Arc<SchedulerStoreMap>>,
) -> impl Responder {
let remote = path.into_inner();
let mut config = body.into_inner();
config.remote = remote;
if store.upsert(&config) {
config.remote = remote.clone();
if store_map.store_for(&remote).upsert_config(&config) {
HttpResponse::Ok().json(config)
} else {
HttpResponse::InternalServerError().body("failed to save scheduler config")
@@ -835,10 +905,10 @@ pub async fn put_scheduler(
#[delete("/scheduler/{remote}")]
pub async fn delete_scheduler(
path: web::Path<String>,
store: web::Data<Arc<SchedulerStore>>,
store_map: web::Data<Arc<SchedulerStoreMap>>,
) -> impl Responder {
let remote = path.into_inner();
store.remove(&remote);
store_map.store_for(&remote).remove_config();
HttpResponse::Ok().json(serde_json::json!({ "deleted": true }))
}
@@ -864,13 +934,13 @@ pub struct SchedulerActivateEntryRequest {
pub async fn put_scheduler_activate_entry(
path: web::Path<String>,
body: web::Json<SchedulerActivateEntryRequest>,
store: web::Data<Arc<SchedulerStore>>,
store_map: web::Data<Arc<SchedulerStoreMap>>,
status_map: web::Data<SchedulerStatusMap>,
bookmarks: web::Data<Arc<BookmarkStore>>,
bookmarks: web::Data<Arc<BookmarkStoreMap>>,
rig_tx: web::Data<mpsc::Sender<RigRequest>>,
) -> impl Responder {
let rig_id = path.into_inner();
let Some(config) = store.get(&rig_id) else {
let Some(config) = store_map.store_for(&rig_id).get_config() else {
return HttpResponse::NotFound().body("scheduler config not found");
};
if config.mode != SchedulerMode::TimeSpan {
@@ -929,7 +999,7 @@ pub async fn put_scheduler_control(
control: web::Data<SharedSchedulerControlManager>,
rig_tx: web::Data<mpsc::Sender<RigRequest>>,
status_map: web::Data<SchedulerStatusMap>,
bookmarks: web::Data<Arc<BookmarkStore>>,
bookmarks: web::Data<Arc<BookmarkStoreMap>>,
) -> impl Responder {
let body = body.into_inner();
let summary = control.set_released(body.session_id, body.released);
@@ -40,7 +40,7 @@ use trx_frontend::{FrontendRuntimeContext, FrontendSpawner};
use auth::{AuthConfig, AuthState, SameSite};
use background_decode::{BackgroundDecodeManager, BackgroundDecodeStore};
use scheduler::{SchedulerControlManager, SchedulerStatusMap, SchedulerStore};
use scheduler::{SchedulerControlManager, SchedulerStatusMap, SchedulerStoreMap};
use vchan::ClientChannelManager;
/// HTTP frontend implementation.
@@ -71,10 +71,17 @@ async fn serve(
) -> Result<(), actix_web::Error> {
audio::start_decode_history_collector(context.clone());
let scheduler_path = SchedulerStore::default_path();
let scheduler_store = Arc::new(SchedulerStore::open(&scheduler_path));
let bookmark_path = bookmarks::BookmarkStore::default_path();
let bookmark_store = Arc::new(bookmarks::BookmarkStore::open(&bookmark_path));
// Collect rig IDs for per-rig store initialisation / migration.
let rig_ids: Vec<String> = context
.remote_rigs
.lock()
.unwrap_or_else(|e| e.into_inner())
.iter()
.map(|r| r.rig_id.clone())
.collect();
let rig_id_refs: Vec<&str> = rig_ids.iter().map(String::as_str).collect();
let scheduler_store = Arc::new(SchedulerStoreMap::new(&rig_id_refs));
let bookmark_store_map = Arc::new(bookmarks::BookmarkStoreMap::new());
let scheduler_status: SchedulerStatusMap = Arc::new(RwLock::new(HashMap::new()));
let scheduler_control = Arc::new(SchedulerControlManager::default());
@@ -82,7 +89,7 @@ async fn serve(
context.clone(),
rig_tx.clone(),
scheduler_store.clone(),
bookmark_store.clone(),
bookmark_store_map.clone(),
scheduler_status.clone(),
scheduler_control.clone(),
);
@@ -96,7 +103,7 @@ async fn serve(
let session_rig_mgr = Arc::new(api::SessionRigManager::default());
let background_decode_mgr = BackgroundDecodeManager::new(
background_decode_store,
bookmark_store.clone(),
bookmark_store_map.clone(),
context.clone(),
scheduler_status.clone(),
scheduler_control.clone(),
@@ -136,7 +143,7 @@ async fn serve(
rig_tx,
callsign,
context,
bookmark_store,
bookmark_store_map,
scheduler_store,
scheduler_status,
scheduler_control,
@@ -162,8 +169,8 @@ fn build_server(
rig_tx: mpsc::Sender<RigRequest>,
_callsign: Option<String>,
context: Arc<FrontendRuntimeContext>,
bookmark_store: Arc<bookmarks::BookmarkStore>,
scheduler_store: Arc<SchedulerStore>,
bookmark_store_map: Arc<bookmarks::BookmarkStoreMap>,
scheduler_store: Arc<SchedulerStoreMap>,
scheduler_status: SchedulerStatusMap,
scheduler_control: Arc<SchedulerControlManager>,
vchan_mgr: Arc<ClientChannelManager>,
@@ -176,7 +183,7 @@ fn build_server(
// scheduler task can observe the connected-client count.
let clients = web::Data::new(context.sse_clients.clone());
let bookmark_store = web::Data::new(bookmark_store);
let bookmark_store = web::Data::new(bookmark_store_map);
let scheduler_store = web::Data::new(scheduler_store);
let scheduler_status = web::Data::new(scheduler_status);