A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at codeberg-source 718 lines 21 kB view raw
1package jetstream 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "testing" 8 "time" 9 10 "atcr.io/pkg/atproto" 11 _ "github.com/mattn/go-sqlite3" 12) 13 14// setupTestDB creates an in-memory SQLite database for testing 15func setupTestDB(t *testing.T) *sql.DB { 16 database, err := sql.Open("sqlite3", ":memory:") 17 if err != nil { 18 t.Fatalf("Failed to open test database: %v", err) 19 } 20 21 // Create schema 22 schema := ` 23 CREATE TABLE users ( 24 did TEXT PRIMARY KEY, 25 handle TEXT NOT NULL, 26 pds_endpoint TEXT NOT NULL, 27 avatar TEXT, 28 last_seen TIMESTAMP NOT NULL 29 ); 30 31 CREATE TABLE manifests ( 32 id INTEGER PRIMARY KEY AUTOINCREMENT, 33 did TEXT NOT NULL, 34 repository TEXT NOT NULL, 35 digest TEXT NOT NULL, 36 hold_endpoint TEXT NOT NULL, 37 schema_version INTEGER NOT NULL, 38 media_type TEXT NOT NULL, 39 config_digest TEXT, 40 config_size INTEGER, 41 artifact_type TEXT NOT NULL DEFAULT 'container-image', 42 created_at TIMESTAMP NOT NULL, 43 UNIQUE(did, repository, digest) 44 ); 45 46 CREATE TABLE repository_annotations ( 47 did TEXT NOT NULL, 48 repository TEXT NOT NULL, 49 key TEXT NOT NULL, 50 value TEXT NOT NULL, 51 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 52 PRIMARY KEY(did, repository, key), 53 FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE 54 ); 55 56 CREATE TABLE layers ( 57 manifest_id INTEGER NOT NULL, 58 digest TEXT NOT NULL, 59 size INTEGER NOT NULL, 60 media_type TEXT NOT NULL, 61 layer_index INTEGER NOT NULL, 62 PRIMARY KEY(manifest_id, layer_index) 63 ); 64 65 CREATE TABLE manifest_references ( 66 manifest_id INTEGER NOT NULL, 67 digest TEXT NOT NULL, 68 media_type TEXT NOT NULL, 69 size INTEGER NOT NULL, 70 platform_architecture TEXT, 71 platform_os TEXT, 72 platform_variant TEXT, 73 platform_os_version TEXT, 74 is_attestation BOOLEAN DEFAULT FALSE, 75 reference_index INTEGER NOT NULL, 76 PRIMARY KEY(manifest_id, reference_index) 77 ); 78 79 CREATE TABLE tags ( 80 id INTEGER PRIMARY KEY AUTOINCREMENT, 81 did TEXT NOT NULL, 82 repository TEXT NOT NULL, 83 tag TEXT NOT NULL, 84 digest TEXT NOT NULL, 85 created_at TIMESTAMP NOT NULL, 86 UNIQUE(did, repository, tag) 87 ); 88 89 CREATE TABLE stars ( 90 starrer_did TEXT NOT NULL, 91 owner_did TEXT NOT NULL, 92 repository TEXT NOT NULL, 93 created_at TIMESTAMP NOT NULL, 94 PRIMARY KEY(starrer_did, owner_did, repository) 95 ); 96 ` 97 98 if _, err := database.Exec(schema); err != nil { 99 t.Fatalf("Failed to create schema: %v", err) 100 } 101 102 return database 103} 104 105func TestNewProcessor(t *testing.T) { 106 database := setupTestDB(t) 107 defer database.Close() 108 109 tests := []struct { 110 name string 111 useCache bool 112 }{ 113 {"with cache", true}, 114 {"without cache", false}, 115 } 116 117 for _, tt := range tests { 118 t.Run(tt.name, func(t *testing.T) { 119 p := NewProcessor(database, tt.useCache, nil) 120 if p == nil { 121 t.Fatal("NewProcessor returned nil") 122 } 123 if p.db != database { 124 t.Error("Processor database not set correctly") 125 } 126 if p.useCache != tt.useCache { 127 t.Errorf("useCache = %v, want %v", p.useCache, tt.useCache) 128 } 129 if tt.useCache && p.userCache == nil { 130 t.Error("Cache enabled but userCache is nil") 131 } 132 if !tt.useCache && p.userCache != nil { 133 t.Error("Cache disabled but userCache is not nil") 134 } 135 }) 136 } 137} 138 139func TestProcessManifest_ImageManifest(t *testing.T) { 140 database := setupTestDB(t) 141 defer database.Close() 142 143 p := NewProcessor(database, false, nil) 144 ctx := context.Background() 145 146 // Create test manifest record 147 manifestRecord := &atproto.ManifestRecord{ 148 Repository: "test-app", 149 Digest: "sha256:abc123", 150 MediaType: "application/vnd.oci.image.manifest.v1+json", 151 SchemaVersion: 2, 152 HoldEndpoint: "did:web:hold01.atcr.io", 153 CreatedAt: time.Now(), 154 Config: &atproto.BlobReference{ 155 Digest: "sha256:config123", 156 Size: 1234, 157 }, 158 Layers: []atproto.BlobReference{ 159 {Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, 160 {Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, 161 }, 162 Annotations: map[string]string{ 163 "org.opencontainers.image.title": "Test App", 164 "org.opencontainers.image.description": "A test application", 165 "org.opencontainers.image.source": "https://github.com/test/app", 166 "org.opencontainers.image.licenses": "MIT", 167 "io.atcr.icon": "https://example.com/icon.png", 168 }, 169 } 170 171 // Marshal to bytes for ProcessManifest 172 recordBytes, err := json.Marshal(manifestRecord) 173 if err != nil { 174 t.Fatalf("Failed to marshal manifest: %v", err) 175 } 176 177 // Process manifest 178 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 179 if err != nil { 180 t.Fatalf("ProcessManifest failed: %v", err) 181 } 182 if manifestID == 0 { 183 t.Error("Expected non-zero manifest ID") 184 } 185 186 // Verify manifest was inserted 187 var count int 188 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?", 189 "did:plc:test123", "test-app", "sha256:abc123").Scan(&count) 190 if err != nil { 191 t.Fatalf("Failed to query manifests: %v", err) 192 } 193 if count != 1 { 194 t.Errorf("Expected 1 manifest, got %d", count) 195 } 196 197 // Verify annotations were stored in repository_annotations table 198 var title, source string 199 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", 200 "did:plc:test123", "test-app", "org.opencontainers.image.title").Scan(&title) 201 if err != nil { 202 t.Fatalf("Failed to query title annotation: %v", err) 203 } 204 if title != "Test App" { 205 t.Errorf("title = %q, want %q", title, "Test App") 206 } 207 208 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", 209 "did:plc:test123", "test-app", "org.opencontainers.image.source").Scan(&source) 210 if err != nil { 211 t.Fatalf("Failed to query source annotation: %v", err) 212 } 213 if source != "https://github.com/test/app" { 214 t.Errorf("source = %q, want %q", source, "https://github.com/test/app") 215 } 216 217 // Verify layers were inserted 218 var layerCount int 219 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) 220 if err != nil { 221 t.Fatalf("Failed to query layers: %v", err) 222 } 223 if layerCount != 2 { 224 t.Errorf("Expected 2 layers, got %d", layerCount) 225 } 226 227 // Verify no manifest references (this is an image, not a list) 228 var refCount int 229 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) 230 if err != nil { 231 t.Fatalf("Failed to query manifest_references: %v", err) 232 } 233 if refCount != 0 { 234 t.Errorf("Expected 0 manifest references, got %d", refCount) 235 } 236} 237 238func TestProcessManifest_ManifestList(t *testing.T) { 239 database := setupTestDB(t) 240 defer database.Close() 241 242 p := NewProcessor(database, false, nil) 243 ctx := context.Background() 244 245 // Create test manifest list record 246 manifestRecord := &atproto.ManifestRecord{ 247 Repository: "test-app", 248 Digest: "sha256:list123", 249 MediaType: "application/vnd.oci.image.index.v1+json", 250 SchemaVersion: 2, 251 HoldEndpoint: "did:web:hold01.atcr.io", 252 CreatedAt: time.Now(), 253 Manifests: []atproto.ManifestReference{ 254 { 255 Digest: "sha256:amd64manifest", 256 MediaType: "application/vnd.oci.image.manifest.v1+json", 257 Size: 1000, 258 Platform: &atproto.Platform{ 259 Architecture: "amd64", 260 OS: "linux", 261 }, 262 }, 263 { 264 Digest: "sha256:arm64manifest", 265 MediaType: "application/vnd.oci.image.manifest.v1+json", 266 Size: 1100, 267 Platform: &atproto.Platform{ 268 Architecture: "arm64", 269 OS: "linux", 270 Variant: "v8", 271 }, 272 }, 273 }, 274 } 275 276 // Marshal to bytes for ProcessManifest 277 recordBytes, err := json.Marshal(manifestRecord) 278 if err != nil { 279 t.Fatalf("Failed to marshal manifest: %v", err) 280 } 281 282 // Process manifest list 283 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 284 if err != nil { 285 t.Fatalf("ProcessManifest failed: %v", err) 286 } 287 288 // Verify manifest references were inserted 289 var refCount int 290 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) 291 if err != nil { 292 t.Fatalf("Failed to query manifest_references: %v", err) 293 } 294 if refCount != 2 { 295 t.Errorf("Expected 2 manifest references, got %d", refCount) 296 } 297 298 // Verify platform info was stored 299 var arch, os string 300 err = database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID).Scan(&arch, &os) 301 if err != nil { 302 t.Fatalf("Failed to query platform info: %v", err) 303 } 304 if arch != "amd64" { 305 t.Errorf("platform_architecture = %q, want %q", arch, "amd64") 306 } 307 if os != "linux" { 308 t.Errorf("platform_os = %q, want %q", os, "linux") 309 } 310 311 // Verify no layers (this is a list, not an image) 312 var layerCount int 313 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) 314 if err != nil { 315 t.Fatalf("Failed to query layers: %v", err) 316 } 317 if layerCount != 0 { 318 t.Errorf("Expected 0 layers, got %d", layerCount) 319 } 320} 321 322func TestProcessTag(t *testing.T) { 323 database := setupTestDB(t) 324 defer database.Close() 325 326 p := NewProcessor(database, false, nil) 327 ctx := context.Background() 328 329 // Create test tag record (using ManifestDigest field for simplicity) 330 tagRecord := &atproto.TagRecord{ 331 Repository: "test-app", 332 Tag: "latest", 333 ManifestDigest: "sha256:abc123", 334 UpdatedAt: time.Now(), 335 } 336 337 // Marshal to bytes for ProcessTag 338 recordBytes, err := json.Marshal(tagRecord) 339 if err != nil { 340 t.Fatalf("Failed to marshal tag: %v", err) 341 } 342 343 // Process tag 344 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) 345 if err != nil { 346 t.Fatalf("ProcessTag failed: %v", err) 347 } 348 349 // Verify tag was inserted 350 var count int 351 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", 352 "did:plc:test123", "test-app", "latest").Scan(&count) 353 if err != nil { 354 t.Fatalf("Failed to query tags: %v", err) 355 } 356 if count != 1 { 357 t.Errorf("Expected 1 tag, got %d", count) 358 } 359 360 // Verify digest was stored 361 var digest string 362 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", 363 "did:plc:test123", "test-app", "latest").Scan(&digest) 364 if err != nil { 365 t.Fatalf("Failed to query tag digest: %v", err) 366 } 367 if digest != "sha256:abc123" { 368 t.Errorf("digest = %q, want %q", digest, "sha256:abc123") 369 } 370 371 // Test upserting same tag with new digest 372 tagRecord.ManifestDigest = "sha256:newdigest" 373 recordBytes, err = json.Marshal(tagRecord) 374 if err != nil { 375 t.Fatalf("Failed to marshal tag: %v", err) 376 } 377 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) 378 if err != nil { 379 t.Fatalf("ProcessTag (upsert) failed: %v", err) 380 } 381 382 // Verify tag was updated 383 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", 384 "did:plc:test123", "test-app", "latest").Scan(&digest) 385 if err != nil { 386 t.Fatalf("Failed to query updated tag: %v", err) 387 } 388 if digest != "sha256:newdigest" { 389 t.Errorf("digest = %q, want %q", digest, "sha256:newdigest") 390 } 391 392 // Verify still only one tag (upsert, not insert) 393 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", 394 "did:plc:test123", "test-app", "latest").Scan(&count) 395 if err != nil { 396 t.Fatalf("Failed to query tags after upsert: %v", err) 397 } 398 if count != 1 { 399 t.Errorf("Expected 1 tag after upsert, got %d", count) 400 } 401} 402 403func TestProcessStar(t *testing.T) { 404 database := setupTestDB(t) 405 defer database.Close() 406 407 p := NewProcessor(database, false, nil) 408 ctx := context.Background() 409 410 // Create test star record 411 starRecord := &atproto.StarRecord{ 412 Subject: atproto.StarSubject{ 413 DID: "did:plc:owner123", 414 Repository: "test-app", 415 }, 416 CreatedAt: time.Now(), 417 } 418 419 // Marshal to bytes for ProcessStar 420 recordBytes, err := json.Marshal(starRecord) 421 if err != nil { 422 t.Fatalf("Failed to marshal star: %v", err) 423 } 424 425 // Process star 426 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) 427 if err != nil { 428 t.Fatalf("ProcessStar failed: %v", err) 429 } 430 431 // Verify star was inserted 432 var count int 433 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", 434 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) 435 if err != nil { 436 t.Fatalf("Failed to query stars: %v", err) 437 } 438 if count != 1 { 439 t.Errorf("Expected 1 star, got %d", count) 440 } 441 442 // Test upserting same star (should be idempotent) 443 recordBytes, err = json.Marshal(starRecord) 444 if err != nil { 445 t.Fatalf("Failed to marshal star: %v", err) 446 } 447 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) 448 if err != nil { 449 t.Fatalf("ProcessStar (upsert) failed: %v", err) 450 } 451 452 // Verify still only one star 453 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", 454 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) 455 if err != nil { 456 t.Fatalf("Failed to query stars after upsert: %v", err) 457 } 458 if count != 1 { 459 t.Errorf("Expected 1 star after upsert, got %d", count) 460 } 461} 462 463func TestProcessManifest_Duplicate(t *testing.T) { 464 database := setupTestDB(t) 465 defer database.Close() 466 467 p := NewProcessor(database, false, nil) 468 ctx := context.Background() 469 470 manifestRecord := &atproto.ManifestRecord{ 471 Repository: "test-app", 472 Digest: "sha256:abc123", 473 MediaType: "application/vnd.oci.image.manifest.v1+json", 474 SchemaVersion: 2, 475 HoldEndpoint: "did:web:hold01.atcr.io", 476 CreatedAt: time.Now(), 477 } 478 479 // Marshal to bytes for ProcessManifest 480 recordBytes, err := json.Marshal(manifestRecord) 481 if err != nil { 482 t.Fatalf("Failed to marshal manifest: %v", err) 483 } 484 485 // Insert first time 486 id1, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 487 if err != nil { 488 t.Fatalf("First ProcessManifest failed: %v", err) 489 } 490 491 // Insert duplicate 492 id2, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 493 if err != nil { 494 t.Fatalf("Duplicate ProcessManifest failed: %v", err) 495 } 496 497 // Should return existing ID 498 if id1 != id2 { 499 t.Errorf("Duplicate manifest got different ID: %d vs %d", id1, id2) 500 } 501 502 // Verify only one manifest exists 503 var count int 504 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?", 505 "did:plc:test123", "sha256:abc123").Scan(&count) 506 if err != nil { 507 t.Fatalf("Failed to query manifests: %v", err) 508 } 509 if count != 1 { 510 t.Errorf("Expected 1 manifest, got %d", count) 511 } 512} 513 514func TestProcessManifest_EmptyAnnotations(t *testing.T) { 515 database := setupTestDB(t) 516 defer database.Close() 517 518 p := NewProcessor(database, false, nil) 519 ctx := context.Background() 520 521 // Manifest with nil annotations 522 manifestRecord := &atproto.ManifestRecord{ 523 Repository: "test-app", 524 Digest: "sha256:abc123", 525 MediaType: "application/vnd.oci.image.manifest.v1+json", 526 SchemaVersion: 2, 527 HoldEndpoint: "did:web:hold01.atcr.io", 528 CreatedAt: time.Now(), 529 Annotations: nil, 530 } 531 532 // Marshal to bytes for ProcessManifest 533 recordBytes, err := json.Marshal(manifestRecord) 534 if err != nil { 535 t.Fatalf("Failed to marshal manifest: %v", err) 536 } 537 538 _, err = p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 539 if err != nil { 540 t.Fatalf("ProcessManifest failed: %v", err) 541 } 542 543 // Verify no annotations were stored (nil annotations should not create entries) 544 var annotationCount int 545 err = database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?", 546 "did:plc:test123", "test-app").Scan(&annotationCount) 547 if err != nil { 548 t.Fatalf("Failed to query annotations: %v", err) 549 } 550 if annotationCount != 0 { 551 t.Errorf("Expected 0 annotations for nil annotations, got %d", annotationCount) 552 } 553} 554 555func TestProcessIdentity(t *testing.T) { 556 db := setupTestDB(t) 557 defer db.Close() 558 559 processor := NewProcessor(db, false, nil) 560 561 // Setup: Create test user 562 testDID := "did:plc:alice123" 563 testHandle := "alice.bsky.social" 564 testPDS := "https://bsky.social" 565 _, err := db.Exec(` 566 INSERT INTO users (did, handle, pds_endpoint, last_seen) 567 VALUES (?, ?, ?, ?) 568 `, testDID, testHandle, testPDS, time.Now()) 569 if err != nil { 570 t.Fatalf("Failed to insert test user: %v", err) 571 } 572 573 // Test 1: Process identity change event 574 newHandle := "alice-new.bsky.social" 575 err = processor.ProcessIdentity(context.Background(), testDID, newHandle) 576 // Note: This will fail to invalidate cache since we don't have a real identity directory, 577 // but we can still verify the database update happened 578 if err != nil { 579 t.Logf("Expected cache invalidation error (no real directory): %v", err) 580 } 581 582 // Verify handle was updated in database 583 var retrievedHandle string 584 err = db.QueryRow(` 585 SELECT handle FROM users WHERE did = ? 586 `, testDID).Scan(&retrievedHandle) 587 if err != nil { 588 t.Fatalf("Failed to query updated user: %v", err) 589 } 590 if retrievedHandle != newHandle { 591 t.Errorf("Expected handle '%s', got '%s'", newHandle, retrievedHandle) 592 } 593 594 // Test 2: Process identity change for non-existent user 595 // Should not error (UPDATE just affects 0 rows) 596 err = processor.ProcessIdentity(context.Background(), "did:plc:nonexistent", "new.handle") 597 if err != nil { 598 t.Logf("Expected cache invalidation error: %v", err) 599 } 600 601 // Test 3: Process multiple identity changes 602 handles := []string{"alice1.bsky.social", "alice2.bsky.social", "alice3.bsky.social"} 603 for _, handle := range handles { 604 err = processor.ProcessIdentity(context.Background(), testDID, handle) 605 if err != nil { 606 t.Logf("Expected cache invalidation error: %v", err) 607 } 608 609 err = db.QueryRow(` 610 SELECT handle FROM users WHERE did = ? 611 `, testDID).Scan(&retrievedHandle) 612 if err != nil { 613 t.Fatalf("Failed to query user after handle update: %v", err) 614 } 615 if retrievedHandle != handle { 616 t.Errorf("Expected handle '%s', got '%s'", handle, retrievedHandle) 617 } 618 } 619} 620 621func TestProcessAccount(t *testing.T) { 622 db := setupTestDB(t) 623 defer db.Close() 624 625 processor := NewProcessor(db, false, nil) 626 627 // Setup: Create test user 628 testDID := "did:plc:bob456" 629 testHandle := "bob.bsky.social" 630 testPDS := "https://bsky.social" 631 _, err := db.Exec(` 632 INSERT INTO users (did, handle, pds_endpoint, last_seen) 633 VALUES (?, ?, ?, ?) 634 `, testDID, testHandle, testPDS, time.Now()) 635 if err != nil { 636 t.Fatalf("Failed to insert test user: %v", err) 637 } 638 639 // Test 1: Process account deactivation event 640 err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated") 641 // Note: Cache invalidation will fail without real directory, but that's expected 642 if err != nil { 643 t.Logf("Expected cache invalidation error (no real directory): %v", err) 644 } 645 646 // Verify user still exists in database (we don't delete on deactivation) 647 var exists bool 648 err = db.QueryRow(` 649 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) 650 `, testDID).Scan(&exists) 651 if err != nil { 652 t.Fatalf("Failed to check if user exists: %v", err) 653 } 654 if !exists { 655 t.Error("User should still exist after deactivation event (no deletion)") 656 } 657 658 // Test 2: Process account with active=true (should be ignored) 659 err = processor.ProcessAccount(context.Background(), testDID, true, "active") 660 if err != nil { 661 t.Errorf("Expected no error for active account, got: %v", err) 662 } 663 664 // Test 3: Process account with status != "deactivated" (should be ignored) 665 err = processor.ProcessAccount(context.Background(), testDID, false, "suspended") 666 if err != nil { 667 t.Errorf("Expected no error for non-deactivated status, got: %v", err) 668 } 669 670 // Test 4: Process account deactivation for non-existent user 671 err = processor.ProcessAccount(context.Background(), "did:plc:nonexistent", false, "deactivated") 672 // Cache invalidation will fail, but that's expected 673 if err != nil { 674 t.Logf("Expected cache invalidation error: %v", err) 675 } 676 677 // Test 5: Process multiple deactivation events (idempotent) 678 for i := 0; i < 3; i++ { 679 err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated") 680 if err != nil { 681 t.Logf("Expected cache invalidation error on iteration %d: %v", i, err) 682 } 683 } 684 685 // User should still exist after multiple deactivations 686 err = db.QueryRow(` 687 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) 688 `, testDID).Scan(&exists) 689 if err != nil { 690 t.Fatalf("Failed to check if user exists after multiple deactivations: %v", err) 691 } 692 if !exists { 693 t.Error("User should still exist after multiple deactivation events") 694 } 695 696 // Test 6: Process account deletion - should delete user data 697 err = processor.ProcessAccount(context.Background(), testDID, false, "deleted") 698 if err != nil { 699 t.Logf("Cache invalidation error during deletion (expected): %v", err) 700 } 701 702 // User should be deleted after "deleted" status 703 err = db.QueryRow(` 704 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) 705 `, testDID).Scan(&exists) 706 if err != nil { 707 t.Fatalf("Failed to check if user exists after deletion: %v", err) 708 } 709 if exists { 710 t.Error("User should NOT exist after deletion event") 711 } 712 713 // Test 7: Process deletion for already-deleted user (idempotent) 714 err = processor.ProcessAccount(context.Background(), testDID, false, "deleted") 715 if err != nil { 716 t.Errorf("Deletion of non-existent user should not error, got: %v", err) 717 } 718}