A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix backfill

+32 -20
+6 -6
cmd/registry/serve.go
··· 474 474 475 475 // Start backfill worker if enabled 476 476 if backfillEnabled := os.Getenv("ATCR_BACKFILL_ENABLED"); backfillEnabled == "true" { 477 - // Get BGS endpoint for sync API (defaults to Bluesky's BGS) 478 - bgsEndpoint := os.Getenv("ATCR_BGS_ENDPOINT") 479 - if bgsEndpoint == "" { 480 - bgsEndpoint = "https://bsky.network" 477 + // Get relay endpoint for sync API (defaults to Bluesky's relay) 478 + relayEndpoint := os.Getenv("ATCR_RELAY_ENDPOINT") 479 + if relayEndpoint == "" { 480 + relayEndpoint = "https://relay1.us-east.bsky.network" 481 481 } 482 482 483 - backfillWorker, err := jetstream.NewBackfillWorker(database, bgsEndpoint) 483 + backfillWorker, err := jetstream.NewBackfillWorker(database, relayEndpoint) 484 484 if err != nil { 485 485 fmt.Printf("Warning: Failed to create backfill worker: %v\n", err) 486 486 } else { 487 487 go func() { 488 - fmt.Printf("Backfill: Starting sync-based backfill from %s...\n", bgsEndpoint) 488 + fmt.Printf("Backfill: Starting sync-based backfill from %s...\n", relayEndpoint) 489 489 if err := backfillWorker.Start(context.Background()); err != nil { 490 490 fmt.Printf("Backfill: Finished with error: %v\n", err) 491 491 } else {
+1 -3
docker-compose.yml
··· 10 10 environment: 11 11 - ATCR_TOKEN_STORAGE_PATH=/var/lib/atcr/tokens/oauth-tokens.json 12 12 - ATCR_UI_ENABLED=true 13 - # Jetstream backfill: Replay historical events (runs once until caught up) 14 - # Examples: -120h (5 days ago), -336h (2 weeks ago), 2025-10-01T00:00:00Z 15 - - JETSTREAM_BACKFILL_START=-121h 13 + - ATCR_BACKFILL_ENABLED=true 16 14 volumes: 17 15 # Auth keys (JWT signing keys) 18 16 - atcr-auth:/var/lib/atcr/auth
+18 -9
pkg/appview/jetstream/backfill.go
··· 31 31 } 32 32 33 33 // NewBackfillWorker creates a backfill worker using sync API 34 - func NewBackfillWorker(database *sql.DB, pdsEndpoint string) (*BackfillWorker, error) { 35 - // Create client without auth - sync endpoints are public 36 - client := atproto.NewClient(pdsEndpoint, "", "") 34 + func NewBackfillWorker(database *sql.DB, relayEndpoint string) (*BackfillWorker, error) { 35 + // Create client for relay - used only for listReposByCollection 36 + client := atproto.NewClient(relayEndpoint, "", "") 37 37 38 38 return &BackfillWorker{ 39 39 db: database, 40 - client: client, 40 + client: client, // This points to the relay 41 41 resolver: atproto.NewResolver(), 42 42 }, nil 43 43 } ··· 82 82 fmt.Printf("Backfill: Found %d repos with %s (cursor: %s)\n", len(result.Repos), collection, repoCursor) 83 83 84 84 // Process each repo (DID) 85 - for _, did := range result.Repos { 86 - recordCount, err := b.backfillRepo(ctx, did, collection) 85 + for _, repo := range result.Repos { 86 + recordCount, err := b.backfillRepo(ctx, repo.DID, collection) 87 87 if err != nil { 88 - fmt.Printf("WARNING: Failed to backfill repo %s: %v\n", did, err) 88 + fmt.Printf("WARNING: Failed to backfill repo %s: %v\n", repo.DID, err) 89 89 continue 90 90 } 91 91 ··· 111 111 112 112 // backfillRepo backfills all records for a single repo/DID 113 113 func (b *BackfillWorker) backfillRepo(ctx context.Context, did, collection string) (int, error) { 114 - // Ensure user exists in database 114 + // Ensure user exists in database and get their PDS endpoint 115 115 if err := b.ensureUser(ctx, did); err != nil { 116 116 return 0, fmt.Errorf("failed to ensure user: %w", err) 117 117 } 118 118 119 + // Resolve DID to get user's PDS endpoint 120 + _, pdsEndpoint, err := b.resolver.ResolveIdentity(ctx, did) 121 + if err != nil { 122 + return 0, fmt.Errorf("failed to resolve DID to PDS: %w", err) 123 + } 124 + 125 + // Create a client for this user's PDS 126 + pdsClient := atproto.NewClient(pdsEndpoint, "", "") 127 + 119 128 var recordCursor string 120 129 recordCount := 0 121 130 ··· 125 134 126 135 // Paginate through all records for this repo 127 136 for { 128 - records, cursor, err := b.client.ListRecordsForRepo(ctx, did, collection, 100, recordCursor) 137 + records, cursor, err := pdsClient.ListRecordsForRepo(ctx, did, collection, 100, recordCursor) 129 138 if err != nil { 130 139 return recordCount, fmt.Errorf("failed to list records: %w", err) 131 140 }
+7 -2
pkg/atproto/client.go
··· 302 302 303 303 // ListReposByCollectionResult represents the response from com.atproto.sync.listReposByCollection 304 304 type ListReposByCollectionResult struct { 305 - Repos []string `json:"repos"` // Array of DIDs 306 - Cursor string `json:"cursor,omitempty"` 305 + Repos []RepoRef `json:"repos"` // Array of repo references 306 + Cursor string `json:"cursor,omitempty"` 307 + } 308 + 309 + // RepoRef represents a repository reference in listReposByCollection response 310 + type RepoRef struct { 311 + DID string `json:"did"` 307 312 } 308 313 309 314 // ListReposByCollection lists all repos (DIDs) that have records in a collection