linux observer
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""HTTP upload client for solstone ingest server.
5
6Extracted from solstone's observe/remote_client.py. Accepts Config
7as constructor parameter instead of reading config internally.
8
9Refinements over tmux baseline:
10- Respects configured sync_max_retries without hard cap
11- Error classification: auth (401/403) vs transient (5xx/network)
12"""
13
14from __future__ import annotations
15
16import json
17import logging
18import shutil
19import subprocess
20import time
21from enum import Enum
22from pathlib import Path
23from typing import Any, NamedTuple
24
25import requests
26
27from .config import Config
28
logger = logging.getLogger(__name__)

# Request timeouts in seconds, passed as ``timeout=`` to the requests calls.
UPLOAD_TIMEOUT = 5 * 60  # multipart segment uploads
EVENT_TIMEOUT = 30  # registration, segment queries and event relays
33
34
class ErrorType(Enum):
    """Classification of upload errors for circuit breaker tuning.

    Members:
        AUTH: HTTP 401/403 — open the circuit immediately.
        CLIENT: HTTP 400 — non-retryable; not counted toward the circuit.
        TRANSIENT: 5xx, network errors, timeouts (and any other
            unrecognized status) — tolerate more failures.
    """

    AUTH = "auth"
    CLIENT = "client"
    TRANSIENT = "transient"
41
42
class UploadResult(NamedTuple):
    """Outcome of an upload attempt.

    Fields:
        success: True when the server accepted the upload.
        duplicate: True when the server's response reported status
            ``duplicate``.
        error_type: Classified failure reason, or None when not applicable.
    """

    success: bool
    duplicate: bool = False
    error_type: ErrorType | None = None
47
48
class UploadClient:
    """HTTP client for uploading observer segments to the ingest server.

    A single ``requests.Session`` is reused for every request. Once the
    server answers a request with HTTP 403 the client marks itself revoked,
    and the upload, segment-query and event-relay methods stop contacting
    the server.
    """

    def __init__(self, config: Config):
        # Strip trailing slashes so the path joins below never produce "//".
        self._url = config.server_url.rstrip("/") if config.server_url else ""
        self._key = config.key
        self._stream = config.stream
        self._revoked = False
        self._session = requests.Session()
        # Sleep durations (seconds) between retries; the last entry repeats
        # once the attempt number exceeds the list length.
        self._retry_backoff = config.sync_retry_delays or [5, 30, 120, 300]
        # Respect configured retry cap — no hard min(config, 3).
        # upload_segment() still makes at least one attempt if this is < 1.
        self._max_retries = config.sync_max_retries

    @property
    def is_revoked(self) -> bool:
        """True once the server has answered one of our requests with 403."""
        return self._revoked

    def _persist_key(self, config: Config, key: str) -> None:
        """Store an auto-registered key on ``config`` and save it."""
        from .config import save_config

        config.key = key
        save_config(config)

    def _backoff_delay(self, attempt: int) -> float:
        """Return the sleep (seconds) after 0-based ``attempt``; last delay repeats."""
        return self._retry_backoff[min(attempt, len(self._retry_backoff) - 1)]

    @staticmethod
    def _key_from_payload(payload: Any) -> str:
        """Extract a non-empty string ``key`` from a registration JSON payload.

        Raises:
            ValueError: if the payload is not an object with a usable ``key``.
        """
        key = payload.get("key") if isinstance(payload, dict) else None
        if not isinstance(key, str) or not key:
            raise ValueError("registration response has no usable 'key'")
        return key

    def ensure_registered(self, config: Config) -> bool:
        """Ensure the client has a valid key, auto-registering if needed.

        Tries the ``sol`` CLI first (no server needed), then falls back to
        the server's HTTP registration endpoint. A newly obtained key is
        stored on ``config`` and saved via ``save_config``.

        Returns:
            True if a key is available, False otherwise. A 403 from the
            HTTP endpoint additionally marks the client as revoked.
        """
        if self._key:
            return True

        name = self._stream or "solstone-linux"

        if self._register_via_cli(config, name):
            return True

        if not self._url:
            return False

        return self._register_via_http(config, name)

    def _register_via_cli(self, config: Config, name: str) -> bool:
        """Try ``sol observer --json create <name>``; True if a key was obtained."""
        sol = shutil.which("sol")
        if not sol:
            return False
        try:
            result = subprocess.run(
                [sol, "observer", "--json", "create", name],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if result.returncode != 0:
                logger.debug(
                    f"CLI registration failed: exit code {result.returncode}"
                )
                return False
            self._key = self._key_from_payload(json.loads(result.stdout))
            self._persist_key(config, self._key)
            logger.info(f"CLI-registered as '{name}' (key: {self._key[:8]}...)")
            return True
        except (
            subprocess.TimeoutExpired,
            ValueError,  # malformed JSON (json.JSONDecodeError) or no usable key
            KeyError,
            OSError,
        ) as e:
            logger.debug(f"CLI registration failed: {e}")
            return False

    def _register_via_http(self, config: Config, name: str) -> bool:
        """Register over HTTP with bounded retries; True once a key is stored."""
        url = f"{self._url}/app/observer/api/create"

        # At most 3 attempts, fewer if fewer backoff delays are configured.
        retries = min(3, len(self._retry_backoff))
        for attempt in range(retries):
            try:
                resp = self._session.post(
                    url, json={"name": name}, timeout=EVENT_TIMEOUT
                )
                if resp.status_code == 200:
                    # Raises ValueError on a non-JSON body or a missing key,
                    # which is logged and retried below instead of crashing.
                    key = self._key_from_payload(resp.json())
                elif resp.status_code == 403:
                    self._revoked = True
                    logger.error("Registration rejected (403)")
                    return False
                else:
                    key = None
                    logger.warning(
                        f"Registration attempt {attempt + 1} failed: {resp.status_code}"
                    )
            except (requests.RequestException, ValueError) as e:
                key = None
                logger.warning(f"Registration attempt {attempt + 1} failed: {e}")

            if key is not None:
                self._key = key
                self._persist_key(config, key)
                logger.info(
                    f"Auto-registered as '{name}' (key: {self._key[:8]}...)"
                )
                return True

            if attempt < retries - 1:
                time.sleep(self._backoff_delay(attempt))

        logger.error(f"Registration failed after {retries} attempts")
        return False

    @staticmethod
    def classify_error(
        status_code: int | None, is_network_error: bool = False
    ) -> ErrorType:
        """Classify an error for circuit breaker and retry decisions.

        Network errors and a missing status are TRANSIENT; 401/403 are AUTH;
        400 is CLIENT; every other status (5xx and anything else) is
        TRANSIENT.
        """
        if is_network_error:
            return ErrorType.TRANSIENT
        if status_code is None:
            return ErrorType.TRANSIENT
        if status_code in (401, 403):
            return ErrorType.AUTH
        if status_code == 400:
            return ErrorType.CLIENT
        # 5xx and anything else
        return ErrorType.TRANSIENT

    @staticmethod
    def _is_duplicate(response: requests.Response) -> bool:
        """Return True if a 200 response body reports ``"status": "duplicate"``.

        A body that is not a JSON object counts as a plain (non-duplicate)
        success: the 200 status alone means the upload was accepted, so
        retrying would only re-send the same files.
        """
        try:
            payload = response.json()
        except ValueError:  # includes requests' JSONDecodeError
            return False
        return isinstance(payload, dict) and payload.get("status") == "duplicate"

    def upload_segment(
        self,
        day: str,
        segment: str,
        files: list[Path],
        meta: dict[str, Any] | None = None,
    ) -> UploadResult:
        """Upload a segment's files to the ingest server.

        Each attempt POSTs the files as multipart ``files`` fields along with
        ``day``, ``segment`` and, when given, ``meta`` serialized as JSON.
        Missing files are skipped with a warning. Transient failures (5xx,
        network errors, other unrecognized statuses) are retried with the
        configured backoff; auth (401/403) and client (400) errors return
        immediately, and a 403 marks the client revoked.

        Returns:
            UploadResult: success on HTTP 200 (``duplicate`` when the body
            reports ``"status": "duplicate"``); otherwise a failure carrying
            the error type of the last attempt, or ``error_type=None`` when
            nothing could be sent (no key/URL, or none of the files exist).
        """
        if self._revoked or not self._key or not self._url:
            return UploadResult(
                False, error_type=ErrorType.AUTH if self._revoked else None
            )

        from contextlib import ExitStack

        url = f"{self._url}/app/observer/ingest/{self._key}"
        data: dict[str, Any] = {"day": day, "segment": segment}
        if meta:
            data["meta"] = json.dumps(meta)

        # Always make at least one attempt; with a cap < 1 the loop would
        # never run and the final return would hit an unbound error_type.
        attempts = max(1, self._max_retries)
        error_type: ErrorType | None = None

        for attempt in range(attempts):
            response = None
            # Files are re-opened on every attempt so each POST reads them
            # from the start; ExitStack closes them when the attempt ends.
            with ExitStack() as stack:
                files_data = []
                for path in files:
                    try:
                        fh = stack.enter_context(open(path, "rb"))
                    except FileNotFoundError:
                        logger.warning(f"File not found, skipping: {path}")
                        continue
                    files_data.append(
                        ("files", (path.name, fh, "application/octet-stream"))
                    )

                if not files_data:
                    return UploadResult(False)

                try:
                    response = self._session.post(
                        url, data=data, files=files_data, timeout=UPLOAD_TIMEOUT
                    )
                except requests.RequestException as e:
                    logger.warning(f"Upload attempt {attempt + 1} failed: {e}")

            # Compare with None explicitly: a requests.Response is falsy
            # for error statuses.
            if response is None:
                error_type = ErrorType.TRANSIENT
            elif response.status_code == 200:
                return UploadResult(True, duplicate=self._is_duplicate(response))
            else:
                error_type = self.classify_error(response.status_code)
                if error_type in (ErrorType.AUTH, ErrorType.CLIENT):
                    if response.status_code == 403:
                        self._revoked = True
                    logger.error(
                        f"Upload rejected ({response.status_code}): {response.text}"
                    )
                    return UploadResult(False, error_type=error_type)
                logger.warning(
                    f"Upload attempt {attempt + 1} failed: "
                    f"{response.status_code} {response.text}"
                )

            if attempt < attempts - 1:
                time.sleep(self._backoff_delay(attempt))

        logger.error(f"Upload failed after {attempts} attempts: {day}/{segment}")
        return UploadResult(False, error_type=error_type)

    def get_server_segments(self, day: str) -> list[dict] | None:
        """Query server for segments on a given day.

        The configured stream, if any, is sent as the ``stream`` query
        parameter. Returns the decoded JSON body on HTTP 200, or None on any
        failure (including a non-JSON body). A 403 marks the client revoked.
        """
        if self._revoked or not self._key or not self._url:
            return None

        url = f"{self._url}/app/observer/ingest/{self._key}/segments/{day}"
        params: dict[str, str] = {}
        if self._stream:
            params["stream"] = self._stream

        try:
            resp = self._session.get(url, params=params, timeout=EVENT_TIMEOUT)
            if resp.status_code == 200:
                return resp.json()
            if resp.status_code in (401, 403):
                if resp.status_code == 403:
                    self._revoked = True
                logger.error(f"Segments query rejected ({resp.status_code})")
                return None
            logger.warning(f"Segments query failed: {resp.status_code}")
            return None
        except (requests.RequestException, ValueError) as e:
            # ValueError: malformed JSON body on older requests versions.
            logger.debug(f"Segments query failed: {e}")
            return None

    def relay_event(self, tract: str, event: str, **fields: Any) -> bool:
        """Fire-and-forget event relay.

        POSTs ``{"tract", "event", **fields}`` as JSON. Returns True only on
        HTTP 200; a 403 marks the client revoked. Request errors are
        swallowed and reported as False.
        """
        if self._revoked or not self._key or not self._url:
            return False

        url = f"{self._url}/app/observer/ingest/{self._key}/event"
        payload = {"tract": tract, "event": event, **fields}
        try:
            resp = self._session.post(url, json=payload, timeout=EVENT_TIMEOUT)
        except requests.RequestException:
            return False
        if resp.status_code == 200:
            return True
        if resp.status_code == 403:
            self._revoked = True
        return False

    def stop(self) -> None:
        """Close the underlying HTTP session."""
        self._session.close()
290 self._session.close()