personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Talent and generator orchestration utilities.
5
6This module provides functionality for configuring and orchestrating talents
7and generators from talent/*.md and apps/*/talent/*.md.
8
9Key functions:
10- get_talent_configs(): Discover all talent configs with filtering
11- get_talent(): Load complete talent configuration by name
12- Hook loading: load_pre_hook(), load_post_hook()
13
14For simple prompt loading without orchestration (observe/, think/*.md prompts),
15use think.prompts.load_prompt() directly.
16"""
17
18from __future__ import annotations
19
20import copy
21import importlib.util
22import json
23import logging
24import os
25import re
26from pathlib import Path
27from typing import Any, Callable
28
29import frontmatter
30from jsonschema import Draft202012Validator, SchemaError
31
32from think.facets import get_facets
33
34# Import core prompt utilities from think.prompts
35from think.prompts import _load_prompt_metadata, load_prompt
36
37# ---------------------------------------------------------------------------
38# Constants
39# ---------------------------------------------------------------------------
40
# Directory of system talent definitions: the "talent" folder two levels above
# this file (the same base that _resolve_hook_path calls the project root).
TALENT_DIR = Path(__file__).parent.parent / "talent"
# Directory of app packages; get_talent_configs() scans apps/*/talent/*.md here.
APPS_DIR = Path(__file__).parent.parent / "apps"
# Placeholder enum value that hydrate_runtime_enums() swaps for real facet slugs.
RUNTIME_FACETS_SENTINEL = "__RUNTIME_FACETS__"
# Facet names must fully match this pattern to be used by _valid_runtime_facets().
SLUG_RE = re.compile(r"^[a-z][a-z0-9_-]*$")
# Module-level logger.
LOG = logging.getLogger(__name__)
46
47
48# ---------------------------------------------------------------------------
49# Talent Config Discovery
50# ---------------------------------------------------------------------------
51
52
53def _validate_cwd(raw_cwd: Any, talent_type: Any, key: str) -> str | None:
54 """Validate and normalize the optional talent cwd setting."""
55 if talent_type == "cogitate":
56 if raw_cwd is None:
57 return "journal"
58 if raw_cwd in {"journal", "repo"}:
59 return raw_cwd
60 raise ValueError(
61 f"Prompt '{key}' has invalid 'cwd' value '{raw_cwd}' "
62 "(must be 'journal' or 'repo')"
63 )
64
65 if talent_type == "generate":
66 if raw_cwd is not None:
67 raise ValueError(
68 f"Prompt '{key}' sets 'cwd' but cwd is only valid for type: cogitate"
69 )
70 return None
71
72 if raw_cwd is None:
73 return None
74
75 raise ValueError(
76 f"Prompt '{key}' has invalid 'cwd' value '{raw_cwd}' "
77 "(must be 'journal' or 'repo')"
78 )
79
80
def key_to_context(key: str) -> str:
    """Map a talent config key onto its provider-context name.

    Parameters
    ----------
    key:
        Talent config key: a bare ``"name"`` for system talents or
        ``"app:name"`` for app talents. Only the first colon separates the
        app from the name.

    Returns
    -------
    str
        ``"talent.system.{name}"`` for bare keys, ``"talent.{app}.{name}"``
        for app-qualified keys.

    Examples
    --------
    >>> key_to_context("meetings")
    'talent.system.meetings'
    >>> key_to_context("entities:observer")
    'talent.entities.observer'
    """
    app, separator, name = key.partition(":")
    if not separator:
        return f"talent.system.{key}"
    return f"talent.{app}.{name}"
105
106
def get_output_name(key: str) -> str:
    """Turn a talent/generator key into a filename stem.

    Parameters
    ----------
    key:
        Generator key: bare ``"name"`` (system) or ``"app:name"`` (app).

    Returns
    -------
    str
        The key unchanged when it has no colon; otherwise ``"_{app}_{name}"``,
        splitting on the first colon only.

    Examples
    --------
    >>> get_output_name("activity")
    'activity'
    >>> get_output_name("chat:sentiment")
    '_chat_sentiment'
    """
    if ":" not in key:
        return key
    pieces = key.split(":", 1)
    return "_" + "_".join(pieces)
131
132
def get_output_path(
    day_dir: "os.PathLike[str]",
    key: str,
    segment: str | None = None,
    output_format: str | None = None,
    facet: str | None = None,
    stream: str | None = None,
) -> Path:
    """Compute the file path where a generator/talent result is written.

    Parameters
    ----------
    day_dir:
        Day directory path.
    key:
        Generator key or talent name (e.g. "activity", "chat:sentiment");
        converted to a filename stem with get_output_name().
    segment:
        Optional segment key. When truthy the output goes below the segment
        directory instead of directly below the day directory.
    output_format:
        ``"json"`` selects a ``.json`` extension; any other value (including
        None) selects ``.md``.
    facet:
        Optional facet name. When truthy an extra ``{facet}/`` directory is
        inserted below ``talents/``.
    stream:
        Optional stream name. Only consulted when ``segment`` is truthy, in
        which case the segment directory is ``{day}/{stream}/{segment}``.

    Returns
    -------
    Path
        One of:
        - ``{day}/[{stream}/]{segment}/talents/[{facet}/]{name}.{ext}``
        - ``{day}/talents/[{facet}/]{name}.{ext}``
    """
    extension = "json" if output_format == "json" else "md"
    filename = f"{get_output_name(key)}.{extension}"

    # Start at the day directory and descend into the segment when requested.
    base = Path(day_dir)
    if segment:
        base = (base / stream / segment) if stream else (base / segment)

    target_dir = base / "talents"
    if facet:
        target_dir = target_dir / facet
    return target_dir / filename
190
191
def get_talent_configs(
    *,
    type: str | None = None,
    schedule: str | None = None,
    include_disabled: bool = False,
) -> dict[str, dict[str, Any]]:
    """Discover, validate and filter talent configs.

    Reads every ``talent/*.md`` (system) and ``apps/*/talent/*.md`` (app)
    file through ``_load_prompt_metadata``, applies journal-config overrides
    from ``providers.contexts``, validates the whole set, and finally filters
    it. Validation runs over ALL discovered configs, so an invalid config
    raises even when the filters would have excluded it.

    Args:
        type: If given, keep only configs whose ``type`` equals this value
            ("generate" or "cogitate").
        schedule: If given, keep only configs whose ``schedule`` equals this
            value (e.g. "segment", "daily").
        include_disabled: When False (default) configs with a truthy
            ``disabled`` field are dropped.

    Returns:
        Mapping of config key (``"name"`` or ``"app:name"``) to its metadata
        dict, which holds whatever ``_load_prompt_metadata`` returned plus:
        - source: "system" or "app"
        - app: app name (app configs only)
        - cwd: normalized working-directory setting (cogitate configs only)

    Raises:
        ValueError: If a scheduled config lacks ``priority``; a config has an
            unknown ``type``; a config has ``output`` without ``type``; a
            "generate" config lacks ``output``; an activity-scheduled config
            lacks a non-empty ``activities`` list; or ``cwd`` is invalid.
    """
    from think.utils import get_config

    configs: dict[str, dict[str, Any]] = {}

    # --- Discovery: system talents -------------------------------------
    if TALENT_DIR.is_dir():
        for md_path in sorted(TALENT_DIR.glob("*.md")):
            meta = _load_prompt_metadata(md_path)
            meta["source"] = "system"
            configs[md_path.stem] = meta

    # --- Discovery: app talents (apps whose name starts with "_" are skipped)
    if APPS_DIR.is_dir():
        for app_path in sorted(APPS_DIR.iterdir()):
            if app_path.name.startswith("_") or not app_path.is_dir():
                continue
            app_talent_dir = app_path / "talent"
            if not app_talent_dir.is_dir():
                continue
            for md_path in sorted(app_talent_dir.glob("*.md")):
                meta = _load_prompt_metadata(md_path)
                meta["source"] = "app"
                meta["app"] = app_path.name
                configs[f"{app_path.name}:{md_path.stem}"] = meta

    # --- Journal-config overrides from providers.contexts ---------------
    contexts = get_config().get("providers", {}).get("contexts", {})
    # Only these fields may be overridden; order matches insertion order of
    # any newly added keys.
    overridable = ("disabled", "extract", "tier", "provider")
    for config_key, meta in configs.items():
        override = contexts.get(key_to_context(config_key))
        if not override or not isinstance(override, dict):
            continue
        for field in overridable:
            if field in override:
                meta[field] = override[field]

    # --- Validation: scheduled prompts need an explicit priority --------
    for config_key, meta in configs.items():
        if meta.get("schedule") and "priority" not in meta:
            raise ValueError(
                f"Scheduled prompt '{config_key}' is missing required 'priority' field. "
                f"All prompts with 'schedule' must declare an explicit priority."
            )

    # --- Validation: type / output consistency --------------------------
    valid_types = {"generate", "cogitate"}
    for config_key, meta in configs.items():
        has_output = "output" in meta
        declared_type = meta.get("type")

        if declared_type is None:
            # Untyped configs are fine unless they declare an output.
            if has_output:
                raise ValueError(
                    f"Prompt '{config_key}' has output but is missing required 'type' field."
                )
            continue

        if declared_type not in valid_types:
            raise ValueError(
                f"Prompt '{config_key}' has invalid type {declared_type!r}. "
                "Expected 'generate' or 'cogitate'."
            )

        if declared_type == "generate" and not has_output:
            raise ValueError(
                f"Prompt '{config_key}' has type='generate' but is missing required 'output' field."
            )

    # --- Validation: activity-scheduled prompts need an activities list -
    for config_key, meta in configs.items():
        if meta.get("schedule") != "activity":
            continue
        activities = meta.get("activities")
        if not activities or not isinstance(activities, list):
            raise ValueError(
                f"Activity-scheduled prompt '{config_key}' must have a non-empty 'activities' list "
                f'(activity types to match, or ["*"] for all types).'
            )

    # --- Validation + normalization of cwd ------------------------------
    for config_key, meta in configs.items():
        cwd = _validate_cwd(meta.get("cwd"), meta.get("type"), config_key)
        if cwd is not None:
            meta["cwd"] = cwd
        else:
            meta.pop("cwd", None)

    # --- Filtering -------------------------------------------------------
    return {
        config_key: meta
        for config_key, meta in configs.items()
        if (type is None or meta.get("type") == type)
        and (schedule is None or meta.get("schedule") == schedule)
        and (include_disabled or not meta.get("disabled", False))
    }
338
339
340# ---------------------------------------------------------------------------
341# Talent Resolution
342# ---------------------------------------------------------------------------
343
344
def _resolve_talent_path(name: str) -> tuple[Path, str]:
    """Split a talent name into its containing directory and bare name.

    Parameters
    ----------
    name:
        Either a system talent name (e.g. "chat") or an app-namespaced one
        (e.g. "support:support"). Only the first colon is treated as the
        namespace separator.

    Returns
    -------
    tuple[Path, str]
        ``(talent_directory, talent_name)``: ``apps/{app}/talent`` under the
        project root for namespaced names, ``TALENT_DIR`` otherwise.
    """
    app, separator, talent_name = name.partition(":")
    if separator:
        # App talent: "support:support" -> apps/support/talent + "support"
        project_root = Path(__file__).parent.parent
        return project_root / "apps" / app / "talent", talent_name
    # System talent: bare name lives directly in TALENT_DIR
    return TALENT_DIR, name
368
369
# Default source-loading configuration, used by get_talent() when a talent's
# frontmatter has no 'load' key. Every source is off, so prompts must
# explicitly opt into source loading.
_DEFAULT_LOAD = {
    "transcripts": False,
    "percepts": False,
    "talents": False,
}
376
377
378# ---------------------------------------------------------------------------
379# Source Configuration Helpers
380# ---------------------------------------------------------------------------
381
382
383def source_is_enabled(value: bool | str | dict) -> bool:
384 """Check if a source should be loaded based on its config value.
385
386 Sources can be configured as:
387 - False: don't load
388 - True: load if available
389 - "required": load (and generation will fail if none found)
390 - dict: for talents source, selective loading (e.g., {"entities": true})
391
392 Both True and "required" mean the source should be loaded.
393 A non-empty dict means the source should be loaded (with filtering).
394
395 Args:
396 value: The source config value (bool, "required" string, or dict for talents)
397
398 Returns:
399 True if the source should be loaded, False otherwise.
400 """
401 if isinstance(value, dict):
402 # Dict means selective loading - enabled if any agent is enabled
403 return any(v is True or v == "required" for v in value.values())
404 return value is True or value == "required"
405
406
407def source_is_required(value: bool | str | dict) -> bool:
408 """Check if a source must have content for generation to proceed.
409
410 Args:
411 value: The source config value (bool, "required" string, or dict for talents)
412
413 Returns:
414 True if the source is required (generation should skip if no content).
415 For dict values, returns True if any agent is marked "required".
416 """
417 if isinstance(value, dict):
418 return any(v == "required" for v in value.values())
419 return value == "required"
420
421
422def get_talent_filter(value: bool | str | dict) -> dict[str, bool | str] | None:
423 """Extract talent filter from sources config.
424
425 When talents source is a dict, returns it as filter mapping talent names
426 to their enabled/required status. When talents source is bool or "required",
427 returns None to indicate all talents should be loaded.
428
429 Args:
430 value: The talents source config value
431
432 Returns:
433 Dict mapping talent names to bool/"required", or None for all talents.
434 Returns empty dict if value is False (no talents).
435
436 Examples:
437 >>> get_talent_filter(True)
438 None # All talents
439 >>> get_talent_filter(False)
440 {} # No talents
441 >>> get_talent_filter({"entities": True, "meetings": "required"})
442 {"entities": True, "meetings": "required"}
443 """
444 if isinstance(value, dict):
445 return value
446 if value is False:
447 return {} # No talents
448 return None # All talents (True or "required")
449
450
def _valid_runtime_facets() -> list[str]:
    """Return the facet names from get_facets() that fully match SLUG_RE, sorted."""
    slugs = [name for name in get_facets() if SLUG_RE.fullmatch(name)]
    slugs.sort()
    return slugs
454
455
def hydrate_runtime_enums(schema: Any) -> Any:
    """Return a copy of ``schema`` with runtime enum sentinels filled in.

    Every dict node whose ``enum`` is exactly ``[RUNTIME_FACETS_SENTINEL]``
    gets that enum replaced by the sorted list of valid runtime facet slugs.
    When there are no valid facets, the ``enum`` key is removed and
    ``minLength: 1`` is set on the node instead; in that case ``minItems``
    is also dropped from the top-level ``properties.facets`` node (if that
    node is a dict) and an info message is logged.

    The input is never mutated: None is returned as None, anything else is
    deep-copied first. A schema without the sentinel comes back as an
    unchanged copy, so calling this on an already-hydrated schema is a no-op.
    """
    if schema is None:
        return None

    result = copy.deepcopy(schema)
    runtime_facets = _valid_runtime_facets()

    def _hydrate(node: Any) -> bool:
        """Rewrite sentinels at/below ``node``; report if the fallback fired."""
        if isinstance(node, dict):
            fell_back = False
            if node.get("enum") == [RUNTIME_FACETS_SENTINEL]:
                if runtime_facets:
                    node["enum"] = list(runtime_facets)
                else:
                    del node["enum"]
                    node["minLength"] = 1
                    fell_back = True
            children = node.values()
        elif isinstance(node, list):
            fell_back = False
            children = node
        else:
            # Scalars hold no nested schema nodes.
            return False

        for child in children:
            # Always recurse (no short-circuit) so every sentinel is replaced.
            if _hydrate(child):
                fell_back = True
        return fell_back

    if _hydrate(result):
        facets_node = result.get("properties", {}).get("facets")
        if isinstance(facets_node, dict):
            facets_node.pop("minItems", None)
        LOG.info(
            "hydrate_runtime_enums: no valid runtime facets; using minLength fallback"
        )

    return result
502
503
504# ---------------------------------------------------------------------------
505# Talent Loading
506# ---------------------------------------------------------------------------
507
508
def _load_talent_schema(
    *,
    name: str,
    md_path: Path,
    raw_schema: Any,
) -> dict[str, Any]:
    """Load and validate a talent JSON Schema from a relative file path.

    Parameters
    ----------
    name:
        Talent name, used only in error messages.
    md_path:
        Path of the talent's .md file; the schema path is resolved relative
        to this file's directory.
    raw_schema:
        The frontmatter ``schema`` value; must be a relative path string that
        stays inside the talent directory.

    Returns
    -------
    dict[str, Any]
        The parsed JSON document after it passed
        ``Draft202012Validator.check_schema``.

    Raises
    ------
    ValueError
        If ``raw_schema`` is not a string, is absolute, contains ``..``,
        resolves outside the talent directory, cannot be decoded/parsed as
        JSON, or is not a valid JSON Schema.
    FileNotFoundError
        If the resolved path does not point at an existing regular file.
    """
    if not isinstance(raw_schema, str):
        raise ValueError(
            f"talent {name}: schema must be a string, got {type(raw_schema).__name__}: "
            f"{raw_schema!r}"
        )

    raw_path = Path(raw_schema)
    if raw_path.is_absolute():
        raise ValueError(f"talent {name}: schema path must be relative: {raw_schema}")
    if ".." in raw_path.parts:
        raise ValueError(
            f"talent {name}: schema path must not contain '..': {raw_schema}"
        )

    # Resolve both sides so symlinks cannot be used to escape the directory.
    talent_dir = md_path.parent.resolve()
    schema_path = (md_path.parent / raw_schema).resolve()
    if not schema_path.is_relative_to(talent_dir):
        raise ValueError(
            f"talent {name}: schema path escapes talent directory: {schema_path}"
        )
    # is_file() rather than exists(): a directory (e.g. schema: "" resolves to
    # the talent directory itself) would otherwise slip through and make
    # open() fail with an unrelated IsADirectoryError.
    if not schema_path.is_file():
        raise FileNotFoundError(f"talent {name}: schema file not found: {schema_path}")

    try:
        with open(schema_path, encoding="utf-8") as f:
            parsed = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # Undecodable bytes are reported the same way as malformed JSON.
        raise ValueError(
            f"talent {name}: schema file is not valid JSON: {schema_path}"
        ) from exc

    try:
        Draft202012Validator.check_schema(parsed)
    except SchemaError as exc:
        raise ValueError(
            f"talent {name}: schema file is not a valid JSON Schema: {schema_path}"
        ) from exc

    return parsed
555
556
def get_talent(
    name: str = "chat",
    facet: str | None = None,
    analysis_day: str | None = None,
) -> dict:
    """Build the full configuration dict for one talent.

    Reads the talent's .md file, takes its frontmatter as the base config,
    normalizes ``cwd``, loads the referenced JSON Schema (if any), extracts
    the source-loading config and renders the prompt text.

    Parameters
    ----------
    name:
        Talent to load: a system talent (e.g. "chat") or an app-namespaced
        one (e.g. "support:support" for apps/support/talent/support).
    facet:
        Optional facet name; passed to ``_resolve_facets`` to produce the
        ``facets`` entry of the prompt template context.
    analysis_day:
        Accepted for interface compatibility; this function does not read it.

    Returns
    -------
    dict
        All frontmatter fields, with these adjustments/additions:
        - cwd: normalized by ``_validate_cwd`` (removed when not applicable)
        - path: string path of the .md file
        - json_schema: loaded schema, replacing the frontmatter ``schema`` key
        - sources: the frontmatter ``load`` value (removed from the dict), or
          a copy of ``_DEFAULT_LOAD`` when absent
        - user_instruction: text of the prompt loaded via ``load_prompt``
        - name: the ``name`` argument as given

    Raises
    ------
    FileNotFoundError
        If the talent's .md file does not exist.
    ValueError
        Propagated from cwd or schema validation.
    """
    from think.prompts import _resolve_facets

    # Locate the talent's markdown file (system or app namespace).
    talent_dir, talent_name = _resolve_talent_path(name)
    md_path = talent_dir / f"{talent_name}.md"
    if not md_path.exists():
        raise FileNotFoundError(f"Talent not found: {name}")

    # Frontmatter is the base config; every field is carried through.
    config = dict(frontmatter.load(md_path).metadata or {})

    cwd = _validate_cwd(config.get("cwd"), config.get("type"), name)
    if cwd is not None:
        config["cwd"] = cwd
    else:
        config.pop("cwd", None)

    config["path"] = str(md_path)

    # Swap the schema file reference for the parsed schema itself.
    if "schema" in config:
        schema_ref = config.pop("schema")
        config["json_schema"] = _load_talent_schema(
            name=name,
            md_path=md_path,
            raw_schema=schema_ref,
        )

    # The 'load' key becomes 'sources'; absent means everything is off.
    if "load" in config:
        sources = config.pop("load")
    else:
        sources = dict(_DEFAULT_LOAD)
    config["sources"] = sources

    # Render the prompt with the facets template variable resolved.
    rendered = load_prompt(
        talent_name,
        base_dir=talent_dir,
        context={"facets": _resolve_facets(facet)},
    )
    config["user_instruction"] = rendered.text

    config["name"] = name
    return config
633
634
635# ---------------------------------------------------------------------------
636# Hook Loading
637# ---------------------------------------------------------------------------
638
639
def _resolve_hook_path(hook_name: str) -> Path:
    """Translate a hook name into the path of its Python file.

    Resolution rules, checked in this order:
    - Explicit path (contains "/" or ends with ".py"): taken relative to the
      project root, e.g. "path/to/hook.py".
    - App-qualified ("app:name"): apps/{app}/talent/{name}.py
    - Plain name ("name"): {TALENT_DIR}/{name}.py
    """
    project_root = Path(__file__).parent.parent

    looks_like_path = hook_name.endswith(".py") or "/" in hook_name
    if looks_like_path:
        return project_root / hook_name

    if ":" not in hook_name:
        return TALENT_DIR / f"{hook_name}.py"

    app, name = hook_name.split(":", 1)
    return project_root / "apps" / app / "talent" / f"{name}.py"
657
658
659def _load_hook_function(config: dict, key: str, func_name: str) -> Callable | None:
660 """Load a hook function from config.
661
662 Args:
663 config: Agent/generator config dict
664 key: Hook key in config ("pre" or "post")
665 func_name: Function name to load ("pre_process" or "post_process")
666
667 Returns:
668 The hook function, or None if no hook configured.
669
670 Raises:
671 ValueError: If hook file doesn't define the required function.
672 ImportError: If hook file cannot be loaded.
673 """
674 hook_config = config.get("hook")
675 if not hook_config or not isinstance(hook_config, dict):
676 return None
677
678 hook_name = hook_config.get(key)
679 if not hook_name:
680 return None
681
682 hook_path = _resolve_hook_path(hook_name)
683
684 if not hook_path.exists():
685 raise ImportError(f"Hook file not found: {hook_path}")
686
687 spec = importlib.util.spec_from_file_location(
688 f"{key}_hook_{hook_path.stem}", hook_path
689 )
690 if spec is None or spec.loader is None:
691 raise ImportError(f"Cannot load hook from {hook_path}")
692
693 module = importlib.util.module_from_spec(spec)
694 spec.loader.exec_module(module)
695
696 if not hasattr(module, func_name):
697 raise ValueError(f"Hook {hook_path} must define a '{func_name}' function")
698
699 process_func = getattr(module, func_name)
700 if not callable(process_func):
701 raise ValueError(f"Hook {hook_path} '{func_name}' must be callable")
702
703 return process_func
704
705
def load_post_hook(config: dict) -> Callable[[str, "HookContext"], str | None] | None:
    """Return the post-processing hook named in ``config``, if any.

    Expected config shape: {"hook": {"post": "name"}}

    Returns:
        The hook module's ``post_process`` function, or None when no post
        hook is configured.
        Function signature: (result: str, context: HookContext) -> str | None
    """
    post_hook = _load_hook_function(config, key="post", func_name="post_process")
    return post_hook
716
717
def load_pre_hook(config: dict) -> Callable[["PreHookContext"], dict | None] | None:
    """Return the pre-processing hook named in ``config``, if any.

    Expected config shape: {"hook": {"pre": "name"}}

    Returns:
        The hook module's ``pre_process`` function, or None when no pre hook
        is configured.
        Function signature: (context: PreHookContext) -> dict | None
    """
    pre_hook = _load_hook_function(config, key="pre", func_name="pre_process")
    return pre_hook
728
729
# Type aliases for hook context - hooks receive the full config dict.
# Both are plain ``dict`` at runtime; the names exist only to label the
# context argument in the load_post_hook()/load_pre_hook() annotations.
HookContext = dict
PreHookContext = dict