alf: the atproto Latency Fabric alf.fly.dev/
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 489 lines 18 kB view raw
// ABOUTME: Background scheduler that polls for ready drafts and publishes them to the PDS

import { Agent } from '@atproto/api';
import { randomUUID } from 'node:crypto';
import type { ServiceConfig } from './config.js';
import { createLogger } from './logger.js';
import { schedulerWakeupsTotal, schedulerPublishesTotal, publishDurationSeconds } from './metrics.js';
import {
  getDb,
  getReadyDrafts,
  getNextScheduledAt,
  claimDraftForPublishing,
  markDraftPublished,
  markDraftFailed,
  incrementRetryCount,
  getDraft,
  getDraftRawRow,
  getUserAuthorization,
  getBlobsByCids,
  deleteBlobs,
  getRawSchedule,
  incrementScheduleFireCount,
  updateScheduleNextDraft,
  updateScheduleStatus,
  createDraft,
} from './storage.js';
import { getOAuthClient } from './oauth.js';
import type { DraftRow } from './schema.js';
import { extractDidFromAtUri } from './schema.js';
import { computeNextOccurrence, getOccurrenceRecord } from '@newpublic/recurrence';
import type { RecurrenceRule } from '@newpublic/recurrence';

const logger = createLogger('Scheduler');

// Maximum publish attempts before a draft is marked permanently failed.
const MAX_RETRIES = 3;

// Backoff delays for retries (milliseconds); indexed by (retryCount - 1).
const RETRY_BACKOFF_MS = [
  1 * 60 * 1000, // 1 minute
  5 * 60 * 1000, // 5 minutes
  15 * 60 * 1000, // 15 minutes
];

// Module-level scheduler state (single scheduler instance per process).
let schedulerRunning = false;
// Pending wakeup timer, or null when no wakeup is armed.
let wakeupTimeout: ReturnType<typeof setTimeout> | null = null;
// Config captured at startScheduler(); null while stopped.
let currentConfig: ServiceConfig | null = null;
// Monotonic counter used to invalidate in-flight scheduleNextWakeup() calls.
let scheduleGeneration = 0;

/**
 * Walk a record JSON tree and collect all blob CIDs.
 *
 * A blob node is any object shaped like { $type: "blob", ref: { $link: <cid> } };
 * the recursion descends through every object/array value (Object.values works
 * for both), so nested embeds are covered.
 *
 * @param value - any JSON value (non-objects yield an empty list)
 * @returns the CID strings found, in traversal order (may contain duplicates)
 */
function collectBlobCids(value: unknown): string[] {
  if (typeof value !== 'object' || value === null) return [];
  const obj = value as Record<string, unknown>;
  if (obj['$type'] === 'blob' && typeof obj['ref'] === 'object' && obj['ref'] !== null) {
    const ref = obj['ref'] as Record<string, unknown>;
    const link = ref['$link'];
    if (typeof link === 'string') return [link];
  }
  return Object.values(obj).flatMap(collectBlobCids);
}

/**
 * After a successful publish, handle schedule chaining:
 * increment fire_count, compute the next occurrence, and create the next draft.
 *
 * No-op when the draft is not attached to a schedule, or the schedule is no
 * longer 'active'. On an invalid recurrence rule or a failed draft creation the
 * schedule is moved to 'error'. Exhausted series are marked 'completed' (for
 * 'once' rules) or 'cancelled' (all other rule types).
 *
 * @param draftRow - raw DB row of the draft that was just published
 */
async function handleScheduleChaining(draftRow: DraftRow): Promise<void> {
  const scheduleId = draftRow.schedule_id;
  /* istanbul ignore next */
  if (!scheduleId) return;

  const schedule = await getRawSchedule(scheduleId);
  if (!schedule || schedule.status !== 'active') return;

  const userDid = draftRow.user_did;
  const now = new Date();

  // If dynamic content, fetch it (just for logging — it was already published)
  if (schedule.content_url) {
    try {
      const url = new URL(schedule.content_url);
      // fireCount mirrors the value the publish-time fetch used: the
      // pre-increment count + 1 (incrementScheduleFireCount runs below).
      url.searchParams.set('fireCount', String(schedule.fire_count + 1));
      url.searchParams.set('scheduledAt', new Date(Number(draftRow.scheduled_at ?? /* istanbul ignore next */ Date.now())).toISOString());
      logger.info('Dynamic schedule published', { scheduleId, contentUrl: url.toString() });
    } catch (err) {
      /* istanbul ignore next */
      logger.warn('Failed to build content URL for logging', { scheduleId, error: String(err) });
    }
  }

  // Increment fire count
  await incrementScheduleFireCount(scheduleId);

  // Compute next occurrence
  let rule: RecurrenceRule;
  try {
    rule = JSON.parse(schedule.recurrence_rule) as RecurrenceRule;
  } catch {
    logger.error('Invalid recurrence rule JSON in schedule', undefined, { scheduleId });
    await updateScheduleStatus(scheduleId, 'error');
    return;
  }

  const nextFireAt = computeNextOccurrence(rule, now);
  if (!nextFireAt) {
    // Series exhausted — 'once' schedules complete naturally; others are cancelled
    const isOnce = rule.rule.type === 'once';
    if (isOnce) {
      logger.info('Once schedule completed, marking completed', { scheduleId });
      await updateScheduleStatus(scheduleId, 'completed');
    } else {
      logger.info('Schedule series exhausted, marking cancelled', { scheduleId });
      await updateScheduleStatus(scheduleId, 'cancelled');
    }
    await updateScheduleNextDraft(scheduleId, null);
    return;
  }

  // For dynamic schedules, create a draft with null record (fetched at publish time)
  // For static schedules, create a draft with the template record
  const nextRecord = schedule.content_url ? null : (schedule.record ? JSON.parse(schedule.record) as Record<string, unknown> : /* istanbul ignore next */ null);

  // Build AT-URI for next draft
  const rkey = `sched-${Date.now()}-${randomUUID().substring(0, 8)}`;
  const collection = schedule.collection;
  const uri = `at://${userDid}/${collection}/${rkey}`;

  try {
    await createDraft({
      uri,
      userDid,
      collection,
      rkey,
      record: nextRecord,
      recordCid: null, // Will be computed at publish time for static records
      action: 'create',
      scheduledAt: nextFireAt.getTime(),
      scheduleId,
    });

    await updateScheduleNextDraft(scheduleId, uri);
    // Wake the scheduler so the new draft's fire time is taken into account.
    notifyScheduler();
    logger.info('Next scheduled draft created', { scheduleId, uri, nextFireAt: nextFireAt.toISOString() });
  } catch (err) {
    logger.error('Failed to create next schedule draft', err instanceof Error ? err : /* istanbul ignore next */ undefined, { scheduleId });
    await updateScheduleStatus(scheduleId, 'error');
  }
}

/**
 * Publish a single draft to the user's PDS.
 * This is the shared logic used by both the scheduler and publishPost().
 */
export async function publishDraft(uri: string, config: ServiceConfig): Promise<void> {
  // Atomically claim the draft — only one worker may proceed per draft.
  const claimed = await claimDraftForPublishing(uri);
  if (!claimed) {
    logger.warn('Draft already claimed or not in publishable state', { uri });
    return;
  }

  const draft = await getDraft(uri);
  if (!draft) {
    logger.error('Draft not found after claiming', undefined, { uri });
    return;
  }

  logger.info('Publishing draft', { uri, action: draft.action });

  let userDid: string;
  try {
    userDid = extractDidFromAtUri(uri);
  } catch {
    logger.error('Invalid AT-URI in database, skipping draft', undefined, { uri });
    return;
  }
  const publishStart = Date.now();

  try {
    // Look up user authorization — OAuth is the only supported auth type
    const authRecord = await getUserAuthorization(userDid);

    if (!authRecord || authRecord.auth_type !== 'oauth') {
      await markDraftFailed(uri, 'no_oauth_authorization', false);
      logger.error('No OAuth authorization found for user', undefined, { uri, userDid });
      return;
    }

    let agent: Agent;

    // OAuth-type auth: restore OAuth + DPoP session
    const oauthClient = getOAuthClient();
    try {
      const session = await oauthClient.restore(userDid);
      agent = new Agent(session);
    } catch (oauthErr) {
      const errMsg = oauthErr instanceof Error ? oauthErr.message : /* istanbul ignore next */ String(oauthErr);
      const errCode = (oauthErr as { error?: string }).error ?? '';
      // OAuthResponseError from a failed token refresh has .error on the cause
      const causeCode = ((oauthErr as { cause?: { error?: string } }).cause?.error) ?? '';
      // Heuristic match on message/code strings: these failures require the
      // user to re-authenticate, so retrying the draft would never succeed.
      const isPermanentAuthFailure =
        errMsg.includes('revoked') ||
        errMsg.includes('invalid_grant') ||
        errCode === 'invalid_token' ||
        errMsg.includes('invalid_token') ||
        causeCode === 'invalid_grant' ||
        // TokenRefreshError wraps the underlying OAuth error
        errMsg.toLowerCase().includes('token refresh') ||
        errMsg.toLowerCase().includes('refresh failed');
      if (isPermanentAuthFailure) {
        await markDraftFailed(uri, 'oauth_revoked', false);
        logger.error('OAuth token revoked or refresh failed for draft', undefined, { uri, errMsg, errCode, causeCode });
        return;
      }
      // Unknown OAuth error: let the outer catch apply retry/backoff handling.
      throw oauthErr;
    }

    // Execute the PDS call based on the draft action
    if (draft.action === 'delete') {
      await agent.com.atproto.repo.deleteRecord({
        repo: userDid,
        collection: draft.collection,
        rkey: draft.rkey,
      });
    } else {
      // For dynamic schedule drafts, fetch content from content_url at publish time
      let record: Record<string, unknown>;
      const draftRaw = await getDraftRawRow(uri);
      const scheduleId = draftRaw?.schedule_id;

      if (scheduleId && !draftRaw?.record) {
        // Dynamic schedule: fetch content URL
        const schedule = await getRawSchedule(scheduleId);
        if (!schedule?.content_url) {
          await markDraftFailed(uri, 'dynamic_schedule_missing_content_url', false);
          return;
        }
        try {
          const url = new URL(schedule.content_url);
          // fire_count has not been incremented yet for this occurrence, so +1.
          url.searchParams.set('fireCount', String(schedule.fire_count + 1));
          url.searchParams.set('scheduledAt', new Date(Number(draftRaw?.scheduled_at ?? /* istanbul ignore next */ Date.now())).toISOString());
          const resp = await fetch(url.toString(), {
            headers: { 'Accept': 'application/json' },
          });
          if (!resp.ok) {
            throw new Error(`Content URL returned ${resp.status}`);
          }
          // NOTE(review): response JSON is used as the record without schema
          // validation — assumes the content endpoint is trusted; verify.
          record = await resp.json() as Record<string, unknown>;
        } catch (fetchErr) {
          const errMsg = fetchErr instanceof Error ? fetchErr.message : /* istanbul ignore next */ String(fetchErr);
          logger.error('Failed to fetch dynamic schedule content', fetchErr instanceof Error ? fetchErr : /* istanbul ignore next */ undefined, { uri, scheduleId });
          // Content fetch failures are terminal for both draft and schedule.
          await markDraftFailed(uri, `content_url_fetch_failed: ${errMsg}`, false);
          await updateScheduleStatus(scheduleId, 'error');
          return;
        }
      } else {
        record = JSON.parse(
          /* istanbul ignore next */ (await getDraftRecord(uri)) ?? '{}',
        ) as Record<string, unknown>;

        // Check for override_payload exception on this occurrence
        if (scheduleId && draftRaw?.scheduled_at) {
          const schedule = await getRawSchedule(scheduleId);
          if (schedule) {
            try {
              const parsedRule = JSON.parse(schedule.recurrence_rule) as RecurrenceRule;
              const overrideRecord = getOccurrenceRecord(parsedRule, new Date(Number(draftRaw.scheduled_at)));
              if (overrideRecord) record = overrideRecord;
            } catch {
              // Invalid rule JSON — use draft record as-is
            }
          }
        }
      }

      // Use scheduled time as the published createdAt so the post appears
      // in the feed at the intended time rather than at draft creation time.
      // NOTE(review): assumes draft.scheduledAt is already an ISO datetime
      // string (atproto createdAt format) — confirm against getDraft().
      if (draft.scheduledAt) {
        record.createdAt = draft.scheduledAt;
        record.scheduledAt = draft.scheduledAt;
      }

      // Re-upload any stored blobs before publishing.
      // Blobs are deleted from local storage only AFTER the PDS write succeeds,
      // so that retries can re-upload them if the record commit fails.
      const blobCids = collectBlobCids(record);
      if (blobCids.length > 0) {
        logger.info('Re-uploading blobs for draft', { uri, blobCount: blobCids.length });
        const blobs = await getBlobsByCids(userDid, blobCids);
        for (const blob of blobs) {
          try {
            await agent.uploadBlob(blob.data, { encoding: blob.mimeType });
            logger.info('Blob re-uploaded', { cid: blob.cid });
          } catch (blobErr) {
            logger.warn('Failed to re-upload blob (may already exist on PDS)', { cid: blob.cid });
            // Non-fatal: PDS may already have the blob from a previous attempt
            void blobErr;
          }
        }
      }

      if (draft.action === 'create') {
        await agent.com.atproto.repo.createRecord({
          repo: userDid,
          collection: draft.collection,
          rkey: draft.rkey,
          record,
        });
      } else {
        // put
        await agent.com.atproto.repo.putRecord({
          repo: userDid,
          collection: draft.collection,
          rkey: draft.rkey,
          record,
        });
      }

      // Delete local blob copies now that the record is committed on the PDS
      if (blobCids.length > 0) {
        await deleteBlobs(blobCids);
      }
    }

    await markDraftPublished(uri);
    publishDurationSeconds.observe((Date.now() - publishStart) / 1000);
    schedulerPublishesTotal.inc({ result: 'success' });
    logger.info('Draft published successfully', { uri });

    // Handle schedule chaining (create next draft for recurring schedules)
    const rawRow = await getDraftRawRow(uri);
    if (rawRow?.schedule_id) {
      try {
        await handleScheduleChaining(rawRow);
      } catch (chainErr) {
        // Chaining failures never un-publish the draft; they are logged only.
        logger.error('Error in schedule chaining (non-fatal)', chainErr instanceof Error ? chainErr : /* istanbul ignore next */ undefined, { uri });
      }
    }

    // Call post-publish webhook if configured (env var wins over config)
    const webhookUrl = process.env.POST_PUBLISH_WEBHOOK_URL ?? config.postPublishWebhookUrl;
    if (webhookUrl) {
      try {
        await fetch(webhookUrl, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ uri, publishedAt: new Date().toISOString() }),
        });
      } catch (webhookErr) {
        logger.warn('Post-publish webhook failed (non-fatal)', { uri, error: String(webhookErr) });
      }
    }
  } catch (err) {
    const errMsg = err instanceof Error ? err.message : /* istanbul ignore next */ String(err);
    const errCode = (err as { error?: string }).error ?? '';
    logger.error('Failed to publish draft', err instanceof Error ? err : /* istanbul ignore next */ undefined, { uri });

    // Permanent auth failures — re-authentication required, retrying won't help
    if (errCode === 'invalid_token' || errMsg.includes('invalid_grant') || errMsg.includes('revoked')) {
      await markDraftFailed(uri, `oauth_failure: ${errMsg}`, false);
      schedulerPublishesTotal.inc({ result: 'failed' });
      logger.warn('Draft failed due to OAuth auth error, marked as permanently failed', { uri });
      return;
    }

    // Increment retry count
    const retryCount = await incrementRetryCount(uri);

    if (retryCount < MAX_RETRIES) {
      // retryCount is the post-increment value, so the first retry uses index 0.
      const backoffMs = RETRY_BACKOFF_MS[retryCount - 1] ??
        /* istanbul ignore next */ RETRY_BACKOFF_MS[RETRY_BACKOFF_MS.length - 1];
      const retryAt = Date.now() + backoffMs;
      await markDraftFailed(uri, errMsg, true, retryAt);
      schedulerPublishesTotal.inc({ result: 'retry' });
      logger.info('Draft scheduled for retry', { uri, retryCount, retryAt: new Date(retryAt).toISOString() });
    } else {
      await markDraftFailed(uri, errMsg, false);
      schedulerPublishesTotal.inc({ result: 'failed' });
      logger.warn('Draft exhausted retries, marked as failed', { uri, retryCount });
    }
  }
}

/**
 * Get the raw record JSON for a draft (needed for PDS calls).
 *
 * @param uri - the draft's AT-URI (primary key in the drafts table)
 * @returns the stored record JSON string, or null when absent
 */
async function getDraftRecord(uri: string): Promise<string | null> {
  const row = await getDb()
    .selectFrom('drafts')
    .select('record')
    .where('uri', '=', uri)
    .executeTakeFirst();
  /* istanbul ignore next */
  return row?.record ?? null;
}

/**
 * Poll for ready drafts and publish them sequentially.
 *
 * Errors are contained per draft: one failing draft does not stop the rest
 * of the batch, and a failed DB read aborts the whole poll quietly.
 *
 * @param config - service config forwarded to publishDraft()
 */
async function poll(config: ServiceConfig): Promise<void> {
  schedulerWakeupsTotal.inc();
  let readyDrafts: DraftRow[] = [];
  try {
    readyDrafts = await getReadyDrafts();
  } catch (err) {
    logger.error('Failed to poll for ready drafts', err instanceof Error ? err : /* istanbul ignore next */ undefined);
    return;
  }

  /* istanbul ignore next */
  if (readyDrafts.length === 0) return;

  logger.info(`Found ${readyDrafts.length} draft(s) ready to publish`);

  for (const draft of readyDrafts) {
    try {
      await publishDraft(draft.uri, config);
    } catch (err) /* istanbul ignore next */ {
      logger.error('Unhandled error publishing draft', err instanceof Error ? err : undefined, {
        uri: draft.uri,
      });
    }
  }
}

/**
 * Schedule the next wakeup based on the earliest pending scheduled draft.
 * Cancels any existing wakeup timer before setting the new one.
429 */ 430async function scheduleNextWakeup(): Promise<void> { 431 if (wakeupTimeout) { 432 clearTimeout(wakeupTimeout); 433 wakeupTimeout = null; 434 } 435 /* istanbul ignore next */ 436 if (!schedulerRunning || !currentConfig) return; 437 438 const myGen = ++scheduleGeneration; 439 const nextAt = await getNextScheduledAt(); 440 441 // A newer scheduleNextWakeup() call superseded us while we were awaiting the DB 442 if (myGen !== scheduleGeneration) return; 443 if (nextAt === null) return; 444 445 const delay = Math.max(0, nextAt - Date.now()); 446 wakeupTimeout = setTimeout(() => { 447 wakeupTimeout = null; 448 void poll(currentConfig!).then(() => scheduleNextWakeup()); 449 }, delay); 450 wakeupTimeout.unref(); 451} 452 453/** 454 * Notify the scheduler that the set of scheduled drafts has changed. 455 * Recalculates the next wakeup time immediately. 456 */ 457export function notifyScheduler(): void { 458 void scheduleNextWakeup(); 459} 460 461/** 462 * Start the background scheduler 463 */ 464export function startScheduler(config: ServiceConfig): void { 465 if (schedulerRunning) { 466 logger.warn('Scheduler already running'); 467 return; 468 } 469 470 schedulerRunning = true; 471 currentConfig = config; 472 logger.info('Starting event-driven scheduler'); 473 void scheduleNextWakeup(); 474} 475 476/** 477 * Stop the background scheduler 478 */ 479export function stopScheduler(): void { 480 if (!schedulerRunning) return; 481 schedulerRunning = false; 482 currentConfig = null; 483 scheduleGeneration++; // invalidate any in-flight scheduleNextWakeup() calls 484 if (wakeupTimeout) { 485 clearTimeout(wakeupTimeout); 486 wakeupTimeout = null; 487 } 488 logger.info('Scheduler stopped'); 489}