atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add node identity (did:web) separate from social account

Nodes now have their own did:web:{hostname} identity for coordination
(consent, discovery, policy), independent of any social account they
may host. Social account fields (DID, HANDLE, SIGNING_KEY,
SIGNING_KEY_PUBLIC) are now optional — nodes can run as
replication-only without a social account.

- New node-identity module: keypair lifecycle, did:web derivation, DID
document generation
- Config: social fields optional, added NODE_DID, NODE_MANAGERS
- Server: auto-generates node keypair, separate node-repo.db, optional
social account repo; createApp now receives node identity (and the
optional social repo manager) via a NodeIdentityOpts argument instead
of a required repoManager parameter
- Auth: accepts node DID + social DID + manager DIDs
- ReplicationManager: uses nodeDid for gossipsub, offers, challenges
- XRPC handlers: fall back to NODE_DID when social DID not configured
- All 384 tests pass, zero TypeScript errors

+1871 -346
+27 -13
src/config.ts
··· 3 3 import type { PolicySet } from "./policy/types.js"; 4 4 5 5 export interface Config { 6 - DID: string; 7 - HANDLE: string; 6 + /** Social account DID (optional — omit for replication-only node). */ 7 + DID?: string; 8 + /** Social account handle (optional). */ 9 + HANDLE?: string; 8 10 PDS_HOSTNAME: string; 9 11 AUTH_TOKEN: string; 10 - SIGNING_KEY: string; 11 - SIGNING_KEY_PUBLIC: string; 12 + /** Social account signing key hex (optional). */ 13 + SIGNING_KEY?: string; 14 + /** Social account signing key public multibase (optional). */ 15 + SIGNING_KEY_PUBLIC?: string; 12 16 JWT_SECRET: string; 13 17 PASSWORD_HASH: string; 14 18 EMAIL?: string; ··· 20 24 POLICY_FILE?: string; 21 25 FIREHOSE_URL: string; 22 26 FIREHOSE_ENABLED: boolean; 27 + /** Node's did:web identity, derived from PDS_HOSTNAME. Always present. */ 28 + NODE_DID: string; 29 + /** DIDs authorized to manage this node (parsed from comma-separated env var). */ 30 + NODE_MANAGERS: string[]; 31 + /** Optional human-readable node name. */ 32 + NODE_NAME?: string; 23 33 } 24 34 25 35 const REQUIRED_KEYS = [ 26 - "DID", 27 - "HANDLE", 28 36 "PDS_HOSTNAME", 29 37 "AUTH_TOKEN", 30 - "SIGNING_KEY", 31 - "SIGNING_KEY_PUBLIC", 32 38 "JWT_SECRET", 33 39 "PASSWORD_HASH", 34 40 ] as const; ··· 68 74 /** 69 75 * Load and validate configuration from environment variables. 70 76 * Optionally loads a .env file first. 77 + * 78 + * Social account fields (DID, HANDLE, SIGNING_KEY, SIGNING_KEY_PUBLIC) are optional. 79 + * When omitted, the node runs as replication-only with its own did:web identity. 
71 80 */ 72 81 export function loadConfig(envPath?: string): Config { 73 82 // Load .env file if it exists ··· 87 96 ); 88 97 } 89 98 99 + const pdsHostname = process.env.PDS_HOSTNAME!; 100 + 90 101 return { 91 - DID: process.env.DID!, 92 - HANDLE: process.env.HANDLE!, 93 - PDS_HOSTNAME: process.env.PDS_HOSTNAME!, 102 + DID: process.env.DID || undefined, 103 + HANDLE: process.env.HANDLE || undefined, 104 + PDS_HOSTNAME: pdsHostname, 94 105 AUTH_TOKEN: process.env.AUTH_TOKEN!, 95 - SIGNING_KEY: process.env.SIGNING_KEY!, 96 - SIGNING_KEY_PUBLIC: process.env.SIGNING_KEY_PUBLIC!, 106 + SIGNING_KEY: process.env.SIGNING_KEY || undefined, 107 + SIGNING_KEY_PUBLIC: process.env.SIGNING_KEY_PUBLIC || undefined, 97 108 JWT_SECRET: process.env.JWT_SECRET!, 98 109 PASSWORD_HASH: process.env.PASSWORD_HASH!, 99 110 EMAIL: process.env.EMAIL, ··· 105 116 POLICY_FILE: process.env.POLICY_FILE || undefined, 106 117 FIREHOSE_URL: process.env.FIREHOSE_URL ?? "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", 107 118 FIREHOSE_ENABLED: process.env.FIREHOSE_ENABLED !== "false", 119 + NODE_DID: `did:web:${pdsHostname.replace(/:/g, "%3A")}`, 120 + NODE_MANAGERS: (process.env.NODE_MANAGERS ?? "").split(",").map(s => s.trim()).filter(Boolean), 121 + NODE_NAME: process.env.NODE_NAME || undefined, 108 122 }; 109 123 } 110 124
+231 -201
src/index.ts
··· 16 16 import { serializeResponse } from "./replication/challenge-response/http-transport.js"; 17 17 import type { StorageChallenge } from "./replication/challenge-response/types.js"; 18 18 import { generateMstProof } from "./replication/mst-proof.js"; 19 + import { generateNodeDidDocument } from "./node-identity.js"; 19 20 20 21 const VERSION = "0.1.0"; 21 22 23 + /** Node identity options passed from server.ts */ 24 + export interface NodeIdentityOpts { 25 + nodeDid: string; 26 + nodePublicKeyMultibase: string; 27 + nodeRepoManager: RepoManager; 28 + /** Social account repo manager (undefined when no social account configured). */ 29 + repoManager?: RepoManager; 30 + } 31 + 22 32 /** 23 33 * Create the Hono app with all routes. 24 34 * The repoManager and firehose are passed in from the server entry point. 25 35 */ 26 36 export function createApp( 27 37 config: Config, 28 - repoManager: RepoManager, 29 38 firehose: Firehose, 30 39 blockStore?: BlockStore, 31 40 networkService?: NetworkService, 32 41 blobStore?: BlobStore, 33 42 replicationManager?: ReplicationManager, 34 43 replicatedRepoReader?: ReplicatedRepoReader, 44 + nodeOpts?: NodeIdentityOpts, 35 45 ) { 46 + const nodeDid = nodeOpts?.nodeDid ?? config.NODE_DID; 47 + const nodePublicKeyMultibase = nodeOpts?.nodePublicKeyMultibase ?? ""; 48 + const nodeRepoManager = nodeOpts?.nodeRepoManager; 49 + // Social account repo manager (may be undefined for replication-only nodes) 50 + const socialRepoManager = nodeOpts?.repoManager; 51 + // Primary repo manager: social if available, else node 52 + const primaryRepoManager = socialRepoManager ?? 
nodeRepoManager; 53 + 36 54 const app = new Hono<{ Bindings: Config }>(); 37 55 38 56 // Bind config to all requests ··· 54 72 }), 55 73 ); 56 74 57 - // DID document for did:web resolution 75 + // DID document for did:web resolution — serves the node's DID document 58 76 app.get("/.well-known/did.json", (c) => { 59 - const didDocument = { 60 - "@context": [ 61 - "https://www.w3.org/ns/did/v1", 62 - "https://w3id.org/security/multikey/v1", 63 - "https://w3id.org/security/suites/secp256k1-2019/v1", 64 - ], 65 - id: config.DID, 66 - alsoKnownAs: [`at://${config.HANDLE}`], 67 - verificationMethod: [ 68 - { 69 - id: `${config.DID}#atproto`, 70 - type: "Multikey", 71 - controller: config.DID, 72 - publicKeyMultibase: config.SIGNING_KEY_PUBLIC, 73 - }, 74 - ], 75 - service: [ 76 - { 77 - id: "#atproto_pds", 78 - type: "AtprotoPersonalDataServer", 79 - serviceEndpoint: `https://${config.PDS_HOSTNAME}`, 80 - }, 81 - ], 82 - }; 77 + const didDocument = generateNodeDidDocument( 78 + nodeDid, 79 + nodePublicKeyMultibase, 80 + config.PDS_HOSTNAME, 81 + ); 83 82 return c.json(didDocument); 84 83 }); 85 84 86 85 // Handle verification for AT Protocol 86 + // If the hostname matches the handle, return the social DID (for did:web handle resolution). 87 + // Also resolves the hostname itself to the node DID. 87 88 app.get("/.well-known/atproto-did", (c) => { 88 - if (config.HANDLE !== config.PDS_HOSTNAME) { 89 - return c.notFound(); 89 + if (config.HANDLE && config.DID && config.HANDLE === config.PDS_HOSTNAME) { 90 + return new Response(config.DID, { 91 + headers: { "Content-Type": "text/plain" }, 92 + }); 90 93 } 91 - return new Response(config.DID, { 94 + // Resolve hostname to node DID 95 + return new Response(nodeDid, { 92 96 headers: { "Content-Type": "text/plain" }, 93 97 }); 94 98 }); ··· 107 111 } 108 112 109 113 // 2. 
Fallback: SQLite blocks table 110 - if (!bytes) { 114 + if (!bytes && primaryRepoManager) { 111 115 try { 112 116 const { CID } = await import("@atproto/lex-data"); 113 117 const cid = CID.parse(cidStr); 114 - bytes = await repoManager.storage.getBytes(cid); 118 + bytes = await primaryRepoManager.storage.getBytes(cid); 115 119 } catch { 116 120 // CID parse failure or storage error 117 121 } ··· 144 148 // Health check 145 149 app.get("/xrpc/_health", (c) => { 146 150 try { 147 - repoManager.healthCheck(); 151 + if (primaryRepoManager) { 152 + primaryRepoManager.healthCheck(); 153 + } 148 154 const health: Record<string, unknown> = { 149 155 status: "ok", 150 156 version: VERSION, ··· 164 170 165 171 // Homepage 166 172 app.get("/", (c) => { 173 + const handleHtml = config.HANDLE 174 + ? `<div class="handle"><a href="https://bsky.app/profile/${config.HANDLE}" target="_blank">@${config.HANDLE}</a></div>` 175 + : `<div class="handle">${nodeDid}</div>`; 167 176 const html = `<!DOCTYPE html> 168 177 <html lang="en"> 169 178 <head> ··· 194 203 <body> 195 204 <div class="name">P2PDS</div> 196 205 <div class="what">a personal data server for the atmosphere</div> 197 - <div class="handle"><a href="https://bsky.app/profile/${config.HANDLE}" target="_blank">@${config.HANDLE}</a></div> 206 + ${handleHtml} 198 207 <div class="version">v${VERSION}</div> 199 208 </body> 200 209 </html>`; ··· 204 213 // ============================================ 205 214 // Sync endpoints (federation) 206 215 // ============================================ 207 - app.get("/xrpc/com.atproto.sync.getRepo", (c) => 208 - sync.getRepo(c, repoManager, blockStore, replicationManager?.getSyncStorage()), 209 - ); 210 - app.get("/xrpc/com.atproto.sync.getRepoStatus", (c) => 211 - sync.getRepoStatus(c, repoManager, replicatedRepoReader), 212 - ); 213 - app.get("/xrpc/com.atproto.sync.getBlocks", (c) => 214 - sync.getBlocks(c, repoManager, blockStore, replicationManager?.getSyncStorage()), 215 - ); 216 - 
app.get("/xrpc/com.atproto.sync.getBlob", (c) => 217 - sync.getBlob(c, repoManager, blockStore, replicationManager?.getSyncStorage()), 218 - ); 219 - app.get("/xrpc/com.atproto.sync.listRepos", (c) => 220 - sync.listRepos(c, repoManager, replicatedRepoReader), 221 - ); 222 - app.get("/xrpc/com.atproto.sync.listBlobs", (c) => 223 - sync.listBlobs(c, repoManager), 224 - ); 225 - app.get("/xrpc/com.atproto.sync.getRecord", (c) => 226 - sync.getRecord(c, repoManager), 227 - ); 216 + 217 + // Only register sync endpoints if we have a repo manager 218 + if (primaryRepoManager) { 219 + const rm = primaryRepoManager; 220 + app.get("/xrpc/com.atproto.sync.getRepo", (c) => 221 + sync.getRepo(c, rm, blockStore, replicationManager?.getSyncStorage()), 222 + ); 223 + app.get("/xrpc/com.atproto.sync.getRepoStatus", (c) => 224 + sync.getRepoStatus(c, rm, replicatedRepoReader), 225 + ); 226 + app.get("/xrpc/com.atproto.sync.getBlocks", (c) => 227 + sync.getBlocks(c, rm, blockStore, replicationManager?.getSyncStorage()), 228 + ); 229 + app.get("/xrpc/com.atproto.sync.getBlob", (c) => 230 + sync.getBlob(c, rm, blockStore, replicationManager?.getSyncStorage()), 231 + ); 232 + app.get("/xrpc/com.atproto.sync.listRepos", (c) => 233 + sync.listRepos(c, rm, replicatedRepoReader), 234 + ); 235 + app.get("/xrpc/com.atproto.sync.listBlobs", (c) => 236 + sync.listBlobs(c, rm), 237 + ); 238 + app.get("/xrpc/com.atproto.sync.getRecord", (c) => 239 + sync.getRecord(c, rm), 240 + ); 241 + } 228 242 229 243 // WebSocket firehose - handled via ws library upgrade, not Hono 230 244 // (see server.ts for WebSocket setup) ··· 232 246 // ============================================ 233 247 // Repository operations 234 248 // ============================================ 235 - app.use("/xrpc/com.atproto.repo.describeRepo", async (c, next) => { 236 - const requestedRepo = c.req.query("repo"); 237 - if (!requestedRepo || requestedRepo === config.DID) { 238 - return repo.describeRepo(c, repoManager); 239 - } 
240 - if ( 241 - replicatedRepoReader && 242 - requestedRepo && 243 - replicatedRepoReader.isReplicatedDid(requestedRepo) 244 - ) { 245 - return repo.describeRepoReplicated(c, replicatedRepoReader, requestedRepo); 246 - } 247 - await next(); 248 - }); 249 + if (primaryRepoManager) { 250 + const rm = primaryRepoManager; 249 251 250 - app.use("/xrpc/com.atproto.repo.getRecord", async (c, next) => { 251 - const requestedRepo = c.req.query("repo"); 252 - if (!requestedRepo || requestedRepo === config.DID) { 253 - return repo.getRecord(c, repoManager); 254 - } 255 - if ( 256 - replicatedRepoReader && 257 - requestedRepo && 258 - replicatedRepoReader.isReplicatedDid(requestedRepo) 259 - ) { 260 - return repo.getRecordReplicated(c, replicatedRepoReader, requestedRepo); 261 - } 262 - await next(); 263 - }); 252 + app.use("/xrpc/com.atproto.repo.describeRepo", async (c, next) => { 253 + const requestedRepo = c.req.query("repo"); 254 + if (!requestedRepo || requestedRepo === config.DID || requestedRepo === nodeDid) { 255 + return repo.describeRepo(c, rm); 256 + } 257 + if ( 258 + replicatedRepoReader && 259 + requestedRepo && 260 + replicatedRepoReader.isReplicatedDid(requestedRepo) 261 + ) { 262 + return repo.describeRepoReplicated(c, replicatedRepoReader, requestedRepo); 263 + } 264 + await next(); 265 + }); 264 266 265 - app.use("/xrpc/com.atproto.repo.listRecords", async (c, next) => { 266 - const requestedRepo = c.req.query("repo"); 267 - if (!requestedRepo || requestedRepo === config.DID) { 268 - return repo.listRecords(c, repoManager); 269 - } 270 - if ( 271 - replicatedRepoReader && 272 - requestedRepo && 273 - replicatedRepoReader.isReplicatedDid(requestedRepo) 274 - ) { 275 - return repo.listRecordsReplicated(c, replicatedRepoReader, requestedRepo); 276 - } 277 - await next(); 278 - }); 267 + app.use("/xrpc/com.atproto.repo.getRecord", async (c, next) => { 268 + const requestedRepo = c.req.query("repo"); 269 + if (!requestedRepo || requestedRepo === config.DID || 
requestedRepo === nodeDid) { 270 + return repo.getRecord(c, rm); 271 + } 272 + if ( 273 + replicatedRepoReader && 274 + requestedRepo && 275 + replicatedRepoReader.isReplicatedDid(requestedRepo) 276 + ) { 277 + return repo.getRecordReplicated(c, replicatedRepoReader, requestedRepo); 278 + } 279 + await next(); 280 + }); 279 281 280 - // Write operations require authentication 281 - app.post("/xrpc/com.atproto.repo.createRecord", requireAuth, (c) => 282 - repo.createRecord(c, repoManager), 283 - ); 284 - app.post("/xrpc/com.atproto.repo.deleteRecord", requireAuth, (c) => 285 - repo.deleteRecord(c, repoManager), 286 - ); 287 - app.post("/xrpc/com.atproto.repo.uploadBlob", requireAuth, (c) => 288 - repo.uploadBlob(c, repoManager), 289 - ); 290 - app.post("/xrpc/com.atproto.repo.applyWrites", requireAuth, (c) => 291 - repo.applyWrites(c, repoManager), 292 - ); 293 - app.post("/xrpc/com.atproto.repo.putRecord", requireAuth, (c) => 294 - repo.putRecord(c, repoManager), 295 - ); 296 - app.post("/xrpc/com.atproto.repo.importRepo", requireAuth, (c) => 297 - repo.importRepo(c, repoManager), 298 - ); 299 - app.get("/xrpc/com.atproto.repo.listMissingBlobs", requireAuth, (c) => 300 - repo.listMissingBlobs(c, repoManager), 301 - ); 282 + app.use("/xrpc/com.atproto.repo.listRecords", async (c, next) => { 283 + const requestedRepo = c.req.query("repo"); 284 + if (!requestedRepo || requestedRepo === config.DID || requestedRepo === nodeDid) { 285 + return repo.listRecords(c, rm); 286 + } 287 + if ( 288 + replicatedRepoReader && 289 + requestedRepo && 290 + replicatedRepoReader.isReplicatedDid(requestedRepo) 291 + ) { 292 + return repo.listRecordsReplicated(c, replicatedRepoReader, requestedRepo); 293 + } 294 + await next(); 295 + }); 296 + 297 + // Write operations require authentication 298 + app.post("/xrpc/com.atproto.repo.createRecord", requireAuth, (c) => 299 + repo.createRecord(c, rm), 300 + ); 301 + app.post("/xrpc/com.atproto.repo.deleteRecord", requireAuth, (c) => 302 + 
repo.deleteRecord(c, rm), 303 + ); 304 + app.post("/xrpc/com.atproto.repo.uploadBlob", requireAuth, (c) => 305 + repo.uploadBlob(c, rm), 306 + ); 307 + app.post("/xrpc/com.atproto.repo.applyWrites", requireAuth, (c) => 308 + repo.applyWrites(c, rm), 309 + ); 310 + app.post("/xrpc/com.atproto.repo.putRecord", requireAuth, (c) => 311 + repo.putRecord(c, rm), 312 + ); 313 + app.post("/xrpc/com.atproto.repo.importRepo", requireAuth, (c) => 314 + repo.importRepo(c, rm), 315 + ); 316 + app.get("/xrpc/com.atproto.repo.listMissingBlobs", requireAuth, (c) => 317 + repo.listMissingBlobs(c, rm), 318 + ); 319 + } 302 320 303 321 // ============================================ 304 322 // Server identity ··· 308 326 // Handle resolution 309 327 app.get("/xrpc/com.atproto.identity.resolveHandle", (c) => { 310 328 const handle = c.req.query("handle"); 311 - if (handle === config.HANDLE) { 329 + if (config.HANDLE && config.DID && handle === config.HANDLE) { 312 330 return c.json({ did: config.DID }); 313 331 } 332 + if (handle === config.PDS_HOSTNAME) { 333 + return c.json({ did: nodeDid }); 334 + } 314 335 return c.json( 315 336 { error: "HandleNotFound", message: `Handle not found: ${handle}` }, 316 337 404, ··· 318 339 }); 319 340 320 341 // ============================================ 321 - // Session management 322 - // ============================================ 323 - app.post("/xrpc/com.atproto.server.createSession", (c) => 324 - server.createSession(c, repoManager), 325 - ); 326 - app.post("/xrpc/com.atproto.server.refreshSession", (c) => 327 - server.refreshSession(c, repoManager), 328 - ); 329 - app.get("/xrpc/com.atproto.server.getSession", (c) => 330 - server.getSession(c, repoManager), 331 - ); 332 - app.post("/xrpc/com.atproto.server.deleteSession", server.deleteSession); 333 - 342 + // Session management (requires social account) 334 343 // ============================================ 335 - // Account lifecycle 336 - // ============================================ 
337 - app.get("/xrpc/com.atproto.server.checkAccountStatus", requireAuth, (c) => 338 - server.checkAccountStatus(c, repoManager), 339 - ); 340 - app.post("/xrpc/com.atproto.server.activateAccount", requireAuth, (c) => 341 - server.activateAccount(c, repoManager), 342 - ); 343 - app.post("/xrpc/com.atproto.server.deactivateAccount", requireAuth, (c) => 344 - server.deactivateAccount(c, repoManager), 345 - ); 346 - app.post("/xrpc/gg.mk.experimental.resetMigration", requireAuth, (c) => 347 - server.resetMigration(c, repoManager), 348 - ); 349 - app.post( 350 - "/xrpc/com.atproto.server.requestEmailUpdate", 351 - requireAuth, 352 - server.requestEmailUpdate, 353 - ); 354 - app.post( 355 - "/xrpc/com.atproto.server.requestEmailConfirmation", 356 - requireAuth, 357 - server.requestEmailConfirmation, 358 - ); 359 - app.post("/xrpc/com.atproto.server.updateEmail", requireAuth, (c) => 360 - server.updateEmail(c, repoManager), 361 - ); 344 + if (primaryRepoManager) { 345 + const rm = primaryRepoManager; 346 + app.post("/xrpc/com.atproto.server.createSession", (c) => 347 + server.createSession(c, rm), 348 + ); 349 + app.post("/xrpc/com.atproto.server.refreshSession", (c) => 350 + server.refreshSession(c, rm), 351 + ); 352 + app.get("/xrpc/com.atproto.server.getSession", (c) => 353 + server.getSession(c, rm), 354 + ); 355 + app.post("/xrpc/com.atproto.server.deleteSession", server.deleteSession); 362 356 363 - // Service auth 364 - app.get( 365 - "/xrpc/com.atproto.server.getServiceAuth", 366 - requireAuth, 367 - server.getServiceAuth, 368 - ); 357 + // ============================================ 358 + // Account lifecycle 359 + // ============================================ 360 + app.get("/xrpc/com.atproto.server.checkAccountStatus", requireAuth, (c) => 361 + server.checkAccountStatus(c, rm), 362 + ); 363 + app.post("/xrpc/com.atproto.server.activateAccount", requireAuth, (c) => 364 + server.activateAccount(c, rm), 365 + ); 366 + 
app.post("/xrpc/com.atproto.server.deactivateAccount", requireAuth, (c) => 367 + server.deactivateAccount(c, rm), 368 + ); 369 + app.post("/xrpc/gg.mk.experimental.resetMigration", requireAuth, (c) => 370 + server.resetMigration(c, rm), 371 + ); 372 + app.post( 373 + "/xrpc/com.atproto.server.requestEmailUpdate", 374 + requireAuth, 375 + server.requestEmailUpdate, 376 + ); 377 + app.post( 378 + "/xrpc/com.atproto.server.requestEmailConfirmation", 379 + requireAuth, 380 + server.requestEmailConfirmation, 381 + ); 382 + app.post("/xrpc/com.atproto.server.updateEmail", requireAuth, (c) => 383 + server.updateEmail(c, rm), 384 + ); 369 385 370 - // ============================================ 371 - // Actor preferences 372 - // ============================================ 373 - app.get("/xrpc/app.bsky.actor.getPreferences", requireAuth, async (c) => { 374 - const result = await repoManager.getPreferences(); 375 - return c.json(result); 376 - }); 377 - app.post("/xrpc/app.bsky.actor.putPreferences", requireAuth, async (c) => { 378 - const body = await c.req.json<{ preferences: unknown[] }>(); 379 - await repoManager.putPreferences(body.preferences); 380 - return c.json({}); 381 - }); 386 + // Service auth 387 + app.get( 388 + "/xrpc/com.atproto.server.getServiceAuth", 389 + requireAuth, 390 + server.getServiceAuth, 391 + ); 382 392 383 - // ============================================ 384 - // Identity events 385 - // ============================================ 386 - app.post( 387 - "/xrpc/gg.mk.experimental.emitIdentityEvent", 388 - requireAuth, 389 - async (c) => { 390 - const result = await repoManager.emitIdentityEvent(config.HANDLE); 393 + // ============================================ 394 + // Actor preferences 395 + // ============================================ 396 + app.get("/xrpc/app.bsky.actor.getPreferences", requireAuth, async (c) => { 397 + const result = await rm.getPreferences(); 391 398 return c.json(result); 392 - }, 393 - ); 399 + }); 400 + 
app.post("/xrpc/app.bsky.actor.putPreferences", requireAuth, async (c) => { 401 + const body = await c.req.json<{ preferences: unknown[] }>(); 402 + await rm.putPreferences(body.preferences); 403 + return c.json({}); 404 + }); 394 405 395 - // ============================================ 396 - // Firehose status 397 - // ============================================ 398 - app.get( 399 - "/xrpc/gg.mk.experimental.getFirehoseStatus", 400 - requireAuth, 401 - (c) => { 402 - return c.json( 403 - repoManager.getFirehoseStatus(firehose.subscriberCount), 404 - ); 405 - }, 406 - ); 406 + // ============================================ 407 + // Identity events 408 + // ============================================ 409 + app.post( 410 + "/xrpc/gg.mk.experimental.emitIdentityEvent", 411 + requireAuth, 412 + async (c) => { 413 + const result = await rm.emitIdentityEvent(config.HANDLE ?? config.PDS_HOSTNAME); 414 + return c.json(result); 415 + }, 416 + ); 417 + 418 + // ============================================ 419 + // Firehose status 420 + // ============================================ 421 + app.get( 422 + "/xrpc/gg.mk.experimental.getFirehoseStatus", 423 + requireAuth, 424 + (c) => { 425 + return c.json( 426 + rm.getFirehoseStatus(firehose.subscriberCount), 427 + ); 428 + }, 429 + ); 430 + } 407 431 408 432 // ============================================ 409 433 // Replication status ··· 442 466 const response = await respondToChallenge( 443 467 challenge, 444 468 blockStore, 445 - config.DID, 469 + nodeDid, 446 470 ); 447 471 return c.json(serializeResponse(response)); 448 472 }); ··· 552 576 // Admin monitoring 553 577 // ============================================ 554 578 app.get("/xrpc/org.p2pds.admin.getOverview", requireAuth, (c) => 555 - admin.getOverview(c, networkService, replicationManager), 579 + admin.getOverview(c, nodeDid, networkService, replicationManager), 556 580 ); 557 581 app.get("/xrpc/org.p2pds.admin.getDidStatus", requireAuth, (c) => 558 582 
admin.getDidStatus(c, replicationManager), ··· 565 589 ); 566 590 app.get("/xrpc/org.p2pds.admin.getSyncHistory", requireAuth, (c) => 567 591 admin.getSyncHistory(c, replicationManager), 592 + ); 593 + app.post("/xrpc/org.p2pds.admin.addDid", requireAuth, (c) => 594 + admin.addDid(c, nodeDid, replicationManager), 595 + ); 596 + app.post("/xrpc/org.p2pds.admin.removeDid", requireAuth, (c) => 597 + admin.removeDid(c, replicationManager), 568 598 ); 569 599 app.get("/xrpc/org.p2pds.admin.dashboard", (c) => 570 600 admin.getDashboard(c, networkService, replicationManager),
+4 -2
src/ipfs.test.ts
··· 43 43 REPLICATE_DIDS: [], 44 44 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 45 45 FIREHOSE_ENABLED: false, 46 + NODE_DID: "did:web:test.example.com", 47 + NODE_MANAGERS: [], 46 48 }; 47 49 } 48 50 ··· 247 249 }); 248 250 await ipfsService.start(); 249 251 250 - blobStore = new BlobStore(tmpDir, config.DID); 252 + blobStore = new BlobStore(tmpDir, config.DID!); 251 253 252 - app = createApp(config, repoManager, firehose, ipfsService, ipfsService, blobStore); 254 + app = createApp(config, firehose, ipfsService, ipfsService, blobStore, undefined, undefined, { nodeDid: config.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManager, repoManager }); 253 255 }); 254 256 255 257 afterEach(async () => {
+15
src/ipfs.ts
··· 14 14 getBlock(cidStr: string): Promise<Uint8Array | null>; 15 15 hasBlock(cidStr: string): Promise<boolean>; 16 16 putBlocks(blocks: BlockMap): Promise<void>; 17 + deleteBlock(cidStr: string): Promise<void>; 17 18 } 18 19 19 20 /** ··· 205 206 return await store.has(cid); 206 207 } catch { 207 208 return false; 209 + } 210 + } 211 + 212 + async deleteBlock(cidStr: string): Promise<void> { 213 + if (!this.blockstore) return; 214 + try { 215 + const cid = CID.parse(cidStr); 216 + if (this.helia) { 217 + await this.helia.blockstore.delete(cid); 218 + } else { 219 + await this.blockstore.delete(cid); 220 + } 221 + } catch { 222 + // No-op if block doesn't exist 208 223 } 209 224 } 210 225
+35 -14
src/middleware/auth.ts
··· 12 12 auth: AuthInfo; 13 13 }; 14 14 15 + /** 16 + * Build the set of DIDs accepted for authentication. 17 + * Includes: node DID, social DID (if configured), and any manager DIDs. 18 + */ 19 + function getAcceptedDids(env: Config): Set<string> { 20 + const accepted = new Set<string>(); 21 + accepted.add(env.NODE_DID); 22 + if (env.DID) { 23 + accepted.add(env.DID); 24 + } 25 + for (const managerDid of env.NODE_MANAGERS) { 26 + accepted.add(managerDid); 27 + } 28 + return accepted; 29 + } 30 + 15 31 export async function requireAuth( 16 32 c: Context<{ Bindings: Config; Variables: AuthVariables }>, 17 33 next: Next, ··· 43 59 44 60 // Try static token first 45 61 if (token === c.env.AUTH_TOKEN) { 46 - c.set("auth", { did: c.env.DID, scope: "com.atproto.access" }); 62 + c.set("auth", { did: c.env.NODE_DID, scope: "com.atproto.access" }); 47 63 return next(); 48 64 } 49 65 50 66 const serviceDid = `did:web:${c.env.PDS_HOSTNAME}`; 67 + const acceptedDids = getAcceptedDids(c.env); 51 68 52 69 // Try session JWT verification (HS256, signed with JWT_SECRET) 53 70 try { ··· 57 74 serviceDid, 58 75 ); 59 76 60 - if (payload.sub !== c.env.DID) { 77 + if (!payload.sub || !acceptedDids.has(payload.sub)) { 61 78 return c.json( 62 79 { 63 80 error: "AuthenticationRequired", ··· 67 84 ); 68 85 } 69 86 70 - c.set("auth", { did: payload.sub, scope: payload.scope as string }); 87 + c.set("auth", { did: payload.sub as string, scope: payload.scope as string }); 71 88 return next(); 72 89 } catch (err) { 73 90 if (err instanceof TokenExpiredError) { ··· 83 100 } 84 101 85 102 // Try service JWT verification (ES256K, signed with our signing key) 86 - try { 87 - const payload = await verifyServiceJwt( 88 - token, 89 - c.env.SIGNING_KEY, 90 - serviceDid, 91 - c.env.DID, 92 - ); 103 + if (c.env.SIGNING_KEY) { 104 + try { 105 + const payload = await verifyServiceJwt( 106 + token, 107 + c.env.SIGNING_KEY, 108 + serviceDid, 109 + c.env.DID ?? 
c.env.NODE_DID, 110 + ); 93 111 94 - c.set("auth", { did: payload.iss, scope: payload.lxm || "atproto" }); 95 - return next(); 96 - } catch { 97 - // Service JWT verification also failed 112 + if (acceptedDids.has(payload.iss)) { 113 + c.set("auth", { did: payload.iss, scope: payload.lxm || "atproto" }); 114 + return next(); 115 + } 116 + } catch { 117 + // Service JWT verification also failed 118 + } 98 119 } 99 120 100 121 return c.json(
+124
src/node-identity.test.ts
··· 1 + import { describe, it, expect, beforeEach, afterEach } from "vitest"; 2 + import { mkdtempSync, rmSync, readFileSync, existsSync } from "node:fs"; 3 + import { join } from "node:path"; 4 + import { tmpdir } from "node:os"; 5 + import { 6 + loadOrCreateNodeIdentity, 7 + getNodeDid, 8 + getPublicKeyMultibase, 9 + generateNodeDidDocument, 10 + } from "./node-identity.js"; 11 + 12 + describe("node-identity", () => { 13 + let tmpDir: string; 14 + 15 + beforeEach(() => { 16 + tmpDir = mkdtempSync(join(tmpdir(), "node-identity-test-")); 17 + }); 18 + 19 + afterEach(() => { 20 + try { 21 + rmSync(tmpDir, { recursive: true, force: true }); 22 + } catch {} 23 + }); 24 + 25 + describe("loadOrCreateNodeIdentity", () => { 26 + it("creates a new keypair when none exists", async () => { 27 + const { keypair, exported } = await loadOrCreateNodeIdentity(tmpDir); 28 + 29 + expect(keypair).toBeDefined(); 30 + expect(exported).toMatch(/^[0-9a-f]{64}$/); 31 + 32 + // Key file should exist 33 + const keyPath = join(tmpDir, "node-signing.key"); 34 + expect(existsSync(keyPath)).toBe(true); 35 + 36 + // File content should match exported hex 37 + const fileContent = readFileSync(keyPath, "utf-8").trim(); 38 + expect(fileContent).toBe(exported); 39 + }); 40 + 41 + it("loads existing keypair from file", async () => { 42 + // Create first 43 + const { exported: hex1 } = await loadOrCreateNodeIdentity(tmpDir); 44 + 45 + // Load again — should get same key 46 + const { exported: hex2 } = await loadOrCreateNodeIdentity(tmpDir); 47 + 48 + expect(hex2).toBe(hex1); 49 + }); 50 + 51 + it("produces a keypair that can sign and verify", async () => { 52 + const { keypair } = await loadOrCreateNodeIdentity(tmpDir); 53 + 54 + expect(keypair.jwtAlg).toBe("ES256K"); 55 + expect(keypair.publicKeyBytes()).toBeInstanceOf(Uint8Array); 56 + expect(keypair.publicKeyBytes().length).toBeGreaterThan(0); 57 + }); 58 + }); 59 + 60 + describe("getNodeDid", () => { 61 + it("returns did:web for simple hostname", 
() => { 62 + expect(getNodeDid("example.com")).toBe("did:web:example.com"); 63 + }); 64 + 65 + it("encodes port colons as %3A", () => { 66 + expect(getNodeDid("localhost:3000")).toBe("did:web:localhost%3A3000"); 67 + }); 68 + 69 + it("handles hostname with multiple colons", () => { 70 + expect(getNodeDid("host:1234:extra")).toBe( 71 + "did:web:host%3A1234%3Aextra", 72 + ); 73 + }); 74 + }); 75 + 76 + describe("getPublicKeyMultibase", () => { 77 + it("returns a multibase-encoded public key", async () => { 78 + const { keypair } = await loadOrCreateNodeIdentity(tmpDir); 79 + const multibase = getPublicKeyMultibase(keypair); 80 + 81 + expect(multibase).toMatch(/^z/); // multibase prefix for base58btc 82 + expect(multibase.length).toBeGreaterThan(10); 83 + }); 84 + }); 85 + 86 + describe("generateNodeDidDocument", () => { 87 + it("generates a valid DID document structure", () => { 88 + const did = "did:web:example.com"; 89 + const publicKeyMultibase = "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr"; 90 + const hostname = "example.com"; 91 + 92 + const doc = generateNodeDidDocument( 93 + did, 94 + publicKeyMultibase, 95 + hostname, 96 + ) as Record<string, unknown>; 97 + 98 + expect(doc["@context"]).toContain("https://www.w3.org/ns/did/v1"); 99 + expect(doc.id).toBe(did); 100 + 101 + const vm = (doc.verificationMethod as Array<Record<string, unknown>>)[0]!; 102 + expect(vm.id).toBe(`${did}#atproto`); 103 + expect(vm.type).toBe("Multikey"); 104 + expect(vm.controller).toBe(did); 105 + expect(vm.publicKeyMultibase).toBe(publicKeyMultibase); 106 + 107 + const svc = (doc.service as Array<Record<string, unknown>>)[0]!; 108 + expect(svc.id).toBe("#atproto_pds"); 109 + expect(svc.type).toBe("AtprotoPersonalDataServer"); 110 + expect(svc.serviceEndpoint).toBe("https://example.com"); 111 + }); 112 + 113 + it("uses hostname with port in service endpoint", () => { 114 + const doc = generateNodeDidDocument( 115 + "did:web:localhost%3A3000", 116 + "zTest", 117 + 
"localhost:3000", 118 + ) as Record<string, unknown>; 119 + 120 + const svc = (doc.service as Array<Record<string, unknown>>)[0]!; 121 + expect(svc.serviceEndpoint).toBe("https://localhost:3000"); 122 + }); 123 + }); 124 + });
+84
src/node-identity.ts
··· 1 + /** 2 + * Node identity management: keypair lifecycle, did:web derivation, DID document generation. 3 + * 4 + * The node has its own did:web identity, separate from any social account it may host. 5 + * The keypair is stored as a hex-encoded private key file in the data directory. 6 + */ 7 + 8 + import { readFileSync, writeFileSync, existsSync } from "node:fs"; 9 + import { resolve } from "node:path"; 10 + import { Secp256k1Keypair, formatMultikey } from "@atproto/crypto"; 11 + 12 + const KEY_FILENAME = "node-signing.key"; 13 + 14 + /** 15 + * Load or create the node's signing keypair. 16 + * Checks for `{dataDir}/node-signing.key` (hex string). 17 + * If missing, generates a new keypair, exports to hex, writes to file with mode 0o600. 18 + */ 19 + export async function loadOrCreateNodeIdentity( 20 + dataDir: string, 21 + ): Promise<{ keypair: Secp256k1Keypair; exported: string }> { 22 + const keyPath = resolve(dataDir, KEY_FILENAME); 23 + 24 + if (existsSync(keyPath)) { 25 + const hex = readFileSync(keyPath, "utf-8").trim(); 26 + const keypair = await Secp256k1Keypair.import(hex, { 27 + exportable: true, 28 + }); 29 + return { keypair, exported: hex }; 30 + } 31 + 32 + const keypair = await Secp256k1Keypair.create({ exportable: true }); 33 + const exported = Buffer.from(await keypair.export()).toString("hex"); 34 + writeFileSync(keyPath, exported + "\n", { mode: 0o600 }); 35 + return { keypair, exported }; 36 + } 37 + 38 + /** 39 + * Derive the node's did:web from its hostname. 40 + * Encodes `:` as `%3A` per the did:web spec (for ports). 41 + */ 42 + export function getNodeDid(hostname: string): string { 43 + return `did:web:${hostname.replace(/:/g, "%3A")}`; 44 + } 45 + 46 + /** 47 + * Get the public key multibase string for a keypair. 
48 + */ 49 + export function getPublicKeyMultibase(keypair: Secp256k1Keypair): string { 50 + return formatMultikey(keypair.jwtAlg, keypair.publicKeyBytes()); 51 + } 52 + 53 + /** 54 + * Generate a DID document for the node's did:web identity. 55 + */ 56 + export function generateNodeDidDocument( 57 + did: string, 58 + publicKeyMultibase: string, 59 + hostname: string, 60 + ): object { 61 + return { 62 + "@context": [ 63 + "https://www.w3.org/ns/did/v1", 64 + "https://w3id.org/security/multikey/v1", 65 + "https://w3id.org/security/suites/secp256k1-2019/v1", 66 + ], 67 + id: did, 68 + verificationMethod: [ 69 + { 70 + id: `${did}#atproto`, 71 + type: "Multikey", 72 + controller: did, 73 + publicKeyMultibase, 74 + }, 75 + ], 76 + service: [ 77 + { 78 + id: "#atproto_pds", 79 + type: "AtprotoPersonalDataServer", 80 + serviceEndpoint: `https://${hostname}`, 81 + }, 82 + ], 83 + }; 84 + }
+2
src/replication/challenge-response/challenge-response.test.ts
··· 35 35 REPLICATE_DIDS: [], 36 36 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 37 37 FIREHOSE_ENABLED: false, 38 + NODE_DID: "did:web:test.example.com", 39 + NODE_MANAGERS: [], 38 40 }; 39 41 } 40 42
+5
src/replication/challenge-response/e2e-challenge.test.ts
··· 43 43 REPLICATE_DIDS: [], 44 44 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 45 45 FIREHOSE_ENABLED: false, 46 + NODE_DID: "did:web:test.example.com", 47 + NODE_MANAGERS: [], 46 48 }; 47 49 } 48 50 ··· 276 278 } 277 279 }, 278 280 async putBlocks() {}, 281 + async deleteBlock() {}, 279 282 }; 280 283 281 284 // Register challenge handler on node A ··· 354 357 } 355 358 }, 356 359 async putBlocks() {}, 360 + async deleteBlock() {}, 357 361 }; 358 362 359 363 transportA!.onChallenge(async (challenge) => { ··· 428 432 } 429 433 }, 430 434 async putBlocks() {}, 435 + async deleteBlock() {}, 431 436 }; 432 437 433 438 transportA!.onChallenge(async (challenge) => {
+2
src/replication/challenge-response/scheduler.test.ts
··· 71 71 } 72 72 73 73 async putBlocks(_blocks: BlockMap): Promise<void> {} 74 + 75 + async deleteBlock(_cidStr: string): Promise<void> {} 74 76 } 75 77 76 78 // ============================================
+3
src/replication/e2e-multi-node.test.ts
··· 48 48 REPLICATE_DIDS: [], 49 49 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 50 50 FIREHOSE_ENABLED: false, 51 + NODE_DID: "did:web:test.example.com", 52 + NODE_MANAGERS: [], 51 53 }; 52 54 } 53 55 ··· 148 150 } 149 151 }, 150 152 async putBlocks() {}, 153 + async deleteBlock() {}, 151 154 }; 152 155 } 153 156
+2
src/replication/firehose-incremental.test.ts
··· 62 62 REPLICATE_DIDS: replicateDids, 63 63 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 64 64 FIREHOSE_ENABLED: false, 65 + NODE_DID: "did:web:local.example.com", 66 + NODE_MANAGERS: [], 65 67 }; 66 68 } 67 69
+53 -2
src/replication/firehose-subscription.ts
··· 32 32 rebase: boolean; 33 33 } 34 34 35 + /** Parsed account status event from the firehose. */ 36 + export interface FirehoseAccountEvent { 37 + seq: number; 38 + did: string; 39 + time: string; 40 + active: boolean; 41 + status?: string; // "takendown", "suspended", "deleted", "deactivated" 42 + } 43 + 35 44 /** Callback for commit events. */ 36 45 export type CommitHandler = (event: FirehoseCommitEvent) => void | Promise<void>; 46 + 47 + /** Callback for account status events. */ 48 + export type AccountHandler = (event: FirehoseAccountEvent) => void | Promise<void>; 37 49 38 50 /** Configuration for the firehose subscription. */ 39 51 export interface FirehoseSubscriptionConfig { ··· 62 74 private ws: WebSocket | null = null; 63 75 private dids: Set<string> = new Set(); 64 76 private commitHandlers: CommitHandler[] = []; 77 + private accountHandlers: AccountHandler[] = []; 65 78 private cursor: number | null = null; 66 79 private running = false; 67 80 private reconnectTimer: ReturnType<typeof setTimeout> | null = null; ··· 121 134 */ 122 135 onCommit(handler: CommitHandler): void { 123 136 this.commitHandlers.push(handler); 137 + } 138 + 139 + /** 140 + * Register a handler for account status events matching serviced DIDs. 
141 + */ 142 + onAccount(handler: AccountHandler): void { 143 + this.accountHandlers.push(handler); 124 144 } 125 145 126 146 /** ··· 256 276 // Only process normal events 257 277 if (op !== 1) return; 258 278 259 - // Only process commit events 260 - if (type !== "#commit") return; 279 + // Process commit and account events 280 + if (type !== "#commit" && type !== "#account") return; 261 281 262 282 const body = cborDecode(bytes.subarray(bytesConsumed)) as Record<string, unknown>; 263 283 this._eventsReceived++; 284 + 285 + // Handle account status events 286 + if (type === "#account") { 287 + const did = body.did as string | undefined; 288 + if (!did || !this.dids.has(did)) return; 289 + 290 + const event: FirehoseAccountEvent = { 291 + seq: body.seq as number, 292 + did, 293 + time: (body.time as string) ?? new Date().toISOString(), 294 + active: (body.active as boolean) ?? true, 295 + status: body.status as string | undefined, 296 + }; 297 + 298 + this.cursor = event.seq; 299 + this._eventsProcessed++; 300 + 301 + for (const handler of this.accountHandlers) { 302 + try { 303 + const result = handler(event); 304 + if (result && typeof result === "object" && "catch" in result) { 305 + (result as Promise<void>).catch((err) => { 306 + console.error("[firehose-subscription] Account handler error:", err); 307 + }); 308 + } 309 + } catch (err) { 310 + console.error("[firehose-subscription] Account handler error:", err); 311 + } 312 + } 313 + return; 314 + } 264 315 265 316 // Filter: only process events for our serviced DIDs 266 317 const repo = body.repo as string | undefined;
+7
src/replication/gossipsub-notifications.test.ts
··· 271 271 getBlock: vi.fn().mockResolvedValue(null), 272 272 hasBlock: vi.fn().mockResolvedValue(false), 273 273 putBlocks: vi.fn().mockResolvedValue(undefined), 274 + deleteBlock: vi.fn().mockResolvedValue(undefined), 274 275 }; 275 276 } 276 277 ··· 296 297 REPLICATE_DIDS: ["did:plc:remote1", "did:plc:remote2"], 297 298 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 298 299 FIREHOSE_ENABLED: false, 300 + NODE_DID: "did:web:test.example.com", 301 + NODE_MANAGERS: [], 299 302 }; 300 303 301 304 const { RepoManager } = await import("../repo-manager.js"); ··· 354 357 REPLICATE_DIDS: ["did:plc:remote1"], 355 358 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 356 359 FIREHOSE_ENABLED: false, 360 + NODE_DID: "did:web:test.example.com", 361 + NODE_MANAGERS: [], 357 362 }; 358 363 359 364 const { RepoManager } = await import("../repo-manager.js"); ··· 434 439 REPLICATE_DIDS: ["did:plc:remote1"], 435 440 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 436 441 FIREHOSE_ENABLED: false, 442 + NODE_DID: "did:web:test.example.com", 443 + NODE_MANAGERS: [], 437 444 }; 438 445 439 446 const { RepoManager } = await import("../repo-manager.js");
+63 -1
src/replication/mst-proof.test.ts
··· 7 7 import { RepoManager } from "../repo-manager.js"; 8 8 import type { Config } from "../config.js"; 9 9 import { readCarWithRoot } from "@atproto/repo"; 10 - import { generateMstProof, verifyMstProof, extractAllRecordPaths } from "./mst-proof.js"; 10 + import { generateMstProof, verifyMstProof, extractAllRecordPaths, extractAllCids } from "./mst-proof.js"; 11 11 12 12 function testConfig(dataDir: string): Config { 13 13 return { ··· 28 28 REPLICATE_DIDS: [], 29 29 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 30 30 FIREHOSE_ENABLED: false, 31 + NODE_DID: "did:web:test.example.com", 32 + NODE_MANAGERS: [], 31 33 }; 32 34 } 33 35 ··· 639 641 "bafyreig6mxqmjlb7yjbhhhz6vqmtiw4kgipvhqoowdkggjlpzpd5tcm4", 640 642 ); 641 643 expect(paths).toEqual([]); 644 + }); 645 + 646 + // ============================================ 647 + // extractAllCids 648 + // ============================================ 649 + 650 + it("extractAllCids returns all reachable CIDs", async () => { 651 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 652 + $type: "app.bsky.feed.post", 653 + text: "CID walk test", 654 + createdAt: new Date().toISOString(), 655 + }); 656 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 657 + $type: "app.bsky.feed.post", 658 + text: "Another post", 659 + createdAt: new Date().toISOString(), 660 + }); 661 + 662 + const rootCid = await getRepoRootCid(); 663 + 664 + const cids = await extractAllCids(ipfsService, rootCid); 665 + 666 + // Should include at least: commit CID, MST root, MST nodes, record value CIDs 667 + expect(cids.size).toBeGreaterThanOrEqual(4); 668 + // The commit CID itself should be in the set 669 + expect(cids.has(rootCid)).toBe(true); 670 + }); 671 + 672 + it("extractAllCids grows with more records", async () => { 673 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 674 + $type: "app.bsky.feed.post", 675 + text: "First post", 676 + createdAt: new Date().toISOString(), 
677 + }); 678 + 679 + const rootCid1 = await getRepoRootCid(); 680 + const cids1 = await extractAllCids(ipfsService, rootCid1); 681 + 682 + // Add more records 683 + for (let i = 0; i < 10; i++) { 684 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 685 + $type: "app.bsky.feed.post", 686 + text: `Additional post ${i}`, 687 + createdAt: new Date().toISOString(), 688 + }); 689 + } 690 + 691 + const rootCid2 = await getRepoRootCid(); 692 + const cids2 = await extractAllCids(ipfsService, rootCid2); 693 + 694 + // More records = more CIDs 695 + expect(cids2.size).toBeGreaterThan(cids1.size); 696 + }); 697 + 698 + it("extractAllCids returns only commit CID for missing data", async () => { 699 + const fakeCid = "bafyreig6mxqmjlb7yjbhhhz6vqmtiw4kgipvhqoowdkggjlpzpd5tcm4"; 700 + const cids = await extractAllCids(ipfsService, fakeCid); 701 + // Only the commit CID itself (data can't be loaded) 702 + expect(cids.size).toBe(1); 703 + expect(cids.has(fakeCid)).toBe(true); 642 704 }); 643 705 });
+68
src/replication/mst-proof.ts
··· 173 173 return paths; 174 174 } 175 175 176 + /** 177 + * Walk an MST node recursively, collecting ALL CIDs reachable from it. 178 + * Collects node CIDs, record value CIDs, and subtree pointer CIDs. 179 + */ 180 + async function walkMstCids( 181 + blockStore: BlockStore, 182 + nodeCid: CID, 183 + cids: Set<string>, 184 + ): Promise<void> { 185 + const cidStr = nodeCid.toString(); 186 + if (cids.has(cidStr)) return; // Already visited 187 + cids.add(cidStr); 188 + 189 + const bytes = await blockStore.getBlock(cidStr); 190 + if (!bytes) return; 191 + 192 + const nodeData = decodeNodeData(bytes); 193 + 194 + // Visit left subtree 195 + if (nodeData.l) { 196 + await walkMstCids(blockStore, nodeData.l, cids); 197 + } 198 + 199 + // Visit each entry: collect value CID, recurse into right subtree 200 + for (const entry of nodeData.e) { 201 + cids.add(entry.v.toString()); 202 + if (entry.t) { 203 + await walkMstCids(blockStore, entry.t, cids); 204 + } 205 + } 206 + } 207 + 208 + /** 209 + * Extract ALL CIDs referenced by a repo at a given commit. 210 + * 211 + * Walks the commit → MST → all reachable CIDs including: 212 + * - The commit CID itself 213 + * - The MST root CID 214 + * - Every MST node CID 215 + * - Every record value CID 216 + * - Every subtree pointer CID 217 + * 218 + * This produces the complete "live CID set" for a repo. Any CID tracked 219 + * in replication_blocks but NOT in this set is orphaned from this DID's 220 + * perspective and can be considered for GC. 
221 + * 222 + * @param blockStore - Block storage containing the repo blocks 223 + * @param commitCid - CID of the commit block (repo head) 224 + * @returns Set of all CIDs referenced by the repo 225 + */ 226 + export async function extractAllCids( 227 + blockStore: BlockStore, 228 + commitCid: string, 229 + ): Promise<Set<string>> { 230 + const cids = new Set<string>(); 231 + cids.add(commitCid); 232 + 233 + const commitBytes = await blockStore.getBlock(commitCid); 234 + if (!commitBytes) return cids; 235 + 236 + const commitObj = cborDecode(commitBytes) as { data: CID }; 237 + const mstRootCid = commitObj.data; 238 + if (!mstRootCid) return cids; 239 + 240 + await walkMstCids(blockStore, mstRootCid, cids); 241 + return cids; 242 + } 243 + 176 244 // ---------- Generation ---------- 177 245 178 246 /**
+2
src/replication/offer-manager.test.ts
··· 30 30 REPLICATE_DIDS: [], 31 31 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 32 32 FIREHOSE_ENABLED: false, 33 + NODE_DID: "did:web:test.example.com", 34 + NODE_MANAGERS: [], 33 35 }; 34 36 } 35 37
+4 -1
src/replication/peer-freshness.test.ts
··· 44 44 REPLICATE_DIDS: replicateDids, 45 45 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 46 46 FIREHOSE_ENABLED: false, 47 + NODE_DID: "did:web:test.example.com", 48 + NODE_MANAGERS: [], 47 49 }; 48 50 } 49 51 ··· 112 114 getBlock: vi.fn().mockResolvedValue(null), 113 115 hasBlock: vi.fn().mockResolvedValue(false), 114 116 putBlocks: vi.fn().mockResolvedValue(undefined), 117 + deleteBlock: vi.fn().mockResolvedValue(undefined), 115 118 }; 116 119 } 117 120 ··· 713 716 714 717 // publishPeerIdentity was called during init 715 718 expect(mockNet.publishIdentityNotification).toHaveBeenCalledWith( 716 - "did:plc:local", 719 + "did:web:test.example.com", 717 720 "12D3KooWMockPeer", 718 721 ["/ip4/127.0.0.1/tcp/4001"], 719 722 );
+12
src/replication/policy-integration.test.ts
··· 52 52 REPLICATE_DIDS: replicateDids, 53 53 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 54 54 FIREHOSE_ENABLED: false, 55 + NODE_DID: "did:web:test.example.com", 56 + NODE_MANAGERS: [], 55 57 }; 56 58 } 57 59 ··· 100 102 didResolver = new DidResolver({ 101 103 didCache: new InMemoryDidCache(), 102 104 }); 105 + 106 + // Ensure replication tables exist (needed for getReplicateDids which queries admin_tracked_dids) 107 + const { SyncStorage } = await import("./sync-storage.js"); 108 + new SyncStorage(db).initSchema(); 103 109 }); 104 110 105 111 afterEach(async () => { ··· 707 713 didResolver = new DidResolver({ 708 714 didCache: new InMemoryDidCache(), 709 715 }); 716 + 717 + const { SyncStorage } = await import("./sync-storage.js"); 718 + new SyncStorage(db).initSchema(); 710 719 }); 711 720 712 721 afterEach(async () => { ··· 861 870 didResolver = new DidResolver({ 862 871 didCache: new InMemoryDidCache(), 863 872 }); 873 + 874 + const { SyncStorage } = await import("./sync-storage.js"); 875 + new SyncStorage(db).initSchema(); 864 876 }); 865 877 866 878 afterEach(async () => {
+381 -39
src/replication/replication-manager.ts
··· 31 31 import { RepoFetcher } from "./repo-fetcher.js"; 32 32 import { PeerDiscovery } from "./peer-discovery.js"; 33 33 import { BlockVerifier, RemoteVerifier } from "./verification.js"; 34 - import { extractAllRecordPaths } from "./mst-proof.js"; 34 + import { extractAllRecordPaths, extractAllCids } from "./mst-proof.js"; 35 35 import { 36 36 FirehoseSubscription, 37 37 type FirehoseCommitEvent, 38 + type FirehoseAccountEvent, 38 39 } from "./firehose-subscription.js"; 39 40 import { ChallengeScheduler } from "./challenge-response/challenge-scheduler.js"; 40 41 import { ChallengeStorage, type ChallengeHistoryRow, type PeerReliabilityRow } from "./challenge-response/challenge-storage.js"; ··· 76 77 private recentNotifications: Set<string> = new Set(); 77 78 private notificationCleanupTimer: ReturnType<typeof setInterval> | null = null; 78 79 80 + /** The node's own DID (did:web), used for identity in replication/challenges. */ 81 + private nodeDid: string; 82 + 79 83 constructor( 80 84 db: Database.Database, 81 85 private config: Config, ··· 86 90 verificationConfig?: Partial<VerificationConfig>, 87 91 private replicatedRepoReader?: ReplicatedRepoReader, 88 92 policyEngine?: PolicyEngine, 93 + nodeOpts?: { nodeDid: string }, 89 94 ) { 95 + this.nodeDid = nodeOpts?.nodeDid ?? config.NODE_DID; 90 96 this.syncStorage = new SyncStorage(db); 91 97 this.challengeStorage = new ChallengeStorage(db); 92 98 this.repoFetcher = new RepoFetcher(didResolver); ··· 106 112 repoManager, 107 113 this.peerDiscovery, 108 114 policyEngine, 109 - config.DID, 115 + this.nodeDid, 110 116 ); 111 117 } 112 118 } 113 119 114 120 /** 121 + * Get the node's DID (did:web identity). 122 + */ 123 + getNodeDid(): string { 124 + return this.nodeDid; 125 + } 126 + 127 + /** 115 128 * Get the PolicyEngine, if one is configured. 
116 129 */ 117 130 getPolicyEngine(): PolicyEngine | null { ··· 158 171 this.lastPublishedMultiaddrs = multiaddrs; 159 172 160 173 // Broadcast identity change via gossipsub (fire-and-forget) 161 - this.networkService.publishIdentityNotification(this.config.DID, peerId, multiaddrs).catch(() => {}); 174 + this.networkService.publishIdentityNotification(this.nodeDid, peerId, multiaddrs).catch(() => {}); 162 175 } 163 176 164 177 /** ··· 177 190 178 191 /** 179 192 * Get the merged list of DIDs to replicate. 180 - * Combines config.REPLICATE_DIDS with policy engine explicit DIDs (deduplicated). 181 - * When a policy engine is present, filters out DIDs where shouldReplicate is false. 193 + * Combines config.REPLICATE_DIDS, admin-added DIDs, and policy engine explicit DIDs (deduplicated). 194 + * When a policy engine is present, filters out DIDs where shouldReplicate is false, 195 + * but config DIDs and admin DIDs always replicate. 182 196 */ 183 197 getReplicateDids(): string[] { 184 - const configDids = new Set(this.config.REPLICATE_DIDS); 198 + const allDids = new Set(this.config.REPLICATE_DIDS); 199 + 200 + // Source 2: Admin-added DIDs from SQLite 201 + for (const did of this.syncStorage.getAdminDids()) { 202 + allDids.add(did); 203 + } 185 204 186 205 if (this.policyEngine) { 187 - // Add DIDs from policy engine explicit lists 206 + // Source 3: Policy engine explicit DIDs 188 207 for (const did of this.policyEngine.getExplicitDids()) { 189 - configDids.add(did); 208 + allDids.add(did); 190 209 } 191 210 192 211 // Filter: only include DIDs where policy says to replicate, 193 - // but always include config DIDs (they replicate even without a matching policy) 212 + // but always include config DIDs and admin DIDs (they replicate regardless) 194 213 const result: string[] = []; 195 - for (const did of configDids) { 214 + for (const did of allDids) { 196 215 if ( 197 216 this.config.REPLICATE_DIDS.includes(did) || 217 + this.syncStorage.isAdminDid(did) || 198 218 
this.policyEngine.shouldReplicate(did) 199 219 ) { 200 220 result.push(did); ··· 203 223 return result; 204 224 } 205 225 206 - return [...configDids]; 226 + return [...allDids]; 227 + } 228 + 229 + /** 230 + * Determine the source of a tracked DID. 231 + */ 232 + getDidSource(did: string): "config" | "admin" | "policy" | null { 233 + if (this.config.REPLICATE_DIDS.includes(did)) return "config"; 234 + if (this.syncStorage.isAdminDid(did)) return "admin"; 235 + if (this.policyEngine?.getExplicitDids().includes(did)) return "policy"; 236 + return null; 207 237 } 208 238 209 239 /** 210 - * Ensure a manifest record exists for each configured DID. 211 - * Creates new manifests or updates existing ones. 240 + * Add a DID via the admin interface. 241 + * Persists to SQLite, creates manifest + sync state, subscribes gossipsub, updates firehose. 242 + * Returns the status and source if already tracked. 212 243 */ 213 - async syncManifests(): Promise<void> { 214 - for (const did of this.getReplicateDids()) { 215 - const rkey = didToRkey(did); 244 + async addDid(did: string): Promise<{ status: "added" | "already_tracked"; source?: string }> { 245 + if (this.config.REPLICATE_DIDS.includes(did)) { 246 + return { status: "already_tracked", source: "config" }; 247 + } 248 + if (this.syncStorage.isAdminDid(did)) { 249 + return { status: "already_tracked", source: "admin" }; 250 + } 251 + if (this.policyEngine?.getExplicitDids().includes(did)) { 252 + return { status: "already_tracked", source: "policy" }; 253 + } 216 254 217 - // Check if manifest already exists 218 - const existing = await this.repoManager.getRecord( 219 - MANIFEST_NSID, 220 - rkey, 221 - ); 255 + this.syncStorage.addAdminDid(did); 256 + await this.syncManifestForDid(did); 257 + this.networkService.subscribeCommitTopics([did]); 258 + this.networkService.subscribeIdentityTopics([did]); 259 + this.updateFirehoseDids(); 222 260 223 - if (!existing) { 261 + // Trigger initial sync in background (fire-and-forget) 
262 + this.syncDid(did).catch((err) => { 263 + console.error(`[replication] Initial sync for admin-added ${did} failed:`, err); 264 + }); 265 + 266 + return { status: "added" }; 267 + } 268 + 269 + /** 270 + * Remove an admin-added DID. 271 + * Returns error info if the DID cannot be removed (config/policy origin). 272 + * If purgeData is true, deletes all associated data. Otherwise pauses. 273 + */ 274 + async removeDid(did: string, purgeData: boolean = false): Promise<{ 275 + status: "removed" | "error"; 276 + purged: boolean; 277 + error?: string; 278 + }> { 279 + if (this.config.REPLICATE_DIDS.includes(did)) { 280 + return { status: "error", purged: false, error: "Cannot remove config-origin DID. Remove from REPLICATE_DIDS env var." }; 281 + } 282 + if (this.policyEngine?.getExplicitDids().includes(did) && !this.syncStorage.isAdminDid(did)) { 283 + return { status: "error", purged: false, error: "Cannot remove policy-managed DID. Update the policy instead." }; 284 + } 285 + 286 + this.syncStorage.removeAdminDid(did); 287 + this.networkService.unsubscribeCommitTopics([did]); 288 + this.networkService.unsubscribeIdentityTopics([did]); 289 + this.updateFirehoseDids(); 290 + 291 + if (purgeData) { 292 + this.syncStorage.clearBlocks(did); 293 + this.syncStorage.clearBlobs(did); 294 + this.syncStorage.clearRecordPaths(did); 295 + this.syncStorage.clearPeerEndpoints(did); 296 + this.syncStorage.deleteState(did); 297 + 298 + // Remove manifest record 299 + const rkey = didToRkey(did); 300 + try { 301 + await this.repoManager.deleteRecord(MANIFEST_NSID, rkey); 302 + } catch { 303 + // Non-fatal: manifest may not exist 304 + } 305 + } else { 306 + // Pause: update manifest status but keep data 307 + const rkey = didToRkey(did); 308 + const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey); 309 + if (existing) { 224 310 const manifest: ManifestRecord = { 225 - $type: MANIFEST_NSID, 226 - subject: did, 227 - status: "active", 228 - lastSyncRev: null, 229 - 
lastSyncAt: null, 230 - createdAt: new Date().toISOString(), 311 + ...(existing.record as ManifestRecord), 312 + status: "paused", 231 313 }; 232 - 233 314 await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 234 315 } 316 + } 235 317 236 - // Ensure sync state exists 237 - const pdsEndpoint = await this.repoFetcher.resolvePds(did); 238 - if (pdsEndpoint) { 239 - this.syncStorage.upsertState({ 240 - did, 241 - pdsEndpoint, 242 - }); 243 - } 318 + return { status: "removed", purged: purgeData }; 319 + } 320 + 321 + /** 322 + * Update firehose subscription with the current set of tracked DIDs. 323 + */ 324 + private updateFirehoseDids(): void { 325 + if (this.firehoseSubscription) { 326 + this.firehoseSubscription.updateDids(new Set(this.getReplicateDids())); 327 + } 328 + } 329 + 330 + /** 331 + * Ensure a manifest record and sync state exist for a single DID. 332 + * Extracted from syncManifests() for use by addDid(). 333 + */ 334 + private async syncManifestForDid(did: string): Promise<void> { 335 + const rkey = didToRkey(did); 336 + const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey); 337 + 338 + if (!existing) { 339 + const manifest: ManifestRecord = { 340 + $type: MANIFEST_NSID, 341 + subject: did, 342 + status: "active", 343 + lastSyncRev: null, 344 + lastSyncAt: null, 345 + createdAt: new Date().toISOString(), 346 + }; 347 + await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 348 + } 349 + 350 + const pdsEndpoint = await this.repoFetcher.resolvePds(did); 351 + if (pdsEndpoint) { 352 + this.syncStorage.upsertState({ did, pdsEndpoint }); 353 + } 354 + } 355 + 356 + /** 357 + * Ensure a manifest record exists for each configured DID. 358 + * Creates new manifests or updates existing ones. 
359 + */ 360 + async syncManifests(): Promise<void> { 361 + for (const did of this.getReplicateDids()) { 362 + await this.syncManifestForDid(did); 244 363 } 245 364 } 246 365 ··· 365 484 for (const did of sortedDids) { 366 485 if (this.stopped) break; 367 486 487 + // Skip tombstoned DIDs (handled separately by cleanupTombstonedDids) 488 + const currentState = this.syncStorage.getState(did); 489 + if (currentState?.status === "tombstoned") continue; 490 + 368 491 // Check per-DID interval when policy engine is present 369 492 if (this.policyEngine && !this.isDidDueForSync(did)) { 370 493 continue; ··· 380 503 this.syncStorage.updateStatus(did, "error", message); 381 504 } 382 505 } 506 + 507 + // Run deferred GC for DIDs flagged by firehose delete/update ops 508 + await this.runDeferredGc(); 509 + 510 + // Clean up tombstoned DIDs after grace period 511 + await this.cleanupTombstonedDids(); 383 512 384 513 // Discover peer endpoints for P2P fallback 385 514 await this.discoverPeerEndpoints(); ··· 447 576 let pdsEndpoint = await this.repoFetcher.resolvePds(did); 448 577 449 578 if (!pdsEndpoint) { 450 - this.syncStorage.updateStatus(did, "error", "Could not resolve PDS endpoint"); 579 + // If previously had a PDS endpoint, this may be a tombstoned/deleted account 580 + if (state?.pdsEndpoint) { 581 + this.syncStorage.markTombstoned(did); 582 + console.warn(`[replication] DID ${did} no longer resolvable — marked as tombstoned`); 583 + } else { 584 + this.syncStorage.updateStatus(did, "error", "Could not resolve PDS endpoint"); 585 + } 451 586 return; 452 587 } 453 588 ··· 589 724 // Non-fatal: path extraction is best-effort 590 725 } 591 726 727 + // 9d. 
Reconcile blocks: remove orphaned blocks from this DID's tracking 728 + try { 729 + const liveCids = await extractAllCids(this.blockStore, rootCidStr); 730 + const trackedCids = this.syncStorage.getBlockCidSet(did); 731 + 732 + // Find CIDs that are tracked but no longer live 733 + const orphanedFromDid: string[] = []; 734 + for (const cid of trackedCids) { 735 + if (!liveCids.has(cid)) { 736 + orphanedFromDid.push(cid); 737 + } 738 + } 739 + 740 + if (orphanedFromDid.length > 0) { 741 + // Remove from this DID's tracking 742 + this.syncStorage.removeBlocks(did, orphanedFromDid); 743 + 744 + // Find which are truly orphaned (no other DID references them) 745 + const trulyOrphaned = this.syncStorage.findOrphanedCids(orphanedFromDid); 746 + 747 + // Delete from blockstore 748 + for (const cid of trulyOrphaned) { 749 + await this.blockStore.deleteBlock(cid); 750 + } 751 + } 752 + 753 + // Clear the needs_gc flag since we just did a full reconciliation 754 + this.syncStorage.clearNeedsGc(did); 755 + } catch { 756 + // Non-fatal: GC is best-effort 757 + } 758 + 759 + // 9e. 
Reconcile blobs: remove blobs for deleted records 760 + try { 761 + const currentBlobCids = new Set<string>(); 762 + if (this.replicatedRepoReader) { 763 + const repo = await this.replicatedRepoReader.getRepo(did); 764 + if (repo) { 765 + for await (const entry of repo.walkRecords()) { 766 + const cids = extractBlobCids(entry.record); 767 + for (const cid of cids) { 768 + currentBlobCids.add(cid); 769 + } 770 + } 771 + } 772 + } 773 + 774 + const trackedBlobCids = this.syncStorage.getBlobCids(did); 775 + const orphanedBlobs = trackedBlobCids.filter(cid => !currentBlobCids.has(cid)); 776 + 777 + if (orphanedBlobs.length > 0) { 778 + this.syncStorage.removeBlobs(did, orphanedBlobs); 779 + const trulyOrphanedBlobs = this.syncStorage.findOrphanedBlobCids(orphanedBlobs); 780 + for (const cid of trulyOrphanedBlobs) { 781 + await this.blockStore.deleteBlock(cid); 782 + } 783 + } 784 + } catch { 785 + // Non-fatal: blob GC is best-effort 786 + } 787 + 592 788 // 10. Update manifest record 593 789 const rkey = didToRkey(did); 594 790 const existingManifest = await this.repoManager.getRecord( ··· 635 831 } 636 832 } 637 833 834 + /** Grace period before purging tombstoned DID data (24 hours). */ 835 + private static readonly TOMBSTONE_GRACE_MS = 24 * 60 * 60 * 1000; 836 + 837 + /** 838 + * Clean up tombstoned DIDs: re-verify resolution, purge if still dead. 839 + * Only purges after a grace period (24 hours) to avoid premature deletion 840 + * from transient DID resolution failures. 841 + */ 842 + private async cleanupTombstonedDids(): Promise<void> { 843 + const states = this.syncStorage.getAllStates(); 844 + const tombstoned = states.filter((s) => s.status === "tombstoned"); 845 + 846 + for (const state of tombstoned) { 847 + if (this.stopped) break; 848 + 849 + // Check grace period: only purge if tombstoned for > 24 hours 850 + const lastSyncAt = state.lastSyncAt ? 
new Date(state.lastSyncAt).getTime() : 0; 851 + const timeSinceLastSync = Date.now() - lastSyncAt; 852 + if (timeSinceLastSync < ReplicationManager.TOMBSTONE_GRACE_MS) { 853 + continue; 854 + } 855 + 856 + // Re-verify: try to resolve the DID again 857 + try { 858 + const pdsEndpoint = await this.repoFetcher.resolvePds(state.did); 859 + if (pdsEndpoint) { 860 + // DID is alive again! Un-tombstone and resume. 861 + this.syncStorage.updateStatus(state.did, "pending"); 862 + this.syncStorage.upsertState({ did: state.did, pdsEndpoint }); 863 + console.log(`[replication] Tombstoned DID ${state.did} is alive again, resuming`); 864 + this.syncDid(state.did).catch((err) => { 865 + const message = err instanceof Error ? err.message : String(err); 866 + this.syncStorage.updateStatus(state.did, "error", message); 867 + }); 868 + continue; 869 + } 870 + } catch { 871 + // Resolution failed, proceed with purge 872 + } 873 + 874 + // Still dead — purge data 875 + console.warn(`[replication] Purging data for tombstoned DID ${state.did}`); 876 + const { blocksRemoved, blobsRemoved } = this.syncStorage.purgeDidData(state.did); 877 + 878 + // Delete truly orphaned blocks/blobs from blockstore 879 + if (blocksRemoved.length > 0) { 880 + const orphanedBlocks = this.syncStorage.findOrphanedCids(blocksRemoved); 881 + for (const cid of orphanedBlocks) { 882 + await this.blockStore.deleteBlock(cid); 883 + } 884 + } 885 + if (blobsRemoved.length > 0) { 886 + const orphanedBlobs = this.syncStorage.findOrphanedBlobCids(blobsRemoved); 887 + for (const cid of orphanedBlobs) { 888 + await this.blockStore.deleteBlock(cid); 889 + } 890 + } 891 + 892 + // Remove manifest record 893 + const rkey = didToRkey(state.did); 894 + try { 895 + await this.repoManager.deleteRecord(MANIFEST_NSID, rkey); 896 + } catch { 897 + // Non-fatal 898 + } 899 + } 900 + } 901 + 902 + /** 903 + * Run deferred GC for DIDs that were flagged by firehose delete/update ops. 
904 + * Triggers a full sync for each flagged DID, which includes block/blob reconciliation. 905 + */ 906 + private async runDeferredGc(): Promise<void> { 907 + const didsNeedingGc = this.syncStorage.getDidsNeedingGc(); 908 + for (const did of didsNeedingGc) { 909 + if (this.stopped) break; 910 + try { 911 + await this.syncDid(did); 912 + this.lastSyncTimestamps.set(did, Date.now()); 913 + } catch (err) { 914 + const message = err instanceof Error ? err.message : String(err); 915 + this.syncStorage.updateStatus(did, "error", message); 916 + } 917 + } 918 + } 919 + 638 920 /** 639 921 * Compute the tick interval for periodic sync. 640 922 * ··· 731 1013 }); 732 1014 }); 733 1015 1016 + // Register handler for account status events (tombstone/deactivation) 1017 + this.firehoseSubscription.onAccount((event) => { 1018 + this.handleFirehoseAccount(event).catch((err) => { 1019 + console.error(`[replication] Firehose account handler error for ${event.did}:`, err); 1020 + }); 1021 + }); 1022 + 734 1023 // Load saved cursor for resumption 735 1024 const savedCursor = this.syncStorage.getFirehoseCursor(); 736 1025 ··· 825 1114 } 826 1115 827 1116 /** 1117 + * Handle an account status event from the firehose. 1118 + * Detects tombstoned/deactivated accounts and marks them accordingly. 1119 + * Handles re-activation by clearing tombstone and triggering sync. 
1120 + */ 1121 + private async handleFirehoseAccount(event: FirehoseAccountEvent): Promise<void> { 1122 + const did = event.did; 1123 + 1124 + // Only process DIDs we are tracking 1125 + const replicateDids = this.getReplicateDids(); 1126 + if (!replicateDids.includes(did)) return; 1127 + 1128 + if (!event.active || event.status === "deleted" || event.status === "takendown") { 1129 + // Mark as tombstoned 1130 + this.syncStorage.markTombstoned(did); 1131 + 1132 + // Update manifest to paused 1133 + const rkey = didToRkey(did); 1134 + const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey); 1135 + if (existing) { 1136 + const manifest: ManifestRecord = { 1137 + ...(existing.record as ManifestRecord), 1138 + status: "paused", 1139 + }; 1140 + await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 1141 + } 1142 + 1143 + console.warn( 1144 + `[replication] Account ${did} deactivated/tombstoned (status=${event.status}, active=${event.active})`, 1145 + ); 1146 + } else if (event.active) { 1147 + // Re-activated: clear tombstone and trigger full sync 1148 + const state = this.syncStorage.getState(did); 1149 + if (state?.status === "tombstoned") { 1150 + this.syncStorage.updateStatus(did, "pending"); 1151 + console.log(`[replication] Account ${did} re-activated, triggering sync`); 1152 + this.syncDid(did).catch((err) => { 1153 + const message = err instanceof Error ? err.message : String(err); 1154 + this.syncStorage.updateStatus(did, "error", message); 1155 + }); 1156 + } 1157 + } 1158 + } 1159 + 1160 + /** 828 1161 * Apply blocks from a firehose commit event directly to the blockstore. 829 1162 * The event's `blocks` field contains CAR-encoded bytes that can be parsed 830 1163 * and stored without fetching from the source PDS. 
··· 908 1241 } 909 1242 if (deletedPaths.length > 0) { 910 1243 this.syncStorage.removeRecordPaths(did, deletedPaths); 1244 + } 1245 + 1246 + // Flag for deferred GC if deletes or updates occurred 1247 + // (updates may orphan old MST nodes/record blocks) 1248 + const hasDeletesOrUpdates = event.ops.some( 1249 + (op) => op.action === "delete" || op.action === "update", 1250 + ); 1251 + if (hasDeletesOrUpdates) { 1252 + this.syncStorage.setNeedsGc(did); 911 1253 } 912 1254 } catch { 913 1255 // Non-fatal: path tracking is best-effort ··· 1335 1677 if (this.challengeScheduler) return; 1336 1678 1337 1679 this.challengeScheduler = new ChallengeScheduler( 1338 - this.config.DID, 1680 + this.nodeDid, 1339 1681 this.policyEngine, 1340 1682 this.syncStorage, 1341 1683 this.challengeStorage,
+5 -3
src/replication/replication.test.ts
··· 55 55 REPLICATE_DIDS: replicateDids, 56 56 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 57 57 FIREHOSE_ENABLED: false, 58 + NODE_DID: "did:web:test.example.com", 59 + NODE_MANAGERS: [], 58 60 }; 59 61 } 60 62 ··· 604 606 605 607 // Replica setup (different DID for the local node, but will replicate source's data) 606 608 const replicaConfig = testConfig(join(tmpDir, "replica"), [ 607 - sourceConfig.DID, 609 + sourceConfig.DID!, 608 610 ]); 609 611 replicaConfig.DID = "did:plc:replica456"; 610 612 replicaConfig.SIGNING_KEY = ··· 1555 1557 const firehose = new Firehose(replicaRepo); 1556 1558 app = createApp( 1557 1559 replicaConfig, 1558 - replicaRepo, 1559 1560 firehose, 1560 1561 replicaIpfs, 1561 1562 replicaIpfs, 1562 1563 undefined, 1563 1564 undefined, 1564 1565 reader, 1566 + { nodeDid: replicaConfig.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: replicaRepo, repoManager: replicaRepo }, 1565 1567 ); 1566 1568 }); 1567 1569 ··· 1977 1979 const firehose = new Firehose(replicaRepo); 1978 1980 app = createApp( 1979 1981 replicaConfig, 1980 - replicaRepo, 1981 1982 firehose, 1982 1983 replicaIpfs, // blockStore 1983 1984 replicaIpfs, // networkService 1984 1985 undefined, // blobStore 1985 1986 mockReplicationManager, 1986 1987 undefined, // replicatedRepoReader 1988 + { nodeDid: replicaConfig.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: replicaRepo, repoManager: replicaRepo }, 1987 1989 ); 1988 1990 }); 1989 1991
+214
src/replication/sync-storage.ts
··· 75 75 ); 76 76 `); 77 77 78 + // Admin-added DIDs table: persists DIDs added via admin UI. 79 + this.db.exec(` 80 + CREATE TABLE IF NOT EXISTS admin_tracked_dids ( 81 + did TEXT PRIMARY KEY, 82 + added_at TEXT NOT NULL DEFAULT (datetime('now')), 83 + added_by TEXT 84 + ); 85 + `); 86 + 78 87 // Sync history table: logs each sync event with metrics. 79 88 this.db.exec(` 80 89 CREATE TABLE IF NOT EXISTS sync_history ( ··· 128 137 if (!blobCols.some((c) => c.name === "size_bytes")) { 129 138 this.db.exec( 130 139 "ALTER TABLE replication_blobs ADD COLUMN size_bytes INTEGER NOT NULL DEFAULT 0", 140 + ); 141 + } 142 + 143 + // Migration: add needs_gc flag to replication_state 144 + if (!columns.some((c) => c.name === "needs_gc")) { 145 + this.db.exec( 146 + "ALTER TABLE replication_state ADD COLUMN needs_gc INTEGER NOT NULL DEFAULT 0", 131 147 ); 132 148 } 133 149 } ··· 532 548 } 533 549 534 550 // ============================================ 551 + // Admin-added DID management 552 + // ============================================ 553 + 554 + /** 555 + * Add a DID to the admin-tracked list (idempotent). 556 + */ 557 + addAdminDid(did: string): void { 558 + this.db 559 + .prepare( 560 + "INSERT OR IGNORE INTO admin_tracked_dids (did) VALUES (?)", 561 + ) 562 + .run(did); 563 + } 564 + 565 + /** 566 + * Remove a DID from the admin-tracked list. 567 + * Returns true if the DID was actually removed. 568 + */ 569 + removeAdminDid(did: string): boolean { 570 + const result = this.db 571 + .prepare("DELETE FROM admin_tracked_dids WHERE did = ?") 572 + .run(did); 573 + return result.changes > 0; 574 + } 575 + 576 + /** 577 + * Get all admin-added DIDs. 578 + */ 579 + getAdminDids(): string[] { 580 + const rows = this.db 581 + .prepare("SELECT did FROM admin_tracked_dids ORDER BY added_at") 582 + .all() as Array<{ did: string }>; 583 + return rows.map((r) => r.did); 584 + } 585 + 586 + /** 587 + * Check if a DID was added via the admin interface. 
588 + */ 589 + isAdminDid(did: string): boolean { 590 + const row = this.db 591 + .prepare("SELECT 1 FROM admin_tracked_dids WHERE did = ?") 592 + .get(did); 593 + return row !== undefined; 594 + } 595 + 596 + /** 597 + * Delete sync state for a DID. 598 + */ 599 + deleteState(did: string): void { 600 + this.db 601 + .prepare("DELETE FROM replication_state WHERE did = ?") 602 + .run(did); 603 + } 604 + 605 + /** 606 + * Clear all tracked blobs for a DID. 607 + */ 608 + clearBlobs(did: string): void { 609 + this.db 610 + .prepare("DELETE FROM replication_blobs WHERE did = ?") 611 + .run(did); 612 + } 613 + 614 + // ============================================ 615 + // Block/blob GC and orphan detection 616 + // ============================================ 617 + 618 + /** 619 + * Remove specific block tracking entries for a DID (batch delete). 620 + */ 621 + removeBlocks(did: string, cids: string[]): void { 622 + if (cids.length === 0) return; 623 + const remove = this.db.prepare( 624 + "DELETE FROM replication_blocks WHERE did = ? AND cid = ?", 625 + ); 626 + const batch = this.db.transaction((items: string[]) => { 627 + for (const cid of items) { 628 + remove.run(did, cid); 629 + } 630 + }); 631 + batch(cids); 632 + } 633 + 634 + /** 635 + * Find CIDs that have zero remaining references across all DIDs. 636 + * Given a list of CID strings, returns those with no rows in replication_blocks. 637 + */ 638 + findOrphanedCids(cids: string[]): string[] { 639 + if (cids.length === 0) return []; 640 + const orphaned: string[] = []; 641 + const check = this.db.prepare( 642 + "SELECT 1 FROM replication_blocks WHERE cid = ? LIMIT 1", 643 + ); 644 + for (const cid of cids) { 645 + const row = check.get(cid); 646 + if (!row) orphaned.push(cid); 647 + } 648 + return orphaned; 649 + } 650 + 651 + /** 652 + * Remove specific blob tracking entries for a DID (batch delete). 
653 + */ 654 + removeBlobs(did: string, cids: string[]): void { 655 + if (cids.length === 0) return; 656 + const remove = this.db.prepare( 657 + "DELETE FROM replication_blobs WHERE did = ? AND cid = ?", 658 + ); 659 + const batch = this.db.transaction((items: string[]) => { 660 + for (const cid of items) { 661 + remove.run(did, cid); 662 + } 663 + }); 664 + batch(cids); 665 + } 666 + 667 + /** 668 + * Find blob CIDs that have zero remaining references across all DIDs. 669 + */ 670 + findOrphanedBlobCids(cids: string[]): string[] { 671 + if (cids.length === 0) return []; 672 + const orphaned: string[] = []; 673 + const check = this.db.prepare( 674 + "SELECT 1 FROM replication_blobs WHERE cid = ? LIMIT 1", 675 + ); 676 + for (const cid of cids) { 677 + const row = check.get(cid); 678 + if (!row) orphaned.push(cid); 679 + } 680 + return orphaned; 681 + } 682 + 683 + /** 684 + * Get all tracked block CIDs for a DID as a Set for efficient diffing. 685 + */ 686 + getBlockCidSet(did: string): Set<string> { 687 + return new Set(this.getBlockCids(did)); 688 + } 689 + 690 + /** 691 + * Mark a DID as needing garbage collection. 692 + */ 693 + setNeedsGc(did: string): void { 694 + this.db 695 + .prepare("UPDATE replication_state SET needs_gc = 1 WHERE did = ?") 696 + .run(did); 697 + } 698 + 699 + /** 700 + * Clear the needs_gc flag for a DID. 701 + */ 702 + clearNeedsGc(did: string): void { 703 + this.db 704 + .prepare("UPDATE replication_state SET needs_gc = 0 WHERE did = ?") 705 + .run(did); 706 + } 707 + 708 + /** 709 + * Get all DIDs that need garbage collection. 710 + */ 711 + getDidsNeedingGc(): string[] { 712 + const rows = this.db 713 + .prepare("SELECT did FROM replication_state WHERE needs_gc = 1") 714 + .all() as Array<{ did: string }>; 715 + return rows.map((r) => r.did); 716 + } 717 + 718 + /** 719 + * Mark a DID as tombstoned (account deleted/deactivated upstream). 
720 + */ 721 + markTombstoned(did: string): void { 722 + this.db 723 + .prepare("UPDATE replication_state SET status = 'tombstoned' WHERE did = ?") 724 + .run(did); 725 + } 726 + 727 + /** 728 + * Purge all tracking data for a DID. 729 + * Returns the CID lists before deletion so the caller can check for orphans. 730 + */ 731 + purgeDidData(did: string): { blocksRemoved: string[]; blobsRemoved: string[] } { 732 + const blocksRemoved = this.getBlockCids(did); 733 + const blobsRemoved = this.getBlobCids(did); 734 + 735 + const purge = this.db.transaction(() => { 736 + this.db.prepare("DELETE FROM replication_blocks WHERE did = ?").run(did); 737 + this.db.prepare("DELETE FROM replication_blobs WHERE did = ?").run(did); 738 + this.db.prepare("DELETE FROM replication_record_paths WHERE did = ?").run(did); 739 + this.db.prepare("DELETE FROM replication_state WHERE did = ?").run(did); 740 + this.db.prepare("DELETE FROM peer_endpoints WHERE target_did = ?").run(did); 741 + }); 742 + purge(); 743 + 744 + return { blocksRemoved, blobsRemoved }; 745 + } 746 + 747 + // ============================================ 535 748 // Sync history tracking 536 749 // ============================================ 537 750 ··· 757 970 lastVerifiedAt: (row.last_verified_at as string) ?? null, 758 971 status: row.status as SyncState["status"], 759 972 errorMessage: (row.error_message as string) ?? null, 973 + needsGc: (row.needs_gc as number) === 1, 760 974 }; 761 975 } 762 976 }
+2 -1
src/replication/types.ts
··· 46 46 rootCid: string | null; 47 47 lastSyncAt: string | null; 48 48 lastVerifiedAt: string | null; 49 - status: "pending" | "syncing" | "synced" | "error"; 49 + status: "pending" | "syncing" | "synced" | "error" | "tombstoned"; 50 50 errorMessage: string | null; 51 + needsGc: boolean; 51 52 } 52 53 53 54 /**
+9 -1
src/repo-manager.ts
··· 82 82 private async ensureRepoInitialized(): Promise<void> { 83 83 if (this.repoInitialized) return; 84 84 85 + if (!this.config.DID || !this.config.SIGNING_KEY) { 86 + throw new Error("RepoManager requires DID and SIGNING_KEY to be configured"); 87 + } 88 + 85 89 this.keypair = await Secp256k1Keypair.import(this.config.SIGNING_KEY); 86 90 87 91 const root = await this.storage.getRoot(); ··· 635 639 const importRev = tidNow(); 636 640 await this.storage.putMany(blocks, importRev); 637 641 642 + if (!this.config.DID || !this.config.SIGNING_KEY) { 643 + throw new Error("RepoManager requires DID and SIGNING_KEY for import"); 644 + } 645 + 638 646 this.keypair = await Secp256k1Keypair.import(this.config.SIGNING_KEY); 639 647 this.repo = await Repo.load(this.storage, rootCid); 640 648 ··· 813 821 type: "identity", 814 822 event: { 815 823 seq, 816 - did: this.config.DID, 824 + did: this.config.DID ?? "", 817 825 time, 818 826 handle, 819 827 },
+57 -10
src/server.ts
··· 22 22 import { FailoverChallengeTransport } from "./replication/challenge-response/failover-transport.js"; 23 23 import type { ChallengeTransport } from "./replication/challenge-response/transport.js"; 24 24 import type { Libp2p } from "@libp2p/interface"; 25 + import { loadOrCreateNodeIdentity, getPublicKeyMultibase } from "./node-identity.js"; 25 26 26 27 // Load configuration 27 28 const config = loadConfig(); ··· 51 52 }); 52 53 } 53 54 54 - // Initialize repo manager 55 - const repoManager = new RepoManager(db, config); 56 - const blobStore = new BlobStore(dataDir, config.DID); 57 - repoManager.init(blobStore, ipfsService, ipfsService); 55 + // Load node identity (keypair + did:web) 56 + const { keypair: nodeKeypair } = await loadOrCreateNodeIdentity(dataDir); 57 + const nodeDid = config.NODE_DID; 58 + const nodePublicKeyMultibase = getPublicKeyMultibase(nodeKeypair); 59 + 60 + // Initialize node repo (separate DB for node's own records: peer identity, manifests, offers) 61 + const nodeDbPath = resolve(dataDir, "node-repo.db"); 62 + const nodeDb = new Database(nodeDbPath); 63 + nodeDb.pragma("journal_mode = WAL"); 64 + nodeDb.pragma("synchronous = NORMAL"); 65 + 66 + const nodeKeyHex = Buffer.from(await nodeKeypair.export()).toString("hex"); 67 + const nodeRepoConfig = { 68 + ...config, 69 + DID: nodeDid, 70 + SIGNING_KEY: nodeKeyHex, 71 + SIGNING_KEY_PUBLIC: nodePublicKeyMultibase, 72 + }; 73 + const nodeRepoManager = new RepoManager(nodeDb, nodeRepoConfig); 74 + nodeRepoManager.init(); 75 + 76 + // Initialize social account repo (optional — only if DID is configured) 77 + let repoManager: RepoManager | undefined; 78 + let blobStore: BlobStore | undefined; 79 + if (config.DID && config.SIGNING_KEY) { 80 + repoManager = new RepoManager(db, config as typeof config & { DID: string; SIGNING_KEY: string }); 81 + blobStore = new BlobStore(dataDir, config.DID); 82 + repoManager.init(blobStore, ipfsService, ipfsService); 83 + } 58 84 59 - // Initialize firehose 
60 - const firehose = new Firehose(repoManager); 85 + // Initialize firehose (uses social repo if available, else node repo) 86 + const firehose = new Firehose(repoManager ?? nodeRepoManager); 61 87 62 88 // Initialize DID resolver 63 89 const didResolver = new DidResolver({ ··· 86 112 replicationManager = new ReplicationManager( 87 113 db, 88 114 config, 89 - repoManager, 115 + nodeRepoManager, 90 116 ipfsService, 91 117 ipfsService, 92 118 didResolver, 93 119 undefined, 94 120 undefined, 95 121 policyEngine, 122 + { nodeDid }, 96 123 ); 97 124 replicatedRepoReader = new ReplicatedRepoReader( 98 125 ipfsService, ··· 102 129 } 103 130 104 131 // Create Hono app 105 - const app = createApp(config, repoManager, firehose, ipfsService, ipfsService, blobStore, replicationManager, replicatedRepoReader); 132 + const app = createApp( 133 + config, 134 + firehose, 135 + ipfsService, 136 + ipfsService, 137 + blobStore, 138 + replicationManager, 139 + replicatedRepoReader, 140 + { 141 + nodeDid, 142 + nodePublicKeyMultibase, 143 + nodeRepoManager, 144 + repoManager, 145 + }, 146 + ); 106 147 107 148 // Create HTTP server using @hono/node-server's request listener 108 149 const requestListener = getRequestListener(app.fetch); ··· 153 194 pc.bold(`\nP2PDS running at `) + 154 195 pc.cyan(`http://localhost:${config.PORT}`), 155 196 ); 156 - console.log(pc.dim(` DID: ${config.DID}`)); 157 - console.log(pc.dim(` Handle: @${config.HANDLE}`)); 197 + console.log(pc.dim(` Node: ${nodeDid}`)); 198 + if (config.DID) { 199 + console.log(pc.dim(` DID: ${config.DID}`)); 200 + } 201 + if (config.HANDLE) { 202 + console.log(pc.dim(` Handle: @${config.HANDLE}`)); 203 + } 158 204 console.log(pc.dim(` Data: ${dataDir}`)); 159 205 160 206 // Start IPFS after HTTP server is listening (IPFS startup can be slow) ··· 227 273 if (ipfsService) { 228 274 await ipfsService.stop(); 229 275 } 276 + nodeDb.close(); 230 277 db.close(); 231 278 }; 232 279 cleanup().finally(() => process.exit(0));
+5 -2
src/xrpc/admin-e2e.test.ts
··· 45 45 REPLICATE_DIDS: replicateDids, 46 46 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 47 47 FIREHOSE_ENABLED: false, 48 + NODE_DID: "did:web:test.example.com", 49 + NODE_MANAGERS: [], 48 50 }; 49 51 } 50 52 ··· 113 115 } 114 116 115 117 const firehoseA = new Firehose(repoManagerA); 116 - const appA = createApp(configA, repoManagerA, firehoseA); 118 + const appA = createApp(configA, firehoseA, undefined, undefined, undefined, undefined, undefined, { nodeDid: configA.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManagerA, repoManager: repoManagerA }); 117 119 ({ server: serverA, port: portA } = await startServer(appA)); 118 120 119 121 // ---- Node B: replicator with admin dashboard ---- ··· 164 166 const firehoseB = new Firehose(repoManagerB); 165 167 const appB = createApp( 166 168 configB, 167 - repoManagerB, 168 169 firehoseB, 169 170 ipfsB, 170 171 mockNetworkService, 171 172 undefined, 172 173 replicationManager, 174 + undefined, 175 + { nodeDid: configB.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManagerB, repoManager: repoManagerB }, 173 176 ); 174 177 ({ server: serverB, port: portB } = await startServer(appB)); 175 178
+236 -16
src/xrpc/admin.test.ts
··· 31 31 REPLICATE_DIDS: replicateDids, 32 32 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 33 33 FIREHOSE_ENABLED: false, 34 + NODE_DID: "did:web:test.example.com", 35 + NODE_MANAGERS: [], 34 36 }; 35 37 } 36 38 ··· 42 44 return app.request(path, { headers: AUTH_HEADERS }, {}); 43 45 } 44 46 47 + function authPost(app: ReturnType<typeof createApp>, path: string, body: unknown) { 48 + return app.request(path, { 49 + method: "POST", 50 + headers: { ...AUTH_HEADERS, "Content-Type": "application/json" }, 51 + body: JSON.stringify(body), 52 + }, {}); 53 + } 54 + 45 55 function noAuthGet(app: ReturnType<typeof createApp>, path: string) { 46 56 return app.request(path, undefined, {}); 47 57 } 48 58 59 + function noAuthPost(app: ReturnType<typeof createApp>, path: string, body: unknown) { 60 + return app.request(path, { 61 + method: "POST", 62 + headers: { "Content-Type": "application/json" }, 63 + body: JSON.stringify(body), 64 + }, {}); 65 + } 66 + 49 67 // ============================================ 50 68 // Auth check (covers all admin endpoints) 51 69 // ============================================ ··· 62 80 const repoManager = new RepoManager(db, config); 63 81 repoManager.init(); 64 82 const firehose = new Firehose(repoManager); 65 - app = createApp(config, repoManager, firehose); 83 + app = createApp(config, firehose, undefined, undefined, undefined, undefined, undefined, { nodeDid: config.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManager, repoManager }); 66 84 }); 67 85 68 86 afterEach(() => { ··· 71 89 }); 72 90 73 91 it("returns 401 for all admin endpoints without auth", async () => { 74 - const endpoints = [ 92 + const getEndpoints = [ 75 93 "/xrpc/org.p2pds.admin.getOverview", 76 94 "/xrpc/org.p2pds.admin.getDidStatus?did=did:plc:test", 77 95 "/xrpc/org.p2pds.admin.getNetworkStatus", ··· 79 97 "/xrpc/org.p2pds.admin.getSyncHistory", 80 98 ]; 81 99 82 - for (const endpoint of endpoints) { 100 + for (const endpoint of 
getEndpoints) { 83 101 const res = await noAuthGet(app, endpoint); 84 102 expect(res.status, `${endpoint} should require auth`).toBe(401); 85 103 } 104 + 105 + const postEndpoints = [ 106 + "/xrpc/org.p2pds.admin.addDid", 107 + "/xrpc/org.p2pds.admin.removeDid", 108 + ]; 109 + 110 + for (const endpoint of postEndpoints) { 111 + const res = await noAuthPost(app, endpoint, { did: "did:plc:test" }); 112 + expect(res.status, `${endpoint} should require auth`).toBe(401); 113 + } 86 114 }); 87 115 }); 88 116 ··· 109 137 const repoManager = new RepoManager(db, config); 110 138 repoManager.init(); 111 139 const firehose = new Firehose(repoManager); 112 - const app = createApp(config, repoManager, firehose); 140 + const app = createApp(config, firehose, undefined, undefined, undefined, undefined, undefined, { nodeDid: config.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManager, repoManager }); 113 141 114 142 const res = await authGet(app, "/xrpc/org.p2pds.admin.getOverview"); 115 143 expect(res.status).toBe(200); ··· 157 185 db, 158 186 config, 159 187 repoManager, 160 - { putBlock: async () => {}, getBlock: async () => null, hasBlock: async () => false, putBlocks: async () => {} }, 188 + { putBlock: async () => {}, getBlock: async () => null, hasBlock: async () => false, putBlocks: async () => {}, deleteBlock: async () => {} }, 161 189 mockNetworkService, 162 190 mockDidResolver as any, 163 191 ); ··· 167 195 const firehose = new Firehose(repoManager); 168 196 const app = createApp( 169 197 config, 170 - repoManager, 171 198 firehose, 172 199 undefined, 173 200 mockNetworkService, 174 201 undefined, 175 202 replicationManager, 203 + undefined, 204 + { nodeDid: config.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManager, repoManager }, 176 205 ); 177 206 178 207 const res = await authGet(app, "/xrpc/org.p2pds.admin.getOverview"); ··· 187 216 }); 188 217 const repl = json.replication as Record<string, unknown>; 189 218 
expect(repl.enabled).toBe(true); 219 + expect(repl.didSources).toBeDefined(); 190 220 expect(repl.aggregate).toBeDefined(); 191 221 const agg = repl.aggregate as Record<string, unknown>; 192 222 expect(typeof agg.totalBlocks).toBe("number"); ··· 241 271 db, 242 272 config, 243 273 repoManager, 244 - { putBlock: async () => {}, getBlock: async () => null, hasBlock: async () => false, putBlocks: async () => {} }, 274 + { putBlock: async () => {}, getBlock: async () => null, hasBlock: async () => false, putBlocks: async () => {}, deleteBlock: async () => {} }, 245 275 mockNetworkService, 246 276 { resolve: async () => null } as any, 247 277 ); ··· 250 280 const firehose = new Firehose(repoManager); 251 281 app = createApp( 252 282 config, 253 - repoManager, 254 283 firehose, 255 284 undefined, 256 285 mockNetworkService, 257 286 undefined, 258 287 replicationManager, 288 + undefined, 289 + { nodeDid: config.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManager, repoManager }, 259 290 ); 260 291 }); 261 292 ··· 338 369 const repoManager = new RepoManager(db, config); 339 370 repoManager.init(); 340 371 const firehose = new Firehose(repoManager); 341 - const app = createApp(config, repoManager, firehose); 372 + const app = createApp(config, firehose, undefined, undefined, undefined, undefined, undefined, { nodeDid: config.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManager, repoManager }); 342 373 343 374 const res = await authGet(app, "/xrpc/org.p2pds.admin.getNetworkStatus"); 344 375 expect(res.status).toBe(200); ··· 373 404 const firehose = new Firehose(repoManager); 374 405 const app = createApp( 375 406 config, 376 - repoManager, 377 407 firehose, 378 408 undefined, 379 409 mockNetworkService, 410 + undefined, 411 + undefined, 412 + undefined, 413 + { nodeDid: config.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManager, repoManager }, 380 414 ); 381 415 382 416 const res = await authGet(app, 
"/xrpc/org.p2pds.admin.getNetworkStatus"); ··· 412 446 const repoManager = new RepoManager(db, config); 413 447 repoManager.init(); 414 448 const firehose = new Firehose(repoManager); 415 - const app = createApp(config, repoManager, firehose); 449 + const app = createApp(config, firehose, undefined, undefined, undefined, undefined, undefined, { nodeDid: config.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManager, repoManager }); 416 450 417 451 const res = await authGet(app, "/xrpc/org.p2pds.admin.getPolicies"); 418 452 expect(res.status).toBe(200); ··· 448 482 db, 449 483 config, 450 484 repoManager, 451 - { putBlock: async () => {}, getBlock: async () => null, hasBlock: async () => false, putBlocks: async () => {} }, 485 + { putBlock: async () => {}, getBlock: async () => null, hasBlock: async () => false, putBlocks: async () => {}, deleteBlock: async () => {} }, 452 486 mockNetworkService, 453 487 { resolve: async () => null } as any, 454 488 ); ··· 457 491 const firehose = new Firehose(repoManager); 458 492 const app = createApp( 459 493 config, 460 - repoManager, 461 494 firehose, 462 495 undefined, 463 496 mockNetworkService, 464 497 undefined, 465 498 replicationManager, 499 + undefined, 500 + { nodeDid: config.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManager, repoManager }, 466 501 ); 467 502 468 503 const res = await authGet(app, "/xrpc/org.p2pds.admin.getPolicies"); ··· 514 549 db, 515 550 config, 516 551 repoManager, 517 - { putBlock: async () => {}, getBlock: async () => null, hasBlock: async () => false, putBlocks: async () => {} }, 552 + { putBlock: async () => {}, getBlock: async () => null, hasBlock: async () => false, putBlocks: async () => {}, deleteBlock: async () => {} }, 518 553 mockNetworkService, 519 554 { resolve: async () => null } as any, 520 555 undefined, ··· 526 561 const firehose = new Firehose(repoManager); 527 562 const app = createApp( 528 563 config, 529 - repoManager, 530 564 firehose, 531 565 
undefined, 532 566 mockNetworkService, 533 567 undefined, 534 568 replicationManager, 569 + undefined, 570 + { nodeDid: config.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManager, repoManager }, 535 571 ); 536 572 537 573 const res = await authGet(app, "/xrpc/org.p2pds.admin.getPolicies"); ··· 570 606 const repoManager = new RepoManager(db, config); 571 607 repoManager.init(); 572 608 const firehose = new Firehose(repoManager); 573 - const app = createApp(config, repoManager, firehose); 609 + const app = createApp(config, firehose, undefined, undefined, undefined, undefined, undefined, { nodeDid: config.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManager, repoManager }); 574 610 575 611 const res = await noAuthGet(app, "/xrpc/org.p2pds.admin.dashboard"); 576 612 expect(res.status).toBe(200); ··· 585 621 expect(html).toContain('id="section-network"'); 586 622 expect(html).toContain('id="section-policies"'); 587 623 expect(html).toContain('id="section-verification"'); 624 + expect(html).toContain('id="add-did-input"'); 625 + expect(html).toContain('id="add-did-btn"'); 588 626 expect(html).toContain("<script>"); 589 627 expect(html).toContain("test-auth-token"); 590 628 }); 591 629 }); 630 + 631 + // ============================================ 632 + // addDid / removeDid 633 + // ============================================ 634 + 635 + describe("Admin: addDid / removeDid", () => { 636 + let tmpDir: string; 637 + let db: InstanceType<typeof Database>; 638 + let config: Config; 639 + let repoManager: RepoManager; 640 + let replicationManager: ReplicationManager; 641 + let app: ReturnType<typeof createApp>; 642 + 643 + const configDid = "did:plc:configdid1"; 644 + 645 + beforeEach(() => { 646 + tmpDir = mkdtempSync(join(tmpdir(), "admin-did-mgmt-test-")); 647 + db = new Database(join(tmpDir, "test.db")); 648 + config = testConfig(tmpDir, [configDid]); 649 + repoManager = new RepoManager(db, config); 650 + repoManager.init(); 651 + 652 + 
const mockNetworkService: NetworkService = { 653 + provideBlocks: async () => {}, 654 + publishCommitNotification: async () => {}, 655 + onCommitNotification: () => {}, 656 + subscribeCommitTopics: () => {}, 657 + unsubscribeCommitTopics: () => {}, 658 + getPeerId: () => null, 659 + getMultiaddrs: () => [], 660 + getConnectionCount: () => 0, 661 + getRemoteAddrs: () => [], 662 + publishIdentityNotification: async () => {}, 663 + onIdentityNotification: () => {}, 664 + subscribeIdentityTopics: () => {}, 665 + unsubscribeIdentityTopics: () => {}, 666 + }; 667 + 668 + const mockDidResolver = { 669 + resolve: async () => null, 670 + }; 671 + 672 + replicationManager = new ReplicationManager( 673 + db, 674 + config, 675 + repoManager, 676 + { putBlock: async () => {}, getBlock: async () => null, hasBlock: async () => false, putBlocks: async () => {}, deleteBlock: async () => {} }, 677 + mockNetworkService, 678 + mockDidResolver as any, 679 + ); 680 + replicationManager.getSyncStorage().initSchema(); 681 + 682 + const firehose = new Firehose(repoManager); 683 + app = createApp( 684 + config, 685 + firehose, 686 + undefined, 687 + mockNetworkService, 688 + undefined, 689 + replicationManager, 690 + undefined, 691 + { nodeDid: config.NODE_DID, nodePublicKeyMultibase: "", nodeRepoManager: repoManager, repoManager }, 692 + ); 693 + }); 694 + 695 + afterEach(() => { 696 + db.close(); 697 + try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 698 + }); 699 + 700 + it("returns 400 when did is missing", async () => { 701 + const res = await authPost(app, "/xrpc/org.p2pds.admin.addDid", {}); 702 + expect(res.status).toBe(400); 703 + const json = await res.json() as Record<string, unknown>; 704 + expect(json.error).toBe("MissingParameter"); 705 + }); 706 + 707 + it("returns 400 for invalid DID format", async () => { 708 + const res = await authPost(app, "/xrpc/org.p2pds.admin.addDid", { did: "not-a-did" }); 709 + expect(res.status).toBe(400); 710 + const json = await 
res.json() as Record<string, unknown>; 711 + expect(json.error).toBe("InvalidDid"); 712 + }); 713 + 714 + it("returns 400 when adding own DID", async () => { 715 + const res = await authPost(app, "/xrpc/org.p2pds.admin.addDid", { did: "did:plc:test123" }); 716 + expect(res.status).toBe(400); 717 + const json = await res.json() as Record<string, unknown>; 718 + expect(json.error).toBe("InvalidDid"); 719 + expect(json.message).toContain("own DID"); 720 + }); 721 + 722 + it("reports already_tracked for config DID", async () => { 723 + const res = await authPost(app, "/xrpc/org.p2pds.admin.addDid", { did: configDid }); 724 + expect(res.status).toBe(200); 725 + const json = await res.json() as Record<string, unknown>; 726 + expect(json.status).toBe("already_tracked"); 727 + expect(json.source).toBe("config"); 728 + }); 729 + 730 + it("adds a new DID", async () => { 731 + const newDid = "did:plc:newdid123"; 732 + const res = await authPost(app, "/xrpc/org.p2pds.admin.addDid", { did: newDid }); 733 + expect(res.status).toBe(200); 734 + const json = await res.json() as Record<string, unknown>; 735 + expect(json.status).toBe("added"); 736 + expect(json.did).toBe(newDid); 737 + 738 + // Verify it's in admin DIDs and getReplicateDids 739 + const syncStorage = replicationManager.getSyncStorage(); 740 + expect(syncStorage.isAdminDid(newDid)).toBe(true); 741 + expect(replicationManager.getReplicateDids()).toContain(newDid); 742 + expect(replicationManager.getDidSource(newDid)).toBe("admin"); 743 + }); 744 + 745 + it("idempotent: re-adding returns already_tracked", async () => { 746 + const newDid = "did:plc:idempotent1"; 747 + await authPost(app, "/xrpc/org.p2pds.admin.addDid", { did: newDid }); 748 + const res = await authPost(app, "/xrpc/org.p2pds.admin.addDid", { did: newDid }); 749 + expect(res.status).toBe(200); 750 + const json = await res.json() as Record<string, unknown>; 751 + expect(json.status).toBe("already_tracked"); 752 + expect(json.source).toBe("admin"); 753 + 
}); 754 + 755 + it("cannot remove a config DID", async () => { 756 + const res = await authPost(app, "/xrpc/org.p2pds.admin.removeDid", { did: configDid }); 757 + expect(res.status).toBe(400); 758 + const json = await res.json() as Record<string, unknown>; 759 + expect(json.error).toBe("CannotRemove"); 760 + }); 761 + 762 + it("removes an admin-added DID", async () => { 763 + const newDid = "did:plc:removable1"; 764 + await authPost(app, "/xrpc/org.p2pds.admin.addDid", { did: newDid }); 765 + 766 + const res = await authPost(app, "/xrpc/org.p2pds.admin.removeDid", { did: newDid }); 767 + expect(res.status).toBe(200); 768 + const json = await res.json() as Record<string, unknown>; 769 + expect(json.status).toBe("removed"); 770 + expect(json.purged).toBe(false); 771 + }); 772 + 773 + it("removes with purgeData deletes all tracking data", async () => { 774 + const newDid = "did:plc:purgeable1"; 775 + await authPost(app, "/xrpc/org.p2pds.admin.addDid", { did: newDid }); 776 + 777 + // Add some tracking data 778 + const syncStorage = replicationManager.getSyncStorage(); 779 + syncStorage.trackBlocks(newDid, ["bafyblock1"]); 780 + syncStorage.trackRecordPaths(newDid, ["app.bsky.feed.post/abc"]); 781 + 782 + const res = await authPost(app, "/xrpc/org.p2pds.admin.removeDid", { did: newDid, purgeData: true }); 783 + expect(res.status).toBe(200); 784 + const json = await res.json() as Record<string, unknown>; 785 + expect(json.status).toBe("removed"); 786 + expect(json.purged).toBe(true); 787 + 788 + // Verify data was purged 789 + expect(syncStorage.getBlockCids(newDid)).toEqual([]); 790 + expect(syncStorage.getRecordPaths(newDid)).toEqual([]); 791 + expect(syncStorage.isAdminDid(newDid)).toBe(false); 792 + }); 793 + 794 + it("getReplicateDids includes admin-added DIDs", () => { 795 + const syncStorage = replicationManager.getSyncStorage(); 796 + syncStorage.addAdminDid("did:plc:adminadded1"); 797 + 798 + const dids = replicationManager.getReplicateDids(); 799 + 
expect(dids).toContain(configDid); 800 + expect(dids).toContain("did:plc:adminadded1"); 801 + }); 802 + 803 + it("getDidSource returns correct sources", () => { 804 + const syncStorage = replicationManager.getSyncStorage(); 805 + syncStorage.addAdminDid("did:plc:adminadded1"); 806 + 807 + expect(replicationManager.getDidSource(configDid)).toBe("config"); 808 + expect(replicationManager.getDidSource("did:plc:adminadded1")).toBe("admin"); 809 + expect(replicationManager.getDidSource("did:plc:unknown")).toBeNull(); 810 + }); 811 + });
+175 -3
src/xrpc/admin.ts
··· 5 5 6 6 const VERSION = "0.1.0"; 7 7 8 + const DID_REGEX = /^did:[a-z]+:[a-zA-Z0-9._:%-]+$/; 9 + 10 + function isValidDid(did: string): boolean { 11 + return DID_REGEX.test(did) && did.length <= 2048; 12 + } 13 + 8 14 export function getOverview( 9 15 c: Context<AuthedAppEnv>, 16 + nodeDid: string, 10 17 networkService: NetworkService | undefined, 11 18 replicationManager: ReplicationManager | undefined, 12 19 ): Response { ··· 27 34 const syncStates = replicationManager.getSyncStates(); 28 35 const syncStorage = replicationManager.getSyncStorage(); 29 36 const aggregate = syncStorage.getAggregateMetrics(); 37 + const didSources: Record<string, string> = {}; 38 + for (const s of syncStates) { 39 + didSources[s.did] = replicationManager.getDidSource(s.did) ?? "unknown"; 40 + } 30 41 replication = { 31 42 enabled: true, 32 43 trackedDids: syncStates.map((s) => s.did), 33 44 syncStates, 34 45 aggregate, 46 + didSources, 35 47 }; 36 48 37 49 firehose = replicationManager.getFirehoseStats() ?? null; ··· 54 66 55 67 return c.json({ 56 68 version: VERSION, 57 - did: c.env.DID, 69 + nodeDid, 70 + did: c.env.DID ?? 
nodeDid, 58 71 network, 59 72 replication, 60 73 firehose, ··· 195 208 .verify-pass { color: #22c55e; font-weight: 600; } 196 209 .verify-fail { color: #ef4444; font-weight: 600; } 197 210 .loading { color: #999; font-style: italic; } 211 + .add-did-form { display: flex; gap: 0.5rem; margin-bottom: 0.8rem; } 212 + .add-did-form input { 213 + flex: 1; padding: 0.4rem 0.6rem; font-family: inherit; font-size: 0.85rem; 214 + border: 1px solid #ccc; border-radius: 4px; outline: none; 215 + } 216 + .add-did-form input:focus { border-color: #000; } 217 + .add-did-form button, .btn-remove { 218 + padding: 0.4rem 0.8rem; font-family: inherit; font-size: 0.82rem; 219 + border: 1px solid #000; border-radius: 4px; cursor: pointer; background: #fff; 220 + } 221 + .add-did-form button:hover { background: #000; color: #fff; } 222 + .btn-remove { border-color: #ef4444; color: #ef4444; padding: 0.2rem 0.5rem; font-size: 0.75rem; } 223 + .btn-remove:hover { background: #ef4444; color: #fff; } 224 + .did-source { font-size: 0.7rem; padding: 1px 6px; border-radius: 3px; } 225 + .did-source-config { background: #e0e7ff; color: #3730a3; } 226 + .did-source-admin { background: #d1fae5; color: #065f46; } 227 + .did-source-policy { background: #fef3c7; color: #92400e; } 228 + .did-source-unknown { background: #f3f4f6; color: #6b7280; } 229 + .add-did-error { color: #ef4444; font-size: 0.82rem; margin-bottom: 0.5rem; min-height: 1.2em; } 230 + .add-did-success { color: #22c55e; font-size: 0.82rem; margin-bottom: 0.5rem; min-height: 1.2em; } 198 231 </style> 199 232 </head> 200 233 <body> ··· 219 252 220 253 <section class="card" id="section-replication"> 221 254 <h2>Replicated DIDs</h2> 255 + <div class="add-did-form"> 256 + <input type="text" id="add-did-input" placeholder="did:plc:..." 
autocomplete="off"> 257 + <button id="add-did-btn">Add DID</button> 258 + </div> 259 + <div id="add-did-msg" class="add-did-error"></div> 222 260 <div id="replication-content" class="loading">Loading...</div> 223 261 </section> 224 262 ··· 291 329 return '<span class="source-badge ' + cls + '">' + esc(sourceType || "pds") + '</span>'; 292 330 } 293 331 332 + function didSourceBadge(source) { 333 + var cls = "did-source did-source-" + (source || "unknown"); 334 + return '<span class="' + cls + '">' + esc(source || "unknown") + '</span>'; 335 + } 336 + 337 + async function apiPost(endpoint, body) { 338 + const url = new URL("/xrpc/" + endpoint, location.origin); 339 + const res = await fetch(url, { 340 + method: "POST", 341 + headers: { ...HEADERS, "Content-Type": "application/json" }, 342 + body: JSON.stringify(body), 343 + }); 344 + return res.json(); 345 + } 346 + 294 347 function renderOverview(data) { 295 348 const el = document.getElementById("overview-content"); 296 349 const net = data.network || {}; ··· 327 380 const repl = data.replication; 328 381 if (!repl || !repl.enabled) { el.innerHTML = "Replication disabled"; return; } 329 382 const states = repl.syncStates || []; 383 + const sources = repl.didSources || {}; 330 384 if (states.length === 0) { el.innerHTML = "No tracked DIDs"; return; } 331 - let html = "<table><thead><tr><th>DID</th><th>Status</th><th>Last Sync</th><th>Error</th></tr></thead><tbody>"; 385 + let html = "<table><thead><tr><th>DID</th><th>Source</th><th>Status</th><th>Last Sync</th><th>Error</th><th></th></tr></thead><tbody>"; 332 386 for (const s of states) { 333 387 const st = s.status || "pending"; 388 + const src = sources[s.did] || "unknown"; 334 389 const rid = "detail-" + s.did.replace(/[^a-zA-Z0-9]/g, "_"); 335 390 html += '<tr class="clickable" data-did="' + esc(s.did) + '" data-rid="' + rid + '">' 336 391 + "<td>" + esc(s.did) + "</td>" 392 + + "<td>" + didSourceBadge(src) + "</td>" 337 393 + "<td>" + statusDot(st) + "</td>" 
338 394 + "<td>" + timeAgo(s.lastSyncAt) + "</td>" 339 395 + "<td>" + esc(s.errorMessage || "-") + "</td>" 396 + + "<td>" + (src === "admin" ? '<button class="btn-remove" data-did="' + esc(s.did) + '">Remove</button>' : "") + "</td>" 340 397 + "</tr>"; 341 - html += '<tr class="detail-row" id="' + rid + '" style="display:none"><td colspan="4"><div class="detail-inner loading">Click to load...</div></td></tr>'; 398 + html += '<tr class="detail-row" id="' + rid + '" style="display:none"><td colspan="6"><div class="detail-inner loading">Click to load...</div></td></tr>'; 342 399 } 343 400 html += "</tbody></table>"; 344 401 el.innerHTML = html; ··· 382 439 } 383 440 }); 384 441 }); 442 + 443 + // Remove button handlers (stop propagation to prevent row click) 444 + el.querySelectorAll(".btn-remove").forEach(function(btn) { 445 + btn.addEventListener("click", async function(e) { 446 + e.stopPropagation(); 447 + var did = this.dataset.did; 448 + if (!confirm("Remove " + did + "?\\n\\nData will be kept (paused). 
To purge data, use the API with purgeData: true.")) return; 449 + var msgEl = document.getElementById("add-did-msg"); 450 + try { 451 + var result = await apiPost("org.p2pds.admin.removeDid", { did: did }); 452 + if (result.error) { msgEl.className = "add-did-error"; msgEl.textContent = result.message || result.error; } 453 + else { msgEl.className = "add-did-success"; msgEl.textContent = "Removed " + did; refresh(); } 454 + } catch (err) { msgEl.className = "add-did-error"; msgEl.textContent = "Error: " + err.message; } 455 + }); 456 + }); 385 457 } 386 458 387 459 function renderSyncHistory(history) { ··· 480 552 }); 481 553 482 554 refresh(); 555 + 556 + // Add DID button handler 557 + document.getElementById("add-did-btn").addEventListener("click", async function() { 558 + var input = document.getElementById("add-did-input"); 559 + var msgEl = document.getElementById("add-did-msg"); 560 + var did = input.value.trim(); 561 + if (!did) { msgEl.className = "add-did-error"; msgEl.textContent = "Enter a DID"; return; } 562 + msgEl.className = ""; msgEl.textContent = ""; 563 + try { 564 + var result = await apiPost("org.p2pds.admin.addDid", { did: did }); 565 + if (result.error) { 566 + msgEl.className = "add-did-error"; msgEl.textContent = result.message || result.error; 567 + } else { 568 + msgEl.className = "add-did-success"; 569 + msgEl.textContent = result.status === "already_tracked" 570 + ? 
did + " already tracked (source: " + result.source + ")" 571 + : "Added " + did; 572 + input.value = ""; 573 + refresh(); 574 + } 575 + } catch (err) { msgEl.className = "add-did-error"; msgEl.textContent = "Error: " + err.message; } 576 + }); 577 + 578 + document.getElementById("add-did-input").addEventListener("keydown", function(e) { 579 + if (e.key === "Enter") document.getElementById("add-did-btn").click(); 580 + }); 483 581 </script> 484 582 </body> 485 583 </html>`; ··· 506 604 const history = syncStorage.getSyncHistory(did, limit); 507 605 508 606 return c.json({ history }); 607 + } 608 + 609 + export async function addDid( 610 + c: Context<AuthedAppEnv>, 611 + nodeDid: string, 612 + replicationManager: ReplicationManager | undefined, 613 + ): Promise<Response> { 614 + if (!replicationManager) { 615 + return c.json( 616 + { error: "ReplicationNotEnabled", message: "Replication is not enabled" }, 617 + 400, 618 + ); 619 + } 620 + 621 + const body = await c.req.json<{ did?: string }>().catch(() => ({}) as { did?: string }); 622 + const did = body.did; 623 + if (!did || typeof did !== "string") { 624 + return c.json( 625 + { error: "MissingParameter", message: "did is required" }, 626 + 400, 627 + ); 628 + } 629 + 630 + if (!isValidDid(did)) { 631 + return c.json( 632 + { error: "InvalidDid", message: "Invalid DID format" }, 633 + 400, 634 + ); 635 + } 636 + 637 + if (did === nodeDid || did === c.env.DID) { 638 + return c.json( 639 + { error: "InvalidDid", message: "Cannot replicate own DID" }, 640 + 400, 641 + ); 642 + } 643 + 644 + const result = await replicationManager.addDid(did); 645 + return c.json({ did, ...result }); 646 + } 647 + 648 + export async function removeDid( 649 + c: Context<AuthedAppEnv>, 650 + replicationManager: ReplicationManager | undefined, 651 + ): Promise<Response> { 652 + if (!replicationManager) { 653 + return c.json( 654 + { error: "ReplicationNotEnabled", message: "Replication is not enabled" }, 655 + 400, 656 + ); 657 + } 658 + 
659 + const body = await c.req.json<{ did?: string; purgeData?: boolean }>().catch(() => ({}) as { did?: string; purgeData?: boolean }); 660 + const did = body.did; 661 + if (!did || typeof did !== "string") { 662 + return c.json( 663 + { error: "MissingParameter", message: "did is required" }, 664 + 400, 665 + ); 666 + } 667 + 668 + if (!isValidDid(did)) { 669 + return c.json( 670 + { error: "InvalidDid", message: "Invalid DID format" }, 671 + 400, 672 + ); 673 + } 674 + 675 + const result = await replicationManager.removeDid(did, body.purgeData ?? false); 676 + if (result.status === "error") { 677 + return c.json({ did, error: "CannotRemove", message: result.error }, 400); 678 + } 679 + 680 + return c.json({ did, ...result }); 509 681 } 510 682 511 683 export function getPolicies(
+14 -14
src/xrpc/repo.ts
··· 59 59 ); 60 60 } 61 61 62 - if (repo !== c.env.DID) { 62 + if (repo !== (c.env.DID ?? c.env.NODE_DID)) { 63 63 return c.json( 64 64 { error: "RepoNotFound", message: `Repository not found: ${repo}` }, 65 65 404, ··· 69 69 const data = await repoManager.describeRepo(); 70 70 71 71 return c.json({ 72 - did: c.env.DID, 73 - handle: c.env.HANDLE, 72 + did: c.env.DID ?? c.env.NODE_DID, 73 + handle: c.env.HANDLE ?? c.env.PDS_HOSTNAME, 74 74 didDoc: { 75 75 "@context": ["https://www.w3.org/ns/did/v1"], 76 - id: c.env.DID, 77 - alsoKnownAs: [`at://${c.env.HANDLE}`], 76 + id: c.env.DID ?? c.env.NODE_DID, 77 + alsoKnownAs: [`at://${c.env.HANDLE ?? c.env.PDS_HOSTNAME}`], 78 78 verificationMethod: [ 79 79 { 80 - id: `${c.env.DID}#atproto`, 80 + id: `${c.env.DID ?? c.env.NODE_DID}#atproto`, 81 81 type: "Multikey", 82 - controller: c.env.DID, 83 - publicKeyMultibase: c.env.SIGNING_KEY_PUBLIC, 82 + controller: c.env.DID ?? c.env.NODE_DID, 83 + publicKeyMultibase: c.env.SIGNING_KEY_PUBLIC ?? "", 84 84 }, 85 85 ], 86 86 }, ··· 114 114 ); 115 115 } 116 116 117 - if (repo !== c.env.DID) { 117 + if (repo !== (c.env.DID ?? c.env.NODE_DID)) { 118 118 return c.json( 119 119 { error: "RepoNotFound", message: `Repository not found: ${repo}` }, 120 120 404, ··· 167 167 ); 168 168 } 169 169 170 - if (repo !== c.env.DID) { 170 + if (repo !== (c.env.DID ?? c.env.NODE_DID)) { 171 171 return c.json( 172 172 { error: "RepoNotFound", message: `Repository not found: ${repo}` }, 173 173 404, ··· 203 203 ); 204 204 } 205 205 206 - if (repo !== c.env.DID) { 206 + if (repo !== (c.env.DID ?? c.env.NODE_DID)) { 207 207 return c.json( 208 208 { error: "InvalidRepo", message: `Invalid repository: ${repo}` }, 209 209 400, ··· 243 243 ); 244 244 } 245 245 246 - if (repo !== c.env.DID) { 246 + if (repo !== (c.env.DID ?? 
c.env.NODE_DID)) { 247 247 return c.json( 248 248 { error: "InvalidRepo", message: `Invalid repository: ${repo}` }, 249 249 400, ··· 288 288 ); 289 289 } 290 290 291 - if (repo !== c.env.DID) { 291 + if (repo !== (c.env.DID ?? c.env.NODE_DID)) { 292 292 return c.json( 293 293 { error: "InvalidRepo", message: `Invalid repository: ${repo}` }, 294 294 400, ··· 334 334 ); 335 335 } 336 336 337 - if (repo !== c.env.DID) { 337 + if (repo !== (c.env.DID ?? c.env.NODE_DID)) { 338 338 return c.json( 339 339 { error: "InvalidRepo", message: `Invalid repository: ${repo}` }, 340 340 400,
+24 -17
src/xrpc/server.ts
··· 13 13 14 14 export async function describeServer(c: Context<AppEnv>): Promise<Response> { 15 15 return c.json({ 16 - did: c.env.DID, 16 + did: c.env.DID ?? c.env.NODE_DID, 17 17 availableUserDomains: [], 18 18 inviteCodeRequired: false, 19 19 }); ··· 37 37 ); 38 38 } 39 39 40 - if (identifier !== c.env.HANDLE && identifier !== c.env.DID) { 40 + if (identifier !== c.env.HANDLE && identifier !== (c.env.DID ?? c.env.NODE_DID)) { 41 41 return c.json( 42 42 { 43 43 error: "AuthenticationRequired", ··· 61 61 const serviceDid = `did:web:${c.env.PDS_HOSTNAME}`; 62 62 const accessJwt = await createAccessToken( 63 63 c.env.JWT_SECRET, 64 - c.env.DID, 64 + c.env.DID ?? c.env.NODE_DID, 65 65 serviceDid, 66 66 ); 67 67 const refreshJwt = await createRefreshToken( 68 68 c.env.JWT_SECRET, 69 - c.env.DID, 69 + c.env.DID ?? c.env.NODE_DID, 70 70 serviceDid, 71 71 ); 72 72 ··· 76 76 return c.json({ 77 77 accessJwt, 78 78 refreshJwt, 79 - handle: c.env.HANDLE, 80 - did: c.env.DID, 79 + handle: c.env.HANDLE ?? c.env.PDS_HOSTNAME, 80 + did: c.env.DID ?? c.env.NODE_DID, 81 81 ...(email ? { email } : {}), 82 82 emailConfirmed: true, 83 83 active: true, ··· 107 107 serviceDid, 108 108 ); 109 109 110 - if (payload.sub !== c.env.DID) { 110 + if (payload.sub !== (c.env.DID ?? c.env.NODE_DID)) { 111 111 return c.json( 112 112 { 113 113 error: "AuthenticationRequired", ··· 119 119 120 120 const accessJwt = await createAccessToken( 121 121 c.env.JWT_SECRET, 122 - c.env.DID, 122 + c.env.DID ?? c.env.NODE_DID, 123 123 serviceDid, 124 124 ); 125 125 const refreshJwt = await createRefreshToken( 126 126 c.env.JWT_SECRET, 127 - c.env.DID, 127 + c.env.DID ?? c.env.NODE_DID, 128 128 serviceDid, 129 129 ); 130 130 ··· 134 134 return c.json({ 135 135 accessJwt, 136 136 refreshJwt, 137 - handle: c.env.HANDLE, 138 - did: c.env.DID, 137 + handle: c.env.HANDLE ?? c.env.PDS_HOSTNAME, 138 + did: c.env.DID ?? c.env.NODE_DID, 139 139 ...(email ? 
{ email } : {}), 140 140 emailConfirmed: true, 141 141 active: true, ··· 177 177 const { email: storedEmail } = await repoManager.getEmail(); 178 178 const email = storedEmail || c.env.EMAIL; 179 179 return c.json({ 180 - handle: c.env.HANDLE, 181 - did: c.env.DID, 180 + handle: c.env.HANDLE ?? c.env.PDS_HOSTNAME, 181 + did: c.env.DID ?? c.env.NODE_DID, 182 182 ...(email ? { email } : {}), 183 183 emailConfirmed: true, 184 184 active: true, ··· 192 192 serviceDid, 193 193 ); 194 194 195 - if (payload.sub !== c.env.DID) { 195 + if (payload.sub !== (c.env.DID ?? c.env.NODE_DID)) { 196 196 return c.json( 197 197 { 198 198 error: "AuthenticationRequired", ··· 205 205 const { email: storedEmail } = await repoManager.getEmail(); 206 206 const email = storedEmail || c.env.EMAIL; 207 207 return c.json({ 208 - handle: c.env.HANDLE, 209 - did: c.env.DID, 208 + handle: c.env.HANDLE ?? c.env.PDS_HOSTNAME, 209 + did: c.env.DID ?? c.env.NODE_DID, 210 210 ...(email ? { email } : {}), 211 211 emailConfirmed: true, 212 212 active: true, ··· 291 291 ); 292 292 } 293 293 294 + if (!c.env.SIGNING_KEY) { 295 + return c.json( 296 + { error: "NotSupported", message: "Service auth requires a social account signing key" }, 297 + 400, 298 + ); 299 + } 300 + 294 301 const keypair = await getSigningKeypair(c.env.SIGNING_KEY); 295 302 const token = await createServiceJwt({ 296 - iss: c.env.DID, 303 + iss: c.env.DID ?? c.env.NODE_DID, 297 304 aud, 298 305 lxm, 299 306 keypair,
+6 -6
src/xrpc/sync.ts
··· 32 32 } 33 33 34 34 // Local DID: serve from RepoManager 35 - if (did === c.env.DID) { 35 + if (did === (c.env.DID ?? c.env.NODE_DID)) { 36 36 const carBytes = await repoManager.getRepoCar(); 37 37 return new Response(carBytes, { 38 38 status: 200, ··· 94 94 ); 95 95 } 96 96 97 - if (did === c.env.DID) { 97 + if (did === (c.env.DID ?? c.env.NODE_DID)) { 98 98 const data = await repoManager.getRepoStatus(); 99 99 return c.json({ 100 100 did: data.did, ··· 179 179 ); 180 180 } 181 181 182 - if (did !== c.env.DID) { 182 + if (did !== (c.env.DID ?? c.env.NODE_DID)) { 183 183 return c.json( 184 184 { error: "RepoNotFound", message: `Repository not found for DID: ${did}` }, 185 185 404, ··· 234 234 } 235 235 236 236 // Local DID: serve from RepoManager 237 - if (did === c.env.DID) { 237 + if (did === (c.env.DID ?? c.env.NODE_DID)) { 238 238 const carBytes = await repoManager.getBlocks(cidsParam); 239 239 return new Response(carBytes, { 240 240 status: 200, ··· 302 302 } 303 303 304 304 // Local DID: serve from filesystem BlobStore 305 - if (did === c.env.DID) { 305 + if (did === (c.env.DID ?? c.env.NODE_DID)) { 306 306 if (!repoManager.blobStore) { 307 307 return c.json( 308 308 { error: "ServiceUnavailable", message: "Blob storage is not configured" }, ··· 406 406 ); 407 407 } 408 408 409 - if (did !== c.env.DID) { 409 + if (did !== (c.env.DID ?? c.env.NODE_DID)) { 410 410 return c.json( 411 411 { error: "RepoNotFound", message: `Repository not found for DID: ${did}` }, 412 412 404,