search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: sync publications incrementally (close 852-row replica gap)

publications were silently omitted from incrementalSync — they only
synced on fullSync (first boot), so every publication added after
the initial sync stayed invisible to the local replica forever.
turso had 4,476 publications, replica had 3,624.

root cause: publications table had no indexed_at column, so there
was no incremental cursor.

- schema: add publications.indexed_at + one-time backfill to now()
- indexer.insertPublication: stamp indexed_at via ON CONFLICT DO UPDATE
(was INSERT OR REPLACE with no timestamp)
- sync.incrementalSync: add publications fetch mirroring documents,
with its own labeled block so a failure falls through to tombstones
- sync_span gains new_pubs attribute alongside new_docs/deleted
- local replica schema + migration gain publications.indexed_at
- fullSync publications SELECT now includes indexed_at column

the schema backfill sets every existing publication's indexed_at to
the deploy timestamp, which means the first post-deploy incremental
sync pulls all 4,476 rows down in one shot (~1 round trip).

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

+59 -7
+3 -1
backend/src/db/LocalDb.zig
··· 181 181 \\ description TEXT, 182 182 \\ base_path TEXT, 183 183 \\ platform TEXT DEFAULT 'leaflet', 184 - \\ source_collection TEXT 184 + \\ source_collection TEXT, 185 + \\ indexed_at TEXT 185 186 \\) 186 187 , .{}) catch |err| { 187 188 std.debug.print("local db: failed to create publications table: {}\n", .{err}); ··· 255 256 c.exec("ALTER TABLE documents ADD COLUMN embedded_at TEXT", .{}) catch {}; 256 257 c.exec("ALTER TABLE documents ADD COLUMN cover_image TEXT DEFAULT ''", .{}) catch {}; 257 258 c.exec("ALTER TABLE documents ADD COLUMN is_bridgyfed INTEGER DEFAULT 0", .{}) catch {}; 259 + c.exec("ALTER TABLE publications ADD COLUMN indexed_at TEXT", .{}) catch {}; 258 260 } 259 261 260 262 /// Row adapter matching result.Row interface (column-indexed access)
+11
backend/src/db/schema.zig
··· 251 251 // is_bridgyfed: marks documents from bridgy fed (brid.gy PDS) 252 252 // 0 = normal, 1 = bridgy fed (excluded from search by default) 253 253 client.exec("ALTER TABLE documents ADD COLUMN is_bridgyfed INTEGER DEFAULT 0", &.{}) catch {}; 254 + 255 + // indexed_at for publications — mirrors documents.indexed_at so incremental 256 + // sync can pull new/updated publications. without this, publications only 257 + // sync on fullSync (first boot) and drift forever afterward. 258 + client.exec("ALTER TABLE publications ADD COLUMN indexed_at TEXT", &.{}) catch {}; 259 + // one-time backfill so existing rows become visible to the incremental sync. 260 + // subsequent inserts/updates stamp indexed_at in indexer.zig. 261 + client.exec( 262 + "UPDATE publications SET indexed_at = strftime('%Y-%m-%dT%H:%M:%S', 'now') WHERE indexed_at IS NULL", 263 + &.{}, 264 + ) catch {}; 254 265 }
+35 -5
backend/src/db/sync.zig
··· 106 106 var pub_count: usize = 0; 107 107 { 108 108 var pub_result = turso.query( 109 - "SELECT uri, did, rkey, name, description, base_path, platform FROM publications", 109 + "SELECT uri, did, rkey, name, description, base_path, platform, indexed_at FROM publications", 110 110 &.{}, 111 111 ) catch |err| { 112 112 std.debug.print("sync: turso publications query failed: {}\n", .{err}); ··· 314 314 ) catch {}; 315 315 } 316 316 317 + // fetch new/updated publications (same since_str as documents). 318 + // publications were silently missing from incremental sync before 319 + // `publications.indexed_at` was added — they only synced on fullSync, 320 + // so the local replica drifted by hundreds of rows over time. 321 + var new_pubs: usize = 0; 322 + pubs: { 323 + var pub_result = turso.query( 324 + \\SELECT uri, did, rkey, name, description, base_path, platform, indexed_at 325 + \\FROM publications 326 + \\WHERE indexed_at >= ? 327 + \\ORDER BY indexed_at 328 + , &.{since_str}) catch |err| { 329 + std.debug.print("sync: incremental publications query failed: {}\n", .{err}); 330 + sync_span.recordError(err); 331 + // fall through to tombstones — don't abort entire sync 332 + break :pubs; 333 + }; 334 + defer pub_result.deinit(); 335 + 336 + local.lock(); 337 + defer local.unlock(); 338 + 339 + for (pub_result.rows) |row| { 340 + insertPublicationLocal(conn, row) catch {}; 341 + new_pubs += 1; 342 + } 343 + } 344 + 317 345 // sync deletions via tombstones (deleted_at is unix timestamp integer) 318 346 var deleted: usize = 0; 319 347 tombstone: { ··· 355 383 local.unlock(); 356 384 357 385 sync_span.setAttribute("new_docs", @as(i64, @intCast(new_docs))); 386 + sync_span.setAttribute("new_pubs", @as(i64, @intCast(new_pubs))); 358 387 sync_span.setAttribute("deleted", @as(i64, @intCast(deleted))); 359 388 360 - if (new_docs > 0 or deleted > 0) { 361 - std.debug.print("sync: incremental sync — {d} new docs, {d} tombstone deletions\n", .{ new_docs, deleted }); 389 + if (new_docs > 0 or new_pubs > 0 or deleted > 0) { 390 + std.debug.print("sync: incremental sync — {d} new docs, {d} new pubs, {d} tombstone deletions\n", .{ new_docs, new_pubs, deleted }); 362 391 } 363 392 } 364 393 ··· 403 432 // insert into main table (no created_at - Turso publications table doesn't have it) 404 433 conn.exec( 405 434 \\INSERT OR REPLACE INTO publications 406 - \\(uri, did, rkey, name, description, base_path, platform) 407 - \\VALUES (?, ?, ?, ?, ?, ?, ?) 435 + \\(uri, did, rkey, name, description, base_path, platform, indexed_at) 436 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?) 408 437 , .{ 409 438 row.text(0), // uri 410 439 row.text(1), // did ··· 413 442 row.text(4), // description 414 443 row.text(5), // base_path 415 444 row.text(6), // platform 445 + row.text(7), // indexed_at 416 446 }) catch |err| { 417 447 return err; 418 448 };
+10 -1
backend/src/ingest/indexer.zig
··· 261 261 } else |_| {} 262 262 263 263 try c.exec( 264 - "INSERT OR REPLACE INTO publications (uri, did, rkey, name, description, base_path) VALUES (?, ?, ?, ?, ?, ?)", 264 + \\INSERT INTO publications (uri, did, rkey, name, description, base_path, indexed_at) 265 + \\VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%S', 'now')) 266 + \\ON CONFLICT(uri) DO UPDATE SET 267 + \\ did = excluded.did, 268 + \\ rkey = excluded.rkey, 269 + \\ name = excluded.name, 270 + \\ description = excluded.description, 271 + \\ base_path = excluded.base_path, 272 + \\ indexed_at = strftime('%Y-%m-%dT%H:%M:%S', 'now') 273 + , 265 274 &.{ uri, did, rkey, name, description orelse "", base_path orelse "" }, 266 275 ); 267 276