Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

workqueue: Implement disable/enable for (delayed) work items

While (delayed) work items could be flushed and canceled, there was no way
to prevent them from being queued in the future. While this didn't lead to
functional deficiencies, it sometimes required a bit more effort from the
workqueue users to e.g. sequence shutdown steps with more care.

Workqueue is currently in the process of replacing tasklet which does
support disabling and enabling. The feature is used relatively widely to,
for example, temporarily suppress main path while a control plane operation
(reset or config change) is in progress.

To enable easy conversion of tasklet users and as it seems like an inherent
useful feature, this patch implements disabling and enabling of work items.

- A work item carries 16bit disable count in work->data while not queued.
The access to the count is synchronized by the PENDING bit like all other
parts of work->data.

- If the count is non-zero, the work item cannot be queued. Any attempt to
queue the work item fails and returns %false.

- disable_work[_sync](), enable_work(), disable_delayed_work[_sync]() and
enable_delayed_work() are added.

v3: enable_work() was using local_irq_enable() instead of
local_irq_restore() to undo IRQ-disable by work_grab_pending(). This is
awkward now and will become incorrect as enable_work() will later be
used from IRQ context too. (Lai)

v2: Lai noticed that queue_work_node() wasn't checking the disable count.
Fixed. queue_rcu_work() is updated to trigger warning if the inner work
item is disabled.

Signed-off-by: Tejun Heo <tj@kernel.org>
Reviewed-by: Lai Jiangshan <jiangshanlai@gmail.com>

+182 -13
+15 -3
include/linux/workqueue.h
··· 51 51 * data contains off-queue information when !WORK_STRUCT_PWQ. 52 52 * 53 53 * MSB 54 - * [ pool ID ] [ OFFQ flags ] [ STRUCT flags ] 55 - * 1 bit 4 or 5 bits 54 + * [ pool ID ] [ disable depth ] [ OFFQ flags ] [ STRUCT flags ] 55 + * 16 bits 1 bit 4 or 5 bits 56 56 */ 57 57 WORK_OFFQ_FLAG_SHIFT = WORK_STRUCT_FLAG_BITS, 58 58 WORK_OFFQ_CANCELING_BIT = WORK_OFFQ_FLAG_SHIFT, 59 59 WORK_OFFQ_FLAG_END, 60 60 WORK_OFFQ_FLAG_BITS = WORK_OFFQ_FLAG_END - WORK_OFFQ_FLAG_SHIFT, 61 61 62 + WORK_OFFQ_DISABLE_SHIFT = WORK_OFFQ_FLAG_SHIFT + WORK_OFFQ_FLAG_BITS, 63 + WORK_OFFQ_DISABLE_BITS = 16, 64 + 62 65 /* 63 66 * When a work item is off queue, the high bits encode off-queue flags 64 67 * and the last pool it was on. Cap pool ID to 31 bits and use the 65 68 * highest number to indicate that no pool is associated. 66 69 */ 67 - WORK_OFFQ_POOL_SHIFT = WORK_OFFQ_FLAG_SHIFT + WORK_OFFQ_FLAG_BITS, 70 + WORK_OFFQ_POOL_SHIFT = WORK_OFFQ_DISABLE_SHIFT + WORK_OFFQ_DISABLE_BITS, 68 71 WORK_OFFQ_LEFT = BITS_PER_LONG - WORK_OFFQ_POOL_SHIFT, 69 72 WORK_OFFQ_POOL_BITS = WORK_OFFQ_LEFT <= 31 ? WORK_OFFQ_LEFT : 31, 70 73 }; ··· 101 98 /* Convenience constants - of type 'unsigned long', not 'enum'! */ 102 99 #define WORK_OFFQ_CANCELING (1ul << WORK_OFFQ_CANCELING_BIT) 103 100 #define WORK_OFFQ_FLAG_MASK (((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT) 101 + #define WORK_OFFQ_DISABLE_MASK (((1ul << WORK_OFFQ_DISABLE_BITS) - 1) << WORK_OFFQ_DISABLE_SHIFT) 104 102 #define WORK_OFFQ_POOL_NONE ((1ul << WORK_OFFQ_POOL_BITS) - 1) 105 103 #define WORK_STRUCT_NO_POOL (WORK_OFFQ_POOL_NONE << WORK_OFFQ_POOL_SHIFT) 106 104 #define WORK_STRUCT_PWQ_MASK (~((1ul << WORK_STRUCT_PWQ_SHIFT) - 1)) ··· 563 559 extern bool flush_delayed_work(struct delayed_work *dwork); 564 560 extern bool cancel_delayed_work(struct delayed_work *dwork); 565 561 extern bool cancel_delayed_work_sync(struct delayed_work *dwork); 562 + 563 + extern bool disable_work(struct work_struct *work); 564 + extern bool disable_work_sync(struct work_struct *work); 565 + extern bool enable_work(struct work_struct *work); 566 + 567 + extern bool disable_delayed_work(struct delayed_work *dwork); 568 + extern bool disable_delayed_work_sync(struct delayed_work *dwork); 569 + extern bool enable_delayed_work(struct delayed_work *dwork); 566 570 567 571 extern bool flush_rcu_work(struct rcu_work *rwork); 568 572
+167 -10
kernel/workqueue.c
··· 99 99 100 100 enum work_cancel_flags { 101 101 WORK_CANCEL_DELAYED = 1 << 0, /* canceling a delayed_work */ 102 + WORK_CANCEL_DISABLE = 1 << 1, /* canceling to disable */ 102 103 }; 103 104 104 105 enum wq_internal_consts { ··· 395 394 396 395 struct work_offq_data { 397 396 u32 pool_id; 397 + u32 disable; 398 398 u32 flags; 399 399 }; 400 400 ··· 910 908 911 909 offqd->pool_id = shift_and_mask(data, WORK_OFFQ_POOL_SHIFT, 912 910 WORK_OFFQ_POOL_BITS); 911 + offqd->disable = shift_and_mask(data, WORK_OFFQ_DISABLE_SHIFT, 912 + WORK_OFFQ_DISABLE_BITS); 913 913 offqd->flags = data & WORK_OFFQ_FLAG_MASK; 914 914 } 915 915 916 916 static unsigned long work_offqd_pack_flags(struct work_offq_data *offqd) 917 917 { 918 - return (unsigned long)offqd->flags; 918 + return ((unsigned long)offqd->disable << WORK_OFFQ_DISABLE_SHIFT) | 919 + ((unsigned long)offqd->flags); 919 920 } 920 921 921 922 static bool work_is_canceling(struct work_struct *work) ··· 2413 2408 rcu_read_unlock(); 2414 2409 } 2415 2410 2411 + static bool clear_pending_if_disabled(struct work_struct *work) 2412 + { 2413 + unsigned long data = *work_data_bits(work); 2414 + struct work_offq_data offqd; 2415 + 2416 + if (likely((data & WORK_STRUCT_PWQ) || 2417 + !(data & WORK_OFFQ_DISABLE_MASK))) 2418 + return false; 2419 + 2420 + work_offqd_unpack(&offqd, data); 2421 + set_work_pool_and_clear_pending(work, offqd.pool_id, 2422 + work_offqd_pack_flags(&offqd)); 2423 + return true; 2424 + } 2425 + 2416 2426 /** 2417 2427 * queue_work_on - queue work on specific cpu 2418 2428 * @cpu: CPU number to execute work on ··· 2450 2430 2451 2431 local_irq_save(irq_flags); 2452 2432 2453 - if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { 2433 + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) && 2434 + !clear_pending_if_disabled(work)) { 2454 2435 __queue_work(cpu, wq, work); 2455 2436 ret = true; 2456 2437 } ··· 2529 2508 2530 2509 local_irq_save(irq_flags); 2531 2510 2532 - if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { 2511 + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) && 2512 + !clear_pending_if_disabled(work)) { 2533 2513 int cpu = select_numa_node_cpu(node); 2534 2514 2535 2515 __queue_work(cpu, wq, work); ··· 2612 2590 /* read the comment in __queue_work() */ 2613 2591 local_irq_save(irq_flags); 2614 2592 2615 - if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { 2593 + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) && 2594 + !clear_pending_if_disabled(work)) { 2616 2595 __queue_delayed_work(cpu, wq, dwork, delay); 2617 2596 ret = true; 2618 2597 } ··· 2686 2663 { 2687 2664 struct work_struct *work = &rwork->work; 2688 2665 2689 - if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { 2666 + /* 2667 + * rcu_work can't be canceled or disabled. Warn if the user reached 2668 + * inside @rwork and disabled the inner work. 2669 + */ 2670 + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) && 2671 + !WARN_ON_ONCE(clear_pending_if_disabled(work))) { 2690 2672 rwork->wq = wq; 2691 2673 call_rcu_hurry(&rwork->rcu, rcu_work_rcufn); 2692 2674 return true; ··· 4296 4268 } 4297 4269 EXPORT_SYMBOL(flush_rcu_work); 4298 4270 4271 + static void work_offqd_disable(struct work_offq_data *offqd) 4272 + { 4273 + const unsigned long max = (1lu << WORK_OFFQ_DISABLE_BITS) - 1; 4274 + 4275 + if (likely(offqd->disable < max)) 4276 + offqd->disable++; 4277 + else 4278 + WARN_ONCE(true, "workqueue: work disable count overflowed\n"); 4279 + } 4280 + 4281 + static void work_offqd_enable(struct work_offq_data *offqd) 4282 + { 4283 + if (likely(offqd->disable > 0)) 4284 + offqd->disable--; 4285 + else 4286 + WARN_ONCE(true, "workqueue: work disable count underflowed\n"); 4287 + } 4288 + 4299 4289 static bool __cancel_work(struct work_struct *work, u32 cflags) 4300 4290 { 4301 4291 struct work_offq_data offqd; 4302 4292 unsigned long irq_flags; 4303 4293 int ret; 4304 4294 4305 - do { 4306 - ret = try_to_grab_pending(work, cflags, &irq_flags); 4307 - } while (unlikely(ret == -EAGAIN)); 4295 + if (cflags & WORK_CANCEL_DISABLE) { 4296 + ret = work_grab_pending(work, cflags, &irq_flags); 4297 + } else { 4298 + do { 4299 + ret = try_to_grab_pending(work, cflags, &irq_flags); 4300 + } while (unlikely(ret == -EAGAIN)); 4308 4301 4309 - if (unlikely(ret < 0)) 4310 - return false; 4302 + if (unlikely(ret < 0)) 4303 + return false; 4304 + } 4311 4305 4312 4306 work_offqd_unpack(&offqd, *work_data_bits(work)); 4307 + 4308 + if (cflags & WORK_CANCEL_DISABLE) 4309 + work_offqd_disable(&offqd); 4310 + 4313 4311 set_work_pool_and_clear_pending(work, offqd.pool_id, 4314 4312 work_offqd_pack_flags(&offqd)); 4315 4313 local_irq_restore(irq_flags); ··· 4352 4298 ret = work_grab_pending(work, cflags, &irq_flags); 4353 4299 4354 4300 work_offqd_unpack(&offqd, *work_data_bits(work)); 4301 + 4302 + if (cflags & WORK_CANCEL_DISABLE) 4303 + work_offqd_disable(&offqd); 4304 + 4355 4305 offqd.flags |= WORK_OFFQ_CANCELING; 4356 4306 set_work_pool_and_keep_pending(work, offqd.pool_id, 4357 4307 work_offqd_pack_flags(&offqd)); ··· 4454 4396 return __cancel_work_sync(&dwork->work, WORK_CANCEL_DELAYED); 4455 4397 } 4456 4398 EXPORT_SYMBOL(cancel_delayed_work_sync); 4399 + 4400 + /** 4401 + * disable_work - Disable and cancel a work item 4402 + * @work: work item to disable 4403 + * 4404 + * Disable @work by incrementing its disable count and cancel it if currently 4405 + * pending. As long as the disable count is non-zero, any attempt to queue @work 4406 + * will fail and return %false. The maximum supported disable depth is 2 to the 4407 + * power of %WORK_OFFQ_DISABLE_BITS, currently 65536. 4408 + * 4409 + * Must be called from a sleepable context. Returns %true if @work was pending, 4410 + * %false otherwise. 4411 + */ 4412 + bool disable_work(struct work_struct *work) 4413 + { 4414 + return __cancel_work(work, WORK_CANCEL_DISABLE); 4415 + } 4416 + EXPORT_SYMBOL_GPL(disable_work); 4417 + 4418 + /** 4419 + * disable_work_sync - Disable, cancel and drain a work item 4420 + * @work: work item to disable 4421 + * 4422 + * Similar to disable_work() but also wait for @work to finish if currently 4423 + * executing. 4424 + * 4425 + * Must be called from a sleepable context. Returns %true if @work was pending, 4426 + * %false otherwise. 4427 + */ 4428 + bool disable_work_sync(struct work_struct *work) 4429 + { 4430 + return __cancel_work_sync(work, WORK_CANCEL_DISABLE); 4431 + } 4432 + EXPORT_SYMBOL_GPL(disable_work_sync); 4433 + 4434 + /** 4435 + * enable_work - Enable a work item 4436 + * @work: work item to enable 4437 + * 4438 + * Undo disable_work[_sync]() by decrementing @work's disable count. @work can 4439 + * only be queued if its disable count is 0. 4440 + * 4441 + * Must be called from a sleepable context. Returns %true if the disable count 4442 + * reached 0. Otherwise, %false. 4443 + */ 4444 + bool enable_work(struct work_struct *work) 4445 + { 4446 + struct work_offq_data offqd; 4447 + unsigned long irq_flags; 4448 + 4449 + work_grab_pending(work, 0, &irq_flags); 4450 + 4451 + work_offqd_unpack(&offqd, *work_data_bits(work)); 4452 + work_offqd_enable(&offqd); 4453 + set_work_pool_and_clear_pending(work, offqd.pool_id, 4454 + work_offqd_pack_flags(&offqd)); 4455 + local_irq_restore(irq_flags); 4456 + 4457 + return !offqd.disable; 4458 + } 4459 + EXPORT_SYMBOL_GPL(enable_work); 4460 + 4461 + /** 4462 + * disable_delayed_work - Disable and cancel a delayed work item 4463 + * @dwork: delayed work item to disable 4464 + * 4465 + * disable_work() for delayed work items. 4466 + */ 4467 + bool disable_delayed_work(struct delayed_work *dwork) 4468 + { 4469 + return __cancel_work(&dwork->work, 4470 + WORK_CANCEL_DELAYED | WORK_CANCEL_DISABLE); 4471 + } 4472 + EXPORT_SYMBOL_GPL(disable_delayed_work); 4473 + 4474 + /** 4475 + * disable_delayed_work_sync - Disable, cancel and drain a delayed work item 4476 + * @dwork: delayed work item to disable 4477 + * 4478 + * disable_work_sync() for delayed work items. 4479 + */ 4480 + bool disable_delayed_work_sync(struct delayed_work *dwork) 4481 + { 4482 + return __cancel_work_sync(&dwork->work, 4483 + WORK_CANCEL_DELAYED | WORK_CANCEL_DISABLE); 4484 + } 4485 + EXPORT_SYMBOL_GPL(disable_delayed_work_sync); 4486 + 4487 + /** 4488 + * enable_delayed_work - Enable a delayed work item 4489 + * @dwork: delayed work item to enable 4490 + * 4491 + * enable_work() for delayed work items. 4492 + */ 4493 + bool enable_delayed_work(struct delayed_work *dwork) 4494 + { 4495 + return enable_work(&dwork->work); 4496 + } 4497 + EXPORT_SYMBOL_GPL(enable_delayed_work); 4457 4498 4458 4499 /** 4459 4500 * schedule_on_each_cpu - execute a function synchronously on each online CPU