···291291 }
292292 }
293293294294+ // Pre-load already-backfilled DIDs to avoid N+1 SELECT per DID
295295+ alreadyBackfilled, err := firehoseConsumer.BackfilledDIDs(ctx)
296296+ if err != nil {
297297+ log.Warn().Err(err).Msg("Failed to load backfilled DIDs, will check individually")
298298+ alreadyBackfilled = make(map[string]struct{})
299299+ }
300300+301301+ // Remove already-backfilled DIDs
302302+ for did := range alreadyBackfilled {
303303+ delete(didsToBackfill, did)
304304+ }
305305+294306 // Create a root span so all backfill PDS calls are grouped under one trace
295307 backfillCtx, backfillSpan := tracing.HandlerSpan(ctx, "backfill.startup")
296308297297- // Backfill all collected DIDs
309309+ // Backfill remaining DIDs
298310 successCount := 0
299311 for did := range didsToBackfill {
300312 if err := firehoseConsumer.BackfillDID(backfillCtx, did); err != nil {
···304316 }
305317 }
306318 backfillSpan.End()
307307- log.Info().Int("total", len(didsToBackfill)).Int("success", successCount).Msg("Backfill complete")
319319+ log.Info().
320320+ Int("skipped", len(alreadyBackfilled)).
321321+ Int("backfilled", successCount).
322322+ Int("failed", len(didsToBackfill)-successCount).
323323+ Msg("Backfill complete")
308324 }()
309325310326 // Register users in the feed when they authenticate
+5
internal/firehose/consumer.go
···451451func (c *Consumer) BackfillDID(ctx context.Context, did string) error {
452452 return c.index.BackfillUser(ctx, did)
453453}
454454+455455+// BackfilledDIDs returns the set of all DIDs that have been backfilled.
456456+func (c *Consumer) BackfilledDIDs(ctx context.Context) (map[string]struct{}, error) {
457457+ return c.index.BackfilledDIDs(ctx)
458458+}
+19
internal/firehose/index.go
···14461446 return err == nil
14471447}
1448144814491449+// BackfilledDIDs returns the set of all DIDs that have been backfilled.
14501450+func (idx *FeedIndex) BackfilledDIDs(ctx context.Context) (map[string]struct{}, error) {
14511451+ rows, err := idx.db.QueryContext(ctx, `SELECT did FROM backfilled`)
14521452+ if err != nil {
14531453+ return nil, err
14541454+ }
14551455+ defer rows.Close()
14561456+14571457+ result := make(map[string]struct{})
14581458+ for rows.Next() {
14591459+ var did string
14601460+ if err := rows.Scan(&did); err != nil {
14611461+ return nil, err
14621462+ }
14631463+ result[did] = struct{}{}
14641464+ }
14651465+ return result, rows.Err()
14661466+}
14671467+14491468// MarkBackfilled marks a DID as backfilled with current timestamp
14501469func (idx *FeedIndex) MarkBackfilled(ctx context.Context, did string) error {
14511470 _, err := idx.db.ExecContext(ctx, `INSERT OR IGNORE INTO backfilled (did, backfilled_at) VALUES (?, ?)`,