atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add GC and tombstone tests for SyncStorage and firehose #account events

+458
+458
src/replication/gc-and-tombstone.test.ts
/**
 * Tests for replication delete/update handling:
 * - SyncStorage GC methods (removeBlocks, findOrphanedCids, etc.)
 * - Firehose deferred GC (needs_gc flag)
 * - Tombstone handling (markTombstoned, purgeDidData)
 * - FirehoseSubscription #account event parsing
 */

import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import Database from "better-sqlite3";
import { encode as cborEncode } from "../cbor-compat.js";

import { SyncStorage } from "./sync-storage.js";
import {
  FirehoseSubscription,
  type FirehoseAccountEvent,
} from "./firehose-subscription.js";

// ============================================
// Helpers
// ============================================

/** How often the polling helpers re-check their condition. */
const POLL_INTERVAL_MS = 20;

/** Upper bound for any polling wait; kept below vitest's default test timeout. */
const WAIT_TIMEOUT_MS = 2000;

/** Fixed settle time used only where we must prove that nothing happened. */
const SETTLE_MS = 200;

/** Grace period after tearing down the sockets in the firehose suite. */
const TEARDOWN_DELAY_MS = 100;

/** Resolve after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Poll `predicate` until it returns true or `timeoutMs` elapses.
 *
 * @returns true if the predicate became true, false on timeout. Callers
 *   follow up with `expect(...)` so a timeout surfaces as a normal assertion
 *   failure with a useful diff rather than a generic error.
 */
async function waitUntil(
  predicate: () => boolean,
  timeoutMs: number = WAIT_TIMEOUT_MS,
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  while (!predicate()) {
    if (Date.now() >= deadline) return false;
    await sleep(POLL_INTERVAL_MS);
  }
  return true;
}

/** Encode a firehose frame (header + body) as concatenated CBOR. */
function encodeFrame(header: object, body: object): Buffer {
  const headerBytes = cborEncode(header);
  const bodyBytes = cborEncode(body);
  const frame = new Uint8Array(headerBytes.length + bodyBytes.length);
  frame.set(headerBytes, 0);
  frame.set(bodyBytes, headerBytes.length);
  return Buffer.from(frame);
}

/**
 * Create a mock #account frame.
 *
 * `status` is only included in the body when a non-empty value is given, so
 * a re-activation frame carries no `status` key at all.
 */
function makeAccountFrame(
  seq: number,
  did: string,
  active: boolean,
  status?: string,
): Buffer {
  const header = { op: 1, t: "#account" };
  const body = {
    seq,
    did,
    time: new Date().toISOString(),
    active,
    ...(status ? { status } : {}),
  };
  return encodeFrame(header, body);
}

const DID_A = "did:plc:aaaaaa";
const DID_B = "did:plc:bbbbbb";

// ============================================
// SyncStorage GC methods
// ============================================

describe("SyncStorage GC methods", () => {
  let tmpDir: string;
  let db: InstanceType<typeof Database>;
  let storage: SyncStorage;

  beforeEach(() => {
    tmpDir = mkdtempSync(join(tmpdir(), "gc-test-"));
    db = new Database(join(tmpDir, "test.db"));
    storage = new SyncStorage(db);
    storage.initSchema();
  });

  afterEach(() => {
    db.close();
    rmSync(tmpDir, { recursive: true, force: true });
  });

  // ---------- removeBlocks ----------

  it("removes specific blocks for a DID", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://pds.example" });
    storage.trackBlocks(DID_A, ["cid1", "cid2", "cid3"]);

    expect(storage.getBlockCids(DID_A)).toHaveLength(3);

    storage.removeBlocks(DID_A, ["cid1", "cid3"]);

    const remaining = storage.getBlockCids(DID_A);
    expect(remaining).toHaveLength(1);
    expect(remaining).toContain("cid2");
  });

  it("removeBlocks does nothing for empty array", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://pds.example" });
    storage.trackBlocks(DID_A, ["cid1"]);
    storage.removeBlocks(DID_A, []);
    expect(storage.getBlockCids(DID_A)).toHaveLength(1);
  });

  it("removeBlocks does not affect other DIDs", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://a.example" });
    storage.upsertState({ did: DID_B, pdsEndpoint: "https://b.example" });
    storage.trackBlocks(DID_A, ["cid1", "cid2"]);
    storage.trackBlocks(DID_B, ["cid1", "cid3"]);

    storage.removeBlocks(DID_A, ["cid1"]);

    expect(storage.getBlockCids(DID_A)).toEqual(["cid2"]);
    expect(storage.getBlockCids(DID_B)).toHaveLength(2);
    expect(storage.getBlockCids(DID_B)).toContain("cid1");
  });

  // ---------- findOrphanedCids ----------

  it("finds orphaned CIDs with no remaining references", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://a.example" });
    storage.upsertState({ did: DID_B, pdsEndpoint: "https://b.example" });
    storage.trackBlocks(DID_A, ["cid1", "cid2"]);
    storage.trackBlocks(DID_B, ["cid1", "cid3"]);

    // Remove DID_A's reference to cid1 and cid2
    storage.removeBlocks(DID_A, ["cid1", "cid2"]);

    // cid1 still referenced by DID_B, cid2 is orphaned
    const orphaned = storage.findOrphanedCids(["cid1", "cid2"]);
    expect(orphaned).toEqual(["cid2"]);
  });

  it("findOrphanedCids returns all for completely removed CIDs", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://a.example" });
    storage.trackBlocks(DID_A, ["cid1", "cid2"]);
    storage.removeBlocks(DID_A, ["cid1", "cid2"]);

    const orphaned = storage.findOrphanedCids(["cid1", "cid2"]);
    expect(orphaned).toEqual(["cid1", "cid2"]);
  });

  it("findOrphanedCids returns empty for empty input", () => {
    expect(storage.findOrphanedCids([])).toEqual([]);
  });

  // ---------- removeBlobs / findOrphanedBlobCids ----------

  it("removes specific blobs for a DID", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://pds.example" });
    storage.trackBlobs(DID_A, ["blob1", "blob2", "blob3"]);

    storage.removeBlobs(DID_A, ["blob1", "blob3"]);

    const remaining = storage.getBlobCids(DID_A);
    expect(remaining).toHaveLength(1);
    expect(remaining).toContain("blob2");
  });

  it("finds orphaned blob CIDs", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://a.example" });
    storage.upsertState({ did: DID_B, pdsEndpoint: "https://b.example" });
    storage.trackBlobs(DID_A, ["blob1", "blob2"]);
    storage.trackBlobs(DID_B, ["blob1"]);

    storage.removeBlobs(DID_A, ["blob1", "blob2"]);

    // blob1 still referenced by DID_B
    const orphaned = storage.findOrphanedBlobCids(["blob1", "blob2"]);
    expect(orphaned).toEqual(["blob2"]);
  });

  // ---------- getBlockCidSet ----------

  it("returns block CIDs as a Set", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://pds.example" });
    storage.trackBlocks(DID_A, ["cid1", "cid2", "cid3"]);

    const cidSet = storage.getBlockCidSet(DID_A);
    expect(cidSet).toBeInstanceOf(Set);
    expect(cidSet.size).toBe(3);
    expect(cidSet.has("cid1")).toBe(true);
    expect(cidSet.has("cid4")).toBe(false);
  });

  // ---------- needs_gc ----------

  it("sets and clears needs_gc flag", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://pds.example" });

    expect(storage.getDidsNeedingGc()).toEqual([]);

    storage.setNeedsGc(DID_A);
    expect(storage.getDidsNeedingGc()).toEqual([DID_A]);

    storage.clearNeedsGc(DID_A);
    expect(storage.getDidsNeedingGc()).toEqual([]);
  });

  it("getDidsNeedingGc returns multiple flagged DIDs", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://a.example" });
    storage.upsertState({ did: DID_B, pdsEndpoint: "https://b.example" });

    storage.setNeedsGc(DID_A);
    storage.setNeedsGc(DID_B);

    const dids = storage.getDidsNeedingGc();
    expect(dids).toHaveLength(2);
    expect(dids).toContain(DID_A);
    expect(dids).toContain(DID_B);
  });

  // ---------- markTombstoned ----------

  it("marks a DID as tombstoned", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://pds.example" });

    storage.markTombstoned(DID_A);

    const state = storage.getState(DID_A);
    expect(state?.status).toBe("tombstoned");
  });

  // ---------- purgeDidData ----------

  it("purges all data for a DID and returns removed CIDs", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://pds.example" });
    storage.trackBlocks(DID_A, ["cid1", "cid2"]);
    storage.trackBlobs(DID_A, ["blob1"]);
    storage.trackRecordPaths(DID_A, ["app.bsky.feed.post/abc"]);

    const result = storage.purgeDidData(DID_A);

    expect(result.blocksRemoved).toHaveLength(2);
    expect(result.blocksRemoved).toContain("cid1");
    expect(result.blocksRemoved).toContain("cid2");
    expect(result.blobsRemoved).toEqual(["blob1"]);

    // All tracking data should be gone
    expect(storage.getState(DID_A)).toBeNull();
    expect(storage.getBlockCids(DID_A)).toEqual([]);
    expect(storage.getBlobCids(DID_A)).toEqual([]);
    expect(storage.getRecordPaths(DID_A)).toEqual([]);
  });

  it("purgeDidData does not affect other DIDs", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://a.example" });
    storage.upsertState({ did: DID_B, pdsEndpoint: "https://b.example" });
    storage.trackBlocks(DID_A, ["cid1"]);
    storage.trackBlocks(DID_B, ["cid1", "cid2"]);

    storage.purgeDidData(DID_A);

    expect(storage.getState(DID_B)).not.toBeNull();
    expect(storage.getBlockCids(DID_B)).toHaveLength(2);
  });

  // ---------- needsGc in SyncState ----------

  it("SyncState includes needsGc field", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://pds.example" });

    let state = storage.getState(DID_A);
    expect(state?.needsGc).toBe(false);

    storage.setNeedsGc(DID_A);
    state = storage.getState(DID_A);
    expect(state?.needsGc).toBe(true);
  });

  // ---------- tombstoned status ----------

  it("tombstoned status persists in SyncState", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://pds.example" });
    storage.markTombstoned(DID_A);

    const state = storage.getState(DID_A);
    expect(state?.status).toBe("tombstoned");
  });

  it("tombstoned DIDs appear in getAllStates", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://a.example" });
    storage.upsertState({ did: DID_B, pdsEndpoint: "https://b.example" });
    storage.markTombstoned(DID_A);

    const states = storage.getAllStates();
    expect(states).toHaveLength(2);
    const tombstoned = states.find((s) => s.did === DID_A);
    expect(tombstoned?.status).toBe("tombstoned");
  });

  // ---------- Cross-DID safety ----------

  it("block GC respects cross-DID sharing", () => {
    storage.upsertState({ did: DID_A, pdsEndpoint: "https://a.example" });
    storage.upsertState({ did: DID_B, pdsEndpoint: "https://b.example" });

    // Both DIDs share cid1
    storage.trackBlocks(DID_A, ["cid1", "cid2"]);
    storage.trackBlocks(DID_B, ["cid1", "cid3"]);

    // Remove cid1 from DID_A only
    storage.removeBlocks(DID_A, ["cid1"]);

    // cid1 is NOT orphaned (still referenced by DID_B)
    expect(storage.findOrphanedCids(["cid1"])).toEqual([]);

    // Now remove cid1 from DID_B too
    storage.removeBlocks(DID_B, ["cid1"]);

    // Now cid1 IS orphaned
    expect(storage.findOrphanedCids(["cid1"])).toEqual(["cid1"]);
  });
});

// ============================================
// FirehoseSubscription #account events
// ============================================

describe("FirehoseSubscription #account events", () => {
  type ServerSocket = InstanceType<typeof import("ws").WebSocket>;

  let server: ReturnType<typeof import("node:http").createServer>;
  let wss: InstanceType<typeof import("ws").WebSocketServer>;
  let port: number;
  let subscription: FirehoseSubscription;
  let connectedWs: ServerSocket | null;

  /**
   * Wait for the subscription under test to connect to the local mock relay
   * and return the server-side socket for it.
   *
   * @throws Error if no client connects within WAIT_TIMEOUT_MS, so a broken
   *   connection path fails fast with a clear message instead of spinning
   *   until the runner's own test timeout fires.
   */
  async function waitForConnection(): Promise<ServerSocket> {
    await waitUntil(() => connectedWs !== null);
    if (connectedWs === null) {
      throw new Error(
        `firehose client did not connect within ${WAIT_TIMEOUT_MS}ms`,
      );
    }
    return connectedWs;
  }

  beforeEach(async () => {
    connectedWs = null;

    const { createServer: createHttpServer } = await import("node:http");
    const { WebSocketServer: WSS } = await import("ws");

    server = createHttpServer();
    wss = new WSS({ server });

    wss.on("connection", (ws) => {
      connectedWs = ws;
    });

    // Port 0 lets the OS pick a free port so parallel test files don't collide.
    await new Promise<void>((resolve) => {
      server.listen(0, "127.0.0.1", () => resolve());
    });
    const addr = server.address();
    port = typeof addr === "object" && addr ? addr.port : 0;

    subscription = new FirehoseSubscription({
      firehoseUrl: `ws://127.0.0.1:${port}/xrpc/com.atproto.sync.subscribeRepos`,
    });
  });

  afterEach(async () => {
    subscription.stop();
    wss.close();
    server.close();
    // Small delay for cleanup
    await sleep(TEARDOWN_DELAY_MS);
  });

  it("dispatches #account events for tracked DIDs", async () => {
    const events: FirehoseAccountEvent[] = [];
    subscription.onAccount((event) => {
      events.push(event);
    });

    subscription.start(new Set([DID_A]));
    const ws = await waitForConnection();

    // Send an account deactivation event
    ws.send(makeAccountFrame(1, DID_A, false, "deleted"));

    // Wait for processing
    await waitUntil(() => events.length >= 1);

    expect(events).toHaveLength(1);
    expect(events[0]!.did).toBe(DID_A);
    expect(events[0]!.active).toBe(false);
    expect(events[0]!.status).toBe("deleted");
  });

  it("ignores #account events for untracked DIDs", async () => {
    const events: FirehoseAccountEvent[] = [];
    subscription.onAccount((event) => {
      events.push(event);
    });

    subscription.start(new Set([DID_A]));
    const ws = await waitForConnection();

    // Send event for untracked DID
    ws.send(makeAccountFrame(1, DID_B, false, "deleted"));

    // There is no condition to poll for when asserting that nothing arrives,
    // so give the frame a fixed window in which it could have been dispatched.
    await sleep(SETTLE_MS);

    expect(events).toHaveLength(0);
  });

  it("updates cursor for #account events", async () => {
    subscription.onAccount(() => {});
    subscription.start(new Set([DID_A]));
    const ws = await waitForConnection();

    ws.send(makeAccountFrame(42, DID_A, false, "takendown"));

    await waitUntil(() => subscription.getCursor() === 42);

    expect(subscription.getCursor()).toBe(42);
  });

  it("dispatches re-activation events", async () => {
    const events: FirehoseAccountEvent[] = [];
    subscription.onAccount((event) => {
      events.push(event);
    });

    subscription.start(new Set([DID_A]));
    const ws = await waitForConnection();

    // Deactivate then reactivate
    ws.send(makeAccountFrame(1, DID_A, false, "deactivated"));
    ws.send(makeAccountFrame(2, DID_A, true));

    await waitUntil(() => events.length >= 2);

    expect(events).toHaveLength(2);
    expect(events[0]!.active).toBe(false);
    expect(events[1]!.active).toBe(true);
  });
});