// A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
1// Package jetstream provides an ATProto Jetstream consumer for real-time updates.
2// It connects to the Bluesky Jetstream WebSocket, processes repository events,
3// indexes manifests and tags, and populates the AppView database for the web UI.
4package jetstream
5
6import (
7 "context"
8 "database/sql"
9 "encoding/json"
10 "fmt"
11 "log/slog"
12 "net/url"
13 "sync"
14 "time"
15
16 "atcr.io/pkg/appview/db"
17 "atcr.io/pkg/atproto"
18 "github.com/gorilla/websocket"
19 "github.com/klauspost/compress/zstd"
20)
21
// UserCache caches DID -> handle/PDS mappings to avoid repeated lookups.
// NOTE(review): the map is not guarded by a mutex; this is only safe if all
// access happens from a single goroutine — confirm at call sites (none visible
// in this file).
type UserCache struct {
	cache map[string]*db.User // keyed by DID
}
26
// EventCallback is called for each processed event with the event's time_us
// (Unix-microseconds) timestamp. It is invoked synchronously from the
// consumer goroutine in processMessage, so it must be fast and non-blocking.
type EventCallback func(timeUS int64)
29
// Worker consumes Jetstream events and populates the UI database.
// Create one with NewWorker, then call Start (blocking) to begin consuming.
type Worker struct {
	db                   *sql.DB       // AppView database handle
	jetstreamURL         string        // WebSocket subscribe endpoint (defaulted in NewWorker)
	startCursor          int64         // Unix-microseconds timestamp to resume from (0 = start from now)
	wantedCollections    []string      // collection filters appended to the subscribe URL
	debugCollectionCount int           // caps "received collection" debug logs at the first few commits
	processor            *Processor    // Shared processor for DB operations
	statsCache           *StatsCache   // In-memory cache for stats aggregation across holds
	eventCallback        EventCallback // optional per-event hook (see SetEventCallback)
	connStartTime        time.Time     // Track when connection started for debugging

	// Ping/pong tracking for connection health.
	// All four fields below are guarded by pongMutex.
	pingsSent     int64
	pongsReceived int64
	lastPongTime  time.Time
	pongMutex     sync.Mutex

	// In-memory cursor tracking for reconnects.
	// lastCursor holds the time_us of the most recently processed event
	// and is guarded by cursorMutex (written per event, read by GetLastCursor).
	lastCursor  int64
	cursorMutex sync.RWMutex
}
52
53// NewWorker creates a new Jetstream worker
54// startCursor: Unix microseconds timestamp to start from (0 = start from now)
55func NewWorker(database *sql.DB, jetstreamURL string, startCursor int64) *Worker {
56 if jetstreamURL == "" {
57 jetstreamURL = "wss://jetstream2.us-west.bsky.network/subscribe"
58 }
59
60 // Create shared stats cache for aggregating across holds
61 statsCache := NewStatsCache()
62
63 return &Worker{
64 db: database,
65 jetstreamURL: jetstreamURL,
66 startCursor: startCursor,
67 wantedCollections: []string{
68 "io.atcr.*", // Subscribe to all ATCR collections
69 },
70 statsCache: statsCache,
71 processor: NewProcessor(database, true, statsCache), // Use cache for live streaming
72 }
73}
74
// Start begins consuming Jetstream events.
// This is a blocking function that runs until the context is cancelled or the
// connection fails; it returns an error the caller can use to decide whether
// to reconnect (typically resuming from GetLastCursor).
func (w *Worker) Start(ctx context.Context) error {
	// Build connection URL with filters
	u, err := url.Parse(w.jetstreamURL)
	if err != nil {
		return fmt.Errorf("invalid jetstream URL: %w", err)
	}

	q := u.Query()
	for _, collection := range w.wantedCollections {
		q.Add("wantedCollections", collection)
	}

	// Add cursor if specified (for backfilling historical data or reconnects)
	if w.startCursor > 0 {
		q.Set("cursor", fmt.Sprintf("%d", w.startCursor))

		// Calculate lag (cursor is in microseconds)
		now := time.Now().UnixMicro()
		lagSeconds := float64(now-w.startCursor) / 1_000_000.0
		slog.Info("Jetstream starting from cursor", "cursor", w.startCursor, "lag_seconds", lagSeconds)
	}

	// Disable compression for now to debug
	// q.Set("compress", "true")
	u.RawQuery = q.Encode()

	slog.Info("Connecting to Jetstream", "url", u.String())

	// Connect to Jetstream
	conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil)
	if err != nil {
		return fmt.Errorf("failed to connect to jetstream: %w", err)
	}
	defer conn.Close()

	// Track connection start time for debugging
	w.connStartTime = time.Now()

	// Reset ping/pong counters for this connection
	w.pongMutex.Lock()
	w.pingsSent = 0
	w.pongsReceived = 0
	w.lastPongTime = time.Now()
	w.pongMutex.Unlock()

	// Set up pong handler - called when server responds to our ping
	conn.SetPongHandler(func(appData string) error {
		w.pongMutex.Lock()
		w.pongsReceived++
		w.lastPongTime = time.Now()
		w.pongMutex.Unlock()

		// Reset read deadline - we know connection is alive
		// Allow 90 seconds for next pong (3x ping interval)
		conn.SetReadDeadline(time.Now().Add(90 * time.Second))
		return nil
	})

	// Set initial read deadline
	conn.SetReadDeadline(time.Now().Add(90 * time.Second))

	// Create zstd decoder for decompressing messages.
	// Currently unused because compression is disabled above; kept (with the
	// `_ = decoder` below) so it can be re-enabled without restructuring.
	decoder, err := zstd.NewReader(nil)
	if err != nil {
		return fmt.Errorf("failed to create zstd decoder: %w", err)
	}
	defer decoder.Close()

	slog.Info("Connected to Jetstream, listening for events...")

	// Start heartbeat ticker to show Jetstream is alive
	heartbeatTicker := time.NewTicker(30 * time.Second)
	defer heartbeatTicker.Stop()

	// Start ping ticker for keepalive
	pingTicker := time.NewTicker(30 * time.Second)
	defer pingTicker.Stop()

	// Start ping sender goroutine. pingDone (closed on return via defer)
	// guarantees the goroutine exits whenever Start does.
	pingDone := make(chan struct{})
	defer close(pingDone)

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-pingDone:
				return
			case <-pingTicker.C:
				// Check if we've received a pong recently
				w.pongMutex.Lock()
				timeSinceLastPong := time.Since(w.lastPongTime)
				pingsTotal := w.pingsSent
				pongsTotal := w.pongsReceived
				w.pongMutex.Unlock()

				// If no pong for 60 seconds, connection is likely dead.
				// Closing the connection forces ReadMessage in the main loop
				// to return an error, which ends Start.
				if timeSinceLastPong > 60*time.Second {
					slog.Info("Jetstream no pong received, closing connection", "time_since_last_pong", timeSinceLastPong, "pings_sent", pingsTotal, "pongs_received", pongsTotal)
					conn.Close()
					return
				}

				// Send ping with write deadline
				conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
				if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
					slog.Warn("Jetstream failed to send ping", "error", err)
					conn.Close()
					return
				}

				w.pongMutex.Lock()
				w.pingsSent++
				w.pongMutex.Unlock()
			}
		}
	}()

	eventCount := 0
	lastHeartbeat := time.Now()

	// Read messages.
	// NOTE(review): because this select has a default branch that blocks in
	// ReadMessage, the ctx.Done and heartbeat cases are only serviced between
	// messages (or when the 90s read deadline fires). On a quiet stream,
	// cancellation and heartbeat logging can therefore be delayed.
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-heartbeatTicker.C:
			elapsed := time.Since(lastHeartbeat)
			slog.Debug("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds())
			eventCount = 0
			lastHeartbeat = time.Now()
		default:
			_, message, err := conn.ReadMessage()
			if err != nil {
				// Calculate connection duration and idle time for debugging.
				// (lastHeartbeat is reset on every 30s heartbeat tick, so this
				// measures time since the last heartbeat window, not strictly
				// time since the last event.)
				connDuration := time.Since(w.connStartTime)
				timeSinceLastEvent := time.Since(lastHeartbeat)

				// Get ping/pong stats
				w.pongMutex.Lock()
				pingsTotal := w.pingsSent
				pongsTotal := w.pongsReceived
				timeSinceLastPong := time.Since(w.lastPongTime)
				w.pongMutex.Unlock()

				// Calculate ping/pong success rate
				var pongRate float64
				if pingsTotal > 0 {
					pongRate = float64(pongsTotal) / float64(pingsTotal) * 100
				}

				// Determine diagnosis
				var diagnosis string
				if pongRate >= 95 && timeSinceLastPong < 60*time.Second {
					diagnosis = "Connection was healthy (good ping/pong), likely server-side timeout or network interruption"
				} else if timeSinceLastPong > 60*time.Second {
					diagnosis = "Connection died (no pong for >60s), network issue detected"
				} else if pongRate < 80 {
					diagnosis = "Connection unstable (low pong rate), network quality issues"
				} else {
					diagnosis = "Connection closed unexpectedly"
				}

				// Log detailed context about the failure
				slog.Info("Jetstream connection closed", "duration", connDuration, "events_in_last_30s", eventCount, "time_since_last_event", timeSinceLastEvent, "pongs_received", pongsTotal, "pings_sent", pingsTotal, "pong_rate_pct", pongRate, "time_since_last_pong", timeSinceLastPong, "error", err, "diagnosis", diagnosis)

				return fmt.Errorf("failed to read message: %w", err)
			}

			// For now, process uncompressed messages
			// TODO: Re-enable compression once debugging is complete
			_ = decoder // Keep decoder to avoid unused variable error

			if err := w.processMessage(message); err != nil {
				slog.Error("ERROR processing message", "error", err)
				// Continue processing other messages
			} else {
				eventCount++
			}
		}
	}
}
260
// SetEventCallback sets a callback to be called for each event.
// NOTE(review): the field is written here without synchronization while
// processMessage reads it from the consumer goroutine — call this before
// Start, or confirm single-goroutine usage at the call site.
func (w *Worker) SetEventCallback(cb EventCallback) {
	w.eventCallback = cb
}
265
266// GetLastCursor returns the last processed cursor (time_us) for reconnects
267func (w *Worker) GetLastCursor() int64 {
268 w.cursorMutex.RLock()
269 defer w.cursorMutex.RUnlock()
270 return w.lastCursor
271}
272
273// processMessage processes a single Jetstream event
274func (w *Worker) processMessage(message []byte) error {
275 var event JetstreamEvent
276 if err := json.Unmarshal(message, &event); err != nil {
277 return fmt.Errorf("failed to unmarshal event: %w", err)
278 }
279
280 // Update cursor for reconnects (do this first, even if processing fails)
281 w.cursorMutex.Lock()
282 w.lastCursor = event.TimeUS
283 w.cursorMutex.Unlock()
284
285 // Call callback if set
286 if w.eventCallback != nil {
287 w.eventCallback(event.TimeUS)
288 }
289
290 // Process based on event kind
291 switch event.Kind {
292 case "commit":
293 commit := event.Commit
294 if commit == nil {
295 return nil
296 }
297
298 // Set DID on commit from parent event
299 commit.DID = event.DID
300
301 // Debug: log first few collections we see to understand what's coming through
302 if w.debugCollectionCount < 5 {
303 slog.Debug("Jetstream received collection", "collection", commit.Collection, "did", commit.DID)
304 w.debugCollectionCount++
305 }
306
307 // Process based on collection
308 switch commit.Collection {
309 case atproto.ManifestCollection:
310 slog.Info("Jetstream processing manifest event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
311 return w.processManifest(commit)
312 case atproto.TagCollection:
313 slog.Info("Jetstream processing tag event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
314 return w.processTag(commit)
315 case atproto.StarCollection:
316 slog.Info("Jetstream processing star event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
317 return w.processStar(commit)
318 case atproto.RepoPageCollection:
319 slog.Info("Jetstream processing repo page event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
320 return w.processRepoPage(commit)
321 case atproto.StatsCollection:
322 slog.Info("Jetstream processing stats event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
323 return w.processStats(commit)
324 default:
325 // Ignore other collections
326 return nil
327 }
328
329 case "identity":
330 if event.Identity == nil {
331 return nil
332 }
333 return w.processIdentity(&event)
334
335 case "account":
336 if event.Account == nil {
337 return nil
338 }
339 return w.processAccount(&event)
340
341 default:
342 // Ignore unknown event kinds
343 return nil
344 }
345}
346
347// processManifest processes a manifest commit event
348func (w *Worker) processManifest(commit *CommitEvent) error {
349 // Resolve and upsert user with handle/PDS endpoint
350 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil {
351 return fmt.Errorf("failed to ensure user: %w", err)
352 }
353
354 if commit.Operation == "delete" {
355 // Delete manifest - rkey is just the digest, repository is not encoded
356 digest := commit.RKey
357 if err := db.DeleteManifest(w.db, commit.DID, "", digest); err != nil {
358 return err
359 }
360 // Clean up any orphaned tags pointing to this manifest
361 return db.CleanupOrphanedTags(w.db, commit.DID)
362 }
363
364 // Parse manifest record
365 if commit.Record == nil {
366 return nil // No record data, can't process
367 }
368
369 // Marshal map to bytes for processing
370 recordBytes, err := json.Marshal(commit.Record)
371 if err != nil {
372 return fmt.Errorf("failed to marshal record: %w", err)
373 }
374
375 // Use shared processor for DB operations
376 _, err = w.processor.ProcessManifest(context.Background(), commit.DID, recordBytes)
377 return err
378}
379
380// processTag processes a tag commit event
381func (w *Worker) processTag(commit *CommitEvent) error {
382 // Resolve and upsert user with handle/PDS endpoint
383 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil {
384 return fmt.Errorf("failed to ensure user: %w", err)
385 }
386
387 if commit.Operation == "delete" {
388 // Delete tag - decode rkey back to repository and tag
389 repo, tag := atproto.RKeyToRepositoryTag(commit.RKey)
390 slog.Info("Jetstream deleting tag", "did", commit.DID, "repository", repo, "tag", tag, "rkey", commit.RKey)
391 if err := db.DeleteTag(w.db, commit.DID, repo, tag); err != nil {
392 slog.Error("Jetstream ERROR deleting tag", "error", err)
393 return err
394 }
395 slog.Info("Jetstream successfully deleted tag", "did", commit.DID, "repository", repo, "tag", tag)
396 return nil
397 }
398
399 // Parse tag record
400 if commit.Record == nil {
401 return nil
402 }
403
404 // Marshal map to bytes for processing
405 recordBytes, err := json.Marshal(commit.Record)
406 if err != nil {
407 return fmt.Errorf("failed to marshal record: %w", err)
408 }
409
410 // Use shared processor for DB operations
411 return w.processor.ProcessTag(context.Background(), commit.DID, recordBytes)
412}
413
414// processStar processes a star commit event
415func (w *Worker) processStar(commit *CommitEvent) error {
416 // Resolve and upsert the user who starred (starrer)
417 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil {
418 return fmt.Errorf("failed to ensure user: %w", err)
419 }
420
421 if commit.Operation == "delete" {
422 // Unstar - parse the rkey to get the subject (owner DID and repository)
423 // Delete events don't include the full record, but the rkey contains the info we need
424 ownerDID, repository, err := atproto.ParseStarRecordKey(commit.RKey)
425 if err != nil {
426 return fmt.Errorf("failed to parse star rkey: %w", err)
427 }
428
429 // Delete the star record
430 return db.DeleteStar(w.db, commit.DID, ownerDID, repository)
431 }
432
433 // Parse star record
434 if commit.Record == nil {
435 return nil
436 }
437
438 // Marshal map to bytes for processing
439 recordBytes, err := json.Marshal(commit.Record)
440 if err != nil {
441 return fmt.Errorf("failed to marshal record: %w", err)
442 }
443
444 // Use shared processor for DB operations
445 return w.processor.ProcessStar(context.Background(), commit.DID, recordBytes)
446}
447
448// processRepoPage processes a repo page commit event
449func (w *Worker) processRepoPage(commit *CommitEvent) error {
450 // Resolve and upsert user with handle/PDS endpoint
451 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil {
452 return fmt.Errorf("failed to ensure user: %w", err)
453 }
454
455 isDelete := commit.Operation == "delete"
456
457 if isDelete {
458 // Delete - rkey is the repository name
459 slog.Info("Jetstream deleting repo page", "did", commit.DID, "repository", commit.RKey)
460 if err := w.processor.ProcessRepoPage(context.Background(), commit.DID, commit.RKey, nil, true); err != nil {
461 slog.Error("Jetstream ERROR deleting repo page", "error", err)
462 return err
463 }
464 slog.Info("Jetstream successfully deleted repo page", "did", commit.DID, "repository", commit.RKey)
465 return nil
466 }
467
468 // Parse repo page record
469 if commit.Record == nil {
470 return nil
471 }
472
473 // Marshal map to bytes for processing
474 recordBytes, err := json.Marshal(commit.Record)
475 if err != nil {
476 return fmt.Errorf("failed to marshal record: %w", err)
477 }
478
479 // Use shared processor for DB operations
480 return w.processor.ProcessRepoPage(context.Background(), commit.DID, commit.RKey, recordBytes, false)
481}
482
483// processStats processes a stats commit event from a hold PDS
484func (w *Worker) processStats(commit *CommitEvent) error {
485 isDelete := commit.Operation == "delete"
486
487 if isDelete {
488 // For delete events, we need to parse the rkey to get ownerDID + repository
489 // The rkey is deterministic: base32(sha256(ownerDID + "/" + repository)[:16])
490 // Unfortunately, we can't reverse this - we need the record data
491 // Delete events don't include record data, so we can't delete from cache
492 // This is acceptable - stats will be refreshed on next update from hold
493 slog.Debug("Jetstream ignoring stats delete event (cannot reverse rkey)", "did", commit.DID, "rkey", commit.RKey)
494 return nil
495 }
496
497 // Parse stats record
498 if commit.Record == nil {
499 return nil
500 }
501
502 // Marshal map to bytes for processing
503 recordBytes, err := json.Marshal(commit.Record)
504 if err != nil {
505 return fmt.Errorf("failed to marshal record: %w", err)
506 }
507
508 // Use shared processor - commit.DID is the hold's DID
509 return w.processor.ProcessStats(context.Background(), commit.DID, recordBytes, false)
510}
511
512// processIdentity processes an identity event (handle change)
513func (w *Worker) processIdentity(event *JetstreamEvent) error {
514 if event.Identity == nil {
515 return nil
516 }
517
518 identity := event.Identity
519 // Process via shared processor (only ATCR users will be logged at Info level)
520 return w.processor.ProcessIdentity(context.Background(), identity.DID, identity.Handle)
521}
522
523// processAccount processes an account event (status change)
524func (w *Worker) processAccount(event *JetstreamEvent) error {
525 if event.Account == nil {
526 return nil
527 }
528
529 account := event.Account
530 // Process via shared processor (only ATCR users will be logged at Info level)
531 return w.processor.ProcessAccount(context.Background(), account.DID, account.Active, account.Status)
532}
533
// JetstreamEvent represents a Jetstream event as received on the wire.
// The consumer nil-checks Commit/Identity/Account per Kind, so at most the
// payload matching Kind is expected to be populated.
type JetstreamEvent struct {
	DID      string        `json:"did"`     // account (repo) the event concerns
	TimeUS   int64         `json:"time_us"` // event timestamp in Unix microseconds; doubles as the reconnect cursor
	Kind     string        `json:"kind"`    // "commit", "identity", "account"
	Commit   *CommitEvent  `json:"commit,omitempty"`
	Identity *IdentityInfo `json:"identity,omitempty"`
	Account  *AccountInfo  `json:"account,omitempty"`
}
543
// CommitEvent represents a commit event (create/update/delete) carried by a
// JetstreamEvent of kind "commit".
type CommitEvent struct {
	Rev        string         `json:"rev"`       // repo revision of the commit
	Operation  string         `json:"operation"` // "create", "update", "delete"
	Collection string         `json:"collection"`
	RKey       string         `json:"rkey"`
	Record     map[string]any `json:"record,omitempty"` // absent on delete operations (handlers nil-check it)
	CID        string         `json:"cid,omitempty"`
	DID        string         `json:"-"` // not on the wire; copied from the parent event by processMessage
}
554
// IdentityInfo represents an identity event (handle change) payload.
type IdentityInfo struct {
	DID    string `json:"did"`
	Handle string `json:"handle"` // the account's new handle
	Seq    int64  `json:"seq"`    // firehose sequence number
	Time   string `json:"time"`
}
562
// AccountInfo represents an account status event payload.
type AccountInfo struct {
	Active bool   `json:"active"` // whether the account is currently active
	DID    string `json:"did"`
	Seq    int64  `json:"seq"` // firehose sequence number
	Time   string `json:"time"`
	Status string `json:"status,omitempty"` // reason when inactive (as provided upstream)
}