semantic bufo search find-bufo.com
bufo
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

migrate to zig 0.16 (zat v0.3.0-alpha.7)

- Io.Threaded with explicit io passing via init() per module
- Io.Mutex, Io.Timestamp, Io.net, smp_allocator
- thread-per-connection handling in the stats HTTP server; ArrayList fields/locals now initialized with `.empty` instead of `.{}`
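- Dockerfile updated to zig 0.16.0-dev.3059+42e33db9d
- also included beyond the migration itself: global cooldown removed, `.webp` images recognized, stats saved via temp file + rename with hourly snapshot copies, stats buffers raised from 64 KiB to 256 KiB, COOLDOWN_SCALE_FACTOR raised from 100 to 200 with no cooldown for bufos under 1% of matches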

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 7aeb4ff5 2d07d3b1

+240 -193
+3 -3
bot/Dockerfile
··· 7 7 xz-utils \ 8 8 && rm -rf /var/lib/apt/lists/* 9 9 10 - # install zig 0.15.2 11 - RUN curl -L https://ziglang.org/download/0.15.2/zig-x86_64-linux-0.15.2.tar.xz | tar -xJ -C /usr/local \ 12 - && ln -s /usr/local/zig-x86_64-linux-0.15.2/zig /usr/local/bin/zig 10 + # install zig 0.16.0-dev.3059+42e33db9d 11 + RUN curl -L https://ziglang.org/builds/zig-x86_64-linux-0.16.0-dev.3059+42e33db9d.tar.xz | tar -xJ -C /usr/local \ 12 + && ln -s /usr/local/zig-x86_64-linux-0.16.0-dev.3059+42e33db9d/zig /usr/local/bin/zig 13 13 14 14 WORKDIR /app 15 15 COPY build.zig build.zig.zon ./
+3 -3
bot/build.zig.zon
··· 2 2 .name = .bufo_bot, 3 3 .version = "0.0.1", 4 4 .fingerprint = 0xe143490f82fa96db, 5 - .minimum_zig_version = "0.15.0", 5 + .minimum_zig_version = "0.16.0", 6 6 .dependencies = .{ 7 7 .zat = .{ 8 - .url = "https://tangled.sh/zat.dev/zat/archive/v0.2.15", 9 - .hash = "zat-0.2.15-5PuC7tjwBAAR5tL2Nc5LfSUaUORA6vONr1IdgaU4vAvo", 8 + .url = "https://tangled.org/zat.dev/zat/archive/v0.3.0-alpha.7.tar.gz", 9 + .hash = "zat-0.3.0-alpha.7-5PuC7uNjBQDv28db31DEKkFn1tU5I4f1GfJs-RrG8_pS", 10 10 }, 11 11 }, 12 12 .paths = .{
+19 -8
bot/src/bsky.zig
··· 5 5 const Allocator = mem.Allocator; 6 6 const Io = std.Io; 7 7 8 + // module state — initialized via init(), not from a global 9 + var io: Io = undefined; 10 + 11 + pub fn init(app_io: Io) void { 12 + io = app_io; 13 + } 14 + 15 + fn timestamp() i64 { 16 + return @intCast(@divFloor(Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s)); 17 + } 18 + 8 19 pub const BskyClient = struct { 9 20 allocator: Allocator, 10 21 handle: []const u8, ··· 13 24 did: ?[]const u8 = null, 14 25 pds_host: ?[]const u8 = null, 15 26 16 - pub fn init(allocator: Allocator, handle: []const u8, app_password: []const u8) BskyClient { 27 + pub fn initClient(allocator: Allocator, handle: []const u8, app_password: []const u8) BskyClient { 17 28 return .{ 18 29 .allocator = allocator, 19 30 .handle = handle, ··· 28 39 } 29 40 30 41 fn httpClient(self: *BskyClient) http.Client { 31 - return .{ .allocator = self.allocator }; 42 + return .{ .allocator = self.allocator, .io = io }; 32 43 } 33 44 34 45 pub fn login(self: *BskyClient) !void { ··· 37 48 var client = self.httpClient(); 38 49 defer client.deinit(); 39 50 40 - var body_buf: std.ArrayList(u8) = .{}; 51 + var body_buf: std.ArrayList(u8) = .empty; 41 52 defer body_buf.deinit(self.allocator); 42 53 try body_buf.print(self.allocator, "{{\"identifier\":\"{s}\",\"password\":\"{s}\"}}", .{ self.handle, self.app_password }); 43 54 ··· 231 242 var client = self.httpClient(); 232 243 defer client.deinit(); 233 244 234 - var body_buf: std.ArrayList(u8) = .{}; 245 + var body_buf: std.ArrayList(u8) = .empty; 235 246 defer body_buf.deinit(self.allocator); 236 247 237 248 var ts_buf: [30]u8 = undefined; ··· 499 510 return error.VideoProcessingFailed; 500 511 } 501 512 502 - std.Thread.sleep(1 * std.time.ns_per_s); 513 + io.sleep(.{ .nanoseconds = 1 * std.time.ns_per_s }, .awake) catch {}; 503 514 } 504 515 505 516 return error.VideoTimeout; ··· 511 522 var client = self.httpClient(); 512 523 defer client.deinit(); 513 524 514 - var body_buf: 
std.ArrayList(u8) = .{}; 525 + var body_buf: std.ArrayList(u8) = .empty; 515 526 defer body_buf.deinit(self.allocator); 516 527 517 528 var ts_buf: [30]u8 = undefined; ··· 555 566 var client = self.httpClient(); 556 567 defer client.deinit(); 557 568 558 - var body_buf: std.ArrayList(u8) = .{}; 569 + var body_buf: std.ArrayList(u8) = .empty; 559 570 defer body_buf.deinit(self.allocator); 560 571 561 572 try body_buf.print(self.allocator, ··· 717 728 }; 718 729 719 730 fn getIsoTimestamp(buf: *[30]u8) []const u8 { 720 - const ts = std.time.timestamp(); 731 + const ts = timestamp(); 721 732 const epoch_secs: u64 = @intCast(ts); 722 733 const epoch = std.time.epoch.EpochSeconds{ .secs = epoch_secs }; 723 734 const day = epoch.getEpochDay();
+13 -12
bot/src/config.zig
··· 1 1 const std = @import("std"); 2 - const posix = std.posix; 3 2 4 3 pub const Config = struct { 5 4 bsky_handle: []const u8, ··· 8 7 min_phrase_words: u32, 9 8 posting_enabled: bool, 10 9 cooldown_minutes: u32, 11 - global_cooldown_minutes: u32, 12 10 exclude_patterns: []const u8, 13 11 stats_port: u16, 14 12 backend_url: []const u8, 15 13 16 14 pub fn fromEnv() Config { 17 15 return .{ 18 - .bsky_handle = posix.getenv("BSKY_HANDLE") orelse "find-bufo.com", 19 - .bsky_app_password = posix.getenv("BSKY_APP_PASSWORD") orelse "", 20 - .preferred_jetstream = posix.getenv("PREFERRED_JETSTREAM"), 21 - .min_phrase_words = parseU32(posix.getenv("MIN_PHRASE_WORDS"), 4), 22 - .posting_enabled = parseBool(posix.getenv("POSTING_ENABLED")), 23 - .cooldown_minutes = parseU32(posix.getenv("COOLDOWN_MINUTES"), 120), 24 - .global_cooldown_minutes = parseU32(posix.getenv("GLOBAL_COOLDOWN_MINUTES"), 30), 25 - .exclude_patterns = posix.getenv("EXCLUDE_PATTERNS") orelse "what-have-you-done,what-have-i-done,sad,crying,cant-take,knife,what-are-you-doing-with-that", 26 - .stats_port = parseU16(posix.getenv("STATS_PORT"), 8080), 27 - .backend_url = posix.getenv("BACKEND_URL") orelse "https://find-bufo.com", 16 + .bsky_handle = getenv("BSKY_HANDLE") orelse "find-bufo.com", 17 + .bsky_app_password = getenv("BSKY_APP_PASSWORD") orelse "", 18 + .preferred_jetstream = getenv("PREFERRED_JETSTREAM"), 19 + .min_phrase_words = parseU32(getenv("MIN_PHRASE_WORDS"), 4), 20 + .posting_enabled = parseBool(getenv("POSTING_ENABLED")), 21 + .cooldown_minutes = parseU32(getenv("COOLDOWN_MINUTES"), 120), 22 + .exclude_patterns = getenv("EXCLUDE_PATTERNS") orelse "what-have-you-done,what-have-i-done,sad,crying,cant-take,knife,what-are-you-doing-with-that", 23 + .stats_port = parseU16(getenv("STATS_PORT"), 8080), 24 + .backend_url = getenv("BACKEND_URL") orelse "https://find-bufo.com", 28 25 }; 29 26 } 30 27 }; 28 + 29 + fn getenv(name: [*:0]const u8) ?[]const u8 { 30 + return if (std.c.getenv(name)) |p| 
std.mem.span(p) else null; 31 + } 31 32 32 33 fn parseU16(str: ?[]const u8, default: u16) u16 { 33 34 if (str) |s| {
+59 -45
bot/src/main.zig
··· 4 4 const http = std.http; 5 5 const Thread = std.Thread; 6 6 const Allocator = mem.Allocator; 7 + const Io = std.Io; 7 8 const zat = @import("zat"); 8 9 const config = @import("config.zig"); 9 10 const matcher = @import("matcher.zig"); ··· 11 12 const bsky = @import("bsky.zig"); 12 13 const stats = @import("stats.zig"); 13 14 15 + // Override the default single-threaded debug_io with a proper multi-threaded instance. 16 + // Initialized in main() before any threads are spawned. 17 + var app_threaded_io: Io.Threaded = undefined; 18 + pub const std_options_debug_threaded_io: ?*Io.Threaded = &app_threaded_io; 19 + 14 20 var global_state: ?*BotState = null; 15 21 16 22 const BotState = struct { ··· 18 24 config: config.Config, 19 25 matcher: matcher.Matcher, 20 26 bsky_client: bsky.BskyClient, 21 - mutex: Thread.Mutex = .{}, 27 + mutex: Io.Mutex = Io.Mutex.init, 22 28 stats: stats.Stats, 23 29 }; 24 30 25 31 pub fn main() !void { 26 - var gpa = std.heap.GeneralPurposeAllocator(.{}){}; 27 - defer _ = gpa.deinit(); 28 - const allocator = gpa.allocator(); 32 + const allocator = std.heap.smp_allocator; 33 + 34 + // initialize the threaded Io instance before anything else 35 + app_threaded_io = Io.Threaded.init(allocator, .{}); 36 + const io = app_threaded_io.io(); 37 + 38 + // initialize module-level io for bsky and stats 39 + bsky.init(io); 40 + stats.init(io); 29 41 30 42 std.debug.print("starting bufo bot...\n", .{}); 31 43 ··· 33 45 34 46 // load bufos from API 35 47 var m = matcher.Matcher.init(allocator, cfg.min_phrase_words); 36 - try loadBufos(allocator, &m, cfg.exclude_patterns); 48 + try loadBufos(allocator, &m, cfg.exclude_patterns, io); 37 49 std.debug.print("loaded {} bufos with >= {} word phrases\n", .{ m.count(), cfg.min_phrase_words }); 38 50 39 51 if (m.count() == 0) { ··· 42 54 } 43 55 44 56 // init bluesky client 45 - var bsky_client = bsky.BskyClient.init(allocator, cfg.bsky_handle, cfg.bsky_app_password); 57 + var bsky_client = 
bsky.BskyClient.initClient(allocator, cfg.bsky_handle, cfg.bsky_app_password); 46 58 defer bsky_client.deinit(); 47 59 48 60 if (cfg.posting_enabled) { ··· 52 64 } 53 65 54 66 // init stats 55 - var bot_stats = stats.Stats.init(allocator); 67 + var bot_stats = stats.Stats.initStats(allocator); 56 68 defer bot_stats.deinit(); 57 69 bot_stats.setBufosLoaded(@intCast(m.count())); 58 70 ··· 72 84 73 85 // startup scan: bootstrap tracking and clean stale posts 74 86 if (cfg.posting_enabled) { 75 - startupScan(&state); 87 + startupScan(&state, io); 76 88 } 77 89 78 90 // start stats server on background thread 79 - var stats_server = stats.StatsServer.init(allocator, &state.stats, cfg.stats_port); 91 + var stats_server = stats.StatsServer.initServer(allocator, &state.stats, cfg.stats_port); 80 92 const stats_thread = Thread.spawn(.{}, stats.StatsServer.run, .{&stats_server}) catch |err| { 81 93 std.debug.print("failed to start stats server: {}\n", .{err}); 82 94 return err; ··· 104 116 hosts_len += 1; 105 117 } 106 118 107 - var client = zat.JetstreamClient.init(allocator, .{ 119 + var client = zat.JetstreamClient.init(io, allocator, .{ 108 120 .hosts = hosts_buf[0..hosts_len], 109 121 .wanted_collections = &.{ "app.bsky.feed.post", "app.bsky.graph.block", "app.bsky.feed.postgate" }, 110 122 }); 111 123 defer client.deinit(); 112 - client.subscribe(&handler); 124 + client.subscribe(&handler) catch |err| { 125 + std.debug.print("jetstream subscription ended: {s}\n", .{@errorName(err)}); 126 + }; 113 127 } 114 128 115 129 fn onConnect(host: []const u8) void { ··· 120 134 121 135 fn onPost(post: jetstream.Post) void { 122 136 const state = global_state orelse return; 137 + const io = app_threaded_io.io(); 123 138 124 139 state.stats.incPostsChecked(); 125 140 ··· 135 150 return; 136 151 } 137 152 138 - state.mutex.lock(); 139 - defer state.mutex.unlock(); 140 - 141 - const now = std.time.timestamp(); 153 + state.mutex.lockUncancelable(io); 154 + defer 
state.mutex.unlock(io); 142 155 143 - // global cooldown: minimum time between any post regardless of bufo 144 - const global_cooldown_secs: i64 = @intCast(@as(u64, state.config.global_cooldown_minutes) * 60); 145 - if (state.stats.getLastGlobalPost()) |last_global| { 146 - if (now - last_global < global_cooldown_secs) { 147 - state.stats.incCooldownsHit(); 148 - std.debug.print("global cooldown: {} min remaining, skipping\n", .{@divTrunc(@as(u64, @intCast(global_cooldown_secs - (now - last_global))), 60)}); 149 - return; 150 - } 151 - } 156 + const now = timestamp(io); 152 157 153 158 // per-bufo cooldown (scaled by match frequency, persisted across restarts) 154 159 const base_secs: u64 = @as(u64, state.config.cooldown_minutes) * 60; ··· 242 247 // upload as image 243 248 const content_type = if (mem.endsWith(u8, match.url, ".png")) 244 249 "image/png" 250 + else if (mem.endsWith(u8, match.url, ".webp")) 251 + "image/webp" 245 252 else 246 253 "image/jpeg"; 247 254 ··· 259 266 // track our post for cleanup on delete/block 260 267 state.stats.addTrackedPost(our_rkey, post.uri, post.did, now); 261 268 262 - // update cooldown caches (persisted to disk) 269 + // update cooldown cache (persisted to disk) 263 270 state.stats.setLastPosted(match.name, now); 264 - state.stats.setLastGlobalPost(now); 265 271 } 266 272 267 273 fn onDelete(did: []const u8, rkey: []const u8) void { 268 274 const state = global_state orelse return; 275 + const io = app_threaded_io.io(); 269 276 270 277 // construct the original URI and check if we quote-posted it 271 278 var uri_buf: [256]u8 = undefined; ··· 276 283 277 284 std.debug.print("original post deleted ({s}), deleting our quote-post {s}\n", .{ uri, our_rkey }); 278 285 279 - state.mutex.lock(); 280 - defer state.mutex.unlock(); 286 + state.mutex.lockUncancelable(io); 287 + defer state.mutex.unlock(io); 281 288 282 289 state.bsky_client.deleteRecord(our_rkey) catch |err| { 283 290 std.debug.print("failed to delete our post: {}\n", 
.{err}); ··· 287 294 288 295 fn onBlock(blocker_did: []const u8, subject_did: []const u8) void { 289 296 const state = global_state orelse return; 297 + const io = app_threaded_io.io(); 290 298 291 299 // only care if someone is blocking us 292 300 const our_did = state.bsky_client.did orelse return; ··· 294 302 295 303 std.debug.print("blocked by {s}, cleaning up our quote-posts of their content\n", .{blocker_did}); 296 304 297 - state.mutex.lock(); 298 - defer state.mutex.unlock(); 305 + state.mutex.lockUncancelable(io); 306 + defer state.mutex.unlock(io); 299 307 300 308 // collect and remove tracked posts from this DID 301 309 var rkey_buf: [64][]const u8 = undefined; 302 310 const rkeys = state.stats.removeByOriginalDid(blocker_did, &rkey_buf); 303 311 304 - for (rkeys) |rkey| { 305 - state.bsky_client.deleteRecord(rkey) catch |err| { 306 - std.debug.print("failed to delete post {s}: {}\n", .{ rkey, err }); 312 + for (rkeys) |rk| { 313 + state.bsky_client.deleteRecord(rk) catch |err| { 314 + std.debug.print("failed to delete post {s}: {}\n", .{ rk, err }); 307 315 state.stats.incErrors(); 308 316 }; 309 - state.allocator.free(rkey); 317 + state.allocator.free(rk); 310 318 } 311 319 312 320 if (rkeys.len > 0) { ··· 316 324 317 325 fn onDetach(_: []const u8, record: json.Value) void { 318 326 const state = global_state orelse return; 327 + const io = app_threaded_io.io(); 319 328 320 329 // postgate record has detachedEmbeddingUris: array of AT-URIs that should no longer embed 321 330 const uris = record.object.get("detachedEmbeddingUris") orelse return; ··· 323 332 324 333 const our_did = state.bsky_client.did orelse return; 325 334 326 - state.mutex.lock(); 327 - defer state.mutex.unlock(); 335 + state.mutex.lockUncancelable(io); 336 + defer state.mutex.unlock(io); 328 337 329 338 for (uris.array.items) |uri_val| { 330 339 if (uri_val != .string) continue; ··· 340 349 if (!mem.eql(u8, uri_did, our_did)) continue; 341 350 342 351 _ = parts.next(); // collection 
343 - const rkey = parts.next() orelse continue; 352 + const rk = parts.next() orelse continue; 344 353 345 - if (state.stats.removeByOurRkey(rkey)) { 346 - std.debug.print("post detached, deleting our quote-post {s}\n", .{rkey}); 347 - state.bsky_client.deleteRecord(rkey) catch |err| { 348 - std.debug.print("failed to delete detached post {s}: {}\n", .{ rkey, err }); 354 + if (state.stats.removeByOurRkey(rk)) { 355 + std.debug.print("post detached, deleting our quote-post {s}\n", .{rk}); 356 + state.bsky_client.deleteRecord(rk) catch |err| { 357 + std.debug.print("failed to delete detached post {s}: {}\n", .{ rk, err }); 349 358 state.stats.incErrors(); 350 359 }; 351 360 } 352 361 } 353 362 } 354 363 355 - fn startupScan(state: *BotState) void { 364 + fn startupScan(state: *BotState, io_arg: Io) void { 365 + _ = io_arg; 356 366 std.debug.print("running startup scan...\n", .{}); 357 367 358 368 var feed_buf: [100]bsky.BskyClient.FeedPost = undefined; ··· 363 373 364 374 var bootstrapped: usize = 0; 365 375 var cleaned: usize = 0; 366 - const now = std.time.timestamp(); 376 + const now = timestamp(app_threaded_io.io()); 367 377 368 378 for (posts) |post| { 369 379 defer { ··· 395 405 std.debug.print("startup scan complete: {} bootstrapped, {} cleaned\n", .{ bootstrapped, cleaned }); 396 406 } 397 407 398 - fn loadBufos(allocator: Allocator, m: *matcher.Matcher, exclude_patterns: []const u8) !void { 399 - var client = http.Client{ .allocator = allocator }; 408 + fn timestamp(io_arg: Io) i64 { 409 + return @intCast(@divFloor(Io.Timestamp.now(io_arg, .real).nanoseconds, std.time.ns_per_s)); 410 + } 411 + 412 + fn loadBufos(allocator: Allocator, m: *matcher.Matcher, exclude_patterns: []const u8, io_arg: Io) !void { 413 + var client: http.Client = .{ .allocator = allocator, .io = io_arg }; 400 414 defer client.deinit(); 401 415 402 416 var url_buf: [512]u8 = undefined;
+5 -3
bot/src/matcher.zig
··· 14 14 }; 15 15 16 16 pub const Matcher = struct { 17 - bufos: std.ArrayList(Bufo) = .{}, 17 + bufos: std.ArrayList(Bufo) = .empty, 18 18 allocator: Allocator, 19 19 min_words: u32, 20 20 ··· 54 54 } 55 55 56 56 pub fn findMatch(self: *Matcher, text: []const u8) ?Match { 57 - var words: std.ArrayList([]const u8) = .{}; 57 + var words: std.ArrayList([]const u8) = .empty; 58 58 defer words.deinit(self.allocator); 59 59 60 60 var i: usize = 0; ··· 101 101 end -= 4; 102 102 } else if (mem.endsWith(u8, name, ".jpeg")) { 103 103 end -= 5; 104 + } else if (mem.endsWith(u8, name, ".webp")) { 105 + end -= 5; 104 106 } 105 107 106 108 const slug = name[start..end]; 107 109 108 - var words: std.ArrayList([]const u8) = .{}; 110 + var words: std.ArrayList([]const u8) = .empty; 109 111 errdefer { 110 112 for (words.items) |word| allocator.free(word); 111 113 words.deinit(allocator);
+136 -117
bot/src/stats.zig
··· 1 1 const std = @import("std"); 2 2 const mem = std.mem; 3 3 const json = std.json; 4 - const fs = std.fs; 5 4 const Allocator = mem.Allocator; 6 5 const Thread = std.Thread; 6 + const Io = std.Io; 7 + const net = Io.net; 8 + const http = std.http; 7 9 const template = @import("stats_template.zig"); 8 10 9 11 const STATS_PATH = "/data/stats.json"; 12 + const STATS_TMP_PATH = "/data/stats.json.tmp"; 13 + 14 + // module state — initialized via init(), not from a global 15 + var io: Io = undefined; 16 + 17 + pub fn init(app_io: Io) void { 18 + io = app_io; 19 + } 20 + 21 + fn timestamp() i64 { 22 + return @intCast(@divFloor(Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s)); 23 + } 10 24 11 25 pub const TrackedPost = struct { 12 26 our_rkey: []const u8, ··· 29 43 jetstream_host_buf: [256]u8 = undefined, 30 44 jetstream_host_len: std.atomic.Value(usize) = .init(0), 31 45 46 + last_snapshot_hour: i64 = -1, // hour-of-day of last snapshot (-1 = none yet) 47 + 32 48 // track per-bufo match counts: name -> {count, url} 33 49 bufo_matches: std.StringHashMap(BufoMatchData), 34 - bufo_mutex: Thread.Mutex = .{}, 50 + bufo_mutex: Io.Mutex = Io.Mutex.init, 35 51 // track last post time per bufo (persisted to survive restarts) 36 52 last_posted: std.StringHashMap(i64), 37 - // global cooldown: timestamp of most recent post (any bufo) 38 - last_global_post: ?i64 = null, 39 53 // track our quote-posts for cleanup on delete/block 40 54 tracked_posts: std.ArrayList(TrackedPost), 41 55 ··· 44 58 url: []const u8, 45 59 }; 46 60 47 - pub fn init(allocator: Allocator) Stats { 61 + pub fn initStats(allocator: Allocator) Stats { 48 62 var self = Stats{ 49 63 .allocator = allocator, 50 - .start_time = std.time.timestamp(), 64 + .start_time = timestamp(), 51 65 .bufo_matches = std.StringHashMap(BufoMatchData).init(allocator), 52 66 .last_posted = std.StringHashMap(i64).init(allocator), 53 - .tracked_posts = .{}, 67 + .tracked_posts = .empty, 54 68 }; 55 69 self.load(); 56 70 
return self; ··· 78 92 } 79 93 80 94 fn load(self: *Stats) void { 81 - const file = fs.openFileAbsolute(STATS_PATH, .{}) catch return; 82 - defer file.close(); 95 + const file = Io.Dir.openFileAbsolute(io, STATS_PATH, .{}) catch return; 96 + defer file.close(io); 83 97 84 - var buf: [64 * 1024]u8 = undefined; 85 - const len = file.readAll(&buf) catch return; 98 + var buf: [256 * 1024]u8 = undefined; 99 + const len = file.readPositionalAll(io, &buf, 0) catch return; 86 100 if (len == 0) return; 87 101 88 102 const parsed = json.parseFromSlice(json.Value, self.allocator, buf[0..len], .{}) catch return; ··· 111 125 if (root.get("cumulative_uptime")) |v| if (v == .integer) { 112 126 self.prior_uptime = @intCast(@max(0, v.integer)); 113 127 }; 114 - if (root.get("last_global_post")) |v| if (v == .integer) { 115 - self.last_global_post = v.integer; 116 - }; 117 - 118 128 // load bufo_matches (or legacy bufo_posts) 119 129 const matches_key = if (root.get("bufo_matches") != null) "bufo_matches" else "bufo_posts"; 120 130 if (root.get(matches_key)) |bp| { ··· 215 225 } 216 226 217 227 pub fn save(self: *Stats) void { 218 - self.bufo_mutex.lock(); 219 - defer self.bufo_mutex.unlock(); 228 + self.bufo_mutex.lockUncancelable(io); 229 + defer self.bufo_mutex.unlock(io); 220 230 self.saveUnlocked(); 221 231 } 222 232 223 233 pub fn totalUptime(self: *Stats) i64 { 224 - const now = std.time.timestamp(); 234 + const now = timestamp(); 225 235 const session: i64 = now - self.start_time; 226 236 return @as(i64, @intCast(self.prior_uptime)) + session; 227 237 } ··· 235 245 } 236 246 237 247 pub fn incBufoMatch(self: *Stats, bufo_name: []const u8, bufo_url: []const u8) void { 238 - self.bufo_mutex.lock(); 239 - defer self.bufo_mutex.unlock(); 248 + self.bufo_mutex.lockUncancelable(io); 249 + defer self.bufo_mutex.unlock(io); 240 250 241 251 if (self.bufo_matches.getPtr(bufo_name)) |data| { 242 252 data.count += 1; ··· 260 270 261 271 fn saveUnlocked(self: *Stats) void { 262 272 // 
called when mutex is already held 263 - const file = fs.createFileAbsolute(STATS_PATH, .{}) catch return; 264 - defer file.close(); 273 + const file = Io.Dir.createFileAbsolute(io, STATS_TMP_PATH, .{}) catch return; 265 274 266 - const now = std.time.timestamp(); 275 + const now = timestamp(); 267 276 const session_uptime: u64 = @intCast(@max(0, now - self.start_time)); 268 277 const total_uptime = self.prior_uptime + session_uptime; 269 278 270 - var buf: [64 * 1024]u8 = undefined; 271 - var fbs = std.io.fixedBufferStream(&buf); 272 - const writer = fbs.writer(); 279 + var buf: [256 * 1024]u8 = undefined; 280 + var w: Io.Writer = .fixed(&buf); 273 281 274 - writer.writeAll("{") catch return; 275 - std.fmt.format(writer, "\"posts_checked\":{},", .{self.posts_checked.load(.monotonic)}) catch return; 276 - std.fmt.format(writer, "\"matches_found\":{},", .{self.matches_found.load(.monotonic)}) catch return; 277 - std.fmt.format(writer, "\"posts_created\":{},", .{self.posts_created.load(.monotonic)}) catch return; 278 - std.fmt.format(writer, "\"cooldowns_hit\":{},", .{self.cooldowns_hit.load(.monotonic)}) catch return; 279 - std.fmt.format(writer, "\"blocks_respected\":{},", .{self.blocks_respected.load(.monotonic)}) catch return; 280 - std.fmt.format(writer, "\"errors\":{},", .{self.errors.load(.monotonic)}) catch return; 281 - std.fmt.format(writer, "\"cumulative_uptime\":{},", .{total_uptime}) catch return; 282 - if (self.last_global_post) |lgp| { 283 - std.fmt.format(writer, "\"last_global_post\":{},", .{lgp}) catch return; 284 - } 285 - writer.writeAll("\"bufo_matches\":{") catch return; 282 + w.print("{{", .{}) catch return; 283 + w.print("\"posts_checked\":{},", .{self.posts_checked.load(.monotonic)}) catch return; 284 + w.print("\"matches_found\":{},", .{self.matches_found.load(.monotonic)}) catch return; 285 + w.print("\"posts_created\":{},", .{self.posts_created.load(.monotonic)}) catch return; 286 + w.print("\"cooldowns_hit\":{},", 
.{self.cooldowns_hit.load(.monotonic)}) catch return; 287 + w.print("\"blocks_respected\":{},", .{self.blocks_respected.load(.monotonic)}) catch return; 288 + w.print("\"errors\":{},", .{self.errors.load(.monotonic)}) catch return; 289 + w.print("\"cumulative_uptime\":{},", .{total_uptime}) catch return; 290 + w.print("\"bufo_matches\":{{", .{}) catch return; 286 291 287 292 var first = true; 288 293 var iter = self.bufo_matches.iterator(); 289 294 while (iter.next()) |entry| { 290 - if (!first) writer.writeAll(",") catch return; 295 + if (!first) w.print(",", .{}) catch return; 291 296 first = false; 292 - std.fmt.format(writer, "\"{s}\":{{\"count\":{},\"url\":\"{s}\"}}", .{ entry.key_ptr.*, entry.value_ptr.count, entry.value_ptr.url }) catch return; 297 + w.print("\"{s}\":{{\"count\":{},\"url\":\"{s}\"}}", .{ entry.key_ptr.*, entry.value_ptr.count, entry.value_ptr.url }) catch return; 293 298 } 294 299 295 - writer.writeAll("},") catch return; 300 + w.print("}},", .{}) catch return; 296 301 297 302 // write last_posted timestamps 298 - writer.writeAll("\"last_posted\":{") catch return; 303 + w.print("\"last_posted\":{{", .{}) catch return; 299 304 var lp_first = true; 300 305 var lp_iter = self.last_posted.iterator(); 301 306 while (lp_iter.next()) |entry| { 302 - if (!lp_first) writer.writeAll(",") catch return; 307 + if (!lp_first) w.print(",", .{}) catch return; 303 308 lp_first = false; 304 - std.fmt.format(writer, "\"{s}\":{}", .{ entry.key_ptr.*, entry.value_ptr.* }) catch return; 309 + w.print("\"{s}\":{}", .{ entry.key_ptr.*, entry.value_ptr.* }) catch return; 305 310 } 306 - writer.writeAll("},") catch return; 311 + w.print("}},", .{}) catch return; 307 312 308 313 // write tracked posts 309 - writer.writeAll("\"tracked_posts\":[") catch return; 314 + w.print("\"tracked_posts\":[", .{}) catch return; 310 315 for (self.tracked_posts.items, 0..) 
|tp, i| { 311 - if (i > 0) writer.writeAll(",") catch return; 312 - std.fmt.format(writer, "{{\"our_rkey\":\"{s}\",\"original_uri\":\"{s}\",\"original_did\":\"{s}\",\"timestamp\":{}}}", .{ tp.our_rkey, tp.original_uri, tp.original_did, tp.timestamp }) catch return; 316 + if (i > 0) w.print(",", .{}) catch return; 317 + w.print("{{\"our_rkey\":\"{s}\",\"original_uri\":\"{s}\",\"original_did\":\"{s}\",\"timestamp\":{}}}", .{ tp.our_rkey, tp.original_uri, tp.original_did, tp.timestamp }) catch return; 313 318 } 314 - writer.writeAll("]}") catch return; 319 + w.print("]}}", .{}) catch return; 315 320 316 - file.writeAll(fbs.getWritten()) catch return; 321 + const written = buf[0..w.end]; 322 + file.writeStreamingAll(io, written) catch return; 323 + file.close(io); 324 + Io.Dir.renameAbsolute(STATS_TMP_PATH, STATS_PATH, io) catch return; 325 + 326 + // hourly snapshot: copy to /data/stats.snapshot.HH (24 rolling files) 327 + const hour = @mod(@divTrunc(now, 3600), 24); 328 + if (hour != self.last_snapshot_hour) { 329 + self.last_snapshot_hour = hour; 330 + var snap_path: [48]u8 = undefined; 331 + const snap = std.fmt.bufPrint(&snap_path, "/data/stats.snapshot.{d:0>2}", .{@as(u64, @intCast(hour))}) catch return; 332 + const snap_file = Io.Dir.createFileAbsolute(io, snap, .{}) catch return; 333 + snap_file.writeStreamingAll(io, written) catch {}; 334 + snap_file.close(io); 335 + } 317 336 } 318 337 319 338 /// Quadratic cooldown scaling: bufos that dominate the feed get exponentially longer cooldowns. 320 - /// At 10% of matches: ~2x base. At 30%: ~10x base. At 50%: ~26x base. 321 - const COOLDOWN_SCALE_FACTOR: f64 = 100.0; 339 + /// At 5% of matches: ~1.5x base. At 20%: ~9x base. At 33%: ~23x base. 
340 + const COOLDOWN_SCALE_FACTOR: f64 = 200.0; 322 341 323 342 pub fn getCooldownSeconds(self: *Stats, bufo_name: []const u8, base_secs: u64) u64 { 324 - self.bufo_mutex.lock(); 325 - defer self.bufo_mutex.unlock(); 343 + self.bufo_mutex.lockUncancelable(io); 344 + defer self.bufo_mutex.unlock(io); 326 345 327 346 const bufo_count: u64 = if (self.bufo_matches.get(bufo_name)) |data| data.count else 0; 328 347 ··· 334 353 if (total_count == 0) return base_secs; 335 354 336 355 const ratio = @as(f64, @floatFromInt(bufo_count)) / @as(f64, @floatFromInt(total_count)); 356 + // rare bufos (< 1% of matches) post immediately — no cooldown 357 + if (ratio < 0.01) return 0; 337 358 // quadratic: dominant bufos get penalized much harder 338 359 const multiplier = 1.0 + COOLDOWN_SCALE_FACTOR * ratio * ratio; 339 360 return @intFromFloat(@as(f64, @floatFromInt(base_secs)) * multiplier); 340 361 } 341 362 342 363 pub fn getLastPosted(self: *Stats, bufo_name: []const u8) ?i64 { 343 - self.bufo_mutex.lock(); 344 - defer self.bufo_mutex.unlock(); 364 + self.bufo_mutex.lockUncancelable(io); 365 + defer self.bufo_mutex.unlock(io); 345 366 return self.last_posted.get(bufo_name); 346 367 } 347 368 348 - pub fn setLastPosted(self: *Stats, bufo_name: []const u8, timestamp: i64) void { 349 - self.bufo_mutex.lock(); 350 - defer self.bufo_mutex.unlock(); 369 + pub fn setLastPosted(self: *Stats, bufo_name: []const u8, ts: i64) void { 370 + self.bufo_mutex.lockUncancelable(io); 371 + defer self.bufo_mutex.unlock(io); 351 372 if (self.last_posted.getPtr(bufo_name)) |ptr| { 352 - ptr.* = timestamp; 373 + ptr.* = ts; 353 374 } else { 354 375 const key = self.allocator.dupe(u8, bufo_name) catch return; 355 - self.last_posted.put(key, timestamp) catch { 376 + self.last_posted.put(key, ts) catch { 356 377 self.allocator.free(key); 357 378 }; 358 379 } 359 380 self.saveUnlocked(); 360 381 } 361 382 362 - pub fn getLastGlobalPost(self: *Stats) ?i64 { 363 - self.bufo_mutex.lock(); 364 - defer 
self.bufo_mutex.unlock(); 365 - return self.last_global_post; 366 - } 367 - 368 - pub fn setLastGlobalPost(self: *Stats, timestamp: i64) void { 369 - self.bufo_mutex.lock(); 370 - defer self.bufo_mutex.unlock(); 371 - self.last_global_post = timestamp; 372 - self.saveUnlocked(); 373 - } 374 - 375 - pub fn addTrackedPost(self: *Stats, our_rkey: []const u8, original_uri: []const u8, original_did: []const u8, timestamp: i64) void { 376 - self.bufo_mutex.lock(); 377 - defer self.bufo_mutex.unlock(); 383 + pub fn addTrackedPost(self: *Stats, our_rkey: []const u8, original_uri: []const u8, original_did: []const u8, ts: i64) void { 384 + self.bufo_mutex.lockUncancelable(io); 385 + defer self.bufo_mutex.unlock(io); 378 386 379 387 const rkey = self.allocator.dupe(u8, our_rkey) catch return; 380 388 const uri = self.allocator.dupe(u8, original_uri) catch { ··· 390 398 .our_rkey = rkey, 391 399 .original_uri = uri, 392 400 .original_did = did, 393 - .timestamp = timestamp, 401 + .timestamp = ts, 394 402 }) catch { 395 403 self.allocator.free(rkey); 396 404 self.allocator.free(uri); ··· 402 410 403 411 /// atomically find+remove a tracked post by original URI, returning the caller-owned rkey 404 412 pub fn removeByOriginalUri(self: *Stats, uri: []const u8) ?[]const u8 { 405 - self.bufo_mutex.lock(); 406 - defer self.bufo_mutex.unlock(); 413 + self.bufo_mutex.lockUncancelable(io); 414 + defer self.bufo_mutex.unlock(io); 407 415 408 416 for (self.tracked_posts.items, 0..) |tp, i| { 409 417 if (mem.eql(u8, tp.original_uri, uri)) { ··· 419 427 420 428 /// find+remove a tracked post by our rkey, returning true if found 421 429 pub fn removeByOurRkey(self: *Stats, rkey: []const u8) bool { 422 - self.bufo_mutex.lock(); 423 - defer self.bufo_mutex.unlock(); 430 + self.bufo_mutex.lockUncancelable(io); 431 + defer self.bufo_mutex.unlock(io); 424 432 425 433 for (self.tracked_posts.items, 0..) 
|tp, i| { 426 434 if (mem.eql(u8, tp.our_rkey, rkey)) { ··· 437 445 438 446 /// check if a given rkey is already tracked 439 447 pub fn isTracked(self: *Stats, rkey: []const u8) bool { 440 - self.bufo_mutex.lock(); 441 - defer self.bufo_mutex.unlock(); 448 + self.bufo_mutex.lockUncancelable(io); 449 + defer self.bufo_mutex.unlock(io); 442 450 443 451 for (self.tracked_posts.items) |tp| { 444 452 if (mem.eql(u8, tp.our_rkey, rkey)) return true; ··· 448 456 449 457 /// collect rkeys of tracked posts matching a given original DID, then remove them 450 458 pub fn removeByOriginalDid(self: *Stats, did: []const u8, buf: [][]const u8) [][]const u8 { 451 - self.bufo_mutex.lock(); 452 - defer self.bufo_mutex.unlock(); 459 + self.bufo_mutex.lockUncancelable(io); 460 + defer self.bufo_mutex.unlock(io); 453 461 454 462 var count: usize = 0; 455 463 var i: usize = 0; ··· 469 477 } 470 478 471 479 pub fn pruneOldPosts(self: *Stats, max_age_secs: i64) void { 472 - self.bufo_mutex.lock(); 473 - defer self.bufo_mutex.unlock(); 480 + self.bufo_mutex.lockUncancelable(io); 481 + defer self.bufo_mutex.unlock(io); 474 482 475 - const now = std.time.timestamp(); 483 + const now = timestamp(); 476 484 var i: usize = 0; 477 485 var pruned: usize = 0; 478 486 while (i < self.tracked_posts.items.len) { ··· 555 563 }; 556 564 557 565 // collect top bufos 558 - var top_bufos: std.ArrayList(BufoEntry) = .{}; 566 + var top_bufos: std.ArrayList(BufoEntry) = .empty; 559 567 defer top_bufos.deinit(allocator); 560 568 561 569 { 562 - self.bufo_mutex.lock(); 563 - defer self.bufo_mutex.unlock(); 570 + self.bufo_mutex.lockUncancelable(io); 571 + defer self.bufo_mutex.unlock(io); 564 572 565 573 var iter = self.bufo_matches.iterator(); 566 574 while (iter.next()) |entry| { ··· 572 580 mem.sort(BufoEntry, top_bufos.items, {}, BufoEntry.compare); 573 581 574 582 // build top bufos grid html 575 - var top_html: std.ArrayList(u8) = .{}; 583 + var top_html: std.ArrayList(u8) = .empty; 576 584 defer 
top_html.deinit(allocator); 577 585 578 586 // find max count for scaling ··· 596 604 display_name = entry.name[0 .. entry.name.len - 4]; 597 605 } 598 606 599 - try std.fmt.format(top_html.writer(allocator), 607 + try top_html.print(allocator, 600 608 \\<div class="bufo-card" style="width:{}px;height:{}px;" title="{s} ({} matches)" data-name="{s}" onclick="showPosts(this)"> 601 609 \\<img src="{s}" alt="{s}" loading="lazy"> 602 610 \\<span class="bufo-count">{}</span> ··· 636 644 stats: *Stats, 637 645 port: u16, 638 646 639 - pub fn init(allocator: Allocator, stats: *Stats, port: u16) StatsServer { 647 + pub fn initServer(allocator: Allocator, s: *Stats, port: u16) StatsServer { 640 648 return .{ 641 649 .allocator = allocator, 642 - .stats = stats, 650 + .stats = s, 643 651 .port = port, 644 652 }; 645 653 } 646 654 647 655 pub fn run(self: *StatsServer) void { 648 656 // spawn periodic save ticker (every 60s) 649 - _ = Thread.spawn(.{}, saveTicker, .{self.stats}) catch {}; 657 + const ticker = Thread.spawn(.{}, saveTicker, .{self.stats}) catch |err| { 658 + std.debug.print("failed to start save ticker: {}\n", .{err}); 659 + return; 660 + }; 661 + ticker.detach(); 650 662 651 663 self.serve() catch |err| { 652 664 std.debug.print("stats server error: {}\n", .{err}); ··· 655 667 656 668 fn saveTicker(s: *Stats) void { 657 669 while (true) { 658 - std.Thread.sleep(60 * std.time.ns_per_s); 670 + io.sleep(.{ .nanoseconds = 60 * std.time.ns_per_s }, .awake) catch {}; 659 671 s.save(); 660 672 } 661 673 } 662 674 663 675 fn serve(self: *StatsServer) !void { 664 - const addr = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, self.port); 665 - 666 - var server = try addr.listen(.{ .reuse_address = true }); 667 - defer server.deinit(); 676 + var address = try net.IpAddress.parse("::", self.port); 677 + var server = try net.IpAddress.listen(&address, io, .{ .reuse_address = true }); 678 + defer server.deinit(io); 668 679 669 - std.debug.print("stats server listening on 
http://0.0.0.0:{}\n", .{self.port}); 680 + std.debug.print("stats server listening on http://[::]:{} \n", .{self.port}); 670 681 671 682 while (true) { 672 - const conn = server.accept() catch |err| { 683 + const stream = server.accept(io) catch |err| { 673 684 std.debug.print("accept error: {}\n", .{err}); 674 685 continue; 675 686 }; 676 687 677 - self.handleConnection(conn) catch |err| { 678 - std.debug.print("connection error: {}\n", .{err}); 688 + const t = Thread.spawn(.{}, handleConnection, .{ self, stream }) catch |err| { 689 + std.debug.print("spawn error: {}\n", .{err}); 690 + stream.close(io); 691 + continue; 679 692 }; 693 + t.detach(); 680 694 } 681 695 } 682 696 683 - fn handleConnection(self: *StatsServer, conn: std.net.Server.Connection) !void { 684 - defer conn.stream.close(); 697 + fn handleConnection(self: *StatsServer, stream: net.Stream) void { 698 + defer stream.close(io); 699 + 700 + var read_buffer: [4096]u8 = undefined; 701 + var write_buffer: [8192]u8 = undefined; 702 + 703 + var reader = net.Stream.Reader.init(stream, io, &read_buffer); 704 + var writer = net.Stream.Writer.init(stream, io, &write_buffer); 705 + 706 + var server = http.Server.init(&reader.interface, &writer.interface); 685 707 686 - // read request (we don't really care about it, just serve stats) 687 - var buf: [1024]u8 = undefined; 688 - _ = conn.stream.read(&buf) catch {}; 708 + var request = server.receiveHead() catch return; 689 709 690 710 const html = self.stats.renderHtml(self.allocator) catch |err| { 691 711 std.debug.print("render error: {}\n", .{err}); ··· 693 713 }; 694 714 defer self.allocator.free(html); 695 715 696 - // write raw HTTP response 697 - var response_buf: [128]u8 = undefined; 698 - const header = std.fmt.bufPrint(&response_buf, "HTTP/1.1 200 OK\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", .{html.len}) catch return; 699 - 700 - _ = conn.stream.write(header) catch return; 701 - _ = 
conn.stream.write(html) catch return; 716 + request.respond(html, .{ 717 + .extra_headers = &.{ 718 + .{ .name = "content-type", .value = "text/html; charset=utf-8" }, 719 + }, 720 + }) catch return; 702 721 } 703 722 };
+2 -2
bot/src/stats_template.zig
··· 249 249 \\ 250 250 \\<div class="strategy"> 251 251 \\ <h2 style="margin-top:0">posting strategy</h2> 252 - \\ <p>global cooldown of 30 min between any post, plus per-bufo scaling &mdash; 253 - \\ bufos that match more often get longer cooldowns (quadratic: a bufo at 30% of matches waits ~10x longer).</p> 252 + \\ <p>rare bufos (&lt;1% of matches) post immediately &mdash; no cooldown. frequent bufos get 253 + \\ quadratic scaling (a bufo at 30% of matches waits ~10x the base cooldown).</p> 254 254 \\ <div class="strategy-rates" id="strategy-rates"></div> 255 255 \\</div> 256 256 \\