Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'af-xdp-tx-batch'

Magnus Karlsson says:

====================
This patch set mainly improves the performance of the Tx processing of
AF_XDP sockets, though patch 3 also improves the Rx path. All in all,
this patch set improves the throughput of the l2fwd xdpsock application
by around 11%. Looking at just the Tx processing part, it is improved
by 35% to 40%.

Hopefully the new batched Tx interfaces will be of value to other
drivers implementing AF_XDP zero-copy support. Patch #3, however, is
generic and will improve the performance of all drivers when using
AF_XDP sockets (under the conditions explained in that patch).

@Daniel. In patch 3, I apply all the padding required to keep the
adjacent cache line prefetcher from prefetching the wrong things. After
this patch set, I will submit another patch set that introduces
____cacheline_padding_in_smp in include/linux/cache.h according to your
suggestions. The last patch in that patch set will then convert the
explicit paddings that we have now to ____cacheline_padding_in_smp.

v2 -> v3:
* Fixed #pragma warning with clang and defined a loop_unrolled_for macro
for easier readability [lkp, Nick]
* Simplified invalid descriptor handling in xskq_cons_read_desc_batch()

v1 -> v2:
* Removed added parameter in i40e_setup_tx_descriptors and adopted a
simpler solution [Maciej]
* Added test for !xs in xsk_tx_peek_release_desc_batch() [John]
* Simplified return path in xsk_tx_peek_release_desc_batch() [John]
* Dropped patch #1 in v1 that introduced lazy completions. Hopefully
this is not needed when we get busy poll [Jakub]
* Iterate over local variable in xskq_prod_reserve_addr_batch() for
improved performance
* Fixed the fallback path in xsk_tx_peek_release_desc_batch() so that
it also produces a batch of descriptors, albeit by using the slower
(but more general) older code. This improves the performance of the
case when multiple sockets are sharing the same device and queue id.
====================

Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>

+256 -54
+11
drivers/net/ethernet/intel/i40e/i40e_txrx.c
··· 676 676 i40e_clean_tx_ring(tx_ring); 677 677 kfree(tx_ring->tx_bi); 678 678 tx_ring->tx_bi = NULL; 679 + kfree(tx_ring->xsk_descs); 680 + tx_ring->xsk_descs = NULL; 679 681 680 682 if (tx_ring->desc) { 681 683 dma_free_coherent(tx_ring->dev, tx_ring->size, ··· 1279 1277 if (!tx_ring->tx_bi) 1280 1278 goto err; 1281 1279 1280 + if (ring_is_xdp(tx_ring)) { 1281 + tx_ring->xsk_descs = kcalloc(I40E_MAX_NUM_DESCRIPTORS, sizeof(*tx_ring->xsk_descs), 1282 + GFP_KERNEL); 1283 + if (!tx_ring->xsk_descs) 1284 + goto err; 1285 + } 1286 + 1282 1287 u64_stats_init(&tx_ring->syncp); 1283 1288 1284 1289 /* round up to nearest 4K */ ··· 1309 1300 return 0; 1310 1301 1311 1302 err: 1303 + kfree(tx_ring->xsk_descs); 1304 + tx_ring->xsk_descs = NULL; 1312 1305 kfree(tx_ring->tx_bi); 1313 1306 tx_ring->tx_bi = NULL; 1314 1307 return -ENOMEM;
+1
drivers/net/ethernet/intel/i40e/i40e_txrx.h
··· 389 389 struct i40e_channel *ch; 390 390 struct xdp_rxq_info xdp_rxq; 391 391 struct xsk_buff_pool *xsk_pool; 392 + struct xdp_desc *xsk_descs; /* For storing descriptors in the AF_XDP ZC path */ 392 393 } ____cacheline_internodealigned_in_smp; 393 394 394 395 static inline bool ring_uses_build_skb(struct i40e_ring *ring)
+82 -37
drivers/net/ethernet/intel/i40e/i40e_xsk.c
··· 2 2 /* Copyright(c) 2018 Intel Corporation. */ 3 3 4 4 #include <linux/bpf_trace.h> 5 + #include <linux/stringify.h> 5 6 #include <net/xdp_sock_drv.h> 6 7 #include <net/xdp.h> 7 8 ··· 382 381 return failure ? budget : (int)total_rx_packets; 383 382 } 384 383 384 + static void i40e_xmit_pkt(struct i40e_ring *xdp_ring, struct xdp_desc *desc, 385 + unsigned int *total_bytes) 386 + { 387 + struct i40e_tx_desc *tx_desc; 388 + dma_addr_t dma; 389 + 390 + dma = xsk_buff_raw_get_dma(xdp_ring->xsk_pool, desc->addr); 391 + xsk_buff_raw_dma_sync_for_device(xdp_ring->xsk_pool, dma, desc->len); 392 + 393 + tx_desc = I40E_TX_DESC(xdp_ring, xdp_ring->next_to_use++); 394 + tx_desc->buffer_addr = cpu_to_le64(dma); 395 + tx_desc->cmd_type_offset_bsz = build_ctob(I40E_TX_DESC_CMD_ICRC | I40E_TX_DESC_CMD_EOP, 396 + 0, desc->len, 0); 397 + 398 + *total_bytes += desc->len; 399 + } 400 + 401 + static void i40e_xmit_pkt_batch(struct i40e_ring *xdp_ring, struct xdp_desc *desc, 402 + unsigned int *total_bytes) 403 + { 404 + u16 ntu = xdp_ring->next_to_use; 405 + struct i40e_tx_desc *tx_desc; 406 + dma_addr_t dma; 407 + u32 i; 408 + 409 + loop_unrolled_for(i = 0; i < PKTS_PER_BATCH; i++) { 410 + dma = xsk_buff_raw_get_dma(xdp_ring->xsk_pool, desc[i].addr); 411 + xsk_buff_raw_dma_sync_for_device(xdp_ring->xsk_pool, dma, desc[i].len); 412 + 413 + tx_desc = I40E_TX_DESC(xdp_ring, ntu++); 414 + tx_desc->buffer_addr = cpu_to_le64(dma); 415 + tx_desc->cmd_type_offset_bsz = build_ctob(I40E_TX_DESC_CMD_ICRC | 416 + I40E_TX_DESC_CMD_EOP, 417 + 0, desc[i].len, 0); 418 + 419 + *total_bytes += desc[i].len; 420 + } 421 + 422 + xdp_ring->next_to_use = ntu; 423 + } 424 + 425 + static void i40e_fill_tx_hw_ring(struct i40e_ring *xdp_ring, struct xdp_desc *descs, u32 nb_pkts, 426 + unsigned int *total_bytes) 427 + { 428 + u32 batched, leftover, i; 429 + 430 + batched = nb_pkts & ~(PKTS_PER_BATCH - 1); 431 + leftover = nb_pkts & (PKTS_PER_BATCH - 1); 432 + for (i = 0; i < batched; i += PKTS_PER_BATCH) 433 
+ i40e_xmit_pkt_batch(xdp_ring, &descs[i], total_bytes); 434 + for (i = batched; i < batched + leftover; i++) 435 + i40e_xmit_pkt(xdp_ring, &descs[i], total_bytes); 436 + } 437 + 438 + static void i40e_set_rs_bit(struct i40e_ring *xdp_ring) 439 + { 440 + u16 ntu = xdp_ring->next_to_use ? xdp_ring->next_to_use - 1 : xdp_ring->count - 1; 441 + struct i40e_tx_desc *tx_desc; 442 + 443 + tx_desc = I40E_TX_DESC(xdp_ring, ntu); 444 + tx_desc->cmd_type_offset_bsz |= (I40E_TX_DESC_CMD_RS << I40E_TXD_QW1_CMD_SHIFT); 445 + } 446 + 385 447 /** 386 448 * i40e_xmit_zc - Performs zero-copy Tx AF_XDP 387 449 * @xdp_ring: XDP Tx ring ··· 454 390 **/ 455 391 static bool i40e_xmit_zc(struct i40e_ring *xdp_ring, unsigned int budget) 456 392 { 457 - unsigned int sent_frames = 0, total_bytes = 0; 458 - struct i40e_tx_desc *tx_desc = NULL; 459 - struct i40e_tx_buffer *tx_bi; 460 - struct xdp_desc desc; 461 - dma_addr_t dma; 393 + struct xdp_desc *descs = xdp_ring->xsk_descs; 394 + u32 nb_pkts, nb_processed = 0; 395 + unsigned int total_bytes = 0; 462 396 463 - while (budget-- > 0) { 464 - if (!xsk_tx_peek_desc(xdp_ring->xsk_pool, &desc)) 465 - break; 397 + nb_pkts = xsk_tx_peek_release_desc_batch(xdp_ring->xsk_pool, descs, budget); 398 + if (!nb_pkts) 399 + return false; 466 400 467 - dma = xsk_buff_raw_get_dma(xdp_ring->xsk_pool, desc.addr); 468 - xsk_buff_raw_dma_sync_for_device(xdp_ring->xsk_pool, dma, 469 - desc.len); 470 - 471 - tx_bi = &xdp_ring->tx_bi[xdp_ring->next_to_use]; 472 - tx_bi->bytecount = desc.len; 473 - 474 - tx_desc = I40E_TX_DESC(xdp_ring, xdp_ring->next_to_use); 475 - tx_desc->buffer_addr = cpu_to_le64(dma); 476 - tx_desc->cmd_type_offset_bsz = 477 - build_ctob(I40E_TX_DESC_CMD_ICRC 478 - | I40E_TX_DESC_CMD_EOP, 479 - 0, desc.len, 0); 480 - 481 - sent_frames++; 482 - total_bytes += tx_bi->bytecount; 483 - 484 - xdp_ring->next_to_use++; 485 - if (xdp_ring->next_to_use == xdp_ring->count) 486 - xdp_ring->next_to_use = 0; 401 + if (xdp_ring->next_to_use + nb_pkts >= 
xdp_ring->count) { 402 + nb_processed = xdp_ring->count - xdp_ring->next_to_use; 403 + i40e_fill_tx_hw_ring(xdp_ring, descs, nb_processed, &total_bytes); 404 + xdp_ring->next_to_use = 0; 487 405 } 488 406 489 - if (tx_desc) { 490 - /* Request an interrupt for the last frame and bump tail ptr. */ 491 - tx_desc->cmd_type_offset_bsz |= (I40E_TX_DESC_CMD_RS << 492 - I40E_TXD_QW1_CMD_SHIFT); 493 - i40e_xdp_ring_update_tail(xdp_ring); 407 + i40e_fill_tx_hw_ring(xdp_ring, &descs[nb_processed], nb_pkts - nb_processed, 408 + &total_bytes); 494 409 495 - xsk_tx_release(xdp_ring->xsk_pool); 496 - i40e_update_tx_stats(xdp_ring, sent_frames, total_bytes); 497 - } 410 + /* Request an interrupt for the last frame and bump tail ptr. */ 411 + i40e_set_rs_bit(xdp_ring); 412 + i40e_xdp_ring_update_tail(xdp_ring); 498 413 499 - return !!budget; 414 + i40e_update_tx_stats(xdp_ring, nb_pkts, total_bytes); 415 + 416 + return true; 500 417 } 501 418 502 419 /**
+16
drivers/net/ethernet/intel/i40e/i40e_xsk.h
··· 4 4 #ifndef _I40E_XSK_H_ 5 5 #define _I40E_XSK_H_ 6 6 7 + /* This value should match the pragma in the loop_unrolled_for 8 + * macro. Why 4? It is strictly empirical. It seems to be a good 9 + * compromise between the advantage of having simultaneous outstanding 10 + * reads to the DMA array that can hide each others latency and the 11 + * disadvantage of having a larger code path. 12 + */ 13 + #define PKTS_PER_BATCH 4 14 + 15 + #ifdef __clang__ 16 + #define loop_unrolled_for _Pragma("clang loop unroll_count(4)") for 17 + #elif __GNUC__ >= 8 18 + #define loop_unrolled_for _Pragma("GCC unroll 4") for 19 + #else 20 + #define loop_unrolled_for for 21 + #endif 22 + 7 23 struct i40e_vsi; 8 24 struct xsk_buff_pool; 9 25 struct zero_copy_allocator;
+7
include/net/xdp_sock_drv.h
··· 13 13 14 14 void xsk_tx_completed(struct xsk_buff_pool *pool, u32 nb_entries); 15 15 bool xsk_tx_peek_desc(struct xsk_buff_pool *pool, struct xdp_desc *desc); 16 + u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *desc, u32 max); 16 17 void xsk_tx_release(struct xsk_buff_pool *pool); 17 18 struct xsk_buff_pool *xsk_get_pool_from_qid(struct net_device *dev, 18 19 u16 queue_id); ··· 127 126 struct xdp_desc *desc) 128 127 { 129 128 return false; 129 + } 130 + 131 + static inline u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *desc, 132 + u32 max) 133 + { 134 + return 0; 130 135 } 131 136 132 137 static inline void xsk_tx_release(struct xsk_buff_pool *pool)
+57
net/xdp/xsk.c
··· 332 332 } 333 333 EXPORT_SYMBOL(xsk_tx_peek_desc); 334 334 335 + static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, struct xdp_desc *descs, 336 + u32 max_entries) 337 + { 338 + u32 nb_pkts = 0; 339 + 340 + while (nb_pkts < max_entries && xsk_tx_peek_desc(pool, &descs[nb_pkts])) 341 + nb_pkts++; 342 + 343 + xsk_tx_release(pool); 344 + return nb_pkts; 345 + } 346 + 347 + u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *descs, 348 + u32 max_entries) 349 + { 350 + struct xdp_sock *xs; 351 + u32 nb_pkts; 352 + 353 + rcu_read_lock(); 354 + if (!list_is_singular(&pool->xsk_tx_list)) { 355 + /* Fallback to the non-batched version */ 356 + rcu_read_unlock(); 357 + return xsk_tx_peek_release_fallback(pool, descs, max_entries); 358 + } 359 + 360 + xs = list_first_or_null_rcu(&pool->xsk_tx_list, struct xdp_sock, tx_list); 361 + if (!xs) { 362 + nb_pkts = 0; 363 + goto out; 364 + } 365 + 366 + nb_pkts = xskq_cons_peek_desc_batch(xs->tx, descs, pool, max_entries); 367 + if (!nb_pkts) { 368 + xs->tx->queue_empty_descs++; 369 + goto out; 370 + } 371 + 372 + /* This is the backpressure mechanism for the Tx path. Try to 373 + * reserve space in the completion queue for all packets, but 374 + * if there are fewer slots available, just process that many 375 + * packets. This avoids having to implement any buffering in 376 + * the Tx path. 377 + */ 378 + nb_pkts = xskq_prod_reserve_addr_batch(pool->cq, descs, nb_pkts); 379 + if (!nb_pkts) 380 + goto out; 381 + 382 + xskq_cons_release_n(xs->tx, nb_pkts); 383 + __xskq_cons_release(xs->tx); 384 + xs->sk.sk_write_space(&xs->sk); 385 + 386 + out: 387 + rcu_read_unlock(); 388 + return nb_pkts; 389 + } 390 + EXPORT_SYMBOL(xsk_tx_peek_release_desc_batch); 391 + 335 392 static int xsk_wakeup(struct xdp_sock *xs, u8 flags) 336 393 { 337 394 struct net_device *dev = xs->dev;
+79 -14
net/xdp/xsk_queue.h
··· 18 18 /* Hinder the adjacent cache prefetcher to prefetch the consumer 19 19 * pointer if the producer pointer is touched and vice versa. 20 20 */ 21 - u32 pad ____cacheline_aligned_in_smp; 21 + u32 pad1 ____cacheline_aligned_in_smp; 22 22 u32 consumer ____cacheline_aligned_in_smp; 23 + u32 pad2 ____cacheline_aligned_in_smp; 23 24 u32 flags; 25 + u32 pad3 ____cacheline_aligned_in_smp; 24 26 }; 25 27 26 28 /* Used for the RX and TX queues for packets */ ··· 199 197 return false; 200 198 } 201 199 200 + static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, 201 + struct xdp_desc *descs, 202 + struct xsk_buff_pool *pool, u32 max) 203 + { 204 + u32 cached_cons = q->cached_cons, nb_entries = 0; 205 + 206 + while (cached_cons != q->cached_prod && nb_entries < max) { 207 + struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; 208 + u32 idx = cached_cons & q->ring_mask; 209 + 210 + descs[nb_entries] = ring->desc[idx]; 211 + if (unlikely(!xskq_cons_is_valid_desc(q, &descs[nb_entries], pool))) { 212 + /* Skip the entry */ 213 + cached_cons++; 214 + continue; 215 + } 216 + 217 + nb_entries++; 218 + cached_cons++; 219 + } 220 + 221 + return nb_entries; 222 + } 223 + 202 224 /* Functions for consumers */ 203 225 204 226 static inline void __xskq_cons_release(struct xsk_queue *q) ··· 244 218 __xskq_cons_peek(q); 245 219 } 246 220 247 - static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt) 221 + static inline u32 xskq_cons_nb_entries(struct xsk_queue *q, u32 max) 248 222 { 249 223 u32 entries = q->cached_prod - q->cached_cons; 250 224 251 - if (entries >= cnt) 252 - return true; 225 + if (entries >= max) 226 + return max; 253 227 254 228 __xskq_cons_peek(q); 255 229 entries = q->cached_prod - q->cached_cons; 256 230 257 - return entries >= cnt; 231 + return entries >= max ? max : entries; 232 + } 233 + 234 + static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt) 235 + { 236 + return xskq_cons_nb_entries(q, cnt) >= cnt ? 
true : false; 258 237 } 259 238 260 239 static inline bool xskq_cons_peek_addr_unchecked(struct xsk_queue *q, u64 *addr) ··· 278 247 return xskq_cons_read_desc(q, desc, pool); 279 248 } 280 249 250 + static inline u32 xskq_cons_peek_desc_batch(struct xsk_queue *q, struct xdp_desc *descs, 251 + struct xsk_buff_pool *pool, u32 max) 252 + { 253 + u32 entries = xskq_cons_nb_entries(q, max); 254 + 255 + return xskq_cons_read_desc_batch(q, descs, pool, entries); 256 + } 257 + 258 + /* To improve performance in the xskq_cons_release functions, only update local state here. 259 + * Reflect this to global state when we get new entries from the ring in 260 + * xskq_cons_get_entries() and whenever Rx or Tx processing are completed in the NAPI loop. 261 + */ 281 262 static inline void xskq_cons_release(struct xsk_queue *q) 282 263 { 283 - /* To improve performance, only update local state here. 284 - * Reflect this to global state when we get new entries 285 - * from the ring in xskq_cons_get_entries() and whenever 286 - * Rx or Tx processing are completed in the NAPI loop. 287 - */ 288 264 q->cached_cons++; 265 + } 266 + 267 + static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt) 268 + { 269 + q->cached_cons += cnt; 289 270 } 290 271 291 272 static inline bool xskq_cons_is_full(struct xsk_queue *q) ··· 309 266 310 267 /* Functions for producers */ 311 268 312 - static inline bool xskq_prod_is_full(struct xsk_queue *q) 269 + static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max) 313 270 { 314 271 u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons); 315 272 316 - if (free_entries) 317 - return false; 273 + if (free_entries >= max) 274 + return max; 318 275 319 276 /* Refresh the local tail pointer */ 320 277 q->cached_cons = READ_ONCE(q->ring->consumer); 321 278 free_entries = q->nentries - (q->cached_prod - q->cached_cons); 322 279 323 - return !free_entries; 280 + return free_entries >= max ? 
max : free_entries; 281 + } 282 + 283 + static inline bool xskq_prod_is_full(struct xsk_queue *q) 284 + { 285 + return xskq_prod_nb_free(q, 1) ? false : true; 324 286 } 325 287 326 288 static inline int xskq_prod_reserve(struct xsk_queue *q) ··· 348 300 /* A, matches D */ 349 301 ring->desc[q->cached_prod++ & q->ring_mask] = addr; 350 302 return 0; 303 + } 304 + 305 + static inline u32 xskq_prod_reserve_addr_batch(struct xsk_queue *q, struct xdp_desc *descs, 306 + u32 max) 307 + { 308 + struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; 309 + u32 nb_entries, i, cached_prod; 310 + 311 + nb_entries = xskq_prod_nb_free(q, max); 312 + 313 + /* A, matches D */ 314 + cached_prod = q->cached_prod; 315 + for (i = 0; i < nb_entries; i++) 316 + ring->desc[cached_prod++ & q->ring_mask] = descs[i].addr; 317 + q->cached_prod = cached_prod; 318 + 319 + return nb_entries; 351 320 } 352 321 353 322 static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
+3 -3
samples/bpf/xdpsock_user.c
··· 1146 1146 xsk_ring_prod__submit(&xsk->umem->fq, rcvd); 1147 1147 xsk_ring_cons__release(&xsk->umem->cq, rcvd); 1148 1148 xsk->outstanding_tx -= rcvd; 1149 - xsk->ring_stats.tx_npkts += rcvd; 1150 1149 } 1151 1150 } 1152 1151 ··· 1167 1168 if (rcvd > 0) { 1168 1169 xsk_ring_cons__release(&xsk->umem->cq, rcvd); 1169 1170 xsk->outstanding_tx -= rcvd; 1170 - xsk->ring_stats.tx_npkts += rcvd; 1171 1171 } 1172 1172 } 1173 1173 ··· 1258 1260 } 1259 1261 1260 1262 xsk_ring_prod__submit(&xsk->tx, batch_size); 1263 + xsk->ring_stats.tx_npkts += batch_size; 1261 1264 xsk->outstanding_tx += batch_size; 1262 1265 *frame_nb += batch_size; 1263 1266 *frame_nb %= NUM_FRAMES; ··· 1347 1348 } 1348 1349 return; 1349 1350 } 1351 + xsk->ring_stats.rx_npkts += rcvd; 1350 1352 1351 1353 ret = xsk_ring_prod__reserve(&xsk->tx, rcvd, &idx_tx); 1352 1354 while (ret != rcvd) { ··· 1379 1379 xsk_ring_prod__submit(&xsk->tx, rcvd); 1380 1380 xsk_ring_cons__release(&xsk->rx, rcvd); 1381 1381 1382 - xsk->ring_stats.rx_npkts += rcvd; 1382 + xsk->ring_stats.tx_npkts += rcvd; 1383 1383 xsk->outstanding_tx += rcvd; 1384 1384 } 1385 1385