// Mirror of https://github.com/roostorg/coop
// github.com/roostorg/coop
1/**
2 * Extended interface for data warehouse analytics features
3 * Allows integrators to implement CDC, bulk writes, and logging for any warehouse
4 */
5
6import type SafeTracer from '../../utils/SafeTracer.js';
7
/**
 * Row shapes for the warehouse tables used by the analytics layer.
 *
 * Every key is a table name and its value describes a single row of that
 * table. Integrators create matching tables/schemas in their own warehouse.
 */
export type AnalyticsSchema = {
  /** Rows of the rule execution log. */
  RULE_EXECUTIONS: {
    ds: string;
    ts: number;
    org_id: string;

    // Item columns
    item_id: string;
    item_type_id: string;
    item_type_kind: string;
    item_submission_id?: string;
    item_data?: string;
    item_type_name?: string;
    item_creator_id?: string;
    item_creator_type_id?: string;
    item_type_schema?: string;
    item_type_schema_field_roles?: { [key: string]: string };
    item_type_schema_variant?: string;
    item_type_version?: string;

    // Rule columns
    rule: string;
    rule_id: string;
    rule_version: string;
    tags: ReadonlyArray<string>;
    policy_ids: ReadonlyArray<string>;
    policy_names: ReadonlyArray<string>;
    environment: string;
    correlation_id: string;

    // Outcome columns
    result: string;
    passed: boolean;
  };

  /** Rows of the action execution log. */
  ACTION_EXECUTIONS: {
    ds: string;
    ts: number;
    org_id: string;

    // Action columns
    action_id: string;
    action_name: string;
    action_source: string;
    correlation_id: string;

    // Item columns
    item_id: string;
    item_type_id: string;
    item_type_kind: string;
    item_submission_id?: string;
    item_creator_id?: string;
    item_creator_type_id?: string;

    // Rule / policy columns (element shapes are not constrained here)
    rule_environment?: string;
    rules?: ReadonlyArray<unknown>;
    rule_tags?: ReadonlyArray<string>;
    policies: ReadonlyArray<unknown>;

    actor_id?: string;
    job_id?: string;
    failed: boolean;
  };

  /** Rows of the item model score log. */
  ITEM_MODEL_SCORES_LOG: {
    ds: string;
    ts: number;
    org_id: string;

    // Item columns (note: most are required here, unlike RULE_EXECUTIONS)
    item_id: string;
    item_data: string;
    item_type_id: string;
    item_type_kind: string;
    item_type_name: string;
    item_type_version: string;
    item_type_schema_variant: string;
    item_type_schema: string;
    item_type_schema_field_roles?: { [key: string]: unknown };
    item_submission_id?: string;
    item_creator_id?: string;
    item_creator_type_id?: string;
    submission_id?: string;

    // Model columns
    model_id?: string;
    model_version?: number | string;
    model_score?: number;

    // Event / failure columns
    event: string;
    failure_reason?: string;
    metadata?: { [key: string]: unknown };
    correlation_id?: string;
    failed?: boolean;
    error_message?: string;
  };

  /** Rows of the content API request log. */
  CONTENT_API_REQUESTS: {
    ds: string;
    ts: number;
    org_id: string;

    // Item columns
    item_id: string;
    item_type_id: string;
    item_submission_id?: string;
    item_creator_id?: string;

    // Request columns
    endpoint: string;
    method: string;
    correlation_id: string;
    duration_ms: number;
    failed: boolean;
    error_message?: string;
  };

  // Operational tables (the warehouse is also used for operational data).
  // Their rows have a dynamic schema: any string key, values left untyped.
  'REPORTING_SERVICE.REPORTS': Record<string, unknown>;

  'REPORTING_SERVICE.APPEALS': Record<string, unknown>;

  'USER_STATISTICS_SERVICE.USER_SCORES': Record<string, unknown>;

  'USER_STATISTICS_SERVICE.SUBMISSION_STATS': Record<string, unknown>;
};
125
/**
 * Optional settings for bulk/eventual writes.
 *
 * Every field may be omitted; no defaults are defined in this file.
 */
export interface BulkWriteConfig {
  /** NOTE(review): presumably the number of rows per batch — confirm with implementations. */
  batchSize?: number;

  /** NOTE(review): presumably how long a batch may wait before being written; unit not stated here — confirm. */
  batchTimeout?: number;

  /** Whether compression is requested for the write. */
  compression?: boolean;
}
134
/**
 * Settings for a CDC (change data capture) / streaming source.
 *
 * `TableName` narrows `tableName` to a specific string type when the caller
 * supplies one.
 */
export interface CDCConfig<TableName extends string> {
  /** Table the stream is attached to. */
  tableName: TableName;

  /** Schema containing the table, when one needs to be given. */
  schemaName?: string;

  /** NOTE(review): presumably the number of changes per batch — confirm with implementations. */
  batchSize?: number;

  /** NOTE(review): presumably the polling period; unit not stated here — confirm. */
  pollInterval?: number;
}
144
/**
 * A single change record delivered by a CDC stream.
 *
 * `T` is the row type carried in `before` / `after`; it defaults to `unknown`.
 */
export interface CDCChange<T = unknown> {
  /** Kind of change this record describes. */
  operation:
    | 'INSERT'
    | 'UPDATE'
    | 'DELETE';

  /** Row image before the change; optional for every operation. */
  before?: T;

  /** Row image after the change; optional for every operation. */
  after?: T;

  /** Bookkeeping attached to the change. */
  metadata: {
    timestamp: Date;
    transactionId?: string;
  };
}
157
/**
 * Contract for the analytics-specific features of a data warehouse.
 *
 * An implementation supplies bulk writes, CDC streams and the related
 * lifecycle hooks for one concrete warehouse.
 */
export interface IDataWarehouseAnalytics {
  /**
   * Writes rows to a table using batching/buffering.
   * Batching, compression and ingestion strategy are the implementation's
   * responsibility.
   *
   * @param tableName Table to write to; must be a key of `AnalyticsSchema`.
   * @param rows Rows shaped like that table's entry in `AnalyticsSchema`.
   * @param config Optional bulk-write settings.
   */
  bulkWrite<TableName extends keyof AnalyticsSchema>(
    tableName: TableName,
    rows: readonly AnalyticsSchema[TableName][],
    config?: BulkWriteConfig,
  ): Promise<void>;

  /**
   * Sets up a CDC stream/listener on a table.
   * The mechanism is whatever the warehouse offers (Clickhouse materialized
   * views, PostgreSQL logical replication, etc.).
   *
   * @param config Table and stream settings.
   */
  createCDCStream<TableName extends string>(
    config: CDCConfig<TableName>,
  ): Promise<void>;

  /**
   * Consumes changes from a CDC stream.
   *
   * @param streamName Name of the stream to read from.
   * @param callback Invoked with batches of changes.
   * @param tracer Tracer handed to the implementation.
   */
  consumeCDCChanges<T = unknown>(
    streamName: string,
    callback: (changes: CDCChange<T>[]) => Promise<void>,
    tracer: SafeTracer,
  ): Promise<void>;

  /**
   * Reports whether this warehouse supports CDC.
   */
  supportsCDC(): boolean;

  /**
   * Flushes any bulk writes that are still pending.
   * Called during shutdown so that all data gets written.
   */
  flushPendingWrites(): Promise<void>;

  /**
   * Closes / cleans up the analytics adapter (optional member).
   * Alias for flushPendingWrites, present for IOC container compatibility.
   */
  close?(): Promise<void>;
}
209
/**
 * Per-table documentation for integrators.
 *
 * For each analytics log table this lists a description, the partition key
 * column, the sort key column, and the columns to index. Declared `as const`,
 * so every value keeps its literal type and is readonly.
 */
export const ANALYTICS_SCHEMA_DOCS = {
  // Rule execution log
  RULE_EXECUTIONS: {
    description: 'Logs every rule execution against content',
    partitionKey: 'ds',
    sortKey: 'ts',
    indexes: [
      'org_id',
      'rule_id',
      'item_id',
    ],
  },

  // Action execution log
  ACTION_EXECUTIONS: {
    description: 'Logs every action execution (moderation actions taken)',
    partitionKey: 'ds',
    sortKey: 'ts',
    indexes: [
      'org_id',
      'action_id',
      'item_id',
    ],
  },

  // Model score log
  ITEM_MODEL_SCORES_LOG: {
    description: 'Logs ML model scores for content',
    partitionKey: 'ds',
    sortKey: 'ts',
    indexes: [
      'org_id',
      'model_id',
      'item_id',
    ],
  },

  // Content API request log
  CONTENT_API_REQUESTS: {
    description: 'Logs API requests for content moderation',
    partitionKey: 'ds',
    sortKey: 'ts',
    indexes: [
      'org_id',
      'endpoint',
    ],
  },
} as const;
240
/**
 * Sample migration SQL for integrators.
 *
 * Shows how the rule_executions table could be created in PostgreSQL and in
 * Clickhouse. Only a handful of columns are spelled out; the remaining ones
 * come from AnalyticsSchema. This is documentation text only — nothing in
 * this file executes it.
 */
export const MIGRATION_EXAMPLE = `
-- Example for PostgreSQL:
CREATE TABLE rule_executions (
  ds DATE NOT NULL,
  ts BIGINT NOT NULL,
  org_id TEXT NOT NULL,
  item_id TEXT NOT NULL,
  rule_id TEXT NOT NULL,
  passed BOOLEAN NOT NULL,
  -- ... other fields from AnalyticsSchema
  PRIMARY KEY (ds, ts, org_id, item_id)
);

CREATE INDEX idx_rule_executions_org ON rule_executions(org_id);
CREATE INDEX idx_rule_executions_rule ON rule_executions(rule_id);

-- Example for Clickhouse:
CREATE TABLE rule_executions (
  ds Date,
  ts UInt64,
  org_id String,
  item_id String,
  rule_id String,
  passed UInt8,
  -- ... other fields
) ENGINE = MergeTree()
PARTITION BY ds
ORDER BY (ds, ts, org_id);
`;
273