GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 5080912ca9c3d7d4301e4dce83a5b804de56d00a 713 lines 26 kB view raw
//! Sync from Turso to local SQLite
//! Full sync on startup, incremental sync every 5 minutes

const std = @import("std");
const zqlite = @import("zqlite");
const Allocator = std.mem.Allocator;
const TursoClient = @import("TursoClient.zig");
const LocalDb = @import("LocalDb.zig");

const log = std.log.scoped(.sync);

const BATCH_SIZE = 2000;
const SYNC_INTERVAL_S = 300; // 5 minutes
const TOMBSTONE_RETENTION_S = 7 * 24 * 3600; // 7 days — must match cron.ts pruning

/// Full sync: fetch all searchable actors from Turso using keyset pagination.
/// Does NOT run stale cleanup — that's handled by incremental sync (tombstones +
/// "became unsearchable" queries). This avoids races with the dual-write path.
/// When `wipe` is true, deletes all local data first (used by stale rebuild path).
pub fn fullSync(turso: *TursoClient, local: *LocalDb, wipe: bool) !void {
    log.info("starting full sync (wipe={})...", .{wipe});

    const conn = local.getConn() orelse return error.LocalNotOpen;

    // Did an earlier full sync finish cleanly? Read the 'sync_complete' marker;
    // any read error or missing row counts as "not completed".
    const prev_complete = blk: {
        local.lock();
        defer local.unlock();
        const maybe_row = conn.row(
            "SELECT value FROM sync_meta WHERE key = 'sync_complete'",
            .{},
        ) catch break :blk false;
        const r = maybe_row orelse break :blk false;
        defer r.deinit();
        break :blk std.mem.eql(u8, r.text(0), "1");
    };

    if (prev_complete and !wipe) {
        // Re-sync path: local data is already usable — keep serving queries
        // while the live table is refreshed in place.
        local.setReady(true);
        log.info("previous sync complete, keeping ready during re-sync", .{});
        return fullSyncResync(turso, local, conn);
    }

    // Bootstrap path: bulk-load into staging tables for speed, then swap.
    return fullSyncBootstrap(turso, local, conn, wipe);
}

/// Bootstrap path: load into staging tables (no indexes), build indexes in bulk,
/// then atomically swap into place. Live schema is never in a degraded state.
fn fullSyncBootstrap(turso: *TursoClient, local: *LocalDb, conn: zqlite.Conn, wipe: bool) !void {
    const total_t0 = std.time.milliTimestamp();
    local.setReady(false);

    // close read connection during bootstrap — not serving traffic,
    // avoids WAL reader lock interference with DDL/index builds
    local.closeReadConn();
    errdefer local.reopenReadConn() catch {};

    if (wipe) {
        local.lock();
        defer local.unlock();
        conn.exec("DELETE FROM sync_meta", .{}) catch {};
    }

    // clean up any leftover staging tables from a previous failed attempt
    {
        local.lock();
        defer local.unlock();
        conn.exec("DROP TABLE IF EXISTS actors_fts_stage", .{}) catch {};
        conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
    }

    // create staging table — no PK, no indexes for fast sequential appends
    {
        local.lock();
        defer local.unlock();
        conn.exec(
            \\CREATE TABLE actors_stage (
            \\  did TEXT NOT NULL,
            \\  handle TEXT NOT NULL DEFAULT '',
            \\  display_name TEXT DEFAULT '',
            \\  avatar_url TEXT DEFAULT '',
            \\  hidden INTEGER NOT NULL DEFAULT 0,
            \\  labels TEXT NOT NULL DEFAULT '[]',
            \\  created_at TEXT DEFAULT '',
            \\  associated TEXT DEFAULT '{}'
            \\)
        , .{}) catch |err| {
            log.err("failed to create actors_stage: {}", .{err});
            return err;
        };
    }

    // keyset pagination: fetch rows WHERE rowid > last_rowid ORDER BY rowid
    var actor_count: usize = 0;
    var error_count: usize = 0;
    var last_rowid: i64 = 0;
    var had_turso_error = false;
    const load_t0 = std.time.milliTimestamp();

    while (true) {
        // i64 max is 19 digits + sign — 20 bytes is always enough
        var rowid_buf: [20]u8 = undefined;
        const rowid_str = std.fmt.bufPrint(&rowid_buf, "{d}", .{last_rowid}) catch break;

        const t0 = std.time.milliTimestamp();

        // fetch from turso (no lock held)
        var result = turso.query(
            \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, rowid
            \\FROM actors WHERE handle != '' AND rowid > ?
            \\ORDER BY rowid LIMIT 2000
        , &.{rowid_str}) catch |err| {
            log.err("turso query failed at rowid {d}: {}", .{ last_rowid, err });
            had_turso_error = true;
            break;
        };
        // NOTE: defer inside a loop body runs at the end of each iteration,
        // so every batch result is freed before the next fetch
        defer result.deinit();

        const t1 = std.time.milliTimestamp();

        if (result.rows.len == 0) break;

        // write batch to staging table — no indexes, fast sequential appends
        {
            local.lock();
            defer local.unlock();
            conn.exec("BEGIN", .{}) catch {};
            for (result.rows) |row| {
                insertActorStage(conn, row) catch |err| {
                    log.err("insert actor failed: {}", .{err});
                    error_count += 1;
                    continue;
                };
                actor_count += 1;
            }
            conn.exec("COMMIT", .{}) catch {};
        }

        const t2 = std.time.milliTimestamp();

        // advance cursor to last row's rowid (column 8)
        last_rowid = result.rows[result.rows.len - 1].int(8);

        log.info("batch: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{
            result.rows.len, last_rowid, actor_count,
            t1 - t0, t2 - t1,
        });
    }

    if (had_turso_error or error_count > 0) {
        // returns void (not an error) on purpose: the next periodic
        // incrementalSync finds no last_sync watermark and retries full sync
        log.warn("bootstrap incomplete — {d} synced, {d} local write errors, turso_error={}", .{ actor_count, error_count, had_turso_error });
        // clean up staging table
        {
            local.lock();
            defer local.unlock();
            conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
        }
        // restore read connection so health endpoint works
        local.reopenReadConn() catch {};
        return;
    }

    const load_ms = std.time.milliTimestamp() - load_t0;

    // checkpoint WAL before heavy DDL — clean slate for index builds
    {
        local.lock();
        defer local.unlock();
        conn.exec("PRAGMA wal_checkpoint(TRUNCATE)", .{}) catch {};
    }

    // build indexes on staging table
    log.info("building indexes on {d} rows...", .{actor_count});
    const idx_did_t0 = std.time.milliTimestamp();
    {
        local.lock();
        defer local.unlock();
        conn.exec("CREATE UNIQUE INDEX idx_stage_did ON actors_stage(did)", .{}) catch |err| {
            log.err("failed to create unique index on staging: {}", .{err});
            conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
            return err;
        };
    }
    const idx_did_ms = std.time.milliTimestamp() - idx_did_t0;

    const idx_handle_t0 = std.time.milliTimestamp();
    {
        local.lock();
        defer local.unlock();
        conn.exec("CREATE INDEX idx_stage_handle ON actors_stage(handle COLLATE NOCASE)", .{}) catch |err| {
            log.err("failed to create handle index on staging: {}", .{err});
            conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
            return err;
        };
    }
    const idx_handle_ms = std.time.milliTimestamp() - idx_handle_t0;

    // swap actors table first, then build FTS with final name.
    // FTS5 virtual tables use shadow tables (_content, _data, _idx, etc.)
    // that don't reliably rename with ALTER TABLE RENAME — so we never rename FTS5.
    {
        local.lock();
        defer local.unlock();
        conn.exec("BEGIN", .{}) catch {};
        conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {};
        conn.exec("DROP TABLE IF EXISTS actors", .{}) catch {};
        conn.exec("ALTER TABLE actors_stage RENAME TO actors", .{}) catch |err| {
            log.err("swap failed (rename actors_stage): {}", .{err});
            conn.exec("ROLLBACK", .{}) catch {};
            return err;
        };
        conn.exec("COMMIT", .{}) catch |err| {
            log.err("swap commit failed: {}", .{err});
            return err;
        };
    }

    // build FTS with its final name — no rename needed
    const fts_create_t0 = std.time.milliTimestamp();
    {
        local.lock();
        defer local.unlock();
        conn.exec(
            \\CREATE VIRTUAL TABLE actors_fts USING fts5(
            \\  did UNINDEXED, handle, display_name,
            \\  tokenize='unicode61 remove_diacritics 2'
            \\)
        , .{}) catch |err| {
            log.err("failed to create FTS table: {}", .{err});
            return err;
        };
    }
    const fts_create_ms = std.time.milliTimestamp() - fts_create_t0;

    const fts_pop_t0 = std.time.milliTimestamp();
    {
        local.lock();
        defer local.unlock();
        conn.exec(
            \\INSERT INTO actors_fts (did, handle, display_name)
            \\SELECT did, handle, display_name FROM actors WHERE handle != ''
        , .{}) catch |err| {
            log.err("FTS populate failed: {}", .{err});
            return err;
        };
    }
    const fts_pop_ms = std.time.milliTimestamp() - fts_pop_t0;

    // post-swap: optimize, record completion + actor count, checkpoint
    {
        local.lock();
        defer local.unlock();
        conn.exec("PRAGMA optimize", .{}) catch {};

        var ts_buf: [20]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)",
            .{ts_str},
        ) catch {};
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('sync_complete', '1')",
            .{},
        ) catch {};

        // cache actor count so health endpoint doesn't need COUNT(*)
        var count_buf: [20]u8 = undefined;
        const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{actor_count}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)",
            .{count_str},
        ) catch {};

        conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
    }

    const total_ms = std.time.milliTimestamp() - total_t0;

    // reopen read connection before serving traffic
    local.reopenReadConn() catch |err| {
        log.err("failed to reopen read conn after bootstrap: {}", .{err});
    };

    local.setReady(true);
    log.info("bootstrap complete — total_rows={d} load_ms={d} idx_did_ms={d} idx_handle_ms={d} fts_create_ms={d} fts_populate_ms={d} total_ms={d}", .{
        actor_count, load_ms, idx_did_ms, idx_handle_ms, fts_create_ms, fts_pop_ms, total_ms,
    });
}

/// Re-sync path: previous sync completed, update live table directly.
/// Only processes actors that may have changed — index maintenance cost is negligible.
fn fullSyncResync(turso: *TursoClient, local: *LocalDb, conn: zqlite.Conn) !void {
    var actor_count: usize = 0;
    var error_count: usize = 0;
    var last_rowid: i64 = 0;
    var had_turso_error = false;

    // keyset pagination over rowid, same scheme as the bootstrap loader
    while (true) {
        var rowid_buf: [20]u8 = undefined;
        const rowid_str = std.fmt.bufPrint(&rowid_buf, "{d}", .{last_rowid}) catch break;

        const t0 = std.time.milliTimestamp();

        // fetch from turso (no lock held)
        var result = turso.query(
            \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, rowid
            \\FROM actors WHERE handle != '' AND rowid > ?
            \\ORDER BY rowid LIMIT 2000
        , &.{rowid_str}) catch |err| {
            log.err("turso query failed at rowid {d}: {}", .{ last_rowid, err });
            had_turso_error = true;
            break;
        };
        defer result.deinit();

        const t1 = std.time.milliTimestamp();

        if (result.rows.len == 0) break;

        // upsert batch directly into the live table (indexes maintained inline)
        {
            local.lock();
            defer local.unlock();
            conn.exec("BEGIN", .{}) catch {};
            for (result.rows) |row| {
                insertActorOnly(conn, row) catch |err| {
                    log.err("insert actor failed: {}", .{err});
                    error_count += 1;
                    continue;
                };
                actor_count += 1;
            }
            conn.exec("COMMIT", .{}) catch {};
        }

        const t2 = std.time.milliTimestamp();

        // advance cursor to last row's rowid (column 8)
        last_rowid = result.rows[result.rows.len - 1].int(8);

        log.info("batch: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{
            result.rows.len, last_rowid, actor_count,
            t1 - t0, t2 - t1,
        });
    }

    if (had_turso_error or error_count > 0) {
        // don't rebuild FTS or record completion on a partial load
        log.warn("full sync incomplete — {d} synced, {d} local write errors, turso_error={}", .{ actor_count, error_count, had_turso_error });
        return;
    }

    // rebuild FTS from scratch (DROP + CREATE — never reuse a potentially
    // broken FTS table from a previous failed rename)
    {
        log.info("building FTS index for {d} actors...", .{actor_count});
        const fts_t0 = std.time.milliTimestamp();
        local.lock();
        defer local.unlock();
        conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {};
        conn.exec(
            \\CREATE VIRTUAL TABLE actors_fts USING fts5(
            \\  did UNINDEXED, handle, display_name,
            \\  tokenize='unicode61 remove_diacritics 2'
            \\)
        , .{}) catch |err| {
            // BUG FIX: this error was previously swallowed. The old FTS table
            // is already dropped at this point — continuing would record
            // sync_complete='1' with search permanently broken.
            log.err("failed to create FTS table: {}", .{err});
            return err;
        };
        conn.exec(
            \\INSERT INTO actors_fts (did, handle, display_name)
            \\SELECT did, handle, display_name FROM actors WHERE handle != ''
        , .{}) catch |err| {
            // BUG FIX: likewise propagate — an empty FTS index must not be
            // marked as a completed sync
            log.err("FTS bulk insert failed: {}", .{err});
            return err;
        };
        const fts_t1 = std.time.milliTimestamp();
        log.info("FTS index built in {d}ms", .{fts_t1 - fts_t0});
    }

    // record sync time + mark complete + actor count
    {
        local.lock();
        defer local.unlock();
        var ts_buf: [20]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)",
            .{ts_str},
        ) catch {};
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('sync_complete', '1')",
            .{},
        ) catch {};

        // cache actor count so health endpoint doesn't need COUNT(*)
        var count_buf: [20]u8 = undefined;
        const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{actor_count}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)",
            .{count_str},
        ) catch {};

        conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
    }

    local.setReady(true);
    log.info("full sync complete — {d} actors", .{actor_count});
}

/// Incremental sync: fetch actors updated since last sync + tombstones.
/// Uses keyset pagination on (updated_at, did) to drain all updates.
/// Only advances watermark when all turso queries succeed and all local writes succeed.
pub fn incrementalSync(turso: *TursoClient, local: *LocalDb) !void {
    const conn = local.getConn() orelse return error.LocalNotOpen;

    // Read the last-sync watermark. Restructured to lock/defer-unlock (matches
    // fullSync's marker read); this also guarantees row.deinit() runs while the
    // lock is still held — previously the row was freed after unlock.
    const last_sync_ts = blk: {
        local.lock();
        defer local.unlock();
        const row = conn.row(
            "SELECT value FROM sync_meta WHERE key = 'last_sync'",
            .{},
        ) catch break :blk @as(i64, 0);
        if (row) |r| {
            defer r.deinit();
            const val = r.text(0);
            break :blk if (val.len == 0) 0 else std.fmt.parseInt(i64, val, 10) catch 0;
        }
        break :blk @as(i64, 0);
    };

    if (last_sync_ts == 0) {
        log.info("no last_sync found, doing full sync", .{});
        return fullSync(turso, local, false);
    }

    // if last sync is older than tombstone retention, local data is too stale —
    // tombstones may have been pruned, so incremental sync can't catch deletions.
    // wipe and rebuild from scratch.
    const now = std.time.timestamp();
    if (now - last_sync_ts > TOMBSTONE_RETENTION_S) {
        log.warn("last sync {d}s ago (>{d}s tombstone retention), wiping stale replica", .{
            now - last_sync_ts, TOMBSTONE_RETENTION_S,
        });
        return fullSync(turso, local, true);
    }

    // watermark exists and is fresh enough — local data is servable
    local.setReady(true);

    // buffer: subtract 300s to catch stragglers (matches leaflet-search)
    const since_ts = last_sync_ts - 300;
    var since_buf: [20]u8 = undefined;
    const since_str = std.fmt.bufPrint(&since_buf, "{d}", .{since_ts}) catch return;

    // fetch updated searchable actors — keyset pagination to drain all
    var updated: usize = 0;
    var error_count: usize = 0;
    var had_turso_error = false;
    {
        var cursor_ts_buf: [20]u8 = undefined;
        var cursor_did_buf: [128]u8 = undefined;
        @memcpy(cursor_ts_buf[0..since_str.len], since_str);
        var cursor_ts: []const u8 = cursor_ts_buf[0..since_str.len];
        var cursor_did_len: usize = 0;

        while (true) {
            const cursor_did: []const u8 = if (cursor_did_len > 0) cursor_did_buf[0..cursor_did_len] else "";

            const args: []const []const u8 = if (cursor_did_len > 0)
                &.{ cursor_ts, cursor_ts, cursor_did }
            else
                &.{cursor_ts};

            // first page uses a plain `updated_at > ?` filter; subsequent pages
            // use the strict (updated_at, did) tuple comparison to avoid
            // re-reading or skipping rows that share an updated_at second
            const sql = if (cursor_did_len > 0)
                \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, updated_at
                \\FROM actors WHERE handle != '' AND (updated_at > ?1 OR (updated_at = ?2 AND did > ?3))
                \\ORDER BY updated_at, did LIMIT 2000
            else
                \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, updated_at
                \\FROM actors WHERE handle != '' AND updated_at > ?1
                \\ORDER BY updated_at, did LIMIT 2000
            ;

            var result = turso.query(sql, args) catch |err| {
                log.err("incremental query failed: {}", .{err});
                had_turso_error = true;
                break;
            };
            defer result.deinit();

            if (result.rows.len == 0) break;

            {
                local.lock();
                defer local.unlock();
                conn.exec("BEGIN", .{}) catch {};
                for (result.rows) |row| {
                    upsertActorLocal(conn, row) catch {
                        error_count += 1;
                        continue;
                    };
                    updated += 1;
                }
                conn.exec("COMMIT", .{}) catch {};
            }

            // advance cursor to last row's (updated_at, did)
            const last_row = result.rows[result.rows.len - 1];
            const last_ua_int = last_row.int(8); // updated_at is integer
            const last_did = last_row.text(0);

            // serialize updated_at integer to string for next query
            const ua_str = std.fmt.bufPrint(&cursor_ts_buf, "{d}", .{last_ua_int}) catch break;
            cursor_ts = ua_str;

            // NOTE(review): a did longer than 128 bytes would leave the did
            // cursor unadvanced; real DIDs are far shorter, and the
            // short-batch break below bounds the loop in practice
            if (last_did.len > 0 and last_did.len <= cursor_did_buf.len) {
                @memcpy(cursor_did_buf[0..last_did.len], last_did);
                cursor_did_len = last_did.len;
            }

            // safety: stop if batch was less than full (we've drained)
            if (result.rows.len < BATCH_SIZE) break;
        }
    }

    // fetch actors that became unsearchable (handle cleared)
    var cleared: usize = 0;
    unsearchable: {
        var result = turso.query(
            "SELECT did FROM actors WHERE updated_at > ? AND handle = ''",
            &.{since_str},
        ) catch |err| {
            log.err("unsearchable query failed: {}", .{err});
            had_turso_error = true;
            break :unsearchable;
        };
        defer result.deinit();

        if (result.rows.len > 0) {
            local.lock();
            defer local.unlock();
            for (result.rows) |row| {
                const did = row.text(0);
                conn.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {
                    error_count += 1;
                    continue;
                };
                conn.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {};
                cleared += 1;
            }
        }
    }

    // fetch tombstones
    var deleted: usize = 0;
    tombstone: {
        var tomb_result = turso.query(
            "SELECT did FROM tombstones WHERE deleted_at > ?",
            &.{since_str},
        ) catch |err| {
            log.err("tombstone query failed: {}", .{err});
            had_turso_error = true;
            break :tombstone;
        };
        defer tomb_result.deinit();

        if (tomb_result.rows.len > 0) {
            local.lock();
            defer local.unlock();
            for (tomb_result.rows) |row| {
                const did = row.text(0);
                conn.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {
                    error_count += 1;
                    continue;
                };
                conn.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {};
                deleted += 1;
            }
        }
    }

    // only advance watermark when everything succeeded
    const had_error = had_turso_error or error_count > 0;
    if (had_error) {
        log.warn("incremental sync had errors — {d} updated, {d} deleted, {d} cleared, {d} write errors, turso_error={}", .{
            updated, deleted, cleared, error_count, had_turso_error,
        });
    } else {
        local.lock();
        defer local.unlock();
        var ts_buf: [20]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)",
            .{ts_str},
        ) catch {};

        // refresh cached actor count when data changed
        if (updated > 0 or deleted > 0 or cleared > 0) {
            updateActorCount(conn);
            log.info("incremental sync — {d} updated, {d} deleted, {d} cleared", .{ updated, deleted, cleared });
        }

        conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
    }
}

/// Background sync loop: full sync on boot, then incremental every 5 min
pub fn syncLoop(allocator: Allocator, local: *LocalDb) void {
    var turso = TursoClient.init(allocator) catch |err| {
        log.err("turso client init failed: {}, sync disabled", .{err});
        return;
    };
    defer turso.deinit();

    // initial sync (full or incremental depending on state)
    incrementalSync(&turso, local) catch |err| {
        log.err("initial sync failed: {}", .{err});
    };

    // periodic incremental sync
    while (true) {
        std.Thread.sleep(SYNC_INTERVAL_S * std.time.ns_per_s);

        incrementalSync(&turso, local) catch |err| {
            log.err("periodic sync failed: {}", .{err});
        };
    }
}

/// Insert actor row into staging table (no indexes, no FTS — used during bootstrap)
fn insertActorStage(conn: zqlite.Conn, row: anytype) !void {
    // idiom: `try` replaces the redundant `catch |err| { return err; }`
    try conn.exec(
        \\INSERT INTO actors_stage
        \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        row.text(0), // did
        row.text(1), // handle
        row.text(2), // display_name
        row.text(3), // avatar_url
        row.int(4), // hidden
        row.text(5), // labels
        row.text(6), // created_at
        row.text(7), // associated
    });
}

/// Insert actor row without FTS update (used during re-sync on live table)
fn insertActorOnly(conn: zqlite.Conn, row: anytype) !void {
    try conn.exec(
        \\INSERT OR REPLACE INTO actors
        \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        row.text(0), // did
        row.text(1), // handle
        row.text(2), // display_name
        row.text(3), // avatar_url
        row.int(4), // hidden
        row.text(5), // labels
        row.text(6), // created_at
        row.text(7), // associated
    });
}

/// Upsert a turso row into local SQLite + FTS
fn upsertActorLocal(conn: zqlite.Conn, row: anytype) !void {
    const did = row.text(0);

    try conn.exec(
        \\INSERT OR REPLACE INTO actors
        \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        did,
        row.text(1), // handle
        row.text(2), // display_name
        row.text(3), // avatar_url
        row.int(4), // hidden
        row.text(5), // labels
        row.text(6), // created_at
        row.text(7), // associated
    });

    // manual FTS update (standalone FTS, not content-synced);
    // best-effort: FTS drift is repaired by the next full re-sync rebuild
    conn.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {};
    conn.exec(
        "INSERT INTO actors_fts (did, handle, display_name) VALUES (?, ?, ?)",
        .{ did, row.text(1), row.text(2) },
    ) catch {};
}

/// Update cached actor count in sync_meta (avoids COUNT(*) on health checks)
fn updateActorCount(conn: zqlite.Conn) void {
    const row = conn.row("SELECT COUNT(*) FROM actors WHERE handle != ''", .{}) catch return;
    if (row) |r| {
        defer r.deinit();
        var buf: [20]u8 = undefined;
        const count_str = std.fmt.bufPrint(&buf, "{d}", .{r.int(0)}) catch return;
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)",
            .{count_str},
        ) catch {};
    }
}