A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
81
fork

Configure Feed

Select the types of activity you want to include in your feed.

backfill cleanup

+30 -21
+7 -4
cmd/appview/serve.go
··· 601 601 jetstreamURL = "wss://jetstream2.us-west.bsky.network/subscribe" 602 602 } 603 603 604 - // Start real-time Jetstream worker (no cursor = start from now) 605 - worker := jetstream.NewWorker(database, jetstreamURL, 0) 604 + // Start real-time Jetstream worker with cursor tracking for reconnects 606 605 go func() { 606 + var lastCursor int64 = 0 // Start from now on first connect 607 607 for { 608 + worker := jetstream.NewWorker(database, jetstreamURL, lastCursor) 608 609 if err := worker.Start(context.Background()); err != nil { 610 + // Save cursor from this connection for next reconnect 611 + lastCursor = worker.GetLastCursor() 609 612 fmt.Printf("Jetstream: Real-time worker error: %v, reconnecting in 10s...\n", err) 610 613 time.Sleep(10 * time.Second) 611 614 } ··· 613 616 }() 614 617 fmt.Println("Jetstream: Real-time worker started") 615 618 616 - // Start backfill worker if enabled 617 - if backfillEnabled := os.Getenv("ATCR_BACKFILL_ENABLED"); backfillEnabled == "true" { 619 + // Start backfill worker (enabled by default, set ATCR_BACKFILL_ENABLED=false to disable) 620 + if backfillEnabled := os.Getenv("ATCR_BACKFILL_ENABLED"); backfillEnabled != "false" { 618 621 // Get relay endpoint for sync API (defaults to Bluesky's relay) 619 622 relayEndpoint := os.Getenv("ATCR_RELAY_ENDPOINT") 620 623 if relayEndpoint == "" {
+1 -1
docker-compose.yml
··· 21 21 ATCR_LOG_LEVEL: info 22 22 volumes: 23 23 # Auth keys (JWT signing keys) 24 - - atcr-auth:/var/lib/atcr/auth 24 + # - atcr-auth:/var/lib/atcr/auth 25 25 # UI database (includes OAuth sessions, devices, and Jetstream cache) 26 26 - atcr-ui:/var/lib/atcr 27 27 restart: unless-stopped
-14
pkg/appview/db/schema.go
··· 80 80 ); 81 81 CREATE INDEX IF NOT EXISTS idx_tags_did_repo ON tags(did, repository); 82 82 83 - CREATE TABLE IF NOT EXISTS firehose_cursor ( 84 - id INTEGER PRIMARY KEY CHECK (id = 1), 85 - cursor INTEGER NOT NULL, 86 - updated_at TIMESTAMP NOT NULL 87 - ); 88 - 89 - CREATE TABLE IF NOT EXISTS backfill_state ( 90 - id INTEGER PRIMARY KEY CHECK (id = 1), 91 - start_cursor INTEGER NOT NULL, 92 - current_cursor INTEGER NOT NULL, 93 - completed BOOLEAN NOT NULL DEFAULT 0, 94 - updated_at TIMESTAMP NOT NULL 95 - ); 96 - 97 83 CREATE TABLE IF NOT EXISTS oauth_sessions ( 98 84 session_key TEXT PRIMARY KEY, 99 85 account_did TEXT NOT NULL,
+22 -2
pkg/appview/jetstream/worker.go
··· 44 44 pongsReceived int64 45 45 lastPongTime time.Time 46 46 pongMutex sync.Mutex 47 + 48 + // In-memory cursor tracking for reconnects 49 + lastCursor int64 50 + cursorMutex sync.RWMutex 47 51 } 48 52 49 53 // NewWorker creates a new Jetstream worker ··· 83 87 q.Add("wantedCollections", collection) 84 88 } 85 89 86 - // Add cursor if specified (for backfilling historical data) 90 + // Add cursor if specified (for backfilling historical data or reconnects) 87 91 if w.startCursor > 0 { 88 92 q.Set("cursor", fmt.Sprintf("%d", w.startCursor)) 89 - fmt.Printf("Starting from cursor: %d (replaying historical events)\n", w.startCursor) 93 + 94 + // Calculate lag (cursor is in microseconds) 95 + now := time.Now().UnixMicro() 96 + lagSeconds := float64(now-w.startCursor) / 1_000_000.0 97 + fmt.Printf("Jetstream: Starting from cursor %d (%.1f seconds behind live)\n", w.startCursor, lagSeconds) 90 98 } 91 99 92 100 // Disable compression for now to debug ··· 263 271 w.eventCallback = cb 264 272 } 265 273 274 + // GetLastCursor returns the last processed cursor (time_us) for reconnects 275 + func (w *Worker) GetLastCursor() int64 { 276 + w.cursorMutex.RLock() 277 + defer w.cursorMutex.RUnlock() 278 + return w.lastCursor 279 + } 280 + 266 281 // processMessage processes a single Jetstream event 267 282 func (w *Worker) processMessage(message []byte) error { 268 283 var event JetstreamEvent 269 284 if err := json.Unmarshal(message, &event); err != nil { 270 285 return fmt.Errorf("failed to unmarshal event: %w", err) 271 286 } 287 + 288 + // Update cursor for reconnects (do this first, even if processing fails) 289 + w.cursorMutex.Lock() 290 + w.lastCursor = event.TimeUS 291 + w.cursorMutex.Unlock() 272 292 273 293 // Call callback if set 274 294 if w.eventCallback != nil {