A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at codeberg-source 600 lines 23 kB view raw
1package main 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "html/template" 9 "log/slog" 10 "net/http" 11 "os" 12 "os/signal" 13 "strings" 14 "syscall" 15 "time" 16 17 "github.com/distribution/distribution/v3/registry" 18 "github.com/distribution/distribution/v3/registry/handlers" 19 "github.com/spf13/cobra" 20 21 "atcr.io/pkg/appview/middleware" 22 "atcr.io/pkg/appview/storage" 23 "atcr.io/pkg/atproto" 24 "atcr.io/pkg/auth" 25 "atcr.io/pkg/auth/oauth" 26 "atcr.io/pkg/auth/token" 27 "atcr.io/pkg/logging" 28 29 // UI components 30 "atcr.io/pkg/appview" 31 "atcr.io/pkg/appview/db" 32 uihandlers "atcr.io/pkg/appview/handlers" 33 "atcr.io/pkg/appview/holdhealth" 34 "atcr.io/pkg/appview/jetstream" 35 "atcr.io/pkg/appview/readme" 36 "atcr.io/pkg/appview/routes" 37 "github.com/go-chi/chi/v5" 38 chimiddleware "github.com/go-chi/chi/v5/middleware" 39) 40 41var serveCmd = &cobra.Command{ 42 Use: "serve", 43 Short: "Start the ATCR registry server", 44 Long: `Start the ATCR registry server with authentication endpoints. 45 46Configuration is loaded from environment variables. 47See .env.appview.example for available environment variables.`, 48 Args: cobra.NoArgs, 49 RunE: serveRegistry, 50} 51 52func init() { 53 // Replace the default serve command with our custom one 54 for i, cmd := range registry.RootCmd.Commands() { 55 if cmd.Name() == "serve" { 56 registry.RootCmd.Commands()[i] = serveCmd 57 break 58 } 59 } 60} 61 62func serveRegistry(cmd *cobra.Command, args []string) error { 63 // Load configuration from environment variables 64 cfg, err := appview.LoadConfigFromEnv() 65 if err != nil { 66 return fmt.Errorf("failed to load config from environment: %w", err) 67 } 68 69 // Initialize structured logging 70 logging.InitLogger(cfg.LogLevel) 71 72 slog.Info("Configuration loaded successfully from environment") 73 74 // Initialize UI database first (required for all stores) 75 slog.Info("Initializing UI database", "path", cfg.UI.DatabasePath) 76 uiDatabase, uiReadOnlyDB, uiSessionStore := db.InitializeDatabase(cfg.UI.Enabled, cfg.UI.DatabasePath, cfg.UI.SkipDBMigrations) 77 if uiDatabase == nil { 78 return fmt.Errorf("failed to initialize UI database - required for session storage") 79 } 80 81 // Initialize hold health checker 82 slog.Info("Initializing hold health checker", "cache_ttl", cfg.Health.CacheTTL) 83 healthChecker := holdhealth.NewChecker(cfg.Health.CacheTTL) 84 85 // Initialize README fetcher for rendering repo page descriptions 86 readmeFetcher := readme.NewFetcher() 87 88 // Start background health check worker 89 startupDelay := 5 * time.Second // Wait for hold services to start (Docker compose) 90 dbAdapter := holdhealth.NewDBAdapter(uiDatabase) 91 healthWorker := holdhealth.NewWorkerWithStartupDelay(healthChecker, dbAdapter, cfg.Health.CheckInterval, startupDelay) 92 93 // Create context for worker lifecycle management 94 workerCtx, workerCancel := context.WithCancel(context.Background()) 95 defer workerCancel() // Ensure context is cancelled on all exit paths 96 healthWorker.Start(workerCtx) 97 slog.Info("Hold health worker started", "startup_delay", startupDelay, "refresh_interval", cfg.Health.CheckInterval, "cache_ttl", cfg.Health.CacheTTL) 98 99 // Initialize OAuth components 100 slog.Info("Initializing OAuth components") 101 102 // Create OAuth session storage (SQLite-backed) 103 oauthStore := db.NewOAuthStore(uiDatabase) 104 slog.Info("Using SQLite for OAuth session storage") 105 106 // Create device store (SQLite-backed) 107 deviceStore := db.NewDeviceStore(uiDatabase) 108 slog.Info("Using SQLite for device storage") 109 110 // Get base URL and default hold DID from config 111 baseURL := cfg.Server.BaseURL 112 defaultHoldDID := cfg.Server.DefaultHoldDID 113 testMode := cfg.Server.TestMode 114 115 slog.Debug("Base URL for OAuth", "base_url", baseURL) 116 if testMode { 117 slog.Info("TEST_MODE enabled - will use HTTP for local DID resolution and transition:generic scope") 118 } 119 120 // Create OAuth client app (automatically configures confidential client for production) 121 desiredScopes := oauth.GetDefaultScopes(defaultHoldDID) 122 oauthClientApp, err := oauth.NewClientApp(baseURL, oauthStore, desiredScopes, cfg.Server.OAuthKeyPath, cfg.Server.ClientName) 123 if err != nil { 124 return fmt.Errorf("failed to create OAuth client app: %w", err) 125 } 126 if testMode { 127 slog.Info("Using OAuth scopes with transition:generic (test mode)") 128 } else { 129 slog.Info("Using OAuth scopes with RPC scope (production mode)") 130 } 131 132 // Invalidate sessions with mismatched scopes on startup 133 // This ensures all users have the latest required scopes after deployment 134 invalidatedCount, err := oauthStore.InvalidateSessionsWithMismatchedScopes(context.Background(), desiredScopes) 135 if err != nil { 136 slog.Warn("Failed to invalidate sessions with mismatched scopes", "error", err) 137 } else if invalidatedCount > 0 { 138 slog.Info("Invalidated OAuth sessions due to scope changes", "count", invalidatedCount) 139 } 140 141 // Create oauth token refresher 142 refresher := oauth.NewRefresher(oauthClientApp) 143 144 // Wire up UI session store to refresher so it can invalidate UI sessions on OAuth failures 145 if uiSessionStore != nil { 146 refresher.SetUISessionStore(uiSessionStore) 147 } 148 149 // Set global refresher for middleware 150 middleware.SetGlobalRefresher(refresher) 151 152 // Set global database for hold DID lookups (used by blob routing) 153 holdDIDDB := db.NewHoldDIDDB(uiDatabase) 154 middleware.SetGlobalDatabase(holdDIDDB) 155 156 // Create RemoteHoldAuthorizer for hold authorization with caching 157 holdAuthorizer := auth.NewRemoteHoldAuthorizer(uiDatabase, testMode) 158 middleware.SetGlobalAuthorizer(holdAuthorizer) 159 slog.Info("Hold authorizer initialized with database caching") 160 161 // Clear all denial caches on startup for a clean slate (non-blocking) 162 if remote, ok := holdAuthorizer.(*auth.RemoteHoldAuthorizer); ok { 163 go func() { 164 if err := remote.ClearAllDenials(); err != nil { 165 slog.Warn("Failed to clear denial caches on startup", "error", err) 166 } 167 }() 168 } 169 170 // Initialize Jetstream workers (background services before HTTP routes) 171 initializeJetstream(uiDatabase, &cfg.Jetstream, defaultHoldDID, testMode, refresher) 172 173 // // Run stats migration to holds (one-time migration, skipped if already done) 174 // go func() { 175 // // Wait for services to be ready (Docker startup race condition) 176 // time.Sleep(10 * time.Second) 177 // if err := db.MigrateStatsToHolds(context.Background(), uiDatabase); err != nil { 178 // slog.Warn("Stats migration failed", "error", err) 179 // } 180 // }() 181 182 // Create main chi router 183 mainRouter := chi.NewRouter() 184 185 // Add core middleware 186 mainRouter.Use(chimiddleware.Logger) 187 mainRouter.Use(chimiddleware.Recoverer) 188 mainRouter.Use(chimiddleware.GetHead) // Automatically handle HEAD requests for GET routes 189 mainRouter.Use(routes.CORSMiddleware()) 190 191 // Load templates if UI is enabled 192 var uiTemplates *template.Template 193 if cfg.UI.Enabled { 194 var err error 195 uiTemplates, err = appview.Templates() 196 if err != nil { 197 slog.Warn("Failed to load UI templates", "error", err) 198 } else { 199 // Register UI routes with dependencies 200 routes.RegisterUIRoutes(mainRouter, routes.UIDependencies{ 201 Database: uiDatabase, 202 ReadOnlyDB: uiReadOnlyDB, 203 SessionStore: uiSessionStore, 204 OAuthClientApp: oauthClientApp, 205 OAuthStore: oauthStore, 206 Refresher: refresher, 207 BaseURL: baseURL, 208 DeviceStore: deviceStore, 209 HealthChecker: healthChecker, 210 ReadmeFetcher: readmeFetcher, 211 Templates: uiTemplates, 212 }) 213 } 214 } 215 216 // Create OAuth server 217 oauthServer := oauth.NewServer(oauthClientApp) 218 // Connect server to refresher for cache invalidation 219 oauthServer.SetRefresher(refresher) 220 // Connect UI session store for web login 221 if uiSessionStore != nil { 222 oauthServer.SetUISessionStore(uiSessionStore) 223 } 224 225 // Register OAuth post-auth callback for AppView business logic 226 // This decouples the OAuth package from AppView-specific dependencies 227 oauthServer.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, sessionID string) error { 228 slog.Debug("OAuth post-auth callback", "component", "appview/callback", "did", did) 229 230 // Create ATProto client with session provider (uses DoWithSession for DPoP nonce safety) 231 client := atproto.NewClientWithSessionProvider(pdsEndpoint, did, refresher) 232 233 // Ensure sailor profile exists (creates with default hold if configured) 234 slog.Debug("Ensuring profile exists", "component", "appview/callback", "did", did, "default_hold_did", defaultHoldDID) 235 if err := storage.EnsureProfile(ctx, client, defaultHoldDID); err != nil { 236 slog.Warn("Failed to ensure profile", "component", "appview/callback", "did", did, "error", err) 237 // Continue anyway - profile creation is not critical for avatar fetch 238 } else { 239 slog.Debug("Profile ensured", "component", "appview/callback", "did", did) 240 } 241 242 // Fetch user's profile record from PDS (contains blob references) 243 profileRecord, err := client.GetProfileRecord(ctx, did) 244 if err != nil { 245 slog.Warn("Failed to fetch profile record", "component", "appview/callback", "did", did, "error", err) 246 // Continue without avatar - set profileRecord to nil to skip avatar extraction 247 profileRecord = nil 248 } 249 250 // Construct avatar URL from blob CID using imgs.blue CDN (if profile record was fetched successfully) 251 avatarURL := "" 252 if profileRecord != nil && profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" { 253 avatarURL = atproto.BlobCDNURL(did, profileRecord.Avatar.Ref.Link) 254 slog.Debug("Constructed avatar URL", "component", "appview/callback", "avatar_url", avatarURL) 255 } 256 257 // Store user in database (with or without avatar) 258 // Use UpsertUser if we successfully fetched an avatar (to update existing users) 259 // Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars) 260 if avatarURL != "" { 261 err = db.UpsertUser(uiDatabase, &db.User{ 262 DID: did, 263 Handle: handle, 264 PDSEndpoint: pdsEndpoint, 265 Avatar: avatarURL, 266 LastSeen: time.Now(), 267 }) 268 } else { 269 err = db.UpsertUserIgnoreAvatar(uiDatabase, &db.User{ 270 DID: did, 271 Handle: handle, 272 PDSEndpoint: pdsEndpoint, 273 Avatar: avatarURL, 274 LastSeen: time.Now(), 275 }) 276 } 277 if err != nil { 278 slog.Warn("Failed to store user in database", "component", "appview/callback", "error", err) 279 return nil // Non-fatal 280 } 281 282 slog.Debug("Stored user", "component", "appview/callback", "did", did, "has_avatar", avatarURL != "") 283 284 // Migrate profile URL→DID if needed 285 profile, err := storage.GetProfile(ctx, client) 286 if err != nil { 287 slog.Warn("Failed to get profile", "component", "appview/callback", "did", did, "error", err) 288 return nil // Non-fatal 289 } 290 291 var holdDID string 292 if profile != nil && profile.DefaultHold != "" { 293 // Check if defaultHold is a URL (needs migration) 294 if strings.HasPrefix(profile.DefaultHold, "http://") || strings.HasPrefix(profile.DefaultHold, "https://") { 295 slog.Debug("Migrating hold URL to DID", "component", "appview/callback", "did", did, "hold_url", profile.DefaultHold) 296 297 // Resolve URL to DID 298 holdDID := atproto.ResolveHoldDIDFromURL(profile.DefaultHold) 299 300 // Update profile with DID 301 profile.DefaultHold = holdDID 302 if err := storage.UpdateProfile(ctx, client, profile); err != nil { 303 slog.Warn("Failed to update profile with hold DID", "component", "appview/callback", "did", did, "error", err) 304 } else { 305 slog.Debug("Updated profile with hold DID", "component", "appview/callback", "hold_did", holdDID) 306 } 307 } else { 308 // Already a DID - use it 309 holdDID = profile.DefaultHold 310 } 311 // Register crew regardless of migration (outside the migration block) 312 // Run in background to avoid blocking OAuth callback if hold is offline 313 // Use background context - don't inherit request context which gets canceled on response 314 slog.Debug("Attempting crew registration", "component", "appview/callback", "did", did, "hold_did", holdDID) 315 go func(client *atproto.Client, refresher *oauth.Refresher, holdDID string, authorizer auth.HoldAuthorizer) { 316 ctx := context.Background() 317 storage.EnsureCrewMembership(ctx, client, refresher, holdDID, authorizer) 318 }(client, refresher, holdDID, holdAuthorizer) 319 320 } 321 322 return nil // All errors are non-fatal, logged for debugging 323 }) 324 325 // Create token issuer (also initializes auth keys if needed) 326 var issuer *token.Issuer 327 if cfg.Distribution.Auth["token"] != nil { 328 issuer, err = createTokenIssuer(cfg) 329 if err != nil { 330 return fmt.Errorf("failed to create token issuer: %w", err) 331 } 332 333 // Log successful initialization 334 slog.Info("Auth keys initialized", "path", cfg.Auth.KeyPath) 335 } 336 337 // Create registry app (returns http.Handler) 338 ctx := context.Background() 339 app := handlers.NewApp(ctx, cfg.Distribution) 340 341 // Wrap registry app with auth method extraction middleware 342 // This extracts the auth method from the JWT and stores it in the request context 343 wrappedApp := middleware.ExtractAuthMethod(app) 344 345 // Mount registry at /v2/ 346 mainRouter.Handle("/v2/*", wrappedApp) 347 348 // Mount static files if UI is enabled 349 if uiSessionStore != nil && uiTemplates != nil { 350 // Register dynamic routes for root-level files (favicons, manifests, etc.) 351 staticHandler := appview.StaticHandler() 352 rootFiles, err := appview.StaticRootFiles() 353 if err != nil { 354 slog.Warn("Failed to scan static root files", "error", err) 355 } else { 356 for _, filename := range rootFiles { 357 // Create a closure to capture the filename 358 file := filename 359 mainRouter.Get("/"+file, func(w http.ResponseWriter, r *http.Request) { 360 // Serve the specific file from static root 361 r.URL.Path = "/" + file 362 staticHandler.ServeHTTP(w, r) 363 }) 364 } 365 slog.Info("Registered dynamic root file routes", "count", len(rootFiles), "files", rootFiles) 366 } 367 368 // Mount subdirectory routes with clean paths 369 mainRouter.Handle("/css/*", http.StripPrefix("/css/", appview.StaticSubdir("css"))) 370 mainRouter.Handle("/js/*", http.StripPrefix("/js/", appview.StaticSubdir("js"))) 371 mainRouter.Handle("/static/*", http.StripPrefix("/static/", appview.StaticSubdir("static"))) 372 373 slog.Info("UI enabled", "home", "/", "settings", "/settings") 374 } 375 376 // Mount OAuth endpoints 377 mainRouter.Get("/auth/oauth/authorize", oauthServer.ServeAuthorize) 378 mainRouter.Get("/auth/oauth/callback", oauthServer.ServeCallback) 379 380 // OAuth client metadata endpoint 381 mainRouter.Get("/oauth-client-metadata.json", func(w http.ResponseWriter, r *http.Request) { 382 config := oauthClientApp.Config 383 metadata := config.ClientMetadata() 384 385 // For confidential clients, ensure JWKS is included 386 // The indigo library should populate this automatically, but we explicitly set it here 387 // to be defensive and ensure it's always present for confidential clients 388 if config.IsConfidential() && metadata.JWKS == nil { 389 jwks := config.PublicJWKS() 390 metadata.JWKS = &jwks 391 } 392 393 // Convert indigo's metadata to map so we can add custom fields 394 metadataBytes, err := json.Marshal(metadata) 395 if err != nil { 396 http.Error(w, "Failed to marshal metadata", http.StatusInternalServerError) 397 return 398 } 399 400 var metadataMap map[string]interface{} 401 if err := json.Unmarshal(metadataBytes, &metadataMap); err != nil { 402 http.Error(w, "Failed to unmarshal metadata", http.StatusInternalServerError) 403 return 404 } 405 406 // Add custom fields 407 metadataMap["client_name"] = cfg.Server.ClientName 408 metadataMap["client_uri"] = cfg.Server.BaseURL 409 metadataMap["logo_uri"] = cfg.Server.BaseURL + "/web-app-manifest-192x192.png" 410 411 w.Header().Set("Content-Type", "application/json") 412 w.Header().Set("Access-Control-Allow-Origin", "*") 413 // Limit caching to allow scope changes to propagate quickly 414 // PDS servers cache client metadata, so short max-age helps with updates 415 w.Header().Set("Cache-Control", "public, max-age=300") 416 if err := json.NewEncoder(w).Encode(metadataMap); err != nil { 417 http.Error(w, "Failed to encode metadata", http.StatusInternalServerError) 418 } 419 }) 420 421 // Note: Indigo handles OAuth state cleanup internally via its store 422 423 // Mount auth endpoints if enabled 424 if issuer != nil { 425 // Basic Auth token endpoint (supports device secrets and app passwords) 426 tokenHandler := token.NewHandler(issuer, deviceStore) 427 428 // Register OAuth session validator for device auth validation 429 // This validates OAuth sessions are usable (not just exist) before issuing tokens 430 // Prevents the flood of errors when a stale session is discovered during push 431 tokenHandler.SetOAuthSessionValidator(refresher) 432 433 // Register token post-auth callback for profile management 434 // This decouples the token package from AppView-specific dependencies 435 tokenHandler.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, accessToken string) error { 436 slog.Debug("Token post-auth callback", "component", "appview/callback", "did", did) 437 438 // Create ATProto client with validated token 439 atprotoClient := atproto.NewClient(pdsEndpoint, did, accessToken) 440 441 // Ensure profile exists (will create with default hold if not exists and default is configured) 442 if err := storage.EnsureProfile(ctx, atprotoClient, defaultHoldDID); err != nil { 443 // Log error but don't fail auth - profile management is not critical 444 slog.Warn("Failed to ensure profile", "component", "appview/callback", "did", did, "error", err) 445 } else { 446 slog.Debug("Profile ensured with default hold", "component", "appview/callback", "did", did, "default_hold_did", defaultHoldDID) 447 } 448 449 return nil // All errors are non-fatal 450 }) 451 452 mainRouter.Get("/auth/token", tokenHandler.ServeHTTP) 453 454 // Device authorization endpoints (public) 455 mainRouter.Handle("/auth/device/code", &uihandlers.DeviceCodeHandler{ 456 Store: deviceStore, 457 AppViewBaseURL: baseURL, 458 }) 459 mainRouter.Handle("/auth/device/token", &uihandlers.DeviceTokenHandler{ 460 Store: deviceStore, 461 }) 462 463 slog.Info("Auth endpoints enabled", 464 "basic_auth", "/auth/token", 465 "device_code", "/auth/device/code", 466 "device_token", "/auth/device/token", 467 "oauth_authorize", "/auth/oauth/authorize", 468 "oauth_callback", "/auth/oauth/callback", 469 "oauth_metadata", "/client-metadata.json") 470 } 471 472 // Register credential helper version API (public endpoint) 473 mainRouter.Handle("/api/credential-helper/version", &uihandlers.CredentialHelperVersionHandler{ 474 Version: cfg.CredentialHelper.Version, 475 TangledRepo: cfg.CredentialHelper.TangledRepo, 476 Checksums: cfg.CredentialHelper.Checksums, 477 }) 478 if cfg.CredentialHelper.Version != "" { 479 slog.Info("Credential helper version API enabled", 480 "endpoint", "/api/credential-helper/version", 481 "version", cfg.CredentialHelper.Version) 482 } 483 484 // Create HTTP server 485 server := &http.Server{ 486 Addr: cfg.Server.Addr, 487 Handler: mainRouter, 488 } 489 490 // Handle graceful shutdown 491 stop := make(chan os.Signal, 1) 492 signal.Notify(stop, os.Interrupt, syscall.SIGTERM) 493 494 // Start server in goroutine 495 errChan := make(chan error, 1) 496 go func() { 497 slog.Info("Starting registry server", "addr", cfg.Server.Addr) 498 if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { 499 errChan <- err 500 } 501 }() 502 503 // Wait for shutdown signal or error 504 select { 505 case <-stop: 506 slog.Info("Shutting down registry server") 507 508 // Stop health worker first 509 slog.Info("Stopping hold health worker") 510 healthWorker.Stop() 511 512 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 513 defer cancel() 514 515 if err := server.Shutdown(shutdownCtx); err != nil { 516 return fmt.Errorf("server shutdown error: %w", err) 517 } 518 case err := <-errChan: 519 // Stop health worker on error (workerCancel called by defer) 520 healthWorker.Stop() 521 return fmt.Errorf("server error: %w", err) 522 } 523 524 return nil 525} 526 527// createTokenIssuer creates a token issuer for auth handlers 528func createTokenIssuer(cfg *appview.Config) (*token.Issuer, error) { 529 return token.NewIssuer( 530 cfg.Auth.KeyPath, 531 cfg.Auth.ServiceName, // issuer 532 cfg.Auth.ServiceName, // service 533 cfg.Auth.TokenExpiration, 534 ) 535} 536 537// initializeJetstream initializes the Jetstream workers for real-time events and backfill 538func initializeJetstream(database *sql.DB, jetstreamCfg *appview.JetstreamConfig, defaultHoldDID string, testMode bool, refresher *oauth.Refresher) { 539 // Start Jetstream worker 540 jetstreamURL := jetstreamCfg.URL 541 542 // Start real-time Jetstream worker with cursor tracking for reconnects 543 go func() { 544 var lastCursor int64 = 0 // Start from now on first connect 545 for { 546 worker := jetstream.NewWorker(database, jetstreamURL, lastCursor) 547 if err := worker.Start(context.Background()); err != nil { 548 // Save cursor from this connection for next reconnect 549 lastCursor = worker.GetLastCursor() 550 slog.Warn("Jetstream real-time worker error, reconnecting", "component", "jetstream", "error", err, "reconnect_delay", "10s") 551 time.Sleep(10 * time.Second) 552 } 553 } 554 }() 555 slog.Info("Jetstream real-time worker started", "component", "jetstream") 556 557 // Start backfill worker (enabled by default, set ATCR_BACKFILL_ENABLED=false to disable) 558 if jetstreamCfg.BackfillEnabled { 559 // Get relay endpoint for sync API (defaults to Bluesky's relay) 560 relayEndpoint := jetstreamCfg.RelayEndpoint 561 562 backfillWorker, err := jetstream.NewBackfillWorker(database, relayEndpoint, defaultHoldDID, testMode, refresher) 563 if err != nil { 564 slog.Warn("Failed to create backfill worker", "component", "jetstream/backfill", "error", err) 565 } else { 566 // Run initial backfill with startup delay for Docker compose 567 go func() { 568 // Wait for hold service to be ready (Docker startup race condition) 569 startupDelay := 5 * time.Second 570 slog.Info("Waiting for services to be ready", "component", "jetstream/backfill", "startup_delay", startupDelay) 571 time.Sleep(startupDelay) 572 573 slog.Info("Starting sync-based backfill", "component", "jetstream/backfill", "relay_endpoint", relayEndpoint) 574 if err := backfillWorker.Start(context.Background()); err != nil { 575 slog.Warn("Backfill finished with error", "component", "jetstream/backfill", "error", err) 576 } else { 577 slog.Info("Backfill completed successfully", "component", "jetstream/backfill") 578 } 579 }() 580 581 // Start periodic backfill scheduler 582 interval := jetstreamCfg.BackfillInterval 583 584 go func() { 585 ticker := time.NewTicker(interval) 586 defer ticker.Stop() 587 588 for range ticker.C { 589 slog.Info("Starting periodic backfill", "component", "jetstream/backfill", "interval", interval) 590 if err := backfillWorker.Start(context.Background()); err != nil { 591 slog.Warn("Periodic backfill finished with error", "component", "jetstream/backfill", "error", err) 592 } else { 593 slog.Info("Periodic backfill completed successfully", "component", "jetstream/backfill") 594 } 595 } 596 }() 597 slog.Info("Periodic backfill scheduler started", "component", "jetstream/backfill", "interval", interval) 598 } 599 } 600}