Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 359 lines 10 kB view raw
/**
 * Factory for creating data warehouse instances based on configuration
 */

/* eslint-disable max-classes-per-file */
import {
  ClickhouseAnalyticsAdapter as ClickhouseAnalyticsPlugin,
  NoOpAnalyticsAdapter,
  type AnalyticsEventInput,
  type IAnalyticsAdapter,
} from '../../plugins/analytics/index.js';
import {
  ClickhouseWarehouseAdapter,
  NoOpWarehouseAdapter,
  type IWarehouseAdapter,
} from '../../plugins/warehouse/index.js';
import { assertUnreachable } from '../../utils/misc.js';
import type SafeTracer from '../../utils/SafeTracer.js';
import {
  ClickhouseKyselyAdapter,
  type ClickhouseConnectionSettings,
} from './ClickhouseAdapter.js';
import {
  type DataWarehousePoolSettings,
  type IDataWarehouse,
  type IDataWarehouseDialect,
  type DataWarehouseProvider as IDataWarehouseProvider,
  type TransactionFunction,
} from './IDataWarehouse.js';
import { type Kysely } from 'kysely';
import type {
  AnalyticsSchema,
  BulkWriteConfig,
  CDCChange,
  CDCConfig,
  IDataWarehouseAnalytics,
} from './IDataWarehouseAnalytics.js';
import { PostgresAnalyticsAdapter } from './PostgresAnalyticsAdapter.js';

/**
 * Concrete data warehouse providers
 * Extend this type to add new warehouse implementations
 */
export type DataWarehouseProvider = 'clickhouse' | 'postgresql' | 'noop';

/** Providers that can back the analytics adapter. */
export type AnalyticsProvider = 'clickhouse' | 'postgresql' | 'noop';

// Re-export the interface provider type for external use
export type { IDataWarehouseProvider };

/**
 * Warehouse configuration, discriminated on `provider`.
 *
 * `analyticsProvider` is optional on every variant; the factory falls back to
 * `provider` when it is omitted.
 */
export type DataWarehouseConfig =
  | {
      provider: 'clickhouse';
      connection: ClickhouseConnectionSettings;
      pool?: DataWarehousePoolSettings;
      analyticsProvider?: AnalyticsProvider;
    }
  | {
      provider: 'postgresql';
      connection: {
        host: string;
        port?: number;
        username: string;
        password: string;
        database: string;
      };
      pool?: DataWarehousePoolSettings;
      analyticsProvider?: AnalyticsProvider;
    }
  | {
      provider: 'noop';
      analyticsProvider?: AnalyticsProvider;
    };

/**
 * Dialect used for the 'noop' provider: hands out a placeholder Kysely
 * object whose members throw when invoked.
 */
class NoOpKyselyDialect implements IDataWarehouseDialect {
  /**
   * @returns A proxy standing in for a Kysely instance. Only `destroy` is a
   * working (no-op) function; every other string-keyed member is a function
   * that throws when called.
   */
  getKyselyInstance(): Kysely<any> {
    // Return a proxy so services can hold a Kysely reference without
    // crashing at startup; any attempt to build or execute a query throws.
    return new Proxy({} as Kysely<any>, {
      get(_target, prop) {
        if (prop === 'destroy') return async () => {};
        if (prop === 'then') return undefined; // not thenable
        if (typeof prop === 'symbol') return undefined;
        return () => {
          throw new Error('NoOp dialect: Kysely queries are not supported');
        };
      },
    });
  }

  /** Nothing to release for the no-op dialect. */
  async destroy(): Promise<void> {}
}

/**
 * Adapts an `IWarehouseAdapter` plugin to the `IDataWarehouse` interface.
 */
class WarehouseAdapterBridge implements IDataWarehouse {
  /**
   * @param provider Provider name; reported by `getProvider()` and used to
   * label tracing spans.
   * @param adapter Underlying warehouse adapter that does the actual work.
   */
  constructor(
    private readonly provider: DataWarehouseProvider,
    private readonly adapter: IWarehouseAdapter,
  ) {}

  /**
   * Runs a query through the adapter inside an active tracing span.
   *
   * @param query Statement text passed to the adapter.
   * @param tracer Tracer used to wrap the call in a span.
   * @param binds Bind parameters; defaults to none.
   * @returns The adapter's rows copied into a new array.
   */
  async query(
    query: string,
    tracer: SafeTracer,
    binds: readonly unknown[] = [],
  ): Promise<unknown[]> {
    const runQuery = async () => {
      const rows = await this.adapter.query(query, binds);
      return Array.from(rows) as unknown[];
    };

    return tracer.addActiveSpan(
      {
        resource: `${this.provider}.client`,
        operation: `${this.provider}.query`,
      },
      runQuery,
    );
  }

  /**
   * Runs `fn` inside the adapter's transaction, handing it a query function
   * that returns rows as a plain array.
   *
   * @param fn Callback that performs the transactional work.
   * @returns Whatever `fn` resolves to.
   */
  async transaction<T>(fn: TransactionFunction<T>): Promise<T> {
    return this.adapter.transaction(async (warehouseQuery) => {
      return fn(async (statement, parameters = []) => {
        const rows = await warehouseQuery(statement, parameters);
        return Array.from(rows) as unknown[];
      });
    });
  }

  /**
   * Calls the adapter's `start` method when it has one; otherwise does
   * nothing.
   */
  start(): void {
    const maybeStart = (this.adapter as { start?: () => void }).start;
    if (typeof maybeStart === 'function') {
      maybeStart.call(this.adapter);
    }
  }

  /**
   * Flushes the adapter and then closes it.
   *
   * The adapter is closed even when the flush rejects, so a failed flush does
   * not leave the adapter open. The flush error still propagates to the
   * caller, unless closing fails as well, in which case the close error is
   * the one thrown.
   */
  async close(): Promise<void> {
    try {
      await this.adapter.flush();
    } finally {
      await this.adapter.close();
    }
  }

  /** @returns The provider name given at construction. */
  getProvider(): DataWarehouseProvider {
    return this.provider;
  }
}

/**
 * Adapts an `IAnalyticsAdapter` plugin to the `IDataWarehouseAnalytics`
 * interface.
 */
class AnalyticsAdapterBridge implements IDataWarehouseAnalytics {
  /**
   * @param provider Provider name; used in error messages.
   * @param adapter Underlying analytics adapter that does the actual work.
   */
  constructor(
    private readonly provider: DataWarehouseProvider,
    private readonly adapter: IAnalyticsAdapter,
  ) {}

  /**
   * Writes rows to a table through the adapter's `writeEvents`.
   *
   * @param tableName Destination table.
   * @param rows Rows to write.
   * @param config Optional settings; only `batchTimeout` is forwarded, and
   * only when it is defined.
   */
  async bulkWrite<TableName extends keyof AnalyticsSchema>(
    tableName: TableName,
    rows: readonly AnalyticsSchema[TableName][],
    config?: BulkWriteConfig,
  ): Promise<void> {
    await this.adapter.writeEvents(
      tableName,
      rows as readonly AnalyticsEventInput[],
      config?.batchTimeout !== undefined
        ? { batchTimeout: config.batchTimeout }
        : undefined,
    );
  }

  /**
   * Creates a CDC stream through the adapter.
   *
   * @throws Error when the adapter has no `createCDCStream` method.
   */
  async createCDCStream<TableName extends string>(
    config: CDCConfig<TableName>,
  ): Promise<void> {
    if (!this.adapter.createCDCStream) {
      throw new Error(
        `Analytics adapter "${this.provider}" does not support CDC streams.`,
      );
    }
    await this.adapter.createCDCStream(config);
  }

  /**
   * Consumes changes from a CDC stream through the adapter.
   *
   * @throws Error when the adapter has no `consumeCDCChanges` method.
   */
  async consumeCDCChanges<T = unknown>(
    streamName: string,
    callback: (changes: CDCChange<T>[]) => Promise<void>,
    tracer: SafeTracer,
  ): Promise<void> {
    if (!this.adapter.consumeCDCChanges) {
      throw new Error(
        `Analytics adapter "${this.provider}" does not support CDC consumption.`,
      );
    }
    await this.adapter.consumeCDCChanges(streamName, callback, tracer);
  }

  /**
   * @returns The adapter's own `supportsCDC()` answer, or `false` when the
   * adapter does not define that method.
   */
  supportsCDC(): boolean {
    return this.adapter.supportsCDC?.() ?? false;
  }

  /** Delegates to the adapter's `flush`. */
  async flushPendingWrites(): Promise<void> {
    await this.adapter.flush();
  }

  /** Delegates to the adapter's `close`. */
  async close(): Promise<void> {
    await this.adapter.close();
  }
}

/**
 * Reads an integer from an environment variable.
 *
 * @param name Environment variable to read.
 * @returns The parsed base-10 integer, or `undefined` when the variable is
 * unset or empty so the caller can apply its own default.
 * @throws Error when the variable is set but does not start with an integer;
 * this keeps `NaN` from leaking into connection settings.
 */
function readIntegerEnv(name: string): number | undefined {
  const raw = process.env[name];
  if (!raw) {
    return undefined;
  }
  const parsed = Number.parseInt(raw, 10);
  if (Number.isNaN(parsed)) {
    throw new Error(
      `Environment variable ${name} must be an integer, received "${raw}"`,
    );
  }
  return parsed;
}

/**
 * Factory class for creating data warehouse instances
 */
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
export class DataWarehouseFactory {
  /**
   * Create a data warehouse instance based on the provided configuration
   *
   * Note: the 'postgresql' provider is currently wired to
   * `NoOpWarehouseAdapter`.
   */
  static createDataWarehouse(config: DataWarehouseConfig): IDataWarehouse {
    switch (config.provider) {
      case 'noop':
        return new WarehouseAdapterBridge('noop', new NoOpWarehouseAdapter());
      case 'clickhouse':
        return new WarehouseAdapterBridge(
          'clickhouse',
          new ClickhouseWarehouseAdapter({
            connection: config.connection,
          }),
        );
      case 'postgresql':
        return new WarehouseAdapterBridge(
          'postgresql',
          new NoOpWarehouseAdapter(),
        );
      default:
        return assertUnreachable(
          config,
          `Unknown data warehouse provider: ${
            (config as DataWarehouseConfig).provider
          }`,
        );
    }
  }

  /**
   * Create a Kysely dialect instance based on the provided configuration
   *
   * @throws Error for the 'postgresql' provider, which has no dialect yet.
   */
  static createKyselyDialect(
    config: DataWarehouseConfig,
  ): IDataWarehouseDialect {
    switch (config.provider) {
      case 'clickhouse':
        return new ClickhouseKyselyAdapter(config.connection, config.pool);
      case 'postgresql':
        throw new Error('PostgreSQL Kysely dialect not yet implemented');
      case 'noop':
        return new NoOpKyselyDialect();
      default:
        return assertUnreachable(
          config,
          `Unknown data warehouse provider: ${
            (config as DataWarehouseConfig).provider
          }`,
        );
    }
  }

  /**
   * Create an analytics adapter for warehouse-specific analytics features
   * (bulk writes, CDC, logging)
   *
   * @param config Warehouse configuration; `analyticsProvider` selects the
   * adapter and falls back to `provider` when omitted.
   * @param dialect Source of the Kysely instance; required for the
   * 'postgresql' analytics provider.
   * @throws Error when 'clickhouse' analytics is requested without a
   * clickhouse warehouse config, or when 'postgresql' analytics is requested
   * without a dialect that yields a Kysely instance.
   */
  static createAnalyticsAdapter(
    config: DataWarehouseConfig,
    dialect?: IDataWarehouseDialect,
  ): IDataWarehouseAnalytics {
    const analyticsProvider =
      config.analyticsProvider ?? (config.provider as AnalyticsProvider);

    switch (analyticsProvider) {
      case 'noop':
        return new AnalyticsAdapterBridge('noop', new NoOpAnalyticsAdapter());
      case 'clickhouse':
        // The clickhouse analytics plugin reuses the warehouse connection
        // settings, which only exist on the clickhouse config variant.
        if (config.provider !== 'clickhouse') {
          throw new Error(
            'Clickhouse analytics provider requires the clickhouse warehouse configuration.',
          );
        }
        return new AnalyticsAdapterBridge(
          'clickhouse',
          new ClickhouseAnalyticsPlugin({
            connection: config.connection,
          }),
        );
      case 'postgresql': {
        const pgKysely = dialect?.getKyselyInstance();
        if (!pgKysely) {
          throw new Error('PostgreSQL analytics requires Kysely instance');
        }
        return new PostgresAnalyticsAdapter(pgKysely);
      }
      default:
        return assertUnreachable(
          analyticsProvider,
          `Unknown analytics provider: ${analyticsProvider as string}`,
        );
    }
  }

  /**
   * Create configuration from environment variables
   *
   * The provider comes from `WAREHOUSE_ADAPTER`, then
   * `DATA_WAREHOUSE_PROVIDER`, defaulting to 'clickhouse'. The analytics
   * provider comes from `ANALYTICS_ADAPTER`, defaulting to the warehouse
   * provider.
   *
   * @throws Error when a numeric variable (`CLICKHOUSE_PORT`,
   * `CLICKHOUSE_POOL_SIZE`, `POSTGRES_PORT`) is set to a non-integer value.
   */
  static createConfigFromEnv(): DataWarehouseConfig {
    const provider = (process.env.WAREHOUSE_ADAPTER ??
      process.env.DATA_WAREHOUSE_PROVIDER ??
      'clickhouse') as DataWarehouseProvider;
    const analyticsProvider = (process.env.ANALYTICS_ADAPTER ??
      provider) as AnalyticsProvider;

    switch (provider) {
      case 'noop':
        return {
          provider: 'noop',
          analyticsProvider,
        };
      case 'clickhouse':
        return {
          provider: 'clickhouse',
          analyticsProvider,
          connection: {
            host: process.env.CLICKHOUSE_HOST ?? 'localhost',
            port: readIntegerEnv('CLICKHOUSE_PORT') ?? 8123,
            username: process.env.CLICKHOUSE_USERNAME ?? 'default',
            password: process.env.CLICKHOUSE_PASSWORD ?? '',
            database: process.env.CLICKHOUSE_DATABASE ?? 'default',
            protocol: (process.env.CLICKHOUSE_PROTOCOL ?? 'http') as
              | 'http'
              | 'https',
          },
          pool: {
            max: readIntegerEnv('CLICKHOUSE_POOL_SIZE') ?? 10,
          },
        };
      case 'postgresql':
        return {
          provider: 'postgresql',
          analyticsProvider,
          connection: {
            host: process.env.POSTGRES_HOST ?? 'localhost',
            port: readIntegerEnv('POSTGRES_PORT'),
            username: process.env.POSTGRES_USERNAME ?? 'postgres',
            password: process.env.POSTGRES_PASSWORD ?? '',
            database: process.env.POSTGRES_DATABASE ?? 'postgres',
          },
        };
      default:
        return assertUnreachable(
          provider,
          `Unknown data warehouse provider: ${provider as string}`,
        );
    }
  }
}