MIRROR: javascript for 🐜's, a tiny runtime with big ambitions
1const std = @import("std");
2const c_allocator = std.heap.c_allocator;
3
4const debug = @import("debug.zig");
5const extractor = @import("extractor.zig");
6const uv = @import("uv.zig");
7const tlsuv = @import("tlsuv.zig");
8const nghttp2 = @import("nghttp2.zig");
9
// Build-time configuration module (provides the version string).
const config = @import("config");
// User-Agent header value sent with every request, e.g. "ant/<version>".
const user_agent: [:0]const u8 = "ant/" ++ config.version;
12
/// Errors surfaced to fetch callers and to StreamHandler.on_error callbacks.
pub const FetchError = error{
    ConnectionFailed,
    TlsError,
    Http2Error,
    Timeout,
    InvalidUrl,
    ResponseError,
    OutOfMemory,
};
22
/// A URL decomposed into its components. All slices alias the input string;
/// the caller must keep the original URL alive while the ParsedUrl is used.
pub const ParsedUrl = struct {
    scheme: []const u8,
    host: []const u8,
    port: u16,
    path: []const u8,

    /// Parses `scheme://host[:port][/path]`.
    /// The port defaults to 443 for "https" and 80 otherwise; a missing path
    /// becomes "/". Returns error.InvalidUrl when the scheme separator is
    /// absent, the port is not a valid u16, or the host is empty.
    /// NOTE: bracketed IPv6 authorities ("[::1]:443") are not supported.
    pub fn parse(url: []const u8) !ParsedUrl {
        const scheme_end = std.mem.indexOf(u8, url, "://") orelse return error.InvalidUrl;
        const scheme = url[0..scheme_end];
        var remaining = url[scheme_end + 3 ..];

        const path_start = std.mem.indexOf(u8, remaining, "/") orelse remaining.len;
        const host_port = remaining[0..path_start];
        remaining = if (path_start < remaining.len) remaining[path_start..] else "/";

        var host: []const u8 = host_port;
        var port: u16 = if (std.mem.eql(u8, scheme, "https")) 443 else 80;

        if (std.mem.indexOf(u8, host_port, ":")) |colon| {
            host = host_port[0..colon];
            port = std.fmt.parseInt(u16, host_port[colon + 1 ..], 10) catch return error.InvalidUrl;
        }

        // Robustness fix: previously an empty authority ("https:///x") produced
        // an empty host that only failed much later, at connect time.
        if (host.len == 0) return error.InvalidUrl;

        return .{ .scheme = scheme, .host = host, .port = port, .path = remaining };
    }
};
50
/// Caller-supplied callbacks for a streaming download. `on_data` receives body
/// chunks as they arrive, `on_complete` the final HTTP status code, and
/// `on_error` a FetchError. `user_data` is passed through to every callback.
pub const StreamHandler = struct {
    on_data: *const fn ([]const u8, ?*anyopaque) void,
    on_complete: *const fn (u16, ?*anyopaque) void,
    on_error: *const fn (FetchError, ?*anyopaque) void,
    user_data: ?*anyopaque,

    /// Convenience constructor bundling the three callbacks with their context.
    pub fn init(
        on_data: *const fn ([]const u8, ?*anyopaque) void,
        on_complete: *const fn (u16, ?*anyopaque) void,
        on_error: *const fn (FetchError, ?*anyopaque) void,
        user_data: ?*anyopaque,
    ) StreamHandler {
        return .{ .on_data = on_data, .on_complete = on_complete, .on_error = on_error, .user_data = user_data };
    }
};
66
/// A download that could not be dispatched immediately (no connection had free
/// stream capacity). Queued in Fetcher.pending; `url` is an owned copy.
const PendingRequest = struct {
    url: []const u8,
    handler: ?StreamHandler,
};

// Maximum concurrent request slots tracked per connection.
const MAX_PENDING_REQUESTS = 20;
// Persistent connections used for tarball downloads.
const NUM_CONNECTIONS = 6;
// Persistent connections used for registry metadata.
const NUM_META_CONNECTIONS = 3;
// Slow-request logging threshold; not referenced in this file's visible code.
const META_SLOW_LOG_MS: u64 = 250;
76
/// One HTTP/2 connection over a tlsuv TLS stream, multiplexing up to
/// MAX_PENDING_REQUESTS streams via nghttp2. Single-threaded; driven by the
/// libuv default loop.
const Http2Client = struct {
    allocator: std.mem.Allocator,
    loop: *uv.loop_t, // process-wide default libuv loop
    tls: tlsuv.stream_t,
    h2_session: ?*nghttp2.session, // null until initH2() runs after connect
    host: [:0]const u8, // owned, NUL-terminated copy for C APIs
    use_tls: bool,
    // Connection state machine: 0 = not connected, >0 = connected,
    // -1 = connect failed, -2 = stream closed (see onConnect/onStreamClose).
    connected: i32,
    connect_pending: bool,
    closing: bool,
    write_buf: std.ArrayListUnmanaged(u8), // outgoing frames staged by h2Send
    requests: [MAX_PENDING_REQUESTS]RequestState,
    request_count: usize, // slots ever used this batch (slots may be recycled)
    requests_done: usize,
    last_response_status_code: u16,

    /// Per-stream bookkeeping for one in-flight request.
    const RequestState = struct {
        stream_id: i32, // 0 = fresh slot, -1 = recycled, >0 = live nghttp2 stream id
        path: ?[:0]const u8, // owned request path
        on_data: ?*const fn ([]const u8, ?*anyopaque) void, // streaming sink; null buffers into response_body
        on_complete: ?*const fn (u16, ?*anyopaque) void,
        on_error: ?*const fn (FetchError, ?*anyopaque) void,
        userdata: ?*anyopaque,
        response_body: std.ArrayListUnmanaged(u8),
        status_code: u16,
        done: bool,
        has_error: bool,
        start_ns: u64, // wall-clock ns when submitted
        end_ns: u64,
        bytes: usize, // body bytes received so far
        content_encoding: ContentEncoding,
    };

    /// Response body encoding as reported by the content-encoding header.
    const ContentEncoding = enum {
        identity,
        gzip,
    };

    // ALPN protocols offered during the TLS handshake, in preference order.
    const alpn_protocols = [_][*:0]const u8{ "h2", "http/1.1" };
116
117 pub fn init(allocator: std.mem.Allocator, host: []const u8, use_tls: bool) !*Http2Client {
118 const client = try allocator.create(Http2Client);
119 errdefer allocator.destroy(client);
120
121 const host_z = try allocator.dupeZ(u8, host);
122 errdefer allocator.free(host_z);
123
124 client.* = .{
125 .allocator = allocator,
126 .loop = uv.uv_default_loop(),
127 .tls = .{},
128 .h2_session = null,
129 .host = host_z,
130 .use_tls = use_tls,
131 .connected = 0,
132 .connect_pending = false,
133 .closing = false,
134 .write_buf = .{},
135 .requests = undefined,
136 .request_count = 0,
137 .requests_done = 0,
138 .last_response_status_code = 0,
139 };
140
141 for (&client.requests) |*req| {
142 req.* = .{
143 .stream_id = 0,
144 .path = null,
145 .on_data = null,
146 .on_complete = null,
147 .on_error = null,
148 .userdata = null,
149 .response_body = .{},
150 .status_code = 0,
151 .done = false,
152 .has_error = false,
153 .start_ns = 0,
154 .end_ns = 0,
155 .bytes = 0,
156 .content_encoding = .identity,
157 };
158 }
159
160 if (tlsuv.tlsuv_stream_init(client.loop, &client.tls, null) != 0) {
161 allocator.free(host_z);
162 allocator.destroy(client);
163 return error.ConnectionFailed;
164 }
165
166 _ = tlsuv.tlsuv_stream_set_hostname(&client.tls, host_z.ptr);
167 _ = tlsuv.tlsuv_stream_set_protocols(&client.tls, 2, &alpn_protocols);
168
169 return client;
170 }
171
    /// Shuts the connection down and frees everything owned by the client,
    /// including the client allocation itself. User callbacks are detached
    /// first so no user code fires while tearing down.
    pub fn deinit(self: *Http2Client) void {
        self.closing = true;
        self.connect_pending = false;

        // Detach user callbacks so in-flight events become no-ops.
        for (&self.requests) |*req| {
            req.on_data = null;
            req.on_complete = null;
            req.on_error = null;
            req.userdata = null;
        }

        // Close the TLS stream and spin the loop until the close completes
        // (onStreamClose sets connected to -2, ending the wait).
        if (self.connected > 0) {
            self.tls.data = self;
            _ = tlsuv.tlsuv_stream_close(&self.tls, onStreamClose);
            while (self.connected > 0) _ = uv.uv_run(self.loop, uv.RUN_ONCE);
        }

        if (self.h2_session) |session| nghttp2.nghttp2_session_del(session);

        // Recycled slots (stream_id == -1) already released their resources.
        for (&self.requests) |*req| {
            if (req.stream_id != -1) {
                if (req.path) |p| self.allocator.free(p);
                req.response_body.deinit(self.allocator);
            }
        }

        self.write_buf.deinit(self.allocator);
        self.allocator.free(self.host);
        self.allocator.destroy(self);
    }
202
203 pub fn resetRequests(self: *Http2Client) void {
204 for (self.requests[0..self.request_count]) |*req| {
205 if (req.stream_id != -1) {
206 if (req.path) |p| self.allocator.free(p);
207 req.response_body.deinit(self.allocator);
208 }
209 req.* = .{
210 .stream_id = 0,
211 .path = null,
212 .on_data = null,
213 .on_complete = null,
214 .on_error = null,
215 .userdata = null,
216 .response_body = .{},
217 .status_code = 0,
218 .done = false,
219 .has_error = false,
220 .start_ns = 0,
221 .end_ns = 0,
222 .bytes = 0,
223 .content_encoding = .identity,
224 };
225 }
226 self.request_count = 0;
227 self.requests_done = 0;
228 }
229
230 pub fn hasCapacity(self: *const Http2Client) bool {
231 for (self.requests[0..self.request_count]) |req| {
232 if (req.stream_id == -1) return true;
233 }
234 return self.request_count < MAX_PENDING_REQUESTS - 1;
235 }
236
    /// Frees the resources of finished requests and marks their slots reusable
    /// (stream_id = -1) so findOrAllocSlot can hand them out again.
    /// NOTE(review): requests_done is not decremented when a slot is recycled,
    /// so the done-counter can outgrow request_count once slots are reused —
    /// confirm run()'s `requests_done < request_count` accounting tolerates this.
    pub fn recycleCompletedRequests(self: *Http2Client) void {
        if (self.requests_done == 0) return;

        for (self.requests[0..self.request_count]) |*req| {
            if (req.done and req.stream_id != -1) {
                if (req.path) |p| self.allocator.free(p);
                req.response_body.deinit(self.allocator);
                req.path = null;
                req.response_body = .{};
                req.stream_id = -1;
            }
        }
    }
250
251 fn findOrAllocSlot(self: *Http2Client) ?*RequestState {
252 for (self.requests[0..self.request_count]) |*req| {
253 if (req.stream_id == -1) return req;
254 }
255 if (self.request_count < MAX_PENDING_REQUESTS) {
256 const req = &self.requests[self.request_count];
257 self.request_count += 1;
258 return req;
259 }
260 return null;
261 }
262
263 fn onStreamClose(handle: *uv.handle_t) callconv(.c) void {
264 const tls: *tlsuv.stream_t = @ptrCast(@alignCast(handle));
265 const client: *Http2Client = @ptrCast(@alignCast(tls.data));
266 client.connected = -2;
267 client.connect_pending = false;
268 }
269
270 fn findRequest(self: *Http2Client, stream_id: i32) ?*RequestState {
271 for (self.requests[0..self.request_count]) |*req| if (req.stream_id == stream_id) return req;
272 return null;
273 }
274
275 fn h2Send(_: ?*nghttp2.session, data: [*c]const u8, len: usize, _: c_int, ud: ?*anyopaque) callconv(.c) isize {
276 const client: *Http2Client = @ptrCast(@alignCast(ud));
277 client.write_buf.appendSlice(client.allocator, data[0..len]) catch return nghttp2.ERR_NOMEM;
278 return @intCast(len);
279 }
280
281 fn h2FrameRecv(_: ?*nghttp2.session, frame: *const nghttp2.frame, ud: ?*anyopaque) callconv(.c) c_int {
282 const client: *Http2Client = @ptrCast(@alignCast(ud));
283 if (frame.hd.flags & nghttp2.FLAG_END_STREAM != 0) {
284 if (client.findRequest(frame.hd.stream_id)) |req| {
285 if (!req.done) {
286 req.done = true;
287 req.end_ns = @intCast(std.time.nanoTimestamp());
288 client.requests_done += 1;
289 if (req.on_complete) |cb| cb(req.status_code, req.userdata);
290 }
291 }
292 }
293 return 0;
294 }
295
296 fn h2DataChunk(session: ?*nghttp2.session, _: u8, stream_id: i32, data: [*c]const u8, len: usize, ud: ?*anyopaque) callconv(.c) c_int {
297 const client: *Http2Client = @ptrCast(@alignCast(ud));
298 const req = client.findRequest(stream_id) orelse return 0;
299 if (req.on_data) |cb| cb(data[0..len], req.userdata) else req.response_body.appendSlice(client.allocator, data[0..len]) catch {
300 req.has_error = true;
301 }; req.bytes += len;
302 if (session) |s| _ = nghttp2.nghttp2_session_consume(s, stream_id, len);
303
304 return 0;
305 }
306
307 fn h2Header(_: ?*nghttp2.session, frame: *const nghttp2.frame, name: [*c]const u8, namelen: usize, value: [*c]const u8, valuelen: usize, _: u8, ud: ?*anyopaque) callconv(.c) c_int {
308 const client: *Http2Client = @ptrCast(@alignCast(ud));
309 if (frame.hd.type != nghttp2.HEADERS) return 0;
310 const req = client.findRequest(frame.hd.stream_id) orelse return 0;
311 if (namelen == 7 and std.mem.eql(u8, name[0..7], ":status"))
312 req.status_code = std.fmt.parseInt(u16, value[0..valuelen], 10) catch 0;
313 if (std.mem.eql(u8, name[0..namelen], "content-encoding")) {
314 if (std.mem.startsWith(u8, value[0..valuelen], "gzip")) {
315 req.content_encoding = .gzip;
316 }
317 }
318 return 0;
319 }
320
321 fn h2StreamClose(_: ?*nghttp2.session, stream_id: i32, error_code: u32, ud: ?*anyopaque) callconv(.c) c_int {
322 const client: *Http2Client = @ptrCast(@alignCast(ud));
323 const req = client.findRequest(stream_id) orelse return 0;
324 if (!req.done) {
325 req.done = true;
326 req.end_ns = @intCast(std.time.nanoTimestamp());
327 client.requests_done += 1;
328 if (error_code != 0) {
329 req.has_error = true;
330 if (req.on_error) |cb| cb(FetchError.Http2Error, req.userdata);
331 } else if (req.on_complete) |cb| cb(req.status_code, req.userdata);
332 }
333 return 0;
334 }
335
    /// Creates the nghttp2 client session, registers the frame/data/header/
    /// close callbacks, and submits initial SETTINGS plus a connection-level
    /// window update.
    fn initH2(self: *Http2Client) !void {
        var callbacks: *nghttp2.session_callbacks = undefined;
        if (nghttp2.nghttp2_session_callbacks_new(&callbacks) != 0) return error.Http2Error;
        // Callbacks are copied into the session; the struct can be freed here.
        defer nghttp2.nghttp2_session_callbacks_del(callbacks);

        nghttp2.nghttp2_session_callbacks_set_send_callback2(callbacks, h2Send);
        nghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, h2FrameRecv);
        nghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, h2DataChunk);
        nghttp2.nghttp2_session_callbacks_set_on_header_callback(callbacks, h2Header);
        nghttp2.nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, h2StreamClose);

        var session: *nghttp2.session = undefined;
        if (nghttp2.nghttp2_session_client_new(&session, callbacks, self) != 0) return error.Http2Error;
        self.h2_session = session;

        // Allow the full slot table to run concurrently and use a 16 MiB
        // per-stream receive window for large downloads.
        var settings = [_]nghttp2.settings_entry{
            .{ .settings_id = nghttp2.SETTINGS_MAX_CONCURRENT_STREAMS, .value = MAX_PENDING_REQUESTS },
            .{ .settings_id = nghttp2.SETTINGS_INITIAL_WINDOW_SIZE, .value = 16 * 1024 * 1024 },
        };
        if (nghttp2.nghttp2_submit_settings(self.h2_session.?, nghttp2.FLAG_NONE, &settings, settings.len) != 0) return error.Http2Error;

        // Grow the connection-level window from the 64 KiB default to 16 MiB.
        const conn_window_increase: i32 = (16 * 1024 * 1024) - 65535;
        _ = nghttp2.nghttp2_submit_window_update(self.h2_session.?, nghttp2.FLAG_NONE, 0, conn_window_increase);
    }
360
361 fn flush(self: *Http2Client) !void {
362 if (self.closing) return error.ConnectionFailed;
363 if (self.h2_session) |session| while (nghttp2.nghttp2_session_want_write(session) != 0) if (nghttp2.nghttp2_session_send(session) != 0) break;
364 if (self.write_buf.items.len > 0) {
365 const data = try self.allocator.dupe(u8, self.write_buf.items);
366 self.write_buf.clearRetainingCapacity();
367 const wr = try self.allocator.create(uv.write_t);
368 wr.data = data.ptr;
369 var buf = uv.buf_t{ .base = data.ptr, .len = data.len };
370 if (tlsuv.tlsuv_stream_write(wr, &self.tls, &buf, onWrite) != 0) {
371 self.allocator.free(data);
372 self.allocator.destroy(wr);
373 return error.ConnectionFailed;
374 }
375 }
376 }
377
378 fn onWrite(wr: *uv.write_t, _: c_int) callconv(.c) void {
379 const data_ptr: [*]u8 = @ptrCast(wr.data);
380 std.c.free(data_ptr);
381 std.c.free(@ptrCast(wr));
382 }
383
384 fn allocBuf(_: *uv.handle_t, size: usize, buf: *uv.buf_t) callconv(.c) void {
385 buf.base = @ptrCast(std.c.malloc(size) orelse return);
386 buf.len = size;
387 }
388
    /// libuv read callback: feeds received bytes into nghttp2 and flushes any
    /// frames the session produces in response. A negative nread means the
    /// connection failed; every unfinished request is failed immediately.
    fn onRead(stream: *uv.stream_t, nread: isize, buf: *const uv.buf_t) callconv(.c) void {
        const tls: *tlsuv.stream_t = @ptrCast(@alignCast(stream));
        const client: *Http2Client = @ptrCast(@alignCast(tls.data));
        // The buffer came from allocBuf (std.c.malloc); always release it.
        defer if (buf.base) |b| std.c.free(b);
        if (client.closing) return;
        if (nread < 0) {
            for (client.requests[0..client.request_count]) |*req| if (!req.done) {
                req.done = true;
                req.has_error = true;
                client.requests_done += 1;
                if (req.on_error) |cb| cb(FetchError.ConnectionFailed, req.userdata);
            };
            return;
        }
        if (nread > 0 and client.h2_session != null) {
            // NOTE(review): the return value of mem_recv is ignored here, so a
            // session-level error would go unnoticed until later — confirm.
            _ = nghttp2.nghttp2_session_mem_recv(client.h2_session.?, @ptrCast(buf.base), @intCast(nread));
            client.flush() catch {};
        }
    }
408
    /// tlsuv connect callback: on success wires up the HTTP/2 session and
    /// starts reading; on failure marks the client unusable (connected = -1).
    /// Always frees the heap ConnectCtx allocated by the initiator.
    fn onConnect(req: *uv.connect_t, status: c_int) callconv(.c) void {
        const ctx: *ConnectCtx = @ptrCast(@alignCast(req.data));
        defer ctx.client.allocator.destroy(ctx);
        ctx.client.connect_pending = false;
        // deinit() may have started while the connect was in flight; close the
        // stream instead of bringing the session up.
        if (ctx.client.closing) {
            ctx.client.connected = -1;
            _ = tlsuv.tlsuv_stream_close(&ctx.client.tls, onStreamClose);
            return;
        }
        if (status < 0) {
            ctx.client.connected = -1;
            return;
        }
        ctx.client.connected = 1;
        ctx.client.tls.data = ctx.client;
        ctx.client.initH2() catch {
            ctx.client.connected = -1;
            return;
        };
        _ = tlsuv.tlsuv_stream_read_start(&ctx.client.tls, allocBuf, onRead);
        // Push the client preface/SETTINGS queued by initH2.
        ctx.client.flush() catch {};
    }
431
    /// Heap context tying a libuv connect request to its client; freed in onConnect.
    const ConnectCtx = struct { client: *Http2Client, req: uv.connect_t };
433
434 fn ensureConnected(self: *Http2Client) !void {
435 if (self.closing) return error.ConnectionFailed;
436 if (self.connected > 0) return;
437 if (self.connected < 0) return error.ConnectionFailed;
438
439 var conn_start: u64 = @intCast(std.time.nanoTimestamp());
440
441 if (!self.connect_pending) {
442 const ctx = try self.allocator.create(ConnectCtx);
443 ctx.* = .{ .client = self, .req = .{} };
444 ctx.req.data = ctx;
445 if (tlsuv.tlsuv_stream_connect(&ctx.req, &self.tls, self.host.ptr, if (self.use_tls) 443 else 80, onConnect) != 0) {
446 self.allocator.destroy(ctx);
447 return error.ConnectionFailed;
448 }
449 self.connect_pending = true;
450 }
451
452 var loop_count: u32 = 0;
453 while (self.connected == 0) {
454 _ = uv.uv_run(self.loop, uv.RUN_ONCE);
455 loop_count += 1;
456 }
457 conn_start = debug.timer(" h2: tls connect", conn_start);
458 debug.log(" h2: connect loop iterations={d}", .{loop_count});
459 if (self.connected < 0) return error.ConnectionFailed;
460 }
461
462 pub fn initiateConnectAsync(self: *Http2Client) !void {
463 if (self.closing) return error.ConnectionFailed;
464 if (self.connected > 0) return;
465 if (self.connected < 0) return error.ConnectionFailed;
466 if (self.connect_pending) return;
467
468 const ctx = try self.allocator.create(ConnectCtx);
469 ctx.* = .{ .client = self, .req = .{} };
470 ctx.req.data = ctx;
471 if (tlsuv.tlsuv_stream_connect(&ctx.req, &self.tls, self.host.ptr, if (self.use_tls) 443 else 80, onConnect) != 0) {
472 self.allocator.destroy(ctx);
473 return error.ConnectionFailed;
474 }
475 self.connect_pending = true;
476 }
477
478 fn makeNv(name: [:0]const u8, value: [:0]const u8) nghttp2.nv {
479 return .{
480 .name = @constCast(name.ptr),
481 .value = @constCast(value.ptr),
482 .namelen = name.len, .valuelen = value.len, .flags = nghttp2.NV_FLAG_NONE
483 };
484 }
485
    /// Blocking GET with `Accept: application/json`. Caller owns the returned
    /// body (allocated with `allocator`). See getWithAccept for details.
    pub fn get(self: *Http2Client, path: []const u8, allocator: std.mem.Allocator) ![]u8 {
        return self.getWithAccept(path, "application/json", allocator);
    }
489
    /// Blocking GET for `path` with the given Accept header. Spins the event
    /// loop until the response finishes. Returns the response body (caller
    /// owns, allocated with `allocator`). Any status other than 200 yields
    /// error.ResponseError; the actual status is kept in
    /// last_response_status_code.
    pub fn getWithAccept(self: *Http2Client, path: []const u8, accept: [:0]const u8, allocator: std.mem.Allocator) ![]u8 {
        try self.ensureConnected();
        // Table full: recycle everything (the previous batch must be done).
        if (self.request_count >= MAX_PENDING_REQUESTS) self.resetRequests();
        const req = &self.requests[self.request_count]; self.request_count += 1;

        req.* = .{
            .stream_id = 0,
            .path = try self.allocator.dupeZ(u8, path),
            .on_data = null, // no streaming sink: buffer into response_body
            .on_complete = null,
            .on_error = null,
            .userdata = null,
            .response_body = .{},
            .status_code = 0,
            .done = false,
            .has_error = false,
            .start_ns = @intCast(std.time.nanoTimestamp()),
            .end_ns = 0,
            .bytes = 0,
            .content_encoding = .identity,
        };

        const session = self.h2_session orelse return error.Http2Error;

        var hdrs = [_]nghttp2.nv{
            makeNv(":method", "GET"),
            makeNv(":path", req.path.?),
            makeNv(":scheme", "https"),
            makeNv(":authority", self.host),
            makeNv("accept", accept),
            makeNv("user-agent", user_agent)
        };

        const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
        if (sid < 0) {
            // Roll the slot allocation back before reporting failure.
            self.request_count -= 1;
            if (req.path) |p| self.allocator.free(p);
            return error.Http2Error;
        }

        req.stream_id = sid;
        try self.flush();
        // Drive the loop until a callback marks the request done.
        while (!req.done) {
            _ = uv.uv_run(self.loop, uv.RUN_ONCE);
            try self.flush();
        }

        self.last_response_status_code = req.status_code;
        if (req.has_error or req.status_code != 200) return error.ResponseError;
        return try allocator.dupe(u8, req.response_body.items);
    }
541
    /// Starts a streaming GET for `path`: body chunks go to `on_data` and the
    /// request ends via `on_complete` or `on_error`. Non-blocking after
    /// submission — the caller must keep pumping the event loop (run()/tick()).
    pub fn getStream(self: *Http2Client, path: []const u8, on_data: *const fn ([]const u8, ?*anyopaque) void, on_complete: *const fn (u16, ?*anyopaque) void, on_error: *const fn (FetchError, ?*anyopaque) void, userdata: ?*anyopaque) !void {
        try self.ensureConnected();
        const req = self.findOrAllocSlot() orelse return error.OutOfMemory;

        req.* = .{
            .stream_id = 0,
            .path = try self.allocator.dupeZ(u8, path),
            .on_data = on_data,
            .on_complete = on_complete,
            .on_error = on_error,
            .userdata = userdata,
            .response_body = .{},
            .status_code = 0,
            .done = false,
            .has_error = false,
            .start_ns = @intCast(std.time.nanoTimestamp()),
            .end_ns = 0,
            .bytes = 0,
            .content_encoding = .identity
        };

        const session = self.h2_session orelse return error.Http2Error;

        var hdrs = [_]nghttp2.nv{
            makeNv(":method", "GET"),
            makeNv(":path", req.path.?),
            makeNv(":scheme", "https"),
            makeNv(":authority", self.host),
            makeNv("accept", "*/*"),
            makeNv("user-agent", user_agent)
        };

        const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
        if (sid < 0) {
            // Mark the slot recycled so it can be reused.
            if (req.path) |p| self.allocator.free(p);
            req.stream_id = -1;
            return error.Http2Error;
        }

        req.stream_id = sid;
        try self.flush();
    }
584
    /// Drives the event loop until every submitted request is done, logging
    /// progress roughly once per second. Returns error.ResponseError if any
    /// request finished with its error flag set.
    pub fn run(self: *Http2Client) !void {
        const run_start: u64 = @intCast(std.time.nanoTimestamp());
        var loop_count: u32 = 0;
        var last_done: usize = 0;
        var last_report: u64 = run_start;

        while (self.requests_done < self.request_count) {
            // uv_run returning 0 means no live handles remain — nothing more
            // can complete, so bail out of the wait.
            if (uv.uv_run(self.loop, uv.RUN_ONCE) == 0) break;
            try self.flush();
            loop_count += 1;

            const now: u64 = @intCast(std.time.nanoTimestamp());
            if (now - last_report > 1_000_000_000) {
                const done_delta = self.requests_done - last_done;
                debug.log(" h2: progress {d}/{d} (+{d} in last 1s) loops={d}", .{
                    self.requests_done, self.request_count,
                    done_delta, loop_count,
                });
                last_done = self.requests_done;
                last_report = now;
            }
        }

        const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
        const elapsed_ms = elapsed_ns / 1_000_000;
        debug.log(" h2: run complete in {d}ms, {d} loops, {d}/{d} done", .{
            elapsed_ms, loop_count,
            self.requests_done, self.request_count,
        });

        var error_count: usize = 0;
        for (self.requests[0..self.request_count]) |req| {
            if (req.has_error) error_count += 1;
        }
        if (error_count > 0) {
            debug.log(" h2: {d} requests had errors", .{error_count});
            return error.ResponseError;
        }
    }
624};
625
/// Tracks one in-flight tarball download, bridging Http2Client stream events
/// back to the caller's StreamHandler. Owns `url`.
pub const TarballCtx = struct {
    handler: StreamHandler,
    done: bool,
    has_error: bool,
    url: []const u8,
    start_ns: u64, // ns timestamp when the download started
    bytes: usize, // body bytes received so far
};
634
/// Record of one successfully completed download, kept for reporting.
const TarballStats = struct {
    url: []const u8, // owned copy
    bytes: usize,
    elapsed_ms: u64,
};
640
/// Adapters passed to Http2Client.getStream that forward stream events to the
/// TarballCtx's user-facing handler and update download bookkeeping.
const TarballCallbacks = struct {
    fn onData(data: []const u8, ud: ?*anyopaque) void {
        const ctx: *TarballCtx = @ptrCast(@alignCast(ud));
        ctx.bytes += data.len;
        ctx.handler.on_data(data, ctx.handler.user_data);
    }

    // `done` is set after notifying the handler; cleanup code frees the ctx
    // once done is observed.
    fn onComplete(status: u16, ud: ?*anyopaque) void {
        const ctx: *TarballCtx = @ptrCast(@alignCast(ud));
        ctx.handler.on_complete(status, ctx.handler.user_data);
        ctx.done = true;
    }

    fn onError(err: FetchError, ud: ?*anyopaque) void {
        const ctx: *TarballCtx = @ptrCast(@alignCast(ud));
        ctx.handler.on_error(err, ctx.handler.user_data);
        ctx.done = true;
        ctx.has_error = true;
    }
};
661
/// High-level download manager: pools of HTTP/2 connections to the registry
/// (split between metadata requests and tarball streams) plus an overflow
/// queue for requests that exceed available stream capacity.
pub const Fetcher = struct {
    allocator: std.mem.Allocator,
    registry_host: []const u8, // owned copy
    meta_clients: [NUM_META_CONNECTIONS]?*Http2Client,
    meta_clients_initialized: bool,
    pending: std.ArrayListUnmanaged(PendingRequest), // overflow queue; URLs owned
    tarball_clients: [NUM_CONNECTIONS]?*Http2Client,
    tarball_clients_initialized: bool,
    tarball_contexts: std.ArrayListUnmanaged(*TarballCtx), // in-flight downloads
    tarball_round_robin: usize, // next connection index to try
    tarball_stats: std.ArrayListUnmanaged(TarballStats),
    last_http_error_url: ?[]u8, // owned; see setLastHttpError
    last_http_error_status: u16,

    /// Most recent HTTP-level failure, for error reporting.
    pub const HttpErrorInfo = struct {
        url: []const u8,
        status: u16,
    };
680
681 pub fn init(allocator: std.mem.Allocator, registry_host: []const u8) !*Fetcher {
682 const f = try allocator.create(Fetcher);
683 f.* = .{
684 .allocator = allocator,
685 .registry_host = try allocator.dupe(u8, registry_host),
686 .meta_clients = [_]?*Http2Client{null} ** NUM_META_CONNECTIONS,
687 .meta_clients_initialized = false,
688 .pending = .{},
689 .tarball_clients = [_]?*Http2Client{null} ** NUM_CONNECTIONS,
690 .tarball_clients_initialized = false,
691 .tarball_contexts = .{},
692 .tarball_round_robin = 0,
693 .tarball_stats = .{},
694 .last_http_error_url = null,
695 .last_http_error_status = 0,
696 };
697 return f;
698 }
699
    /// Releases everything owned by the fetcher: connections, queued requests,
    /// in-flight contexts, stats, and the fetcher allocation itself.
    pub fn deinit(self: *Fetcher) void {
        if (self.last_http_error_url) |url| self.allocator.free(url);
        for (&self.meta_clients) |*maybe_client| {
            if (maybe_client.*) |c| { c.deinit(); maybe_client.* = null; }
        }
        // Queued requests own their duped URLs.
        for (self.pending.items) |req| self.allocator.free(req.url);
        self.pending.deinit(self.allocator);
        for (&self.tarball_clients) |*maybe_client| {
            if (maybe_client.*) |c| { c.deinit(); maybe_client.* = null; }
        }
        // Contexts own their URL copies.
        for (self.tarball_contexts.items) |ctx| {
            self.allocator.free(ctx.url);
            self.allocator.destroy(ctx);
        }
        self.tarball_contexts.deinit(self.allocator);
        for (self.tarball_stats.items) |stat| self.allocator.free(stat.url);
        self.tarball_stats.deinit(self.allocator);
        self.allocator.free(self.registry_host);
        self.allocator.destroy(self);
    }
720
721 fn clearLastHttpError(self: *Fetcher) void {
722 if (self.last_http_error_url) |url| self.allocator.free(url);
723 self.last_http_error_url = null;
724 self.last_http_error_status = 0;
725 }
726
727 fn setLastHttpError(self: *Fetcher, url: []const u8, status: u16) void {
728 self.clearLastHttpError();
729 self.last_http_error_url = self.allocator.dupe(u8, url) catch null;
730 self.last_http_error_status = status;
731 }
732
733 pub fn getLastHttpError(self: *const Fetcher) ?HttpErrorInfo {
734 const url = self.last_http_error_url orelse return null;
735 return .{ .url = url, .status = self.last_http_error_status };
736 }
737
    /// Lazily builds the metadata connection pool, connecting each client
    /// synchronously. Succeeds as long as at least one connection comes up;
    /// otherwise returns error.ConnectionFailed (and stays uninitialized so a
    /// later call retries).
    fn ensureMetaClients(self: *Fetcher) !void {
        if (self.meta_clients_initialized) return;

        for (&self.meta_clients, 0..) |*slot, i| {
            const client = Http2Client.init(self.allocator, self.registry_host, true) catch |err| {
                debug.log("fetcher: failed to init meta connection {d}: {}", .{ i, err });
                continue;
            };
            client.ensureConnected() catch |err| {
                debug.log("fetcher: failed to connect meta {d}: {}", .{ i, err });
                client.deinit();
                continue;
            };
            slot.* = client;
        }

        var any_connected = false;
        for (self.meta_clients) |slot| {
            if (slot != null) { any_connected = true; break; }
        }

        if (!any_connected) return error.ConnectionFailed;
        self.meta_clients_initialized = true;
    }
762
763 pub fn resetMetaClients(self: *Fetcher) void {
764 self.clearLastHttpError();
765 for (&self.meta_clients) |*slot| {
766 if (slot.*) |client| { client.deinit(); slot.* = null; }
767 }
768 self.meta_clients_initialized = false;
769 }
770
    /// Lazily builds the tarball connection pool, connecting each client
    /// synchronously. Succeeds as long as at least one connection comes up;
    /// otherwise returns error.ConnectionFailed and remains uninitialized.
    fn ensureTarballClients(self: *Fetcher) !void {
        if (self.tarball_clients_initialized) return;

        debug.log("fetcher: initializing {d} persistent connections", .{NUM_CONNECTIONS});
        const init_start: u64 = @intCast(std.time.nanoTimestamp());

        for (&self.tarball_clients, 0..) |*slot, i| {
            const client = Http2Client.init(self.allocator, self.registry_host, true) catch |err| {
                debug.log("fetcher: failed to init connection {d}: {}", .{ i, err });
                continue;
            };
            client.ensureConnected() catch |err| {
                debug.log("fetcher: failed to connect {d}: {}", .{ i, err });
                client.deinit();
                continue;
            };
            slot.* = client;
        }

        var any_connected = false;
        for (self.tarball_clients) |slot| {
            if (slot != null) { any_connected = true; break; }
        }

        if (!any_connected) return error.ConnectionFailed;
        self.tarball_clients_initialized = true;

        _ = debug.timer("fetcher: connection pool init", init_start);
    }
800
801 fn findAvailableClient(self: *Fetcher) ?struct { client: *Http2Client, idx: usize } {
802 var attempts: usize = 0;
803 while (attempts < NUM_CONNECTIONS) : (attempts += 1) {
804 const idx = (self.tarball_round_robin + attempts) % NUM_CONNECTIONS;
805 if (self.tarball_clients[idx]) |client| { if (client.hasCapacity()) return .{ .client = client, .idx = idx }; }
806 }
807 return null;
808 }
809
810 pub fn initiateTarballConnectionsAsync(self: *Fetcher) void {
811 if (self.tarball_clients_initialized) return;
812 debug.log("fetcher: initiating {d} tarball connections (async)", .{NUM_CONNECTIONS});
813
814 for (&self.tarball_clients, 0..) |*slot, i| {
815 const client = Http2Client.init(self.allocator, self.registry_host, true) catch {
816 continue;
817 };
818 client.initiateConnectAsync() catch {
819 client.deinit(); continue;
820 };
821 slot.* = client; _ = i;
822 }
823
824 var any_connected = false;
825 for (self.tarball_clients) |slot| {
826 if (slot != null) { any_connected = true; break; }
827 }
828
829 if (any_connected) self.tarball_clients_initialized = true;
830 }
831
832 pub fn queueTarballAsync(self: *Fetcher, url: []const u8, handler: StreamHandler) !void {
833 try self.ensureTarballClients();
834 const parsed = try ParsedUrl.parse(url);
835
836 const available = self.findAvailableClient() orelse {
837 try self.pending.append(self.allocator, .{
838 .url = try self.allocator.dupe(u8, url),
839 .handler = handler,
840 }); return;
841 };
842
843 const ctx = try self.allocator.create(TarballCtx);
844 ctx.* = .{
845 .handler = handler,
846 .done = false,
847 .has_error = false,
848 .url = try self.allocator.dupe(u8, url),
849 .start_ns = @intCast(std.time.nanoTimestamp()),
850 .bytes = 0,
851 };
852
853 try self.tarball_contexts.append(
854 self.allocator,
855 ctx
856 );
857
858 try available.client.getStream(
859 parsed.path,
860 TarballCallbacks.onData,
861 TarballCallbacks.onComplete,
862 TarballCallbacks.onError,
863 ctx,
864 );
865
866 self.tarball_round_robin = (available.idx + 1) % NUM_CONNECTIONS;
867 }
868
    /// One non-blocking pump of the download machinery: flush pending frames,
    /// run the loop once without waiting, recycle finished streams, reap
    /// completed contexts, and dispatch queued requests onto freed capacity.
    /// Returns the number of downloads that completed during this tick.
    pub fn tick(self: *Fetcher) usize {
        self.ensureTarballClients() catch return 0;
        const loop = uv.uv_default_loop();

        for (&self.tarball_clients) |maybe_client| {
            if (maybe_client) |c| c.flush() catch {};
        }
        _ = uv.uv_run(loop, uv.RUN_NOWAIT);

        for (&self.tarball_clients) |maybe_client| {
            if (maybe_client) |c| c.recycleCompletedRequests();
        }

        const completed = self.cleanupCompletedContexts();
        self.dispatchPending();

        return completed;
    }
887
888 fn cleanupCompletedContexts(self: *Fetcher) usize {
889 var completed: usize = 0; var i: usize = 0;
890 while (i < self.tarball_contexts.items.len) {
891 const ctx = self.tarball_contexts.items[i];
892 if (ctx.done) {
893 completed += 1;
894 self.allocator.free(ctx.url);
895 self.allocator.destroy(ctx);
896 _ = self.tarball_contexts.swapRemove(i);
897 } else i += 1;
898 }
899 return completed;
900 }
901
    /// Moves queued requests onto connections with free stream capacity.
    /// Ownership: on success the queued URL passes to the context created by
    /// dispatchRequest; on failure it is freed here after notifying the
    /// handler. NOTE(review): pop() drains the queue LIFO, so queued downloads
    /// start in reverse order — confirm this is acceptable.
    fn dispatchPending(self: *Fetcher) void {
        while (self.pending.items.len > 0) {
            const available = self.findAvailableClient() orelse break;
            const req = self.pending.pop() orelse break;

            // Handler-less entries have nothing to notify; just drop them.
            const handler = req.handler orelse {
                self.allocator.free(req.url); continue;
            };

            self.dispatchRequest(available.client, req.url, handler) catch |err| {
                handler.on_error(errToFetchError(err), handler.user_data);
                self.allocator.free(req.url); continue;
            };
        }
    }
917
918 fn dispatchRequest(self: *Fetcher, client: *Http2Client, url: []const u8, handler: StreamHandler) !void {
919 const parsed = try ParsedUrl.parse(url);
920 const ctx = try self.allocator.create(TarballCtx);
921
922 ctx.* = .{
923 .handler = handler,
924 .done = false,
925 .has_error = false,
926 .url = url,
927 .start_ns = @intCast(std.time.nanoTimestamp()),
928 .bytes = 0,
929 };
930
931 errdefer self.allocator.destroy(ctx);
932 try self.tarball_contexts.append(self.allocator, ctx);
933
934 try client.getStream(
935 parsed.path,
936 TarballCallbacks.onData,
937 TarballCallbacks.onComplete,
938 TarballCallbacks.onError,
939 ctx,
940 );
941 }
942
943 fn errToFetchError(err: anyerror) FetchError {
944 return switch (err) {
945 error.InvalidUrl => FetchError.InvalidUrl,
946 error.OutOfMemory => FetchError.OutOfMemory,
947 else => FetchError.Http2Error,
948 };
949 }
950
    /// Number of downloads currently in flight (excludes the queued backlog in
    /// `pending`).
    pub fn pendingTarballCount(self: *Fetcher) usize {
        return self.tarball_contexts.items.len;
    }
954
    /// Drive the libuv event loop until every queued and in-flight tarball
    /// download settles. Each pass: flush clients, pump uv_run once, reap
    /// completed contexts (recording stats), refill free stream capacity from
    /// `self.pending`, and emit a throttled progress log (~once per second).
    pub fn finishTarballs(self: *Fetcher) void {
        const loop = uv.uv_default_loop();
        // Timestamp of the last progress report; throttles debug output.
        var last_report: u64 = @intCast(std.time.nanoTimestamp());
        var loops: usize = 0;
        var completed: usize = 0;
        const start = last_report;

        while (self.tarball_contexts.items.len > 0 or self.pending.items.len > 0) {
            // Push any buffered HTTP/2 frames before blocking in uv_run.
            for (&self.tarball_clients) |maybe_client| {
                if (maybe_client) |c| c.flush() catch {};
            }

            // uv_run == 0 means no live handles remain; only exit once nothing
            // is pending or in flight, otherwise keep spinning the loop.
            if (uv.uv_run(loop, uv.RUN_ONCE) == 0 and self.pending.items.len == 0 and self.tarball_contexts.items.len == 0) break;
            loops += 1;

            for (&self.tarball_clients) |maybe_client| {
                if (maybe_client) |c| c.recycleCompletedRequests();
            }

            // Reap contexts whose callbacks marked them done: record stats for
            // successful downloads, then free the context and its URL.
            var i: usize = 0;
            while (i < self.tarball_contexts.items.len) {
                const ctx = self.tarball_contexts.items[i];
                if (ctx.done) {
                    if (!ctx.has_error) {
                        const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - ctx.start_ns) / 1_000_000);
                        // Stats table keeps its own copy of the URL; a failed
                        // dupe simply skips the stat entry (best effort).
                        const url_copy = self.allocator.dupe(u8, ctx.url) catch null;
                        if (url_copy) |url| {
                            self.tarball_stats.append(self.allocator, .{ .url = url, .bytes = ctx.bytes, .elapsed_ms = elapsed_ms }) catch {};
                        }
                    }
                    self.allocator.free(ctx.url);
                    self.allocator.destroy(ctx);
                    // swapRemove: do not advance i — the swapped-in element now
                    // occupies slot i.
                    _ = self.tarball_contexts.swapRemove(i);
                    completed += 1;
                } else {
                    i += 1;
                }
            }

            // Refill: hand queued requests to clients with free stream
            // capacity. At most one request is queued per outer-while pass;
            // the loop ends when no client can accept more work.
            while (self.pending.items.len > 0) {
                var queued = false;
                for (&self.tarball_clients, 0..) |maybe_client, conn_idx| {
                    if (maybe_client) |client| {
                        if (client.hasCapacity()) {
                            const maybe_req = self.pending.pop();
                            const req = maybe_req orelse break;
                            if (req.handler) |handler| {
                                const parsed = ParsedUrl.parse(req.url) catch {
                                    handler.on_error(FetchError.InvalidUrl, handler.user_data);
                                    self.allocator.free(req.url);
                                    continue;
                                };

                                const ctx = self.allocator.create(TarballCtx) catch {
                                    handler.on_error(FetchError.OutOfMemory, handler.user_data);
                                    self.allocator.free(req.url);
                                    continue;
                                };
                                // ctx takes ownership of req.url from here on;
                                // the reap loop above frees it.
                                ctx.* = .{
                                    .handler = handler,
                                    .done = false,
                                    .has_error = false,
                                    .url = req.url,
                                    .start_ns = @intCast(std.time.nanoTimestamp()),
                                    .bytes = 0,
                                };
                                self.tarball_contexts.append(self.allocator, ctx) catch {
                                    self.allocator.destroy(ctx);
                                    self.allocator.free(req.url);
                                    continue;
                                };

                                client.getStream(
                                    parsed.path,
                                    struct {
                                        fn onData(data: []const u8, ud: ?*anyopaque) void {
                                            const c: *TarballCtx = @ptrCast(@alignCast(ud));
                                            c.bytes += data.len;
                                            c.handler.on_data(data, c.handler.user_data);
                                        }
                                    }.onData,
                                    struct {
                                        fn onComplete(status: u16, ud: ?*anyopaque) void {
                                            const c: *TarballCtx = @ptrCast(@alignCast(ud));
                                            c.handler.on_complete(status, c.handler.user_data);
                                            if (debug.enabled) {
                                                const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - c.start_ns) / 1_000_000);
                                                debug.log(" tarball: done {s} {d}ms {d} bytes status={d}", .{ c.url, elapsed_ms, c.bytes, status });
                                            }
                                            c.done = true;
                                        }
                                    }.onComplete,
                                    struct {
                                        fn onError(err: FetchError, ud: ?*anyopaque) void {
                                            const c: *TarballCtx = @ptrCast(@alignCast(ud));
                                            c.handler.on_error(err, c.handler.user_data);
                                            if (debug.enabled) {
                                                const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - c.start_ns) / 1_000_000);
                                                debug.log(" tarball: error {s} {d}ms {d} bytes", .{ c.url, elapsed_ms, c.bytes });
                                            }
                                            c.done = true;
                                            c.has_error = true;
                                        }
                                    }.onError,
                                    ctx,
                                ) catch {
                                    handler.on_error(FetchError.Http2Error, handler.user_data);
                                    // NOTE(review): has_error is NOT set here, so this
                                    // failed request will be counted as a success in
                                    // tarball_stats — confirm whether that is intended.
                                    ctx.done = true;
                                };
                                queued = true;
                                _ = conn_idx; // index unused; kept for the 0.. capture form
                            } else {
                                // No handler attached: nothing to notify, just
                                // release the URL copy.
                                self.allocator.free(req.url);
                            }
                            break;
                        }
                    }
                }
                if (!queued) break;
            }

            // Throttled progress report, at most once per second.
            const now: u64 = @intCast(std.time.nanoTimestamp());
            if (now - last_report > 1_000_000_000) {
                var total_bytes: usize = 0;
                for (self.tarball_contexts.items) |ctx| {
                    total_bytes += ctx.bytes;
                }
                debug.log(" h2: {d} in-flight, {d} pending, {d} completed, {d} loops", .{
                    self.tarball_contexts.items.len,
                    self.pending.items.len,
                    completed,
                    loops,
                });
                debug.log(" h2: tarball progress in-flight bytes={d}", .{ total_bytes });
                last_report = now;
            }
        }

        const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, start));
        debug.log("fetcher: finishTarballs completed in {d}ms, {d} loops, {d} completed", .{
            elapsed_ns / 1_000_000,
            loops,
            completed,
        });
        // Debug summary: insertion-sort every completed download into two
        // fixed-size top-5 leaderboards (by elapsed time and by byte count).
        if (debug.enabled and self.tarball_stats.items.len > 0) {
            var top_time: [5]?TarballStats = .{null} ** 5;
            var top_size: [5]?TarballStats = .{null} ** 5;

            for (self.tarball_stats.items) |stat| {
                // Find the first slot this stat outranks (or an empty slot).
                var idx_time: usize = top_time.len;
                for (top_time, 0..) |slot, i| {
                    if (slot == null or stat.elapsed_ms > slot.?.elapsed_ms) {
                        idx_time = i;
                        break;
                    }
                }
                if (idx_time < top_time.len) {
                    // Shift lower-ranked entries down to make room.
                    var carry = stat;
                    var j = idx_time;
                    while (j < top_time.len) : (j += 1) {
                        const next = top_time[j];
                        top_time[j] = carry;
                        if (next) |n| {
                            carry = n;
                        } else {
                            break;
                        }
                    }
                }

                // Same insertion, ranked by size instead of time.
                var idx_size: usize = top_size.len;
                for (top_size, 0..) |slot, i| {
                    if (slot == null or stat.bytes > slot.?.bytes) {
                        idx_size = i;
                        break;
                    }
                }
                if (idx_size < top_size.len) {
                    var carry_size = stat;
                    var k = idx_size;
                    while (k < top_size.len) : (k += 1) {
                        const next_size = top_size[k];
                        top_size[k] = carry_size;
                        if (next_size) |n| {
                            carry_size = n;
                        } else {
                            break;
                        }
                    }
                }
            }

            debug.log("fetcher: top tarballs by time", .{});
            for (top_time, 0..) |maybe_stat, i| {
                if (maybe_stat) |stat| {
                    debug.log(" {d}. {s} {d}ms {d} bytes", .{ i + 1, stat.url, stat.elapsed_ms, stat.bytes });
                }
            }
            debug.log("fetcher: top tarballs by size", .{});
            for (top_size, 0..) |maybe_stat, i| {
                if (maybe_stat) |stat| {
                    debug.log(" {d}. {s} {d} bytes {d}ms", .{ i + 1, stat.url, stat.bytes, stat.elapsed_ms });
                }
            }
        }
    }
1161
1162 pub fn fetchMetadata(self: *Fetcher, package_name: []const u8, allocator: std.mem.Allocator) ![]u8 {
1163 return self.fetchMetadataFull(package_name, false, allocator);
1164 }
1165
    /// Result of `decodeMetadataOwned`: a caller-owned byte buffer plus a flag
    /// recording whether the response body had to be gunzipped first.
    const DecodedMetadata = struct {
        // Decoded (possibly decompressed) response body; caller owns this memory.
        data: []u8,
        // True when the original response arrived gzip-encoded.
        compressed: bool,
    };
1170
1171 fn metaClientCanQueue(c: *const Http2Client) bool {
1172 return c.h2_session != null and c.connected == 1 and c.request_count < MAX_PENDING_REQUESTS - 1;
1173 }
1174
1175 fn nextMetaClient(self: *Fetcher, conn_idx: *usize) ?*Http2Client {
1176 var attempts: usize = 0;
1177 while (attempts < NUM_META_CONNECTIONS) : (attempts += 1) {
1178 if (self.meta_clients[conn_idx.*]) |c| {
1179 if (metaClientCanQueue(c)) return c;
1180 }
1181 conn_idx.* = (conn_idx.* + 1) % NUM_META_CONNECTIONS;
1182 }
1183 return null;
1184 }
1185
1186 fn flushMetaClients(self: *Fetcher) void {
1187 for (self.meta_clients) |maybe_client| {
1188 if (maybe_client) |c| c.flush() catch {};
1189 }
1190 }
1191
1192 fn metaRequestsComplete(self: *Fetcher) bool {
1193 for (self.meta_clients) |maybe_client| {
1194 if (maybe_client) |c| {
1195 for (c.requests[0..c.request_count]) |*req| {
1196 if (!req.done and !req.has_error) return false;
1197 }
1198 }
1199 }
1200 return true;
1201 }
1202
1203 fn decodeMetadataOwned(
1204 req: *Http2Client.RequestState,
1205 allocator: std.mem.Allocator,
1206 decompress_buf: *std.ArrayListUnmanaged(u8),
1207 ) ?DecodedMetadata {
1208 if (req.has_error or req.status_code != 200) return null;
1209
1210 if (req.content_encoding != .gzip) {
1211 const data = allocator.dupe(u8, req.response_body.items) catch return null;
1212 return .{ .data = data, .compressed = false };
1213 }
1214
1215 decompress_buf.clearRetainingCapacity();
1216 const decomp = extractor.GzipDecompressor.init(allocator) catch return null;
1217 defer decomp.deinit();
1218
1219 _ = decomp.decompress(req.response_body.items, struct {
1220 fn onChunk(data: []const u8, ctx: ?*anyopaque) anyerror!void {
1221 const buf: *std.ArrayListUnmanaged(u8) = @ptrCast(@alignCast(ctx));
1222 try buf.appendSlice(c_allocator, data);
1223 }
1224 }.onChunk, decompress_buf) catch return null;
1225
1226 const data = allocator.dupe(u8, decompress_buf.items) catch return null;
1227 return .{ .data = data, .compressed = true };
1228 }
1229
1230 pub fn fetchMetadataFull(self: *Fetcher, package_name: []const u8, full: bool, allocator: std.mem.Allocator) ![]u8 {
1231 try self.ensureMetaClients();
1232 self.clearLastHttpError();
1233 for (self.meta_clients) |maybe_client| {
1234 if (maybe_client) |client| {
1235 var path_buf: [512]u8 = undefined;
1236 const path_slice = std.fmt.bufPrint(&path_buf, "/{s}", .{package_name}) catch return error.OutOfMemory;
1237 const accept: [:0]const u8 = if (full) "application/json" else "application/vnd.npm.install-v1+json";
1238 return client.getWithAccept(path_slice, accept, allocator) catch |err| {
1239 if (err == error.ResponseError) {
1240 var url_buf: [1024]u8 = undefined;
1241 const url = std.fmt.bufPrint(&url_buf, "https://{s}/{s}", .{ self.registry_host, package_name }) catch "";
1242 self.setLastHttpError(url, client.last_response_status_code);
1243 }
1244 return err;
1245 };
1246 }
1247 }
1248 return error.ConnectionFailed;
1249 }
1250
    /// Per-package outcome of a batch metadata fetch.
    pub const MetadataResult = struct {
        // Package name; borrows the caller's slice, not owned.
        name: []const u8,
        // Decoded metadata body (caller owns), or null on failure.
        data: ?[]u8,
        // True when the response body was gzip-encoded on the wire.
        compressed: bool,
        // True when the request failed or returned a non-200 status.
        has_error: bool,
    };
1257
1258 fn storeMetadataBatchResult(
1259 req: *Http2Client.RequestState,
1260 result: *MetadataResult,
1261 allocator: std.mem.Allocator,
1262 decompress_buf: *std.ArrayListUnmanaged(u8),
1263 ) bool {
1264 const decoded = decodeMetadataOwned(req, allocator, decompress_buf) orelse {
1265 result.has_error = true;
1266 return false;
1267 };
1268 result.data = decoded.data;
1269 result.compressed = decoded.compressed;
1270 return true;
1271 }
1272
    /// Fetch abbreviated metadata for many packages concurrently over the
    /// pooled HTTP/2 meta connections, in capacity-sized batches. Returns one
    /// MetadataResult per input name (same order); failed entries carry
    /// `has_error == true` and null data. Caller owns the result slice and
    /// every non-null `data` buffer (allocated from `allocator`).
    pub fn fetchMetadataBatch(self: *Fetcher, names: []const []const u8, allocator: std.mem.Allocator) ![]MetadataResult {
        if (names.len == 0) return &[_]MetadataResult{};

        var total_start: u64 = @intCast(std.time.nanoTimestamp());
        try self.ensureMetaClients();
        total_start = debug.timer(" meta: get clients", total_start);

        var active_connections: usize = 0;
        for (self.meta_clients) |maybe_client| {
            if (maybe_client != null) active_connections += 1;
        } if (active_connections == 0) return error.ConnectionFailed;

        debug.log(" meta: batch {d} packages across {d} connections", .{ names.len, active_connections });

        var results = try allocator.alloc(MetadataResult, names.len);
        for (results, 0..) |*r, i| {
            r.* = .{ .name = names[i], .data = null, .compressed = false, .has_error = false };
        }

        // Each connection keeps one request slot in reserve (see
        // metaClientCanQueue), hence the -1 per connection.
        const total_capacity = active_connections * (MAX_PENDING_REQUESTS - 1);
        var offset: usize = 0;
        var batch_num: usize = 0;

        // Shared gunzip scratch, reused across all requests in all batches.
        var decompress_buf = std.ArrayListUnmanaged(u8){};
        defer decompress_buf.deinit(c_allocator);

        while (offset < names.len) {
            const end = @min(offset + total_capacity, names.len);
            var batch_start: u64 = @intCast(std.time.nanoTimestamp());
            debug.log(" meta: batch {d} ({d}-{d})", .{ batch_num, offset, end });

            // Phase 1: submit one request per package, round-robin across
            // connections. Failures mark the result and skip the package.
            var queued: usize = 0;
            var conn_idx: usize = 0;
            for (offset..end) |i| {
                const result = &results[i];
                const name = names[i];

                const c = self.nextMetaClient(&conn_idx) orelse {
                    result.has_error = true; continue;
                };

                const session = c.h2_session orelse {
                    result.has_error = true; continue;
                };

                var path_buf: [512]u8 = undefined;
                const path = std.fmt.bufPrint(&path_buf, "/{s}", .{name}) catch {
                    result.has_error = true;
                    continue;
                };

                // :path is heap-duped because path_buf is reused; ownership is
                // recorded in req.path and freed by resetRequests/rollback.
                var hdrs = [_]nghttp2.nv{
                    Http2Client.makeNv(":method", "GET"),
                    Http2Client.makeNv(":path", c.allocator.dupeZ(u8, path) catch {
                        result.has_error = true; continue;
                    }),
                    Http2Client.makeNv(":scheme", "https"),
                    Http2Client.makeNv(":authority", c.host),
                    Http2Client.makeNv("accept", "application/vnd.npm.install-v1+json"),
                    Http2Client.makeNv("accept-encoding", "gzip"),
                    Http2Client.makeNv("user-agent", user_agent),
                };

                // Claim the next request slot on this connection.
                const req = &c.requests[c.request_count];
                c.request_count += 1;
                req.* = .{
                    .stream_id = 0,
                    .path = hdrs[1].value[0..hdrs[1].valuelen :0],
                    .on_data = null,
                    .on_complete = null,
                    .on_error = null,
                    .userdata = result,
                    .response_body = .{},
                    .status_code = 0,
                    .done = false,
                    .has_error = false,
                    .start_ns = @intCast(std.time.nanoTimestamp()),
                    .end_ns = 0,
                    .bytes = 0,
                    .content_encoding = .identity,
                };

                const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
                if (sid < 0) {
                    // Submission failed: roll back the slot claim and free the
                    // duped :path value.
                    c.request_count -= 1;
                    if (req.path) |p| c.allocator.free(p);
                    result.has_error = true;
                    continue;
                }
                req.stream_id = sid;
                queued += 1;
                conn_idx = (conn_idx + 1) % NUM_META_CONNECTIONS;
            }

            batch_start = debug.timer(" meta: queue requests", batch_start);
            self.flushMetaClients();

            // Phase 2: pump the event loop until every submitted request in
            // this batch has settled.
            const loop = uv.uv_default_loop();
            var all_done = false;
            var loops: usize = 0;
            const run_start: u64 = @intCast(std.time.nanoTimestamp());

            while (!all_done) {
                _ = uv.uv_run(loop, uv.RUN_ONCE);
                loops += 1;
                all_done = self.metaRequestsComplete();
            }

            const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
            debug.log(" h2: run complete in {d}ms, {d} loops", .{ elapsed_ns / 1_000_000, loops });
            batch_start = debug.timer(" meta: run h2 loop", batch_start);

            // Phase 3: debug accounting — slowest request, slow-request log,
            // total bytes across the batch.
            var slow_count: usize = 0;
            var max_req_ms: u64 = 0;
            var max_req_name: []const u8 = "";
            var total_bytes: usize = 0;
            for (self.meta_clients) |maybe_client| {
                if (maybe_client) |c| {
                    for (c.requests[0..c.request_count]) |*req| {
                        const result: *MetadataResult = @ptrCast(@alignCast(req.userdata));
                        const end_ns = if (req.end_ns != 0) req.end_ns else @as(u64, @intCast(std.time.nanoTimestamp()));
                        const duration_ms: u64 = @intCast((end_ns - req.start_ns) / 1_000_000);
                        total_bytes += req.response_body.items.len;
                        if (duration_ms > max_req_ms) {
                            max_req_ms = duration_ms;
                            max_req_name = result.name;
                        }
                        if (duration_ms >= META_SLOW_LOG_MS) {
                            slow_count += 1;
                            debug.log(" meta: slow {s} {d}ms {d} bytes status={d}", .{
                                result.name,
                                duration_ms,
                                req.response_body.items.len,
                                req.status_code,
                            });
                        }
                    }
                }
            }
            debug.log(" meta: summary slow={d} max={s} {d}ms total_bytes={d}", .{ slow_count, max_req_name, max_req_ms, total_bytes });

            // Phase 4: copy each body into the caller-owned result, then
            // release the connection's request slots for the next batch.
            var success: usize = 0;
            for (self.meta_clients) |maybe_client| {
                if (maybe_client) |c| {
                    for (c.requests[0..c.request_count]) |*req| {
                        const result: *MetadataResult = @ptrCast(@alignCast(req.userdata));
                        if (storeMetadataBatchResult(req, result, allocator, &decompress_buf)) success += 1;
                    }
                    c.resetRequests();
                }
            }
            _ = debug.timer(" meta: copy results", batch_start);
            debug.log(" meta: queued={d} success={d}", .{ queued, success });

            offset = end;
            batch_num += 1;
        }

        return results;
    }
1433
    /// Callback invoked once per package by fetchMetadataStreaming. `data` is
    /// only valid for the duration of the call (it may point into a reused
    /// decompression buffer or the request's response body); copy it if needed.
    /// `data` is null and `has_error` true when the fetch failed.
    pub const MetadataCallback = *const fn (
        name: []const u8,
        data: ?[]const u8,
        has_error: bool,
        userdata: ?*anyopaque
    ) void;
1440
    /// Per-request bookkeeping for streaming metadata fetches; stored in the
    /// request's userdata pointer.
    const MetadataStreamTracker = struct {
        // Package name handed back to the callback; borrows the caller's slice.
        name: []const u8,
        // Position in the caller's `processed` array (guards double emission).
        index: usize,
    };
1445
1446 fn emitMetadataStreamingResult(
1447 req: *Http2Client.RequestState,
1448 name: []const u8,
1449 allocator: std.mem.Allocator,
1450 decompress_buf: *std.ArrayListUnmanaged(u8),
1451 callback: MetadataCallback,
1452 userdata: ?*anyopaque,
1453 ) void {
1454 if (req.has_error or req.status_code != 200) {
1455 callback(name, null, true, userdata);
1456 return;
1457 }
1458
1459 if (req.content_encoding != .gzip) {
1460 callback(name, req.response_body.items, false, userdata);
1461 return;
1462 }
1463
1464 decompress_buf.clearRetainingCapacity();
1465 const decomp = extractor.GzipDecompressor.init(allocator) catch {
1466 callback(name, null, true, userdata);
1467 return;
1468 };
1469 defer decomp.deinit();
1470
1471 _ = decomp.decompress(req.response_body.items, struct {
1472 fn onChunk(data: []const u8, ctx: ?*anyopaque) anyerror!void {
1473 const buf: *std.ArrayListUnmanaged(u8) = @ptrCast(@alignCast(ctx));
1474 try buf.appendSlice(c_allocator, data);
1475 }
1476 }.onChunk, decompress_buf) catch {
1477 callback(name, null, true, userdata);
1478 return;
1479 };
1480
1481 callback(name, decompress_buf.items, false, userdata);
1482 }
1483
1484 fn emitCompletedStreamingMetadataCallbacks(
1485 self: *Fetcher,
1486 processed: []bool,
1487 allocator: std.mem.Allocator,
1488 decompress_buf: *std.ArrayListUnmanaged(u8),
1489 callback: MetadataCallback,
1490 userdata: ?*anyopaque,
1491 ) void {
1492 for (self.meta_clients) |maybe_client| {
1493 const c = maybe_client orelse continue;
1494 for (c.requests[0..c.request_count]) |*req| {
1495 if (!req.done and !req.has_error) continue;
1496
1497 const tracker: *MetadataStreamTracker = @ptrCast(@alignCast(req.userdata));
1498 if (processed[tracker.index]) continue;
1499
1500 processed[tracker.index] = true;
1501 emitMetadataStreamingResult(req, tracker.name, allocator, decompress_buf, callback, userdata);
1502 }
1503 }
1504 }
1505
    /// Like fetchMetadataBatch, but delivers each package's metadata to
    /// `callback` as soon as its request settles, instead of materializing a
    /// result slice. The bytes passed to the callback are only valid for the
    /// duration of the call. `allocator` is used for transient bookkeeping
    /// (freed before return) and the gzip decompressor.
    pub fn fetchMetadataStreaming(
        self: *Fetcher,
        names: []const []const u8,
        allocator: std.mem.Allocator,
        callback: MetadataCallback,
        userdata: ?*anyopaque,
    ) !void {
        if (names.len == 0) return;

        var total_start: u64 = @intCast(std.time.nanoTimestamp());
        try self.ensureMetaClients();
        total_start = debug.timer(" meta: get clients", total_start);

        var active_connections: usize = 0;
        for (self.meta_clients) |maybe_client| {
            if (maybe_client != null) active_connections += 1;
        }

        if (active_connections == 0) return error.ConnectionFailed;
        debug.log(" meta: streaming {d} packages across {d} connections", .{ names.len, active_connections });

        // One flag per package: set once its callback has been emitted.
        const processed = try allocator.alloc(bool, names.len);
        defer allocator.free(processed);
        @memset(processed, false);

        // Trackers live in request userdata and map a request back to its
        // package name and `processed` slot.
        var trackers = try allocator.alloc(MetadataStreamTracker, names.len);
        defer allocator.free(trackers);
        for (names, 0..) |name, i| {
            trackers[i] = .{ .name = name, .index = i };
        }

        // Each connection reserves one request slot (see metaClientCanQueue).
        const total_capacity = active_connections * (MAX_PENDING_REQUESTS - 1);
        var offset: usize = 0;
        var batch_num: usize = 0;

        // Shared gunzip scratch, reused across all requests in all batches.
        var decompress_buf = std.ArrayListUnmanaged(u8){};
        defer decompress_buf.deinit(c_allocator);

        while (offset < names.len) {
            const end = @min(offset + total_capacity, names.len);
            var batch_start: u64 = @intCast(std.time.nanoTimestamp());

            debug.log(" meta: batch {d} ({d}-{d})", .{ batch_num, offset, end });

            // Phase 1: submit one request per package, round-robin across
            // connections. Any failure silently skips the package (its
            // callback is never emitted).
            var queued: usize = 0;
            var conn_idx: usize = 0;
            for (offset..end) |i| {
                const tracker = &trackers[i];
                const name = names[i];

                const c = self.nextMetaClient(&conn_idx) orelse continue;
                const session = c.h2_session orelse continue;

                var path_buf: [512]u8 = undefined;
                const path = std.fmt.bufPrint(&path_buf, "/{s}", .{name}) catch continue;

                // :path is heap-duped because path_buf is reused; ownership is
                // recorded in req.path and freed by resetRequests/rollback.
                var hdrs = [_]nghttp2.nv{
                    Http2Client.makeNv(":method", "GET"),
                    Http2Client.makeNv(":path", c.allocator.dupeZ(u8, path) catch continue),
                    Http2Client.makeNv(":scheme", "https"),
                    Http2Client.makeNv(":authority", c.host),
                    Http2Client.makeNv("accept", "application/vnd.npm.install-v1+json"),
                    Http2Client.makeNv("accept-encoding", "gzip"),
                    Http2Client.makeNv("user-agent", user_agent),
                };

                // Claim the next request slot on this connection.
                const req = &c.requests[c.request_count];
                c.request_count += 1;
                req.* = .{
                    .stream_id = 0,
                    .path = hdrs[1].value[0..hdrs[1].valuelen :0],
                    .on_data = null,
                    .on_complete = null,
                    .on_error = null,
                    .userdata = tracker,
                    .response_body = .{},
                    .status_code = 0,
                    .done = false,
                    .has_error = false,
                    .start_ns = 0,
                    .end_ns = 0,
                    .bytes = 0,
                    .content_encoding = .identity,
                };
                req.start_ns = @intCast(std.time.nanoTimestamp());

                const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
                if (sid < 0) {
                    // Submission failed: roll back the slot claim and free the
                    // duped :path value.
                    c.request_count -= 1;
                    if (req.path) |p| c.allocator.free(p);
                    continue;
                }
                req.stream_id = sid;
                queued += 1;
                conn_idx = (conn_idx + 1) % NUM_META_CONNECTIONS;
            }

            batch_start = debug.timer(" meta: queue requests", batch_start);
            self.flushMetaClients();

            // Phase 2: pump the event loop, emitting callbacks for requests
            // as they settle, until the whole batch is done.
            const loop = uv.uv_default_loop();
            var all_done = false;
            var loops: usize = 0;
            const run_start: u64 = @intCast(std.time.nanoTimestamp());

            while (!all_done) {
                _ = uv.uv_run(loop, uv.RUN_ONCE);
                loops += 1;
                self.emitCompletedStreamingMetadataCallbacks(processed, allocator, &decompress_buf, callback, userdata);

                all_done = self.metaRequestsComplete();
            }

            const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
            debug.log(" h2: run complete in {d}ms, {d} loops", .{ elapsed_ns / 1_000_000, loops });

            // Phase 3: debug accounting — slowest request, slow-request log,
            // total bytes across the batch.
            var slow_count: usize = 0;
            var max_req_ms: u64 = 0;
            var max_req_name: []const u8 = "";
            var total_bytes: usize = 0;
            for (self.meta_clients) |maybe_client| {
                if (maybe_client) |c| {
                    for (c.requests[0..c.request_count]) |*req| {
                        const tracker: *MetadataStreamTracker = @ptrCast(@alignCast(req.userdata));
                        const end_ns = if (req.end_ns != 0) req.end_ns else @as(u64, @intCast(std.time.nanoTimestamp()));
                        const duration_ms: u64 = @intCast((end_ns - req.start_ns) / 1_000_000);
                        total_bytes += req.response_body.items.len;
                        if (duration_ms > max_req_ms) {
                            max_req_ms = duration_ms;
                            max_req_name = tracker.name;
                        }
                        if (duration_ms >= META_SLOW_LOG_MS) {
                            slow_count += 1;
                            debug.log(" meta: slow {s} {d}ms {d} bytes status={d}", .{
                                tracker.name,
                                duration_ms,
                                req.response_body.items.len,
                                req.status_code,
                            });
                        }
                    }
                }
            }
            debug.log(" meta: summary slow={d} max={s} {d}ms total_bytes={d}", .{ slow_count, max_req_name, max_req_ms, total_bytes });

            // Release request slots for the next batch.
            for (self.meta_clients) |maybe_client| {
                if (maybe_client) |c| c.resetRequests();
            }

            offset = end;
            batch_num += 1;
        }
    }
1659
1660 pub fn fetchTarball(self: *Fetcher, url: []const u8, handler: StreamHandler) !void {
1661 try self.pending.append(self.allocator, .{ .url = try self.allocator.dupe(u8, url), .handler = handler });
1662 }
1663
1664 pub fn run(self: *Fetcher) !void {
1665 if (self.pending.items.len == 0 and self.tarball_contexts.items.len == 0) return;
1666
1667 const run_start: u64 = @intCast(std.time.nanoTimestamp());
1668 const total_requests = self.pending.items.len + self.tarball_contexts.items.len;
1669
1670 debug.log("fetcher: {d} tarballs to download (pending={d}, in-flight={d})", .{
1671 total_requests,
1672 self.pending.items.len,
1673 self.tarball_contexts.items.len,
1674 });
1675
1676 try self.ensureTarballClients();
1677 self.finishTarballs();
1678
1679 const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
1680 debug.log("fetcher: {d} tarballs complete in {d}ms", .{ total_requests, elapsed_ns / 1_000_000 });
1681 }
1682};