atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Allow DID-less server startup with lazy identity from OAuth

Server now starts with just infrastructure config (PORT, AUTH_TOKEN, etc.)
and gets its identity from OAuth login. node_identity table persists the
DID across restarts. RepoManager is optional throughout — Firehose,
ReplicationManager, and OAuth routes all handle its absence gracefully.

+218 -104
+12 -5
src/firehose.ts
··· 15 15 export class Firehose { 16 16 private clients = new Set<WebSocket>(); 17 17 18 - constructor(private repoManager: RepoManager) { 19 - // Register as the event handler on the repo manager 20 - this.repoManager.onFirehoseEvent = (event) => { 21 - this.broadcast(event); 22 - }; 18 + constructor(private repoManager?: RepoManager) { 19 + // Register as the event handler on the repo manager (if present) 20 + if (this.repoManager) { 21 + this.repoManager.onFirehoseEvent = (event) => { 22 + this.broadcast(event); 23 + }; 24 + } 23 25 } 24 26 25 27 /** ··· 77 79 * Backfill events from a cursor. 78 80 */ 79 81 private async backfill(ws: WebSocket, cursor: number): Promise<void> { 82 + if (!this.repoManager) { 83 + // No repo manager — no local events to backfill 84 + return; 85 + } 86 + 80 87 const latestSeq = this.repoManager.sequencer.getLatestSeq(); 81 88 82 89 if (cursor > latestSeq) {
+6 -5
src/index.ts
··· 25 25 import type { StorageChallenge } from "./replication/challenge-response/types.js"; 26 26 import { MAX_RECORD_PATHS, MAX_BLOCK_CIDS } from "./replication/challenge-response/types.js"; 27 27 import { generateMstProof } from "./replication/mst-proof.js"; 28 - import { registerOAuthRoutes } from "./oauth/routes.js"; 28 + import { registerOAuthRoutes, type PdsClientRef } from "./oauth/routes.js"; 29 29 import type { OAuthClientManager } from "./oauth/client.js"; 30 - import type { PdsClient } from "./oauth/pds-client.js"; 30 + import type Database from "better-sqlite3"; 31 31 32 32 const VERSION = "0.1.0"; 33 33 ··· 46 46 repoManager?: RepoManager, 47 47 rateLimiter?: RateLimiter, 48 48 oauthClientManager?: OAuthClientManager, 49 - pdsClient?: PdsClient, 49 + pdsClientRef?: PdsClientRef, 50 + db?: Database.Database, 50 51 ) { 51 52 const configDid = config.DID ?? ""; 52 53 ··· 74 75 // ============================================ 75 76 // OAuth routes (before auth middleware — these handle browser redirect flow) 76 77 // ============================================ 77 - if (oauthClientManager && pdsClient) { 78 - registerOAuthRoutes(app, config, oauthClientManager.client, pdsClient, networkService); 78 + if (oauthClientManager && pdsClientRef && db) { 79 + registerOAuthRoutes(app, config, oauthClientManager.client, pdsClientRef, db, networkService, oauthClientManager.sessionStore); 79 80 } 80 81 81 82 // ============================================
+71 -6
src/oauth/routes.ts
··· 8 8 9 9 import type { Hono } from "hono"; 10 10 import type { NodeOAuthClient } from "@atproto/oauth-client-node"; 11 + import type Database from "better-sqlite3"; 11 12 import type { Config } from "../config.js"; 12 - import type { PdsClient } from "./pds-client.js"; 13 + import { PdsClient } from "./pds-client.js"; 13 14 import type { NetworkService } from "../ipfs.js"; 14 15 16 + /** 17 + * Mutable holder for PdsClient — allows OAuth callback to create/update 18 + * the client after identity is established. 19 + */ 20 + export interface PdsClientRef { 21 + current: PdsClient | undefined; 22 + } 23 + 15 24 // eslint-disable-next-line @typescript-eslint/no-explicit-any 16 25 export function registerOAuthRoutes( 17 26 app: Hono<any>, 18 27 config: Config, 19 28 oauthClient: NodeOAuthClient, 20 - pdsClient: PdsClient, 29 + pdsClientRef: PdsClientRef, 30 + db: Database.Database, 21 31 networkService?: NetworkService, 32 + sessionStore?: import("./stores.js").OAuthSessionStore, 22 33 ): void { 23 34 /** 24 35 * Start OAuth login flow. ··· 58 69 const { session } = await oauthClient.callback(params); 59 70 const did = session.did; 60 71 61 - // Enforce DID match if configured 72 + // Enforce DID match if node already has an identity 62 73 if (config.DID && did !== config.DID) { 63 74 return c.html(errorPage( 64 75 "DID Mismatch", ··· 66 77 ), 403); 67 78 } 68 79 80 + // First login: establish node identity 81 + if (!config.DID) { 82 + config.DID = did; 83 + 84 + // Resolve handle for display 85 + let handle: string | undefined; 86 + try { 87 + const res = await fetch( 88 + `https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor=${encodeURIComponent(did)}`, 89 + ); 90 + if (res.ok) { 91 + const data = await res.json() as Record<string, unknown>; 92 + if (typeof data.handle === "string") { 93 + handle = data.handle; 94 + config.HANDLE = handle; 95 + } 96 + } 97 + } catch { 98 + // Handle resolution is best-effort 99 + } 100 + 101 + // Persist identity so it survives restarts 102 + db.prepare( 103 + "INSERT OR REPLACE INTO node_identity (did, handle) VALUES (?, ?)", 104 + ).run(did, handle ?? null); 105 + 106 + console.log(`[oauth] Node identity established: ${did}${handle ? ` (@${handle})` : ""}`); 107 + } 108 + 109 + // Create PdsClient if we don't have one yet 110 + if (!pdsClientRef.current) { 111 + pdsClientRef.current = new PdsClient(oauthClient, did); 112 + } 113 + 69 114 // Publish peer record on successful auth 70 115 try { 71 - await publishPeerRecord(pdsClient, networkService); 116 + await publishPeerRecord(pdsClientRef.current, networkService); 72 117 } catch (err) { 73 118 console.warn( 74 119 "[oauth] Failed to publish peer record:", ··· 76 121 ); 77 122 } 78 123 79 - return c.html(successPage(did)); 124 + return c.redirect("/xrpc/org.p2pds.admin.dashboard"); 80 125 } catch (err) { 81 126 const message = err instanceof Error ? err.message : String(err); 82 127 return c.html(errorPage("Authentication Failed", message), 500); ··· 89 134 */ 90 135 app.get("/oauth/status", async (c) => { 91 136 try { 92 - const hasSession = await pdsClient.hasSession(); 137 + if (!pdsClientRef.current) { 138 + return c.json({ authenticated: false, did: null }); 139 + } 140 + const hasSession = await pdsClientRef.current.hasSession(); 93 141 if (!hasSession) { 94 142 return c.json({ authenticated: false, did: null }); 95 143 } ··· 124 172 }); 125 173 } catch { 126 174 return c.json({ authenticated: false, did: null }); 175 + } 176 + }); 177 + 178 + /** 179 + * Logout — clear the OAuth session. 180 + */ 181 + app.post("/oauth/logout", async (c) => { 182 + try { 183 + const did = config.DID; 184 + if (did && sessionStore) { 185 + await sessionStore.del(did); 186 + } 187 + pdsClientRef.current?.clearAgent(); 188 + return c.redirect("/xrpc/org.p2pds.admin.dashboard"); 189 + } catch (err) { 190 + const message = err instanceof Error ? err.message : String(err); 191 + return c.json({ error: "LogoutFailed", message }, 500); 127 192 } 128 193 }); 129 194 }
+96 -80
src/replication/replication-manager.ts
··· 76 76 constructor( 77 77 db: Database.Database, 78 78 private config: Config, 79 - private repoManager: RepoManager, 79 + private repoManager: RepoManager | undefined, 80 80 private blockStore: BlockStore, 81 81 private networkService: NetworkService, 82 82 private didResolver: DidResolver, ··· 101 101 if (policyEngine) { 102 102 this.policyEngine = policyEngine; 103 103 // Prefer remote PDS client for offer records; fall back to local repo 104 - const recordWriter: RecordWriter = pdsClient ?? repoManager; 105 - this.offerManager = new OfferManager( 106 - recordWriter, 107 - this.peerDiscovery, 108 - policyEngine, 109 - config.DID ?? "", 110 - ); 104 + const recordWriter: RecordWriter | undefined = pdsClient ?? repoManager; 105 + if (recordWriter) { 106 + this.offerManager = new OfferManager( 107 + recordWriter, 108 + this.peerDiscovery, 109 + policyEngine, 110 + config.DID ?? "", 111 + ); 112 + } 111 113 } 112 114 } 113 115 ··· 245 247 this.syncStorage.deleteState(did); 246 248 247 249 // Remove manifest record 248 - const rkey = didToRkey(did); 249 - try { 250 - await this.repoManager.deleteRecord(MANIFEST_NSID, rkey); 251 - } catch { 252 - // Non-fatal: manifest may not exist 250 + if (this.repoManager) { 251 + const rkey = didToRkey(did); 252 + try { 253 + await this.repoManager.deleteRecord(MANIFEST_NSID, rkey); 254 + } catch { 255 + // Non-fatal: manifest may not exist 256 + } 253 257 } 254 258 } else { 255 259 // Pause: update manifest status but keep data 256 - const rkey = didToRkey(did); 257 - const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey); 258 - if (existing) { 259 - const manifest: ManifestRecord = { 260 - ...(existing.record as ManifestRecord), 261 - status: "paused", 262 - }; 263 - await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 260 + if (this.repoManager) { 261 + const rkey = didToRkey(did); 262 + const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey); 263 + if (existing) { 264 + const manifest: ManifestRecord = { 265 + ...(existing.record as ManifestRecord), 266 + status: "paused", 267 + }; 268 + await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 269 + } 264 270 } 265 271 } 266 272 ··· 281 287 * Extracted from syncManifests() for use by addDid(). 282 288 */ 283 289 private async syncManifestForDid(did: string): Promise<void> { 284 - const rkey = didToRkey(did); 285 - const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey); 290 + if (this.repoManager) { 291 + const rkey = didToRkey(did); 292 + const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey); 286 293 287 - if (!existing) { 288 - const manifest: ManifestRecord = { 289 - $type: MANIFEST_NSID, 290 - subject: did, 291 - status: "active", 292 - lastSyncRev: null, 293 - lastSyncAt: null, 294 - createdAt: new Date().toISOString(), 295 - }; 296 - await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 294 + if (!existing) { 295 + const manifest: ManifestRecord = { 296 + $type: MANIFEST_NSID, 297 + subject: did, 298 + status: "active", 299 + lastSyncRev: null, 300 + lastSyncAt: null, 301 + createdAt: new Date().toISOString(), 302 + }; 303 + await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 304 + } 297 305 } 298 306 299 307 const pdsEndpoint = await this.repoFetcher.resolvePds(did); ··· 737 745 } 738 746 739 747 // 10. Update manifest record 740 - const rkey = didToRkey(did); 741 - const existingManifest = await this.repoManager.getRecord( 742 - MANIFEST_NSID, 743 - rkey, 744 - ); 745 - if (existingManifest) { 746 - const manifest: ManifestRecord = { 747 - $type: MANIFEST_NSID, 748 - subject: did, 749 - status: "active", 750 - lastSyncRev: rev, 751 - lastSyncAt: new Date().toISOString(), 752 - createdAt: 753 - (existingManifest.record as Record<string, unknown>) 754 - ?.createdAt as string ?? new Date().toISOString(), 755 - }; 756 - await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 748 + if (this.repoManager) { 749 + const rkey = didToRkey(did); 750 + const existingManifest = await this.repoManager.getRecord( 751 + MANIFEST_NSID, 752 + rkey, 753 + ); 754 + if (existingManifest) { 755 + const manifest: ManifestRecord = { 756 + $type: MANIFEST_NSID, 757 + subject: did, 758 + status: "active", 759 + lastSyncRev: rev, 760 + lastSyncAt: new Date().toISOString(), 761 + createdAt: 762 + (existingManifest.record as Record<string, unknown>) 763 + ?.createdAt as string ?? new Date().toISOString(), 764 + }; 765 + await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 766 + } 757 767 } 758 768 759 769 // 11. Sync blobs and capture result ··· 847 857 } 848 858 849 859 // Remove manifest record 850 - const rkey = didToRkey(state.did); 851 - try { 852 - await this.repoManager.deleteRecord(MANIFEST_NSID, rkey); 853 - } catch { 854 - // Non-fatal 860 + if (this.repoManager) { 861 + const rkey = didToRkey(state.did); 862 + try { 863 + await this.repoManager.deleteRecord(MANIFEST_NSID, rkey); 864 + } catch { 865 + // Non-fatal 866 + } 855 867 } 856 868 } 857 869 } ··· 1087 1099 this.syncStorage.markTombstoned(did); 1088 1100 1089 1101 // Update manifest to paused 1090 - const rkey = didToRkey(did); 1091 - const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey); 1092 - if (existing) { 1093 - const manifest: ManifestRecord = { 1094 - ...(existing.record as ManifestRecord), 1095 - status: "paused", 1096 - }; 1097 - await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 1102 + if (this.repoManager) { 1103 + const rkey = didToRkey(did); 1104 + const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey); 1105 + if (existing) { 1106 + const manifest: ManifestRecord = { 1107 + ...(existing.record as ManifestRecord), 1108 + status: "paused", 1109 + }; 1110 + await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 1111 + } 1098 1112 } 1099 1113 1100 1114 console.warn( ··· 1213 1227 } 1214 1228 1215 1229 // 9. Update manifest record 1216 - const rkey = didToRkey(did); 1217 - const existingManifest = await this.repoManager.getRecord( 1218 - MANIFEST_NSID, 1219 - rkey, 1220 - ); 1221 - if (existingManifest) { 1222 - const manifest: ManifestRecord = { 1223 - $type: MANIFEST_NSID, 1224 - subject: did, 1225 - status: "active", 1226 - lastSyncRev: rev, 1227 - lastSyncAt: new Date().toISOString(), 1228 - createdAt: 1229 - (existingManifest.record as Record<string, unknown>) 1230 - ?.createdAt as string ?? new Date().toISOString(), 1231 - }; 1232 - await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 1230 + if (this.repoManager) { 1231 + const rkey = didToRkey(did); 1232 + const existingManifest = await this.repoManager.getRecord( 1233 + MANIFEST_NSID, 1234 + rkey, 1235 + ); 1236 + if (existingManifest) { 1237 + const manifest: ManifestRecord = { 1238 + $type: MANIFEST_NSID, 1239 + subject: did, 1240 + status: "active", 1241 + lastSyncRev: rev, 1242 + lastSyncAt: new Date().toISOString(), 1243 + createdAt: 1244 + (existingManifest.record as Record<string, unknown>) 1245 + ?.createdAt as string ?? new Date().toISOString(), 1246 + }; 1247 + await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 1248 + } 1233 1249 } 1234 1250 1235 1251 // 10. Sync blobs for firehose ops (fire-and-forget)
+33 -8
src/start.ts
··· 34 34 import { RateLimiter } from "./rate-limiter.js"; 35 35 import { createOAuthClient, type OAuthClientManager } from "./oauth/client.js"; 36 36 import { PdsClient } from "./oauth/pds-client.js"; 37 + import type { PdsClientRef } from "./oauth/routes.js"; 37 38 38 39 export interface StartServerOpts { 39 40 /** Override DID resolver (e.g. mock resolver for tests). */ ··· 68 69 const db = new Database(dbPath); 69 70 db.pragma("journal_mode = WAL"); 70 71 db.pragma("synchronous = NORMAL"); 72 + 73 + // Create node_identity table for OAuth-established identity 74 + db.exec(` 75 + CREATE TABLE IF NOT EXISTS node_identity ( 76 + did TEXT PRIMARY KEY, 77 + handle TEXT, 78 + created_at TEXT NOT NULL DEFAULT (datetime('now')) 79 + ) 80 + `); 81 + 82 + // Load stored identity from previous OAuth login (if not overridden by env) 83 + const storedIdentity = db.prepare("SELECT did, handle FROM node_identity LIMIT 1").get() as 84 + | { did: string; handle: string | null } 85 + | undefined; 86 + if (storedIdentity && !config.DID) { 87 + config.DID = storedIdentity.did; 88 + if (storedIdentity.handle) { 89 + config.HANDLE = storedIdentity.handle; 90 + } 91 + } 71 92 72 93 // Initialize IPFS service (if enabled) 73 94 let ipfsService: IpfsService | undefined; ··· 93 114 repoManager.init(blobStore, ipfsService, ipfsService); 94 115 } 95 116 96 - // Initialize firehose 97 - const firehose = new Firehose(repoManager!); 117 + // Initialize firehose (repoManager is optional — without it, no local event backfill) 118 + const firehose = new Firehose(repoManager); 98 119 99 120 // Initialize DID resolver (allow override for tests) 100 121 const didResolver = opts?.didResolver ?? new DidResolver({ ··· 113 134 114 135 // Initialize OAuth client (if enabled) 115 136 let oauthClientManager: OAuthClientManager | undefined; 116 - let pdsClient: PdsClient | undefined; 137 + const pdsClientRef: PdsClientRef = { current: undefined }; 117 138 if (config.OAUTH_ENABLED) { 118 139 oauthClientManager = await createOAuthClient(db, config); 119 140 if (config.DID) { 120 - pdsClient = new PdsClient(oauthClientManager.client, config.DID); 141 + pdsClientRef.current = new PdsClient(oauthClientManager.client, config.DID); 121 142 } 122 143 } 123 144 124 145 // Initialize replication manager and replicated repo reader (if IPFS enabled) 146 + // repoManager is optional — without it, manifest records are skipped but sync works 125 147 let replicationManager: ReplicationManager | undefined; 126 148 let replicatedRepoReader: ReplicatedRepoReader | undefined; 127 - if (ipfsService && repoManager) { 149 + if (ipfsService) { 128 150 replicationManager = new ReplicationManager( 129 151 db, 130 152 config, ··· 135 157 undefined, 136 158 undefined, 137 159 policyEngine, 138 - pdsClient, 160 + pdsClientRef.current, 139 161 ); 140 162 replicatedRepoReader = new ReplicatedRepoReader( 141 163 ipfsService, ··· 161 183 repoManager, 162 184 rateLimiter, 163 185 oauthClientManager, 164 - pdsClient, 186 + pdsClientRef, 187 + db, 165 188 ); 166 189 167 190 // Create HTTP server using @hono/node-server's request listener ··· 240 263 pc.bold(`\nP2PDS running at `) + 241 264 pc.cyan(url), 242 265 ); 266 + // Machine-readable line for sidecar consumers (Tauri, tests) 267 + console.log(`P2PDS_READY ${JSON.stringify({ port: actualPort, url })}`); 243 268 if (config.DID) { 244 269 console.log(pc.dim(` DID: ${config.DID}`)); 245 270 } ··· 248 273 } 249 274 console.log(pc.dim(` Data: ${dataDir}`)); 250 275 if (oauthClientManager) { 251 - if (pdsClient && await pdsClient.hasSession().catch(() => false)) { 276 + if (pdsClientRef.current && await pdsClientRef.current.hasSession().catch(() => false)) { 252 277 console.log(pc.dim(` OAuth: session active for ${config.DID}`)); 253 278 } else { 254 279 console.log(pc.dim(` OAuth: enabled (no active session)`));