A SpaceTraders Agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 445 lines 13 kB view raw
1const std = @import("std"); 2const HTTPClient = std.http.Client; 3const Io = std.Io; 4const json = std.json; 5 6const buildin = @import("builtin"); 7const models = @import("models.zig"); 8 9const log = std.log.scoped(.SpaceTraders); 10 11const Semaphore = struct { 12 mutex: Io.Mutex = .init, 13 cond: Io.Condition = .init, 14 /// It is OK to initialise this field to any value. 15 permits: u64 = 0, 16 17 pub fn post(sem: *Semaphore, io: Io) void { 18 sem.mutex.lockUncancelable(io); 19 defer sem.mutex.unlock(io); 20 21 sem.permits += 1; 22 sem.cond.signal(io); 23 } 24 25 pub fn set(sem: *Semaphore, io: Io, permits: u64) void { 26 sem.mutex.lockUncancelable(io); 27 defer sem.mutex.unlock(io); 28 29 sem.permits = permits; 30 sem.cond.signal(io); 31 } 32 33 pub fn wait(sem: *Semaphore, io: Io) !void { 34 sem.mutex.lockUncancelable(io); 35 defer sem.mutex.unlock(io); 36 37 while (sem.permits == 0) 38 try sem.cond.wait(io, &sem.mutex); 39 40 sem.permits -= 1; 41 if (sem.permits > 0) 42 sem.cond.signal(io); 43 } 44 45 pub fn available(sem: *Semaphore, io: Io) u64 { 46 sem.mutex.lockUncancelable(io); 47 defer sem.mutex.unlock(io); 48 49 return sem.permits; 50 } 51}; 52 53pub const Limiter = struct { 54 points: u64, 55 duration: i64, 56 time: ?Io.Timestamp, 57 58 mutex: Io.Mutex = .init, 59 semaphor: Semaphore, 60 61 pub fn init(opts: struct { points: u54 = 2, duration: i64 = 1000 }) Limiter { 62 return .{ 63 .points = opts.points, 64 .duration = opts.duration, 65 .time = null, 66 .semaphor = Semaphore{ .permits = opts.points }, 67 }; 68 } 69 70 pub fn checkReset(l: *Limiter, io: Io) bool { 71 l.mutex.lock(io) catch return false; 72 defer l.mutex.unlock(io); 73 74 if (l.time) |t| { 75 const dur = t.durationTo(Io.Clock.now(.real, io)); 76 if (dur.toSeconds() > 0) { 77 l.semaphor.set(io, l.points); 78 l.time = null; 79 return true; 80 } 81 } 82 83 return false; 84 } 85 86 pub fn aquire(l: *Limiter, io: Io) !void { 87 try l.mutex.lock(io); 88 defer l.mutex.unlock(io); 89 90 if (l.time == null) { 91 const now = Io.Clock.now(.real, io); 92 l.time = now.addDuration(.fromMilliseconds(l.duration)); 93 } 94 95 return l.semaphor.wait(io); 96 } 97 98 pub fn timeToReset(l: *Limiter, io: Io) Io.Duration { 99 if (l.time) |t| { 100 return t.durationTo(Io.Clock.now(.real, io)); 101 } 102 return .zero; 103 } 104 105 pub fn available(l: *Limiter, io: Io) bool { 106 return l.semaphor.available(io) > 0; 107 } 108}; 109 110pub const BurstyLimiter = struct { 111 static: Limiter, 112 burst: Limiter, 113 114 pub fn wait(bl: *BurstyLimiter, io: Io) !void { 115 _ = bl.static.checkReset(io); 116 _ = bl.burst.checkReset(io); 117 118 if (!bl.static.available(io)) { 119 if (bl.burst.available(io)) { 120 log.debug("Using Burst", .{}); 121 try bl.burst.aquire(io); 122 return; 123 } else { 124 log.warn("No request available, waiting", .{}); 125 126 var static = bl.static.checkReset(io); 127 var burst = bl.burst.checkReset(io); 128 129 while (!static and !burst and !bl.static.available(io)) { 130 try io.sleep(bl.static.timeToReset(io), .real); 131 static = bl.static.checkReset(io); 132 burst = bl.burst.checkReset(io); 133 } 134 135 log.debug("sleep done", .{}); 136 137 if (burst) { 138 log.debug("Using Burst", .{}); 139 try bl.burst.aquire(io); 140 return; 141 } 142 } 143 } 144 145 try bl.static.aquire(io); 146 return; 147 } 148}; 149 150pub const AuthType = enum { account, agent, none }; 151 152pub const Auth = struct { 153 account: []const u8 = "", 154 agent: []const u8 = "", 155}; 156 157pub const RequestOptions = struct { 158 method: std.http.Method = .GET, 159 auth: AuthType = .none, 160 body: Body = .empty, 161 162 free_body_after_sending: bool = false, 163 164 pub const Body = union(enum) { 165 empty: void, 166 buffer: []u8, 167 }; 168 169 pub fn authorization(opts: *const RequestOptions, client: *const Client) HTTPClient.Request.Headers.Value { 170 switch (opts.auth) { 171 .account => return .{ .override = client.auth.account }, 172 .agent => return .{ .override = client.auth.agent }, 173 .none => return .{ .omit = {} }, 174 } 175 } 176}; 177 178pub const RequestError = error{ 179 OutOfMemory, 180 InvalidResponse, 181 RateLimiterError, 182} || HTTPClient.RequestError || HTTPClient.Request.ReceiveHeadError || std.Uri.ParseError; 183 184pub fn RawResponse(comptime T: type) type { 185 return Io.Future(RequestError!json.Parsed(T)); 186} 187 188pub fn Response(comptime T: type) type { 189 return RawResponse(models.Wrapper(T)); 190} 191 192pub const Client = struct { 193 allocator: std.mem.Allocator, 194 io: Io, 195 limiter: BurstyLimiter, 196 197 base_url: []const u8, 198 auth: Auth, 199 200 http: HTTPClient, 201 202 // Stats 203 total_requests: std.atomic.Value(u64) = .init(0), 204 successful_requests: std.atomic.Value(u64) = .init(0), 205 total_latency: std.atomic.Value(u64) = .init(0), 206 average_latency: std.atomic.Value(u64) = .init(0), 207 208 pub fn init( 209 allocator: std.mem.Allocator, 210 io: std.Io, 211 opts: struct { 212 base_url: []const u8 = "https://api.spacetraders.io/v2", 213 auth: Auth = .{}, 214 }, 215 ) Client { 216 return .{ 217 .allocator = allocator, 218 .io = io, 219 .limiter = .{ 220 .static = .init(.{}), 221 .burst = .init(.{ .points = 30, .duration = 60_000 }), 222 }, 223 .base_url = opts.base_url, 224 .auth = opts.auth, 225 .http = .{ .allocator = allocator, .io = io }, 226 }; 227 } 228 229 pub fn deinit(client: *Client) void { 230 client.http.deinit(); 231 } 232 233 pub fn get( 234 cl: *Client, 235 comptime T: type, 236 comptime path: []const u8, 237 args: anytype, 238 auth: AuthType, 239 ) !RawResponse(T) { 240 return cl.request(T, path, args, .{ .auth = auth }); 241 } 242 243 pub fn getW( 244 cl: *Client, 245 comptime T: type, 246 comptime path: []const u8, 247 args: anytype, 248 auth: AuthType, 249 ) !Response(T) { 250 return cl.get(models.Wrapper(T), path, args, auth); 251 } 252 253 pub fn post( 254 cl: *Client, 255 comptime T: type, 256 comptime path: []const u8, 257 args: anytype, 258 body: anytype, 259 auth: AuthType, 260 ) !RawResponse(T) { 261 const buffer = try json.Stringify.valueAlloc(cl.allocator, body, .{}); 262 log.debug("json body: {s}", .{buffer}); 263 return cl.request(T, path, args, .{ 264 .method = .POST, 265 .auth = auth, 266 .body = .{ .buffer = buffer }, 267 .free_body_after_sending = true, 268 }); 269 } 270 271 pub fn postW( 272 cl: *Client, 273 comptime T: type, 274 comptime path: []const u8, 275 args: anytype, 276 body: anytype, 277 auth: AuthType, 278 ) !Response(T) { 279 return cl.post(models.Wrapper(T), path, args, body, auth); 280 } 281 282 pub fn patch( 283 cl: *Client, 284 comptime T: type, 285 comptime path: []const u8, 286 args: anytype, 287 body: anytype, 288 auth: AuthType, 289 ) !RawResponse(T) { 290 const buffer = try json.Stringify.valueAlloc(cl.allocator, body, .{}); 291 log.debug("json body: {s}", .{buffer}); 292 return cl.request(T, path, args, .{ 293 .method = .PATCH, 294 .auth = auth, 295 .body = .{ .buffer = buffer }, 296 .free_body_after_sending = true, 297 }); 298 } 299 300 pub fn patchW( 301 cl: *Client, 302 comptime T: type, 303 comptime path: []const u8, 304 args: anytype, 305 body: anytype, 306 auth: AuthType, 307 ) !Response(T) { 308 return cl.patch(models.Wrapper(T), path, args, body, auth); 309 } 310 311 pub fn request( 312 client: *Client, 313 comptime T: type, 314 comptime path: []const u8, 315 args: anytype, 316 opts: RequestOptions, 317 ) !RawResponse(T) { 318 const path_fmt = try std.fmt.allocPrint(client.allocator, path, args); 319 defer client.allocator.free(path_fmt); 320 321 const url = try std.fmt.allocPrint(client.allocator, "{s}{s}", .{ client.base_url, path_fmt }); 322 323 const Wrapper = struct { 324 fn call( 325 cl: *Client, 326 url_param: []const u8, 327 opts_param: RequestOptions, 328 ) RequestError!json.Parsed(T) { 329 defer cl.allocator.free(url_param); 330 cl.limiter.wait(cl.io) catch return error.RateLimiterError; 331 return Client.requestRaw(cl, T, url_param, opts_param); 332 } 333 }; 334 335 return client.io.concurrent( 336 Wrapper.call, 337 .{ client, url, opts }, 338 ); 339 } 340 341 const user_agent = blk: { 342 const os = std.fmt.comptimePrint("{}", .{buildin.os.tag})[1..]; 343 const arch = std.fmt.comptimePrint("{}", .{buildin.cpu.arch})[1..]; 344 break :blk "space/0.1 (" ++ buildin.zig_version_string ++ "/" ++ os ++ "/" ++ arch ++ "; https://tangled.org/altagos.dev/space; +agent)"; 345 }; 346 347 pub fn requestRaw( 348 client: *Client, 349 comptime T: type, 350 url: []const u8, 351 opts: RequestOptions, 352 ) RequestError!json.Parsed(T) { 353 const uri = std.Uri.parse(url) catch |err| { 354 log.err("Error parsing url: {} - url = {s}", .{ err, url }); 355 return err; 356 }; 357 358 var req = try client.http.request(opts.method, uri, .{ 359 .headers = .{ 360 .authorization = opts.authorization(client), 361 .user_agent = .{ .override = user_agent }, 362 .content_type = .{ .override = "application/json" }, 363 }, 364 }); 365 defer req.deinit(); 366 367 log.debug("requesting: {s}", .{uri.path.percent_encoded}); 368 369 const start = Io.Clock.now(.real, client.io); 370 371 switch (opts.body) { 372 .empty => try req.sendBodiless(), 373 .buffer => |body| { 374 try req.sendBodyComplete(body); 375 if (opts.free_body_after_sending) client.allocator.free(body); 376 }, 377 } 378 379 var redirect_buffer: [1024]u8 = undefined; 380 381 var response = try req.receiveHead(&redirect_buffer); 382 const colour = blk: { 383 if (@intFromEnum(response.head.status) >= 200 and @intFromEnum(response.head.status) < 300) { 384 break :blk "\x1b[92m"; 385 } else { 386 break :blk "\x1b[1m\x1b[91m"; 387 } 388 }; 389 390 _ = client.total_requests.fetchAdd(1, .seq_cst); 391 392 const latency: u64 = @intCast(start.durationTo(Io.Clock.now(.real, client.io)).toMilliseconds()); 393 const old_average = client.average_latency.load(.seq_cst); 394 var new_average: u64 = 0; 395 396 if (old_average == 0) { 397 new_average = latency; 398 } else { 399 const total_reqs = client.total_requests.load(.seq_cst); 400 new_average = old_average * (total_reqs - 1) / total_reqs + latency / total_reqs; 401 } 402 403 client.average_latency.store(new_average, .seq_cst); 404 _ = client.total_latency.fetchAdd(latency, .seq_cst); 405 406 log.debug( 407 "\x1b[2m[latency = {d}ms path = {s}]\x1b[0m received {s}{d} {s}\x1b[0m", 408 .{ latency, url[client.base_url.len..], colour, response.head.status, response.head.reason }, 409 ); 410 411 // var header_iter = response.head.iterateHeaders(); 412 // while (header_iter.next()) |header| { 413 // log.debug("{s}: {s}", .{ header.name, header.value }); 414 // } 415 416 var decompress_buffer: [std.compress.flate.max_window_len]u8 = undefined; 417 var transfer_buffer: [64]u8 = undefined; 418 var decompress: std.http.Decompress = undefined; 419 420 const decompressed_body_reader = response.readerDecompressing( 421 &transfer_buffer, 422 &decompress, 423 &decompress_buffer, 424 ); 425 426 const body = decompressed_body_reader.allocRemaining(client.allocator, .unlimited) catch 427 return RequestError.OutOfMemory; 428 defer client.allocator.free(body); 429 430 var reader: Io.Reader = .fixed(body); 431 var json_reader: json.Reader = .init(client.allocator, &reader); 432 defer json_reader.deinit(); 433 434 const result = json.parseFromTokenSource(T, client.allocator, &json_reader, .{ 435 .ignore_unknown_fields = true, 436 }) catch |err| { 437 log.err("Error parsing response: {} - Body:\n{s}", .{ err, body }); 438 return RequestError.InvalidResponse; 439 }; 440 441 _ = client.successful_requests.fetchAdd(1, .seq_cst); 442 443 return result; 444 } 445};