A SpaceTraders Agent
1const std = @import("std");
2const HTTPClient = std.http.Client;
3const Io = std.Io;
4const json = std.json;
5
6const buildin = @import("builtin");
7const models = @import("models.zig");
8
9const log = std.log.scoped(.SpaceTraders);
10
11const Semaphore = struct {
12 mutex: Io.Mutex = .init,
13 cond: Io.Condition = .init,
14 /// It is OK to initialise this field to any value.
15 permits: u64 = 0,
16
17 pub fn post(sem: *Semaphore, io: Io) void {
18 sem.mutex.lockUncancelable(io);
19 defer sem.mutex.unlock(io);
20
21 sem.permits += 1;
22 sem.cond.signal(io);
23 }
24
25 pub fn set(sem: *Semaphore, io: Io, permits: u64) void {
26 sem.mutex.lockUncancelable(io);
27 defer sem.mutex.unlock(io);
28
29 sem.permits = permits;
30 sem.cond.signal(io);
31 }
32
33 pub fn wait(sem: *Semaphore, io: Io) !void {
34 sem.mutex.lockUncancelable(io);
35 defer sem.mutex.unlock(io);
36
37 while (sem.permits == 0)
38 try sem.cond.wait(io, &sem.mutex);
39
40 sem.permits -= 1;
41 if (sem.permits > 0)
42 sem.cond.signal(io);
43 }
44
45 pub fn available(sem: *Semaphore, io: Io) u64 {
46 sem.mutex.lockUncancelable(io);
47 defer sem.mutex.unlock(io);
48
49 return sem.permits;
50 }
51};
52
53pub const Limiter = struct {
54 points: u64,
55 duration: i64,
56 time: ?Io.Timestamp,
57
58 mutex: Io.Mutex = .init,
59 semaphor: Semaphore,
60
61 pub fn init(opts: struct { points: u54 = 2, duration: i64 = 1000 }) Limiter {
62 return .{
63 .points = opts.points,
64 .duration = opts.duration,
65 .time = null,
66 .semaphor = Semaphore{ .permits = opts.points },
67 };
68 }
69
70 pub fn checkReset(l: *Limiter, io: Io) bool {
71 l.mutex.lock(io) catch return false;
72 defer l.mutex.unlock(io);
73
74 if (l.time) |t| {
75 const dur = t.durationTo(Io.Clock.now(.real, io));
76 if (dur.toSeconds() > 0) {
77 l.semaphor.set(io, l.points);
78 l.time = null;
79 return true;
80 }
81 }
82
83 return false;
84 }
85
86 pub fn aquire(l: *Limiter, io: Io) !void {
87 try l.mutex.lock(io);
88 defer l.mutex.unlock(io);
89
90 if (l.time == null) {
91 const now = Io.Clock.now(.real, io);
92 l.time = now.addDuration(.fromMilliseconds(l.duration));
93 }
94
95 return l.semaphor.wait(io);
96 }
97
98 pub fn timeToReset(l: *Limiter, io: Io) Io.Duration {
99 if (l.time) |t| {
100 return t.durationTo(Io.Clock.now(.real, io));
101 }
102 return .zero;
103 }
104
105 pub fn available(l: *Limiter, io: Io) bool {
106 return l.semaphor.available(io) > 0;
107 }
108};
109
110pub const BurstyLimiter = struct {
111 static: Limiter,
112 burst: Limiter,
113
114 pub fn wait(bl: *BurstyLimiter, io: Io) !void {
115 _ = bl.static.checkReset(io);
116 _ = bl.burst.checkReset(io);
117
118 if (!bl.static.available(io)) {
119 if (bl.burst.available(io)) {
120 log.debug("Using Burst", .{});
121 try bl.burst.aquire(io);
122 return;
123 } else {
124 log.warn("No request available, waiting", .{});
125
126 var static = bl.static.checkReset(io);
127 var burst = bl.burst.checkReset(io);
128
129 while (!static and !burst and !bl.static.available(io)) {
130 try io.sleep(bl.static.timeToReset(io), .real);
131 static = bl.static.checkReset(io);
132 burst = bl.burst.checkReset(io);
133 }
134
135 log.debug("sleep done", .{});
136
137 if (burst) {
138 log.debug("Using Burst", .{});
139 try bl.burst.aquire(io);
140 return;
141 }
142 }
143 }
144
145 try bl.static.aquire(io);
146 return;
147 }
148};
149
150pub const AuthType = enum { account, agent, none };
151
152pub const Auth = struct {
153 account: []const u8 = "",
154 agent: []const u8 = "",
155};
156
157pub const RequestOptions = struct {
158 method: std.http.Method = .GET,
159 auth: AuthType = .none,
160 body: Body = .empty,
161
162 free_body_after_sending: bool = false,
163
164 pub const Body = union(enum) {
165 empty: void,
166 buffer: []u8,
167 };
168
169 pub fn authorization(opts: *const RequestOptions, client: *const Client) HTTPClient.Request.Headers.Value {
170 switch (opts.auth) {
171 .account => return .{ .override = client.auth.account },
172 .agent => return .{ .override = client.auth.agent },
173 .none => return .{ .omit = {} },
174 }
175 }
176};
177
178pub const RequestError = error{
179 OutOfMemory,
180 InvalidResponse,
181 RateLimiterError,
182} || HTTPClient.RequestError || HTTPClient.Request.ReceiveHeadError || std.Uri.ParseError;
183
184pub fn RawResponse(comptime T: type) type {
185 return Io.Future(RequestError!json.Parsed(T));
186}
187
188pub fn Response(comptime T: type) type {
189 return RawResponse(models.Wrapper(T));
190}
191
192pub const Client = struct {
193 allocator: std.mem.Allocator,
194 io: Io,
195 limiter: BurstyLimiter,
196
197 base_url: []const u8,
198 auth: Auth,
199
200 http: HTTPClient,
201
202 // Stats
203 total_requests: std.atomic.Value(u64) = .init(0),
204 successful_requests: std.atomic.Value(u64) = .init(0),
205 total_latency: std.atomic.Value(u64) = .init(0),
206 average_latency: std.atomic.Value(u64) = .init(0),
207
208 pub fn init(
209 allocator: std.mem.Allocator,
210 io: std.Io,
211 opts: struct {
212 base_url: []const u8 = "https://api.spacetraders.io/v2",
213 auth: Auth = .{},
214 },
215 ) Client {
216 return .{
217 .allocator = allocator,
218 .io = io,
219 .limiter = .{
220 .static = .init(.{}),
221 .burst = .init(.{ .points = 30, .duration = 60_000 }),
222 },
223 .base_url = opts.base_url,
224 .auth = opts.auth,
225 .http = .{ .allocator = allocator, .io = io },
226 };
227 }
228
229 pub fn deinit(client: *Client) void {
230 client.http.deinit();
231 }
232
233 pub fn get(
234 cl: *Client,
235 comptime T: type,
236 comptime path: []const u8,
237 args: anytype,
238 auth: AuthType,
239 ) !RawResponse(T) {
240 return cl.request(T, path, args, .{ .auth = auth });
241 }
242
243 pub fn getW(
244 cl: *Client,
245 comptime T: type,
246 comptime path: []const u8,
247 args: anytype,
248 auth: AuthType,
249 ) !Response(T) {
250 return cl.get(models.Wrapper(T), path, args, auth);
251 }
252
253 pub fn post(
254 cl: *Client,
255 comptime T: type,
256 comptime path: []const u8,
257 args: anytype,
258 body: anytype,
259 auth: AuthType,
260 ) !RawResponse(T) {
261 const buffer = try json.Stringify.valueAlloc(cl.allocator, body, .{});
262 log.debug("json body: {s}", .{buffer});
263 return cl.request(T, path, args, .{
264 .method = .POST,
265 .auth = auth,
266 .body = .{ .buffer = buffer },
267 .free_body_after_sending = true,
268 });
269 }
270
271 pub fn postW(
272 cl: *Client,
273 comptime T: type,
274 comptime path: []const u8,
275 args: anytype,
276 body: anytype,
277 auth: AuthType,
278 ) !Response(T) {
279 return cl.post(models.Wrapper(T), path, args, body, auth);
280 }
281
282 pub fn patch(
283 cl: *Client,
284 comptime T: type,
285 comptime path: []const u8,
286 args: anytype,
287 body: anytype,
288 auth: AuthType,
289 ) !RawResponse(T) {
290 const buffer = try json.Stringify.valueAlloc(cl.allocator, body, .{});
291 log.debug("json body: {s}", .{buffer});
292 return cl.request(T, path, args, .{
293 .method = .PATCH,
294 .auth = auth,
295 .body = .{ .buffer = buffer },
296 .free_body_after_sending = true,
297 });
298 }
299
300 pub fn patchW(
301 cl: *Client,
302 comptime T: type,
303 comptime path: []const u8,
304 args: anytype,
305 body: anytype,
306 auth: AuthType,
307 ) !Response(T) {
308 return cl.patch(models.Wrapper(T), path, args, body, auth);
309 }
310
311 pub fn request(
312 client: *Client,
313 comptime T: type,
314 comptime path: []const u8,
315 args: anytype,
316 opts: RequestOptions,
317 ) !RawResponse(T) {
318 const path_fmt = try std.fmt.allocPrint(client.allocator, path, args);
319 defer client.allocator.free(path_fmt);
320
321 const url = try std.fmt.allocPrint(client.allocator, "{s}{s}", .{ client.base_url, path_fmt });
322
323 const Wrapper = struct {
324 fn call(
325 cl: *Client,
326 url_param: []const u8,
327 opts_param: RequestOptions,
328 ) RequestError!json.Parsed(T) {
329 defer cl.allocator.free(url_param);
330 cl.limiter.wait(cl.io) catch return error.RateLimiterError;
331 return Client.requestRaw(cl, T, url_param, opts_param);
332 }
333 };
334
335 return client.io.concurrent(
336 Wrapper.call,
337 .{ client, url, opts },
338 );
339 }
340
341 const user_agent = blk: {
342 const os = std.fmt.comptimePrint("{}", .{buildin.os.tag})[1..];
343 const arch = std.fmt.comptimePrint("{}", .{buildin.cpu.arch})[1..];
344 break :blk "space/0.1 (" ++ buildin.zig_version_string ++ "/" ++ os ++ "/" ++ arch ++ "; https://tangled.org/altagos.dev/space; +agent)";
345 };
346
347 pub fn requestRaw(
348 client: *Client,
349 comptime T: type,
350 url: []const u8,
351 opts: RequestOptions,
352 ) RequestError!json.Parsed(T) {
353 const uri = std.Uri.parse(url) catch |err| {
354 log.err("Error parsing url: {} - url = {s}", .{ err, url });
355 return err;
356 };
357
358 var req = try client.http.request(opts.method, uri, .{
359 .headers = .{
360 .authorization = opts.authorization(client),
361 .user_agent = .{ .override = user_agent },
362 .content_type = .{ .override = "application/json" },
363 },
364 });
365 defer req.deinit();
366
367 log.debug("requesting: {s}", .{uri.path.percent_encoded});
368
369 const start = Io.Clock.now(.real, client.io);
370
371 switch (opts.body) {
372 .empty => try req.sendBodiless(),
373 .buffer => |body| {
374 try req.sendBodyComplete(body);
375 if (opts.free_body_after_sending) client.allocator.free(body);
376 },
377 }
378
379 var redirect_buffer: [1024]u8 = undefined;
380
381 var response = try req.receiveHead(&redirect_buffer);
382 const colour = blk: {
383 if (@intFromEnum(response.head.status) >= 200 and @intFromEnum(response.head.status) < 300) {
384 break :blk "\x1b[92m";
385 } else {
386 break :blk "\x1b[1m\x1b[91m";
387 }
388 };
389
390 _ = client.total_requests.fetchAdd(1, .seq_cst);
391
392 const latency: u64 = @intCast(start.durationTo(Io.Clock.now(.real, client.io)).toMilliseconds());
393 const old_average = client.average_latency.load(.seq_cst);
394 var new_average: u64 = 0;
395
396 if (old_average == 0) {
397 new_average = latency;
398 } else {
399 const total_reqs = client.total_requests.load(.seq_cst);
400 new_average = old_average * (total_reqs - 1) / total_reqs + latency / total_reqs;
401 }
402
403 client.average_latency.store(new_average, .seq_cst);
404 _ = client.total_latency.fetchAdd(latency, .seq_cst);
405
406 log.debug(
407 "\x1b[2m[latency = {d}ms path = {s}]\x1b[0m received {s}{d} {s}\x1b[0m",
408 .{ latency, url[client.base_url.len..], colour, response.head.status, response.head.reason },
409 );
410
411 // var header_iter = response.head.iterateHeaders();
412 // while (header_iter.next()) |header| {
413 // log.debug("{s}: {s}", .{ header.name, header.value });
414 // }
415
416 var decompress_buffer: [std.compress.flate.max_window_len]u8 = undefined;
417 var transfer_buffer: [64]u8 = undefined;
418 var decompress: std.http.Decompress = undefined;
419
420 const decompressed_body_reader = response.readerDecompressing(
421 &transfer_buffer,
422 &decompress,
423 &decompress_buffer,
424 );
425
426 const body = decompressed_body_reader.allocRemaining(client.allocator, .unlimited) catch
427 return RequestError.OutOfMemory;
428 defer client.allocator.free(body);
429
430 var reader: Io.Reader = .fixed(body);
431 var json_reader: json.Reader = .init(client.allocator, &reader);
432 defer json_reader.deinit();
433
434 const result = json.parseFromTokenSource(T, client.allocator, &json_reader, .{
435 .ignore_unknown_fields = true,
436 }) catch |err| {
437 log.err("Error parsing response: {} - Body:\n{s}", .{ err, body });
438 return RequestError.InvalidResponse;
439 };
440
441 _ = client.successful_requests.fetchAdd(1, .seq_cst);
442
443 return result;
444 }
445};