Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4"""Test USO
5
6Sends large UDP datagrams with UDP_SEGMENT and verifies that the peer
7receives the expected total payload and that the NIC transmitted at least
8the expected number of segments.
9"""
10import random
11import socket
12import string
13
14from lib.py import ksft_run, ksft_exit, KsftSkipEx
15from lib.py import ksft_eq, ksft_ge, ksft_variants, KsftNamedVariant
16from lib.py import NetDrvEpEnv
17from lib.py import bkg, defer, ethtool, ip, rand_port, wait_port_listen
18
19# python doesn't expose this constant, so we need to hardcode it to enable UDP
20# segmentation for large payloads
21UDP_SEGMENT = 103
22
23
24def _send_uso(cfg, ipver, mss, total_payload, port):
25 if ipver == "4":
26 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
27 dst = (cfg.remote_addr_v["4"], port)
28 else:
29 sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
30 dst = (cfg.remote_addr_v["6"], port)
31
32 sock.setsockopt(socket.IPPROTO_UDP, UDP_SEGMENT, mss)
33 payload = ''.join(random.choice(string.ascii_lowercase)
34 for _ in range(total_payload))
35 sock.sendto(payload.encode(), dst)
36 sock.close()
37
38
39def _get_tx_packets(cfg):
40 stats = ip(f"-s link show dev {cfg.ifname}", json=True)[0]
41 return stats['stats64']['tx']['packets']
42
43
44def _test_uso(cfg, ipver, mss, total_payload):
45 cfg.require_ipver(ipver)
46 cfg.require_cmd("socat", remote=True)
47
48 features = ethtool(f"-k {cfg.ifname}", json=True)
49 uso_was_on = features[0]["tx-udp-segmentation"]["active"]
50
51 try:
52 ethtool(f"-K {cfg.ifname} tx-udp-segmentation on")
53 except Exception as exc:
54 raise KsftSkipEx(
55 "Device does not support tx-udp-segmentation") from exc
56 if not uso_was_on:
57 defer(ethtool, f"-K {cfg.ifname} tx-udp-segmentation off")
58
59 expected_segs = (total_payload + mss - 1) // mss
60
61 port = rand_port(stype=socket.SOCK_DGRAM)
62 rx_cmd = f"socat -{ipver} -T 2 -u UDP-LISTEN:{port},reuseport STDOUT"
63
64 tx_before = _get_tx_packets(cfg)
65
66 with bkg(rx_cmd, host=cfg.remote, exit_wait=True) as rx:
67 wait_port_listen(port, proto="udp", host=cfg.remote)
68 _send_uso(cfg, ipver, mss, total_payload, port)
69
70 ksft_eq(len(rx.stdout), total_payload,
71 comment=f"Received {len(rx.stdout)}B, expected {total_payload}B")
72
73 cfg.wait_hw_stats_settle()
74
75 tx_after = _get_tx_packets(cfg)
76 tx_delta = tx_after - tx_before
77
78 ksft_ge(tx_delta, expected_segs,
79 comment=f"Expected >= {expected_segs} tx packets, got {tx_delta}")
80
81
82def _uso_variants():
83 for ipver in ["4", "6"]:
84 yield KsftNamedVariant(f"v{ipver}_partial", ipver, 1400, 1400 * 10 + 500)
85 yield KsftNamedVariant(f"v{ipver}_exact", ipver, 1400, 1400 * 5)
86
87
88@ksft_variants(_uso_variants())
89def test_uso(cfg, ipver, mss, total_payload):
90 """Send a USO datagram and verify the peer receives the expected segments."""
91 _test_uso(cfg, ipver, mss, total_payload)
92
93
94def main() -> None:
95 """Run USO tests."""
96 with NetDrvEpEnv(__file__) as cfg:
97 ksft_run([test_uso],
98 args=(cfg, ))
99 ksft_exit()
100
101
102if __name__ == "__main__":
103 main()