semantic bufo search find-bufo.com
bufo
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(bot): adopt zat's JetstreamClient

replace hand-rolled websocket/json boilerplate with zat v0.1.3's typed
JetstreamClient. drops direct websocket dependency (zat brings it
transitively). jetstream.zig goes from 208 to 98 lines — mostly NSFW
filter lists now.

closes #1

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 2937764a 04661efc

+36 -140
+2 -2
bot/build.zig
··· 4 4 const target = b.standardTargetOptions(.{}); 5 5 const optimize = b.standardOptimizeOption(.{}); 6 6 7 - const websocket = b.dependency("websocket", .{ 7 + const zat = b.dependency("zat", .{ 8 8 .target = target, 9 9 .optimize = optimize, 10 10 }); ··· 16 16 .target = target, 17 17 .optimize = optimize, 18 18 .imports = &.{ 19 - .{ .name = "websocket", .module = websocket.module("websocket") }, 19 + .{ .name = "zat", .module = zat.module("zat") }, 20 20 }, 21 21 }), 22 22 });
+3 -3
bot/build.zig.zon
··· 4 4 .fingerprint = 0xe143490f82fa96db, 5 5 .minimum_zig_version = "0.15.0", 6 6 .dependencies = .{ 7 - .websocket = .{ 8 - .url = "https://github.com/karlseguin/websocket.zig/archive/refs/heads/master.tar.gz", 9 - .hash = "websocket-0.1.0-ZPISdRNzAwAGszh62EpRtoQxu8wb1MSMVI6Ow0o2dmyJ", 7 + .zat = .{ 8 + .url = "https://tangled.org/zat.dev/zat/archive/7123918.tar.gz", 9 + .hash = "zat-0.1.0-5PuC7qrxAQBPP3pQ2fBn6E0FNJO5cZw_Qj3rM7gEiZYv", 10 10 }, 11 11 }, 12 12 .paths = .{
+23 -133
bot/src/jetstream.zig
··· 1 1 const std = @import("std"); 2 2 const mem = std.mem; 3 3 const json = std.json; 4 - const posix = std.posix; 5 - const Allocator = mem.Allocator; 4 + const zat = @import("zat"); 6 5 7 6 const nsfw_labels: []const []const u8 = &.{ 8 7 "porn", ··· 27 26 "#fetish", 28 27 "#kink", 29 28 }; 30 - const websocket = @import("websocket"); 31 29 32 30 pub const Post = struct { 33 31 uri: []const u8, ··· 36 34 rkey: []const u8, 37 35 }; 38 36 39 - pub const JetstreamClient = struct { 40 - allocator: Allocator, 41 - host: []const u8, 37 + pub const PostHandler = struct { 42 38 callback: *const fn (Post) void, 43 39 44 - pub fn init(allocator: Allocator, host: []const u8, callback: *const fn (Post) void) JetstreamClient { 45 - return .{ 46 - .allocator = allocator, 47 - .host = host, 48 - .callback = callback, 49 - }; 50 - } 51 - 52 - pub fn run(self: *JetstreamClient) void { 53 - // exponential backoff: 1s -> 2s -> 4s -> ... -> 60s cap 54 - var backoff: u64 = 1; 55 - const max_backoff: u64 = 60; 56 - 57 - while (true) { 58 - self.connect() catch |err| { 59 - std.debug.print("jetstream error: {}, reconnecting in {}s...\n", .{ err, backoff }); 60 - }; 61 - posix.nanosleep(backoff, 0); 62 - backoff = @min(backoff * 2, max_backoff); 40 + pub fn onEvent(self: *PostHandler, event: zat.JetstreamEvent) void { 41 + switch (event) { 42 + .commit => |c| self.handleCommit(c), 43 + else => {}, 63 44 } 64 45 } 65 46 66 - fn connect(self: *JetstreamClient) !void { 67 - const path = "/subscribe?wantedCollections=app.bsky.feed.post"; 68 - 69 - std.debug.print("connecting to wss://{s}{s}\n", .{ self.host, path }); 70 - 71 - var client = websocket.Client.init(self.allocator, .{ 72 - .host = self.host, 73 - .port = 443, 74 - .tls = true, 75 - .max_size = 1024 * 1024, // 1MB - some jetstream messages are large 76 - }) catch |err| { 77 - std.debug.print("websocket client init failed: {}\n", .{err}); 78 - return err; 79 - }; 80 - defer client.deinit(); 81 - 82 - var host_header_buf: [256]u8 = undefined; 83 - const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{self.host}) catch self.host; 84 - 85 - client.handshake(path, .{ .headers = host_header }) catch |err| { 86 - std.debug.print("websocket handshake failed: {}\n", .{err}); 87 - return err; 88 - }; 89 - 90 - std.debug.print("jetstream connected!\n", .{}); 91 - 92 - var handler = Handler{ .allocator = self.allocator, .callback = self.callback }; 93 - client.readLoop(&handler) catch |err| { 94 - std.debug.print("websocket read loop error: {}\n", .{err}); 95 - return err; 96 - }; 97 - } 98 - }; 99 - 100 - const Handler = struct { 101 - allocator: Allocator, 102 - callback: *const fn (Post) void, 103 - 104 - pub fn serverMessage(self: *Handler, data: []const u8) !void { 105 - self.processMessage(data) catch |err| { 106 - if (err != error.NotAPost) { 107 - std.debug.print("message processing error: {}\n", .{err}); 108 - } 109 - }; 110 - } 111 - 112 - pub fn close(_: *Handler) void { 113 - std.debug.print("jetstream connection closed\n", .{}); 47 + pub fn onError(_: *PostHandler, err: anyerror) void { 48 + std.debug.print("jetstream error: {s}\n", .{@errorName(err)}); 114 49 } 115 50 116 - fn processMessage(self: *Handler, payload: []const u8) !void { 117 - // jetstream format: 118 - // { "did": "...", "kind": "commit", "commit": { "collection": "app.bsky.feed.post", "rkey": "...", "record": { "text": "...", ... } } } 119 - const parsed = json.parseFromSlice(json.Value, self.allocator, payload, .{}) catch return error.ParseError; 120 - defer parsed.deinit(); 121 - 122 - const root = parsed.value.object; 123 - 124 - // check kind 125 - const kind = root.get("kind") orelse return error.NotAPost; 126 - if (kind != .string or !mem.eql(u8, kind.string, "commit")) return error.NotAPost; 127 - 128 - // get did 129 - const did_val = root.get("did") orelse return error.NotAPost; 130 - if (did_val != .string) return error.NotAPost; 51 + fn handleCommit(self: *PostHandler, c: zat.jetstream.CommitEvent) void { 52 + if (c.operation != .create) return; 131 53 132 - // get commit 133 - const commit = root.get("commit") orelse return error.NotAPost; 134 - if (commit != .object) return error.NotAPost; 135 - 136 - // check collection 137 - const collection = commit.object.get("collection") orelse return error.NotAPost; 138 - if (collection != .string or !mem.eql(u8, collection.string, "app.bsky.feed.post")) return error.NotAPost; 139 - 140 - // check operation (create only) 141 - const operation = commit.object.get("operation") orelse return error.NotAPost; 142 - if (operation != .string or !mem.eql(u8, operation.string, "create")) return error.NotAPost; 143 - 144 - // get rkey 145 - const rkey_val = commit.object.get("rkey") orelse return error.NotAPost; 146 - if (rkey_val != .string) return error.NotAPost; 147 - 148 - // get record 149 - const record = commit.object.get("record") orelse return error.NotAPost; 150 - if (record != .object) return error.NotAPost; 54 + const record = c.record orelse return; 151 55 152 - // check for nsfw labels 153 - if (hasNsfwLabels(record.object)) return error.NotAPost; 56 + if (hasNsfwLabels(record)) return; 154 57 155 - // get text 156 - const text_val = record.object.get("text") orelse return error.NotAPost; 157 - if (text_val != .string) return error.NotAPost; 58 + const text = zat.json.getString(record, "text") orelse return; 158 59 159 - // check for nsfw keywords in text 160 - if (hasNsfwKeywords(text_val.string)) return error.NotAPost; 60 + if (hasNsfwKeywords(text)) return; 161 61 162 - // construct uri 163 62 var uri_buf: [256]u8 = undefined; 164 - const uri = std.fmt.bufPrint(&uri_buf, "at://{s}/app.bsky.feed.post/{s}", .{ did_val.string, rkey_val.string }) catch return error.UriTooLong; 63 + const uri = std.fmt.bufPrint(&uri_buf, "at://{s}/app.bsky.feed.post/{s}", .{ c.did, c.rkey }) catch return; 165 64 166 65 self.callback(.{ 167 66 .uri = uri, 168 - .text = text_val.string, 169 - .did = did_val.string, 170 - .rkey = rkey_val.string, 67 + .text = text, 68 + .did = c.did, 69 + .rkey = c.rkey, 171 70 }); 172 71 } 173 72 }; 174 73 175 - fn hasNsfwLabels(record: json.ObjectMap) bool { 176 - // labels structure: { "values": [{ "val": "porn" }, ...] } 177 - const labels = record.get("labels") orelse return false; 178 - if (labels != .object) return false; 74 + fn hasNsfwLabels(record: json.Value) bool { 75 + const values = zat.json.getArray(record, "labels.values") orelse return false; 179 76 180 - const values = labels.object.get("values") orelse return false; 181 - if (values != .array) return false; 182 - 183 - for (values.array.items) |item| { 184 - if (item != .object) continue; 185 - const val = item.object.get("val") orelse continue; 186 - if (val != .string) continue; 187 - 77 + for (values) |item| { 78 + const val = zat.json.getString(item, "val") orelse continue; 188 79 for (nsfw_labels) |label| { 189 - if (mem.eql(u8, val.string, label)) return true; 80 + if (mem.eql(u8, val, label)) return true; 190 81 } 191 82 } 192 83 return false; 193 84 } 194 85 195 86 fn hasNsfwKeywords(text: []const u8) bool { 196 - // convert to lowercase for case-insensitive matching 197 87 var lower_buf: [4096]u8 = undefined; 198 88 const len = @min(text.len, lower_buf.len); 199 89 for (text[0..len], 0..) |c, i| {
+8 -2
bot/src/main.zig
··· 4 4 const http = std.http; 5 5 const Thread = std.Thread; 6 6 const Allocator = mem.Allocator; 7 + const zat = @import("zat"); 7 8 const config = @import("config.zig"); 8 9 const matcher = @import("matcher.zig"); 9 10 const jetstream = @import("jetstream.zig"); ··· 78 79 defer stats_thread.join(); 79 80 80 81 // start jetstream consumer 81 - var js = jetstream.JetstreamClient.init(allocator, cfg.jetstream_endpoint, onPost); 82 - js.run(); 82 + var handler = jetstream.PostHandler{ .callback = onPost }; 83 + var client = zat.JetstreamClient.init(allocator, .{ 84 + .host = cfg.jetstream_endpoint, 85 + .wanted_collections = &.{"app.bsky.feed.post"}, 86 + }); 87 + defer client.deinit(); 88 + client.subscribe(&handler); 83 89 } 84 90 85 91 fn onPost(post: jetstream.Post) void {