A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 8d39daa09d8d9d5066c9fb336c61ef5bfb8a0b1f 820 lines 27 kB view raw
1package jetstream 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "strings" 9 "time" 10 11 "atcr.io/pkg/appview/db" 12 "atcr.io/pkg/atproto" 13 "github.com/bluesky-social/indigo/atproto/identity" 14 "github.com/bluesky-social/indigo/atproto/lexicon" 15) 16 17// Processor handles shared database operations for both Worker (live) and Backfill (sync) 18// This eliminates code duplication between the two data ingestion paths 19type Processor struct { 20 db db.DBTX 21 userCache *UserCache // Optional - enabled for Worker, disabled for Backfill 22 statsCache *StatsCache // In-memory cache for per-hold stats aggregation 23 useCache bool 24 catalog *lexicon.ResolvingCatalog // For debug logging of validation failures 25} 26 27// NewProcessor creates a new shared processor 28// useCache: true for Worker (live streaming), false for Backfill (batch processing) 29// statsCache: shared stats cache for aggregating across holds (nil to skip stats processing) 30func NewProcessor(database db.DBTX, useCache bool, statsCache *StatsCache) *Processor { 31 // Create lexicon catalog for debug validation logging 32 dir := identity.DefaultDirectory() 33 catalog := lexicon.NewResolvingCatalog() 34 catalog.Directory = dir 35 36 p := &Processor{ 37 db: database, 38 useCache: useCache, 39 statsCache: statsCache, 40 catalog: catalog, 41 } 42 43 if useCache { 44 p.userCache = &UserCache{ 45 cache: make(map[string]*db.User), 46 } 47 } 48 49 return p 50} 51 52// EnsureUser resolves and upserts a user by DID 53// Uses cache if enabled (Worker), queries DB if cache disabled (Backfill) 54func (p *Processor) EnsureUser(ctx context.Context, did string) error { 55 // Check cache first (if enabled) 56 if p.useCache && p.userCache != nil { 57 if _, ok := p.userCache.cache[did]; ok { 58 // User in cache - just update last seen timestamp 59 return db.UpdateUserLastSeen(p.db, did) 60 } 61 } else if !p.useCache { 62 // No cache - check if user already exists in DB 63 existingUser, err := db.GetUserByDID(p.db, did) 64 if err == nil && existingUser != nil { 65 // User exists - just update last seen timestamp 66 return db.UpdateUserLastSeen(p.db, did) 67 } 68 } 69 70 // Resolve DID to get handle and PDS endpoint 71 resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did) 72 if err != nil { 73 return err 74 } 75 76 // Fetch user's Bluesky profile record from their PDS (including avatar) 77 avatarURL := "" 78 client := atproto.NewClient(pdsEndpoint, "", "") 79 profileRecord, err := client.GetProfileRecord(ctx, resolvedDID) 80 if err != nil { 81 slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", err) 82 // Continue without avatar 83 } else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" { 84 avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link) 85 } 86 87 // Create user record 88 user := &db.User{ 89 DID: resolvedDID, 90 Handle: handle, 91 PDSEndpoint: pdsEndpoint, 92 Avatar: avatarURL, 93 LastSeen: time.Now(), 94 } 95 96 // Cache if enabled 97 if p.useCache { 98 p.userCache.cache[did] = user 99 } 100 101 // Upsert to database 102 // Use UpsertUser if we successfully fetched an avatar (to update existing users) 103 // Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars) 104 if avatarURL != "" { 105 return db.UpsertUser(p.db, user) 106 } 107 return db.UpsertUserIgnoreAvatar(p.db, user) 108} 109 110// ValidateRecord performs validation on records. 111// - Full lexicon validation is logged for debugging but does NOT block ingestion 112// - Targeted validation (captain/crew DID checks) DOES block bogus records 113func (p *Processor) ValidateRecord(ctx context.Context, collection string, data []byte) error { 114 var recordData map[string]any 115 if err := json.Unmarshal(data, &recordData); err != nil { 116 return fmt.Errorf("invalid JSON: %w", err) 117 } 118 119 // Debug: Full lexicon validation (log only, don't block) 120 if p.catalog != nil { 121 if err := lexicon.ValidateRecord(p.catalog, recordData, collection, 0); err != nil { 122 slog.Debug("Record failed full lexicon validation (ingesting anyway)", 123 "component", "processor", 124 "collection", collection, 125 "error", err) 126 } 127 } 128 129 // Targeted validation for collections that had bogus data issues 130 // These DO block ingestion 131 switch collection { 132 case atproto.CaptainCollection: 133 // Captain must have non-empty owner DID 134 owner, _ := recordData["owner"].(string) 135 if owner == "" || !strings.HasPrefix(owner, "did:") { 136 return fmt.Errorf("captain record missing or invalid owner DID") 137 } 138 139 case atproto.CrewCollection: 140 // Crew must have non-empty member DID 141 member, _ := recordData["member"].(string) 142 if member == "" || !strings.HasPrefix(member, "did:") { 143 return fmt.Errorf("crew record missing or invalid member DID") 144 } 145 } 146 147 return nil 148} 149 150// ProcessRecord is the unified entry point for processing any ATCR record. 151// It handles: 152// 1. Schema validation against published lexicons 153// 2. User creation for user-activity collections 154// 3. Dispatch to the appropriate Process* method 155// 156// queryCaptainFn is optional - used by backfill for sailor profile processing 157func (p *Processor) ProcessRecord(ctx context.Context, did, collection, rkey string, data []byte, isDelete bool, queryCaptainFn func(context.Context, string) error) error { 158 // Skip validation for deletes (no record data) 159 if !isDelete && data != nil { 160 if err := p.ValidateRecord(ctx, collection, data); err != nil { 161 slog.Warn("Record failed schema validation, skipping", 162 "component", "processor", 163 "collection", collection, 164 "did", did, 165 "error", err) 166 return nil // Skip invalid records silently 167 } 168 } 169 170 // User-activity collections create/update user entries 171 // Skip for deletes - user should already exist, and we don't need to resolve identity 172 if !isDelete { 173 switch collection { 174 case atproto.ManifestCollection, 175 atproto.TagCollection, 176 atproto.StarCollection, 177 atproto.RepoPageCollection, 178 atproto.SailorProfileCollection: 179 if err := p.EnsureUser(ctx, did); err != nil { 180 return fmt.Errorf("failed to ensure user: %w", err) 181 } 182 // Hold collections (captain, crew, stats) - don't create user entries 183 // These are records FROM holds, not user activity 184 } 185 } 186 187 // Dispatch to specific handler 188 switch collection { 189 case atproto.ManifestCollection: 190 if isDelete { 191 return db.DeleteManifest(p.db, did, "", rkey) 192 } 193 _, err := p.ProcessManifest(ctx, did, data) 194 return err 195 196 case atproto.TagCollection: 197 if isDelete { 198 repo, tag := atproto.RKeyToRepositoryTag(rkey) 199 return db.DeleteTag(p.db, did, repo, tag) 200 } 201 return p.ProcessTag(ctx, did, data) 202 203 case atproto.StarCollection: 204 if isDelete { 205 ownerDID, repository, err := atproto.ParseStarRecordKey(rkey) 206 if err != nil { 207 return err 208 } 209 return db.DeleteStar(p.db, did, ownerDID, repository) 210 } 211 return p.ProcessStar(ctx, did, data) 212 213 case atproto.RepoPageCollection: 214 return p.ProcessRepoPage(ctx, did, rkey, data, isDelete) 215 216 case atproto.SailorProfileCollection: 217 return p.ProcessSailorProfile(ctx, did, data, queryCaptainFn) 218 219 case atproto.StatsCollection: 220 return p.ProcessStats(ctx, did, data, isDelete) 221 222 case atproto.CaptainCollection: 223 if isDelete { 224 return db.DeleteCaptainRecord(p.db, did) 225 } 226 return p.ProcessCaptain(ctx, did, data) 227 228 case atproto.CrewCollection: 229 if isDelete { 230 return db.DeleteCrewMemberByRkey(p.db, did, rkey) 231 } 232 return p.ProcessCrew(ctx, did, rkey, data) 233 234 default: 235 return nil // Unknown collection, ignore 236 } 237} 238 239// ProcessManifest processes a manifest record and stores it in the database 240// Returns the manifest ID for further processing (layers/references) 241func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) { 242 // Unmarshal manifest record 243 var manifestRecord atproto.ManifestRecord 244 if err := json.Unmarshal(recordData, &manifestRecord); err != nil { 245 return 0, fmt.Errorf("failed to unmarshal manifest: %w", err) 246 } 247 // Detect manifest type 248 isManifestList := len(manifestRecord.Manifests) > 0 249 250 // Extract hold DID from manifest (with fallback for legacy manifests) 251 // New manifests use holdDid field (DID format) 252 // Old manifests use holdEndpoint field (URL format) - convert to DID 253 holdDID := manifestRecord.HoldDID 254 if holdDID == "" && manifestRecord.HoldEndpoint != "" { 255 // Legacy manifest - convert URL to DID 256 holdDID = atproto.ResolveHoldDIDFromURL(manifestRecord.HoldEndpoint) 257 } 258 259 // Detect artifact type from config media type 260 artifactType := "container-image" 261 if !isManifestList && manifestRecord.Config != nil { 262 artifactType = db.GetArtifactType(manifestRecord.Config.MediaType) 263 } 264 265 // Prepare manifest for insertion (WITHOUT annotation fields) 266 manifest := &db.Manifest{ 267 DID: did, 268 Repository: manifestRecord.Repository, 269 Digest: manifestRecord.Digest, 270 MediaType: manifestRecord.MediaType, 271 SchemaVersion: manifestRecord.SchemaVersion, 272 HoldEndpoint: holdDID, 273 ArtifactType: artifactType, 274 CreatedAt: manifestRecord.CreatedAt, 275 // Annotations removed - stored separately in repository_annotations table 276 } 277 278 // Set config fields only for image manifests (not manifest lists) 279 if !isManifestList && manifestRecord.Config != nil { 280 manifest.ConfigDigest = manifestRecord.Config.Digest 281 manifest.ConfigSize = manifestRecord.Config.Size 282 } 283 284 // Insert manifest 285 manifestID, err := db.InsertManifest(p.db, manifest) 286 if err != nil { 287 // For backfill: if manifest already exists, get its ID 288 if strings.Contains(err.Error(), "UNIQUE constraint failed") { 289 var existingID int64 290 err := p.db.QueryRow(` 291 SELECT id FROM manifests 292 WHERE did = ? AND repository = ? AND digest = ? 293 `, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID) 294 295 if err != nil { 296 return 0, fmt.Errorf("failed to get existing manifest ID: %w", err) 297 } 298 manifestID = existingID 299 } else { 300 return 0, fmt.Errorf("failed to insert manifest: %w", err) 301 } 302 } 303 304 // Update repository annotations ONLY if manifest has at least one non-empty annotation 305 if manifestRecord.Annotations != nil { 306 hasData := false 307 for _, value := range manifestRecord.Annotations { 308 if value != "" { 309 hasData = true 310 break 311 } 312 } 313 314 if hasData { 315 // Replace all annotations for this repository 316 err = db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations) 317 if err != nil { 318 return 0, fmt.Errorf("failed to upsert annotations: %w", err) 319 } 320 } 321 } 322 323 // Insert manifest references or layers 324 if isManifestList { 325 // Insert manifest references (for manifest lists/indexes) 326 for i, ref := range manifestRecord.Manifests { 327 platformArch := "" 328 platformOS := "" 329 platformVariant := "" 330 platformOSVersion := "" 331 332 if ref.Platform != nil { 333 platformArch = ref.Platform.Architecture 334 platformOS = ref.Platform.OS 335 platformVariant = ref.Platform.Variant 336 platformOSVersion = ref.Platform.OSVersion 337 } 338 339 // Detect attestation manifests from annotations 340 isAttestation := false 341 if ref.Annotations != nil { 342 if refType, ok := ref.Annotations["vnd.docker.reference.type"]; ok { 343 isAttestation = refType == "attestation-manifest" 344 } 345 } 346 347 if err := db.InsertManifestReference(p.db, &db.ManifestReference{ 348 ManifestID: manifestID, 349 Digest: ref.Digest, 350 MediaType: ref.MediaType, 351 Size: ref.Size, 352 PlatformArchitecture: platformArch, 353 PlatformOS: platformOS, 354 PlatformVariant: platformVariant, 355 PlatformOSVersion: platformOSVersion, 356 IsAttestation: isAttestation, 357 ReferenceIndex: i, 358 }); err != nil { 359 // Continue on error - reference might already exist 360 continue 361 } 362 } 363 } else { 364 // Insert layers (for image manifests) 365 for i, layer := range manifestRecord.Layers { 366 if err := db.InsertLayer(p.db, &db.Layer{ 367 ManifestID: manifestID, 368 Digest: layer.Digest, 369 MediaType: layer.MediaType, 370 Size: layer.Size, 371 LayerIndex: i, 372 }); err != nil { 373 // Continue on error - layer might already exist 374 continue 375 } 376 } 377 } 378 379 return manifestID, nil 380} 381 382// ProcessTag processes a tag record and stores it in the database 383func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error { 384 // Unmarshal tag record 385 var tagRecord atproto.TagRecord 386 if err := json.Unmarshal(recordData, &tagRecord); err != nil { 387 return fmt.Errorf("failed to unmarshal tag: %w", err) 388 } 389 // Extract digest from tag record (tries manifest field first, falls back to manifestDigest) 390 manifestDigest, err := tagRecord.GetManifestDigest() 391 if err != nil { 392 return fmt.Errorf("failed to get manifest digest from tag record: %w", err) 393 } 394 395 // Insert or update tag 396 return db.UpsertTag(p.db, &db.Tag{ 397 DID: did, 398 Repository: tagRecord.Repository, 399 Tag: tagRecord.Tag, 400 Digest: manifestDigest, 401 CreatedAt: tagRecord.UpdatedAt, 402 }) 403} 404 405// ProcessStar processes a star record and stores it in the database 406func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error { 407 // Unmarshal star record (handles both old object and new AT URI subject formats) 408 var starRecord atproto.StarRecord 409 if err := json.Unmarshal(recordData, &starRecord); err != nil { 410 return fmt.Errorf("failed to unmarshal star: %w", err) 411 } 412 413 // Extract owner DID and repository from subject AT URI 414 ownerDID, repository, err := starRecord.GetSubjectDIDAndRepository() 415 if err != nil { 416 return fmt.Errorf("failed to parse star subject: %w", err) 417 } 418 419 // Ensure the starred repository's owner exists in the users table 420 // (the starrer is already ensured by ProcessRecord, but the owner 421 // may not have been processed yet during backfill or live events) 422 if err := p.EnsureUser(ctx, ownerDID); err != nil { 423 return fmt.Errorf("failed to ensure star subject user: %w", err) 424 } 425 426 // Upsert the star record (idempotent - won't duplicate) 427 // The DID here is the starrer (user who starred) 428 // Star count will be calculated on demand from the stars table 429 return db.UpsertStar(p.db, did, ownerDID, repository, starRecord.CreatedAt) 430} 431 432// ProcessSailorProfile processes a sailor profile record 433// This is primarily used by backfill to cache captain records for holds 434func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error { 435 // Unmarshal sailor profile record 436 var profileRecord atproto.SailorProfileRecord 437 if err := json.Unmarshal(recordData, &profileRecord); err != nil { 438 return fmt.Errorf("failed to unmarshal sailor profile: %w", err) 439 } 440 441 // Skip if no default hold set 442 if profileRecord.DefaultHold == "" { 443 return nil 444 } 445 446 // Convert hold URL/DID to canonical DID 447 holdDID := atproto.ResolveHoldDIDFromURL(profileRecord.DefaultHold) 448 if holdDID == "" { 449 slog.Warn("Invalid hold reference in profile", "component", "processor", "did", did, "default_hold", profileRecord.DefaultHold) 450 return nil 451 } 452 453 // Query and cache the captain record using provided function 454 // This allows backfill-specific logic (retries, test mode handling) without duplicating it here 455 if queryCaptainFn != nil { 456 return queryCaptainFn(ctx, holdDID) 457 } 458 459 return nil 460} 461 462// ProcessRepoPage processes a repository page record 463// This is called when Jetstream receives a repo page create/update event 464func (p *Processor) ProcessRepoPage(ctx context.Context, did string, rkey string, recordData []byte, isDelete bool) error { 465 if isDelete { 466 // Delete the repo page from our cache 467 return db.DeleteRepoPage(p.db, did, rkey) 468 } 469 470 // Unmarshal repo page record 471 var pageRecord atproto.RepoPageRecord 472 if err := json.Unmarshal(recordData, &pageRecord); err != nil { 473 return fmt.Errorf("failed to unmarshal repo page: %w", err) 474 } 475 476 // Extract avatar CID if present 477 avatarCID := "" 478 if pageRecord.Avatar != nil && pageRecord.Avatar.Ref.Link != "" { 479 avatarCID = pageRecord.Avatar.Ref.Link 480 } 481 482 // Upsert to database 483 return db.UpsertRepoPage(p.db, did, pageRecord.Repository, pageRecord.Description, avatarCID, pageRecord.CreatedAt, pageRecord.UpdatedAt) 484} 485 486// ProcessIdentity handles identity change events (handle updates) 487// This is called when Jetstream receives an identity event indicating a handle change. 488// The identity cache is invalidated to ensure the next lookup uses the new handle, 489// and the database is updated to reflect the change in the UI. 490// 491// Only processes events for users who already exist in our database (have ATCR activity). 492func (p *Processor) ProcessIdentity(ctx context.Context, did string, newHandle string) error { 493 // Check if user exists in our database - only update if they're an ATCR user 494 user, err := db.GetUserByDID(p.db, did) 495 if err != nil { 496 return fmt.Errorf("failed to check user existence: %w", err) 497 } 498 499 // Skip if user doesn't exist - they don't have any ATCR activity (manifests, profiles, etc.) 500 if user == nil { 501 return nil 502 } 503 504 // Update handle in database 505 if err := db.UpdateUserHandle(p.db, did, newHandle); err != nil { 506 slog.Warn("Failed to update user handle in database", 507 "component", "processor", 508 "did", did, 509 "handle", newHandle, 510 "error", err) 511 // Continue to invalidate cache even if DB update fails 512 } 513 514 // Invalidate cached identity data to force re-resolution on next lookup 515 if err := atproto.InvalidateIdentity(ctx, did); err != nil { 516 slog.Warn("Failed to invalidate identity cache", 517 "component", "processor", 518 "did", did, 519 "error", err) 520 return err 521 } 522 523 slog.Info("Processed identity change event", 524 "component", "processor", 525 "did", did, 526 "old_handle", user.Handle, 527 "new_handle", newHandle) 528 529 return nil 530} 531 532// ProcessProfileUpdate handles app.bsky.actor.profile updates for known ATCR users 533// This refreshes the cached avatar URL when a user changes their Bluesky profile picture 534func (p *Processor) ProcessProfileUpdate(ctx context.Context, did string, recordData []byte) error { 535 // Check if user exists in our database - only update if they're an ATCR user 536 user, err := db.GetUserByDID(p.db, did) 537 if err != nil { 538 return fmt.Errorf("failed to check user existence: %w", err) 539 } 540 541 // Skip if user doesn't exist - they don't have any ATCR activity 542 if user == nil { 543 return nil 544 } 545 546 // Parse the profile record to extract avatar 547 var profile struct { 548 Avatar *atproto.ATProtoBlobRef `json:"avatar"` 549 } 550 if err := json.Unmarshal(recordData, &profile); err != nil { 551 return fmt.Errorf("failed to unmarshal profile: %w", err) 552 } 553 554 // Build new avatar URL 555 avatarURL := "" 556 if profile.Avatar != nil && profile.Avatar.Ref.Link != "" { 557 avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link) 558 } 559 560 // Update if changed 561 if avatarURL != user.Avatar { 562 slog.Info("Updating avatar from profile change", 563 "component", "processor", 564 "did", did, 565 "old_avatar", user.Avatar, 566 "new_avatar", avatarURL) 567 return db.UpdateUserAvatar(p.db, did, avatarURL) 568 } 569 570 return nil 571} 572 573// RefreshUserAvatar fetches the user's current Bluesky profile and updates their cached avatar 574// This is called during backfill to ensure avatars stay fresh for existing users 575func (p *Processor) RefreshUserAvatar(ctx context.Context, did, pdsEndpoint string) error { 576 // Get user from database to compare avatar 577 user, err := db.GetUserByDID(p.db, did) 578 if err != nil || user == nil { 579 return nil // User doesn't exist, skip 580 } 581 582 // Fetch profile from PDS 583 client := atproto.NewClient(pdsEndpoint, "", "") 584 profile, err := client.GetProfileRecord(ctx, did) 585 if err != nil { 586 return fmt.Errorf("failed to fetch profile: %w", err) 587 } 588 589 // Build avatar URL 590 avatarURL := "" 591 if profile.Avatar != nil && profile.Avatar.Ref.Link != "" { 592 avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link) 593 } 594 595 // Update if changed 596 if avatarURL != user.Avatar { 597 slog.Info("Backfill refreshing avatar", 598 "component", "processor", 599 "did", did, 600 "old_avatar", user.Avatar, 601 "new_avatar", avatarURL) 602 return db.UpdateUserAvatar(p.db, did, avatarURL) 603 } 604 605 return nil 606} 607 608// ProcessStats handles stats record events from hold PDSes 609// This is called when Jetstream receives a stats create/update/delete event from a hold 610// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownerDID + repository 611func (p *Processor) ProcessStats(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error { 612 // Skip if no stats cache configured 613 if p.statsCache == nil { 614 return nil 615 } 616 617 // Unmarshal stats record 618 var statsRecord atproto.StatsRecord 619 if err := json.Unmarshal(recordData, &statsRecord); err != nil { 620 return fmt.Errorf("failed to unmarshal stats record: %w", err) 621 } 622 623 if isDelete { 624 // Delete from in-memory cache 625 p.statsCache.Delete(holdDID, statsRecord.OwnerDID, statsRecord.Repository) 626 } else { 627 // Parse timestamps 628 var lastPull, lastPush *time.Time 629 if statsRecord.LastPull != "" { 630 t, err := time.Parse(time.RFC3339, statsRecord.LastPull) 631 if err == nil { 632 lastPull = &t 633 } 634 } 635 if statsRecord.LastPush != "" { 636 t, err := time.Parse(time.RFC3339, statsRecord.LastPush) 637 if err == nil { 638 lastPush = &t 639 } 640 } 641 642 // Update in-memory cache 643 p.statsCache.Update(holdDID, statsRecord.OwnerDID, statsRecord.Repository, 644 statsRecord.PullCount, statsRecord.PushCount, lastPull, lastPush) 645 } 646 647 // Get aggregated stats across all holds 648 totalPull, totalPush, latestPull, latestPush := p.statsCache.GetAggregated( 649 statsRecord.OwnerDID, statsRecord.Repository) 650 651 // Upsert aggregated stats to repository_stats 652 return db.UpsertRepositoryStats(p.db, &db.RepositoryStats{ 653 DID: statsRecord.OwnerDID, 654 Repository: statsRecord.Repository, 655 PullCount: int(totalPull), 656 PushCount: int(totalPush), 657 LastPull: latestPull, 658 LastPush: latestPush, 659 }) 660} 661 662// ProcessCaptain handles captain record events from hold PDSes 663// This is called when Jetstream receives a captain create/update/delete event from a hold 664// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownership info 665func (p *Processor) ProcessCaptain(ctx context.Context, holdDID string, recordData []byte) error { 666 // Unmarshal captain record 667 var captainRecord atproto.CaptainRecord 668 if err := json.Unmarshal(recordData, &captainRecord); err != nil { 669 return fmt.Errorf("failed to unmarshal captain record: %w", err) 670 } 671 672 // Convert to db struct and upsert 673 record := &db.HoldCaptainRecord{ 674 HoldDID: holdDID, 675 OwnerDID: captainRecord.Owner, 676 Public: captainRecord.Public, 677 AllowAllCrew: captainRecord.AllowAllCrew, 678 DeployedAt: captainRecord.DeployedAt, 679 Region: captainRecord.Region, 680 UpdatedAt: time.Now(), 681 } 682 683 if err := db.UpsertCaptainRecord(p.db, record); err != nil { 684 return fmt.Errorf("failed to upsert captain record: %w", err) 685 } 686 687 slog.Info("Processed captain record", 688 "component", "processor", 689 "hold_did", holdDID, 690 "owner_did", captainRecord.Owner, 691 "public", captainRecord.Public, 692 "allow_all_crew", captainRecord.AllowAllCrew) 693 694 return nil 695} 696 697// ProcessCrew handles crew record events from hold PDSes 698// This is called when Jetstream receives a crew create/update/delete event from a hold 699// The holdDID is the DID of the hold PDS (event.DID), and the record contains member info 700func (p *Processor) ProcessCrew(ctx context.Context, holdDID string, rkey string, recordData []byte) error { 701 // Unmarshal crew record 702 var crewRecord atproto.CrewRecord 703 if err := json.Unmarshal(recordData, &crewRecord); err != nil { 704 return fmt.Errorf("failed to unmarshal crew record: %w", err) 705 } 706 707 // Marshal permissions to JSON string 708 permissionsJSON := "" 709 if len(crewRecord.Permissions) > 0 { 710 if jsonBytes, err := json.Marshal(crewRecord.Permissions); err == nil { 711 permissionsJSON = string(jsonBytes) 712 } 713 } 714 715 // Convert to db struct and upsert 716 member := &db.CrewMember{ 717 HoldDID: holdDID, 718 MemberDID: crewRecord.Member, 719 Rkey: rkey, 720 Role: crewRecord.Role, 721 Permissions: permissionsJSON, 722 Tier: crewRecord.Tier, 723 AddedAt: crewRecord.AddedAt, 724 } 725 726 if err := db.UpsertCrewMember(p.db, member); err != nil { 727 return fmt.Errorf("failed to upsert crew member: %w", err) 728 } 729 730 slog.Debug("Processed crew record", 731 "component", "processor", 732 "hold_did", holdDID, 733 "member_did", crewRecord.Member, 734 "role", crewRecord.Role, 735 "permissions", crewRecord.Permissions) 736 737 return nil 738} 739 740// ProcessAccount handles account status events (deactivation/deletion/etc) 741// This is called when Jetstream receives an account event indicating status changes. 742// 743// Status handling: 744// - "deleted": Account permanently deleted - remove all cached data 745// - "deactivated": Could be PDS migration or permanent - invalidate cache only 746// - "takendown": Moderation action - invalidate cache only 747// - Other: Ignore 748// 749// For "deactivated", we don't delete data because it's ambiguous: 750// - Could be permanent deactivation (user deleted account) 751// - Could be PDS migration (account moves to new PDS) 752// Cache invalidation forces re-resolution on next lookup. 753// 754// Only processes events for users who already exist in our database (have ATCR activity). 755func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error { 756 // Skip active accounts or unknown statuses 757 if active { 758 return nil 759 } 760 761 // Check if user exists in our database - only process if they're an ATCR user 762 user, err := db.GetUserByDID(p.db, did) 763 if err != nil { 764 return fmt.Errorf("failed to check user existence: %w", err) 765 } 766 767 // Skip if user doesn't exist - they don't have any ATCR activity 768 if user == nil { 769 return nil 770 } 771 772 switch status { 773 case "deleted": 774 // Account permanently deleted - remove all cached data 775 if err := db.DeleteUserData(p.db, did); err != nil { 776 slog.Error("Failed to delete user data for deleted account", 777 "component", "processor", 778 "did", did, 779 "handle", user.Handle, 780 "error", err) 781 return err 782 } 783 784 // Also invalidate identity cache 785 _ = atproto.InvalidateIdentity(ctx, did) 786 787 slog.Info("Deleted user data for deleted account", 788 "component", "processor", 789 "did", did, 790 "handle", user.Handle) 791 792 case "deactivated", "takendown": 793 // Ambiguous status - invalidate cache but keep data 794 // For deactivated: could be PDS migration, will resolve on next lookup 795 // For takendown: moderation action, keep data in case of appeal 796 if err := atproto.InvalidateIdentity(ctx, did); err != nil { 797 slog.Warn("Failed to invalidate identity cache", 798 "component", "processor", 799 "did", did, 800 "status", status, 801 "error", err) 802 return err 803 } 804 805 slog.Info("Processed account status event - cache invalidated", 806 "component", "processor", 807 "did", did, 808 "handle", user.Handle, 809 "status", status) 810 811 default: 812 // Unknown status - ignore 813 slog.Debug("Ignoring unknown account status", 814 "component", "processor", 815 "did", did, 816 "status", status) 817 } 818 819 return nil 820}