See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add ThresholdScanJob for firehose virality webhook firing

Scheduled queue job that scans like_counts_daily for posts at or above
the 1k / 10k like thresholds, enriches them via the AppView, and fires
a Discord webhook the first time each post crosses each threshold.

Dedup is stored in SQLite notified_thresholds so a given (post, threshold)
only fires once. Deleted posts / accounts silently insert a dedup row so
they're not re-enriched on every poll. AppView failures (network / 5xx)
capture to PostHog and retry on the next tick. Webhook HTTP failures
after the retry budget also capture to PostHog but still insert the
dedup row to prevent an unbounded retry loop against a broken URL.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+607
+230
app/jobs/threshold_scan_job.ts
··· 1 + import { inject } from '@adonisjs/core' 2 + import { Job } from '@adonisjs/queue' 3 + import { AtprotoClient, type PostView } from '#lib/atproto/index' 4 + import { ClickHouseStore } from '#lib/clickhouse/index' 5 + import NotifiedThreshold from '#models/notified_threshold' 6 + import { DiscordWebhookService, DiscordWebhookError } from '#services/discord_webhook' 7 + import { getPostHogClient } from '#services/posthog' 8 + 9 + // --------------------------------------------------------------------------- 10 + // Types 11 + // --------------------------------------------------------------------------- 12 + 13 + /** 14 + * Thresholds scanned by this job. Ordered ascending so that 15 + * findPostsAtOrAboveThreshold can be called with the smallest value. 16 + */ 17 + const THRESHOLDS = [1000, 10_000] as const 18 + 19 + const WINDOW_DAYS = 7 20 + 21 + /** 22 + * Shape a PostHog-compatible capture shim has to match. The real PostHog 23 + * client has other methods too, but this is all the job uses. 24 + */ 25 + interface PostHogCapturer { 26 + capture: (event: { 27 + distinctId: string 28 + event: string 29 + properties?: Record<string, unknown> 30 + }) => void 31 + } 32 + 33 + /** 34 + * Hooks for tests to override env-var reads and the PostHog client without 35 + * mutating globals. 36 + */ 37 + export interface ThresholdScanJobTestOverrides { 38 + /** 39 + * Per-threshold webhook URL. `null` means "env var not configured — skip 40 + * POSTing but still insert the dedup row". 41 + */ 42 + webhookUrls?: Record<number, string | null> 43 + posthogCapture?: PostHogCapturer['capture'] 44 + } 45 + 46 + // --------------------------------------------------------------------------- 47 + // Job 48 + // --------------------------------------------------------------------------- 49 + 50 + /** 51 + * Scheduled queue job that scans `like_counts_daily` for posts crossing 52 + * configured virality thresholds (1k, 10k) and fires a Discord webhook the 53 + * first time each post crosses each threshold. 54 + * 55 + * Dispatched every 60 seconds by `start/scheduler.ts`. 56 + */ 57 + @inject() 58 + export default class ThresholdScanJob extends Job<Record<string, never>> { 59 + static options = { 60 + queue: 'default', 61 + retry: { 62 + maxRetries: 2, 63 + }, 64 + } 65 + 66 + /** 67 + * Test-only override hook. Production code leaves this undefined; the job 68 + * reads env vars and the PostHog client directly. Tests set this to inject 69 + * fake webhook URLs and a capture collector. 70 + */ 71 + __testOverrides?: ThresholdScanJobTestOverrides 72 + 73 + constructor( 74 + private readonly atprotoClient: AtprotoClient, 75 + private readonly clickHouseStore: ClickHouseStore, 76 + private readonly webhookService: DiscordWebhookService 77 + ) { 78 + super() 79 + } 80 + 81 + private getWebhookUrl(threshold: number): string | null { 82 + if (this.__testOverrides?.webhookUrls) { 83 + const raw = this.__testOverrides.webhookUrls[threshold] 84 + return raw ?? null 85 + } 86 + const envVar = threshold === 1000 ? 'FIREHOSE_WEBHOOK_URL_1K' : 'FIREHOSE_WEBHOOK_URL_10K' 87 + const value = process.env[envVar] 88 + return value && value.length > 0 ? value : null 89 + } 90 + 91 + private capturePostHog(event: string, properties: Record<string, unknown>): void { 92 + const capture = this.__testOverrides?.posthogCapture 93 + if (capture) { 94 + capture({ distinctId: 'firehose-worker', event, properties }) 95 + return 96 + } 97 + getPostHogClient()?.capture({ 98 + distinctId: 'firehose-worker', 99 + event, 100 + properties, 101 + }) 102 + } 103 + 104 + async execute(): Promise<void> { 105 + const minThreshold = THRESHOLDS[0] 106 + const candidates = await this.clickHouseStore.findPostsAtOrAboveThreshold( 107 + minThreshold, 108 + WINDOW_DAYS 109 + ) 110 + if (candidates.length === 0) return 111 + 112 + // ------------------------------------------------------------------------- 113 + // Step 1: load existing dedup rows so we only fire newly crossed thresholds 114 + // ------------------------------------------------------------------------- 115 + const uris = candidates.map((c) => c.subjectUri) 116 + const existing = await NotifiedThreshold.query().whereIn('subject_uri', uris) 117 + const alreadyFired = new Map<string, Set<number>>() 118 + for (const row of existing) { 119 + let set = alreadyFired.get(row.subjectUri) 120 + if (!set) { 121 + set = new Set<number>() 122 + alreadyFired.set(row.subjectUri, set) 123 + } 124 + set.add(row.threshold) 125 + } 126 + 127 + // ------------------------------------------------------------------------- 128 + // Step 2: compute pending (candidate -> thresholds to fire). Drop anything 129 + // that has nothing to fire so we don't pay the AppView roundtrip for it. 130 + // ------------------------------------------------------------------------- 131 + type Pending = { subjectUri: string; count: number; thresholds: number[] } 132 + const pending: Pending[] = [] 133 + for (const cand of candidates) { 134 + const fired = alreadyFired.get(cand.subjectUri) ?? new Set<number>() 135 + const toFire = THRESHOLDS.filter((t) => cand.count >= t && !fired.has(t)) 136 + if (toFire.length > 0) { 137 + pending.push({ subjectUri: cand.subjectUri, count: cand.count, thresholds: [...toFire] }) 138 + } 139 + } 140 + if (pending.length === 0) return 141 + 142 + // ------------------------------------------------------------------------- 143 + // Step 3: enrichment via AppView. Network / 5xx failures capture to 144 + // PostHog and drop without writing dedup rows (retry next tick). 145 + // ------------------------------------------------------------------------- 146 + const pendingUris = pending.map((p) => p.subjectUri) 147 + let posts: PostView[] 148 + try { 149 + posts = await this.atprotoClient.getPosts(pendingUris) 150 + } catch (err) { 151 + for (const p of pending) { 152 + this.capturePostHog('firehose_enrichment_failed', { 153 + subject_uri: p.subjectUri, 154 + reason: err instanceof Error ? err.message : String(err), 155 + }) 156 + } 157 + return 158 + } 159 + 160 + const postByUri = new Map<string, PostView>() 161 + for (const post of posts) postByUri.set(post.uri, post) 162 + 163 + // ------------------------------------------------------------------------- 164 + // Step 4: fire webhooks and persist dedup rows 165 + // ------------------------------------------------------------------------- 166 + for (const p of pending) { 167 + const post = postByUri.get(p.subjectUri) 168 + const isDeletedOrMissing = 169 + !post || !post.author || !post.author.handle || !isValidHandle(post.author.handle) 170 + 171 + for (const threshold of p.thresholds) { 172 + if (isDeletedOrMissing) { 173 + await this.insertDedupRow(p.subjectUri, threshold) 174 + continue 175 + } 176 + 177 + const webhookUrl = this.getWebhookUrl(threshold) 178 + if (webhookUrl === null) { 179 + await this.insertDedupRow(p.subjectUri, threshold) 180 + continue 181 + } 182 + 183 + try { 184 + await this.webhookService.send({ 185 + webhookUrl, 186 + subjectUri: p.subjectUri, 187 + threshold, 188 + estimatedCount: p.count, 189 + authorHandle: post!.author.handle, 190 + postText: typeof post!.record?.text === 'string' ? post!.record.text : '', 191 + }) 192 + } catch (err) { 193 + this.capturePostHog('firehose_webhook_failed', { 194 + subject_uri: p.subjectUri, 195 + threshold, 196 + reason: err instanceof Error ? err.message : String(err), 197 + final: err instanceof DiscordWebhookError, 198 + }) 199 + } 200 + 201 + // Always insert dedup row — whether the webhook succeeded or the 202 + // retry budget was exhausted. A broken Discord URL must not cause 203 + // unbounded retries on every poll. 204 + await this.insertDedupRow(p.subjectUri, threshold) 205 + } 206 + } 207 + } 208 + 209 + private async insertDedupRow(subjectUri: string, threshold: number): Promise<void> { 210 + // Use create; the caller is expected to have already filtered out existing 211 + // (subject, threshold) pairs via the alreadyFired map. 212 + await NotifiedThreshold.create({ 213 + subjectUri, 214 + threshold, 215 + firedAt: Date.now(), 216 + }) 217 + } 218 + } 219 + 220 + /** 221 + * The AppView sometimes returns the sentinel string "handle.invalid" when the 222 + * account has been deleted / taken down but the post record still exists. Treat 223 + * that as the "author deleted" case so we silently dedup instead of sending 224 + * a webhook embed with a broken handle. 225 + */ 226 + function isValidHandle(handle: string): boolean { 227 + if (handle.length === 0) return false 228 + if (handle === 'handle.invalid') return false 229 + return true 230 + }
+377
tests/unit/jobs/threshold_scan_job.spec.ts
··· 1 + /** 2 + * Unit tests for ThresholdScanJob. 3 + * 4 + * Uses in-memory fakes for the AtprotoClient, ClickHouseStore and 5 + * DiscordWebhookService. SQLite is real — NotifiedThreshold rows are asserted 6 + * directly against the test DB (wrapped in a global transaction). 7 + */ 8 + import { test } from '@japa/runner' 9 + import testUtils from '@adonisjs/core/services/test_utils' 10 + import type { AtprotoClient, PostView } from '#lib/atproto/index' 11 + import type { ClickHouseStore } from '#lib/clickhouse/index' 12 + import NotifiedThreshold from '#models/notified_threshold' 13 + import ThresholdScanJob from '#jobs/threshold_scan_job' 14 + import { type DiscordWebhookService, DiscordWebhookError } from '#services/discord_webhook' 15 + 16 + // --------------------------------------------------------------------------- 17 + // Fakes 18 + // --------------------------------------------------------------------------- 19 + 20 + interface FakeCandidates { 21 + candidates: Array<{ subjectUri: string; count: number }> 22 + } 23 + 24 + function makeClickHouse(config: FakeCandidates): ClickHouseStore { 25 + return { 26 + async findPostsAtOrAboveThreshold() { 27 + return config.candidates 28 + }, 29 + } as unknown as ClickHouseStore 30 + } 31 + 32 + interface FakeAtprotoConfig { 33 + posts?: Record<string, PostView> 34 + throwError?: Error 35 + } 36 + 37 + function makeAtproto(config: FakeAtprotoConfig): AtprotoClient & { _calls: string[][] } { 38 + const calls: string[][] = [] 39 + return { 40 + _calls: calls, 41 + async getPosts(uris: string[]): Promise<PostView[]> { 42 + calls.push(uris) 43 + if (config.throwError) throw config.throwError 44 + return uris.map((u) => config.posts?.[u]).filter((p): p is PostView => p !== undefined) 45 + }, 46 + } as unknown as AtprotoClient & { _calls: string[][] } 47 + } 48 + 49 + function makePost(opts: { uri: string; did: string; handle: string; text: string }): PostView { 50 + return { 51 + uri: opts.uri, 52 + cid: 'bafycid', 53 + author: { did: opts.did, handle: opts.handle }, 54 + record: { $type: 'app.bsky.feed.post', text: opts.text, createdAt: '2026-04-14T00:00:00.000Z' }, 55 + indexedAt: '2026-04-14T00:00:00.000Z', 56 + } 57 + } 58 + 59 + interface FakeWebhookConfig { 60 + failAll?: boolean 61 + } 62 + 63 + function makeWebhook(config: FakeWebhookConfig = {}): DiscordWebhookService & { 64 + _calls: Array<{ webhookUrl: string; subjectUri: string; threshold: number }> 65 + } { 66 + const calls: Array<{ webhookUrl: string; subjectUri: string; threshold: number }> = [] 67 + const svc = { 68 + _calls: calls, 69 + async send(params: { 70 + webhookUrl: string 71 + subjectUri: string 72 + threshold: number 73 + [key: string]: unknown 74 + }) { 75 + calls.push({ 76 + webhookUrl: params.webhookUrl, 77 + subjectUri: params.subjectUri, 78 + threshold: params.threshold, 79 + }) 80 + if (config.failAll) { 81 + throw new DiscordWebhookError('failed for test', 3) 82 + } 83 + }, 84 + } as unknown as DiscordWebhookService & { 85 + _calls: typeof calls 86 + } 87 + return svc 88 + } 89 + 90 + // PostHog capture collector 91 + interface PostHogCapture { 92 + distinctId: string 93 + event: string 94 + properties?: Record<string, unknown> 95 + } 96 + function makePostHogCollector(): { 97 + captures: PostHogCapture[] 98 + capture: (c: PostHogCapture) => void 99 + } { 100 + const captures: PostHogCapture[] = [] 101 + return { 102 + captures, 103 + capture: (c) => { 104 + captures.push(c) 105 + }, 106 + } 107 + } 108 + 109 + // Build a job with injected fakes. 110 + function makeJob( 111 + atproto: AtprotoClient, 112 + clickhouse: ClickHouseStore, 113 + webhook: DiscordWebhookService, 114 + options: { 115 + webhookUrls?: Partial<Record<number, string | null>> 116 + posthogCapture?: (c: PostHogCapture) => void 117 + } = {} 118 + ) { 119 + const job = new ThresholdScanJob(atproto, clickhouse, webhook) 120 + const urls: Record<number, string | null> = { 121 + 1000: 'https://discord.example/hook-1k', 122 + 10000: 'https://discord.example/hook-10k', 123 + } 124 + if (options.webhookUrls) { 125 + for (const k of Object.keys(options.webhookUrls)) { 126 + urls[Number(k)] = options.webhookUrls[Number(k)]! 127 + } 128 + } 129 + job.__testOverrides = { 130 + webhookUrls: urls, 131 + posthogCapture: options.posthogCapture, 132 + } 133 + job.$hydrate({}, { jobId: 'test-job', attempt: 1, queue: 'default' } as Parameters< 134 + typeof job.$hydrate 135 + >[1]) 136 + return job 137 + } 138 + 139 + // --------------------------------------------------------------------------- 140 + // Tests 141 + // --------------------------------------------------------------------------- 142 + 143 + test.group('ThresholdScanJob — no candidates', (group) => { 144 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 145 + 146 + test('does nothing when ClickHouse returns no candidates', async ({ assert }) => { 147 + const atproto = makeAtproto({}) 148 + const clickhouse = makeClickHouse({ candidates: [] }) 149 + const webhook = makeWebhook() 150 + const job = makeJob(atproto, clickhouse, webhook) 151 + 152 + await job.execute() 153 + 154 + assert.equal(atproto._calls.length, 0) 155 + assert.equal(webhook._calls.length, 0) 156 + const rows = await NotifiedThreshold.all() 157 + assert.equal(rows.length, 0) 158 + }) 159 + }) 160 + 161 + test.group('ThresholdScanJob — single new 1k crossing', (group) => { 162 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 163 + 164 + test('fires 1k webhook and writes one dedup row', async ({ assert }) => { 165 + const uri = 'at://did:plc:alice/app.bsky.feed.post/r1' 166 + const post = makePost({ 167 + uri, 168 + did: 'did:plc:alice', 169 + handle: 'alice.bsky.social', 170 + text: 'viral!', 171 + }) 172 + const atproto = makeAtproto({ posts: { [uri]: post } }) 173 + const clickhouse = makeClickHouse({ candidates: [{ subjectUri: uri, count: 1500 }] }) 174 + const webhook = makeWebhook() 175 + const job = makeJob(atproto, clickhouse, webhook) 176 + 177 + await job.execute() 178 + 179 + assert.equal(webhook._calls.length, 1) 180 + assert.equal(webhook._calls[0].threshold, 1000) 181 + assert.equal(webhook._calls[0].webhookUrl, 'https://discord.example/hook-1k') 182 + 183 + const rows = await NotifiedThreshold.query().where('subject_uri', uri) 184 + assert.equal(rows.length, 1) 185 + assert.equal(rows[0].threshold, 1000) 186 + }) 187 + }) 188 + 189 + test.group('ThresholdScanJob — first-time dual crossing', (group) => { 190 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 191 + 192 + test('fires both 1k and 10k webhooks and writes two dedup rows', async ({ assert }) => { 193 + const uri = 'at://did:plc:alice/app.bsky.feed.post/r2' 194 + const post = makePost({ 195 + uri, 196 + did: 'did:plc:alice', 197 + handle: 'alice.bsky.social', 198 + text: 'extreme virality', 199 + }) 200 + const atproto = makeAtproto({ posts: { [uri]: post } }) 201 + const clickhouse = makeClickHouse({ candidates: [{ subjectUri: uri, count: 10500 }] }) 202 + const webhook = makeWebhook() 203 + const job = makeJob(atproto, clickhouse, webhook) 204 + 205 + await job.execute() 206 + 207 + assert.equal(webhook._calls.length, 2) 208 + const thresholds = webhook._calls.map((c) => c.threshold).sort((a, b) => a - b) 209 + assert.deepEqual(thresholds, [1000, 10000]) 210 + 211 + const rows = await NotifiedThreshold.query().where('subject_uri', uri) 212 + assert.equal(rows.length, 2) 213 + }) 214 + }) 215 + 216 + test.group('ThresholdScanJob — dedup prevents re-fire', (group) => { 217 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 218 + 219 + test('does not fire for thresholds already recorded', async ({ assert }) => { 220 + const uri = 'at://did:plc:alice/app.bsky.feed.post/r3' 221 + await NotifiedThreshold.create({ 222 + subjectUri: uri, 223 + threshold: 1000, 224 + firedAt: Date.now(), 225 + }) 226 + 227 + const post = makePost({ 228 + uri, 229 + did: 'did:plc:alice', 230 + handle: 'alice.bsky.social', 231 + text: 'still viral', 232 + }) 233 + const atproto = makeAtproto({ posts: { [uri]: post } }) 234 + const clickhouse = makeClickHouse({ candidates: [{ subjectUri: uri, count: 2000 }] }) 235 + const webhook = makeWebhook() 236 + const job = makeJob(atproto, clickhouse, webhook) 237 + 238 + await job.execute() 239 + 240 + assert.equal(webhook._calls.length, 0) 241 + // getPosts should not even be called — we skip candidates with no new thresholds 242 + assert.equal(atproto._calls.length, 0) 243 + }) 244 + 245 + test('fires only newly crossed thresholds when some already fired', async ({ assert }) => { 246 + const uri = 'at://did:plc:alice/app.bsky.feed.post/r4' 247 + await NotifiedThreshold.create({ 248 + subjectUri: uri, 249 + threshold: 1000, 250 + firedAt: Date.now(), 251 + }) 252 + 253 + const post = makePost({ 254 + uri, 255 + did: 'did:plc:alice', 256 + handle: 'alice.bsky.social', 257 + text: 'now very viral', 258 + }) 259 + const atproto = makeAtproto({ posts: { [uri]: post } }) 260 + const clickhouse = makeClickHouse({ candidates: [{ subjectUri: uri, count: 12000 }] }) 261 + const webhook = makeWebhook() 262 + const job = makeJob(atproto, clickhouse, webhook) 263 + 264 + await job.execute() 265 + 266 + assert.equal(webhook._calls.length, 1) 267 + assert.equal(webhook._calls[0].threshold, 10000) 268 + 269 + const rows = await NotifiedThreshold.query().where('subject_uri', uri).orderBy('threshold') 270 + assert.equal(rows.length, 2) 271 + assert.equal(rows[1].threshold, 10000) 272 + }) 273 + }) 274 + 275 + test.group('ThresholdScanJob — deleted post', (group) => { 276 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 277 + 278 + test('getPosts omission inserts dedup rows and skips webhook, no PostHog error', async ({ 279 + assert, 280 + }) => { 281 + const uri = 'at://did:plc:gone/app.bsky.feed.post/r5' 282 + const atproto = makeAtproto({ posts: {} }) 283 + const clickhouse = makeClickHouse({ candidates: [{ subjectUri: uri, count: 1500 }] }) 284 + const webhook = makeWebhook() 285 + const posthog = makePostHogCollector() 286 + const job = makeJob(atproto, clickhouse, webhook, { posthogCapture: posthog.capture }) 287 + 288 + await job.execute() 289 + 290 + assert.equal(webhook._calls.length, 0) 291 + assert.equal(posthog.captures.length, 0) 292 + 293 + const rows = await NotifiedThreshold.query().where('subject_uri', uri) 294 + assert.equal(rows.length, 1) 295 + assert.equal(rows[0].threshold, 1000) 296 + }) 297 + }) 298 + 299 + test.group('ThresholdScanJob — AppView errors', (group) => { 300 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 301 + 302 + test('enrichment failure logs to PostHog and does NOT write dedup rows', async ({ assert }) => { 303 + const uri = 'at://did:plc:alice/app.bsky.feed.post/r6' 304 + const atproto = makeAtproto({ throwError: new Error('AppView 503') }) 305 + const clickhouse = makeClickHouse({ candidates: [{ subjectUri: uri, count: 1500 }] }) 306 + const webhook = makeWebhook() 307 + const posthog = makePostHogCollector() 308 + const job = makeJob(atproto, clickhouse, webhook, { posthogCapture: posthog.capture }) 309 + 310 + await job.execute() 311 + 312 + assert.equal(webhook._calls.length, 0) 313 + const rows = await NotifiedThreshold.all() 314 + assert.equal(rows.length, 0) 315 + 316 + assert.equal(posthog.captures.length, 1) 317 + assert.equal(posthog.captures[0].event, 'firehose_enrichment_failed') 318 + assert.equal(posthog.captures[0].properties?.subject_uri, uri) 319 + }) 320 + }) 321 + 322 + test.group('ThresholdScanJob — webhook failure', (group) => { 323 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 324 + 325 + test('captures PostHog error and still writes dedup row', async ({ assert }) => { 326 + const uri = 'at://did:plc:alice/app.bsky.feed.post/r7' 327 + const post = makePost({ 328 + uri, 329 + did: 'did:plc:alice', 330 + handle: 'alice.bsky.social', 331 + text: 'test', 332 + }) 333 + const atproto = makeAtproto({ posts: { [uri]: post } }) 334 + const clickhouse = makeClickHouse({ candidates: [{ subjectUri: uri, count: 1500 }] }) 335 + const webhook = makeWebhook({ failAll: true }) 336 + const posthog = makePostHogCollector() 337 + const job = makeJob(atproto, clickhouse, webhook, { posthogCapture: posthog.capture }) 338 + 339 + await job.execute() 340 + 341 + assert.equal(webhook._calls.length, 1) 342 + 343 + const rows = await NotifiedThreshold.query().where('subject_uri', uri) 344 + assert.equal(rows.length, 1) 345 + assert.equal(rows[0].threshold, 1000) 346 + 347 + const failures = posthog.captures.filter((c) => c.event === 'firehose_webhook_failed') 348 + assert.equal(failures.length, 1) 349 + assert.equal(failures[0].properties?.subject_uri, uri) 350 + }) 351 + }) 352 + 353 + test.group('ThresholdScanJob — missing webhook URL', (group) => { 354 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 355 + 356 + test('when env URL is missing, still inserts dedup row but does not POST', async ({ assert }) => { 357 + const uri = 'at://did:plc:alice/app.bsky.feed.post/r8' 358 + const post = makePost({ 359 + uri, 360 + did: 'did:plc:alice', 361 + handle: 'alice.bsky.social', 362 + text: 't', 363 + }) 364 + const atproto = makeAtproto({ posts: { [uri]: post } }) 365 + const clickhouse = makeClickHouse({ candidates: [{ subjectUri: uri, count: 1500 }] }) 366 + const webhook = makeWebhook() 367 + const job = makeJob(atproto, clickhouse, webhook, { 368 + webhookUrls: { 1000: null, 10000: null }, 369 + }) 370 + 371 + await job.execute() 372 + 373 + assert.equal(webhook._calls.length, 0) 374 + const rows = await NotifiedThreshold.query().where('subject_uri', uri) 375 + assert.equal(rows.length, 1) 376 + }) 377 + })