this repo has no description
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 8a9c2d5e1b3778f1ea43c9bd5d325cfa72016584 347 lines 11 kB view raw
1const std = @import("std"); 2const assert = std.debug.assert; 3const atomic = std.atomic; 4const Condition = std.Thread.Condition; 5 6/// Thread safe. Fixed size. Blocking push and pop. 7pub fn Queue( 8 comptime T: type, 9 comptime size: usize, 10) type { 11 return struct { 12 buf: [size]T = undefined, 13 14 read_index: usize = 0, 15 write_index: usize = 0, 16 17 mutex: std.Thread.Mutex = .{}, 18 // blocks when the buffer is full 19 not_full: Condition = .{}, 20 // ...or empty 21 not_empty: Condition = .{}, 22 23 const Self = @This(); 24 25 /// Pop an item from the queue. Blocks until an item is available. 26 pub fn pop(self: *Self) T { 27 self.mutex.lock(); 28 defer self.mutex.unlock(); 29 while (self.isEmptyLH()) { 30 self.not_empty.wait(&self.mutex); 31 } 32 std.debug.assert(!self.isEmptyLH()); 33 return self.popAndSignalLH(); 34 } 35 36 /// Push an item into the queue. Blocks until an item has been 37 /// put in the queue. 38 pub fn push(self: *Self, item: T) void { 39 self.mutex.lock(); 40 defer self.mutex.unlock(); 41 while (self.isFullLH()) { 42 self.not_full.wait(&self.mutex); 43 } 44 std.debug.assert(!self.isFullLH()); 45 self.pushAndSignalLH(item); 46 } 47 48 /// Push an item into the queue. Returns true when the item 49 /// was successfully placed in the queue, false if the queue 50 /// was full. 51 pub fn tryPush(self: *Self, item: T) bool { 52 self.mutex.lock(); 53 defer self.mutex.unlock(); 54 if (self.isFullLH()) return false; 55 self.pushAndSignalLH(item); 56 return true; 57 } 58 59 /// Pop an item from the queue. Returns null when no item is 60 /// available. 61 pub fn tryPop(self: *Self) ?T { 62 self.mutex.lock(); 63 defer self.mutex.unlock(); 64 if (self.isEmptyLH()) return null; 65 return self.popAndSignalLH(); 66 } 67 68 /// Poll the queue. This call blocks until events are in the queue 69 pub fn poll(self: *Self) void { 70 self.mutex.lock(); 71 defer self.mutex.unlock(); 72 while (self.isEmptyLH()) { 73 self.not_empty.wait(&self.mutex); 74 } 75 std.debug.assert(!self.isEmptyLH()); 76 } 77 78 pub fn lock(self: *Self) void { 79 self.mutex.lock(); 80 } 81 82 pub fn unlock(self: *Self) void { 83 self.mutex.unlock(); 84 } 85 86 /// Used to efficiently drain the queue while the lock is externally held 87 pub fn drain(self: *Self) ?T { 88 if (self.isEmptyLH()) return null; 89 return self.popLH(); 90 } 91 92 fn isEmptyLH(self: Self) bool { 93 return self.write_index == self.read_index; 94 } 95 96 fn isFullLH(self: Self) bool { 97 return self.mask2(self.write_index + self.buf.len) == 98 self.read_index; 99 } 100 101 /// Returns `true` if the queue is empty and `false` otherwise. 102 pub fn isEmpty(self: *Self) bool { 103 self.mutex.lock(); 104 defer self.mutex.unlock(); 105 return self.isEmptyLH(); 106 } 107 108 /// Returns `true` if the queue is full and `false` otherwise. 109 pub fn isFull(self: *Self) bool { 110 self.mutex.lock(); 111 defer self.mutex.unlock(); 112 return self.isFullLH(); 113 } 114 115 /// Returns the length 116 fn len(self: Self) usize { 117 const wrap_offset = 2 * self.buf.len * 118 @intFromBool(self.write_index < self.read_index); 119 const adjusted_write_index = self.write_index + wrap_offset; 120 return adjusted_write_index - self.read_index; 121 } 122 123 /// Returns `index` modulo the length of the backing slice. 124 fn mask(self: Self, index: usize) usize { 125 return index % self.buf.len; 126 } 127 128 /// Returns `index` modulo twice the length of the backing slice. 129 fn mask2(self: Self, index: usize) usize { 130 return index % (2 * self.buf.len); 131 } 132 133 fn pushAndSignalLH(self: *Self, item: T) void { 134 const was_empty = self.isEmptyLH(); 135 self.buf[self.mask(self.write_index)] = item; 136 self.write_index = self.mask2(self.write_index + 1); 137 if (was_empty) { 138 self.not_empty.signal(); 139 } 140 } 141 142 fn popAndSignalLH(self: *Self) T { 143 const was_full = self.isFullLH(); 144 const result = self.popLH(); 145 if (was_full) { 146 self.not_full.signal(); 147 } 148 return result; 149 } 150 151 fn popLH(self: *Self) T { 152 const result = self.buf[self.mask(self.read_index)]; 153 self.read_index = self.mask2(self.read_index + 1); 154 return result; 155 } 156 }; 157} 158 159const testing = std.testing; 160const cfg = Thread.SpawnConfig{ .allocator = testing.allocator }; 161test "Queue: simple push / pop" { 162 var queue: Queue(u8, 16) = .{}; 163 queue.push(1); 164 queue.push(2); 165 const pop = queue.pop(); 166 try testing.expectEqual(1, pop); 167 try testing.expectEqual(2, queue.pop()); 168} 169 170const Thread = std.Thread; 171fn testPushPop(q: *Queue(u8, 2)) !void { 172 q.push(3); 173 try testing.expectEqual(2, q.pop()); 174} 175 176test "Fill, wait to push, pop once in another thread" { 177 var queue: Queue(u8, 2) = .{}; 178 queue.push(1); 179 queue.push(2); 180 const t = try Thread.spawn(cfg, testPushPop, .{&queue}); 181 try testing.expectEqual(false, queue.tryPush(3)); 182 try testing.expectEqual(1, queue.pop()); 183 t.join(); 184 try testing.expectEqual(3, queue.pop()); 185 try testing.expectEqual(null, queue.tryPop()); 186} 187 188fn testPush(q: *Queue(u8, 2)) void { 189 q.push(0); 190 q.push(1); 191 q.push(2); 192 q.push(3); 193 q.push(4); 194} 195 196test "Try to pop, fill from another thread" { 197 var queue: Queue(u8, 2) = .{}; 198 const thread = try Thread.spawn(cfg, testPush, .{&queue}); 199 for (0..5) |idx| { 200 try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop()); 201 } 202 thread.join(); 203} 204 205fn sleepyPop(q: *Queue(u8, 2), state: *atomic.Value(u8)) !void { 206 // First we wait for the queue to be full. 207 while (state.load(.acquire) < 1) 208 try Thread.yield(); 209 210 // Then we spuriously wake it up, because that's a thing that can 211 // happen. 212 q.not_full.signal(); 213 q.not_empty.signal(); 214 215 // Then give the other thread a good chance of waking up. It's not 216 // clear that yield guarantees the other thread will be scheduled, 217 // so we'll throw a sleep in here just to be sure. The queue is 218 // still full and the push in the other thread is still blocked 219 // waiting for space. 220 try Thread.yield(); 221 std.Thread.sleep(10 * std.time.ns_per_ms); 222 // Finally, let that other thread go. 223 try std.testing.expectEqual(1, q.pop()); 224 225 // Wait for the other thread to signal it's ready for second push 226 while (state.load(.acquire) < 2) 227 try Thread.yield(); 228 // But we want to ensure that there's a second push waiting, so 229 // here's another sleep. 230 std.Thread.sleep(10 * std.time.ns_per_ms); 231 232 // Another spurious wake... 233 q.not_full.signal(); 234 q.not_empty.signal(); 235 // And another chance for the other thread to see that it's 236 // spurious and go back to sleep. 237 try Thread.yield(); 238 std.Thread.sleep(10 * std.time.ns_per_ms); 239 240 // Pop that thing and we're done. 241 try std.testing.expectEqual(2, q.pop()); 242} 243 244test "Fill, block, fill, block" { 245 // Fill the queue, block while trying to write another item, have 246 // a background thread unblock us, then block while trying to 247 // write yet another thing. Have the background thread unblock 248 // that too (after some time) then drain the queue. This test 249 // fails if the while loop in `push` is turned into an `if`. 250 251 var queue: Queue(u8, 2) = .{}; 252 var state = atomic.Value(u8).init(0); 253 const thread = try Thread.spawn(cfg, sleepyPop, .{ &queue, &state }); 254 queue.push(1); 255 queue.push(2); 256 state.store(1, .release); 257 const now = std.time.milliTimestamp(); 258 queue.push(3); // This one should block. 259 const then = std.time.milliTimestamp(); 260 261 // Just to make sure the sleeps are yielding to this thread, make 262 // sure it took at least 5ms to do the push. 263 try std.testing.expect(then - now > 5); 264 265 state.store(2, .release); 266 // This should block again, waiting for the other thread. 267 queue.push(4); 268 269 // And once that push has gone through, the other thread's done. 270 thread.join(); 271 try std.testing.expectEqual(3, queue.pop()); 272 try std.testing.expectEqual(4, queue.pop()); 273} 274 275fn sleepyPush(q: *Queue(u8, 1), state: *atomic.Value(u8)) !void { 276 // Try to ensure the other thread has already started trying to pop. 277 try Thread.yield(); 278 std.Thread.sleep(10 * std.time.ns_per_ms); 279 280 // Spurious wake 281 q.not_full.signal(); 282 q.not_empty.signal(); 283 284 try Thread.yield(); 285 std.Thread.sleep(10 * std.time.ns_per_ms); 286 287 // Stick something in the queue so it can be popped. 288 q.push(1); 289 // Ensure it's been popped. 290 while (state.load(.acquire) < 1) 291 try Thread.yield(); 292 // Give the other thread time to block again. 293 try Thread.yield(); 294 std.Thread.sleep(10 * std.time.ns_per_ms); 295 296 // Spurious wake 297 q.not_full.signal(); 298 q.not_empty.signal(); 299 300 q.push(2); 301} 302 303test "Drain, block, drain, block" { 304 // This is like fill/block/fill/block, but on the pop end. This 305 // test should fail if the `while` loop in `pop` is turned into an 306 // `if`. 307 308 var queue: Queue(u8, 1) = .{}; 309 var state = atomic.Value(u8).init(0); 310 const thread = try Thread.spawn(cfg, sleepyPush, .{ &queue, &state }); 311 try std.testing.expectEqual(1, queue.pop()); 312 state.store(1, .release); 313 try std.testing.expectEqual(2, queue.pop()); 314 thread.join(); 315} 316 317fn readerThread(q: *Queue(u8, 1)) !void { 318 try testing.expectEqual(1, q.pop()); 319} 320 321test "2 readers" { 322 // 2 threads read, one thread writes 323 var queue: Queue(u8, 1) = .{}; 324 const t1 = try Thread.spawn(cfg, readerThread, .{&queue}); 325 const t2 = try Thread.spawn(cfg, readerThread, .{&queue}); 326 try Thread.yield(); 327 std.Thread.sleep(10 * std.time.ns_per_ms); 328 queue.push(1); 329 queue.push(1); 330 t1.join(); 331 t2.join(); 332} 333 334fn writerThread(q: *Queue(u8, 1)) !void { 335 q.push(1); 336} 337 338test "2 writers" { 339 var queue: Queue(u8, 1) = .{}; 340 const t1 = try Thread.spawn(cfg, writerThread, .{&queue}); 341 const t2 = try Thread.spawn(cfg, writerThread, .{&queue}); 342 343 try testing.expectEqual(1, queue.pop()); 344 try testing.expectEqual(1, queue.pop()); 345 t1.join(); 346 t2.join(); 347}