this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement PDS backfill

Paul Frazee 87a11644 5b10f824

+234 -35
+73 -15
cmd/butterfly/main.go
··· 14 14 func main() { 15 15 // Command line flags 16 16 var ( 17 - carFile = flag.String("car", "", "Path to CAR file to read") 18 - did = flag.String("did", "", "DID to fetch (required)") 17 + // Input source flags 18 + inputMode = flag.String("input", "", "Input mode: carfile, pds, relay, or jetstream (required)") 19 + carFile = flag.String("car", "", "Path to CAR file to read (for carfile mode)") 20 + pdsService = flag.String("pds", "", "PDS service URL (for pds mode)") 21 + relayService = flag.String("relay", "", "Relay service URL (for relay mode)") 22 + jetService = flag.String("jetstream", "", "Jetstream service URL (for jetstream mode)") 23 + 24 + // Common flags 25 + did = flag.String("did", "", "DID to fetch (required for carfile/pds modes)") 19 26 outputMode = flag.String("output", "stats", "Output mode: stats, passthrough, tarfiles, or duckdb") 20 27 outputDir = flag.String("output-dir", "./output", "Output directory for tarfiles mode") 21 28 dbPath = flag.String("db", "./butterfly.db", "Path to DuckDB database file") ··· 23 30 ) 24 31 flag.Parse() 25 32 26 - if *help || *carFile == "" || *did == "" { 27 - fmt.Fprintf(os.Stderr, "Usage: butterfly -car <path> -did <did> [-output stats|passthrough|tarfiles|duckdb] [-output-dir <dir>] [-db <path>]\n") 33 + if *help || *inputMode == "" { 34 + fmt.Fprintf(os.Stderr, "Usage: butterfly -input <mode> [options]\n\n") 35 + fmt.Fprintf(os.Stderr, "Input modes:\n") 36 + fmt.Fprintf(os.Stderr, " carfile Read from a CAR file (-car <path> -did <did>)\n") 37 + fmt.Fprintf(os.Stderr, " pds Fetch from a PDS (-pds <url> -did <did>)\n") 38 + fmt.Fprintf(os.Stderr, " relay Subscribe to a relay (-relay <url>)\n") 39 + fmt.Fprintf(os.Stderr, " jetstream Subscribe to Jetstream (-jetstream <url>)\n\n") 28 40 flag.PrintDefaults() 29 41 os.Exit(1) 30 42 } ··· 32 44 // Set up logger 33 45 logger := log.New(os.Stderr, "butterfly: ", log.LstdFlags) 34 46 35 - // Create remote 36 - r := &remote.CarfileRemote{Filepath: *carFile} 47 + // Create remote based on input mode 48 + var r remote.Remote 49 + switch *inputMode { 50 + case "carfile": 51 + if *carFile == "" || *did == "" { 52 + logger.Fatalf("carfile mode requires -car and -did flags") 53 + } 54 + r = &remote.CarfileRemote{Filepath: *carFile} 55 + case "pds": 56 + if *pdsService == "" || *did == "" { 57 + logger.Fatalf("pds mode requires -pds and -did flags") 58 + } 59 + r = &remote.PdsRemote{Service: *pdsService} 60 + case "relay": 61 + if *relayService == "" { 62 + logger.Fatalf("relay mode requires -relay flag") 63 + } 64 + r = &remote.RelayRemote{Service: *relayService} 65 + case "jetstream": 66 + if *jetService == "" { 67 + logger.Fatalf("jetstream mode requires -jetstream flag") 68 + } 69 + r = &remote.JetstreamRemote{Service: *jetService} 70 + default: 71 + logger.Fatalf("unknown input mode: %s", *inputMode) 72 + } 37 73 38 74 // Create store based on output mode 39 75 var s store.Store ··· 63 99 } 64 100 }() 65 101 66 - // Fetch repository 67 - stream, err := r.FetchRepo(ctx, remote.FetchRepoParams{Did: *did}) 68 - if err != nil { 69 - logger.Fatalf("failed to fetch repo: %v", err) 70 - } 71 - defer stream.Close() 102 + // Handle different input modes 103 + switch *inputMode { 104 + case "carfile", "pds": 105 + // These modes fetch a specific repository 106 + stream, err := r.FetchRepo(ctx, remote.FetchRepoParams{Did: *did}) 107 + if err != nil { 108 + logger.Fatalf("failed to fetch repo: %v", err) 109 + } 110 + defer stream.Close() 72 111 73 - // Process the stream 74 - if err := s.BackfillRepo(ctx, *did, stream); err != nil { 75 - logger.Fatalf("failed to process stream: %v", err) 112 + // Process the stream 113 + if err := s.BackfillRepo(ctx, *did, stream); err != nil { 114 + logger.Fatalf("failed to process stream: %v", err) 115 + } 116 + 117 + case "relay", "jetstream": 118 + // These modes subscribe to record streams 119 + params := remote.SubscribeRecordsParams{} 120 + if *did != "" { 121 + params.Dids = []string{*did} 122 + } 123 + 124 + stream, err := r.SubscribeRecords(ctx, params) 125 + if err != nil { 126 + logger.Fatalf("failed to subscribe to records: %v", err) 127 + } 128 + defer stream.Close() 129 + 130 + // Process the stream continuously 131 + if err := s.ActiveSync(ctx, stream); err != nil { 132 + logger.Fatalf("failed to process stream: %v", err) 133 + } 76 134 } 77 135 }
+5 -4
cmd/butterfly/remote/jetstream.go
··· 4 4 package remote 5 5 6 6 import ( 7 + "context" 7 8 "fmt" 8 9 ) 9 10 10 11 type JetstreamRemote struct { 11 - service string 12 + Service string 12 13 } 13 14 14 15 // ListRepos not supported by jetstream 15 - func (self JetstreamRemote) ListRepos(params ListReposParams) (*ListReposResult, error) { 16 + func (self JetstreamRemote) ListRepos(ctx context.Context, params ListReposParams) (*ListReposResult, error) { 16 17 return nil, fmt.Errorf("list repos: %w", ErrNotSupported) 17 18 } 18 19 19 20 // FetchRepo not supported by jetstream 20 - func (self JetstreamRemote) FetchRepo(params FetchRepoParams) (*RemoteStream, error) { 21 + func (self JetstreamRemote) FetchRepo(ctx context.Context, params FetchRepoParams) (*RemoteStream, error) { 21 22 return nil, fmt.Errorf("fetch repo: %w", ErrNotSupported) 22 23 } 23 24 24 - func (self JetstreamRemote) SubscribeRecords(params SubscribeRecordsParams) (*RemoteStream, error) { 25 + func (self JetstreamRemote) SubscribeRecords(ctx context.Context, params SubscribeRecordsParams) (*RemoteStream, error) { 25 26 return nil, fmt.Errorf("Not yet implemented") 26 27 }
+148 -11
cmd/butterfly/remote/pds.go
··· 1 - /* 2 - PDS remote interface 3 - */ 1 + // Package remote provides a PDS implementation of the Remote interface 4 2 package remote 5 3 6 - import "fmt" 4 + import ( 5 + "context" 6 + "fmt" 7 + "net/http" 8 + "time" 7 9 10 + "github.com/bluesky-social/indigo/atproto/data" 11 + "github.com/bluesky-social/indigo/atproto/repo" 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 + "github.com/ipfs/go-cid" 14 + ) 15 + 16 + // PdsRemote implements the Remote interface for reading from CAR files 8 17 type PdsRemote struct { 9 - service string 18 + Service string 10 19 } 11 20 12 - func (self PdsRemote) ListRepos(params ListReposParams) (*ListReposResult, error) { 13 - return nil, fmt.Errorf("Not yet implemented") 21 + // ListRepos returns the DIDs hosted by the PDS 22 + func (p *PdsRemote) ListRepos(ctx context.Context, params ListReposParams) (*ListReposResult, error) { 23 + // TODO 24 + return nil, fmt.Errorf("list repos: %w", ErrNotImplemented) 14 25 } 15 26 16 - func (self PdsRemote) FetchRepo(params FetchRepoParams) (*RemoteStream, error) { 17 - return nil, fmt.Errorf("Not yet implemented") 27 + // FetchRepo streams the contents of a repository from the PDS 28 + func (p *PdsRemote) FetchRepo(ctx context.Context, params FetchRepoParams) (*RemoteStream, error) { 29 + // Clone repo 30 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", p.Service, params.Did) 31 + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 32 + if err != nil { 33 + return nil, fmt.Errorf("failed to create request: %w", err) 34 + } 35 + 36 + req.Header.Set("Accept", "application/vnd.ipld.car") 37 + req.Header.Set("User-Agent", "bsky-butterfly/0.0.1") 38 + 39 + // TODO 40 + // if s.magicHeaderKey != "" && s.magicHeaderVal != "" { 41 + // req.Header.Set(s.magicHeaderKey, s.magicHeaderVal) 42 + // } 43 + 44 + client := &http.Client{ 45 + Timeout: 180 * time.Second, 46 + } 47 + resp, err := client.Do(req) 48 + if err != nil { 49 + return nil, fmt.Errorf("failed to get repo: %w", err) 50 + } 51 + defer resp.Body.Close() 52 + 53 + if resp.StatusCode != http.StatusOK { 54 + return nil, fmt.Errorf("failed to get repo: %s", resp.Status) 55 + } 56 + 57 + commit, r, err := repo.LoadRepoFromCAR(ctx, resp.Body) 58 + if err != nil { 59 + return nil, fmt.Errorf("Failed to read repo from CAR: %w", err) 60 + } 61 + 62 + // Create stream with cancellable context 63 + streamCtx, cancel := context.WithCancel(ctx) 64 + stream := &RemoteStream{ 65 + Ch: make(chan StreamEvent, 100), // Buffer for better performance 66 + cancel: cancel, 67 + } 68 + 69 + // Stream repository contents 70 + go func() { 71 + defer close(stream.Ch) 72 + defer cancel() 73 + 74 + // Send records from the repository 75 + err := r.MST.Walk(func(k []byte, v cid.Cid) error { 76 + // Check for cancellation 77 + select { 78 + case <-streamCtx.Done(): 79 + return streamCtx.Err() 80 + default: 81 + } 82 + 83 + col, rkey, err := syntax.ParseRepoPath(string(k)) 84 + if err != nil { 85 + return fmt.Errorf("invalid repo path %q: %w", string(k), err) 86 + } 87 + 88 + // Skip if collections filter is specified 89 + if len(params.Collections) > 0 { 90 + found := false 91 + for _, c := range params.Collections { 92 + if c == col.String() { 93 + found = true 94 + break 95 + } 96 + } 97 + if !found { 98 + return nil 99 + } 100 + } 101 + 102 + recBytes, _, err := r.GetRecordBytes(streamCtx, col, rkey) 103 + if err != nil { 104 + return fmt.Errorf("failed to get record %s/%s: %w", col, rkey, err) 105 + } 106 + 107 + rec, err := data.UnmarshalCBOR(recBytes) 108 + if err != nil { 109 + return fmt.Errorf("failed to unmarshal record %s/%s: %w", col, rkey, err) 110 + } 111 + 112 + event := StreamEvent{ 113 + Did: params.Did, 114 + Timestamp: time.Now(), // TODO - CAR files don't have timestamps? 115 + Kind: EventKindCommit, 116 + Commit: &StreamEventCommit{ 117 + Rev: commit.Rev, // TODO - is this accurate? 118 + Operation: OpCreate, 119 + Collection: col.String(), 120 + Rkey: rkey.String(), 121 + Record: rec, 122 + Cid: v.String(), 123 + }, 124 + } 125 + 126 + select { 127 + case stream.Ch <- event: 128 + case <-streamCtx.Done(): 129 + return streamCtx.Err() 130 + } 131 + 132 + return nil 133 + }) 134 + 135 + if err != nil { 136 + // Send error event 137 + select { 138 + case stream.Ch <- StreamEvent{ 139 + Did: params.Did, 140 + Timestamp: time.Now(), 141 + Kind: EventKindError, 142 + Error: &StreamEventError{ 143 + Err: err, 144 + Fatal: true, 145 + }, 146 + }: 147 + case <-streamCtx.Done(): 148 + } 149 + } 150 + }() 151 + 152 + return stream, nil 18 153 } 19 154 20 - func (self PdsRemote) SubscribeRecords(params SubscribeRecordsParams) (*RemoteStream, error) { 21 - return nil, fmt.Errorf("Not yet implemented") 155 + // SubscribeRecords subscribes to the record event-stream of the remote 156 + func (p *PdsRemote) SubscribeRecords(ctx context.Context, params SubscribeRecordsParams) (*RemoteStream, error) { 157 + // TODO 158 + return nil, fmt.Errorf("subscribe records: %w", ErrNotImplemented) 22 159 }
+8 -5
cmd/butterfly/remote/relay.go
··· 3 3 */ 4 4 package remote 5 5 6 - import "fmt" 6 + import ( 7 + "context" 8 + "fmt" 9 + ) 7 10 8 11 type RelayRemote struct { 9 - service string 12 + Service string 10 13 } 11 14 12 - func (self RelayRemote) ListRepos(params ListReposParams) (*ListReposResult, error) { 15 + func (self RelayRemote) ListRepos(ctx context.Context, params ListReposParams) (*ListReposResult, error) { 13 16 return nil, fmt.Errorf("Not yet implemented") 14 17 } 15 18 16 - func (self RelayRemote) FetchRepo(params FetchRepoParams) (*RemoteStream, error) { 19 + func (self RelayRemote) FetchRepo(ctx context.Context, params FetchRepoParams) (*RemoteStream, error) { 17 20 return nil, fmt.Errorf("Not yet implemented") 18 21 } 19 22 20 - func (self RelayRemote) SubscribeRecords(params SubscribeRecordsParams) (*RemoteStream, error) { 23 + func (self RelayRemote) SubscribeRecords(ctx context.Context, params SubscribeRecordsParams) (*RemoteStream, error) { 21 24 return nil, fmt.Errorf("Not yet implemented") 22 25 }