this repo has no description
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

at ea0c083a24251604d46c388e42f6cf8bb3fbcb2b 349 lines 11 kB view raw
const std = @import("std");
const assert = std.debug.assert;
const atomic = std.atomic;
const Condition = std.Thread.Condition;

/// Thread safe. Fixed size. Blocking push and pop.
///
/// The queue is a ring buffer of `size` elements of type `T`, guarded by a
/// single mutex and two condition variables. `size` must be at least 1.
///
/// Functions whose name ends in `LH` ("lock held") require the caller to
/// already hold `mutex`.
pub fn Queue(
    comptime T: type,
    comptime size: usize,
) type {
    // A zero-sized queue would make every index computation a modulo by zero.
    if (size == 0) @compileError("Queue size must be at least 1");

    return struct {
        buf: [size]T = undefined,

        // Both indices run over `[0, 2 * size)` rather than `[0, size)`.
        // The extra lap lets "empty" (indices equal) be told apart from
        // "full" (indices exactly `size` apart) without wasting a slot.
        read_index: usize = 0,
        write_index: usize = 0,

        mutex: std.Thread.Mutex = .{},
        // blocks when the buffer is full
        not_full: Condition = .{},
        // ...or empty
        not_empty: Condition = .{},

        const Self = @This();

        /// Pop an item from the queue. Blocks until an item is available.
        pub fn pop(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // `while`, not `if`: condition variables may wake spuriously.
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            return self.popAndSignalLH();
        }

        /// Push an item into the queue. Blocks until an item has been
        /// put in the queue.
        pub fn push(self: *Self, item: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            // `while`, not `if`: condition variables may wake spuriously.
            while (self.isFullLH()) {
                self.not_full.wait(&self.mutex);
            }
            self.pushAndSignalLH(item);
        }

        /// Push an item into the queue. Returns true when the item
        /// was successfully placed in the queue, false if the queue
        /// was full. Never blocks waiting for space.
        pub fn tryPush(self: *Self, item: T) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            // The fullness check and the write happen in one critical
            // section, so no other thread can fill the queue in between.
            if (self.isFullLH()) return false;
            self.pushAndSignalLH(item);
            return true;
        }

        /// Pop an item from the queue. Returns null when no item is
        /// available. Never blocks waiting for an item.
        pub fn tryPop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // The emptiness check and the read happen in one critical
            // section, so no other thread can empty the queue in between.
            if (self.isEmptyLH()) return null;
            return self.popAndSignalLH();
        }

        /// Poll the queue. This call blocks until events are in the queue.
        /// Nothing is removed from the queue.
        pub fn poll(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            assert(!self.isEmptyLH());
        }

        /// Acquire the queue's mutex. Pair with `unlock`.
        pub fn lock(self: *Self) void {
            self.mutex.lock();
        }

        /// Release the queue's mutex previously acquired with `lock`.
        pub fn unlock(self: *Self) void {
            self.mutex.unlock();
        }

        /// Used to efficiently drain the queue while the lock is externally
        /// held. Returns the next item, or null once the queue is empty.
        pub fn drain(self: *Self) ?T {
            if (self.isEmptyLH()) return null;
            // Signal so that producers blocked in `push` get to run once
            // the caller releases the lock.
            return self.popAndSignalLH();
        }

        fn isEmptyLH(self: *const Self) bool {
            return self.write_index == self.read_index;
        }

        fn isFullLH(self: *const Self) bool {
            return self.mask2(self.write_index + self.buf.len) ==
                self.read_index;
        }

        /// Returns `true` if the queue is empty and `false` otherwise.
        pub fn isEmpty(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isEmptyLH();
        }

        /// Returns `true` if the queue is full and `false` otherwise.
        pub fn isFull(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isFullLH();
        }

        /// Returns the number of items currently stored. The caller must
        /// hold the lock.
        fn len(self: *const Self) usize {
            if (self.write_index >= self.read_index) {
                return self.write_index - self.read_index;
            }
            // The write index has wrapped past `2 * size`; undo the wrap.
            return self.write_index + 2 * self.buf.len - self.read_index;
        }

        /// Returns `index` modulo the length of the backing slice.
        fn mask(self: *const Self, index: usize) usize {
            return index % self.buf.len;
        }

        /// Returns `index` modulo twice the length of the backing slice.
        fn mask2(self: *const Self, index: usize) usize {
            return index % (2 * self.buf.len);
        }

        /// Removes and returns the oldest item. The queue must not be empty.
        fn popLH(self: *Self) T {
            const result = self.buf[self.mask(self.read_index)];
            self.read_index = self.mask2(self.read_index + 1);
            return result;
        }

        /// Removes the oldest item and wakes one waiting producer.
        ///
        /// The signal is sent on every pop, not only when the queue was
        /// full: `signal` wakes a single waiter, so with several blocked
        /// producers a "was full" check would leave some asleep while
        /// space is available.
        fn popAndSignalLH(self: *Self) T {
            assert(!self.isEmptyLH());
            const result = self.popLH();
            self.not_full.signal();
            return result;
        }

        /// Stores `item` and wakes one waiting consumer. The queue must not
        /// be full.
        ///
        /// The signal is sent on every push, not only when the queue was
        /// empty, for the same reason as in `popAndSignalLH`.
        fn pushAndSignalLH(self: *Self, item: T) void {
            assert(!self.isFullLH());
            self.buf[self.mask(self.write_index)] = item;
            self.write_index = self.mask2(self.write_index + 1);
            self.not_empty.signal();
        }
    };
}

const testing = std.testing;
const Thread = std.Thread;
const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };

test "Queue: simple push / pop" {
    var queue: Queue(u8, 16) = .{};
    queue.push(1);
    queue.push(2);
    const pop = queue.pop();
    try testing.expectEqual(1, pop);
    try testing.expectEqual(2, queue.pop());
}

fn testPushPop(q: *Queue(u8, 2)) !void {
    q.push(3);
    try testing.expectEqual(2, q.pop());
}

test "Fill, wait to push, pop once in another thread" {
    var queue: Queue(u8, 2) = .{};
    queue.push(1);
    queue.push(2);
    const t = try Thread.spawn(cfg, testPushPop, .{&queue});
    try testing.expectEqual(false, queue.tryPush(3));
    try testing.expectEqual(1, queue.pop());
    t.join();
    try testing.expectEqual(3, queue.pop());
    try testing.expectEqual(null, queue.tryPop());
}

fn testPush(q: *Queue(u8, 2)) void {
    q.push(0);
    q.push(1);
    q.push(2);
    q.push(3);
    q.push(4);
}

test "Try to pop, fill from another thread" {
    var queue: Queue(u8, 2) = .{};
    const thread = try Thread.spawn(cfg, testPush, .{&queue});
    for (0..5) |idx| {
        try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop());
    }
    thread.join();
}

fn sleepyPop(q: *Queue(u8, 2), state: *atomic.Value(u8)) !void {
    // First we wait for the queue to be full.
    while (state.load(.acquire) < 1)
        try Thread.yield();

    // Then we spuriously wake it up, because that's a thing that can
    // happen.
    q.not_full.signal();
    q.not_empty.signal();

    // Then give the other thread a good chance of waking up. It's not
    // clear that yield guarantees the other thread will be scheduled,
    // so we'll throw a sleep in here just to be sure. The queue is
    // still full and the push in the other thread is still blocked
    // waiting for space.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);
    // Finally, let that other thread go.
    try std.testing.expectEqual(1, q.pop());

    // Wait for the other thread to signal it's ready for second push
    while (state.load(.acquire) < 2)
        try Thread.yield();
    // But we want to ensure that there's a second push waiting, so
    // here's another sleep.
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Another spurious wake...
    q.not_full.signal();
    q.not_empty.signal();
    // And another chance for the other thread to see that it's
    // spurious and go back to sleep.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Pop that thing and we're done.
    try std.testing.expectEqual(2, q.pop());
}

test "Fill, block, fill, block" {
    // Fill the queue, block while trying to write another item, have
    // a background thread unblock us, then block while trying to
    // write yet another thing. Have the background thread unblock
    // that too (after some time) then drain the queue. This test
    // fails if the while loop in `push` is turned into an `if`.

    var queue: Queue(u8, 2) = .{};
    var state = atomic.Value(u8).init(0);
    const thread = try Thread.spawn(cfg, sleepyPop, .{ &queue, &state });
    queue.push(1);
    queue.push(2);
    state.store(1, .release);
    const now = std.time.milliTimestamp();
    queue.push(3); // This one should block.
    const then = std.time.milliTimestamp();

    // Just to make sure the sleeps are yielding to this thread, make
    // sure it took at least 5ms to do the push.
    try std.testing.expect(then - now > 5);

    state.store(2, .release);
    // This should block again, waiting for the other thread.
    queue.push(4);

    // And once that push has gone through, the other thread's done.
    thread.join();
    try std.testing.expectEqual(3, queue.pop());
    try std.testing.expectEqual(4, queue.pop());
}

fn sleepyPush(q: *Queue(u8, 1), state: *atomic.Value(u8)) !void {
    // Try to ensure the other thread has already started trying to pop.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Stick something in the queue so it can be popped.
    q.push(1);
    // Ensure it's been popped.
    while (state.load(.acquire) < 1)
        try Thread.yield();
    // Give the other thread time to block again.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    q.push(2);
}

test "Drain, block, drain, block" {
    // This is like fill/block/fill/block, but on the pop end. This
    // test should fail if the `while` loop in `pop` is turned into an
    // `if`.

    var queue: Queue(u8, 1) = .{};
    var state = atomic.Value(u8).init(0);
    const thread = try Thread.spawn(cfg, sleepyPush, .{ &queue, &state });
    try std.testing.expectEqual(1, queue.pop());
    state.store(1, .release);
    try std.testing.expectEqual(2, queue.pop());
    thread.join();
}

fn readerThread(q: *Queue(u8, 1)) !void {
    try testing.expectEqual(1, q.pop());
}

test "2 readers" {
    // 2 threads read, one thread writes
    var queue: Queue(u8, 1) = .{};
    const t1 = try Thread.spawn(cfg, readerThread, .{&queue});
    const t2 = try Thread.spawn(cfg, readerThread, .{&queue});
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);
    queue.push(1);
    queue.push(1);
    t1.join();
    t2.join();
}

fn writerThread(q: *Queue(u8, 1)) !void {
    q.push(1);
}

test "2 writers" {
    var queue: Queue(u8, 1) = .{};
    const t1 = try Thread.spawn(cfg, writerThread, .{&queue});
    const t2 = try Thread.spawn(cfg, writerThread, .{&queue});

    try testing.expectEqual(1, queue.pop());
    try testing.expectEqual(1, queue.pop());
    t1.join();
    t2.join();
}

test "tryPush / tryPop report full and empty without blocking" {
    var queue: Queue(u8, 2) = .{};
    try testing.expectEqual(@as(?u8, null), queue.tryPop());
    try testing.expect(queue.tryPush(1));
    try testing.expect(queue.tryPush(2));
    try testing.expect(!queue.tryPush(3));
    try testing.expectEqual(@as(?u8, 1), queue.tryPop());
    try testing.expectEqual(@as(?u8, 2), queue.tryPop());
    try testing.expectEqual(@as(?u8, null), queue.tryPop());
}

test "drain wakes a blocked push" {
    // Fill the queue, let another thread block in `push`, then empty the
    // queue through lock/drain/unlock only. The blocked push must complete.
    var queue: Queue(u8, 1) = .{};
    queue.push(1);
    const t = try Thread.spawn(cfg, writerThread, .{&queue});
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    queue.lock();
    const first = queue.drain();
    // The writer cannot get in while we hold the lock, so this is null.
    const second = queue.drain();
    queue.unlock();

    t.join();
    try testing.expectEqual(@as(?u8, 1), first);
    try testing.expectEqual(@as(?u8, null), second);
    try testing.expectEqual(1, queue.pop());
}