this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add local/global actor search merging

+125 -11
+125 -11
search/handlers.go
··· 4 4 "context" 5 5 "encoding/json" 6 6 "fmt" 7 + "slices" 7 8 "strconv" 8 9 "strings" 10 + "sync" 9 11 10 12 appbsky "github.com/bluesky-social/indigo/api/bsky" 11 13 "github.com/bluesky-social/indigo/atproto/syntax" ··· 317 319 func (s *Server) SearchProfiles(ctx context.Context, params *ActorSearchParams) (*appbsky.UnspeccedSearchActorsSkeleton_Output, error) { 318 320 ctx, span := tracer.Start(ctx, "SearchProfiles") 319 321 defer span.End() 322 + span.SetAttributes( 323 + attribute.String("query", params.Query), 324 + attribute.Bool("typeahead", params.Typeahead), 325 + attribute.Int("offset", params.Offset), 326 + attribute.Int("size", params.Size), 327 + ) 320 328 321 - var resp *EsSearchResponse 322 - var err error 323 - if params.Typeahead { 324 - resp, err = DoSearchProfilesTypeahead(ctx, s.escli, s.profileIndex, params) 325 - } else { 326 - resp, err = DoSearchProfiles(ctx, s.dir, s.escli, s.profileIndex, params) 329 + var globalResp *EsSearchResponse 330 + var personalizedResp *EsSearchResponse 331 + var globalErr error 332 + var personalizedErr error 333 + 334 + wg := sync.WaitGroup{} 335 + 336 + wg.Add(1) 337 + // Conduct the global search 338 + go func(myQ ActorSearchParams) { 339 + defer wg.Done() 340 + // Clear out the following list to conduct the global search 341 + myQ.Follows = nil 342 + 343 + if myQ.Typeahead { 344 + globalResp, globalErr = DoSearchProfilesTypeahead(ctx, s.escli, s.profileIndex, &myQ) 345 + } else { 346 + globalResp, globalErr = DoSearchProfiles(ctx, s.dir, s.escli, s.profileIndex, &myQ) 347 + } 348 + }(*params) 349 + 350 + // If we have a following list, conduct a second search to filter the results 351 + if len(params.Follows) > 0 { 352 + wg.Add(1) 353 + go func(myQ ActorSearchParams) { 354 + defer wg.Done() 355 + if myQ.Typeahead { 356 + personalizedResp, personalizedErr = DoSearchProfilesTypeahead(ctx, s.escli, s.profileIndex, &myQ) 357 + } else { 358 + personalizedResp, personalizedErr = DoSearchProfiles(ctx, s.dir, s.escli, s.profileIndex, &myQ) 359 + } 360 + }(*params) 327 361 } 328 - if err != nil { 329 - return nil, err 362 + 363 + wg.Wait() 364 + 365 + if globalErr != nil { 366 + return nil, globalErr 367 + } 368 + 369 + if len(params.Follows) > 0 { 370 + if personalizedErr != nil { 371 + return nil, personalizedErr 372 + } 373 + 374 + followingBoost := 0.1 375 + 376 + // Insert the personalized results into the global results, deduping as we go and maintaining score-order 377 + followingSeen := map[string]struct{}{} 378 + for _, r := range personalizedResp.Hits.Hits { 379 + var doc ProfileDoc 380 + if err := json.Unmarshal(r.Source, &doc); err != nil { 381 + return nil, fmt.Errorf("decoding profile doc from search response: %w", err) 382 + } 383 + 384 + did, err := syntax.ParseDID(doc.DID) 385 + if err != nil { 386 + return nil, fmt.Errorf("invalid DID in indexed document: %w", err) 387 + } 388 + 389 + if _, ok := followingSeen[did.String()]; ok { 390 + continue 391 + } 392 + 393 + followingSeen[did.String()] = struct{}{} 394 + 395 + // Insert the profile into the global results 396 + globalResp.Hits.Hits = append(globalResp.Hits.Hits, r) 397 + } 398 + 399 + // Walk the combined results and boost the scores of the personalized results and dedupe 400 + seen := map[string]struct{}{} 401 + deduped := []EsSearchHit{} 402 + for _, r := range globalResp.Hits.Hits { 403 + var doc ProfileDoc 404 + if err := json.Unmarshal(r.Source, &doc); err != nil { 405 + return nil, fmt.Errorf("decoding profile doc from search response: %w", err) 406 + } 407 + 408 + did, err := syntax.ParseDID(doc.DID) 409 + if err != nil { 410 + return nil, fmt.Errorf("invalid DID in indexed document: %w", err) 411 + } 412 + 413 + // Boost the score of the personalized results 414 + if _, ok := followingSeen[did.String()]; ok { 415 + r.Score += followingBoost 416 + } 417 + 418 + // Dedupe the results 419 + if _, ok := seen[did.String()]; ok { 420 + continue 421 + } 422 + 423 + seen[did.String()] = struct{}{} 424 + deduped = append(deduped, r) 425 + } 426 + 427 + // Sort the results by score 428 + slices.SortFunc(deduped, func(a, b EsSearchHit) int { 429 + if a.Score < b.Score { 430 + return 1 431 + } 432 + if a.Score > b.Score { 433 + return -1 434 + } 435 + return 0 436 + }) 437 + 438 + // Trim the results to the requested size 439 + if len(deduped) > params.Size { 440 + deduped = deduped[:params.Size] 441 + } 442 + 443 + globalResp.Hits.Hits = deduped 330 444 } 331 445 332 446 actors := []*appbsky.UnspeccedDefs_SkeletonSearchActor{} 333 - for _, r := range resp.Hits.Hits { 447 + for _, r := range globalResp.Hits.Hits { 334 448 var doc ProfileDoc 335 449 if err := json.Unmarshal(r.Source, &doc); err != nil { 336 450 return nil, fmt.Errorf("decoding profile doc from search response: %w", err) ··· 351 465 s := fmt.Sprintf("%d", params.Offset+params.Size) 352 466 out.Cursor = &s 353 467 } 354 - if resp.Hits.Total.Relation == "eq" { 355 - i := int64(resp.Hits.Total.Value) 468 + if globalResp.Hits.Total.Relation == "eq" { 469 + i := int64(globalResp.Hits.Total.Value) 356 470 out.HitsTotal = &i 357 471 } 358 472 return &out, nil