optimizing a gate level bcm to the end of the earth and back
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: nicer progress and quit handling

+107 -3
+107 -3
search_with_4input.py
··· 7 7 from multiprocessing import Manager 8 8 from concurrent.futures import ProcessPoolExecutor, as_completed 9 9 import sys 10 + import os 10 11 import time 11 12 import threading 12 13 import queue 14 + import termios 15 + import tty 16 + import signal 17 + import atexit 13 18 14 19 from bcd_optimization.solver import BCDTo7SegmentSolver 15 20 from bcd_optimization.truth_tables import SEGMENT_MINTERMS, SEGMENT_NAMES ··· 23 28 DIM = "\033[2m" 24 29 RESET = "\033[0m" 25 30 BOLD = "\033[1m" 31 + HIDE_CURSOR = "\033[?25l" 32 + SHOW_CURSOR = "\033[?25h" 26 33 27 34 28 35 class ProgressDisplay: ··· 33 40 self.running = False 34 41 self.thread = None 35 42 self.stats_queue = stats_queue 43 + self.original_termios = None 36 44 37 45 # State 38 46 self.completed = 0 ··· 49 57 self.total_vars = 0 50 58 self.total_clauses = 0 51 59 60 + def _disable_input(self): 61 + """Disable keyboard echo and hide cursor.""" 62 + try: 63 + self.original_termios = termios.tcgetattr(sys.stdin) 64 + new_termios = termios.tcgetattr(sys.stdin) 65 + new_termios[3] = new_termios[3] & ~termios.ECHO & ~termios.ICANON 66 + termios.tcsetattr(sys.stdin, termios.TCSANOW, new_termios) 67 + except (termios.error, AttributeError): 68 + pass 69 + sys.stdout.write(HIDE_CURSOR) 70 + sys.stdout.flush() 71 + 72 + def _restore_input(self): 73 + """Restore keyboard echo and show cursor.""" 74 + sys.stdout.write(SHOW_CURSOR) 75 + sys.stdout.flush() 76 + if self.original_termios: 77 + try: 78 + termios.tcsetattr(sys.stdin, termios.TCSANOW, self.original_termios) 79 + except (termios.error, AttributeError): 80 + pass 81 + self.original_termios = None 82 + 52 83 def start(self, total, cost): 53 84 """Start the progress display thread.""" 54 85 with self.lock: ··· 63 94 self.total_decisions = 0 64 95 self.running = True 65 96 97 + self._disable_input() 66 98 self.thread = threading.Thread(target=self._update_loop, daemon=True) 67 99 self.thread.start() 68 100 ··· 71 103 self.running = False 72 104 if self.thread: 73 105 self.thread.join(timeout=0.5) 106 + self._restore_input() 74 107 # Clear the line 75 108 print(f"\r{CLEAR_LINE}", end="", flush=True) 76 109 ··· 297 330 return try_config_with_stats(n2, n3, n4, use_complements, restrict_functions, stats_queue) 298 331 299 332 333 + def worker_init(): 334 + """Initialize worker process to ignore SIGINT (parent handles it).""" 335 + signal.signal(signal.SIGINT, signal.SIG_IGN) 336 + 337 + 300 338 def verify_result(result): 301 339 """Verify a synthesis result is correct.""" 302 340 def eval_func2(func, a, b): ··· 337 375 return True, "All correct" 338 376 339 377 378 + def cleanup_terminal(): 379 + """Restore terminal to normal state.""" 380 + sys.stdout.write(SHOW_CURSOR) 381 + sys.stdout.flush() 382 + 383 + 340 384 def main(): 385 + # Register cleanup to ensure cursor is always restored 386 + atexit.register(cleanup_terminal) 387 + 388 + # Store original terminal settings for signal handler 389 + original_termios = None 390 + try: 391 + original_termios = termios.tcgetattr(sys.stdin) 392 + except (termios.error, AttributeError): 393 + pass 394 + 395 + # Create progress display early so signal handler can access it 396 + progress = ProgressDisplay(None) # Queue set later 397 + executor_ref = [None] # Mutable container for executor reference 398 + # State for signal handler status message 399 + search_state = { 400 + 'start_time': 0, 401 + 'group_start': 0, 402 + 'configs_tested': 0, 403 + 'current_cost': 0, 404 + 'group_completed': 0, 405 + 'group_size': 0, 406 + } 407 + 408 + def signal_handler(signum, frame): 409 + """Handle interrupt signals gracefully.""" 410 + # Stop progress display first to prevent overwrites 411 + progress.running = False 412 + if progress.thread: 413 + progress.thread.join(timeout=0.2) 414 + # Shutdown executor without waiting 415 + if executor_ref[0]: 416 + executor_ref[0].shutdown(wait=False, cancel_futures=True) 417 + # Restore terminal 418 + sys.stdout.write(f"\r{CLEAR_LINE}") 419 + sys.stdout.write(SHOW_CURSOR) 420 + sys.stdout.flush() 421 + if original_termios: 422 + try: 423 + termios.tcsetattr(sys.stdin, termios.TCSANOW, original_termios) 424 + except (termios.error, AttributeError): 425 + pass 426 + # Print status message 427 + elapsed = time.time() - search_state['group_start'] if search_state['group_start'] else 0 428 + conflicts = progress.total_conflicts 429 + print(f" {YELLOW}● Interrupted at {search_state['group_completed']}/{search_state['group_size']} configurations{RESET} ({elapsed:.1f}s, {conflicts} conflicts)") 430 + print(f"\n{YELLOW}Search interrupted by user{RESET}") 431 + os._exit(0) # Hard exit to avoid cleanup errors from child processes 432 + 433 + signal.signal(signal.SIGINT, signal_handler) 434 + signal.signal(signal.SIGTERM, signal_handler) 435 + 341 436 print(f"{BOLD}{'=' * 60}{RESET}") 342 437 print(f"{BOLD}BCD to 7-Segment Optimal Circuit Search (with 4-input gates){RESET}") 343 438 print(f"{BOLD}{'=' * 60}{RESET}") ··· 379 474 best_cost = float('inf') 380 475 381 476 total_start = time.time() 477 + search_state['start_time'] = total_start 382 478 configs_tested = 0 383 479 total_configs = len(configs) 384 480 385 481 # Create shared queue for stats 386 482 manager = Manager() 387 483 stats_queue = manager.Queue() 484 + progress.stats_queue = stats_queue 388 485 389 - progress = ProgressDisplay(stats_queue) 390 - 391 - with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor: 486 + with ProcessPoolExecutor(max_workers=mp.cpu_count(), initializer=worker_init) as executor: 487 + executor_ref[0] = executor 392 488 for cost in sorted(cost_groups.keys()): 393 489 if cost >= best_cost: 394 490 continue ··· 398 494 group_start = time.time() 399 495 completed_in_group = 0 400 496 497 + # Update state for signal handler 498 + search_state['current_cost'] = cost 499 + search_state['group_size'] = group_size 500 + search_state['group_completed'] = 0 501 + search_state['group_start'] = group_start 502 + 401 503 print(f"\n{CYAN}{BOLD}Testing {cost} inputs{RESET} ({group_size} configurations)") 402 504 print("-" * 50) 403 505 ··· 414 516 n2, n3, n4 = cfg[0], cfg[1], cfg[2] # First 3 elements are gate counts 415 517 completed_in_group += 1 416 518 configs_tested += 1 519 + search_state['group_completed'] = completed_in_group 520 + search_state['configs_tested'] = configs_tested 417 521 config_str = f"{n2}x2+{n3}x3+{n4}x4" 418 522 419 523 try: