this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

some tracing

dholms 16ac57aa 2bce4847

+133
+8
cmd/nexus/crawler.go
··· 10 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 11 "github.com/bluesky-social/indigo/cmd/nexus/models" 12 12 "github.com/bluesky-social/indigo/xrpc" 13 + "go.opentelemetry.io/otel/attribute" 13 14 "gorm.io/gorm" 14 15 ) 15 16 ··· 21 22 22 23 // EnumerateNetwork discovers and tracks all repositories on the network. 23 24 func (c *Crawler) EnumerateNetwork(ctx context.Context) error { 25 + ctx, span := tracer.Start(ctx, "EnumerateNetwork") 26 + defer span.End() 27 + 24 28 cursor, err := c.getListReposCursor(ctx) 25 29 if err != nil { 26 30 return err ··· 99 103 100 104 // EnumerateNetworkByCollection discovers repositories that have records in the specified collection. 101 105 func (c *Crawler) EnumerateNetworkByCollection(ctx context.Context, collection string) error { 106 + ctx, span := tracer.Start(ctx, "EnumerateNetworkByCollection") 107 + span.SetAttributes(attribute.String("collection", collection)) 108 + defer span.End() 109 + 102 110 cursor, err := c.getCollectionCursor(ctx, collection) 103 111 if err != nil { 104 112 return err
+20
cmd/nexus/main.go
··· 30 30 31 31 app.Flags = []cli.Flag{ 32 32 &cli.StringFlag{ 33 + Name: "env", 34 + Usage: "environment name for observability", 35 + Value: "dev", 36 + EnvVars: []string{"NEXUS_ENV"}, 37 + }, 38 + &cli.BoolFlag{ 39 + Name: "enable-jaeger-tracing", 40 + }, 41 + &cli.BoolFlag{ 42 + Name: "enable-otel-tracing", 43 + }, 44 + &cli.StringFlag{ 45 + Name: "otel-exporter-otlp-endpoint", 46 + EnvVars: []string{"OTEL_EXPORTER_OTLP_ENDPOINT"}, 47 + }, 48 + &cli.StringFlag{ 33 49 Name: "db-path", 34 50 Usage: "path to SQLite database file", 35 51 Value: "./nexus.db", ··· 107 123 ctx, cancel := context.WithCancel(cctx.Context) 108 124 logger := configLogger(cctx, os.Stdout) 109 125 slog.SetDefault(logger) 126 + 127 + if err := setupOTEL(cctx); err != nil { 128 + return err 129 + } 110 130 111 131 signals := make(chan os.Signal, 1) 112 132 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
+84
cmd/nexus/otel.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + "os" 7 + "time" 8 + 9 + "github.com/urfave/cli/v2" 10 + "go.opentelemetry.io/otel" 11 + "go.opentelemetry.io/otel/attribute" 12 + "go.opentelemetry.io/otel/exporters/jaeger" 13 + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" 14 + "go.opentelemetry.io/otel/sdk/resource" 15 + tracesdk "go.opentelemetry.io/otel/sdk/trace" 16 + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" 17 + ) 18 + 19 + func setupOTEL(cctx *cli.Context) error { 20 + 21 + env := cctx.String("env") 22 + if env == "" { 23 + env = "dev" 24 + } 25 + 26 + if cctx.Bool("enable-jaeger-tracing") { 27 + jaegerUrl := "http://localhost:14268/api/traces" 28 + exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerUrl))) 29 + if err != nil { 30 + return err 31 + } 32 + tp := tracesdk.NewTracerProvider( 33 + tracesdk.WithBatcher(exp), 34 + tracesdk.WithResource(resource.NewWithAttributes( 35 + semconv.SchemaURL, 36 + semconv.ServiceNameKey.String("nexus"), 37 + attribute.String("env", env), 38 + attribute.String("environment", env), 39 + attribute.Int64("ID", 1), 40 + )), 41 + ) 42 + 43 + otel.SetTracerProvider(tp) 44 + } 45 + 46 + // Enable OTLP HTTP exporter 47 + // For relevant environment variables: 48 + // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables 49 + // At a minimum, you need to set 50 + // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 51 + if cctx.Bool("enable-otel-tracing") { 52 + ep := cctx.String("otel-exporter-otlp-endpoint") 53 + slog.Info("setting up trace exporter", "endpoint", ep) 54 + ctx, cancel := context.WithCancel(context.Background()) 55 + defer cancel() 56 + 57 + exp, err := otlptracehttp.New(ctx) 58 + if err != nil { 59 + slog.Error("failed to create trace exporter", "error", err) 60 + os.Exit(1) 61 + } 62 + defer func() { 63 + ctx, cancel := context.WithTimeout(context.Background(), time.Second) 64 + defer cancel() 65 + if err := exp.Shutdown(ctx); err != nil { 66 + slog.Error("failed to shutdown trace exporter", "error", err) 67 + } 68 + }() 69 + 70 + tp := tracesdk.NewTracerProvider( 71 + tracesdk.WithBatcher(exp), 72 + tracesdk.WithResource(resource.NewWithAttributes( 73 + semconv.SchemaURL, 74 + semconv.ServiceNameKey.String("nexus"), 75 + attribute.String("env", env), 76 + attribute.String("environment", env), 77 + attribute.Int64("ID", 1), 78 + )), 79 + ) 80 + otel.SetTracerProvider(tp) 81 + } 82 + 83 + return nil 84 + }
+12
cmd/nexus/processor.go
··· 14 14 "github.com/bluesky-social/indigo/atproto/repo" 15 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 16 "github.com/bluesky-social/indigo/cmd/nexus/models" 17 + "go.opentelemetry.io/otel" 17 18 "gorm.io/gorm" 18 19 "gorm.io/gorm/clause" 19 20 ) 21 + 22 + var tracer = otel.Tracer("nexus") 20 23 21 24 type EventProcessor struct { 22 25 Logger *slog.Logger ··· 33 36 34 37 // ProcessCommit validates and applies a commit event from the firehose. 35 38 func (ep *EventProcessor) ProcessCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 39 + ctx, span := tracer.Start(ctx, "ProcessCommit") 40 + defer span.End() 41 + 36 42 defer ep.lastSeq.Store(evt.Seq) 37 43 38 44 curr, err := ep.GetRepoState(evt.Repo) ··· 170 176 171 177 // ProcessSync handles sync events and marks repos for resync if needed. 172 178 func (ep *EventProcessor) ProcessSync(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) error { 179 + ctx, span := tracer.Start(ctx, "ProcessSync") 180 + defer span.End() 181 + 173 182 defer ep.lastSeq.Store(evt.Seq) 174 183 175 184 curr, err := ep.GetRepoState(evt.Did) ··· 220 229 221 230 // RefreshIdentity fetches the latest identity information for a DID. 222 231 func (ep *EventProcessor) RefreshIdentity(ctx context.Context, did string) error { 232 + ctx, span := tracer.Start(ctx, "RefreshIdentity") 233 + defer span.End() 234 + 223 235 curr, err := ep.GetRepoState(did) 224 236 if err != nil { 225 237 return err
+9
cmd/nexus/resync.go
··· 15 15 "github.com/bluesky-social/indigo/cmd/nexus/models" 16 16 "github.com/bluesky-social/indigo/xrpc" 17 17 "github.com/ipfs/go-cid" 18 + "go.opentelemetry.io/otel/attribute" 18 19 "gorm.io/gorm" 19 20 "gorm.io/gorm/clause" 20 21 ) ··· 69 70 } 70 71 71 72 func (n *Nexus) resyncDid(ctx context.Context, did string) error { 73 + ctx, span := tracer.Start(ctx, "resyncDid") 74 + span.SetAttributes(attribute.String("did", did)) 75 + defer span.End() 76 + 72 77 n.logger.Info("starting resync", "did", did) 73 78 74 79 err := n.EventProcessor.RefreshIdentity(ctx, did) ··· 91 96 const batchSize = 100 92 97 93 98 func (n *Nexus) doResync(ctx context.Context, did string) (bool, error) { 99 + ctx, span := tracer.Start(ctx, "doResync") 100 + span.SetAttributes(attribute.String("did", did)) 101 + defer span.End() 102 + 94 103 ident, err := n.Dir.LookupDID(ctx, syntax.DID(did)) 95 104 if err != nil { 96 105 return false, fmt.Errorf("failed to resolve DID: %w", err)