this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 174 lines 4.4 kB view raw
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log" 7 "log/slog" 8 "net/http" 9 "os" 10 "os/signal" 11 "sync" 12 "syscall" 13 14 _ "net/http/pprof" 15 16 "github.com/bluesky-social/indigo/querycheck" 17 "github.com/bluesky-social/indigo/util/tracing" 18 19 "github.com/earthboundkid/versioninfo/v2" 20 "github.com/labstack/echo-contrib/pprof" 21 "github.com/labstack/echo/v4" 22 "github.com/labstack/echo/v4/middleware" 23 "github.com/prometheus/client_golang/prometheus/promhttp" 24 "github.com/urfave/cli/v3" 25 "go.opentelemetry.io/otel/trace" 26) 27 28func main() { 29 app := cli.Command{ 30 Name: "querycheck", 31 Usage: "a postgresql query plan checker", 32 Version: versioninfo.Short(), 33 } 34 35 app.Flags = []cli.Flag{ 36 &cli.StringFlag{ 37 Name: "postgres-url", 38 Usage: "postgres url for storing events", 39 Value: "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable", 40 Sources: cli.EnvVars("POSTGRES_URL"), 41 }, 42 &cli.IntFlag{ 43 Name: "port", 44 Usage: "port to serve metrics on", 45 Value: 8080, 46 Sources: cli.EnvVars("PORT"), 47 }, 48 &cli.StringFlag{ 49 Name: "auth-token", 50 Usage: "auth token for accessing the querycheck api", 51 Value: "", 52 Sources: cli.EnvVars("AUTH_TOKEN"), 53 }, 54 } 55 56 app.Action = Querycheck 57 58 if err := app.Run(context.Background(), os.Args); err != nil { 59 log.Fatal(err) 60 } 61} 62 63var tracer trace.Tracer 64 65// Querycheck is the main function for querycheck 66func Querycheck(ctx context.Context, cmd *cli.Command) error { 67 ctx, cancel := context.WithCancel(ctx) 68 defer cancel() 69 70 // Trap SIGINT to trigger a shutdown. 71 signals := make(chan os.Signal, 1) 72 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 73 74 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 75 Level: slog.LevelInfo, 76 })) 77 defer func() { 78 logger.Info("main function teardown") 79 }() 80 81 logger = logger.With("source", "querycheck_main") 82 logger.Info("starting querycheck") 83 84 // Registers a tracer Provider globally if the exporter endpoint is set 85 if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") != "" { 86 logger.Info("initializing tracer") 87 shutdown, err := tracing.InstallExportPipeline(ctx, "Querycheck", 1) 88 if err != nil { 89 log.Fatal(err) 90 } 91 defer func() { 92 if err := shutdown(ctx); err != nil { 93 log.Fatal(err) 94 } 95 }() 96 } 97 98 wg := sync.WaitGroup{} 99 100 // HTTP Server setup and Middleware Plumbing 101 e := echo.New() 102 e.HideBanner = true 103 e.HidePort = true 104 pprof.Register(e) 105 e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) 106 e.Use(middleware.LoggerWithConfig(middleware.DefaultLoggerConfig)) 107 108 // Start the query checker 109 querychecker, err := querycheck.NewQuerychecker(ctx, cmd.String("postgres-url")) 110 if err != nil { 111 log.Fatalf("failed to create querychecker: %+v\n", err) 112 } 113 114 // getLikeCountQuery := `SELECT * 115 // FROM like_counts 116 // WHERE actor_did = 'did:plc:q6gjnaw2blty4crticxkmujt' 117 // AND ns = 'app.bsky.feed.post' 118 // AND rkey = '3k3jf5lgbsw24' 119 // LIMIT 1;` 120 121 // querychecker.AddQuery(ctx, "get_like_count", getLikeCountQuery, time.Second*20) 122 123 err = querychecker.Start() 124 if err != nil { 125 log.Fatalf("failed to start querychecker: %+v\n", err) 126 } 127 128 e.Use(func(next echo.HandlerFunc) echo.HandlerFunc { 129 return func(c echo.Context) error { 130 if cmd.String("auth-token") != "" && c.Request().Header.Get("Authorization") != cmd.String("auth-token") { 131 return c.String(http.StatusUnauthorized, "unauthorized") 132 } 133 return next(c) 134 } 135 }) 136 137 e.GET("/query", querychecker.HandleGetQuery) 138 e.GET("/queries", querychecker.HandleGetQueries) 139 e.POST("/query", querychecker.HandleAddQuery) 140 e.PUT("/query", querychecker.HandleUpdateQuery) 141 e.DELETE("/query", querychecker.HandleDeleteQuery) 142 143 // Start the metrics server 144 wg.Add(1) 145 go func() { 146 logger.Info("starting metrics serverd", "port", cmd.Int("port")) 147 if err := e.Start(fmt.Sprintf(":%d", cmd.Int("port"))); err != nil { 148 logger.Error("failed to start metrics server", "err", err) 149 } 150 wg.Done() 151 }() 152 153 select { 154 case <-signals: 155 cancel() 156 fmt.Println("shutting down on signal") 157 case <-ctx.Done(): 158 fmt.Println("shutting down on context done") 159 } 160 161 logger.Info("shutting down, waiting for workers to clean up") 162 163 if err := e.Shutdown(ctx); err != nil { 164 logger.Error("failed to shut down metrics server", "err", err) 165 wg.Done() 166 } 167 168 querychecker.Stop() 169 170 wg.Wait() 171 logger.Info("shut down successfully") 172 173 return nil 174}