atmosphere explorer
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

switch to @atcute/firehose and handle abnormal firehose disconnect

+120 -39
+1
package.json
··· 31 31 "@atcute/client": "^4.2.1", 32 32 "@atcute/crypto": "^2.3.0", 33 33 "@atcute/did-plc": "^0.3.2", 34 + "@atcute/firehose": "^0.1.0", 34 35 "@atcute/identity": "^1.1.3", 35 36 "@atcute/identity-resolver": "^1.2.2", 36 37 "@atcute/lexicon-doc": "^2.1.1",
+53
pnpm-lock.yaml
··· 32 32 '@atcute/did-plc': 33 33 specifier: ^0.3.2 34 34 version: 0.3.2 35 + '@atcute/firehose': 36 + specifier: ^0.1.0 37 + version: 0.1.0 35 38 '@atcute/identity': 36 39 specifier: ^1.1.3 37 40 version: 1.1.3 ··· 164 167 165 168 '@atcute/did-plc@0.3.2': 166 169 resolution: {integrity: sha512-zOqk5mcZJa+xnfpYFN8aIgRA5uQdTDeiDvQeWWIZKOslFBJTtWWL912FLI8r5JWzIc7SgYEp+SbpXmAB6t26EA==} 170 + 171 + '@atcute/firehose@0.1.0': 172 + resolution: {integrity: sha512-xBEKdi6rkODpCIIRpXtXhhcuQ1vTbufDykAM2kA6bmWEpuwI4acoUCg9zbiUWqd21SMMOAthu9Eh72i2VYrD7A==} 167 173 168 174 '@atcute/identity-resolver@1.2.2': 169 175 resolution: {integrity: sha512-eUh/UH4bFvuXS0X7epYCeJC/kj4rbBXfSRumLEH4smMVwNOgTo7cL/0Srty+P/qVPoZEyXdfEbS0PHJyzoXmHw==} ··· 721 727 '@marijn/find-cluster-break@1.0.2': 722 728 resolution: {integrity: sha512-l0h88YhZFyKdXIFNfSWpyjStDjGHwZ/U7iobcK1cQQD8sejsONdQtTVU+1wVN1PBw40PiiHB1vA5S7VTfQiP9g==} 723 729 730 + '@mary-ext/event-iterator@1.0.0': 731 + resolution: {integrity: sha512-l6gCPsWJ8aRCe/s7/oCmero70kDHgIK5m4uJvYgwEYTqVxoBOIXbKr5tnkLqUHEg6mNduB4IWvms3h70Hp9ADQ==} 732 + 733 + '@mary-ext/simple-event-emitter@1.0.1': 734 + resolution: {integrity: sha512-9+VvZisxZ/gSg+JJH7hmXaA8Qj42Qjz3O58RSB+INYc8iLA0icATZxHB9vKbj59ojDGZjO3hCKzMXocx3L0H8w==} 735 + 724 736 '@noble/secp256k1@3.0.0': 725 737 resolution: {integrity: sha512-NJBaR352KyIvj3t6sgT/+7xrNyF9Xk9QlLSIqUGVUYlsnDTAUqY8LOmwpcgEx4AMJXRITQ5XEVHD+mMaPfr3mg==} 726 738 ··· 1110 1122 esm-env@1.2.2: 1111 1123 resolution: {integrity: sha512-Epxrv+Nr/CaL4ZcFGPJIYLWFom+YeV1DqMLHJoEd9SYRxNbaFruBwfEX/kkHUJf55j2+TUbmDcmuilbP1TmXHA==} 1112 1124 1125 + event-target-polyfill@0.0.4: 1126 + resolution: {integrity: sha512-Gs6RLjzlLRdT8X9ZipJdIZI/Y6/HhRLyq9RdDlCsnpxr/+Nn6bU2EFGuC94GjxqhM+Nmij2Vcq98yoHrU8uNFQ==} 1127 + 1113 1128 fdir@6.5.0: 1114 1129 resolution: {integrity: sha512-tIbYtZbucOs0BRGqPJkshJUYdL+SDH7dVM8gjy+ERp3WAUjLEFJE+02kanyHtwjWOnwrKYBiwAmM0p4kLJAnXg==} 1115 1130 engines: {node: '>=12.0.0'} ··· 1287 1302 parse5@7.3.0: 1288 1303 resolution: {integrity: sha512-IInvU7fabl34qmi9gY8XOVxhYyMyuH2xUNpb2q8/Y+7552KlejkRvqvD19nMoUW/uQGGbqNpA6Tufu5FL5BZgw==} 1289 1304 1305 + partysocket@1.1.16: 1306 + resolution: {integrity: sha512-d7xFv+ZC7x0p/DAHWJ5FhxQhimIx+ucyZY+kxL0cKddLBmK9c4p2tEA/L+dOOrWm6EYrRwrBjKQV0uSzOY9x1w==} 1307 + peerDependencies: 1308 + react: '>=17' 1309 + peerDependenciesMeta: 1310 + react: 1311 + optional: true 1312 + 1290 1313 pathe@2.0.3: 1291 1314 resolution: {integrity: sha512-WUjGcAqP1gQacoQe+OBJsFA7Ld4DyXuUIjZ5cc75cLHvJ7dtNsTugphxIADwspS+AraAUePCKrSVtPLFj/F88w==} 1292 1315 ··· 1439 1462 resolution: {integrity: sha512-pOUl6Vo2LUq/bSa8S5q7b91cgNSjctn9ugq/+Mvow99qW6x/UZYwzxy/3NmqoT66eHYfCVvFvACC58UBPFf28g==} 1440 1463 engines: {node: '>=18.0.0'} 1441 1464 hasBin: true 1465 + 1466 + type-fest@4.41.0: 1467 + resolution: {integrity: sha512-TeTSQ6H5YHvpqVwBRcnLDCBnDOHWYu7IvGbHT6N8AOymcr9PJGjc1GTtiWZTYg0NCgYwvnYWEkVChQAr9bjfwA==} 1468 + engines: {node: '>=16'} 1442 1469 1443 1470 typescript@5.9.3: 1444 1471 resolution: {integrity: sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==} ··· 1593 1620 '@atcute/util-fetch': 1.0.5 1594 1621 '@badrap/valita': 0.4.6 1595 1622 1623 + '@atcute/firehose@0.1.0': 1624 + dependencies: 1625 + '@atcute/cbor': 2.3.2 1626 + '@atcute/lexicons': 1.2.9 1627 + '@atcute/uint8array': 1.1.1 1628 + '@mary-ext/event-iterator': 1.0.0 1629 + '@mary-ext/simple-event-emitter': 1.0.1 1630 + partysocket: 1.1.16 1631 + type-fest: 4.41.0 1632 + transitivePeerDependencies: 1633 + - react 1634 + 1596 1635 '@atcute/identity-resolver@1.2.2(@atcute/identity@1.1.3)': 1597 1636 dependencies: 1598 1637 '@atcute/identity': 1.1.3 ··· 2119 2158 2120 2159 '@marijn/find-cluster-break@1.0.2': {} 2121 2160 2161 + '@mary-ext/event-iterator@1.0.0': 2162 + dependencies: 2163 + yocto-queue: 1.2.2 2164 + 2165 + '@mary-ext/simple-event-emitter@1.0.1': {} 2166 + 2122 2167 '@noble/secp256k1@3.0.0': {} 2123 2168 2124 2169 '@rollup/rollup-android-arm-eabi@4.59.0': ··· 2487 2532 2488 2533 esm-env@1.2.2: {} 2489 2534 2535 + event-target-polyfill@0.0.4: {} 2536 + 2490 2537 fdir@6.5.0(picomatch@4.0.3): 2491 2538 optionalDependencies: 2492 2539 picomatch: 4.0.3 ··· 2613 2660 dependencies: 2614 2661 entities: 6.0.1 2615 2662 2663 + partysocket@1.1.16: 2664 + dependencies: 2665 + event-target-polyfill: 0.0.4 2666 + 2616 2667 pathe@2.0.3: {} 2617 2668 2618 2669 picocolors@1.1.1: {} ··· 2735 2786 optionalDependencies: 2736 2787 fsevents: 2.3.3 2737 2788 optional: true 2789 + 2790 + type-fest@4.41.0: {} 2738 2791 2739 2792 typescript@5.9.3: {} 2740 2793
+66 -39
src/views/stream/index.tsx
··· 1 - import { Firehose } from "@skyware/firehose"; 1 + import { ComAtprotoSyncSubscribeRepos } from "@atcute/atproto"; 2 + import { FirehoseSubscription } from "@atcute/firehose"; 2 3 import { Title } from "@solidjs/meta"; 3 4 import { A, useLocation, useSearchParams } from "@solidjs/router"; 4 5 import { createSignal, For, onCleanup, onMount, Show } from "solid-js"; ··· 117 118 const [currentTime, setCurrentTime] = createSignal(Date.now()); 118 119 119 120 let socket: WebSocket; 120 - let firehose: Firehose; 121 + let firehoseIterator: AsyncIterator<ComAtprotoSyncSubscribeRepos.$message>; 121 122 let formRef!: HTMLFormElement; 122 123 let pendingRecords: any[] = []; 123 124 let rafId: number | null = null; ··· 159 160 160 161 const disconnect = () => { 161 162 if (!config().useFirehoseLib) socket?.close(); 162 - else firehose?.close(); 163 + else firehoseIterator?.return?.(); 163 164 164 165 if (rafId !== null) { 165 166 cancelAnimationFrame(rafId); ··· 256 257 disconnect(); 257 258 }); 258 259 } else { 259 - const cursor = formData.get("cursor")?.toString(); 260 - firehose = new Firehose({ 261 - relay: url, 262 - cursor: cursor, 263 - autoReconnect: false, 260 + const cursorParam = formData.get("cursor")?.toString(); 261 + const cursor = cursorParam ? parseInt(cursorParam, 10) : undefined; 262 + let reconnectCursor: number; 263 + const firehose = new FirehoseSubscription({ 264 + service: url, 265 + nsid: ComAtprotoSyncSubscribeRepos.mainSchema, 266 + params: () => ({ cursor: reconnectCursor ?? cursor }), 267 + onConnectionOpen() { 268 + setNotice(""); 269 + setConnected(true); 270 + }, 271 + onConnectionClose(ev) { 272 + reconnectCursor = records().at(-1)?.seq; 273 + console.log(ev); 274 + if (!ev.wasClean) { 275 + setNotice("Reconnecting..."); 276 + } 277 + }, 278 + onConnectionError(err) { 279 + console.error(err); 280 + setNotice(`Connection error: ${err.message}`); 281 + disconnect(); 282 + }, 264 283 }); 265 - firehose.on("error", (err) => { 266 - console.error(err); 267 - const message = err instanceof Error ? err.message : "Unknown error"; 268 - setNotice(`Connection error: ${message}`); 269 - disconnect(); 270 - }); 271 - firehose.on("commit", (commit) => { 272 - for (const op of commit.ops) { 273 - addRecord({ 274 - $type: commit.$type, 275 - repo: commit.repo, 276 - seq: commit.seq, 277 - time: commit.time, 278 - rev: commit.rev, 279 - since: commit.since, 280 - op: op, 281 - }); 284 + 285 + firehoseIterator = firehose[Symbol.asyncIterator](); 286 + 287 + while (true) { 288 + const { value: message, done } = await firehoseIterator.next(); 289 + if (done) break; 290 + 291 + switch (message.$type) { 292 + case "com.atproto.sync.subscribeRepos#commit": { 293 + for (const op of message.ops) { 294 + addRecord({ 295 + $type: message.$type, 296 + repo: message.repo, 297 + seq: message.seq, 298 + time: message.time, 299 + rev: message.rev, 300 + since: message.since, 301 + op: op, 302 + }); 303 + } 304 + break; 305 + } 306 + case "com.atproto.sync.subscribeRepos#identity": 307 + case "com.atproto.sync.subscribeRepos#account": { 308 + addRecord(message); 309 + break; 310 + } 311 + case "com.atproto.sync.subscribeRepos#sync": { 312 + addRecord({ 313 + $type: message.$type, 314 + did: message.did, 315 + rev: message.rev, 316 + seq: message.seq, 317 + time: message.time, 318 + }); 319 + break; 320 + } 282 321 } 283 - }); 284 - firehose.on("identity", (identity) => addRecord(identity)); 285 - firehose.on("account", (account) => addRecord(account)); 286 - firehose.on("sync", (sync) => { 287 - addRecord({ 288 - $type: sync.$type, 289 - did: sync.did, 290 - rev: sync.rev, 291 - seq: sync.seq, 292 - time: sync.time, 293 - }); 294 - }); 295 - firehose.start(); 322 + } 296 323 } 297 324 }; 298 325 ··· 310 337 311 338 onCleanup(() => { 312 339 socket?.close(); 313 - firehose?.close(); 340 + firehoseIterator?.return?.(); 314 341 if (rafId !== null) cancelAnimationFrame(rafId); 315 342 if (statsIntervalId !== null) clearInterval(statsIntervalId); 316 343 if (statsUpdateIntervalId !== null) clearInterval(statsUpdateIntervalId);