Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 273 lines 6.9 kB view raw
/**
 * Extended interface for data warehouse analytics features
 * Allows integrators to implement CDC, bulk writes, and logging for any warehouse
 */

import type SafeTracer from '../../utils/SafeTracer.js';

/**
 * Schema definition that integrators must implement
 * Each warehouse implementation creates their own tables/schemas matching these types
 *
 * Keys are table names; each value type is the shape of one row in that table.
 * For the four analytics tables, `ds` and `ts` are the partition and sort keys
 * named in `ANALYTICS_SCHEMA_DOCS` (the example migration declares them as a
 * date column and a 64-bit integer column respectively).
 */
export type AnalyticsSchema = {
  RULE_EXECUTIONS: {
    ds: string;
    ts: number;
    org_id: string;
    item_id: string;
    item_type_id: string;
    item_type_kind: string;
    item_submission_id?: string;
    item_data?: string;
    item_type_name?: string;
    item_creator_id?: string;
    item_creator_type_id?: string;
    item_type_schema?: string;
    // NOTE(review): typed Record<string, string> here but Record<string, unknown>
    // in ITEM_MODEL_SCORES_LOG — confirm whether the difference is intentional.
    item_type_schema_field_roles?: Record<string, string>;
    item_type_schema_variant?: string;
    item_type_version?: string;
    rule: string;
    rule_id: string;
    rule_version: string;
    tags: readonly string[];
    policy_ids: readonly string[];
    policy_names: readonly string[];
    environment: string;
    correlation_id: string;
    result: string;
    passed: boolean;
  };

  ACTION_EXECUTIONS: {
    ds: string;
    ts: number;
    org_id: string;
    action_id: string;
    action_name: string;
    action_source: string;
    correlation_id: string;
    item_id: string;
    item_type_id: string;
    item_type_kind: string;
    item_submission_id?: string;
    item_creator_id?: string;
    item_creator_type_id?: string;
    rule_environment?: string;
    rules?: readonly unknown[];
    rule_tags?: readonly string[];
    policies: readonly unknown[];
    actor_id?: string;
    job_id?: string;
    failed: boolean;
  };

  ITEM_MODEL_SCORES_LOG: {
    ds: string;
    ts: number;
    org_id: string;
    item_id: string;
    item_data: string;
    item_type_id: string;
    item_type_kind: string;
    item_type_name: string;
    item_type_version: string;
    item_type_schema_variant: string;
    item_type_schema: string;
    item_type_schema_field_roles?: Record<string, unknown>;
    item_submission_id?: string;
    item_creator_id?: string;
    item_creator_type_id?: string;
    submission_id?: string;
    model_id?: string;
    model_version?: number | string;
    model_score?: number;
    event: string;
    failure_reason?: string;
    metadata?: Record<string, unknown>;
    correlation_id?: string;
    failed?: boolean;
    error_message?: string;
  };

  CONTENT_API_REQUESTS: {
    ds: string;
    ts: number;
    org_id: string;
    item_id: string;
    item_type_id: string;
    item_submission_id?: string;
    item_creator_id?: string;
    endpoint: string;
    method: string;
    correlation_id: string;
    duration_ms: number;
    failed: boolean;
    error_message?: string;
  };

  // Operational tables (using warehouse for operational data)
  'REPORTING_SERVICE.REPORTS': {
    [key: string]: unknown; // Dynamic schema
  };

  'REPORTING_SERVICE.APPEALS': {
    [key: string]: unknown; // Dynamic schema
  };

  'USER_STATISTICS_SERVICE.USER_SCORES': {
    [key: string]: unknown; // Dynamic schema
  };

  'USER_STATISTICS_SERVICE.SUBMISSION_STATS': {
    [key: string]: unknown; // Dynamic schema
  };
};

/**
 * Configuration for bulk/eventual writes
 *
 * Every field is optional; what happens when one is omitted is decided by the
 * implementation and is not specified in this file.
 */
export interface BulkWriteConfig {
  batchSize?: number;
  // NOTE(review): time unit is not stated here (presumably milliseconds) — confirm.
  batchTimeout?: number;
  compression?: boolean;
}

/**
 * CDC/Streaming configuration
 *
 * @typeParam TableName - string type of the table the stream is created on
 */
export interface CDCConfig<TableName extends string> {
  tableName: TableName;
  schemaName?: string;
  batchSize?: number;
  // NOTE(review): time unit is not stated here (presumably milliseconds) — confirm.
  pollInterval?: number;
}

/**
 * CDC change record
 *
 * @typeParam T - row shape carried in `before` / `after`
 */
export interface CDCChange<T = unknown> {
  operation: 'INSERT' | 'UPDATE' | 'DELETE';
  // Both row images are optional in the type; which one is populated for a
  // given operation is up to the implementation.
  before?: T;
  after?: T;
  metadata: {
    timestamp: Date;
    transactionId?: string;
  };
}

/**
 * Extended interface for analytics-specific warehouse features
 * Implementations provide CDC, bulk writes, and other analytics capabilities
 */
export interface IDataWarehouseAnalytics {
  /**
   * Bulk write rows to a table with batching/buffering
   * Implementations handle batching, compression, and optimal ingestion
   *
   * @param tableName - key of `AnalyticsSchema` naming the target table
   * @param rows - rows typed as that table's row shape
   * @param config - optional batching/compression settings
   */
  bulkWrite<TableName extends keyof AnalyticsSchema>(
    tableName: TableName,
    rows: readonly AnalyticsSchema[TableName][],
    config?: BulkWriteConfig,
  ): Promise<void>;

  /**
   * Create a CDC stream/listener on a table
   * Implementations use their warehouse's CDC mechanism (Clickhouse materialized
   * views, PostgreSQL logical replication, etc.)
   *
   * @param config - table to watch plus optional schema/batch/poll settings
   */
  createCDCStream<TableName extends string>(
    config: CDCConfig<TableName>,
  ): Promise<void>;

  /**
   * Consume changes from a CDC stream
   * Callback is called with batches of changes
   *
   * @param streamName - name of the stream to consume
   * @param callback - receives each batch of changes
   * @param tracer - tracer handed to the implementation
   */
  consumeCDCChanges<T = unknown>(
    streamName: string,
    callback: (changes: CDCChange<T>[]) => Promise<void>,
    tracer: SafeTracer,
  ): Promise<void>;

  /**
   * Check if CDC is supported by this warehouse
   */
  supportsCDC(): boolean;

  /**
   * Flush any pending bulk writes
   * Called during shutdown to ensure all data is written
   */
  flushPendingWrites(): Promise<void>;

  /**
   * Close/cleanup the analytics adapter
   * Alias for flushPendingWrites for IOC container compatibility
   */
  close?(): Promise<void>;
}

/**
 * Schema documentation for integrators
 * Describes what tables/schemas need to be created in the warehouse
 *
 * Only the four analytics tables are described; the operational
 * `REPORTING_SERVICE.*` / `USER_STATISTICS_SERVICE.*` tables have no entry.
 */
export const ANALYTICS_SCHEMA_DOCS = {
  RULE_EXECUTIONS: {
    description: 'Logs every rule execution against content',
    partitionKey: 'ds',
    sortKey: 'ts',
    indexes: ['org_id', 'rule_id', 'item_id'],
  },
  ACTION_EXECUTIONS: {
    description: 'Logs every action execution (moderation actions taken)',
    partitionKey: 'ds',
    sortKey: 'ts',
    indexes: ['org_id', 'action_id', 'item_id'],
  },
  ITEM_MODEL_SCORES_LOG: {
    description: 'Logs ML model scores for content',
    partitionKey: 'ds',
    sortKey: 'ts',
    indexes: ['org_id', 'model_id', 'item_id'],
  },
  CONTENT_API_REQUESTS: {
    description: 'Logs API requests for content moderation',
    partitionKey: 'ds',
    sortKey: 'ts',
    indexes: ['org_id', 'endpoint'],
  },
} as const;

/**
 * Example migration documentation for integrators
 *
 * Illustrative SQL only: it lists a subset of the RULE_EXECUTIONS columns and
 * marks where the remaining `AnalyticsSchema` fields would go.
 */
export const MIGRATION_EXAMPLE = `
-- Example for PostgreSQL:
CREATE TABLE rule_executions (
  ds DATE NOT NULL,
  ts BIGINT NOT NULL,
  org_id TEXT NOT NULL,
  item_id TEXT NOT NULL,
  rule_id TEXT NOT NULL,
  passed BOOLEAN NOT NULL,
  -- ... other fields from AnalyticsSchema
  PRIMARY KEY (ds, ts, org_id, item_id)
);

CREATE INDEX idx_rule_executions_org ON rule_executions(org_id);
CREATE INDEX idx_rule_executions_rule ON rule_executions(rule_id);

-- Example for Clickhouse:
CREATE TABLE rule_executions (
  ds Date,
  ts UInt64,
  org_id String,
  item_id String,
  rule_id String,
  passed UInt8,
  -- ... other fields
) ENGINE = MergeTree()
PARTITION BY ds
ORDER BY (ds, ts, org_id);
`;