Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1import { createClient, type ClickHouseClient } from '@clickhouse/client';
2
3import type SafeTracer from '../../../utils/SafeTracer.js';
4import { jsonStringify, tryJsonParse } from '../../../utils/encoding.js';
5import type { IAnalyticsAdapter } from '../IAnalyticsAdapter.js';
6import {
7 type AnalyticsEventInput,
8 type AnalyticsQueryResult,
9 type AnalyticsWriteOptions,
10} from '../types.js';
11import { formatClickhouseQuery } from '../../warehouse/utils/clickhouseSql.js';
12
/**
 * Connection settings used to build the ClickHouse client URL and credentials.
 */
export interface ClickhouseAnalyticsConnection {
  /** Host name (or address) of the ClickHouse server, without scheme or port. */
  host: string;
  /** Port to connect to; the adapter falls back to 8123 when omitted. */
  port?: number;
  /** URL scheme; the adapter falls back to `'http'` when omitted. */
  protocol?: 'https' | 'http';
  /** Database passed to the client as its default database. */
  database: string;
  /** User name passed to the client. */
  username: string;
  /** Password for `username`; an empty string means no password is passed to the client. */
  password: string;
}
21
/**
 * Constructor options for `ClickhouseAnalyticsAdapter`.
 */
export interface ClickhouseAnalyticsAdapterOptions {
  /** Where and how to connect to ClickHouse. */
  connection: ClickhouseAnalyticsConnection;
  /** Number of events sent per insert; the adapter uses 500 when omitted. */
  defaultBatchSize?: number;
  /** Optional tracer; when provided, queries are wrapped in an active span. */
  tracer?: SafeTracer;
}
27
/**
 * Analytics adapter backed by ClickHouse through `@clickhouse/client`.
 *
 * Writes go out as `JSONEachRow` inserts, split into batches. Before a row is
 * sent it is normalized: date-like fields become ClickHouse-style date strings,
 * table-specific JSON fields are serialized to strings, and `null` array
 * fields are replaced with empty values.
 */
export class ClickhouseAnalyticsAdapter implements IAnalyticsAdapter {
  /** Batch size used when none is configured or the configured one is unusable. */
  private static readonly DEFAULT_BATCH_SIZE = 500;

  /** Per-table fields that hold JSON objects and are stored as JSON strings. */
  private static readonly JSON_OBJECT_FIELDS_BY_TABLE = new Map<
    string,
    ReadonlySet<string>
  >([
    ['content_api_requests', new Set(['item_type_schema_field_roles'])],
    ['item_model_scores_log', new Set(['item_type_schema_field_roles'])],
    ['appeals', new Set(['actioned_item_type_schema_field_roles'])],
    [
      'reporting_rule_executions',
      new Set(['result', 'item_type_schema_field_roles']),
    ],
    [
      'reports',
      new Set(['reported_item_data', 'reported_item_type_schema_field_roles']),
    ],
  ]);

  /** Per-table fields that hold JSON arrays and are stored as JSON strings. */
  private static readonly JSON_ARRAY_FIELDS_BY_TABLE = new Map<
    string,
    ReadonlySet<string>
  >([
    ['action_executions', new Set(['rules', 'policies', 'rule_tags'])],
    ['reporting_rule_executions', new Set([])],
    ['reports', new Set([])],
  ]);

  // NOTE(review): not referenced anywhere in this class; kept as-is.
  private static readonly DATE_TIME_FIELDS = new Set([
    'rule_version',
    'prior_rule_version',
  ]);

  /** Lower-cased field name -> how the value must be formatted for ClickHouse. */
  private static readonly DATE_FIELD_KINDS = new Map<string, DateFieldKind>([
    ['ts', 'datetime'],
    ['ds', 'date'],
    ['rule_version', 'datetime'],
    ['prior_rule_version', 'datetime'],
    ['ts_start_inclusive', 'datetime'],
    ['ts_end_exclusive', 'datetime'],
    ['reported_at', 'datetime'],
    ['appealed_at', 'datetime'],
    ['created_at', 'datetime'],
    ['updated_at', 'datetime'],
    ['action_time', 'datetime'],
  ]);

  /**
   * Regular `Array(String)` columns: ClickHouse doesn't support nullable
   * arrays, so a null/undefined value is written as an empty array.
   */
  private static readonly NULL_TO_EMPTY_ARRAY_FIELDS: ReadonlySet<string> =
    new Set(['policy_names', 'policy_ids', 'tags']);

  /**
   * String columns storing a JSON array: a null/undefined value is written as
   * the literal string `'[]'`.
   */
  private static readonly NULL_TO_EMPTY_JSON_ARRAY_FIELDS: ReadonlySet<string> =
    new Set(['rules', 'policies', 'rule_tags']);

  /**
   * JSON/complex fields stored as String in ClickHouse because:
   * 1. ClickHouse doesn't support Nullable(JSON/Object)
   * 2. ClickHouse JSON type doesn't handle arrays of objects well
   */
  private static readonly STRINGIFIED_FIELDS: ReadonlySet<string> = new Set([
    'reported_item_thread', // REPORTING_SERVICE.REPORTS - nullable array
    'reported_items_in_thread', // REPORTING_SERVICE.REPORTS - nullable array
    'reported_item_type_schema', // REPORTING_SERVICE.REPORTS - array of objects
    'item_type_schema', // various tables - array of objects
    'actioned_item_type_schema', // REPORTING_SERVICE.APPEALS - array of objects
    'additional_items', // REPORTING_SERVICE.REPORTS/APPEALS - array of objects
  ]);

  readonly name = 'clickhouse-analytics';

  private readonly tracer?: SafeTracer;
  private readonly client: ClickHouseClient;
  private readonly defaultBatchSize: number;

  /**
   * Builds the ClickHouse client from the connection options.
   *
   * @param options Connection details, optional tracer and batch size. The
   *   protocol defaults to `http`, the port to 8123 and the batch size to 500.
   *   A batch size that is NaN or below 1 is replaced by the default, because
   *   such a size would otherwise make batching loop forever or drop rows.
   */
  constructor(options: ClickhouseAnalyticsAdapterOptions) {
    this.tracer = options.tracer;
    this.defaultBatchSize = ClickhouseAnalyticsAdapter.resolveBatchSize(
      options.defaultBatchSize,
    );

    const protocol = options.connection.protocol ?? 'http';
    const port = options.connection.port ?? 8123;

    const url = `${protocol}://${options.connection.host}:${port}`;
    // An empty password is treated as "no password" and is not sent at all.
    const password = options.connection.password.length
      ? options.connection.password
      : undefined;
    this.client = createClient({
      url,
      username: options.connection.username,
      ...(password ? { password } : {}),
      database: options.connection.database,
    });
  }

  /**
   * Inserts events into `table`, one `JSONEachRow` insert per batch, awaited
   * sequentially.
   *
   * @param table Target table; may be qualified (`db.table`).
   * @param events Rows to insert. Nothing is sent when the list is empty.
   * @param _options Currently unused.
   */
  async writeEvents(
    table: string,
    events: readonly AnalyticsEventInput[],
    _options?: AnalyticsWriteOptions,
  ): Promise<void> {
    if (events.length === 0) {
      return;
    }

    const batches = this.partition(events, this.defaultBatchSize);

    for (const batch of batches) {
      const normalizedBatch = batch.map((row) =>
        this.normalizeRecord(row, table),
      );

      await this.client.insert({
        table,
        values: normalizedBatch,
        format: 'JSONEachRow',
      });
    }
  }

  /**
   * Runs a query and returns its rows.
   *
   * @param sql Statement passed to `formatClickhouseQuery` together with `params`.
   * @param params Values handed to `formatClickhouseQuery`.
   * @returns The rows decoded from the `JSONEachRow` response, cast to `T`
   *   without runtime validation.
   */
  async query<T = AnalyticsQueryResult>(
    sql: string,
    params: readonly unknown[] = [],
  ): Promise<readonly T[]> {
    const execute = async () => {
      const statement = formatClickhouseQuery(sql, params);
      const result = await this.client.query({
        query: statement,
        format: 'JSONEachRow',
      });

      const rows = await result.json();
      return rows as readonly T[];
    };

    // Only wrap in a span when a tracer was supplied.
    if (this.tracer) {
      return this.tracer.addActiveSpan(
        { resource: 'clickhouse.client', operation: 'clickhouse.query' },
        execute,
      );
    }

    return execute();
  }

  /** No-op: inserts are executed eagerly. */
  async flush(): Promise<void> {
    // No-op: inserts are executed eagerly.
  }

  /** Closes the underlying ClickHouse client. */
  async close(): Promise<void> {
    await this.client.close();
  }

  /**
   * Turns a configured batch size into one that is safe to iterate with.
   *
   * @returns The floored size when it is at least 1 (`Infinity` is kept and
   *   means "one batch"); otherwise the default of 500.
   */
  private static resolveBatchSize(candidate: number | null | undefined): number {
    if (candidate == null || Number.isNaN(candidate) || candidate < 1) {
      return ClickhouseAnalyticsAdapter.DEFAULT_BATCH_SIZE;
    }
    return Math.floor(candidate);
  }

  /**
   * Splits `values` into consecutive chunks of at most `size` elements.
   *
   * The size is sanitized first: a step of 0, a negative step or NaN would
   * otherwise never advance the loop or would produce empty chunks.
   *
   * @returns A list of fresh arrays; a single (possibly empty) chunk when
   *   everything fits.
   */
  private partition<T>(values: readonly T[], size: number): T[][] {
    const step = ClickhouseAnalyticsAdapter.resolveBatchSize(size);

    if (values.length <= step) {
      return [values.slice()];
    }

    const batches: T[][] = [];
    for (let index = 0; index < values.length; index += step) {
      batches.push(values.slice(index, index + step));
    }

    return batches;
  }

  /**
   * Produces the row that is actually sent to ClickHouse for `record`.
   *
   * Each field is normalized, then every JSON field configured for the table
   * is serialized to a string. Configured JSON fields missing from the record
   * are added with their empty default (`'{}'` or `'[]'`).
   */
  private normalizeRecord(
    record: AnalyticsEventInput,
    table: string,
  ): AnalyticsEventInput {
    const normalized: AnalyticsEventInput = {};
    const tableKey = ClickhouseAnalyticsAdapter.normalizeTableName(table);
    const jsonObjectFields =
      ClickhouseAnalyticsAdapter.JSON_OBJECT_FIELDS_BY_TABLE.get(tableKey);
    const jsonArrayFields =
      ClickhouseAnalyticsAdapter.JSON_ARRAY_FIELDS_BY_TABLE.get(tableKey);

    for (const [key, value] of Object.entries(record)) {
      const isMissing = value === null || value === undefined;

      if (
        isMissing &&
        ClickhouseAnalyticsAdapter.NULL_TO_EMPTY_ARRAY_FIELDS.has(key)
      ) {
        normalized[key] = [];
        continue;
      }

      if (
        isMissing &&
        ClickhouseAnalyticsAdapter.NULL_TO_EMPTY_JSON_ARRAY_FIELDS.has(key)
      ) {
        normalized[key] = '[]';
        continue;
      }

      normalized[key] = this.normalizeFieldValue({
        key,
        value,
        jsonArrayFields,
        jsonObjectFields,
      });
    }

    // Stringify JSON fields (stored as String in ClickHouse)
    this.stringifyJsonFields(normalized, jsonObjectFields, '{}');
    this.stringifyJsonFields(normalized, jsonArrayFields, '[]');

    return normalized;
  }

  /**
   * Serializes the listed fields of `normalized` in place.
   *
   * Strings are kept as they are, objects and arrays are JSON-encoded, and
   * anything else (including a missing field) becomes `defaultValue`.
   */
  private stringifyJsonFields(
    normalized: AnalyticsEventInput,
    fields: ReadonlySet<string> | undefined,
    defaultValue: string,
  ): void {
    if (!fields) {
      return;
    }

    for (const fieldKey of fields) {
      const value = normalized[fieldKey];

      if (typeof value === 'string') {
        // Already a string, keep it
        continue;
      }

      // `typeof [] === 'object'`, so arrays are covered by this check as well.
      normalized[fieldKey] =
        value !== null && typeof value === 'object'
          ? jsonStringify(value)
          : defaultValue;
    }
  }

  /**
   * Coerces a JSON-object field: null/undefined and non-object scalars become
   * `{}`, strings are parsed (falling back to `{}`), objects and arrays pass
   * through untouched.
   */
  private normalizeJsonObject(value: unknown): unknown {
    if (value == null) {
      return {};
    }

    if (typeof value === 'string') {
      return tryJsonParse(value) ?? {};
    }

    return typeof value === 'object' ? value : {};
  }

  /**
   * Coerces a JSON-array field to an array. Strings are parsed, other
   * non-array values are round-tripped through JSON; anything that does not
   * end up as an array yields `[]`. Nested arrays are normalized recursively.
   */
  private normalizeJsonArray(value: unknown): unknown[] {
    if (value == null) {
      return [];
    }

    let candidate: unknown;
    if (Array.isArray(value)) {
      candidate = value;
    } else if (typeof value === 'string') {
      candidate = tryJsonParse(value);
    } else {
      candidate = tryJsonParse(jsonStringify(value));
    }

    if (!Array.isArray(candidate)) {
      return [];
    }

    return candidate.map((entry) =>
      Array.isArray(entry) ? this.normalizeJsonArray(entry) : entry,
    );
  }

  /**
   * Normalizes a single field value. Checks run in this order: table JSON
   * array field, date field (matched on the lower-cased key), table JSON
   * object field, always-stringified field, then generic arrays, `Date`
   * instances and plain objects.
   *
   * @returns The value to place in the outgoing row.
   */
  private normalizeFieldValue(params: {
    key: string;
    value: unknown;
    jsonArrayFields?: ReadonlySet<string>;
    jsonObjectFields?: ReadonlySet<string>;
  }): unknown {
    const { key, value, jsonArrayFields, jsonObjectFields } = params;

    if (jsonArrayFields?.has(key)) {
      return this.normalizeJsonArray(value);
    }

    const dateKind = ClickhouseAnalyticsAdapter.DATE_FIELD_KINDS.get(
      key.toLowerCase(),
    );
    if (dateKind) {
      return this.normalizeDateField(value, dateKind);
    }

    if (jsonObjectFields?.has(key)) {
      return this.normalizeJsonObject(value);
    }

    if (ClickhouseAnalyticsAdapter.STRINGIFIED_FIELDS.has(key) && value != null) {
      // Strings are assumed to be already stringified.
      return typeof value === 'string' ? value : jsonStringify(value);
    }

    // For ClickHouse JSON columns with experimental object type enabled,
    // we should pass objects/arrays directly. The ClickHouse client will
    // handle serialization when using JSONEachRow format.

    if (Array.isArray(value)) {
      return value.map((item) => this.normalizeValue(item));
    }

    if (value instanceof Date) {
      // A Date has no enumerable own properties, so the generic object branch
      // below would turn it into `{}` and lose the timestamp.
      return Number.isNaN(value.getTime())
        ? value
        : this.formatDate(value, 'datetime');
    }

    if (value && typeof value === 'object') {
      const normalizedEntries = Object.entries(
        value as Record<string, unknown>,
      ).map(([entryKey, entryValue]) => [
        entryKey,
        this.normalizeValue(entryValue, entryKey),
      ]);
      return Object.fromEntries(normalizedEntries);
    }

    return value;
  }

  /**
   * Formats a date-like value for a ClickHouse date/datetime column.
   *
   * Accepts `Date` instances, numbers (passed to `new Date`, i.e. epoch
   * milliseconds) and strings. A purely numeric string is read as epoch
   * milliseconds, or as epoch seconds when it contains a decimal point; any
   * other string is handed to the `Date` constructor.
   *
   * @returns The formatted string, or the input unchanged when it is
   *   null/undefined, blank, of another type, or not a valid date.
   */
  private normalizeDateField(value: unknown, kind: DateFieldKind): unknown {
    if (value == null) {
      return value;
    }

    let date: Date | undefined;
    if (value instanceof Date) {
      date = value;
    } else if (typeof value === 'number') {
      date = new Date(value);
    } else if (typeof value === 'string') {
      const trimmed = value.trim();
      if (!trimmed.length) {
        return value;
      }
      const maybeNumber = Number(trimmed);
      // eslint-disable-next-line security/detect-unsafe-regex
      if (!Number.isNaN(maybeNumber) && /^[+-]?\d+(\.\d+)?$/u.test(trimmed)) {
        // Numeric strings: epoch milliseconds, or seconds when fractional.
        date = new Date(
          trimmed.includes('.') ? maybeNumber * 1000 : maybeNumber,
        );
      } else {
        date = new Date(trimmed);
      }
    }

    if (!date || Number.isNaN(date.getTime())) {
      return value;
    }

    return this.formatDate(date, kind);
  }

  /**
   * Used for recursion on nested values that aren't top-level fields. Without
   * a key the value is returned untouched; with a key it goes through
   * `normalizeFieldValue` without any table-specific JSON field sets.
   */
  private normalizeValue(value: unknown, key?: string): unknown {
    if (key) {
      return this.normalizeFieldValue({
        key,
        value,
      });
    }
    return value;
  }

  /**
   * Parses `value` with the `Date` constructor and formats it according to
   * the date kind registered for `column`.
   *
   * NOTE(review): not referenced anywhere in this class; kept as-is.
   *
   * @returns `undefined` when the column is not a known date field or the
   *   value is not a valid date.
   */
  private parseAndFormatDate(value: string, column: string): string | undefined {
    const kind = ClickhouseAnalyticsAdapter.DATE_FIELD_KINDS.get(
      column.toLowerCase(),
    );
    if (!kind) {
      return undefined;
    }

    const parsed = new Date(value);
    if (Number.isNaN(parsed.getTime())) {
      return undefined;
    }

    return this.formatDate(parsed, kind);
  }

  /**
   * Renders a valid `Date` in UTC: `YYYY-MM-DD` for `'date'`, otherwise
   * `YYYY-MM-DD HH:MM:SS.mmm`. Callers must pass a valid date, since
   * `toISOString` throws on an invalid one.
   */
  private formatDate(date: Date, kind: DateFieldKind): string {
    if (kind === 'date') {
      return date.toISOString().slice(0, 10);
    }

    return date.toISOString().replace('T', ' ').replace('Z', '');
  }

  /**
   * Lower-cases a table name and strips everything up to the last dot, so
   * `DB.Reports` and `reports` map to the same key.
   */
  private static normalizeTableName(table: string): string {
    const lower = table.toLowerCase();
    const dotIndex = lower.lastIndexOf('.');
    return dotIndex >= 0 ? lower.slice(dotIndex + 1) : lower;
  }
}
459
/**
 * How a date-like field is rendered for ClickHouse: `'date'` keeps only the
 * calendar day, `'datetime'` keeps the time of day as well.
 */
type DateFieldKind = 'date' | 'datetime';