A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at codeberg-source 570 lines 18 kB view raw
1// Package jetstream provides an ATProto Jetstream consumer for real-time updates. 2// It connects to the Bluesky Jetstream WebSocket, processes repository events, 3// indexes manifests and tags, and populates the AppView database for the web UI. 4package jetstream 5 6import ( 7 "context" 8 "database/sql" 9 "encoding/json" 10 "fmt" 11 "log/slog" 12 "net/url" 13 "sync" 14 "time" 15 16 "atcr.io/pkg/appview/db" 17 "atcr.io/pkg/atproto" 18 "github.com/gorilla/websocket" 19 "github.com/klauspost/compress/zstd" 20) 21 22// UserCache caches DID -> handle/PDS mappings to avoid repeated lookups 23type UserCache struct { 24 cache map[string]*db.User 25} 26 27// EventCallback is called for each processed event 28type EventCallback func(timeUS int64) 29 30// Worker consumes Jetstream events and populates the UI database 31type Worker struct { 32 db *sql.DB 33 jetstreamURL string 34 startCursor int64 35 wantedCollections []string 36 debugCollectionCount int 37 processor *Processor // Shared processor for DB operations 38 statsCache *StatsCache // In-memory cache for stats aggregation across holds 39 eventCallback EventCallback 40 connStartTime time.Time // Track when connection started for debugging 41 42 // Ping/pong tracking for connection health 43 pingsSent int64 44 pongsReceived int64 45 lastPongTime time.Time 46 pongMutex sync.Mutex 47 48 // In-memory cursor tracking for reconnects 49 lastCursor int64 50 cursorMutex sync.RWMutex 51} 52 53// NewWorker creates a new Jetstream worker 54// startCursor: Unix microseconds timestamp to start from (0 = start from now) 55func NewWorker(database *sql.DB, jetstreamURL string, startCursor int64) *Worker { 56 if jetstreamURL == "" { 57 jetstreamURL = "wss://jetstream2.us-west.bsky.network/subscribe" 58 } 59 60 // Create shared stats cache for aggregating across holds 61 statsCache := NewStatsCache() 62 63 return &Worker{ 64 db: database, 65 jetstreamURL: jetstreamURL, 66 startCursor: startCursor, 67 wantedCollections: []string{ 68 "io.atcr.*", // Subscribe to all ATCR collections 69 }, 70 statsCache: statsCache, 71 processor: NewProcessor(database, true, statsCache), // Use cache for live streaming 72 } 73} 74 75// Start begins consuming Jetstream events 76// This is a blocking function that runs until the context is cancelled 77func (w *Worker) Start(ctx context.Context) error { 78 // Build connection URL with filters 79 u, err := url.Parse(w.jetstreamURL) 80 if err != nil { 81 return fmt.Errorf("invalid jetstream URL: %w", err) 82 } 83 84 q := u.Query() 85 for _, collection := range w.wantedCollections { 86 q.Add("wantedCollections", collection) 87 } 88 89 // Add cursor if specified (for backfilling historical data or reconnects) 90 if w.startCursor > 0 { 91 q.Set("cursor", fmt.Sprintf("%d", w.startCursor)) 92 93 // Calculate lag (cursor is in microseconds) 94 now := time.Now().UnixMicro() 95 lagSeconds := float64(now-w.startCursor) / 1_000_000.0 96 slog.Info("Jetstream starting from cursor", "cursor", w.startCursor, "lag_seconds", lagSeconds) 97 } 98 99 // Disable compression for now to debug 100 // q.Set("compress", "true") 101 u.RawQuery = q.Encode() 102 103 slog.Info("Connecting to Jetstream", "url", u.String()) 104 105 // Connect to Jetstream 106 conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil) 107 if err != nil { 108 return fmt.Errorf("failed to connect to jetstream: %w", err) 109 } 110 defer conn.Close() 111 112 // Track connection start time for debugging 113 w.connStartTime = time.Now() 114 115 // Reset ping/pong counters for this connection 116 w.pongMutex.Lock() 117 w.pingsSent = 0 118 w.pongsReceived = 0 119 w.lastPongTime = time.Now() 120 w.pongMutex.Unlock() 121 122 // Set up pong handler - called when server responds to our ping 123 conn.SetPongHandler(func(appData string) error { 124 w.pongMutex.Lock() 125 w.pongsReceived++ 126 w.lastPongTime = time.Now() 127 w.pongMutex.Unlock() 128 129 // Reset read deadline - we know connection is alive 130 // Allow 90 seconds for next pong (3x ping interval) 131 conn.SetReadDeadline(time.Now().Add(90 * time.Second)) 132 return nil 133 }) 134 135 // Set initial read deadline 136 conn.SetReadDeadline(time.Now().Add(90 * time.Second)) 137 138 // Create zstd decoder for decompressing messages 139 decoder, err := zstd.NewReader(nil) 140 if err != nil { 141 return fmt.Errorf("failed to create zstd decoder: %w", err) 142 } 143 defer decoder.Close() 144 145 slog.Info("Connected to Jetstream, listening for events...") 146 147 // Start heartbeat ticker to show Jetstream is alive 148 heartbeatTicker := time.NewTicker(30 * time.Second) 149 defer heartbeatTicker.Stop() 150 151 // Start ping ticker for keepalive 152 pingTicker := time.NewTicker(30 * time.Second) 153 defer pingTicker.Stop() 154 155 // Start ping sender goroutine 156 pingDone := make(chan struct{}) 157 defer close(pingDone) 158 159 go func() { 160 for { 161 select { 162 case <-ctx.Done(): 163 return 164 case <-pingDone: 165 return 166 case <-pingTicker.C: 167 // Check if we've received a pong recently 168 w.pongMutex.Lock() 169 timeSinceLastPong := time.Since(w.lastPongTime) 170 pingsTotal := w.pingsSent 171 pongsTotal := w.pongsReceived 172 w.pongMutex.Unlock() 173 174 // If no pong for 60 seconds, connection is likely dead 175 if timeSinceLastPong > 60*time.Second { 176 slog.Info("Jetstream no pong received, closing connection", "time_since_last_pong", timeSinceLastPong, "pings_sent", pingsTotal, "pongs_received", pongsTotal) 177 conn.Close() 178 return 179 } 180 181 // Send ping with write deadline 182 conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) 183 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 184 slog.Warn("Jetstream failed to send ping", "error", err) 185 conn.Close() 186 return 187 } 188 189 w.pongMutex.Lock() 190 w.pingsSent++ 191 w.pongMutex.Unlock() 192 } 193 } 194 }() 195 196 eventCount := 0 197 lastHeartbeat := time.Now() 198 199 // Read messages 200 for { 201 select { 202 case <-ctx.Done(): 203 return ctx.Err() 204 case <-heartbeatTicker.C: 205 elapsed := time.Since(lastHeartbeat) 206 slog.Debug("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds()) 207 eventCount = 0 208 lastHeartbeat = time.Now() 209 default: 210 _, message, err := conn.ReadMessage() 211 if err != nil { 212 // Calculate connection duration and idle time for debugging 213 connDuration := time.Since(w.connStartTime) 214 timeSinceLastEvent := time.Since(lastHeartbeat) 215 216 // Get ping/pong stats 217 w.pongMutex.Lock() 218 pingsTotal := w.pingsSent 219 pongsTotal := w.pongsReceived 220 timeSinceLastPong := time.Since(w.lastPongTime) 221 w.pongMutex.Unlock() 222 223 // Calculate ping/pong success rate 224 var pongRate float64 225 if pingsTotal > 0 { 226 pongRate = float64(pongsTotal) / float64(pingsTotal) * 100 227 } 228 229 // Determine diagnosis 230 var diagnosis string 231 if pongRate >= 95 && timeSinceLastPong < 60*time.Second { 232 diagnosis = "Connection was healthy (good ping/pong), likely server-side timeout or network interruption" 233 } else if timeSinceLastPong > 60*time.Second { 234 diagnosis = "Connection died (no pong for >60s), network issue detected" 235 } else if pongRate < 80 { 236 diagnosis = "Connection unstable (low pong rate), network quality issues" 237 } else { 238 diagnosis = "Connection closed unexpectedly" 239 } 240 241 // Log detailed context about the failure 242 slog.Info("Jetstream connection closed", "duration", connDuration, "events_in_last_30s", eventCount, "time_since_last_event", timeSinceLastEvent, "pongs_received", pongsTotal, "pings_sent", pingsTotal, "pong_rate_pct", pongRate, "time_since_last_pong", timeSinceLastPong, "error", err, "diagnosis", diagnosis) 243 244 return fmt.Errorf("failed to read message: %w", err) 245 } 246 247 // For now, process uncompressed messages 248 // TODO: Re-enable compression once debugging is complete 249 _ = decoder // Keep decoder to avoid unused variable error 250 251 if err := w.processMessage(message); err != nil { 252 slog.Error("ERROR processing message", "error", err) 253 // Continue processing other messages 254 } else { 255 eventCount++ 256 } 257 } 258 } 259} 260 261// SetEventCallback sets a callback to be called for each event 262func (w *Worker) SetEventCallback(cb EventCallback) { 263 w.eventCallback = cb 264} 265 266// GetLastCursor returns the last processed cursor (time_us) for reconnects 267func (w *Worker) GetLastCursor() int64 { 268 w.cursorMutex.RLock() 269 defer w.cursorMutex.RUnlock() 270 return w.lastCursor 271} 272 273// processMessage processes a single Jetstream event 274func (w *Worker) processMessage(message []byte) error { 275 var event JetstreamEvent 276 if err := json.Unmarshal(message, &event); err != nil { 277 return fmt.Errorf("failed to unmarshal event: %w", err) 278 } 279 280 // Update cursor for reconnects (do this first, even if processing fails) 281 w.cursorMutex.Lock() 282 w.lastCursor = event.TimeUS 283 w.cursorMutex.Unlock() 284 285 // Call callback if set 286 if w.eventCallback != nil { 287 w.eventCallback(event.TimeUS) 288 } 289 290 // Process based on event kind 291 switch event.Kind { 292 case "commit": 293 commit := event.Commit 294 if commit == nil { 295 return nil 296 } 297 298 // Set DID on commit from parent event 299 commit.DID = event.DID 300 301 // Debug: log first few collections we see to understand what's coming through 302 if w.debugCollectionCount < 5 { 303 slog.Debug("Jetstream received collection", "collection", commit.Collection, "did", commit.DID) 304 w.debugCollectionCount++ 305 } 306 307 // Process based on collection 308 switch commit.Collection { 309 case atproto.ManifestCollection: 310 slog.Info("Jetstream processing manifest event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey) 311 return w.processManifest(commit) 312 case atproto.TagCollection: 313 slog.Info("Jetstream processing tag event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey) 314 return w.processTag(commit) 315 case atproto.StarCollection: 316 slog.Info("Jetstream processing star event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey) 317 return w.processStar(commit) 318 case atproto.RepoPageCollection: 319 slog.Info("Jetstream processing repo page event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey) 320 return w.processRepoPage(commit) 321 case atproto.StatsCollection: 322 slog.Info("Jetstream processing stats event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey) 323 return w.processStats(commit) 324 default: 325 // Ignore other collections 326 return nil 327 } 328 329 case "identity": 330 if event.Identity == nil { 331 return nil 332 } 333 return w.processIdentity(&event) 334 335 case "account": 336 if event.Account == nil { 337 return nil 338 } 339 return w.processAccount(&event) 340 341 default: 342 // Ignore unknown event kinds 343 return nil 344 } 345} 346 347// processManifest processes a manifest commit event 348func (w *Worker) processManifest(commit *CommitEvent) error { 349 // Resolve and upsert user with handle/PDS endpoint 350 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil { 351 return fmt.Errorf("failed to ensure user: %w", err) 352 } 353 354 if commit.Operation == "delete" { 355 // Delete manifest - rkey is just the digest, repository is not encoded 356 digest := commit.RKey 357 if err := db.DeleteManifest(w.db, commit.DID, "", digest); err != nil { 358 return err 359 } 360 // Clean up any orphaned tags pointing to this manifest 361 return db.CleanupOrphanedTags(w.db, commit.DID) 362 } 363 364 // Parse manifest record 365 if commit.Record == nil { 366 return nil // No record data, can't process 367 } 368 369 // Marshal map to bytes for processing 370 recordBytes, err := json.Marshal(commit.Record) 371 if err != nil { 372 return fmt.Errorf("failed to marshal record: %w", err) 373 } 374 375 // Use shared processor for DB operations 376 _, err = w.processor.ProcessManifest(context.Background(), commit.DID, recordBytes) 377 return err 378} 379 380// processTag processes a tag commit event 381func (w *Worker) processTag(commit *CommitEvent) error { 382 // Resolve and upsert user with handle/PDS endpoint 383 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil { 384 return fmt.Errorf("failed to ensure user: %w", err) 385 } 386 387 if commit.Operation == "delete" { 388 // Delete tag - decode rkey back to repository and tag 389 repo, tag := atproto.RKeyToRepositoryTag(commit.RKey) 390 slog.Info("Jetstream deleting tag", "did", commit.DID, "repository", repo, "tag", tag, "rkey", commit.RKey) 391 if err := db.DeleteTag(w.db, commit.DID, repo, tag); err != nil { 392 slog.Error("Jetstream ERROR deleting tag", "error", err) 393 return err 394 } 395 slog.Info("Jetstream successfully deleted tag", "did", commit.DID, "repository", repo, "tag", tag) 396 return nil 397 } 398 399 // Parse tag record 400 if commit.Record == nil { 401 return nil 402 } 403 404 // Marshal map to bytes for processing 405 recordBytes, err := json.Marshal(commit.Record) 406 if err != nil { 407 return fmt.Errorf("failed to marshal record: %w", err) 408 } 409 410 // Use shared processor for DB operations 411 return w.processor.ProcessTag(context.Background(), commit.DID, recordBytes) 412} 413 414// processStar processes a star commit event 415func (w *Worker) processStar(commit *CommitEvent) error { 416 // Resolve and upsert the user who starred (starrer) 417 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil { 418 return fmt.Errorf("failed to ensure user: %w", err) 419 } 420 421 if commit.Operation == "delete" { 422 // Unstar - parse the rkey to get the subject (owner DID and repository) 423 // Delete events don't include the full record, but the rkey contains the info we need 424 ownerDID, repository, err := atproto.ParseStarRecordKey(commit.RKey) 425 if err != nil { 426 return fmt.Errorf("failed to parse star rkey: %w", err) 427 } 428 429 // Delete the star record 430 return db.DeleteStar(w.db, commit.DID, ownerDID, repository) 431 } 432 433 // Parse star record 434 if commit.Record == nil { 435 return nil 436 } 437 438 // Marshal map to bytes for processing 439 recordBytes, err := json.Marshal(commit.Record) 440 if err != nil { 441 return fmt.Errorf("failed to marshal record: %w", err) 442 } 443 444 // Use shared processor for DB operations 445 return w.processor.ProcessStar(context.Background(), commit.DID, recordBytes) 446} 447 448// processRepoPage processes a repo page commit event 449func (w *Worker) processRepoPage(commit *CommitEvent) error { 450 // Resolve and upsert user with handle/PDS endpoint 451 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil { 452 return fmt.Errorf("failed to ensure user: %w", err) 453 } 454 455 isDelete := commit.Operation == "delete" 456 457 if isDelete { 458 // Delete - rkey is the repository name 459 slog.Info("Jetstream deleting repo page", "did", commit.DID, "repository", commit.RKey) 460 if err := w.processor.ProcessRepoPage(context.Background(), commit.DID, commit.RKey, nil, true); err != nil { 461 slog.Error("Jetstream ERROR deleting repo page", "error", err) 462 return err 463 } 464 slog.Info("Jetstream successfully deleted repo page", "did", commit.DID, "repository", commit.RKey) 465 return nil 466 } 467 468 // Parse repo page record 469 if commit.Record == nil { 470 return nil 471 } 472 473 // Marshal map to bytes for processing 474 recordBytes, err := json.Marshal(commit.Record) 475 if err != nil { 476 return fmt.Errorf("failed to marshal record: %w", err) 477 } 478 479 // Use shared processor for DB operations 480 return w.processor.ProcessRepoPage(context.Background(), commit.DID, commit.RKey, recordBytes, false) 481} 482 483// processStats processes a stats commit event from a hold PDS 484func (w *Worker) processStats(commit *CommitEvent) error { 485 isDelete := commit.Operation == "delete" 486 487 if isDelete { 488 // For delete events, we need to parse the rkey to get ownerDID + repository 489 // The rkey is deterministic: base32(sha256(ownerDID + "/" + repository)[:16]) 490 // Unfortunately, we can't reverse this - we need the record data 491 // Delete events don't include record data, so we can't delete from cache 492 // This is acceptable - stats will be refreshed on next update from hold 493 slog.Debug("Jetstream ignoring stats delete event (cannot reverse rkey)", "did", commit.DID, "rkey", commit.RKey) 494 return nil 495 } 496 497 // Parse stats record 498 if commit.Record == nil { 499 return nil 500 } 501 502 // Marshal map to bytes for processing 503 recordBytes, err := json.Marshal(commit.Record) 504 if err != nil { 505 return fmt.Errorf("failed to marshal record: %w", err) 506 } 507 508 // Use shared processor - commit.DID is the hold's DID 509 return w.processor.ProcessStats(context.Background(), commit.DID, recordBytes, false) 510} 511 512// processIdentity processes an identity event (handle change) 513func (w *Worker) processIdentity(event *JetstreamEvent) error { 514 if event.Identity == nil { 515 return nil 516 } 517 518 identity := event.Identity 519 // Process via shared processor (only ATCR users will be logged at Info level) 520 return w.processor.ProcessIdentity(context.Background(), identity.DID, identity.Handle) 521} 522 523// processAccount processes an account event (status change) 524func (w *Worker) processAccount(event *JetstreamEvent) error { 525 if event.Account == nil { 526 return nil 527 } 528 529 account := event.Account 530 // Process via shared processor (only ATCR users will be logged at Info level) 531 return w.processor.ProcessAccount(context.Background(), account.DID, account.Active, account.Status) 532} 533 534// JetstreamEvent represents a Jetstream event 535type JetstreamEvent struct { 536 DID string `json:"did"` 537 TimeUS int64 `json:"time_us"` 538 Kind string `json:"kind"` // "commit", "identity", "account" 539 Commit *CommitEvent `json:"commit,omitempty"` 540 Identity *IdentityInfo `json:"identity,omitempty"` 541 Account *AccountInfo `json:"account,omitempty"` 542} 543 544// CommitEvent represents a commit event (create/update/delete) 545type CommitEvent struct { 546 Rev string `json:"rev"` 547 Operation string `json:"operation"` // "create", "update", "delete" 548 Collection string `json:"collection"` 549 RKey string `json:"rkey"` 550 Record map[string]any `json:"record,omitempty"` 551 CID string `json:"cid,omitempty"` 552 DID string `json:"-"` // Set from parent event 553} 554 555// IdentityInfo represents an identity event 556type IdentityInfo struct { 557 DID string `json:"did"` 558 Handle string `json:"handle"` 559 Seq int64 `json:"seq"` 560 Time string `json:"time"` 561} 562 563// AccountInfo represents an account status event 564type AccountInfo struct { 565 Active bool `json:"active"` 566 DID string `json:"did"` 567 Seq int64 `json:"seq"` 568 Time string `json:"time"` 569 Status string `json:"status,omitempty"` 570}