Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 243 lines 6.4 kB view raw
1import { readFileSync } from 'fs'; 2import { dirname, join as pathJoin } from 'path'; 3import { fileURLToPath } from 'url'; 4 5import { createClient, type ClickHouseClient } from '@clickhouse/client'; 6import { wrapMigration, type DatabaseConfig } from '@roostorg/db-migrator'; 7import type { UmzugStorage } from 'umzug'; 8 9const __dirname = dirname(fileURLToPath(import.meta.url)); 10const relativePath = (it: string) => pathJoin(__dirname, it); 11 12interface ClickhouseConnectionOptions { 13 host: string; 14 port: number; 15 protocol: 'http' | 'https'; 16 username: string; 17 password?: string; 18 database: string; 19} 20 21const passwordFromEnv = process.env.CLICKHOUSE_PASSWORD; 22 23const connectionOptions: ClickhouseConnectionOptions = { 24 host: process.env.CLICKHOUSE_HOST ?? 'localhost', 25 port: Number.parseInt(process.env.CLICKHOUSE_PORT ?? '8123', 10), 26 protocol: (process.env.CLICKHOUSE_PROTOCOL as 'http' | 'https') ?? 'http', 27 username: process.env.CLICKHOUSE_USERNAME ?? 'default', 28 database: process.env.CLICKHOUSE_DATABASE ?? 'analytics', 29 ...(passwordFromEnv ? { password: passwordFromEnv } : {}), 30}; 31 32const MIGRATIONS_TABLE = 33 process.env.CLICKHOUSE_MIGRATIONS_TABLE ?? 'MIGRATIONS_METADATA'; 34 35function createClickhouseClient(database?: string): ClickHouseClient { 36 const { host, port, protocol, username, password } = connectionOptions; 37 const url = `${protocol}://${host}:${port}`; 38 39 return createClient({ 40 url, 41 database: database ?? connectionOptions.database, 42 username, 43 ...(password ? { password } : {}), 44 clickhouse_settings: { 45 allow_experimental_object_type: 1, 46 }, 47 }); 48} 49 50class ClickhouseMigrationStorage implements UmzugStorage<ClickHouseClient> { 51 private readonly client: ClickHouseClient; 52 private readonly tableIdentifier: string; 53 private isTableEnsured = false; 54 55 constructor() { 56 this.client = createClickhouseClient(); 57 this.tableIdentifier = `${connectionOptions.database}.${MIGRATIONS_TABLE}`; 58 } 59 60 async logMigration({ name }: { name: string }): Promise<void> { 61 await this.ensureTable(); 62 await this.client.insert({ 63 table: this.tableIdentifier, 64 format: 'JSONEachRow', 65 values: [ 66 { 67 name, 68 executed_at: Math.floor(Date.now() / 1000), 69 }, 70 ], 71 }); 72 } 73 74 async unlogMigration({ name }: { name: string }): Promise<void> { 75 await this.ensureTable(); 76 await this.client.command({ 77 query: `ALTER TABLE ${this.tableIdentifier} DELETE WHERE name = {name:String}`, 78 query_params: { name }, 79 }); 80 } 81 82 async executed(): Promise<string[]> { 83 await this.ensureTable(); 84 85 try { 86 const result = await this.client.query({ 87 query: `SELECT name FROM ${this.tableIdentifier} ORDER BY executed_at`, 88 format: 'JSONEachRow', 89 }); 90 const rows = await result.json<{ name: string }>(); 91 return rows.map((row) => row.name); 92 } catch (error: unknown) { 93 if ( 94 typeof error === 'object' && 95 error !== null && 96 'code' in error && 97 // ClickHouse error code 60 => TABLE_DOESNT_EXIST 98 (error as { code?: number }).code === 60 99 ) { 100 return []; 101 } 102 throw error; 103 } 104 } 105 106 async shutdown(): Promise<void> { 107 await this.client.close(); 108 } 109 110 private async ensureTable(): Promise<void> { 111 if (this.isTableEnsured) { 112 return; 113 } 114 115 await this.client.command({ 116 query: ` 117 CREATE TABLE IF NOT EXISTS ${this.tableIdentifier} ( 118 name String, 119 executed_at DateTime64(3) DEFAULT now() 120 ) 121 ENGINE = MergeTree 122 ORDER BY (name) 123 SETTINGS index_granularity = 1 124 `, 125 }); 126 127 this.isTableEnsured = true; 128 } 129} 130 131export default { 132 supportedEnvironments: ['staging', 'prod'], 133 supportedScriptFormats: ['sql'] as const, 134 defaultScriptFormat: 'sql' as const, 135 scriptsDirectory: relativePath('../scripts/clickhouse'), 136 137 createStorage() { 138 return new ClickhouseMigrationStorage(); 139 }, 140 141 async destroyStorage(storage: ClickhouseMigrationStorage) { 142 await storage.shutdown(); 143 }, 144 145 createContext(): ClickHouseClient { 146 return createClickhouseClient(); 147 }, 148 149 async destroyContext(context: ClickHouseClient) { 150 await context.close(); 151 }, 152 153 resolveScript(params) { 154 const { path, name, context } = params; 155 156 if (!path.endsWith('.sql')) { 157 throw new Error( 158 `Unsupported ClickHouse migration format for ${name}. Expected .sql file.`, 159 ); 160 } 161 162 const fileContents = readFileSync(path, 'utf8'); 163 const statements = splitSqlStatements(fileContents); 164 const baseResult = { 165 name, 166 async up(): Promise<void> { 167 for (const statement of statements) { 168 if (!statement.length) { 169 continue; 170 } 171 172 await context.command({ query: statement }); 173 } 174 }, 175 }; 176 177 return wrapMigration( 178 { 179 runBefore: async () => { 180 await context.command({ 181 query: 'SET allow_experimental_object_type = 1;', 182 }); 183 }, 184 }, 185 baseResult, 186 ); 187 }, 188 189 async dropDbAndDisconnect() { 190 const client = createClickhouseClient(connectionOptions.database); 191 192 try { 193 await client.command({ 194 query: `DROP TABLE IF EXISTS ${connectionOptions.database}.${MIGRATIONS_TABLE}`, 195 }); 196 } finally { 197 await client.close(); 198 } 199 }, 200 201 async prepareDbAndDisconnect() { 202 const client = createClickhouseClient(connectionOptions.database); 203 204 try { 205 await client.command({ 206 query: `CREATE DATABASE IF NOT EXISTS ${connectionOptions.database}`, 207 }); 208 } finally { 209 await client.close(); 210 } 211 }, 212} satisfies DatabaseConfig<'sql', ClickHouseClient, ClickhouseMigrationStorage>; 213 214function splitSqlStatements(sql: string): string[] { 215 const statements: string[] = []; 216 let buffer = ''; 217 218 for (const line of sql.split('\n')) { 219 const trimmedLine = line.trim(); 220 221 if (!trimmedLine.length || trimmedLine.startsWith('--')) { 222 continue; 223 } 224 225 buffer += buffer.length ? `\n${line}` : line; 226 227 if (trimmedLine.endsWith(';')) { 228 const statement = buffer.replace(/;\s*$/u, '').trim(); 229 if (statement.length) { 230 statements.push(statement); 231 } 232 buffer = ''; 233 } 234 } 235 236 const remaining = buffer.trim(); 237 if (remaining.length) { 238 statements.push(remaining); 239 } 240 241 return statements; 242} 243