this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add code for search service (#154)

Dockerfile build doesnt quite work yet

authored by

Whyrusleeping and committed by
GitHub
d698626a e055d34f

+1120
+51
.github/workflows/container-palomar-aws.yaml
··· 1 + name: container-palomar-aws 2 + on: [push] 3 + env: 4 + REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }} 5 + USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }} 6 + PASSWORD: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_PASSWORD }} 7 + # github.repository as <account>/<repo> 8 + IMAGE_NAME: palomar 9 + 10 + jobs: 11 + container-palomar-aws: 12 + runs-on: ubuntu-latest 13 + permissions: 14 + contents: read 15 + packages: write 16 + id-token: write 17 + 18 + steps: 19 + - name: Checkout repository 20 + uses: actions/checkout@v3 21 + 22 + - name: Setup Docker buildx 23 + uses: docker/setup-buildx-action@v1 24 + 25 + - name: Log into registry ${{ env.REGISTRY }} 26 + uses: docker/login-action@v2 27 + with: 28 + registry: ${{ env.REGISTRY }} 29 + username: ${{ env.USERNAME }} 30 + password: ${{ env.PASSWORD }} 31 + 32 + - name: Extract Docker metadata 33 + id: meta 34 + uses: docker/metadata-action@v4 35 + with: 36 + images: | 37 + ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} 38 + tags: | 39 + type=sha,enable=true,priority=100,prefix=,suffix=,format=long 40 + 41 + - name: Build and push Docker image 42 + id: build-and-push 43 + uses: docker/build-push-action@v4 44 + with: 45 + context: . 46 + file: ./cmd/palomar/Dockerfile 47 + push: ${{ github.event_name != 'pull_request' }} 48 + tags: ${{ steps.meta.outputs.tags }} 49 + labels: ${{ steps.meta.outputs.labels }} 50 + cache-from: type=gha 51 + cache-to: type=gha,mode=max
+52
.github/workflows/container-palomar-ghcr.yaml
··· 1 + name: container-palomar-ghcr 2 + on: [push] 3 + env: 4 + REGISTRY: ghcr.io 5 + USERNAME: ${{ github.actor }} 6 + PASSWORD: ${{ secrets.GITHUB_TOKEN }} 7 + 8 + # github.repository as <account>/<repo> 9 + IMAGE_NAME: ${{ github.repository }} 10 + 11 + jobs: 12 + container-palomar-ghcr: 13 + runs-on: ubuntu-latest 14 + permissions: 15 + contents: read 16 + packages: write 17 + id-token: write 18 + 19 + steps: 20 + - name: Checkout repository 21 + uses: actions/checkout@v3 22 + 23 + - name: Setup Docker buildx 24 + uses: docker/setup-buildx-action@v1 25 + 26 + - name: Log into registry ${{ env.REGISTRY }} 27 + uses: docker/login-action@v2 28 + with: 29 + registry: ${{ env.REGISTRY }} 30 + username: ${{ env.USERNAME }} 31 + password: ${{ env.PASSWORD }} 32 + 33 + - name: Extract Docker metadata 34 + id: meta 35 + uses: docker/metadata-action@v4 36 + with: 37 + images: | 38 + ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} 39 + tags: | 40 + type=sha,enable=true,priority=100,prefix=palomar:,suffix=,format=long 41 + 42 + - name: Build and push Docker image 43 + id: build-and-push 44 + uses: docker/build-push-action@v4 45 + with: 46 + context: . 47 + file: ./cmd/palomar/Dockerfile 48 + push: ${{ github.event_name != 'pull_request' }} 49 + tags: ${{ steps.meta.outputs.tags }} 50 + labels: ${{ steps.meta.outputs.labels }} 51 + cache-from: type=gha 52 + cache-to: type=gha,mode=max
+36
cmd/palomar/Dockerfile
··· 1 + # Run this dockerfile from the top level of the indigo git repository like: 2 + # 3 + # podman build -f ./cmd/palomar/Dockerfile -t palomar . 4 + 5 + ### Compile stage 6 + FROM golang:1.20-alpine3.17 AS build-env 7 + RUN apk add --no-cache build-base make git 8 + 9 + ADD . /dockerbuild 10 + WORKDIR /dockerbuild 11 + 12 + # timezone data for alpine builds 13 + RUN GIT_VERSION=$(git describe --tags --long --always) && \ 14 + go build -tags timetzdata -ldflags="-X github.com/bluesky-social/indigo/version.Version=$GIT_VERSION" -o /palomar ./cmd/palomar 15 + 16 + ### Run stage 17 + FROM alpine:3.17 18 + 19 + RUN apk add --no-cache --update dumb-init ca-certificates 20 + ENTRYPOINT ["dumb-init", "--"] 21 + 22 + WORKDIR / 23 + RUN mkdir -p data/palomar 24 + COPY --from=build-env /palomar / 25 + 26 + # small things to make golang binaries work well under alpine 27 + ENV GODEBUG=netdns=go 28 + ENV TZ=Etc/UTC 29 + 30 + EXPOSE 2470 31 + 32 + CMD ["/palomar", "run"] 33 + 34 + LABEL org.opencontainers.image.source=https://github.com/bluesky-social/indigo 35 + LABEL org.opencontainers.image.description="atproto Search Service (for app.bsky Lexicon)" 36 + LABEL org.opencontainers.image.licenses=MIT
+50
cmd/palomar/README.md
··· 1 + # thecloud (working title) 2 + 3 + An elasticsearch frontend and ATP repo crawler meant to provide search services 4 + for the bluesky network. 5 + 6 + ## Building 7 + 8 + ``` 9 + go build 10 + ``` 11 + 12 + ## Setup 13 + 14 + You will need a running elasticsearch instance (or cluster) for indexing, and 15 + valid credentials for the PDS you wish to index against. 16 + 17 + The following environment variables should be set: 18 + - `ATP_BGS_HOST` 19 + - The url of the bluesky BGS, e.g. `https://bgs.staging.bsky.dev` 20 + - `ELASTIC_HTTPS_FINGERPRINT` 21 + - if using a self signed cert for your elasticsearch deployment, this must be set 22 + - `ELASTIC_USERNAME` 23 + - elasticsearch username, defaults to `elastic` 24 + - `ELASTIC_PASSWORD` 25 + - password for elasticsearch auth 26 + - `ELASTIC_HOSTS` 27 + - comma separated list of elasticsearch endpoints 28 + - `READONLY` 29 + - To be set only if the instance should act as a readonly HTTP server (no indexing) 30 + 31 + ## Running 32 + 33 + After ensuring the env is properly configured, run: 34 + 35 + ``` 36 + ./thecloud run 37 + ``` 38 + 39 + ## Indexing 40 + For now, there isnt an easy way to get updates from the PDS, so to keep the 41 + index up to date you will periodcally need to scrape the data. 42 + 43 + ## API 44 + 45 + ### `/index/:did` 46 + Index the content in the given users repo. Keeps track of the last repo update 47 + and only fetches incremental changes 48 + 49 + ### `/search?q=QUERY` 50 + Very simple case-insensitive search results from across the entire app.
+909
cmd/palomar/main.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/base32" 7 + "encoding/json" 8 + "fmt" 9 + "net/http" 10 + "os" 11 + "strconv" 12 + "strings" 13 + "time" 14 + 15 + api "github.com/bluesky-social/indigo/api" 16 + comatproto "github.com/bluesky-social/indigo/api/atproto" 17 + bsky "github.com/bluesky-social/indigo/api/bsky" 18 + cliutil "github.com/bluesky-social/indigo/cmd/gosky/util" 19 + "github.com/bluesky-social/indigo/events" 20 + lexutil "github.com/bluesky-social/indigo/lex/util" 21 + "github.com/bluesky-social/indigo/repo" 22 + "github.com/bluesky-social/indigo/repomgr" 23 + "github.com/bluesky-social/indigo/util" 24 + "github.com/bluesky-social/indigo/xrpc" 25 + "github.com/gorilla/websocket" 26 + lru "github.com/hashicorp/golang-lru" 27 + "github.com/ipfs/go-cid" 28 + flatfs "github.com/ipfs/go-ds-flatfs" 29 + blockstore "github.com/ipfs/go-ipfs-blockstore" 30 + logging "github.com/ipfs/go-log" 31 + "github.com/labstack/echo/v4" 32 + "github.com/labstack/echo/v4/middleware" 33 + otel "go.opentelemetry.io/otel" 34 + 35 + es "github.com/opensearch-project/opensearch-go/v2" 36 + esapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" 37 + 38 + cli "github.com/urfave/cli/v2" 39 + 40 + gorm "gorm.io/gorm" 41 + ) 42 + 43 + var log = logging.Logger("search") 44 + 45 + type PostRef struct { 46 + gorm.Model 47 + Cid string 48 + Tid string `gorm:"index"` 49 + Uid uint `gorm:"index"` 50 + } 51 + 52 + type User struct { 53 + gorm.Model 54 + Did string `gorm:"index"` 55 + Handle string 56 + LastCrawl string 57 + } 58 + 59 + type LastSeq struct { 60 + ID uint `gorm:"primarykey"` 61 + Seq int64 62 + } 63 + 64 + func main() { 65 + app := cli.NewApp() 66 + 67 + app.Flags = []cli.Flag{} 68 + app.Commands = []*cli.Command{ 69 + elasticCheckCmd, 70 + searchCmd, 71 + runCmd, 72 + } 73 + 74 + app.RunAndExitOnError() 75 + } 76 + 77 + type Server struct { 78 + escli *es.Client 79 + db *gorm.DB 80 + bgshost string 81 + xrpcc *xrpc.Client 82 + bgsxrpc *xrpc.Client 83 + plc *api.PLCServer 84 + 85 + userCache *lru.Cache 86 + } 87 + 88 + var runCmd = &cli.Command{ 89 + Name: "run", 90 + Flags: []cli.Flag{ 91 + &cli.StringFlag{ 92 + Name: "database-url", 93 + Value: "sqlite://data/thecloud.db", 94 + EnvVars: []string{"DATABASE_URL"}, 95 + }, 96 + &cli.StringFlag{ 97 + Name: "atp-bgs-host", 98 + Required: true, 99 + EnvVars: []string{"ATP_BGS_HOST"}, 100 + }, 101 + &cli.BoolFlag{ 102 + Name: "readonly", 103 + EnvVars: []string{"READONLY"}, 104 + }, 105 + &cli.StringFlag{ 106 + Name: "elastic-cert", 107 + }, 108 + &cli.StringFlag{ 109 + Name: "plc-host", 110 + Value: "https://plc.directory", 111 + }, 112 + &cli.StringFlag{ 113 + Name: "pds-host", 114 + Value: "https://bsky.social", 115 + }, 116 + }, 117 + Action: func(cctx *cli.Context) error { 118 + log.Info("Connecting to database") 119 + db, err := cliutil.SetupDatabase(cctx.String("database-url")) 120 + if err != nil { 121 + return err 122 + } 123 + 124 + log.Info("Migrating database") 125 + db.AutoMigrate(&PostRef{}) 126 + db.AutoMigrate(&User{}) 127 + db.AutoMigrate(&LastSeq{}) 128 + 129 + log.Infof("Configuring ES client") 130 + escli, err := getEsCli(cctx.String("elastic-cert")) 131 + if err != nil { 132 + return fmt.Errorf("failed to get elasticsearch: %w", err) 133 + } 134 + 135 + log.Infof("Configuring HTTP server") 136 + e := echo.New() 137 + e.HTTPErrorHandler = func(err error, c echo.Context) { 138 + log.Error(err) 139 + } 140 + 141 + xc := &xrpc.Client{ 142 + Host: cctx.String("pds-host"), 143 + } 144 + 145 + plc := &api.PLCServer{ 146 + Host: cctx.String("plc-host"), 147 + } 148 + 149 + bgsws := cctx.String("atp-bgs-host") 150 + if !strings.HasPrefix(bgsws, "ws") { 151 + return fmt.Errorf("specified bgs host must include 'ws://' or 'wss://'") 152 + } 153 + 154 + bgshttp := strings.Replace(bgsws, "ws", "http", 1) 155 + bgsxrpc := &xrpc.Client{ 156 + Host: bgshttp, 157 + } 158 + 159 + ucache, _ := lru.New(100000) 160 + s := &Server{ 161 + escli: escli, 162 + db: db, 163 + bgshost: cctx.String("atp-bgs-host"), 164 + xrpcc: xc, 165 + bgsxrpc: bgsxrpc, 166 + plc: plc, 167 + userCache: ucache, 168 + } 169 + 170 + e.Use(middleware.CORS()) 171 + 172 + e.GET("/search/posts", s.handleSearchRequestPosts) 173 + e.GET("/search/profiles", s.handleSearchRequestProfiles) 174 + 175 + go func() { 176 + panic(e.Start(":3999")) 177 + }() 178 + 179 + if cctx.Bool("readonly") { 180 + select {} 181 + } else { 182 + ctx := context.TODO() 183 + if err := s.Run(ctx); err != nil { 184 + return fmt.Errorf("failed to run: %w", err) 185 + } 186 + } 187 + 188 + return nil 189 + }, 190 + } 191 + 192 + func (s *Server) getLastCursor() (int64, error) { 193 + var lastSeq LastSeq 194 + if err := s.db.Find(&lastSeq).Error; err != nil { 195 + return 0, err 196 + } 197 + 198 + if lastSeq.ID == 0 { 199 + return 0, s.db.Create(&lastSeq).Error 200 + } 201 + 202 + return lastSeq.Seq, nil 203 + } 204 + 205 + func (s *Server) updateLastCursor(curs int64) error { 206 + return s.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error 207 + } 208 + 209 + func (s *Server) Run(ctx context.Context) error { 210 + cur, err := s.getLastCursor() 211 + if err != nil { 212 + return fmt.Errorf("get last cursor: %w", err) 213 + } 214 + 215 + d := websocket.DefaultDialer 216 + con, _, err := d.Dial(fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", s.bgshost, cur), http.Header{}) 217 + if err != nil { 218 + return fmt.Errorf("events dial failed: %w", err) 219 + } 220 + 221 + return events.HandleRepoStream(ctx, con, &events.RepoStreamCallbacks{ 222 + RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 223 + if evt.TooBig && evt.Prev != nil { 224 + log.Errorf("skipping non-genesis too big events for now: %d", evt.Seq) 225 + return nil 226 + } 227 + 228 + if evt.TooBig { 229 + if err := s.processTooBigCommit(ctx, evt); err != nil { 230 + log.Errorf("failed to process tooBig event: %s", err) 231 + return nil 232 + } 233 + 234 + return nil 235 + } 236 + 237 + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 238 + if err != nil { 239 + log.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err) 240 + return nil 241 + } 242 + 243 + for _, op := range evt.Ops { 244 + ek := repomgr.EventKind(op.Action) 245 + switch ek { 246 + case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 247 + rc, rec, err := r.GetRecord(ctx, op.Path) 248 + if err != nil { 249 + e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err) 250 + log.Error(e) 251 + return nil 252 + } 253 + 254 + if lexutil.LexLink(rc) != *op.Cid { 255 + log.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) 256 + return nil 257 + } 258 + 259 + if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil { 260 + log.Errorf("failed to handle op: %s", err) 261 + return nil 262 + } 263 + 264 + case repomgr.EvtKindDeleteRecord: 265 + if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil { 266 + log.Errorf("failed to handle delete: %s", err) 267 + return nil 268 + } 269 + } 270 + } 271 + 272 + return nil 273 + 274 + }, 275 + RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 276 + if err := s.updateUserHandle(ctx, evt.Did, evt.Handle); err != nil { 277 + log.Errorf("failed to update user handle: %s", err) 278 + } 279 + return nil 280 + }, 281 + }) 282 + } 283 + 284 + func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec any) error { 285 + if op == repomgr.EvtKindCreateRecord || op == repomgr.EvtKindUpdateRecord { 286 + 287 + log.Infof("handling event(%d): %s - %s", seq, did, path) 288 + u, err := s.getOrCreateUser(ctx, did) 289 + if err != nil { 290 + return fmt.Errorf("checking user: %w", err) 291 + } 292 + switch rec := rec.(type) { 293 + case *bsky.FeedPost: 294 + if err := s.indexPost(ctx, u, rec, path, *rcid); err != nil { 295 + return fmt.Errorf("indexing post: %w", err) 296 + } 297 + case *bsky.ActorProfile: 298 + if err := s.indexProfile(ctx, u, rec); err != nil { 299 + return fmt.Errorf("indexing profile: %w", err) 300 + } 301 + default: 302 + } 303 + 304 + } else if op == repomgr.EvtKindDeleteRecord { 305 + u, err := s.getOrCreateUser(ctx, did) 306 + if err != nil { 307 + return err 308 + } 309 + 310 + switch { 311 + // TODO: handle profile deletes, its an edge case, but worth doing still 312 + case strings.Contains(path, "app.bsky.feed.post"): 313 + if err := s.deletePost(ctx, u, path); err != nil { 314 + return err 315 + } 316 + } 317 + 318 + } 319 + 320 + if seq%50 == 0 { 321 + if err := s.updateLastCursor(seq); err != nil { 322 + log.Error("Failed to update cursor: ", err) 323 + } 324 + } 325 + 326 + return nil 327 + } 328 + 329 + func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 330 + repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "", evt.Commit.String()) 331 + if err != nil { 332 + return err 333 + } 334 + 335 + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repodata)) 336 + if err != nil { 337 + return err 338 + } 339 + 340 + u, err := s.getOrCreateUser(ctx, evt.Repo) 341 + if err != nil { 342 + return err 343 + } 344 + 345 + return r.ForEach(ctx, "", func(k string, v cid.Cid) error { 346 + if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") { 347 + rcid, rec, err := r.GetRecord(ctx, k) 348 + if err != nil { 349 + log.Errorf("failed to get record from repo checkout: %s", err) 350 + return nil 351 + } 352 + 353 + switch rec := rec.(type) { 354 + case *bsky.FeedPost: 355 + if err := s.indexPost(ctx, u, rec, k, rcid); err != nil { 356 + return fmt.Errorf("indexing post: %w", err) 357 + } 358 + case *bsky.ActorProfile: 359 + if err := s.indexProfile(ctx, u, rec); err != nil { 360 + return fmt.Errorf("indexing profile: %w", err) 361 + } 362 + default: 363 + } 364 + 365 + } 366 + return nil 367 + }) 368 + } 369 + 370 + func (s *Server) handleSearchRequestPosts(e echo.Context) error { 371 + ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleSearchRequestPosts") 372 + defer span.End() 373 + 374 + q := strings.TrimSpace(e.QueryParam("q")) 375 + if q == "" { 376 + return e.JSON(400, map[string]any{ 377 + "error": "must pass non-empty search query", 378 + }) 379 + } 380 + 381 + out, err := s.SearchPosts(ctx, q) 382 + if err != nil { 383 + return err 384 + } 385 + 386 + return e.JSON(200, out) 387 + } 388 + 389 + func (s *Server) handleSearchRequestProfiles(e echo.Context) error { 390 + ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleSearchRequestProfiles") 391 + defer span.End() 392 + 393 + q := strings.TrimSpace(e.QueryParam("q")) 394 + if q == "" { 395 + return e.JSON(400, map[string]any{ 396 + "error": "must pass non-empty search query", 397 + }) 398 + } 399 + 400 + out, err := s.SearchProfiles(ctx, q) 401 + if err != nil { 402 + return err 403 + } 404 + 405 + return e.JSON(200, out) 406 + } 407 + 408 + func (s *Server) SearchPosts(ctx context.Context, srch string) ([]PostSearchResult, error) { 409 + resp, err := doSearchPosts(ctx, s.escli, srch) 410 + if err != nil { 411 + return nil, err 412 + } 413 + 414 + out := []PostSearchResult{} 415 + for _, r := range resp.Hits.Hits { 416 + uid, tid, err := decodeDocumentID(r.ID) 417 + if err != nil { 418 + return nil, fmt.Errorf("decoding document id: %w", err) 419 + } 420 + 421 + var p PostRef 422 + if err := s.db.First(&p, "tid = ? AND uid = ?", tid, uid).Error; err != nil { 423 + log.Infof("failed to find post in database that is referenced by elasticsearch: %s", r.ID) 424 + return nil, err 425 + } 426 + 427 + var u User 428 + if err := s.db.First(&u, "id = ?", p.Uid).Error; err != nil { 429 + return nil, err 430 + } 431 + 432 + var rec map[string]any 433 + if err := json.Unmarshal(r.Source, &rec); err != nil { 434 + return nil, err 435 + } 436 + 437 + out = append(out, PostSearchResult{ 438 + Tid: p.Tid, 439 + Cid: p.Cid, 440 + User: UserResult{ 441 + Did: u.Did, 442 + Handle: u.Handle, 443 + }, 444 + Post: &rec, 445 + }) 446 + } 447 + 448 + return out, nil 449 + } 450 + 451 + type ActorSearchResp struct { 452 + bsky.ActorProfile 453 + DID string `json:"did"` 454 + } 455 + 456 + func (s *Server) SearchProfiles(ctx context.Context, srch string) ([]*ActorSearchResp, error) { 457 + resp, err := doSearchProfiles(ctx, s.escli, srch) 458 + if err != nil { 459 + return nil, err 460 + } 461 + 462 + out := []*ActorSearchResp{} 463 + for _, r := range resp.Hits.Hits { 464 + uid, err := strconv.Atoi(r.ID) 465 + if err != nil { 466 + return nil, err 467 + } 468 + 469 + var u User 470 + if err := s.db.First(&u, "id = ?", uid).Error; err != nil { 471 + return nil, err 472 + } 473 + 474 + var rec bsky.ActorProfile 475 + if err := json.Unmarshal(r.Source, &rec); err != nil { 476 + return nil, err 477 + } 478 + 479 + out = append(out, &ActorSearchResp{ 480 + ActorProfile: rec, 481 + DID: u.Did, 482 + }) 483 + } 484 + 485 + return out, nil 486 + } 487 + 488 + func (s *Server) getOrCreateUser(ctx context.Context, did string) (*User, error) { 489 + cu, ok := s.userCache.Get(did) 490 + if ok { 491 + return cu.(*User), nil 492 + } 493 + 494 + var u User 495 + if err := s.db.Find(&u, "did = ?", did).Error; err != nil { 496 + return nil, err 497 + } 498 + if u.ID == 0 { 499 + // TODO: figure out peoples handles 500 + h, err := s.handleFromDid(ctx, did) 501 + if err != nil { 502 + log.Errorw("failed to resolve did to handle", "did", did, "err", err) 503 + } else { 504 + u.Handle = h 505 + } 506 + 507 + u.Did = did 508 + if err := s.db.Create(&u).Error; err != nil { 509 + return nil, err 510 + } 511 + } 512 + 513 + s.userCache.Add(did, &u) 514 + 515 + return &u, nil 516 + } 517 + 518 + func (s *Server) handleFromDid(ctx context.Context, did string) (string, error) { 519 + handle, _, err := api.ResolveDidToHandle(ctx, s.xrpcc, s.plc, did) 520 + if err != nil { 521 + return "", err 522 + } 523 + 524 + return handle, nil 525 + } 526 + 527 + var ErrDoneIterating = fmt.Errorf("done iterating") 528 + 529 + func encodeDocumentID(uid uint, tid string) string { 530 + comb := fmt.Sprintf("%d:%s", uid, tid) 531 + return base32.StdEncoding.EncodeToString([]byte(comb)) 532 + } 533 + 534 + func decodeDocumentID(docid string) (uint, string, error) { 535 + dec, err := base32.StdEncoding.DecodeString(docid) 536 + if err != nil { 537 + return 0, "", err 538 + } 539 + 540 + parts := strings.SplitN(string(dec), ":", 2) 541 + if len(parts) < 2 { 542 + return 0, "", fmt.Errorf("invalid document id: %q", string(dec)) 543 + } 544 + 545 + uid, err := strconv.Atoi(parts[0]) 546 + if err != nil { 547 + return 0, "", err 548 + } 549 + 550 + return uint(uid), parts[1], nil 551 + } 552 + 553 + func (s *Server) deletePost(ctx context.Context, u *User, path string) error { 554 + log.Infof("deleting post: %s", path) 555 + req := esapi.DeleteRequest{ 556 + Index: "posts", 557 + DocumentID: encodeDocumentID(u.ID, path), 558 + Refresh: "true", 559 + } 560 + 561 + res, err := req.Do(ctx, s.escli) 562 + if err != nil { 563 + return fmt.Errorf("failed to delete post: %w", err) 564 + } 565 + 566 + fmt.Println(res) 567 + 568 + return nil 569 + } 570 + 571 + func (s *Server) indexPost(ctx context.Context, u *User, rec *bsky.FeedPost, tid string, pcid cid.Cid) error { 572 + if err := s.db.Create(&PostRef{ 573 + Cid: pcid.String(), 574 + Tid: tid, 575 + Uid: u.ID, 576 + }).Error; err != nil { 577 + return err 578 + } 579 + 580 + ts, err := time.Parse(util.ISO8601, rec.CreatedAt) 581 + if err != nil { 582 + return fmt.Errorf("post (%d, %s) had invalid timestamp (%q): %w", u.ID, tid, rec.CreatedAt, err) 583 + } 584 + 585 + blob := map[string]any{ 586 + "text": rec.Text, 587 + "createdAt": ts.UnixNano(), 588 + "user": u.Handle, 589 + } 590 + b, err := json.Marshal(blob) 591 + if err != nil { 592 + return err 593 + } 594 + 595 + log.Infof("Indexing post") 596 + req := esapi.IndexRequest{ 597 + Index: "posts", 598 + DocumentID: encodeDocumentID(u.ID, tid), 599 + Body: bytes.NewReader(b), 600 + Refresh: "true", 601 + } 602 + 603 + res, err := req.Do(ctx, s.escli) 604 + if err != nil { 605 + return fmt.Errorf("failed to send indexing request: %w", err) 606 + } 607 + 608 + fmt.Println(res) 609 + 610 + return nil 611 + } 612 + 613 + func (s *Server) indexProfile(ctx context.Context, u *User, rec *bsky.ActorProfile) error { 614 + b, err := json.Marshal(rec) 615 + if err != nil { 616 + return err 617 + } 618 + 619 + n := "" 620 + if rec.DisplayName != nil { 621 + n = *rec.DisplayName 622 + } 623 + 624 + blob := map[string]string{ 625 + "displayName": n, 626 + "handle": u.Handle, 627 + "did": u.Did, 628 + } 629 + 630 + if rec.Description != nil { 631 + blob["description"] = *rec.Description 632 + } 633 + 634 + log.Infof("Indexing profile: %s", n) 635 + req := esapi.IndexRequest{ 636 + Index: "profiles", 637 + DocumentID: fmt.Sprint(u.ID), 638 + Body: bytes.NewReader(b), 639 + Refresh: "true", 640 + } 641 + 642 + res, err := req.Do(context.Background(), s.escli) 643 + if err != nil { 644 + return fmt.Errorf("failed to send indexing request: %w", err) 645 + } 646 + fmt.Println(res) 647 + 648 + return nil 649 + } 650 + 651 + func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error { 652 + u, err := s.getOrCreateUser(ctx, did) 653 + if err != nil { 654 + return err 655 + } 656 + 657 + if err := s.db.Model(User{}).Where("id = ?", u.ID).Update("handle", handle).Error; err != nil { 658 + return err 659 + } 660 + 661 + u.Handle = handle 662 + 663 + b, err := json.Marshal(map[string]any{ 664 + "script": map[string]any{ 665 + "source": "ctx._source.handle = params.handle", 666 + "lang": "painless", 667 + "params": map[string]any{ 668 + "handle": handle, 669 + }, 670 + }, 671 + }) 672 + if err != nil { 673 + return err 674 + } 675 + 676 + req := esapi.UpdateRequest{ 677 + Index: "profiles", 678 + DocumentID: fmt.Sprint(u.ID), 679 + Body: bytes.NewReader(b), 680 + Refresh: "true", 681 + } 682 + 683 + res, err := req.Do(context.Background(), s.escli) 684 + if err != nil { 685 + return fmt.Errorf("failed to send indexing request: %w", err) 686 + } 687 + fmt.Println(res) 688 + 689 + return nil 690 + } 691 + 692 + func getEsCli(certfi string) (*es.Client, error) { 693 + user := "elastic" 694 + if u := os.Getenv("ELASTIC_USERNAME"); u != "" { 695 + user = u 696 + } 697 + 698 + addrs := []string{ 699 + "https://192.168.1.221:9200", 700 + } 701 + 702 + if hosts := os.Getenv("ELASTIC_HOSTS"); hosts != "" { 703 + addrs = strings.Split(hosts, ",") 704 + } 705 + 706 + pass := os.Getenv("ELASTIC_PASSWORD") 707 + 708 + var cert []byte 709 + if certfi != "" { 710 + b, err := os.ReadFile(certfi) 711 + if err != nil { 712 + return nil, err 713 + } 714 + 715 + cert = b 716 + } 717 + 718 + cfg := es.Config{ 719 + Addresses: addrs, 720 + Username: user, 721 + Password: pass, 722 + 723 + CACert: cert, 724 + } 725 + escli, err := es.NewClient(cfg) 726 + if err != nil { 727 + return nil, fmt.Errorf("failed to set up client: %w", err) 728 + } 729 + info, err := escli.Info() 730 + if err != nil { 731 + return nil, fmt.Errorf("cannot get escli info: %w", err) 732 + } 733 + defer info.Body.Close() 734 + fmt.Println(info) 735 + 736 + return escli, nil 737 + } 738 + 739 + var elasticCheckCmd = &cli.Command{ 740 + Name: "elastic-check", 741 + Flags: []cli.Flag{ 742 + &cli.StringFlag{ 743 + Name: "elastic-cert", 744 + }, 745 + }, 746 + Action: func(cctx *cli.Context) error { 747 + escli, err := getEsCli(cctx.String("elastic-cert")) 748 + if err != nil { 749 + return err 750 + } 751 + 752 + inf, err := escli.Info() 753 + if err != nil { 754 + return fmt.Errorf("failed to get info: %w", err) 755 + } 756 + 757 + fmt.Println(inf) 758 + return nil 759 + 760 + }, 761 + } 762 + 763 + type EsSearchHit struct { 764 + Index string `json:"_index"` 765 + ID string `json:"_id"` 766 + Score float64 `json:"_score"` 767 + Source json.RawMessage `json:"_source"` 768 + } 769 + 770 + type EsSearchHits struct { 771 + Total struct { 772 + Value int 773 + Relation string 774 + } `json:"total"` 775 + MaxScore float64 `json:"max_score"` 776 + Hits []EsSearchHit `json:"hits"` 777 + } 778 + 779 + type EsSearchResponse struct { 780 + Took int `json:"took"` 781 + TimedOut bool `json:"timed_out"` 782 + // Shards ??? 783 + Hits EsSearchHits `json:"hits"` 784 + } 785 + 786 + type UserResult struct { 787 + Did string `json:"did"` 788 + Handle string `json:"handle"` 789 + } 790 + 791 + type PostSearchResult struct { 792 + Tid string `json:"tid"` 793 + Cid string `json:"cid"` 794 + User UserResult `json:"user"` 795 + Post any `json:"post"` 796 + } 797 + 798 + func doSearchPosts(ctx context.Context, escli *es.Client, q string) (*EsSearchResponse, error) { 799 + query := map[string]interface{}{ 800 + /* 801 + "sort": map[string]any{ 802 + "createdAt": map[string]any{ 803 + "order": "desc", 804 + "format": "date_nanos", 805 + }, 806 + }, 807 + */ 808 + "query": map[string]interface{}{ 809 + "match": map[string]interface{}{ 810 + "text": q, 811 + }, 812 + }, 813 + } 814 + 815 + return doSearch(ctx, escli, "posts", query) 816 + } 817 + 818 + func doSearchProfiles(ctx context.Context, escli *es.Client, q string) (*EsSearchResponse, error) { 819 + query := map[string]interface{}{ 820 + "query": map[string]interface{}{ 821 + "multi_match": map[string]interface{}{ 822 + "query": q, 823 + "fields": []string{"description", "displayName", "handle"}, 824 + "operator": "or", 825 + }, 826 + }, 827 + } 828 + 829 + return doSearch(ctx, escli, "profiles", query) 830 + } 831 + 832 + func doSearch(ctx context.Context, escli *es.Client, index string, query interface{}) (*EsSearchResponse, error) { 833 + var buf bytes.Buffer 834 + if err := json.NewEncoder(&buf).Encode(query); err != nil { 835 + log.Fatalf("Error encoding query: %s", err) 836 + } 837 + 838 + // Perform the search request. 839 + res, err := escli.Search( 840 + escli.Search.WithContext(ctx), 841 + escli.Search.WithIndex(index), 842 + escli.Search.WithBody(&buf), 843 + escli.Search.WithTrackTotalHits(true), 844 + escli.Search.WithSize(30), 845 + ) 846 + if err != nil { 847 + log.Fatalf("Error getting response: %s", err) 848 + } 849 + defer res.Body.Close() 850 + 851 + var out EsSearchResponse 852 + if err := json.NewDecoder(res.Body).Decode(&out); err != nil { 853 + return nil, fmt.Errorf("decoding search response: %w", err) 854 + } 855 + 856 + return &out, nil 857 + } 858 + 859 + var searchCmd = &cli.Command{ 860 + Name: "search", 861 + Flags: []cli.Flag{ 862 + &cli.StringFlag{ 863 + Name: "elastic-cert", 864 + }, 865 + }, 866 + Action: func(cctx *cli.Context) error { 867 + escli, err := getEsCli(cctx.String("elastic-cert")) 868 + if err != nil { 869 + return err 870 + } 871 + 872 + var buf bytes.Buffer 873 + query := map[string]interface{}{ 874 + "query": map[string]interface{}{ 875 + "match": map[string]interface{}{ 876 + "text": cctx.Args().First(), 877 + }, 878 + }, 879 + } 880 + if err := json.NewEncoder(&buf).Encode(query); err != nil { 881 + log.Fatalf("Error encoding query: %s", err) 882 + } 883 + 884 + // Perform the search request. 885 + res, err := escli.Search( 886 + escli.Search.WithContext(context.Background()), 887 + escli.Search.WithIndex("posts"), 888 + escli.Search.WithBody(&buf), 889 + escli.Search.WithTrackTotalHits(true), 890 + escli.Search.WithPretty(), 891 + ) 892 + if err != nil { 893 + log.Fatalf("Error getting response: %s", err) 894 + } 895 + 896 + fmt.Println(res) 897 + return nil 898 + 899 + }, 900 + } 901 + 902 + func OpenBlockstore(dir string) (blockstore.Blockstore, error) { 903 + fds, err := flatfs.CreateOrOpen(dir, flatfs.IPFS_DEF_SHARD, false) 904 + if err != nil { 905 + return nil, err 906 + } 907 + 908 + return blockstore.NewBlockstoreNoPrefix(fds), nil 909 + }
+1
go.mod
··· 29 29 github.com/mitchellh/go-homedir v1.1.0 30 30 github.com/multiformats/go-multibase v0.2.0 31 31 github.com/multiformats/go-multihash v0.2.1 32 + github.com/opensearch-project/opensearch-go/v2 v2.2.0 32 33 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f 33 34 github.com/prometheus/client_golang v1.14.0 34 35 github.com/stretchr/testify v1.8.2
+21
go.sum
··· 46 46 github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= 47 47 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5ljuFxkLGPNem5Ui+KBjFJzKg4Fv2fnxe4dvzpM= 48 48 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA= 49 + github.com/aws/aws-sdk-go v1.44.180/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= 50 + github.com/aws/aws-sdk-go-v2 v1.17.3/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= 51 + github.com/aws/aws-sdk-go-v2/config v1.18.8/go.mod h1:5XCmmyutmzzgkpk/6NYTjeWb6lgo9N170m1j6pQkIBs= 52 + github.com/aws/aws-sdk-go-v2/credentials v1.13.8/go.mod h1:lVa4OHbvgjVot4gmh1uouF1ubgexSCN92P6CJQpT0t8= 53 + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.21/go.mod h1:ugwW57Z5Z48bpvUyZuaPy4Kv+vEfJWnIrky7RmkBvJg= 54 + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.27/go.mod h1:a1/UpzeyBBerajpnP5nGZa9mGzsBn5cOKxm6NWQsvoI= 55 + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.21/go.mod h1:+Gxn8jYn5k9ebfHEqlhrMirFjSW0v0C9fI+KN5vk2kE= 56 + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28/go.mod h1:yRZVr/iT0AqyHeep00SZ4YfBAKojXz08w3XMBscdi0c= 57 + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.21/go.mod h1:lRToEJsn+DRA9lW4O9L9+/3hjTkUzlzyzHqn8MTds5k= 58 + github.com/aws/aws-sdk-go-v2/service/sso v1.12.0/go.mod h1:wo/B7uUm/7zw/dWhBJ4FXuw1sySU5lyIhVg1Bu2yL9A= 59 + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.0/go.mod h1:TZSH7xLO7+phDtViY/KUp9WGCJMQkLJ/VpgkTFd5gh8= 60 + github.com/aws/aws-sdk-go-v2/service/sts v1.18.0/go.mod h1:+lGbb3+1ugwKrNTWcf2RT05Xmp543B06zDFTwiTLp7I= 61 + github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= 49 62 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= 50 63 github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= 51 64 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= ··· 345 358 github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= 346 359 github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= 347 360 github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= 361 + github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= 362 + github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= 348 363 github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= 349 364 github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= 350 365 github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= ··· 530 545 github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= 531 546 github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= 532 547 github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= 548 + github.com/opensearch-project/opensearch-go/v2 v2.2.0 h1:6RicCBiqboSVtLMjSiKgVQIsND4I3sxELg9uwWe/TKM= 549 + github.com/opensearch-project/opensearch-go/v2 v2.2.0/go.mod h1:R8NTTQMmfSRsmZdfEn2o9ZSuSXn0WTHPYhzgl7LCFLY= 533 550 github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= 534 551 github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= 535 552 github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= ··· 783 800 golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= 784 801 golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= 785 802 golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= 803 + golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= 786 804 golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= 787 805 golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= 788 806 golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= ··· 865 883 golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 866 884 golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 867 885 golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 886 + golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 868 887 golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 869 888 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 870 889 golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= ··· 872 891 golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= 873 892 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= 874 893 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= 894 + golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= 875 895 golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= 876 896 golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= 877 897 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= ··· 881 901 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= 882 902 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= 883 903 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= 904 + golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= 884 905 golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= 885 906 golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= 886 907 golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=