this repo has no description
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 990fe29b38e57de69733025ab580b8917c2d092f 345 lines 11 kB view raw
const std = @import("std");
const assert = std.debug.assert;
const atomic = std.atomic;
const Condition = std.Thread.Condition;

/// Thread safe. Fixed size. Blocking push and pop.
///
/// The queue is a ring buffer of `size` elements of type `T`, guarded by a
/// single mutex and two condition variables. `read_index` and `write_index`
/// are kept modulo `2 * size` (not `size`), which lets "empty"
/// (`write_index == read_index`) be told apart from "full"
/// (`write_index` is exactly `size` ahead of `read_index`) without wasting a
/// slot.
///
/// Functions whose name ends in `LH` ("lock held") must only be called while
/// `mutex` is held by the caller.
pub fn Queue(
    comptime T: type,
    comptime size: usize,
) type {
    // A zero-sized ring would make every index computation a modulo by zero.
    if (size == 0) @compileError("Queue requires a size of at least 1");

    return struct {
        buf: [size]T = undefined,

        read_index: usize = 0,
        write_index: usize = 0,

        mutex: std.Thread.Mutex = .{},
        // blocks when the buffer is full
        not_full: Condition = .{},
        // ...or empty
        not_empty: Condition = .{},

        const Self = @This();

        /// Pop an item from the queue. Blocks until an item is available.
        pub fn pop(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // `while`, not `if`: condition variables may wake spuriously, and
            // another consumer may have taken the item before we re-acquired
            // the mutex.
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            return self.popLH();
        }

        /// Push an item into the queue. Blocks until an item has been
        /// put in the queue.
        pub fn push(self: *Self, item: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            // `while`, not `if`: see the comment in `pop`.
            while (self.isFullLH()) {
                self.not_full.wait(&self.mutex);
            }
            self.pushLH(item);
        }

        /// Push an item into the queue. Returns true when the item
        /// was successfully placed in the queue, false if the queue
        /// was full. Never blocks waiting for space.
        pub fn tryPush(self: *Self, item: T) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            // The check and the write happen under the same lock so that no
            // other thread can fill the queue in between.
            if (self.isFullLH()) return false;
            self.pushLH(item);
            return true;
        }

        /// Pop an item from the queue. Returns null when no item is
        /// available. Never blocks waiting for an item.
        pub fn tryPop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // The check and the read happen under the same lock so that no
            // other thread can empty the queue in between.
            if (self.isEmptyLH()) return null;
            return self.popLH();
        }

        /// Poll the queue. This call blocks until events are in the queue
        pub fn poll(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            assert(!self.isEmptyLH());
            // We may have consumed a wake-up without consuming an item. Pass
            // it on so that another thread waiting on `not_empty` is not left
            // asleep while the queue holds data.
            self.not_empty.signal();
        }

        /// Acquire the queue's mutex. Pair with `unlock`; intended to be used
        /// around one or more calls to `drain`.
        pub fn lock(self: *Self) void {
            self.mutex.lock();
        }

        /// Release the queue's mutex previously acquired with `lock`.
        pub fn unlock(self: *Self) void {
            self.mutex.unlock();
        }

        /// Used to efficiently drain the queue while the lock is externally held
        ///
        /// Returns the next item, or null once the queue is empty. Each
        /// removed item signals `not_full`, so a thread blocked in `push` can
        /// proceed once the caller releases the lock.
        pub fn drain(self: *Self) ?T {
            if (self.isEmptyLH()) return null;
            return self.popLH();
        }

        /// Lock must be held. True when there is nothing to read.
        fn isEmptyLH(self: *const Self) bool {
            return self.write_index == self.read_index;
        }

        /// Lock must be held. True when the writer is exactly one full buffer
        /// length ahead of the reader.
        fn isFullLH(self: *const Self) bool {
            return self.mask2(self.write_index + self.buf.len) ==
                self.read_index;
        }

        /// Returns `true` if the queue is empty and `false` otherwise.
        pub fn isEmpty(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isEmptyLH();
        }

        /// Returns `true` if the queue is full and `false` otherwise.
        pub fn isFull(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isFullLH();
        }

        /// Returns the length
        ///
        /// Reads the indices without taking the mutex; the caller is expected
        /// to hold it.
        fn len(self: *const Self) usize {
            // Both indices live in [0, 2 * size). When the write index has
            // wrapped but the read index has not, add one full index period
            // back before subtracting.
            const wrap_offset = 2 * self.buf.len *
                @intFromBool(self.write_index < self.read_index);
            const adjusted_write_index = self.write_index + wrap_offset;
            return adjusted_write_index - self.read_index;
        }

        /// Returns `index` modulo the length of the backing slice.
        fn mask(self: *const Self, index: usize) usize {
            return index % self.buf.len;
        }

        /// Returns `index` modulo twice the length of the backing slice.
        fn mask2(self: *const Self, index: usize) usize {
            return index % (2 * self.buf.len);
        }

        /// Lock must be held and the queue must not be full. Stores `item`
        /// and wakes one thread waiting for data.
        fn pushLH(self: *Self, item: T) void {
            assert(!self.isFullLH());
            self.buf[self.mask(self.write_index)] = item;
            self.write_index = self.mask2(self.write_index + 1);
            // Signal unconditionally. Signalling only on the empty ->
            // non-empty transition loses wake-ups when several consumers are
            // waiting and several items arrive before the first consumer runs.
            self.not_empty.signal();
        }

        /// Lock must be held and the queue must not be empty. Removes the
        /// oldest item and wakes one thread waiting for space.
        fn popLH(self: *Self) T {
            assert(!self.isEmptyLH());
            const result = self.buf[self.mask(self.read_index)];
            self.read_index = self.mask2(self.read_index + 1);
            // Signal unconditionally. Signalling only on the full -> non-full
            // transition loses wake-ups when several producers are waiting
            // and several items are removed before the first producer runs.
            self.not_full.signal();
            return result;
        }
    };
}

const testing = std.testing;
const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
test "Queue: simple push / pop" {
    var queue: Queue(u8, 16) = .{};
    queue.push(1);
    queue.push(2);
    const pop = queue.pop();
    try testing.expectEqual(1, pop);
    try testing.expectEqual(2, queue.pop());
}

test "Queue: tryPush / tryPop never block" {
    var queue: Queue(u8, 2) = .{};
    try testing.expectEqual(@as(?u8, null), queue.tryPop());
    try testing.expect(queue.tryPush(1));
    try testing.expect(queue.tryPush(2));
    try testing.expect(!queue.tryPush(3));
    try testing.expectEqual(@as(?u8, 1), queue.tryPop());
    try testing.expect(queue.tryPush(3));
    try testing.expectEqual(@as(?u8, 2), queue.tryPop());
    try testing.expectEqual(@as(?u8, 3), queue.tryPop());
    try testing.expectEqual(@as(?u8, null), queue.tryPop());
}

const Thread = std.Thread;
fn testPushPop(q: *Queue(u8, 2)) !void {
    q.push(3);
    try testing.expectEqual(2, q.pop());
}

test "Fill, wait to push, pop once in another thread" {
    var queue: Queue(u8, 2) = .{};
    queue.push(1);
    queue.push(2);
    const t = try Thread.spawn(cfg, testPushPop, .{&queue});
    try testing.expectEqual(false, queue.tryPush(3));
    try testing.expectEqual(1, queue.pop());
    t.join();
    try testing.expectEqual(3, queue.pop());
    try testing.expectEqual(null, queue.tryPop());
}

fn testPush(q: *Queue(u8, 2)) void {
    q.push(0);
    q.push(1);
    q.push(2);
    q.push(3);
    q.push(4);
}

test "Try to pop, fill from another thread" {
    var queue: Queue(u8, 2) = .{};
    const thread = try Thread.spawn(cfg, testPush, .{&queue});
    for (0..5) |idx| {
        try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop());
    }
    thread.join();
}

fn sleepyPop(q: *Queue(u8, 2)) !void {
    // First we wait for the queue to be full.
    while (!q.isFull())
        try Thread.yield();

    // Then we spuriously wake it up, because that's a thing that can
    // happen.
    q.not_full.signal();
    q.not_empty.signal();

    // Then give the other thread a good chance of waking up. It's not
    // clear that yield guarantees the other thread will be scheduled,
    // so we'll throw a sleep in here just to be sure. The queue is
    // still full and the push in the other thread is still blocked
    // waiting for space.
    try Thread.yield();
    std.Thread.sleep(std.time.ns_per_s);
    // Finally, let that other thread go.
    try std.testing.expectEqual(1, q.pop());

    // This won't continue until the other thread has had a chance to
    // put at least one item in the queue.
    while (!q.isFull())
        try Thread.yield();
    // But we want to ensure that there's a second push waiting, so
    // here's another sleep.
    std.Thread.sleep(std.time.ns_per_s / 2);

    // Another spurious wake...
    q.not_full.signal();
    q.not_empty.signal();
    // And another chance for the other thread to see that it's
    // spurious and go back to sleep.
    try Thread.yield();
    std.Thread.sleep(std.time.ns_per_s / 2);

    // Pop that thing and we're done.
    try std.testing.expectEqual(2, q.pop());
}

test "Fill, block, fill, block" {
    // Fill the queue, block while trying to write another item, have
    // a background thread unblock us, then block while trying to
    // write yet another thing. Have the background thread unblock
    // that too (after some time) then drain the queue. This test
    // fails if the while loop in `push` is turned into an `if`.

    var queue: Queue(u8, 2) = .{};
    const thread = try Thread.spawn(cfg, sleepyPop, .{&queue});
    queue.push(1);
    queue.push(2);
    const now = std.time.milliTimestamp();
    queue.push(3); // This one should block.
    const then = std.time.milliTimestamp();

    // Just to make sure the sleeps are yielding to this thread, make
    // sure it took at least 900ms to do the push.
    try std.testing.expect(then - now > 900);

    // This should block again, waiting for the other thread.
    queue.push(4);

    // And once that push has gone through, the other thread's done.
    thread.join();
    try std.testing.expectEqual(3, queue.pop());
    try std.testing.expectEqual(4, queue.pop());
}

fn sleepyPush(q: *Queue(u8, 1)) !void {
    // Try to ensure the other thread has already started trying to pop.
    try Thread.yield();
    std.Thread.sleep(std.time.ns_per_s / 2);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    try Thread.yield();
    std.Thread.sleep(std.time.ns_per_s / 2);

    // Stick something in the queue so it can be popped.
    q.push(1);
    // Ensure it's been popped.
    while (!q.isEmpty())
        try Thread.yield();
    // Give the other thread time to block again.
    try Thread.yield();
    std.Thread.sleep(std.time.ns_per_s / 2);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    q.push(2);
}

test "Drain, block, drain, block" {
    // This is like fill/block/fill/block, but on the pop end. This
    // test should fail if the `while` loop in `pop` is turned into an
    // `if`.

    var queue: Queue(u8, 1) = .{};
    const thread = try Thread.spawn(cfg, sleepyPush, .{&queue});
    try std.testing.expectEqual(1, queue.pop());
    try std.testing.expectEqual(2, queue.pop());
    thread.join();
}

fn readerThread(q: *Queue(u8, 1)) !void {
    try testing.expectEqual(1, q.pop());
}

test "2 readers" {
    // 2 threads read, one thread writes
    var queue: Queue(u8, 1) = .{};
    const t1 = try Thread.spawn(cfg, readerThread, .{&queue});
    const t2 = try Thread.spawn(cfg, readerThread, .{&queue});
    try Thread.yield();
    std.Thread.sleep(std.time.ns_per_s / 2);
    queue.push(1);
    queue.push(1);
    t1.join();
    t2.join();
}

fn writerThread(q: *Queue(u8, 1)) !void {
    q.push(1);
}

test "2 writers" {
    var queue: Queue(u8, 1) = .{};
    const t1 = try Thread.spawn(cfg, writerThread, .{&queue});
    const t2 = try Thread.spawn(cfg, writerThread, .{&queue});

    try testing.expectEqual(1, queue.pop());
    try testing.expectEqual(1, queue.pop());
    t1.join();
    t2.join();
}

fn blockedPush(q: *Queue(u8, 1)) void {
    q.push(2);
}

test "drain wakes a blocked push" {
    // A producer blocked on a full queue must be released when the
    // consumer empties the queue through lock / drain / unlock.
    var queue: Queue(u8, 1) = .{};
    queue.push(1);
    const thread = try Thread.spawn(cfg, blockedPush, .{&queue});

    // Give the other thread a good chance of blocking inside `push`.
    try Thread.yield();
    std.Thread.sleep(std.time.ns_per_s / 10);

    const first = blk: {
        queue.lock();
        defer queue.unlock();
        break :blk queue.drain();
    };

    // Without a `not_full` signal from `drain` this join never returns.
    thread.join();
    try testing.expectEqual(@as(?u8, 1), first);
    try testing.expectEqual(@as(?u8, 2), queue.tryPop());
}