A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
72
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix backfilling manifests in the correct order, not just digest order

+141 -20
+46
pkg/appview/db/queries.go
··· 652 652 return &m, nil 653 653 } 654 654 655 + // GetNewestManifestForRepo returns the newest manifest for a specific repository 656 + // Used by backfill to ensure annotations come from the most recent manifest 657 + func GetNewestManifestForRepo(db *sql.DB, did, repository string) (*Manifest, error) { 658 + var m Manifest 659 + err := db.QueryRow(` 660 + SELECT id, did, repository, digest, hold_endpoint, schema_version, media_type, 661 + config_digest, config_size, created_at 662 + FROM manifests 663 + WHERE did = ? AND repository = ? 664 + ORDER BY created_at DESC 665 + LIMIT 1 666 + `, did, repository).Scan( 667 + &m.ID, &m.DID, &m.Repository, &m.Digest, 668 + &m.HoldEndpoint, &m.SchemaVersion, &m.MediaType, 669 + &m.ConfigDigest, &m.ConfigSize, &m.CreatedAt, 670 + ) 671 + if err != nil { 672 + return nil, err 673 + } 674 + return &m, nil 675 + } 676 + 677 + // GetRepositoriesForDID returns all unique repository names for a DID 678 + // Used by backfill to reconcile annotations for all repositories 679 + func GetRepositoriesForDID(db *sql.DB, did string) ([]string, error) { 680 + rows, err := db.Query(` 681 + SELECT DISTINCT repository 682 + FROM manifests 683 + WHERE did = ? 684 + `, did) 685 + if err != nil { 686 + return nil, err 687 + } 688 + defer rows.Close() 689 + 690 + var repositories []string 691 + for rows.Next() { 692 + var repo string 693 + if err := rows.Scan(&repo); err != nil { 694 + return nil, err 695 + } 696 + repositories = append(repositories, repo) 697 + } 698 + return repositories, rows.Err() 699 + } 700 + 655 701 // GetLayersForManifest fetches all layers for a manifest 656 702 func GetLayersForManifest(db *sql.DB, manifestID int64) ([]Layer, error) { 657 703 rows, err := db.Query(`
+66 -2
pkg/appview/jetstream/backfill.go
··· 152 152 return 0, fmt.Errorf("no PDS endpoint found for DID %s", did) 153 153 } 154 154 155 - // Create a client for this user's PDS 156 - pdsClient := atproto.NewClient(pdsEndpoint, "", "") 155 + // Create a client for this user's PDS with the user's DID 156 + // This allows GetRecord to work properly with the repo parameter 157 + pdsClient := atproto.NewClient(pdsEndpoint, did, "") 157 158 158 159 var recordCursor string 159 160 recordCount := 0 ··· 219 220 if collection == atproto.ManifestCollection { 220 221 if err := db.CleanupOrphanedTags(b.db, did); err != nil { 221 222 fmt.Printf("WARNING: Failed to cleanup orphaned tags for %s: %v\n", did, err) 223 + } 224 + 225 + // Reconcile annotations - ensure they come from newest manifest per repository 226 + // This fixes out-of-order backfill where older manifests can overwrite newer annotations 227 + if err := b.reconcileAnnotations(ctx, did, pdsClient); err != nil { 228 + fmt.Printf("WARNING: Failed to reconcile annotations for %s: %v\n", did, err) 222 229 } 223 230 } 224 231 ··· 361 368 fmt.Printf("Backfill: Cached captain record for hold %s (owner: %s)\n", holdDID, captainRecord.OwnerDID) 362 369 return nil 363 370 } 371 + 372 + // reconcileAnnotations ensures annotations come from the newest manifest in each repository 373 + // This fixes the out-of-order backfill issue where older manifests can overwrite newer annotations 374 + func (b *BackfillWorker) reconcileAnnotations(ctx context.Context, did string, pdsClient *atproto.Client) error { 375 + // Get all repositories for this DID 376 + repositories, err := db.GetRepositoriesForDID(b.db, did) 377 + if err != nil { 378 + return fmt.Errorf("failed to get repositories: %w", err) 379 + } 380 + 381 + for _, repo := range repositories { 382 + // Find newest manifest for this repository 383 + newestManifest, err := db.GetNewestManifestForRepo(b.db, did, repo) 384 + if err != nil { 385 + fmt.Printf("WARNING [backfill]: Failed to get newest manifest for %s/%s: %v\n", did, repo, err) 386 + continue // Skip on error 387 + } 388 + 389 + // Fetch the full manifest record from PDS using the digest as rkey 390 + rkey := strings.TrimPrefix(newestManifest.Digest, "sha256:") 391 + record, err := pdsClient.GetRecord(ctx, atproto.ManifestCollection, rkey) 392 + if err != nil { 393 + fmt.Printf("WARNING [backfill]: Failed to fetch manifest record for %s/%s: %v\n", did, repo, err) 394 + continue // Skip on error 395 + } 396 + 397 + // Parse manifest record 398 + var manifestRecord atproto.ManifestRecord 399 + if err := json.Unmarshal(record.Value, &manifestRecord); err != nil { 400 + fmt.Printf("WARNING [backfill]: Failed to parse manifest record for %s/%s: %v\n", did, repo, err) 401 + continue 402 + } 403 + 404 + // Update annotations from newest manifest only 405 + if manifestRecord.Annotations != nil && len(manifestRecord.Annotations) > 0 { 406 + // Filter out empty annotations 407 + hasData := false 408 + for _, value := range manifestRecord.Annotations { 409 + if value != "" { 410 + hasData = true 411 + break 412 + } 413 + } 414 + 415 + if hasData { 416 + err = db.UpsertRepositoryAnnotations(b.db, did, repo, manifestRecord.Annotations) 417 + if err != nil { 418 + fmt.Printf("WARNING [backfill]: Failed to reconcile annotations for %s/%s: %v\n", did, repo, err) 419 + } else { 420 + fmt.Printf("Backfill: Reconciled annotations for %s/%s from newest manifest %s\n", did, repo, newestManifest.Digest) 421 + } 422 + } 423 + } 424 + } 425 + 426 + return nil 427 + }
+29 -18
pkg/appview/jetstream/processor_test.go
··· 39 39 config_digest TEXT, 40 40 config_size INTEGER, 41 41 created_at TIMESTAMP NOT NULL, 42 - title TEXT, 43 - description TEXT, 44 - source_url TEXT, 45 - documentation_url TEXT, 46 - licenses TEXT, 47 - icon_url TEXT, 48 - readme_url TEXT, 49 42 UNIQUE(did, repository, digest) 43 + ); 44 + 45 + CREATE TABLE repository_annotations ( 46 + did TEXT NOT NULL, 47 + repository TEXT NOT NULL, 48 + key TEXT NOT NULL, 49 + value TEXT NOT NULL, 50 + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 51 + PRIMARY KEY(did, repository, key), 52 + FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE 50 53 ); 51 54 52 55 CREATE TABLE layers ( ··· 189 192 t.Errorf("Expected 1 manifest, got %d", count) 190 193 } 191 194 192 - // Verify annotations were stored 195 + // Verify annotations were stored in repository_annotations table 193 196 var title, source string 194 - err = database.QueryRow("SELECT title, source_url FROM manifests WHERE id = ?", manifestID).Scan(&title, &source) 197 + err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", 198 + "did:plc:test123", "test-app", "org.opencontainers.image.title").Scan(&title) 195 199 if err != nil { 196 - t.Fatalf("Failed to query manifest fields: %v", err) 200 + t.Fatalf("Failed to query title annotation: %v", err) 197 201 } 198 202 if title != "Test App" { 199 203 t.Errorf("title = %q, want %q", title, "Test App") 200 204 } 205 + 206 + err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", 207 + "did:plc:test123", "test-app", "org.opencontainers.image.source").Scan(&source) 208 + if err != nil { 209 + t.Fatalf("Failed to query source annotation: %v", err) 210 + } 201 211 if source != "https://github.com/test/app" { 202 - t.Errorf("source_url = %q, want %q", source, "https://github.com/test/app") 212 + t.Errorf("source = %q, want %q", source, "https://github.com/test/app") 203 213 } 204 214 205 215 // Verify layers were inserted ··· 523 533 t.Fatalf("Failed to marshal manifest: %v", err) 524 534 } 525 535 526 - manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 536 + _, err = p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 527 537 if err != nil { 528 538 t.Fatalf("ProcessManifest failed: %v", err) 529 539 } 530 540 531 - // Verify annotation fields are empty strings (not NULL) 532 - var title string 533 - err = database.QueryRow("SELECT title FROM manifests WHERE id = ?", manifestID).Scan(&title) 541 + // Verify no annotations were stored (nil annotations should not create entries) 542 + var annotationCount int 543 + err = database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?", 544 + "did:plc:test123", "test-app").Scan(&annotationCount) 534 545 if err != nil { 535 - t.Fatalf("Failed to query title: %v", err) 546 + t.Fatalf("Failed to query annotations: %v", err) 536 547 } 537 - if title != "" { 538 - t.Errorf("Expected empty title, got %q", title) 548 + if annotationCount != 0 { 549 + t.Errorf("Expected 0 annotations for nil annotations, got %d", annotationCount) 539 550 } 540 551 }