Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

rxrpc: Fix CPU time starvation in I/O thread

Starvation can happen in the rxrpc I/O thread because it goes back to the
top of the I/O loop after it does any one thing without trying to give any
other connection or call CPU time. Also, because it processes one call
packet at a time, it tries to do the retransmission loop after each ACK
without checking to see if there are other ACKs already in the queue that
can update the SACK state.

Fix this by:

(1) Add a received-packet queue on each call.

(2) Distribute packets from the master Rx queue to the individual call,
conn and error queues and 'poking' calls to add them to the attend
queue first thing in the I/O thread.

(3) Go through all the attention-seeking connections and calls before
going back to the top of the I/O thread. Each queue is extracted as a
whole and then gone through so that new additions to insert themselves
into the queue.

(4) Make the call event handler go through all the packets currently on
the call's rx_queue before transmitting and retransmitting DATA
packets.

(5) Drop the skb argument from the call event handler as this is now
replaced with the rx_queue. Instead, keep track of whether we
received a packet or an ACK for the tests that used to rely on that.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Link: https://patch.msgid.link/20241204074710.990092-14-dhowells@redhat.com
Signed-off-by: Jakub Kicinski <kuba@kernel.org>

authored by

David Howells and committed by
Jakub Kicinski
9e3cccd1 149d002b

+98 -77
+3
include/trace/events/rxrpc.h
··· 120 120 EM(rxrpc_call_poke_conn_abort, "Conn-abort") \ 121 121 EM(rxrpc_call_poke_error, "Error") \ 122 122 EM(rxrpc_call_poke_idle, "Idle") \ 123 + EM(rxrpc_call_poke_rx_packet, "Rx-packet") \ 123 124 EM(rxrpc_call_poke_set_timeout, "Set-timo") \ 124 125 EM(rxrpc_call_poke_start, "Start") \ 125 126 EM(rxrpc_call_poke_timer, "Timer") \ ··· 129 128 #define rxrpc_skb_traces \ 130 129 EM(rxrpc_skb_eaten_by_unshare, "ETN unshare ") \ 131 130 EM(rxrpc_skb_eaten_by_unshare_nomem, "ETN unshar-nm") \ 131 + EM(rxrpc_skb_get_call_rx, "GET call-rx ") \ 132 132 EM(rxrpc_skb_get_conn_secured, "GET conn-secd") \ 133 133 EM(rxrpc_skb_get_conn_work, "GET conn-work") \ 134 134 EM(rxrpc_skb_get_last_nack, "GET last-nack") \ ··· 141 139 EM(rxrpc_skb_new_error_report, "NEW error-rpt") \ 142 140 EM(rxrpc_skb_new_jumbo_subpacket, "NEW jumbo-sub") \ 143 141 EM(rxrpc_skb_new_unshared, "NEW unshared ") \ 142 + EM(rxrpc_skb_put_call_rx, "PUT call-rx ") \ 144 143 EM(rxrpc_skb_put_conn_secured, "PUT conn-secd") \ 145 144 EM(rxrpc_skb_put_conn_work, "PUT conn-work") \ 146 145 EM(rxrpc_skb_put_error_report, "PUT error-rep") \
+9 -1
net/rxrpc/ar-internal.h
··· 705 705 706 706 /* Received data tracking */ 707 707 struct sk_buff_head recvmsg_queue; /* Queue of packets ready for recvmsg() */ 708 + struct sk_buff_head rx_queue; /* Queue of packets for this call to receive */ 708 709 struct sk_buff_head rx_oos_queue; /* Queue of out of sequence packets */ 709 710 710 711 rxrpc_seq_t rx_highest_seq; /* Higest sequence number received */ ··· 907 906 void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *); 908 907 void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb); 909 908 910 - bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb); 909 + bool rxrpc_input_call_event(struct rxrpc_call *call); 911 910 912 911 /* 913 912 * call_object.c ··· 1351 1350 static inline bool after_eq(u32 seq1, u32 seq2) 1352 1351 { 1353 1352 return (s32)(seq1 - seq2) >= 0; 1353 + } 1354 + 1355 + static inline void rxrpc_queue_rx_call_packet(struct rxrpc_call *call, struct sk_buff *skb) 1356 + { 1357 + rxrpc_get_skb(skb, rxrpc_skb_get_call_rx); 1358 + __skb_queue_tail(&call->rx_queue, skb); 1359 + rxrpc_poke_call(call, rxrpc_call_poke_rx_packet); 1354 1360 } 1355 1361 1356 1362 /*
+1 -1
net/rxrpc/call_accept.c
··· 408 408 } 409 409 410 410 _leave(" = %p{%d}", call, call->debug_id); 411 - rxrpc_input_call_event(call, skb); 411 + rxrpc_queue_rx_call_packet(call, skb); 412 412 rxrpc_put_call(call, rxrpc_call_put_input); 413 413 return true; 414 414
+19 -15
net/rxrpc/call_event.c
··· 324 324 /* 325 325 * Handle retransmission and deferred ACK/abort generation. 326 326 */ 327 - bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb) 327 + bool rxrpc_input_call_event(struct rxrpc_call *call) 328 328 { 329 + struct sk_buff *skb; 329 330 ktime_t now, t; 330 - bool resend = false; 331 + bool resend = false, did_receive = false, saw_ack = false; 331 332 s32 abort_code; 332 333 333 334 rxrpc_see_call(call, rxrpc_call_see_input); ··· 338 337 call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)], 339 338 call->events); 340 339 341 - if (__rxrpc_call_is_complete(call)) 342 - goto out; 343 - 344 340 /* Handle abort request locklessly, vs rxrpc_propose_abort(). */ 345 341 abort_code = smp_load_acquire(&call->send_abort); 346 342 if (abort_code) { ··· 346 348 goto out; 347 349 } 348 350 349 - if (skb && skb->mark == RXRPC_SKB_MARK_ERROR) 350 - goto out; 351 + while ((skb = __skb_dequeue(&call->rx_queue))) { 352 + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 351 353 352 - if (skb) 354 + if (__rxrpc_call_is_complete(call) || 355 + skb->mark == RXRPC_SKB_MARK_ERROR) { 356 + rxrpc_free_skb(skb, rxrpc_skb_put_call_rx); 357 + goto out; 358 + } 359 + 360 + saw_ack |= sp->hdr.type == RXRPC_PACKET_TYPE_ACK; 361 + 353 362 rxrpc_input_call_packet(call, skb); 363 + rxrpc_free_skb(skb, rxrpc_skb_put_call_rx); 364 + did_receive = true; 365 + } 354 366 355 367 /* If we see our async-event poke, check for timeout trippage. */ 356 368 now = ktime_get_real(); ··· 426 418 rxrpc_propose_ack_ping_for_keepalive); 427 419 } 428 420 429 - if (skb) { 430 - struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 431 - 432 - if (sp->hdr.type == RXRPC_PACKET_TYPE_ACK) 433 - rxrpc_congestion_degrade(call); 434 - } 421 + if (saw_ack) 422 + rxrpc_congestion_degrade(call); 435 423 436 424 if (test_and_clear_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events)) 437 425 rxrpc_send_initial_ping(call); ··· 498 494 if (call->security) 499 495 call->security->free_call_crypto(call); 500 496 } else { 501 - if (skb && 497 + if (did_receive && 502 498 call->peer->ackr_adv_pmtud && 503 499 call->peer->pmtud_pending) 504 500 rxrpc_send_probe_for_pmtud(call);
+2
net/rxrpc/call_object.c
··· 148 148 INIT_LIST_HEAD(&call->attend_link); 149 149 INIT_LIST_HEAD(&call->tx_sendmsg); 150 150 INIT_LIST_HEAD(&call->tx_buffer); 151 + skb_queue_head_init(&call->rx_queue); 151 152 skb_queue_head_init(&call->recvmsg_queue); 152 153 skb_queue_head_init(&call->rx_oos_queue); 153 154 init_waitqueue_head(&call->waitq); ··· 537 536 static void rxrpc_cleanup_ring(struct rxrpc_call *call) 538 537 { 539 538 rxrpc_purge_queue(&call->recvmsg_queue); 539 + rxrpc_purge_queue(&call->rx_queue); 540 540 rxrpc_purge_queue(&call->rx_oos_queue); 541 541 } 542 542
+7 -5
net/rxrpc/conn_client.c
··· 508 508 void rxrpc_connect_client_calls(struct rxrpc_local *local) 509 509 { 510 510 struct rxrpc_call *call; 511 + LIST_HEAD(new_client_calls); 511 512 512 - while ((call = list_first_entry_or_null(&local->new_client_calls, 513 - struct rxrpc_call, wait_link)) 514 - ) { 513 + spin_lock(&local->client_call_lock); 514 + list_splice_tail_init(&local->new_client_calls, &new_client_calls); 515 + spin_unlock(&local->client_call_lock); 516 + 517 + while ((call = list_first_entry_or_null(&new_client_calls, 518 + struct rxrpc_call, wait_link))) { 515 519 struct rxrpc_bundle *bundle = call->bundle; 516 520 517 - spin_lock(&local->client_call_lock); 518 521 list_move_tail(&call->wait_link, &bundle->waiting_calls); 519 522 rxrpc_see_call(call, rxrpc_call_see_waiting_call); 520 - spin_unlock(&local->client_call_lock); 521 523 522 524 if (rxrpc_bundle_has_space(bundle)) 523 525 rxrpc_activate_channels(bundle);
+1 -1
net/rxrpc/input.c
··· 1124 1124 break; 1125 1125 } 1126 1126 1127 - rxrpc_input_call_event(call, skb); 1127 + rxrpc_input_call_event(call); 1128 1128 }
+55 -53
net/rxrpc/io_thread.c
··· 338 338 struct rxrpc_channel *chan; 339 339 struct rxrpc_call *call = NULL; 340 340 unsigned int channel; 341 - bool ret; 342 341 343 342 if (sp->hdr.securityIndex != conn->security_ix) 344 343 return rxrpc_direct_abort(skb, rxrpc_eproto_wrong_security, ··· 424 425 peer_srx, skb); 425 426 } 426 427 427 - ret = rxrpc_input_call_event(call, skb); 428 + rxrpc_queue_rx_call_packet(call, skb); 428 429 rxrpc_put_call(call, rxrpc_call_put_input); 429 - return ret; 430 + return true; 430 431 } 431 432 432 433 /* ··· 443 444 ktime_t now; 444 445 #endif 445 446 bool should_stop; 447 + LIST_HEAD(conn_attend_q); 448 + LIST_HEAD(call_attend_q); 446 449 447 450 complete(&local->io_thread_ready); 448 451 ··· 455 454 for (;;) { 456 455 rxrpc_inc_stat(local->rxnet, stat_io_loop); 457 456 458 - /* Deal with connections that want immediate attention. */ 459 - conn = list_first_entry_or_null(&local->conn_attend_q, 460 - struct rxrpc_connection, 461 - attend_link); 462 - if (conn) { 463 - spin_lock_bh(&local->lock); 464 - list_del_init(&conn->attend_link); 465 - spin_unlock_bh(&local->lock); 457 + /* Inject a delay into packets if requested. */ 458 + #ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY 459 + now = ktime_get_real(); 460 + while ((skb = skb_peek(&local->rx_delay_queue))) { 461 + if (ktime_before(now, skb->tstamp)) 462 + break; 463 + skb = skb_dequeue(&local->rx_delay_queue); 464 + skb_queue_tail(&local->rx_queue, skb); 465 + } 466 + #endif 466 467 467 - rxrpc_input_conn_event(conn, NULL); 468 - rxrpc_put_connection(conn, rxrpc_conn_put_poke); 469 - continue; 468 + if (!skb_queue_empty(&local->rx_queue)) { 469 + spin_lock_irq(&local->rx_queue.lock); 470 + skb_queue_splice_tail_init(&local->rx_queue, &rx_queue); 471 + spin_unlock_irq(&local->rx_queue.lock); 470 472 } 471 473 472 - if (test_and_clear_bit(RXRPC_CLIENT_CONN_REAP_TIMER, 473 - &local->client_conn_flags)) 474 - rxrpc_discard_expired_client_conns(local); 475 - 476 - /* Deal with calls that want immediate attention. */ 477 - if ((call = list_first_entry_or_null(&local->call_attend_q, 478 - struct rxrpc_call, 479 - attend_link))) { 480 - spin_lock_bh(&local->lock); 481 - list_del_init(&call->attend_link); 482 - spin_unlock_bh(&local->lock); 483 - 484 - trace_rxrpc_call_poked(call); 485 - rxrpc_input_call_event(call, NULL); 486 - rxrpc_put_call(call, rxrpc_call_put_poke); 487 - continue; 488 - } 489 - 490 - if (!list_empty(&local->new_client_calls)) 491 - rxrpc_connect_client_calls(local); 492 - 493 - /* Process received packets and errors. */ 494 - if ((skb = __skb_dequeue(&rx_queue))) { 474 + /* Distribute packets and errors. */ 475 + while ((skb = __skb_dequeue(&rx_queue))) { 495 476 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 496 477 switch (skb->mark) { 497 478 case RXRPC_SKB_MARK_PACKET: ··· 497 514 rxrpc_free_skb(skb, rxrpc_skb_put_unknown); 498 515 break; 499 516 } 500 - continue; 501 517 } 502 518 503 - /* Inject a delay into packets if requested. */ 504 - #ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY 505 - now = ktime_get_real(); 506 - while ((skb = skb_peek(&local->rx_delay_queue))) { 507 - if (ktime_before(now, skb->tstamp)) 508 - break; 509 - skb = skb_dequeue(&local->rx_delay_queue); 510 - skb_queue_tail(&local->rx_queue, skb); 511 - } 512 - #endif 519 + /* Deal with connections that want immediate attention. */ 520 + spin_lock_bh(&local->lock); 521 + list_splice_tail_init(&local->conn_attend_q, &conn_attend_q); 522 + spin_unlock_bh(&local->lock); 513 523 514 - if (!skb_queue_empty(&local->rx_queue)) { 515 - spin_lock_irq(&local->rx_queue.lock); 516 - skb_queue_splice_tail_init(&local->rx_queue, &rx_queue); 517 - spin_unlock_irq(&local->rx_queue.lock); 518 - continue; 524 + while ((conn = list_first_entry_or_null(&conn_attend_q, 525 + struct rxrpc_connection, 526 + attend_link))) { 527 + spin_lock_bh(&local->lock); 528 + list_del_init(&conn->attend_link); 529 + spin_unlock_bh(&local->lock); 530 + rxrpc_input_conn_event(conn, NULL); 531 + rxrpc_put_connection(conn, rxrpc_conn_put_poke); 519 532 } 533 + 534 + if (test_and_clear_bit(RXRPC_CLIENT_CONN_REAP_TIMER, 535 + &local->client_conn_flags)) 536 + rxrpc_discard_expired_client_conns(local); 537 + 538 + /* Deal with calls that want immediate attention. */ 539 + spin_lock_bh(&local->lock); 540 + list_splice_tail_init(&local->call_attend_q, &call_attend_q); 541 + spin_unlock_bh(&local->lock); 542 + 543 + while ((call = list_first_entry_or_null(&call_attend_q, 544 + struct rxrpc_call, 545 + attend_link))) { 546 + spin_lock_bh(&local->lock); 547 + list_del_init(&call->attend_link); 548 + spin_unlock_bh(&local->lock); 549 + trace_rxrpc_call_poked(call); 550 + rxrpc_input_call_event(call); 551 + rxrpc_put_call(call, rxrpc_call_put_poke); 552 + } 553 + 554 + if (!list_empty(&local->new_client_calls)) 555 + rxrpc_connect_client_calls(local); 520 556 521 557 set_current_state(TASK_INTERRUPTIBLE); 522 558 should_stop = kthread_should_stop();
+1 -1
net/rxrpc/peer_event.c
··· 224 224 225 225 rxrpc_see_call(call, rxrpc_call_see_distribute_error); 226 226 rxrpc_set_call_completion(call, compl, 0, -err); 227 - rxrpc_input_call_event(call, skb); 227 + rxrpc_input_call_event(call); 228 228 229 229 spin_lock(&peer->lock); 230 230 }