personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Deliver triage responses over WebSocket

Change POST /api/triage to return an agent_id immediately after spawning the triage agent instead of blocking for the result. Deliver triage completions in the browser via cortex/finish WebSocket events, add GET /api/triage/result/<agent_id> for reload recovery, and record completed triage exchanges from a new home app cortex finish handler.

+404 -167
+59
apps/home/events.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Callosum event handlers for conversation exchange recording. 5 + 6 + Records triage agent completions to the conversation memory service 7 + so exchanges persist for future context injection. 8 + """ 9 + 10 + import logging 11 + 12 + from apps.events import EventContext, on_event 13 + from think.conversation import record_exchange 14 + from think.cortex_client import read_agent_events 15 + 16 + logger = logging.getLogger(__name__) 17 + 18 + TRIAGE_AGENT_NAMES = {"unified", "triage", "onboarding"} 19 + 20 + 21 + @on_event("cortex", "finish") 22 + def record_triage_exchange(ctx: EventContext) -> None: 23 + """Record completed triage agent exchanges to conversation memory.""" 24 + name = ctx.msg.get("name") 25 + if name not in TRIAGE_AGENT_NAMES: 26 + return 27 + 28 + agent_id = ctx.msg.get("agent_id") 29 + if not agent_id: 30 + return 31 + 32 + try: 33 + events = read_agent_events(agent_id) 34 + facet = "" 35 + app = "" 36 + path = "" 37 + user_message = "" 38 + for event in events: 39 + if event.get("event") == "request": 40 + facet = event.get("facet", "") 41 + app = event.get("app", "") 42 + path = event.get("path", "") 43 + user_message = event.get("user_message", "") 44 + break 45 + 46 + result = ctx.msg.get("result", "") 47 + record_exchange( 48 + facet=facet, 49 + app=app, 50 + path=path, 51 + user_message=user_message, 52 + agent_response=result, 53 + muse=name, 54 + agent_id=agent_id, 55 + ) 56 + except Exception: 57 + logger.debug( 58 + "Failed to record conversation exchange for agent %s", agent_id, exc_info=True 59 + )
+168 -119
convey/templates/app.html
··· 274 274 } 275 275 } 276 276 277 - // Auto-retry pending request if < 30 seconds old 278 - if (saved.pendingMessage && saved.pendingMessage.sentAt) { 277 + // Auto-recover pending request if < 5 minutes old 278 + if (saved.pendingMessage && saved.pendingMessage.agentId && saved.pendingMessage.sentAt) { 279 279 var age = Date.now() - saved.pendingMessage.sentAt; 280 - if (age < 30000) { 280 + if (age < 300000) { 281 281 pendingMessage = saved.pendingMessage; 282 282 thinking.style.display = ''; 283 283 inlineResp.style.display = 'none'; ··· 285 285 appBar.classList.add('app-bar--glance'); 286 286 287 287 (async function() { 288 - try { 289 - var history = messages.slice(0, -1).map(function(m) { 290 - return { role: m.role, content: m.content }; 291 - }); 292 - var r = await fetch('/api/triage', { 293 - method: 'POST', 294 - headers: { 'Content-Type': 'application/json' }, 295 - body: JSON.stringify({ 296 - message: pendingMessage.text, 297 - app: '{{ app }}', 298 - path: window.location.pathname, 299 - facet: window.selectedFacet || null, 300 - conversation_history: history.length > 0 ? history : undefined 301 - }) 302 - }); 288 + var agentId = pendingMessage.agentId; 289 + var recoveryCleanup = null; 290 + var recoveryWatchdog = null; 291 + var resolved = false; 303 292 304 - var resp = ''; 305 - var panelHint = false; 306 - var errMsg = null; 307 - if (r.ok) { 308 - var data = await r.json(); 309 - resp = data.response || ''; 310 - panelHint = !!data.panel; 311 - } else { 312 - errMsg = 'Something went wrong. Try again.'; 313 - } 293 + function recoverCleanup() { 294 + if (recoveryCleanup) { recoveryCleanup(); recoveryCleanup = null; } 295 + if (recoveryWatchdog) { clearTimeout(recoveryWatchdog); recoveryWatchdog = null; } 296 + thinking.style.display = 'none'; 297 + input.disabled = false; 298 + input.focus(); 299 + } 300 + 301 + function recoverDeliver(resp, panelHint, errMsg) { 302 + if (resolved) return; 303 + resolved = true; 314 304 var content = errMsg || resp; 315 305 if (content) { 316 306 messages.push({ role: 'agent', content: content, ts: Date.now() }); ··· 319 309 320 310 var userCount = messages.filter(function(m) { return m.role === 'user'; }).length; 321 311 var shouldInline = userCount === 1 && !panelHint && isOneLiner(resp) && !errMsg; 322 - if (shouldInline) { 312 + 313 + if (panelOpen) { 314 + renderMessages(); 315 + msgArea.scrollTop = msgArea.scrollHeight; 316 + } else if (shouldInline) { 323 317 thinking.style.display = 'none'; 324 318 inlineResp.textContent = content; 325 319 inlineResp.style.display = content ? '' : 'none'; ··· 331 325 openPanel(); 332 326 } 333 327 save(); 328 + recoverCleanup(); 329 + } 330 + 331 + // Subscribe to WS first, then check GET 332 + if (window.appEvents) { 333 + recoveryCleanup = window.appEvents.listen('cortex', function(msg) { 334 + if (msg.agent_id === agentId) { 335 + if (recoveryWatchdog) { clearTimeout(recoveryWatchdog); recoveryWatchdog = null; } 336 + recoveryWatchdog = setTimeout(function() { 337 + recoverDeliver('', false, 'Request timed out. Try again.'); 338 + }, 180000); 339 + } 340 + if (msg.agent_id === agentId && msg.event === 'finish') { 341 + var resp = msg.result || ''; 342 + var panelHint = resp.length >= 120 || resp.includes('\n') || resp.split('. ').length > 2; 343 + recoverDeliver(resp, panelHint, null); 344 + } else if (msg.agent_id === agentId && msg.event === 'error') { 345 + recoverDeliver('', false, 'Something went wrong. Try again.'); 346 + } 347 + }); 348 + } 349 + 350 + // Start watchdog for recovery 351 + recoveryWatchdog = setTimeout(function() { 352 + recoverDeliver('', false, 'Request timed out. Try again.'); 353 + }, 180000); 354 + 355 + // Check GET immediately in case agent already finished 356 + try { 357 + var r = await fetch('/api/triage/result/' + agentId); 358 + if (r.ok) { 359 + // Agent already finished — cancel WS subscription and display 360 + if (recoveryCleanup) { recoveryCleanup(); recoveryCleanup = null; } 361 + if (recoveryWatchdog) { clearTimeout(recoveryWatchdog); recoveryWatchdog = null; } 362 + var data = await r.json(); 363 + var resp = data.response || ''; 364 + var panelHint = !!data.panel; 365 + recoverDeliver(resp, panelHint, null); 366 + } 367 + // If 404, agent still running — keep WS subscription active 334 368 } catch (err) { 335 - pendingMessage = null; 336 - var errContent = 'Connection error. Try again.'; 337 - messages.push({ role: 'agent', content: errContent, ts: Date.now() }); 338 - thinking.style.display = 'none'; 339 - inlineResp.textContent = errContent; 340 - inlineResp.style.display = ''; 341 - dismissBtn.style.display = ''; 342 - save(); 343 - } finally { 344 - input.disabled = false; 345 - input.focus(); 369 + // Network error — keep WS subscription active 346 370 } 347 371 })(); 348 372 } else { ··· 413 437 if (!text) return; 414 438 415 439 messages.push({ role: 'user', content: text, ts: Date.now() }); 416 - pendingMessage = { text: text, sentAt: Date.now() }; 417 440 418 441 // Clear input 419 442 input.value = ''; ··· 442 465 var cleanupCortex = null; 443 466 var timerInterval = null; 444 467 var timerTimeout = null; 468 + var watchdogTimer = null; 445 469 var thinkingStarted = Date.now(); 446 470 471 + function cleanup() { 472 + if (cleanupCortex) { cleanupCortex(); cleanupCortex = null; } 473 + if (timerInterval) { clearInterval(timerInterval); timerInterval = null; } 474 + if (timerTimeout) { clearTimeout(timerTimeout); timerTimeout = null; } 475 + if (watchdogTimer) { clearTimeout(watchdogTimer); watchdogTimer = null; } 476 + var pt = document.getElementById('panelThinking'); 477 + if (pt) pt.remove(); 478 + thinking.style.display = 'none'; 479 + input.disabled = false; 480 + input.focus(); 481 + } 482 + 483 + function deliverResult(resp, panelHint, errMsg) { 484 + var content = errMsg || resp; 485 + if (content) { 486 + messages.push({ role: 'agent', content: content, ts: Date.now() }); 487 + } 488 + pendingMessage = null; 489 + 490 + var userCount = messages.filter(function(m) { return m.role === 'user'; }).length; 491 + var shouldInline = userCount === 1 && !panelHint && isOneLiner(resp) && !errMsg; 492 + 493 + if (panelOpen) { 494 + renderMessages(); 495 + msgArea.scrollTop = msgArea.scrollHeight; 496 + } else if (shouldInline) { 497 + inlineResp.textContent = content; 498 + inlineResp.style.display = content ? '' : 'none'; 499 + dismissBtn.style.display = content ? '' : 'none'; 500 + if (!content) appBar.classList.remove('app-bar--glance'); 501 + } else { 502 + appBar.classList.remove('app-bar--glance'); 503 + openPanel(); 504 + } 505 + save(); 506 + cleanup(); 507 + } 508 + 509 + function startWatchdog(agentId) { 510 + if (watchdogTimer) clearTimeout(watchdogTimer); 511 + watchdogTimer = setTimeout(function() { 512 + deliverResult('', false, 'Request timed out. Try again.'); 513 + }, 180000); 514 + } 515 + 516 + // Build conversation history from prior turns (exclude current message) 517 + var history = messages.slice(0, -1).map(function(m) { 518 + return { role: m.role, content: m.content }; 519 + }); 520 + 447 521 try { 448 - if (window.appEvents) { 449 - cleanupCortex = window.appEvents.listen('cortex', function(msg) { 450 - var label = getProgressLabel(msg); 451 - if (label) updateThinkingLabel(label); 452 - }); 522 + var r = await fetch('/api/triage', { 523 + method: 'POST', 524 + headers: { 'Content-Type': 'application/json' }, 525 + body: JSON.stringify({ 526 + message: text, 527 + app: '{{ app }}', 528 + path: window.location.pathname, 529 + facet: window.selectedFacet || null, 530 + conversation_history: history.length > 0 ? history : undefined 531 + }) 532 + }); 533 + 534 + if (!r.ok) { 535 + deliverResult('', false, 'Something went wrong. Try again.'); 536 + return; 453 537 } 454 538 539 + var data = await r.json(); 540 + var agentId = data.agent_id; 541 + if (!agentId) { 542 + deliverResult('', false, 'Something went wrong. Try again.'); 543 + return; 544 + } 545 + 546 + pendingMessage = { text: text, sentAt: Date.now(), agentId: agentId }; 547 + save(); 548 + 549 + // Start elapsed timer 455 550 timerTimeout = setTimeout(function() { 456 551 timerInterval = setInterval(function() { 457 552 var elapsed = Math.round((Date.now() - thinkingStarted) / 1000); ··· 470 565 }, 1000); 471 566 }, 5000); 472 567 473 - // Build conversation history from prior turns (exclude current message) 474 - var history = messages.slice(0, -1).map(function(m) { 475 - return { role: m.role, content: m.content }; 476 - }); 568 + // Start inactivity watchdog 569 + startWatchdog(agentId); 477 570 478 - var r = await fetch('/api/triage', { 479 - method: 'POST', 480 - headers: { 'Content-Type': 'application/json' }, 481 - body: JSON.stringify({ 482 - message: text, 483 - app: '{{ app }}', 484 - path: window.location.pathname, 485 - facet: window.selectedFacet || null, 486 - conversation_history: history.length > 0 ? history : undefined 487 - }) 488 - }); 571 + // Subscribe to cortex events for this agent 572 + if (window.appEvents) { 573 + cleanupCortex = window.appEvents.listen('cortex', function(msg) { 574 + // Update thinking label on any progress event 575 + var label = getProgressLabel(msg); 576 + if (label) updateThinkingLabel(label); 489 577 490 - var resp = ''; 491 - var panelHint = false; 492 - var errMsg = null; 493 - if (r.ok) { 494 - var data = await r.json(); 495 - resp = data.response || ''; 496 - panelHint = !!data.panel; 497 - } else { 498 - errMsg = 'Something went wrong. Try again.'; 499 - } 500 - var content = errMsg || resp; 578 + // Reset inactivity watchdog on any event for our agent 579 + if (msg.agent_id === agentId) { 580 + startWatchdog(agentId); 581 + } 501 582 502 - if (content) { 503 - messages.push({ role: 'agent', content: content, ts: Date.now() }); 583 + // Handle finish/error for our agent 584 + if (msg.agent_id === agentId && msg.event === 'finish') { 585 + var resp = msg.result || ''; 586 + var panelHint = ( 587 + resp.length >= 120 || 588 + resp.includes('\n') || 589 + resp.split('. ').length > 2 || 590 + history.length > 0 591 + ); 592 + deliverResult(resp, panelHint, null); 593 + } else if (msg.agent_id === agentId && msg.event === 'error') { 594 + deliverResult('', false, 'Something went wrong. Try again.'); 595 + } 596 + }); 504 597 } 505 - pendingMessage = null; 506 - 507 - var userCount = messages.filter(function(m) { return m.role === 'user'; }).length; 508 - var shouldInline = userCount === 1 && !panelHint && isOneLiner(resp) && !errMsg; 509 - 510 - if (panelOpen) { 511 - renderMessages(); 512 - msgArea.scrollTop = msgArea.scrollHeight; 513 - } else if (shouldInline) { 514 - // Inline in bar 515 - thinking.style.display = 'none'; 516 - inlineResp.textContent = content; 517 - inlineResp.style.display = content ? '' : 'none'; 518 - dismissBtn.style.display = content ? '' : 'none'; 519 - if (!content) appBar.classList.remove('app-bar--glance'); 520 - } else { 521 - // Expand to panel 522 - thinking.style.display = 'none'; 523 - appBar.classList.remove('app-bar--glance'); 524 - openPanel(); 525 - } 526 - 527 - save(); 528 598 } catch (err) { 529 - pendingMessage = null; 530 - var errContent = 'Connection error. Try again.'; 531 - messages.push({ role: 'agent', content: errContent, ts: Date.now() }); 532 - if (panelOpen) { 533 - renderMessages(); 534 - msgArea.scrollTop = msgArea.scrollHeight; 535 - } else { 536 - thinking.style.display = 'none'; 537 - inlineResp.textContent = errContent; 538 - inlineResp.style.display = ''; 539 - dismissBtn.style.display = ''; 540 - } 541 - save(); 542 - } finally { 543 - if (cleanupCortex) { cleanupCortex(); cleanupCortex = null; } 544 - if (timerInterval) { clearInterval(timerInterval); timerInterval = null; } 545 - if (timerTimeout) { clearTimeout(timerTimeout); timerTimeout = null; } 546 - var pt = document.getElementById('panelThinking'); 547 - if (pt) pt.remove(); 548 - thinking.style.display = 'none'; 549 - input.disabled = false; 550 - input.focus(); 599 + deliverResult('', false, 'Connection error. Try again.'); 551 600 } 552 601 }; 553 602 })();
+34 -48
convey/triage.py
··· 22 22 23 23 @bp.route("", methods=["POST"]) 24 24 def triage() -> Any: 25 - """Accept a message from the conversation panel and return a response. 25 + """Accept a message from the conversation panel and spawn a triage agent. 26 26 27 27 Expects JSON: {message, app, path, facet, conversation_history?} 28 - Returns JSON: {response, panel} 28 + Returns JSON: {agent_id} 29 + 30 + The agent runs asynchronously. The browser receives the result via 31 + WebSocket (cortex/finish event). For reload recovery, use GET /result/<agent_id>. 29 32 30 33 When conversation_history is provided (array of {role, content} pairs), 31 34 routes to the unified muse with full journal context. Otherwise falls ··· 201 204 202 205 try: 203 206 from convey.utils import spawn_agent 204 - from think.cortex_client import read_agent_events, wait_for_agents 205 207 206 208 config: dict[str, Any] = {} 207 209 if facet: 208 210 config["facet"] = facet 211 + config["app"] = app_name 212 + config["path"] = path 213 + config["user_message"] = message 209 214 210 215 agent_id = spawn_agent( 211 216 prompt=full_prompt, ··· 216 221 if agent_id is None: 217 222 return error_response("Failed to connect to agent service", 503) 218 223 219 - completed, timed_out = wait_for_agents([agent_id], timeout=60) 224 + return jsonify(agent_id=agent_id) 220 225 221 - if agent_id in timed_out: 222 - return error_response("Triage request timed out", 504) 226 + except Exception: 227 + logger.exception("Triage request failed") 228 + return error_response("Failed to process triage request", 500) 223 229 224 - end_state = completed.get(agent_id) 225 - if end_state == "error": 226 - return error_response("Triage agent encountered an error", 500) 227 230 228 - # Extract result text from finish event 229 - try: 230 - events = read_agent_events(agent_id) 231 - for event in reversed(events): 232 - if event.get("event") == "finish": 233 - result = event.get("result", "") 231 + @bp.route("/result/<agent_id>", methods=["GET"]) 232 + def triage_result(agent_id: str) -> Any: 233 + """Return the result of a completed triage agent. 234 234 235 - # Record conversation exchange for memory service 236 - try: 237 - from think.conversation import record_exchange 235 + Returns {response, panel} if the agent has finished, 404 otherwise. 236 + Used for page-reload recovery when the WebSocket may have missed the finish event. 237 + """ 238 + try: 239 + from think.cortex_client import read_agent_events 238 240 239 - record_exchange( 240 - facet=facet, 241 - app=app_name, 242 - path=path, 243 - user_message=message, 244 - agent_response=result, 245 - muse=agent_name, 246 - agent_id=agent_id or "", 247 - ) 248 - except Exception: 249 - logger.debug( 250 - "Failed to record conversation exchange", 251 - exc_info=True, 252 - ) 253 - 254 - # Provide panel hint based on response characteristics 255 - panel = ( 256 - len(result) >= 120 257 - or "\n" in result 258 - or len((result or "").split(". ")) > 2 259 - or has_conversation 260 - ) 261 - return jsonify(response=result, panel=panel) 262 - except FileNotFoundError: 263 - pass 264 - 265 - return error_response("No response from triage agent", 500) 266 - 241 + events = read_agent_events(agent_id) 242 + for event in reversed(events): 243 + if event.get("event") == "finish": 244 + result = event.get("result", "") 245 + panel = ( 246 + len(result) >= 120 247 + or "\n" in result 248 + or len((result or "").split(". ")) > 2 249 + ) 250 + return jsonify(response=result, panel=panel) 251 + except FileNotFoundError: 252 + pass 267 253 except Exception: 268 - logger.exception("Triage request failed") 269 - return error_response("Failed to process triage request", 500) 254 + logger.debug("Failed to read triage result for %s", agent_id, exc_info=True) 255 + return jsonify(error="not found"), 404
+143
tests/test_home_events.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for apps/home/events.py — conversation exchange recording.""" 5 + 6 + from unittest.mock import patch 7 + 8 + import pytest 9 + 10 + from apps.events import EventContext, clear_handlers, stop_dispatcher 11 + from apps.home.events import TRIAGE_AGENT_NAMES, record_triage_exchange 12 + 13 + 14 + @pytest.fixture(autouse=True) 15 + def clean_handlers(): 16 + clear_handlers() 17 + yield 18 + clear_handlers() 19 + stop_dispatcher() 20 + 21 + 22 + class TestRecordTriageExchange: 23 + """Tests for record_triage_exchange handler.""" 24 + 25 + def _make_ctx(self, msg): 26 + return EventContext(msg=msg, app="home", tract="cortex", event="finish") 27 + 28 + def test_ignores_non_triage_agent(self): 29 + """Handler returns early for non-triage agent names.""" 30 + ctx = self._make_ctx( 31 + { 32 + "tract": "cortex", 33 + "event": "finish", 34 + "name": "reviewer", 35 + "agent_id": "123", 36 + "result": "hello", 37 + } 38 + ) 39 + with patch("apps.home.events.record_exchange") as mock_record: 40 + record_triage_exchange(ctx) 41 + mock_record.assert_not_called() 42 + 43 + def test_ignores_missing_agent_id(self): 44 + """Handler returns early if agent_id is missing.""" 45 + ctx = self._make_ctx( 46 + { 47 + "tract": "cortex", 48 + "event": "finish", 49 + "name": "unified", 50 + "result": "hello", 51 + } 52 + ) 53 + with patch("apps.home.events.record_exchange") as mock_record: 54 + record_triage_exchange(ctx) 55 + mock_record.assert_not_called() 56 + 57 + @pytest.mark.parametrize("agent_name", sorted(TRIAGE_AGENT_NAMES)) 58 + def test_records_exchange_for_triage_agents(self, agent_name): 59 + """Handler calls record_exchange with correct fields for each triage agent name.""" 60 + events = [ 61 + { 62 + "event": "request", 63 + "ts": 1700000000000, 64 + "agent_id": "abc123", 65 + "facet": "work", 66 + "app": "home", 67 + "path": "/home", 68 + "user_message": "hello world", 69 + }, 70 + { 71 + "event": "finish", 72 + "ts": 1700000001000, 73 + "agent_id": "abc123", 74 + "result": "hi there", 75 + }, 76 + ] 77 + ctx = self._make_ctx( 78 + { 79 + "tract": "cortex", 80 + "event": "finish", 81 + "name": agent_name, 82 + "agent_id": "abc123", 83 + "result": "hi there", 84 + } 85 + ) 86 + with patch("apps.home.events.read_agent_events", return_value=events): 87 + with patch("apps.home.events.record_exchange") as mock_record: 88 + record_triage_exchange(ctx) 89 + mock_record.assert_called_once_with( 90 + facet="work", 91 + app="home", 92 + path="/home", 93 + user_message="hello world", 94 + agent_response="hi there", 95 + muse=agent_name, 96 + agent_id="abc123", 97 + ) 98 + 99 + def test_handles_missing_request_event(self): 100 + """Handler uses empty strings for metadata if request event not found.""" 101 + events = [ 102 + {"event": "finish", "agent_id": "abc123", "result": "done"}, 103 + ] 104 + ctx = self._make_ctx( 105 + { 106 + "tract": "cortex", 107 + "event": "finish", 108 + "name": "unified", 109 + "agent_id": "abc123", 110 + "result": "done", 111 + } 112 + ) 113 + with patch("apps.home.events.read_agent_events", return_value=events): 114 + with patch("apps.home.events.record_exchange") as mock_record: 115 + record_triage_exchange(ctx) 116 + mock_record.assert_called_once_with( 117 + facet="", 118 + app="", 119 + path="", 120 + user_message="", 121 + agent_response="done", 122 + muse="unified", 123 + agent_id="abc123", 124 + ) 125 + 126 + def test_handles_read_error_gracefully(self): 127 + """Handler logs and swallows exceptions from read_agent_events.""" 128 + ctx = self._make_ctx( 129 + { 130 + "tract": "cortex", 131 + "event": "finish", 132 + "name": "unified", 133 + "agent_id": "abc123", 134 + "result": "done", 135 + } 136 + ) 137 + with patch( 138 + "apps.home.events.read_agent_events", 139 + side_effect=FileNotFoundError("not found"), 140 + ): 141 + with patch("apps.home.events.record_exchange") as mock_record: 142 + record_triage_exchange(ctx) # should not raise 143 + mock_record.assert_not_called()