A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

bugfixes for stars. implement migration schema

+368 -30
+1
go.mod
··· 14 14 github.com/mattn/go-sqlite3 v1.14.32 15 15 github.com/opencontainers/go-digest v1.0.0 16 16 github.com/spf13/cobra v1.8.0 17 + go.yaml.in/yaml/v4 v4.0.0-rc.2 17 18 golang.org/x/crypto v0.39.0 18 19 ) 19 20
+2
go.sum
··· 282 282 go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= 283 283 go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= 284 284 go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= 285 + go.yaml.in/yaml/v4 v4.0.0-rc.2 h1:/FrI8D64VSr4HtGIlUtlFMGsm7H7pWTbj6vOLVZcA6s= 286 + go.yaml.in/yaml/v4 v4.0.0-rc.2/go.mod h1:aZqd9kCMsGL7AuUv/m/PvWLdg5sjJsZ4oHDEnfPPfY0= 285 287 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= 286 288 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= 287 289 golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
+8
pkg/appview/db/migrations/0001_remove_star_count_from_repository_stats.yaml
··· 1 + version: 1 2 + name: remove_star_count_from_repository_stats 3 + up: | 4 + -- Drop star_count column if it exists (SQLite 3.35.0+) 5 + ALTER TABLE repository_stats DROP COLUMN IF EXISTS star_count; 6 + 7 + -- Drop the old star_count index if it exists 8 + DROP INDEX IF EXISTS idx_repository_stats_star_count;
+77
pkg/appview/db/migrations/README.md
··· 1 + # Database Migrations 2 + 3 + This directory contains database migrations for the ATCR AppView database. 4 + 5 + ## Migration Format 6 + 7 + Each migration is a YAML file with the following structure: 8 + 9 + ```yaml 10 + version: 1 11 + name: descriptive_migration_name 12 + up: | 13 + SQL commands to apply the migration 14 + ``` 15 + 16 + ## Naming Convention 17 + 18 + Migration files should be named: `{version:04d}_{name}.yaml` 19 + 20 + Examples: 21 + - `0001_remove_star_count_from_repository_stats.yaml` 22 + - `0002_add_repository_labels.yaml` 23 + - `0003_create_webhooks_table.yaml` 24 + 25 + ## Creating a New Migration 26 + 27 + 1. **Choose the next version number** - Look at existing migrations and increment by 1 28 + 2. **Create a new YAML file** with the naming convention above 29 + 3. **Write your SQL** - Use the `|` block scalar for clean multi-line SQL 30 + 4. **Use `IF EXISTS` / `IF NOT EXISTS`** where possible for idempotency 31 + 32 + ## Examples 33 + 34 + ### Simple single-statement migration: 35 + 36 + ```yaml 37 + version: 2 38 + name: add_repository_description_index 39 + up: | 40 + CREATE INDEX IF NOT EXISTS idx_manifests_description ON manifests(description); 41 + ``` 42 + 43 + ### Complex multi-statement migration: 44 + 45 + ```yaml 46 + version: 3 47 + name: create_webhooks_table 48 + up: | 49 + -- Create webhooks table 50 + CREATE TABLE IF NOT EXISTS webhooks ( 51 + id INTEGER PRIMARY KEY AUTOINCREMENT, 52 + url TEXT NOT NULL, 53 + events TEXT NOT NULL, 54 + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP 55 + ); 56 + 57 + -- Create index on URL for faster lookups 58 + CREATE INDEX IF NOT EXISTS idx_webhooks_url ON webhooks(url); 59 + 60 + -- Create index on events for filtering 61 + CREATE INDEX IF NOT EXISTS idx_webhooks_events ON webhooks(events); 62 + ``` 63 + 64 + ## How Migrations Run 65 + 66 + 1. Migrations are loaded from this directory on startup 67 + 2. Sorted by version number (ascending) 68 + 3. Each migration is checked against the `schema_migrations` table 69 + 4. Only unapplied migrations are executed 70 + 5. After successful execution, the version is recorded in `schema_migrations` 71 + 72 + ## Important Notes 73 + 74 + - **Never modify existing migrations** - Once applied, they're immutable 75 + - **Test migrations** before committing - Ensure they work on existing databases 76 + - **Version numbers must be unique** - The migration system will fail if duplicates exist 77 + - **Migrations are run automatically** on `InitDB()` - No manual intervention needed
+1 -1
pkg/appview/db/models.go
··· 81 81 type RepositoryStats struct { 82 82 DID string `json:"did"` 83 83 Repository string `json:"repository"` 84 - StarCount int `json:"star_count"` 84 + StarCount int `json:"star_count"` // Calculated from stars table, not stored 85 85 PullCount int `json:"pull_count"` 86 86 LastPull *time.Time `json:"last_pull,omitempty"` 87 87 PushCount int `json:"push_count"`
+109 -4
pkg/appview/db/queries.go
··· 767 767 var stats RepositoryStats 768 768 var lastPullStr, lastPushStr sql.NullString 769 769 770 + // Get pull/push stats from repository_stats, and star count from stars table 770 771 err := db.QueryRow(` 771 - SELECT did, repository, star_count, pull_count, last_pull, push_count, last_push 772 - FROM repository_stats 773 - WHERE did = ? AND repository = ? 774 - `, did, repository).Scan(&stats.DID, &stats.Repository, &stats.StarCount, &stats.PullCount, &lastPullStr, &stats.PushCount, &lastPushStr) 772 + SELECT 773 + COALESCE(rs.did, ?) as did, 774 + COALESCE(rs.repository, ?) as repository, 775 + (SELECT COUNT(*) FROM stars WHERE owner_did = ? AND repository = ?) as star_count, 776 + COALESCE(rs.pull_count, 0) as pull_count, 777 + rs.last_pull, 778 + COALESCE(rs.push_count, 0) as push_count, 779 + rs.last_push 780 + FROM (SELECT ? as did, ? as repository) AS placeholder 781 + LEFT JOIN repository_stats rs ON rs.did = ? AND rs.repository = ? 782 + `, did, repository, did, repository, did, repository, did, repository).Scan(&stats.DID, &stats.Repository, &stats.StarCount, &stats.PullCount, &lastPullStr, &stats.PushCount, &lastPushStr) 775 783 776 784 if err == sql.ErrNoRows { 777 785 // Return zero stats if no record exists yet ··· 838 846 WHERE did = ? AND repository = ? 839 847 `, did, repository) 840 848 return err 849 + } 850 + 851 + // UpsertStar inserts or updates a star record (idempotent) 852 + func UpsertStar(db *sql.DB, starrerDID, ownerDID, repository string, createdAt time.Time) error { 853 + _, err := db.Exec(` 854 + INSERT INTO stars (starrer_did, owner_did, repository, created_at) 855 + VALUES (?, ?, ?, ?) 856 + ON CONFLICT(starrer_did, owner_did, repository) DO UPDATE SET 857 + created_at = excluded.created_at 858 + `, starrerDID, ownerDID, repository, createdAt) 859 + return err 860 + } 861 + 862 + // DeleteStar deletes a star record 863 + func DeleteStar(db *sql.DB, starrerDID, ownerDID, repository string) error { 864 + _, err := db.Exec(` 865 + DELETE FROM stars 866 + WHERE starrer_did = ? AND owner_did = ? AND repository = ? 867 + `, starrerDID, ownerDID, repository) 868 + return err 869 + } 870 + 871 + // RebuildStarCount rebuilds the star count for a specific repository from the stars table 872 + func RebuildStarCount(db *sql.DB, ownerDID, repository string) error { 873 + _, err := db.Exec(` 874 + INSERT INTO repository_stats (did, repository, star_count) 875 + VALUES (?, ?, ( 876 + SELECT COUNT(*) FROM stars 877 + WHERE owner_did = ? AND repository = ? 878 + )) 879 + ON CONFLICT(did, repository) DO UPDATE SET 880 + star_count = ( 881 + SELECT COUNT(*) FROM stars 882 + WHERE owner_did = ? AND repository = ? 883 + ) 884 + `, ownerDID, repository, ownerDID, repository, ownerDID, repository) 885 + return err 886 + } 887 + 888 + // GetStarsForDID returns all stars created by a specific DID (for backfill reconciliation) 889 + // Returns a map of (ownerDID, repository) -> createdAt 890 + func GetStarsForDID(db *sql.DB, starrerDID string) (map[string]time.Time, error) { 891 + rows, err := db.Query(` 892 + SELECT owner_did, repository, created_at 893 + FROM stars 894 + WHERE starrer_did = ? 895 + `, starrerDID) 896 + if err != nil { 897 + return nil, err 898 + } 899 + defer rows.Close() 900 + 901 + stars := make(map[string]time.Time) 902 + for rows.Next() { 903 + var ownerDID, repository string 904 + var createdAt time.Time 905 + if err := rows.Scan(&ownerDID, &repository, &createdAt); err != nil { 906 + return nil, err 907 + } 908 + key := fmt.Sprintf("%s/%s", ownerDID, repository) 909 + stars[key] = createdAt 910 + } 911 + 912 + return stars, rows.Err() 913 + } 914 + 915 + // DeleteStarsNotInList deletes stars from the database that are not in the provided list 916 + // This is used during backfill reconciliation to remove stars that no longer exist on PDS 917 + func DeleteStarsNotInList(db *sql.DB, starrerDID string, foundStars map[string]time.Time) error { 918 + // Get current stars in DB 919 + currentStars, err := GetStarsForDID(db, starrerDID) 920 + if err != nil { 921 + return fmt.Errorf("failed to get current stars: %w", err) 922 + } 923 + 924 + // Find stars to delete (in DB but not on PDS) 925 + var toDelete []struct{ ownerDID, repository string } 926 + for key := range currentStars { 927 + if _, exists := foundStars[key]; !exists { 928 + parts := strings.SplitN(key, "/", 2) 929 + if len(parts) == 2 { 930 + toDelete = append(toDelete, struct{ ownerDID, repository string }{ 931 + ownerDID: parts[0], 932 + repository: parts[1], 933 + }) 934 + } 935 + } 936 + } 937 + 938 + // Delete orphaned stars 939 + for _, star := range toDelete { 940 + if err := DeleteStar(db, starrerDID, star.ownerDID, star.repository); err != nil { 941 + return fmt.Errorf("failed to delete star: %w", err) 942 + } 943 + } 944 + 945 + return nil 841 946 } 842 947 843 948 // IncrementPullCount increments the pull count for a repository
+122 -2
pkg/appview/db/schema.go
··· 2 2 3 3 import ( 4 4 "database/sql" 5 + "fmt" 6 + "os" 7 + "path/filepath" 8 + "sort" 5 9 6 10 _ "github.com/mattn/go-sqlite3" 11 + "go.yaml.in/yaml/v4" 7 12 ) 8 13 9 14 const schema = ` 15 + CREATE TABLE IF NOT EXISTS schema_migrations ( 16 + version INTEGER PRIMARY KEY, 17 + applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP 18 + ); 19 + 10 20 CREATE TABLE IF NOT EXISTS users ( 11 21 did TEXT PRIMARY KEY, 12 22 handle TEXT NOT NULL, ··· 144 154 CREATE TABLE IF NOT EXISTS repository_stats ( 145 155 did TEXT NOT NULL, 146 156 repository TEXT NOT NULL, 147 - star_count INTEGER NOT NULL DEFAULT 0, 148 157 pull_count INTEGER NOT NULL DEFAULT 0, 149 158 last_pull TIMESTAMP, 150 159 push_count INTEGER NOT NULL DEFAULT 0, ··· 153 162 FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE 154 163 ); 155 164 CREATE INDEX IF NOT EXISTS idx_repository_stats_did ON repository_stats(did); 156 - CREATE INDEX IF NOT EXISTS idx_repository_stats_star_count ON repository_stats(star_count DESC); 157 165 CREATE INDEX IF NOT EXISTS idx_repository_stats_pull_count ON repository_stats(pull_count DESC); 166 + 167 + CREATE TABLE IF NOT EXISTS stars ( 168 + starrer_did TEXT NOT NULL, 169 + owner_did TEXT NOT NULL, 170 + repository TEXT NOT NULL, 171 + created_at TIMESTAMP NOT NULL, 172 + PRIMARY KEY(starrer_did, owner_did, repository), 173 + FOREIGN KEY(starrer_did) REFERENCES users(did) ON DELETE CASCADE, 174 + FOREIGN KEY(owner_did) REFERENCES users(did) ON DELETE CASCADE 175 + ); 176 + CREATE INDEX IF NOT EXISTS idx_stars_owner_repo ON stars(owner_did, repository); 177 + CREATE INDEX IF NOT EXISTS idx_stars_starrer ON stars(starrer_did); 158 178 ` 159 179 160 180 // InitDB initializes the SQLite database with the schema ··· 174 194 return nil, err 175 195 } 176 196 197 + // Run migrations 198 + if err := runMigrations(db); err != nil { 199 + return nil, err 200 + } 201 + 177 202 return db, nil 203 + } 204 + 205 + // Migration represents a database migration 206 + type Migration struct { 207 + Version int `yaml:"version"` 208 + Name string `yaml:"name"` 209 + Up string `yaml:"up"` 210 + } 211 + 212 + // runMigrations applies any pending database migrations 213 + func runMigrations(db *sql.DB) error { 214 + // Load migrations from files 215 + migrations, err := loadMigrations() 216 + if err != nil { 217 + return fmt.Errorf("failed to load migrations: %w", err) 218 + } 219 + 220 + // Sort migrations by version 221 + sort.Slice(migrations, func(i, j int) bool { 222 + return migrations[i].Version < migrations[j].Version 223 + }) 224 + 225 + for _, m := range migrations { 226 + // Check if migration already applied 227 + var count int 228 + err := db.QueryRow("SELECT COUNT(*) FROM schema_migrations WHERE version = ?", m.Version).Scan(&count) 229 + if err != nil { 230 + return fmt.Errorf("failed to check migration status: %w", err) 231 + } 232 + 233 + if count > 0 { 234 + // Migration already applied 235 + continue 236 + } 237 + 238 + // Apply migration 239 + fmt.Printf("Applying migration %d: %s\n", m.Version, m.Name) 240 + if _, err := db.Exec(m.Up); err != nil { 241 + return fmt.Errorf("failed to apply migration %d (%s): %w", m.Version, m.Name, err) 242 + } 243 + 244 + // Record migration 245 + if _, err := db.Exec("INSERT INTO schema_migrations (version) VALUES (?)", m.Version); err != nil { 246 + return fmt.Errorf("failed to record migration %d: %w", m.Version, err) 247 + } 248 + 249 + fmt.Printf("Migration %d applied successfully\n", m.Version) 250 + } 251 + 252 + return nil 253 + } 254 + 255 + // loadMigrations loads all migration files from the migrations directory 256 + func loadMigrations() ([]Migration, error) { 257 + // Get the path to the migrations directory 258 + // Try relative to working directory first, then relative to this file 259 + migrationsDir := "pkg/appview/db/migrations" 260 + if _, err := os.Stat(migrationsDir); os.IsNotExist(err) { 261 + // Try embedded path (when running from different directory) 262 + migrationsDir = filepath.Join(".", "migrations") 263 + } 264 + 265 + // Read all .yaml files in the migrations directory 266 + files, err := filepath.Glob(filepath.Join(migrationsDir, "*.yaml")) 267 + if err != nil { 268 + return nil, fmt.Errorf("failed to list migration files: %w", err) 269 + } 270 + 271 + var migrations []Migration 272 + for _, file := range files { 273 + data, err := os.ReadFile(file) 274 + if err != nil { 275 + return nil, fmt.Errorf("failed to read migration file %s: %w", file, err) 276 + } 277 + 278 + var m Migration 279 + if err := yaml.Unmarshal(data, &m); err != nil { 280 + return nil, fmt.Errorf("failed to parse migration file %s: %w", file, err) 281 + } 282 + 283 + // Validate migration 284 + if m.Version <= 0 { 285 + return nil, fmt.Errorf("invalid migration version in %s: %d", file, m.Version) 286 + } 287 + if m.Name == "" { 288 + return nil, fmt.Errorf("missing migration name in %s", file) 289 + } 290 + if m.Up == "" { 291 + return nil, fmt.Errorf("missing migration 'up' SQL in %s", file) 292 + } 293 + 294 + migrations = append(migrations, m) 295 + } 296 + 297 + return migrations, nil 178 298 }
+19 -4
pkg/appview/jetstream/backfill.go
··· 145 145 // Track which records exist on the PDS for reconciliation 146 146 var foundManifestDigests []string 147 147 var foundTags []struct{ Repository, Tag string } 148 + foundStars := make(map[string]time.Time) // key: "ownerDID/repository", value: createdAt 148 149 149 150 // Paginate through all records for this repo 150 151 for { ··· 169 170 Tag: tagRecord.Tag, 170 171 }) 171 172 } 173 + } else if collection == atproto.StarCollection { 174 + var starRecord atproto.StarRecord 175 + if err := json.Unmarshal(record.Value, &starRecord); err == nil { 176 + key := fmt.Sprintf("%s/%s", starRecord.Subject.DID, starRecord.Subject.Repository) 177 + foundStars[key] = starRecord.CreatedAt 178 + } 172 179 } 173 180 174 181 if err := b.processRecord(ctx, did, collection, &record); err != nil { ··· 187 194 } 188 195 189 196 // Reconcile deletions - remove records from DB that no longer exist on PDS 190 - if err := b.reconcileDeletions(did, collection, foundManifestDigests, foundTags); err != nil { 197 + if err := b.reconcileDeletions(did, collection, foundManifestDigests, foundTags, foundStars); err != nil { 191 198 fmt.Printf("WARNING: Failed to reconcile deletions for %s: %v\n", did, err) 192 199 } 193 200 ··· 195 202 } 196 203 197 204 // reconcileDeletions removes records from the database that no longer exist on the PDS 198 - func (b *BackfillWorker) reconcileDeletions(did, collection string, foundManifestDigests []string, foundTags []struct{ Repository, Tag string }) error { 205 + func (b *BackfillWorker) reconcileDeletions(did, collection string, foundManifestDigests []string, foundTags []struct{ Repository, Tag string }, foundStars map[string]time.Time) error { 199 206 switch collection { 200 207 case atproto.ManifestCollection: 201 208 // Get current manifests in DB ··· 231 238 deleted := len(dbTags) - len(foundTags) 232 239 if deleted > 0 { 233 240 fmt.Printf("Backfill: Deleted %d orphaned tags for %s\n", deleted, did) 241 + } 242 + 243 + case atproto.StarCollection: 244 + // Reconcile stars - delete stars that no longer exist on PDS 245 + // Star counts will be calculated on demand from the stars table 246 + if err := db.DeleteStarsNotInList(b.db, did, foundStars); err != nil { 247 + return fmt.Errorf("failed to delete orphaned stars: %w", err) 234 248 } 235 249 } 236 250 ··· 336 350 return fmt.Errorf("failed to unmarshal star: %w", err) 337 351 } 338 352 339 - // Increment star count for the repository being starred 353 + // Upsert the star record (idempotent - won't duplicate) 340 354 // The DID here is the starrer (user who starred) 341 355 // The subject contains the owner DID and repository 342 - return db.IncrementStarCount(b.db, starRecord.Subject.DID, starRecord.Subject.Repository) 356 + // Star count will be calculated on demand from the stars table 357 + return db.UpsertStar(b.db, did, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt) 343 358 } 344 359 345 360 // ensureUser resolves and upserts a user by DID
+10 -17
pkg/appview/jetstream/worker.go
··· 415 415 } 416 416 417 417 if commit.Operation == "delete" { 418 - // Unstar - parse the record to get the subject (owner DID and repository) 419 - var starRecord atproto.StarRecord 420 - if commit.Record != nil { 421 - recordBytes, err := json.Marshal(commit.Record) 422 - if err != nil { 423 - return fmt.Errorf("failed to marshal record: %w", err) 424 - } 425 - if err := json.Unmarshal(recordBytes, &starRecord); err != nil { 426 - return fmt.Errorf("failed to unmarshal star: %w", err) 427 - } 418 + // Unstar - parse the rkey to get the subject (owner DID and repository) 419 + // Delete events don't include the full record, but the rkey contains the info we need 420 + ownerDID, repository, err := atproto.ParseStarRecordKey(commit.RKey) 421 + if err != nil { 422 + return fmt.Errorf("failed to parse star rkey: %w", err) 423 + } 428 424 429 - // Decrement star count 430 - return db.DecrementStarCount(w.db, starRecord.Subject.DID, starRecord.Subject.Repository) 431 - } 432 - // If no record data, we can't determine what was unstarred 433 - return nil 425 + // Delete the star record 426 + return db.DeleteStar(w.db, commit.DID, ownerDID, repository) 434 427 } 435 428 436 429 // Parse star record ··· 447 440 return nil 448 441 } 449 442 450 - // Increment star count for the repository being starred 451 - return db.IncrementStarCount(w.db, starRecord.Subject.DID, starRecord.Subject.Repository) 443 + // Upsert the star record (idempotent - star count will be calculated on demand) 444 + return db.UpsertStar(w.db, commit.DID, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt) 452 445 } 453 446 454 447 // JetstreamEvent represents a Jetstream event
+2 -2
pkg/appview/static/js/app.js
··· 172 172 starCountEl.textContent = Math.max(0, currentCount - 1); 173 173 } 174 174 175 - // Refresh actual count from server (will correct if optimistic update was wrong) 176 - await loadStarCount(handle, repository); 175 + // Don't fetch count immediately - trust the optimistic update 176 + // The actual count will be correct on next page load 177 177 178 178 } catch (err) { 179 179 console.error('Error toggling star:', err);
+17
pkg/atproto/lexicon.go
··· 3 3 import ( 4 4 "encoding/base64" 5 5 "encoding/json" 6 + "fmt" 7 + "strings" 6 8 "time" 7 9 ) 8 10 ··· 302 304 combined := ownerDID + "/" + repository 303 305 return base64.RawURLEncoding.EncodeToString([]byte(combined)) 304 306 } 307 + 308 + // ParseStarRecordKey decodes a star record key back to ownerDID and repository 309 + func ParseStarRecordKey(rkey string) (ownerDID, repository string, err error) { 310 + decoded, err := base64.RawURLEncoding.DecodeString(rkey) 311 + if err != nil { 312 + return "", "", fmt.Errorf("failed to decode star rkey: %w", err) 313 + } 314 + 315 + parts := strings.SplitN(string(decoded), "/", 2) 316 + if len(parts) != 2 { 317 + return "", "", fmt.Errorf("invalid star rkey format: %s", string(decoded)) 318 + } 319 + 320 + return parts[0], parts[1], nil 321 + }