A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Try to implement a firehose cursor for subscribeRepos

+394 -25
+22 -1
cmd/hold/main.go
··· 61 61 } 62 62 63 63 // Create event broadcaster for subscribeRepos firehose 64 - broadcaster = pds.NewEventBroadcaster(holdDID, 100) // Keep 100 events for backfill 64 + // Database path: carstore creates db.sqlite3 inside cfg.Database.Path 65 + var dbPath string 66 + if cfg.Database.Path != ":memory:" { 67 + dbPath = cfg.Database.Path + "/db.sqlite3" 68 + } else { 69 + dbPath = ":memory:" 70 + } 71 + broadcaster = pds.NewEventBroadcaster(holdDID, 100, dbPath) 72 + 73 + // Bootstrap events from existing repo records (one-time migration) 74 + if err := broadcaster.BootstrapFromRepo(holdPDS); err != nil { 75 + log.Printf("Warning: Failed to bootstrap events from repo: %v", err) 76 + } 65 77 66 78 // Wire up repo event handler to broadcaster 67 79 holdPDS.RepomgrRef().SetEventHandler(broadcaster.SetRepoEventHandler(), true) ··· 172 184 log.Printf("Warning: Failed to set status post to offline: %v", err) 173 185 } else { 174 186 log.Printf("Status post set to offline") 187 + } 188 + } 189 + 190 + // Close broadcaster database connection 191 + if broadcaster != nil { 192 + if err := broadcaster.Close(); err != nil { 193 + log.Printf("Warning: Failed to close broadcaster database: %v", err) 194 + } else { 195 + log.Printf("Broadcaster database closed") 175 196 } 176 197 } 177 198
+359 -11
pkg/hold/pds/events.go
··· 1 1 package pds 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 6 + "database/sql" 5 7 "encoding/json" 8 + "fmt" 6 9 "log" 10 + "strings" 7 11 "sync" 8 12 "time" 9 13 10 14 atproto "github.com/bluesky-social/indigo/api/atproto" 11 15 "github.com/bluesky-social/indigo/events" 12 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 + "github.com/bluesky-social/indigo/repo" 13 18 "github.com/gorilla/websocket" 14 19 "github.com/ipfs/go-cid" 20 + "github.com/ipld/go-car" 21 + carutil "github.com/ipld/go-car/util" 22 + _ "github.com/mattn/go-sqlite3" 15 23 ) 16 24 17 25 // EventBroadcaster manages WebSocket connections and broadcasts repo events ··· 19 27 mu sync.RWMutex 20 28 subscribers map[*Subscriber]bool 21 29 eventSeq int64 22 - eventHistory []HistoricalEvent // Ring buffer for cursor backfill 30 + eventHistory []HistoricalEvent // Ring buffer for cursor backfill (deprecated, kept for compatibility) 23 31 maxHistory int 24 - holdDID string // DID of the hold for setting repo field 32 + holdDID string // DID of the hold for setting repo field 33 + db *sql.DB // Database for persistent event storage 34 + dbPath string // Path to database file 25 35 } 26 36 27 37 // Subscriber represents a WebSocket client subscribed to the firehose ··· 50 60 Type string `json:"$type" cborgen:"$type"` // Always "#commit" 51 61 } 52 62 53 - // NewEventBroadcaster creates a new event broadcaster 54 - func NewEventBroadcaster(holdDID string, maxHistory int) *EventBroadcaster { 63 + // NewEventBroadcaster creates a new event broadcaster with persistent storage 64 + // dbPath should point to the carstore database file (e.g., "/path/to/pds/db.sqlite3") 65 + func NewEventBroadcaster(holdDID string, maxHistory int, dbPath string) *EventBroadcaster { 55 66 if maxHistory <= 0 { 56 67 maxHistory = 100 // Default to keeping 100 events 57 68 } 58 69 59 - return &EventBroadcaster{ 70 + broadcaster := &EventBroadcaster{ 60 71 subscribers: make(map[*Subscriber]bool), 61 72 eventSeq: 0, 62 73 
eventHistory: make([]HistoricalEvent, 0, maxHistory), 63 74 maxHistory: maxHistory, 64 75 holdDID: holdDID, 76 + dbPath: dbPath, 65 77 } 78 + 79 + // Initialize database connection and schema 80 + if dbPath != "" && dbPath != ":memory:" { 81 + if err := broadcaster.initDatabase(); err != nil { 82 + log.Printf("Warning: Failed to initialize event database: %v", err) 83 + log.Printf("Events will not persist across restarts") 84 + } 85 + } 86 + 87 + return broadcaster 88 + } 89 + 90 + // initDatabase opens database connection, creates table, and loads last sequence 91 + func (b *EventBroadcaster) initDatabase() error { 92 + // Open database connection 93 + db, err := sql.Open("sqlite3", b.dbPath) 94 + if err != nil { 95 + return err 96 + } 97 + 98 + // Test connection 99 + if err := db.Ping(); err != nil { 100 + db.Close() 101 + return err 102 + } 103 + 104 + b.db = db 105 + 106 + // Create events table if it doesn't exist 107 + schema := ` 108 + CREATE TABLE IF NOT EXISTS firehose_events ( 109 + seq INTEGER PRIMARY KEY, 110 + commit_cid TEXT NOT NULL, 111 + rev TEXT NOT NULL, 112 + since_rev TEXT, 113 + repo_slice BLOB NOT NULL, 114 + ops_json TEXT NOT NULL, 115 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 116 + ); 117 + CREATE INDEX IF NOT EXISTS idx_firehose_events_rev ON firehose_events(rev); 118 + ` 119 + 120 + if _, err := db.Exec(schema); err != nil { 121 + db.Close() 122 + b.db = nil 123 + return err 124 + } 125 + 126 + // Load last sequence number from database 127 + var lastSeq sql.NullInt64 128 + err = db.QueryRow("SELECT MAX(seq) FROM firehose_events").Scan(&lastSeq) 129 + if err != nil { 130 + log.Printf("Warning: Failed to load last event sequence: %v", err) 131 + } else if lastSeq.Valid { 132 + b.eventSeq = lastSeq.Int64 133 + log.Printf("Loaded event sequence from database: seq=%d", b.eventSeq) 134 + } else { 135 + // Database is empty but might have existing repo records 136 + // This happens on first deployment after adding persistent events 137 
+ log.Printf("No events in database - will bootstrap from repo if needed") 138 + } 139 + 140 + return nil 141 + } 142 + 143 + // BootstrapFromRepo generates synthetic events from all current records in the repo 144 + // This is called once when deploying persistent events to an existing repo 145 + func (b *EventBroadcaster) BootstrapFromRepo(pds *HoldPDS) error { 146 + if b.db == nil { 147 + return fmt.Errorf("database not initialized") 148 + } 149 + 150 + // Check if we already have events 151 + var count int64 152 + if err := b.db.QueryRow("SELECT COUNT(*) FROM firehose_events").Scan(&count); err != nil { 153 + return fmt.Errorf("failed to check event count: %w", err) 154 + } 155 + 156 + if count > 0 { 157 + log.Printf("Database already has %d events, skipping bootstrap", count) 158 + return nil 159 + } 160 + 161 + ctx := context.Background() 162 + 163 + // Get current repo state 164 + session, err := pds.carstore.ReadOnlySession(pds.uid) 165 + if err != nil { 166 + return fmt.Errorf("failed to create session: %w", err) 167 + } 168 + 169 + head, err := pds.carstore.GetUserRepoHead(ctx, pds.uid) 170 + if err != nil || !head.Defined() { 171 + // Empty repo, nothing to bootstrap 172 + log.Printf("Empty repo, no events to bootstrap") 173 + return nil 174 + } 175 + 176 + repoHandle, err := repo.OpenRepo(ctx, session, head) 177 + if err != nil { 178 + return fmt.Errorf("failed to open repo: %w", err) 179 + } 180 + 181 + // Get current rev 182 + rev, err := pds.repomgr.GetRepoRev(ctx, pds.uid) 183 + if err != nil { 184 + return fmt.Errorf("failed to get repo rev: %w", err) 185 + } 186 + 187 + log.Printf("Bootstrapping firehose events from current repo state (head=%s, rev=%s)", head.String(), rev) 188 + 189 + var recordCount int64 190 + 191 + // Walk all records in the repo and create synthetic events 192 + err = repoHandle.ForEach(ctx, "", func(path string, recordCID cid.Cid) error { 193 + // Get record value 194 + _, recBytes, err := repoHandle.GetRecordBytes(ctx, 
path) 195 + if err != nil { 196 + log.Printf("Warning: failed to get record bytes for %s: %v", path, err) 197 + return nil // Skip this record but continue 198 + } 199 + 200 + recordValue, err := lexutil.CborDecodeValue(*recBytes) 201 + if err != nil { 202 + log.Printf("Warning: failed to decode record %s: %v", path, err) 203 + return nil 204 + } 205 + 206 + // Parse collection and rkey from path 207 + parts := strings.Split(path, "/") 208 + if len(parts) < 2 { 209 + return nil // Invalid path 210 + } 211 + 212 + collection := strings.Join(parts[:len(parts)-1], "/") 213 + rkey := parts[len(parts)-1] 214 + 215 + // Create synthetic RepoOp 216 + ops := []RepoOp{ 217 + { 218 + Kind: EvtKindCreateRecord, 219 + Collection: collection, 220 + Rkey: rkey, 221 + RecCid: &recordCID, 222 + Record: recordValue, 223 + }, 224 + } 225 + 226 + // Get CAR slice for this record (minimal - just the record block) 227 + var carBuf bytes.Buffer 228 + carHeader := &car.CarHeader{ 229 + Roots: []cid.Cid{head}, 230 + Version: 1, 231 + } 232 + if err := car.WriteHeader(carHeader, &carBuf); err != nil { 233 + log.Printf("Warning: failed to write CAR header: %v", err) 234 + return nil 235 + } 236 + 237 + // Write the record block 238 + if err := carutil.LdWrite(&carBuf, recordCID.Bytes(), *recBytes); err != nil { 239 + log.Printf("Warning: failed to write record block: %v", err) 240 + return nil 241 + } 242 + 243 + // Create synthetic RepoEvent 244 + repoEvent := &RepoEvent{ 245 + NewRoot: head, 246 + Rev: rev, 247 + Since: nil, // No "since" for bootstrap events 248 + RepoSlice: carBuf.Bytes(), 249 + Ops: ops, 250 + } 251 + 252 + // Convert to commit event and persist 253 + b.mu.Lock() 254 + b.eventSeq++ 255 + seq := b.eventSeq 256 + commitEvent := b.convertToCommitEvent(repoEvent, seq) 257 + 258 + // Persist to database 259 + if err := b.persistEvent(commitEvent); err != nil { 260 + b.mu.Unlock() 261 + return fmt.Errorf("failed to persist bootstrap event seq=%d: %w", seq, err) 262 + } 263 + 
264 + // Also add to in-memory history for immediate use 265 + b.addToHistory(seq, commitEvent) 266 + b.mu.Unlock() 267 + 268 + recordCount++ 269 + return nil 270 + }) 271 + 272 + if err != nil { 273 + return fmt.Errorf("failed to walk repo: %w", err) 274 + } 275 + 276 + log.Printf("✅ Bootstrapped %d events from repo (seq now at %d)", recordCount, b.eventSeq) 277 + return nil 278 + } 279 + 280 + // Close closes the database connection 281 + func (b *EventBroadcaster) Close() error { 282 + if b.db != nil { 283 + return b.db.Close() 284 + } 285 + return nil 66 286 } 67 287 68 288 // Subscribe adds a new WebSocket subscriber ··· 78 298 currentSeq := b.eventSeq 79 299 b.mu.Unlock() 80 300 81 - // Send historical events if cursor is provided and < current seq 82 - // cursor=0 means "replay all events from the beginning" 83 - // cursor >= 0 triggers backfill, negative cursor means "no backfill" 84 - if cursor >= 0 && cursor < currentSeq { 85 - go b.backfillSubscriber(sub, cursor) 301 + // Handle cursor-based backfill: 302 + // - cursor < 0: No backfill, stream new events only 303 + // - cursor >= 0: Backfill events from cursor onwards 304 + // - cursor=0: Replay all events from beginning 305 + // - cursor < currentSeq: Normal backfill 306 + // - cursor >= currentSeq: Relay reconnecting after our restart, backfill from database 307 + if cursor >= 0 { 308 + if cursor < currentSeq { 309 + // Normal case: relay is behind, backfill missing events 310 + go b.backfillSubscriber(sub, cursor) 311 + } else if cursor > currentSeq { 312 + // Relay has cursor ahead of us - server was restarted 313 + // Database should have the events if we had them before 314 + log.Printf("Relay cursor %d > currentSeq %d (server restarted), attempting database backfill", cursor, currentSeq) 315 + go b.backfillSubscriber(sub, cursor) 316 + } 317 + // else cursor == currentSeq: relay is caught up, just stream new events 86 318 } 87 319 88 320 // Start goroutine to handle sending events to this 
subscriber ··· 114 346 // Convert RepoEvent to RepoCommitEvent 115 347 commitEvent := b.convertToCommitEvent(event, seq) 116 348 117 - // Store in history for backfill 349 + // Persist event to database 350 + if b.db != nil { 351 + if err := b.persistEvent(commitEvent); err != nil { 352 + log.Printf("Warning: Failed to persist event seq=%d to database: %v", seq, err) 353 + } 354 + } 355 + 356 + // Store in history for backfill (deprecated, but kept for compatibility) 118 357 b.addToHistory(seq, commitEvent) 119 358 120 359 // Broadcast to all subscribers ··· 129 368 } 130 369 } 131 370 371 + // persistEvent stores an event in the database 372 + func (b *EventBroadcaster) persistEvent(event *RepoCommitEvent) error { 373 + // Serialize ops to JSON 374 + opsJSON, err := json.Marshal(event.Ops) 375 + if err != nil { 376 + return err 377 + } 378 + 379 + // Get since_rev value (may be nil) 380 + var sinceRev sql.NullString 381 + if event.Since != nil { 382 + sinceRev = sql.NullString{String: *event.Since, Valid: true} 383 + } 384 + 385 + // Insert event 386 + query := ` 387 + INSERT INTO firehose_events (seq, commit_cid, rev, since_rev, repo_slice, ops_json) 388 + VALUES (?, ?, ?, ?, ?, ?) 
389 + ` 390 + _, err = b.db.Exec(query, event.Seq, event.Commit, event.Rev, sinceRev, event.Blocks, opsJSON) 391 + return err 392 + } 393 + 132 394 // convertToCommitEvent converts a RepoEvent to a RepoCommitEvent 133 395 func (b *EventBroadcaster) convertToCommitEvent(event *RepoEvent, seq int64) *RepoCommitEvent { 134 396 // Convert RepoOps to atproto.SyncSubscribeRepos_RepoOp ··· 183 445 } 184 446 185 447 // backfillSubscriber sends historical events to a subscriber 448 + // Query events from database where seq > cursor 186 449 func (b *EventBroadcaster) backfillSubscriber(sub *Subscriber, cursor int64) { 450 + // If database is available, use it for backfill 451 + if b.db != nil { 452 + if err := b.backfillFromDatabase(sub, cursor); err != nil { 453 + log.Printf("Database backfill failed, falling back to in-memory: %v", err) 454 + b.backfillFromMemory(sub, cursor) 455 + } 456 + return 457 + } 458 + 459 + // Fall back to in-memory backfill 460 + b.backfillFromMemory(sub, cursor) 461 + } 462 + 463 + // backfillFromDatabase queries events from database and sends to subscriber 464 + func (b *EventBroadcaster) backfillFromDatabase(sub *Subscriber, cursor int64) error { 465 + // Query events where seq > cursor, ordered by seq 466 + query := ` 467 + SELECT seq, commit_cid, rev, since_rev, repo_slice, ops_json 468 + FROM firehose_events 469 + WHERE seq > ? 
470 + ORDER BY seq ASC 471 + ` 472 + 473 + rows, err := b.db.Query(query, cursor) 474 + if err != nil { 475 + return err 476 + } 477 + defer rows.Close() 478 + 479 + for rows.Next() { 480 + var ( 481 + seq int64 482 + commitCID string 483 + rev string 484 + sinceRev sql.NullString 485 + repoSlice []byte 486 + opsJSON []byte 487 + ) 488 + 489 + if err := rows.Scan(&seq, &commitCID, &rev, &sinceRev, &repoSlice, &opsJSON); err != nil { 490 + log.Printf("Error scanning event row: %v", err) 491 + continue 492 + } 493 + 494 + // Deserialize ops from JSON 495 + var ops []*atproto.SyncSubscribeRepos_RepoOp 496 + if err := json.Unmarshal(opsJSON, &ops); err != nil { 497 + log.Printf("Error unmarshaling ops for seq=%d: %v", seq, err) 498 + continue 499 + } 500 + 501 + // Reconstruct event 502 + var since *string 503 + if sinceRev.Valid { 504 + since = &sinceRev.String 505 + } 506 + 507 + event := &RepoCommitEvent{ 508 + Seq: seq, 509 + Repo: b.holdDID, 510 + Commit: commitCID, 511 + Rev: rev, 512 + Since: since, 513 + Blocks: repoSlice, 514 + Ops: ops, 515 + Time: time.Now().Format(time.RFC3339), 516 + Type: "#commit", 517 + } 518 + 519 + // Send to subscriber 520 + select { 521 + case sub.send <- event: 522 + // Sent successfully 523 + case <-time.After(5 * time.Second): 524 + // Timeout, subscriber too slow 525 + log.Printf("Backfill timeout for subscriber at seq=%d", seq) 526 + return nil 527 + } 528 + } 529 + 530 + return rows.Err() 531 + } 532 + 533 + // backfillFromMemory sends events from in-memory ring buffer (fallback) 534 + func (b *EventBroadcaster) backfillFromMemory(sub *Subscriber, cursor int64) { 187 535 b.mu.RLock() 188 536 defer b.mu.RUnlock() 189 537
+12 -12
pkg/hold/pds/events_test.go
··· 13 13 // TestNewEventBroadcaster tests event broadcaster creation 14 14 func TestNewEventBroadcaster(t *testing.T) { 15 15 holdDID := "did:web:hold.example.com" 16 - broadcaster := NewEventBroadcaster(holdDID, 100) 16 + broadcaster := NewEventBroadcaster(holdDID, 100, "") 17 17 18 18 if broadcaster.holdDID != holdDID { 19 19 t.Errorf("Expected holdDID=%s, got %s", holdDID, broadcaster.holdDID) ··· 35 35 // TestNewEventBroadcaster_DefaultHistory tests default history size 36 36 func TestNewEventBroadcaster_DefaultHistory(t *testing.T) { 37 37 // Zero or negative maxHistory should default to 100 38 - broadcaster := NewEventBroadcaster("did:web:test", 0) 38 + broadcaster := NewEventBroadcaster("did:web:test", 0, "") 39 39 if broadcaster.maxHistory != 100 { 40 40 t.Errorf("Expected default maxHistory=100 for input 0, got %d", broadcaster.maxHistory) 41 41 } 42 42 43 - broadcaster2 := NewEventBroadcaster("did:web:test", -5) 43 + broadcaster2 := NewEventBroadcaster("did:web:test", -5, "") 44 44 if broadcaster2.maxHistory != 100 { 45 45 t.Errorf("Expected default maxHistory=100 for negative input, got %d", broadcaster2.maxHistory) 46 46 } ··· 48 48 49 49 // TestGetCurrentSeq tests sequence number tracking 50 50 func TestGetCurrentSeq(t *testing.T) { 51 - broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10) 51 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10, "") 52 52 53 53 // Initial seq should be 0 54 54 seq := broadcaster.GetCurrentSeq() ··· 91 91 92 92 // TestBroadcast tests event broadcasting 93 93 func TestBroadcast(t *testing.T) { 94 - broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10) 94 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10, "") 95 95 ctx := context.Background() 96 96 97 97 testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke") ··· 144 144 // TestAddToHistory_RingBuffer tests ring buffer behavior 145 145 func TestAddToHistory_RingBuffer(t 
*testing.T) { 146 146 // Create broadcaster with small history 147 - broadcaster := NewEventBroadcaster("did:web:hold.example.com", 3) 147 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 3, "") 148 148 ctx := context.Background() 149 149 150 150 testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke") ··· 181 181 182 182 // TestConvertToCommitEvent tests event conversion 183 183 func TestConvertToCommitEvent(t *testing.T) { 184 - broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10) 184 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10, "") 185 185 186 186 testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke") 187 187 since := "prev-rev" ··· 296 296 297 297 // TestConvertToCommitEvent_NoSince tests event without since field 298 298 func TestConvertToCommitEvent_NoSince(t *testing.T) { 299 - broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10) 299 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10, "") 300 300 301 301 testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke") 302 302 ··· 317 317 318 318 // TestSetRepoEventHandler tests handler registration 319 319 func TestSetRepoEventHandler(t *testing.T) { 320 - broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10) 320 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10, "") 321 321 322 322 handler := broadcaster.SetRepoEventHandler() 323 323 if handler == nil { ··· 385 385 386 386 // TestSubscribe_CursorZeroBackfill tests that cursor=0 replays all events 387 387 func TestSubscribe_CursorZeroBackfill(t *testing.T) { 388 - broadcaster := NewEventBroadcaster("did:web:hold.example.com", 100) 388 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 100, "") 389 389 ctx := context.Background() 390 390 391 391 testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke") ··· 456 456 
457 457 // TestSubscribe_MidCursorBackfill tests that cursor=N only gets events after N 458 458 func TestSubscribe_MidCursorBackfill(t *testing.T) { 459 - broadcaster := NewEventBroadcaster("did:web:hold.example.com", 100) 459 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 100, "") 460 460 ctx := context.Background() 461 461 462 462 testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke") ··· 506 506 507 507 // TestSubscribe_NegativeCursorNoBackfill tests that negative cursor means no backfill 508 508 func TestSubscribe_NegativeCursorNoBackfill(t *testing.T) { 509 - broadcaster := NewEventBroadcaster("did:web:hold.example.com", 100) 509 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 100, "") 510 510 ctx := context.Background() 511 511 512 512 testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke")
+1 -1
pkg/hold/pds/xrpc_test.go
··· 2059 2059 handler, ctx := setupTestXRPCHandler(t) 2060 2060 2061 2061 // Create EventBroadcaster 2062 - broadcaster := NewEventBroadcaster(handler.pds.DID(), 100) 2062 + broadcaster := NewEventBroadcaster(handler.pds.DID(), 100, "") 2063 2063 handler.broadcaster = broadcaster 2064 2064 2065 2065 // Set up test HTTP server