atproto user agency toolkit for individuals and groups
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add bidirectional replication plumbing: late-bind PdsClient, serve replicated data without RepoManager, logout disconnect

- ReplicationManager.setPdsClient() lazily creates OfferManager after OAuth login
- All 7 sync endpoints accept optional repoManager, registered unconditionally
- Repo read endpoints (describeRepo, getRecord, listRecords) serve replicated data without local repo
- listBlobs and sync.getRecord fall back to SyncStorage/ReplicatedRepoReader
- Logout with ?disconnect=true revokes offers/peer record, clears node_identity, unbinds DID
- OAuth callback wires PdsClient into ReplicationManager automatically
- Added typecheck npm script

+217 -125
+1
package.json
··· 8 8 "dev": "tsx watch src/server.ts", 9 9 "build": "tsc", 10 10 "start": "node dist/server.js", 11 + "typecheck": "tsc --noEmit", 11 12 "test": "vitest run", 12 13 "smoke-test": "bash scripts/smoke-test.sh" 13 14 },
+69 -71
src/index.ts
··· 76 76 // OAuth routes (before auth middleware — these handle browser redirect flow) 77 77 // ============================================ 78 78 if (oauthClientManager && pdsClientRef && db) { 79 - registerOAuthRoutes(app, config, oauthClientManager.client, pdsClientRef, db, networkService, oauthClientManager.sessionStore); 79 + registerOAuthRoutes(app, config, oauthClientManager.client, pdsClientRef, db, networkService, oauthClientManager.sessionStore, replicationManager); 80 80 } 81 81 82 82 // ============================================ ··· 306 306 // Sync endpoints (federation) 307 307 // ============================================ 308 308 309 - // Only register sync endpoints if we have a repo manager 310 - if (repoManager) { 311 - const rm = repoManager; 312 - app.get("/xrpc/com.atproto.sync.getRepo", (c) => 313 - sync.getRepo(c, rm, blockStore, replicationManager?.getSyncStorage()), 314 - ); 315 - app.get("/xrpc/com.atproto.sync.getRepoStatus", (c) => 316 - sync.getRepoStatus(c, rm, replicatedRepoReader), 317 - ); 318 - app.get("/xrpc/com.atproto.sync.getBlocks", (c) => 319 - sync.getBlocks(c, rm, blockStore, replicationManager?.getSyncStorage()), 320 - ); 321 - app.get("/xrpc/com.atproto.sync.getBlob", (c) => 322 - sync.getBlob(c, rm, blockStore, replicationManager?.getSyncStorage()), 323 - ); 324 - app.get("/xrpc/com.atproto.sync.listRepos", (c) => 325 - sync.listRepos(c, rm, replicatedRepoReader), 326 - ); 327 - app.get("/xrpc/com.atproto.sync.listBlobs", (c) => 328 - sync.listBlobs(c, rm), 329 - ); 330 - app.get("/xrpc/com.atproto.sync.getRecord", (c) => 331 - sync.getRecord(c, rm), 332 - ); 333 - } 309 + // Sync endpoints that can serve replicated data without a repo manager 310 + app.get("/xrpc/com.atproto.sync.getRepo", (c) => 311 + sync.getRepo(c, repoManager, blockStore, replicationManager?.getSyncStorage()), 312 + ); 313 + app.get("/xrpc/com.atproto.sync.getRepoStatus", (c) => 314 + sync.getRepoStatus(c, repoManager, replicatedRepoReader), 315 + ); 316 + app.get("/xrpc/com.atproto.sync.getBlocks", (c) => 317 + sync.getBlocks(c, repoManager, blockStore, replicationManager?.getSyncStorage()), 318 + ); 319 + app.get("/xrpc/com.atproto.sync.getBlob", (c) => 320 + sync.getBlob(c, repoManager, blockStore, replicationManager?.getSyncStorage()), 321 + ); 322 + app.get("/xrpc/com.atproto.sync.listRepos", (c) => 323 + sync.listRepos(c, repoManager, replicatedRepoReader), 324 + ); 325 + app.get("/xrpc/com.atproto.sync.listBlobs", (c) => 326 + sync.listBlobs(c, repoManager, replicationManager?.getSyncStorage()), 327 + ); 328 + app.get("/xrpc/com.atproto.sync.getRecord", (c) => 329 + sync.getRecord(c, repoManager, replicatedRepoReader), 330 + ); 334 331 335 332 // WebSocket firehose - handled via ws library upgrade, not Hono 336 333 // (see server.ts for WebSocket setup) ··· 338 335 // ============================================ 339 336 // Repository operations 340 337 // ============================================ 341 - if (repoManager) { 342 - const rm = repoManager; 343 338 344 - app.use("/xrpc/com.atproto.repo.describeRepo", async (c, next) => { 345 - const requestedRepo = c.req.query("repo"); 346 - if (!requestedRepo || requestedRepo === config.DID) { 347 - return repo.describeRepo(c, rm); 348 - } 349 - if ( 350 - replicatedRepoReader && 351 - requestedRepo && 352 - replicatedRepoReader.isReplicatedDid(requestedRepo) 353 - ) { 354 - return repo.describeRepoReplicated(c, replicatedRepoReader, requestedRepo); 355 - } 356 - await next(); 357 - }); 339 + // Read endpoints: serve local repo if available, fall back to replicated data 340 + app.use("/xrpc/com.atproto.repo.describeRepo", async (c, next) => { 341 + const requestedRepo = c.req.query("repo"); 342 + if (repoManager && (!requestedRepo || requestedRepo === config.DID)) { 343 + return repo.describeRepo(c, repoManager); 344 + } 345 + if ( 346 + replicatedRepoReader && 347 + requestedRepo && 348 + replicatedRepoReader.isReplicatedDid(requestedRepo) 349 + ) { 350 + return repo.describeRepoReplicated(c, replicatedRepoReader, requestedRepo); 351 + } 352 + await next(); 353 + }); 358 354 359 - app.use("/xrpc/com.atproto.repo.getRecord", async (c, next) => { 360 - const requestedRepo = c.req.query("repo"); 361 - if (!requestedRepo || requestedRepo === config.DID) { 362 - return repo.getRecord(c, rm); 363 - } 364 - if ( 365 - replicatedRepoReader && 366 - requestedRepo && 367 - replicatedRepoReader.isReplicatedDid(requestedRepo) 368 - ) { 369 - return repo.getRecordReplicated(c, replicatedRepoReader, requestedRepo); 370 - } 371 - await next(); 372 - }); 355 + app.use("/xrpc/com.atproto.repo.getRecord", async (c, next) => { 356 + const requestedRepo = c.req.query("repo"); 357 + if (repoManager && (!requestedRepo || requestedRepo === config.DID)) { 358 + return repo.getRecord(c, repoManager); 359 + } 360 + if ( 361 + replicatedRepoReader && 362 + requestedRepo && 363 + replicatedRepoReader.isReplicatedDid(requestedRepo) 364 + ) { 365 + return repo.getRecordReplicated(c, replicatedRepoReader, requestedRepo); 366 + } 367 + await next(); 368 + }); 373 369 374 - app.use("/xrpc/com.atproto.repo.listRecords", async (c, next) => { 375 - const requestedRepo = c.req.query("repo"); 376 - if (!requestedRepo || requestedRepo === config.DID) { 377 - return repo.listRecords(c, rm); 378 - } 379 - if ( 380 - replicatedRepoReader && 381 - requestedRepo && 382 - replicatedRepoReader.isReplicatedDid(requestedRepo) 383 - ) { 384 - return repo.listRecordsReplicated(c, replicatedRepoReader, requestedRepo); 385 - } 386 - await next(); 387 - }); 370 + app.use("/xrpc/com.atproto.repo.listRecords", async (c, next) => { 371 + const requestedRepo = c.req.query("repo"); 372 + if (repoManager && (!requestedRepo || requestedRepo === config.DID)) { 373 + return repo.listRecords(c, repoManager); 374 + } 375 + if ( 376 + replicatedRepoReader && 377 + requestedRepo && 378 + replicatedRepoReader.isReplicatedDid(requestedRepo) 379 + ) { 380 + return repo.listRecordsReplicated(c, replicatedRepoReader, requestedRepo); 381 + } 382 + await next(); 383 + }); 388 384 389 - // Write operations require authentication 385 + // Write operations require authentication + repo manager 386 + if (repoManager) { 387 + const rm = repoManager; 390 388 app.post("/xrpc/com.atproto.repo.createRecord", requireAuth, (c) => 391 389 repo.createRecord(c, rm), 392 390 );
+46
src/oauth/routes.ts
··· 12 12 import type { Config } from "../config.js"; 13 13 import { PdsClient } from "./pds-client.js"; 14 14 import type { NetworkService } from "../ipfs.js"; 15 + import type { ReplicationManager } from "../replication/replication-manager.js"; 15 16 16 17 /** 17 18 * Mutable holder for PdsClient — allows OAuth callback to create/update ··· 30 31 db: Database.Database, 31 32 networkService?: NetworkService, 32 33 sessionStore?: import("./stores.js").OAuthSessionStore, 34 + replicationManager?: ReplicationManager, 33 35 ): void { 34 36 /** 35 37 * Start OAuth login flow. ··· 109 111 // Create PdsClient if we don't have one yet 110 112 if (!pdsClientRef.current) { 111 113 pdsClientRef.current = new PdsClient(oauthClient, did); 114 + // Late-bind into ReplicationManager so OfferManager can be created 115 + if (replicationManager) { 116 + replicationManager.setPdsClient(pdsClientRef.current, did); 117 + } 112 118 } 113 119 114 120 // Publish peer record on successful auth ··· 177 183 178 184 /** 179 185 * Logout — clear the OAuth session. 186 + * With ?disconnect=true, also unbinds the node from its DID entirely: 187 + * removes node_identity, clears config.DID, and revokes published offers/peer records. 180 188 */ 181 189 app.post("/oauth/logout", async (c) => { 182 190 try { 191 + const disconnect = c.req.query("disconnect") === "true"; 183 192 const did = config.DID; 193 + 194 + // Best-effort: revoke published records before clearing the session 195 + if (disconnect && pdsClientRef.current && did) { 196 + try { 197 + await pdsClientRef.current.deleteRecord("org.p2pds.peer", "self"); 198 + } catch { 199 + // Non-fatal: peer record may not exist 200 + } 201 + 202 + // Revoke all published offers 203 + if (replicationManager) { 204 + const offerManager = replicationManager.getOfferManager(); 205 + if (offerManager) { 206 + try { 207 + const offers = await offerManager.getLocalOffers(); 208 + for (const offer of offers) { 209 + if (offer.subject) { 210 + await offerManager.revokeOffer(offer.subject); 211 + } 212 + } 213 + } catch { 214 + // Non-fatal 215 + } 216 + } 217 + } 218 + } 219 + 220 + // Clear OAuth session 184 221 if (did && sessionStore) { 185 222 await sessionStore.del(did); 186 223 } 187 224 pdsClientRef.current?.clearAgent(); 225 + pdsClientRef.current = undefined; 226 + 227 + // Full disconnect: unbind node from DID 228 + if (disconnect) { 229 + db.prepare("DELETE FROM node_identity").run(); 230 + config.DID = undefined; 231 + config.HANDLE = undefined; 232 + } 233 + 188 234 return c.redirect("/xrpc/org.p2pds.admin.dashboard"); 189 235 } catch (err) { 190 236 const message = err instanceof Error ? err.message : String(err);
+17
src/replication/replication-manager.ts
··· 130 130 } 131 131 132 132 /** 133 + * Late-bind a PdsClient (or any RecordWriter) after construction. 134 + * Called when OAuth login completes after the server has already started. 135 + * Creates the OfferManager if a PolicyEngine is present but OfferManager 136 + * wasn't created at construction time (because no RecordWriter was available). 137 + */ 138 + setPdsClient(client: RecordWriter, did: string): void { 139 + if (this.policyEngine && !this.offerManager) { 140 + this.offerManager = new OfferManager( 141 + client, 142 + this.peerDiscovery, 143 + this.policyEngine, 144 + did, 145 + ); 146 + } 147 + } 148 + 149 + /** 133 150 * Initialize replication: create tables, sync manifests. 134 151 */ 135 152 async init(): Promise<void> {
+84 -54
src/xrpc/sync.ts
··· 11 11 12 12 export async function getRepo( 13 13 c: Context<AppEnv>, 14 - repoManager: RepoManager, 14 + repoManager: RepoManager | undefined, 15 15 blockStore?: BlockStore, 16 16 syncStorage?: SyncStorage, 17 17 ): Promise<Response> { ··· 32 32 } 33 33 34 34 // Local DID: serve from RepoManager 35 - if (did === c.env.DID) { 35 + if (did === c.env.DID && repoManager) { 36 36 const carBytes = await repoManager.getRepoCar(); 37 37 return new Response(carBytes, { 38 38 status: 200, ··· 75 75 76 76 export async function getRepoStatus( 77 77 c: Context<AppEnv>, 78 - repoManager: RepoManager, 78 + repoManager: RepoManager | undefined, 79 79 replicatedRepoReader?: ReplicatedRepoReader, 80 80 ): Promise<Response> { 81 81 const did = c.req.query("did"); ··· 94 94 ); 95 95 } 96 96 97 - if (did === c.env.DID) { 97 + if (did === c.env.DID && repoManager) { 98 98 const data = await repoManager.getRepoStatus(); 99 99 return c.json({ 100 100 did: data.did, ··· 125 125 126 126 export async function listRepos( 127 127 c: Context<AppEnv>, 128 - repoManager: RepoManager, 128 + repoManager: RepoManager | undefined, 129 129 replicatedRepoReader?: ReplicatedRepoReader, 130 130 ): Promise<Response> { 131 - const data = await repoManager.getRepoStatus(); 132 - 133 131 const repos: Array<{ 134 132 did: string; 135 133 head?: string; 136 134 rev: string | null; 137 135 active: boolean; 138 - }> = [ 139 - { 136 + }> = []; 137 + 138 + // Include local repo if available 139 + if (repoManager) { 140 + const data = await repoManager.getRepoStatus(); 141 + repos.push({ 140 142 did: data.did, 141 143 head: data.head, 142 144 rev: data.rev, 143 145 active: true, 144 - }, 145 - ]; 146 + }); 147 + } 146 148 147 149 // Include replicated repos if reader is available 148 150 if (replicatedRepoReader) { ··· 161 163 162 164 export async function listBlobs( 163 165 c: Context<AppEnv>, 164 - repoManager: RepoManager, 166 + repoManager: RepoManager | undefined, 167 + syncStorage?: SyncStorage, 165 168 ): Promise<Response> { 166 169 const did = c.req.query("did"); 167 170 ··· 179 182 ); 180 183 } 181 184 182 - if (did !== c.env.DID) { 183 - return c.json( 184 - { error: "RepoNotFound", message: `Repository not found for DID: ${did}` }, 185 - 404, 186 - ); 187 - } 185 + // Local DID: serve from RepoManager's blob store 186 + if (did === c.env.DID && repoManager) { 187 + if (!repoManager.blobStore) { 188 + return c.json({ cids: [] }); 189 + } 190 + 191 + const cursor = c.req.query("cursor"); 192 + const limit = Math.min(Number(c.req.query("limit")) || 500, 1000); 188 193 189 - if (!repoManager.blobStore) { 190 - return c.json({ cids: [] }); 191 - } 194 + const result = repoManager.blobStore.listBlobs(limit, cursor || undefined); 192 195 193 - const cursor = c.req.query("cursor"); 194 - const limit = Math.min(Number(c.req.query("limit")) || 500, 1000); 196 + const response: { cids: string[]; cursor?: string } = { cids: result.cids }; 197 + if (result.cursor) { 198 + response.cursor = result.cursor; 199 + } 195 200 196 - const result = repoManager.blobStore.listBlobs(limit, cursor || undefined); 201 + return c.json(response); 202 + } 197 203 198 - const response: { cids: string[]; cursor?: string } = { cids: result.cids }; 199 - if (result.cursor) { 200 - response.cursor = result.cursor; 204 + // Replicated DID: serve from SyncStorage blob tracking 205 + if (syncStorage) { 206 + const blobCids = syncStorage.getBlobCids(did); 207 + if (blobCids.length > 0 || syncStorage.getState(did)) { 208 + return c.json({ cids: blobCids }); 209 + } 201 210 } 202 211 203 - return c.json(response); 212 + return c.json( 213 + { error: "RepoNotFound", message: `Repository not found for DID: ${did}` }, 214 + 404, 215 + ); 204 216 } 205 217 206 218 export async function getBlocks( 207 219 c: Context<AppEnv>, 208 - repoManager: RepoManager, 220 + repoManager: RepoManager | undefined, 209 221 blockStore?: BlockStore, 210 222 syncStorage?: SyncStorage, 211 223 ): Promise<Response> { ··· 234 246 } 235 247 236 248 // Local DID: serve from RepoManager 237 - if (did === c.env.DID) { 249 + if (did === c.env.DID && repoManager) { 238 250 const carBytes = await repoManager.getBlocks(cidsParam); 239 251 return new Response(carBytes, { 240 252 status: 200, ··· 280 292 281 293 export async function getBlob( 282 294 c: Context<AppEnv>, 283 - repoManager: RepoManager, 295 + repoManager: RepoManager | undefined, 284 296 blockStore?: BlockStore, 285 297 syncStorage?: SyncStorage, 286 298 ): Promise<Response> { ··· 302 314 } 303 315 304 316 // Local DID: serve from filesystem BlobStore 305 - if (did === c.env.DID) { 317 + if (did === c.env.DID && repoManager) { 306 318 if (!repoManager.blobStore) { 307 319 return c.json( 308 320 { error: "ServiceUnavailable", message: "Blob storage is not configured" }, ··· 358 370 359 371 export async function getRecord( 360 372 c: Context<AppEnv>, 361 - repoManager: RepoManager, 373 + repoManager: RepoManager | undefined, 374 + replicatedRepoReader?: ReplicatedRepoReader, 362 375 ): Promise<Response> { 363 376 const did = c.req.query("did"); 364 377 const collection = c.req.query("collection"); ··· 406 419 ); 407 420 } 408 421 409 - if (did !== c.env.DID) { 410 - return c.json( 411 - { error: "RepoNotFound", message: `Repository not found for DID: ${did}` }, 412 - 404, 413 - ); 422 + // Local DID: serve record proof from RepoManager 423 + if (did === c.env.DID && repoManager) { 424 + try { 425 + const carBytes = await repoManager.getRecordProof(collection, rkey); 426 + 427 + return new Response(carBytes, { 428 + status: 200, 429 + headers: { 430 + "Content-Type": "application/vnd.ipld.car", 431 + "Content-Length": carBytes.length.toString(), 432 + }, 433 + }); 434 + } catch (err) { 435 + console.error("Error getting record proof:", err); 436 + return c.json( 437 + { error: "InternalServerError", message: "Failed to get record proof" }, 438 + 500, 439 + ); 440 + } 414 441 } 415 442 416 - try { 417 - const carBytes = await repoManager.getRecordProof(collection, rkey); 418 - 419 - return new Response(carBytes, { 420 - status: 200, 421 - headers: { 422 - "Content-Type": "application/vnd.ipld.car", 423 - "Content-Length": carBytes.length.toString(), 424 - }, 425 - }); 426 - } catch (err) { 427 - console.error("Error getting record proof:", err); 428 - return c.json( 429 - { error: "InternalServerError", message: "Failed to get record proof" }, 430 - 500, 431 - ); 443 + // Replicated DID: serve record from ReplicatedRepoReader 444 + if (replicatedRepoReader && replicatedRepoReader.isReplicatedDid(did)) { 445 + try { 446 + const record = await replicatedRepoReader.getRecord(did, collection, rkey); 447 + if (record) { 448 + return c.json({ 449 + uri: `at://${did}/${collection}/${rkey}`, 450 + cid: record.cid, 451 + value: record.value, 452 + }); 453 + } 454 + } catch { 455 + // Fall through to 404 456 + } 432 457 } 458 + 459 + return c.json( 460 + { error: "RepoNotFound", message: `Repository not found for DID: ${did}` }, 461 + 404, 462 + ); 433 463 }