A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
1package jetstream
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "testing"
8 "time"
9
10 "atcr.io/pkg/atproto"
11 _ "github.com/mattn/go-sqlite3"
12)
13
// setupTestDB creates an in-memory SQLite database for testing and installs
// the tables used by the tests in this file (users, manifests,
// repository_annotations, layers, manifest_references, tags and stars).
//
// Any failure aborts the calling test through t.Fatalf. The caller owns the
// returned handle and is expected to close it.
func setupTestDB(t *testing.T) *sql.DB {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	database, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		t.Fatalf("Failed to open test database: %v", err)
	}

	// Each new connection to ":memory:" is a separate, empty database. Pin the
	// pool to a single connection so every statement sees the schema created
	// below, even if database/sql would otherwise open a second connection.
	database.SetMaxOpenConns(1)

	// Create schema
	schema := `
	CREATE TABLE users (
		did TEXT PRIMARY KEY,
		handle TEXT NOT NULL,
		pds_endpoint TEXT NOT NULL,
		avatar TEXT,
		last_seen TIMESTAMP NOT NULL
	);

	CREATE TABLE manifests (
		id INTEGER PRIMARY KEY AUTOINCREMENT,
		did TEXT NOT NULL,
		repository TEXT NOT NULL,
		digest TEXT NOT NULL,
		hold_endpoint TEXT NOT NULL,
		schema_version INTEGER NOT NULL,
		media_type TEXT NOT NULL,
		config_digest TEXT,
		config_size INTEGER,
		artifact_type TEXT NOT NULL DEFAULT 'container-image',
		created_at TIMESTAMP NOT NULL,
		UNIQUE(did, repository, digest)
	);

	CREATE TABLE repository_annotations (
		did TEXT NOT NULL,
		repository TEXT NOT NULL,
		key TEXT NOT NULL,
		value TEXT NOT NULL,
		updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
		PRIMARY KEY(did, repository, key),
		FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE
	);

	CREATE TABLE layers (
		manifest_id INTEGER NOT NULL,
		digest TEXT NOT NULL,
		size INTEGER NOT NULL,
		media_type TEXT NOT NULL,
		layer_index INTEGER NOT NULL,
		PRIMARY KEY(manifest_id, layer_index)
	);

	CREATE TABLE manifest_references (
		manifest_id INTEGER NOT NULL,
		digest TEXT NOT NULL,
		media_type TEXT NOT NULL,
		size INTEGER NOT NULL,
		platform_architecture TEXT,
		platform_os TEXT,
		platform_variant TEXT,
		platform_os_version TEXT,
		is_attestation BOOLEAN DEFAULT FALSE,
		reference_index INTEGER NOT NULL,
		PRIMARY KEY(manifest_id, reference_index)
	);

	CREATE TABLE tags (
		id INTEGER PRIMARY KEY AUTOINCREMENT,
		did TEXT NOT NULL,
		repository TEXT NOT NULL,
		tag TEXT NOT NULL,
		digest TEXT NOT NULL,
		created_at TIMESTAMP NOT NULL,
		UNIQUE(did, repository, tag)
	);

	CREATE TABLE stars (
		starrer_did TEXT NOT NULL,
		owner_did TEXT NOT NULL,
		repository TEXT NOT NULL,
		created_at TIMESTAMP NOT NULL,
		PRIMARY KEY(starrer_did, owner_did, repository)
	);
	`

	if _, err := database.Exec(schema); err != nil {
		// The caller never receives the handle on this path, so release it
		// here. A close error is irrelevant because the test is aborting.
		_ = database.Close()
		t.Fatalf("Failed to create schema: %v", err)
	}

	return database
}
104
105func TestNewProcessor(t *testing.T) {
106 database := setupTestDB(t)
107 defer database.Close()
108
109 tests := []struct {
110 name string
111 useCache bool
112 }{
113 {"with cache", true},
114 {"without cache", false},
115 }
116
117 for _, tt := range tests {
118 t.Run(tt.name, func(t *testing.T) {
119 p := NewProcessor(database, tt.useCache, nil)
120 if p == nil {
121 t.Fatal("NewProcessor returned nil")
122 }
123 if p.db != database {
124 t.Error("Processor database not set correctly")
125 }
126 if p.useCache != tt.useCache {
127 t.Errorf("useCache = %v, want %v", p.useCache, tt.useCache)
128 }
129 if tt.useCache && p.userCache == nil {
130 t.Error("Cache enabled but userCache is nil")
131 }
132 if !tt.useCache && p.userCache != nil {
133 t.Error("Cache disabled but userCache is not nil")
134 }
135 })
136 }
137}
138
139func TestProcessManifest_ImageManifest(t *testing.T) {
140 database := setupTestDB(t)
141 defer database.Close()
142
143 p := NewProcessor(database, false, nil)
144 ctx := context.Background()
145
146 // Create test manifest record
147 manifestRecord := &atproto.ManifestRecord{
148 Repository: "test-app",
149 Digest: "sha256:abc123",
150 MediaType: "application/vnd.oci.image.manifest.v1+json",
151 SchemaVersion: 2,
152 HoldEndpoint: "did:web:hold01.atcr.io",
153 CreatedAt: time.Now(),
154 Config: &atproto.BlobReference{
155 Digest: "sha256:config123",
156 Size: 1234,
157 },
158 Layers: []atproto.BlobReference{
159 {Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
160 {Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
161 },
162 Annotations: map[string]string{
163 "org.opencontainers.image.title": "Test App",
164 "org.opencontainers.image.description": "A test application",
165 "org.opencontainers.image.source": "https://github.com/test/app",
166 "org.opencontainers.image.licenses": "MIT",
167 "io.atcr.icon": "https://example.com/icon.png",
168 },
169 }
170
171 // Marshal to bytes for ProcessManifest
172 recordBytes, err := json.Marshal(manifestRecord)
173 if err != nil {
174 t.Fatalf("Failed to marshal manifest: %v", err)
175 }
176
177 // Process manifest
178 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
179 if err != nil {
180 t.Fatalf("ProcessManifest failed: %v", err)
181 }
182 if manifestID == 0 {
183 t.Error("Expected non-zero manifest ID")
184 }
185
186 // Verify manifest was inserted
187 var count int
188 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?",
189 "did:plc:test123", "test-app", "sha256:abc123").Scan(&count)
190 if err != nil {
191 t.Fatalf("Failed to query manifests: %v", err)
192 }
193 if count != 1 {
194 t.Errorf("Expected 1 manifest, got %d", count)
195 }
196
197 // Verify annotations were stored in repository_annotations table
198 var title, source string
199 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?",
200 "did:plc:test123", "test-app", "org.opencontainers.image.title").Scan(&title)
201 if err != nil {
202 t.Fatalf("Failed to query title annotation: %v", err)
203 }
204 if title != "Test App" {
205 t.Errorf("title = %q, want %q", title, "Test App")
206 }
207
208 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?",
209 "did:plc:test123", "test-app", "org.opencontainers.image.source").Scan(&source)
210 if err != nil {
211 t.Fatalf("Failed to query source annotation: %v", err)
212 }
213 if source != "https://github.com/test/app" {
214 t.Errorf("source = %q, want %q", source, "https://github.com/test/app")
215 }
216
217 // Verify layers were inserted
218 var layerCount int
219 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount)
220 if err != nil {
221 t.Fatalf("Failed to query layers: %v", err)
222 }
223 if layerCount != 2 {
224 t.Errorf("Expected 2 layers, got %d", layerCount)
225 }
226
227 // Verify no manifest references (this is an image, not a list)
228 var refCount int
229 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount)
230 if err != nil {
231 t.Fatalf("Failed to query manifest_references: %v", err)
232 }
233 if refCount != 0 {
234 t.Errorf("Expected 0 manifest references, got %d", refCount)
235 }
236}
237
238func TestProcessManifest_ManifestList(t *testing.T) {
239 database := setupTestDB(t)
240 defer database.Close()
241
242 p := NewProcessor(database, false, nil)
243 ctx := context.Background()
244
245 // Create test manifest list record
246 manifestRecord := &atproto.ManifestRecord{
247 Repository: "test-app",
248 Digest: "sha256:list123",
249 MediaType: "application/vnd.oci.image.index.v1+json",
250 SchemaVersion: 2,
251 HoldEndpoint: "did:web:hold01.atcr.io",
252 CreatedAt: time.Now(),
253 Manifests: []atproto.ManifestReference{
254 {
255 Digest: "sha256:amd64manifest",
256 MediaType: "application/vnd.oci.image.manifest.v1+json",
257 Size: 1000,
258 Platform: &atproto.Platform{
259 Architecture: "amd64",
260 OS: "linux",
261 },
262 },
263 {
264 Digest: "sha256:arm64manifest",
265 MediaType: "application/vnd.oci.image.manifest.v1+json",
266 Size: 1100,
267 Platform: &atproto.Platform{
268 Architecture: "arm64",
269 OS: "linux",
270 Variant: "v8",
271 },
272 },
273 },
274 }
275
276 // Marshal to bytes for ProcessManifest
277 recordBytes, err := json.Marshal(manifestRecord)
278 if err != nil {
279 t.Fatalf("Failed to marshal manifest: %v", err)
280 }
281
282 // Process manifest list
283 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
284 if err != nil {
285 t.Fatalf("ProcessManifest failed: %v", err)
286 }
287
288 // Verify manifest references were inserted
289 var refCount int
290 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount)
291 if err != nil {
292 t.Fatalf("Failed to query manifest_references: %v", err)
293 }
294 if refCount != 2 {
295 t.Errorf("Expected 2 manifest references, got %d", refCount)
296 }
297
298 // Verify platform info was stored
299 var arch, os string
300 err = database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID).Scan(&arch, &os)
301 if err != nil {
302 t.Fatalf("Failed to query platform info: %v", err)
303 }
304 if arch != "amd64" {
305 t.Errorf("platform_architecture = %q, want %q", arch, "amd64")
306 }
307 if os != "linux" {
308 t.Errorf("platform_os = %q, want %q", os, "linux")
309 }
310
311 // Verify no layers (this is a list, not an image)
312 var layerCount int
313 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount)
314 if err != nil {
315 t.Fatalf("Failed to query layers: %v", err)
316 }
317 if layerCount != 0 {
318 t.Errorf("Expected 0 layers, got %d", layerCount)
319 }
320}
321
322func TestProcessTag(t *testing.T) {
323 database := setupTestDB(t)
324 defer database.Close()
325
326 p := NewProcessor(database, false, nil)
327 ctx := context.Background()
328
329 // Create test tag record (using ManifestDigest field for simplicity)
330 tagRecord := &atproto.TagRecord{
331 Repository: "test-app",
332 Tag: "latest",
333 ManifestDigest: "sha256:abc123",
334 UpdatedAt: time.Now(),
335 }
336
337 // Marshal to bytes for ProcessTag
338 recordBytes, err := json.Marshal(tagRecord)
339 if err != nil {
340 t.Fatalf("Failed to marshal tag: %v", err)
341 }
342
343 // Process tag
344 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes)
345 if err != nil {
346 t.Fatalf("ProcessTag failed: %v", err)
347 }
348
349 // Verify tag was inserted
350 var count int
351 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?",
352 "did:plc:test123", "test-app", "latest").Scan(&count)
353 if err != nil {
354 t.Fatalf("Failed to query tags: %v", err)
355 }
356 if count != 1 {
357 t.Errorf("Expected 1 tag, got %d", count)
358 }
359
360 // Verify digest was stored
361 var digest string
362 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?",
363 "did:plc:test123", "test-app", "latest").Scan(&digest)
364 if err != nil {
365 t.Fatalf("Failed to query tag digest: %v", err)
366 }
367 if digest != "sha256:abc123" {
368 t.Errorf("digest = %q, want %q", digest, "sha256:abc123")
369 }
370
371 // Test upserting same tag with new digest
372 tagRecord.ManifestDigest = "sha256:newdigest"
373 recordBytes, err = json.Marshal(tagRecord)
374 if err != nil {
375 t.Fatalf("Failed to marshal tag: %v", err)
376 }
377 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes)
378 if err != nil {
379 t.Fatalf("ProcessTag (upsert) failed: %v", err)
380 }
381
382 // Verify tag was updated
383 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?",
384 "did:plc:test123", "test-app", "latest").Scan(&digest)
385 if err != nil {
386 t.Fatalf("Failed to query updated tag: %v", err)
387 }
388 if digest != "sha256:newdigest" {
389 t.Errorf("digest = %q, want %q", digest, "sha256:newdigest")
390 }
391
392 // Verify still only one tag (upsert, not insert)
393 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?",
394 "did:plc:test123", "test-app", "latest").Scan(&count)
395 if err != nil {
396 t.Fatalf("Failed to query tags after upsert: %v", err)
397 }
398 if count != 1 {
399 t.Errorf("Expected 1 tag after upsert, got %d", count)
400 }
401}
402
403func TestProcessStar(t *testing.T) {
404 database := setupTestDB(t)
405 defer database.Close()
406
407 p := NewProcessor(database, false, nil)
408 ctx := context.Background()
409
410 // Create test star record
411 starRecord := &atproto.StarRecord{
412 Subject: atproto.StarSubject{
413 DID: "did:plc:owner123",
414 Repository: "test-app",
415 },
416 CreatedAt: time.Now(),
417 }
418
419 // Marshal to bytes for ProcessStar
420 recordBytes, err := json.Marshal(starRecord)
421 if err != nil {
422 t.Fatalf("Failed to marshal star: %v", err)
423 }
424
425 // Process star
426 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes)
427 if err != nil {
428 t.Fatalf("ProcessStar failed: %v", err)
429 }
430
431 // Verify star was inserted
432 var count int
433 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
434 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count)
435 if err != nil {
436 t.Fatalf("Failed to query stars: %v", err)
437 }
438 if count != 1 {
439 t.Errorf("Expected 1 star, got %d", count)
440 }
441
442 // Test upserting same star (should be idempotent)
443 recordBytes, err = json.Marshal(starRecord)
444 if err != nil {
445 t.Fatalf("Failed to marshal star: %v", err)
446 }
447 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes)
448 if err != nil {
449 t.Fatalf("ProcessStar (upsert) failed: %v", err)
450 }
451
452 // Verify still only one star
453 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
454 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count)
455 if err != nil {
456 t.Fatalf("Failed to query stars after upsert: %v", err)
457 }
458 if count != 1 {
459 t.Errorf("Expected 1 star after upsert, got %d", count)
460 }
461}
462
463func TestProcessManifest_Duplicate(t *testing.T) {
464 database := setupTestDB(t)
465 defer database.Close()
466
467 p := NewProcessor(database, false, nil)
468 ctx := context.Background()
469
470 manifestRecord := &atproto.ManifestRecord{
471 Repository: "test-app",
472 Digest: "sha256:abc123",
473 MediaType: "application/vnd.oci.image.manifest.v1+json",
474 SchemaVersion: 2,
475 HoldEndpoint: "did:web:hold01.atcr.io",
476 CreatedAt: time.Now(),
477 }
478
479 // Marshal to bytes for ProcessManifest
480 recordBytes, err := json.Marshal(manifestRecord)
481 if err != nil {
482 t.Fatalf("Failed to marshal manifest: %v", err)
483 }
484
485 // Insert first time
486 id1, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
487 if err != nil {
488 t.Fatalf("First ProcessManifest failed: %v", err)
489 }
490
491 // Insert duplicate
492 id2, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
493 if err != nil {
494 t.Fatalf("Duplicate ProcessManifest failed: %v", err)
495 }
496
497 // Should return existing ID
498 if id1 != id2 {
499 t.Errorf("Duplicate manifest got different ID: %d vs %d", id1, id2)
500 }
501
502 // Verify only one manifest exists
503 var count int
504 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?",
505 "did:plc:test123", "sha256:abc123").Scan(&count)
506 if err != nil {
507 t.Fatalf("Failed to query manifests: %v", err)
508 }
509 if count != 1 {
510 t.Errorf("Expected 1 manifest, got %d", count)
511 }
512}
513
514func TestProcessManifest_EmptyAnnotations(t *testing.T) {
515 database := setupTestDB(t)
516 defer database.Close()
517
518 p := NewProcessor(database, false, nil)
519 ctx := context.Background()
520
521 // Manifest with nil annotations
522 manifestRecord := &atproto.ManifestRecord{
523 Repository: "test-app",
524 Digest: "sha256:abc123",
525 MediaType: "application/vnd.oci.image.manifest.v1+json",
526 SchemaVersion: 2,
527 HoldEndpoint: "did:web:hold01.atcr.io",
528 CreatedAt: time.Now(),
529 Annotations: nil,
530 }
531
532 // Marshal to bytes for ProcessManifest
533 recordBytes, err := json.Marshal(manifestRecord)
534 if err != nil {
535 t.Fatalf("Failed to marshal manifest: %v", err)
536 }
537
538 _, err = p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
539 if err != nil {
540 t.Fatalf("ProcessManifest failed: %v", err)
541 }
542
543 // Verify no annotations were stored (nil annotations should not create entries)
544 var annotationCount int
545 err = database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?",
546 "did:plc:test123", "test-app").Scan(&annotationCount)
547 if err != nil {
548 t.Fatalf("Failed to query annotations: %v", err)
549 }
550 if annotationCount != 0 {
551 t.Errorf("Expected 0 annotations for nil annotations, got %d", annotationCount)
552 }
553}
554
555func TestProcessIdentity(t *testing.T) {
556 db := setupTestDB(t)
557 defer db.Close()
558
559 processor := NewProcessor(db, false, nil)
560
561 // Setup: Create test user
562 testDID := "did:plc:alice123"
563 testHandle := "alice.bsky.social"
564 testPDS := "https://bsky.social"
565 _, err := db.Exec(`
566 INSERT INTO users (did, handle, pds_endpoint, last_seen)
567 VALUES (?, ?, ?, ?)
568 `, testDID, testHandle, testPDS, time.Now())
569 if err != nil {
570 t.Fatalf("Failed to insert test user: %v", err)
571 }
572
573 // Test 1: Process identity change event
574 newHandle := "alice-new.bsky.social"
575 err = processor.ProcessIdentity(context.Background(), testDID, newHandle)
576 // Note: This will fail to invalidate cache since we don't have a real identity directory,
577 // but we can still verify the database update happened
578 if err != nil {
579 t.Logf("Expected cache invalidation error (no real directory): %v", err)
580 }
581
582 // Verify handle was updated in database
583 var retrievedHandle string
584 err = db.QueryRow(`
585 SELECT handle FROM users WHERE did = ?
586 `, testDID).Scan(&retrievedHandle)
587 if err != nil {
588 t.Fatalf("Failed to query updated user: %v", err)
589 }
590 if retrievedHandle != newHandle {
591 t.Errorf("Expected handle '%s', got '%s'", newHandle, retrievedHandle)
592 }
593
594 // Test 2: Process identity change for non-existent user
595 // Should not error (UPDATE just affects 0 rows)
596 err = processor.ProcessIdentity(context.Background(), "did:plc:nonexistent", "new.handle")
597 if err != nil {
598 t.Logf("Expected cache invalidation error: %v", err)
599 }
600
601 // Test 3: Process multiple identity changes
602 handles := []string{"alice1.bsky.social", "alice2.bsky.social", "alice3.bsky.social"}
603 for _, handle := range handles {
604 err = processor.ProcessIdentity(context.Background(), testDID, handle)
605 if err != nil {
606 t.Logf("Expected cache invalidation error: %v", err)
607 }
608
609 err = db.QueryRow(`
610 SELECT handle FROM users WHERE did = ?
611 `, testDID).Scan(&retrievedHandle)
612 if err != nil {
613 t.Fatalf("Failed to query user after handle update: %v", err)
614 }
615 if retrievedHandle != handle {
616 t.Errorf("Expected handle '%s', got '%s'", handle, retrievedHandle)
617 }
618 }
619}
620
621func TestProcessAccount(t *testing.T) {
622 db := setupTestDB(t)
623 defer db.Close()
624
625 processor := NewProcessor(db, false, nil)
626
627 // Setup: Create test user
628 testDID := "did:plc:bob456"
629 testHandle := "bob.bsky.social"
630 testPDS := "https://bsky.social"
631 _, err := db.Exec(`
632 INSERT INTO users (did, handle, pds_endpoint, last_seen)
633 VALUES (?, ?, ?, ?)
634 `, testDID, testHandle, testPDS, time.Now())
635 if err != nil {
636 t.Fatalf("Failed to insert test user: %v", err)
637 }
638
639 // Test 1: Process account deactivation event
640 err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated")
641 // Note: Cache invalidation will fail without real directory, but that's expected
642 if err != nil {
643 t.Logf("Expected cache invalidation error (no real directory): %v", err)
644 }
645
646 // Verify user still exists in database (we don't delete on deactivation)
647 var exists bool
648 err = db.QueryRow(`
649 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
650 `, testDID).Scan(&exists)
651 if err != nil {
652 t.Fatalf("Failed to check if user exists: %v", err)
653 }
654 if !exists {
655 t.Error("User should still exist after deactivation event (no deletion)")
656 }
657
658 // Test 2: Process account with active=true (should be ignored)
659 err = processor.ProcessAccount(context.Background(), testDID, true, "active")
660 if err != nil {
661 t.Errorf("Expected no error for active account, got: %v", err)
662 }
663
664 // Test 3: Process account with status != "deactivated" (should be ignored)
665 err = processor.ProcessAccount(context.Background(), testDID, false, "suspended")
666 if err != nil {
667 t.Errorf("Expected no error for non-deactivated status, got: %v", err)
668 }
669
670 // Test 4: Process account deactivation for non-existent user
671 err = processor.ProcessAccount(context.Background(), "did:plc:nonexistent", false, "deactivated")
672 // Cache invalidation will fail, but that's expected
673 if err != nil {
674 t.Logf("Expected cache invalidation error: %v", err)
675 }
676
677 // Test 5: Process multiple deactivation events (idempotent)
678 for i := 0; i < 3; i++ {
679 err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated")
680 if err != nil {
681 t.Logf("Expected cache invalidation error on iteration %d: %v", i, err)
682 }
683 }
684
685 // User should still exist after multiple deactivations
686 err = db.QueryRow(`
687 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
688 `, testDID).Scan(&exists)
689 if err != nil {
690 t.Fatalf("Failed to check if user exists after multiple deactivations: %v", err)
691 }
692 if !exists {
693 t.Error("User should still exist after multiple deactivation events")
694 }
695
696 // Test 6: Process account deletion - should delete user data
697 err = processor.ProcessAccount(context.Background(), testDID, false, "deleted")
698 if err != nil {
699 t.Logf("Cache invalidation error during deletion (expected): %v", err)
700 }
701
702 // User should be deleted after "deleted" status
703 err = db.QueryRow(`
704 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
705 `, testDID).Scan(&exists)
706 if err != nil {
707 t.Fatalf("Failed to check if user exists after deletion: %v", err)
708 }
709 if exists {
710 t.Error("User should NOT exist after deletion event")
711 }
712
713 // Test 7: Process deletion for already-deleted user (idempotent)
714 err = processor.ProcessAccount(context.Background(), testDID, false, "deleted")
715 if err != nil {
716 t.Errorf("Deletion of non-existent user should not error, got: %v", err)
717 }
718}