Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 123 lines 3.9 kB view raw
1/** 2 * PostgreSQL analytics adapter stub - SAMPLE ONLY 3 * 4 * This is a basic stub showing what you need to implement for PostgreSQL. 5 * 6 * To implement: 7 * - Use Kysely with PostgreSQL dialect 8 * - Use COPY for bulk inserts 9 * - Use logical replication for CDC (Debezium/pglogical) 10 * - See ClickhouseAnalyticsAdapter for a concrete analytics adapter reference 11 * - See ../README.md for implementation guide 12 */ 13 14import { sql, type Kysely } from 'kysely'; 15import type SafeTracer from '../../utils/SafeTracer.js'; 16import { 17 type IDataWarehouseAnalytics, 18 type AnalyticsSchema, 19 type BulkWriteConfig, 20 type CDCConfig, 21 type CDCChange, 22} from './IDataWarehouseAnalytics.js'; 23 24/** 25 * PostgreSQL analytics adapter 26 * Uses batch inserts and logical replication for CDC 27 */ 28export class PostgresAnalyticsAdapter implements IDataWarehouseAnalytics { 29 private pendingWrites: Map<string, any[]> = new Map(); 30 31 constructor(private readonly kysely: Kysely<any>) {} 32 33 async bulkWrite<TableName extends keyof AnalyticsSchema>( 34 tableName: TableName, 35 rows: readonly AnalyticsSchema[TableName][], 36 config?: BulkWriteConfig, 37 ): Promise<void> { 38 const tableKey = tableName as string; 39 40 if (!this.pendingWrites.has(tableKey)) { 41 this.pendingWrites.set(tableKey, []); 42 } 43 this.pendingWrites.get(tableKey)!.push(...rows); 44 45 const batchSize = config?.batchSize ?? 500; 46 const pending = this.pendingWrites.get(tableKey)!; 47 48 if (config?.batchTimeout === 0 || pending.length >= batchSize) { 49 await this.flushTable(tableKey); 50 } 51 } 52 53 async createCDCStream<TableName extends string>( 54 config: CDCConfig<TableName>, 55 ): Promise<void> { 56 const { tableName, schemaName = 'public' } = config; 57 await sql`ALTER TABLE ${sql.ref(`${schemaName}.${tableName}`)} REPLICA IDENTITY FULL`.execute( 58 this.kysely, 59 ); 60 await sql`CREATE PUBLICATION IF NOT EXISTS ${sql.raw(tableName)}_cdc FOR TABLE ${sql.ref(`${schemaName}.${tableName}`)}`.execute( 61 this.kysely, 62 ); 63 } 64 65 async consumeCDCChanges<T = unknown>( 66 _streamName: string, 67 _callback: (changes: CDCChange<T>[]) => Promise<void>, 68 _tracer: SafeTracer, 69 ): Promise<void> { 70 throw new Error( 71 'PostgreSQL CDC consumption requires external tools like Debezium. See INTEGRATOR_GUIDE.md', 72 ); 73 } 74 75 supportsCDC(): boolean { 76 return true; 77 } 78 79 async flushPendingWrites(): Promise<void> { 80 for (const [tableName] of this.pendingWrites) { 81 await this.flushTable(tableName); 82 } 83 } 84 85 async close(): Promise<void> { 86 await this.flushPendingWrites(); 87 } 88 89 private async flushTable(tableName: string): Promise<void> { 90 const rows = this.pendingWrites.get(tableName); 91 if (!rows || rows.length === 0) return; 92 93 await this.kysely.insertInto(tableName as any).values(rows).execute(); 94 this.pendingWrites.set(tableName, []); 95 } 96 97 // Stub implementations - integrators must implement these 98 logActionExecutions = async (..._args: any[]): Promise<void> => { 99 throw new Error('Not implemented'); 100 }; 101 logRuleExecutions = async (..._args: any[]): Promise<void> => { 102 throw new Error('Not implemented'); 103 }; 104 logItemModelScore = async (..._args: any[]): Promise<void> => { 105 throw new Error('Not implemented'); 106 }; 107 logReportingRuleExecutions = async (..._args: any[]): Promise<void> => { 108 throw new Error('Not implemented'); 109 }; 110 logContentApiRequest = async (..._args: any[]): Promise<void> => { 111 throw new Error('Not implemented'); 112 }; 113 logContentDetailsApiRequest = async (..._args: any[]): Promise<void> => { 114 throw new Error('Not implemented'); 115 }; 116 logRoutingRuleExecutions = async (..._args: any[]): Promise<void> => { 117 throw new Error('Not implemented'); 118 }; 119 logOrgCreation = async (..._args: any[]): Promise<void> => { 120 throw new Error('Not implemented'); 121 }; 122} 123