atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add two-node DID-less replication test and fix upsertState status preservation

Two tests: (1) two clean servers establish identity, each replicates a
different external account, verifies sync state; (2) identity and
replication state persist across server restart.

Fix: upsertState no longer overwrites status on conflict — existing
status is preserved, only pds_endpoint and peer_id are updated.

+284 -2
+1 -2
src/replication/sync-storage.ts
··· 163 163 VALUES (?, ?, ?, ?) 164 164 ON CONFLICT(did) DO UPDATE SET 165 165 pds_endpoint = excluded.pds_endpoint, 166 - peer_id = COALESCE(excluded.peer_id, replication_state.peer_id), 167 - status = excluded.status`, 166 + peer_id = COALESCE(excluded.peer_id, replication_state.peer_id)`, 168 167 ) 169 168 .run( 170 169 state.did,
+283
src/two-node-didless.test.ts
··· 1 + /** 2 + * Two-node DID-less startup test. 3 + * 4 + * Two clean p2pds servers start without any DID. Each establishes identity 5 + * (simulating OAuth), then each replicates a different external account 6 + * from mock PDSes. Verifies the full flow: startup → identity → add DID → sync. 7 + */ 8 + 9 + import { describe, it, expect, afterEach } from "vitest"; 10 + import { mkdtempSync, rmSync } from "node:fs"; 11 + import { tmpdir } from "node:os"; 12 + import { join, resolve } from "node:path"; 13 + import Database from "better-sqlite3"; 14 + 15 + import type { Config } from "./config.js"; 16 + import { startServer, type ServerHandle } from "./start.js"; 17 + import { 18 + createTestRepo, 19 + startMockPds, 20 + createMockDidResolver, 21 + type MockPds, 22 + } from "./replication/test-helpers.js"; 23 + 24 + const DID_ALICE = "did:plc:alice111"; 25 + const DID_BOB = "did:plc:bob222"; 26 + 27 + function didlessConfig(dataDir: string): Config { 28 + return { 29 + PDS_HOSTNAME: "local.test", 30 + AUTH_TOKEN: "test-auth-token", 31 + JWT_SECRET: "test-jwt-secret", 32 + PASSWORD_HASH: "$2a$10$test", 33 + DATA_DIR: dataDir, 34 + PORT: 0, 35 + IPFS_ENABLED: true, 36 + IPFS_NETWORKING: false, 37 + REPLICATE_DIDS: [], 38 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 39 + FIREHOSE_ENABLED: false, 40 + RATE_LIMIT_ENABLED: false, 41 + RATE_LIMIT_READ_PER_MIN: 300, 42 + RATE_LIMIT_SYNC_PER_MIN: 30, 43 + RATE_LIMIT_SESSION_PER_MIN: 10, 44 + RATE_LIMIT_WRITE_PER_MIN: 200, 45 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 46 + RATE_LIMIT_MAX_CONNECTIONS: 100, 47 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 48 + OAUTH_ENABLED: false, 49 + }; 50 + } 51 + 52 + describe("Two-node DID-less replication", () => { 53 + let tmpDirA: string; 54 + let tmpDirB: string; 55 + let handleA: ServerHandle | undefined; 56 + let handleB: ServerHandle | undefined; 57 + let mockPds: MockPds | undefined; 58 + 59 + afterEach(async () => { 60 + if (handleA) { await handleA.close(); handleA = undefined; } 61 + if (handleB) { await handleB.close(); handleB = undefined; } 62 + if (mockPds) { await mockPds.close(); mockPds = undefined; } 63 + if (tmpDirA) rmSync(tmpDirA, { recursive: true, force: true }); 64 + if (tmpDirB) rmSync(tmpDirB, { recursive: true, force: true }); 65 + }); 66 + 67 + it("two clean nodes establish identity and replicate different accounts", async () => { 68 + tmpDirA = mkdtempSync(join(tmpdir(), "two-node-a-")); 69 + tmpDirB = mkdtempSync(join(tmpdir(), "two-node-b-")); 70 + 71 + // Create test repos for two external accounts 72 + const aliceCar = await createTestRepo(DID_ALICE, [ 73 + { collection: "app.bsky.feed.post", rkey: "a1", record: { text: "Alice post 1", createdAt: new Date().toISOString() } }, 74 + { collection: "app.bsky.feed.post", rkey: "a2", record: { text: "Alice post 2", createdAt: new Date().toISOString() } }, 75 + ]); 76 + const bobCar = await createTestRepo(DID_BOB, [ 77 + { collection: "app.bsky.feed.post", rkey: "b1", record: { text: "Bob post 1", createdAt: new Date().toISOString() } }, 78 + ]); 79 + 80 + // Mock PDS serves both accounts 81 + mockPds = await startMockPds([ 82 + { did: DID_ALICE, carBytes: aliceCar }, 83 + { did: DID_BOB, carBytes: bobCar }, 84 + ]); 85 + const mockResolver = createMockDidResolver({ 86 + [DID_ALICE]: mockPds.url, 87 + [DID_BOB]: mockPds.url, 88 + }); 89 + 90 + // Start two clean servers — no DID, no signing key 91 + const configA = didlessConfig(tmpDirA); 92 + const configB = didlessConfig(tmpDirB); 93 + handleA = await startServer(configA, { didResolver: mockResolver }); 94 + handleB = await startServer(configB, { didResolver: mockResolver }); 95 + 96 + // Both should be healthy without DID 97 + const healthA = await fetch(`${handleA.url}/xrpc/_health`); 98 + const healthB = await fetch(`${handleB.url}/xrpc/_health`); 99 + expect(healthA.status).toBe(200); 100 + expect(healthB.status).toBe(200); 101 + 102 + // Both should have replication managers 103 + expect(handleA.replicationManager).toBeDefined(); 104 + expect(handleB.replicationManager).toBeDefined(); 105 + 106 + // Simulate OAuth identity establishment on each node 107 + // (In production this happens in the OAuth callback) 108 + const dbA = new Database(resolve(tmpDirA, "pds.db")); 109 + dbA.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run( 110 + "did:plc:node-a-identity", 111 + "node-a.test", 112 + ); 113 + configA.DID = "did:plc:node-a-identity"; 114 + configA.HANDLE = "node-a.test"; 115 + dbA.close(); 116 + 117 + const dbB = new Database(resolve(tmpDirB, "pds.db")); 118 + dbB.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run( 119 + "did:plc:node-b-identity", 120 + "node-b.test", 121 + ); 122 + configB.DID = "did:plc:node-b-identity"; 123 + configB.HANDLE = "node-b.test"; 124 + dbB.close(); 125 + 126 + // Verify identity shows up in overview 127 + const overviewA = await fetch(`${handleA.url}/xrpc/org.p2pds.admin.getOverview`, { 128 + headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` }, 129 + }); 130 + expect(overviewA.status).toBe(200); 131 + const ovA = (await overviewA.json()) as { did: string }; 132 + expect(ovA.did).toBe("did:plc:node-a-identity"); 133 + 134 + // Node A replicates Alice's account 135 + const addAlice = await fetch(`${handleA.url}/xrpc/org.p2pds.admin.addDid`, { 136 + method: "POST", 137 + headers: { 138 + Authorization: `Bearer ${configA.AUTH_TOKEN}`, 139 + "Content-Type": "application/json", 140 + }, 141 + body: JSON.stringify({ did: DID_ALICE }), 142 + }); 143 + expect(addAlice.status).toBe(200); 144 + 145 + // Node B replicates Bob's account 146 + const addBob = await fetch(`${handleB.url}/xrpc/org.p2pds.admin.addDid`, { 147 + method: "POST", 148 + headers: { 149 + Authorization: `Bearer ${configB.AUTH_TOKEN}`, 150 + "Content-Type": "application/json", 151 + }, 152 + body: JSON.stringify({ did: DID_BOB }), 153 + }); 154 + expect(addBob.status).toBe(200); 155 + 156 + // Trigger sync on both 157 + await fetch(`${handleA.url}/xrpc/org.p2pds.replication.syncNow`, { 158 + method: "POST", 159 + headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` }, 160 + }); 161 + await fetch(`${handleB.url}/xrpc/org.p2pds.replication.syncNow`, { 162 + method: "POST", 163 + headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` }, 164 + }); 165 + 166 + // Wait for async sync 167 + await new Promise((r) => setTimeout(r, 3000)); 168 + 169 + // Verify Node A synced Alice's data 170 + const statusA = await fetch( 171 + `${handleA.url}/xrpc/org.p2pds.admin.getDidStatus?did=${DID_ALICE}`, 172 + { headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` } }, 173 + ); 174 + expect(statusA.status).toBe(200); 175 + const didStatusA = (await statusA.json()) as { 176 + did: string; 177 + blockCount: number; 178 + syncState: { status: string }; 179 + }; 180 + expect(didStatusA.did).toBe(DID_ALICE); 181 + expect(didStatusA.blockCount).toBeGreaterThan(0); 182 + expect(didStatusA.syncState.status).toBe("synced"); 183 + 184 + // Verify Node B synced Bob's data 185 + const statusB = await fetch( 186 + `${handleB.url}/xrpc/org.p2pds.admin.getDidStatus?did=${DID_BOB}`, 187 + { headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` } }, 188 + ); 189 + expect(statusB.status).toBe(200); 190 + const didStatusB = (await statusB.json()) as { 191 + did: string; 192 + blockCount: number; 193 + syncState: { status: string }; 194 + }; 195 + expect(didStatusB.did).toBe(DID_BOB); 196 + expect(didStatusB.blockCount).toBeGreaterThan(0); 197 + expect(didStatusB.syncState.status).toBe("synced"); 198 + }, 30_000); 199 + 200 + it("identity persists across restart and replication continues", async () => { 201 + tmpDirA = mkdtempSync(join(tmpdir(), "two-node-restart-")); 202 + tmpDirB = ""; // not used 203 + 204 + // Create test repo 205 + const aliceCar = await createTestRepo(DID_ALICE, [ 206 + { collection: "app.bsky.feed.post", rkey: "r1", record: { text: "restart test", createdAt: new Date().toISOString() } }, 207 + ]); 208 + mockPds = await startMockPds([{ did: DID_ALICE, carBytes: aliceCar }]); 209 + const mockResolver = createMockDidResolver({ [DID_ALICE]: mockPds.url }); 210 + 211 + // First boot: start clean, establish identity, add DID, sync 212 + const config1 = didlessConfig(tmpDirA); 213 + handleA = await startServer(config1, { didResolver: mockResolver }); 214 + 215 + // Simulate identity establishment 216 + const db1 = new Database(resolve(tmpDirA, "pds.db")); 217 + db1.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run( 218 + "did:plc:persistent-node", 219 + "persistent.test", 220 + ); 221 + config1.DID = "did:plc:persistent-node"; 222 + db1.close(); 223 + 224 + // Add and sync 225 + await fetch(`${handleA.url}/xrpc/org.p2pds.admin.addDid`, { 226 + method: "POST", 227 + headers: { 228 + Authorization: `Bearer ${config1.AUTH_TOKEN}`, 229 + "Content-Type": "application/json", 230 + }, 231 + body: JSON.stringify({ did: DID_ALICE }), 232 + }); 233 + await fetch(`${handleA.url}/xrpc/org.p2pds.replication.syncNow`, { 234 + method: "POST", 235 + headers: { Authorization: `Bearer ${config1.AUTH_TOKEN}` }, 236 + }); 237 + await new Promise((r) => setTimeout(r, 2000)); 238 + 239 + // Verify sync worked 240 + const status1 = await fetch( 241 + `${handleA.url}/xrpc/org.p2pds.admin.getDidStatus?did=${DID_ALICE}`, 242 + { headers: { Authorization: `Bearer ${config1.AUTH_TOKEN}` } }, 243 + ); 244 + const ds1 = (await status1.json()) as { blockCount: number }; 245 + expect(ds1.blockCount).toBeGreaterThan(0); 246 + 247 + // Shut down 248 + await handleA.close(); 249 + handleA = undefined; 250 + 251 + // Second boot: restart with same data dir — identity should load automatically 252 + const config2 = didlessConfig(tmpDirA); 253 + expect(config2.DID).toBeUndefined(); // not set in config 254 + handleA = await startServer(config2, { didResolver: mockResolver }); 255 + 256 + // Identity should have been loaded from node_identity 257 + expect(config2.DID).toBe("did:plc:persistent-node"); 258 + expect(config2.HANDLE).toBe("persistent.test"); 259 + 260 + // Replication state should persist — DID_ALICE is still tracked 261 + const overview = await fetch(`${handleA.url}/xrpc/org.p2pds.admin.getOverview`, { 262 + headers: { Authorization: `Bearer ${config2.AUTH_TOKEN}` }, 263 + }); 264 + const ov = (await overview.json()) as { 265 + did: string; 266 + replication: { trackedDids: string[] }; 267 + }; 268 + expect(ov.did).toBe("did:plc:persistent-node"); 269 + expect(ov.replication.trackedDids).toContain(DID_ALICE); 270 + 271 + // Blocks should still be in IPFS (persisted to disk) 272 + const status2 = await fetch( 273 + `${handleA.url}/xrpc/org.p2pds.admin.getDidStatus?did=${DID_ALICE}`, 274 + { headers: { Authorization: `Bearer ${config2.AUTH_TOKEN}` } }, 275 + ); 276 + const ds2 = (await status2.json()) as { 277 + blockCount: number; 278 + syncState: { status: string }; 279 + }; 280 + expect(ds2.blockCount).toBeGreaterThan(0); 281 + expect(ds2.syncState.status).toBe("synced"); 282 + }, 30_000); 283 + });