MIRROR: javascript for 🐜's, a tiny runtime with big ambitions
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 1682 lines 57 kB view raw
const std = @import("std");
const c_allocator = std.heap.c_allocator;

const debug = @import("debug.zig");
const extractor = @import("extractor.zig");
const uv = @import("uv.zig");
const tlsuv = @import("tlsuv.zig");
const nghttp2 = @import("nghttp2.zig");

const config = @import("config");

/// User-Agent header value sent with every outgoing request.
const user_agent: [:0]const u8 = "ant/" ++ config.version;

/// Errors surfaced to callers of the fetch layer.
pub const FetchError = error{
    ConnectionFailed,
    TlsError,
    Http2Error,
    Timeout,
    InvalidUrl,
    ResponseError,
    OutOfMemory,
};

/// Minimal URL splitter for `scheme://host[:port][/path]`.
/// All returned slices alias the input; the caller must keep `url` alive.
pub const ParsedUrl = struct {
    scheme: []const u8,
    host: []const u8,
    port: u16,
    path: []const u8,

    pub fn parse(url: []const u8) !ParsedUrl {
        var rest = url;

        // Scheme is mandatory.
        const scheme_end = std.mem.indexOf(u8, rest, "://") orelse return error.InvalidUrl;
        const scheme = rest[0..scheme_end];
        rest = rest[scheme_end + 3 ..];

        // Split host[:port] from the path; a missing path becomes "/".
        const path_start = std.mem.indexOf(u8, rest, "/") orelse rest.len;
        const host_port = rest[0..path_start];
        rest = if (path_start < rest.len) rest[path_start..] else "/";

        var host: []const u8 = host_port;
        // Default port follows the scheme; anything but "https" defaults to 80.
        var port: u16 = if (std.mem.eql(u8, scheme, "https")) 443 else 80;

        if (std.mem.indexOf(u8, host_port, ":")) |colon| {
            host = host_port[0..colon];
            port = std.fmt.parseInt(u16, host_port[colon + 1 ..], 10) catch return error.InvalidUrl;
        }

        return .{ .scheme = scheme, .host = host, .port = port, .path = rest };
    }
};

/// Consumer callbacks for a streamed response body.
pub const StreamHandler = struct {
    on_data: *const fn ([]const u8, ?*anyopaque) void,
    on_complete: *const fn (u16, ?*anyopaque) void,
    on_error: *const fn (FetchError, ?*anyopaque) void,
    user_data: ?*anyopaque,

    pub fn init(
        on_data: *const fn ([]const u8, ?*anyopaque) void,
        on_complete: *const fn (u16, ?*anyopaque) void,
        on_error: *const fn (FetchError, ?*anyopaque) void,
        user_data: ?*anyopaque,
    ) StreamHandler {
        return .{ .on_data = on_data, .on_complete = on_complete, .on_error = on_error, .user_data = user_data };
    }
};

/// A request parked while every connection is at capacity.
const PendingRequest = struct {
    url: []const u8,
    handler: ?StreamHandler,
};

const MAX_PENDING_REQUESTS = 20;
const NUM_CONNECTIONS = 6;
const NUM_META_CONNECTIONS = 3;
const META_SLOW_LOG_MS: u64 = 250;

/// One HTTP/2 connection multiplexing up to MAX_PENDING_REQUESTS streams
/// over a tlsuv stream driven by the default libuv loop.
const Http2Client = struct {
    allocator: std.mem.Allocator,
    loop: *uv.loop_t,
    tls: tlsuv.stream_t,
    h2_session: ?*nghttp2.session,
    host: [:0]const u8,
    use_tls: bool,
    connected: i32, // 0 = connecting/idle, >0 = established, <0 = failed or closed
    connect_pending: bool,
    closing: bool,
    write_buf: std.ArrayListUnmanaged(u8),
    requests: [MAX_PENDING_REQUESTS]RequestState,
    request_count: usize,
    requests_done: usize,
    last_response_status_code: u16,

    const RequestState = struct {
        stream_id: i32, // -1 marks a recycled (free) slot
        path: ?[:0]const u8,
        on_data: ?*const fn ([]const u8, ?*anyopaque) void,
        on_complete: ?*const fn (u16, ?*anyopaque) void,
        on_error: ?*const fn (FetchError, ?*anyopaque) void,
        userdata: ?*anyopaque,
        response_body: std.ArrayListUnmanaged(u8),
        status_code: u16,
        done: bool,
        has_error: bool,
104 start_ns: u64, 105 end_ns: u64, 106 bytes: usize, 107 content_encoding: ContentEncoding, 108 }; 109 110 const ContentEncoding = enum { 111 identity, 112 gzip, 113 }; 114 115 const alpn_protocols = [_][*:0]const u8{ "h2", "http/1.1" }; 116 117 pub fn init(allocator: std.mem.Allocator, host: []const u8, use_tls: bool) !*Http2Client { 118 const client = try allocator.create(Http2Client); 119 errdefer allocator.destroy(client); 120 121 const host_z = try allocator.dupeZ(u8, host); 122 errdefer allocator.free(host_z); 123 124 client.* = .{ 125 .allocator = allocator, 126 .loop = uv.uv_default_loop(), 127 .tls = .{}, 128 .h2_session = null, 129 .host = host_z, 130 .use_tls = use_tls, 131 .connected = 0, 132 .connect_pending = false, 133 .closing = false, 134 .write_buf = .{}, 135 .requests = undefined, 136 .request_count = 0, 137 .requests_done = 0, 138 .last_response_status_code = 0, 139 }; 140 141 for (&client.requests) |*req| { 142 req.* = .{ 143 .stream_id = 0, 144 .path = null, 145 .on_data = null, 146 .on_complete = null, 147 .on_error = null, 148 .userdata = null, 149 .response_body = .{}, 150 .status_code = 0, 151 .done = false, 152 .has_error = false, 153 .start_ns = 0, 154 .end_ns = 0, 155 .bytes = 0, 156 .content_encoding = .identity, 157 }; 158 } 159 160 if (tlsuv.tlsuv_stream_init(client.loop, &client.tls, null) != 0) { 161 allocator.free(host_z); 162 allocator.destroy(client); 163 return error.ConnectionFailed; 164 } 165 166 _ = tlsuv.tlsuv_stream_set_hostname(&client.tls, host_z.ptr); 167 _ = tlsuv.tlsuv_stream_set_protocols(&client.tls, 2, &alpn_protocols); 168 169 return client; 170 } 171 172 pub fn deinit(self: *Http2Client) void { 173 self.closing = true; 174 self.connect_pending = false; 175 176 for (&self.requests) |*req| { 177 req.on_data = null; 178 req.on_complete = null; 179 req.on_error = null; 180 req.userdata = null; 181 } 182 183 if (self.connected > 0) { 184 self.tls.data = self; 185 _ = tlsuv.tlsuv_stream_close(&self.tls, 
onStreamClose); 186 while (self.connected > 0) _ = uv.uv_run(self.loop, uv.RUN_ONCE); 187 } 188 189 if (self.h2_session) |session| nghttp2.nghttp2_session_del(session); 190 191 for (&self.requests) |*req| { 192 if (req.stream_id != -1) { 193 if (req.path) |p| self.allocator.free(p); 194 req.response_body.deinit(self.allocator); 195 } 196 } 197 198 self.write_buf.deinit(self.allocator); 199 self.allocator.free(self.host); 200 self.allocator.destroy(self); 201 } 202 203 pub fn resetRequests(self: *Http2Client) void { 204 for (self.requests[0..self.request_count]) |*req| { 205 if (req.stream_id != -1) { 206 if (req.path) |p| self.allocator.free(p); 207 req.response_body.deinit(self.allocator); 208 } 209 req.* = .{ 210 .stream_id = 0, 211 .path = null, 212 .on_data = null, 213 .on_complete = null, 214 .on_error = null, 215 .userdata = null, 216 .response_body = .{}, 217 .status_code = 0, 218 .done = false, 219 .has_error = false, 220 .start_ns = 0, 221 .end_ns = 0, 222 .bytes = 0, 223 .content_encoding = .identity, 224 }; 225 } 226 self.request_count = 0; 227 self.requests_done = 0; 228 } 229 230 pub fn hasCapacity(self: *const Http2Client) bool { 231 for (self.requests[0..self.request_count]) |req| { 232 if (req.stream_id == -1) return true; 233 } 234 return self.request_count < MAX_PENDING_REQUESTS - 1; 235 } 236 237 pub fn recycleCompletedRequests(self: *Http2Client) void { 238 if (self.requests_done == 0) return; 239 240 for (self.requests[0..self.request_count]) |*req| { 241 if (req.done and req.stream_id != -1) { 242 if (req.path) |p| self.allocator.free(p); 243 req.response_body.deinit(self.allocator); 244 req.path = null; 245 req.response_body = .{}; 246 req.stream_id = -1; 247 } 248 } 249 } 250 251 fn findOrAllocSlot(self: *Http2Client) ?*RequestState { 252 for (self.requests[0..self.request_count]) |*req| { 253 if (req.stream_id == -1) return req; 254 } 255 if (self.request_count < MAX_PENDING_REQUESTS) { 256 const req = &self.requests[self.request_count]; 
257 self.request_count += 1; 258 return req; 259 } 260 return null; 261 } 262 263 fn onStreamClose(handle: *uv.handle_t) callconv(.c) void { 264 const tls: *tlsuv.stream_t = @ptrCast(@alignCast(handle)); 265 const client: *Http2Client = @ptrCast(@alignCast(tls.data)); 266 client.connected = -2; 267 client.connect_pending = false; 268 } 269 270 fn findRequest(self: *Http2Client, stream_id: i32) ?*RequestState { 271 for (self.requests[0..self.request_count]) |*req| if (req.stream_id == stream_id) return req; 272 return null; 273 } 274 275 fn h2Send(_: ?*nghttp2.session, data: [*c]const u8, len: usize, _: c_int, ud: ?*anyopaque) callconv(.c) isize { 276 const client: *Http2Client = @ptrCast(@alignCast(ud)); 277 client.write_buf.appendSlice(client.allocator, data[0..len]) catch return nghttp2.ERR_NOMEM; 278 return @intCast(len); 279 } 280 281 fn h2FrameRecv(_: ?*nghttp2.session, frame: *const nghttp2.frame, ud: ?*anyopaque) callconv(.c) c_int { 282 const client: *Http2Client = @ptrCast(@alignCast(ud)); 283 if (frame.hd.flags & nghttp2.FLAG_END_STREAM != 0) { 284 if (client.findRequest(frame.hd.stream_id)) |req| { 285 if (!req.done) { 286 req.done = true; 287 req.end_ns = @intCast(std.time.nanoTimestamp()); 288 client.requests_done += 1; 289 if (req.on_complete) |cb| cb(req.status_code, req.userdata); 290 } 291 } 292 } 293 return 0; 294 } 295 296 fn h2DataChunk(session: ?*nghttp2.session, _: u8, stream_id: i32, data: [*c]const u8, len: usize, ud: ?*anyopaque) callconv(.c) c_int { 297 const client: *Http2Client = @ptrCast(@alignCast(ud)); 298 const req = client.findRequest(stream_id) orelse return 0; 299 if (req.on_data) |cb| cb(data[0..len], req.userdata) else req.response_body.appendSlice(client.allocator, data[0..len]) catch { 300 req.has_error = true; 301 }; req.bytes += len; 302 if (session) |s| _ = nghttp2.nghttp2_session_consume(s, stream_id, len); 303 304 return 0; 305 } 306 307 fn h2Header(_: ?*nghttp2.session, frame: *const nghttp2.frame, name: [*c]const u8, 
namelen: usize, value: [*c]const u8, valuelen: usize, _: u8, ud: ?*anyopaque) callconv(.c) c_int { 308 const client: *Http2Client = @ptrCast(@alignCast(ud)); 309 if (frame.hd.type != nghttp2.HEADERS) return 0; 310 const req = client.findRequest(frame.hd.stream_id) orelse return 0; 311 if (namelen == 7 and std.mem.eql(u8, name[0..7], ":status")) 312 req.status_code = std.fmt.parseInt(u16, value[0..valuelen], 10) catch 0; 313 if (std.mem.eql(u8, name[0..namelen], "content-encoding")) { 314 if (std.mem.startsWith(u8, value[0..valuelen], "gzip")) { 315 req.content_encoding = .gzip; 316 } 317 } 318 return 0; 319 } 320 321 fn h2StreamClose(_: ?*nghttp2.session, stream_id: i32, error_code: u32, ud: ?*anyopaque) callconv(.c) c_int { 322 const client: *Http2Client = @ptrCast(@alignCast(ud)); 323 const req = client.findRequest(stream_id) orelse return 0; 324 if (!req.done) { 325 req.done = true; 326 req.end_ns = @intCast(std.time.nanoTimestamp()); 327 client.requests_done += 1; 328 if (error_code != 0) { 329 req.has_error = true; 330 if (req.on_error) |cb| cb(FetchError.Http2Error, req.userdata); 331 } else if (req.on_complete) |cb| cb(req.status_code, req.userdata); 332 } 333 return 0; 334 } 335 336 fn initH2(self: *Http2Client) !void { 337 var callbacks: *nghttp2.session_callbacks = undefined; 338 if (nghttp2.nghttp2_session_callbacks_new(&callbacks) != 0) return error.Http2Error; 339 defer nghttp2.nghttp2_session_callbacks_del(callbacks); 340 341 nghttp2.nghttp2_session_callbacks_set_send_callback2(callbacks, h2Send); 342 nghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, h2FrameRecv); 343 nghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, h2DataChunk); 344 nghttp2.nghttp2_session_callbacks_set_on_header_callback(callbacks, h2Header); 345 nghttp2.nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, h2StreamClose); 346 347 var session: *nghttp2.session = undefined; 348 if 
(nghttp2.nghttp2_session_client_new(&session, callbacks, self) != 0) return error.Http2Error; 349 self.h2_session = session; 350 351 var settings = [_]nghttp2.settings_entry{ 352 .{ .settings_id = nghttp2.SETTINGS_MAX_CONCURRENT_STREAMS, .value = MAX_PENDING_REQUESTS }, 353 .{ .settings_id = nghttp2.SETTINGS_INITIAL_WINDOW_SIZE, .value = 16 * 1024 * 1024 }, 354 }; 355 if (nghttp2.nghttp2_submit_settings(self.h2_session.?, nghttp2.FLAG_NONE, &settings, settings.len) != 0) return error.Http2Error; 356 357 const conn_window_increase: i32 = (16 * 1024 * 1024) - 65535; 358 _ = nghttp2.nghttp2_submit_window_update(self.h2_session.?, nghttp2.FLAG_NONE, 0, conn_window_increase); 359 } 360 361 fn flush(self: *Http2Client) !void { 362 if (self.closing) return error.ConnectionFailed; 363 if (self.h2_session) |session| while (nghttp2.nghttp2_session_want_write(session) != 0) if (nghttp2.nghttp2_session_send(session) != 0) break; 364 if (self.write_buf.items.len > 0) { 365 const data = try self.allocator.dupe(u8, self.write_buf.items); 366 self.write_buf.clearRetainingCapacity(); 367 const wr = try self.allocator.create(uv.write_t); 368 wr.data = data.ptr; 369 var buf = uv.buf_t{ .base = data.ptr, .len = data.len }; 370 if (tlsuv.tlsuv_stream_write(wr, &self.tls, &buf, onWrite) != 0) { 371 self.allocator.free(data); 372 self.allocator.destroy(wr); 373 return error.ConnectionFailed; 374 } 375 } 376 } 377 378 fn onWrite(wr: *uv.write_t, _: c_int) callconv(.c) void { 379 const data_ptr: [*]u8 = @ptrCast(wr.data); 380 std.c.free(data_ptr); 381 std.c.free(@ptrCast(wr)); 382 } 383 384 fn allocBuf(_: *uv.handle_t, size: usize, buf: *uv.buf_t) callconv(.c) void { 385 buf.base = @ptrCast(std.c.malloc(size) orelse return); 386 buf.len = size; 387 } 388 389 fn onRead(stream: *uv.stream_t, nread: isize, buf: *const uv.buf_t) callconv(.c) void { 390 const tls: *tlsuv.stream_t = @ptrCast(@alignCast(stream)); 391 const client: *Http2Client = @ptrCast(@alignCast(tls.data)); 392 defer if 
(buf.base) |b| std.c.free(b); 393 if (client.closing) return; 394 if (nread < 0) { 395 for (client.requests[0..client.request_count]) |*req| if (!req.done) { 396 req.done = true; 397 req.has_error = true; 398 client.requests_done += 1; 399 if (req.on_error) |cb| cb(FetchError.ConnectionFailed, req.userdata); 400 }; 401 return; 402 } 403 if (nread > 0 and client.h2_session != null) { 404 _ = nghttp2.nghttp2_session_mem_recv(client.h2_session.?, @ptrCast(buf.base), @intCast(nread)); 405 client.flush() catch {}; 406 } 407 } 408 409 fn onConnect(req: *uv.connect_t, status: c_int) callconv(.c) void { 410 const ctx: *ConnectCtx = @ptrCast(@alignCast(req.data)); 411 defer ctx.client.allocator.destroy(ctx); 412 ctx.client.connect_pending = false; 413 if (ctx.client.closing) { 414 ctx.client.connected = -1; 415 _ = tlsuv.tlsuv_stream_close(&ctx.client.tls, onStreamClose); 416 return; 417 } 418 if (status < 0) { 419 ctx.client.connected = -1; 420 return; 421 } 422 ctx.client.connected = 1; 423 ctx.client.tls.data = ctx.client; 424 ctx.client.initH2() catch { 425 ctx.client.connected = -1; 426 return; 427 }; 428 _ = tlsuv.tlsuv_stream_read_start(&ctx.client.tls, allocBuf, onRead); 429 ctx.client.flush() catch {}; 430 } 431 432 const ConnectCtx = struct { client: *Http2Client, req: uv.connect_t }; 433 434 fn ensureConnected(self: *Http2Client) !void { 435 if (self.closing) return error.ConnectionFailed; 436 if (self.connected > 0) return; 437 if (self.connected < 0) return error.ConnectionFailed; 438 439 var conn_start: u64 = @intCast(std.time.nanoTimestamp()); 440 441 if (!self.connect_pending) { 442 const ctx = try self.allocator.create(ConnectCtx); 443 ctx.* = .{ .client = self, .req = .{} }; 444 ctx.req.data = ctx; 445 if (tlsuv.tlsuv_stream_connect(&ctx.req, &self.tls, self.host.ptr, if (self.use_tls) 443 else 80, onConnect) != 0) { 446 self.allocator.destroy(ctx); 447 return error.ConnectionFailed; 448 } 449 self.connect_pending = true; 450 } 451 452 var loop_count: u32 
= 0; 453 while (self.connected == 0) { 454 _ = uv.uv_run(self.loop, uv.RUN_ONCE); 455 loop_count += 1; 456 } 457 conn_start = debug.timer(" h2: tls connect", conn_start); 458 debug.log(" h2: connect loop iterations={d}", .{loop_count}); 459 if (self.connected < 0) return error.ConnectionFailed; 460 } 461 462 pub fn initiateConnectAsync(self: *Http2Client) !void { 463 if (self.closing) return error.ConnectionFailed; 464 if (self.connected > 0) return; 465 if (self.connected < 0) return error.ConnectionFailed; 466 if (self.connect_pending) return; 467 468 const ctx = try self.allocator.create(ConnectCtx); 469 ctx.* = .{ .client = self, .req = .{} }; 470 ctx.req.data = ctx; 471 if (tlsuv.tlsuv_stream_connect(&ctx.req, &self.tls, self.host.ptr, if (self.use_tls) 443 else 80, onConnect) != 0) { 472 self.allocator.destroy(ctx); 473 return error.ConnectionFailed; 474 } 475 self.connect_pending = true; 476 } 477 478 fn makeNv(name: [:0]const u8, value: [:0]const u8) nghttp2.nv { 479 return .{ 480 .name = @constCast(name.ptr), 481 .value = @constCast(value.ptr), 482 .namelen = name.len, .valuelen = value.len, .flags = nghttp2.NV_FLAG_NONE 483 }; 484 } 485 486 pub fn get(self: *Http2Client, path: []const u8, allocator: std.mem.Allocator) ![]u8 { 487 return self.getWithAccept(path, "application/json", allocator); 488 } 489 490 pub fn getWithAccept(self: *Http2Client, path: []const u8, accept: [:0]const u8, allocator: std.mem.Allocator) ![]u8 { 491 try self.ensureConnected(); 492 if (self.request_count >= MAX_PENDING_REQUESTS) self.resetRequests(); 493 const req = &self.requests[self.request_count]; self.request_count += 1; 494 495 req.* = .{ 496 .stream_id = 0, 497 .path = try self.allocator.dupeZ(u8, path), 498 .on_data = null, 499 .on_complete = null, 500 .on_error = null, 501 .userdata = null, 502 .response_body = .{}, 503 .status_code = 0, 504 .done = false, 505 .has_error = false, 506 .start_ns = @intCast(std.time.nanoTimestamp()), 507 .end_ns = 0, 508 .bytes = 0, 509 
.content_encoding = .identity, 510 }; 511 512 const session = self.h2_session orelse return error.Http2Error; 513 514 var hdrs = [_]nghttp2.nv{ 515 makeNv(":method", "GET"), 516 makeNv(":path", req.path.?), 517 makeNv(":scheme", "https"), 518 makeNv(":authority", self.host), 519 makeNv("accept", accept), 520 makeNv("user-agent", user_agent) 521 }; 522 523 const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req); 524 if (sid < 0) { 525 self.request_count -= 1; 526 if (req.path) |p| self.allocator.free(p); 527 return error.Http2Error; 528 } 529 530 req.stream_id = sid; 531 try self.flush(); 532 while (!req.done) { 533 _ = uv.uv_run(self.loop, uv.RUN_ONCE); 534 try self.flush(); 535 } 536 537 self.last_response_status_code = req.status_code; 538 if (req.has_error or req.status_code != 200) return error.ResponseError; 539 return try allocator.dupe(u8, req.response_body.items); 540 } 541 542 pub fn getStream(self: *Http2Client, path: []const u8, on_data: *const fn ([]const u8, ?*anyopaque) void, on_complete: *const fn (u16, ?*anyopaque) void, on_error: *const fn (FetchError, ?*anyopaque) void, userdata: ?*anyopaque) !void { 543 try self.ensureConnected(); 544 const req = self.findOrAllocSlot() orelse return error.OutOfMemory; 545 546 req.* = .{ 547 .stream_id = 0, 548 .path = try self.allocator.dupeZ(u8, path), 549 .on_data = on_data, 550 .on_complete = on_complete, 551 .on_error = on_error, 552 .userdata = userdata, 553 .response_body = .{}, 554 .status_code = 0, 555 .done = false, 556 .has_error = false, 557 .start_ns = @intCast(std.time.nanoTimestamp()), 558 .end_ns = 0, 559 .bytes = 0, 560 .content_encoding = .identity 561 }; 562 563 const session = self.h2_session orelse return error.Http2Error; 564 565 var hdrs = [_]nghttp2.nv{ 566 makeNv(":method", "GET"), 567 makeNv(":path", req.path.?), 568 makeNv(":scheme", "https"), 569 makeNv(":authority", self.host), 570 makeNv("accept", "*/*"), 571 makeNv("user-agent", user_agent) 572 }; 573 
574 const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req); 575 if (sid < 0) { 576 if (req.path) |p| self.allocator.free(p); 577 req.stream_id = -1; 578 return error.Http2Error; 579 } 580 581 req.stream_id = sid; 582 try self.flush(); 583 } 584 585 pub fn run(self: *Http2Client) !void { 586 const run_start: u64 = @intCast(std.time.nanoTimestamp()); 587 var loop_count: u32 = 0; 588 var last_done: usize = 0; 589 var last_report: u64 = run_start; 590 591 while (self.requests_done < self.request_count) { 592 if (uv.uv_run(self.loop, uv.RUN_ONCE) == 0) break; 593 try self.flush(); 594 loop_count += 1; 595 596 const now: u64 = @intCast(std.time.nanoTimestamp()); 597 if (now - last_report > 1_000_000_000) { 598 const done_delta = self.requests_done - last_done; 599 debug.log(" h2: progress {d}/{d} (+{d} in last 1s) loops={d}", .{ 600 self.requests_done, self.request_count, 601 done_delta, loop_count, 602 }); 603 last_done = self.requests_done; 604 last_report = now; 605 } 606 } 607 608 const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start)); 609 const elapsed_ms = elapsed_ns / 1_000_000; 610 debug.log(" h2: run complete in {d}ms, {d} loops, {d}/{d} done", .{ 611 elapsed_ms, loop_count, 612 self.requests_done, self.request_count, 613 }); 614 615 var error_count: usize = 0; 616 for (self.requests[0..self.request_count]) |req| { 617 if (req.has_error) error_count += 1; 618 } 619 if (error_count > 0) { 620 debug.log(" h2: {d} requests had errors", .{error_count}); 621 return error.ResponseError; 622 } 623 } 624}; 625 626pub const TarballCtx = struct { 627 handler: StreamHandler, 628 done: bool, 629 has_error: bool, 630 url: []const u8, 631 start_ns: u64, 632 bytes: usize, 633}; 634 635const TarballStats = struct { 636 url: []const u8, 637 bytes: usize, 638 elapsed_ms: u64, 639}; 640 641const TarballCallbacks = struct { 642 fn onData(data: []const u8, ud: ?*anyopaque) void { 643 const ctx: 
*TarballCtx = @ptrCast(@alignCast(ud)); 644 ctx.bytes += data.len; 645 ctx.handler.on_data(data, ctx.handler.user_data); 646 } 647 648 fn onComplete(status: u16, ud: ?*anyopaque) void { 649 const ctx: *TarballCtx = @ptrCast(@alignCast(ud)); 650 ctx.handler.on_complete(status, ctx.handler.user_data); 651 ctx.done = true; 652 } 653 654 fn onError(err: FetchError, ud: ?*anyopaque) void { 655 const ctx: *TarballCtx = @ptrCast(@alignCast(ud)); 656 ctx.handler.on_error(err, ctx.handler.user_data); 657 ctx.done = true; 658 ctx.has_error = true; 659 } 660}; 661 662pub const Fetcher = struct { 663 allocator: std.mem.Allocator, 664 registry_host: []const u8, 665 meta_clients: [NUM_META_CONNECTIONS]?*Http2Client, 666 meta_clients_initialized: bool, 667 pending: std.ArrayListUnmanaged(PendingRequest), 668 tarball_clients: [NUM_CONNECTIONS]?*Http2Client, 669 tarball_clients_initialized: bool, 670 tarball_contexts: std.ArrayListUnmanaged(*TarballCtx), 671 tarball_round_robin: usize, 672 tarball_stats: std.ArrayListUnmanaged(TarballStats), 673 last_http_error_url: ?[]u8, 674 last_http_error_status: u16, 675 676 pub const HttpErrorInfo = struct { 677 url: []const u8, 678 status: u16, 679 }; 680 681 pub fn init(allocator: std.mem.Allocator, registry_host: []const u8) !*Fetcher { 682 const f = try allocator.create(Fetcher); 683 f.* = .{ 684 .allocator = allocator, 685 .registry_host = try allocator.dupe(u8, registry_host), 686 .meta_clients = [_]?*Http2Client{null} ** NUM_META_CONNECTIONS, 687 .meta_clients_initialized = false, 688 .pending = .{}, 689 .tarball_clients = [_]?*Http2Client{null} ** NUM_CONNECTIONS, 690 .tarball_clients_initialized = false, 691 .tarball_contexts = .{}, 692 .tarball_round_robin = 0, 693 .tarball_stats = .{}, 694 .last_http_error_url = null, 695 .last_http_error_status = 0, 696 }; 697 return f; 698 } 699 700 pub fn deinit(self: *Fetcher) void { 701 if (self.last_http_error_url) |url| self.allocator.free(url); 702 for (&self.meta_clients) |*maybe_client| { 
703 if (maybe_client.*) |c| { c.deinit(); maybe_client.* = null; } 704 } 705 for (self.pending.items) |req| self.allocator.free(req.url); 706 self.pending.deinit(self.allocator); 707 for (&self.tarball_clients) |*maybe_client| { 708 if (maybe_client.*) |c| { c.deinit(); maybe_client.* = null; } 709 } 710 for (self.tarball_contexts.items) |ctx| { 711 self.allocator.free(ctx.url); 712 self.allocator.destroy(ctx); 713 } 714 self.tarball_contexts.deinit(self.allocator); 715 for (self.tarball_stats.items) |stat| self.allocator.free(stat.url); 716 self.tarball_stats.deinit(self.allocator); 717 self.allocator.free(self.registry_host); 718 self.allocator.destroy(self); 719 } 720 721 fn clearLastHttpError(self: *Fetcher) void { 722 if (self.last_http_error_url) |url| self.allocator.free(url); 723 self.last_http_error_url = null; 724 self.last_http_error_status = 0; 725 } 726 727 fn setLastHttpError(self: *Fetcher, url: []const u8, status: u16) void { 728 self.clearLastHttpError(); 729 self.last_http_error_url = self.allocator.dupe(u8, url) catch null; 730 self.last_http_error_status = status; 731 } 732 733 pub fn getLastHttpError(self: *const Fetcher) ?HttpErrorInfo { 734 const url = self.last_http_error_url orelse return null; 735 return .{ .url = url, .status = self.last_http_error_status }; 736 } 737 738 fn ensureMetaClients(self: *Fetcher) !void { 739 if (self.meta_clients_initialized) return; 740 741 for (&self.meta_clients, 0..) 
|*slot, i| { 742 const client = Http2Client.init(self.allocator, self.registry_host, true) catch |err| { 743 debug.log("fetcher: failed to init meta connection {d}: {}", .{ i, err }); 744 continue; 745 }; 746 client.ensureConnected() catch |err| { 747 debug.log("fetcher: failed to connect meta {d}: {}", .{ i, err }); 748 client.deinit(); 749 continue; 750 }; 751 slot.* = client; 752 } 753 754 var any_connected = false; 755 for (self.meta_clients) |slot| { 756 if (slot != null) { any_connected = true; break; } 757 } 758 759 if (!any_connected) return error.ConnectionFailed; 760 self.meta_clients_initialized = true; 761 } 762 763 pub fn resetMetaClients(self: *Fetcher) void { 764 self.clearLastHttpError(); 765 for (&self.meta_clients) |*slot| { 766 if (slot.*) |client| { client.deinit(); slot.* = null; } 767 } 768 self.meta_clients_initialized = false; 769 } 770 771 fn ensureTarballClients(self: *Fetcher) !void { 772 if (self.tarball_clients_initialized) return; 773 774 debug.log("fetcher: initializing {d} persistent connections", .{NUM_CONNECTIONS}); 775 const init_start: u64 = @intCast(std.time.nanoTimestamp()); 776 777 for (&self.tarball_clients, 0..) 
|*slot, i| { 778 const client = Http2Client.init(self.allocator, self.registry_host, true) catch |err| { 779 debug.log("fetcher: failed to init connection {d}: {}", .{ i, err }); 780 continue; 781 }; 782 client.ensureConnected() catch |err| { 783 debug.log("fetcher: failed to connect {d}: {}", .{ i, err }); 784 client.deinit(); 785 continue; 786 }; 787 slot.* = client; 788 } 789 790 var any_connected = false; 791 for (self.tarball_clients) |slot| { 792 if (slot != null) { any_connected = true; break; } 793 } 794 795 if (!any_connected) return error.ConnectionFailed; 796 self.tarball_clients_initialized = true; 797 798 _ = debug.timer("fetcher: connection pool init", init_start); 799 } 800 801 fn findAvailableClient(self: *Fetcher) ?struct { client: *Http2Client, idx: usize } { 802 var attempts: usize = 0; 803 while (attempts < NUM_CONNECTIONS) : (attempts += 1) { 804 const idx = (self.tarball_round_robin + attempts) % NUM_CONNECTIONS; 805 if (self.tarball_clients[idx]) |client| { if (client.hasCapacity()) return .{ .client = client, .idx = idx }; } 806 } 807 return null; 808 } 809 810 pub fn initiateTarballConnectionsAsync(self: *Fetcher) void { 811 if (self.tarball_clients_initialized) return; 812 debug.log("fetcher: initiating {d} tarball connections (async)", .{NUM_CONNECTIONS}); 813 814 for (&self.tarball_clients, 0..) 
|*slot, i| { 815 const client = Http2Client.init(self.allocator, self.registry_host, true) catch { 816 continue; 817 }; 818 client.initiateConnectAsync() catch { 819 client.deinit(); continue; 820 }; 821 slot.* = client; _ = i; 822 } 823 824 var any_connected = false; 825 for (self.tarball_clients) |slot| { 826 if (slot != null) { any_connected = true; break; } 827 } 828 829 if (any_connected) self.tarball_clients_initialized = true; 830 } 831 832 pub fn queueTarballAsync(self: *Fetcher, url: []const u8, handler: StreamHandler) !void { 833 try self.ensureTarballClients(); 834 const parsed = try ParsedUrl.parse(url); 835 836 const available = self.findAvailableClient() orelse { 837 try self.pending.append(self.allocator, .{ 838 .url = try self.allocator.dupe(u8, url), 839 .handler = handler, 840 }); return; 841 }; 842 843 const ctx = try self.allocator.create(TarballCtx); 844 ctx.* = .{ 845 .handler = handler, 846 .done = false, 847 .has_error = false, 848 .url = try self.allocator.dupe(u8, url), 849 .start_ns = @intCast(std.time.nanoTimestamp()), 850 .bytes = 0, 851 }; 852 853 try self.tarball_contexts.append( 854 self.allocator, 855 ctx 856 ); 857 858 try available.client.getStream( 859 parsed.path, 860 TarballCallbacks.onData, 861 TarballCallbacks.onComplete, 862 TarballCallbacks.onError, 863 ctx, 864 ); 865 866 self.tarball_round_robin = (available.idx + 1) % NUM_CONNECTIONS; 867 } 868 869 pub fn tick(self: *Fetcher) usize { 870 self.ensureTarballClients() catch return 0; 871 const loop = uv.uv_default_loop(); 872 873 for (&self.tarball_clients) |maybe_client| { 874 if (maybe_client) |c| c.flush() catch {}; 875 } 876 _ = uv.uv_run(loop, uv.RUN_NOWAIT); 877 878 for (&self.tarball_clients) |maybe_client| { 879 if (maybe_client) |c| c.recycleCompletedRequests(); 880 } 881 882 const completed = self.cleanupCompletedContexts(); 883 self.dispatchPending(); 884 885 return completed; 886 } 887 888 fn cleanupCompletedContexts(self: *Fetcher) usize { 889 var completed: 
usize = 0;
        var i: usize = 0;
        while (i < self.tarball_contexts.items.len) {
            const ctx = self.tarball_contexts.items[i];
            if (ctx.done) {
                completed += 1;
                self.allocator.free(ctx.url);
                self.allocator.destroy(ctx);
                _ = self.tarball_contexts.swapRemove(i);
            } else i += 1;
        }
        return completed;
    }

    /// Drain the pending queue onto any client with spare stream capacity.
    /// Requests without a handler are dropped (their url is owned here and freed).
    fn dispatchPending(self: *Fetcher) void {
        while (self.pending.items.len > 0) {
            const available = self.findAvailableClient() orelse break;
            const req = self.pending.pop() orelse break;

            const handler = req.handler orelse {
                // No one to notify; just release the owned url.
                self.allocator.free(req.url);
                continue;
            };

            self.dispatchRequest(available.client, req.url, handler) catch |err| {
                handler.on_error(errToFetchError(err), handler.user_data);
                self.allocator.free(req.url);
                continue;
            };
        }
    }

    /// Start one tarball download on `client`. On success the created
    /// TarballCtx (which takes ownership of `url`) is tracked in
    /// `tarball_contexts`; on error nothing is tracked and the caller still
    /// owns `url`.
    fn dispatchRequest(self: *Fetcher, client: *Http2Client, url: []const u8, handler: StreamHandler) !void {
        const parsed = try ParsedUrl.parse(url);

        const ctx = try self.allocator.create(TarballCtx);
        errdefer self.allocator.destroy(ctx);

        ctx.* = .{
            .handler = handler,
            .done = false,
            .has_error = false,
            .url = url,
            .start_ns = @intCast(std.time.nanoTimestamp()),
            .bytes = 0,
        };

        try self.tarball_contexts.append(self.allocator, ctx);
        // If getStream fails below, un-track the context before the errdefer
        // destroys it; otherwise tarball_contexts would keep a dangling pointer
        // and the cleanup pass would double-free ctx.url / ctx.
        errdefer _ = self.tarball_contexts.pop();

        try client.getStream(
            parsed.path,
            TarballCallbacks.onData,
            TarballCallbacks.onComplete,
            TarballCallbacks.onError,
            ctx,
        );
    }

    /// Map an arbitrary error into the public FetchError set.
    fn errToFetchError(err: anyerror) FetchError {
        return switch (err) {
            error.InvalidUrl => FetchError.InvalidUrl,
            error.OutOfMemory => FetchError.OutOfMemory,
            else => FetchError.Http2Error,
        };
    }

    /// Number of tarball downloads currently in flight.
    pub fn pendingTarballCount(self: *Fetcher) usize {
        return self.tarball_contexts.items.len;
    }

    /// Run the libuv loop until every queued and in-flight tarball download
    /// has completed (or errored). Also opportunistically dispatches pending
    /// requests onto clients as capacity frees up, and collects per-download
    /// stats for debug reporting.
    pub fn finishTarballs(self: *Fetcher) void {
        const loop = uv.uv_default_loop();
        var last_report: u64 = @intCast(std.time.nanoTimestamp());
        var loops: usize = 0;
        var completed: usize = 0;
        const start = last_report;

        while (self.tarball_contexts.items.len > 0 or self.pending.items.len > 0) {
            // Push any buffered HTTP/2 frames before blocking in uv_run.
            for (&self.tarball_clients) |maybe_client| {
                if (maybe_client) |c| c.flush() catch {};
            }

            if (uv.uv_run(loop, uv.RUN_ONCE) == 0 and self.pending.items.len == 0 and self.tarball_contexts.items.len == 0) break;
            loops += 1;

            for (&self.tarball_clients) |maybe_client| {
                if (maybe_client) |c| c.recycleCompletedRequests();
            }

            // Reap finished contexts; successful ones are recorded in
            // tarball_stats (best effort — stat allocation failures are ignored).
            var i: usize = 0;
            while (i < self.tarball_contexts.items.len) {
                const ctx = self.tarball_contexts.items[i];
                if (ctx.done) {
                    if (!ctx.has_error) {
                        const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - ctx.start_ns) / 1_000_000);
                        const url_copy = self.allocator.dupe(u8, ctx.url) catch null;
                        if (url_copy) |url| {
                            self.tarball_stats.append(self.allocator, .{ .url = url, .bytes = ctx.bytes, .elapsed_ms = elapsed_ms }) catch {};
                        }
                    }
                    self.allocator.free(ctx.url);
                    self.allocator.destroy(ctx);
                    _ = self.tarball_contexts.swapRemove(i);
                    completed += 1;
                } else {
                    i += 1;
                }
            }

            // Feed pending requests to the first client with capacity, one
            // request per pass; stop when nothing could be queued.
            while (self.pending.items.len > 0) {
                var queued = false;
                for (&self.tarball_clients) |maybe_client| {
                    if (maybe_client) |client| {
                        if (client.hasCapacity()) {
                            const maybe_req = self.pending.pop();
                            const req = maybe_req orelse break;
                            if (req.handler) |handler| {
                                const parsed = ParsedUrl.parse(req.url) catch {
                                    handler.on_error(FetchError.InvalidUrl, handler.user_data);
                                    self.allocator.free(req.url);
                                    continue;
                                };

                                const ctx = self.allocator.create(TarballCtx) catch {
                                    handler.on_error(FetchError.OutOfMemory, handler.user_data);
                                    self.allocator.free(req.url);
                                    continue;
                                };
                                ctx.* = .{
                                    .handler = handler,
                                    .done = false,
                                    .has_error = false,
                                    .url = req.url,
                                    .start_ns = @intCast(std.time.nanoTimestamp()),
                                    .bytes = 0,
                                };
                                self.tarball_contexts.append(self.allocator, ctx) catch {
                                    self.allocator.destroy(ctx);
                                    self.allocator.free(req.url);
                                    continue;
                                };

                                client.getStream(
                                    parsed.path,
                                    struct {
                                        fn onData(data: []const u8, ud: ?*anyopaque) void {
                                            const c: *TarballCtx = @ptrCast(@alignCast(ud));
                                            c.bytes += data.len;
                                            c.handler.on_data(data, c.handler.user_data);
                                        }
                                    }.onData,
                                    struct {
                                        fn onComplete(status: u16, ud: ?*anyopaque) void {
                                            const c: *TarballCtx = @ptrCast(@alignCast(ud));
                                            c.handler.on_complete(status, c.handler.user_data);
                                            if (debug.enabled) {
                                                const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - c.start_ns) / 1_000_000);
                                                debug.log(" tarball: done {s} {d}ms {d} bytes status={d}", .{ c.url, elapsed_ms, c.bytes, status });
                                            }
                                            c.done = true;
                                        }
                                    }.onComplete,
                                    struct {
                                        fn onError(err: FetchError, ud: ?*anyopaque) void {
                                            const c: *TarballCtx = @ptrCast(@alignCast(ud));
                                            c.handler.on_error(err, c.handler.user_data);
                                            if (debug.enabled) {
                                                const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - c.start_ns) / 1_000_000);
                                                debug.log(" tarball: error {s} {d}ms {d} bytes", .{ c.url, elapsed_ms, c.bytes });
                                            }
                                            c.done = true;
                                            c.has_error = true;
                                        }
                                    }.onError,
                                    ctx,
                                ) catch {
                                    handler.on_error(FetchError.Http2Error, handler.user_data);
                                    ctx.done = true;
                                    // Mark as failed so the reap pass above does not
                                    // record this as a successful 0-byte download.
                                    ctx.has_error = true;
                                };
                                queued = true;
                            } else {
                                self.allocator.free(req.url);
                            }
                            break;
                        }
                    }
                }
                if (!queued) break;
            }

            // Once-per-second progress report.
            const now: u64 = @intCast(std.time.nanoTimestamp());
            if (now - last_report > 1_000_000_000) {
                var total_bytes: usize = 0;
                for (self.tarball_contexts.items) |ctx| {
                    total_bytes += ctx.bytes;
                }
                debug.log(" h2: {d} in-flight, {d} pending, {d} completed, {d} loops", .{
                    self.tarball_contexts.items.len,
                    self.pending.items.len,
                    completed,
                    loops,
                });
                debug.log(" h2: tarball progress in-flight bytes={d}", .{total_bytes});
                last_report = now;
            }
        }

        const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, start));
        debug.log("fetcher: finishTarballs completed in {d}ms, {d} loops, {d} completed", .{
            elapsed_ns / 1_000_000,
            loops,
            completed,
        });

        if (debug.enabled and self.tarball_stats.items.len > 0) {
            // Top-5 leaderboards by elapsed time and by size, maintained by
            // insertion sort into fixed arrays of optionals.
            var top_time: [5]?TarballStats = .{null} ** 5;
            var top_size: [5]?TarballStats = .{null} ** 5;

            for (self.tarball_stats.items) |stat| {
                var idx_time: usize = top_time.len;
                for (top_time, 0..) |slot, slot_i| {
                    if (slot == null or stat.elapsed_ms > slot.?.elapsed_ms) {
                        idx_time = slot_i;
                        break;
                    }
                }
                if (idx_time < top_time.len) {
                    var carry = stat;
                    var j = idx_time;
                    while (j < top_time.len) : (j += 1) {
                        const next = top_time[j];
                        top_time[j] = carry;
                        if (next) |n| {
                            carry = n;
                        } else {
                            break;
                        }
                    }
                }

                var idx_size: usize = top_size.len;
                for (top_size, 0..) |slot, slot_i| {
                    if (slot == null or stat.bytes > slot.?.bytes) {
                        idx_size = slot_i;
                        break;
                    }
                }
                if (idx_size < top_size.len) {
                    var carry_size = stat;
                    var k = idx_size;
                    while (k < top_size.len) : (k += 1) {
                        const next_size = top_size[k];
                        top_size[k] = carry_size;
                        if (next_size) |n| {
                            carry_size = n;
                        } else {
                            break;
                        }
                    }
                }
            }

            debug.log("fetcher: top tarballs by time", .{});
            for (top_time, 0..) |maybe_stat, rank| {
                if (maybe_stat) |stat| {
                    debug.log(" {d}. {s} {d}ms {d} bytes", .{ rank + 1, stat.url, stat.elapsed_ms, stat.bytes });
                }
            }
            debug.log("fetcher: top tarballs by size", .{});
            for (top_size, 0..) |maybe_stat, rank| {
                if (maybe_stat) |stat| {
                    debug.log(" {d}. {s} {d} bytes {d}ms", .{ rank + 1, stat.url, stat.bytes, stat.elapsed_ms });
                }
            }
        }
    }

    /// Fetch abbreviated (install-v1) metadata for one package.
    /// Caller owns the returned bytes (allocated with `allocator`).
    pub fn fetchMetadata(self: *Fetcher, package_name: []const u8, allocator: std.mem.Allocator) ![]u8 {
        return self.fetchMetadataFull(package_name, false, allocator);
    }

    const DecodedMetadata = struct {
        data: []u8,
        compressed: bool,
    };

    /// A meta client can take another request if it is connected, has an h2
    /// session, and has a free request slot (one slot is kept in reserve).
    fn metaClientCanQueue(c: *const Http2Client) bool {
        return c.h2_session != null and c.connected == 1 and c.request_count < MAX_PENDING_REQUESTS - 1;
    }

    /// Round-robin pick of the next meta client that can queue a request,
    /// starting at `conn_idx.*`. Returns null when all clients are saturated.
    fn nextMetaClient(self: *Fetcher, conn_idx: *usize) ?*Http2Client {
        var attempts: usize = 0;
        while (attempts < NUM_META_CONNECTIONS) : (attempts += 1) {
            if (self.meta_clients[conn_idx.*]) |c| {
                if (metaClientCanQueue(c)) return c;
            }
            conn_idx.* = (conn_idx.* + 1) % NUM_META_CONNECTIONS;
        }
        return null;
    }

    /// Best-effort flush of all meta clients (flush errors ignored).
    fn flushMetaClients(self: *Fetcher) void {
        for (self.meta_clients) |maybe_client| {
            if (maybe_client) |c| c.flush() catch {};
        }
    }

    /// True when every queued meta request has either finished or errored.
    fn metaRequestsComplete(self: *Fetcher) bool {
        for (self.meta_clients) |maybe_client| {
            if (maybe_client) |c| {
                for (c.requests[0..c.request_count]) |*req| {
                    if (!req.done and !req.has_error) return false;
                }
            }
        }
        return true;
    }

    /// Copy out a completed metadata response, gunzipping if needed.
    /// Returns null on HTTP error / non-200 / any allocation or decode
    /// failure. The returned `data` is owned by the caller; `decompress_buf`
    /// is scratch backed by c_allocator (see onChunk).
    fn decodeMetadataOwned(
        req: *Http2Client.RequestState,
        allocator: std.mem.Allocator,
        decompress_buf: *std.ArrayListUnmanaged(u8),
    ) ?DecodedMetadata {
        if (req.has_error or req.status_code != 200) return null;

        if (req.content_encoding != .gzip) {
            const data = allocator.dupe(u8, req.response_body.items) catch return null;
            return .{ .data = data, .compressed = false };
        }

        decompress_buf.clearRetainingCapacity();
        const decomp = extractor.GzipDecompressor.init(allocator) catch return null;
        defer decomp.deinit();

        _ = decomp.decompress(req.response_body.items, struct {
            fn onChunk(data: []const u8, ctx: ?*anyopaque) anyerror!void {
                const buf: *std.ArrayListUnmanaged(u8) = @ptrCast(@alignCast(ctx));
                // The scratch buffer is deinit'd with c_allocator by the
                // callers, so it must be grown with c_allocator here too.
                try buf.appendSlice(c_allocator, data);
            }
        }.onChunk, decompress_buf) catch return null;

        const data = allocator.dupe(u8, decompress_buf.items) catch return null;
        return .{ .data = data, .compressed = true };
    }

    /// Fetch metadata for one package; `full` selects full JSON vs the
    /// abbreviated install-v1 document. Uses the first available meta client.
    /// On an HTTP-level failure, records the failing url/status for later
    /// inspection before propagating the error.
    pub fn fetchMetadataFull(self: *Fetcher, package_name: []const u8, full: bool, allocator: std.mem.Allocator) ![]u8 {
        try self.ensureMetaClients();
        self.clearLastHttpError();
        for (self.meta_clients) |maybe_client| {
            if (maybe_client) |client| {
                var path_buf: [512]u8 = undefined;
                const path_slice = std.fmt.bufPrint(&path_buf, "/{s}", .{package_name}) catch return error.OutOfMemory;
                const accept: [:0]const u8 = if (full) "application/json" else "application/vnd.npm.install-v1+json";
                return client.getWithAccept(path_slice, accept, allocator) catch |err| {
                    if (err == error.ResponseError) {
                        var url_buf: [1024]u8 = undefined;
                        const url = std.fmt.bufPrint(&url_buf, "https://{s}/{s}", .{ self.registry_host, package_name }) catch "";
                        self.setLastHttpError(url, client.last_response_status_code);
                    }
                    return err;
                };
            }
        }
        return error.ConnectionFailed;
    }

    pub const MetadataResult = struct {
        name: []const u8,
        data: ?[]u8, // owned by the batch caller's allocator; null on error
        compressed: bool,
        has_error: bool,
    };

    /// Move a finished request's body into `result`; returns true on success.
    fn storeMetadataBatchResult(
        req: *Http2Client.RequestState,
        result: *MetadataResult,
        allocator: std.mem.Allocator,
        decompress_buf: *std.ArrayListUnmanaged(u8),
    ) bool {
        const decoded = decodeMetadataOwned(req, allocator, decompress_buf) orelse {
            result.has_error = true;
            return false;
        };
        result.data = decoded.data;
        result.compressed = decoded.compressed;
        return true;
    }

    /// Fetch metadata for many packages concurrently across the meta
    /// connections, in capacity-sized batches. Returns one MetadataResult per
    /// input name (same order); caller owns the slice and each `data`.
    pub fn fetchMetadataBatch(self: *Fetcher, names: []const []const u8, allocator: std.mem.Allocator) ![]MetadataResult {
        if (names.len == 0) return &[_]MetadataResult{};

        var total_start: u64 = @intCast(std.time.nanoTimestamp());
        try self.ensureMetaClients();
        total_start = debug.timer(" meta: get clients", total_start);

        var active_connections: usize = 0;
        for (self.meta_clients) |maybe_client| {
            if (maybe_client != null) active_connections += 1;
        }
        if (active_connections == 0) return error.ConnectionFailed;

        debug.log(" meta: batch {d} packages across {d} connections", .{ names.len, active_connections });

        const results = try allocator.alloc(MetadataResult, names.len);
        for (results, 0..) |*r, i| {
            r.* = .{ .name = names[i], .data = null, .compressed = false, .has_error = false };
        }

        const total_capacity = active_connections * (MAX_PENDING_REQUESTS - 1);
        var offset: usize = 0;
        var batch_num: usize = 0;

        var decompress_buf = std.ArrayListUnmanaged(u8){};
        defer decompress_buf.deinit(c_allocator);

        while (offset < names.len) {
            const end = @min(offset + total_capacity, names.len);
            var batch_start: u64 = @intCast(std.time.nanoTimestamp());
            debug.log(" meta: batch {d} ({d}-{d})", .{ batch_num, offset, end });

            // Queue one request per name, round-robining across clients.
            var queued: usize = 0;
            var conn_idx: usize = 0;
            for (offset..end) |i| {
                const result = &results[i];
                const name = names[i];

                const c = self.nextMetaClient(&conn_idx) orelse {
                    result.has_error = true;
                    continue;
                };

                const session = c.h2_session orelse {
                    result.has_error = true;
                    continue;
                };

                var path_buf: [512]u8 = undefined;
                const path = std.fmt.bufPrint(&path_buf, "/{s}", .{name}) catch {
                    result.has_error = true;
                    continue;
                };

                var hdrs = [_]nghttp2.nv{
                    Http2Client.makeNv(":method", "GET"),
                    // dupeZ: the :path header must outlive this stack frame.
                    Http2Client.makeNv(":path", c.allocator.dupeZ(u8, path) catch {
                        result.has_error = true;
                        continue;
                    }),
                    Http2Client.makeNv(":scheme", "https"),
                    Http2Client.makeNv(":authority", c.host),
                    Http2Client.makeNv("accept", "application/vnd.npm.install-v1+json"),
                    Http2Client.makeNv("accept-encoding", "gzip"),
                    Http2Client.makeNv("user-agent", user_agent),
                };

                const req = &c.requests[c.request_count];
                c.request_count += 1;
                req.* = .{
                    .stream_id = 0,
                    .path = hdrs[1].value[0..hdrs[1].valuelen :0],
                    .on_data = null,
                    .on_complete = null,
                    .on_error = null,
                    .userdata = result,
                    .response_body = .{},
                    .status_code = 0,
                    .done = false,
                    .has_error = false,
                    .start_ns = @intCast(std.time.nanoTimestamp()),
                    .end_ns = 0,
                    .bytes = 0,
                    .content_encoding = .identity,
                };

                const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
                if (sid < 0) {
                    // Roll back the slot and the duped :path on submit failure.
                    c.request_count -= 1;
                    if (req.path) |p| c.allocator.free(p);
                    result.has_error = true;
                    continue;
                }
                req.stream_id = sid;
                queued += 1;
                conn_idx = (conn_idx + 1) % NUM_META_CONNECTIONS;
            }

            batch_start = debug.timer(" meta: queue requests", batch_start);
            self.flushMetaClients();

            // Drive the event loop until every request in this batch settles.
            const loop = uv.uv_default_loop();
            var all_done = false;
            var loops: usize = 0;
            const run_start: u64 = @intCast(std.time.nanoTimestamp());

            while (!all_done) {
                _ = uv.uv_run(loop, uv.RUN_ONCE);
                loops += 1;
                all_done = self.metaRequestsComplete();
            }

            const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
            debug.log(" h2: run complete in {d}ms, {d} loops", .{ elapsed_ns / 1_000_000, loops });
            batch_start = debug.timer(" meta: run h2 loop", batch_start);

            // Slow-request reporting pass.
            var slow_count: usize = 0;
            var max_req_ms: u64 = 0;
            var max_req_name: []const u8 = "";
            var total_bytes: usize = 0;
            for (self.meta_clients) |maybe_client| {
                if (maybe_client) |c| {
                    for (c.requests[0..c.request_count]) |*req| {
                        const result: *MetadataResult = @ptrCast(@alignCast(req.userdata));
                        const end_ns = if (req.end_ns != 0) req.end_ns else @as(u64, @intCast(std.time.nanoTimestamp()));
                        const duration_ms: u64 = @intCast((end_ns - req.start_ns) / 1_000_000);
                        total_bytes += req.response_body.items.len;
                        if (duration_ms > max_req_ms) {
                            max_req_ms = duration_ms;
                            max_req_name = result.name;
                        }
                        if (duration_ms >= META_SLOW_LOG_MS) {
                            slow_count += 1;
                            debug.log(" meta: slow {s} {d}ms {d} bytes status={d}", .{
                                result.name,
                                duration_ms,
                                req.response_body.items.len,
                                req.status_code,
                            });
                        }
                    }
                }
            }
            debug.log(" meta: summary slow={d} max={s} {d}ms total_bytes={d}", .{ slow_count, max_req_name, max_req_ms, total_bytes });

            // Copy responses out and recycle the request slots.
            var success: usize = 0;
            for (self.meta_clients) |maybe_client| {
                if (maybe_client) |c| {
                    for (c.requests[0..c.request_count]) |*req| {
                        const result: *MetadataResult = @ptrCast(@alignCast(req.userdata));
                        if (storeMetadataBatchResult(req, result, allocator, &decompress_buf)) success += 1;
                    }
                    c.resetRequests();
                }
            }
            _ = debug.timer(" meta: copy results", batch_start);
            debug.log(" meta: queued={d} success={d}", .{ queued, success });

            offset = end;
            batch_num += 1;
        }

        return results;
    }

    /// Callback for streaming metadata: `data` is only valid for the duration
    /// of the call (it may point into a reused decompression buffer).
    pub const MetadataCallback = *const fn (
        name: []const u8,
        data: ?[]const u8,
        has_error: bool,
        userdata: ?*anyopaque,
    ) void;

    const MetadataStreamTracker = struct {
        name: []const u8,
        index: usize,
    };

    /// Invoke `callback` for one settled streaming request, gunzipping the
    /// body if needed. Any failure is reported as (null, has_error=true).
    fn emitMetadataStreamingResult(
        req: *Http2Client.RequestState,
        name: []const u8,
        allocator: std.mem.Allocator,
        decompress_buf: *std.ArrayListUnmanaged(u8),
        callback: MetadataCallback,
        userdata: ?*anyopaque,
    ) void {
        if (req.has_error or req.status_code != 200) {
            callback(name, null, true, userdata);
            return;
        }

        if (req.content_encoding != .gzip) {
            callback(name, req.response_body.items, false, userdata);
            return;
        }

        decompress_buf.clearRetainingCapacity();
        const decomp = extractor.GzipDecompressor.init(allocator) catch {
            callback(name, null, true, userdata);
            return;
        };
        defer decomp.deinit();

        _ = decomp.decompress(req.response_body.items, struct {
            fn onChunk(data: []const u8, ctx: ?*anyopaque) anyerror!void {
                const buf: *std.ArrayListUnmanaged(u8) = @ptrCast(@alignCast(ctx));
                try buf.appendSlice(c_allocator, data);
            }
        }.onChunk, decompress_buf) catch {
            callback(name, null, true, userdata);
            return;
        };

        callback(name, decompress_buf.items, false, userdata);
    }

    /// Fire callbacks for every settled request not yet reported, using
    /// `processed` (indexed by tracker.index) to guarantee exactly-once.
    fn emitCompletedStreamingMetadataCallbacks(
        self: *Fetcher,
        processed: []bool,
        allocator: std.mem.Allocator,
        decompress_buf: *std.ArrayListUnmanaged(u8),
        callback: MetadataCallback,
        userdata: ?*anyopaque,
    ) void {
        for (self.meta_clients) |maybe_client| {
            const c = maybe_client orelse continue;
            for (c.requests[0..c.request_count]) |*req| {
                if (!req.done and !req.has_error) continue;

                const tracker: *MetadataStreamTracker = @ptrCast(@alignCast(req.userdata));
                if (processed[tracker.index]) continue;

                processed[tracker.index] = true;
                emitMetadataStreamingResult(req, tracker.name, allocator, decompress_buf, callback, userdata);
            }
        }
    }

    /// Like fetchMetadataBatch, but invokes `callback` per package as soon as
    /// its response settles instead of returning a result slice.
    /// NOTE(review): a name whose request fails to queue (no capacity, path
    /// too long, dupeZ/submit failure) never gets a callback — confirm callers
    /// tolerate missing names.
    pub fn fetchMetadataStreaming(
        self: *Fetcher,
        names: []const []const u8,
        allocator: std.mem.Allocator,
        callback: MetadataCallback,
        userdata: ?*anyopaque,
    ) !void {
        if (names.len == 0) return;

        var total_start: u64 = @intCast(std.time.nanoTimestamp());
        try self.ensureMetaClients();
        total_start = debug.timer(" meta: get clients", total_start);

        var active_connections: usize = 0;
        for (self.meta_clients) |maybe_client| {
            if (maybe_client != null) active_connections += 1;
        }

        if (active_connections == 0) return error.ConnectionFailed;
        debug.log(" meta: streaming {d} packages across {d} connections", .{ names.len, active_connections });

        const processed = try allocator.alloc(bool, names.len);
        defer allocator.free(processed);
        @memset(processed, false);

        const trackers = try allocator.alloc(MetadataStreamTracker, names.len);
        defer allocator.free(trackers);
        for (names, 0..) |name, i| {
            trackers[i] = .{ .name = name, .index = i };
        }

        const total_capacity = active_connections * (MAX_PENDING_REQUESTS - 1);
        var offset: usize = 0;
        var batch_num: usize = 0;

        var decompress_buf = std.ArrayListUnmanaged(u8){};
        defer decompress_buf.deinit(c_allocator);

        while (offset < names.len) {
            const end = @min(offset + total_capacity, names.len);
            var batch_start: u64 = @intCast(std.time.nanoTimestamp());

            debug.log(" meta: batch {d} ({d}-{d})", .{ batch_num, offset, end });

            var queued: usize = 0;
            var conn_idx: usize = 0;
            for (offset..end) |i| {
                const tracker = &trackers[i];
                const name = names[i];

                const c = self.nextMetaClient(&conn_idx) orelse continue;
                const session = c.h2_session orelse continue;

                var path_buf: [512]u8 = undefined;
                const path = std.fmt.bufPrint(&path_buf, "/{s}", .{name}) catch continue;

                var hdrs = [_]nghttp2.nv{
                    Http2Client.makeNv(":method", "GET"),
                    Http2Client.makeNv(":path", c.allocator.dupeZ(u8, path) catch continue),
                    Http2Client.makeNv(":scheme", "https"),
                    Http2Client.makeNv(":authority", c.host),
                    Http2Client.makeNv("accept", "application/vnd.npm.install-v1+json"),
                    Http2Client.makeNv("accept-encoding", "gzip"),
                    Http2Client.makeNv("user-agent", user_agent),
                };

                const req = &c.requests[c.request_count];
                c.request_count += 1;
                req.* = .{
                    .stream_id = 0,
                    .path = hdrs[1].value[0..hdrs[1].valuelen :0],
                    .on_data = null,
                    .on_complete = null,
                    .on_error = null,
                    .userdata = tracker,
                    .response_body = .{},
                    .status_code = 0,
                    .done = false,
                    .has_error = false,
                    .start_ns = 0,
                    .end_ns = 0,
                    .bytes = 0,
                    .content_encoding = .identity,
                };
                req.start_ns = @intCast(std.time.nanoTimestamp());

                const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
                if (sid < 0) {
                    c.request_count -= 1;
                    if (req.path) |p| c.allocator.free(p);
                    continue;
                }
                req.stream_id = sid;
                queued += 1;
                conn_idx = (conn_idx + 1) % NUM_META_CONNECTIONS;
            }

            batch_start = debug.timer(" meta: queue requests", batch_start);
            self.flushMetaClients();

            const loop = uv.uv_default_loop();
            var all_done = false;
            var loops: usize = 0;
            const run_start: u64 = @intCast(std.time.nanoTimestamp());

            // Emit callbacks eagerly on every loop turn so consumers can start
            // work before the whole batch finishes.
            while (!all_done) {
                _ = uv.uv_run(loop, uv.RUN_ONCE);
                loops += 1;
                self.emitCompletedStreamingMetadataCallbacks(processed, allocator, &decompress_buf, callback, userdata);

                all_done = self.metaRequestsComplete();
            }

            const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
            debug.log(" h2: run complete in {d}ms, {d} loops", .{ elapsed_ns / 1_000_000, loops });

            var slow_count: usize = 0;
            var max_req_ms: u64 = 0;
            var max_req_name: []const u8 = "";
            var total_bytes: usize = 0;
            for (self.meta_clients) |maybe_client| {
                if (maybe_client) |c| {
                    for (c.requests[0..c.request_count]) |*req| {
                        const tracker: *MetadataStreamTracker = @ptrCast(@alignCast(req.userdata));
                        const end_ns = if (req.end_ns != 0) req.end_ns else @as(u64, @intCast(std.time.nanoTimestamp()));
                        const duration_ms: u64 = @intCast((end_ns - req.start_ns) / 1_000_000);
                        total_bytes += req.response_body.items.len;
                        if (duration_ms > max_req_ms) {
                            max_req_ms = duration_ms;
                            max_req_name = tracker.name;
                        }
                        if (duration_ms >= META_SLOW_LOG_MS) {
                            slow_count += 1;
                            debug.log(" meta: slow {s} {d}ms {d} bytes status={d}", .{
                                tracker.name,
                                duration_ms,
                                req.response_body.items.len,
                                req.status_code,
                            });
                        }
                    }
                }
            }
            debug.log(" meta: summary slow={d} max={s} {d}ms total_bytes={d}", .{ slow_count, max_req_name, max_req_ms, total_bytes });

            for (self.meta_clients) |maybe_client| {
                if (maybe_client) |c| c.resetRequests();
            }

            offset = end;
            batch_num += 1;
        }
    }

    /// Queue a tarball download; the url is duplicated, so the caller keeps
    /// ownership of its copy. Nothing runs until run()/finishTarballs().
    pub fn fetchTarball(self: *Fetcher, url: []const u8, handler: StreamHandler) !void {
        try self.pending.append(self.allocator, .{ .url = try self.allocator.dupe(u8, url), .handler = handler });
    }

    /// Download everything queued via fetchTarball, blocking until done.
    pub fn run(self: *Fetcher) !void {
        if (self.pending.items.len == 0 and self.tarball_contexts.items.len == 0) return;

        const run_start: u64 = @intCast(std.time.nanoTimestamp());
        const total_requests = self.pending.items.len + self.tarball_contexts.items.len;

        debug.log("fetcher: {d} tarballs to download (pending={d}, in-flight={d})", .{
            total_requests,
            self.pending.items.len,
            self.tarball_contexts.items.len,
        });

        try self.ensureTarballClients();
        self.finishTarballs();

        const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
        debug.log("fetcher: {d} tarballs complete in {d}ms", .{ total_requests, elapsed_ns / 1_000_000 });
    }
};