atproto utils for zig zat.dev
atproto sdk zig
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

add checked xrpc errors and retries

authored by

zzstoatzz and committed by
Tangled
8ba4cc0c 8287ff23

+365 -43
+116 -42
src/internal/xrpc/transport.zig
··· 60 60 return try self.fetchResolved(options, resolved, headers, extra_buf[0..extra_count]); 61 61 } 62 62 63 - if (options.max_response_size) |max| { 64 - const body_buf = try self.allocator.alloc(u8, max); 65 - defer self.allocator.free(body_buf); 66 - var writer = std.Io.Writer.fixed(body_buf); 63 + return try self.fetchUrl(options, headers, extra_buf[0..extra_count]); 64 + } 67 65 68 - const result = self.http_client.fetch(.{ 69 - .location = .{ .url = options.url }, 70 - .response_writer = &writer, 71 - .method = options.method, 72 - .payload = options.payload, 73 - .headers = headers, 74 - .extra_headers = extra_buf[0..extra_count], 75 - .keep_alive = self.keep_alive, 76 - .redirect_behavior = options.redirect_behavior, 77 - }) catch |err| switch (err) { 78 - error.WriteFailed => return error.ResponseTooLarge, 79 - else => |e| return e, 80 - }; 66 + fn fetchUrl( 67 + self: *HttpTransport, 68 + options: FetchOptions, 69 + headers: std.http.Client.Request.Headers, 70 + extra_headers: []const std.http.Header, 71 + ) !FetchResult { 72 + const uri = try std.Uri.parse(options.url); 73 + const redirect_behavior = redirectBehavior(options); 74 + var request = try self.http_client.request(options.method, uri, .{ 75 + .headers = headers, 76 + .extra_headers = extra_headers, 77 + .keep_alive = self.keep_alive, 78 + .redirect_behavior = redirect_behavior, 79 + }); 80 + defer request.deinit(); 81 81 82 - return .{ 83 - .status = result.status, 84 - .body = try self.allocator.dupe(u8, writer.buffered()), 85 - }; 82 + if (options.payload) |payload| { 83 + request.transfer_encoding = .{ .content_length = payload.len }; 84 + var body = try request.sendBodyUnflushed(&.{}); 85 + try body.writer.writeAll(payload); 86 + try body.end(); 87 + try request.connection.?.flush(); 88 + } else { 89 + try request.sendBodiless(); 86 90 } 87 91 88 - var aw: std.Io.Writer.Allocating = .init(self.allocator); 89 - defer aw.deinit(); 92 + const redirect_buffer = try self.allocRedirectBuffer(redirect_behavior); 93 + defer self.freeRedirectBuffer(redirect_buffer, redirect_behavior); 90 94 91 - const result = try self.http_client.fetch(.{ 92 - .location = .{ .url = options.url }, 93 - .response_writer = &aw.writer, 94 - .method = options.method, 95 - .payload = options.payload, 96 - .headers = headers, 97 - .extra_headers = extra_buf[0..extra_count], 98 - .keep_alive = self.keep_alive, 99 - .redirect_behavior = options.redirect_behavior, 100 - }); 101 - 102 - return .{ 103 - .status = result.status, 104 - .body = try self.allocator.dupe(u8, aw.written()), 105 - }; 95 + var response = try request.receiveHead(redirect_buffer); 96 + return try self.readFetchResult(options, &response); 106 97 } 107 98 108 99 fn fetchResolved( ··· 130 121 .proxied_host = logical_host, 131 122 }); 132 123 124 + const redirect_behavior = redirectBehavior(options); 133 125 var request = self.http_client.request(options.method, uri, .{ 134 126 .connection = connection, 135 127 .headers = headers, 136 128 .extra_headers = extra_headers, 137 129 .keep_alive = self.keep_alive, 138 - .redirect_behavior = options.redirect_behavior orelse .unhandled, 130 + .redirect_behavior = redirect_behavior, 139 131 }) catch |err| { 140 132 self.http_client.connection_pool.release(connection, self.io); 141 133 return err; ··· 152 144 try request.sendBodiless(); 153 145 } 154 146 155 - var response = try request.receiveHead(&.{}); 147 + const redirect_buffer = try self.allocRedirectBuffer(redirect_behavior); 148 + defer self.freeRedirectBuffer(redirect_buffer, redirect_behavior); 149 + 150 + var response = try request.receiveHead(redirect_buffer); 151 + return try self.readFetchResult(options, &response); 152 + } 153 + 154 + fn allocRedirectBuffer(self: *HttpTransport, behavior: std.http.Client.Request.RedirectBehavior) ![]u8 { 155 + if (behavior == .unhandled) return &.{}; 156 + return try self.allocator.alloc(u8, 8 * 1024); 157 + } 158 + 159 + fn freeRedirectBuffer(self: *HttpTransport, buffer: []u8, behavior: std.http.Client.Request.RedirectBehavior) void { 160 + if (behavior != .unhandled) self.allocator.free(buffer); 161 + } 162 + 163 + fn readFetchResult( 164 + self: *HttpTransport, 165 + options: FetchOptions, 166 + response: *std.http.Client.Response, 167 + ) !FetchResult { 168 + const rate_limit = RateLimitHeaders.fromResponseHead(response.head); 169 + 156 170 if (options.max_response_size) |max| { 157 171 const body_buf = try self.allocator.alloc(u8, max); 158 172 defer self.allocator.free(body_buf); 159 173 var writer = std.Io.Writer.fixed(body_buf); 160 - try streamResponseBody(&response, &writer); 174 + try streamResponseBody(response, &writer); 161 175 return .{ 162 176 .status = response.head.status, 163 177 .body = try self.allocator.dupe(u8, writer.buffered()), 178 + .rate_limit = rate_limit, 164 179 }; 165 180 } 166 181 167 182 var aw: std.Io.Writer.Allocating = .init(self.allocator); 168 183 defer aw.deinit(); 169 - try streamResponseBody(&response, &aw.writer); 184 + try streamResponseBody(response, &aw.writer); 170 185 return .{ 171 186 .status = response.head.status, 172 187 .body = try self.allocator.dupe(u8, aw.written()), 188 + .rate_limit = rate_limit, 173 189 }; 174 190 } 175 191 ··· 196 212 pub const FetchResult = struct { 197 213 status: std.http.Status, 198 214 body: []u8, 215 + rate_limit: RateLimitHeaders = .{}, 216 + }; 217 + 218 + pub const RateLimitHeaders = struct { 219 + limit: ?u64 = null, 220 + remaining: ?u64 = null, 221 + reset: ?u64 = null, 222 + retry_after: ?u64 = null, 223 + 224 + pub fn fromResponseHead(head: std.http.Client.Response.Head) RateLimitHeaders { 225 + var result: RateLimitHeaders = .{}; 226 + var it = head.iterateHeaders(); 227 + while (it.next()) |header| { 228 + if (std.ascii.eqlIgnoreCase(header.name, "ratelimit-limit")) { 229 + result.limit = parseHeaderInt(header.value); 230 + } else if (std.ascii.eqlIgnoreCase(header.name, "ratelimit-remaining")) { 231 + result.remaining = parseHeaderInt(header.value); 232 + } else if (std.ascii.eqlIgnoreCase(header.name, "ratelimit-reset")) { 233 + result.reset = parseHeaderInt(header.value); 234 + } else if (std.ascii.eqlIgnoreCase(header.name, "retry-after")) { 235 + result.retry_after = parseHeaderInt(header.value); 236 + } 237 + } 238 + return result; 239 + } 240 + 241 + pub fn isEmpty(self: RateLimitHeaders) bool { 242 + return self.limit == null and self.remaining == null and self.reset == null and self.retry_after == null; 243 + } 199 244 }; 200 245 }; 246 + 247 + fn redirectBehavior(options: HttpTransport.FetchOptions) std.http.Client.Request.RedirectBehavior { 248 + return options.redirect_behavior orelse if (options.payload == null) 249 + std.http.Client.Request.RedirectBehavior.init(3) 250 + else 251 + .unhandled; 252 + } 253 + 254 + fn parseHeaderInt(value: []const u8) ?u64 { 255 + const trimmed = std.mem.trim(u8, value, " \t"); 256 + return std.fmt.parseInt(u64, trimmed, 10) catch null; 257 + } 201 258 202 259 fn streamResponseBody(response: *std.http.Client.Response, writer: *std.Io.Writer) !void { 203 260 var transfer_buffer: [64]u8 = undefined; ··· 244 301 }, 245 302 })); 246 303 } 304 + 305 + test "transport parses rate limit headers" { 306 + const response_bytes = "HTTP/1.1 429 Too Many Requests\r\n" ++ 307 + "RateLimit-Limit: 3000\r\n" ++ 308 + "ratelimit-remaining: 0\r\n" ++ 309 + "RateLimit-Reset: 1710000000\r\n" ++ 310 + "Retry-After: 2\r\n\r\n"; 311 + 312 + const head = try std.http.Client.Response.Head.parse(response_bytes); 313 + const headers = HttpTransport.RateLimitHeaders.fromResponseHead(head); 314 + 315 + try std.testing.expectEqual(@as(?u64, 3000), headers.limit); 316 + try std.testing.expectEqual(@as(?u64, 0), headers.remaining); 317 + try std.testing.expectEqual(@as(?u64, 1710000000), headers.reset); 318 + try std.testing.expectEqual(@as(?u64, 2), headers.retry_after); 319 + try std.testing.expect(!headers.isEmpty()); 320 + }
+249 -1
src/internal/xrpc/xrpc.zig
··· 8 8 const std = @import("std"); 9 9 const Nsid = @import("../syntax/nsid.zig").Nsid; 10 10 const HttpTransport = @import("transport.zig").HttpTransport; 11 + const json_helpers = @import("json.zig"); 11 12 12 13 pub const XrpcClient = struct { 13 14 allocator: std.mem.Allocator, ··· 55 56 return try self.doRequest(url, body); 56 57 } 57 58 59 + pub fn queryChecked( 60 + self: *XrpcClient, 61 + nsid: Nsid, 62 + params: ?std.StringHashMap([]const u8), 63 + retry_policy: RetryPolicy, 64 + ) !Result { 65 + const url = try self.buildUrl(nsid, params); 66 + defer self.allocator.free(url); 67 + 68 + return try self.requestCheckedUrl(url, null, retry_policy); 69 + } 70 + 71 + pub fn procedureChecked( 72 + self: *XrpcClient, 73 + nsid: Nsid, 74 + body: ?[]const u8, 75 + retry_policy: RetryPolicy, 76 + ) !Result { 77 + const url = try self.buildUrl(nsid, null); 78 + defer self.allocator.free(url); 79 + 80 + return try self.requestCheckedUrl(url, body, retry_policy); 81 + } 82 + 58 83 fn buildUrl(self: *XrpcClient, nsid: Nsid, params: ?std.StringHashMap([]const u8)) ![]u8 { 59 84 var url: std.ArrayList(u8) = .empty; 60 85 errdefer url.deinit(self.allocator); ··· 103 128 .allocator = self.allocator, 104 129 .status = result.status, 105 130 .body = result.body, 131 + .rate_limit = result.rate_limit, 106 132 }; 107 133 } 108 134 135 + fn requestCheckedUrl(self: *XrpcClient, url: []const u8, body: ?[]const u8, retry_policy: RetryPolicy) !Result { 136 + const attempts = @max(@as(u8, 1), retry_policy.max_attempts); 137 + var attempt: u8 = 0; 138 + 139 + while (true) : (attempt += 1) { 140 + var response = self.doRequest(url, body) catch |err| { 141 + if (attempt + 1 >= attempts or !retry_policy.retry_transient_errors or !isRetryableTransportError(err)) { 142 + return err; 143 + } 144 + try retry_policy.sleepBeforeRetry(self.transport.io, attempt, null); 145 + continue; 146 + }; 147 + 148 + if (response.ok()) { 149 + return .{ .ok = response }; 150 + } 151 + 152 + if (attempt + 1 < attempts and retry_policy.isRetryableStatus(response.status)) { 153 + const rate_limit = response.rate_limit; 154 + response.deinit(); 155 + try retry_policy.sleepBeforeRetry(self.transport.io, attempt, rate_limit); 156 + continue; 157 + } 158 + 159 + return .{ .err = try XrpcError.fromResponse(response) }; 160 + } 161 + } 162 + 109 163 pub const Response = struct { 110 164 allocator: std.mem.Allocator, 111 165 status: std.http.Status, 112 166 body: []u8, 167 + rate_limit: HttpTransport.RateLimitHeaders = .{}, 113 168 114 169 pub fn deinit(self: *Response) void { 115 170 self.allocator.free(self.body); ··· 117 172 118 173 /// check if request succeeded 119 174 pub fn ok(self: Response) bool { 120 - return self.status == .ok; 175 + return self.status.class() == .success; 121 176 } 122 177 123 178 /// parse body as json ··· 125 180 return try std.json.parseFromSlice(std.json.Value, self.allocator, self.body, .{}); 126 181 } 127 182 }; 183 + 184 + pub const Result = union(enum) { 185 + ok: Response, 186 + err: XrpcError, 187 + 188 + pub fn deinit(self: *Result) void { 189 + switch (self.*) { 190 + .ok => |*response| response.deinit(), 191 + .err => |*xrpc_error| xrpc_error.deinit(), 192 + } 193 + } 194 + }; 195 + 196 + pub const XrpcError = struct { 197 + allocator: std.mem.Allocator, 198 + status: std.http.Status, 199 + error_name: ?[]u8 = null, 200 + message: ?[]u8 = null, 201 + body: []u8, 202 + rate_limit: HttpTransport.RateLimitHeaders = .{}, 203 + 204 + pub fn fromResponse(response: Response) !XrpcError { 205 + var result: XrpcError = .{ 206 + .allocator = response.allocator, 207 + .status = response.status, 208 + .body = response.body, 209 + .rate_limit = response.rate_limit, 210 + }; 211 + errdefer result.deinit(); 212 + 213 + var parsed = std.json.parseFromSlice(std.json.Value, response.allocator, response.body, .{}) catch return result; 214 + defer parsed.deinit(); 215 + 216 + if (json_helpers.getString(parsed.value, "error")) |name| { 217 + result.error_name = try response.allocator.dupe(u8, name); 218 + } 219 + if (json_helpers.getString(parsed.value, "message")) |message| { 220 + result.message = try response.allocator.dupe(u8, message); 221 + } 222 + 223 + return result; 224 + } 225 + 226 + pub fn deinit(self: *XrpcError) void { 227 + if (self.error_name) |name| self.allocator.free(name); 228 + if (self.message) |message| self.allocator.free(message); 229 + self.allocator.free(self.body); 230 + } 231 + }; 232 + 233 + pub const RetryPolicy = struct { 234 + max_attempts: u8 = 3, 235 + base_delay_ms: u64 = 500, 236 + max_delay_ms: u64 = 30_000, 237 + jitter_percent: u8 = 20, 238 + retry_transient_errors: bool = true, 239 + 240 + pub fn none() RetryPolicy { 241 + return .{ .max_attempts = 1 }; 242 + } 243 + 244 + pub fn isRetryableStatus(_: RetryPolicy, status: std.http.Status) bool { 245 + return switch (@intFromEnum(status)) { 246 + 429, 500, 502, 503, 504 => true, 247 + else => false, 248 + }; 249 + } 250 + 251 + pub fn delayMillis(self: RetryPolicy, attempt: u8, rate_limit: ?HttpTransport.RateLimitHeaders) u64 { 252 + return self.delayMillisAt(attempt, rate_limit, null); 253 + } 254 + 255 + pub fn delayMillisAt( 256 + self: RetryPolicy, 257 + attempt: u8, 258 + rate_limit: ?HttpTransport.RateLimitHeaders, 259 + now_unix_seconds: ?u64, 260 + ) u64 { 261 + if (rate_limit) |headers| { 262 + if (headers.retry_after) |seconds| { 263 + const milliseconds = std.math.mul(u64, seconds, std.time.ms_per_s) catch return self.max_delay_ms; 264 + return @min(milliseconds, self.max_delay_ms); 265 + } 266 + if (now_unix_seconds) |now| { 267 + if (headers.reset) |reset| { 268 + if (reset > now) { 269 + const seconds = reset - now; 270 + const milliseconds = std.math.mul(u64, seconds, std.time.ms_per_s) catch return self.max_delay_ms; 271 + return @min(milliseconds, self.max_delay_ms); 272 + } 273 + } 274 + } 275 + } 276 + 277 + const shift: u6 = @intCast(@min(attempt, 16)); 278 + const base = self.base_delay_ms * (@as(u64, 1) << shift); 279 + return @min(base, self.max_delay_ms); 280 + } 281 + 282 + pub fn sleepBeforeRetry(self: RetryPolicy, io: std.Io, attempt: u8, rate_limit: ?HttpTransport.RateLimitHeaders) !void { 283 + const now_seconds: ?u64 = if (rate_limit != null) 284 + @intCast(@max(@as(i64, 0), std.Io.Clock.real.now(io).toSeconds())) 285 + else 286 + null; 287 + var delay_ms = self.delayMillisAt(attempt, rate_limit, now_seconds); 288 + delay_ms = self.jitteredDelayMillis(io, delay_ms); 289 + if (delay_ms == 0) return; 290 + try io.sleep(std.Io.Duration.fromMilliseconds(@intCast(delay_ms)), .awake); 291 + } 292 + 293 + fn jitteredDelayMillis(self: RetryPolicy, io: std.Io, delay_ms: u64) u64 { 294 + if (delay_ms == 0 or self.jitter_percent == 0) return delay_ms; 295 + 296 + const spread = delay_ms * @as(u64, self.jitter_percent) / 100; 297 + if (spread == 0) return delay_ms; 298 + 299 + var source: std.Random.IoSource = .{ .io = io }; 300 + const random = source.interface(); 301 + const min = delay_ms - spread; 302 + const max = std.math.add(u64, delay_ms, spread) catch std.math.maxInt(u64); 303 + return @min(random.intRangeAtMost(u64, min, max), self.max_delay_ms); 304 + } 305 + }; 128 306 }; 129 307 308 + fn isRetryableTransportError(err: anyerror) bool { 309 + return switch (err) { 310 + error.ConnectionRefused, 311 + error.ConnectionResetByPeer, 312 + error.HostUnreachable, 313 + error.NetworkUnreachable, 314 + error.NetworkDown, 315 + error.Timeout, 316 + error.TlsInitializationFailed, 317 + error.Unexpected, 318 + error.ReadFailed, 319 + error.WriteFailed, 320 + => true, 321 + else => false, 322 + }; 323 + } 324 + 130 325 // === tests === 131 326 132 327 test "build url without params" { ··· 155 350 try std.testing.expect(std.mem.startsWith(u8, url, "https://bsky.social/xrpc/app.bsky.actor.getProfile?")); 156 351 try std.testing.expect(std.mem.indexOf(u8, url, "actor=did%3Aplc%3Atest123") != null); 157 352 } 353 + 354 + test "xrpc error parses atproto error envelope and rate limits" { 355 + const body = try std.testing.allocator.dupe(u8, 356 + \\{"error":"RateLimitExceeded","message":"slow down"} 357 + ); 358 + 359 + const response: XrpcClient.Response = .{ 360 + .allocator = std.testing.allocator, 361 + .status = .too_many_requests, 362 + .body = body, 363 + .rate_limit = .{ 364 + .limit = 3000, 365 + .remaining = 0, 366 + .reset = 1710000000, 367 + .retry_after = 2, 368 + }, 369 + }; 370 + 371 + var xrpc_error = try XrpcClient.XrpcError.fromResponse(response); 372 + defer xrpc_error.deinit(); 373 + 374 + try std.testing.expectEqual(.too_many_requests, xrpc_error.status); 375 + try std.testing.expectEqualStrings("RateLimitExceeded", xrpc_error.error_name.?); 376 + try std.testing.expectEqualStrings("slow down", xrpc_error.message.?); 377 + try std.testing.expectEqual(@as(?u64, 3000), xrpc_error.rate_limit.limit); 378 + try std.testing.expectEqual(@as(?u64, 0), xrpc_error.rate_limit.remaining); 379 + try std.testing.expectEqual(@as(?u64, 1710000000), xrpc_error.rate_limit.reset); 380 + try std.testing.expectEqual(@as(?u64, 2), xrpc_error.rate_limit.retry_after); 381 + } 382 + 383 + test "retry policy is conservative and deterministic" { 384 + const policy: XrpcClient.RetryPolicy = .{ 385 + .base_delay_ms = 100, 386 + .max_delay_ms = 1000, 387 + .jitter_percent = 0, 388 + }; 389 + 390 + try std.testing.expect(policy.isRetryableStatus(.too_many_requests)); 391 + try std.testing.expect(policy.isRetryableStatus(.internal_server_error)); 392 + try std.testing.expect(policy.isRetryableStatus(.bad_gateway)); 393 + try std.testing.expect(policy.isRetryableStatus(.service_unavailable)); 394 + try std.testing.expect(policy.isRetryableStatus(.gateway_timeout)); 395 + try std.testing.expect(!policy.isRetryableStatus(.bad_request)); 396 + try std.testing.expect(!policy.isRetryableStatus(.unauthorized)); 397 + try std.testing.expect(!policy.isRetryableStatus(.not_found)); 398 + 399 + try std.testing.expectEqual(@as(u64, 100), policy.delayMillis(0, null)); 400 + try std.testing.expectEqual(@as(u64, 200), policy.delayMillis(1, null)); 401 + try std.testing.expectEqual(@as(u64, 1000), policy.delayMillis(10, null)); 402 + try std.testing.expectEqual(@as(u64, 1000), policy.delayMillis(0, .{ .retry_after = 5 })); 403 + try std.testing.expectEqual(@as(u64, 1000), policy.delayMillisAt(0, .{ .reset = 1005 }, 1000)); 404 + try std.testing.expectEqual(@as(u64, 100), policy.delayMillisAt(0, .{ .reset = 999 }, 1000)); 405 + }