Mirror of https://github.com/roostorg/coop

# Data Warehouse Abstraction Layer

## Overview

The data warehouse abstraction lets application code run against different data warehouses without modification. The configuration examples below cover ClickHouse and PostgreSQL. Other warehouses (BigQuery, Redshift, Databricks, etc.) can be added by implementing the adapter interfaces described in "Implementing a Custom Warehouse". Switching warehouses is a configuration change: set `WAREHOUSE_ADAPTER` and the connection variables for that warehouse.

## Quick Start

```typescript
import { inject, type Dependencies } from '../iocContainer/index.js';
import type SafeTracer from '../utils/SafeTracer.js';

class MyService {
  constructor(private readonly dataWarehouse: Dependencies['DataWarehouse']) {}

  async getUserData(userId: string, tracer: SafeTracer) {
    // Positional parameters: :1 binds to the first element of the params array.
    return this.dataWarehouse.query(
      'SELECT * FROM users WHERE id = :1',
      tracer,
      [userId],
    );
  }
}

export default inject(['DataWarehouse'], MyService);
```

## Configuration

Select adapters with `WAREHOUSE_ADAPTER` and (optionally) `ANALYTICS_ADAPTER`.
Legacy deployments can keep using `DATA_WAREHOUSE_PROVIDER`; it is still accepted as a fallback.

### PostgreSQL
```bash
WAREHOUSE_ADAPTER=postgresql
ANALYTICS_ADAPTER=postgresql
# Legacy fallback (only needed by older deployments):
# DATA_WAREHOUSE_PROVIDER=postgresql
DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_NAME=analytics
DATABASE_USER=postgres
DATABASE_PASSWORD=password
```

### ClickHouse
```bash
WAREHOUSE_ADAPTER=clickhouse
# Optional: override analytics adapter
# ANALYTICS_ADAPTER=clickhouse
# Legacy fallback (only needed by older deployments):
# DATA_WAREHOUSE_PROVIDER=clickhouse
CLICKHOUSE_HOST=localhost
CLICKHOUSE_PORT=8123
CLICKHOUSE_USERNAME=default
CLICKHOUSE_PASSWORD=password
CLICKHOUSE_DATABASE=analytics
CLICKHOUSE_PROTOCOL=http

# Disable analytics writes (while keeping the warehouse)
# ANALYTICS_ADAPTER=noop
```

## How It Works

### Three Interfaces

**1. IDataWarehouse** - Raw SQL queries
```typescript
await dataWarehouse.query('SELECT * FROM users', tracer);
await dataWarehouse.transaction(async (query) => {
  await query('UPDATE users SET score = :1', [100]);
  await query('INSERT INTO audit_log VALUES (:1)', [userId]);
});
```

**2. IDataWarehouseDialect** - Type-safe Kysely queries
```typescript
const kysely = dialect.getKyselyInstance();
await kysely.selectFrom('users').selectAll().execute();
```

**3. IDataWarehouseAnalytics** - Bulk writes & logging
```typescript
await analytics.bulkWrite('RULE_EXECUTIONS', [
  { ds: '2024-01-01', ts: Date.now(), org_id: 'org1' /* , ...remaining columns */ },
]);
```

### How Loggers Work

**All analytics loggers use the abstraction:**

```typescript
// server/services/analyticsLoggers/RuleExecutionLogger.ts
import { inject, type Dependencies } from '../../iocContainer/index.js';

class RuleExecutionLogger {
  constructor(private readonly analytics: Dependencies['DataWarehouseAnalytics']) {}

  async logRuleExecutions(executions: any[]) {
    // The logger only knows the table name; the adapter decides how rows are written.
    await this.analytics.bulkWrite('RULE_EXECUTIONS', executions);
  }
}

export default inject(['DataWarehouseAnalytics'], RuleExecutionLogger);
```

**What happens:**
1. A service calls `logger.logRuleExecutions(data)`.
2. The logger calls `analytics.bulkWrite('RULE_EXECUTIONS', data)`.
3. The configured analytics adapter performs the write:
   - **ClickHouse**: chunked `JSONEachRow` inserts over HTTP (batches of 500 rows by default)
   - **PostgreSQL**: buffers rows, then writes them with `COPY` or batched `INSERT`

**No warehouse-specific code in loggers.** They only call `bulkWrite()`.

### Data Flow

#### ClickHouse / PostgreSQL (direct)
```
RuleExecutionLogger
        ↓
DataWarehouseAnalytics.bulkWrite()
        ↓
ClickhouseAnalyticsAdapter / PostgresAnalyticsAdapter
        ↓
HTTP JSONEachRow (ClickHouse) or batched INSERT (PostgreSQL)
        ↓
Analytics tables
```

## Required Tables

Every warehouse needs the tables listed below. Their schema types are defined in `/server/storage/dataWarehouse/IDataWarehouseAnalytics.ts`.
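
The DDL examples below partition `rule_executions` on the `ds` date column, while the `bulkWrite` example above supplies `ds` and `ts` separately (`ds: '2024-01-01', ts: Date.now()`). One way to keep the two from disagreeing is to derive `ds` from `ts` when rows are built. This is a minimal sketch that assumes `ts` is epoch milliseconds (what `Date.now()` returns) and that `ds` is the UTC calendar date; `RuleExecutionRowSketch`, `partitionDate`, and `withPartitionDate` are hypothetical names, and the real schema types in `IDataWarehouseAnalytics.ts` may define these columns differently.

```typescript
// Hypothetical subset of a RULE_EXECUTIONS row. The real schema types live in
// IDataWarehouseAnalytics.ts and include many more columns.
interface RuleExecutionRowSketch {
  ds: string; // partition date, 'YYYY-MM-DD'
  ts: number; // assumed: epoch milliseconds, as returned by Date.now()
  org_id: string;
  rule_id: string;
}

// Derive the partition date from the timestamp, assuming ds is a UTC date.
function partitionDate(tsMillis: number): string {
  // toISOString() always renders UTC, e.g. '2024-01-01T23:59:00.000Z'.
  return new Date(tsMillis).toISOString().slice(0, 10);
}

// Fill in ds from ts so the two columns cannot disagree.
function withPartitionDate(
  row: Omit<RuleExecutionRowSketch, 'ds'>,
): RuleExecutionRowSketch {
  return { ...row, ds: partitionDate(row.ts) };
}

const row = withPartitionDate({
  ts: Date.UTC(2024, 0, 1, 23, 59),
  org_id: 'org1',
  rule_id: 'rule1',
});
console.log(row.ds); // prints 2024-01-01
```

If a deployment defines `ds` in a local time zone instead of UTC, `partitionDate` is the one place that would need to change.
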

**Core tables:**
- `RULE_EXECUTIONS` - Rule evaluation logs
- `ACTION_EXECUTIONS` - Moderation action logs
- `ITEM_MODEL_SCORES_LOG` - ML model prediction logs
- `CONTENT_API_REQUESTS` - API request logs

ClickHouse DDL lives alongside the rest of our migrations at
`db/src/scripts/clickhouse/`. Add new files there when the schema evolves.

**Migration examples:**

### ClickHouse
```sql
CREATE TABLE rule_executions (
  ds Date,
  ts UInt64,
  org_id String,
  rule_id String,
  passed UInt8,
  result String, -- JSON as string
  -- ... ~20 more fields
) ENGINE = MergeTree()
PARTITION BY ds
ORDER BY (ds, ts, org_id);
```

### PostgreSQL
```sql
CREATE TABLE rule_executions (
  ds DATE,
  ts BIGINT,
  org_id VARCHAR(255),
  rule_id VARCHAR(255),
  passed BOOLEAN,
  result JSONB,
  -- ... ~20 more fields
) PARTITION BY RANGE (ds);
```

**Full schema:** See `/server/storage/dataWarehouse/IDataWarehouseAnalytics.ts` lines 23-140.
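
The two DDL examples store the same logical row differently: `passed` is `UInt8` in ClickHouse but `BOOLEAN` in PostgreSQL, and `result` is a JSON-encoded `String` in ClickHouse but `JSONB` in PostgreSQL. Together with the earlier note that ClickHouse writes go out as chunked `JSONEachRow` inserts of 500 rows, a minimal sketch of normalizing rows for ClickHouse and splitting them into `JSONEachRow` request bodies might look like this. The row shape, the function names, and the assumption that this conversion happens in the analytics adapter are all illustrative; the real `ClickhouseAnalyticsAdapter` may do it differently.

```typescript
// Hypothetical logical row shared by all warehouses (subset of RULE_EXECUTIONS).
interface RuleExecutionSketch {
  ds: string;
  ts: number;
  org_id: string;
  rule_id: string;
  passed: boolean;
  result: unknown; // structured JSON value
}

// Map the logical row onto the ClickHouse column types from the DDL above:
// passed -> UInt8 (0/1), result -> JSON encoded as a String.
function toClickhouseRow(row: RuleExecutionSketch): Record<string, unknown> {
  return {
    ...row,
    passed: row.passed ? 1 : 0,
    result: JSON.stringify(row.result),
  };
}

// JSONEachRow is newline-delimited JSON: one object per line.
// Split rows into request bodies of at most `batchSize` rows
// (500 mirrors the default batch size mentioned earlier).
function toJsonEachRowBatches(
  rows: readonly RuleExecutionSketch[],
  batchSize = 500,
): string[] {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const bodies: string[] = [];
  for (let i = 0; i < rows.length; i += batchSize) {
    const chunk = rows.slice(i, i + batchSize);
    bodies.push(chunk.map((r) => JSON.stringify(toClickhouseRow(r))).join('\n'));
  }
  return bodies;
}

const rows = Array.from({ length: 1201 }, (_, i) => ({
  ds: '2024-01-01',
  ts: i,
  org_id: 'org1',
  rule_id: 'r1',
  passed: i % 2 === 0,
  result: { score: i },
}));
const bodies = toJsonEachRowBatches(rows);
console.log(bodies.length); // prints 3 (500 + 500 + 201 rows)
```

Keeping the logical row warehouse-neutral and converting only at the edge is what lets loggers stay free of warehouse-specific code; a PostgreSQL adapter would pass `passed` and `result` through as a boolean and a JSON value instead.
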

## Implementing a Custom Warehouse

### Step 1: Implement an `IWarehouseAdapter` plugin

Create a warehouse adapter under `server/plugins/warehouse/adapters`:

```typescript
// server/plugins/warehouse/adapters/MyWarehouseAdapter.ts
import type SafeTracer from '../../../utils/SafeTracer.js';
import type { IWarehouseAdapter } from '../IWarehouseAdapter.js';
import {
  type WarehouseQueryResult,
  type WarehouseTransactionFn,
} from '../types.js';

// SomeWarehouseClient stands in for your warehouse's client library type.

export class MyWarehouseAdapter implements IWarehouseAdapter {
  readonly name = 'my-warehouse';

  constructor(
    private readonly client: SomeWarehouseClient,
    private readonly tracer?: SafeTracer,
  ) {}

  start(): void {
    // Optional: warm up connection pools
  }

  async query<T = WarehouseQueryResult>(sql: string, params: readonly unknown[] = []): Promise<readonly T[]> {
    const execute = async () => {
      const rows = await this.client.execute(sql, params);
      return rows as readonly T[];
    };

    // Wrap the call in a tracing span when a tracer was provided.
    return this.tracer
      ? (this.tracer.addActiveSpan({ resource: 'my-warehouse.query', operation: 'query' }, execute) as Promise<readonly T[]>)
      : execute();
  }

  async transaction<T>(fn: WarehouseTransactionFn<T>): Promise<T> {
    // With many clients, statements only run inside the transaction if they use the
    // transaction's own connection; adapt this call to your client's transaction API.
    return this.client.transaction(async () => fn((statement, parameters) => this.query(statement, parameters)));
  }

  async flush(): Promise<void> {}

  async close(): Promise<void> {
    await this.client.close();
  }
}
```

### Step 2: Provide an `IDataWarehouseDialect` (Kysely) implementation

If you need type-safe queries, create a dialect wrapper (see `ClickhouseKyselyAdapter` for a concrete example) and return it from `DataWarehouseFactory.createKyselyDialect`.
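
The raw SQL examples in this document use positional `:1`, `:2` placeholders, while many client libraries expect their own syntax; node-postgres, for example, uses `$1`, `$2`. If your warehouse client is one of these, the Step 1 adapter's `query` method may need to rewrite placeholders before calling the client. A minimal sketch, assuming placeholders are always a colon followed by digits: it leaves `::` casts such as `::int` alone, but it does not parse string literals or comments, so a `:30` inside a quoted `'12:30'` would also be treated as a placeholder. `toDollarPlaceholders` is a hypothetical helper, not part of the codebase.

```typescript
// Matches either a '::' cast operator (left untouched) or a ':<digits>' placeholder.
// Hypothetical helper; string literals and comments are NOT skipped.
const PLACEHOLDER_OR_CAST = /(::)|:(\d+)/g;

// Rewrite ':1'-style placeholders into node-postgres-style '$1' placeholders,
// checking that each index refers to an existing parameter.
function toDollarPlaceholders(sql: string, params: readonly unknown[]): string {
  return sql.replace(
    PLACEHOLDER_OR_CAST,
    (match: string, cast: string | undefined, digits: string | undefined) => {
      if (cast !== undefined || digits === undefined) {
        return match; // keep '::' casts such as '::int' as-is
      }
      const index = Number(digits);
      if (index < 1 || index > params.length) {
        throw new RangeError(`Placeholder :${digits} has no matching parameter`);
      }
      return `$${index}`;
    },
  );
}

console.log(toDollarPlaceholders('INSERT INTO audit_log VALUES (:1)', ['user-1']));
// prints INSERT INTO audit_log VALUES ($1)
```

Inside the Step 1 adapter, `query` could then pass `toDollarPlaceholders(sql, params)` together with the unchanged `params` to the client. Whether this is needed at all depends on the client library and on whether the factory's bridges already translate parameters, which this sketch does not assume either way.
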

### Step 3: Implement an `IAnalyticsAdapter` plugin

Analytics adapters live under `server/plugins/analytics/adapters`. They implement bulk writes and, optionally, change data capture (CDC):

```typescript
// server/plugins/analytics/adapters/MyAnalyticsAdapter.ts
import type { IAnalyticsAdapter } from '../IAnalyticsAdapter.js';
import {
  type AnalyticsEventInput,
  type AnalyticsQueryResult,
  type AnalyticsWriteOptions,
} from '../types.js';

// SomeWarehouseClient stands in for your warehouse's client library type.

export class MyAnalyticsAdapter implements IAnalyticsAdapter {
  readonly name = 'my-analytics';

  constructor(private readonly client: SomeWarehouseClient) {}

  async writeEvents(
    table: string,
    events: readonly AnalyticsEventInput[],
    _options?: AnalyticsWriteOptions,
  ): Promise<void> {
    // Skip the round trip entirely for empty batches.
    if (events.length === 0) {
      return;
    }
    await this.client.insert(table, events);
  }

  async query<T = AnalyticsQueryResult>(sql: string, params: readonly unknown[] = []): Promise<readonly T[]> {
    return (await this.client.query(sql, params)) as readonly T[];
  }

  async flush(): Promise<void> {}

  async close(): Promise<void> {
    await this.client.close();
  }
}
```

### Step 4: Register the provider in `DataWarehouseFactory`

Update `DataWarehouseFactory.createDataWarehouse`, `createKyselyDialect`, and `createAnalyticsAdapter` to instantiate your plugins. The factory wraps them in bridges, so the rest of the application depends only on the generic interfaces.

### Step 5: Create Analytics Tables

All warehouses need the same tables (schema in `IDataWarehouseAnalytics.ts`):

```sql
-- Adapt syntax for your warehouse
CREATE TABLE rule_executions (
  ds DATE,
  ts BIGINT,
  org_id VARCHAR,
  item_id VARCHAR,
  rule_id VARCHAR,
  passed BOOLEAN,
  result JSON, -- or JSONB / String, depending on the warehouse
  -- ... see IDataWarehouseAnalytics.ts for all ~20 fields
);
```

### Step 6: Configure and Run

```bash
export WAREHOUSE_ADAPTER=your-warehouse
# Optional overrides
# export ANALYTICS_ADAPTER=your-warehouse
# Legacy fallback:
# export DATA_WAREHOUSE_PROVIDER=your-warehouse
export YOUR_WAREHOUSE_HOST=localhost
# ... other config vars

npm start
```

## How Services Consume Analytics Data

Services query analytics data using `DataWarehouseDialect`:

```typescript
// server/services/analyticsQueries/UserHistoryQueries.ts
import { inject, type Dependencies } from '../../iocContainer/index.js';

class UserHistoryQueries {
  constructor(private readonly dialect: Dependencies['DataWarehouseDialect']) {}

  async getUserRuleExecutionsHistory(orgId: string, userId: string) {
    const kysely = this.dialect.getKyselyInstance();

    return kysely
      .selectFrom('RULE_EXECUTIONS')
      .where('ORG_ID', '=', orgId)
      .where('ITEM_CREATOR_ID', '=', userId)
      .selectAll()
      .execute();
  }
}

export default inject(['DataWarehouseDialect'], UserHistoryQueries);
```

**Works with any supported warehouse:**
- ClickHouse: uses `ClickhouseDialect`
- PostgreSQL: uses `PostgresDialect`

## Available IOC Services

| Service | Type | Purpose |
|---------|------|---------|
| `DataWarehouse` | `IDataWarehouse` | Raw SQL, transactions |
| `DataWarehouseDialect` | `IDataWarehouseDialect` | Type-safe queries |
| `DataWarehouseAnalytics` | `IDataWarehouseAnalytics` | Bulk writes, logging |

## File Structure

```
server/storage/dataWarehouse/
├── IDataWarehouse.ts              # Core interface
├── IDataWarehouseAnalytics.ts     # Analytics interface + schema types
├── DataWarehouseFactory.ts        # Instantiates adapters via env configuration
├── ClickhouseAdapter.ts           # 📝 Stub - implement this
├── ClickhouseAnalyticsAdapter.ts  # 📝 Stub - implement this
├── PostgresAnalyticsAdapter.ts    # 📝 Stub - implement this
└── index.ts

server/plugins/warehouse/          # Pluggable warehouse adapters
├── examples/NoOpWarehouseAdapter.ts
└── ...

server/plugins/analytics/          # Pluggable analytics adapters
├── examples/NoOpAnalyticsAdapter.ts
└── ...

server/services/analyticsLoggers/  # Warehouse-agnostic loggers
├── RuleExecutionLogger.ts         # Uses DataWarehouseAnalytics
├── ActionExecutionLogger.ts       # Uses DataWarehouseAnalytics
├── ItemModelScoreLogger.ts        # Uses DataWarehouseAnalytics
└── ...

server/services/analyticsQueries/  # Warehouse-agnostic queries
├── UserHistoryQueries.ts          # Uses DataWarehouseDialect
├── ItemHistoryQueries.ts          # Uses DataWarehouseDialect
└── ...
```

## References

- **Schema types:** `server/storage/dataWarehouse/IDataWarehouseAnalytics.ts`
- **ClickHouse:** adapters in `server/plugins/warehouse` and `server/plugins/analytics`
- **PostgreSQL migrations:** `db/src/scripts/api-server-pg/` (app DB); analytics tables may live in a dedicated analytics database, depending on the deployment
- **Loggers:** `server/services/analyticsLoggers/`
- **Queries:** `server/services/analyticsQueries/`
348server/plugins/warehouse/ # Pluggable warehouse adapters 349├── examples/NoOpWarehouseAdapter.ts 350└── ... 351 352server/plugins/analytics/ # Pluggable analytics adapters 353├── examples/NoOpAnalyticsAdapter.ts 354└── ... 355 356server/services/analyticsLoggers/ # Warehouse-agnostic loggers 357├── RuleExecutionLogger.ts # Uses DataWarehouseAnalytics 358├── ActionExecutionLogger.ts # Uses DataWarehouseAnalytics 359├── ItemModelScoreLogger.ts # Uses DataWarehouseAnalytics 360└── ... 361 362server/services/analyticsQueries/ # Warehouse-agnostic queries 363├── UserHistoryQueries.ts # Uses DataWarehouseDialect 364├── ItemHistoryQueries.ts # Uses DataWarehouseDialect 365└── ... 366``` 367 368## References 369 370- **Schema types:** `/server/storage/dataWarehouse/IDataWarehouseAnalytics.ts` 371- **Clickhouse:** `server/plugins/warehouse` and `server/plugins/analytics` adapters 372- **PostgreSQL migrations:** `db/src/scripts/api-server-pg/` (app DB); analytics tables may live in a dedicated analytics database per deployment 373- **Loggers:** `/server/services/analyticsLoggers/` 374- **Queries:** `/server/services/analyticsQueries/`