A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
1package main
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "html/template"
9 "log/slog"
10 "net/http"
11 "os"
12 "os/signal"
13 "strings"
14 "syscall"
15 "time"
16
17 "github.com/distribution/distribution/v3/registry"
18 "github.com/distribution/distribution/v3/registry/handlers"
19 "github.com/spf13/cobra"
20
21 "atcr.io/pkg/appview/middleware"
22 "atcr.io/pkg/appview/storage"
23 "atcr.io/pkg/atproto"
24 "atcr.io/pkg/auth"
25 "atcr.io/pkg/auth/oauth"
26 "atcr.io/pkg/auth/token"
27 "atcr.io/pkg/logging"
28
29 // UI components
30 "atcr.io/pkg/appview"
31 "atcr.io/pkg/appview/db"
32 uihandlers "atcr.io/pkg/appview/handlers"
33 "atcr.io/pkg/appview/holdhealth"
34 "atcr.io/pkg/appview/jetstream"
35 "atcr.io/pkg/appview/readme"
36 "atcr.io/pkg/appview/routes"
37 "github.com/go-chi/chi/v5"
38 chimiddleware "github.com/go-chi/chi/v5/middleware"
39)
40
41var serveCmd = &cobra.Command{
42 Use: "serve",
43 Short: "Start the ATCR registry server",
44 Long: `Start the ATCR registry server with authentication endpoints.
45
46Configuration is loaded from environment variables.
47See .env.appview.example for available environment variables.`,
48 Args: cobra.NoArgs,
49 RunE: serveRegistry,
50}
51
52func init() {
53 // Replace the default serve command with our custom one
54 for i, cmd := range registry.RootCmd.Commands() {
55 if cmd.Name() == "serve" {
56 registry.RootCmd.Commands()[i] = serveCmd
57 break
58 }
59 }
60}
61
62func serveRegistry(cmd *cobra.Command, args []string) error {
63 // Load configuration from environment variables
64 cfg, err := appview.LoadConfigFromEnv()
65 if err != nil {
66 return fmt.Errorf("failed to load config from environment: %w", err)
67 }
68
69 // Initialize structured logging
70 logging.InitLogger(cfg.LogLevel)
71
72 slog.Info("Configuration loaded successfully from environment")
73
74 // Initialize UI database first (required for all stores)
75 slog.Info("Initializing UI database", "path", cfg.UI.DatabasePath)
76 uiDatabase, uiReadOnlyDB, uiSessionStore := db.InitializeDatabase(cfg.UI.Enabled, cfg.UI.DatabasePath, cfg.UI.SkipDBMigrations)
77 if uiDatabase == nil {
78 return fmt.Errorf("failed to initialize UI database - required for session storage")
79 }
80
81 // Initialize hold health checker
82 slog.Info("Initializing hold health checker", "cache_ttl", cfg.Health.CacheTTL)
83 healthChecker := holdhealth.NewChecker(cfg.Health.CacheTTL)
84
85 // Initialize README fetcher for rendering repo page descriptions
86 readmeFetcher := readme.NewFetcher()
87
88 // Start background health check worker
89 startupDelay := 5 * time.Second // Wait for hold services to start (Docker compose)
90 dbAdapter := holdhealth.NewDBAdapter(uiDatabase)
91 healthWorker := holdhealth.NewWorkerWithStartupDelay(healthChecker, dbAdapter, cfg.Health.CheckInterval, startupDelay)
92
93 // Create context for worker lifecycle management
94 workerCtx, workerCancel := context.WithCancel(context.Background())
95 defer workerCancel() // Ensure context is cancelled on all exit paths
96 healthWorker.Start(workerCtx)
97 slog.Info("Hold health worker started", "startup_delay", startupDelay, "refresh_interval", cfg.Health.CheckInterval, "cache_ttl", cfg.Health.CacheTTL)
98
99 // Initialize OAuth components
100 slog.Info("Initializing OAuth components")
101
102 // Create OAuth session storage (SQLite-backed)
103 oauthStore := db.NewOAuthStore(uiDatabase)
104 slog.Info("Using SQLite for OAuth session storage")
105
106 // Create device store (SQLite-backed)
107 deviceStore := db.NewDeviceStore(uiDatabase)
108 slog.Info("Using SQLite for device storage")
109
110 // Get base URL and default hold DID from config
111 baseURL := cfg.Server.BaseURL
112 defaultHoldDID := cfg.Server.DefaultHoldDID
113 testMode := cfg.Server.TestMode
114
115 slog.Debug("Base URL for OAuth", "base_url", baseURL)
116 if testMode {
117 slog.Info("TEST_MODE enabled - will use HTTP for local DID resolution and transition:generic scope")
118 }
119
120 // Create OAuth client app (automatically configures confidential client for production)
121 desiredScopes := oauth.GetDefaultScopes(defaultHoldDID)
122 oauthClientApp, err := oauth.NewClientApp(baseURL, oauthStore, desiredScopes, cfg.Server.OAuthKeyPath, cfg.Server.ClientName)
123 if err != nil {
124 return fmt.Errorf("failed to create OAuth client app: %w", err)
125 }
126 if testMode {
127 slog.Info("Using OAuth scopes with transition:generic (test mode)")
128 } else {
129 slog.Info("Using OAuth scopes with RPC scope (production mode)")
130 }
131
132 // Invalidate sessions with mismatched scopes on startup
133 // This ensures all users have the latest required scopes after deployment
134 invalidatedCount, err := oauthStore.InvalidateSessionsWithMismatchedScopes(context.Background(), desiredScopes)
135 if err != nil {
136 slog.Warn("Failed to invalidate sessions with mismatched scopes", "error", err)
137 } else if invalidatedCount > 0 {
138 slog.Info("Invalidated OAuth sessions due to scope changes", "count", invalidatedCount)
139 }
140
141 // Create oauth token refresher
142 refresher := oauth.NewRefresher(oauthClientApp)
143
144 // Wire up UI session store to refresher so it can invalidate UI sessions on OAuth failures
145 if uiSessionStore != nil {
146 refresher.SetUISessionStore(uiSessionStore)
147 }
148
149 // Set global refresher for middleware
150 middleware.SetGlobalRefresher(refresher)
151
152 // Set global database for hold DID lookups (used by blob routing)
153 holdDIDDB := db.NewHoldDIDDB(uiDatabase)
154 middleware.SetGlobalDatabase(holdDIDDB)
155
156 // Create RemoteHoldAuthorizer for hold authorization with caching
157 holdAuthorizer := auth.NewRemoteHoldAuthorizer(uiDatabase, testMode)
158 middleware.SetGlobalAuthorizer(holdAuthorizer)
159 slog.Info("Hold authorizer initialized with database caching")
160
161 // Clear all denial caches on startup for a clean slate (non-blocking)
162 if remote, ok := holdAuthorizer.(*auth.RemoteHoldAuthorizer); ok {
163 go func() {
164 if err := remote.ClearAllDenials(); err != nil {
165 slog.Warn("Failed to clear denial caches on startup", "error", err)
166 }
167 }()
168 }
169
170 // Initialize Jetstream workers (background services before HTTP routes)
171 initializeJetstream(uiDatabase, &cfg.Jetstream, defaultHoldDID, testMode, refresher)
172
173 // // Run stats migration to holds (one-time migration, skipped if already done)
174 // go func() {
175 // // Wait for services to be ready (Docker startup race condition)
176 // time.Sleep(10 * time.Second)
177 // if err := db.MigrateStatsToHolds(context.Background(), uiDatabase); err != nil {
178 // slog.Warn("Stats migration failed", "error", err)
179 // }
180 // }()
181
182 // Create main chi router
183 mainRouter := chi.NewRouter()
184
185 // Add core middleware
186 mainRouter.Use(chimiddleware.Logger)
187 mainRouter.Use(chimiddleware.Recoverer)
188 mainRouter.Use(chimiddleware.GetHead) // Automatically handle HEAD requests for GET routes
189 mainRouter.Use(routes.CORSMiddleware())
190
191 // Load templates if UI is enabled
192 var uiTemplates *template.Template
193 if cfg.UI.Enabled {
194 var err error
195 uiTemplates, err = appview.Templates()
196 if err != nil {
197 slog.Warn("Failed to load UI templates", "error", err)
198 } else {
199 // Register UI routes with dependencies
200 routes.RegisterUIRoutes(mainRouter, routes.UIDependencies{
201 Database: uiDatabase,
202 ReadOnlyDB: uiReadOnlyDB,
203 SessionStore: uiSessionStore,
204 OAuthClientApp: oauthClientApp,
205 OAuthStore: oauthStore,
206 Refresher: refresher,
207 BaseURL: baseURL,
208 DeviceStore: deviceStore,
209 HealthChecker: healthChecker,
210 ReadmeFetcher: readmeFetcher,
211 Templates: uiTemplates,
212 })
213 }
214 }
215
216 // Create OAuth server
217 oauthServer := oauth.NewServer(oauthClientApp)
218 // Connect server to refresher for cache invalidation
219 oauthServer.SetRefresher(refresher)
220 // Connect UI session store for web login
221 if uiSessionStore != nil {
222 oauthServer.SetUISessionStore(uiSessionStore)
223 }
224
225 // Register OAuth post-auth callback for AppView business logic
226 // This decouples the OAuth package from AppView-specific dependencies
227 oauthServer.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, sessionID string) error {
228 slog.Debug("OAuth post-auth callback", "component", "appview/callback", "did", did)
229
230 // Create ATProto client with session provider (uses DoWithSession for DPoP nonce safety)
231 client := atproto.NewClientWithSessionProvider(pdsEndpoint, did, refresher)
232
233 // Ensure sailor profile exists (creates with default hold if configured)
234 slog.Debug("Ensuring profile exists", "component", "appview/callback", "did", did, "default_hold_did", defaultHoldDID)
235 if err := storage.EnsureProfile(ctx, client, defaultHoldDID); err != nil {
236 slog.Warn("Failed to ensure profile", "component", "appview/callback", "did", did, "error", err)
237 // Continue anyway - profile creation is not critical for avatar fetch
238 } else {
239 slog.Debug("Profile ensured", "component", "appview/callback", "did", did)
240 }
241
242 // Fetch user's profile record from PDS (contains blob references)
243 profileRecord, err := client.GetProfileRecord(ctx, did)
244 if err != nil {
245 slog.Warn("Failed to fetch profile record", "component", "appview/callback", "did", did, "error", err)
246 // Continue without avatar - set profileRecord to nil to skip avatar extraction
247 profileRecord = nil
248 }
249
250 // Construct avatar URL from blob CID using imgs.blue CDN (if profile record was fetched successfully)
251 avatarURL := ""
252 if profileRecord != nil && profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
253 avatarURL = atproto.BlobCDNURL(did, profileRecord.Avatar.Ref.Link)
254 slog.Debug("Constructed avatar URL", "component", "appview/callback", "avatar_url", avatarURL)
255 }
256
257 // Store user in database (with or without avatar)
258 // Use UpsertUser if we successfully fetched an avatar (to update existing users)
259 // Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars)
260 if avatarURL != "" {
261 err = db.UpsertUser(uiDatabase, &db.User{
262 DID: did,
263 Handle: handle,
264 PDSEndpoint: pdsEndpoint,
265 Avatar: avatarURL,
266 LastSeen: time.Now(),
267 })
268 } else {
269 err = db.UpsertUserIgnoreAvatar(uiDatabase, &db.User{
270 DID: did,
271 Handle: handle,
272 PDSEndpoint: pdsEndpoint,
273 Avatar: avatarURL,
274 LastSeen: time.Now(),
275 })
276 }
277 if err != nil {
278 slog.Warn("Failed to store user in database", "component", "appview/callback", "error", err)
279 return nil // Non-fatal
280 }
281
282 slog.Debug("Stored user", "component", "appview/callback", "did", did, "has_avatar", avatarURL != "")
283
284 // Migrate profile URL→DID if needed
285 profile, err := storage.GetProfile(ctx, client)
286 if err != nil {
287 slog.Warn("Failed to get profile", "component", "appview/callback", "did", did, "error", err)
288 return nil // Non-fatal
289 }
290
291 var holdDID string
292 if profile != nil && profile.DefaultHold != "" {
293 // Check if defaultHold is a URL (needs migration)
294 if strings.HasPrefix(profile.DefaultHold, "http://") || strings.HasPrefix(profile.DefaultHold, "https://") {
295 slog.Debug("Migrating hold URL to DID", "component", "appview/callback", "did", did, "hold_url", profile.DefaultHold)
296
297 // Resolve URL to DID
298 holdDID := atproto.ResolveHoldDIDFromURL(profile.DefaultHold)
299
300 // Update profile with DID
301 profile.DefaultHold = holdDID
302 if err := storage.UpdateProfile(ctx, client, profile); err != nil {
303 slog.Warn("Failed to update profile with hold DID", "component", "appview/callback", "did", did, "error", err)
304 } else {
305 slog.Debug("Updated profile with hold DID", "component", "appview/callback", "hold_did", holdDID)
306 }
307 } else {
308 // Already a DID - use it
309 holdDID = profile.DefaultHold
310 }
311 // Register crew regardless of migration (outside the migration block)
312 // Run in background to avoid blocking OAuth callback if hold is offline
313 // Use background context - don't inherit request context which gets canceled on response
314 slog.Debug("Attempting crew registration", "component", "appview/callback", "did", did, "hold_did", holdDID)
315 go func(client *atproto.Client, refresher *oauth.Refresher, holdDID string, authorizer auth.HoldAuthorizer) {
316 ctx := context.Background()
317 storage.EnsureCrewMembership(ctx, client, refresher, holdDID, authorizer)
318 }(client, refresher, holdDID, holdAuthorizer)
319
320 }
321
322 return nil // All errors are non-fatal, logged for debugging
323 })
324
325 // Create token issuer (also initializes auth keys if needed)
326 var issuer *token.Issuer
327 if cfg.Distribution.Auth["token"] != nil {
328 issuer, err = createTokenIssuer(cfg)
329 if err != nil {
330 return fmt.Errorf("failed to create token issuer: %w", err)
331 }
332
333 // Log successful initialization
334 slog.Info("Auth keys initialized", "path", cfg.Auth.KeyPath)
335 }
336
337 // Create registry app (returns http.Handler)
338 ctx := context.Background()
339 app := handlers.NewApp(ctx, cfg.Distribution)
340
341 // Wrap registry app with auth method extraction middleware
342 // This extracts the auth method from the JWT and stores it in the request context
343 wrappedApp := middleware.ExtractAuthMethod(app)
344
345 // Mount registry at /v2/
346 mainRouter.Handle("/v2/*", wrappedApp)
347
348 // Mount static files if UI is enabled
349 if uiSessionStore != nil && uiTemplates != nil {
350 // Register dynamic routes for root-level files (favicons, manifests, etc.)
351 staticHandler := appview.StaticHandler()
352 rootFiles, err := appview.StaticRootFiles()
353 if err != nil {
354 slog.Warn("Failed to scan static root files", "error", err)
355 } else {
356 for _, filename := range rootFiles {
357 // Create a closure to capture the filename
358 file := filename
359 mainRouter.Get("/"+file, func(w http.ResponseWriter, r *http.Request) {
360 // Serve the specific file from static root
361 r.URL.Path = "/" + file
362 staticHandler.ServeHTTP(w, r)
363 })
364 }
365 slog.Info("Registered dynamic root file routes", "count", len(rootFiles), "files", rootFiles)
366 }
367
368 // Mount subdirectory routes with clean paths
369 mainRouter.Handle("/css/*", http.StripPrefix("/css/", appview.StaticSubdir("css")))
370 mainRouter.Handle("/js/*", http.StripPrefix("/js/", appview.StaticSubdir("js")))
371 mainRouter.Handle("/static/*", http.StripPrefix("/static/", appview.StaticSubdir("static")))
372
373 slog.Info("UI enabled", "home", "/", "settings", "/settings")
374 }
375
376 // Mount OAuth endpoints
377 mainRouter.Get("/auth/oauth/authorize", oauthServer.ServeAuthorize)
378 mainRouter.Get("/auth/oauth/callback", oauthServer.ServeCallback)
379
380 // OAuth client metadata endpoint
381 mainRouter.Get("/oauth-client-metadata.json", func(w http.ResponseWriter, r *http.Request) {
382 config := oauthClientApp.Config
383 metadata := config.ClientMetadata()
384
385 // For confidential clients, ensure JWKS is included
386 // The indigo library should populate this automatically, but we explicitly set it here
387 // to be defensive and ensure it's always present for confidential clients
388 if config.IsConfidential() && metadata.JWKS == nil {
389 jwks := config.PublicJWKS()
390 metadata.JWKS = &jwks
391 }
392
393 // Convert indigo's metadata to map so we can add custom fields
394 metadataBytes, err := json.Marshal(metadata)
395 if err != nil {
396 http.Error(w, "Failed to marshal metadata", http.StatusInternalServerError)
397 return
398 }
399
400 var metadataMap map[string]interface{}
401 if err := json.Unmarshal(metadataBytes, &metadataMap); err != nil {
402 http.Error(w, "Failed to unmarshal metadata", http.StatusInternalServerError)
403 return
404 }
405
406 // Add custom fields
407 metadataMap["client_name"] = cfg.Server.ClientName
408 metadataMap["client_uri"] = cfg.Server.BaseURL
409 metadataMap["logo_uri"] = cfg.Server.BaseURL + "/web-app-manifest-192x192.png"
410
411 w.Header().Set("Content-Type", "application/json")
412 w.Header().Set("Access-Control-Allow-Origin", "*")
413 // Limit caching to allow scope changes to propagate quickly
414 // PDS servers cache client metadata, so short max-age helps with updates
415 w.Header().Set("Cache-Control", "public, max-age=300")
416 if err := json.NewEncoder(w).Encode(metadataMap); err != nil {
417 http.Error(w, "Failed to encode metadata", http.StatusInternalServerError)
418 }
419 })
420
421 // Note: Indigo handles OAuth state cleanup internally via its store
422
423 // Mount auth endpoints if enabled
424 if issuer != nil {
425 // Basic Auth token endpoint (supports device secrets and app passwords)
426 tokenHandler := token.NewHandler(issuer, deviceStore)
427
428 // Register OAuth session validator for device auth validation
429 // This validates OAuth sessions are usable (not just exist) before issuing tokens
430 // Prevents the flood of errors when a stale session is discovered during push
431 tokenHandler.SetOAuthSessionValidator(refresher)
432
433 // Register token post-auth callback for profile management
434 // This decouples the token package from AppView-specific dependencies
435 tokenHandler.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, accessToken string) error {
436 slog.Debug("Token post-auth callback", "component", "appview/callback", "did", did)
437
438 // Create ATProto client with validated token
439 atprotoClient := atproto.NewClient(pdsEndpoint, did, accessToken)
440
441 // Ensure profile exists (will create with default hold if not exists and default is configured)
442 if err := storage.EnsureProfile(ctx, atprotoClient, defaultHoldDID); err != nil {
443 // Log error but don't fail auth - profile management is not critical
444 slog.Warn("Failed to ensure profile", "component", "appview/callback", "did", did, "error", err)
445 } else {
446 slog.Debug("Profile ensured with default hold", "component", "appview/callback", "did", did, "default_hold_did", defaultHoldDID)
447 }
448
449 return nil // All errors are non-fatal
450 })
451
452 mainRouter.Get("/auth/token", tokenHandler.ServeHTTP)
453
454 // Device authorization endpoints (public)
455 mainRouter.Handle("/auth/device/code", &uihandlers.DeviceCodeHandler{
456 Store: deviceStore,
457 AppViewBaseURL: baseURL,
458 })
459 mainRouter.Handle("/auth/device/token", &uihandlers.DeviceTokenHandler{
460 Store: deviceStore,
461 })
462
463 slog.Info("Auth endpoints enabled",
464 "basic_auth", "/auth/token",
465 "device_code", "/auth/device/code",
466 "device_token", "/auth/device/token",
467 "oauth_authorize", "/auth/oauth/authorize",
468 "oauth_callback", "/auth/oauth/callback",
469 "oauth_metadata", "/client-metadata.json")
470 }
471
472 // Register credential helper version API (public endpoint)
473 mainRouter.Handle("/api/credential-helper/version", &uihandlers.CredentialHelperVersionHandler{
474 Version: cfg.CredentialHelper.Version,
475 TangledRepo: cfg.CredentialHelper.TangledRepo,
476 Checksums: cfg.CredentialHelper.Checksums,
477 })
478 if cfg.CredentialHelper.Version != "" {
479 slog.Info("Credential helper version API enabled",
480 "endpoint", "/api/credential-helper/version",
481 "version", cfg.CredentialHelper.Version)
482 }
483
484 // Create HTTP server
485 server := &http.Server{
486 Addr: cfg.Server.Addr,
487 Handler: mainRouter,
488 }
489
490 // Handle graceful shutdown
491 stop := make(chan os.Signal, 1)
492 signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
493
494 // Start server in goroutine
495 errChan := make(chan error, 1)
496 go func() {
497 slog.Info("Starting registry server", "addr", cfg.Server.Addr)
498 if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
499 errChan <- err
500 }
501 }()
502
503 // Wait for shutdown signal or error
504 select {
505 case <-stop:
506 slog.Info("Shutting down registry server")
507
508 // Stop health worker first
509 slog.Info("Stopping hold health worker")
510 healthWorker.Stop()
511
512 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
513 defer cancel()
514
515 if err := server.Shutdown(shutdownCtx); err != nil {
516 return fmt.Errorf("server shutdown error: %w", err)
517 }
518 case err := <-errChan:
519 // Stop health worker on error (workerCancel called by defer)
520 healthWorker.Stop()
521 return fmt.Errorf("server error: %w", err)
522 }
523
524 return nil
525}
526
527// createTokenIssuer creates a token issuer for auth handlers
528func createTokenIssuer(cfg *appview.Config) (*token.Issuer, error) {
529 return token.NewIssuer(
530 cfg.Auth.KeyPath,
531 cfg.Auth.ServiceName, // issuer
532 cfg.Auth.ServiceName, // service
533 cfg.Auth.TokenExpiration,
534 )
535}
536
537// initializeJetstream initializes the Jetstream workers for real-time events and backfill
538func initializeJetstream(database *sql.DB, jetstreamCfg *appview.JetstreamConfig, defaultHoldDID string, testMode bool, refresher *oauth.Refresher) {
539 // Start Jetstream worker
540 jetstreamURL := jetstreamCfg.URL
541
542 // Start real-time Jetstream worker with cursor tracking for reconnects
543 go func() {
544 var lastCursor int64 = 0 // Start from now on first connect
545 for {
546 worker := jetstream.NewWorker(database, jetstreamURL, lastCursor)
547 if err := worker.Start(context.Background()); err != nil {
548 // Save cursor from this connection for next reconnect
549 lastCursor = worker.GetLastCursor()
550 slog.Warn("Jetstream real-time worker error, reconnecting", "component", "jetstream", "error", err, "reconnect_delay", "10s")
551 time.Sleep(10 * time.Second)
552 }
553 }
554 }()
555 slog.Info("Jetstream real-time worker started", "component", "jetstream")
556
557 // Start backfill worker (enabled by default, set ATCR_BACKFILL_ENABLED=false to disable)
558 if jetstreamCfg.BackfillEnabled {
559 // Get relay endpoint for sync API (defaults to Bluesky's relay)
560 relayEndpoint := jetstreamCfg.RelayEndpoint
561
562 backfillWorker, err := jetstream.NewBackfillWorker(database, relayEndpoint, defaultHoldDID, testMode, refresher)
563 if err != nil {
564 slog.Warn("Failed to create backfill worker", "component", "jetstream/backfill", "error", err)
565 } else {
566 // Run initial backfill with startup delay for Docker compose
567 go func() {
568 // Wait for hold service to be ready (Docker startup race condition)
569 startupDelay := 5 * time.Second
570 slog.Info("Waiting for services to be ready", "component", "jetstream/backfill", "startup_delay", startupDelay)
571 time.Sleep(startupDelay)
572
573 slog.Info("Starting sync-based backfill", "component", "jetstream/backfill", "relay_endpoint", relayEndpoint)
574 if err := backfillWorker.Start(context.Background()); err != nil {
575 slog.Warn("Backfill finished with error", "component", "jetstream/backfill", "error", err)
576 } else {
577 slog.Info("Backfill completed successfully", "component", "jetstream/backfill")
578 }
579 }()
580
581 // Start periodic backfill scheduler
582 interval := jetstreamCfg.BackfillInterval
583
584 go func() {
585 ticker := time.NewTicker(interval)
586 defer ticker.Stop()
587
588 for range ticker.C {
589 slog.Info("Starting periodic backfill", "component", "jetstream/backfill", "interval", interval)
590 if err := backfillWorker.Start(context.Background()); err != nil {
591 slog.Warn("Periodic backfill finished with error", "component", "jetstream/backfill", "error", err)
592 } else {
593 slog.Info("Periodic backfill completed successfully", "component", "jetstream/backfill")
594 }
595 }
596 }()
597 slog.Info("Periodic backfill scheduler started", "component", "jetstream/backfill", "interval", interval)
598 }
599 }
600}