Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 460 lines 13 kB view raw
import { createClient, type ClickHouseClient } from '@clickhouse/client';

import type SafeTracer from '../../../utils/SafeTracer.js';
import { jsonStringify, tryJsonParse } from '../../../utils/encoding.js';
import type { IAnalyticsAdapter } from '../IAnalyticsAdapter.js';
import {
  type AnalyticsEventInput,
  type AnalyticsQueryResult,
  type AnalyticsWriteOptions,
} from '../types.js';
import { formatClickhouseQuery } from '../../warehouse/utils/clickhouseSql.js';

/**
 * Connection settings used to build the ClickHouse client URL and credentials.
 *
 * `port` defaults to 8123 and `protocol` to `'http'` when omitted. An empty
 * `password` is not forwarded to the client at all.
 */
export interface ClickhouseAnalyticsConnection {
  host: string;
  username: string;
  password: string;
  database: string;
  port?: number;
  protocol?: 'http' | 'https';
}

/**
 * Construction options for {@link ClickhouseAnalyticsAdapter}.
 *
 * `defaultBatchSize` is the maximum number of rows sent per insert call
 * (500 when omitted). `tracer`, when provided, wraps queries in a span.
 */
export interface ClickhouseAnalyticsAdapterOptions {
  connection: ClickhouseAnalyticsConnection;
  tracer?: SafeTracer;
  defaultBatchSize?: number;
}

/**
 * Analytics adapter that writes events to, and runs queries against,
 * ClickHouse using the `JSONEachRow` format.
 *
 * Before insertion every record is normalized: known date fields are
 * formatted as ClickHouse date/datetime strings, and fields that are stored
 * as `String` columns holding JSON are stringified.
 */
export class ClickhouseAnalyticsAdapter implements IAnalyticsAdapter {
  /** Per-table fields holding a JSON object that is stringified on write. */
  private static readonly JSON_OBJECT_FIELDS_BY_TABLE = new Map<
    string,
    ReadonlySet<string>
  >([
    ['content_api_requests', new Set(['item_type_schema_field_roles'])],
    ['item_model_scores_log', new Set(['item_type_schema_field_roles'])],
    ['appeals', new Set(['actioned_item_type_schema_field_roles'])],
    [
      'reporting_rule_executions',
      new Set(['result', 'item_type_schema_field_roles']),
    ],
    [
      'reports',
      new Set(['reported_item_data', 'reported_item_type_schema_field_roles']),
    ],
  ]);

  /** Per-table fields holding a JSON array that is stringified on write. */
  private static readonly JSON_ARRAY_FIELDS_BY_TABLE = new Map<
    string,
    ReadonlySet<string>
  >([
    ['action_executions', new Set(['rules', 'policies', 'rule_tags'])],
    ['reporting_rule_executions', new Set([])],
    ['reports', new Set([])],
  ]);

  // Not referenced elsewhere in this class; kept for compatibility.
  private static readonly DATE_TIME_FIELDS = new Set([
    'rule_version',
    'prior_rule_version',
  ]);

  /** Lower-cased field name -> how the value is formatted for ClickHouse. */
  private static readonly DATE_FIELD_KINDS = new Map<string, DateFieldKind>([
    ['ts', 'datetime'],
    ['ds', 'date'],
    ['rule_version', 'datetime'],
    ['prior_rule_version', 'datetime'],
    ['ts_start_inclusive', 'datetime'],
    ['ts_end_exclusive', 'datetime'],
    ['reported_at', 'datetime'],
    ['appealed_at', 'datetime'],
    ['created_at', 'datetime'],
    ['updated_at', 'datetime'],
    ['action_time', 'datetime'],
  ]);

  // ClickHouse doesn't support nullable arrays, so null is converted to an
  // empty array. These are regular Array(String) columns.
  private static readonly NULLABLE_ARRAY_FIELDS: ReadonlySet<string> = new Set([
    'policy_names',
    'policy_ids',
    'tags',
  ]);

  // String columns storing JSON arrays; null is converted to '[]'.
  private static readonly NULLABLE_JSON_STRING_FIELDS: ReadonlySet<string> =
    new Set(['rules', 'policies', 'rule_tags']);

  // Some JSON/complex fields are stored as String in ClickHouse because:
  // 1. ClickHouse doesn't support Nullable(JSON/Object)
  // 2. ClickHouse JSON type doesn't handle arrays of objects well
  private static readonly STRINGIFIED_FIELDS: ReadonlySet<string> = new Set([
    'reported_item_thread', // REPORTING_SERVICE.REPORTS - nullable array
    'reported_items_in_thread', // REPORTING_SERVICE.REPORTS - nullable array
    'reported_item_type_schema', // REPORTING_SERVICE.REPORTS - array of objects
    'item_type_schema', // various tables - array of objects
    'actioned_item_type_schema', // REPORTING_SERVICE.APPEALS - array of objects
    'additional_items', // REPORTING_SERVICE.REPORTS/APPEALS - array of objects
  ]);

  private static readonly DEFAULT_BATCH_SIZE = 500;

  readonly name = 'clickhouse-analytics';

  private readonly tracer?: SafeTracer;
  private readonly client: ClickHouseClient;
  private readonly defaultBatchSize: number;

  /**
   * Creates the underlying ClickHouse client from the given connection.
   *
   * @param options Connection details plus optional tracer and batch size.
   */
  constructor(options: ClickhouseAnalyticsAdapterOptions) {
    const { connection } = options;

    this.tracer = options.tracer;
    this.defaultBatchSize =
      options.defaultBatchSize ?? ClickhouseAnalyticsAdapter.DEFAULT_BATCH_SIZE;

    const protocol = connection.protocol ?? 'http';
    const port = connection.port ?? 8123;

    this.client = createClient({
      url: `${protocol}://${connection.host}:${port}`,
      username: connection.username,
      // An empty password is omitted entirely rather than sent as ''.
      ...(connection.password.length > 0
        ? { password: connection.password }
        : {}),
      database: connection.database,
    });
  }

  /**
   * Normalizes and inserts `events` into `table`, one insert call per batch
   * of at most `defaultBatchSize` rows. Batches are sent sequentially.
   *
   * @param table Target table name (optionally qualified, e.g. `db.table`).
   * @param events Rows to insert; an empty list is a no-op.
   * @param _options Currently ignored.
   */
  async writeEvents(
    table: string,
    events: readonly AnalyticsEventInput[],
    _options?: AnalyticsWriteOptions,
  ): Promise<void> {
    if (events.length === 0) {
      return;
    }

    for (const batch of this.partition(events, this.defaultBatchSize)) {
      await this.client.insert({
        table,
        values: batch.map((row) => this.normalizeRecord(row, table)),
        format: 'JSONEachRow',
      });
    }
  }

  /**
   * Formats `sql` with `params` via `formatClickhouseQuery`, runs it and
   * returns the rows. When a tracer is configured the call runs inside a
   * `clickhouse.query` span.
   *
   * @param sql Query text.
   * @param params Values handed to `formatClickhouseQuery`.
   * @returns The rows parsed from the `JSONEachRow` response.
   */
  async query<T = AnalyticsQueryResult>(
    sql: string,
    params: readonly unknown[] = [],
  ): Promise<readonly T[]> {
    const execute = async () => {
      const statement = formatClickhouseQuery(sql, params);
      const result = await this.client.query({
        query: statement,
        format: 'JSONEachRow',
      });

      const rows = await result.json();
      return rows as readonly T[];
    };

    if (this.tracer) {
      return this.tracer.addActiveSpan(
        { resource: 'clickhouse.client', operation: 'clickhouse.query' },
        execute,
      );
    }

    return execute();
  }

  async flush(): Promise<void> {
    // No-op: inserts are executed eagerly.
  }

  /** Closes the underlying ClickHouse client. */
  async close(): Promise<void> {
    await this.client.close();
  }

  /**
   * Splits `values` into consecutive chunks of at most `size` elements.
   *
   * A `size` that is `NaN` or below 1 falls back to the default batch size,
   * and fractional sizes are floored. Without this guard a size of 0 (or a
   * negative one) would never advance the loop, and `NaN` would yield a
   * single empty batch, silently dropping every row.
   *
   * @returns At least one batch; each batch is a fresh array.
   */
  private partition<T>(values: readonly T[], size: number): T[][] {
    const step =
      Number.isNaN(size) || size < 1
        ? ClickhouseAnalyticsAdapter.DEFAULT_BATCH_SIZE
        : Math.floor(size);

    if (values.length <= step) {
      return [values.slice()];
    }

    const batches: T[][] = [];
    for (let index = 0; index < values.length; index += step) {
      batches.push(values.slice(index, index + step));
    }

    return batches;
  }

  /**
   * Produces a copy of `record` with every field converted to the form that
   * is sent to ClickHouse for `table`.
   */
  private normalizeRecord(
    record: AnalyticsEventInput,
    table: string,
  ): AnalyticsEventInput {
    const normalized: AnalyticsEventInput = {};
    const tableKey = ClickhouseAnalyticsAdapter.normalizeTableName(table);
    const jsonObjectFields =
      ClickhouseAnalyticsAdapter.JSON_OBJECT_FIELDS_BY_TABLE.get(tableKey);
    const jsonArrayFields =
      ClickhouseAnalyticsAdapter.JSON_ARRAY_FIELDS_BY_TABLE.get(tableKey);

    for (const [key, value] of Object.entries(record)) {
      if (value == null) {
        if (ClickhouseAnalyticsAdapter.NULLABLE_ARRAY_FIELDS.has(key)) {
          normalized[key] = [];
          continue;
        }
        if (ClickhouseAnalyticsAdapter.NULLABLE_JSON_STRING_FIELDS.has(key)) {
          normalized[key] = '[]';
          continue;
        }
      }

      normalized[key] = this.normalizeFieldValue({
        key,
        value,
        jsonArrayFields,
        jsonObjectFields,
      });
    }

    // Stringify JSON fields (stored as String in ClickHouse)
    this.stringifyJsonFields(normalized, jsonObjectFields, '{}');
    this.stringifyJsonFields(normalized, jsonArrayFields, '[]');

    return normalized;
  }

  /**
   * Ensures every field in `fields` is present on `normalized` as a string:
   * existing strings are kept, objects/arrays are JSON-stringified, and
   * anything else (missing, null, primitives) becomes `defaultValue`.
   * Mutates `normalized` in place.
   */
  private stringifyJsonFields(
    normalized: AnalyticsEventInput,
    fields: ReadonlySet<string> | undefined,
    defaultValue: string,
  ): void {
    if (!fields) {
      return;
    }

    for (const fieldKey of fields) {
      const value = fieldKey in normalized ? normalized[fieldKey] : undefined;

      if (typeof value === 'string') {
        // Already a string, keep it
        continue;
      }

      // Arrays are objects too, so a single typeof check covers both.
      normalized[fieldKey] =
        value !== null && typeof value === 'object'
          ? jsonStringify(value)
          : defaultValue;
    }
  }

  /**
   * Coerces a JSON-object field value: strings are parsed (falling back to
   * `{}` when parsing yields nothing), objects and arrays pass through, and
   * null/undefined or any other primitive becomes `{}`.
   */
  private normalizeJsonObject(value: unknown): unknown {
    if (value == null) {
      return {};
    }

    if (typeof value === 'string') {
      return tryJsonParse(value) ?? {};
    }

    return typeof value === 'object' ? value : {};
  }

  /**
   * Coerces a JSON-array field value into an array. Strings are parsed;
   * other non-array values are round-tripped through JSON. Anything that
   * does not end up as an array becomes `[]`. Nested arrays are normalized
   * recursively; all other entries are kept as-is.
   */
  private normalizeJsonArray(value: unknown): unknown[] {
    if (value == null) {
      return [];
    }

    let candidate: unknown;
    if (Array.isArray(value)) {
      candidate = value;
    } else if (typeof value === 'string') {
      candidate = tryJsonParse(value);
    } else {
      candidate = tryJsonParse(jsonStringify(value));
    }

    if (!Array.isArray(candidate)) {
      return [];
    }

    return candidate.map((entry: unknown) =>
      Array.isArray(entry) ? this.normalizeJsonArray(entry) : entry,
    );
  }

  /**
   * Normalizes one field. Checks are applied in this order: table-specific
   * JSON array field, known date field (matched case-insensitively),
   * table-specific JSON object field, always-stringified field, `Date`
   * instance, array, plain object. Anything else is returned unchanged.
   */
  private normalizeFieldValue(params: {
    key: string;
    value: unknown;
    jsonArrayFields?: ReadonlySet<string>;
    jsonObjectFields?: ReadonlySet<string>;
  }): unknown {
    const { key, value, jsonArrayFields, jsonObjectFields } = params;

    if (jsonArrayFields?.has(key)) {
      return this.normalizeJsonArray(value);
    }

    const dateKind = ClickhouseAnalyticsAdapter.DATE_FIELD_KINDS.get(
      key.toLowerCase(),
    );
    if (dateKind) {
      return this.normalizeDateField(value, dateKind);
    }

    if (jsonObjectFields?.has(key)) {
      return this.normalizeJsonObject(value);
    }

    if (
      value != null &&
      ClickhouseAnalyticsAdapter.STRINGIFIED_FIELDS.has(key)
    ) {
      // Strings are assumed to be already stringified.
      return typeof value === 'string' ? value : jsonStringify(value);
    }

    // A Date has no own enumerable properties, so the generic object branch
    // below would turn it into `{}` and lose the timestamp. Format it like a
    // datetime field instead; invalid dates are passed through untouched
    // because toISOString() would throw on them.
    if (value instanceof Date) {
      return Number.isNaN(value.getTime())
        ? value
        : this.formatDate(value, 'datetime');
    }

    // For ClickHouse JSON columns with experimental object type enabled,
    // we should pass objects/arrays directly. The ClickHouse client will
    // handle serialization when using JSONEachRow format.

    if (Array.isArray(value)) {
      return value.map((item: unknown) => this.normalizeValue(item));
    }

    if (value && typeof value === 'object') {
      return Object.fromEntries(
        Object.entries(value as Record<string, unknown>).map(
          ([entryKey, entryValue]) => [
            entryKey,
            this.normalizeValue(entryValue, entryKey),
          ],
        ),
      );
    }

    return value;
  }

  /**
   * Converts a date-like value (Date, epoch number, or string) into the
   * string form for `kind`. Values that cannot be interpreted as a valid
   * date, including null/undefined and blank strings, are returned
   * unchanged.
   */
  private normalizeDateField(value: unknown, kind: DateFieldKind): unknown {
    if (value == null) {
      return value;
    }

    let date: Date | undefined;
    if (value instanceof Date) {
      date = value;
    } else if (typeof value === 'number') {
      date = new Date(value);
    } else if (typeof value === 'string') {
      const trimmed = value.trim();
      if (!trimmed.length) {
        return value;
      }
      const maybeNumber = Number(trimmed);
      // eslint-disable-next-line security/detect-unsafe-regex
      if (!Number.isNaN(maybeNumber) && /^[+-]?\d+(\.\d+)?$/u.test(trimmed)) {
        // Interpret numeric strings as epoch milliseconds (or seconds if clearly seconds)
        date = new Date(
          trimmed.includes('.') ? maybeNumber * 1000 : maybeNumber,
        );
      } else {
        date = new Date(trimmed);
      }
    }

    if (!date || Number.isNaN(date.getTime())) {
      return value;
    }

    return this.formatDate(date, kind);
  }

  /**
   * Used for recursion on nested values that aren't top-level fields.
   * Without a key (array items) the value is returned untouched; with a key
   * it goes through the key-based field rules, minus the table-specific
   * JSON field sets.
   */
  private normalizeValue(value: unknown, key?: string): unknown {
    if (key) {
      return this.normalizeFieldValue({
        key,
        value,
      });
    }
    return value;
  }

  /**
   * Parses `value` with `new Date` and formats it according to the kind
   * registered for `column`. Returns `undefined` when the column is not a
   * known date field or the value does not parse.
   *
   * Not referenced elsewhere in this class.
   */
  private parseAndFormatDate(value: string, column: string): string | undefined {
    const kind = ClickhouseAnalyticsAdapter.DATE_FIELD_KINDS.get(
      column.toLowerCase(),
    );
    if (!kind) {
      return undefined;
    }

    const parsed = new Date(value);
    if (Number.isNaN(parsed.getTime())) {
      return undefined;
    }

    return this.formatDate(parsed, kind);
  }

  /**
   * Formats a valid Date from its UTC ISO string: `YYYY-MM-DD` for `'date'`,
   * otherwise `YYYY-MM-DD HH:mm:ss.sss` (the `T` separator replaced by a
   * space and the trailing `Z` removed).
   */
  private formatDate(date: Date, kind: DateFieldKind): string {
    const iso = date.toISOString();
    if (kind === 'date') {
      return iso.slice(0, 10);
    }

    return iso.replace('T', ' ').replace('Z', '');
  }

  /** Lower-cases `table` and drops everything up to the last `.`. */
  private static normalizeTableName(table: string): string {
    const lower = table.toLowerCase();
    const dotIndex = lower.lastIndexOf('.');
    return dotIndex >= 0 ? lower.slice(dotIndex + 1) : lower;
  }
}

type DateFieldKind = 'datetime' | 'date';