A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix some backfill and db queries

+317 -50
+8
.tangled/workflows/tests.yml
··· 10 10 nixpkgs: 11 11 - gcc 12 12 - go 13 + - curl 13 14 14 15 steps: 16 + - name: Download and Generate 17 + environment: 18 + CGO_ENABLED: 1 19 + command: | 20 + go mod download 21 + go generate ./... 22 + 15 23 - name: Run Tests 16 24 environment: 17 25 CGO_ENABLED: 1
+21 -17
pkg/appview/db/models.go
··· 13 13 14 14 // Manifest represents an OCI manifest stored in the cache 15 15 type Manifest struct { 16 - ID int64 17 - DID string 18 - Repository string 19 - Digest string 20 - HoldEndpoint string 21 - SchemaVersion int 22 - MediaType string 23 - ConfigDigest string 24 - ConfigSize int64 25 - CreatedAt time.Time 26 - Title string 27 - Description string 28 - SourceURL string 29 - DocumentationURL string 30 - Licenses string 31 - IconURL string 32 - ReadmeURL string 16 + ID int64 17 + DID string 18 + Repository string 19 + Digest string 20 + HoldEndpoint string 21 + SchemaVersion int 22 + MediaType string 23 + ConfigDigest string 24 + ConfigSize int64 25 + CreatedAt time.Time 26 + Title string 27 + Description string 28 + SourceURL string 29 + DocumentationURL string 30 + Licenses string 31 + IconURL string 32 + ReadmeURL string 33 + PlatformOS string // UNUSED: Reserved for future use, always NULL 34 + PlatformArchitecture string // UNUSED: Reserved for future use, always NULL 35 + PlatformVariant string // UNUSED: Reserved for future use, always NULL 36 + PlatformOSVersion string // UNUSED: Reserved for future use, always NULL 33 37 } 34 38 35 39 // Layer represents a layer in a manifest
+66 -5
pkg/appview/db/queries.go
··· 534 534 535 535 // InsertManifest inserts or updates a manifest record 536 536 // Uses UPSERT to update labels/annotations if manifest already exists 537 + // Returns the manifest ID (works correctly for both insert and update) 537 538 func InsertManifest(db *sql.DB, manifest *Manifest) (int64, error) { 538 - result, err := db.Exec(` 539 + _, err := db.Exec(` 539 540 INSERT INTO manifests 540 541 (did, repository, digest, hold_endpoint, schema_version, media_type, 541 542 config_digest, config_size, created_at, ··· 564 565 return 0, err 565 566 } 566 567 567 - return result.LastInsertId() 568 + // Query for the ID (works for both insert and update) 569 + var id int64 570 + err = db.QueryRow(` 571 + SELECT id FROM manifests 572 + WHERE did = ? AND repository = ? AND digest = ? 573 + `, manifest.DID, manifest.Repository, manifest.Digest).Scan(&id) 574 + 575 + if err != nil { 576 + return 0, fmt.Errorf("failed to get manifest ID after upsert: %w", err) 577 + } 578 + 579 + return id, nil 568 580 } 569 581 570 582 // InsertLayer inserts a new layer record ··· 597 609 } 598 610 599 611 // GetTagsWithPlatforms returns all tags for a repository with platform information 600 - // For multi-arch tags, includes all platforms from manifest_references 601 - // For single-arch tags, includes the platform info 612 + // Only multi-arch tags (manifest lists) have platform info in manifest_references 613 + // Single-arch tags will have empty Platforms slice (platform is obvious for single-arch) 602 614 func GetTagsWithPlatforms(db *sql.DB, did, repository string) ([]TagWithPlatforms, error) { 603 615 rows, err := db.Query(` 604 616 SELECT ··· 648 660 tagOrder = append(tagOrder, tagKey) 649 661 } 650 662 651 - // Add platform info if present 663 + // Add platform info if present (only for multi-arch manifest lists) 652 664 if platformOS != "" || platformArch != "" { 653 665 tagMap[tagKey].Platforms = append(tagMap[tagKey].Platforms, PlatformInfo{ 654 666 OS: platformOS, ··· 906 918 m.IsManifestList = strings.Contains(m.MediaType, "index") || strings.Contains(m.MediaType, "manifest.list") 907 919 908 920 manifests = append(manifests, m) 921 + } 922 + 923 + // Fetch platform details for multi-arch manifests AFTER closing the main query 924 + for i := range manifests { 925 + if manifests[i].IsManifestList { 926 + platformRows, err := db.Query(` 927 + SELECT 928 + mr.platform_os, 929 + mr.platform_architecture, 930 + mr.platform_variant, 931 + mr.platform_os_version 932 + FROM manifest_references mr 933 + WHERE mr.manifest_id = ? 934 + ORDER BY mr.reference_index 935 + `, manifests[i].ID) 936 + 937 + if err != nil { 938 + return nil, err 939 + } 940 + 941 + manifests[i].Platforms = []PlatformInfo{} 942 + for platformRows.Next() { 943 + var p PlatformInfo 944 + var os, arch, variant, osVersion sql.NullString 945 + 946 + if err := platformRows.Scan(&os, &arch, &variant, &osVersion); err != nil { 947 + platformRows.Close() 948 + return nil, err 949 + } 950 + 951 + if os.Valid { 952 + p.OS = os.String 953 + } 954 + if arch.Valid { 955 + p.Architecture = arch.String 956 + } 957 + if variant.Valid { 958 + p.Variant = variant.String 959 + } 960 + if osVersion.Valid { 961 + p.OSVersion = osVersion.String 962 + } 963 + 964 + manifests[i].Platforms = append(manifests[i].Platforms, p) 965 + } 966 + platformRows.Close() 967 + 968 + manifests[i].PlatformCount = len(manifests[i].Platforms) 969 + } 909 970 } 910 971 911 972 return manifests, nil
+171
pkg/appview/db/tag_delete_test.go
··· 1 + package db 2 + 3 + import ( 4 + "testing" 5 + "time" 6 + 7 + "atcr.io/pkg/atproto" 8 + ) 9 + 10 + // TestTagDeleteRoundTrip tests the full flow of creating and deleting tags 11 + // This simulates what Jetstream does: encode repo/tag to rkey, then decode and delete 12 + func TestTagDeleteRoundTrip(t *testing.T) { 13 + // Create in-memory test database 14 + db, err := InitDB(":memory:") 15 + if err != nil { 16 + t.Fatalf("Failed to init database: %v", err) 17 + } 18 + defer db.Close() 19 + 20 + // Insert test user 21 + testUser := &User{ 22 + DID: "did:plc:test123", 23 + Handle: "testuser.bsky.social", 24 + PDSEndpoint: "https://test.pds.example.com", 25 + Avatar: "", 26 + LastSeen: time.Now(), 27 + } 28 + if err := UpsertUser(db, testUser); err != nil { 29 + t.Fatalf("Failed to insert user: %v", err) 30 + } 31 + 32 + // Test cases covering different tag patterns 33 + testCases := []struct { 34 + name string 35 + repository string 36 + tag string 37 + expectRoundTrip bool // Some cases can't round-trip due to encoding limitations 38 + }{ 39 + { 40 + name: "simple tag", 41 + repository: "test-image", 42 + tag: "latest", 43 + expectRoundTrip: true, 44 + }, 45 + { 46 + name: "tag with hyphen (like latest-amd64)", 47 + repository: "test-image", 48 + tag: "latest-amd64", 49 + expectRoundTrip: true, 50 + }, 51 + { 52 + name: "tag with hyphen (like latest-arm64)", 53 + repository: "test-image", 54 + tag: "latest-arm64", 55 + expectRoundTrip: true, 56 + }, 57 + { 58 + name: "tag with version", 59 + repository: "myapp", 60 + tag: "v1.0.0", 61 + expectRoundTrip: true, 62 + }, 63 + { 64 + name: "repository with underscore", 65 + repository: "my_repo", 66 + tag: "latest", 67 + expectRoundTrip: true, 68 + }, 69 + { 70 + name: "both with underscores (known limitation)", 71 + repository: "my_repo", 72 + tag: "my_tag", 73 + expectRoundTrip: false, // Cannot round-trip: underscore is the separator 74 + }, 75 + { 76 + name: "repository with multiple hyphens", 77 + repository: "multi-part-name", 78 + tag: "test-build", 79 + expectRoundTrip: true, 80 + }, 81 + } 82 + 83 + for _, tc := range testCases { 84 + t.Run(tc.name, func(t *testing.T) { 85 + // Step 1: Insert tag using UpsertTag (simulates tag creation) 86 + tag := &Tag{ 87 + DID: testUser.DID, 88 + Repository: tc.repository, 89 + Tag: tc.tag, 90 + Digest: "sha256:abc123def456", 91 + CreatedAt: time.Now(), 92 + } 93 + if err := UpsertTag(db, tag); err != nil { 94 + t.Fatalf("Failed to upsert tag: %v", err) 95 + } 96 + 97 + // Step 2: Verify tag was created 98 + var count int 99 + err := db.QueryRow(` 100 + SELECT COUNT(*) FROM tags 101 + WHERE did = ? AND repository = ? AND tag = ? 102 + `, testUser.DID, tc.repository, tc.tag).Scan(&count) 103 + if err != nil { 104 + t.Fatalf("Failed to count tags: %v", err) 105 + } 106 + if count != 1 { 107 + t.Fatalf("Expected 1 tag after insert, got %d", count) 108 + } 109 + 110 + // Step 3: Simulate Jetstream delete flow 111 + // This is what happens in processTag when operation == "delete" 112 + // The rkey comes from ATProto, we need to parse it back to repo/tag 113 + 114 + // First, let's see what the rkey would be (this is how tags are stored in ATProto) 115 + rkey := atproto.RepositoryTagToRKey(tc.repository, tc.tag) 116 + t.Logf("RKey for %s:%s = %s", tc.repository, tc.tag, rkey) 117 + 118 + // Then parse it back (this is what Jetstream does) 119 + parsedRepo, parsedTag := atproto.RKeyToRepositoryTag(rkey) 120 + t.Logf("Parsed back: repository=%s, tag=%s", parsedRepo, parsedTag) 121 + 122 + // Verify round-trip (skip for known limitations) 123 + if tc.expectRoundTrip { 124 + if parsedRepo != tc.repository { 125 + t.Errorf("Repository round-trip failed: stored=%s, parsed=%s", tc.repository, parsedRepo) 126 + } 127 + if parsedTag != tc.tag { 128 + t.Errorf("Tag round-trip failed: stored=%s, parsed=%s", tc.tag, parsedTag) 129 + } 130 + 131 + // Step 4: Delete using parsed values (like Jetstream does) 132 + if err := DeleteTag(db, testUser.DID, parsedRepo, parsedTag); err != nil { 133 + t.Fatalf("Failed to delete tag: %v", err) 134 + } 135 + 136 + // Step 5: Verify tag was deleted 137 + err = db.QueryRow(` 138 + SELECT COUNT(*) FROM tags 139 + WHERE did = ? AND repository = ? AND tag = ? 140 + `, testUser.DID, tc.repository, tc.tag).Scan(&count) 141 + if err != nil { 142 + t.Fatalf("Failed to count tags after delete: %v", err) 143 + } 144 + if count != 0 { 145 + // This is the bug! Tag wasn't deleted 146 + t.Errorf("Expected 0 tags after delete, got %d (tag still exists!)", count) 147 + 148 + // Debug: show what's actually in the database 149 + rows, err := db.Query(` 150 + SELECT repository, tag FROM tags WHERE did = ? 151 + `, testUser.DID) 152 + if err != nil { 153 + t.Logf("Failed to query remaining tags: %v", err) 154 + } else { 155 + t.Logf("Remaining tags in database:") 156 + for rows.Next() { 157 + var repo, tag string 158 + rows.Scan(&repo, &tag) 159 + t.Logf(" - repository=%s, tag=%s", repo, tag) 160 + } 161 + rows.Close() 162 + } 163 + } 164 + } else { 165 + // Known limitation: skip delete test for non-round-trippable cases 166 + t.Logf("Skipping delete test - known limitation: %s != %s or %s != %s", 167 + tc.repository, parsedRepo, tc.tag, parsedTag) 168 + } 169 + }) 170 + } 171 + }
+18 -4
pkg/appview/jetstream/backfill.go
··· 336 336 manifest.ConfigSize = manifestRecord.Config.Size 337 337 } 338 338 339 - // Insert manifest 339 + // Platform info is only stored for multi-arch images in manifest_references table 340 + // Single-arch images don't need platform display (it's obvious) 341 + 342 + // Insert manifest (or get existing ID if already exists) 340 343 manifestID, err := db.InsertManifest(b.db, manifest) 341 344 if err != nil { 342 - // Skip if already exists 345 + // If manifest already exists, get its ID so we can still insert references/layers 343 346 if strings.Contains(err.Error(), "UNIQUE constraint failed") { 344 - return nil 347 + // Query for existing manifest ID 348 + var existingID int64 349 + err := b.db.QueryRow(` 350 + SELECT id FROM manifests 351 + WHERE did = ? AND repository = ? AND digest = ? 352 + `, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID) 353 + 354 + if err != nil { 355 + return fmt.Errorf("failed to get existing manifest ID: %w", err) 356 + } 357 + manifestID = existingID 358 + } else { 359 + return fmt.Errorf("failed to insert manifest: %w", err) 345 360 } 346 - return fmt.Errorf("failed to insert manifest: %w", err) 347 361 } 348 362 349 363 if isManifestList {
+9 -1
pkg/appview/jetstream/worker.go
··· 545 545 if commit.Operation == "delete" { 546 546 // Delete tag - decode rkey back to repository and tag 547 547 repo, tag := atproto.RKeyToRepositoryTag(commit.RKey) 548 - return db.DeleteTag(w.db, commit.DID, repo, tag) 548 + fmt.Printf("Jetstream: Deleting tag: did=%s, repository=%s, tag=%s (from rkey=%s)\n", 549 + commit.DID, repo, tag, commit.RKey) 550 + if err := db.DeleteTag(w.db, commit.DID, repo, tag); err != nil { 551 + fmt.Printf("Jetstream: ERROR deleting tag: %v\n", err) 552 + return err 553 + } 554 + fmt.Printf("Jetstream: Successfully deleted tag: did=%s, repository=%s, tag=%s\n", 555 + commit.DID, repo, tag) 556 + return nil 549 557 } 550 558 551 559 // Parse tag record
+10 -9
pkg/atproto/manifest_store.go
··· 185 185 if tagOpt, ok := option.(distribution.WithTagOption); ok { 186 186 tag := tagOpt.Tag 187 187 tagRecord := NewTagRecord(s.client.DID(), s.repository, tag, dgst.String()) 188 - tagRKey := repositoryTagToRKey(s.repository, tag) 188 + tagRKey := RepositoryTagToRKey(s.repository, tag) 189 189 _, err = s.client.PutRecord(ctx, TagCollection, tagRKey, tagRecord) 190 190 if err != nil { 191 191 return "", fmt.Errorf("failed to store tag in ATProto: %w", err) ··· 209 209 return dgst.Encoded() 210 210 } 211 211 212 - // repositoryTagToRKey converts a repository and tag to an ATProto record key 212 + // RepositoryTagToRKey converts a repository and tag to an ATProto record key 213 213 // ATProto record keys must match: ^[a-zA-Z0-9._~-]{1,512}$ 214 - func repositoryTagToRKey(repository, tag string) string { 214 + func RepositoryTagToRKey(repository, tag string) string { 215 215 // Combine repository and tag to create a unique key 216 - // Replace invalid characters: slashes become dashes 216 + // Replace invalid characters: slashes become tildes (~) 217 + // We use tilde instead of dash to avoid ambiguity with repository names that contain hyphens 217 218 key := fmt.Sprintf("%s_%s", repository, tag) 218 219 219 - // Replace / with - (slash not allowed in rkeys) 220 - key = strings.ReplaceAll(key, "/", "-") 220 + // Replace / with ~ (slash not allowed in rkeys, tilde is allowed and unlikely in repo names) 221 + key = strings.ReplaceAll(key, "/", "~") 221 222 222 223 return key 223 224 } 224 225 225 226 // RKeyToRepositoryTag converts an ATProto record key back to repository and tag 226 - // This is the inverse of repositoryTagToRKey 227 + // This is the inverse of RepositoryTagToRKey 227 228 // Note: If the tag contains underscores, this will split on the LAST underscore 228 229 func RKeyToRepositoryTag(rkey string) (repository, tag string) { 229 230 // Find the last underscore to split repository and tag ··· 236 237 repository = rkey[:lastUnderscore] 237 238 tag = rkey[lastUnderscore+1:] 238 239 239 - // Convert dashes back to slashes in repository 240 - repository = strings.ReplaceAll(repository, "-", "/") 240 + // Convert tildes back to slashes in repository (tilde was used to encode slashes) 241 + repository = strings.ReplaceAll(repository, "~", "/") 241 242 242 243 return repository, tag 243 244 }
+7 -7
pkg/atproto/manifest_store_test.go
··· 152 152 name: "repo with namespace", 153 153 repository: "org/myapp", 154 154 tag: "v1.0.0", 155 - want: "org-myapp_v1.0.0", 155 + want: "org~myapp_v1.0.0", 156 156 }, 157 157 { 158 158 name: "tag with underscore", ··· 164 164 name: "deep namespace", 165 165 repository: "a/b/c/myapp", 166 166 tag: "prod", 167 - want: "a-b-c-myapp_prod", 167 + want: "a~b~c~myapp_prod", 168 168 }, 169 169 } 170 170 171 171 for _, tt := range tests { 172 172 t.Run(tt.name, func(t *testing.T) { 173 - got := repositoryTagToRKey(tt.repository, tt.tag) 173 + got := RepositoryTagToRKey(tt.repository, tt.tag) 174 174 if got != tt.want { 175 - t.Errorf("repositoryTagToRKey() = %v, want %v", got, tt.want) 175 + t.Errorf("RepositoryTagToRKey() = %v, want %v", got, tt.want) 176 176 } 177 177 }) 178 178 } ··· 194 194 }, 195 195 { 196 196 name: "namespaced repo", 197 - rkey: "org-myapp_v1.0.0", 197 + rkey: "org~myapp_v1.0.0", 198 198 wantRepository: "org/myapp", 199 199 wantTag: "v1.0.0", 200 200 }, ··· 206 206 }, 207 207 { 208 208 name: "deep namespace", 209 - rkey: "a-b-c-myapp_prod", 209 + rkey: "a~b~c~myapp_prod", 210 210 wantRepository: "a/b/c/myapp", 211 211 wantTag: "prod", 212 212 }, ··· 247 247 248 248 for _, tt := range tests { 249 249 t.Run(tt.repository+":"+tt.tag, func(t *testing.T) { 250 - rkey := repositoryTagToRKey(tt.repository, tt.tag) 250 + rkey := RepositoryTagToRKey(tt.repository, tt.tag) 251 251 gotRepo, gotTag := RKeyToRepositoryTag(rkey) 252 252 253 253 if gotRepo != tt.repository {
+3 -3
pkg/atproto/tag_store.go
··· 27 27 // Get retrieves the descriptor for a tag 28 28 func (s *TagStore) Get(ctx context.Context, tag string) (distribution.Descriptor, error) { 29 29 // Build record key 30 - rkey := repositoryTagToRKey(s.repository, tag) 30 + rkey := RepositoryTagToRKey(s.repository, tag) 31 31 32 32 // Fetch tag record from ATProto 33 33 record, err := s.client.GetRecord(ctx, TagCollection, rkey) ··· 65 65 tagRecord := NewTagRecord(s.client.DID(), s.repository, tag, desc.Digest.String()) 66 66 67 67 // Store in ATProto 68 - rkey := repositoryTagToRKey(s.repository, tag) 68 + rkey := RepositoryTagToRKey(s.repository, tag) 69 69 _, err := s.client.PutRecord(ctx, TagCollection, rkey, tagRecord) 70 70 if err != nil { 71 71 return fmt.Errorf("failed to store tag in ATProto: %w", err) ··· 76 76 77 77 // Untag removes a tag 78 78 func (s *TagStore) Untag(ctx context.Context, tag string) error { 79 - rkey := repositoryTagToRKey(s.repository, tag) 79 + rkey := RepositoryTagToRKey(s.repository, tag) 80 80 return s.client.DeleteRecord(ctx, TagCollection, rkey) 81 81 } 82 82
+3 -3
pkg/atproto/tag_store_test.go
··· 67 67 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 68 68 // Verify query parameters 69 69 query := r.URL.Query() 70 - rkey := repositoryTagToRKey("myapp", tt.tag) 70 + rkey := RepositoryTagToRKey("myapp", tt.tag) 71 71 if query.Get("rkey") != rkey { 72 72 t.Errorf("rkey = %v, want %v", query.Get("rkey"), rkey) 73 73 } ··· 240 240 json.NewDecoder(r.Body).Decode(&body) 241 241 242 242 // Verify rkey 243 - expectedRKey := repositoryTagToRKey("myapp", tt.tag) 243 + expectedRKey := RepositoryTagToRKey("myapp", tt.tag) 244 244 if body["rkey"] != expectedRKey { 245 245 t.Errorf("rkey = %v, want %v", body["rkey"], expectedRKey) 246 246 } ··· 340 340 var body map[string]interface{} 341 341 json.NewDecoder(r.Body).Decode(&body) 342 342 343 - expectedRKey := repositoryTagToRKey("myapp", tt.tag) 343 + expectedRKey := RepositoryTagToRKey("myapp", tt.tag) 344 344 if body["rkey"] != expectedRKey { 345 345 t.Errorf("rkey = %v, want %v", body["rkey"], expectedRKey) 346 346 }
+1 -1
pkg/auth/oauth/client.go
··· 133 133 // OCI artifact manifests (for cosign signatures, SBOMs, attestations) 134 134 "blob:application/vnd.cncf.oras.artifact.manifest.v1+json", 135 135 } 136 - 136 + 137 137 scopes = append(scopes, fmt.Sprintf("rpc:com.atproto.repo.getRecord?aud=%s", "*")) 138 138 139 139 // Add repo scopes