personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(routines): dispatch activity-anticipation cadence for upcoming activities

- Wire {"type": "activity-anticipation", "offset_minutes": N}
cadences into think.routines.check() so routines like
meeting-prep fire at start + offset (±60s) for visible
anticipated activities across all facets on the current
local day.
- _run_routine now accepts an optional
trigger_context={"activity": ..., "facet": ...} and injects
an ## Upcoming Activity block (title, type, facet,
start/end, description, details, attendee names) before the
"Execute this routine now." line; other callers are unchanged.
- Dedup is in-memory only (_fired_triggers, pruned at 2 days
per check() tick). State does not survive supervisor restarts
— acceptable since re-firing a past trigger is worse than
missing a restart-window fire.
- Unknown dict cadence types (e.g. legacy {"type": "event"})
are silently skipped with one INFO log per routine_id via
_logged_unknown_cadence; string-cron behavior is untouched.
- Known limitations: (1) anticipated activities carry no
timezone field — the routine's timezone is assumed. (2)
negative offsets that cross midnight (e.g. a 00:15 activity
with offset_minutes=-30) will miss their trigger because the
dispatcher only scans the activity-day file. (3) dedup is
written before dispatch, so a _run_routine failure will not
auto-retry.
- Follow-up: think/tools/routines.py::_validate_routine_cadence
still rejects non-string cadences, so sol call routines
create --template meeting-prep fails until that validation is
extended to accept dict cadences (and the manual routines run
path updated to supply trigger context).

+338 -3
+190
tests/test_routines.py
··· 70 70 mod._config = {} 71 71 mod._callosum = None 72 72 mod._last_fired = {} 73 + mod._fired_triggers = {} 74 + mod._logged_unknown_cadence = set() 73 75 yield 74 76 mod._config = {} 75 77 mod._callosum = None 76 78 mod._last_fired = {} 79 + mod._fired_triggers = {} 80 + mod._logged_unknown_cadence = set() 77 81 78 82 79 83 @pytest.fixture ··· 1557 1561 entry = config["_meta"]["suggestions"]["morning-briefing"] 1558 1562 assert entry["response"] == "accepted" 1559 1563 assert entry["trigger_count"] == 5 1564 + 1565 + 1566 + class TestActivityAnticipation: 1567 + @staticmethod 1568 + def _make_routine(routine_id: str, offset_minutes: int) -> dict: 1569 + return { 1570 + "id": routine_id, 1571 + "name": "Meeting prep", 1572 + "instruction": "Prepare for the upcoming activity.", 1573 + "cadence": { 1574 + "type": "activity-anticipation", 1575 + "offset_minutes": offset_minutes, 1576 + }, 1577 + "timezone": "UTC", 1578 + "enabled": True, 1579 + "facets": [], 1580 + "template": None, 1581 + "notify": False, 1582 + "last_run": None, 1583 + } 1584 + 1585 + @staticmethod 1586 + def _make_anticipated_record( 1587 + activity_id: str, 1588 + start: str, 1589 + title: str = "Sync", 1590 + description: str = "Discuss current status.", 1591 + participation=None, 1592 + *, 1593 + facet: str = "work", 1594 + ) -> dict: 1595 + return { 1596 + "id": activity_id, 1597 + "activity": "meeting", 1598 + "target_date": "2026-04-18", 1599 + "start": start, 1600 + "end": "10:30:00", 1601 + "title": title, 1602 + "description": description, 1603 + "details": "Review open items.", 1604 + "facet": facet, 1605 + "source": "anticipated", 1606 + "participation": participation or [], 1607 + "hidden": False, 1608 + } 1609 + 1610 + @staticmethod 1611 + def _seed_activity_record(facet: str, day: str, record: dict) -> None: 1612 + from think.activities import append_activity_record 1613 + from think.facets import create_facet 1614 + 1615 + title = " ".join(part.capitalize() for part in facet.split("-")) 1616 + slug = create_facet(title) 1617 + assert slug == facet 1618 + written = append_activity_record(facet, day, record) 1619 + assert written is True 1620 + 1621 + def test_dispatch_fires_and_injects_prompt(self, journal_path): 1622 + import think.routines as mod 1623 + 1624 + save_config({"routine-1": self._make_routine("routine-1", -30)}) 1625 + self._seed_activity_record( 1626 + "work", 1627 + "20260418", 1628 + self._make_anticipated_record( 1629 + "anticipated_meeting_100000_0418", 1630 + "10:00:00", 1631 + title="Roadmap Sync", 1632 + description="Discuss Q2 roadmap.", 1633 + participation=[ 1634 + {"role": "attendee", "name": "Alex Rivera"}, 1635 + {"role": "attendee", "name": "Jordan Lee"}, 1636 + {"role": "organizer", "name": "Morgan Shaw"}, 1637 + ], 1638 + ), 1639 + ) 1640 + 1641 + dt = datetime(2026, 4, 18, 9, 30, tzinfo=timezone.utc) 1642 + with ( 1643 + patch( 1644 + "think.routines.cortex_request", return_value="fake_agent_id" 1645 + ) as mock_req, 1646 + patch( 1647 + "think.routines.wait_for_uses", 1648 + return_value=({"fake_agent_id": "finish"}, []), 1649 + ), 1650 + patch("think.routines.callosum_send", return_value=True), 1651 + _fake_now(dt), 1652 + ): 1653 + mod.check() 1654 + 1655 + mock_req.assert_called_once() 1656 + prompt = mock_req.call_args.kwargs["prompt"] 1657 + assert prompt.index("## Upcoming Activity") < prompt.index( 1658 + "Execute this routine now." 1659 + ) 1660 + assert "Roadmap Sync" in prompt 1661 + assert "10:00:00" in prompt 1662 + assert "Discuss Q2 roadmap." in prompt 1663 + assert "Alex Rivera" in prompt 1664 + assert "Jordan Lee" in prompt 1665 + assert "Morgan Shaw" not in prompt 1666 + 1667 + def test_same_minute_fires_only_once(self, journal_path): 1668 + import think.routines as mod 1669 + 1670 + save_config({"routine-1": self._make_routine("routine-1", -30)}) 1671 + self._seed_activity_record( 1672 + "work", 1673 + "20260418", 1674 + self._make_anticipated_record( 1675 + "anticipated_meeting_100000_0418", 1676 + "10:00:00", 1677 + ), 1678 + ) 1679 + 1680 + dt = datetime(2026, 4, 18, 9, 30, tzinfo=timezone.utc) 1681 + with ( 1682 + patch( 1683 + "think.routines.cortex_request", return_value="fake_agent_id" 1684 + ) as mock_req, 1685 + patch( 1686 + "think.routines.wait_for_uses", 1687 + return_value=({"fake_agent_id": "finish"}, []), 1688 + ), 1689 + patch("think.routines.callosum_send", return_value=True), 1690 + _fake_now(dt), 1691 + ): 1692 + mod.check() 1693 + mod.check() 1694 + 1695 + assert mock_req.call_count == 1 1696 + 1697 + def test_hidden_records_are_skipped(self, journal_path): 1698 + import think.routines as mod 1699 + 1700 + save_config({"routine-1": self._make_routine("routine-1", -30)}) 1701 + record = self._make_anticipated_record( 1702 + "anticipated_meeting_100000_0418", 1703 + "10:00:00", 1704 + ) 1705 + record["hidden"] = True 1706 + self._seed_activity_record("work", "20260418", record) 1707 + 1708 + dt = datetime(2026, 4, 18, 9, 30, tzinfo=timezone.utc) 1709 + with ( 1710 + patch( 1711 + "think.routines.cortex_request", return_value="fake_agent_id" 1712 + ) as mock_req, 1713 + patch( 1714 + "think.routines.wait_for_uses", 1715 + return_value=({"fake_agent_id": "finish"}, []), 1716 + ), 1717 + patch("think.routines.callosum_send", return_value=True), 1718 + _fake_now(dt), 1719 + ): 1720 + mod.check() 1721 + 1722 + mock_req.assert_not_called() 1723 + 1724 + def test_non_anticipated_records_are_skipped(self, journal_path): 1725 + import think.routines as mod 1726 + 1727 + save_config({"routine-1": self._make_routine("routine-1", -30)}) 1728 + record = self._make_anticipated_record( 1729 + "anticipated_meeting_100000_0418", 1730 + "10:00:00", 1731 + ) 1732 + record["source"] = "completed" 1733 + self._seed_activity_record("work", "20260418", record) 1734 + 1735 + dt = datetime(2026, 4, 18, 9, 30, tzinfo=timezone.utc) 1736 + with ( 1737 + patch( 1738 + "think.routines.cortex_request", return_value="fake_agent_id" 1739 + ) as mock_req, 1740 + patch( 1741 + "think.routines.wait_for_uses", 1742 + return_value=({"fake_agent_id": "finish"}, []), 1743 + ), 1744 + patch("think.routines.callosum_send", return_value=True), 1745 + _fake_now(dt), 1746 + ): 1747 + mod.check() 1748 + 1749 + mock_req.assert_not_called()
+148 -3
think/routines.py
··· 31 31 _config: dict[str, dict[str, Any]] = {} 32 32 _callosum: Any = None 33 33 _last_fired: dict[str, str] = {} # routine_id -> "YYYY-MM-DD HH:MM" of last fire 34 + _fired_triggers: dict[str, dict[str, str]] = {} 35 + _logged_unknown_cadence: set[str] = set() 34 36 35 37 36 38 def _parse_cron_field(field: str, min_val: int, max_val: int) -> set[int]: ··· 208 210 ) 209 211 210 212 211 - def _run_routine(routine: dict) -> None: 213 + def _render_upcoming_activity_block(activity: dict | None, facet: str) -> str: 214 + """Render an upcoming activity context block for routine prompts.""" 215 + if not activity: 216 + return "" 217 + 218 + participation = activity.get("participation") or [] 219 + if not isinstance(participation, list): 220 + participation = [] 221 + attendees = ", ".join( 222 + str(entry.get("name") or "").strip() 223 + for entry in participation 224 + if isinstance(entry, dict) 225 + and entry.get("role") == "attendee" 226 + and str(entry.get("name") or "").strip() 227 + ) 228 + attendees = attendees or "(none listed)" 229 + 230 + title = str(activity.get("title") or "") 231 + activity_type = str(activity.get("activity") or "(unknown)") 232 + start = str(activity.get("start") or "(unknown)") 233 + end = str(activity.get("end") or "(unknown)") 234 + description = str(activity.get("description") or "(none)") 235 + details = str(activity.get("details") or "(none)") 236 + facet_display = str(facet or "(none)") 237 + 238 + return ( 239 + "## Upcoming Activity\n\n" 240 + f"- **Title:** {title}\n" 241 + f"- **Type:** {activity_type}\n" 242 + f"- **Facet:** {facet_display}\n" 243 + f"- **Start:** {start}\n" 244 + f"- **End:** {end}\n" 245 + f"- **Description:** {description}\n" 246 + f"- **Details:** {details}\n" 247 + f"- **Attendees:** {attendees}\n\n" 248 + ) 249 + 250 + 251 + def _run_routine(routine: dict, trigger_context: dict | None = None) -> None: 212 252 """Execute a single routine and persist its outcome.""" 213 253 routine_id = str(routine.get("id", "unknown")) 214 254 name = str(routine.get("name", routine_id)) ··· 217 257 218 258 try: 219 259 instruction = str(routine.get("instruction", "")) 220 - cadence = str(routine.get("cadence", "")) 221 260 facets = routine.get("facets") or [] 222 261 _template = routine.get("template") 223 262 _notify = bool(routine.get("notify", False)) ··· 238 277 previous_line = ( 239 278 f"**Previous output:** {prev_output_path}" if prev_output_path else "" 240 279 ) 280 + cadence_raw = routine.get("cadence") 281 + if isinstance(cadence_raw, dict): 282 + cadence_display = str(cadence_raw.get("type", "")) 283 + else: 284 + cadence_display = str(cadence_raw or "") 285 + 286 + upcoming_block = "" 287 + if trigger_context and "activity" in trigger_context: 288 + upcoming_block = _render_upcoming_activity_block( 289 + trigger_context["activity"], trigger_context.get("facet", "") 290 + ) 291 + 241 292 prompt = ( 242 293 f"## Routine: {name}\n\n" 243 294 f"**Instruction:** {instruction}\n\n" 244 - f"**Cadence:** {cadence}\n" 295 + f"**Cadence:** {cadence_display}\n" 245 296 f"{facets_line}\n" 246 297 f"{previous_line}\n\n" 298 + f"{upcoming_block}" 247 299 "Execute this routine now. Write your output as concise, actionable markdown.\n" 248 300 ) 249 301 ··· 310 362 ) 311 363 except Exception: 312 364 logger.exception("Failed to emit routine completion for %s", routine_id) 365 + 366 + 367 + def _prune_fired_triggers(*, now_utc: datetime) -> None: 368 + """Drop in-memory activity trigger dedupe entries older than two days.""" 369 + from datetime import timedelta 370 + 371 + cutoff = now_utc - timedelta(days=2) 372 + for routine_id, fired_for_routine in list(_fired_triggers.items()): 373 + for activity_id, fired_at in list(fired_for_routine.items()): 374 + try: 375 + fired_dt = real_datetime.fromisoformat(fired_at) 376 + except ValueError: 377 + del fired_for_routine[activity_id] 378 + continue 379 + if fired_dt < cutoff: 380 + del fired_for_routine[activity_id] 381 + if not fired_for_routine: 382 + del _fired_triggers[routine_id] 313 383 314 384 315 385 def check() -> None: 316 386 """Reload config and run any due routines.""" 317 387 global _config 318 388 _config = get_config() 389 + _prune_fired_triggers(now_utc=datetime.now(timezone.utc)) 319 390 320 391 config_changed = False 321 392 for routine in _config.values(): ··· 368 439 if cron_matches(cadence, local_now): 369 440 _last_fired[routine_id] = minute_key 370 441 _run_routine(routine) 442 + elif ( 443 + isinstance(cadence, dict) and cadence.get("type") == "activity-anticipation" 444 + ): 445 + from datetime import timedelta 446 + 447 + from think.activities import load_activity_records 448 + from think.facets import get_facets 449 + 450 + offset_raw = cadence.get("offset_minutes", 0) 451 + try: 452 + offset_minutes = int(offset_raw) 453 + except (TypeError, ValueError): 454 + logger.warning( 455 + "Routine %s has invalid offset_minutes %r, skipping", 456 + routine_id, 457 + offset_raw, 458 + ) 459 + continue 460 + 461 + day_str = local_now.strftime("%Y%m%d") 462 + fired_for_routine = _fired_triggers.setdefault(routine_id, {}) 463 + 464 + for facet_name in get_facets().keys(): 465 + try: 466 + records = load_activity_records(facet_name, day_str) 467 + except Exception: 468 + logger.warning( 469 + "Failed loading activities for facet %s on %s", 470 + facet_name, 471 + day_str, 472 + exc_info=True, 473 + ) 474 + continue 475 + 476 + for record in records: 477 + if record.get("source") != "anticipated": 478 + continue 479 + activity_id = record.get("id") 480 + if not activity_id or activity_id in fired_for_routine: 481 + continue 482 + start_str = record.get("start") 483 + if not start_str: 484 + continue 485 + try: 486 + time_parts = [int(x) for x in str(start_str).split(":")] 487 + except (ValueError, AttributeError): 488 + continue 489 + if len(time_parts) == 2: 490 + h, m = time_parts 491 + s = 0 492 + elif len(time_parts) == 3: 493 + h, m, s = time_parts 494 + else: 495 + continue 496 + start_dt = local_now.replace( 497 + hour=h, minute=m, second=s, microsecond=0 498 + ) 499 + trigger_dt = start_dt + timedelta(minutes=offset_minutes) 500 + if abs((local_now - trigger_dt).total_seconds()) > 60: 501 + continue 502 + fired_for_routine[activity_id] = now_utc.isoformat() 503 + _run_routine( 504 + routine, 505 + trigger_context={"activity": record, "facet": facet_name}, 506 + ) 507 + elif isinstance(cadence, dict): 508 + cadence_type = str(cadence.get("type", "unknown")) 509 + if routine_id not in _logged_unknown_cadence: 510 + logger.info( 511 + "Routine %s has unsupported cadence type %r, skipping", 512 + routine_id, 513 + cadence_type, 514 + ) 515 + _logged_unknown_cadence.add(routine_id) 371 516 372 517 373 518 def save_state() -> None: