this repo has no description
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 3d37f043a161fd32897ad44a2dddbfaab75faa8a 355 lines 11 kB view raw
const std = @import("std");
const assert = std.debug.assert;
const atomic = std.atomic;
const Condition = std.Thread.Condition;

/// Thread safe. Fixed size. Blocking push and pop.
///
/// The queue is a ring buffer of `size` items of type `T`. The read and
/// write indices run over `[0, 2 * size)` rather than `[0, size)`, which
/// lets "empty" (`write_index == read_index`) be told apart from "full"
/// (indices exactly `size` apart) without sacrificing a slot.
///
/// Functions whose name ends in `LH` ("lock held") do not take the mutex
/// themselves; the caller must already hold it.
pub fn Queue(
    comptime T: type,
    comptime size: usize,
) type {
    // A zero-capacity queue would divide by zero in `mask`/`mask2`; reject
    // it up front with a readable message.
    if (size == 0) @compileError("Queue size must be greater than zero");

    return struct {
        buf: [size]T = undefined,

        read_index: usize = 0,
        write_index: usize = 0,

        mutex: std.Thread.Mutex = .{},
        // producers wait on this while the buffer is full
        not_full: Condition = .{},
        // consumers wait on this while the buffer is empty
        not_empty: Condition = .{},

        const Self = @This();

        /// Pop an item from the queue. Blocks until an item is available.
        pub fn pop(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Loop, not `if`: condition variables may wake spuriously, and
            // another consumer may have taken the item first.
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            assert(!self.isEmptyLH());
            return self.popAndSignalLH();
        }

        /// Push an item into the queue. Blocks until an item has been
        /// put in the queue.
        pub fn push(self: *Self, item: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Loop, not `if`: condition variables may wake spuriously, and
            // another producer may have taken the free slot first.
            while (self.isFullLH()) {
                self.not_full.wait(&self.mutex);
            }
            assert(!self.isFullLH());
            self.pushAndSignalLH(item);
        }

        /// Push an item into the queue. Returns true when the item
        /// was successfully placed in the queue, false if the queue
        /// was full. Never blocks waiting for space.
        pub fn tryPush(self: *Self, item: T) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            if (self.isFullLH()) return false;
            self.pushAndSignalLH(item);
            return true;
        }

        /// Pop an item from the queue. Returns null when no item is
        /// available. Never blocks waiting for an item.
        pub fn tryPop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();
            if (self.isEmptyLH()) return null;
            return self.popAndSignalLH();
        }

        /// Poll the queue. This call blocks until events are in the queue.
        /// Nothing is removed; the lock is released again before returning.
        pub fn poll(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            assert(!self.isEmptyLH());
        }

        /// Acquire the queue's mutex. Pair with `unlock`. Intended for use
        /// around a series of `drain` calls.
        pub fn lock(self: *Self) void {
            self.mutex.lock();
        }

        /// Release the queue's mutex previously acquired with `lock`.
        pub fn unlock(self: *Self) void {
            self.mutex.unlock();
        }

        /// Used to efficiently drain the queue while the lock is externally held.
        /// Returns the oldest item, or null when the queue is empty.
        pub fn drain(self: *Self) ?T {
            if (self.isEmptyLH()) return null;
            // Preserve queue push wakeups when draining under external
            // lock: every removed item frees a slot, so a producer blocked
            // on `not_full` must be given the chance to use it.
            return self.popAndSignalLH();
        }

        /// Lock must be held. True when there is nothing to pop.
        fn isEmptyLH(self: *const Self) bool {
            return self.write_index == self.read_index;
        }

        /// Lock must be held. True when the write index is exactly one
        /// buffer length ahead of the read index (modulo `2 * size`).
        fn isFullLH(self: *const Self) bool {
            return self.mask2(self.write_index + self.buf.len) ==
                self.read_index;
        }

        /// Returns `true` if the queue is empty and `false` otherwise.
        pub fn isEmpty(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isEmptyLH();
        }

        /// Returns `true` if the queue is full and `false` otherwise.
        pub fn isFull(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isFullLH();
        }

        /// Returns the number of items currently stored. Reads both
        /// indices without locking, so the caller must hold the lock.
        fn len(self: *const Self) usize {
            // When the write index has wrapped past `2 * size` but the read
            // index has not, add the wrap back before subtracting.
            const wrap_offset = 2 * self.buf.len *
                @intFromBool(self.write_index < self.read_index);
            const adjusted_write_index = self.write_index + wrap_offset;
            return adjusted_write_index - self.read_index;
        }

        /// Returns `index` modulo the length of the backing slice.
        fn mask(self: *const Self, index: usize) usize {
            return index % self.buf.len;
        }

        /// Returns `index` modulo twice the length of the backing slice.
        fn mask2(self: *const Self, index: usize) usize {
            return index % (2 * self.buf.len);
        }

        /// Lock must be held and the queue must not be full. Stores `item`
        /// and wakes one waiting consumer.
        fn pushAndSignalLH(self: *Self, item: T) void {
            self.buf[self.mask(self.write_index)] = item;
            self.write_index = self.mask2(self.write_index + 1);
            // Signal on every push, not only on the empty -> non-empty
            // transition. `signal` wakes at most one waiter, so if several
            // consumers are blocked and several pushes land before any of
            // them runs, signalling only on the first push would leave the
            // remaining consumers asleep with items available.
            self.not_empty.signal();
        }

        /// Lock must be held and the queue must not be empty. Removes the
        /// oldest item and wakes one waiting producer.
        fn popAndSignalLH(self: *Self) T {
            const result = self.popLH();
            // Signal on every pop, not only on the full -> non-full
            // transition, for the same reason as in `pushAndSignalLH`:
            // each freed slot may be needed by a different blocked producer.
            self.not_full.signal();
            return result;
        }

        /// Lock must be held and the queue must not be empty. Removes and
        /// returns the oldest item without signalling anyone.
        fn popLH(self: *Self) T {
            const result = self.buf[self.mask(self.read_index)];
            self.read_index = self.mask2(self.read_index + 1);
            return result;
        }
    };
}

const testing = std.testing;
const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
test "Queue: simple push / pop" {
    var queue: Queue(u8, 16) = .{};
    queue.push(1);
    queue.push(2);
    const pop = queue.pop();
    try testing.expectEqual(1, pop);
    try testing.expectEqual(2, queue.pop());
}

const Thread = std.Thread;
fn testPushPop(q: *Queue(u8, 2)) !void {
    q.push(3);
    try testing.expectEqual(2, q.pop());
}

test "Fill, wait to push, pop once in another thread" {
    var queue: Queue(u8, 2) = .{};
    queue.push(1);
    queue.push(2);
    const t = try Thread.spawn(cfg, testPushPop, .{&queue});
    try testing.expectEqual(false, queue.tryPush(3));
    try testing.expectEqual(1, queue.pop());
    t.join();
    try testing.expectEqual(3, queue.pop());
    try testing.expectEqual(null, queue.tryPop());
}

fn testPush(q: *Queue(u8, 2)) void {
    q.push(0);
    q.push(1);
    q.push(2);
    q.push(3);
    q.push(4);
}

test "Try to pop, fill from another thread" {
    var queue: Queue(u8, 2) = .{};
    const thread = try Thread.spawn(cfg, testPush, .{&queue});
    for (0..5) |idx| {
        try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop());
    }
    thread.join();
}

fn sleepyPop(q: *Queue(u8, 2), state: *atomic.Value(u8)) !void {
    // First we wait for the queue to be full.
    while (state.load(.acquire) < 1)
        try Thread.yield();

    // Then we spuriously wake it up, because that's a thing that can
    // happen.
    q.not_full.signal();
    q.not_empty.signal();

    // Then give the other thread a good chance of waking up. It's not
    // clear that yield guarantees the other thread will be scheduled,
    // so we'll throw a sleep in here just to be sure. The queue is
    // still full and the push in the other thread is still blocked
    // waiting for space.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);
    // Finally, let that other thread go.
    try std.testing.expectEqual(1, q.pop());

    // Wait for the other thread to signal it's ready for second push
    while (state.load(.acquire) < 2)
        try Thread.yield();
    // But we want to ensure that there's a second push waiting, so
    // here's another sleep.
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Another spurious wake...
    q.not_full.signal();
    q.not_empty.signal();
    // And another chance for the other thread to see that it's
    // spurious and go back to sleep.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Pop that thing and we're done.
    try std.testing.expectEqual(2, q.pop());
}

test "Fill, block, fill, block" {
    // Fill the queue, block while trying to write another item, have
    // a background thread unblock us, then block while trying to
    // write yet another thing. Have the background thread unblock
    // that too (after some time) then drain the queue. This test
    // fails if the while loop in `push` is turned into an `if`.

    var queue: Queue(u8, 2) = .{};
    var state = atomic.Value(u8).init(0);
    const thread = try Thread.spawn(cfg, sleepyPop, .{ &queue, &state });
    queue.push(1);
    queue.push(2);
    state.store(1, .release);
    const now = std.time.milliTimestamp();
    queue.push(3); // This one should block.
    const then = std.time.milliTimestamp();

    // Just to make sure the sleeps are yielding to this thread, make
    // sure it took at least 5ms to do the push.
    try std.testing.expect(then - now > 5);

    state.store(2, .release);
    // This should block again, waiting for the other thread.
    queue.push(4);

    // And once that push has gone through, the other thread's done.
    thread.join();
    try std.testing.expectEqual(3, queue.pop());
    try std.testing.expectEqual(4, queue.pop());
}

fn sleepyPush(q: *Queue(u8, 1), state: *atomic.Value(u8)) !void {
    // Try to ensure the other thread has already started trying to pop.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Stick something in the queue so it can be popped.
    q.push(1);
    // Ensure it's been popped.
    while (state.load(.acquire) < 1)
        try Thread.yield();
    // Give the other thread time to block again.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    q.push(2);
}

test "Drain, block, drain, block" {
    // This is like fill/block/fill/block, but on the pop end. This
    // test should fail if the `while` loop in `pop` is turned into an
    // `if`.

    var queue: Queue(u8, 1) = .{};
    var state = atomic.Value(u8).init(0);
    const thread = try Thread.spawn(cfg, sleepyPush, .{ &queue, &state });
    try std.testing.expectEqual(1, queue.pop());
    state.store(1, .release);
    try std.testing.expectEqual(2, queue.pop());
    thread.join();
}

fn readerThread(q: *Queue(u8, 1)) !void {
    try testing.expectEqual(1, q.pop());
}

test "2 readers" {
    // 2 threads read, one thread writes
    var queue: Queue(u8, 1) = .{};
    const t1 = try Thread.spawn(cfg, readerThread, .{&queue});
    const t2 = try Thread.spawn(cfg, readerThread, .{&queue});
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);
    queue.push(1);
    queue.push(1);
    t1.join();
    t2.join();
}

fn readerThreadWide(q: *Queue(u8, 2)) !void {
    try testing.expectEqual(1, q.pop());
}

test "2 readers, capacity 2" {
    // Regression test for lost wakeups: both readers are expected to be
    // blocked on the empty queue when the two pushes land back to back.
    // Neither push blocks (capacity is 2), so each push must wake a
    // reader on its own; signalling only on the empty -> non-empty
    // transition would strand the second reader and hang the join below.
    var queue: Queue(u8, 2) = .{};
    const t1 = try Thread.spawn(cfg, readerThreadWide, .{&queue});
    const t2 = try Thread.spawn(cfg, readerThreadWide, .{&queue});
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);
    queue.push(1);
    queue.push(1);
    t1.join();
    t2.join();
}

fn writerThread(q: *Queue(u8, 1)) !void {
    q.push(1);
}

test "2 writers" {
    var queue: Queue(u8, 1) = .{};
    const t1 = try Thread.spawn(cfg, writerThread, .{&queue});
    const t2 = try Thread.spawn(cfg, writerThread, .{&queue});

    try testing.expectEqual(1, queue.pop());
    try testing.expectEqual(1, queue.pop());
    t1.join();
    t2.join();
}