this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add pebble store

+870 -4
+4 -2
cmd/butterfly/cmd_discover.go
··· 51 51 &cli.StringFlag{ 52 52 Name: "store", 53 53 Value: "stdout", 54 - Usage: "Storage mode: stdout, tarfiles, or duckdb", 54 + Usage: "Storage mode: stdout, tarfiles, duckdb, or pebble", 55 55 }, 56 56 &cli.StringFlag{ 57 57 Name: "storage-dir", ··· 61 61 &cli.StringFlag{ 62 62 Name: "db", 63 63 Value: "./butterfly.db", 64 - Usage: "Path to DuckDB database file", 64 + Usage: "Path to database file (DuckDB or Pebble)", 65 65 }, 66 66 }, 67 67 Action: runDiscover, ··· 111 111 s = store.NewTarfilesStore(storageDir) 112 112 case "duckdb": 113 113 s = store.NewDuckdbStore(dbPath) 114 + case "pebble": 115 + s = store.NewPebbleStore(dbPath) 114 116 default: 115 117 return fmt.Errorf("unknown storage mode: %s", storeMode) 116 118 }
+10 -2
cmd/butterfly/cmd_sync.go
··· 5 5 "fmt" 6 6 "log" 7 7 "os" 8 + "time" 8 9 9 10 "github.com/bluesky-social/indigo/cmd/butterfly/remote" 10 11 "github.com/bluesky-social/indigo/cmd/butterfly/store" ··· 48 49 &cli.StringFlag{ 49 50 Name: "store", 50 51 Value: "stdout", 51 - Usage: "Storage mode: stdout, tarfiles, or duckdb", 52 + Usage: "Storage mode: stdout, tarfiles, duckdb, or pebble", 52 53 }, 53 54 &cli.StringFlag{ 54 55 Name: "storage-dir", ··· 58 59 &cli.StringFlag{ 59 60 Name: "db", 60 61 Value: "./butterfly.db", 61 - Usage: "Path to DuckDB database file", 62 + Usage: "Path to database file (DuckDB or Pebble)", 62 63 }, 63 64 }, 64 65 Action: runSync, ··· 115 116 s = store.NewTarfilesStore(storageDir) 116 117 case "duckdb": 117 118 s = store.NewDuckdbStore(dbPath) 119 + case "pebble": 120 + s = store.NewPebbleStore(dbPath) 118 121 default: 119 122 return fmt.Errorf("unknown storage mode: %s", storeMode) 120 123 } ··· 131 134 logger.Printf("failed to close store: %v", err) 132 135 } 133 136 }() 137 + 138 + start := time.Now() 134 139 135 140 // Handle different input modes 136 141 switch inputMode { ··· 165 170 return fmt.Errorf("failed to process stream: %w", err) 166 171 } 167 172 } 173 + 174 + elapsed := time.Since(start) 175 + log.Printf("Completed in %s", elapsed) 168 176 169 177 return nil 170 178 }
+525
cmd/butterfly/store/pebble.go
··· 1 + // Package store provides a pebble implementation of the Store interface 2 + package store 3 + 4 + import ( 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "sync" 9 + "time" 10 + 11 + "github.com/bluesky-social/indigo/cmd/butterfly/remote" 12 + "github.com/cockroachdb/pebble" 13 + ) 14 + 15 + // PebbleStore implements Store using CockroachDB's Pebble embedded key-value store 16 + type PebbleStore struct { 17 + db *pebble.DB 18 + path string 19 + 20 + // Batch processing 21 + batchMu sync.Mutex 22 + batch *pebble.Batch 23 + batchSize int 24 + maxBatch int 25 + flushTimer *time.Timer 26 + } 27 + 28 + // NewPebbleStore creates a new Pebble-backed store 29 + func NewPebbleStore(path string) *PebbleStore { 30 + return &PebbleStore{ 31 + path: path, 32 + maxBatch: 1000, // Default batch size 33 + } 34 + } 35 + 36 + // Setup initializes the Pebble database 37 + func (s *PebbleStore) Setup(ctx context.Context) error { 38 + // Configure Pebble 39 + opts := &pebble.Options{ 40 + // Set cache size to 64MB 41 + // Cache: pebble.NewCache(64 << 20), 42 + // // Enable compression 43 + // Levels: []pebble.LevelOptions{ 44 + // {Compression: pebble.SnappyCompression}, 45 + // }, 46 + // // Set write buffer size 47 + // MemTableSize: 64 << 20, 48 + // // Configure compaction 49 + // L0CompactionThreshold: 2, 50 + // L0StopWritesThreshold: 12, 51 + } 52 + // defer opts.Cache.Unref() 53 + 54 + // Open the database 55 + db, err := pebble.Open(s.path, opts) 56 + if err != nil { 57 + return fmt.Errorf("failed to open pebble database: %w", err) 58 + } 59 + 60 + s.db = db 61 + 62 + // Initialize batch 63 + s.batch = s.db.NewBatch() 64 + 65 + return nil 66 + } 67 + 68 + // Close flushes pending writes and closes the database 69 + func (s *PebbleStore) Close() error { 70 + fmt.Println() 71 + // Flush any pending batch 72 + if err := s.flushBatch(); err != nil { 73 + fmt.Printf("Warning: failed to flush batch on close: %v\n", err) 74 + } 75 + 76 + // Cancel flush timer if active 77 + if s.flushTimer != nil { 78 + s.flushTimer.Stop() 79 + } 80 + 81 + // Close the database 82 + if s.db != nil { 83 + return s.db.Close() 84 + } 85 + 86 + return nil 87 + } 88 + 89 + // BackfillRepo resets a repo and re-ingests it from a remote stream 90 + func (s *PebbleStore) BackfillRepo(ctx context.Context, did string, stream *remote.RemoteStream) error { 91 + // Delete all existing data for this repo 92 + if err := s.deleteRepo(did); err != nil { 93 + return fmt.Errorf("failed to delete existing repo data: %w", err) 94 + } 95 + 96 + // Process the stream 97 + return s.ActiveSync(ctx, stream) 98 + } 99 + 100 + // ActiveSync processes live update events from a remote stream 101 + func (s *PebbleStore) ActiveSync(ctx context.Context, stream *remote.RemoteStream) error { 102 + for event := range stream.Ch { 103 + select { 104 + case <-ctx.Done(): 105 + // Flush pending batch before returning 106 + if err := s.flushBatch(); err != nil { 107 + return fmt.Errorf("failed to flush batch: %w", err) 108 + } 109 + return ctx.Err() 110 + default: 111 + } 112 + 113 + if err := s.processEvent(event); err != nil { 114 + // Log error but continue processing 115 + fmt.Printf("Error processing event for %s: %v\n", event.Did, err) 116 + } 117 + 118 + // Auto-flush batch if it gets too large 119 + s.batchMu.Lock() 120 + if s.batchSize >= s.maxBatch { 121 + if err := s.flushBatchLocked(); err != nil { 122 + s.batchMu.Unlock() 123 + return fmt.Errorf("failed to flush batch: %w", err) 124 + } 125 + } 126 + s.batchMu.Unlock() 127 + } 128 + 129 + // Final flush 130 + return s.flushBatch() 131 + } 132 + 133 + // processEvent handles a single stream event 134 + func (s *PebbleStore) processEvent(event remote.StreamEvent) error { 135 + switch event.Kind { 136 + case remote.EventKindCommit: 137 + return s.processCommit(event.Did, event.Commit) 138 + case remote.EventKindError: 139 + return s.processError(event.Did, event.Error) 140 + default: 141 + return fmt.Errorf("unknown event kind: %s", event.Kind) 142 + } 143 + } 144 + 145 + // processCommit handles commit events 146 + func (s *PebbleStore) processCommit(did string, commit *remote.StreamEventCommit) error { 147 + if commit == nil { 148 + return fmt.Errorf("nil commit event") 149 + } 150 + 151 + // Build the key for this record 152 + key := s.buildRecordKey(did, commit.Collection, commit.Rkey) 153 + 154 + s.batchMu.Lock() 155 + defer s.batchMu.Unlock() 156 + 157 + switch commit.Operation { 158 + case remote.OpCreate, remote.OpUpdate: 159 + // Serialize the record 160 + value, err := json.Marshal(commit.Record) 161 + if err != nil { 162 + return fmt.Errorf("failed to marshal record: %w", err) 163 + } 164 + 165 + // Add to batch 166 + if err := s.batch.Set([]byte(key), value, pebble.Sync); err != nil { 167 + return fmt.Errorf("failed to set record: %w", err) 168 + } 169 + 170 + case remote.OpDelete: 171 + // Delete from batch 172 + if err := s.batch.Delete([]byte(key), pebble.Sync); err != nil { 173 + return fmt.Errorf("failed to delete record: %w", err) 174 + } 175 + 176 + default: 177 + return fmt.Errorf("unknown operation: %s", commit.Operation) 178 + } 179 + 180 + // Store commit metadata 181 + commitKey := s.buildCommitKey(did, commit.Rev) 182 + commitData, err := json.Marshal(commit) 183 + if err != nil { 184 + return fmt.Errorf("failed to marshal commit: %w", err) 185 + } 186 + if err := s.batch.Set([]byte(commitKey), commitData, pebble.Sync); err != nil { 187 + return fmt.Errorf("failed to store commit: %w", err) 188 + } 189 + 190 + s.batchSize++ 191 + 192 + // Schedule a flush if timer not already running 193 + if s.flushTimer == nil { 194 + s.flushTimer = time.AfterFunc(5*time.Second, func() { 195 + s.flushBatch() 196 + }) 197 + } 198 + 199 + return nil 200 + } 201 + 202 + // processError handles error events 203 + func (s *PebbleStore) processError(did string, streamErr *remote.StreamEventError) error { 204 + if streamErr == nil { 205 + return fmt.Errorf("nil error event") 206 + } 207 + 208 + // Log the error 209 + fmt.Printf("Stream error for %s: %v (fatal=%v)\n", did, streamErr.Err, streamErr.Fatal) 210 + 211 + if streamErr.Fatal { 212 + return streamErr.Err 213 + } 214 + 215 + return nil 216 + } 217 + 218 + // KV Storage Methods 219 + 220 + // KvGet retrieves a value from the KV namespace 221 + func (s *PebbleStore) KvGet(namespace string, key string) (string, error) { 222 + fullKey := s.buildKvKey(namespace, key) 223 + value, closer, err := s.db.Get([]byte(fullKey)) 224 + if err != nil { 225 + if err == pebble.ErrNotFound { 226 + return "", fmt.Errorf("key %q not found in namespace %q", key, namespace) 227 + } 228 + return "", fmt.Errorf("failed to get key: %w", err) 229 + } 230 + defer closer.Close() 231 + 232 + return string(value), nil 233 + } 234 + 235 + // KvPut stores a value in the KV namespace 236 + func (s *PebbleStore) KvPut(namespace string, key string, value string) error { 237 + fullKey := s.buildKvKey(namespace, key) 238 + return s.db.Set([]byte(fullKey), []byte(value), pebble.Sync) 239 + } 240 + 241 + // KvDel deletes a value from the KV namespace 242 + func (s *PebbleStore) KvDel(namespace string, key string) error { 243 + fullKey := s.buildKvKey(namespace, key) 244 + err := s.db.Delete([]byte(fullKey), pebble.Sync) 245 + if err != nil && err != pebble.ErrNotFound { 246 + return fmt.Errorf("failed to delete key: %w", err) 247 + } 248 + return nil 249 + } 250 + 251 + // Helper methods 252 + 253 + // buildRecordKey constructs a key for a record 254 + func (s *PebbleStore) buildRecordKey(did, collection, rkey string) string { 255 + return fmt.Sprintf("record/%s/%s/%s", did, collection, rkey) 256 + } 257 + 258 + // buildCommitKey constructs a key for a commit 259 + func (s *PebbleStore) buildCommitKey(did, rev string) string { 260 + return fmt.Sprintf("commit/%s/%s", did, rev) 261 + } 262 + 263 + // buildKvKey constructs a key for KV storage 264 + func (s *PebbleStore) buildKvKey(namespace, key string) string { 265 + return fmt.Sprintf("kv/%s/%s", namespace, key) 266 + } 267 + 268 + // deleteRepo removes all data for a repository 269 + func (s *PebbleStore) deleteRepo(did string) error { 270 + // Delete all records with prefix 271 + prefix := fmt.Sprintf("record/%s/", did) 272 + if err := s.deleteByPrefix(prefix); err != nil { 273 + return err 274 + } 275 + 276 + // Delete all commits 277 + prefix = fmt.Sprintf("commit/%s/", did) 278 + if err := s.deleteByPrefix(prefix); err != nil { 279 + return err 280 + } 281 + 282 + return nil 283 + } 284 + 285 + // deleteByPrefix deletes all keys with the given prefix 286 + func (s *PebbleStore) deleteByPrefix(prefix string) error { 287 + iter, err := s.db.NewIter(&pebble.IterOptions{ 288 + LowerBound: []byte(prefix), 289 + UpperBound: []byte(prefix + "\xff"), 290 + }) 291 + if err != nil { 292 + return fmt.Errorf("failed to create iterator: %w", err) 293 + } 294 + defer iter.Close() 295 + 296 + batch := s.db.NewBatch() 297 + defer batch.Close() 298 + 299 + count := 0 300 + for iter.First(); iter.Valid(); iter.Next() { 301 + if err := batch.Delete(iter.Key(), pebble.Sync); err != nil { 302 + return err 303 + } 304 + count++ 305 + 306 + // Commit batch periodically 307 + if count%1000 == 0 { 308 + if err := batch.Commit(pebble.Sync); err != nil { 309 + return err 310 + } 311 + batch = s.db.NewBatch() 312 + } 313 + } 314 + 315 + // Commit remaining 316 + if count%1000 != 0 { 317 + if err := batch.Commit(pebble.Sync); err != nil { 318 + return err 319 + } 320 + } 321 + 322 + return iter.Error() 323 + } 324 + 325 + // flushBatch commits the current batch 326 + func (s *PebbleStore) flushBatch() error { 327 + s.batchMu.Lock() 328 + defer s.batchMu.Unlock() 329 + return s.flushBatchLocked() 330 + } 331 + 332 + // flushBatchLocked commits the current batch (must be called with batchMu locked) 333 + func (s *PebbleStore) flushBatchLocked() error { 334 + if s.batch == nil || s.batchSize == 0 { 335 + return nil 336 + } 337 + 338 + if err := s.batch.Commit(pebble.Sync); err != nil { 339 + return err 340 + } 341 + 342 + // Reset batch 343 + s.batch = s.db.NewBatch() 344 + s.batchSize = 0 345 + 346 + // Cancel timer if running 347 + if s.flushTimer != nil { 348 + s.flushTimer.Stop() 349 + s.flushTimer = nil 350 + } 351 + 352 + return nil 353 + } 354 + 355 + // GetRecord retrieves a single record 356 + func (s *PebbleStore) GetRecord(ctx context.Context, did, collection, rkey string) (map[string]any, error) { 357 + // Build the key for this record 358 + key := s.buildRecordKey(did, collection, rkey) 359 + 360 + // Get the record from the database 361 + value, closer, err := s.db.Get([]byte(key)) 362 + if err != nil { 363 + if err == pebble.ErrNotFound { 364 + return nil, fmt.Errorf("record not found: %s/%s/%s", did, collection, rkey) 365 + } 366 + return nil, fmt.Errorf("failed to get record: %w", err) 367 + } 368 + defer closer.Close() 369 + 370 + // Unmarshal the record 371 + var record map[string]any 372 + if err := json.Unmarshal(value, &record); err != nil { 373 + return nil, fmt.Errorf("failed to unmarshal record: %w", err) 374 + } 375 + 376 + return record, nil 377 + } 378 + 379 + // ListRecords retrieves records for a given DID and collection 380 + func (s *PebbleStore) ListRecords(ctx context.Context, did, collection string, limit int) ([]map[string]any, error) { 381 + // Build the prefix for this collection 382 + prefix := fmt.Sprintf("record/%s/%s/", did, collection) 383 + 384 + // Create iterator with the prefix bounds 385 + iter, err := s.db.NewIter(&pebble.IterOptions{ 386 + LowerBound: []byte(prefix), 387 + UpperBound: []byte(prefix + "\xff"), 388 + }) 389 + if err != nil { 390 + return nil, fmt.Errorf("failed to create iterator: %w", err) 391 + } 392 + defer iter.Close() 393 + 394 + // Collect records 395 + var records []map[string]any 396 + count := 0 397 + 398 + for iter.First(); iter.Valid(); iter.Next() { 399 + // Check context cancellation 400 + select { 401 + case <-ctx.Done(): 402 + return nil, ctx.Err() 403 + default: 404 + } 405 + 406 + // Check limit 407 + if limit > 0 && count >= limit { 408 + break 409 + } 410 + 411 + // Get the value 412 + value := iter.Value() 413 + 414 + // Unmarshal the record 415 + var record map[string]any 416 + if err := json.Unmarshal(value, &record); err != nil { 417 + // Log error but continue to next record 418 + fmt.Printf("Warning: failed to unmarshal record at key %s: %v\n", string(iter.Key()), err) 419 + continue 420 + } 421 + 422 + // Add metadata about the record key 423 + // Extract the rkey from the full key (record/did/collection/rkey) 424 + keyStr := string(iter.Key()) 425 + if len(keyStr) > len(prefix) { 426 + rkey := keyStr[len(prefix):] 427 + record["_rkey"] = rkey 428 + } 429 + 430 + records = append(records, record) 431 + count++ 432 + } 433 + 434 + // Check for iterator errors 435 + if err := iter.Error(); err != nil { 436 + return nil, fmt.Errorf("iterator error: %w", err) 437 + } 438 + 439 + return records, nil 440 + } 441 + 442 + // ListAllRecords retrieves all records for a given DID across all collections 443 + func (s *PebbleStore) ListAllRecords(ctx context.Context, did string, limit int) (map[string][]map[string]any, error) { 444 + // Build the prefix for all records of this DID 445 + prefix := fmt.Sprintf("record/%s/", did) 446 + 447 + // Create iterator with the prefix bounds 448 + iter, err := s.db.NewIter(&pebble.IterOptions{ 449 + LowerBound: []byte(prefix), 450 + UpperBound: []byte(prefix + "\xff"), 451 + }) 452 + if err != nil { 453 + return nil, fmt.Errorf("failed to create iterator: %w", err) 454 + } 455 + defer iter.Close() 456 + 457 + // Collect records organized by collection 458 + result := make(map[string][]map[string]any) 459 + count := 0 460 + 461 + for iter.First(); iter.Valid(); iter.Next() { 462 + // Check context cancellation 463 + select { 464 + case <-ctx.Done(): 465 + return nil, ctx.Err() 466 + default: 467 + } 468 + 469 + // Check limit 470 + if limit > 0 && count >= limit { 471 + break 472 + } 473 + 474 + // Parse the key to extract collection 475 + keyStr := string(iter.Key()) 476 + // Key format: record/did/collection/rkey 477 + parts := []byte(keyStr) 478 + 479 + // Find collection name 480 + collectionStart := len(prefix) 481 + collectionEnd := collectionStart 482 + for i := collectionStart; i < len(parts); i++ { 483 + if parts[i] == '/' { 484 + collectionEnd = i 485 + break 486 + } 487 + } 488 + 489 + if collectionEnd == collectionStart { 490 + continue // Invalid key format 491 + } 492 + 493 + collection := string(parts[collectionStart:collectionEnd]) 494 + rkey := string(parts[collectionEnd+1:]) 495 + 496 + // Get the value 497 + value := iter.Value() 498 + 499 + // Unmarshal the record 500 + var record map[string]any 501 + if err := json.Unmarshal(value, &record); err != nil { 502 + // Log error but continue to next record 503 + fmt.Printf("Warning: failed to unmarshal record at key %s: %v\n", keyStr, err) 504 + continue 505 + } 506 + 507 + // Add metadata 508 + record["_rkey"] = rkey 509 + record["_collection"] = collection 510 + 511 + // Add to result 512 + if _, exists := result[collection]; !exists { 513 + result[collection] = []map[string]any{} 514 + } 515 + result[collection] = append(result[collection], record) 516 + count++ 517 + } 518 + 519 + // Check for iterator errors 520 + if err := iter.Error(); err != nil { 521 + return nil, fmt.Errorf("iterator error: %w", err) 522 + } 523 + 524 + return result, nil 525 + }
+330
cmd/butterfly/store/pebble_test.go
··· 1 + package store 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "os" 7 + "path/filepath" 8 + "testing" 9 + "time" 10 + 11 + "github.com/bluesky-social/indigo/cmd/butterfly/remote" 12 + ) 13 + 14 + func TestPebbleStore(t *testing.T) { 15 + // Create temp directory for test database 16 + tmpDir, err := os.MkdirTemp("", "pebble-test-*") 17 + if err != nil { 18 + t.Fatal(err) 19 + } 20 + defer os.RemoveAll(tmpDir) 21 + 22 + dbPath := filepath.Join(tmpDir, "test.db") 23 + 24 + // Create store 25 + store := NewPebbleStore(dbPath) 26 + 27 + // Setup 28 + ctx := context.Background() 29 + if err := store.Setup(ctx); err != nil { 30 + t.Fatalf("Failed to setup store: %v", err) 31 + } 32 + defer store.Close() 33 + 34 + // Test KV operations 35 + t.Run("KV Operations", func(t *testing.T) { 36 + // Test Put 37 + if err := store.KvPut("test-namespace", "key1", "value1"); err != nil { 38 + t.Errorf("KvPut failed: %v", err) 39 + } 40 + 41 + // Test Get 42 + value, err := store.KvGet("test-namespace", "key1") 43 + if err != nil { 44 + t.Errorf("KvGet failed: %v", err) 45 + } 46 + if value != "value1" { 47 + t.Errorf("Expected value1, got %s", value) 48 + } 49 + 50 + // Test Get non-existent key 51 + _, err = store.KvGet("test-namespace", "nonexistent") 52 + if err == nil { 53 + t.Error("Expected error for non-existent key") 54 + } 55 + 56 + // Test Delete 57 + if err := store.KvDel("test-namespace", "key1"); err != nil { 58 + t.Errorf("KvDel failed: %v", err) 59 + } 60 + 61 + // Verify deletion 62 + _, err = store.KvGet("test-namespace", "key1") 63 + if err == nil { 64 + t.Error("Expected error after deletion") 65 + } 66 + }) 67 + 68 + // Test event processing 69 + t.Run("Event Processing", func(t *testing.T) { 70 + // Create a channel for events 71 + eventCh := make(chan remote.StreamEvent, 10) 72 + 73 + // Create a test stream 74 + stream := &remote.RemoteStream{ 75 + Ch: eventCh, 76 + } 77 + 78 + // Send test events 79 + go func() { 80 + // Send a commit event 81 + eventCh <- remote.StreamEvent{ 82 + Did: "did:plc:test123", 83 + Timestamp: time.Now(), 84 + Kind: remote.EventKindCommit, 85 + Commit: &remote.StreamEventCommit{ 86 + Rev: "rev1", 87 + Operation: remote.OpCreate, 88 + Collection: "app.bsky.feed.post", 89 + Rkey: "rkey1", 90 + Record: map[string]any{ 91 + "text": "Hello, world!", 92 + "createdAt": time.Now().Format(time.RFC3339), 93 + }, 94 + Cid: "cid1", 95 + }, 96 + } 97 + 98 + // Send a commit event 99 + eventCh <- remote.StreamEvent{ 100 + Did: "did:plc:test123", 101 + Timestamp: time.Now(), 102 + Kind: remote.EventKindCommit, 103 + Commit: &remote.StreamEventCommit{ 104 + Rev: "rev2", 105 + Operation: remote.OpCreate, 106 + Collection: "app.bsky.feed.post", 107 + Rkey: "rkey2", 108 + Record: map[string]any{ 109 + "text": "Hello, world!", 110 + "createdAt": time.Now().Format(time.RFC3339), 111 + }, 112 + Cid: "cid1", 113 + }, 114 + } 115 + 116 + // Send a commit event 117 + eventCh <- remote.StreamEvent{ 118 + Did: "did:plc:test123", 119 + Timestamp: time.Now(), 120 + Kind: remote.EventKindCommit, 121 + Commit: &remote.StreamEventCommit{ 122 + Rev: "rev3", 123 + Operation: remote.OpCreate, 124 + Collection: "app.bsky.feed.post", 125 + Rkey: "rkey3", 126 + Record: map[string]any{ 127 + "text": "Hello, world!", 128 + "createdAt": time.Now().Format(time.RFC3339), 129 + }, 130 + Cid: "cid1", 131 + }, 132 + } 133 + 134 + // Close the channel after a short delay 135 + time.Sleep(100 * time.Millisecond) 136 + close(eventCh) 137 + }() 138 + 139 + // Process the stream 140 + if err := store.ActiveSync(ctx, stream); err != nil { 141 + t.Errorf("ActiveSync failed: %v", err) 142 + } 143 + 144 + // Verify data was stored 145 + records, err := store.ListRecords(ctx, "did:plc:test123", "app.bsky.feed.post", 0) 146 + if err != nil { 147 + t.Errorf("ListRecords failed: %v", err) 148 + } 149 + if len(records) != 3 { 150 + t.Errorf("Expected 3 records, got %d", len(records)) 151 + } 152 + }) 153 + 154 + // Test batch processing 155 + t.Run("Batch Processing", func(t *testing.T) { 156 + // Create a channel for events 157 + eventCh := make(chan remote.StreamEvent, 100) 158 + 159 + // Create a test stream 160 + stream := &remote.RemoteStream{ 161 + Ch: eventCh, 162 + } 163 + 164 + // Send many events to test batching 165 + go func() { 166 + for i := 0; i < 50; i++ { 167 + eventCh <- remote.StreamEvent{ 168 + Did: "did:plc:batch", 169 + Timestamp: time.Now(), 170 + Kind: remote.EventKindCommit, 171 + Commit: &remote.StreamEventCommit{ 172 + Rev: "rev" + string(rune(i)), 173 + Operation: remote.OpCreate, 174 + Collection: "app.bsky.feed.post", 175 + Rkey: "rkey" + string(rune(i)), 176 + Record: map[string]any{ 177 + "text": "Post " + string(rune(i)), 178 + "createdAt": time.Now().Format(time.RFC3339), 179 + }, 180 + Cid: "cid" + string(rune(i)), 181 + }, 182 + } 183 + } 184 + close(eventCh) 185 + }() 186 + 187 + // Process the stream 188 + if err := store.ActiveSync(ctx, stream); err != nil { 189 + t.Errorf("Batch ActiveSync failed: %v", err) 190 + } 191 + 192 + // Verify data was stored 193 + records, err := store.ListRecords(ctx, "did:plc:batch", "app.bsky.feed.post", 0) 194 + if err != nil { 195 + t.Errorf("ListRecords failed: %v", err) 196 + } 197 + if len(records) != 50 { 198 + t.Errorf("Expected 50 records, got %d", len(records)) 199 + } 200 + }) 201 + 202 + // Test GetRecord and ListRecords 203 + t.Run("GetRecord and ListRecords", func(t *testing.T) { 204 + // Create test data 205 + testDid := "did:plc:getlist" 206 + testCollection := "app.bsky.feed.post" 207 + 208 + // Create a channel for events 209 + eventCh := make(chan remote.StreamEvent, 10) 210 + stream := &remote.RemoteStream{Ch: eventCh} 211 + 212 + // Send test events with different records 213 + go func() { 214 + for i := 1; i <= 5; i++ { 215 + eventCh <- remote.StreamEvent{ 216 + Did: testDid, 217 + Timestamp: time.Now(), 218 + Kind: remote.EventKindCommit, 219 + Commit: &remote.StreamEventCommit{ 220 + Rev: fmt.Sprintf("rev%d", i), 221 + Operation: remote.OpCreate, 222 + Collection: testCollection, 223 + Rkey: fmt.Sprintf("post%d", i), 224 + Record: map[string]any{ 225 + "text": fmt.Sprintf("Post number %d", i), 226 + "createdAt": time.Now().Format(time.RFC3339), 227 + "index": i, 228 + }, 229 + Cid: fmt.Sprintf("cid%d", i), 230 + }, 231 + } 232 + } 233 + 234 + // Add a record in a different collection 235 + eventCh <- remote.StreamEvent{ 236 + Did: testDid, 237 + Timestamp: time.Now(), 238 + Kind: remote.EventKindCommit, 239 + Commit: &remote.StreamEventCommit{ 240 + Rev: "rev-like", 241 + Operation: remote.OpCreate, 242 + Collection: "app.bsky.feed.like", 243 + Rkey: "like1", 244 + Record: map[string]any{ 245 + "subject": "at://did:plc:other/app.bsky.feed.post/123", 246 + "createdAt": time.Now().Format(time.RFC3339), 247 + }, 248 + Cid: "cid-like", 249 + }, 250 + } 251 + 252 + close(eventCh) 253 + }() 254 + 255 + // Process the stream 256 + if err := store.ActiveSync(ctx, stream); err != nil { 257 + t.Errorf("Failed to sync test data: %v", err) 258 + } 259 + 260 + // Test GetRecord 261 + record, err := store.GetRecord(ctx, testDid, testCollection, "post3") 262 + if err != nil { 263 + t.Errorf("GetRecord failed: %v", err) 264 + } 265 + if record == nil { 266 + t.Error("GetRecord returned nil record") 267 + } else { 268 + if text, ok := record["text"].(string); !ok || text != "Post number 3" { 269 + t.Errorf("Expected 'Post number 3', got %v", record["text"]) 270 + } 271 + if index, ok := record["index"].(float64); !ok || int(index) != 3 { 272 + t.Errorf("Expected index 3, got %v", record["index"]) 273 + } 274 + } 275 + 276 + // Test GetRecord with non-existent record 277 + _, err = store.GetRecord(ctx, testDid, testCollection, "nonexistent") 278 + if err == nil { 279 + t.Error("Expected error for non-existent record") 280 + } 281 + 282 + // Test ListRecords 283 + records, err := store.ListRecords(ctx, testDid, testCollection, 0) 284 + if err != nil { 285 + t.Errorf("ListRecords failed: %v", err) 286 + } 287 + if len(records) != 5 { 288 + t.Errorf("Expected 5 records, got %d", len(records)) 289 + } 290 + 291 + // Verify records have rkey metadata 292 + for _, rec := range records { 293 + if _, ok := rec["_rkey"].(string); !ok { 294 + t.Error("Record missing _rkey metadata") 295 + } 296 + } 297 + 298 + // Test ListRecords with limit 299 + limitedRecords, err := store.ListRecords(ctx, testDid, testCollection, 3) 300 + if err != nil { 301 + t.Errorf("ListRecords with limit failed: %v", err) 302 + } 303 + if len(limitedRecords) != 3 { 304 + t.Errorf("Expected 3 records with limit, got %d", len(limitedRecords)) 305 + } 306 + 307 + // Test ListAllRecords 308 + allRecords, err := store.ListAllRecords(ctx, testDid, 0) 309 + if err != nil { 310 + t.Errorf("ListAllRecords failed: %v", err) 311 + } 312 + if len(allRecords) != 2 { 313 + t.Errorf("Expected 2 collections, got %d", len(allRecords)) 314 + } 315 + if posts, ok := allRecords["app.bsky.feed.post"]; !ok || len(posts) != 5 { 316 + t.Errorf("Expected 5 posts, got %v", len(posts)) 317 + } 318 + if likes, ok := allRecords["app.bsky.feed.like"]; !ok || len(likes) != 1 { 319 + t.Errorf("Expected 1 like, got %v", len(likes)) 320 + } 321 + 322 + // Test context cancellation 323 + cancelCtx, cancel := context.WithCancel(ctx) 324 + cancel() // Cancel immediately 325 + _, err = store.ListRecords(cancelCtx, testDid, testCollection, 0) 326 + if err == nil || err != context.Canceled { 327 + t.Errorf("Expected context.Canceled error, got %v", err) 328 + } 329 + }) 330 + }
+1
cmd/butterfly/store/store.go
··· 41 41 StoreTypeDuckDB StoreType = "duckdb" 42 42 StoreTypeClickHouse StoreType = "clickhouse" 43 43 StoreTypeTarFiles StoreType = "tarfiles" 44 + StoreTypePebble StoreType = "pebble" 44 45 )