atproto user agency toolkit for individuals and groups


Add SQLite-backed IPFS storage, minimal libp2p, and infrastructure hardening

Replace filesystem-based Helia storage with SQLite:
- SqliteBlockstore: implements Blockstore interface backed by ipfs_blocks table,
eliminating thousands of tiny files from FsBlockstore
- SqliteDatastore: implements Datastore interface backed by ipfs_datastore table,
replacing FsDatastore for libp2p peer/routing state
- All persistent state now lives in a single pds.db file

Strip libp2p down to minimal config (TCP + noise + yamux + identify only):
- Remove DHT, gossipsub, relay, autoNAT, UPnP, dcutr, WebRTC, ping
- These services pegged CPU connecting to random peers on the public network
- P2PDS dials known peers directly using multiaddrs from peer records

Harden auth and server lifecycle for OAuth mode:
- Guard legacy JWT endpoints (createSession, refreshSession, getSession)
against missing config when OAUTH_ENABLED=true — return 501 instead of crashing
- Guard auth middleware against undefined PDS_HOSTNAME/JWT_SECRET
- Register SIGINT/SIGTERM handlers before awaiting startServer() so shutdown
signals during startup are handled cleanly
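
The endpoint guard can be sketched as a small predicate — names and shapes here are hypothetical, standing in for the real XRPC handlers:

```typescript
// Sketch (hypothetical names): decide whether a legacy JWT endpoint such as
// createSession can be served, given the node's environment.
interface AuthEnv {
  OAUTH_ENABLED?: string;
  JWT_SECRET?: string;
  PDS_HOSTNAME?: string;
}

// Returns a 501 payload when OAuth mode is on but legacy config is missing,
// or null when the legacy endpoint can proceed.
function guardLegacySession(env: AuthEnv): { status: number; error: string } | null {
  if (env.OAUTH_ENABLED === "true" && (!env.JWT_SECRET || !env.PDS_HOSTNAME)) {
    return {
      status: 501,
      error: "MethodNotImplemented: legacy sessions unavailable in OAuth mode",
    };
  }
  return null;
}

console.log(guardLegacySession({ OAUTH_ENABLED: "true" })?.status); // 501
console.log(guardLegacySession({ JWT_SECRET: "s", PDS_HOSTNAME: "pds.example" })); // null
```

Checking for the config up front turns what was an unhandled `undefined` access into a well-formed "not implemented" response.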

Add two-node manual testing scripts:
- scripts/start-node.sh: build + start a single node on a random port
- scripts/start-both.sh: start two nodes with separate data dirs
- scripts/stop-both.sh: stop both nodes
- scripts/clean.sh: wipe data for both nodes (preserves .env files)
- scripts/logs.sh: show recent logs
- scripts/test-add-did.sh: offer a DID on a running node
- npm scripts: start:node1, start:node2, start:both, stop, clean, logs, test:add-did

Update README with current architecture, configuration, and project structure.
Update .gitignore for test artifacts (data-node2/, plans/, .claude/settings.local.json).

+606 -97
`.gitignore` (+3)
```diff
  node_modules/
  dist/
  data/
+ data-node2/
+ plans/
  .env
  *.db
  *.db-journal
  *.db-wal
  *.db-shm
+ .claude/settings.local.json
```
`README.md` (+103 −52)
````diff
···
  - Provides data on P2P networks (IPFS/libp2p) for other nodes to replicate
  - Fetches and stores data from P2P networks for serviced accounts
  - Mutual replication agreements between peers via on-protocol consent records
+ - Push-based offer discovery: nodes notify each other of replication offers

  P2PDS is infrastructure — like a torrent client for atproto data. It does not have its own identity. Users authenticate with their own atproto accounts, and records (`org.p2pds.peer`, `org.p2pds.replication.offer`) are published to the user's own repo via their PDS.
···
  - **Runtime**: Node.js, TypeScript (ES2022, strict)
  - **Base**: Generalized from [Cirrus](https://github.com/ascorbic/cirrus)
  - **HTTP**: Hono
- - **Database**: better-sqlite3 (sync API)
- - **IPFS**: Helia (libp2p + DHT + bitswap + gossipsub)
+ - **Database**: better-sqlite3 (sync API) — all state in a single `pds.db` file
+ - **IPFS**: Helia with minimal libp2p (TCP + noise + yamux), SQLite-backed blockstore
  - **Identity**: AT Protocol DIDs via PLC directory
+ - **Auth**: OAuth (primary) or legacy JWT (fallback)
  - **Desktop**: Tauri v2 (optional, `apps/desktop/`)
  - **Content addressing**: [DASL](https://dasl.ing/) CIDs (CIDv1, SHA-256, dag-cbor/raw, base32lower)
···
  ┌─────────┐
  │ p2pds   │ ← replication infrastructure (local, cloud, or co-located)
  │         │
- │ SQLite  │ block/blob tracking, sync state, challenge history
- │ Helia   │ IPFS storage, DHT announcements, bitswap, gossipsub
+ │ SQLite  │ blocks, blobs, sync state, peer routing, challenge history
+ │ Helia   │ IPFS storage, direct peer connections, bitswap
  │ Hono    │ XRPC endpoints, admin dashboard, RASL
  └─────────┘
···
  Other p2pds nodes (mutual replication via offer records)
  ```

- Configured with a list of DIDs to replicate:
+ ### Storage
+
+ All persistent state lives in a single SQLite database (`pds.db`):
+
+ - **IPFS blocks** (`ipfs_blocks`) — replaces filesystem blockstore, avoids thousands of tiny files
+ - **IPFS datastore** (`ipfs_datastore`) — replaces filesystem datastore for libp2p peer/routing data
+ - **Replication state** — sync progress, peer info, block/blob tracking, firehose cursor
+ - **Challenge history** — proof-of-storage results and peer reliability scores
+ - **Incoming offers** — offers from other nodes awaiting accept/reject
+ - **Node identity** — DID + handle, established on first OAuth login
+
+ ### Identity model
+
+ P2PDS starts without an identity. On first OAuth login, the user's DID becomes the node identity and is persisted in SQLite. Subsequent restarts load the identity from the database. This "lazy identity" model means:
+
+ - No DID or signing key required in config
+ - Identity established interactively via the dashboard
+ - `RepoManager` is optional throughout (firehose, replication, startup all handle its absence)
+
+ ### Libp2p configuration

- 1. Resolves DIDs via PLC directory to find source PDSes
- 2. Fetches repos as CAR files from each DID's PDS (incremental via `since`)
- 3. Stores blocks in IPFS (Helia) and announces via DHT
- 4. Serves blocks via content-addressed RASL endpoint
- 5. Real-time sync via firehose (`com.atproto.sync.subscribeRepos`)
- 6. Gossipsub notifications for low-latency cross-node sync
- 7. Verifies block availability on remote peers via challenge-response protocol
+ Helia runs with a minimal libp2p stack: TCP transport, Noise encryption, Yamux multiplexing, and Identify only. No DHT, gossipsub, relay, autoNAT, UPnP, or WebRTC — those services peg CPU connecting to random peers. P2PDS dials known peers directly using multiaddrs from `org.p2pds.peer` records.
+
+ ### Replication flow
+
+ 1. User adds a DID via the dashboard → publishes an `org.p2pds.replication.offer` record
+ 2. Node resolves the target's `org.p2pds.peer` record to find their p2pds endpoint
+ 3. Node POSTs a notification to the target's `notifyOffer` endpoint
+ 4. Target verifies the offer exists in the offerer's repo (anti-spoofing)
+ 5. Target's dashboard shows the incoming offer with Accept/Reject buttons
+ 6. Accepting creates a reciprocal offer + push notification back
+ 7. Both nodes detect mutual agreement → promote to active replication
+ 8. Sync loop: fetch repo, store blocks/blobs, verify, announce

  ### Design choices

  - **DHT only** for discovery/routing — no IPNI or centralized indexers
  - **Slow data is fine** as a tradeoff for resilience and decentralization
  - **Transport-agnostic verification** — RASL works over any HTTP transport
- - **DASL-compliant content addressing** — all CIDs are CIDv1 + SHA-256 with either dag-cbor (`0x71`) or raw (`0x55`) codec, encoded as base32lower (`b` prefix). This is enforced at the library level by `@atcute/cid` and matches atproto's CID conventions. See [DASL CID spec](https://dasl.ing/cid.html).
- - **No node identity** — p2pds acts on behalf of users, not as its own entity. Records publish to the user's own atproto repo.
+ - **DASL-compliant content addressing** — all CIDs are CIDv1 + SHA-256 with either dag-cbor (`0x71`) or raw (`0x55`) codec, encoded as base32lower (`b` prefix). Enforced by `@atcute/cid`.
+ - **No node identity** — p2pds acts on behalf of users, not as its own entity
+ - **SQLite everywhere** — single-file database, no filesystem blockstore churn
+ - **Consent-gated replication** — "Add" publishes an offer, not an immediate sync. Replication only begins when both sides agree.

  ### Deployment flexibility
···
  | NSID | Repo key | Purpose |
  |------|----------|---------|
- | `org.p2pds.peer` | `self` | Binds an atproto DID to a libp2p PeerID + multiaddrs |
+ | `org.p2pds.peer` | `self` | Binds an atproto DID to a libp2p PeerID + multiaddrs + p2pds endpoint URL |
  | `org.p2pds.replication.offer` | `any` | Declares willingness to replicate a specific DID's data |

  Schemas are in `lexicons/` and validated by `src/lexicons.ts`.

  **Offer negotiation**: Peers publish offer records declaring willingness to replicate specific DIDs. When two peers have mutual offers (A offers to replicate B, B offers to replicate A), a replication agreement is automatically formed. Parameters are merged: `max(minCopies)`, `min(intervalSec)`, `max(priority)`. Revoking an offer = deleting the record.

+ **Push notifications**: When a node publishes an offer, it resolves the target's `org.p2pds.peer` record to find their p2pds HTTP endpoint and POSTs a notification. The receiving node verifies the offer exists in the sender's repo before storing it (prevents spoofing).
+
  ## Verification

  Content-addressed retrieval is unforgeable: if a peer returns the correct bytes for a CID, they have the data. The verification stack exploits this property at multiple layers:
···
  | L3 | MST proof challenge | Challenge peers to produce Merkle path proofs for specific records | Done |

  **Challenge-response protocol**: Three message types (`StorageChallenge` → `StorageChallengeResponse` → `StorageChallengeResult`). Deterministic generation from epoch + DIDs + nonce. Transport-agnostic with libp2p primary and HTTP fallback (`FailoverChallengeTransport`). Challenge history and peer reliability tracked in SQLite.
-
- **L0** and **L1** run on a configurable timer (default 30 min). L1 samples are tuneable via `VerificationConfig.raslSampleSize` (default 50 blocks).
-
- ### L2 (libp2p+HTTP Gateway) — future
-
- Reuses RASL verification logic over libp2p transports for NAT traversal and encryption without public IP. Requires [libp2p+HTTP Gateway spec](https://specs.ipfs.tech/http-gateways/libp2p-gateway/) in Helia.
-
- - Kubo (Go): [ipfs/kubo#10049](https://github.com/ipfs/kubo/issues/10049) (shipped)
- - Helia (JS): [ipfs/helia#348](https://github.com/ipfs/helia/issues/348) (not yet)

  ## Replication

  Sync loop (per DID, periodic with policy-driven intervals):

  1. Resolve DID → PDS endpoint (via PLC directory)
- 2. Fetch repo (`com.atproto.sync.getRepo`, incremental via `since`)
- 3. Parse CAR, store blocks in IPFS, fetch and store blobs
- 4. Track block/blob CIDs, populate record paths via MST walk
- 5. Announce to DHT
+ 2. Try libp2p peer-first sync if peer info is known
+ 3. Fall back to HTTP PDS fetch (`com.atproto.sync.getRepo`, incremental via `since`)
+ 4. Parse CAR, store blocks in IPFS, fetch and store blobs
+ 5. Track block/blob CIDs, populate record paths via MST walk
  6. Verify local block availability
  7. If source PDS fails, fall back to peer endpoints
···
  ## App

- - **Dashboard**: Server-rendered HTML at `/` (auto-refresh)
+ - **Dashboard**: Server-rendered HTML at `/` with auto-refresh, account search, incoming offer notifications
  - **API**: Authenticated XRPC endpoints for overview, per-DID status, network status, policies, sync history
- - **DID management**: Add/remove DIDs at runtime via `addDid`/`removeDid` endpoints
- - **Rate limiting**: Per-IP and per-DID limits across HTTP, gossipsub, and libp2p
+ - **DID management**: Add/remove/offer DIDs at runtime via dashboard or API
+ - **Incoming offers**: Accept/reject replication offers from other nodes via dashboard
+ - **Rate limiting**: Per-IP limits across all endpoint groups (meta, sync, session, read, write, challenge, app, notifyOffer)

  ## Desktop App
···
  npm run dev
  ```

+ ### Two-node manual testing
+
+ Scripts for running two p2pds nodes locally for manual testing:
+
+ ```bash
+ npm run start:both    # Build and start both nodes on random ports
+ npm run start:node1   # Start node 1 only
+ npm run start:node2   # Start node 2 only (uses data-node2/.env)
+ npm run stop          # Stop both nodes
+ npm run clean         # Wipe data for both nodes (keeps .env files)
+ npm run logs          # Show recent logs for both nodes
+ npm run test:add-did  # Offer a DID on a running node
+ ```
+
+ Node 2 requires a `data-node2/.env` file with a separate `OAUTH_ENABLED=true` and `DATA_DIR=./data-node2` config. Both nodes pick random ports and write them to `/tmp/p2pds-node{1,2}.port`.
+
  ### Project structure

  ```
  src/
    index.ts              Hono app with all routes
    server.ts             HTTP server entry point
+   start.ts              Server startup orchestrator
    config.ts             Config interface + loadConfig()
-   validation.ts         Record validator (atproto + p2pds lexicons)
-   lexicons.ts           p2pds lexicon loader + validator
-   ipfs.ts               IpfsService (Helia wrapper)
+   ipfs.ts               IpfsService (Helia wrapper, SQLite-backed)
+   sqlite-blockstore.ts  SQLite blockstore for Helia (replaces FsBlockstore)
+   sqlite-datastore.ts   SQLite datastore for libp2p (replaces FsDatastore)
    repo-manager.ts       Local repo management
    storage.ts            SQLite block storage
    blobs.ts              Blob storage
-   middleware/auth.ts    Auth middleware
+   middleware/auth.ts    Auth middleware (OAuth + legacy JWT)
+   oauth/                OAuth client, routes, session/state stores, PdsClient
    replication/          Sync, verification, challenges, offers, gossipsub
    policy/               Policy engine types, engine, presets
    xrpc/                 XRPC endpoint handlers
+ scripts/                Two-node testing scripts
  lexicons/               Lexicon JSON schemas
  apps/desktop/           Tauri desktop app
  ```
···
  Environment variables (or `.env` file):

+ | Variable | Required | Default | Description |
+ |----------|----------|---------|-------------|
+ | `OAUTH_ENABLED` | No | `false` | Enable OAuth login (recommended) |
+ | `PUBLIC_URL` | No | `http://localhost:$PORT` | Public URL for push notifications between nodes |
+ | `DATA_DIR` | No | `./data` | Data directory |
+ | `PORT` | No | `3000` | HTTP port |
+ | `IPFS_ENABLED` | No | `true` | Enable IPFS |
+ | `IPFS_NETWORKING` | No | `true` | Enable libp2p networking |
+ | `REPLICATE_DIDS` | No | | Comma-separated DIDs to replicate |
+ | `FIREHOSE_URL` | No | `wss://bsky.network/...` | Firehose WebSocket URL |
+ | `FIREHOSE_ENABLED` | No | `true` | Enable firehose sync |
+ | `POLICY_FILE` | No | | Path to policy JSON file |
+ | `RATE_LIMIT_ENABLED` | No | `true` | Enable rate limiting |
+
+ **Legacy auth** (when `OAUTH_ENABLED=false`):
+
  | Variable | Required | Description |
  |----------|----------|-------------|
- | `DID` | Yes | Your atproto DID (e.g., `did:plc:...`) |
- | `HANDLE` | Yes | Your handle (e.g., `user.example.com`) |
+ | `DID` | Yes | Your atproto DID |
+ | `HANDLE` | Yes | Your handle |
  | `PDS_HOSTNAME` | Yes | PDS hostname |
  | `AUTH_TOKEN` | Yes | Static auth token |
- | `SIGNING_KEY` | Yes | Hex-encoded secp256k1 private key |
- | `SIGNING_KEY_PUBLIC` | No | Multibase-encoded public key |
  | `JWT_SECRET` | Yes | JWT signing secret |
  | `PASSWORD_HASH` | Yes | Bcrypt password hash |
- | `DATA_DIR` | No | Data directory (default: `./data`) |
- | `PORT` | No | HTTP port (default: `3000`) |
- | `IPFS_ENABLED` | No | Enable IPFS (default: `true`) |
- | `IPFS_NETWORKING` | No | Enable IPFS networking (default: `true`) |
- | `REPLICATE_DIDS` | No | Comma-separated DIDs to replicate |
- | `FIREHOSE_URL` | No | Firehose WebSocket URL |
- | `FIREHOSE_ENABLED` | No | Enable firehose sync (default: `false`) |
- | `POLICY_FILE` | No | Path to policy JSON file |
+ | `SIGNING_KEY` | No | Hex-encoded secp256k1 private key |

  ## Status
···
  5. Challenge-response proof-of-storage — done
  6. Policy engine — done
  7. P2P offer negotiation — done
- 8. Admin dashboard + DID management — done
- 9. Rate limiting — done
- 10. Architecture refactor (user-DID model) — done
- 11. Lexicon definitions — done
- 12. Desktop app skeleton — done
+ 8. Consent-gated replication — done
+ 9. Incoming offer discovery via push notification — done
+ 10. Admin dashboard + DID management — done
+ 11. Rate limiting — done
+ 12. Architecture refactor (user-DID model, lazy identity) — done
+ 13. SQLite-backed IPFS storage — done
+ 14. Lexicon definitions — done
+ 15. Desktop app skeleton — done
````
`package.json` (+9 −1)
```diff
    "start": "node dist/server.js",
    "typecheck": "tsc --noEmit",
    "test": "vitest run",
-   "smoke-test": "bash scripts/smoke-test.sh"
+   "smoke-test": "bash scripts/smoke-test.sh",
+   "start:node1": "bash scripts/start-node.sh 1",
+   "start:node2": "bash scripts/start-node.sh 2",
+   "start:both": "bash scripts/start-both.sh",
+   "stop": "bash scripts/stop-both.sh",
+   "clean": "bash scripts/clean.sh",
+   "logs": "bash scripts/logs.sh",
+   "test:add-did": "bash scripts/test-add-did.sh",
+   "serve": "tsc && node dist/server.js > /tmp/p2pds-node1.log 2>&1 & echo $! > /tmp/p2pds-node1.pid && sleep 2 && cat /tmp/p2pds-node1.log"
  },
  "dependencies": {
    "@atcute/atproto": "^3.1.10",
```
`scripts/clean.sh` (new, +15)
```bash
#!/bin/bash
# Wipe all data for both nodes (DB, blobs, IPFS blockstore)
# Does NOT touch .env files
set -e

# Stop any running nodes first
bash scripts/stop-both.sh 2>/dev/null || true

echo "Cleaning node 1 data (data/)..."
rm -rf data/pds.db data/pds.db-shm data/pds.db-wal data/blobs data/ipfs

echo "Cleaning node 2 data (data-node2/)..."
rm -rf data-node2/pds.db data-node2/pds.db-shm data-node2/pds.db-wal data-node2/blobs data-node2/ipfs

echo "Done — both nodes clean"
```
`scripts/logs.sh` (new, +16)
```bash
#!/bin/bash
# Show recent logs for p2pds nodes
NODE=${1:-0}
if [ "$NODE" = "1" ] || [ "$NODE" = "0" ]; then
  if [ -f /tmp/p2pds-node1.log ]; then
    echo "=== NODE 1 ==="
    tail -30 /tmp/p2pds-node1.log
    echo ""
  fi
fi
if [ "$NODE" = "2" ] || [ "$NODE" = "0" ]; then
  if [ -f /tmp/p2pds-node2.log ]; then
    echo "=== NODE 2 ==="
    tail -30 /tmp/p2pds-node2.log
  fi
fi
```
`scripts/start-both.sh` (new, +51)
```bash
#!/bin/bash
# Start both p2pds nodes for two-node testing
set -e

# Stop previous instances if pid files exist
bash scripts/stop-both.sh 2>/dev/null || true

# Build first
npm run build

# Start node 1 (random port)
PORT1=$(python3 -c 'import socket; s=socket.socket(); s.bind(("",0)); print(s.getsockname()[1]); s.close()')
export PORT=$PORT1
node dist/server.js > /tmp/p2pds-node1.log 2>&1 &
echo $! > /tmp/p2pds-node1.pid
echo "$PORT1" > /tmp/p2pds-node1.port
echo "Node 1 started (pid $!, port $PORT1, log: /tmp/p2pds-node1.log)"

# Start node 2 (random port, different env)
PORT2=$(python3 -c 'import socket; s=socket.socket(); s.bind(("",0)); print(s.getsockname()[1]); s.close()')
set -a && source data-node2/.env && set +a
export PORT=$PORT2
node dist/server.js > /tmp/p2pds-node2.log 2>&1 &
echo $! > /tmp/p2pds-node2.pid
echo "$PORT2" > /tmp/p2pds-node2.port
echo "Node 2 started (pid $!, port $PORT2, log: /tmp/p2pds-node2.log)"

# Wait for both to be ready
echo ""
echo "Waiting for nodes..."
for node in 1 2; do
  for i in $(seq 1 30); do
    if grep -q "P2PDS_READY" "/tmp/p2pds-node${node}.log" 2>/dev/null; then
      echo "Node $node ready (port $(cat /tmp/p2pds-node${node}.port))"
      break
    fi
    if grep -q "Error:" "/tmp/p2pds-node${node}.log" 2>/dev/null; then
      echo "Node $node FAILED:"
      cat "/tmp/p2pds-node${node}.log"
      break
    fi
    sleep 1
  done
done

echo ""
echo "=== NODE 1 (port $PORT1) ==="
tail -10 /tmp/p2pds-node1.log
echo ""
echo "=== NODE 2 (port $PORT2) ==="
tail -10 /tmp/p2pds-node2.log
```
`scripts/start-node.sh` (new, +53)
```bash
#!/bin/bash
# Start a single p2pds node: scripts/start-node.sh [1|2]
set -e
NODE=${1:-1}
PIDFILE="/tmp/p2pds-node${NODE}.pid"
PORTFILE="/tmp/p2pds-node${NODE}.port"
LOGFILE="/tmp/p2pds-node${NODE}.log"

# Stop if already running
if [ -f "$PIDFILE" ]; then
  kill "$(cat "$PIDFILE")" 2>/dev/null || true
  rm -f "$PIDFILE" "$PORTFILE"
  sleep 1
fi

npm run build

# Pick a random high port
PORT=$(python3 -c 'import socket; s=socket.socket(); s.bind(("",0)); print(s.getsockname()[1]); s.close()')
export PORT

if [ "$NODE" = "2" ]; then
  set -a && source data-node2/.env && set +a
  export PORT  # override .env PORT with our random one
fi

> "$LOGFILE"
node dist/server.js > "$LOGFILE" 2>&1 &
echo $! > "$PIDFILE"
echo "$PORT" > "$PORTFILE"

# Wait for P2PDS_READY or failure
for i in $(seq 1 30); do
  if grep -q "P2PDS_READY" "$LOGFILE" 2>/dev/null; then
    echo ""
    cat "$LOGFILE"
    echo ""
    echo "PID: $(cat "$PIDFILE")  PORT: $PORT"
    exit 0
  fi
  if grep -q "Error:" "$LOGFILE" 2>/dev/null; then
    echo "Node $NODE failed to start:"
    cat "$LOGFILE"
    rm -f "$PIDFILE" "$PORTFILE"
    exit 1
  fi
  sleep 1
done

echo "Node $NODE startup timed out. Log:"
cat "$LOGFILE"
rm -f "$PIDFILE" "$PORTFILE"
exit 1
```
`scripts/stop-both.sh` (new, +20)
```bash
#!/bin/bash
# Stop both p2pds nodes using pid files, with fallback to pgrep
for node in node1 node2; do
  pidfile="/tmp/p2pds-${node}.pid"
  portfile="/tmp/p2pds-${node}.port"
  if [ -f "$pidfile" ]; then
    pid=$(cat "$pidfile")
    kill "$pid" 2>/dev/null && echo "Stopped $node (pid $pid)"
    rm -f "$pidfile" "$portfile"
  fi
done

# Fallback: kill any remaining p2pds server processes (compiled and tsx)
for pattern in "node dist/server" "tsx.*src/server" "tsx watch"; do
  for pid in $(pgrep -f "$pattern" 2>/dev/null); do
    kill "$pid" 2>/dev/null && echo "Stopped orphaned process $pid ($pattern)"
  done
done

sleep 2
```
`scripts/test-add-did.sh` (new, +56)
```bash
#!/bin/bash
# Test adding a DID to a running p2pds node
# Usage: scripts/test-add-did.sh [port] <did>
# If port is omitted, reads from /tmp/p2pds-node1.port

if [ $# -eq 2 ]; then
  PORT=$1; DID=$2
elif [ $# -eq 1 ]; then
  if [ -f /tmp/p2pds-node1.port ]; then
    PORT=$(cat /tmp/p2pds-node1.port)
  else
    echo "Usage: test-add-did.sh [port] <did>"
    echo "No port given and /tmp/p2pds-node1.port not found. Start a node first."
    exit 1
  fi
  DID=$1
else
  echo "Usage: test-add-did.sh [port] <did>"
  exit 1
fi

# Extract auth token from dashboard HTML
HTML=$(curl -s "http://localhost:${PORT}/")
TOKEN=$(echo "$HTML" | grep 'const TOKEN' | sed 's/.*const TOKEN = "//;s/".*//')
if [ -z "$TOKEN" ]; then
  echo "ERROR: Could not extract auth token from http://localhost:${PORT}/"
  exit 1
fi
echo "Auth token: ${TOKEN:0:10}..."

echo ""
echo "Offering DID: $DID"
RESPONSE=$(curl -s -X POST "http://localhost:${PORT}/xrpc/org.p2pds.app.offerDid" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d "{\"did\":\"$DID\"}")
echo "Response: $RESPONSE"

echo ""
echo "Checking overview..."
curl -s "http://localhost:${PORT}/xrpc/org.p2pds.app.getOverview" \
  -H "Authorization: Bearer $TOKEN" | python3 -c "
import json, sys
d = json.load(sys.stdin)
print('DID:', d.get('did', '(none)'))
r = d.get('replication', {})
print('Tracked DIDs:', r.get('trackedDids', []))
states = r.get('syncStates', [])
for s in states:
    print(f\"  {s['did']}: status={s.get('status','?')}\")
offered = d.get('offeredDids', [])
if offered:
    print('Offered DIDs:')
    for o in offered:
        print(f\"  {o['did']}: offered at {o.get('offeredAt','?')}\")
"
```
`src/ipfs.ts` (+39 −36)
```diff
  import { CID } from "multiformats";
- import { FsBlockstore } from "blockstore-fs";
- import { FsDatastore } from "datastore-fs";
  import type { Helia } from "@helia/interface";
  import type { BlockMap } from "@atproto/repo";
+ import type Database from "better-sqlite3";
  import { encode as cborEncode, decode as cborDecode } from "./cbor-compat.js";
+ import { SqliteBlockstore } from "./sqlite-blockstore.js";
+ import { SqliteDatastore } from "./sqlite-datastore.js";
  import type { RateLimiter } from "./rate-limiter.js";
  import { DEFAULT_RATE_LIMIT_CONFIG } from "./rate-limiter.js";
···
  }

  export interface IpfsConfig {
-   blocksPath: string;
-   datastorePath: string;
+   db: Database.Database;
    networking: boolean;
  }
···
   */
  export class IpfsService implements BlockStore, NetworkService {
    private helia: Helia | null = null;
-   private blockstore: FsBlockstore | null = null;
+   private blockstore: SqliteBlockstore | null = null;
    private config: IpfsConfig;
    private running = false;
    private commitHandlers: CommitNotificationHandler[] = [];
···
    }

    async start(): Promise<void> {
-     this.blockstore = new FsBlockstore(this.config.blocksPath);
+     this.blockstore = new SqliteBlockstore(this.config.db);

      if (this.config.networking) {
-       const { createHelia, libp2pDefaults } = await import("helia");
-       const { gossipsub } = await import("@libp2p/gossipsub");
-       const datastore = new FsDatastore(this.config.datastorePath);
+       const { createHelia } = await import("helia");
+       const { tcp } = await import("@libp2p/tcp");
+       const { noise } = await import("@chainsafe/libp2p-noise");
+       const { yamux } = await import("@chainsafe/libp2p-yamux");
+       const { identify } = await import("@libp2p/identify");
+       const datastore = new SqliteDatastore(this.config.db);

-       const libp2pConfig = libp2pDefaults();
-       libp2pConfig.services.pubsub = gossipsub({
-         emitSelf: false,
-         allowPublishToZeroTopicPeers: true,
-         maxInboundDataLength: MAX_GOSSIPSUB_MESSAGE_SIZE,
-       });
-       // Connection manager limits
-       libp2pConfig.connectionManager = {
-         ...libp2pConfig.connectionManager,
-         maxConnections: 100,
-         maxIncomingPendingConnections: 10,
-         inboundConnectionThreshold: 5,
-       };
-
+       // Minimal libp2p config: only what we need for direct peer connections.
+       // No DHT, gossipsub, relay, autoNAT, UPnP, WebRTC — those peg CPU
+       // connecting to random peers. We dial p2pds peers directly.
+       // Cast SQLite stores to Helia's expected interfaces.
+       // Our implementations are duck-type compatible but use Promise
+       // returns instead of AwaitGenerator, which Helia handles fine at runtime.
        this.helia = await createHelia({
-         libp2p: libp2pConfig,
-         blockstore: this.blockstore,
-         datastore,
+         libp2p: {
+           addresses: { listen: ["/ip4/0.0.0.0/tcp/0"] },
+           transports: [tcp()],
+           connectionEncrypters: [noise()],
+           streamMuxers: [yamux()],
+           services: {
+             identify: identify(),
+           },
+           connectionManager: {
+             maxConnections: 20,
+             maxIncomingPendingConnections: 5,
+             inboundConnectionThreshold: 3,
+           },
+         },
+         blockstore: this.blockstore as any,
+         datastore: datastore as any,
        });
-       this.setupGossipsubHandler();
-     } else {
-       await this.blockstore.open();
      }

      this.running = true;
···
      if (this.helia) {
        await this.helia.stop();
        this.helia = null;
-     } else if (this.blockstore) {
-       await this.blockstore.close();
      }
      this.blockstore = null;
      this.running = false;
···
      if (!this.blockstore) return null;
      try {
        const cid = CID.parse(cidStr);
-       // get() returns AsyncGenerator<Uint8Array>; collect chunks
+       // Both Helia's blockstore and SqliteBlockstore return AsyncGenerator<Uint8Array>
        const gen = this.helia
          ? this.helia.blockstore.get(cid, { offline: true })
          : this.blockstore.get(cid);
···
      if (!this.blockstore) return false;
      try {
        const cid = CID.parse(cidStr);
-       const store = this.helia
-         ? this.helia.blockstore
-         : this.blockstore;
-       return await store.has(cid);
+       if (this.helia) {
+         return await this.helia.blockstore.has(cid);
+       }
+       return await this.blockstore.has(cid);
      } catch {
        return false;
      }
```
`src/middleware/auth.ts` (+5)
```diff
      return next();
    }

+   // Legacy JWT auth requires PDS_HOSTNAME and JWT_SECRET
+   if (!c.env.PDS_HOSTNAME || !c.env.JWT_SECRET) {
+     return c.json({ error: "AuthenticationRequired", message: "Not authenticated" }, 401);
+   }
+
    const serviceDid = `did:web:${c.env.PDS_HOSTNAME}`;

    // Try session JWT verification (HS256, signed with JWT_SECRET)
```
`src/server.ts` (+10 −8)
```diff
  import { loadConfig } from "./config.js";
- import { startServer } from "./start.js";
+ import { startServer, type ServerHandle } from "./start.js";

  const config = loadConfig();
- const handle = await startServer(config);

- process.on("SIGINT", () => {
-   handle.close().finally(() => process.exit(0));
- });
- process.on("SIGTERM", () => {
-   handle.close().finally(() => process.exit(0));
- });
+ // Register signal handlers BEFORE await so SIGTERM during startup exits cleanly
+ let handle: ServerHandle | undefined;
+ const shutdown = () => {
+   (handle?.close() ?? Promise.resolve()).finally(() => process.exit(0));
+ };
+ process.on("SIGINT", shutdown);
+ process.on("SIGTERM", shutdown);
+
+ handle = await startServer(config);
```
`src/sqlite-blockstore.ts` (new, +86)
```ts
/**
 * SQLite-backed Blockstore for Helia.
 *
 * Replaces FsBlockstore to avoid creating thousands of tiny files
 * that hammer macOS fseventsd. All blocks stored in a single SQLite table.
 */

import type Database from "better-sqlite3";
import { CID } from "multiformats";

export class SqliteBlockstore {
  private db: Database.Database;

  constructor(db: Database.Database) {
    this.db = db;
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS ipfs_blocks (
        cid TEXT PRIMARY KEY,
        bytes BLOB NOT NULL
      )
    `);
  }

  async put(key: CID, val: Uint8Array): Promise<CID> {
    this.db
      .prepare("INSERT OR REPLACE INTO ipfs_blocks (cid, bytes) VALUES (?, ?)")
      .run(key.toString(), Buffer.from(val));
    return key;
  }

  async * get(key: CID): AsyncGenerator<Uint8Array> {
    const row = this.db
      .prepare("SELECT bytes FROM ipfs_blocks WHERE cid = ?")
      .get(key.toString()) as { bytes: Buffer } | undefined;
    if (!row) {
      throw new Error(`Block not found: ${key.toString()}`);
    }
    yield new Uint8Array(row.bytes);
  }

  async has(key: CID): Promise<boolean> {
    const row = this.db
      .prepare("SELECT 1 FROM ipfs_blocks WHERE cid = ?")
      .get(key.toString());
    return row !== undefined;
  }

  async delete(key: CID): Promise<void> {
    this.db
      .prepare("DELETE FROM ipfs_blocks WHERE cid = ?")
      .run(key.toString());
  }

  async * putMany(source: AsyncIterable<{ cid: CID; block: Uint8Array }> | Iterable<{ cid: CID; block: Uint8Array }>): AsyncGenerator<CID> {
    for await (const { cid, block } of source) {
      await this.put(cid, block);
      yield cid;
    }
  }

  async * getMany(source: AsyncIterable<CID> | Iterable<CID>): AsyncGenerator<{ cid: CID; block: Uint8Array }> {
    for await (const cid of source) {
      let block: Uint8Array = new Uint8Array(0);
      for await (const chunk of this.get(cid)) {
        block = chunk;
      }
      yield { cid, block };
    }
  }

  async * deleteMany(source: AsyncIterable<CID> | Iterable<CID>): AsyncGenerator<CID> {
    for await (const cid of source) {
      await this.delete(cid);
      yield cid;
    }
  }

  async * getAll(): AsyncGenerator<{ cid: CID; block: Uint8Array }> {
    const rows = this.db
      .prepare("SELECT cid, bytes FROM ipfs_blocks")
      .all() as Array<{ cid: string; bytes: Buffer }>;
    for (const row of rows) {
      yield { cid: CID.parse(row.cid), block: new Uint8Array(row.bytes) };
    }
  }
}
```
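The class above streams blocks through async generators (`putMany`/`getMany`) rather than returning arrays. As a dependency-free sketch of that contract, with a plain `Map` standing in for the `ipfs_blocks` table and string keys standing in for `CID`s (not the real better-sqlite3/multiformats types):

```typescript
// In-memory stand-in for SqliteBlockstore's generator-based surface.
// "Cid" is just a string here; the real class uses multiformats CID.
type Cid = string;

class MemoryBlockstore {
  private blocks = new Map<Cid, Uint8Array>();

  async put(cid: Cid, block: Uint8Array): Promise<Cid> {
    this.blocks.set(cid, block);
    return cid;
  }

  // Mirrors the streaming get(): yields the block or throws if absent.
  async * get(cid: Cid): AsyncGenerator<Uint8Array> {
    const block = this.blocks.get(cid);
    if (!block) throw new Error(`Block not found: ${cid}`);
    yield block;
  }

  async * putMany(source: Iterable<{ cid: Cid; block: Uint8Array }>): AsyncGenerator<Cid> {
    for (const { cid, block } of source) {
      yield await this.put(cid, block);
    }
  }

  async * getMany(source: Iterable<Cid>): AsyncGenerator<{ cid: Cid; block: Uint8Array }> {
    for (const cid of source) {
      // Drain the per-block generator, as SqliteBlockstore.getMany does.
      let block: Uint8Array = new Uint8Array(0);
      for await (const chunk of this.get(cid)) block = chunk;
      yield { cid, block };
    }
  }
}

async function demo(): Promise<number[]> {
  const store = new MemoryBlockstore();
  const input = [
    { cid: "bafy-a", block: new Uint8Array([1]) },
    { cid: "bafy-b", block: new Uint8Array([2, 3]) },
  ];
  for await (const _cid of store.putMany(input)) { /* drain */ }
  const lengths: number[] = [];
  for await (const { block } of store.getMany(["bafy-a", "bafy-b"])) {
    lengths.push(block.length);
  }
  return lengths; // [1, 2]
}
```

The generator shape matters because callers (Helia's block brokers) can begin consuming results before the whole batch is read.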
+128
src/sqlite-datastore.ts
```ts
/**
 * SQLite-backed Datastore for libp2p.
 *
 * Replaces FsDatastore to avoid filesystem churn.
 * Stores libp2p peer/routing data in a single SQLite table.
 */

import type Database from "better-sqlite3";
import { Key } from "interface-datastore";
import type { Pair, Query, KeyQuery, Batch } from "interface-datastore";

type DatastorePair = Pair;

export class SqliteDatastore {
  private db: Database.Database;

  constructor(db: Database.Database) {
    this.db = db;
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS ipfs_datastore (
        key TEXT PRIMARY KEY,
        value BLOB NOT NULL
      )
    `);
  }

  async put(key: Key, val: Uint8Array): Promise<Key> {
    this.db
      .prepare("INSERT OR REPLACE INTO ipfs_datastore (key, value) VALUES (?, ?)")
      .run(key.toString(), Buffer.from(val));
    return key;
  }

  async get(key: Key): Promise<Uint8Array> {
    const row = this.db
      .prepare("SELECT value FROM ipfs_datastore WHERE key = ?")
      .get(key.toString()) as { value: Buffer } | undefined;
    if (!row) {
      throw new Error(`Key not found: ${key.toString()}`);
    }
    return new Uint8Array(row.value);
  }

  async has(key: Key): Promise<boolean> {
    const row = this.db
      .prepare("SELECT 1 FROM ipfs_datastore WHERE key = ?")
      .get(key.toString());
    return row !== undefined;
  }

  async delete(key: Key): Promise<void> {
    this.db
      .prepare("DELETE FROM ipfs_datastore WHERE key = ?")
      .run(key.toString());
  }

  async * putMany(source: AsyncIterable<DatastorePair> | Iterable<DatastorePair>): AsyncGenerator<Key> {
    for await (const { key, value } of source) {
      await this.put(key, value);
      yield key;
    }
  }

  async * getMany(source: AsyncIterable<Key> | Iterable<Key>): AsyncGenerator<DatastorePair> {
    for await (const key of source) {
      const value = await this.get(key);
      yield { key, value };
    }
  }

  async * deleteMany(source: AsyncIterable<Key> | Iterable<Key>): AsyncGenerator<Key> {
    for await (const key of source) {
      await this.delete(key);
      yield key;
    }
  }

  query(q: Query): AsyncIterable<DatastorePair> {
    const self = this;
    return (async function* () {
      const prefix = q.prefix ?? "/";
      const rows = self.db
        .prepare("SELECT key, value FROM ipfs_datastore WHERE key LIKE ?")
        .all(prefix + "%") as Array<{ key: string; value: Buffer }>;
      for (const row of rows) {
        yield { key: new Key(row.key), value: new Uint8Array(row.value) };
      }
    })();
  }

  queryKeys(q: KeyQuery): AsyncIterable<Key> {
    const self = this;
    return (async function* () {
      const prefix = q.prefix ?? "/";
      const rows = self.db
        .prepare("SELECT key FROM ipfs_datastore WHERE key LIKE ?")
        .all(prefix + "%") as Array<{ key: string }>;
      for (const row of rows) {
        yield new Key(row.key);
      }
    })();
  }

  batch(): Batch {
    const ops: Array<{ type: "put"; key: Key; value: Uint8Array } | { type: "delete"; key: Key }> = [];
    return {
      put: (key: Key, value: Uint8Array) => {
        ops.push({ type: "put", key, value });
      },
      delete: (key: Key) => {
        ops.push({ type: "delete", key });
      },
      commit: async () => {
        const insertStmt = this.db.prepare("INSERT OR REPLACE INTO ipfs_datastore (key, value) VALUES (?, ?)");
        const deleteStmt = this.db.prepare("DELETE FROM ipfs_datastore WHERE key = ?");
        const tx = this.db.transaction(() => {
          for (const op of ops) {
            if (op.type === "put") {
              insertStmt.run(op.key.toString(), Buffer.from(op.value));
            } else {
              deleteStmt.run(op.key.toString());
            }
          }
        });
        tx();
      },
    };
  }
}
```
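The `batch()` method queues operations in memory and applies them all at once inside a single SQLite transaction on `commit()`. A dependency-free sketch of that accumulate-then-commit pattern, using a `Map` in place of the `ipfs_datastore` table (key names here are illustrative, not real libp2p keys):

```typescript
// Sketch of the batch() pattern: ops accumulate in an array and nothing
// touches the store until commit(), mirroring how SqliteDatastore wraps
// the queued ops in one better-sqlite3 transaction.
type Op =
  | { type: "put"; key: string; value: Uint8Array }
  | { type: "delete"; key: string };

class MemoryDatastore {
  readonly data = new Map<string, Uint8Array>();

  batch() {
    const ops: Op[] = [];
    return {
      put: (key: string, value: Uint8Array) => { ops.push({ type: "put", key, value }); },
      delete: (key: string) => { ops.push({ type: "delete", key }); },
      commit: async () => {
        // All queued ops apply together, like the SQLite transaction.
        for (const op of ops) {
          if (op.type === "put") this.data.set(op.key, op.value);
          else this.data.delete(op.key);
        }
      },
    };
  }
}

async function demoBatch(): Promise<[number, number]> {
  const ds = new MemoryDatastore();
  const b = ds.batch();
  b.put("/peers/a", new Uint8Array([1]));
  b.put("/peers/b", new Uint8Array([2]));
  b.delete("/peers/a");
  const before = ds.data.size; // still 0: ops are only queued
  await b.commit();
  return [before, ds.data.size]; // [0, 1] — only "/peers/b" survives
}
```

In the real class the transaction additionally gives atomicity: if any statement throws, better-sqlite3 rolls back the whole batch.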
+12
src/xrpc/server.ts
```diff
@@ -23,6 +23,10 @@
   c: Context<AppEnv>,
   repoManager: RepoManager,
 ): Promise<Response> {
+  if (!c.env.PASSWORD_HASH || !c.env.JWT_SECRET || !c.env.PDS_HOSTNAME) {
+    return c.json({ error: "NotSupported", message: "Legacy auth not configured — use OAuth" }, 501);
+  }
+
   const body = await c.req.json<{
     identifier: string;
     password: string;
@@ -89,6 +93,10 @@
   c: Context<AppEnv>,
   repoManager: RepoManager,
 ): Promise<Response> {
+  if (!c.env.JWT_SECRET || !c.env.PDS_HOSTNAME) {
+    return c.json({ error: "NotSupported", message: "Legacy auth not configured — use OAuth" }, 501);
+  }
+
   const authHeader = c.req.header("Authorization");
 
   if (!authHeader?.startsWith("Bearer ")) {
@@ -163,6 +171,10 @@
   c: Context<AppEnv>,
   repoManager: RepoManager,
 ): Promise<Response> {
+  if (!c.env.JWT_SECRET || !c.env.PDS_HOSTNAME) {
+    return c.json({ error: "NotSupported", message: "Legacy auth not configured — use OAuth" }, 501);
+  }
+
   const authHeader = c.req.header("Authorization");
 
   if (!authHeader?.startsWith("Bearer ")) {
```
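The three hunks apply the same guard: when the legacy-auth env vars are absent (the OAuth-only deployment), the endpoint answers 501 instead of crashing mid-handler. A dependency-free sketch of that predicate, with a hypothetical `Env` shape standing in for the Hono `AppEnv` bindings used in `src/xrpc/server.ts`:

```typescript
// Sketch of the guard: decide up front whether legacy JWT auth is
// configured, before touching any of the env-dependent code paths.
interface Env {
  PASSWORD_HASH?: string;
  JWT_SECRET?: string;
  PDS_HOSTNAME?: string;
}

// createSession additionally needs PASSWORD_HASH; refresh/getSession
// only need the JWT config, matching the two guard variants above.
function legacyAuthStatus(env: Env, requirePassword: boolean): number {
  const missing =
    !env.JWT_SECRET ||
    !env.PDS_HOSTNAME ||
    (requirePassword && !env.PASSWORD_HASH);
  // 501 Not Implemented: this server cannot serve legacy auth,
  // which is accurate when only OAuth is configured.
  return missing ? 501 : 200;
}

// OAuth-only deployment: hostname set, no JWT secret or password hash.
const oauthOnly: Env = { PDS_HOSTNAME: "pds.example.invalid" };
// Fully configured legacy deployment (placeholder values).
const legacy: Env = {
  PASSWORD_HASH: "hash",
  JWT_SECRET: "secret",
  PDS_HOSTNAME: "pds.example.invalid",
};
```

Checking the config once at the top of each handler keeps the failure mode a clean HTTP error rather than an unhandled `undefined` deep in token verification.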