this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Scaffold out remotes and stores

Paul Frazee ec89a9c5 1d49d154

+404 -7
+5
cmd/butterfly/identity.go
··· 1 + /* 2 + Identity resolution infra 3 + */ 4 + 5 + package main
+18 -7
cmd/butterfly/main.go
··· 1 1 package main 2 2 3 3 import ( 4 - "encoding/json" 5 4 "fmt" 5 + 6 + "github.com/bluesky-social/indigo/cmd/butterfly/remote" 7 + "github.com/bluesky-social/indigo/cmd/butterfly/store" 6 8 ) 7 9 8 10 func main() { 9 - fmt.Println(WhereClause{}) 10 - fmt.Println(WhereClause{Repo: "pfrazee.com", Collection: "app.bsky.graph.follow", Attr: "subject"}) 11 + r := remote.CarfileRemote{Filepath: "/Users/paulfrazee/tmp/carfiles/pfrazee.car"} 12 + s := store.StdoutStore{Mode: store.StdoutStoreModeStats} 13 + 14 + if err := s.Setup(); err != nil { 15 + fmt.Println(err) 16 + return 17 + } 18 + defer s.Close() 19 + 20 + res, err := r.FetchRepo(remote.FetchRepoParams{Did: "did:plc:ragtjsm2j2vknwkz3zp4oxrd"}) 21 + if err != nil { 22 + fmt.Println(err) 23 + return 24 + } 11 25 12 - docJson := `{"select": [{"where": {"repo": "pfrazee.com"},"tag": "user"}],"retain": {"user": {"*": "*"}}}` 13 - var doc SelectorDoc 14 - json.Unmarshal([]byte(docJson), &doc) 15 - fmt.Println((doc)) 26 + s.Receive(res) 16 27 }
+117
cmd/butterfly/remote/carfile.go
··· 1 + /* 2 + .car file remote interface 3 + */ 4 + package remote 5 + 6 + import ( 7 + "context" 8 + "fmt" 9 + "os" 10 + 11 + "github.com/bluesky-social/indigo/atproto/data" 12 + "github.com/bluesky-social/indigo/atproto/repo" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + "github.com/ipfs/go-cid" 15 + ) 16 + 17 + type CarfileRemote struct { 18 + Filepath string 19 + } 20 + 21 + func (self CarfileRemote) ListRepos(params ListReposParams) (*ListReposResult, error) { 22 + ctx := context.Background() 23 + 24 + _, _, did, err := ReadCar(ctx, self.Filepath) 25 + if err != nil { 26 + return nil, err 27 + } 28 + 29 + res := ListReposResult{ 30 + Dids: []string{did}, 31 + } 32 + return &res, nil 33 + } 34 + 35 + func (self CarfileRemote) FetchRepo(params FetchRepoParams) (*RemoteStream, error) { 36 + ctx := context.Background() 37 + 38 + // read & validate 39 + _, r, did, err := ReadCar(ctx, self.Filepath) 40 + if err != nil { 41 + return nil, err 42 + } 43 + if did != params.Did { 44 + return nil, fmt.Errorf("Repo not found: %s", params.Did) 45 + } 46 + 47 + res := RemoteStream{Ch: make(chan StreamEvent)} 48 + 49 + // walk & emit 50 + go (func() { 51 + err = r.MST.Walk(func(k []byte, v cid.Cid) error { 52 + col, rkey, err := syntax.ParseRepoPath(string(k)) 53 + if err != nil { 54 + return err 55 + } 56 + recBytes, _, err := r.GetRecordBytes(ctx, col, rkey) 57 + if err != nil { 58 + return err 59 + } 60 + 61 + rec, err := data.UnmarshalCBOR(recBytes) 62 + if err != nil { 63 + return err 64 + } 65 + 66 + res.Ch <- StreamEvent{ 67 + Did: did, 68 + Time: 0, // TODO 69 + Kind: "commit", 70 + Commit: StreamEventCommit{ 71 + Rev: "", // TODO 72 + Operation: "create", 73 + Collection: col.String(), 74 + Rkey: rkey.String(), 75 + Record: rec, 76 + Cid: "", // TODO 77 + }, 78 + } 79 + return nil 80 + }) 81 + 82 + if err != nil { 83 + res.Ch <- StreamEvent{ 84 + Did: did, 85 + Time: 0, // TODO 86 + Kind: "Error", 87 + Error: StreamEventError{Err: err}, 88 + } 89 + } 90 + 91 + close(res.Ch) 92 + })() 93 + 94 + return &res, nil 95 + } 96 + 97 + func (self CarfileRemote) SubscribeRecords(params SubscribeRecordsParams) (*RemoteStream, error) { 98 + res := RemoteStream{Ch: make(chan StreamEvent)} 99 + close(res.Ch) 100 + return &res, nil 101 + } 102 + 103 + func ReadCar(ctx context.Context, path string) (*repo.Commit, *repo.Repo, string, error) { 104 + file, err := os.Open(path) 105 + if err != nil { 106 + return nil, nil, "", err 107 + } 108 + c, r, err := repo.LoadRepoFromCAR(ctx, file) 109 + if err != nil { 110 + return nil, nil, "", err 111 + } 112 + did, err := syntax.ParseDID(c.DID) 113 + if err != nil { 114 + return nil, nil, "", err 115 + } 116 + return c, r, did.String(), nil 117 + }
+22
cmd/butterfly/remote/jetstream.go
··· 1 + /* 2 + Jetstream remote interface 3 + */ 4 + package remote 5 + 6 + import "fmt" 7 + 8 + type JetstreamRemote struct { 9 + service string 10 + } 11 + 12 + func (self JetstreamRemote) ListRepos(params ListReposParams) (*ListReposResult, error) { 13 + return nil, fmt.Errorf("Not yet implemented") 14 + } 15 + 16 + func (self JetstreamRemote) FetchRepo(params FetchRepoParams) (*RemoteStream, error) { 17 + return nil, fmt.Errorf("Not yet implemented") 18 + } 19 + 20 + func (self JetstreamRemote) SubscribeRecords(params SubscribeRecordsParams) (*RemoteStream, error) { 21 + return nil, fmt.Errorf("Not yet implemented") 22 + }
+22
cmd/butterfly/remote/pds.go
··· 1 + /* 2 + PDS remote interface 3 + */ 4 + package remote 5 + 6 + import "fmt" 7 + 8 + type PdsRemote struct { 9 + service string 10 + } 11 + 12 + func (self PdsRemote) ListRepos(params ListReposParams) (*ListReposResult, error) { 13 + return nil, fmt.Errorf("Not yet implemented") 14 + } 15 + 16 + func (self PdsRemote) FetchRepo(params FetchRepoParams) (*RemoteStream, error) { 17 + return nil, fmt.Errorf("Not yet implemented") 18 + } 19 + 20 + func (self PdsRemote) SubscribeRecords(params SubscribeRecordsParams) (*RemoteStream, error) { 21 + return nil, fmt.Errorf("Not yet implemented") 22 + }
+22
cmd/butterfly/remote/relay.go
··· 1 + /* 2 + Relay remote interface 3 + */ 4 + package remote 5 + 6 + import "fmt" 7 + 8 + type RelayRemote struct { 9 + service string 10 + } 11 + 12 + func (self RelayRemote) ListRepos(params ListReposParams) (*ListReposResult, error) { 13 + return nil, fmt.Errorf("Not yet implemented") 14 + } 15 + 16 + func (self RelayRemote) FetchRepo(params FetchRepoParams) (*RemoteStream, error) { 17 + return nil, fmt.Errorf("Not yet implemented") 18 + } 19 + 20 + func (self RelayRemote) SubscribeRecords(params SubscribeRecordsParams) (*RemoteStream, error) { 21 + return nil, fmt.Errorf("Not yet implemented") 22 + }
+67
cmd/butterfly/remote/remote.go
··· 1 + package remote 2 + 3 + type Remote interface { 4 + // Lists repositories hosted at the given remote. Not all parameters will be supported. 5 + ListRepos(params ListReposParams) (ListReposResult, error) 6 + 7 + // Fetches the contents of the requested repositories. Not all parameters will be supported. 8 + FetchRepo(params FetchRepoParams) (RemoteStream, error) 9 + 10 + // Subscribes to the record event-stream of the remote. Not all parameters will be supported. 11 + SubscribeRecords(params SubscribeRecordsParams) (RemoteStream, error) 12 + } 13 + 14 + type ListReposParams struct { 15 + Collection string 16 + } 17 + type ListReposResult struct { 18 + Dids []string 19 + } 20 + 21 + type FetchRepoParams struct { 22 + Did string 23 + Collections []string 24 + } 25 + 26 + type SubscribeRecordsParams struct { 27 + Dids []string 28 + Collections []string 29 + } 30 + 31 + type RemoteStream struct { 32 + Ch chan StreamEvent 33 + } 34 + 35 + type StreamEvent struct { 36 + Did string 37 + Time uint 38 + Kind string 39 + Commit StreamEventCommit 40 + Identity StreamEventIdentity 41 + Account StreamEventAccount 42 + Error StreamEventError 43 + } 44 + 45 + type StreamEventCommit struct { 46 + Rev string 47 + Operation string 48 + Collection string 49 + Rkey string 50 + Record map[string]any 51 + Cid string 52 + } 53 + type StreamEventIdentity struct { 54 + Did string 55 + Handle string 56 + Seq uint 57 + Time string 58 + } 59 + type StreamEventAccount struct { 60 + Active bool 61 + Did string 62 + Seq uint 63 + Time string 64 + } 65 + type StreamEventError struct { 66 + Err error 67 + }
+22
cmd/butterfly/selectors.go
··· 28 28 29 29 type Retainer map[string]map[string]string 30 30 31 + // SelectorDoc 32 + 31 33 func (self SelectorDoc) String() string { 32 34 return fmt.Sprintf("%s retain=%s", self.Selectors, self.Retainers) 33 35 } 34 36 37 + // Selector 38 + 39 + func (self Selector) IsRepo() bool { 40 + return self.Where.Repo != "" 41 + } 42 + 43 + func (self Selector) IsRepoRecord() bool { 44 + return self.Where.Repo != "" && self.Where.Collection != "" && self.Where.Attr != "" 45 + } 46 + 47 + func (self Selector) IsService() bool { 48 + return self.Where.Service != "" && self.Where.Method != "" && self.Where.Attr != "" 49 + } 50 + 51 + func (self Selector) IsValid() bool { 52 + return self.IsRepo() || self.IsRepoRecord() || self.IsService() 53 + } 54 + 35 55 func (self Selector) String() string { 36 56 if self.Tag == "" { 37 57 return "(Invalid selector)" 38 58 } 39 59 return fmt.Sprintf("%s,tag=%s", self.Where, self.Tag) 40 60 } 61 + 62 + // WhereClause 41 63 42 64 func (self WhereClause) String() string { 43 65 if self.Repo != "" && self.Collection != "" && self.Attr != "" {
+5
cmd/butterfly/store/clickhouse.go
··· 1 + /* 2 + Clickhouse storage interface 3 + */ 4 + 5 + package store
+5
cmd/butterfly/store/duckdb.go
··· 1 + /* 2 + DuckDB storage interface 3 + */ 4 + 5 + package store
+58
cmd/butterfly/store/stdout.go
··· 1 + /* 2 + Dump-to-stdout storage interface 3 + */ 4 + 5 + package store 6 + 7 + import ( 8 + "fmt" 9 + 10 + "github.com/bluesky-social/indigo/cmd/butterfly/remote" 11 + ) 12 + 13 + const ( 14 + StdoutStoreModePassthrough = iota 15 + StdoutStoreModeStats 16 + ) 17 + 18 + type StdoutStore struct { 19 + Mode int 20 + 21 + // stats 22 + // TODO: should support multiple repos 23 + Did string 24 + NumRecords uint 25 + } 26 + 27 + func (self *StdoutStore) Setup() error { 28 + return nil 29 + } 30 + 31 + func (self *StdoutStore) Close() error { 32 + return nil 33 + } 34 + 35 + func (self *StdoutStore) Receive(s *remote.RemoteStream) error { 36 + for event := range s.Ch { 37 + if self.Did == "" { 38 + self.Did = event.Did 39 + } 40 + 41 + switch self.Mode { 42 + case StdoutStoreModePassthrough: 43 + fmt.Println(event) 44 + case StdoutStoreModeStats: 45 + if event.Kind == "commit" && event.Commit.Operation == "create" { 46 + self.NumRecords++ 47 + } 48 + } 49 + } 50 + 51 + if self.Mode == StdoutStoreModeStats { 52 + // TODO make this more interesting 53 + fmt.Printf("Stats for repo %s\n", self.Did) 54 + fmt.Printf("%d records", self.NumRecords) 55 + } 56 + 57 + return nil 58 + }
+14
cmd/butterfly/store/store.go
··· 1 + package store 2 + 3 + import "github.com/bluesky-social/indigo/cmd/butterfly/remote" 4 + 5 + type Store interface { 6 + // Initialize the store 7 + Setup() error 8 + 9 + // Teardown the store 10 + Close() error 11 + 12 + // Subscribe to a record emitter 13 + Receive(s *remote.RemoteStream) error 14 + }
+27
cmd/butterfly/store/tarfiles.go
··· 1 + /* 2 + Write-to-tarfiles storage interface 3 + */ 4 + 5 + package store 6 + 7 + import ( 8 + "fmt" 9 + 10 + "github.com/bluesky-social/indigo/cmd/butterfly/remote" 11 + ) 12 + 13 + type TarfilesStore struct { 14 + dirpath string 15 + } 16 + 17 + func (self *TarfilesStore) Setup() error { 18 + return fmt.Errorf("Not yet implemented") 19 + } 20 + 21 + func (self *TarfilesStore) Close() error { 22 + return fmt.Errorf("Not yet implemented") 23 + } 24 + 25 + func (self *TarfilesStore) Receive(s *remote.RemoteStream) error { 26 + return fmt.Errorf("Not yet implemented") 27 + }