Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'net-rds-rds-tcp-protocol-and-extension-improvements'

Allison Henderson says:

====================
net/rds: RDS-TCP protocol and extension improvements

This is subset 3 of the larger RDS-TCP patch series I posted last
Oct. The greater series aims to correct multiple rds-tcp issues that
can cause dropped or out of sequence messages. I've broken it down into
smaller sets to make reviews more manageable.

In this set, we introduce extension headers for byte accounting
and fix several RDS/TCP protocol issues including message preservation
during connection transitions and multipath lane handling.

The entire set can be viewed in the rfc here:
https://lore.kernel.org/netdev/20251022191715.157755-1-achender@kernel.org/
====================

Link: https://patch.msgid.link/20260203055723.1085751-1-achender@kernel.org
Signed-off-by: Jakub Kicinski <kuba@kernel.org>

+450 -121
+6 -1
net/rds/connection.c
··· 442 442 * to the conn hash, so we never trigger a reconnect on this 443 443 * conn - the reconnect is always triggered by the active peer. */ 444 444 cancel_delayed_work_sync(&cp->cp_conn_w); 445 + 446 + clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags); 445 447 rcu_read_lock(); 446 448 if (!hlist_unhashed(&conn->c_hash_node)) { 447 449 rcu_read_unlock(); 450 + if (conn->c_trans->t_mp_capable && 451 + cp->cp_index == 0) 452 + rds_send_ping(conn, 0); 448 453 rds_queue_reconnect(cp); 449 454 } else { 450 455 rcu_read_unlock(); 451 456 } 452 457 453 458 if (conn->c_trans->conn_slots_available) 454 - conn->c_trans->conn_slots_available(conn); 459 + conn->c_trans->conn_slots_available(conn, false); 455 460 } 456 461 457 462 /* destroy a single rds_conn_path. rds_conn_destroy() iterates over
+33 -7
net/rds/ib_send.c
··· 577 577 /* If it has a RDMA op, tell the peer we did it. This is 578 578 * used by the peer to release use-once RDMA MRs. */ 579 579 if (rm->rdma.op_active) { 580 - struct rds_ext_header_rdma ext_hdr; 580 + struct rds_ext_header_rdma ext_hdr = {}; 581 + struct rds_ext_header_rdma_bytes 582 + rdma_bytes_ext_hdr = {}; 581 583 582 584 ext_hdr.h_rdma_rkey = cpu_to_be32(rm->rdma.op_rkey); 583 - rds_message_add_extension(&rm->m_inc.i_hdr, 584 - RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr)); 585 + if (rds_message_add_extension(&rm->m_inc.i_hdr, 586 + RDS_EXTHDR_RDMA, 587 + &ext_hdr)) { 588 + /* prepare the rdma bytes ext header */ 589 + rdma_bytes_ext_hdr.h_rflags = 590 + rm->rdma.op_write ? 591 + RDS_FLAG_RDMA_WR_BYTES : 592 + RDS_FLAG_RDMA_RD_BYTES; 593 + rdma_bytes_ext_hdr.h_rdma_bytes = 594 + cpu_to_be32(rm->rdma.op_bytes); 595 + } else { 596 + rdsdebug("RDS_EXTHDR_RDMA dropped"); 597 + } 598 + 599 + if (rds_message_add_extension(&rm->m_inc.i_hdr, 600 + RDS_EXTHDR_RDMA_BYTES, 601 + &rdma_bytes_ext_hdr)) { 602 + /* rdma bytes ext header was added successfully, 603 + * notify the remote side via flag in header 604 + */ 605 + rm->m_inc.i_hdr.h_flags |= 606 + RDS_FLAG_EXTHDR_EXTENSION; 607 + } else { 608 + rdsdebug("RDS_EXTHDR_RDMA_BYTES dropped"); 609 + } 585 610 } 586 - if (rm->m_rdma_cookie) { 587 - rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr, 588 - rds_rdma_cookie_key(rm->m_rdma_cookie), 589 - rds_rdma_cookie_offset(rm->m_rdma_cookie)); 611 + if (rm->m_rdma_cookie && 612 + !rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr, 613 + rds_rdma_cookie_key(rm->m_rdma_cookie), 614 + rds_rdma_cookie_offset(rm->m_rdma_cookie))) { 615 + rdsdebug("RDS_EXTHDR_RDMA_DEST dropped\n"); 590 616 } 591 617 592 618 /* Note - rds_ib_piggyb_ack clears the ACK_REQUIRED bit, so
+53 -13
net/rds/message.c
··· 44 44 [RDS_EXTHDR_VERSION] = sizeof(struct rds_ext_header_version), 45 45 [RDS_EXTHDR_RDMA] = sizeof(struct rds_ext_header_rdma), 46 46 [RDS_EXTHDR_RDMA_DEST] = sizeof(struct rds_ext_header_rdma_dest), 47 + [RDS_EXTHDR_RDMA_BYTES] = sizeof(struct rds_ext_header_rdma_bytes), 47 48 [RDS_EXTHDR_NPATHS] = sizeof(__be16), 48 49 [RDS_EXTHDR_GEN_NUM] = sizeof(__be32), 50 + [RDS_EXTHDR_SPORT_IDX] = 1, 49 51 }; 50 52 51 53 void rds_message_addref(struct rds_message *rm) ··· 193 191 hdr->h_sport = sport; 194 192 hdr->h_dport = dport; 195 193 hdr->h_sequence = cpu_to_be64(seq); 196 - hdr->h_exthdr[0] = RDS_EXTHDR_NONE; 194 + /* see rds_find_next_ext_space for reason why we memset the 195 + * ext header 196 + */ 197 + memset(hdr->h_exthdr, RDS_EXTHDR_NONE, RDS_HEADER_EXT_SPACE); 197 198 } 198 199 EXPORT_SYMBOL_GPL(rds_message_populate_header); 199 200 200 - int rds_message_add_extension(struct rds_header *hdr, unsigned int type, 201 - const void *data, unsigned int len) 201 + /* 202 + * Find the next place we can add an RDS header extension with 203 + * specific length. Extension headers are pushed one after the 204 + * other. In the following, the number after the colon is the number 205 + * of bytes: 206 + * 207 + * [ type1:1 dta1:len1 [ type2:1 dta2:len2 ] ... ] RDS_EXTHDR_NONE 208 + * 209 + * If the extension headers fill the complete extension header space 210 + * (16 bytes), the trailing RDS_EXTHDR_NONE is omitted. 211 + */ 212 + static int rds_find_next_ext_space(struct rds_header *hdr, unsigned int len, 213 + u8 **ext_start) 202 214 { 203 - unsigned int ext_len = sizeof(u8) + len; 215 + unsigned int ext_len; 216 + unsigned int type; 217 + int ind = 0; 218 + 219 + while ((ind + 1 + len) <= RDS_HEADER_EXT_SPACE) { 220 + if (hdr->h_exthdr[ind] == RDS_EXTHDR_NONE) { 221 + *ext_start = hdr->h_exthdr + ind; 222 + return 0; 223 + } 224 + 225 + type = hdr->h_exthdr[ind]; 226 + 227 + ext_len = (type < __RDS_EXTHDR_MAX) ? rds_exthdr_size[type] : 0; 228 + WARN_ONCE(!ext_len, "Unknown ext hdr type %d\n", type); 229 + if (!ext_len) 230 + return -EINVAL; 231 + 232 + /* ind points to a valid ext hdr with known length */ 233 + ind += 1 + ext_len; 234 + } 235 + 236 + /* no room for extension */ 237 + return -ENOSPC; 238 + } 239 + 240 + /* The ext hdr space is prefilled with zero from the kzalloc() */ 241 + int rds_message_add_extension(struct rds_header *hdr, 242 + unsigned int type, const void *data) 243 + { 204 244 unsigned char *dst; 245 + unsigned int len; 205 246 206 - /* For now, refuse to add more than one extension header */ 207 - if (hdr->h_exthdr[0] != RDS_EXTHDR_NONE) 247 + len = (type < __RDS_EXTHDR_MAX) ? rds_exthdr_size[type] : 0; 248 + if (!len) 208 249 return 0; 209 250 210 - if (type >= __RDS_EXTHDR_MAX || len != rds_exthdr_size[type]) 251 + if (rds_find_next_ext_space(hdr, len, &dst)) 211 252 return 0; 212 - 213 - if (ext_len >= RDS_HEADER_EXT_SPACE) 214 - return 0; 215 - dst = hdr->h_exthdr; 216 253 217 254 *dst++ = type; 218 255 memcpy(dst, data, len); 219 256 220 - dst[len] = RDS_EXTHDR_NONE; 221 257 return 1; 222 258 } 223 259 EXPORT_SYMBOL_GPL(rds_message_add_extension); ··· 312 272 313 273 ext_hdr.h_rdma_rkey = cpu_to_be32(r_key); 314 274 ext_hdr.h_rdma_offset = cpu_to_be32(offset); 315 - return rds_message_add_extension(hdr, RDS_EXTHDR_RDMA_DEST, &ext_hdr, sizeof(ext_hdr)); 275 + return rds_message_add_extension(hdr, RDS_EXTHDR_RDMA_DEST, &ext_hdr); 316 276 } 317 277 EXPORT_SYMBOL_GPL(rds_message_add_rdma_dest_extension); 318 278
+63 -42
net/rds/rds.h
··· 147 147 c_ping_triggered:1, 148 148 c_pad_to_32:29; 149 149 int c_npaths; 150 + bool c_with_sport_idx; 150 151 struct rds_connection *c_passive; 151 152 struct rds_transport *c_trans; 152 153 ··· 170 169 171 170 u32 c_my_gen_num; 172 171 u32 c_peer_gen_num; 172 + 173 + u64 c_cp0_mprds_catchup_tx_seq; 173 174 }; 174 175 175 176 static inline ··· 186 183 write_pnet(&conn->c_net, net); 187 184 } 188 185 189 - #define RDS_FLAG_CONG_BITMAP 0x01 190 - #define RDS_FLAG_ACK_REQUIRED 0x02 191 - #define RDS_FLAG_RETRANSMITTED 0x04 192 - #define RDS_MAX_ADV_CREDIT 255 186 + #define RDS_FLAG_CONG_BITMAP 0x01 187 + #define RDS_FLAG_ACK_REQUIRED 0x02 188 + #define RDS_FLAG_RETRANSMITTED 0x04 189 + #define RDS_FLAG_EXTHDR_EXTENSION 0x20 190 + #define RDS_MAX_ADV_CREDIT 255 193 191 194 192 /* RDS_FLAG_PROBE_PORT is the reserved sport used for sending a ping 195 193 * probe to exchange control information before establishing a connection. ··· 262 258 __be32 h_rdma_offset; 263 259 }; 264 260 261 + /* 262 + * This extension header tells the peer about delivered RDMA byte count. 263 + */ 264 + #define RDS_EXTHDR_RDMA_BYTES 4 265 + 266 + struct rds_ext_header_rdma_bytes { 267 + __be32 h_rdma_bytes; /* byte count */ 268 + u8 h_rflags; /* direction of RDMA, write or read */ 269 + u8 h_pad[3]; 270 + }; 271 + 272 + #define RDS_FLAG_RDMA_WR_BYTES 0x01 273 + #define RDS_FLAG_RDMA_RD_BYTES 0x02 274 + 265 275 /* Extension header announcing number of paths. 266 276 * Implicit length = 2 bytes. 267 277 */ 268 278 #define RDS_EXTHDR_NPATHS 5 269 279 #define RDS_EXTHDR_GEN_NUM 6 280 + #define RDS_EXTHDR_SPORT_IDX 8 270 281 271 282 #define __RDS_EXTHDR_MAX 16 /* for now */ 283 + 272 284 #define RDS_RX_MAX_TRACES (RDS_MSG_RX_DGRAM_TRACE_MAX + 1) 273 285 #define RDS_MSG_RX_HDR 0 274 286 #define RDS_MSG_RX_START 1 ··· 549 529 * messages received on the new socket are not discarded when no 550 530 * connection path was available at the time. 551 531 */ 552 - void (*conn_slots_available)(struct rds_connection *conn); 532 + void (*conn_slots_available)(struct rds_connection *conn, bool fan_out); 553 533 int (*conn_path_connect)(struct rds_conn_path *cp); 554 534 555 535 /* ··· 715 695 } 716 696 717 697 struct rds_statistics { 718 - uint64_t s_conn_reset; 719 - uint64_t s_recv_drop_bad_checksum; 720 - uint64_t s_recv_drop_old_seq; 721 - uint64_t s_recv_drop_no_sock; 722 - uint64_t s_recv_drop_dead_sock; 723 - uint64_t s_recv_deliver_raced; 724 - uint64_t s_recv_delivered; 725 - uint64_t s_recv_queued; 726 - uint64_t s_recv_immediate_retry; 727 - uint64_t s_recv_delayed_retry; 728 - uint64_t s_recv_ack_required; 729 - uint64_t s_recv_rdma_bytes; 730 - uint64_t s_recv_ping; 731 - uint64_t s_send_queue_empty; 732 - uint64_t s_send_queue_full; 733 - uint64_t s_send_lock_contention; 734 - uint64_t s_send_lock_queue_raced; 735 - uint64_t s_send_immediate_retry; 736 - uint64_t s_send_delayed_retry; 737 - uint64_t s_send_drop_acked; 738 - uint64_t s_send_ack_required; 739 - uint64_t s_send_queued; 740 - uint64_t s_send_rdma; 741 - uint64_t s_send_rdma_bytes; 742 - uint64_t s_send_pong; 743 - uint64_t s_page_remainder_hit; 744 - uint64_t s_page_remainder_miss; 745 - uint64_t s_copy_to_user; 746 - uint64_t s_copy_from_user; 747 - uint64_t s_cong_update_queued; 748 - uint64_t s_cong_update_received; 749 - uint64_t s_cong_send_error; 750 - uint64_t s_cong_send_blocked; 751 - uint64_t s_recv_bytes_added_to_socket; 752 - uint64_t s_recv_bytes_removed_from_socket; 753 - uint64_t s_send_stuck_rm; 698 + u64 s_conn_reset; 699 + u64 s_recv_drop_bad_checksum; 700 + u64 s_recv_drop_old_seq; 701 + u64 s_recv_drop_no_sock; 702 + u64 s_recv_drop_dead_sock; 703 + u64 s_recv_deliver_raced; 704 + u64 s_recv_delivered; 705 + u64 s_recv_queued; 706 + u64 s_recv_immediate_retry; 707 + u64 s_recv_delayed_retry; 708 + u64 s_recv_ack_required; 709 + u64 s_recv_rdma_bytes; 710 + u64 s_recv_ping; 711 + u64 s_send_queue_empty; 712 + u64 s_send_queue_full; 713 + u64 s_send_lock_contention; 714 + u64 s_send_lock_queue_raced; 715 + u64 s_send_immediate_retry; 716 + u64 s_send_delayed_retry; 717 + u64 s_send_drop_acked; 718 + u64 s_send_ack_required; 719 + u64 s_send_queued; 720 + u64 s_send_rdma; 721 + u64 s_send_rdma_bytes; 722 + u64 s_send_pong; 723 + u64 s_page_remainder_hit; 724 + u64 s_page_remainder_miss; 725 + u64 s_copy_to_user; 726 + u64 s_copy_from_user; 727 + u64 s_cong_update_queued; 728 + u64 s_cong_update_received; 729 + u64 s_cong_send_error; 730 + u64 s_cong_send_blocked; 731 + u64 s_recv_bytes_added_to_socket; 732 + u64 s_recv_bytes_removed_from_socket; 733 + u64 s_send_stuck_rm; 734 + u64 s_mprds_catchup_tx0_retries; 754 735 }; 755 736 756 737 /* af_rds.c */ ··· 892 871 void rds_message_populate_header(struct rds_header *hdr, __be16 sport, 893 872 __be16 dport, u64 seq); 894 873 int rds_message_add_extension(struct rds_header *hdr, 895 - unsigned int type, const void *data, unsigned int len); 874 + unsigned int type, const void *data); 896 875 int rds_message_next_extension(struct rds_header *hdr, 897 876 unsigned int *pos, void *buf, unsigned int *buflen); 898 877 int rds_message_add_rdma_dest_extension(struct rds_header *hdr, u32 r_key, u32 offset);
+33 -4
net/rds/recv.c
··· 204 204 struct rds_ext_header_version version; 205 205 __be16 rds_npaths; 206 206 __be32 rds_gen_num; 207 + u8 dummy; 207 208 } buffer; 209 + bool new_with_sport_idx = false; 208 210 u32 new_peer_gen_num = 0; 211 + int new_npaths; 212 + bool fan_out; 213 + 214 + new_npaths = conn->c_npaths; 209 215 210 216 while (1) { 211 217 len = sizeof(buffer); ··· 221 215 /* Process extension header here */ 222 216 switch (type) { 223 217 case RDS_EXTHDR_NPATHS: 224 - conn->c_npaths = min_t(int, RDS_MPATH_WORKERS, 225 - be16_to_cpu(buffer.rds_npaths)); 218 + new_npaths = min_t(int, RDS_MPATH_WORKERS, 219 + be16_to_cpu(buffer.rds_npaths)); 226 220 break; 227 221 case RDS_EXTHDR_GEN_NUM: 228 222 new_peer_gen_num = be32_to_cpu(buffer.rds_gen_num); 223 + break; 224 + case RDS_EXTHDR_SPORT_IDX: 225 + new_with_sport_idx = true; 229 226 break; 230 227 default: 231 228 pr_warn_ratelimited("ignoring unknown exthdr type " 232 229 "0x%x\n", type); 233 230 } 234 231 } 232 + 233 + conn->c_with_sport_idx = new_with_sport_idx; 234 + 235 + if (new_npaths > 1 && new_npaths != conn->c_npaths) { 236 + /* We're about to fan-out. 237 + * Make sure that messages from cp_index#0 238 + * are sent prior to handling other lanes. 239 + */ 240 + struct rds_conn_path *cp0 = conn->c_path; 241 + unsigned long flags; 242 + 243 + spin_lock_irqsave(&cp0->cp_lock, flags); 244 + conn->c_cp0_mprds_catchup_tx_seq = cp0->cp_next_tx_seq; 245 + spin_unlock_irqrestore(&cp0->cp_lock, flags); 246 + fan_out = true; 247 + } else { 248 + fan_out = false; 249 + } 250 + 235 251 /* if RDS_EXTHDR_NPATHS was not found, default to a single-path */ 236 - conn->c_npaths = max_t(int, conn->c_npaths, 1); 252 + conn->c_npaths = max_t(int, new_npaths, 1); 253 + 237 254 conn->c_ping_triggered = 0; 238 255 rds_conn_peer_gen_update(conn, new_peer_gen_num); 239 256 240 257 if (conn->c_npaths > 1 && 241 258 conn->c_trans->conn_slots_available) 242 - conn->c_trans->conn_slots_available(conn); 259 + conn->c_trans->conn_slots_available(conn, fan_out); 243 260 } 244 261 245 262 /* rds_start_mprds() will synchronously start multiple paths when appropriate.
+90 -40
net/rds/send.c
··· 120 120 } 121 121 122 122 /* 123 + * Helper function for multipath fanout to ensure lane 0 transmits queued 124 + * messages before other lanes to prevent out-of-order delivery. 125 + * 126 + * Returns true if lane 0 still has messages or false otherwise 127 + */ 128 + static bool rds_mprds_cp0_catchup(struct rds_connection *conn) 129 + { 130 + struct rds_conn_path *cp0 = conn->c_path; 131 + struct rds_message *rm0; 132 + unsigned long flags; 133 + bool ret = false; 134 + 135 + spin_lock_irqsave(&cp0->cp_lock, flags); 136 + 137 + /* the oldest / first message in the retransmit queue 138 + * has to be at or beyond c_cp0_mprds_catchup_tx_seq 139 + */ 140 + if (!list_empty(&cp0->cp_retrans)) { 141 + rm0 = list_entry(cp0->cp_retrans.next, struct rds_message, 142 + m_conn_item); 143 + if (be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) < 144 + conn->c_cp0_mprds_catchup_tx_seq) { 145 + /* the retransmit queue of cp_index#0 has not 146 + * quite caught up yet 147 + */ 148 + ret = true; 149 + goto unlock; 150 + } 151 + } 152 + 153 + /* the oldest / first message of the send queue 154 + * has to be at or beyond c_cp0_mprds_catchup_tx_seq 155 + */ 156 + rm0 = cp0->cp_xmit_rm; 157 + if (!rm0 && !list_empty(&cp0->cp_send_queue)) 158 + rm0 = list_entry(cp0->cp_send_queue.next, struct rds_message, 159 + m_conn_item); 160 + if (rm0 && be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) < 161 + conn->c_cp0_mprds_catchup_tx_seq) { 162 + /* the send queue of cp_index#0 has not quite 163 + * caught up yet 164 + */ 165 + ret = true; 166 + } 167 + 168 + unlock: 169 + spin_unlock_irqrestore(&cp0->cp_lock, flags); 170 + return ret; 171 + } 172 + 173 + /* 123 174 * We're making the conscious trade-off here to only send one message 124 175 * down the connection at a time. 125 176 * Pro: ··· 298 247 */ 299 248 if (batch_count >= send_batch_count) 300 249 goto over_batch; 250 + 251 + /* make sure cp_index#0 caught up during fan-out in 252 + * order to avoid lane races 253 + */ 254 + if (cp->cp_index > 0 && rds_mprds_cp0_catchup(conn)) { 255 + rds_stats_inc(s_mprds_catchup_tx0_retries); 256 + goto over_batch; 257 + } 301 258 302 259 spin_lock_irqsave(&cp->cp_lock, flags); 303 260 ··· 1101 1042 return ret; 1102 1043 } 1103 1044 1104 - static int rds_send_mprds_hash(struct rds_sock *rs, 1105 - struct rds_connection *conn, int nonblock) 1106 - { 1107 - int hash; 1108 - 1109 - if (conn->c_npaths == 0) 1110 - hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS); 1111 - else 1112 - hash = RDS_MPATH_HASH(rs, conn->c_npaths); 1113 - if (conn->c_npaths == 0 && hash != 0) { 1114 - rds_send_ping(conn, 0); 1115 - 1116 - /* The underlying connection is not up yet. Need to wait 1117 - * until it is up to be sure that the non-zero c_path can be 1118 - * used. But if we are interrupted, we have to use the zero 1119 - * c_path in case the connection ends up being non-MP capable. 1120 - */ 1121 - if (conn->c_npaths == 0) { 1122 - /* Cannot wait for the connection be made, so just use 1123 - * the base c_path. 1124 - */ 1125 - if (nonblock) 1126 - return 0; 1127 - if (wait_event_interruptible(conn->c_hs_waitq, 1128 - conn->c_npaths != 0)) 1129 - hash = 0; 1130 - } 1131 - if (conn->c_npaths == 1) 1132 - hash = 0; 1133 - } 1134 - return hash; 1135 - } 1136 - 1137 1045 static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes) 1138 1046 { 1139 1047 struct rds_rdma_args *args; ··· 1330 1304 rs->rs_conn = conn; 1331 1305 } 1332 1306 1333 - if (conn->c_trans->t_mp_capable) 1334 - cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)]; 1335 - else 1307 + if (conn->c_trans->t_mp_capable) { 1308 + /* Use c_path[0] until we learn that 1309 + * the peer supports more (c_npaths > 1) 1310 + */ 1311 + cpath = &conn->c_path[RDS_MPATH_HASH(rs, conn->c_npaths ? : 1)]; 1312 + } else { 1336 1313 cpath = &conn->c_path[0]; 1314 + } 1315 + 1316 + /* If we're multipath capable and path 0 is down, queue reconnect 1317 + * and send a ping. This initiates the multipath handshake through 1318 + * rds_send_probe(), which sends RDS_EXTHDR_NPATHS to the peer, 1319 + * starting multipath capability negotiation. 1320 + */ 1321 + if (conn->c_trans->t_mp_capable && 1322 + !rds_conn_path_up(&conn->c_path[0])) { 1323 + /* Ensures that only one request is queued. And 1324 + * rds_send_ping() ensures that only one ping is 1325 + * outstanding. 1326 + */ 1327 + if (!test_and_set_bit(RDS_RECONNECT_PENDING, 1328 + &conn->c_path[0].cp_flags)) 1329 + queue_delayed_work(conn->c_path[0].cp_wq, 1330 + &conn->c_path[0].cp_conn_w, 0); 1331 + rds_send_ping(conn, 0); 1332 + } 1337 1333 1338 1334 rm->m_conn_path = cpath; 1339 1335 ··· 1505 1457 cp->cp_conn->c_trans->t_mp_capable) { 1506 1458 __be16 npaths = cpu_to_be16(RDS_MPATH_WORKERS); 1507 1459 __be32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num); 1460 + u8 dummy = 0; 1508 1461 1509 1462 rds_message_add_extension(&rm->m_inc.i_hdr, 1510 - RDS_EXTHDR_NPATHS, &npaths, 1511 - sizeof(npaths)); 1463 + RDS_EXTHDR_NPATHS, &npaths); 1512 1464 rds_message_add_extension(&rm->m_inc.i_hdr, 1513 1465 RDS_EXTHDR_GEN_NUM, 1514 - &my_gen_num, 1515 - sizeof(u32)); 1466 + &my_gen_num); 1467 + rds_message_add_extension(&rm->m_inc.i_hdr, 1468 + RDS_EXTHDR_SPORT_IDX, 1469 + &dummy); 1516 1470 } 1517 1471 spin_unlock_irqrestore(&cp->cp_lock, flags); 1518 1472
+1
net/rds/stats.c
··· 79 79 "recv_bytes_added_to_sock", 80 80 "recv_bytes_freed_fromsock", 81 81 "send_stuck_rm", 82 + "mprds_catchup_tx0_retries", 82 83 }; 83 84 84 85 void rds_stats_info_copy(struct rds_info_iterator *iter,
+1
net/rds/tcp.c
··· 384 384 tc->t_tinc = NULL; 385 385 tc->t_tinc_hdr_rem = sizeof(struct rds_header); 386 386 tc->t_tinc_data_rem = 0; 387 + init_waitqueue_head(&tc->t_recv_done_waitq); 387 388 388 389 conn->c_path[i].cp_transport_data = tc; 389 390 tc->t_cpath = &conn->c_path[i];
+6 -1
net/rds/tcp.h
··· 34 34 */ 35 35 struct mutex t_conn_path_lock; 36 36 struct socket *t_sock; 37 + u32 t_client_port_group; 37 38 struct rds_tcp_net *t_rtn; 38 39 void *t_orig_write_space; 39 40 void *t_orig_data_ready; ··· 55 54 u32 t_last_sent_nxt; 56 55 u32 t_last_expected_una; 57 56 u32 t_last_seen_una; 57 + 58 + /* for rds_tcp_conn_path_shutdown */ 59 + wait_queue_head_t t_recv_done_waitq; 58 60 }; 59 61 60 62 struct rds_tcp_statistics { ··· 90 86 struct socket *rds_tcp_listen_init(struct net *net, bool isv6); 91 87 void rds_tcp_listen_stop(struct socket *sock, struct work_struct *acceptor); 92 88 void rds_tcp_listen_data_ready(struct sock *sk); 93 - void rds_tcp_conn_slots_available(struct rds_connection *conn); 89 + void rds_tcp_conn_slots_available(struct rds_connection *conn, bool fan_out); 94 90 int rds_tcp_accept_one(struct rds_tcp_net *rtn); 95 91 void rds_tcp_keepalive(struct socket *sock); 96 92 void *rds_tcp_listen_sock_def_readable(struct net *net); ··· 108 104 void rds_tcp_xmit_path_complete(struct rds_conn_path *cp); 109 105 int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm, 110 106 unsigned int hdr_off, unsigned int sg, unsigned int off); 107 + int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack); 111 108 void rds_tcp_write_space(struct sock *sk); 112 109 113 110 /* tcp_stats.c */
+74 -5
net/rds/tcp_connect.c
··· 75 75 rds_connect_path_complete(cp, RDS_CONN_CONNECTING); 76 76 } 77 77 break; 78 + case TCP_CLOSING: 79 + case TCP_TIME_WAIT: 80 + if (wq_has_sleeper(&tc->t_recv_done_waitq)) 81 + wake_up(&tc->t_recv_done_waitq); 82 + break; 78 83 case TCP_CLOSE_WAIT: 84 + case TCP_LAST_ACK: 79 85 case TCP_CLOSE: 86 + if (wq_has_sleeper(&tc->t_recv_done_waitq)) 87 + wake_up(&tc->t_recv_done_waitq); 80 88 rds_conn_path_drop(cp, false); 81 89 break; 82 90 default: ··· 101 93 struct sockaddr_in6 sin6; 102 94 struct sockaddr_in sin; 103 95 struct sockaddr *addr; 96 + int port_low, port_high, port; 97 + int port_groups, groups_left; 104 98 int addrlen; 105 99 bool isv6; 106 100 int ret; ··· 155 145 addrlen = sizeof(sin); 156 146 } 157 147 158 - ret = kernel_bind(sock, (struct sockaddr_unsized *)addr, addrlen); 148 + /* encode cp->cp_index in lowest bits of source-port */ 149 + inet_get_local_port_range(rds_conn_net(conn), &port_low, &port_high); 150 + port_low = ALIGN(port_low, RDS_MPATH_WORKERS); 151 + port_groups = (port_high - port_low + 1) / RDS_MPATH_WORKERS; 152 + ret = -EADDRINUSE; 153 + groups_left = port_groups; 154 + while (groups_left-- > 0 && ret) { 155 + if (++tc->t_client_port_group >= port_groups) 156 + tc->t_client_port_group = 0; 157 + port = port_low + 158 + tc->t_client_port_group * RDS_MPATH_WORKERS + 159 + cp->cp_index; 160 + 161 + if (isv6) 162 + sin6.sin6_port = htons(port); 163 + else 164 + sin.sin_port = htons(port); 165 + ret = kernel_bind(sock, (struct sockaddr_unsized *)addr, 166 + addrlen); 167 + } 159 168 if (ret) { 160 169 rdsdebug("bind failed with %d at address %pI6c\n", 161 170 ret, &conn->c_laddr); ··· 234 205 { 235 206 struct rds_tcp_connection *tc = cp->cp_transport_data; 236 207 struct socket *sock = tc->t_sock; 208 + struct sock *sk; 209 + unsigned int rounds; 237 210 238 211 rdsdebug("shutting down conn %p tc %p sock %p\n", 239 212 cp->cp_conn, tc, sock); 240 213 241 214 if (sock) { 215 + sk = sock->sk; 242 216 if (rds_destroy_pending(cp->cp_conn)) 243 - sock_no_linger(sock->sk); 244 - sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN); 245 - lock_sock(sock->sk); 217 + sock_no_linger(sk); 218 + 219 + sock->ops->shutdown(sock, SHUT_WR); 220 + 221 + /* after sending FIN, 222 + * wait until we processed all incoming messages 223 + * and we're sure that there won't be any more: 224 + * i.e. state CLOSING, TIME_WAIT, CLOSE_WAIT, 225 + * LAST_ACK, or CLOSE (RFC 793). 226 + * 227 + * Give up waiting after 5 seconds and allow messages 228 + * to theoretically get dropped, if the TCP transition 229 + * didn't happen. 230 + */ 231 + rounds = 0; 232 + do { 233 + /* we need to ensure messages are dequeued here 234 + * since "rds_recv_worker" only dispatches messages 235 + * while the connection is still in RDS_CONN_UP 236 + * and there is no guarantee that "rds_tcp_data_ready" 237 + * was called nor that "sk_data_ready" still points to 238 + * it. 239 + */ 240 + rds_tcp_recv_path(cp); 241 + } while (!wait_event_timeout(tc->t_recv_done_waitq, 242 + (sk->sk_state == TCP_CLOSING || 243 + sk->sk_state == TCP_TIME_WAIT || 244 + sk->sk_state == TCP_CLOSE_WAIT || 245 + sk->sk_state == TCP_LAST_ACK || 246 + sk->sk_state == TCP_CLOSE) && 247 + skb_queue_empty_lockless(&sk->sk_receive_queue), 248 + msecs_to_jiffies(100)) && 249 + ++rounds < 50); 250 + lock_sock(sk); 251 + 252 + /* discard messages that the peer received already */ 253 + tc->t_last_seen_una = rds_tcp_snd_una(tc); 254 + rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), 255 + rds_tcp_is_acked); 256 + 246 257 rds_tcp_restore_callbacks(sock, tc); /* tc->tc_sock = NULL */ 247 258 248 - release_sock(sock->sk); 259 + release_sock(sk); 249 260 sock_release(sock); 250 261 } 251 262
+85 -7
net/rds/tcp_listen.c
··· 56 56 tcp_sock_set_keepintvl(sock->sk, keepidle); 57 57 } 58 58 59 + static int 60 + rds_tcp_get_peer_sport(struct socket *sock) 61 + { 62 + union { 63 + struct sockaddr_storage storage; 64 + struct sockaddr addr; 65 + struct sockaddr_in sin; 66 + struct sockaddr_in6 sin6; 67 + } saddr; 68 + int sport; 69 + 70 + if (kernel_getpeername(sock, &saddr.addr) >= 0) { 71 + switch (saddr.addr.sa_family) { 72 + case AF_INET: 73 + sport = ntohs(saddr.sin.sin_port); 74 + break; 75 + case AF_INET6: 76 + sport = ntohs(saddr.sin6.sin6_port); 77 + break; 78 + default: 79 + sport = -1; 80 + } 81 + } else { 82 + sport = -1; 83 + } 84 + 85 + return sport; 86 + } 87 + 59 88 /* rds_tcp_accept_one_path(): if accepting on cp_index > 0, make sure the 60 89 * client's ipaddr < server's ipaddr. Otherwise, close the accepted 61 90 * socket and force a reconneect from smaller -> larger ip addr. The reason 62 91 * we special case cp_index 0 is to allow the rds probe ping itself to itself 63 92 * get through efficiently. 64 93 */ 65 - static 66 - struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn) 94 + static struct rds_tcp_connection * 95 + rds_tcp_accept_one_path(struct rds_connection *conn, struct socket *sock) 67 96 { 68 - int i; 69 - int npaths = max_t(int, 1, conn->c_npaths); 97 + int sport, npaths, i_min, i_max, i; 70 98 71 - for (i = 0; i < npaths; i++) { 99 + if (conn->c_with_sport_idx) 100 + /* cp->cp_index is encoded in lowest bits of source-port */ 101 + sport = rds_tcp_get_peer_sport(sock); 102 + else 103 + sport = -1; 104 + 105 + npaths = max_t(int, 1, conn->c_npaths); 106 + 107 + if (sport >= 0) { 108 + i_min = sport % npaths; 109 + i_max = i_min; 110 + } else { 111 + i_min = 0; 112 + i_max = npaths - 1; 113 + } 114 + 115 + for (i = i_min; i <= i_max; i++) { 72 116 struct rds_conn_path *cp = &conn->c_path[i]; 73 117 74 118 if (rds_conn_path_transition(cp, RDS_CONN_DOWN, 75 119 RDS_CONN_CONNECTING)) 76 120 return cp->cp_transport_data; 77 121 } 122 + 78 123 return NULL; 79 124 } 80 125 81 - void rds_tcp_conn_slots_available(struct rds_connection *conn) 126 + void rds_tcp_conn_slots_available(struct rds_connection *conn, bool fan_out) 82 127 { 83 128 struct rds_tcp_connection *tc; 84 129 struct rds_tcp_net *rtn; 130 + struct socket *sock; 131 + int sport, npaths; 85 132 86 133 if (rds_destroy_pending(conn)) 87 134 return; ··· 137 90 rtn = tc->t_rtn; 138 91 if (!rtn) 139 92 return; 93 + 94 + sock = tc->t_sock; 95 + 96 + /* During fan-out, check that the connection we already 97 + * accepted in slot#0 carried the proper source port modulo. 98 + */ 99 + if (fan_out && conn->c_with_sport_idx && sock && 100 + rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) > 0) { 101 + /* cp->cp_index is encoded in lowest bits of source-port */ 102 + sport = rds_tcp_get_peer_sport(sock); 103 + npaths = max_t(int, 1, conn->c_npaths); 104 + if (sport >= 0 && sport % npaths != 0) 105 + /* peer initiated with a non-#0 lane first */ 106 + rds_conn_path_drop(conn->c_path, 0); 107 + } 140 108 141 109 /* As soon as a connection went down, 142 110 * it is safe to schedule a "rds_tcp_accept_one" ··· 261 199 * to and discarded by the sender. 262 200 * We must not throw those away! 263 201 */ 264 - rs_tcp = rds_tcp_accept_one_path(conn); 202 + rs_tcp = rds_tcp_accept_one_path(conn, new_sock); 265 203 if (!rs_tcp) { 266 204 /* It's okay to stash "new_sock", since 267 205 * "rds_tcp_conn_slots_available" triggers ··· 307 245 rds_tcp_set_callbacks(new_sock, cp); 308 246 rds_connect_path_complete(cp, RDS_CONN_CONNECTING); 309 247 } 248 + 249 + /* Since "rds_tcp_set_callbacks" happens this late 250 + * the connection may already have been closed without 251 + * "rds_tcp_state_change" doing its due diligence. 252 + * 253 + * If that's the case, we simply drop the path, 254 + * knowing that "rds_tcp_conn_path_shutdown" will 255 + * dequeue pending messages. 256 + */ 257 + if (new_sock->sk->sk_state == TCP_CLOSE_WAIT || 258 + new_sock->sk->sk_state == TCP_LAST_ACK || 259 + new_sock->sk->sk_state == TCP_CLOSE) 260 + rds_conn_path_drop(cp, 0); 261 + else 262 + queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0); 263 + 310 264 new_sock = NULL; 311 265 ret = 0; 312 266 if (conn->c_npaths == 0)
+4
net/rds/tcp_recv.c
··· 278 278 rdsdebug("tcp_read_sock for tc %p gfp 0x%x returned %d\n", tc, gfp, 279 279 desc.error); 280 280 281 + if (skb_queue_empty_lockless(&sock->sk->sk_receive_queue) && 282 + wq_has_sleeper(&tc->t_recv_done_waitq)) 283 + wake_up(&tc->t_recv_done_waitq); 284 + 281 285 return desc.error; 282 286 } 283 287
+1 -1
net/rds/tcp_send.c
··· 169 169 * unacked byte of the TCP sequence space. We have to do very careful 170 170 * wrapping 32bit comparisons here. 171 171 */ 172 - static int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack) 172 + int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack) 173 173 { 174 174 if (!test_bit(RDS_MSG_HAS_ACK_SEQ, &rm->m_flags)) 175 175 return 0;