this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

update collectiondir to cli/v3

+69 -69
+33 -33
cmd/collectiondir/collectiondir.go
··· 4 4 "bufio" 5 5 "bytes" 6 6 "compress/gzip" 7 + "context" 7 8 "encoding/csv" 8 9 "encoding/json" 9 10 "errors" ··· 18 19 "time" 19 20 20 21 "github.com/earthboundkid/versioninfo/v2" 21 - "github.com/urfave/cli/v2" 22 + "github.com/urfave/cli/v3" 22 23 ) 23 24 24 25 func main() { 25 - app := cli.App{ 26 + app := cli.Command{ 26 27 Name: "collectiondir", 27 28 Usage: "collection directory service", 28 29 Version: versioninfo.Short(), ··· 40 41 adminCrawlCmd, 41 42 }, 42 43 } 43 - err := app.Run(os.Args) 44 - if err != nil { 44 + if err := app.Run(context.Background(), os.Args); err != nil { 45 45 fmt.Fprintf(os.Stderr, "%s\n", err.Error()) 46 46 os.Exit(1) 47 47 } ··· 57 57 Required: true, 58 58 }, 59 59 }, 60 - Action: func(cctx *cli.Context) error { 60 + Action: func(ctx context.Context, cmd *cli.Command) error { 61 61 logLevel := slog.LevelInfo 62 - if cctx.Bool("verbose") { 62 + if cmd.Bool("verbose") { 63 63 logLevel = slog.LevelDebug 64 64 } 65 65 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) 66 66 slog.SetDefault(log) 67 - pebblePath := cctx.String("pebble") 67 + pebblePath := cmd.String("pebble") 68 68 var db PebbleCollectionDirectory 69 69 db.log = log 70 70 err := db.Open(pebblePath) ··· 94 94 Required: true, 95 95 }, 96 96 }, 97 - Action: func(cctx *cli.Context) error { 97 + Action: func(ctx context.Context, cmd *cli.Command) error { 98 98 logLevel := slog.LevelInfo 99 - if cctx.Bool("verbose") { 99 + if cmd.Bool("verbose") { 100 100 logLevel = slog.LevelDebug 101 101 } 102 102 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) 103 103 slog.SetDefault(log) 104 - pebblePath := cctx.String("pebble") 104 + pebblePath := cmd.String("pebble") 105 105 var db PebbleCollectionDirectory 106 106 db.log = log 107 107 err := db.Open(pebblePath) ··· 112 112 113 113 rows := make(chan CollectionDidTime, 100) 114 114 go func() { 115 - err := db.ReadAllPrimary(cctx.Context, rows) 115 + err := db.ReadAllPrimary(ctx, rows) 116 116 if err != nil { 117 117 log.Error("db read", "path", pebblePath, "err", err) 118 118 } ··· 153 153 Required: true, 154 154 }, 155 155 }, 156 - Action: func(cctx *cli.Context) error { 156 + Action: func(ctx context.Context, cmd *cli.Command) error { 157 157 logLevel := slog.LevelInfo 158 - if cctx.Bool("verbose") { 158 + if cmd.Bool("verbose") { 159 159 logLevel = slog.LevelDebug 160 160 } 161 161 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) 162 162 slog.SetDefault(log) 163 - pebblePath := cctx.String("pebble") 163 + pebblePath := cmd.String("pebble") 164 164 var db PebbleCollectionDirectory 165 165 db.log = log 166 166 err := db.Open(pebblePath) ··· 168 168 return err 169 169 } 170 170 defer db.Close() 171 - csvPath := cctx.String("csv") 171 + csvPath := cmd.String("csv") 172 172 var fin io.Reader 173 173 if csvPath == "-" { 174 174 fin = os.Stdin ··· 231 231 Name: "url", 232 232 Usage: "host:port of collectiondir server", 233 233 Required: true, 234 - EnvVars: []string{"COLLECTIONDIR_URL"}, 234 + Sources: cli.EnvVars("COLLECTIONDIR_URL"), 235 235 }, 236 236 &cli.StringFlag{ 237 237 Name: "auth", 238 238 Usage: "Auth token for admin api", 239 - EnvVars: []string{"ADMIN_AUTH"}, 239 + Sources: cli.EnvVars("ADMIN_AUTH"), 240 240 }, 241 241 }, 242 - Action: func(cctx *cli.Context) error { 242 + Action: func(ctx context.Context, cmd *cli.Command) error { 243 243 logLevel := slog.LevelInfo 244 - if cctx.Bool("verbose") { 244 + if cmd.Bool("verbose") { 245 245 logLevel = slog.LevelDebug 246 246 } 247 247 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) ··· 249 249 250 250 var hostList []string 251 251 252 - serverUrl, err := url.Parse(cctx.String("url")) 252 + serverUrl, err := url.Parse(cmd.String("url")) 253 253 if err != nil { 254 254 var e2 error 255 255 // try to fixup a bare host:port which can confuse url.Parse 256 - serverUrl, e2 = url.Parse("http://" + cctx.String("url")) 256 + serverUrl, e2 = url.Parse("http://" + cmd.String("url")) 257 257 if e2 != nil { 258 258 return fmt.Errorf("could not parse url, %w", err) 259 259 } 260 260 } 261 261 requestCrawlUrl := serverUrl.JoinPath("/admin/pds/requestCrawl") 262 262 263 - if cctx.IsSet("list") { 264 - fin, err := os.Open(cctx.String("list")) 263 + if cmd.IsSet("list") { 264 + fin, err := os.Open(cmd.String("list")) 265 265 if err != nil { 266 - return fmt.Errorf("%s: could not open, %w", cctx.String("list"), err) 266 + return fmt.Errorf("%s: could not open, %w", cmd.String("list"), err) 267 267 } 268 268 defer fin.Close() 269 269 bufin := bufio.NewScanner(fin) ··· 272 272 } 273 273 err = bufin.Err() 274 274 if err != nil { 275 - return fmt.Errorf("%s: error reading, %w", cctx.String("list"), err) 275 + return fmt.Errorf("%s: error reading, %w", cmd.String("list"), err) 276 276 } 277 - } else if cctx.IsSet("csv") { 278 - fin, err := os.Open(cctx.String("csv")) 277 + } else if cmd.IsSet("csv") { 278 + fin, err := os.Open(cmd.String("csv")) 279 279 if err != nil { 280 - return fmt.Errorf("%s: could not open, %w", cctx.String("csv"), err) 280 + return fmt.Errorf("%s: could not open, %w", cmd.String("csv"), err) 281 281 } 282 282 defer fin.Close() 283 283 data, err := csv.NewReader(fin).ReadAll() 284 284 if err != nil { 285 - return fmt.Errorf("%s: could not read, %w", cctx.String("csv"), err) 285 + return fmt.Errorf("%s: could not read, %w", cmd.String("csv"), err) 286 286 } 287 287 if len(data) < 2 { 288 - return fmt.Errorf("%s: empty CSV file", cctx.String("csv")) 288 + return fmt.Errorf("%s: empty CSV file", cmd.String("csv")) 289 289 } 290 290 headerRow := data[0] 291 291 hostCol := -1 ··· 297 297 } 298 298 } 299 299 if hostCol < 0 { 300 - return fmt.Errorf("%s: header missing 'host' or 'hostname'", cctx.String("csv")) 300 + return fmt.Errorf("%s: header missing 'host' or 'hostname'", cmd.String("csv")) 301 301 } 302 302 for _, row := range data[1:] { 303 303 hostList = append(hostList, row[hostCol]) ··· 310 310 311 311 client := http.Client{Timeout: 1 * time.Second} 312 312 var headers http.Header = make(http.Header) 313 - if cctx.IsSet("auth") { 314 - headers.Add("Authorization", "Bearer "+cctx.String("auth")) 313 + if cmd.IsSet("auth") { 314 + headers.Add("Authorization", "Bearer "+cmd.String("auth")) 315 315 } 316 316 headers.Add("Content-Type", "application/json") 317 317 var response *http.Response
+8 -8
cmd/collectiondir/crawl.go
··· 15 15 "github.com/bluesky-social/indigo/util" 16 16 "github.com/bluesky-social/indigo/xrpc" 17 17 18 - "github.com/urfave/cli/v2" 18 + "github.com/urfave/cli/v3" 19 19 "golang.org/x/time/rate" 20 20 ) 21 21 ··· 55 55 &cli.StringFlag{ 56 56 Name: "ratelimit-header", 57 57 Usage: "secret for friend PDSes", 58 - EnvVars: []string{"BSKY_SOCIAL_RATE_LIMIT_SKIP", "RATE_LIMIT_HEADER"}, 58 + Sources: cli.EnvVars("BSKY_SOCIAL_RATE_LIMIT_SKIP", "RATE_LIMIT_HEADER"), 59 59 }, 60 60 }, 61 - Action: func(cctx *cli.Context) error { 61 + Action: func(ctx context.Context, cmd *cli.Command) error { 62 62 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})) 63 63 ctx, cancel := context.WithCancel(context.Background()) 64 64 defer cancel() 65 - hostname := cctx.String("host") 65 + hostname := cmd.String("host") 66 66 hosturl, err := url.Parse(hostname) 67 67 if err != nil { 68 68 hosturl = new(url.URL) ··· 73 73 Host: hosturl.String(), 74 74 Client: util.RobustHTTPClient(), 75 75 } 76 - if cctx.IsSet("ratelimit-header") { 76 + if cmd.IsSet("ratelimit-header") { 77 77 rpcClient.Headers = map[string]string{ 78 - "x-ratelimit-bypass": cctx.String("ratelimit-header"), 78 + "x-ratelimit-bypass": cmd.String("ratelimit-header"), 79 79 } 80 80 } 81 81 log.Info("will crawl", "url", rpcClient.Host) 82 - csvOutPath := cctx.String("csv-out") 82 + csvOutPath := cmd.String("csv-out") 83 83 var fout io.Writer = os.Stdout 84 84 if csvOutPath != "" { 85 85 if csvOutPath == "-" { ··· 91 91 } 92 92 } 93 93 } 94 - qps := cctx.Float64("qps") 94 + qps := cmd.Float64("qps") 95 95 results := make(chan DidCollection, 100) 96 96 defer close(results) 97 97 go DidCollectionsToCsv(fout, results)
+28 -28
cmd/collectiondir/serve.go
··· 33 33 "github.com/labstack/echo/v4" 34 34 "github.com/labstack/echo/v4/middleware" 35 35 "github.com/prometheus/client_golang/prometheus/promhttp" 36 - "github.com/urfave/cli/v2" 36 + "github.com/urfave/cli/v3" 37 37 ) 38 38 39 39 var serveCmd = &cli.Command{ ··· 42 42 &cli.StringFlag{ 43 43 Name: "api-listen", 44 44 Value: ":2510", 45 - EnvVars: []string{"COLLECTIONS_API_LISTEN"}, 45 + Sources: cli.EnvVars("COLLECTIONS_API_LISTEN"), 46 46 }, 47 47 &cli.StringFlag{ 48 48 Name: "metrics-listen", 49 49 Value: ":2511", 50 - EnvVars: []string{"COLLECTIONS_METRICS_LISTEN"}, 50 + Sources: cli.EnvVars("COLLECTIONS_METRICS_LISTEN"), 51 51 }, 52 52 &cli.StringFlag{ 53 53 Name: "pebble", ··· 62 62 &cli.StringFlag{ 63 63 Name: "upstream", 64 64 Usage: "URL, e.g. wss://bsky.network", 65 - EnvVars: []string{"COLLECTIONS_UPSTREAM"}, 65 + Sources: cli.EnvVars("COLLECTIONS_UPSTREAM"), 66 66 }, 67 67 &cli.StringFlag{ 68 68 Name: "admin-token", 69 69 Usage: "admin authentication", 70 - EnvVars: []string{"COLLECTIONS_ADMIN_TOKEN"}, 70 + Sources: cli.EnvVars("COLLECTIONS_ADMIN_TOKEN"), 71 71 }, 72 72 &cli.Float64Flag{ 73 73 Name: "crawl-qps", ··· 77 77 &cli.StringFlag{ 78 78 Name: "ratelimit-header", 79 79 Usage: "secret for friend PDSes", 80 - EnvVars: []string{"BSKY_SOCIAL_RATE_LIMIT_SKIP", "RATE_LIMIT_HEADER"}, 80 + Sources: cli.EnvVars("BSKY_SOCIAL_RATE_LIMIT_SKIP", "RATE_LIMIT_HEADER"), 81 81 }, 82 82 &cli.Uint64Flag{ 83 83 Name: "clist-min-dids", 84 84 Usage: "filter collection list to >= N dids", 85 85 Value: 5, 86 - EnvVars: []string{"COLLECTIONS_CLIST_MIN_DIDS"}, 86 + Sources: cli.EnvVars("COLLECTIONS_CLIST_MIN_DIDS"), 87 87 }, 88 88 &cli.IntFlag{ 89 89 Name: "max-did-collections", 90 90 Usage: "stop recording new collections per did after it has >= this many collections", 91 91 Value: 1000, 92 - EnvVars: []string{"COLLECTIONS_MAX_DID_COLLECTIONS"}, 92 + Sources: cli.EnvVars("COLLECTIONS_MAX_DID_COLLECTIONS"), 93 93 }, 94 94 &cli.StringFlag{ 95 95 Name: "sets-json-path", 96 96 Usage: "file path of JSON file containing static word sets", 97 - EnvVars: []string{"HEPA_SETS_JSON_PATH", "COLLECTIONS_SETS_JSON_PATH"}, 97 + Sources: cli.EnvVars("HEPA_SETS_JSON_PATH", "COLLECTIONS_SETS_JSON_PATH"), 98 98 }, 99 99 &cli.BoolFlag{ 100 100 Name: "verbose", 101 101 }, 102 102 }, 103 - Action: func(cctx *cli.Context) error { 103 + Action: func(ctx context.Context, cmd *cli.Command) error { 104 104 var server collectionServer 105 - return server.run(cctx) 105 + return server.run(ctx, cmd) 106 106 }, 107 107 } 108 108 ··· 165 165 stats *CrawlStats 166 166 } 167 167 168 - func (cs *collectionServer) run(cctx *cli.Context) error { 168 + func (cs *collectionServer) run(ctx context.Context, cmd *cli.Command) error { 169 169 signals := make(chan os.Signal, 1) 170 170 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 171 171 cs.shutdown = make(chan struct{}) 172 172 level := slog.LevelInfo 173 - if cctx.Bool("verbose") { 173 + if cmd.Bool("verbose") { 174 174 level = slog.LevelDebug 175 175 } 176 176 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level})) 177 177 slog.SetDefault(log) 178 178 179 - if cctx.IsSet("ratelimit-header") { 180 - cs.ratelimitHeader = cctx.String("ratelimit-header") 179 + if cmd.IsSet("ratelimit-header") { 180 + cs.ratelimitHeader = cmd.String("ratelimit-header") 181 181 } 182 - if cctx.IsSet("sets-json-path") { 183 - badwords, err := loadBadwords(cctx.String("sets-json-path")) 182 + if cmd.IsSet("sets-json-path") { 183 + badwords, err := loadBadwords(cmd.String("sets-json-path")) 184 184 if err != nil { 185 185 return err 186 186 } 187 187 cs.badwords = badwords 188 188 } 189 - cs.MinDidsForCollectionList = cctx.Uint64("clist-min-dids") 190 - cs.MaxDidCollections = cctx.Int("max-did-collections") 189 + cs.MinDidsForCollectionList = cmd.Uint64("clist-min-dids") 190 + cs.MaxDidCollections = cmd.Int("max-did-collections") 191 191 cs.ingestFirehose = make(chan DidCollection, 1000) 192 192 cs.ingestCrawl = make(chan DidCollection, 1000) 193 193 var err error ··· 196 196 return fmt.Errorf("lru init, %w", err) 197 197 } 198 198 cs.log = log 199 - cs.ctx = cctx.Context 200 - cs.AdminToken = cctx.String("admin-token") 199 + cs.ctx = ctx 200 + cs.AdminToken = cmd.String("admin-token") 201 201 cs.ExepctedAuthHeader = "Bearer " + cs.AdminToken 202 202 cs.wg.Add(1) 203 203 go cs.ingestReceiver() 204 - pebblePath := cctx.String("pebble") 204 + pebblePath := cmd.String("pebble") 205 205 cs.pcd = &PebbleCollectionDirectory{ 206 206 log: cs.log, 207 207 } ··· 209 209 if err != nil { 210 210 return fmt.Errorf("%s: failed to open pebble db: %w", pebblePath, err) 211 211 } 212 - cs.dauDirectoryDir = cctx.String("dau-directory") 212 + cs.dauDirectoryDir = cmd.String("dau-directory") 213 213 if cs.dauDirectoryDir != "" { 214 214 err := cs.openDau() 215 215 if err != nil { ··· 218 218 } 219 219 cs.statsCacheFresh.L = &cs.statsCacheLock 220 220 221 - apiServerEcho, err := cs.createApiServer(cctx.Context, cctx.String("api-listen")) 221 + apiServerEcho, err := cs.createApiServer(ctx, cmd.String("api-listen")) 222 222 if err != nil { 223 223 return err 224 224 } 225 225 cs.wg.Add(1) 226 - go func() { cs.StartApiServer(cctx.Context, apiServerEcho) }() 226 + go func() { cs.StartApiServer(ctx, apiServerEcho) }() 227 227 228 - cs.createMetricsServer(cctx.String("metrics-listen")) 228 + cs.createMetricsServer(cmd.String("metrics-listen")) 229 229 cs.wg.Add(1) 230 - go func() { cs.StartMetricsServer(cctx.Context) }() 230 + go func() { cs.StartMetricsServer(ctx) }() 231 231 232 - upstream := cctx.String("upstream") 232 + upstream := cmd.String("upstream") 233 233 if upstream != "" { 234 234 fh := Firehose{ 235 235 Log: log,