tmux observer
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add cache_retention_days config for automatic cleanup of synced segments

Default 7 days. Values: positive int = days to keep, 0 = delete
immediately after confirmed sync, -1 = keep forever. Cleanup runs
at the end of each sync pass with triple-gated safety: synced_days
membership, age check, and per-segment server confirmation.

+435 -39
+14
INSTALL.md
··· 45 45 } 46 46 ``` 47 47 48 + **optional: cache retention.** by default, synced segments are deleted after 7 days. to change this, add `cache_retention_days` to config.json: 49 + - positive number: keep synced segments for that many days (default: `7`) 50 + - `0`: delete immediately after confirmed sync 51 + - `-1`: keep forever (never auto-delete) 52 + 53 + ```json 54 + { 55 + "server_url": "http://localhost:5015", 56 + "key": "THE_API_KEY_FROM_STEP_2", 57 + "stream": "HOSTNAME.tmux", 58 + "cache_retention_days": 7 59 + } 60 + ``` 61 + 48 62 4. install and start the systemd user service: 49 63 ``` 50 64 solstone-tmux install-service
+1
pyproject.toml
··· 12 12 [project.optional-dependencies] 13 13 dev = [ 14 14 "pytest", 15 + "pytest-asyncio", 15 16 "ruff", 16 17 ] 17 18
+9 -5
src/solstone_tmux/capture.py
··· 123 123 """Get all windows for a session.""" 124 124 output = run_tmux_command( 125 125 [ 126 - "list-windows", "-t", session, "-F", 126 + "list-windows", 127 + "-t", 128 + session, 129 + "-F", 127 130 "#{window_active} #{window_id} #{window_index} #{window_name}", 128 131 ] 129 132 ) ··· 157 160 """Get all panes for a window with layout info.""" 158 161 output = run_tmux_command( 159 162 [ 160 - "list-panes", "-t", window_id, "-F", 163 + "list-panes", 164 + "-t", 165 + window_id, 166 + "-F", 161 167 "#{pane_id} #{pane_index} #{pane_left} #{pane_top} #{pane_width} #{pane_height} #{pane_active}", 162 168 ] 163 169 ) ··· 191 197 192 198 def capture_pane(self, pane_id: str) -> str: 193 199 """Capture visible pane content with ANSI escape codes.""" 194 - output = run_tmux_command( 195 - ["capture-pane", "-p", "-e", "-t", pane_id] 196 - ) 200 + output = run_tmux_command(["capture-pane", "-p", "-e", "-t", pane_id]) 197 201 return output if output else "" 198 202 199 203 def capture_session(self, session: str) -> CaptureResult | None:
+25 -13
src/solstone_tmux/cli.py
··· 16 16 import asyncio 17 17 import json 18 18 import logging 19 - import os 20 19 import shutil 21 20 import socket 22 21 import subprocess 23 22 import sys 24 23 from pathlib import Path 25 24 26 - from .config import Config, load_config, save_config 25 + from .config import load_config, save_config 27 26 from .streams import stream_name 28 27 29 28 ··· 46 45 47 46 if not config.stream: 48 47 try: 49 - config.stream = stream_name( 50 - host=socket.gethostname(), qualifier="tmux" 51 - ) 48 + config.stream = stream_name(host=socket.gethostname(), qualifier="tmux") 52 49 except ValueError as e: 53 50 print(f"Error: {e}", file=sys.stderr) 54 51 return 1 ··· 85 82 # Derive stream name 86 83 if not config.stream: 87 84 try: 88 - config.stream = stream_name( 89 - host=socket.gethostname(), qualifier="tmux" 90 - ) 85 + config.stream = stream_name(host=socket.gethostname(), qualifier="tmux") 91 86 except ValueError as e: 92 87 print(f"Error deriving stream name: {e}", file=sys.stderr) 93 88 return 1 ··· 105 100 try: 106 101 result = subprocess.run( 107 102 [sol, "observer", "--json", "create", config.stream], 108 - capture_output=True, text=True, timeout=10, 103 + capture_output=True, 104 + text=True, 105 + timeout=10, 109 106 ) 110 107 if result.returncode == 0: 111 108 data = json.loads(result.stdout) ··· 113 110 save_config(config) 114 111 print(f"Registered (key: {config.key[:8]}...)") 115 112 else: 116 - print(f"CLI registration failed, trying HTTP...") 113 + print("CLI registration failed, trying HTTP...") 117 114 except (subprocess.TimeoutExpired, json.JSONDecodeError, KeyError, OSError): 118 115 print("CLI registration failed, trying HTTP...") 119 116 ··· 124 121 config = load_config() 125 122 print(f"Registered (key: {config.key[:8]}...)") 126 123 else: 127 - print("Warning: registration failed. Run setup again when server is available.") 124 + print( 125 + "Warning: registration failed. Run setup again when server is available." 126 + ) 128 127 else: 129 128 print(f"Already registered (key: {config.key[:8]}...)") 130 129 131 130 print(f"\nConfig saved to {config.config_path}") 132 131 print(f"Captures will go to {config.captures_dir}") 133 - print(f"\nRun 'solstone-tmux run' to start, or 'solstone-tmux install-service' for systemd.") 132 + print( 133 + "\nRun 'solstone-tmux run' to start, or 'solstone-tmux install-service' for systemd." 134 + ) 134 135 return 0 135 136 136 137 ··· 224 225 225 226 size_mb = total_size / (1024 * 1024) 226 227 print(f"Cache: {captures_dir}") 227 - print(f" {segment_count} segments across {day_count} day(s), {size_mb:.1f} MB") 228 + print( 229 + f" {segment_count} segments across {day_count} day(s), {size_mb:.1f} MB" 230 + ) 228 231 else: 229 232 print(f"Cache: {captures_dir} (not created yet)") 233 + 234 + # Retention policy 235 + retention = config.cache_retention_days 236 + if retention < 0: 237 + print("Retain: forever") 238 + elif retention == 0: 239 + print("Retain: delete after sync") 240 + else: 241 + print(f"Retain: {retention} day(s)") 230 242 231 243 # Synced days 232 244 synced_path = config.state_dir / "synced_days.json"
+10 -1
src/solstone_tmux/config.py
··· 34 34 stream: str = "" 35 35 capture_interval: int = DEFAULT_CAPTURE_INTERVAL 36 36 segment_interval: int = DEFAULT_SEGMENT_INTERVAL 37 - sync_retry_delays: list[int] = field(default_factory=lambda: list(DEFAULT_SYNC_RETRY_DELAYS)) 37 + sync_retry_delays: list[int] = field( 38 + default_factory=lambda: list(DEFAULT_SYNC_RETRY_DELAYS) 39 + ) 38 40 sync_max_retries: int = DEFAULT_SYNC_MAX_RETRIES 41 + cache_retention_days: int = 7 39 42 base_dir: Path = DEFAULT_BASE_DIR 40 43 41 44 @property ··· 87 90 config.sync_retry_delays = data["sync_retry_delays"] 88 91 if "sync_max_retries" in data: 89 92 config.sync_max_retries = data["sync_max_retries"] 93 + if "cache_retention_days" in data: 94 + try: 95 + config.cache_retention_days = int(data["cache_retention_days"]) 96 + except (ValueError, TypeError): 97 + pass 90 98 91 99 return config 92 100 ··· 103 111 "segment_interval": config.segment_interval, 104 112 "sync_retry_delays": config.sync_retry_delays, 105 113 "sync_max_retries": config.sync_max_retries, 114 + "cache_retention_days": config.cache_retention_days, 106 115 } 107 116 108 117 config_path = config.config_path
+1 -3
src/solstone_tmux/observer.py
··· 84 84 if now - self.last_capture_time < self.capture_interval: 85 85 return 86 86 87 - active_sessions = self.tmux_capture.get_active_sessions( 88 - self.capture_interval 89 - ) 87 + active_sessions = self.tmux_capture.get_active_sessions(self.capture_interval) 90 88 if not active_sessions: 91 89 return 92 90
+112 -2
src/solstone_tmux/sync.py
··· 15 15 import logging 16 16 import os 17 17 import time 18 - from datetime import datetime 18 + import shutil 19 + from datetime import datetime, timedelta 19 20 from pathlib import Path 20 21 from typing import Any 21 22 ··· 178 179 self._synced_days.add(day) 179 180 self._save_synced_days() 180 181 182 + # Cleanup old synced segments 183 + if not self._circuit_open and self._running: 184 + try: 185 + await self._cleanup_synced_segments() 186 + except Exception as e: 187 + logger.error(f"Cleanup error: {e}", exc_info=True) 188 + 189 + async def _cleanup_synced_segments(self) -> None: 190 + """Delete synced segments older than cache_retention_days. 191 + 192 + Triple-gated safety: 193 + 1. Day must be in _synced_days (fully synced locally) 194 + 2. Segment must be older than retention threshold (unless retention=0) 195 + 3. Segment must be confirmed present on server (fresh query) 196 + """ 197 + retention = self._config.cache_retention_days 198 + if retention < 0: 199 + return 200 + 201 + captures_dir = self._config.captures_dir 202 + if not captures_dir.exists(): 203 + return 204 + 205 + today = datetime.now().strftime("%Y%m%d") 206 + if retention > 0: 207 + cutoff = (datetime.now() - timedelta(days=retention)).strftime("%Y%m%d") 208 + else: 209 + cutoff = today # 0 means delete immediately — all days qualify 210 + 211 + deleted_total = 0 212 + 213 + for day_dir in sorted(captures_dir.iterdir()): 214 + if not day_dir.is_dir(): 215 + continue 216 + 217 + day = day_dir.name 218 + 219 + if not self._running: 220 + break 221 + 222 + # Gate 1: day must be in synced_days 223 + if day not in self._synced_days: 224 + continue 225 + 226 + # Gate 2: day must be old enough (unless retention=0) 227 + if retention > 0 and day >= cutoff: 228 + continue 229 + 230 + # Don't clean today's segments 231 + if day == today: 232 + continue 233 + 234 + # Gate 3: fresh server confirmation 235 + server_segments = await asyncio.to_thread( 236 + self._client.get_server_segments, day 237 + ) 238 + if server_segments is None: 239 + logger.warning("Cleanup: skipping day %s — server unreachable", day) 240 + continue 241 + 242 + server_keys: set[str] = set() 243 + for seg in server_segments: 244 + server_keys.add(seg.get("key", "")) 245 + if "original_key" in seg: 246 + server_keys.add(seg["original_key"]) 247 + 248 + deleted_day = 0 249 + 250 + for stream_dir in day_dir.iterdir(): 251 + if not stream_dir.is_dir(): 252 + continue 253 + 254 + for seg_dir in sorted(stream_dir.iterdir()): 255 + if not seg_dir.is_dir(): 256 + continue 257 + 258 + name = seg_dir.name 259 + # Never touch incomplete or failed 260 + if name.endswith(".incomplete") or name.endswith(".failed"): 261 + continue 262 + 263 + if name not in server_keys: 264 + logger.warning( 265 + "Cleanup: keeping %s/%s — not confirmed on server", 266 + day, 267 + name, 268 + ) 269 + continue 270 + 271 + shutil.rmtree(seg_dir) 272 + logger.info("Cleanup: deleted %s/%s", day, name) 273 + deleted_day += 1 274 + 275 + # Remove empty stream dir 276 + if stream_dir.is_dir() and not any(stream_dir.iterdir()): 277 + stream_dir.rmdir() 278 + 279 + # Remove empty day dir 280 + if day_dir.is_dir() and not any(day_dir.iterdir()): 281 + day_dir.rmdir() 282 + 283 + if deleted_day: 284 + deleted_total += deleted_day 285 + 286 + if deleted_total: 287 + logger.info("Cleanup: deleted %d segment(s) total", deleted_total) 288 + 181 289 def _collect_segments(self, captures_dir: Path) -> dict[str, list[Path]]: 182 290 """Collect completed segments grouped by day.""" 183 291 result: dict[str, list[Path]] = {} ··· 236 344 237 345 if attempt < max_retries - 1: 238 346 delay = retry_delays[min(attempt, len(retry_delays) - 1)] 239 - logger.info(f"Retrying {day}/{segment_key} in {delay}s (attempt {attempt + 2})") 347 + logger.info( 348 + f"Retrying {day}/{segment_key} in {delay}s (attempt {attempt + 2})" 349 + ) 240 350 await asyncio.sleep(delay) 241 351 242 352 logger.error(f"Upload failed after {max_retries} attempts: {day}/{segment_key}")
+15 -4
src/solstone_tmux/upload.py
··· 71 71 try: 72 72 result = subprocess.run( 73 73 [sol, "observer", "--json", "create", name], 74 - capture_output=True, text=True, timeout=10, 74 + capture_output=True, 75 + text=True, 76 + timeout=10, 75 77 ) 76 78 if result.returncode == 0: 77 79 data = json.loads(result.stdout) ··· 79 81 self._persist_key(config, self._key) 80 82 logger.info(f"CLI-registered as '{name}' (key: {self._key[:8]}...)") 81 83 return True 82 - except (subprocess.TimeoutExpired, json.JSONDecodeError, KeyError, OSError) as e: 84 + except ( 85 + subprocess.TimeoutExpired, 86 + json.JSONDecodeError, 87 + KeyError, 88 + OSError, 89 + ) as e: 83 90 logger.debug(f"CLI registration failed: {e}") 84 91 85 92 if not self._url: ··· 96 103 data = resp.json() 97 104 self._key = data["key"] 98 105 self._persist_key(config, self._key) 99 - logger.info(f"Auto-registered as '{name}' (key: {self._key[:8]}...)") 106 + logger.info( 107 + f"Auto-registered as '{name}' (key: {self._key[:8]}...)" 108 + ) 100 109 return True 101 110 elif resp.status_code == 403: 102 111 self._revoked = True ··· 180 189 if attempt < len(self._retry_backoff) - 1: 181 190 time.sleep(delay) 182 191 183 - logger.error(f"Upload failed after {len(self._retry_backoff)} attempts: {day}/{segment}") 192 + logger.error( 193 + f"Upload failed after {len(self._retry_backoff)} attempts: {day}/{segment}" 194 + ) 184 195 return UploadResult(False) 185 196 186 197 def get_server_segments(self, day: str) -> list[dict] | None:
+24 -6
tests/test_capture.py
··· 51 51 windows=[], 52 52 panes=[ 53 53 PaneInfo( 54 - id="%0", index=0, left=0, top=0, 55 - width=80, height=24, active=True, content="hello", 54 + id="%0", 55 + index=0, 56 + left=0, 57 + top=0, 58 + width=80, 59 + height=24, 60 + active=True, 61 + content="hello", 56 62 ) 57 63 ], 58 64 ) ··· 68 74 windows=[], 69 75 panes=[ 70 76 PaneInfo( 71 - id="%0", index=0, left=0, top=0, 72 - width=80, height=24, active=True, content="hello", 77 + id="%0", 78 + index=0, 79 + left=0, 80 + top=0, 81 + width=80, 82 + height=24, 83 + active=True, 84 + content="hello", 73 85 ) 74 86 ], 75 87 ) ··· 79 91 windows=[], 80 92 panes=[ 81 93 PaneInfo( 82 - id="%0", index=0, left=0, top=0, 83 - width=80, height=24, active=True, content="world", 94 + id="%0", 95 + index=0, 96 + left=0, 97 + top=0, 98 + width=80, 99 + height=24, 100 + active=True, 101 + content="world", 84 102 ) 85 103 ], 86 104 )
+17 -1
tests/test_config.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 - import json 5 4 from pathlib import Path 6 5 7 6 from solstone_tmux.config import Config, load_config, save_config ··· 55 54 56 55 mode = config.config_path.stat().st_mode & 0o777 57 56 assert mode == 0o600 57 + 58 + def test_cache_retention_days_roundtrip(self, tmp_path: Path): 59 + config = Config(base_dir=tmp_path) 60 + config.cache_retention_days = 14 61 + save_config(config) 62 + 63 + loaded = load_config(tmp_path) 64 + assert loaded.cache_retention_days == 14 65 + 66 + def test_cache_retention_days_default(self, tmp_path: Path): 67 + """Existing configs without cache_retention_days default to 7.""" 68 + config_dir = tmp_path / "config" 69 + config_dir.mkdir(parents=True) 70 + (config_dir / "config.json").write_text('{"server_url": "http://test"}') 71 + 72 + loaded = load_config(tmp_path) 73 + assert loaded.cache_retention_days == 7
+207 -4
tests/test_sync.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 - import json 5 4 from pathlib import Path 5 + from unittest.mock import AsyncMock, patch 6 + 7 + import pytest 6 8 7 9 from solstone_tmux.config import Config 8 10 from solstone_tmux.recovery import recover_incomplete_segments 11 + from solstone_tmux.sync import SyncService 12 + from solstone_tmux.upload import UploadClient 9 13 10 14 11 15 class TestRecovery: 12 16 """Test crash recovery for incomplete segments.""" 13 17 14 18 def _make_incomplete( 15 - self, captures_dir: Path, day: str, stream: str, time_prefix: str, age: int = 300 19 + self, 20 + captures_dir: Path, 21 + day: str, 22 + stream: str, 23 + time_prefix: str, 24 + age: int = 300, 16 25 ) -> Path: 17 26 """Create an incomplete segment directory with a dummy file.""" 18 27 import os ··· 20 29 21 30 seg_dir = captures_dir / day / stream / f"{time_prefix}.incomplete" 22 31 seg_dir.mkdir(parents=True) 23 - (seg_dir / f"tmux_main_screen.jsonl").write_text('{"frame_id": 1}\n') 32 + (seg_dir / "tmux_main_screen.jsonl").write_text('{"frame_id": 1}\n') 24 33 25 34 # Set timestamps to simulate age 26 35 old_time = time.time() - age ··· 29 38 30 39 def test_recovers_old_incomplete(self, tmp_path: Path): 31 40 captures_dir = tmp_path / "captures" 32 - self._make_incomplete(captures_dir, "20260403", "archon.tmux", "140000", age=300) 41 + self._make_incomplete( 42 + captures_dir, "20260403", "archon.tmux", "140000", age=300 43 + ) 33 44 34 45 recovered = recover_incomplete_segments(captures_dir) 35 46 assert recovered == 1 ··· 106 117 assert "150000_300" in names 107 118 assert "145000.incomplete" not in names 108 119 assert "143000.failed" not in names 120 + 121 + 122 + class TestCleanupSyncedSegments: 123 + """Test cache retention cleanup of synced segments.""" 124 + 125 + def _make_sync(self, tmp_path: Path, retention: int = 7) -> SyncService: 126 + config = Config(base_dir=tmp_path) 127 + config.cache_retention_days = retention 128 + config.ensure_dirs() 129 + client = UploadClient(config) 130 + return SyncService(config, client) 131 + 132 + def _create_segment( 133 + self, captures_dir: Path, day: str, stream: str, name: str 134 + ) -> Path: 135 + seg_dir = captures_dir / day / stream / name 136 + seg_dir.mkdir(parents=True, exist_ok=True) 137 + (seg_dir / "test.jsonl").write_text("{}\n") 138 + return seg_dir 139 + 140 + @pytest.mark.asyncio 141 + async def test_deletes_old_synced_confirmed(self, tmp_path: Path): 142 + """Segments in synced_days + confirmed on server + old enough -> deleted.""" 143 + sync = self._make_sync(tmp_path, retention=7) 144 + captures = sync._config.captures_dir 145 + 146 + self._create_segment(captures, "20260101", "archon.tmux", "120000_300") 147 + sync._synced_days.add("20260101") 148 + 149 + server_response = [{"key": "120000_300"}] 150 + with patch( 151 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 152 + ): 153 + await sync._cleanup_synced_segments() 154 + 155 + assert not (captures / "20260101" / "archon.tmux" / "120000_300").exists() 156 + 157 + @pytest.mark.asyncio 158 + async def test_keeps_unconfirmed_on_server(self, tmp_path: Path): 159 + """Segments in synced_days + NOT on server -> not deleted.""" 160 + sync = self._make_sync(tmp_path, retention=7) 161 + captures = sync._config.captures_dir 162 + 163 + self._create_segment(captures, "20260101", "archon.tmux", "120000_300") 164 + sync._synced_days.add("20260101") 165 + 166 + server_response = [{"key": "999999_300"}] 167 + with patch( 168 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 169 + ): 170 + await sync._cleanup_synced_segments() 171 + 172 + assert (captures / "20260101" / "archon.tmux" / "120000_300").exists() 173 + 174 + @pytest.mark.asyncio 175 + async def test_keeps_segments_not_in_synced_days(self, tmp_path: Path): 176 + """Segments NOT in synced_days -> not deleted.""" 177 + sync = self._make_sync(tmp_path, retention=7) 178 + captures = sync._config.captures_dir 179 + 180 + self._create_segment(captures, "20260101", "archon.tmux", "120000_300") 181 + 182 + with patch("asyncio.to_thread", new_callable=AsyncMock) as mock_thread: 183 + await sync._cleanup_synced_segments() 184 + 185 + assert (captures / "20260101" / "archon.tmux" / "120000_300").exists() 186 + mock_thread.assert_not_called() 187 + 188 + @pytest.mark.asyncio 189 + async def test_keeps_when_server_unreachable(self, tmp_path: Path): 190 + """Server unreachable (returns None) -> nothing deleted.""" 191 + sync = self._make_sync(tmp_path, retention=7) 192 + captures = sync._config.captures_dir 193 + 194 + self._create_segment(captures, "20260101", "archon.tmux", "120000_300") 195 + sync._synced_days.add("20260101") 196 + 197 + with patch("asyncio.to_thread", new_callable=AsyncMock, return_value=None): 198 + await sync._cleanup_synced_segments() 199 + 200 + assert (captures / "20260101" / "archon.tmux" / "120000_300").exists() 201 + 202 + @pytest.mark.asyncio 203 + async def test_never_touches_incomplete_or_failed(self, tmp_path: Path): 204 + """.incomplete and .failed segments are never deleted.""" 205 + sync = self._make_sync(tmp_path, retention=7) 206 + captures = sync._config.captures_dir 207 + 208 + self._create_segment(captures, "20260101", "archon.tmux", "120000.incomplete") 209 + self._create_segment(captures, "20260101", "archon.tmux", "130000.failed") 210 + self._create_segment(captures, "20260101", "archon.tmux", "140000_300") 211 + sync._synced_days.add("20260101") 212 + 213 + server_response = [{"key": "140000_300"}] 214 + with patch( 215 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 216 + ): 217 + await sync._cleanup_synced_segments() 218 + 219 + assert (captures / "20260101" / "archon.tmux" / "120000.incomplete").exists() 220 + assert (captures / "20260101" / "archon.tmux" / "130000.failed").exists() 221 + assert not (captures / "20260101" / "archon.tmux" / "140000_300").exists() 222 + 223 + @pytest.mark.asyncio 224 + async def test_retention_negative_one_keeps_forever(self, tmp_path: Path): 225 + """cache_retention_days = -1 -> nothing deleted.""" 226 + sync = self._make_sync(tmp_path, retention=-1) 227 + captures = sync._config.captures_dir 228 + 229 + self._create_segment(captures, "20260101", "archon.tmux", "120000_300") 230 + sync._synced_days.add("20260101") 231 + 232 + with patch("asyncio.to_thread", new_callable=AsyncMock) as mock_thread: 233 + await sync._cleanup_synced_segments() 234 + 235 + assert (captures / "20260101" / "archon.tmux" / "120000_300").exists() 236 + mock_thread.assert_not_called() 237 + 238 + @pytest.mark.asyncio 239 + async def test_retention_zero_deletes_immediately(self, tmp_path: Path): 240 + """cache_retention_days = 0 -> deletes immediately (no age check).""" 241 + sync = self._make_sync(tmp_path, retention=0) 242 + captures = sync._config.captures_dir 243 + 244 + from datetime import datetime, timedelta 245 + 246 + yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d") 247 + 248 + self._create_segment(captures, yesterday, "archon.tmux", "120000_300") 249 + sync._synced_days.add(yesterday) 250 + 251 + server_response = [{"key": "120000_300"}] 252 + with patch( 253 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 254 + ): 255 + await sync._cleanup_synced_segments() 256 + 257 + assert not (captures / yesterday / "archon.tmux" / "120000_300").exists() 258 + 259 + @pytest.mark.asyncio 260 + async def test_never_cleans_today(self, tmp_path: Path): 261 + """Today's segments are never cleaned, even with retention=0.""" 262 + sync = self._make_sync(tmp_path, retention=0) 263 + captures = sync._config.captures_dir 264 + 265 + from datetime import datetime 266 + 267 + today = datetime.now().strftime("%Y%m%d") 268 + 269 + self._create_segment(captures, today, "archon.tmux", "120000_300") 270 + sync._synced_days.add(today) 271 + 272 + with patch("asyncio.to_thread", new_callable=AsyncMock) as mock_thread: 273 + await sync._cleanup_synced_segments() 274 + 275 + assert (captures / today / "archon.tmux" / "120000_300").exists() 276 + mock_thread.assert_not_called() 277 + 278 + @pytest.mark.asyncio 279 + async def test_cleans_empty_dirs(self, tmp_path: Path): 280 + """Empty stream and day dirs are removed after segment deletion.""" 281 + sync = self._make_sync(tmp_path, retention=7) 282 + captures = sync._config.captures_dir 283 + 284 + self._create_segment(captures, "20260101", "archon.tmux", "120000_300") 285 + sync._synced_days.add("20260101") 286 + 287 + server_response = [{"key": "120000_300"}] 288 + with patch( 289 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 290 + ): 291 + await sync._cleanup_synced_segments() 292 + 293 + assert not (captures / "20260101" / "archon.tmux").exists() 294 + assert not (captures / "20260101").exists() 295 + 296 + @pytest.mark.asyncio 297 + async def test_original_key_lookup(self, tmp_path: Path): 298 + """Server segment with original_key should match local segment.""" 299 + sync = self._make_sync(tmp_path, retention=7) 300 + captures = sync._config.captures_dir 301 + 302 + self._create_segment(captures, "20260101", "archon.tmux", "120000_300") 303 + sync._synced_days.add("20260101") 304 + 305 + server_response = [{"key": "renamed_key", "original_key": "120000_300"}] 306 + with patch( 307 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 308 + ): 309 + await sync._cleanup_synced_segments() 310 + 311 + assert not (captures / "20260101" / "archon.tmux" / "120000_300").exists()