···11-/*
22-Identity resolution infra
33-*/
11+// Package main provides identity resolution infrastructure for Butterfly
22+package main
4355-package main
44+// TODO: Implement identity resolution functionality
55+// This will likely include:
66+// - DID resolution
77+// - Handle resolution
88+// - Identity caching
99+// - Integration with the atproto identity package
+54-10
cmd/butterfly/main.go
···11package main
2233import (
44+ "context"
55+ "flag"
46 "fmt"
77+ "log"
88+ "os"
59610 "github.com/bluesky-social/indigo/cmd/butterfly/remote"
711 "github.com/bluesky-social/indigo/cmd/butterfly/store"
812)
9131014func main() {
1111- r := remote.CarfileRemote{Filepath: "/Users/paulfrazee/tmp/carfiles/pfrazee.car"}
1212- s := store.StdoutStore{Mode: store.StdoutStoreModeStats}
1515+ // Command line flags
1616+ var (
1717+ carFile = flag.String("car", "", "Path to CAR file to read")
1818+ did = flag.String("did", "", "DID to fetch (required)")
1919+ outputMode = flag.String("output", "stats", "Output mode: stats or passthrough")
2020+ help = flag.Bool("help", false, "Show help")
2121+ )
2222+ flag.Parse()
13231414- if err := s.Setup(); err != nil {
1515- fmt.Println(err)
1616- return
2424+ if *help || *carFile == "" || *did == "" {
2525+ fmt.Fprintf(os.Stderr, "Usage: butterfly -car <path> -did <did> [-output stats|passthrough]\n")
2626+ flag.PrintDefaults()
2727+ os.Exit(1)
1728 }
1818- defer s.Close()
19292020- res, err := r.FetchRepo(remote.FetchRepoParams{Did: "did:plc:ragtjsm2j2vknwkz3zp4oxrd"})
3030+ // Set up logger
3131+ logger := log.New(os.Stderr, "butterfly: ", log.LstdFlags)
3232+3333+ // Create remote
3434+ r := &remote.CarfileRemote{Filepath: *carFile}
3535+3636+ // Create store based on output mode
3737+ var s store.Store
3838+ switch *outputMode {
3939+ case "passthrough":
4040+ s = &store.StdoutStore{Mode: store.StdoutStoreModePassthrough}
4141+ case "stats":
4242+ s = &store.StdoutStore{Mode: store.StdoutStoreModeStats}
4343+ default:
4444+ logger.Fatalf("unknown output mode: %s", *outputMode)
4545+ }
4646+4747+ // Create context
4848+ ctx := context.Background()
4949+5050+ // Initialize store
5151+ if err := s.Setup(ctx); err != nil {
5252+ logger.Fatalf("failed to setup store: %v", err)
5353+ }
5454+ defer func() {
5555+ if err := s.Close(); err != nil {
5656+ logger.Printf("failed to close store: %v", err)
5757+ }
5858+ }()
5959+6060+ // Fetch repository
6161+ stream, err := r.FetchRepo(ctx, remote.FetchRepoParams{Did: *did})
2162 if err != nil {
2222- fmt.Println(err)
2323- return
6363+ logger.Fatalf("failed to fetch repo: %v", err)
2464 }
6565+ defer stream.Close()
25662626- s.Receive(res)
6767+ // Process the stream
6868+ if err := s.Receive(ctx, stream); err != nil {
6969+ logger.Fatalf("failed to process stream: %v", err)
7070+ }
2771}
+99-55
cmd/butterfly/remote/carfile.go
···11-/*
22-.car file remote interface
33-*/
11+// Package remote provides a CAR file implementation of the Remote interface
42package remote
5364import (
75 "context"
86 "fmt"
97 "os"
88+ "time"
1091110 "github.com/bluesky-social/indigo/atproto/data"
1211 "github.com/bluesky-social/indigo/atproto/repo"
···1413 "github.com/ipfs/go-cid"
1514)
16151616+// CarfileRemote implements the Remote interface for reading from CAR files
1717type CarfileRemote struct {
1818 Filepath string
1919}
20202121-func (self CarfileRemote) ListRepos(params ListReposParams) (*ListReposResult, error) {
2222- ctx := context.Background()
2323-2424- _, _, did, err := ReadCar(ctx, self.Filepath)
2121+// ListRepos returns the DID of the repository in the CAR file
2222+func (c *CarfileRemote) ListRepos(ctx context.Context, params ListReposParams) (*ListReposResult, error) {
2323+ _, _, did, err := c.readCar(ctx)
2524 if err != nil {
2626- return nil, err
2525+ return nil, fmt.Errorf("failed to read CAR file: %w", err)
2726 }
28272929- res := ListReposResult{
2828+ return &ListReposResult{
3029 Dids: []string{did},
3131- }
3232- return &res, nil
3030+ }, nil
3331}
34323535-func (self CarfileRemote) FetchRepo(params FetchRepoParams) (*RemoteStream, error) {
3636- ctx := context.Background()
3737-3838- // read & validate
3939- _, r, did, err := ReadCar(ctx, self.Filepath)
3333+// FetchRepo streams the contents of a repository from the CAR file
3434+func (c *CarfileRemote) FetchRepo(ctx context.Context, params FetchRepoParams) (*RemoteStream, error) {
3535+ commit, r, did, err := c.readCar(ctx)
4036 if err != nil {
4141- return nil, err
3737+ return nil, fmt.Errorf("failed to read CAR file: %w", err)
4238 }
3939+4340 if did != params.Did {
4444- return nil, fmt.Errorf("Repo not found: %s", params.Did)
4141+ return nil, fmt.Errorf("%w: %s", ErrRepoNotFound, params.Did)
4242+ }
4343+4444+ // Create stream with cancellable context
4545+ streamCtx, cancel := context.WithCancel(ctx)
4646+ stream := &RemoteStream{
4747+ Ch: make(chan StreamEvent, 100), // Buffer for better performance
4848+ cancel: cancel,
4549 }
46504747- res := RemoteStream{Ch: make(chan StreamEvent)}
5151+ // Stream repository contents
5252+ go func() {
5353+ defer close(stream.Ch)
5454+ defer cancel()
5555+5656+ // Send records from the repository
5757+ err := r.MST.Walk(func(k []byte, v cid.Cid) error {
5858+ // Check for cancellation
5959+ select {
6060+ case <-streamCtx.Done():
6161+ return streamCtx.Err()
6262+ default:
6363+ }
48644949- // walk & emit
5050- go (func() {
5151- err = r.MST.Walk(func(k []byte, v cid.Cid) error {
5265 col, rkey, err := syntax.ParseRepoPath(string(k))
5366 if err != nil {
5454- return err
6767+ return fmt.Errorf("invalid repo path %q: %w", string(k), err)
5568 }
5656- recBytes, _, err := r.GetRecordBytes(ctx, col, rkey)
6969+7070+ // Skip if collections filter is specified
7171+ if len(params.Collections) > 0 {
7272+ found := false
7373+ for _, c := range params.Collections {
7474+ if c == col.String() {
7575+ found = true
7676+ break
7777+ }
7878+ }
7979+ if !found {
8080+ return nil
8181+ }
8282+ }
8383+8484+ recBytes, _, err := r.GetRecordBytes(streamCtx, col, rkey)
5785 if err != nil {
5858- return err
8686+ return fmt.Errorf("failed to get record %s/%s: %w", col, rkey, err)
5987 }
60886189 rec, err := data.UnmarshalCBOR(recBytes)
6290 if err != nil {
6363- return err
9191+ return fmt.Errorf("failed to unmarshal record %s/%s: %w", col, rkey, err)
6492 }
65936666- res.Ch <- StreamEvent{
6767- Did: did,
6868- Time: 0, // TODO
6969- Kind: "commit",
7070- Commit: StreamEventCommit{
7171- Rev: "", // TODO
7272- Operation: "create",
9494+ event := StreamEvent{
9595+ Did: did,
9696+ Timestamp: time.Now(), // TODO - CAR files don't have timestamps?
9797+ Kind: EventKindCommit,
9898+ Commit: &StreamEventCommit{
9999+ Rev: commit.Rev, // TODO - is this accurate?
100100+ Operation: OpCreate,
73101 Collection: col.String(),
74102 Rkey: rkey.String(),
75103 Record: rec,
7676- Cid: "", // TODO
104104+ Cid: v.String(),
77105 },
78106 }
107107+108108+ select {
109109+ case stream.Ch <- event:
110110+ case <-streamCtx.Done():
111111+ return streamCtx.Err()
112112+ }
113113+79114 return nil
80115 })
8111682117 if err != nil {
8383- res.Ch <- StreamEvent{
8484- Did: did,
8585- Time: 0, // TODO
8686- Kind: "Error",
8787- Error: StreamEventError{Err: err},
118118+ // Send error event
119119+ select {
120120+ case stream.Ch <- StreamEvent{
121121+ Did: did,
122122+ Timestamp: time.Now(),
123123+ Kind: EventKindError,
124124+ Error: &StreamEventError{
125125+ Err: err,
126126+ Fatal: true,
127127+ },
128128+ }:
129129+ case <-streamCtx.Done():
88130 }
89131 }
9090-9191- close(res.Ch)
9292- })()
132132+ }()
931339494- return &res, nil
134134+ return stream, nil
95135}
961369797-func (self CarfileRemote) SubscribeRecords(params SubscribeRecordsParams) (*RemoteStream, error) {
9898- res := RemoteStream{Ch: make(chan StreamEvent)}
9999- close(res.Ch)
100100- return &res, nil
137137+// SubscribeRecords is not supported for CAR files
138138+func (c *CarfileRemote) SubscribeRecords(ctx context.Context, params SubscribeRecordsParams) (*RemoteStream, error) {
139139+ return nil, fmt.Errorf("subscribe records: %w", ErrNotImplemented)
101140}
102141103103-func ReadCar(ctx context.Context, path string) (*repo.Commit, *repo.Repo, string, error) {
104104- file, err := os.Open(path)
142142+// readCar reads and validates a CAR file
143143+func (c *CarfileRemote) readCar(ctx context.Context) (*repo.Commit, *repo.Repo, string, error) {
144144+ file, err := os.Open(c.Filepath)
105145 if err != nil {
106106- return nil, nil, "", err
146146+ return nil, nil, "", fmt.Errorf("failed to open file: %w", err)
107147 }
108108- c, r, err := repo.LoadRepoFromCAR(ctx, file)
148148+ defer file.Close()
149149+150150+ commit, r, err := repo.LoadRepoFromCAR(ctx, file)
109151 if err != nil {
110110- return nil, nil, "", err
152152+ return nil, nil, "", fmt.Errorf("failed to load repo from CAR: %w", err)
111153 }
112112- did, err := syntax.ParseDID(c.DID)
154154+155155+ did, err := syntax.ParseDID(commit.DID)
113156 if err != nil {
114114- return nil, nil, "", err
157157+ return nil, nil, "", fmt.Errorf("invalid DID in commit: %w", err)
115158 }
116116- return c, r, did.String(), nil
159159+160160+ return commit, r, did.String(), nil
117161}
+91-21
cmd/butterfly/remote/remote.go
···11+// Package remote defines interfaces for fetching AT Protocol data from various sources
12package remote
2344+import (
55+ "context"
66+ "errors"
77+ "time"
88+)
99+1010+// Remote defines the interface for data sources in the butterfly sync engine
311type Remote interface {
44- // Lists repositories hosted at the given remote. Not all parameters will be supported.
55- ListRepos(params ListReposParams) (ListReposResult, error)
1212+ // ListRepos lists repositories hosted at the given remote
1313+ // Not all remotes will support all parameters
1414+ ListRepos(ctx context.Context, params ListReposParams) (*ListReposResult, error)
61577- // Fetches the contents of the requested repositories. Not all parameters will be supported.
88- FetchRepo(params FetchRepoParams) (RemoteStream, error)
1616+ // FetchRepo fetches the contents of the requested repository
1717+ // Not all remotes will support all parameters
1818+ FetchRepo(ctx context.Context, params FetchRepoParams) (*RemoteStream, error)
9191010- // Subscribes to the record event-stream of the remote. Not all parameters will be supported.
1111- SubscribeRecords(params SubscribeRecordsParams) (RemoteStream, error)
2020+ // SubscribeRecords subscribes to the record event-stream of the remote
2121+ // Not all remotes will support all parameters
2222+ SubscribeRecords(ctx context.Context, params SubscribeRecordsParams) (*RemoteStream, error)
1223}
13242525+// ListReposParams contains parameters for listing repositories
1426type ListReposParams struct {
1527 Collection string
2828+ Cursor string
2929+ Limit int
1630}
3131+3232+// ListReposResult contains the result of a repository listing
1733type ListReposResult struct {
1818- Dids []string
3434+ Dids []string
3535+ Cursor string // For pagination
1936}
20373838+// FetchRepoParams contains parameters for fetching a repository
2139type FetchRepoParams struct {
2240 Did string
2341 Collections []string
4242+ Since *string // Optional: fetch only changes since this revision
2443}
25444545+// SubscribeRecordsParams contains parameters for subscribing to records
2646type SubscribeRecordsParams struct {
2747 Dids []string
2848 Collections []string
4949+ Cursor int64 // Resume from this cursor position
2950}
30515252+// RemoteStream represents a stream of events from a remote
3153type RemoteStream struct {
3232- Ch chan StreamEvent
5454+ Ch chan StreamEvent
5555+ cancel context.CancelFunc
3356}
34575858+// Close closes the stream
5959+func (s *RemoteStream) Close() error {
6060+ if s.cancel != nil {
6161+ s.cancel()
6262+ }
6363+ return nil
6464+}
6565+6666+// StreamEventKind represents the type of stream event
6767+type StreamEventKind string
6868+6969+const (
7070+ EventKindCommit StreamEventKind = "commit"
7171+ EventKindIdentity StreamEventKind = "identity"
7272+ EventKindAccount StreamEventKind = "account"
7373+ EventKindError StreamEventKind = "error"
7474+)
7575+7676+// StreamEvent represents an event from the remote stream
3577type StreamEvent struct {
3636- Did string
3737- Time uint
3838- Kind string
3939- Commit StreamEventCommit
4040- Identity StreamEventIdentity
4141- Account StreamEventAccount
4242- Error StreamEventError
7878+ Did string
7979+ Timestamp time.Time
8080+ Kind StreamEventKind
8181+8282+ // Event-specific data (only one will be populated based on Kind)
8383+ Commit *StreamEventCommit
8484+ Identity *StreamEventIdentity
8585+ Account *StreamEventAccount
8686+ Error *StreamEventError
4387}
44888989+// CommitOperation represents the type of commit operation
9090+type CommitOperation string
9191+9292+const (
9393+ OpCreate CommitOperation = "create"
9494+ OpUpdate CommitOperation = "update"
9595+ OpDelete CommitOperation = "delete"
9696+)
9797+9898+// StreamEventCommit represents a repository commit event
4599type StreamEventCommit struct {
46100 Rev string
4747- Operation string
101101+ Operation CommitOperation
48102 Collection string
49103 Rkey string
50104 Record map[string]any
51105 Cid string
52106}
107107+108108+// StreamEventIdentity represents an identity update event
53109type StreamEventIdentity struct {
54110 Did string
55111 Handle string
5656- Seq uint
5757- Time string
112112+ Seq uint64
113113+ Time time.Time
58114}
115115+116116+// StreamEventAccount represents an account status change event
59117type StreamEventAccount struct {
60118 Active bool
61119 Did string
6262- Seq uint
6363- Time string
120120+ Seq uint64
121121+ Time time.Time
64122}
123123+124124+// StreamEventError represents an error event in the stream
65125type StreamEventError struct {
6666- Err error
126126+ Err error
127127+ Fatal bool // Whether this error terminates the stream
128128+ RetryAfter *time.Duration // Suggested retry delay
67129}
130130+131131+// Common errors
132132+var (
133133+ ErrRemoteUnavailable = errors.New("remote service unavailable")
134134+ ErrNotImplemented = errors.New("operation not implemented by this remote")
135135+ ErrInvalidDID = errors.New("invalid DID format")
136136+ ErrRepoNotFound = errors.New("repository not found")
137137+)
+107-38
cmd/butterfly/selectors.go
···11-/*
22-Query patterns for selecting content to sync and retain
33-*/
44-11+// Package main provides query patterns for selecting content to sync and retain
52package main
6377-import "fmt"
44+import (
55+ "encoding/json"
66+ "fmt"
77+)
8899+// SelectorDoc represents a complete selector configuration with selection rules and retention policies
910type SelectorDoc struct {
1011 Selectors []Selector `json:"select"`
1112 Retainers Retainer `json:"retain"`
1213}
13141515+// Selector defines a rule for selecting content based on a where clause and assigns it a tag
1416type Selector struct {
1517 Where WhereClause `json:"where"`
1618 Tag string `json:"tag"`
1719}
18202121+// WhereClause specifies the criteria for selecting content
1922type WhereClause struct {
2020- Repo string `json:"repo"`
2121- Collection string `json:"collection"`
2222- Attr string `json:"attr"`
2323- Service string `json:"service"`
2424- Method string `json:"method"`
2525- Params map[string]string `json:"params"`
2626- Pagination map[string]string `json:"pagination"`
2323+ // Repo selection fields
2424+ Repo string `json:"repo,omitempty"`
2525+ Collection string `json:"collection,omitempty"`
2626+ Attr string `json:"attr,omitempty"`
2727+2828+ // Service selection fields
2929+ Service string `json:"service,omitempty"`
3030+ Method string `json:"method,omitempty"`
3131+ Params map[string]string `json:"params,omitempty"`
3232+ Pagination map[string]string `json:"pagination,omitempty"`
2733}
28343535+// Retainer maps tags to their retention policies
3636+// Format: tag -> collection pattern -> retention policy
2937type Retainer map[string]map[string]string
30383131-// SelectorDoc
3939+// String returns a string representation of the SelectorDoc
4040+func (s SelectorDoc) String() string {
4141+ return fmt.Sprintf("selectors=%v retain=%v", s.Selectors, s.Retainers)
4242+}
4343+4444+// Validate checks if the SelectorDoc is valid
4545+func (s SelectorDoc) Validate() error {
4646+ if len(s.Selectors) == 0 {
4747+ return fmt.Errorf("no selectors defined")
4848+ }
32493333-func (self SelectorDoc) String() string {
3434- return fmt.Sprintf("%s retain=%s", self.Selectors, self.Retainers)
5050+ tags := make(map[string]bool)
5151+ for i, sel := range s.Selectors {
5252+ if err := sel.Validate(); err != nil {
5353+ return fmt.Errorf("selector[%d]: %w", i, err)
5454+ }
5555+ if tags[sel.Tag] {
5656+ return fmt.Errorf("duplicate tag: %s", sel.Tag)
5757+ }
5858+ tags[sel.Tag] = true
5959+ }
6060+6161+ // Validate that all retainer tags exist in selectors
6262+ for tag := range s.Retainers {
6363+ if !tags[tag] {
6464+ return fmt.Errorf("retainer references unknown tag: %s", tag)
6565+ }
6666+ }
6767+6868+ return nil
3569}
36703737-// Selector
7171+// IsRepo returns true if this selector targets a repository
7272+func (s Selector) IsRepo() bool {
7373+ return s.Where.Repo != "" && s.Where.Collection == "" && s.Where.Attr == ""
7474+}
38753939-func (self Selector) IsRepo() bool {
4040- return self.Where.Repo != ""
7676+// IsRepoRecord returns true if this selector targets specific records in a repository
7777+func (s Selector) IsRepoRecord() bool {
7878+ return s.Where.Repo != "" && s.Where.Collection != "" && s.Where.Attr != ""
4179}
42804343-func (self Selector) IsRepoRecord() bool {
4444- return self.Where.Repo != "" && self.Where.Collection != "" && self.Where.Attr != ""
8181+// IsService returns true if this selector targets a service endpoint
8282+func (s Selector) IsService() bool {
8383+ return s.Where.Service != "" && s.Where.Method != "" && s.Where.Attr != ""
4584}
46854747-func (self Selector) IsService() bool {
4848- return self.Where.Service != "" && self.Where.Method != "" && self.Where.Attr != ""
8686+// Type returns the type of selector as a string
8787+func (s Selector) Type() string {
8888+ switch {
8989+ case s.IsRepo():
9090+ return "repo"
9191+ case s.IsRepoRecord():
9292+ return "repo_record"
9393+ case s.IsService():
9494+ return "service"
9595+ default:
9696+ return "invalid"
9797+ }
4998}
50995151-func (self Selector) IsValid() bool {
5252- return self.IsRepo() || self.IsRepoRecord() || self.IsService()
5353-}
100100+// Validate checks if the selector is valid
101101+func (s Selector) Validate() error {
102102+ if s.Tag == "" {
103103+ return fmt.Errorf("missing tag")
104104+ }
541055555-func (self Selector) String() string {
5656- if self.Tag == "" {
5757- return "(Invalid selector)"
106106+ if !s.IsRepo() && !s.IsRepoRecord() && !s.IsService() {
107107+ return fmt.Errorf("invalid where clause: must specify either repo, repo+collection+attr, or service+method+attr")
58108 }
5959- return fmt.Sprintf("%s,tag=%s", self.Where, self.Tag)
109109+110110+ return nil
60111}
611126262-// WhereClause
113113+// String returns a string representation of the Selector
114114+func (s Selector) String() string {
115115+ return fmt.Sprintf("tag=%s,%s", s.Tag, s.Where)
116116+}
631176464-func (self WhereClause) String() string {
6565- if self.Repo != "" && self.Collection != "" && self.Attr != "" {
6666- return fmt.Sprintf("where=at://%s/%s/*#%s", self.Repo, self.Collection, self.Attr)
118118+// String returns a string representation of the WhereClause
119119+func (w WhereClause) String() string {
120120+ switch {
121121+ case w.Repo != "" && w.Collection != "" && w.Attr != "":
122122+ return fmt.Sprintf("where=at://%s/%s/*#%s", w.Repo, w.Collection, w.Attr)
123123+ case w.Repo != "":
124124+ return fmt.Sprintf("where=at://%s", w.Repo)
125125+ case w.Service != "" && w.Method != "" && w.Attr != "":
126126+ return fmt.Sprintf("where=https://%s/_xrpc/%s/*#%s", w.Service, w.Method, w.Attr)
127127+ default:
128128+ return "where=(invalid)"
67129 }
6868- if self.Repo != "" {
6969- return fmt.Sprintf("where=at://%s", self.Repo)
130130+}
131131+132132+// ParseSelectorDoc parses a JSON selector document
133133+func ParseSelectorDoc(data []byte) (*SelectorDoc, error) {
134134+ var doc SelectorDoc
135135+ if err := json.Unmarshal(data, &doc); err != nil {
136136+ return nil, fmt.Errorf("failed to parse selector doc: %w", err)
70137 }
7171- if self.Service != "" && self.Method != "" && self.Attr != "" {
7272- return fmt.Sprintf("where=https://%s/_xrpc/%s/*#%s", self.Service, self.Method, self.Attr)
138138+139139+ if err := doc.Validate(); err != nil {
140140+ return nil, fmt.Errorf("invalid selector doc: %w", err)
73141 }
7474- return "where=(Invalid clause)"
142142+143143+ return &doc, nil
75144}
+71-24
cmd/butterfly/store/stdout.go
···11-/*
22-Dump-to-stdout storage interface
33-*/
44-11+// Package store provides a stdout implementation of the Store interface
52package store
6374import (
55+ "context"
86 "fmt"
97108 "github.com/bluesky-social/indigo/cmd/butterfly/remote"
119)
12101111+// Output modes for StdoutStore
1312const (
1413 StdoutStoreModePassthrough = iota
1514 StdoutStoreModeStats
1615)
17161717+// StdoutStore implements Store by writing to stdout
1818type StdoutStore struct {
1919 Mode int
20202121- // stats
2222- // TODO: should support multiple repos
2323- Did string
2424- NumRecords uint
2121+ // Stats tracking
2222+ stats map[string]*repoStats
2523}
26242727-func (self *StdoutStore) Setup() error {
2525+type repoStats struct {
2626+ numRecords int
2727+ numCommits int
2828+ numErrors int
2929+ collections map[string]int
3030+}
3131+3232+// Setup initializes the store
3333+func (s *StdoutStore) Setup(ctx context.Context) error {
3434+ if s.Mode == StdoutStoreModeStats {
3535+ s.stats = make(map[string]*repoStats)
3636+ }
2837 return nil
2938}
30393131-func (self *StdoutStore) Close() error {
4040+// Close outputs final statistics if in stats mode
4141+func (s *StdoutStore) Close() error {
4242+ if s.Mode == StdoutStoreModeStats && len(s.stats) > 0 {
4343+ s.printStats()
4444+ }
3245 return nil
3346}
34473535-func (self *StdoutStore) Receive(s *remote.RemoteStream) error {
3636- for event := range s.Ch {
3737- if self.Did == "" {
3838- self.Did = event.Did
4848+// Receive processes events from the stream
4949+func (s *StdoutStore) Receive(ctx context.Context, stream *remote.RemoteStream) error {
5050+ for event := range stream.Ch {
5151+ select {
5252+ case <-ctx.Done():
5353+ return ctx.Err()
5454+ default:
3955 }
40564141- switch self.Mode {
5757+ switch s.Mode {
4258 case StdoutStoreModePassthrough:
4343- fmt.Println(event)
5959+ fmt.Printf("%+v\n", event)
4460 case StdoutStoreModeStats:
4545- if event.Kind == "commit" && event.Commit.Operation == "create" {
4646- self.NumRecords++
4747- }
6161+ s.updateStats(event)
4862 }
4963 }
6464+ return nil
6565+}
50665151- if self.Mode == StdoutStoreModeStats {
5252- // TODO make this more interesting
5353- fmt.Printf("Stats for repo %s\n", self.Did)
5454- fmt.Printf("%d records", self.NumRecords)
6767+func (s *StdoutStore) updateStats(event remote.StreamEvent) {
6868+ stats, exists := s.stats[event.Did]
6969+ if !exists {
7070+ stats = &repoStats{
7171+ collections: make(map[string]int),
7272+ }
7373+ s.stats[event.Did] = stats
7474+ }
7575+7676+ switch event.Kind {
7777+ case remote.EventKindCommit:
7878+ stats.numCommits++
7979+ if event.Commit != nil {
8080+ stats.numRecords++
8181+ stats.collections[event.Commit.Collection]++
8282+ }
8383+ case remote.EventKindError:
8484+ stats.numErrors++
5585 }
8686+}
56875757- return nil
8888+func (s *StdoutStore) printStats() {
8989+ fmt.Println("\n=== Repository Statistics ===")
9090+ for did, stats := range s.stats {
9191+ fmt.Printf("\nRepo: %s\n", did)
9292+ fmt.Printf(" Records: %d\n", stats.numRecords)
9393+ fmt.Printf(" Commits: %d\n", stats.numCommits)
9494+ if stats.numErrors > 0 {
9595+ fmt.Printf(" Errors: %d\n", stats.numErrors)
9696+ }
9797+9898+ if len(stats.collections) > 0 {
9999+ fmt.Println(" Collections:")
100100+ for col, count := range stats.collections {
101101+ fmt.Printf(" %s: %d\n", col, count)
102102+ }
103103+ }
104104+ }
58105}
+23-6
cmd/butterfly/store/store.go
···11+// Package store defines interfaces for persisting AT Protocol data
12package store
2333-import "github.com/bluesky-social/indigo/cmd/butterfly/remote"
44+import (
55+ "context"
4677+ "github.com/bluesky-social/indigo/cmd/butterfly/remote"
88+)
99+1010+// Store defines the interface for data persistence in the butterfly sync engine
511type Store interface {
66- // Initialize the store
77- Setup() error
1212+ // Setup initializes the store
1313+ Setup(ctx context.Context) error
81499- // Teardown the store
1515+ // Close tears down the store and releases resources
1016 Close() error
11171212- // Subscribe to a record emitter
1313- Receive(s *remote.RemoteStream) error
1818+ // Receive processes events from a remote stream
1919+ // The implementation should handle context cancellation appropriately
2020+ Receive(ctx context.Context, stream *remote.RemoteStream) error
1421}
2222+2323+// StoreType identifies the type of store
2424+type StoreType string
2525+2626+const (
2727+ StoreTypeStdout StoreType = "stdout"
2828+ StoreTypeDuckDB StoreType = "duckdb"
2929+ StoreTypeClickHouse StoreType = "clickhouse"
3030+ StoreTypeTarFiles StoreType = "tarfiles"
3131+)
+4
cmd/butterfly/store/tarfiles.go
···1111)
12121313type TarfilesStore struct {
1414+ // The directory to store the .tar files
1515+ // Each repository is stored as a single .tar file
1616+ // The contents of the .tar file are a collection of JSON files
1717+ // The directory structure is based on the collections
1418 dirpath string
1519}
1620