See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Make backfill jobs self-recover via lease + checkpoint

Replaces the never-cleared lock_token mutex with an expiring lease and
persists (cursor, fetched_posts) on every page so a re-dispatched job
resumes instead of restarting. A crashed worker no longer leaves a row
stuck forever — the next dispatch reclaims an expired lease, and
backfill:unstick drops its fetched_posts reset since checkpointing
handles resume. Also refreshes three stale loading-page assertions
left over from the meta-refresh removal.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+546 -170
+144 -77
app/jobs/backfill_job.ts
··· 24 24 */ 25 25 const STALL_RETRY_SLEEP_MS = 5_000 26 26 27 + /** 28 + * How long a worker holds the row's lease before another worker may steal 29 + * it. Each progress save extends the lease, so the worst case is the time 30 + * between saves: a single getAuthorFeed call (≤ ~45s with stall retries) 31 + * plus the ClickHouse insert. 2 min gives comfortable headroom while still 32 + * recovering from a hard crash within the same order of magnitude. 33 + */ 34 + const LEASE_DURATION_MS = 2 * 60 * 1000 35 + 27 36 const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms)) 28 37 38 + /** 39 + * Coerce knex's UPDATE return value (number on better-sqlite3, [count] on 40 + * some other drivers) into a single number. 41 + */ 42 + function rowsAffected(result: unknown): number { 43 + return Array.isArray(result) ? Number(result[0]) : Number(result) 44 + } 45 + 29 46 // --------------------------------------------------------------------------- 30 47 // BackfillJob 31 48 // --------------------------------------------------------------------------- ··· 36 53 * Dispatched by the web process when a new user is searched and their 37 54 * backfilled_at is NULL. The job: 38 55 * 39 - * 1. Paginates getAuthorFeed (filter=posts_with_replies) until the cursor is 40 - * exhausted or BACKFILL_MAX_POSTS is reached (default 10,000). 41 - * 2. Parses each page directly via parseGetAuthorFeedResponse — the AppView 56 + * 1. Atomically claims a lease on the row (compare-and-swap on lock_token + 57 + * lock_expires_at). Stale leases — written by a worker that crashed — 58 + * are reclaimable; live leases held by another worker are not. 59 + * 2. Resumes from the persisted (cursor, fetched_posts) checkpoint when one 60 + * is present, so a re-dispatched job picks up where the previous run 61 + * left off rather than restarting at the top. 62 + * 3. Paginates getAuthorFeed (filter=posts_with_replies) until the cursor 63 + * is exhausted or BACKFILL_MAX_POSTS is reached (default 10,000). 64 + * 4. Parses each page directly via parseGetAuthorFeedResponse — the AppView 42 65 * already populates like/repost/quote counts on feed items, so we skip 43 66 * the redundant getPosts re-hydration. The parser also drops reposts of 44 67 * other users (feed items where post.author.did !== targetDid). 45 - * 3. Inserts snapshots into ClickHouse and updates backfill_jobs.fetched_posts 46 - * after every page so the loading page can display progress. 47 - * 4. On completion, sets tracked_profiles.backfilled_at = now() and 68 + * 5. After each page, atomically writes (fetched_posts, cursor, 69 + * lock_expires_at) under "WHERE lock_token = ours". If our lease was 70 + * stolen we abort cleanly instead of overwriting another worker's 71 + * progress. 72 + * 6. On completion, sets tracked_profiles.backfilled_at = now() and 48 73 * backfill_jobs.state = 'done'. 49 - * 5. On permanent failure (after all retries), sets 50 - * backfill_jobs.state = 'failed' and backfill_jobs.error = <message>. 74 + * 7. On any throw, a finally block releases the lease so the next dispatch 75 + * can claim immediately. On permanent failure (after queue retries 76 + * exhausted), failed() sets state='failed' and preserves the error. 51 77 * 52 78 * Spec: §6 Flow 2 53 79 */ ··· 73 99 await record('backfill_job.execute', async () => { 74 100 setAttributes({ 'backfill.did': did }) 75 101 76 - // Read BACKFILL_MAX_POSTS from env (default 10,000) 77 102 const maxPosts = Number(process.env['BACKFILL_MAX_POSTS'] ?? 10000) 78 103 79 - // Fetch the TrackedProfile row — dispatcher must insert it before dispatching 80 104 const profile = await TrackedProfile.find(did) 81 105 if (!profile) { 82 106 throw new Error( ··· 89 113 ...(profile.displayName && { 'backfill.display_name': profile.displayName }), 90 114 }) 91 115 92 - // Fetch the BackfillJob row (the Lucid row) 93 - const jobRow = await BackfillJobRow.find(did) 94 - if (!jobRow) { 116 + const initialRow = await BackfillJobRow.find(did) 117 + if (!initialRow) { 95 118 throw new Error( 96 119 `BackfillJob: BackfillJob row missing for DID ${did} — dispatcher should insert it first` 97 120 ) 98 121 } 99 122 100 - // If state is not 'running', this job is already done or was superseded 101 - if (jobRow.state !== 'running') { 123 + // Already done or marked failed — nothing to do 124 + if (initialRow.state !== 'running') { 102 125 return 103 126 } 104 127 105 - // Atomically claim exclusive execution. The database queue driver may 106 - // re-dispatch long-running jobs if it considers them stalled, so two 107 - // instances of the same job can race. We use a random lock token with 108 - // a compare-and-swap UPDATE: only one instance can set lock_token when 109 - // it is still NULL, and SQLite serialises writes so exactly one wins. 128 + // ----------------------------------------------------------------------- 129 + // Acquire the lease. 130 + // 131 + // The CAS condition is "state='running' AND (no lease OR lease expired)": 132 + // the row is claimable when no worker holds it, OR when the previous 133 + // holder's lease has elapsed (i.e. the worker crashed). SQLite serialises 134 + // writes, so exactly one concurrent claim wins. 135 + // ----------------------------------------------------------------------- 136 + 110 137 const lockToken = randomUUID() 138 + const claimNow = Date.now() 139 + const claimExpiresAt = claimNow + LEASE_DURATION_MS 140 + 111 141 const claimed = await BackfillJobRow.query() 112 142 .where('did', did) 113 143 .where('state', 'running') 114 - .whereNull('lockToken') 115 - .update({ lockToken }) 144 + .where((q) => q.whereNull('lockExpiresAt').orWhere('lockExpiresAt', '<=', claimNow)) 145 + .update({ lockToken, lockExpiresAt: claimExpiresAt }) 116 146 117 - // claimed is [rowsAffected] — if 0, another execution already has the lock 118 - if ((Array.isArray(claimed) ? claimed[0] : claimed) === 0) { 147 + if (rowsAffected(claimed) === 0) { 119 148 return 120 149 } 121 150 122 - // ------------------------------------------------------------------------- 123 - // Main backfill loop 124 - // 125 - // getAuthorFeed stalls on cold BunnyCDN paths are routine — see the 126 - // incident where a single page hung for 117s before returning 200. The 127 - // client aborts at PER_ATTEMPT_TIMEOUT_MS and retries up to MAX_STALL_RETRIES 128 - // times; if even that budget is exhausted, it throws BlueskyStalledError. 129 - // We catch it here and retry the same cursor with a small delay — the 130 - // cursor is in-memory so no page is lost, and a fresh retry typically 131 - // hits a warmer cache node. This is the "retry indefinitely until 132 - // fully loaded" contract the feature needs. 133 - // ------------------------------------------------------------------------- 151 + // Re-read to pick up the persisted checkpoint (cursor + fetchedPosts). 152 + // The lease we just took is the source of truth; everything else may 153 + // be from a previous worker. 154 + const claimedRow = await BackfillJobRow.findOrFail(did) 155 + 156 + let cursor: string | undefined = claimedRow.cursor ?? undefined 157 + let fetchedCount = claimedRow.fetchedPosts 134 158 135 - let cursor: string | undefined 136 - let fetchedCount = 0 159 + try { 160 + // --------------------------------------------------------------------- 161 + // Main backfill loop 162 + // 163 + // getAuthorFeed stalls on cold BunnyCDN paths are routine — the client 164 + // aborts at PER_ATTEMPT_TIMEOUT_MS and retries up to MAX_STALL_RETRIES 165 + // times; if even that budget is exhausted, it throws BlueskyStalledError. 166 + // We catch it here and retry the same cursor with a small delay — the 167 + // cursor is in-memory so no page is lost, and a fresh retry typically 168 + // hits a warmer cache node. 169 + // --------------------------------------------------------------------- 137 170 138 - while (true) { 139 - let page: Awaited<ReturnType<AtprotoClient['getAuthorFeed']>> 140 - try { 141 - page = await this.atprotoClient.getAuthorFeed(did, cursor, 100) 142 - } catch (err) { 143 - if (err instanceof BlueskyStalledError) { 144 - setAttributes({ 'backfill.last_stall_at': new Date().toISOString() }) 145 - await sleep(STALL_RETRY_SLEEP_MS) 146 - continue 171 + while (true) { 172 + let page: Awaited<ReturnType<AtprotoClient['getAuthorFeed']>> 173 + try { 174 + page = await this.atprotoClient.getAuthorFeed(did, cursor, 100) 175 + } catch (err) { 176 + if (err instanceof BlueskyStalledError) { 177 + setAttributes({ 'backfill.last_stall_at': new Date().toISOString() }) 178 + await sleep(STALL_RETRY_SLEEP_MS) 179 + continue 180 + } 181 + throw err 147 182 } 148 - throw err 149 - } 150 183 151 - // Parse the whole page, filtering reposts of other users client-side. 152 - const snapshots = parseGetAuthorFeedResponse({ feed: page.posts }, did) 184 + const snapshots = parseGetAuthorFeedResponse({ feed: page.posts }, did) 153 185 154 - if (snapshots.length > 0) { 155 - await this.clickHouseStore.insertPostSnapshots(snapshots) 186 + if (snapshots.length > 0) { 187 + await this.clickHouseStore.insertPostSnapshots(snapshots) 188 + fetchedCount += snapshots.length 189 + } 156 190 157 - fetchedCount += snapshots.length 158 - jobRow.fetchedPosts = fetchedCount 159 - await jobRow.save() 160 - } 191 + cursor = page.cursor 161 192 162 - // Cap is checked per-page — since pages are up to 100, fetchedCount 163 - // may overshoot maxPosts by up to (pageSize - 1). This is intentional: 164 - // mid-page cutoff would complicate the loop for negligible savings. 165 - if (fetchedCount >= maxPosts || page.cursor === undefined) { 166 - break 167 - } 193 + // Atomically checkpoint progress AND extend the lease. The 194 + // "lockToken = ours" predicate is the safety net: if our lease 195 + // expired and another worker took over, we get rowsAffected=0 196 + // here and abort instead of clobbering their progress. 197 + const persisted = await BackfillJobRow.query() 198 + .where('did', did) 199 + .where('lockToken', lockToken) 200 + .update({ 201 + fetchedPosts: fetchedCount, 202 + cursor: cursor ?? null, 203 + lockExpiresAt: Date.now() + LEASE_DURATION_MS, 204 + }) 168 205 169 - cursor = page.cursor 170 - } 206 + if (rowsAffected(persisted) === 0) { 207 + // Someone else owns the row — exit silently. They'll resume 208 + // from their own checkpoint, which is at most one page behind us 209 + // (the page we just inserted; ClickHouse dedupes via FINAL). 210 + return 211 + } 171 212 172 - // ------------------------------------------------------------------------- 173 - // Mark completion 174 - // ------------------------------------------------------------------------- 213 + // Cap is checked per-page — since pages are up to 100, fetchedCount 214 + // may overshoot maxPosts by up to (pageSize - 1). Mid-page cutoff 215 + // would complicate the loop for negligible savings. 216 + if (fetchedCount >= maxPosts || page.cursor === undefined) { 217 + break 218 + } 219 + } 175 220 176 - setAttributes({ 'backfill.fetched_posts': fetchedCount }) 221 + // ------------------------------------------------------------------- 222 + // Mark completion. Same lease-guarded UPDATE: if we lost the lease 223 + // we don't promote the row to 'done'. 224 + // ------------------------------------------------------------------- 177 225 178 - const now = Date.now() 226 + setAttributes({ 'backfill.fetched_posts': fetchedCount }) 179 227 180 - profile.backfilledAt = now 181 - await profile.save() 228 + const completedAt = Date.now() 229 + const completed = await BackfillJobRow.query() 230 + .where('did', did) 231 + .where('lockToken', lockToken) 232 + .update({ 233 + state: 'done', 234 + finishedAt: completedAt, 235 + fetchedPosts: fetchedCount, 236 + truncated: fetchedCount >= maxPosts, 237 + cursor: null, 238 + lockToken: null, 239 + lockExpiresAt: null, 240 + }) 182 241 183 - jobRow.state = 'done' 184 - jobRow.finishedAt = now 185 - jobRow.fetchedPosts = fetchedCount 186 - jobRow.truncated = fetchedCount >= maxPosts 187 - await jobRow.save() 242 + if (rowsAffected(completed) > 0) { 243 + profile.backfilledAt = completedAt 244 + await profile.save() 245 + } 246 + } finally { 247 + // Release the lease unconditionally. No-op if the success path 248 + // (or a stolen-lease abort) already cleared it. 249 + await BackfillJobRow.query() 250 + .where('did', did) 251 + .where('lockToken', lockToken) 252 + .update({ lockToken: null, lockExpiresAt: null }) 253 + } 188 254 }) 189 255 } 190 256 ··· 202 268 jobRow.error = error.message 203 269 jobRow.finishedAt = Date.now() 204 270 jobRow.lockToken = null 271 + jobRow.lockExpiresAt = null 205 272 await jobRow.save() 206 273 }) 207 274 }
+29 -11
app/lib/backfill_unstick.ts
··· 1 1 import BackfillJobRow from '#models/backfill_job' 2 2 3 3 export interface UnstickOptions { 4 - /** Minutes since startedAt before a locked job is considered stalled. Default: 60 */ 4 + /** 5 + * Minutes of grace beyond the lease expiry before a row is considered 6 + * stuck and re-dispatched. Default: 0 (re-dispatch as soon as the lease 7 + * has expired). Increase if you want to absorb very brief network blips 8 + * before nudging the queue. 9 + */ 5 10 staleMinutes?: number 6 11 /** Called for each unstuck DID to re-enqueue the backfill job. */ 7 12 dispatch: (did: string) => Promise<void> ··· 12 17 } 13 18 14 19 /** 15 - * Finds backfill jobs that are stuck — state='running' with a non-null 16 - * lockToken and startedAt older than the threshold — resets their lock so 17 - * they can be re-dispatched. 20 + * Finds backfill jobs whose worker lease has expired (or was never set 21 + * despite the row being 'running' for a long time) and re-dispatches them. 22 + * 23 + * The job itself owns lease lifecycle now — a healthy worker continually 24 + * extends `lock_expires_at`, releases it in a `finally` block on exit, and 25 + * a successor can claim a row whose lease has elapsed. So this command's 26 + * only remaining responsibility is to nudge the queue: the row is already 27 + * safe to re-execute on its own. 28 + * 29 + * Notably we do NOT reset `fetched_posts` or `cursor` here — the job 30 + * resumes from the saved checkpoint, so wiping them would force a full 31 + * re-paginate. 18 32 */ 19 33 export async function unstickBackfillJobs(options: UnstickOptions): Promise<UnstickResult> { 20 - const staleMinutes = options.staleMinutes ?? 60 34 + const staleMinutes = options.staleMinutes ?? 0 21 35 const cutoff = Date.now() - staleMinutes * 60 * 1000 22 36 37 + // Two flavours of "stuck": 38 + // 1. lock_expires_at <= cutoff → previous worker crashed; lease elapsed 39 + // 2. lock_expires_at IS NULL AND started_at <= cutoff → row was created 40 + // but the dispatched job never claimed it (e.g. the queue worker 41 + // crashed between create and execute), so no lease was ever written. 23 42 const stalled = await BackfillJobRow.query() 24 43 .where('state', 'running') 25 - .whereNotNull('lockToken') 26 - .where('startedAt', '<', cutoff) 44 + .where((q) => 45 + q 46 + .where('lockExpiresAt', '<=', cutoff) 47 + .orWhere((q2) => q2.whereNull('lockExpiresAt').where('startedAt', '<=', cutoff)) 48 + ) 27 49 28 50 for (const row of stalled) { 29 - row.lockToken = null 30 - row.fetchedPosts = 0 31 - row.startedAt = Date.now() 32 - await row.save() 33 51 await options.dispatch(row.did) 34 52 } 35 53
+6
app/models/backfill_job.ts
··· 30 30 31 31 @column() 32 32 declare lockToken: string | null 33 + 34 + @column() 35 + declare lockExpiresAt: number | null 36 + 37 + @column() 38 + declare cursor: string | null 33 39 }
+20 -12
commands/backfill_unstick.ts
··· 5 5 6 6 export default class BackfillUnstick extends BaseCommand { 7 7 static commandName = 'backfill:unstick' 8 - static description = 'Reset stalled backfill jobs and re-dispatch them' 8 + static description = 'Re-dispatch backfill jobs whose worker lease has expired' 9 9 static options: CommandOptions = { startApp: true } 10 10 11 11 @flags.number({ 12 - description: 'Only unstick jobs started more than N minutes ago (default: 60)', 13 - default: 60, 12 + description: 13 + 'Grace period in minutes added to the lease expiry before a row is re-dispatched (default: 0)', 14 + default: 0, 14 15 }) 15 16 declare minutes: number 16 17 17 18 @flags.boolean({ 18 - description: 'Preview which jobs would be unstuck without making changes', 19 + description: 'Preview which jobs would be re-dispatched without making changes', 19 20 default: false, 20 21 }) 21 22 declare dryRun: boolean ··· 26 27 const cutoff = Date.now() - this.minutes * 60 * 1000 27 28 const stalled = await BackfillJobRow.query() 28 29 .where('state', 'running') 29 - .whereNotNull('lockToken') 30 - .where('startedAt', '<', cutoff) 30 + .where((q) => 31 + q 32 + .where('lockExpiresAt', '<=', cutoff) 33 + .orWhere((q2) => q2.whereNull('lockExpiresAt').where('startedAt', '<=', cutoff)) 34 + ) 31 35 32 36 if (stalled.length === 0) { 33 - this.logger.info('No stalled jobs found.') 37 + this.logger.info('No stuck jobs found.') 34 38 return 35 39 } 36 40 37 - this.logger.info(`Would unstick ${stalled.length} job(s):`) 41 + this.logger.info(`Would re-dispatch ${stalled.length} job(s):`) 38 42 for (const row of stalled) { 39 - const stalledFor = Math.round((Date.now() - row.startedAt) / 60_000) 43 + const ageMin = Math.round((Date.now() - row.startedAt) / 60_000) 44 + const leaseStatus = 45 + row.lockExpiresAt === null 46 + ? 'never claimed' 47 + : `lease expired ${Math.round((Date.now() - row.lockExpiresAt) / 60_000)}m ago` 40 48 this.logger.info( 41 - ` ${row.did} — stalled ~${stalledFor}m, fetched ${row.fetchedPosts} posts` 49 + ` ${row.did} — running ~${ageMin}m, ${leaseStatus}, fetched ${row.fetchedPosts} posts` 42 50 ) 43 51 } 44 52 return ··· 53 61 }) 54 62 55 63 if (result.unstuck === 0) { 56 - this.logger.info('No stalled jobs found.') 64 + this.logger.info('No stuck jobs found.') 57 65 } else { 58 - this.logger.success(`Unstuck and re-dispatched ${result.unstuck} job(s).`) 66 + this.logger.success(`Re-dispatched ${result.unstuck} job(s).`) 59 67 } 60 68 } 61 69 }
+25
database/migrations/1776600000000_add_lease_and_cursor_to_backfill_jobs.ts
··· 1 + import { BaseSchema } from '@adonisjs/lucid/schema' 2 + 3 + export default class extends BaseSchema { 4 + protected tableName = 'backfill_jobs' 5 + 6 + async up() { 7 + this.schema.alterTable(this.tableName, (table) => { 8 + // Lease expiry (epoch ms). Workers refresh this on every progress save; 9 + // a worker may claim a row whose lease has expired. Replaces the 10 + // lock_token-only mutex which had no recovery path on crash. 11 + table.bigInteger('lock_expires_at').nullable() 12 + // AppView pagination cursor checkpoint. Persisted on every progress 13 + // save so a re-dispatched job resumes from the last completed page 14 + // instead of restarting at the top. 15 + table.text('cursor').nullable() 16 + }) 17 + } 18 + 19 + async down() { 20 + this.schema.alterTable(this.tableName, (table) => { 21 + table.dropColumn('lock_expires_at') 22 + table.dropColumn('cursor') 23 + }) 24 + } 25 + }
+1 -2
tests/functional/profile_controller.spec.ts
··· 226 226 response.assertHeader('location', '/profile/dril.bsky.social/likes') 227 227 }) 228 228 229 - // Test 10: Unknown user returns 200 with loading page (meta-refresh present) 229 + // Test 10: Unknown user returns 200 with loading page 230 230 test('GET /profile/dril.bsky.social/likes for unknown user returns loading page', async ({ 231 231 client, 232 232 assert, ··· 251 251 const response = await client.get('/profile/dril.bsky.social/likes?days=30') 252 252 response.assertStatus(200) 253 253 const html = response.text() 254 - assert.include(html, 'meta http-equiv="refresh"') 255 254 assert.include(html, 'Indexing') 256 255 // Task 4: live SSE-driven progress bar with Alpine component 257 256 assert.include(html, 'role="progressbar"')
+5 -5
tests/functional/profile_controller_dispatch.spec.ts
··· 101 101 102 102 const response = await client.get(`/profile/${TEST_HANDLE}/likes`) 103 103 104 - // Loading page returned 104 + // Loading page returned — Alpine `backfillProgress` component is the 105 + // distinctive marker (it only appears on the loading page). 105 106 response.assertStatus(200) 106 - assert.include(response.text(), 'meta http-equiv="refresh"') 107 - // Alpine `backfillProgress` component is wired with the fetched totalPosts 107 + assert.include(response.text(), 'x-data="backfillProgress(') 108 108 assert.include(response.text(), 'total: 1234') 109 109 assert.include(response.text(), 'role="progressbar"') 110 110 ··· 191 191 const response = await client.get(`/profile/${TEST_HANDLE}/likes`) 192 192 193 193 response.assertStatus(200) 194 - assert.include(response.text(), 'meta http-equiv="refresh"') 194 + assert.include(response.text(), 'x-data="backfillProgress(') 195 195 196 196 // Pre-seeded totalPosts (500) is rendered, not the fetched-and-discarded 777 197 197 assert.include(response.text(), 'total: 500') ··· 405 405 406 406 // Profile rendered (200), not loading page 407 407 response.assertStatus(200) 408 - assert.notInclude(response.text(), 'meta http-equiv="refresh"') 408 + assert.notInclude(response.text(), 'x-data="backfillProgress(') 409 409 410 410 // No dispatch — but getProfile is called once in the render path for live profile data 411 411 queue.fake().assertNothingPushed()
+65 -63
tests/unit/commands/backfill_unstick.spec.ts
··· 8 8 // Helpers 9 9 // --------------------------------------------------------------------------- 10 10 11 - const DID_STALLED = 'did:plc:stalled1' 12 - const DID_STALLED_2 = 'did:plc:stalled2' 13 - const DID_RUNNING_LEGIT = 'did:plc:running' 11 + const DID_EXPIRED_LEASE = 'did:plc:expired' 12 + const DID_EXPIRED_LEASE_2 = 'did:plc:expired2' 13 + const DID_LIVE_LEASE = 'did:plc:liveseat' 14 + const DID_NEVER_CLAIMED = 'did:plc:neverclaimed' 14 15 const DID_DONE = 'did:plc:done' 15 16 const DID_FAILED = 'did:plc:failed' 16 17 ··· 25 26 // Tests 26 27 // --------------------------------------------------------------------------- 27 28 28 - test.group('unstickBackfillJobs — identifies stalled jobs', (group) => { 29 + test.group('unstickBackfillJobs — re-dispatches jobs whose lease has expired', (group) => { 29 30 group.each.setup(() => testUtils.db().withGlobalTransaction()) 30 31 31 - test('resets jobs that are running with a lock token older than threshold', async ({ 32 - assert, 33 - }) => { 34 - await seedProfile(DID_STALLED) 32 + test('re-dispatches a job whose lease has elapsed (worker crashed)', async ({ assert }) => { 33 + await seedProfile(DID_EXPIRED_LEASE) 35 34 await BackfillJobRow.create({ 36 - did: DID_STALLED, 37 - startedAt: Date.now() - 60 * 60 * 1000, // 1 hour ago 35 + did: DID_EXPIRED_LEASE, 36 + startedAt: Date.now() - 60 * 60 * 1000, 38 37 state: 'running', 39 - fetchedPosts: 42, 40 - lockToken: 'stale-lock-token', 38 + fetchedPosts: 250, 39 + cursor: 'page-3', 40 + lockToken: 'crashed-worker-token', 41 + lockExpiresAt: Date.now() - 5 * 60 * 1000, // expired 5 min ago 41 42 }) 42 43 43 44 const dispatched: string[] = [] 44 45 const result = await unstickBackfillJobs({ 45 - staleMinutes: 30, 46 46 dispatch: async (did) => { 47 47 dispatched.push(did) 48 48 }, 49 49 }) 50 50 51 51 assert.equal(result.unstuck, 1) 52 - 53 - const row = await BackfillJobRow.findOrFail(DID_STALLED) 54 - assert.isNull(row.lockToken) 55 - assert.equal(row.state, 'running') 56 - assert.equal(row.fetchedPosts, 0) 52 + assert.deepEqual(dispatched, [DID_EXPIRED_LEASE]) 57 53 58 - assert.deepEqual(dispatched, [DID_STALLED]) 54 + // Crucially: checkpoint is preserved so the next run resumes from 55 + // page 3 instead of restarting at the top. 56 + const row = await BackfillJobRow.findOrFail(DID_EXPIRED_LEASE) 57 + assert.equal(row.fetchedPosts, 250) 58 + assert.equal(row.cursor, 'page-3') 59 59 }) 60 60 61 - test('does not touch jobs started recently (within threshold)', async ({ assert }) => { 62 - await seedProfile(DID_RUNNING_LEGIT) 61 + test('does not re-dispatch a job whose lease is still live', async ({ assert }) => { 62 + await seedProfile(DID_LIVE_LEASE) 63 63 await BackfillJobRow.create({ 64 - did: DID_RUNNING_LEGIT, 65 - startedAt: Date.now() - 5 * 60 * 1000, // 5 minutes ago 64 + did: DID_LIVE_LEASE, 65 + startedAt: Date.now() - 60 * 60 * 1000, 66 66 state: 'running', 67 67 fetchedPosts: 10, 68 - lockToken: 'active-lock', 68 + lockToken: 'active-worker-token', 69 + lockExpiresAt: Date.now() + 60 * 1000, // expires 1 min from now 69 70 }) 70 71 71 72 const dispatched: string[] = [] 72 73 const result = await unstickBackfillJobs({ 73 - staleMinutes: 30, 74 74 dispatch: async (did) => { 75 75 dispatched.push(did) 76 76 }, ··· 78 78 79 79 assert.equal(result.unstuck, 0) 80 80 assert.lengthOf(dispatched, 0) 81 + }) 81 82 82 - const row = await BackfillJobRow.findOrFail(DID_RUNNING_LEGIT) 83 - assert.equal(row.lockToken, 'active-lock') 84 - assert.equal(row.fetchedPosts, 10) 83 + test('re-dispatches a row that was never claimed (no lease ever set)', async ({ assert }) => { 84 + // Row created by the controller, dispatch fired, but the queue worker 85 + // never ran the job (e.g. crashed before pulling the message). The 86 + // lease never got written, but startedAt is old. 87 + await seedProfile(DID_NEVER_CLAIMED) 88 + await BackfillJobRow.create({ 89 + did: DID_NEVER_CLAIMED, 90 + startedAt: Date.now() - 60 * 60 * 1000, 91 + state: 'running', 92 + fetchedPosts: 0, 93 + lockToken: null, 94 + lockExpiresAt: null, 95 + }) 96 + 97 + const dispatched: string[] = [] 98 + const result = await unstickBackfillJobs({ 99 + dispatch: async (did) => { 100 + dispatched.push(did) 101 + }, 102 + }) 103 + 104 + assert.equal(result.unstuck, 1) 105 + assert.deepEqual(dispatched, [DID_NEVER_CLAIMED]) 85 106 }) 86 107 87 108 test('does not touch done or failed jobs', async ({ assert }) => { ··· 94 115 state: 'done', 95 116 fetchedPosts: 500, 96 117 lockToken: null, 118 + lockExpiresAt: null, 97 119 }) 98 120 await BackfillJobRow.create({ 99 121 did: DID_FAILED, ··· 101 123 state: 'failed', 102 124 fetchedPosts: 0, 103 125 lockToken: null, 126 + lockExpiresAt: null, 104 127 error: 'some error', 105 128 }) 106 129 107 130 const dispatched: string[] = [] 108 131 const result = await unstickBackfillJobs({ 109 - staleMinutes: 30, 110 132 dispatch: async (did) => { 111 133 dispatched.push(did) 112 134 }, ··· 116 138 assert.lengthOf(dispatched, 0) 117 139 }) 118 140 119 - test('does not touch running jobs without a lock token', async ({ assert }) => { 120 - await seedProfile(DID_RUNNING_LEGIT) 121 - await BackfillJobRow.create({ 122 - did: DID_RUNNING_LEGIT, 123 - startedAt: Date.now() - 60 * 60 * 1000, 124 - state: 'running', 125 - fetchedPosts: 0, 126 - lockToken: null, // no lock — hasn't been picked up yet 127 - }) 128 - 129 - const dispatched: string[] = [] 130 - const result = await unstickBackfillJobs({ 131 - staleMinutes: 30, 132 - dispatch: async (did) => { 133 - dispatched.push(did) 134 - }, 135 - }) 136 - 137 - assert.equal(result.unstuck, 0) 138 - assert.lengthOf(dispatched, 0) 139 - }) 140 - 141 - test('unsticks multiple stalled jobs', async ({ assert }) => { 142 - await seedProfile(DID_STALLED) 143 - await seedProfile(DID_STALLED_2) 141 + test('re-dispatches multiple expired jobs', async ({ assert }) => { 142 + await seedProfile(DID_EXPIRED_LEASE) 143 + await seedProfile(DID_EXPIRED_LEASE_2) 144 144 145 - for (const did of [DID_STALLED, DID_STALLED_2]) { 145 + for (const did of [DID_EXPIRED_LEASE, DID_EXPIRED_LEASE_2]) { 146 146 await BackfillJobRow.create({ 147 147 did, 148 148 startedAt: Date.now() - 2 * 60 * 60 * 1000, 149 149 state: 'running', 150 150 fetchedPosts: 10, 151 151 lockToken: `lock-${did}`, 152 + lockExpiresAt: Date.now() - 30 * 60 * 1000, 152 153 }) 153 154 } 154 155 155 156 const dispatched: string[] = [] 156 157 const result = await unstickBackfillJobs({ 157 - staleMinutes: 30, 158 158 dispatch: async (did) => { 159 159 dispatched.push(did) 160 160 }, ··· 162 162 163 163 assert.equal(result.unstuck, 2) 164 164 assert.lengthOf(dispatched, 2) 165 - assert.includeMembers(dispatched, [DID_STALLED, DID_STALLED_2]) 165 + assert.includeMembers(dispatched, [DID_EXPIRED_LEASE, DID_EXPIRED_LEASE_2]) 166 166 }) 167 167 168 - test('uses default 60 minutes when staleMinutes not provided', async ({ assert }) => { 169 - await seedProfile(DID_STALLED) 168 + test('respects staleMinutes grace period before re-dispatching expired leases', async ({ 169 + assert, 170 + }) => { 171 + await seedProfile(DID_EXPIRED_LEASE) 170 172 await BackfillJobRow.create({ 171 - did: DID_STALLED, 172 - startedAt: Date.now() - 45 * 60 * 1000, // 45 min ago 173 + did: DID_EXPIRED_LEASE, 174 + startedAt: Date.now() - 60 * 60 * 1000, 173 175 state: 'running', 174 176 fetchedPosts: 5, 175 177 lockToken: 'stale', 178 + lockExpiresAt: Date.now() - 2 * 60 * 1000, // expired 2 min ago 176 179 }) 177 180 178 181 const dispatched: string[] = [] 179 - 180 - // Should NOT be unstuck with default (60 min) 181 182 const result = await unstickBackfillJobs({ 183 + staleMinutes: 5, // grace > how long ago lease expired 182 184 dispatch: async (did) => { 183 185 dispatched.push(did) 184 186 },
+251
tests/unit/jobs/backfill_job.spec.ts
··· 513 513 }) 514 514 }) 515 515 516 + test.group('BackfillJob — lease lifecycle', (group) => { 517 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 518 + 519 + test('clears lock_token and lock_expires_at after successful completion', async ({ assert }) => { 520 + await TrackedProfile.create({ 521 + did: TEST_DID, 522 + handle: 'test.bsky.social', 523 + firstSeenAt: Date.now(), 524 + }) 525 + await BackfillJobRow.create({ 526 + did: TEST_DID, 527 + startedAt: Date.now(), 528 + state: 'running', 529 + fetchedPosts: 0, 530 + }) 531 + 532 + const atproto = makeMockAtprotoClient({ 533 + pages: [{ posts: [makeFeedViewPost(`at://${TEST_DID}/app.bsky.feed.post/a`)] }], 534 + }) 535 + const job = makeJob(atproto, makeMockClickHouseStore()) 536 + 537 + await job.execute() 538 + 539 + const row = await BackfillJobRow.findOrFail(TEST_DID) 540 + assert.equal(row.state, 'done') 541 + assert.isNull(row.lockToken) 542 + assert.isNull(row.lockExpiresAt) 543 + }) 544 + 545 + test('clears lock_token and lock_expires_at when the loop throws', async ({ assert }) => { 546 + await TrackedProfile.create({ 547 + did: TEST_DID, 548 + handle: 'test.bsky.social', 549 + firstSeenAt: Date.now(), 550 + }) 551 + await BackfillJobRow.create({ 552 + did: TEST_DID, 553 + startedAt: Date.now(), 554 + state: 'running', 555 + fetchedPosts: 0, 556 + }) 557 + 558 + const atproto = makeMockAtprotoClient({ 559 + pages: [{ posts: [makeFeedViewPost(`at://${TEST_DID}/app.bsky.feed.post/a`)] }], 560 + }) 561 + // ClickHouse throws — simulates the pfrazee outage 562 + const clickhouse = makeMockClickHouseStore({ shouldThrow: true }) 563 + const job = makeJob(atproto, clickhouse) 564 + 565 + await assert.rejects(() => job.execute(), /ClickHouse unavailable/) 566 + 567 + const row = await BackfillJobRow.findOrFail(TEST_DID) 568 + // Lease released so the next dispatch can immediately retry 569 + assert.isNull(row.lockToken) 570 + assert.isNull(row.lockExpiresAt) 571 + // State stays 'running' — failed() only fires after queue retries are exhausted 572 + assert.equal(row.state, 'running') 573 + }) 574 + 575 + test('claims a row whose lease has expired (recovery from a crashed worker)', async ({ 576 + assert, 577 + }) => { 578 + await TrackedProfile.create({ 579 + did: TEST_DID, 580 + handle: 'test.bsky.social', 581 + firstSeenAt: Date.now(), 582 + }) 583 + await BackfillJobRow.create({ 584 + did: TEST_DID, 585 + startedAt: Date.now() - 60 * 60 * 1000, 586 + state: 'running', 587 + fetchedPosts: 0, 588 + // Previous worker crashed mid-execution: lease still set, but expired. 589 + lockToken: 'crashed-worker-token', 590 + lockExpiresAt: Date.now() - 10 * 60 * 1000, 591 + }) 592 + 593 + const atproto = makeMockAtprotoClient({ 594 + pages: [{ posts: [makeFeedViewPost(`at://${TEST_DID}/app.bsky.feed.post/a`)] }], 595 + }) 596 + const clickhouse = makeMockClickHouseStore() 597 + const job = makeJob(atproto, clickhouse) 598 + 599 + await job.execute() 600 + 601 + // Backfill ran to completion despite the stale lease left by a crashed worker 602 + assert.equal(clickhouse._calls.length, 1) 603 + const row = await BackfillJobRow.findOrFail(TEST_DID) 604 + assert.equal(row.state, 'done') 605 + assert.isNull(row.lockToken) 606 + }) 607 + 608 + test('refuses to claim a row whose lease is still live', async ({ assert }) => { 609 + await TrackedProfile.create({ 610 + did: TEST_DID, 611 + handle: 'test.bsky.social', 612 + firstSeenAt: Date.now(), 613 + }) 614 + await BackfillJobRow.create({ 615 + did: TEST_DID, 616 + startedAt: Date.now(), 617 + state: 'running', 618 + fetchedPosts: 7, 619 + // Live lease held by another worker — expires 5 minutes from now 620 + lockToken: 'other-worker-token', 621 + lockExpiresAt: Date.now() + 5 * 60 * 1000, 622 + }) 623 + 624 + const clickhouse = makeMockClickHouseStore() 625 + const atproto = makeMockAtprotoClient({ 626 + pages: [{ posts: [makeFeedViewPost(`at://${TEST_DID}/app.bsky.feed.post/a`)] }], 627 + }) 628 + const job = makeJob(atproto, clickhouse) 629 + 630 + await job.execute() 631 + 632 + assert.equal(clickhouse._calls.length, 0) 633 + const row = await BackfillJobRow.findOrFail(TEST_DID) 634 + // Other worker's lease untouched 635 + assert.equal(row.lockToken, 'other-worker-token') 636 + assert.equal(row.fetchedPosts, 7) 637 + }) 638 + 639 + test('extends lock_expires_at on each progress save', async ({ assert }) => { 640 + await TrackedProfile.create({ 641 + did: TEST_DID, 642 + handle: 'test.bsky.social', 643 + firstSeenAt: Date.now(), 644 + }) 645 + await BackfillJobRow.create({ 646 + did: TEST_DID, 647 + startedAt: Date.now(), 648 + state: 'running', 649 + fetchedPosts: 0, 650 + }) 651 + 652 + const beforeExecute = Date.now() 653 + const atproto = makeMockAtprotoClient({ 654 + pages: [{ posts: [makeFeedViewPost(`at://${TEST_DID}/app.bsky.feed.post/a`)] }], 655 + }) 656 + const job = makeJob(atproto, makeMockClickHouseStore()) 657 + 658 + await job.execute() 659 + 660 + // Job completed, so lease is cleared. We can't observe the in-flight 661 + // extension directly here — covered separately in the resume-after-crash 662 + // and concurrent-execution tests. Sanity-check the row exists post-clear. 663 + const row = await BackfillJobRow.findOrFail(TEST_DID) 664 + assert.isNull(row.lockExpiresAt) 665 + assert.isAtLeast(row.finishedAt!, beforeExecute) 666 + }) 667 + }) 668 + 669 + test.group('BackfillJob — checkpoint resume', (group) => { 670 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 671 + 672 + test('resumes from saved cursor + fetched_posts after a re-dispatch', async ({ assert }) => { 673 + await TrackedProfile.create({ 674 + did: TEST_DID, 675 + handle: 'test.bsky.social', 676 + firstSeenAt: Date.now(), 677 + }) 678 + // Simulates a half-finished backfill: a previous worker fetched 250 679 + // posts, saved cursor 'page-3', then crashed. 680 + await BackfillJobRow.create({ 681 + did: TEST_DID, 682 + startedAt: Date.now() - 60 * 60 * 1000, 683 + state: 'running', 684 + fetchedPosts: 250, 685 + cursor: 'page-3', 686 + }) 687 + 688 + const atproto = makeMockAtprotoClient({ 689 + pages: [ 690 + { posts: [makeFeedViewPost(`at://${TEST_DID}/app.bsky.feed.post/p4`)], cursor: 'page-4' }, 691 + { posts: [makeFeedViewPost(`at://${TEST_DID}/app.bsky.feed.post/p5`)] }, 692 + ], 693 + }) as unknown as ReturnType<typeof makeMockAtprotoClient> & { 694 + _getAuthorFeedCalls: Array<{ did: string; cursor?: string }> 695 + } 696 + const clickhouse = makeMockClickHouseStore() 697 + const job = makeJob(atproto, clickhouse) 698 + 699 + await job.execute() 700 + 701 + // First call resumes from the saved cursor — no restart from undefined 702 + assert.equal(atproto._getAuthorFeedCalls[0].cursor, 'page-3') 703 + // Total reflects checkpoint + new pages: 250 + 1 + 1 704 + const row = await BackfillJobRow.findOrFail(TEST_DID) 705 + assert.equal(row.fetchedPosts, 252) 706 + assert.equal(row.state, 'done') 707 + assert.isNull(row.cursor) 708 + }) 709 + 710 + test('persists cursor on each progress save so the next dispatch can resume', async ({ 711 + assert, 712 + }) => { 713 + await TrackedProfile.create({ 714 + did: TEST_DID, 715 + handle: 'test.bsky.social', 716 + firstSeenAt: Date.now(), 717 + }) 718 + await BackfillJobRow.create({ 719 + did: TEST_DID, 720 + startedAt: Date.now(), 721 + state: 'running', 722 + fetchedPosts: 0, 723 + }) 724 + 725 + // Page 1 returns a cursor; ClickHouse insert for page 2 throws so we can 726 + // observe the cursor that was checkpointed after page 1. 727 + let pageIndex = 0 728 + const atproto = { 729 + async getAuthorFeed() { 730 + const i = pageIndex++ 731 + if (i === 0) { 732 + return { 733 + posts: [makeFeedViewPost(`at://${TEST_DID}/app.bsky.feed.post/p1`)], 734 + cursor: 'cursor-after-page-1', 735 + } 736 + } 737 + return { 738 + posts: [makeFeedViewPost(`at://${TEST_DID}/app.bsky.feed.post/p2`)], 739 + cursor: 'cursor-after-page-2', 740 + } 741 + }, 742 + async getPosts() { 743 + throw new Error('should not be called') 744 + }, 745 + } as unknown as AtprotoClient 746 + 747 + let chCallIndex = 0 748 + const clickhouse = { 749 + async insertPostSnapshots() { 750 + const i = chCallIndex++ 751 + if (i === 1) throw new Error('ClickHouse unavailable') 752 + }, 753 + } as unknown as ClickHouseStore 754 + const job = makeJob(atproto, clickhouse) 755 + 756 + await assert.rejects(() => job.execute(), /ClickHouse unavailable/) 757 + 758 + const row = await BackfillJobRow.findOrFail(TEST_DID) 759 + // Cursor saved after page 1 (the successful page) — not after page 2 760 + assert.equal(row.cursor, 'cursor-after-page-1') 761 + assert.equal(row.fetchedPosts, 1) 762 + // Lease released so a retry can claim 763 + assert.isNull(row.lockToken) 764 + }) 765 + }) 766 + 516 767 test.group('BackfillJob — error cases', (group) => { 517 768 group.each.setup(() => testUtils.db().withGlobalTransaction()) 518 769