this repo has no description
1const std = @import("std");
2const assert = std.debug.assert;
3const atomic = std.atomic;
4const Condition = std.Thread.Condition;
5
/// Thread-safe, fixed-capacity FIFO queue with blocking `push` and `pop`.
///
/// `T` is the element type and `size` is the number of slots in the inline
/// backing array, so the queue itself never allocates.
///
/// Private helpers whose names end in `LH` do not take `mutex` themselves;
/// every caller either holds it already or, as with `drain`, requires the
/// caller to have taken it through `lock`.
pub fn Queue(
    comptime T: type,
    comptime size: usize,
) type {
    return struct {
        buf: [size]T = undefined,

        // Both indices advance modulo `2 * size` (see `mask2`), not modulo
        // `size`. Equal indices mean "empty"; indices exactly `size` apart
        // mean "full", so every slot of `buf` is usable.
        read_index: usize = 0,
        write_index: usize = 0,

        mutex: std.Thread.Mutex = .{},
        // `push` waits on this while the buffer is full.
        not_full: Condition = .{},
        // `pop` and `poll` wait on this while the buffer is empty.
        not_empty: Condition = .{},

        const Self = @This();

        /// Pop an item from the queue. Blocks until an item is available.
        pub fn pop(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Loop rather than `if`: a wakeup does not guarantee that an
            // item is still there once the mutex has been reacquired.
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            std.debug.assert(!self.isEmptyLH());
            return self.popAndSignalLH();
        }

        /// Push an item into the queue. Blocks until an item has been
        /// put in the queue.
        pub fn push(self: *Self, item: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Loop rather than `if`: a wakeup does not guarantee that a
            // slot is still free once the mutex has been reacquired.
            while (self.isFullLH()) {
                self.not_full.wait(&self.mutex);
            }
            std.debug.assert(!self.isFullLH());
            self.pushAndSignalLH(item);
        }

        /// Push an item into the queue without blocking. Returns true when
        /// the item was placed in the queue, false if the queue was full.
        pub fn tryPush(self: *Self, item: T) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            if (self.isFullLH()) return false;
            self.pushAndSignalLH(item);
            return true;
        }

        /// Pop an item from the queue without blocking. Returns null when
        /// no item is available.
        pub fn tryPop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();
            if (self.isEmptyLH()) return null;
            return self.popAndSignalLH();
        }

        /// Poll the queue. Blocks until the queue holds at least one item;
        /// the item is left in place.
        pub fn poll(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            std.debug.assert(!self.isEmptyLH());
            // `poll` may have absorbed the wakeup sent by a push, yet it
            // leaves the item in the queue. Pass the wakeup on so a thread
            // blocked in `pop` is not left asleep beside a non-empty queue.
            self.not_empty.signal();
        }

        /// Acquire the queue's mutex. Pair with `unlock`; intended for use
        /// around `drain`.
        pub fn lock(self: *Self) void {
            self.mutex.lock();
        }

        /// Release the mutex acquired with `lock`.
        pub fn unlock(self: *Self) void {
            self.mutex.unlock();
        }

        /// Used to efficiently drain the queue while the lock is externally
        /// held. Returns the next item, or null once the queue is empty.
        /// This function does not lock; call `lock` first.
        pub fn drain(self: *Self) ?T {
            if (self.isEmptyLH()) return null;
            // Removing an item frees a slot, so wake a producer that may be
            // blocked in `push` waiting on `not_full`.
            return self.popAndSignalLH();
        }

        /// Returns true when the indices coincide, i.e. nothing is stored.
        fn isEmptyLH(self: *const Self) bool {
            return self.write_index == self.read_index;
        }

        /// Returns true when the write index is exactly `size` ahead of the
        /// read index (modulo `2 * size`), i.e. every slot is occupied.
        fn isFullLH(self: *const Self) bool {
            return mask2(self.write_index + size) == self.read_index;
        }

        /// Returns `true` if the queue is empty and `false` otherwise.
        pub fn isEmpty(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isEmptyLH();
        }

        /// Returns `true` if the queue is full and `false` otherwise.
        pub fn isFull(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isFullLH();
        }

        /// Returns the number of items currently stored. Does not lock.
        fn len(self: *const Self) usize {
            // A write index numerically behind the read index has wrapped
            // past `2 * size`; undo that wrap before subtracting.
            const wrapped = self.write_index < self.read_index;
            const write = if (wrapped) self.write_index + 2 * size else self.write_index;
            return write - self.read_index;
        }

        /// Returns `index` modulo the length of the backing array.
        fn mask(index: usize) usize {
            return index % size;
        }

        /// Returns `index` modulo twice the length of the backing array.
        fn mask2(index: usize) usize {
            return index % (2 * size);
        }

        /// Store `item` at the write position, advance the write index and
        /// wake one thread waiting on `not_empty`.
        fn pushAndSignalLH(self: *Self, item: T) void {
            self.buf[mask(self.write_index)] = item;
            self.write_index = mask2(self.write_index + 1);
            // Signal on every push, not only when the queue was empty
            // before it. `signal` wakes a single waiter, so with several
            // consumers asleep a second push that lands before the first
            // woken consumer has run would otherwise send no wakeup and
            // strand the other consumer with an item available.
            self.not_empty.signal();
        }

        /// Remove and return the oldest item and wake one thread waiting
        /// on `not_full`.
        fn popAndSignalLH(self: *Self) T {
            const result = self.popLH();
            // Signal on every pop, not only when the queue was full before
            // it; otherwise a second pop that lands before the first woken
            // producer has run leaves the other producer asleep although a
            // slot is free.
            self.not_full.signal();
            return result;
        }

        /// Remove and return the oldest item without signalling anyone.
        fn popLH(self: *Self) T {
            const result = self.buf[mask(self.read_index)];
            self.read_index = mask2(self.read_index + 1);
            return result;
        }
    };
}
166
167const testing = std.testing;
168const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
// Two pushes followed by two pops must come back in FIFO order.
test "Queue: simple push / pop" {
    var q: Queue(u8, 16) = .{};
    for ([_]u8{ 1, 2 }) |value| q.push(value);

    const first = q.pop();
    const second = q.pop();
    try testing.expectEqual(1, first);
    try testing.expectEqual(2, second);
}
177
178const Thread = std.Thread;
/// Thread body: pushes `3` into `q`, then pops one item and expects it to
/// be `2`.
fn testPushPop(q: *Queue(u8, 2)) !void {
    q.push(3);
    const popped = q.pop();
    try testing.expectEqual(2, popped);
}
183
// Fill a two-slot queue, start `testPushPop` (which pushes 3 and pops
// once), and check the values seen on this side.
test "Fill, wait to push, pop once in another thread" {
    var queue: Queue(u8, 2) = .{};
    for ([_]u8{ 1, 2 }) |value| queue.push(value);

    const worker = try Thread.spawn(cfg, testPushPop, .{&queue});

    // The queue is full, so a non-blocking push must be rejected.
    const accepted = queue.tryPush(3);
    try testing.expectEqual(false, accepted);

    const head = queue.pop();
    try testing.expectEqual(1, head);

    worker.join();

    const tail = queue.pop();
    try testing.expectEqual(3, tail);
    const leftover = queue.tryPop();
    try testing.expectEqual(null, leftover);
}
195
/// Thread body: pushes the values 0 through 4, in order, into `q`.
fn testPush(q: *Queue(u8, 2)) void {
    var value: u8 = 0;
    while (value < 5) : (value += 1) {
        q.push(value);
    }
}
203
// Pop five items while `testPush` feeds 0..4 from another thread; they
// must arrive in order.
test "Try to pop, fill from another thread" {
    var queue: Queue(u8, 2) = .{};
    const producer = try Thread.spawn(cfg, testPush, .{&queue});

    var expected: u8 = 0;
    while (expected < 5) : (expected += 1) {
        const got = queue.pop();
        try testing.expectEqual(expected, got);
    }

    producer.join();
}
212
/// Background consumer for the "Fill, block, fill, block" test.
///
/// Waits for `state` to reach 1, signals both condition variables, pauses,
/// then pops and expects `1`. Once `state` reaches 2 it repeats the same
/// sequence and expects `2`.
fn sleepyPop(q: *Queue(u8, 2), state: *atomic.Value(u8)) !void {
    const pause_ns = 10 * std.time.ns_per_ms;

    // Hold off until the main thread reports that the queue is full.
    while (state.load(.acquire) < 1) {
        try Thread.yield();
    }

    // Fake a spurious wakeup on both conditions, since real ones can
    // happen too.
    q.not_full.signal();
    q.not_empty.signal();

    // Let the other thread run. A yield alone may not get it scheduled, so
    // sleep as well. Nothing has been popped yet, so its blocked push
    // still has no room.
    try Thread.yield();
    std.Thread.sleep(pause_ns);

    // Now actually make room.
    try std.testing.expectEqual(1, q.pop());

    // Wait until the main thread announces its second blocking push...
    while (state.load(.acquire) < 2) {
        try Thread.yield();
    }
    // ...and sleep so that push is really waiting by now.
    std.Thread.sleep(pause_ns);

    // Second fake wakeup.
    q.not_full.signal();
    q.not_empty.signal();

    // Again give the other thread time to notice the wakeup was spurious
    // and go back to waiting.
    try Thread.yield();
    std.Thread.sleep(pause_ns);

    // Final pop; this releases the second push.
    try std.testing.expectEqual(2, q.pop());
}
251
// Fill the queue and block on a third push until the background thread
// pops; then block on a fourth push until it pops again, and finally
// empty the queue. The background thread injects spurious wakeups, so
// this test fails if the `while` loop in `push` is turned into an `if`.
test "Fill, block, fill, block" {
    var queue: Queue(u8, 2) = .{};
    var state = atomic.Value(u8).init(0);
    const popper = try Thread.spawn(cfg, sleepyPop, .{ &queue, &state });

    for ([_]u8{ 1, 2 }) |value| queue.push(value);
    state.store(1, .release);

    const started_ms = std.time.milliTimestamp();
    queue.push(3); // Expected to block until the popper makes room.
    const finished_ms = std.time.milliTimestamp();

    // Confirm the popper's sleeps really kept this thread waiting: the
    // push must have taken more than 5ms.
    const elapsed_ms = finished_ms - started_ms;
    try std.testing.expect(elapsed_ms > 5);

    state.store(2, .release);
    // Blocks again until the popper's second pop.
    queue.push(4);

    // With that push through, the popper has nothing left to do.
    popper.join();

    const third = queue.pop();
    try std.testing.expectEqual(3, third);
    const fourth = queue.pop();
    try std.testing.expectEqual(4, fourth);
}
282
/// Background producer for the "Drain, block, drain, block" test.
///
/// Signals both condition variables before each of its two pushes (`1`,
/// then `2`), waiting for `state` to reach 1 between them.
fn sleepyPush(q: *Queue(u8, 1), state: *atomic.Value(u8)) !void {
    const pause_ns = 10 * std.time.ns_per_ms;

    // Give the main thread time to get into its first pop.
    try Thread.yield();
    std.Thread.sleep(pause_ns);

    // Fake a spurious wakeup.
    q.not_full.signal();
    q.not_empty.signal();

    try Thread.yield();
    std.Thread.sleep(pause_ns);

    // Provide the first item.
    q.push(1);

    // Wait until the main thread confirms it has been popped.
    while (state.load(.acquire) < 1) {
        try Thread.yield();
    }

    // Give the main thread time to block in its second pop.
    try Thread.yield();
    std.Thread.sleep(pause_ns);

    // Second fake wakeup.
    q.not_full.signal();
    q.not_empty.signal();

    q.push(2);
}
310
// Counterpart of fill/block/fill/block for the consuming side: the
// background thread injects spurious wakeups, so this test should fail if
// the `while` loop in `pop` is turned into an `if`.
test "Drain, block, drain, block" {
    var queue: Queue(u8, 1) = .{};
    var state = atomic.Value(u8).init(0);
    const pusher = try Thread.spawn(cfg, sleepyPush, .{ &queue, &state });

    const first = queue.pop();
    try std.testing.expectEqual(1, first);

    // Tell the pusher the first item has been taken.
    state.store(1, .release);

    const second = queue.pop();
    try std.testing.expectEqual(2, second);

    pusher.join();
}
324
/// Thread body: pops a single item from `q` and expects it to be `1`.
fn readerThread(q: *Queue(u8, 1)) !void {
    const got = q.pop();
    try testing.expectEqual(1, got);
}
328
// Two threads pop from a one-slot queue while this thread pushes twice.
test "2 readers" {
    var queue: Queue(u8, 1) = .{};

    var readers: [2]Thread = undefined;
    for (&readers) |*reader| {
        reader.* = try Thread.spawn(cfg, readerThread, .{&queue});
    }

    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    for (0..readers.len) |_| queue.push(1);

    for (readers) |reader| reader.join();
}
341
/// Thread body: pushes a single `1` into `q`.
fn writerThread(q: *Queue(u8, 1)) !void {
    const value: u8 = 1;
    q.push(value);
}
345
// Two threads each push `1` into a one-slot queue while this thread pops
// twice.
test "2 writers" {
    var queue: Queue(u8, 1) = .{};

    var writers: [2]Thread = undefined;
    for (&writers) |*writer| {
        writer.* = try Thread.spawn(cfg, writerThread, .{&queue});
    }

    for (0..writers.len) |_| {
        const got = queue.pop();
        try testing.expectEqual(1, got);
    }

    for (writers) |writer| writer.join();
}