polls on atproto pollz.waow.tech
atproto zig
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 121 lines 4.3 kB view raw
1const std = @import("std"); 2const mem = std.mem; 3const json = std.json; 4const Allocator = mem.Allocator; 5const zat = @import("zat"); 6const db = @import("db.zig"); 7const http = @import("http.zig"); 8 9const POLL_COLLECTION = "tech.waow.pollz.poll"; 10const VOTE_COLLECTION = "tech.waow.pollz.vote"; 11 12const Handler = struct { 13 allocator: Allocator, 14 msg_count: usize = 0, 15 16 pub fn onEvent(self: *Handler, event: zat.JetstreamEvent) void { 17 self.msg_count += 1; 18 if (self.msg_count % 100 == 1) { 19 std.debug.print("jetstream: processed {} events\n", .{self.msg_count}); 20 } 21 22 switch (event) { 23 .commit => |commit| processCommit(self.allocator, commit) catch |err| { 24 std.debug.print("commit processing error: {}\n", .{err}); 25 }, 26 else => {}, 27 } 28 } 29 30 pub fn onConnect(_: *Handler, host: []const u8) void { 31 std.debug.print("jetstream connected to {s}\n", .{host}); 32 } 33 34 pub fn onError(_: *Handler, err: anyerror) void { 35 std.debug.print("jetstream error: {s}\n", .{@errorName(err)}); 36 } 37}; 38 39fn processCommit(allocator: Allocator, commit: zat.jetstream.CommitEvent) !void { 40 const collection = commit.collection; 41 const is_poll = mem.eql(u8, collection, POLL_COLLECTION); 42 const is_vote = mem.eql(u8, collection, VOTE_COLLECTION); 43 if (!is_poll and !is_vote) return; 44 45 const uri = try std.fmt.allocPrint(allocator, "at://{s}/{s}/{s}", .{ commit.did, collection, commit.rkey }); 46 defer allocator.free(uri); 47 48 switch (commit.operation) { 49 .create, .update => { 50 const record = commit.record orelse return; 51 if (record != .object) return; 52 53 if (is_poll) { 54 try processPoll(allocator, uri, commit.did, commit.rkey, record.object); 55 } else { 56 try processVote(allocator, uri, commit.did, record.object); 57 } 58 }, 59 .delete => { 60 if (is_poll) { 61 db.deletePoll(uri); 62 std.debug.print("deleted poll: {s}\n", .{uri}); 63 } else { 64 db.deleteVote(uri); 65 std.debug.print("deleted vote: {s}\n", .{uri}); 66 } 67 }, 68 } 69} 70 71fn processPoll(allocator: Allocator, uri: []const u8, did: []const u8, rkey: []const u8, record: json.ObjectMap) !void { 72 const text_val = record.get("text") orelse return; 73 if (text_val != .string) return; 74 75 const options_val = record.get("options") orelse return; 76 if (options_val != .array) return; 77 78 const created_at_val = record.get("createdAt") orelse return; 79 if (created_at_val != .string) return; 80 81 var options_buf: std.ArrayList(u8) = .empty; 82 defer options_buf.deinit(allocator); 83 try options_buf.print(allocator, "{f}", .{json.fmt(options_val, .{})}); 84 85 var text_buf: std.ArrayList(u8) = .empty; 86 defer text_buf.deinit(allocator); 87 try text_buf.print(allocator, "{f}", .{json.fmt(text_val, .{})}); 88 89 try db.insertPoll(uri, did, rkey, text_buf.items, options_buf.items, created_at_val.string); 90 std.debug.print("indexed poll: {s}\n", .{uri}); 91} 92 93fn processVote(allocator: Allocator, uri: []const u8, did: []const u8, record: json.ObjectMap) !void { 94 const subject_val = record.get("subject") orelse return; 95 if (subject_val != .string) return; 96 97 const option_val = record.get("option") orelse return; 98 if (option_val != .integer) return; 99 100 const created_at: ?[]const u8 = if (record.get("createdAt")) |v| (if (v == .string) v.string else null) else null; 101 102 try db.insertVote(uri, subject_val.string, @as(i32, @intCast(option_val.integer)), did, created_at); 103 std.debug.print("indexed vote: {s} -> {s}\n", .{ uri, subject_val.string }); 104 105 // cache voter profile so handles resolve in the votes API 106 if (db.getProfile(did) == null) { 107 http.fetchAndCacheProfile(allocator, did); 108 } 109} 110 111pub fn start(io: std.Io, allocator: Allocator) void { 112 var client = zat.JetstreamClient.init(io, allocator, .{ 113 .wanted_collections = &.{ POLL_COLLECTION, VOTE_COLLECTION }, 114 }); 115 defer client.deinit(); 116 117 var handler = Handler{ .allocator = allocator }; 118 client.subscribe(&handler) catch |err| { 119 std.debug.print("jetstream subscription ended: {s}\n", .{@errorName(err)}); 120 }; 121}