atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

bump pg.zig to fix pool multi-waiter panic, make pool size configurable

pg.zig pool used Io.Event with reset() for connection-available signaling.
Io.Event.reset() assumes no pending waiters — violated when 16 frame
workers contend for 5 connections. Updated pg.zig replaces Event with a
monotonic futex counter (safe for any number of concurrent waiters).

Also:
- make DB pool size configurable via DB_POOL_SIZE env (default 20)
- previous hardcoded 5 guaranteed constant contention with 16 workers

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz e5ed0d1e f996812d

+15 -14
+2 -2
build.zig.zon
··· 13 13 .hash = "websocket-0.1.0-ZPISdSmqAwCbwcFtrAQC_q9cegdw-iHyrjCftgfMz-Nf", 14 14 }, 15 15 .pg = .{ 16 - .url = "git+https://github.com/zzstoatzz/pg.zig?ref=dev#9ef9a0f407c67ad32958eec3370aa561d53c192b", 17 - .hash = "pg-0.0.0-Wp_7ga-BBgBHHXZdYhK8gv2IJXXqUKeIdkTuluGSnipW", 16 + .url = "git+https://github.com/zzstoatzz/pg.zig?ref=dev#5ce2355b1d851075523709c7d3068dcdb0224322", 17 + .hash = "pg-0.0.0-Wp_7geqBBgCL9JCOinXO9PKSlJ3kaWSsc6QCTR3t4C4Z", 18 18 }, 19 19 .rocksdb = .{ 20 20 .url = "https://github.com/zzstoatzz/rocksdb-zig/archive/cdef67bb2141df4eb4906d367d60285d1527ab3f.tar.gz",
+11 -11
src/event_log.zig
··· 140 140 return self.outbuf.capacity; 141 141 } 142 142 143 - pub fn init(allocator: Allocator, dir_path: []const u8, database_url: []const u8, io: Io) !DiskPersist { 143 + pub fn init(allocator: Allocator, dir_path: []const u8, database_url: []const u8, db_pool_size: u16, io: Io) !DiskPersist { 144 144 // ensure directory exists 145 145 try Io.Dir.cwd().createDirPath(io, dir_path); 146 146 ··· 149 149 150 150 // connect to Postgres 151 151 const uri = std.Uri.parse(database_url) catch return error.InvalidDatabaseUrl; 152 - const pool = try pg.Pool.initUri(allocator, io, uri, .{ .size = 5 }); 152 + const pool = try pg.Pool.initUri(allocator, io, uri, .{ .size = db_pool_size }); 153 153 errdefer pool.deinit(); 154 154 155 155 // create tables (matching indigo's Go relay schema) ··· 1216 1216 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 1217 1217 defer std.testing.allocator.free(dir_path); 1218 1218 1219 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, std.testing.io); 1219 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, 5, std.testing.io); 1220 1220 defer dp.deinit(); 1221 1221 1222 1222 // persist some events (sync flush, no background thread) ··· 1259 1259 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 1260 1260 defer std.testing.allocator.free(dir_path); 1261 1261 1262 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, std.testing.io); 1262 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, 5, std.testing.io); 1263 1263 defer dp.deinit(); 1264 1264 1265 1265 _ = try dp.persist(.commit, 1, "a"); ··· 1295 1295 1296 1296 // write some events 1297 1297 { 1298 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, std.testing.io); 1298 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, 5, std.testing.io); 1299 1299 defer dp.deinit(); 1300 1300 _ = try dp.persist(.commit, 1, "x"); 1301 1301 _ = try dp.persist(.commit, 2, "y"); ··· 1307 1307 1308 1308 // reinit — should recover seq 1309 1309 { 1310 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, std.testing.io); 1310 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, 5, std.testing.io); 1311 1311 defer dp.deinit(); 1312 1312 try std.testing.expectEqual(@as(u64, 3), dp.lastSeq().?); 1313 1313 const seq4 = try dp.persist(.commit, 1, "w"); ··· 1324 1324 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 1325 1325 defer std.testing.allocator.free(dir_path); 1326 1326 1327 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, std.testing.io); 1327 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, 5, std.testing.io); 1328 1328 defer dp.deinit(); 1329 1329 1330 1330 _ = try dp.persist(.commit, 42, "secret-data"); ··· 1359 1359 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 1360 1360 defer std.testing.allocator.free(dir_path); 1361 1361 1362 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, std.testing.io); 1362 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, 5, std.testing.io); 1363 1363 defer dp.deinit(); 1364 1364 1365 1365 // first call creates the account ··· 1387 1387 1388 1388 var uid1: u64 = undefined; 1389 1389 { 1390 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, std.testing.io); 1390 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, 5, std.testing.io); 1391 1391 defer dp.deinit(); 1392 1392 uid1 = try dp.uidForDid("did:plc:carol"); 1393 1393 } 1394 1394 1395 1395 // reinit — UID should be the same from database 1396 1396 { 1397 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, std.testing.io); 1397 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, 5, std.testing.io); 1398 1398 defer dp.deinit(); 1399 1399 const uid1_again = try dp.uidForDid("did:plc:carol"); 1400 1400 try std.testing.expectEqual(uid1, uid1_again); ··· 1410 1410 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 1411 1411 defer std.testing.allocator.free(dir_path); 1412 1412 1413 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, std.testing.io); 1413 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url, 5, std.testing.io); 1414 1414 defer dp.deinit(); 1415 1415 1416 1416 const alice_uid = try dp.uidForDid("did:plc:alice");
+2 -1
src/main.zig
··· 182 182 const max_events_gb = parseEnvInt(u64, "RELAY_MAX_EVENTS_GB", 100); 183 183 const frame_workers = parseEnvInt(u16, "FRAME_WORKERS", 16); 184 184 const frame_queue_capacity = parseEnvInt(u16, "FRAME_QUEUE_CAPACITY", 4096); 185 + const db_pool_size = parseEnvInt(u16, "DB_POOL_SIZE", 20); 185 186 186 187 // install signal handlers (including SIGPIPE ignore) 187 188 installSignalHandlers(); ··· 196 197 197 198 // init disk persistence (indigo-compatible diskpersist format + Postgres index) 198 199 const database_url = getenv("DATABASE_URL") orelse "postgres://relay:relay@localhost:5432/relay"; 199 - var dp = event_log_mod.DiskPersist.init(allocator, data_dir, database_url, io) catch |err| { 200 + var dp = event_log_mod.DiskPersist.init(allocator, data_dir, database_url, db_pool_size, io) catch |err| { 200 201 log.err("failed to init disk persist at {s}: {s}", .{ data_dir, @errorName(err) }); 201 202 return err; 202 203 };