relay tidy (#991)

- removed a dead-code function
- removed lingering "bigsky" references
- removed unused "MemPersister" (it is still in the top-level events package)
- move settings (using gorm/db) and otel code out of main.go (makes main.go shorter)
- group imports consistently
- replace some "BGS" terminology with "relay"
- update README

authored by bnewbold, committed by GitHub (0246af60, 4bd8faf4)

+182 -322
**HACKING.md** (+3 -2)

```diff
 ## git repo contents
 
-Run with, eg, `go run ./cmd/bigsky`):
+Run with, eg, `go run ./cmd/relay`):
 
-- `cmd/bigsky`: Relay+indexer daemon
+- `cmd/bigsky`: relay daemon
+- `cmd/relay`: new (sync v1.1) relay daemon
 - `cmd/palomar`: search indexer and query servcie (OpenSearch)
 - `cmd/gosky`: client CLI for talking to a PDS
 - `cmd/lexgen`: codegen tool for lexicons (Lexicon JSON to Go package)
```
**Makefile** (+1)

```diff
 build: ## Build all executables
 	go build ./cmd/gosky
 	go build ./cmd/bigsky
+	go build ./cmd/relay
 	go build ./cmd/beemo
 	go build ./cmd/lexgen
 	go build ./cmd/stress
```
**README.md** (+1 -1)

```diff
 **Go Services:**
 
-- **bigsky** ([README](./cmd/bigsky/README.md)): "Big Graph Service" (BGS) reference implementation, running at `bsky.network`
+- **bigsky** ([README](./cmd/bigsky/README.md)): relay reference implementation, running at `bsky.network`
 - **palomar** ([README](./cmd/palomar/README.md)): fulltext search service for <https://bsky.app>
 - **hepa** ([README](./cmd/hepa/README.md)): auto-moderation bot for [Ozone](https://ozone.tools)
```
**cmd/relay/README.md** (+16 -83)

````diff
 atproto Relay Service
 ===============================
 
-*NOTE: "Relays" used to be called "Big Graph Servers", or "BGS", or "bigsky". Many variables and packages still reference "bgs"*
+*NOTE: "relays" used to be called "Big Graph Servers", or "BGS", or "bigsky". Many variables and packages still reference "bgs"*
 
-This is the implementation of an atproto Relay which is running in the production network, written and operated by Bluesky.
+This is the implementation of an atproto relay which is running in the production network, written and operated by Bluesky.
 
-In atproto, a Relay subscribes to multiple PDS hosts and outputs a combined "firehose" event stream. Downstream services can subscribe to this single firehose a get all relevant events for the entire network, or a specific sub-graph of the network. The Relay maintains a mirror of repo data from all accounts on the upstream PDS instances, and verifies repo data structure integrity and identity signatures. It is agnostic to applications, and does not validate data against atproto Lexicon schemas.
+In atproto, a relay subscribes to multiple PDS hosts and outputs a combined "firehose" event stream. Downstream services can subscribe to this single firehose a get all relevant events for the entire network, or a specific sub-graph of the network. The relay maintains a mirror of repo data from all accounts on the upstream PDS instances, and verifies repo data structure integrity and identity signatures. It is agnostic to applications, and does not validate data against atproto Lexicon schemas.
 
-This Relay implementation is designed to subscribe to the entire global network. The current state of the codebase is informally expected to scale to around 50 million accounts in the network, and thousands of repo events per second (peak).
+This relay implementation is designed to subscribe to the entire global network. The current state of the codebase is informally expected to scale to around 100 million accounts in the network, and tens of thousands of repo events per second (peak).
 
 Features and design decisions:
···
 - observability: logging, prometheus metrics, OTEL traces
 - admin web interface: configure limits, add upstream PDS instances, etc
 
-This software is not as packaged, documented, and supported for self-hosting as our PDS distribution or Ozone service. But it is relatively simple and inexpensive to get running.
+This software is not yet as packaged, documented, and supported for self-hosting as our PDS distribution or Ozone service. But it is relatively simple and inexpensive to get running.
 
-A note and reminder about Relays in general are that they are more of a convenience in the protocol than a hard requirement. The "firehose" API is the exact same on the PDS and on a Relay. Any service which subscribes to the Relay could instead connect to one or more PDS instances directly.
+A note and reminder about relays in general are that they are more of a convenience in the protocol than a hard requirement. The "firehose" API is the exact same on the PDS and on a relay. Any service which subscribes to the relay could instead connect to one or more PDS instances directly.
 
 
 ## Development Tips
 
 The README and Makefile at the top level of this git repo have some generic helpers for testing, linting, formatting code, etc.
 
-To re-build and run the Relay locally:
+To re-build and run the relay locally:
 
     make run-dev-relay
···
     RELAY_ADMIN_KEY=localdev go run ./cmd/relay/ --help
 
-By default, the daemon will use sqlite for databases (in the directory `./data/bigsky/`), CAR data will be stored as individual shard files in `./data/bigsky/carstore/`), and the HTTP API will be bound to localhost port 2470.
+By default, the daemon will use sqlite for databases (in the directory `./data/relay/`) and the HTTP API will be bound to localhost port 2470.
 
 When the daemon isn't running, sqlite database files can be inspected with:
 
-    sqlite3 data/bigsky/bgs.sqlite
+    sqlite3 data/relay/relay.sqlite
     [...]
     sqlite> .schema
 
 Wipe all local data:
 
     # careful! double-check this destructive command
-    rm -rf ./data/bigsky/*
+    rm -rf ./data/relay/*
 
 There is a basic web dashboard, though it will not be included unless built and copied to a local directory `./public/`. Run `make build-relay-ui`, and then when running the daemon the dashboard will be available at: <http://localhost:2470/dash/>. Paste in the admin key, eg `localdev`.
···
 ## Docker Containers
 
-One way to deploy is running a docker image. You can pull and/or run a specific version of bigsky, referenced by git commit, from the Bluesky Github container registry. For example:
+One way to deploy is running a docker image. You can pull and/or run a specific version of relay, referenced by git commit, from the Bluesky Github container registry. For example:
 
     docker pull ghcr.io/bluesky-social/indigo:relay-fd66f93ce1412a3678a1dd3e6d53320b725978a6
     docker run ghcr.io/bluesky-social/indigo:relay-fd66f93ce1412a3678a1dd3e6d53320b725978a6
 
-There is a Dockerfile in this directory, which can be used to build customized/patched versions of the Relay as a container, republish them, run locally, deploy to servers, deploy to an orchestrated cluster, etc. See docs and guides for docker and cluster management systems for details.
+There is a Dockerfile in this directory, which can be used to build customized/patched versions of the relay as a container, republish them, run locally, deploy to servers, deploy to an orchestrated cluster, etc. See docs and guides for docker and cluster management systems for details.
 
 
 ## Database Setup
 
-PostgreSQL and Sqlite are both supported. When using Sqlite, separate files are used for Relay metadata and CarStore metadata. With PostgreSQL a single database server, user, and logical database can all be reused: table names will not conflict.
-
-Database configuration is passed via the `DATABASE_URL` and `CARSTORE_DATABASE_URL` environment variables, or the corresponding CLI args.
+PostgreSQL and Sqlite are both supported. Database configuration is passed via the `DATABASE_URL` environment variable, or the corresponding CLI arg.
 
 For PostgreSQL, the user and database must already be configured. Some example SQL commands are:
 
-    CREATE DATABASE bgs;
-    CREATE DATABASE carstore;
+    CREATE DATABASE relay;
 
     CREATE USER ${username} WITH PASSWORD '${password}';
-    GRANT ALL PRIVILEGES ON DATABASE bgs TO ${username};
-    GRANT ALL PRIVILEGES ON DATABASE carstore TO ${username};
+    GRANT ALL PRIVILEGES ON DATABASE relay TO ${username};
 
 This service currently uses `gorm` to automatically run database migrations as the regular user. There is no concept of running a separate set of migrations under more privileged database user.
 
 
 ## Deployment
 
-*NOTE: this is not a complete guide to operating a Relay. There are decisions to be made and communicated about policies, bandwidth use, PDS crawling and rate-limits, financial sustainability, etc, which are not covered here. This is just a quick overview of how to technically get a relay up and running.*
+*NOTE: this is not a complete guide to operating a relay. There are decisions to be made and communicated about policies, bandwidth use, PDS crawling and rate-limits, financial sustainability, etc, which are not covered here. This is just a quick overview of how to technically get a relay up and running.*
 
 In a real-world system, you will probably want to use PostgreSQL.
···
 - `ENVIRONMENT`: eg, `production`
 - `DATABASE_URL`: see section below
-- `DATA_DIR`: misc data will go in a subdirectory
 - `GOLOG_LOG_LEVEL`: log verbosity
-- `RESOLVE_ADDRESS`: DNS server to use
-- `FORCE_DNS_UDP`: recommend "true"
 
 There is a health check endpoint at `/xrpc/_health`. Prometheus metrics are exposed by default on port 2471, path `/metrics`. The service logs fairly verbosely to stderr; use `GOLOG_LOG_LEVEL` to control log volume.
-
-As a rough guideline for the compute resources needed to run a full-network Relay, in June 2024 an example Relay for over 5 million repositories used:
-
-- roughly 1 TByte of disk for PostgreSQL
-- roughly 1 TByte of disk for event playback buffer
-- roughly 5k disk I/O operations per second (all combined)
-- roughly 100% of one CPU core (quite low CPU utilization)
-- roughly 5GB of RAM for `relay`, and as much RAM as available for PostgreSQL and page cache
-- on the order of 1 megabit inbound bandwidth (crawling PDS instances) and 1 megabit outbound per connected client. 1 mbit continuous is approximately 350 GByte/month
 
 Be sure to double-check bandwidth usage and pricing if running a public relay! Bandwidth prices can vary widely between providers, and popular cloud services (AWS, Google Cloud, Azure) are very expensive compared to alternatives like OVH or Hetzner.
···
 POST `?did={did:...}` to reverse a repo take-down
 
-### /admin/repo/compact
-
-POST `?did={did:...}` to compact a repo. Optionally `&fast=true`. HTTP blocks until the compaction finishes.
-
-### /admin/repo/compactAll
-
-POST to begin compaction of all repos. Optional query params:
-
-* `fast=true`
-* `limit={int}` maximum number of repos to compact (biggest first) (default 50)
-* `threhsold={int}` minimum number of shard files a repo must have on disk to merit compaction (default 20)
-
-### /admin/repo/reset
-
-POST `?did={did:...}` deletes all local data for the repo
-
-### /admin/repo/verify
-
-POST `?did={did:...}` checks that all repo data is accessible. HTTP blocks until done.
-
 ### /admin/pds/requestCrawl
 
 POST `{"hostname":"pds host"}` to start crawling a PDS
···
     "CrawlRate": {"Max": float, "Window": float seconds},
     "UserCount": int,
 }, ...]
-```
-
-### /admin/pds/resync
-
-POST `?host={host}` to start a resync of a PDS
-
-GET `?host={host}` to get status of a PDS resync, return
-
-```json
-{"resync": {
-    "pds": {
-        "Host": string,
-        "Did": string,
-        "SSL": bool,
-        "Cursor": int,
-        "Registered": bool,
-        "Blocked": bool,
-        "RateLimit": float,
-        "CrawlRateLimit": float,
-        "RepoCount": int,
-        "RepoLimit": int,
-        "HourlyEventLimit": int,
-        "DailyEventLimit": int,
-    },
-    "numRepoPages": int,
-    "numRepos": int,
-    "numReposChecked": int,
-    "numReposToResync": int,
-    "status": string,
-    "statusChangedAt": time,
-}}
 ```
 
 ### /admin/pds/changeLimits
````
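The admin endpoints documented above are plain HTTP calls authenticated with the admin key (`RELAY_ADMIN_KEY`). As a rough sketch, not code from this repo, here is how a small Go client might issue `requestCrawl`; the `Authorization: Bearer` header format is an assumption for illustration, and the `httptest` server stands in for a live relay:

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// crawlRequestBody builds the JSON body the README documents for
// /admin/pds/requestCrawl: {"hostname":"pds host"}.
func crawlRequestBody(host string) string {
	return fmt.Sprintf(`{"hostname":%q}`, host)
}

// requestCrawl POSTs the body to the relay's admin API. The Bearer auth
// scheme here is an assumption, not something this PR confirms.
func requestCrawl(relayURL, adminKey, pdsHost string) error {
	req, err := http.NewRequest("POST", relayURL+"/admin/pds/requestCrawl",
		strings.NewReader(crawlRequestBody(pdsHost)))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+adminKey)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("requestCrawl failed: %s", resp.Status)
	}
	return nil
}

func main() {
	// Stand-in server so the sketch runs without a live relay.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	if err := requestCrawl(srv.URL, "localdev", "pds.example.com"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("crawl requested")
}
```

The same pattern applies to the other admin endpoints; only the path and body change.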
**cmd/relay/bgs/bgs.go** (+1 -10)

```diff
 	comatproto "github.com/bluesky-social/indigo/api/atproto"
 	"github.com/bluesky-social/indigo/cmd/relay/events"
 	"github.com/bluesky-social/indigo/cmd/relay/models"
-	lexutil "github.com/bluesky-social/indigo/lex/util"
 	"github.com/bluesky-social/indigo/xrpc"
 
 	"github.com/gorilla/websocket"
···
 .##....##..##.......##.......##.....##....##...
 .##.....##.########.########.##.....##....##...
 
-This is an atproto [https://atproto.com] relay instance, running the 'bigsky' codebase [https://github.com/bluesky-social/indigo]
+This is an atproto [https://atproto.com] relay instance, running the 'relay' codebase [https://github.com/bluesky-social/indigo]
 
 The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos
 `
···
 	}
 
 	return &u, nil
 }
-
-func stringLink(lnk *lexutil.LexLink) string {
-	if lnk == nil {
-		return "<nil>"
-	}
-
-	return lnk.String()
-}
 
 // handleFedEvent() is the callback passed to Slurper called from Slurper.handleConnection()
```
**cmd/relay/events/cbor_gen.go** (+2 -2)

```diff
 	"math"
 	"sort"
 
-	cid "github.com/ipfs/go-cid"
+	"github.com/ipfs/go-cid"
 	cbg "github.com/whyrusleeping/cbor-gen"
-	xerrors "golang.org/x/xerrors"
+	"golang.org/x/xerrors"
 )
 
 var _ = xerrors.Errorf
```
**cmd/relay/events/consumer.go** (+2 -2)

```diff
 	"net"
 	"time"
 
-	"github.com/RussellLuo/slidingwindow"
 	comatproto "github.com/bluesky-social/indigo/api/atproto"
-	"github.com/prometheus/client_golang/prometheus"
 
+	"github.com/RussellLuo/slidingwindow"
 	"github.com/gorilla/websocket"
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 type RepoStreamCallbacks struct {
```
**cmd/relay/events/events.go** (+1 -1)

```diff
 	comatproto "github.com/bluesky-social/indigo/api/atproto"
 	"github.com/bluesky-social/indigo/cmd/relay/models"
 	lexutil "github.com/bluesky-social/indigo/lex/util"
-	"github.com/prometheus/client_golang/prometheus"
 
+	"github.com/prometheus/client_golang/prometheus"
 	cbg "github.com/whyrusleeping/cbor-gen"
 	"go.opentelemetry.io/otel"
 )
```
**cmd/relay/events/persist.go** (-81)

```diff
 
 import (
 	"context"
-	"fmt"
-	"sync"
 
 	"github.com/bluesky-social/indigo/cmd/relay/models"
 )
···
 
 	SetEventBroadcaster(func(*XRPCStreamEvent))
 }
-
-// MemPersister is the most naive implementation of event persistence
-// This EventPersistence option works fine with all event types
-// ill do better later
-type MemPersister struct {
-	buf []*XRPCStreamEvent
-	lk  sync.Mutex
-	seq int64
-
-	broadcast func(*XRPCStreamEvent)
-}
-
-func NewMemPersister() *MemPersister {
-	return &MemPersister{}
-}
-
-func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error {
-	mp.lk.Lock()
-	defer mp.lk.Unlock()
-	mp.seq++
-	switch {
-	case e.RepoCommit != nil:
-		e.RepoCommit.Seq = mp.seq
-	case e.RepoHandle != nil:
-		e.RepoHandle.Seq = mp.seq
-	case e.RepoIdentity != nil:
-		e.RepoIdentity.Seq = mp.seq
-	case e.RepoAccount != nil:
-		e.RepoAccount.Seq = mp.seq
-	case e.RepoMigrate != nil:
-		e.RepoMigrate.Seq = mp.seq
-	case e.RepoTombstone != nil:
-		e.RepoTombstone.Seq = mp.seq
-	case e.LabelLabels != nil:
-		e.LabelLabels.Seq = mp.seq
-	default:
-		panic("no event in persist call")
-	}
-	mp.buf = append(mp.buf, e)
-
-	mp.broadcast(e)
-
-	return nil
-}
-
-func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error {
-	mp.lk.Lock()
-	l := len(mp.buf)
-	mp.lk.Unlock()
-
-	if since >= int64(l) {
-		return nil
-	}
-
-	// TODO: abusing the fact that buf[0].seq is currently always 1
-	for _, e := range mp.buf[since:l] {
-		if err := cb(e); err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error {
-	return fmt.Errorf("repo takedowns not currently supported by memory persister, test usage only")
-}
-
-func (mp *MemPersister) Flush(ctx context.Context) error {
-	return nil
-}
-
-func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent)) {
-	mp.broadcast = brc
-}
-
-func (mp *MemPersister) Shutdown(context.Context) error {
-	return nil
-}
```
**cmd/relay/main.go** (+12 -140)

```diff
 package main
 
 import (
-	"context"
 	"crypto/rand"
 	"encoding/base64"
-	"errors"
 	"fmt"
-	"github.com/bluesky-social/indigo/atproto/identity"
-	"github.com/bluesky-social/indigo/cmd/relay/events/diskpersist"
-	"gorm.io/gorm"
 	"io"
 	"log/slog"
-	_ "net/http/pprof"
 	"net/url"
 	"os"
 	"os/signal"
 	"path/filepath"
-	"strconv"
 	"strings"
 	"syscall"
 	"time"
 
+	_ "github.com/joho/godotenv/autoload"
+	_ "go.uber.org/automaxprocs"
+	_ "net/http/pprof"
+
+	"github.com/bluesky-social/indigo/atproto/identity"
 	libbgs "github.com/bluesky-social/indigo/cmd/relay/bgs"
 	"github.com/bluesky-social/indigo/cmd/relay/events"
+	"github.com/bluesky-social/indigo/cmd/relay/events/diskpersist"
 	"github.com/bluesky-social/indigo/util"
 	"github.com/bluesky-social/indigo/util/cliutil"
 	"github.com/bluesky-social/indigo/xrpc"
 
-	_ "github.com/joho/godotenv/autoload"
-	_ "go.uber.org/automaxprocs"
-
 	"github.com/carlmjohnson/versioninfo"
 	"github.com/urfave/cli/v2"
-	"go.opentelemetry.io/otel"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/exporters/jaeger"
-	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
-	"go.opentelemetry.io/otel/sdk/resource"
-	tracesdk "go.opentelemetry.io/otel/sdk/trace"
-	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
 	"gorm.io/plugin/opentelemetry/tracing"
 )
···
 		},
 		&cli.StringFlag{
 			Name:    "db-url",
-			Usage:   "database connection string for BGS database",
-			Value:   "sqlite://./data/bigsky/bgs.sqlite",
+			Usage:   "database connection string for relay database",
+			Value:   "sqlite://./data/relay/relay.sqlite",
 			EnvVars: []string{"DATABASE_URL"},
 		},
 		&cli.BoolFlag{
···
 	return app.Run(os.Args)
 }
 
-func setupOTEL(cctx *cli.Context) error {
-
-	env := cctx.String("env")
-	if env == "" {
-		env = "dev"
-	}
-	if cctx.Bool("jaeger") {
-		jaegerUrl := "http://localhost:14268/api/traces"
-		exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerUrl)))
-		if err != nil {
-			return err
-		}
-		tp := tracesdk.NewTracerProvider(
-			// Always be sure to batch in production.
-			tracesdk.WithBatcher(exp),
-			// Record information about this application in a Resource.
-			tracesdk.WithResource(resource.NewWithAttributes(
-				semconv.SchemaURL,
-				semconv.ServiceNameKey.String("bgs"),
-				attribute.String("env", env),         // DataDog
-				attribute.String("environment", env), // Others
-				attribute.Int64("ID", 1),
-			)),
-		)
-
-		otel.SetTracerProvider(tp)
-	}
-
-	// Enable OTLP HTTP exporter
-	// For relevant environment variables:
-	// https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables
-	// At a minimum, you need to set
-	// OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
-	if ep := cctx.String("otel-exporter-otlp-endpoint"); ep != "" {
-		slog.Info("setting up trace exporter", "endpoint", ep)
-		ctx, cancel := context.WithCancel(context.Background())
-		defer cancel()
-
-		exp, err := otlptracehttp.New(ctx)
-		if err != nil {
-			slog.Error("failed to create trace exporter", "error", err)
-			os.Exit(1)
-		}
-		defer func() {
-			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-			defer cancel()
-			if err := exp.Shutdown(ctx); err != nil {
-				slog.Error("failed to shutdown trace exporter", "error", err)
-			}
-		}()
-
-		tp := tracesdk.NewTracerProvider(
-			tracesdk.WithBatcher(exp),
-			tracesdk.WithResource(resource.NewWithAttributes(
-				semconv.SchemaURL,
-				semconv.ServiceNameKey.String("bgs"),
-				attribute.String("env", env),         // DataDog
-				attribute.String("environment", env), // Others
-				attribute.Int64("ID", 1),
-			)),
-		)
-		otel.SetTracerProvider(tp)
-	}
-
-	return nil
-}
-
 func runRelay(cctx *cli.Context) error {
 	// Trap SIGINT to trigger a shutdown.
 	signals := make(chan os.Signal, 1)
···
 
 	ratelimitBypass := cctx.String("bsky-social-rate-limit-skip")
 
-	logger.Info("constructing bgs")
+	logger.Info("constructing relay service")
 	bgsConfig := libbgs.DefaultBGSConfig()
 	bgsConfig.SSL = !cctx.Bool("crawl-insecure-ws")
 	bgsConfig.ConcurrencyPerPDS = cctx.Int64("concurrency-per-pds")
···
 		logger.Info("received shutdown signal")
 		errs := bgs.Shutdown()
 		for err := range errs {
-			logger.Error("error during BGS shutdown", "err", err)
+			logger.Error("error during shutdown", "err", err)
 		}
 	case err := <-bgsErr:
 		if err != nil {
-			logger.Error("error during BGS startup", "err", err)
+			logger.Error("error during startup", "err", err)
 		}
 		logger.Info("shutting down")
 		errs := bgs.Shutdown()
 		for err := range errs {
-			logger.Error("error during BGS shutdown", "err", err)
+			logger.Error("error during shutdown", "err", err)
 		}
 	}
···
 		}
 	}
 }
-
-// RelaySetting is a gorm model
-type RelaySetting struct {
-	Name  string `gorm:"primarykey"`
-	Value string
-}
-
-func getRelaySetting(db *gorm.DB, name string) (value string, found bool, err error) {
-	var setting RelaySetting
-	dbResult := db.First(&setting, "name = ?", name)
-	if errors.Is(dbResult.Error, gorm.ErrRecordNotFound) {
-		return "", false, nil
-	}
-	if dbResult.Error != nil {
-		return "", false, dbResult.Error
-	}
-	return setting.Value, true, nil
-}
-
-func setRelaySetting(db *gorm.DB, name string, value string) error {
-	return db.Transaction(func(tx *gorm.DB) error {
-		var setting RelaySetting
-		found := tx.First(&setting, "name = ?", name)
-		if errors.Is(found.Error, gorm.ErrRecordNotFound) {
-			// ok! create it
-			setting.Name = name
-			setting.Value = value
-			return tx.Create(&setting).Error
-		} else if found.Error != nil {
-			return found.Error
-		}
-		setting.Value = value
-		return tx.Save(&setting).Error
-	})
-}
-
-func getRelaySettingBool(db *gorm.DB, name string) (value bool, found bool, err error) {
-	strval, found, err := getRelaySetting(db, name)
-	if err != nil || !found {
-		return false, found, err
-	}
-	value, err = strconv.ParseBool(strval)
-	if err != nil {
-		return false, false, err
-	}
-	return value, true, nil
-}
-func setRelaySettingBool(db *gorm.DB, name string, value bool) error {
-	return setRelaySetting(db, name, strconv.FormatBool(value))
-}
```
**cmd/relay/models/models.go** (+1)

```diff
 	"database/sql/driver"
 	"encoding/json"
 	"fmt"
+
 	"github.com/ipfs/go-cid"
 	"gorm.io/gorm"
 )
```
**cmd/relay/otel.go** (new file, +84)

```go
package main

import (
	"context"
	"log/slog"
	"os"
	"time"

	"github.com/urfave/cli/v2"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/jaeger"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/sdk/resource"
	tracesdk "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

func setupOTEL(cctx *cli.Context) error {

	env := cctx.String("env")
	if env == "" {
		env = "dev"
	}
	if cctx.Bool("jaeger") {
		jaegerUrl := "http://localhost:14268/api/traces"
		exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerUrl)))
		if err != nil {
			return err
		}
		tp := tracesdk.NewTracerProvider(
			// Always be sure to batch in production.
			tracesdk.WithBatcher(exp),
			// Record information about this application in a Resource.
			tracesdk.WithResource(resource.NewWithAttributes(
				semconv.SchemaURL,
				semconv.ServiceNameKey.String("bgs"),
				attribute.String("env", env),         // DataDog
				attribute.String("environment", env), // Others
				attribute.Int64("ID", 1),
			)),
		)

		otel.SetTracerProvider(tp)
	}

	// Enable OTLP HTTP exporter
	// For relevant environment variables:
	// https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables
	// At a minimum, you need to set
	// OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
	if ep := cctx.String("otel-exporter-otlp-endpoint"); ep != "" {
		slog.Info("setting up trace exporter", "endpoint", ep)
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		exp, err := otlptracehttp.New(ctx)
		if err != nil {
			slog.Error("failed to create trace exporter", "error", err)
			os.Exit(1)
		}
		defer func() {
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			defer cancel()
			if err := exp.Shutdown(ctx); err != nil {
				slog.Error("failed to shutdown trace exporter", "error", err)
			}
		}()

		tp := tracesdk.NewTracerProvider(
			tracesdk.WithBatcher(exp),
			tracesdk.WithResource(resource.NewWithAttributes(
				semconv.SchemaURL,
				semconv.ServiceNameKey.String("bgs"),
				attribute.String("env", env),         // DataDog
				attribute.String("environment", env), // Others
				attribute.Int64("ID", 1),
			)),
		)
		otel.SetTracerProvider(tp)
	}

	return nil
}
```
**cmd/relay/settings.go** (new file, +58)

```go
package main

import (
	"errors"
	"strconv"

	"gorm.io/gorm"
)

// RelaySetting is a gorm model
type RelaySetting struct {
	Name  string `gorm:"primarykey"`
	Value string
}

func getRelaySetting(db *gorm.DB, name string) (value string, found bool, err error) {
	var setting RelaySetting
	dbResult := db.First(&setting, "name = ?", name)
	if errors.Is(dbResult.Error, gorm.ErrRecordNotFound) {
		return "", false, nil
	}
	if dbResult.Error != nil {
		return "", false, dbResult.Error
	}
	return setting.Value, true, nil
}

func setRelaySetting(db *gorm.DB, name string, value string) error {
	return db.Transaction(func(tx *gorm.DB) error {
		var setting RelaySetting
		found := tx.First(&setting, "name = ?", name)
		if errors.Is(found.Error, gorm.ErrRecordNotFound) {
			// ok! create it
			setting.Name = name
			setting.Value = value
			return tx.Create(&setting).Error
		} else if found.Error != nil {
			return found.Error
		}
		setting.Value = value
		return tx.Save(&setting).Error
	})
}

func getRelaySettingBool(db *gorm.DB, name string) (value bool, found bool, err error) {
	strval, found, err := getRelaySetting(db, name)
	if err != nil || !found {
		return false, found, err
	}
	value, err = strconv.ParseBool(strval)
	if err != nil {
		return false, false, err
	}
	return value, true, nil
}

func setRelaySettingBool(db *gorm.DB, name string, value bool) error {
	return setRelaySetting(db, name, strconv.FormatBool(value))
}
```
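The helpers above return a `(value, found, err)` triple and encode booleans through `strconv`. A stdlib-only sketch of the same round-trip semantics, with a map standing in for the gorm-backed table (the setting name used below is hypothetical, chosen only for the demo):

```go
package main

import (
	"fmt"
	"strconv"
)

// memSettings mimics the RelaySetting table semantics with a plain map:
// lookups report whether the name exists, and bool values round-trip
// through strconv just like setRelaySettingBool/getRelaySettingBool.
type memSettings map[string]string

func (s memSettings) setBool(name string, v bool) {
	s[name] = strconv.FormatBool(v) // stored as the string "true" or "false"
}

func (s memSettings) getBool(name string) (value bool, found bool, err error) {
	strval, found := s[name]
	if !found {
		return false, false, nil
	}
	value, err = strconv.ParseBool(strval)
	if err != nil {
		return false, false, err
	}
	return value, true, nil
}

func main() {
	s := memSettings{}
	// "trackNewHosts" is a hypothetical setting name, for illustration only.
	if _, found, _ := s.getBool("trackNewHosts"); !found {
		fmt.Println("unset settings report found=false")
	}
	s.setBool("trackNewHosts", true)
	v, found, err := s.getBool("trackNewHosts")
	fmt.Println(v, found, err) // prints: true true <nil>
}
```

The `found` flag lets callers distinguish "setting never written" from "setting explicitly false", which a bare `bool` return could not express.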