Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 152 lines 4.0 kB view raw
1import { createClient, type ClickHouseClient } from '@clickhouse/client'; 2import { 3 Kysely, 4 PostgresAdapter, 5 PostgresIntrospector, 6 PostgresQueryCompiler, 7 type CompiledQuery, 8 type DatabaseConnection, 9 type Dialect, 10 type DialectAdapter, 11 type Driver, 12 type QueryCompiler, 13 type QueryResult, 14 type TransactionSettings, 15} from 'kysely'; 16 17import { formatClickhouseQuery } from '../../plugins/warehouse/utils/clickhouseSql.js'; 18import type { 19 DataWarehousePoolSettings, 20 IDataWarehouseDialect, 21} from './IDataWarehouse.js'; 22 23export interface ClickhouseConnectionSettings { 24 host: string; 25 username: string; 26 password: string; 27 database: string; 28 port?: number; 29 protocol?: 'http' | 'https'; 30} 31 32function createConnection(client: ClickHouseClient): DatabaseConnection { 33 const execute = async <R>( 34 compiledQuery: CompiledQuery, 35 ): Promise<QueryResult<R>> => { 36 const statement = formatClickhouseQuery( 37 compiledQuery.sql, 38 compiledQuery.parameters, 39 ); 40 const result = await client.query({ 41 query: statement, 42 format: 'JSONEachRow', 43 }); 44 45 const rows = await result.json<R>(); 46 return { rows }; 47 }; 48 49 return { 50 executeQuery: execute, 51 streamQuery<R>(compiledQuery: CompiledQuery) { 52 return (async function* iterator(): AsyncIterableIterator< 53 QueryResult<R> 54 > { 55 yield await execute<R>(compiledQuery); 56 })(); 57 }, 58 }; 59} 60 61function createDriver(client: ClickHouseClient): Driver { 62 return { 63 async init() { 64 // No initialization steps required for the HTTP client. 65 }, 66 async acquireConnection() { 67 return createConnection(client); 68 }, 69 async beginTransaction( 70 _connection: DatabaseConnection, 71 _settings: TransactionSettings, 72 ) { 73 throw new Error( 74 'ClickHouse does not support multi-statement transactions', 75 ); 76 }, 77 async commitTransaction() { 78 throw new Error( 79 'ClickHouse does not support multi-statement transactions', 80 ); 81 }, 82 async rollbackTransaction() { 83 throw new Error( 84 'ClickHouse does not support multi-statement transactions', 85 ); 86 }, 87 async releaseConnection(_connection: DatabaseConnection) { 88 // Nothing to release; HTTP client pools internally. 89 }, 90 async destroy() { 91 await client.close(); 92 }, 93 }; 94} 95 96function createDialect(client: ClickHouseClient): Dialect { 97 return { 98 createDriver() { 99 return createDriver(client); 100 }, 101 createQueryCompiler(): QueryCompiler { 102 return new PostgresQueryCompiler(); 103 }, 104 createAdapter(): DialectAdapter { 105 return new PostgresAdapter(); 106 }, 107 createIntrospector(db: Kysely<any>) { 108 return new PostgresIntrospector(db); 109 }, 110 }; 111} 112 113export class ClickhouseKyselyAdapter implements IDataWarehouseDialect { 114 private readonly client: ClickHouseClient; 115 // eslint-disable-next-line @typescript-eslint/no-explicit-any 116 private readonly kysely: Kysely<any>; 117 118 constructor( 119 connectionSettings: ClickhouseConnectionSettings, 120 _poolSettings?: DataWarehousePoolSettings, 121 ) { 122 const protocol = connectionSettings.protocol ?? 'http'; 123 const port = connectionSettings.port ?? 8123; 124 125 const url = `${protocol}://${connectionSettings.host}:${port}`; 126 const rawPassword = connectionSettings.password; 127 const password = 128 rawPassword && rawPassword.length > 0 ? rawPassword : undefined; 129 this.client = createClient({ 130 url, 131 username: connectionSettings.username, 132 ...(password ? { password } : {}), 133 database: connectionSettings.database, 134 clickhouse_settings: { 135 allow_experimental_object_type: 1, 136 }, 137 }); 138 139 this.kysely = new Kysely({ 140 dialect: createDialect(this.client), 141 }); 142 } 143 144 // eslint-disable-next-line @typescript-eslint/no-explicit-any 145 getKyselyInstance(): Kysely<any> { 146 return this.kysely; 147 } 148 149 async destroy(): Promise<void> { 150 await this.kysely.destroy(); 151 } 152}