a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

add memory review pass (dream/distill)

structurally matches claude code's /dream skill: write-time is fast
and might be wrong (extraction pipeline), review-time is slow and
has distance (this).

process_review() gathers recent active observations across all user
namespaces, asks a review agent to evaluate each with distance from
the original conversation, and applies the decisions:
- keep: observation stays as-is
- supersede: marked superseded, stops appearing in context
- promote: creates a public cosmik card on semble

new endpoint: POST /api/control/review (bearer auth, same pattern
as /post and /explore). no scheduled slot yet — operator-triggered.

+204 -1
+1 -1
loq.toml
··· 21 21 22 22 [[rules]] 23 23 path = "src/bot/main.py" 24 - max_lines = 813 24 + max_lines = 826
+129
src/bot/agent.py
··· 16 16 from bot.core.graze_client import GrazeClient 17 17 from bot.exploration import EXPLORATION_SYSTEM_PROMPT, ExplorationResult 18 18 from bot.memory.extraction import EXTRACTION_SYSTEM_PROMPT, ExtractionResult 19 + from bot.memory.review import REVIEW_SYSTEM_PROMPT, ReviewResult 19 20 from bot.tools import PhiDeps, _check_services_impl, register_all 20 21 21 22 logger = logging.getLogger("bot.agent") ··· 318 319 model=settings.agent_model, 319 320 system_prompt=f"{self.base_personality}\n\n{EXPLORATION_SYSTEM_PROMPT}", 320 321 output_type=ExplorationResult, 322 + ) 323 + 324 + # Review agent — the dream/distill pass. Reviews observations with 325 + # distance from the original conversation and decides keep/supersede/promote. 326 + self._review_agent = Agent[None, ReviewResult]( 327 + name="phi-reviewer", 328 + model=settings.agent_model, 329 + system_prompt=f"{self.base_personality}\n\n{REVIEW_SYSTEM_PROMPT}", 330 + output_type=ReviewResult, 321 331 ) 322 332 323 333 logger.info("phi agent initialized with pdsx + pub-search mcp tools") ··· 690 700 691 701 await complete(rkey) 692 702 return total_stored 703 + 704 + async def process_review(self) -> str: 705 + """Review recent observations with distance. The dream/distill pass. 706 + 707 + Fetches recent observations across user namespaces, asks the review 708 + agent to evaluate each (keep/supersede/promote), and applies the 709 + decisions. Returns a summary string for logging. 710 + """ 711 + if not self.memory: 712 + return "no memory" 713 + 714 + # gather recent observations across all user namespaces 715 + user_prefix = f"{self.memory.NAMESPACES['users']}-" 716 + observations: list[dict] = [] 717 + try: 718 + page = self.memory.client.namespaces(prefix=user_prefix) 719 + for ns_summary in page.namespaces: 720 + handle = ns_summary.id.removeprefix(user_prefix).replace("_", ".") 721 + user_ns = self.memory.client.namespace(ns_summary.id) 722 + try: 723 + response = user_ns.query( 724 + rank_by=("created_at", "desc"), 725 + top_k=10, 726 + filters=[ 727 + "And", 728 + [ 729 + ["kind", "Eq", "observation"], 730 + ["status", "NotEq", "superseded"], 731 + ], 732 + ], 733 + include_attributes=["content", "tags", "created_at"], 734 + ) 735 + if response.rows: 736 + for row in response.rows: 737 + observations.append( 738 + { 739 + "handle": handle, 740 + "id": row.id, 741 + "content": row.content, 742 + "tags": getattr(row, "tags", []), 743 + "created_at": getattr(row, "created_at", ""), 744 + } 745 + ) 746 + except Exception: 747 + pass 748 + except Exception as e: 749 + logger.warning(f"review: failed to gather observations: {e}") 750 + return f"failed: {e}" 751 + 752 + if not observations: 753 + logger.info("review: no observations to review") 754 + return "nothing to review" 755 + 756 + logger.info(f"review: examining {len(observations)} observations") 757 + 758 + # format for the review agent 759 + lines = [] 760 + for i, obs in enumerate(observations): 761 + lines.append( 762 + f"{i + 1}. [@{obs['handle']}] {obs['content']} " 763 + f"(tags: {obs['tags']}, from: {obs['created_at'][:10]})" 764 + ) 765 + prompt = f"review these {len(observations)} observations:\n\n" + "\n".join( 766 + lines 767 + ) 768 + 769 + try: 770 + result = await self._review_agent.run(prompt) 771 + except Exception as e: 772 + logger.warning(f"review agent failed: {e}") 773 + return f"review agent failed: {e}" 774 + 775 + output = result.output 776 + superseded = 0 777 + promoted = 0 778 + 779 + for i, decision in enumerate(output.decisions): 780 + if i >= len(observations): 781 + break 782 + obs = observations[i] 783 + handle = obs["handle"] 784 + 785 + if decision.action == "supersede": 786 + user_ns = self.memory.get_user_namespace(handle) 787 + user_ns.write(patch_rows=[{"id": obs["id"], "status": "superseded"}]) 788 + superseded += 1 789 + logger.info( 790 + f"review: superseded observation for @{handle}: " 791 + f"{obs['content'][:60]} ({decision.reason})" 792 + ) 793 + 794 + elif decision.action == "promote" and decision.card_title: 795 + try: 796 + from bot.tools._helpers import _create_cosmik_record 797 + from bot.types import CosmikNoteCard, NoteContent 798 + 799 + card = CosmikNoteCard( 800 + content=NoteContent( 801 + text=decision.card_description or obs["content"] 802 + ) 803 + ) 804 + uri = await _create_cosmik_record( 805 + "network.cosmik.card", card.to_record() 806 + ) 807 + promoted += 1 808 + logger.info( 809 + f"review: promoted to cosmik card for @{handle}: " 810 + f"{decision.card_title} → {uri}" 811 + ) 812 + except Exception as e: 813 + logger.warning(f"review: failed to promote: {e}") 814 + 815 + summary = ( 816 + f"reviewed {len(observations)} observations: " 817 + f"{superseded} superseded, {promoted} promoted, " 818 + f"{len(observations) - superseded - promoted} kept" 819 + ) 820 + logger.info(f"review: {summary}") 821 + return summary
+13
src/bot/main.py
··· 389 389 return {"triggered": True} 390 390 391 391 392 + @app.post("/api/control/review") 393 + async def trigger_review(request: Request, background_tasks: BackgroundTasks): 394 + """Trigger a memory review (dream/distill pass) immediately.""" 395 + if err := _check_control_token(request): 396 + return err 397 + poller: NotificationPoller | None = getattr(app.state, "poller", None) 398 + if not poller: 399 + return JSONResponse({"error": "poller not available"}, status_code=503) 400 + background_tasks.add_task(poller.handler.review_memories) 401 + logger.info("memory review triggered via API") 402 + return {"triggered": True} 403 + 404 + 392 405 @app.post("/api/control/explore") 393 406 async def trigger_explore(request: Request, background_tasks: BackgroundTasks): 394 407 """Trigger one exploration from the curiosity queue immediately."""
+52
src/bot/memory/review.py
··· 1 + """Memory review — the distill/dream pass. 2 + 3 + Runs with distance from the original conversations. Reviews recent 4 + observations and decides what to keep, supersede, or promote to 5 + public cosmik cards. Structurally matches Claude Code's /dream skill: 6 + write-time is fast and might be wrong, review-time is slow and has distance. 7 + """ 8 + 9 + from pydantic import BaseModel, Field 10 + 11 + 12 + class ObservationReview(BaseModel): 13 + """Review decision for a single observation.""" 14 + 15 + action: str = Field(description="keep, supersede, or promote") 16 + reason: str = Field(description="why this action") 17 + # only for promote: what the public cosmik card should say 18 + card_title: str | None = Field( 19 + default=None, description="title for the cosmik card if promoting" 20 + ) 21 + card_description: str | None = Field( 22 + default=None, description="description for the cosmik card if promoting" 23 + ) 24 + 25 + 26 + class ReviewResult(BaseModel): 27 + """Result of reviewing a batch of observations.""" 28 + 29 + decisions: list[ObservationReview] = Field(default_factory=list) 30 + summary: str = Field(default="", description="brief summary of what was reviewed") 31 + 32 + 33 + REVIEW_SYSTEM_PROMPT = """\ 34 + You are reviewing observations that were extracted from conversations earlier. 35 + You have distance now — you weren't in the conversation, you're looking at 36 + the extracted facts after the fact. 37 + 38 + For each observation, decide: 39 + - **keep**: the observation is accurate and worth retaining privately. 40 + - **supersede**: the observation is stale, wrong, or redundant. it should 41 + be marked superseded so it stops appearing in context. 42 + - **promote**: the observation captures something worth sharing publicly 43 + as a cosmik card on semble. provide a card_title and card_description 44 + that would make sense to someone discovering it on the network. 45 + 46 + Guidelines: 47 + - most observations should be kept. supersede only if clearly wrong or stale. 48 + - promote rarely — only observations that have value beyond phi's own use. 49 + a fact about what someone works on is private. a pattern worth naming is public. 50 + - if you're unsure, keep. the cost of keeping is low (slightly cluttered context). 51 + the cost of wrong supersession is losing real information. 52 + """
+9
src/bot/services/message_handler.py
··· 306 306 except Exception as e: 307 307 logger.warning(f"exploration failed: {e}") 308 308 309 + async def review_memories(self): 310 + """Run the dream/distill pass — review observations with distance.""" 311 + with logfire.span("memory review"): 312 + try: 313 + summary = await self.agent.process_review() 314 + logger.info(f"memory review: {summary}") 315 + except Exception as e: 316 + logger.warning(f"memory review failed: {e}") 317 + 309 318 async def daily_reflection(self): 310 319 """Generate and post a daily reflection if phi has something to say. 311 320