A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
1package jetstream
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "strings"
9 "time"
10
11 "atcr.io/pkg/appview/db"
12 "atcr.io/pkg/atproto"
13 "github.com/bluesky-social/indigo/atproto/identity"
14 "github.com/bluesky-social/indigo/atproto/lexicon"
15)
16
// Processor handles shared database operations for both Worker (live) and Backfill (sync).
// This eliminates code duplication between the two data ingestion paths.
//
// Construct it with NewProcessor, which wires up the optional user cache and
// the lexicon catalog used for debug-only schema validation.
type Processor struct {
	db         db.DBTX                   // Database handle used for every lookup and write
	userCache  *UserCache                // Optional - enabled for Worker, disabled for Backfill
	statsCache *StatsCache               // In-memory cache for per-hold stats aggregation; nil makes ProcessStats a no-op
	useCache   bool                      // Mirrors NewProcessor's useCache flag; selects cache vs. DB existence checks in EnsureUser
	catalog    *lexicon.ResolvingCatalog // For debug logging of validation failures; nil skips that logging
}
26
27// NewProcessor creates a new shared processor
28// useCache: true for Worker (live streaming), false for Backfill (batch processing)
29// statsCache: shared stats cache for aggregating across holds (nil to skip stats processing)
30func NewProcessor(database db.DBTX, useCache bool, statsCache *StatsCache) *Processor {
31 // Create lexicon catalog for debug validation logging
32 dir := identity.DefaultDirectory()
33 catalog := lexicon.NewResolvingCatalog()
34 catalog.Directory = dir
35
36 p := &Processor{
37 db: database,
38 useCache: useCache,
39 statsCache: statsCache,
40 catalog: catalog,
41 }
42
43 if useCache {
44 p.userCache = &UserCache{
45 cache: make(map[string]*db.User),
46 }
47 }
48
49 return p
50}
51
52// EnsureUser resolves and upserts a user by DID
53// Uses cache if enabled (Worker), queries DB if cache disabled (Backfill)
54func (p *Processor) EnsureUser(ctx context.Context, did string) error {
55 // Check cache first (if enabled)
56 if p.useCache && p.userCache != nil {
57 if _, ok := p.userCache.cache[did]; ok {
58 // User in cache - just update last seen timestamp
59 return db.UpdateUserLastSeen(p.db, did)
60 }
61 } else if !p.useCache {
62 // No cache - check if user already exists in DB
63 existingUser, err := db.GetUserByDID(p.db, did)
64 if err == nil && existingUser != nil {
65 // User exists - just update last seen timestamp
66 return db.UpdateUserLastSeen(p.db, did)
67 }
68 }
69
70 // Resolve DID to get handle and PDS endpoint
71 resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
72 if err != nil {
73 return err
74 }
75
76 // Fetch user's Bluesky profile record from their PDS (including avatar)
77 avatarURL := ""
78 client := atproto.NewClient(pdsEndpoint, "", "")
79 profileRecord, err := client.GetProfileRecord(ctx, resolvedDID)
80 if err != nil {
81 slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", err)
82 // Continue without avatar
83 } else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
84 avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link)
85 }
86
87 // Create user record
88 user := &db.User{
89 DID: resolvedDID,
90 Handle: handle,
91 PDSEndpoint: pdsEndpoint,
92 Avatar: avatarURL,
93 LastSeen: time.Now(),
94 }
95
96 // Cache if enabled
97 if p.useCache {
98 p.userCache.cache[did] = user
99 }
100
101 // Upsert to database
102 // Use UpsertUser if we successfully fetched an avatar (to update existing users)
103 // Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars)
104 if avatarURL != "" {
105 return db.UpsertUser(p.db, user)
106 }
107 return db.UpsertUserIgnoreAvatar(p.db, user)
108}
109
110// ValidateRecord performs validation on records.
111// - Full lexicon validation is logged for debugging but does NOT block ingestion
112// - Targeted validation (captain/crew DID checks) DOES block bogus records
113func (p *Processor) ValidateRecord(ctx context.Context, collection string, data []byte) error {
114 var recordData map[string]any
115 if err := json.Unmarshal(data, &recordData); err != nil {
116 return fmt.Errorf("invalid JSON: %w", err)
117 }
118
119 // Debug: Full lexicon validation (log only, don't block)
120 if p.catalog != nil {
121 if err := lexicon.ValidateRecord(p.catalog, recordData, collection, 0); err != nil {
122 slog.Debug("Record failed full lexicon validation (ingesting anyway)",
123 "component", "processor",
124 "collection", collection,
125 "error", err)
126 }
127 }
128
129 // Targeted validation for collections that had bogus data issues
130 // These DO block ingestion
131 switch collection {
132 case atproto.CaptainCollection:
133 // Captain must have non-empty owner DID
134 owner, _ := recordData["owner"].(string)
135 if owner == "" || !strings.HasPrefix(owner, "did:") {
136 return fmt.Errorf("captain record missing or invalid owner DID")
137 }
138
139 case atproto.CrewCollection:
140 // Crew must have non-empty member DID
141 member, _ := recordData["member"].(string)
142 if member == "" || !strings.HasPrefix(member, "did:") {
143 return fmt.Errorf("crew record missing or invalid member DID")
144 }
145 }
146
147 return nil
148}
149
150// ProcessRecord is the unified entry point for processing any ATCR record.
151// It handles:
152// 1. Schema validation against published lexicons
153// 2. User creation for user-activity collections
154// 3. Dispatch to the appropriate Process* method
155//
156// queryCaptainFn is optional - used by backfill for sailor profile processing
157func (p *Processor) ProcessRecord(ctx context.Context, did, collection, rkey string, data []byte, isDelete bool, queryCaptainFn func(context.Context, string) error) error {
158 // Skip validation for deletes (no record data)
159 if !isDelete && data != nil {
160 if err := p.ValidateRecord(ctx, collection, data); err != nil {
161 slog.Warn("Record failed schema validation, skipping",
162 "component", "processor",
163 "collection", collection,
164 "did", did,
165 "error", err)
166 return nil // Skip invalid records silently
167 }
168 }
169
170 // User-activity collections create/update user entries
171 // Skip for deletes - user should already exist, and we don't need to resolve identity
172 if !isDelete {
173 switch collection {
174 case atproto.ManifestCollection,
175 atproto.TagCollection,
176 atproto.StarCollection,
177 atproto.RepoPageCollection,
178 atproto.SailorProfileCollection:
179 if err := p.EnsureUser(ctx, did); err != nil {
180 return fmt.Errorf("failed to ensure user: %w", err)
181 }
182 // Hold collections (captain, crew, stats) - don't create user entries
183 // These are records FROM holds, not user activity
184 }
185 }
186
187 // Dispatch to specific handler
188 switch collection {
189 case atproto.ManifestCollection:
190 if isDelete {
191 return db.DeleteManifest(p.db, did, "", rkey)
192 }
193 _, err := p.ProcessManifest(ctx, did, data)
194 return err
195
196 case atproto.TagCollection:
197 if isDelete {
198 repo, tag := atproto.RKeyToRepositoryTag(rkey)
199 return db.DeleteTag(p.db, did, repo, tag)
200 }
201 return p.ProcessTag(ctx, did, data)
202
203 case atproto.StarCollection:
204 if isDelete {
205 ownerDID, repository, err := atproto.ParseStarRecordKey(rkey)
206 if err != nil {
207 return err
208 }
209 return db.DeleteStar(p.db, did, ownerDID, repository)
210 }
211 return p.ProcessStar(ctx, did, data)
212
213 case atproto.RepoPageCollection:
214 return p.ProcessRepoPage(ctx, did, rkey, data, isDelete)
215
216 case atproto.SailorProfileCollection:
217 return p.ProcessSailorProfile(ctx, did, data, queryCaptainFn)
218
219 case atproto.StatsCollection:
220 return p.ProcessStats(ctx, did, data, isDelete)
221
222 case atproto.CaptainCollection:
223 if isDelete {
224 return db.DeleteCaptainRecord(p.db, did)
225 }
226 return p.ProcessCaptain(ctx, did, data)
227
228 case atproto.CrewCollection:
229 if isDelete {
230 return db.DeleteCrewMemberByRkey(p.db, did, rkey)
231 }
232 return p.ProcessCrew(ctx, did, rkey, data)
233
234 default:
235 return nil // Unknown collection, ignore
236 }
237}
238
239// ProcessManifest processes a manifest record and stores it in the database
240// Returns the manifest ID for further processing (layers/references)
241func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) {
242 // Unmarshal manifest record
243 var manifestRecord atproto.ManifestRecord
244 if err := json.Unmarshal(recordData, &manifestRecord); err != nil {
245 return 0, fmt.Errorf("failed to unmarshal manifest: %w", err)
246 }
247 // Detect manifest type
248 isManifestList := len(manifestRecord.Manifests) > 0
249
250 // Extract hold DID from manifest (with fallback for legacy manifests)
251 // New manifests use holdDid field (DID format)
252 // Old manifests use holdEndpoint field (URL format) - convert to DID
253 holdDID := manifestRecord.HoldDID
254 if holdDID == "" && manifestRecord.HoldEndpoint != "" {
255 // Legacy manifest - convert URL to DID
256 holdDID = atproto.ResolveHoldDIDFromURL(manifestRecord.HoldEndpoint)
257 }
258
259 // Detect artifact type from config media type
260 artifactType := "container-image"
261 if !isManifestList && manifestRecord.Config != nil {
262 artifactType = db.GetArtifactType(manifestRecord.Config.MediaType)
263 }
264
265 // Prepare manifest for insertion (WITHOUT annotation fields)
266 manifest := &db.Manifest{
267 DID: did,
268 Repository: manifestRecord.Repository,
269 Digest: manifestRecord.Digest,
270 MediaType: manifestRecord.MediaType,
271 SchemaVersion: manifestRecord.SchemaVersion,
272 HoldEndpoint: holdDID,
273 ArtifactType: artifactType,
274 CreatedAt: manifestRecord.CreatedAt,
275 // Annotations removed - stored separately in repository_annotations table
276 }
277
278 // Set config fields only for image manifests (not manifest lists)
279 if !isManifestList && manifestRecord.Config != nil {
280 manifest.ConfigDigest = manifestRecord.Config.Digest
281 manifest.ConfigSize = manifestRecord.Config.Size
282 }
283
284 // Insert manifest
285 manifestID, err := db.InsertManifest(p.db, manifest)
286 if err != nil {
287 // For backfill: if manifest already exists, get its ID
288 if strings.Contains(err.Error(), "UNIQUE constraint failed") {
289 var existingID int64
290 err := p.db.QueryRow(`
291 SELECT id FROM manifests
292 WHERE did = ? AND repository = ? AND digest = ?
293 `, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID)
294
295 if err != nil {
296 return 0, fmt.Errorf("failed to get existing manifest ID: %w", err)
297 }
298 manifestID = existingID
299 } else {
300 return 0, fmt.Errorf("failed to insert manifest: %w", err)
301 }
302 }
303
304 // Update repository annotations ONLY if manifest has at least one non-empty annotation
305 if manifestRecord.Annotations != nil {
306 hasData := false
307 for _, value := range manifestRecord.Annotations {
308 if value != "" {
309 hasData = true
310 break
311 }
312 }
313
314 if hasData {
315 // Replace all annotations for this repository
316 err = db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations)
317 if err != nil {
318 return 0, fmt.Errorf("failed to upsert annotations: %w", err)
319 }
320 }
321 }
322
323 // Insert manifest references or layers
324 if isManifestList {
325 // Insert manifest references (for manifest lists/indexes)
326 for i, ref := range manifestRecord.Manifests {
327 platformArch := ""
328 platformOS := ""
329 platformVariant := ""
330 platformOSVersion := ""
331
332 if ref.Platform != nil {
333 platformArch = ref.Platform.Architecture
334 platformOS = ref.Platform.OS
335 platformVariant = ref.Platform.Variant
336 platformOSVersion = ref.Platform.OSVersion
337 }
338
339 // Detect attestation manifests from annotations
340 isAttestation := false
341 if ref.Annotations != nil {
342 if refType, ok := ref.Annotations["vnd.docker.reference.type"]; ok {
343 isAttestation = refType == "attestation-manifest"
344 }
345 }
346
347 if err := db.InsertManifestReference(p.db, &db.ManifestReference{
348 ManifestID: manifestID,
349 Digest: ref.Digest,
350 MediaType: ref.MediaType,
351 Size: ref.Size,
352 PlatformArchitecture: platformArch,
353 PlatformOS: platformOS,
354 PlatformVariant: platformVariant,
355 PlatformOSVersion: platformOSVersion,
356 IsAttestation: isAttestation,
357 ReferenceIndex: i,
358 }); err != nil {
359 // Continue on error - reference might already exist
360 continue
361 }
362 }
363 } else {
364 // Insert layers (for image manifests)
365 for i, layer := range manifestRecord.Layers {
366 if err := db.InsertLayer(p.db, &db.Layer{
367 ManifestID: manifestID,
368 Digest: layer.Digest,
369 MediaType: layer.MediaType,
370 Size: layer.Size,
371 LayerIndex: i,
372 }); err != nil {
373 // Continue on error - layer might already exist
374 continue
375 }
376 }
377 }
378
379 return manifestID, nil
380}
381
382// ProcessTag processes a tag record and stores it in the database
383func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error {
384 // Unmarshal tag record
385 var tagRecord atproto.TagRecord
386 if err := json.Unmarshal(recordData, &tagRecord); err != nil {
387 return fmt.Errorf("failed to unmarshal tag: %w", err)
388 }
389 // Extract digest from tag record (tries manifest field first, falls back to manifestDigest)
390 manifestDigest, err := tagRecord.GetManifestDigest()
391 if err != nil {
392 return fmt.Errorf("failed to get manifest digest from tag record: %w", err)
393 }
394
395 // Insert or update tag
396 return db.UpsertTag(p.db, &db.Tag{
397 DID: did,
398 Repository: tagRecord.Repository,
399 Tag: tagRecord.Tag,
400 Digest: manifestDigest,
401 CreatedAt: tagRecord.UpdatedAt,
402 })
403}
404
405// ProcessStar processes a star record and stores it in the database
406func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error {
407 // Unmarshal star record (handles both old object and new AT URI subject formats)
408 var starRecord atproto.StarRecord
409 if err := json.Unmarshal(recordData, &starRecord); err != nil {
410 return fmt.Errorf("failed to unmarshal star: %w", err)
411 }
412
413 // Extract owner DID and repository from subject AT URI
414 ownerDID, repository, err := starRecord.GetSubjectDIDAndRepository()
415 if err != nil {
416 return fmt.Errorf("failed to parse star subject: %w", err)
417 }
418
419 // Ensure the starred repository's owner exists in the users table
420 // (the starrer is already ensured by ProcessRecord, but the owner
421 // may not have been processed yet during backfill or live events)
422 if err := p.EnsureUser(ctx, ownerDID); err != nil {
423 return fmt.Errorf("failed to ensure star subject user: %w", err)
424 }
425
426 // Upsert the star record (idempotent - won't duplicate)
427 // The DID here is the starrer (user who starred)
428 // Star count will be calculated on demand from the stars table
429 return db.UpsertStar(p.db, did, ownerDID, repository, starRecord.CreatedAt)
430}
431
432// ProcessSailorProfile processes a sailor profile record
433// This is primarily used by backfill to cache captain records for holds
434func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error {
435 // Unmarshal sailor profile record
436 var profileRecord atproto.SailorProfileRecord
437 if err := json.Unmarshal(recordData, &profileRecord); err != nil {
438 return fmt.Errorf("failed to unmarshal sailor profile: %w", err)
439 }
440
441 // Skip if no default hold set
442 if profileRecord.DefaultHold == "" {
443 return nil
444 }
445
446 // Convert hold URL/DID to canonical DID
447 holdDID := atproto.ResolveHoldDIDFromURL(profileRecord.DefaultHold)
448 if holdDID == "" {
449 slog.Warn("Invalid hold reference in profile", "component", "processor", "did", did, "default_hold", profileRecord.DefaultHold)
450 return nil
451 }
452
453 // Query and cache the captain record using provided function
454 // This allows backfill-specific logic (retries, test mode handling) without duplicating it here
455 if queryCaptainFn != nil {
456 return queryCaptainFn(ctx, holdDID)
457 }
458
459 return nil
460}
461
462// ProcessRepoPage processes a repository page record
463// This is called when Jetstream receives a repo page create/update event
464func (p *Processor) ProcessRepoPage(ctx context.Context, did string, rkey string, recordData []byte, isDelete bool) error {
465 if isDelete {
466 // Delete the repo page from our cache
467 return db.DeleteRepoPage(p.db, did, rkey)
468 }
469
470 // Unmarshal repo page record
471 var pageRecord atproto.RepoPageRecord
472 if err := json.Unmarshal(recordData, &pageRecord); err != nil {
473 return fmt.Errorf("failed to unmarshal repo page: %w", err)
474 }
475
476 // Extract avatar CID if present
477 avatarCID := ""
478 if pageRecord.Avatar != nil && pageRecord.Avatar.Ref.Link != "" {
479 avatarCID = pageRecord.Avatar.Ref.Link
480 }
481
482 // Upsert to database
483 return db.UpsertRepoPage(p.db, did, pageRecord.Repository, pageRecord.Description, avatarCID, pageRecord.CreatedAt, pageRecord.UpdatedAt)
484}
485
486// ProcessIdentity handles identity change events (handle updates)
487// This is called when Jetstream receives an identity event indicating a handle change.
488// The identity cache is invalidated to ensure the next lookup uses the new handle,
489// and the database is updated to reflect the change in the UI.
490//
491// Only processes events for users who already exist in our database (have ATCR activity).
492func (p *Processor) ProcessIdentity(ctx context.Context, did string, newHandle string) error {
493 // Check if user exists in our database - only update if they're an ATCR user
494 user, err := db.GetUserByDID(p.db, did)
495 if err != nil {
496 return fmt.Errorf("failed to check user existence: %w", err)
497 }
498
499 // Skip if user doesn't exist - they don't have any ATCR activity (manifests, profiles, etc.)
500 if user == nil {
501 return nil
502 }
503
504 // Update handle in database
505 if err := db.UpdateUserHandle(p.db, did, newHandle); err != nil {
506 slog.Warn("Failed to update user handle in database",
507 "component", "processor",
508 "did", did,
509 "handle", newHandle,
510 "error", err)
511 // Continue to invalidate cache even if DB update fails
512 }
513
514 // Invalidate cached identity data to force re-resolution on next lookup
515 if err := atproto.InvalidateIdentity(ctx, did); err != nil {
516 slog.Warn("Failed to invalidate identity cache",
517 "component", "processor",
518 "did", did,
519 "error", err)
520 return err
521 }
522
523 slog.Info("Processed identity change event",
524 "component", "processor",
525 "did", did,
526 "old_handle", user.Handle,
527 "new_handle", newHandle)
528
529 return nil
530}
531
532// ProcessProfileUpdate handles app.bsky.actor.profile updates for known ATCR users
533// This refreshes the cached avatar URL when a user changes their Bluesky profile picture
534func (p *Processor) ProcessProfileUpdate(ctx context.Context, did string, recordData []byte) error {
535 // Check if user exists in our database - only update if they're an ATCR user
536 user, err := db.GetUserByDID(p.db, did)
537 if err != nil {
538 return fmt.Errorf("failed to check user existence: %w", err)
539 }
540
541 // Skip if user doesn't exist - they don't have any ATCR activity
542 if user == nil {
543 return nil
544 }
545
546 // Parse the profile record to extract avatar
547 var profile struct {
548 Avatar *atproto.ATProtoBlobRef `json:"avatar"`
549 }
550 if err := json.Unmarshal(recordData, &profile); err != nil {
551 return fmt.Errorf("failed to unmarshal profile: %w", err)
552 }
553
554 // Build new avatar URL
555 avatarURL := ""
556 if profile.Avatar != nil && profile.Avatar.Ref.Link != "" {
557 avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
558 }
559
560 // Update if changed
561 if avatarURL != user.Avatar {
562 slog.Info("Updating avatar from profile change",
563 "component", "processor",
564 "did", did,
565 "old_avatar", user.Avatar,
566 "new_avatar", avatarURL)
567 return db.UpdateUserAvatar(p.db, did, avatarURL)
568 }
569
570 return nil
571}
572
573// RefreshUserAvatar fetches the user's current Bluesky profile and updates their cached avatar
574// This is called during backfill to ensure avatars stay fresh for existing users
575func (p *Processor) RefreshUserAvatar(ctx context.Context, did, pdsEndpoint string) error {
576 // Get user from database to compare avatar
577 user, err := db.GetUserByDID(p.db, did)
578 if err != nil || user == nil {
579 return nil // User doesn't exist, skip
580 }
581
582 // Fetch profile from PDS
583 client := atproto.NewClient(pdsEndpoint, "", "")
584 profile, err := client.GetProfileRecord(ctx, did)
585 if err != nil {
586 return fmt.Errorf("failed to fetch profile: %w", err)
587 }
588
589 // Build avatar URL
590 avatarURL := ""
591 if profile.Avatar != nil && profile.Avatar.Ref.Link != "" {
592 avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
593 }
594
595 // Update if changed
596 if avatarURL != user.Avatar {
597 slog.Info("Backfill refreshing avatar",
598 "component", "processor",
599 "did", did,
600 "old_avatar", user.Avatar,
601 "new_avatar", avatarURL)
602 return db.UpdateUserAvatar(p.db, did, avatarURL)
603 }
604
605 return nil
606}
607
608// ProcessStats handles stats record events from hold PDSes
609// This is called when Jetstream receives a stats create/update/delete event from a hold
610// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownerDID + repository
611func (p *Processor) ProcessStats(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error {
612 // Skip if no stats cache configured
613 if p.statsCache == nil {
614 return nil
615 }
616
617 // Unmarshal stats record
618 var statsRecord atproto.StatsRecord
619 if err := json.Unmarshal(recordData, &statsRecord); err != nil {
620 return fmt.Errorf("failed to unmarshal stats record: %w", err)
621 }
622
623 if isDelete {
624 // Delete from in-memory cache
625 p.statsCache.Delete(holdDID, statsRecord.OwnerDID, statsRecord.Repository)
626 } else {
627 // Parse timestamps
628 var lastPull, lastPush *time.Time
629 if statsRecord.LastPull != "" {
630 t, err := time.Parse(time.RFC3339, statsRecord.LastPull)
631 if err == nil {
632 lastPull = &t
633 }
634 }
635 if statsRecord.LastPush != "" {
636 t, err := time.Parse(time.RFC3339, statsRecord.LastPush)
637 if err == nil {
638 lastPush = &t
639 }
640 }
641
642 // Update in-memory cache
643 p.statsCache.Update(holdDID, statsRecord.OwnerDID, statsRecord.Repository,
644 statsRecord.PullCount, statsRecord.PushCount, lastPull, lastPush)
645 }
646
647 // Get aggregated stats across all holds
648 totalPull, totalPush, latestPull, latestPush := p.statsCache.GetAggregated(
649 statsRecord.OwnerDID, statsRecord.Repository)
650
651 // Upsert aggregated stats to repository_stats
652 return db.UpsertRepositoryStats(p.db, &db.RepositoryStats{
653 DID: statsRecord.OwnerDID,
654 Repository: statsRecord.Repository,
655 PullCount: int(totalPull),
656 PushCount: int(totalPush),
657 LastPull: latestPull,
658 LastPush: latestPush,
659 })
660}
661
662// ProcessCaptain handles captain record events from hold PDSes
663// This is called when Jetstream receives a captain create/update/delete event from a hold
664// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownership info
665func (p *Processor) ProcessCaptain(ctx context.Context, holdDID string, recordData []byte) error {
666 // Unmarshal captain record
667 var captainRecord atproto.CaptainRecord
668 if err := json.Unmarshal(recordData, &captainRecord); err != nil {
669 return fmt.Errorf("failed to unmarshal captain record: %w", err)
670 }
671
672 // Convert to db struct and upsert
673 record := &db.HoldCaptainRecord{
674 HoldDID: holdDID,
675 OwnerDID: captainRecord.Owner,
676 Public: captainRecord.Public,
677 AllowAllCrew: captainRecord.AllowAllCrew,
678 DeployedAt: captainRecord.DeployedAt,
679 Region: captainRecord.Region,
680 UpdatedAt: time.Now(),
681 }
682
683 if err := db.UpsertCaptainRecord(p.db, record); err != nil {
684 return fmt.Errorf("failed to upsert captain record: %w", err)
685 }
686
687 slog.Info("Processed captain record",
688 "component", "processor",
689 "hold_did", holdDID,
690 "owner_did", captainRecord.Owner,
691 "public", captainRecord.Public,
692 "allow_all_crew", captainRecord.AllowAllCrew)
693
694 return nil
695}
696
697// ProcessCrew handles crew record events from hold PDSes
698// This is called when Jetstream receives a crew create/update/delete event from a hold
699// The holdDID is the DID of the hold PDS (event.DID), and the record contains member info
700func (p *Processor) ProcessCrew(ctx context.Context, holdDID string, rkey string, recordData []byte) error {
701 // Unmarshal crew record
702 var crewRecord atproto.CrewRecord
703 if err := json.Unmarshal(recordData, &crewRecord); err != nil {
704 return fmt.Errorf("failed to unmarshal crew record: %w", err)
705 }
706
707 // Marshal permissions to JSON string
708 permissionsJSON := ""
709 if len(crewRecord.Permissions) > 0 {
710 if jsonBytes, err := json.Marshal(crewRecord.Permissions); err == nil {
711 permissionsJSON = string(jsonBytes)
712 }
713 }
714
715 // Convert to db struct and upsert
716 member := &db.CrewMember{
717 HoldDID: holdDID,
718 MemberDID: crewRecord.Member,
719 Rkey: rkey,
720 Role: crewRecord.Role,
721 Permissions: permissionsJSON,
722 Tier: crewRecord.Tier,
723 AddedAt: crewRecord.AddedAt,
724 }
725
726 if err := db.UpsertCrewMember(p.db, member); err != nil {
727 return fmt.Errorf("failed to upsert crew member: %w", err)
728 }
729
730 slog.Debug("Processed crew record",
731 "component", "processor",
732 "hold_did", holdDID,
733 "member_did", crewRecord.Member,
734 "role", crewRecord.Role,
735 "permissions", crewRecord.Permissions)
736
737 return nil
738}
739
740// ProcessAccount handles account status events (deactivation/deletion/etc)
741// This is called when Jetstream receives an account event indicating status changes.
742//
743// Status handling:
744// - "deleted": Account permanently deleted - remove all cached data
745// - "deactivated": Could be PDS migration or permanent - invalidate cache only
746// - "takendown": Moderation action - invalidate cache only
747// - Other: Ignore
748//
749// For "deactivated", we don't delete data because it's ambiguous:
750// - Could be permanent deactivation (user deleted account)
751// - Could be PDS migration (account moves to new PDS)
752// Cache invalidation forces re-resolution on next lookup.
753//
754// Only processes events for users who already exist in our database (have ATCR activity).
755func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error {
756 // Skip active accounts or unknown statuses
757 if active {
758 return nil
759 }
760
761 // Check if user exists in our database - only process if they're an ATCR user
762 user, err := db.GetUserByDID(p.db, did)
763 if err != nil {
764 return fmt.Errorf("failed to check user existence: %w", err)
765 }
766
767 // Skip if user doesn't exist - they don't have any ATCR activity
768 if user == nil {
769 return nil
770 }
771
772 switch status {
773 case "deleted":
774 // Account permanently deleted - remove all cached data
775 if err := db.DeleteUserData(p.db, did); err != nil {
776 slog.Error("Failed to delete user data for deleted account",
777 "component", "processor",
778 "did", did,
779 "handle", user.Handle,
780 "error", err)
781 return err
782 }
783
784 // Also invalidate identity cache
785 _ = atproto.InvalidateIdentity(ctx, did)
786
787 slog.Info("Deleted user data for deleted account",
788 "component", "processor",
789 "did", did,
790 "handle", user.Handle)
791
792 case "deactivated", "takendown":
793 // Ambiguous status - invalidate cache but keep data
794 // For deactivated: could be PDS migration, will resolve on next lookup
795 // For takendown: moderation action, keep data in case of appeal
796 if err := atproto.InvalidateIdentity(ctx, did); err != nil {
797 slog.Warn("Failed to invalidate identity cache",
798 "component", "processor",
799 "did", did,
800 "status", status,
801 "error", err)
802 return err
803 }
804
805 slog.Info("Processed account status event - cache invalidated",
806 "component", "processor",
807 "did", did,
808 "handle", user.Handle,
809 "status", status)
810
811 default:
812 // Unknown status - ignore
813 slog.Debug("Ignoring unknown account status",
814 "component", "processor",
815 "did", did,
816 "status", status)
817 }
818
819 return nil
820}