this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

update relay to cli/v3

+74 -74
+51 -51
cmd/relay/main.go
··· 1 1 package main 2 2 3 3 import ( 4 + "context" 4 5 "crypto/rand" 5 6 "encoding/base64" 6 7 "fmt" ··· 23 24 "github.com/bluesky-social/indigo/util/cliutil" 24 25 25 26 "github.com/earthboundkid/versioninfo/v2" 26 - "github.com/urfave/cli/v2" 27 + "github.com/urfave/cli/v3" 27 28 "gorm.io/plugin/opentelemetry/tracing" 28 29 ) 29 30 ··· 36 37 37 38 func run(args []string) error { 38 39 39 - app := cli.App{ 40 + app := cli.Command{ 40 41 Name: "relay", 41 42 Usage: "atproto relay daemon", 42 43 Version: versioninfo.Short(), ··· 45 46 &cli.StringSliceFlag{ 46 47 Name: "admin-password", 47 48 Usage: "secret password/token for accessing admin endpoints (multiple values allowed)", 48 - EnvVars: []string{"RELAY_ADMIN_PASSWORD", "RELAY_ADMIN_KEY"}, 49 + Sources: cli.EnvVars("RELAY_ADMIN_PASSWORD", "RELAY_ADMIN_KEY"), 49 50 }, 50 51 &cli.StringFlag{ 51 52 Name: "plc-host", 52 53 Usage: "method, hostname, and port of PLC registry", 53 54 Value: "https://plc.directory", 54 - EnvVars: []string{"RELAY_PLC_HOST", "ATP_PLC_HOST"}, 55 + Sources: cli.EnvVars("RELAY_PLC_HOST", "ATP_PLC_HOST"), 55 56 }, 56 57 &cli.StringFlag{ 57 58 Name: "log-level", 58 59 Usage: "log verbosity level (eg: warn, info, debug)", 59 - EnvVars: []string{"RELAY_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"}, 60 + Sources: cli.EnvVars("RELAY_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"), 60 61 }, 61 62 } 62 63 app.Commands = []*cli.Command{ ··· 69 70 Name: "db-url", 70 71 Usage: "database connection string for relay database", 71 72 Value: "sqlite://data/relay/relay.sqlite", 72 - EnvVars: []string{"DATABASE_URL"}, 73 + Sources: cli.EnvVars("DATABASE_URL"), 73 74 }, 74 75 &cli.IntFlag{ 75 76 Name: "max-db-conn", 76 77 Usage: "limit on size of database connection pool", 77 - EnvVars: []string{"MAX_DB_CONNECTIONS", "MAX_METADB_CONNECTIONS"}, 78 + Sources: cli.EnvVars("MAX_DB_CONNECTIONS", "MAX_METADB_CONNECTIONS"), 78 79 Value: 40, 79 80 }, 80 81 &cli.StringFlag{ 81 82 Name: "bind", 82 83 Usage: "IP or address, and port, to listen on for HTTP APIs (including firehose)", 83 84 Value: ":2470", 84 - EnvVars: []string{"RELAY_API_BIND", "RELAY_API_LISTEN"}, 85 + Sources: cli.EnvVars("RELAY_API_BIND", "RELAY_API_LISTEN"), 85 86 }, 86 87 &cli.StringFlag{ 87 88 Name: "persist-dir", 88 89 Usage: "local folder to store firehose playback files", 89 90 Value: "data/relay/persist", 90 - EnvVars: []string{"RELAY_PERSIST_DIR", "RELAY_PERSISTER_DIR"}, 91 + Sources: cli.EnvVars("RELAY_PERSIST_DIR", "RELAY_PERSISTER_DIR"), 91 92 }, 92 93 &cli.DurationFlag{ 93 94 Name: "replay-window", 94 95 Usage: "retention duration for firehose playback", 95 - EnvVars: []string{"RELAY_REPLAY_WINDOW", "RELAY_EVENT_PLAYBACK_TTL"}, 96 + Sources: cli.EnvVars("RELAY_REPLAY_WINDOW", "RELAY_EVENT_PLAYBACK_TTL"), 96 97 Value: 72 * time.Hour, 97 98 }, 98 99 &cli.IntFlag{ 99 100 Name: "host-concurrency", 100 101 Usage: "number of concurrent worker routines per upstream host", 101 - EnvVars: []string{"RELAY_HOST_CONCURRENCY", "RELAY_CONCURRENCY_PER_PDS"}, 102 + Sources: cli.EnvVars("RELAY_HOST_CONCURRENCY", "RELAY_CONCURRENCY_PER_PDS"), 102 103 Value: 40, 103 104 }, 104 105 &cli.IntFlag{ 105 106 Name: "default-account-limit", 106 107 Value: 100, 107 108 Usage: "max number of active accounts for new upstream hosts", 108 - EnvVars: []string{"RELAY_DEFAULT_ACCOUNT_LIMIT", "RELAY_DEFAULT_REPO_LIMIT"}, 109 + Sources: cli.EnvVars("RELAY_DEFAULT_ACCOUNT_LIMIT", "RELAY_DEFAULT_REPO_LIMIT"), 109 110 }, 110 111 &cli.IntFlag{ 111 112 Name: "new-hosts-per-day-limit", 112 113 Value: 50, 113 114 Usage: "max number of new upstream hosts subscribed per day via public requestCrawl", 114 - EnvVars: []string{"RELAY_NEW_HOSTS_PER_DAY_LIMIT"}, 115 + Sources: cli.EnvVars("RELAY_NEW_HOSTS_PER_DAY_LIMIT"), 115 116 }, 116 117 &cli.IntFlag{ 117 118 Name: "ident-cache-size", 118 119 Value: 5_000_000, 119 120 Usage: "size of in-process identity cache (eg, DID docs)", 120 - EnvVars: []string{"RELAY_IDENT_CACHE_SIZE", "RELAY_DID_CACHE_SIZE"}, 121 + Sources: cli.EnvVars("RELAY_IDENT_CACHE_SIZE", "RELAY_DID_CACHE_SIZE"), 121 122 }, 122 123 &cli.BoolFlag{ 123 124 Name: "disable-request-crawl", 124 125 Usage: "don't process public (un-authenticated) com.atproto.sync.requestCrawl", 125 - EnvVars: []string{"RELAY_DISABLE_REQUEST_CRAWL"}, 126 + Sources: cli.EnvVars("RELAY_DISABLE_REQUEST_CRAWL"), 126 127 }, 127 128 &cli.BoolFlag{ 128 129 Name: "allow-insecure-hosts", 129 130 Usage: "enables subscription to non-SSL hosts via requestCrawl", 130 - EnvVars: []string{"RELAY_ALLOW_INSECURE_HOSTS"}, 131 + Sources: cli.EnvVars("RELAY_ALLOW_INSECURE_HOSTS"), 131 132 }, 132 133 &cli.BoolFlag{ 133 134 Name: "lenient-sync-validation", 134 135 Usage: "when messages fail atproto 'Sync 1.1' validation, just log, don't drop", 135 - EnvVars: []string{"RELAY_LENIENT_SYNC_VALIDATION"}, 136 + Sources: cli.EnvVars("RELAY_LENIENT_SYNC_VALIDATION"), 136 137 }, 137 138 &cli.IntFlag{ 138 139 Name: "initial-seq-number", 139 140 Usage: "when initializing output firehose, start with this sequence number", 140 141 Value: 1, 141 - EnvVars: []string{"RELAY_INITIAL_SEQ_NUMBER"}, 142 + Sources: cli.EnvVars("RELAY_INITIAL_SEQ_NUMBER"), 142 143 }, 143 144 &cli.StringSliceFlag{ 144 145 Name: "sibling-relays", 145 146 Usage: "servers (eg https://example.com) to forward admin state changes to; multiple allowed", 146 - EnvVars: []string{"RELAY_SIBLING_RELAYS"}, 147 + Sources: cli.EnvVars("RELAY_SIBLING_RELAYS"), 147 148 }, 148 149 &cli.StringSliceFlag{ 149 150 Name: "trusted-domains", 150 151 Usage: "domain names which mark trusted hosts; use wildcard prefix to match suffixes", 151 - Value: cli.NewStringSlice("*.host.bsky.network"), 152 - EnvVars: []string{"RELAY_TRUSTED_DOMAINS"}, 152 + Value: []string{"*.host.bsky.network"}, 153 + Sources: cli.EnvVars("RELAY_TRUSTED_DOMAINS"), 153 154 }, 154 155 &cli.StringFlag{ 155 156 Name: "env", 156 157 Value: "dev", 157 - EnvVars: []string{"ENVIRONMENT"}, 158 + Sources: cli.EnvVars("ENVIRONMENT"), 158 159 Usage: "declared hosting environment (prod, qa, etc); used in metrics", 159 160 }, 160 161 &cli.BoolFlag{ ··· 170 171 Name: "metrics-listen", 171 172 Usage: "IP or address, and port, to listen on for prometheus metrics", 172 173 Value: ":2471", 173 - EnvVars: []string{"RELAY_METRICS_LISTEN"}, 174 + Sources: cli.EnvVars("RELAY_METRICS_LISTEN"), 174 175 }, 175 176 &cli.StringFlag{ 176 177 Name: "otel-exporter-otlp-endpoint", 177 178 Value: "http://localhost:4328", 178 - EnvVars: []string{"OTEL_EXPORTER_OTLP_ENDPOINT"}, 179 + Sources: cli.EnvVars("OTEL_EXPORTER_OTLP_ENDPOINT"), 179 180 }, 180 181 }, 181 182 }, 182 183 // additional commands defined in pull.go 183 184 cmdPullHosts, 184 185 } 185 - return app.Run(os.Args) 186 + return app.Run(context.Background(), args) 186 187 187 188 } 188 189 189 - func configLogger(cctx *cli.Context, writer io.Writer) *slog.Logger { 190 + func configLogger(cmd *cli.Command, writer io.Writer) *slog.Logger { 190 191 var level slog.Level 191 - switch strings.ToLower(cctx.String("log-level")) { 192 + switch strings.ToLower(cmd.String("log-level")) { 192 193 case "error": 193 194 level = slog.LevelError 194 195 case "warn": ··· 207 208 return logger 208 209 } 209 210 210 - func runRelay(cctx *cli.Context) error { 211 - ctx := cctx.Context 212 - logger := configLogger(cctx, os.Stdout) 211 + func runRelay(ctx context.Context, cmd *cli.Command) error { 212 + logger := configLogger(cmd, os.Stdout) 213 213 214 214 // Trap SIGINT to trigger a shutdown. 215 215 signals := make(chan os.Signal, 1) 216 216 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 217 217 218 - dburl := cctx.String("db-url") 219 - maxConn := cctx.Int("max-db-conn") 218 + dburl := cmd.String("db-url") 219 + maxConn := cmd.Int("max-db-conn") 220 220 logger.Info("configuring database", "url", dburl, "maxConn", maxConn) 221 221 db, err := cliutil.SetupDatabase(dburl, maxConn) 222 222 if err != nil { ··· 228 228 SkipHandleVerification: true, 229 229 SkipDNSDomainSuffixes: []string{".bsky.social"}, 230 230 TryAuthoritativeDNS: true, 231 - PLCURL: cctx.String("plc-host"), 231 + PLCURL: cmd.String("plc-host"), 232 232 } 233 - dir := identity.NewCacheDirectory(&baseDir, cctx.Int("ident-cache-size"), time.Hour*24, time.Minute*2, time.Minute*5) 233 + dir := identity.NewCacheDirectory(&baseDir, cmd.Int("ident-cache-size"), time.Hour*24, time.Minute*2, time.Minute*5) 234 234 235 - persistDir := cctx.String("persist-dir") 235 + persistDir := cmd.String("persist-dir") 236 236 if err := os.MkdirAll(persistDir, os.ModePerm); err != nil { 237 237 return err 238 238 } 239 239 persitConfig := diskpersist.DefaultDiskPersistOptions() 240 - persitConfig.Retention = cctx.Duration("replay-window") 241 - persitConfig.InitialSeq = cctx.Int64("initial-seq-number") 240 + persitConfig.Retention = cmd.Duration("replay-window") 241 + persitConfig.InitialSeq = cmd.Int64("initial-seq-number") 242 242 logger.Info("setting up disk persister", "dir", persistDir, "replayWindow", persitConfig.Retention) 243 243 persister, err := diskpersist.NewDiskPersistence(persistDir, "", db, persitConfig) 244 244 if err != nil { ··· 247 247 248 248 relayConfig := relay.DefaultRelayConfig() 249 249 relayConfig.UserAgent = fmt.Sprintf("indigo-relay/%s (atproto-relay)", versioninfo.Short()) 250 - relayConfig.ConcurrencyPerHost = cctx.Int("host-concurrency") 251 - relayConfig.DefaultRepoLimit = cctx.Int64("default-account-limit") 252 - relayConfig.HostPerDayLimit = cctx.Int64("new-hosts-per-day-limit") 253 - relayConfig.TrustedDomains = cctx.StringSlice("trusted-domains") 254 - relayConfig.LenientSyncValidation = cctx.Bool("lenient-sync-validation") 250 + relayConfig.ConcurrencyPerHost = cmd.Int("host-concurrency") 251 + relayConfig.DefaultRepoLimit = cmd.Int64("default-account-limit") 252 + relayConfig.HostPerDayLimit = cmd.Int64("new-hosts-per-day-limit") 253 + relayConfig.TrustedDomains = cmd.StringSlice("trusted-domains") 254 + relayConfig.LenientSyncValidation = cmd.Bool("lenient-sync-validation") 255 255 256 256 svcConfig := DefaultServiceConfig() 257 - svcConfig.AllowInsecureHosts = cctx.Bool("allow-insecure-hosts") 258 - svcConfig.DisableRequestCrawl = cctx.Bool("disable-request-crawl") 259 - svcConfig.SiblingRelayHosts = cctx.StringSlice("sibling-relays") 257 + svcConfig.AllowInsecureHosts = cmd.Bool("allow-insecure-hosts") 258 + svcConfig.DisableRequestCrawl = cmd.Bool("disable-request-crawl") 259 + svcConfig.SiblingRelayHosts = cmd.StringSlice("sibling-relays") 260 260 if len(svcConfig.SiblingRelayHosts) > 0 { 261 261 logger.Info("sibling relay hosts configured for admin state forwarding", "servers", svcConfig.SiblingRelayHosts) 262 262 } 263 - if cctx.IsSet("admin-password") { 264 - svcConfig.AdminPasswords = cctx.StringSlice("admin-password") 263 + if cmd.IsSet("admin-password") { 264 + svcConfig.AdminPasswords = cmd.StringSlice("admin-password") 265 265 } else { 266 266 var rblob [10]byte 267 267 _, _ = rand.Read(rblob[:]) ··· 285 285 286 286 // start metrics endpoint 287 287 go func() { 288 - if err := svc.StartMetrics(cctx.String("metrics-listen")); err != nil { 288 + if err := svc.StartMetrics(cmd.String("metrics-listen")); err != nil { 289 289 logger.Error("failed to start metrics endpoint", "err", err) 290 290 os.Exit(1) 291 291 } 292 292 }() 293 293 294 294 // start observability/tracing (OTEL and jaeger) 295 - if err := setupOTEL(cctx); err != nil { 295 + if err := setupOTEL(cmd); err != nil { 296 296 return err 297 297 } 298 - if cctx.Bool("enable-db-tracing") { 298 + if cmd.Bool("enable-db-tracing") { 299 299 if err := db.Use(tracing.NewPlugin()); err != nil { 300 300 return err 301 301 } ··· 308 308 309 309 svcErr := make(chan error, 1) 310 310 go func() { 311 - err := svc.StartAPI(cctx.String("bind")) 311 + err := svc.StartAPI(cmd.String("bind")) 312 312 svcErr <- err 313 313 }() 314 314
+6 -6
cmd/relay/otel.go
··· 6 6 "os" 7 7 "time" 8 8 9 - "github.com/urfave/cli/v2" 9 + "github.com/urfave/cli/v3" 10 10 "go.opentelemetry.io/otel" 11 11 "go.opentelemetry.io/otel/attribute" 12 12 "go.opentelemetry.io/otel/exporters/jaeger" ··· 16 16 semconv "go.opentelemetry.io/otel/semconv/v1.4.0" 17 17 ) 18 18 19 - func setupOTEL(cctx *cli.Context) error { 19 + func setupOTEL(cmd *cli.Command) error { 20 20 21 - env := cctx.String("env") 21 + env := cmd.String("env") 22 22 if env == "" { 23 23 env = "dev" 24 24 } 25 25 26 - if cctx.Bool("enable-jaeger-tracing") { 26 + if cmd.Bool("enable-jaeger-tracing") { 27 27 jaegerUrl := "http://localhost:14268/api/traces" 28 28 exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerUrl))) 29 29 if err != nil { ··· 50 50 // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables 51 51 // At a minimum, you need to set 52 52 // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 53 - if cctx.Bool("enable-otel-tracing") { 54 - ep := cctx.String("otel-exporter-otlp-endpoint") 53 + if cmd.Bool("enable-otel-tracing") { 54 + ep := cmd.String("otel-exporter-otlp-endpoint") 55 55 slog.Info("setting up trace exporter", "endpoint", ep) 56 56 ctx, cancel := context.WithCancel(context.Background()) 57 57 defer cancel()
+17 -17
cmd/relay/pull.go
··· 1 1 package main 2 2 3 3 import ( 4 + "context" 4 5 "errors" 5 6 "fmt" 6 7 ··· 11 12 "github.com/bluesky-social/indigo/util/cliutil" 12 13 "github.com/bluesky-social/indigo/xrpc" 13 14 14 - "github.com/urfave/cli/v2" 15 + "github.com/urfave/cli/v3" 15 16 ) 16 17 17 18 var cmdPullHosts = &cli.Command{ ··· 23 24 Name: "relay-host", 24 25 Usage: "method, hostname, and port of relay to pull from", 25 26 Value: "https://bsky.network", 26 - EnvVars: []string{"RELAY_HOST"}, 27 + Sources: cli.EnvVars("RELAY_HOST"), 27 28 }, 28 29 &cli.StringFlag{ 29 30 Name: "db-url", 30 31 Usage: "database connection string for relay database", 31 32 Value: "sqlite://data/relay/relay.sqlite", 32 - EnvVars: []string{"DATABASE_URL"}, 33 + Sources: cli.EnvVars("DATABASE_URL"), 33 34 }, 34 35 &cli.IntFlag{ 35 36 Name: "default-account-limit", 36 37 Value: 100, 37 38 Usage: "max number of active accounts for new upstream hosts", 38 - EnvVars: []string{"RELAY_DEFAULT_ACCOUNT_LIMIT", "RELAY_DEFAULT_REPO_LIMIT"}, 39 + Sources: cli.EnvVars("RELAY_DEFAULT_ACCOUNT_LIMIT", "RELAY_DEFAULT_REPO_LIMIT"), 39 40 }, 40 41 &cli.IntFlag{ 41 42 Name: "batch-size", 42 43 Value: 500, 43 44 Usage: "host many hosts to pull at a time", 44 - EnvVars: []string{"RELAY_PULL_HOSTS_BATCH_SIZE"}, 45 + Sources: cli.EnvVars("RELAY_PULL_HOSTS_BATCH_SIZE"), 45 46 }, 46 47 &cli.StringSliceFlag{ 47 48 Name: "trusted-domains", 48 49 Usage: "domain names which mark trusted hosts; use wildcard prefix to match suffixes", 49 - Value: cli.NewStringSlice("*.host.bsky.network"), 50 - EnvVars: []string{"RELAY_TRUSTED_DOMAINS"}, 50 + Value: []string{"*.host.bsky.network"}, 51 + Sources: cli.EnvVars("RELAY_TRUSTED_DOMAINS"), 51 52 }, 52 53 &cli.BoolFlag{ 53 54 Name: "skip-host-checks", 54 55 Usage: "don't run describeServer requests to see if host is a PDS before adding", 55 - EnvVars: []string{"RELAY_SKIP_HOST_CHECKS"}, 56 + Sources: cli.EnvVars("RELAY_SKIP_HOST_CHECKS"), 56 57 }, 57 58 }, 58 59 } 59 60 60 - func runPullHosts(cctx *cli.Context) error { 61 - ctx := cctx.Context 61 + func runPullHosts(ctx context.Context, cmd *cli.Command) error { 62 62 63 - if cctx.Args().Len() > 0 { 63 + if cmd.Args().Len() > 0 { 64 64 return fmt.Errorf("unexpected arguments") 65 65 } 66 66 67 67 client := xrpc.Client{ 68 - Host: cctx.String("relay-host"), 68 + Host: cmd.String("relay-host"), 69 69 } 70 70 71 - skipHostChecks := cctx.Bool("skip-host-checks") 71 + skipHostChecks := cmd.Bool("skip-host-checks") 72 72 73 73 dir := identity.DefaultDirectory() 74 74 75 - dburl := cctx.String("db-url") 75 + dburl := cmd.String("db-url") 76 76 db, err := cliutil.SetupDatabase(dburl, 10) 77 77 if err != nil { 78 78 return err 79 79 } 80 80 81 81 relayConfig := relay.DefaultRelayConfig() 82 - relayConfig.DefaultRepoLimit = cctx.Int64("default-account-limit") 83 - relayConfig.TrustedDomains = cctx.StringSlice("trusted-domains") 82 + relayConfig.DefaultRepoLimit = cmd.Int64("default-account-limit") 83 + relayConfig.TrustedDomains = cmd.StringSlice("trusted-domains") 84 84 85 85 // NOTE: setting evtmgr to nil 86 86 r, err := relay.NewRelay(db, nil, dir, relayConfig) ··· 91 91 checker := relay.NewHostClient(relayConfig.UserAgent) 92 92 93 93 cursor := "" 94 - size := cctx.Int64("batch-size") 94 + size := cmd.Int64("batch-size") 95 95 for { 96 96 resp, err := comatproto.SyncListHosts(ctx, &client, cursor, size) 97 97 if err != nil {