this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add backfill interface to stores

Paul Frazee 5b10f824 43a42cfd

+544 -39
+20
cmd/butterfly/README.md
··· 2 2 3 3 A sync engine for atproto with an optional baked-in database. 4 4 5 + ## WIP notes 6 + 7 + - Until the Store interface has stabilized, let's only work on stdout and duckdb to keep things simple. 8 + 9 + ## TODOs 10 + 11 + v1 12 + 13 + - Create first working implementations of all Remote interfaces so that we build familiarity with their semantics and idiosyncracies 14 + - Implement repo discovery interfaces on Remote 15 + - Implement bidi identity resolution and caching 16 + - Implement a work-scheduler which abstracts Remote, Identity, and Store to backfill & sync using Selectors 17 + - Implement Store querying interfaces; develop indexing strategies 18 + - Create v1 CLI and APIs 19 + 20 + future 21 + 22 + - A local data read/write model, perhaps modeled as virtual local users 23 + - Prometheus endpoints 24 + 5 25 ## Example selectors 6 26 7 27 My data:
+1 -1
cmd/butterfly/main.go
··· 71 71 defer stream.Close() 72 72 73 73 // Process the stream 74 - if err := s.Receive(ctx, stream); err != nil { 74 + if err := s.BackfillRepo(ctx, *did, stream); err != nil { 75 75 logger.Fatalf("failed to process stream: %v", err) 76 76 } 77 77 }
+1 -1
cmd/butterfly/remote/carfile.go
··· 136 136 137 137 // SubscribeRecords is not supported for CAR files 138 138 func (c *CarfileRemote) SubscribeRecords(ctx context.Context, params SubscribeRecordsParams) (*RemoteStream, error) { 139 - return nil, fmt.Errorf("subscribe records: %w", ErrNotImplemented) 139 + return nil, fmt.Errorf("subscribe records: %w", ErrNotSupported) 140 140 } 141 141 142 142 // readCar reads and validates a CAR file
+7 -3
cmd/butterfly/remote/jetstream.go
··· 3 3 */ 4 4 package remote 5 5 6 - import "fmt" 6 + import ( 7 + "fmt" 8 + ) 7 9 8 10 type JetstreamRemote struct { 9 11 service string 10 12 } 11 13 14 + // ListRepos not supported by jetstream 12 15 func (self JetstreamRemote) ListRepos(params ListReposParams) (*ListReposResult, error) { 13 - return nil, fmt.Errorf("Not yet implemented") 16 + return nil, fmt.Errorf("list repos: %w", ErrNotSupported) 14 17 } 15 18 19 + // FetchRepo not supported by jetstream 16 20 func (self JetstreamRemote) FetchRepo(params FetchRepoParams) (*RemoteStream, error) { 17 - return nil, fmt.Errorf("Not yet implemented") 21 + return nil, fmt.Errorf("fetch repo: %w", ErrNotSupported) 18 22 } 19 23 20 24 func (self JetstreamRemote) SubscribeRecords(params SubscribeRecordsParams) (*RemoteStream, error) {
+3 -2
cmd/butterfly/remote/remote.go
··· 123 123 124 124 // StreamEventError represents an error event in the stream 125 125 type StreamEventError struct { 126 - Err error 127 - Fatal bool // Whether this error terminates the stream 126 + Err error 127 + Fatal bool // Whether this error terminates the stream 128 128 RetryAfter *time.Duration // Suggested retry delay 129 129 } 130 130 131 131 // Common errors 132 132 var ( 133 133 ErrRemoteUnavailable = errors.New("remote service unavailable") 134 + ErrNotSupported = errors.New("operation not supported by this remote") 134 135 ErrNotImplemented = errors.New("operation not implemented by this remote") 135 136 ErrInvalidDID = errors.New("invalid DID format") 136 137 ErrRepoNotFound = errors.New("repository not found")
+2
cmd/butterfly/store/clickhouse.go
··· 3 3 */ 4 4 5 5 package store 6 + 7 + // NOTE: do not implement this until the Store interface is fully mature
+130 -3
cmd/butterfly/store/duckdb.go
··· 11 11 "sync" 12 12 "time" 13 13 14 - _ "github.com/marcboeker/go-duckdb" 14 + "github.com/marcboeker/go-duckdb" 15 15 16 16 "github.com/bluesky-social/indigo/cmd/butterfly/remote" 17 17 ) ··· 147 147 return nil 148 148 } 149 149 150 - // Receive processes events from the stream 151 - func (d *DuckdbStore) Receive(ctx context.Context, stream *remote.RemoteStream) error { 150 + // BackfillRepo resets a repo and re-ingests it from a remote stream 151 + // The implementation should handle context cancellation appropriately 152 + func (d *DuckdbStore) BackfillRepo(ctx context.Context, did string, stream *remote.RemoteStream) error { 153 + d.mu.Lock() 154 + defer d.mu.Unlock() 155 + 156 + // First, delete all existing records for this DID 157 + _, err := d.db.ExecContext(ctx, "DELETE FROM records WHERE did = ?", did) 158 + if err != nil { 159 + return fmt.Errorf("failed to delete existing records for DID %s: %w", did, err) 160 + } 161 + 162 + // Get a connection from the existing database 163 + conn, err := d.db.Conn(ctx) 164 + if err != nil { 165 + return fmt.Errorf("failed to get connection: %w", err) 166 + } 167 + defer conn.Close() 168 + 169 + // Create appender for records table 170 + var appender *duckdb.Appender 171 + err = conn.Raw(func(driverConn interface{}) error { 172 + duckdbConn, ok := driverConn.(*duckdb.Conn) 173 + if !ok { 174 + return fmt.Errorf("failed to cast to duckdb connection") 175 + } 176 + appender, err = duckdb.NewAppenderFromConn(duckdbConn, "", "records") 177 + return err 178 + }) 179 + if err != nil { 180 + return fmt.Errorf("failed to create appender: %w", err) 181 + } 182 + defer appender.Close() 183 + 184 + batchSize := 0 185 + const maxBatchSize = 1000 186 + 187 + for event := range stream.Ch { 188 + select { 189 + case <-ctx.Done(): 190 + // Flush any pending data before returning 191 + if batchSize > 0 { 192 + if flushErr := appender.Flush(); flushErr != nil { 193 + return fmt.Errorf("failed to flush appender on context cancel: %w", flushErr) 194 + } 195 + } 196 + return ctx.Err() 197 + default: 198 + } 199 + 200 + // Only process commit events for the specified DID 201 + if event.Kind != remote.EventKindCommit || event.Commit == nil || event.Did != did { 202 + continue 203 + } 204 + 205 + if err := d.processCommitWithAppender(ctx, event.Did, event.Commit, appender); err != nil { 206 + // Log error but continue processing 207 + fmt.Fprintf(os.Stderr, "duckdb: error processing commit for %s during backfill: %v\n", event.Did, err) 208 + continue 209 + } 210 + 211 + batchSize++ 212 + // Flush appender periodically for better performance 213 + if batchSize >= maxBatchSize { 214 + if err := appender.Flush(); err != nil { 215 + return fmt.Errorf("failed to flush appender: %w", err) 216 + } 217 + batchSize = 0 218 + } 219 + } 220 + 221 + // Close appender to flush any remaining data 222 + if err := appender.Close(); err != nil { 223 + return fmt.Errorf("failed to close appender: %w", err) 224 + } 225 + 226 + return nil 227 + } 228 + 229 + // ActiveSync processes live update events from a remote stream 230 + func (d *DuckdbStore) ActiveSync(ctx context.Context, stream *remote.RemoteStream) error { 152 231 // Start a transaction for better performance with batch operations 153 232 tx, err := d.db.BeginTx(ctx, nil) 154 233 if err != nil { ··· 233 312 now, did, commit.Collection, commit.Rkey) 234 313 if err != nil { 235 314 return fmt.Errorf("failed to delete record: %w", err) 315 + } 316 + 317 + default: 318 + return fmt.Errorf("unknown operation: %s", commit.Operation) 319 + } 320 + 321 + return nil 322 + } 323 + 324 + // processCommitWithAppender handles a single commit event using the appender API 325 + func (d *DuckdbStore) processCommitWithAppender(ctx context.Context, did string, commit *remote.StreamEventCommit, appender *duckdb.Appender) error { 326 + now := time.Now() 327 + 328 + switch commit.Operation { 329 + case remote.OpCreate, remote.OpUpdate: 330 + // For updates in a backfill, we don't need to delete first since we already cleared all records 331 + // Just append the new record 332 + err := appender.AppendRow( 333 + did, // did 334 + commit.Collection, // collection 335 + commit.Rkey, // rkey 336 + commit.Cid, // cid 337 + commit.Rev, // rev 338 + commit.Record, // record 339 + false, // deleted 340 + now, // created_at 341 + now, // updated_at 342 + ) 343 + if err != nil { 344 + return fmt.Errorf("failed to append record: %w", err) 345 + } 346 + 347 + case remote.OpDelete: 348 + // For deletes during backfill, we append a deleted record 349 + // (Note- this really shouldnt happen) 350 + err := appender.AppendRow( 351 + did, // did 352 + commit.Collection, // collection 353 + commit.Rkey, // rkey 354 + "", // cid (empty for deleted) 355 + "", // rev (empty for deleted) 356 + nil, // record (empty JSON for deleted) 357 + true, // deleted 358 + now, // created_at 359 + now, // updated_at 360 + ) 361 + if err != nil { 362 + return fmt.Errorf("failed to append deleted record: %w", err) 236 363 } 237 364 238 365 default:
+341 -7
cmd/butterfly/store/duckdb_test.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "fmt" 5 6 "path/filepath" 6 7 "testing" 7 8 "time" ··· 121 122 }() 122 123 123 124 // Process the stream 124 - err = store.Receive(ctx, stream) 125 + err = store.ActiveSync(ctx, stream) 125 126 require.NoError(t, err) 126 127 127 128 // Verify the post was updated ··· 188 189 }() 189 190 190 191 // Process the stream 191 - err = store.Receive(ctx, stream) 192 + err = store.ActiveSync(ctx, stream) 192 193 require.NoError(t, err) 193 194 194 195 // List all posts ··· 249 250 }() 250 251 251 252 // Process the stream 252 - err = store.Receive(ctx, stream) 253 + err = store.ActiveSync(ctx, stream) 253 254 require.NoError(t, err) 254 255 255 256 // Verify stats ··· 307 308 }() 308 309 309 310 // Process should handle transaction batching automatically 310 - err = store.Receive(ctx, stream) 311 + err = store.ActiveSync(ctx, stream) 311 312 require.NoError(t, err) 312 313 313 314 // Verify all records were saved ··· 357 358 }() 358 359 359 360 // Process should stop when context is cancelled 360 - err = store.Receive(ctx, stream) 361 + err = store.ActiveSync(ctx, stream) 361 362 assert.ErrorIs(t, err, context.Canceled) 362 363 } 363 364 ··· 426 427 }() 427 428 428 429 // Should process without error, skipping invalid events 429 - err = store.Receive(ctx, stream) 430 + err = store.ActiveSync(ctx, stream) 430 431 require.NoError(t, err) 431 432 432 433 // Verify only valid events were processed ··· 442 443 post2, err := store.GetRecord(ctx, "did:plc:testuser", "app.bsky.feed.post", "valid2") 443 444 require.NoError(t, err) 444 445 assert.NotNil(t, post2) 445 - } 446 + } 447 + 448 + func TestDuckdbStore_BackfillRepo(t *testing.T) { 449 + tmpDir := t.TempDir() 450 + dbPath := filepath.Join(tmpDir, "test.db") 451 + store := NewDuckdbStore(dbPath) 452 + 453 + ctx := context.Background() 454 + err := store.Setup(ctx) 455 + require.NoError(t, err) 456 + defer store.Close() 457 + 458 + testDID := "did:plc:backfilltest" 459 + 460 + // First, create some initial records using ActiveSync 461 + stream1 := &remote.RemoteStream{ 462 + Ch: make(chan remote.StreamEvent, 10), 463 + } 464 + 465 + go func() { 466 + defer close(stream1.Ch) 467 + 468 + // Create initial posts 469 + for i := 0; i < 3; i++ { 470 + stream1.Ch <- remote.StreamEvent{ 471 + Did: testDID, 472 + Timestamp: time.Now(), 473 + Kind: remote.EventKindCommit, 474 + Commit: &remote.StreamEventCommit{ 475 + Rev: "rev" + string(rune(i)), 476 + Operation: remote.OpCreate, 477 + Collection: "app.bsky.feed.post", 478 + Rkey: "post" + string(rune(i+1)), 479 + Record: map[string]any{ 480 + "text": "Initial post " + string(rune(i)), 481 + "createdAt": time.Now().Format(time.RFC3339), 482 + }, 483 + Cid: "cid" + string(rune(i)), 484 + }, 485 + } 486 + } 487 + }() 488 + 489 + err = store.ActiveSync(ctx, stream1) 490 + require.NoError(t, err) 491 + 492 + // Verify initial records 493 + stats, err := store.GetStats(ctx) 494 + require.NoError(t, err) 495 + assert.Equal(t, int64(3), stats["total_records"]) 496 + 497 + posts, err := store.ListRecords(ctx, testDID, "app.bsky.feed.post", 0) 498 + require.NoError(t, err) 499 + assert.Len(t, posts, 3) 500 + 501 + // Now backfill with new data 502 + stream2 := &remote.RemoteStream{ 503 + Ch: make(chan remote.StreamEvent, 10), 504 + } 505 + 506 + go func() { 507 + defer close(stream2.Ch) 508 + 509 + // Send different posts for backfill 510 + for i := 0; i < 5; i++ { 511 + stream2.Ch <- remote.StreamEvent{ 512 + Did: testDID, 513 + Timestamp: time.Now(), 514 + Kind: remote.EventKindCommit, 515 + Commit: &remote.StreamEventCommit{ 516 + Rev: "backfillrev" + string(rune(i)), 517 + Operation: remote.OpCreate, 518 + Collection: "app.bsky.feed.post", 519 + Rkey: "backfillpost" + string(rune(i+1)), 520 + Record: map[string]any{ 521 + "text": "Backfilled post " + string(rune(i)), 522 + "createdAt": time.Now().Format(time.RFC3339), 523 + }, 524 + Cid: "backfillcid" + string(rune(i)), 525 + }, 526 + } 527 + } 528 + }() 529 + 530 + // Backfill should replace all existing records for this DID 531 + err = store.BackfillRepo(ctx, testDID, stream2) 532 + require.NoError(t, err) 533 + 534 + // Verify old records are gone and new ones are present 535 + stats, err = store.GetStats(ctx) 536 + require.NoError(t, err) 537 + assert.Equal(t, int64(5), stats["total_records"]) 538 + 539 + posts, err = store.ListRecords(ctx, testDID, "app.bsky.feed.post", 0) 540 + require.NoError(t, err) 541 + assert.Len(t, posts, 5) 542 + 543 + // Verify old posts are gone 544 + for i := 0; i < 3; i++ { 545 + post, err := store.GetRecord(ctx, testDID, "app.bsky.feed.post", "post"+string(rune(i+1))) 546 + require.NoError(t, err) 547 + assert.Nil(t, post) 548 + } 549 + 550 + // Verify new posts exist 551 + for i := 0; i < 5; i++ { 552 + post, err := store.GetRecord(ctx, testDID, "app.bsky.feed.post", "backfillpost"+string(rune(i+1))) 553 + fmt.Printf("%d %v\n", i, post) 554 + require.NoError(t, err) 555 + assert.NotNil(t, post) 556 + assert.Contains(t, post["text"], "Backfilled post") 557 + } 558 + } 559 + 560 + func TestDuckdbStore_BackfillRepo_MultipleCollections(t *testing.T) { 561 + tmpDir := t.TempDir() 562 + dbPath := filepath.Join(tmpDir, "test.db") 563 + store := NewDuckdbStore(dbPath) 564 + 565 + ctx := context.Background() 566 + err := store.Setup(ctx) 567 + require.NoError(t, err) 568 + defer store.Close() 569 + 570 + testDID := "did:plc:multicollection" 571 + 572 + // Create initial records across multiple collections 573 + stream1 := &remote.RemoteStream{ 574 + Ch: make(chan remote.StreamEvent, 10), 575 + } 576 + 577 + go func() { 578 + defer close(stream1.Ch) 579 + 580 + stream1.Ch <- remote.StreamEvent{ 581 + Did: testDID, 582 + Timestamp: time.Now(), 583 + Kind: remote.EventKindCommit, 584 + Commit: &remote.StreamEventCommit{ 585 + Operation: remote.OpCreate, 586 + Collection: "app.bsky.feed.post", 587 + Rkey: "oldpost", 588 + Record: map[string]any{"text": "Old post"}, 589 + }, 590 + } 591 + 592 + stream1.Ch <- remote.StreamEvent{ 593 + Did: testDID, 594 + Timestamp: time.Now(), 595 + Kind: remote.EventKindCommit, 596 + Commit: &remote.StreamEventCommit{ 597 + Operation: remote.OpCreate, 598 + Collection: "app.bsky.graph.follow", 599 + Rkey: "oldfollow", 600 + Record: map[string]any{"subject": "did:plc:olduser"}, 601 + }, 602 + } 603 + 604 + stream1.Ch <- remote.StreamEvent{ 605 + Did: testDID, 606 + Timestamp: time.Now(), 607 + Kind: remote.EventKindCommit, 608 + Commit: &remote.StreamEventCommit{ 609 + Operation: remote.OpCreate, 610 + Collection: "app.bsky.actor.profile", 611 + Rkey: "self", 612 + Record: map[string]any{"displayName": "Old Name"}, 613 + }, 614 + } 615 + }() 616 + 617 + err = store.ActiveSync(ctx, stream1) 618 + require.NoError(t, err) 619 + 620 + // Backfill with new data 621 + stream2 := &remote.RemoteStream{ 622 + Ch: make(chan remote.StreamEvent, 10), 623 + } 624 + 625 + go func() { 626 + defer close(stream2.Ch) 627 + 628 + stream2.Ch <- remote.StreamEvent{ 629 + Did: testDID, 630 + Timestamp: time.Now(), 631 + Kind: remote.EventKindCommit, 632 + Commit: &remote.StreamEventCommit{ 633 + Operation: remote.OpCreate, 634 + Collection: "app.bsky.feed.post", 635 + Rkey: "newpost1", 636 + Record: map[string]any{"text": "New post 1"}, 637 + }, 638 + } 639 + 640 + stream2.Ch <- remote.StreamEvent{ 641 + Did: testDID, 642 + Timestamp: time.Now(), 643 + Kind: remote.EventKindCommit, 644 + Commit: &remote.StreamEventCommit{ 645 + Operation: remote.OpCreate, 646 + Collection: "app.bsky.feed.post", 647 + Rkey: "newpost2", 648 + Record: map[string]any{"text": "New post 2"}, 649 + }, 650 + } 651 + 652 + stream2.Ch <- remote.StreamEvent{ 653 + Did: testDID, 654 + Timestamp: time.Now(), 655 + Kind: remote.EventKindCommit, 656 + Commit: &remote.StreamEventCommit{ 657 + Operation: remote.OpCreate, 658 + Collection: "app.bsky.actor.profile", 659 + Rkey: "self", 660 + Record: map[string]any{"displayName": "New Name"}, 661 + }, 662 + } 663 + }() 664 + 665 + err = store.BackfillRepo(ctx, testDID, stream2) 666 + require.NoError(t, err) 667 + 668 + // Verify backfill replaced ALL collections for this DID 669 + posts, err := store.ListRecords(ctx, testDID, "app.bsky.feed.post", 0) 670 + require.NoError(t, err) 671 + assert.Len(t, posts, 2) 672 + 673 + // Old follow should be gone 674 + follow, err := store.GetRecord(ctx, testDID, "app.bsky.graph.follow", "oldfollow") 675 + require.NoError(t, err) 676 + assert.Nil(t, follow) 677 + 678 + // Profile should be updated 679 + profile, err := store.GetRecord(ctx, testDID, "app.bsky.actor.profile", "self") 680 + require.NoError(t, err) 681 + assert.NotNil(t, profile) 682 + assert.Equal(t, "New Name", profile["displayName"]) 683 + } 684 + 685 + func TestDuckdbStore_BackfillRepo_ContextCancellation(t *testing.T) { 686 + tmpDir := t.TempDir() 687 + dbPath := filepath.Join(tmpDir, "test.db") 688 + store := NewDuckdbStore(dbPath) 689 + 690 + ctx, cancel := context.WithCancel(context.Background()) 691 + err := store.Setup(ctx) 692 + require.NoError(t, err) 693 + defer store.Close() 694 + 695 + testDID := "did:plc:canceltest" 696 + 697 + // Create a stream that sends many events 698 + stream := &remote.RemoteStream{ 699 + Ch: make(chan remote.StreamEvent, 100), 700 + } 701 + 702 + go func() { 703 + defer close(stream.Ch) 704 + for i := 0; i < 100; i++ { 705 + stream.Ch <- remote.StreamEvent{ 706 + Did: testDID, 707 + Timestamp: time.Now(), 708 + Kind: remote.EventKindCommit, 709 + Commit: &remote.StreamEventCommit{ 710 + Operation: remote.OpCreate, 711 + Collection: "app.bsky.feed.post", 712 + Rkey: "post" + string(rune(i)), 713 + Record: map[string]any{"text": "Test post"}, 714 + }, 715 + } 716 + time.Sleep(10 * time.Millisecond) 717 + } 718 + }() 719 + 720 + // Cancel context after a short time 721 + go func() { 722 + time.Sleep(50 * time.Millisecond) 723 + cancel() 724 + }() 725 + 726 + // Backfill should stop when context is cancelled 727 + err = store.BackfillRepo(ctx, testDID, stream) 728 + assert.ErrorIs(t, err, context.Canceled) 729 + } 730 + 731 + func TestDuckdbStore_BackfillRepo_EmptyStream(t *testing.T) { 732 + tmpDir := t.TempDir() 733 + dbPath := filepath.Join(tmpDir, "test.db") 734 + store := NewDuckdbStore(dbPath) 735 + 736 + ctx := context.Background() 737 + err := store.Setup(ctx) 738 + require.NoError(t, err) 739 + defer store.Close() 740 + 741 + testDID := "did:plc:emptytest" 742 + 743 + // Create some initial records 744 + stream1 := &remote.RemoteStream{ 745 + Ch: make(chan remote.StreamEvent, 2), 746 + } 747 + 748 + go func() { 749 + defer close(stream1.Ch) 750 + stream1.Ch <- remote.StreamEvent{ 751 + Did: testDID, 752 + Timestamp: time.Now(), 753 + Kind: remote.EventKindCommit, 754 + Commit: &remote.StreamEventCommit{ 755 + Operation: remote.OpCreate, 756 + Collection: "app.bsky.feed.post", 757 + Rkey: "post1", 758 + Record: map[string]any{"text": "Post to be deleted"}, 759 + }, 760 + } 761 + }() 762 + 763 + err = store.ActiveSync(ctx, stream1) 764 + require.NoError(t, err) 765 + 766 + // Backfill with empty stream 767 + stream2 := &remote.RemoteStream{ 768 + Ch: make(chan remote.StreamEvent), 769 + } 770 + close(stream2.Ch) 771 + 772 + err = store.BackfillRepo(ctx, testDID, stream2) 773 + require.NoError(t, err) 774 + 775 + // Verify all records for this DID were deleted 776 + posts, err := store.ListRecords(ctx, testDID, "app.bsky.feed.post", 0) 777 + require.NoError(t, err) 778 + assert.Len(t, posts, 0) 779 + }
+12 -7
cmd/butterfly/store/stdout.go
··· 23 23 } 24 24 25 25 type repoStats struct { 26 - numRecords int 27 - numCommits int 28 - numErrors int 29 - collections map[string]int 26 + numRecords int 27 + numCommits int 28 + numErrors int 29 + collections map[string]int 30 30 } 31 31 32 32 // Setup initializes the store ··· 45 45 return nil 46 46 } 47 47 48 - // Receive processes events from the stream 49 - func (s *StdoutStore) Receive(ctx context.Context, stream *remote.RemoteStream) error { 48 + // BackfillRepo resets a repo and re-ingests it from a remote stream 49 + func (s *StdoutStore) BackfillRepo(ctx context.Context, did string, stream *remote.RemoteStream) error { 50 + return s.ActiveSync(ctx, stream) 51 + } 52 + 53 + // ActiveSync processes live update events from a remote stream 54 + func (s *StdoutStore) ActiveSync(ctx context.Context, stream *remote.RemoteStream) error { 50 55 for event := range stream.Ch { 51 56 select { 52 57 case <-ctx.Done(): ··· 94 99 if stats.numErrors > 0 { 95 100 fmt.Printf(" Errors: %d\n", stats.numErrors) 96 101 } 97 - 102 + 98 103 if len(stats.collections) > 0 { 99 104 fmt.Println(" Collections:") 100 105 for col, count := range stats.collections {
+6 -2
cmd/butterfly/store/store.go
··· 15 15 // Close tears down the store and releases resources 16 16 Close() error 17 17 18 - // Receive processes events from a remote stream 18 + // BackfillRepo resets a repo and re-ingests it from a remote stream 19 19 // The implementation should handle context cancellation appropriately 20 - Receive(ctx context.Context, stream *remote.RemoteStream) error 20 + BackfillRepo(ctx context.Context, did string, stream *remote.RemoteStream) error 21 + 22 + // ActiveSync processes live update events from a remote stream 23 + // The implementation should handle context cancellation appropriately 24 + ActiveSync(ctx context.Context, stream *remote.RemoteStream) error 21 25 } 22 26 23 27 // StoreType identifies the type of store
+15 -7
cmd/butterfly/store/tarfiles.go
··· 18 18 "github.com/bluesky-social/indigo/cmd/butterfly/remote" 19 19 ) 20 20 21 + // NOTE: do not work on this this until the Store interface is fully mature 22 + 21 23 // TarfilesStore implements Store by writing repository data to gzipped tar files 22 24 type TarfilesStore struct { 23 25 // The directory to store the .tar.gz files ··· 34 36 35 37 // tarWriter manages writing to a single tar file 36 38 type tarWriter struct { 37 - file *os.File 39 + file *os.File 38 40 gzipWriter *gzip.Writer 39 - writer *tar.Writer 40 - entries map[string]bool // Track existing entries 41 - tempFile string 42 - finalFile string 41 + writer *tar.Writer 42 + entries map[string]bool // Track existing entries 43 + tempFile string 44 + finalFile string 43 45 } 44 46 45 47 // NewTarfilesStore creates a new TarfilesStore ··· 89 91 return nil 90 92 } 91 93 92 - // Receive processes events from the stream 93 - func (t *TarfilesStore) Receive(ctx context.Context, stream *remote.RemoteStream) error { 94 + // BackfillRepo resets a repo and re-ingests it from a remote stream 95 + func (t *TarfilesStore) BackfillRepo(ctx context.Context, did string, stream *remote.RemoteStream) error { 96 + // TODO For now, it's fine to just reuse ActiveSync. A more optimized variant could be useful. 97 + return t.ActiveSync(ctx, stream) 98 + } 99 + 100 + // ActiveSync processes live update events from a remote stream 101 + func (t *TarfilesStore) ActiveSync(ctx context.Context, stream *remote.RemoteStream) error { 94 102 for event := range stream.Ch { 95 103 select { 96 104 case <-ctx.Done():
+6 -6
cmd/butterfly/store/tarfiles_test.go
··· 114 114 }() 115 115 116 116 // Process the stream 117 - err = store.Receive(ctx, stream) 117 + err = store.ActiveSync(ctx, stream) 118 118 require.NoError(t, err) 119 119 120 120 // Close the store to finalize tar files ··· 187 187 }() 188 188 189 189 // Process the stream 190 - err = store.Receive(ctx, stream) 190 + err = store.ActiveSync(ctx, stream) 191 191 require.NoError(t, err) 192 192 193 193 // Close the store ··· 241 241 }() 242 242 243 243 // Process should stop when context is cancelled 244 - err = store.Receive(ctx, stream) 244 + err = store.ActiveSync(ctx, stream) 245 245 assert.ErrorIs(t, err, context.Canceled) 246 246 } 247 247 ··· 273 273 } 274 274 close(stream.Ch) 275 275 276 - err = store.Receive(ctx, stream) 276 + err = store.ActiveSync(ctx, stream) 277 277 require.NoError(t, err) 278 278 err = store.Close() 279 279 require.NoError(t, err) ··· 303 303 } 304 304 close(stream.Ch) 305 305 306 - err = store.Receive(ctx, stream) 306 + err = store.ActiveSync(ctx, stream) 307 307 require.NoError(t, err) 308 308 err = store.Close() 309 309 require.NoError(t, err) ··· 381 381 }() 382 382 383 383 // Should process without error, skipping invalid events 384 - err = store.Receive(ctx, stream) 384 + err = store.ActiveSync(ctx, stream) 385 385 require.NoError(t, err) 386 386 387 387 err = store.Close()