Monorepo for Aesthetic.Computer aesthetic.computer
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 142 lines 4.8 kB view raw
1#!/usr/bin/env python3 2"""Send progressively-larger slabs of the slide to find which section 3triggers E2. After each slab we read OE; — first non-zero reading 4identifies the offending section.""" 5 6from __future__ import annotations 7 8import os 9import sys 10import time 11from pathlib import Path 12 13import serial 14 15XON, XOFF = 0x11, 0x13 16 17SECTIONS = [ 18 ("init", ["IN;DT\x03;DI1,0;LT;SP1;VS25;FS2;\n"]), 19 ("title", ["SI1.476,2.214;PA-7969,3500;LBnotepat.com keymap\x03\n"]), 20 ("divider", ["PU;PA-7567,3050;PD;PA7567,3050;PU;\n"]), 21 ("piano-naturals",[]), # filled below 22 ("piano-sharps", []), 23 ("qwerty-row1", []), 24 ("qwerty-row2", []), 25 ("qwerty-row3", []), 26 ("connectors", []), 27 ("tagline", []), 28 ("urls", []), 29 ("park", ["LT;PU;PA0,0;SP0;\n"]), 30] 31 32# Reuse the actual HPGL we generate by parsing the existing file into the 33# section buckets. Simpler: just send the file in halves first. 34FILE = Path("/Users/jas/aesthetic-computer/plotters/hp7585b/notepat-slide.hpgl") 35data = FILE.read_text() 36lines = data.splitlines(keepends=True) 37 38# Split lines into sections by content heuristics. 39naturals, sharps, q1, q2, q3, conns, tag, urls, post = [], [], [], [], [], [], [], [], [] 40phase = "header" 41for ln in lines: 42 if ln.startswith("IN;") or ln.startswith("PU;PA-7567"): 43 continue # init/divider already in SECTIONS 44 if "LBnotepat.com keymap" in ln: 45 continue 46 # piano naturals: y range 1200..2500, the "labelled" lines 47 if "LB" in ln and "1370" in ln and len(ln.split("LB")[-1]) <= 3: 48 naturals.append(ln); continue 49 # piano sharps: double_rect calls — y at 1700..2500 and 1760..2440 50 if ("EA" in ln) and ("1700" in ln or "1760" in ln or "2500" in ln) and "PA-" in ln and "1200" not in ln: 51 sharps.append(ln); continue 52 # QWERTY row 1 ytop=800 53 if "EA" in ln and "800" in ln and "-100" in ln: 54 q1.append(ln); continue 55 if "LB" in ln and "200" in ln and "y" in "": 56 q1.append(ln); continue 57 # ... fall back: dump remaining into post 58 post.append(ln) 59 60# This heuristic is rough — instead just split file into 4 quarters. 61quarters = [data[i:i+len(data)//4] for i in range(0, len(data), len(data)//4)] 62 63 64def open_port(): 65 s = serial.Serial("/dev/cu.usbserial-110", 9600, bytesize=8, 66 parity=serial.PARITY_NONE, stopbits=1, 67 xonxoff=False, rtscts=True, dsrdtr=True, timeout=0.5) 68 s.setDTR(True); s.setRTS(True) 69 return s 70 71 72def ask(ser, cmd): 73 ser.reset_input_buffer() 74 ser.write(cmd); ser.flush() 75 deadline = time.time() + 1.0 76 buf = bytearray() 77 while time.time() < deadline: 78 c = ser.read(64) 79 if c: 80 for b in c: 81 if b not in (XON, XOFF): 82 buf.append(b) 83 if b"\n" in buf or b"\r" in buf: 84 break 85 else: 86 time.sleep(0.05) 87 return buf.decode("ascii", errors="replace").strip() 88 89 90def send_slow(ser, payload: bytes): 91 """Send with 256-byte frames + 2ms/byte + 30ms/frame.""" 92 i = 0 93 while i < len(payload): 94 # cut on ; or ETX boundary, max 256 95 end = min(i + 256, len(payload)) 96 if end < len(payload): 97 cut = payload.rfind(b";", i, end + 1) 98 if cut > i: 99 end = cut + 1 100 chunk = payload[i:end] 101 for b in chunk: 102 ser.write(bytes([b])) 103 time.sleep(0.002) 104 ser.flush() 105 time.sleep(0.030) 106 i = end 107 108 109def main(): 110 ser = open_port() 111 try: 112 # always start with init + clear error 113 print("clearing error...") 114 ask(ser, b"OE;") 115 time.sleep(0.5) 116 cumulative = b"" 117 for idx, q in enumerate(quarters, 1): 118 print(f"\n=== sending quarter {idx}/{len(quarters)} ({len(q)} bytes) ===") 119 cumulative += q.encode("ascii") 120 # we resend cumulative each time so plotter state is consistent 121 # but that's expensive — instead just send the new piece. 122 send_slow(ser, q.encode("ascii")) 123 time.sleep(2.0) 124 err = ask(ser, b"OE;") 125 print(f" OE; -> {err}") 126 if err.strip() and err.strip() != "0": 127 print(f" *** ERROR after quarter {idx} ***") 128 # save the quarter to disk for inspection 129 Path(f"/tmp/quarter-{idx}.hpgl").write_text(q) 130 print(f" payload saved to /tmp/quarter-{idx}.hpgl") 131 # park the pen 132 ask(ser, b"PU;PA0,0;SP0;") 133 return 1 134 print("\nall quarters clean") 135 ask(ser, b"PU;PA0,0;SP0;") 136 finally: 137 ser.close() 138 return 0 139 140 141if __name__ == "__main__": 142 sys.exit(main())