Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'net-rds-rds-tcp-bug-fix-collection-subset-1-work-queue-scalability'

Allison Henderson says:

====================
net/rds: RDS-TCP bug fix collection, subset 1: Work queue scalability

This is subset 1 of the RDS-TCP bug fix collection series I posted last
October. The full series aims to correct multiple RDS-TCP bugs that
can cause dropped or out-of-sequence messages. The set was starting to
get a bit large, so I've broken it down into smaller sets to make
reviews more manageable.

In this subset, we focus on work queue scalability. Message queues
are refactored to operate in parallel across multiple connections,
which improves response times and avoids timeouts.

The entire set can be viewed in the RFC here:
https://lore.kernel.org/netdev/20251022191715.157755-1-achender@kernel.org/

Questions, comments, flames appreciated!
====================

Link: https://patch.msgid.link/20260109224843.128076-1-achender@kernel.org
Signed-off-by: Paolo Abeni <pabeni@redhat.com>

+43 -23
+1 -1
net/rds/cong.c
··· 242 242 * therefore trigger warnings. 243 243 * Defer the xmit to rds_send_worker() instead. 244 244 */ 245 - queue_delayed_work(rds_wq, &cp->cp_send_w, 0); 245 + queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0); 246 246 } 247 247 rcu_read_unlock(); 248 248 }
+23 -5
net/rds/connection.c
··· 169 169 struct rds_connection *conn, *parent = NULL; 170 170 struct hlist_head *head = rds_conn_bucket(laddr, faddr); 171 171 struct rds_transport *loop_trans; 172 + struct rds_conn_path *free_cp = NULL; 172 173 unsigned long flags; 173 174 int ret, i; 174 175 int npaths = (trans->t_mp_capable ? RDS_MPATH_WORKERS : 1); ··· 270 269 __rds_conn_path_init(conn, &conn->c_path[i], 271 270 is_outgoing); 272 271 conn->c_path[i].cp_index = i; 272 + conn->c_path[i].cp_wq = 273 + alloc_ordered_workqueue("krds_cp_wq#%lu/%d", 0, 274 + rds_conn_count, i); 275 + if (!conn->c_path[i].cp_wq) 276 + conn->c_path[i].cp_wq = rds_wq; 273 277 } 274 278 rcu_read_lock(); 275 279 if (rds_destroy_pending(conn)) ··· 283 277 ret = trans->conn_alloc(conn, GFP_ATOMIC); 284 278 if (ret) { 285 279 rcu_read_unlock(); 286 - kfree(conn->c_path); 280 + free_cp = conn->c_path; 287 281 kmem_cache_free(rds_conn_slab, conn); 288 282 conn = ERR_PTR(ret); 289 283 goto out; ··· 306 300 /* Creating passive conn */ 307 301 if (parent->c_passive) { 308 302 trans->conn_free(conn->c_path[0].cp_transport_data); 309 - kfree(conn->c_path); 303 + free_cp = conn->c_path; 310 304 kmem_cache_free(rds_conn_slab, conn); 311 305 conn = parent->c_passive; 312 306 } else { ··· 333 327 if (cp->cp_transport_data) 334 328 trans->conn_free(cp->cp_transport_data); 335 329 } 336 - kfree(conn->c_path); 330 + free_cp = conn->c_path; 337 331 kmem_cache_free(rds_conn_slab, conn); 338 332 conn = found; 339 333 } else { ··· 348 342 rcu_read_unlock(); 349 343 350 344 out: 345 + if (free_cp) { 346 + for (i = 0; i < npaths; i++) 347 + if (free_cp[i].cp_wq != rds_wq) 348 + destroy_workqueue(free_cp[i].cp_wq); 349 + kfree(free_cp); 350 + } 351 + 351 352 return conn; 352 353 } 353 354 ··· 481 468 WARN_ON(delayed_work_pending(&cp->cp_recv_w)); 482 469 WARN_ON(delayed_work_pending(&cp->cp_conn_w)); 483 470 WARN_ON(work_pending(&cp->cp_down_w)); 471 + 472 + if (cp->cp_wq != rds_wq) { 473 + destroy_workqueue(cp->cp_wq); 474 + cp->cp_wq = 
NULL; 475 + } 484 476 485 477 cp->cp_conn->c_trans->conn_free(cp->cp_transport_data); 486 478 } ··· 902 884 rcu_read_unlock(); 903 885 return; 904 886 } 905 - queue_work(rds_wq, &cp->cp_down_w); 887 + queue_work(cp->cp_wq, &cp->cp_down_w); 906 888 rcu_read_unlock(); 907 889 } 908 890 EXPORT_SYMBOL_GPL(rds_conn_path_drop); ··· 927 909 } 928 910 if (rds_conn_path_state(cp) == RDS_CONN_DOWN && 929 911 !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags)) 930 - queue_delayed_work(rds_wq, &cp->cp_conn_w, 0); 912 + queue_delayed_work(cp->cp_wq, &cp->cp_conn_w, 0); 931 913 rcu_read_unlock(); 932 914 } 933 915 EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);
+1 -1
net/rds/ib_recv.c
··· 457 457 (must_wake || 458 458 (can_wait && rds_ib_ring_low(&ic->i_recv_ring)) || 459 459 rds_ib_ring_empty(&ic->i_recv_ring))) { 460 - queue_delayed_work(rds_wq, &conn->c_recv_w, 1); 460 + queue_delayed_work(conn->c_path->cp_wq, &conn->c_recv_w, 1); 461 461 } 462 462 if (can_wait) 463 463 cond_resched();
+2 -2
net/rds/ib_send.c
··· 297 297 298 298 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) || 299 299 test_bit(0, &conn->c_map_queued)) 300 - queue_delayed_work(rds_wq, &conn->c_send_w, 0); 300 + queue_delayed_work(conn->c_path->cp_wq, &conn->c_send_w, 0); 301 301 302 302 /* We expect errors as the qp is drained during shutdown */ 303 303 if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) { ··· 419 419 420 420 atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits); 421 421 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags)) 422 - queue_delayed_work(rds_wq, &conn->c_send_w, 0); 422 + queue_delayed_work(conn->c_path->cp_wq, &conn->c_send_w, 0); 423 423 424 424 WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384); 425 425
+1
net/rds/rds.h
··· 118 118 119 119 void *cp_transport_data; 120 120 121 + struct workqueue_struct *cp_wq; 121 122 atomic_t cp_state; 122 123 unsigned long cp_send_gen; 123 124 unsigned long cp_flags;
+5 -4
net/rds/send.c
··· 458 458 if (rds_destroy_pending(cp->cp_conn)) 459 459 ret = -ENETUNREACH; 460 460 else 461 - queue_delayed_work(rds_wq, &cp->cp_send_w, 1); 461 + queue_delayed_work(cp->cp_wq, 462 + &cp->cp_send_w, 1); 462 463 rcu_read_unlock(); 463 464 } else if (raced) { 464 465 rds_stats_inc(s_send_lock_queue_raced); ··· 1381 1380 if (rds_destroy_pending(cpath->cp_conn)) 1382 1381 ret = -ENETUNREACH; 1383 1382 else 1384 - queue_delayed_work(rds_wq, &cpath->cp_send_w, 1); 1383 + queue_delayed_work(cpath->cp_wq, &cpath->cp_send_w, 1); 1385 1384 rcu_read_unlock(); 1386 1385 } 1387 1386 if (ret) ··· 1471 1470 rds_stats_inc(s_send_queued); 1472 1471 rds_stats_inc(s_send_pong); 1473 1472 1474 - /* schedule the send work on rds_wq */ 1473 + /* schedule the send work on cp_wq */ 1475 1474 rcu_read_lock(); 1476 1475 if (!rds_destroy_pending(cp->cp_conn)) 1477 - queue_delayed_work(rds_wq, &cp->cp_send_w, 1); 1476 + queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 1); 1478 1477 rcu_read_unlock(); 1479 1478 1480 1479 rds_message_put(rm);
+1 -1
net/rds/tcp_recv.c
··· 327 327 if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) { 328 328 rcu_read_lock(); 329 329 if (!rds_destroy_pending(cp->cp_conn)) 330 - queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); 330 + queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0); 331 331 rcu_read_unlock(); 332 332 } 333 333 out:
+1 -1
net/rds/tcp_send.c
··· 201 201 rcu_read_lock(); 202 202 if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf && 203 203 !rds_destroy_pending(cp->cp_conn)) 204 - queue_delayed_work(rds_wq, &cp->cp_send_w, 0); 204 + queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0); 205 205 rcu_read_unlock(); 206 206 207 207 out:
+8 -8
net/rds/threads.c
··· 89 89 set_bit(0, &cp->cp_conn->c_map_queued); 90 90 rcu_read_lock(); 91 91 if (!rds_destroy_pending(cp->cp_conn)) { 92 - queue_delayed_work(rds_wq, &cp->cp_send_w, 0); 93 - queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); 92 + queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0); 93 + queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0); 94 94 } 95 95 rcu_read_unlock(); 96 96 cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION; ··· 140 140 cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies; 141 141 rcu_read_lock(); 142 142 if (!rds_destroy_pending(cp->cp_conn)) 143 - queue_delayed_work(rds_wq, &cp->cp_conn_w, 0); 143 + queue_delayed_work(cp->cp_wq, &cp->cp_conn_w, 0); 144 144 rcu_read_unlock(); 145 145 return; 146 146 } ··· 151 151 conn, &conn->c_laddr, &conn->c_faddr); 152 152 rcu_read_lock(); 153 153 if (!rds_destroy_pending(cp->cp_conn)) 154 - queue_delayed_work(rds_wq, &cp->cp_conn_w, 154 + queue_delayed_work(cp->cp_wq, &cp->cp_conn_w, 155 155 rand % cp->cp_reconnect_jiffies); 156 156 rcu_read_unlock(); 157 157 ··· 203 203 switch (ret) { 204 204 case -EAGAIN: 205 205 rds_stats_inc(s_send_immediate_retry); 206 - queue_delayed_work(rds_wq, &cp->cp_send_w, 0); 206 + queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0); 207 207 break; 208 208 case -ENOMEM: 209 209 rds_stats_inc(s_send_delayed_retry); 210 - queue_delayed_work(rds_wq, &cp->cp_send_w, 2); 210 + queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 2); 211 211 break; 212 212 default: 213 213 break; ··· 228 228 switch (ret) { 229 229 case -EAGAIN: 230 230 rds_stats_inc(s_recv_immediate_retry); 231 - queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); 231 + queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0); 232 232 break; 233 233 case -ENOMEM: 234 234 rds_stats_inc(s_recv_delayed_retry); 235 - queue_delayed_work(rds_wq, &cp->cp_recv_w, 2); 235 + queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 2); 236 236 break; 237 237 default: 238 238 break;