//! Thread-safe, fixed-size blocking queue backed by a ring buffer.
1const std = @import("std");
2const assert = std.debug.assert;
3const atomic = std.atomic;
4const Condition = std.Thread.Condition;
5
/// Thread safe. Fixed size. Blocking push and pop.
///
/// A mutex-protected ring buffer. The read/write indices run over
/// `0..2*size` so that "full" and "empty" are distinguishable without a
/// separate count field: empty means the indices are equal, full means
/// they differ by exactly `size` modulo `2*size`.
pub fn Queue(
    comptime T: type,
    comptime size: usize,
) type {
    return struct {
        buf: [size]T = undefined,

        // Both indices are kept in the range `0..2*size`; `mask` maps
        // them onto `buf`.
        read_index: usize = 0,
        write_index: usize = 0,

        mutex: std.Thread.Mutex = .{},
        // blocks when the buffer is full
        not_full: Condition = .{},
        // ...or empty
        not_empty: Condition = .{},

        const Self = @This();

        /// Pop an item from the queue. Blocks until an item is available.
        pub fn pop(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // `while`, not `if`: condition waits can wake spuriously.
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            assert(!self.isEmptyLH());
            const item = self.popLH();
            // Always signal, not just on the full -> not-full
            // transition: with several producers waiting, a
            // transition-only signal wakes one of them and later pops
            // (seeing a non-full queue) wake nobody, stranding the rest
            // while space is available.
            self.not_full.signal();
            return item;
        }

        /// Push an item into the queue. Blocks until the item has been
        /// put in the queue.
        pub fn push(self: *Self, item: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            // `while`, not `if`: condition waits can wake spuriously.
            while (self.isFullLH()) {
                self.not_full.wait(&self.mutex);
            }
            assert(!self.isFullLH());
            self.pushLH(item);
        }

        /// Push an item into the queue without blocking. Returns true
        /// when the item was successfully placed in the queue, false
        /// if the queue was full.
        pub fn tryPush(self: *Self, item: T) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Check and insert inside a single critical section.
            // Releasing the lock between the check and a call to the
            // blocking `push` would let another producer fill the queue
            // in the gap, turning this non-blocking call into a
            // blocking one.
            if (self.isFullLH()) return false;
            self.pushLH(item);
            return true;
        }

        /// Pop an item from the queue without blocking. Returns null
        /// when no item is available.
        pub fn tryPop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Same reasoning as tryPush: check and remove atomically so
            // a concurrent consumer cannot empty the queue between the
            // check and the removal.
            if (self.isEmptyLH()) return null;
            const item = self.popLH();
            self.not_full.signal();
            return item;
        }

        /// Poll the queue. This call blocks until events are in the
        /// queue; it does not consume them.
        pub fn poll(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            assert(!self.isEmptyLH());
        }

        /// Acquire the queue's lock (pair with `unlock`; see `drain`).
        pub fn lock(self: *Self) void {
            self.mutex.lock();
        }

        /// Release the queue's lock.
        pub fn unlock(self: *Self) void {
            self.mutex.unlock();
        }

        /// Used to efficiently drain the queue while the lock is
        /// externally held via `lock`/`unlock`.
        pub fn drain(self: *Self) ?T {
            if (self.isEmptyLH()) return null;
            const item = self.popLH();
            // Wake a producer that may be blocked on a full queue; it
            // will re-acquire the mutex only after the caller unlocks.
            self.not_full.signal();
            return item;
        }

        // The "LH" suffix marks functions whose caller must hold `mutex`.
        fn isEmptyLH(self: Self) bool {
            return self.write_index == self.read_index;
        }

        fn isFullLH(self: Self) bool {
            return self.mask2(self.write_index + self.buf.len) ==
                self.read_index;
        }

        /// Returns `true` if the queue is empty and `false` otherwise.
        pub fn isEmpty(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isEmptyLH();
        }

        /// Returns `true` if the queue is full and `false` otherwise.
        pub fn isFull(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isFullLH();
        }

        /// Returns the number of items in the queue. Lock must be held.
        fn len(self: Self) usize {
            // When write has wrapped around the 0..2*size range past
            // read, shift it up by one full cycle before subtracting.
            const wrap_offset = 2 * self.buf.len *
                @intFromBool(self.write_index < self.read_index);
            const adjusted_write_index = self.write_index + wrap_offset;
            return adjusted_write_index - self.read_index;
        }

        /// Returns `index` modulo the length of the backing slice.
        fn mask(self: Self, index: usize) usize {
            return index % self.buf.len;
        }

        /// Returns `index` modulo twice the length of the backing slice.
        fn mask2(self: Self, index: usize) usize {
            return index % (2 * self.buf.len);
        }

        /// Remove and return the head item. Lock must be held.
        fn popLH(self: *Self) T {
            const result = self.buf[self.mask(self.read_index)];
            self.read_index = self.mask2(self.read_index + 1);
            return result;
        }

        /// Append an item and wake one waiting consumer. Lock must be
        /// held.
        fn pushLH(self: *Self, item: T) void {
            self.buf[self.mask(self.write_index)] = item;
            self.write_index = self.mask2(self.write_index + 1);
            // Always signal (not only on the empty -> non-empty
            // transition): with several waiting consumers, a
            // transition-only signal can strand one of them while items
            // sit in the queue.
            self.not_empty.signal();
        }
    };
}
160
const testing = std.testing;
// Shared spawn config for test threads; std.testing.allocator reports
// leaks when a test exits.
const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
test "Queue: simple push / pop" {
    var queue: Queue(u8, 16) = .{};
    // Single-threaded sanity check: items come back out in FIFO order.
    for ([_]u8{ 1, 2 }) |item| queue.push(item);
    for ([_]u8{ 1, 2 }) |expected| {
        try testing.expectEqual(expected, queue.pop());
    }
}
171
172const Thread = std.Thread;
// Helper thread body: pushes one item (blocking until the main thread
// makes space) and then pops, expecting the oldest remaining item.
fn testPushPop(q: *Queue(u8, 2)) !void {
    q.push(3);
    const head = q.pop();
    try testing.expectEqual(2, head);
}
177
test "Fill, wait to push, pop once in another thread" {
    var queue: Queue(u8, 2) = .{};
    // Fill the queue to capacity.
    queue.push(1);
    queue.push(2);
    // The helper's push(3) blocks until we pop below.
    const t = try Thread.spawn(cfg, testPushPop, .{&queue});
    // Non-blocking push must fail while the queue is full.
    try testing.expectEqual(false, queue.tryPush(3));
    try testing.expectEqual(1, queue.pop());
    t.join();
    // The helper pushed 3 and popped 2, so 3 is the only item left.
    try testing.expectEqual(3, queue.pop());
    try testing.expectEqual(null, queue.tryPop());
}
189
// Producer helper: pushes 0 through 4 in order, blocking whenever the
// size-2 queue is full until the consumer drains it.
fn testPush(q: *Queue(u8, 2)) void {
    var value: u8 = 0;
    while (value < 5) : (value += 1) {
        q.push(value);
    }
}
197
test "Try to pop, fill from another thread" {
    var queue: Queue(u8, 2) = .{};
    // The producer thread pushes 0..4; we consume them in order,
    // blocking in pop() whenever we get ahead of it.
    const producer = try Thread.spawn(cfg, testPush, .{&queue});
    var expected: u8 = 0;
    while (expected < 5) : (expected += 1) {
        try testing.expectEqual(expected, queue.pop());
    }
    producer.join();
}
206
// Helper for "Fill, block, fill, block". Twice over: provoke spurious
// wakeups while the main thread is blocked in push() on a full queue,
// then make space by popping. `state` sequences the two rounds.
fn sleepyPop(q: *Queue(u8, 2), state: *atomic.Value(u8)) !void {
    // First we wait for the queue to be full.
    while (state.load(.acquire) < 1)
        try Thread.yield();

    // Then we spuriously wake it up, because that's a thing that can
    // happen.
    q.not_full.signal();
    q.not_empty.signal();

    // Then give the other thread a good chance of waking up. It's not
    // clear that yield guarantees the other thread will be scheduled,
    // so we'll throw a sleep in here just to be sure. The queue is
    // still full and the push in the other thread is still blocked
    // waiting for space.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);
    // Finally, let that other thread go.
    try std.testing.expectEqual(1, q.pop());

    // Wait for the other thread to signal it's ready for second push
    while (state.load(.acquire) < 2)
        try Thread.yield();
    // But we want to ensure that there's a second push waiting, so
    // here's another sleep.
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Another spurious wake...
    q.not_full.signal();
    q.not_empty.signal();
    // And another chance for the other thread to see that it's
    // spurious and go back to sleep.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Pop that thing and we're done.
    try std.testing.expectEqual(2, q.pop());
}
245
test "Fill, block, fill, block" {
    // Fill the queue, block while trying to write another item, have
    // a background thread unblock us, then block while trying to
    // write yet another thing. Have the background thread unblock
    // that too (after some time) then drain the queue. This test
    // fails if the while loop in `push` is turned into an `if`.

    var queue: Queue(u8, 2) = .{};
    var state = atomic.Value(u8).init(0);
    const thread = try Thread.spawn(cfg, sleepyPop, .{ &queue, &state });
    queue.push(1);
    queue.push(2);
    // Tell the helper the queue is now full.
    state.store(1, .release);
    const now = std.time.milliTimestamp();
    queue.push(3); // This one should block.
    const then = std.time.milliTimestamp();

    // Just to make sure the sleeps are yielding to this thread, make
    // sure it took at least 5ms to do the push.
    try std.testing.expect(then - now > 5);

    state.store(2, .release);
    // This should block again, waiting for the other thread.
    queue.push(4);

    // And once that push has gone through, the other thread's done.
    thread.join();
    try std.testing.expectEqual(3, queue.pop());
    try std.testing.expectEqual(4, queue.pop());
}
276
// Helper for "Drain, block, drain, block": the push-side mirror of
// sleepyPop. Provokes spurious wakeups while the main thread is blocked
// in pop() on an empty queue, then pushes an item; twice.
fn sleepyPush(q: *Queue(u8, 1), state: *atomic.Value(u8)) !void {
    // Try to ensure the other thread has already started trying to pop.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Stick something in the queue so it can be popped.
    q.push(1);
    // Ensure it's been popped.
    while (state.load(.acquire) < 1)
        try Thread.yield();
    // Give the other thread time to block again.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    q.push(2);
}
304
test "Drain, block, drain, block" {
    // This is like fill/block/fill/block, but on the pop end. This
    // test should fail if the `while` loop in `pop` is turned into an
    // `if`.

    var queue: Queue(u8, 1) = .{};
    var state = atomic.Value(u8).init(0);
    const thread = try Thread.spawn(cfg, sleepyPush, .{ &queue, &state });
    // Blocks until sleepyPush pushes 1 (after its spurious wakeups).
    try std.testing.expectEqual(1, queue.pop());
    state.store(1, .release);
    // Blocks again until sleepyPush pushes 2.
    try std.testing.expectEqual(2, queue.pop());
    thread.join();
}
318
// Consumer helper: blocks until one item is available and checks it.
fn readerThread(q: *Queue(u8, 1)) !void {
    const item = q.pop();
    try testing.expectEqual(1, item);
}
322
test "2 readers" {
    // 2 threads read, one thread writes
    var queue: Queue(u8, 1) = .{};
    const t1 = try Thread.spawn(cfg, readerThread, .{&queue});
    const t2 = try Thread.spawn(cfg, readerThread, .{&queue});
    // Give both readers a chance to block in pop() before pushing.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);
    // Size-1 queue: the second push blocks until the first item is
    // consumed, so each reader ends up with exactly one item.
    queue.push(1);
    queue.push(1);
    t1.join();
    t2.join();
}
335
// Producer helper: pushes a single item, blocking while the queue is
// full.
fn writerThread(q: *Queue(u8, 1)) !void {
    const item: u8 = 1;
    q.push(item);
}
339
test "2 writers" {
    // One consumer, two producers. The size-1 queue forces whichever
    // writer loses the race to block until the first item is consumed.
    var queue: Queue(u8, 1) = .{};
    const writer_a = try Thread.spawn(cfg, writerThread, .{&queue});
    const writer_b = try Thread.spawn(cfg, writerThread, .{&queue});

    // Both writers push the same value, so order doesn't matter.
    try testing.expectEqual(1, queue.pop());
    try testing.expectEqual(1, queue.pop());
    writer_a.join();
    writer_b.join();
}
349}