Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: propagate context.Context through FeedIndex SQL methods for otelsql tracing

Add context.Context parameter to all FeedIndex methods that perform SQL
queries but previously lacked it, and switch from Query/Exec/QueryRow to
their Context variants (QueryContext/ExecContext/QueryRowContext).

This leverages the existing otelsql instrumentation to automatically create
spans for ALL SQL operations when there's a parent span from the HTTP
middleware — no manual tracing.SqliteSpan() needed per method. The otelsql
SpanFilter ensures background operations (firehose consumer) don't create
orphan spans since they lack a parent span.

Methods updated: GetCursor, SetCursor, DeleteRecord, GetRecord, GetKnownDIDs,
ListRecordsByCollection, BrewCountsByRecipeURI, refCounts, BrewCountsByBeanURI,
BrewCountsByGrinderURI, BrewCountsByBrewerURI, BeanCountsByRoasterURI,
IsBackfilled, MarkBackfilled, UpsertLike, DeleteLike, GetLikeCount,
HasUserLiked, GetUserLikeRKey, UpsertComment, DeleteComment, GetCommentCount.

Also switched existing context-aware methods (UpsertRecord, GetProfile) to
use ExecContext/QueryRowContext instead of Exec/QueryRow.

authored by

Patrick Dewey and committed by tangled.org 26f29d77 e11ed970

+166 -153
+1 -1
cmd/server/main.go
··· 248 248 }, 60*time.Second) 249 249 250 250 // Log known DIDs from database (DIDs discovered via firehose) 251 - if knownDIDsFromDB, err := feedIndex.GetKnownDIDs(); err == nil { 251 + if knownDIDsFromDB, err := feedIndex.GetKnownDIDs(context.Background()); err == nil { 252 252 if len(knownDIDsFromDB) > 0 { 253 253 log.Info(). 254 254 Int("count", len(knownDIDsFromDB)).
+9 -6
internal/firehose/consumer.go
··· 74 74 } 75 75 76 76 // Load cursor from index 77 - if cursor, err := index.GetCursor(); err == nil && cursor > 0 { 77 + if cursor, err := index.GetCursor(context.Background()); err == nil && cursor > 0 { 78 78 c.cursor.Store(cursor) 79 79 log.Info().Int64("cursor", cursor).Msg("firehose: loaded cursor from index") 80 80 } ··· 297 297 298 298 // Persist cursor periodically (every 1000 events) 299 299 if c.eventsReceived.Load()%1000 == 0 { 300 - if err := c.index.SetCursor(event.TimeUS); err != nil { 300 + if err := c.index.SetCursor(context.Background(), event.TimeUS); err != nil { 301 301 log.Warn().Err(err).Msg("firehose: failed to persist cursor") 302 302 } 303 303 } ··· 347 347 if err := json.Unmarshal(commit.Record, &recordData); err == nil { 348 348 if subject, ok := recordData["subject"].(map[string]interface{}); ok { 349 349 if subjectURI, ok := subject["uri"].(string); ok { 350 - if err := c.index.UpsertLike(event.DID, commit.RKey, subjectURI); err != nil { 350 + if err := c.index.UpsertLike(context.Background(), event.DID, commit.RKey, subjectURI); err != nil { 351 351 log.Warn().Err(err).Str("did", event.DID).Str("subject", subjectURI).Msg("failed to index like") 352 352 } 353 353 // Create notification for the like ··· 379 379 if parent, ok := recordData["parent"].(map[string]interface{}); ok { 380 380 parentURI, _ = parent["uri"].(string) 381 381 } 382 - if err := c.index.UpsertComment(event.DID, commit.RKey, subjectURI, parentURI, commit.CID, text, createdAt); err != nil { 382 + if err := c.index.UpsertComment(context.Background(), event.DID, commit.RKey, subjectURI, parentURI, commit.CID, text, createdAt); err != nil { 383 383 log.Warn().Err(err).Str("did", event.DID).Str("subject", subjectURI).Msg("failed to index comment") 384 384 } 385 385 // Create notification for the comment ··· 394 394 if commit.Collection == "social.arabica.alpha.like" { 395 395 // Try to get the existing record to find its subject 396 396 if existingRecord, err := c.index.GetRecord( 397 + context.Background(), 397 398 fmt.Sprintf("at://%s/%s/%s", event.DID, commit.Collection, commit.RKey), 398 399 ); err == nil && existingRecord != nil { 399 400 var recordData map[string]interface{} 400 401 if err := json.Unmarshal(existingRecord.Record, &recordData); err == nil { 401 402 if subject, ok := recordData["subject"].(map[string]interface{}); ok { 402 403 if subjectURI, ok := subject["uri"].(string); ok { 403 - if err := c.index.DeleteLike(event.DID, subjectURI); err != nil { 404 + if err := c.index.DeleteLike(context.Background(), event.DID, subjectURI); err != nil { 404 405 log.Warn().Err(err).Str("did", event.DID).Str("subject", subjectURI).Msg("failed to delete like index") 405 406 } 406 407 c.index.DeleteLikeNotification(event.DID, subjectURI) ··· 414 415 if commit.Collection == "social.arabica.alpha.comment" { 415 416 // Try to get the existing record to find its subject 416 417 if existingRecord, err := c.index.GetRecord( 418 + context.Background(), 417 419 fmt.Sprintf("at://%s/%s/%s", event.DID, commit.Collection, commit.RKey), 418 420 ); err == nil && existingRecord != nil { 419 421 var recordData map[string]interface{} ··· 424 426 if parent, ok := recordData["parent"].(map[string]interface{}); ok { 425 427 parentURI, _ = parent["uri"].(string) 426 428 } 427 - if err := c.index.DeleteComment(event.DID, commit.RKey, subjectURI); err != nil { 429 + if err := c.index.DeleteComment(context.Background(), event.DID, commit.RKey, subjectURI); err != nil { 428 430 log.Warn().Err(err).Str("did", event.DID).Str("subject", subjectURI).Msg("failed to delete comment index") 429 431 } 430 432 c.index.DeleteCommentNotification(event.DID, subjectURI, parentURI) ··· 435 437 } 436 438 437 439 if err := c.index.DeleteRecord( 440 + context.Background(), 438 441 event.DID, 439 442 commit.Collection, 440 443 commit.RKey,
+59 -59
internal/firehose/index.go
··· 365 365 } 366 366 367 367 // GetCursor returns the last processed cursor (microseconds timestamp) 368 - func (idx *FeedIndex) GetCursor() (int64, error) { 368 + func (idx *FeedIndex) GetCursor(ctx context.Context) (int64, error) { 369 369 var cursor int64 370 - err := idx.db.QueryRow(`SELECT value FROM meta WHERE key = 'cursor'`).Scan(&cursor) 370 + err := idx.db.QueryRowContext(ctx, `SELECT value FROM meta WHERE key = 'cursor'`).Scan(&cursor) 371 371 if err == sql.ErrNoRows { 372 372 return 0, nil 373 373 } ··· 375 375 } 376 376 377 377 // SetCursor stores the cursor position 378 - func (idx *FeedIndex) SetCursor(cursor int64) error { 379 - _, err := idx.db.Exec(`INSERT OR REPLACE INTO meta (key, value) VALUES ('cursor', ?)`, cursor) 378 + func (idx *FeedIndex) SetCursor(ctx context.Context, cursor int64) error { 379 + _, err := idx.db.ExecContext(ctx, `INSERT OR REPLACE INTO meta (key, value) VALUES ('cursor', ?)`, cursor) 380 380 return err 381 381 } 382 382 ··· 415 415 416 416 now := time.Now() 417 417 418 - _, err := idx.db.Exec(stmt, uri, did, collection, rkey, string(record), cid, 418 + _, err := idx.db.ExecContext(ctx, stmt, uri, did, collection, rkey, string(record), cid, 419 419 now.Format(time.RFC3339Nano), createdAt.Format(time.RFC3339Nano)) 420 420 if err != nil { 421 421 tracing.EndWithError(span, err) ··· 423 423 } 424 424 425 425 // Track known DID 426 - _, err = idx.db.Exec(`INSERT OR IGNORE INTO known_dids (did) VALUES (?)`, did) 426 + _, err = idx.db.ExecContext(ctx, `INSERT OR IGNORE INTO known_dids (did) VALUES (?)`, did) 427 427 if err != nil { 428 428 tracing.EndWithError(span, err) 429 429 return fmt.Errorf("failed to track known DID: %w", err) ··· 433 433 } 434 434 435 435 // DeleteRecord removes a record from the index 436 - func (idx *FeedIndex) DeleteRecord(did, collection, rkey string) error { 436 + func (idx *FeedIndex) DeleteRecord(ctx context.Context, did, collection, rkey string) error { 437 437 uri := atproto.BuildATURI(did, collection, rkey) 438 - _, err := idx.db.Exec(`DELETE FROM records WHERE uri = ?`, uri) 438 + _, err := idx.db.ExecContext(ctx, `DELETE FROM records WHERE uri = ?`, uri) 439 439 return err 440 440 } 441 441 ··· 522 522 } 523 523 524 524 // DeleteWitnessRecord implements atproto.WitnessCache for write-through caching. 525 - func (idx *FeedIndex) DeleteWitnessRecord(_ context.Context, did, collection, rkey string) error { 526 - return idx.DeleteRecord(did, collection, rkey) 525 + func (idx *FeedIndex) DeleteWitnessRecord(ctx context.Context, did, collection, rkey string) error { 526 + return idx.DeleteRecord(ctx, did, collection, rkey) 527 527 } 528 528 529 529 // GetRecord retrieves a single record by URI 530 - func (idx *FeedIndex) GetRecord(uri string) (*IndexedRecord, error) { 530 + func (idx *FeedIndex) GetRecord(ctx context.Context, uri string) (*IndexedRecord, error) { 531 531 var rec IndexedRecord 532 532 var recordStr, indexedAtStr, createdAtStr string 533 533 534 - err := idx.db.QueryRow(` 534 + err := idx.db.QueryRowContext(ctx, ` 535 535 SELECT uri, did, collection, rkey, record, cid, indexed_at, created_at 536 536 FROM records WHERE uri = ? 537 537 `, uri).Scan(&rec.URI, &rec.DID, &rec.Collection, &rec.RKey, ··· 967 967 // Populate like-related fields for all record types 968 968 item.SubjectURI = record.URI 969 969 item.SubjectCID = record.CID 970 - item.LikeCount = idx.GetLikeCount(record.URI) 971 - item.CommentCount = idx.GetCommentCount(record.URI) 970 + item.LikeCount = idx.GetLikeCount(ctx, record.URI) 971 + item.CommentCount = idx.GetCommentCount(ctx, record.URI) 972 972 973 973 return item, nil 974 974 } ··· 985 985 986 986 // Check persistent cache 987 987 var dataStr, expiresAtStr string 988 - err := idx.db.QueryRow(`SELECT data, expires_at FROM profiles WHERE did = ?`, did).Scan(&dataStr, &expiresAtStr) 988 + err := idx.db.QueryRowContext(ctx, `SELECT data, expires_at FROM profiles WHERE did = ?`, did).Scan(&dataStr, &expiresAtStr) 989 989 if err == nil { 990 990 expiresAt, _ := time.Parse(time.RFC3339Nano, expiresAtStr) 991 991 if time.Now().Before(expiresAt) { ··· 1020 1020 1021 1021 // Persist to database 1022 1022 data, _ := json.Marshal(cached) 1023 - _, _ = idx.db.Exec(`INSERT OR REPLACE INTO profiles (did, data, expires_at) VALUES (?, ?, ?)`, 1023 + _, _ = idx.db.ExecContext(ctx, `INSERT OR REPLACE INTO profiles (did, data, expires_at) VALUES (?, ?, ?)`, 1024 1024 did, string(data), cached.ExpiresAt.Format(time.RFC3339Nano)) 1025 1025 1026 1026 return profile, nil 1027 1027 } 1028 1028 1029 1029 // GetKnownDIDs returns all DIDs that have created Arabica records 1030 - func (idx *FeedIndex) GetKnownDIDs() ([]string, error) { 1031 - rows, err := idx.db.Query(`SELECT did FROM known_dids`) 1030 + func (idx *FeedIndex) GetKnownDIDs(ctx context.Context) ([]string, error) { 1031 + rows, err := idx.db.QueryContext(ctx, `SELECT did FROM known_dids`) 1032 1032 if err != nil { 1033 1033 return nil, err 1034 1034 } ··· 1046 1046 } 1047 1047 1048 1048 // ListRecordsByCollection returns all indexed records for a given collection. 1049 - func (idx *FeedIndex) ListRecordsByCollection(collection string) ([]IndexedRecord, error) { 1050 - rows, err := idx.db.Query(` 1049 + func (idx *FeedIndex) ListRecordsByCollection(ctx context.Context, collection string) ([]IndexedRecord, error) { 1050 + rows, err := idx.db.QueryContext(ctx, ` 1051 1051 SELECT uri, did, collection, rkey, record, cid, indexed_at, created_at 1052 1052 FROM records WHERE collection = ? ORDER BY created_at DESC 1053 1053 `, collection) ··· 1120 1120 1121 1121 // BrewCountsByRecipeURI returns a map of recipe AT-URI -> number of brews referencing that recipe. 1122 1122 // Uses SQLite json_extract to efficiently query the recipeRef field in brew records. 1123 - func (idx *FeedIndex) BrewCountsByRecipeURI() map[string]int { 1123 + func (idx *FeedIndex) BrewCountsByRecipeURI(ctx context.Context) map[string]int { 1124 1124 counts := make(map[string]int) 1125 - rows, err := idx.db.Query(` 1125 + rows, err := idx.db.QueryContext(ctx, ` 1126 1126 SELECT json_extract(record, '$.recipeRef') as recipe_uri, COUNT(*) as cnt 1127 1127 FROM records 1128 1128 WHERE collection = 'social.arabica.alpha.brew' ··· 1146 1146 // refCounts returns a map of ref AT-URI -> count of records in the given collection 1147 1147 // that reference it via the specified JSON field. If did is non-empty, only records 1148 1148 // owned by that DID are counted. 1149 - func (idx *FeedIndex) refCounts(collection, jsonField, did string) map[string]int { 1149 + func (idx *FeedIndex) refCounts(ctx context.Context, collection, jsonField, did string) map[string]int { 1150 1150 counts := make(map[string]int) 1151 1151 var rows *sql.Rows 1152 1152 var err error 1153 1153 if did != "" { 1154 - rows, err = idx.db.Query(fmt.Sprintf(` 1154 + rows, err = idx.db.QueryContext(ctx, fmt.Sprintf(` 1155 1155 SELECT json_extract(record, '$.%s') as ref_uri, COUNT(*) as cnt 1156 1156 FROM records 1157 1157 WHERE collection = ? AND did = ? ··· 1159 1159 GROUP BY ref_uri 1160 1160 `, jsonField), collection, did) 1161 1161 } else { 1162 - rows, err = idx.db.Query(fmt.Sprintf(` 1162 + rows, err = idx.db.QueryContext(ctx, fmt.Sprintf(` 1163 1163 SELECT json_extract(record, '$.%s') as ref_uri, COUNT(*) as cnt 1164 1164 FROM records 1165 1165 WHERE collection = ? ··· 1183 1183 1184 1184 // BrewCountsByBeanURI returns a map of bean AT-URI -> number of brews referencing that bean. 1185 1185 // If did is non-empty, only brews owned by that DID are counted. 1186 - func (idx *FeedIndex) BrewCountsByBeanURI(did string) map[string]int { 1187 - return idx.refCounts("social.arabica.alpha.brew", "beanRef", did) 1186 + func (idx *FeedIndex) BrewCountsByBeanURI(ctx context.Context, did string) map[string]int { 1187 + return idx.refCounts(ctx, "social.arabica.alpha.brew", "beanRef", did) 1188 1188 } 1189 1189 1190 1190 // BrewCountsByGrinderURI returns a map of grinder AT-URI -> number of brews referencing that grinder. 1191 1191 // If did is non-empty, only brews owned by that DID are counted. 1192 - func (idx *FeedIndex) BrewCountsByGrinderURI(did string) map[string]int { 1193 - return idx.refCounts("social.arabica.alpha.brew", "grinderRef", did) 1192 + func (idx *FeedIndex) BrewCountsByGrinderURI(ctx context.Context, did string) map[string]int { 1193 + return idx.refCounts(ctx, "social.arabica.alpha.brew", "grinderRef", did) 1194 1194 } 1195 1195 1196 1196 // BrewCountsByBrewerURI returns a map of brewer AT-URI -> number of brews referencing that brewer. 1197 1197 // If did is non-empty, only brews owned by that DID are counted. 1198 - func (idx *FeedIndex) BrewCountsByBrewerURI(did string) map[string]int { 1199 - return idx.refCounts("social.arabica.alpha.brew", "brewerRef", did) 1198 + func (idx *FeedIndex) BrewCountsByBrewerURI(ctx context.Context, did string) map[string]int { 1199 + return idx.refCounts(ctx, "social.arabica.alpha.brew", "brewerRef", did) 1200 1200 } 1201 1201 1202 1202 // BeanCountsByRoasterURI returns a map of roaster AT-URI -> number of beans referencing that roaster. 1203 1203 // If did is non-empty, only beans owned by that DID are counted. 1204 - func (idx *FeedIndex) BeanCountsByRoasterURI(did string) map[string]int { 1205 - return idx.refCounts("social.arabica.alpha.bean", "roasterRef", did) 1204 + func (idx *FeedIndex) BeanCountsByRoasterURI(ctx context.Context, did string) map[string]int { 1205 + return idx.refCounts(ctx, "social.arabica.alpha.bean", "roasterRef", did) 1206 1206 } 1207 1207 1208 1208 func formatTimeAgo(t time.Time) string { ··· 1245 1245 } 1246 1246 1247 1247 // IsBackfilled checks if a DID has already been backfilled 1248 - func (idx *FeedIndex) IsBackfilled(did string) bool { 1248 + func (idx *FeedIndex) IsBackfilled(ctx context.Context, did string) bool { 1249 1249 var exists int 1250 - err := idx.db.QueryRow(`SELECT 1 FROM backfilled WHERE did = ?`, did).Scan(&exists) 1250 + err := idx.db.QueryRowContext(ctx, `SELECT 1 FROM backfilled WHERE did = ?`, did).Scan(&exists) 1251 1251 return err == nil 1252 1252 } 1253 1253 1254 1254 // MarkBackfilled marks a DID as backfilled with current timestamp 1255 - func (idx *FeedIndex) MarkBackfilled(did string) error { 1256 - _, err := idx.db.Exec(`INSERT OR IGNORE INTO backfilled (did, backfilled_at) VALUES (?, ?)`, 1255 + func (idx *FeedIndex) MarkBackfilled(ctx context.Context, did string) error { 1256 + _, err := idx.db.ExecContext(ctx, `INSERT OR IGNORE INTO backfilled (did, backfilled_at) VALUES (?, ?)`, 1257 1257 did, time.Now().Format(time.RFC3339)) 1258 1258 return err 1259 1259 } 1260 1260 1261 1261 // BackfillUser fetches all existing records for a DID and adds them to the index 1262 1262 func (idx *FeedIndex) BackfillUser(ctx context.Context, did string) error { 1263 - if idx.IsBackfilled(did) { 1263 + if idx.IsBackfilled(ctx, did) { 1264 1264 log.Debug().Str("did", did).Msg("DID already backfilled, skipping") 1265 1265 return nil 1266 1266 } ··· 1297 1297 case atproto.NSIDLike: 1298 1298 if subject, ok := record.Value["subject"].(map[string]interface{}); ok { 1299 1299 if subjectURI, ok := subject["uri"].(string); ok { 1300 - if err := idx.UpsertLike(did, rkey, subjectURI); err != nil { 1300 + if err := idx.UpsertLike(ctx, did, rkey, subjectURI); err != nil { 1301 1301 log.Warn().Err(err).Str("uri", record.URI).Msg("failed to index like during backfill") 1302 1302 } 1303 1303 } ··· 1320 1320 if parent, ok := record.Value["parent"].(map[string]interface{}); ok { 1321 1321 parentURI, _ = parent["uri"].(string) 1322 1322 } 1323 - if err := idx.UpsertComment(did, rkey, subjectURI, parentURI, record.CID, text, createdAt); err != nil { 1323 + if err := idx.UpsertComment(ctx, did, rkey, subjectURI, parentURI, record.CID, text, createdAt); err != nil { 1324 1324 log.Warn().Err(err).Str("uri", record.URI).Msg("failed to index comment during backfill") 1325 1325 } 1326 1326 } ··· 1329 1329 } 1330 1330 } 1331 1331 1332 - if err := idx.MarkBackfilled(did); err != nil { 1332 + if err := idx.MarkBackfilled(ctx, did); err != nil { 1333 1333 log.Warn().Err(err).Str("did", did).Msg("failed to mark DID as backfilled") 1334 1334 } 1335 1335 ··· 1340 1340 // ========== Like Indexing Methods ========== 1341 1341 1342 1342 // UpsertLike adds or updates a like in the index 1343 - func (idx *FeedIndex) UpsertLike(actorDID, rkey, subjectURI string) error { 1344 - _, err := idx.db.Exec(`INSERT OR IGNORE INTO likes (subject_uri, actor_did, rkey) VALUES (?, ?, ?)`, 1343 + func (idx *FeedIndex) UpsertLike(ctx context.Context, actorDID, rkey, subjectURI string) error { 1344 + _, err := idx.db.ExecContext(ctx, `INSERT OR IGNORE INTO likes (subject_uri, actor_did, rkey) VALUES (?, ?, ?)`, 1345 1345 subjectURI, actorDID, rkey) 1346 1346 return err 1347 1347 } 1348 1348 1349 1349 // DeleteLike removes a like from the index 1350 - func (idx *FeedIndex) DeleteLike(actorDID, subjectURI string) error { 1351 - _, err := idx.db.Exec(`DELETE FROM likes WHERE subject_uri = ? AND actor_did = ?`, 1350 + func (idx *FeedIndex) DeleteLike(ctx context.Context, actorDID, subjectURI string) error { 1351 + _, err := idx.db.ExecContext(ctx, `DELETE FROM likes WHERE subject_uri = ? AND actor_did = ?`, 1352 1352 subjectURI, actorDID) 1353 1353 return err 1354 1354 } 1355 1355 1356 1356 // GetLikeCount returns the number of likes for a record 1357 - func (idx *FeedIndex) GetLikeCount(subjectURI string) int { 1357 + func (idx *FeedIndex) GetLikeCount(ctx context.Context, subjectURI string) int { 1358 1358 var count int 1359 - _ = idx.db.QueryRow(`SELECT COUNT(*) FROM likes WHERE subject_uri = ?`, subjectURI).Scan(&count) 1359 + _ = idx.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM likes WHERE subject_uri = ?`, subjectURI).Scan(&count) 1360 1360 return count 1361 1361 } 1362 1362 1363 1363 // HasUserLiked checks if a user has liked a specific record 1364 - func (idx *FeedIndex) HasUserLiked(actorDID, subjectURI string) bool { 1364 + func (idx *FeedIndex) HasUserLiked(ctx context.Context, actorDID, subjectURI string) bool { 1365 1365 var exists int 1366 - err := idx.db.QueryRow(`SELECT 1 FROM likes WHERE actor_did = ? AND subject_uri = ? LIMIT 1`, 1366 + err := idx.db.QueryRowContext(ctx, `SELECT 1 FROM likes WHERE actor_did = ? AND subject_uri = ? LIMIT 1`, 1367 1367 actorDID, subjectURI).Scan(&exists) 1368 1368 return err == nil 1369 1369 } 1370 1370 1371 1371 // GetUserLikeRKey returns the rkey of a user's like for a specific record, or empty string if not found 1372 - func (idx *FeedIndex) GetUserLikeRKey(actorDID, subjectURI string) string { 1372 + func (idx *FeedIndex) GetUserLikeRKey(ctx context.Context, actorDID, subjectURI string) string { 1373 1373 var rkey string 1374 - err := idx.db.QueryRow(`SELECT rkey FROM likes WHERE actor_did = ? AND subject_uri = ?`, 1374 + err := idx.db.QueryRowContext(ctx, `SELECT rkey FROM likes WHERE actor_did = ? AND subject_uri = ?`, 1375 1375 actorDID, subjectURI).Scan(&rkey) 1376 1376 if err != nil { 1377 1377 return "" ··· 1403 1403 } 1404 1404 1405 1405 // UpsertComment adds or updates a comment in the index 1406 - func (idx *FeedIndex) UpsertComment(actorDID, rkey, subjectURI, parentURI, cid, text string, createdAt time.Time) error { 1406 + func (idx *FeedIndex) UpsertComment(ctx context.Context, actorDID, rkey, subjectURI, parentURI, cid, text string, createdAt time.Time) error { 1407 1407 // Extract parent rkey from parent URI if present 1408 1408 var parentRKey string 1409 1409 if parentURI != "" { ··· 1413 1413 } 1414 1414 } 1415 1415 1416 - _, err := idx.db.Exec(` 1416 + _, err := idx.db.ExecContext(ctx, ` 1417 1417 INSERT INTO comments (actor_did, rkey, subject_uri, parent_uri, parent_rkey, cid, text, created_at) 1418 1418 VALUES (?, ?, ?, ?, ?, ?, ?, ?) 1419 1419 ON CONFLICT(actor_did, rkey) DO UPDATE SET ··· 1428 1428 } 1429 1429 1430 1430 // DeleteComment removes a comment from the index 1431 - func (idx *FeedIndex) DeleteComment(actorDID, rkey, subjectURI string) error { 1432 - _, err := idx.db.Exec(`DELETE FROM comments WHERE actor_did = ? AND rkey = ?`, actorDID, rkey) 1431 + func (idx *FeedIndex) DeleteComment(ctx context.Context, actorDID, rkey, subjectURI string) error { 1432 + _, err := idx.db.ExecContext(ctx, `DELETE FROM comments WHERE actor_did = ? AND rkey = ?`, actorDID, rkey) 1433 1433 return err 1434 1434 } 1435 1435 1436 1436 // GetCommentCount returns the number of comments on a record 1437 - func (idx *FeedIndex) GetCommentCount(subjectURI string) int { 1437 + func (idx *FeedIndex) GetCommentCount(ctx context.Context, subjectURI string) int { 1438 1438 var count int 1439 - _ = idx.db.QueryRow(`SELECT COUNT(*) FROM comments WHERE subject_uri = ?`, subjectURI).Scan(&count) 1439 + _ = idx.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM comments WHERE subject_uri = ?`, subjectURI).Scan(&count) 1440 1440 return count 1441 1441 } 1442 1442 ··· 1481 1481 } 1482 1482 1483 1483 commentURI := fmt.Sprintf("at://%s/social.arabica.alpha.comment/%s", comments[i].ActorDID, comments[i].RKey) 1484 - comments[i].LikeCount = idx.GetLikeCount(commentURI) 1484 + comments[i].LikeCount = idx.GetLikeCount(ctx, commentURI) 1485 1485 if viewerDID != "" { 1486 - comments[i].IsLiked = idx.HasUserLiked(viewerDID, commentURI) 1486 + comments[i].IsLiked = idx.HasUserLiked(ctx, viewerDID, commentURI) 1487 1487 } 1488 1488 } 1489 1489
+37 -28
internal/firehose/index_test.go
··· 17 17 } 18 18 defer idx.Close() 19 19 20 + ctx := context.Background() 20 21 testDID := "did:plc:test123abc" 21 22 22 23 // Initially should not be backfilled 23 - if idx.IsBackfilled(testDID) { 24 + if idx.IsBackfilled(ctx, testDID) { 24 25 t.Error("DID should not be backfilled initially") 25 26 } 26 27 27 28 // Mark as backfilled 28 - if err := idx.MarkBackfilled(testDID); err != nil { 29 + if err := idx.MarkBackfilled(ctx, testDID); err != nil { 29 30 t.Fatalf("Failed to mark DID as backfilled: %v", err) 30 31 } 31 32 32 33 // Now should be backfilled 33 - if !idx.IsBackfilled(testDID) { 34 + if !idx.IsBackfilled(ctx, testDID) { 34 35 t.Error("DID should be marked as backfilled") 35 36 } 36 37 37 38 // Different DID should not be backfilled 38 39 otherDID := "did:plc:other456def" 39 - if idx.IsBackfilled(otherDID) { 40 + if idx.IsBackfilled(ctx, otherDID) { 40 41 t.Error("Other DID should not be backfilled") 41 42 } 42 43 } ··· 53 54 t.Fatalf("Failed to create index: %v", err) 54 55 } 55 56 56 - if err := idx.MarkBackfilled(testDID); err != nil { 57 + if err := idx.MarkBackfilled(context.Background(), testDID); err != nil { 57 58 t.Fatalf("Failed to mark DID as backfilled: %v", err) 58 59 } 59 60 ··· 68 69 } 69 70 defer idx.Close() 70 71 71 - if !idx.IsBackfilled(testDID) { 72 + if !idx.IsBackfilled(context.Background(), testDID) { 72 73 t.Error("DID should still be marked as backfilled after reopening") 73 74 } 74 75 } ··· 89 90 "did:plc:user3", 90 91 } 91 92 93 + ctx := context.Background() 94 + 92 95 // Mark all as backfilled 93 96 for _, did := range dids { 94 - if err := idx.MarkBackfilled(did); err != nil { 97 + if err := idx.MarkBackfilled(ctx, did); err != nil { 95 98 t.Fatalf("Failed to mark DID %s as backfilled: %v", did, err) 96 99 } 97 100 } 98 101 99 102 // Verify all are marked 100 103 for _, did := range dids { 101 - if !idx.IsBackfilled(did) { 104 + if !idx.IsBackfilled(ctx, did) { 102 105 t.Errorf("DID %s should be marked as backfilled", did) 103 106 } 104 107 } ··· 116 119 117 120 // Create a top-level comment 118 121 now := time.Now() 119 - err = idx.UpsertComment(actorDID, "comment1", subjectURI, "", "cid1", "Top level comment", now) 122 + err = idx.UpsertComment(ctx, actorDID, "comment1", subjectURI, "", "cid1", "Top level comment", now) 120 123 assert.NoError(t, err) 121 124 122 125 // Create a reply to the top-level comment 123 126 parentURI := "at://did:plc:commenter1/social.arabica.alpha.comment/comment1" 124 - err = idx.UpsertComment("did:plc:commenter2", "comment2", subjectURI, parentURI, "cid2", "Reply to comment", now.Add(time.Second)) 127 + err = idx.UpsertComment(ctx, "did:plc:commenter2", "comment2", subjectURI, parentURI, "cid2", "Reply to comment", now.Add(time.Second)) 125 128 assert.NoError(t, err) 126 129 127 130 // Create a nested reply (depth 2) 128 131 parentURI2 := "at://did:plc:commenter2/social.arabica.alpha.comment/comment2" 129 - err = idx.UpsertComment("did:plc:commenter3", "comment3", subjectURI, parentURI2, "cid3", "Nested reply", now.Add(2*time.Second)) 132 + err = idx.UpsertComment(ctx, "did:plc:commenter3", "comment3", subjectURI, parentURI2, "cid3", "Nested reply", now.Add(2*time.Second)) 130 133 assert.NoError(t, err) 131 134 132 135 // Get threaded comments ··· 145 148 assert.Equal(t, 2, comments[2].Depth) 146 149 147 150 // Verify comment count 148 - count := idx.GetCommentCount(subjectURI) 151 + count := idx.GetCommentCount(ctx, subjectURI) 149 152 assert.Equal(t, 3, count) 150 153 } 151 154 ··· 163 166 parentURI := "" 164 167 for i := 0; i < 5; i++ { 165 168 rkey := "comment" + string(rune('A'+i)) 166 - err = idx.UpsertComment("did:plc:user", rkey, subjectURI, parentURI, "cid"+rkey, "Comment", now.Add(time.Duration(i)*time.Second)) 169 + err = idx.UpsertComment(ctx, "did:plc:user", rkey, subjectURI, parentURI, "cid"+rkey, "Comment", now.Add(time.Duration(i)*time.Second)) 167 170 assert.NoError(t, err) 168 171 parentURI = "at://did:plc:user/social.arabica.alpha.comment/" + rkey 169 172 } ··· 192 195 now := time.Now() 193 196 194 197 // Create two top-level comments 195 - err = idx.UpsertComment("did:plc:user1", "topA", subjectURI, "", "cidA", "First top comment", now) 198 + err = idx.UpsertComment(ctx, "did:plc:user1", "topA", subjectURI, "", "cidA", "First top comment", now) 196 199 assert.NoError(t, err) 197 - err = idx.UpsertComment("did:plc:user2", "topB", subjectURI, "", "cidB", "Second top comment", now.Add(5*time.Second)) 200 + err = idx.UpsertComment(ctx, "did:plc:user2", "topB", subjectURI, "", "cidB", "Second top comment", now.Add(5*time.Second)) 198 201 assert.NoError(t, err) 199 202 200 203 // Reply to first top-level comment 201 - err = idx.UpsertComment("did:plc:user3", "replyA1", subjectURI, "at://did:plc:user1/social.arabica.alpha.comment/topA", "cidA1", "Reply to first", now.Add(2*time.Second)) 204 + err = idx.UpsertComment(ctx, "did:plc:user3", "replyA1", subjectURI, "at://did:plc:user1/social.arabica.alpha.comment/topA", "cidA1", "Reply to first", now.Add(2*time.Second)) 202 205 assert.NoError(t, err) 203 206 204 207 // Reply to second top-level comment 205 - err = idx.UpsertComment("did:plc:user4", "replyB1", subjectURI, "at://did:plc:user2/social.arabica.alpha.comment/topB", "cidB1", "Reply to second", now.Add(6*time.Second)) 208 + err = idx.UpsertComment(ctx, "did:plc:user4", "replyB1", subjectURI, "at://did:plc:user2/social.arabica.alpha.comment/topB", "cidB1", "Reply to second", now.Add(6*time.Second)) 206 209 assert.NoError(t, err) 207 210 208 211 // Get threaded comments ··· 238 241 err = idx.UpsertRecord(context.Background(), did, collection, rkey, "cid123", record, time.Now().Unix()) 239 242 assert.NoError(t, err) 240 243 244 + ctx := context.Background() 245 + 241 246 // Verify it exists 242 247 uri := "at://" + did + "/" + collection + "/" + rkey 243 - rec, err := idx.GetRecord(uri) 248 + rec, err := idx.GetRecord(ctx, uri) 244 249 assert.NoError(t, err) 245 250 assert.NotNil(t, rec, "record should exist after upsert") 246 251 247 252 // Verify it appears in collection listing 248 - records, err := idx.ListRecordsByCollection(collection) 253 + records, err := idx.ListRecordsByCollection(ctx, collection) 249 254 assert.NoError(t, err) 250 255 assert.Len(t, records, 1) 251 256 252 257 // Delete the record 253 - err = idx.DeleteRecord(did, collection, rkey) 258 + err = idx.DeleteRecord(ctx, did, collection, rkey) 254 259 assert.NoError(t, err) 255 260 256 261 // Verify it no longer exists via GetRecord 257 - rec, err = idx.GetRecord(uri) 262 + rec, err = idx.GetRecord(ctx, uri) 258 263 assert.NoError(t, err) 259 264 assert.Nil(t, rec, "record should not exist after delete") 260 265 261 266 // Verify it no longer appears in collection listing 262 - records, err = idx.ListRecordsByCollection(collection) 267 + records, err = idx.ListRecordsByCollection(ctx, collection) 263 268 assert.NoError(t, err) 264 269 assert.Len(t, records, 0, "deleted record should not appear in collection listing") 265 270 ··· 286 291 287 292 assert.Equal(t, 2, idx.RecordCount()) 288 293 294 + ctx := context.Background() 295 + 289 296 // Delete only the first record 290 - err = idx.DeleteRecord(did, collection, "bean1") 297 + err = idx.DeleteRecord(ctx, did, collection, "bean1") 291 298 assert.NoError(t, err) 292 299 293 300 // Second record should still exist 294 301 uri2 := "at://" + did + "/" + collection + "/bean2" 295 - rec, err := idx.GetRecord(uri2) 302 + rec, err := idx.GetRecord(ctx, uri2) 296 303 assert.NoError(t, err) 297 304 assert.NotNil(t, rec, "second record should still exist after deleting first") 298 305 299 306 // Only one record should remain 300 307 assert.Equal(t, 1, idx.RecordCount()) 301 308 302 - records, err := idx.ListRecordsByCollection(collection) 309 + records, err := idx.ListRecordsByCollection(ctx, collection) 303 310 assert.NoError(t, err) 304 311 assert.Len(t, records, 1) 305 312 assert.Equal(t, "bean2", records[0].RKey) ··· 312 319 defer idx.Close() 313 320 314 321 // Deleting a record that doesn't exist should not error 315 - err = idx.DeleteRecord("did:plc:nobody", "social.arabica.alpha.bean", "nonexistent") 322 + err = idx.DeleteRecord(context.Background(), "did:plc:nobody", "social.arabica.alpha.bean", "nonexistent") 316 323 assert.NoError(t, err) 317 324 } 318 325 ··· 346 353 347 354 assert.Equal(t, 6, idx.RecordCount()) 348 355 356 + ctx := context.Background() 357 + 349 358 // Delete each record and verify it's gone 350 359 for _, tt := range types { 351 - err := idx.DeleteRecord(did, tt.collection, tt.rkey) 360 + err := idx.DeleteRecord(ctx, did, tt.collection, tt.rkey) 352 361 assert.NoError(t, err, "failed to delete %s/%s", tt.collection, tt.rkey) 353 362 354 363 uri := "at://" + did + "/" + tt.collection + "/" + tt.rkey 355 - rec, err := idx.GetRecord(uri) 364 + rec, err := idx.GetRecord(ctx, uri) 356 365 assert.NoError(t, err) 357 366 assert.Nil(t, rec, "%s should not exist after delete", tt.collection) 358 367 }
+3 -3
internal/handlers/brew.go
··· 264 264 var isLiked bool 265 265 var likeCount int 266 266 if h.feedIndex != nil && subjectURI != "" { 267 - likeCount = h.feedIndex.GetLikeCount(subjectURI) 267 + likeCount = h.feedIndex.GetLikeCount(r.Context(), subjectURI) 268 268 if isAuthenticated { 269 - isLiked = h.feedIndex.HasUserLiked(didStr, subjectURI) 269 + isLiked = h.feedIndex.HasUserLiked(r.Context(), didStr, subjectURI) 270 270 } 271 271 } 272 272 ··· 274 274 var commentCount int 275 275 var comments []firehose.IndexedComment 276 276 if h.feedIndex != nil && subjectURI != "" { 277 - commentCount = h.feedIndex.GetCommentCount(subjectURI) 277 + commentCount = h.feedIndex.GetCommentCount(r.Context(), subjectURI) 278 278 comments = h.feedIndex.GetThreadedCommentsForSubject(r.Context(), subjectURI, 100, didStr) 279 279 comments = h.filterHiddenComments(r.Context(), comments) 280 280 }
+7 -7
internal/handlers/entity_views.go
··· 35 35 var sd socialData 36 36 37 37 if h.feedIndex != nil && subjectURI != "" { 38 - sd.LikeCount = h.feedIndex.GetLikeCount(subjectURI) 39 - sd.CommentCount = h.feedIndex.GetCommentCount(subjectURI) 38 + sd.LikeCount = h.feedIndex.GetLikeCount(ctx, subjectURI) 39 + sd.CommentCount = h.feedIndex.GetCommentCount(ctx, subjectURI) 40 40 sd.Comments = h.feedIndex.GetThreadedCommentsForSubject(ctx, subjectURI, 100, didStr) 41 41 sd.Comments = h.filterHiddenComments(ctx, sd.Comments) 42 42 if isAuthenticated { 43 - sd.IsLiked = h.feedIndex.HasUserLiked(didStr, subjectURI) 43 + sd.IsLiked = h.feedIndex.HasUserLiked(ctx, didStr, subjectURI) 44 44 } 45 45 } 46 46 ··· 229 229 if ownerDID == "" { 230 230 ownerDID = didStr 231 231 } 232 - counts := h.feedIndex.BrewCountsByBeanURI(ownerDID) 232 + counts := h.feedIndex.BrewCountsByBeanURI(r.Context(), ownerDID) 233 233 beanViewProps.BrewCount = counts[subjectURI] 234 234 } 235 235 ··· 361 361 if ownerDID == "" { 362 362 ownerDID = didStr 363 363 } 364 - counts := h.feedIndex.BeanCountsByRoasterURI(ownerDID) 364 + counts := h.feedIndex.BeanCountsByRoasterURI(r.Context(), ownerDID) 365 365 props.BeanCount = counts[subjectURI] 366 366 } 367 367 ··· 493 493 if ownerDID == "" { 494 494 ownerDID = didStr 495 495 } 496 - counts := h.feedIndex.BrewCountsByGrinderURI(ownerDID) 496 + counts := h.feedIndex.BrewCountsByGrinderURI(r.Context(), ownerDID) 497 497 props.BrewCount = counts[subjectURI] 498 498 } 499 499 ··· 625 625 if ownerDID == "" { 626 626 ownerDID = didStr 627 627 } 628 - counts := h.feedIndex.BrewCountsByBrewerURI(ownerDID) 628 + counts := h.feedIndex.BrewCountsByBrewerURI(r.Context(), ownerDID) 629 629 props.BrewCount = counts[subjectURI] 630 630 } 631 631
+5 -5
internal/handlers/feed.go
··· 123 123 } 124 124 // Check if viewer liked this record 125 125 if h.feedIndex != nil && item.SubjectURI != "" { 126 - item.IsLikedByViewer = h.feedIndex.HasUserLiked(viewerDID, item.SubjectURI) 126 + item.IsLikedByViewer = h.feedIndex.HasUserLiked(r.Context(), viewerDID, item.SubjectURI) 127 127 } 128 128 } 129 129 } ··· 203 203 204 204 // Update firehose index 205 205 if h.feedIndex != nil { 206 - if err := h.feedIndex.DeleteLike(didStr, subjectURI); err != nil { 206 + if err := h.feedIndex.DeleteLike(r.Context(), didStr, subjectURI); err != nil { 207 207 log.Warn().Err(err).Str("did", didStr).Str("subject_uri", subjectURI).Msg("Failed to delete like from feed index") 208 208 } 209 209 h.feedIndex.DeleteLikeNotification(didStr, subjectURI) 210 - likeCount = h.feedIndex.GetLikeCount(subjectURI) 210 + likeCount = h.feedIndex.GetLikeCount(r.Context(), subjectURI) 211 211 } 212 212 } else { 213 213 // Like: create a new like ··· 226 226 227 227 // Update firehose index 228 228 if h.feedIndex != nil { 229 - if err := h.feedIndex.UpsertLike(didStr, like.RKey, subjectURI); err != nil { 229 + if err := h.feedIndex.UpsertLike(r.Context(), didStr, like.RKey, subjectURI); err != nil { 230 230 log.Warn().Err(err).Str("did", didStr).Str("subject_uri", subjectURI).Msg("Failed to upsert like in feed index") 231 231 } 232 - likeCount = h.feedIndex.GetLikeCount(subjectURI) 232 + likeCount = h.feedIndex.GetLikeCount(r.Context(), subjectURI) 233 233 } 234 234 } 235 235
+3 -3
internal/handlers/handlers.go
··· 318 318 if h.feedIndex != nil && collection != "" { 319 319 didStr, _ := atproto.GetAuthenticatedDID(r.Context()) 320 320 if didStr != "" { 321 - if err := h.feedIndex.DeleteRecord(didStr, collection, rkey); err != nil { 321 + if err := h.feedIndex.DeleteRecord(r.Context(), didStr, collection, rkey); err != nil { 322 322 log.Warn().Err(err).Str("rkey", rkey).Str("collection", collection).Msg("Failed to delete record from feed index") 323 323 } 324 324 } ··· 420 420 421 421 // Update firehose index (pass parent URI and comment's CID for threading) 422 422 if h.feedIndex != nil { 423 - if err := h.feedIndex.UpsertComment(didStr, comment.RKey, subjectURI, parentURI, comment.CID, text, comment.CreatedAt); err != nil { 423 + if err := h.feedIndex.UpsertComment(r.Context(), didStr, comment.RKey, subjectURI, parentURI, comment.CID, text, comment.CreatedAt); err != nil { 424 424 log.Warn().Err(err).Str("did", didStr).Str("rkey", comment.RKey).Str("subject_uri", subjectURI).Msg("Failed to upsert comment in feed index") 425 425 } 426 426 // Create notification for the comment/reply ··· 484 484 // Look up subject URI before deletion for notification cleanup 485 485 subjectURI := h.feedIndex.GetCommentSubjectURI(didStr, rkey) 486 486 487 - if err := h.feedIndex.DeleteComment(didStr, rkey, ""); err != nil { 487 + if err := h.feedIndex.DeleteComment(r.Context(), didStr, rkey, ""); err != nil { 488 488 log.Warn().Err(err).Str("did", didStr).Str("rkey", rkey).Msg("Failed to delete comment from feed index") 489 489 } 490 490
+7 -7
internal/handlers/profile.go
··· 579 579 if h.feedIndex != nil && profile != nil { 580 580 for _, brew := range profileData.Brews { 581 581 subjectURI := atproto.BuildATURI(profile.DID, atproto.NSIDBrew, brew.RKey) 582 - brewLikeCounts[brew.RKey] = h.feedIndex.GetLikeCount(subjectURI) 582 + brewLikeCounts[brew.RKey] = h.feedIndex.GetLikeCount(ctx, subjectURI) 583 583 if isAuthenticated { 584 - brewLikedByUser[brew.RKey] = h.feedIndex.HasUserLiked(didStr, subjectURI) 584 + brewLikedByUser[brew.RKey] = h.feedIndex.HasUserLiked(ctx, didStr, subjectURI) 585 585 } 586 586 // Get CID from the firehose index record 587 - if record, err := h.feedIndex.GetRecord(subjectURI); err == nil && record != nil { 587 + if record, err := h.feedIndex.GetRecord(ctx, subjectURI); err == nil && record != nil { 588 588 brewCIDs[brew.RKey] = record.CID 589 589 } 590 590 } 591 591 // Entity usage counts 592 - beanBrewCounts = h.feedIndex.BrewCountsByBeanURI(did) 593 - grinderBrewCounts = h.feedIndex.BrewCountsByGrinderURI(did) 594 - brewerBrewCounts = h.feedIndex.BrewCountsByBrewerURI(did) 595 - roasterBeanCounts = h.feedIndex.BeanCountsByRoasterURI(did) 592 + beanBrewCounts = h.feedIndex.BrewCountsByBeanURI(ctx, did) 593 + grinderBrewCounts = h.feedIndex.BrewCountsByGrinderURI(ctx, did) 594 + brewerBrewCounts = h.feedIndex.BrewCountsByBrewerURI(ctx, did) 595 + roasterBeanCounts = h.feedIndex.BeanCountsByRoasterURI(ctx, did) 596 596 } 597 597 598 598 if err := components.ProfileContentPartial(components.ProfileContentPartialProps{
+4 -4
internal/handlers/recipe.go
··· 159 159 if h.feedIndex != nil { 160 160 didStr, _ := atproto.GetAuthenticatedDID(r.Context()) 161 161 if didStr != "" { 162 - if err := h.feedIndex.DeleteRecord(didStr, atproto.NSIDRecipe, rkey); err != nil { 162 + if err := h.feedIndex.DeleteRecord(r.Context(), didStr, atproto.NSIDRecipe, rkey); err != nil { 163 163 log.Warn().Err(err).Str("rkey", rkey).Msg("Failed to delete recipe from feed index") 164 164 } 165 165 } ··· 488 488 return nil, fmt.Errorf("feed index not available") 489 489 } 490 490 491 - records, err := h.feedIndex.ListRecordsByCollection(atproto.NSIDRecipe) 491 + records, err := h.feedIndex.ListRecordsByCollection(ctx, atproto.NSIDRecipe) 492 492 if err != nil { 493 493 return nil, err 494 494 } ··· 564 564 } 565 565 566 566 // Batch query brew counts per recipe 567 - brewCounts := h.feedIndex.BrewCountsByRecipeURI() 567 + brewCounts := h.feedIndex.BrewCountsByRecipeURI(ctx) 568 568 569 569 // Build final recipe list 570 570 recipes := make([]*models.Recipe, 0, len(parsed)) ··· 576 576 if c, parseErr := atproto.ResolveATURI(brewerRef); parseErr == nil { 577 577 recipe.BrewerRKey = c.RKey 578 578 } 579 - if brewerRec, getErr := h.feedIndex.GetRecord(brewerRef); getErr == nil && brewerRec != nil { 579 + if brewerRec, getErr := h.feedIndex.GetRecord(ctx, brewerRef); getErr == nil && brewerRec != nil { 580 580 var brewerData map[string]interface{} 581 581 if err := json.Unmarshal(brewerRec.Record, &brewerData); err == nil { 582 582 if brewer, err := atproto.RecordToBrewer(brewerData, brewerRef); err == nil {
+1 -1
internal/handlers/suggestions.go
··· 62 62 // community records, not their own data echoed back. 63 63 excludeDID, _ := atproto.GetAuthenticatedDID(r.Context()) 64 64 65 - results, err := suggestions.Search(h.feedIndex, nsid, query, limit, excludeDID) 65 + results, err := suggestions.Search(r.Context(), h.feedIndex, nsid, query, limit, excludeDID) 66 66 if err != nil { 67 67 log.Error().Err(err).Str("entity", entityType).Str("query", query).Msg("Failed to search suggestions") 68 68 http.Error(w, "Failed to search suggestions", http.StatusInternalServerError)
+4 -3
internal/suggestions/suggestions.go
··· 1 1 package suggestions 2 2 3 3 import ( 4 + "context" 4 5 "encoding/json" 5 6 "regexp" 6 7 "sort" ··· 20 21 21 22 // RecordSource provides read access to indexed records. 22 23 type RecordSource interface { 23 - ListRecordsByCollection(collection string) ([]firehose.IndexedRecord, error) 24 + ListRecordsByCollection(ctx context.Context, collection string) ([]firehose.IndexedRecord, error) 24 25 } 25 26 26 27 // entityFieldConfig defines which fields to extract and search for each entity type ··· 209 210 // It matches against searchable fields, deduplicates using entity-specific 210 211 // keys, and returns results sorted by popularity. 211 212 // If excludeDID is non-empty, records from that DID are excluded from results. 212 - func Search(source RecordSource, collection, query string, limit int, excludeDID ...string) ([]EntitySuggestion, error) { 213 + func Search(ctx context.Context, source RecordSource, collection, query string, limit int, excludeDID ...string) ([]EntitySuggestion, error) { 213 214 if limit <= 0 { 214 215 limit = 10 215 216 } ··· 229 230 skipDID = excludeDID[0] 230 231 } 231 232 232 - records, err := source.ListRecordsByCollection(collection) 233 + records, err := source.ListRecordsByCollection(ctx, collection) 233 234 if err != nil { 234 235 return nil, err 235 236 }
+26 -26
internal/suggestions/suggestions_test.go
··· 97 97 "location": "New York, NY", 98 98 }) 99 99 100 - results, err := Search(idx, atproto.NSIDRoaster, "stumptown", 10) 100 + results, err := Search(context.Background(), idx,atproto.NSIDRoaster, "stumptown", 10) 101 101 assert.NoError(t, err) 102 102 assert.Len(t, results, 2, "different locations should produce separate suggestions") 103 103 } ··· 116 116 "location": "Durham, NC", 117 117 }) 118 118 119 - results, err := Search(idx, atproto.NSIDRoaster, "counter", 10) 119 + results, err := Search(context.Background(), idx,atproto.NSIDRoaster, "counter", 10) 120 120 assert.NoError(t, err) 121 121 assert.Len(t, results, 1, "fuzzy name + same location should merge") 122 122 assert.Equal(t, 2, results[0].Count) ··· 133 133 "name": "Blue Bottle", 134 134 }) 135 135 136 - results, err := Search(idx, atproto.NSIDRoaster, "blue", 10) 136 + results, err := Search(context.Background(), idx,atproto.NSIDRoaster, "blue", 10) 137 137 assert.NoError(t, err) 138 138 assert.Len(t, results, 1, "same fuzzy name with no location should merge") 139 139 assert.Equal(t, 2, results[0].Count) ··· 155 155 "burrType": "flat", 156 156 }) 157 157 158 - results, err := Search(idx, atproto.NSIDGrinder, "baratza", 10) 158 + results, err := Search(context.Background(), idx,atproto.NSIDGrinder, "baratza", 10) 159 159 assert.NoError(t, err) 160 160 assert.Len(t, results, 2, "different burr types should produce separate suggestions") 161 161 } ··· 174 174 "burrType": "conical", 175 175 }) 176 176 177 - results, err := Search(idx, atproto.NSIDGrinder, "1zp", 10) 177 + results, err := Search(context.Background(), idx,atproto.NSIDGrinder, "1zp", 10) 178 178 assert.NoError(t, err) 179 179 assert.Len(t, results, 1) 180 180 assert.Equal(t, 2, results[0].Count) ··· 194 194 "brewerType": "dripper", 195 195 }) 196 196 197 - results, err := Search(idx, atproto.NSIDBrewer, "hario", 10) 197 + results, err := Search(context.Background(), idx,atproto.NSIDBrewer, "hario", 10) 198 198 assert.NoError(t, err) 199 199 assert.Len(t, results, 2, "different brewer types should produce separate suggestions") 200 200 } ··· 211 211 "brewerType": "immersion", 212 212 }) 213 213 214 - results, err := Search(idx, atproto.NSIDBrewer, "aero", 10) 214 + results, err := Search(context.Background(), idx,atproto.NSIDBrewer, "aero", 10) 215 215 assert.NoError(t, err) 216 216 assert.Len(t, results, 1) 217 217 assert.Equal(t, 2, results[0].Count) ··· 233 233 "process": "Natural", 234 234 }) 235 235 236 - results, err := Search(idx, atproto.NSIDBean, "yirga", 10) 236 + results, err := Search(context.Background(), idx,atproto.NSIDBean, "yirga", 10) 237 237 assert.NoError(t, err) 238 238 assert.Len(t, results, 2, "different processes should produce separate suggestions") 239 239 } ··· 250 250 "origin": "Ethiopia", 251 251 }) 252 252 253 - results, err := Search(idx, atproto.NSIDBean, "gesha", 10) 253 + results, err := Search(context.Background(), idx,atproto.NSIDBean, "gesha", 10) 254 254 assert.NoError(t, err) 255 255 assert.Len(t, results, 2, "different origins should produce separate suggestions") 256 256 } ··· 270 270 "process": "Washed", 271 271 }) 272 272 273 - results, err := Search(idx, atproto.NSIDBean, "ethiopia", 10) 273 + results, err := Search(context.Background(), idx,atproto.NSIDBean, "ethiopia", 10) 274 274 assert.NoError(t, err) 275 275 assert.Len(t, results, 1) 276 276 assert.Equal(t, 2, results[0].Count) ··· 290 290 "location": "Oakland, CA", 291 291 }) 292 292 293 - results, err := Search(idx, atproto.NSIDRoaster, "bl", 10) 293 + results, err := Search(context.Background(), idx,atproto.NSIDRoaster, "bl", 10) 294 294 assert.NoError(t, err) 295 295 assert.Len(t, results, 2) 296 296 assert.Equal(t, "Black & White Coffee", results[0].Name) ··· 304 304 "name": "Stumptown Coffee", 305 305 }) 306 306 307 - results, err := Search(idx, atproto.NSIDRoaster, "STUMP", 10) 307 + results, err := Search(context.Background(), idx,atproto.NSIDRoaster, "STUMP", 10) 308 308 assert.NoError(t, err) 309 309 assert.Len(t, results, 1) 310 310 assert.Equal(t, "Stumptown Coffee", results[0].Name) ··· 318 318 "location": "Floyd, VA", 319 319 }) 320 320 321 - results, err := Search(idx, atproto.NSIDRoaster, "floyd", 10) 321 + results, err := Search(context.Background(), idx,atproto.NSIDRoaster, "floyd", 10) 322 322 assert.NoError(t, err) 323 323 assert.Len(t, results, 1) 324 324 assert.Equal(t, "Red Rooster Coffee", results[0].Name) ··· 338 338 "location": "Durham, NC", 339 339 }) 340 340 341 - results, err := Search(idx, atproto.NSIDRoaster, "counter", 10) 341 + results, err := Search(context.Background(), idx,atproto.NSIDRoaster, "counter", 10) 342 342 assert.NoError(t, err) 343 343 assert.Len(t, results, 1) 344 344 assert.Equal(t, 2, results[0].Count) ··· 356 356 }) 357 357 } 358 358 359 - results, err := Search(idx, atproto.NSIDGrinder, "grinder", 3) 359 + results, err := Search(context.Background(), idx,atproto.NSIDGrinder, "grinder", 3) 360 360 assert.NoError(t, err) 361 361 assert.Len(t, results, 3) 362 362 } ··· 368 368 "name": "ABC", 369 369 }) 370 370 371 - results, err := Search(idx, atproto.NSIDRoaster, "a", 10) 371 + results, err := Search(context.Background(), idx,atproto.NSIDRoaster, "a", 10) 372 372 assert.NoError(t, err) 373 373 assert.Empty(t, results) 374 374 375 - results, err = Search(idx, atproto.NSIDRoaster, "ab", 10) 375 + results, err = Search(context.Background(), idx,atproto.NSIDRoaster, "ab", 10) 376 376 assert.NoError(t, err) 377 377 assert.Len(t, results, 1) 378 378 } ··· 380 380 func TestSearch_EmptyQuery(t *testing.T) { 381 381 idx := newTestFeedIndex(t) 382 382 383 - results, err := Search(idx, atproto.NSIDRoaster, "", 10) 383 + results, err := Search(context.Background(), idx,atproto.NSIDRoaster, "", 10) 384 384 assert.NoError(t, err) 385 385 assert.Empty(t, results) 386 386 } ··· 388 388 func TestSearch_UnknownCollection(t *testing.T) { 389 389 idx := newTestFeedIndex(t) 390 390 391 - results, err := Search(idx, "unknown.collection", "test", 10) 391 + results, err := Search(context.Background(), idx,"unknown.collection", "test", 10) 392 392 assert.NoError(t, err) 393 393 assert.Empty(t, results) 394 394 } ··· 402 402 "burrType": "conical", 403 403 }) 404 404 405 - results, err := Search(idx, atproto.NSIDGrinder, "1zp", 10) 405 + results, err := Search(context.Background(), idx,atproto.NSIDGrinder, "1zp", 10) 406 406 assert.NoError(t, err) 407 407 assert.Len(t, results, 1) 408 408 assert.Equal(t, "hand", results[0].Fields["grinderType"]) ··· 419 419 "process": "Washed", 420 420 }) 421 421 422 - results, err := Search(idx, atproto.NSIDBean, "ethiopia", 10) 422 + results, err := Search(context.Background(), idx,atproto.NSIDBean, "ethiopia", 10) 423 423 assert.NoError(t, err) 424 424 assert.Len(t, results, 1) 425 425 assert.Equal(t, "Ethiopian Yirgacheffe", results[0].Name) ··· 434 434 "brewerType": "Pour-Over", 435 435 }) 436 436 437 - results, err := Search(idx, atproto.NSIDBrewer, "hario", 10) 437 + results, err := Search(context.Background(), idx,atproto.NSIDBrewer, "hario", 10) 438 438 assert.NoError(t, err) 439 439 assert.Len(t, results, 1) 440 440 assert.Equal(t, "Pour-Over", results[0].Fields["brewerType"]) ··· 454 454 "brewerType": "immersion", 455 455 }) 456 456 457 - results, err := Search(idx, atproto.NSIDRecipe, "v60", 10) 457 + results, err := Search(context.Background(), idx,atproto.NSIDRecipe, "v60", 10) 458 458 assert.NoError(t, err) 459 459 assert.Len(t, results, 2, "different brewer types should produce separate suggestions") 460 460 } ··· 471 471 "brewerType": "immersion", 472 472 }) 473 473 474 - results, err := Search(idx, atproto.NSIDRecipe, "aero", 10) 474 + results, err := Search(context.Background(), idx,atproto.NSIDRecipe, "aero", 10) 475 475 assert.NoError(t, err) 476 476 assert.Len(t, results, 1) 477 477 assert.Equal(t, 2, results[0].Count) ··· 485 485 "brewerType": "pourover", 486 486 }) 487 487 488 - results, err := Search(idx, atproto.NSIDRecipe, "hoffmann", 10) 488 + results, err := Search(context.Background(), idx,atproto.NSIDRecipe, "hoffmann", 10) 489 489 assert.NoError(t, err) 490 490 assert.Len(t, results, 1) 491 491 assert.Equal(t, "James Hoffmann V60", results[0].Name) ··· 503 503 // "Alpha Beta" used by 1 person 504 504 insertRecord(t, idx, "did:plc:dave", atproto.NSIDRoaster, "r4", map[string]interface{}{"name": "Alpha Beta"}) 505 505 506 - results, err := Search(idx, atproto.NSIDRoaster, "alpha", 10) 506 + results, err := Search(context.Background(), idx,atproto.NSIDRoaster, "alpha", 10) 507 507 assert.NoError(t, err) 508 508 assert.Len(t, results, 2) 509 509 assert.Equal(t, "Alpha Roasters", results[0].Name)