Monorepo for Aesthetic.Computer
aesthetic.computer
1#!/usr/bin/env python3
2"""
3HP 7585B serial driver with monitoring + safe chunking.
4
- Hardwire (RTS/CTS + DTR/DSR) flow control via pyserial; Xon/Xoff bytes sent
  by the plotter are also honored in user space by the reader thread.
6- Chunks on HP-GL `;` boundaries, max 256 bytes/frame (the plotter's
7 documented per-frame ceiling). Never splits mid-command.
- Per-byte delay (default 2 ms) to keep tiny machines happy.
9- Reader thread prints anything the plotter sends back, prefixed [<-].
10- Raw mode (`-c "OI;"`) for ad-hoc commands; file mode (`-f file.hpgl`)
11 for prepared plots.
12
13usage:
14 serial-tx.py --port /dev/cu.usbserial-110 -c "OI;" --listen 2
15 serial-tx.py --port /dev/cu.usbserial-110 -f test-print.hpgl
16"""
17
18from __future__ import annotations
19
20import argparse
21import os
22import sys
23import threading
24import time
25from pathlib import Path
26
27import serial
28
29
# In-band software flow-control bytes; the reader thread strips them from the
# echoed stream and uses them to open/close the send gate.
XON = 0x11  # ASCII DC1: resume sending
XOFF = 0x13  # ASCII DC3: pause sending
32
33
def _reader(
    ser: serial.Serial, stop: threading.Event, can_send: threading.Event
) -> None:
    """Echo plotter output to stdout and track Xon/Xoff until `stop` is set.

    Intended to run as a background thread. Each pass reads up to 64 bytes
    from `ser`; an empty read just loops again so `stop` is re-checked.

    - XOFF clears `can_send` and XON sets it. Only an actual state change
      is logged (``[<-] <Xoff>`` / ``[<-] <Xon>``); repeats are silent.
    - Every other byte is decoded as ASCII (undecodable bytes become
      U+FFFD) and written to stdout prefixed with ``[<-] ``.

    Args:
        ser: open serial port to read from.
        stop: set by the owner to ask this loop to exit.
        can_send: flow-control gate shared with the sending side.

    If the port raises ``serial.SerialException`` the loop ends. The error
    is reported on stderr unless `stop` is already set (a failing read is
    expected while the owner is shutting the port down), and `can_send` is
    set so a sender waiting on the gate is not left blocked forever.
    """
    while not stop.is_set():
        try:
            data = ser.read(64)
        except serial.SerialException as exc:
            if not stop.is_set():
                sys.stderr.write(f"[!!] serial read failed: {exc}\n")
                sys.stderr.flush()
            # Nobody is left to process an Xon, so reopen the gate.
            can_send.set()
            return
        if not data:
            continue  # nothing received this pass

        # Strip Xon/Xoff out of the user-visible stream and act on them.
        passthrough = bytearray()
        for b in data:
            if b == XOFF:
                if can_send.is_set():
                    sys.stdout.write("[<-] <Xoff>\n")
                    sys.stdout.flush()
                can_send.clear()
            elif b == XON:
                if not can_send.is_set():
                    sys.stdout.write("[<-] <Xon>\n")
                    sys.stdout.flush()
                can_send.set()
            else:
                passthrough.append(b)

        if passthrough:
            # errors="replace" cannot raise, so no fallback path is needed.
            text = passthrough.decode("ascii", errors="replace")
            sys.stdout.write(f"[<-] {text}")
            sys.stdout.flush()
67
68
def _chunks(payload: bytes, limit: int) -> list[bytes]:
    """Split payload into <= `limit`-byte frames on `;` boundaries.

    Walks forward, taking as many full HP-GL commands as fit before each
    cut. Falls back to a hard split if a single command exceeds `limit`
    (rare — long PA polylines from vpype can do this).

    Args:
        payload: raw HP-GL bytes.
        limit: maximum frame size in bytes; must be at least 1.

    Returns:
        The frames in order. Concatenating them reproduces `payload`, and
        no frame is longer than `limit`. An empty payload yields ``[]``.

    Raises:
        ValueError: if `limit` is smaller than 1 (the walk could never
            advance).
    """
    if limit < 1:
        raise ValueError(f"limit must be >= 1, got {limit}")

    out: list[bytes] = []
    i = 0
    n = len(payload)
    while i < n:
        end = min(i + limit, n)
        if end < n:
            # Look for the last ';' strictly inside the window [i, end).
            # Accepting one at index `end` would yield a frame of
            # limit + 1 bytes once the ';' itself is included.
            cut = payload.rfind(b";", i, end)
            if cut > i:
                end = cut + 1  # include the ';'
        out.append(payload[i:end])
        i = end
    return out
88
89
def send(
    port: str,
    payload: bytes,
    baud: int,
    frame_limit: int,
    byte_delay_ms: float,
    chunk_delay_ms: float,
    listen_after_s: float,
) -> None:
    """Open `port`, stream `payload` to the plotter in frames, then close.

    The payload is split by `_chunks` into frames of at most `frame_limit`
    bytes. A background `_reader` thread echoes whatever the plotter sends
    and maintains the Xon/Xoff gate that the sender waits on before every
    frame (and before every byte when per-byte pacing is enabled).

    Args:
        port: serial device path.
        payload: raw HP-GL bytes to transmit.
        baud: line speed; framing is fixed at 8 data bits, no parity,
            1 stop bit.
        frame_limit: maximum bytes per frame.
        byte_delay_ms: pause after each byte; when <= 0 each frame is
            written in a single call.
        chunk_delay_ms: pause between frames (skipped after the last one).
        listen_after_s: seconds to keep the reader running after the last
            byte has been sent.

    Errors raised by pyserial while opening or writing are not caught here
    and propagate to the caller; the port is closed in every case once it
    has been opened.
    """
    ser = serial.Serial(
        port=port,
        baudrate=baud,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        xonxoff=False,  # plotter is in hardwire-handshake mode
        rtscts=True,  # honor the plotter's CTS line
        dsrdtr=True,  # assert DTR; honor DSR
        timeout=0.5,
        write_timeout=30,
    )
    stop = threading.Event()
    can_send = threading.Event()
    can_send.set()  # start in "go" state; the plotter Xoffs us when full
    reader = threading.Thread(
        target=_reader, args=(ser, stop, can_send), daemon=True
    )

    def _wait_xon() -> None:
        """Block while the gate is closed, unless nobody can reopen it."""
        while not can_send.wait(timeout=0.5):
            if stop.is_set():
                return
            if not reader.is_alive():
                # The reader is gone, so an Xon can never arrive. Reopen
                # the gate; a dead port will surface on the next write.
                can_send.set()
                return

    # Everything after the open lives inside the try so the port is always
    # closed, even if configuring the lines or starting the thread fails.
    try:
        # Make sure modem control lines are asserted from our side.
        # (Property form; setDTR()/setRTS() are deprecated in pyserial 3.x.)
        ser.dtr = True
        ser.rts = True
        reader.start()

        # drain anything already buffered (boot banner, prior responses)
        ser.reset_input_buffer()

        frames = _chunks(payload, frame_limit)
        total = len(payload)
        sent = 0
        byte_pause = byte_delay_ms / 1000.0
        chunk_pause = chunk_delay_ms / 1000.0

        for idx, frame in enumerate(frames, 1):
            _wait_xon()
            sys.stdout.write(
                f"[->] frame {idx}/{len(frames)} ({len(frame)} B, "
                f"{sent}/{total} sent)\n"
            )
            sys.stdout.flush()
            if byte_pause > 0:
                for b in frame:
                    _wait_xon()
                    ser.write(bytes([b]))
                    time.sleep(byte_pause)
            else:
                ser.write(frame)
            ser.flush()
            sent += len(frame)
            if chunk_pause > 0 and idx < len(frames):
                time.sleep(chunk_pause)

        sys.stdout.write(f"[->] done, {sent} B sent\n")
        sys.stdout.flush()

        if listen_after_s > 0:
            time.sleep(listen_after_s)
    finally:
        stop.set()
        # Let the reader leave its current read (0.5 s timeout) before the
        # port is closed underneath it. The join is bounded so a stuck
        # reader cannot prevent the close.
        if reader.is_alive():
            reader.join(timeout=1.0)
        ser.close()
165
166
def main() -> int:
    """Parse the command line, load the HP-GL payload, and send it.

    The payload comes from exactly one of ``-c/--command`` (inline text,
    must be ASCII) or ``-f/--file`` (read as raw bytes). The port defaults
    to the ``HP7585B_TTY`` environment variable.

    Returns:
        0 after `send` completes.

    Invalid arguments (missing or nonexistent port, ``--frame-bytes`` below
    1, non-ASCII command, unreadable file) are reported through
    ``ArgumentParser.error``, which exits with status 2.
    """
    # The module docstring opens with a newline, so its first line is blank;
    # use the first non-blank line instead (and tolerate a missing docstring).
    doc_lines = (__doc__ or "").strip().splitlines()
    p = argparse.ArgumentParser(
        description=doc_lines[0] if doc_lines else None
    )
    p.add_argument(
        "--port",
        default=os.environ.get("HP7585B_TTY", ""),
        help="serial device (defaults to $HP7585B_TTY)",
    )
    p.add_argument("--baud", type=int, default=9600)
    p.add_argument(
        "--frame-bytes",
        type=int,
        default=256,
        help="max bytes per frame (plotter limit; default 256)",
    )
    p.add_argument(
        "--byte-delay-ms",
        type=float,
        default=2.0,
        help="delay between bytes in ms (default 2.0; set 0 to disable)",
    )
    p.add_argument(
        "--chunk-delay-ms",
        type=float,
        default=10.0,
        help="delay between frames in ms (default 10)",
    )
    p.add_argument(
        "--listen",
        type=float,
        default=1.0,
        metavar="SECONDS",
        help="seconds to keep listening after the last byte is sent",
    )
    src = p.add_mutually_exclusive_group(required=True)
    src.add_argument("-c", "--command", help='inline HP-GL, e.g. "OI;"')
    src.add_argument("-f", "--file", help="path to .hpgl file")

    args = p.parse_args()
    if not args.port:
        p.error("no --port and HP7585B_TTY not set")
    if not Path(args.port).exists():
        p.error(f"{args.port} does not exist")
    # A frame size below 1 would keep the chunker from ever advancing.
    if args.frame_bytes < 1:
        p.error("--frame-bytes must be at least 1")

    # p.error() raises SystemExit, so `payload` is always bound below.
    if args.command is not None:
        try:
            payload = args.command.encode("ascii")
        except UnicodeEncodeError as exc:
            p.error(f"--command must be ASCII: {exc}")
    else:
        try:
            payload = Path(args.file).read_bytes()
        except OSError as exc:
            p.error(f"cannot read {args.file}: {exc}")

    send(
        port=args.port,
        payload=payload,
        baud=args.baud,
        frame_limit=args.frame_bytes,
        byte_delay_ms=args.byte_delay_ms,
        chunk_delay_ms=args.chunk_delay_ms,
        listen_after_s=args.listen,
    )
    return 0
225
226
# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())