A fork of https://github.com/crosspoint-reader/crosspoint-reader
1#!/usr/bin/env python3
2"""
3ESP32 Serial Monitor with Memory Graph
4
5This script provides a comprehensive real-time serial monitor for ESP32 devices with
6integrated memory usage graphing capabilities. It reads serial output, parses memory
7information, and displays it in both console and graphical form.
8
9Features:
10- Real-time serial output monitoring with color-coded log levels
11- Interactive memory usage graphing with matplotlib
12- Command input interface for sending commands to the ESP32 device
13- Screenshot capture and processing (1-bit black/white format)
14- Graceful shutdown handling with Ctrl-C signal processing
15- Configurable filtering and suppression of log messages
16- Thread-safe operation with coordinated shutdown events
17
18Usage:
19 python debugging_monitor.py [port] [options]
20
21The script will open a matplotlib window showing memory usage over time and provide
22an interactive command prompt for sending commands to the device. Press Ctrl-C or
23close the graph window to exit gracefully.
24"""
25
26from __future__ import annotations
27
28import argparse
29import glob
30import platform
31import re
32import signal
33import sys
34import threading
35from collections import deque
36from datetime import datetime
37
# Third-party dependencies are imported defensively so that a missing package
# produces an actionable install hint instead of a bare traceback.
# Maps importable module name -> name of the pip package that provides it.
PACKAGE_MAPPING: dict[str, str] = {
    "serial": "pyserial",
    "colorama": "colorama",
    "matplotlib": "matplotlib",
    "PIL": "Pillow",
}

try:
    import matplotlib.pyplot as plt
    import serial
    from colorama import Fore, Style, init
    from matplotlib import animation

    # Pillow is optional: without it screenshots are stored as raw bytes.
    try:
        from PIL import Image
    except ImportError:
        Image = None
except ImportError as e:
    # Guess the missing package(s) by looking for known module names in the
    # lower-cased import error message.
    ERROR_MSG = str(e).lower()
    missing_packages = []
    for module_name, pip_name in PACKAGE_MAPPING.items():
        if module_name in ERROR_MSG:
            missing_packages.append(pip_name)

    # Nothing recognised in the message: suggest the full required set.
    missing_packages = missing_packages or ["pyserial", "colorama", "matplotlib"]

    print("\n" + "!" * 50)
    print(f" Error: Required package(s) not installed: {', '.join(missing_packages)}")
    print("!" * 50)

    print("\nTo fix this, please run the following command in your terminal:\n")
    if sys.platform.startswith("win"):
        INSTALL_CMD = "pip install "
    else:
        INSTALL_CMD = "pip3 install "
    print(f" {INSTALL_CMD}{' '.join(missing_packages)}")

    print("\nExiting...")
    sys.exit(1)
74
# --- Global Variables for Data Sharing ---
# Store last 50 data points
# (each deque below is bounded by MAX_POINTS, so the oldest sample is dropped
# automatically once the window is full).
MAX_POINTS = 50
# Parallel series: serial_worker appends one entry to each of them for every
# parsed [MEM] line, and update_graph copies them for plotting.
time_data: deque[str] = deque(maxlen=MAX_POINTS)  # PC wall-clock labels ("HH:MM:SS")
free_mem_data: deque[float] = deque(maxlen=MAX_POINTS)  # free memory, in KB
total_mem_data: deque[float] = deque(maxlen=MAX_POINTS)  # total memory, in KB
max_alloc_data: deque[float] = deque(maxlen=MAX_POINTS)  # MaxAlloc value in KB (0 when absent)
data_lock: threading.Lock = threading.Lock() # Prevent reading while writing

# Global shutdown flag
# Set by signal_handler and by main() on exit; polled by the worker threads
# and by update_graph.
shutdown_event = threading.Event()

# Initialize colors
# autoreset=True asks colorama to reset the style after every print call.
init(autoreset=True)
89
# Color mapping for log lines
# Keys are colorama foreground codes; values are upper-case substrings that
# get_color_for_line looks for in the upper-cased log line.
# Entry order matters: get_color_for_line walks this dict in insertion order
# and returns the first color with a matching keyword, so earlier entries
# (e.g. the red error group) take precedence over later ones.
COLOR_KEYWORDS: dict[str, list[str]] = {
    Fore.RED: ["ERROR", "[ERR]", "[SCT]", "FAILED", "WARNING"],
    Fore.CYAN: ["[MEM]", "FREE:"],
    Fore.MAGENTA: [
        "[GFX]",
        "[ERS]",
        "DISPLAY",
        "RAM WRITE",
        "RAM COMPLETE",
        "REFRESH",
        "POWERING ON",
        "FRAME BUFFER",
        "LUT",
    ],
    Fore.GREEN: [
        "[EBP]",
        "[BMC]",
        "[ZIP]",
        "[PARSER]",
        "[EHP]",
        "LOADING EPUB",
        "CACHE",
        "DECOMPRESSED",
        "PARSING",
    ],
    Fore.YELLOW: ["[ACT]", "ENTERING ACTIVITY", "EXITING ACTIVITY"],
    Fore.BLUE: ["RENDERED PAGE", "[LOOP]", "DURATION", "WAIT COMPLETE"],
    Fore.LIGHTYELLOW_EX: [
        "[CPS]",
        "SETTINGS",
        "[CLEAR_CACHE]",
        "[CHAP]",
        "[OPDS]",
        "[COF]",
    ],
    Fore.LIGHTBLACK_EX: [
        "ESP-ROM",
        "BUILD:",
        "RST:",
        "BOOT:",
        "SPIWP:",
        "MODE:",
        "LOAD:",
        "ENTRY",
        "[SD]",
        "STARTING CROSSPOINT",
        "VERSION",
    ],
    Fore.LIGHTCYAN_EX: ["[RBS]"],
    Fore.LIGHTMAGENTA_EX: [
        "[KRS]",
        "EINKDISPLAY:",
        "STATIC FRAME",
        "INITIALIZING",
        "SPI INITIALIZED",
        "GPIO PINS",
        "RESETTING",
        "SSD1677",
        "E-INK",
    ],
    Fore.LIGHTGREEN_EX: ["[FNS]", "FOOTNOTE"],
}
153
154
def signal_handler(signum, frame):
    """
    Signal callback (installed for SIGINT / Ctrl-C) that requests shutdown.

    Prints a notice, sets the global shutdown event so the worker threads and
    the graph callback stop, and closes every matplotlib window.
    """
    # The handler signature requires `frame`; it is deliberately unused.
    del frame
    notice = f"\n{Fore.YELLOW}Received signal {signum}. Shutting down...{Style.RESET_ALL}"
    print(notice)
    shutdown_event.set()
    plt.close("all")
162
163
# pylint: disable=R0912
def get_color_for_line(line: str) -> str:
    """
    Pick the console color for a log line.

    The line is upper-cased and checked against COLOR_KEYWORDS in insertion
    order; the color of the first group containing a matching substring is
    returned. Lines matching no group are shown in white.
    """
    haystack = line.upper()
    matching_colors = (
        color
        for color, keywords in COLOR_KEYWORDS.items()
        if any(keyword in haystack for keyword in keywords)
    )
    return next(matching_colors, Fore.WHITE)
174
175
def parse_memory_line(line: str) -> tuple[int | None, int | None, int | None]:
    """
    Extracts memory stats from MEM log lines.
    Format: Free: N bytes, Total: N bytes, Min Free: N bytes, MaxAlloc: N bytes
    Returns: (free_bytes, total_bytes, max_alloc_bytes)

    Each element is None when the corresponding field is absent from the line.
    The "Min Free:" field is never reported as the current free value,
    regardless of where it appears in the line.
    """
    def _find(pattern: str) -> int | None:
        """Return the first integer captured by `pattern` in the line, if any."""
        m = re.search(pattern, line)
        if m:
            try:
                return int(m.group(1))
            except ValueError:
                # int() can still reject absurdly long digit runs.
                pass
        return None

    return (
        # The lookbehind keeps "Min Free:" from being mistaken for "Free:".
        _find(r"(?<!Min )\bFree:\s*(\d+)"),
        _find(r"\bTotal:\s*(\d+)"),
        _find(r"\bMaxAlloc:\s*(\d+)"),
    )
196
197
def _save_screenshot(screenshot_data: bytes) -> None:
    """
    Write one complete screenshot payload to disk and report the outcome.

    With Pillow available the payload is decoded as an 800x480 1-bit image,
    rotated and saved as screenshot.bmp. Without Pillow, or when the payload
    cannot be decoded, the untouched bytes are saved as screenshot.raw.
    Decode and file errors are reported on the console instead of being raised.
    """
    raw_reason = "PIL not available"
    if Image:
        try:
            img = Image.frombytes("1", (800, 480), screenshot_data)
            # We need to rotate the image because the raw data is in landscape mode
            img = img.transpose(Image.ROTATE_270)
            img.save("screenshot.bmp")
        except ValueError as exc:
            # Payload does not form a valid frame: keep the bytes for inspection.
            raw_reason = f"could not decode as 800x480 1-bit image: {exc}"
        except OSError as exc:
            print(f"{Fore.RED}Failed to write screenshot.bmp: {exc}{Style.RESET_ALL}")
            return
        else:
            print(f"{Fore.GREEN}Screenshot saved to screenshot.bmp{Style.RESET_ALL}")
            return

    try:
        with open("screenshot.raw", "wb") as raw_file:
            raw_file.write(screenshot_data)
    except OSError as exc:
        print(f"{Fore.RED}Failed to write screenshot.raw: {exc}{Style.RESET_ALL}")
        return
    print(
        f"{Fore.GREEN}Screenshot saved to screenshot.raw ({raw_reason}){Style.RESET_ALL}"
    )


def serial_worker(ser, kwargs: dict[str, str]) -> None:
    """
    Runs in a background thread. Handles reading serial data, printing to console,
    updating memory usage data for graphing, and processing screenshot data.
    Monitors the global shutdown event for graceful termination.

    Args:
        ser: Open serial port object providing read() and readline().
        kwargs: Options dict; the "filter" and "suppress" entries are optional
            case-insensitive keywords controlling which lines are printed.

    Line protocol handled here:
    - "SCREENSHOT_START:<size>" switches to binary mode until <size> bytes
      have been received; a missing, non-numeric or non-positive size is
      reported and ignored.
    - "SCREENSHOT_END" lines are dropped.
    - A leading "[digits]" prefix is replaced with the PC time (HH:MM:SS).
    - "[MEM]" lines feed the graph data before filtering, so the graph keeps
      updating even when those lines are hidden from the console.

    The port is not closed here; the loop ends on shutdown or on a serial error.
    """
    print(f"{Fore.CYAN}--- Opening serial port ---{Style.RESET_ALL}")
    filter_keyword = kwargs.get("filter", "").lower()
    suppress = kwargs.get("suppress", "").lower()
    if filter_keyword and suppress and filter_keyword == suppress:
        print(
            f"{Fore.YELLOW}Warning: Filter and Suppress keywords are the same. "
            f"This may result in no output.{Style.RESET_ALL}"
        )
    if filter_keyword:
        print(
            f"{Fore.YELLOW}Filtering lines to only show those containing: "
            f"'{filter_keyword}'{Style.RESET_ALL}"
        )
    if suppress:
        print(
            f"{Fore.YELLOW}Suppressing lines containing: '{suppress}'{Style.RESET_ALL}"
        )

    expecting_screenshot = False
    screenshot_size = 0
    screenshot_data = bytearray()

    try:
        while not shutdown_event.is_set():
            try:
                if expecting_screenshot:
                    chunk = ser.read(screenshot_size - len(screenshot_data))
                    if not chunk:
                        # Read timed out; loop again so shutdown is noticed.
                        continue
                    screenshot_data += chunk
                    if len(screenshot_data) >= screenshot_size:
                        _save_screenshot(bytes(screenshot_data))
                        expecting_screenshot = False
                        screenshot_data = bytearray()
                    continue

                raw_data = ser.readline().decode("utf-8", errors="replace")
                if not raw_data:
                    continue

                clean_line = raw_data.strip()
                if not clean_line:
                    continue

                if clean_line.startswith("SCREENSHOT_START:"):
                    try:
                        announced_size = int(clean_line.split(":")[1])
                    except ValueError:
                        announced_size = 0
                    if announced_size > 0:
                        screenshot_size = announced_size
                        screenshot_data = bytearray()
                        expecting_screenshot = True
                    else:
                        # A zero-length read never yields data, so entering
                        # binary mode here would spin forever.
                        print(
                            f"{Fore.RED}Ignoring invalid screenshot header: "
                            f"{clean_line}{Style.RESET_ALL}"
                        )
                    continue
                if clean_line == "SCREENSHOT_END":
                    continue  # ignore

                # Add PC timestamp
                pc_time = datetime.now().strftime("%H:%M:%S")
                formatted_line = re.sub(r"^\[\d+\]", f"[{pc_time}]", clean_line)

                # Check for Memory Line
                if "[MEM]" in formatted_line:
                    free_val, total_val, max_alloc_val = parse_memory_line(formatted_line)
                    if free_val is not None and total_val is not None:
                        with data_lock:
                            time_data.append(pc_time)
                            free_mem_data.append(free_val / 1024)
                            total_mem_data.append(total_val / 1024)
                            max_alloc_data.append((max_alloc_val or 0) / 1024)

                # Apply filters
                lowered_line = formatted_line.lower()
                if filter_keyword and filter_keyword not in lowered_line:
                    continue
                if suppress and suppress in lowered_line:
                    continue

                # Print to console
                line_color = get_color_for_line(formatted_line)
                print(f"{line_color}{formatted_line}")

            except (OSError, UnicodeDecodeError):
                print(
                    f"{Fore.RED}Device disconnected or data error.{Style.RESET_ALL}"
                )
                break
    except KeyboardInterrupt:
        # If thread is killed violently (e.g. main exit), silence errors
        pass
300
301
def input_worker(ser) -> None:
    """
    Runs in a background thread. Handles user input to send commands to the ESP32 device.
    Monitors the global shutdown event for graceful termination on Ctrl-C.

    Each entered line is sent as "CMD:<text>\\n". The loop ends when input is
    exhausted, when shutdown has been requested, or when writing to the port
    fails (for example because the device was unplugged).
    """
    while not shutdown_event.is_set():
        try:
            cmd = input("Command: ")
        except (EOFError, KeyboardInterrupt):
            break

        # Shutdown may have been requested while blocked on the prompt; do not
        # write to a port that is being torn down.
        if shutdown_event.is_set():
            break

        try:
            ser.write(f"CMD:{cmd}\n".encode())
        except OSError as exc:
            # pyserial's SerialException derives from OSError.
            print(f"{Fore.RED}Failed to send command: {exc}{Style.RESET_ALL}")
            break
313
314
def update_graph(frame) -> list:  # pylint: disable=unused-argument
    """
    Matplotlib animation callback that redraws the memory usage chart.

    When shutdown has been requested, all plot windows are closed instead.
    Otherwise the shared series are copied under the data lock and the
    current figure is rebuilt from scratch with a single axes showing total
    memory, free memory (with a shaded area) and, when any sample is non-zero,
    the MaxAlloc series. Always returns an empty list.
    """
    if shutdown_event.is_set():
        plt.close("all")
        return []

    # Copy the data while holding the lock; draw after releasing it.
    with data_lock:
        if len(time_data) == 0:
            return []
        timestamps, free_kb, total_kb, max_alloc_kb = (
            list(series)
            for series in (time_data, free_mem_data, total_mem_data, max_alloc_data)
        )

    figure = plt.gcf()
    figure.clf()
    axes = figure.add_subplot(111)

    axes.plot(timestamps, total_kb, label="Total RAM (KB)", color="red", linestyle="--")
    axes.plot(
        timestamps, free_kb, label="Free RAM (KB)", color="green", marker="o", markersize=3
    )
    has_max_alloc = any(value > 0 for value in max_alloc_kb)
    if has_max_alloc:
        axes.plot(
            timestamps, max_alloc_kb, label="Max Alloc (KB)", color="orange", linestyle="-."
        )
    axes.fill_between(timestamps, free_kb, color="green", alpha=0.1)

    axes.set_title("ESP32 Memory Monitor")
    axes.set_ylabel("Memory (KB)")
    axes.set_xlabel("Time")
    axes.legend(loc="upper left")
    axes.grid(True, linestyle=":", alpha=0.6)
    plt.setp(axes.get_xticklabels(), rotation=45, ha="right")

    figure.tight_layout()
    return []
352
353
def get_auto_detected_port() -> list[str]:
    """
    Look for serial ports that are likely to be the ESP32 device.

    Returns every candidate port for the current platform; the list is empty
    when nothing suitable is found or the platform is not recognised.
    Darwin/Linux logic by jonasdiemer
    """
    system = platform.system()

    # macOS and Linux: match device nodes by name.
    if system == "Darwin":
        return sorted(glob.glob("/dev/tty.usbmodem*"))
    if system == "Linux":
        return sorted(glob.glob("/dev/ttyACM*"))
    if system != "Windows":
        return []

    from serial.tools import list_ports

    # Be careful with this pattern list - it should be specific
    # enough to avoid picking up unrelated devices, but broad enough
    # to catch all common USB-serial adapters used with ESP32
    # Caveat: localized versions of Windows may have different descriptions,
    # so we also check for specific VID:PID (but that may not cover all clones)
    pattern_list = ["CP210x", "CH340", "USB Serial"]
    candidates = []
    for info in list_ports.comports():
        description_matches = any(pat in info.description for pat in pattern_list)
        # Specific VID:PID for XTEINK X4
        if description_matches or info.hwid.startswith("USB VID:PID=303A:1001"):
            candidates.append(info.device)
    return candidates
387
388
def main() -> None:
    """
    Main entry point for the ESP32 monitor application.

    Sets up argument parsing, initializes serial communication, starts background threads
    for serial monitoring and command input, and launches the memory usage graph.
    Implements graceful shutdown handling with signal processing for clean termination.

    Features:
    - Serial port monitoring with color-coded output
    - Real-time memory usage graphing
    - Interactive command interface
    - Screenshot capture capability
    - Graceful shutdown on Ctrl-C or window close

    Exits with status 1 when no port can be chosen (none or several detected)
    or when the port cannot be opened. The serial port is closed on every
    exit path once it has been opened.
    """
    parser = argparse.ArgumentParser(
        description="ESP32 Serial Monitor with Memory Graph - Real-time monitoring, graphing, and command interface"
    )
    default_baudrate = 115200
    parser.add_argument(
        "port",
        nargs="?",
        default=None,
        help="Serial port (leave empty for autodetection)",
    )
    parser.add_argument(
        "--baud",
        type=int,
        default=default_baudrate,
        help=f"Baud rate (default: {default_baudrate})",
    )
    parser.add_argument(
        "--filter",
        type=str,
        default="",
        help="Only display lines containing this keyword (case-insensitive)",
    )
    parser.add_argument(
        "--suppress",
        type=str,
        default="",
        help="Suppress lines containing this keyword (case-insensitive)",
    )
    args = parser.parse_args()
    port = args.port
    if port is None:
        port_list = get_auto_detected_port()
        if len(port_list) == 1:
            port = port_list[0]
            print(f"{Fore.CYAN}Auto-detected serial port: {port}{Style.RESET_ALL}")
        elif port_list:
            # Ambiguous: ask the user to choose rather than guessing.
            print(f"{Fore.YELLOW}Multiple serial ports found:{Style.RESET_ALL}")
            for p in port_list:
                print(f" - {p}")
            print(
                f"{Fore.YELLOW}Please specify the desired port as a command-line argument.{Style.RESET_ALL}"
            )
            sys.exit(1)
        else:
            print(f"{Fore.RED}Error: No suitable serial port found.{Style.RESET_ALL}")
            sys.exit(1)

    try:
        ser = serial.Serial(port, args.baud, timeout=0.1)
        ser.dtr = False
        ser.rts = False
    except serial.SerialException as e:
        print(f"{Fore.RED}Error opening port: {e}{Style.RESET_ALL}")
        sys.exit(1)

    # Daemon=True means these threads die when the main program closes
    myargs = vars(args)  # Convert Namespace to dict for easier passing
    serial_thread = threading.Thread(
        target=serial_worker, args=(ser, myargs), daemon=True
    )
    input_thread = threading.Thread(target=input_worker, args=(ser,), daemon=True)

    try:
        # Set up signal handler for graceful shutdown
        signal.signal(signal.SIGINT, signal_handler)

        # 1. Start the Serial Reader and the command prompt in separate threads
        serial_thread.start()
        input_thread.start()

        # 2. Set up the Graph (Main Thread)
        try:
            import matplotlib.style as mplstyle  # pylint: disable=import-outside-toplevel

            default_styles = (
                "light_background",
                "ggplot",
                "seaborn",
                "dark_background",
            )
            styles = list(mplstyle.available)
            # Use the first preferred style this matplotlib install offers.
            for default_style in default_styles:
                if default_style in styles:
                    print(
                        f"\n{Fore.CYAN}--- Using Matplotlib style: {default_style} ---{Style.RESET_ALL}"
                    )
                    mplstyle.use(default_style)
                    break
        except (AttributeError, ValueError):
            pass

        fig = plt.figure(figsize=(10, 6))

        # Update graph every 1000ms. The result is bound to a name so the
        # animation object stays referenced while the window is open.
        _ = animation.FuncAnimation(
            fig, update_graph, interval=1000, cache_frame_data=False
        )

        print(
            f"{Fore.YELLOW}Starting Graph Window... (Close window or press Ctrl-C to exit){Style.RESET_ALL}"
        )
        plt.show()
    except KeyboardInterrupt:
        print(f"\n{Fore.YELLOW}Exiting...{Style.RESET_ALL}")
    finally:
        shutdown_event.set()  # Ensure all threads know to stop
        plt.close("all")  # Force close any lingering plot windows
        # The reader wakes up at least every 0.1 s (the port's read timeout),
        # so give it a moment to leave its loop before the port is closed.
        if serial_thread.is_alive():
            serial_thread.join(timeout=1.0)
        ser.close()
509
510
# Run the monitor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()