compares plc.directory with other mirrors
1// compare-plc server
2// Connects to PLC mirrors server-side for accurate lag measurement.
3// Serves index.html and exposes /api/stats.
4//
5// Lag = mirror_recv - primary_recv, corrected for network distance:
6// true_lag ≈ raw_lag − (mirror_ott − primary_ott)
7// OTT (one-way transit time) = TCP_RTT / 2, measured via SYN/SYN-ACK ping.
8// TCP ping is used because the remote OS responds immediately at the network
9// stack level — no application code runs, so it's pure network latency.
10// DNS is pre-resolved so it doesn't contaminate the timing.
11
import net from "net";
import dns from "dns";
import { readFileSync, writeFileSync, readdirSync, unlinkSync, mkdirSync, renameSync } from "fs";
15
const PRIMARY = "wss://plc.directory"; // reference feed every other mirror is measured against
const WINDOW_MS = 15 * 60_000; // tracker entries first seen longer ago than this are pruned (15 min)
const LAG_KEEP = 10_000; // lag samples retained per mirror (oldest dropped first)
const RTT_KEEP = 30; // TCP RTT samples averaged for the OTT correction
const PING_MS = 5_000; // interval between TCP pings to every stream host
const SNAP_INTERVAL = 5 * 60_000; // snapshot cadence: one every 5 min
const SNAP_KEEP = 72; // snapshots retained: 72 × 5 min = 6 h of history
const DATA_DIR = "./data"; // holds totals.json and the <ts>.json snapshot files
const PORT = 7331; // HTTP port serving index.html and /api/stats
25
26// ── types ─────────────────────────────────────────────────────────────────────
27
/** Arrival record for one op, keyed by CID in `tracker`. All times are local Date.now() ms. */
interface TrackerEntry {
  /** When the first stream (primary or mirror) delivered this op; drives pruning. */
  firstSeen: number;
  /** When the primary delivered it, or null if it hasn't (yet). */
  primaryRecvMs: number | null;
  /** Mirror URL -> when that mirror first delivered it. Only non-primary URLs are stored. */
  mirrorRecv: Map<string, number>;
  /** DID from the event that created this entry, if that event carried one. */
  did?: string;
}
34
/** One non-primary mirror's results for a single snapshot interval. */
interface SnapMirror {
  coverage: number | null; // ops / primaryOps; null when the primary saw no ops
  missed: number; // primaryOps - ops, never below 0
  ops: number; // of this interval's primary ops, how many the mirror delivered
  primaryOps: number; // ops received by primary in this interval
  missedCids: Record<string, string[]>; // did -> [cid, ...] primary got, mirror didn't (still missing)
  extraCids: Record<string, string[]>; // did -> [cid, ...] mirror got, primary didn't
  // did -> [cid, ...] arrived at the mirror after the snapshot was taken.
  // Optional: snapshots are read back from disk through an unchecked cast, and
  // retroactivelyMarkReceived already creates this map on demand when absent.
  lateCids?: Record<string, string[]>;
}
44
/** Per-interval coverage record, persisted as DATA_DIR/<ts>.json. */
interface Snapshot {
  /** Unix ms at which the snapshot was taken; also used as the file name. */
  ts: number;
  /** Mirror URL -> interval stats. The primary itself has no entry. */
  mirrors: Record<string, SnapMirror>;
}
49
/** Live state for one stream (the primary or a mirror), keyed by URL in `mirrors`. */
interface MirrorState {
  /** Display name. */
  name: string;
  /** WebSocket base URL; "/export/stream" is appended when connecting. */
  url: string;
  /** True between the socket's open and close events. */
  connected: boolean;
  /** Current socket, or null while disconnected / waiting to reconnect. */
  ws: WebSocket | null;
  /** Lifetime count of events received; persisted in totals.json. */
  totalEvents: number;
  /** Highest seq seen so far; reconnects resume with ?after=<lastSeq>. */
  lastSeq: number | null;
  /** Network-corrected lag samples in ms, newest last, capped at LAG_KEEP. */
  lagSamples: number[];
  /** TCP connect RTT samples in ms, newest last, capped at RTT_KEEP. */
  rttSamples: number[];
}
60
/** On-disk shape of DATA_DIR/totals.json. */
interface SavedTotals {
  /** Stream URL -> lifetime totalEvents. */
  totals: Record<string, number>;
  /** Stream URL -> last seen seq (only for streams that have reported one). */
  seqs: Record<string, number>;
}
65
66// ── state ─────────────────────────────────────────────────────────────────────
67
/** CID -> arrival bookkeeping for recently seen ops; entries are removed by pruneTracker. */
const tracker: Map<string, TrackerEntry> = new Map();
/** Every registered stream (primary included), keyed by WebSocket base URL. */
const mirrors: Map<string, MirrorState> = new Map();
/** Interval snapshots, oldest first; each is also stored as DATA_DIR/<ts>.json. */
const snapshots: Snapshot[] = [];
/** URL -> lifetime total read from totals.json by loadData(); consumed by addMirror. */
const savedTotals: Map<string, number> = new Map();
/** URL -> last seen seq read from totals.json by loadData(); consumed by addMirror. */
const savedSeqs: Map<string, number> = new Map();
73
74// ── persistence ───────────────────────────────────────────────────────────────
75
/**
 * Restore persisted state from DATA_DIR at boot, creating the directory if needed.
 *
 * totals.json fills `savedTotals` / `savedSeqs`. A missing or unparseable
 * file just means starting from zero; non-numeric values are ignored.
 *
 * Snapshot files named `<ts>.json` are loaded oldest-first into `snapshots`.
 * A file is skipped if it fails to parse, or if it lacks a numeric `ts` and an
 * object `mirrors`, so one bad file cannot break /api/stats later. Per-mirror
 * CID maps missing from a file are defaulted to `{}`.
 *
 * Only the SNAP_KEEP newest valid snapshots are kept. The files of older ones
 * are deleted so they are not re-read on every boot, because takeSnapshot
 * removes at most one file per run.
 */
function loadData(): void {
  mkdirSync(DATA_DIR, { recursive: true });

  // load totals
  try {
    const data = JSON.parse(readFileSync(`${DATA_DIR}/totals.json`, "utf-8")) as Partial<SavedTotals>;
    for (const [url, n] of Object.entries(data.totals ?? {})) {
      if (Number.isFinite(n)) savedTotals.set(url, n);
    }
    for (const [url, n] of Object.entries(data.seqs ?? {})) {
      if (Number.isFinite(n)) savedSeqs.set(url, n);
    }
  } catch { /* no totals yet, or unreadable: start from zero */ }

  // load snapshot files: named <ts>.json, sorted oldest-first
  const loaded: { file: string; snap: Snapshot }[] = [];
  try {
    const files = readdirSync(DATA_DIR)
      .filter(f => /^\d+\.json$/.test(f))
      .sort((a, b) => Number(a.slice(0, -5)) - Number(b.slice(0, -5)));
    for (const file of files) {
      try {
        const snap = JSON.parse(readFileSync(`${DATA_DIR}/${file}`, "utf-8")) as Partial<Snapshot> | null;
        if (typeof snap?.ts !== "number" || typeof snap.mirrors !== "object" || snap.mirrors === null) continue;
        // a non-object mirror entry throws here and the whole file is skipped
        for (const sm of Object.values(snap.mirrors)) {
          sm.missedCids ??= {};
          sm.extraCids ??= {};
          sm.lateCids ??= {};
        }
        loaded.push({ file, snap: snap as Snapshot });
      } catch { /* skip corrupt file */ }
    }
  } catch { /* data dir unreadable */ }

  // keep the SNAP_KEEP newest; delete older files so they are not reloaded next boot
  const excess = Math.max(0, loaded.length - SNAP_KEEP);
  for (const { file } of loaded.slice(0, excess)) {
    try { unlinkSync(`${DATA_DIR}/${file}`); } catch { /* already gone */ }
  }
  for (const { snap } of loaded.slice(excess)) snapshots.push(snap);

  console.log(`loaded ${snapshots.length} snapshots and ${savedTotals.size} totals from ${DATA_DIR}/`);
}
104
/**
 * Persist each stream's lifetime event count and last seen seq to
 * DATA_DIR/totals.json, which loadData reads back on the next boot.
 *
 * The JSON is written to a temporary file and then renamed over totals.json.
 * A crash mid-write therefore leaves the previous file intact instead of a
 * truncated one, which loadData would silently discard (resetting every
 * total and seq). Failures are logged, not thrown, because persistence here
 * is best-effort.
 */
function saveTotals(): void {
  const payload: SavedTotals = { totals: {}, seqs: {} };
  for (const [url, m] of mirrors) {
    payload.totals[url] = m.totalEvents;
    if (m.lastSeq !== null) payload.seqs[url] = m.lastSeq;
  }
  const target = `${DATA_DIR}/totals.json`;
  const tmp = `${target}.tmp`;
  try {
    writeFileSync(tmp, JSON.stringify(payload));
    renameSync(tmp, target); // replaces target in one step on the same filesystem
  } catch (e) {
    console.error("failed to save totals:", e);
  }
}
118
119// ── RTT helpers ───────────────────────────────────────────────────────────────
120
/** Mean of the stream's recent TCP RTT samples in ms, rounded to 0.1 ms; null before the first successful ping. */
function meanRtt(m: MirrorState): number | null {
  const samples = m.rttSamples;
  if (samples.length === 0) return null;
  let total = 0;
  for (const s of samples) total += s;
  return Math.round((total / samples.length) * 10) / 10;
}
126
/** Estimated one-way transit time in ms (half the mean RTT), or 0 while no RTT sample exists. */
function ott(m: MirrorState): number {
  const rtt = meanRtt(m);
  if (rtt == null) return 0;
  return rtt / 2;
}
131
132// ── mirror management ─────────────────────────────────────────────────────────
133
/**
 * Register the stream at `url` under display `name`, then open its WebSocket
 * and take a first RTT measurement. The lifetime total and last seq saved in
 * totals.json are restored. Calling this again for a registered URL does nothing.
 */
function addMirror(url: string, name: string): void {
  if (mirrors.has(url)) return;
  const state: MirrorState = {
    name,
    url,
    connected: false,
    totalEvents: savedTotals.get(url) ?? 0,
    lastSeq: savedSeqs.get(url) ?? null,
    lagSamples: [],
    rttSamples: [],
    ws: null,
  };
  mirrors.set(url, state);
  connect(url);
  void measureRtt(url); // fire-and-forget: measureRtt handles its own failures
}
145
/**
 * Open the export stream for the registered stream at `url` and attach its
 * event handlers. Does nothing if `url` is not registered or a socket already exists.
 *
 * When `lastSeq` is known, the stream is requested with `?after=<lastSeq>`.
 * After a close, the socket is cleared and a reconnect is scheduled in 5 s.
 * If the WebSocket constructor itself throws, the retry is scheduled in 10 s.
 *
 * Each message is parsed as JSON and only objects are considered. A string
 * `cid` is required. `did` (string) and `seq` (finite number) are used only
 * when they have the right type, so a malformed payload can neither throw out
 * of the handler nor corrupt `lastSeq` (a string seq would otherwise be
 * stored and then compared as text).
 */
function connect(url: string): void {
  const m = mirrors.get(url);
  if (!m || m.ws) return;

  let ws: WebSocket;
  try {
    const streamUrl = m.lastSeq !== null
      ? `${url}/export/stream?after=${m.lastSeq}`
      : `${url}/export/stream`;
    ws = new WebSocket(streamUrl);
  } catch (e) {
    console.error(`[${url}] WebSocket create failed:`, e);
    setTimeout(() => connect(url), 10_000);
    return;
  }

  m.ws = ws;

  ws.addEventListener("open", () => {
    m.connected = true;
    console.log(`[${m.name}] connected`);
  });

  ws.addEventListener("error", () => {
    // the close event fires afterwards and handles reconnect
  });

  ws.addEventListener("close", () => {
    m.connected = false;
    m.ws = null;
    console.log(`[${m.name}] disconnected — retry in 5s`);
    setTimeout(() => connect(url), 5_000);
  });

  ws.addEventListener("message", ({ data }: MessageEvent) => {
    let parsed: unknown;
    try {
      parsed = JSON.parse(typeof data === "string" ? data : Buffer.from(data as ArrayBuffer).toString());
    } catch { return; }
    // valid JSON can also be null, a number, a string…; only objects carry ops
    if (typeof parsed !== "object" || parsed === null) return;
    const ev = parsed as Record<string, unknown>;

    const cid = typeof ev.cid === "string" ? ev.cid : undefined;
    const did = typeof ev.did === "string" ? ev.did : undefined;
    const seq = typeof ev.seq === "number" && Number.isFinite(ev.seq) ? ev.seq : undefined;
    if (!cid) return;

    m.totalEvents++;
    if (seq !== undefined && (m.lastSeq === null || seq > m.lastSeq)) m.lastSeq = seq;
    const now = Date.now();

    let entry = tracker.get(cid);
    if (!entry) {
      entry = { primaryRecvMs: null, mirrorRecv: new Map(), firstSeen: now, did };
      tracker.set(cid, entry);
    }

    if (url === PRIMARY) {
      if (entry.primaryRecvMs === null) {
        entry.primaryRecvMs = now;
        // retroactively score mirrors that delivered this op before the primary (negative lag)
        for (const [mu, mt] of entry.mirrorRecv) {
          const mm = mirrors.get(mu);
          if (mm) pushLag(mm, mt - now);
        }
      }
    } else if (!entry.mirrorRecv.has(url)) {
      entry.mirrorRecv.set(url, now);
      if (entry.primaryRecvMs !== null) {
        pushLag(m, now - entry.primaryRecvMs);
        // fix up any stored snapshot that already counted this op as missed
        retroactivelyMarkReceived(url, cid, entry.did ?? "?", entry.primaryRecvMs);
      }
    }
  });
}
220
221// ── RTT measurement (TCP ping) ────────────────────────────────────────────────
222
// hostname -> first resolved IP. Filled once per host and never refreshed, so
// no DNS lookup falls inside the timed part of tcpPing.
const resolvedIPs = new Map<string, string>();

/** Resolve `hostname` to an IP address, caching the result; null (and nothing cached) on failure. */
async function resolveHost(hostname: string): Promise<string | null> {
  const cached = resolvedIPs.get(hostname);
  if (cached !== undefined) return cached;

  let address: string;
  try {
    address = (await dns.promises.lookup(hostname)).address;
  } catch {
    return null;
  }
  resolvedIPs.set(hostname, address);
  return address;
}
233
/**
 * Time a TCP connect to `hostname:port` in milliseconds (performance.now()
 * around socket.connect). The connection is closed as soon as it is
 * established. Returns null if the host does not resolve, the connect fails,
 * or the socket stays idle for `timeoutMs`.
 */
async function tcpPing(hostname: string, port = 443, timeoutMs = 3_000): Promise<number | null> {
  const ip = await resolveHost(hostname);
  if (!ip) return null;

  return new Promise<number | null>((resolve) => {
    const socket = new net.Socket();
    const fail = (): void => {
      socket.destroy();
      resolve(null);
    };
    socket.setTimeout(timeoutMs);
    socket.on("error", fail);
    socket.on("timeout", fail);

    const started = performance.now();
    socket.connect(port, ip, () => {
      resolve(performance.now() - started);
      socket.destroy();
    });
  });
}
249
/**
 * Take one TCP ping of the stream's host and record it in `rttSamples`,
 * keeping at most RTT_KEEP samples. Unknown URLs, unparseable URLs and failed
 * pings are ignored, so this never rejects.
 */
async function measureRtt(url: string): Promise<void> {
  const m = mirrors.get(url);
  if (!m) return;

  let host: string;
  try {
    host = new URL(url).hostname;
  } catch {
    return;
  }

  const sample = await tcpPing(host);
  if (sample == null) return;
  m.rttSamples.push(sample);
  if (m.rttSamples.length > RTT_KEEP) m.rttSamples.shift();
}
261
/**
 * Record one lag sample for mirror `m`, keeping at most LAG_KEEP (oldest dropped).
 *
 * `rawLag` is mirror_recv − primary_recv in ms (negative when the mirror was
 * ahead). It is corrected by subtracting (mirror OTT − primary OTT) so a
 * mirror that is merely farther from this server is not charged for the extra
 * transit. The correction is applied only when both streams have RTT samples.
 * ott() returns a placeholder 0 otherwise, and mixing that with a real OTT
 * would skew the sample by a full one-way time. Without both, the raw lag is
 * stored unchanged.
 */
function pushLag(m: MirrorState, rawLag: number): void {
  const primary = mirrors.get(PRIMARY);
  let correction = 0;
  if (primary !== undefined && meanRtt(m) != null && meanRtt(primary) != null) {
    correction = ott(m) - ott(primary);
  }
  m.lagSamples.push(Math.round(rawLag - correction));
  if (m.lagSamples.length > LAG_KEEP) m.lagSamples.shift();
}
268
/**
 * A mirror has just delivered `cid`, which the primary received at
 * `primaryRecvMs`. Every stored snapshot whose interval
 * [snap.ts − SNAP_INTERVAL, snap.ts] contains that time and still lists the
 * CID as missed for this mirror is corrected. The CID moves from
 * `missedCids[did]` to `lateCids[did]`, and ops/missed/coverage are updated.
 * The snapshot file is then rewritten. Without this, a late delivery would
 * remain a permanent false-positive "missed" entry.
 *
 * Snapshots read back from disk are not validated, so a missing `mirrors`,
 * `missedCids` or `lateCids` is tolerated rather than throwing inside the
 * WebSocket message handler. Write failures are logged, not thrown.
 */
function retroactivelyMarkReceived(url: string, cid: string, did: string, primaryRecvMs: number): void {
  for (const snap of snapshots) {
    const windowStart = snap.ts - SNAP_INTERVAL;
    if (primaryRecvMs < windowStart || primaryRecvMs > snap.ts) continue;
    const sm = snap.mirrors?.[url];
    const cidList = sm?.missedCids?.[did];
    if (!sm || !cidList) continue;
    const idx = cidList.indexOf(cid);
    if (idx === -1) continue;

    // move the CID from "missed" to "late"
    cidList.splice(idx, 1);
    if (cidList.length === 0) delete sm.missedCids[did];
    const late = (sm.lateCids ??= {});
    (late[did] ??= []).push(cid);

    // the op now counts as delivered for this interval
    sm.ops++;
    sm.missed = Math.max(0, sm.missed - 1);
    sm.coverage = sm.primaryOps > 0 ? sm.ops / sm.primaryOps : null;

    try {
      writeFileSync(`${DATA_DIR}/${snap.ts}.json`, JSON.stringify(snap));
    } catch (e) {
      console.error("failed to retroactively update snapshot:", e);
    }
  }
}
296
297// ── tracker pruning ───────────────────────────────────────────────────────────
298
/**
 * Drop tracker entries first seen more than WINDOW_MS ago. Deleting the
 * current key of a Map during for…of iteration is well-defined in JavaScript.
 */
function pruneTracker(): void {
  const oldestAllowed = Date.now() - WINDOW_MS;
  for (const [cid, { firstSeen }] of tracker) {
    if (firstSeen < oldestAllowed) tracker.delete(cid);
  }
}
305
306// ── stats computation ─────────────────────────────────────────────────────────
307
/**
 * Nearest-rank percentile of an ascending-sorted array.
 *
 * Returns the element at 1-based rank ceil(p/100 · n), clamped to the array
 * bounds, so p ≤ 0 gives the minimum and p ≥ 100 the maximum. Returns 0 for
 * an empty array.
 *
 * The rank is computed as (p · n) / 100 rather than (p / 100) · n. p/100 is
 * usually not exactly representable (0.07 * 100 === 7.000000000000001), and
 * `ceil` would then overshoot by one rank.
 */
function pct(sorted: number[], p: number): number {
  const n = sorted.length;
  if (n === 0) return 0;
  const rank = Math.ceil((p * n) / 100);
  const idx = Math.min(Math.max(rank - 1, 0), n - 1);
  return sorted[idx] ?? 0;
}
313
/**
 * Lag distribution summary (ms) for one mirror, or null when it has no
 * samples yet. Percentiles come from pct() on a sorted copy, so the caller's
 * array is not reordered.
 */
function summarizeLag(samples: number[]): Record<string, number> | null {
  if (samples.length === 0) return null;
  const sorted = [...samples].sort((a, b) => a - b);
  const sum = sorted.reduce((a, b) => a + b, 0);
  return {
    p50: pct(sorted, 50),
    p95: pct(sorted, 95),
    p99: pct(sorted, 99),
    mean: Math.round(sum / sorted.length),
    min: sorted[0] ?? 0, // non-empty here; fallback only satisfies the index type
    max: sorted[sorted.length - 1] ?? 0,
    n: sorted.length,
  };
}

/**
 * Per-stream payload for /api/stats, keyed by stream URL.
 *
 * Coverage figures (ops, missed, primaryOps, coverage) are summed over all
 * stored snapshots. Connection state, lag percentiles, mean RTT and the OTT
 * correction are live values. The primary row carries no coverage, lag or
 * correction (null), and its ops are the primary's own op count.
 */
function computeStats(): Record<string, unknown> {
  let primaryOps = 0;
  const mirrorOps = new Map<string, number>();
  const mirrorMissed = new Map<string, number>();

  for (const snap of snapshots) {
    const entries = Object.entries(snap.mirrors);
    const first = entries[0];
    if (!first) continue;
    // primaryOps is the same across all mirror entries in a snapshot
    primaryOps += first[1].primaryOps;
    for (const [url, sm] of entries) {
      mirrorOps.set(url, (mirrorOps.get(url) ?? 0) + sm.ops);
      mirrorMissed.set(url, (mirrorMissed.get(url) ?? 0) + sm.missed);
    }
  }

  // loop-invariant; 0 if the primary is somehow not registered
  const primary = mirrors.get(PRIMARY);
  const primaryOtt = primary ? ott(primary) : 0;

  const out: Record<string, unknown> = {};
  for (const [url, m] of mirrors) {
    const isPrimary = url === PRIMARY;
    const opsInWindow = isPrimary ? primaryOps : (mirrorOps.get(url) ?? 0);
    const missed = isPrimary ? 0 : (mirrorMissed.get(url) ?? 0);
    const coverage = (!isPrimary && primaryOps > 0) ? opsInWindow / primaryOps : null;
    const rtt = meanRtt(m);

    out[url] = {
      name: m.name, url, isPrimary,
      connected: m.connected,
      // NOTE(review): this reports the snapshot-window op count, not
      // MirrorState.totalEvents (the persisted lifetime total). Confirm the
      // frontend expects that before changing it.
      totalEvents: opsInWindow,
      opsInWindow, primaryOps, missed, coverage,
      lagStats: isPrimary ? null : summarizeLag(m.lagSamples),
      rttMs: rtt != null ? Math.round(rtt) : null,
      correctionMs: isPrimary ? null : Math.round(ott(m) - primaryOtt),
    };
  }
  return out;
}
366
/**
 * Coverage for the snapshot being taken now, built from `tracker`.
 *
 * An op counts toward this interval when the primary received it at or after
 * `cut` (Date.now() − SNAP_INTERVAL). For each non-primary stream:
 *  - ops / missed: how many of those ops it has / hasn't delivered yet (a
 *    later delivery is folded in by retroactivelyMarkReceived);
 *  - missedCids: DID -> CIDs it hasn't delivered;
 *  - extraCids: DID -> CIDs it delivered at or after `cut` that the primary
 *    has not delivered at all.
 * Ops without a DID are grouped under "?". `lateCids` always starts empty.
 * Because `cut` comes from the clock rather than the previous snapshot's ts,
 * timer drift can leave small gaps or overlaps between adjacent intervals.
 */
function computeIntervalStats(): Record<string, SnapMirror> {
  const cut = Date.now() - SNAP_INTERVAL;
  const others = [...mirrors.keys()].filter((u) => u !== PRIMARY);

  const opsByMirror = new Map(others.map((u): [string, number] => [u, 0]));
  const missedByMirror = new Map(others.map((u): [string, Record<string, string[]>] => [u, {}]));
  const extraByMirror = new Map(others.map((u): [string, Record<string, string[]>] => [u, {}]));
  let primaryOps = 0;

  const append = (bucket: Record<string, string[]>, did: string, cid: string): void => {
    (bucket[did] ??= []).push(cid);
  };

  for (const [cid, entry] of tracker) {
    const did = entry.did ?? "?";

    if (entry.primaryRecvMs === null) {
      // the primary never delivered this op: "extra" for any mirror that got it this interval
      for (const [url, recvMs] of entry.mirrorRecv) {
        if (url === PRIMARY || recvMs < cut) continue;
        const bucket = extraByMirror.get(url);
        if (bucket) append(bucket, did, cid);
      }
      continue;
    }

    if (entry.primaryRecvMs < cut) continue; // belongs to an earlier interval
    primaryOps++;
    for (const url of others) {
      if (entry.mirrorRecv.has(url)) {
        opsByMirror.set(url, (opsByMirror.get(url) ?? 0) + 1);
      } else {
        append(missedByMirror.get(url)!, did, cid);
      }
    }
  }

  const result: Record<string, SnapMirror> = {};
  for (const url of others) {
    const ops = opsByMirror.get(url) ?? 0;
    result[url] = {
      coverage: primaryOps > 0 ? ops / primaryOps : null,
      missed: Math.max(0, primaryOps - ops),
      ops,
      primaryOps,
      missedCids: missedByMirror.get(url) ?? {},
      extraCids: extraByMirror.get(url) ?? {},
      lateCids: {},
    };
  }
  return result;
}
429
430// ── snapshots ─────────────────────────────────────────────────────────────────
431
/**
 * Record the current interval's stats as a new snapshot and write it to
 * DATA_DIR/<ts>.json. Once more than SNAP_KEEP snapshots are held, the oldest
 * is dropped and its file deleted. Totals are saved as well. Write and delete
 * failures are logged or ignored rather than thrown.
 */
function takeSnapshot(): void {
  const ts = Date.now();
  const snap: Snapshot = { ts, mirrors: computeIntervalStats() };
  snapshots.push(snap);

  try {
    writeFileSync(`${DATA_DIR}/${ts}.json`, JSON.stringify(snap));
  } catch (e) {
    console.error("failed to write snapshot:", e);
  }

  if (snapshots.length > SNAP_KEEP) {
    const { ts: expiredTs } = snapshots.shift()!;
    try {
      unlinkSync(`${DATA_DIR}/${expiredTs}.json`);
    } catch { /* already gone */ }
  }

  saveTotals();
}
450
451// ── HTTP server ────────────────────────────────────────────────────────────────
452
const CORS_HEADERS = {
  "Access-Control-Allow-Origin": "*",
  "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
  "Access-Control-Allow-Headers": "Content-Type",
};

// HTTP routes:
//   OPTIONS (any path) -> 204 CORS preflight
//   /api/stats         -> JSON { mirrors: computeStats(), snapshots } with CORS headers
//   / or /index.html   -> the file index.html as HTML
//   anything else      -> 404 "not found"
Bun.serve({
  port: PORT,
  async fetch(req: Request): Promise<Response> {
    const url = new URL(req.url);

    if (req.method === "OPTIONS") {
      return new Response(null, { status: 204, headers: CORS_HEADERS });
    }

    switch (url.pathname) {
      case "/api/stats":
        return Response.json({ mirrors: computeStats(), snapshots }, { headers: CORS_HEADERS });
      case "/":
      case "/index.html":
        return new Response(Bun.file("index.html"), {
          headers: { "Content-Type": "text/html; charset=utf-8" },
        });
      default:
        return new Response("not found", { status: 404 });
    }
  },
});
481
482// ── boot ──────────────────────────────────────────────────────────────────────
483
// Restore persisted totals and snapshots before any stream is opened.
loadData();

// Register the primary first, then the mirrors compared against it.
for (const [url, name] of [
  [PRIMARY, "plc.directory"],
  ["wss://plc.klbr.net", "plc.klbr.net"],
] as const) {
  addMirror(url, name);
}

// Background jobs: tracker pruning every 30 s, TCP pings every PING_MS,
// and a snapshot every SNAP_INTERVAL.
setInterval(pruneTracker, 30_000);
setInterval(() => {
  for (const url of mirrors.keys()) void measureRtt(url);
}, PING_MS);
setInterval(takeSnapshot, SNAP_INTERVAL);

console.log(`compare-plc → http://localhost:${PORT}`);