personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

observer: move freshness classification to the server

Classify registered-observer freshness in `apps/observer/routes.py` via
_classify_observer_freshness(). /app/observer/api/list now returns
state/group/label/elapsed_ms/clock_skew per observer; workspace.html
consumes them directly and the client-side freshness() computation
is deleted. FUTURE_CLOCK_DRIFT_TOLERANCE_MS (5 min) clamps obviously
bad client timestamps.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+259 -132
+72 -15
apps/observer/routes.py
··· 60 60 KEY_BYTES = 32 61 61 ACTIVE_THRESHOLD_MS = 30_000 62 62 STALE_THRESHOLD_MS = 120_000 63 + FUTURE_CLOCK_DRIFT_TOLERANCE_MS = 5 * 60 * 1000 64 + 65 + OBSERVER_STATE_LABELS = { 66 + "connected": "Connected", 67 + "stale": "Stale", 68 + "disconnected": "Disconnected", 69 + "revoked": "Revoked", 70 + } 63 71 64 72 65 73 def _get_key(url_key: str | None = None) -> str | None: ··· 77 85 return base64.urlsafe_b64encode(secrets.token_bytes(KEY_BYTES)).decode().rstrip("=") 78 86 79 87 80 - def _group_for(last_seen_ms: int | None, revoked: bool, now_ms: int) -> str: 81 - """Derive observer display group from freshness state.""" 82 - if revoked or last_seen_ms is None: 83 - return "inactive" 88 + def _classify_observer_freshness( 89 + last_seen_ms: int | None, 90 + revoked: bool, 91 + now_ms: int, 92 + ) -> dict[str, object]: 93 + """Classify a registered observer's freshness. 94 + 95 + Returns keys: state, group, elapsed_ms, clock_skew. 96 + """ 97 + if revoked: 98 + return { 99 + "state": "revoked", 100 + "group": "inactive", 101 + "elapsed_ms": None, 102 + "clock_skew": False, 103 + } 104 + if last_seen_ms is None: 105 + return { 106 + "state": "disconnected", 107 + "group": "inactive", 108 + "elapsed_ms": None, 109 + "clock_skew": False, 110 + } 84 111 elapsed = now_ms - last_seen_ms 112 + if elapsed < -FUTURE_CLOCK_DRIFT_TOLERANCE_MS: 113 + return { 114 + "state": "disconnected", 115 + "group": "inactive", 116 + "elapsed_ms": elapsed, 117 + "clock_skew": True, 118 + } 119 + if elapsed < 0: 120 + return { 121 + "state": "connected", 122 + "group": "active", 123 + "elapsed_ms": 0, 124 + "clock_skew": False, 125 + } 85 126 if elapsed < ACTIVE_THRESHOLD_MS: 86 - return "active" 127 + return { 128 + "state": "connected", 129 + "group": "active", 130 + "elapsed_ms": elapsed, 131 + "clock_skew": False, 132 + } 87 133 if elapsed < STALE_THRESHOLD_MS: 88 - return "stale" 89 - return "inactive" 134 + return { 135 + "state": "stale", 136 + "group": "stale", 137 + "elapsed_ms": elapsed, 138 + "clock_skew": False, 139 + } 140 + return { 141 + "state": "disconnected", 142 + "group": "inactive", 143 + "elapsed_ms": elapsed, 144 + "clock_skew": False, 145 + } 90 146 91 147 92 148 def _revoke_observer(key: str) -> bool: ··· 105 161 @observer_bp.route("/api/list") 106 162 def api_list() -> Any: 107 163 """List all registered observers.""" 108 - now = now_ms() 164 + current_now = now_ms() 109 165 observers = list_observers() 110 166 # Sanitize output - don't expose full keys 111 167 result = [] 112 168 for r in observers: 113 169 key_prefix = r.get("key", "")[:8] 170 + freshness = _classify_observer_freshness( 171 + r.get("last_seen"), 172 + r.get("revoked", False), 173 + current_now, 174 + ) 114 175 result.append( 115 176 { 116 177 "key_prefix": key_prefix, ··· 122 183 "revoked": r.get("revoked", False), 123 184 "revoked_at": r.get("revoked_at"), 124 185 "stats": r.get("stats", {}), 186 + **freshness, 187 + "label": OBSERVER_STATE_LABELS[str(freshness["state"])], 125 188 } 126 189 ) 127 190 128 191 group_order = {"active": 0, "stale": 1, "inactive": 2} 129 192 result.sort( 130 193 key=lambda observer: ( 131 - group_order[ 132 - _group_for( 133 - observer.get("last_seen"), 134 - observer.get("revoked", False), 135 - now, 136 - ) 137 - ], 194 + group_order[observer.get("group", "inactive")], 138 195 1 if observer.get("last_seen") is None else 0, 139 196 -(observer.get("last_seen") or 0), 140 197 observer.get("key_prefix", ""),
+166 -61
apps/observer/tests/test_routes.py
··· 9 9 import json 10 10 11 11 import apps.observer.routes as routes_module 12 + from apps.observer.routes import ( 13 + ACTIVE_THRESHOLD_MS, 14 + FUTURE_CLOCK_DRIFT_TOLERANCE_MS, 15 + OBSERVER_STATE_LABELS, 16 + STALE_THRESHOLD_MS, 17 + _classify_observer_freshness, 18 + ) 12 19 from apps.observer.utils import save_observer 13 20 14 21 ··· 50 57 return key 51 58 52 59 53 - def _client_state(observer: dict, thresholds: dict[str, int], current_now: int) -> str: 54 - if observer.get("revoked"): 55 - return "revoked" 56 - last_seen = observer.get("last_seen") 57 - if last_seen is None: 58 - return "disconnected" 59 - elapsed = current_now - last_seen 60 - if elapsed < thresholds["active_ms"]: 61 - return "connected" 62 - if elapsed < thresholds["stale_ms"]: 63 - return "stale" 64 - return "disconnected" 60 + def test_classifier_last_seen_none_returns_disconnected(): 61 + """Missing last_seen is classified as disconnected.""" 62 + assert _classify_observer_freshness(None, False, 1_000_000) == { 63 + "state": "disconnected", 64 + "group": "inactive", 65 + "elapsed_ms": None, 66 + "clock_skew": False, 67 + } 68 + 69 + 70 + def test_classifier_future_within_tolerance_returns_connected_no_skew(): 71 + """Small future drift stays connected without clock skew.""" 72 + current_now = 1_000_000 73 + assert 60_000 < FUTURE_CLOCK_DRIFT_TOLERANCE_MS 74 + 75 + assert _classify_observer_freshness(current_now + 60_000, False, current_now) == { 76 + "state": "connected", 77 + "group": "active", 78 + "elapsed_ms": 0, 79 + "clock_skew": False, 80 + } 81 + 65 82 83 + def test_classifier_future_beyond_tolerance_returns_disconnected_with_skew(): 84 + """Large future drift is disconnected and flagged for clock skew.""" 85 + current_now = 1_000_000 86 + last_seen = current_now + (10 * 60_000) 87 + assert (10 * 60_000) > FUTURE_CLOCK_DRIFT_TOLERANCE_MS 66 88 67 - def _group_from_client_state(state: str) -> str: 68 - if state == "connected": 69 - return "active" 70 - if state == "stale": 71 - return "stale" 72 - return "inactive" 89 + result = _classify_observer_freshness(last_seen, False, current_now) 90 + 91 + assert result["state"] == "disconnected" 92 + assert result["group"] == "inactive" 93 + assert result["clock_skew"] is True 94 + assert result["elapsed_ms"] == -600_000 95 + 96 + 97 + def test_classifier_just_under_active_returns_connected(): 98 + """Elapsed time just under the active threshold stays connected.""" 99 + current_now = 1_000_000 100 + 101 + assert _classify_observer_freshness( 102 + current_now - (ACTIVE_THRESHOLD_MS - 1), 103 + False, 104 + current_now, 105 + ) == { 106 + "state": "connected", 107 + "group": "active", 108 + "elapsed_ms": ACTIVE_THRESHOLD_MS - 1, 109 + "clock_skew": False, 110 + } 111 + 112 + 113 + def test_classifier_just_over_active_returns_stale(): 114 + """Elapsed time at the active threshold enters the stale bucket.""" 115 + current_now = 1_000_000 116 + 117 + assert _classify_observer_freshness( 118 + current_now - ACTIVE_THRESHOLD_MS, 119 + False, 120 + current_now, 121 + ) == { 122 + "state": "stale", 123 + "group": "stale", 124 + "elapsed_ms": ACTIVE_THRESHOLD_MS, 125 + "clock_skew": False, 126 + } 127 + 128 + 129 + def test_classifier_beyond_stale_returns_disconnected(): 130 + """Elapsed time at the stale threshold becomes disconnected.""" 131 + current_now = 1_000_000 132 + 133 + assert _classify_observer_freshness( 134 + current_now - STALE_THRESHOLD_MS, 135 + False, 136 + current_now, 137 + ) == { 138 + "state": "disconnected", 139 + "group": "inactive", 140 + "elapsed_ms": STALE_THRESHOLD_MS, 141 + "clock_skew": False, 142 + } 143 + 144 + 145 + def test_classifier_revoked_returns_revoked_regardless_of_last_seen(): 146 + """Revoked observers stay revoked for both missing and recent last_seen.""" 147 + current_now = 1_000_000 148 + expected = { 149 + "state": "revoked", 150 + "group": "inactive", 151 + "elapsed_ms": None, 152 + "clock_skew": False, 153 + } 154 + 155 + assert _classify_observer_freshness(None, True, current_now) == expected 156 + assert _classify_observer_freshness(current_now, True, current_now) == expected 73 157 74 158 75 159 def test_api_list_empty(observer_env): ··· 150 234 assert observers[0]["name"] == "my-observer" 151 235 assert observers[0]["enabled"] is True 152 236 assert observers[0]["stats"]["segments_received"] == 0 237 + assert observers[0]["state"] == "disconnected" 238 + assert observers[0]["group"] == "inactive" 239 + assert observers[0]["label"] == OBSERVER_STATE_LABELS["disconnected"] 240 + assert observers[0]["elapsed_ms"] is None 241 + assert observers[0]["clock_skew"] is False 153 242 154 243 155 244 def test_api_delete_observer(observer_env): ··· 175 264 assert observers[0]["key_prefix"] == key_prefix 176 265 assert observers[0]["revoked"] is True 177 266 assert observers[0]["revoked_at"] is not None 267 + assert observers[0]["state"] == "revoked" 268 + assert observers[0]["group"] == "inactive" 269 + assert observers[0]["label"] == OBSERVER_STATE_LABELS["revoked"] 270 + assert observers[0]["elapsed_ms"] is None 271 + assert observers[0]["clock_skew"] is False 178 272 179 273 180 274 def test_api_list_sorts_by_group_and_last_seen(observer_env, monkeypatch): ··· 215 309 "inactive-disconnected", 216 310 "inactive-never", 217 311 ] 312 + assert [ 313 + ( 314 + observer["state"], 315 + observer["group"], 316 + observer["label"], 317 + observer["elapsed_ms"], 318 + observer["clock_skew"], 319 + ) 320 + for observer in observers 321 + ] == [ 322 + ("connected", "active", OBSERVER_STATE_LABELS["connected"], 5_000, False), 323 + ("stale", "stale", OBSERVER_STATE_LABELS["stale"], 60_000, False), 324 + ( 325 + "disconnected", 326 + "inactive", 327 + OBSERVER_STATE_LABELS["disconnected"], 328 + 600_000, 329 + False, 330 + ), 331 + ( 332 + "disconnected", 333 + "inactive", 334 + OBSERVER_STATE_LABELS["disconnected"], 335 + None, 336 + False, 337 + ), 338 + ] 218 339 219 340 220 341 def test_api_list_tie_breaks_by_key_prefix(observer_env, monkeypatch): ··· 241 362 "aaaa0000", 242 363 "bbbb0000", 243 364 ] 365 + assert all(observer["state"] == "connected" for observer in observers) 366 + assert all(observer["group"] == "active" for observer in observers) 367 + assert all( 368 + observer["label"] == OBSERVER_STATE_LABELS["connected"] 369 + for observer in observers 370 + ) 244 371 245 372 246 373 def test_api_list_revoked_observer_buckets_inactive(observer_env, monkeypatch): ··· 268 395 "stale-observer", 269 396 "revoked-observer", 270 397 ] 398 + assert observers[0]["state"] == "stale" 399 + assert observers[0]["group"] == "stale" 400 + assert observers[0]["label"] == OBSERVER_STATE_LABELS["stale"] 401 + assert observers[0]["elapsed_ms"] == 60_000 402 + assert observers[0]["clock_skew"] is False 403 + assert observers[1]["state"] == "revoked" 404 + assert observers[1]["group"] == "inactive" 405 + assert observers[1]["label"] == OBSERVER_STATE_LABELS["revoked"] 406 + assert observers[1]["elapsed_ms"] is None 407 + assert observers[1]["clock_skew"] is False 271 408 272 409 273 - def test_api_list_client_server_mapping_agree(observer_env, monkeypatch): 274 - """Client freshness and server ordering stay aligned via shared thresholds.""" 410 + def test_api_list_includes_state_and_group_per_observer(observer_env, monkeypatch): 411 + """api_list includes freshness state, grouping, label, and skew metadata.""" 275 412 env = observer_env() 276 413 fixed_now = 5_000_000 277 414 monkeypatch.setattr(routes_module, "now_ms", lambda: fixed_now) 278 415 279 - _save_test_observer("eeee0000", "never", created_at=10, last_seen=None) 280 - _save_test_observer( 281 - "dddd0000", 282 - "revoked", 283 - created_at=20, 284 - last_seen=fixed_now - 1_000, 285 - revoked=True, 286 - ) 287 - _save_test_observer( 288 - "cccc0000", 289 - "inactive", 290 - created_at=30, 291 - last_seen=fixed_now - 600_000, 292 - ) 293 - _save_test_observer( 294 - "bbbb0000", 295 - "stale", 296 - created_at=40, 297 - last_seen=fixed_now - 60_000, 298 - ) 299 416 _save_test_observer( 300 417 "aaaa0000", 301 - "active", 302 - created_at=50, 418 + "active-observer", 419 + created_at=10, 303 420 last_seen=fixed_now - 5_000, 304 421 ) 305 422 306 - payload = _api_list_payload(env) 307 - thresholds = payload["thresholds"] 308 - observers = payload["observers"] 309 - group_order = {"active": 0, "stale": 1, "inactive": 2} 310 - 311 - expected = sorted( 312 - observers, 313 - key=lambda observer: ( 314 - group_order[ 315 - _group_from_client_state(_client_state(observer, thresholds, fixed_now)) 316 - ], 317 - 1 if observer.get("last_seen") is None else 0, 318 - -(observer.get("last_seen") or 0), 319 - observer["key_prefix"], 320 - ), 321 - ) 423 + observer = _api_list_observers(env)[0] 322 424 323 - assert [observer["key_prefix"] for observer in observers] == [ 324 - observer["key_prefix"] for observer in expected 325 - ] 425 + assert observer["state"] == "connected" 426 + assert observer["group"] == "active" 427 + assert observer["label"] == OBSERVER_STATE_LABELS["connected"] 428 + assert isinstance(observer["elapsed_ms"], int) 429 + assert observer["elapsed_ms"] == 5_000 430 + assert observer["clock_skew"] is False 326 431 327 432 328 433 def test_api_delete_nonexistent(observer_env):
+21 -56
apps/observer/workspace.html
··· 642 642 let currentFullKey = null; 643 643 let revealTimer = null; 644 644 645 - function statusMeta(observer) { 646 - if (observer.revoked) { 647 - return { 648 - statusClass: 'revoked', 649 - statusText: 'Revoked', 650 - cardClass: 'revoked' 651 - }; 652 - } 653 - 654 - const state = freshness(observer.last_seen); 645 + function statusMeta(state) { 646 + const currentState = state || 'disconnected'; 655 647 return { 656 - statusClass: state, 657 - statusText: state === 'connected' ? 'Connected' : state === 'stale' ? 'Stale' : 'Disconnected', 658 - cardClass: state 648 + statusClass: currentState, 649 + statusText: 650 + currentState === 'connected' 651 + ? 'Connected' 652 + : currentState === 'stale' 653 + ? 'Stale' 654 + : currentState === 'revoked' 655 + ? 'Revoked' 656 + : 'Disconnected', 657 + cardClass: currentState 659 658 }; 660 659 } 661 660 662 - function groupFor(state, revoked) { 663 - if (revoked) return 'inactive'; 661 + function groupFor(state) { 664 662 if (state === 'connected') return 'active'; 665 663 if (state === 'stale') return 'stale'; 666 664 return 'inactive'; ··· 671 669 } 672 670 673 671 function observerCardHTML(observer) { 674 - const { statusClass, statusText, cardClass } = statusMeta(observer); 672 + const { statusClass, statusText, cardClass } = statusMeta(observer.state); 673 + const label = observer.label || statusText; 674 + const safeLabel = escapeHtml(label); 675 675 676 676 return ` 677 - <div class="observer-card ${cardClass}" data-key="${observer.key_prefix}" role="listitem" aria-label="${escapeHtml(observer.name)}, ${statusText}"> 677 + <div class="observer-card ${cardClass}" data-key="${observer.key_prefix}" role="listitem" aria-label="${escapeHtml(observer.name)}, ${safeLabel}"> 678 678 <div class="observer-header"> 679 679 <span class="observer-name">${escapeHtml(observer.name)}</span> 680 - <span class="observer-status ${statusClass}">${statusText}</span> 680 + <span class="observer-status ${statusClass}">${safeLabel}</span> 681 681 </div> 682 682 <div class="observer-stats">${statsHTML(observer, statusClass)}</div> 683 683 <div class="observer-actions"> ··· 700 700 }; 701 701 702 702 for (const observer of observers) { 703 - const { statusClass } = statusMeta(observer); 704 - groups[groupFor(statusClass, observer.revoked)].push(observer); 703 + const group = observer.group || groupFor(observer.state); 704 + (groups[group] || groups.inactive).push(observer); 705 705 } 706 706 707 707 let html = ''; ··· 733 733 if (state === 'stale') return `${text} — stale`; 734 734 if (state === 'disconnected') return `${text} — offline`; 735 735 return text; 736 - } 737 - 738 - function freshness(lastSeen) { 739 - if (!lastSeen) return 'disconnected'; 740 - const elapsed = Date.now() - lastSeen; 741 - if (elapsed < thresholds.active_ms) return 'connected'; 742 - if (elapsed < thresholds.stale_ms) return 'stale'; 743 - return 'disconnected'; 744 736 } 745 737 746 738 async function loadObservers() { ··· 809 801 for (const span of spans) { 810 802 const lastSeen = parseInt(span.getAttribute('data-last-seen'), 10); 811 803 if (!lastSeen) continue; 812 - 813 - // Recompute freshness — it may have changed since last data fetch 814 - const newState = freshness(lastSeen); 815 - const oldState = span.getAttribute('data-state'); 816 - 817 - // Update timestamp text 818 - span.textContent = formatTimeAgo(lastSeen, newState); 819 - span.setAttribute('data-state', newState); 820 - 821 - // If freshness changed, update the card's visual state too. Group placement 822 - // intentionally lags until the next 30s poll rebuild. 823 - if (newState !== oldState) { 824 - const card = span.closest('[data-key]'); 825 - if (card && !card.classList.contains('revoked')) { 826 - card.className = `observer-card ${newState}`; 827 - const statusEl = card.querySelector('.observer-status'); 828 - if (statusEl) { 829 - statusEl.className = `observer-status ${newState}`; 830 - statusEl.textContent = newState === 'connected' ? 'Connected' : newState === 'stale' ? 'Stale' : 'Disconnected'; 831 - } 832 - // Update aria-label 833 - const nameEl = card.querySelector('.observer-name'); 834 - if (nameEl) { 835 - const statusText = newState === 'connected' ? 'Connected' : newState === 'stale' ? 'Stale' : 'Disconnected'; 836 - card.setAttribute('aria-label', `${nameEl.textContent}, ${statusText}`); 837 - } 838 - } 839 - } 804 + span.textContent = formatTimeAgo(lastSeen, span.getAttribute('data-state')); 840 805 } 841 806 } 842 807