Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

rxrpc: Prepare to be able to send jumbo DATA packets

Prepare to be able to send jumbo DATA packets if we decide to, but
don't enable that yet. This will allow larger chunks of data to be sent
without reducing the retryability as the subpackets in a jumbo packet can
also be retransmitted individually.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Link: https://patch.msgid.link/20241204074710.990092-12-dhowells@redhat.com
Signed-off-by: Jakub Kicinski <kuba@kernel.org>

authored by

David Howells and committed by
Jakub Kicinski
b7313009 3d2bdf73

+137 -60
+17 -1
net/rxrpc/ar-internal.h
··· 832 832 __be16 cksum; /* Checksum to go in header */ 833 833 unsigned short ack_rwind; /* ACK receive window */ 834 834 u8 /*enum rxrpc_propose_ack_trace*/ ack_why; /* If ack, why */ 835 + bool jumboable; /* Can be non-terminal jumbo subpacket */ 835 836 u8 nr_kvec; /* Amount of kvec[] used */ 836 837 struct kvec kvec[3]; 837 838 }; ··· 860 859 if (serial == 0) 861 860 serial = 1; 862 861 conn->tx_serial = serial + 1; 862 + return serial; 863 + } 864 + 865 + /* 866 + * Allocate the next serial n numbers on a connection. 0 must be skipped. 867 + */ 868 + static inline rxrpc_serial_t rxrpc_get_next_serials(struct rxrpc_connection *conn, 869 + unsigned int n) 870 + { 871 + rxrpc_serial_t serial; 872 + 873 + serial = conn->tx_serial; 874 + if (serial + n <= n) 875 + serial = 1; 876 + conn->tx_serial = serial + n; 863 877 return serial; 864 878 } 865 879 ··· 1192 1176 void rxrpc_send_conn_abort(struct rxrpc_connection *conn); 1193 1177 void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb); 1194 1178 void rxrpc_send_keepalive(struct rxrpc_peer *); 1195 - void rxrpc_transmit_one(struct rxrpc_call *call, struct rxrpc_txbuf *txb); 1179 + void rxrpc_transmit_data(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n); 1196 1180 1197 1181 /* 1198 1182 * peer_event.c
+28 -20
net/rxrpc/call_event.c
··· 124 124 ktime_sub(resend_at, now)); 125 125 126 126 txb->flags |= RXRPC_TXBUF_RESENT; 127 - rxrpc_transmit_one(call, txb); 127 + rxrpc_transmit_data(call, txb, 1); 128 128 did_send = true; 129 129 now = ktime_get_real(); 130 130 ··· 164 164 unacked = true; 165 165 166 166 txb->flags |= RXRPC_TXBUF_RESENT; 167 - rxrpc_transmit_one(call, txb); 167 + rxrpc_transmit_data(call, txb, 1); 168 168 did_send = true; 169 169 rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans); 170 170 now = ktime_get_real(); ··· 231 231 } 232 232 } 233 233 234 - static bool rxrpc_tx_window_has_space(struct rxrpc_call *call) 234 + static unsigned int rxrpc_tx_window_space(struct rxrpc_call *call) 235 235 { 236 - unsigned int winsize = umin(call->tx_winsize, call->cong_cwnd + call->cong_extra); 237 - rxrpc_seq_t window = call->acks_hard_ack, wtop = window + winsize; 238 - rxrpc_seq_t tx_top = call->tx_top; 239 - int space; 236 + int winsize = umin(call->tx_winsize, call->cong_cwnd + call->cong_extra); 237 + int in_flight = call->tx_top - call->acks_hard_ack; 240 238 241 - space = wtop - tx_top; 242 - return space > 0; 239 + return max(winsize - in_flight, 0); 243 240 } 244 241 245 242 /* ··· 244 247 */ 245 248 static void rxrpc_decant_prepared_tx(struct rxrpc_call *call) 246 249 { 247 - struct rxrpc_txbuf *txb; 250 + int space = rxrpc_tx_window_space(call); 248 251 249 252 if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) { 250 253 if (list_empty(&call->tx_sendmsg)) ··· 252 255 rxrpc_expose_client_call(call); 253 256 } 254 257 255 - while ((txb = list_first_entry_or_null(&call->tx_sendmsg, 256 - struct rxrpc_txbuf, call_link))) { 258 + while (space > 0) { 259 + struct rxrpc_txbuf *head = NULL, *txb; 260 + int count = 0, limit = min(space, 1); 261 + 262 + if (list_empty(&call->tx_sendmsg)) 263 + break; 264 + 257 265 spin_lock(&call->tx_lock); 258 - list_del(&txb->call_link); 266 + do { 267 + txb = list_first_entry(&call->tx_sendmsg, 268 + struct rxrpc_txbuf, call_link); 269 + if (!head) 270 + 
head = txb; 271 + list_move_tail(&txb->call_link, &call->tx_buffer); 272 + count++; 273 + if (!txb->jumboable) 274 + break; 275 + } while (count < limit && !list_empty(&call->tx_sendmsg)); 276 + 259 277 spin_unlock(&call->tx_lock); 260 278 261 279 call->tx_top = txb->seq; 262 - list_add_tail(&txb->call_link, &call->tx_buffer); 263 - 264 280 if (txb->flags & RXRPC_LAST_PACKET) 265 281 rxrpc_close_tx_phase(call); 266 282 267 - rxrpc_transmit_one(call, txb); 268 - 269 - if (!rxrpc_tx_window_has_space(call)) 270 - break; 283 + space -= count; 284 + rxrpc_transmit_data(call, head, count); 271 285 } 272 286 } 273 287 ··· 293 285 294 286 case RXRPC_CALL_SERVER_SEND_REPLY: 295 287 case RXRPC_CALL_CLIENT_SEND_REQUEST: 296 - if (!rxrpc_tx_window_has_space(call)) 288 + if (!rxrpc_tx_window_space(call)) 297 289 return; 298 290 if (list_empty(&call->tx_sendmsg)) { 299 291 rxrpc_inc_stat(call->rxnet, stat_tx_data_underflow);
+22 -14
net/rxrpc/input.c
··· 693 693 { 694 694 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 695 695 struct rxrpc_peer *peer = call->peer; 696 - unsigned int max_data; 696 + unsigned int max_data, capacity; 697 697 bool wake = false; 698 - u32 rwind = ntohl(trailer->rwind); 698 + u32 max_mtu = ntohl(trailer->maxMTU); 699 + //u32 if_mtu = ntohl(trailer->ifMTU); 700 + u32 rwind = ntohl(trailer->rwind); 701 + u32 jumbo_max = ntohl(trailer->jumbo_max); 699 702 700 703 if (rwind > RXRPC_TX_MAX_WINDOW) 701 704 rwind = RXRPC_TX_MAX_WINDOW; ··· 709 706 call->tx_winsize = rwind; 710 707 } 711 708 712 - if (trailer->jumbo_max == 0) { 709 + max_mtu = clamp(max_mtu, 500, 65535); 710 + peer->ackr_max_data = max_mtu; 711 + 712 + if (max_mtu < peer->max_data) { 713 + trace_rxrpc_pmtud_reduce(peer, sp->hdr.serial, max_mtu, 714 + rxrpc_pmtud_reduce_ack); 715 + write_seqcount_begin(&peer->mtu_lock); 716 + peer->max_data = max_mtu; 717 + write_seqcount_end(&peer->mtu_lock); 718 + } 719 + 720 + max_data = umin(max_mtu, peer->max_data); 721 + capacity = max_data; 722 + capacity += sizeof(struct rxrpc_jumbo_header); /* First subpacket has main hdr, not jumbo */ 723 + capacity /= sizeof(struct rxrpc_jumbo_header) + RXRPC_JUMBO_DATALEN; 724 + 725 + if (jumbo_max == 0) { 713 726 /* The peer says it supports pmtu discovery */ 714 727 peer->ackr_adv_pmtud = true; 715 728 } else { 716 729 peer->ackr_adv_pmtud = false; 717 - } 718 - 719 - max_data = ntohl(trailer->maxMTU); 720 - peer->ackr_max_data = max_data; 721 - 722 - if (max_data < peer->max_data) { 723 - trace_rxrpc_pmtud_reduce(peer, sp->hdr.serial, max_data, 724 - rxrpc_pmtud_reduce_ack); 725 - write_seqcount_begin(&peer->mtu_lock); 726 - peer->max_data = max_data; 727 - write_seqcount_end(&peer->mtu_lock); 728 730 } 729 731 730 732 if (wake)
+2
net/rxrpc/insecure.c
··· 25 25 static int none_secure_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb) 26 26 { 27 27 txb->pkt_len = txb->len; 28 + if (txb->len == RXRPC_JUMBO_DATALEN) 29 + txb->jumboable = true; 28 30 return 0; 29 31 } 30 32
+55 -25
net/rxrpc/output.c
··· 377 377 */ 378 378 static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc_txbuf *txb, 379 379 rxrpc_serial_t serial, 380 - int subpkt) 380 + int subpkt, int nr_subpkts) 381 381 { 382 382 struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base; 383 + struct rxrpc_jumbo_header *jumbo = (void *)(whdr + 1) - sizeof(*jumbo); 383 384 enum rxrpc_req_ack_trace why; 384 385 struct rxrpc_connection *conn = call->conn; 385 386 struct kvec *kv = &call->local->kvec[subpkt]; ··· 399 398 txb->flags &= ~RXRPC_REQUEST_ACK; 400 399 flags = txb->flags & RXRPC_TXBUF_WIRE_FLAGS; 401 400 last = txb->flags & RXRPC_LAST_PACKET; 401 + 402 + if (subpkt < nr_subpkts - 1) { 403 + len = RXRPC_JUMBO_DATALEN; 404 + goto dont_set_request_ack; 405 + } 402 406 403 407 more = (!list_is_last(&txb->call_link, &call->tx_buffer) || 404 408 !list_empty(&call->tx_sendmsg)); ··· 442 436 } 443 437 dont_set_request_ack: 444 438 445 - whdr->flags = flags; 446 - whdr->serial = htonl(txb->serial); 447 - whdr->cksum = txb->cksum; 448 - whdr->serviceId = htons(conn->service_id); 449 - kv->iov_base = whdr; 450 - len += sizeof(*whdr); 451 - // TODO: Convert into a jumbo header for tail subpackets 439 + /* The jumbo header overlays the wire header in the txbuf. */ 440 + if (subpkt < nr_subpkts - 1) 441 + flags |= RXRPC_JUMBO_PACKET; 442 + else 443 + flags &= ~RXRPC_JUMBO_PACKET; 444 + if (subpkt == 0) { 445 + whdr->flags = flags; 446 + whdr->serial = htonl(txb->serial); 447 + whdr->cksum = txb->cksum; 448 + whdr->serviceId = htons(conn->service_id); 449 + kv->iov_base = whdr; 450 + len += sizeof(*whdr); 451 + } else { 452 + jumbo->flags = flags; 453 + jumbo->pad = 0; 454 + jumbo->cksum = txb->cksum; 455 + kv->iov_base = jumbo; 456 + len += sizeof(*jumbo); 457 + } 452 458 453 459 trace_rxrpc_tx_data(call, txb->seq, txb->serial, flags, false); 454 460 kv->iov_len = len; ··· 468 450 } 469 451 470 452 /* 471 - * Prepare a packet for transmission. 
453 + * Prepare a (jumbo) packet for transmission. 472 454 */ 473 - static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb) 455 + static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *head, int n) 474 456 { 457 + struct rxrpc_txbuf *txb = head; 475 458 rxrpc_serial_t serial; 476 459 size_t len = 0; 477 460 478 461 /* Each transmission of a Tx packet needs a new serial number */ 479 - serial = rxrpc_get_next_serial(call->conn); 462 + serial = rxrpc_get_next_serials(call->conn, n); 480 463 481 - len += rxrpc_prepare_data_subpacket(call, txb, serial, 0); 482 - // TODO: Loop around adding tail subpackets 464 + for (int i = 0; i < n; i++) { 465 + len += rxrpc_prepare_data_subpacket(call, txb, serial, i, n); 466 + serial++; 467 + txb = list_next_entry(txb, call_link); 468 + } 483 469 484 470 return len; 485 471 } ··· 491 469 /* 492 470 * Set timeouts after transmitting a packet. 493 471 */ 494 - static void rxrpc_tstamp_data_packets(struct rxrpc_call *call, struct rxrpc_txbuf *txb) 472 + static void rxrpc_tstamp_data_packets(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n) 495 473 { 474 + rxrpc_serial_t serial; 496 475 ktime_t now = ktime_get_real(); 497 476 bool ack_requested = txb->flags & RXRPC_REQUEST_ACK; 477 + int i; 498 478 499 479 call->tx_last_sent = now; 500 - txb->last_sent = now; 480 + 481 + for (i = 0; i < n; i++) { 482 + txb->last_sent = now; 483 + ack_requested |= txb->flags & RXRPC_REQUEST_ACK; 484 + serial = txb->serial; 485 + txb = list_next_entry(txb, call_link); 486 + } 501 487 502 488 if (ack_requested) { 503 - rxrpc_begin_rtt_probe(call, txb->serial, now, rxrpc_rtt_tx_data); 489 + rxrpc_begin_rtt_probe(call, serial, now, rxrpc_rtt_tx_data); 504 490 505 491 call->peer->rtt_last_req = now; 506 492 if (call->peer->rtt_count > 1) { ··· 532 502 /* 533 503 * send a packet through the transport endpoint 534 504 */ 535 - static int rxrpc_send_data_packet(struct rxrpc_call *call, struct 
rxrpc_txbuf *txb) 505 + static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n) 536 506 { 537 507 struct rxrpc_connection *conn = call->conn; 538 508 enum rxrpc_tx_point frag; ··· 542 512 543 513 _enter("%x,{%d}", txb->seq, txb->pkt_len); 544 514 545 - len = rxrpc_prepare_data_packet(call, txb); 515 + len = rxrpc_prepare_data_packet(call, txb, n); 546 516 547 517 if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) { 548 518 static int lose; ··· 554 524 } 555 525 } 556 526 557 - iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, 1, len); 527 + iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, n, len); 558 528 559 529 msg.msg_name = &call->peer->srx.transport; 560 530 msg.msg_namelen = call->peer->srx.transport_len; ··· 567 537 * yet. 568 538 */ 569 539 if (txb->seq == call->tx_transmitted + 1) 570 - call->tx_transmitted = txb->seq; 540 + call->tx_transmitted = txb->seq + n - 1; 571 541 572 542 /* send the packet with the don't fragment bit set if we currently 573 543 * think it's small enough */ ··· 598 568 } 599 569 600 570 rxrpc_tx_backoff(call, ret); 601 - if (ret == -EMSGSIZE && frag == rxrpc_tx_point_call_data_frag) { 571 + if (ret == -EMSGSIZE && frag == rxrpc_tx_point_call_data_nofrag) { 602 572 rxrpc_local_dont_fragment(conn->local, false); 603 573 frag = rxrpc_tx_point_call_data_frag; 604 574 goto retry; ··· 606 576 607 577 done: 608 578 if (ret >= 0) { 609 - rxrpc_tstamp_data_packets(call, txb); 579 + rxrpc_tstamp_data_packets(call, txb, n); 610 580 } else { 611 581 /* Cancel the call if the initial transmission fails, 612 582 * particularly if that's due to network routing issues that ··· 806 776 } 807 777 808 778 /* 809 - * Transmit one packet. 779 + * Transmit a packet, possibly gluing several subpackets together. 
810 780 */ 811 - void rxrpc_transmit_one(struct rxrpc_call *call, struct rxrpc_txbuf *txb) 781 + void rxrpc_transmit_data(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n) 812 782 { 813 783 int ret; 814 784 815 - ret = rxrpc_send_data_packet(call, txb); 785 + ret = rxrpc_send_data_packet(call, txb, n); 816 786 if (ret < 0) { 817 787 switch (ret) { 818 788 case -ENETUNREACH:
+13
net/rxrpc/rxkad.c
··· 392 392 break; 393 393 case RXRPC_SECURITY_AUTH: 394 394 ret = rxkad_secure_packet_auth(call, txb, req); 395 + if (txb->alloc_size == RXRPC_JUMBO_DATALEN) 396 + txb->jumboable = true; 395 397 break; 396 398 case RXRPC_SECURITY_ENCRYPT: 397 399 ret = rxkad_secure_packet_encrypt(call, txb, req); 400 + if (txb->alloc_size == RXRPC_JUMBO_DATALEN) 401 + txb->jumboable = true; 398 402 break; 399 403 default: 400 404 ret = -EPERM; 401 405 break; 406 + } 407 + 408 + /* Clear excess space in the packet */ 409 + if (txb->pkt_len < txb->alloc_size) { 410 + struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base; 411 + size_t gap = txb->alloc_size - txb->pkt_len; 412 + void *p = whdr + 1; 413 + 414 + memset(p + txb->pkt_len, 0, gap); 402 415 } 403 416 404 417 skcipher_request_free(req);