atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Extract BlockStore and NetworkService interfaces for transport-agnostic architecture

Splits IpfsService into two narrow interfaces (BlockStore for storage, NetworkService
for P2P networking) so consumers depend only on what they need. Eases future transport
migrations (e.g. Iroh, Hyperswarm) without touching storage or verification code.

+86 -41
+8 -8
src/index.ts
··· 4 4 import type { RepoManager } from "./repo-manager.js"; 5 5 import type { Firehose } from "./firehose.js"; 6 6 import type { Config } from "./config.js"; 7 - import type { IpfsService } from "./ipfs.js"; 7 + import type { BlockStore, NetworkService } from "./ipfs.js"; 8 8 import type { BlobStore } from "./blobs.js"; 9 9 import type { ReplicationManager } from "./replication/replication-manager.js"; 10 10 import * as sync from "./xrpc/sync.js"; ··· 21 21 config: Config, 22 22 repoManager: RepoManager, 23 23 firehose: Firehose, 24 - ipfsService?: IpfsService, 24 + blockStore?: BlockStore, 25 + networkService?: NetworkService, 25 26 blobStore?: BlobStore, 26 27 replicationManager?: ReplicationManager, 27 28 ) { ··· 94 95 let bytes: Uint8Array | null = null; 95 96 96 97 // 1. Try IPFS blockstore (has all repo blocks + blobs) 97 - if (ipfsService) { 98 - bytes = await ipfsService.getBlock(cidStr); 98 + if (blockStore) { 99 + bytes = await blockStore.getBlock(cidStr); 99 100 } 100 101 101 102 // 2. Fallback: SQLite blocks table ··· 141 142 status: "ok", 142 143 version: VERSION, 143 144 }; 144 - if (ipfsService) { 145 + if (blockStore) { 145 146 health.ipfs = { 146 147 enabled: true, 147 - running: ipfsService.isRunning(), 148 - peerId: ipfsService.getPeerId(), 149 - connections: ipfsService.getConnectionCount(), 148 + peerId: networkService?.getPeerId() ?? null, 149 + connections: networkService?.getConnectionCount() ?? 0, 150 150 }; 151 151 } 152 152 return c.json(health);
+1 -1
src/ipfs.test.ts
··· 223 223 224 224 blobStore = new BlobStore(tmpDir, config.DID); 225 225 226 - app = createApp(config, repoManager, firehose, ipfsService, blobStore); 226 + app = createApp(config, repoManager, firehose, ipfsService, ipfsService, blobStore); 227 227 }); 228 228 229 229 afterEach(async () => {
+29 -1
src/ipfs.ts
··· 4 4 import type { Helia } from "@helia/interface"; 5 5 import type { BlockMap } from "@atproto/repo"; 6 6 7 + /** 8 + * Pure storage: put, get, has blocks by CID string. 9 + * No networking, no peer identity — just content-addressed bytes. 10 + */ 11 + export interface BlockStore { 12 + putBlock(cidStr: string, bytes: Uint8Array): Promise<void>; 13 + getBlock(cidStr: string): Promise<Uint8Array | null>; 14 + hasBlock(cidStr: string): Promise<boolean>; 15 + putBlocks(blocks: BlockMap): Promise<void>; 16 + } 17 + 18 + /** 19 + * P2P networking: content routing, peer identity, connectivity. 20 + * Separated from storage so transports can be swapped independently. 21 + */ 22 + export interface NetworkService { 23 + provideBlocks(cidStrs: string[]): Promise<void>; 24 + publishCommitNotification( 25 + did: string, 26 + commitCid: string, 27 + rev: string, 28 + ): Promise<void>; 29 + getPeerId(): string | null; 30 + getMultiaddrs(): string[]; 31 + getConnectionCount(): number; 32 + } 33 + 7 34 export interface IpfsConfig { 8 35 blocksPath: string; 9 36 datastorePath: string; ··· 13 40 /** 14 41 * IpfsService encapsulates all Helia/IPFS functionality. 15 42 * 43 + * Implements both BlockStore (storage) and NetworkService (P2P networking). 16 44 * When networking is enabled, a full Helia node is created (libp2p + bitswap + DHT). 17 45 * When networking is disabled, only a local FsBlockstore is used (for testing). 18 46 * 19 47 * All methods no-op gracefully if the service hasn't started yet. 20 48 * String-based CID interface at the boundary — decouples from @atproto's CID class. 21 49 */ 22 - export class IpfsService { 50 + export class IpfsService implements BlockStore, NetworkService { 23 51 private helia: Helia | null = null; 24 52 private blockstore: FsBlockstore | null = null; 25 53 private config: IpfsConfig;
+9 -8
src/replication/replication-manager.ts
··· 6 6 import type Database from "better-sqlite3"; 7 7 import type { Config } from "../config.js"; 8 8 import type { RepoManager } from "../repo-manager.js"; 9 - import type { IpfsService } from "../ipfs.js"; 9 + import type { BlockStore, NetworkService } from "../ipfs.js"; 10 10 import type { DidResolver } from "../did-resolver.js"; 11 11 import { readCarWithRoot } from "@atproto/repo"; 12 12 ··· 46 46 db: Database.Database, 47 47 private config: Config, 48 48 private repoManager: RepoManager, 49 - private ipfsService: IpfsService, 49 + private blockStore: BlockStore, 50 + private networkService: NetworkService, 50 51 private didResolver: DidResolver, 51 52 verificationConfig?: Partial<VerificationConfig>, 52 53 ) { 53 54 this.syncStorage = new SyncStorage(db); 54 55 this.repoFetcher = new RepoFetcher(didResolver); 55 56 this.peerDiscovery = new PeerDiscovery(this.repoFetcher); 56 - this.verifier = new BlockVerifier(ipfsService); 57 + this.verifier = new BlockVerifier(blockStore); 57 58 this.verificationConfig = { 58 59 ...DEFAULT_VERIFICATION_CONFIG, 59 60 ...verificationConfig, 60 61 }; 61 62 this.remoteVerifier = new RemoteVerifier( 62 - ipfsService, 63 + blockStore, 63 64 this.verificationConfig, 64 65 ); 65 66 } ··· 78 79 * No-op if networking is disabled (getPeerId returns null). 79 80 */ 80 81 async publishPeerIdentity(): Promise<void> { 81 - const peerId = this.ipfsService.getPeerId(); 82 + const peerId = this.networkService.getPeerId(); 82 83 if (!peerId) return; // networking disabled 83 84 84 85 const record: PeerIdentityRecord = { 85 86 $type: PEER_NSID, 86 87 peerId, 87 - multiaddrs: this.ipfsService.getMultiaddrs(), 88 + multiaddrs: this.networkService.getMultiaddrs(), 88 89 createdAt: new Date().toISOString(), 89 90 }; 90 91 ··· 199 200 // 4. Parse CAR and store blocks 200 201 const { root, blocks } = await readCarWithRoot(carBytes); 201 202 202 - await this.ipfsService.putBlocks(blocks); 203 + await this.blockStore.putBlocks(blocks); 203 204 204 205 // 5. Collect CID strings for DHT announcement + verification 205 206 const cidStrs: string[] = []; ··· 218 219 } 219 220 220 221 // 6. Announce to DHT (fire-and-forget) 221 - this.ipfsService.provideBlocks(cidStrs).catch(() => {}); 222 + this.networkService.provideBlocks(cidStrs).catch(() => {}); 222 223 223 224 // 7. Verify local block availability 224 225 const verification =
+3 -3
src/replication/replication.test.ts
··· 204 204 await ipfsService.start(); 205 205 206 206 repoManager = new RepoManager(db, config); 207 - repoManager.init(undefined, ipfsService); 207 + repoManager.init(undefined, ipfsService, ipfsService); 208 208 }); 209 209 210 210 afterEach(async () => { ··· 518 518 }); 519 519 await sourceIpfs.start(); 520 520 sourceRepo = new RepoManager(sourceDb, sourceConfig); 521 - sourceRepo.init(undefined, sourceIpfs); 521 + sourceRepo.init(undefined, sourceIpfs, sourceIpfs); 522 522 523 523 // Replica setup (different DID for the local node, but will replicate source's data) 524 524 const replicaConfig = testConfig(join(tmpDir, "replica"), [ ··· 535 535 }); 536 536 await replicaIpfs.start(); 537 537 replicaRepo = new RepoManager(replicaDb, replicaConfig); 538 - replicaRepo.init(undefined, replicaIpfs); 538 + replicaRepo.init(undefined, replicaIpfs, replicaIpfs); 539 539 }); 540 540 541 541 afterEach(async () => {
+11 -7
src/replication/verification.ts
··· 2 2 * Block verification: local spot-checks and layered remote verification. 3 3 */ 4 4 5 - import type { IpfsService } from "../ipfs.js"; 5 + import type { BlockStore } from "../ipfs.js"; 6 6 import { 7 7 type VerificationConfig, 8 8 type LayerResult, ··· 17 17 } 18 18 19 19 export class BlockVerifier { 20 - constructor(private ipfsService: IpfsService) {} 20 + private blockStore: BlockStore; 21 + 22 + constructor(blockStore: BlockStore) { 23 + this.blockStore = blockStore; 24 + } 21 25 22 26 /** 23 27 * Verify that a random sample of blocks are available in our blockstore. ··· 41 45 let available = 0; 42 46 43 47 for (const cid of toCheck) { 44 - const has = await this.ipfsService.hasBlock(cid); 48 + const has = await this.blockStore.hasBlock(cid); 45 49 if (has) { 46 50 available++; 47 51 } else { ··· 80 84 */ 81 85 export class RemoteVerifier { 82 86 private config: VerificationConfig; 83 - private ipfsService: IpfsService; 87 + private blockStore: BlockStore; 84 88 private fetchFn: typeof fetch; 85 89 86 90 constructor( 87 - ipfsService: IpfsService, 91 + blockStore: BlockStore, 88 92 config?: Partial<VerificationConfig>, 89 93 fetchFn?: typeof fetch, 90 94 ) { 91 - this.ipfsService = ipfsService; 95 + this.blockStore = blockStore; 92 96 this.config = { ...DEFAULT_VERIFICATION_CONFIG, ...config }; 93 97 this.fetchFn = fetchFn ?? fetch; 94 98 } ··· 207 211 const res = await this.fetchFn(url); 208 212 if (res.status === 200) { 209 213 const remoteBytes = new Uint8Array(await res.arrayBuffer()); 210 - const localBytes = await this.ipfsService.getBlock(cid); 214 + const localBytes = await this.blockStore.getBlock(cid); 211 215 if ( 212 216 localBytes && 213 217 Buffer.from(remoteBytes).equals(Buffer.from(localBytes))
+22 -11
src/repo-manager.ts
··· 27 27 import { BlobStore, type BlobRef } from "./blobs.js"; 28 28 import { jsonToLex, type JsonValue } from "@atproto/lex-json"; 29 29 import type { Config } from "./config.js"; 30 - import type { IpfsService } from "./ipfs.js"; 30 + import type { BlockStore, NetworkService } from "./ipfs.js"; 31 31 32 32 /** 33 33 * RepoManager - manages a single user's AT Protocol repository. ··· 41 41 private keypair: Secp256k1Keypair | null = null; 42 42 sequencer: Sequencer; 43 43 blobStore: BlobStore | null = null; 44 - ipfsService: IpfsService | null = null; 44 + blockStore: BlockStore | null = null; 45 + networkService: NetworkService | null = null; 45 46 private repoInitialized = false; 46 47 47 48 /** Callback invoked when a firehose event is produced */ ··· 58 59 /** 59 60 * Initialize storage schema and optionally the blob store. 60 61 */ 61 - init(blobStore?: BlobStore, ipfsService?: IpfsService): void { 62 + init( 63 + blobStore?: BlobStore, 64 + blockStore?: BlockStore, 65 + networkService?: NetworkService, 66 + ): void { 62 67 this.storage.initSchema(true); 63 68 if (blobStore) { 64 69 this.blobStore = blobStore; 65 70 } 66 - if (ipfsService) { 67 - this.ipfsService = ipfsService; 71 + if (blockStore) { 72 + this.blockStore = blockStore; 73 + } 74 + if (networkService) { 75 + this.networkService = networkService; 68 76 } 69 77 } 70 78 ··· 148 156 this.onFirehoseEvent?.(event); 149 157 150 158 // Fire-and-forget: push new blocks to IPFS (never blocks commit path) 151 - if (this.ipfsService) { 152 - const ipfs = this.ipfsService; 153 - ipfs.putBlocks(newBlocks) 159 + if (this.blockStore) { 160 + const store = this.blockStore; 161 + const net = this.networkService; 162 + store 163 + .putBlocks(newBlocks) 154 164 .then(() => { 165 + if (!net) return; 155 166 const cidStrs: string[] = []; 156 167 const map = ( 157 168 newBlocks as unknown as { ··· 163 174 cidStrs.push(cidStr); 164 175 } 165 176 } 166 - return ipfs.provideBlocks(cidStrs); 177 + return net.provideBlocks(cidStrs); 167 178 }) 168 179 .catch(() => {}); 169 180 } ··· 672 683 this.storage.trackImportedBlob(blobRef.ref.$link, bytes.length, mimeType); 673 684 674 685 // Fire-and-forget: push blob to IPFS 675 - if (this.ipfsService) { 676 - this.ipfsService 686 + if (this.blockStore) { 687 + this.blockStore 677 688 .putBlock(blobRef.ref.$link, bytes) 678 689 .catch(() => {}); 679 690 }
+3 -2
src/server.ts
··· 47 47 // Initialize repo manager 48 48 const repoManager = new RepoManager(db, config); 49 49 const blobStore = new BlobStore(dataDir, config.DID); 50 - repoManager.init(blobStore, ipfsService); 50 + repoManager.init(blobStore, ipfsService, ipfsService); 51 51 52 52 // Initialize firehose 53 53 const firehose = new Firehose(repoManager); ··· 65 65 config, 66 66 repoManager, 67 67 ipfsService, 68 + ipfsService, 68 69 didResolver, 69 70 ); 70 71 } 71 72 72 73 // Create Hono app 73 - const app = createApp(config, repoManager, firehose, ipfsService, blobStore, replicationManager); 74 + const app = createApp(config, repoManager, firehose, ipfsService, ipfsService, blobStore, replicationManager); 74 75 75 76 // Create HTTP server using @hono/node-server's request listener 76 77 const requestListener = getRequestListener(app.fetch);