Monorepo for Aesthetic.Computer aesthetic.computer
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 228 lines 6.6 kB view raw
1#!/usr/bin/env python3 2""" 3HP 7585B serial driver with monitoring + safe chunking. 4 5- Xon/Xoff hardware flow control (kernel-level via pyserial). 6- Chunks on HP-GL `;` boundaries, max 256 bytes/frame (the plotter's 7 documented per-frame ceiling). Never splits mid-command. 8- Per-byte delay (default 1 ms) to keep tiny machines happy. 9- Reader thread prints anything the plotter sends back, prefixed [<-]. 10- Raw mode (`-c "OI;"`) for ad-hoc commands; file mode (`-f file.hpgl`) 11 for prepared plots. 12 13usage: 14 serial-tx.py --port /dev/cu.usbserial-110 -c "OI;" --listen 2 15 serial-tx.py --port /dev/cu.usbserial-110 -f test-print.hpgl 16""" 17 18from __future__ import annotations 19 20import argparse 21import os 22import sys 23import threading 24import time 25from pathlib import Path 26 27import serial 28 29 30XON = 0x11 31XOFF = 0x13 32 33 34def _reader( 35 ser: serial.Serial, stop: threading.Event, can_send: threading.Event 36) -> None: 37 while not stop.is_set(): 38 try: 39 data = ser.read(64) 40 except serial.SerialException: 41 return 42 if not data: 43 continue 44 45 # Strip Xon/Xoff out of the user-visible stream and act on them. 46 passthrough = bytearray() 47 for b in data: 48 if b == XOFF: 49 if can_send.is_set(): 50 sys.stdout.write("[<-] <Xoff>\n") 51 sys.stdout.flush() 52 can_send.clear() 53 elif b == XON: 54 if not can_send.is_set(): 55 sys.stdout.write("[<-] <Xon>\n") 56 sys.stdout.flush() 57 can_send.set() 58 else: 59 passthrough.append(b) 60 if passthrough: 61 try: 62 text = bytes(passthrough).decode("ascii", errors="replace") 63 except Exception: 64 text = repr(bytes(passthrough)) 65 sys.stdout.write(f"[<-] {text}") 66 sys.stdout.flush() 67 68 69def _chunks(payload: bytes, limit: int) -> list[bytes]: 70 """Split payload into <= `limit`-byte frames on `;` boundaries. 71 72 Walks forward, taking as many full HP-GL commands as fit before each 73 cut. Falls back to a hard split if a single command exceeds `limit` 74 (rare — long PA polylines from vpype can do this). 75 """ 76 out: list[bytes] = [] 77 i = 0 78 n = len(payload) 79 while i < n: 80 end = min(i + limit, n) 81 if end < n: 82 cut = payload.rfind(b";", i, end + 1) 83 if cut > i: 84 end = cut + 1 # include the ';' 85 out.append(payload[i:end]) 86 i = end 87 return out 88 89 90def send( 91 port: str, 92 payload: bytes, 93 baud: int, 94 frame_limit: int, 95 byte_delay_ms: float, 96 chunk_delay_ms: float, 97 listen_after_s: float, 98) -> None: 99 ser = serial.Serial( 100 port=port, 101 baudrate=baud, 102 bytesize=serial.EIGHTBITS, 103 parity=serial.PARITY_NONE, 104 stopbits=serial.STOPBITS_ONE, 105 xonxoff=False, # plotter is in hardwire-handshake mode 106 rtscts=True, # honor the plotter's CTS line 107 dsrdtr=True, # assert DTR; honor DSR 108 timeout=0.5, 109 write_timeout=30, 110 ) 111 # Make sure modem control lines are asserted from our side. 112 ser.setDTR(True) 113 ser.setRTS(True) 114 stop = threading.Event() 115 can_send = threading.Event() 116 can_send.set() # start in "go" state; the plotter Xoffs us when full 117 reader = threading.Thread( 118 target=_reader, args=(ser, stop, can_send), daemon=True 119 ) 120 reader.start() 121 122 def _wait_xon() -> None: 123 while not can_send.wait(timeout=0.5): 124 if stop.is_set(): 125 return 126 127 try: 128 # drain anything already buffered (boot banner, prior responses) 129 ser.reset_input_buffer() 130 131 frames = _chunks(payload, frame_limit) 132 total = len(payload) 133 sent = 0 134 byte_pause = byte_delay_ms / 1000.0 135 chunk_pause = chunk_delay_ms / 1000.0 136 137 for idx, frame in enumerate(frames, 1): 138 _wait_xon() 139 sys.stdout.write( 140 f"[->] frame {idx}/{len(frames)} ({len(frame)} B, " 141 f"{sent}/{total} sent)\n" 142 ) 143 sys.stdout.flush() 144 if byte_pause > 0: 145 for b in frame: 146 _wait_xon() 147 ser.write(bytes([b])) 148 time.sleep(byte_pause) 149 else: 150 ser.write(frame) 151 ser.flush() 152 sent += len(frame) 153 if chunk_pause > 0 and idx < len(frames): 154 time.sleep(chunk_pause) 155 156 sys.stdout.write(f"[->] done, {sent} B sent\n") 157 sys.stdout.flush() 158 159 if listen_after_s > 0: 160 time.sleep(listen_after_s) 161 finally: 162 stop.set() 163 time.sleep(0.1) 164 ser.close() 165 166 167def main() -> int: 168 p = argparse.ArgumentParser(description=__doc__.splitlines()[0]) 169 p.add_argument( 170 "--port", 171 default=os.environ.get("HP7585B_TTY", ""), 172 help="serial device (defaults to $HP7585B_TTY)", 173 ) 174 p.add_argument("--baud", type=int, default=9600) 175 p.add_argument( 176 "--frame-bytes", 177 type=int, 178 default=256, 179 help="max bytes per frame (plotter limit; default 256)", 180 ) 181 p.add_argument( 182 "--byte-delay-ms", 183 type=float, 184 default=2.0, 185 help="delay between bytes in ms (default 2.0; set 0 to disable)", 186 ) 187 p.add_argument( 188 "--chunk-delay-ms", 189 type=float, 190 default=10.0, 191 help="delay between frames in ms (default 10)", 192 ) 193 p.add_argument( 194 "--listen", 195 type=float, 196 default=1.0, 197 metavar="SECONDS", 198 help="seconds to keep listening after the last byte is sent", 199 ) 200 src = p.add_mutually_exclusive_group(required=True) 201 src.add_argument("-c", "--command", help='inline HP-GL, e.g. "OI;"') 202 src.add_argument("-f", "--file", help="path to .hpgl file") 203 204 args = p.parse_args() 205 if not args.port: 206 p.error("no --port and HP7585B_TTY not set") 207 if not Path(args.port).exists(): 208 p.error(f"{args.port} does not exist") 209 210 if args.command is not None: 211 payload = args.command.encode("ascii") 212 else: 213 payload = Path(args.file).read_bytes() 214 215 send( 216 port=args.port, 217 payload=payload, 218 baud=args.baud, 219 frame_limit=args.frame_bytes, 220 byte_delay_ms=args.byte_delay_ms, 221 chunk_delay_ms=args.chunk_delay_ms, 222 listen_after_s=args.listen, 223 ) 224 return 0 225 226 227if __name__ == "__main__": 228 sys.exit(main())