this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Move cli to use urfave/cli and break into subcommands; add discover subcommand

Paul Frazee d0384459 c60860df

+426 -122
+234
cmd/butterfly/cmd-discover.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "log" 8 + "os" 9 + 10 + "github.com/bluesky-social/indigo/cmd/butterfly/remote" 11 + "github.com/urfave/cli/v2" 12 + ) 13 + 14 + var discoverCommand = &cli.Command{ 15 + Name: "discover", 16 + Usage: "Discover repositories using ListRepos queries", 17 + Description: `The discover command uses the remote ListRepos method to discover and query 18 + repositories from PDS instances and relays. It supports pagination and filtering 19 + by collection.`, 20 + Flags: []cli.Flag{ 21 + // Input source flags 22 + &cli.StringFlag{ 23 + Name: "input", 24 + Usage: "Input mode: pds or relay", 25 + Required: true, 26 + }, 27 + &cli.StringFlag{ 28 + Name: "pds", 29 + Usage: "PDS service URL (for pds mode)", 30 + }, 31 + &cli.StringFlag{ 32 + Name: "relay", 33 + Usage: "Relay service URL (for relay mode)", 34 + }, 35 + // Query parameters 36 + &cli.StringFlag{ 37 + Name: "collection", 38 + Usage: "Filter by collection (optional)", 39 + }, 40 + &cli.StringFlag{ 41 + Name: "cursor", 42 + Usage: "Pagination cursor (optional)", 43 + }, 44 + &cli.IntFlag{ 45 + Name: "limit", 46 + Value: 100, 47 + Usage: "Maximum number of repositories to return", 48 + }, 49 + // Output format 50 + &cli.StringFlag{ 51 + Name: "format", 52 + Value: "text", 53 + Usage: "Output format: text, json, or csv", 54 + }, 55 + // Identity resolution 56 + &cli.BoolFlag{ 57 + Name: "resolve", 58 + Usage: "Resolve DIDs to handles using identity resolution", 59 + }, 60 + &cli.BoolFlag{ 61 + Name: "cache", 62 + Usage: "Enable identity resolution caching", 63 + Value: true, 64 + }, 65 + }, 66 + Action: runDiscover, 67 + } 68 + 69 + func runDiscover(c *cli.Context) error { 70 + // Set up logger 71 + logger := log.New(os.Stderr, "butterfly discover: ", log.LstdFlags) 72 + 73 + // Get flags 74 + inputMode := c.String("input") 75 + pdsService := c.String("pds") 76 + relayService := c.String("relay") 77 + collection := c.String("collection") 78 + cursor := c.String("cursor") 79 + limit := c.Int("limit") 80 + format := c.String("format") 81 + resolve := c.Bool("resolve") 82 + enableCache := c.Bool("cache") 83 + 84 + // Create remote based on input mode 85 + var r remote.Remote 86 + switch inputMode { 87 + case "pds": 88 + if pdsService == "" { 89 + return fmt.Errorf("pds mode requires -pds flag") 90 + } 91 + r = &remote.PdsRemote{Service: pdsService} 92 + case "relay": 93 + if relayService == "" { 94 + return fmt.Errorf("relay mode requires -relay flag") 95 + } 96 + r = &remote.RelayRemote{Service: relayService} 97 + default: 98 + return fmt.Errorf("unknown input mode for discover: %s (must be pds or relay)", inputMode) 99 + } 100 + 101 + // Create context 102 + ctx := context.Background() 103 + 104 + // Prepare ListRepos parameters 105 + params := remote.ListReposParams{ 106 + Collection: collection, 107 + Cursor: cursor, 108 + Limit: limit, 109 + } 110 + 111 + // Call ListRepos 112 + result, err := r.ListRepos(ctx, params) 113 + if err != nil { 114 + return fmt.Errorf("failed to list repos: %w", err) 115 + } 116 + 117 + // Set up identity resolver if requested 118 + var resolver *IdentityResolver 119 + if resolve { 120 + resolver = NewIdentityResolverWithConfig(IdentityResolverConfig{ 121 + EnableCache: enableCache, 122 + }) 123 + } 124 + 125 + // Format and output results 126 + switch format { 127 + case "text": 128 + return outputDiscoverText(result, resolver, logger) 129 + case "json": 130 + return outputDiscoverJSON(result, resolver) 131 + case "csv": 132 + return outputDiscoverCSV(result, resolver) 133 + default: 134 + return fmt.Errorf("unknown output format: %s", format) 135 + } 136 + } 137 + 138 + func outputDiscoverText(result *remote.ListReposResult, resolver *IdentityResolver, logger *log.Logger) error { 139 + fmt.Printf("Found %d repositories\n\n", len(result.Dids)) 140 + 141 + for i, did := range result.Dids { 142 + fmt.Printf("%d. %s", i+1, did) 143 + 144 + // Resolve to handle if requested 145 + if resolver != nil { 146 + ctx := context.Background() 147 + ident, err := resolver.ResolveDID(ctx, did) 148 + if err != nil { 149 + logger.Printf("Failed to resolve %s: %v", did, err) 150 + fmt.Println() 151 + } else { 152 + fmt.Printf(" (%s)", ident.Handle) 153 + if pds, err := resolver.GetPDSEndpoint(ident); err == nil { 154 + fmt.Printf(" - PDS: %s", pds) 155 + } 156 + fmt.Println() 157 + } 158 + } else { 159 + fmt.Println() 160 + } 161 + } 162 + 163 + if result.Cursor != "" { 164 + fmt.Printf("\nNext cursor: %s\n", result.Cursor) 165 + } 166 + 167 + return nil 168 + } 169 + 170 + func outputDiscoverJSON(result *remote.ListReposResult, resolver *IdentityResolver) error { 171 + output := map[string]interface{}{ 172 + "count": len(result.Dids), 173 + "cursor": result.Cursor, 174 + } 175 + 176 + if resolver != nil { 177 + // Resolve DIDs and include additional info 178 + repos := make([]map[string]interface{}, 0, len(result.Dids)) 179 + ctx := context.Background() 180 + 181 + for _, did := range result.Dids { 182 + repo := map[string]interface{}{ 183 + "did": did, 184 + } 185 + 186 + if ident, err := resolver.ResolveDID(ctx, did); err == nil { 187 + repo["handle"] = ident.Handle.String() 188 + if pds, err := resolver.GetPDSEndpoint(ident); err == nil { 189 + repo["pds"] = pds 190 + } 191 + } 192 + 193 + repos = append(repos, repo) 194 + } 195 + output["repos"] = repos 196 + } else { 197 + output["dids"] = result.Dids 198 + } 199 + 200 + // Pretty print JSON 201 + encoder := json.NewEncoder(os.Stdout) 202 + encoder.SetIndent("", " ") 203 + return encoder.Encode(output) 204 + } 205 + 206 + func outputDiscoverCSV(result *remote.ListReposResult, resolver *IdentityResolver) error { 207 + // Print CSV header 208 + if resolver != nil { 209 + fmt.Println("did,handle,pds") 210 + } else { 211 + fmt.Println("did") 212 + } 213 + 214 + // Print rows 215 + ctx := context.Background() 216 + for _, did := range result.Dids { 217 + if resolver != nil { 218 + ident, err := resolver.ResolveDID(ctx, did) 219 + if err != nil { 220 + fmt.Printf("%s,,\n", did) 221 + } else { 222 + pds := "" 223 + if endpoint, err := resolver.GetPDSEndpoint(ident); err == nil { 224 + pds = endpoint 225 + } 226 + fmt.Printf("%s,%s,%s\n", did, ident.Handle, pds) 227 + } 228 + } else { 229 + fmt.Println(did) 230 + } 231 + } 232 + 233 + return nil 234 + }
+172
cmd/butterfly/cmd-sync.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log" 7 + "os" 8 + 9 + "github.com/bluesky-social/indigo/cmd/butterfly/remote" 10 + "github.com/bluesky-social/indigo/cmd/butterfly/store" 11 + "github.com/urfave/cli/v2" 12 + ) 13 + 14 + var syncCommand = &cli.Command{ 15 + Name: "sync", 16 + Usage: "Sync repositories from various sources", 17 + Description: `The sync command fetches and syncs repository data from various sources including 18 + CAR files, PDS instances, relays, and Jetstream. It supports different output modes 19 + for storing or processing the synced data.`, 20 + Flags: []cli.Flag{ 21 + // Input source flags 22 + &cli.StringFlag{ 23 + Name: "input", 24 + Usage: "Input mode: carfile, pds, relay, or jetstream", 25 + Required: true, 26 + }, 27 + &cli.StringFlag{ 28 + Name: "car", 29 + Usage: "Path to CAR file to read (for carfile mode)", 30 + }, 31 + &cli.StringFlag{ 32 + Name: "pds", 33 + Usage: "PDS service URL (for pds mode)", 34 + }, 35 + &cli.StringFlag{ 36 + Name: "relay", 37 + Usage: "Relay service URL (for relay mode)", 38 + }, 39 + &cli.StringFlag{ 40 + Name: "jetstream", 41 + Usage: "Jetstream service URL (for jetstream mode)", 42 + }, 43 + // Common flags 44 + &cli.StringFlag{ 45 + Name: "did", 46 + Usage: "DID to fetch (required for carfile/pds modes)", 47 + }, 48 + &cli.StringFlag{ 49 + Name: "output", 50 + Value: "stats", 51 + Usage: "Output mode: stats, passthrough, tarfiles, or duckdb", 52 + }, 53 + &cli.StringFlag{ 54 + Name: "output-dir", 55 + Value: "./output", 56 + Usage: "Output directory for tarfiles mode", 57 + }, 58 + &cli.StringFlag{ 59 + Name: "db", 60 + Value: "./butterfly.db", 61 + Usage: "Path to DuckDB database file", 62 + }, 63 + }, 64 + Action: runSync, 65 + } 66 + 67 + func runSync(c *cli.Context) error { 68 + // Set up logger 69 + logger := log.New(os.Stderr, "butterfly sync: ", log.LstdFlags) 70 + 71 + // Get flags 72 + inputMode := c.String("input") 73 + carFile := c.String("car") 74 + pdsService := c.String("pds") 75 + relayService := c.String("relay") 76 + jetService := c.String("jetstream") 77 + did := c.String("did") 78 + outputMode := c.String("output") 79 + outputDir := c.String("output-dir") 80 + dbPath := c.String("db") 81 + 82 + // Create remote based on input mode 83 + var r remote.Remote 84 + switch inputMode { 85 + case "carfile": 86 + if carFile == "" || did == "" { 87 + return fmt.Errorf("carfile mode requires -car and -did flags") 88 + } 89 + r = &remote.CarfileRemote{Filepath: carFile} 90 + case "pds": 91 + if pdsService == "" || did == "" { 92 + return fmt.Errorf("pds mode requires -pds and -did flags") 93 + } 94 + r = &remote.PdsRemote{Service: pdsService} 95 + case "relay": 96 + if relayService == "" { 97 + return fmt.Errorf("relay mode requires -relay flag") 98 + } 99 + r = &remote.RelayRemote{Service: relayService} 100 + case "jetstream": 101 + if jetService == "" { 102 + return fmt.Errorf("jetstream mode requires -jetstream flag") 103 + } 104 + r = &remote.JetstreamRemote{Service: jetService} 105 + default: 106 + return fmt.Errorf("unknown input mode: %s", inputMode) 107 + } 108 + 109 + // Create store based on output mode 110 + var s store.Store 111 + switch outputMode { 112 + case "passthrough": 113 + s = &store.StdoutStore{Mode: store.StdoutStoreModePassthrough} 114 + case "stats": 115 + s = &store.StdoutStore{Mode: store.StdoutStoreModeStats} 116 + case "tarfiles": 117 + s = store.NewTarfilesStore(outputDir) 118 + case "duckdb": 119 + s = store.NewDuckdbStore(dbPath) 120 + default: 121 + return fmt.Errorf("unknown output mode: %s", outputMode) 122 + } 123 + 124 + // Create context 125 + ctx := context.Background() 126 + 127 + // Initialize store 128 + if err := s.Setup(ctx); err != nil { 129 + return fmt.Errorf("failed to setup store: %w", err) 130 + } 131 + defer func() { 132 + if err := s.Close(); err != nil { 133 + logger.Printf("failed to close store: %v", err) 134 + } 135 + }() 136 + 137 + // Handle different input modes 138 + switch inputMode { 139 + case "carfile", "pds": 140 + // These modes fetch a specific repository 141 + stream, err := r.FetchRepo(ctx, remote.FetchRepoParams{Did: did}) 142 + if err != nil { 143 + return fmt.Errorf("failed to fetch repo: %w", err) 144 + } 145 + defer stream.Close() 146 + 147 + // Process the stream 148 + if err := s.BackfillRepo(ctx, did, stream); err != nil { 149 + return fmt.Errorf("failed to process stream: %w", err) 150 + } 151 + 152 + case "relay", "jetstream": 153 + // These modes subscribe to record streams 154 + params := remote.SubscribeRecordsParams{} 155 + if did != "" { 156 + params.Dids = []string{did} 157 + } 158 + 159 + stream, err := r.SubscribeRecords(ctx, params) 160 + if err != nil { 161 + return fmt.Errorf("failed to subscribe to records: %w", err) 162 + } 163 + defer stream.Close() 164 + 165 + // Process the stream continuously 166 + if err := s.ActiveSync(ctx, stream); err != nil { 167 + return fmt.Errorf("failed to process stream: %w", err) 168 + } 169 + } 170 + 171 + return nil 172 + }
+20 -122
cmd/butterfly/main.go
··· 1 1 package main 2 2 3 3 import ( 4 - "context" 5 - "flag" 6 - "fmt" 7 4 "log" 8 5 "os" 9 6 10 - "github.com/bluesky-social/indigo/cmd/butterfly/remote" 11 - "github.com/bluesky-social/indigo/cmd/butterfly/store" 7 + "github.com/urfave/cli/v2" 12 8 ) 13 9 14 10 func main() { 15 - // Command line flags 16 - var ( 17 - // Input source flags 18 - inputMode = flag.String("input", "", "Input mode: carfile, pds, relay, or jetstream (required)") 19 - carFile = flag.String("car", "", "Path to CAR file to read (for carfile mode)") 20 - pdsService = flag.String("pds", "", "PDS service URL (for pds mode)") 21 - relayService = flag.String("relay", "", "Relay service URL (for relay mode)") 22 - jetService = flag.String("jetstream", "", "Jetstream service URL (for jetstream mode)") 23 - 24 - // Common flags 25 - did = flag.String("did", "", "DID to fetch (required for carfile/pds modes)") 26 - outputMode = flag.String("output", "stats", "Output mode: stats, passthrough, tarfiles, or duckdb") 27 - outputDir = flag.String("output-dir", "./output", "Output directory for tarfiles mode") 28 - dbPath = flag.String("db", "./butterfly.db", "Path to DuckDB database file") 29 - help = flag.Bool("help", false, "Show help") 30 - ) 31 - flag.Parse() 32 - 33 - if *help || *inputMode == "" { 34 - fmt.Fprintf(os.Stderr, "Usage: butterfly -input <mode> [options]\n\n") 35 - fmt.Fprintf(os.Stderr, "Input modes:\n") 36 - fmt.Fprintf(os.Stderr, " carfile Read from a CAR file (-car <path> -did <did>)\n") 37 - fmt.Fprintf(os.Stderr, " pds Fetch from a PDS (-pds <url> -did <did>)\n") 38 - fmt.Fprintf(os.Stderr, " relay Subscribe to a relay (-relay <url>)\n") 39 - fmt.Fprintf(os.Stderr, " jetstream Subscribe to Jetstream (-jetstream <url>)\n\n") 40 - flag.PrintDefaults() 41 - os.Exit(1) 42 - } 43 - 44 - // Set up logger 45 - logger := log.New(os.Stderr, "butterfly: ", log.LstdFlags) 46 - 47 - // Create remote based on input mode 48 - var r remote.Remote 49 - switch *inputMode { 50 - case "carfile": 51 - if *carFile == "" || *did == "" { 52 - logger.Fatalf("carfile mode requires -car and -did flags") 53 - } 54 - r = &remote.CarfileRemote{Filepath: *carFile} 55 - case "pds": 56 - if *pdsService == "" || *did == "" { 57 - logger.Fatalf("pds mode requires -pds and -did flags") 58 - } 59 - r = &remote.PdsRemote{Service: *pdsService} 60 - case "relay": 61 - if *relayService == "" { 62 - logger.Fatalf("relay mode requires -relay flag") 63 - } 64 - r = &remote.RelayRemote{Service: *relayService} 65 - case "jetstream": 66 - if *jetService == "" { 67 - logger.Fatalf("jetstream mode requires -jetstream flag") 68 - } 69 - r = &remote.JetstreamRemote{Service: *jetService} 70 - default: 71 - logger.Fatalf("unknown input mode: %s", *inputMode) 72 - } 73 - 74 - // Create store based on output mode 75 - var s store.Store 76 - switch *outputMode { 77 - case "passthrough": 78 - s = &store.StdoutStore{Mode: store.StdoutStoreModePassthrough} 79 - case "stats": 80 - s = &store.StdoutStore{Mode: store.StdoutStoreModeStats} 81 - case "tarfiles": 82 - s = store.NewTarfilesStore(*outputDir) 83 - case "duckdb": 84 - s = store.NewDuckdbStore(*dbPath) 85 - default: 86 - logger.Fatalf("unknown output mode: %s", *outputMode) 11 + app := &cli.App{ 12 + Name: "butterfly", 13 + Usage: "AT Protocol data sync and discovery tool", 14 + Description: `Butterfly is a flexible tool for syncing and discovering AT Protocol data from various sources. 15 + It supports fetching data from CAR files, PDS instances, relays, and Jetstream, and can output 16 + to different storage backends.`, 17 + Commands: []*cli.Command{ 18 + syncCommand, 19 + discoverCommand, 20 + }, 21 + Version: "0.0.1", 22 + Authors: []*cli.Author{ 23 + { 24 + Name: "Bluesky", 25 + Email: "support@bsky.social", 26 + }, 27 + }, 87 28 } 88 29 89 - // Create context 90 - ctx := context.Background() 91 - 92 - // Initialize store 93 - if err := s.Setup(ctx); err != nil { 94 - logger.Fatalf("failed to setup store: %v", err) 95 - } 96 - defer func() { 97 - if err := s.Close(); err != nil { 98 - logger.Printf("failed to close store: %v", err) 99 - } 100 - }() 101 - 102 - // Handle different input modes 103 - switch *inputMode { 104 - case "carfile", "pds": 105 - // These modes fetch a specific repository 106 - stream, err := r.FetchRepo(ctx, remote.FetchRepoParams{Did: *did}) 107 - if err != nil { 108 - logger.Fatalf("failed to fetch repo: %v", err) 109 - } 110 - defer stream.Close() 111 - 112 - // Process the stream 113 - if err := s.BackfillRepo(ctx, *did, stream); err != nil { 114 - logger.Fatalf("failed to process stream: %v", err) 115 - } 116 - 117 - case "relay", "jetstream": 118 - // These modes subscribe to record streams 119 - params := remote.SubscribeRecordsParams{} 120 - if *did != "" { 121 - params.Dids = []string{*did} 122 - } 123 - 124 - stream, err := r.SubscribeRecords(ctx, params) 125 - if err != nil { 126 - logger.Fatalf("failed to subscribe to records: %v", err) 127 - } 128 - defer stream.Close() 129 - 130 - // Process the stream continuously 131 - if err := s.ActiveSync(ctx, stream); err != nil { 132 - logger.Fatalf("failed to process stream: %v", err) 133 - } 30 + if err := app.Run(os.Args); err != nil { 31 + log.Fatal(err) 134 32 } 135 33 }