this repo has no description
1const std = @import("std");
2const assert = std.debug.assert;
3const atomic = std.atomic;
4
5/// Thread safe. Fixed size. Blocking push and pop.
6pub fn Queue(
7 comptime T: type,
8 comptime size: usize,
9) type {
10 return struct {
11 buf: [size]T = undefined,
12
13 read_index: usize = 0,
14 write_index: usize = 0,
15
16 io: std.Io,
17 mutex: std.Io.Mutex = .init,
18 // blocks when the buffer is full
19 not_full: std.Io.Condition = .init,
20 // ...or empty
21 not_empty: std.Io.Condition = .init,
22
23 const Self = @This();
24
25 pub fn init(io: std.Io) Self {
26 return .{ .io = io };
27 }
28
29 /// Pop an item from the queue. Blocks until an item is available.
30 pub fn pop(self: *Self) !T {
31 try self.mutex.lock(self.io);
32 defer self.mutex.unlock(self.io);
33 while (self.isEmptyLH()) {
34 try self.not_empty.wait(self.io, &self.mutex);
35 }
36 std.debug.assert(!self.isEmptyLH());
37 return self.popAndSignalLH();
38 }
39
40 /// Push an item into the queue. Blocks until an item has been
41 /// put in the queue.
42 pub fn push(self: *Self, item: T) !void {
43 try self.mutex.lock(self.io);
44 defer self.mutex.unlock(self.io);
45 while (self.isFullLH()) {
46 try self.not_full.wait(self.io, &self.mutex);
47 }
48 std.debug.assert(!self.isFullLH());
49 self.pushAndSignalLH(item);
50 }
51
52 /// Push an item into the queue. Returns true when the item
53 /// was successfully placed in the queue, false if the queue
54 /// was full.
55 pub fn tryPush(self: *Self, item: T) !bool {
56 try self.mutex.lock(self.io);
57 defer self.mutex.unlock(self.io);
58 if (self.isFullLH()) return false;
59 self.pushAndSignalLH(item);
60 return true;
61 }
62
63 /// Pop an item from the queue. Returns null when no item is
64 /// available.
65 pub fn tryPop(self: *Self) !?T {
66 try self.mutex.lock(self.io);
67 defer self.mutex.unlock(self.io);
68 if (self.isEmptyLH()) return null;
69 return self.popAndSignalLH();
70 }
71
72 /// Poll the queue. This call blocks until events are in the queue
73 pub fn poll(self: *Self) !void {
74 try self.mutex.lock(self.io);
75 defer self.mutex.unlock(self.io);
76 while (self.isEmptyLH()) {
77 try self.not_empty.wait(self.io, &self.mutex);
78 }
79 std.debug.assert(!self.isEmptyLH());
80 }
81
82 pub fn lock(self: *Self) !void {
83 try self.mutex.lock(self.io);
84 }
85
86 pub fn unlock(self: *Self) void {
87 self.mutex.unlock(self.io);
88 }
89
90 /// Used to efficiently drain the queue while the lock is externally held
91 pub fn drain(self: *Self) ?T {
92 if (self.isEmptyLH()) return null;
93 // Preserve queue push wakeups when draining under external lock.
94 // If the queue was full before this pop, a producer may be blocked
95 // waiting on not_full.
96 const was_full = self.isFullLH();
97 const item = self.popLH();
98 if (was_full) {
99 self.not_full.signal(self.io);
100 }
101 return item;
102 }
103
104 fn isEmptyLH(self: Self) bool {
105 return self.write_index == self.read_index;
106 }
107
108 fn isFullLH(self: Self) bool {
109 return self.mask2(self.write_index + self.buf.len) ==
110 self.read_index;
111 }
112
113 /// Returns `true` if the queue is empty and `false` otherwise.
114 pub fn isEmpty(self: *Self) !bool {
115 try self.mutex.lock(self.io);
116 defer self.mutex.unlock(self.io);
117 return self.isEmptyLH();
118 }
119
120 /// Returns `true` if the queue is full and `false` otherwise.
121 pub fn isFull(self: *Self) !bool {
122 try self.mutex.lock(self.io);
123 defer self.mutex.unlock(self.io);
124 return self.isFullLH();
125 }
126
127 /// Returns the length
128 fn len(self: Self) usize {
129 const wrap_offset = 2 * self.buf.len *
130 @intFromBool(self.write_index < self.read_index);
131 const adjusted_write_index = self.write_index + wrap_offset;
132 return adjusted_write_index - self.read_index;
133 }
134
135 /// Returns `index` modulo the length of the backing slice.
136 fn mask(self: Self, index: usize) usize {
137 return index % self.buf.len;
138 }
139
140 /// Returns `index` modulo twice the length of the backing slice.
141 fn mask2(self: Self, index: usize) usize {
142 return index % (2 * self.buf.len);
143 }
144
145 fn pushAndSignalLH(self: *Self, item: T) void {
146 const was_empty = self.isEmptyLH();
147 self.buf[self.mask(self.write_index)] = item;
148 self.write_index = self.mask2(self.write_index + 1);
149 if (was_empty) {
150 self.not_empty.signal(self.io);
151 }
152 }
153
154 fn popAndSignalLH(self: *Self) T {
155 const was_full = self.isFullLH();
156 const result = self.popLH();
157 if (was_full) {
158 self.not_full.signal(self.io);
159 }
160 return result;
161 }
162
163 fn popLH(self: *Self) T {
164 const result = self.buf[self.mask(self.read_index)];
165 self.read_index = self.mask2(self.read_index + 1);
166 return result;
167 }
168 };
169}
170
171const testing = std.testing;
172const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
173test "Queue: simple push / pop" {
174 const io = std.testing.io;
175 var queue: Queue(u8, 16) = .init(io);
176 try queue.push(1);
177 try queue.push(2);
178 const pop = try queue.pop();
179 try testing.expectEqual(1, pop);
180 try testing.expectEqual(2, try queue.pop());
181}
182
183const Thread = std.Thread;
184fn testPushPop(q: *Queue(u8, 2)) !void {
185 try q.push(3);
186 try testing.expectEqual(2, try q.pop());
187}
188
189test "Fill, wait to push, pop once in another thread" {
190 const io = std.testing.io;
191 var queue: Queue(u8, 2) = .init(io);
192 try queue.push(1);
193 try queue.push(2);
194 var t = try io.concurrent(testPushPop, .{&queue});
195 try testing.expectEqual(false, try queue.tryPush(3));
196 try testing.expectEqual(1, try queue.pop());
197 try t.await(io);
198 try testing.expectEqual(3, try queue.pop());
199 try testing.expectEqual(null, try queue.tryPop());
200}
201
202fn testPush(q: *Queue(u8, 2)) !void {
203 try q.push(0);
204 try q.push(1);
205 try q.push(2);
206 try q.push(3);
207 try q.push(4);
208}
209
210test "Try to pop, fill from another thread" {
211 const io = std.testing.io;
212 var queue: Queue(u8, 2) = .init(io);
213 var task = try io.concurrent(testPush, .{&queue});
214 defer task.cancel(io) catch {};
215 for (0..5) |idx| {
216 try testing.expectEqual(@as(u8, @intCast(idx)), try queue.pop());
217 }
218 try task.await(io);
219}
220
221fn sleepyPop(io: std.Io, q: *Queue(u8, 2), state: *atomic.Value(u8)) !void {
222 // First we wait for the queue to be full.
223 while (state.load(.acquire) < 1)
224 try Thread.yield();
225
226 // Then we spuriously wake it up, because that's a thing that can
227 // happen.
228 q.not_full.signal(io);
229 q.not_empty.signal(io);
230
231 // Then give the other thread a good chance of waking up. It's not
232 // clear that yield guarantees the other thread will be scheduled,
233 // so we'll throw a sleep in here just to be sure. The queue is
234 // still full and the push in the other thread is still blocked
235 // waiting for space.
236 try Thread.yield();
237 try io.sleep(.fromMilliseconds(10), .real);
238 // Finally, let that other thread go.
239 try std.testing.expectEqual(1, q.pop());
240
241 // Wait for the other thread to signal it's ready for second push
242 while (state.load(.acquire) < 2)
243 try Thread.yield();
244 // But we want to ensure that there's a second push waiting, so
245 // here's another sleep.
246 try io.sleep(.fromMilliseconds(10), .real);
247
248 // Another spurious wake...
249 q.not_full.signal(io);
250 q.not_empty.signal(io);
251 // And another chance for the other thread to see that it's
252 // spurious and go back to sleep.
253 try Thread.yield();
254 try io.sleep(.fromMilliseconds(10), .real);
255
256 // Pop that thing and we're done.
257 try std.testing.expectEqual(2, q.pop());
258}
259
260test "Fill, block, fill, block" {
261 const io = std.testing.io;
262
263 // Fill the queue, block while trying to write another item, have
264 // a background thread unblock us, then block while trying to
265 // write yet another thing. Have the background thread unblock
266 // that too (after some time) then drain the queue. This test
267 // fails if the while loop in `push` is turned into an `if`.
268
269 var queue: Queue(u8, 2) = .init(io);
270 var state = atomic.Value(u8).init(0);
271 var task = try io.concurrent(sleepyPop, .{ io, &queue, &state });
272 try queue.push(1);
273 try queue.push(2);
274 state.store(1, .release);
275 const now = std.Io.Timestamp.now(io, .real).toMilliseconds();
276 try queue.push(3); // This one should block.
277 const then = std.Io.Timestamp.now(io, .real).toMilliseconds();
278
279 // Just to make sure the sleeps are yielding to this thread, make
280 // sure it took at least 5ms to do the push.
281 try std.testing.expect(then - now > 5);
282
283 state.store(2, .release);
284 // This should block again, waiting for the other thread.
285 try queue.push(4);
286
287 // And once that push has gone through, the other thread's done.
288 try task.await(io);
289 try std.testing.expectEqual(3, try queue.pop());
290 try std.testing.expectEqual(4, try queue.pop());
291}
292
293fn sleepyPush(io: std.Io, q: *Queue(u8, 1), state: *atomic.Value(u8)) !void {
294 // Try to ensure the other thread has already started trying to pop.
295 try Thread.yield();
296 try io.sleep(.fromMilliseconds(10), .real);
297
298 // Spurious wake
299 q.not_full.signal(io);
300 q.not_empty.signal(io);
301
302 try Thread.yield();
303 try io.sleep(.fromMilliseconds(10), .real);
304
305 // Stick something in the queue so it can be popped.
306 try q.push(1);
307 // Ensure it's been popped.
308 while (state.load(.acquire) < 1)
309 try Thread.yield();
310 // Give the other thread time to block again.
311 try Thread.yield();
312 try io.sleep(.fromMilliseconds(10), .real);
313
314 // Spurious wake
315 q.not_full.signal(io);
316 q.not_empty.signal(io);
317
318 try q.push(2);
319}
320
321test "Drain, block, drain, block" {
322 const io = std.testing.io;
323
324 // This is like fill/block/fill/block, but on the pop end. This
325 // test should fail if the `while` loop in `pop` is turned into an
326 // `if`.
327
328 var queue: Queue(u8, 1) = .init(io);
329 var state = atomic.Value(u8).init(0);
330 var task = try io.concurrent(sleepyPush, .{ io, &queue, &state });
331 try std.testing.expectEqual(1, queue.pop());
332 state.store(1, .release);
333 try std.testing.expectEqual(2, queue.pop());
334 try task.await(io);
335}
336
337fn readerThread(q: *Queue(u8, 1)) !void {
338 try testing.expectEqual(1, try q.pop());
339}
340
341test "2 readers" {
342 const io = std.testing.io;
343 // 2 threads read, one thread writes
344 var queue: Queue(u8, 1) = .init(io);
345 var t1 = try io.concurrent(readerThread, .{&queue});
346 defer t1.cancel(io) catch {};
347 var t2 = try io.concurrent(readerThread, .{&queue});
348 defer t2.cancel(io) catch {};
349 try Thread.yield();
350 try io.sleep(.fromMilliseconds(10), .real);
351 try queue.push(1);
352 try queue.push(1);
353 try t1.await(io);
354 try t2.await(io);
355}
356
357fn writerThread(q: *Queue(u8, 1)) !void {
358 try q.push(1);
359}
360
361test "2 writers" {
362 const io = std.testing.io;
363
364 var queue: Queue(u8, 1) = .init(io);
365 var t1 = try io.concurrent(writerThread, .{&queue});
366 var t2 = try io.concurrent(writerThread, .{&queue});
367
368 try testing.expectEqual(1, try queue.pop());
369 try testing.expectEqual(1, try queue.pop());
370 try t1.await(io);
371 try t2.await(io);
372}
373
374test {
375 std.testing.refAllDecls(@This());
376}