this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

palomar: split into multiple files

+701 -654
+63
cmd/palomar/handlers.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "strings" 6 + 7 + api "github.com/bluesky-social/indigo/api" 8 + bsky "github.com/bluesky-social/indigo/api/bsky" 9 + "github.com/labstack/echo/v4" 10 + otel "go.opentelemetry.io/otel" 11 + ) 12 + 13 + type ActorSearchResp struct { 14 + bsky.ActorProfile 15 + DID string `json:"did"` 16 + } 17 + 18 + func (s *Server) handleFromDid(ctx context.Context, did string) (string, error) { 19 + handle, _, err := api.ResolveDidToHandle(ctx, s.xrpcc, s.plc, did) 20 + if err != nil { 21 + return "", err 22 + } 23 + 24 + return handle, nil 25 + } 26 + 27 + func (s *Server) handleSearchRequestPosts(e echo.Context) error { 28 + ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleSearchRequestPosts") 29 + defer span.End() 30 + 31 + q := strings.TrimSpace(e.QueryParam("q")) 32 + if q == "" { 33 + return e.JSON(400, map[string]any{ 34 + "error": "must pass non-empty search query", 35 + }) 36 + } 37 + 38 + out, err := s.SearchPosts(ctx, q) 39 + if err != nil { 40 + return err 41 + } 42 + 43 + return e.JSON(200, out) 44 + } 45 + 46 + func (s *Server) handleSearchRequestProfiles(e echo.Context) error { 47 + ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleSearchRequestProfiles") 48 + defer span.End() 49 + 50 + q := strings.TrimSpace(e.QueryParam("q")) 51 + if q == "" { 52 + return e.JSON(400, map[string]any{ 53 + "error": "must pass non-empty search query", 54 + }) 55 + } 56 + 57 + out, err := s.SearchProfiles(ctx, q) 58 + if err != nil { 59 + return err 60 + } 61 + 62 + return e.JSON(200, out) 63 + }
+154
cmd/palomar/indexing.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "time" 9 + 10 + bsky "github.com/bluesky-social/indigo/api/bsky" 11 + "github.com/bluesky-social/indigo/util" 12 + "github.com/ipfs/go-cid" 13 + 14 + esapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" 15 + ) 16 + 17 + func (s *Server) deletePost(ctx context.Context, u *User, path string) error { 18 + log.Infof("deleting post: %s", path) 19 + req := esapi.DeleteRequest{ 20 + Index: "posts", 21 + DocumentID: encodeDocumentID(u.ID, path), 22 + Refresh: "true", 23 + } 24 + 25 + res, err := req.Do(ctx, s.escli) 26 + if err != nil { 27 + return fmt.Errorf("failed to delete post: %w", err) 28 + } 29 + 30 + fmt.Println(res) 31 + 32 + return nil 33 + } 34 + 35 + func (s *Server) indexPost(ctx context.Context, u *User, rec *bsky.FeedPost, tid string, pcid cid.Cid) error { 36 + if err := s.db.Create(&PostRef{ 37 + Cid: pcid.String(), 38 + Tid: tid, 39 + Uid: u.ID, 40 + }).Error; err != nil { 41 + return err 42 + } 43 + 44 + ts, err := time.Parse(util.ISO8601, rec.CreatedAt) 45 + if err != nil { 46 + return fmt.Errorf("post (%d, %s) had invalid timestamp (%q): %w", u.ID, tid, rec.CreatedAt, err) 47 + } 48 + 49 + blob := map[string]any{ 50 + "text": rec.Text, 51 + "createdAt": ts.UnixNano(), 52 + "user": u.Handle, 53 + } 54 + b, err := json.Marshal(blob) 55 + if err != nil { 56 + return err 57 + } 58 + 59 + log.Infof("Indexing post") 60 + req := esapi.IndexRequest{ 61 + Index: "posts", 62 + DocumentID: encodeDocumentID(u.ID, tid), 63 + Body: bytes.NewReader(b), 64 + Refresh: "true", 65 + } 66 + 67 + res, err := req.Do(ctx, s.escli) 68 + if err != nil { 69 + return fmt.Errorf("failed to send indexing request: %w", err) 70 + } 71 + 72 + fmt.Println(res) 73 + 74 + return nil 75 + } 76 + 77 + func (s *Server) indexProfile(ctx context.Context, u *User, rec *bsky.ActorProfile) error { 78 + b, err := json.Marshal(rec) 79 + if err != nil { 80 + return err 81 + } 82 + 83 + n := "" 84 + if rec.DisplayName != nil { 85 + n = *rec.DisplayName 86 + } 87 + 88 + blob := map[string]string{ 89 + "displayName": n, 90 + "handle": u.Handle, 91 + "did": u.Did, 92 + } 93 + 94 + if rec.Description != nil { 95 + blob["description"] = *rec.Description 96 + } 97 + 98 + log.Infof("Indexing profile: %s", n) 99 + req := esapi.IndexRequest{ 100 + Index: "profiles", 101 + DocumentID: fmt.Sprint(u.ID), 102 + Body: bytes.NewReader(b), 103 + Refresh: "true", 104 + } 105 + 106 + res, err := req.Do(context.Background(), s.escli) 107 + if err != nil { 108 + return fmt.Errorf("failed to send indexing request: %w", err) 109 + } 110 + fmt.Println(res) 111 + 112 + return nil 113 + } 114 + 115 + func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error { 116 + u, err := s.getOrCreateUser(ctx, did) 117 + if err != nil { 118 + return err 119 + } 120 + 121 + if err := s.db.Model(User{}).Where("id = ?", u.ID).Update("handle", handle).Error; err != nil { 122 + return err 123 + } 124 + 125 + u.Handle = handle 126 + 127 + b, err := json.Marshal(map[string]any{ 128 + "script": map[string]any{ 129 + "source": "ctx._source.handle = params.handle", 130 + "lang": "painless", 131 + "params": map[string]any{ 132 + "handle": handle, 133 + }, 134 + }, 135 + }) 136 + if err != nil { 137 + return err 138 + } 139 + 140 + req := esapi.UpdateRequest{ 141 + Index: "profiles", 142 + DocumentID: fmt.Sprint(u.ID), 143 + Body: bytes.NewReader(b), 144 + Refresh: "true", 145 + } 146 + 147 + res, err := req.Do(context.Background(), s.escli) 148 + if err != nil { 149 + return fmt.Errorf("failed to send indexing request: %w", err) 150 + } 151 + fmt.Println(res) 152 + 153 + return nil 154 + }
-654
cmd/palomar/main.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 - "encoding/base32" 7 6 "encoding/json" 8 7 "fmt" 9 - "net/http" 10 8 "os" 11 - "strconv" 12 9 "strings" 13 - "time" 14 10 15 11 api "github.com/bluesky-social/indigo/api" 16 - comatproto "github.com/bluesky-social/indigo/api/atproto" 17 - bsky "github.com/bluesky-social/indigo/api/bsky" 18 12 cliutil "github.com/bluesky-social/indigo/cmd/gosky/util" 19 - "github.com/bluesky-social/indigo/events" 20 - lexutil "github.com/bluesky-social/indigo/lex/util" 21 - "github.com/bluesky-social/indigo/repo" 22 - "github.com/bluesky-social/indigo/repomgr" 23 - "github.com/bluesky-social/indigo/util" 24 13 "github.com/bluesky-social/indigo/xrpc" 25 - "github.com/gorilla/websocket" 26 14 lru "github.com/hashicorp/golang-lru" 27 - "github.com/ipfs/go-cid" 28 - flatfs "github.com/ipfs/go-ds-flatfs" 29 - blockstore "github.com/ipfs/go-ipfs-blockstore" 30 15 logging "github.com/ipfs/go-log" 31 16 "github.com/labstack/echo/v4" 32 17 "github.com/labstack/echo/v4/middleware" 33 - otel "go.opentelemetry.io/otel" 34 18 35 19 es "github.com/opensearch-project/opensearch-go/v2" 36 - esapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" 37 20 38 21 cli "github.com/urfave/cli/v2" 39 - 40 - gorm "gorm.io/gorm" 41 22 ) 42 23 43 24 var log = logging.Logger("search") 44 25 45 - type PostRef struct { 46 - gorm.Model 47 - Cid string 48 - Tid string `gorm:"index"` 49 - Uid uint `gorm:"index"` 50 - } 51 - 52 - type User struct { 53 - gorm.Model 54 - Did string `gorm:"index"` 55 - Handle string 56 - LastCrawl string 57 - } 58 - 59 - type LastSeq struct { 60 - ID uint `gorm:"primarykey"` 61 - Seq int64 62 - } 63 - 64 26 func main() { 65 27 app := cli.NewApp() 66 28 ··· 74 36 app.RunAndExitOnError() 75 37 } 76 38 77 - type Server struct { 78 - escli *es.Client 79 - db *gorm.DB 80 - bgshost string 81 - xrpcc *xrpc.Client 82 - bgsxrpc *xrpc.Client 83 - plc *api.PLCServer 84 - 85 - userCache *lru.Cache 86 - } 87 - 88 39 var runCmd = &cli.Command{ 89 40 Name: "run", 90 41 Flags: []cli.Flag{ ··· 189 140 }, 190 141 } 191 142 192 - func (s *Server) getLastCursor() (int64, error) { 193 - var lastSeq LastSeq 194 - if err := s.db.Find(&lastSeq).Error; err != nil { 195 - return 0, err 196 - } 197 - 198 - if lastSeq.ID == 0 { 199 - return 0, s.db.Create(&lastSeq).Error 200 - } 201 - 202 - return lastSeq.Seq, nil 203 - } 204 - 205 - func (s *Server) updateLastCursor(curs int64) error { 206 - return s.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error 207 - } 208 - 209 - func (s *Server) Run(ctx context.Context) error { 210 - cur, err := s.getLastCursor() 211 - if err != nil { 212 - return fmt.Errorf("get last cursor: %w", err) 213 - } 214 - 215 - d := websocket.DefaultDialer 216 - con, _, err := d.Dial(fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", s.bgshost, cur), http.Header{}) 217 - if err != nil { 218 - return fmt.Errorf("events dial failed: %w", err) 219 - } 220 - 221 - return events.HandleRepoStream(ctx, con, &events.RepoStreamCallbacks{ 222 - RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 223 - if evt.TooBig && evt.Prev != nil { 224 - log.Errorf("skipping non-genesis too big events for now: %d", evt.Seq) 225 - return nil 226 - } 227 - 228 - if evt.TooBig { 229 - if err := s.processTooBigCommit(ctx, evt); err != nil { 230 - log.Errorf("failed to process tooBig event: %s", err) 231 - return nil 232 - } 233 - 234 - return nil 235 - } 236 - 237 - r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 238 - if err != nil { 239 - log.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err) 240 - return nil 241 - } 242 - 243 - for _, op := range evt.Ops { 244 - ek := repomgr.EventKind(op.Action) 245 - switch ek { 246 - case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 247 - rc, rec, err := r.GetRecord(ctx, op.Path) 248 - if err != nil { 249 - e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err) 250 - log.Error(e) 251 - return nil 252 - } 253 - 254 - if lexutil.LexLink(rc) != *op.Cid { 255 - log.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) 256 - return nil 257 - } 258 - 259 - if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil { 260 - log.Errorf("failed to handle op: %s", err) 261 - return nil 262 - } 263 - 264 - case repomgr.EvtKindDeleteRecord: 265 - if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil { 266 - log.Errorf("failed to handle delete: %s", err) 267 - return nil 268 - } 269 - } 270 - } 271 - 272 - return nil 273 - 274 - }, 275 - RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 276 - if err := s.updateUserHandle(ctx, evt.Did, evt.Handle); err != nil { 277 - log.Errorf("failed to update user handle: %s", err) 278 - } 279 - return nil 280 - }, 281 - }) 282 - } 283 - 284 - func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec any) error { 285 - if op == repomgr.EvtKindCreateRecord || op == repomgr.EvtKindUpdateRecord { 286 - 287 - log.Infof("handling event(%d): %s - %s", seq, did, path) 288 - u, err := s.getOrCreateUser(ctx, did) 289 - if err != nil { 290 - return fmt.Errorf("checking user: %w", err) 291 - } 292 - switch rec := rec.(type) { 293 - case *bsky.FeedPost: 294 - if err := s.indexPost(ctx, u, rec, path, *rcid); err != nil { 295 - return fmt.Errorf("indexing post: %w", err) 296 - } 297 - case *bsky.ActorProfile: 298 - if err := s.indexProfile(ctx, u, rec); err != nil { 299 - return fmt.Errorf("indexing profile: %w", err) 300 - } 301 - default: 302 - } 303 - 304 - } else if op == repomgr.EvtKindDeleteRecord { 305 - u, err := s.getOrCreateUser(ctx, did) 306 - if err != nil { 307 - return err 308 - } 309 - 310 - switch { 311 - // TODO: handle profile deletes, its an edge case, but worth doing still 312 - case strings.Contains(path, "app.bsky.feed.post"): 313 - if err := s.deletePost(ctx, u, path); err != nil { 314 - return err 315 - } 316 - } 317 - 318 - } 319 - 320 - if seq%50 == 0 { 321 - if err := s.updateLastCursor(seq); err != nil { 322 - log.Error("Failed to update cursor: ", err) 323 - } 324 - } 325 - 326 - return nil 327 - } 328 - 329 - func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 330 - repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "", evt.Commit.String()) 331 - if err != nil { 332 - return err 333 - } 334 - 335 - r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repodata)) 336 - if err != nil { 337 - return err 338 - } 339 - 340 - u, err := s.getOrCreateUser(ctx, evt.Repo) 341 - if err != nil { 342 - return err 343 - } 344 - 345 - return r.ForEach(ctx, "", func(k string, v cid.Cid) error { 346 - if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") { 347 - rcid, rec, err := r.GetRecord(ctx, k) 348 - if err != nil { 349 - log.Errorf("failed to get record from repo checkout: %s", err) 350 - return nil 351 - } 352 - 353 - switch rec := rec.(type) { 354 - case *bsky.FeedPost: 355 - if err := s.indexPost(ctx, u, rec, k, rcid); err != nil { 356 - return fmt.Errorf("indexing post: %w", err) 357 - } 358 - case *bsky.ActorProfile: 359 - if err := s.indexProfile(ctx, u, rec); err != nil { 360 - return fmt.Errorf("indexing profile: %w", err) 361 - } 362 - default: 363 - } 364 - 365 - } 366 - return nil 367 - }) 368 - } 369 - 370 - func (s *Server) handleSearchRequestPosts(e echo.Context) error { 371 - ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleSearchRequestPosts") 372 - defer span.End() 373 - 374 - q := strings.TrimSpace(e.QueryParam("q")) 375 - if q == "" { 376 - return e.JSON(400, map[string]any{ 377 - "error": "must pass non-empty search query", 378 - }) 379 - } 380 - 381 - out, err := s.SearchPosts(ctx, q) 382 - if err != nil { 383 - return err 384 - } 385 - 386 - return e.JSON(200, out) 387 - } 388 - 389 - func (s *Server) handleSearchRequestProfiles(e echo.Context) error { 390 - ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleSearchRequestProfiles") 391 - defer span.End() 392 - 393 - q := strings.TrimSpace(e.QueryParam("q")) 394 - if q == "" { 395 - return e.JSON(400, map[string]any{ 396 - "error": "must pass non-empty search query", 397 - }) 398 - } 399 - 400 - out, err := s.SearchProfiles(ctx, q) 401 - if err != nil { 402 - return err 403 - } 404 - 405 - return e.JSON(200, out) 406 - } 407 - 408 - func (s *Server) SearchPosts(ctx context.Context, srch string) ([]PostSearchResult, error) { 409 - resp, err := doSearchPosts(ctx, s.escli, srch) 410 - if err != nil { 411 - return nil, err 412 - } 413 - 414 - out := []PostSearchResult{} 415 - for _, r := range resp.Hits.Hits { 416 - uid, tid, err := decodeDocumentID(r.ID) 417 - if err != nil { 418 - return nil, fmt.Errorf("decoding document id: %w", err) 419 - } 420 - 421 - var p PostRef 422 - if err := s.db.First(&p, "tid = ? AND uid = ?", tid, uid).Error; err != nil { 423 - log.Infof("failed to find post in database that is referenced by elasticsearch: %s", r.ID) 424 - return nil, err 425 - } 426 - 427 - var u User 428 - if err := s.db.First(&u, "id = ?", p.Uid).Error; err != nil { 429 - return nil, err 430 - } 431 - 432 - var rec map[string]any 433 - if err := json.Unmarshal(r.Source, &rec); err != nil { 434 - return nil, err 435 - } 436 - 437 - out = append(out, PostSearchResult{ 438 - Tid: p.Tid, 439 - Cid: p.Cid, 440 - User: UserResult{ 441 - Did: u.Did, 442 - Handle: u.Handle, 443 - }, 444 - Post: &rec, 445 - }) 446 - } 447 - 448 - return out, nil 449 - } 450 - 451 - type ActorSearchResp struct { 452 - bsky.ActorProfile 453 - DID string `json:"did"` 454 - } 455 - 456 - func (s *Server) SearchProfiles(ctx context.Context, srch string) ([]*ActorSearchResp, error) { 457 - resp, err := doSearchProfiles(ctx, s.escli, srch) 458 - if err != nil { 459 - return nil, err 460 - } 461 - 462 - out := []*ActorSearchResp{} 463 - for _, r := range resp.Hits.Hits { 464 - uid, err := strconv.Atoi(r.ID) 465 - if err != nil { 466 - return nil, err 467 - } 468 - 469 - var u User 470 - if err := s.db.First(&u, "id = ?", uid).Error; err != nil { 471 - return nil, err 472 - } 473 - 474 - var rec bsky.ActorProfile 475 - if err := json.Unmarshal(r.Source, &rec); err != nil { 476 - return nil, err 477 - } 478 - 479 - out = append(out, &ActorSearchResp{ 480 - ActorProfile: rec, 481 - DID: u.Did, 482 - }) 483 - } 484 - 485 - return out, nil 486 - } 487 - 488 - func (s *Server) getOrCreateUser(ctx context.Context, did string) (*User, error) { 489 - cu, ok := s.userCache.Get(did) 490 - if ok { 491 - return cu.(*User), nil 492 - } 493 - 494 - var u User 495 - if err := s.db.Find(&u, "did = ?", did).Error; err != nil { 496 - return nil, err 497 - } 498 - if u.ID == 0 { 499 - // TODO: figure out peoples handles 500 - h, err := s.handleFromDid(ctx, did) 501 - if err != nil { 502 - log.Errorw("failed to resolve did to handle", "did", did, "err", err) 503 - } else { 504 - u.Handle = h 505 - } 506 - 507 - u.Did = did 508 - if err := s.db.Create(&u).Error; err != nil { 509 - return nil, err 510 - } 511 - } 512 - 513 - s.userCache.Add(did, &u) 514 - 515 - return &u, nil 516 - } 517 - 518 - func (s *Server) handleFromDid(ctx context.Context, did string) (string, error) { 519 - handle, _, err := api.ResolveDidToHandle(ctx, s.xrpcc, s.plc, did) 520 - if err != nil { 521 - return "", err 522 - } 523 - 524 - return handle, nil 525 - } 526 - 527 - var ErrDoneIterating = fmt.Errorf("done iterating") 528 - 529 - func encodeDocumentID(uid uint, tid string) string { 530 - comb := fmt.Sprintf("%d:%s", uid, tid) 531 - return base32.StdEncoding.EncodeToString([]byte(comb)) 532 - } 533 - 534 - func decodeDocumentID(docid string) (uint, string, error) { 535 - dec, err := base32.StdEncoding.DecodeString(docid) 536 - if err != nil { 537 - return 0, "", err 538 - } 539 - 540 - parts := strings.SplitN(string(dec), ":", 2) 541 - if len(parts) < 2 { 542 - return 0, "", fmt.Errorf("invalid document id: %q", string(dec)) 543 - } 544 - 545 - uid, err := strconv.Atoi(parts[0]) 546 - if err != nil { 547 - return 0, "", err 548 - } 549 - 550 - return uint(uid), parts[1], nil 551 - } 552 - 553 - func (s *Server) deletePost(ctx context.Context, u *User, path string) error { 554 - log.Infof("deleting post: %s", path) 555 - req := esapi.DeleteRequest{ 556 - Index: "posts", 557 - DocumentID: encodeDocumentID(u.ID, path), 558 - Refresh: "true", 559 - } 560 - 561 - res, err := req.Do(ctx, s.escli) 562 - if err != nil { 563 - return fmt.Errorf("failed to delete post: %w", err) 564 - } 565 - 566 - fmt.Println(res) 567 - 568 - return nil 569 - } 570 - 571 - func (s *Server) indexPost(ctx context.Context, u *User, rec *bsky.FeedPost, tid string, pcid cid.Cid) error { 572 - if err := s.db.Create(&PostRef{ 573 - Cid: pcid.String(), 574 - Tid: tid, 575 - Uid: u.ID, 576 - }).Error; err != nil { 577 - return err 578 - } 579 - 580 - ts, err := time.Parse(util.ISO8601, rec.CreatedAt) 581 - if err != nil { 582 - return fmt.Errorf("post (%d, %s) had invalid timestamp (%q): %w", u.ID, tid, rec.CreatedAt, err) 583 - } 584 - 585 - blob := map[string]any{ 586 - "text": rec.Text, 587 - "createdAt": ts.UnixNano(), 588 - "user": u.Handle, 589 - } 590 - b, err := json.Marshal(blob) 591 - if err != nil { 592 - return err 593 - } 594 - 595 - log.Infof("Indexing post") 596 - req := esapi.IndexRequest{ 597 - Index: "posts", 598 - DocumentID: encodeDocumentID(u.ID, tid), 599 - Body: bytes.NewReader(b), 600 - Refresh: "true", 601 - } 602 - 603 - res, err := req.Do(ctx, s.escli) 604 - if err != nil { 605 - return fmt.Errorf("failed to send indexing request: %w", err) 606 - } 607 - 608 - fmt.Println(res) 609 - 610 - return nil 611 - } 612 - 613 - func (s *Server) indexProfile(ctx context.Context, u *User, rec *bsky.ActorProfile) error { 614 - b, err := json.Marshal(rec) 615 - if err != nil { 616 - return err 617 - } 618 - 619 - n := "" 620 - if rec.DisplayName != nil { 621 - n = *rec.DisplayName 622 - } 623 - 624 - blob := map[string]string{ 625 - "displayName": n, 626 - "handle": u.Handle, 627 - "did": u.Did, 628 - } 629 - 630 - if rec.Description != nil { 631 - blob["description"] = *rec.Description 632 - } 633 - 634 - log.Infof("Indexing profile: %s", n) 635 - req := esapi.IndexRequest{ 636 - Index: "profiles", 637 - DocumentID: fmt.Sprint(u.ID), 638 - Body: bytes.NewReader(b), 639 - Refresh: "true", 640 - } 641 - 642 - res, err := req.Do(context.Background(), s.escli) 643 - if err != nil { 644 - return fmt.Errorf("failed to send indexing request: %w", err) 645 - } 646 - fmt.Println(res) 647 - 648 - return nil 649 - } 650 - 651 - func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error { 652 - u, err := s.getOrCreateUser(ctx, did) 653 - if err != nil { 654 - return err 655 - } 656 - 657 - if err := s.db.Model(User{}).Where("id = ?", u.ID).Update("handle", handle).Error; err != nil { 658 - return err 659 - } 660 - 661 - u.Handle = handle 662 - 663 - b, err := json.Marshal(map[string]any{ 664 - "script": map[string]any{ 665 - "source": "ctx._source.handle = params.handle", 666 - "lang": "painless", 667 - "params": map[string]any{ 668 - "handle": handle, 669 - }, 670 - }, 671 - }) 672 - if err != nil { 673 - return err 674 - } 675 - 676 - req := esapi.UpdateRequest{ 677 - Index: "profiles", 678 - DocumentID: fmt.Sprint(u.ID), 679 - Body: bytes.NewReader(b), 680 - Refresh: "true", 681 - } 682 - 683 - res, err := req.Do(context.Background(), s.escli) 684 - if err != nil { 685 - return fmt.Errorf("failed to send indexing request: %w", err) 686 - } 687 - fmt.Println(res) 688 - 689 - return nil 690 - } 691 - 692 143 func getEsCli(certfi string) (*es.Client, error) { 693 144 user := "elastic" 694 145 if u := os.Getenv("ELASTIC_USERNAME"); u != "" { ··· 760 211 }, 761 212 } 762 213 763 - type EsSearchHit struct { 764 - Index string `json:"_index"` 765 - ID string `json:"_id"` 766 - Score float64 `json:"_score"` 767 - Source json.RawMessage `json:"_source"` 768 - } 769 - 770 - type EsSearchHits struct { 771 - Total struct { 772 - Value int 773 - Relation string 774 - } `json:"total"` 775 - MaxScore float64 `json:"max_score"` 776 - Hits []EsSearchHit `json:"hits"` 777 - } 778 - 779 - type EsSearchResponse struct { 780 - Took int `json:"took"` 781 - TimedOut bool `json:"timed_out"` 782 - // Shards ??? 783 - Hits EsSearchHits `json:"hits"` 784 - } 785 - 786 - type UserResult struct { 787 - Did string `json:"did"` 788 - Handle string `json:"handle"` 789 - } 790 - 791 - type PostSearchResult struct { 792 - Tid string `json:"tid"` 793 - Cid string `json:"cid"` 794 - User UserResult `json:"user"` 795 - Post any `json:"post"` 796 - } 797 - 798 - func doSearchPosts(ctx context.Context, escli *es.Client, q string) (*EsSearchResponse, error) { 799 - query := map[string]interface{}{ 800 - /* 801 - "sort": map[string]any{ 802 - "createdAt": map[string]any{ 803 - "order": "desc", 804 - "format": "date_nanos", 805 - }, 806 - }, 807 - */ 808 - "query": map[string]interface{}{ 809 - "match": map[string]interface{}{ 810 - "text": q, 811 - }, 812 - }, 813 - } 814 - 815 - return doSearch(ctx, escli, "posts", query) 816 - } 817 - 818 - func doSearchProfiles(ctx context.Context, escli *es.Client, q string) (*EsSearchResponse, error) { 819 - query := map[string]interface{}{ 820 - "query": map[string]interface{}{ 821 - "multi_match": map[string]interface{}{ 822 - "query": q, 823 - "fields": []string{"description", "displayName", "handle"}, 824 - "operator": "or", 825 - }, 826 - }, 827 - } 828 - 829 - return doSearch(ctx, escli, "profiles", query) 830 - } 831 - 832 - func doSearch(ctx context.Context, escli *es.Client, index string, query interface{}) (*EsSearchResponse, error) { 833 - var buf bytes.Buffer 834 - if err := json.NewEncoder(&buf).Encode(query); err != nil { 835 - log.Fatalf("Error encoding query: %s", err) 836 - } 837 - 838 - // Perform the search request. 839 - res, err := escli.Search( 840 - escli.Search.WithContext(ctx), 841 - escli.Search.WithIndex(index), 842 - escli.Search.WithBody(&buf), 843 - escli.Search.WithTrackTotalHits(true), 844 - escli.Search.WithSize(30), 845 - ) 846 - if err != nil { 847 - log.Fatalf("Error getting response: %s", err) 848 - } 849 - defer res.Body.Close() 850 - 851 - var out EsSearchResponse 852 - if err := json.NewDecoder(res.Body).Decode(&out); err != nil { 853 - return nil, fmt.Errorf("decoding search response: %w", err) 854 - } 855 - 856 - return &out, nil 857 - } 858 - 859 214 var searchCmd = &cli.Command{ 860 215 Name: "search", 861 216 Flags: []cli.Flag{ ··· 898 253 899 254 }, 900 255 } 901 - 902 - func OpenBlockstore(dir string) (blockstore.Blockstore, error) { 903 - fds, err := flatfs.CreateOrOpen(dir, flatfs.IPFS_DEF_SHARD, false) 904 - if err != nil { 905 - return nil, err 906 - } 907 - 908 - return blockstore.NewBlockstoreNoPrefix(fds), nil 909 - }
+106
cmd/palomar/query.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + 9 + es "github.com/opensearch-project/opensearch-go/v2" 10 + ) 11 + 12 + type EsSearchHit struct { 13 + Index string `json:"_index"` 14 + ID string `json:"_id"` 15 + Score float64 `json:"_score"` 16 + Source json.RawMessage `json:"_source"` 17 + } 18 + 19 + type EsSearchHits struct { 20 + Total struct { 21 + Value int 22 + Relation string 23 + } `json:"total"` 24 + MaxScore float64 `json:"max_score"` 25 + Hits []EsSearchHit `json:"hits"` 26 + } 27 + 28 + type EsSearchResponse struct { 29 + Took int `json:"took"` 30 + TimedOut bool `json:"timed_out"` 31 + // Shards ??? 32 + Hits EsSearchHits `json:"hits"` 33 + } 34 + 35 + type UserResult struct { 36 + Did string `json:"did"` 37 + Handle string `json:"handle"` 38 + } 39 + 40 + type PostSearchResult struct { 41 + Tid string `json:"tid"` 42 + Cid string `json:"cid"` 43 + User UserResult `json:"user"` 44 + Post any `json:"post"` 45 + } 46 + 47 + func doSearchPosts(ctx context.Context, escli *es.Client, q string) (*EsSearchResponse, error) { 48 + query := map[string]interface{}{ 49 + /* 50 + "sort": map[string]any{ 51 + "createdAt": map[string]any{ 52 + "order": "desc", 53 + "format": "date_nanos", 54 + }, 55 + }, 56 + */ 57 + "query": map[string]interface{}{ 58 + "match": map[string]interface{}{ 59 + "text": q, 60 + }, 61 + }, 62 + } 63 + 64 + return doSearch(ctx, escli, "posts", query) 65 + } 66 + 67 + func doSearchProfiles(ctx context.Context, escli *es.Client, q string) (*EsSearchResponse, error) { 68 + query := map[string]interface{}{ 69 + "query": map[string]interface{}{ 70 + "multi_match": map[string]interface{}{ 71 + "query": q, 72 + "fields": []string{"description", "displayName", "handle"}, 73 + "operator": "or", 74 + }, 75 + }, 76 + } 77 + 78 + return doSearch(ctx, escli, "profiles", query) 79 + } 80 + 81 + func doSearch(ctx context.Context, escli *es.Client, index string, query interface{}) (*EsSearchResponse, error) { 82 + var buf bytes.Buffer 83 + if err := json.NewEncoder(&buf).Encode(query); err != nil { 84 + log.Fatalf("Error encoding query: %s", err) 85 + } 86 + 87 + // Perform the search request. 88 + res, err := escli.Search( 89 + escli.Search.WithContext(ctx), 90 + escli.Search.WithIndex(index), 91 + escli.Search.WithBody(&buf), 92 + escli.Search.WithTrackTotalHits(true), 93 + escli.Search.WithSize(30), 94 + ) 95 + if err != nil { 96 + log.Fatalf("Error getting response: %s", err) 97 + } 98 + defer res.Body.Close() 99 + 100 + var out EsSearchResponse 101 + if err := json.NewDecoder(res.Body).Decode(&out); err != nil { 102 + return nil, fmt.Errorf("decoding search response: %w", err) 103 + } 104 + 105 + return &out, nil 106 + }
+378
cmd/palomar/server.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/base32" 7 + "encoding/json" 8 + "fmt" 9 + "net/http" 10 + "strconv" 11 + "strings" 12 + 13 + api "github.com/bluesky-social/indigo/api" 14 + comatproto "github.com/bluesky-social/indigo/api/atproto" 15 + bsky "github.com/bluesky-social/indigo/api/bsky" 16 + "github.com/bluesky-social/indigo/events" 17 + lexutil "github.com/bluesky-social/indigo/lex/util" 18 + "github.com/bluesky-social/indigo/repo" 19 + "github.com/bluesky-social/indigo/repomgr" 20 + "github.com/bluesky-social/indigo/xrpc" 21 + "github.com/gorilla/websocket" 22 + lru "github.com/hashicorp/golang-lru" 23 + "github.com/ipfs/go-cid" 24 + flatfs "github.com/ipfs/go-ds-flatfs" 25 + blockstore "github.com/ipfs/go-ipfs-blockstore" 26 + 27 + es "github.com/opensearch-project/opensearch-go/v2" 28 + 29 + gorm "gorm.io/gorm" 30 + ) 31 + 32 + type Server struct { 33 + escli *es.Client 34 + db *gorm.DB 35 + bgshost string 36 + xrpcc *xrpc.Client 37 + bgsxrpc *xrpc.Client 38 + plc *api.PLCServer 39 + 40 + userCache *lru.Cache 41 + } 42 + 43 + type PostRef struct { 44 + gorm.Model 45 + Cid string 46 + Tid string `gorm:"index"` 47 + Uid uint `gorm:"index"` 48 + } 49 + 50 + type User struct { 51 + gorm.Model 52 + Did string `gorm:"index"` 53 + Handle string 54 + LastCrawl string 55 + } 56 + 57 + type LastSeq struct { 58 + ID uint `gorm:"primarykey"` 59 + Seq int64 60 + } 61 + 62 + func (s *Server) getLastCursor() (int64, error) { 63 + var lastSeq LastSeq 64 + if err := s.db.Find(&lastSeq).Error; err != nil { 65 + return 0, err 66 + } 67 + 68 + if lastSeq.ID == 0 { 69 + return 0, s.db.Create(&lastSeq).Error 70 + } 71 + 72 + return lastSeq.Seq, nil 73 + } 74 + 75 + func (s *Server) updateLastCursor(curs int64) error { 76 + return s.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error 77 + } 78 + 79 + func (s *Server) Run(ctx context.Context) error { 80 + cur, err := s.getLastCursor() 81 + if err != nil { 82 + return fmt.Errorf("get last cursor: %w", err) 83 + } 84 + 85 + d := websocket.DefaultDialer 86 + con, _, err := d.Dial(fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", s.bgshost, cur), http.Header{}) 87 + if err != nil { 88 + return fmt.Errorf("events dial failed: %w", err) 89 + } 90 + 91 + return events.HandleRepoStream(ctx, con, &events.RepoStreamCallbacks{ 92 + RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 93 + if evt.TooBig && evt.Prev != nil { 94 + log.Errorf("skipping non-genesis too big events for now: %d", evt.Seq) 95 + return nil 96 + } 97 + 98 + if evt.TooBig { 99 + if err := s.processTooBigCommit(ctx, evt); err != nil { 100 + log.Errorf("failed to process tooBig event: %s", err) 101 + return nil 102 + } 103 + 104 + return nil 105 + } 106 + 107 + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 108 + if err != nil { 109 + log.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err) 110 + return nil 111 + } 112 + 113 + for _, op := range evt.Ops { 114 + ek := repomgr.EventKind(op.Action) 115 + switch ek { 116 + case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 117 + rc, rec, err := r.GetRecord(ctx, op.Path) 118 + if err != nil { 119 + e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err) 120 + log.Error(e) 121 + return nil 122 + } 123 + 124 + if lexutil.LexLink(rc) != *op.Cid { 125 + log.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) 126 + return nil 127 + } 128 + 129 + if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil { 130 + log.Errorf("failed to handle op: %s", err) 131 + return nil 132 + } 133 + 134 + case repomgr.EvtKindDeleteRecord: 135 + if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil { 136 + log.Errorf("failed to handle delete: %s", err) 137 + return nil 138 + } 139 + } 140 + } 141 + 142 + return nil 143 + 144 + }, 145 + RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 146 + if err := s.updateUserHandle(ctx, evt.Did, evt.Handle); err != nil { 147 + log.Errorf("failed to update user handle: %s", err) 148 + } 149 + return nil 150 + }, 151 + }) 152 + } 153 + 154 + func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec any) error { 155 + if op == repomgr.EvtKindCreateRecord || op == repomgr.EvtKindUpdateRecord { 156 + 157 + log.Infof("handling event(%d): %s - %s", seq, did, path) 158 + u, err := s.getOrCreateUser(ctx, did) 159 + if err != nil { 160 + return fmt.Errorf("checking user: %w", err) 161 + } 162 + switch rec := rec.(type) { 163 + case *bsky.FeedPost: 164 + if err := s.indexPost(ctx, u, rec, path, *rcid); err != nil { 165 + return fmt.Errorf("indexing post: %w", err) 166 + } 167 + case *bsky.ActorProfile: 168 + if err := s.indexProfile(ctx, u, rec); err != nil { 169 + return fmt.Errorf("indexing profile: %w", err) 170 + } 171 + default: 172 + } 173 + 174 + } else if op == repomgr.EvtKindDeleteRecord { 175 + u, err := s.getOrCreateUser(ctx, did) 176 + if err != nil { 177 + return err 178 + } 179 + 180 + switch { 181 + // TODO: handle profile deletes, its an edge case, but worth doing still 182 + case strings.Contains(path, "app.bsky.feed.post"): 183 + if err := s.deletePost(ctx, u, path); err != nil { 184 + return err 185 + } 186 + } 187 + 188 + } 189 + 190 + if seq%50 == 0 { 191 + if err := s.updateLastCursor(seq); err != nil { 192 + log.Error("Failed to update cursor: ", err) 193 + } 194 + } 195 + 196 + return nil 197 + } 198 + 199 + func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 200 + repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "", evt.Commit.String()) 201 + if err != nil { 202 + return err 203 + } 204 + 205 + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repodata)) 206 + if err != nil { 207 + return err 208 + } 209 + 210 + u, err := s.getOrCreateUser(ctx, evt.Repo) 211 + if err != nil { 212 + return err 213 + } 214 + 215 + return r.ForEach(ctx, "", func(k string, v cid.Cid) error { 216 + if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") { 217 + rcid, rec, err := r.GetRecord(ctx, k) 218 + if err != nil { 219 + log.Errorf("failed to get record from repo checkout: %s", err) 220 + return nil 221 + } 222 + 223 + switch rec := rec.(type) { 224 + case *bsky.FeedPost: 225 + if err := s.indexPost(ctx, u, rec, k, rcid); err != nil { 226 + return fmt.Errorf("indexing post: %w", err) 227 + } 228 + case *bsky.ActorProfile: 229 + if err := s.indexProfile(ctx, u, rec); err != nil { 230 + return fmt.Errorf("indexing profile: %w", err) 231 + } 232 + default: 233 + } 234 + 235 + } 236 + return nil 237 + }) 238 + } 239 + 240 + func (s *Server) SearchPosts(ctx context.Context, srch string) ([]PostSearchResult, error) { 241 + resp, err := doSearchPosts(ctx, s.escli, srch) 242 + if err != nil { 243 + return nil, err 244 + } 245 + 246 + out := []PostSearchResult{} 247 + for _, r := range resp.Hits.Hits { 248 + uid, tid, err := decodeDocumentID(r.ID) 249 + if err != nil { 250 + return nil, fmt.Errorf("decoding document id: %w", err) 251 + } 252 + 253 + var p PostRef 254 + if err := s.db.First(&p, "tid = ? AND uid = ?", tid, uid).Error; err != nil { 255 + log.Infof("failed to find post in database that is referenced by elasticsearch: %s", r.ID) 256 + return nil, err 257 + } 258 + 259 + var u User 260 + if err := s.db.First(&u, "id = ?", p.Uid).Error; err != nil { 261 + return nil, err 262 + } 263 + 264 + var rec map[string]any 265 + if err := json.Unmarshal(r.Source, &rec); err != nil { 266 + return nil, err 267 + } 268 + 269 + out = append(out, PostSearchResult{ 270 + Tid: p.Tid, 271 + Cid: p.Cid, 272 + User: UserResult{ 273 + Did: u.Did, 274 + Handle: u.Handle, 275 + }, 276 + Post: &rec, 277 + }) 278 + } 279 + 280 + return out, nil 281 + } 282 + 283 + func (s *Server) getOrCreateUser(ctx context.Context, did string) (*User, error) { 284 + cu, ok := s.userCache.Get(did) 285 + if ok { 286 + return cu.(*User), nil 287 + } 288 + 289 + var u User 290 + if err := s.db.Find(&u, "did = ?", did).Error; err != nil { 291 + return nil, err 292 + } 293 + if u.ID == 0 { 294 + // TODO: figure out peoples handles 295 + h, err := s.handleFromDid(ctx, did) 296 + if err != nil { 297 + log.Errorw("failed to resolve did to handle", "did", did, "err", err) 298 + } else { 299 + u.Handle = h 300 + } 301 + 302 + u.Did = did 303 + if err := s.db.Create(&u).Error; err != nil { 304 + return nil, err 305 + } 306 + } 307 + 308 + s.userCache.Add(did, &u) 309 + 310 + return &u, nil 311 + } 312 + 313 + var ErrDoneIterating = fmt.Errorf("done iterating") 314 + 315 + func encodeDocumentID(uid uint, tid string) string { 316 + comb := fmt.Sprintf("%d:%s", uid, tid) 317 + return base32.StdEncoding.EncodeToString([]byte(comb)) 318 + } 319 + 320 + func decodeDocumentID(docid string) (uint, string, error) { 321 + dec, err := base32.StdEncoding.DecodeString(docid) 322 + if err != nil { 323 + return 0, "", err 324 + } 325 + 326 + parts := strings.SplitN(string(dec), ":", 2) 327 + if len(parts) < 2 { 328 + return 0, "", fmt.Errorf("invalid document id: %q", string(dec)) 329 + } 330 + 331 + uid, err := strconv.Atoi(parts[0]) 332 + if err != nil { 333 + return 0, "", err 334 + } 335 + 336 + return uint(uid), parts[1], nil 337 + } 338 + 339 + func (s *Server) SearchProfiles(ctx context.Context, srch string) ([]*ActorSearchResp, error) { 340 + resp, err := doSearchProfiles(ctx, s.escli, srch) 341 + if err != nil { 342 + return nil, err 343 + } 344 + 345 + out := []*ActorSearchResp{} 346 + for _, r := range resp.Hits.Hits { 347 + uid, err := strconv.Atoi(r.ID) 348 + if err != nil { 349 + return nil, err 350 + } 351 + 352 + var u User 353 + if err := s.db.First(&u, "id = ?", uid).Error; err != nil { 354 + return nil, err 355 + } 356 + 357 + var rec bsky.ActorProfile 358 + if err := json.Unmarshal(r.Source, &rec); err != nil { 359 + return nil, err 360 + } 361 + 362 + out = append(out, &ActorSearchResp{ 363 + ActorProfile: rec, 364 + DID: u.Did, 365 + }) 366 + } 367 + 368 + return out, nil 369 + } 370 + 371 + func OpenBlockstore(dir string) (blockstore.Blockstore, error) { 372 + fds, err := flatfs.CreateOrOpen(dir, flatfs.IPFS_DEF_SHARD, false) 373 + if err != nil { 374 + return nil, err 375 + } 376 + 377 + return blockstore.NewBlockstoreNoPrefix(fds), nil 378 + }