···11+package db
22+33+import (
44+ "context"
55+ "database/sql"
66+ "database/sql/driver"
77+ "errors"
88+ "strings"
99+)
1010+1111+// poisonedTxSubstrings are error-message substrings emitted when go-libsql or the
1212+// remote libsql server leaves a connection in a state that cannot safely be reused.
1313+// Most come from Bunny Database killing a transaction that exceeded its server-side
1414+// timeout; the follow-on COMMIT then sees the connection in a poisoned state.
1515+var poisonedTxSubstrings = []string{
1616+ "Transaction timed-out",
1717+ "no transaction is active",
1818+ "connection has reached an invalid state",
1919+ "invalid state, started with",
2020+}
2121+2222+// IsPoisonedTxErr reports whether err indicates the underlying connection is no
2323+// longer usable for further statements. Callers should evict the connection from
2424+// the pool when this returns true.
2525+func IsPoisonedTxErr(err error) bool {
2626+ if err == nil {
2727+ return false
2828+ }
2929+ msg := err.Error()
3030+ for _, s := range poisonedTxSubstrings {
3131+ if strings.Contains(msg, s) {
3232+ return true
3333+ }
3434+ }
3535+ return false
3636+}
3737+3838+// ExecResilient borrows a dedicated connection from db, runs fn against it, and
3939+// evicts the connection from the pool when fn returns a poisoned-transaction
4040+// error. The connection is always released via Close.
4141+//
4242+// Poison eviction works by returning driver.ErrBadConn from within conn.Raw:
4343+// database/sql treats that as a signal to discard the underlying driver conn
4444+// rather than returning it to the idle pool.
4545+//
4646+// ExecResilient does NOT retry. Callers wrap the call in their own retry policy
4747+// when that is desired (for example, a single retry on the live Jetstream path).
4848+func ExecResilient(ctx context.Context, db *sql.DB, fn func(*sql.Conn) error) error {
4949+ conn, err := db.Conn(ctx)
5050+ if err != nil {
5151+ return err
5252+ }
5353+ defer conn.Close()
5454+5555+ execErr := fn(conn)
5656+ if IsPoisonedTxErr(execErr) {
5757+ // Discard the underlying driver conn so it never serves another caller.
5858+ // The Raw callback's return value is what triggers eviction; we ignore
5959+ // any error from Raw itself.
6060+ _ = conn.Raw(func(any) error { return driver.ErrBadConn })
6161+ }
6262+ return execErr
6363+}
6464+6565+// ErrNoPoolConn is returned by ExecResilient when a connection cannot be
6666+// obtained from the pool (e.g. context cancelled). It wraps the underlying
6767+// pool error for callers that want to distinguish pool-exhaustion from
6868+// statement-level errors.
6969+var ErrNoPoolConn = errors.New("db: failed to acquire pool connection")
+28
pkg/appview/db/conn_test.go
···11+package db
22+33+import (
44+ "errors"
55+ "testing"
66+)
77+88+func TestIsPoisonedTxErr(t *testing.T) {
99+ cases := []struct {
1010+ name string
1111+ err error
1212+ want bool
1313+ }{
1414+ {"nil", nil, false},
1515+ {"unrelated", errors.New("disk full"), false},
1616+ {"bunny timeout", errors.New("Remote SQlite failure: `2:0:Transaction timed-out`"), true},
1717+ {"no active tx", errors.New("Remote SQlite failure: `3:1:cannot commit - no transaction is active`"), true},
1818+ {"init state", errors.New("error code = 2: Error executing statement: connection has reached an invalid state, started with Init"), true},
1919+ {"just invalid state", errors.New("generic failure: invalid state, started with Query"), true},
2020+ }
2121+ for _, c := range cases {
2222+ t.Run(c.name, func(t *testing.T) {
2323+ if got := IsPoisonedTxErr(c.err); got != c.want {
2424+ t.Errorf("IsPoisonedTxErr(%v) = %v, want %v", c.err, got, c.want)
2525+ }
2626+ })
2727+ }
2828+}
+33
pkg/appview/db/jetstream_cursor.go
···11+package db
22+33+import (
44+ "database/sql"
55+ "errors"
66+)
77+88+// GetJetstreamCursor returns the last persisted Jetstream cursor (time_us).
99+// Returns 0 when no cursor has been saved yet (e.g. fresh database).
1010+func GetJetstreamCursor(db DBTX) (int64, error) {
1111+ var cursor int64
1212+ err := db.QueryRow(`SELECT cursor FROM jetstream_cursor WHERE id = 1`).Scan(&cursor)
1313+ if errors.Is(err, sql.ErrNoRows) {
1414+ return 0, nil
1515+ }
1616+ if err != nil {
1717+ return 0, err
1818+ }
1919+ return cursor, nil
2020+}
2121+2222+// SaveJetstreamCursor writes the given cursor to the singleton jetstream_cursor row.
2323+// Idempotent — safe to call on every tick.
2424+func SaveJetstreamCursor(db DBTX, cursor int64) error {
2525+ _, err := db.Exec(`
2626+ INSERT INTO jetstream_cursor (id, cursor, updated_at)
2727+ VALUES (1, ?, CURRENT_TIMESTAMP)
2828+ ON CONFLICT(id) DO UPDATE SET
2929+ cursor = excluded.cursor,
3030+ updated_at = excluded.updated_at
3131+ `, cursor)
3232+ return err
3333+}
···11+description: Persist Jetstream cursor so reconnects resume from last processed event
22+query: |
33+ CREATE TABLE IF NOT EXISTS jetstream_cursor (
44+ id INTEGER PRIMARY KEY CHECK (id = 1),
55+ cursor INTEGER NOT NULL,
66+ updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
77+ );
+518
pkg/appview/jetstream/backfill_batch.go
···11+package jetstream
22+33+import (
44+ "context"
55+ "encoding/json"
66+ "fmt"
77+ "log/slog"
88+ "strings"
99+ "time"
1010+1111+ "atcr.io/pkg/appview/db"
1212+ "atcr.io/pkg/atproto"
1313+)
1414+1515+// batchManifests decodes all manifest records for a repo and writes them as
1616+// a small set of multi-row INSERTs: one per table (manifests, layers,
1717+// manifest_references, repository_annotations). This replaces the previous
1818+// per-record chunked-transaction loop, which exceeded Bunny Database's
1919+// remote transaction timeout once chunks grew large.
2020+//
2121+// Returns the number of manifest records that were successfully decoded and
2222+// included in the batch. Decode/validation failures are logged and skipped.
2323+func (b *BackfillWorker) batchManifests(ctx context.Context, did string, records []atproto.Record) (int, error) {
2424+ if len(records) == 0 {
2525+ return 0, nil
2626+ }
2727+2828+ type decoded struct {
2929+ manifestRecord atproto.ManifestRecord
3030+ manifest db.Manifest
3131+ }
3232+3333+ decodedRecords := make([]decoded, 0, len(records))
3434+ for i := range records {
3535+ r := &records[i]
3636+ var mr atproto.ManifestRecord
3737+ if err := json.Unmarshal(r.Value, &mr); err != nil {
3838+ slog.Warn("Backfill skipping invalid manifest record", "uri", r.URI, "error", err)
3939+ continue
4040+ }
4141+ if mr.Digest == "" || mr.Repository == "" {
4242+ slog.Warn("Backfill skipping manifest with missing fields", "uri", r.URI)
4343+ continue
4444+ }
4545+4646+ // Resolve holdDID the same way the single-record path does.
4747+ holdDID := mr.HoldDID
4848+ if holdDID == "" && mr.HoldEndpoint != "" {
4949+ if resolved, err := atproto.ResolveHoldDID(ctx, mr.HoldEndpoint); err == nil {
5050+ holdDID = resolved
5151+ }
5252+ }
5353+5454+ isList := len(mr.Manifests) > 0
5555+ artifactType := "container-image"
5656+ if !isList && mr.Config != nil {
5757+ artifactType = db.GetArtifactType(mr.Config.MediaType)
5858+ }
5959+6060+ m := db.Manifest{
6161+ DID: did,
6262+ Repository: mr.Repository,
6363+ Digest: mr.Digest,
6464+ MediaType: mr.MediaType,
6565+ SchemaVersion: mr.SchemaVersion,
6666+ HoldEndpoint: holdDID,
6767+ ArtifactType: artifactType,
6868+ CreatedAt: mr.CreatedAt,
6969+ }
7070+ if !isList && mr.Config != nil {
7171+ m.ConfigDigest = mr.Config.Digest
7272+ m.ConfigSize = mr.Config.Size
7373+ }
7474+ if mr.Subject != nil {
7575+ m.SubjectDigest = mr.Subject.Digest
7676+ }
7777+ decodedRecords = append(decodedRecords, decoded{mr, m})
7878+ }
7979+8080+ if len(decodedRecords) == 0 {
8181+ return 0, nil
8282+ }
8383+8484+ // Phase 1: upsert all manifests in one batch, fetch ids.
8585+ manifests := make([]db.Manifest, len(decodedRecords))
8686+ for i, d := range decodedRecords {
8787+ manifests[i] = d.manifest
8888+ }
8989+ ids, err := db.BatchInsertManifests(b.db, manifests)
9090+ if err != nil {
9191+ return 0, fmt.Errorf("batch insert manifests: %w", err)
9292+ }
9393+9494+ // Phase 2: derive layers, references, and annotations using the returned ids.
9595+ var (
9696+ layerRows []db.Layer
9797+ refRows []db.ManifestReference
9898+ )
9999+100100+ // For annotations, we keep only the newest manifest per (did, repo) with a
101101+ // non-empty annotation set. Matches reconcileAnnotations semantics at
102102+ // backfill.go:573.
103103+ type newest struct {
104104+ createdAt time.Time
105105+ annotations map[string]string
106106+ }
107107+ newestByRepo := make(map[string]newest)
108108+109109+ for _, d := range decodedRecords {
110110+ mid, ok := ids[db.ManifestKey(did, d.manifest.Repository, d.manifest.Digest)]
111111+ if !ok {
112112+ // BatchInsertManifests did not return an id for this row — either the
113113+ // row was constraint-rejected or the SELECT missed it. Skip its
114114+ // dependent rows rather than inserting with id 0.
115115+ slog.Warn("Backfill manifest missing id after batch insert",
116116+ "did", did, "repository", d.manifest.Repository, "digest", d.manifest.Digest)
117117+ continue
118118+ }
119119+120120+ if len(d.manifestRecord.Manifests) > 0 {
121121+ for i, ref := range d.manifestRecord.Manifests {
122122+ var pa, po, pv, pov string
123123+ if ref.Platform != nil {
124124+ pa = ref.Platform.Architecture
125125+ po = ref.Platform.OS
126126+ pv = ref.Platform.Variant
127127+ pov = ref.Platform.OSVersion
128128+ }
129129+ isAttestation := false
130130+ if refType, ok := ref.Annotations["vnd.docker.reference.type"]; ok {
131131+ isAttestation = refType == "attestation-manifest"
132132+ }
133133+ refRows = append(refRows, db.ManifestReference{
134134+ ManifestID: mid,
135135+ Digest: ref.Digest,
136136+ MediaType: ref.MediaType,
137137+ Size: ref.Size,
138138+ PlatformArchitecture: pa,
139139+ PlatformOS: po,
140140+ PlatformVariant: pv,
141141+ PlatformOSVersion: pov,
142142+ IsAttestation: isAttestation,
143143+ ReferenceIndex: i,
144144+ })
145145+ }
146146+ } else {
147147+ for i, layer := range d.manifestRecord.Layers {
148148+ layerRows = append(layerRows, db.Layer{
149149+ ManifestID: mid,
150150+ Digest: layer.Digest,
151151+ MediaType: layer.MediaType,
152152+ Size: layer.Size,
153153+ LayerIndex: i,
154154+ Annotations: layer.Annotations,
155155+ })
156156+ }
157157+ }
158158+159159+ if hasNonEmpty(d.manifestRecord.Annotations) {
160160+ key := d.manifest.Repository
161161+ prev, ok := newestByRepo[key]
162162+ if !ok || d.manifestRecord.CreatedAt.After(prev.createdAt) {
163163+ newestByRepo[key] = newest{d.manifestRecord.CreatedAt, d.manifestRecord.Annotations}
164164+ }
165165+ }
166166+ }
167167+168168+ if err := db.BatchInsertLayers(b.db, layerRows); err != nil {
169169+ return 0, err
170170+ }
171171+ if err := db.BatchInsertManifestReferences(b.db, refRows); err != nil {
172172+ return 0, err
173173+ }
174174+175175+ // Flatten annotations into AnnotationRows.
176176+ var annotationRows []db.AnnotationRow
177177+ for repo, n := range newestByRepo {
178178+ for k, v := range n.annotations {
179179+ if v == "" {
180180+ continue
181181+ }
182182+ annotationRows = append(annotationRows, db.AnnotationRow{
183183+ DID: did,
184184+ Repository: repo,
185185+ Key: k,
186186+ Value: v,
187187+ })
188188+ }
189189+ }
190190+ if err := db.BatchUpsertRepositoryAnnotations(b.db, annotationRows); err != nil {
191191+ return 0, err
192192+ }
193193+194194+ slog.Info("Backfill batch manifests",
195195+ "did", did,
196196+ "manifests", len(manifests),
197197+ "layers", len(layerRows),
198198+ "references", len(refRows),
199199+ "annotations", len(annotationRows))
200200+201201+ return len(decodedRecords), nil
202202+}
203203+204204+func hasNonEmpty(m map[string]string) bool {
205205+ for _, v := range m {
206206+ if v != "" {
207207+ return true
208208+ }
209209+ }
210210+ return false
211211+}
212212+213213+// batchTags decodes tag records and writes them in one multi-row upsert.
214214+func (b *BackfillWorker) batchTags(did string, records []atproto.Record) (int, error) {
215215+ tags := make([]db.Tag, 0, len(records))
216216+ for i := range records {
217217+ r := &records[i]
218218+ var tr atproto.TagRecord
219219+ if err := json.Unmarshal(r.Value, &tr); err != nil {
220220+ slog.Warn("Backfill skipping invalid tag record", "uri", r.URI, "error", err)
221221+ continue
222222+ }
223223+ digest, err := tr.GetManifestDigest()
224224+ if err != nil {
225225+ slog.Warn("Backfill skipping tag record without digest", "uri", r.URI, "error", err)
226226+ continue
227227+ }
228228+ if tr.Repository == "" || tr.Tag == "" {
229229+ continue
230230+ }
231231+ tags = append(tags, db.Tag{
232232+ DID: did,
233233+ Repository: tr.Repository,
234234+ Tag: tr.Tag,
235235+ Digest: digest,
236236+ CreatedAt: tr.UpdatedAt,
237237+ })
238238+ }
239239+ if err := db.BatchUpsertTags(b.db, tags); err != nil {
240240+ return 0, err
241241+ }
242242+ slog.Info("Backfill batch tags", "did", did, "rows", len(tags))
243243+ return len(tags), nil
244244+}
245245+246246+// batchStars decodes star records and writes them in one multi-row upsert.
247247+// Ensures star subject owners exist as users first (FK requirement).
248248+func (b *BackfillWorker) batchStars(ctx context.Context, did string, records []atproto.Record) (int, error) {
249249+ stars := make([]db.StarInput, 0, len(records))
250250+ ownerDIDs := make(map[string]struct{})
251251+252252+ for i := range records {
253253+ r := &records[i]
254254+ var sr atproto.StarRecord
255255+ if err := json.Unmarshal(r.Value, &sr); err != nil {
256256+ slog.Warn("Backfill skipping invalid star record", "uri", r.URI, "error", err)
257257+ continue
258258+ }
259259+ owner, repo, err := sr.GetSubjectDIDAndRepository()
260260+ if err != nil {
261261+ slog.Warn("Backfill skipping star with bad subject", "uri", r.URI, "error", err)
262262+ continue
263263+ }
264264+ ownerDIDs[owner] = struct{}{}
265265+ stars = append(stars, db.StarInput{
266266+ StarrerDID: did,
267267+ OwnerDID: owner,
268268+ Repository: repo,
269269+ CreatedAt: sr.CreatedAt,
270270+ })
271271+ }
272272+273273+ // Ensure every star subject has a users row (FK to users.did on stars).
274274+ // These calls are idempotent and cached, so repeated owners cost nothing.
275275+ for owner := range ownerDIDs {
276276+ if err := b.processor.EnsureUserExists(ctx, owner); err != nil {
277277+ slog.Warn("Backfill failed to ensure star subject user", "owner_did", owner, "error", err)
278278+ }
279279+ }
280280+281281+ if err := db.BatchUpsertStars(b.db, stars); err != nil {
282282+ return 0, err
283283+ }
284284+ slog.Info("Backfill batch stars", "did", did, "rows", len(stars))
285285+ return len(stars), nil
286286+}
287287+288288+// batchRepoPages decodes repo page records and writes them in one upsert.
289289+func (b *BackfillWorker) batchRepoPages(did string, records []atproto.Record) (int, error) {
290290+ pages := make([]db.RepoPage, 0, len(records))
291291+ for i := range records {
292292+ r := &records[i]
293293+ var pr atproto.RepoPageRecord
294294+ if err := json.Unmarshal(r.Value, &pr); err != nil {
295295+ slog.Warn("Backfill skipping invalid repo page", "uri", r.URI, "error", err)
296296+ continue
297297+ }
298298+ if pr.Repository == "" {
299299+ continue
300300+ }
301301+ avatarCID := ""
302302+ if pr.Avatar != nil && pr.Avatar.Ref.Link != "" {
303303+ avatarCID = pr.Avatar.Ref.Link
304304+ }
305305+ pages = append(pages, db.RepoPage{
306306+ DID: did,
307307+ Repository: pr.Repository,
308308+ Description: pr.Description,
309309+ AvatarCID: avatarCID,
310310+ UserEdited: pr.UserEdited,
311311+ CreatedAt: pr.CreatedAt,
312312+ UpdatedAt: pr.UpdatedAt,
313313+ })
314314+ }
315315+ if err := db.BatchUpsertRepoPages(b.db, pages); err != nil {
316316+ return 0, err
317317+ }
318318+ slog.Info("Backfill batch repo pages", "did", did, "rows", len(pages))
319319+ return len(pages), nil
320320+}
321321+322322+// batchDailyStats decodes daily stats records and writes them in one upsert.
323323+// Ensures every distinct owner exists as a user first (FK requirement).
324324+func (b *BackfillWorker) batchDailyStats(ctx context.Context, holdDID string, records []atproto.Record) (int, error) {
325325+ stats := make([]db.DailyStats, 0, len(records))
326326+ ownerDIDs := make(map[string]struct{})
327327+328328+ for i := range records {
329329+ r := &records[i]
330330+ var dr atproto.DailyStatsRecord
331331+ if err := json.Unmarshal(r.Value, &dr); err != nil {
332332+ slog.Warn("Backfill skipping invalid daily stats", "uri", r.URI, "error", err)
333333+ continue
334334+ }
335335+ if dr.OwnerDID == "" || dr.Repository == "" || dr.Date == "" {
336336+ continue
337337+ }
338338+ ownerDIDs[dr.OwnerDID] = struct{}{}
339339+ stats = append(stats, db.DailyStats{
340340+ DID: dr.OwnerDID,
341341+ Repository: dr.Repository,
342342+ Date: dr.Date,
343343+ PullCount: int(dr.PullCount),
344344+ PushCount: int(dr.PushCount),
345345+ })
346346+ }
347347+348348+ for owner := range ownerDIDs {
349349+ if err := b.processor.EnsureUserExists(ctx, owner); err != nil {
350350+ slog.Warn("Backfill failed to ensure daily stats owner user", "owner_did", owner, "error", err)
351351+ }
352352+ }
353353+354354+ if err := db.BatchUpsertDailyStats(b.db, stats); err != nil {
355355+ return 0, err
356356+ }
357357+ slog.Info("Backfill batch daily stats", "hold_did", holdDID, "rows", len(stats))
358358+ return len(stats), nil
359359+}
360360+361361+// batchStats updates the in-memory stats cache from a hold's stats records,
362362+// then flushes the aggregated view of every touched (owner, repo) to the
363363+// repository_stats table in a single multi-row upsert. Aggregation is across
364364+// all holds known to the cache, preserving the single-record semantics.
365365+func (b *BackfillWorker) batchStats(ctx context.Context, holdDID string, records []atproto.Record) (int, error) {
366366+ type key struct{ owner, repo string }
367367+ touched := make(map[key]struct{})
368368+ ownerDIDs := make(map[string]struct{})
369369+370370+ for i := range records {
371371+ r := &records[i]
372372+ var sr atproto.StatsRecord
373373+ if err := json.Unmarshal(r.Value, &sr); err != nil {
374374+ slog.Warn("Backfill skipping invalid stats record", "uri", r.URI, "error", err)
375375+ continue
376376+ }
377377+ if sr.OwnerDID == "" || sr.Repository == "" {
378378+ continue
379379+ }
380380+381381+ var lastPull, lastPush *time.Time
382382+ if sr.LastPull != "" {
383383+ if t, err := time.Parse(time.RFC3339, sr.LastPull); err == nil {
384384+ lastPull = &t
385385+ }
386386+ }
387387+ if sr.LastPush != "" {
388388+ if t, err := time.Parse(time.RFC3339, sr.LastPush); err == nil {
389389+ lastPush = &t
390390+ }
391391+ }
392392+393393+ b.processor.statsCache.Update(holdDID, sr.OwnerDID, sr.Repository,
394394+ sr.PullCount, sr.PushCount, lastPull, lastPush)
395395+ touched[key{sr.OwnerDID, sr.Repository}] = struct{}{}
396396+ ownerDIDs[sr.OwnerDID] = struct{}{}
397397+ }
398398+399399+ for owner := range ownerDIDs {
400400+ if err := b.processor.EnsureUserExists(ctx, owner); err != nil {
401401+ slog.Warn("Backfill failed to ensure stats owner user", "owner_did", owner, "error", err)
402402+ }
403403+ }
404404+405405+ // Build aggregated rows from the cache.
406406+ rows := make([]db.RepositoryStats, 0, len(touched))
407407+ for k := range touched {
408408+ totalPull, totalPush, latestPull, latestPush := b.processor.statsCache.GetAggregated(k.owner, k.repo)
409409+ rows = append(rows, db.RepositoryStats{
410410+ DID: k.owner,
411411+ Repository: k.repo,
412412+ PullCount: int(totalPull),
413413+ PushCount: int(totalPush),
414414+ LastPull: latestPull,
415415+ LastPush: latestPush,
416416+ })
417417+ }
418418+ if err := db.BatchUpsertRepositoryStats(b.db, rows); err != nil {
419419+ return 0, err
420420+ }
421421+ slog.Info("Backfill batch stats", "hold_did", holdDID, "rows", len(rows))
422422+ return len(rows), nil
423423+}
424424+425425+// batchCaptains decodes captain records and writes them in one upsert.
426426+func (b *BackfillWorker) batchCaptains(holdDID string, records []atproto.Record) (int, error) {
427427+ captains := make([]db.HoldCaptainRecord, 0, len(records))
428428+ now := time.Now()
429429+ for i := range records {
430430+ r := &records[i]
431431+ var cr atproto.CaptainRecord
432432+ if err := json.Unmarshal(r.Value, &cr); err != nil {
433433+ slog.Warn("Backfill skipping invalid captain record", "uri", r.URI, "error", err)
434434+ continue
435435+ }
436436+ if cr.Owner == "" || !strings.HasPrefix(cr.Owner, "did:") {
437437+ slog.Warn("Backfill skipping captain with invalid owner", "uri", r.URI)
438438+ continue
439439+ }
440440+ // Captain rkey is the hold DID (collections are stored on each hold's PDS,
441441+ // so record.URI already encodes the hold DID in the authority segment).
442442+ recordHoldDID := extractDIDFromURI(r.URI)
443443+ if recordHoldDID == "" {
444444+ recordHoldDID = holdDID
445445+ }
446446+ captains = append(captains, db.HoldCaptainRecord{
447447+ HoldDID: recordHoldDID,
448448+ OwnerDID: cr.Owner,
449449+ Public: cr.Public,
450450+ AllowAllCrew: cr.AllowAllCrew,
451451+ DeployedAt: cr.DeployedAt,
452452+ Region: cr.Region,
453453+ Successor: cr.Successor,
454454+ UpdatedAt: now,
455455+ })
456456+ }
457457+ if err := db.BatchUpsertCaptainRecords(b.db, captains); err != nil {
458458+ return 0, err
459459+ }
460460+ slog.Info("Backfill batch captains", "rows", len(captains))
461461+ return len(captains), nil
462462+}
463463+464464+// batchCrew decodes crew records and writes them in one upsert.
465465+func (b *BackfillWorker) batchCrew(holdDID string, records []atproto.Record) (int, error) {
466466+ members := make([]db.CrewMember, 0, len(records))
467467+ for i := range records {
468468+ r := &records[i]
469469+ var cr atproto.CrewRecord
470470+ if err := json.Unmarshal(r.Value, &cr); err != nil {
471471+ slog.Warn("Backfill skipping invalid crew record", "uri", r.URI, "error", err)
472472+ continue
473473+ }
474474+ if cr.Member == "" || !strings.HasPrefix(cr.Member, "did:") {
475475+ slog.Warn("Backfill skipping crew with invalid member", "uri", r.URI)
476476+ continue
477477+ }
478478+ recordHoldDID := extractDIDFromURI(r.URI)
479479+ if recordHoldDID == "" {
480480+ recordHoldDID = holdDID
481481+ }
482482+ permsJSON := ""
483483+ if len(cr.Permissions) > 0 {
484484+ if b, err := json.Marshal(cr.Permissions); err == nil {
485485+ permsJSON = string(b)
486486+ }
487487+ }
488488+ rkey := extractRkeyFromURI(r.URI)
489489+ members = append(members, db.CrewMember{
490490+ HoldDID: recordHoldDID,
491491+ MemberDID: cr.Member,
492492+ Rkey: rkey,
493493+ Role: cr.Role,
494494+ Permissions: permsJSON,
495495+ Tier: cr.Tier,
496496+ AddedAt: cr.AddedAt,
497497+ })
498498+ }
499499+ if err := db.BatchUpsertCrewMembers(b.db, members); err != nil {
500500+ return 0, err
501501+ }
502502+ slog.Info("Backfill batch crew", "hold_did", holdDID, "rows", len(members))
503503+ return len(members), nil
504504+}
505505+506506+// extractDIDFromURI pulls the DID authority segment out of an AT-URI.
507507+// Format: at://did:…/collection/rkey → "did:…".
508508+func extractDIDFromURI(uri string) string {
509509+ const prefix = "at://"
510510+ if !strings.HasPrefix(uri, prefix) {
511511+ return ""
512512+ }
513513+ rest := uri[len(prefix):]
514514+ if slash := strings.IndexByte(rest, '/'); slash >= 0 {
515515+ return rest[:slash]
516516+ }
517517+ return rest
518518+}