// compare-plc server
// Connects to PLC mirrors server-side for accurate lag measurement.
// Serves index.html and exposes /api/stats.
//
// Lag = mirror_recv - primary_recv, corrected for network distance:
//   true_lag ≈ raw_lag − (mirror_ott − primary_ott)
// OTT (one-way transit time) = TCP_RTT / 2, measured via SYN/SYN-ACK ping.
// TCP ping is used because the remote OS responds immediately at the network
// stack level — no application code runs, so it's pure network latency.
// DNS is pre-resolved so it doesn't contaminate the timing.

import net from "net";
import dns from "dns";
import { readFileSync, writeFileSync, readdirSync, unlinkSync, mkdirSync } from "fs";

const PRIMARY       = "wss://plc.directory";
const WINDOW_MS     = 15 * 60 * 1000; // rolling coverage window for live table
const LAG_KEEP      = 10_000;         // max lag samples per mirror
const RTT_KEEP      = 30;             // RTT samples to average over
const PING_MS       = 5_000;          // TCP ping interval
const SNAP_INTERVAL = 5 * 60 * 1000;  // snapshot every 5 min
const SNAP_KEEP     = 72;             // 72 × 5 min = 6 hours of history
const DATA_DIR      = "./data";
const PORT          = 7331;

// ── types ─────────────────────────────────────────────────────────────────────

/** Per-op bookkeeping, keyed by cid in `tracker`. */
interface TrackerEntry {
  primaryRecvMs: number | null;
  mirrorRecv: Map<string, number>; // url -> server recv timestamp
  firstSeen: number;
  did?: string;
}

/** One mirror's figures for a single snapshot interval. */
interface SnapMirror {
  coverage: number | null;
  missed: number;
  ops: number;                          // ops received by mirror in this interval
  primaryOps: number;                   // ops received by primary in this interval
  missedCids: Record<string, string[]>; // did -> [cid, ...] primary got, mirror didn't (still missing)
  extraCids: Record<string, string[]>;  // did -> [cid, ...] mirror got, primary didn't
  lateCids: Record<string, string[]>;   // did -> [cid, ...] arrived at mirror after snapshot
}

/** A persisted per-interval record; stored on disk as `<ts>.json`. */
interface Snapshot {
  ts: number;                          // unix ms when taken
  mirrors: Record<string, SnapMirror>; // non-primary mirrors only
}

/** Live state for one upstream (the primary is tracked as a mirror too). */
interface MirrorState {
  name: string;
  url: string;
  connected: boolean;
  totalEvents: number;    // persisted lifetime total
  lastSeq: number | null; // last seen seq, used for gapless reconnect
  lagSamples: number[];   // TCP-OTT-corrected lag in ms
  rttSamples: number[];   // TCP SYN/SYN-ACK RTT samples in ms
  ws: WebSocket | null;
}

/** On-disk shape of `totals.json`. */
interface SavedTotals {
  totals: Record<string, number>; // url -> lifetime totalEvents
  seqs: Record<string, number>;   // url -> last seen seq
}

// ── state ─────────────────────────────────────────────────────────────────────

const tracker     = new Map<string, TrackerEntry>();
const mirrors     = new Map<string, MirrorState>();
const snapshots: Snapshot[] = [];
const savedTotals = new Map<string, number>(); // populated by loadData()
const savedSeqs   = new Map<string, number>(); // populated by loadData()

// ── persistence ───────────────────────────────────────────────────────────────

/**
 * Creates DATA_DIR if needed, then restores persisted totals/seqs and snapshot
 * history into the module-level state. Missing or corrupt files are skipped so
 * a fresh or damaged data directory never prevents startup.
 */
function loadData(): void {
  mkdirSync(DATA_DIR, { recursive: true });

  // load totals
  try {
    const raw = readFileSync(`${DATA_DIR}/totals.json`, "utf-8");
    const data = JSON.parse(raw) as Partial<SavedTotals> | null;
    // Only numeric values are accepted: these feed arithmetic and the
    // `?after=` reconnect cursor.
    for (const [url, n] of Object.entries(data?.totals ?? {})) {
      if (typeof n === "number") savedTotals.set(url, n);
    }
    for (const [url, n] of Object.entries(data?.seqs ?? {})) {
      if (typeof n === "number") savedSeqs.set(url, n);
    }
  } catch { /* no totals yet */ }

  // load snapshot files — named <ts>.json, sorted oldest-first
  try {
    const files = readdirSync(DATA_DIR)
      .filter(f => /^\d+\.json$/.test(f))
      .sort((a, b) => Number(a.slice(0, -5)) - Number(b.slice(0, -5)));
    for (const f of files) {
      try {
        const snap = JSON.parse(readFileSync(`${DATA_DIR}/${f}`, "utf-8")) as Partial<Snapshot> | null;
        // Shallow shape check: valid JSON of the wrong shape (e.g. `null` or a
        // truncated object) would otherwise break every later stats request.
        if (typeof snap?.ts === "number" && typeof snap.mirrors === "object" && snap.mirrors !== null) {
          snapshots.push(snap as Snapshot);
        }
      } catch { /* skip corrupt file */ }
    }
    // trim to SNAP_KEEP most recent (shouldn't normally be needed)
    while (snapshots.length > SNAP_KEEP) snapshots.shift();
  } catch { /* data dir unreadable */ }

  console.log(`loaded ${snapshots.length} snapshots and ${savedTotals.size} totals from ${DATA_DIR}/`);
}

/** Writes every mirror's lifetime event count and last seen seq to `totals.json`. */
function saveTotals(): void {
  const totals: Record<string, number> = {};
  const seqs: Record<string, number> = {};
  for (const [url, m] of mirrors) {
    totals[url] = m.totalEvents;
    if (m.lastSeq !== null) seqs[url] = m.lastSeq;
  }
  try {
    writeFileSync(`${DATA_DIR}/totals.json`, JSON.stringify({ totals, seqs }));
  } catch (e) {
    console.error("failed to save totals:", e);
  }
}

// ── RTT helpers ───────────────────────────────────────────────────────────────

/** Mean of the stored RTT samples rounded to 0.1 ms, or null when there are none. */
function meanRtt(m: MirrorState): number | null {
  if (!m.rttSamples.length) return null;
  const mean = m.rttSamples.reduce((a, b) => a + b, 0) / m.rttSamples.length;
  return Math.round(mean * 10) / 10;
}

/** One-way transit time estimate (mean RTT / 2); 0 until an RTT has been measured. */
function ott(m: MirrorState): number {
  const rtt = meanRtt(m);
  return rtt != null ? rtt / 2 : 0;
}

// ── mirror management ─────────────────────────────────────────────────────────

/**
 * Registers an upstream (no-op if the url is already known), seeds it from the
 * persisted totals/seqs, then opens its stream and takes a first RTT sample.
 */
function addMirror(url: string, name: string): void {
  if (mirrors.has(url)) return;
  mirrors.set(url, {
    name,
    url,
    connected: false,
    totalEvents: savedTotals.get(url) ?? 0,
    lastSeq: savedSeqs.get(url) ?? null,
    lagSamples: [],
    rttSamples: [],
    ws: null,
  });
  connect(url);
  void measureRtt(url);
}

/**
 * Opens the export stream WebSocket for `url`, resuming after the last seen
 * seq when one is known. Does nothing if the mirror is unknown or already has
 * a socket. Reconnects 5 s after a close, or 10 s after a failed construction.
 */
function connect(url: string): void {
  const m = mirrors.get(url);
  if (!m || m.ws) return;

  let ws: WebSocket;
  try {
    const streamUrl = m.lastSeq !== null
      ? `${url}/export/stream?after=${m.lastSeq}`
      : `${url}/export/stream`;
    ws = new WebSocket(streamUrl);
  } catch (e) {
    console.error(`[${url}] WebSocket create failed:`, e);
    setTimeout(() => connect(url), 10_000);
    return;
  }

  m.ws = ws;

  ws.addEventListener("open", () => {
    m.connected = true;
    console.log(`[${m.name}] connected`);
  });

  ws.addEventListener("error", () => {
    // close event fires after and handles reconnect
  });

  ws.addEventListener("close", () => {
    m.connected = false;
    m.ws = null;
    console.log(`[${m.name}] disconnected — retry in 5s`);
    setTimeout(() => connect(url), 5_000);
  });

  ws.addEventListener("message", ({ data }: MessageEvent) => handleMessage(m, data));
}

/**
 * Processes one stream frame received from mirror `m`: counts it, advances
 * `lastSeq`, records the receive time in `tracker`, and pushes a lag sample
 * once both the primary and a mirror have seen the same cid.
 *
 * Frames that are not JSON objects, or that carry no string `cid`, are ignored.
 * Late mirror arrivals are forwarded to `retroactivelyMarkReceived`, which is
 * defined further down in this file.
 */
function handleMessage(m: MirrorState, data: unknown): void {
  let parsed: unknown;
  try {
    parsed = JSON.parse(typeof data === "string" ? data : Buffer.from(data as ArrayBuffer).toString());
  } catch {
    return;
  }
  // JSON.parse can legitimately yield null or a primitive; reading a property
  // off those would throw inside the event listener.
  if (typeof parsed !== "object" || parsed === null) return;
  const ev = parsed as Record<string, unknown>;

  const cid = typeof ev.cid === "string" ? ev.cid : undefined;
  const did = typeof ev.did === "string" ? ev.did : undefined;
  const seq = typeof ev.seq === "number" ? ev.seq : undefined;
  if (!cid) return;

  m.totalEvents++;
  if (seq !== undefined && (m.lastSeq === null || seq > m.lastSeq)) m.lastSeq = seq;
  const now = Date.now();

  let entry = tracker.get(cid);
  if (!entry) {
    entry = { primaryRecvMs: null, mirrorRecv: new Map(), firstSeen: now, did };
    tracker.set(cid, entry);
  }

  if (m.url === PRIMARY) {
    if (entry.primaryRecvMs === null) {
      entry.primaryRecvMs = now;
      // retroactively score mirrors that already had this op
      for (const [mu, mt] of entry.mirrorRecv) {
        const mm = mirrors.get(mu);
        if (mm) pushLag(mm, mt - now);
      }
    }
    return;
  }

  // Only the first delivery per mirror counts; replays after a reconnect are ignored.
  if (entry.mirrorRecv.has(m.url)) return;
  entry.mirrorRecv.set(m.url, now);
  if (entry.primaryRecvMs !== null) {
    pushLag(m, now - entry.primaryRecvMs);
    retroactivelyMarkReceived(m.url, cid, entry.did ?? "?", entry.primaryRecvMs);
  }
}

// ── RTT measurement (TCP ping) ────────────────────────────────────────────────

const resolvedIPs = new Map<string, string>();

/** Resolves `hostname` to an address, caching successes; returns null on lookup failure. */
async function resolveHost(hostname: string): Promise<string | null> {
  const cached = resolvedIPs.get(hostname);
  if (cached !== undefined) return cached;
  try {
    const { address } = await dns.promises.lookup(hostname);
    resolvedIPs.set(hostname, address);
    return address;
  } catch {
    return null;
  }
}

/**
 * Times a TCP connect to `hostname:port`.
 *
 * @returns elapsed ms from connect() to the connect callback, or null on DNS
 *          failure, socket error, or timeout.
 */
async function tcpPing(hostname: string, port = 443, timeoutMs = 3_000): Promise<number | null> {
  const ip = await resolveHost(hostname);
  if (!ip) return null;
  return new Promise<number | null>(resolve => {
    const sock = new net.Socket();
    const finish = (rtt: number | null): void => {
      sock.destroy();
      // A failed ping may mean the host moved; forget the cached address so the
      // next ping re-resolves instead of failing against a stale IP forever.
      if (rtt === null) resolvedIPs.delete(hostname);
      resolve(rtt);
    };
    sock.setTimeout(timeoutMs);
    const t0 = performance.now();
    sock.connect(port, ip, () => finish(performance.now() - t0));
    sock.on("error", () => finish(null));
    sock.on("timeout", () => finish(null));
  });
}

/** Takes one TCP ping sample for the mirror at `url`, keeping the newest RTT_KEEP samples. */
async function measureRtt(url: string): Promise<void> {
  const m = mirrors.get(url);
  if (!m) return;
  let hostname: string;
  try {
    hostname = new URL(url).hostname;
  } catch {
    return;
  }
  const rtt = await tcpPing(hostname);
  if (rtt != null) {
    m.rttSamples.push(rtt);
    if (m.rttSamples.length > RTT_KEEP) m.rttSamples.shift();
  }
}

/**
 * Records a lag sample for `m` after removing the difference in one-way
 * transit time between the mirror and the primary. The stored value is rounded
 * to whole ms and only the newest LAG_KEEP samples are kept.
 */
function pushLag(m: MirrorState, rawLag: number): void {
  // If the primary is not registered, treat its OTT as 0 rather than crashing.
  const primary = mirrors.get(PRIMARY);
  const primaryOtt = primary ? ott(primary) : 0;
  const corrected = rawLag - (ott(m) - primaryOtt);
  m.lagSamples.push(Math.round(corrected));
  if (m.lagSamples.length > LAG_KEEP) m.lagSamples.shift();
}

// When a mirror delivers an op late (after a snapshot was already taken), fix
// the snapshot rather than leaving a permanent false-positive "missed" entry.
/**
 * Reclassifies `cid` from "missed" to "late" for mirror `url` in every stored
 * snapshot whose interval (snap.ts - SNAP_INTERVAL .. snap.ts) contains
 * `primaryRecvMs`, recomputes that snapshot's counters, and rewrites its file.
 *
 * @param url           mirror that has now delivered the op
 * @param cid           op identifier
 * @param did           DID the op belongs to ("?" when unknown)
 * @param primaryRecvMs when the primary delivered the op (unix ms)
 */
function retroactivelyMarkReceived(url: string, cid: string, did: string, primaryRecvMs: number): void {
  for (const snap of snapshots) {
    const windowStart = snap.ts - SNAP_INTERVAL;
    if (primaryRecvMs < windowStart || primaryRecvMs > snap.ts) continue;

    const sm = snap.mirrors[url];
    if (!sm) continue;

    // Snapshots come from disk and may predate a field, so `missedCids` is
    // guarded just as `lateCids` is below.
    const cidList = sm.missedCids?.[did];
    if (!cidList) continue;
    const idx = cidList.indexOf(cid);
    if (idx === -1) continue;

    cidList.splice(idx, 1);
    if (cidList.length === 0) delete sm.missedCids[did];

    if (!sm.lateCids) sm.lateCids = {};
    if (!sm.lateCids[did]) sm.lateCids[did] = [];
    sm.lateCids[did].push(cid);

    sm.ops++;
    sm.missed = Math.max(0, sm.missed - 1);
    sm.coverage = sm.primaryOps > 0 ? sm.ops / sm.primaryOps : null;

    try {
      writeFileSync(`${DATA_DIR}/${snap.ts}.json`, JSON.stringify(snap));
    } catch (e) {
      console.error("failed to retroactively update snapshot:", e);
    }
  }
}

// ── tracker pruning ───────────────────────────────────────────────────────────

/** Drops tracker entries first seen more than WINDOW_MS ago. */
function pruneTracker(): void {
  const cut = Date.now() - WINDOW_MS;
  for (const [cid, e] of tracker) {
    if (e.firstSeen < cut) tracker.delete(cid);
  }
}

// ── stats computation ─────────────────────────────────────────────────────────

/**
 * Nearest-rank percentile of an ascending-sorted array.
 *
 * @param sorted values sorted ascending
 * @param p      percentile in the range 0..100
 * @returns the selected element, or 0 for an empty array
 */
function pct(sorted: number[], p: number): number {
  if (!sorted.length) return 0;
  const i = Math.min(Math.ceil(p / 100 * sorted.length) - 1, sorted.length - 1);
  return sorted[Math.max(0, i)];
}

// Coverage stats aggregated from all stored snapshots.
function computeStats(): Record<string, unknown> {
  let primaryOps = 0;
  const mirrorOps    = new Map<string, number>();
  const mirrorMissed = new Map<string, number>();

  for (const snap of snapshots) {
    const entries = Object.entries(snap.mirrors);
    const first = entries[0];
    if (!first) continue;
    // primaryOps is the same across all mirror entries in a snapshot
    primaryOps += first[1].primaryOps;
    for (const [url, sm] of entries) {
      mirrorOps.set(url, (mirrorOps.get(url) ?? 0) + sm.ops);
      mirrorMissed.set(url, (mirrorMissed.get(url) ?? 0) + sm.missed);
    }
  }

  // Resolve the primary's OTT once; fall back to 0 if it is not registered so
  // a stats request can never throw on a missing entry.
  const primary = mirrors.get(PRIMARY);
  const primaryOtt = primary ? ott(primary) : 0;

  const out: Record<string, unknown> = {};
  for (const [url, m] of mirrors) {
    const isPrimary   = url === PRIMARY;
    const opsInWindow = isPrimary ? primaryOps : (mirrorOps.get(url) ?? 0);
    const missed      = isPrimary ? 0 : (mirrorMissed.get(url) ?? 0);
    const coverage    = (!isPrimary && primaryOps > 0) ? opsInWindow / primaryOps : null;
    const rtt         = meanRtt(m);

    let lagStats: unknown = null;
    if (!isPrimary && m.lagSamples.length > 0) {
      const sorted = [...m.lagSamples].sort((a, b) => a - b);
      const sum = sorted.reduce((a, b) => a + b, 0);
      lagStats = {
        p50:  pct(sorted, 50),
        p95:  pct(sorted, 95),
        p99:  pct(sorted, 99),
        mean: Math.round(sum / sorted.length),
        min:  sorted[0],
        max:  sorted[sorted.length - 1],
        n:    sorted.length,
      };
    }

    out[url] = {
      name: m.name,
      url,
      isPrimary,
      connected: m.connected,
      // NOTE(review): reports the snapshot-window count, not the persisted
      // lifetime m.totalEvents — confirm this is what the frontend expects.
      totalEvents: opsInWindow,
      opsInWindow,
      primaryOps,
      missed,
      coverage,
      lagStats,
      rttMs: rtt != null ? Math.round(rtt) : null,
      correctionMs: isPrimary ? null : Math.round(ott(m) - primaryOtt),
    };
  }
  return out;
}

// Per-interval stats for snapshots — only events where primary received them
// within [now - SNAP_INTERVAL, now]. No overlap between adjacent snapshots.
function computeIntervalStats(): Record<string, SnapMirror> {
  const cut = Date.now() - SNAP_INTERVAL;
  let primaryOps = 0;

  // per-mirror accumulators
  const mirrorOps    = new Map<string, number>();
  const mirrorMissed = new Map<string, Record<string, string[]>>(); // url -> did -> [cid]
  const mirrorExtra  = new Map<string, Record<string, string[]>>(); // url -> did -> [cid]

  for (const [url] of mirrors) {
    if (url === PRIMARY) continue;
    mirrorOps.set(url, 0);
    mirrorMissed.set(url, {});
    mirrorExtra.set(url, {});
  }

  for (const [cid, e] of tracker) {
    if (e.primaryRecvMs !== null) {
      // op primary received — check which mirrors got it
      if (e.primaryRecvMs < cut) continue;
      primaryOps++;
      for (const [url] of mirrors) {
        if (url === PRIMARY) continue;
        if (e.mirrorRecv.has(url)) {
          mirrorOps.set(url, (mirrorOps.get(url) ?? 0) + 1);
        } else {
          const did = e.did ?? "?";
          const mm = mirrorMissed.get(url)!;
          if (!mm[did]) mm[did] = [];
          mm[did].push(cid);
        }
      }
    } else {
      // op primary never received — check if any mirror got it this interval
      for (const [url, recvMs] of e.mirrorRecv) {
        if (url === PRIMARY || recvMs < cut) continue;
        const mx = mirrorExtra.get(url);
        if (!mx) continue;
        const did = e.did ?? "?";
        if (!mx[did]) mx[did] = [];
        mx[did].push(cid);
      }
    }
  }

  const out: Record<string, SnapMirror> = {};
  for (const [url] of mirrors) {
    if (url === PRIMARY) continue;
    const ops = mirrorOps.get(url) ?? 0;
    out[url] = {
      coverage: primaryOps > 0 ? ops / primaryOps : null,
      missed: Math.max(0, primaryOps - ops),
      ops,
      primaryOps,
      missedCids: mirrorMissed.get(url) ?? {},
      extraCids: mirrorExtra.get(url) ?? {},
      lateCids: {},
    };
  }
  return out;
}

// ── snapshots ─────────────────────────────────────────────────────────────────

/**
 * Captures the current interval stats as a snapshot, writes it to
 * `<ts>.json`, evicts the oldest snapshot (and its file) once more than
 * SNAP_KEEP are held, and persists the lifetime totals.
 */
function takeSnapshot(): void {
  const snap: Snapshot = { ts: Date.now(), mirrors: computeIntervalStats() };
  snapshots.push(snap);

  try {
    writeFileSync(`${DATA_DIR}/${snap.ts}.json`, JSON.stringify(snap));
  } catch (e) {
    console.error("failed to write snapshot:", e);
  }

  // prune oldest snapshot file when over limit
  if (snapshots.length > SNAP_KEEP) {
    const oldest = snapshots.shift()!;
    try {
      unlinkSync(`${DATA_DIR}/${oldest.ts}.json`);
    } catch { /* already gone */ }
  }

  saveTotals();
}

// ── HTTP server ────────────────────────────────────────────────────────────────

const CORS_HEADERS = {
  "Access-Control-Allow-Origin": "*",
  "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
  "Access-Control-Allow-Headers": "Content-Type",
};

Bun.serve({
  port: PORT,
  async fetch(req: Request): Promise<Response> {
    const { pathname } = new URL(req.url);

    // CORS preflight
    if (req.method === "OPTIONS") {
      return new Response(null, { status: 204, headers: CORS_HEADERS });
    }

    if (pathname === "/api/stats") {
      return Response.json({ mirrors: computeStats(), snapshots }, { headers: CORS_HEADERS });
    }

    if (pathname === "/" || pathname === "/index.html") {
      return new Response(Bun.file("index.html"), {
        headers: { "Content-Type": "text/html; charset=utf-8" },
      });
    }

    return new Response("not found", { status: 404 });
  },
});

// ── boot ──────────────────────────────────────────────────────────────────────

loadData();

addMirror(PRIMARY, "plc.directory");
addMirror("wss://plc.klbr.net", "plc.klbr.net");

setInterval(pruneTracker, 30_000);
setInterval(() => {
  // measureRtt handles its own failures; `void` marks the fire-and-forget.
  for (const url of mirrors.keys()) void measureRtt(url);
}, PING_MS);
setInterval(takeSnapshot, SNAP_INTERVAL);

console.log(`compare-plc → http://localhost:${PORT}`);