atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 922 lines 23 kB view raw
import type Database from "better-sqlite3";
import {
	Repo,
	WriteOpAction,
	BlockMap,
	blocksToCarFile,
	readCarWithRoot,
	getRecords,
	type RecordCreateOp,
	type RecordUpdateOp,
	type RecordDeleteOp,
	type RecordWriteOp,
} from "@atproto/repo";
import { Secp256k1Keypair } from "@atproto/crypto";
import { CID, asCid, isBlobRef } from "@atproto/lex-data";
import { now as tidNow } from "@atcute/tid";
import { encode as cborEncode } from "./cbor-compat.js";
import { SqliteRepoStorage } from "./storage.js";
import {
	Sequencer,
	type SeqEvent,
	type SeqCommitEvent,
	type SeqIdentityEvent,
	type CommitData,
} from "./sequencer.js";
import { BlobStore, type BlobRef } from "./blobs.js";
import { jsonToLex, type JsonValue } from "@atproto/lex-json";
import type { Config } from "./config.js";
import type { BlockStore, NetworkService } from "./ipfs.js";

/** Shape of a record body as handed to the repo write operations. */
type RepoRecord = Record<string, unknown>;

/** One entry of a `listRecords` page. */
type ListedRecord = { uri: string; cid: string; value: unknown };

/**
 * RepoManager - manages a single user's AT Protocol repository.
 *
 * This is the Node.js equivalent of Cirrus's AccountDurableObject,
 * converted from a Cloudflare Durable Object to a plain class.
 *
 * The `Repo` instance and signing keypair are loaded lazily on first use
 * (see `ensureRepoInitialized`); everything else is backed by the SQLite
 * database passed to the constructor.
 */
export class RepoManager {
	/** SQLite-backed repo storage (blocks, root, collections, account state). */
	storage: SqliteRepoStorage;
	/** Lazily loaded repo; `null` until `ensureRepoInitialized` has run. */
	private repo: Repo | null = null;
	/** Lazily imported signing key; `null` until `ensureRepoInitialized` has run. */
	private keypair: Secp256k1Keypair | null = null;
	/** Assigns sequence numbers to commits for the firehose. */
	sequencer: Sequencer;
	/** Optional blob store, set via `init`. */
	blobStore: BlobStore | null = null;
	/** Optional IPFS block store, set via `init`. */
	blockStore: BlockStore | null = null;
	/** Optional network service used to announce blocks/commits, set via `init`. */
	networkService: NetworkService | null = null;
	/** True once `repo` and `keypair` are populated. */
	private repoInitialized = false;

	/** Callback invoked when a firehose event is produced */
	onFirehoseEvent?: (event: SeqEvent) => void;

	/**
	 * @param db - Open better-sqlite3 database used for storage and sequencing.
	 * @param config - Service configuration; `DID` and `SIGNING_KEY` are required
	 *   before any repo access.
	 */
	constructor(
		private db: Database.Database,
		private config: Config,
	) {
		this.storage = new SqliteRepoStorage(db);
		this.sequencer = new Sequencer(db);
	}

	/**
	 * Initialize storage schema and optionally the blob store.
	 *
	 * @param blobStore - Blob store to use for `uploadBlob`, if any.
	 * @param blockStore - Block store that receives new blocks/blobs, if any.
	 * @param networkService - Service used to announce new blocks and commits, if any.
	 */
	init(
		blobStore?: BlobStore,
		blockStore?: BlockStore,
		networkService?: NetworkService,
	): void {
		this.storage.initSchema(true);
		if (blobStore) {
			this.blobStore = blobStore;
		}
		if (blockStore) {
			this.blockStore = blockStore;
		}
		if (networkService) {
			this.networkService = networkService;
		}
	}

	/**
	 * Initialize the Repo instance. Called lazily on first repo access.
	 *
	 * Loads the repo from the stored root when one exists, otherwise creates a
	 * new repo for the configured DID.
	 *
	 * @throws Error if `DID` or `SIGNING_KEY` is missing from the config.
	 */
	private async ensureRepoInitialized(): Promise<void> {
		if (this.repoInitialized) return;

		if (!this.config.DID || !this.config.SIGNING_KEY) {
			throw new Error("RepoManager requires DID and SIGNING_KEY to be configured");
		}

		this.keypair = await Secp256k1Keypair.import(this.config.SIGNING_KEY);

		const root = await this.storage.getRoot();
		if (root) {
			this.repo = await Repo.load(this.storage, root);
		} else {
			this.repo = await Repo.create(
				this.storage,
				this.config.DID,
				this.keypair,
			);
		}

		this.repoInitialized = true;
	}

	/** Returns the repo, initializing it on first call. */
	async getRepo(): Promise<Repo> {
		await this.ensureRepoInitialized();
		return this.repo!;
	}

	/** Returns the signing keypair, initializing the repo on first call. */
	async getKeypair(): Promise<Secp256k1Keypair> {
		await this.ensureRepoInitialized();
		return this.keypair!;
	}

	/**
	 * Guard used by every write path.
	 *
	 * @throws Error (message prefixed `AccountDeactivated:`) when storage reports
	 *   the account as inactive.
	 */
	async ensureActive(): Promise<void> {
		const isActive = await this.storage.getActive();
		if (!isActive) {
			throw new Error(
				"AccountDeactivated: Account is deactivated. Call activateAccount to enable writes.",
			);
		}
	}

	/**
	 * Get new blocks for the current revision from the database.
	 *
	 * @param rev - Revision whose rows are selected from the `blocks` table.
	 */
	private getNewBlocksForRev(rev: string): BlockMap {
		const newBlocks = new BlockMap();
		const rows = this.db
			.prepare("SELECT cid, bytes FROM blocks WHERE rev = ?")
			.all(rev) as Array<{ cid: string; bytes: Buffer }>;

		for (const row of rows) {
			const cid = CID.parse(row.cid);
			const bytes = new Uint8Array(row.bytes);
			newBlocks.set(cid, bytes);
		}
		return newBlocks;
	}

	/**
	 * Sequence a commit and broadcast to firehose listeners.
	 *
	 * Must be called after `this.repo` has been replaced with the post-write
	 * repo: the commit CID and rev are read from it.
	 *
	 * @param prevRev - Revision the repo was at before the write (`since`).
	 * @param ops - Write operations of the commit, with record CIDs where known.
	 */
	private async sequenceAndBroadcast(
		prevRev: string,
		ops: Array<RecordWriteOp & { cid?: CID | null }>,
	): Promise<void> {
		const newBlocks = this.getNewBlocksForRev(this.repo!.commit.rev);

		const commitData: CommitData = {
			did: this.repo!.did,
			commit: this.repo!.cid,
			rev: this.repo!.commit.rev,
			since: prevRev,
			newBlocks,
			ops,
		};

		const event = await this.sequencer.sequenceCommit(commitData);
		this.onFirehoseEvent?.(event);

		// Fire-and-forget: push new blocks to IPFS (never blocks commit path)
		if (this.blockStore) {
			const store = this.blockStore;
			const net = this.networkService;
			store
				.putBlocks(newBlocks)
				.then(() => {
					if (!net) return;
					// Collect CID strings through BlockMap's public iteration API
					// instead of reaching into its private backing map.
					const cidStrs: string[] = [];
					newBlocks.forEach((_bytes, cid) => {
						cidStrs.push(cid.toString());
					});
					return net.provideBlocks(cidStrs).then(() => {
						// Notification is best-effort and deliberately not awaited.
						net.publishCommitNotification(
							commitData.did,
							commitData.commit.toString(),
							commitData.rev,
						).catch(() => {});
					});
				})
				// Deliberate best-effort: IPFS failures must not affect the commit.
				.catch(() => {});
		}
	}

	// ============================================
	// Repo Operations
	// ============================================

	/**
	 * Describe the repo: DID, known collections and current commit CID.
	 *
	 * If storage has no collections recorded but a root exists, the repo is
	 * walked once to backfill the collection list.
	 */
	async describeRepo(): Promise<{
		did: string;
		collections: string[];
		cid: string;
	}> {
		const repo = await this.getRepo();

		if (!this.storage.hasCollections() && (await this.storage.getRoot())) {
			const seen = new Set<string>();
			for await (const record of repo.walkRecords()) {
				if (!seen.has(record.collection)) {
					seen.add(record.collection);
					this.storage.addCollection(record.collection);
				}
			}
		}

		return {
			did: repo.did,
			collections: this.storage.getCollections(),
			cid: repo.cid.toString(),
		};
	}

	/**
	 * Fetch a single record.
	 *
	 * @returns The record CID and JSON-serializable value, or `null` when the
	 *   key is not present in the repo.
	 */
	async getRecord(
		collection: string,
		rkey: string,
	): Promise<{ cid: string; record: unknown } | null> {
		const repo = await this.getRepo();

		const dataKey = `${collection}/${rkey}`;
		const recordCid = await repo.data.get(dataKey);
		if (!recordCid) return null;

		const record = await repo.getRecord(collection, rkey);
		if (!record) return null;

		return {
			cid: recordCid.toString(),
			record: serializeRecord(record),
		};
	}

	/**
	 * List records of one collection, paginated in ascending key order.
	 *
	 * @param collection - Collection NSID to list.
	 * @param opts.limit - Maximum number of records in the page.
	 * @param opts.cursor - Cursor returned by a previous call
	 *   (`<collection>/<rkey>` of the last record of that page).
	 * @param opts.reverse - When true the records of the page are returned in
	 *   descending order. Pagination itself still advances in ascending key
	 *   order, so the cursor always refers to the highest key of the page.
	 * @returns The page and, when more records exist, the cursor for the next page.
	 */
	async listRecords(
		collection: string,
		opts: { limit: number; cursor?: string; reverse?: boolean },
	): Promise<{
		records: Array<{ uri: string; cid: string; value: unknown }>;
		cursor?: string;
	}> {
		const repo = await this.getRepo();
		const page: ListedRecord[] = [];
		const startFrom = opts.cursor || `${collection}/`;

		let enteredCollection = false;
		let hasMore = false;
		let lastRkey: string | undefined;

		for await (const record of repo.walkRecords(startFrom)) {
			if (record.collection !== collection) {
				// Keys are walked in order, so once we have been inside the
				// collection any other collection means we are past its end.
				if (enteredCollection) break;
				continue;
			}
			enteredCollection = true;

			// The cursor is the key of the last record already returned. The walk
			// may start at that key itself (NOTE(review): confirm inclusivity of
			// walkRecords), so skip it to avoid repeating it on the next page.
			if (opts.cursor && `${record.collection}/${record.rkey}` === opts.cursor) {
				continue;
			}

			// One record beyond the limit proves there is another page.
			if (page.length >= opts.limit) {
				hasMore = true;
				break;
			}

			page.push({
				uri: `at://${repo.did}/${record.collection}/${record.rkey}`,
				cid: record.cid.toString(),
				value: serializeRecord(record.record),
			});
			lastRkey = record.rkey;
		}

		// Derive the cursor from the forward walk BEFORE reversing so that it
		// always points at the last key consumed, regardless of display order.
		const cursor = hasMore ? `${collection}/${lastRkey ?? ""}` : undefined;

		if (opts.reverse) {
			page.reverse();
		}

		return { records: page, cursor };
	}

	/**
	 * Create a record and emit the resulting commit on the firehose.
	 *
	 * @param collection - Collection NSID.
	 * @param rkey - Record key; a fresh TID is generated when omitted or empty.
	 * @param record - JSON record value (converted with `jsonToLex`).
	 * @throws Error if the account is deactivated or the record cannot be found
	 *   after the write.
	 */
	async createRecord(
		collection: string,
		rkey: string | undefined,
		record: unknown,
	): Promise<{
		uri: string;
		cid: string;
		commit: { cid: string; rev: string };
	}> {
		await this.ensureActive();
		const repo = await this.getRepo();
		const keypair = await this.getKeypair();

		const actualRkey = rkey || tidNow();
		const createOp: RecordCreateOp = {
			action: WriteOpAction.Create,
			collection,
			rkey: actualRkey,
			record: jsonToLex(record as JsonValue) as RepoRecord,
		};

		const prevRev = repo.commit.rev;
		const updatedRepo = await repo.applyWrites([createOp], keypair);
		this.repo = updatedRepo;

		const dataKey = `${collection}/${actualRkey}`;
		const recordCid = await this.repo.data.get(dataKey);
		if (!recordCid) {
			throw new Error(`Failed to create record: ${collection}/${actualRkey}`);
		}

		this.storage.addCollection(collection);

		const opWithCid = { ...createOp, cid: recordCid };
		await this.sequenceAndBroadcast(prevRev, [opWithCid]);

		return {
			uri: `at://${this.repo.did}/${collection}/${actualRkey}`,
			cid: recordCid.toString(),
			commit: {
				cid: this.repo.cid.toString(),
				rev: this.repo.commit.rev,
			},
		};
	}

	/**
	 * Delete a record and emit the resulting commit on the firehose.
	 *
	 * @returns The new commit, or `null` when the record does not exist.
	 * @throws Error if the account is deactivated.
	 */
	async deleteRecord(
		collection: string,
		rkey: string,
	): Promise<{ commit: { cid: string; rev: string } } | null> {
		await this.ensureActive();
		const repo = await this.getRepo();
		const keypair = await this.getKeypair();

		const existing = await repo.getRecord(collection, rkey);
		if (!existing) return null;

		const deleteOp: RecordDeleteOp = {
			action: WriteOpAction.Delete,
			collection,
			rkey,
		};

		const prevRev = repo.commit.rev;
		const updatedRepo = await repo.applyWrites([deleteOp], keypair);
		this.repo = updatedRepo;

		await this.sequenceAndBroadcast(prevRev, [deleteOp]);

		return {
			commit: {
				cid: updatedRepo.cid.toString(),
				rev: updatedRepo.commit.rev,
			},
		};
	}

	/**
	 * Create or update the record at `collection/rkey`.
	 *
	 * An update op is used when a record already exists at the key, a create op
	 * otherwise.
	 *
	 * @throws Error if the account is deactivated or the record cannot be found
	 *   after the write.
	 */
	async putRecord(
		collection: string,
		rkey: string,
		record: unknown,
	): Promise<{
		uri: string;
		cid: string;
		commit: { cid: string; rev: string };
		validationStatus: string;
	}> {
		await this.ensureActive();
		const repo = await this.getRepo();
		const keypair = await this.getKeypair();

		const existing = await repo.getRecord(collection, rkey);
		const isUpdate = existing !== null;

		const normalizedRecord = jsonToLex(record as JsonValue) as RepoRecord;
		const op: RecordWriteOp = isUpdate
			? ({
					action: WriteOpAction.Update,
					collection,
					rkey,
					record: normalizedRecord,
				} as RecordUpdateOp)
			: ({
					action: WriteOpAction.Create,
					collection,
					rkey,
					record: normalizedRecord,
				} as RecordCreateOp);

		const prevRev = repo.commit.rev;
		const updatedRepo = await repo.applyWrites([op], keypair);
		this.repo = updatedRepo;

		const dataKey = `${collection}/${rkey}`;
		const recordCid = await this.repo.data.get(dataKey);
		if (!recordCid) {
			throw new Error(`Failed to put record: ${collection}/${rkey}`);
		}

		this.storage.addCollection(collection);

		const opWithCid = { ...op, cid: recordCid };
		await this.sequenceAndBroadcast(prevRev, [opWithCid]);

		return {
			uri: `at://${this.repo.did}/${collection}/${rkey}`,
			cid: recordCid.toString(),
			commit: {
				cid: this.repo.cid.toString(),
				rev: this.repo.commit.rev,
			},
			validationStatus: "valid",
		};
	}

	/**
	 * Apply a batch of create/update/delete writes as a single commit.
	 *
	 * @param writes - Writes tagged with `com.atproto.repo.applyWrites#create`,
	 *   `#update` or `#delete`. Creates without an rkey get a fresh TID.
	 * @returns The commit plus one result per write, in input order.
	 * @throws Error if the account is deactivated, a write has an unknown
	 *   `$type`, or an update/delete is missing its rkey.
	 */
	async applyWrites(
		writes: Array<{
			$type: string;
			collection: string;
			rkey?: string;
			value?: unknown;
		}>,
	): Promise<{
		commit: { cid: string; rev: string };
		results: Array<{
			$type: string;
			uri?: string;
			cid?: string;
			validationStatus?: string;
		}>;
	}> {
		await this.ensureActive();
		const repo = await this.getRepo();
		const keypair = await this.getKeypair();

		// `ops` and `results` are parallel arrays: results[i] describes ops[i].
		const ops: RecordWriteOp[] = [];
		const results: Array<{
			$type: string;
			collection: string;
			rkey: string;
			action: WriteOpAction;
		}> = [];

		for (const write of writes) {
			if (write.$type === "com.atproto.repo.applyWrites#create") {
				const rkey = write.rkey || tidNow();
				const op: RecordCreateOp = {
					action: WriteOpAction.Create,
					collection: write.collection,
					rkey,
					record: jsonToLex(write.value as JsonValue) as RepoRecord,
				};
				ops.push(op);
				results.push({
					$type: "com.atproto.repo.applyWrites#createResult",
					collection: write.collection,
					rkey,
					action: WriteOpAction.Create,
				});
			} else if (write.$type === "com.atproto.repo.applyWrites#update") {
				if (!write.rkey) throw new Error("Update requires rkey");
				const op: RecordUpdateOp = {
					action: WriteOpAction.Update,
					collection: write.collection,
					rkey: write.rkey,
					record: jsonToLex(write.value as JsonValue) as RepoRecord,
				};
				ops.push(op);
				results.push({
					$type: "com.atproto.repo.applyWrites#updateResult",
					collection: write.collection,
					rkey: write.rkey,
					action: WriteOpAction.Update,
				});
			} else if (write.$type === "com.atproto.repo.applyWrites#delete") {
				if (!write.rkey) throw new Error("Delete requires rkey");
				const op: RecordDeleteOp = {
					action: WriteOpAction.Delete,
					collection: write.collection,
					rkey: write.rkey,
				};
				ops.push(op);
				results.push({
					$type: "com.atproto.repo.applyWrites#deleteResult",
					collection: write.collection,
					rkey: write.rkey,
					action: WriteOpAction.Delete,
				});
			} else {
				throw new Error(`Unknown write type: ${write.$type}`);
			}
		}

		const prevRev = repo.commit.rev;
		const updatedRepo = await repo.applyWrites(ops, keypair);
		this.repo = updatedRepo;

		for (const op of ops) {
			if (op.action !== WriteOpAction.Delete) {
				this.storage.addCollection(op.collection);
			}
		}

		const finalResults: Array<{
			$type: string;
			uri?: string;
			cid?: string;
			validationStatus?: string;
		}> = [];
		const opsWithCids: Array<RecordWriteOp & { cid?: CID | null }> = [];

		for (let i = 0; i < results.length; i++) {
			const result = results[i]!;
			const op = ops[i]!;

			if (result.action === WriteOpAction.Delete) {
				finalResults.push({ $type: result.$type });
				opsWithCids.push(op);
			} else {
				// Look up the CID the record ended up with in the new repo state.
				const dataKey = `${result.collection}/${result.rkey}`;
				const recordCid = await this.repo.data.get(dataKey);
				finalResults.push({
					$type: result.$type,
					uri: `at://${this.repo.did}/${result.collection}/${result.rkey}`,
					cid: recordCid?.toString(),
					validationStatus: "valid",
				});
				opsWithCids.push({ ...op, cid: recordCid });
			}
		}

		await this.sequenceAndBroadcast(prevRev, opsWithCids);

		return {
			commit: {
				cid: this.repo.cid.toString(),
				rev: this.repo.commit.rev,
			},
			results: finalResults,
		};
	}

	// ============================================
	// Repo Export/Import
	// ============================================

	/** Returns the repo DID, head commit CID and current revision. */
	async getRepoStatus(): Promise<{
		did: string;
		head: string;
		rev: string;
	}> {
		const repo = await this.getRepo();
		return {
			did: repo.did,
			head: repo.cid.toString(),
			rev: repo.commit.rev,
		};
	}

	/**
	 * Export every row of the `blocks` table as a CAR file rooted at the
	 * stored root.
	 *
	 * @throws Error if no root is stored.
	 */
	async getRepoCar(): Promise<Uint8Array> {
		const root = await this.storage.getRoot();
		if (!root) throw new Error("No repository root found");

		const rows = this.db
			.prepare("SELECT cid, bytes FROM blocks")
			.all() as Array<{ cid: string; bytes: Buffer }>;

		const blocks = new BlockMap();
		for (const row of rows) {
			const cid = CID.parse(row.cid);
			const bytes = new Uint8Array(row.bytes);
			blocks.set(cid, bytes);
		}

		return blocksToCarFile(root, blocks);
	}

	/**
	 * Export the requested blocks as a CAR file rooted at the stored root.
	 * CIDs that are not present in storage are silently omitted.
	 *
	 * @throws Error if no root is stored.
	 */
	async getBlocks(cids: string[]): Promise<Uint8Array> {
		const root = await this.storage.getRoot();
		if (!root) throw new Error("No repository root found");

		const blocks = new BlockMap();
		for (const cidStr of cids) {
			const cid = CID.parse(cidStr);
			const bytes = await this.storage.getBytes(cid);
			if (bytes) {
				blocks.set(cid, bytes);
			}
		}

		return blocksToCarFile(root, blocks);
	}

	/**
	 * Build the CAR produced by `getRecords` for a single record and return it
	 * as one contiguous byte array.
	 *
	 * @throws Error if no root is stored.
	 */
	async getRecordProof(
		collection: string,
		rkey: string,
	): Promise<Uint8Array> {
		const root = await this.storage.getRoot();
		if (!root) throw new Error("No repository root found");

		const carChunks: Uint8Array[] = [];
		for await (const chunk of getRecords(this.storage, root, [
			{ collection, rkey },
		])) {
			carChunks.push(chunk);
		}

		// Concatenate the streamed chunks into a single buffer.
		const totalLength = carChunks.reduce((acc, chunk) => acc + chunk.length, 0);
		const result = new Uint8Array(totalLength);
		let offset = 0;
		for (const chunk of carChunks) {
			result.set(chunk, offset);
			offset += chunk.length;
		}

		return result;
	}

	/**
	 * Import a repository from a CAR file.
	 *
	 * Importing is refused while the account is active and already has a repo.
	 * For a deactivated account any existing repo data is replaced.
	 *
	 * All validation that does not need the blocks in storage (configuration,
	 * CAR parsing) happens before existing data is destroyed, so a bad request
	 * cannot wipe the current repo.
	 *
	 * @param carBytes - CAR file containing the repo to import.
	 * @returns DID, revision and root CID of the imported repo.
	 * @throws Error if a live repo exists, configuration is incomplete, or the
	 *   CAR's DID does not match the configured DID (imported data is removed
	 *   again in that case).
	 */
	async importRepo(carBytes: Uint8Array): Promise<{
		did: string;
		rev: string;
		cid: string;
	}> {
		const isActive = await this.storage.getActive();
		const existingRoot = await this.storage.getRoot();

		if (isActive && existingRoot) {
			throw new Error(
				"Repository already exists. Cannot import over existing repository.",
			);
		}

		// Validate configuration and parse the CAR before anything destructive.
		if (!this.config.DID || !this.config.SIGNING_KEY) {
			throw new Error("RepoManager requires DID and SIGNING_KEY for import");
		}

		const { root: rootCid, blocks } = await readCarWithRoot(carBytes);

		if (existingRoot) {
			await this.storage.destroy();
			this.repo = null;
			this.repoInitialized = false;
		}

		const importRev = tidNow();
		await this.storage.putMany(blocks, importRev);

		this.keypair = await Secp256k1Keypair.import(this.config.SIGNING_KEY);
		const importedRepo = await Repo.load(this.storage, rootCid);

		// Verify ownership before persisting the root or exposing the repo.
		if (importedRepo.did !== this.config.DID) {
			await this.storage.destroy();
			this.repo = null;
			this.repoInitialized = false;
			throw new Error(
				`DID mismatch: CAR file contains DID ${importedRepo.did}, but expected ${this.config.DID}`,
			);
		}

		await this.storage.updateRoot(rootCid, importedRepo.commit.rev);
		this.repo = importedRepo;
		this.repoInitialized = true;

		// Index collections and blob references of the imported records.
		const seenCollections = new Set<string>();
		for await (const record of importedRepo.walkRecords()) {
			if (!seenCollections.has(record.collection)) {
				seenCollections.add(record.collection);
				this.storage.addCollection(record.collection);
			}
			const blobCids = extractBlobCids(record.record);
			if (blobCids.length > 0) {
				const uri = `at://${importedRepo.did}/${record.collection}/${record.rkey}`;
				this.storage.addRecordBlobs(uri, blobCids);
			}
		}

		return {
			did: importedRepo.did,
			rev: importedRepo.commit.rev,
			cid: rootCid.toString(),
		};
	}

	// ============================================
	// Blob Operations
	// ============================================

	/**
	 * Store a blob and record it as imported.
	 *
	 * @param bytes - Blob content; at most 60 MiB.
	 * @param mimeType - MIME type stored alongside the blob.
	 * @throws Error if no blob store is configured or the blob is too large.
	 */
	async uploadBlob(bytes: Uint8Array, mimeType: string): Promise<BlobRef> {
		if (!this.blobStore) {
			throw new Error("Blob storage not configured");
		}

		const MAX_BLOB_SIZE = 60 * 1024 * 1024;
		if (bytes.length > MAX_BLOB_SIZE) {
			throw new Error(
				`Blob too large: ${bytes.length} bytes (max ${MAX_BLOB_SIZE})`,
			);
		}

		const blobRef = await this.blobStore.putBlob(bytes, mimeType);
		this.storage.trackImportedBlob(blobRef.ref.$link, bytes.length, mimeType);

		// Fire-and-forget: push blob to IPFS
		if (this.blockStore) {
			this.blockStore
				.putBlock(blobRef.ref.$link, bytes)
				.catch(() => {});
		}

		return blobRef;
	}

	// ============================================
	// Preferences
	// ============================================

	/** Returns the stored preferences wrapped in a `{ preferences }` object. */
	async getPreferences(): Promise<{ preferences: unknown[] }> {
		const preferences = await this.storage.getPreferences();
		return { preferences };
	}

	/** Replaces the stored preferences. */
	async putPreferences(preferences: unknown[]): Promise<void> {
		await this.storage.putPreferences(preferences);
	}

	// ============================================
	// Account State
	// ============================================

	/** Returns the stored email (or `null`) wrapped in an `{ email }` object. */
	async getEmail(): Promise<{ email: string | null }> {
		return { email: this.storage.getEmail() };
	}

	/** Stores a new email address. */
	async updateEmail(email: string): Promise<void> {
		this.storage.setEmail(email);
	}

	/** Returns whether storage marks the account as active. */
	async getActive(): Promise<boolean> {
		return this.storage.getActive();
	}

	/** Marks the account as active, enabling writes. */
	async activateAccount(): Promise<void> {
		await this.storage.setActive(true);
	}

	/** Marks the account as inactive; write methods will then throw. */
	async deactivateAccount(): Promise<void> {
		await this.storage.setActive(false);
	}

	// ============================================
	// Migration Progress
	// ============================================

	/** Number of blocks in storage. */
	async countBlocks(): Promise<number> {
		return this.storage.countBlocks();
	}

	/** Number of records in the repo, counted by walking every record. */
	async countRecords(): Promise<number> {
		const repo = await this.getRepo();
		let count = 0;
		for await (const _record of repo.walkRecords()) {
			count++;
		}
		return count;
	}

	/** Number of blobs storage expects (delegates to storage). */
	async countExpectedBlobs(): Promise<number> {
		return this.storage.countExpectedBlobs();
	}

	/** Number of blobs storage has tracked as imported (delegates to storage). */
	async countImportedBlobs(): Promise<number> {
		return this.storage.countImportedBlobs();
	}

	/**
	 * List blobs that storage reports as missing.
	 *
	 * @param limit - Page size (default 500).
	 * @param cursor - Cursor from a previous call.
	 */
	async listMissingBlobs(
		limit: number = 500,
		cursor?: string,
	): Promise<{
		blobs: Array<{ cid: string; recordUri: string }>;
		cursor?: string;
	}> {
		return this.storage.listMissingBlobs(limit, cursor);
	}

	/**
	 * Wipe repo storage and blob tracking of a deactivated account so a
	 * migration can be retried.
	 *
	 * @returns Counts taken before the wipe.
	 * @throws Error (message prefixed `AccountActive:`) if the account is active.
	 */
	async resetMigration(): Promise<{
		blocksDeleted: number;
		blobsCleared: number;
	}> {
		const isActive = await this.storage.getActive();
		if (isActive) {
			throw new Error(
				"AccountActive: Cannot reset migration on an active account. Deactivate first.",
			);
		}

		const blocksDeleted = await this.storage.countBlocks();
		const blobsCleared = this.storage.countImportedBlobs();

		await this.storage.destroy();
		this.storage.clearBlobTracking();

		// Force re-initialization on next repo access.
		this.repo = null;
		this.repoInitialized = false;

		return { blocksDeleted, blobsCleared };
	}

	// ============================================
	// Identity Events
	// ============================================

	/**
	 * Allocate a sequence number for an identity event and hand the event to
	 * `onFirehoseEvent`.
	 *
	 * The row is inserted with an empty payload; it only reserves the sequence
	 * number.
	 *
	 * @param handle - Handle to announce.
	 * @returns The sequence number assigned to the event.
	 */
	async emitIdentityEvent(handle: string): Promise<{ seq: number }> {
		const time = new Date().toISOString();

		const result = this.db
			.prepare(
				`INSERT INTO firehose_events (event_type, payload)
				 VALUES ('identity', ?)`,
			)
			.run(Buffer.alloc(0));
		const seq = Number(result.lastInsertRowid);

		const event: SeqIdentityEvent = {
			seq,
			type: "identity",
			event: {
				seq,
				did: this.config.DID ?? "",
				time,
				handle,
			},
			time,
		};

		this.onFirehoseEvent?.(event);

		return { seq };
	}

	// ============================================
	// Health
	// ============================================

	/** Runs `SELECT 1` against the database; throws if the query fails. */
	healthCheck(): { ok: true } {
		this.db.prepare("SELECT 1").get();
		return { ok: true };
	}

	/**
	 * Summarize firehose state.
	 *
	 * @param subscriberCount - Current number of subscribers, supplied by the caller.
	 * @returns The subscriber count and the latest sequence number
	 *   (`null` when the sequencer reports a falsy value such as 0).
	 */
	getFirehoseStatus(subscriberCount: number): {
		subscribers: number;
		latestSeq: number | null;
	} {
		const seq = this.sequencer.getLatestSeq();
		return {
			subscribers: subscriberCount,
			latestSeq: seq || null,
		};
	}
}

// ============================================
// Utility Functions
// ============================================

/**
 * Serialize a record for JSON by converting CID objects to { $link: "..." } format.
 *
 * Byte arrays become `{ $bytes: "<base64>" }`; arrays and plain objects are
 * converted recursively; everything else is returned unchanged.
 */
function serializeRecord(obj: unknown): unknown {
	if (obj === null || obj === undefined) return obj;

	// CIDs must be checked before the generic object branch below.
	const cid = asCid(obj);
	if (cid) {
		return { $link: cid.toString() };
	}

	if (obj instanceof Uint8Array) {
		// btoa expects a "binary string": one char per byte.
		let binary = "";
		for (let i = 0; i < obj.length; i++) {
			binary += String.fromCharCode(obj[i]!);
		}
		return { $bytes: btoa(binary) };
	}

	if (Array.isArray(obj)) {
		return obj.map(serializeRecord);
	}

	if (typeof obj === "object") {
		const result: Record<string, unknown> = {};
		for (const [key, value] of Object.entries(obj)) {
			result[key] = serializeRecord(value);
		}
		return result;
	}

	return obj;
}

/**
 * Extract blob CIDs from a record by recursively searching for blob references.
 *
 * @param obj - Record value (or any nested value) to search.
 * @returns CID strings of every blob reference found, in traversal order;
 *   duplicates are not removed.
 */
export function extractBlobCids(obj: unknown): string[] {
	const cids: string[] = [];

	function walk(value: unknown): void {
		if (value === null || value === undefined) return;

		// A blob ref is a leaf: record its CID and do not descend into it.
		if (isBlobRef(value)) {
			cids.push(value.ref.toString());
			return;
		}

		if (Array.isArray(value)) {
			for (const item of value) {
				walk(item);
			}
		} else if (typeof value === "object") {
			for (const key of Object.keys(value as Record<string, unknown>)) {
				walk((value as Record<string, unknown>)[key]);
			}
		}
	}

	walk(obj);
	return cids;
}