See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add firehose ClickHouse write helpers

Introduce insertLikeEventsLookup, insertLikeCountsDailyDeltas, and
lookupSubjectUri on ClickHouseStore — the write path and unlike-resolution
query for the firehose virality worker. A dateToClickHouseDate helper
formats Dates as UTC YYYY-MM-DD so per-day counts are consistent
regardless of the server's local timezone.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+220
+2
app/lib/clickhouse/index.ts
··· 4 4 TopPostsResult, 5 5 TopPostsKind, 6 6 EngagementEventRow, 7 + LikeEventLookupRow, 8 + LikeCountsDailyDeltaRow, 7 9 ClickHouseConfig, 8 10 } from './types.js' 9 11 export type { PostSnapshot, PostEmbed } from '#lib/atproto/index'
+106
app/lib/clickhouse/store.ts
··· 4 4 import type { 5 5 ClickHouseConfig, 6 6 EngagementEventRow, 7 + LikeCountsDailyDeltaRow, 8 + LikeEventLookupRow, 7 9 TopPostsKind, 8 10 TopPostsQuery, 9 11 TopPostsResult, ··· 103 105 */ 104 106 function dateToClickHouseStr(date: Date): string { 105 107 return date.toISOString() 108 + } 109 + 110 + /** 111 + * Converts a JS Date to a ClickHouse Date literal (YYYY-MM-DD) using the UTC 112 + * calendar — matches ClickHouse's `today()` when the server timezone is UTC. 113 + * The firehose worker computes per-event day values on the Node side so that 114 + * server-local timezone doesn't leak into counts. 115 + */ 116 + function dateToClickHouseDate(date: Date): string { 117 + const year = date.getUTCFullYear().toString().padStart(4, '0') 118 + const month = (date.getUTCMonth() + 1).toString().padStart(2, '0') 119 + const day = date.getUTCDate().toString().padStart(2, '0') 120 + return `${year}-${month}-${day}` 106 121 } 107 122 108 123 // --------------------------------------------------------------------------- ··· 414 429 { cause: err }, 415 430 ) 416 431 } 432 + } 433 + 434 + // ------------------------------------------------------------------------- 435 + // Write path — firehose virality (like_events_lookup + like_counts_daily) 436 + // ------------------------------------------------------------------------- 437 + 438 + /** 439 + * Batch-inserts like lookup rows into like_events_lookup. This table is 440 + * used so that unlike events — which only carry `(actor_did, rkey)` — can 441 + * resolve to the original post URI via a cheap PK-ordered lookup. 
442 + */ 443 + async insertLikeEventsLookup(rows: LikeEventLookupRow[]): Promise<void> { 444 + if (rows.length === 0) return 445 + 446 + const values = rows.map((r) => ({ 447 + like_uri: r.likeUri, 448 + subject_uri: r.subjectUri, 449 + created_at: dateToClickHouseStr(r.createdAt), 450 + })) 451 + 452 + try { 453 + await this.client.insert({ 454 + table: 'like_events_lookup', 455 + values, 456 + format: 'JSONEachRow', 457 + }) 458 + } catch (err) { 459 + const detail = err instanceof Error ? err.message : String(err) 460 + throw new Error( 461 + `ClickHouseStore.insertLikeEventsLookup failed (${rows.length} rows): ${detail}`, 462 + { cause: err } 463 + ) 464 + } 465 + } 466 + 467 + /** 468 + * Batch-inserts per-day like count deltas into like_counts_daily. 469 + * Each row is a signed delta (+1 for a new like, -1 for an unlike); 470 + * the SummingMergeTree engine collapses them into a single summed row 471 + * per (subject_uri, day). 472 + */ 473 + async insertLikeCountsDailyDeltas(rows: LikeCountsDailyDeltaRow[]): Promise<void> { 474 + if (rows.length === 0) return 475 + 476 + const values = rows.map((r) => ({ 477 + subject_uri: r.subjectUri, 478 + day: dateToClickHouseDate(r.day), 479 + count: r.count, 480 + })) 481 + 482 + try { 483 + await this.client.insert({ 484 + table: 'like_counts_daily', 485 + values, 486 + format: 'JSONEachRow', 487 + }) 488 + } catch (err) { 489 + const detail = err instanceof Error ? err.message : String(err) 490 + throw new Error( 491 + `ClickHouseStore.insertLikeCountsDailyDeltas failed (${rows.length} rows): ${detail}`, 492 + { cause: err } 493 + ) 494 + } 495 + } 496 + 497 + /** 498 + * Looks up the subject URI for a given like AT-URI in like_events_lookup. 499 + * Returns null when no row exists (e.g. the original like predates the 500 + * firehose worker's history window, or the like_uri was never ingested). 501 + * 502 + * Drains the underlying result stream per CLAUDE.md. 
503 + */ 504 + async lookupSubjectUri(likeUri: string): Promise<string | null> { 505 + let resultSet 506 + try { 507 + resultSet = await this.client.query({ 508 + query: 509 + 'SELECT subject_uri FROM like_events_lookup WHERE like_uri = {likeUri: String} LIMIT 1', 510 + format: 'JSONEachRow', 511 + query_params: { likeUri }, 512 + }) 513 + } catch (err) { 514 + const detail = err instanceof Error ? err.message : String(err) 515 + throw new Error(`ClickHouseStore.lookupSubjectUri failed for ${likeUri}: ${detail}`, { 516 + cause: err, 517 + }) 518 + } 519 + 520 + const rows = await resultSet.json<{ subject_uri: string }>() 521 + if (rows.length === 0) return null 522 + return rows[0].subject_uri 417 523 } 418 524 }
+24
app/lib/clickhouse/types.ts
··· 49 49 eventCreatedAt: Date 50 50 } 51 51 52 + /** 53 + * A single row in the firehose like_events_lookup table — maps a like's 54 + * AT-URI to the post URI it targets. Used to resolve unlike events 55 + * (which only carry the like record's rkey, not the subject). 56 + */ 57 + export interface LikeEventLookupRow { 58 + likeUri: string 59 + subjectUri: string 60 + createdAt: Date 61 + } 62 + 63 + /** 64 + * A single delta row in the firehose like_counts_daily SummingMergeTree 65 + * table. `count` is the per-event signed delta (+1 for a like, -1 for an 66 + * unlike); the engine collapses all rows for the same (subjectUri, day) 67 + * into a single summed row. 68 + */ 69 + export interface LikeCountsDailyDeltaRow { 70 + subjectUri: string 71 + /** UTC day. The firehose worker computes this on the Node side. */ 72 + day: Date 73 + count: number 74 + } 75 + 52 76 // --------------------------------------------------------------------------- 53 77 // Config 54 78 // ---------------------------------------------------------------------------
+88
tests/unit/clickhouse_store.spec.ts
··· 138 138 if (!store) return 139 139 await store.client.command({ query: 'TRUNCATE TABLE post_snapshots' }) 140 140 await store.client.command({ query: 'TRUNCATE TABLE engagement_events' }) 141 + await store.client.command({ query: 'TRUNCATE TABLE like_events_lookup' }) 142 + await store.client.command({ query: 'TRUNCATE TABLE like_counts_daily' }) 141 143 }) 142 144 143 145 // ------------------------------------------------------------------------- ··· 846 848 const error = thrown as Error & { cause?: unknown } 847 849 assert.exists(error.cause, 'Error.cause should be set to the original underlying error') 848 850 assert.instanceOf(error.cause, Error) 851 + }).skip(async () => !(await isClickHouseAvailable()), 'ClickHouse not available') 852 + 853 + // ------------------------------------------------------------------------- 854 + // Firehose virality write helpers 855 + // ------------------------------------------------------------------------- 856 + 857 + test('insertLikeEventsLookup round-trips rows', async ({ assert }) => { 858 + const likeUri = 'at://did:plc:actor1/app.bsky.feed.like/likerkey001' 859 + const subjectUri = 'at://did:plc:author1/app.bsky.feed.post/postrkey001' 860 + // Use a current timestamp — the TTL on like_events_lookup is 8 days. 
861 + await store.insertLikeEventsLookup([{ likeUri, subjectUri, createdAt: new Date() }]) 862 + 863 + const count = await countRows(store, 'like_events_lookup') 864 + assert.equal(count, 1) 865 + 866 + const rs = await store.client.query({ 867 + query: 'SELECT like_uri, subject_uri FROM like_events_lookup', 868 + format: 'JSONEachRow', 869 + }) 870 + const rows = await rs.json<{ like_uri: string; subject_uri: string }>() 871 + assert.equal(rows[0].like_uri, likeUri) 872 + assert.equal(rows[0].subject_uri, subjectUri) 873 + }).skip(async () => !(await isClickHouseAvailable()), 'ClickHouse not available') 874 + 875 + test('lookupSubjectUri returns the subject for a known like_uri', async ({ assert }) => { 876 + const likeUri = 'at://did:plc:actor2/app.bsky.feed.like/likerkey002' 877 + const subjectUri = 'at://did:plc:author2/app.bsky.feed.post/postrkey002' 878 + await store.insertLikeEventsLookup([{ likeUri, subjectUri, createdAt: new Date() }]) 879 + 880 + const found = await store.lookupSubjectUri(likeUri) 881 + assert.equal(found, subjectUri) 882 + }).skip(async () => !(await isClickHouseAvailable()), 'ClickHouse not available') 883 + 884 + test('lookupSubjectUri returns null for an unknown like_uri', async ({ assert }) => { 885 + const found = await store.lookupSubjectUri( 886 + 'at://did:plc:nobody/app.bsky.feed.like/doesnotexist' 887 + ) 888 + assert.isNull(found) 889 + }).skip(async () => !(await isClickHouseAvailable()), 'ClickHouse not available') 890 + 891 + test('insertLikeCountsDailyDeltas sums +1/-1 via SummingMergeTree', async ({ assert }) => { 892 + const subjectUri = 'at://did:plc:author3/app.bsky.feed.post/postrkey003' 893 + // Use today's UTC date — the TTL on like_counts_daily is 8 days. 
894 + const day = new Date() 895 + 896 + await store.insertLikeCountsDailyDeltas([ 897 + { subjectUri, day, count: 1 }, 898 + { subjectUri, day, count: 1 }, 899 + { subjectUri, day, count: 1 }, 900 + { subjectUri, day, count: -1 }, 901 + ]) 902 + 903 + // sum(count) aggregates the delta rows at query time, so the net count is correct whether or not the background merge has run 904 + const rs = await store.client.query({ 905 + query: 906 + 'SELECT sum(count) AS c FROM like_counts_daily WHERE subject_uri = {s: String}', 907 + format: 'JSONEachRow', 908 + query_params: { s: subjectUri }, 909 + }) 910 + const rows = await rs.json<{ c: string }>() 911 + assert.equal(Number(rows[0].c), 2) 912 + }).skip(async () => !(await isClickHouseAvailable()), 'ClickHouse not available') 913 + 914 + test('insertLikeCountsDailyDeltas stores the day as UTC', async ({ assert }) => { 915 + const subjectUri = 'at://did:plc:author4/app.bsky.feed.post/postrkey004' 916 + // Use today at 23:30 UTC so the row isn't TTL-dropped (8-day window). 917 + const now = new Date() 918 + const d = new Date( 919 + Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate(), 23, 30, 0) 920 + ) 921 + const expectedDay = `${d.getUTCFullYear().toString().padStart(4, '0')}-${( 922 + d.getUTCMonth() + 1 923 + ) 924 + .toString() 925 + .padStart(2, '0')}-${d.getUTCDate().toString().padStart(2, '0')}` 926 + 927 + await store.insertLikeCountsDailyDeltas([{ subjectUri, day: d, count: 1 }]) 928 + 929 + const rs = await store.client.query({ 930 + query: 931 + 'SELECT toString(day) AS day FROM like_counts_daily WHERE subject_uri = {s: String}', 932 + format: 'JSONEachRow', 933 + query_params: { s: subjectUri }, 934 + }) 935 + const rows = await rs.json<{ day: string }>() 936 + assert.equal(rows[0].day, expectedDay) 849 937 }).skip(async () => !(await isClickHouseAvailable()), 'ClickHouse not available') 850 938 })