···11package db
2233-import (
44- "database/sql"
55- "time"
66-)
33+import "time"
7485// GetRepositoryAnnotations retrieves all annotations for a repository
99-func GetRepositoryAnnotations(db *sql.DB, did, repository string) (map[string]string, error) {
66+func GetRepositoryAnnotations(db DBTX, did, repository string) (map[string]string, error) {
107 rows, err := db.Query(`
118 SELECT key, value
129 FROM repository_annotations
···3027}
31283229// UpsertRepositoryAnnotations replaces all annotations for a repository
3333-// Only called when manifest has at least one non-empty annotation
3434-func UpsertRepositoryAnnotations(db *sql.DB, did, repository string, annotations map[string]string) error {
3535- tx, err := db.Begin()
3636- if err != nil {
3737- return err
3838- }
3939- defer tx.Rollback()
4040-3030+// Only called when manifest has at least one non-empty annotation.
3131+// Atomicity is provided by the caller's transaction when used during backfill.
3232+func UpsertRepositoryAnnotations(db DBTX, did, repository string, annotations map[string]string) error {
4133 // Delete existing annotations
4242- _, err = tx.Exec(`
3434+ _, err := db.Exec(`
4335 DELETE FROM repository_annotations
4436 WHERE did = ? AND repository = ?
4537 `, did, repository)
···4840 }
49415042 // Insert new annotations
5151- stmt, err := tx.Prepare(`
4343+ stmt, err := db.Prepare(`
5244 INSERT INTO repository_annotations (did, repository, key, value, updated_at)
5345 VALUES (?, ?, ?, ?, ?)
5446 `)
···6557 }
6658 }
67596868- return tx.Commit()
6060+ return nil
6961}
70627163// DeleteRepositoryAnnotations removes all annotations for a repository
7272-func DeleteRepositoryAnnotations(db *sql.DB, did, repository string) error {
6464+func DeleteRepositoryAnnotations(db DBTX, did, repository string) error {
7365 _, err := db.Exec(`
7466 DELETE FROM repository_annotations
7567 WHERE did = ? AND repository = ?
+13
pkg/appview/db/dbtx.go
···11+package db
22+33+import "database/sql"
44+55+// DBTX is an interface satisfied by both *sql.DB and *sql.Tx.
66+// All query functions in this package accept DBTX to allow callers
77+// to choose whether operations run in a transaction or standalone.
88+type DBTX interface {
99+ Exec(query string, args ...any) (sql.Result, error)
1010+ Query(query string, args ...any) (*sql.Rows, error)
1111+ QueryRow(query string, args ...any) *sql.Row
1212+ Prepare(query string) (*sql.Stmt, error)
1313+}
+2-3
pkg/appview/db/delete.go
···2233import (
44 "context"
55- "database/sql"
65 "fmt"
76 "log/slog"
87)
···1716//
1817// This should be called AFTER remote cleanup (hold services, PDS records)
1918// since we need the OAuth tokens to authenticate those requests.
2020-func DeleteUserDataFull(db *sql.DB, oauthStore *OAuthStore, did string) error {
1919+func DeleteUserDataFull(db DBTX, oauthStore *OAuthStore, did string) error {
2120 slog.Info("Starting full user data deletion", "did", did)
22212322 // 1. Delete non-cascading hold membership tables
···48474948// deleteHoldMembershipData deletes non-cascading hold membership tables.
5049// These tables don't have foreign keys to the users table.
5151-func deleteHoldMembershipData(db *sql.DB, did string) error {
5050+func deleteHoldMembershipData(db DBTX, did string) error {
5251 // Delete from hold_crew_approvals (where user is the approved member)
5352 result, err := db.Exec(`DELETE FROM hold_crew_approvals WHERE user_did = ?`, did)
5453 if err != nil {
+5-5
pkg/appview/db/export.go
···75757676// ExportUserData gathers all user data for GDPR export
7777// Only includes data we originate, not cached PDS data
7878-func ExportUserData(db *sql.DB, did string) (*UserDataExport, error) {
7878+func ExportUserData(db DBTX, did string) (*UserDataExport, error) {
7979 export := &UserDataExport{
8080 ExportedAt: time.Now().UTC(),
8181 ExportVersion: "1.0",
···128128}
129129130130// getDevicesForExport retrieves sanitized device records
131131-func getDevicesForExport(db *sql.DB, did string) ([]DeviceExport, error) {
131131+func getDevicesForExport(db DBTX, did string) ([]DeviceExport, error) {
132132 rows, err := db.Query(`
133133 SELECT id, name, ip_address, location, user_agent, created_at, last_used
134134 FROM devices
···169169}
170170171171// getOAuthSessionsForExport retrieves sanitized OAuth session records
172172-func getOAuthSessionsForExport(db *sql.DB, did string) ([]OAuthSessionExport, error) {
172172+func getOAuthSessionsForExport(db DBTX, did string) ([]OAuthSessionExport, error) {
173173 rows, err := db.Query(`
174174 SELECT session_id, created_at, updated_at
175175 FROM oauth_sessions
···199199}
200200201201// getUISessionsForExport retrieves sanitized UI session records
202202-func getUISessionsForExport(db *sql.DB, did string) ([]UISessionExport, error) {
202202+func getUISessionsForExport(db DBTX, did string) ([]UISessionExport, error) {
203203 rows, err := db.Query(`
204204 SELECT id, expires_at, created_at
205205 FROM ui_sessions
···229229}
230230231231// getHoldMembershipsForExport retrieves hold approval and denial records
232232-func getHoldMembershipsForExport(db *sql.DB, did string) (HoldMembershipsExport, error) {
232232+func getHoldMembershipsForExport(db DBTX, did string) (HoldMembershipsExport, error) {
233233 memberships := HoldMembershipsExport{
234234 Approvals: []HoldApprovalExport{},
235235 Denials: []HoldDenialExport{},
+9-9
pkg/appview/db/hold_store.go
···30303131// GetCaptainRecord retrieves a captain record from the cache
3232// Returns nil if not found (cache miss)
3333-func GetCaptainRecord(db *sql.DB, holdDID string) (*HoldCaptainRecord, error) {
3333+func GetCaptainRecord(db DBTX, holdDID string) (*HoldCaptainRecord, error) {
3434 query := `
3535 SELECT hold_did, owner_did, public, allow_all_crew,
3636 deployed_at, region, updated_at
···7171}
72727373// UpsertCaptainRecord inserts or updates a captain record in the cache
7474-func UpsertCaptainRecord(db *sql.DB, record *HoldCaptainRecord) error {
7474+func UpsertCaptainRecord(db DBTX, record *HoldCaptainRecord) error {
7575 query := `
7676 INSERT INTO hold_captain_records (
7777 hold_did, owner_did, public, allow_all_crew,
···104104}
105105106106// ListHoldDIDs returns all known hold DIDs from the cache
107107-func ListHoldDIDs(db *sql.DB) ([]string, error) {
107107+func ListHoldDIDs(db DBTX) ([]string, error) {
108108 query := `
109109 SELECT hold_did
110110 FROM hold_captain_records
···143143144144// GetCaptainRecordsForOwner retrieves all captain records where the user is the owner
145145// Used for GDPR export to find all holds owned by a user
146146-func GetCaptainRecordsForOwner(db *sql.DB, ownerDID string) ([]*HoldCaptainRecord, error) {
146146+func GetCaptainRecordsForOwner(db DBTX, ownerDID string) ([]*HoldCaptainRecord, error) {
147147 query := `
148148 SELECT hold_did, owner_did, public, allow_all_crew,
149149 deployed_at, region, updated_at
···198198}
199199200200// DeleteCaptainRecord removes a captain record from the cache
201201-func DeleteCaptainRecord(db *sql.DB, holdDID string) error {
201201+func DeleteCaptainRecord(db DBTX, holdDID string) error {
202202 // Note: hold_crew_members doesn't have CASCADE, so delete crew first
203203 _, err := db.Exec(`DELETE FROM hold_crew_members WHERE hold_did = ?`, holdDID)
204204 if err != nil {
···226226}
227227228228// UpsertCrewMember inserts or updates a crew member record
229229-func UpsertCrewMember(db *sql.DB, member *CrewMember) error {
229229+func UpsertCrewMember(db DBTX, member *CrewMember) error {
230230 query := `
231231 INSERT INTO hold_crew_members (
232232 hold_did, member_did, rkey, role, permissions, tier, added_at, updated_at
···257257}
258258259259// DeleteCrewMemberByRkey removes a crew member by rkey (for delete events from Jetstream)
260260-func DeleteCrewMemberByRkey(db *sql.DB, holdDID, rkey string) error {
260260+func DeleteCrewMemberByRkey(db DBTX, holdDID, rkey string) error {
261261 _, err := db.Exec(`DELETE FROM hold_crew_members WHERE hold_did = ? AND rkey = ?`, holdDID, rkey)
262262 if err != nil {
263263 return fmt.Errorf("failed to delete crew member by rkey: %w", err)
···278278279279// GetAvailableHolds returns all holds available to a user, grouped by membership type
280280// Results are ordered: owner first, then crew, then eligible, then public
281281-func GetAvailableHolds(db *sql.DB, userDID string) ([]AvailableHold, error) {
281281+func GetAvailableHolds(db DBTX, userDID string) ([]AvailableHold, error) {
282282 query := `
283283 SELECT
284284 h.hold_did,
···352352}
353353354354// GetCrewMemberships returns all holds where a user is a crew member
355355-func GetCrewMemberships(db *sql.DB, memberDID string) ([]CrewMember, error) {
355355+func GetCrewMemberships(db DBTX, memberDID string) ([]CrewMember, error) {
356356 query := `
357357 SELECT hold_did, member_did, rkey, role, permissions, tier, added_at, created_at, updated_at
358358 FROM hold_crew_members
+60-67
pkg/appview/db/queries.go
···55555656// SearchRepositories searches for repositories matching the query across handles, DIDs, repositories, and annotations
5757// Returns RepoCardData (one per repository) instead of individual pushes/tags
5858-func SearchRepositories(db *sql.DB, query string, limit, offset int, currentUserDID string) ([]RepoCardData, int, error) {
5858+func SearchRepositories(db DBTX, query string, limit, offset int, currentUserDID string) ([]RepoCardData, int, error) {
5959 // Escape LIKE wildcards so they're treated literally
6060 query = escapeLikePattern(query)
6161···181181}
182182183183// GetUserRepositories fetches all repositories for a user
184184-func GetUserRepositories(db *sql.DB, did string) ([]Repository, error) {
184184+func GetUserRepositories(db DBTX, did string) ([]Repository, error) {
185185 // Get repository summary
186186 rows, err := db.Query(`
187187 SELECT
···310310311311// GetRepositoryMetadata retrieves metadata for a repository from annotations table
312312// Returns a map of annotation key -> value for easy access in templates and handlers
313313-func GetRepositoryMetadata(db *sql.DB, did string, repository string) (map[string]string, error) {
313313+func GetRepositoryMetadata(db DBTX, did string, repository string) (map[string]string, error) {
314314 return GetRepositoryAnnotations(db, did, repository)
315315}
316316317317// GetUserByDID retrieves a user by DID
318318-func GetUserByDID(db *sql.DB, did string) (*User, error) {
318318+func GetUserByDID(db DBTX, did string) (*User, error) {
319319 var user User
320320 var avatar sql.NullString
321321 err := db.QueryRow(`
···340340}
341341342342// GetUserByHandle retrieves a user by handle
343343-func GetUserByHandle(db *sql.DB, handle string) (*User, error) {
343343+func GetUserByHandle(db DBTX, handle string) (*User, error) {
344344 var user User
345345 var avatar sql.NullString
346346 err := db.QueryRow(`
···365365}
366366367367// UpsertUser inserts or updates a user record
368368-func UpsertUser(db *sql.DB, user *User) error {
368368+func UpsertUser(db DBTX, user *User) error {
369369 _, err := db.Exec(`
370370 INSERT INTO users (did, handle, pds_endpoint, avatar, last_seen)
371371 VALUES (?, ?, ?, ?, ?)
···380380381381// UpsertUserIgnoreAvatar inserts or updates a user record, but preserves existing avatar on update
382382// This is useful when avatar fetch fails, and we don't want to overwrite an existing avatar with empty string
383383-func UpsertUserIgnoreAvatar(db *sql.DB, user *User) error {
383383+func UpsertUserIgnoreAvatar(db DBTX, user *User) error {
384384 _, err := db.Exec(`
385385 INSERT INTO users (did, handle, pds_endpoint, avatar, last_seen)
386386 VALUES (?, ?, ?, ?, ?)
···394394395395// UpdateUserLastSeen updates only the last_seen timestamp for a user
396396// This is more efficient than UpsertUser when only updating activity timestamp
397397-func UpdateUserLastSeen(db *sql.DB, did string) error {
397397+func UpdateUserLastSeen(db DBTX, did string) error {
398398 _, err := db.Exec(`
399399 UPDATE users SET last_seen = ? WHERE did = ?
400400 `, time.Now(), did)
···403403404404// UpdateUserHandle updates a user's handle when an identity change event is received
405405// This is called when Jetstream receives an identity event indicating a handle change
406406-func UpdateUserHandle(db *sql.DB, did string, newHandle string) error {
406406+func UpdateUserHandle(db DBTX, did string, newHandle string) error {
407407 _, err := db.Exec(`
408408 UPDATE users SET handle = ?, last_seen = ? WHERE did = ?
409409 `, newHandle, time.Now(), did)
···412412413413// UpdateUserAvatar updates a user's avatar URL when a profile change is detected
414414// This is called when Jetstream receives an app.bsky.actor.profile update
415415-func UpdateUserAvatar(db *sql.DB, did string, avatarURL string) error {
415415+func UpdateUserAvatar(db DBTX, did string, avatarURL string) error {
416416 _, err := db.Exec(`
417417 UPDATE users SET avatar = ?, last_seen = ? WHERE did = ?
418418 `, avatarURL, time.Now(), did)
···420420}
421421422422// GetManifestDigestsForDID returns all manifest digests for a DID
423423-func GetManifestDigestsForDID(db *sql.DB, did string) ([]string, error) {
423423+func GetManifestDigestsForDID(db DBTX, did string) ([]string, error) {
424424 rows, err := db.Query(`
425425 SELECT digest FROM manifests WHERE did = ?
426426 `, did)
···442442}
443443444444// DeleteManifestsNotInList deletes all manifests for a DID that are not in the provided list
445445-func DeleteManifestsNotInList(db *sql.DB, did string, keepDigests []string) error {
445445+func DeleteManifestsNotInList(db DBTX, did string, keepDigests []string) error {
446446 if len(keepDigests) == 0 {
447447 // No manifests to keep - delete all for this DID
448448 _, err := db.Exec(`DELETE FROM manifests WHERE did = ?`, did)
···467467}
468468469469// GetTagsForDID returns all (repository, tag) pairs for a DID
470470-func GetTagsForDID(db *sql.DB, did string) ([]struct{ Repository, Tag string }, error) {
470470+func GetTagsForDID(db DBTX, did string) ([]struct{ Repository, Tag string }, error) {
471471 rows, err := db.Query(`
472472 SELECT repository, tag FROM tags WHERE did = ?
473473 `, did)
···488488 return tags, rows.Err()
489489}
490490491491-// DeleteTagsNotInList deletes all tags for a DID that are not in the provided list
492492-func DeleteTagsNotInList(db *sql.DB, did string, keepTags []struct{ Repository, Tag string }) error {
491491+// DeleteTagsNotInList deletes all tags for a DID that are not in the provided list.
492492+// Atomicity is provided by the caller's transaction when used during backfill.
493493+func DeleteTagsNotInList(db DBTX, did string, keepTags []struct{ Repository, Tag string }) error {
493494 if len(keepTags) == 0 {
494495 // No tags to keep - delete all for this DID
495496 _, err := db.Exec(`DELETE FROM tags WHERE did = ?`, did)
496497 return err
497498 }
498499499499- // For tags, we need to check (repository, tag) pairs
500500- // Build a DELETE query that excludes the pairs we want to keep
501501- tx, err := db.Begin()
502502- if err != nil {
503503- return err
504504- }
505505- defer tx.Rollback()
506506-507500 // First, get all current tags
508508- rows, err := tx.Query(`SELECT id, repository, tag FROM tags WHERE did = ?`, did)
501501+ rows, err := db.Query(`SELECT id, repository, tag FROM tags WHERE did = ?`, did)
509502 if err != nil {
510503 return err
511504 }
···536529537530 // Delete tags not in keep list
538531 for _, id := range toDelete {
539539- if _, err := tx.Exec(`DELETE FROM tags WHERE id = ?`, id); err != nil {
532532+ if _, err := db.Exec(`DELETE FROM tags WHERE id = ?`, id); err != nil {
540533 return err
541534 }
542535 }
543536544544- return tx.Commit()
537537+ return nil
545538}
546539547540// InsertManifest inserts or updates a manifest record
548541// Uses UPSERT to update core metadata if manifest already exists
549542// Returns the manifest ID (works correctly for both insert and update)
550543// Note: Annotations are stored separately in repository_annotations table
551551-func InsertManifest(db *sql.DB, manifest *Manifest) (int64, error) {
544544+func InsertManifest(db DBTX, manifest *Manifest) (int64, error) {
552545 _, err := db.Exec(`
553546 INSERT INTO manifests
554547 (did, repository, digest, hold_endpoint, schema_version, media_type,
···584577}
585578586579// InsertLayer inserts a new layer record
587587-func InsertLayer(db *sql.DB, layer *Layer) error {
580580+func InsertLayer(db DBTX, layer *Layer) error {
588581 _, err := db.Exec(`
589582 INSERT INTO layers (manifest_id, digest, size, media_type, layer_index)
590583 VALUES (?, ?, ?, ?, ?)
···593586}
594587595588// UpsertTag inserts or updates a tag record
596596-func UpsertTag(db *sql.DB, tag *Tag) error {
589589+func UpsertTag(db DBTX, tag *Tag) error {
597590 _, err := db.Exec(`
598591 INSERT INTO tags (did, repository, tag, digest, created_at)
599592 VALUES (?, ?, ?, ?, ?)
···605598}
606599607600// DeleteTag deletes a tag record
608608-func DeleteTag(db *sql.DB, did, repository, tag string) error {
601601+func DeleteTag(db DBTX, did, repository, tag string) error {
609602 _, err := db.Exec(`
610603 DELETE FROM tags WHERE did = ? AND repository = ? AND tag = ?
611604 `, did, repository, tag)
···616609// Only multi-arch tags (manifest lists) have platform info in manifest_references
617610// Single-arch tags will have empty Platforms slice (platform is obvious for single-arch)
618611// Attestation references (unknown/unknown platforms) are filtered out but tracked via HasAttestations
619619-func GetTagsWithPlatforms(db *sql.DB, did, repository string) ([]TagWithPlatforms, error) {
612612+func GetTagsWithPlatforms(db DBTX, did, repository string) ([]TagWithPlatforms, error) {
620613 rows, err := db.Query(`
621614 SELECT
622615 t.id,
···700693701694// DeleteManifest deletes a manifest and its associated layers
702695// If repository is empty, deletes all manifests matching did and digest
703703-func DeleteManifest(db *sql.DB, did, repository, digest string) error {
696696+func DeleteManifest(db DBTX, did, repository, digest string) error {
704697 var err error
705698 if repository == "" {
706699 // Delete by DID + digest only (used when repository is unknown, e.g., Jetstream DELETE events)
···718711//
719712// Due to ON DELETE CASCADE in the schema, deleting from users will automatically
720713// cascade to: manifests, tags, layers, references, annotations, stars, repo_pages, etc.
721721-func DeleteUserData(db *sql.DB, did string) error {
714714+func DeleteUserData(db DBTX, did string) error {
722715 result, err := db.Exec(`DELETE FROM users WHERE did = ?`, did)
723716 if err != nil {
724717 return fmt.Errorf("failed to delete user: %w", err)
···735728736729// GetManifest fetches a single manifest by digest
737730// Note: Annotations are stored separately in repository_annotations table
738738-func GetManifest(db *sql.DB, digest string) (*Manifest, error) {
731731+func GetManifest(db DBTX, digest string) (*Manifest, error) {
739732 var m Manifest
740733741734 err := db.QueryRow(`
···756749757750// GetNewestManifestForRepo returns the newest manifest for a specific repository
758751// Used by backfill to ensure annotations come from the most recent manifest
759759-func GetNewestManifestForRepo(db *sql.DB, did, repository string) (*Manifest, error) {
752752+func GetNewestManifestForRepo(db DBTX, did, repository string) (*Manifest, error) {
760753 var m Manifest
761754 err := db.QueryRow(`
762755 SELECT id, did, repository, digest, hold_endpoint, schema_version, media_type,
···779772// GetLatestHoldDIDForRepo returns the hold DID from the most recent manifest for a repository
780773// Returns empty string if no manifests exist (e.g., first push)
781774// This is used instead of the in-memory cache to determine which hold to use for blob operations
782782-func GetLatestHoldDIDForRepo(db *sql.DB, did, repository string) (string, error) {
775775+func GetLatestHoldDIDForRepo(db DBTX, did, repository string) (string, error) {
783776 var holdDID string
784777 err := db.QueryRow(`
785778 SELECT hold_endpoint
···802795803796// GetRepositoriesForDID returns all unique repository names for a DID
804797// Used by backfill to reconcile annotations for all repositories
805805-func GetRepositoriesForDID(db *sql.DB, did string) ([]string, error) {
798798+func GetRepositoriesForDID(db DBTX, did string) ([]string, error) {
806799 rows, err := db.Query(`
807800 SELECT DISTINCT repository
808801 FROM manifests
···825818}
826819827820// GetLayersForManifest fetches all layers for a manifest
828828-func GetLayersForManifest(db *sql.DB, manifestID int64) ([]Layer, error) {
821821+func GetLayersForManifest(db DBTX, manifestID int64) ([]Layer, error) {
829822 rows, err := db.Query(`
830823 SELECT manifest_id, digest, size, media_type, layer_index
831824 FROM layers
···851844}
852845853846// InsertManifestReference inserts a new manifest reference record (for manifest lists/indexes)
854854-func InsertManifestReference(db *sql.DB, ref *ManifestReference) error {
847847+func InsertManifestReference(db DBTX, ref *ManifestReference) error {
855848 _, err := db.Exec(`
856849 INSERT INTO manifest_references (manifest_id, digest, size, media_type,
857850 platform_architecture, platform_os,
···866859}
867860868861// GetManifestReferencesForManifest fetches all manifest references for a manifest list/index
869869-func GetManifestReferencesForManifest(db *sql.DB, manifestID int64) ([]ManifestReference, error) {
862862+func GetManifestReferencesForManifest(db DBTX, manifestID int64) ([]ManifestReference, error) {
870863 rows, err := db.Query(`
871864 SELECT manifest_id, digest, size, media_type,
872865 platform_architecture, platform_os, platform_variant, platform_os_version,
···914907// GetTopLevelManifests returns only manifest lists and orphaned single-arch manifests
915908// Filters out platform-specific manifests that are referenced by manifest lists
916909// Note: Annotations are stored separately in repository_annotations table - use GetRepositoryMetadata to fetch them
917917-func GetTopLevelManifests(db *sql.DB, did, repository string, limit, offset int) ([]ManifestWithMetadata, error) {
910910+func GetTopLevelManifests(db DBTX, did, repository string, limit, offset int) ([]ManifestWithMetadata, error) {
918911 rows, err := db.Query(`
919912 WITH manifest_list_children AS (
920913 -- Get all digests that are children of manifest lists
···1047104010481041// GetManifestDetail returns a manifest with full platform details and tags
10491042// Note: Annotations are stored separately in repository_annotations table - use GetRepositoryMetadata to fetch them
10501050-func GetManifestDetail(db *sql.DB, did, repository, digest string) (*ManifestWithMetadata, error) {
10431043+func GetManifestDetail(db DBTX, did, repository, digest string) (*ManifestWithMetadata, error) {
10511044 // First, get the manifest and its tags
10521045 var m ManifestWithMetadata
10531046 var tags, configDigest sql.NullString
···11521145}
1153114611541147// GetFirehoseCursor retrieves the current firehose cursor
11551155-func GetFirehoseCursor(db *sql.DB) (int64, error) {
11481148+func GetFirehoseCursor(db DBTX) (int64, error) {
11561149 var cursor int64
11571150 err := db.QueryRow("SELECT cursor FROM firehose_cursor WHERE id = 1").Scan(&cursor)
11581151 if err == sql.ErrNoRows {
···11621155}
1163115611641157// UpdateFirehoseCursor updates the firehose cursor
11651165-func UpdateFirehoseCursor(db *sql.DB, cursor int64) error {
11581158+func UpdateFirehoseCursor(db DBTX, cursor int64) error {
11661159 _, err := db.Exec(`
11671160 INSERT INTO firehose_cursor (id, cursor, updated_at)
11681161 VALUES (1, ?, datetime('now'))
···11741167}
1175116811761169// IsManifestTagged checks if a manifest has any tags
11771177-func IsManifestTagged(db *sql.DB, did, repository, digest string) (bool, error) {
11701170+func IsManifestTagged(db DBTX, did, repository, digest string) (bool, error) {
11781171 var count int
11791172 err := db.QueryRow(`
11801173 SELECT COUNT(*) FROM tags
···11891182}
1190118311911184// GetManifestTags retrieves all tags for a manifest
11921192-func GetManifestTags(db *sql.DB, did, repository, digest string) ([]string, error) {
11851185+func GetManifestTags(db DBTX, did, repository, digest string) ([]string, error) {
11931186 rows, err := db.Query(`
11941187 SELECT tag FROM tags
11951188 WHERE did = ? AND repository = ? AND digest = ?
···12251218}
1226121912271220// GetBackfillState retrieves the backfill state
12281228-func GetBackfillState(db *sql.DB) (*BackfillState, error) {
12211221+func GetBackfillState(db DBTX) (*BackfillState, error) {
12291222 var state BackfillState
12301223 var updatedAtStr string
12311224···12631256}
1264125712651258// UpsertBackfillState updates or creates backfill state
12661266-func UpsertBackfillState(db *sql.DB, state *BackfillState) error {
12591259+func UpsertBackfillState(db DBTX, state *BackfillState) error {
12671260 _, err := db.Exec(`
12681261 INSERT INTO backfill_state (id, start_cursor, current_cursor, completed, updated_at)
12691262 VALUES (1, ?, ?, ?, datetime('now'))
···12771270}
1278127112791272// UpdateBackfillCursor updates just the current cursor position
12801280-func UpdateBackfillCursor(db *sql.DB, cursor int64) error {
12731273+func UpdateBackfillCursor(db DBTX, cursor int64) error {
12811274 _, err := db.Exec(`
12821275 UPDATE backfill_state
12831276 SET current_cursor = ?, updated_at = datetime('now')
···12871280}
1288128112891282// MarkBackfillCompleted marks the backfill as completed
12901290-func MarkBackfillCompleted(db *sql.DB) error {
12831283+func MarkBackfillCompleted(db DBTX) error {
12911284 _, err := db.Exec(`
12921285 UPDATE backfill_state
12931286 SET completed = 1, updated_at = datetime('now')
···12971290}
1298129112991292// GetRepository fetches a specific repository for a user
13001300-func GetRepository(db *sql.DB, did, repository string) (*Repository, error) {
12931293+func GetRepository(db DBTX, did, repository string) (*Repository, error) {
13011294 // Get repository summary
13021295 var r Repository
13031296 r.Name = repository
···14121405}
1413140614141407// GetRepositoryStats fetches stats for a repository
14151415-func GetRepositoryStats(db *sql.DB, did, repository string) (*RepositoryStats, error) {
14081408+func GetRepositoryStats(db DBTX, did, repository string) (*RepositoryStats, error) {
14161409 var stats RepositoryStats
14171410 var lastPullStr, lastPushStr sql.NullString
14181411···1463145614641457// UpsertRepositoryStats inserts or updates repository stats
14651458// Note: star_count is calculated dynamically from the stars table, not stored here
14661466-func UpsertRepositoryStats(db *sql.DB, stats *RepositoryStats) error {
14591459+func UpsertRepositoryStats(db DBTX, stats *RepositoryStats) error {
14671460 _, err := db.Exec(`
14681461 INSERT INTO repository_stats (did, repository, pull_count, last_pull, push_count, last_push)
14691462 VALUES (?, ?, ?, ?, ?, ?)
···14771470}
1478147114791472// UpsertStar inserts or updates a star record (idempotent)
14801480-func UpsertStar(db *sql.DB, starrerDID, ownerDID, repository string, createdAt time.Time) error {
14731473+func UpsertStar(db DBTX, starrerDID, ownerDID, repository string, createdAt time.Time) error {
14811474 _, err := db.Exec(`
14821475 INSERT INTO stars (starrer_did, owner_did, repository, created_at)
14831476 VALUES (?, ?, ?, ?)
···14881481}
1489148214901483// DeleteStar deletes a star record
14911491-func DeleteStar(db *sql.DB, starrerDID, ownerDID, repository string) error {
14841484+func DeleteStar(db DBTX, starrerDID, ownerDID, repository string) error {
14921485 _, err := db.Exec(`
14931486 DELETE FROM stars
14941487 WHERE starrer_did = ? AND owner_did = ? AND repository = ?
···14971490}
1498149114991492// RebuildStarCount rebuilds the star count for a specific repository from the stars table
15001500-func RebuildStarCount(db *sql.DB, ownerDID, repository string) error {
14931493+func RebuildStarCount(db DBTX, ownerDID, repository string) error {
15011494 _, err := db.Exec(`
15021495 INSERT INTO repository_stats (did, repository, star_count)
15031496 VALUES (?, ?, (
···1515150815161509// GetStarsForDID returns all stars created by a specific DID (for backfill reconciliation)
15171510// Returns a map of (ownerDID, repository) -> createdAt
15181518-func GetStarsForDID(db *sql.DB, starrerDID string) (map[string]time.Time, error) {
15111511+func GetStarsForDID(db DBTX, starrerDID string) (map[string]time.Time, error) {
15191512 rows, err := db.Query(`
15201513 SELECT owner_did, repository, created_at
15211514 FROM stars
···1542153515431536// CleanupOrphanedTags removes tags whose manifest digest no longer exists
15441537// This handles cases where manifests were deleted but tags pointing to them remain
15451545-func CleanupOrphanedTags(db *sql.DB, did string) error {
15381538+func CleanupOrphanedTags(db DBTX, did string) error {
15461539 _, err := db.Exec(`
15471540 DELETE FROM tags
15481541 WHERE did = ?
···1557155015581551// DeleteStarsNotInList deletes stars from the database that are not in the provided list
15591552// This is used during backfill reconciliation to remove stars that no longer exist on PDS
15601560-func DeleteStarsNotInList(db *sql.DB, starrerDID string, foundStars map[string]time.Time) error {
15531553+func DeleteStarsNotInList(db DBTX, starrerDID string, foundStars map[string]time.Time) error {
15611554 // Get current stars in DB
15621555 currentStars, err := GetStarsForDID(db, starrerDID)
15631556 if err != nil {
···16081601// HoldDIDDB wraps a sql.DB and implements the HoldDIDLookup interface for middleware
16091602// This is a minimal wrapper that only provides hold DID lookups for blob routing
16101603type HoldDIDDB struct {
16111611- db *sql.DB
16041604+ db DBTX
16121605}
1613160616141607// NewHoldDIDDB creates a new hold DID database wrapper
16151615-func NewHoldDIDDB(db *sql.DB) *HoldDIDDB {
16081608+func NewHoldDIDDB(db DBTX) *HoldDIDDB {
16161609 return &HoldDIDDB{db: db}
16171610}
16181611···16321625)
1633162616341627// GetRepoCards fetches repository cards with full data including Tag, Digest, and LastUpdated
16351635-func GetRepoCards(db *sql.DB, limit int, currentUserDID string, sortOrder RepoCardSortOrder) ([]RepoCardData, error) {
16281628+func GetRepoCards(db DBTX, limit int, currentUserDID string, sortOrder RepoCardSortOrder) ([]RepoCardData, error) {
16361629 // Build ORDER BY clause based on sort order
16371630 var orderBy string
16381631 switch sortOrder {
···17131706}
1714170717151708// GetUserRepoCards fetches repository cards for a specific user with full data
17161716-func GetUserRepoCards(db *sql.DB, userDID string, currentUserDID string) ([]RepoCardData, error) {
17091709+func GetUserRepoCards(db DBTX, userDID string, currentUserDID string) ([]RepoCardData, error) {
17171710 query := `
17181711 WITH latest_manifests AS (
17191712 SELECT did, repository, MAX(id) as latest_id
···17951788}
1796178917971790// UpsertRepoPage inserts or updates a repo page record
17981798-func UpsertRepoPage(db *sql.DB, did, repository, description, avatarCID string, createdAt, updatedAt time.Time) error {
17911791+func UpsertRepoPage(db DBTX, did, repository, description, avatarCID string, createdAt, updatedAt time.Time) error {
17991792 _, err := db.Exec(`
18001793 INSERT INTO repo_pages (did, repository, description, avatar_cid, created_at, updated_at)
18011794 VALUES (?, ?, ?, ?, ?, ?)
···18081801}
1809180218101803// GetRepoPage retrieves a repo page record
18111811-func GetRepoPage(db *sql.DB, did, repository string) (*RepoPage, error) {
18041804+func GetRepoPage(db DBTX, did, repository string) (*RepoPage, error) {
18121805 var rp RepoPage
18131806 err := db.QueryRow(`
18141807 SELECT did, repository, description, avatar_cid, created_at, updated_at
···18221815}
1823181618241817// DeleteRepoPage deletes a repo page record
18251825-func DeleteRepoPage(db *sql.DB, did, repository string) error {
18181818+func DeleteRepoPage(db DBTX, did, repository string) error {
18261819 _, err := db.Exec(`
18271820 DELETE FROM repo_pages WHERE did = ? AND repository = ?
18281821 `, did, repository)
···18301823}
1831182418321825// GetRepoPagesByDID returns all repo pages for a DID
18331833-func GetRepoPagesByDID(db *sql.DB, did string) ([]RepoPage, error) {
18261826+func GetRepoPagesByDID(db DBTX, did string) ([]RepoPage, error) {
18341827 rows, err := db.Query(`
18351828 SELECT did, repository, description, avatar_cid, created_at, updated_at
18361829 FROM repo_pages
+29-5
pkg/appview/jetstream/backfill.go
···181181 return nil
182182}
183183184184-// backfillRepo backfills all records for a single repo/DID
184184+// backfillRepo backfills all records for a single repo/DID.
185185+// Per-record processing is wrapped in a single SQL transaction to batch writes
186186+// (one commit per repo instead of per-statement).
185187func (b *BackfillWorker) backfillRepo(ctx context.Context, did, collection string) (int, error) {
186188 // Resolve DID to get user's PDS endpoint
187189 pdsEndpoint, err := atproto.ResolveDIDToPDS(ctx, did)
···192194 // Create a client for this user's PDS with the user's DID
193195 // This allows GetRecord to work properly with the repo parameter
194196 pdsClient := atproto.NewClient(pdsEndpoint, did, "")
197197+198198+ // Begin transaction for per-record processing (batches all writes into one commit)
199199+ tx, err := b.db.Begin()
200200+ if err != nil {
201201+ return 0, fmt.Errorf("failed to begin transaction: %w", err)
202202+ }
203203+ defer tx.Rollback()
204204+205205+ // Create a transactional processor — all DB writes go through this tx
206206+ txProcessor := NewProcessor(tx, false, b.processor.statsCache)
195207196208 var recordCursor string
197209 recordCount := 0
···235247 }
236248 }
237249238238- if err := b.processRecord(ctx, did, collection, &record); err != nil {
250250+ if err := b.processRecordWith(ctx, txProcessor, did, collection, &record); err != nil {
239251 slog.Warn("Backfill failed to process record", "uri", record.URI, "error", err)
240252 continue
241253 }
···250262 recordCursor = cursor
251263 }
252264265265+ // Commit all per-record writes in one batch
266266+ if err := tx.Commit(); err != nil {
267267+ return 0, fmt.Errorf("failed to commit transaction: %w", err)
268268+ }
269269+270270+ // Reconciliation runs outside the transaction (involves network I/O and fewer writes)
271271+253272 // Reconcile deletions - remove records from DB that no longer exist on PDS
254273 if err := b.reconcileDeletions(did, collection, foundManifestDigests, foundTags, foundStars); err != nil {
255274 slog.Warn("Backfill failed to reconcile deletions", "did", did, "error", err)
···334353 return nil
335354}
336355337337-// processRecord processes a single record using the unified ProcessRecord method.
338338-// This ensures consistent handling (validation, user creation) between Worker and Backfill.
356356+// processRecord processes a single record using the default processor.
339357func (b *BackfillWorker) processRecord(ctx context.Context, did, collection string, record *atproto.Record) error {
358358+ return b.processRecordWith(ctx, b.processor, did, collection, record)
359359+}
360360+361361+// processRecordWith processes a single record using the given processor.
362362+// This allows backfillRepo to use a transactional processor while other callers use the default.
363363+func (b *BackfillWorker) processRecordWith(ctx context.Context, proc *Processor, did, collection string, record *atproto.Record) error {
340364 rkey := extractRkeyFromURI(record.URI)
341365342366 // For sailor profile collection, we need to pass the queryCaptainFn
···346370 queryCaptainFn = b.queryCaptainRecordWrapper
347371 }
348372349349- return b.processor.ProcessRecord(ctx, did, collection, rkey, record.Value, false, queryCaptainFn)
373373+ return proc.ProcessRecord(ctx, did, collection, rkey, record.Value, false, queryCaptainFn)
350374}
351375352376// queryCaptainRecordWrapper wraps queryCaptainRecord with backfill-specific logic
+2-3
pkg/appview/jetstream/processor.go
···2233import (
44 "context"
55- "database/sql"
65 "encoding/json"
76 "fmt"
87 "log/slog"
···1817// Processor handles shared database operations for both Worker (live) and Backfill (sync)
1918// This eliminates code duplication between the two data ingestion paths
2019type Processor struct {
2121- db *sql.DB
2020+ db db.DBTX
2221 userCache *UserCache // Optional - enabled for Worker, disabled for Backfill
2322 statsCache *StatsCache // In-memory cache for per-hold stats aggregation
2423 useCache bool
···2827// NewProcessor creates a new shared processor
2928// useCache: true for Worker (live streaming), false for Backfill (batch processing)
3029// statsCache: shared stats cache for aggregating across holds (nil to skip stats processing)
3131-func NewProcessor(database *sql.DB, useCache bool, statsCache *StatsCache) *Processor {
3030+func NewProcessor(database db.DBTX, useCache bool, statsCache *StatsCache) *Processor {
3231 // Create lexicon catalog for debug validation logging
3332 dir := identity.DefaultDirectory()
3433 catalog := lexicon.NewResolvingCatalog()