See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add backfill:unstick command to recover stalled backfill jobs

After a server restart, backfill jobs can get stuck with a held lock token
that prevents retries. This adds an ace command that finds stalled jobs
(running + locked + older than N minutes), resets their lock, and
re-dispatches them via the queue. Includes --dry-run and --minutes flags.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+272
+37
app/lib/backfill_unstick.ts
··· 1 + import BackfillJobRow from '#models/backfill_job' 2 + 3 + export interface UnstickOptions { 4 + /** Minutes since startedAt before a locked job is considered stalled. Default: 60 */ 5 + staleMinutes?: number 6 + /** Called for each unstuck DID to re-enqueue the backfill job. */ 7 + dispatch: (did: string) => Promise<void> 8 + } 9 + 10 + export interface UnstickResult { 11 + unstuck: number 12 + } 13 + 14 + /** 15 + * Finds backfill jobs that are stuck — state='running' with a non-null 16 + * lockToken and startedAt older than the threshold — resets their lock so 17 + * they can be re-dispatched. 18 + */ 19 + export async function unstickBackfillJobs(options: UnstickOptions): Promise<UnstickResult> { 20 + const staleMinutes = options.staleMinutes ?? 60 21 + const cutoff = Date.now() - staleMinutes * 60 * 1000 22 + 23 + const stalled = await BackfillJobRow.query() 24 + .where('state', 'running') 25 + .whereNotNull('lockToken') 26 + .where('startedAt', '<', cutoff) 27 + 28 + for (const row of stalled) { 29 + row.lockToken = null 30 + row.fetchedPosts = 0 31 + row.startedAt = Date.now() 32 + await row.save() 33 + await options.dispatch(row.did) 34 + } 35 + 36 + return { unstuck: stalled.length } 37 + }
+59
commands/backfill_unstick.ts
··· 1 + import { BaseCommand, flags } from '@adonisjs/core/ace' 2 + import { CommandOptions } from '@adonisjs/core/types/ace' 3 + import { unstickBackfillJobs } from '#lib/backfill_unstick' 4 + import BackfillJob from '#jobs/backfill_job' 5 + 6 + export default class BackfillUnstick extends BaseCommand { 7 + static commandName = 'backfill:unstick' 8 + static description = 'Reset stalled backfill jobs and re-dispatch them' 9 + static options: CommandOptions = { startApp: true } 10 + 11 + @flags.number({ 12 + description: 'Only unstick jobs started more than N minutes ago (default: 60)', 13 + default: 60, 14 + }) 15 + declare minutes: number 16 + 17 + @flags.boolean({ 18 + description: 'Preview which jobs would be unstuck without making changes', 19 + default: false, 20 + }) 21 + declare dryRun: boolean 22 + 23 + async run() { 24 + if (this.dryRun) { 25 + const { default: BackfillJobRow } = await import('#models/backfill_job') 26 + const cutoff = Date.now() - this.minutes * 60 * 1000 27 + const stalled = await BackfillJobRow.query() 28 + .where('state', 'running') 29 + .whereNotNull('lockToken') 30 + .where('startedAt', '<', cutoff) 31 + 32 + if (stalled.length === 0) { 33 + this.logger.info('No stalled jobs found.') 34 + return 35 + } 36 + 37 + this.logger.info(`Would unstick ${stalled.length} job(s):`) 38 + for (const row of stalled) { 39 + const stalledFor = Math.round((Date.now() - row.startedAt) / 60_000) 40 + this.logger.info(` ${row.did} — stalled ~${stalledFor}m, fetched ${row.fetchedPosts} posts`) 41 + } 42 + return 43 + } 44 + 45 + const result = await unstickBackfillJobs({ 46 + staleMinutes: this.minutes, 47 + dispatch: async (did) => { 48 + await BackfillJob.dispatch({ did }) 49 + this.logger.info(` dispatched ${did}`) 50 + }, 51 + }) 52 + 53 + if (result.unstuck === 0) { 54 + this.logger.info('No stalled jobs found.') 55 + } else { 56 + this.logger.success(`Unstuck and re-dispatched ${result.unstuck} job(s).`) 57 + } 58 + } 59 + }
+176
tests/unit/commands/backfill_unstick.spec.ts
··· 1 + import { test } from '@japa/runner' 2 + import testUtils from '@adonisjs/core/services/test_utils' 3 + import BackfillJobRow from '#models/backfill_job' 4 + import TrackedProfile from '#models/tracked_profile' 5 + import { unstickBackfillJobs } from '#lib/backfill_unstick' 6 + 7 + // --------------------------------------------------------------------------- 8 + // Helpers 9 + // --------------------------------------------------------------------------- 10 + 11 + const DID_STALLED = 'did:plc:stalled1' 12 + const DID_STALLED_2 = 'did:plc:stalled2' 13 + const DID_RUNNING_LEGIT = 'did:plc:running' 14 + const DID_DONE = 'did:plc:done' 15 + const DID_FAILED = 'did:plc:failed' 16 + 17 + async function seedProfile(did: string) { 18 + await TrackedProfile.firstOrCreate( 19 + { did }, 20 + { did, handle: `${did.split(':')[2]}.test`, firstSeenAt: Date.now() } 21 + ) 22 + } 23 + 24 + // --------------------------------------------------------------------------- 25 + // Tests 26 + // --------------------------------------------------------------------------- 27 + 28 + test.group('unstickBackfillJobs — identifies stalled jobs', (group) => { 29 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 30 + 31 + test('resets jobs that are running with a lock token older than threshold', async ({ assert }) => { 32 + await seedProfile(DID_STALLED) 33 + await BackfillJobRow.create({ 34 + did: DID_STALLED, 35 + startedAt: Date.now() - 60 * 60 * 1000, // 1 hour ago 36 + state: 'running', 37 + fetchedPosts: 42, 38 + lockToken: 'stale-lock-token', 39 + }) 40 + 41 + const dispatched: string[] = [] 42 + const result = await unstickBackfillJobs({ 43 + staleMinutes: 30, 44 + dispatch: async (did) => { dispatched.push(did) }, 45 + }) 46 + 47 + assert.equal(result.unstuck, 1) 48 + 49 + const row = await BackfillJobRow.findOrFail(DID_STALLED) 50 + assert.isNull(row.lockToken) 51 + assert.equal(row.state, 'running') 52 + assert.equal(row.fetchedPosts, 0) 53 + 54 + assert.deepEqual(dispatched, [DID_STALLED]) 55 + }) 56 + 57 + test('does not touch jobs started recently (within threshold)', async ({ assert }) => { 58 + await seedProfile(DID_RUNNING_LEGIT) 59 + await BackfillJobRow.create({ 60 + did: DID_RUNNING_LEGIT, 61 + startedAt: Date.now() - 5 * 60 * 1000, // 5 minutes ago 62 + state: 'running', 63 + fetchedPosts: 10, 64 + lockToken: 'active-lock', 65 + }) 66 + 67 + const dispatched: string[] = [] 68 + const result = await unstickBackfillJobs({ 69 + staleMinutes: 30, 70 + dispatch: async (did) => { dispatched.push(did) }, 71 + }) 72 + 73 + assert.equal(result.unstuck, 0) 74 + assert.lengthOf(dispatched, 0) 75 + 76 + const row = await BackfillJobRow.findOrFail(DID_RUNNING_LEGIT) 77 + assert.equal(row.lockToken, 'active-lock') 78 + assert.equal(row.fetchedPosts, 10) 79 + }) 80 + 81 + test('does not touch done or failed jobs', async ({ assert }) => { 82 + await seedProfile(DID_DONE) 83 + await seedProfile(DID_FAILED) 84 + 85 + await BackfillJobRow.create({ 86 + did: DID_DONE, 87 + startedAt: Date.now() - 2 * 60 * 60 * 1000, 88 + state: 'done', 89 + fetchedPosts: 500, 90 + lockToken: null, 91 + }) 92 + await BackfillJobRow.create({ 93 + did: DID_FAILED, 94 + startedAt: Date.now() - 2 * 60 * 60 * 1000, 95 + state: 'failed', 96 + fetchedPosts: 0, 97 + lockToken: null, 98 + error: 'some error', 99 + }) 100 + 101 + const dispatched: string[] = [] 102 + const result = await unstickBackfillJobs({ 103 + staleMinutes: 30, 104 + dispatch: async (did) => { dispatched.push(did) }, 105 + }) 106 + 107 + assert.equal(result.unstuck, 0) 108 + assert.lengthOf(dispatched, 0) 109 + }) 110 + 111 + test('does not touch running jobs without a lock token', async ({ assert }) => { 112 + await seedProfile(DID_RUNNING_LEGIT) 113 + await BackfillJobRow.create({ 114 + did: DID_RUNNING_LEGIT, 115 + startedAt: Date.now() - 60 * 60 * 1000, 116 + state: 'running', 117 + fetchedPosts: 0, 118 + lockToken: null, // no lock — hasn't been picked up yet 119 + }) 120 + 121 + const dispatched: string[] = [] 122 + const result = await unstickBackfillJobs({ 123 + staleMinutes: 30, 124 + dispatch: async (did) => { dispatched.push(did) }, 125 + }) 126 + 127 + assert.equal(result.unstuck, 0) 128 + assert.lengthOf(dispatched, 0) 129 + }) 130 + 131 + test('unsticks multiple stalled jobs', async ({ assert }) => { 132 + await seedProfile(DID_STALLED) 133 + await seedProfile(DID_STALLED_2) 134 + 135 + for (const did of [DID_STALLED, DID_STALLED_2]) { 136 + await BackfillJobRow.create({ 137 + did, 138 + startedAt: Date.now() - 2 * 60 * 60 * 1000, 139 + state: 'running', 140 + fetchedPosts: 10, 141 + lockToken: `lock-${did}`, 142 + }) 143 + } 144 + 145 + const dispatched: string[] = [] 146 + const result = await unstickBackfillJobs({ 147 + staleMinutes: 30, 148 + dispatch: async (did) => { dispatched.push(did) }, 149 + }) 150 + 151 + assert.equal(result.unstuck, 2) 152 + assert.lengthOf(dispatched, 2) 153 + assert.includeMembers(dispatched, [DID_STALLED, DID_STALLED_2]) 154 + }) 155 + 156 + test('uses default 60 minutes when staleMinutes not provided', async ({ assert }) => { 157 + await seedProfile(DID_STALLED) 158 + await BackfillJobRow.create({ 159 + did: DID_STALLED, 160 + startedAt: Date.now() - 45 * 60 * 1000, // 45 min ago 161 + state: 'running', 162 + fetchedPosts: 5, 163 + lockToken: 'stale', 164 + }) 165 + 166 + const dispatched: string[] = [] 167 + 168 + // Should NOT be unstuck with default (60 min) 169 + const result = await unstickBackfillJobs({ 170 + dispatch: async (did) => { dispatched.push(did) }, 171 + }) 172 + 173 + assert.equal(result.unstuck, 0) 174 + assert.lengthOf(dispatched, 0) 175 + }) 176 + })