See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix duplicate BackfillJob execution via atomic lock token

The database queue driver's stalled-job recovery can re-dispatch
long-running backfill jobs while the original is still active, causing
progress to jump erratically. Add a nullable lock_token column to
backfill_jobs and claim it with an atomic compare-and-swap (CAS) UPDATE
at the start of execute(): the UPDATE sets lock_token only while it is
still NULL, so only one instance wins and the other bails out immediately.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+112
+19
app/jobs/backfill_job.ts
··· 1 + import { randomUUID } from 'node:crypto' 1 2 import { inject } from '@adonisjs/core' 2 3 import { Job } from '@adonisjs/queue' 3 4 import { AtprotoClient, parseGetAuthorFeedResponse } from '#lib/atproto/index' ··· 81 82 return 82 83 } 83 84 85 + // Atomically claim exclusive execution. The database queue driver may 86 + // re-dispatch long-running jobs if it considers them stalled, so two 87 + // instances of the same job can race. We use a random lock token with 88 + // a compare-and-swap UPDATE: only one instance can set lock_token when 89 + // it is still NULL, and SQLite serialises writes so exactly one wins. 90 + const lockToken = randomUUID() 91 + const claimed = await BackfillJobRow.query() 92 + .where('did', did) 93 + .where('state', 'running') 94 + .whereNull('lockToken') 95 + .update({ lockToken }) 96 + 97 + // claimed is [rowsAffected] — if 0, another execution already has the lock 98 + if ((Array.isArray(claimed) ? claimed[0] : claimed) === 0) { 99 + return 100 + } 101 + 84 102 // --------------------------------------------------------------------------- 85 103 // Main backfill loop 86 104 // --------------------------------------------------------------------------- ··· 135 153 jobRow.state = 'failed' 136 154 jobRow.error = error.message 137 155 jobRow.finishedAt = Date.now() 156 + jobRow.lockToken = null 138 157 await jobRow.save() 139 158 } 140 159 }
+3
app/models/backfill_job.ts
··· 27 27 28 28 @column() 29 29 declare error: string | null 30 + 31 + @column() 32 + declare lockToken: string | null 30 33 }
+17
database/migrations/1775991741240_add_lock_token_to_backfill_jobs.ts
import { BaseSchema } from '@adonisjs/lucid/schema'

// Name of the column this migration adds in up() and removes in down().
const LOCK_TOKEN_COLUMN = 'lock_token'

/**
 * Migration for the `backfill_jobs` table: `up()` adds a nullable text
 * column named `lock_token`, and `down()` drops that same column again.
 */
export default class extends BaseSchema {
  protected tableName = 'backfill_jobs'

  /** Forward step: add the nullable `lock_token` text column. */
  async up() {
    this.schema.alterTable(this.tableName, (backfillJobs) => {
      backfillJobs.text(LOCK_TOKEN_COLUMN).nullable()
    })
  }

  /** Rollback step: drop the `lock_token` column. */
  async down() {
    this.schema.alterTable(this.tableName, (backfillJobs) => {
      backfillJobs.dropColumn(LOCK_TOKEN_COLUMN)
    })
  }
}
+73
tests/unit/jobs/backfill_job.spec.ts
··· 412 412 }) 413 413 }) 414 414 415 + test.group('BackfillJob — lock token prevents concurrent execution', (group) => { 416 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 417 + 418 + test('second execution bails out when lock is already held', async ({ assert }) => { 419 + await User.create({ did: TEST_DID, handle: 'test.bsky.social', firstSeenAt: Date.now() }) 420 + await BackfillJobRow.create({ 421 + did: TEST_DID, 422 + startedAt: Date.now(), 423 + state: 'running', 424 + fetchedPosts: 0, 425 + }) 426 + 427 + // First job: slow — uses a promise we control to keep it running 428 + let resolveFirstPage!: () => void 429 + const firstPagePromise = new Promise<void>((r) => { 430 + resolveFirstPage = r 431 + }) 432 + 433 + const firstJobAtproto = { 434 + async getAuthorFeed() { 435 + await firstPagePromise 436 + return { 437 + posts: [makeFeedViewPost(`at://${TEST_DID}/app.bsky.feed.post/a`)], 438 + cursor: undefined, 439 + } 440 + }, 441 + async getPosts() { 442 + throw new Error('should not be called') 443 + }, 444 + } as unknown as AtprotoClient 445 + 446 + // Second job: fast — would insert data if it ran 447 + const secondJobClickhouse = makeMockClickHouseStore() 448 + const secondJobAtproto = makeMockAtprotoClient({ 449 + pages: [ 450 + { 451 + posts: [makeFeedViewPost(`at://${TEST_DID}/app.bsky.feed.post/b`)], 452 + }, 453 + ], 454 + }) 455 + 456 + const clickhouse = makeMockClickHouseStore() 457 + 458 + const job1 = makeJob(firstJobAtproto, clickhouse) 459 + const job2 = makeJob(secondJobAtproto, secondJobClickhouse) 460 + 461 + // Start job1 — it will claim the lock then block on getAuthorFeed 462 + const job1Promise = job1.execute() 463 + 464 + // Give job1 a tick to claim the lock 465 + await new Promise((r) => setTimeout(r, 50)) 466 + 467 + // Job2 should bail out immediately because lock is already held 468 + await job2.execute() 469 + 470 + // Job2's ClickHouse store should never have been called 471 + 
assert.equal(secondJobClickhouse._calls.length, 0) 472 + 473 + // Verify the lock was claimed (not null) 474 + const midRow = await BackfillJobRow.findOrFail(TEST_DID) 475 + assert.isNotNull(midRow.lockToken) 476 + 477 + // Unblock job1 so it can finish 478 + resolveFirstPage() 479 + await job1Promise 480 + 481 + // Job1 completed normally 482 + const finalRow = await BackfillJobRow.findOrFail(TEST_DID) 483 + assert.equal(finalRow.state, 'done') 484 + assert.equal(finalRow.fetchedPosts, 1) 485 + }) 486 + }) 487 + 415 488 test.group('BackfillJob — error cases', (group) => { 416 489 group.each.setup(() => testUtils.db().withGlobalTransaction()) 417 490