personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor activity record storage for schema defaults and locking

+860 -74
+10 -1
talent/activities.py
··· 489 489 for activity in activities: 490 490 record_id = activity.get("id", "") 491 491 description = activity.get("description", "") 492 + title = activity.get("title") or None 493 + details = activity.get("details") or None 492 494 if record_id and description: 493 - if update_record_description(facet, day, record_id, description): 495 + if update_record_description( 496 + facet, 497 + day, 498 + record_id, 499 + description, 500 + title=title, 501 + details=details, 502 + ): 494 503 updated_count += 1 495 504 llm_descriptions.setdefault(facet, {})[record_id] = description 496 505
+258
tests/test_activities.py
··· 8 8 import tempfile 9 9 from pathlib import Path 10 10 11 + import pytest 12 + 11 13 12 14 def test_get_default_activities(): 13 15 """Test that default activities are returned correctly.""" ··· 529 531 assert len(records) == 1 530 532 assert records[0]["id"] == "coding_100000_300" 531 533 assert records[0]["segments"] == ["100000_300", "100500_300"] 534 + assert records[0]["title"] == "Test coding session" 535 + assert records[0]["details"] == "" 536 + assert records[0]["hidden"] is False 537 + assert records[0]["edits"] == [] 532 538 533 539 def test_append_idempotent(self, monkeypatch): 534 540 from think.activities import append_activity_record, load_activity_records ··· 582 588 583 589 records = load_activity_records("work", "20260209") 584 590 assert records[0]["description"] == "Updated description" 591 + assert records[0]["title"] == "Updated description" 592 + assert records[0]["details"] == "" 593 + 594 + def test_update_description_with_title_and_details(self, monkeypatch): 595 + from think.activities import ( 596 + append_activity_record, 597 + load_activity_records, 598 + update_record_description, 599 + ) 600 + 601 + with tempfile.TemporaryDirectory() as tmpdir: 602 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir) 603 + 604 + record = { 605 + "id": "coding_100000_300", 606 + "activity": "coding", 607 + "description": "Original description", 608 + "segments": ["100000_300"], 609 + "created_at": 1234567890000, 610 + } 611 + 612 + append_activity_record("work", "20260209", record) 613 + result = update_record_description( 614 + "work", 615 + "20260209", 616 + "coding_100000_300", 617 + "Updated description", 618 + title="Focused coding", 619 + details="Pairing with Alex on tests.", 620 + ) 621 + 622 + assert result is True 623 + records = load_activity_records("work", "20260209") 624 + assert records[0]["description"] == "Updated description" 625 + assert records[0]["title"] == "Focused coding" 626 + assert records[0]["details"] == "Pairing with Alex on tests." 627 + 628 + def test_update_description_none_title_and_details_only_updates_description( 629 + self, monkeypatch 630 + ): 631 + from think.activities import ( 632 + append_activity_record, 633 + load_activity_records, 634 + update_record_description, 635 + ) 636 + 637 + with tempfile.TemporaryDirectory() as tmpdir: 638 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir) 639 + 640 + append_activity_record( 641 + "work", 642 + "20260209", 643 + { 644 + "id": "coding_100000_300", 645 + "activity": "coding", 646 + "title": "Existing title", 647 + "details": "Existing details", 648 + "description": "Original description", 649 + "segments": ["100000_300"], 650 + "created_at": 1234567890000, 651 + }, 652 + ) 653 + 654 + assert ( 655 + update_record_description( 656 + "work", 657 + "20260209", 658 + "coding_100000_300", 659 + "Updated description", 660 + title=None, 661 + details=None, 662 + ) 663 + is True 664 + ) 665 + 666 + records = load_activity_records("work", "20260209") 667 + assert records[0]["description"] == "Updated description" 668 + assert records[0]["title"] == "Existing title" 669 + assert records[0]["details"] == "Existing details" 585 670 586 671 def test_update_nonexistent_returns_false(self, monkeypatch): 587 672 from think.activities import update_record_description ··· 630 715 assert records[0]["description"] == "Updated first" 631 716 assert records[1]["description"] == "Second" 632 717 718 + def test_update_activity_record_appends_edit(self, monkeypatch): 719 + from think.activities import ( 720 + append_activity_record, 721 + load_activity_records, 722 + update_activity_record, 723 + ) 724 + 725 + with tempfile.TemporaryDirectory() as tmpdir: 726 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir) 727 + 728 + append_activity_record( 729 + "work", 730 + "20260209", 731 + { 732 + "id": "coding_100000_300", 733 + "activity": "coding", 734 + "description": "Original description", 735 + "segments": ["100000_300"], 736 + "created_at": 1234567890000, 737 + }, 738 + ) 739 + 740 + updated = update_activity_record( 741 + "work", 742 + "20260209", 743 + "coding_100000_300", 744 + {"title": "Focused coding", "details": "Updated details"}, 745 + actor="cli:update", 746 + note="updated fields: details, title", 747 + ) 748 + 749 + assert updated is not None 750 + assert updated["title"] == "Focused coding" 751 + assert updated["details"] == "Updated details" 752 + assert updated["edits"][-1]["actor"] == "cli:update" 753 + assert updated["edits"][-1]["fields"] == ["title", "details"] 754 + 755 + records = load_activity_records("work", "20260209") 756 + assert records[0]["edits"][-1]["note"] == "updated fields: details, title" 757 + 758 + def test_update_activity_record_validates_patch(self, monkeypatch): 759 + from think.activities import update_activity_record 760 + 761 + with tempfile.TemporaryDirectory() as tmpdir: 762 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir) 763 + 764 + with pytest.raises(ValueError, match="patch cannot be empty"): 765 + update_activity_record( 766 + "work", 767 + "20260209", 768 + "coding_100000_300", 769 + {}, 770 + actor="cli:update", 771 + note="no-op", 772 + ) 773 + 774 + with pytest.raises(ValueError, match="disallowed fields"): 775 + update_activity_record( 776 + "work", 777 + "20260209", 778 + "coding_100000_300", 779 + {"activity": "meeting"}, 780 + actor="cli:update", 781 + note="bad field", 782 + ) 783 + 784 + def test_hidden_records_filtered_by_default(self, monkeypatch): 785 + from think.activities import ( 786 + append_activity_record, 787 + load_activity_records, 788 + mute_activity_record, 789 + unmute_activity_record, 790 + ) 791 + 792 + with tempfile.TemporaryDirectory() as tmpdir: 793 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir) 794 + 795 + append_activity_record( 796 + "work", 797 + "20260209", 798 + { 799 + "id": "coding_100000_300", 800 + "activity": "coding", 801 + "description": "Original description", 802 + "segments": ["100000_300"], 803 + "created_at": 1234567890000, 804 + }, 805 + ) 806 + 807 + muted = mute_activity_record( 808 + "work", 809 + "20260209", 810 + "coding_100000_300", 811 + actor="cli:mute", 812 + reason="too noisy", 813 + ) 814 + assert muted is not None 815 + assert muted["hidden"] is True 816 + assert muted["edits"][-1]["note"] == "too noisy" 817 + 818 + assert load_activity_records("work", "20260209") == [] 819 + hidden_records = load_activity_records( 820 + "work", "20260209", include_hidden=True 821 + ) 822 + assert len(hidden_records) == 1 823 + assert hidden_records[0]["hidden"] is True 824 + 825 + hidden_count = len(hidden_records[0]["edits"]) 826 + muted_again = mute_activity_record( 827 + "work", 828 + "20260209", 829 + "coding_100000_300", 830 + actor="cli:mute", 831 + reason="still noisy", 832 + ) 833 + assert muted_again is not None 834 + assert len(muted_again["edits"]) == hidden_count 835 + 836 + unmuted = unmute_activity_record( 837 + "work", 838 + "20260209", 839 + "coding_100000_300", 840 + actor="cli:unmute", 841 + reason=None, 842 + ) 843 + assert unmuted is not None 844 + assert unmuted["hidden"] is False 845 + assert unmuted["edits"][-1]["note"] == "unmuted" 846 + assert len(load_activity_records("work", "20260209")) == 1 847 + 633 848 634 849 # --------------------------------------------------------------------------- 635 850 # Activities Agent Hooks (talent/activities.py) ··· 1149 1364 "work": [ 1150 1365 { 1151 1366 "id": "coding_100000_300", 1367 + "title": "Coding summary", 1368 + "details": "Worked through test failures and cleanup.", 1152 1369 "description": "Synthesized full description of coding session", 1153 1370 } 1154 1371 ] ··· 1162 1379 records[0]["description"] 1163 1380 == "Synthesized full description of coding session" 1164 1381 ) 1382 + assert records[0]["title"] == "Coding summary" 1383 + assert records[0]["details"] == "Worked through test failures and cleanup." 1384 + 1385 + def test_updates_descriptions_without_optional_fields(self, monkeypatch): 1386 + from talent.activities import post_process 1387 + from think.activities import append_activity_record, load_activity_records 1388 + 1389 + with tempfile.TemporaryDirectory() as tmpdir: 1390 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir) 1391 + 1392 + append_activity_record( 1393 + "work", 1394 + "20260209", 1395 + { 1396 + "id": "coding_100000_300", 1397 + "activity": "coding", 1398 + "title": "Existing title", 1399 + "details": "Existing details", 1400 + "description": "Preliminary description", 1401 + "segments": ["100000_300"], 1402 + "created_at": 1, 1403 + }, 1404 + ) 1405 + 1406 + llm_result = json.dumps( 1407 + { 1408 + "work": [ 1409 + { 1410 + "id": "coding_100000_300", 1411 + "description": "Only description changed", 1412 + } 1413 + ] 1414 + } 1415 + ) 1416 + 1417 + post_process(llm_result, {"day": "20260209"}) 1418 + 1419 + records = load_activity_records("work", "20260209") 1420 + assert records[0]["description"] == "Only description changed" 1421 + assert records[0]["title"] == "Existing title" 1422 + assert records[0]["details"] == "Existing details" 1165 1423 1166 1424 def test_handles_invalid_json(self): 1167 1425 from talent.activities import post_process
+83
tests/test_activities_locking.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import json 5 + import threading 6 + 7 + 8 + def test_locked_modify_serializes_concurrent_edits(tmp_path, monkeypatch): 9 + from think.activities import ( 10 + append_activity_record, 11 + append_edit, 12 + load_activity_records, 13 + locked_modify, 14 + ) 15 + 16 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path)) 17 + 18 + facet = "work" 19 + day = "20260418" 20 + record_id = "coding_090000_300" 21 + record_path = tmp_path / "facets" / facet / "activities" / f"{day}.jsonl" 22 + 23 + append_activity_record( 24 + facet, 25 + day, 26 + { 27 + "id": record_id, 28 + "activity": "coding", 29 + "description": "Initial description", 30 + "segments": ["090000_300"], 31 + "created_at": 1, 32 + }, 33 + ) 34 + 35 + first_inside_lock = threading.Event() 36 + release_first = threading.Event() 37 + 38 + def worker(actor: str, note: str, hold_lock: bool = False) -> None: 39 + def modify_fn(records: list[dict]) -> list[dict]: 40 + updated = [] 41 + for record in records: 42 + if record.get("id") == record_id: 43 + record = append_edit( 44 + record, 45 + actor=actor, 46 + fields=["details"], 47 + note=note, 48 + ) 49 + if hold_lock: 50 + first_inside_lock.set() 51 + release_first.wait(timeout=2) 52 + updated.append(record) 53 + return updated 54 + 55 + locked_modify(record_path, modify_fn) 56 + 57 + first = threading.Thread( 58 + target=worker, args=("cli:update", "first writer"), kwargs={"hold_lock": True} 59 + ) 60 + second = threading.Thread( 61 + target=worker, args=("cli:mute", "second writer"), kwargs={"hold_lock": False} 62 + ) 63 + 64 + first.start() 65 + assert first_inside_lock.wait(timeout=2) 66 + second.start() 67 + release_first.set() 68 + 69 + first.join(timeout=2) 70 + second.join(timeout=2) 71 + assert not first.is_alive() 72 + assert not second.is_alive() 73 + 74 + records = load_activity_records(facet, day, include_hidden=True) 75 + assert len(records) == 1 76 + assert [edit["note"] for edit in records[0]["edits"]] == [ 77 + "first writer", 78 + "second writer", 79 + ] 80 + 81 + raw_lines = record_path.read_text(encoding="utf-8").splitlines() 82 + assert len(raw_lines) == 1 83 + assert json.loads(raw_lines[0])["edits"][1]["actor"] == "cli:mute"
+3
tests/test_activity_record_merge.py
··· 82 82 assert record["participation_confidence"] == 0.77 83 83 assert record["participation"][0]["entity_id"] == "john_borthwick" 84 84 assert record["participation"][1]["entity_id"] is None 85 + assert record["title"] == "Team sync" 86 + assert record["details"] == "" 87 + assert record["hidden"] is False 85 88 86 89 87 90 def test_participation_post_hook_leaves_file_unchanged_on_malformed_json(
+3
tests/test_participation_resolver.py
··· 93 93 record = load_activity_records(facet, day)[0] 94 94 assert record["participation"][0]["entity_id"] == "john_borthwick" 95 95 assert record["participation"][1]["entity_id"] is None 96 + assert record["title"] == "Team sync" 97 + assert record["details"] == "" 98 + assert record["hidden"] is False
+36 -2
tests/test_think_activity.py
··· 296 296 297 297 assert result is False 298 298 299 - def test_empty_segments_returns_false(self, monkeypatch): 299 + def test_empty_segments_returns_true_for_synthetic_record(self, monkeypatch): 300 300 from think.thinking import run_activity_prompts 301 301 302 302 with tempfile.TemporaryDirectory() as tmpdir: ··· 322 322 facet="work", 323 323 ) 324 324 325 - assert result is False 325 + assert result is True 326 + 327 + def test_cogitate_source_returns_true_without_running_agents(self, monkeypatch): 328 + from think.thinking import run_activity_prompts 329 + 330 + with tempfile.TemporaryDirectory() as tmpdir: 331 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir) 332 + 333 + self._write_record( 334 + tmpdir, 335 + "work", 336 + "20260209", 337 + { 338 + "id": "coding_100000_300", 339 + "activity": "coding", 340 + "source": "cogitate", 341 + "segments": ["100000_300"], 342 + "level_avg": 0.5, 343 + "description": "Synthetic", 344 + "active_entities": [], 345 + }, 346 + ) 347 + 348 + monkeypatch.setattr( 349 + "think.thinking.get_talent_configs", 350 + lambda schedule: {"session_review": {"activities": ["*"]}}, 351 + ) 352 + 353 + result = run_activity_prompts( 354 + day="20260209", 355 + activity_id="coding_100000_300", 356 + facet="work", 357 + ) 358 + 359 + assert result is True 326 360 327 361 def test_emits_think_events(self, monkeypatch): 328 362 from think.thinking import run_activity_prompts
+444 -60
think/activities.py
··· 10 10 stored as facets/{facet}/activities/{day}.jsonl. 11 11 """ 12 12 13 + import fcntl 13 14 import json 14 15 import logging 15 16 import os 17 + import random 16 18 import re 19 + import tempfile 20 + import time 21 + from datetime import UTC, datetime 17 22 from pathlib import Path 18 23 from typing import Any 19 24 ··· 266 271 def _save_activities_jsonl(facet: str, activities: list[dict[str, Any]]) -> None: 267 272 """Save activities to a facet's JSONL file.""" 268 273 path = _get_activities_path(facet) 269 - path.parent.mkdir(parents=True, exist_ok=True) 270 274 271 - with open(path, "w", encoding="utf-8") as f: 272 - for activity in activities: 273 - f.write(json.dumps(activity, ensure_ascii=False) + "\n") 275 + def modify_fn(_existing: list[dict[str, Any]]) -> list[dict[str, Any]]: 276 + return [dict(activity) for activity in activities] 277 + 278 + locked_modify(path, modify_fn, create_if_missing=True) 274 279 275 280 276 281 def get_facet_activities(facet: str) -> list[dict[str, Any]]: ··· 681 686 return Path(get_journal()) / "facets" / facet / "activities" / f"{day}.jsonl" 682 687 683 688 689 + def _read_jsonl_records(path: Path) -> list[dict[str, Any]]: 690 + """Load JSONL entries from *path*, skipping malformed lines.""" 691 + if not path.exists(): 692 + return [] 693 + 694 + records: list[dict[str, Any]] = [] 695 + with open(path, "r", encoding="utf-8") as handle: 696 + for line_num, line in enumerate(handle, 1): 697 + line = line.strip() 698 + if not line: 699 + continue 700 + try: 701 + data = json.loads(line) 702 + except json.JSONDecodeError as exc: 703 + logger.warning( 704 + "Skipping malformed line %d in %s: %s", line_num, path, exc 705 + ) 706 + continue 707 + if isinstance(data, dict): 708 + records.append(data) 709 + return records 710 + 711 + 712 + def _write_jsonl_records(path: Path, records: list[dict[str, Any]]) -> None: 713 + """Atomically write JSONL entries to *path*.""" 714 + path.parent.mkdir(parents=True, exist_ok=True) 715 + fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp") 716 + try: 717 + with os.fdopen(fd, "w", encoding="utf-8") as handle: 718 + for record in records: 719 + handle.write(json.dumps(record, ensure_ascii=False) + "\n") 720 + os.replace(tmp_name, path) 721 + except BaseException: 722 + if os.path.exists(tmp_name): 723 + os.unlink(tmp_name) 724 + raise 725 + 726 + 727 + def _fallback_activity_title(record: dict[str, Any]) -> str: 728 + """Return the best available title for an activity record.""" 729 + title = str(record.get("title") or "").strip() 730 + if title: 731 + return title 732 + 733 + description = str(record.get("description") or "").strip() 734 + if description: 735 + return description 736 + 737 + activity = str(record.get("activity") or record.get("id") or "").strip() 738 + if activity: 739 + return activity.replace("_", " ").title() 740 + 741 + return "Untitled activity" 742 + 743 + 744 + def _normalize_activity_record(record: dict[str, Any]) -> dict[str, Any]: 745 + """Return a normalized activity record copy with schema defaults.""" 746 + normalized = dict(record) 747 + normalized["title"] = _fallback_activity_title(record) 748 + normalized["details"] = str(record.get("details") or "") 749 + normalized["hidden"] = bool(record.get("hidden", False)) 750 + 751 + edits = record.get("edits") 752 + normalized["edits"] = ( 753 + [dict(edit) for edit in edits if isinstance(edit, dict)] 754 + if isinstance(edits, list) 755 + else [] 756 + ) 757 + return normalized 758 + 759 + 760 + def locked_modify( 761 + path: Path, 762 + modify_fn: Any, 763 + *, 764 + create_if_missing: bool = False, 765 + max_retries: int = 3, 766 + ) -> None: 767 + """Perform a locked load-modify-save cycle on a JSONL file.""" 768 + lock_path = path.parent / f"{path.name}.lock" 769 + 770 + last_error: OSError | None = None 771 + for attempt in range(max_retries): 772 + try: 773 + path.parent.mkdir(parents=True, exist_ok=True) 774 + with open(lock_path, "w", encoding="utf-8") as lock_file: 775 + fcntl.flock(lock_file, fcntl.LOCK_EX) 776 + try: 777 + existed = path.exists() 778 + if not existed and not create_if_missing: 779 + raise FileNotFoundError(path) 780 + current = _read_jsonl_records(path) if existed else [] 781 + updated = modify_fn([dict(item) for item in current]) 782 + if not isinstance(updated, list): 783 + raise TypeError("modify_fn must return list[dict]") 784 + if not existed and not updated: 785 + return 786 + if existed and updated == current: 787 + return 788 + _write_jsonl_records(path, updated) 789 + finally: 790 + fcntl.flock(lock_file, fcntl.LOCK_UN) 791 + return 792 + except (FileNotFoundError, TypeError, ValueError): 793 + raise 794 + except OSError as exc: 795 + last_error = exc 796 + if attempt < max_retries - 1: 797 + time.sleep(random.uniform(0.05, 0.3) * (attempt + 1)) 798 + 799 + if last_error is not None: 800 + raise last_error 801 + 802 + 803 + def append_edit( 804 + record: dict[str, Any], *, actor: str, fields: list[str], note: str 805 + ) -> dict[str, Any]: 806 + """Append an edit entry to an activity record and return the record.""" 807 + normalized = _normalize_activity_record(record) 808 + edits = [dict(edit) for edit in normalized.get("edits", [])] 809 + edits.append( 810 + { 811 + "timestamp": datetime.now(UTC).isoformat().replace("+00:00", "Z"), 812 + "actor": actor, 813 + "fields": list(fields), 814 + "note": note, 815 + } 816 + ) 817 + normalized["edits"] = edits 818 + return normalized 819 + 820 + 684 821 def get_activity_output_path( 685 822 facet: str, 686 823 day: str, ··· 720 857 ) 721 858 722 859 723 - def load_activity_records(facet: str, day: str) -> list[dict[str, Any]]: 860 + def load_activity_records( 861 + facet: str, day: str, *, include_hidden: bool = False 862 + ) -> list[dict[str, Any]]: 724 863 """Load activity records for a facet and day. 725 864 726 865 Returns list of record dicts, empty list if file doesn't exist. 727 866 """ 728 867 path = _get_records_path(facet, day) 729 - if not path.exists(): 730 - return [] 731 - 732 - records = [] 733 - with open(path, "r", encoding="utf-8") as f: 734 - for line in f: 735 - line = line.strip() 736 - if line: 737 - try: 738 - records.append(json.loads(line)) 739 - except json.JSONDecodeError: 740 - continue 741 - return records 868 + records = [ 869 + _normalize_activity_record(record) for record in _read_jsonl_records(path) 870 + ] 871 + if include_hidden: 872 + return records 873 + return [record for record in records if not record.get("hidden", False)] 742 874 743 875 744 876 def load_record_ids(facet: str, day: str) -> set[str]: 745 877 """Load just the IDs of existing activity records for idempotency checks.""" 746 - return {r["id"] for r in load_activity_records(facet, day) if "id" in r} 878 + return { 879 + r["id"] 880 + for r in load_activity_records(facet, day, include_hidden=True) 881 + if "id" in r 882 + } 747 883 748 884 749 885 def append_activity_record( ··· 762 898 Returns: 763 899 True if record was written, False if duplicate ID found. 764 900 """ 901 + del _checked # retained for compatibility; duplicate checks now happen under lock 765 902 path = _get_records_path(facet, day) 903 + written = False 766 904 767 - if not _checked: 768 - # Check for existing ID 769 - existing_ids = load_record_ids(facet, day) 770 - if record.get("id") in existing_ids: 771 - return False 905 + def modify_fn(records: list[dict[str, Any]]) -> list[dict[str, Any]]: 906 + nonlocal written 907 + record_id = record.get("id") 908 + if record_id and any(item.get("id") == record_id for item in records): 909 + return records 910 + written = True 911 + return records + [_normalize_activity_record(record)] 772 912 773 - path.parent.mkdir(parents=True, exist_ok=True) 774 - with open(path, "a", encoding="utf-8") as f: 775 - f.write(json.dumps(record, ensure_ascii=False) + "\n") 776 - return True 913 + locked_modify(path, modify_fn, create_if_missing=True) 914 + return written 777 915 778 916 779 917 def update_record_fields( ··· 786 924 787 925 Returns True if record was found and updated, False otherwise. 788 926 """ 789 - import tempfile 790 - 791 927 path = _get_records_path(facet, day) 792 - if not path.exists(): 793 - return False 794 - 795 - lines = path.read_text(encoding="utf-8").splitlines() 796 928 updated = False 797 - new_lines = [] 798 - for line in lines: 799 - line = line.strip() 800 - if not line: 801 - continue 802 - try: 803 - record = json.loads(line) 804 - except json.JSONDecodeError: 805 - new_lines.append(line) 806 - continue 807 929 808 - if record.get("id") == record_id: 809 - record.update(fields) 810 - updated = True 930 + try: 811 931 812 - new_lines.append(json.dumps(record, ensure_ascii=False)) 932 + def modify_fn(records: list[dict[str, Any]]) -> list[dict[str, Any]]: 933 + nonlocal updated 934 + new_records: list[dict[str, Any]] = [] 935 + for record in records: 936 + if record.get("id") == record_id: 937 + merged = dict(record) 938 + merged.update(fields) 939 + new_records.append(_normalize_activity_record(merged)) 940 + updated = True 941 + else: 942 + new_records.append(record) 943 + return new_records 813 944 814 - if updated: 815 - content = "\n".join(new_lines) + "\n" 816 - fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp") 817 - try: 818 - with os.fdopen(fd, "w", encoding="utf-8") as f: 819 - f.write(content) 820 - os.replace(tmp, path) 821 - except BaseException: 822 - os.unlink(tmp) 823 - raise 945 + locked_modify(path, modify_fn) 946 + except FileNotFoundError: 947 + return False 824 948 825 949 return updated 826 950 827 951 828 952 def update_record_description( 829 - facet: str, day: str, record_id: str, description: str 953 + facet: str, 954 + day: str, 955 + record_id: str, 956 + description: str, 957 + *, 958 + title: str | None = None, 959 + details: str | None = None, 830 960 ) -> bool: 831 961 """Update the description of an existing activity record.""" 832 - return update_record_fields(facet, day, record_id, {"description": description}) 962 + patch: dict[str, Any] = {"description": description} 963 + current = get_activity_record(facet, day, record_id) 964 + if title is not None: 965 + patch["title"] = title 966 + elif current is not None: 967 + current_title = str(current.get("title") or "").strip() 968 + current_description = str(current.get("description") or "").strip() 969 + if not current_title or current_title == current_description: 970 + patch["title"] = description 971 + if details is not None: 972 + patch["details"] = details 973 + return update_record_fields(facet, day, record_id, patch) 974 + 975 + 976 + def get_activity_record(facet: str, day: str, record_id: str) -> dict[str, Any] | None: 977 + """Return one activity record by ID, including hidden records.""" 978 + for record in load_activity_records(facet, day, include_hidden=True): 979 + if record.get("id") == record_id: 980 + return record 981 + return None 982 + 983 + 984 + def update_activity_record( 985 + facet: str, 986 + day: str, 987 + record_id: str, 988 + patch: dict[str, Any], 989 + *, 990 + actor: str, 991 + note: str, 992 + ) -> dict[str, Any] | None: 993 + """Apply a shallow patch to an activity record and append one edit.""" 994 + allowed_fields = {"title", "description", "details"} 995 + if not patch: 996 + raise ValueError("patch cannot be empty") 997 + 998 + disallowed = sorted(set(patch) - allowed_fields) 999 + if disallowed: 1000 + raise ValueError(f"patch contains disallowed fields: {', '.join(disallowed)}") 1001 + 1002 + updated_record: dict[str, Any] | None = None 1003 + 1004 + def modify_fn(records: list[dict[str, Any]]) -> list[dict[str, Any]]: 1005 + nonlocal updated_record 1006 + new_records: list[dict[str, Any]] = [] 1007 + for record in records: 1008 + if record.get("id") == record_id: 1009 + merged = _normalize_activity_record({**record, **patch}) 1010 + merged = append_edit( 1011 + merged, 1012 + actor=actor, 1013 + fields=list(patch.keys()), 1014 + note=note, 1015 + ) 1016 + updated_record = merged 1017 + new_records.append(merged) 1018 + else: 1019 + new_records.append(record) 1020 + return new_records 1021 + 1022 + try: 1023 + locked_modify(_get_records_path(facet, day), modify_fn) 1024 + except FileNotFoundError: 1025 + return None 1026 + 1027 + return updated_record 1028 + 1029 + 1030 + def _set_activity_hidden_state( 1031 + facet: str, 1032 + day: str, 1033 + record_id: str, 1034 + *, 1035 + hidden: bool, 1036 + actor: str, 1037 + reason: str | None, 1038 + ) -> dict[str, Any] | None: 1039 + updated_record: dict[str, Any] | None = None 1040 + 1041 + def modify_fn(records: list[dict[str, Any]]) -> list[dict[str, Any]]: 1042 + nonlocal updated_record 1043 + new_records: list[dict[str, Any]] = [] 1044 + for record in records: 1045 + if record.get("id") != record_id: 1046 + new_records.append(record) 1047 + continue 1048 + 1049 + normalized = _normalize_activity_record(record) 1050 + if normalized.get("hidden", False) == hidden: 1051 + updated_record = normalized 1052 + new_records.append(normalized) 1053 + continue 1054 + 1055 + normalized["hidden"] = hidden 1056 + normalized = append_edit( 1057 + normalized, 1058 + actor=actor, 1059 + fields=["hidden"], 1060 + note=reason or ("muted" if hidden else "unmuted"), 1061 + ) 1062 + updated_record = normalized 1063 + new_records.append(normalized) 1064 + return new_records 1065 + 1066 + try: 1067 + locked_modify(_get_records_path(facet, day), modify_fn) 1068 + except FileNotFoundError: 1069 + return None 1070 + 1071 + return updated_record 1072 + 1073 + 1074 + def mute_activity_record( 1075 + facet: str, 1076 + day: str, 1077 + record_id: str, 1078 + *, 1079 + actor: str, 1080 + reason: str | None, 1081 + ) -> dict[str, Any] | None: 1082 + """Hide an activity record without deleting it.""" 1083 + return _set_activity_hidden_state( 1084 + facet, 1085 + day, 1086 + record_id, 1087 + hidden=True, 1088 + actor=actor, 1089 + reason=reason, 1090 + ) 1091 + 1092 + 1093 + def unmute_activity_record( 1094 + facet: str, 1095 + day: str, 1096 + record_id: str, 1097 + *, 1098 + actor: str, 1099 + reason: str | None, 1100 + ) -> dict[str, Any] | None: 1101 + """Restore a previously hidden activity record.""" 1102 + return _set_activity_hidden_state( 1103 + facet, 1104 + day, 1105 + record_id, 1106 + hidden=False, 1107 + actor=actor, 1108 + reason=reason, 1109 + ) 833 1110 834 1111 835 1112 def estimate_duration_minutes(segments: list[str]) -> int: ··· 860 1137 return 0.5 861 1138 values = [LEVEL_VALUES.get(level, 0.5) for level in levels] 862 1139 return round(sum(values) / len(values), 2) 1140 + 1141 + 1142 + def _extract_activity_header(file_path: str | os.PathLike[str] | None) -> str: 1143 + """Build a formatter header from an activities file path.""" 1144 + if not file_path: 1145 + return "# Activities" 1146 + 1147 + path = Path(file_path) 1148 + parts = path.parts 1149 + try: 1150 + facet_idx = parts.index("facets") 1151 + facet_name = parts[facet_idx + 1] 1152 + except (ValueError, IndexError): 1153 + facet_name = "unknown" 1154 + 1155 + stem = path.stem 1156 + if stem.isdigit() and len(stem) == 8: 1157 + return f"# Activities: {facet_name} ({stem[:4]}-{stem[4:6]}-{stem[6:8]})" 1158 + return f"# Activities: {facet_name}" 1159 + 1160 + 1161 + def _activity_time_range(segments: list[str]) -> str | None: 1162 + """Return a compact HH:MM-HH:MM label for a list of segment keys.""" 1163 + if not segments: 1164 + return None 1165 + 1166 + start_time, _ = segment_parse(segments[0]) 1167 + _, end_time = segment_parse(segments[-1]) 1168 + if start_time is None or end_time is None: 1169 + return None 1170 + 1171 + return f"{start_time.strftime('%H:%M')}-{end_time.strftime('%H:%M')}" 1172 + 1173 + 1174 + def _format_participation(record: dict[str, Any]) -> str | None: 1175 + """Format participation names for display.""" 1176 + participation = record.get("participation") 1177 + if not isinstance(participation, list) or not participation: 1178 + return None 1179 + 1180 + names = [] 1181 + for entry in participation: 1182 + if not isinstance(entry, dict): 1183 + continue 1184 + name = str(entry.get("name") or entry.get("entity_id") or "").strip() 1185 + if name: 1186 + names.append(name) 1187 + 1188 + if not names: 1189 + return None 1190 + return ", ".join(names) 1191 + 1192 + 1193 + def format_activities( 1194 + entries: list[dict], 1195 + context: dict | None = None, 1196 + ) -> tuple[list[dict], dict]: 1197 + """Format activity JSONL entries into markdown chunks.""" 1198 + ctx = context or {} 1199 + meta: dict[str, Any] = { 1200 + "header": _extract_activity_header(ctx.get("file_path")), 1201 + "indexer": {"agent": "activity"}, 1202 + } 1203 + chunks: list[dict[str, Any]] = [] 1204 + 1205 + for entry in entries: 1206 + if not isinstance(entry, dict): 1207 + continue 1208 + 1209 + record = _normalize_activity_record(entry) 1210 + lines = [f"### {_fallback_activity_title(record)}"] 1211 + 1212 + activity_type = str(record.get("activity") or record.get("id") or "").strip() 1213 + if activity_type: 1214 + lines.append(f"- Activity: {activity_type}") 1215 + 1216 + time_range = _activity_time_range(record.get("segments", [])) 1217 + if time_range: 1218 + lines.append(f"- Time: {time_range}") 1219 + 1220 + if "level_avg" in record: 1221 + lines.append(f"- Level: {record['level_avg']}") 1222 + 1223 + description = str(record.get("description") or "").strip() 1224 + if description: 1225 + lines.append(f"- Description: {description}") 1226 + 1227 + details = str(record.get("details") or "").strip() 1228 + if details: 1229 + lines.append(f"- Details: {details}") 1230 + 1231 + participants = _format_participation(record) 1232 + if participants: 1233 + lines.append(f"- Participation: {participants}") 1234 + 1235 + if record.get("hidden", False): 1236 + lines.append("- Hidden: yes") 1237 + 1238 + chunks.append( 1239 + { 1240 + "timestamp": int(record.get("created_at", 0) or 0), 1241 + "markdown": "\n".join(lines), 1242 + "source": record, 1243 + } 1244 + ) 1245 + 1246 + return chunks, meta
+1
think/formatters.py
··· 140 140 False, # Indexed via _index_entity_search_chunks (enriched with relationship data) 141 141 ), 142 142 "facets/*/events/*.jsonl": ("think.events", "format_events", True), 143 + "facets/*/activities/*.jsonl": ("think.activities", "format_activities", True), 143 144 "facets/*/calendar/*.jsonl": ("think.events", "format_events", True), 144 145 "facets/*/todos/*.jsonl": ("apps.todos.todo", "format_todos", True), 145 146 "facets/*/logs/*.jsonl": ("think.facets", "format_logs", True),
+13 -2
think/merge.py
··· 12 12 from pathlib import Path 13 13 from typing import Any 14 14 15 + from think.activities import locked_modify 15 16 from think.entities.core import entity_slug 16 17 from think.entities.journal import ( 17 18 load_all_journal_entities, ··· 596 597 }, 597 598 ) 598 599 if new_config and not dry_run: 599 - _append_jsonl(target_config_file, new_config) 600 + _append_jsonl_locked(target_config_file, new_config) 600 601 except Exception as exc: 601 602 summary.errors.append(f"facet {facet_name} activities config: {exc}") 602 603 ··· 633 634 }, 634 635 ) 635 636 if new_records and not dry_run: 636 - _append_jsonl(target_day_file, new_records) 637 + _append_jsonl_locked(target_day_file, new_records) 637 638 except Exception as exc: 638 639 summary.errors.append( 639 640 f"facet {facet_name} activities {source_day_file.name}: {exc}" ··· 821 822 with open(path, "a", encoding="utf-8") as handle: 822 823 for item in items: 823 824 handle.write(json.dumps(item, ensure_ascii=False) + "\n") 825 + 826 + 827 + def _append_jsonl_locked(path: Path, items: list[dict[str, Any]]) -> None: 828 + if not items: 829 + return 830 + 831 + def modify_fn(records: list[dict[str, Any]]) -> list[dict[str, Any]]: 832 + return records + [dict(item) for item in items] 833 + 834 + locked_modify(path, modify_fn, create_if_missing=True) 824 835 825 836 826 837 __all__ = ["MergeSummary", "merge_journals"]
+9 -9
think/thinking.py
··· 23 23 from think.activities import ( 24 24 append_activity_record, 25 25 get_activity_output_path, 26 + get_activity_record, 26 27 load_activity_records, 27 28 ) 28 29 from think.activity_state_machine import ActivityStateMachine ··· 1834 1835 True if all agents succeeded, False if any failed 1835 1836 """ 1836 1837 # Load activity record 1837 - records = load_activity_records(facet, day) 1838 - record = None 1839 - for r in records: 1840 - if r.get("id") == activity_id: 1841 - record = r 1842 - break 1838 + record = get_activity_record(facet, day, activity_id) 1843 1839 1844 1840 if not record: 1845 1841 logging.error( ··· 1853 1849 activity_type = record.get("activity", "") 1854 1850 segments = record.get("segments", []) 1855 1851 1856 - if not segments: 1857 - logging.error("Activity record %s has no segments", activity_id) 1858 - return False 1852 + if record.get("source") == "cogitate" or not segments: 1853 + logging.info( 1854 + "Skipping activity-scheduled generators for synthetic activity %s (source=%s)", 1855 + activity_id, 1856 + record.get("source"), 1857 + ) 1858 + return True 1859 1859 1860 1860 # Load activity-scheduled agents 1861 1861 all_prompts = get_talent_configs(schedule="activity")