fuzzy find my records ken.waow.tech
embeddings pds search
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

add FIFO job queue for indexing — sequential processing with position feedback

replaces the previous model (one thread per job, all fighting over the
embedder mutex) with a single worker thread draining a FIFO queue.

- jobs run one at a time: CAR download + embed, then next job
- status response includes queue_position and queue_depth
- frontend shows "N jobs ahead of you" while waiting
- predictable throughput, honest wait times

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+102 -7
+5
backend/src/assets/main.js
··· 408 408 } else { 409 409 updateProgress({ fetched, embedded: 0, reused: 0, walking: true }); 410 410 } 411 + // show queue position if waiting behind other jobs 412 + const pos = j.queue_position; 413 + if (pos != null && pos > 0) { 414 + progressRate.textContent = `${pos} ${pos === 1 ? "job" : "jobs"} ahead of you`; 415 + } 411 416 } 412 417 } catch (e) { 413 418 stopProgress();
+10
backend/src/main.zig
··· 73 73 std.log.warn("EMBED_DEV_NOAUTH=1 — consent gate bypassed. DO NOT DEPLOY WITH THIS.", .{}); 74 74 } 75 75 76 + var job_queue: server.JobQueue = .{}; 77 + 76 78 var app = server.App{ 77 79 .io = io, 78 80 .allocator = allocator, 79 81 .embedder = &embedder, 80 82 .embedder_mutex = &embedder_mutex, 81 83 .cache = &cache, 84 + .job_queue = &job_queue, 82 85 .dev_noauth = dev_noauth, 83 86 }; 87 + 88 + // start the single job queue worker thread 89 + const worker = std.Thread.spawn(.{}, server.jobQueueWorker, .{&app}) catch |err| { 90 + std.log.err("failed to spawn job queue worker: {t}", .{err}); 91 + return err; 92 + }; 93 + worker.detach(); 84 94 85 95 var addr = try Io.net.IpAddress.parse("::", port); 86 96 var listener = addr.listen(io, .{ .reuse_address = true }) catch |err| {
+87 -7
backend/src/server.zig
··· 52 52 embedder: *Llama.Embedder, 53 53 embedder_mutex: *Io.Mutex, 54 54 cache: *indexer.Cache, 55 + job_queue: *JobQueue, 55 56 /// dev-only: skip the consent gate in handleIndex. set from 56 57 /// EMBED_DEV_NOAUTH=1 at startup. never true in prod. 57 58 dev_noauth: bool = false, 59 + }; 60 + 61 + /// FIFO job queue processed by a single worker thread. jobs run one at a 62 + /// time so embedding throughput is predictable and the frontend can show 63 + /// an accurate queue position ("3 jobs ahead of you"). 64 + pub const JobQueue = struct { 65 + mutex: Io.Mutex = .init, 66 + items: std.ArrayListUnmanaged(*indexer.Job) = .empty, 67 + /// DID of the job currently being processed (null if idle) 68 + current_did: ?[]const u8 = null, 69 + 70 + fn push(self: *JobQueue, io: Io, allocator: Allocator, job: *indexer.Job) !void { 71 + self.mutex.lockUncancelable(io); 72 + defer self.mutex.unlock(io); 73 + try self.items.append(allocator, job); 74 + } 75 + 76 + fn pop(self: *JobQueue, io: Io) ?*indexer.Job { 77 + self.mutex.lockUncancelable(io); 78 + defer self.mutex.unlock(io); 79 + if (self.items.items.len == 0) return null; 80 + return self.items.orderedRemove(0); 81 + } 82 + 83 + /// how many jobs are ahead of the given DID? returns null if the DID 84 + /// isn't in the queue or currently running. 85 + fn position(self: *JobQueue, io: Io, did: []const u8) ?u32 { 86 + self.mutex.lockUncancelable(io); 87 + defer self.mutex.unlock(io); 88 + 89 + // currently being processed = position 0 90 + if (self.current_did) |cur| { 91 + if (mem.eql(u8, cur, did)) return 0; 92 + } 93 + 94 + // search the queue 95 + for (self.items.items, 0..) |job, i| { 96 + if (mem.eql(u8, job.pack.did, did)) { 97 + // +1 because the currently-running job is ahead of everything in the queue 98 + return @intCast(if (self.current_did != null) i + 1 else i); 99 + } 100 + } 101 + return null; 102 + } 103 + 104 + /// total number of jobs waiting + the one currently running 105 + fn depth(self: *JobQueue, io: Io) u32 { 106 + self.mutex.lockUncancelable(io); 107 + defer self.mutex.unlock(io); 108 + var d: u32 = @intCast(self.items.items.len); 109 + if (self.current_did != null) d += 1; 110 + return d; 111 + } 58 112 }; 59 113 60 114 // ---------- connection handling ---------- ··· 582 636 .max_per_collection = 0, 583 637 }; 584 638 585 - const t = Thread.spawn(.{}, indexerWorker, .{job}) catch |err| { 586 - std.log.err("failed to spawn indexer thread: {t}", .{err}); 639 + app.job_queue.push(app.io, app.allocator, job) catch |err| { 640 + std.log.err("failed to enqueue indexer job: {t}", .{err}); 587 641 app.allocator.destroy(job); 588 - try sendJsonStatus(request, .internal_server_error, "{\"error\":\"failed to spawn worker\"}"); 642 + try sendJsonStatus(request, .internal_server_error, "{\"error\":\"failed to enqueue job\"}"); 589 643 return; 590 644 }; 591 - t.detach(); 592 645 593 646 try writeStatusResponse(request, app, pack); 594 647 } ··· 646 699 try kickoffIndexing(request, app, handle); 647 700 } 648 701 649 - fn indexerWorker(job: *indexer.Job) void { 650 - defer job.allocator.destroy(job); 651 - indexer.runJob(job); 702 + /// single worker thread that drains the job queue sequentially. started 703 + /// once at init and runs for the lifetime of the process. polls every 704 + /// 100ms when idle. 705 + pub fn jobQueueWorker(app: *App) void { 706 + while (true) { 707 + const job = app.job_queue.pop(app.io) orelse { 708 + // idle — sleep briefly then check again 709 + app.io.sleep(.{ .nanoseconds = 100_000_000 }, .real) catch {}; 710 + continue; 711 + }; 712 + { 713 + app.job_queue.mutex.lockUncancelable(app.io); 714 + app.job_queue.current_did = job.pack.did; 715 + app.job_queue.mutex.unlock(app.io); 716 + } 717 + 718 + indexer.runJob(job); 719 + 720 + { 721 + app.job_queue.mutex.lockUncancelable(app.io); 722 + app.job_queue.current_did = null; 723 + app.job_queue.mutex.unlock(app.io); 724 + } 725 + job.allocator.destroy(job); 726 + } 652 727 } 653 728 654 729 // ---------- route: /api/status/:handle ---------- ··· 703 778 try buf.print(alloc, "\"build_ms\":{d},", .{pack.build_ms}); 704 779 try buf.print(alloc, "\"prior_build_ms\":{d},", .{pack.prior_build_ms}); 705 780 try buf.print(alloc, "\"prior_count\":{d},", .{pack.prior_count}); 781 + // queue position for this job (null if not queued / already done) 782 + if (app.job_queue.position(app.io, pack.did)) |pos| { 783 + try buf.print(alloc, "\"queue_position\":{d},", .{pos}); 784 + } 785 + try buf.print(alloc, "\"queue_depth\":{d},", .{app.job_queue.depth(app.io)}); 706 786 if (pack.persisted_uri) |uri| { 707 787 try buf.appendSlice(alloc, "\"persisted\":true,\"persisted_uri\":"); 708 788 try writeJsonString(&buf, alloc, uri);