this repo has no description
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 376 lines 12 kB view raw
const std = @import("std");
const assert = std.debug.assert;
const atomic = std.atomic;

/// Thread safe. Fixed size. Blocking push and pop.
///
/// The queue is a ring buffer holding up to `size` items of type `T`.
/// `read_index` and `write_index` are kept modulo `2 * size` so that an
/// empty queue (`write_index == read_index`) can be told apart from a
/// full one (`write_index` is exactly `size` ahead of `read_index`).
///
/// Helpers whose name ends in `LH` are called with `mutex` already held
/// by the caller.
///
/// `size` must be greater than zero.
pub fn Queue(
    comptime T: type,
    comptime size: usize,
) type {
    // A zero-sized queue would make `mask` / `mask2` take a modulo by zero.
    if (size == 0) @compileError("Queue size must be greater than zero");

    return struct {
        buf: [size]T = undefined,

        read_index: usize = 0,
        write_index: usize = 0,

        io: std.Io,
        mutex: std.Io.Mutex = .init,
        // blocks when the buffer is full
        not_full: std.Io.Condition = .init,
        // ...or empty
        not_empty: std.Io.Condition = .init,

        const Self = @This();

        /// Create an empty queue that uses `io` for locking and waiting.
        pub fn init(io: std.Io) Self {
            return .{ .io = io };
        }

        /// Pop an item from the queue. Blocks until an item is available.
        /// Propagates any error from locking the mutex or waiting on the
        /// condition.
        pub fn pop(self: *Self) !T {
            try self.mutex.lock(self.io);
            defer self.mutex.unlock(self.io);
            // `while`, not `if`: a wakeup does not guarantee that an item
            // is still there by the time we hold the lock again.
            while (self.isEmptyLH()) {
                try self.not_empty.wait(self.io, &self.mutex);
            }
            assert(!self.isEmptyLH());
            return self.popAndSignalLH();
        }

        /// Push an item into the queue. Blocks until an item has been
        /// put in the queue. Propagates any error from locking the mutex
        /// or waiting on the condition.
        pub fn push(self: *Self, item: T) !void {
            try self.mutex.lock(self.io);
            defer self.mutex.unlock(self.io);
            // `while`, not `if`: a wakeup does not guarantee that there is
            // still room by the time we hold the lock again.
            while (self.isFullLH()) {
                try self.not_full.wait(self.io, &self.mutex);
            }
            assert(!self.isFullLH());
            self.pushAndSignalLH(item);
        }

        /// Push an item into the queue. Returns true when the item
        /// was successfully placed in the queue, false if the queue
        /// was full.
        pub fn tryPush(self: *Self, item: T) !bool {
            try self.mutex.lock(self.io);
            defer self.mutex.unlock(self.io);
            if (self.isFullLH()) return false;
            self.pushAndSignalLH(item);
            return true;
        }

        /// Pop an item from the queue. Returns null when no item is
        /// available.
        pub fn tryPop(self: *Self) !?T {
            try self.mutex.lock(self.io);
            defer self.mutex.unlock(self.io);
            if (self.isEmptyLH()) return null;
            return self.popAndSignalLH();
        }

        /// Poll the queue. This call blocks until events are in the queue.
        /// Nothing is removed from the queue.
        pub fn poll(self: *Self) !void {
            try self.mutex.lock(self.io);
            defer self.mutex.unlock(self.io);
            while (self.isEmptyLH()) {
                try self.not_empty.wait(self.io, &self.mutex);
            }
            assert(!self.isEmptyLH());
            // We did not consume anything, so pass the wakeup along: a
            // blocked `pop` must not be starved because `poll` happened
            // to receive the signal for an item that is still queued.
            self.not_empty.signal(self.io);
        }

        /// Acquire the queue's mutex. Pair with `unlock`.
        pub fn lock(self: *Self) !void {
            try self.mutex.lock(self.io);
        }

        /// Release the queue's mutex previously acquired with `lock`.
        pub fn unlock(self: *Self) void {
            self.mutex.unlock(self.io);
        }

        /// Used to efficiently drain the queue while the lock is externally held.
        /// Returns the next item, or null once the queue is empty.
        pub fn drain(self: *Self) ?T {
            if (self.isEmptyLH()) return null;
            // Preserve queue push wakeups when draining under external
            // lock: a producer may be blocked waiting on not_full.
            return self.popAndSignalLH();
        }

        fn isEmptyLH(self: Self) bool {
            return self.write_index == self.read_index;
        }

        fn isFullLH(self: Self) bool {
            return self.mask2(self.write_index + self.buf.len) ==
                self.read_index;
        }

        /// Returns `true` if the queue is empty and `false` otherwise.
        pub fn isEmpty(self: *Self) !bool {
            try self.mutex.lock(self.io);
            defer self.mutex.unlock(self.io);
            return self.isEmptyLH();
        }

        /// Returns `true` if the queue is full and `false` otherwise.
        pub fn isFull(self: *Self) !bool {
            try self.mutex.lock(self.io);
            defer self.mutex.unlock(self.io);
            return self.isFullLH();
        }

        /// Returns the length, i.e. the distance from `read_index` to
        /// `write_index`, accounting for `write_index` having wrapped
        /// around `2 * buf.len`. Does not take the lock itself.
        fn len(self: Self) usize {
            const wrap_offset = 2 * self.buf.len *
                @intFromBool(self.write_index < self.read_index);
            const adjusted_write_index = self.write_index + wrap_offset;
            return adjusted_write_index - self.read_index;
        }

        /// Returns `index` modulo the length of the backing slice.
        fn mask(self: Self, index: usize) usize {
            return index % self.buf.len;
        }

        /// Returns `index` modulo twice the length of the backing slice.
        fn mask2(self: Self, index: usize) usize {
            return index % (2 * self.buf.len);
        }

        /// Store `item` at the write position, advance `write_index` and
        /// wake one consumer. The caller must have checked that the queue
        /// is not full.
        fn pushAndSignalLH(self: *Self, item: T) void {
            self.buf[self.mask(self.write_index)] = item;
            self.write_index = self.mask2(self.write_index + 1);
            // Signal on every push, not only on the empty -> non-empty
            // transition. With several blocked consumers, two pushes in a
            // row would otherwise wake only one of them and leave the
            // other asleep with an item available.
            self.not_empty.signal(self.io);
        }

        /// Remove the item at the read position and wake one producer.
        /// The caller must have checked that the queue is not empty.
        fn popAndSignalLH(self: *Self) T {
            const result = self.popLH();
            // Signal on every pop, not only on the full -> non-full
            // transition. With several blocked producers, two pops in a
            // row would otherwise wake only one of them and leave the
            // other asleep with room available.
            self.not_full.signal(self.io);
            return result;
        }

        /// Remove and return the item at the read position without
        /// signalling anyone.
        fn popLH(self: *Self) T {
            const result = self.buf[self.mask(self.read_index)];
            self.read_index = self.mask2(self.read_index + 1);
            return result;
        }
    };
}

const testing = std.testing;
const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
test "Queue: simple push / pop" {
    const io = std.testing.io;
    var queue: Queue(u8, 16) = .init(io);
    try queue.push(1);
    try queue.push(2);
    const pop = try queue.pop();
    try testing.expectEqual(1, pop);
    try testing.expectEqual(2, try queue.pop());
}

const Thread = std.Thread;
fn testPushPop(q: *Queue(u8, 2)) !void {
    try q.push(3);
    try testing.expectEqual(2, try q.pop());
}

test "Fill, wait to push, pop once in another thread" {
    const io = std.testing.io;
    var queue: Queue(u8, 2) = .init(io);
    try queue.push(1);
    try queue.push(2);
    var t = try io.concurrent(testPushPop, .{&queue});
    try testing.expectEqual(false, try queue.tryPush(3));
    try testing.expectEqual(1, try queue.pop());
    try t.await(io);
    try testing.expectEqual(3, try queue.pop());
    try testing.expectEqual(null, try queue.tryPop());
}

fn testPush(q: *Queue(u8, 2)) !void {
    try q.push(0);
    try q.push(1);
    try q.push(2);
    try q.push(3);
    try q.push(4);
}

test "Try to pop, fill from another thread" {
    const io = std.testing.io;
    var queue: Queue(u8, 2) = .init(io);
    var task = try io.concurrent(testPush, .{&queue});
    defer task.cancel(io) catch {};
    for (0..5) |idx| {
        try testing.expectEqual(@as(u8, @intCast(idx)), try queue.pop());
    }
    try task.await(io);
}

fn sleepyPop(io: std.Io, q: *Queue(u8, 2), state: *atomic.Value(u8)) !void {
    // First we wait for the queue to be full.
    while (state.load(.acquire) < 1)
        try Thread.yield();

    // Then we spuriously wake it up, because that's a thing that can
    // happen.
    q.not_full.signal(io);
    q.not_empty.signal(io);

    // Then give the other thread a good chance of waking up. It's not
    // clear that yield guarantees the other thread will be scheduled,
    // so we'll throw a sleep in here just to be sure. The queue is
    // still full and the push in the other thread is still blocked
    // waiting for space.
    try Thread.yield();
    try io.sleep(.fromMilliseconds(10), .real);
    // Finally, let that other thread go.
    try std.testing.expectEqual(1, try q.pop());

    // Wait for the other thread to signal it's ready for second push
    while (state.load(.acquire) < 2)
        try Thread.yield();
    // But we want to ensure that there's a second push waiting, so
    // here's another sleep.
    try io.sleep(.fromMilliseconds(10), .real);

    // Another spurious wake...
    q.not_full.signal(io);
    q.not_empty.signal(io);
    // And another chance for the other thread to see that it's
    // spurious and go back to sleep.
    try Thread.yield();
    try io.sleep(.fromMilliseconds(10), .real);

    // Pop that thing and we're done.
    try std.testing.expectEqual(2, try q.pop());
}

test "Fill, block, fill, block" {
    const io = std.testing.io;

    // Fill the queue, block while trying to write another item, have
    // a background thread unblock us, then block while trying to
    // write yet another thing. Have the background thread unblock
    // that too (after some time) then drain the queue. This test
    // fails if the while loop in `push` is turned into an `if`.

    var queue: Queue(u8, 2) = .init(io);
    var state = atomic.Value(u8).init(0);
    var task = try io.concurrent(sleepyPop, .{ io, &queue, &state });
    try queue.push(1);
    try queue.push(2);
    state.store(1, .release);
    const now = std.Io.Timestamp.now(io, .real).toMilliseconds();
    try queue.push(3); // This one should block.
    const then = std.Io.Timestamp.now(io, .real).toMilliseconds();

    // Just to make sure the sleeps are yielding to this thread, make
    // sure it took at least 5ms to do the push.
    try std.testing.expect(then - now > 5);

    state.store(2, .release);
    // This should block again, waiting for the other thread.
    try queue.push(4);

    // And once that push has gone through, the other thread's done.
    try task.await(io);
    try std.testing.expectEqual(3, try queue.pop());
    try std.testing.expectEqual(4, try queue.pop());
}

fn sleepyPush(io: std.Io, q: *Queue(u8, 1), state: *atomic.Value(u8)) !void {
    // Try to ensure the other thread has already started trying to pop.
    try Thread.yield();
    try io.sleep(.fromMilliseconds(10), .real);

    // Spurious wake
    q.not_full.signal(io);
    q.not_empty.signal(io);

    try Thread.yield();
    try io.sleep(.fromMilliseconds(10), .real);

    // Stick something in the queue so it can be popped.
    try q.push(1);
    // Ensure it's been popped.
    while (state.load(.acquire) < 1)
        try Thread.yield();
    // Give the other thread time to block again.
    try Thread.yield();
    try io.sleep(.fromMilliseconds(10), .real);

    // Spurious wake
    q.not_full.signal(io);
    q.not_empty.signal(io);

    try q.push(2);
}

test "Drain, block, drain, block" {
    const io = std.testing.io;

    // This is like fill/block/fill/block, but on the pop end. This
    // test should fail if the `while` loop in `pop` is turned into an
    // `if`.

    var queue: Queue(u8, 1) = .init(io);
    var state = atomic.Value(u8).init(0);
    var task = try io.concurrent(sleepyPush, .{ io, &queue, &state });
    try std.testing.expectEqual(1, try queue.pop());
    state.store(1, .release);
    try std.testing.expectEqual(2, try queue.pop());
    try task.await(io);
}

fn readerThread(q: *Queue(u8, 1)) !void {
    try testing.expectEqual(1, try q.pop());
}

test "2 readers" {
    const io = std.testing.io;
    // 2 threads read, one thread writes
    var queue: Queue(u8, 1) = .init(io);
    var t1 = try io.concurrent(readerThread, .{&queue});
    defer t1.cancel(io) catch {};
    var t2 = try io.concurrent(readerThread, .{&queue});
    defer t2.cancel(io) catch {};
    try Thread.yield();
    try io.sleep(.fromMilliseconds(10), .real);
    try queue.push(1);
    try queue.push(1);
    try t1.await(io);
    try t2.await(io);
}

fn writerThread(q: *Queue(u8, 1)) !void {
    try q.push(1);
}

test "2 writers" {
    const io = std.testing.io;

    var queue: Queue(u8, 1) = .init(io);
    var t1 = try io.concurrent(writerThread, .{&queue});
    var t2 = try io.concurrent(writerThread, .{&queue});

    try testing.expectEqual(1, try queue.pop());
    try testing.expectEqual(1, try queue.pop());
    try t1.await(io);
    try t2.await(io);
}

test {
    std.testing.refAllDecls(@This());
}