Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
/**
 * Factory for creating data warehouse, Kysely dialect, and analytics adapter
 * instances based on configuration.
 */
4
5/* eslint-disable max-classes-per-file */
6import {
7 ClickhouseAnalyticsAdapter as ClickhouseAnalyticsPlugin,
8 NoOpAnalyticsAdapter,
9 type AnalyticsEventInput,
10 type IAnalyticsAdapter,
11} from '../../plugins/analytics/index.js';
12import {
13 ClickhouseWarehouseAdapter,
14 NoOpWarehouseAdapter,
15 type IWarehouseAdapter,
16} from '../../plugins/warehouse/index.js';
17import { assertUnreachable } from '../../utils/misc.js';
18import type SafeTracer from '../../utils/SafeTracer.js';
19import {
20 ClickhouseKyselyAdapter,
21 type ClickhouseConnectionSettings,
22} from './ClickhouseAdapter.js';
23import {
24 type DataWarehousePoolSettings,
25 type IDataWarehouse,
26 type IDataWarehouseDialect,
27 type DataWarehouseProvider as IDataWarehouseProvider,
28 type TransactionFunction,
29} from './IDataWarehouse.js';
30import { type Kysely } from 'kysely';
31import type {
32 AnalyticsSchema,
33 BulkWriteConfig,
34 CDCChange,
35 CDCConfig,
36 IDataWarehouseAnalytics,
37} from './IDataWarehouseAnalytics.js';
38import { PostgresAnalyticsAdapter } from './PostgresAnalyticsAdapter.js';
39
/**
 * Concrete data warehouse providers.
 * Extend this union to add new warehouse implementations.
 */
export type DataWarehouseProvider = 'clickhouse' | 'postgresql' | 'noop';

/**
 * Backends that can serve analytics features (bulk writes, CDC). Chosen via a
 * config's optional `analyticsProvider`; when omitted, the warehouse
 * `provider` is used instead.
 */
export type AnalyticsProvider = 'clickhouse' | 'postgresql' | 'noop';

// Re-export the interface-level provider type for external consumers.
export type { IDataWarehouseProvider };

/** Connection settings for the `postgresql` provider. */
type PostgresConnectionSettings = {
  host: string;
  port?: number;
  username: string;
  password: string;
  database: string;
};

/** Warehouse configuration for the `clickhouse` provider. */
type ClickhouseWarehouseConfig = {
  provider: 'clickhouse';
  connection: ClickhouseConnectionSettings;
  pool?: DataWarehousePoolSettings;
  analyticsProvider?: AnalyticsProvider;
};

/** Warehouse configuration for the `postgresql` provider. */
type PostgresWarehouseConfig = {
  provider: 'postgresql';
  connection: PostgresConnectionSettings;
  pool?: DataWarehousePoolSettings;
  analyticsProvider?: AnalyticsProvider;
};

/** Warehouse configuration for the `noop` provider (no connection needed). */
type NoOpWarehouseConfig = {
  provider: 'noop';
  analyticsProvider?: AnalyticsProvider;
};

/**
 * Warehouse configuration, discriminated on `provider`.
 */
export type DataWarehouseConfig =
  | ClickhouseWarehouseConfig
  | PostgresWarehouseConfig
  | NoOpWarehouseConfig;
74
/**
 * Dialect used by the `noop` provider.
 *
 * Hands out a stand-in Kysely object so services can hold a reference at
 * startup without crashing; calling any query-building or execution method on
 * it throws.
 */
class NoOpKyselyDialect implements IDataWarehouseDialect {
  getKyselyInstance(): Kysely<any> {
    const handler: ProxyHandler<Kysely<any>> = {
      get(_target, prop) {
        // Symbol-keyed lookups resolve to nothing.
        if (typeof prop === 'symbol') {
          return undefined;
        }
        switch (prop) {
          case 'destroy':
            // Tearing down the stand-in is a harmless async no-op.
            return async () => {};
          case 'then':
            // Keep the proxy non-thenable so it is never mistaken for a promise.
            return undefined;
          default:
            return () => {
              throw new Error('NoOp dialect: Kysely queries are not supported');
            };
        }
      },
    };
    return new Proxy({} as Kysely<any>, handler);
  }

  /** Nothing to release: this dialect never opens a connection. */
  async destroy(): Promise<void> {}
}
92
/**
 * Adapts a plugin-level {@link IWarehouseAdapter} to the {@link IDataWarehouse}
 * interface: wraps queries in tracing spans, materializes row results into
 * arrays, and forwards lifecycle calls (start/flush/close) to the adapter.
 */
class WarehouseAdapterBridge implements IDataWarehouse {
  constructor(
    private readonly provider: DataWarehouseProvider,
    private readonly adapter: IWarehouseAdapter,
  ) {}

  /**
   * Execute a raw query through the adapter inside an active tracing span.
   *
   * @param query - Statement text passed to the adapter as-is.
   * @param tracer - Tracer used to open a `<provider>.query` span.
   * @param binds - Positional bind parameters; defaults to none.
   * @returns The adapter's rows copied into an array.
   */
  async query(
    query: string,
    tracer: SafeTracer,
    binds: readonly unknown[] = [],
  ): Promise<unknown[]> {
    const runQuery = async () => {
      const rows = await this.adapter.query(query, binds);
      return Array.from(rows) as unknown[];
    };

    return tracer.addActiveSpan(
      {
        resource: `${this.provider}.client`,
        operation: `${this.provider}.query`,
      },
      runQuery,
    );
  }

  /**
   * Run `fn` inside an adapter transaction. The query function handed to `fn`
   * forwards to the adapter's transactional query callback and copies its
   * rows into an array, mirroring `query()`.
   */
  async transaction<T>(fn: TransactionFunction<T>): Promise<T> {
    return this.adapter.transaction(async (warehouseQuery) => {
      return fn(async (statement, parameters = []) => {
        const rows = await warehouseQuery(statement, parameters);
        return Array.from(rows) as unknown[];
      });
    });
  }

  /**
   * Invoke the adapter's optional `start()` hook, if it has one; otherwise
   * this is a no-op.
   */
  start(): void {
    const maybeStart = (this.adapter as { start?: () => void }).start;
    if (typeof maybeStart === 'function') {
      maybeStart.call(this.adapter);
    }
  }

  /**
   * Flush pending work, then close the adapter.
   *
   * `close()` runs even when `flush()` rejects, so the underlying connection
   * is not leaked; the flush error is still propagated to the caller (unless
   * `close()` itself rejects, in which case that error wins).
   */
  async close(): Promise<void> {
    try {
      await this.adapter.flush();
    } finally {
      await this.adapter.close();
    }
  }

  /** The provider this bridge was constructed for. */
  getProvider(): DataWarehouseProvider {
    return this.provider;
  }
}
143
/**
 * Adapts a plugin-level {@link IAnalyticsAdapter} to the
 * {@link IDataWarehouseAnalytics} interface. CDC operations are forwarded only
 * when the adapter implements them; otherwise they reject with a descriptive
 * error.
 */
class AnalyticsAdapterBridge implements IDataWarehouseAnalytics {
  constructor(
    private readonly provider: AnalyticsProvider,
    private readonly adapter: IAnalyticsAdapter,
  ) {}

  /**
   * Write rows to an analytics table via the adapter.
   *
   * Only `config.batchTimeout` is forwarded to the adapter; when it is not
   * set, no options object is passed.
   */
  async bulkWrite<TableName extends keyof AnalyticsSchema>(
    tableName: TableName,
    rows: readonly AnalyticsSchema[TableName][],
    config?: BulkWriteConfig,
  ): Promise<void> {
    await this.adapter.writeEvents(
      tableName,
      rows as readonly AnalyticsEventInput[],
      config?.batchTimeout !== undefined
        ? { batchTimeout: config.batchTimeout }
        : undefined,
    );
  }

  /**
   * Create a CDC stream.
   *
   * @throws Error when the adapter does not implement `createCDCStream`.
   */
  async createCDCStream<TableName extends string>(
    config: CDCConfig<TableName>,
  ): Promise<void> {
    if (!this.adapter.createCDCStream) {
      throw new Error(
        `Analytics adapter "${this.provider}" does not support CDC streams.`,
      );
    }
    await this.adapter.createCDCStream(config);
  }

  /**
   * Consume changes from a CDC stream, passing each batch to `callback`.
   *
   * @throws Error when the adapter does not implement `consumeCDCChanges`.
   */
  async consumeCDCChanges<T = unknown>(
    streamName: string,
    callback: (changes: CDCChange<T>[]) => Promise<void>,
    tracer: SafeTracer,
  ): Promise<void> {
    if (!this.adapter.consumeCDCChanges) {
      throw new Error(
        `Analytics adapter "${this.provider}" does not support CDC consumption.`,
      );
    }
    await this.adapter.consumeCDCChanges(streamName, callback, tracer);
  }

  /** Whether the adapter reports CDC support; `false` if it has no such hook. */
  supportsCDC(): boolean {
    return this.adapter.supportsCDC?.() ?? false;
  }

  /** Flush any writes the adapter is still buffering. */
  async flushPendingWrites(): Promise<void> {
    await this.adapter.flush();
  }

  /**
   * Flush buffered writes, then close the adapter.
   *
   * Flushing first avoids dropping events still batched from `bulkWrite`,
   * matching how the warehouse bridge shuts down. `close()` runs even when
   * the flush rejects, so the adapter is never left open.
   */
  async close(): Promise<void> {
    try {
      await this.adapter.flush();
    } finally {
      await this.adapter.close();
    }
  }
}
200
/**
 * Factory for creating data warehouse, Kysely dialect, and analytics adapter
 * instances from a {@link DataWarehouseConfig}.
 */
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
export class DataWarehouseFactory {
  /**
   * Create a data warehouse instance based on the provided configuration.
   *
   * NOTE(review): the `postgresql` provider is currently wired to
   * `NoOpWarehouseAdapter` — presumably a placeholder until a PostgreSQL
   * warehouse adapter exists; confirm before relying on it for queries.
   *
   * @param config - Warehouse configuration, discriminated on `provider`.
   * @returns An {@link IDataWarehouse} bridging to the provider's adapter.
   */
  static createDataWarehouse(config: DataWarehouseConfig): IDataWarehouse {
    switch (config.provider) {
      case 'noop':
        return new WarehouseAdapterBridge('noop', new NoOpWarehouseAdapter());
      case 'clickhouse':
        return new WarehouseAdapterBridge(
          'clickhouse',
          new ClickhouseWarehouseAdapter({
            connection: config.connection,
          }),
        );
      case 'postgresql':
        return new WarehouseAdapterBridge(
          'postgresql',
          new NoOpWarehouseAdapter(),
        );
      default:
        return assertUnreachable(
          config,
          `Unknown data warehouse provider: ${
            (config as DataWarehouseConfig).provider
          }`,
        );
    }
  }

  /**
   * Create a Kysely dialect instance based on the provided configuration.
   *
   * @param config - Warehouse configuration, discriminated on `provider`.
   * @returns A ClickHouse dialect (using `connection` and `pool`) or the
   *   no-op dialect.
   * @throws Error for the `postgresql` provider, whose dialect is not yet
   *   implemented.
   */
  static createKyselyDialect(
    config: DataWarehouseConfig,
  ): IDataWarehouseDialect {
    switch (config.provider) {
      case 'clickhouse':
        return new ClickhouseKyselyAdapter(config.connection, config.pool);
      case 'postgresql':
        throw new Error('PostgreSQL Kysely dialect not yet implemented');
      case 'noop':
        return new NoOpKyselyDialect();
      default:
        return assertUnreachable(
          config,
          `Unknown data warehouse provider: ${
            (config as DataWarehouseConfig).provider
          }`,
        );
    }
  }

  /**
   * Create an analytics adapter for warehouse-specific analytics features
   * (bulk writes, CDC, logging).
   *
   * The analytics backend is `config.analyticsProvider` when set, otherwise
   * the warehouse `config.provider`.
   *
   * @param config - Warehouse configuration.
   * @param dialect - Required when the analytics backend is `postgresql`; its
   *   Kysely instance backs the PostgreSQL analytics adapter.
   * @throws Error when `clickhouse` analytics is requested without a
   *   ClickHouse warehouse config, or `postgresql` analytics without a dialect.
   */
  static createAnalyticsAdapter(
    config: DataWarehouseConfig,
    dialect?: IDataWarehouseDialect,
  ): IDataWarehouseAnalytics {
    const analyticsProvider =
      config.analyticsProvider ?? (config.provider as AnalyticsProvider);

    switch (analyticsProvider) {
      case 'noop':
        return new AnalyticsAdapterBridge('noop', new NoOpAnalyticsAdapter());
      case 'clickhouse':
        // ClickHouse analytics reuses the warehouse connection settings, which
        // only exist on the clickhouse config variant.
        if (config.provider !== 'clickhouse') {
          throw new Error(
            'Clickhouse analytics provider requires the clickhouse warehouse configuration.',
          );
        }
        return new AnalyticsAdapterBridge(
          'clickhouse',
          new ClickhouseAnalyticsPlugin({
            connection: config.connection,
          }),
        );
      case 'postgresql': {
        const pgKysely = dialect?.getKyselyInstance();
        if (!pgKysely) {
          throw new Error('PostgreSQL analytics requires Kysely instance');
        }
        return new PostgresAnalyticsAdapter(pgKysely);
      }
      default:
        return assertUnreachable(
          analyticsProvider,
          `Unknown analytics provider: ${analyticsProvider as string}`,
        );
    }
  }

  /**
   * Create configuration from environment variables.
   *
   * The warehouse provider comes from `WAREHOUSE_ADAPTER`, then
   * `DATA_WAREHOUSE_PROVIDER`, defaulting to `clickhouse`. `ANALYTICS_ADAPTER`
   * overrides the analytics provider (defaulting to the warehouse provider).
   * Connection settings are read from `CLICKHOUSE_*` or `POSTGRES_*`
   * variables, falling back to local-development defaults.
   *
   * @throws Error when a numeric variable (`CLICKHOUSE_PORT`,
   *   `CLICKHOUSE_POOL_SIZE`, `POSTGRES_PORT`) is set but is not an integer.
   *   Unknown providers are routed to `assertUnreachable`.
   */
  static createConfigFromEnv(): DataWarehouseConfig {
    const provider = (process.env.WAREHOUSE_ADAPTER ??
      process.env.DATA_WAREHOUSE_PROVIDER ??
      'clickhouse') as DataWarehouseProvider;
    const analyticsProvider = (process.env.ANALYTICS_ADAPTER ??
      provider) as AnalyticsProvider;

    switch (provider) {
      case 'noop':
        return {
          provider: 'noop',
          analyticsProvider,
        };
      case 'clickhouse':
        return {
          provider: 'clickhouse',
          analyticsProvider,
          connection: {
            host: process.env.CLICKHOUSE_HOST ?? 'localhost',
            port: DataWarehouseFactory.readIntegerEnv('CLICKHOUSE_PORT') ?? 8123,
            username: process.env.CLICKHOUSE_USERNAME ?? 'default',
            password: process.env.CLICKHOUSE_PASSWORD ?? '',
            database: process.env.CLICKHOUSE_DATABASE ?? 'default',
            protocol: (process.env.CLICKHOUSE_PROTOCOL ?? 'http') as
              | 'http'
              | 'https',
          },
          pool: {
            max:
              DataWarehouseFactory.readIntegerEnv('CLICKHOUSE_POOL_SIZE') ?? 10,
          },
        };
      case 'postgresql':
        return {
          provider: 'postgresql',
          analyticsProvider,
          connection: {
            host: process.env.POSTGRES_HOST ?? 'localhost',
            port: DataWarehouseFactory.readIntegerEnv('POSTGRES_PORT'),
            username: process.env.POSTGRES_USERNAME ?? 'postgres',
            password: process.env.POSTGRES_PASSWORD ?? '',
            database: process.env.POSTGRES_DATABASE ?? 'postgres',
          },
        };
      default:
        return assertUnreachable(
          provider,
          `Unknown data warehouse provider: ${provider as string}`,
        );
    }
  }

  /**
   * Read an integer-valued environment variable.
   *
   * @param name - Environment variable name.
   * @returns The parsed base-10 integer, or `undefined` when the variable is
   *   unset or empty (so callers can apply their own default).
   * @throws Error when the variable is set but does not parse as an integer,
   *   instead of silently yielding `NaN`.
   */
  private static readIntegerEnv(name: string): number | undefined {
    const raw = process.env[name];
    // Unset and empty both mean "use the default", as before.
    if (!raw) {
      return undefined;
    }
    const parsed = Number.parseInt(raw, 10);
    if (Number.isNaN(parsed)) {
      throw new Error(
        `Environment variable ${name} must be an integer, got "${raw}"`,
      );
    }
    return parsed;
  }
}