A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
atcr.io
docker
container
atproto
go
1package jetstream
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "testing"
8 "time"
9
10 "atcr.io/pkg/atproto"
11 _ "github.com/mattn/go-sqlite3"
12)
13
14// setupTestDB creates an in-memory SQLite database for testing
15func setupTestDB(t *testing.T) *sql.DB {
16 database, err := sql.Open("sqlite3", ":memory:")
17 if err != nil {
18 t.Fatalf("Failed to open test database: %v", err)
19 }
20
21 // Create schema
22 schema := `
23 CREATE TABLE users (
24 did TEXT PRIMARY KEY,
25 handle TEXT NOT NULL,
26 pds_endpoint TEXT NOT NULL,
27 avatar TEXT,
28 last_seen TIMESTAMP NOT NULL
29 );
30
31 CREATE TABLE manifests (
32 id INTEGER PRIMARY KEY AUTOINCREMENT,
33 did TEXT NOT NULL,
34 repository TEXT NOT NULL,
35 digest TEXT NOT NULL,
36 hold_endpoint TEXT NOT NULL,
37 schema_version INTEGER NOT NULL,
38 media_type TEXT NOT NULL,
39 config_digest TEXT,
40 config_size INTEGER,
41 created_at TIMESTAMP NOT NULL,
42 UNIQUE(did, repository, digest)
43 );
44
45 CREATE TABLE repository_annotations (
46 did TEXT NOT NULL,
47 repository TEXT NOT NULL,
48 key TEXT NOT NULL,
49 value TEXT NOT NULL,
50 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
51 PRIMARY KEY(did, repository, key),
52 FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE
53 );
54
55 CREATE TABLE layers (
56 manifest_id INTEGER NOT NULL,
57 digest TEXT NOT NULL,
58 size INTEGER NOT NULL,
59 media_type TEXT NOT NULL,
60 layer_index INTEGER NOT NULL,
61 PRIMARY KEY(manifest_id, layer_index)
62 );
63
64 CREATE TABLE manifest_references (
65 manifest_id INTEGER NOT NULL,
66 digest TEXT NOT NULL,
67 media_type TEXT NOT NULL,
68 size INTEGER NOT NULL,
69 platform_architecture TEXT,
70 platform_os TEXT,
71 platform_variant TEXT,
72 platform_os_version TEXT,
73 reference_index INTEGER NOT NULL,
74 PRIMARY KEY(manifest_id, reference_index)
75 );
76
77 CREATE TABLE tags (
78 id INTEGER PRIMARY KEY AUTOINCREMENT,
79 did TEXT NOT NULL,
80 repository TEXT NOT NULL,
81 tag TEXT NOT NULL,
82 digest TEXT NOT NULL,
83 created_at TIMESTAMP NOT NULL,
84 UNIQUE(did, repository, tag)
85 );
86
87 CREATE TABLE stars (
88 starrer_did TEXT NOT NULL,
89 owner_did TEXT NOT NULL,
90 repository TEXT NOT NULL,
91 created_at TIMESTAMP NOT NULL,
92 PRIMARY KEY(starrer_did, owner_did, repository)
93 );
94 `
95
96 if _, err := database.Exec(schema); err != nil {
97 t.Fatalf("Failed to create schema: %v", err)
98 }
99
100 return database
101}
102
103func TestNewProcessor(t *testing.T) {
104 database := setupTestDB(t)
105 defer database.Close()
106
107 tests := []struct {
108 name string
109 useCache bool
110 }{
111 {"with cache", true},
112 {"without cache", false},
113 }
114
115 for _, tt := range tests {
116 t.Run(tt.name, func(t *testing.T) {
117 p := NewProcessor(database, tt.useCache)
118 if p == nil {
119 t.Fatal("NewProcessor returned nil")
120 }
121 if p.db != database {
122 t.Error("Processor database not set correctly")
123 }
124 if p.useCache != tt.useCache {
125 t.Errorf("useCache = %v, want %v", p.useCache, tt.useCache)
126 }
127 if tt.useCache && p.userCache == nil {
128 t.Error("Cache enabled but userCache is nil")
129 }
130 if !tt.useCache && p.userCache != nil {
131 t.Error("Cache disabled but userCache is not nil")
132 }
133 })
134 }
135}
136
137func TestProcessManifest_ImageManifest(t *testing.T) {
138 database := setupTestDB(t)
139 defer database.Close()
140
141 p := NewProcessor(database, false)
142 ctx := context.Background()
143
144 // Create test manifest record
145 manifestRecord := &atproto.ManifestRecord{
146 Repository: "test-app",
147 Digest: "sha256:abc123",
148 MediaType: "application/vnd.oci.image.manifest.v1+json",
149 SchemaVersion: 2,
150 HoldEndpoint: "did:web:hold01.atcr.io",
151 CreatedAt: time.Now(),
152 Config: &atproto.BlobReference{
153 Digest: "sha256:config123",
154 Size: 1234,
155 },
156 Layers: []atproto.BlobReference{
157 {Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
158 {Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
159 },
160 Annotations: map[string]string{
161 "org.opencontainers.image.title": "Test App",
162 "org.opencontainers.image.description": "A test application",
163 "org.opencontainers.image.source": "https://github.com/test/app",
164 "org.opencontainers.image.licenses": "MIT",
165 "io.atcr.icon": "https://example.com/icon.png",
166 },
167 }
168
169 // Marshal to bytes for ProcessManifest
170 recordBytes, err := json.Marshal(manifestRecord)
171 if err != nil {
172 t.Fatalf("Failed to marshal manifest: %v", err)
173 }
174
175 // Process manifest
176 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
177 if err != nil {
178 t.Fatalf("ProcessManifest failed: %v", err)
179 }
180 if manifestID == 0 {
181 t.Error("Expected non-zero manifest ID")
182 }
183
184 // Verify manifest was inserted
185 var count int
186 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?",
187 "did:plc:test123", "test-app", "sha256:abc123").Scan(&count)
188 if err != nil {
189 t.Fatalf("Failed to query manifests: %v", err)
190 }
191 if count != 1 {
192 t.Errorf("Expected 1 manifest, got %d", count)
193 }
194
195 // Verify annotations were stored in repository_annotations table
196 var title, source string
197 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?",
198 "did:plc:test123", "test-app", "org.opencontainers.image.title").Scan(&title)
199 if err != nil {
200 t.Fatalf("Failed to query title annotation: %v", err)
201 }
202 if title != "Test App" {
203 t.Errorf("title = %q, want %q", title, "Test App")
204 }
205
206 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?",
207 "did:plc:test123", "test-app", "org.opencontainers.image.source").Scan(&source)
208 if err != nil {
209 t.Fatalf("Failed to query source annotation: %v", err)
210 }
211 if source != "https://github.com/test/app" {
212 t.Errorf("source = %q, want %q", source, "https://github.com/test/app")
213 }
214
215 // Verify layers were inserted
216 var layerCount int
217 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount)
218 if err != nil {
219 t.Fatalf("Failed to query layers: %v", err)
220 }
221 if layerCount != 2 {
222 t.Errorf("Expected 2 layers, got %d", layerCount)
223 }
224
225 // Verify no manifest references (this is an image, not a list)
226 var refCount int
227 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount)
228 if err != nil {
229 t.Fatalf("Failed to query manifest_references: %v", err)
230 }
231 if refCount != 0 {
232 t.Errorf("Expected 0 manifest references, got %d", refCount)
233 }
234}
235
236func TestProcessManifest_ManifestList(t *testing.T) {
237 database := setupTestDB(t)
238 defer database.Close()
239
240 p := NewProcessor(database, false)
241 ctx := context.Background()
242
243 // Create test manifest list record
244 manifestRecord := &atproto.ManifestRecord{
245 Repository: "test-app",
246 Digest: "sha256:list123",
247 MediaType: "application/vnd.oci.image.index.v1+json",
248 SchemaVersion: 2,
249 HoldEndpoint: "did:web:hold01.atcr.io",
250 CreatedAt: time.Now(),
251 Manifests: []atproto.ManifestReference{
252 {
253 Digest: "sha256:amd64manifest",
254 MediaType: "application/vnd.oci.image.manifest.v1+json",
255 Size: 1000,
256 Platform: &atproto.Platform{
257 Architecture: "amd64",
258 OS: "linux",
259 },
260 },
261 {
262 Digest: "sha256:arm64manifest",
263 MediaType: "application/vnd.oci.image.manifest.v1+json",
264 Size: 1100,
265 Platform: &atproto.Platform{
266 Architecture: "arm64",
267 OS: "linux",
268 Variant: "v8",
269 },
270 },
271 },
272 }
273
274 // Marshal to bytes for ProcessManifest
275 recordBytes, err := json.Marshal(manifestRecord)
276 if err != nil {
277 t.Fatalf("Failed to marshal manifest: %v", err)
278 }
279
280 // Process manifest list
281 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
282 if err != nil {
283 t.Fatalf("ProcessManifest failed: %v", err)
284 }
285
286 // Verify manifest references were inserted
287 var refCount int
288 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount)
289 if err != nil {
290 t.Fatalf("Failed to query manifest_references: %v", err)
291 }
292 if refCount != 2 {
293 t.Errorf("Expected 2 manifest references, got %d", refCount)
294 }
295
296 // Verify platform info was stored
297 var arch, os string
298 err = database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID).Scan(&arch, &os)
299 if err != nil {
300 t.Fatalf("Failed to query platform info: %v", err)
301 }
302 if arch != "amd64" {
303 t.Errorf("platform_architecture = %q, want %q", arch, "amd64")
304 }
305 if os != "linux" {
306 t.Errorf("platform_os = %q, want %q", os, "linux")
307 }
308
309 // Verify no layers (this is a list, not an image)
310 var layerCount int
311 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount)
312 if err != nil {
313 t.Fatalf("Failed to query layers: %v", err)
314 }
315 if layerCount != 0 {
316 t.Errorf("Expected 0 layers, got %d", layerCount)
317 }
318}
319
320func TestProcessTag(t *testing.T) {
321 database := setupTestDB(t)
322 defer database.Close()
323
324 p := NewProcessor(database, false)
325 ctx := context.Background()
326
327 // Create test tag record (using ManifestDigest field for simplicity)
328 tagRecord := &atproto.TagRecord{
329 Repository: "test-app",
330 Tag: "latest",
331 ManifestDigest: "sha256:abc123",
332 UpdatedAt: time.Now(),
333 }
334
335 // Marshal to bytes for ProcessTag
336 recordBytes, err := json.Marshal(tagRecord)
337 if err != nil {
338 t.Fatalf("Failed to marshal tag: %v", err)
339 }
340
341 // Process tag
342 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes)
343 if err != nil {
344 t.Fatalf("ProcessTag failed: %v", err)
345 }
346
347 // Verify tag was inserted
348 var count int
349 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?",
350 "did:plc:test123", "test-app", "latest").Scan(&count)
351 if err != nil {
352 t.Fatalf("Failed to query tags: %v", err)
353 }
354 if count != 1 {
355 t.Errorf("Expected 1 tag, got %d", count)
356 }
357
358 // Verify digest was stored
359 var digest string
360 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?",
361 "did:plc:test123", "test-app", "latest").Scan(&digest)
362 if err != nil {
363 t.Fatalf("Failed to query tag digest: %v", err)
364 }
365 if digest != "sha256:abc123" {
366 t.Errorf("digest = %q, want %q", digest, "sha256:abc123")
367 }
368
369 // Test upserting same tag with new digest
370 tagRecord.ManifestDigest = "sha256:newdigest"
371 recordBytes, err = json.Marshal(tagRecord)
372 if err != nil {
373 t.Fatalf("Failed to marshal tag: %v", err)
374 }
375 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes)
376 if err != nil {
377 t.Fatalf("ProcessTag (upsert) failed: %v", err)
378 }
379
380 // Verify tag was updated
381 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?",
382 "did:plc:test123", "test-app", "latest").Scan(&digest)
383 if err != nil {
384 t.Fatalf("Failed to query updated tag: %v", err)
385 }
386 if digest != "sha256:newdigest" {
387 t.Errorf("digest = %q, want %q", digest, "sha256:newdigest")
388 }
389
390 // Verify still only one tag (upsert, not insert)
391 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?",
392 "did:plc:test123", "test-app", "latest").Scan(&count)
393 if err != nil {
394 t.Fatalf("Failed to query tags after upsert: %v", err)
395 }
396 if count != 1 {
397 t.Errorf("Expected 1 tag after upsert, got %d", count)
398 }
399}
400
401func TestProcessStar(t *testing.T) {
402 database := setupTestDB(t)
403 defer database.Close()
404
405 p := NewProcessor(database, false)
406 ctx := context.Background()
407
408 // Create test star record
409 starRecord := &atproto.StarRecord{
410 Subject: atproto.StarSubject{
411 DID: "did:plc:owner123",
412 Repository: "test-app",
413 },
414 CreatedAt: time.Now(),
415 }
416
417 // Marshal to bytes for ProcessStar
418 recordBytes, err := json.Marshal(starRecord)
419 if err != nil {
420 t.Fatalf("Failed to marshal star: %v", err)
421 }
422
423 // Process star
424 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes)
425 if err != nil {
426 t.Fatalf("ProcessStar failed: %v", err)
427 }
428
429 // Verify star was inserted
430 var count int
431 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
432 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count)
433 if err != nil {
434 t.Fatalf("Failed to query stars: %v", err)
435 }
436 if count != 1 {
437 t.Errorf("Expected 1 star, got %d", count)
438 }
439
440 // Test upserting same star (should be idempotent)
441 recordBytes, err = json.Marshal(starRecord)
442 if err != nil {
443 t.Fatalf("Failed to marshal star: %v", err)
444 }
445 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes)
446 if err != nil {
447 t.Fatalf("ProcessStar (upsert) failed: %v", err)
448 }
449
450 // Verify still only one star
451 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
452 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count)
453 if err != nil {
454 t.Fatalf("Failed to query stars after upsert: %v", err)
455 }
456 if count != 1 {
457 t.Errorf("Expected 1 star after upsert, got %d", count)
458 }
459}
460
461func TestProcessManifest_Duplicate(t *testing.T) {
462 database := setupTestDB(t)
463 defer database.Close()
464
465 p := NewProcessor(database, false)
466 ctx := context.Background()
467
468 manifestRecord := &atproto.ManifestRecord{
469 Repository: "test-app",
470 Digest: "sha256:abc123",
471 MediaType: "application/vnd.oci.image.manifest.v1+json",
472 SchemaVersion: 2,
473 HoldEndpoint: "did:web:hold01.atcr.io",
474 CreatedAt: time.Now(),
475 }
476
477 // Marshal to bytes for ProcessManifest
478 recordBytes, err := json.Marshal(manifestRecord)
479 if err != nil {
480 t.Fatalf("Failed to marshal manifest: %v", err)
481 }
482
483 // Insert first time
484 id1, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
485 if err != nil {
486 t.Fatalf("First ProcessManifest failed: %v", err)
487 }
488
489 // Insert duplicate
490 id2, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
491 if err != nil {
492 t.Fatalf("Duplicate ProcessManifest failed: %v", err)
493 }
494
495 // Should return existing ID
496 if id1 != id2 {
497 t.Errorf("Duplicate manifest got different ID: %d vs %d", id1, id2)
498 }
499
500 // Verify only one manifest exists
501 var count int
502 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?",
503 "did:plc:test123", "sha256:abc123").Scan(&count)
504 if err != nil {
505 t.Fatalf("Failed to query manifests: %v", err)
506 }
507 if count != 1 {
508 t.Errorf("Expected 1 manifest, got %d", count)
509 }
510}
511
512func TestProcessManifest_EmptyAnnotations(t *testing.T) {
513 database := setupTestDB(t)
514 defer database.Close()
515
516 p := NewProcessor(database, false)
517 ctx := context.Background()
518
519 // Manifest with nil annotations
520 manifestRecord := &atproto.ManifestRecord{
521 Repository: "test-app",
522 Digest: "sha256:abc123",
523 MediaType: "application/vnd.oci.image.manifest.v1+json",
524 SchemaVersion: 2,
525 HoldEndpoint: "did:web:hold01.atcr.io",
526 CreatedAt: time.Now(),
527 Annotations: nil,
528 }
529
530 // Marshal to bytes for ProcessManifest
531 recordBytes, err := json.Marshal(manifestRecord)
532 if err != nil {
533 t.Fatalf("Failed to marshal manifest: %v", err)
534 }
535
536 _, err = p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
537 if err != nil {
538 t.Fatalf("ProcessManifest failed: %v", err)
539 }
540
541 // Verify no annotations were stored (nil annotations should not create entries)
542 var annotationCount int
543 err = database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?",
544 "did:plc:test123", "test-app").Scan(&annotationCount)
545 if err != nil {
546 t.Fatalf("Failed to query annotations: %v", err)
547 }
548 if annotationCount != 0 {
549 t.Errorf("Expected 0 annotations for nil annotations, got %d", annotationCount)
550 }
551}