A fork of https://github.com/crosspoint-reader/crosspoint-reader
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 512 lines 17 kB view raw
1#!/usr/bin/env python3 2""" 3ESP32 Serial Monitor with Memory Graph 4 5This script provides a comprehensive real-time serial monitor for ESP32 devices with 6integrated memory usage graphing capabilities. It reads serial output, parses memory 7information, and displays it in both console and graphical form. 8 9Features: 10- Real-time serial output monitoring with color-coded log levels 11- Interactive memory usage graphing with matplotlib 12- Command input interface for sending commands to the ESP32 device 13- Screenshot capture and processing (1-bit black/white format) 14- Graceful shutdown handling with Ctrl-C signal processing 15- Configurable filtering and suppression of log messages 16- Thread-safe operation with coordinated shutdown events 17 18Usage: 19 python debugging_monitor.py [port] [options] 20 21The script will open a matplotlib window showing memory usage over time and provide 22an interactive command prompt for sending commands to the device. Press Ctrl-C or 23close the graph window to exit gracefully. 24""" 25 26from __future__ import annotations 27 28import argparse 29import glob 30import platform 31import re 32import signal 33import sys 34import threading 35from collections import deque 36from datetime import datetime 37 38# Try to import potentially missing packages 39PACKAGE_MAPPING: dict[str, str] = { 40 "serial": "pyserial", 41 "colorama": "colorama", 42 "matplotlib": "matplotlib", 43 "PIL": "Pillow", 44} 45 46try: 47 import matplotlib.pyplot as plt 48 import serial 49 from colorama import Fore, Style, init 50 from matplotlib import animation 51 52 try: 53 from PIL import Image 54 except ImportError: 55 Image = None 56except ImportError as e: 57 ERROR_MSG = str(e).lower() 58 missing_packages = [pkg for mod, pkg in PACKAGE_MAPPING.items() if mod in ERROR_MSG] 59 60 if not missing_packages: 61 # Fallback if mapping doesn't cover 62 missing_packages = ["pyserial", "colorama", "matplotlib"] 63 64 print("\n" + "!" * 50) 65 print(f" Error: Required package(s) not installed: {', '.join(missing_packages)}") 66 print("!" * 50) 67 68 print("\nTo fix this, please run the following command in your terminal:\n") 69 INSTALL_CMD = "pip install " if sys.platform.startswith("win") else "pip3 install " 70 print(f" {INSTALL_CMD}{' '.join(missing_packages)}") 71 72 print("\nExiting...") 73 sys.exit(1) 74 75# --- Global Variables for Data Sharing --- 76# Store last 50 data points 77MAX_POINTS = 50 78time_data: deque[str] = deque(maxlen=MAX_POINTS) 79free_mem_data: deque[float] = deque(maxlen=MAX_POINTS) 80total_mem_data: deque[float] = deque(maxlen=MAX_POINTS) 81max_alloc_data: deque[float] = deque(maxlen=MAX_POINTS) 82data_lock: threading.Lock = threading.Lock() # Prevent reading while writing 83 84# Global shutdown flag 85shutdown_event = threading.Event() 86 87# Initialize colors 88init(autoreset=True) 89 90# Color mapping for log lines 91COLOR_KEYWORDS: dict[str, list[str]] = { 92 Fore.RED: ["ERROR", "[ERR]", "[SCT]", "FAILED", "WARNING"], 93 Fore.CYAN: ["[MEM]", "FREE:"], 94 Fore.MAGENTA: [ 95 "[GFX]", 96 "[ERS]", 97 "DISPLAY", 98 "RAM WRITE", 99 "RAM COMPLETE", 100 "REFRESH", 101 "POWERING ON", 102 "FRAME BUFFER", 103 "LUT", 104 ], 105 Fore.GREEN: [ 106 "[EBP]", 107 "[BMC]", 108 "[ZIP]", 109 "[PARSER]", 110 "[EHP]", 111 "LOADING EPUB", 112 "CACHE", 113 "DECOMPRESSED", 114 "PARSING", 115 ], 116 Fore.YELLOW: ["[ACT]", "ENTERING ACTIVITY", "EXITING ACTIVITY"], 117 Fore.BLUE: ["RENDERED PAGE", "[LOOP]", "DURATION", "WAIT COMPLETE"], 118 Fore.LIGHTYELLOW_EX: [ 119 "[CPS]", 120 "SETTINGS", 121 "[CLEAR_CACHE]", 122 "[CHAP]", 123 "[OPDS]", 124 "[COF]", 125 ], 126 Fore.LIGHTBLACK_EX: [ 127 "ESP-ROM", 128 "BUILD:", 129 "RST:", 130 "BOOT:", 131 "SPIWP:", 132 "MODE:", 133 "LOAD:", 134 "ENTRY", 135 "[SD]", 136 "STARTING CROSSPOINT", 137 "VERSION", 138 ], 139 Fore.LIGHTCYAN_EX: ["[RBS]"], 140 Fore.LIGHTMAGENTA_EX: [ 141 "[KRS]", 142 "EINKDISPLAY:", 143 "STATIC FRAME", 144 "INITIALIZING", 145 "SPI INITIALIZED", 146 "GPIO PINS", 147 "RESETTING", 148 "SSD1677", 149 "E-INK", 150 ], 151 Fore.LIGHTGREEN_EX: ["[FNS]", "FOOTNOTE"], 152} 153 154 155def signal_handler(signum, frame): 156 """Handle SIGINT (Ctrl-C) by setting the shutdown event.""" 157 # frame parameter is required by signal handler signature but not used 158 del frame # Explicitly mark as unused to satisfy linters 159 print(f"\n{Fore.YELLOW}Received signal {signum}. Shutting down...{Style.RESET_ALL}") 160 shutdown_event.set() 161 plt.close("all") 162 163 164# pylint: disable=R0912 165def get_color_for_line(line: str) -> str: 166 """ 167 Classify log lines by type and assign appropriate colors. 168 """ 169 line_upper = line.upper() 170 for color, keywords in COLOR_KEYWORDS.items(): 171 if any(keyword in line_upper for keyword in keywords): 172 return color 173 return Fore.WHITE 174 175 176def parse_memory_line(line: str) -> tuple[int | None, int | None, int | None]: 177 """ 178 Extracts memory stats from MEM log lines. 179 Format: Free: N bytes, Total: N bytes, Min Free: N bytes, MaxAlloc: N bytes 180 Returns: (free_bytes, total_bytes, max_alloc_bytes) 181 """ 182 def _find(pattern: str) -> int | None: 183 m = re.search(pattern, line) 184 if m: 185 try: 186 return int(m.group(1)) 187 except ValueError: 188 pass 189 return None 190 191 return ( 192 _find(r"\bFree:\s*(\d+)"), 193 _find(r"\bTotal:\s*(\d+)"), 194 _find(r"\bMaxAlloc:\s*(\d+)"), 195 ) 196 197 198def serial_worker(ser, kwargs: dict[str, str]) -> None: 199 """ 200 Runs in a background thread. Handles reading serial data, printing to console, 201 updating memory usage data for graphing, and processing screenshot data. 202 Monitors the global shutdown event for graceful termination. 203 """ 204 print(f"{Fore.CYAN}--- Opening serial port ---{Style.RESET_ALL}") 205 filter_keyword = kwargs.get("filter", "").lower() 206 suppress = kwargs.get("suppress", "").lower() 207 if filter_keyword and suppress and filter_keyword == suppress: 208 print( 209 f"{Fore.YELLOW}Warning: Filter and Suppress keywords are the same. " 210 f"This may result in no output.{Style.RESET_ALL}" 211 ) 212 if filter_keyword: 213 print( 214 f"{Fore.YELLOW}Filtering lines to only show those containing: " 215 f"'{filter_keyword}'{Style.RESET_ALL}" 216 ) 217 if suppress: 218 print( 219 f"{Fore.YELLOW}Suppressing lines containing: '{suppress}'{Style.RESET_ALL}" 220 ) 221 222 expecting_screenshot = False 223 screenshot_size = 0 224 screenshot_data = b"" 225 226 try: 227 while not shutdown_event.is_set(): 228 if expecting_screenshot: 229 data = ser.read(screenshot_size - len(screenshot_data)) 230 if not data: 231 continue 232 screenshot_data += data 233 if len(screenshot_data) == screenshot_size: 234 if Image: 235 img = Image.frombytes("1", (800, 480), screenshot_data) 236 # We need to rotate the image because the raw data is in landscape mode 237 img = img.transpose(Image.ROTATE_270) 238 img.save("screenshot.bmp") 239 print( 240 f"{Fore.GREEN}Screenshot saved to screenshot.bmp{Style.RESET_ALL}" 241 ) 242 else: 243 with open("screenshot.raw", "wb") as f: 244 f.write(screenshot_data) 245 print( 246 f"{Fore.GREEN}Screenshot saved to screenshot.raw (PIL not available){Style.RESET_ALL}" 247 ) 248 expecting_screenshot = False 249 screenshot_data = b"" 250 else: 251 try: 252 raw_data = ser.readline().decode("utf-8", errors="replace") 253 254 if not raw_data: 255 continue 256 257 clean_line = raw_data.strip() 258 if not clean_line: 259 continue 260 261 if clean_line.startswith("SCREENSHOT_START:"): 262 screenshot_size = int(clean_line.split(":")[1]) 263 expecting_screenshot = True 264 continue 265 elif clean_line == "SCREENSHOT_END": 266 continue # ignore 267 268 # Add PC timestamp 269 pc_time = datetime.now().strftime("%H:%M:%S") 270 formatted_line = re.sub(r"^\[\d+\]", f"[{pc_time}]", clean_line) 271 272 # Check for Memory Line 273 if "[MEM]" in formatted_line: 274 free_val, total_val, max_alloc_val = parse_memory_line(formatted_line) 275 if free_val is not None and total_val is not None: 276 with data_lock: 277 time_data.append(pc_time) 278 free_mem_data.append(free_val / 1024) 279 total_mem_data.append(total_val / 1024) 280 max_alloc_data.append((max_alloc_val or 0) / 1024) 281 # Apply filters 282 if filter_keyword and filter_keyword not in formatted_line.lower(): 283 continue 284 if suppress and suppress in formatted_line.lower(): 285 continue 286 # Print to console 287 line_color = get_color_for_line(formatted_line) 288 print(f"{line_color}{formatted_line}") 289 290 except (OSError, UnicodeDecodeError): 291 print( 292 f"{Fore.RED}Device disconnected or data error.{Style.RESET_ALL}" 293 ) 294 break 295 except KeyboardInterrupt: 296 # If thread is killed violently (e.g. main exit), silence errors 297 pass 298 finally: 299 pass # ser closed in main 300 301 302def input_worker(ser) -> None: 303 """ 304 Runs in a background thread. Handles user input to send commands to the ESP32 device. 305 Monitors the global shutdown event for graceful termination on Ctrl-C. 306 """ 307 while not shutdown_event.is_set(): 308 try: 309 cmd = input("Command: ") 310 ser.write(f"CMD:{cmd}\n".encode()) 311 except (EOFError, KeyboardInterrupt): 312 break 313 314 315def update_graph(frame) -> list: # pylint: disable=unused-argument 316 """ 317 Called by Matplotlib animation to redraw the memory usage chart. 318 Monitors the global shutdown event and closes the plot when shutdown is requested. 319 Shows DRAM metrics (free, total, max contiguous alloc) and an optional PSRAM subplot. 320 """ 321 if shutdown_event.is_set(): 322 plt.close("all") 323 return [] 324 325 with data_lock: 326 if not time_data: 327 return [] 328 329 x = list(time_data) 330 y_free = list(free_mem_data) 331 y_total = list(total_mem_data) 332 y_max_alloc = list(max_alloc_data) 333 334 fig = plt.gcf() 335 fig.clf() 336 ax1 = fig.add_subplot(111) 337 338 ax1.plot(x, y_total, label="Total RAM (KB)", color="red", linestyle="--") 339 ax1.plot(x, y_free, label="Free RAM (KB)", color="green", marker="o", markersize=3) 340 if any(v > 0 for v in y_max_alloc): 341 ax1.plot(x, y_max_alloc, label="Max Alloc (KB)", color="orange", linestyle="-.") 342 ax1.fill_between(x, y_free, color="green", alpha=0.1) 343 ax1.set_title("ESP32 Memory Monitor") 344 ax1.set_ylabel("Memory (KB)") 345 ax1.set_xlabel("Time") 346 ax1.legend(loc="upper left") 347 ax1.grid(True, linestyle=":", alpha=0.6) 348 plt.setp(ax1.get_xticklabels(), rotation=45, ha="right") 349 350 fig.tight_layout() 351 return [] 352 353 354def get_auto_detected_port() -> list[str]: 355 """ 356 Attempts to auto-detect the serial port for the ESP32 device. 357 Returns a list of all detected ports. 358 If no suitable port is found, the list will be empty. 359 Darwin/Linux logic by jonasdiemer 360 """ 361 port_list = [] 362 system = platform.system() 363 # Code for darwin (macOS), linux, and windows 364 if system in ("Darwin", "Linux"): 365 pattern = "/dev/tty.usbmodem*" if system == "Darwin" else "/dev/ttyACM*" 366 port_list = sorted(glob.glob(pattern)) 367 elif system == "Windows": 368 from serial.tools import list_ports 369 370 # Be careful with this pattern list - it should be specific 371 # enough to avoid picking up unrelated devices, but broad enough 372 # to catch all common USB-serial adapters used with ESP32 373 # Caveat: localized versions of Windows may have different descriptions, 374 # so we also check for specific VID:PID (but that may not cover all clones) 375 pattern_list = ["CP210x", "CH340", "USB Serial"] 376 found_ports = list_ports.comports() 377 port_list = [ 378 port.device 379 for port in found_ports 380 if any(pat in port.description for pat in pattern_list) 381 or port.hwid.startswith( 382 "USB VID:PID=303A:1001" 383 ) # Add specific VID:PID for XTEINK X4 384 ] 385 386 return port_list 387 388 389def main() -> None: 390 """ 391 Main entry point for the ESP32 monitor application. 392 393 Sets up argument parsing, initializes serial communication, starts background threads 394 for serial monitoring and command input, and launches the memory usage graph. 395 Implements graceful shutdown handling with signal processing for clean termination. 396 397 Features: 398 - Serial port monitoring with color-coded output 399 - Real-time memory usage graphing 400 - Interactive command interface 401 - Screenshot capture capability 402 - Graceful shutdown on Ctrl-C or window close 403 """ 404 parser = argparse.ArgumentParser( 405 description="ESP32 Serial Monitor with Memory Graph - Real-time monitoring, graphing, and command interface" 406 ) 407 default_baudrate = 115200 408 parser.add_argument( 409 "port", 410 nargs="?", 411 default=None, 412 help="Serial port (leave empty for autodetection)", 413 ) 414 parser.add_argument( 415 "--baud", 416 type=int, 417 default=default_baudrate, 418 help=f"Baud rate (default: {default_baudrate})", 419 ) 420 parser.add_argument( 421 "--filter", 422 type=str, 423 default="", 424 help="Only display lines containing this keyword (case-insensitive)", 425 ) 426 parser.add_argument( 427 "--suppress", 428 type=str, 429 default="", 430 help="Suppress lines containing this keyword (case-insensitive)", 431 ) 432 args = parser.parse_args() 433 port = args.port 434 if port is None: 435 port_list = get_auto_detected_port() 436 if len(port_list) == 1: 437 port = port_list[0] 438 print(f"{Fore.CYAN}Auto-detected serial port: {port}{Style.RESET_ALL}") 439 elif len(port_list) > 1: 440 print(f"{Fore.YELLOW}Multiple serial ports found:{Style.RESET_ALL}") 441 for p in port_list: 442 print(f" - {p}") 443 print( 444 f"{Fore.YELLOW}Please specify the desired port as a command-line argument.{Style.RESET_ALL}" 445 ) 446 if port is None: 447 print(f"{Fore.RED}Error: No suitable serial port found.{Style.RESET_ALL}") 448 sys.exit(1) 449 450 try: 451 ser = serial.Serial(port, args.baud, timeout=0.1) 452 ser.dtr = False 453 ser.rts = False 454 except serial.SerialException as e: 455 print(f"{Fore.RED}Error opening port: {e}{Style.RESET_ALL}") 456 return 457 458 # Set up signal handler for graceful shutdown 459 signal.signal(signal.SIGINT, signal_handler) 460 461 # 1. Start the Serial Reader in a separate thread 462 # Daemon=True means this thread dies when the main program closes 463 myargs = vars(args) # Convert Namespace to dict for easier passing 464 t = threading.Thread(target=serial_worker, args=(ser, myargs), daemon=True) 465 t.start() 466 467 # Start input thread 468 input_thread = threading.Thread(target=input_worker, args=(ser,), daemon=True) 469 input_thread.start() 470 471 # 2. Set up the Graph (Main Thread) 472 try: 473 import matplotlib.style as mplstyle # pylint: disable=import-outside-toplevel 474 475 default_styles = ( 476 "light_background", 477 "ggplot", 478 "seaborn", 479 "dark_background", 480 ) 481 styles = list(mplstyle.available) 482 for default_style in default_styles: 483 if default_style in styles: 484 print( 485 f"\n{Fore.CYAN}--- Using Matplotlib style: {default_style} ---{Style.RESET_ALL}" 486 ) 487 mplstyle.use(default_style) 488 break 489 except (AttributeError, ValueError): 490 pass 491 492 fig = plt.figure(figsize=(10, 6)) 493 494 # Update graph every 1000ms 495 _ = animation.FuncAnimation( 496 fig, update_graph, interval=1000, cache_frame_data=False 497 ) 498 499 try: 500 print( 501 f"{Fore.YELLOW}Starting Graph Window... (Close window or press Ctrl-C to exit){Style.RESET_ALL}" 502 ) 503 plt.show() 504 except KeyboardInterrupt: 505 print(f"\n{Fore.YELLOW}Exiting...{Style.RESET_ALL}") 506 finally: 507 shutdown_event.set() # Ensure all threads know to stop 508 plt.close("all") # Force close any lingering plot windows 509 510 511if __name__ == "__main__": 512 main()