Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: batch FeedIndex queries to eliminate N+1 patterns

authored by

Patrick Dewey and committed by tangled.org 7e0eaa9c 26f29d77

+212 -30
+166 -18
internal/firehose/index.go
··· 779 779 } 780 780 } 781 781 782 + // Batch-fetch social data for all records 783 + recordURIs := make([]string, 0, len(records)) 784 + didSet := make(map[string]struct{}, len(records)) 785 + for _, r := range records { 786 + recordURIs = append(recordURIs, r.URI) 787 + didSet[r.DID] = struct{}{} 788 + } 789 + likeCounts := idx.GetLikeCountsBatch(ctx, recordURIs) 790 + commentCounts := idx.GetCommentCountsBatch(ctx, recordURIs) 791 + 792 + // Pre-warm profile cache for all unique DIDs 793 + profiles := make(map[string]*atproto.Profile, len(didSet)) 794 + for did := range didSet { 795 + if p, err := idx.GetProfile(ctx, did); err == nil { 796 + profiles[did] = p 797 + } 798 + } 799 + 782 800 // Convert to FeedItems 783 801 items := make([]*FeedItem, 0, len(records)) 784 802 for _, record := range records { 785 - item, err := idx.recordToFeedItem(ctx, record, recordsByURI) 803 + item, err := idx.recordToFeedItem(ctx, record, recordsByURI, profiles) 786 804 if err != nil { 787 805 log.Warn().Err(err).Str("uri", record.URI).Msg("failed to convert record to feed item") 788 806 continue ··· 790 808 if !FeedableRecordTypes[item.RecordType] { 791 809 continue 792 810 } 811 + item.LikeCount = likeCounts[record.URI] 812 + item.CommentCount = commentCounts[record.URI] 793 813 items = append(items, item) 794 814 } 795 815 796 816 return items, nil 797 817 } 798 818 799 - // recordToFeedItem converts an IndexedRecord to a FeedItem 800 - func (idx *FeedIndex) recordToFeedItem(ctx context.Context, record *IndexedRecord, refMap map[string]*IndexedRecord) (*FeedItem, error) { 819 + // recordToFeedItem converts an IndexedRecord to a FeedItem. 820 + // The profiles map provides pre-fetched profiles keyed by DID; if nil or missing, 821 + // the profile is fetched individually as a fallback. 822 + func (idx *FeedIndex) recordToFeedItem(ctx context.Context, record *IndexedRecord, refMap map[string]*IndexedRecord, profiles map[string]*atproto.Profile) (*FeedItem, error) { 801 823 var recordData map[string]any 802 824 if err := json.Unmarshal(record.Record, &recordData); err != nil { 803 825 return nil, err ··· 808 830 TimeAgo: formatTimeAgo(record.CreatedAt), 809 831 } 810 832 811 - // Get author profile 812 - profile, err := idx.GetProfile(ctx, record.DID) 813 - if err != nil { 814 - log.Warn().Err(err).Str("did", record.DID).Msg("failed to get profile") 815 - profile = &atproto.Profile{ 816 - DID: record.DID, 817 - Handle: record.DID, 833 + // Get author profile from pre-fetched map or fallback to individual fetch 834 + profile, ok := profiles[record.DID] 835 + if !ok || profile == nil { 836 + var err error 837 + profile, err = idx.GetProfile(ctx, record.DID) 838 + if err != nil { 839 + log.Warn().Err(err).Str("did", record.DID).Msg("failed to get profile") 840 + profile = &atproto.Profile{ 841 + DID: record.DID, 842 + Handle: record.DID, 843 + } 818 844 } 819 845 } 820 846 item.Author = profile ··· 964 990 return nil, fmt.Errorf("unknown collection: %s", record.Collection) 965 991 } 966 992 967 - // Populate like-related fields for all record types 993 + // Populate subject fields (like/comment counts are set by caller via batch) 968 994 item.SubjectURI = record.URI 969 995 item.SubjectCID = record.CID 970 - item.LikeCount = idx.GetLikeCount(ctx, record.URI) 971 - item.CommentCount = idx.GetCommentCount(ctx, record.URI) 972 996 973 997 return item, nil 974 998 } ··· 1379 1403 return rkey 1380 1404 } 1381 1405 1406 + // ========== Batch Query Methods ========== 1407 + 1408 + // placeholders returns a string of "?,?,?" for n items and a corresponding []any slice. 1409 + func placeholders(uris []string) (string, []any) { 1410 + ph := make([]string, len(uris)) 1411 + args := make([]any, len(uris)) 1412 + for i, u := range uris { 1413 + ph[i] = "?" 1414 + args[i] = u 1415 + } 1416 + return strings.Join(ph, ","), args 1417 + } 1418 + 1419 + // GetLikeCountsBatch returns like counts for multiple subject URIs in a single query. 1420 + func (idx *FeedIndex) GetLikeCountsBatch(ctx context.Context, uris []string) map[string]int { 1421 + counts := make(map[string]int, len(uris)) 1422 + if len(uris) == 0 { 1423 + return counts 1424 + } 1425 + ph, args := placeholders(uris) 1426 + rows, err := idx.db.QueryContext(ctx, 1427 + `SELECT subject_uri, COUNT(*) FROM likes WHERE subject_uri IN (`+ph+`) GROUP BY subject_uri`, args...) 1428 + if err != nil { 1429 + return counts 1430 + } 1431 + defer rows.Close() 1432 + for rows.Next() { 1433 + var uri string 1434 + var count int 1435 + if err := rows.Scan(&uri, &count); err == nil { 1436 + counts[uri] = count 1437 + } 1438 + } 1439 + return counts 1440 + } 1441 + 1442 + // HasUserLikedBatch checks if a user has liked multiple records in a single query. 1443 + func (idx *FeedIndex) HasUserLikedBatch(ctx context.Context, actorDID string, uris []string) map[string]bool { 1444 + liked := make(map[string]bool, len(uris)) 1445 + if len(uris) == 0 || actorDID == "" { 1446 + return liked 1447 + } 1448 + ph, args := placeholders(uris) 1449 + // Prepend actorDID to args 1450 + allArgs := make([]any, 0, len(args)+1) 1451 + allArgs = append(allArgs, actorDID) 1452 + allArgs = append(allArgs, args...) 1453 + rows, err := idx.db.QueryContext(ctx, 1454 + `SELECT subject_uri FROM likes WHERE actor_did = ? AND subject_uri IN (`+ph+`)`, allArgs...) 1455 + if err != nil { 1456 + return liked 1457 + } 1458 + defer rows.Close() 1459 + for rows.Next() { 1460 + var uri string 1461 + if err := rows.Scan(&uri); err == nil { 1462 + liked[uri] = true 1463 + } 1464 + } 1465 + return liked 1466 + } 1467 + 1468 + // GetCommentCountsBatch returns comment counts for multiple subject URIs in a single query. 1469 + func (idx *FeedIndex) GetCommentCountsBatch(ctx context.Context, uris []string) map[string]int { 1470 + counts := make(map[string]int, len(uris)) 1471 + if len(uris) == 0 { 1472 + return counts 1473 + } 1474 + ph, args := placeholders(uris) 1475 + rows, err := idx.db.QueryContext(ctx, 1476 + `SELECT subject_uri, COUNT(*) FROM comments WHERE subject_uri IN (`+ph+`) GROUP BY subject_uri`, args...) 1477 + if err != nil { 1478 + return counts 1479 + } 1480 + defer rows.Close() 1481 + for rows.Next() { 1482 + var uri string 1483 + var count int 1484 + if err := rows.Scan(&uri, &count); err == nil { 1485 + counts[uri] = count 1486 + } 1487 + } 1488 + return counts 1489 + } 1490 + 1491 + // GetRecordsBatch retrieves multiple records by URI in a single query. 1492 + func (idx *FeedIndex) GetRecordsBatch(ctx context.Context, uris []string) map[string]*IndexedRecord { 1493 + records := make(map[string]*IndexedRecord, len(uris)) 1494 + if len(uris) == 0 { 1495 + return records 1496 + } 1497 + ph, args := placeholders(uris) 1498 + rows, err := idx.db.QueryContext(ctx, 1499 + `SELECT uri, did, collection, rkey, record, cid, indexed_at, created_at FROM records WHERE uri IN (`+ph+`)`, args...) 1500 + if err != nil { 1501 + return records 1502 + } 1503 + defer rows.Close() 1504 + for rows.Next() { 1505 + var rec IndexedRecord 1506 + var recordStr, indexedAtStr, createdAtStr string 1507 + if err := rows.Scan(&rec.URI, &rec.DID, &rec.Collection, &rec.RKey, 1508 + &recordStr, &rec.CID, &indexedAtStr, &createdAtStr); err != nil { 1509 + continue 1510 + } 1511 + rec.Record = json.RawMessage(recordStr) 1512 + rec.IndexedAt, _ = time.Parse(time.RFC3339Nano, indexedAtStr) 1513 + rec.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr) 1514 + records[rec.URI] = &rec 1515 + } 1516 + return records 1517 + } 1518 + 1382 1519 // IndexedComment represents a comment stored in the index 1383 1520 type IndexedComment struct { 1384 1521 RKey string `json:"rkey"` ··· 1469 1606 comments = append(comments, c) 1470 1607 } 1471 1608 1472 - // Populate profile and like info for each comment 1609 + // Batch-fetch profiles and social data for all comments 1610 + commentURIs := make([]string, len(comments)) 1611 + didSet := make(map[string]struct{}, len(comments)) 1612 + for i, c := range comments { 1613 + commentURIs[i] = fmt.Sprintf("at://%s/social.arabica.alpha.comment/%s", c.ActorDID, c.RKey) 1614 + didSet[c.ActorDID] = struct{}{} 1615 + } 1616 + likeCounts := idx.GetLikeCountsBatch(ctx, commentURIs) 1617 + var likedByViewer map[string]bool 1618 + if viewerDID != "" { 1619 + likedByViewer = idx.HasUserLikedBatch(ctx, viewerDID, commentURIs) 1620 + } 1621 + 1473 1622 for i := range comments { 1474 1623 profile, err := idx.GetProfile(ctx, comments[i].ActorDID) 1475 1624 if err != nil { ··· 1480 1629 comments[i].Avatar = profile.Avatar 1481 1630 } 1482 1631 1483 - commentURI := fmt.Sprintf("at://%s/social.arabica.alpha.comment/%s", comments[i].ActorDID, comments[i].RKey) 1484 - comments[i].LikeCount = idx.GetLikeCount(ctx, commentURI) 1485 - if viewerDID != "" { 1486 - comments[i].IsLiked = idx.HasUserLiked(ctx, viewerDID, commentURI) 1632 + comments[i].LikeCount = likeCounts[commentURIs[i]] 1633 + if likedByViewer != nil { 1634 + comments[i].IsLiked = likedByViewer[commentURIs[i]] 1487 1635 } 1488 1636 } 1489 1637
+13 -4
internal/handlers/feed.go
··· 116 116 117 117 // Populate IsLikedByViewer and IsOwner for each feed item if user is authenticated 118 118 if isAuthenticated { 119 + // Batch fetch liked status for all feed items 120 + var likedByViewer map[string]bool 121 + if h.feedIndex != nil { 122 + uris := make([]string, 0, len(feedItems)) 123 + for _, item := range feedItems { 124 + if item.SubjectURI != "" { 125 + uris = append(uris, item.SubjectURI) 126 + } 127 + } 128 + likedByViewer = h.feedIndex.HasUserLikedBatch(r.Context(), viewerDID, uris) 129 + } 119 130 for _, item := range feedItems { 120 - // Check if viewer owns this record 121 131 if item.Author != nil { 122 132 item.IsOwner = item.Author.DID == viewerDID 123 133 } 124 - // Check if viewer liked this record 125 - if h.feedIndex != nil && item.SubjectURI != "" { 126 - item.IsLikedByViewer = h.feedIndex.HasUserLiked(r.Context(), viewerDID, item.SubjectURI) 134 + if likedByViewer != nil { 135 + item.IsLikedByViewer = likedByViewer[item.SubjectURI] 127 136 } 128 137 } 129 138 }
+23 -7
internal/handlers/profile.go
··· 577 577 brewCIDs := make(map[string]string) 578 578 var beanBrewCounts, grinderBrewCounts, brewerBrewCounts, roasterBeanCounts map[string]int 579 579 if h.feedIndex != nil && profile != nil { 580 + // Collect all brew URIs for batch lookup 581 + brewURIs := make([]string, 0, len(profileData.Brews)) 582 + uriToRKey := make(map[string]string, len(profileData.Brews)) 580 583 for _, brew := range profileData.Brews { 581 - subjectURI := atproto.BuildATURI(profile.DID, atproto.NSIDBrew, brew.RKey) 582 - brewLikeCounts[brew.RKey] = h.feedIndex.GetLikeCount(ctx, subjectURI) 583 - if isAuthenticated { 584 - brewLikedByUser[brew.RKey] = h.feedIndex.HasUserLiked(ctx, didStr, subjectURI) 584 + uri := atproto.BuildATURI(profile.DID, atproto.NSIDBrew, brew.RKey) 585 + brewURIs = append(brewURIs, uri) 586 + uriToRKey[uri] = brew.RKey 587 + } 588 + 589 + // Batch fetch like counts, liked status, and records 590 + batchLikes := h.feedIndex.GetLikeCountsBatch(ctx, brewURIs) 591 + batchRecords := h.feedIndex.GetRecordsBatch(ctx, brewURIs) 592 + var batchLiked map[string]bool 593 + if isAuthenticated { 594 + batchLiked = h.feedIndex.HasUserLikedBatch(ctx, didStr, brewURIs) 595 + } 596 + 597 + for uri, rkey := range uriToRKey { 598 + brewLikeCounts[rkey] = batchLikes[uri] 599 + if batchLiked != nil { 600 + brewLikedByUser[rkey] = batchLiked[uri] 585 601 } 586 - // Get CID from the firehose index record 587 - if record, err := h.feedIndex.GetRecord(ctx, subjectURI); err == nil && record != nil { 588 - brewCIDs[brew.RKey] = record.CID 602 + if rec, ok := batchRecords[uri]; ok { 603 + brewCIDs[rkey] = rec.CID 589 604 } 590 605 } 606 + 591 607 // Entity usage counts 592 608 beanBrewCounts = h.feedIndex.BrewCountsByBeanURI(ctx, did) 593 609 grinderBrewCounts = h.feedIndex.BrewCountsByGrinderURI(ctx, did)
+10 -1
internal/handlers/recipe.go
··· 566 566 // Batch query brew counts per recipe 567 567 brewCounts := h.feedIndex.BrewCountsByRecipeURI(ctx) 568 568 569 + // Batch-fetch brewer records referenced by recipes 570 + brewerURIs := make([]string, 0) 571 + for _, pr := range parsed { 572 + if brewerRef, ok := pr.data["brewerRef"].(string); ok && brewerRef != "" { 573 + brewerURIs = append(brewerURIs, brewerRef) 574 + } 575 + } 576 + brewerRecords := h.feedIndex.GetRecordsBatch(ctx, brewerURIs) 577 + 569 578 // Build final recipe list 570 579 recipes := make([]*models.Recipe, 0, len(parsed)) 571 580 for _, pr := range parsed { ··· 576 585 if c, parseErr := atproto.ResolveATURI(brewerRef); parseErr == nil { 577 586 recipe.BrewerRKey = c.RKey 578 587 } 579 - if brewerRec, getErr := h.feedIndex.GetRecord(ctx, brewerRef); getErr == nil && brewerRec != nil { 588 + if brewerRec, ok := brewerRecords[brewerRef]; ok { 580 589 var brewerData map[string]interface{} 581 590 if err := json.Unmarshal(brewerRec.Record, &brewerData); err == nil { 582 591 if brewer, err := atproto.RecordToBrewer(brewerData, brewerRef); err == nil {