🌿 Collaborative wiki on ATProto lichen.wiki
atproto
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

Move to use atcute jetstream library instead of relay

juprodh 67bf50e9 fbf06e13

+138 -116
+28 -3
bun.lock
··· 6 6 "name": "atwiki", 7 7 "dependencies": { 8 8 "@atcute/cid": "^2.4.1", 9 + "@atcute/jetstream": "^1.1.2", 9 10 "@atcute/tid": "^1.1.2", 10 11 "@atproto/api": "^0.13.0", 11 12 "@atproto/identity": "^0.4.12", 12 13 "@atproto/jwk-jose": "^0.1.0", 13 14 "@atproto/oauth-client-node": "^0.1.0", 14 - "@atproto/sync": "^0.1.40", 15 15 "@codemirror/lang-markdown": "^6.5.0", 16 16 "@codemirror/state": "^6.0.0", 17 17 "@elysiajs/static": "^1.4.7", ··· 23 23 "katex": "^0.16.45", 24 24 "markdown-it": "^14.1.1", 25 25 "sharp": "^0.34.5", 26 - "ws": "^8.0.0", 27 26 }, 28 27 "devDependencies": { 29 28 "@atproto/dev-env": "0.3.213", 29 + "@atproto/sync": "^0.1.40", 30 30 "@biomejs/biome": "^2.4.4", 31 31 "@tailwindcss/cli": "^4.2.1", 32 32 "@tailwindcss/typography": "^0.5.16", ··· 38 38 "better-sqlite3": "12.8.0", 39 39 "knip": "^6.0.2", 40 40 "tailwindcss": "^4.2.1", 41 + "ws": "^8.0.0", 41 42 }, 42 43 "peerDependencies": { 43 44 "typescript": "^5", ··· 55 56 "packages": { 56 57 "@atcute/cid": ["@atcute/cid@2.4.1", "", { "dependencies": { "@atcute/multibase": "^1.1.8", "@atcute/uint8array": "^1.1.1" } }, "sha512-bwhna69RCv7yetXudtj+2qrMPYvhhIQqvJz6YUpUS98v7OdF3X2dnye9Nig2NDrklZcuyOsu7sQo7GOykJXRLQ=="], 57 58 59 + "@atcute/jetstream": ["@atcute/jetstream@1.1.2", "", { "dependencies": { "@atcute/lexicons": "^1.2.2", "@badrap/valita": "^0.4.6", "@mary-ext/event-iterator": "^1.0.0", "@mary-ext/simple-event-emitter": "^1.0.0", "partysocket": "^1.1.5", "type-fest": "^4.41.0", "yocto-queue": "^1.2.1" } }, "sha512-u6p/h2xppp7LE6W/9xErAJ6frfN60s8adZuCKtfAaaBBiiYbb1CfpzN8Uc+2qtJZNorqGvuuDb5572Jmh7yHBQ=="], 60 + 61 + "@atcute/lexicons": ["@atcute/lexicons@1.3.0", "", { "dependencies": { "@atcute/uint8array": "^1.1.1", "@atcute/util-text": "^1.2.0", "@standard-schema/spec": "^1.1.0", "esm-env": "^1.2.2" } }, "sha512-Eq5y+9onnCXNVUlNiMf31beSXHKqptB7lUo/68YbhlmxdaR7ooywHmahya9goP5AsmlYEA1z+dRPXIDAa9O7cg=="], 62 + 58 63 "@atcute/multibase": ["@atcute/multibase@1.2.0", "", { "dependencies": { "@atcute/uint8array": "^1.1.1" } }, "sha512-ZK2GRra+qIYq9nNuQB52m2ul0hOmCQEtPobGfTSUxm7pF0OGEkWGkWHugFhNEDVzHzTwPxHp6VGotdZFue4lYQ=="], 59 64 60 65 "@atcute/tid": ["@atcute/tid@1.1.2", "", { "dependencies": { "@atcute/time-ms": "^1.2.2" } }, "sha512-bmPuOX/TOfcm/vsK9vM98spjkcx2wgd9S2PeK5oLgEr8IbNRPq7iMCAPzOL1nu5XAW3LlkOYQEbYRcw5vcQ37w=="], ··· 62 67 "@atcute/time-ms": ["@atcute/time-ms@1.3.2", "", {}, "sha512-F+qOyR9pO55g1d/QmN+Gr+fimoUQQLusdGSB6pjV0wW5KPILR4oQ4e2ZhWzqUbeHLAgWvgoTTMsMDdz62Xa2tg=="], 63 68 64 69 "@atcute/uint8array": ["@atcute/uint8array@1.1.1", "", {}, "sha512-3LsC8XB8TKe9q/5hOA5sFuzGaIFdJZJNewC5OKa3o/eU6+K7JR6see9Zy2JbQERNVnRl11EzbNov1efgLMAs4g=="], 70 + 71 + "@atcute/util-text": ["@atcute/util-text@1.3.1", "", { "dependencies": { "unicode-segmenter": "^0.14.5" } }, "sha512-MRgJXkx67znuBXuoAYCJkBZyd3OApL7zZlNf5kXhuoCXcdiu1nblRDycYTADSkym4epBSQWxh26kmI9sewaq6A=="], 65 72 66 73 "@atproto-labs/did-resolver": ["@atproto-labs/did-resolver@0.1.4", "", { "dependencies": { "@atproto-labs/fetch": "0.1.1", "@atproto-labs/pipe": "0.1.0", "@atproto-labs/simple-store": "0.1.1", "@atproto-labs/simple-store-memory": "0.1.1", "@atproto/did": "0.1.2", "zod": "^3.23.8" } }, "sha512-5d+LHScS2ueYsFRjMOC3c1EwM2ui1yBVbBA0yY3MH7aydbljm5D28scsOVuymIhHwPFwcGvZbMON4PVSfpBbbQ=="], 67 74 ··· 245 252 246 253 "@aws/lambda-invoke-store": ["@aws/lambda-invoke-store@0.2.4", "", {}, "sha512-iY8yvjE0y651BixKNPgmv1WrQc+GZ142sb0z4gYnChDDY2YqI4P/jsSopBWrKfAt7LOJAkOXt7rC/hms+WclQQ=="], 247 254 255 + "@badrap/valita": ["@badrap/valita@0.4.6", "", {}, "sha512-4kdqcjyxo/8RQ8ayjms47HCWZIF5981oE5nIenbfThKDxWXtEHKipAOWlflpPJzZx9y/JWYQkp18Awr7VuepFg=="], 256 + 248 257 "@biomejs/biome": ["@biomejs/biome@2.4.4", "", { "optionalDependencies": { "@biomejs/cli-darwin-arm64": "2.4.4", "@biomejs/cli-darwin-x64": "2.4.4", "@biomejs/cli-linux-arm64": "2.4.4", "@biomejs/cli-linux-arm64-musl": "2.4.4", "@biomejs/cli-linux-x64": "2.4.4", "@biomejs/cli-linux-x64-musl": "2.4.4", "@biomejs/cli-win32-arm64": "2.4.4", "@biomejs/cli-win32-x64": "2.4.4" }, "bin": { "biome": "bin/biome" } }, "sha512-tigwWS5KfJf0cABVd52NVaXyAVv4qpUXOWJ1rxFL8xF1RVoeS2q/LK+FHgYoKMclJCuRoCWAPy1IXaN9/mS61Q=="], 249 258 250 259 "@biomejs/cli-darwin-arm64": ["@biomejs/cli-darwin-arm64@2.4.4", "", { "os": "darwin", "cpu": "arm64" }, "sha512-jZ+Xc6qvD6tTH5jM6eKX44dcbyNqJHssfl2nnwT6vma6B1sj7ZLTGIk6N5QwVBs5xGN52r3trk5fgd3sQ9We9A=="], ··· 411 420 412 421 "@marijn/find-cluster-break": ["@marijn/find-cluster-break@1.0.2", "", {}, "sha512-l0h88YhZFyKdXIFNfSWpyjStDjGHwZ/U7iobcK1cQQD8sejsONdQtTVU+1wVN1PBw40PiiHB1vA5S7VTfQiP9g=="], 413 422 423 + "@mary-ext/event-iterator": ["@mary-ext/event-iterator@1.0.0", "", { "dependencies": { "yocto-queue": "^1.2.1" } }, "sha512-l6gCPsWJ8aRCe/s7/oCmero70kDHgIK5m4uJvYgwEYTqVxoBOIXbKr5tnkLqUHEg6mNduB4IWvms3h70Hp9ADQ=="], 424 + 425 + "@mary-ext/simple-event-emitter": ["@mary-ext/simple-event-emitter@1.0.1", "", {}, "sha512-9+VvZisxZ/gSg+JJH7hmXaA8Qj42Qjz3O58RSB+INYc8iLA0icATZxHB9vKbj59ojDGZjO3hCKzMXocx3L0H8w=="], 426 + 414 427 "@napi-rs/wasm-runtime": ["@napi-rs/wasm-runtime@1.1.1", "", { "dependencies": { "@emnapi/core": "^1.7.1", "@emnapi/runtime": "^1.7.1", "@tybys/wasm-util": "^0.10.1" } }, "sha512-p64ah1M1ld8xjWv3qbvFwHiFVWrq1yFvV4f7w+mzaqiR4IlSgkqhcRdHwsGgomwzBH51sRY4NEowLxnaBjcW/A=="], 415 428 416 429 "@noble/curves": ["@noble/curves@1.9.7", "", { "dependencies": { "@noble/hashes": "1.8.0" } }, "sha512-gbKGcRUYIjA3/zCCNaWDciTMFI0dCkvou3TL8Zmy5Nc7sJ47a0jtOeZoTaMxkuqRo9cRhjOdZJXegxYE5FN/xw=="], ··· 664 677 "@smithy/util-waiter": ["@smithy/util-waiter@4.2.13", "", { "dependencies": { "@smithy/abort-controller": "^4.2.12", "@smithy/types": "^4.13.1", "tslib": "^2.6.2" } }, "sha512-2zdZ9DTHngRtcYxJK1GUDxruNr53kv5W2Lupe0LMU+Imr6ohQg8M2T14MNkj1Y0wS3FFwpgpGQyvuaMF7CiTmQ=="], 665 678 666 679 "@smithy/uuid": ["@smithy/uuid@1.1.2", "", { "dependencies": { "tslib": "^2.6.2" } }, "sha512-O/IEdcCUKkubz60tFbGA7ceITTAJsty+lBjNoorP4Z6XRqaFb/OjQjZODophEcuq68nKm6/0r+6/lLQ+XVpk8g=="], 680 + 681 + "@standard-schema/spec": ["@standard-schema/spec@1.1.0", "", {}, "sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w=="], 667 682 668 683 "@tailwindcss/cli": ["@tailwindcss/cli@4.2.1", "", { "dependencies": { "@parcel/watcher": "^2.5.1", "@tailwindcss/node": "4.2.1", "@tailwindcss/oxide": "4.2.1", "enhanced-resolve": "^5.19.0", "mri": "^1.2.0", "picocolors": "^1.1.1", "tailwindcss": "4.2.1" }, "bin": { "tailwindcss": "dist/index.mjs" } }, "sha512-b7MGn51IA80oSG+7fuAgzfQ+7pZBgjzbqwmiv6NO7/+a1sev32cGqnwhscT7h0EcAvMa9r7gjRylqOH8Xhc4DA=="], 669 684 ··· 975 990 976 991 "escape-string-regexp": ["escape-string-regexp@5.0.0", "", {}, "sha512-/veY75JbMK4j1yjvuUxuVsiS/hr/4iHs9FTT6cgTexxdE0Ly/glccBAkloH/DofkjRbZU3bnoj38mOmhkZ0lHw=="], 977 992 993 + "esm-env": ["esm-env@1.2.2", "", {}, "sha512-Epxrv+Nr/CaL4ZcFGPJIYLWFom+YeV1DqMLHJoEd9SYRxNbaFruBwfEX/kkHUJf55j2+TUbmDcmuilbP1TmXHA=="], 994 + 978 995 "etag": ["etag@1.8.1", "", {}, "sha512-aIL5Fx7mawVa300al2BnEE4iNvo1qETxLrPI/o05L7z6go7fCw1J6EQmbK4FmJ2AS7kgVF/KEZWufBfdClMcPg=="], 979 996 980 997 "etcd3": ["etcd3@1.1.2", "", { "dependencies": { "@grpc/grpc-js": "^1.8.20", "@grpc/proto-loader": "^0.7.8", "bignumber.js": "^9.1.1", "cockatiel": "^3.1.1" } }, "sha512-YIampCz1/OmrVo/tR3QltAVUtYCQQOSFoqmHKKeoHbalm+WdXe3l4rhLIylklu8EzR/I3PBiOF4dC847dDskKg=="], 998 + 999 + "event-target-polyfill": ["event-target-polyfill@0.0.4", "", {}, "sha512-Gs6RLjzlLRdT8X9ZipJdIZI/Y6/HhRLyq9RdDlCsnpxr/+Nn6bU2EFGuC94GjxqhM+Nmij2Vcq98yoHrU8uNFQ=="], 981 1000 982 1001 "event-target-shim": ["event-target-shim@5.0.1", "", {}, "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ=="], 983 1002 ··· 1253 1272 1254 1273 "parseurl": ["parseurl@1.3.3", "", {}, "sha512-CiyeOxFT/JZyN5m0z9PfXw4SCBJ6Sygz1Dpl0wqjlhDEGGBP1GnsUVEL0p63hoG1fcj3fHynXi9NYO4nWOL+qQ=="], 1255 1274 1275 + "partysocket": ["partysocket@1.1.18", "", { "dependencies": { "event-target-polyfill": "^0.0.4" }, "peerDependencies": { "react": ">=17" }, "optionalPeers": ["react"] }, "sha512-SyuvH9VavWOSa14v6dYdp3yfSUDII4BQB1+TkGOFBkjfZKjnDBiba4fhdhwBlqGBkqw4ea3gTA1DYhSffX24Wg=="], 1276 + 1256 1277 "path-expression-matcher": ["path-expression-matcher@1.1.3", "", {}, "sha512-qdVgY8KXmVdJZRSS1JdEPOKPdTiEK/pi0RkcT2sw1RhXxohdujUlJFPuS1TSkevZ9vzd3ZlL7ULl1MHGTApKzQ=="], 1257 1278 1258 1279 "path-key": ["path-key@3.1.1", "", {}, "sha512-ojmeN0qd+y0jszEtoY48r0Peq5dwMEkIlCOu6Q5f41lfkswXuKtYrhgoTpLnyIcHm24Uhqx+5Tqm2InSwLhE6Q=="], ··· 1457 1478 1458 1479 "tunnel-agent": ["tunnel-agent@0.6.0", "", { "dependencies": { "safe-buffer": "^5.0.1" } }, "sha512-McnNiV1l8RYeY8tBgEpuodCC1mLUdbSN+CYBL7kJsJNInOP8UjDDEwdk6Mw60vdLLrr5NHKZhMAOSrR2NZuQ+w=="], 1459 1480 1460 - "type-fest": ["type-fest@2.19.0", "", {}, "sha512-RAH822pAdBgcNMAfWnCBU3CFZcfZ/i1eZjwFU/dsLKumyuuP3niueg2UAukXYF0E2AAoc82ZSSf9J0WQBinzHA=="], 1481 + "type-fest": ["type-fest@4.41.0", "", {}, "sha512-TeTSQ6H5YHvpqVwBRcnLDCBnDOHWYu7IvGbHT6N8AOymcr9PJGjc1GTtiWZTYg0NCgYwvnYWEkVChQAr9bjfwA=="], 1461 1482 1462 1483 "type-is": ["type-is@1.6.18", "", { "dependencies": { "media-typer": "0.3.0", "mime-types": "~2.1.24" } }, "sha512-TkRKr9sUTxEH8MdfuCSP7VizJyzRNMjj2J2do2Jr3Kym598JVdEksuzPQCnlFPW4ky9Q+iA+ma9BGm06XQBy8g=="], 1463 1484 ··· 1517 1538 1518 1539 "yargs-parser": ["yargs-parser@21.1.1", "", {}, "sha512-tVpsJW7DdjecAiFpbIB1e3qxIQsE6NoPc5/eTdrbbIC4h0LVsWhnoa3g+m2HclBIujHzsxZ4VJVA+GUuc2/LBw=="], 1519 1540 1541 + "yocto-queue": ["yocto-queue@1.2.2", "", {}, "sha512-4LCcse/U2MHZ63HAJVE+v71o7yOdIe4cZ70Wpf8D/IyjDKYQLV5GD46B+hSTjJsvV5PztjvHoU580EftxjDZFQ=="], 1542 + 1520 1543 "zod": ["zod@4.3.6", "", {}, "sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg=="], 1521 1544 1522 1545 "@atproto-labs/did-resolver/@atproto-labs/simple-store-memory": ["@atproto-labs/simple-store-memory@0.1.1", "", { "dependencies": { "@atproto-labs/simple-store": "0.1.1", "lru-cache": "^10.2.0" } }, "sha512-PCRqhnZ8NBNBvLku53O56T0lsVOtclfIrQU/rwLCc4+p45/SBPrRYNBi6YFq5rxZbK6Njos9MCmILV/KLQxrWA=="], ··· 1698 1721 "express/cookie": ["cookie@0.7.2", "", {}, "sha512-yki5XnKuf750l50uGTllt6kKILY4nQ1eNIQatoXEByZ5dWgnKqbnqmTrBE5B4N7lrMJKQ2ytWMiTO2o0v6Ew/w=="], 1699 1722 1700 1723 "htmlparser2/entities": ["entities@2.2.0", "", {}, "sha512-p92if5Nz619I0w+akJrLZH0MX0Pb5DX39XOwQTtXSdQQOaYH03S1uIQp4mhOZtAXrxq4ViO67YTiLBo2638o9A=="], 1724 + 1725 + "http-terminator/type-fest": ["type-fest@2.19.0", "", {}, "sha512-RAH822pAdBgcNMAfWnCBU3CFZcfZ/i1eZjwFU/dsLKumyuuP3niueg2UAukXYF0E2AAoc82ZSSf9J0WQBinzHA=="], 1701 1726 1702 1727 "ioredis/debug": ["debug@4.4.3", "", { "dependencies": { "ms": "^2.1.3" } }, "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA=="], 1703 1728
+5 -4
package.json
··· 25 25 }, 26 26 "devDependencies": { 27 27 "@atproto/dev-env": "0.3.213", 28 + "@atproto/sync": "^0.1.40", 28 29 "@biomejs/biome": "^2.4.4", 29 30 "@tailwindcss/cli": "^4.2.1", 30 31 "@tailwindcss/typography": "^0.5.16", ··· 35 36 "@typescript/native-preview": "^7.0.0-dev.20260322.1", 36 37 "better-sqlite3": "12.8.0", 37 38 "knip": "^6.0.2", 38 - "tailwindcss": "^4.2.1" 39 + "tailwindcss": "^4.2.1", 40 + "ws": "^8.0.0" 39 41 }, 40 42 "peerDependencies": { 41 43 "typescript": "^5" ··· 50 52 }, 51 53 "dependencies": { 52 54 "@atcute/cid": "^2.4.1", 55 + "@atcute/jetstream": "^1.1.2", 53 56 "@atcute/tid": "^1.1.2", 54 57 "@atproto/api": "^0.13.0", 55 58 "@atproto/identity": "^0.4.12", 56 59 "@atproto/jwk-jose": "^0.1.0", 57 60 "@atproto/oauth-client-node": "^0.1.0", 58 - "@atproto/sync": "^0.1.40", 59 61 "@codemirror/lang-markdown": "^6.5.0", 60 62 "@codemirror/state": "^6.0.0", 61 63 "@elysiajs/static": "^1.4.7", ··· 66 68 "fflate": "^0.8.2", 67 69 "katex": "^0.16.45", 68 70 "markdown-it": "^14.1.1", 69 - "sharp": "^0.34.5", 70 - "ws": "^8.0.0" 71 + "sharp": "^0.34.5" 71 72 } 72 73 }
+4 -2
src/atproto/env.ts
··· 18 18 return getAtprotoEnv() !== null; 19 19 } 20 20 21 - export function getRelayUrl(): string { 22 - return process.env["RELAY_URL"] ?? "wss://bsky.network"; 21 + export function getJetstreamUrl(): string { 22 + return ( 23 + process.env["JETSTREAM_URL"] ?? "wss://jetstream2.us-east.bsky.network" 24 + ); 23 25 } 24 26 25 27 export function getHandleResolverUrl(): string {
+18 -7
src/firehose/handlers.ts
··· 1 - import type { CommitEvt } from "@atproto/sync"; 2 - import { didOwnsUri } from "../lib/at-uri.ts"; 3 1 import { COLLECTIONS, normalizeRole } from "../lib/constants.ts"; 4 2 import { LIMITS } from "../lib/limits.ts"; 5 3 import { ··· 197 195 198 196 // --- Event dispatch --- 199 197 200 - export function handleCommitEvent(evt: CommitEvt): void { 201 - const atUri = evt.uri.toString(); 202 - if (!didOwnsUri(evt.did, atUri)) return; 198 + /** 199 + * Normalized commit event consumed by the handler. Shaped to be easy to build 200 + * from either a jetstream message (production) or an `@atproto/sync` commit 201 + * (integration tests against a local PDS). 202 + */ 203 + export interface FirehoseCommit { 204 + did: string; 205 + collection: string; 206 + rkey: string; 207 + operation: "create" | "update" | "delete"; 208 + /** The record body. Required for create/update; ignored for delete. */ 209 + record?: unknown; 210 + } 203 211 204 - if (evt.event === "delete") { 212 + export function handleCommitEvent(evt: FirehoseCommit): void { 213 + const atUri = `at://${evt.did}/${evt.collection}/${evt.rkey}`; 214 + 215 + if (evt.operation === "delete") { 205 216 handleDelete(atUri, evt.collection); 206 217 return; 207 218 } ··· 236 247 } 237 248 } 238 249 239 - // --- Handlers (validation already done by type guards + didOwnsUri) --- 250 + // --- Handlers (validation already done by type guards) --- 240 251 241 252 function handleWiki( 242 253 did: string,
+59 -48
src/firehose/index.ts
··· 1 - import "../lib/ws-polyfill.ts"; 2 - 3 - import { type Event, Firehose, MemoryRunner } from "@atproto/sync"; 4 - import { getDevPdsUrl, getRelayUrl } from "../atproto/env.ts"; 1 + import { JetstreamSubscription } from "@atcute/jetstream"; 2 + import { getDevPdsUrl, getJetstreamUrl } from "../atproto/env.ts"; 5 3 import { COLLECTIONS } from "../lib/constants.ts"; 6 - import { getIdResolver } from "../lib/identity.ts"; 7 4 import { getCursor, setCursor } from "../server/db/queries/index.ts"; 8 - import { handleCommitEvent } from "./handlers.ts"; 5 + import { type FirehoseCommit, handleCommitEvent } from "./handlers.ts"; 9 6 10 - const relayUrl = getRelayUrl(); 11 - // In dev mode the PDS is ephemeral -- each run starts fresh from seq 0. 12 - // Persisting the cursor across sessions causes "FutureCursor" errors. 7 + const jetstreamUrl = getJetstreamUrl(); 8 + // In dev mode the PDS is ephemeral -- each run starts fresh. 9 + // Persisting the cursor across sessions causes the subscriber to ask jetstream 10 + // for events older than its retention window. 13 11 const isDevMode = !!getDevPdsUrl(); 14 12 const savedCursor = isDevMode ? null : getCursor(); 15 13 16 - console.log(`Firehose connecting to ${relayUrl}`); 14 + console.log(`Jetstream connecting to ${jetstreamUrl}`); 17 15 if (savedCursor) { 18 16 console.log(`Resuming from cursor ${savedCursor}`); 19 17 } 20 18 21 - const idResolver = getIdResolver(); 22 - 23 - const runner = new MemoryRunner({ 24 - ...(savedCursor != null && { startCursor: savedCursor }), 25 - setCursor: async (cursor: number) => { 26 - if (!isDevMode) setCursor(cursor); 27 - }, 19 + const subscription = new JetstreamSubscription({ 20 + url: jetstreamUrl, 21 + wantedCollections: Object.values(COLLECTIONS), 22 + ...(savedCursor != null && { cursor: savedCursor }), 28 23 }); 29 24 30 - const firehose = new Firehose({ 31 - idResolver, 32 - runner, 33 - service: relayUrl, 34 - filterCollections: Object.values(COLLECTIONS), 35 - excludeIdentity: true, 36 - excludeAccount: true, 37 - handleEvent: (evt: Event) => { 38 - if ( 39 - evt.event === "create" || 40 - evt.event === "update" || 41 - evt.event === "delete" 42 - ) { 43 - try { 44 - handleCommitEvent(evt); 45 - } catch (err) { 46 - console.error(`Error handling ${evt.collection} ${evt.event}:`, err); 47 - } 25 + // Persist the cursor every 5s so a restart resumes near where we left off. 26 + // Jetstream cursors are microsecond timestamps. 27 + const cursorInterval = setInterval(() => { 28 + if (!isDevMode && subscription.cursor != null) { 29 + setCursor(subscription.cursor); 30 + } 31 + }, 5_000); 32 + 33 + let shuttingDown = false; 34 + 35 + async function run(): Promise<void> { 36 + for await (const event of subscription) { 37 + if (event.kind !== "commit") continue; 38 + 39 + const commit: FirehoseCommit = { 40 + did: event.did, 41 + collection: event.commit.collection, 42 + rkey: event.commit.rkey, 43 + operation: event.commit.operation, 44 + record: 45 + event.commit.operation !== "delete" ? event.commit.record : undefined, 46 + }; 47 + 48 + try { 49 + handleCommitEvent(commit); 50 + } catch (err) { 51 + console.error( 52 + `Error handling ${commit.collection} ${commit.operation}:`, 53 + err, 54 + ); 48 55 } 49 - }, 50 - onError: (err: Error) => { 51 - console.error("Firehose error:", err); 52 - }, 56 + } 57 + } 58 + 59 + run().catch((err) => { 60 + if (!shuttingDown) console.error("Jetstream error:", err); 53 61 }); 62 + console.log("Jetstream subscriber running"); 54 63 55 - firehose.start(); 56 - console.log("Firehose subscriber running"); 57 - 58 - function shutdown() { 59 - console.log("Shutting down firehose..."); 60 - firehose.destroy().then(() => { 61 - console.log("Firehose stopped"); 62 - process.exit(0); 63 - }); 64 + function shutdown(): void { 65 + if (shuttingDown) return; 66 + shuttingDown = true; 67 + console.log("Shutting down jetstream..."); 68 + clearInterval(cursorInterval); 69 + if (!isDevMode && subscription.cursor != null) { 70 + setCursor(subscription.cursor); 71 + } 72 + // JetstreamSubscription closes its WebSocket when the async iterator exits; 73 + // process.exit triggers that via teardown. 74 + process.exit(0); 64 75 } 65 76 66 77 process.on("SIGINT", shutdown);
+8 -8
tests/atproto/env.test.ts
··· 5 5 getDevPdsUrl, 6 6 getDevPlcUrl, 7 7 getHandleResolverUrl, 8 - getRelayUrl, 8 + getJetstreamUrl, 9 9 isAuthEnabled, 10 10 } from "../../src/atproto/env.ts"; 11 11 ··· 14 14 const ENV_KEYS = [ 15 15 "PUBLIC_URL", 16 16 "OAUTH_PRIVATE_KEY_PATH", 17 - "RELAY_URL", 17 + "JETSTREAM_URL", 18 18 "HANDLE_RESOLVER_URL", 19 19 "DEV_PDS_URL", 20 20 "DEV_PLC_URL", ··· 75 75 }); 76 76 }); 77 77 78 - describe("getRelayUrl", () => { 79 - test("defaults to bsky.network", () => { 80 - delete process.env["RELAY_URL"]; 81 - expect(getRelayUrl()).toBe("wss://bsky.network"); 78 + describe("getJetstreamUrl", () => { 79 + test("defaults to a Bluesky jetstream instance", () => { 80 + delete process.env["JETSTREAM_URL"]; 81 + expect(getJetstreamUrl()).toBe("wss://jetstream2.us-east.bsky.network"); 82 82 }); 83 83 84 84 test("reads from env", () => { 85 - process.env["RELAY_URL"] = "wss://custom.relay"; 86 - expect(getRelayUrl()).toBe("wss://custom.relay"); 85 + process.env["JETSTREAM_URL"] = "wss://custom.jetstream"; 86 + expect(getJetstreamUrl()).toBe("wss://custom.jetstream"); 87 87 }); 88 88 }); 89 89
+9 -43
tests/firehose/handlers.test.ts
··· 1 1 import { afterAll, beforeAll, describe, expect, test } from "bun:test"; 2 - import type { CommitEvt } from "@atproto/sync"; 3 - import { handleCommitEvent } from "../../src/firehose/handlers.ts"; 2 + import { 3 + type FirehoseCommit, 4 + handleCommitEvent, 5 + } from "../../src/firehose/handlers.ts"; 4 6 import { getDb } from "../../src/server/db/index.ts"; 5 7 import { 6 8 getCurrentNote, ··· 16 18 const WIKI_AT_URI = `at://${ALICE_DID}/wiki.lichen.wiki/test-wiki`; 17 19 18 20 interface CommitEvtInput { 19 - event: string; 21 + event: "create" | "update" | "delete"; 20 22 collection: string; 21 23 rkey: string; 22 24 did?: string; 23 - uri?: { toString(): string }; 24 25 record?: Record<string, unknown>; 25 26 } 26 27 27 - function makeCommitEvt(input: CommitEvtInput): CommitEvt { 28 - const did = input.did ?? ALICE_DID; 29 - const atUri = 30 - input.uri?.toString() ?? `at://${did}/${input.collection}/${input.rkey}`; 31 - 28 + function makeCommitEvt(input: CommitEvtInput): FirehoseCommit { 32 29 return { 33 - seq: 1, 34 - time: new Date().toISOString(), 35 - commit: {} as never, 36 - blocks: {} as never, 37 - rev: "rev1", 38 - did, 30 + did: input.did ?? ALICE_DID, 39 31 collection: input.collection, 40 32 rkey: input.rkey, 41 - uri: { toString: () => atUri } as never, 42 - event: input.event, 33 + operation: input.event, 43 34 record: input.record, 44 - cid: {} as never, 45 - } as CommitEvt; 35 + }; 46 36 } 47 37 48 38 const HANDLER_TEST_WIKIS = [ ··· 125 115 const wiki = getWiki("test-wiki"); 126 116 expect(wiki?.name).toBe("Updated Wiki"); 127 117 expect(wiki?.visibility).toBe("private"); 128 - }); 129 - 130 - test("rejects event when DID does not match AT-URI", () => { 131 - handleCommitEvent( 132 - makeCommitEvt({ 133 - event: "create", 134 - collection: "wiki.lichen.wiki", 135 - rkey: "evil-wiki", 136 - did: BOB_DID, 137 - uri: { 138 - toString: () => `at://${ALICE_DID}/wiki.lichen.wiki/evil-wiki`, 139 - }, 140 - record: { 141 - name: "Evil Wiki", 142 - visibility: "public", 143 - createdAt: "2026-01-01T00:00:00.000Z", 144 - }, 145 - }), 146 - ); 147 - 148 - const wiki = getWiki("evil-wiki"); 149 - expect(wiki).toBeNull(); 150 118 }); 151 119 152 120 test("parses language field from wiki record", () => { ··· 605 573 }); 606 574 607 575 test("deletes bookmark on delete event", () => { 608 - const bookmarkUri = `at://${BOB_DID}/wiki.lichen.bookmark/bk1`; 609 576 expect(isBookmarked(BOB_DID, WIKI_AT_URI)).toBe(true); 610 577 611 578 handleCommitEvent( ··· 614 581 collection: "wiki.lichen.bookmark", 615 582 rkey: "bk1", 616 583 did: BOB_DID, 617 - uri: { toString: () => bookmarkUri }, 618 584 }), 619 585 ); 620 586
+7 -1
tests/integration/helpers.ts
··· 105 105 evt.event === "delete" 106 106 ) { 107 107 try { 108 - handleCommitEvent(evt); 108 + handleCommitEvent({ 109 + did: evt.did, 110 + collection: evt.collection, 111 + rkey: evt.rkey, 112 + operation: evt.event, 113 + record: evt.event !== "delete" ? evt.record : undefined, 114 + }); 109 115 } catch (err) { 110 116 console.error( 111 117 `[test firehose] Error handling ${evt.collection} ${evt.event}:`,