See the best posts from any Bluesky account

Parse jetstream like-delete events into LikeDeleteEvent

The firehose virality worker needs to observe unlikes so it can
decrement the per-post like count. Extend parseJetstreamEvent to emit
a new LikeDeleteEvent kind for app.bsky.feed.like deletes; repost
deletes remain ignored (no use case yet). The existing jetstream
consumer is unaffected because it only switches on the kinds it cares
about.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
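
Editor's sketch of why a consumer that "only switches on the kinds it cares about" is unaffected by a new union member. The event shapes and `kind` strings other than `'like-delete'` are hypothetical stand-ins, not the project's real types; only the pattern (a `switch` with an ignoring `default`) is the point.

```typescript
// Hypothetical, trimmed-down event shapes. The real ones live in
// app/lib/atproto/types.ts and carry more fields.
type LikeEvent = { kind: 'like'; actorDid: string; subjectUri: string }
type LikeDeleteEvent = { kind: 'like-delete'; actorDid: string; likeUri: string }
type RepostEvent = { kind: 'repost'; actorDid: string; subjectUri: string }
type JetstreamEvent = LikeEvent | LikeDeleteEvent | RepostEvent

// A consumer written before 'like-delete' existed: it handles the kinds it
// knows and ignores everything else, so widening the union does not change
// its behaviour.
function describeForExistingConsumer(event: JetstreamEvent): string {
  switch (event.kind) {
    case 'like':
      return `like on ${event.subjectUri}`
    case 'repost':
      return `repost of ${event.subjectUri}`
    default:
      // 'like-delete' (and any future kind) lands here.
      return 'ignored'
  }
}
```

A consumer that instead used an exhaustive `never` check in `default` would fail to compile after this change, which is the case the commit message rules out.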

+446 -6
+1
app/lib/atproto/index.ts
@@ -8,6 +8,7 @@
 export type {
   JetstreamEvent,
   LikeEvent,
+  LikeDeleteEvent,
   RepostEvent,
   PostEvent,
   PostDeleteEvent,
+29 -4
app/lib/atproto/parsers/jetstream.ts
@@ -2,6 +2,7 @@
 import type {
   JetstreamEvent,
   LikeEvent,
+  LikeDeleteEvent,
   RepostEvent,
   PostEvent,
   PostDeleteEvent,
@@ -303,6 +304,26 @@
   }
 }

+function parseLikeDelete(
+  did: string,
+  timeUs: number,
+  commit: Record<string, unknown>
+): LikeDeleteEvent | null {
+  const rkey = getString(commit, 'rkey')
+  const collection = getString(commit, 'collection')
+  if (!rkey || !collection) return null
+
+  const likeUri = `at://${did}/${collection}/${rkey}`
+
+  return {
+    kind: 'like-delete',
+    actorDid: did,
+    rkey,
+    likeUri,
+    ingestedAt: timeUsToDate(timeUs),
+  }
+}
+
 function parseRepostCreate(
   did: string,
   timeUs: number,
@@ -452,9 +473,13 @@
  * - The event is malformed / not an object
  * - The kind/collection is one favs.blue v1 doesn't care about
  *
- * **Like and repost DELETE events return null intentionally.**
- * Per spec §10: "Unlikes / unreposts are not tracked. Counts may drift up
- * over time. The alternative is a multi-billion-row rkey-to-post mapping
+ * **Like DELETE events are now parsed as `LikeDeleteEvent`** (consumed by
+ * the firehose virality worker, which resolves the subject URI via the
+ * `like_events_lookup` ClickHouse table).
+ *
+ * **Repost DELETE events still return null intentionally.**
+ * Per spec §10: "Unreposts are not tracked. Counts may drift up over
+ * time. The alternative is a multi-billion-row rkey-to-post mapping
  * table, which is not worth the storage cost for v1."
  *
  * @param rawJson - A parsed JSON value (not a string — JSON parsing is
@@ -489,7 +514,7 @@
   // Like events
   if (collection === 'app.bsky.feed.like') {
     if (operation === 'create') return parseLikeCreate(did, timeUs, commit)
-    // DELETE: intentionally ignored (spec §10 — unlikes not tracked)
+    if (operation === 'delete') return parseLikeDelete(did, timeUs, commit)
     return null
   }

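
Editor's sketch: a standalone version of `parseLikeDelete` that can be run outside the repo. The diff does not show `getString` or `timeUsToDate`, so the two helper bodies below are assumptions about what they probably do (string-typed property access, and microseconds to `Date`), and the sample commit object is a hypothetical fixture.

```typescript
interface LikeDeleteEvent {
  kind: 'like-delete'
  actorDid: string
  rkey: string
  likeUri: string
  ingestedAt: Date
}

// Assumed helper: returns the property only if it is a string.
function getString(obj: Record<string, unknown>, key: string): string | null {
  const value = obj[key]
  return typeof value === 'string' ? value : null
}

// Assumed helper: Jetstream's time_us is microseconds since the Unix epoch,
// while Date expects milliseconds.
function timeUsToDate(timeUs: number): Date {
  return new Date(Math.floor(timeUs / 1000))
}

// Same logic as the function added in the diff above.
function parseLikeDelete(
  did: string,
  timeUs: number,
  commit: Record<string, unknown>
): LikeDeleteEvent | null {
  const rkey = getString(commit, 'rkey')
  const collection = getString(commit, 'collection')
  if (!rkey || !collection) return null

  return {
    kind: 'like-delete',
    actorDid: did,
    rkey,
    likeUri: `at://${did}/${collection}/${rkey}`,
    ingestedAt: timeUsToDate(timeUs),
  }
}

// Hypothetical delete commit: note there is no record body and no subject URI.
const sampleEvent = parseLikeDelete('did:plc:unlikingactor', 1_700_000_000_000_000, {
  operation: 'delete',
  collection: 'app.bsky.feed.like',
  rkey: '3l3likedeletedkey',
})
```

The reconstructed `likeUri` is the only handle on the original like, which is why the subject has to be resolved elsewhere.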
+22
app/lib/atproto/types.ts
@@ -27,6 +27,27 @@
 }

 /**
+ * A like was deleted (an unlike) by an actor.
+ * Source: commit event with collection=app.bsky.feed.like, operation=delete
+ *
+ * Unlike `LikeEvent`, we do NOT have the subject post URI here — the delete
+ * commit only carries `(did, collection, rkey)`. The firehose worker is
+ * responsible for resolving the original subject via the ClickHouse
+ * `like_events_lookup` table.
+ */
+export interface LikeDeleteEvent {
+  kind: 'like-delete'
+  /** DID of the user who performed the unlike */
+  actorDid: string
+  /** rkey of the original like record (the one being deleted) */
+  rkey: string
+  /** AT-URI of the original like record, reconstructed from did + collection + rkey */
+  likeUri: string
+  /** When the event was ingested (from top-level time_us, converted from µs) */
+  ingestedAt: Date
+}
+
+/**
  * A repost was created by an actor on a post.
  * Source: commit event with collection=app.bsky.feed.repost, operation=create
  */
@@ -135,6 +156,7 @@
  */
 export type JetstreamEvent =
   | LikeEvent
+  | LikeDeleteEvent
   | RepostEvent
   | PostEvent
   | PostDeleteEvent
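
Editor's sketch of the subject resolution that the doc comment delegates to the firehose worker. An in-memory `Map` stands in for the `like_events_lookup` ClickHouse table, and the function names are hypothetical; the create/delete behaviour follows the write path described in the design spec included in this commit.

```typescript
// One row destined for the per-day counts table: +1 for a like, -1 for an unlike.
type CountDelta = { subjectUri: string; delta: 1 | -1 }

// In-memory stand-in for like_events_lookup (like_uri -> subject_uri).
const likeLookup = new Map<string, string>()

// Like create: remember which post this like record points at, emit +1.
function onLikeCreate(likeUri: string, subjectUri: string): CountDelta {
  likeLookup.set(likeUri, subjectUri)
  return { subjectUri, delta: 1 }
}

// Like delete: the event only carries likeUri, so resolve the subject first.
// If the like was never seen (or has aged out), the event is dropped.
function onLikeDelete(event: { likeUri: string }): CountDelta | null {
  const subjectUri = likeLookup.get(event.likeUri)
  if (subjectUri === undefined) return null
  return { subjectUri, delta: -1 }
}
```

Usage: `onLikeCreate(likeUri, postUri)` followed by `onLikeDelete({ likeUri })` yields a `+1` and a `-1` for the same post, while an unlike for an unknown `likeUri` yields `null`.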
+384
docs/superpowers/specs/2026-04-14-firehose-virality-webhook-design.md
@@ -0,0 +1,384 @@
+# Firehose Virality Webhook — Design
+
+## Problem
+
+We want to know in near real-time when any post on Bluesky crosses a like
+threshold (1,000 and 10,000 likes), and push a Discord-compatible webhook
+to a configured URL — one URL per threshold, so the two signal levels
+can be routed to different Discord channels — each time a post first
+crosses each threshold. This is
+distinct from the existing per-user tracking: we are counting likes on
+arbitrary posts across the whole network, not only posts by tracked profiles.
+
+Constraints:
+
+- Must run on the existing 8 GB / 4 vCPU Hetzner box alongside the web,
+  jetstream, and queue processes.
+- Must not store unbounded data — the design works over a rolling window.
+- Must survive process restarts.
+- Must handle unlikes correctly, to prevent the threshold being gamed by
+  a user rapidly liking/unliking the same post. Unlikes cancel their
+  matching like, so a like/unlike/like cycle from one user nets to a
+  single like regardless of how many cycles happen.
+- Must not fire for deleted posts.
+
+## Scope
+
+- New `firehose-worker` process that subscribes to the unfiltered
+  `app.bsky.feed.like` jetstream and writes like / unlike deltas into
+  ClickHouse.
+- Two new ClickHouse tables: a lookup table mapping `like_uri → subject_uri`
+  and a per-day per-post counts table.
+- Cursor persistence for the firehose worker (SQLite), mirroring the existing
+  jetstream consumer pattern.
+- New scheduled queue job (via `adonisjs-scheduler` + the existing
+  AdonisJS queue) that scans for posts crossing thresholds and fires
+  Discord webhooks.
+- New SQLite table `notified_thresholds` for per-(post, threshold) dedup.
+- ClickHouse memory caps configured explicitly.
+- `queue-worker` container memory trimmed to leave room for `firehose-worker`.
+
+Out of scope:
+
+- Multi-subscriber webhooks, per-user thresholds, auth. Single global
+  Discord URL configured via env.
+- Reposts, replies, or any engagement signal other than likes.
+- Backfilling pre-existing like counts from the AppView. We count only
+  what we see on the firehose from startup onward.
+- Exact replay idempotency. Jetstream reconnect replay can cause tiny
+  overcounts; accepted as design-level noise. Escape hatch documented
+  under "Deferred work".
+- Per-day historical queries against `like_counts_daily`. The schema only
+  supports "total likes in the last N days" aggregation.
+
+## Design
+
+### Counting model
+
+We treat the system as counting net likes (likes minus unlikes) per post
+over a rolling 7-day window. A post "crosses" a threshold the first time
+its 7-day net like count is observed at or above the threshold value.
+
+Counts are approximate by design:
+
+- We count only likes observed on the jetstream from process start onward.
+  A post that was already popular when we started watching won't be counted
+  against its true total.
+- Jetstream delete events may arrive slightly out of order or be replayed
+  briefly on reconnect, causing small drift.
+- The 7-day window means a post which accumulates likes slowly over weeks
+  never crosses a threshold. This is intentional; we're detecting virality,
+  not cumulative popularity.
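
Editor's sketch of the counting model above: net likes are the sum of per-day `+1` / `-1` deltas inside the rolling window. The row shape and function name are hypothetical, days are ISO `YYYY-MM-DD` strings for simplicity, and the cutoff mirrors the `day >= today() - 7` filter used by the threshold query in this spec.

```typescript
// One delta row, loosely modelled on like_counts_daily (day as YYYY-MM-DD).
type DailyDelta = { subjectUri: string; day: string; count: number }

// Sum the deltas for one post whose day falls on or after (today - windowDays).
function netLikesInWindow(
  rows: readonly DailyDelta[],
  subjectUri: string,
  today: string,
  windowDays = 7
): number {
  const cutoff = new Date(`${today}T00:00:00Z`)
  cutoff.setUTCDate(cutoff.getUTCDate() - windowDays)
  const cutoffDay = cutoff.toISOString().slice(0, 10)

  return rows
    .filter((row) => row.subjectUri === subjectUri && row.day >= cutoffDay)
    .reduce((sum, row) => sum + row.count, 0)
}

const post = 'at://did:plc:b/app.bsky.feed.post/9'
const rows: DailyDelta[] = [
  { subjectUri: post, day: '2026-04-14', count: 1 }, // like
  { subjectUri: post, day: '2026-04-14', count: -1 }, // unlike by the same user
  { subjectUri: post, day: '2026-04-14', count: 1 }, // like again
  { subjectUri: post, day: '2026-04-05', count: 1 }, // outside the window
]
const net = netLikesInWindow(rows, post, '2026-04-14')
```

Here the like/unlike/like cycle contributes a single net like, and the row from nine days earlier is excluded, so `net` is 1.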
+
+### ClickHouse schema
+
+Two tables, both `MergeTree` family:
+
+```sql
+CREATE TABLE like_events_lookup (
+  like_uri String,
+  subject_uri String,
+  created_at DateTime
+)
+ENGINE = MergeTree
+ORDER BY like_uri
+TTL created_at + INTERVAL 8 DAY;
+
+CREATE TABLE like_counts_daily (
+  subject_uri String,
+  day Date,
+  count Int64
+)
+ENGINE = SummingMergeTree
+ORDER BY subject_uri
+PARTITION BY day
+TTL day + INTERVAL 8 DAY;
+```
+
+`like_events_lookup` exists so that when an unlike arrives — Jetstream
+delete events give us `(did, collection, rkey)` but not the subject of the
+original like — we can look up the subject URI by primary-key scan.
+`ORDER BY like_uri` makes this a cheap sparse-index lookup
+(sub-millisecond cached, single-digit ms cold).
+
+`like_counts_daily` is the aggregation target for the threshold poll.
+`PARTITION BY day` means TTL drops whole partitions cheaply rather than
+doing row-level mutations. Within each daily partition, `SummingMergeTree`
+collapses all `(subject_uri, day)` deltas into a single row per post.
+8-day retention is 7 days for the query window plus one day of buffer.
+
+TTL is 8 days on both tables; the query window is 7 days, giving one day
+of slack for merge lag and clock skew.
+
+### Write path (firehose-worker)
+
+The worker subscribes to jetstream with a collection filter of
+`app.bsky.feed.like` and no DID filter. For each commit:
+
+- **Create** (new like):
+  - `INSERT INTO like_events_lookup VALUES (like_uri, subject_uri, created_at)`
+  - `INSERT INTO like_counts_daily VALUES (subject_uri, today(), +1)`
+
+- **Delete** (unlike):
+  - `SELECT subject_uri FROM like_events_lookup WHERE like_uri = ?` (PK lookup)
+  - If found: `INSERT INTO like_counts_daily VALUES (subject_uri, today(), -1)`
+  - If not found (original like aged out or never seen): drop the event
+
+Inserts are batched in the worker: collect events for up to 2 seconds or
+5,000 events, whichever comes first, then issue one batched `INSERT` per
+table using ClickHouse async inserts.
+
+Cursor checkpointing mirrors `app/services/jetstream_cursor_io.ts`: the
+worker persists the last processed jetstream cursor to SQLite every few
+seconds, resumes from there on restart.
+
+### Threshold poll (scheduled queue job)
+
+We use [`adonisjs-scheduler`](https://packages.adonisjs.com/packages/adonisjs-scheduler)
+to trigger a queue job on a cron schedule. The scheduler runs as part of
+the `queue-worker` process (or a dedicated scheduler entrypoint if that
+package recommends it during install — to be confirmed during
+implementation). Every 60 seconds it dispatches a `ThresholdScanJob`
+onto the existing AdonisJS queue, which runs inside `queue-worker`.
+
+The job does:
+
+1. Query ClickHouse for candidate posts:
+
+   ```sql
+   SELECT subject_uri, sum(count) AS c
+   FROM like_counts_daily
+   WHERE day >= today() - 7
+   GROUP BY subject_uri
+   HAVING c >= :min_threshold
+   ```
+
+   `:min_threshold` is the smallest configured threshold (1,000), so we
+   never need to consider posts below that.
+
+2. For each candidate, look up SQLite `notified_thresholds` by
+   `(subject_uri, threshold)` to find the largest threshold already fired.
+   Determine which thresholds the post has newly crossed (e.g. `c=10_400`,
+   last fired `1000` → fire `10000`).
+
+3. For the set of (post, threshold) pairs to fire, batch an AppView
+   `app.bsky.feed.getPosts` call (up to 25 URIs per call) to fetch
+   author handle and post text.
+
+   Outcomes per post:
+
+   - **Post/author resolved cleanly:** fire the webhook for the
+     relevant threshold(s).
+   - **Post or author deleted** (AppView returns not-found/tombstone
+     for the post, or the post exists but the author handle can't be
+     resolved because the account was deleted): skip firing, insert
+     dedup row(s) so we don't re-check on the next poll. Do **not**
+     log an error — this is expected.
+   - **Any other enrichment failure** (network error, AppView 5xx,
+     unexpected shape): skip firing, do **not** insert a dedup row
+     (so the next poll retries), and log an error to PostHog with
+     the subject URI and failure reason.
+
+4. For each post selected to fire: POST to the Discord webhook URL for
+   the corresponding threshold, then
+   `INSERT INTO notified_thresholds VALUES (subject_uri, threshold, now)`.
+
+   Webhook HTTP failures retry up to 3 times with exponential backoff.
+   On final failure, log to PostHog and still insert the dedup row —
+   we don't want a broken Discord URL to cause the system to re-try
+   forever.
+
+### Discord webhook payload
+
+Two env vars, one per threshold:
+
+- `FIREHOSE_WEBHOOK_URL_1K` — Discord webhook for posts crossing 1,000 likes.
+- `FIREHOSE_WEBHOOK_URL_10K` — Discord webhook for posts crossing 10,000 likes.
+
+This keeps the two signal levels routable to different Discord channels.
+Both are optional — if a URL is absent, posts crossing that threshold
+still get a dedup row written (so we don't re-process every poll) but
+no webhook is sent.
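
Editor's sketch of step 2 of the threshold poll: given a post's 7-day count and the largest threshold already fired, work out which thresholds are newly crossed. The function name is hypothetical, and firing both thresholds when a post jumps past 10,000 with nothing fired yet is an assumption the spec does not state outright.

```typescript
// The two thresholds named in the spec.
const THRESHOLDS: readonly number[] = [1000, 10000]

// largestFired is null when notified_thresholds has no row for the post yet.
function newlyCrossed(
  count: number,
  largestFired: number | null,
  thresholds: readonly number[] = THRESHOLDS
): number[] {
  return thresholds.filter(
    (threshold) => count >= threshold && (largestFired === null || threshold > largestFired)
  )
}

const specExample = newlyCrossed(10_400, 1000) // the spec's example: only 10000 fires
const firstCrossing = newlyCrossed(1_037, null) // first time over 1,000
const alreadyDone = newlyCrossed(10_400, 10000) // nothing left to fire
```

Because the dedup table is keyed on `(subject_uri, threshold)`, a count that later dips below a threshold and rises again would not re-fire under this logic.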
+
+Payload shape:
+
+```json
+{
+  "username": "favs.blue firehose",
+  "embeds": [{
+    "author": { "name": "@joe.bsky.social" },
+    "title": "Post crossed 1,000 likes",
+    "url": "https://bsky.app/profile/did:plc:.../post/3k...",
+    "description": "the post text, truncated to ~500 chars if needed",
+    "color": 3447003,
+    "timestamp": "2026-04-14T12:34:56Z",
+    "fields": [
+      { "name": "Estimated likes", "value": "1,037", "inline": true },
+      { "name": "Threshold", "value": "1,000", "inline": true }
+    ]
+  }]
+}
+```
+
+AT-URI to bsky.app web URL conversion:
+`at://{did}/app.bsky.feed.post/{rkey}` → `https://bsky.app/profile/{did}/post/{rkey}`.
+
+Enrichment failure handling is described in the threshold poll section:
+if the post or author was deleted, skip silently; for any other
+enrichment failure, skip, log to PostHog, and do not write a dedup row
+(retry on next poll).
+
+### SQLite schema
+
+```
+notified_thresholds
+  subject_uri TEXT NOT NULL
+  threshold   INTEGER NOT NULL
+  fired_at    INTEGER NOT NULL -- ms since epoch
+  PRIMARY KEY (subject_uri, threshold)
+```
+
+No TTL needed. Bounded by (unique viral posts) × (number of thresholds).
+Even at Bluesky scale over a year, a few tens of thousands of rows max.
+
+### Process and container changes
+
+New `firehose-worker` entry in docker-compose:
+
+- Command: `node ace.js firehose:watch`
+- Memory limit: 384M
+- CPU limit: 0.5
+- Same volume mounts as jetstream-worker
+- Same restart/healthcheck policies as jetstream-worker
+- Env: `JETSTREAM_URL` (same as the existing consumer, or its own unfiltered
+  endpoint if different)
+
+Existing `queue-worker` memory trimmed from 1G → 512M. The queue worker
+runs backfill jobs which are I/O bound; 1 G is generous and the threshold
+poll work is cheap.
+
+ClickHouse internal memory caps added to a new
+`clickhouse/config.d/memory.xml` file mounted into the ClickHouse
+container:
+
+```xml
+<clickhouse>
+  <max_server_memory_usage>3221225472</max_server_memory_usage>
+  <profiles>
+    <default>
+      <max_memory_usage>1073741824</max_memory_usage>
+    </default>
+  </profiles>
+</clickhouse>
+```
+
+(3 GiB server cap, 1 GiB per-query cap.) This ensures ClickHouse fails
+queries gracefully rather than getting OOM-killed by Docker.
+
+Resulting memory budget:
+
+| Service          | Limit  | Notes                          |
+|------------------|--------|--------------------------------|
+| clickhouse       | 4 G    | Internal cap set to 3 G        |
+| web              | 1.5 G  |                                |
+| jetstream-worker | 512 M  |                                |
+| queue-worker     | 512 M  | Trimmed from 1 G               |
+| firehose-worker  | 384 M  | New                            |
+| **Total**        | ~6.9 G | Leaves ~1.1 G for OS/overhead  |
+
+### Ace commands and code layout
+
+- `commands/firehose_watch.ts` — the `node ace firehose:watch` entrypoint.
+- `app/services/firehose_consumer.ts` — modeled on
+  `app/services/jetstream_consumer.ts`. Takes an injected `WebSocketLike`
+  factory; subscribes without DID filter; filters for the `.like`
+  collection only.
+- `app/services/firehose_cursor_io.ts` — cursor persistence, mirrors
+  `jetstream_cursor_io.ts`.
+- `app/lib/clickhouse/firehose_writes.ts` — batched insert helpers for
+  `like_events_lookup` and `like_counts_daily`.
+- `app/jobs/threshold_scan_job.ts` — the queue job dispatched by the
+  scheduler that runs the threshold poll, does AppView enrichment,
+  fires webhooks.
+- Scheduler configuration — cron entry (via `adonisjs-scheduler`) that
+  dispatches `ThresholdScanJob` every 60 seconds.
+- `app/services/discord_webhook.ts` — builds the payload and POSTs with
+  retry/backoff.
+- `database/migrations/<timestamp>_create_notified_thresholds.ts` —
+  SQLite table.
+- `database/clickhouse/<N>_firehose_tables.sql` — ClickHouse DDL.
+- `config/clickhouse.xml.d/memory.xml` — ClickHouse memory caps (actual
+  path depends on how we mount config overrides today).
+
+### Configuration
+
+New env vars in `start/env.ts` (all via Vine):
+
+- `FIREHOSE_WEBHOOK_URL_1K` — Discord webhook URL for 1,000-like
+  crossings. Optional.
+- `FIREHOSE_WEBHOOK_URL_10K` — Discord webhook URL for 10,000-like
+  crossings. Optional.
+- `FIREHOSE_JETSTREAM_URL` — optional override; defaults to the same
+  URL as the existing consumer.
+
+Thresholds themselves (`[1000, 10000]`) are hardcoded in the job — they
+are tied 1:1 to webhook URLs, so there's no sensible way to configure
+them independently.
+
+### Testing
+
+Unit tests (`tests/unit/`):
+
+- `firehose_consumer.spec.ts` — with injected fake WebSocket, verify
+  like/unlike events produce the right ClickHouse inserts; verify
+  unlike with no matching lookup drops silently; verify cursor is
+  persisted.
+- `discord_webhook.spec.ts` — payload shape, truncation, URL conversion,
+  retry/backoff behavior, graceful degradation when enrichment fails.
+- `threshold_scan_job.spec.ts` — given a fake ClickHouse result set and
+  fake `notified_thresholds`, verify the correct (post, threshold) pairs
+  fire and dedup rows are written.
+
+Functional tests (`tests/functional/`, real ClickHouse):
+
+- Insert synthetic like/unlike events, run the threshold query, verify
+  only posts above threshold come back and counts match expected net.
+- Verify unlikes correctly decrement counts when the like is still in
+  the lookup window, and drop silently when it isn't.
+- Verify partition-level TTL: insert events dated 9 days ago, run the
+  `OPTIMIZE TABLE` flow, check old partitions are dropped.
+- Remember to drain query streams (per CLAUDE.md).
+
+### Rollout
+
+1. Land schema migrations first. The `firehose:watch` process can be
+   deployed but kept scaled to zero until we're confident.
+2. Start `firehose-worker` at 1 replica, observe jetstream throughput
+   and ClickHouse ingest rate for a few hours before enabling the
+   threshold job.
+3. Enable the threshold job with the two `FIREHOSE_WEBHOOK_URL_*`
+   env vars initially pointing to test Discord channels; confirm
+   realistic firings.
+4. Swap to the production channels.
+
+### Deferred work
+
+Things explicitly not built now, with escape hatches:
+
+- **Exact idempotency under replay.** If jetstream reconnect replays
+  cause user-visible double-firing, switch `like_events_lookup` to
+  `ReplacingMergeTree(created_at)` and gate the counts `+1` insert on
+  "was this like_uri already in the lookup table?". Adds ~1 PK lookup
+  per like event.
+- **Unique-liker filtering (bot resistance).** Switch
+  `like_counts_daily` to `AggregatingMergeTree` storing
+  `uniqState(liker_did)`, query with `uniqMerge`. Catches situations
+  where a single bot account mass-likes to fake virality. Costs more
+  storage and query memory.
+- **RocksDB-backed lookup.** If point-lookup latency on
+  `like_events_lookup` becomes a bottleneck in production, swap its
+  engine to `EmbeddedRocksDB`. Requires a manual cleanup job to
+  replace the TTL.
+- **Auto-reposting from a Bluesky account.** The current design just
+  fires a Discord webhook. Layering an auto-repost action on top is
+  straightforward once this proves out.
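
Editor's sketch of the payload construction the spec describes: the AT-URI to bsky.app conversion, truncation to about 500 characters, and the embed shape. All function and parameter names are hypothetical; the field values follow the example payload in the spec, and no HTTP request is made here.

```typescript
// at://{did}/app.bsky.feed.post/{rkey} -> https://bsky.app/profile/{did}/post/{rkey}
function atUriToWebUrl(atUri: string): string | null {
  const match = /^at:\/\/([^/]+)\/app\.bsky\.feed\.post\/([^/]+)$/.exec(atUri)
  if (!match) return null
  return `https://bsky.app/profile/${match[1]}/post/${match[2]}`
}

// Keep at most `max` characters, marking the cut with an ellipsis.
function truncate(text: string, max = 500): string {
  return text.length <= max ? text : `${text.slice(0, max - 1)}…`
}

interface CrossingInput {
  subjectUri: string
  handle: string
  text: string
  estimatedLikes: number
  threshold: number
  firedAt: Date
}

function buildDiscordPayload(input: CrossingInput) {
  const thresholdLabel = input.threshold.toLocaleString('en-US')
  return {
    username: 'favs.blue firehose',
    embeds: [
      {
        author: { name: `@${input.handle}` },
        title: `Post crossed ${thresholdLabel} likes`,
        url: atUriToWebUrl(input.subjectUri) ?? undefined,
        description: truncate(input.text),
        color: 3447003,
        // Drop milliseconds to match the timestamp format in the spec's example.
        timestamp: input.firedAt.toISOString().replace(/\.\d{3}Z$/, 'Z'),
        fields: [
          { name: 'Estimated likes', value: input.estimatedLikes.toLocaleString('en-US'), inline: true },
          { name: 'Threshold', value: thresholdLabel, inline: true },
        ],
      },
    ],
  }
}

const payload = buildDiscordPayload({
  subjectUri: 'at://did:plc:abc/app.bsky.feed.post/3kxyz',
  handle: 'joe.bsky.social',
  text: 'x'.repeat(600),
  estimatedLikes: 1037,
  threshold: 1000,
  firedAt: new Date('2026-04-14T12:34:56Z'),
})
```

Keeping the builder pure (no network access) would let `discord_webhook.spec.ts` check payload shape, truncation, and URL conversion without a fake HTTP layer.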
+10 -2
tests/unit/atproto/jetstream.spec.ts
@@ -365,9 +365,17 @@
 })

 test.group('parseJetstreamEvent — ignored events', () => {
-  test('returns null for like delete (spec §10: unlikes not tracked)', ({ assert }) => {
+  test('parses a like delete into a LikeDeleteEvent', ({ assert }) => {
     const result = parseJetstreamEvent(LIKE_DELETE_EVENT)
-    assert.isNull(result)
+    if (result === null) return assert.fail('expected LikeDeleteEvent, got null')
+    if (result.kind !== 'like-delete') return assert.fail(`expected kind=like-delete, got ${result.kind}`)
+    assert.equal(result.actorDid, 'did:plc:unlikingactor')
+    assert.equal(result.rkey, '3l3likedeletedkey')
+    assert.equal(
+      result.likeUri,
+      'at://did:plc:unlikingactor/app.bsky.feed.like/3l3likedeletedkey'
+    )
+    assert.instanceOf(result.ingestedAt, Date)
   })

   test('returns null for repost delete (spec §10: unreposts not tracked)', ({ assert }) => {