this repo has no description
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 01605eebf65be21caa85ee9d840047f864553308 325 lines 10 kB view raw
1const std = @import("std"); 2const assert = std.debug.assert; 3const atomic = std.atomic; 4const Condition = std.Thread.Condition; 5 6/// Thread safe. Fixed size. Blocking push and pop. 7pub fn Queue( 8 comptime T: type, 9 comptime size: usize, 10) type { 11 return struct { 12 buf: [size]T = undefined, 13 14 read_index: usize = 0, 15 write_index: usize = 0, 16 17 mutex: std.Thread.Mutex = .{}, 18 // blocks when the buffer is full 19 not_full: Condition = .{}, 20 // ...or empty 21 not_empty: Condition = .{}, 22 23 const Self = @This(); 24 25 /// Pop an item from the queue. Blocks until an item is available. 26 pub fn pop(self: *Self) T { 27 self.mutex.lock(); 28 defer self.mutex.unlock(); 29 while (self.isEmptyLH()) { 30 self.not_empty.wait(&self.mutex); 31 } 32 std.debug.assert(!self.isEmptyLH()); 33 if (self.isFullLH()) { 34 // If we are full, wake up a push that might be 35 // waiting here. 36 self.not_full.signal(); 37 } 38 39 const result = self.buf[self.mask(self.read_index)]; 40 self.read_index = self.mask2(self.read_index + 1); 41 return result; 42 } 43 44 /// Push an item into the queue. Blocks until an item has been 45 /// put in the queue. 46 pub fn push(self: *Self, item: T) void { 47 self.mutex.lock(); 48 defer self.mutex.unlock(); 49 while (self.isFullLH()) { 50 self.not_full.wait(&self.mutex); 51 } 52 if (self.isEmptyLH()) { 53 // If we were empty, wake up a pop if it was waiting. 54 self.not_empty.signal(); 55 } 56 std.debug.assert(!self.isFullLH()); 57 58 self.buf[self.mask(self.write_index)] = item; 59 self.write_index = self.mask2(self.write_index + 1); 60 } 61 62 /// Push an item into the queue. Returns true when the item 63 /// was successfully placed in the queue, false if the queue 64 /// was full. 65 pub fn tryPush(self: *Self, item: T) bool { 66 self.mutex.lock(); 67 if (self.isFullLH()) { 68 self.mutex.unlock(); 69 return false; 70 } 71 self.mutex.unlock(); 72 self.push(item); 73 return true; 74 } 75 76 /// Pop an item from the queue. Returns null when no item is 77 /// available. 78 pub fn tryPop(self: *Self) ?T { 79 self.mutex.lock(); 80 if (self.isEmptyLH()) { 81 self.mutex.unlock(); 82 return null; 83 } 84 self.mutex.unlock(); 85 return self.pop(); 86 } 87 88 /// Poll the queue. This call blocks until events are in the queue 89 pub fn poll(self: *Self) void { 90 self.mutex.lock(); 91 defer self.mutex.unlock(); 92 while (self.isEmptyLH()) { 93 self.not_empty.wait(&self.mutex); 94 } 95 std.debug.assert(!self.isEmptyLH()); 96 } 97 98 fn isEmptyLH(self: Self) bool { 99 return self.write_index == self.read_index; 100 } 101 102 fn isFullLH(self: Self) bool { 103 return self.mask2(self.write_index + self.buf.len) == 104 self.read_index; 105 } 106 107 /// Returns `true` if the queue is empty and `false` otherwise. 108 pub fn isEmpty(self: *Self) bool { 109 self.mutex.lock(); 110 defer self.mutex.unlock(); 111 return self.isEmptyLH(); 112 } 113 114 /// Returns `true` if the queue is full and `false` otherwise. 115 pub fn isFull(self: *Self) bool { 116 self.mutex.lock(); 117 defer self.mutex.unlock(); 118 return self.isFullLH(); 119 } 120 121 /// Returns the length 122 fn len(self: Self) usize { 123 const wrap_offset = 2 * self.buf.len * 124 @intFromBool(self.write_index < self.read_index); 125 const adjusted_write_index = self.write_index + wrap_offset; 126 return adjusted_write_index - self.read_index; 127 } 128 129 /// Returns `index` modulo the length of the backing slice. 130 fn mask(self: Self, index: usize) usize { 131 return index % self.buf.len; 132 } 133 134 /// Returns `index` modulo twice the length of the backing slice. 135 fn mask2(self: Self, index: usize) usize { 136 return index % (2 * self.buf.len); 137 } 138 }; 139} 140 141const testing = std.testing; 142const cfg = Thread.SpawnConfig{ .allocator = testing.allocator }; 143test "Queue: simple push / pop" { 144 var queue: Queue(u8, 16) = .{}; 145 queue.push(1); 146 queue.push(2); 147 const pop = queue.pop(); 148 try testing.expectEqual(1, pop); 149 try testing.expectEqual(2, queue.pop()); 150} 151 152const Thread = std.Thread; 153fn testPushPop(q: *Queue(u8, 2)) !void { 154 q.push(3); 155 try testing.expectEqual(2, q.pop()); 156} 157 158test "Fill, wait to push, pop once in another thread" { 159 var queue: Queue(u8, 2) = .{}; 160 queue.push(1); 161 queue.push(2); 162 const t = try Thread.spawn(cfg, testPushPop, .{&queue}); 163 try testing.expectEqual(false, queue.tryPush(3)); 164 try testing.expectEqual(1, queue.pop()); 165 t.join(); 166 try testing.expectEqual(3, queue.pop()); 167 try testing.expectEqual(null, queue.tryPop()); 168} 169 170fn testPush(q: *Queue(u8, 2)) void { 171 q.push(0); 172 q.push(1); 173 q.push(2); 174 q.push(3); 175 q.push(4); 176} 177 178test "Try to pop, fill from another thread" { 179 var queue: Queue(u8, 2) = .{}; 180 const thread = try Thread.spawn(cfg, testPush, .{&queue}); 181 for (0..5) |idx| { 182 try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop()); 183 } 184 thread.join(); 185} 186 187fn sleepyPop(q: *Queue(u8, 2)) !void { 188 // First we wait for the queue to be full. 189 while (!q.isFull()) 190 try Thread.yield(); 191 192 // Then we spuriously wake it up, because that's a thing that can 193 // happen. 194 q.not_full.signal(); 195 q.not_empty.signal(); 196 197 // Then give the other thread a good chance of waking up. It's not 198 // clear that yield guarantees the other thread will be scheduled, 199 // so we'll throw a sleep in here just to be sure. The queue is 200 // still full and the push in the other thread is still blocked 201 // waiting for space. 202 try Thread.yield(); 203 std.time.sleep(std.time.ns_per_s); 204 // Finally, let that other thread go. 205 try std.testing.expectEqual(1, q.pop()); 206 207 // This won't continue until the other thread has had a chance to 208 // put at least one item in the queue. 209 while (!q.isFull()) 210 try Thread.yield(); 211 // But we want to ensure that there's a second push waiting, so 212 // here's another sleep. 213 std.time.sleep(std.time.ns_per_s / 2); 214 215 // Another spurious wake... 216 q.not_full.signal(); 217 q.not_empty.signal(); 218 // And another chance for the other thread to see that it's 219 // spurious and go back to sleep. 220 try Thread.yield(); 221 std.time.sleep(std.time.ns_per_s / 2); 222 223 // Pop that thing and we're done. 224 try std.testing.expectEqual(2, q.pop()); 225} 226 227test "Fill, block, fill, block" { 228 // Fill the queue, block while trying to write another item, have 229 // a background thread unblock us, then block while trying to 230 // write yet another thing. Have the background thread unblock 231 // that too (after some time) then drain the queue. This test 232 // fails if the while loop in `push` is turned into an `if`. 233 234 var queue: Queue(u8, 2) = .{}; 235 const thread = try Thread.spawn(cfg, sleepyPop, .{&queue}); 236 queue.push(1); 237 queue.push(2); 238 const now = std.time.milliTimestamp(); 239 queue.push(3); // This one should block. 240 const then = std.time.milliTimestamp(); 241 242 // Just to make sure the sleeps are yielding to this thread, make 243 // sure it took at least 900ms to do the push. 244 try std.testing.expect(then - now > 900); 245 246 // This should block again, waiting for the other thread. 247 queue.push(4); 248 249 // And once that push has gone through, the other thread's done. 250 thread.join(); 251 try std.testing.expectEqual(3, queue.pop()); 252 try std.testing.expectEqual(4, queue.pop()); 253} 254 255fn sleepyPush(q: *Queue(u8, 1)) !void { 256 // Try to ensure the other thread has already started trying to pop. 257 try Thread.yield(); 258 std.time.sleep(std.time.ns_per_s / 2); 259 260 // Spurious wake 261 q.not_full.signal(); 262 q.not_empty.signal(); 263 264 try Thread.yield(); 265 std.time.sleep(std.time.ns_per_s / 2); 266 267 // Stick something in the queue so it can be popped. 268 q.push(1); 269 // Ensure it's been popped. 270 while (!q.isEmpty()) 271 try Thread.yield(); 272 // Give the other thread time to block again. 273 try Thread.yield(); 274 std.time.sleep(std.time.ns_per_s / 2); 275 276 // Spurious wake 277 q.not_full.signal(); 278 q.not_empty.signal(); 279 280 q.push(2); 281} 282 283test "Drain, block, drain, block" { 284 // This is like fill/block/fill/block, but on the pop end. This 285 // test should fail if the `while` loop in `pop` is turned into an 286 // `if`. 287 288 var queue: Queue(u8, 1) = .{}; 289 const thread = try Thread.spawn(cfg, sleepyPush, .{&queue}); 290 try std.testing.expectEqual(1, queue.pop()); 291 try std.testing.expectEqual(2, queue.pop()); 292 thread.join(); 293} 294 295fn readerThread(q: *Queue(u8, 1)) !void { 296 try testing.expectEqual(1, q.pop()); 297} 298 299test "2 readers" { 300 // 2 threads read, one thread writes 301 var queue: Queue(u8, 1) = .{}; 302 const t1 = try Thread.spawn(cfg, readerThread, .{&queue}); 303 const t2 = try Thread.spawn(cfg, readerThread, .{&queue}); 304 try Thread.yield(); 305 std.time.sleep(std.time.ns_per_s / 2); 306 queue.push(1); 307 queue.push(1); 308 t1.join(); 309 t2.join(); 310} 311 312fn writerThread(q: *Queue(u8, 1)) !void { 313 q.push(1); 314} 315 316test "2 writers" { 317 var queue: Queue(u8, 1) = .{}; 318 const t1 = try Thread.spawn(cfg, writerThread, .{&queue}); 319 const t2 = try Thread.spawn(cfg, writerThread, .{&queue}); 320 321 try testing.expectEqual(1, queue.pop()); 322 try testing.expectEqual(1, queue.pop()); 323 t1.join(); 324 t2.join(); 325}