A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

backfill via com.atproto.sync.listReposByCollection

+731 -57
+26 -23
cmd/registry/serve.go
··· 460 460 jetstreamURL = "wss://jetstream2.us-west.bsky.network/subscribe" 461 461 } 462 462 463 - // Parse cursor for backfilling historical data 464 - // Set to Unix microseconds timestamp to replay from that point 465 - // Examples: 466 - // - 2 weeks ago: use `date -d '2 weeks ago' +%s` * 1000000 467 - // - Leave unset (or 0) to start from now 468 - var startCursor int64 469 - if cursorStr := os.Getenv("JETSTREAM_START_CURSOR"); cursorStr != "" { 470 - if cursor, err := time.Parse(time.RFC3339, cursorStr); err == nil { 471 - // Support RFC3339 format: "2025-09-23T00:00:00Z" 472 - startCursor = cursor.UnixMicro() 473 - fmt.Printf("Jetstream: Starting from %s (%d microseconds)\n", cursorStr, startCursor) 474 - } else if cursor, err := time.ParseDuration(cursorStr); err == nil { 475 - // Support duration format: "-336h" (2 weeks ago) 476 - startCursor = time.Now().Add(cursor).UnixMicro() 477 - fmt.Printf("Jetstream: Starting from %s ago (%d microseconds)\n", cursorStr, startCursor) 478 - } else { 479 - fmt.Printf("Warning: Invalid JETSTREAM_START_CURSOR format: %s\n", cursorStr) 480 - } 481 - } 482 - 483 - worker := jetstream.NewWorker(database, jetstreamURL, startCursor) 463 + // Start real-time Jetstream worker (no cursor = start from now) 464 + worker := jetstream.NewWorker(database, jetstreamURL, 0) 484 465 go func() { 485 466 for { 486 467 if err := worker.Start(context.Background()); err != nil { 487 - fmt.Printf("Jetstream worker error: %v, reconnecting in 10s...\n", err) 468 + fmt.Printf("Jetstream: Real-time worker error: %v, reconnecting in 10s...\n", err) 488 469 time.Sleep(10 * time.Second) 489 470 } 490 471 } 491 472 }() 473 + fmt.Println("Jetstream: Real-time worker started") 492 474 493 - fmt.Println("Jetstream worker started") 475 + // Start backfill worker if enabled 476 + if backfillEnabled := os.Getenv("ATCR_BACKFILL_ENABLED"); backfillEnabled == "true" { 477 + // Get BGS endpoint for sync API (defaults to Bluesky's BGS) 478 + bgsEndpoint := os.Getenv("ATCR_BGS_ENDPOINT") 479 + if bgsEndpoint == "" { 480 + bgsEndpoint = "https://bsky.network" 481 + } 482 + 483 + backfillWorker, err := jetstream.NewBackfillWorker(database, bgsEndpoint) 484 + if err != nil { 485 + fmt.Printf("Warning: Failed to create backfill worker: %v\n", err) 486 + } else { 487 + go func() { 488 + fmt.Printf("Backfill: Starting sync-based backfill from %s...\n", bgsEndpoint) 489 + if err := backfillWorker.Start(context.Background()); err != nil { 490 + fmt.Printf("Backfill: Finished with error: %v\n", err) 491 + } else { 492 + fmt.Println("Backfill: Completed successfully!") 493 + } 494 + }() 495 + } 496 + } 494 497 495 498 return database, sessionStore, templates, router 496 499 }
+3 -2
docker-compose.yml
··· 10 10 environment: 11 11 - ATCR_TOKEN_STORAGE_PATH=/var/lib/atcr/tokens/oauth-tokens.json 12 12 - ATCR_UI_ENABLED=true 13 - # Jetstream backfill: Replay 5 days of historical events 14 - # - JETSTREAM_START_CURSOR=-120h 13 + # Jetstream backfill: Replay historical events (runs once until caught up) 14 + # Examples: -120h (5 days ago), -336h (2 weeks ago), 2025-10-01T00:00:00Z 15 + - JETSTREAM_BACKFILL_START=-121h 15 16 volumes: 16 17 # Auth keys (JWT signing keys) 17 18 - atcr-auth:/var/lib/atcr/auth
+4 -4
go.mod
··· 9 9 github.com/distribution/reference v0.6.0 10 10 github.com/golang-jwt/jwt/v5 v5.2.2 11 11 github.com/google/uuid v1.6.0 12 + github.com/gorilla/mux v1.8.1 13 + github.com/gorilla/websocket v1.5.3 14 + github.com/klauspost/compress v1.18.0 15 + github.com/mattn/go-sqlite3 v1.14.32 12 16 github.com/opencontainers/go-digest v1.0.0 13 17 github.com/spf13/cobra v1.8.0 14 18 ) ··· 29 33 github.com/go-logr/logr v1.4.2 // indirect 30 34 github.com/go-logr/stdr v1.2.2 // indirect 31 35 github.com/gorilla/handlers v1.5.2 // indirect 32 - github.com/gorilla/mux v1.8.1 // indirect 33 - github.com/gorilla/websocket v1.5.3 // indirect 34 36 github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect 35 37 github.com/hashicorp/golang-lru/arc/v2 v2.0.6 // indirect 36 38 github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect 37 39 github.com/inconshreveable/mousetrap v1.1.0 // indirect 38 40 github.com/jmespath/go-jmespath v0.4.0 // indirect 39 - github.com/klauspost/compress v1.18.0 // indirect 40 - github.com/mattn/go-sqlite3 v1.14.32 // indirect 41 41 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect 42 42 github.com/opencontainers/image-spec v1.1.0 // indirect 43 43 github.com/prometheus/client_golang v1.20.5 // indirect
-2
go.sum
··· 94 94 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= 95 95 github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= 96 96 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= 97 - github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= 98 - github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= 99 97 github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= 100 98 github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= 101 99 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+231 -5
pkg/appview/db/queries.go
··· 2 2 3 3 import ( 4 4 "database/sql" 5 + "fmt" 6 + "strings" 5 7 "time" 6 8 ) 7 9 ··· 91 93 if lastPushStr != "" { 92 94 // Try multiple timestamp formats 93 95 formats := []string{ 94 - time.RFC3339Nano, // 2006-01-02T15:04:05.999999999Z07:00 95 - "2006-01-02 15:04:05.999999999-07:00", // SQLite with microseconds and timezone 96 - "2006-01-02 15:04:05.999999999", // SQLite with microseconds 97 - time.RFC3339, // 2006-01-02T15:04:05Z07:00 98 - "2006-01-02 15:04:05", // SQLite default 96 + time.RFC3339Nano, // 2006-01-02T15:04:05.999999999Z07:00 97 + "2006-01-02 15:04:05.999999999-07:00", // SQLite with microseconds and timezone 98 + "2006-01-02 15:04:05.999999999", // SQLite with microseconds 99 + time.RFC3339, // 2006-01-02T15:04:05Z07:00 100 + "2006-01-02 15:04:05", // SQLite default 99 101 } 100 102 101 103 for _, format := range formats { ··· 162 164 return repos, nil 163 165 } 164 166 167 + // GetUserByDID retrieves a user by DID 168 + func GetUserByDID(db *sql.DB, did string) (*User, error) { 169 + var user User 170 + err := db.QueryRow(` 171 + SELECT did, handle, pds_endpoint, last_seen 172 + FROM users 173 + WHERE did = ? 174 + `, did).Scan(&user.DID, &user.Handle, &user.PDSEndpoint, &user.LastSeen) 175 + 176 + if err == sql.ErrNoRows { 177 + return nil, nil 178 + } 179 + if err != nil { 180 + return nil, err 181 + } 182 + 183 + return &user, nil 184 + } 185 + 165 186 // UpsertUser inserts or updates a user record 166 187 func UpsertUser(db *sql.DB, user *User) error { 167 188 _, err := db.Exec(` ··· 175 196 return err 176 197 } 177 198 199 + // GetManifestDigestsForDID returns all manifest digests for a DID 200 + func GetManifestDigestsForDID(db *sql.DB, did string) ([]string, error) { 201 + rows, err := db.Query(` 202 + SELECT digest FROM manifests WHERE did = ? 203 + `, did) 204 + if err != nil { 205 + return nil, err 206 + } 207 + defer rows.Close() 208 + 209 + var digests []string 210 + for rows.Next() { 211 + var digest string 212 + if err := rows.Scan(&digest); err != nil { 213 + return nil, err 214 + } 215 + digests = append(digests, digest) 216 + } 217 + 218 + return digests, rows.Err() 219 + } 220 + 221 + // DeleteManifestsNotInList deletes all manifests for a DID that are not in the provided list 222 + func DeleteManifestsNotInList(db *sql.DB, did string, keepDigests []string) error { 223 + if len(keepDigests) == 0 { 224 + // No manifests to keep - delete all for this DID 225 + _, err := db.Exec(`DELETE FROM manifests WHERE did = ?`, did) 226 + return err 227 + } 228 + 229 + // Build placeholders for IN clause 230 + placeholders := make([]string, len(keepDigests)) 231 + args := []interface{}{did} 232 + for i, digest := range keepDigests { 233 + placeholders[i] = "?" 234 + args = append(args, digest) 235 + } 236 + 237 + query := fmt.Sprintf(` 238 + DELETE FROM manifests 239 + WHERE did = ? AND digest NOT IN (%s) 240 + `, strings.Join(placeholders, ",")) 241 + 242 + _, err := db.Exec(query, args...) 243 + return err 244 + } 245 + 246 + // GetTagsForDID returns all (repository, tag) pairs for a DID 247 + func GetTagsForDID(db *sql.DB, did string) ([]struct{ Repository, Tag string }, error) { 248 + rows, err := db.Query(` 249 + SELECT repository, tag FROM tags WHERE did = ? 250 + `, did) 251 + if err != nil { 252 + return nil, err 253 + } 254 + defer rows.Close() 255 + 256 + var tags []struct{ Repository, Tag string } 257 + for rows.Next() { 258 + var t struct{ Repository, Tag string } 259 + if err := rows.Scan(&t.Repository, &t.Tag); err != nil { 260 + return nil, err 261 + } 262 + tags = append(tags, t) 263 + } 264 + 265 + return tags, rows.Err() 266 + } 267 + 268 + // DeleteTagsNotInList deletes all tags for a DID that are not in the provided list 269 + func DeleteTagsNotInList(db *sql.DB, did string, keepTags []struct{ Repository, Tag string }) error { 270 + if len(keepTags) == 0 { 271 + // No tags to keep - delete all for this DID 272 + _, err := db.Exec(`DELETE FROM tags WHERE did = ?`, did) 273 + return err 274 + } 275 + 276 + // For tags, we need to check (repository, tag) pairs 277 + // Build a DELETE query that excludes the pairs we want to keep 278 + tx, err := db.Begin() 279 + if err != nil { 280 + return err 281 + } 282 + defer tx.Rollback() 283 + 284 + // First, get all current tags 285 + rows, err := tx.Query(`SELECT id, repository, tag FROM tags WHERE did = ?`, did) 286 + if err != nil { 287 + return err 288 + } 289 + 290 + var toDelete []int64 291 + for rows.Next() { 292 + var id int64 293 + var repo, tag string 294 + if err := rows.Scan(&id, &repo, &tag); err != nil { 295 + rows.Close() 296 + return err 297 + } 298 + 299 + // Check if this tag should be kept 300 + found := false 301 + for _, keep := range keepTags { 302 + if keep.Repository == repo && keep.Tag == tag { 303 + found = true 304 + break 305 + } 306 + } 307 + 308 + if !found { 309 + toDelete = append(toDelete, id) 310 + } 311 + } 312 + rows.Close() 313 + 314 + // Delete tags not in keep list 315 + for _, id := range toDelete { 316 + if _, err := tx.Exec(`DELETE FROM tags WHERE id = ?`, id); err != nil { 317 + return err 318 + } 319 + } 320 + 321 + return tx.Commit() 322 + } 323 + 178 324 // InsertManifest inserts a new manifest record 179 325 func InsertManifest(db *sql.DB, manifest *Manifest) (int64, error) { 180 326 result, err := db.Exec(` ··· 311 457 312 458 return count > 0, nil 313 459 } 460 + 461 + // BackfillState represents the backfill progress 462 + type BackfillState struct { 463 + StartCursor int64 464 + CurrentCursor int64 465 + Completed bool 466 + UpdatedAt time.Time 467 + } 468 + 469 + // GetBackfillState retrieves the backfill state 470 + func GetBackfillState(db *sql.DB) (*BackfillState, error) { 471 + var state BackfillState 472 + var updatedAtStr string 473 + 474 + err := db.QueryRow(` 475 + SELECT start_cursor, current_cursor, completed, updated_at 476 + FROM backfill_state 477 + WHERE id = 1 478 + `).Scan(&state.StartCursor, &state.CurrentCursor, &state.Completed, &updatedAtStr) 479 + 480 + if err == sql.ErrNoRows { 481 + return nil, nil // No backfill state exists 482 + } 483 + if err != nil { 484 + return nil, err 485 + } 486 + 487 + // Parse timestamp 488 + if updatedAtStr != "" { 489 + formats := []string{ 490 + time.RFC3339Nano, 491 + "2006-01-02 15:04:05.999999999-07:00", 492 + "2006-01-02 15:04:05.999999999", 493 + time.RFC3339, 494 + "2006-01-02 15:04:05", 495 + } 496 + for _, format := range formats { 497 + if t, err := time.Parse(format, updatedAtStr); err == nil { 498 + state.UpdatedAt = t 499 + break 500 + } 501 + } 502 + } 503 + 504 + return &state, nil 505 + } 506 + 507 + // UpsertBackfillState updates or creates backfill state 508 + func UpsertBackfillState(db *sql.DB, state *BackfillState) error { 509 + _, err := db.Exec(` 510 + INSERT INTO backfill_state (id, start_cursor, current_cursor, completed, updated_at) 511 + VALUES (1, ?, ?, ?, datetime('now')) 512 + ON CONFLICT(id) DO UPDATE SET 513 + start_cursor = excluded.start_cursor, 514 + current_cursor = excluded.current_cursor, 515 + completed = excluded.completed, 516 + updated_at = excluded.updated_at 517 + `, state.StartCursor, state.CurrentCursor, state.Completed) 518 + return err 519 + } 520 + 521 + // UpdateBackfillCursor updates just the current cursor position 522 + func UpdateBackfillCursor(db *sql.DB, cursor int64) error { 523 + _, err := db.Exec(` 524 + UPDATE backfill_state 525 + SET current_cursor = ?, updated_at = datetime('now') 526 + WHERE id = 1 527 + `, cursor) 528 + return err 529 + } 530 + 531 + // MarkBackfillCompleted marks the backfill as completed 532 + func MarkBackfillCompleted(db *sql.DB) error { 533 + _, err := db.Exec(` 534 + UPDATE backfill_state 535 + SET completed = 1, updated_at = datetime('now') 536 + WHERE id = 1 537 + `) 538 + return err 539 + }
+8
pkg/appview/db/schema.go
··· 63 63 cursor INTEGER NOT NULL, 64 64 updated_at TIMESTAMP NOT NULL 65 65 ); 66 + 67 + CREATE TABLE IF NOT EXISTS backfill_state ( 68 + id INTEGER PRIMARY KEY CHECK (id = 1), 69 + start_cursor INTEGER NOT NULL, 70 + current_cursor INTEGER NOT NULL, 71 + completed BOOLEAN NOT NULL DEFAULT 0, 72 + updated_at TIMESTAMP NOT NULL 73 + ); 66 74 ` 67 75 68 76 // InitDB initializes the SQLite database with the schema
+4 -4
pkg/appview/handlers/settings.go
··· 48 48 data := struct { 49 49 User *db.User 50 50 Profile struct { 51 - Handle string 52 - DID string 53 - PDSEndpoint string 54 - DefaultHold string 51 + Handle string 52 + DID string 53 + PDSEndpoint string 54 + DefaultHold string 55 55 } 56 56 SessionExpiry time.Time 57 57 Query string
+330
pkg/appview/jetstream/backfill.go
··· 1 + package jetstream 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "fmt" 8 + "strings" 9 + "time" 10 + 11 + "atcr.io/pkg/appview/db" 12 + "atcr.io/pkg/atproto" 13 + ) 14 + 15 + // BackfillWorker uses com.atproto.sync.listReposByCollection to backfill historical data 16 + type BackfillWorker struct { 17 + db *sql.DB 18 + client *atproto.Client 19 + resolver *atproto.Resolver 20 + } 21 + 22 + // BackfillState tracks backfill progress 23 + type BackfillState struct { 24 + Collection string 25 + RepoCursor string // Cursor for listReposByCollection 26 + CurrentDID string // Current DID being processed 27 + RecordCursor string // Cursor for listRecords within current DID 28 + ProcessedRepos int 29 + ProcessedRecords int 30 + Completed bool 31 + } 32 + 33 + // NewBackfillWorker creates a backfill worker using sync API 34 + func NewBackfillWorker(database *sql.DB, pdsEndpoint string) (*BackfillWorker, error) { 35 + // Create client without auth - sync endpoints are public 36 + client := atproto.NewClient(pdsEndpoint, "", "") 37 + 38 + return &BackfillWorker{ 39 + db: database, 40 + client: client, 41 + resolver: atproto.NewResolver(), 42 + }, nil 43 + } 44 + 45 + // Start runs the backfill for all ATCR collections 46 + func (b *BackfillWorker) Start(ctx context.Context) error { 47 + fmt.Println("Backfill: Starting sync-based backfill...") 48 + 49 + collections := []string{ 50 + atproto.ManifestCollection, // io.atcr.manifest 51 + atproto.TagCollection, // io.atcr.tag 52 + } 53 + 54 + for _, collection := range collections { 55 + fmt.Printf("Backfill: Processing collection: %s\n", collection) 56 + 57 + if err := b.backfillCollection(ctx, collection); err != nil { 58 + return fmt.Errorf("failed to backfill collection %s: %w", collection, err) 59 + } 60 + 61 + fmt.Printf("Backfill: Completed collection: %s\n", collection) 62 + } 63 + 64 + fmt.Println("Backfill: All collections completed!") 65 + return nil 66 + } 67 + 68 + // backfillCollection backfills a single collection 69 + func (b *BackfillWorker) backfillCollection(ctx context.Context, collection string) error { 70 + var repoCursor string 71 + processedRepos := 0 72 + processedRecords := 0 73 + 74 + // Paginate through all repos with this collection 75 + for { 76 + // List repos that have records in this collection 77 + result, err := b.client.ListReposByCollection(ctx, collection, 1000, repoCursor) 78 + if err != nil { 79 + return fmt.Errorf("failed to list repos: %w", err) 80 + } 81 + 82 + fmt.Printf("Backfill: Found %d repos with %s (cursor: %s)\n", len(result.Repos), collection, repoCursor) 83 + 84 + // Process each repo (DID) 85 + for _, did := range result.Repos { 86 + recordCount, err := b.backfillRepo(ctx, did, collection) 87 + if err != nil { 88 + fmt.Printf("WARNING: Failed to backfill repo %s: %v\n", did, err) 89 + continue 90 + } 91 + 92 + processedRepos++ 93 + processedRecords += recordCount 94 + 95 + if processedRepos%10 == 0 { 96 + fmt.Printf("Backfill: Progress - %d repos, %d records\n", processedRepos, processedRecords) 97 + } 98 + } 99 + 100 + // Check if there are more pages 101 + if result.Cursor == "" { 102 + break 103 + } 104 + 105 + repoCursor = result.Cursor 106 + } 107 + 108 + fmt.Printf("Backfill: Collection %s complete - %d repos, %d records\n", collection, processedRepos, processedRecords) 109 + return nil 110 + } 111 + 112 + // backfillRepo backfills all records for a single repo/DID 113 + func (b *BackfillWorker) backfillRepo(ctx context.Context, did, collection string) (int, error) { 114 + // Ensure user exists in database 115 + if err := b.ensureUser(ctx, did); err != nil { 116 + return 0, fmt.Errorf("failed to ensure user: %w", err) 117 + } 118 + 119 + var recordCursor string 120 + recordCount := 0 121 + 122 + // Track which records exist on the PDS for reconciliation 123 + var foundManifestDigests []string 124 + var foundTags []struct{ Repository, Tag string } 125 + 126 + // Paginate through all records for this repo 127 + for { 128 + records, cursor, err := b.client.ListRecordsForRepo(ctx, did, collection, 100, recordCursor) 129 + if err != nil { 130 + return recordCount, fmt.Errorf("failed to list records: %w", err) 131 + } 132 + 133 + // Process each record 134 + for _, record := range records { 135 + // Track what we found for deletion reconciliation 136 + if collection == atproto.ManifestCollection { 137 + var manifestRecord atproto.ManifestRecord 138 + if err := json.Unmarshal(record.Value, &manifestRecord); err == nil { 139 + foundManifestDigests = append(foundManifestDigests, manifestRecord.Digest) 140 + } 141 + } else if collection == atproto.TagCollection { 142 + var tagRecord atproto.TagRecord 143 + if err := json.Unmarshal(record.Value, &tagRecord); err == nil { 144 + foundTags = append(foundTags, struct{ Repository, Tag string }{ 145 + Repository: tagRecord.Repository, 146 + Tag: tagRecord.Tag, 147 + }) 148 + } 149 + } 150 + 151 + if err := b.processRecord(ctx, did, collection, &record); err != nil { 152 + fmt.Printf("WARNING: Failed to process record %s: %v\n", record.URI, err) 153 + continue 154 + } 155 + recordCount++ 156 + } 157 + 158 + // Check if there are more pages 159 + if cursor == "" { 160 + break 161 + } 162 + 163 + recordCursor = cursor 164 + } 165 + 166 + // Reconcile deletions - remove records from DB that no longer exist on PDS 167 + if err := b.reconcileDeletions(did, collection, foundManifestDigests, foundTags); err != nil { 168 + fmt.Printf("WARNING: Failed to reconcile deletions for %s: %v\n", did, err) 169 + } 170 + 171 + return recordCount, nil 172 + } 173 + 174 + // reconcileDeletions removes records from the database that no longer exist on the PDS 175 + func (b *BackfillWorker) reconcileDeletions(did, collection string, foundManifestDigests []string, foundTags []struct{ Repository, Tag string }) error { 176 + switch collection { 177 + case atproto.ManifestCollection: 178 + // Get current manifests in DB 179 + dbDigests, err := db.GetManifestDigestsForDID(b.db, did) 180 + if err != nil { 181 + return fmt.Errorf("failed to get DB manifests: %w", err) 182 + } 183 + 184 + // Delete manifests not found on PDS 185 + if err := db.DeleteManifestsNotInList(b.db, did, foundManifestDigests); err != nil { 186 + return fmt.Errorf("failed to delete orphaned manifests: %w", err) 187 + } 188 + 189 + // Log deletions 190 + deleted := len(dbDigests) - len(foundManifestDigests) 191 + if deleted > 0 { 192 + fmt.Printf("Backfill: Deleted %d orphaned manifests for %s\n", deleted, did) 193 + } 194 + 195 + case atproto.TagCollection: 196 + // Get current tags in DB 197 + dbTags, err := db.GetTagsForDID(b.db, did) 198 + if err != nil { 199 + return fmt.Errorf("failed to get DB tags: %w", err) 200 + } 201 + 202 + // Delete tags not found on PDS 203 + if err := db.DeleteTagsNotInList(b.db, did, foundTags); err != nil { 204 + return fmt.Errorf("failed to delete orphaned tags: %w", err) 205 + } 206 + 207 + // Log deletions 208 + deleted := len(dbTags) - len(foundTags) 209 + if deleted > 0 { 210 + fmt.Printf("Backfill: Deleted %d orphaned tags for %s\n", deleted, did) 211 + } 212 + } 213 + 214 + return nil 215 + } 216 + 217 + // processRecord processes a single record and stores it in the database 218 + func (b *BackfillWorker) processRecord(ctx context.Context, did, collection string, record *atproto.Record) error { 219 + switch collection { 220 + case atproto.ManifestCollection: 221 + return b.processManifestRecord(did, record) 222 + case atproto.TagCollection: 223 + return b.processTagRecord(did, record) 224 + default: 225 + return fmt.Errorf("unsupported collection: %s", collection) 226 + } 227 + } 228 + 229 + // processManifestRecord processes a manifest record 230 + func (b *BackfillWorker) processManifestRecord(did string, record *atproto.Record) error { 231 + var manifestRecord atproto.ManifestRecord 232 + if err := json.Unmarshal(record.Value, &manifestRecord); err != nil { 233 + return fmt.Errorf("failed to unmarshal manifest: %w", err) 234 + } 235 + 236 + // Serialize full manifest as JSON for storage 237 + manifestJSON, err := json.Marshal(manifestRecord) 238 + if err != nil { 239 + return fmt.Errorf("failed to marshal manifest: %w", err) 240 + } 241 + 242 + // Insert manifest 243 + manifestID, err := db.InsertManifest(b.db, &db.Manifest{ 244 + DID: did, 245 + Repository: manifestRecord.Repository, 246 + Digest: manifestRecord.Digest, 247 + MediaType: manifestRecord.MediaType, 248 + SchemaVersion: manifestRecord.SchemaVersion, 249 + ConfigDigest: manifestRecord.Config.Digest, 250 + ConfigSize: manifestRecord.Config.Size, 251 + RawManifest: string(manifestJSON), 252 + HoldEndpoint: manifestRecord.HoldEndpoint, 253 + CreatedAt: manifestRecord.CreatedAt, 254 + }) 255 + if err != nil { 256 + // Skip if already exists 257 + if strings.Contains(err.Error(), "UNIQUE constraint failed") { 258 + return nil 259 + } 260 + return fmt.Errorf("failed to insert manifest: %w", err) 261 + } 262 + 263 + // Insert layers 264 + for i, layer := range manifestRecord.Layers { 265 + if err := db.InsertLayer(b.db, &db.Layer{ 266 + ManifestID: manifestID, 267 + Digest: layer.Digest, 268 + MediaType: layer.MediaType, 269 + Size: layer.Size, 270 + LayerIndex: i, 271 + }); err != nil { 272 + // Continue on error - layer might already exist 273 + continue 274 + } 275 + } 276 + 277 + return nil 278 + } 279 + 280 + // processTagRecord processes a tag record 281 + func (b *BackfillWorker) processTagRecord(did string, record *atproto.Record) error { 282 + var tagRecord atproto.TagRecord 283 + if err := json.Unmarshal(record.Value, &tagRecord); err != nil { 284 + return fmt.Errorf("failed to unmarshal tag: %w", err) 285 + } 286 + 287 + // Insert or update tag 288 + return db.UpsertTag(b.db, &db.Tag{ 289 + DID: did, 290 + Repository: tagRecord.Repository, 291 + Tag: tagRecord.Tag, 292 + Digest: tagRecord.ManifestDigest, 293 + CreatedAt: tagRecord.UpdatedAt, 294 + }) 295 + } 296 + 297 + // ensureUser resolves and upserts a user by DID 298 + func (b *BackfillWorker) ensureUser(ctx context.Context, did string) error { 299 + // Check if user already exists 300 + existingUser, err := db.GetUserByDID(b.db, did) 301 + if err == nil && existingUser != nil { 302 + // Update last seen 303 + existingUser.LastSeen = time.Now() 304 + return db.UpsertUser(b.db, existingUser) 305 + } 306 + 307 + // Resolve DID to get handle and PDS endpoint 308 + resolvedDID, pdsEndpoint, err := b.resolver.ResolveIdentity(ctx, did) 309 + if err != nil { 310 + // Fallback: use DID as handle 311 + resolvedDID = did 312 + pdsEndpoint = "https://bsky.social" 313 + } 314 + 315 + // Get handle from DID document 316 + handle, err := b.resolver.ResolveHandleFromDID(ctx, resolvedDID) 317 + if err != nil { 318 + handle = resolvedDID // Fallback to DID 319 + } 320 + 321 + // Upsert to database 322 + user := &db.User{ 323 + DID: resolvedDID, 324 + Handle: handle, 325 + PDSEndpoint: pdsEndpoint, 326 + LastSeen: time.Now(), 327 + } 328 + 329 + return db.UpsertUser(b.db, user) 330 + }
+21 -7
pkg/appview/jetstream/worker.go
··· 20 20 cache map[string]*db.User 21 21 } 22 22 23 + // EventCallback is called for each processed event 24 + type EventCallback func(timeUS int64) 25 + 23 26 // Worker consumes Jetstream events and populates the UI database 24 27 type Worker struct { 25 28 db *sql.DB ··· 29 32 debugCollectionCount int 30 33 userCache *UserCache 31 34 resolver *atproto.Resolver 35 + eventCallback EventCallback 32 36 } 33 37 34 38 // NewWorker creates a new Jetstream worker ··· 44 48 startCursor: startCursor, 45 49 wantedCollections: []string{ 46 50 atproto.ManifestCollection, // io.atcr.manifest 47 - atproto.TagCollection, // io.atcr.tag 51 + atproto.TagCollection, // io.atcr.tag 48 52 }, 49 53 userCache: &UserCache{ 50 54 cache: make(map[string]*db.User), ··· 132 136 } 133 137 } 134 138 139 + // SetEventCallback sets a callback to be called for each event 140 + func (w *Worker) SetEventCallback(cb EventCallback) { 141 + w.eventCallback = cb 142 + } 143 + 135 144 // processMessage processes a single Jetstream event 136 145 func (w *Worker) processMessage(message []byte) error { 137 146 var event JetstreamEvent 138 147 if err := json.Unmarshal(message, &event); err != nil { 139 148 return fmt.Errorf("failed to unmarshal event: %w", err) 149 + } 150 + 151 + // Call callback if set 152 + if w.eventCallback != nil { 153 + w.eventCallback(event.TimeUS) 140 154 } 141 155 142 156 // Only process commit events ··· 325 339 326 340 // JetstreamEvent represents a Jetstream event 327 341 type JetstreamEvent struct { 328 - DID string `json:"did"` 329 - TimeUS int64 `json:"time_us"` 330 - Kind string `json:"kind"` // "commit", "identity", "account" 331 - Commit *CommitEvent `json:"commit,omitempty"` 332 - Identity *IdentityInfo `json:"identity,omitempty"` 333 - Account *AccountInfo `json:"account,omitempty"` 342 + DID string `json:"did"` 343 + TimeUS int64 `json:"time_us"` 344 + Kind string `json:"kind"` // "commit", "identity", "account" 345 + Commit *CommitEvent `json:"commit,omitempty"` 346 + Identity *IdentityInfo `json:"identity,omitempty"` 347 + Account *AccountInfo `json:"account,omitempty"` 334 348 } 335 349 336 350 // CommitEvent represents a commit event (create/update/delete)
+94
pkg/atproto/client.go
··· 299 299 300 300 return data, nil 301 301 } 302 + 303 + // ListReposByCollectionResult represents the response from com.atproto.sync.listReposByCollection 304 + type ListReposByCollectionResult struct { 305 + Repos []string `json:"repos"` // Array of DIDs 306 + Cursor string `json:"cursor,omitempty"` 307 + } 308 + 309 + // ListReposByCollection lists all repos (DIDs) that have records in a collection 310 + // This is a network-wide query, not limited to a single PDS 311 + func (c *Client) ListReposByCollection(ctx context.Context, collection string, limit int, cursor string) (*ListReposByCollectionResult, error) { 312 + // Build URL with query parameters 313 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listReposByCollection?collection=%s", c.pdsEndpoint, collection) 314 + 315 + if limit > 0 { 316 + url += fmt.Sprintf("&limit=%d", limit) 317 + } 318 + if cursor != "" { 319 + url += fmt.Sprintf("&cursor=%s", cursor) 320 + } 321 + 322 + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 323 + if err != nil { 324 + return nil, err 325 + } 326 + 327 + // This endpoint typically doesn't require auth for public data 328 + // but we include it if available 329 + if c.accessToken != "" { 330 + req.Header.Set("Authorization", c.authHeader()) 331 + } 332 + 333 + resp, err := c.httpClient.Do(req) 334 + if err != nil { 335 + return nil, fmt.Errorf("failed to list repos by collection: %w", err) 336 + } 337 + defer resp.Body.Close() 338 + 339 + if resp.StatusCode != http.StatusOK { 340 + bodyBytes, _ := io.ReadAll(resp.Body) 341 + return nil, fmt.Errorf("list repos by collection failed with status %d: %s", resp.StatusCode, string(bodyBytes)) 342 + } 343 + 344 + var result ListReposByCollectionResult 345 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 346 + return nil, fmt.Errorf("failed to decode response: %w", err) 347 + } 348 + 349 + return &result, nil 350 + } 351 + 352 + // ListRecordsForRepo lists records in a collection for a specific repo (DID) 353 + // This differs from ListRecords which uses the client's DID 354 + func (c *Client) ListRecordsForRepo(ctx context.Context, repoDID, collection string, limit int, cursor string) ([]Record, string, error) { 355 + url := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s", 356 + c.pdsEndpoint, repoDID, collection) 357 + 358 + if limit > 0 { 359 + url += fmt.Sprintf("&limit=%d", limit) 360 + } 361 + if cursor != "" { 362 + url += fmt.Sprintf("&cursor=%s", cursor) 363 + } 364 + 365 + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 366 + if err != nil { 367 + return nil, "", err 368 + } 369 + 370 + // This endpoint typically doesn't require auth for public records 371 + if c.accessToken != "" { 372 + req.Header.Set("Authorization", c.authHeader()) 373 + } 374 + 375 + resp, err := c.httpClient.Do(req) 376 + if err != nil { 377 + return nil, "", fmt.Errorf("failed to list records: %w", err) 378 + } 379 + defer resp.Body.Close() 380 + 381 + if resp.StatusCode != http.StatusOK { 382 + bodyBytes, _ := io.ReadAll(resp.Body) 383 + return nil, "", fmt.Errorf("list records failed with status %d: %s", resp.StatusCode, string(bodyBytes)) 384 + } 385 + 386 + var result struct { 387 + Records []Record `json:"records"` 388 + Cursor string `json:"cursor,omitempty"` 389 + } 390 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 391 + return nil, "", fmt.Errorf("failed to decode response: %w", err) 392 + } 393 + 394 + return result.Records, result.Cursor, nil 395 + }
+2 -2
pkg/auth/oauth/client.go
··· 45 45 dpopKey: dpopKey, 46 46 dpopTransport: NewDPoPTransport(http.DefaultTransport, dpopKey), 47 47 resolver: atproto.NewResolver(), 48 - baseUrl: baseURL, 48 + baseUrl: baseURL, 49 49 } 50 50 } 51 51 ··· 201 201 return newToken, nil 202 202 } 203 203 204 - func (c *Client) ClientID() (string) { 204 + func (c *Client) ClientID() string { 205 205 return c.ClientIDWithScopes(c.GetDefaultScopes()) 206 206 } 207 207
+8 -8
pkg/auth/oauth/server.go
··· 21 21 22 22 // Server handles OAuth authorization for the AppView 23 23 type Server struct { 24 - storage *RefreshTokenStorage 25 - sessionManager *session.Manager 26 - resolver *atproto.Resolver 27 - refresher *Refresher 28 - uiSessionStore UISessionStore 29 - baseURL string 30 - states map[string]*OAuthState 31 - statesMu sync.RWMutex 24 + storage *RefreshTokenStorage 25 + sessionManager *session.Manager 26 + resolver *atproto.Resolver 27 + refresher *Refresher 28 + uiSessionStore UISessionStore 29 + baseURL string 30 + states map[string]*OAuthState 31 + statesMu sync.RWMutex 32 32 } 33 33 34 34 // OAuthState tracks an in-progress OAuth flow