Ionosphere.tv
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: backfill comments from known authors on startup

On startup, fetches tv.ionosphere.comment records directly from the
PDS of any author who already has comments in the DB. Catches
anything the Jetstream missed during downtime.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+79 -1
+79 -1
apps/ionosphere-appview/src/public-jetstream.ts
··· 1 1 import type Database from "better-sqlite3"; 2 2 import { JetstreamClient } from "./jetstream.js"; 3 - import { processEvent } from "./indexer.js"; 3 + import { processEvent, type JetstreamEvent } from "./indexer.js"; 4 4 5 5 const PUBLIC_JETSTREAM_URL = process.env.PUBLIC_JETSTREAM_URL ?? "wss://jetstream1.us-east.bsky.network"; 6 6 ··· 51 51 onError: (err) => console.error("Public Jetstream error:", err), 52 52 }); 53 53 54 + // Backfill comments from known authors on startup 55 + backfillComments(db).catch((err) => 56 + console.error("[Public Jetstream] Backfill error:", err) 57 + ); 58 + 54 59 return client; 55 60 } 61 + 62 + /** 63 + * Backfill comments from DIDs that already have comments in the DB. 64 + * Fetches their tv.ionosphere.comment records directly from their PDS 65 + * to catch anything the Jetstream missed. 66 + */ 67 + async function backfillComments(db: Database.Database): Promise<void> { 68 + const authors = db 69 + .prepare("SELECT DISTINCT author_did FROM comments") 70 + .all() as { author_did: string }[]; 71 + 72 + if (authors.length === 0) return; 73 + 74 + for (const { author_did } of authors) { 75 + try { 76 + // Resolve DID to PDS endpoint 77 + const didDoc = await fetch( 78 + `https://plc.directory/${author_did}` 79 + ).then((r) => r.json()); 80 + 81 + const pdsEndpoint = didDoc?.service?.find( 82 + (s: any) => s.type === "AtprotoPersonalDataServer" 83 + )?.serviceEndpoint; 84 + 85 + if (!pdsEndpoint) continue; 86 + 87 + // Fetch all comments from this author's PDS 88 + let cursor: string | undefined; 89 + let total = 0; 90 + do { 91 + const params = new URLSearchParams({ 92 + repo: author_did, 93 + collection: "tv.ionosphere.comment", 94 + limit: "100", 95 + }); 96 + if (cursor) params.set("cursor", cursor); 97 + 98 + const res = await fetch( 99 + `${pdsEndpoint}/xrpc/com.atproto.repo.listRecords?${params}` 100 + ); 101 + if (!res.ok) break; 102 + const data = await res.json(); 103 + 104 + for (const record of data.records || []) { 105 + const rkey = record.uri.split("/").pop()!; 106 + const event: JetstreamEvent = { 107 + did: author_did, 108 + kind: "commit", 109 + commit: { 110 + operation: "create", 111 + collection: "tv.ionosphere.comment", 112 + rkey, 113 + record: record.value, 114 + cid: record.cid || "", 115 + rev: "", 116 + }, 117 + time_us: Date.now() * 1000, 118 + }; 119 + try { 120 + processEvent(db, event); 121 + total++; 122 + } catch {} 123 + } 124 + 125 + cursor = data.cursor; 126 + } while (cursor); 127 + 128 + if (total > 0) { 129 + console.log(`[Public Jetstream] Backfilled ${total} comments from ${author_did.slice(0, 24)}...`); 130 + } 131 + } catch {} 132 + } 133 + }