compares plc.directory with other mirrors
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 493 lines 18 kB view raw
// compare-plc server
// Connects to PLC mirrors server-side for accurate lag measurement.
// Serves index.html and exposes /api/stats.
//
// Lag = mirror_recv - primary_recv, corrected for network distance:
//   true_lag ≈ raw_lag − (mirror_ott − primary_ott)
// OTT (one-way transit time) = TCP_RTT / 2, measured via SYN/SYN-ACK ping.
// TCP ping is used because the remote OS responds immediately at the network
// stack level — no application code runs, so it's pure network latency.
// DNS is pre-resolved so it doesn't contaminate the timing.

import net from "net";
import dns from "dns";
import { readFileSync, writeFileSync, readdirSync, unlinkSync, mkdirSync } from "fs";

const PRIMARY = "wss://plc.directory";
const WINDOW_MS = 15 * 60 * 1000; // rolling coverage window for live table
const LAG_KEEP = 10_000; // max lag samples per mirror
const RTT_KEEP = 30; // RTT samples to average over
const PING_MS = 5_000; // TCP ping interval
const SNAP_INTERVAL = 5 * 60 * 1000; // snapshot every 5 min
const SNAP_KEEP = 72; // 72 × 5 min = 6 hours of history
const DATA_DIR = "./data";
const PORT = 7331;

// ── types ─────────────────────────────────────────────────────────────────────

/** Per-op bookkeeping, keyed by cid in `tracker`. */
interface TrackerEntry {
  primaryRecvMs: number | null; // when the primary delivered this op; null until it does
  mirrorRecv: Map<string, number>; // url -> server recv timestamp
  firstSeen: number; // when any stream first delivered this op
  did?: string;
}

/** Per-mirror figures stored inside one snapshot. */
interface SnapMirror {
  coverage: number | null;
  missed: number;
  ops: number; // ops received by mirror in this interval
  primaryOps: number; // ops received by primary in this interval
  missedCids: Record<string, string[]>; // did -> [cid, ...] primary got, mirror didn't (still missing)
  extraCids: Record<string, string[]>; // did -> [cid, ...] mirror got, primary didn't
  lateCids: Record<string, string[]>; // did -> [cid, ...] arrived at mirror after snapshot
}

interface Snapshot {
  ts: number; // unix ms when taken
  mirrors: Record<string, SnapMirror>; // non-primary mirrors only
}

interface MirrorState {
  name: string;
  url: string;
  connected: boolean;
  totalEvents: number; // persisted lifetime total
  lastSeq: number | null; // last seen seq, used for gapless reconnect
  lagSamples: number[]; // TCP-OTT-corrected lag in ms
  rttSamples: number[]; // TCP SYN/SYN-ACK RTT samples in ms
  ws: WebSocket | null;
}

interface SavedTotals {
  totals: Record<string, number>; // url -> lifetime totalEvents
  seqs: Record<string, number>; // url -> last seen seq
}

/** The fields of a stream message this server reads, after validation. */
interface PlcEvent {
  cid: string;
  did: string | undefined;
  seq: number | undefined;
}

// ── state ─────────────────────────────────────────────────────────────────────

const tracker = new Map<string, TrackerEntry>();
const mirrors = new Map<string, MirrorState>();
const snapshots: Snapshot[] = [];
const savedTotals = new Map<string, number>(); // populated by loadData()
const savedSeqs = new Map<string, number>(); // populated by loadData()

// ── persistence ───────────────────────────────────────────────────────────────

/**
 * Minimal shape check for a snapshot file read back from disk: a numeric `ts`
 * and a non-array object `mirrors`. Anything else (for example a file holding
 * `null`) is rejected so it cannot reach code that dereferences `snap.mirrors`.
 */
function isSnapshot(value: unknown): value is Snapshot {
  if (typeof value !== "object" || value === null) return false;
  const { ts, mirrors: perMirror } = value as { ts?: unknown; mirrors?: unknown };
  return typeof ts === "number"
    && typeof perMirror === "object" && perMirror !== null && !Array.isArray(perMirror);
}

/** Copies only finite numeric values from a parsed JSON record into `into`. */
function loadNumberMap(from: unknown, into: Map<string, number>): void {
  if (typeof from !== "object" || from === null) return;
  for (const [key, n] of Object.entries(from)) {
    if (typeof n === "number" && Number.isFinite(n)) into.set(key, n);
  }
}

/**
 * Creates DATA_DIR if needed, then restores `savedTotals`/`savedSeqs` from
 * totals.json and `snapshots` from the `<ts>.json` files. Missing, unreadable,
 * or malformed files are skipped; only the SNAP_KEEP most recent snapshots are
 * kept in memory.
 */
function loadData(): void {
  mkdirSync(DATA_DIR, { recursive: true });

  // load totals
  try {
    const raw = readFileSync(`${DATA_DIR}/totals.json`, "utf-8");
    const data = JSON.parse(raw) as Partial<SavedTotals> | null;
    loadNumberMap(data?.totals, savedTotals);
    loadNumberMap(data?.seqs, savedSeqs);
  } catch { /* no totals yet */ }

  // load snapshot files — named <ts>.json, sorted oldest-first
  try {
    const files = readdirSync(DATA_DIR)
      .filter(f => /^\d+\.json$/.test(f))
      .sort((a, b) => Number(a.slice(0, -5)) - Number(b.slice(0, -5)));
    for (const f of files) {
      try {
        const parsed: unknown = JSON.parse(readFileSync(`${DATA_DIR}/${f}`, "utf-8"));
        if (isSnapshot(parsed)) snapshots.push(parsed);
        else console.error(`skipping malformed snapshot ${f}`);
      } catch { /* skip corrupt file */ }
    }
    // trim to SNAP_KEEP most recent (shouldn't normally be needed)
    while (snapshots.length > SNAP_KEEP) snapshots.shift();
  } catch { /* data dir unreadable */ }

  console.log(`loaded ${snapshots.length} snapshots and ${savedTotals.size} totals from ${DATA_DIR}/`);
}

/** Writes every mirror's lifetime event total and last seen seq to totals.json. */
function saveTotals(): void {
  const totals: Record<string, number> = {};
  const seqs: Record<string, number> = {};
  for (const [url, m] of mirrors) {
    totals[url] = m.totalEvents;
    if (m.lastSeq !== null) seqs[url] = m.lastSeq;
  }
  try {
    writeFileSync(`${DATA_DIR}/totals.json`, JSON.stringify({ totals, seqs }));
  } catch (e) {
    console.error("failed to save totals:", e);
  }
}

// ── RTT helpers ───────────────────────────────────────────────────────────────

/** Mean of the stored RTT samples rounded to 0.1 ms, or null when there are none. */
function meanRtt(m: MirrorState): number | null {
  if (!m.rttSamples.length) return null;
  const mean = m.rttSamples.reduce((a, b) => a + b, 0) / m.rttSamples.length;
  return Math.round(mean * 10) / 10;
}

/** One-way transit time estimate: half the mean RTT, or 0 when no RTT is known. */
function ott(m: MirrorState): number {
  const rtt = meanRtt(m);
  return rtt != null ? rtt / 2 : 0;
}

// ── mirror management ─────────────────────────────────────────────────────────

/**
 * Registers a mirror (no-op if the url is already known), seeds its counters
 * from the persisted totals, and starts its stream connection and first ping.
 */
function addMirror(url: string, name: string): void {
  if (mirrors.has(url)) return;
  mirrors.set(url, {
    name, url, connected: false,
    totalEvents: savedTotals.get(url) ?? 0,
    lastSeq: savedSeqs.get(url) ?? null,
    lagSamples: [], rttSamples: [], ws: null,
  });
  connect(url);
  void measureRtt(url); // fire-and-forget; measureRtt handles its own failures
}

/**
 * Decodes one stream message into the fields this server uses.
 *
 * Returns null for anything unusable: undecodable data, invalid JSON, JSON that
 * is not an object (e.g. `null`), or a missing/non-string `cid`. `did` and
 * `seq` are dropped (undefined) when they do not have the expected type.
 */
function parseEvent(data: unknown): PlcEvent | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(typeof data === "string" ? data : Buffer.from(data as ArrayBuffer).toString());
  } catch { return null; }
  if (typeof parsed !== "object" || parsed === null) return null;

  const { cid, did, seq } = parsed as Record<string, unknown>;
  if (typeof cid !== "string" || cid === "") return null;
  return {
    cid,
    did: typeof did === "string" ? did : undefined,
    seq: typeof seq === "number" && Number.isFinite(seq) ? seq : undefined,
  };
}

/**
 * Applies one validated event from the stream `url` (whose state is `m`):
 * bumps counters, records the receive time in `tracker`, and pushes a lag
 * sample once both the primary and a mirror have delivered the same cid.
 *
 * @param now receive timestamp in ms; defaults to the current time.
 */
function recordEvent(url: string, m: MirrorState, ev: PlcEvent, now: number = Date.now()): void {
  const { cid, did, seq } = ev;

  m.totalEvents++;
  if (seq !== undefined && (m.lastSeq === null || seq > m.lastSeq)) m.lastSeq = seq;

  let entry = tracker.get(cid);
  if (!entry) {
    entry = { primaryRecvMs: null, mirrorRecv: new Map(), firstSeen: now, did };
    tracker.set(cid, entry);
  }

  if (url === PRIMARY) {
    if (entry.primaryRecvMs !== null) return; // duplicate from primary
    entry.primaryRecvMs = now;
    // retroactively score mirrors that already had this op
    for (const [mu, mt] of entry.mirrorRecv) {
      const mm = mirrors.get(mu);
      if (mm) pushLag(mm, mt - now);
    }
    return;
  }

  if (entry.mirrorRecv.has(url)) return; // duplicate from this mirror
  entry.mirrorRecv.set(url, now);
  if (entry.primaryRecvMs !== null) {
    pushLag(m, now - entry.primaryRecvMs);
    retroactivelyMarkReceived(url, cid, entry.did ?? "?", entry.primaryRecvMs);
  }
}

/**
 * Opens the export stream for a registered mirror, resuming after its last
 * seen seq when one is known. Does nothing if the mirror is unknown or already
 * has a socket. Reconnects 5s after a close and 10s after a failed create.
 */
function connect(url: string): void {
  const m = mirrors.get(url);
  if (!m || m.ws) return;

  let ws: WebSocket;
  try {
    const streamUrl = m.lastSeq !== null
      ? `${url}/export/stream?after=${m.lastSeq}`
      : `${url}/export/stream`;
    ws = new WebSocket(streamUrl);
  } catch (e) {
    console.error(`[${url}] WebSocket create failed:`, e);
    setTimeout(() => connect(url), 10_000);
    return;
  }

  m.ws = ws;

  ws.addEventListener("open", () => {
    m.connected = true;
    console.log(`[${m.name}] connected`);
  });

  ws.addEventListener("error", () => {
    // close event fires after and handles reconnect
  });

  ws.addEventListener("close", () => {
    m.connected = false;
    m.ws = null;
    console.log(`[${m.name}] disconnected — retry in 5s`);
    setTimeout(() => connect(url), 5_000);
  });

  ws.addEventListener("message", ({ data }: MessageEvent) => {
    // Malformed messages are dropped here so they can never throw out of the listener.
    const ev = parseEvent(data);
    if (ev) recordEvent(url, m, ev);
  });
}

// ── RTT measurement (TCP ping) ────────────────────────────────────────────────

const resolvedIPs = new Map<string, string>();

/** Resolves a hostname once and caches the address; returns null on lookup failure. */
async function resolveHost(hostname: string): Promise<string | null> {
  const cached = resolvedIPs.get(hostname);
  if (cached !== undefined) return cached;
  try {
    const { address } = await dns.promises.lookup(hostname);
    resolvedIPs.set(hostname, address);
    return address;
  } catch { return null; }
}

/**
 * Times a TCP connect to `hostname:port` and returns the elapsed ms, or null on
 * lookup failure, socket error, or timeout. The address is resolved before the
 * timer starts. After a failed ping the cached address is dropped so the next
 * attempt re-resolves instead of retrying a possibly stale IP forever.
 */
async function tcpPing(hostname: string, port = 443, timeoutMs = 3_000): Promise<number | null> {
  const ip = await resolveHost(hostname);
  if (!ip) return null;

  const rtt = await new Promise<number | null>(resolve => {
    const sock = new net.Socket();
    let settled = false;
    // Settle exactly once, whichever of connect/error/timeout fires first.
    const finish = (result: number | null): void => {
      if (settled) return;
      settled = true;
      sock.destroy();
      resolve(result);
    };
    sock.setTimeout(timeoutMs);
    sock.on("error", () => finish(null));
    sock.on("timeout", () => finish(null));
    const t0 = performance.now();
    sock.connect(port, ip, () => finish(performance.now() - t0));
  });

  if (rtt === null) resolvedIPs.delete(hostname);
  return rtt;
}

/**
 * Port to ping for a mirror URL: the explicit port when the URL has one,
 * otherwise 80 for ws:/http: and 443 for everything else.
 */
function pingPort(target: URL): number {
  if (target.port !== "") return Number(target.port);
  return target.protocol === "ws:" || target.protocol === "http:" ? 80 : 443;
}

/** Pings a registered mirror once and stores the RTT, keeping the last RTT_KEEP samples. */
async function measureRtt(url: string): Promise<void> {
  const m = mirrors.get(url);
  if (!m) return;
  let target: URL;
  try { target = new URL(url); } catch { return; }
  const rtt = await tcpPing(target.hostname, pingPort(target));
  if (rtt != null) {
    m.rttSamples.push(rtt);
    if (m.rttSamples.length > RTT_KEEP) m.rttSamples.shift();
  }
}

/**
 * Stores one lag sample for `m`, corrected by the difference between the
 * mirror's and the primary's one-way transit time. If the primary is not
 * registered its transit time is taken as 0. Keeps the last LAG_KEEP samples.
 */
function pushLag(m: MirrorState, rawLag: number): void {
  const primary = mirrors.get(PRIMARY);
  const primaryOtt = primary ? ott(primary) : 0;
  const corrected = rawLag - (ott(m) - primaryOtt);
  m.lagSamples.push(Math.round(corrected));
  if (m.lagSamples.length > LAG_KEEP) m.lagSamples.shift();
}

// When a mirror delivers an op late (after a snapshot was already taken), fix
// the snapshot rather than leaving a permanent false-positive "missed" entry.
/**
 * Moves `cid` from a snapshot's "missed" list to its "late" list for mirror
 * `url`, in every stored snapshot whose interval contains `primaryRecvMs`,
 * then recomputes that snapshot's ops/missed/coverage and rewrites its file.
 *
 * Snapshots that have no entry for the mirror, or whose entry lacks
 * `missedCids`/`lateCids` (snapshots are loaded from disk without a full shape
 * check), are skipped or patched up rather than throwing.
 *
 * @param url            mirror that just delivered the op
 * @param cid            op identifier
 * @param did            key under which the cid was filed in `missedCids`
 * @param primaryRecvMs  when the primary delivered the op (unix ms)
 */
function retroactivelyMarkReceived(url: string, cid: string, did: string, primaryRecvMs: number): void {
  for (const snap of snapshots) {
    const windowStart = snap.ts - SNAP_INTERVAL;
    if (primaryRecvMs < windowStart || primaryRecvMs > snap.ts) continue;

    const sm = snap.mirrors[url];
    const cidList = sm?.missedCids?.[did];
    if (!sm || !cidList) continue;
    const idx = cidList.indexOf(cid);
    if (idx === -1) continue;

    // missed -> late
    cidList.splice(idx, 1);
    if (cidList.length === 0) delete sm.missedCids[did];
    if (!sm.lateCids) sm.lateCids = {};
    if (!sm.lateCids[did]) sm.lateCids[did] = [];
    sm.lateCids[did].push(cid);

    sm.ops++;
    sm.missed = Math.max(0, sm.missed - 1);
    sm.coverage = sm.primaryOps > 0 ? sm.ops / sm.primaryOps : null;

    try {
      writeFileSync(`${DATA_DIR}/${snap.ts}.json`, JSON.stringify(snap));
    } catch (e) {
      console.error("failed to retroactively update snapshot:", e);
    }
  }
}

// ── tracker pruning ───────────────────────────────────────────────────────────

/** Drops tracker entries first seen more than WINDOW_MS ago. */
function pruneTracker(): void {
  const cut = Date.now() - WINDOW_MS;
  for (const [cid, e] of tracker) {
    if (e.firstSeen < cut) tracker.delete(cid);
  }
}

// ── stats computation ─────────────────────────────────────────────────────────

/**
 * Nearest-rank percentile of an ascending-sorted array.
 *
 * @param sorted samples in ascending order
 * @param p      percentile, 0–100 (out-of-range values clamp to the first/last sample)
 * @returns the selected sample, or 0 for an empty array or an unusable `p`
 */
function pct(sorted: number[], p: number): number {
  if (!sorted.length) return 0;
  const i = Math.min(Math.ceil(p / 100 * sorted.length) - 1, sorted.length - 1);
  return sorted[Math.max(0, i)] ?? 0;
}

// Coverage stats aggregated from all stored snapshots.
/**
 * Builds the per-mirror payload for /api/stats from all stored snapshots plus
 * each mirror's live lag/RTT samples.
 *
 * Coverage for a mirror is its summed `ops` divided by the primary ops summed
 * over only the snapshots that mirror appears in, so a mirror missing from
 * some snapshots is not charged for ops from those intervals. A mirror with no
 * snapshot data (or zero primary ops) reports `coverage: null`. The primary's
 * coverage and lag stats are always null.
 *
 * @returns object keyed by mirror url
 */
function computeStats(): Record<string, unknown> {
  let primaryOps = 0;
  const mirrorOps = new Map<string, number>();
  const mirrorMissed = new Map<string, number>();
  const mirrorExpected = new Map<string, number>(); // primary ops in snapshots containing the mirror
  const bump = (totals: Map<string, number>, key: string, by: number): void => {
    totals.set(key, (totals.get(key) ?? 0) + by);
  };

  for (const snap of snapshots) {
    const entries = Object.entries(snap.mirrors);
    const first = entries[0];
    if (!first) continue;
    // primaryOps is the same across all mirror entries in a snapshot
    primaryOps += first[1].primaryOps;
    for (const [url, sm] of entries) {
      bump(mirrorOps, url, sm.ops);
      bump(mirrorMissed, url, sm.missed);
      bump(mirrorExpected, url, sm.primaryOps);
    }
  }

  // Treat an unregistered primary as zero transit time instead of crashing.
  const primary = mirrors.get(PRIMARY);
  const primaryOtt = primary ? ott(primary) : 0;

  const out: Record<string, unknown> = {};
  for (const [url, m] of mirrors) {
    const isPrimary = url === PRIMARY;
    const opsInWindow = isPrimary ? primaryOps : (mirrorOps.get(url) ?? 0);
    const missed = isPrimary ? 0 : (mirrorMissed.get(url) ?? 0);
    const expected = mirrorExpected.get(url) ?? 0;
    const coverage = !isPrimary && expected > 0 ? opsInWindow / expected : null;
    const rtt = meanRtt(m);

    let lagStats: unknown = null;
    if (!isPrimary && m.lagSamples.length > 0) {
      const sorted = [...m.lagSamples].sort((a, b) => a - b);
      const sum = sorted.reduce((a, b) => a + b, 0);
      lagStats = {
        p50: pct(sorted, 50),
        p95: pct(sorted, 95),
        p99: pct(sorted, 99),
        mean: Math.round(sum / sorted.length),
        min: sorted[0],
        max: sorted[sorted.length - 1],
        n: sorted.length,
      };
    }

    out[url] = {
      name: m.name, url, isPrimary,
      connected: m.connected,
      // NOTE(review): reports the snapshot-derived count, not m.totalEvents — confirm intended.
      totalEvents: opsInWindow,
      opsInWindow, primaryOps, missed, coverage,
      lagStats,
      rttMs: rtt != null ? Math.round(rtt) : null,
      correctionMs: isPrimary ? null : Math.round(ott(m) - primaryOtt),
    };
  }
  return out;
}

// Per-interval stats for snapshots — only events where primary received them
// within [now - SNAP_INTERVAL, now]. No overlap between adjacent snapshots.
/**
 * Computes one snapshot's worth of per-mirror figures from `tracker`.
 *
 * Ops the primary delivered within the last SNAP_INTERVAL are counted per
 * mirror as received or missed; ops the primary never delivered but a mirror
 * did (within the same window) are listed as extra. The primary itself gets no
 * entry in the result.
 */
function computeIntervalStats(): Record<string, SnapMirror> {
  const windowStart = Date.now() - SNAP_INTERVAL;
  const secondaries = [...mirrors.keys()].filter(u => u !== PRIMARY);

  // per-mirror accumulators
  const received = new Map<string, number>();
  const missedByMirror = new Map<string, Record<string, string[]>>(); // url -> did -> [cid]
  const extraByMirror = new Map<string, Record<string, string[]>>(); // url -> did -> [cid]
  for (const u of secondaries) {
    received.set(u, 0);
    missedByMirror.set(u, {});
    extraByMirror.set(u, {});
  }

  // Appends cid to bucket[did], creating the list on first use.
  const file = (bucket: Record<string, string[]>, did: string, cid: string): void => {
    if (!bucket[did]) bucket[did] = [];
    bucket[did].push(cid);
  };

  let primaryCount = 0;
  tracker.forEach((entry, cid) => {
    if (entry.primaryRecvMs === null) {
      // op primary never received — check if any mirror got it this interval
      for (const [u, at] of entry.mirrorRecv) {
        if (u === PRIMARY || at < windowStart) continue;
        const extras = extraByMirror.get(u);
        if (extras) file(extras, entry.did ?? "?", cid);
      }
      return;
    }

    // op primary received — check which mirrors got it
    if (entry.primaryRecvMs < windowStart) return;
    primaryCount++;
    for (const u of secondaries) {
      if (entry.mirrorRecv.has(u)) {
        received.set(u, (received.get(u) ?? 0) + 1);
      } else {
        file(missedByMirror.get(u)!, entry.did ?? "?", cid);
      }
    }
  });

  const result: Record<string, SnapMirror> = {};
  for (const u of secondaries) {
    const got = received.get(u) ?? 0;
    result[u] = {
      coverage: primaryCount > 0 ? got / primaryCount : null,
      missed: Math.max(0, primaryCount - got),
      ops: got,
      primaryOps: primaryCount,
      missedCids: missedByMirror.get(u) ?? {},
      extraCids: extraByMirror.get(u) ?? {},
      lateCids: {},
    };
  }
  return result;
}

// ── snapshots ─────────────────────────────────────────────────────────────────

/**
 * Records a snapshot of the current interval: appends it to `snapshots`,
 * writes it to `<ts>.json`, drops (and deletes the file of) the oldest
 * snapshot once more than SNAP_KEEP are held, then persists the totals.
 */
function takeSnapshot(): void {
  const pathFor = (ts: number): string => `${DATA_DIR}/${ts}.json`;

  const current: Snapshot = { ts: Date.now(), mirrors: computeIntervalStats() };
  snapshots.push(current);

  try {
    writeFileSync(pathFor(current.ts), JSON.stringify(current));
  } catch (e) {
    console.error("failed to write snapshot:", e);
  }

  // prune oldest snapshot file when over limit
  if (snapshots.length > SNAP_KEEP) {
    const evicted = snapshots.shift()!;
    try {
      unlinkSync(pathFor(evicted.ts));
    } catch { /* already gone */ }
  }

  saveTotals();
}

// ── HTTP server ───────────────────────────────────────────────────────────────

const CORS_HEADERS = {
  "Access-Control-Allow-Origin": "*",
  "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
  "Access-Control-Allow-Headers": "Content-Type",
};

// Routes: OPTIONS (any path) -> 204 preflight, /api/stats -> JSON,
// / and /index.html -> the page, anything else -> 404.
Bun.serve({
  port: PORT,
  async fetch(req: Request): Promise<Response> {
    const route = new URL(req.url).pathname;

    if (req.method === "OPTIONS") {
      return new Response(null, { status: 204, headers: CORS_HEADERS });
    }

    switch (route) {
      case "/api/stats":
        return Response.json({ mirrors: computeStats(), snapshots }, { headers: CORS_HEADERS });
      case "/":
      case "/index.html":
        return new Response(Bun.file("index.html"), {
          headers: { "Content-Type": "text/html; charset=utf-8" },
        });
      default:
        return new Response("not found", { status: 404 });
    }
  },
});

// ── boot ──────────────────────────────────────────────────────────────────────

loadData();

// Primary first, then the mirrors compared against it.
const BOOT_MIRRORS: Array<[url: string, name: string]> = [
  [PRIMARY, "plc.directory"],
  ["wss://plc.klbr.net", "plc.klbr.net"],
];
for (const [mirrorUrl, mirrorName] of BOOT_MIRRORS) addMirror(mirrorUrl, mirrorName);

setInterval(pruneTracker, 30_000);
setInterval(() => {
  mirrors.forEach((_state, mirrorUrl) => { measureRtt(mirrorUrl); });
}, PING_MS);
setInterval(takeSnapshot, SNAP_INTERVAL);

console.log(`compare-plc → http://localhost:${PORT}`);