Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge tag 'locking-urgent-2021-11-28' of git://git.kernel.org/pub/scm/linux/kernel/git/tip/tip

Pull locking fixes from Thomas Gleixner:
"Two regression fixes for reader writer semaphores:

- Plug a race in the lock handoff which is caused by an inconsistency
between the reader and writer paths and can lead to corruption of the
underlying counter.

- down_read_trylock() is suboptimal when the lock is contended and
multiple readers trylock concurrently. That's due to the initial
value being read non-atomically which results in at least two
compare exchange loops. Making the initial readout atomic reduces
this significantly. With 40 readers, by 11% in a benchmark which
enforces contention on mmap_sem"

* tag 'locking-urgent-2021-11-28' of git://git.kernel.org/pub/scm/linux/kernel/git/tip/tip:
locking/rwsem: Optimize down_read_trylock() under highly contended case
locking/rwsem: Make handoff bit handling more consistent

+91 -95
+91 -95
kernel/locking/rwsem.c
··· 105 105 * atomic_long_cmpxchg() will be used to obtain writer lock. 106 106 * 107 107 * There are three places where the lock handoff bit may be set or cleared. 108 - * 1) rwsem_mark_wake() for readers. 109 - * 2) rwsem_try_write_lock() for writers. 110 - * 3) Error path of rwsem_down_write_slowpath(). 108 + * 1) rwsem_mark_wake() for readers -- set, clear 109 + * 2) rwsem_try_write_lock() for writers -- set, clear 110 + * 3) rwsem_del_waiter() -- clear 111 111 * 112 112 * For all the above cases, wait_lock will be held. A writer must also 113 113 * be the first one in the wait_list to be eligible for setting the handoff ··· 334 334 struct task_struct *task; 335 335 enum rwsem_waiter_type type; 336 336 unsigned long timeout; 337 + 338 + /* Writer only, not initialized in reader */ 339 + bool handoff_set; 337 340 }; 338 341 #define rwsem_first_waiter(sem) \ 339 342 list_first_entry(&sem->wait_list, struct rwsem_waiter, list) ··· 345 342 RWSEM_WAKE_ANY, /* Wake whatever's at head of wait list */ 346 343 RWSEM_WAKE_READERS, /* Wake readers only */ 347 344 RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */ 348 - }; 349 - 350 - enum writer_wait_state { 351 - WRITER_NOT_FIRST, /* Writer is not first in wait list */ 352 - WRITER_FIRST, /* Writer is first in wait list */ 353 - WRITER_HANDOFF /* Writer is first & handoff needed */ 354 345 }; 355 346 356 347 /* ··· 362 365 */ 363 366 #define MAX_READERS_WAKEUP 0x100 364 367 368 + static inline void 369 + rwsem_add_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter) 370 + { 371 + lockdep_assert_held(&sem->wait_lock); 372 + list_add_tail(&waiter->list, &sem->wait_list); 373 + /* caller will set RWSEM_FLAG_WAITERS */ 374 + } 375 + 376 + /* 377 + * Remove a waiter from the wait_list and clear flags. 378 + * 379 + * Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full 'copy' of 380 + * this function. Modify with care. 
381 + */ 382 + static inline void 383 + rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter) 384 + { 385 + lockdep_assert_held(&sem->wait_lock); 386 + list_del(&waiter->list); 387 + if (likely(!list_empty(&sem->wait_list))) 388 + return; 389 + 390 + atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS, &sem->count); 391 + } 392 + 365 393 /* 366 394 * handle the lock release when processes blocked on it that can now run 367 395 * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must ··· 398 376 * preferably when the wait_lock is released 399 377 * - woken process blocks are discarded from the list after having task zeroed 400 378 * - writers are only marked woken if downgrading is false 379 + * 380 + * Implies rwsem_del_waiter() for all woken readers. 401 381 */ 402 382 static void rwsem_mark_wake(struct rw_semaphore *sem, 403 383 enum rwsem_wake_type wake_type, ··· 514 490 515 491 adjustment = woken * RWSEM_READER_BIAS - adjustment; 516 492 lockevent_cond_inc(rwsem_wake_reader, woken); 517 - if (list_empty(&sem->wait_list)) { 518 - /* hit end of list above */ 519 - adjustment -= RWSEM_FLAG_WAITERS; 520 - } 521 493 522 - /* 523 - * When we've woken a reader, we no longer need to force writers 524 - * to give up the lock and we can clear HANDOFF. 525 - */ 526 - if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF)) 527 - adjustment -= RWSEM_FLAG_HANDOFF; 494 + oldcount = atomic_long_read(&sem->count); 495 + if (list_empty(&sem->wait_list)) { 496 + /* 497 + * Combined with list_move_tail() above, this implies 498 + * rwsem_del_waiter(). 499 + */ 500 + adjustment -= RWSEM_FLAG_WAITERS; 501 + if (oldcount & RWSEM_FLAG_HANDOFF) 502 + adjustment -= RWSEM_FLAG_HANDOFF; 503 + } else if (woken) { 504 + /* 505 + * When we've woken a reader, we no longer need to force 506 + * writers to give up the lock and we can clear HANDOFF. 
507 + */ 508 + if (oldcount & RWSEM_FLAG_HANDOFF) 509 + adjustment -= RWSEM_FLAG_HANDOFF; 510 + } 528 511 529 512 if (adjustment) 530 513 atomic_long_add(adjustment, &sem->count); ··· 563 532 * race conditions between checking the rwsem wait list and setting the 564 533 * sem->count accordingly. 565 534 * 566 - * If wstate is WRITER_HANDOFF, it will make sure that either the handoff 567 - * bit is set or the lock is acquired with handoff bit cleared. 535 + * Implies rwsem_del_waiter() on success. 568 536 */ 569 537 static inline bool rwsem_try_write_lock(struct rw_semaphore *sem, 570 - enum writer_wait_state wstate) 538 + struct rwsem_waiter *waiter) 571 539 { 540 + bool first = rwsem_first_waiter(sem) == waiter; 572 541 long count, new; 573 542 574 543 lockdep_assert_held(&sem->wait_lock); ··· 577 546 do { 578 547 bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF); 579 548 580 - if (has_handoff && wstate == WRITER_NOT_FIRST) 581 - return false; 549 + if (has_handoff) { 550 + if (!first) 551 + return false; 552 + 553 + /* First waiter inherits a previously set handoff bit */ 554 + waiter->handoff_set = true; 555 + } 582 556 583 557 new = count; 584 558 585 559 if (count & RWSEM_LOCK_MASK) { 586 - if (has_handoff || (wstate != WRITER_HANDOFF)) 560 + if (has_handoff || (!rt_task(waiter->task) && 561 + !time_after(jiffies, waiter->timeout))) 587 562 return false; 588 563 589 564 new |= RWSEM_FLAG_HANDOFF; ··· 606 569 * We have either acquired the lock with handoff bit cleared or 607 570 * set the handoff bit. 608 571 */ 609 - if (new & RWSEM_FLAG_HANDOFF) 572 + if (new & RWSEM_FLAG_HANDOFF) { 573 + waiter->handoff_set = true; 574 + lockevent_inc(rwsem_wlock_handoff); 610 575 return false; 576 + } 611 577 578 + /* 579 + * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on 580 + * success. 
581 + */ 582 + list_del(&waiter->list); 612 583 rwsem_set_owner(sem); 613 584 return true; 614 585 } ··· 1001 956 } 1002 957 adjustment += RWSEM_FLAG_WAITERS; 1003 958 } 1004 - list_add_tail(&waiter.list, &sem->wait_list); 959 + rwsem_add_waiter(sem, &waiter); 1005 960 1006 961 /* we're now waiting on the lock, but no longer actively locking */ 1007 962 count = atomic_long_add_return(adjustment, &sem->count); ··· 1047 1002 return sem; 1048 1003 1049 1004 out_nolock: 1050 - list_del(&waiter.list); 1051 - if (list_empty(&sem->wait_list)) { 1052 - atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF, 1053 - &sem->count); 1054 - } 1005 + rwsem_del_waiter(sem, &waiter); 1055 1006 raw_spin_unlock_irq(&sem->wait_lock); 1056 1007 __set_current_state(TASK_RUNNING); 1057 1008 lockevent_inc(rwsem_rlock_fail); ··· 1061 1020 rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) 1062 1021 { 1063 1022 long count; 1064 - enum writer_wait_state wstate; 1065 1023 struct rwsem_waiter waiter; 1066 - struct rw_semaphore *ret = sem; 1067 1024 DEFINE_WAKE_Q(wake_q); 1068 1025 1069 1026 /* do optimistic spinning and steal lock if possible */ ··· 1077 1038 waiter.task = current; 1078 1039 waiter.type = RWSEM_WAITING_FOR_WRITE; 1079 1040 waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT; 1041 + waiter.handoff_set = false; 1080 1042 1081 1043 raw_spin_lock_irq(&sem->wait_lock); 1082 - 1083 - /* account for this before adding a new element to the list */ 1084 - wstate = list_empty(&sem->wait_list) ? 
WRITER_FIRST : WRITER_NOT_FIRST; 1085 - 1086 - list_add_tail(&waiter.list, &sem->wait_list); 1044 + rwsem_add_waiter(sem, &waiter); 1087 1045 1088 1046 /* we're now waiting on the lock */ 1089 - if (wstate == WRITER_NOT_FIRST) { 1047 + if (rwsem_first_waiter(sem) != &waiter) { 1090 1048 count = atomic_long_read(&sem->count); 1091 1049 1092 1050 /* ··· 1119 1083 /* wait until we successfully acquire the lock */ 1120 1084 set_current_state(state); 1121 1085 for (;;) { 1122 - if (rwsem_try_write_lock(sem, wstate)) { 1086 + if (rwsem_try_write_lock(sem, &waiter)) { 1123 1087 /* rwsem_try_write_lock() implies ACQUIRE on success */ 1124 1088 break; 1125 1089 } 1126 1090 1127 1091 raw_spin_unlock_irq(&sem->wait_lock); 1092 + 1093 + if (signal_pending_state(state, current)) 1094 + goto out_nolock; 1128 1095 1129 1096 /* 1130 1097 * After setting the handoff bit and failing to acquire ··· 1137 1098 * In this case, we attempt to acquire the lock again 1138 1099 * without sleeping. 1139 1100 */ 1140 - if (wstate == WRITER_HANDOFF) { 1101 + if (waiter.handoff_set) { 1141 1102 enum owner_state owner_state; 1142 1103 1143 1104 preempt_disable(); ··· 1148 1109 goto trylock_again; 1149 1110 } 1150 1111 1151 - /* Block until there are no active lockers. */ 1152 - for (;;) { 1153 - if (signal_pending_state(state, current)) 1154 - goto out_nolock; 1155 - 1156 - schedule(); 1157 - lockevent_inc(rwsem_sleep_writer); 1158 - set_current_state(state); 1159 - /* 1160 - * If HANDOFF bit is set, unconditionally do 1161 - * a trylock. 1162 - */ 1163 - if (wstate == WRITER_HANDOFF) 1164 - break; 1165 - 1166 - if ((wstate == WRITER_NOT_FIRST) && 1167 - (rwsem_first_waiter(sem) == &waiter)) 1168 - wstate = WRITER_FIRST; 1169 - 1170 - count = atomic_long_read(&sem->count); 1171 - if (!(count & RWSEM_LOCK_MASK)) 1172 - break; 1173 - 1174 - /* 1175 - * The setting of the handoff bit is deferred 1176 - * until rwsem_try_write_lock() is called. 
1177 - */ 1178 - if ((wstate == WRITER_FIRST) && (rt_task(current) || 1179 - time_after(jiffies, waiter.timeout))) { 1180 - wstate = WRITER_HANDOFF; 1181 - lockevent_inc(rwsem_wlock_handoff); 1182 - break; 1183 - } 1184 - } 1112 + schedule(); 1113 + lockevent_inc(rwsem_sleep_writer); 1114 + set_current_state(state); 1185 1115 trylock_again: 1186 1116 raw_spin_lock_irq(&sem->wait_lock); 1187 1117 } 1188 1118 __set_current_state(TASK_RUNNING); 1189 - list_del(&waiter.list); 1190 1119 raw_spin_unlock_irq(&sem->wait_lock); 1191 1120 lockevent_inc(rwsem_wlock); 1192 - 1193 - return ret; 1121 + return sem; 1194 1122 1195 1123 out_nolock: 1196 1124 __set_current_state(TASK_RUNNING); 1197 1125 raw_spin_lock_irq(&sem->wait_lock); 1198 - list_del(&waiter.list); 1199 - 1200 - if (unlikely(wstate == WRITER_HANDOFF)) 1201 - atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count); 1202 - 1203 - if (list_empty(&sem->wait_list)) 1204 - atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count); 1205 - else 1126 + rwsem_del_waiter(sem, &waiter); 1127 + if (!list_empty(&sem->wait_list)) 1206 1128 rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q); 1207 1129 raw_spin_unlock_irq(&sem->wait_lock); 1208 1130 wake_up_q(&wake_q); 1209 1131 lockevent_inc(rwsem_wlock_fail); 1210 - 1211 1132 return ERR_PTR(-EINTR); 1212 1133 } 1213 1134 ··· 1248 1249 1249 1250 DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem); 1250 1251 1251 - /* 1252 - * Optimize for the case when the rwsem is not locked at all. 1253 - */ 1254 - tmp = RWSEM_UNLOCKED_VALUE; 1255 - do { 1252 + tmp = atomic_long_read(&sem->count); 1253 + while (!(tmp & RWSEM_READ_FAILED_MASK)) { 1256 1254 if (atomic_long_try_cmpxchg_acquire(&sem->count, &tmp, 1257 - tmp + RWSEM_READER_BIAS)) { 1255 + tmp + RWSEM_READER_BIAS)) { 1258 1256 rwsem_set_reader_owned(sem); 1259 1257 return 1; 1260 1258 } 1261 - } while (!(tmp & RWSEM_READ_FAILED_MASK)); 1259 + } 1262 1260 return 0; 1263 1261 } 1264 1262