this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

palomar: refactor more server code into separate file

+235 -135
+142 -132
cmd/palomar/main.go
··· 8 8 "os" 9 9 "strings" 10 10 11 - api "github.com/bluesky-social/indigo/api" 11 + _ "github.com/joho/godotenv/autoload" 12 + 12 13 cliutil "github.com/bluesky-social/indigo/cmd/gosky/util" 13 - "github.com/bluesky-social/indigo/xrpc" 14 - lru "github.com/hashicorp/golang-lru" 15 - logging "github.com/ipfs/go-log" 16 - "github.com/labstack/echo/v4" 17 - "github.com/labstack/echo/v4/middleware" 18 14 15 + "github.com/bluesky-social/indigo/version" 16 + logging "github.com/ipfs/go-log" 19 17 es "github.com/opensearch-project/opensearch-go/v2" 20 - 21 18 cli "github.com/urfave/cli/v2" 22 19 ) 23 20 24 - var log = logging.Logger("search") 21 + var log = logging.Logger("palomar") 25 22 26 23 func main() { 27 - app := cli.NewApp() 24 + if err := run(os.Args); err != nil { 25 + log.Fatal(err) 26 + } 27 + } 28 + 29 + func run(args []string) error { 30 + 31 + app := cli.App{ 32 + Name: "palomar", 33 + Usage: "search indexing and query service (using ES or OS)", 34 + Version: version.Version, 35 + } 36 + 37 + app.Flags = []cli.Flag{ 38 + &cli.StringFlag{ 39 + Name: "elastic-cert-file", 40 + Usage: "certificate file path", 41 + EnvVars: []string{"ES_CERT_FILE", "ELASTIC_CERT_FILE"}, 42 + }, 43 + &cli.StringFlag{ 44 + Name: "elastic-username", 45 + Usage: "elasticsearch username", 46 + Value: "elastic", 47 + EnvVars: []string{"ES_USERNAME", "ELASTIC_USERNAME"}, 48 + }, 49 + &cli.StringFlag{ 50 + Name: "elastic-password", 51 + Usage: "elasticsearch password", 52 + EnvVars: []string{"ES_PASSWORD", "ELASTIC_PASSWORD"}, 53 + }, 54 + &cli.StringFlag{ 55 + Name: "elastic-hosts", 56 + Usage: "elasticsearch hosts", 57 + Value: "localhost:9200", 58 + EnvVars: []string{"ES_HOSTS", "ELASTIC_HOSTS"}, 59 + }, 60 + &cli.StringFlag{ 61 + Name: "es-post-index", 62 + Usage: "ES index for 'post' documents", 63 + Value: "posts", 64 + EnvVars: []string{"ES_POST_INDEX"}, 65 + }, 66 + &cli.StringFlag{ 67 + Name: "es-profile-index", 68 + Usage: "ES index for 'profile' documents", 69 + Value: "profiles", 70 + EnvVars: []string{"ES_PROFILE_INDEX"}, 71 + }, 72 + &cli.StringFlag{ 73 + Name: "bgs-host", 74 + Usage: "hostname and port of BGS to subscribe to", 75 + Value: "https://bsky.social", 76 + EnvVars: []string{"ATP_BGS_HOST"}, 77 + }, 78 + &cli.StringFlag{ 79 + Name: "plc-host", 80 + Usage: "method, hostname, and port of PLC registry", 81 + Value: "https://plc.directory", 82 + EnvVars: []string{"ATP_PLC_HOST"}, 83 + }, 84 + // TODO(bnewbold): this is a temporary hack to fetch our own blobs 85 + &cli.StringFlag{ 86 + Name: "pds-host", 87 + Usage: "method, hostname, and port of PDS instance", 88 + Value: "https://bsky.social", 89 + EnvVars: []string{"ATP_PDS_HOST"}, 90 + }, 91 + } 28 92 29 - app.Flags = []cli.Flag{} 30 93 app.Commands = []*cli.Command{ 31 94 elasticCheckCmd, 32 95 searchCmd, 33 96 runCmd, 34 97 } 35 98 36 - app.RunAndExitOnError() 99 + return app.Run(args) 37 100 } 38 101 39 102 var runCmd = &cli.Command{ 40 - Name: "run", 103 + Name: "run", 104 + Usage: "combined indexing+query server", 41 105 Flags: []cli.Flag{ 42 106 &cli.StringFlag{ 43 - Name: "database-url", 107 + Name: "database-url", 108 + // XXX: data/palomar/search.db 44 109 Value: "sqlite://data/thecloud.db", 45 110 EnvVars: []string{"DATABASE_URL"}, 46 111 }, 47 - &cli.StringFlag{ 48 - Name: "atp-bgs-host", 49 - Required: true, 50 - EnvVars: []string{"ATP_BGS_HOST"}, 51 - }, 52 112 &cli.BoolFlag{ 53 113 Name: "readonly", 54 114 EnvVars: []string{"READONLY"}, 55 115 }, 56 116 &cli.StringFlag{ 57 - Name: "elastic-cert", 58 - }, 59 - &cli.StringFlag{ 60 - Name: "plc-host", 61 - Value: "https://plc.directory", 62 - }, 63 - &cli.StringFlag{ 64 - Name: "pds-host", 65 - Value: "https://bsky.social", 117 + Name: "bind", 118 + Usage: "IP or address, and port, to listen on for HTTP APIs", 119 + Value: ":3999", 120 + EnvVars: []string{"PALOMAR_BIND"}, 66 121 }, 67 122 }, 68 123 Action: func(cctx *cli.Context) error { 69 - log.Info("Connecting to database") 70 124 db, err := cliutil.SetupDatabase(cctx.String("database-url")) 71 125 if err != nil { 72 126 return err 73 127 } 74 128 75 - log.Info("Migrating database") 76 - db.AutoMigrate(&PostRef{}) 77 - db.AutoMigrate(&User{}) 78 - db.AutoMigrate(&LastSeq{}) 79 - 80 - log.Infof("Configuring ES client") 81 - escli, err := getEsCli(cctx.String("elastic-cert")) 129 + escli, err := createEsClient(cctx) 82 130 if err != nil { 83 131 return fmt.Errorf("failed to get elasticsearch: %w", err) 84 132 } 85 133 86 - log.Infof("Configuring HTTP server") 87 - e := echo.New() 88 - e.HTTPErrorHandler = func(err error, c echo.Context) { 89 - log.Error(err) 90 - } 91 - 92 - xc := &xrpc.Client{ 93 - Host: cctx.String("pds-host"), 94 - } 95 - 96 - plc := &api.PLCServer{ 97 - Host: cctx.String("plc-host"), 98 - } 99 - 100 - bgsws := cctx.String("atp-bgs-host") 101 - if !strings.HasPrefix(bgsws, "ws") { 102 - return fmt.Errorf("specified bgs host must include 'ws://' or 'wss://'") 103 - } 104 - 105 - bgshttp := strings.Replace(bgsws, "ws", "http", 1) 106 - bgsxrpc := &xrpc.Client{ 107 - Host: bgshttp, 134 + srv, err := NewServer( 135 + db, 136 + escli, 137 + cctx.String("atp-plc-host"), 138 + cctx.String("atp-pds-host"), 139 + cctx.String("atp-bgs-host"), 140 + ) 141 + if err != nil { 142 + return err 108 143 } 109 144 110 - ucache, _ := lru.New(100000) 111 - s := &Server{ 112 - escli: escli, 113 - db: db, 114 - bgshost: cctx.String("atp-bgs-host"), 115 - xrpcc: xc, 116 - bgsxrpc: bgsxrpc, 117 - plc: plc, 118 - userCache: ucache, 119 - } 120 - 121 - e.Use(middleware.CORS()) 122 - 123 - e.GET("/search/posts", s.handleSearchRequestPosts) 124 - e.GET("/search/profiles", s.handleSearchRequestProfiles) 125 - 126 145 go func() { 127 - panic(e.Start(":3999")) 146 + srv.RunAPI(cctx.String("bind")) 128 147 }() 129 148 130 149 if cctx.Bool("readonly") { 131 150 select {} 132 151 } else { 133 152 ctx := context.TODO() 134 - if err := s.Run(ctx); err != nil { 135 - return fmt.Errorf("failed to run: %w", err) 153 + if err := srv.RunIndexer(ctx); err != nil { 154 + return fmt.Errorf("failed to run indexer: %w", err) 136 155 } 137 156 } 138 157 ··· 140 159 }, 141 160 } 142 161 143 - func getEsCli(certfi string) (*es.Client, error) { 144 - user := "elastic" 145 - if u := os.Getenv("ELASTIC_USERNAME"); u != "" { 146 - user = u 147 - } 148 - 149 - addrs := []string{ 150 - "https://192.168.1.221:9200", 151 - } 152 - 153 - if hosts := os.Getenv("ELASTIC_HOSTS"); hosts != "" { 154 - addrs = strings.Split(hosts, ",") 155 - } 156 - 157 - pass := os.Getenv("ELASTIC_PASSWORD") 158 - 159 - var cert []byte 160 - if certfi != "" { 161 - b, err := os.ReadFile(certfi) 162 - if err != nil { 163 - return nil, err 164 - } 165 - 166 - cert = b 167 - } 168 - 169 - cfg := es.Config{ 170 - Addresses: addrs, 171 - Username: user, 172 - Password: pass, 173 - 174 - CACert: cert, 175 - } 176 - escli, err := es.NewClient(cfg) 177 - if err != nil { 178 - return nil, fmt.Errorf("failed to set up client: %w", err) 179 - } 180 - info, err := escli.Info() 181 - if err != nil { 182 - return nil, fmt.Errorf("cannot get escli info: %w", err) 183 - } 184 - defer info.Body.Close() 185 - fmt.Println(info) 186 - 187 - return escli, nil 188 - } 189 - 190 162 var elasticCheckCmd = &cli.Command{ 191 163 Name: "elastic-check", 192 164 Flags: []cli.Flag{ ··· 195 167 }, 196 168 }, 197 169 Action: func(cctx *cli.Context) error { 198 - escli, err := getEsCli(cctx.String("elastic-cert")) 170 + escli, err := createEsClient(cctx) 199 171 if err != nil { 200 172 return err 201 173 } 202 174 175 + // NOTE: this extra info check is redundant; createEsClient() already made this call and logged results 203 176 inf, err := escli.Info() 204 177 if err != nil { 205 178 return fmt.Errorf("failed to get info: %w", err) ··· 212 185 } 213 186 214 187 var searchCmd = &cli.Command{ 215 - Name: "search", 216 - Flags: []cli.Flag{ 217 - &cli.StringFlag{ 218 - Name: "elastic-cert", 219 - }, 220 - }, 188 + Name: "search", 189 + Usage: "run a simple query against search index", 221 190 Action: func(cctx *cli.Context) error { 222 - escli, err := getEsCli(cctx.String("elastic-cert")) 191 + escli, err := createEsClient(cctx) 223 192 if err != nil { 224 193 return err 225 194 } ··· 239 208 // Perform the search request. 240 209 res, err := escli.Search( 241 210 escli.Search.WithContext(context.Background()), 242 - escli.Search.WithIndex("posts"), 211 + escli.Search.WithIndex(cctx.String("es-posts-index")), 243 212 escli.Search.WithBody(&buf), 244 213 escli.Search.WithTrackTotalHits(true), 245 214 escli.Search.WithPretty(), ··· 253 222 254 223 }, 255 224 } 225 + 226 + func createEsClient(cctx *cli.Context) (*es.Client, error) { 227 + 228 + addrs := []string{} 229 + if hosts := cctx.String("elastic-hosts"); hosts != "" { 230 + addrs = strings.Split(hosts, ",") 231 + } 232 + 233 + certfi := cctx.String("elastic-cert-file") 234 + var cert []byte 235 + if certfi != "" { 236 + b, err := os.ReadFile(certfi) 237 + if err != nil { 238 + return nil, err 239 + } 240 + 241 + cert = b 242 + } 243 + 244 + cfg := es.Config{ 245 + Addresses: addrs, 246 + Username: cctx.String("elastic-username"), 247 + Password: cctx.String("elastic-password"), 248 + 249 + CACert: cert, 250 + } 251 + 252 + escli, err := es.NewClient(cfg) 253 + if err != nil { 254 + return nil, fmt.Errorf("failed to set up client: %w", err) 255 + } 256 + 257 + info, err := escli.Info() 258 + if err != nil { 259 + return nil, fmt.Errorf("cannot get escli info: %w", err) 260 + } 261 + defer info.Body.Close() 262 + log.Info(info) 263 + 264 + return escli, nil 265 + }
+93 -3
cmd/palomar/server.go
··· 17 17 lexutil "github.com/bluesky-social/indigo/lex/util" 18 18 "github.com/bluesky-social/indigo/repo" 19 19 "github.com/bluesky-social/indigo/repomgr" 20 + "github.com/bluesky-social/indigo/version" 20 21 "github.com/bluesky-social/indigo/xrpc" 22 + 21 23 "github.com/gorilla/websocket" 22 24 lru "github.com/hashicorp/golang-lru" 23 25 "github.com/ipfs/go-cid" 24 26 flatfs "github.com/ipfs/go-ds-flatfs" 25 27 blockstore "github.com/ipfs/go-ipfs-blockstore" 26 - 28 + "github.com/labstack/echo/v4" 29 + "github.com/labstack/echo/v4/middleware" 27 30 es "github.com/opensearch-project/opensearch-go/v2" 28 - 29 31 gorm "gorm.io/gorm" 30 32 ) 31 33 ··· 36 38 xrpcc *xrpc.Client 37 39 bgsxrpc *xrpc.Client 38 40 plc *api.PLCServer 41 + echo *echo.Echo 39 42 40 43 userCache *lru.Cache 41 44 } ··· 59 62 Seq int64 60 63 } 61 64 65 + func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost string) (*Server, error) { 66 + 67 + log.Info("Migrating database") 68 + db.AutoMigrate(&PostRef{}) 69 + db.AutoMigrate(&User{}) 70 + db.AutoMigrate(&LastSeq{}) 71 + 72 + // TODO: robust client 73 + xc := &xrpc.Client{ 74 + Host: pdsHost, 75 + } 76 + 77 + plc := &api.PLCServer{ 78 + Host: plcHost, 79 + } 80 + 81 + bgsws := bgsHost 82 + if !strings.HasPrefix(bgsws, "ws") { 83 + return nil, fmt.Errorf("specified bgs host must include 'ws://' or 'wss://'") 84 + } 85 + 86 + bgshttp := strings.Replace(bgsws, "ws", "http", 1) 87 + bgsxrpc := &xrpc.Client{ 88 + Host: bgshttp, 89 + } 90 + 91 + ucache, _ := lru.New(100000) 92 + s := &Server{ 93 + escli: escli, 94 + db: db, 95 + bgshost: bgsHost, 96 + xrpcc: xc, 97 + bgsxrpc: bgsxrpc, 98 + plc: plc, 99 + userCache: ucache, 100 + } 101 + return s, nil 102 + } 103 + 62 104 func (s *Server) getLastCursor() (int64, error) { 63 105 var lastSeq LastSeq 64 106 if err := s.db.Find(&lastSeq).Error; err != nil { ··· 76 118 return s.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error 77 119 } 78 120 79 - func (s *Server) Run(ctx context.Context) error { 121 + func (s *Server) RunIndexer(ctx context.Context) error { 80 122 cur, err := s.getLastCursor() 81 123 if err != nil { 82 124 return fmt.Errorf("get last cursor: %w", err) ··· 376 418 377 419 return blockstore.NewBlockstoreNoPrefix(fds), nil 378 420 } 421 + 422 + type HealthStatus struct { 423 + Status string `json:"status"` 424 + Version string `json:"version"` 425 + Message string `json:"msg,omitempty"` 426 + } 427 + 428 + func (s *Server) handleHealthCheck(c echo.Context) error { 429 + if err := s.db.Exec("SELECT 1").Error; err != nil { 430 + log.Errorf("healthcheck can't connect to database: %v", err) 431 + return c.JSON(500, HealthStatus{Status: "error", Version: version.Version, Message: "can't connect to database"}) 432 + } else { 433 + return c.JSON(200, HealthStatus{Status: "ok", Version: version.Version}) 434 + } 435 + } 436 + 437 + func (s *Server) RunAPI(listen string) error { 438 + 439 + log.Infof("Configuring HTTP server") 440 + e := echo.New() 441 + e.HideBanner = true 442 + 443 + e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ 444 + Format: "method=${method} uri=${uri} status=${status} latency=${latency_human}\n", 445 + })) 446 + 447 + e.HTTPErrorHandler = func(err error, ctx echo.Context) { 448 + code := 500 449 + if he, ok := err.(*echo.HTTPError); ok { 450 + code = he.Code 451 + } 452 + log.Warnw("HTTP request error", "statusCode", code, "path", ctx.Path(), "err", err) 453 + ctx.Response().WriteHeader(code) 454 + } 455 + 456 + e.Use(middleware.CORS()) 457 + e.GET("/_health", s.handleHealthCheck) 458 + e.GET("/search/posts", s.handleSearchRequestPosts) 459 + e.GET("/search/profiles", s.handleSearchRequestProfiles) 460 + s.echo = e 461 + 462 + log.Infof("starting search API daemon at: %s", listen) 463 + return s.echo.Start(listen) 464 + } 465 + 466 + func (s *Server) Shutdown(ctx context.Context) error { 467 + return s.echo.Shutdown(ctx) 468 + }