···120120 for _, repo := range result.Repos {
121121 recordCount, err := b.backfillRepo(ctx, repo.DID, collection)
122122 if err != nil {
123123- // RepoNotFound means account was deleted/deactivated
124124- // Clean up our cached data since the source is gone
125125- if strings.Contains(err.Error(), "RepoNotFound") {
123123+ // Account may be deleted/deactivated/migrated - clean up our cached data
124124+ errStr := err.Error()
125125+ if strings.Contains(errStr, "RepoNotFound") ||
126126+ strings.Contains(errStr, "Could not find repo") ||
127127+ strings.Contains(errStr, "status 400") ||
128128+ strings.Contains(errStr, "status 404") {
126129 if delErr := db.DeleteUserData(b.db, repo.DID); delErr != nil {
127130 slog.Warn("Backfill failed to delete data for removed repo", "did", repo.DID, "error", delErr)
128131 } else {
···156159157160// backfillRepo backfills all records for a single repo/DID
158161func (b *BackfillWorker) backfillRepo(ctx context.Context, did, collection string) (int, error) {
159159- // Ensure user exists in database and get their PDS endpoint
160160- if err := b.processor.EnsureUser(ctx, did); err != nil {
161161- return 0, fmt.Errorf("failed to ensure user: %w", err)
162162- }
163163-164162 // Resolve DID to get user's PDS endpoint
165163 pdsEndpoint, err := atproto.ResolveDIDToPDS(ctx, did)
166164 if err != nil {
···304302 return nil
305303}
306304307307-// processRecord processes a single record and stores it in the database
305305+// processRecord processes a single record using the unified ProcessRecord method.
306306+// This ensures consistent handling (validation, user creation) between Worker and Backfill.
308307func (b *BackfillWorker) processRecord(ctx context.Context, did, collection string, record *atproto.Record) error {
309309- switch collection {
310310- case atproto.ManifestCollection:
311311- _, err := b.processor.ProcessManifest(context.Background(), did, record.Value)
312312- return err
313313- case atproto.TagCollection:
314314- return b.processor.ProcessTag(context.Background(), did, record.Value)
315315- case atproto.StarCollection:
316316- return b.processor.ProcessStar(context.Background(), did, record.Value)
317317- case atproto.SailorProfileCollection:
318318- return b.processor.ProcessSailorProfile(ctx, did, record.Value, b.queryCaptainRecordWrapper)
319319- case atproto.RepoPageCollection:
320320- // rkey is extracted from the record URI, but for repo pages we use Repository field
321321- return b.processor.ProcessRepoPage(ctx, did, record.URI, record.Value, false)
322322- case atproto.StatsCollection:
323323- // Stats are stored in hold PDSes, not user PDSes
324324- // 'did' here is the hold's DID (e.g., did:web:hold01.atcr.io)
325325- return b.processor.ProcessStats(ctx, did, record.Value, false)
326326- case atproto.CaptainCollection:
327327- // Captain records are stored in hold PDSes
328328- // 'did' here is the hold's DID (e.g., did:web:hold01.atcr.io)
329329- return b.processor.ProcessCaptain(ctx, did, record.Value)
330330- case atproto.CrewCollection:
331331- // Crew records are stored in hold PDSes
332332- // 'did' here is the hold's DID, rkey is derived from member DID
333333- // Extract rkey from record URI (at://did/collection/rkey)
334334- rkey := extractRkeyFromURI(record.URI)
335335- return b.processor.ProcessCrew(ctx, did, rkey, record.Value)
336336- default:
337337- return fmt.Errorf("unsupported collection: %s", collection)
308308+ rkey := extractRkeyFromURI(record.URI)
309309+310310+ // For sailor profile collection, we need to pass the queryCaptainFn
311311+ // Other collections pass nil
312312+ var queryCaptainFn func(context.Context, string) error
313313+ if collection == atproto.SailorProfileCollection {
314314+ queryCaptainFn = b.queryCaptainRecordWrapper
338315 }
316316+317317+ return b.processor.ProcessRecord(ctx, did, collection, rkey, record.Value, false, queryCaptainFn)
339318}
340319341320// queryCaptainRecordWrapper wraps queryCaptainRecord with backfill-specific logic
+138
pkg/appview/jetstream/processor.go
···11111212 "atcr.io/pkg/appview/db"
1313 "atcr.io/pkg/atproto"
1414+ "github.com/bluesky-social/indigo/atproto/identity"
1515+ "github.com/bluesky-social/indigo/atproto/lexicon"
1416)
15171618// Processor handles shared database operations for both Worker (live) and Backfill (sync)
···2022 userCache *UserCache // Optional - enabled for Worker, disabled for Backfill
2123 statsCache *StatsCache // In-memory cache for per-hold stats aggregation
2224 useCache bool
2525+ catalog *lexicon.ResolvingCatalog // For debug logging of validation failures
2326}
24272528// NewProcessor creates a new shared processor
2629// useCache: true for Worker (live streaming), false for Backfill (batch processing)
2730// statsCache: shared stats cache for aggregating across holds (nil to skip stats processing)
2831func NewProcessor(database *sql.DB, useCache bool, statsCache *StatsCache) *Processor {
3232+ // Create lexicon catalog for debug validation logging
3333+ dir := identity.DefaultDirectory()
3434+ catalog := lexicon.NewResolvingCatalog()
3535+ catalog.Directory = dir
3636+2937 p := &Processor{
3038 db: database,
3139 useCache: useCache,
3240 statsCache: statsCache,
4141+ catalog: &catalog,
3342 }
34433544 if useCache {
···97106 return db.UpsertUser(p.db, user)
98107 }
99108 return db.UpsertUserIgnoreAvatar(p.db, user)
109109+}
110110+111111+// ValidateRecord performs validation on records.
112112+// - Full lexicon validation is logged for debugging but does NOT block ingestion
113113+// - Targeted validation (captain/crew DID checks) DOES block bogus records
114114+func (p *Processor) ValidateRecord(ctx context.Context, collection string, data []byte) error {
115115+ var recordData map[string]any
116116+ if err := json.Unmarshal(data, &recordData); err != nil {
117117+ return fmt.Errorf("invalid JSON: %w", err)
118118+ }
119119+120120+ // Debug: Full lexicon validation (log only, don't block)
121121+ if p.catalog != nil {
122122+ if err := lexicon.ValidateRecord(p.catalog, recordData, collection, 0); err != nil {
123123+ slog.Debug("Record failed full lexicon validation (ingesting anyway)",
124124+ "component", "processor",
125125+ "collection", collection,
126126+ "error", err)
127127+ }
128128+ }
129129+130130+ // Targeted validation for collections that had bogus data issues
131131+ // These DO block ingestion
132132+ switch collection {
133133+ case atproto.CaptainCollection:
134134+ // Captain must have non-empty owner DID
135135+ owner, _ := recordData["owner"].(string)
136136+ if owner == "" || !strings.HasPrefix(owner, "did:") {
137137+ return fmt.Errorf("captain record missing or invalid owner DID")
138138+ }
139139+140140+ case atproto.CrewCollection:
141141+ // Crew must have non-empty member DID
142142+ member, _ := recordData["member"].(string)
143143+ if member == "" || !strings.HasPrefix(member, "did:") {
144144+ return fmt.Errorf("crew record missing or invalid member DID")
145145+ }
146146+ }
147147+148148+ return nil
149149+}
150150+151151+// ProcessRecord is the unified entry point for processing any ATCR record.
152152+// It handles:
153153+// 1. Schema validation against published lexicons
154154+// 2. User creation for user-activity collections
155155+// 3. Dispatch to the appropriate Process* method
156156+//
157157+// queryCaptainFn is optional - used by backfill for sailor profile processing
158158+func (p *Processor) ProcessRecord(ctx context.Context, did, collection, rkey string, data []byte, isDelete bool, queryCaptainFn func(context.Context, string) error) error {
159159+ // Skip validation for deletes (no record data)
160160+ if !isDelete && data != nil {
161161+ if err := p.ValidateRecord(ctx, collection, data); err != nil {
162162+ slog.Warn("Record failed schema validation, skipping",
163163+ "component", "processor",
164164+ "collection", collection,
165165+ "did", did,
166166+ "error", err)
167167+ return nil // Skip invalid records silently
168168+ }
169169+ }
170170+171171+ // User-activity collections create/update user entries
172172+ // Skip for deletes - user should already exist, and we don't need to resolve identity
173173+ if !isDelete {
174174+ switch collection {
175175+ case atproto.ManifestCollection,
176176+ atproto.TagCollection,
177177+ atproto.StarCollection,
178178+ atproto.RepoPageCollection,
179179+ atproto.SailorProfileCollection:
180180+ if err := p.EnsureUser(ctx, did); err != nil {
181181+ return fmt.Errorf("failed to ensure user: %w", err)
182182+ }
183183+ // Hold collections (captain, crew, stats) - don't create user entries
184184+ // These are records FROM holds, not user activity
185185+ }
186186+ }
187187+188188+ // Dispatch to specific handler
189189+ switch collection {
190190+ case atproto.ManifestCollection:
191191+ if isDelete {
192192+ return db.DeleteManifest(p.db, did, "", rkey)
193193+ }
194194+ _, err := p.ProcessManifest(ctx, did, data)
195195+ return err
196196+197197+ case atproto.TagCollection:
198198+ if isDelete {
199199+ repo, tag := atproto.RKeyToRepositoryTag(rkey)
200200+ return db.DeleteTag(p.db, did, repo, tag)
201201+ }
202202+ return p.ProcessTag(ctx, did, data)
203203+204204+ case atproto.StarCollection:
205205+ if isDelete {
206206+ ownerDID, repository, err := atproto.ParseStarRecordKey(rkey)
207207+ if err != nil {
208208+ return err
209209+ }
210210+ return db.DeleteStar(p.db, did, ownerDID, repository)
211211+ }
212212+ return p.ProcessStar(ctx, did, data)
213213+214214+ case atproto.RepoPageCollection:
215215+ return p.ProcessRepoPage(ctx, did, rkey, data, isDelete)
216216+217217+ case atproto.SailorProfileCollection:
218218+ return p.ProcessSailorProfile(ctx, did, data, queryCaptainFn)
219219+220220+ case atproto.StatsCollection:
221221+ return p.ProcessStats(ctx, did, data, isDelete)
222222+223223+ case atproto.CaptainCollection:
224224+ if isDelete {
225225+ return db.DeleteCaptainRecord(p.db, did)
226226+ }
227227+ return p.ProcessCaptain(ctx, did, data)
228228+229229+ case atproto.CrewCollection:
230230+ if isDelete {
231231+ return db.DeleteCrewMemberByRkey(p.db, did, rkey)
232232+ }
233233+ return p.ProcessCrew(ctx, did, rkey, data)
234234+235235+ default:
236236+ return nil // Unknown collection, ignore
237237+ }
100238}
101239102240// ProcessManifest processes a manifest record and stores it in the database
+174
pkg/appview/jetstream/processor_test.go
···618618 }
619619}
620620621621+func TestProcessRecord_RoutesCorrectly(t *testing.T) {
622622+ db := setupTestDB(t)
623623+ defer db.Close()
624624+625625+ // Add missing tables for this test
626626+ _, err := db.Exec(`
627627+ CREATE TABLE repo_pages (
628628+ did TEXT NOT NULL,
629629+ repository TEXT NOT NULL,
630630+ description TEXT,
631631+ avatar_cid TEXT,
632632+ created_at TIMESTAMP NOT NULL,
633633+ updated_at TIMESTAMP NOT NULL,
634634+ PRIMARY KEY(did, repository)
635635+ );
636636+ CREATE TABLE hold_captain_records (
637637+ hold_did TEXT PRIMARY KEY,
638638+ owner_did TEXT NOT NULL,
639639+ public BOOLEAN NOT NULL,
640640+ allow_all_crew BOOLEAN NOT NULL,
641641+ deployed_at TEXT,
642642+ region TEXT,
643643+ updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
644644+ );
645645+ CREATE TABLE hold_crew_members (
646646+ hold_did TEXT NOT NULL,
647647+ member_did TEXT NOT NULL,
648648+ rkey TEXT NOT NULL,
649649+ role TEXT,
650650+ permissions TEXT,
651651+ tier TEXT,
652652+ added_at TEXT,
653653+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
654654+ updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
655655+ PRIMARY KEY (hold_did, member_did)
656656+ );
657657+ `)
658658+ if err != nil {
659659+ t.Fatalf("Failed to add tables: %v", err)
660660+ }
661661+662662+ processor := NewProcessor(db, false, nil)
663663+ ctx := context.Background()
664664+665665+ // Test 1: ProcessRecord routes manifest correctly
666666+ // Note: Schema validation may fail for io.atcr.manifest since we can't resolve the schema,
667667+ // but this tests the routing logic
668668+ manifestRecord := map[string]any{
669669+ "$type": "io.atcr.manifest",
670670+ "repository": "test-app",
671671+ "digest": "sha256:route123",
672672+ "mediaType": "application/vnd.oci.image.manifest.v1+json",
673673+ "schemaVersion": 2,
674674+ "holdDid": "did:web:hold01.atcr.io",
675675+ "createdAt": time.Now().Format(time.RFC3339),
676676+ }
677677+ recordBytes, _ := json.Marshal(manifestRecord)
678678+679679+ // Note: ProcessRecord will skip validation if lexicon can't be resolved (expected in tests)
680680+ // and will skip EnsureUser since we don't have a real PDS to resolve
681681+ // Just verify the record is processed without panic
682682+ err = processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "route123", recordBytes, false, nil)
683683+ // Error expected since we can't resolve identity - that's fine for this test
684684+ if err != nil {
685685+ t.Logf("Expected error (can't resolve identity): %v", err)
686686+ }
687687+688688+ // Test 2: ProcessRecord handles captain record without creating user
689689+ captainRecord := map[string]any{
690690+ "$type": "io.atcr.hold.captain",
691691+ "owner": "did:plc:owner123",
692692+ "public": true,
693693+ "allowAllCrew": false,
694694+ "enableBlueskyPosts": false,
695695+ "deployedAt": time.Now().Format(time.RFC3339),
696696+ }
697697+ captainBytes, _ := json.Marshal(captainRecord)
698698+699699+ // This should NOT call EnsureUser (captain is a hold collection)
700700+ err = processor.ProcessRecord(ctx, "did:web:hold.example.com", atproto.CaptainCollection, "self", captainBytes, false, nil)
701701+ if err != nil {
702702+ t.Logf("Error processing captain (validation may fail in test): %v", err)
703703+ }
704704+705705+ // Verify no user was created for the hold DID
706706+ var userCount int
707707+ err = db.QueryRow(`SELECT COUNT(*) FROM users WHERE did = ?`, "did:web:hold.example.com").Scan(&userCount)
708708+ if err != nil {
709709+ t.Fatalf("Failed to query users: %v", err)
710710+ }
711711+ if userCount != 0 {
712712+ t.Error("Captain record processing should NOT create a user entry for holds")
713713+ }
714714+715715+ // Test 3: ProcessRecord handles delete operations
716716+ err = processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "sha256:todelete", nil, true, nil)
717717+ if err != nil {
718718+ t.Errorf("Delete should not error: %v", err)
719719+ }
720720+}
721721+722722+func TestProcessRecord_SkipsInvalidRecords(t *testing.T) {
723723+ db := setupTestDB(t)
724724+ defer db.Close()
725725+726726+ processor := NewProcessor(db, false, nil)
727727+ ctx := context.Background()
728728+729729+ // Test: Invalid JSON should be skipped silently (no error returned)
730730+ invalidJSON := []byte(`{invalid json}`)
731731+ err := processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "test", invalidJSON, false, nil)
732732+ // Should return nil (skipped silently) not an error
733733+ if err != nil {
734734+ t.Errorf("Invalid record should be skipped silently, got error: %v", err)
735735+ }
736736+}
737737+738738+func TestValidateRecord(t *testing.T) {
739739+ db := setupTestDB(t)
740740+ defer db.Close()
741741+742742+ processor := NewProcessor(db, false, nil)
743743+ ctx := context.Background()
744744+745745+ // Test 1: Manifest passes (no strict validation)
746746+ manifestJSON := []byte(`{"$type": "io.atcr.manifest", "repository": "test"}`)
747747+ err := processor.ValidateRecord(ctx, atproto.ManifestCollection, manifestJSON)
748748+ if err != nil {
749749+ t.Errorf("Manifest should pass validation: %v", err)
750750+ }
751751+752752+ // Test 2: Invalid JSON returns error
753753+ invalidJSON := []byte(`{invalid}`)
754754+ err = processor.ValidateRecord(ctx, atproto.ManifestCollection, invalidJSON)
755755+ if err == nil {
756756+ t.Error("Invalid JSON should return error")
757757+ }
758758+759759+ // Test 3: Captain with valid owner passes
760760+ captainValid := []byte(`{"owner": "did:plc:owner123", "public": true}`)
761761+ err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainValid)
762762+ if err != nil {
763763+ t.Errorf("Valid captain should pass: %v", err)
764764+ }
765765+766766+ // Test 4: Captain with empty owner is rejected
767767+ captainEmpty := []byte(`{"owner": "", "public": true}`)
768768+ err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainEmpty)
769769+ if err == nil {
770770+ t.Error("Captain with empty owner should be rejected")
771771+ }
772772+773773+ // Test 5: Captain with invalid owner (not a DID) is rejected
774774+ captainInvalid := []byte(`{"owner": "notadid", "public": true}`)
775775+ err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainInvalid)
776776+ if err == nil {
777777+ t.Error("Captain with invalid owner should be rejected")
778778+ }
779779+780780+ // Test 6: Crew with valid member passes
781781+ crewValid := []byte(`{"member": "did:plc:member123", "role": "write"}`)
782782+ err = processor.ValidateRecord(ctx, atproto.CrewCollection, crewValid)
783783+ if err != nil {
784784+ t.Errorf("Valid crew should pass: %v", err)
785785+ }
786786+787787+ // Test 7: Crew with empty member is rejected
788788+ crewEmpty := []byte(`{"member": "", "role": "write"}`)
789789+ err = processor.ValidateRecord(ctx, atproto.CrewCollection, crewEmpty)
790790+ if err == nil {
791791+ t.Error("Crew with empty member should be rejected")
792792+ }
793793+}
794794+621795func TestProcessAccount(t *testing.T) {
622796 db := setupTestDB(t)
623797 defer db.Close()
+39-247
pkg/appview/jetstream/worker.go
···309309 w.debugCollectionCount++
310310 }
311311312312- // Process based on collection
313313- switch commit.Collection {
314314- case atproto.ManifestCollection:
315315- slog.Info("Jetstream processing manifest event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
316316- return w.processManifest(commit)
317317- case atproto.TagCollection:
318318- slog.Info("Jetstream processing tag event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
319319- return w.processTag(commit)
320320- case atproto.StarCollection:
321321- slog.Info("Jetstream processing star event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
322322- return w.processStar(commit)
323323- case atproto.RepoPageCollection:
324324- slog.Info("Jetstream processing repo page event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
325325- return w.processRepoPage(commit)
326326- case atproto.StatsCollection:
327327- slog.Info("Jetstream processing stats event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
328328- return w.processStats(commit)
329329- case atproto.CaptainCollection:
330330- slog.Info("Jetstream processing captain event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
331331- return w.processCaptain(commit)
332332- case atproto.CrewCollection:
333333- slog.Info("Jetstream processing crew event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
334334- return w.processCrew(commit)
335335- default:
336336- // Ignore other collections
337337- return nil
312312+ // Check if this is an ATCR collection we care about
313313+ if !isATCRCollection(commit.Collection) {
314314+ return nil // Ignore non-ATCR collections
338315 }
339316317317+ slog.Info("Jetstream processing event",
318318+ "collection", commit.Collection,
319319+ "did", commit.DID,
320320+ "operation", commit.Operation,
321321+ "rkey", commit.RKey)
322322+323323+ // Marshal record to bytes for unified processing
324324+ var recordBytes []byte
325325+ if commit.Record != nil {
326326+ var err error
327327+ recordBytes, err = json.Marshal(commit.Record)
328328+ if err != nil {
329329+ return fmt.Errorf("failed to marshal record: %w", err)
330330+ }
331331+ }
332332+333333+ isDelete := commit.Operation == "delete"
334334+ return w.processor.ProcessRecord(context.Background(), commit.DID, commit.Collection, commit.RKey, recordBytes, isDelete, nil)
335335+340336 case "identity":
341337 if event.Identity == nil {
342338 return nil
···355351 }
356352}
357353358358-// processManifest processes a manifest commit event
359359-func (w *Worker) processManifest(commit *CommitEvent) error {
360360- // Resolve and upsert user with handle/PDS endpoint
361361- if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil {
362362- return fmt.Errorf("failed to ensure user: %w", err)
363363- }
364364-365365- if commit.Operation == "delete" {
366366- // Delete manifest - rkey is just the digest, repository is not encoded
367367- digest := commit.RKey
368368- if err := db.DeleteManifest(w.db, commit.DID, "", digest); err != nil {
369369- return err
370370- }
371371- // Clean up any orphaned tags pointing to this manifest
372372- return db.CleanupOrphanedTags(w.db, commit.DID)
373373- }
374374-375375- // Parse manifest record
376376- if commit.Record == nil {
377377- return nil // No record data, can't process
378378- }
379379-380380- // Marshal map to bytes for processing
381381- recordBytes, err := json.Marshal(commit.Record)
382382- if err != nil {
383383- return fmt.Errorf("failed to marshal record: %w", err)
384384- }
385385-386386- // Use shared processor for DB operations
387387- _, err = w.processor.ProcessManifest(context.Background(), commit.DID, recordBytes)
388388- return err
389389-}
390390-391391-// processTag processes a tag commit event
392392-func (w *Worker) processTag(commit *CommitEvent) error {
393393- // Resolve and upsert user with handle/PDS endpoint
394394- if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil {
395395- return fmt.Errorf("failed to ensure user: %w", err)
396396- }
397397-398398- if commit.Operation == "delete" {
399399- // Delete tag - decode rkey back to repository and tag
400400- repo, tag := atproto.RKeyToRepositoryTag(commit.RKey)
401401- slog.Info("Jetstream deleting tag", "did", commit.DID, "repository", repo, "tag", tag, "rkey", commit.RKey)
402402- if err := db.DeleteTag(w.db, commit.DID, repo, tag); err != nil {
403403- slog.Error("Jetstream ERROR deleting tag", "error", err)
404404- return err
405405- }
406406- slog.Info("Jetstream successfully deleted tag", "did", commit.DID, "repository", repo, "tag", tag)
407407- return nil
408408- }
409409-410410- // Parse tag record
411411- if commit.Record == nil {
412412- return nil
413413- }
414414-415415- // Marshal map to bytes for processing
416416- recordBytes, err := json.Marshal(commit.Record)
417417- if err != nil {
418418- return fmt.Errorf("failed to marshal record: %w", err)
419419- }
420420-421421- // Use shared processor for DB operations
422422- return w.processor.ProcessTag(context.Background(), commit.DID, recordBytes)
423423-}
424424-425425-// processStar processes a star commit event
426426-func (w *Worker) processStar(commit *CommitEvent) error {
427427- // Resolve and upsert the user who starred (starrer)
428428- if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil {
429429- return fmt.Errorf("failed to ensure user: %w", err)
430430- }
431431-432432- if commit.Operation == "delete" {
433433- // Unstar - parse the rkey to get the subject (owner DID and repository)
434434- // Delete events don't include the full record, but the rkey contains the info we need
435435- ownerDID, repository, err := atproto.ParseStarRecordKey(commit.RKey)
436436- if err != nil {
437437- return fmt.Errorf("failed to parse star rkey: %w", err)
438438- }
439439-440440- // Delete the star record
441441- return db.DeleteStar(w.db, commit.DID, ownerDID, repository)
442442- }
443443-444444- // Parse star record
445445- if commit.Record == nil {
446446- return nil
447447- }
448448-449449- // Marshal map to bytes for processing
450450- recordBytes, err := json.Marshal(commit.Record)
451451- if err != nil {
452452- return fmt.Errorf("failed to marshal record: %w", err)
453453- }
454454-455455- // Use shared processor for DB operations
456456- return w.processor.ProcessStar(context.Background(), commit.DID, recordBytes)
457457-}
458458-459459-// processRepoPage processes a repo page commit event
460460-func (w *Worker) processRepoPage(commit *CommitEvent) error {
461461- // Resolve and upsert user with handle/PDS endpoint
462462- if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil {
463463- return fmt.Errorf("failed to ensure user: %w", err)
464464- }
465465-466466- isDelete := commit.Operation == "delete"
467467-468468- if isDelete {
469469- // Delete - rkey is the repository name
470470- slog.Info("Jetstream deleting repo page", "did", commit.DID, "repository", commit.RKey)
471471- if err := w.processor.ProcessRepoPage(context.Background(), commit.DID, commit.RKey, nil, true); err != nil {
472472- slog.Error("Jetstream ERROR deleting repo page", "error", err)
473473- return err
474474- }
475475- slog.Info("Jetstream successfully deleted repo page", "did", commit.DID, "repository", commit.RKey)
476476- return nil
477477- }
478478-479479- // Parse repo page record
480480- if commit.Record == nil {
481481- return nil
482482- }
483483-484484- // Marshal map to bytes for processing
485485- recordBytes, err := json.Marshal(commit.Record)
486486- if err != nil {
487487- return fmt.Errorf("failed to marshal record: %w", err)
488488- }
489489-490490- // Use shared processor for DB operations
491491- return w.processor.ProcessRepoPage(context.Background(), commit.DID, commit.RKey, recordBytes, false)
492492-}
493493-494494-// processStats processes a stats commit event from a hold PDS
495495-func (w *Worker) processStats(commit *CommitEvent) error {
496496- isDelete := commit.Operation == "delete"
497497-498498- if isDelete {
499499- // For delete events, we need to parse the rkey to get ownerDID + repository
500500- // The rkey is deterministic: base32(sha256(ownerDID + "/" + repository)[:16])
501501- // Unfortunately, we can't reverse this - we need the record data
502502- // Delete events don't include record data, so we can't delete from cache
503503- // This is acceptable - stats will be refreshed on next update from hold
504504- slog.Debug("Jetstream ignoring stats delete event (cannot reverse rkey)", "did", commit.DID, "rkey", commit.RKey)
505505- return nil
506506- }
507507-508508- // Parse stats record
509509- if commit.Record == nil {
510510- return nil
511511- }
512512-513513- // Marshal map to bytes for processing
514514- recordBytes, err := json.Marshal(commit.Record)
515515- if err != nil {
516516- return fmt.Errorf("failed to marshal record: %w", err)
517517- }
518518-519519- // Use shared processor - commit.DID is the hold's DID
520520- return w.processor.ProcessStats(context.Background(), commit.DID, recordBytes, false)
521521-}
522522-523523-// processCaptain processes a captain record event from a hold's PDS
524524-func (w *Worker) processCaptain(commit *CommitEvent) error {
525525- holdDID := commit.DID // The repo DID IS the hold DID
526526-527527- if commit.Operation == "delete" {
528528- // Delete captain record - this cascades to crew members
529529- if err := db.DeleteCaptainRecord(w.db, holdDID); err != nil {
530530- return fmt.Errorf("failed to delete captain record: %w", err)
531531- }
532532- slog.Info("Deleted captain record for hold", "hold_did", holdDID)
533533- return nil
534534- }
535535-536536- // Parse captain record
537537- if commit.Record == nil {
538538- return nil
539539- }
540540-541541- // Marshal map to bytes for processing
542542- recordBytes, err := json.Marshal(commit.Record)
543543- if err != nil {
544544- return fmt.Errorf("failed to marshal captain record: %w", err)
545545- }
546546-547547- // Use shared processor
548548- return w.processor.ProcessCaptain(context.Background(), holdDID, recordBytes)
549549-}
550550-551551-// processCrew processes a crew record event from a hold's PDS
552552-func (w *Worker) processCrew(commit *CommitEvent) error {
553553- holdDID := commit.DID // The repo DID IS the hold DID
554554-555555- if commit.Operation == "delete" {
556556- // Delete crew member by rkey
557557- if err := db.DeleteCrewMemberByRkey(w.db, holdDID, commit.RKey); err != nil {
558558- return fmt.Errorf("failed to delete crew member: %w", err)
559559- }
560560- slog.Info("Deleted crew member from hold", "hold_did", holdDID, "rkey", commit.RKey)
561561- return nil
562562- }
563563-564564- // Parse crew record
565565- if commit.Record == nil {
566566- return nil
567567- }
568568-569569- // Marshal map to bytes for processing
570570- recordBytes, err := json.Marshal(commit.Record)
571571- if err != nil {
572572- return fmt.Errorf("failed to marshal crew record: %w", err)
573573- }
574574-575575- // Use shared processor - pass rkey for storage
576576- return w.processor.ProcessCrew(context.Background(), holdDID, commit.RKey, recordBytes)
577577-}
578578-579354// processIdentity processes an identity event (handle change)
580355func (w *Worker) processIdentity(event *JetstreamEvent) error {
581356 if event.Identity == nil {
···596371 account := event.Account
597372 // Process via shared processor (only ATCR users will be logged at Info level)
598373 return w.processor.ProcessAccount(context.Background(), account.DID, account.Active, account.Status)
374374+}
375375+376376+// isATCRCollection returns true if the collection is one we process
377377+func isATCRCollection(collection string) bool {
378378+ switch collection {
379379+ case atproto.ManifestCollection,
380380+ atproto.TagCollection,
381381+ atproto.StarCollection,
382382+ atproto.RepoPageCollection,
383383+ atproto.SailorProfileCollection,
384384+ atproto.StatsCollection,
385385+ atproto.CaptainCollection,
386386+ atproto.CrewCollection:
387387+ return true
388388+ default:
389389+ return false
390390+ }
599391}
600392601393// JetstreamEvent represents a Jetstream event