GET /xrpc/app.bsky.actor.searchActorsTypeahead
typeahead.waow.tech
1//! Sync from Turso to local SQLite
2//! Full sync on startup, incremental sync every 5 minutes
3
4const std = @import("std");
5const zqlite = @import("zqlite");
6const Allocator = std.mem.Allocator;
7const TursoClient = @import("TursoClient.zig");
8const LocalDb = @import("LocalDb.zig");
9
10const log = std.log.scoped(.sync);
11
12const BATCH_SIZE = 2000;
13const SYNC_INTERVAL_S = 300; // 5 minutes
14const TOMBSTONE_RETENTION_S = 7 * 24 * 3600; // 7 days — must match cron.ts pruning
15
16/// Full sync: fetch all searchable actors from Turso using keyset pagination.
17/// Does NOT run stale cleanup — that's handled by incremental sync (tombstones +
18/// "became unsearchable" queries). This avoids races with the dual-write path.
19/// When `wipe` is true, deletes all local data first (used by stale rebuild path).
20pub fn fullSync(turso: *TursoClient, local: *LocalDb, wipe: bool) !void {
21 log.info("starting full sync (wipe={})...", .{wipe});
22
23 const conn = local.getConn() orelse return error.LocalNotOpen;
24
25 // check if a previous full sync completed successfully
26 const was_completed = blk: {
27 local.lock();
28 defer local.unlock();
29 const row = conn.row(
30 "SELECT value FROM sync_meta WHERE key = 'sync_complete'",
31 .{},
32 ) catch break :blk false;
33 if (row) |r| {
34 defer r.deinit();
35 break :blk std.mem.eql(u8, r.text(0), "1");
36 }
37 break :blk false;
38 };
39
40 if (wipe or !was_completed) {
41 // bootstrap path: use staging tables for fast bulk load
42 return fullSyncBootstrap(turso, local, conn, wipe);
43 }
44
45 // re-sync path: previous sync completed, keep serving while re-syncing
46 local.setReady(true);
47 log.info("previous sync complete, keeping ready during re-sync", .{});
48 return fullSyncResync(turso, local, conn);
49}
50
51/// Bootstrap path: load into staging tables (no indexes), build indexes in bulk,
52/// then atomically swap into place. Live schema is never in a degraded state.
53fn fullSyncBootstrap(turso: *TursoClient, local: *LocalDb, conn: zqlite.Conn, wipe: bool) !void {
54 const total_t0 = std.time.milliTimestamp();
55 local.setReady(false);
56
57 // close read connection during bootstrap — not serving traffic,
58 // avoids WAL reader lock interference with DDL/index builds
59 local.closeReadConn();
60 errdefer local.reopenReadConn() catch {};
61
62 if (wipe) {
63 local.lock();
64 defer local.unlock();
65 conn.exec("DELETE FROM sync_meta", .{}) catch {};
66 }
67
68 // clean up any leftover staging tables from a previous failed attempt
69 {
70 local.lock();
71 defer local.unlock();
72 conn.exec("DROP TABLE IF EXISTS actors_fts_stage", .{}) catch {};
73 conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
74 }
75
76 // create staging table — no PK, no indexes for fast sequential appends
77 {
78 local.lock();
79 defer local.unlock();
80 conn.exec(
81 \\CREATE TABLE actors_stage (
82 \\ did TEXT NOT NULL,
83 \\ handle TEXT NOT NULL DEFAULT '',
84 \\ display_name TEXT DEFAULT '',
85 \\ avatar_url TEXT DEFAULT '',
86 \\ hidden INTEGER NOT NULL DEFAULT 0,
87 \\ labels TEXT NOT NULL DEFAULT '[]',
88 \\ created_at TEXT DEFAULT '',
89 \\ associated TEXT DEFAULT '{}'
90 \\)
91 , .{}) catch |err| {
92 log.err("failed to create actors_stage: {}", .{err});
93 return err;
94 };
95 }
96
97 // keyset pagination: fetch rows WHERE rowid > last_rowid ORDER BY rowid
98 var actor_count: usize = 0;
99 var error_count: usize = 0;
100 var last_rowid: i64 = 0;
101 var had_turso_error = false;
102 const load_t0 = std.time.milliTimestamp();
103
104 while (true) {
105 var rowid_buf: [20]u8 = undefined;
106 const rowid_str = std.fmt.bufPrint(&rowid_buf, "{d}", .{last_rowid}) catch break;
107
108 const t0 = std.time.milliTimestamp();
109
110 // fetch from turso (no lock held)
111 var result = turso.query(
112 \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, rowid
113 \\FROM actors WHERE handle != '' AND rowid > ?
114 \\ORDER BY rowid LIMIT 2000
115 , &.{rowid_str}) catch |err| {
116 log.err("turso query failed at rowid {d}: {}", .{ last_rowid, err });
117 had_turso_error = true;
118 break;
119 };
120 defer result.deinit();
121
122 const t1 = std.time.milliTimestamp();
123
124 if (result.rows.len == 0) break;
125
126 // write batch to staging table — no indexes, fast sequential appends
127 {
128 local.lock();
129 defer local.unlock();
130 conn.exec("BEGIN", .{}) catch {};
131 for (result.rows) |row| {
132 insertActorStage(conn, row) catch |err| {
133 log.err("insert actor failed: {}", .{err});
134 error_count += 1;
135 continue;
136 };
137 actor_count += 1;
138 }
139 conn.exec("COMMIT", .{}) catch {};
140 }
141
142 const t2 = std.time.milliTimestamp();
143
144 // advance cursor to last row's rowid (column 8)
145 last_rowid = result.rows[result.rows.len - 1].int(8);
146
147 log.info("batch: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{
148 result.rows.len, last_rowid, actor_count,
149 t1 - t0, t2 - t1,
150 });
151 }
152
153 if (had_turso_error or error_count > 0) {
154 log.warn("bootstrap incomplete — {d} synced, {d} local write errors, turso_error={}", .{ actor_count, error_count, had_turso_error });
155 // clean up staging table
156 {
157 local.lock();
158 defer local.unlock();
159 conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
160 }
161 // restore read connection so health endpoint works
162 local.reopenReadConn() catch {};
163 return;
164 }
165
166 const load_ms = std.time.milliTimestamp() - load_t0;
167
168 // checkpoint WAL before heavy DDL — clean slate for index builds
169 {
170 local.lock();
171 defer local.unlock();
172 conn.exec("PRAGMA wal_checkpoint(TRUNCATE)", .{}) catch {};
173 }
174
175 // build indexes on staging table
176 log.info("building indexes on {d} rows...", .{actor_count});
177 const idx_did_t0 = std.time.milliTimestamp();
178 {
179 local.lock();
180 defer local.unlock();
181 conn.exec("CREATE UNIQUE INDEX idx_stage_did ON actors_stage(did)", .{}) catch |err| {
182 log.err("failed to create unique index on staging: {}", .{err});
183 conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
184 return err;
185 };
186 }
187 const idx_did_ms = std.time.milliTimestamp() - idx_did_t0;
188
189 const idx_handle_t0 = std.time.milliTimestamp();
190 {
191 local.lock();
192 defer local.unlock();
193 conn.exec("CREATE INDEX idx_stage_handle ON actors_stage(handle COLLATE NOCASE)", .{}) catch |err| {
194 log.err("failed to create handle index on staging: {}", .{err});
195 conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
196 return err;
197 };
198 }
199 const idx_handle_ms = std.time.milliTimestamp() - idx_handle_t0;
200
201 // swap actors table first, then build FTS with final name.
202 // FTS5 virtual tables use shadow tables (_content, _data, _idx, etc.)
203 // that don't reliably rename with ALTER TABLE RENAME — so we never rename FTS5.
204 {
205 local.lock();
206 defer local.unlock();
207 conn.exec("BEGIN", .{}) catch {};
208 conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {};
209 conn.exec("DROP TABLE IF EXISTS actors", .{}) catch {};
210 conn.exec("ALTER TABLE actors_stage RENAME TO actors", .{}) catch |err| {
211 log.err("swap failed (rename actors_stage): {}", .{err});
212 conn.exec("ROLLBACK", .{}) catch {};
213 return err;
214 };
215 conn.exec("COMMIT", .{}) catch |err| {
216 log.err("swap commit failed: {}", .{err});
217 return err;
218 };
219 }
220
221 // build FTS with its final name — no rename needed
222 const fts_create_t0 = std.time.milliTimestamp();
223 {
224 local.lock();
225 defer local.unlock();
226 conn.exec(
227 \\CREATE VIRTUAL TABLE actors_fts USING fts5(
228 \\ did UNINDEXED, handle, display_name,
229 \\ tokenize='unicode61 remove_diacritics 2'
230 \\)
231 , .{}) catch |err| {
232 log.err("failed to create FTS table: {}", .{err});
233 return err;
234 };
235 }
236 const fts_create_ms = std.time.milliTimestamp() - fts_create_t0;
237
238 const fts_pop_t0 = std.time.milliTimestamp();
239 {
240 local.lock();
241 defer local.unlock();
242 conn.exec(
243 \\INSERT INTO actors_fts (did, handle, display_name)
244 \\SELECT did, handle, display_name FROM actors WHERE handle != ''
245 , .{}) catch |err| {
246 log.err("FTS populate failed: {}", .{err});
247 return err;
248 };
249 }
250 const fts_pop_ms = std.time.milliTimestamp() - fts_pop_t0;
251
252 // post-swap: optimize, record completion + actor count, checkpoint
253 {
254 local.lock();
255 defer local.unlock();
256 conn.exec("PRAGMA optimize", .{}) catch {};
257
258 var ts_buf: [20]u8 = undefined;
259 const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0";
260 conn.exec(
261 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)",
262 .{ts_str},
263 ) catch {};
264 conn.exec(
265 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('sync_complete', '1')",
266 .{},
267 ) catch {};
268
269 // cache actor count so health endpoint doesn't need COUNT(*)
270 var count_buf: [20]u8 = undefined;
271 const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{actor_count}) catch "0";
272 conn.exec(
273 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)",
274 .{count_str},
275 ) catch {};
276
277 conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
278 }
279
280 const total_ms = std.time.milliTimestamp() - total_t0;
281
282 // reopen read connection before serving traffic
283 local.reopenReadConn() catch |err| {
284 log.err("failed to reopen read conn after bootstrap: {}", .{err});
285 };
286
287 local.setReady(true);
288 log.info("bootstrap complete — total_rows={d} load_ms={d} idx_did_ms={d} idx_handle_ms={d} fts_create_ms={d} fts_populate_ms={d} total_ms={d}", .{
289 actor_count, load_ms, idx_did_ms, idx_handle_ms, fts_create_ms, fts_pop_ms, total_ms,
290 });
291}
292
293/// Re-sync path: previous sync completed, update live table directly.
294/// Only processes actors that may have changed — index maintenance cost is negligible.
295fn fullSyncResync(turso: *TursoClient, local: *LocalDb, conn: zqlite.Conn) !void {
296 var actor_count: usize = 0;
297 var error_count: usize = 0;
298 var last_rowid: i64 = 0;
299 var had_turso_error = false;
300
301 while (true) {
302 var rowid_buf: [20]u8 = undefined;
303 const rowid_str = std.fmt.bufPrint(&rowid_buf, "{d}", .{last_rowid}) catch break;
304
305 const t0 = std.time.milliTimestamp();
306
307 var result = turso.query(
308 \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, rowid
309 \\FROM actors WHERE handle != '' AND rowid > ?
310 \\ORDER BY rowid LIMIT 2000
311 , &.{rowid_str}) catch |err| {
312 log.err("turso query failed at rowid {d}: {}", .{ last_rowid, err });
313 had_turso_error = true;
314 break;
315 };
316 defer result.deinit();
317
318 const t1 = std.time.milliTimestamp();
319
320 if (result.rows.len == 0) break;
321
322 {
323 local.lock();
324 defer local.unlock();
325 conn.exec("BEGIN", .{}) catch {};
326 for (result.rows) |row| {
327 insertActorOnly(conn, row) catch |err| {
328 log.err("insert actor failed: {}", .{err});
329 error_count += 1;
330 continue;
331 };
332 actor_count += 1;
333 }
334 conn.exec("COMMIT", .{}) catch {};
335 }
336
337 const t2 = std.time.milliTimestamp();
338
339 last_rowid = result.rows[result.rows.len - 1].int(8);
340
341 log.info("batch: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{
342 result.rows.len, last_rowid, actor_count,
343 t1 - t0, t2 - t1,
344 });
345 }
346
347 if (had_turso_error or error_count > 0) {
348 log.warn("full sync incomplete — {d} synced, {d} local write errors, turso_error={}", .{ actor_count, error_count, had_turso_error });
349 return;
350 }
351
352 // rebuild FTS from scratch (DROP + CREATE — never reuse a potentially
353 // broken FTS table from a previous failed rename)
354 {
355 log.info("building FTS index for {d} actors...", .{actor_count});
356 const fts_t0 = std.time.milliTimestamp();
357 local.lock();
358 defer local.unlock();
359 conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {};
360 conn.exec(
361 \\CREATE VIRTUAL TABLE actors_fts USING fts5(
362 \\ did UNINDEXED, handle, display_name,
363 \\ tokenize='unicode61 remove_diacritics 2'
364 \\)
365 , .{}) catch |err| {
366 log.err("failed to create FTS table: {}", .{err});
367 };
368 conn.exec(
369 \\INSERT INTO actors_fts (did, handle, display_name)
370 \\SELECT did, handle, display_name FROM actors WHERE handle != ''
371 , .{}) catch |err| {
372 log.err("FTS bulk insert failed: {}", .{err});
373 };
374 const fts_t1 = std.time.milliTimestamp();
375 log.info("FTS index built in {d}ms", .{fts_t1 - fts_t0});
376 }
377
378 // record sync time + mark complete + actor count
379 {
380 local.lock();
381 defer local.unlock();
382 var ts_buf: [20]u8 = undefined;
383 const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0";
384 conn.exec(
385 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)",
386 .{ts_str},
387 ) catch {};
388 conn.exec(
389 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('sync_complete', '1')",
390 .{},
391 ) catch {};
392
393 var count_buf: [20]u8 = undefined;
394 const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{actor_count}) catch "0";
395 conn.exec(
396 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)",
397 .{count_str},
398 ) catch {};
399
400 conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
401 }
402
403 local.setReady(true);
404 log.info("full sync complete — {d} actors", .{actor_count});
405}
406
407/// Incremental sync: fetch actors updated since last sync + tombstones.
408/// Uses keyset pagination on (updated_at, did) to drain all updates.
409/// Only advances watermark when all turso queries succeed and all local writes succeed.
410pub fn incrementalSync(turso: *TursoClient, local: *LocalDb) !void {
411 const conn = local.getConn() orelse return error.LocalNotOpen;
412
413 // get last sync time
414 local.lock();
415 const last_sync_ts = blk: {
416 const row = conn.row(
417 "SELECT value FROM sync_meta WHERE key = 'last_sync'",
418 .{},
419 ) catch {
420 local.unlock();
421 break :blk @as(i64, 0);
422 };
423 if (row) |r| {
424 defer r.deinit();
425 const val = r.text(0);
426 local.unlock();
427 break :blk if (val.len == 0) 0 else std.fmt.parseInt(i64, val, 10) catch 0;
428 }
429 local.unlock();
430 break :blk @as(i64, 0);
431 };
432
433 if (last_sync_ts == 0) {
434 log.info("no last_sync found, doing full sync", .{});
435 return fullSync(turso, local, false);
436 }
437
438 // if last sync is older than tombstone retention, local data is too stale —
439 // tombstones may have been pruned, so incremental sync can't catch deletions.
440 // wipe and rebuild from scratch.
441 const now = std.time.timestamp();
442 if (now - last_sync_ts > TOMBSTONE_RETENTION_S) {
443 log.warn("last sync {d}s ago (>{d}s tombstone retention), wiping stale replica", .{
444 now - last_sync_ts, TOMBSTONE_RETENTION_S,
445 });
446 return fullSync(turso, local, true);
447 }
448
449 local.setReady(true);
450
451 // buffer: subtract 300s to catch stragglers (matches leaflet-search)
452 const since_ts = last_sync_ts - 300;
453 var since_buf: [20]u8 = undefined;
454 const since_str = std.fmt.bufPrint(&since_buf, "{d}", .{since_ts}) catch return;
455
456 // fetch updated searchable actors — keyset pagination to drain all
457 var updated: usize = 0;
458 var error_count: usize = 0;
459 var had_turso_error = false;
460 {
461 var cursor_ts_buf: [20]u8 = undefined;
462 var cursor_did_buf: [128]u8 = undefined;
463 @memcpy(cursor_ts_buf[0..since_str.len], since_str);
464 var cursor_ts: []const u8 = cursor_ts_buf[0..since_str.len];
465 var cursor_did_len: usize = 0;
466
467 while (true) {
468 const cursor_did: []const u8 = if (cursor_did_len > 0) cursor_did_buf[0..cursor_did_len] else "";
469
470 const args: []const []const u8 = if (cursor_did_len > 0)
471 &.{ cursor_ts, cursor_ts, cursor_did }
472 else
473 &.{cursor_ts};
474
475 const sql = if (cursor_did_len > 0)
476 \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, updated_at
477 \\FROM actors WHERE handle != '' AND (updated_at > ?1 OR (updated_at = ?2 AND did > ?3))
478 \\ORDER BY updated_at, did LIMIT 2000
479 else
480 \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, updated_at
481 \\FROM actors WHERE handle != '' AND updated_at > ?1
482 \\ORDER BY updated_at, did LIMIT 2000
483 ;
484
485 var result = turso.query(sql, args) catch |err| {
486 log.err("incremental query failed: {}", .{err});
487 had_turso_error = true;
488 break;
489 };
490 defer result.deinit();
491
492 if (result.rows.len == 0) break;
493
494 {
495 local.lock();
496 defer local.unlock();
497 conn.exec("BEGIN", .{}) catch {};
498 for (result.rows) |row| {
499 upsertActorLocal(conn, row) catch {
500 error_count += 1;
501 continue;
502 };
503 updated += 1;
504 }
505 conn.exec("COMMIT", .{}) catch {};
506 }
507
508 // advance cursor to last row's (updated_at, did)
509 const last_row = result.rows[result.rows.len - 1];
510 const last_ua_int = last_row.int(8); // updated_at is integer
511 const last_did = last_row.text(0);
512
513 // serialize updated_at integer to string for next query
514 const ua_str = std.fmt.bufPrint(&cursor_ts_buf, "{d}", .{last_ua_int}) catch break;
515 cursor_ts = ua_str;
516
517 if (last_did.len > 0 and last_did.len <= cursor_did_buf.len) {
518 @memcpy(cursor_did_buf[0..last_did.len], last_did);
519 cursor_did_len = last_did.len;
520 }
521
522 // safety: stop if batch was less than full (we've drained)
523 if (result.rows.len < BATCH_SIZE) break;
524 }
525 }
526
527 // fetch actors that became unsearchable (handle cleared)
528 var cleared: usize = 0;
529 unsearchable: {
530 var result = turso.query(
531 "SELECT did FROM actors WHERE updated_at > ? AND handle = ''",
532 &.{since_str},
533 ) catch |err| {
534 log.err("unsearchable query failed: {}", .{err});
535 had_turso_error = true;
536 break :unsearchable;
537 };
538 defer result.deinit();
539
540 if (result.rows.len > 0) {
541 local.lock();
542 defer local.unlock();
543 for (result.rows) |row| {
544 const did = row.text(0);
545 conn.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {
546 error_count += 1;
547 continue;
548 };
549 conn.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {};
550 cleared += 1;
551 }
552 }
553 }
554
555 // fetch tombstones
556 var deleted: usize = 0;
557 tombstone: {
558 var tomb_result = turso.query(
559 "SELECT did FROM tombstones WHERE deleted_at > ?",
560 &.{since_str},
561 ) catch |err| {
562 log.err("tombstone query failed: {}", .{err});
563 had_turso_error = true;
564 break :tombstone;
565 };
566 defer tomb_result.deinit();
567
568 if (tomb_result.rows.len > 0) {
569 local.lock();
570 defer local.unlock();
571 for (tomb_result.rows) |row| {
572 const did = row.text(0);
573 conn.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {
574 error_count += 1;
575 continue;
576 };
577 conn.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {};
578 deleted += 1;
579 }
580 }
581 }
582
583 // only advance watermark when everything succeeded
584 const had_error = had_turso_error or error_count > 0;
585 if (had_error) {
586 log.warn("incremental sync had errors — {d} updated, {d} deleted, {d} cleared, {d} write errors, turso_error={}", .{
587 updated, deleted, cleared, error_count, had_turso_error,
588 });
589 } else {
590 local.lock();
591 defer local.unlock();
592 var ts_buf: [20]u8 = undefined;
593 const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0";
594 conn.exec(
595 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)",
596 .{ts_str},
597 ) catch {};
598
599 // refresh cached actor count when data changed
600 if (updated > 0 or deleted > 0 or cleared > 0) {
601 updateActorCount(conn);
602 log.info("incremental sync — {d} updated, {d} deleted, {d} cleared", .{ updated, deleted, cleared });
603 }
604
605 conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
606 }
607}
608
609/// Background sync loop: full sync on boot, then incremental every 5 min
610pub fn syncLoop(allocator: Allocator, local: *LocalDb) void {
611 var turso = TursoClient.init(allocator) catch |err| {
612 log.err("turso client init failed: {}, sync disabled", .{err});
613 return;
614 };
615 defer turso.deinit();
616
617 // initial sync (full or incremental depending on state)
618 incrementalSync(&turso, local) catch |err| {
619 log.err("initial sync failed: {}", .{err});
620 };
621
622 // periodic incremental sync
623 while (true) {
624 std.Thread.sleep(SYNC_INTERVAL_S * std.time.ns_per_s);
625
626 incrementalSync(&turso, local) catch |err| {
627 log.err("periodic sync failed: {}", .{err});
628 };
629 }
630}
631
632/// Insert actor row into staging table (no indexes, no FTS — used during bootstrap)
633fn insertActorStage(conn: zqlite.Conn, row: anytype) !void {
634 conn.exec(
635 \\INSERT INTO actors_stage
636 \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated)
637 \\VALUES (?, ?, ?, ?, ?, ?, ?, ?)
638 , .{
639 row.text(0), // did
640 row.text(1), // handle
641 row.text(2), // display_name
642 row.text(3), // avatar_url
643 row.int(4), // hidden
644 row.text(5), // labels
645 row.text(6), // created_at
646 row.text(7), // associated
647 }) catch |err| {
648 return err;
649 };
650}
651
652/// Insert actor row without FTS update (used during re-sync on live table)
653fn insertActorOnly(conn: zqlite.Conn, row: anytype) !void {
654 conn.exec(
655 \\INSERT OR REPLACE INTO actors
656 \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated)
657 \\VALUES (?, ?, ?, ?, ?, ?, ?, ?)
658 , .{
659 row.text(0), // did
660 row.text(1), // handle
661 row.text(2), // display_name
662 row.text(3), // avatar_url
663 row.int(4), // hidden
664 row.text(5), // labels
665 row.text(6), // created_at
666 row.text(7), // associated
667 }) catch |err| {
668 return err;
669 };
670}
671
672/// Upsert a turso row into local SQLite + FTS
673fn upsertActorLocal(conn: zqlite.Conn, row: anytype) !void {
674 const did = row.text(0);
675
676 conn.exec(
677 \\INSERT OR REPLACE INTO actors
678 \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated)
679 \\VALUES (?, ?, ?, ?, ?, ?, ?, ?)
680 , .{
681 did,
682 row.text(1), // handle
683 row.text(2), // display_name
684 row.text(3), // avatar_url
685 row.int(4), // hidden
686 row.text(5), // labels
687 row.text(6), // created_at
688 row.text(7), // associated
689 }) catch |err| {
690 return err;
691 };
692
693 // manual FTS update (standalone FTS, not content-synced)
694 conn.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {};
695 conn.exec(
696 "INSERT INTO actors_fts (did, handle, display_name) VALUES (?, ?, ?)",
697 .{ did, row.text(1), row.text(2) },
698 ) catch {};
699}
700
701/// Update cached actor count in sync_meta (avoids COUNT(*) on health checks)
702fn updateActorCount(conn: zqlite.Conn) void {
703 const row = conn.row("SELECT COUNT(*) FROM actors WHERE handle != ''", .{}) catch return;
704 if (row) |r| {
705 defer r.deinit();
706 var buf: [20]u8 = undefined;
707 const count_str = std.fmt.bufPrint(&buf, "{d}", .{r.int(0)}) catch return;
708 conn.exec(
709 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)",
710 .{count_str},
711 ) catch {};
712 }
713}