alf: the atproto Latency Fabric
alf.fly.dev/
1// ABOUTME: Background scheduler that polls for ready drafts and publishes them to the PDS
2
3import { Agent } from '@atproto/api';
4import { randomUUID } from 'node:crypto';
5import type { ServiceConfig } from './config.js';
6import { createLogger } from './logger.js';
7import { schedulerWakeupsTotal, schedulerPublishesTotal, publishDurationSeconds } from './metrics.js';
8import {
9 getDb,
10 getReadyDrafts,
11 getNextScheduledAt,
12 claimDraftForPublishing,
13 markDraftPublished,
14 markDraftFailed,
15 incrementRetryCount,
16 getDraft,
17 getDraftRawRow,
18 getUserAuthorization,
19 getBlobsByCids,
20 deleteBlobs,
21 getRawSchedule,
22 incrementScheduleFireCount,
23 updateScheduleNextDraft,
24 updateScheduleStatus,
25 createDraft,
26} from './storage.js';
27import { getOAuthClient } from './oauth.js';
28import type { DraftRow } from './schema.js';
29import { extractDidFromAtUri } from './schema.js';
30import { computeNextOccurrence, getOccurrenceRecord } from '@newpublic/recurrence';
31import type { RecurrenceRule } from '@newpublic/recurrence';
32
// Logger shared by every function in this module, created with the 'Scheduler' label.
const logger = createLogger('Scheduler');
34
// Retry-count threshold: a failed draft is rescheduled only while its retry
// count is still below this value.
const MAX_RETRIES = 3;

// Wait before each successive retry, in milliseconds (1, 5 and 15 minutes).
const RETRY_BACKOFF_MS = [1, 5, 15].map((minutes) => minutes * 60 * 1000);

// --- Mutable scheduler state (module-level singletons) ---

// True between startScheduler() and stopScheduler().
let schedulerRunning = false;
// Handle of the currently armed wakeup timer, or null when none is pending.
let wakeupTimeout: ReturnType<typeof setTimeout> | null = null;
// Config handed to startScheduler(); null while the scheduler is stopped.
let currentConfig: ServiceConfig | null = null;
// Bumped on every (re)schedule and on stop so stale async calls can detect
// that they have been superseded.
let scheduleGeneration = 0;
48
/**
 * Recursively scan a decoded record and gather the CID of every blob reference.
 *
 * A blob reference is an object shaped like
 * `{ $type: "blob", ref: { $link: "<cid>" } }`. Once a node is recognised as a
 * blob its children are not searched; every other object or array is descended
 * into. CIDs are returned in depth-first encounter order and are not
 * de-duplicated. Non-object input yields an empty array.
 */
function collectBlobCids(value: unknown): string[] {
  const found: string[] = [];

  const visit = (node: unknown): void => {
    if (node === null || typeof node !== 'object') return;

    const fields = node as Record<string, unknown>;
    if (fields['$type'] === 'blob') {
      const ref = fields['ref'];
      if (ref !== null && typeof ref === 'object') {
        const link = (ref as Record<string, unknown>)['$link'];
        if (typeof link === 'string') {
          found.push(link);
          return;
        }
      }
    }

    // Not a (well-formed) blob node: keep looking inside its members.
    for (const child of Object.values(fields)) {
      visit(child);
    }
  };

  visit(value);
  return found;
}
62
/**
 * After a successful publish, advance the recurring schedule the draft belongs to.
 *
 * Steps, in order:
 *  1. Load the schedule; do nothing unless it exists and is `active`.
 *  2. For dynamic (content_url) schedules, log the URL that was used.
 *  3. Increment the schedule's fire count.
 *  4. Parse the recurrence rule; invalid JSON marks the schedule `error`.
 *  5. Compute the next occurrence relative to "now". When there is none, the
 *     schedule is marked `completed` (rule type `once`) or `cancelled`
 *     (any other rule type) and its next-draft pointer is cleared.
 *  6. Otherwise create the next draft, point the schedule at it, and wake the
 *     scheduler.
 *
 * Any failure while building or storing the next draft — including a corrupt
 * template record — marks the schedule `error` instead of leaving it `active`
 * with no pending draft.
 *
 * @param draftRow Raw row of the draft that was just published.
 */
async function handleScheduleChaining(draftRow: DraftRow): Promise<void> {
  const scheduleId = draftRow.schedule_id;
  /* istanbul ignore next */
  if (!scheduleId) return;

  const schedule = await getRawSchedule(scheduleId);
  if (!schedule || schedule.status !== 'active') return;

  const userDid = draftRow.user_did;
  const now = new Date();

  // If dynamic content, rebuild the URL purely for logging — it was already published
  if (schedule.content_url) {
    try {
      const url = new URL(schedule.content_url);
      url.searchParams.set('fireCount', String(schedule.fire_count + 1));
      url.searchParams.set(
        'scheduledAt',
        new Date(Number(draftRow.scheduled_at ?? /* istanbul ignore next */ Date.now())).toISOString(),
      );
      logger.info('Dynamic schedule published', { scheduleId, contentUrl: url.toString() });
    } catch (err) {
      /* istanbul ignore next */
      logger.warn('Failed to build content URL for logging', { scheduleId, error: String(err) });
    }
  }

  // Increment fire count
  await incrementScheduleFireCount(scheduleId);

  // Compute next occurrence
  let rule: RecurrenceRule;
  try {
    rule = JSON.parse(schedule.recurrence_rule) as RecurrenceRule;
  } catch {
    logger.error('Invalid recurrence rule JSON in schedule', undefined, { scheduleId });
    await updateScheduleStatus(scheduleId, 'error');
    return;
  }

  const nextFireAt = computeNextOccurrence(rule, now);
  if (!nextFireAt) {
    // Series exhausted — 'once' schedules complete naturally; others are cancelled
    const isOnce = rule.rule.type === 'once';
    if (isOnce) {
      logger.info('Once schedule completed, marking completed', { scheduleId });
      await updateScheduleStatus(scheduleId, 'completed');
    } else {
      logger.info('Schedule series exhausted, marking cancelled', { scheduleId });
      await updateScheduleStatus(scheduleId, 'cancelled');
    }
    await updateScheduleNextDraft(scheduleId, null);
    return;
  }

  // Build AT-URI for next draft
  const rkey = `sched-${Date.now()}-${randomUUID().substring(0, 8)}`;
  const collection = schedule.collection;
  const uri = `at://${userDid}/${collection}/${rkey}`;

  try {
    // For dynamic schedules, create a draft with null record (fetched at publish time).
    // For static schedules, create a draft with the template record.
    // Parsing happens inside this try so a corrupt template marks the schedule
    // 'error' rather than throwing past the status update and stalling the series.
    const nextRecord = schedule.content_url
      ? null
      : schedule.record
        ? (JSON.parse(schedule.record) as Record<string, unknown>)
        : /* istanbul ignore next */ null;

    await createDraft({
      uri,
      userDid,
      collection,
      rkey,
      record: nextRecord,
      recordCid: null, // Will be computed at publish time for static records
      action: 'create',
      scheduledAt: nextFireAt.getTime(),
      scheduleId,
    });

    await updateScheduleNextDraft(scheduleId, uri);
    notifyScheduler();
    logger.info('Next scheduled draft created', { scheduleId, uri, nextFireAt: nextFireAt.toISOString() });
  } catch (err) {
    logger.error(
      'Failed to create next schedule draft',
      err instanceof Error ? err : /* istanbul ignore next */ undefined,
      { scheduleId },
    );
    await updateScheduleStatus(scheduleId, 'error');
  }
}
149
/**
 * Publish a single draft to the user's PDS.
 * This is the shared logic used by both the scheduler and publishPost().
 *
 * Flow:
 *  1. Claim the draft; return quietly if it cannot be claimed.
 *  2. Restore the user's OAuth session. Missing authorization or an error that
 *     looks like a revoked/unrefreshable token marks the draft permanently failed.
 *  3. `delete` drafts call deleteRecord. `create`/`put` drafts resolve the
 *     record (dynamic content URL, stored record, or a per-occurrence override),
 *     stamp createdAt/scheduledAt from the draft's scheduled time, re-upload any
 *     locally stored blobs, then call createRecord/putRecord.
 *  4. Mark the draft published, record metrics, chain the next occurrence of a
 *     recurring schedule and call the optional post-publish webhook.
 *
 * Errors thrown before the draft is marked published are classified as
 * permanent auth failures or retried with backoff (see MAX_RETRIES /
 * RETRY_BACKOFF_MS). Best-effort steps that run after the PDS write has been
 * committed (local blob cleanup, schedule chaining, webhook) only log on
 * failure, so they can never push an already-published draft back into the
 * retry path.
 *
 * @param uri    AT-URI identifying the draft.
 * @param config Service configuration; supplies the fallback webhook URL.
 */
export async function publishDraft(uri: string, config: ServiceConfig): Promise<void> {
  // Atomically claim the draft
  const claimed = await claimDraftForPublishing(uri);
  if (!claimed) {
    logger.warn('Draft already claimed or not in publishable state', { uri });
    return;
  }

  const draft = await getDraft(uri);
  if (!draft) {
    logger.error('Draft not found after claiming', undefined, { uri });
    return;
  }

  logger.info('Publishing draft', { uri, action: draft.action });

  let userDid: string;
  try {
    userDid = extractDidFromAtUri(uri);
  } catch {
    logger.error('Invalid AT-URI in database, skipping draft', undefined, { uri });
    return;
  }
  const publishStart = Date.now();

  try {
    // Look up user authorization — OAuth is the only supported auth type
    const authRecord = await getUserAuthorization(userDid);

    if (!authRecord || authRecord.auth_type !== 'oauth') {
      await markDraftFailed(uri, 'no_oauth_authorization', false);
      logger.error('No OAuth authorization found for user', undefined, { uri, userDid });
      return;
    }

    let agent: Agent;

    // OAuth-type auth: restore OAuth + DPoP session
    const oauthClient = getOAuthClient();
    try {
      const session = await oauthClient.restore(userDid);
      agent = new Agent(session);
    } catch (oauthErr) {
      const errMsg = oauthErr instanceof Error ? oauthErr.message : /* istanbul ignore next */ String(oauthErr);
      const errCode = (oauthErr as { error?: string }).error ?? '';
      // OAuthResponseError from a failed token refresh has .error on the cause
      const causeCode = ((oauthErr as { cause?: { error?: string } }).cause?.error) ?? '';
      const isPermanentAuthFailure =
        errMsg.includes('revoked') ||
        errMsg.includes('invalid_grant') ||
        errCode === 'invalid_token' ||
        errMsg.includes('invalid_token') ||
        causeCode === 'invalid_grant' ||
        // TokenRefreshError wraps the underlying OAuth error
        errMsg.toLowerCase().includes('token refresh') ||
        errMsg.toLowerCase().includes('refresh failed');
      if (isPermanentAuthFailure) {
        await markDraftFailed(uri, 'oauth_revoked', false);
        logger.error('OAuth token revoked or refresh failed for draft', undefined, { uri, errMsg, errCode, causeCode });
        return;
      }
      // Anything else is handled by the outer catch (retry with backoff)
      throw oauthErr;
    }

    // Execute the PDS call based on the draft action
    if (draft.action === 'delete') {
      await agent.com.atproto.repo.deleteRecord({
        repo: userDid,
        collection: draft.collection,
        rkey: draft.rkey,
      });
    } else {
      // For dynamic schedule drafts, fetch content from content_url at publish time
      let record: Record<string, unknown>;
      const draftRaw = await getDraftRawRow(uri);
      const scheduleId = draftRaw?.schedule_id;

      if (scheduleId && !draftRaw?.record) {
        // Dynamic schedule: fetch content URL
        const schedule = await getRawSchedule(scheduleId);
        if (!schedule?.content_url) {
          await markDraftFailed(uri, 'dynamic_schedule_missing_content_url', false);
          return;
        }
        try {
          const url = new URL(schedule.content_url);
          url.searchParams.set('fireCount', String(schedule.fire_count + 1));
          url.searchParams.set(
            'scheduledAt',
            new Date(Number(draftRaw?.scheduled_at ?? /* istanbul ignore next */ Date.now())).toISOString(),
          );
          const resp = await fetch(url.toString(), {
            headers: { 'Accept': 'application/json' },
          });
          if (!resp.ok) {
            throw new Error(`Content URL returned ${resp.status}`);
          }
          // The body becomes the record and is mutated below, so it must be a
          // plain JSON object; reject null, arrays and primitives here so they
          // are reported as a content-url failure rather than a later TypeError.
          const payload: unknown = await resp.json();
          if (typeof payload !== 'object' || payload === null || Array.isArray(payload)) {
            throw new Error('Content URL did not return a JSON object');
          }
          record = payload as Record<string, unknown>;
        } catch (fetchErr) {
          const errMsg = fetchErr instanceof Error ? fetchErr.message : /* istanbul ignore next */ String(fetchErr);
          logger.error(
            'Failed to fetch dynamic schedule content',
            fetchErr instanceof Error ? fetchErr : /* istanbul ignore next */ undefined,
            { uri, scheduleId },
          );
          await markDraftFailed(uri, `content_url_fetch_failed: ${errMsg}`, false);
          await updateScheduleStatus(scheduleId, 'error');
          return;
        }
      } else {
        record = JSON.parse(
          /* istanbul ignore next */ (await getDraftRecord(uri)) ?? '{}',
        ) as Record<string, unknown>;

        // Check for override_payload exception on this occurrence
        if (scheduleId && draftRaw?.scheduled_at) {
          const schedule = await getRawSchedule(scheduleId);
          if (schedule) {
            try {
              const parsedRule = JSON.parse(schedule.recurrence_rule) as RecurrenceRule;
              const overrideRecord = getOccurrenceRecord(parsedRule, new Date(Number(draftRaw.scheduled_at)));
              if (overrideRecord) record = overrideRecord;
            } catch {
              // Invalid rule JSON — use draft record as-is
            }
          }
        }
      }

      // Use scheduled time as the published createdAt so the post appears
      // in the feed at the intended time rather than at draft creation time.
      if (draft.scheduledAt) {
        record.createdAt = draft.scheduledAt;
        record.scheduledAt = draft.scheduledAt;
      }

      // Re-upload any stored blobs before publishing.
      // Blobs are deleted from local storage only AFTER the PDS write succeeds,
      // so that retries can re-upload them if the record commit fails.
      const blobCids = collectBlobCids(record);
      if (blobCids.length > 0) {
        logger.info('Re-uploading blobs for draft', { uri, blobCount: blobCids.length });
        const blobs = await getBlobsByCids(userDid, blobCids);
        for (const blob of blobs) {
          try {
            await agent.uploadBlob(blob.data, { encoding: blob.mimeType });
            logger.info('Blob re-uploaded', { cid: blob.cid });
          } catch (blobErr) {
            logger.warn('Failed to re-upload blob (may already exist on PDS)', { cid: blob.cid });
            // Non-fatal: PDS may already have the blob from a previous attempt
            void blobErr;
          }
        }
      }

      if (draft.action === 'create') {
        await agent.com.atproto.repo.createRecord({
          repo: userDid,
          collection: draft.collection,
          rkey: draft.rkey,
          record,
        });
      } else {
        // put
        await agent.com.atproto.repo.putRecord({
          repo: userDid,
          collection: draft.collection,
          rkey: draft.rkey,
          record,
        });
      }

      // Delete local blob copies now that the record is committed on the PDS.
      // Cleanup is best-effort: throwing here would send an already-committed
      // record through the retry path and re-issue the PDS write.
      if (blobCids.length > 0) {
        try {
          await deleteBlobs(blobCids);
        } catch (cleanupErr) {
          logger.warn('Failed to delete local blob copies after publish (non-fatal)', {
            uri,
            error: String(cleanupErr),
          });
        }
      }
    }

    await markDraftPublished(uri);
    publishDurationSeconds.observe((Date.now() - publishStart) / 1000);
    schedulerPublishesTotal.inc({ result: 'success' });
    logger.info('Draft published successfully', { uri });

    // Handle schedule chaining (create next draft for recurring schedules).
    // The row lookup sits inside the try as well: the draft is already marked
    // published, so a failed lookup must not mark it failed again.
    try {
      const rawRow = await getDraftRawRow(uri);
      if (rawRow?.schedule_id) {
        await handleScheduleChaining(rawRow);
      }
    } catch (chainErr) {
      logger.error(
        'Error in schedule chaining (non-fatal)',
        chainErr instanceof Error ? chainErr : /* istanbul ignore next */ undefined,
        { uri },
      );
    }

    // Call post-publish webhook if configured (environment variable wins over config)
    const webhookUrl = process.env.POST_PUBLISH_WEBHOOK_URL ?? config.postPublishWebhookUrl;
    if (webhookUrl) {
      try {
        await fetch(webhookUrl, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ uri, publishedAt: new Date().toISOString() }),
        });
      } catch (webhookErr) {
        logger.warn('Post-publish webhook failed (non-fatal)', { uri, error: String(webhookErr) });
      }
    }
  } catch (err) {
    const errMsg = err instanceof Error ? err.message : /* istanbul ignore next */ String(err);
    const errCode = (err as { error?: string }).error ?? '';
    logger.error('Failed to publish draft', err instanceof Error ? err : /* istanbul ignore next */ undefined, { uri });

    // Permanent auth failures — re-authentication required, retrying won't help
    if (errCode === 'invalid_token' || errMsg.includes('invalid_grant') || errMsg.includes('revoked')) {
      await markDraftFailed(uri, `oauth_failure: ${errMsg}`, false);
      schedulerPublishesTotal.inc({ result: 'failed' });
      logger.warn('Draft failed due to OAuth auth error, marked as permanently failed', { uri });
      return;
    }

    // Increment retry count
    const retryCount = await incrementRetryCount(uri);

    if (retryCount < MAX_RETRIES) {
      // retryCount is 1-based after the increment, hence the -1 when indexing
      const backoffMs = RETRY_BACKOFF_MS[retryCount - 1] ??
        /* istanbul ignore next */ RETRY_BACKOFF_MS[RETRY_BACKOFF_MS.length - 1];
      const retryAt = Date.now() + backoffMs;
      await markDraftFailed(uri, errMsg, true, retryAt);
      schedulerPublishesTotal.inc({ result: 'retry' });
      logger.info('Draft scheduled for retry', { uri, retryCount, retryAt: new Date(retryAt).toISOString() });
    } else {
      await markDraftFailed(uri, errMsg, false);
      schedulerPublishesTotal.inc({ result: 'failed' });
      logger.warn('Draft exhausted retries, marked as failed', { uri, retryCount });
    }
  }
}
383
/**
 * Read the stored `record` column of the draft identified by `uri`.
 *
 * @param uri AT-URI of the draft.
 * @returns The raw record string, or null when the draft row is absent or its
 *          record column is empty.
 */
async function getDraftRecord(uri: string): Promise<string | null> {
  const lookup = getDb()
    .selectFrom('drafts')
    .select('record')
    .where('uri', '=', uri);

  const match = await lookup.executeTakeFirst();
  const stored = match?.record;
  /* istanbul ignore next */
  return stored == null ? null : stored;
}
396
/**
 * Run one scheduler pass: count the wakeup, load every draft that is ready,
 * and publish them one after another.
 *
 * A failure to load the ready list is logged and ends the pass. A failure
 * while publishing one draft is logged and does not stop the remaining drafts.
 *
 * @param config Service configuration forwarded to publishDraft().
 */
async function poll(config: ServiceConfig): Promise<void> {
  schedulerWakeupsTotal.inc();

  let pending: DraftRow[];
  try {
    pending = await getReadyDrafts();
  } catch (err) {
    logger.error('Failed to poll for ready drafts', err instanceof Error ? err : /* istanbul ignore next */ undefined);
    return;
  }

  /* istanbul ignore next */
  if (pending.length === 0) return;

  logger.info(`Found ${pending.length} draft(s) ready to publish`);

  // Publish sequentially, in the order the storage layer returned them.
  for (const { uri } of pending) {
    try {
      await publishDraft(uri, config);
    } catch (err) /* istanbul ignore next */ {
      logger.error('Unhandled error publishing draft', err instanceof Error ? err : undefined, { uri });
    }
  }
}
425
/**
 * Schedule the next wakeup based on the earliest pending scheduled draft.
 * Cancels any existing wakeup timer before setting the new one.
 *
 * Behaviour:
 *  - Does nothing when the scheduler is stopped.
 *  - Uses a generation counter so that, if several calls overlap while awaiting
 *    the database, only the most recent one arms a timer.
 *  - Clamps the timer delay to the largest value Node's setTimeout accepts.
 *    Longer delays would otherwise be coerced to 1 ms and make the scheduler
 *    spin; with the clamp the timer simply fires early, finds nothing ready,
 *    and re-arms itself for the remaining time.
 *  - If the database lookup fails, logs the error and retries after a short
 *    delay instead of rejecting (callers invoke this fire-and-forget).
 */
async function scheduleNextWakeup(): Promise<void> {
  // Largest delay setTimeout honours (2^31 - 1 ms, roughly 24.8 days).
  const MAX_TIMER_DELAY_MS = 2_147_483_647;
  // How long to wait before re-trying a failed next-wakeup lookup.
  const LOOKUP_RETRY_DELAY_MS = 30_000;

  if (wakeupTimeout) {
    clearTimeout(wakeupTimeout);
    wakeupTimeout = null;
  }
  /* istanbul ignore next */
  if (!schedulerRunning || !currentConfig) return;

  // Capture the config now; a stop/start while the timer is pending clears the
  // timer and bumps the generation, so this closure can never run stale.
  const config = currentConfig;
  const myGen = ++scheduleGeneration;

  let nextAt: number | null;
  try {
    nextAt = await getNextScheduledAt();
  } catch (err) {
    logger.error('Failed to look up next scheduled draft', err instanceof Error ? err : undefined);
    // Superseded (or stopped) while awaiting: the newer call owns the timer.
    if (myGen !== scheduleGeneration) return;
    wakeupTimeout = setTimeout(() => {
      wakeupTimeout = null;
      void scheduleNextWakeup();
    }, LOOKUP_RETRY_DELAY_MS);
    wakeupTimeout.unref();
    return;
  }

  // A newer scheduleNextWakeup() call superseded us while we were awaiting the DB
  if (myGen !== scheduleGeneration) return;
  if (nextAt === null) return;

  const delay = Math.min(Math.max(0, nextAt - Date.now()), MAX_TIMER_DELAY_MS);
  wakeupTimeout = setTimeout(() => {
    wakeupTimeout = null;
    poll(config)
      .then(() => scheduleNextWakeup())
      .catch((err: unknown) => {
        logger.error('Scheduler wakeup failed', err instanceof Error ? err : undefined);
      });
  }, delay);
  // Do not keep the process alive just for the scheduler timer.
  wakeupTimeout.unref();
}
452
/**
 * Notify the scheduler that the set of scheduled drafts has changed.
 * Recalculates the next wakeup time immediately.
 *
 * The recalculation runs in the background; this function returns at once.
 * A failure during recalculation is logged rather than left as an unhandled
 * promise rejection.
 */
export function notifyScheduler(): void {
  scheduleNextWakeup().catch((err: unknown) => {
    logger.error('Failed to reschedule wakeup', err instanceof Error ? err : undefined);
  });
}
460
/**
 * Start the background scheduler.
 *
 * Stores the config, marks the scheduler as running and arms the first wakeup
 * in the background. Calling it while already running only logs a warning.
 * A failure while arming the first wakeup is logged rather than left as an
 * unhandled promise rejection.
 *
 * @param config Service configuration used for every subsequent publish.
 */
export function startScheduler(config: ServiceConfig): void {
  if (schedulerRunning) {
    logger.warn('Scheduler already running');
    return;
  }

  schedulerRunning = true;
  currentConfig = config;
  logger.info('Starting event-driven scheduler');
  scheduleNextWakeup().catch((err: unknown) => {
    logger.error('Failed to schedule initial wakeup', err instanceof Error ? err : undefined);
  });
}
475
/**
 * Stop the background scheduler.
 *
 * No-op when the scheduler is not running. Otherwise clears the running flag
 * and stored config, bumps the generation counter so any in-flight
 * scheduleNextWakeup() call abandons its work, and cancels the pending timer.
 */
export function stopScheduler(): void {
  if (!schedulerRunning) {
    return;
  }

  schedulerRunning = false;
  currentConfig = null;
  // Invalidate any scheduleNextWakeup() call still awaiting the database.
  scheduleGeneration += 1;

  const pendingTimer = wakeupTimeout;
  if (pendingTimer) {
    clearTimeout(pendingTimer);
    wakeupTimeout = null;
  }

  logger.info('Scheduler stopped');
}