personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4import json
5import math
6import re
7import time
8from datetime import datetime
9from pathlib import Path
10from typing import Any, Optional
11
12DATE_RE = re.compile(r"\d{8}")
13
14
15def format_date(date_str: str) -> str:
16 """Convert YYYYMMDD to 'Wednesday April 2nd' format."""
17 try:
18 date_obj = datetime.strptime(date_str, "%Y%m%d")
19 day = date_obj.day
20 if 10 <= day % 100 <= 20:
21 suffix = "th"
22 else:
23 suffix = {1: "st", 2: "nd", 3: "rd"}.get(day % 10, "th")
24 return date_obj.strftime(f"%A %B {day}{suffix}")
25 except ValueError:
26 return date_str
27
28
29def format_date_short(date_str: str) -> str:
30 """Convert YYYYMMDD to smart relative/short format.
31
32 Returns:
33 - "Today", "Yesterday", "Tomorrow" for those days
34 - Day name (e.g., "Wednesday") for dates within the past 6 days
35 - "Sat Nov 29" for other dates in current/recent year
36 - "Sat Nov 29 '24" for dates >6 months ago in a different year
37 """
38 try:
39 date_obj = datetime.strptime(date_str, "%Y%m%d")
40 today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
41 date_normalized = date_obj.replace(hour=0, minute=0, second=0, microsecond=0)
42 delta_days = (date_normalized - today).days
43
44 # Today, Yesterday, Tomorrow
45 if delta_days == 0:
46 return "Today"
47 elif delta_days == -1:
48 return "Yesterday"
49 elif delta_days == 1:
50 return "Tomorrow"
51 # Within past 6 days - use day name
52 elif -6 <= delta_days < 0:
53 return date_obj.strftime("%A")
54 # Default short format
55 else:
56 short = date_obj.strftime("%a %b %-d")
57 # Add year suffix if >6 months ago AND different year
58 months_ago = (today.year - date_obj.year) * 12 + (
59 today.month - date_obj.month
60 )
61 if months_ago > 6 and date_obj.year != today.year:
62 short += date_obj.strftime(" '%y")
63 return short
64 except ValueError:
65 return date_str
66
67
68def _plural(value: int, unit: str) -> str:
69 return f"{value} {unit}{'s' if value != 1 else ''}"
70
71
72def relative_time(seconds: int | float) -> str:
73 """Return canonical human readable duration for ``seconds``."""
74 if not math.isfinite(seconds) or seconds < 0:
75 seconds = 0
76 seconds = int(seconds)
77 if seconds < 60:
78 return _plural(seconds, "second")
79 minutes = seconds // 60
80 if minutes < 60:
81 return _plural(minutes, "minute")
82 hours = minutes // 60
83 if hours < 24:
84 return _plural(hours, "hour")
85 days = hours // 24
86 if days < 7:
87 return _plural(days, "day")
88 if days < 28:
89 return _plural(days // 7, "week")
90 if days < 60:
91 return "1 month"
92 return _plural(days // 30, "month")
93
94
95def time_since(epoch: int) -> str:
96 """Return short human readable age for ``epoch`` seconds."""
97 delta_seconds = max(0, time.time() - epoch)
98 return f"{relative_time(delta_seconds)} ago"
99
100
101def spawn_agent(
102 prompt: str,
103 name: str,
104 provider: Optional[str] = None,
105 config: Optional[dict[str, Any]] = None,
106 use_id: Optional[str] = None,
107) -> str | None:
108 """Spawn a Cortex agent and return the use_id.
109
110 Thin wrapper around cortex_request that ensures imports are handled
111 and returns the use_id directly.
112
113 Args:
114 prompt: The task or question for the agent
115 name: Agent name - system (e.g., "default") or app-qualified (e.g., "entities:entity_assist")
116 provider: Optional provider override (openai, google, anthropic)
117 config: Additional configuration (max_tokens, facet, session_id, etc.)
118 use_id: Optional pre-reserved Cortex use_id to reuse for the request
119
120 Returns:
121 use_id string (timestamp-based), or None if the request could not be sent.
122
123 Raises:
124 ValueError: If config is invalid
125 """
126 from think.cortex_client import cortex_request
127
128 return cortex_request(
129 prompt=prompt,
130 name=name,
131 provider=provider,
132 config=config,
133 use_id=use_id,
134 )
135
136
137def parse_pagination_params(
138 default_limit: int = 20,
139 max_limit: int = 100,
140 min_limit: int = 1,
141) -> tuple[int, int]:
142 """Parse and validate pagination parameters from request.args.
143
144 Extracts limit and offset from Flask request.args, validates them,
145 and enforces bounds to prevent API abuse.
146
147 Args:
148 default_limit: Default value for limit if not provided or invalid
149 max_limit: Maximum allowed value for limit
150 min_limit: Minimum allowed value for limit
151
152 Returns:
153 (limit, offset) tuple with validated integers
154
155 Example:
156 limit, offset = parse_pagination_params(default_limit=20, max_limit=100)
157 """
158 from flask import request
159
160 # Parse limit with error handling
161 try:
162 limit = int(request.args.get("limit", default_limit))
163 except (ValueError, TypeError):
164 limit = default_limit
165
166 # Parse offset with error handling
167 try:
168 offset = int(request.args.get("offset", 0))
169 except (ValueError, TypeError):
170 offset = 0
171
172 # Enforce bounds
173 limit = max(min_limit, min(limit, max_limit))
174 offset = max(0, offset)
175
176 return limit, offset
177
178
179def load_json(path: str | Path) -> dict | list | None:
180 """Load JSON file with consistent error handling.
181
182 Args:
183 path: Path to JSON file (string or Path object)
184
185 Returns:
186 Parsed JSON data (dict or list), or None if file doesn't exist or can't be parsed
187
188 Example:
189 data = load_json("config.json")
190 if data:
191 print(data.get("key"))
192 """
193 try:
194 with open(path, "r", encoding="utf-8") as f:
195 return json.load(f)
196 except (FileNotFoundError, json.JSONDecodeError, OSError):
197 return None
198
199
200def save_json(
201 path: str | Path,
202 data: dict | list,
203 indent: int = 2,
204 add_newline: bool = True,
205) -> bool:
206 """Save JSON file with consistent formatting.
207
208 Args:
209 path: Path to JSON file (string or Path object)
210 data: Data to serialize (dict or list)
211 indent: Indentation level (default: 2)
212 add_newline: Whether to add trailing newline for readability (default: True)
213
214 Returns:
215 True if successful, False otherwise
216
217 Example:
218 success = save_json("config.json", {"key": "value"})
219 """
220 try:
221 with open(path, "w", encoding="utf-8") as f:
222 json.dump(data, f, indent=indent, ensure_ascii=False)
223 if add_newline:
224 f.write("\n")
225 return True
226 except (OSError, TypeError):
227 return False
228
229
230def error_response(message: str, code: int = 400) -> tuple[Any, int]:
231 """Create a standard JSON error response.
232
233 Provides consistent error response format across all API endpoints.
234
235 Args:
236 message: Error message to return to client
237 code: HTTP status code (default: 400 Bad Request)
238
239 Returns:
240 Tuple of (jsonify response, status_code) ready for Flask return
241
242 Example:
243 return error_response("Invalid input", 400)
244 return error_response("Not found", 404)
245 """
246 from flask import jsonify
247
248 return jsonify({"error": message}), code
249
250
251def success_response(
252 data: dict[str, Any] | None = None, code: int = 200
253) -> tuple[Any, int]:
254 """Create a standard JSON success response.
255
256 Provides consistent success response format across all API endpoints.
257
258 Args:
259 data: Optional dict of additional data to include in response
260 code: HTTP status code (default: 200 OK)
261
262 Returns:
263 Tuple of (jsonify response, status_code) ready for Flask return
264
265 Example:
266 return success_response() # Returns {"success": True}
267 return success_response({"use_id": "123"}) # Returns {"success": True, "use_id": "123"}
268 """
269 from flask import jsonify
270
271 response_data = {"success": True}
272 if data:
273 response_data.update(data)
274 return jsonify(response_data), code