···1616import { emit, timer } from './logger.ts'
1717import type { BackfillConfig } from './config.ts'

/**
 * Options passed to {@link runBackfill}.
 *
 * All four fields are required.
 */
interface BackfillOpts {
  /** Base URL of the relay or PDS to enumerate repos from (e.g. `wss://bsky.network`). */
  pdsUrl: string
  /** PLC directory URL used to resolve `did:plc` identifiers (e.g. `https://plc.directory`). */
  plcUrl: string
  /** AT Protocol collection NSIDs to index (e.g. `app.bsky.feed.post`). */
  collections: Set<string>
  /** Backfill behavior settings from `config.yaml`. */
  config: BackfillConfig
}
25302626-// --- DID Resolution ---

/** Result of resolving a DID, as returned by `resolvePds` and stored in its cache. */
interface PdsResolution {
  /** The PDS service endpoint URL from the DID document. */
  pds: string
  /** The user's handle extracted from `alsoKnownAs`, or `null` if not present. */
  handle: string | null
}

/** In-memory cache of DID → PDS resolution results to avoid redundant lookups. */
const pdsCache = new Map<string, PdsResolution>()

/**
 * PLC directory URL used when resolving DIDs.
 *
 * Declared without an initializer; `runBackfill` assigns it from `opts.plcUrl`
 * before doing any other work.
 */
let plcUrl: string

/**
 * Resolves a DID to its PDS endpoint and handle by fetching the DID document.
 *
 * Supports both `did:web` (fetches `/.well-known/did.json`) and `did:plc`
 * (fetches from the PLC directory). Results are cached for the lifetime of the process.
 *
 * @example
 * ```ts
 * const { pds, handle } = await resolvePds('did:plc:abc123')
 * // pds = "https://puffball.us-east.host.bsky.network"
 * // handle = "alice.bsky.social"
 * ```
 */
3656async function resolvePds(did: string): Promise<PdsResolution> {
3757 const cached = pdsCache.get(did)
3858 if (cached) return cached
···6181 return result
6282}
63836464-// --- Repo Enumeration ---

/**
 * Paginates through all active repos on a relay/PDS using `com.atproto.sync.listRepos`.
 * Yields `{ did, rev }` for each active repo. Skips deactivated repos.
 */
6689async function* listRepos(pdsUrl: string): AsyncGenerator<{ did: string; rev: string }> {
6790 let cursor: string | undefined
6891 while (true) {
···79102 }
80103}

/**
 * Paginates through repos that contain records in a specific collection using
 * `com.atproto.sync.listReposByCollection`. More efficient than {@link listRepos}
 * when only a few collections are needed, since the relay can filter server-side.
 *
 * Not all relays support this endpoint — callers should fall back to {@link listRepos}.
 */
82112async function* listReposByCollection(
83113 pdsUrl: string,
84114 collection: string,
···98128 }
99129}
100130101101-// --- Single Repo Backfill ---

/**
 * Downloads and indexes a single user's repo via `com.atproto.sync.getRepo`.
 *
 * The full flow:
 * 1. Resolve the DID to find the user's PDS endpoint
 * 2. Fetch the repo as a CAR file from the PDS
 * 3. Parse the CAR, decode the commit, and walk the MST (Merkle Search Tree)
 * 4. Delete any existing records for this DID (so deletions are reflected)
 * 5. Bulk-insert all records matching the target collections
 *
 * On failure, applies exponential backoff retry logic. HTTP 4xx errors are
 * treated as permanent failures (repo doesn't exist or is deactivated) and
 * are not retried.
 *
 * @param did - The DID of the repo to backfill (e.g. `did:plc:abc123`)
 * @param collections - Collection NSIDs to index; records in other collections are skipped
 * @param fetchTimeout - Maximum seconds to wait for the CAR download before aborting
 * @returns The number of records successfully indexed
 *
 * @example
 * ```ts
 * const count = await backfillRepo('did:plc:abc123', new Set(['app.bsky.feed.post']), 30)
 * console.log(`Indexed ${count} records`)
 * ```
 */
103157export async function backfillRepo(did: string, collections: Set<string>, fetchTimeout: number): Promise<number> {
104158 const elapsed = timer()
105159 let count = 0
···215269 }
216270}
217271218218-// --- Worker Pool ---

/**
 * Processes items concurrently with a fixed number of workers.
 * Workers pull from a shared index so the pool stays saturated even when
 * individual items complete at different speeds. Errors from `fn` are
 * swallowed (they're expected to be captured via structured logging).
 *
 * @param items - The work items to process
 * @param parallelism - Maximum number of concurrent workers
 * @param fn - Async function to run for each item
 */
220283async function runWorkerPool<T>(items: T[], parallelism: number, fn: (item: T) => Promise<void>): Promise<void> {
221284 let index = 0
222285···235298 await Promise.all(workers)
236299}
237300238238-// --- Main Backfill Entry Point ---

/**
 * Orchestrates a full backfill run: enumerate repos, filter to pending, download, and index.
 *
 * Operates in one of three modes based on config:
 * - **Pinned repos** — backfill only the DIDs listed in `config.repos`
 * - **Full network** — enumerate every active repo on the relay via `listRepos`
 * - **Collection signal** (default) — use `listReposByCollection` to discover repos that
 *   contain records in the configured signal collections, falling back to `listRepos`
 *   if the relay doesn't support collection-scoped enumeration
 *
 * After the initial pass, failed repos are retried with exponential backoff
 * (up to `config.maxRetries` attempts). The run emits structured log events for
 * monitoring via the `backfill.run` and `backfill.retry_round` event types.
 *
 * @example
 * ```ts
 * await runBackfill({
 *   pdsUrl: 'wss://bsky.network',
 *   plcUrl: 'https://plc.directory',
 *   collections: new Set(['xyz.statusphere.status']),
 *   config: {
 *     fullNetwork: false,
 *     parallelism: 10,
 *     fetchTimeout: 30,
 *     maxRetries: 5,
 *   },
 * })
 * ```
 */
240331export async function runBackfill(opts: BackfillOpts): Promise<void> {
241332 const { pdsUrl, collections, config } = opts
242333 plcUrl = opts.plcUrl