···30303131# Backfill cap (number of posts; defaults to 10000)
3232BACKFILL_MAX_POSTS=10000
3333+3434+# OpenTelemetry (traces to Axiom)
3535+# Tracing is disabled when OTEL_EXPORTER_OTLP_ENDPOINT is unset.
3636+# OTEL_SERVICE_NAME is set per-process in docker-compose.yml (web/worker/jetstream).
3737+OTEL_EXPORTER_OTLP_ENDPOINT=https://api.axiom.co
3838+OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer <AXIOM_API_TOKEN>,X-Axiom-Dataset=<AXIOM_DATASET>
···11import { randomUUID } from 'node:crypto'
22import { inject } from '@adonisjs/core'
33import { Job } from '@adonisjs/queue'
44+import { record, setAttributes } from '@adonisjs/otel/helpers'
45import { AtprotoClient, parseGetAuthorFeedResponse } from '#lib/atproto/index'
56import { ClickHouseStore } from '#lib/clickhouse/index'
67import TrackedProfile from '#models/tracked_profile'
···5859 async execute(): Promise<void> {
5960 const { did } = this.payload
60616161- // Read BACKFILL_MAX_POSTS from env (default 10,000)
6262- const maxPosts = Number(process.env['BACKFILL_MAX_POSTS'] ?? 10000)
6262+ await record('backfill_job.execute', async () => {
6363+ setAttributes({ 'backfill.did': did })
63646464- // Fetch the TrackedProfile row — dispatcher must insert it before dispatching
6565- const profile = await TrackedProfile.find(did)
6666- if (!profile) {
6767- throw new Error(
6868- `BackfillJob: TrackedProfile row missing for DID ${did} — dispatcher should insert it first`
6969- )
7070- }
6565+ // Read BACKFILL_MAX_POSTS from env (default 10,000)
6666+ const maxPosts = Number(process.env['BACKFILL_MAX_POSTS'] ?? 10000)
71677272- // Fetch the BackfillJob row (the Lucid row)
7373- const jobRow = await BackfillJobRow.find(did)
7474- if (!jobRow) {
7575- throw new Error(
7676- `BackfillJob: BackfillJob row missing for DID ${did} — dispatcher should insert it first`
7777- )
7878- }
6868+ // Fetch the TrackedProfile row — dispatcher must insert it before dispatching
6969+ const profile = await TrackedProfile.find(did)
7070+ if (!profile) {
7171+ throw new Error(
7272+ `BackfillJob: TrackedProfile row missing for DID ${did} — dispatcher should insert it first`
7373+ )
7474+ }
79758080- // If state is not 'running', this job is already done or was superseded
8181- if (jobRow.state !== 'running') {
8282- return
8383- }
7676+ setAttributes({
7777+ 'backfill.handle': profile.handle,
7878+ ...(profile.displayName && { 'backfill.display_name': profile.displayName }),
7979+ })
84808585- // Atomically claim exclusive execution. The database queue driver may
8686- // re-dispatch long-running jobs if it considers them stalled, so two
8787- // instances of the same job can race. We use a random lock token with
8888- // a compare-and-swap UPDATE: only one instance can set lock_token when
8989- // it is still NULL, and SQLite serialises writes so exactly one wins.
9090- const lockToken = randomUUID()
9191- const claimed = await BackfillJobRow.query()
9292- .where('did', did)
9393- .where('state', 'running')
9494- .whereNull('lockToken')
9595- .update({ lockToken })
8181+ // Fetch the BackfillJob row (the Lucid row)
8282+ const jobRow = await BackfillJobRow.find(did)
8383+ if (!jobRow) {
8484+ throw new Error(
8585+ `BackfillJob: BackfillJob row missing for DID ${did} — dispatcher should insert it first`
8686+ )
8787+ }
96889797- // claimed is [rowsAffected] — if 0, another execution already has the lock
9898- if ((Array.isArray(claimed) ? claimed[0] : claimed) === 0) {
9999- return
100100- }
8989+ // If state is not 'running', this job is already done or was superseded
9090+ if (jobRow.state !== 'running') {
9191+ return
9292+ }
10193102102- // ---------------------------------------------------------------------------
103103- // Main backfill loop
104104- // ---------------------------------------------------------------------------
9494+ // Atomically claim exclusive execution. The database queue driver may
9595+ // re-dispatch long-running jobs if it considers them stalled, so two
9696+ // instances of the same job can race. We use a random lock token with
9797+ // a compare-and-swap UPDATE: only one instance can set lock_token when
9898+ // it is still NULL, and SQLite serialises writes so exactly one wins.
9999+ const lockToken = randomUUID()
100100+ const claimed = await BackfillJobRow.query()
101101+ .where('did', did)
102102+ .where('state', 'running')
103103+ .whereNull('lockToken')
104104+ .update({ lockToken })
105105+106106+ // claimed is [rowsAffected] — if 0, another execution already has the lock
107107+ if ((Array.isArray(claimed) ? claimed[0] : claimed) === 0) {
108108+ return
109109+ }
110110+111111+ // -------------------------------------------------------------------------
112112+ // Main backfill loop
113113+ // -------------------------------------------------------------------------
114114+115115+ let cursor: string | undefined
116116+ let fetchedCount = 0
105117106106- let cursor: string | undefined
107107- let fetchedCount = 0
118118+ while (true) {
119119+ const page = await this.atprotoClient.getAuthorFeed(did, cursor, 100)
108120109109- while (true) {
110110- const page = await this.atprotoClient.getAuthorFeed(did, cursor, 100)
121121+ // Parse the whole page, filtering reposts of other users client-side.
122122+ const snapshots = parseGetAuthorFeedResponse({ feed: page.posts }, did)
111123112112- // Parse the whole page, filtering reposts of other users client-side.
113113- const snapshots = parseGetAuthorFeedResponse({ feed: page.posts }, did)
124124+ if (snapshots.length > 0) {
125125+ await this.clickHouseStore.insertPostSnapshots(snapshots)
114126115115- if (snapshots.length > 0) {
116116- await this.clickHouseStore.insertPostSnapshots(snapshots)
127127+ fetchedCount += snapshots.length
128128+ jobRow.fetchedPosts = fetchedCount
129129+ await jobRow.save()
130130+ }
117131118118- fetchedCount += snapshots.length
119119- jobRow.fetchedPosts = fetchedCount
120120- await jobRow.save()
121121- }
132132+ // Cap is checked per-page — since pages are up to 100, fetchedCount
133133+ // may overshoot maxPosts by up to (pageSize - 1). This is intentional:
134134+ // mid-page cutoff would complicate the loop for negligible savings.
135135+ if (fetchedCount >= maxPosts || page.cursor === undefined) {
136136+ break
137137+ }
122138123123- // Cap is checked per-page — since pages are up to 100, fetchedCount
124124- // may overshoot maxPosts by up to (pageSize - 1). This is intentional:
125125- // mid-page cutoff would complicate the loop for negligible savings.
126126- if (fetchedCount >= maxPosts || page.cursor === undefined) {
127127- break
139139+ cursor = page.cursor
128140 }
129141130130- cursor = page.cursor
131131- }
142142+ // -------------------------------------------------------------------------
143143+ // Mark completion
144144+ // -------------------------------------------------------------------------
132145133133- // ---------------------------------------------------------------------------
134134- // Mark completion
135135- // ---------------------------------------------------------------------------
146146+ setAttributes({ 'backfill.fetched_posts': fetchedCount })
136147137137- const now = Date.now()
148148+ const now = Date.now()
138149139139- profile.backfilledAt = now
140140- await profile.save()
150150+ profile.backfilledAt = now
151151+ await profile.save()
141152142142- jobRow.state = 'done'
143143- jobRow.finishedAt = now
144144- jobRow.fetchedPosts = fetchedCount
145145- jobRow.truncated = fetchedCount >= maxPosts
146146- await jobRow.save()
153153+ jobRow.state = 'done'
154154+ jobRow.finishedAt = now
155155+ jobRow.fetchedPosts = fetchedCount
156156+ jobRow.truncated = fetchedCount >= maxPosts
157157+ await jobRow.save()
158158+ })
147159 }
148160149161 async failed(error: Error): Promise<void> {
150150- const jobRow = await BackfillJobRow.find(this.payload.did)
151151- if (!jobRow) return
162162+ await record('backfill_job.failed', async () => {
163163+ setAttributes({
164164+ 'backfill.did': this.payload.did,
165165+ 'backfill.error': error.message,
166166+ })
167167+168168+ const jobRow = await BackfillJobRow.find(this.payload.did)
169169+ if (!jobRow) return
152170153153- jobRow.state = 'failed'
154154- jobRow.error = error.message
155155- jobRow.finishedAt = Date.now()
156156- jobRow.lockToken = null
157157- await jobRow.save()
171171+ jobRow.state = 'failed'
172172+ jobRow.error = error.message
173173+ jobRow.finishedAt = Date.now()
174174+ jobRow.lockToken = null
175175+ await jobRow.save()
176176+ })
158177 }
159178}
+5
bin/console.ts
···11+/**
22+ * OpenTelemetry initialization - MUST be the first import
33+ * @see https://opentelemetry.io/docs/languages/js/getting-started/nodejs/
44+ */
55+import '../otel.js'
16/*
27|--------------------------------------------------------------------------
38| Ace entry point
+5
bin/server.ts
···11+/**
22+ * OpenTelemetry initialization - MUST be the first import
33+ * @see https://opentelemetry.io/docs/languages/js/getting-started/nodejs/
44+ */
55+import '../otel.js'
16/*
27|--------------------------------------------------------------------------
38| HTTP server entrypoint
···11+/**
22+ * OpenTelemetry initialization file.
33+ *
44+ * IMPORTANT: This file must be imported FIRST in bin/server.ts
55+ * for auto-instrumentation to work correctly.
66+ */
77+import { init } from '@adonisjs/otel/init'
88+99+await init(import.meta.dirname)