Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1/**
2 * PostgreSQL analytics adapter stub - SAMPLE ONLY
3 *
4 * This is a basic stub showing what you need to implement for PostgreSQL.
5 *
6 * To implement:
7 * - Use Kysely with PostgreSQL dialect
8 * - Use COPY for bulk inserts
9 * - Use logical replication for CDC (Debezium/pglogical)
10 * - See ClickhouseAnalyticsAdapter for a concrete analytics adapter reference
11 * - See ../README.md for implementation guide
12 */
13
14import { sql, type Kysely } from 'kysely';
15import type SafeTracer from '../../utils/SafeTracer.js';
16import {
17 type IDataWarehouseAnalytics,
18 type AnalyticsSchema,
19 type BulkWriteConfig,
20 type CDCConfig,
21 type CDCChange,
22} from './IDataWarehouseAnalytics.js';
23
24/**
25 * PostgreSQL analytics adapter
26 * Uses batch inserts and logical replication for CDC
27 */
28export class PostgresAnalyticsAdapter implements IDataWarehouseAnalytics {
29 private pendingWrites: Map<string, any[]> = new Map();
30
31 constructor(private readonly kysely: Kysely<any>) {}
32
33 async bulkWrite<TableName extends keyof AnalyticsSchema>(
34 tableName: TableName,
35 rows: readonly AnalyticsSchema[TableName][],
36 config?: BulkWriteConfig,
37 ): Promise<void> {
38 const tableKey = tableName as string;
39
40 if (!this.pendingWrites.has(tableKey)) {
41 this.pendingWrites.set(tableKey, []);
42 }
43 this.pendingWrites.get(tableKey)!.push(...rows);
44
45 const batchSize = config?.batchSize ?? 500;
46 const pending = this.pendingWrites.get(tableKey)!;
47
48 if (config?.batchTimeout === 0 || pending.length >= batchSize) {
49 await this.flushTable(tableKey);
50 }
51 }
52
53 async createCDCStream<TableName extends string>(
54 config: CDCConfig<TableName>,
55 ): Promise<void> {
56 const { tableName, schemaName = 'public' } = config;
57 await sql`ALTER TABLE ${sql.ref(`${schemaName}.${tableName}`)} REPLICA IDENTITY FULL`.execute(
58 this.kysely,
59 );
60 await sql`CREATE PUBLICATION IF NOT EXISTS ${sql.raw(tableName)}_cdc FOR TABLE ${sql.ref(`${schemaName}.${tableName}`)}`.execute(
61 this.kysely,
62 );
63 }
64
65 async consumeCDCChanges<T = unknown>(
66 _streamName: string,
67 _callback: (changes: CDCChange<T>[]) => Promise<void>,
68 _tracer: SafeTracer,
69 ): Promise<void> {
70 throw new Error(
71 'PostgreSQL CDC consumption requires external tools like Debezium. See INTEGRATOR_GUIDE.md',
72 );
73 }
74
75 supportsCDC(): boolean {
76 return true;
77 }
78
79 async flushPendingWrites(): Promise<void> {
80 for (const [tableName] of this.pendingWrites) {
81 await this.flushTable(tableName);
82 }
83 }
84
85 async close(): Promise<void> {
86 await this.flushPendingWrites();
87 }
88
89 private async flushTable(tableName: string): Promise<void> {
90 const rows = this.pendingWrites.get(tableName);
91 if (!rows || rows.length === 0) return;
92
93 await this.kysely.insertInto(tableName as any).values(rows).execute();
94 this.pendingWrites.set(tableName, []);
95 }
96
97 // Stub implementations - integrators must implement these
98 logActionExecutions = async (..._args: any[]): Promise<void> => {
99 throw new Error('Not implemented');
100 };
101 logRuleExecutions = async (..._args: any[]): Promise<void> => {
102 throw new Error('Not implemented');
103 };
104 logItemModelScore = async (..._args: any[]): Promise<void> => {
105 throw new Error('Not implemented');
106 };
107 logReportingRuleExecutions = async (..._args: any[]): Promise<void> => {
108 throw new Error('Not implemented');
109 };
110 logContentApiRequest = async (..._args: any[]): Promise<void> => {
111 throw new Error('Not implemented');
112 };
113 logContentDetailsApiRequest = async (..._args: any[]): Promise<void> => {
114 throw new Error('Not implemented');
115 };
116 logRoutingRuleExecutions = async (..._args: any[]): Promise<void> => {
117 throw new Error('Not implemented');
118 };
119 logOrgCreation = async (..._args: any[]): Promise<void> => {
120 throw new Error('Not implemented');
121 };
122}
123