personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add convey.emit() for non-blocking Callosum events from routes

Adds emit() function to convey/bridge.py that reuses the shared
CallosumConnection for sending events. Non-blocking with drop-on-disconnect
semantics - messages queue for background thread, dropped if disconnected.

- Export emit() from convey package
- Migrate apps/import and apps/remote from callosum_send() to emit()
- Document server-side events in APPS.md and CALLOSUM.md
- Update bridge.py docstring for bidirectional capability
- Remove unused timeout parameter from stop_bridge()

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+405 -14
+3 -5
apps/import/routes.py
··· 10 10 from flask import Blueprint, jsonify, render_template, request 11 11 from werkzeug.utils import secure_filename 12 12 13 - from convey import state 14 - from think.callosum import callosum_send 13 + from convey import emit, state 15 14 from think.detect_created import detect_created 16 15 from think.importer_utils import ( 17 16 build_import_info, ··· 399 398 except Exception as e: 400 399 return jsonify({"error": f"Failed to update metadata: {str(e)}"}), 500 401 400 402 - # Emit task request to Callosum 403 - if not callosum_send("supervisor", "request", ref=task_id, cmd=cmd): 404 - return jsonify({"error": "Failed to submit task"}), 500 401 + # Emit task request to Callosum (non-blocking, drops if disconnected) 402 + emit("supervisor", "request", ref=task_id, cmd=cmd) 405 403 406 404 return jsonify({"status": "ok", "task_id": task_id})
+349
apps/remote/routes.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Remote app - manage remote observer connections. 5 + 6 + Provides endpoints for: 7 + - Managing remote observer registrations (UI) 8 + - Receiving file uploads from remote observers (ingest) 9 + - Relaying events from remote observers to local Callosum 10 + """ 11 + 12 + from __future__ import annotations 13 + 14 + import base64 15 + import json 16 + import logging 17 + import secrets 18 + import time 19 + from pathlib import Path 20 + from typing import Any 21 + 22 + import re 23 + 24 + from flask import Blueprint, jsonify, request 25 + from werkzeug.utils import secure_filename 26 + 27 + from apps.utils import get_app_storage_path 28 + from convey import emit, state 29 + from think.utils import day_path 30 + 31 + logger = logging.getLogger(__name__) 32 + 33 + remote_bp = Blueprint( 34 + "app:remote", 35 + __name__, 36 + url_prefix="/app/remote", 37 + ) 38 + 39 + # Key length in bytes (256 bits = 32 bytes) 40 + KEY_BYTES = 32 41 + 42 + 43 + def _generate_key() -> str: 44 + """Generate a URL-safe key for remote authentication.""" 45 + return base64.urlsafe_b64encode(secrets.token_bytes(KEY_BYTES)).decode().rstrip("=") 46 + 47 + 48 + def _get_remotes_dir() -> Path: 49 + """Get the remotes storage directory.""" 50 + return get_app_storage_path("remote", "remotes", ensure_exists=True) 51 + 52 + 53 + def _load_remote(key: str) -> dict | None: 54 + """Load remote metadata by key.""" 55 + remotes_dir = _get_remotes_dir() 56 + # Use first 8 chars of key as filename for readability 57 + remote_path = remotes_dir / f"{key[:8]}.json" 58 + if not remote_path.exists(): 59 + return None 60 + try: 61 + with open(remote_path) as f: 62 + data = json.load(f) 63 + # Verify full key matches 64 + if data.get("key") != key: 65 + return None 66 + return data 67 + except (json.JSONDecodeError, OSError): 68 + return None 69 + 70 + 71 + def _save_remote(data: dict) -> bool: 72 + """Save remote metadata.""" 73 + key = data.get("key") 74 + if not key: 75 + return False 76 + remotes_dir = _get_remotes_dir() 77 + remote_path = remotes_dir / f"{key[:8]}.json" 78 + try: 79 + with open(remote_path, "w") as f: 80 + json.dump(data, f, indent=2) 81 + return True 82 + except OSError: 83 + return False 84 + 85 + 86 + def _delete_remote(key: str) -> bool: 87 + """Delete remote by key.""" 88 + remotes_dir = _get_remotes_dir() 89 + remote_path = remotes_dir / f"{key[:8]}.json" 90 + try: 91 + if remote_path.exists(): 92 + remote_path.unlink() 93 + return True 94 + return False 95 + except OSError: 96 + return False 97 + 98 + 99 + def _list_remotes() -> list[dict]: 100 + """List all registered remotes.""" 101 + remotes_dir = _get_remotes_dir() 102 + remotes = [] 103 + for remote_path in remotes_dir.glob("*.json"): 104 + try: 105 + with open(remote_path) as f: 106 + data = json.load(f) 107 + remotes.append(data) 108 + except (json.JSONDecodeError, OSError): 109 + continue 110 + # Sort by created_at descending 111 + remotes.sort(key=lambda x: x.get("created_at", 0), reverse=True) 112 + return remotes 113 + 114 + 115 + # === Management API (session-protected) === 116 + 117 + 118 + @remote_bp.route("/api/list") 119 + def api_list() -> Any: 120 + """List all registered remotes.""" 121 + remotes = _list_remotes() 122 + # Sanitize output - don't expose full keys 123 + result = [] 124 + for r in remotes: 125 + result.append( 126 + { 127 + "key_prefix": r.get("key", "")[:8], 128 + "name": r.get("name", ""), 129 + "created_at": r.get("created_at", 0), 130 + "last_seen": r.get("last_seen"), 131 + "last_segment": r.get("last_segment"), 132 + "enabled": r.get("enabled", True), 133 + "stats": r.get("stats", {}), 134 + } 135 + ) 136 + return jsonify(result) 137 + 138 + 139 + @remote_bp.route("/api/create", methods=["POST"]) 140 + def api_create() -> Any: 141 + """Create a new remote registration.""" 142 + data = request.get_json(force=True) if request.is_json else {} 143 + name = data.get("name", "").strip() 144 + if not name: 145 + return jsonify({"error": "Name is required"}), 400 146 + 147 + # Generate key 148 + key = _generate_key() 149 + 150 + # Create remote record 151 + remote_data = { 152 + "key": key, 153 + "name": name, 154 + "created_at": int(time.time() * 1000), 155 + "last_seen": None, 156 + "last_segment": None, 157 + "enabled": True, 158 + "stats": { 159 + "segments_received": 0, 160 + "bytes_received": 0, 161 + }, 162 + } 163 + 164 + if not _save_remote(remote_data): 165 + return jsonify({"error": "Failed to save remote"}), 500 166 + 167 + # Build ingest URL 168 + ingest_url = f"/app/remote/ingest/{key}" 169 + 170 + return jsonify( 171 + { 172 + "key": key, 173 + "key_prefix": key[:8], 174 + "name": name, 175 + "ingest_url": ingest_url, 176 + } 177 + ) 178 + 179 + 180 + @remote_bp.route("/api/<key_prefix>", methods=["DELETE"]) 181 + def api_delete(key_prefix: str) -> Any: 182 + """Delete/revoke a remote by key prefix.""" 183 + # Find remote by prefix 184 + remotes_dir = _get_remotes_dir() 185 + remote_path = remotes_dir / f"{key_prefix}.json" 186 + if not remote_path.exists(): 187 + return jsonify({"error": "Remote not found"}), 404 188 + 189 + try: 190 + with open(remote_path) as f: 191 + data = json.load(f) 192 + key = data.get("key", "") 193 + except (json.JSONDecodeError, OSError): 194 + return jsonify({"error": "Failed to read remote"}), 500 195 + 196 + if not _delete_remote(key): 197 + return jsonify({"error": "Failed to delete remote"}), 500 198 + 199 + return jsonify({"status": "ok"}) 200 + 201 + 202 + # === Ingest API (key-protected) === 203 + 204 + 205 + @remote_bp.route("/ingest/<key>", methods=["POST"]) 206 + def ingest_upload(key: str) -> Any: 207 + """Receive file uploads from remote observer. 208 + 209 + Expects multipart form with: 210 + - segment: Segment key (HHMMSS_LEN) 211 + - day: Day string (YYYYMMDD) 212 + - files: One or more media files 213 + 214 + Writes files to journal and emits observe.observing event. 215 + """ 216 + # Validate key 217 + remote = _load_remote(key) 218 + if not remote: 219 + return jsonify({"error": "Invalid key"}), 401 220 + 221 + if not remote.get("enabled", True): 222 + return jsonify({"error": "Remote disabled"}), 403 223 + 224 + # Get segment and day from form 225 + segment = request.form.get("segment", "").strip() 226 + day = request.form.get("day", "").strip() 227 + 228 + if not segment: 229 + return jsonify({"error": "Missing segment"}), 400 230 + if not day: 231 + return jsonify({"error": "Missing day"}), 400 232 + 233 + # Validate segment format (HHMMSS_LEN) 234 + if not re.match(r"^\d{6}_\d+$", segment): 235 + return jsonify({"error": "Invalid segment format"}), 400 236 + 237 + # Validate day format (YYYYMMDD) 238 + if not re.match(r"^\d{8}$", day): 239 + return jsonify({"error": "Invalid day format"}), 400 240 + 241 + # Get uploaded files 242 + files = request.files.getlist("files") 243 + if not files: 244 + return jsonify({"error": "No files uploaded"}), 400 245 + 246 + # Ensure day directory exists 247 + target_dir = day_path(day) 248 + target_dir.mkdir(parents=True, exist_ok=True) 249 + 250 + # Save files 251 + saved_files = [] 252 + total_bytes = 0 253 + 254 + for upload in files: 255 + if not upload.filename: 256 + continue 257 + 258 + # Secure the filename 259 + filename = secure_filename(upload.filename) 260 + if not filename: 261 + continue 262 + 263 + target_path = target_dir / filename 264 + 265 + # Save file 266 + try: 267 + upload.save(target_path) 268 + saved_files.append(filename) 269 + total_bytes += target_path.stat().st_size 270 + logger.info(f"Saved {filename} to {target_dir}") 271 + except OSError as e: 272 + logger.error(f"Failed to save {filename}: {e}") 273 + return jsonify({"error": f"Failed to save {filename}"}), 500 274 + 275 + if not saved_files: 276 + return jsonify({"error": "No valid files saved"}), 400 277 + 278 + # Update remote stats 279 + remote["last_seen"] = int(time.time() * 1000) 280 + remote["last_segment"] = segment 281 + remote["stats"]["segments_received"] = ( 282 + remote["stats"].get("segments_received", 0) + 1 283 + ) 284 + remote["stats"]["bytes_received"] = ( 285 + remote["stats"].get("bytes_received", 0) + total_bytes 286 + ) 287 + _save_remote(remote) 288 + 289 + # Emit observe.observing event to local Callosum 290 + emit( 291 + "observe", 292 + "observing", 293 + segment=segment, 294 + day=day, 295 + files=saved_files, 296 + remote=remote.get("name", "unknown"), 297 + ) 298 + 299 + logger.info( 300 + f"Received {len(saved_files)} files for {day}/{segment} from {remote.get('name')}" 301 + ) 302 + 303 + return jsonify( 304 + { 305 + "status": "ok", 306 + "files": saved_files, 307 + "bytes": total_bytes, 308 + } 309 + ) 310 + 311 + 312 + @remote_bp.route("/ingest/<key>/event", methods=["POST"]) 313 + def ingest_event(key: str) -> Any: 314 + """Receive events from remote observer and relay to local Callosum. 315 + 316 + Expects JSON body with: 317 + - tract: Event tract 318 + - event: Event name 319 + - ...additional fields 320 + """ 321 + # Validate key 322 + remote = _load_remote(key) 323 + if not remote: 324 + return jsonify({"error": "Invalid key"}), 401 325 + 326 + if not remote.get("enabled", True): 327 + return jsonify({"error": "Remote disabled"}), 403 328 + 329 + # Parse event 330 + data = request.get_json(force=True) if request.is_json else {} 331 + 332 + tract = data.get("tract") 333 + event = data.get("event") 334 + 335 + if not tract or not event: 336 + return jsonify({"error": "Missing tract or event"}), 400 337 + 338 + # Add remote identifier 339 + data["remote"] = remote.get("name", "unknown") 340 + 341 + # Relay to local Callosum 342 + emit(tract, event, **{k: v for k, v in data.items() if k not in ("tract", "event")}) 343 + 344 + # Update last_seen on status events 345 + if tract == "observe" and event == "status": 346 + remote["last_seen"] = int(time.time() * 1000) 347 + _save_remote(remote) 348 + 349 + return jsonify({"status": "ok"})
+2 -1
convey/__init__.py
··· 16 16 17 17 from . import state 18 18 from .apps import register_app_context 19 - from .bridge import register_websocket 19 + from .bridge import emit, register_websocket 20 20 from .cli import run_service 21 21 from .config import bp as config_bp 22 22 from .root import bp as root_bp 23 23 24 24 __all__ = [ 25 25 "create_app", 26 + "emit", 26 27 "run_service", 27 28 ] 28 29
+24 -5
convey/bridge.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 - """Bridge between Callosum message bus and WebSocket clients. 4 + """Bidirectional bridge between Callosum message bus and WebSocket clients. 5 5 6 - Listens to all Callosum events (cortex, task, indexer, etc.) and broadcasts them 7 - to connected WebSocket clients. 6 + Receives Callosum events and broadcasts them to connected WebSocket clients. 7 + Also provides emit() for route handlers to send events via the shared connection. 8 8 """ 9 9 10 10 from __future__ import annotations ··· 73 73 _CALLOSUM_CONNECTION = None 74 74 75 75 76 - def stop_bridge(timeout: float = 5.0) -> None: 77 - """Stop listening for Callosum events.""" 76 + def stop_bridge() -> None: 77 + """Stop the Callosum bridge.""" 78 78 global _CALLOSUM_CONNECTION 79 79 with _WATCH_LOCK: 80 80 if _CALLOSUM_CONNECTION: 81 81 _CALLOSUM_CONNECTION.stop() 82 82 _CALLOSUM_CONNECTION = None 83 83 logger.info("Callosum bridge stopped") 84 + 85 + 86 + def emit(tract: str, event: str, **fields) -> bool: 87 + """Emit event via shared Callosum connection. 88 + 89 + Non-blocking: queues message for background thread to send. 90 + If disconnected, message is dropped (with debug logging). 91 + 92 + Args: 93 + tract: Event category/namespace 94 + event: Event type 95 + **fields: Additional event fields 96 + 97 + Returns: 98 + True if queued successfully, False if bridge not started or queue full 99 + """ 100 + if _CALLOSUM_CONNECTION: 101 + return _CALLOSUM_CONNECTION.emit(tract, event, **fields) 102 + return False 84 103 85 104 86 105 def register_websocket(sock: Sock, path: str = "/ws/events") -> None:
+25 -1
docs/APPS.md
··· 413 413 414 414 **See implementation:** `convey/static/app.js` - Facet switching logic and event dispatch 415 415 416 - ### WebSocket Events 416 + ### WebSocket Events (Client-Side) 417 417 418 418 `window.appEvents` API defined in `convey/static/websocket.js`: 419 419 - `listen(tract, callback)` - Subscribe to specific tract or '*' for all events ··· 422 422 **Common tracts:** `cortex`, `indexer`, `observe`, `task` 423 423 424 424 See [CALLOSUM.md](CALLOSUM.md) for complete event protocol. 425 + 426 + ### Server-Side Events 427 + 428 + Emit Callosum events from route handlers using `convey.emit()`: 429 + 430 + ```python 431 + from convey import emit 432 + 433 + @my_bp.route("/action", methods=["POST"]) 434 + def handle_action(): 435 + # ... process request ... 436 + 437 + # Emit event (non-blocking, drops if disconnected) 438 + emit("my_app", "action_complete", item_id=123, status="success") 439 + 440 + return jsonify({"status": "ok"}) 441 + ``` 442 + 443 + **Behavior:** 444 + - Non-blocking: queues message for background thread 445 + - If Callosum disconnected, message is dropped (with debug logging) 446 + - Returns `True` if queued, `False` if bridge not started or queue full 447 + 448 + **Reference implementations:** `apps/import/routes.py`, `apps/remote/routes.py` 425 449 426 450 --- 427 451
+2 -2
docs/CALLOSUM.md
··· 58 58 **Fields:** 59 59 - `status`: Periodic state (every 5s while running) 60 60 - From `observer.py`: `screencast`, `audio`, `activity` - Live capture state 61 - - `screencast.files_growing` - Whether recording files are actively being written (used for health) 61 + - `screencast.files_growing` - Whether recording files are actively being written 62 62 - From `sense.py`: `describe`, `transcribe` - Processing pipeline state (with `running`/`queued` sub-fields) 63 63 - `observing`: `day`, `segment`, `files` - Recording window boundary crossed with saved files 64 64 - `detected`: `file`, `handler`, `ref` - File detected and handler spawned ··· 66 66 - `observed`: `segment`, `duration` - All files for segment fully processed 67 67 - Observer events (`status`, `observing`) include `host` (hostname) and `platform` ("linux"/"darwin") for multi-host support 68 68 **Purpose:** Track observation pipeline from live capture state through processing completion 69 - **Health Derivation:** Supervisor derives `see`/`hear` health from `observe.status` event recency and content 69 + **Health Model:** Fail-fast - observers exit if capture stalls (e.g., files not growing). Supervisor checks event freshness only. 70 70 **Path Format:** Relative to `JOURNAL_PATH` (e.g., `20251102/163045_300_center_DP-3_screen.webm` for multi-monitor recordings) 71 71 **Correlation:** `detected.ref` matches `logs.exec.ref` for the same handler process; `observed.segment` groups all files from same capture window 72 72