Mirror of https://github.com/roostorg/coop

Data Warehouse Abstraction Layer#

Overview#

The data warehouse abstraction lets you use any data warehouse (Snowflake, Clickhouse, PostgreSQL, BigQuery, Redshift, Databricks, etc.) without changing application code. Switch backends by changing a single environment variable (WAREHOUSE_ADAPTER); each adapter then reads its own connection settings.

Quick Start#

import { inject, type Dependencies } from '../iocContainer/index.js';
import type SafeTracer from '../utils/SafeTracer.js';

class MyService {
  constructor(private readonly dataWarehouse: Dependencies['DataWarehouse']) {}
  
  async getUserData(userId: string, tracer: SafeTracer) {
    return this.dataWarehouse.query(
      'SELECT * FROM users WHERE id = :1',
      tracer,
      [userId]
    );
  }
}

export default inject(['DataWarehouse'], MyService);

Configuration#

Select adapters with WAREHOUSE_ADAPTER and (optionally) ANALYTICS_ADAPTER.
Legacy deployments can keep using DATA_WAREHOUSE_PROVIDER; it is still accepted as a fallback.
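The resolution order above can be sketched in a few lines. Only the environment variable names come from this document; the helper functions and the error message are illustrative, not the factory's actual code:

```typescript
// Sketch of the adapter-resolution order: WAREHOUSE_ADAPTER wins,
// DATA_WAREHOUSE_PROVIDER is the legacy fallback, and ANALYTICS_ADAPTER
// is optional. These helpers are illustrative.
type Env = Record<string, string | undefined>;

function resolveWarehouseAdapter(env: Env): string {
  const adapter = env.WAREHOUSE_ADAPTER ?? env.DATA_WAREHOUSE_PROVIDER;
  if (!adapter) {
    throw new Error('Set WAREHOUSE_ADAPTER (or legacy DATA_WAREHOUSE_PROVIDER)');
  }
  return adapter;
}

function resolveAnalyticsAdapter(env: Env): string {
  // When ANALYTICS_ADAPTER is unset, analytics follows the warehouse choice.
  return env.ANALYTICS_ADAPTER ?? resolveWarehouseAdapter(env);
}
```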

Snowflake#

WAREHOUSE_ADAPTER=snowflake
ANALYTICS_ADAPTER=snowflake
# Legacy fallback:
DATA_WAREHOUSE_PROVIDER=snowflake
SNOWFLAKE_ACCOUNT=your_account
SNOWFLAKE_USERNAME=user
SNOWFLAKE_PASSWORD=pass
SNOWFLAKE_DB_NAME=analytics
SNOWFLAKE_WAREHOUSE=COMPUTE_WH
SNOWFLAKE_SCHEMA=PUBLIC

PostgreSQL#

WAREHOUSE_ADAPTER=postgresql
ANALYTICS_ADAPTER=postgresql
# Legacy fallback:
DATA_WAREHOUSE_PROVIDER=postgresql
DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_NAME=analytics
DATABASE_USER=postgres
DATABASE_PASSWORD=password

Clickhouse#

WAREHOUSE_ADAPTER=clickhouse
# Optional: override analytics adapter
# ANALYTICS_ADAPTER=clickhouse
# Legacy fallback:
DATA_WAREHOUSE_PROVIDER=clickhouse
CLICKHOUSE_HOST=localhost
CLICKHOUSE_PORT=8123
CLICKHOUSE_USERNAME=default
CLICKHOUSE_PASSWORD=password
CLICKHOUSE_DATABASE=analytics
CLICKHOUSE_PROTOCOL=http

# Disable analytics writes (while keeping the warehouse)
# ANALYTICS_ADAPTER=noop

How It Works#

Three Interfaces#

1. IDataWarehouse - Raw SQL queries

await dataWarehouse.query('SELECT * FROM users', tracer);
await dataWarehouse.transaction(async (query) => {
  await query('UPDATE users SET score = :1', [100]);
  await query('INSERT INTO audit_log VALUES (:1)', [userId]);
});

2. IDataWarehouseDialect - Type-safe Kysely queries

const kysely = dialect.getKyselyInstance();
await kysely.selectFrom('users').selectAll().execute();

3. IDataWarehouseAnalytics - Bulk writes & logging

await analytics.bulkWrite('RULE_EXECUTIONS', [
  { ds: '2024-01-01', ts: Date.now(), org_id: 'org1' /* ...more fields */ },
]);

How Loggers Work#

All analytics loggers use the abstraction:

// server/services/analyticsLoggers/RuleExecutionLogger.ts
class RuleExecutionLogger {
  constructor(private readonly analytics: Dependencies['DataWarehouseAnalytics']) {}
  
  async logRuleExecutions(executions: any[]) {
    await this.analytics.bulkWrite('RULE_EXECUTIONS', executions);
  }
}

export default inject(['DataWarehouseAnalytics'], RuleExecutionLogger);

What happens:

  1. Service calls logger.logRuleExecutions(data)
  2. Logger calls analytics.bulkWrite('RULE_EXECUTIONS', data)
  3. For Snowflake: Buffers → Kafka → Worker → Snowflake (high-throughput)
  4. For Clickhouse: Chunked JSONEachRow inserts over HTTP (default batches of 500 rows)
  5. For PostgreSQL: Buffers → COPY or batch INSERT

No warehouse-specific code in loggers! They just call bulkWrite().
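The direct-insert paths in steps 4 and 5 come down to slicing the row buffer into fixed-size batches. A minimal sketch of that chunking, using the documented Clickhouse default of 500 rows (the helper itself is illustrative):

```typescript
// Split rows into batches before inserting. 500 is the documented default
// batch size for the Clickhouse path; chunk() is an illustrative helper.
function chunk<T>(rows: readonly T[], size = 500): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < rows.length; i += size) {
    batches.push(rows.slice(i, i + size) as T[]);
  }
  return batches;
}
```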

Data Flow#

Snowflake (High-Throughput)#

RuleExecutionLogger
    ↓
DataWarehouseAnalytics.bulkWrite()
    ↓
SnowflakeAnalyticsAdapter
    ↓
DataLoader (batches 200 rows)
    ↓
Kafka Topic: DATA_WAREHOUSE_INGEST_EVENTS
    ↓
SnowflakeIngestionWorker
    ↓
Snowflake Tables

Clickhouse/PostgreSQL (Direct)#

RuleExecutionLogger
    ↓
DataWarehouseAnalytics.bulkWrite()
    ↓
ClickhouseAnalyticsAdapter
    ↓
Chunk rows (default size 500)
    ↓
HTTP JSONEachRow INSERT into Clickhouse
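JSONEachRow in the diagram above means one JSON object per newline-separated line in the HTTP request body. A minimal sketch of that encoding (the helper is illustrative; the real adapter also handles chunking and error handling):

```typescript
// Encode rows in Clickhouse's JSONEachRow format: one JSON document per line.
function toJSONEachRow(rows: readonly Record<string, unknown>[]): string {
  return rows.map((row) => JSON.stringify(row)).join('\n');
}
```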

Required Tables#

All warehouses need these tables; the schema types are defined in /server/storage/dataWarehouse/IDataWarehouseAnalytics.ts.

Core tables:

  • RULE_EXECUTIONS - Rule evaluation logs
  • ACTION_EXECUTIONS - Moderation action logs
  • ITEM_MODEL_SCORES_LOG - ML model prediction logs
  • CONTENT_API_REQUESTS - API request logs

ClickHouse DDL lives alongside the rest of our migrations at
.devops/migrator/src/scripts/clickhouse/. Add new files there when the schema evolves.

Migration examples:

Snowflake#

CREATE TABLE RULE_EXECUTIONS (
  DS DATE,
  TS NUMBER,
  ORG_ID VARCHAR,
  RULE_ID VARCHAR,
  PASSED BOOLEAN,
  RESULT VARIANT,  -- JSON
  -- ... ~20 more fields, see IDataWarehouseAnalytics.ts
);

Clickhouse#

CREATE TABLE rule_executions (
  ds Date,
  ts UInt64,
  org_id String,
  rule_id String,
  passed UInt8,
  result String,  -- JSON as string
  -- ... ~20 more fields
) ENGINE = MergeTree()
PARTITION BY ds
ORDER BY (ds, ts, org_id);

PostgreSQL#

CREATE TABLE rule_executions (
  ds DATE,
  ts BIGINT,
  org_id VARCHAR(255),
  rule_id VARCHAR(255),
  passed BOOLEAN,
  result JSONB,
  -- ... ~20 more fields
) PARTITION BY RANGE (ds);

Full schema: See /server/storage/dataWarehouse/IDataWarehouseAnalytics.ts lines 23-140.

Implementing a Custom Warehouse#

Step 1: Implement an IWarehouseAdapter plugin#

Create a warehouse adapter under server/plugins/warehouse/adapters:

// server/plugins/warehouse/adapters/MyWarehouseAdapter.ts
import type SafeTracer from '../../../utils/SafeTracer.js';
import type { IWarehouseAdapter } from '../IWarehouseAdapter.js';
import {
  type WarehouseQueryFn,
  type WarehouseQueryResult,
  type WarehouseTransactionFn,
} from '../types.js';

export class MyWarehouseAdapter implements IWarehouseAdapter {
  readonly name = 'my-warehouse';

  constructor(private readonly client: SomeWarehouseClient, private readonly tracer?: SafeTracer) {}

  start(): void {
    // Optional: warm up connection pools
  }

  async query<T = WarehouseQueryResult>(sql: string, params: readonly unknown[] = []): Promise<readonly T[]> {
    const execute = async () => {
      const rows = await this.client.execute(sql, params);
      return rows as readonly T[];
    };

    return this.tracer
      ? (this.tracer.addActiveSpan({ resource: 'my-warehouse.query', operation: 'query' }, execute) as Promise<readonly T[]>)
      : execute();
  }

  async transaction<T>(fn: WarehouseTransactionFn<T>): Promise<T> {
    return this.client.transaction(async () => fn((statement, parameters) => this.query(statement, parameters)));
  }

  async flush(): Promise<void> {}

  async close(): Promise<void> {
    await this.client.close();
  }
}

Step 2: Provide an IDataWarehouseDialect (Kysely) implementation#

If you need type-safe queries, create a dialect wrapper (see ClickhouseKyselyAdapter for a concrete example) and return it from DataWarehouseFactory.createKyselyDialect.

Step 3: Implement an IAnalyticsAdapter plugin#

Analytics adapters live under server/plugins/analytics/adapters and implement bulk writes plus optional CDC:

// server/plugins/analytics/adapters/MyAnalyticsAdapter.ts
import type { IAnalyticsAdapter } from '../IAnalyticsAdapter.js';
import {
  type AnalyticsEventInput,
  type AnalyticsQueryResult,
  type AnalyticsWriteOptions,
} from '../types.js';

export class MyAnalyticsAdapter implements IAnalyticsAdapter {
  readonly name = 'my-analytics';

  constructor(private readonly client: SomeWarehouseClient) {}

  async writeEvents(table: string, events: readonly AnalyticsEventInput[], _options?: AnalyticsWriteOptions): Promise<void> {
    if (events.length === 0) {
      return;
    }
    await this.client.insert(table, events);
  }

  async query<T = AnalyticsQueryResult>(sql: string, params: readonly unknown[] = []): Promise<readonly T[]> {
    return (await this.client.query(sql, params)) as readonly T[];
  }

  async flush(): Promise<void> {}

  async close(): Promise<void> {
    await this.client.close();
  }
}

Step 4: Register the provider in DataWarehouseFactory#

Update DataWarehouseFactory.createDataWarehouse, createKyselyDialect, and createAnalyticsAdapter to instantiate your plugins. The factory wraps them in bridges so the rest of the application only speaks the generic interfaces.
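One common shape for that registration is a name-to-constructor map inside the factory. A sketch of the idea, assuming nothing about the factory's internals; every name here except the documented env-driven adapter names is illustrative:

```typescript
// Illustrative registry sketch: map an adapter name to a constructor so the
// factory can instantiate the right plugin from configuration.
interface WarehouseAdapterLike {
  readonly name: string;
}

const adapterRegistry = new Map<string, () => WarehouseAdapterLike>([
  ['snowflake', () => ({ name: 'snowflake' })],       // stand-in for the real adapter
  ['my-warehouse', () => ({ name: 'my-warehouse' })], // your new plugin
]);

function createWarehouseAdapter(adapterName: string): WarehouseAdapterLike {
  const create = adapterRegistry.get(adapterName);
  if (!create) {
    throw new Error(`Unknown warehouse adapter: ${adapterName}`);
  }
  return create();
}
```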

Step 5: Create Analytics Tables#

All warehouses need the same tables (schema in IDataWarehouseAnalytics.ts):

-- Adapt syntax for your warehouse
CREATE TABLE rule_executions (
  ds DATE,
  ts BIGINT,
  org_id VARCHAR,
  item_id VARCHAR,
  rule_id VARCHAR,
  passed BOOLEAN,
  result JSON,  -- Or VARIANT, JSONB, String depending on warehouse
  -- ... see IDataWarehouseAnalytics.ts for all ~20 fields
);

Step 6: Configure and Run#

export WAREHOUSE_ADAPTER=your-warehouse
# Optional overrides
# export ANALYTICS_ADAPTER=your-warehouse
# Legacy fallback:
# export DATA_WAREHOUSE_PROVIDER=your-warehouse
export YOUR_WAREHOUSE_HOST=localhost
# ... other config vars

npm start

How Services Consume Analytics Data#

Services query analytics data using DataWarehouseDialect:

// server/services/analyticsQueries/UserHistoryQueries.ts
class UserHistoryQueries {
  constructor(private readonly dialect: Dependencies['DataWarehouseDialect']) {}

  async getUserRuleExecutionsHistory(orgId: string, userId: string) {
    const kysely = this.dialect.getKyselyInstance();
    
    return kysely
      .selectFrom('RULE_EXECUTIONS')
      .where('ORG_ID', '=', orgId)
      .where('ITEM_CREATOR_ID', '=', userId)
      .selectAll()
      .execute();
  }
}

export default inject(['DataWarehouseDialect'], UserHistoryQueries);

Works with any warehouse:

  • Snowflake: Uses SnowflakeDialect
  • Clickhouse: Uses ClickhouseDialect
  • PostgreSQL: Uses PostgresDialect

Available IOC Services#

Service                  Type                      Purpose
DataWarehouse            IDataWarehouse            Raw SQL, transactions
DataWarehouseDialect     IDataWarehouseDialect     Type-safe queries
DataWarehouseAnalytics   IDataWarehouseAnalytics   Bulk writes, logging

File Structure#

server/storage/dataWarehouse/
├── IDataWarehouse.ts              # Core interface
├── IDataWarehouseAnalytics.ts     # Analytics interface + schema types
├── DataWarehouseFactory.ts        # Instantiates adapters via env configuration
├── ClickhouseAdapter.ts           # 📝 Stub - implement this
├── ClickhouseAnalyticsAdapter.ts  # 📝 Stub - implement this
├── PostgresAnalyticsAdapter.ts    # 📝 Stub - implement this
└── index.ts

server/plugins/warehouse/           # Pluggable warehouse adapters
├── adapters/SnowflakeWarehouseAdapter.ts
├── examples/NoOpWarehouseAdapter.ts
└── ...

server/plugins/analytics/           # Pluggable analytics adapters
├── adapters/SnowflakeAnalyticsAdapter.ts
├── examples/NoOpAnalyticsAdapter.ts
└── ...

server/services/analyticsLoggers/   # Warehouse-agnostic loggers
├── RuleExecutionLogger.ts         # Uses DataWarehouseAnalytics
├── ActionExecutionLogger.ts       # Uses DataWarehouseAnalytics
├── ItemModelScoreLogger.ts        # Uses DataWarehouseAnalytics
└── ...

server/services/analyticsQueries/   # Warehouse-agnostic queries
├── UserHistoryQueries.ts          # Uses DataWarehouseDialect
├── ItemHistoryQueries.ts          # Uses DataWarehouseDialect
└── ...

References#

  • Schema types: /server/storage/dataWarehouse/IDataWarehouseAnalytics.ts
  • Snowflake warehouse adapter: /server/plugins/warehouse/adapters/SnowflakeWarehouseAdapter.ts
  • Snowflake analytics adapter: /server/plugins/analytics/adapters/SnowflakeAnalyticsAdapter.ts
  • Migration setup: /.devops/migrator/src/configs/snowflake.ts
  • Loggers: /server/services/analyticsLoggers/
  • Queries: /server/services/analyticsQueries/