linux observer
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 290 lines 10 kB view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""HTTP upload client for solstone ingest server. 5 6Extracted from solstone's observe/remote_client.py. Accepts Config 7as constructor parameter instead of reading config internally. 8 9Refinements over tmux baseline: 10- Respects configured sync_max_retries without hard cap 11- Error classification: auth (401/403) vs transient (5xx/network) 12""" 13 14from __future__ import annotations 15 16import json 17import logging 18import shutil 19import subprocess 20import time 21from enum import Enum 22from pathlib import Path 23from typing import Any, NamedTuple 24 25import requests 26 27from .config import Config 28 29logger = logging.getLogger(__name__) 30 31UPLOAD_TIMEOUT = 300 32EVENT_TIMEOUT = 30 33 34 35class ErrorType(Enum): 36 """Classification of upload errors for circuit breaker tuning.""" 37 38 AUTH = "auth" # 401, 403 — open circuit immediately 39 CLIENT = "client" # 400 — non-retryable, don't count for circuit 40 TRANSIENT = "transient" # 5xx, network, timeout — allow more failures 41 42 43class UploadResult(NamedTuple): 44 success: bool 45 duplicate: bool = False 46 error_type: ErrorType | None = None 47 48 49class UploadClient: 50 """HTTP client for uploading observer segments to the ingest server.""" 51 52 def __init__(self, config: Config): 53 self._url = config.server_url.rstrip("/") if config.server_url else "" 54 self._key = config.key 55 self._stream = config.stream 56 self._revoked = False 57 self._session = requests.Session() 58 self._retry_backoff = config.sync_retry_delays or [5, 30, 120, 300] 59 # Respect configured retry cap — no hard min(config, 3) 60 self._max_retries = config.sync_max_retries 61 62 @property 63 def is_revoked(self) -> bool: 64 return self._revoked 65 66 def _persist_key(self, config: Config, key: str) -> None: 67 """Save auto-registered key back to config.""" 68 from .config import save_config 69 70 config.key = key 71 save_config(config) 72 73 def ensure_registered(self, config: Config) -> bool: 74 """Ensure the client has a valid key, auto-registering if needed. 75 76 Tries sol CLI first (no server needed), falls back to HTTP. 77 Returns True if a key is available. 78 """ 79 if self._key: 80 return True 81 82 # Try sol CLI registration first 83 name = self._stream or "solstone-linux" 84 sol = shutil.which("sol") 85 if sol: 86 try: 87 result = subprocess.run( 88 [sol, "observer", "--json", "create", name], 89 capture_output=True, 90 text=True, 91 timeout=10, 92 ) 93 if result.returncode == 0: 94 data = json.loads(result.stdout) 95 self._key = data["key"] 96 self._persist_key(config, self._key) 97 logger.info(f"CLI-registered as '{name}' (key: {self._key[:8]}...)") 98 return True 99 except ( 100 subprocess.TimeoutExpired, 101 json.JSONDecodeError, 102 KeyError, 103 OSError, 104 ) as e: 105 logger.debug(f"CLI registration failed: {e}") 106 107 if not self._url: 108 return False 109 110 url = f"{self._url}/app/observer/api/create" 111 112 retries = min(3, len(self._retry_backoff)) 113 for attempt in range(retries): 114 delay = self._retry_backoff[min(attempt, len(self._retry_backoff) - 1)] 115 try: 116 resp = self._session.post( 117 url, json={"name": name}, timeout=EVENT_TIMEOUT 118 ) 119 if resp.status_code == 200: 120 data = resp.json() 121 self._key = data["key"] 122 self._persist_key(config, self._key) 123 logger.info( 124 f"Auto-registered as '{name}' (key: {self._key[:8]}...)" 125 ) 126 return True 127 elif resp.status_code == 403: 128 self._revoked = True 129 logger.error("Registration rejected (403)") 130 return False 131 else: 132 logger.warning( 133 f"Registration attempt {attempt + 1} failed: {resp.status_code}" 134 ) 135 except requests.RequestException as e: 136 logger.warning(f"Registration attempt {attempt + 1} failed: {e}") 137 if attempt < retries - 1: 138 time.sleep(delay) 139 140 logger.error(f"Registration failed after {retries} attempts") 141 return False 142 143 @staticmethod 144 def classify_error( 145 status_code: int | None, is_network_error: bool = False 146 ) -> ErrorType: 147 """Classify an error for circuit breaker and retry decisions.""" 148 if is_network_error: 149 return ErrorType.TRANSIENT 150 if status_code is None: 151 return ErrorType.TRANSIENT 152 if status_code in (401, 403): 153 return ErrorType.AUTH 154 if status_code == 400: 155 return ErrorType.CLIENT 156 # 5xx and anything else 157 return ErrorType.TRANSIENT 158 159 def upload_segment( 160 self, 161 day: str, 162 segment: str, 163 files: list[Path], 164 meta: dict[str, Any] | None = None, 165 ) -> UploadResult: 166 """Upload a segment's files to the ingest server.""" 167 if self._revoked or not self._key or not self._url: 168 return UploadResult( 169 False, error_type=ErrorType.AUTH if self._revoked else None 170 ) 171 172 url = f"{self._url}/app/observer/ingest/{self._key}" 173 174 for attempt in range(self._max_retries): 175 file_handles = [] 176 files_data = [] 177 error_type = None 178 try: 179 for path in files: 180 if not path.exists(): 181 logger.warning(f"File not found, skipping: {path}") 182 continue 183 fh = open(path, "rb") 184 file_handles.append(fh) 185 files_data.append( 186 ("files", (path.name, fh, "application/octet-stream")) 187 ) 188 189 if not files_data: 190 return UploadResult(False) 191 192 data: dict[str, Any] = {"day": day, "segment": segment} 193 if meta: 194 data["meta"] = json.dumps(meta) 195 196 response = self._session.post( 197 url, data=data, files=files_data, timeout=UPLOAD_TIMEOUT 198 ) 199 200 if response.status_code == 200: 201 resp_data = response.json() 202 is_duplicate = resp_data.get("status") == "duplicate" 203 return UploadResult(True, duplicate=is_duplicate) 204 205 error_type = self.classify_error(response.status_code) 206 207 if error_type == ErrorType.AUTH: 208 if response.status_code == 403: 209 self._revoked = True 210 logger.error( 211 f"Upload rejected ({response.status_code}): {response.text}" 212 ) 213 return UploadResult(False, error_type=error_type) 214 215 if error_type == ErrorType.CLIENT: 216 logger.error( 217 f"Upload rejected ({response.status_code}): {response.text}" 218 ) 219 return UploadResult(False, error_type=error_type) 220 221 logger.warning( 222 f"Upload attempt {attempt + 1} failed: " 223 f"{response.status_code} {response.text}" 224 ) 225 except requests.RequestException as e: 226 error_type = ErrorType.TRANSIENT 227 logger.warning(f"Upload attempt {attempt + 1} failed: {e}") 228 finally: 229 for fh in file_handles: 230 try: 231 fh.close() 232 except Exception: 233 pass 234 235 if attempt < self._max_retries - 1: 236 delay = self._retry_backoff[min(attempt, len(self._retry_backoff) - 1)] 237 time.sleep(delay) 238 239 logger.error( 240 f"Upload failed after {self._max_retries} attempts: {day}/{segment}" 241 ) 242 return UploadResult(False, error_type=error_type) 243 244 def get_server_segments(self, day: str) -> list[dict] | None: 245 """Query server for segments on a given day. 246 247 Returns list of segment dicts, or None on failure. 248 """ 249 if self._revoked or not self._key or not self._url: 250 return None 251 252 url = f"{self._url}/app/observer/ingest/{self._key}/segments/{day}" 253 params = {} 254 if self._stream: 255 params["stream"] = self._stream 256 257 try: 258 resp = self._session.get(url, params=params, timeout=EVENT_TIMEOUT) 259 if resp.status_code == 200: 260 return resp.json() 261 if resp.status_code in (401, 403): 262 if resp.status_code == 403: 263 self._revoked = True 264 logger.error(f"Segments query rejected ({resp.status_code})") 265 return None 266 logger.warning(f"Segments query failed: {resp.status_code}") 267 return None 268 except requests.RequestException as e: 269 logger.debug(f"Segments query failed: {e}") 270 return None 271 272 def relay_event(self, tract: str, event: str, **fields: Any) -> bool: 273 """Fire-and-forget event relay.""" 274 if self._revoked or not self._key or not self._url: 275 return False 276 277 url = f"{self._url}/app/observer/ingest/{self._key}/event" 278 payload = {"tract": tract, "event": event, **fields} 279 try: 280 resp = self._session.post(url, json=payload, timeout=EVENT_TIMEOUT) 281 if resp.status_code == 200: 282 return True 283 if resp.status_code == 403: 284 self._revoked = True 285 return False 286 except requests.RequestException: 287 return False 288 289 def stop(self) -> None: 290 self._session.close()