A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
73
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 97d1b3cdd50e4727e5db3c498f4e8bb73851fd39 551 lines 16 kB view raw
1package jetstream 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "testing" 8 "time" 9 10 "atcr.io/pkg/atproto" 11 _ "github.com/mattn/go-sqlite3" 12) 13 14// setupTestDB creates an in-memory SQLite database for testing 15func setupTestDB(t *testing.T) *sql.DB { 16 database, err := sql.Open("sqlite3", ":memory:") 17 if err != nil { 18 t.Fatalf("Failed to open test database: %v", err) 19 } 20 21 // Create schema 22 schema := ` 23 CREATE TABLE users ( 24 did TEXT PRIMARY KEY, 25 handle TEXT NOT NULL, 26 pds_endpoint TEXT NOT NULL, 27 avatar TEXT, 28 last_seen TIMESTAMP NOT NULL 29 ); 30 31 CREATE TABLE manifests ( 32 id INTEGER PRIMARY KEY AUTOINCREMENT, 33 did TEXT NOT NULL, 34 repository TEXT NOT NULL, 35 digest TEXT NOT NULL, 36 hold_endpoint TEXT NOT NULL, 37 schema_version INTEGER NOT NULL, 38 media_type TEXT NOT NULL, 39 config_digest TEXT, 40 config_size INTEGER, 41 created_at TIMESTAMP NOT NULL, 42 UNIQUE(did, repository, digest) 43 ); 44 45 CREATE TABLE repository_annotations ( 46 did TEXT NOT NULL, 47 repository TEXT NOT NULL, 48 key TEXT NOT NULL, 49 value TEXT NOT NULL, 50 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 51 PRIMARY KEY(did, repository, key), 52 FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE 53 ); 54 55 CREATE TABLE layers ( 56 manifest_id INTEGER NOT NULL, 57 digest TEXT NOT NULL, 58 size INTEGER NOT NULL, 59 media_type TEXT NOT NULL, 60 layer_index INTEGER NOT NULL, 61 PRIMARY KEY(manifest_id, layer_index) 62 ); 63 64 CREATE TABLE manifest_references ( 65 manifest_id INTEGER NOT NULL, 66 digest TEXT NOT NULL, 67 media_type TEXT NOT NULL, 68 size INTEGER NOT NULL, 69 platform_architecture TEXT, 70 platform_os TEXT, 71 platform_variant TEXT, 72 platform_os_version TEXT, 73 reference_index INTEGER NOT NULL, 74 PRIMARY KEY(manifest_id, reference_index) 75 ); 76 77 CREATE TABLE tags ( 78 id INTEGER PRIMARY KEY AUTOINCREMENT, 79 did TEXT NOT NULL, 80 repository TEXT NOT NULL, 81 tag TEXT NOT NULL, 82 digest TEXT NOT NULL, 83 created_at TIMESTAMP NOT NULL, 84 UNIQUE(did, repository, tag) 85 ); 86 87 CREATE TABLE stars ( 88 starrer_did TEXT NOT NULL, 89 owner_did TEXT NOT NULL, 90 repository TEXT NOT NULL, 91 created_at TIMESTAMP NOT NULL, 92 PRIMARY KEY(starrer_did, owner_did, repository) 93 ); 94 ` 95 96 if _, err := database.Exec(schema); err != nil { 97 t.Fatalf("Failed to create schema: %v", err) 98 } 99 100 return database 101} 102 103func TestNewProcessor(t *testing.T) { 104 database := setupTestDB(t) 105 defer database.Close() 106 107 tests := []struct { 108 name string 109 useCache bool 110 }{ 111 {"with cache", true}, 112 {"without cache", false}, 113 } 114 115 for _, tt := range tests { 116 t.Run(tt.name, func(t *testing.T) { 117 p := NewProcessor(database, tt.useCache) 118 if p == nil { 119 t.Fatal("NewProcessor returned nil") 120 } 121 if p.db != database { 122 t.Error("Processor database not set correctly") 123 } 124 if p.useCache != tt.useCache { 125 t.Errorf("useCache = %v, want %v", p.useCache, tt.useCache) 126 } 127 if tt.useCache && p.userCache == nil { 128 t.Error("Cache enabled but userCache is nil") 129 } 130 if !tt.useCache && p.userCache != nil { 131 t.Error("Cache disabled but userCache is not nil") 132 } 133 }) 134 } 135} 136 137func TestProcessManifest_ImageManifest(t *testing.T) { 138 database := setupTestDB(t) 139 defer database.Close() 140 141 p := NewProcessor(database, false) 142 ctx := context.Background() 143 144 // Create test manifest record 145 manifestRecord := &atproto.ManifestRecord{ 146 Repository: "test-app", 147 Digest: "sha256:abc123", 148 MediaType: "application/vnd.oci.image.manifest.v1+json", 149 SchemaVersion: 2, 150 HoldEndpoint: "did:web:hold01.atcr.io", 151 CreatedAt: time.Now(), 152 Config: &atproto.BlobReference{ 153 Digest: "sha256:config123", 154 Size: 1234, 155 }, 156 Layers: []atproto.BlobReference{ 157 {Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, 158 {Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, 159 }, 160 Annotations: map[string]string{ 161 "org.opencontainers.image.title": "Test App", 162 "org.opencontainers.image.description": "A test application", 163 "org.opencontainers.image.source": "https://github.com/test/app", 164 "org.opencontainers.image.licenses": "MIT", 165 "io.atcr.icon": "https://example.com/icon.png", 166 }, 167 } 168 169 // Marshal to bytes for ProcessManifest 170 recordBytes, err := json.Marshal(manifestRecord) 171 if err != nil { 172 t.Fatalf("Failed to marshal manifest: %v", err) 173 } 174 175 // Process manifest 176 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 177 if err != nil { 178 t.Fatalf("ProcessManifest failed: %v", err) 179 } 180 if manifestID == 0 { 181 t.Error("Expected non-zero manifest ID") 182 } 183 184 // Verify manifest was inserted 185 var count int 186 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?", 187 "did:plc:test123", "test-app", "sha256:abc123").Scan(&count) 188 if err != nil { 189 t.Fatalf("Failed to query manifests: %v", err) 190 } 191 if count != 1 { 192 t.Errorf("Expected 1 manifest, got %d", count) 193 } 194 195 // Verify annotations were stored in repository_annotations table 196 var title, source string 197 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", 198 "did:plc:test123", "test-app", "org.opencontainers.image.title").Scan(&title) 199 if err != nil { 200 t.Fatalf("Failed to query title annotation: %v", err) 201 } 202 if title != "Test App" { 203 t.Errorf("title = %q, want %q", title, "Test App") 204 } 205 206 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", 207 "did:plc:test123", "test-app", "org.opencontainers.image.source").Scan(&source) 208 if err != nil { 209 t.Fatalf("Failed to query source annotation: %v", err) 210 } 211 if source != "https://github.com/test/app" { 212 t.Errorf("source = %q, want %q", source, "https://github.com/test/app") 213 } 214 215 // Verify layers were inserted 216 var layerCount int 217 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) 218 if err != nil { 219 t.Fatalf("Failed to query layers: %v", err) 220 } 221 if layerCount != 2 { 222 t.Errorf("Expected 2 layers, got %d", layerCount) 223 } 224 225 // Verify no manifest references (this is an image, not a list) 226 var refCount int 227 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) 228 if err != nil { 229 t.Fatalf("Failed to query manifest_references: %v", err) 230 } 231 if refCount != 0 { 232 t.Errorf("Expected 0 manifest references, got %d", refCount) 233 } 234} 235 236func TestProcessManifest_ManifestList(t *testing.T) { 237 database := setupTestDB(t) 238 defer database.Close() 239 240 p := NewProcessor(database, false) 241 ctx := context.Background() 242 243 // Create test manifest list record 244 manifestRecord := &atproto.ManifestRecord{ 245 Repository: "test-app", 246 Digest: "sha256:list123", 247 MediaType: "application/vnd.oci.image.index.v1+json", 248 SchemaVersion: 2, 249 HoldEndpoint: "did:web:hold01.atcr.io", 250 CreatedAt: time.Now(), 251 Manifests: []atproto.ManifestReference{ 252 { 253 Digest: "sha256:amd64manifest", 254 MediaType: "application/vnd.oci.image.manifest.v1+json", 255 Size: 1000, 256 Platform: &atproto.Platform{ 257 Architecture: "amd64", 258 OS: "linux", 259 }, 260 }, 261 { 262 Digest: "sha256:arm64manifest", 263 MediaType: "application/vnd.oci.image.manifest.v1+json", 264 Size: 1100, 265 Platform: &atproto.Platform{ 266 Architecture: "arm64", 267 OS: "linux", 268 Variant: "v8", 269 }, 270 }, 271 }, 272 } 273 274 // Marshal to bytes for ProcessManifest 275 recordBytes, err := json.Marshal(manifestRecord) 276 if err != nil { 277 t.Fatalf("Failed to marshal manifest: %v", err) 278 } 279 280 // Process manifest list 281 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 282 if err != nil { 283 t.Fatalf("ProcessManifest failed: %v", err) 284 } 285 286 // Verify manifest references were inserted 287 var refCount int 288 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) 289 if err != nil { 290 t.Fatalf("Failed to query manifest_references: %v", err) 291 } 292 if refCount != 2 { 293 t.Errorf("Expected 2 manifest references, got %d", refCount) 294 } 295 296 // Verify platform info was stored 297 var arch, os string 298 err = database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID).Scan(&arch, &os) 299 if err != nil { 300 t.Fatalf("Failed to query platform info: %v", err) 301 } 302 if arch != "amd64" { 303 t.Errorf("platform_architecture = %q, want %q", arch, "amd64") 304 } 305 if os != "linux" { 306 t.Errorf("platform_os = %q, want %q", os, "linux") 307 } 308 309 // Verify no layers (this is a list, not an image) 310 var layerCount int 311 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) 312 if err != nil { 313 t.Fatalf("Failed to query layers: %v", err) 314 } 315 if layerCount != 0 { 316 t.Errorf("Expected 0 layers, got %d", layerCount) 317 } 318} 319 320func TestProcessTag(t *testing.T) { 321 database := setupTestDB(t) 322 defer database.Close() 323 324 p := NewProcessor(database, false) 325 ctx := context.Background() 326 327 // Create test tag record (using ManifestDigest field for simplicity) 328 tagRecord := &atproto.TagRecord{ 329 Repository: "test-app", 330 Tag: "latest", 331 ManifestDigest: "sha256:abc123", 332 UpdatedAt: time.Now(), 333 } 334 335 // Marshal to bytes for ProcessTag 336 recordBytes, err := json.Marshal(tagRecord) 337 if err != nil { 338 t.Fatalf("Failed to marshal tag: %v", err) 339 } 340 341 // Process tag 342 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) 343 if err != nil { 344 t.Fatalf("ProcessTag failed: %v", err) 345 } 346 347 // Verify tag was inserted 348 var count int 349 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", 350 "did:plc:test123", "test-app", "latest").Scan(&count) 351 if err != nil { 352 t.Fatalf("Failed to query tags: %v", err) 353 } 354 if count != 1 { 355 t.Errorf("Expected 1 tag, got %d", count) 356 } 357 358 // Verify digest was stored 359 var digest string 360 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", 361 "did:plc:test123", "test-app", "latest").Scan(&digest) 362 if err != nil { 363 t.Fatalf("Failed to query tag digest: %v", err) 364 } 365 if digest != "sha256:abc123" { 366 t.Errorf("digest = %q, want %q", digest, "sha256:abc123") 367 } 368 369 // Test upserting same tag with new digest 370 tagRecord.ManifestDigest = "sha256:newdigest" 371 recordBytes, err = json.Marshal(tagRecord) 372 if err != nil { 373 t.Fatalf("Failed to marshal tag: %v", err) 374 } 375 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) 376 if err != nil { 377 t.Fatalf("ProcessTag (upsert) failed: %v", err) 378 } 379 380 // Verify tag was updated 381 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", 382 "did:plc:test123", "test-app", "latest").Scan(&digest) 383 if err != nil { 384 t.Fatalf("Failed to query updated tag: %v", err) 385 } 386 if digest != "sha256:newdigest" { 387 t.Errorf("digest = %q, want %q", digest, "sha256:newdigest") 388 } 389 390 // Verify still only one tag (upsert, not insert) 391 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", 392 "did:plc:test123", "test-app", "latest").Scan(&count) 393 if err != nil { 394 t.Fatalf("Failed to query tags after upsert: %v", err) 395 } 396 if count != 1 { 397 t.Errorf("Expected 1 tag after upsert, got %d", count) 398 } 399} 400 401func TestProcessStar(t *testing.T) { 402 database := setupTestDB(t) 403 defer database.Close() 404 405 p := NewProcessor(database, false) 406 ctx := context.Background() 407 408 // Create test star record 409 starRecord := &atproto.StarRecord{ 410 Subject: atproto.StarSubject{ 411 DID: "did:plc:owner123", 412 Repository: "test-app", 413 }, 414 CreatedAt: time.Now(), 415 } 416 417 // Marshal to bytes for ProcessStar 418 recordBytes, err := json.Marshal(starRecord) 419 if err != nil { 420 t.Fatalf("Failed to marshal star: %v", err) 421 } 422 423 // Process star 424 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) 425 if err != nil { 426 t.Fatalf("ProcessStar failed: %v", err) 427 } 428 429 // Verify star was inserted 430 var count int 431 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", 432 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) 433 if err != nil { 434 t.Fatalf("Failed to query stars: %v", err) 435 } 436 if count != 1 { 437 t.Errorf("Expected 1 star, got %d", count) 438 } 439 440 // Test upserting same star (should be idempotent) 441 recordBytes, err = json.Marshal(starRecord) 442 if err != nil { 443 t.Fatalf("Failed to marshal star: %v", err) 444 } 445 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) 446 if err != nil { 447 t.Fatalf("ProcessStar (upsert) failed: %v", err) 448 } 449 450 // Verify still only one star 451 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", 452 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) 453 if err != nil { 454 t.Fatalf("Failed to query stars after upsert: %v", err) 455 } 456 if count != 1 { 457 t.Errorf("Expected 1 star after upsert, got %d", count) 458 } 459} 460 461func TestProcessManifest_Duplicate(t *testing.T) { 462 database := setupTestDB(t) 463 defer database.Close() 464 465 p := NewProcessor(database, false) 466 ctx := context.Background() 467 468 manifestRecord := &atproto.ManifestRecord{ 469 Repository: "test-app", 470 Digest: "sha256:abc123", 471 MediaType: "application/vnd.oci.image.manifest.v1+json", 472 SchemaVersion: 2, 473 HoldEndpoint: "did:web:hold01.atcr.io", 474 CreatedAt: time.Now(), 475 } 476 477 // Marshal to bytes for ProcessManifest 478 recordBytes, err := json.Marshal(manifestRecord) 479 if err != nil { 480 t.Fatalf("Failed to marshal manifest: %v", err) 481 } 482 483 // Insert first time 484 id1, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 485 if err != nil { 486 t.Fatalf("First ProcessManifest failed: %v", err) 487 } 488 489 // Insert duplicate 490 id2, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 491 if err != nil { 492 t.Fatalf("Duplicate ProcessManifest failed: %v", err) 493 } 494 495 // Should return existing ID 496 if id1 != id2 { 497 t.Errorf("Duplicate manifest got different ID: %d vs %d", id1, id2) 498 } 499 500 // Verify only one manifest exists 501 var count int 502 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?", 503 "did:plc:test123", "sha256:abc123").Scan(&count) 504 if err != nil { 505 t.Fatalf("Failed to query manifests: %v", err) 506 } 507 if count != 1 { 508 t.Errorf("Expected 1 manifest, got %d", count) 509 } 510} 511 512func TestProcessManifest_EmptyAnnotations(t *testing.T) { 513 database := setupTestDB(t) 514 defer database.Close() 515 516 p := NewProcessor(database, false) 517 ctx := context.Background() 518 519 // Manifest with nil annotations 520 manifestRecord := &atproto.ManifestRecord{ 521 Repository: "test-app", 522 Digest: "sha256:abc123", 523 MediaType: "application/vnd.oci.image.manifest.v1+json", 524 SchemaVersion: 2, 525 HoldEndpoint: "did:web:hold01.atcr.io", 526 CreatedAt: time.Now(), 527 Annotations: nil, 528 } 529 530 // Marshal to bytes for ProcessManifest 531 recordBytes, err := json.Marshal(manifestRecord) 532 if err != nil { 533 t.Fatalf("Failed to marshal manifest: %v", err) 534 } 535 536 _, err = p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 537 if err != nil { 538 t.Fatalf("ProcessManifest failed: %v", err) 539 } 540 541 // Verify no annotations were stored (nil annotations should not create entries) 542 var annotationCount int 543 err = database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?", 544 "did:plc:test123", "test-app").Scan(&annotationCount) 545 if err != nil { 546 t.Fatalf("Failed to query annotations: %v", err) 547 } 548 if annotationCount != 0 { 549 t.Errorf("Expected 0 annotations for nil annotations, got %d", annotationCount) 550 } 551}