A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

optimize queries for admin panel

+324 -177
hold

This is a binary file and will not be displayed.

+1 -3
pkg/appview/db/hold_store.go
··· 290 290 WHEN h.owner_did = ?1 THEN 'owner' 291 291 WHEN c.member_did IS NOT NULL THEN 'crew' 292 292 WHEN h.allow_all_crew = 1 THEN 'eligible' 293 - WHEN h.public = 1 THEN 'public' 294 293 ELSE 'none' 295 294 END as membership, 296 295 c.permissions 297 296 FROM hold_captain_records h 298 297 LEFT JOIN hold_crew_members c ON h.hold_did = c.hold_did AND c.member_did = ?1 299 - WHERE h.public = 1 300 - OR h.allow_all_crew = 1 298 + WHERE h.allow_all_crew = 1 301 299 OR h.owner_did = ?1 302 300 OR c.member_did IS NOT NULL 303 301 ORDER BY
+3 -5
pkg/appview/handlers/settings.go
··· 58 58 slog.Debug("Fetched profile", "component", "settings", "did", user.DID, "default_hold", profile.DefaultHold) 59 59 60 60 // Get available holds for dropdown 61 - var ownedHolds, crewHolds, eligibleHolds, publicHolds []HoldDisplay 61 + var ownedHolds, crewHolds, eligibleHolds []HoldDisplay 62 62 holdDataMap := make(map[string]HoldDisplay) 63 63 64 64 if h.DB != nil { ··· 93 93 crewHolds = append(crewHolds, display) 94 94 case "eligible": 95 95 eligibleHolds = append(eligibleHolds, display) 96 - case "public": 97 - publicHolds = append(publicHolds, display) 98 96 } 99 97 } 100 98 } ··· 134 132 CurrentHoldDID string 135 133 CurrentHoldDisplay string 136 134 ShowCurrentHold bool 135 + AppViewDefaultHoldDID string 137 136 AppViewDefaultHoldDisplay string 138 137 AppViewDefaultRegion string 139 138 OwnedHolds []HoldDisplay 140 139 CrewHolds []HoldDisplay 141 140 EligibleHolds []HoldDisplay 142 - PublicHolds []HoldDisplay 143 141 HoldDataJSON template.JS 144 142 }{ 145 143 PageData: NewPageData(r, &h.BaseUIHandler), ··· 147 145 CurrentHoldDID: profile.DefaultHold, 148 146 CurrentHoldDisplay: deriveDisplayName(profile.DefaultHold), 149 147 ShowCurrentHold: showCurrentHold, 148 + AppViewDefaultHoldDID: h.DefaultHoldDID, 150 149 AppViewDefaultHoldDisplay: appViewDefaultDisplay, 151 150 AppViewDefaultRegion: appViewDefaultRegion, 152 151 OwnedHolds: ownedHolds, 153 152 CrewHolds: crewHolds, 154 153 EligibleHolds: eligibleHolds, 155 - PublicHolds: publicHolds, 156 154 HoldDataJSON: template.JS(holdDataJSON), 157 155 } 158 156
+1 -10
pkg/appview/templates/pages/settings.html
··· 142 142 <span class="label-text">Storage Hold</span> 143 143 </label> 144 144 <select id="default-hold" name="hold_did" class="select select-bordered w-full" autocomplete="off"> 145 - <option value=""{{ if eq .CurrentHoldDID "" }} selected{{ end }}>AppView Default ({{ .AppViewDefaultHoldDisplay }}{{ if .AppViewDefaultRegion }}, {{ .AppViewDefaultRegion }}{{ end }})</option> 145 + <option value="{{ .AppViewDefaultHoldDID }}"{{ if or (eq .CurrentHoldDID "") (eq .CurrentHoldDID .AppViewDefaultHoldDID) }} selected{{ end }}>AppView Default ({{ .AppViewDefaultHoldDisplay }}{{ if .AppViewDefaultRegion }}, {{ .AppViewDefaultRegion }}{{ end }})</option> 146 146 147 147 {{ if .ShowCurrentHold }} 148 148 <option value="{{ .CurrentHoldDID }}" selected>Current ({{ .CurrentHoldDisplay }})</option> ··· 178 178 </optgroup> 179 179 {{ end }} 180 180 181 - {{ if .PublicHolds }} 182 - <optgroup label="Public Holds"> 183 - {{ range .PublicHolds }} 184 - <option value="{{ .DID }}" {{ if eq $.CurrentHoldDID .DID }}selected{{ end }}> 185 - {{ .DisplayName }}{{ if .Region }} ({{ .Region }}){{ end }} 186 - </option> 187 - {{ end }} 188 - </optgroup> 189 - {{ end }} 190 181 </select> 191 182 <p class="text-sm text-base-content/60 mt-1">Your images will be stored on the selected hold</p> 192 183 </fieldset>
+15 -26
pkg/hold/admin/handlers.go
··· 93 93 return 94 94 } 95 95 96 - // Calculate total storage by summing quota for all crew members 96 + // Calculate total storage with a single bulk query 97 97 var totalSize int64 98 98 uniqueDigests := 0 99 99 100 - crew, err := ui.pds.ListCrewMembers(ctx) 100 + allQuotas, err := ui.pds.GetAllUserQuotas(ctx) 101 101 if err != nil { 102 - slog.Warn("Failed to list crew for stats", "error", err) 102 + slog.Warn("Failed to get all user quotas", "error", err) 103 103 } else { 104 - // Get usage for each crew member 105 - for _, member := range crew { 106 - quotaStats, err := ui.pds.GetQuotaForUser(ctx, member.Record.Member) 107 - if err != nil { 108 - continue 109 - } 110 - totalSize += quotaStats.TotalSize 111 - uniqueDigests += quotaStats.UniqueBlobs 104 + for _, q := range allQuotas { 105 + totalSize += q.TotalSize 106 + uniqueDigests += q.UniqueBlobs 112 107 } 113 108 } 114 109 ··· 152 147 } 153 148 } 154 149 155 - // Get all crew members and their usage 156 - crew, err := ui.pds.ListCrewMembers(ctx) 150 + // Get all user quotas in a single bulk query 151 + allQuotas, err := ui.pds.GetAllUserQuotas(ctx) 157 152 if err != nil { 158 - slog.Error("Failed to list crew members for top users", "error", err) 153 + slog.Error("Failed to get all user quotas", "error", err) 159 154 http.Error(w, "Failed to load top users", http.StatusInternalServerError) 160 155 return 161 156 } 162 157 163 158 var users []UserUsage 164 - for _, member := range crew { 165 - quotaStats, err := ui.pds.GetQuotaForUser(ctx, member.Record.Member) 166 - if err != nil { 167 - slog.Warn("Failed to get quota for user", "did", member.Record.Member, "error", err) 168 - continue 169 - } 170 - 159 + for did, q := range allQuotas { 171 160 users = append(users, UserUsage{ 172 - DID: member.Record.Member, 173 - Handle: resolveHandle(ctx, member.Record.Member), 174 - Usage: quotaStats.TotalSize, 175 - UsageHuman: formatHumanBytes(quotaStats.TotalSize), 176 - BlobCount: quotaStats.UniqueBlobs, 161 + DID: did, 162 + Handle: resolveHandle(ctx, did), 163 + Usage: q.TotalSize, 164 + UsageHuman: formatHumanBytes(q.TotalSize), 165 + BlobCount: q.UniqueBlobs, 177 166 }) 178 167 } 179 168
+13 -10
pkg/hold/admin/handlers_crew.go
··· 9 9 "time" 10 10 11 11 "atcr.io/pkg/atproto" 12 + "atcr.io/pkg/hold/pds" 12 13 "github.com/go-chi/chi/v5" 13 14 ) 14 15 ··· 56 57 return nil, err 57 58 } 58 59 59 - userUsage := make(map[string]int64) 60 - for _, member := range crew { 61 - quotaStats, err := ui.pds.GetQuotaForUser(ctx, member.Record.Member) 62 - if err != nil { 63 - slog.Warn("Failed to get quota for crew member", "did", member.Record.Member, "error", err) 64 - continue 65 - } 66 - userUsage[member.Record.Member] = quotaStats.TotalSize 60 + // Single bulk query for all user quotas 61 + allQuotas, err := ui.pds.GetAllUserQuotas(ctx) 62 + if err != nil { 63 + slog.Warn("Failed to get all user quotas for crew views", "error", err) 64 + allQuotas = make(map[string]*pds.QuotaStats) 67 65 } 68 66 69 67 defaultTier := "default" ··· 88 86 AddedAt: parseTime(member.Record.AddedAt), 89 87 } 90 88 89 + usage := int64(0) 90 + if q, ok := allQuotas[member.Record.Member]; ok { 91 + usage = q.TotalSize 92 + } 93 + 91 94 if ui.quotaMgr != nil && ui.quotaMgr.IsEnabled() { 92 95 if limit := ui.quotaMgr.GetTierLimit(tier); limit != nil { 93 96 view.TierLimit = formatHumanBytes(*limit) 94 97 if *limit > 0 { 95 - view.UsagePercent = int(float64(userUsage[view.DID]) / float64(*limit) * 100) 98 + view.UsagePercent = int(float64(usage) / float64(*limit) * 100) 96 99 } 97 100 } else { 98 101 view.TierLimit = "Unlimited" ··· 101 104 view.TierLimit = "Unlimited" 102 105 } 103 106 104 - view.CurrentUsage = userUsage[view.DID] 107 + view.CurrentUsage = usage 105 108 view.UsageHuman = formatHumanBytes(view.CurrentUsage) 106 109 107 110 crewViews = append(crewViews, view)
+2 -2
pkg/hold/pds/delete_test.go
··· 215 215 216 216 // Index the record 217 217 if pds.recordsIndex != nil { 218 - err = pds.recordsIndex.IndexRecord(atproto.BskyPostCollection, rkey, "testcid", aliceDID) 218 + err = pds.recordsIndex.IndexRecord(atproto.BskyPostCollection, rkey, "testcid", aliceDID, "", 0) 219 219 if err != nil { 220 220 t.Fatalf("Failed to index test post: %v", err) 221 221 } ··· 304 304 305 305 // Index the record 306 306 if pds.recordsIndex != nil { 307 - err = pds.recordsIndex.IndexRecord(atproto.BskyPostCollection, rkey, "testcid", aliceDID) 307 + err = pds.recordsIndex.IndexRecord(atproto.BskyPostCollection, rkey, "testcid", aliceDID, "", 0) 308 308 if err != nil { 309 309 t.Fatalf("Failed to index test post: %v", err) 310 310 }
+27 -87
pkg/hold/pds/layer.go
··· 89 89 Tier string `json:"tier,omitempty"` // quota tier (e.g., 'deckhand', 'bosun', 'quartermaster') 90 90 } 91 91 92 - // GetQuotaForUser calculates storage quota for a specific user 93 - // It iterates through all layer records, filters by userDid, deduplicates by digest, 94 - // and sums the sizes of unique blobs. 92 + // GetQuotaForUser calculates storage quota for a specific user. 93 + // Uses SQL aggregation over the denormalized digest/size columns in the records index. 95 94 func (p *HoldPDS) GetQuotaForUser(ctx context.Context, userDID string) (*QuotaStats, error) { 96 95 if p.recordsIndex == nil { 97 96 return nil, fmt.Errorf("records index not available") 98 97 } 99 98 100 - // Get session for reading record data 101 - session, err := p.carstore.ReadOnlySession(p.uid) 99 + uniqueBlobs, totalSize, err := p.recordsIndex.QuotaForDID(atproto.LayerCollection, userDID) 102 100 if err != nil { 103 - return nil, fmt.Errorf("failed to create session: %w", err) 101 + return nil, fmt.Errorf("failed to query quota: %w", err) 104 102 } 105 103 106 - head, err := p.carstore.GetUserRepoHead(ctx, p.uid) 107 - if err != nil { 108 - return nil, fmt.Errorf("failed to get repo head: %w", err) 109 - } 104 + return &QuotaStats{ 105 + UserDID: userDID, 106 + UniqueBlobs: uniqueBlobs, 107 + TotalSize: totalSize, 108 + }, nil 109 + } 110 110 111 - if !head.Defined() { 112 - // Empty repo - return zero stats 113 - return &QuotaStats{UserDID: userDID}, nil 111 + // GetAllUserQuotas returns quota stats for all users in a single SQL query. 112 + // Used by admin endpoints to avoid N+1 per-user quota lookups. 113 + func (p *HoldPDS) GetAllUserQuotas(ctx context.Context) (map[string]*QuotaStats, error) { 114 + if p.recordsIndex == nil { 115 + return nil, fmt.Errorf("records index not available") 114 116 } 115 117 116 - repoHandle, err := repo.OpenRepo(ctx, session, head) 118 + quotas, err := p.recordsIndex.QuotasByDID(atproto.LayerCollection) 117 119 if err != nil { 118 - return nil, fmt.Errorf("failed to open repo: %w", err) 120 + return nil, fmt.Errorf("failed to query all quotas: %w", err) 119 121 } 120 122 121 - // Track unique digests and their sizes 122 - digestSizes := make(map[string]int64) 123 - 124 - // Iterate all layer records via the index 125 - cursor := "" 126 - batchSize := 1000 // Process in batches 127 - 128 - for { 129 - records, nextCursor, err := p.recordsIndex.ListRecords(atproto.LayerCollection, batchSize, cursor, true) 130 - if err != nil { 131 - return nil, fmt.Errorf("failed to list layer records: %w", err) 132 - } 133 - 134 - for _, rec := range records { 135 - // Construct record path and get the record data 136 - recordPath := rec.Collection + "/" + rec.Rkey 137 - 138 - _, recBytes, err := repoHandle.GetRecordBytes(ctx, recordPath) 139 - if err != nil { 140 - // Skip records we can't read 141 - continue 142 - } 143 - 144 - // Decode the layer record 145 - recordValue, err := lexutil.CborDecodeValue(*recBytes) 146 - if err != nil { 147 - continue 148 - } 149 - 150 - layerRecord, ok := recordValue.(*atproto.LayerRecord) 151 - if !ok { 152 - continue 153 - } 154 - 155 - // Filter by userDID 156 - if layerRecord.UserDID != userDID { 157 - continue 158 - } 159 - 160 - // Deduplicate by digest - keep the size (could be different pushes of same blob) 161 - // Store the size - we only count each unique digest once 162 - if _, exists := digestSizes[layerRecord.Digest]; !exists { 163 - digestSizes[layerRecord.Digest] = layerRecord.Size 164 - } 123 + result := make(map[string]*QuotaStats, len(quotas)) 124 + for did, q := range quotas { 125 + result[did] = &QuotaStats{ 126 + UserDID: did, 127 + UniqueBlobs: q.UniqueBlobs, 128 + TotalSize: q.TotalSize, 165 129 } 166 - 167 - if nextCursor == "" { 168 - break 169 - } 170 - cursor = nextCursor 171 130 } 172 - 173 - // Calculate totals 174 - var totalSize int64 175 - for _, size := range digestSizes { 176 - totalSize += size 177 - } 178 - 179 - return &QuotaStats{ 180 - UserDID: userDID, 181 - UniqueBlobs: len(digestSizes), 182 - TotalSize: totalSize, 183 - }, nil 131 + return result, nil 184 132 } 185 133 186 134 // GetQuotaForUserWithTier calculates quota with tier-aware limits ··· 262 210 263 211 var records []*atproto.LayerRecord 264 212 265 - // Iterate all layer records via the index 213 + // Iterate layer records for this user via the index (filtered by DID in SQL) 266 214 cursor := "" 267 - batchSize := 1000 // Process in batches 215 + batchSize := 1000 268 216 269 217 for { 270 - indexRecords, nextCursor, err := p.recordsIndex.ListRecords(atproto.LayerCollection, batchSize, cursor, true) 218 + indexRecords, nextCursor, err := p.recordsIndex.ListRecordsByDID(atproto.LayerCollection, userDID, batchSize, cursor) 271 219 if err != nil { 272 220 return nil, fmt.Errorf("failed to list layer records: %w", err) 273 221 } 274 222 275 223 for _, rec := range indexRecords { 276 - // Construct record path and get the record data 277 224 recordPath := rec.Collection + "/" + rec.Rkey 278 225 279 226 _, recBytes, err := repoHandle.GetRecordBytes(ctx, recordPath) 280 227 if err != nil { 281 - // Skip records we can't read 282 228 continue 283 229 } 284 230 285 - // Decode the layer record 286 231 recordValue, err := lexutil.CborDecodeValue(*recBytes) 287 232 if err != nil { 288 233 continue ··· 290 235 291 236 layerRecord, ok := recordValue.(*atproto.LayerRecord) 292 237 if !ok { 293 - continue 294 - } 295 - 296 - // Filter by userDID 297 - if layerRecord.UserDID != userDID { 298 238 continue 299 239 } 300 240
+94 -13
pkg/hold/pds/records.go
··· 36 36 rkey TEXT NOT NULL, 37 37 cid TEXT NOT NULL, 38 38 did TEXT, 39 + digest TEXT, 40 + size INTEGER, 39 41 PRIMARY KEY (collection, rkey) 40 42 ); 41 43 CREATE INDEX IF NOT EXISTS idx_records_collection_rkey ON records(collection, rkey); ··· 54 56 return nil, fmt.Errorf("failed to open records database: %w", err) 55 57 } 56 58 57 - // Check if table exists and has the did column 59 + // Check if table exists and has required columns 58 60 needsRebuild := false 59 61 var tableName string 60 62 err = db.QueryRow(`SELECT name FROM sqlite_master WHERE type='table' AND name='records'`).Scan(&tableName) 61 63 if err == nil { 62 - // Table exists, check for did column 63 64 var colCount int 65 + // Check for did column 64 66 err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('records') WHERE name='did'`).Scan(&colCount) 65 67 if err != nil || colCount == 0 { 66 68 needsRebuild = true 67 69 slog.Info("Records index schema outdated, rebuilding with did column") 68 70 } 71 + // Check for digest column (added for SQL-based quota queries) 72 + if !needsRebuild { 73 + err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('records') WHERE name='digest'`).Scan(&colCount) 74 + if err != nil || colCount == 0 { 75 + needsRebuild = true 76 + slog.Info("Records index schema outdated, rebuilding with digest/size columns") 77 + } 78 + } 69 79 } 70 80 71 81 if needsRebuild { ··· 91 101 // NewRecordsIndexWithDB creates a records index using an existing *sql.DB connection. 92 102 // The caller is responsible for the DB lifecycle. 93 103 func NewRecordsIndexWithDB(db *sql.DB) (*RecordsIndex, error) { 94 - // Check if table exists and has the did column 104 + // Check if table exists and has required columns 95 105 needsRebuild := false 96 106 var tableName string 97 107 err := db.QueryRow(`SELECT name FROM sqlite_master WHERE type='table' AND name='records'`).Scan(&tableName) 98 108 if err == nil { 99 109 var colCount int 110 + // Check for did column 100 111 err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('records') WHERE name='did'`).Scan(&colCount) 101 112 if err != nil || colCount == 0 { 102 113 needsRebuild = true 103 114 slog.Info("Records index schema outdated, rebuilding with did column") 104 115 } 116 + // Check for digest column (added for SQL-based quota queries) 117 + if !needsRebuild { 118 + err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('records') WHERE name='digest'`).Scan(&colCount) 119 + if err != nil || colCount == 0 { 120 + needsRebuild = true 121 + slog.Info("Records index schema outdated, rebuilding with digest/size columns") 122 + } 123 + } 105 124 } 106 125 107 126 if needsRebuild { ··· 129 148 } 130 149 131 150 // IndexRecord adds or updates a record in the index 132 - // did parameter is optional - pass empty string if not applicable 133 - func (ri *RecordsIndex) IndexRecord(collection, rkey, cidStr, did string) error { 151 + // did, digest, size parameters are optional - pass empty/zero if not applicable 152 + func (ri *RecordsIndex) IndexRecord(collection, rkey, cidStr, did, digest string, size int64) error { 134 153 _, err := ri.db.Exec(` 135 - INSERT OR REPLACE INTO records (collection, rkey, cid, did) 136 - VALUES (?, ?, ?, ?) 137 - `, collection, rkey, cidStr, sql.NullString{String: did, Valid: did != ""}) 154 + INSERT OR REPLACE INTO records (collection, rkey, cid, did, digest, size) 155 + VALUES (?, ?, ?, ?, ?, ?) 156 + `, collection, rkey, cidStr, 157 + sql.NullString{String: did, Valid: did != ""}, 158 + sql.NullString{String: digest, Valid: digest != ""}, 159 + sql.NullInt64{Int64: size, Valid: size > 0}) 138 160 return err 139 161 } 140 162 ··· 328 350 defer tx.Rollback() 329 351 330 352 stmt, err := tx.Prepare(` 331 - INSERT OR REPLACE INTO records (collection, rkey, cid, did) 332 - VALUES (?, ?, ?, ?) 353 + INSERT OR REPLACE INTO records (collection, rkey, cid, did, digest, size) 354 + VALUES (?, ?, ?, ?, ?, ?) 333 355 `) 334 356 if err != nil { 335 357 return fmt.Errorf("failed to prepare statement: %w", err) ··· 345 367 } 346 368 collection, rkey := parts[0], parts[1] 347 369 348 - // Extract DID from record content based on collection type 349 - var did string 370 + // Extract fields from record content based on collection type 371 + var did, digest string 372 + var size int64 350 373 _, recBytes, err := repoHandle.GetRecordBytes(ctx, key) 351 374 if err == nil && recBytes != nil { 352 375 did = extractDIDFromRecord(collection, *recBytes) 376 + digest, size = extractLayerFieldsFromRecord(collection, *recBytes) 353 377 } 354 378 355 - _, err = stmt.Exec(collection, rkey, c.String(), sql.NullString{String: did, Valid: did != ""}) 379 + _, err = stmt.Exec(collection, rkey, c.String(), 380 + sql.NullString{String: did, Valid: did != ""}, 381 + sql.NullString{String: digest, Valid: digest != ""}, 382 + sql.NullInt64{Int64: size, Valid: size > 0}) 356 383 if err != nil { 357 384 return fmt.Errorf("failed to index record %s: %w", key, err) 358 385 } ··· 390 417 } 391 418 } 392 419 return out 420 + } 421 + 422 + // QuotaForDID returns unique blob count and total size for a single DID in a collection. 423 + // Uses SQL aggregation over the denormalized digest/size columns. 424 + func (ri *RecordsIndex) QuotaForDID(collection, did string) (uniqueBlobs int, totalSize int64, err error) { 425 + err = ri.db.QueryRow(` 426 + SELECT COUNT(*), COALESCE(SUM(size), 0) 427 + FROM (SELECT DISTINCT digest, size FROM records WHERE collection = ? AND did = ? AND digest IS NOT NULL) 428 + `, collection, did).Scan(&uniqueBlobs, &totalSize) 429 + return 430 + } 431 + 432 + // QuotasByDID returns unique blob count and total size grouped by DID. 433 + // Single query replaces N individual quota lookups. 434 + func (ri *RecordsIndex) QuotasByDID(collection string) (map[string]QuotaResult, error) { 435 + rows, err := ri.db.Query(` 436 + SELECT did, COUNT(*), COALESCE(SUM(size), 0) 437 + FROM (SELECT DISTINCT did, digest, size FROM records WHERE collection = ? AND did IS NOT NULL AND digest IS NOT NULL) 438 + GROUP BY did 439 + `, collection) 440 + if err != nil { 441 + return nil, fmt.Errorf("failed to query quotas by DID: %w", err) 442 + } 443 + defer rows.Close() 444 + 445 + result := make(map[string]QuotaResult) 446 + for rows.Next() { 447 + var did string 448 + var qr QuotaResult 449 + if err := rows.Scan(&did, &qr.UniqueBlobs, &qr.TotalSize); err != nil { 450 + return nil, fmt.Errorf("failed to scan quota row: %w", err) 451 + } 452 + result[did] = qr 453 + } 454 + return result, rows.Err() 455 + } 456 + 457 + // QuotaResult holds aggregated quota data from SQL queries 458 + type QuotaResult struct { 459 + UniqueBlobs int 460 + TotalSize int64 461 + } 462 + 463 + // extractLayerFieldsFromRecord extracts digest and size from layer records. 464 + // Returns empty/zero for non-layer records. 465 + func extractLayerFieldsFromRecord(collection string, recBytes []byte) (string, int64) { 466 + if collection != atproto.LayerCollection { 467 + return "", 0 468 + } 469 + var rec atproto.LayerRecord 470 + if err := rec.UnmarshalCBOR(bytes.NewReader(recBytes)); err != nil { 471 + return "", 0 472 + } 473 + return rec.Digest, rec.Size 393 474 } 394 475 395 476 // extractDIDFromRecord extracts the associated DID from a record based on its collection type
+154 -19
pkg/hold/pds/records_test.go
··· 50 50 defer ri.Close() 51 51 52 52 // Index a record 53 - err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123", "") 53 + err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123", "", "", 0) 54 54 if err != nil { 55 55 t.Fatalf("IndexRecord() error = %v", err) 56 56 } ··· 75 75 defer ri.Close() 76 76 77 77 // Index a record 78 - err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123", "") 78 + err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123", "", "", 0) 79 79 if err != nil { 80 80 t.Fatalf("IndexRecord() first call error = %v", err) 81 81 } 82 82 83 83 // Update the same record with new CID 84 - err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei456", "") 84 + err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei456", "", "", 0) 85 85 if err != nil { 86 86 t.Fatalf("IndexRecord() second call error = %v", err) 87 87 } ··· 118 118 defer ri.Close() 119 119 120 120 // Index a record 121 - err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123", "") 121 + err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123", "", "", 0) 122 122 if err != nil { 123 123 t.Fatalf("IndexRecord() error = %v", err) 124 124 } ··· 217 217 {"ccc", "cid3"}, 218 218 } 219 219 for _, r := range records { 220 - if err := ri.IndexRecord("io.atcr.hold.crew", r.rkey, r.cid, ""); err != nil { 220 + if err := ri.IndexRecord("io.atcr.hold.crew", r.rkey, r.cid, "", "", 0); err != nil { 221 221 t.Fatalf("IndexRecord() error = %v", err) 222 222 } 223 223 } ··· 248 248 // Add records with different rkeys (TIDs are lexicographically ordered by time) 249 249 rkeys := []string{"3m3aaaaaaaaa", "3m3bbbbbbbbb", "3m3ccccccccc"} 250 250 for _, rkey := range rkeys { 251 - if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, ""); err != nil { 251 + if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, "", "", 0); err != nil { 252 252 t.Fatalf("IndexRecord() error = %v", err) 253 253 } 254 254 } ··· 286 286 // Add records 287 287 rkeys := []string{"3m3aaaaaaaaa", "3m3bbbbbbbbb", "3m3ccccccccc"} 288 288 for _, rkey := range rkeys { 289 - if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, ""); err != nil { 289 + if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, "", "", 0); err != nil { 290 290 t.Fatalf("IndexRecord() error = %v", err) 291 291 } 292 292 } ··· 324 324 // Add 5 records 325 325 for i := range 5 { 326 326 rkey := string(rune('a' + i)) 327 - if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, ""); err != nil { 327 + if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, "", "", 0); err != nil { 328 328 t.Fatalf("IndexRecord() error = %v", err) 329 329 } 330 330 } ··· 355 355 // Add 5 records 356 356 rkeys := []string{"a", "b", "c", "d", "e"} 357 357 for _, rkey := range rkeys { 358 - if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, ""); err != nil { 358 + if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, "", "", 0); err != nil { 359 359 t.Fatalf("IndexRecord() error = %v", err) 360 360 } 361 361 } ··· 430 430 // Add 5 records 431 431 rkeys := []string{"a", "b", "c", "d", "e"} 432 432 for _, rkey := range rkeys { 433 - if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, ""); err != nil { 433 + if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, "", "", 0); err != nil { 434 434 t.Fatalf("IndexRecord() error = %v", err) 435 435 } 436 436 } ··· 474 474 475 475 // Add records to two collections 476 476 for i := range 3 { 477 - ri.IndexRecord("io.atcr.hold.crew", string(rune('a'+i)), "cid1", "") 477 + ri.IndexRecord("io.atcr.hold.crew", string(rune('a'+i)), "cid1", "", "", 0) 478 478 } 479 479 for i := range 5 { 480 - ri.IndexRecord("io.atcr.hold.captain", string(rune('a'+i)), "cid2", "") 480 + ri.IndexRecord("io.atcr.hold.captain", string(rune('a'+i)), "cid2", "", "", 0) 481 481 } 482 482 483 483 // Count crew ··· 527 527 defer ri.Close() 528 528 529 529 // Add records to multiple collections 530 - ri.IndexRecord("io.atcr.hold.crew", "a", "cid1", "") 531 - ri.IndexRecord("io.atcr.hold.crew", "b", "cid2", "") 532 - ri.IndexRecord("io.atcr.hold.captain", "self", "cid3", "") 533 - ri.IndexRecord("io.atcr.manifest", "abc123", "cid4", "") 530 + ri.IndexRecord("io.atcr.hold.crew", "a", "cid1", "", "", 0) 531 + ri.IndexRecord("io.atcr.hold.crew", "b", "cid2", "", "", 0) 532 + ri.IndexRecord("io.atcr.hold.captain", "self", "cid3", "", "", 0) 533 + ri.IndexRecord("io.atcr.manifest", "abc123", "cid4", "", "", 0) 534 534 535 535 count, err := ri.TotalCount() 536 536 if err != nil { ··· 581 581 defer ri.Close() 582 582 583 583 // Add records to different collections with same rkeys 584 - ri.IndexRecord("io.atcr.hold.crew", "abc", "cid-crew", "") 585 - ri.IndexRecord("io.atcr.hold.captain", "abc", "cid-captain", "") 586 - ri.IndexRecord("io.atcr.manifest", "abc", "cid-manifest", "") 584 + ri.IndexRecord("io.atcr.hold.crew", "abc", "cid-crew", "", "", 0) 585 + ri.IndexRecord("io.atcr.hold.captain", "abc", "cid-captain", "", "", 0) 586 + ri.IndexRecord("io.atcr.manifest", "abc", "cid-manifest", "", "", 0) 587 587 588 588 // Listing should only return records from requested collection 589 589 records, _, err := ri.ListRecords("io.atcr.hold.crew", 10, "", false) ··· 605 605 t.Errorf("Expected captain count 1 after deleting crew, got %d", count) 606 606 } 607 607 } 608 + 609 + // TestRecordsIndex_QuotaForDID tests single-user quota aggregation 610 + func TestRecordsIndex_QuotaForDID(t *testing.T) { 611 + tmpDir := t.TempDir() 612 + ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db")) 613 + if err != nil { 614 + t.Fatalf("NewRecordsIndex() error = %v", err) 615 + } 616 + defer ri.Close() 617 + 618 + // Add layer records for two users, with some duplicate digests 619 + ri.IndexRecord("io.atcr.hold.layer", "r1", "cid1", "did:plc:alice", "sha256:aaa", 100) 620 + ri.IndexRecord("io.atcr.hold.layer", "r2", "cid2", "did:plc:alice", "sha256:bbb", 200) 621 + ri.IndexRecord("io.atcr.hold.layer", "r3", "cid3", "did:plc:alice", "sha256:aaa", 100) // duplicate digest 622 + ri.IndexRecord("io.atcr.hold.layer", "r4", "cid4", "did:plc:bob", "sha256:ccc", 300) 623 + 624 + // Alice should have 2 unique blobs, total 300 bytes 625 + uniqueBlobs, totalSize, err := ri.QuotaForDID("io.atcr.hold.layer", "did:plc:alice") 626 + if err != nil { 627 + t.Fatalf("QuotaForDID() error = %v", err) 628 + } 629 + if uniqueBlobs != 2 { 630 + t.Errorf("Expected 2 unique blobs for alice, got %d", uniqueBlobs) 631 + } 632 + if totalSize != 300 { 633 + t.Errorf("Expected total size 300 for alice, got %d", totalSize) 634 + } 635 + 636 + // Bob should have 1 unique blob, total 300 bytes 637 + uniqueBlobs, totalSize, err = ri.QuotaForDID("io.atcr.hold.layer", "did:plc:bob") 638 + if err != nil { 639 + t.Fatalf("QuotaForDID() error = %v", err) 640 + } 641 + if uniqueBlobs != 1 { 642 + t.Errorf("Expected 1 unique blob for bob, got %d", uniqueBlobs) 643 + } 644 + if totalSize != 300 { 645 + t.Errorf("Expected total size 300 for bob, got %d", totalSize) 646 + } 647 + 648 + // Unknown user should have 0 649 + uniqueBlobs, totalSize, err = ri.QuotaForDID("io.atcr.hold.layer", "did:plc:unknown") 650 + if err != nil { 651 + t.Fatalf("QuotaForDID() error = %v", err) 652 + } 653 + if uniqueBlobs != 0 || totalSize != 0 { 654 + t.Errorf("Expected 0/0 for unknown user, got %d/%d", uniqueBlobs, totalSize) 655 + } 656 + } 657 + 658 + // TestRecordsIndex_QuotasByDID tests bulk quota aggregation 659 + func TestRecordsIndex_QuotasByDID(t *testing.T) { 660 + tmpDir := t.TempDir() 661 + ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db")) 662 + if err != nil { 663 + t.Fatalf("NewRecordsIndex() error = %v", err) 664 + } 665 + defer ri.Close() 666 + 667 + // Add layer records for multiple users 668 + ri.IndexRecord("io.atcr.hold.layer", "r1", "cid1", "did:plc:alice", "sha256:aaa", 100) 669 + ri.IndexRecord("io.atcr.hold.layer", "r2", "cid2", "did:plc:alice", "sha256:bbb", 200) 670 + ri.IndexRecord("io.atcr.hold.layer", "r3", "cid3", "did:plc:bob", "sha256:ccc", 500) 671 + // Non-layer record should be excluded 672 + ri.IndexRecord("io.atcr.hold.crew", "r4", "cid4", "did:plc:alice", "", 0) 673 + 674 + quotas, err := ri.QuotasByDID("io.atcr.hold.layer") 675 + if err != nil { 676 + t.Fatalf("QuotasByDID() error = %v", err) 677 + } 678 + 679 + if len(quotas) != 2 { 680 + t.Fatalf("Expected 2 users in quotas, got %d", len(quotas)) 681 + } 682 + 683 + alice := quotas["did:plc:alice"] 684 + if alice.UniqueBlobs != 2 { 685 + t.Errorf("Expected 2 unique blobs for alice, got %d", alice.UniqueBlobs) 686 + } 687 + if alice.TotalSize != 300 { 688 + t.Errorf("Expected total size 300 for alice, got %d", alice.TotalSize) 689 + } 690 + 691 + bob := quotas["did:plc:bob"] 692 + if bob.UniqueBlobs != 1 { 693 + t.Errorf("Expected 1 unique blob for bob, got %d", bob.UniqueBlobs) 694 + } 695 + if bob.TotalSize != 500 { 696 + t.Errorf("Expected total size 500 for bob, got %d", bob.TotalSize) 697 + } 698 + } 699 + 700 + // TestRecordsIndex_QuotasByDID_Empty tests bulk quota with no records 701 + func TestRecordsIndex_QuotasByDID_Empty(t *testing.T) { 702 + tmpDir := t.TempDir() 703 + ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db")) 704 + if err != nil { 705 + t.Fatalf("NewRecordsIndex() error = %v", err) 706 + } 707 + defer ri.Close() 708 + 709 + quotas, err := ri.QuotasByDID("io.atcr.hold.layer") 710 + if err != nil { 711 + t.Fatalf("QuotasByDID() error = %v", err) 712 + } 713 + if len(quotas) != 0 { 714 + t.Errorf("Expected empty quotas map, got %d entries", len(quotas)) 715 + } 716 + } 717 + 718 + // TestRecordsIndex_QuotaForDID_IgnoresNullDigest tests that records without digest are excluded 719 + func TestRecordsIndex_QuotaForDID_IgnoresNullDigest(t *testing.T) { 720 + tmpDir := t.TempDir() 721 + ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db")) 722 + if err != nil { 723 + t.Fatalf("NewRecordsIndex() error = %v", err) 724 + } 725 + defer ri.Close() 726 + 727 + // Record with digest+size 728 + ri.IndexRecord("io.atcr.hold.layer", "r1", "cid1", "did:plc:alice", "sha256:aaa", 100) 729 + // Record without digest (e.g. old data before migration) 730 + ri.IndexRecord("io.atcr.hold.layer", "r2", "cid2", "did:plc:alice", "", 0) 731 + 732 + uniqueBlobs, totalSize, err := ri.QuotaForDID("io.atcr.hold.layer", "did:plc:alice") 733 + if err != nil { 734 + t.Fatalf("QuotaForDID() error = %v", err) 735 + } 736 + if uniqueBlobs != 1 { 737 + t.Errorf("Expected 1 unique blob (ignoring null digest), got %d", uniqueBlobs) 738 + } 739 + if totalSize != 100 { 740 + t.Errorf("Expected total size 100, got %d", totalSize) 741 + } 742 + }
+14 -2
pkg/hold/pds/server.go
··· 411 411 if op.RecCid != nil { 412 412 cidStr = op.RecCid.String() 413 413 } 414 - // Extract DID from record based on collection type 414 + // Extract fields from record based on collection type 415 415 did := extractDIDFromOp(op) 416 - if err := p.recordsIndex.IndexRecord(op.Collection, op.Rkey, cidStr, did); err != nil { 416 + digest, size := extractLayerFieldsFromOp(op) 417 + if err := p.recordsIndex.IndexRecord(op.Collection, op.Rkey, cidStr, did, digest, size); err != nil { 417 418 slog.Warn("Failed to index record", "collection", op.Collection, "rkey", op.Rkey, "error", err) 418 419 } 419 420 case EvtKindDeleteRecord: ··· 452 453 } 453 454 } 454 455 return "" 456 + } 457 + 458 + // extractLayerFieldsFromOp extracts digest and size from a layer record operation 459 + func extractLayerFieldsFromOp(op RepoOp) (string, int64) { 460 + if op.Record == nil || op.Collection != atproto.LayerCollection { 461 + return "", 0 462 + } 463 + if rec, ok := op.Record.(*atproto.LayerRecord); ok { 464 + return rec.Digest, rec.Size 465 + } 466 + return "", 0 455 467 } 456 468 457 469 // BackfillRecordsIndex populates the records index from existing MST data