# Data Warehouse Abstraction Layer

## Overview

The data warehouse abstraction lets you use **any data warehouse** (ClickHouse, PostgreSQL, BigQuery, Redshift, Databricks, etc.) without changing application code. You select the warehouse with a single environment variable (`WAREHOUSE_ADAPTER`); connection details come from adapter-specific variables.

## Quick Start

```typescript
import { inject, type Dependencies } from '../iocContainer/index.js';
// Assumed path: adjust to where SafeTracer lives relative to your file.
import type SafeTracer from '../utils/SafeTracer.js';

class MyService {
  constructor(private readonly dataWarehouse: Dependencies['DataWarehouse']) {}

  // Runs a parameterized query; `:1` binds to the first element of the params array.
  async getUserData(userId: string, tracer: SafeTracer) {
    return this.dataWarehouse.query(
      'SELECT * FROM users WHERE id = :1',
      tracer,
      [userId]
    );
  }
}

export default inject(['DataWarehouse'], MyService);
```

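Because the service receives its warehouse through the constructor, it can be exercised in a unit test without the IoC container or a live database. The following is a minimal sketch that assumes only the `query(sql, tracer, params)` call shape used in the Quick Start. `Tracer`, `QueryableWarehouse`, `FakeWarehouse`, and `UserLookupService` are hypothetical names introduced for illustration and are not part of the repository.

```typescript
// Hypothetical stand-in for SafeTracer; the fake never inspects it.
type Tracer = unknown;

// The narrow slice of the warehouse that the Quick Start service relies on.
interface QueryableWarehouse {
  query(sql: string, tracer: Tracer, params?: readonly unknown[]): Promise<readonly unknown[]>;
}

// Records every call and returns canned rows instead of contacting a warehouse.
class FakeWarehouse implements QueryableWarehouse {
  readonly calls: { sql: string; params: readonly unknown[] }[] = [];
  private readonly rows: readonly unknown[];

  constructor(rows: readonly unknown[]) {
    this.rows = rows;
  }

  async query(sql: string, _tracer: Tracer, params: readonly unknown[] = []): Promise<readonly unknown[]> {
    this.calls.push({ sql, params });
    return this.rows;
  }
}

// Same query as the Quick Start service, typed against the narrow interface.
class UserLookupService {
  private readonly dataWarehouse: QueryableWarehouse;

  constructor(dataWarehouse: QueryableWarehouse) {
    this.dataWarehouse = dataWarehouse;
  }

  async getUserData(userId: string, tracer: Tracer): Promise<readonly unknown[]> {
    return this.dataWarehouse.query('SELECT * FROM users WHERE id = :1', tracer, [userId]);
  }
}
```

In a test you would construct `new UserLookupService(new FakeWarehouse([{ id: 'u1' }]))`, call `getUserData`, and then inspect `calls` to check the SQL and parameters that were sent.
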
## Configuration

Select adapters with `WAREHOUSE_ADAPTER` and, optionally, `ANALYTICS_ADAPTER`.
Legacy deployments can keep using `DATA_WAREHOUSE_PROVIDER`; it is still accepted as a fallback.

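The precedence described above can be sketched as a small resolver. This is an illustration rather than the project's actual factory code: `resolveAdapters` and `AdapterSelection` are hypothetical names, and two behaviors are assumptions of the sketch, namely that `ANALYTICS_ADAPTER` defaults to the warehouse adapter when unset and that a missing warehouse setting is treated as an error.

```typescript
type Env = Record<string, string | undefined>;

interface AdapterSelection {
  warehouse: string;
  analytics: string;
}

// WAREHOUSE_ADAPTER wins; DATA_WAREHOUSE_PROVIDER is consulted only as the legacy fallback.
// ASSUMPTION: ANALYTICS_ADAPTER falls back to the warehouse adapter when it is not set.
function resolveAdapters(env: Env): AdapterSelection {
  const warehouse = env['WAREHOUSE_ADAPTER'] || env['DATA_WAREHOUSE_PROVIDER'];
  if (!warehouse) {
    // ASSUMPTION: the sketch fails fast; the real factory may pick a default instead.
    throw new Error('Set WAREHOUSE_ADAPTER (or the legacy DATA_WAREHOUSE_PROVIDER)');
  }
  return { warehouse, analytics: env['ANALYTICS_ADAPTER'] || warehouse };
}

// Typical call site (not executed here): resolveAdapters(process.env)
```

Passing the environment in as an argument, instead of reading `process.env` inside the function, keeps the resolver easy to test with plain objects.
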
### PostgreSQL
```bash
WAREHOUSE_ADAPTER=postgresql
ANALYTICS_ADAPTER=postgresql
# Legacy fallback:
DATA_WAREHOUSE_PROVIDER=postgresql
DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_NAME=analytics
DATABASE_USER=postgres
DATABASE_PASSWORD=password
```

### ClickHouse
```bash
WAREHOUSE_ADAPTER=clickhouse
# Optional: override analytics adapter
# ANALYTICS_ADAPTER=clickhouse
# Legacy fallback:
DATA_WAREHOUSE_PROVIDER=clickhouse
CLICKHOUSE_HOST=localhost
CLICKHOUSE_PORT=8123
CLICKHOUSE_USERNAME=default
CLICKHOUSE_PASSWORD=password
CLICKHOUSE_DATABASE=analytics
CLICKHOUSE_PROTOCOL=http

# Disable analytics writes (while keeping the warehouse)
# ANALYTICS_ADAPTER=noop
```

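As a rough illustration of how the `CLICKHOUSE_*` variables fit together, the sketch below assembles them into a single settings object with an HTTP endpoint URL. `readClickhouseSettings` and `ClickhouseSettings` are hypothetical names, and the fallback values are assumptions of this sketch (they mirror the example values above and ClickHouse's stock defaults), not necessarily what the real adapter does.

```typescript
type Env = Record<string, string | undefined>;

interface ClickhouseSettings {
  url: string;
  username: string;
  password: string;
  database: string;
}

// Combine protocol, host, and port into one endpoint URL and collect the credentials.
// ASSUMPTION: the fallback values below are illustrative defaults chosen for this sketch.
function readClickhouseSettings(env: Env): ClickhouseSettings {
  const protocol = env['CLICKHOUSE_PROTOCOL'] || 'http';
  const host = env['CLICKHOUSE_HOST'] || 'localhost';
  const port = env['CLICKHOUSE_PORT'] || '8123';
  return {
    url: `${protocol}://${host}:${port}`,
    username: env['CLICKHOUSE_USERNAME'] || 'default',
    password: env['CLICKHOUSE_PASSWORD'] || '',
    database: env['CLICKHOUSE_DATABASE'] || 'default',
  };
}
```

With the example configuration above, the resulting URL would be `http://localhost:8123` and the database `analytics`.
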
## How It Works

### Three Interfaces

**1. IDataWarehouse** - Raw SQL queries
```typescript
await dataWarehouse.query('SELECT * FROM users', tracer);
// Inside a transaction, the `query` callback takes the SQL and its parameters.
await dataWarehouse.transaction(async (query) => {
  await query('UPDATE users SET score = :1', [100]);
  await query('INSERT INTO audit_log VALUES (:1)', [userId]);
});
```

**2. IDataWarehouseDialect** - Type-safe Kysely queries
```typescript
const kysely = dialect.getKyselyInstance();
await kysely.selectFrom('users').selectAll().execute();
```

**3. IDataWarehouseAnalytics** - Bulk writes & logging
```typescript
await analytics.bulkWrite('RULE_EXECUTIONS', [
  { ds: '2024-01-01', ts: Date.now(), org_id: 'org1', /* ...remaining fields */ }
]);
```

### How Loggers Work

**All analytics loggers use the abstraction:**

```typescript
// server/services/analyticsLoggers/RuleExecutionLogger.ts
class RuleExecutionLogger {
  constructor(private readonly analytics: Dependencies['DataWarehouseAnalytics']) {}

  async logRuleExecutions(executions: any[]) {
    await this.analytics.bulkWrite('RULE_EXECUTIONS', executions);
  }
}

export default inject(['DataWarehouseAnalytics'], RuleExecutionLogger);
```

**What happens:**
1. The service calls `logger.logRuleExecutions(data)`.
2. The logger calls `analytics.bulkWrite('RULE_EXECUTIONS', data)`.
3. The configured adapter writes the rows:
   - **ClickHouse**: chunked JSONEachRow inserts over HTTP (default batches of 500 rows)
   - **PostgreSQL**: buffers rows, then writes them with COPY or a batch INSERT

**No warehouse-specific code in loggers!** They just call `bulkWrite()`.

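To make the ClickHouse path concrete, here is a minimal sketch of chunking rows and serializing each chunk as JSONEachRow, which is newline-delimited JSON with one object per line. The function names are hypothetical and the sketch only builds request bodies; the actual adapter's HTTP handling, retries, and error reporting are not shown and may differ.

```typescript
// Split rows into fixed-size batches; the last batch may be smaller.
function chunk<T>(rows: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < rows.length; i += size) {
    batches.push(rows.slice(i, i + size));
  }
  return batches;
}

// JSONEachRow: each row becomes one JSON object on its own line.
function toJsonEachRow(rows: readonly object[]): string {
  return rows.map((row) => JSON.stringify(row)).join('\n');
}

// One request body per batch; 500 matches the default batch size mentioned above.
function buildInsertBodies(rows: readonly object[], batchSize = 500): string[] {
  return chunk(rows, batchSize).map(toJsonEachRow);
}
```

Each returned string would be sent as the body of a separate insert request, so 1,200 rows with the default batch size produce three requests.
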
### Data Flow

#### ClickHouse / PostgreSQL (direct)
```
RuleExecutionLogger
  ↓
DataWarehouseAnalytics.bulkWrite()
  ↓
ClickhouseAnalyticsAdapter / PostgresAnalyticsAdapter
  ↓
HTTP JSONEachRow (ClickHouse) or batched INSERT (PostgreSQL)
  ↓
Analytics tables
```

## Required Tables

All warehouses need these tables. Schema types are defined in `/server/storage/dataWarehouse/IDataWarehouseAnalytics.ts`.

**Core tables:**
- `RULE_EXECUTIONS` - Rule evaluation logs
- `ACTION_EXECUTIONS` - Moderation action logs
- `ITEM_MODEL_SCORES_LOG` - ML model prediction logs
- `CONTENT_API_REQUESTS` - API request logs

ClickHouse DDL lives alongside the rest of our migrations at
`db/src/scripts/clickhouse/`. Add new files there when the schema evolves.

**Migration examples:**

### ClickHouse
```sql
CREATE TABLE rule_executions (
  ds Date,
  ts UInt64,
  org_id String,
  rule_id String,
  passed UInt8,
  result String, -- JSON as string
  -- ... ~20 more fields
) ENGINE = MergeTree()
PARTITION BY ds
ORDER BY (ds, ts, org_id);
```

### PostgreSQL
```sql
CREATE TABLE rule_executions (
  ds DATE,
  ts BIGINT,
  org_id VARCHAR(255),
  rule_id VARCHAR(255),
  passed BOOLEAN,
  result JSONB,
  -- ... ~20 more fields
) PARTITION BY RANGE (ds);
```

**Full schema:** See `/server/storage/dataWarehouse/IDataWarehouseAnalytics.ts` lines 23-140.

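The two DDL examples differ in how they store `passed` (`UInt8` versus `BOOLEAN`) and `result` (a JSON string versus `JSONB`). The sketch below shows one way a warehouse-neutral row could be converted to the ClickHouse column types. The type and function names are hypothetical, only the columns visible in the DDL are modeled, and whether the real adapters perform this conversion in exactly this way is an assumption.

```typescript
// Warehouse-neutral shape, limited to the columns shown in the DDL above.
interface RuleExecutionRow {
  ds: string; // partition date, e.g. '2024-01-01'
  ts: number; // epoch milliseconds, e.g. Date.now()
  org_id: string;
  rule_id: string;
  passed: boolean;
  result: unknown;
}

// ClickHouse variant: UInt8 for the flag, String for the JSON payload.
interface ClickhouseRuleExecutionRow extends Omit<RuleExecutionRow, 'passed' | 'result'> {
  passed: 0 | 1;
  result: string;
}

function toClickhouseRow(row: RuleExecutionRow): ClickhouseRuleExecutionRow {
  return {
    ...row,
    passed: row.passed ? 1 : 0,
    // Serialize the payload; a missing result is stored as the JSON literal null.
    result: JSON.stringify(row.result ?? null),
  };
}
```

For PostgreSQL the neutral row could be passed through largely unchanged, since `BOOLEAN` and `JSONB` map directly onto `boolean` and structured values.
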
## Implementing a Custom Warehouse

### Step 1: Implement an `IWarehouseAdapter` plugin

Create a warehouse adapter under `server/plugins/warehouse/adapters`:

```typescript
// server/plugins/warehouse/adapters/MyWarehouseAdapter.ts
import type SafeTracer from '../../../utils/SafeTracer.js';
import type { IWarehouseAdapter } from '../IWarehouseAdapter.js';
import {
  type WarehouseQueryFn,
  type WarehouseQueryResult,
  type WarehouseTransactionFn,
} from '../types.js';

// `SomeWarehouseClient` is a placeholder for your warehouse's client library.
export class MyWarehouseAdapter implements IWarehouseAdapter {
  readonly name = 'my-warehouse';

  constructor(private readonly client: SomeWarehouseClient, private readonly tracer?: SafeTracer) {}

  start(): void {
    // Optional: warm up connection pools
  }

  async query<T = WarehouseQueryResult>(sql: string, params: readonly unknown[] = []): Promise<readonly T[]> {
    const execute = async () => {
      const rows = await this.client.execute(sql, params);
      return rows as readonly T[];
    };

    // Wrap the call in a span when a tracer was provided.
    return this.tracer
      ? (this.tracer.addActiveSpan({ resource: 'my-warehouse.query', operation: 'query' }, execute) as Promise<readonly T[]>)
      : execute();
  }

  async transaction<T>(fn: WarehouseTransactionFn<T>): Promise<T> {
    return this.client.transaction(async () => fn((statement, parameters) => this.query(statement, parameters)));
  }

  // This adapter does not buffer, so there is nothing to flush.
  async flush(): Promise<void> {}

  async close(): Promise<void> {
    await this.client.close();
  }
}
```

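The queries in this document use `:1`-style positional placeholders. If the client library behind your adapter expects a different syntax, the adapter is a natural place to translate. The sketch below rewrites `:N` into PostgreSQL-style `$N`; it is a hypothetical helper, not something the repository is known to contain, and it is deliberately simple: it skips single-quoted string literals but does not understand SQL comments, quoted identifiers, or array slices such as `arr[1:2]`.

```typescript
// Rewrite `:1`, `:2`, ... into `$1`, `$2`, ... while leaving quoted string literals untouched.
function toDollarPlaceholders(sql: string): string {
  // First alternative consumes a whole '...' literal ('' is an escaped quote);
  // second alternative captures the digits of a `:N` placeholder.
  const pattern = /'(?:[^']|'')*'|:(\d+)/g;
  return sql.replace(pattern, (match: string, index?: string) =>
    index === undefined ? match : `$${index}`,
  );
}
```

Inside an adapter, `query` would call this helper on `sql` before handing the statement to the client.
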
### Step 2: Provide an `IDataWarehouseDialect` (Kysely) implementation

If you need type-safe queries, create a dialect wrapper (see `ClickhouseKyselyAdapter` for a concrete example) and return it from `DataWarehouseFactory.createKyselyDialect`.

### Step 3: Implement an `IAnalyticsAdapter` plugin

Analytics adapters live under `server/plugins/analytics/adapters` and implement bulk writes plus optional CDC:

```typescript
// server/plugins/analytics/adapters/MyAnalyticsAdapter.ts
import type { IAnalyticsAdapter } from '../IAnalyticsAdapter.js';
import {
  type AnalyticsEventInput,
  type AnalyticsQueryResult,
  type AnalyticsWriteOptions,
} from '../types.js';

export class MyAnalyticsAdapter implements IAnalyticsAdapter {
  readonly name = 'my-analytics';

  constructor(private readonly client: SomeWarehouseClient) {}

  async writeEvents(table: string, events: readonly AnalyticsEventInput[], _options?: AnalyticsWriteOptions): Promise<void> {
    if (events.length === 0) {
      return;
    }
    await this.client.insert(table, events);
  }

  async query<T = AnalyticsQueryResult>(sql: string, params: readonly unknown[] = []): Promise<readonly T[]> {
    return (await this.client.query(sql, params)) as readonly T[];
  }

  async flush(): Promise<void> {}

  async close(): Promise<void> {
    await this.client.close();
  }
}
```

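The adapter above writes every call straight through, which leaves `flush()` empty. If your warehouse prefers fewer, larger inserts, a buffering layer is one option. The sketch below is a hypothetical, self-contained illustration of that idea: `BufferedEventWriter`, `Sink`, and the threshold of 500 are assumptions, and it does not implement the real `IAnalyticsAdapter` interface.

```typescript
type EventRow = Record<string, unknown>;
type Sink = (table: string, rows: readonly EventRow[]) => Promise<void>;

// Accumulates rows per table and hands them to the sink in larger batches.
class BufferedEventWriter {
  private readonly buffers = new Map<string, EventRow[]>();
  private readonly sink: Sink;
  private readonly maxBuffered: number;

  constructor(sink: Sink, maxBuffered = 500) {
    this.sink = sink;
    this.maxBuffered = maxBuffered;
  }

  async writeEvents(table: string, events: readonly EventRow[]): Promise<void> {
    if (events.length === 0) return;
    const buffer = this.buffers.get(table) ?? [];
    buffer.push(...events);
    this.buffers.set(table, buffer);
    // Write as soon as the table's buffer reaches the threshold.
    if (buffer.length >= this.maxBuffered) await this.flushTable(table);
  }

  // Write out whatever is still buffered, e.g. on shutdown.
  async flush(): Promise<void> {
    for (const table of Array.from(this.buffers.keys())) {
      await this.flushTable(table);
    }
  }

  private async flushTable(table: string): Promise<void> {
    const rows = this.buffers.get(table);
    if (!rows || rows.length === 0) return;
    // Detach the buffer first so writes arriving during the await start a fresh one.
    this.buffers.delete(table);
    await this.sink(table, rows);
  }
}
```

In an adapter, the sink would be something like `(table, rows) => client.insert(table, rows)`. A production version would also need to decide what happens to detached rows when the sink rejects.
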
### Step 4: Register the provider in `DataWarehouseFactory`

Update `DataWarehouseFactory.createDataWarehouse`, `createKyselyDialect`, and `createAnalyticsAdapter` to instantiate your plugins. The factory wraps them in bridges so the rest of the application only speaks the generic interfaces.

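How the factory maps an adapter name to an implementation is not shown here, so the following is only a sketch of one common pattern: a name-to-constructor registry with a clear error for unknown names. `registerAdapter`, `createAdapter`, and `WarehouseAdapterLike` are hypothetical; the real `DataWarehouseFactory` may instead use a `switch` statement or another mechanism.

```typescript
// Minimal stand-in for an adapter; the real interface has query, transaction, etc.
interface WarehouseAdapterLike {
  readonly name: string;
}

type AdapterFactory = () => WarehouseAdapterLike;

const registry = new Map<string, AdapterFactory>();

// Names are stored lowercase so WAREHOUSE_ADAPTER=My-Warehouse still resolves.
function registerAdapter(name: string, factory: AdapterFactory): void {
  registry.set(name.toLowerCase(), factory);
}

function createAdapter(name: string): WarehouseAdapterLike {
  const factory = registry.get(name.toLowerCase());
  if (!factory) {
    const known = Array.from(registry.keys()).join(', ') || '(none)';
    throw new Error(`Unknown warehouse adapter "${name}". Registered: ${known}`);
  }
  return factory();
}

// Registering the adapter from Step 1 (represented here by a plain object).
registerAdapter('my-warehouse', () => ({ name: 'my-warehouse' }));
```

Listing the registered names in the error message makes a misspelled `WAREHOUSE_ADAPTER` value quick to diagnose at startup.
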
### Step 5: Create Analytics Tables

All warehouses need the same tables (schema in `IDataWarehouseAnalytics.ts`):

```sql
-- Adapt syntax for your warehouse
CREATE TABLE rule_executions (
  ds DATE,
  ts BIGINT,
  org_id VARCHAR,
  item_id VARCHAR,
  rule_id VARCHAR,
  passed BOOLEAN,
  result JSON, -- Or JSONB, String depending on warehouse
  -- ... see IDataWarehouseAnalytics.ts for all ~20 fields
);
```

### Step 6: Configure and Run

```bash
export WAREHOUSE_ADAPTER=your-warehouse
# Optional overrides
# export ANALYTICS_ADAPTER=your-warehouse
# Legacy fallback:
# export DATA_WAREHOUSE_PROVIDER=your-warehouse
export YOUR_WAREHOUSE_HOST=localhost
# ... other config vars

npm start
```

## How Services Consume Analytics Data

Services query analytics data using `DataWarehouseDialect`:

```typescript
// server/services/analyticsQueries/UserHistoryQueries.ts
class UserHistoryQueries {
  constructor(private readonly dialect: Dependencies['DataWarehouseDialect']) {}

  async getUserRuleExecutionsHistory(orgId: string, userId: string) {
    const kysely = this.dialect.getKyselyInstance();

    return kysely
      .selectFrom('RULE_EXECUTIONS')
      .where('ORG_ID', '=', orgId)
      .where('ITEM_CREATOR_ID', '=', userId)
      .selectAll()
      .execute();
  }
}

export default inject(['DataWarehouseDialect'], UserHistoryQueries);
```

**Works with any supported warehouse:**
- ClickHouse: Uses ClickhouseDialect
- PostgreSQL: Uses PostgresDialect

## Available IOC Services

| Service | Type | Purpose |
|---------|------|---------|
| `DataWarehouse` | `IDataWarehouse` | Raw SQL, transactions |
| `DataWarehouseDialect` | `IDataWarehouseDialect` | Type-safe queries |
| `DataWarehouseAnalytics` | `IDataWarehouseAnalytics` | Bulk writes, logging |

## File Structure

```
server/storage/dataWarehouse/
├── IDataWarehouse.ts              # Core interface
├── IDataWarehouseAnalytics.ts     # Analytics interface + schema types
├── DataWarehouseFactory.ts        # Instantiates adapters via env configuration
├── ClickhouseAdapter.ts           # 📝 Stub - implement this
├── ClickhouseAnalyticsAdapter.ts  # 📝 Stub - implement this
├── PostgresAnalyticsAdapter.ts    # 📝 Stub - implement this
└── index.ts

server/plugins/warehouse/          # Pluggable warehouse adapters
├── examples/NoOpWarehouseAdapter.ts
└── ...

server/plugins/analytics/          # Pluggable analytics adapters
├── examples/NoOpAnalyticsAdapter.ts
└── ...

server/services/analyticsLoggers/  # Warehouse-agnostic loggers
├── RuleExecutionLogger.ts         # Uses DataWarehouseAnalytics
├── ActionExecutionLogger.ts       # Uses DataWarehouseAnalytics
├── ItemModelScoreLogger.ts        # Uses DataWarehouseAnalytics
└── ...

server/services/analyticsQueries/  # Warehouse-agnostic queries
├── UserHistoryQueries.ts          # Uses DataWarehouseDialect
├── ItemHistoryQueries.ts          # Uses DataWarehouseDialect
└── ...
```

## References

- **Schema types:** `/server/storage/dataWarehouse/IDataWarehouseAnalytics.ts`
- **ClickHouse:** `server/plugins/warehouse` and `server/plugins/analytics` adapters
- **PostgreSQL migrations:** `db/src/scripts/api-server-pg/` (app DB); analytics tables may live in a dedicated analytics database per deployment
- **Loggers:** `/server/services/analyticsLoggers/`
- **Queries:** `/server/services/analyticsQueries/`