this repo has no description
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

initial commit

Tim Culverhouse 07e32988

+466
+2
.gitignore
··· 1 + .zig-cache/ 2 + zig-out/
+18
LICENSE
··· 1 + Copyright 2025 Tim Culverhouse 2 + 3 + Permission is hereby granted, free of charge, to any person obtaining a copy of 4 + this software and associated documentation files (the “Software”), to deal in 5 + the Software without restriction, including without limitation the rights to 6 + use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of 7 + the Software, and to permit persons to whom the Software is furnished to do so, 8 + subject to the following conditions: 9 + 10 + The above copyright notice and this permission notice shall be included in all 11 + copies or substantial portions of the Software. 12 + 13 + THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS 15 + FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR 16 + COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER 17 + IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 18 + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+14
README.md
··· 1 + # zig-atproto 2 + 3 + A library for creating ATProtocol based applications using zig. 4 + 5 + ## Features 6 + 7 + - [Ourio](https://github.com/rockorager/ourio) async IO 8 + - Jetstream 9 + 10 + ## TODO 11 + 12 + - DID resolution 13 + - CBOR + lexicon codegen 14 + - OAuth
+25
build.zig
··· 1 + const std = @import("std"); 2 + 3 + pub fn build(b: *std.Build) void { 4 + const target = b.standardTargetOptions(.{}); 5 + const optimize = b.standardOptimizeOption(.{}); 6 + 7 + const ourio_dep = b.dependency("ourio", .{ .target = target, .optimize = optimize }); 8 + 9 + const lib_mod = b.createModule(.{ 10 + .root_source_file = b.path("src/root.zig"), 11 + .target = target, 12 + .optimize = optimize, 13 + }); 14 + lib_mod.addImport("ourio", ourio_dep.module("ourio")); 15 + lib_mod.addImport("stda", ourio_dep.module("stda")); 16 + 17 + const lib_unit_tests = b.addTest(.{ 18 + .root_module = lib_mod, 19 + }); 20 + 21 + const run_lib_unit_tests = b.addRunArtifact(lib_unit_tests); 22 + 23 + const test_step = b.step("test", "Run unit tests"); 24 + test_step.dependOn(&run_lib_unit_tests.step); 25 + }
+21
build.zig.zon
··· 1 + .{ 2 + .name = .zig_atproto, 3 + .version = "0.0.0", 4 + 5 + .fingerprint = 0xbebf555e47399aa4, // Changing this has security and trust implications. 6 + 7 + .minimum_zig_version = "0.14.0", 8 + 9 + .dependencies = .{ 10 + .ourio = .{ 11 + .url = "/home/tim/repos/github.com/rockorager/ourio", 12 + .hash = "ourio-0.0.0-_s-z0SsPAgB0SDzLiMiEKWY9MrHCrdVEPL01OQF4uVcm", 13 + }, 14 + }, 15 + 16 + .paths = .{ 17 + "build.zig", 18 + "build.zig.zon", 19 + "src", 20 + }, 21 + }
+380
src/jetstream.zig
··· 1 + const std = @import("std"); 2 + const stda = @import("stda"); 3 + const ourio = @import("ourio"); 4 + 5 + const Allocator = std.mem.Allocator; 6 + const posix = std.posix; 7 + 8 + /// A stream of events from the jetstream 9 + pub const Stream = struct { 10 + gpa: Allocator, 11 + fd: posix.fd_t = -1, 12 + bundle: std.crypto.Certificate.Bundle, 13 + host: u2, 14 + path: []const u8, 15 + ctx: ourio.Context, 16 + 17 + /// the expected Sec-WebSocket-Accept value 18 + challenge: []const u8, 19 + 20 + state: union(enum) { 21 + connect: *stda.net.ConnectTask, 22 + handshake: *stda.tls.Client.HandshakeTask, 23 + conn: *stda.tls.Client, 24 + }, 25 + 26 + const hosts = [_][]const u8{ 27 + "jetstream1.us-east.bsky.network", 28 + "jetstream2.us-east.bsky.network", 29 + "jetstream1.us-west.bsky.network", 30 + "jetstream2.us-west.bsky.network", 31 + }; 32 + 33 + const guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 34 + 35 + const Msg = enum { 36 + connect, 37 + handshake, 38 + upgrade, 39 + recv, 40 + write, 41 + close, 42 + }; 43 + 44 + pub const Options = struct { 45 + /// array of NSIDs to filter for. Can be wildcard prefixes 46 + collections: []const []const u8 = &.{}, 47 + 48 + /// array of DIDs to filter for 49 + dids: []const []const u8 = &.{}, 50 + 51 + /// maximum message size this client would like to receive 52 + max_msg_size: usize = 0, 53 + 54 + /// microsecond timestamp of when to begin replay of events. Now or in the future results in 55 + /// a live tail of events 56 + cursor_us: ?i64 = null, 57 + }; 58 + 59 + pub fn init( 60 + self: *Stream, 61 + gpa: Allocator, 62 + io: *ourio.Ring, 63 + bundle: std.crypto.Certificate.Bundle, 64 + opts: Options, 65 + ctx: ourio.Context, 66 + ) !void { 67 + var seed: u64 = undefined; 68 + try std.posix.getrandom(std.mem.asBytes(&seed)); 69 + var prng = std.Random.DefaultPrng.init(seed); 70 + const idx = prng.random().int(u2); 71 + const host = hosts[idx]; 72 + const task = try stda.net.tcpConnectToHost(io, host, 443, .{ 73 + .ptr = self, 74 + .msg = @intFromEnum(Msg.connect), 75 + .cb = Stream.onCompletion, 76 + }); 77 + 78 + var list: std.ArrayList(u8) = .init(gpa); 79 + defer list.deinit(); 80 + 81 + for (opts.collections) |collection| { 82 + if (list.items.len > 0) { 83 + try list.append('&'); 84 + } 85 + try list.appendSlice("wantedCollections="); 86 + 87 + if (std.mem.indexOfScalar(u8, collection, '*')) |_| { 88 + try list.appendSlice(std.mem.trimRight(u8, collection, "*")); 89 + try list.appendSlice("%2A"); 90 + } else { 91 + try list.appendSlice(collection); 92 + } 93 + } 94 + 95 + for (opts.dids) |did| { 96 + if (list.items.len > 0) { 97 + try list.append('&'); 98 + } 99 + try list.appendSlice("wantedDids="); 100 + try list.appendSlice(did); 101 + } 102 + 103 + if (opts.max_msg_size > 0) { 104 + if (list.items.len > 0) { 105 + try list.append('&'); 106 + } 107 + try list.writer().print("maxMessageSizeBytes={d}", .{opts.max_msg_size}); 108 + } 109 + 110 + if (opts.cursor_us) |cursor| { 111 + if (list.items.len > 0) { 112 + try list.append('&'); 113 + } 114 + try list.writer().print("cursor={d}", .{cursor}); 115 + } 116 + 117 + const path: []const u8 = if (list.items.len > 0) 118 + try std.fmt.allocPrint(gpa, "?{s}", .{list.items}) 119 + else 120 + ""; 121 + 122 + self.* = .{ 123 + .gpa = gpa, 124 + .fd = -1, 125 + .bundle = bundle, 126 + .state = .{ .connect = task }, 127 + .challenge = "", 128 + .host = idx, 129 + .path = path, 130 + .ctx = ctx, 131 + }; 132 + } 133 + 134 + pub fn deinit(self: *Stream) void { 135 + self.gpa.free(self.path); 136 + self.gpa.free(self.challenge); 137 + switch (self.state) { 138 + .connect => {}, 139 + .handshake => {}, 140 + .conn => |conn| { 141 + conn.deinit(self.gpa); 142 + self.gpa.destroy(conn); 143 + }, 144 + } 145 + } 146 + 147 + fn reportError(self: *Stream, io: *ourio.Ring, err: anyerror) !void { 148 + const task: ourio.Task = .{ 149 + .callback = self.ctx.cb, 150 + .userdata = self.ctx.ptr, 151 + .msg = self.ctx.msg, 152 + .result = .{ .userbytes = err }, 153 + }; 154 + try self.ctx.cb(io, task); 155 + } 156 + 157 + pub fn onCompletion(io: *ourio.Ring, task: ourio.Task) anyerror!void { 158 + const self = task.userdataCast(Stream); 159 + const result = task.result.?; 160 + 161 + switch (task.msgToEnum(Msg)) { 162 + .connect => { 163 + self.fd = result.userfd catch |err| return self.reportError(io, err); 164 + const hs_task = try stda.tls.Client.init( 165 + io, 166 + self.fd, 167 + .{ .host = hosts[self.host], .root_ca = self.bundle }, 168 + .{ 169 + .ptr = self, 170 + .msg = @intFromEnum(Msg.handshake), 171 + .cb = Stream.onCompletion, 172 + }, 173 + ); 174 + 175 + self.state = .{ .handshake = hs_task }; 176 + }, 177 + 178 + .handshake => { 179 + const ptr = result.userptr catch |err| return self.reportError(io, err); 180 + const conn: *stda.tls.Client = @ptrCast(@alignCast(ptr)); 181 + 182 + conn.userdata = self; 183 + conn.callback = Stream.onCompletion; 184 + conn.recv_msg = @intFromEnum(Msg.upgrade); 185 + conn.write_msg = @intFromEnum(Msg.write); 186 + conn.close_msg = @intFromEnum(Msg.close); 187 + 188 + self.state = .{ .conn = conn }; 189 + 190 + var key: [16]u8 = undefined; 191 + std.crypto.random.bytes(&key); 192 + var base64_buf: [24]u8 = undefined; 193 + const base64_key = std.base64.standard.Encoder.encode(&base64_buf, &key); 194 + 195 + // Precompute the Accept response for the challenge 196 + { 197 + var accept_buf: [24 + guid.len]u8 = undefined; 198 + @memcpy(accept_buf[0..24], base64_key); 199 + @memcpy(accept_buf[24..], guid); 200 + var hash_buf: [std.crypto.hash.Sha1.digest_length]u8 = undefined; 201 + std.crypto.hash.Sha1.hash(&accept_buf, &hash_buf, .{}); 202 + self.gpa.free(self.challenge); 203 + const base64_hash = try self.gpa.alloc(u8, 28); 204 + self.challenge = std.base64.standard.Encoder.encode(base64_hash, &hash_buf); 205 + } 206 + 207 + // Write an upgrade request 208 + var writer = conn.cleartext_buf.writer(self.gpa); 209 + try writer.print("GET /subscribe{s} HTTP/1.1\r\n", .{self.path}); 210 + try writer.print("Host: {s}\r\n", .{hosts[self.host]}); 211 + try writer.writeAll("Upgrade: websocket\r\n"); 212 + try writer.writeAll("Connection: Upgrade\r\n"); 213 + try writer.print("Sec-WebSocket-Key: {s}\r\n", .{base64_key}); 214 + try writer.writeAll("Sec-WebSocket-Version: 13\r\n"); 215 + try writer.writeAll("\r\n"); 216 + 217 + try conn.flush(self.gpa, io); 218 + 219 + // Initiate receiving messages 220 + try conn.recv(io); 221 + }, 222 + 223 + .upgrade => { 224 + // Assume we'll always get the header in the first read 225 + const n = result.recv catch |err| return self.reportError(io, err); 226 + const buf = self.state.conn.read_buf[0..n]; 227 + 228 + var iter = std.mem.splitSequence(u8, buf, "\r\n"); 229 + if (!std.mem.startsWith(u8, iter.first(), "HTTP/1.1 101")) { 230 + return error.InvalidUpgrade; 231 + } 232 + 233 + var valid_accept = false; 234 + while (iter.next()) |line| { 235 + if (line.len == 0) break; 236 + var line_iter = std.mem.splitScalar(u8, line, ':'); 237 + if (std.ascii.eqlIgnoreCase("Sec-WebSocket-Accept", line_iter.first())) { 238 + const value = std.mem.trim(u8, line_iter.rest(), &std.ascii.whitespace); 239 + valid_accept = std.mem.eql(u8, value, self.challenge); 240 + } 241 + } 242 + 243 + if (!valid_accept) { 244 + return error.InvalidSecWebSocketAccept; 245 + } 246 + 247 + self.state.conn.recv_msg = @intFromEnum(Msg.recv); 248 + 249 + try self.decodeFrames(io, iter.rest()); 250 + }, 251 + 252 + .recv => { 253 + const n = try result.recv; 254 + const buf = self.state.conn.read_buf[0..n]; 255 + try self.decodeFrames(io, buf); 256 + @panic("here"); 257 + }, 258 + 259 + .write => { 260 + _ = result.write catch |err| return self.reportError(io, err); 261 + }, 262 + 263 + .close => { 264 + _ = result.close catch |err| return self.reportError(io, err); 265 + }, 266 + } 267 + } 268 + 269 + fn decodeFrames(self: *Stream, io: *ourio.Ring, bytes: []const u8) !void { 270 + var iter: FrameIterator = .{ .bytes = bytes }; 271 + while (try iter.next()) |data| { 272 + switch (data) { 273 + .text => |s| { 274 + const task: ourio.Task = .{ 275 + .callback = self.ctx.cb, 276 + .userdata = self.ctx.ptr, 277 + .msg = self.ctx.msg, 278 + .result = .{ .userbytes = s }, 279 + }; 280 + try self.ctx.cb(io, task); 281 + }, 282 + else => return error.UnsupportedOp, 283 + } 284 + } 285 + } 286 + }; 287 + 288 + const FrameIterator = struct { 289 + bytes: []const u8, 290 + idx: usize = 0, 291 + 292 + const Data = union(enum) { 293 + continuation: []const u8, 294 + text: []const u8, 295 + binary: []const u8, 296 + close, 297 + ping, 298 + pong, 299 + }; 300 + 301 + fn next(self: *FrameIterator) !?Data { 302 + if (self.idx + 2 >= self.bytes.len) return null; 303 + 304 + const Byte1 = packed struct { 305 + opcode: enum(u4) { 306 + continuation = 0x0, 307 + text = 0x1, 308 + binary = 0x2, 309 + close = 0x8, 310 + ping = 0x9, 311 + pong = 0xA, 312 + _, 313 + }, 314 + reserved: u3, 315 + final: bool, 316 + }; 317 + 318 + const Byte2 = packed struct { 319 + len: u7, 320 + mask: bool, 321 + }; 322 + 323 + const byte1: Byte1 = @bitCast(self.bytes[self.idx]); 324 + const byte2: Byte2 = @bitCast(self.bytes[self.idx + 1]); 325 + self.idx += 2; 326 + 327 + if (!byte1.final) return error.ContinuationFramesNotSupportedYet; 328 + if (byte2.mask) return error.MaskNotSupported; 329 + 330 + const len: usize, const data_start: usize = switch (byte2.len) { 331 + 126 => .{ std.mem.readInt(u16, self.bytes[self.idx..][0..2], .big), self.idx + 2 }, 332 + 127 => .{ std.mem.readInt(u64, self.bytes[self.idx..][0..8], .big), self.idx + 8 }, 333 + else => .{ byte2.len, self.idx }, 334 + }; 335 + 336 + const end = data_start + len; 337 + defer self.idx = end; 338 + 339 + switch (byte1.opcode) { 340 + .continuation => return .{ .continuation = self.bytes[data_start..end] }, 341 + .text => return .{ .text = self.bytes[data_start..end] }, 342 + .binary => return .{ .binary = self.bytes[data_start..end] }, 343 + .close => return .close, 344 + .ping => return .ping, 345 + .pong => return .pong, 346 + else => return error.InvalidOpcode, 347 + } 348 + } 349 + 350 + fn rest(self: *FrameIterator) []const u8 { 351 + if (self.idx >= self.bytes.len) return ""; 352 + return self.bytes[self.idx..]; 353 + } 354 + }; 355 + 356 + fn testHandler(_: *ourio.Ring, task: ourio.Task) anyerror!void { 357 + const s = try task.result.?.userbytes; 358 + const value = try std.json.parseFromSlice(std.json.Value, std.testing.allocator, s, .{}); 359 + defer value.deinit(); 360 + const pretty = try std.json.stringifyAlloc(std.testing.allocator, value.value, .{ .whitespace = .indent_2 }); 361 + defer std.testing.allocator.free(pretty); 362 + std.log.err("json={s}", .{pretty}); 363 + } 364 + 365 + test { 366 + const gpa = std.testing.allocator; 367 + 368 + var bundle: std.crypto.Certificate.Bundle = .{}; 369 + try bundle.rescan(gpa); 370 + defer bundle.deinit(gpa); 371 + 372 + var io: ourio.Ring = try .init(gpa, 16); 373 + defer io.deinit(); 374 + 375 + var stream: Stream = undefined; 376 + try stream.init(gpa, &io, bundle, .{}, .{ .cb = testHandler }); 377 + defer stream.deinit(); 378 + 379 + try io.run(.until_done); 380 + }
+6
src/root.zig
··· 1 + const std = @import("std"); 2 + const testing = std.testing; 3 + 4 + test { 5 + _ = @import("jetstream.zig"); 6 + }