// Mirror of https://github.com/roostorg/coop (github.com/roostorg/coop)
1import { readFileSync } from 'fs';
2import { dirname, join as pathJoin } from 'path';
3import { fileURLToPath } from 'url';
4
5import { createClient, type ClickHouseClient } from '@clickhouse/client';
6import { wrapMigration, type DatabaseConfig } from '@roostorg/db-migrator';
7import type { UmzugStorage } from 'umzug';
8
/** Directory that contains this module, derived from `import.meta.url`. */
const __dirname = dirname(fileURLToPath(import.meta.url));

/**
 * Resolves a path relative to the directory this module lives in.
 *
 * @param it - Path segment, relative to this file's directory.
 * @returns The joined path.
 */
function relativePath(it: string): string {
  return pathJoin(__dirname, it);
}
11
/** Settings used to build a ClickHouse client connection. */
interface ClickhouseConnectionOptions {
  /** URL scheme used when composing the connection URL. */
  protocol: 'http' | 'https';
  /** Server host name used in the connection URL. */
  host: string;
  /** Server port used in the connection URL. */
  port: number;
  /** Database clients are bound to unless another one is requested. */
  database: string;
  /** User name sent with every request. */
  username: string;
  /** Optional password; left out entirely when none is configured. */
  password?: string;
}
20
/**
 * Parses the ClickHouse port from its raw environment value.
 *
 * @param raw - Value of `CLICKHOUSE_PORT`, or `undefined` when unset.
 * @returns The parsed port; `8123` when `raw` is `undefined`.
 * @throws Error when the value does not parse to an integer in 1..65535.
 */
function parseClickhousePort(raw: string | undefined): number {
  const port = Number.parseInt(raw ?? '8123', 10);

  // parseInt yields NaN for non-numeric input; reject it here instead of
  // letting it end up inside the connection URL.
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(
      `Invalid CLICKHOUSE_PORT "${raw}": expected an integer between 1 and 65535.`,
    );
  }

  return port;
}

/**
 * Validates the ClickHouse URL scheme from its raw environment value.
 *
 * @param raw - Value of `CLICKHOUSE_PROTOCOL`, or `undefined` when unset.
 * @returns `'http'` or `'https'`; `'http'` when `raw` is `undefined`.
 * @throws Error when the value is neither `http` nor `https`.
 */
function parseClickhouseProtocol(raw: string | undefined): 'http' | 'https' {
  const protocol = raw ?? 'http';

  if (protocol !== 'http' && protocol !== 'https') {
    throw new Error(
      `Invalid CLICKHOUSE_PROTOCOL "${protocol}": expected "http" or "https".`,
    );
  }

  return protocol;
}

const passwordFromEnv = process.env.CLICKHOUSE_PASSWORD;

/** Connection settings read once from the environment at module load. */
const connectionOptions: ClickhouseConnectionOptions = {
  host: process.env.CLICKHOUSE_HOST ?? 'localhost',
  port: parseClickhousePort(process.env.CLICKHOUSE_PORT),
  protocol: parseClickhouseProtocol(process.env.CLICKHOUSE_PROTOCOL),
  username: process.env.CLICKHOUSE_USERNAME ?? 'default',
  database: process.env.CLICKHOUSE_DATABASE ?? 'analytics',
  // Only include the key when a non-empty password is configured.
  ...(passwordFromEnv ? { password: passwordFromEnv } : {}),
};

/** Name of the table in which applied migrations are recorded. */
const MIGRATIONS_TABLE =
  process.env.CLICKHOUSE_MIGRATIONS_TABLE ?? 'MIGRATIONS_METADATA';
34
/**
 * Creates a ClickHouse client from the module-level connection options.
 *
 * @param database - Database to bind the client to; the configured database
 *   is used when omitted.
 * @returns A newly created client.
 */
function createClickhouseClient(database?: string): ClickHouseClient {
  // Only pass a password key when one is actually configured.
  const credentials = connectionOptions.password
    ? { password: connectionOptions.password }
    : {};

  return createClient({
    url: `${connectionOptions.protocol}://${connectionOptions.host}:${connectionOptions.port}`,
    database: database ?? connectionOptions.database,
    username: connectionOptions.username,
    ...credentials,
    clickhouse_settings: {
      allow_experimental_object_type: 1,
    },
  });
}
49
/**
 * Umzug storage that records applied migrations in a ClickHouse table.
 *
 * The table is created lazily, on the first storage operation, in the
 * configured database.
 */
class ClickhouseMigrationStorage implements UmzugStorage<ClickHouseClient> {
  private readonly client: ClickHouseClient;
  private readonly tableIdentifier: string;
  private isTableEnsured = false;

  constructor() {
    this.client = createClickhouseClient();
    this.tableIdentifier = `${connectionOptions.database}.${MIGRATIONS_TABLE}`;
  }

  /**
   * Records a migration as applied.
   *
   * @param name - Migration name to record.
   */
  async logMigration({ name }: { name: string }): Promise<void> {
    await this.ensureTable();
    await this.client.insert({
      table: this.tableIdentifier,
      format: 'JSONEachRow',
      values: [
        {
          name,
          // Unix timestamp in whole seconds.
          executed_at: Math.floor(Date.now() / 1000),
        },
      ],
    });
  }

  /**
   * Removes a migration's record so it is considered pending again.
   *
   * @param name - Migration name to remove.
   */
  async unlogMigration({ name }: { name: string }): Promise<void> {
    await this.ensureTable();
    await this.client.command({
      query: `ALTER TABLE ${this.tableIdentifier} DELETE WHERE name = {name:String}`,
      query_params: { name },
      // ALTER ... DELETE is a mutation that runs asynchronously by default;
      // wait for it so a following executed() no longer sees the row.
      clickhouse_settings: { mutations_sync: '1' },
    });
  }

  /**
   * Lists the names of applied migrations, oldest first.
   *
   * @returns Migration names; an empty list when the table does not exist.
   */
  async executed(): Promise<string[]> {
    await this.ensureTable();

    try {
      const result = await this.client.query({
        // executed_at is written with one-second resolution, so `name` breaks
        // ties between migrations logged within the same second.
        query: `SELECT name FROM ${this.tableIdentifier} ORDER BY executed_at, name`,
        format: 'JSONEachRow',
      });
      const rows = await result.json<{ name: string }>();
      return rows.map((row) => row.name);
    } catch (error: unknown) {
      if (ClickhouseMigrationStorage.isMissingTableError(error)) {
        return [];
      }
      throw error;
    }
  }

  /** Closes the underlying client. */
  async shutdown(): Promise<void> {
    await this.client.close();
  }

  /**
   * Tells whether an error carries ClickHouse code 60, which the server uses
   * for a missing table.
   *
   * The code is compared as text because the client library reports it as a
   * string; a numeric `60` is accepted as well.
   */
  private static isMissingTableError(error: unknown): boolean {
    if (typeof error !== 'object' || error === null || !('code' in error)) {
      return false;
    }
    return String((error as { code?: unknown }).code) === '60';
  }

  /** Creates the migrations table if needed; does the work once per instance. */
  private async ensureTable(): Promise<void> {
    if (this.isTableEnsured) {
      return;
    }

    await this.client.command({
      query: `
        CREATE TABLE IF NOT EXISTS ${this.tableIdentifier} (
          name String,
          executed_at DateTime64(3) DEFAULT now()
        )
        ENGINE = MergeTree
        ORDER BY (name)
        SETTINGS index_granularity = 1
      `,
    });

    this.isTableEnsured = true;
  }
}
130
/**
 * db-migrator configuration for the ClickHouse database.
 *
 * Migrations are plain `.sql` files; each file is split into statements that
 * are executed one after another through the ClickHouse client.
 */
export default {
  supportedEnvironments: ['staging', 'prod'],
  supportedScriptFormats: ['sql'] as const,
  defaultScriptFormat: 'sql' as const,
  scriptsDirectory: relativePath('../scripts/clickhouse'),

  /** Creates the storage that tracks applied migrations. */
  createStorage() {
    return new ClickhouseMigrationStorage();
  },

  /** Releases the storage's client. */
  async destroyStorage(storage: ClickhouseMigrationStorage) {
    await storage.shutdown();
  },

  /** Creates the client handed to migrations as their context. */
  createContext(): ClickHouseClient {
    return createClickhouseClient();
  },

  /** Closes the client used as migration context. */
  async destroyContext(context: ClickHouseClient) {
    await context.close();
  },

  /**
   * Turns a migration file into a runnable migration.
   *
   * @throws Error when the file is not a `.sql` script.
   */
  resolveScript(params) {
    const { path, name, context } = params;

    if (!path.endsWith('.sql')) {
      throw new Error(
        `Unsupported ClickHouse migration format for ${name}. Expected .sql file.`,
      );
    }

    const fileContents = readFileSync(path, 'utf8');
    const statements = splitSqlStatements(fileContents);
    const baseResult = {
      name,
      // Statements run sequentially, in file order; the first failure aborts.
      async up(): Promise<void> {
        for (const statement of statements) {
          if (!statement.length) {
            continue;
          }

          await context.command({ query: statement });
        }
      },
    };

    return wrapMigration(
      {
        runBefore: async () => {
          // NOTE(review): the client already sends this setting with every
          // request; confirm whether this extra SET is still needed.
          await context.command({
            query: 'SET allow_experimental_object_type = 1;',
          });
        },
      },
      baseResult,
    );
  },

  /** Drops the migrations bookkeeping table, then closes the client. */
  async dropDbAndDisconnect() {
    const client = createClickhouseClient(connectionOptions.database);

    try {
      await client.command({
        query: `DROP TABLE IF EXISTS ${connectionOptions.database}.${MIGRATIONS_TABLE}`,
      });
    } finally {
      await client.close();
    }
  },

  /** Creates the configured database if it is missing, then closes the client. */
  async prepareDbAndDisconnect() {
    // Connect through the server's `default` database: a client bound to the
    // target database is rejected while that database does not exist yet, so
    // CREATE DATABASE could never run on a fresh server.
    const client = createClickhouseClient('default');

    try {
      await client.command({
        query: `CREATE DATABASE IF NOT EXISTS ${connectionOptions.database}`,
      });
    } finally {
      await client.close();
    }
  },
} satisfies DatabaseConfig<'sql', ClickHouseClient, ClickhouseMigrationStorage>;
213
/**
 * Splits a SQL script into individual statements.
 *
 * The script is scanned character by character so that `;` only terminates a
 * statement when it appears outside of quoted text and comments:
 * - single-quoted, double-quoted and backtick-quoted text is copied verbatim
 *   (a backslash escapes the following character);
 * - `-- line` comments and block comments are removed;
 * - blank lines and trailing whitespace outside quotes are dropped.
 *
 * Heredoc-style literals are not recognised.
 *
 * @param sql - Full text of the script.
 * @returns The non-empty statements in source order, trimmed and without
 *   their terminating semicolon.
 */
function splitSqlStatements(sql: string): string[] {
  const statements: string[] = [];
  let current = '';
  // Quote character of the literal/identifier being scanned, if any.
  let quote: string | null = null;

  const flush = (): void => {
    const statement = current.trim();
    if (statement.length) {
      statements.push(statement);
    }
    current = '';
  };

  let i = 0;
  while (i < sql.length) {
    const ch = sql.charAt(i);
    const next = sql.charAt(i + 1);

    if (quote !== null) {
      current += ch;
      if (ch === '\\' && next !== '') {
        // Keep the escaped character so an escaped quote does not end the literal.
        current += next;
        i += 2;
        continue;
      }
      if (ch === quote) {
        quote = null;
      }
      i += 1;
      continue;
    }

    if (ch === "'" || ch === '"' || ch === '`') {
      quote = ch;
      current += ch;
      i += 1;
      continue;
    }

    if (ch === '-' && next === '-') {
      // Skip to the end of the line; the newline itself is handled below.
      const endOfLine = sql.indexOf('\n', i);
      i = endOfLine === -1 ? sql.length : endOfLine;
      continue;
    }

    if (ch === '/' && next === '*') {
      const endOfComment = sql.indexOf('*/', i + 2);
      i = endOfComment === -1 ? sql.length : endOfComment + 2;
      // A space keeps the tokens on either side of the comment apart.
      current += ' ';
      continue;
    }

    if (ch === ';') {
      flush();
      i += 1;
      continue;
    }

    if (ch === '\n') {
      // Outside quotes the buffer cannot end inside a literal, so trimming
      // only removes trailing whitespace and collapses blank lines.
      current = current.trimEnd();
      if (current.length) {
        current += '\n';
      }
      i += 1;
      continue;
    }

    current += ch;
    i += 1;
  }

  // A final statement does not need a terminating semicolon.
  flush();

  return statements;
}
243