atproto utils for zig zat.dev
atproto sdk zig
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

add checked xrpc errors and retries

+365 -43
+116 -42
src/internal/xrpc/transport.zig
··· 60 60 return try self.fetchResolved(options, resolved, headers, extra_buf[0..extra_count]); 61 61 } 62 62 63 - if (options.max_response_size) |max| { 64 - const body_buf = try self.allocator.alloc(u8, max); 65 - defer self.allocator.free(body_buf); 66 - var writer = std.Io.Writer.fixed(body_buf); 63 + return try self.fetchUrl(options, headers, extra_buf[0..extra_count]); 64 + } 67 65 68 - const result = self.http_client.fetch(.{ 69 - .location = .{ .url = options.url }, 70 - .response_writer = &writer, 71 - .method = options.method, 72 - .payload = options.payload, 73 - .headers = headers, 74 - .extra_headers = extra_buf[0..extra_count], 75 - .keep_alive = self.keep_alive, 76 - .redirect_behavior = options.redirect_behavior, 77 - }) catch |err| switch (err) { 78 - error.WriteFailed => return error.ResponseTooLarge, 79 - else => |e| return e, 80 - }; 66 + fn fetchUrl( 67 + self: *HttpTransport, 68 + options: FetchOptions, 69 + headers: std.http.Client.Request.Headers, 70 + extra_headers: []const std.http.Header, 71 + ) !FetchResult { 72 + const uri = try std.Uri.parse(options.url); 73 + const redirect_behavior = redirectBehavior(options); 74 + var request = try self.http_client.request(options.method, uri, .{ 75 + .headers = headers, 76 + .extra_headers = extra_headers, 77 + .keep_alive = self.keep_alive, 78 + .redirect_behavior = redirect_behavior, 79 + }); 80 + defer request.deinit(); 81 81 82 - return .{ 83 - .status = result.status, 84 - .body = try self.allocator.dupe(u8, writer.buffered()), 85 - }; 82 + if (options.payload) |payload| { 83 + request.transfer_encoding = .{ .content_length = payload.len }; 84 + var body = try request.sendBodyUnflushed(&.{}); 85 + try body.writer.writeAll(payload); 86 + try body.end(); 87 + try request.connection.?.flush(); 88 + } else { 89 + try request.sendBodiless(); 86 90 } 87 91 88 - var aw: std.Io.Writer.Allocating = .init(self.allocator); 89 - defer aw.deinit(); 92 + const redirect_buffer = try self.allocRedirectBuffer(redirect_behavior); 93 + defer self.freeRedirectBuffer(redirect_buffer, redirect_behavior); 90 94 91 - const result = try self.http_client.fetch(.{ 92 - .location = .{ .url = options.url }, 93 - .response_writer = &aw.writer, 94 - .method = options.method, 95 - .payload = options.payload, 96 - .headers = headers, 97 - .extra_headers = extra_buf[0..extra_count], 98 - .keep_alive = self.keep_alive, 99 - .redirect_behavior = options.redirect_behavior, 100 - }); 101 - 102 - return .{ 103 - .status = result.status, 104 - .body = try self.allocator.dupe(u8, aw.written()), 105 - }; 95 + var response = try request.receiveHead(redirect_buffer); 96 + return try self.readFetchResult(options, &response); 106 97 } 107 98 108 99 fn fetchResolved( ··· 130 121 .proxied_host = logical_host, 131 122 }); 132 123 124 + const redirect_behavior = redirectBehavior(options); 133 125 var request = self.http_client.request(options.method, uri, .{ 134 126 .connection = connection, 135 127 .headers = headers, 136 128 .extra_headers = extra_headers, 137 129 .keep_alive = self.keep_alive, 138 - .redirect_behavior = options.redirect_behavior orelse .unhandled, 130 + .redirect_behavior = redirect_behavior, 139 131 }) catch |err| { 140 132 self.http_client.connection_pool.release(connection, self.io); 141 133 return err; ··· 152 144 try request.sendBodiless(); 153 145 } 154 146 155 - var response = try request.receiveHead(&.{}); 147 + const redirect_buffer = try self.allocRedirectBuffer(redirect_behavior); 148 + defer self.freeRedirectBuffer(redirect_buffer, redirect_behavior); 149 + 150 + var response = try request.receiveHead(redirect_buffer); 151 + return try self.readFetchResult(options, &response); 152 + } 153 + 154 + fn allocRedirectBuffer(self: *HttpTransport, behavior: std.http.Client.Request.RedirectBehavior) ![]u8 { 155 + if (behavior == .unhandled) return &.{}; 156 + return try self.allocator.alloc(u8, 8 * 1024); 157 + } 158 + 159 + fn freeRedirectBuffer(self: *HttpTransport, buffer: []u8, behavior: std.http.Client.Request.RedirectBehavior) void { 160 + if (behavior != .unhandled) self.allocator.free(buffer); 161 + } 162 + 163 + fn readFetchResult( 164 + self: *HttpTransport, 165 + options: FetchOptions, 166 + response: *std.http.Client.Response, 167 + ) !FetchResult { 168 + const rate_limit = RateLimitHeaders.fromResponseHead(response.head); 169 + 156 170 if (options.max_response_size) |max| { 157 171 const body_buf = try self.allocator.alloc(u8, max); 158 172 defer self.allocator.free(body_buf); 159 173 var writer = std.Io.Writer.fixed(body_buf); 160 - try streamResponseBody(&response, &writer); 174 + try streamResponseBody(response, &writer); 161 175 return .{ 162 176 .status = response.head.status, 163 177 .body = try self.allocator.dupe(u8, writer.buffered()), 178 + .rate_limit = rate_limit, 164 179 }; 165 180 } 166 181 167 182 var aw: std.Io.Writer.Allocating = .init(self.allocator); 168 183 defer aw.deinit(); 169 - try streamResponseBody(&response, &aw.writer); 184 + try streamResponseBody(response, &aw.writer); 170 185 return .{ 171 186 .status = response.head.status, 172 187 .body = try self.allocator.dupe(u8, aw.written()), 188 + .rate_limit = rate_limit, 173 189 }; 174 190 } 175 191 ··· 196 212 pub const FetchResult = struct { 197 213 status: std.http.Status, 198 214 body: []u8, 215 + rate_limit: RateLimitHeaders = .{}, 216 + }; 217 + 218 + pub const RateLimitHeaders = struct { 219 + limit: ?u64 = null, 220 + remaining: ?u64 = null, 221 + reset: ?u64 = null, 222 + retry_after: ?u64 = null, 223 + 224 + pub fn fromResponseHead(head: std.http.Client.Response.Head) RateLimitHeaders { 225 + var result: RateLimitHeaders = .{}; 226 + var it = head.iterateHeaders(); 227 + while (it.next()) |header| { 228 + if (std.ascii.eqlIgnoreCase(header.name, "ratelimit-limit")) { 229 + result.limit = parseHeaderInt(header.value); 230 + } else if (std.ascii.eqlIgnoreCase(header.name, "ratelimit-remaining")) { 231 + result.remaining = parseHeaderInt(header.value); 232 + } else if (std.ascii.eqlIgnoreCase(header.name, "ratelimit-reset")) { 233 + result.reset = parseHeaderInt(header.value); 234 + } else if (std.ascii.eqlIgnoreCase(header.name, "retry-after")) { 235 + result.retry_after = parseHeaderInt(header.value); 236 + } 237 + } 238 + return result; 239 + } 240 + 241 + pub fn isEmpty(self: RateLimitHeaders) bool { 242 + return self.limit == null and self.remaining == null and self.reset == null and self.retry_after == null; 243 + } 199 244 }; 200 245 }; 246 + 247 + fn redirectBehavior(options: HttpTransport.FetchOptions) std.http.Client.Request.RedirectBehavior { 248 + return options.redirect_behavior orelse if (options.payload == null) 249 + std.http.Client.Request.RedirectBehavior.init(3) 250 + else 251 + .unhandled; 252 + } 253 + 254 + fn parseHeaderInt(value: []const u8) ?u64 { 255 + const trimmed = std.mem.trim(u8, value, " \t"); 256 + return std.fmt.parseInt(u64, trimmed, 10) catch null; 257 + } 201 258 202 259 fn streamResponseBody(response: *std.http.Client.Response, writer: *std.Io.Writer) !void { 203 260 var transfer_buffer: [64]u8 = undefined; ··· 244 301 }, 245 302 })); 246 303 } 304 + 305 + test "transport parses rate limit headers" { 306 + const response_bytes = "HTTP/1.1 429 Too Many Requests\r\n" ++ 307 + "RateLimit-Limit: 3000\r\n" ++ 308 + "ratelimit-remaining: 0\r\n" ++ 309 + "RateLimit-Reset: 1710000000\r\n" ++ 310 + "Retry-After: 2\r\n\r\n"; 311 + 312 + const head = try std.http.Client.Response.Head.parse(response_bytes); 313 + const headers = HttpTransport.RateLimitHeaders.fromResponseHead(head); 314 + 315 + try std.testing.expectEqual(@as(?u64, 3000), headers.limit); 316 + try std.testing.expectEqual(@as(?u64, 0), headers.remaining); 317 + try std.testing.expectEqual(@as(?u64, 1710000000), headers.reset); 318 + try std.testing.expectEqual(@as(?u64, 2), headers.retry_after); 319 + try std.testing.expect(!headers.isEmpty()); 320 + }
+249 -1
src/internal/xrpc/xrpc.zig
··· 8 8 const std = @import("std"); 9 9 const Nsid = @import("../syntax/nsid.zig").Nsid; 10 10 const HttpTransport = @import("transport.zig").HttpTransport; 11 + const json_helpers = @import("json.zig"); 11 12 12 13 pub const XrpcClient = struct { 13 14 allocator: std.mem.Allocator, ··· 55 56 return try self.doRequest(url, body); 56 57 } 57 58 59 + pub fn queryChecked( 60 + self: *XrpcClient, 61 + nsid: Nsid, 62 + params: ?std.StringHashMap([]const u8), 63 + retry_policy: RetryPolicy, 64 + ) !Result { 65 + const url = try self.buildUrl(nsid, params); 66 + defer self.allocator.free(url); 67 + 68 + return try self.requestCheckedUrl(url, null, retry_policy); 69 + } 70 + 71 + pub fn procedureChecked( 72 + self: *XrpcClient, 73 + nsid: Nsid, 74 + body: ?[]const u8, 75 + retry_policy: RetryPolicy, 76 + ) !Result { 77 + const url = try self.buildUrl(nsid, null); 78 + defer self.allocator.free(url); 79 + 80 + return try self.requestCheckedUrl(url, body, retry_policy); 81 + } 82 + 58 83 fn buildUrl(self: *XrpcClient, nsid: Nsid, params: ?std.StringHashMap([]const u8)) ![]u8 { 59 84 var url: std.ArrayList(u8) = .empty; 60 85 errdefer url.deinit(self.allocator); ··· 103 128 .allocator = self.allocator, 104 129 .status = result.status, 105 130 .body = result.body, 131 + .rate_limit = result.rate_limit, 106 132 }; 107 133 } 108 134 135 + fn requestCheckedUrl(self: *XrpcClient, url: []const u8, body: ?[]const u8, retry_policy: RetryPolicy) !Result { 136 + const attempts = @max(@as(u8, 1), retry_policy.max_attempts); 137 + var attempt: u8 = 0; 138 + 139 + while (true) : (attempt += 1) { 140 + var response = self.doRequest(url, body) catch |err| { 141 + if (attempt + 1 >= attempts or !retry_policy.retry_transient_errors or !isRetryableTransportError(err)) { 142 + return err; 143 + } 144 + try retry_policy.sleepBeforeRetry(self.transport.io, attempt, null); 145 + continue; 146 + }; 147 + 148 + if (response.ok()) { 149 + return .{ .ok = response }; 150 + } 151 + 152 + if (attempt + 1 < attempts and retry_policy.isRetryableStatus(response.status)) { 153 + const rate_limit = response.rate_limit; 154 + response.deinit(); 155 + try retry_policy.sleepBeforeRetry(self.transport.io, attempt, rate_limit); 156 + continue; 157 + } 158 + 159 + return .{ .err = try XrpcError.fromResponse(response) }; 160 + } 161 + } 162 + 109 163 pub const Response = struct { 110 164 allocator: std.mem.Allocator, 111 165 status: std.http.Status, 112 166 body: []u8, 167 + rate_limit: HttpTransport.RateLimitHeaders = .{}, 113 168 114 169 pub fn deinit(self: *Response) void { 115 170 self.allocator.free(self.body); ··· 117 172 118 173 /// check if request succeeded 119 174 pub fn ok(self: Response) bool { 120 - return self.status == .ok; 175 + return self.status.class() == .success; 121 176 } 122 177 123 178 /// parse body as json ··· 125 180 return try std.json.parseFromSlice(std.json.Value, self.allocator, self.body, .{}); 126 181 } 127 182 }; 183 + 184 + pub const Result = union(enum) { 185 + ok: Response, 186 + err: XrpcError, 187 + 188 + pub fn deinit(self: *Result) void { 189 + switch (self.*) { 190 + .ok => |*response| response.deinit(), 191 + .err => |*xrpc_error| xrpc_error.deinit(), 192 + } 193 + } 194 + }; 195 + 196 + pub const XrpcError = struct { 197 + allocator: std.mem.Allocator, 198 + status: std.http.Status, 199 + error_name: ?[]u8 = null, 200 + message: ?[]u8 = null, 201 + body: []u8, 202 + rate_limit: HttpTransport.RateLimitHeaders = .{}, 203 + 204 + pub fn fromResponse(response: Response) !XrpcError { 205 + var result: XrpcError = .{ 206 + .allocator = response.allocator, 207 + .status = response.status, 208 + .body = response.body, 209 + .rate_limit = response.rate_limit, 210 + }; 211 + errdefer result.deinit(); 212 + 213 + var parsed = std.json.parseFromSlice(std.json.Value, response.allocator, response.body, .{}) catch return result; 214 + defer parsed.deinit(); 215 + 216 + if (json_helpers.getString(parsed.value, "error")) |name| { 217 + result.error_name = try response.allocator.dupe(u8, name); 218 + } 219 + if (json_helpers.getString(parsed.value, "message")) |message| { 220 + result.message = try response.allocator.dupe(u8, message); 221 + } 222 + 223 + return result; 224 + } 225 + 226 + pub fn deinit(self: *XrpcError) void { 227 + if (self.error_name) |name| self.allocator.free(name); 228 + if (self.message) |message| self.allocator.free(message); 229 + self.allocator.free(self.body); 230 + } 231 + }; 232 + 233 + pub const RetryPolicy = struct { 234 + max_attempts: u8 = 3, 235 + base_delay_ms: u64 = 500, 236 + max_delay_ms: u64 = 30_000, 237 + jitter_percent: u8 = 20, 238 + retry_transient_errors: bool = true, 239 + 240 + pub fn none() RetryPolicy { 241 + return .{ .max_attempts = 1 }; 242 + } 243 + 244 + pub fn isRetryableStatus(_: RetryPolicy, status: std.http.Status) bool { 245 + return switch (@intFromEnum(status)) { 246 + 429, 500, 502, 503, 504 => true, 247 + else => false, 248 + }; 249 + } 250 + 251 + pub fn delayMillis(self: RetryPolicy, attempt: u8, rate_limit: ?HttpTransport.RateLimitHeaders) u64 { 252 + return self.delayMillisAt(attempt, rate_limit, null); 253 + } 254 + 255 + pub fn delayMillisAt( 256 + self: RetryPolicy, 257 + attempt: u8, 258 + rate_limit: ?HttpTransport.RateLimitHeaders, 259 + now_unix_seconds: ?u64, 260 + ) u64 { 261 + if (rate_limit) |headers| { 262 + if (headers.retry_after) |seconds| { 263 + const milliseconds = std.math.mul(u64, seconds, std.time.ms_per_s) catch return self.max_delay_ms; 264 + return @min(milliseconds, self.max_delay_ms); 265 + } 266 + if (now_unix_seconds) |now| { 267 + if (headers.reset) |reset| { 268 + if (reset > now) { 269 + const seconds = reset - now; 270 + const milliseconds = std.math.mul(u64, seconds, std.time.ms_per_s) catch return self.max_delay_ms; 271 + return @min(milliseconds, self.max_delay_ms); 272 + } 273 + } 274 + } 275 + } 276 + 277 + const shift: u6 = @intCast(@min(attempt, 16)); 278 + const base = self.base_delay_ms * (@as(u64, 1) << shift); 279 + return @min(base, self.max_delay_ms); 280 + } 281 + 282 + pub fn sleepBeforeRetry(self: RetryPolicy, io: std.Io, attempt: u8, rate_limit: ?HttpTransport.RateLimitHeaders) !void { 283 + const now_seconds: ?u64 = if (rate_limit != null) 284 + @intCast(@max(@as(i64, 0), std.Io.Clock.real.now(io).toSeconds())) 285 + else 286 + null; 287 + var delay_ms = self.delayMillisAt(attempt, rate_limit, now_seconds); 288 + delay_ms = self.jitteredDelayMillis(io, delay_ms); 289 + if (delay_ms == 0) return; 290 + try io.sleep(std.Io.Duration.fromMilliseconds(@intCast(delay_ms)), .awake); 291 + } 292 + 293 + fn jitteredDelayMillis(self: RetryPolicy, io: std.Io, delay_ms: u64) u64 { 294 + if (delay_ms == 0 or self.jitter_percent == 0) return delay_ms; 295 + 296 + const spread = delay_ms * @as(u64, self.jitter_percent) / 100; 297 + if (spread == 0) return delay_ms; 298 + 299 + var source: std.Random.IoSource = .{ .io = io }; 300 + const random = source.interface(); 301 + const min = delay_ms - spread; 302 + const max = std.math.add(u64, delay_ms, spread) catch std.math.maxInt(u64); 303 + return @min(random.intRangeAtMost(u64, min, max), self.max_delay_ms); 304 + } 305 + }; 128 306 }; 129 307 308 + fn isRetryableTransportError(err: anyerror) bool { 309 + return switch (err) { 310 + error.ConnectionRefused, 311 + error.ConnectionResetByPeer, 312 + error.HostUnreachable, 313 + error.NetworkUnreachable, 314 + error.NetworkDown, 315 + error.Timeout, 316 + error.TlsInitializationFailed, 317 + error.Unexpected, 318 + error.ReadFailed, 319 + error.WriteFailed, 320 + => true, 321 + else => false, 322 + }; 323 + } 324 + 130 325 // === tests === 131 326 132 327 test "build url without params" { ··· 155 350 try std.testing.expect(std.mem.startsWith(u8, url, "https://bsky.social/xrpc/app.bsky.actor.getProfile?")); 156 351 try std.testing.expect(std.mem.indexOf(u8, url, "actor=did%3Aplc%3Atest123") != null); 157 352 } 353 + 354 + test "xrpc error parses atproto error envelope and rate limits" { 355 + const body = try std.testing.allocator.dupe(u8, 356 + \\{"error":"RateLimitExceeded","message":"slow down"} 357 + ); 358 + 359 + const response: XrpcClient.Response = .{ 360 + .allocator = std.testing.allocator, 361 + .status = .too_many_requests, 362 + .body = body, 363 + .rate_limit = .{ 364 + .limit = 3000, 365 + .remaining = 0, 366 + .reset = 1710000000, 367 + .retry_after = 2, 368 + }, 369 + }; 370 + 371 + var xrpc_error = try XrpcClient.XrpcError.fromResponse(response); 372 + defer xrpc_error.deinit(); 373 + 374 + try std.testing.expectEqual(.too_many_requests, xrpc_error.status); 375 + try std.testing.expectEqualStrings("RateLimitExceeded", xrpc_error.error_name.?); 376 + try std.testing.expectEqualStrings("slow down", xrpc_error.message.?); 377 + try std.testing.expectEqual(@as(?u64, 3000), xrpc_error.rate_limit.limit); 378 + try std.testing.expectEqual(@as(?u64, 0), xrpc_error.rate_limit.remaining); 379 + try std.testing.expectEqual(@as(?u64, 1710000000), xrpc_error.rate_limit.reset); 380 + try std.testing.expectEqual(@as(?u64, 2), xrpc_error.rate_limit.retry_after); 381 + } 382 + 383 + test "retry policy is conservative and deterministic" { 384 + const policy: XrpcClient.RetryPolicy = .{ 385 + .base_delay_ms = 100, 386 + .max_delay_ms = 1000, 387 + .jitter_percent = 0, 388 + }; 389 + 390 + try std.testing.expect(policy.isRetryableStatus(.too_many_requests)); 391 + try std.testing.expect(policy.isRetryableStatus(.internal_server_error)); 392 + try std.testing.expect(policy.isRetryableStatus(.bad_gateway)); 393 + try std.testing.expect(policy.isRetryableStatus(.service_unavailable)); 394 + try std.testing.expect(policy.isRetryableStatus(.gateway_timeout)); 395 + try std.testing.expect(!policy.isRetryableStatus(.bad_request)); 396 + try std.testing.expect(!policy.isRetryableStatus(.unauthorized)); 397 + try std.testing.expect(!policy.isRetryableStatus(.not_found)); 398 + 399 + try std.testing.expectEqual(@as(u64, 100), policy.delayMillis(0, null)); 400 + try std.testing.expectEqual(@as(u64, 200), policy.delayMillis(1, null)); 401 + try std.testing.expectEqual(@as(u64, 1000), policy.delayMillis(10, null)); 402 + try std.testing.expectEqual(@as(u64, 1000), policy.delayMillis(0, .{ .retry_after = 5 })); 403 + try std.testing.expectEqual(@as(u64, 1000), policy.delayMillisAt(0, .{ .reset = 1005 }, 1000)); 404 + try std.testing.expectEqual(@as(u64, 100), policy.delayMillisAt(0, .{ .reset = 999 }, 1000)); 405 + }