A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 374 lines 10 kB view raw
1// db-migrate copies all tables and data from a local SQLite database to a 2// remote libsql database (e.g. Bunny Database, Turso). It reads the schema 3// from sqlite_master, creates tables on the remote, and inserts all rows 4// in batches. Generic — works with any SQLite DB (appview, hold, etc.). 5// 6// Usage: 7// 8// go run ./cmd/db-migrate --local /path/to/local.db --remote "libsql://..." --token "..." 9// go run ./cmd/db-migrate --local /path/to/local.db --remote "libsql://..." --token "..." --skip-existing 10package main 11 12import ( 13 "database/sql" 14 "flag" 15 "fmt" 16 "log" 17 "os" 18 "strings" 19 "time" 20 21 _ "github.com/tursodatabase/go-libsql" 22) 23 24func main() { 25 localPath := flag.String("local", "", "Path to local SQLite database file") 26 remoteURL := flag.String("remote", "", "Remote libsql URL (libsql://...)") 27 authToken := flag.String("token", "", "Auth token for remote database") 28 skipExisting := flag.Bool("skip-existing", false, "Skip tables that already have data on remote") 29 batchSize := flag.Int("batch-size", 100, "Number of rows per INSERT batch") 30 dryRun := flag.Bool("dry-run", false, "Show what would be migrated without writing") 31 flag.Parse() 32 33 if *localPath == "" || *remoteURL == "" || *authToken == "" { 34 flag.Usage() 35 os.Exit(1) 36 } 37 38 // Open local database read-only 39 localDSN := *localPath 40 if !strings.HasPrefix(localDSN, "file:") { 41 localDSN = "file:" + localDSN 42 } 43 localDSN += "?mode=ro" 44 45 localDB, err := sql.Open("libsql", localDSN) 46 if err != nil { 47 log.Fatalf("Failed to open local database: %v", err) 48 } 49 defer localDB.Close() 50 51 if err := localDB.Ping(); err != nil { 52 log.Fatalf("Failed to ping local database: %v", err) 53 } 54 55 // Open remote database 56 remoteDSN := fmt.Sprintf("%s?authToken=%s", *remoteURL, *authToken) 57 remoteDB, err := sql.Open("libsql", remoteDSN) 58 if err != nil { 59 log.Fatalf("Failed to open remote database: %v", err) 60 } 61 defer remoteDB.Close() 62 63 if err := remoteDB.Ping(); err != nil { 64 log.Fatalf("Failed to ping remote database: %v", err) 65 } 66 // Get all user tables from local 67 tables, err := getTables(localDB) 68 if err != nil { 69 log.Fatalf("Failed to list tables: %v", err) 70 } 71 72 if len(tables) == 0 { 73 log.Println("No tables found in local database") 74 return 75 } 76 77 fmt.Printf("Found %d tables to migrate\n\n", len(tables)) 78 79 start := time.Now() 80 81 if !*dryRun { 82 // Phase 1: Create all tables first so FK references resolve 83 fmt.Println("Creating tables...") 84 for _, t := range tables { 85 if err := createTable(remoteDB, t); err != nil { 86 log.Fatalf("Failed to create table %s: %v", t.name, err) 87 } 88 } 89 fmt.Println() 90 } 91 92 // Phase 2: Copy data 93 fmt.Println("Migrating data...") 94 totalRows := 0 95 for _, t := range tables { 96 count, err := migrateTable(localDB, remoteDB, t, *batchSize, *skipExisting, *dryRun) 97 if err != nil { 98 log.Fatalf("Failed to migrate table %s: %v", t.name, err) 99 } 100 totalRows += count 101 } 102 103 if !*dryRun { 104 // Phase 3: Create indexes after data is loaded (faster than indexing during insert) 105 fmt.Println("\nCreating indexes...") 106 for _, t := range tables { 107 if err := createIndexes(localDB, remoteDB, t.name); err != nil { 108 log.Fatalf("Failed to create indexes for %s: %v", t.name, err) 109 } 110 } 111 112 } 113 114 fmt.Printf("\nDone. %d total rows across %d tables in %s\n", totalRows, len(tables), time.Since(start).Round(time.Millisecond)) 115 if *dryRun { 116 fmt.Println("(dry run — nothing was written)") 117 } 118} 119 120type tableInfo struct { 121 name string 122 ddl string 123} 124 125func getTables(db *sql.DB) ([]tableInfo, error) { 126 rows, err := db.Query(` 127 SELECT name, sql FROM sqlite_master 128 WHERE type = 'table' 129 AND name NOT LIKE 'sqlite_%' 130 AND name NOT LIKE '_litestream_%' 131 AND name NOT LIKE 'libsql_%' 132 ORDER BY name 133 `) 134 if err != nil { 135 return nil, err 136 } 137 defer rows.Close() 138 139 var tables []tableInfo 140 for rows.Next() { 141 var t tableInfo 142 var ddl sql.NullString 143 if err := rows.Scan(&t.name, &ddl); err != nil { 144 return nil, err 145 } 146 if ddl.Valid { 147 t.ddl = ddl.String 148 } 149 tables = append(tables, t) 150 } 151 if err := rows.Err(); err != nil { 152 return nil, err 153 } 154 155 // Sort tables so those referenced by foreign keys come first. 156 // Tables with FK references depend on other tables existing and 157 // having data, so we insert referenced tables first. 158 return topoSortTables(db, tables) 159} 160 161// topoSortTables orders tables so that referenced (parent) tables come before 162// tables that reference them via foreign keys. 163func topoSortTables(db *sql.DB, tables []tableInfo) ([]tableInfo, error) { 164 byName := make(map[string]tableInfo, len(tables)) 165 for _, t := range tables { 166 byName[t.name] = t 167 } 168 169 // Build dependency graph: table -> tables it references 170 deps := make(map[string][]string) 171 for _, t := range tables { 172 fkRows, err := db.Query(fmt.Sprintf("PRAGMA foreign_key_list([%s])", t.name)) 173 if err != nil { 174 // PRAGMA might not return rows for tables without FKs 175 continue 176 } 177 seen := make(map[string]bool) 178 for fkRows.Next() { 179 var id, seq int 180 var table, from, to, onUpdate, onDelete, match string 181 if err := fkRows.Scan(&id, &seq, &table, &from, &to, &onUpdate, &onDelete, &match); err != nil { 182 fkRows.Close() 183 return nil, err 184 } 185 if !seen[table] { 186 deps[t.name] = append(deps[t.name], table) 187 seen[table] = true 188 } 189 } 190 fkRows.Close() 191 } 192 193 // Topological sort (Kahn's algorithm) 194 visited := make(map[string]bool) 195 var sorted []tableInfo 196 var visit func(name string) 197 visit = func(name string) { 198 if visited[name] { 199 return 200 } 201 visited[name] = true 202 for _, dep := range deps[name] { 203 visit(dep) 204 } 205 if t, ok := byName[name]; ok { 206 sorted = append(sorted, t) 207 } 208 } 209 for _, t := range tables { 210 visit(t.name) 211 } 212 return sorted, nil 213} 214 215func getIndexes(db *sql.DB, tableName string) ([]string, error) { 216 rows, err := db.Query(` 217 SELECT sql FROM sqlite_master 218 WHERE type = 'index' 219 AND tbl_name = ? 220 AND sql IS NOT NULL 221 `, tableName) 222 if err != nil { 223 return nil, err 224 } 225 defer rows.Close() 226 227 var indexes []string 228 for rows.Next() { 229 var ddl string 230 if err := rows.Scan(&ddl); err != nil { 231 return nil, err 232 } 233 indexes = append(indexes, ddl) 234 } 235 return indexes, rows.Err() 236} 237 238func createTable(remoteDB *sql.DB, t tableInfo) error { 239 if t.ddl == "" { 240 return nil 241 } 242 ddl := t.ddl 243 if !strings.Contains(strings.ToUpper(ddl), "IF NOT EXISTS") { 244 ddl = strings.Replace(ddl, "CREATE TABLE", "CREATE TABLE IF NOT EXISTS", 1) 245 } 246 if _, err := remoteDB.Exec(ddl); err != nil { 247 return fmt.Errorf("create table %s: %w", t.name, err) 248 } 249 fmt.Printf(" %s\n", t.name) 250 return nil 251} 252 253func createIndexes(localDB, remoteDB *sql.DB, tableName string) error { 254 indexes, err := getIndexes(localDB, tableName) 255 if err != nil { 256 return err 257 } 258 for _, idx := range indexes { 259 ddl := idx 260 if !strings.Contains(strings.ToUpper(ddl), "IF NOT EXISTS") { 261 ddl = strings.Replace(ddl, "CREATE INDEX", "CREATE INDEX IF NOT EXISTS", 1) 262 ddl = strings.Replace(ddl, "CREATE UNIQUE INDEX", "CREATE UNIQUE INDEX IF NOT EXISTS", 1) 263 } 264 if _, err := remoteDB.Exec(ddl); err != nil { 265 return fmt.Errorf("create index on %s: %w", tableName, err) 266 } 267 } 268 if len(indexes) > 0 { 269 fmt.Printf(" %s: %d indexes\n", tableName, len(indexes)) 270 } 271 return nil 272} 273 274func migrateTable(localDB, remoteDB *sql.DB, t tableInfo, batchSize int, skipExisting, dryRun bool) (int, error) { 275 var localCount int 276 if err := localDB.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM [%s]", t.name)).Scan(&localCount); err != nil { 277 return 0, fmt.Errorf("count local rows: %w", err) 278 } 279 280 if localCount == 0 { 281 fmt.Printf(" %-30s %6d rows (empty)\n", t.name, 0) 282 return 0, nil 283 } 284 285 if dryRun { 286 fmt.Printf(" %-30s %6d rows (would migrate)\n", t.name, localCount) 287 return localCount, nil 288 } 289 290 if skipExisting { 291 var remoteCount int 292 if err := remoteDB.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM [%s]", t.name)).Scan(&remoteCount); err != nil { 293 return 0, fmt.Errorf("count remote rows: %w", err) 294 } 295 if remoteCount > 0 { 296 fmt.Printf(" %-30s %6d rows (skipped, %d on remote)\n", t.name, localCount, remoteCount) 297 return 0, nil 298 } 299 } 300 301 rows, err := localDB.Query(fmt.Sprintf("SELECT * FROM [%s]", t.name)) 302 if err != nil { 303 return 0, fmt.Errorf("select: %w", err) 304 } 305 defer rows.Close() 306 307 cols, err := rows.Columns() 308 if err != nil { 309 return 0, fmt.Errorf("columns: %w", err) 310 } 311 312 placeholders := make([]string, len(cols)) 313 quotedCols := make([]string, len(cols)) 314 for i, c := range cols { 315 placeholders[i] = "?" 316 quotedCols[i] = fmt.Sprintf("[%s]", c) 317 } 318 insertPrefix := fmt.Sprintf("INSERT INTO [%s] (%s) VALUES ", t.name, strings.Join(quotedCols, ", ")) 319 rowPlaceholder := "(" + strings.Join(placeholders, ", ") + ")" 320 321 inserted := 0 322 batch := make([][]any, 0, batchSize) 323 324 for rows.Next() { 325 vals := make([]any, len(cols)) 326 ptrs := make([]any, len(cols)) 327 for i := range vals { 328 ptrs[i] = &vals[i] 329 } 330 if err := rows.Scan(ptrs...); err != nil { 331 return 0, fmt.Errorf("scan: %w", err) 332 } 333 batch = append(batch, vals) 334 335 if len(batch) >= batchSize { 336 if err := insertBatch(remoteDB, insertPrefix, rowPlaceholder, batch); err != nil { 337 return 0, fmt.Errorf("insert batch at row %d: %w", inserted, err) 338 } 339 inserted += len(batch) 340 batch = batch[:0] 341 } 342 } 343 344 if len(batch) > 0 { 345 if err := insertBatch(remoteDB, insertPrefix, rowPlaceholder, batch); err != nil { 346 return 0, fmt.Errorf("insert final batch: %w", err) 347 } 348 inserted += len(batch) 349 } 350 351 if err := rows.Err(); err != nil { 352 return 0, fmt.Errorf("rows iteration: %w", err) 353 } 354 355 fmt.Printf(" %-30s %6d rows migrated\n", t.name, inserted) 356 return inserted, nil 357} 358 359func insertBatch(db *sql.DB, prefix, rowPlaceholder string, batch [][]any) error { 360 if len(batch) == 0 { 361 return nil 362 } 363 364 placeholders := make([]string, len(batch)) 365 var args []any 366 for i, row := range batch { 367 placeholders[i] = rowPlaceholder 368 args = append(args, row...) 369 } 370 371 query := prefix + strings.Join(placeholders, ", ") 372 _, err := db.Exec(query, args...) 373 return err 374}