AppView in a box as a Vite plugin thing hatk.dev
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: auto-schema migration on startup

Diff lexicon-derived schema against actual DB columns on every startup.
Emit ALTER TABLE ADD/DROP COLUMN for changes, drop orphaned child tables,
and trigger backfill when new empty collection tables are detected.

Also includes:
- Skip post-backfill restart in dev mode (DEV_MODE env)
- Suppress ECONNREFUSED proxy errors in Vite plugin during startup
- Move db/schema.sql write after setup hooks
- Normalize SQL parameter passing to use arrays consistently

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+939 -185
+48
docs/plans/2026-03-13-auto-schema-migration-design.md
··· 1 + # Auto-Schema Migration Design 2 + 3 + **Goal:** When a developer changes a lexicon, hatk automatically migrates the database schema on startup and re-syncs affected data. No migration files, no manual commands. 4 + 5 + ## How It Works 6 + 7 + On every startup, after `initDatabase` creates any missing tables but before setup hooks or indexing: 8 + 9 + 1. For each collection table, query the DB for its current columns 10 + - SQLite: `PRAGMA table_info(tableName)` 11 + - DuckDB: `SELECT column_name, data_type FROM information_schema.columns WHERE table_name = ?` 12 + 13 + 2. Compare against `generateTableSchema()` output from the current lexicons 14 + 15 + 3. Emit changes: 16 + - **New column in lexicon** → `ALTER TABLE ADD COLUMN name TYPE [DEFAULT NULL]` 17 + - **Column removed from lexicon** → `ALTER TABLE DROP COLUMN name` 18 + - **Column type changed** → drop column + re-add with new type 19 + - **New child/union table** → already handled by `CREATE TABLE IF NOT EXISTS` 20 + - **Child/union table removed** → `DROP TABLE IF EXISTS` 21 + 22 + 4. Apply same logic to child tables and union branch tables 23 + 24 + 5. Log every change: 25 + ``` 26 + [migration] added column "rating" INTEGER to "fm.teal.alpha.feed.play" 27 + [migration] dropped column "old_field" from "fm.teal.alpha.feed.play" 28 + ``` 29 + 30 + 6. No automatic re-sync for column changes — existing data is already indexed, and new fields won't exist in old network records. But if any collection table is **empty** (newly added collection), mark all repos as `pending` so the backfill cycle picks up records for the new collection. 31 + 32 + ## What This Means for Developers 33 + 34 + - Change a lexicon field → restart dev server → schema updates automatically → data re-syncs 35 + - Add a new collection → restart → table created → backfill picks it up 36 + - Remove a collection → restart → warning logged, run `hatk destroy collection` to clean up 37 + - No migration files, no migration history, no `hatk migrate` command 38 + 39 + ## Edge Cases 40 + 41 + - **SQLite ALTER TABLE limitations**: SQLite doesn't support `DROP COLUMN` before 3.35.0. `better-sqlite3` bundles SQLite 3.45+, so this is fine. 42 + - **Indexes**: If a ref column is added, the corresponding index is created. If removed, the index is dropped with the column. 43 + - **FTS**: FTS indexes are rebuilt on every startup anyway, so schema changes are automatically reflected. 44 + - **Data loss on column drop/type change**: Acceptable because data comes from the AT Protocol network and is recoverable via backfill. Log clearly so developers understand what happened. 45 + 46 + ## Implementation Location 47 + 48 + Add a `migrateSchema()` function in `database/db.ts` called from `initDatabase` after DDL execution. It receives the `TableSchema[]` and compares against the live database state.
+540
docs/plans/2026-03-13-auto-schema-migration.md
··· 1 + # Auto-Schema Migration Implementation Plan 2 + 3 + > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. 4 + 5 + **Goal:** When a developer changes a lexicon, hatk automatically migrates the database schema on startup and re-syncs affected data — no migration files, no manual commands. 6 + 7 + **Architecture:** Add a `migrateSchema()` function in `database/db.ts` that runs after `initDatabase` creates tables but before setup hooks. It compares each collection's current DB columns against `generateTableSchema()` output and emits `ALTER TABLE` statements. If any columns changed, all repos are marked for re-sync. 8 + 9 + **Tech Stack:** SQLite (`PRAGMA table_info`), DuckDB (`information_schema.columns`), existing `TableSchema`/`SqlDialect` types 10 + 11 + --- 12 + 13 + ### Task 1: Add introspection methods to SqlDialect 14 + 15 + **Files:** 16 + - Modify: `packages/hatk/src/database/dialect.ts` 17 + 18 + **Step 1: Add `introspectColumns` to `SqlDialect` interface** 19 + 20 + Add a method that returns the SQL query to get column names and types for a given table. The query should return rows with `column_name` and `data_type` fields. 21 + 22 + ```typescript 23 + // Add to SqlDialect interface: 24 + /** SQL to get columns for a table. Receives table name as $1/?. Returns rows with column_name, data_type. */ 25 + introspectColumnsQuery(tableName: string): string 26 + ``` 27 + 28 + **Step 2: Implement for DuckDB** 29 + 30 + ```typescript 31 + // In DUCKDB_DIALECT: 32 + introspectColumnsQuery: (tableName: string) => 33 + `SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '${tableName}'`, 34 + ``` 35 + 36 + **Step 3: Implement for SQLite** 37 + 38 + SQLite's `PRAGMA table_info` returns `name` and `type` columns. Wrap it to match the expected shape: 39 + 40 + ```typescript 41 + // In SQLITE_DIALECT: 42 + introspectColumnsQuery: (tableName: string) => 43 + `PRAGMA table_info("${tableName}")`, 44 + ``` 45 + 46 + Note: SQLite PRAGMA returns `name` and `type` instead of `column_name` and `data_type`. The migration function will need to normalize these. 47 + 48 + **Step 4: Verify the build compiles** 49 + 50 + Run: `cd packages/hatk && npx tsc --noEmit` 51 + Expected: No errors 52 + 53 + --- 54 + 55 + ### Task 2: Write migrateSchema function — column introspection 56 + 57 + **Files:** 58 + - Modify: `packages/hatk/src/database/db.ts` 59 + 60 + **Step 1: Add the `migrateSchema` function signature and column fetching** 61 + 62 + Add after `initDatabase`. This function queries each collection table for its current columns and compares against the expected schema. 63 + 64 + ```typescript 65 + interface MigrationChange { 66 + table: string 67 + action: 'add' | 'drop' | 'retype' 68 + column: string 69 + type?: string 70 + } 71 + 72 + export async function migrateSchema(tableSchemas: TableSchema[]): Promise<MigrationChange[]> { 73 + const changes: MigrationChange[] = [] 74 + 75 + for (const schema of tableSchemas) { 76 + if (schema.columns.length === 0) continue // generic JSON storage, skip 77 + 78 + const tableName = schema.collection 79 + const existingCols = await getExistingColumns(tableName) 80 + if (existingCols.size === 0) continue // table just created, nothing to migrate 81 + 82 + // Compare columns (next task) 83 + } 84 + 85 + return changes 86 + } 87 + ``` 88 + 89 + **Step 2: Add `getExistingColumns` helper** 90 + 91 + This queries the DB and returns a Map of column name → type, normalizing across SQLite/DuckDB. 92 + 93 + ```typescript 94 + async function getExistingColumns(tableName: string): Promise<Map<string, string>> { 95 + const cols = new Map<string, string>() 96 + try { 97 + const query = dialect.introspectColumnsQuery(tableName) 98 + const rows = await all(query) 99 + for (const row of rows) { 100 + // SQLite PRAGMA returns { name, type }, DuckDB returns { column_name, data_type } 101 + const name = (row.column_name || row.name) as string 102 + const type = ((row.data_type || row.type) as string).toUpperCase() 103 + cols.set(name, type) 104 + } 105 + } catch { 106 + // Table doesn't exist yet 107 + } 108 + return cols 109 + } 110 + ``` 111 + 112 + **Step 3: Verify the build compiles** 113 + 114 + Run: `cd packages/hatk && npx tsc --noEmit` 115 + Expected: No errors 116 + 117 + --- 118 + 119 + ### Task 3: Implement column diff logic 120 + 121 + **Files:** 122 + - Modify: `packages/hatk/src/database/db.ts` 123 + 124 + **Step 1: Add column comparison inside `migrateSchema`** 125 + 126 + Replace the `// Compare columns (next task)` comment with: 127 + 128 + ```typescript 129 + // Expected columns: base columns (uri, cid, did, indexed_at) + schema columns 130 + const expectedCols = new Map<string, string>() 131 + expectedCols.set('uri', 'TEXT') 132 + expectedCols.set('cid', 'TEXT') 133 + expectedCols.set('did', 'TEXT') 134 + expectedCols.set('indexed_at', dialect.timestampType.toUpperCase()) 135 + for (const col of schema.columns) { 136 + expectedCols.set(col.name, col.sqlType.toUpperCase()) 137 + } 138 + 139 + // Find new columns (in expected but not in existing) 140 + for (const [colName, colType] of expectedCols) { 141 + if (!existingCols.has(colName)) { 142 + changes.push({ table: tableName, action: 'add', column: colName, type: colType }) 143 + } 144 + } 145 + 146 + // Find removed columns (in existing but not in expected) 147 + for (const [colName] of existingCols) { 148 + if (!expectedCols.has(colName)) { 149 + changes.push({ table: tableName, action: 'drop', column: colName }) 150 + } 151 + } 152 + 153 + // Find type changes (same name, different type) 154 + for (const [colName, colType] of expectedCols) { 155 + const existingType = existingCols.get(colName) 156 + if (existingType && existingType !== colType) { 157 + changes.push({ table: tableName, action: 'retype', column: colName, type: colType }) 158 + } 159 + } 160 + ``` 161 + 162 + **Step 2: Verify the build compiles** 163 + 164 + Run: `cd packages/hatk && npx tsc --noEmit` 165 + Expected: No errors 166 + 167 + --- 168 + 169 + ### Task 4: Implement child table and union table diffing 170 + 171 + **Files:** 172 + - Modify: `packages/hatk/src/database/db.ts` 173 + 174 + **Step 1: Add child table migration logic** 175 + 176 + After the main table diff in `migrateSchema`, add diffing for child tables and union branch tables. These follow the same pattern but with `parent_uri` and `parent_did` as base columns instead of `uri/cid/did/indexed_at`. 177 + 178 + ```typescript 179 + // Diff child tables 180 + for (const child of schema.children) { 181 + const childTable = child.tableName.replace(/"/g, '') 182 + const existingChildCols = await getExistingColumns(childTable) 183 + if (existingChildCols.size === 0) continue 184 + 185 + const expectedChildCols = new Map<string, string>() 186 + expectedChildCols.set('parent_uri', 'TEXT') 187 + expectedChildCols.set('parent_did', 'TEXT') 188 + for (const col of child.columns) { 189 + expectedChildCols.set(col.name, col.sqlType.toUpperCase()) 190 + } 191 + 192 + for (const [colName, colType] of expectedChildCols) { 193 + if (!existingChildCols.has(colName)) { 194 + changes.push({ table: childTable, action: 'add', column: colName, type: colType }) 195 + } 196 + } 197 + for (const [colName] of existingChildCols) { 198 + if (!expectedChildCols.has(colName)) { 199 + changes.push({ table: childTable, action: 'drop', column: colName }) 200 + } 201 + } 202 + for (const [colName, colType] of expectedChildCols) { 203 + const existingType = existingChildCols.get(colName) 204 + if (existingType && existingType !== colType) { 205 + changes.push({ table: childTable, action: 'retype', column: colName, type: colType }) 206 + } 207 + } 208 + } 209 + 210 + // Diff union branch tables 211 + for (const union of schema.unions) { 212 + for (const branch of union.branches) { 213 + const branchTable = branch.tableName.replace(/"/g, '') 214 + const existingBranchCols = await getExistingColumns(branchTable) 215 + if (existingBranchCols.size === 0) continue 216 + 217 + const expectedBranchCols = new Map<string, string>() 218 + expectedBranchCols.set('parent_uri', 'TEXT') 219 + expectedBranchCols.set('parent_did', 'TEXT') 220 + for (const col of branch.columns) { 221 + expectedBranchCols.set(col.name, col.sqlType.toUpperCase()) 222 + } 223 + 224 + for (const [colName, colType] of expectedBranchCols) { 225 + if (!existingBranchCols.has(colName)) { 226 + changes.push({ table: branchTable, action: 'add', column: colName, type: colType }) 227 + } 228 + } 229 + for (const [colName] of existingBranchCols) { 230 + if (!expectedBranchCols.has(colName)) { 231 + changes.push({ table: branchTable, action: 'drop', column: colName }) 232 + } 233 + } 234 + for (const [colName, colType] of expectedBranchCols) { 235 + const existingType = existingBranchCols.get(colName) 236 + if (existingType && existingType !== colType) { 237 + changes.push({ table: branchTable, action: 'retype', column: colName, type: colType }) 238 + } 239 + } 240 + } 241 + } 242 + ``` 243 + 244 + **Step 2: Verify the build compiles** 245 + 246 + Run: `cd packages/hatk && npx tsc --noEmit` 247 + Expected: No errors 248 + 249 + --- 250 + 251 + ### Task 5: Apply ALTER TABLE statements 252 + 253 + **Files:** 254 + - Modify: `packages/hatk/src/database/db.ts` 255 + 256 + **Step 1: Add `applyMigrationChanges` function** 257 + 258 + This takes the list of changes and executes the corresponding SQL. 259 + 260 + ```typescript 261 + async function applyMigrationChanges(changes: MigrationChange[]): Promise<void> { 262 + for (const change of changes) { 263 + const quotedTable = `"${change.table}"` 264 + try { 265 + switch (change.action) { 266 + case 'add': 267 + await run(`ALTER TABLE ${quotedTable} ADD COLUMN ${change.column} ${change.type}`) 268 + emit('migration', `added column "${change.column}" ${change.type} to "${change.table}"`) 269 + break 270 + case 'drop': 271 + await run(`ALTER TABLE ${quotedTable} DROP COLUMN ${change.column}`) 272 + emit('migration', `dropped column "${change.column}" from "${change.table}"`) 273 + break 274 + case 'retype': 275 + // Drop and re-add with new type 276 + await run(`ALTER TABLE ${quotedTable} DROP COLUMN ${change.column}`) 277 + await run(`ALTER TABLE ${quotedTable} ADD COLUMN ${change.column} ${change.type}`) 278 + emit('migration', `changed column "${change.column}" type to ${change.type} in "${change.table}"`) 279 + break 280 + } 281 + } catch (err: any) { 282 + emit('migration', `failed to ${change.action} column "${change.column}" on "${change.table}": ${err.message}`) 283 + } 284 + } 285 + } 286 + ``` 287 + 288 + **Step 2: Call `applyMigrationChanges` from `migrateSchema`** 289 + 290 + At the end of `migrateSchema`, before `return changes`: 291 + 292 + ```typescript 293 + if (changes.length > 0) { 294 + await applyMigrationChanges(changes) 295 + } 296 + 297 + return changes 298 + ``` 299 + 300 + **Step 3: Verify the build compiles** 301 + 302 + Run: `cd packages/hatk && npx tsc --noEmit` 303 + Expected: No errors 304 + 305 + --- 306 + 307 + ### Task 6: Handle orphaned child/union tables 308 + 309 + **Files:** 310 + - Modify: `packages/hatk/src/database/db.ts` 311 + 312 + **Step 1: Add orphaned table detection and cleanup to `migrateSchema`** 313 + 314 + After diffing each collection's child/union tables, detect tables that exist in the DB with the collection prefix but are no longer referenced in the schema. 315 + 316 + ```typescript 317 + // Detect orphaned child/union tables 318 + for (const schema of tableSchemas) { 319 + if (schema.columns.length === 0) continue 320 + 321 + const expectedTables = new Set<string>() 322 + for (const child of schema.children) { 323 + expectedTables.add(child.tableName.replace(/"/g, '')) 324 + } 325 + for (const union of schema.unions) { 326 + for (const branch of union.branches) { 327 + expectedTables.add(branch.tableName.replace(/"/g, '')) 328 + } 329 + } 330 + 331 + // Query existing tables that match the collection prefix 332 + try { 333 + const rows = await all(dialect.listTablesQuery) 334 + for (const row of rows) { 335 + const name = row.table_name as string 336 + if (name.startsWith(schema.collection + '__') && !expectedTables.has(name)) { 337 + await run(`DROP TABLE IF EXISTS "${name}"`) 338 + emit('migration', `dropped orphaned table "${name}"`) 339 + changes.push({ table: name, action: 'drop', column: '*' }) 340 + } 341 + } 342 + } catch {} 343 + } 344 + ``` 345 + 346 + **Step 2: Verify the build compiles** 347 + 348 + Run: `cd packages/hatk && npx tsc --noEmit` 349 + Expected: No errors 350 + 351 + --- 352 + 353 + ### Task 7: Handle index creation for new ref columns 354 + 355 + **Files:** 356 + - Modify: `packages/hatk/src/database/db.ts` 357 + 358 + **Step 1: Add index creation for newly added ref columns** 359 + 360 + When a new column is added, if it's a ref column, create the corresponding index. Add this logic inside the `case 'add':` block in `applyMigrationChanges`. 361 + 362 + ```typescript 363 + case 'add': { 364 + await run(`ALTER TABLE ${quotedTable} ADD COLUMN ${change.column} ${change.type}`) 365 + emit('migration', `added column "${change.column}" ${change.type} to "${change.table}"`) 366 + // Create index for ref columns 367 + const schema = schemas.get(change.table) 368 + if (schema?.refColumns.includes(change.column)) { 369 + const prefix = change.table.replace(/\./g, '_') 370 + await run(`CREATE INDEX IF NOT EXISTS idx_${prefix}_${change.column} ON ${quotedTable}(${change.column})`) 371 + emit('migration', `created index for ref column "${change.column}" on "${change.table}"`) 372 + } 373 + break 374 + } 375 + ``` 376 + 377 + **Step 2: Verify the build compiles** 378 + 379 + Run: `cd packages/hatk && npx tsc --noEmit` 380 + Expected: No errors 381 + 382 + --- 383 + 384 + ### Task 8: Trigger backfill for new empty collection tables 385 + 386 + **Files:** 387 + - Modify: `packages/hatk/src/database/db.ts` 388 + 389 + **Step 1: Add empty-table detection to `migrateSchema`** 390 + 391 + After applying ALTER changes and orphaned table cleanup, check if any collection table has zero rows. If so, mark all repos as pending so backfill picks up the new collection. 392 + 393 + Add at the end of `migrateSchema`, before `return changes`: 394 + 395 + ```typescript 396 + // Check for empty collection tables — these are newly added and need backfill 397 + for (const schema of tableSchemas) { 398 + if (schema.columns.length === 0) continue 399 + try { 400 + const [row] = await all(`SELECT 1 FROM ${schema.tableName} LIMIT 1`) 401 + if (!row) { 402 + await run(`UPDATE _repos SET status = 'pending'`) 403 + emit('migration', `new collection "${schema.collection}" detected, marking repos for backfill`) 404 + break // only need to mark once 405 + } 406 + } catch {} 407 + } 408 + ``` 409 + 410 + **Step 2: Verify the build compiles** 411 + 412 + Run: `cd packages/hatk && npx tsc --noEmit` 413 + Expected: No errors 414 + 415 + --- 416 + 417 + ### Task 9: Wire migrateSchema into main.ts 418 + 419 + **Files:** 420 + - Modify: `packages/hatk/src/main.ts` 421 + 422 + **Step 1: Import migrateSchema** 423 + 424 + Add to the existing import from `./database/db.ts`: 425 + 426 + ```typescript 427 + import { initDatabase, getCursor, querySQL, getSqlDialect, getSchemaDump, migrateSchema } from './database/db.ts' 428 + ``` 429 + 430 + **Step 2: Call migrateSchema after initDatabase, before setup hooks** 431 + 432 + Insert between `initDatabase` (line 89) and the schema.sql write (line 94): 433 + 434 + ```typescript 435 + // Auto-migrate schema if lexicons changed 436 + const migrationChanges = await migrateSchema(schemas) 437 + if (migrationChanges.length > 0) { 438 + log(`[main] Applied ${migrationChanges.length} schema migration(s)`) 439 + } 440 + ``` 441 + 442 + This runs after `initDatabase` (which creates new tables) but before setup hooks and the backfill cycle. 443 + 444 + **Step 3: Verify the build compiles** 445 + 446 + Run: `cd packages/hatk && npx tsc --noEmit` 447 + Expected: No errors 448 + 449 + --- 450 + 451 + ### Task 10: Remove hardcoded handle migration 452 + 453 + **Files:** 454 + - Modify: `packages/hatk/src/database/db.ts` 455 + 456 + **Step 1: Remove the manual handle migration from `initDatabase`** 457 + 458 + The existing hardcoded migration for the `handle` column in `_repos` (lines 74-79) is now superseded by the general migration system. However, `_repos` is an internal table not covered by lexicon schemas. Keep it as-is for now — it doesn't conflict and handles a one-time migration for existing databases. 459 + 460 + No code change needed. This task is a deliberate no-op to document the decision. 461 + 462 + --- 463 + 464 + ### Task 11: Manual integration test — add a column 465 + 466 + **Step 1: In the teal template, add a field to a lexicon** 467 + 468 + Edit `~/code/hatk-template-teal/lexicons/fm/teal/alpha/feed/play.json` and add a `rating` integer field to the record properties. 469 + 470 + **Step 2: Restart the dev server** 471 + 472 + Run: `cd ~/code/hatk-template-teal && npm run dev` 473 + 474 + **Step 3: Check the logs** 475 + 476 + Expected output should include: 477 + ``` 478 + [migration] added column "rating" INTEGER to "fm.teal.alpha.feed.play" 479 + ``` 480 + 481 + **Step 4: Verify the schema.sql file** 482 + 483 + Run: `cat ~/code/hatk-template-teal/db/schema.sql` 484 + 485 + The `fm.teal.alpha.feed.play` table should now include the `rating` column. 486 + 487 + **Step 5: Revert the lexicon change** 488 + 489 + Remove the `rating` field from the lexicon file. Restart the dev server again. 490 + 491 + Expected output: 492 + ``` 493 + [migration] dropped column "rating" from "fm.teal.alpha.feed.play" 494 + ``` 495 + 496 + --- 497 + 498 + ### Task 12: Manual integration test — remove a child table 499 + 500 + **Step 1: In the teal template, remove the `artists` array field from the play lexicon** 501 + 502 + This should cause the `fm.teal.alpha.feed.play__artists` table to be dropped on next startup. 503 + 504 + **Step 2: Restart the dev server** 505 + 506 + Expected output: 507 + ``` 508 + [migration] dropped orphaned table "fm.teal.alpha.feed.play__artists" 509 + ``` 510 + 511 + **Step 3: Revert the lexicon change** 512 + 513 + Restore the `artists` array field. Restart — the table should be recreated automatically by `CREATE TABLE IF NOT EXISTS`. 514 + 515 + --- 516 + 517 + ### Task 13: Build and verify 518 + 519 + **Step 1: Run the build** 520 + 521 + Run: `cd packages/hatk && npm run build` 522 + Expected: Clean build, no errors 523 + 524 + **Step 2: Verify with TypeScript** 525 + 526 + Run: `cd packages/hatk && npx tsc --noEmit` 527 + Expected: No errors 528 + 529 + --- 530 + 531 + ### Task 14: Commit 532 + 533 + ```bash 534 + git add packages/hatk/src/database/db.ts packages/hatk/src/database/dialect.ts packages/hatk/src/main.ts 535 + git commit -m "feat: auto-schema migration on startup 536 + 537 + Diff lexicon-derived schema against actual DB columns on every startup. 538 + Emit ALTER TABLE ADD/DROP COLUMN for changes, drop orphaned child tables, 539 + and trigger backfill when new empty collection tables are detected." 540 + ```
+3 -3
packages/hatk/src/backfill.ts
··· 220 220 for (const col of collections) { 221 221 const schema = getSchema(col) 222 222 if (!schema) continue 223 - await runSQL(`DELETE FROM ${schema.tableName} WHERE did = $1`, did) 223 + await runSQL(`DELETE FROM ${schema.tableName} WHERE did = $1`, [did]) 224 224 for (const child of schema.children) { 225 - await runSQL(`DELETE FROM ${child.tableName} WHERE parent_did = $1`, did) 225 + await runSQL(`DELETE FROM ${child.tableName} WHERE parent_did = $1`, [did]) 226 226 } 227 227 for (const union of schema.unions) { 228 228 for (const branch of union.branches) { 229 - await runSQL(`DELETE FROM ${branch.tableName} WHERE parent_did = $1`, did) 229 + await runSQL(`DELETE FROM ${branch.tableName} WHERE parent_did = $1`, [did]) 230 230 } 231 231 } 232 232 }
+1 -1
packages/hatk/src/database/adapters/duckdb.ts
··· 73 73 }) 74 74 } 75 75 76 - async createBulkInserter(table: string, columns: string[]): Promise<BulkInserter> { 76 + async createBulkInserter(table: string, columns: string[], _options?: { onConflict?: 'ignore' | 'replace'; batchSize?: number }): Promise<BulkInserter> { 77 77 const appender = await this.writeCon.createAppender(table.replace(/"/g, '')) 78 78 return { 79 79 append(values: unknown[]) {
+20 -35
packages/hatk/src/database/adapters/sqlite.ts
··· 63 63 this.db.exec('ROLLBACK') 64 64 } 65 65 66 - async createBulkInserter(table: string, columns: string[]): Promise<BulkInserter> { 66 + async createBulkInserter(table: string, columns: string[], options?: { onConflict?: 'ignore' | 'replace'; batchSize?: number }): Promise<BulkInserter> { 67 67 const placeholders = columns.map(() => '?').join(', ') 68 - const sql = `INSERT INTO ${table} (${columns.join(', ')}) VALUES (${placeholders})` 68 + const conflict = options?.onConflict === 'ignore' ? ' OR IGNORE' : options?.onConflict === 'replace' ? ' OR REPLACE' : '' 69 + const sql = `INSERT${conflict} INTO ${table} (${columns.join(', ')}) VALUES (${placeholders})` 69 70 const stmt = this.db.prepare(sql) 70 71 const buffer: unknown[][] = [] 71 - const BATCH_SIZE = 500 72 + const batchSize = options?.batchSize ?? 5000 72 73 73 - const self = this 74 + const flushBuffer = this.db.transaction(() => { 75 + for (const row of buffer) { 76 + stmt.run(...row) 77 + } 78 + }) 79 + 80 + const flush = () => { 81 + if (buffer.length > 0) { 82 + flushBuffer() 83 + buffer.length = 0 84 + } 85 + } 86 + 74 87 return { 75 88 append(values: unknown[]) { 76 89 buffer.push(values) 77 - if (buffer.length >= BATCH_SIZE) { 78 - const tx = self.db.transaction(() => { 79 - for (const row of buffer) { 80 - stmt.run(...row) 81 - } 82 - }) 83 - tx() 84 - buffer.length = 0 85 - } 90 + if (buffer.length >= batchSize) flush() 86 91 }, 87 - async flush() { 88 - if (buffer.length > 0) { 89 - const tx = self.db.transaction(() => { 90 - for (const row of buffer) { 91 - stmt.run(...row) 92 - } 93 - }) 94 - tx() 95 - buffer.length = 0 96 - } 97 - }, 98 - async close() { 99 - if (buffer.length > 0) { 100 - const tx = self.db.transaction(() => { 101 - for (const row of buffer) { 102 - stmt.run(...row) 103 - } 104 - }) 105 - tx() 106 - buffer.length = 0 107 - } 108 - }, 92 + async flush() { flush() }, 93 + async close() { flush() }, 109 94 } 110 95 } 111 96 }
+256 -82
packages/hatk/src/database/db.ts
··· 17 17 port?.close() 18 18 } 19 19 20 - async function run(sql: string, ...params: any[]): Promise<void> { 20 + async function run(sql: string, params: any[] = []): Promise<void> { 21 21 return port.execute(sql, params) 22 22 } 23 23 ··· 37 37 } 38 38 } 39 39 40 - async function all(sql: string, ...params: any[]): Promise<any[]> { 40 + async function all(sql: string, params: any[] = []): Promise<any[]> { 41 41 return port.query(sql, params) 42 42 } 43 43 ··· 122 122 await port.executeMultiple(OAUTH_DDL) 123 123 } 124 124 125 + interface MigrationChange { 126 + table: string 127 + action: 'add' | 'drop' | 'retype' 128 + column: string 129 + type?: string 130 + } 131 + 132 + /** Normalize SQL type names to handle dialect differences (e.g. VARCHAR → TEXT) */ 133 + function normalizeType(type: string): string { 134 + const upper = type.toUpperCase() 135 + if (upper === 'VARCHAR' || upper === 'CHARACTER VARYING') return 'TEXT' 136 + if (upper === 'TIMESTAMP WITH TIME ZONE') return 'TIMESTAMPTZ' 137 + if (upper === 'BOOLEAN' || upper === 'BOOL') return 'BOOLEAN' 138 + if (upper === 'INT' || upper === 'INT4' || upper === 'INT8' || upper === 'BIGINT' || upper === 'SMALLINT') return 'INTEGER' 139 + return upper 140 + } 141 + 142 + async function getExistingColumns(tableName: string): Promise<Map<string, string>> { 143 + if (!/^[a-zA-Z0-9._]+$/.test(tableName)) { 144 + throw new Error(`Invalid table name for introspection: ${tableName}`) 145 + } 146 + const cols = new Map<string, string>() 147 + try { 148 + const query = dialect.introspectColumnsQuery(tableName) 149 + const rows = await all(query) 150 + for (const row of rows) { 151 + // SQLite PRAGMA returns { name, type }, DuckDB returns { column_name, data_type } 152 + const name = (row.column_name || row.name) as string 153 + const type = normalizeType((row.data_type || row.type || 'TEXT') as string) 154 + cols.set(name, type) 155 + } 156 + } catch { 157 + // Table doesn't exist yet 158 + } 159 + return cols 160 + } 161 + 162 + function diffColumns( 163 + tableName: string, 164 + existingCols: Map<string, string>, 165 + expectedCols: Map<string, string>, 166 + changes: MigrationChange[], 167 + ): void { 168 + for (const [colName, colType] of expectedCols) { 169 + if (!existingCols.has(colName)) { 170 + changes.push({ table: tableName, action: 'add', column: colName, type: colType }) 171 + } 172 + } 173 + for (const [colName] of existingCols) { 174 + if (!expectedCols.has(colName)) { 175 + changes.push({ table: tableName, action: 'drop', column: colName }) 176 + } 177 + } 178 + for (const [colName, colType] of expectedCols) { 179 + const existingType = existingCols.get(colName) 180 + if (existingType && normalizeType(existingType) !== normalizeType(colType)) { 181 + changes.push({ table: tableName, action: 'retype', column: colName, type: colType }) 182 + } 183 + } 184 + } 185 + 186 + /** Build expected columns map for a child/union table */ 187 + function buildChildExpectedCols(columns: { name: string; sqlType: string }[]): Map<string, string> { 188 + const expected = new Map<string, string>() 189 + expected.set('parent_uri', 'TEXT') 190 + expected.set('parent_did', 'TEXT') 191 + for (const col of columns) { 192 + expected.set(col.name, normalizeType(col.sqlType)) 193 + } 194 + return expected 195 + } 196 + 197 + export async function migrateSchema(tableSchemas: TableSchema[]): Promise<MigrationChange[]> { 198 + const changes: MigrationChange[] = [] 199 + 200 + for (const schema of tableSchemas) { 201 + if (schema.columns.length === 0) continue // generic JSON storage, skip 202 + 203 + const tableName = schema.collection 204 + const existingCols = await getExistingColumns(tableName) 205 + if (existingCols.size === 0) continue // table just created, nothing to migrate 206 + 207 + // Expected columns: base columns (uri, cid, did, indexed_at) + schema columns 208 + const expectedCols = new Map<string, string>() 209 + expectedCols.set('uri', 'TEXT') 210 + expectedCols.set('cid', 'TEXT') 211 + expectedCols.set('did', 'TEXT') 212 + expectedCols.set('indexed_at', normalizeType(dialect.timestampType)) 213 + for (const col of schema.columns) { 214 + expectedCols.set(col.name, normalizeType(col.sqlType)) 215 + } 216 + 217 + diffColumns(tableName, existingCols, expectedCols, changes) 218 + 219 + // Diff child tables 220 + for (const child of schema.children) { 221 + const childTable = child.tableName.replace(/"/g, '') 222 + const existingChildCols = await getExistingColumns(childTable) 223 + if (existingChildCols.size === 0) continue 224 + diffColumns(childTable, existingChildCols, buildChildExpectedCols(child.columns), changes) 225 + } 226 + 227 + // Diff union branch tables 228 + for (const union of schema.unions) { 229 + for (const branch of union.branches) { 230 + const branchTable = branch.tableName.replace(/"/g, '') 231 + const existingBranchCols = await getExistingColumns(branchTable) 232 + if (existingBranchCols.size === 0) continue 233 + diffColumns(branchTable, existingBranchCols, buildChildExpectedCols(branch.columns), changes) 234 + } 235 + } 236 + } 237 + 238 + // Detect and drop orphaned child/union tables (query table list once) 239 + let allTableNames: string[] | null = null 240 + try { 241 + const rows = await all(dialect.listTablesQuery) 242 + allTableNames = rows.map((r: any) => r.table_name as string) 243 + } catch {} 244 + 245 + if (allTableNames) { 246 + for (const schema of tableSchemas) { 247 + if (schema.columns.length === 0) continue 248 + 249 + const expectedTables = new Set<string>() 250 + for (const child of schema.children) { 251 + expectedTables.add(child.tableName.replace(/"/g, '')) 252 + } 253 + for (const union of schema.unions) { 254 + for (const branch of union.branches) { 255 + expectedTables.add(branch.tableName.replace(/"/g, '')) 256 + } 257 + } 258 + 259 + for (const name of allTableNames) { 260 + if (name.startsWith(schema.collection + '__') && !expectedTables.has(name)) { 261 + await run(`DROP TABLE IF EXISTS "${name}"`) 262 + emit('migration', 'drop_table', { table: name }) 263 + } 264 + } 265 + } 266 + } 267 + 268 + if (changes.length > 0) { 269 + await applyMigrationChanges(changes) 270 + } 271 + 272 + // Check for empty collection tables — these are newly added and need backfill 273 + // Skip on fresh DB (no repos yet) since backfill runs naturally 274 + const [hasRepos] = await all(`SELECT 1 FROM _repos LIMIT 1`) 275 + if (hasRepos) { 276 + for (const schema of tableSchemas) { 277 + if (schema.columns.length === 0) continue 278 + try { 279 + const [row] = await all(`SELECT 1 FROM ${schema.tableName} LIMIT 1`) 280 + if (!row) { 281 + await run(`UPDATE _repos SET status = 'pending' WHERE status = 'active'`) 282 + emit('migration', 'new_collection', { collection: schema.collection }) 283 + break // only need to mark once 284 + } 285 + } catch {} 286 + } 287 + } 288 + 289 + return changes 290 + } 291 + 292 + async function applyMigrationChanges(changes: MigrationChange[]): Promise<void> { 293 + for (const change of changes) { 294 + const quotedTable = `"${change.table}"` 295 + const quotedColumn = `"${change.column}"` 296 + try { 297 + switch (change.action) { 298 + case 'add': { 299 + await run(`ALTER TABLE ${quotedTable} ADD COLUMN ${quotedColumn} ${change.type}`) 300 + emit('migration', 'add_column', { table: change.table, column: change.column, type: change.type }) 301 + const schema = schemas.get(change.table) 302 + if (schema?.refColumns.includes(change.column)) { 303 + const prefix = change.table.replace(/\./g, '_') 304 + await run(`CREATE INDEX IF NOT EXISTS idx_${prefix}_${change.column} ON ${quotedTable}(${quotedColumn})`) 305 + } 306 + break 307 + } 308 + case 'drop': 309 + await run(`ALTER TABLE ${quotedTable} DROP COLUMN ${quotedColumn}`) 310 + emit('migration', 'drop_column', { table: change.table, column: change.column }) 311 + break 312 + case 'retype': 313 + await run(`ALTER TABLE ${quotedTable} DROP COLUMN ${quotedColumn}`) 314 + await run(`ALTER TABLE ${quotedTable} ADD COLUMN ${quotedColumn} ${change.type}`) 315 + emit('migration', 'retype_column', { table: change.table, column: change.column, type: change.type }) 316 + break 317 + } 318 + } catch (err: any) { 319 + console.warn(`[migration] failed to ${change.action} column "${change.column}" on "${change.table}": ${err.message}`) 320 + emit('migration', 'error', { action: change.action, table: change.table, column: change.column, error: err.message }) 321 + } 322 + } 323 + } 324 + 125 325 export async function getCursor(key: string): Promise<string | null> { 126 - const rows = await all(`SELECT value FROM _cursor WHERE key = $1`, key) 326 + const rows = await all(`SELECT value FROM _cursor WHERE key = $1`, [key]) 127 327 return rows[0]?.value || null 128 328 } 129 329 130 330 export async function setCursor(key: string, value: string): Promise<void> { 131 - await run(`INSERT OR REPLACE INTO _cursor (key, value) VALUES ($1, $2)`, key, value) 331 + await run(`INSERT OR REPLACE INTO _cursor (key, value) VALUES ($1, $2)`, [key, value]) 132 332 } 133 333 134 334 export async function getRepoStatus(did: string): Promise<string | null> { 135 - const rows = await all(`SELECT status FROM _repos WHERE did = $1`, did) 335 + const rows = await all(`SELECT status FROM _repos WHERE did = $1`, [did]) 136 336 return rows[0]?.status || null 137 337 } 138 338 ··· 146 346 // Update existing row preserving handle if not provided 147 347 await run( 148 348 `UPDATE _repos SET status = $1, handle = COALESCE($2, handle), backfilled_at = $3, rev = COALESCE($4, rev), retry_count = 0, retry_after = 0 WHERE did = $5`, 149 - status, 150 - opts?.handle || null, 151 - new Date().toISOString(), 152 - rev || null, 153 - did, 349 + [status, opts?.handle || null, new Date().toISOString(), rev || null, did], 154 350 ) 155 351 // Insert if row didn't exist yet 156 352 await run( 157 353 `INSERT OR IGNORE INTO _repos (did, status, handle, backfilled_at, rev, retry_count, retry_after) VALUES ($1, $2, $3, $4, $5, 0, 0)`, 158 - did, 159 - status, 160 - opts?.handle || null, 161 - new Date().toISOString(), 162 - rev || null, 354 + [did, status, opts?.handle || null, new Date().toISOString(), rev || null], 163 355 ) 164 356 } else if (status === 'failed' && opts) { 165 357 await run( 166 358 `UPDATE _repos SET status = $1, retry_count = $2, retry_after = $3, handle = COALESCE($4, handle) WHERE did = $5`, 167 - status, 168 - opts.retryCount ?? 0, 169 - opts.retryAfter ?? 0, 170 - opts.handle || null, 171 - did, 359 + [status, opts.retryCount ?? 0, opts.retryAfter ?? 0, opts.handle || null, did], 172 360 ) 173 361 // If row didn't exist yet, insert it 174 362 await run( 175 363 `INSERT OR IGNORE INTO _repos (did, status, handle, retry_count, retry_after) VALUES ($1, $2, $3, $4, $5)`, 176 - did, 177 - status, 178 - opts.handle || null, 179 - opts.retryCount ?? 0, 180 - opts.retryAfter ?? 0, 364 + [did, status, opts.handle || null, opts.retryCount ?? 0, opts.retryAfter ?? 0], 181 365 ) 182 366 } else { 183 - await run(`UPDATE _repos SET status = $1 WHERE did = $2`, status, did) 184 - await run(`INSERT OR IGNORE INTO _repos (did, status) VALUES ($1, $2)`, did, status) 367 + await run(`UPDATE _repos SET status = $1 WHERE did = $2`, [status, did]) 368 + await run(`INSERT OR IGNORE INTO _repos (did, status) VALUES ($1, $2)`, [did, status]) 185 369 } 186 370 } 187 371 188 372 export async function getRepoRev(did: string): Promise<string | null> { 189 - const rows = await all(`SELECT rev FROM _repos WHERE did = $1`, did) 373 + const rows = await all(`SELECT rev FROM _repos WHERE did = $1`, [did]) 190 374 return rows[0]?.rev ?? null 191 375 } 192 376 193 377 export async function getRepoRetryInfo(did: string): Promise<{ retryCount: number; retryAfter: number } | null> { 194 - const rows = await all(`SELECT retry_count, retry_after FROM _repos WHERE did = $1`, did) 378 + const rows = await all(`SELECT retry_count, retry_after FROM _repos WHERE did = $1`, [did]) 195 379 if (rows.length === 0) return null 196 380 return { retryCount: Number(rows[0].retry_count), retryAfter: Number(rows[0].retry_after) } 197 381 } ··· 200 384 const now = Math.floor(Date.now() / 1000) 201 385 const rows = await all( 202 386 `SELECT did FROM _repos WHERE status = 'failed' AND retry_after <= $1 AND retry_count < $2`, 203 - now, 204 - maxRetries, 387 + [now, maxRetries], 205 388 ) 206 389 return rows.map((r: any) => r.did) 207 390 } ··· 240 423 241 424 const where = conditions.length ? ' WHERE ' + conditions.join(' AND ') : '' 242 425 243 - const countRows = await all(`SELECT ${dialect.countAsInteger} as total FROM _repos${where}`, ...params) 426 + const countRows = await all(`SELECT ${dialect.countAsInteger} as total FROM _repos${where}`, params) 244 427 const total = Number(countRows[0]?.total || 0) 245 428 246 429 const rows = await all( 247 430 `SELECT did, handle, status, backfilled_at, rev FROM _repos${where} ORDER BY CASE WHEN backfilled_at IS NULL THEN 1 ELSE 0 END, backfilled_at DESC, did LIMIT $${paramIdx++} OFFSET $${paramIdx++}`, 248 - ...params, 249 - limit, 250 - offset, 431 + [...params, limit, offset], 251 432 ) 252 433 253 434 return { repos: rows, total } ··· 340 521 const schema = schemas.get(collection) 341 522 if (!schema) throw new Error(`Unknown collection: ${collection}`) 342 523 const { sql, params } = buildInsertOp(collection, uri, cid, authorDid, record) 343 - await run(sql, ...params) 524 + await run(sql, params) 344 525 345 526 // Insert child table rows 346 527 for (const child of schema.children) { ··· 348 529 if (!Array.isArray(items)) continue 349 530 350 531 // Delete existing child rows (handles INSERT OR REPLACE on main table) 351 - await run(`DELETE FROM ${child.tableName} WHERE parent_uri = $1`, uri) 532 + await run(`DELETE FROM ${child.tableName} WHERE parent_uri = $1`, [uri]) 352 533 353 534 for (const item of items) { 354 535 const colNames = ['parent_uri', 'parent_did'] ··· 371 552 372 553 await run( 373 554 `INSERT INTO ${child.tableName} (${colNames.join(', ')}) VALUES (${placeholders.join(', ')})`, 374 - ...values, 555 + values, 375 556 ) 376 557 } 377 558 } ··· 386 567 387 568 // Delete existing branch rows (handles INSERT OR REPLACE) 388 569 for (const b of union.branches) { 389 - await run(`DELETE FROM ${b.tableName} WHERE parent_uri = $1`, uri) 570 + await run(`DELETE FROM ${b.tableName} WHERE parent_uri = $1`, [uri]) 390 571 } 391 572 392 573 if (branch.isArray && branch.arrayField) { ··· 412 593 } 413 594 await run( 414 595 `INSERT INTO ${branch.tableName} (${colNames.join(', ')}) VALUES (${placeholders.join(', ')})`, 415 - ...values, 596 + values, 416 597 ) 417 598 } 418 599 } else { ··· 436 617 } 437 618 await run( 438 619 `INSERT INTO ${branch.tableName} (${colNames.join(', ')}) VALUES (${placeholders.join(', ')})`, 439 - ...values, 620 + values, 440 621 ) 441 622 } 442 623 } ··· 455 636 const schema = schemas.get(collection) 456 637 if (!schema) return 457 638 for (const child of schema.children) { 458 - await run(`DELETE FROM ${child.tableName} WHERE parent_uri = $1`, uri) 639 + await run(`DELETE FROM ${child.tableName} WHERE parent_uri = $1`, [uri]) 459 640 } 460 641 for (const union of schema.unions) { 461 642 for (const branch of union.branches) { 462 - await run(`DELETE FROM ${branch.tableName} WHERE parent_uri = $1`, uri) 643 + await run(`DELETE FROM ${branch.tableName} WHERE parent_uri = $1`, [uri]) 463 644 } 464 645 } 465 - await run(`DELETE FROM ${schema.tableName} WHERE uri = $1`, uri) 646 + await run(`DELETE FROM ${schema.tableName} WHERE uri = $1`, [uri]) 466 647 } 467 648 468 649 export async function insertLabels( ··· 473 654 // Skip if an active (non-negated, non-expired, not-superseded-by-negation) label already exists 474 655 const existing = await all( 475 656 `SELECT 1 FROM _labels l1 WHERE l1.src = $1 AND l1.uri = $2 AND l1.val = $3 AND l1.neg = false AND (l1.exp IS NULL OR l1.exp > CURRENT_TIMESTAMP) AND NOT EXISTS (SELECT 1 FROM _labels l2 WHERE l2.uri = l1.uri AND l2.val = l1.val AND l2.neg = true AND l2.id > l1.id) LIMIT 1`, 476 - label.src, 477 - label.uri, 478 - label.val, 657 + [label.src, label.uri, label.val], 479 658 ) 480 659 if (!label.neg && existing.length > 0) continue 481 660 482 661 await run( 483 662 `INSERT INTO _labels (src, uri, val, neg, cts, exp) VALUES ($1, $2, $3, $4, $5, $6)`, 484 - label.src, 485 - label.uri, 486 - label.val, 487 - label.neg || false, 488 - label.cts || new Date().toISOString(), 489 - label.exp || null, 663 + [label.src, label.uri, label.val, label.neg || false, label.cts || new Date().toISOString(), label.exp || null], 490 664 ) 491 665 } 492 666 } ··· 500 674 const placeholders = uris.map((_, i) => `$${i + 1}`).join(',') 501 675 const rows = await all( 502 676 `SELECT src, uri, val, neg, cts, exp FROM _labels l1 WHERE uri IN (${placeholders}) AND (exp IS NULL OR exp > CURRENT_TIMESTAMP) AND neg = false AND NOT EXISTS (SELECT 1 FROM _labels l2 WHERE l2.uri = l1.uri AND l2.val = l1.val AND l2.neg = true AND l2.id > l1.id)`, 503 - ...uris, 677 + uris, 504 678 ) 505 679 const result = new Map<string, Array<any>>() 506 680 for (const row of rows) { ··· 832 1006 sql += ` ORDER BY t.${sortName} ${order.toUpperCase()}, t.cid ${order.toUpperCase()} LIMIT $${paramIdx++}` 833 1007 params.push(limit + 1) // fetch one extra for cursor 834 1008 835 - const rows = await all(sql, ...params) 1009 + const rows = await all(sql, params) 836 1010 const hasMore = rows.length > limit 837 1011 if (hasMore) rows.pop() 838 1012 ··· 876 1050 for (const [_collection, schema] of schemas) { 877 1051 const rows = await all( 878 1052 `SELECT t.*, r.handle FROM ${schema.tableName} t LEFT JOIN _repos r ON t.did = r.did WHERE t.uri = $1 AND (r.status IS NULL OR r.status != 'takendown')`, 879 - uri, 1053 + [uri], 880 1054 ) 881 1055 if (rows.length > 0) { 882 1056 const row = rows[0] ··· 913 1087 const placeholders = uris.map((_, i) => `$${i + 1}`).join(',') 914 1088 const rows = await all( 915 1089 `SELECT t.*, r.handle FROM ${schema.tableName} t LEFT JOIN _repos r ON t.did = r.did WHERE t.uri IN (${placeholders}) AND (r.status IS NULL OR r.status != 'takendown')`, 916 - ...uris, 1090 + uris, 917 1091 ) 918 1092 919 1093 // Batch-fetch child rows for all URIs ··· 1005 1179 LEFT JOIN _repos r ON m.did = r.did 1006 1180 WHERE m.uri IN (${placeholders}) 1007 1181 AND (r.status IS NULL OR r.status != 'takendown')`, 1008 - ...uriList, 1182 + uriList, 1009 1183 ) 1010 1184 1011 1185 // Re-attach scores and sort ··· 1056 1230 LIMIT $${paramIdx++}` 1057 1231 params.push(limit) 1058 1232 1059 - const rows = await all(exactSQL, ...params) 1233 + const rows = await all(exactSQL, params) 1060 1234 phasesUsed.push('exact') 1061 1235 for (const row of rows) { 1062 1236 if (!bm25Uris.has(row.uri)) { ··· 1097 1271 params.push(rebuiltAt, remaining + existingUris.size) 1098 1272 1099 1273 try { 1100 - const recentRows = await all(recentSQL, ...params) 1274 + const recentRows = await all(recentSQL, params) 1101 1275 phasesUsed.push('recent') 1102 1276 for (const row of recentRows) { 1103 1277 if (bm25Results.length >= limit) break ··· 1140 1314 LIMIT $2` 1141 1315 1142 1316 try { 1143 - const fuzzyRows = await all(fuzzySQL, query, remaining + existingUris.size) 1317 + const fuzzyRows = await all(fuzzySQL, [query, remaining + existingUris.size]) 1144 1318 phasesUsed.push('fuzzy') 1145 1319 for (const row of fuzzyRows) { 1146 1320 if (bm25Results.length >= limit) break ··· 1178 1352 1179 1353 // Raw SQL for script feeds 1180 1354 export async function querySQL(sql: string, params: any[] = []): Promise<any[]> { 1181 - return all(sql, ...params) 1355 + return all(sql, params) 1356 + } 1357 + 1358 + export async function runSQL(sql: string, params: any[] = []): Promise<void> { 1359 + return run(sql, params) 1182 1360 } 1183 1361 1184 - export async function runSQL(sql: string, ...params: any[]): Promise<void> { 1185 - return run(sql, ...params) 1362 + export async function createBulkInserterSQL(table: string, columns: string[], options?: { onConflict?: 'ignore' | 'replace'; batchSize?: number }): Promise<import('./ports.ts').BulkInserter> { 1363 + return port.createBulkInserter(table, columns, options) 1186 1364 } 1187 1365 1188 1366 export function getSchema(collection: string): TableSchema | undefined { ··· 1192 1370 export async function countByField(collection: string, field: string, value: string): Promise<number> { 1193 1371 const schema = schemas.get(collection) 1194 1372 if (!schema) return 0 1195 - const rows = await all(`SELECT COUNT(*) as count FROM ${schema.tableName} WHERE ${field} = $1`, value) 1373 + const rows = await all(`SELECT COUNT(*) as count FROM ${schema.tableName} WHERE ${field} = $1`, [value]) 1196 1374 return Number(rows[0]?.count || 0) 1197 1375 } 1198 1376 ··· 1207 1385 const placeholders = values.map((_, i) => `$${i + 1}`).join(',') 1208 1386 const rows = await all( 1209 1387 `SELECT ${field}, COUNT(*) as count FROM ${schema.tableName} WHERE ${field} IN (${placeholders}) GROUP BY ${field}`, 1210 - ...values, 1388 + values, 1211 1389 ) 1212 1390 const result = new Map<string, number>() 1213 1391 for (const row of rows) { ··· 1219 1397 export async function findByField(collection: string, field: string, value: string): Promise<any | null> { 1220 1398 const schema = schemas.get(collection) 1221 1399 if (!schema) return null 1222 - const rows = await all(`SELECT * FROM ${schema.tableName} WHERE ${field} = $1 LIMIT 1`, value) 1400 + const rows = await all(`SELECT * FROM ${schema.tableName} WHERE ${field} = $1 LIMIT 1`, [value]) 1223 1401 return rows[0] || null 1224 1402 } 1225 1403 ··· 1234 1412 const placeholders = values.map((_, i) => `$${i + 1}`).join(',') 1235 1413 const rows = await all( 1236 1414 `SELECT t.*, r.handle FROM ${schema.tableName} t LEFT JOIN _repos r ON t.did = r.did WHERE t.${field} IN (${placeholders})`, 1237 - ...values, 1415 + values, 1238 1416 ) 1239 1417 // Attach child data if this collection has decomposed arrays 1240 1418 if (schema.children.length > 0 && rows.length > 0) { ··· 1281 1459 if (!schema) return null 1282 1460 const where = conditions.map((c, i) => `${c.field} = $${i + 1}`).join(' AND ') 1283 1461 const params = conditions.map((c) => c.value) 1284 - const rows = await all(`SELECT uri FROM ${schema.tableName} WHERE ${where} LIMIT 1`, ...params) 1462 + const rows = await all(`SELECT uri FROM ${schema.tableName} WHERE ${where} LIMIT 1`, params) 1285 1463 return rows[0]?.uri || null 1286 1464 } 1287 1465 ··· 1297 1475 export async function getChildRows(childTableName: string, parentUris: string[]): Promise<Map<string, any[]>> { 1298 1476 if (parentUris.length === 0) return new Map() 1299 1477 const placeholders = parentUris.map((_, i) => `$${i + 1}`).join(',') 1300 - const rows = await all(`SELECT * FROM ${childTableName} WHERE parent_uri IN (${placeholders})`, ...parentUris) 1478 + const rows = await all(`SELECT * FROM ${childTableName} WHERE parent_uri IN (${placeholders})`, parentUris) 1301 1479 const result = new Map<string, any[]>() 1302 1480 for (const row of rows) { 1303 1481 const key = row.parent_uri as string ··· 1429 1607 export async function queryLabelsByDid(did: string): Promise<any[]> { 1430 1608 return all( 1431 1609 `SELECT * FROM _labels WHERE uri LIKE $1 AND neg = false AND (exp IS NULL OR exp > CURRENT_TIMESTAMP)`, 1432 - `at://${did}/%`, 1610 + [`at://${did}/%`], 1433 1611 ) 1434 1612 } 1435 1613 1436 1614 export async function searchAccounts(query: string, limit: number = 20): Promise<any[]> { 1437 1615 return all( 1438 1616 `SELECT did, handle, status FROM _repos WHERE did ${dialect.ilike} $1 OR handle ${dialect.ilike} $1 ORDER BY handle LIMIT $2`, 1439 - `%${query}%`, 1440 - limit, 1617 + [`%${query}%`, limit], 1441 1618 ) 1442 1619 } 1443 1620 1444 1621 export async function getAccountRecordCount(did: string): Promise<number> { 1445 1622 let total = 0 1446 1623 for (const [, schema] of schemas) { 1447 - const rows = await all(`SELECT COUNT(*) as count FROM ${schema.tableName} WHERE did = $1`, did) 1624 + const rows = await all(`SELECT COUNT(*) as count FROM ${schema.tableName} WHERE did = $1`, [did]) 1448 1625 total += Number(rows[0]?.count || 0) 1449 1626 } 1450 1627 return total ··· 1453 1630 export async function getAllRecordUrisForDid(did: string): Promise<string[]> { 1454 1631 const uris: string[] = [] 1455 1632 for (const [, schema] of schemas) { 1456 - const rows = await all(`SELECT uri FROM ${schema.tableName} WHERE did = $1`, did) 1633 + const rows = await all(`SELECT uri FROM ${schema.tableName} WHERE did = $1`, [did]) 1457 1634 uris.push(...rows.map((r: any) => r.uri)) 1458 1635 } 1459 1636 return uris 1460 1637 } 1461 1638 1462 1639 export async function isTakendownDid(did: string): Promise<boolean> { 1463 - const rows = await all(`SELECT 1 FROM _repos WHERE did = $1 AND status = 'takendown' LIMIT 1`, did) 1640 + const rows = await all(`SELECT 1 FROM _repos WHERE did = $1 AND status = 'takendown' LIMIT 1`, [did]) 1464 1641 return rows.length > 0 1465 1642 } 1466 1643 1467 1644 export async function getPreferences(did: string): Promise<Record<string, any>> { 1468 - const rows = await all(`SELECT key, value FROM _preferences WHERE did = $1`, did) 1645 + const rows = await all(`SELECT key, value FROM _preferences WHERE did = $1`, [did]) 1469 1646 const prefs: Record<string, any> = {} 1470 1647 for (const row of rows) { 1471 1648 try { ··· 1480 1657 export async function putPreference(did: string, key: string, value: any): Promise<void> { 1481 1658 await run( 1482 1659 `INSERT OR REPLACE INTO _preferences (did, key, value, updated_at) VALUES ($1, $2, $3, $4)`, 1483 - did, 1484 - key, 1485 - JSON.stringify(value), 1486 - new Date().toISOString(), 1660 + [did, key, JSON.stringify(value), new Date().toISOString()], 1487 1661 ) 1488 1662 } 1489 1663 1490 1664 export async function filterTakendownDids(dids: string[]): Promise<Set<string>> { 1491 1665 if (dids.length === 0) return new Set() 1492 1666 const placeholders = dids.map((_, i) => `$${i + 1}`).join(',') 1493 - const rows = await all(`SELECT did FROM _repos WHERE did IN (${placeholders}) AND status = 'takendown'`, ...dids) 1667 + const rows = await all(`SELECT did FROM _repos WHERE did IN (${placeholders}) AND status = 'takendown'`, dids) 1494 1668 return new Set(rows.map((r: any) => r.did)) 1495 1669 }
+7
packages/hatk/src/database/dialect.ts
··· 54 54 55 55 /** CREATE SEQUENCE support */ 56 56 supportsSequences: boolean 57 + 58 + /** SQL to get columns for a table. Returns rows with column_name/name and data_type/type. */ 59 + introspectColumnsQuery(tableName: string): string 57 60 } 58 61 59 62 export const DUCKDB_DIALECT: SqlDialect = { ··· 84 87 jaroWinklerSimilarity: 'jaro_winkler_similarity', 85 88 stringAgg: (col, sep) => `string_agg(${col}, ${sep})`, 86 89 supportsSequences: true, 90 + introspectColumnsQuery: (tableName) => 91 + `SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '${tableName}'`, 87 92 } 88 93 89 94 export const SQLITE_DIALECT: SqlDialect = { ··· 116 121 jaroWinklerSimilarity: null, 117 122 stringAgg: (col, sep) => `group_concat(${col}, ${sep})`, 118 123 supportsSequences: false, 124 + introspectColumnsQuery: (tableName) => 125 + `PRAGMA table_info("${tableName}")`, 119 126 } 120 127 121 128 export function getDialect(dialect: Dialect): SqlDialect {
+1 -1
packages/hatk/src/database/ports.ts
··· 29 29 rollback(): Promise<void> 30 30 31 31 /** Create a bulk inserter for high-throughput writes */ 32 - createBulkInserter(table: string, columns: string[]): Promise<BulkInserter> 32 + createBulkInserter(table: string, columns: string[], options?: { onConflict?: 'ignore' | 'replace'; batchSize?: number }): Promise<BulkInserter> 33 33 } 34 34 35 35 export interface BulkInserter {
+12 -6
packages/hatk/src/main.ts
··· 10 10 buildSchemas, 11 11 } from './database/schema.ts' 12 12 import { discoverViews } from './views.ts' 13 - import { initDatabase, getCursor, querySQL, getSqlDialect, getSchemaDump } from './database/db.ts' 13 + import { initDatabase, getCursor, querySQL, getSqlDialect, getSchemaDump, migrateSchema } from './database/db.ts' 14 14 import { createAdapter } from './database/adapter-factory.ts' 15 15 import { getDialect } from './database/dialect.ts' 16 16 import { setSearchPort } from './database/fts.ts' ··· 90 90 logMemory('after-db-init') 91 91 log(`[main] Database initialized (${config.databaseEngine}, ${config.database === ':memory:' ? 'in-memory' : config.database})`) 92 92 93 - // Write db/schema.sql 93 + // Auto-migrate schema if lexicons changed 94 + const migrationChanges = await migrateSchema(schemas) 95 + if (migrationChanges.length > 0) { 96 + log(`[main] Applied ${migrationChanges.length} schema migration(s)`) 97 + } 98 + 99 + // 3b. Run setup hooks (after DB init, before server) 100 + await initSetup(resolve(configDir, 'setup')) 101 + 102 + // Write db/schema.sql (after setup, so setup-created tables are included) 94 103 try { 95 104 const schemaDir = resolve(configDir, 'db') 96 105 mkdirSync(schemaDir, { recursive: true }) ··· 101 110 ) 102 111 log(`[main] Schema written to db/schema.sql`) 103 112 } catch {} 104 - 105 - // 3b. Run setup hooks (after DB init, before server) 106 - await initSetup(resolve(configDir, 'setup')) 107 113 108 114 // Detect orphaned tables 109 115 try { ··· 163 169 }) 164 170 .then((recordCount) => { 165 171 log('[main] FTS indexes ready') 166 - if (recordCount > 0) { 172 + if (recordCount > 0 && !process.env.DEV_MODE) { 167 173 logMemory('after-backfill') 168 174 log('[main] Restarting to reclaim memory...') 169 175 process.exit(1)
+30 -43
packages/hatk/src/oauth/db.ts
··· 75 75 export async function storeServerKey(kid: string, privateKey: string, publicKey: string): Promise<void> { 76 76 await runSQL( 77 77 'INSERT OR REPLACE INTO _oauth_keys (kid, private_key, public_key) VALUES ($1, $2, $3)', 78 - kid, 79 - privateKey, 80 - publicKey, 78 + [kid, privateKey, publicKey], 81 79 ) 82 80 } 83 81 ··· 105 103 await runSQL( 106 104 `INSERT INTO _oauth_requests (request_uri, client_id, redirect_uri, scope, state, code_challenge, code_challenge_method, dpop_jkt, pds_request_uri, pds_auth_server, pds_code_verifier, pds_state, did, login_hint, expires_at) 107 105 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15)`, 108 - requestUri, 109 - data.clientId, 110 - data.redirectUri, 111 - data.scope || null, 112 - data.state || null, 113 - data.codeChallenge, 114 - data.codeChallengeMethod || 'S256', 115 - data.dpopJkt, 116 - data.pdsRequestUri || null, 117 - data.pdsAuthServer || null, 118 - data.pdsCodeVerifier || null, 119 - data.pdsState || null, 120 - data.did || null, 121 - data.loginHint || null, 122 - data.expiresAt, 106 + [ 107 + requestUri, 108 + data.clientId, 109 + data.redirectUri, 110 + data.scope || null, 111 + data.state || null, 112 + data.codeChallenge, 113 + data.codeChallengeMethod || 'S256', 114 + data.dpopJkt, 115 + data.pdsRequestUri || null, 116 + data.pdsAuthServer || null, 117 + data.pdsCodeVerifier || null, 118 + data.pdsState || null, 119 + data.did || null, 120 + data.loginHint || null, 121 + data.expiresAt, 122 + ], 123 123 ) 124 124 } 125 125 ··· 132 132 } 133 133 134 134 export async function deleteOAuthRequest(requestUri: string): Promise<void> { 135 - await runSQL('DELETE FROM _oauth_requests WHERE request_uri = $1', requestUri) 135 + await runSQL('DELETE FROM _oauth_requests WHERE request_uri = $1', [requestUri]) 136 136 } 137 137 138 138 // --- Authorization Codes --- ··· 140 140 export async function storeAuthCode(code: string, requestUri: string): Promise<void> { 141 141 await runSQL( 142 142 'INSERT INTO _oauth_codes (code, request_uri, created_at) VALUES ($1, $2, $3)', 143 - code, 144 - requestUri, 145 - Math.floor(Date.now() / 1000), 143 + [code, requestUri, Math.floor(Date.now() / 1000)], 146 144 ) 147 145 } 148 146 149 147 export async function consumeAuthCode(code: string): Promise<string | null> { 150 148 const rows = await querySQL('SELECT request_uri FROM _oauth_codes WHERE code = $1', [code]) 151 149 if (rows.length === 0) return null 152 - await runSQL('DELETE FROM _oauth_codes WHERE code = $1', code) 150 + await runSQL('DELETE FROM _oauth_codes WHERE code = $1', [code]) 153 151 return rows[0].request_uri as string 154 152 } 155 153 ··· 168 166 await runSQL( 169 167 `INSERT OR REPLACE INTO _oauth_sessions (did, pds_endpoint, access_token, refresh_token, dpop_jkt, token_expires_at, updated_at) 170 168 VALUES ($1,$2,$3,$4,$5,$6,CURRENT_TIMESTAMP)`, 171 - did, 172 - data.pdsEndpoint, 173 - data.accessToken, 174 - data.refreshToken || null, 175 - data.dpopJkt, 176 - data.tokenExpiresAt || null, 169 + [did, data.pdsEndpoint, data.accessToken, data.refreshToken || null, data.dpopJkt, data.tokenExpiresAt || null], 177 170 ) 178 171 } 179 172 ··· 183 176 } 184 177 185 178 export async function deleteSession(did: string): Promise<void> { 186 - await runSQL('DELETE FROM _oauth_sessions WHERE did = $1', did) 179 + await runSQL('DELETE FROM _oauth_sessions WHERE did = $1', [did]) 187 180 } 188 181 189 182 // --- Refresh Tokens --- ··· 203 196 await runSQL( 204 197 `INSERT INTO _oauth_refresh_tokens (token, client_id, did, dpop_jkt, scope, created_at, expires_at) 205 198 VALUES ($1,$2,$3,$4,$5,$6,$7)`, 206 - token, 207 - data.clientId, 208 - data.did, 209 - data.dpopJkt, 210 - data.scope || null, 211 - now, 212 - expiresAt, 199 + [token, data.clientId, data.did, data.dpopJkt, data.scope || null, now, expiresAt], 213 200 ) 214 201 } 215 202 ··· 219 206 } 220 207 221 208 export async function revokeRefreshToken(token: string): Promise<void> { 222 - await runSQL('UPDATE _oauth_refresh_tokens SET revoked = 1 WHERE token = $1', token) 209 + await runSQL('UPDATE _oauth_refresh_tokens SET revoked = 1 WHERE token = $1', [token]) 223 210 } 224 211 225 212 // --- DPoP JTI Replay Protection --- ··· 227 214 export async function checkAndStoreDpopJti(jti: string, expiresAt: number): Promise<boolean> { 228 215 const rows = await querySQL('SELECT 1 FROM _oauth_dpop_jtis WHERE jti = $1', [jti]) 229 216 if (rows.length > 0) return false 230 - await runSQL('INSERT INTO _oauth_dpop_jtis (jti, expires_at) VALUES ($1, $2)', jti, expiresAt) 217 + await runSQL('INSERT INTO _oauth_dpop_jtis (jti, expires_at) VALUES ($1, $2)', [jti, expiresAt]) 231 218 return true 232 219 } 233 220 234 221 export async function cleanupExpiredOAuth(): Promise<void> { 235 222 const now = Math.floor(Date.now() / 1000) 236 - await runSQL('DELETE FROM _oauth_dpop_jtis WHERE expires_at < $1', now) 237 - await runSQL('DELETE FROM _oauth_requests WHERE expires_at < $1', now) 238 - await runSQL('DELETE FROM _oauth_codes WHERE created_at < $1', now - 600) 223 + await runSQL('DELETE FROM _oauth_dpop_jtis WHERE expires_at < $1', [now]) 224 + await runSQL('DELETE FROM _oauth_requests WHERE expires_at < $1', [now]) 225 + await runSQL('DELETE FROM _oauth_codes WHERE created_at < $1', [now - 600]) 239 226 await runSQL( 240 227 'DELETE FROM _oauth_refresh_tokens WHERE revoked = 1 OR (expires_at IS NOT NULL AND expires_at < $1)', 241 - now, 228 + [now], 242 229 ) 243 230 }
+1 -1
packages/hatk/src/oauth/server.ts
··· 438 438 // Update the request with the DID (in case it wasn't set during PAR) 439 439 if (!request.did && did) { 440 440 const { runSQL } = await import('../database/db.ts') 441 - await runSQL('UPDATE _oauth_requests SET did = $1 WHERE request_uri = $2', did, request.request_uri) 441 + await runSQL('UPDATE _oauth_requests SET did = $1 WHERE request_uri = $2', [did, request.request_uri]) 442 442 } 443 443 444 444 // Build redirect back to client
+6 -3
packages/hatk/src/setup.ts
··· 25 25 import { resolve, relative } from 'node:path' 26 26 import { readdirSync, statSync } from 'node:fs' 27 27 import { log } from './logger.ts' 28 - import { querySQL, runSQL } from './database/db.ts' 28 + import { querySQL, runSQL, runBatch, createBulkInserterSQL } from './database/db.ts' 29 + import type { BulkInserter } from './database/ports.ts' 29 30 30 31 /** Context passed to each setup script's handler function. */ 31 32 export interface SetupContext { 32 33 db: { 33 34 query: (sql: string, params?: any[]) => Promise<any[]> 34 - run: (sql: string, ...params: any[]) => Promise<void> 35 + run: (sql: string, params?: any[]) => Promise<void> 36 + runBatch: (operations: Array<{ sql: string; params: any[] }>) => Promise<void> 37 + createBulkInserter: (table: string, columns: string[], options?: { onConflict?: 'ignore' | 'replace'; batchSize?: number }) => Promise<BulkInserter> 35 38 } 36 39 } 37 40 ··· 75 78 } 76 79 77 80 const ctx: SetupContext = { 78 - db: { query: querySQL, run: runSQL }, 81 + db: { query: querySQL, run: runSQL, runBatch, createBulkInserter: createBulkInserterSQL }, 79 82 } 80 83 81 84 log(`[setup] running: ${name}`)
+4 -10
packages/hatk/src/test.ts
··· 27 27 export interface TestContext { 28 28 db: { 29 29 query: (sql: string, params?: any[]) => Promise<any[]> 30 - run: (sql: string, ...params: any[]) => Promise<void> 30 + run: (sql: string, params?: any[]) => Promise<void> 31 31 } 32 32 loadFixtures: (dir?: string) => Promise<void> 33 33 loadFeed: (name: string) => { generate: (ctx: FeedContext) => Promise<any> } ··· 143 143 const row = interpolateHelpers(rec) 144 144 await runSQL( 145 145 `INSERT OR IGNORE INTO _repos (did, status, handle, backfilled_at) VALUES ($1, $2, $3, $4)`, 146 - row.did, 147 - row.status || 'active', 148 - row.handle || row.did.split(':').pop() + '.test', 149 - new Date().toISOString(), 146 + [row.did, row.status || 'active', row.handle || row.did.split(':').pop() + '.test', new Date().toISOString()], 150 147 ) 151 148 } 152 149 } ··· 174 171 const placeholders = keys.map((_, i) => `$${i + 1}`).join(', ') 175 172 await runSQL( 176 173 `INSERT INTO "${tableName}" (${keys.map((k) => `"${k}"`).join(', ')}) VALUES (${placeholders})`, 177 - ...vals, 174 + vals, 178 175 ) 179 176 } 180 177 continue ··· 194 191 seenDids.add(did) 195 192 await runSQL( 196 193 `INSERT OR IGNORE INTO _repos (did, status, handle, backfilled_at) VALUES ($1, $2, $3, $4)`, 197 - did, 198 - 'active', 199 - did.split(':').pop() + '.test', 200 - new Date().toISOString(), 194 + [did, 'active', did.split(':').pop() + '.test', new Date().toISOString()], 201 195 ) 202 196 } 203 197 await insertRecord(tableName, uri, cid, did, fields)
+10
packages/hatk/src/vite-plugin.ts
··· 20 20 server: { 21 21 host: '127.0.0.1', 22 22 port: devPort, 23 + watch: { 24 + ignored: ['**/db/**', '**/data/**'], 25 + }, 23 26 proxy: { 24 27 '/xrpc': rule, 25 28 '/oauth/par': rule, ··· 78 81 DEV_MODE: '1', 79 82 }, 80 83 }) 84 + 85 + // Suppress ECONNREFUSED proxy errors while backend is booting 86 + const origError = server.config.logger.error 87 + server.config.logger.error = (msg: string, opts?: any) => { 88 + if (typeof msg === 'string' && msg.includes('ECONNREFUSED')) return 89 + origError(msg, opts) 90 + } 81 91 82 92 server.httpServer?.on('close', () => { 83 93 serverProcess?.kill()