atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Defer IPFS startup until identity is established

IPFS+libp2p no longer starts unconditionally at boot. Instead, it waits
for a DID — either loaded from stored identity on restart, or established
via first OAuth login. This avoids wasting CPU connecting to peers when
there's nothing to replicate.

+128 -84
+9 -7
src/index.ts
··· 48 48 oauthClientManager?: OAuthClientManager, 49 49 pdsClientRef?: PdsClientRef, 50 50 db?: Database.Database, 51 + onIdentityEstablished?: () => Promise<void>, 51 52 ) { 52 - const configDid = config.DID ?? ""; 53 + // Live getter — config.DID may be set later by OAuth callback 54 + const getConfigDid = () => config.DID ?? ""; 53 55 54 56 const app = new Hono<{ Bindings: Config }>(); 55 57 ··· 76 78 // OAuth routes (before auth middleware — these handle browser redirect flow) 77 79 // ============================================ 78 80 if (oauthClientManager && pdsClientRef && db) { 79 - registerOAuthRoutes(app, config, oauthClientManager.client, pdsClientRef, db, networkService, oauthClientManager.sessionStore, replicationManager); 81 + registerOAuthRoutes(app, config, oauthClientManager.client, pdsClientRef, db, networkService, oauthClientManager.sessionStore, replicationManager, onIdentityEstablished); 80 82 } 81 83 82 84 // ============================================ ··· 458 460 "/xrpc/gg.mk.experimental.emitIdentityEvent", 459 461 requireAuth, 460 462 async (c) => { 461 - const result = await rm.emitIdentityEvent(config.HANDLE ?? config.PDS_HOSTNAME); 463 + const result = await rm.emitIdentityEvent(config.HANDLE ?? config.PDS_HOSTNAME ?? "unknown"); 462 464 return c.json(result); 463 465 }, 464 466 ); ··· 513 515 const challenge = (await c.req.json()) as StorageChallenge; 514 516 515 517 // Validate challenge is targeted at this node's configured DID 516 - if (challenge.targetDid !== configDid) { 518 + if (challenge.targetDid !== getConfigDid()) { 517 519 return c.json( 518 520 { error: "InvalidChallenge", message: "Challenge is not targeted at this node" }, 519 521 400, ··· 543 545 const response = await respondToChallenge( 544 546 challenge, 545 547 blockStore, 546 - configDid, 548 + getConfigDid(), 547 549 ); 548 550 return c.json(serializeResponse(response)); 549 551 }); ··· 653 655 // App monitoring 654 656 // ============================================ 655 657 app.get("/xrpc/org.p2pds.app.getOverview", requireAuth, (c) => 656 - app_routes.getOverview(c, configDid, networkService, replicationManager), 658 + app_routes.getOverview(c, getConfigDid(), networkService, replicationManager), 657 659 ); 658 660 app.get("/xrpc/org.p2pds.app.getDidStatus", requireAuth, (c) => 659 661 app_routes.getDidStatus(c, replicationManager), ··· 668 670 app_routes.getSyncHistory(c, replicationManager), 669 671 ); 670 672 app.post("/xrpc/org.p2pds.app.addDid", requireAuth, (c) => 671 - app_routes.addDid(c, configDid, replicationManager), 673 + app_routes.addDid(c, getConfigDid(), replicationManager), 672 674 ); 673 675 app.post("/xrpc/org.p2pds.app.removeDid", requireAuth, (c) => 674 676 app_routes.removeDid(c, replicationManager),
+27 -2
src/oauth/routes.ts
··· 32 32 networkService?: NetworkService, 33 33 sessionStore?: import("./stores.js").OAuthSessionStore, 34 34 replicationManager?: ReplicationManager, 35 + onIdentityEstablished?: () => Promise<void>, 35 36 ): void { 36 37 /** 37 38 * Start OAuth login flow. ··· 80 81 } 81 82 82 83 // First login: establish node identity 83 - if (!config.DID) { 84 + const isFirstLogin = !config.DID; 85 + if (isFirstLogin) { 84 86 config.DID = did; 85 87 86 88 // Resolve handle for display ··· 127 129 ); 128 130 } 129 131 132 + // First login: trigger deferred IPFS startup, then re-publish peer record 133 + // with now-available peerId/multiaddrs 134 + if (isFirstLogin && onIdentityEstablished) { 135 + onIdentityEstablished() 136 + .then(async () => { 137 + try { 138 + await publishPeerRecord(pdsClientRef.current!, networkService); 139 + } catch (err) { 140 + console.warn( 141 + "[oauth] Failed to re-publish peer record after IPFS start:", 142 + err instanceof Error ? err.message : String(err), 143 + ); 144 + } 145 + }) 146 + .catch((err) => { 147 + console.error( 148 + "[oauth] IPFS startup after login failed:", 149 + err instanceof Error ? err.message : String(err), 150 + ); 151 + }); 152 + } 153 + 130 154 // Self-replicate: sync own repo to seed the local blockstore 131 - if (replicationManager && config.DID) { 155 + // (skipped on first login — startIpfsReplication handles addDid) 156 + if (!isFirstLogin && replicationManager && config.DID) { 132 157 replicationManager.addDid(config.DID).catch((err) => { 133 158 console.warn( 134 159 "[oauth] Self-replication failed:",
+92 -75
src/start.ts
··· 94 94 // Initialize IPFS service (if enabled) 95 95 let ipfsService: IpfsService | undefined; 96 96 if (config.IPFS_ENABLED) { 97 - const ipfsBlocksPath = resolve(dataDir, "ipfs", "blocks"); 98 - const ipfsDatastorePath = resolve(dataDir, "ipfs", "datastore"); 99 - mkdirSync(ipfsBlocksPath, { recursive: true }); 100 - mkdirSync(ipfsDatastorePath, { recursive: true }); 101 - 102 97 ipfsService = new IpfsService({ 103 - blocksPath: ipfsBlocksPath, 104 - datastorePath: ipfsDatastorePath, 98 + db, 105 99 networking: config.IPFS_NETWORKING, 106 100 }); 107 101 } ··· 172 166 ipfsService.setRateLimiter(rateLimiter); 173 167 } 174 168 169 + // --- Lazy IPFS startup: deferred until identity is established --- 170 + let ipfsStarted = false; 171 + 172 + async function startIpfsReplication(): Promise<void> { 173 + if (ipfsStarted || !ipfsService) return; 174 + ipfsStarted = true; 175 + 176 + try { 177 + console.log(pc.dim(` IPFS: starting...`)); 178 + await ipfsService.start(); 179 + const peerId = ipfsService.getPeerId(); 180 + if (peerId) { 181 + console.log(pc.dim(` PeerID: ${peerId}`)); 182 + } else { 183 + console.log(pc.dim(` IPFS: local blockstore only (networking disabled)`)); 184 + } 185 + await backfillIpfs(); 186 + // Start replication after IPFS is ready 187 + if (replicationManager) { 188 + try { 189 + await replicationManager.init(); 190 + 191 + // Auto-track the node's own DID (idempotent — no-op if already tracked) 192 + if (config.DID) { 193 + await replicationManager.addDid(config.DID).catch((err) => { 194 + console.warn("[start] Self-replication addDid failed:", err instanceof Error ? err.message : String(err)); 195 + }); 196 + } 197 + 198 + // Register libp2p repo sync protocol and set libp2p on ReplicationManager 199 + const libp2pForSync = ipfsService?.getLibp2p(); 200 + if (libp2pForSync) { 201 + const syncStorage = replicationManager.getSyncStorage(); 202 + registerRepoSyncProtocol( 203 + libp2pForSync as Libp2p, 204 + ipfsService!, 205 + syncStorage, 206 + ); 207 + replicationManager.setLibp2p(libp2pForSync as Libp2p); 208 + console.log(pc.dim(` Repo sync: libp2p protocol registered`)); 209 + } 210 + 211 + replicationManager.startPeriodicSync(); 212 + const trackedDids = replicationManager.getReplicateDids(); 213 + console.log(pc.dim(` Replication: tracking ${trackedDids.length} DIDs`)); 214 + // Start firehose subscription for real-time updates 215 + if (config.FIREHOSE_ENABLED) { 216 + replicationManager.startFirehose(); 217 + } 218 + // Start challenge scheduler if policy engine is available 219 + if (policyEngine) { 220 + const libp2pNode = ipfsService?.getLibp2p(); 221 + let challengeTransport: ChallengeTransport; 222 + if (libp2pNode) { 223 + const libp2pTransport = new Libp2pChallengeTransport(libp2pNode as Libp2p); 224 + const httpTransport = new HttpChallengeTransport(); 225 + const syncStorage = replicationManager.getSyncStorage(); 226 + challengeTransport = new FailoverChallengeTransport( 227 + libp2pTransport, 228 + httpTransport, 229 + { 230 + resolveEndpoint: (httpEndpoint) => syncStorage.getMultiaddrForPdsEndpoint(httpEndpoint), 231 + onFallback: (endpoint, error) => { 232 + console.log(pc.dim(` Challenge: libp2p failed for ${endpoint}, falling back to HTTP: ${error.message}`)); 233 + replicationManager!.refreshPeerInfoForEndpoint(endpoint); 234 + }, 235 + }, 236 + ); 237 + } else { 238 + challengeTransport = new HttpChallengeTransport(); 239 + } 240 + replicationManager.startChallengeScheduler(challengeTransport); 241 + console.log(pc.dim(` Challenges: scheduler started (${libp2pNode ? "libp2p+HTTP failover" : "HTTP"} transport)`)); 242 + } 243 + } catch (err) { 244 + console.error(pc.red(` Replication startup failed:`), err); 245 + } 246 + } 247 + } catch (err) { 248 + ipfsStarted = false; // Allow retry on failure 249 + console.error(pc.red(` IPFS startup failed:`), err); 250 + } 251 + } 252 + 175 253 // Create Hono app 176 254 const app = createApp( 177 255 config, ··· 186 264 oauthClientManager, 187 265 pdsClientRef, 188 266 db, 267 + startIpfsReplication, 189 268 ); 190 269 191 270 // Create HTTP server using @hono/node-server's request listener ··· 282 361 } 283 362 284 363 // Start IPFS after HTTP server is listening (IPFS startup can be slow) 285 - if (ipfsService) { 286 - try { 287 - console.log(pc.dim(` IPFS: starting...`)); 288 - await ipfsService.start(); 289 - const peerId = ipfsService.getPeerId(); 290 - if (peerId) { 291 - console.log(pc.dim(` PeerID: ${peerId}`)); 292 - } else { 293 - console.log(pc.dim(` IPFS: local blockstore only (networking disabled)`)); 294 - } 295 - await backfillIpfs(); 296 - // Start replication after IPFS is ready 297 - if (replicationManager) { 298 - try { 299 - await replicationManager.init(); 300 - 301 - // Register libp2p repo sync protocol and set libp2p on ReplicationManager 302 - const libp2pForSync = ipfsService?.getLibp2p(); 303 - if (libp2pForSync) { 304 - const syncStorage = replicationManager.getSyncStorage(); 305 - registerRepoSyncProtocol( 306 - libp2pForSync as Libp2p, 307 - ipfsService!, 308 - syncStorage, 309 - ); 310 - replicationManager.setLibp2p(libp2pForSync as Libp2p); 311 - console.log(pc.dim(` Repo sync: libp2p protocol registered`)); 312 - } 313 - 314 - replicationManager.startPeriodicSync(); 315 - const trackedDids = replicationManager.getReplicateDids(); 316 - console.log(pc.dim(` Replication: tracking ${trackedDids.length} DIDs`)); 317 - // Start firehose subscription for real-time updates 318 - if (config.FIREHOSE_ENABLED) { 319 - replicationManager.startFirehose(); 320 - } 321 - // Start challenge scheduler if policy engine is available 322 - if (policyEngine) { 323 - const libp2pNode = ipfsService?.getLibp2p(); 324 - let challengeTransport: ChallengeTransport; 325 - if (libp2pNode) { 326 - const libp2pTransport = new Libp2pChallengeTransport(libp2pNode as Libp2p); 327 - const httpTransport = new HttpChallengeTransport(); 328 - const syncStorage = replicationManager.getSyncStorage(); 329 - challengeTransport = new FailoverChallengeTransport( 330 - libp2pTransport, 331 - httpTransport, 332 - { 333 - resolveEndpoint: (httpEndpoint) => syncStorage.getMultiaddrForPdsEndpoint(httpEndpoint), 334 - onFallback: (endpoint, error) => { 335 - console.log(pc.dim(` Challenge: libp2p failed for ${endpoint}, falling back to HTTP: ${error.message}`)); 336 - replicationManager!.refreshPeerInfoForEndpoint(endpoint); 337 - }, 338 - }, 339 - ); 340 - } else { 341 - challengeTransport = new HttpChallengeTransport(); 342 - } 343 - replicationManager.startChallengeScheduler(challengeTransport); 344 - console.log(pc.dim(` Challenges: scheduler started (${libp2pNode ? "libp2p+HTTP failover" : "HTTP"} transport)`)); 345 - } 346 - } catch (err) { 347 - console.error(pc.red(` Replication startup failed:`), err); 348 - } 349 - } 350 - } catch (err) { 351 - console.error(pc.red(` IPFS startup failed:`), err); 352 - } 364 + if (ipfsService && config.DID) { 365 + await startIpfsReplication(); 366 + } else if (ipfsService) { 367 + console.log(pc.dim(` IPFS: waiting for login`)); 353 368 } 354 369 355 370 console.log(); ··· 366 381 replicationManager.stop(); 367 382 } 368 383 wss.close(); 384 + // Force-close keep-alive connections so httpServer.close() resolves promptly 385 + httpServer.closeAllConnections(); 369 386 await new Promise<void>((res) => httpServer.close(() => res())); 370 387 if (ipfsService) { 371 388 await ipfsService.stop();