this repo has no description
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

at bf4e6ae3d396e62f1be2c1fde61dae1cbbd2aa57 327 lines 10 kB view raw
const std = @import("std");
const assert = std.debug.assert;
const atomic = std.atomic;
const Condition = std.Thread.Condition;

/// A thread-safe, fixed-capacity FIFO queue backed by a ring buffer.
///
/// `push` and `pop` block while the queue is full / empty. `tryPush` and
/// `tryPop` never wait for space or data: they report failure instead.
/// `size` is the capacity in items and must be greater than zero.
pub fn Queue(
    comptime T: type,
    comptime size: usize,
) type {
    // The index arithmetic below takes remainders modulo `size`, so a
    // zero-capacity queue is rejected here instead of dividing by zero.
    if (size == 0) @compileError("Queue size must be greater than zero");

    return struct {
        buf: [size]T = undefined,

        // Both indices live in `[0, 2 * size)`. Tracking one extra "lap"
        // lets a full queue (indices `size` apart) be told apart from an
        // empty one (indices equal) without sacrificing a slot.
        read_index: usize = 0,
        write_index: usize = 0,

        mutex: std.Thread.Mutex = .{},
        // blocks when the buffer is full
        not_full: Condition = .{},
        // ...or empty
        not_empty: Condition = .{},

        const Self = @This();

        // Naming convention: functions with an `LH` suffix ("lock held")
        // do not take the mutex themselves and must only be called while
        // the caller holds `mutex`.

        /// Pop an item from the queue. Blocks until an item is available.
        pub fn pop(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // `while`, not `if`: a waiter can be woken without the
            // condition being true (spurious wake, or another consumer
            // got there first).
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            return self.popLH();
        }

        /// Push an item into the queue. Blocks until an item has been
        /// put in the queue.
        pub fn push(self: *Self, item: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            // See `pop` for why this must be a loop.
            while (self.isFullLH()) {
                self.not_full.wait(&self.mutex);
            }
            self.pushLH(item);
        }

        /// Push an item into the queue. Returns true when the item
        /// was successfully placed in the queue, false if the queue
        /// was full. Never waits for space.
        pub fn tryPush(self: *Self, item: T) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            // The fullness check and the insertion happen under one lock
            // acquisition. Releasing the mutex in between would let
            // another producer take the last slot and turn this call into
            // a blocking push.
            if (self.isFullLH()) return false;
            self.pushLH(item);
            return true;
        }

        /// Pop an item from the queue. Returns null when no item is
        /// available. Never waits for data.
        pub fn tryPop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Check and removal share one lock acquisition for the same
            // reason as in `tryPush`.
            if (self.isEmptyLH()) return null;
            return self.popLH();
        }

        /// Poll the queue. This call blocks until events are in the queue.
        /// No item is removed.
        pub fn poll(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            assert(!self.isEmptyLH());
            // Polling does not consume an item, so it must not consume
            // the wakeup either: hand it on to a consumer that may be
            // blocked in `pop`. Waiters re-check their condition, so an
            // extra signal is harmless.
            self.not_empty.signal();
        }

        /// Removes and returns the oldest item. The caller must hold
        /// `mutex` and the queue must not be empty.
        fn popLH(self: *Self) T {
            assert(!self.isEmptyLH());
            const result = self.buf[self.mask(self.read_index)];
            self.read_index = self.mask2(self.read_index + 1);
            // Signal on every pop, not only when the queue was full. With
            // several producers blocked, two pops in a row would otherwise
            // produce a single wakeup and leave a producer asleep although
            // a slot is free.
            self.not_full.signal();
            return result;
        }

        /// Appends `item`. The caller must hold `mutex` and the queue
        /// must not be full.
        fn pushLH(self: *Self, item: T) void {
            assert(!self.isFullLH());
            self.buf[self.mask(self.write_index)] = item;
            self.write_index = self.mask2(self.write_index + 1);
            // Signal on every push, not only when the queue was empty;
            // see `popLH` for the symmetric lost-wakeup scenario.
            self.not_empty.signal();
        }

        fn isEmptyLH(self: *const Self) bool {
            return self.write_index == self.read_index;
        }

        fn isFullLH(self: *const Self) bool {
            // Full means the writer is exactly one lap (`buf.len` slots)
            // ahead of the reader.
            return self.mask2(self.write_index + self.buf.len) ==
                self.read_index;
        }

        /// Returns `true` if the queue is empty and `false` otherwise.
        pub fn isEmpty(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isEmptyLH();
        }

        /// Returns `true` if the queue is full and `false` otherwise.
        pub fn isFull(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isFullLH();
        }

        /// Returns the number of items currently stored. Does not take
        /// the mutex.
        fn len(self: *const Self) usize {
            // When the write index has wrapped past `2 * buf.len` and the
            // read index has not, lift it by one full index period so the
            // subtraction cannot underflow.
            const wrap_offset: usize = if (self.write_index < self.read_index)
                2 * self.buf.len
            else
                0;
            const adjusted_write_index = self.write_index + wrap_offset;
            return adjusted_write_index - self.read_index;
        }

        /// Returns `index` modulo the length of the backing slice.
        fn mask(self: *const Self, index: usize) usize {
            return index % self.buf.len;
        }

        /// Returns `index` modulo twice the length of the backing slice.
        fn mask2(self: *const Self, index: usize) usize {
            return index % (2 * self.buf.len);
        }
    };
}

const testing = std.testing;
const Thread = std.Thread;
const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };

test "Queue: simple push / pop" {
    var queue: Queue(u8, 16) = .{};
    queue.push(1);
    queue.push(2);
    const pop = queue.pop();
    try testing.expectEqual(1, pop);
    try testing.expectEqual(2, queue.pop());
}

test "Queue: tryPush / tryPop report full and empty" {
    var queue: Queue(u8, 2) = .{};
    try testing.expectEqual(@as(?u8, null), queue.tryPop());
    try testing.expect(queue.tryPush(1));
    try testing.expect(queue.tryPush(2));
    try testing.expect(!queue.tryPush(3));
    try testing.expectEqual(@as(?u8, 1), queue.tryPop());
    try testing.expectEqual(@as(?u8, 2), queue.tryPop());
    try testing.expectEqual(@as(?u8, null), queue.tryPop());
}

fn testPushPop(q: *Queue(u8, 2)) !void {
    q.push(3);
    try testing.expectEqual(2, q.pop());
}

test "Fill, wait to push, pop once in another thread" {
    var queue: Queue(u8, 2) = .{};
    queue.push(1);
    queue.push(2);
    const t = try Thread.spawn(cfg, testPushPop, .{&queue});
    try testing.expectEqual(false, queue.tryPush(3));
    try testing.expectEqual(1, queue.pop());
    t.join();
    try testing.expectEqual(3, queue.pop());
    try testing.expectEqual(null, queue.tryPop());
}

fn testPush(q: *Queue(u8, 2)) void {
    q.push(0);
    q.push(1);
    q.push(2);
    q.push(3);
    q.push(4);
}

test "Try to pop, fill from another thread" {
    var queue: Queue(u8, 2) = .{};
    const thread = try Thread.spawn(cfg, testPush, .{&queue});
    for (0..5) |idx| {
        try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop());
    }
    thread.join();
}

fn sleepyPop(q: *Queue(u8, 2)) !void {
    // First we wait for the queue to be full.
    while (!q.isFull())
        try Thread.yield();

    // Then we spuriously wake it up, because that's a thing that can
    // happen.
    q.not_full.signal();
    q.not_empty.signal();

    // Then give the other thread a good chance of waking up. It's not
    // clear that yield guarantees the other thread will be scheduled,
    // so we'll throw a sleep in here just to be sure. The queue is
    // still full and the push in the other thread is still blocked
    // waiting for space.
    try Thread.yield();
    std.time.sleep(std.time.ns_per_s);
    // Finally, let that other thread go.
    try std.testing.expectEqual(1, q.pop());

    // This won't continue until the other thread has had a chance to
    // put at least one item in the queue.
    while (!q.isFull())
        try Thread.yield();
    // But we want to ensure that there's a second push waiting, so
    // here's another sleep.
    std.time.sleep(std.time.ns_per_s / 2);

    // Another spurious wake...
    q.not_full.signal();
    q.not_empty.signal();
    // And another chance for the other thread to see that it's
    // spurious and go back to sleep.
    try Thread.yield();
    std.time.sleep(std.time.ns_per_s / 2);

    // Pop that thing and we're done.
    try std.testing.expectEqual(2, q.pop());
}

test "Fill, block, fill, block" {
    // Fill the queue, block while trying to write another item, have
    // a background thread unblock us, then block while trying to
    // write yet another thing. Have the background thread unblock
    // that too (after some time) then drain the queue. This test
    // fails if the while loop in `push` is turned into an `if`.

    var queue: Queue(u8, 2) = .{};
    const thread = try Thread.spawn(cfg, sleepyPop, .{&queue});
    queue.push(1);
    queue.push(2);
    const now = std.time.milliTimestamp();
    queue.push(3); // This one should block.
    const then = std.time.milliTimestamp();

    // Just to make sure the sleeps are yielding to this thread, make
    // sure it took at least 900ms to do the push.
    try std.testing.expect(then - now > 900);

    // This should block again, waiting for the other thread.
    queue.push(4);

    // And once that push has gone through, the other thread's done.
    thread.join();
    try std.testing.expectEqual(3, queue.pop());
    try std.testing.expectEqual(4, queue.pop());
}

fn sleepyPush(q: *Queue(u8, 1)) !void {
    // Try to ensure the other thread has already started trying to pop.
    try Thread.yield();
    std.time.sleep(std.time.ns_per_s / 2);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    try Thread.yield();
    std.time.sleep(std.time.ns_per_s / 2);

    // Stick something in the queue so it can be popped.
    q.push(1);
    // Ensure it's been popped.
    while (!q.isEmpty())
        try Thread.yield();
    // Give the other thread time to block again.
    try Thread.yield();
    std.time.sleep(std.time.ns_per_s / 2);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    q.push(2);
}

test "Drain, block, drain, block" {
    // This is like fill/block/fill/block, but on the pop end. This
    // test should fail if the `while` loop in `pop` is turned into an
    // `if`.

    var queue: Queue(u8, 1) = .{};
    const thread = try Thread.spawn(cfg, sleepyPush, .{&queue});
    try std.testing.expectEqual(1, queue.pop());
    try std.testing.expectEqual(2, queue.pop());
    thread.join();
}

fn readerThread(q: *Queue(u8, 1)) !void {
    try testing.expectEqual(1, q.pop());
}

test "2 readers" {
    // 2 threads read, one thread writes
    var queue: Queue(u8, 1) = .{};
    const t1 = try Thread.spawn(cfg, readerThread, .{&queue});
    const t2 = try Thread.spawn(cfg, readerThread, .{&queue});
    try Thread.yield();
    std.time.sleep(std.time.ns_per_s / 2);
    queue.push(1);
    queue.push(1);
    t1.join();
    t2.join();
}

fn writerThread(q: *Queue(u8, 1)) !void {
    q.push(1);
}

test "2 writers" {
    var queue: Queue(u8, 1) = .{};
    const t1 = try Thread.spawn(cfg, writerThread, .{&queue});
    const t2 = try Thread.spawn(cfg, writerThread, .{&queue});

    try testing.expectEqual(1, queue.pop());
    try testing.expectEqual(1, queue.pop());
    t1.join();
    t2.join();
}