Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1import { createClient, type ClickHouseClient } from '@clickhouse/client';
2import {
3 Kysely,
4 PostgresAdapter,
5 PostgresIntrospector,
6 PostgresQueryCompiler,
7 type CompiledQuery,
8 type DatabaseConnection,
9 type Dialect,
10 type DialectAdapter,
11 type Driver,
12 type QueryCompiler,
13 type QueryResult,
14 type TransactionSettings,
15} from 'kysely';
16
17import { formatClickhouseQuery } from '../../plugins/warehouse/utils/clickhouseSql.js';
18import type {
19 DataWarehousePoolSettings,
20 IDataWarehouseDialect,
21} from './IDataWarehouse.js';
22
23export interface ClickhouseConnectionSettings {
24 host: string;
25 username: string;
26 password: string;
27 database: string;
28 port?: number;
29 protocol?: 'http' | 'https';
30}
31
32function createConnection(client: ClickHouseClient): DatabaseConnection {
33 const execute = async <R>(
34 compiledQuery: CompiledQuery,
35 ): Promise<QueryResult<R>> => {
36 const statement = formatClickhouseQuery(
37 compiledQuery.sql,
38 compiledQuery.parameters,
39 );
40 const result = await client.query({
41 query: statement,
42 format: 'JSONEachRow',
43 });
44
45 const rows = await result.json<R>();
46 return { rows };
47 };
48
49 return {
50 executeQuery: execute,
51 streamQuery<R>(compiledQuery: CompiledQuery) {
52 return (async function* iterator(): AsyncIterableIterator<
53 QueryResult<R>
54 > {
55 yield await execute<R>(compiledQuery);
56 })();
57 },
58 };
59}
60
61function createDriver(client: ClickHouseClient): Driver {
62 return {
63 async init() {
64 // No initialization steps required for the HTTP client.
65 },
66 async acquireConnection() {
67 return createConnection(client);
68 },
69 async beginTransaction(
70 _connection: DatabaseConnection,
71 _settings: TransactionSettings,
72 ) {
73 throw new Error(
74 'ClickHouse does not support multi-statement transactions',
75 );
76 },
77 async commitTransaction() {
78 throw new Error(
79 'ClickHouse does not support multi-statement transactions',
80 );
81 },
82 async rollbackTransaction() {
83 throw new Error(
84 'ClickHouse does not support multi-statement transactions',
85 );
86 },
87 async releaseConnection(_connection: DatabaseConnection) {
88 // Nothing to release; HTTP client pools internally.
89 },
90 async destroy() {
91 await client.close();
92 },
93 };
94}
95
96function createDialect(client: ClickHouseClient): Dialect {
97 return {
98 createDriver() {
99 return createDriver(client);
100 },
101 createQueryCompiler(): QueryCompiler {
102 return new PostgresQueryCompiler();
103 },
104 createAdapter(): DialectAdapter {
105 return new PostgresAdapter();
106 },
107 createIntrospector(db: Kysely<any>) {
108 return new PostgresIntrospector(db);
109 },
110 };
111}
112
113export class ClickhouseKyselyAdapter implements IDataWarehouseDialect {
114 private readonly client: ClickHouseClient;
115 // eslint-disable-next-line @typescript-eslint/no-explicit-any
116 private readonly kysely: Kysely<any>;
117
118 constructor(
119 connectionSettings: ClickhouseConnectionSettings,
120 _poolSettings?: DataWarehousePoolSettings,
121 ) {
122 const protocol = connectionSettings.protocol ?? 'http';
123 const port = connectionSettings.port ?? 8123;
124
125 const url = `${protocol}://${connectionSettings.host}:${port}`;
126 const rawPassword = connectionSettings.password;
127 const password =
128 rawPassword && rawPassword.length > 0 ? rawPassword : undefined;
129 this.client = createClient({
130 url,
131 username: connectionSettings.username,
132 ...(password ? { password } : {}),
133 database: connectionSettings.database,
134 clickhouse_settings: {
135 allow_experimental_object_type: 1,
136 },
137 });
138
139 this.kysely = new Kysely({
140 dialect: createDialect(this.client),
141 });
142 }
143
144 // eslint-disable-next-line @typescript-eslint/no-explicit-any
145 getKyselyInstance(): Kysely<any> {
146 return this.kysely;
147 }
148
149 async destroy(): Promise<void> {
150 await this.kysely.destroy();
151 }
152}