search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: make sync thread failures visible (watchdog + logfire spans)

the local replica silently desynced for 6 days; the dashboard showed a
stale count of 13,990 while turso had 14,625. sync.zig only logged via
std.debug.print, and zig 0.16's http.Client has no timeout, so a wedged
turso call killed sync forever with no alert.

- sync.incrementalSync now emits a logfire span per run with
new_docs/deleted/since attributes and recordError on failures
- db.zig syncLoop updates a heartbeat atomic before each attempt
- new watchdog thread exits the process with status 1 (→ fly restart)
if the heartbeat goes stale for >3x the sync interval
- also instrument the silent drop paths in indexer.zig
(content_hash_dupe, test_domain, bridgy_fed) as tap.dropped
spans so the drop-reason breakdown is queryable in logfire

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

zzstoatzz 99b86c15 81c2ac9f

+75 -8
+55 -6
backend/src/db.zig
··· 1 1 const std = @import("std"); 2 2 const Io = std.Io; 3 + const logfire = @import("logfire"); 3 4 4 5 const schema = @import("db/schema.zig"); 5 6 const result = @import("db/result.zig"); ··· 17 18 var client: ?Client = null; 18 19 var sync_client: ?Client = null; 19 20 var local_db: ?LocalDb = null; 21 + 22 + // sync liveness heartbeat (unix seconds). set at the top of each sync loop 23 + // iteration; watchdog thread aborts the process if it goes stale so fly 24 + // restarts us. zig 0.16 http.Client has no timeout, so a wedged HTTP call 25 + // would otherwise silently kill sync forever. 26 + var sync_heartbeat_s = std.atomic.Value(i64).init(0); 27 + var sync_interval_secs_atomic = std.atomic.Value(u64).init(300); 20 28 21 29 /// Initialize Turso client only (fast, call synchronously at startup). 22 30 /// Schema migrations run separately via initSchema() in the background thread ··· 96 104 }; 97 105 thread.detach(); 98 106 std.debug.print("sync: background thread started\n", .{}); 107 + 108 + const watchdog = std.Thread.spawn(.{}, syncWatchdog, .{io}) catch |err| { 109 + std.debug.print("sync: failed to start watchdog: {}\n", .{err}); 110 + return; 111 + }; 112 + watchdog.detach(); 113 + std.debug.print("sync: watchdog thread started\n", .{}); 114 + } 115 + 116 + fn nowSeconds(io: Io) i64 { 117 + return @intCast(@divFloor(Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s)); 99 118 } 100 119 101 120 fn syncLoop(turso: *Client, local: *LocalDb, io: Io) void { 121 + // get sync interval from env (default 5 minutes) 122 + const interval_secs: u64 = blk: { 123 + const env_val = if (std.c.getenv("SYNC_INTERVAL_SECS")) |p| std.mem.span(p) else "300"; 124 + break :blk std.fmt.parseInt(u64, env_val, 10) catch 300; 125 + }; 126 + sync_interval_secs_atomic.store(interval_secs, .release); 127 + 102 128 // incremental sync on startup — gets new docs + cleans tombstoned deletions 103 129 // (falls back to full sync automatically if no last_sync exists, i.e., 
first boot) 130 + sync_heartbeat_s.store(nowSeconds(io), .release); 104 131 sync.incrementalSync(turso, local) catch |err| { 105 132 std.debug.print("sync: initial sync failed: {}\n", .{err}); 106 133 }; 107 134 108 - // get sync interval from env (default 5 minutes) 109 - const interval_secs: u64 = blk: { 110 - const env_val = if (std.c.getenv("SYNC_INTERVAL_SECS")) |p| std.mem.span(p) else "300"; 111 - break :blk std.fmt.parseInt(u64, env_val, 10) catch 300; 112 - }; 113 - 114 135 std.debug.print("sync: incremental sync every {d} seconds\n", .{interval_secs}); 115 136 116 137 // periodic incremental sync 117 138 while (true) { 118 139 io.sleep(Io.Duration.fromSeconds(@intCast(interval_secs)), .awake) catch {}; 140 + // update heartbeat before each attempt; a hung HTTP call prevents the 141 + // next iteration's update, which the watchdog will catch. 142 + sync_heartbeat_s.store(nowSeconds(io), .release); 119 143 sync.incrementalSync(turso, local) catch |err| { 120 144 std.debug.print("sync: incremental sync failed: {}\n", .{err}); 121 145 }; 122 146 } 123 147 } 148 + 149 + /// Watchdog: aborts the process if the sync loop hasn't updated its heartbeat 150 + /// within ~3x the configured sync interval. fly auto-restarts the machine. 
/// Staleness threshold for the sync heartbeat: three sync intervals, in seconds.
/// The multiply saturates and the narrowing cast is checked, so an oversized
/// interval clamps to the largest i64 instead of overflowing.
fn maxHeartbeatStalenessSeconds(interval_secs: u64) i64 {
    return std.math.cast(i64, interval_secs *| 3) orelse std.math.maxInt(i64);
}

/// Watchdog: exits the process with status 1 once the sync heartbeat is older
/// than three sync intervals. Never returns; intended to run on its own thread.
/// The exit is meant to trigger a restart by the host (fly, per the commit notes).
fn syncWatchdog(io: Io) void {
    // Grace period so syncLoop can publish its interval and first heartbeat.
    io.sleep(Io.Duration.fromSeconds(30), .awake) catch {};

    while (true) {
        // Poll once a minute. A failed sleep is ignored; the check just runs early.
        io.sleep(Io.Duration.fromSeconds(60), .awake) catch {};

        const interval = sync_interval_secs_atomic.load(.acquire);
        const heartbeat = sync_heartbeat_s.load(.acquire);
        if (heartbeat == 0) continue; // no heartbeat has been stamped yet

        // Saturating subtraction keeps the difference well-defined for any pair of i64 timestamps.
        const staleness_s: i64 = nowSeconds(io) -| heartbeat;
        const max_staleness_s: i64 = maxHeartbeatStalenessSeconds(interval);

        if (staleness_s > max_staleness_s) {
            logfire.err("sync watchdog: heartbeat stale for {d}s (max {d}s) — aborting for restart", .{ staleness_s, max_staleness_s });
            std.debug.print("sync watchdog: heartbeat stale for {d}s (max {d}s) — aborting for restart\n", .{ staleness_s, max_staleness_s });
            std.process.exit(1);
        }
    }
}
+14 -1
backend/src/db/sync.zig
··· 4 4 const std = @import("std"); 5 5 const Io = std.Io; 6 6 const zqlite = @import("zqlite"); 7 + const logfire = @import("logfire"); 7 8 const Allocator = std.mem.Allocator; 8 9 const Client = @import("Client.zig"); 9 10 const LocalDb = @import("LocalDb.zig"); ··· 214 215 215 216 /// Incremental sync: fetch documents created since last sync 216 217 pub fn incrementalSync(turso: *Client, local: *LocalDb) !void { 217 - const conn = local.getConn() orelse return error.LocalNotOpen; 218 + const sync_span = logfire.span("sync.incremental", .{}); 219 + defer sync_span.end(); 220 + 221 + const conn = local.getConn() orelse { 222 + sync_span.recordError(error.LocalNotOpen); 223 + return error.LocalNotOpen; 224 + }; 218 225 219 226 // get last sync time 220 227 local.lock(); ··· 269 276 }; 270 277 271 278 std.debug.print("sync: incremental sync since {s}\n", .{since_str}); 279 + sync_span.setAttribute("since", since_str); 272 280 273 281 // fetch new documents (use indexed_at, not created_at, because resynced 274 282 // documents can have old publication dates but recent insertion times) ··· 283 291 \\ORDER BY indexed_at 284 292 , &.{since_str}) catch |err| { 285 293 std.debug.print("sync: incremental query failed: {}\n", .{err}); 294 + sync_span.recordError(err); 286 295 return; 287 296 }; 288 297 defer result.deinit(); ··· 316 325 &.{since_ts_str}, 317 326 ) catch |err| { 318 327 std.debug.print("sync: tombstone query failed: {}\n", .{err}); 328 + sync_span.recordError(err); 319 329 break :tombstone; 320 330 }; 321 331 defer tomb_result.deinit(); ··· 343 353 local.lock(); 344 354 conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {}; 345 355 local.unlock(); 356 + 357 + sync_span.setAttribute("new_docs", @as(i64, @intCast(new_docs))); 358 + sync_span.setAttribute("deleted", @as(i64, @intCast(deleted))); 346 359 347 360 if (new_docs > 0 or deleted > 0) { 348 361 std.debug.print("sync: incremental sync — {d} new docs, {d} tombstone deletions\n", .{ new_docs, deleted 
});
+6 -1
backend/src/ingest/indexer.zig
··· 56 56 const existing_uri = row.text(0); 57 57 if (!std.mem.eql(u8, existing_uri, uri)) { 58 58 logfire.debug("indexer: skipping dupe for {s} (existing: {s})", .{ uri, existing_uri }); 59 + logfire.span("tap.dropped", .{ .reason = "content_hash_dupe", .uri = uri, .existing_uri = existing_uri }).end(); 59 60 return; 60 61 } 61 62 } ··· 151 152 base_path = base_path[0 .. base_path.len - 1]; 152 153 153 154 // skip .test domains (dev/staging data) 154 - if (std.mem.endsWith(u8, base_path, ".test")) return; 155 + if (std.mem.endsWith(u8, base_path, ".test")) { 156 + logfire.span("tap.dropped", .{ .reason = "test_domain", .uri = uri, .base_path = base_path }).end(); 157 + return; 158 + } 155 159 156 160 // detect platform from basePath if platform is unknown/other 157 161 // this handles site.standard.* documents where collection doesn't indicate platform ··· 185 189 // drop bridgy fed content entirely — low quality, pollutes the index 186 190 if (std.mem.eql(u8, is_bridgyfed, "1")) { 187 191 logfire.debug("indexer: dropping bridgy fed content {s}", .{uri}); 192 + logfire.span("tap.dropped", .{ .reason = "bridgy_fed", .uri = uri, .pub_uri = pub_uri }).end(); 188 193 return; 189 194 } 190 195