atproto user agency toolkit for individuals and groups
1import type Database from "better-sqlite3";
2import {
3 Repo,
4 WriteOpAction,
5 BlockMap,
6 blocksToCarFile,
7 readCarWithRoot,
8 getRecords,
9 type RecordCreateOp,
10 type RecordUpdateOp,
11 type RecordDeleteOp,
12 type RecordWriteOp,
13} from "@atproto/repo";
/** Loosely-typed record payload: an object with string keys and values of unknown type. */
type RepoRecord = Record<string, unknown>;
15import { Secp256k1Keypair } from "@atproto/crypto";
16import { CID, asCid, isBlobRef } from "@atproto/lex-data";
17import { now as tidNow } from "@atcute/tid";
18import { encode as cborEncode } from "./cbor-compat.js";
19import { SqliteRepoStorage } from "./storage.js";
20import {
21 Sequencer,
22 type SeqEvent,
23 type SeqCommitEvent,
24 type SeqIdentityEvent,
25 type CommitData,
26} from "./sequencer.js";
27import { BlobStore, type BlobRef } from "./blobs.js";
28import { jsonToLex, type JsonValue } from "@atproto/lex-json";
29import type { Config } from "./config.js";
30import type { BlockStore, NetworkService } from "./ipfs.js";
31
/**
 * RepoManager - manages a single user's AT Protocol repository.
 *
 * This is the Node.js equivalent of Cirrus's AccountDurableObject,
 * converted from a Cloudflare Durable Object to a plain class.
 *
 * The underlying `Repo` and signing keypair are loaded lazily on first
 * access (see `getRepo` / `getKeypair`). Every write method replaces the
 * in-memory `Repo` with the updated instance, sequences a commit event and
 * hands it to `onFirehoseEvent`.
 */
export class RepoManager {
  storage: SqliteRepoStorage;
  private repo: Repo | null = null;
  private keypair: Secp256k1Keypair | null = null;
  sequencer: Sequencer;
  blobStore: BlobStore | null = null;
  blockStore: BlockStore | null = null;
  networkService: NetworkService | null = null;
  private repoInitialized = false;
  /** In-flight lazy initialization, shared by concurrent callers. */
  private initPromise: Promise<void> | null = null;

  /** Callback invoked when a firehose event is produced */
  onFirehoseEvent?: (event: SeqEvent) => void;

  /**
   * @param db - SQLite handle shared by the repo storage, the sequencer and
   *   the raw block/firehose queries issued by this class.
   * @param config - Must provide `DID` and `SIGNING_KEY` before the repo is
   *   first accessed.
   */
  constructor(
    private db: Database.Database,
    private config: Config,
  ) {
    this.storage = new SqliteRepoStorage(db);
    this.sequencer = new Sequencer(db);
  }

  /**
   * Initialize storage schema and optionally attach the blob store, block
   * store and network service. Arguments that are omitted leave the
   * corresponding field unchanged.
   */
  init(
    blobStore?: BlobStore,
    blockStore?: BlockStore,
    networkService?: NetworkService,
  ): void {
    this.storage.initSchema(true);
    if (blobStore) {
      this.blobStore = blobStore;
    }
    if (blockStore) {
      this.blockStore = blockStore;
    }
    if (networkService) {
      this.networkService = networkService;
    }
  }

  /**
   * Initialize the Repo instance. Called lazily on first repo access.
   *
   * Concurrent callers share a single in-flight initialization so that two
   * overlapping first accesses cannot both create a fresh repository. A
   * failed attempt is not cached; the next call retries.
   *
   * @throws Error if `DID` or `SIGNING_KEY` is not configured.
   */
  private async ensureRepoInitialized(): Promise<void> {
    if (this.repoInitialized) return;

    if (!this.initPromise) {
      this.initPromise = this.loadOrCreateRepo().finally(() => {
        this.initPromise = null;
      });
    }
    await this.initPromise;
  }

  /**
   * Import the signing key, then load the repo from the stored root or
   * create a new one when storage has no root yet.
   */
  private async loadOrCreateRepo(): Promise<void> {
    const did = this.config.DID;
    const signingKey = this.config.SIGNING_KEY;
    if (!did || !signingKey) {
      throw new Error("RepoManager requires DID and SIGNING_KEY to be configured");
    }

    const keypair = await Secp256k1Keypair.import(signingKey);
    this.keypair = keypair;

    const root = await this.storage.getRoot();
    this.repo = root
      ? await Repo.load(this.storage, root)
      : await Repo.create(this.storage, did, keypair);

    this.repoInitialized = true;
  }

  /** Return the repo, initializing it first if necessary. */
  async getRepo(): Promise<Repo> {
    await this.ensureRepoInitialized();
    return this.repo!;
  }

  /** Return the signing keypair, initializing the repo first if necessary. */
  async getKeypair(): Promise<Secp256k1Keypair> {
    await this.ensureRepoInitialized();
    return this.keypair!;
  }

  /**
   * Guard used by all write methods.
   *
   * @throws Error (message prefixed `AccountDeactivated:`) when storage
   *   reports the account as inactive.
   */
  async ensureActive(): Promise<void> {
    const isActive = await this.storage.getActive();
    if (!isActive) {
      throw new Error(
        "AccountDeactivated: Account is deactivated. Call activateAccount to enable writes.",
      );
    }
  }

  /**
   * Get new blocks for the current revision from the database.
   *
   * Reads every row of the `blocks` table tagged with `rev` and returns them
   * as a BlockMap keyed by parsed CID.
   */
  private getNewBlocksForRev(rev: string): BlockMap {
    const newBlocks = new BlockMap();
    const rows = this.db
      .prepare("SELECT cid, bytes FROM blocks WHERE rev = ?")
      .all(rev) as Array<{ cid: string; bytes: Buffer }>;

    for (const row of rows) {
      newBlocks.set(CID.parse(row.cid), new Uint8Array(row.bytes));
    }
    return newBlocks;
  }

  /**
   * Sequence a commit and broadcast to firehose listeners.
   *
   * Must be called after `this.repo` has been replaced with the post-write
   * repo: the commit CID and rev are read from it.
   *
   * @param prevRev - Revision of the repo before the write (the `since`).
   * @param ops - The write ops that produced the commit; create/update ops
   *   carry the resulting record CID.
   */
  private async sequenceAndBroadcast(
    prevRev: string,
    ops: Array<RecordWriteOp & { cid?: CID | null }>,
  ): Promise<void> {
    const repo = this.repo!;
    const newBlocks = this.getNewBlocksForRev(repo.commit.rev);

    const commitData: CommitData = {
      did: repo.did,
      commit: repo.cid,
      rev: repo.commit.rev,
      since: prevRev,
      newBlocks,
      ops,
    };

    const event = await this.sequencer.sequenceCommit(commitData);
    this.onFirehoseEvent?.(event);

    // Fire-and-forget: push new blocks to IPFS (never blocks commit path).
    // Failures are deliberately swallowed; the commit is already durable.
    if (this.blockStore) {
      const store = this.blockStore;
      const net = this.networkService;
      store
        .putBlocks(newBlocks)
        .then(() => {
          if (!net) return;
          // Reads BlockMap's internal string-keyed map to get CID strings
          // without re-encoding; tolerates the field being absent.
          const cidStrs: string[] = [];
          const map = (
            newBlocks as unknown as {
              map: Map<string, Uint8Array>;
            }
          ).map;
          if (map) {
            for (const cidStr of map.keys()) {
              cidStrs.push(cidStr);
            }
          }
          return net.provideBlocks(cidStrs).then(() => {
            net
              .publishCommitNotification(
                commitData.did,
                commitData.commit.toString(),
                commitData.rev,
              )
              .catch(() => {});
          });
        })
        .catch(() => {});
    }
  }

  // ============================================
  // Repo Operations
  // ============================================

  /**
   * Describe the repo: its DID, head commit CID and known collections.
   *
   * If storage has a root but no collection entries yet, the collection
   * list is first backfilled by walking every record once.
   */
  async describeRepo(): Promise<{
    did: string;
    collections: string[];
    cid: string;
  }> {
    const repo = await this.getRepo();

    if (!this.storage.hasCollections() && (await this.storage.getRoot())) {
      const seen = new Set<string>();
      for await (const record of repo.walkRecords()) {
        if (!seen.has(record.collection)) {
          seen.add(record.collection);
          this.storage.addCollection(record.collection);
        }
      }
    }

    return {
      did: repo.did,
      collections: this.storage.getCollections(),
      cid: repo.cid.toString(),
    };
  }

  /**
   * Fetch a single record.
   *
   * @returns The record CID and its JSON-serializable value, or `null` when
   *   the key is not present in the repo.
   */
  async getRecord(
    collection: string,
    rkey: string,
  ): Promise<{ cid: string; record: unknown } | null> {
    const repo = await this.getRepo();

    const dataKey = `${collection}/${rkey}`;
    const recordCid = await repo.data.get(dataKey);
    if (!recordCid) return null;

    const record = await repo.getRecord(collection, rkey);
    if (!record) return null;

    return {
      cid: recordCid.toString(),
      record: serializeRecord(record),
    };
  }

  /**
   * List one page of records from a collection.
   *
   * Records are walked in repo key order starting at `opts.cursor` (or the
   * beginning of the collection). The returned `cursor` is the data key
   * (`collection/rkey`) of the last record of the page in walk order and is
   * only set when at least one more record exists.
   *
   * @param opts.limit - Maximum number of records to return.
   * @param opts.cursor - Cursor returned by a previous call.
   * @param opts.reverse - Reverse the order of the records within the page;
   *   pagination still advances in walk order.
   */
  async listRecords(
    collection: string,
    opts: { limit: number; cursor?: string; reverse?: boolean },
  ): Promise<{
    records: Array<{ uri: string; cid: string; value: unknown }>;
    cursor?: string;
  }> {
    const repo = await this.getRepo();
    const page: Array<{ uri: string; cid: string; value: unknown }> = [];
    const startFrom = opts.cursor || `${collection}/`;

    let enteredCollection = false;
    let lastKey: string | undefined;
    let hasMore = false;

    for await (const record of repo.walkRecords(startFrom)) {
      if (record.collection !== collection) {
        // Keys are ordered, so leaving the collection means we are done.
        if (enteredCollection) break;
        continue;
      }
      enteredCollection = true;

      const dataKey = `${record.collection}/${record.rkey}`;
      // The cursor names the last record already handed out. Skip it so it
      // is not repeated when the walk start is inclusive.
      if (opts.cursor && dataKey === opts.cursor) continue;

      // One record beyond the limit only proves there is another page.
      if (page.length >= opts.limit) {
        hasMore = true;
        break;
      }

      page.push({
        uri: `at://${repo.did}/${record.collection}/${record.rkey}`,
        cid: record.cid.toString(),
        value: serializeRecord(record.record),
      });
      lastKey = dataKey;
    }

    // Cursor is taken in walk order, before any reversal of the page.
    const cursor = hasMore ? (lastKey ?? startFrom) : undefined;

    if (opts.reverse) {
      page.reverse();
    }

    return { records: page, cursor };
  }

  /**
   * Create a record and commit it.
   *
   * @param rkey - Record key; a fresh TID is generated when omitted or empty.
   * @param record - JSON value of the record; converted with `jsonToLex`.
   * @throws Error if the account is deactivated or the record cannot be
   *   found in the repo after the write.
   */
  async createRecord(
    collection: string,
    rkey: string | undefined,
    record: unknown,
  ): Promise<{
    uri: string;
    cid: string;
    commit: { cid: string; rev: string };
  }> {
    await this.ensureActive();
    const repo = await this.getRepo();
    const keypair = await this.getKeypair();

    const actualRkey = rkey || tidNow();
    const createOp: RecordCreateOp = {
      action: WriteOpAction.Create,
      collection,
      rkey: actualRkey,
      record: jsonToLex(record as JsonValue) as RepoRecord,
    };

    const prevRev = repo.commit.rev;
    const updatedRepo = await repo.applyWrites([createOp], keypair);
    this.repo = updatedRepo;

    const dataKey = `${collection}/${actualRkey}`;
    const recordCid = await updatedRepo.data.get(dataKey);
    if (!recordCid) {
      throw new Error(`Failed to create record: ${collection}/${actualRkey}`);
    }

    this.storage.addCollection(collection);

    const opWithCid = { ...createOp, cid: recordCid };
    await this.sequenceAndBroadcast(prevRev, [opWithCid]);

    return {
      uri: `at://${updatedRepo.did}/${collection}/${actualRkey}`,
      cid: recordCid.toString(),
      commit: {
        cid: updatedRepo.cid.toString(),
        rev: updatedRepo.commit.rev,
      },
    };
  }

  /**
   * Delete a record and commit the deletion.
   *
   * @returns The new commit, or `null` when the record does not exist (in
   *   which case no commit is made).
   * @throws Error if the account is deactivated.
   */
  async deleteRecord(
    collection: string,
    rkey: string,
  ): Promise<{ commit: { cid: string; rev: string } } | null> {
    await this.ensureActive();
    const repo = await this.getRepo();
    const keypair = await this.getKeypair();

    const existing = await repo.getRecord(collection, rkey);
    if (!existing) return null;

    const deleteOp: RecordDeleteOp = {
      action: WriteOpAction.Delete,
      collection,
      rkey,
    };

    const prevRev = repo.commit.rev;
    const updatedRepo = await repo.applyWrites([deleteOp], keypair);
    this.repo = updatedRepo;

    await this.sequenceAndBroadcast(prevRev, [deleteOp]);

    return {
      commit: {
        cid: updatedRepo.cid.toString(),
        rev: updatedRepo.commit.rev,
      },
    };
  }

  /**
   * Create or update the record at `collection/rkey` and commit it.
   *
   * An update op is issued when a record already exists at the key,
   * otherwise a create op.
   *
   * @throws Error if the account is deactivated or the record cannot be
   *   found in the repo after the write.
   */
  async putRecord(
    collection: string,
    rkey: string,
    record: unknown,
  ): Promise<{
    uri: string;
    cid: string;
    commit: { cid: string; rev: string };
    validationStatus: string;
  }> {
    await this.ensureActive();
    const repo = await this.getRepo();
    const keypair = await this.getKeypair();

    const existing = await repo.getRecord(collection, rkey);
    const isUpdate = existing !== null;

    const normalizedRecord = jsonToLex(record as JsonValue) as RepoRecord;
    const op: RecordWriteOp = isUpdate
      ? ({
          action: WriteOpAction.Update,
          collection,
          rkey,
          record: normalizedRecord,
        } as RecordUpdateOp)
      : ({
          action: WriteOpAction.Create,
          collection,
          rkey,
          record: normalizedRecord,
        } as RecordCreateOp);

    const prevRev = repo.commit.rev;
    const updatedRepo = await repo.applyWrites([op], keypair);
    this.repo = updatedRepo;

    const dataKey = `${collection}/${rkey}`;
    const recordCid = await updatedRepo.data.get(dataKey);
    if (!recordCid) {
      throw new Error(`Failed to put record: ${collection}/${rkey}`);
    }

    this.storage.addCollection(collection);

    const opWithCid = { ...op, cid: recordCid };
    await this.sequenceAndBroadcast(prevRev, [opWithCid]);

    return {
      uri: `at://${updatedRepo.did}/${collection}/${rkey}`,
      cid: recordCid.toString(),
      commit: {
        cid: updatedRepo.cid.toString(),
        rev: updatedRepo.commit.rev,
      },
      validationStatus: "valid",
    };
  }

  /**
   * Apply a batch of create/update/delete writes as a single commit.
   *
   * Each write is identified by its `$type`
   * (`com.atproto.repo.applyWrites#create|#update|#delete`). Creates get a
   * generated TID when no `rkey` is given; updates and deletes require one.
   *
   * @returns The commit plus one result per write, in input order.
   * @throws Error if the account is deactivated, an update/delete lacks an
   *   rkey, or a write has an unknown `$type`.
   */
  async applyWrites(
    writes: Array<{
      $type: string;
      collection: string;
      rkey?: string;
      value?: unknown;
    }>,
  ): Promise<{
    commit: { cid: string; rev: string };
    results: Array<{
      $type: string;
      uri?: string;
      cid?: string;
      validationStatus?: string;
    }>;
  }> {
    await this.ensureActive();
    const repo = await this.getRepo();
    const keypair = await this.getKeypair();

    const ops: RecordWriteOp[] = [];
    const results: Array<{
      $type: string;
      collection: string;
      rkey: string;
      action: WriteOpAction;
    }> = [];

    // Translate every write into a repo op before touching the repo, so an
    // invalid entry rejects the whole batch without a partial commit.
    for (const write of writes) {
      if (write.$type === "com.atproto.repo.applyWrites#create") {
        const rkey = write.rkey || tidNow();
        const op: RecordCreateOp = {
          action: WriteOpAction.Create,
          collection: write.collection,
          rkey,
          record: jsonToLex(write.value as JsonValue) as RepoRecord,
        };
        ops.push(op);
        results.push({
          $type: "com.atproto.repo.applyWrites#createResult",
          collection: write.collection,
          rkey,
          action: WriteOpAction.Create,
        });
      } else if (write.$type === "com.atproto.repo.applyWrites#update") {
        if (!write.rkey) throw new Error("Update requires rkey");
        const op: RecordUpdateOp = {
          action: WriteOpAction.Update,
          collection: write.collection,
          rkey: write.rkey,
          record: jsonToLex(write.value as JsonValue) as RepoRecord,
        };
        ops.push(op);
        results.push({
          $type: "com.atproto.repo.applyWrites#updateResult",
          collection: write.collection,
          rkey: write.rkey,
          action: WriteOpAction.Update,
        });
      } else if (write.$type === "com.atproto.repo.applyWrites#delete") {
        if (!write.rkey) throw new Error("Delete requires rkey");
        const op: RecordDeleteOp = {
          action: WriteOpAction.Delete,
          collection: write.collection,
          rkey: write.rkey,
        };
        ops.push(op);
        results.push({
          $type: "com.atproto.repo.applyWrites#deleteResult",
          collection: write.collection,
          rkey: write.rkey,
          action: WriteOpAction.Delete,
        });
      } else {
        throw new Error(`Unknown write type: ${write.$type}`);
      }
    }

    const prevRev = repo.commit.rev;
    const updatedRepo = await repo.applyWrites(ops, keypair);
    this.repo = updatedRepo;

    for (const op of ops) {
      if (op.action !== WriteOpAction.Delete) {
        this.storage.addCollection(op.collection);
      }
    }

    const finalResults: Array<{
      $type: string;
      uri?: string;
      cid?: string;
      validationStatus?: string;
    }> = [];
    const opsWithCids: Array<RecordWriteOp & { cid?: CID | null }> = [];

    // `results` and `ops` were filled in lockstep, so index i pairs them.
    for (let i = 0; i < results.length; i++) {
      const result = results[i]!;
      const op = ops[i]!;

      if (result.action === WriteOpAction.Delete) {
        finalResults.push({ $type: result.$type });
        opsWithCids.push(op);
      } else {
        const dataKey = `${result.collection}/${result.rkey}`;
        const recordCid = await updatedRepo.data.get(dataKey);
        finalResults.push({
          $type: result.$type,
          uri: `at://${updatedRepo.did}/${result.collection}/${result.rkey}`,
          cid: recordCid?.toString(),
          validationStatus: "valid",
        });
        opsWithCids.push({ ...op, cid: recordCid });
      }
    }

    await this.sequenceAndBroadcast(prevRev, opsWithCids);

    return {
      commit: {
        cid: updatedRepo.cid.toString(),
        rev: updatedRepo.commit.rev,
      },
      results: finalResults,
    };
  }

  // ============================================
  // Repo Export/Import
  // ============================================

  /** Return the repo DID, head commit CID and current revision. */
  async getRepoStatus(): Promise<{
    did: string;
    head: string;
    rev: string;
  }> {
    const repo = await this.getRepo();
    return {
      did: repo.did,
      head: repo.cid.toString(),
      rev: repo.commit.rev,
    };
  }

  /**
   * Export every stored block as a CAR file rooted at the current root.
   *
   * @throws Error if storage has no root.
   */
  async getRepoCar(): Promise<Uint8Array> {
    const root = await this.storage.getRoot();
    if (!root) throw new Error("No repository root found");

    const rows = this.db
      .prepare("SELECT cid, bytes FROM blocks")
      .all() as Array<{ cid: string; bytes: Buffer }>;

    const blocks = new BlockMap();
    for (const row of rows) {
      blocks.set(CID.parse(row.cid), new Uint8Array(row.bytes));
    }

    return blocksToCarFile(root, blocks);
  }

  /**
   * Export the requested blocks as a CAR file rooted at the current root.
   * CIDs that are not found in storage are silently omitted.
   *
   * @throws Error if storage has no root.
   */
  async getBlocks(cids: string[]): Promise<Uint8Array> {
    const root = await this.storage.getRoot();
    if (!root) throw new Error("No repository root found");

    const blocks = new BlockMap();
    for (const cidStr of cids) {
      const cid = CID.parse(cidStr);
      const bytes = await this.storage.getBytes(cid);
      if (bytes) {
        blocks.set(cid, bytes);
      }
    }

    return blocksToCarFile(root, blocks);
  }

  /**
   * Collect the output of `getRecords` for a single record into one buffer.
   * Presumably this is the CAR-encoded proof for the record; confirm against
   * the `@atproto/repo` documentation.
   *
   * @throws Error if storage has no root.
   */
  async getRecordProof(
    collection: string,
    rkey: string,
  ): Promise<Uint8Array> {
    const root = await this.storage.getRoot();
    if (!root) throw new Error("No repository root found");

    const carChunks: Uint8Array[] = [];
    for await (const chunk of getRecords(this.storage, root, [
      { collection, rkey },
    ])) {
      carChunks.push(chunk);
    }

    // Concatenate the streamed chunks into a single contiguous buffer.
    const totalLength = carChunks.reduce((acc, chunk) => acc + chunk.length, 0);
    const result = new Uint8Array(totalLength);
    let offset = 0;
    for (const chunk of carChunks) {
      result.set(chunk, offset);
      offset += chunk.length;
    }

    return result;
  }

  /**
   * Import a repository from CAR bytes.
   *
   * Allowed only when there is no existing repo, or the account is
   * deactivated (in which case the existing storage is replaced). All
   * checks that do not need storage (configuration, signing key, CAR
   * parsing) run before any existing data is destroyed, and the new root is
   * persisted only after the imported repo's DID matches the configured one.
   *
   * @throws Error if an active repo already exists, if `DID`/`SIGNING_KEY`
   *   are not configured, or if the CAR's DID differs from the configured
   *   DID (imported data is destroyed in that case).
   */
  async importRepo(carBytes: Uint8Array): Promise<{
    did: string;
    rev: string;
    cid: string;
  }> {
    const isActive = await this.storage.getActive();
    const existingRoot = await this.storage.getRoot();

    if (isActive && existingRoot) {
      throw new Error(
        "Repository already exists. Cannot import over existing repository.",
      );
    }

    const expectedDid = this.config.DID;
    const signingKey = this.config.SIGNING_KEY;
    if (!expectedDid || !signingKey) {
      throw new Error("RepoManager requires DID and SIGNING_KEY for import");
    }

    // Do everything that can fail without side effects first, so a bad key
    // or malformed CAR cannot wipe the existing repository.
    const keypair = await Secp256k1Keypair.import(signingKey);
    const { root: rootCid, blocks } = await readCarWithRoot(carBytes);

    if (existingRoot) {
      await this.storage.destroy();
      this.repo = null;
      this.repoInitialized = false;
    }

    const importRev = tidNow();
    await this.storage.putMany(blocks, importRev);

    const imported = await Repo.load(this.storage, rootCid);

    if (imported.did !== expectedDid) {
      await this.storage.destroy();
      // Do not keep a handle to the rejected foreign repo.
      this.repo = null;
      this.repoInitialized = false;
      throw new Error(
        `DID mismatch: CAR file contains DID ${imported.did}, but expected ${expectedDid}`,
      );
    }

    await this.storage.updateRoot(rootCid, imported.commit.rev);

    this.keypair = keypair;
    this.repo = imported;
    this.repoInitialized = true;

    // Index collections and blob references of the imported records.
    const seenCollections = new Set<string>();
    for await (const record of imported.walkRecords()) {
      if (!seenCollections.has(record.collection)) {
        seenCollections.add(record.collection);
        this.storage.addCollection(record.collection);
      }
      const blobCids = extractBlobCids(record.record);
      if (blobCids.length > 0) {
        const uri = `at://${imported.did}/${record.collection}/${record.rkey}`;
        this.storage.addRecordBlobs(uri, blobCids);
      }
    }

    return {
      did: imported.did,
      rev: imported.commit.rev,
      cid: rootCid.toString(),
    };
  }

  // ============================================
  // Blob Operations
  // ============================================

  /**
   * Store a blob and record it as imported.
   *
   * @throws Error if no blob store is configured or the blob exceeds the
   *   60 MiB size limit.
   */
  async uploadBlob(bytes: Uint8Array, mimeType: string): Promise<BlobRef> {
    if (!this.blobStore) {
      throw new Error("Blob storage not configured");
    }

    const MAX_BLOB_SIZE = 60 * 1024 * 1024;
    if (bytes.length > MAX_BLOB_SIZE) {
      throw new Error(
        `Blob too large: ${bytes.length} bytes (max ${MAX_BLOB_SIZE})`,
      );
    }

    const blobRef = await this.blobStore.putBlob(bytes, mimeType);
    this.storage.trackImportedBlob(blobRef.ref.$link, bytes.length, mimeType);

    // Fire-and-forget: push blob to IPFS
    if (this.blockStore) {
      this.blockStore
        .putBlock(blobRef.ref.$link, bytes)
        .catch(() => {});
    }

    return blobRef;
  }

  // ============================================
  // Preferences
  // ============================================

  /** Return the stored preferences wrapped in a `{ preferences }` object. */
  async getPreferences(): Promise<{ preferences: unknown[] }> {
    const preferences = await this.storage.getPreferences();
    return { preferences };
  }

  /** Hand the given preferences to storage. */
  async putPreferences(preferences: unknown[]): Promise<void> {
    await this.storage.putPreferences(preferences);
  }

  // ============================================
  // Account State
  // ============================================

  /** Return the stored account email, or `null` when none is set. */
  async getEmail(): Promise<{ email: string | null }> {
    return { email: this.storage.getEmail() };
  }

  /** Store the account email. */
  async updateEmail(email: string): Promise<void> {
    this.storage.setEmail(email);
  }

  /** Whether the account is currently active. */
  async getActive(): Promise<boolean> {
    return this.storage.getActive();
  }

  /** Mark the account active, enabling writes. */
  async activateAccount(): Promise<void> {
    await this.storage.setActive(true);
  }

  /** Mark the account inactive; write methods will then reject. */
  async deactivateAccount(): Promise<void> {
    await this.storage.setActive(false);
  }

  // ============================================
  // Migration Progress
  // ============================================

  /** Number of blocks in storage. */
  async countBlocks(): Promise<number> {
    return this.storage.countBlocks();
  }

  /** Number of records in the repo, counted by walking every record. */
  async countRecords(): Promise<number> {
    const repo = await this.getRepo();
    let count = 0;
    for await (const _record of repo.walkRecords()) {
      count++;
    }
    return count;
  }

  /** Number of blobs storage expects (as reported by storage). */
  async countExpectedBlobs(): Promise<number> {
    return this.storage.countExpectedBlobs();
  }

  /** Number of blobs tracked as imported (as reported by storage). */
  async countImportedBlobs(): Promise<number> {
    return this.storage.countImportedBlobs();
  }

  /** Page through blobs that storage reports as missing. */
  async listMissingBlobs(
    limit: number = 500,
    cursor?: string,
  ): Promise<{
    blobs: Array<{ cid: string; recordUri: string }>;
    cursor?: string;
  }> {
    return this.storage.listMissingBlobs(limit, cursor);
  }

  /**
   * Wipe imported repo data and blob tracking so a migration can restart.
   *
   * @returns The block and imported-blob counts taken before the wipe.
   * @throws Error (message prefixed `AccountActive:`) when the account is
   *   still active.
   */
  async resetMigration(): Promise<{
    blocksDeleted: number;
    blobsCleared: number;
  }> {
    const isActive = await this.storage.getActive();
    if (isActive) {
      throw new Error(
        "AccountActive: Cannot reset migration on an active account. Deactivate first.",
      );
    }

    const blocksDeleted = await this.storage.countBlocks();
    const blobsCleared = this.storage.countImportedBlobs();

    await this.storage.destroy();
    this.storage.clearBlobTracking();

    // Force a fresh load/create on the next repo access.
    this.repo = null;
    this.repoInitialized = false;

    return { blocksDeleted, blobsCleared };
  }

  // ============================================
  // Identity Events
  // ============================================

  /**
   * Allocate a firehose sequence number for an identity event and deliver
   * the event to `onFirehoseEvent`.
   *
   * The row inserted into `firehose_events` carries an empty payload; it is
   * used here only to obtain the next sequence number.
   */
  async emitIdentityEvent(handle: string): Promise<{ seq: number }> {
    const time = new Date().toISOString();

    const result = this.db
      .prepare(
        `INSERT INTO firehose_events (event_type, payload)
         VALUES ('identity', ?)`,
      )
      .run(Buffer.alloc(0));
    const seq = Number(result.lastInsertRowid);

    const event: SeqIdentityEvent = {
      seq,
      type: "identity",
      event: {
        seq,
        did: this.config.DID ?? "",
        time,
        handle,
      },
      time,
    };

    this.onFirehoseEvent?.(event);

    return { seq };
  }

  // ============================================
  // Health
  // ============================================

  /** Run a trivial query against the database; throws if the query fails. */
  healthCheck(): { ok: true } {
    this.db.prepare("SELECT 1").get();
    return { ok: true };
  }

  /**
   * Report the subscriber count passed in by the caller together with the
   * latest sequence number (`null` when the sequencer reports a falsy value).
   */
  getFirehoseStatus(subscriberCount: number): {
    subscribers: number;
    latestSeq: number | null;
  } {
    const seq = this.sequencer.getLatestSeq();
    return {
      subscribers: subscriberCount,
      latestSeq: seq || null,
    };
  }
}
856
857// ============================================
858// Utility Functions
859// ============================================
860
/**
 * Produce a JSON-friendly copy of a record value.
 *
 * CIDs become `{ $link: "<cid>" }`, byte arrays become
 * `{ $bytes: "<base64>" }`, arrays and plain objects are converted
 * recursively, and every other value (including null/undefined) is
 * returned unchanged.
 */
function serializeRecord(obj: unknown): unknown {
  if (obj === null || obj === undefined) {
    return obj;
  }

  const link = asCid(obj);
  if (link) {
    return { $link: link.toString() };
  }

  if (obj instanceof Uint8Array) {
    // btoa expects a "binary string": one character per byte.
    const binary = Array.from(obj, (byte) => String.fromCharCode(byte)).join("");
    return { $bytes: btoa(binary) };
  }

  if (Array.isArray(obj)) {
    return obj.map((item) => serializeRecord(item));
  }

  if (typeof obj !== "object") {
    return obj;
  }

  const converted: Record<string, unknown> = {};
  Object.entries(obj).forEach(([field, fieldValue]) => {
    converted[field] = serializeRecord(fieldValue);
  });
  return converted;
}
894
/**
 * Collect the CID strings of all blob references found anywhere inside a
 * record value.
 *
 * The value is searched depth-first: a blob reference contributes its
 * `ref` as a string and is not descended into; arrays and objects are
 * searched element by element; anything else contributes nothing.
 *
 * @returns CID strings in encounter order (duplicates are kept).
 */
export function extractBlobCids(obj: unknown): string[] {
  if (obj === null || obj === undefined) {
    return [];
  }

  if (isBlobRef(obj)) {
    return [obj.ref.toString()];
  }

  if (Array.isArray(obj)) {
    return obj.flatMap((item) => extractBlobCids(item));
  }

  if (typeof obj === "object") {
    return Object.values(obj as Record<string, unknown>).flatMap((child) =>
      extractBlobCids(child),
    );
  }

  return [];
}