this repo has no description
1const std = @import("std");
2const assert = std.debug.assert;
3const atomic = std.atomic;
4const Condition = std.Thread.Condition;
5
/// Thread-safe, fixed-capacity FIFO queue with blocking `push`/`pop` and
/// non-blocking `tryPush`/`tryPop`.
///
/// `T` is the element type and `size` is the capacity, which must be
/// non-zero. Items live in an inline array of `size` elements. The read and
/// write indices are kept modulo `2 * size`, which lets a full queue be
/// told apart from an empty one without a separate element counter.
///
/// Helpers suffixed `LH` (presumably "lock held") do not take `mutex`
/// themselves; the caller must already hold it.
pub fn Queue(
    comptime T: type,
    comptime size: usize,
) type {
    // With zero capacity every index computation would be a modulo by zero.
    if (size == 0) @compileError("Queue size must be greater than zero");

    return struct {
        buf: [size]T = undefined,

        read_index: usize = 0,
        write_index: usize = 0,

        mutex: std.Thread.Mutex = .{},
        // Producers wait on this while the buffer is full.
        not_full: std.Thread.Condition = .{},
        // Consumers wait on this while the buffer is empty.
        not_empty: std.Thread.Condition = .{},

        const Self = @This();

        /// Pop an item from the queue. Blocks until an item is available.
        pub fn pop(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Loop, not `if`: a wake-up does not guarantee that an item
            // is present.
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            return self.popAndSignalLH();
        }

        /// Push an item into the queue. Blocks until an item has been
        /// put in the queue.
        pub fn push(self: *Self, item: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Loop, not `if`: a wake-up does not guarantee that a slot
            // is free.
            while (self.isFullLH()) {
                self.not_full.wait(&self.mutex);
            }
            self.pushAndSignalLH(item);
        }

        /// Push an item into the queue without blocking on a full queue.
        /// Returns true when the item was successfully placed in the
        /// queue, false if the queue was full.
        pub fn tryPush(self: *Self, item: T) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            // The check and the insertion happen under one lock
            // acquisition, so another producer cannot fill the queue in
            // between and turn this call into a blocking one.
            if (self.isFullLH()) return false;
            self.pushAndSignalLH(item);
            return true;
        }

        /// Pop an item from the queue without blocking on an empty queue.
        /// Returns null when no item is available.
        pub fn tryPop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Same reasoning as `tryPush`: keep the check and the removal
            // atomic with respect to other consumers.
            if (self.isEmptyLH()) return null;
            return self.popAndSignalLH();
        }

        /// Poll the queue. This call blocks until events are in the queue.
        /// No item is removed.
        pub fn poll(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            std.debug.assert(!self.isEmptyLH());
            // This thread may have been woken by a signal meant for a
            // consumer, yet it leaves the item in place. Pass the wake-up
            // on so a blocked `pop` is not left sleeping next to a
            // non-empty queue. With no waiter this is a no-op.
            self.not_empty.signal();
        }

        /// Acquire the queue's mutex for use with `drain`.
        pub fn lock(self: *Self) void {
            self.mutex.lock();
        }

        /// Release the mutex acquired with `lock`.
        pub fn unlock(self: *Self) void {
            self.mutex.unlock();
        }

        /// Used to efficiently drain the queue while the lock is externally
        /// held. Returns the next item, or null once the queue is empty.
        pub fn drain(self: *Self) ?T {
            if (self.isEmptyLH()) return null;
            // Signals `not_full` so producers blocked on a full queue are
            // released once space has been made.
            return self.popAndSignalLH();
        }

        /// Lock must be held. True when no items are stored.
        fn isEmptyLH(self: *const Self) bool {
            return self.write_index == self.read_index;
        }

        /// Lock must be held. True when `buf.len` items are stored, i.e.
        /// the write index is exactly one buffer length ahead of the read
        /// index in the `2 * buf.len` index space.
        fn isFullLH(self: *const Self) bool {
            return self.mask2(self.write_index + self.buf.len) ==
                self.read_index;
        }

        /// Returns `true` if the queue is empty and `false` otherwise.
        pub fn isEmpty(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isEmptyLH();
        }

        /// Returns `true` if the queue is full and `false` otherwise.
        pub fn isFull(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isFullLH();
        }

        /// Returns the number of stored items. Does not take the lock.
        fn len(self: *const Self) usize {
            // Indices live in [0, 2 * buf.len); undo the wrap when the
            // write index has wrapped around and the read index has not.
            const wrap_offset: usize = if (self.write_index < self.read_index)
                2 * self.buf.len
            else
                0;
            return self.write_index + wrap_offset - self.read_index;
        }

        /// Returns `index` modulo the length of the backing slice.
        fn mask(self: *const Self, index: usize) usize {
            return index % self.buf.len;
        }

        /// Returns `index` modulo twice the length of the backing slice.
        fn mask2(self: *const Self, index: usize) usize {
            return index % (2 * self.buf.len);
        }

        /// Lock must be held and the queue must not be empty. Removes and
        /// returns the oldest item without signalling anyone.
        fn popLH(self: *Self) T {
            const result = self.buf[self.mask(self.read_index)];
            self.read_index = self.mask2(self.read_index + 1);
            return result;
        }

        /// Lock must be held and the queue must not be empty. Removes the
        /// oldest item and wakes one producer waiting for space.
        fn popAndSignalLH(self: *Self) T {
            std.debug.assert(!self.isEmptyLH());
            const result = self.popLH();
            // Signal on every removal rather than only when the queue was
            // full: with several blocked producers and consecutive pops,
            // only the first pop would see a full queue and the remaining
            // producers would sleep although slots are free.
            self.not_full.signal();
            return result;
        }

        /// Lock must be held and the queue must not be full. Stores `item`
        /// and wakes one consumer waiting for data.
        fn pushAndSignalLH(self: *Self, item: T) void {
            std.debug.assert(!self.isFullLH());
            self.buf[self.mask(self.write_index)] = item;
            self.write_index = self.mask2(self.write_index + 1);
            // Signal on every insertion rather than only when the queue
            // was empty, for the mirror-image reason given in
            // `popAndSignalLH`.
            self.not_empty.signal();
        }
    };
}
160
161const testing = std.testing;
162const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
test "Queue: simple push / pop" {
    // Two items pushed on a single thread must come back in FIFO order.
    var q: Queue(u8, 16) = .{};
    q.push(1);
    q.push(2);

    const first = q.pop();
    const second = q.pop();
    try testing.expectEqual(1, first);
    try testing.expectEqual(2, second);
}
171
172const Thread = std.Thread;
/// Thread body for "Fill, wait to push, pop once in another thread":
/// pushes 3, then pops one item and expects it to be 2.
fn testPushPop(q: *Queue(u8, 2)) !void {
    q.push(3);
    const popped = q.pop();
    try testing.expectEqual(2, popped);
}
177
test "Fill, wait to push, pop once in another thread" {
    // Fill both slots before the helper thread exists, so its push(3)
    // has no room until this thread pops.
    var q: Queue(u8, 2) = .{};
    q.push(1);
    q.push(2);

    const helper = try Thread.spawn(cfg, testPushPop, .{&q});

    // The queue is still full here, so a non-blocking push must fail.
    try testing.expectEqual(false, q.tryPush(3));

    // Popping 1 makes room for the helper's 3; the helper then pops 2.
    try testing.expectEqual(1, q.pop());
    helper.join();

    // Only the helper's 3 is left.
    try testing.expectEqual(3, q.pop());
    try testing.expectEqual(null, q.tryPop());
}
189
/// Thread body for "Try to pop, fill from another thread": pushes the
/// values 0 through 4, in order, into a queue of capacity 2.
fn testPush(q: *Queue(u8, 2)) void {
    var value: u8 = 0;
    while (value < 5) : (value += 1) {
        q.push(value);
    }
}
197
test "Try to pop, fill from another thread" {
    // The producer thread pushes 0..4; this thread pops five times and
    // expects the values in the order they were pushed.
    var q: Queue(u8, 2) = .{};
    const producer = try Thread.spawn(cfg, testPush, .{&q});

    var expected: u8 = 0;
    while (expected < 5) : (expected += 1) {
        try testing.expectEqual(expected, q.pop());
    }
    producer.join();
}
206
/// Consumer side of the "Fill, block, fill, block" test. Twice over it
/// waits for the queue to fill, signals both condition variables while
/// the queue is still full, sleeps, and only then pops one item.
fn sleepyPop(q: *Queue(u8, 2)) !void {
    // Spin until the other thread has filled the queue.
    while (!q.isFull()) {
        try Thread.yield();
    }

    // Fake a spurious wake-up; nothing about the queue has changed.
    q.not_full.signal();
    q.not_empty.signal();

    // Give the other thread a generous chance to run. A yield alone may
    // not get it scheduled, hence the additional sleep. The queue stays
    // full throughout, so its pending push has to keep waiting.
    try Thread.yield();
    Thread.sleep(std.time.ns_per_s);

    // Free one slot, releasing the blocked push.
    try testing.expectEqual(1, q.pop());

    // The queue is full again only once the other thread has pushed at
    // least one more item.
    while (!q.isFull()) {
        try Thread.yield();
    }
    // Sleep so that a second push is likely already waiting.
    Thread.sleep(std.time.ns_per_s / 2);

    // A second fake wake-up...
    q.not_full.signal();
    q.not_empty.signal();
    // ...and time for the other thread to notice it was spurious and go
    // back to waiting.
    try Thread.yield();
    Thread.sleep(std.time.ns_per_s / 2);

    // Final pop; this frees the slot the second push is waiting for.
    try testing.expectEqual(2, q.pop());
}
246
test "Fill, block, fill, block" {
    // Fill the queue, then block on a third push until the background
    // thread pops. Block again on a fourth push until the background
    // thread pops once more (after a delay), then empty the queue. This
    // test fails if the while loop in `push` is turned into an `if`.

    var q: Queue(u8, 2) = .{};
    const consumer = try Thread.spawn(cfg, sleepyPop, .{&q});
    q.push(1);
    q.push(2);

    const before_ms = std.time.milliTimestamp();
    q.push(3); // Expected to block until the consumer pops.
    const after_ms = std.time.milliTimestamp();

    // The consumer sleeps before popping; the blocked push must have
    // taken more than 900ms, which shows it really waited.
    const elapsed_ms = after_ms - before_ms;
    try testing.expect(elapsed_ms > 900);

    // Blocks again until the consumer's second pop.
    q.push(4);

    // After that push has gone through the consumer has nothing left to do.
    consumer.join();
    try testing.expectEqual(3, q.pop());
    try testing.expectEqual(4, q.pop());
}
274
/// Producer side of the "Drain, block, drain, block" test. Before each of
/// its two pushes it sleeps and signals both condition variables while
/// the queue is still empty.
fn sleepyPush(q: *Queue(u8, 1)) !void {
    // Let the other thread get as far as blocking in its pop.
    try Thread.yield();
    Thread.sleep(std.time.ns_per_s / 2);

    // Fake a spurious wake-up while the queue is empty.
    q.not_full.signal();
    q.not_empty.signal();

    try Thread.yield();
    Thread.sleep(std.time.ns_per_s / 2);

    // Provide the first item for the other thread to pop.
    q.push(1);
    // Wait until it has been taken out again.
    while (!q.isEmpty()) {
        try Thread.yield();
    }
    // Allow the other thread to block in its second pop.
    try Thread.yield();
    Thread.sleep(std.time.ns_per_s / 2);

    // Another fake wake-up.
    q.not_full.signal();
    q.not_empty.signal();

    q.push(2);
}
302
test "Drain, block, drain, block" {
    // The pop-side counterpart of fill/block/fill/block. This test
    // should fail if the `while` loop in `pop` is turned into an `if`.

    var q: Queue(u8, 1) = .{};
    const producer = try Thread.spawn(cfg, sleepyPush, .{&q});

    const first = q.pop();
    try testing.expectEqual(1, first);
    const second = q.pop();
    try testing.expectEqual(2, second);

    producer.join();
}
314
/// Thread body for the "2 readers" test: pops a single item and expects
/// it to be 1.
fn readerThread(q: *Queue(u8, 1)) !void {
    const item = q.pop();
    try testing.expectEqual(1, item);
}
318
test "2 readers" {
    // Two threads pop one item each while this thread pushes two items.
    var q: Queue(u8, 1) = .{};
    const reader_a = try Thread.spawn(cfg, readerThread, .{&q});
    const reader_b = try Thread.spawn(cfg, readerThread, .{&q});

    // Yield and sleep before pushing anything.
    try Thread.yield();
    Thread.sleep(std.time.ns_per_s / 2);

    q.push(1);
    q.push(1);

    reader_a.join();
    reader_b.join();
}
331
/// Thread body for the "2 writers" test: pushes the value 1 once.
fn writerThread(q: *Queue(u8, 1)) !void {
    const value: u8 = 1;
    q.push(value);
}
335
test "2 writers" {
    // Two threads each push a 1 into a single-slot queue while this
    // thread pops both values.
    var q: Queue(u8, 1) = .{};
    const writer_a = try Thread.spawn(cfg, writerThread, .{&q});
    const writer_b = try Thread.spawn(cfg, writerThread, .{&q});

    const first = q.pop();
    try testing.expectEqual(1, first);
    const second = q.pop();
    try testing.expectEqual(1, second);

    writer_a.join();
    writer_b.join();
}