See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add ClickHouseStore.findPostsAtOrAboveThreshold

Returns every post whose net like count over the last N days is at least
`minThreshold`, by summing the signed deltas in `like_counts_daily`.
Backs the firehose threshold scan job's candidate query.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+83 -5
+42 -5
app/lib/clickhouse/store.ts
··· 200 200 const detail = err instanceof Error ? err.message : String(err) 201 201 throw new Error( 202 202 `ClickHouseStore.getTopPosts failed for author ${query.authorDid}: ${detail}`, 203 - { cause: err }, 203 + { cause: err } 204 204 ) 205 205 } 206 206 ··· 242 242 const detail = err instanceof Error ? err.message : String(err) 243 243 throw new Error( 244 244 `ClickHouseStore.getOldestPostDate failed for author ${authorDid}: ${detail}`, 245 - { cause: err }, 245 + { cause: err } 246 246 ) 247 247 } 248 248 ··· 292 292 const detail = err instanceof Error ? err.message : String(err) 293 293 throw new Error( 294 294 `ClickHouseStore.insertPostSnapshots failed (${snapshots.length} rows): ${detail}`, 295 - { cause: err }, 295 + { cause: err } 296 296 ) 297 297 } 298 298 } ··· 327 327 const detail = err instanceof Error ? err.message : String(err) 328 328 throw new Error( 329 329 `ClickHouseStore.insertEngagementEvents failed (${events.length} rows): ${detail}`, 330 - { cause: err }, 330 + { cause: err } 331 331 ) 332 332 } 333 333 } ··· 426 426 const detail = err instanceof Error ? err.message : String(err) 427 427 throw new Error( 428 428 `ClickHouseStore.tombstoneUserSnapshots failed for author ${authorDid}: ${detail}`, 429 - { cause: err }, 429 + { cause: err } 430 430 ) 431 431 } 432 432 } ··· 492 492 { cause: err } 493 493 ) 494 494 } 495 + } 496 + 497 + /** 498 + * Returns every post whose net like count over the last `sinceDaysAgo` days 499 + * is at least `minThreshold`. Scans `like_counts_daily` (SummingMergeTree) 500 + * and aggregates the raw delta rows — `sum()` reflects the net of likes 501 + * minus unlikes. 502 + * 503 + * Drains the result stream per CLAUDE.md. 504 + */ 505 + async findPostsAtOrAboveThreshold( 506 + minThreshold: number, 507 + sinceDaysAgo: number 508 + ): Promise<Array<{ subjectUri: string; count: number }>> { 509 + const sql = ` 510 + SELECT subject_uri, sum(count) AS c 511 + FROM like_counts_daily 512 + WHERE day >= today() - {sinceDaysAgo:Int32} 513 + GROUP BY subject_uri 514 + HAVING c >= {minThreshold:Int32}` 515 + 516 + let resultSet 517 + try { 518 + resultSet = await this.client.query({ 519 + query: sql, 520 + format: 'JSONEachRow', 521 + query_params: { minThreshold, sinceDaysAgo }, 522 + }) 523 + } catch (err) { 524 + const detail = err instanceof Error ? err.message : String(err) 525 + throw new Error(`ClickHouseStore.findPostsAtOrAboveThreshold failed: ${detail}`, { 526 + cause: err, 527 + }) 528 + } 529 + 530 + const rows = await resultSet.json<{ subject_uri: string; c: string }>() 531 + return rows.map((r) => ({ subjectUri: r.subject_uri, count: Number(r.c) })) 495 532 } 496 533 497 534 /**
+41
tests/unit/clickhouse_store.spec.ts
··· 931 931 const rows = await rs.json<{ day: string }>() 932 932 assert.equal(rows[0].day, expectedDay) 933 933 }).skip(async () => !(await isClickHouseAvailable()), 'ClickHouse not available') 934 + 935 + test('findPostsAtOrAboveThreshold returns only posts whose summed count meets the threshold', async ({ 936 + assert, 937 + }) => { 938 + const below = 'at://did:plc:virality_below/app.bsky.feed.post/p1' 939 + const exact = 'at://did:plc:virality_exact/app.bsky.feed.post/p2' 940 + const above = 'at://did:plc:virality_above/app.bsky.feed.post/p3' 941 + const today = new Date() 942 + 943 + await store.insertLikeCountsDailyDeltas([ 944 + // below: 999 likes, -1 unlike = 998 (below threshold) 945 + { subjectUri: below, day: today, count: 999 }, 946 + { subjectUri: below, day: today, count: -1 }, 947 + // exact: 1000 likes 948 + { subjectUri: exact, day: today, count: 1000 }, 949 + // above: 500 + 700 = 1200 (split across delta rows) 950 + { subjectUri: above, day: today, count: 500 }, 951 + { subjectUri: above, day: today, count: 700 }, 952 + ]) 953 + 954 + const candidates = await store.findPostsAtOrAboveThreshold(1000, 7) 955 + 956 + const byUri = new Map(candidates.map((r) => [r.subjectUri, r.count])) 957 + assert.isFalse(byUri.has(below)) 958 + assert.equal(byUri.get(exact), 1000) 959 + assert.equal(byUri.get(above), 1200) 960 + }).skip(async () => !(await isClickHouseAvailable()), 'ClickHouse not available') 961 + 962 + test('findPostsAtOrAboveThreshold excludes posts whose deltas fall outside the window', async ({ 963 + assert, 964 + }) => { 965 + const recent = 'at://did:plc:virality_recent/app.bsky.feed.post/r1' 966 + const today = new Date() 967 + 968 + await store.insertLikeCountsDailyDeltas([{ subjectUri: recent, day: today, count: 1500 }]) 969 + 970 + // sinceDaysAgo = 0 includes only today(): should find the post 971 + const candidates = await store.findPostsAtOrAboveThreshold(1000, 0) 972 + const byUri = new Map(candidates.map((r) => [r.subjectUri, r.count])) 973 + assert.equal(byUri.get(recent), 1500) 974 + }).skip(async () => !(await isClickHouseAvailable()), 'ClickHouse not available') 934 975 })