this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

pass over service setup

+270 -295
+47 -15
cmd/relayered/handlers.go
··· 26 26 } 27 27 28 28 if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") { 29 - if s.ssl { 29 + if s.relay.Config.SSL { 30 30 host = "https://" + host 31 31 } else { 32 32 host = "http://" + host ··· 38 38 return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname") 39 39 } 40 40 41 - if u.Scheme == "http" && s.ssl { 41 + if u.Scheme == "http" && s.relay.Config.SSL { 42 42 return echo.NewHTTPError(http.StatusBadRequest, "this server requires https") 43 43 } 44 44 45 - if u.Scheme == "https" && !s.ssl { 45 + if u.Scheme == "https" && !s.relay.Config.SSL { 46 46 return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https") 47 47 } 48 48 ··· 61 61 return echo.NewHTTPError(http.StatusUnauthorized, "domain is banned") 62 62 } 63 63 64 - s.log.Warn("TODO: better host validation for crawl requests") 64 + s.logger.Warn("TODO: better host validation for crawl requests") 65 65 66 66 clientHost := fmt.Sprintf("%s://%s", u.Scheme, host) 67 67 ··· 79 79 // Maybe we could do something with this response later 80 80 _ = desc 81 81 82 - if len(s.nextCrawlers) != 0 { 82 + if len(s.config.NextCrawlers) != 0 { 83 83 blob, err := json.Marshal(body) 84 84 if err != nil { 85 - s.log.Warn("could not forward requestCrawl, json err", "err", err) 85 + s.logger.Warn("could not forward requestCrawl, json err", "err", err) 86 86 } else { 87 87 go func(bodyBlob []byte) { 88 - for _, rpu := range s.nextCrawlers { 88 + for _, rpu := range s.config.NextCrawlers { 89 89 pu := rpu.JoinPath("/xrpc/com.atproto.sync.requestCrawl") 90 - response, err := s.httpClient.Post(pu.String(), "application/json", bytes.NewReader(bodyBlob)) 90 + response, err := s.crawlForwardClient.Post(pu.String(), "application/json", bytes.NewReader(bodyBlob)) 91 91 if response != nil && response.Body != nil { 92 92 response.Body.Close() 93 93 } 94 94 if err != nil || response == nil { 95 - s.log.Warn("requestCrawl forward failed", "host", rpu, "err", err) 95 + s.logger.Warn("requestCrawl forward failed", "host", rpu, "err", err) 96 96 } else if response.StatusCode != http.StatusOK { 97 - s.log.Warn("requestCrawl forward failed", "host", rpu, "status", response.Status) 97 + s.logger.Warn("requestCrawl forward failed", "host", rpu, "status", response.Status) 98 98 } else { 99 - s.log.Info("requestCrawl forward successful", "host", rpu) 99 + s.logger.Info("requestCrawl forward successful", "host", rpu) 100 100 } 101 101 } 102 102 }(blob) ··· 112 112 if err == gorm.ErrRecordNotFound { 113 113 return &comatproto.SyncListRepos_Output{}, nil 114 114 } 115 - s.log.Error("failed to query accounts", "err", err) 115 + s.logger.Error("failed to query accounts", "err", err) 116 116 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to query accounts") 117 117 } 118 118 ··· 133 133 134 134 root, err := s.relay.GetRepoRoot(ctx, user.ID) 135 135 if err != nil { 136 - s.log.Error("failed to get repo root", "err", err, "did", user.Did) 136 + s.logger.Error("failed to get repo root", "err", err, "did", user.Did) 137 137 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to get repo root for (%s): %v", user.Did, err.Error())) 138 138 } 139 139 ··· 181 181 prevState, err := s.relay.GetAccountPreviousState(ctx, u.ID) 182 182 if err != nil { 183 183 if errors.Is(err, gorm.ErrRecordNotFound) { 184 - return nil, relay.ErrUserStatusUnavailable 184 + return nil, relay.ErrAccountLastUnavailable 185 185 } 186 - s.log.Error("user db err", "err", err) 186 + s.logger.Error("user db err", "err", err) 187 187 return nil, fmt.Errorf("user prev db err, %w", err) 188 188 } 189 189 ··· 192 192 Rev: prevState.Rev, 193 193 }, nil 194 194 } 195 + 196 + type HealthStatus struct { 197 + Status string `json:"status"` 198 + Message string `json:"msg,omitempty"` 199 + } 200 + 201 + func (svc *Service) HandleHealthCheck(c echo.Context) error { 202 + if err := svc.relay.Healthcheck(); err != nil { 203 + svc.logger.Error("healthcheck can't connect to database", "err", err) 204 + return c.JSON(500, HealthStatus{Status: "error", Message: "can't connect to database"}) 205 + } else { 206 + return c.JSON(200, HealthStatus{Status: "ok"}) 207 + } 208 + } 209 + 210 + var homeMessage string = ` 211 + .########..########.##..........###....##....## 212 + .##.....##.##.......##.........##.##....##..##. 213 + .##.....##.##.......##........##...##....####.. 214 + .########..######...##.......##.....##....##... 215 + .##...##...##.......##.......#########....##... 216 + .##....##..##.......##.......##.....##....##... 217 + .##.....##.########.########.##.....##....##... 218 + 219 + This is an atproto [https://atproto.com] relay instance, running the 'relayered' codebase [https://github.com/bluesky-social/indigo] 220 + 221 + The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos 222 + ` 223 + 224 + func (svc *Service) HandleHomeMessage(c echo.Context) error { 225 + return c.String(http.StatusOK, homeMessage) 226 + }
+3 -3
cmd/relayered/handlers_admin.go
··· 444 444 } 445 445 446 446 if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") { 447 - if svc.ssl { 447 + if svc.relay.Config.SSL { 448 448 host = "https://" + host 449 449 } else { 450 450 host = "http://" + host ··· 456 456 return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname") 457 457 } 458 458 459 - if u.Scheme == "http" && svc.ssl { 459 + if u.Scheme == "http" && svc.relay.Config.SSL { 460 460 return echo.NewHTTPError(http.StatusBadRequest, "this server requires https") 461 461 } 462 462 463 - if u.Scheme == "https" && !svc.ssl { 463 + if u.Scheme == "https" && !svc.relay.Config.SSL { 464 464 return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https") 465 465 } 466 466
+151 -136
cmd/relayered/main.go
··· 4 4 "crypto/rand" 5 5 "encoding/base64" 6 6 "fmt" 7 + "io" 7 8 "log/slog" 8 9 "net/url" 9 10 "os" 10 11 "os/signal" 11 - "path/filepath" 12 12 "strings" 13 13 "syscall" 14 14 "time" ··· 21 21 "github.com/bluesky-social/indigo/cmd/relayered/relay" 22 22 "github.com/bluesky-social/indigo/cmd/relayered/relay/validator" 23 23 "github.com/bluesky-social/indigo/cmd/relayered/stream/eventmgr" 24 - "github.com/bluesky-social/indigo/cmd/relayered/stream/persist" 25 24 "github.com/bluesky-social/indigo/cmd/relayered/stream/persist/diskpersist" 26 25 "github.com/bluesky-social/indigo/util" 27 26 "github.com/bluesky-social/indigo/util/cliutil" ··· 34 33 35 34 func main() { 36 35 if err := run(os.Args); err != nil { 37 - slog.Error(err.Error()) 38 - os.Exit(1) 36 + slog.Error("exiting process", "err", err.Error()) 37 + os.Exit(-1) 39 38 } 40 39 } 41 40 42 41 func run(args []string) error { 43 42 44 43 app := cli.App{ 45 - Name: "relay", 46 - Usage: "atproto Relay daemon", 44 + Name: "relayered", 45 + Usage: "atproto relay daemon", 47 46 Version: versioninfo.Short(), 47 + Action: runRelay, 48 48 } 49 - 50 49 app.Flags = []cli.Flag{ 51 - &cli.BoolFlag{ 52 - Name: "jaeger", 53 - }, 54 50 &cli.StringFlag{ 55 51 Name: "db-url", 56 52 Usage: "database connection string for relay database", 57 - Value: "sqlite://./data/relay/relay.sqlite", 53 + Value: "sqlite://data/relay/relay.sqlite", 58 54 EnvVars: []string{"DATABASE_URL"}, 59 55 }, 60 - &cli.BoolFlag{ 61 - Name: "db-tracing", 56 + &cli.IntFlag{ 57 + Name: "max-db-conn", 58 + Usage: "limit on size of database connection pool", 59 + EnvVars: []string{"MAX_DB_CONNECTIONS", "MAX_METADB_CONNECTIONS"}, 60 + Value: 40, 62 61 }, 63 62 &cli.StringFlag{ 63 + Name: "bind", 64 + Usage: "IP or address, and port, to listen on for HTTP APIs (including firehose)", 65 + Value: ":2470", 66 + EnvVars: []string{"RELAY_API_BIND", "RELAY_API_LISTEN"}, 67 + }, 68 + &cli.StringFlag{ 69 + Name: "persist-dir", 70 + Usage: "local folder to store firehose playback files", 71 + Value: "data/relay/events", 72 + EnvVars: []string{"RELAY_PERSIST_DIR", "RELAY_PERSISTER_DIR"}, 73 + }, 74 + &cli.DurationFlag{ 75 + Name: "replay-window", 76 + Usage: "retention duration for firehose playback", 77 + EnvVars: []string{"RELAY_REPLAY_WINDOW", "RELAY_EVENT_PLAYBACK_TTL"}, 78 + Value: 72 * time.Hour, 79 + }, 80 + // XXX: actually disabled if empty? 81 + &cli.StringFlag{ 82 + Name: "admin-password", 83 + Usage: "secret password/token for accessing admin endpoints (random is used if not set)", 84 + EnvVars: []string{"RELAY_ADMIN_PASSWORD", "RELAY_ADMIN_KEY"}, 85 + }, 86 + &cli.IntFlag{ 87 + Name: "host-concurrency", 88 + Usage: "number of concurrent worker routines per upstream host", 89 + EnvVars: []string{"RELAY_HOST_CONCURRENCY", "RELAY_CONCURRENCY_PER_PDS"}, 90 + Value: 100, 91 + }, 92 + &cli.IntFlag{ 93 + Name: "max-queue-per-host", 94 + Value: 1_000, 95 + Usage: "size of in-process DID (identity) cache", 96 + EnvVars: []string{"RELAY_MAX_QUEUE_PER_HOST", "RELAY_MAX_QUEUE_PER_PDS"}, 97 + }, 98 + &cli.IntFlag{ 99 + Name: "default-account-limit", 100 + Value: 100, 101 + Usage: "max number of active accounts for new upstream hosts", 102 + EnvVars: []string{"RELAY_DEFAULT_ACCOUUNT_LIMIT", "RELAY_DEFAULT_REPO_LIMIT"}, 103 + }, 104 + &cli.IntFlag{ 105 + Name: "did-cache-size", 106 + Value: 5_000_000, 107 + Usage: "size of in-process DID (identity) cache", 108 + EnvVars: []string{"RELAY_DID_CACHE_SIZE"}, 109 + }, 110 + // XXX: not used? 111 + &cli.StringFlag{ 64 112 Name: "plc-host", 65 113 Usage: "method, hostname, and port of PLC registry", 66 114 Value: "https://plc.directory", 67 115 EnvVars: []string{"ATP_PLC_HOST"}, 68 116 }, 117 + // XXX: refactor this flag 69 118 &cli.BoolFlag{ 70 119 Name: "crawl-insecure-ws", 71 120 Usage: "when connecting to PDS instances, use ws:// instead of wss://", 72 121 }, 73 - &cli.StringFlag{ 74 - Name: "api-listen", 75 - Value: ":2470", 76 - EnvVars: []string{"RELAY_API_LISTEN"}, 77 - }, 78 - &cli.StringFlag{ 79 - Name: "metrics-listen", 80 - Value: ":2471", 81 - EnvVars: []string{"RELAY_METRICS_LISTEN"}, 122 + &cli.StringSliceFlag{ 123 + Name: "forward-crawl-requests", 124 + Usage: "comma-separated list of servers (eg https://example.com) to forward requestCrawl on to", 125 + EnvVars: []string{"RELAY_FORWARD_CRAWL_REQUESTS", "RELAY_NEXT_CRAWLER"}, 82 126 }, 83 127 &cli.StringFlag{ 84 - Name: "disk-persister-dir", 85 - Usage: "set directory for disk persister (implicitly enables disk persister)", 86 - EnvVars: []string{"RELAY_PERSISTER_DIR"}, 128 + Name: "bsky-social-rate-limit-skip", 129 + EnvVars: []string{"BSKY_SOCIAL_RATE_LIMIT_SKIP"}, 130 + Usage: "ratelimit bypass secret token for *.bsky.social domains", 87 131 }, 88 132 &cli.StringFlag{ 89 - Name: "admin-key", 90 - EnvVars: []string{"RELAY_ADMIN_KEY"}, 91 - }, 92 - &cli.IntFlag{ 93 - Name: "max-metadb-connections", 94 - EnvVars: []string{"MAX_METADB_CONNECTIONS"}, 95 - Value: 40, 133 + Name: "log-level", 134 + Usage: "log verbosity level (eg: warn, info, debug)", 135 + EnvVars: []string{"BLUEPAGES_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"}, 96 136 }, 97 137 &cli.StringFlag{ 98 138 Name: "env", ··· 100 140 EnvVars: []string{"ENVIRONMENT"}, 101 141 Usage: "declared hosting environment (prod, qa, etc); used in metrics", 102 142 }, 103 - &cli.StringFlag{ 104 - Name: "otel-exporter-otlp-endpoint", 105 - EnvVars: []string{"OTEL_EXPORTER_OTLP_ENDPOINT"}, 106 - }, 107 - &cli.StringFlag{ 108 - Name: "bsky-social-rate-limit-skip", 109 - EnvVars: []string{"BSKY_SOCIAL_RATE_LIMIT_SKIP"}, 110 - Usage: "ratelimit bypass secret token for *.bsky.social domains", 143 + &cli.BoolFlag{ 144 + Name: "enable-db-tracing", 111 145 }, 112 - &cli.IntFlag{ 113 - Name: "default-repo-limit", 114 - Value: 100, 115 - EnvVars: []string{"RELAY_DEFAULT_REPO_LIMIT"}, 146 + &cli.BoolFlag{ 147 + Name: "enable-jaeger-tracing", 116 148 }, 117 - &cli.IntFlag{ 118 - Name: "concurrency-per-pds", 119 - EnvVars: []string{"RELAY_CONCURRENCY_PER_PDS"}, 120 - Value: 100, 149 + &cli.BoolFlag{ 150 + Name: "enable-otel-tracing", 121 151 }, 122 - &cli.IntFlag{ 123 - Name: "max-queue-per-pds", 124 - EnvVars: []string{"RELAY_MAX_QUEUE_PER_PDS"}, 125 - Value: 1_000, 152 + &cli.StringFlag{ 153 + Name: "metrics-listen", 154 + Usage: "IP or address, and port, to listen on for prometheus metrics", 155 + Value: ":2471", 156 + EnvVars: []string{"RELAY_METRICS_LISTEN"}, 126 157 }, 127 - &cli.IntFlag{ 128 - Name: "did-cache-size", 129 - Usage: "in-process cache by number of Did documents", 130 - EnvVars: []string{"RELAY_DID_CACHE_SIZE"}, 131 - Value: 5_000_000, 132 - }, 133 - &cli.DurationFlag{ 134 - Name: "event-playback-ttl", 135 - Usage: "time to live for event playback buffering (only applies to disk persister)", 136 - EnvVars: []string{"RELAY_EVENT_PLAYBACK_TTL"}, 137 - Value: 72 * time.Hour, 138 - }, 139 - &cli.StringSliceFlag{ 140 - Name: "next-crawler", 141 - Usage: "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl, comma separated list", 142 - EnvVars: []string{"RELAY_NEXT_CRAWLER"}, 143 - }, 144 - &cli.BoolFlag{ 145 - Name: "time-seq", 146 - EnvVars: []string{"RELAY_TIME_SEQUENCE"}, 147 - Value: false, 148 - Usage: "make outbound firehose sequence number approximately unix microseconds", 158 + &cli.StringFlag{ 159 + Name: "otel-exporter-otlp-endpoint", 160 + Value: "http://localhost:4328", 161 + EnvVars: []string{"OTEL_EXPORTER_OTLP_ENDPOINT"}, 149 162 }, 150 163 } 151 - 152 - app.Action = runRelay 153 164 return app.Run(os.Args) 165 + 166 + } 167 + 168 + func configLogger(cctx *cli.Context, writer io.Writer) *slog.Logger { 169 + var level slog.Level 170 + switch strings.ToLower(cctx.String("log-level")) { 171 + case "error": 172 + level = slog.LevelError 173 + case "warn": 174 + level = slog.LevelWarn 175 + case "info": 176 + level = slog.LevelInfo 177 + case "debug": 178 + level = slog.LevelDebug 179 + default: 180 + level = slog.LevelInfo 181 + } 182 + logger := slog.New(slog.NewJSONHandler(writer, &slog.HandlerOptions{ 183 + Level: level, 184 + })) 185 + slog.SetDefault(logger) 186 + return logger 154 187 } 155 188 156 189 func runRelay(cctx *cli.Context) error { 190 + logger := configLogger(cctx, os.Stdout) 191 + 157 192 // Trap SIGINT to trigger a shutdown. 158 193 signals := make(chan os.Signal, 1) 159 194 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 160 195 161 - logger, logWriter, err := cliutil.SetupSlog(cliutil.LogOptions{}) 162 - if err != nil { 163 - return err 164 - } 165 - 166 - // start observability/tracing (OTEL and jaeger) 167 - if err := setupOTEL(cctx); err != nil { 168 - return err 169 - } 170 - 171 196 dburl := cctx.String("db-url") 172 - logger.Info("setting up main database", "url", dburl) 173 - db, err := cliutil.SetupDatabase(dburl, cctx.Int("max-metadb-connections")) 197 + maxConn := cctx.Int("max-db-conn") 198 + logger.Info("configuring database", "url", dburl, "maxConn", maxConn) 199 + db, err := cliutil.SetupDatabase(dburl, maxConn) 174 200 if err != nil { 175 201 return err 176 - } 177 - if cctx.Bool("db-tracing") { 178 - if err := db.Use(tracing.NewPlugin()); err != nil { 179 - return err 180 - } 181 202 } 182 203 183 204 // TODO: add shared external cache ··· 186 207 SkipDNSDomainSuffixes: []string{".bsky.social"}, 187 208 TryAuthoritativeDNS: true, 188 209 } 189 - cacheDir := identity.NewCacheDirectory(&baseDir, cctx.Int("did-cache-size"), time.Hour*24, time.Minute*2, time.Minute*5) 210 + dir := identity.NewCacheDirectory(&baseDir, cctx.Int("did-cache-size"), time.Hour*24, time.Minute*2, time.Minute*5) 190 211 191 - vldtr := validator.NewValidator(&cacheDir) 192 - 193 - var persister persist.EventPersistence 194 - 195 - dpd := cctx.String("disk-persister-dir") 196 - if dpd == "" { 197 - logger.Info("empty disk-persister-dir, use current working directory") 198 - cwd, err := os.Getwd() 199 - if err != nil { 200 - return err 201 - } 202 - dpd = filepath.Join(cwd, "relay-persist") 203 - } 204 - logger.Info("setting up disk persister", "dir", dpd) 205 - 212 + persistDir := cctx.String("persist-dir") 213 + os.MkdirAll(persistDir, os.ModePerm) 206 214 pOpts := diskpersist.DefaultDiskPersistOptions() 207 - pOpts.Retention = cctx.Duration("event-playback-ttl") 208 - pOpts.TimeSequence = cctx.Bool("time-seq") 209 - 210 - dp, err := diskpersist.NewDiskPersistence(dpd, "", db, pOpts) 215 + pOpts.Retention = cctx.Duration("replay-window") 216 + logger.Info("setting up disk persister", "dir", persistDir, "replayWindow", pOpts.Retention) 217 + persister, err := diskpersist.NewDiskPersistence(persistDir, "", db, pOpts) 211 218 if err != nil { 212 219 return fmt.Errorf("setting up disk persister: %w", err) 213 220 } 214 - persister = dp 215 221 216 - evtman := eventmgr.NewEventManager(persister) 217 - 218 - ratelimitBypass := cctx.String("bsky-social-rate-limit-skip") 219 - 220 - logger.Info("constructing relay service") 221 222 svcConfig := DefaultServiceConfig() 222 223 relayConfig := relay.DefaultRelayConfig() 223 224 relayConfig.SSL = !cctx.Bool("crawl-insecure-ws") 224 - relayConfig.ConcurrencyPerPDS = cctx.Int64("concurrency-per-pds") 225 - relayConfig.MaxQueuePerPDS = cctx.Int64("max-queue-per-pds") 226 - relayConfig.DefaultRepoLimit = cctx.Int64("default-repo-limit") 225 + relayConfig.ConcurrencyPerPDS = cctx.Int64("host-concurrency") 226 + relayConfig.MaxQueuePerPDS = cctx.Int64("max-queue-per-host") 227 + relayConfig.DefaultRepoLimit = cctx.Int64("default-account-limit") 228 + ratelimitBypass := cctx.String("bsky-social-rate-limit-skip") 227 229 relayConfig.ApplyPDSClientSettings = makePdsClientSetup(ratelimitBypass) 228 - nextCrawlers := cctx.StringSlice("next-crawler") 229 - if len(nextCrawlers) != 0 { 230 + nextCrawlers := cctx.StringSlice("forward-crawl-requests") 231 + if len(nextCrawlers) > 0 { 230 232 nextCrawlerUrls := make([]*url.URL, len(nextCrawlers)) 231 233 for i, tu := range nextCrawlers { 232 234 var err error 233 235 nextCrawlerUrls[i], err = url.Parse(tu) 234 236 if err != nil { 235 - return fmt.Errorf("failed to parse next-crawler url: %w", err) 237 + return fmt.Errorf("invalid crawl request forwarding URL: %w", err) 236 238 } 237 - logger.Info("configuring relay for requestCrawl", "host", nextCrawlerUrls[i]) 238 239 } 239 240 svcConfig.NextCrawlers = nextCrawlerUrls 241 + logger.Info("crawl request forwarding enabled", "servers", svcConfig.NextCrawlers) 240 242 } 241 - if cctx.IsSet("admin-key") { 242 - svcConfig.AdminToken = cctx.String("admin-key") 243 + if cctx.IsSet("admin-password") { 244 + svcConfig.AdminPassword = cctx.String("admin-password") 243 245 } else { 244 246 var rblob [10]byte 245 247 _, _ = rand.Read(rblob[:]) 246 - svcConfig.AdminToken = base64.URLEncoding.EncodeToString(rblob[:]) 247 - logger.Info("generated random admin key", "header", "Authorization: Bearer "+svcConfig.AdminToken) 248 + svcConfig.AdminPassword = base64.URLEncoding.EncodeToString(rblob[:]) 249 + logger.Info("generated random admin password", "username", "admin", "password", svcConfig.AdminPassword) 248 250 } 249 251 250 - r, err := relay.NewRelay(db, vldtr, evtman, &cacheDir, relayConfig) 252 + evtman := eventmgr.NewEventManager(persister) 253 + vldtr := validator.NewValidator(&dir) 254 + 255 + logger.Info("constructing relay service") 256 + r, err := relay.NewRelay(db, vldtr, evtman, &dir, relayConfig) 251 257 if err != nil { 252 258 return err 253 259 } 254 - 255 - svc, err := NewService(db, r, &cacheDir, svcConfig) 260 + svc, err := NewService(db, r, svcConfig) 256 261 if err != nil { 257 262 return err 258 263 } 259 - dp.SetUidSource(r) 264 + persister.SetUidSource(r) 260 265 261 - // set up metrics endpoint 266 + // start metrics endpoint 262 267 go func() { 263 268 if err := svc.StartMetrics(cctx.String("metrics-listen")); err != nil { 264 269 logger.Error("failed to start metrics endpoint", "err", err) ··· 266 271 } 267 272 }() 268 273 274 + // start observability/tracing (OTEL and jaeger) 275 + if err := setupOTEL(cctx); err != nil { 276 + return err 277 + } 278 + if cctx.Bool("enable-db-tracing") { 279 + if err := db.Use(tracing.NewPlugin()); err != nil { 280 + return err 281 + } 282 + } 283 + 269 284 svcErr := make(chan error, 1) 270 285 271 286 go func() { 272 - err := svc.Start(cctx.String("api-listen"), logWriter) 287 + err := svc.StartAPI(cctx.String("bind")) 273 288 svcErr <- err 274 289 }() 275 290
+4 -2
cmd/relayered/otel.go
··· 22 22 if env == "" { 23 23 env = "dev" 24 24 } 25 - if cctx.Bool("jaeger") { 25 + 26 + if cctx.Bool("enable-jaeger-tracing") { 26 27 jaegerUrl := "http://localhost:14268/api/traces" 27 28 exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerUrl))) 28 29 if err != nil { ··· 49 50 // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables 50 51 // At a minimum, you need to set 51 52 // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 52 - if ep := cctx.String("otel-exporter-otlp-endpoint"); ep != "" { 53 + if cctx.Bool("enable-otel-tracing") { 54 + ep := cctx.String("otel-exporter-otlp-endpoint") 53 55 slog.Info("setting up trace exporter", "endpoint", ep) 54 56 ctx, cancel := context.WithCancel(context.Background()) 55 57 defer cancel()
+9 -8
cmd/relayered/relay/account.go
··· 18 18 "gorm.io/gorm" 19 19 ) 20 20 21 - var ErrNotFound = errors.New("not found") 22 - var ErrUserStatusUnavailable = errors.New("user status unavailable") 21 + var ( 22 + ErrAccountNotFound = errors.New("account not found") 23 + ErrAccountLastUnavailable = errors.New("account last commit not available") 24 + ErrCommitNoUser = errors.New("commit no user") // TODO 25 + ) 23 26 24 27 func (r *Relay) DidToUid(ctx context.Context, did string) (models.Uid, error) { 25 28 xu, err := r.LookupUserByDid(ctx, did) ··· 27 30 return 0, err 28 31 } 29 32 if xu == nil { 30 - return 0, ErrNotFound 33 + return 0, ErrAccountNotFound 31 34 } 32 35 return xu.ID, nil 33 36 } ··· 83 86 return account, nil 84 87 } 85 88 86 - var ErrCommitNoUser = errors.New("commit no user") 87 - 88 89 // syncPDSAccount ensures that a DID has an account record in the database attached to a PDS record in the database 89 90 // Some fields may be updated if needed. 90 91 // did is the user ··· 202 203 if cachedAccount == nil { 203 204 cachedAccount, err = r.LookupUserByDid(ctx, did) 204 205 } 205 - if errors.Is(err, ErrNotFound) || errors.Is(err, gorm.ErrRecordNotFound) { 206 + if errors.Is(err, ErrAccountNotFound) || errors.Is(err, gorm.ErrRecordNotFound) { 206 207 err = nil 207 208 } 208 209 if err != nil { ··· 294 295 var prevState slurper.AccountPreviousState 295 296 if err := r.db.First(&prevState, uid).Error; err != nil { 296 297 if errors.Is(err, gorm.ErrRecordNotFound) { 297 - return nil, ErrUserStatusUnavailable 298 + return nil, ErrAccountLastUnavailable 298 299 } 299 300 r.Logger.Error("user db err", "err", err) 300 301 return nil, err ··· 308 309 if err == nil { 309 310 return prevState.Cid.CID, nil 310 311 } else if errors.Is(err, gorm.ErrRecordNotFound) { 311 - return cid.Cid{}, ErrUserStatusUnavailable 312 + return cid.Cid{}, ErrAccountLastUnavailable 312 313 } else { 313 314 r.Logger.Error("user db err", "err", err) 314 315 return cid.Cid{}, fmt.Errorf("user prev db err, %w", err)
+11 -23
cmd/relayered/relay/relay.go
··· 3 3 import ( 4 4 "log/slog" 5 5 "sync" 6 - "time" 7 6 8 7 "github.com/bluesky-social/indigo/atproto/identity" 9 8 "github.com/bluesky-social/indigo/cmd/relayered/relay/slurper" ··· 11 10 "github.com/bluesky-social/indigo/cmd/relayered/stream/eventmgr" 12 11 "github.com/bluesky-social/indigo/xrpc" 13 12 14 - lru "github.com/hashicorp/golang-lru/v2" 15 - promclient "github.com/prometheus/client_golang/prometheus" 13 + "github.com/hashicorp/golang-lru/v2" 16 14 "go.opentelemetry.io/otel" 17 15 "gorm.io/gorm" 18 16 ) ··· 20 18 var tracer = otel.Tracer("relay") 21 19 22 20 type Relay struct { 23 - db *gorm.DB 24 - dir identity.Directory 25 - 21 + db *gorm.DB 22 + dir identity.Directory 23 + Logger *slog.Logger 26 24 Slurper *slurper.Slurper 27 25 Events *eventmgr.EventManager 28 26 Validator *validator.Validator 29 - 30 - Config RelayConfig 27 + Config RelayConfig 31 28 32 29 // extUserLk serializes a section of syncPDSAccount() 33 30 // TODO: at some point we will want to lock specific DIDs, this lock as is ··· 41 38 42 39 // Account cache 43 40 userCache *lru.Cache[string, *slurper.Account] 44 - Logger *slog.Logger 45 41 } 46 42 47 43 type RelayConfig struct { ··· 67 63 config = DefaultRelayConfig() 68 64 } 69 65 70 - uc, _ := lru.New[string, *slurper.Account](1_000_000) 66 + uc, _ := lru.New[string, *slurper.Account](2_000_000) 71 67 72 68 r := &Relay{ 73 69 db: db, 70 + dir: dir, 71 + Logger: slog.Default().With("system", "relay"), 74 72 Events: evtman, 75 73 Validator: vldtr, 76 - dir: dir, 77 74 Config: *config, 78 75 79 76 consumersLk: sync.RWMutex{}, 80 77 consumers: make(map[uint64]*SocketConsumer), 81 78 82 79 userCache: uc, 80 + } 83 81 84 - Logger: slog.Default().With("system", "relay"), 82 + if err := r.MigrateDatabase(); err != nil { 83 + return nil, err 85 84 } 86 85 87 86 slOpts := slurper.DefaultSlurperOptions() ··· 99 98 if err := r.Slurper.RestartAll(); err != nil { 100 99 return nil, err 101 100 } 102 - 103 - if err := r.MigrateDatabase(); err != nil { 104 - return nil, err 105 - } 106 101 return r, nil 107 102 } 108 103 ··· 126 121 func (r *Relay) Healthcheck() error { 127 122 return r.db.Exec("SELECT 1").Error 128 123 } 129 - 130 - type SocketConsumer struct { 131 - UserAgent string 132 - RemoteAddr string 133 - ConnectedAt time.Time 134 - EventsSent promclient.Counter 135 - }
+8
cmd/relayered/relay/subscribe_repos.go
··· 12 12 13 13 "github.com/gorilla/websocket" 14 14 "github.com/labstack/echo/v4" 15 + promclient "github.com/prometheus/client_golang/prometheus" 15 16 dto "github.com/prometheus/client_model/go" 16 17 ) 18 + 19 + type SocketConsumer struct { 20 + UserAgent string 21 + RemoteAddr string 22 + ConnectedAt time.Time 23 + EventsSent promclient.Counter 24 + } 17 25 18 26 func (r *Relay) registerConsumer(c *SocketConsumer) uint64 { 19 27 r.consumersLk.Lock()
+34 -93
cmd/relayered/service.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "io" 5 + "encoding/base64" 6 6 "log/slog" 7 7 "net" 8 8 "net/http" ··· 10 10 "strings" 11 11 "time" 12 12 13 - "github.com/bluesky-social/indigo/atproto/identity" 14 13 "github.com/bluesky-social/indigo/cmd/relayered/relay" 15 14 16 15 "github.com/labstack/echo/v4" ··· 19 18 "gorm.io/gorm" 20 19 ) 21 20 22 - // serverListenerBootTimeout is how long to wait for the requested server socket 23 - // to become available for use. This is an arbitrary timeout that should be safe 24 - // on any platform, but there's no great way to weave this timeout without 25 - // adding another parameter to the (at time of writing) long signature of 26 - // NewServer. 27 - const serverListenerBootTimeout = 5 * time.Second 28 - 29 21 type Service struct { 30 - db *gorm.DB // XXX 31 - relay *relay.Relay 32 - dir identity.Directory 33 - 34 - // TODO: work on doing away with this flag in favor of more pluggable 35 - // pieces that abstract the need for explicit ssl checks 36 - ssl bool 37 - 38 - // nextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl 39 - nextCrawlers []*url.URL 40 - httpClient http.Client 41 - 42 - log *slog.Logger 43 - 22 + db *gorm.DB // XXX 23 + logger *slog.Logger 24 + relay *relay.Relay 44 25 config ServiceConfig 26 + 27 + crawlForwardClient http.Client 45 28 } 46 29 47 30 type ServiceConfig struct { 48 31 // NextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl 49 32 NextCrawlers []*url.URL 50 33 51 - // AdminToken checked against "Authorization: Bearer {}" header 52 - AdminToken string 34 + // verified against Basic admin auth 35 + AdminPassword string 36 + 37 + // how long to wait for the requested server socket to become available for use 38 + ListenerBootTimeout time.Duration 53 39 } 54 40 55 41 func DefaultServiceConfig() *ServiceConfig { 56 - return &ServiceConfig{} 42 + return &ServiceConfig{ 43 + ListenerBootTimeout: 5 * time.Second, 44 + } 57 45 } 58 46 59 - func NewService(db *gorm.DB, r *relay.Relay, dir identity.Directory, config *ServiceConfig) (*Service, error) { 47 + func NewService(db *gorm.DB, r *relay.Relay, config *ServiceConfig) (*Service, error) { 60 48 61 49 if config == nil { 62 50 config = DefaultServiceConfig() 63 51 } 64 52 65 53 svc := &Service{ 66 - db: db, 67 - relay: r, 68 - dir: dir, 69 - ssl: r.Config.SSL, 70 - 71 - log: slog.Default().With("system", "relay"), 72 - 73 - config: *config, 54 + db: db, 55 + logger: slog.Default().With("system", "relay"), 56 + relay: r, 57 + config: *config, 58 + crawlForwardClient: http.Client{}, 74 59 } 75 - 76 - svc.nextCrawlers = config.NextCrawlers 77 - svc.httpClient.Timeout = time.Second * 5 60 + svc.crawlForwardClient.Timeout = time.Second * 5 78 61 79 62 return svc, nil 80 63 } ··· 84 67 return http.ListenAndServe(listen, nil) 85 68 } 86 69 87 - func (svc *Service) Start(addr string, logWriter io.Writer) error { 70 + func (svc *Service) StartAPI(bind string) error { 88 71 var lc net.ListenConfig 89 - ctx, cancel := context.WithTimeout(context.Background(), serverListenerBootTimeout) 72 + ctx, cancel := context.WithTimeout(context.Background(), svc.config.ListenerBootTimeout) 90 73 defer cancel() 91 74 92 - li, err := lc.Listen(ctx, "tcp", addr) 75 + li, err := lc.Listen(ctx, "tcp", bind) 93 76 if err != nil { 94 77 return err 95 78 } 96 - return svc.StartWithListener(li, logWriter) 79 + return svc.startWithListener(li) 97 80 } 98 81 99 - func (svc *Service) StartWithListener(listen net.Listener, logWriter io.Writer) error { 82 + func (svc *Service) startWithListener(listen net.Listener) error { 100 83 e := echo.New() 101 - e.Logger.SetOutput(logWriter) 102 84 e.HideBanner = true 103 85 104 86 e.Use(middleware.CORSWithConfig(middleware.CORSConfig{ ··· 106 88 AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization}, 107 89 })) 108 90 109 - if !svc.ssl { 91 + if !svc.relay.Config.SSL { 110 92 e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ 111 93 Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n", 112 94 })) ··· 128 110 if err2 := ctx.JSON(err.Code, map[string]any{ 129 111 "error": err.Message, 130 112 }); err2 != nil { 131 - svc.log.Error("Failed to write http error", "err", err2) 113 + svc.logger.Error("Failed to write http error", "err", err2) 132 114 } 133 115 default: 134 116 sendHeader := true ··· 136 118 sendHeader = false 137 119 } 138 120 139 - svc.log.Warn("HANDLER ERROR: (%s) %s", ctx.Path(), err) 121 + svc.logger.Warn("HANDLER ERROR: (%s) %s", ctx.Path(), err) 140 122 141 123 if strings.HasPrefix(ctx.Path(), "/admin/") { 142 124 ctx.JSON(500, map[string]any{ ··· 152 134 } 153 135 154 136 // TODO: this API is temporary until we formalize what we want here 137 + e.GET("/", svc.HandleHomeMessage) 138 + e.GET("/_health", svc.HandleHealthCheck) 139 + e.GET("/xrpc/_health", svc.HandleHealthCheck) 155 140 156 141 e.GET("/xrpc/com.atproto.sync.subscribeRepos", svc.relay.EventsHandler) 157 142 ··· 159 144 e.GET("/xrpc/com.atproto.sync.listRepos", svc.HandleComAtprotoSyncListRepos) 160 145 e.GET("/xrpc/com.atproto.sync.getRepo", svc.HandleComAtprotoSyncGetRepo) // just returns 3xx redirect to source PDS 161 146 e.GET("/xrpc/com.atproto.sync.getLatestCommit", svc.HandleComAtprotoSyncGetLatestCommit) 162 - e.GET("/xrpc/_health", svc.HandleHealthCheck) 163 - e.GET("/_health", svc.HandleHealthCheck) 164 - e.GET("/", svc.HandleHomeMessage) 165 147 166 148 admin := e.Group("/admin", svc.checkAdminAuth) 167 149 ··· 212 194 return errs 213 195 } 214 196 215 - type HealthStatus struct { 216 - Status string `json:"status"` 217 - Message string `json:"msg,omitempty"` 218 - } 219 - 220 - func (svc *Service) HandleHealthCheck(c echo.Context) error { 221 - if err := svc.relay.Healthcheck(); err != nil { 222 - svc.log.Error("healthcheck can't connect to database", "err", err) 223 - return c.JSON(500, HealthStatus{Status: "error", Message: "can't connect to database"}) 224 - } else { 225 - return c.JSON(200, HealthStatus{Status: "ok"}) 226 - } 227 - } 228 - 229 - var homeMessage string = ` 230 - .########..########.##..........###....##....## 231 - .##.....##.##.......##.........##.##....##..##. 232 - .##.....##.##.......##........##...##....####.. 233 - .########..######...##.......##.....##....##... 234 - .##...##...##.......##.......#########....##... 235 - .##....##..##.......##.......##.....##....##... 236 - .##.....##.########.########.##.....##....##... 237 - 238 - This is an atproto [https://atproto.com] relay instance, running the 'relay' codebase [https://github.com/bluesky-social/indigo] 239 - 240 - The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos 241 - ` 242 - 243 - func (svc *Service) HandleHomeMessage(c echo.Context) error { 244 - return c.String(http.StatusOK, homeMessage) 245 - } 246 - 247 - const authorizationBearerPrefix = "Bearer " 248 - 249 197 func (svc *Service) checkAdminAuth(next echo.HandlerFunc) echo.HandlerFunc { 198 + headerVal := "Basic " + base64.StdEncoding.EncodeToString([]byte("admin:"+svc.config.AdminPassword)) 250 199 return func(e echo.Context) error { 251 - authheader := e.Request().Header.Get("Authorization") 252 - if !strings.HasPrefix(authheader, authorizationBearerPrefix) { 200 + if svc.config.AdminPassword != headerVal { 253 201 return echo.ErrForbidden 254 202 } 255 - 256 - token := authheader[len(authorizationBearerPrefix):] 257 - 258 - if svc.config.AdminToken != token { 259 - return echo.ErrForbidden 260 - } 261 - 262 203 return next(e) 263 204 } 264 205 }
+2 -14
cmd/relayered/stream/persist/diskpersist/diskpersist.go
··· 40 40 41 41 eventCounter int64 42 42 curSeq int64 43 - timeSequence bool 44 43 45 44 uids UidSource 46 45 uidCache *arc.ARCCache[models.Uid, string] // TODO: unused ··· 86 85 Retention time.Duration 87 86 88 87 Logger *slog.Logger 89 - 90 - TimeSequence bool 91 88 } 92 89 93 90 func DefaultDiskPersistOptions() *DiskPersistOptions { ··· 147 144 outbuf: new(bytes.Buffer), 148 145 writeBufferSize: opts.WriteBufferSize, 149 146 shutdown: make(chan struct{}), 150 - timeSequence: opts.TimeSequence, 151 147 log: opts.Logger, 152 148 } 153 149 if dp.log == nil { ··· 198 194 return fmt.Errorf("failed to scan log file for last seqno: %w", err) 199 195 } 200 196 201 - dp.log.Info("loaded seq", "seq", seq, "now", time.Now().UnixMicro(), "time-seq", dp.timeSequence) 197 + dp.log.Info("loaded seq", "seq", seq, "now", time.Now().UnixMicro()) 202 198 203 199 dp.curSeq = seq + 1 204 200 dp.logfi = fi ··· 469 465 470 466 func (dp *DiskPersistence) doPersist(ctx context.Context, pjob persistJob) error { 471 467 seq := dp.curSeq 472 - if dp.timeSequence { 473 - seq = time.Now().UnixMicro() 474 - if seq < dp.curSeq { 475 - seq = dp.curSeq 476 - } 477 - dp.curSeq = seq + 1 478 - } else { 479 - dp.curSeq++ 480 - } 468 + dp.curSeq++ 481 469 482 470 // Set sequence number in event header 483 471 // the rest of the header is set in DiskPersistence.Persist()
+1 -1
cmd/relayered/stubs.go
··· 115 115 if errors.Is(err, gorm.ErrRecordNotFound) { 116 116 return c.JSON(http.StatusNotFound, XRPCError{Message: "NULL"}) 117 117 } 118 - s.log.Error("user.pds.host lookup", "err", err) 118 + s.logger.Error("user.pds.host lookup", "err", err) 119 119 return c.JSON(http.StatusInternalServerError, XRPCError{Message: "sorry"}) 120 120 } 121 121