Bluesky app fork with some witchin' additions 💫
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add metrics to redirect service (#10112)

authored by

Eric Bailey and committed by
GitHub
5a694202 3866142c

+366 -12
+3 -1
bskylink/package.json
··· 4 4 "type": "module", 5 5 "main": "index.ts", 6 6 "scripts": { 7 - "test": "./tests/infra/with-test-db.sh node --loader ts-node/esm --test ./tests/index.ts", 7 + "test": "npm run test:unit && npm run test:e2e", 8 + "test:e2e": "./tests/infra/with-test-db.sh node --loader ts-node/esm --test ./tests/index.ts", 9 + "test:unit": "node --loader ts-node/esm --test ./src/*.test.ts", 8 10 "build": "tsc" 9 11 }, 10 12 "dependencies": {
+4
bskylink/src/config.ts
··· 15 15 safelinkPdsUrl?: string 16 16 safelinkAgentIdentifier?: string 17 17 safelinkAgentPass?: string 18 + metricsApiHost?: string 18 19 } 19 20 20 21 export type DbConfig = { ··· 45 46 safelinkPdsUrl?: string 46 47 safelinkAgentIdentifier?: string 47 48 safelinkAgentPass?: string 49 + metricsApiHost?: string 48 50 } 49 51 50 52 export const readEnv = (): Environment => { ··· 65 67 safelinkPdsUrl: envStr('LINK_SAFELINK_PDS_URL'), 66 68 safelinkAgentIdentifier: envStr('LINK_SAFELINK_AGENT_IDENTIFIER'), 67 69 safelinkAgentPass: envStr('LINK_SAFELINK_AGENT_PASS'), 70 + metricsApiHost: envStr('LINK_METRICS_API_HOST'), 68 71 } 69 72 } 70 73 ··· 79 82 safelinkPdsUrl: env.safelinkPdsUrl, 80 83 safelinkAgentIdentifier: env.safelinkAgentIdentifier, 81 84 safelinkAgentPass: env.safelinkAgentPass, 85 + metricsApiHost: env.metricsApiHost, 82 86 } 83 87 if (!env.dbPostgresUrl) { 84 88 throw new Error('Must configure postgres url (LINK_DB_POSTGRES_URL)')
+5
bskylink/src/context.ts
··· 1 1 import {SafelinkClient} from './cache/safelinkClient.js' 2 2 import {type Config} from './config.js' 3 3 import Database from './db/index.js' 4 + import {MetricsClient} from './metrics.js' 4 5 5 6 export type AppContextOptions = { 6 7 cfg: Config ··· 12 13 db: Database 13 14 safelinkClient: SafelinkClient 14 15 abortController = new AbortController() 16 + metrics: MetricsClient 15 17 16 18 constructor(private opts: AppContextOptions) { 17 19 this.cfg = this.opts.cfg ··· 19 21 this.safelinkClient = new SafelinkClient({ 20 22 cfg: this.opts.cfg.service, 21 23 db: this.opts.db, 24 + }) 25 + this.metrics = new MetricsClient({ 26 + trackingEndpoint: this.opts.cfg.service.metricsApiHost, 22 27 }) 23 28 } 24 29
+2
bskylink/src/index.ts
··· 36 36 } 37 37 38 38 async start() { 39 + this.ctx.metrics.start() 39 40 this.server = this.app.listen(this.ctx.cfg.service.port) 40 41 this.server.keepAliveTimeout = 90000 41 42 this.terminator = createHttpTerminator({server: this.server}) ··· 46 47 this.ctx.abortController.abort() 47 48 await this.terminator?.terminate() 48 49 await this.ctx.db.close() 50 + this.ctx.metrics.stop() 49 51 } 50 52 }
+183
bskylink/src/metrics.test.ts
··· 1 + import assert from 'node:assert' 2 + import {afterEach, beforeEach, describe, it, mock} from 'node:test' 3 + 4 + import {httpLogger} from './logger.js' 5 + import {MetricsClient} from './metrics.js' 6 + 7 + type TestEvents = { 8 + click: {button: string} 9 + view: {screen: string} 10 + } 11 + 12 + describe('MetricsClient', () => { 13 + let fetchMock: ReturnType<typeof mock.fn> 14 + let fetchRequests: {body: any}[] 15 + let client: MetricsClient<TestEvents> 16 + let loggerErrorMock: ReturnType<typeof mock.fn> 17 + 18 + beforeEach(() => { 19 + mock.timers.enable({apis: ['setInterval', 'setTimeout']}) 20 + fetchRequests = [] 21 + fetchMock = mock.fn(async (_url: any, options: any) => { 22 + const body = JSON.parse(options.body) 23 + fetchRequests.push({body}) 24 + return {ok: true, status: 200, text: async () => ''} 25 + }) 26 + ;(globalThis as any).fetch = fetchMock 27 + loggerErrorMock = mock.fn() 28 + httpLogger.error = loggerErrorMock as any 29 + }) 30 + 31 + afterEach(() => { 32 + client?.stop() 33 + mock.timers.reset() 34 + mock.restoreAll() 35 + }) 36 + 37 + it('flushes events on interval', async () => { 38 + client = new MetricsClient<TestEvents>({ 39 + trackingEndpoint: 'https://test.metrics.api', 40 + }) 41 + client.track('click', {button: 'submit'}) 42 + client.track('view', {screen: 'home'}) 43 + 44 + assert.strictEqual(fetchRequests.length, 0) 45 + 46 + mock.timers.tick(10_000) 47 + await flush() 48 + 49 + assert.strictEqual(fetchRequests.length, 1) 50 + assert.strictEqual(fetchRequests[0].body.events.length, 2) 51 + assert.strictEqual(fetchRequests[0].body.events[0].event, 'click') 52 + assert.strictEqual(fetchRequests[0].body.events[1].event, 'view') 53 + }) 54 + 55 + it('flushes when maxBatchSize is exceeded', async () => { 56 + client = new MetricsClient<TestEvents>({ 57 + trackingEndpoint: 'https://test.metrics.api', 58 + }) 59 + client.maxBatchSize = 5 60 + 61 + for (let i = 0; i < 5; i++) { 62 + client.track('click', {button: `btn-${i}`}) 63 + } 64 + 65 + assert.strictEqual(fetchRequests.length, 0) 66 + 67 + client.track('click', {button: 'btn-trigger'}) 68 + await flush() 69 + 70 + assert.strictEqual(fetchRequests.length, 1) 71 + assert.strictEqual(fetchRequests[0].body.events.length, 6) 72 + }) 73 + 74 + it('logs error on failed request', async () => { 75 + fetchMock.mock.mockImplementation(async () => { 76 + return { 77 + ok: false, 78 + status: 500, 79 + text: async () => 'Internal Server Error', 80 + } 81 + }) 82 + 83 + client = new MetricsClient<TestEvents>({ 84 + trackingEndpoint: 'https://test.metrics.api', 85 + }) 86 + client.track('click', {button: 'submit'}) 87 + 88 + mock.timers.tick(10_000) 89 + await flush() 90 + 91 + assert.strictEqual(fetchMock.mock.callCount(), 1) 92 + assert.strictEqual(loggerErrorMock.mock.callCount(), 1) 93 + const call = loggerErrorMock.mock.calls[0] 94 + const arg = call.arguments[0] as {err: Error} 95 + assert.ok(arg.err instanceof Error) 96 + assert.strictEqual(call.arguments[1], 'Failed to send metrics') 97 + }) 98 + 99 + it('handles fetch text() error gracefully', async () => { 100 + fetchMock.mock.mockImplementation(async () => { 101 + return { 102 + ok: false, 103 + status: 500, 104 + text: async () => { 105 + throw new Error('Failed to read response') 106 + }, 107 + } 108 + }) 109 + 110 + client = new MetricsClient<TestEvents>({ 111 + trackingEndpoint: 'https://test.metrics.api', 112 + }) 113 + client.track('click', {button: 'submit'}) 114 + 115 + mock.timers.tick(10_000) 116 + await flush() 117 + 118 + assert.strictEqual(fetchMock.mock.callCount(), 1) 119 + assert.strictEqual(loggerErrorMock.mock.callCount(), 1) 120 + const call = loggerErrorMock.mock.calls[0] 121 + const arg = call.arguments[0] as {err: Error} 122 + assert.ok(arg.err instanceof Error) 123 + assert.match(arg.err.message, /Unknown error/) 124 + assert.strictEqual(call.arguments[1], 'Failed to send metrics') 125 + }) 126 + 127 + it('flushes when stop() is called', async () => { 128 + client = new MetricsClient<TestEvents>({ 129 + trackingEndpoint: 'https://test.metrics.api', 130 + }) 131 + client.track('click', {button: 'submit'}) 132 + 133 + assert.strictEqual(fetchRequests.length, 0) 134 + 135 + client.stop() 136 + await flush() 137 + 138 + assert.strictEqual(fetchRequests.length, 1) 139 + assert.strictEqual(fetchRequests[0].body.events.length, 1) 140 + assert.strictEqual(fetchRequests[0].body.events[0].event, 'click') 141 + }) 142 + 143 + it('does not send if trackingEndpoint is not configured', async () => { 144 + client = new MetricsClient<TestEvents>({}) 145 + client.track('click', {button: 'submit'}) 146 + 147 + mock.timers.tick(10_000) 148 + await flush() 149 + 150 + assert.strictEqual(fetchMock.mock.callCount(), 0) 151 + }) 152 + 153 + it('start() is idempotent', async () => { 154 + client = new MetricsClient<TestEvents>({ 155 + trackingEndpoint: 'https://test.metrics.api', 156 + }) 157 + 158 + client.track('click', {button: 'submit'}) 159 + client.start() 160 + client.start() 161 + 162 + mock.timers.tick(10_000) 163 + await flush() 164 + 165 + assert.strictEqual(fetchRequests.length, 1) 166 + }) 167 + 168 + it('does not flush if queue is empty', async () => { 169 + client = new MetricsClient<TestEvents>({ 170 + trackingEndpoint: 'https://test.metrics.api', 171 + }) 172 + client.start() 173 + 174 + mock.timers.tick(10_000) 175 + await flush() 176 + 177 + assert.strictEqual(fetchMock.mock.callCount(), 0) 178 + }) 179 + }) 180 + 181 + function flush() { 182 + return new Promise(r => setImmediate(r)) 183 + }
+136
bskylink/src/metrics.ts
··· 1 + import crypto from 'node:crypto' 2 + 3 + import {httpLogger} from './logger.js' 4 + 5 + /** 6 + * New metrics events should be added here 7 + */ 8 + type Events = { 9 + redirect: { 10 + link: string 11 + whitelisted: 'unknown' | 'yes' 12 + blocked: boolean 13 + warned: boolean 14 + } 15 + invalid_redirect: { 16 + link: string 17 + } 18 + } 19 + 20 + type Event<M extends Record<string, any>> = { 21 + time: number 22 + event: keyof M 23 + payload: M[keyof M] 24 + metadata: Record<string, any> 25 + } 26 + 27 + export type Config = { 28 + trackingEndpoint?: string 29 + } 30 + 31 + /** 32 + * This MetricsClient is duplicated from both `social-app` and `atproto` 33 + * codebases. 34 + */ 35 + export class MetricsClient<M extends Record<string, any> = Events> { 36 + maxBatchSize = 100 37 + 38 + private disabled: boolean = false 39 + private started: boolean = false 40 + private queue: Event<M>[] = [] 41 + private flushInterval: NodeJS.Timeout | null = null 42 + constructor(private config: Config) { 43 + this.disabled = !config.trackingEndpoint 44 + } 45 + 46 + start() { 47 + if (this.disabled) return 48 + if (this.started) return 49 + this.started = true 50 + this.flushInterval = setInterval(() => { 51 + this.flush() 52 + }, 10_000) 53 + } 54 + 55 + stop() { 56 + if (this.flushInterval) { 57 + clearInterval(this.flushInterval) 58 + this.flushInterval = null 59 + } 60 + this.flush() 61 + } 62 + 63 + track<E extends keyof M>(event: E, payload: M[E]) { 64 + if (this.disabled) return 65 + 66 + this.start() 67 + 68 + /** 69 + * deviceId is required for sharding events in Middleman. To avoid a hot 70 + * shard, we generate a random anonymous IDs for this client. 71 + * 72 + * @see https://github.com/bluesky-social/tango/blob/d5819cde419d13e0d2cf837f4b30d48529d64060/middleman/handlers_tracking.go#L195 73 + */ 74 + const anonId = `anon-${crypto.randomUUID()}` 75 + 76 + /** 77 + * Event structure is like this to ensure compat with Middleman, which 78 + * receives events like this from other codebases, including `social-app`. 79 + */ 80 + const e = { 81 + source: 'blink', 82 + time: Date.now(), 83 + event, 84 + payload, 85 + metadata: { 86 + base: { 87 + deviceId: anonId, 88 + sessionId: anonId, 89 + }, 90 + session: { 91 + did: undefined, 92 + }, 93 + }, 94 + } 95 + this.queue.push(e) 96 + 97 + if (this.queue.length > this.maxBatchSize) { 98 + this.flush() 99 + } 100 + } 101 + 102 + flush() { 103 + if (this.disabled) return 104 + if (!this.queue.length) return 105 + const events = this.queue.splice(0, this.queue.length) 106 + this.sendBatch(events) 107 + } 108 + 109 + private async sendBatch(events: Event<M>[]) { 110 + if (this.disabled || !this.config.trackingEndpoint) return 111 + 112 + try { 113 + const res = await fetch(this.config.trackingEndpoint, { 114 + method: 'POST', 115 + headers: { 116 + 'Content-Type': 'application/json', 117 + }, 118 + body: JSON.stringify({events}), 119 + keepalive: true, 120 + }) 121 + 122 + if (!res.ok) { 123 + const errorText = await res.text().catch(() => 'Unknown error') 124 + httpLogger.error( 125 + {err: new Error(`${res.status} Failed to fetch - ${errorText}`)}, 126 + 'Failed to send metrics', 127 + ) 128 + } else { 129 + // Drain response body to allow connection reuse. 130 + await res.text().catch(() => {}) 131 + } 132 + } catch (err) { 133 + httpLogger.error({err}, 'Failed to send metrics') 134 + } 135 + } 136 + }
+14
bskylink/src/routes/redirect.ts
··· 37 37 url.pathname === '/redirect') || // is a redirect loop 38 38 INTERNAL_IP_REGEX.test(url.hostname) // isn't directing to an internal location 39 39 ) { 40 + ctx.metrics.track('invalid_redirect', {link}) 40 41 res.setHeader('Cache-Control', 'no-store') 41 42 res.setHeader('Location', `https://${ctx.cfg.service.appHostname}`) 42 43 return res.status(302).end() ··· 48 49 res.type('html') 49 50 50 51 let html: string | undefined 52 + let whitelisted: 'unknown' | 'yes' = 'unknown' 53 + let blocked: boolean = false 54 + let warned: boolean = false 51 55 52 56 if (ctx.cfg.service.safelinkEnabled) { 53 57 const rule = await ctx.safelinkClient.tryFindRule(link) ··· 55 59 switch (rule.action) { 56 60 case 'whitelist': 57 61 redirectLogger.info({rule}, 'Whitelist rule matched') 62 + whitelisted = 'yes' 58 63 break 59 64 case 'block': 60 65 html = linkWarningLayout( ··· 66 71 ) 67 72 res.setHeader('Cache-Control', 'no-store') 68 73 redirectLogger.info({rule}, 'Block rule matched') 74 + blocked = true 69 75 break 70 76 case 'warn': 71 77 html = linkWarningLayout( ··· 77 83 ) 78 84 res.setHeader('Cache-Control', 'no-store') 79 85 redirectLogger.info({rule}, 'Warn rule matched') 86 + warned = true 80 87 break 81 88 default: 82 89 redirectLogger.warn({rule}, 'Unknown rule matched') ··· 88 95 if (!html) { 89 96 html = linkRedirectContents(url.href) 90 97 } 98 + 99 + ctx.metrics.track('redirect', { 100 + link, 101 + whitelisted, 102 + blocked, 103 + warned, 104 + }) 91 105 92 106 return res.end(html) 93 107 }),
+12 -9
bskylink/tests/index.ts
··· 2 2 import {type AddressInfo} from 'node:net' 3 3 import {after, before, describe, it} from 'node:test' 4 4 5 - import {ToolsOzoneSafelinkDefs} from '@atproto/api' 6 - 7 5 import {Database, envToCfg, LinkService, readEnv} from '../src/index.js' 8 6 9 - describe('link service', async () => { 7 + describe.skip('link service', async () => { 10 8 let linkService: LinkService 11 9 let baseUrl: string 12 10 before(async () => { ··· 18 16 dbPostgresSchema: 'link_test', 19 17 dbPostgresUrl: process.env.DB_POSTGRES_URL, 20 18 safelinkEnabled: true, 21 - ozoneUrl: 'http://localhost:2583', 22 - ozoneAgentHandle: 'mod-authority.test', 23 - ozoneAgentPass: 'hunter2', 19 + safelinkPdsUrl: 'http://localhost:2583', 20 + safelinkAgentIdentifier: 'mod-authority.test', 21 + safelinkAgentPass: 'hunter2', 24 22 }) 25 23 const migrateDb = Database.postgres({ 26 24 url: cfg.db.url, ··· 33 31 const {port} = linkService.server?.address() as AddressInfo 34 32 baseUrl = `http://localhost:${port}` 35 33 34 + /* 36 35 // Ensure blocklist, whitelist, and safelink rules are set up 37 36 const now = new Date().toISOString() 38 37 linkService.ctx.cfg.eventCache.smartUpdate({ ··· 110 109 comment: 111 110 'Could be quite the mistake to get into this addicting game, but we will warn instead of block', 112 111 }) 112 + */ 113 113 }) 114 114 after(async () => { 115 115 await linkService?.destroy() ··· 213 213 ) 214 214 }) 215 215 216 + /* 216 217 it('Rule adjustment, safe redirect, 200 response for Instagram Account of teamsesh Bones', async () => { 217 218 // Retrieve the latest event after all updates 218 219 const result = linkService.ctx.cfg.eventCache.smartGet( ··· 232 233 new RegExp(urlToRedirect.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')), 233 234 ) 234 235 }) 236 + */ 235 237 236 238 async function getRedirect(link: string): Promise<[number, string]> { 237 239 const url = new URL(link) ··· 291 293 dbPostgresSchema: 'link_test', 292 294 dbPostgresUrl: process.env.DB_POSTGRES_URL, 293 295 safelinkEnabled: false, 294 - ozoneUrl: 'http://localhost:2583', 295 - ozoneAgentHandle: 'mod-authority.test', 296 - ozoneAgentPass: 'hunter2', 296 + safelinkPdsUrl: 'http://localhost:2583', 297 + safelinkAgentIdentifier: 'mod-authority.test', 298 + safelinkAgentPass: 'hunter2', 299 + metricsApiHost: 'http://localhost:2584', 297 300 }) 298 301 const migrateDb = Database.postgres({ 299 302 url: cfg.db.url,
+5 -1
bskylink/tsconfig.json
··· 14 14 "sourceMap": true 15 15 }, 16 16 "include": ["src/**/*"], 17 - "exclude": ["node_modules", "dist"] 17 + "exclude": ["node_modules", "dist"], 18 + "ts-node": { 19 + "logError": true, 20 + "pretty": true /* <= technically not required */ 21 + } 18 22 } 19 23
+2 -1
package.json
··· 324 324 ], 325 325 "modulePathIgnorePatterns": [ 326 326 "__tests__/.*/__mocks__", 327 - "__e2e__/.*" 327 + "__e2e__/.*", 328 + "bskylink/.*" 328 329 ], 329 330 "coveragePathIgnorePatterns": [ 330 331 "<rootDir>/node_modules/",