polls on atproto
pollz.waow.tech
atproto
zig
1const std = @import("std");
2const mem = std.mem;
3const json = std.json;
4const Allocator = mem.Allocator;
5const zat = @import("zat");
6const db = @import("db.zig");
7const http = @import("http.zig");
8
/// Collection name identifying poll records; compared against
/// `commit.collection` and requested from the Jetstream client.
const POLL_COLLECTION = "tech.waow.pollz.poll";

/// Collection name identifying vote records; compared against
/// `commit.collection` and requested from the Jetstream client.
const VOTE_COLLECTION = "tech.waow.pollz.vote";
11
/// Callback receiver handed to `client.subscribe` in `start`.
/// Keeps a running event count and forwards commit events to `processCommit`.
const Handler = struct {
    /// Allocator passed through to `processCommit`.
    allocator: Allocator,
    /// Number of events seen by `onEvent` so far.
    msg_count: usize = 0,

    /// Handles a single event.
    ///
    /// Increments the counter, prints a progress line on the 1st, 101st,
    /// 201st, ... event, and processes the event if it is a commit. Any error
    /// from commit processing is printed and dropped; nothing is propagated.
    pub fn onEvent(self: *Handler, event: zat.JetstreamEvent) void {
        const seen = self.msg_count + 1;
        self.msg_count = seen;
        if (seen % 100 == 1) {
            std.debug.print("jetstream: processed {} events\n", .{seen});
        }

        // Only commit events carry records we index; everything else is ignored.
        const commit = switch (event) {
            .commit => |payload| payload,
            else => return,
        };

        processCommit(self.allocator, commit) catch |err| {
            std.debug.print("commit processing error: {}\n", .{err});
        };
    }

    /// Prints the host that was connected to.
    pub fn onConnect(_: *Handler, host: []const u8) void {
        std.debug.print("jetstream connected to {s}\n", .{host});
    }

    /// Prints the name of the reported error.
    pub fn onError(_: *Handler, err: anyerror) void {
        const name = @errorName(err);
        std.debug.print("jetstream error: {s}\n", .{name});
    }
};
38
/// Applies one commit event to the database.
///
/// Commits whose collection is neither `POLL_COLLECTION` nor `VOTE_COLLECTION`
/// are ignored. For the rest an `at://{did}/{collection}/{rkey}` URI is built
/// (freed before returning) and:
///   - `.delete` removes the poll or vote with that URI;
///   - `.create` / `.update` index the record via `processPoll` or
///     `processVote`, provided a record is present and is a JSON object.
///
/// Errors from URI allocation or from the poll/vote processors are returned.
fn processCommit(allocator: Allocator, commit: zat.jetstream.CommitEvent) !void {
    const is_poll = mem.eql(u8, commit.collection, POLL_COLLECTION);
    const is_vote = mem.eql(u8, commit.collection, VOTE_COLLECTION);
    if (!(is_poll or is_vote)) return;

    const uri = try std.fmt.allocPrint(
        allocator,
        "at://{s}/{s}/{s}",
        .{ commit.did, commit.collection, commit.rkey },
    );
    defer allocator.free(uri);

    switch (commit.operation) {
        .delete => {
            if (is_poll) {
                db.deletePoll(uri);
                std.debug.print("deleted poll: {s}\n", .{uri});
            } else {
                db.deleteVote(uri);
                std.debug.print("deleted vote: {s}\n", .{uri});
            }
        },
        .create, .update => {
            // A missing or non-object record cannot be indexed; skip it.
            const fields = if (commit.record) |rec| switch (rec) {
                .object => |obj| obj,
                else => return,
            } else return;

            if (is_poll) {
                try processPoll(allocator, uri, commit.did, commit.rkey, fields);
            } else {
                try processVote(allocator, uri, commit.did, fields);
            }
        },
    }
}
70
/// Indexes a poll record.
///
/// The record must have a string `text`, an array `options`, and a string
/// `createdAt`; otherwise the function returns without doing anything.
/// `text` and `options` are re-serialized as JSON text (so `text` is passed on
/// in its quoted JSON form) and handed to `db.insertPoll` together with the
/// raw `createdAt` string. The temporary buffers are freed before returning.
///
/// Returns allocation errors and any error from `db.insertPoll`.
fn processPoll(allocator: Allocator, uri: []const u8, did: []const u8, rkey: []const u8, record: json.ObjectMap) !void {
    const text = record.get("text") orelse return;
    if (text != .string) return;
    const options = record.get("options") orelse return;
    if (options != .array) return;
    const created_at = record.get("createdAt") orelse return;
    if (created_at != .string) return;

    var options_json: std.ArrayList(u8) = .empty;
    defer options_json.deinit(allocator);
    try options_json.print(allocator, "{f}", .{json.fmt(options, .{})});

    var text_json: std.ArrayList(u8) = .empty;
    defer text_json.deinit(allocator);
    try text_json.print(allocator, "{f}", .{json.fmt(text, .{})});

    try db.insertPoll(
        uri,
        did,
        rkey,
        text_json.items,
        options_json.items,
        created_at.string,
    );
    std.debug.print("indexed poll: {s}\n", .{uri});
}
92
/// Indexes a vote record.
///
/// The record must have a string `subject` and an integer `option` that fits
/// in an `i32`; otherwise the record is skipped. `createdAt` is optional and
/// is forwarded only when it is a string. After the vote is inserted, the
/// voter's profile is fetched and cached if `db.getProfile` has no entry for
/// `did`.
///
/// Returns any error from `db.insertVote`.
fn processVote(allocator: Allocator, uri: []const u8, did: []const u8, record: json.ObjectMap) !void {
    const subject_val = record.get("subject") orelse return;
    if (subject_val != .string) return;

    const option_val = record.get("option") orelse return;
    if (option_val != .integer) return;

    // The record comes from the network, so the i64 may not fit in i32.
    // A bare @intCast would be illegal behavior on such input (panic in safe
    // builds, UB in ReleaseFast); use a checked cast and skip the record.
    const option_index = std.math.cast(i32, option_val.integer) orelse {
        std.debug.print("ignoring vote {s}: option {d} out of range\n", .{ uri, option_val.integer });
        return;
    };

    const created_at: ?[]const u8 = if (record.get("createdAt")) |v|
        (if (v == .string) v.string else null)
    else
        null;

    try db.insertVote(uri, subject_val.string, option_index, did, created_at);
    std.debug.print("indexed vote: {s} -> {s}\n", .{ uri, subject_val.string });

    // cache voter profile so handles resolve in the votes API
    if (db.getProfile(did) == null) {
        http.fetchAndCacheProfile(allocator, did);
    }
}
110
/// Runs the Jetstream consumer.
///
/// Creates a client that requests only the poll and vote collections,
/// subscribes a fresh `Handler` to it, and tears the client down when
/// `subscribe` returns. A subscription error is printed rather than returned.
pub fn start(io: std.Io, allocator: Allocator) void {
    var jetstream = zat.JetstreamClient.init(io, allocator, .{
        .wanted_collections = &.{ POLL_COLLECTION, VOTE_COLLECTION },
    });
    defer jetstream.deinit();

    var event_handler: Handler = .{ .allocator = allocator };
    jetstream.subscribe(&event_handler) catch |err| {
        const reason = @errorName(err);
        std.debug.print("jetstream subscription ended: {s}\n", .{reason});
    };
}