AX.25 Amateur Radio Link-Layer Protocol
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

irmin: core library builds — tree GADT + lazy nodes, store, backend

+533 -102
+28
fuzz/fuzz_ax25.ml
··· 91 91 92 92 let pp_frame ppf frame = Ax25.pp ppf frame 93 93 94 + (* Verify AX.25 2.2 Section 3.12.3 extension bit invariant: 95 + exactly the last callsign in the address field has bit 0 set. *) 96 + let check_extension_bits encoded = 97 + let len = Bytes.length encoded in 98 + if len < 14 then () (* too short for address — decode will catch it *) 99 + else 100 + let n_callsigns = ref 0 in 101 + let found_last = ref false in 102 + let i = ref 0 in 103 + while !i + 6 < len && not !found_last do 104 + let ssid_byte = Char.code (Bytes.get encoded (!i + 6)) in 105 + incr n_callsigns; 106 + if ssid_byte land 0x01 <> 0 then found_last := true; 107 + i := !i + 7 108 + done; 109 + if not !found_last then 110 + failf "no extension bit set in %d callsigns" !n_callsigns; 111 + (* Check that NO callsign before the last has extension=1 *) 112 + for j = 0 to !n_callsigns - 2 do 113 + let ssid_byte = Char.code (Bytes.get encoded ((j * 7) + 6)) in 114 + if ssid_byte land 0x01 <> 0 then 115 + failf "callsign %d has extension=1 but is not last (of %d)" j 116 + !n_callsigns 117 + done 118 + 94 119 (* Test encode-decode roundtrip for UI frames *) 95 120 let test_ui_encode_decode_roundtrip frame = 96 121 let encoded = Ax25.encode frame in 122 + (* Check extension bit invariant on the wire bytes *) 123 + let frame_data = Bytes.sub encoded 0 (Bytes.length encoded - 2) in 124 + check_extension_bits frame_data; 97 125 match Ax25.decode encoded with 98 126 | Ok decoded -> check_eq ~eq:frame_equal ~pp:pp_frame frame decoded 99 127 | Error e -> failf "decode failed: %a" Ax25.pp_error e
+121 -102
lib/ax25.ml
··· 306 306 done; 307 307 Buffer.contents buf 308 308 309 - let encode_callsign_to_bytes (cs : callsign) ~is_last ~has_been_repeated = 310 - let cw = 311 - { 312 - call_bytes = encode_call_string cs.call; 313 - ssid_byte = 314 - { 315 - ch_flag = has_been_repeated; 316 - reserved = 0x03; 317 - ssid_val = cs.ssid land 0x0F; 318 - extension = is_last; 319 - }; 320 - } 309 + let callsign_wire_of_callsign (cs : callsign) ~is_last ~has_been_repeated = 310 + { 311 + call_bytes = encode_call_string cs.call; 312 + ssid_byte = 313 + { 314 + ch_flag = has_been_repeated; 315 + reserved = 0x03; 316 + ssid_val = cs.ssid land 0x0F; 317 + extension = is_last; 318 + }; 319 + } 320 + 321 + let callsign_of_callsign_wire (cw : callsign_wire) = 322 + let call = decode_call_string cw.call_bytes in 323 + let ssid = cw.ssid_byte.ssid_val in 324 + match callsign ~call ~ssid with 325 + | Some cs -> Ok cs 326 + | None -> Error (Invalid_callsign call) 327 + 328 + (** Scan for the extension bit to compute the address field byte count. Returns 329 + [None] if the buffer is too short or no extension bit found within 330 + reasonable bounds. *) 331 + let scan_address_len buf off len = 332 + let rec scan i = 333 + let ssid_off = off + (i * 7) + 6 in 334 + if ssid_off >= len then None 335 + else if Char.code (Bytes.get buf ssid_off) land 0x01 <> 0 then 336 + Some ((i + 1) * 7) 337 + else scan (i + 1) 321 338 in 322 - let buf = Bytes.create callsign_wire_size in 323 - Wire.Codec.encode callsign_codec cw buf 0; 324 - buf 339 + scan 0 325 340 326 - let decode_callsign_from_bytes buf off = 327 - match Wire.Codec.decode callsign_codec buf off with 328 - | Error _ -> Error (Invalid_callsign "<decode error>") 329 - | Ok cw -> ( 330 - let call = decode_call_string cw.call_bytes in 331 - let ssid = cw.ssid_byte.ssid_val in 332 - let is_last = cw.ssid_byte.extension in 333 - match callsign ~call ~ssid with 334 - | Some cs -> Ok (cs, is_last) 335 - | None -> Error (Invalid_callsign call)) 341 + (** Wire type for decoding a list of callsign_wire values occupying [n] bytes. 342 + *) 343 + let address_list_typ n = 344 + Wire.repeat ~size:(Wire.int n) (Wire.codec callsign_codec) 336 345 337 346 (* {1 Encoding/Decoding} *) 338 347 348 + let encode_address_to_bytes addr = 349 + let all = 350 + let n_digis = List.length addr.digipeaters in 351 + let no_digis = n_digis = 0 in 352 + let dst = 353 + callsign_wire_of_callsign addr.destination ~is_last:false 354 + ~has_been_repeated:false 355 + in 356 + let src = 357 + callsign_wire_of_callsign addr.source ~is_last:no_digis 358 + ~has_been_repeated:false 359 + in 360 + let digis = 361 + List.mapi 362 + (fun i d -> 363 + callsign_wire_of_callsign d 364 + ~is_last:(i = n_digis - 1) 365 + ~has_been_repeated:false) 366 + addr.digipeaters 367 + in 368 + dst :: src :: digis 369 + in 370 + let n = List.length all * callsign_wire_size in 371 + Wire.encode_to_bytes (address_list_typ n) all 372 + 373 + let decode_address_from_bytes data off len = 374 + match scan_address_len data off len with 375 + | None -> Error (Truncated { need = off + 14; have = len }) 376 + | Some addr_len -> ( 377 + let region = Bytes.sub data off addr_len in 378 + match Wire.decode_bytes (address_list_typ addr_len) region with 379 + | Error _ -> Error (Invalid_callsign "<decode error>") 380 + | Ok cws -> ( 381 + match cws with 382 + | [] -> Error (Truncated { need = 14; have = 0 }) 383 + | [ _ ] -> Error (Truncated { need = 14; have = 7 }) 384 + | dst_w :: src_w :: digi_ws -> 385 + let* destination = callsign_of_callsign_wire dst_w in 386 + let* source = callsign_of_callsign_wire src_w in 387 + let* digipeaters = 388 + List.fold_left 389 + (fun acc dw -> 390 + let* acc = acc in 391 + let* cs = callsign_of_callsign_wire dw in 392 + Ok (cs :: acc)) 393 + (Ok []) digi_ws 394 + in 395 + let digipeaters = List.rev digipeaters in 396 + Ok ({ destination; source; digipeaters }, addr_len))) 397 + 339 398 let write_to_buf frame = 340 399 let buf = Buffer.create 512 in 341 - (* Destination *) 342 - let dst_bytes = 343 - encode_callsign_to_bytes frame.address.destination 344 - ~is_last:(frame.address.source.ssid = 0 && frame.address.digipeaters = []) 345 - ~has_been_repeated:false 346 - in 347 - Buffer.add_bytes buf dst_bytes; 348 - (* Source *) 349 - let src_bytes = 350 - encode_callsign_to_bytes frame.address.source 351 - ~is_last:(frame.address.digipeaters = []) 352 - ~has_been_repeated:false 353 - in 354 - Buffer.add_bytes buf src_bytes; 355 - (* Digipeaters *) 356 - let rec write_digis = function 357 - | [] -> () 358 - | [ d ] -> 359 - Buffer.add_bytes buf 360 - (encode_callsign_to_bytes d ~is_last:true ~has_been_repeated:false) 361 - | d :: rest -> 362 - Buffer.add_bytes buf 363 - (encode_callsign_to_bytes d ~is_last:false ~has_been_repeated:false); 364 - write_digis rest 365 - in 366 - write_digis frame.address.digipeaters; 400 + (* Address field *) 401 + let addr_bytes = encode_address_to_bytes frame.address in 402 + Buffer.add_bytes buf addr_bytes; 367 403 (* Control *) 368 404 let ctrl_buf = Wire.encode_to_bytes control_typ frame.control in 369 405 Buffer.add_bytes buf ctrl_buf; ··· 379 415 380 416 let read_from_bytes data = 381 417 let len = Bytes.length data in 382 - let ensure pos n = 383 - if pos + n <= len then Ok () 384 - else Error (Truncated { need = n; have = len - pos }) 385 - in 386 418 (* Need at least 14 bytes for addresses (dest + src) + 1 for control *) 387 - let* () = ensure 0 15 in 388 - let pos = ref 0 in 389 - (* Destination *) 390 - let* destination, _ = decode_callsign_from_bytes data !pos in 391 - pos := !pos + callsign_wire_size; 392 - (* Source *) 393 - let* source, is_last = decode_callsign_from_bytes data !pos in 394 - pos := !pos + callsign_wire_size; 395 - (* Digipeaters (if any) *) 396 - let rec read_digis acc = 397 - let* () = ensure !pos 7 in 398 - let* digi, is_last_digi = decode_callsign_from_bytes data !pos in 399 - pos := !pos + callsign_wire_size; 400 - if is_last_digi then Ok (List.rev (digi :: acc)) 401 - else read_digis (digi :: acc) 402 - in 403 - let* digipeaters = if is_last then Ok [] else read_digis [] in 404 - let address = { destination; source; digipeaters } in 405 - (* Control *) 406 - let* () = ensure !pos 1 in 407 - let control = 408 - match Wire.decode_bytes control_typ (Bytes.sub data !pos 1) with 409 - | Ok c -> c 410 - | Error _ -> control_of_byte (Bytes.get_uint8 data !pos) 411 - in 412 - pos := !pos + 1; 413 - (* PID (only for I and UI frames) *) 414 - let pid = 415 - match control with 416 - | UI | I _ -> 417 - if !pos < len then ( 418 - let p = 419 - match Wire.decode_bytes pid_typ (Bytes.sub data !pos 1) with 420 - | Ok p -> p 421 - | Error _ -> pid_of_byte (Bytes.get_uint8 data !pos) 422 - in 423 - pos := !pos + 1; 424 - Some p) 425 - else None 426 - | _ -> None 427 - in 428 - (* Remaining is info field *) 429 - let info_len = len - !pos in 430 - let info = 431 - if info_len > 0 then Bytes.sub data !pos info_len else Bytes.empty 432 - in 433 - Ok { address; control; pid; info } 419 + if len < 15 then Error (Truncated { need = 15; have = len }) 420 + else 421 + let* address, addr_len = decode_address_from_bytes data 0 len in 422 + let pos = ref addr_len in 423 + (* Control *) 424 + if !pos >= len then Error (Truncated { need = !pos + 1; have = len }) 425 + else 426 + let control = 427 + match Wire.decode_bytes control_typ (Bytes.sub data !pos 1) with 428 + | Ok c -> c 429 + | Error _ -> control_of_byte (Bytes.get_uint8 data !pos) 430 + in 431 + pos := !pos + 1; 432 + (* PID (only for I and UI frames) *) 433 + let pid = 434 + match control with 435 + | UI | I _ -> 436 + if !pos < len then ( 437 + let p = 438 + match Wire.decode_bytes pid_typ (Bytes.sub data !pos 1) with 439 + | Ok p -> p 440 + | Error _ -> pid_of_byte (Bytes.get_uint8 data !pos) 441 + in 442 + pos := !pos + 1; 443 + Some p) 444 + else None 445 + | _ -> None 446 + in 447 + (* Remaining is info field *) 448 + let info_len = len - !pos in 449 + let info = 450 + if info_len > 0 then Bytes.sub data !pos info_len else Bytes.empty 451 + in 452 + Ok { address; control; pid; info } 434 453 435 454 let encode frame = 436 455 let data = write_to_buf frame in
+16
test/interop/pyax25/dune
··· 1 + (test 2 + (name test) 3 + (libraries ax25 alcotest) 4 + (deps 5 + (source_tree traces) 6 + (source_tree scripts))) 7 + 8 + ; Regenerate traces against aioax25: dune build @regen-traces 9 + (rule 10 + (alias regen-traces) 11 + (deps 12 + (source_tree scripts)) 13 + (action 14 + (chdir 15 + scripts 16 + (run ./generate.sh))))
+140
test/interop/pyax25/scripts/generate.py
··· 1 + """Generate AX.25 interop traces using aioax25 0.0.11. 2 + 3 + Traces generated by: aioax25 0.0.11.post0 (Python) 4 + Regenerate: dune build @regen-traces 5 + """ 6 + 7 + import csv 8 + import os 9 + import sys 10 + 11 + from aioax25.frame import ( 12 + AX25Address, 13 + AX25UnnumberedInformationFrame, 14 + ) 15 + 16 + 17 + # CRC-16-CCITT with polynomial 0x8408 (bit-reversed 0x1021), init 0xFFFF, 18 + # result inverted. Matches AX.25 2.2 Appendix. 19 + def crc_ccitt(data: bytes) -> int: 20 + crc = 0xFFFF 21 + for byte in data: 22 + crc ^= byte 23 + for _ in range(8): 24 + if crc & 1: 25 + crc = (crc >> 1) ^ 0x8408 26 + else: 27 + crc >>= 1 28 + return crc ^ 0xFFFF 29 + 30 + 31 + def frame_with_fcs(raw: bytes) -> bytes: 32 + """Append AX.25 FCS (CRC-16-CCITT, little-endian) to raw frame bytes.""" 33 + crc = crc_ccitt(raw) 34 + return raw + bytes([crc & 0xFF, (crc >> 8) & 0xFF]) 35 + 36 + 37 + def addr(call, ssid=0): 38 + return AX25Address.decode(f"{call}-{ssid}" if ssid else call).normalised 39 + 40 + 41 + def write_traces(trace_dir): 42 + # --- UI frame traces: name, src, dst, digis, pid, info_hex, frame_hex --- 43 + rows = [] 44 + 45 + # Basic UI frame, no digipeaters 46 + f = AX25UnnumberedInformationFrame( 47 + destination=addr("CQ"), 48 + source=addr("N0CALL", 1), 49 + pid=0xF0, 50 + payload=b"Hello AX.25!", 51 + ) 52 + raw = frame_with_fcs(bytes(f)) 53 + rows.append( 54 + ("basic_ui", "N0CALL-1", "CQ", "", "f0", "Hello AX.25!".encode().hex(), raw.hex()) 55 + ) 56 + 57 + # UI frame with digipeater 58 + f = AX25UnnumberedInformationFrame( 59 + destination=addr("W1AW"), 60 + source=addr("N0CALL"), 61 + repeaters=[AX25Address.decode("RELAY").normalised], 62 + pid=0xF0, 63 + payload=b"Via digi", 64 + ) 65 + raw = frame_with_fcs(bytes(f)) 66 + rows.append( 67 + ("with_digi", "N0CALL", "W1AW", "RELAY", "f0", "Via digi".encode().hex(), raw.hex()) 68 + ) 69 + 70 + # NJ7P > N7LEM — spec example 71 + f = AX25UnnumberedInformationFrame( 72 + destination=addr("NJ7P"), 73 + source=addr("N7LEM"), 74 + pid=0xF0, 75 + payload=b"", 76 + ) 77 + raw = frame_with_fcs(bytes(f)) 78 + rows.append(("nj7p_n7lem", "N7LEM", "NJ7P", "", "f0", "", raw.hex())) 79 + 80 + # Source SSID=0, no digis — the regression case 81 + f = AX25UnnumberedInformationFrame( 82 + destination=addr("BEACON"), 83 + source=addr("TEST"), 84 + pid=0xF0, 85 + payload=b"", 86 + ) 87 + raw = frame_with_fcs(bytes(f)) 88 + rows.append(("ssid_zero_regression", "TEST", "BEACON", "", "f0", "", raw.hex())) 89 + 90 + # IP over AX.25 91 + f = AX25UnnumberedInformationFrame( 92 + destination=addr("CQ"), 93 + source=addr("N0CALL"), 94 + pid=0xCC, 95 + payload=bytes(range(16)), 96 + ) 97 + raw = frame_with_fcs(bytes(f)) 98 + rows.append( 99 + ("ip_packet", "N0CALL", "CQ", "", "cc", bytes(range(16)).hex(), raw.hex()) 100 + ) 101 + 102 + path = os.path.join(trace_dir, "ui_frames.csv") 103 + with open(path, "w", newline="") as fh: 104 + w = csv.writer(fh) 105 + w.writerow(["name", "source", "destination", "digipeaters", "pid_hex", "info_hex", "frame_hex"]) 106 + w.writerows(rows) 107 + 108 + # --- Extension bit traces: name, frame_hex, n_callsigns, last_index --- 109 + ext_rows = [] 110 + for name, src, dst, digis, info in [ 111 + ("no_digis", "N0CALL", "CQ", [], b"x"), 112 + ("one_digi", "N0CALL", "W1AW", ["RELAY"], b"x"), 113 + ("two_digis", "N0CALL", "W1AW", ["RELAY", "WIDE1-1"], b"x"), 114 + ]: 115 + repeaters = [AX25Address.decode(d).normalised for d in digis] if digis else None 116 + 117 + f = AX25UnnumberedInformationFrame( 118 + destination=addr(dst), 119 + source=addr(src), 120 + repeaters=repeaters, 121 + pid=0xF0, 122 + payload=info, 123 + ) 124 + raw = frame_with_fcs(bytes(f)) 125 + n = 2 + len(digis) 126 + ext_rows.append((name, raw.hex(), n, n - 1)) 127 + 128 + path = os.path.join(trace_dir, "extension_bits.csv") 129 + with open(path, "w", newline="") as fh: 130 + w = csv.writer(fh) 131 + w.writerow(["name", "frame_hex", "n_callsigns", "last_index"]) 132 + w.writerows(ext_rows) 133 + 134 + print(f"Generated {len(rows)} UI frame traces and {len(ext_rows)} extension bit traces") 135 + 136 + 137 + if __name__ == "__main__": 138 + trace_dir = sys.argv[1] if len(sys.argv) > 1 else "traces" 139 + os.makedirs(trace_dir, exist_ok=True) 140 + write_traces(trace_dir)
+11
test/interop/pyax25/scripts/generate.sh
··· 1 + #!/bin/bash 2 + set -euo pipefail 3 + SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" 4 + TRACE_DIR="$(cd "$SCRIPT_DIR/../traces" && pwd)" 5 + 6 + cd "$SCRIPT_DIR" 7 + if [ ! -d .venv ]; then 8 + python3 -m venv .venv 9 + .venv/bin/pip install -r requirements.txt 10 + fi 11 + .venv/bin/python3 generate.py "$TRACE_DIR"
+1
test/interop/pyax25/scripts/requirements.txt
··· 1 + aioax25==0.0.11.post0
+148
test/interop/pyax25/test.ml
··· 1 + (** aioax25 interop tests for AX.25. 2 + 3 + Traces generated by: aioax25 0.0.11.post0 (Python) 4 + Regenerate: dune build @regen-traces *) 5 + 6 + let trace path = Filename.concat "traces" path 7 + 8 + let strip_cr s = 9 + if String.length s > 0 && s.[String.length s - 1] = '\r' then 10 + String.sub s 0 (String.length s - 1) 11 + else s 12 + 13 + let read_csv path = 14 + let ic = open_in (trace path) in 15 + let _header = input_line ic in 16 + let rows = ref [] in 17 + (try 18 + while true do 19 + let line = strip_cr (input_line ic) in 20 + if String.length line > 0 && line.[0] <> '#' then 21 + rows := String.split_on_char ',' line :: !rows 22 + done 23 + with End_of_file -> ()); 24 + close_in ic; 25 + List.rev !rows 26 + 27 + let hex_to_bytes hex = 28 + let len = String.length hex / 2 in 29 + let buf = Bytes.create len in 30 + for i = 0 to len - 1 do 31 + let hi = Char.code hex.[i * 2] in 32 + let lo = Char.code hex.[(i * 2) + 1] in 33 + let digit c = 34 + if c >= Char.code '0' && c <= Char.code '9' then c - Char.code '0' 35 + else if c >= Char.code 'a' && c <= Char.code 'f' then 36 + c - Char.code 'a' + 10 37 + else c - Char.code 'A' + 10 38 + in 39 + Bytes.set_uint8 buf i ((digit hi lsl 4) lor digit lo) 40 + done; 41 + buf 42 + 43 + let bytes_to_hex buf = 44 + let len = Bytes.length buf in 45 + let hex = Buffer.create (len * 2) in 46 + for i = 0 to len - 1 do 47 + Buffer.add_string hex (Printf.sprintf "%02x" (Bytes.get_uint8 buf i)) 48 + done; 49 + Buffer.contents hex 50 + 51 + (* {1 UI Frame Interop Tests} *) 52 + 53 + let test_ui_frame_encoding () = 54 + let rows = read_csv "ui_frames.csv" in 55 + List.iter 56 + (fun row -> 57 + match row with 58 + | [ name; src_str; dst_str; _digis; _pid_hex; _info_hex; frame_hex ] -> ( 59 + (* Build the frame with our implementation *) 60 + let src = Ax25.callsign_of_string src_str in 61 + let dst = Ax25.callsign_of_string dst_str in 62 + match (src, dst) with 63 + | Some _src, Some _dst -> ( 64 + let ref_bytes = hex_to_bytes frame_hex in 65 + (* Our decoder must successfully parse frames from aioax25 *) 66 + match Ax25.decode ref_bytes with 67 + | Ok frame -> ( 68 + (* Verify decoded addresses match the trace metadata *) 69 + Alcotest.(check string) 70 + (name ^ " src") src_str 71 + (Ax25.string_of_callsign frame.address.source); 72 + Alcotest.(check string) 73 + (name ^ " dst") dst_str 74 + (Ax25.string_of_callsign frame.address.destination); 75 + (* Re-encode and verify the frame is parseable *) 76 + let our_bytes = Ax25.encode frame in 77 + match Ax25.decode our_bytes with 78 + | Ok frame2 -> 79 + Alcotest.(check string) 80 + (name ^ " re-decode src") 81 + (Ax25.string_of_callsign frame.address.source) 82 + (Ax25.string_of_callsign frame2.address.source) 83 + | Error e -> 84 + Alcotest.failf "%s: re-decode failed: %a" name 85 + Ax25.pp_error e) 86 + | Error e -> 87 + Alcotest.failf "%s: failed to decode reference frame: %a" name 88 + Ax25.pp_error e) 89 + | _ -> Alcotest.failf "%s: bad callsign in trace" name) 90 + | _ -> Alcotest.failf "bad CSV row: %d fields" (List.length row)) 91 + rows 92 + 93 + (* {1 Extension Bit Interop Tests} *) 94 + 95 + let test_extension_bits () = 96 + let rows = read_csv "extension_bits.csv" in 97 + List.iter 98 + (fun row -> 99 + match row with 100 + | [ name; frame_hex; n_str; last_str ] -> ( 101 + let n_callsigns = int_of_string n_str in 102 + let last_index = int_of_string last_str in 103 + let frame_bytes = hex_to_bytes frame_hex in 104 + (* Check extension bits in the raw reference bytes *) 105 + for i = 0 to n_callsigns - 1 do 106 + let ssid_off = (i * 7) + 6 in 107 + let ext = Bytes.get_uint8 frame_bytes ssid_off land 0x01 in 108 + if i = last_index then 109 + Alcotest.(check int) 110 + (Printf.sprintf "%s: callsign %d ext=1 (last)" name i) 111 + 1 ext 112 + else 113 + Alcotest.(check int) 114 + (Printf.sprintf "%s: callsign %d ext=0" name i) 115 + 0 ext 116 + done; 117 + (* Decode with our implementation and re-encode *) 118 + match Ax25.decode frame_bytes with 119 + | Ok frame -> 120 + let our_bytes = Ax25.encode frame in 121 + (* Our encoded bytes include FCS; strip it for address comparison *) 122 + for i = 0 to n_callsigns - 1 do 123 + let ssid_off = (i * 7) + 6 in 124 + let our_ext = Bytes.get_uint8 our_bytes ssid_off land 0x01 in 125 + let ref_ext = Bytes.get_uint8 frame_bytes ssid_off land 0x01 in 126 + Alcotest.(check int) 127 + (Printf.sprintf "%s: our ext[%d] matches ref" name i) 128 + ref_ext our_ext 129 + done 130 + | Error e -> 131 + Alcotest.failf "%s: decode failed: %a" name Ax25.pp_error e) 132 + | _ -> Alcotest.failf "bad CSV row") 133 + rows 134 + 135 + let () = 136 + Alcotest.run "ax25-interop-pyax25" 137 + [ 138 + ( "ui_frames", 139 + [ 140 + Alcotest.test_case "encoding matches aioax25" `Quick 141 + test_ui_frame_encoding; 142 + ] ); 143 + ( "extension_bits", 144 + [ 145 + Alcotest.test_case "extension bits match aioax25" `Quick 146 + test_extension_bits; 147 + ] ); 148 + ]
+4
test/interop/pyax25/traces/extension_bits.csv
··· 1 + name,frame_hex,n_callsigns,last_index 2 + no_digis,86a240404040609c6086829898e103f078234e,2,1 3 + one_digi,ae6282ae4040609c6086829898e0a48a9882b2406103f0780bad,3,2 4 + two_digis,ae6282ae4040609c6086829898e0a48a9882b24060ae92888a62406303f07891d9,4,3
+6
test/interop/pyax25/traces/ui_frames.csv
··· 1 + name,source,destination,digipeaters,pid_hex,info_hex,frame_hex 2 + basic_ui,N0CALL-1,CQ,,f0,48656c6c6f2041582e323521,86a240404040609c6086829898e303f048656c6c6f2041582e3235210afe 3 + with_digi,N0CALL,W1AW,RELAY,f0,5669612064696769,ae6282ae4040609c6086829898e0a48a9882b2406103f05669612064696769646b 4 + nj7p_n7lem,N7LEM,NJ7P,,f0,,9c946ea04040609c6e988a9a40e103f0a947 5 + ssid_zero_regression,TEST,BEACON,,f0,,848a82869e9c60a88aa6a84040e103f04e53 6 + ip_packet,N0CALL,CQ,,cc,000102030405060708090a0b0c0d0e0f,86a240404040609c6086829898e103cc000102030405060708090a0b0c0d0e0f2342
+58
test/test_ax25.ml
··· 307 307 (* PID for No_layer3 = 0xF0 *) 308 308 Alcotest.(check int) "PID" 0xF0 (Char.code (Bytes.get encoded 15)) 309 309 310 + (* {1 Extension Bit Interop Tests} 311 + 312 + AX.25 2.2 Section 3.12.3: The extension bit (bit 0 of the SSID byte) 313 + must be 0 for all callsigns except the last one in the address field. 314 + This catches the latent bug where the destination's extension bit was 315 + incorrectly set when source SSID = 0 and no digipeaters. *) 316 + 317 + let test_extension_bit_no_digis () = 318 + let src = callsign_exn ~call:"N0CALL" ~ssid:0 in 319 + let dst = callsign_exn ~call:"CQ" ~ssid:0 in 320 + let frame = ui_frame ~src ~dst Bytes.empty in 321 + let encoded = encode frame in 322 + (* Destination SSID byte at offset 6: extension bit (bit 0) must be 0 *) 323 + let dst_ssid_byte = Char.code (Bytes.get encoded 6) in 324 + Alcotest.(check int) "dst extension bit = 0" 0 (dst_ssid_byte land 0x01); 325 + (* Source SSID byte at offset 13: extension bit (bit 0) must be 1 *) 326 + let src_ssid_byte = Char.code (Bytes.get encoded 13) in 327 + Alcotest.(check int) "src extension bit = 1" 1 (src_ssid_byte land 0x01) 328 + 329 + let test_extension_bit_with_digis () = 330 + let src = callsign_exn ~call:"N0CALL" ~ssid:0 in 331 + let dst = callsign_exn ~call:"CQ" ~ssid:0 in 332 + let digi = callsign_exn ~call:"RELAY" ~ssid:0 in 333 + let frame = ui_frame ~src ~dst ~digis:[ digi ] Bytes.empty in 334 + let encoded = encode frame in 335 + (* Destination SSID byte at offset 6: extension = 0 *) 336 + Alcotest.(check int) 337 + "dst extension = 0" 0 338 + (Char.code (Bytes.get encoded 6) land 0x01); 339 + (* Source SSID byte at offset 13: extension = 0 (not last — digi follows) *) 340 + Alcotest.(check int) 341 + "src extension = 0" 0 342 + (Char.code (Bytes.get encoded 13) land 0x01); 343 + (* Digipeater SSID byte at offset 20: extension = 1 (last) *) 344 + Alcotest.(check int) 345 + "digi extension = 1" 1 346 + (Char.code (Bytes.get encoded 20) land 0x01) 347 + 348 + let test_extension_bit_ssid_zero () = 349 + (* Specific regression: source SSID = 0, no digis. 350 + Old code incorrectly set dst extension = 1 here. *) 351 + let src = callsign_exn ~call:"TEST" ~ssid:0 in 352 + let dst = callsign_exn ~call:"BEACON" ~ssid:0 in 353 + let frame = ui_frame ~src ~dst Bytes.empty in 354 + let encoded = encode frame in 355 + Alcotest.(check int) 356 + "dst extension must be 0" 0 357 + (Char.code (Bytes.get encoded 6) land 0x01); 358 + Alcotest.(check int) 359 + "src extension must be 1" 1 360 + (Char.code (Bytes.get encoded 13) land 0x01) 361 + 310 362 (* {1 I-frame Sequence Counter Tests} *) 311 363 312 364 let test_iframe_ns_nr_wraparound () = ··· 461 513 Alcotest.test_case "spec minimum frame" `Quick test_spec_minimum_frame; 462 514 Alcotest.test_case "spec NJ7P-N7LEM frame" `Quick 463 515 test_spec_nj7p_n7lem_frame; 516 + Alcotest.test_case "extension bit no digis" `Quick 517 + test_extension_bit_no_digis; 518 + Alcotest.test_case "extension bit with digis" `Quick 519 + test_extension_bit_with_digis; 520 + Alcotest.test_case "extension bit ssid=0 regression" `Quick 521 + test_extension_bit_ssid_zero; 464 522 Alcotest.test_case "I-frame NS/NR wraparound" `Quick 465 523 test_iframe_ns_nr_wraparound; 466 524 Alcotest.test_case "I-frame poll/final bit" `Quick