# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc
"""Personal memory agent: scan a solstone journal and generate statistics."""

import argparse
import json
import logging
import os
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict

from observe.sense import scan_day as sense_scan_day
from observe.utils import VIDEO_EXTENSIONS, load_analysis_frames
from think.activities import estimate_duration_minutes, load_activity_records
from think.facets import get_facets
from think.stats_schema import DAY_FIELDS, SCHEMA_VERSION
from think.stats_schema import validate as validate_stats
from think.talents import scan_day as generate_scan_day
from think.utils import day_dirs, get_journal, segment_parse, setup_cli

logger = logging.getLogger(__name__)


class JournalStats:
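    """Collect per-day journal statistics and aggregate them for stats.json.

    Tracks transcript/screen sessions and durations, activity and facet
    counts/minutes, token usage by model, and a weekday-by-hour heatmap.
    """
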
    def __init__(self) -> None:
        self.days: Dict[str, Dict[str, float | int]] = {}
        self.totals: Counter[str] = Counter()
        self.total_transcript_duration = 0.0
        self.total_percept_duration = 0.0
        self.agent_counts: Counter[str] = Counter()
        self.agent_minutes: Counter[str] = Counter()
        self.facet_counts: Counter[str] = Counter()
        self.facet_minutes: Counter[str] = Counter()
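        # Minutes of recorded activity per weekday (0 = Monday) and hour of day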
        self.heatmap: list[list[float]] = [[0.0 for _ in range(24)] for _ in range(7)]
        # Token usage tracking: {day: {model: {token_type: count}}}
        self.token_usage: Dict[str, Dict[str, Dict[str, int]]] = {}
        # Total token usage by model: {model: {token_type: count}}
        self.token_totals: Dict[str, Dict[str, int]] = {}
        # Per-day agent counts: {day: {agent: count}}
        self.agent_counts_by_day: Dict[str, Dict[str, int]] = {}
        # Per-day facet counts: {day: {facet: count}}
        self.facet_counts_by_day: Dict[str, Dict[str, int]] = {}

    def _get_day_mtime(self, day_dir: Path) -> float:
        """Get latest modification time of files we scan."""
        files = []
        # Check segment subdirectories for processed files (day/stream/segment/)
        files.extend(day_dir.glob("*/*/*audio.jsonl"))
        files.extend(day_dir.glob("*/*/*_transcript.jsonl"))
        files.extend(day_dir.glob("*/*/*_transcript.md"))
        files.extend(day_dir.glob("*/*/*screen.jsonl"))
        # Check day root for unprocessed media files
        files.extend(day_dir.glob("*.flac"))
        files.extend(day_dir.glob("*.m4a"))
        for ext in VIDEO_EXTENSIONS:
            files.extend(day_dir.glob(f"*{ext}"))

        talents_dir = day_dir / "talents"
        if talents_dir.is_dir():
            files.extend(talents_dir.glob("*.json"))
            files.extend(talents_dir.glob("*.md"))
            files.extend(talents_dir.glob("*/*.json"))
            files.extend(talents_dir.glob("*/*.md"))

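        # No scannable inputs: returning 0.0 makes any existing stats.json cache
        # look fresh to _load_day_cache().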
        if not files:
            return 0.0
        return max(f.stat().st_mtime for f in files)

    def _load_day_cache(self, day: str, day_dir: Path) -> dict | None:
        """Load cached day stats if fresh."""
        cache_file = day_dir / "stats.json"
        if not cache_file.exists():
            return None

        try:
            cache_mtime = cache_file.stat().st_mtime
            day_mtime = self._get_day_mtime(day_dir)

            if cache_mtime > day_mtime:
                with open(cache_file, encoding="utf-8") as f:
                    return json.load(f)
        except Exception as e:
            logger.debug(f"Cache load failed for {day}: {e}")

        return None

    def _save_day_cache(self, day_dir: Path, stats: dict) -> None:
        """Save day stats to cache."""
        try:
            cache_file = day_dir / "stats.json"
            with open(cache_file, "w", encoding="utf-8") as f:
                json.dump(stats, f, indent=2)
        except Exception as e:
            logger.debug(f"Cache save failed: {e}")

    def _parse_timestamp(self, ts: str) -> float:
        """Parse HH:MM:SS timestamp to seconds since midnight."""
        try:
            h, m, s = ts.split(":")
            return int(h) * 3600 + int(m) * 60 + int(s)
        except Exception:
            return 0.0

    def _calculate_audio_duration(self, segments: list) -> float:
        """Calculate audio duration from min/max timestamps."""
        timestamps = [seg.get("start") for seg in segments if seg.get("start")]
        if not timestamps:
            return 0.0

        times_seconds = [self._parse_timestamp(t) for t in timestamps]
        return max(times_seconds) - min(times_seconds)

    def _calculate_percept_duration(self, frames: list) -> float:
        """Calculate screen duration from min/max frame timestamps."""
        # Skip header (first element if it has no frame_id)
        frame_timestamps = [
            f["timestamp"] for f in frames if "timestamp" in f and "frame_id" in f
        ]
        if not frame_timestamps:
            return 0.0

        return max(frame_timestamps) - min(frame_timestamps)

    def _apply_day_stats(self, day: str, cached_data: dict) -> None:
        """Apply cached day stats to instance state."""
        # Extract components from cache
        stats = cached_data.get("stats", {})
        agent_data = cached_data.get("agent_data", {})
        heatmap_data = cached_data.get("heatmap_data", {})

        # Apply day stats
        self.days[day] = stats

        # Update totals (excluding per-day durations)
        counts_for_totals = {
            k: v
            for k, v in stats.items()
            if k not in ("transcript_duration", "percept_duration")
        }
        self.totals.update(counts_for_totals)

        # Accumulate durations
        self.total_transcript_duration += stats.get("transcript_duration", 0.0)
        self.total_percept_duration += stats.get("percept_duration", 0.0)

        # Apply agent data
        day_agent_counts: Dict[str, int] = {}
        for agent, data in agent_data.items():
            count = data.get("count", 0)
            self.agent_counts[agent] += count
            self.agent_minutes[agent] += data.get("minutes", 0.0)
            if count > 0:
                day_agent_counts[agent] = count
        if day_agent_counts:
            self.agent_counts_by_day[day] = day_agent_counts

        # Apply facet data
        facet_data = cached_data.get("facet_data", {})
        day_facet_counts: Dict[str, int] = {}
        for facet, data in facet_data.items():
            count = data.get("count", 0)
            self.facet_counts[facet] += count
            self.facet_minutes[facet] += data.get("minutes", 0.0)
            if count > 0:
                day_facet_counts[facet] = count
        if day_facet_counts:
            self.facet_counts_by_day[day] = day_facet_counts

        # Apply heatmap data
        weekday = heatmap_data.get("weekday")
        hours = heatmap_data.get("hours", {})
        if weekday is not None:
            for hour_str, minutes in hours.items():
                hour = int(hour_str)
                self.heatmap[weekday][hour] += minutes

    def scan_day(self, day: str, path: str) -> dict:
        """Scan a single day and return stats dict for caching."""
        stats: Counter[str] = Counter()
        transcript_duration = 0.0
        percept_duration = 0.0
        day_dir = Path(path)

        # Track agent data for cache
        agent_data = {}
        facet_data = {}
        heatmap_hours = {}

        # --- Transcript sessions ---
        # Check segment subdirectories for transcript JSONL files (day/stream/segment/)
        transcript_files = list(day_dir.glob("*/*/audio.jsonl"))
        transcript_files.extend(day_dir.glob("*/*/*_audio.jsonl"))
        transcript_files.extend(day_dir.glob("*/*/*_transcript.jsonl"))
        for jsonl_file in sorted(set(transcript_files)):
            stats["transcript_sessions"] += 1

            try:
                with open(jsonl_file, encoding="utf-8") as f:
                    lines = [line.strip() for line in f if line.strip()]

                if not lines:
                    logger.debug(f"Empty transcript file: {jsonl_file}")
                    continue

                # First line is metadata, rest are segments
                segments = []
                for i, line in enumerate(lines[1:], start=2):
                    try:
                        segments.append(json.loads(line))
                    except json.JSONDecodeError as e:
                        logger.debug(f"Invalid JSON at line {i} in {jsonl_file}: {e}")
                        continue

                stats["transcript_segments"] += len(segments)

                # Calculate duration from timestamps
                if segments:
                    duration = self._calculate_audio_duration(segments)
                    transcript_duration += duration

            except (OSError, IOError) as e:
                logger.warning(f"Error reading transcript file {jsonl_file}: {e}")
            except Exception as e:
                logger.warning(f"Unexpected error processing {jsonl_file}: {e}")

        # --- Screen sessions ---
        # Check segment subdirectories for screen files (day/stream/segment/)
        screen_files = list(day_dir.glob("*/*/screen.jsonl"))
        screen_files.extend(day_dir.glob("*/*/*_screen.jsonl"))
        for jsonl_file in sorted(screen_files):
            stats["percept_sessions"] += 1

            try:
                frames = load_analysis_frames(jsonl_file)
                if not frames:
                    logger.debug(f"No valid frames in: {jsonl_file}")
                    continue

                # Count frames (excluding header)
                frame_count = sum(1 for f in frames if "frame_id" in f)
                stats["percept_frames"] += frame_count

                # Calculate duration from timestamps
                if frame_count > 0:
                    duration = self._calculate_percept_duration(frames)
                    percept_duration += duration

            except (OSError, IOError) as e:
                logger.warning(f"Error reading screen file {jsonl_file}: {e}")
            except Exception as e:
                logger.warning(f"Unexpected error processing {jsonl_file}: {e}")

        # --- Pending segments (unprocessed media files) ---
        sense_info = sense_scan_day(day_dir)
        stats["pending_segments"] = sense_info["pending_segments"]

        # --- Insight summaries ---
        output_info = generate_scan_day(day)
        stats["outputs_processed"] = len(output_info["processed"])
        stats["outputs_pending"] = len(output_info["repairable"])

        # --- Activities and heatmap from facets/*/activities/YYYYMMDD.jsonl ---
        weekday = datetime.strptime(day, "%Y%m%d").weekday()
        for facet_name, _facet_meta in get_facets().items():
            activities_file = (
                Path(get_journal())
                / "facets"
                / facet_name
                / "activities"
                / f"{day}.jsonl"
            )
            try:
                records = load_activity_records(facet_name, day)
                for record in records:
                    activity_type = record.get("activity") or "unknown"
                    segments = record.get("segments") or []
                    if not segments:
                        continue

                    if activity_type not in agent_data:
                        agent_data[activity_type] = {"count": 0, "minutes": 0.0}
                    agent_data[activity_type]["count"] += 1

                    duration_minutes = float(estimate_duration_minutes(segments))
                    agent_data[activity_type]["minutes"] += duration_minutes

                    if facet_name not in facet_data:
                        facet_data[facet_name] = {"count": 0, "minutes": 0.0}
                    facet_data[facet_name]["count"] += 1
                    facet_data[facet_name]["minutes"] += duration_minutes

                    # Build heatmap hours for this day
                    for seg in segments:
                        start, end = segment_parse(seg)
                        if start is None or end is None:
                            continue

                        start_sec = start.hour * 3600 + start.minute * 60 + start.second
                        end_sec = end.hour * 3600 + end.minute * 60 + end.second
                        cur = start_sec
                        while cur < end_sec:
                            hour = cur // 3600
                            if hour >= 24:
                                break
                            next_tick = min((hour + 1) * 3600, end_sec)
                            minutes = (next_tick - cur) / 60
                            heatmap_hours[str(hour)] = (
                                heatmap_hours.get(str(hour), 0.0) + minutes
                            )
                            cur = next_tick
            except (OSError, IOError) as e:
                logger.warning(f"Error reading {activities_file}: {e}")

        # --- Disk usage ---
        stats["day_bytes"] = sum(
            f.stat().st_size for f in day_dir.rglob("*") if f.is_file()
        )

        # --- Build return dict ---
        stats["transcript_duration"] = transcript_duration
        stats["percept_duration"] = percept_duration

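        # This payload is what _save_day_cache() writes to <day>/stats.json and
        # what _apply_day_stats() replays on a cache hit.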
        return {
            "stats": dict(stats),
            # NOTE: agent_data keys are now activity types (e.g., "meeting",
            # "coding"), not extractor agent names. The key name is retained
            # for cache-format compatibility.
            "agent_data": agent_data,
            "facet_data": facet_data,
            "heatmap_data": {"weekday": weekday, "hours": heatmap_hours},
        }

    def scan_all_tokens(self, journal_path: Path, use_cache: bool = True) -> None:
        """Scan all token usage files in the tokens directory.

        Reads daily *.jsonl files (one JSON object per line).
        """
        tokens_dir = journal_path / "tokens"
        if not tokens_dir.is_dir():
            return

        today = datetime.now(timezone.utc).strftime("%Y%m%d")

        # Scan JSONL files only
        for token_file in tokens_dir.glob("*.jsonl"):
            day = token_file.stem
            cache_file = token_file.parent / f"{day}.tokens_cache.json"

            if use_cache and day != today and cache_file.exists():
                try:
                    if cache_file.stat().st_mtime > token_file.stat().st_mtime:
                        with open(cache_file, encoding="utf-8") as f:
                            cached = json.load(f)
                        self.token_usage[day] = cached
                        for model, counts in cached.items():
                            if model not in self.token_totals:
                                self.token_totals[model] = {}
                            for token_type, count in counts.items():
                                if token_type not in self.token_totals[model]:
                                    self.token_totals[model][token_type] = 0
                                self.token_totals[model][token_type] += count
                        continue
                except Exception as e:
                    logger.debug(f"Token cache load failed for {token_file}: {e}")

            try:
                with open(token_file, "r", encoding="utf-8") as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            data = json.loads(line)
                            self._process_token_entry(data)
                        except json.JSONDecodeError as e:
                            logger.debug(f"Invalid JSON in {token_file}: {e}")
                            continue

            except (OSError, IOError) as e:
                logger.warning(f"Error reading token file {token_file}: {e}")
                continue

            if use_cache and day != today:
                try:
                    with open(cache_file, "w", encoding="utf-8") as f:
                        json.dump(self.token_usage.get(day, {}), f)
                except Exception as e:
                    logger.debug(f"Token cache save failed for {token_file}: {e}")

    def _process_token_entry(self, data: dict) -> None:
        """Process a single token usage entry (expects normalized format)."""
        # Extract date from timestamp
        timestamp = data.get("timestamp")
        if not timestamp:
            return

        # Use UTC for consistent date extraction (timestamps are UTC epoch
        # seconds from time.time())
        file_date = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime(
            "%Y%m%d"
        )
        model = data.get("model", "unknown")
        usage = data.get("usage", {})

        # Initialize the day's token usage if it does not exist yet
        if file_date not in self.token_usage:
            self.token_usage[file_date] = {}

        # Initialize the model entry if it does not exist yet
        if model not in self.token_usage[file_date]:
            self.token_usage[file_date][model] = {}
        if model not in self.token_totals:
            self.token_totals[model] = {}

        # Add token counts (all fields are already normalized by migration)
        for token_type, count in usage.items():
            if not isinstance(count, int):
                continue

            # Add to the day's per-model totals
            if token_type not in self.token_usage[file_date][model]:
                self.token_usage[file_date][model][token_type] = 0
            self.token_usage[file_date][model][token_type] += count

            # Add to the overall per-model totals
            if token_type not in self.token_totals[model]:
                self.token_totals[model][token_type] = 0
            self.token_totals[model][token_type] += count

    def scan(self, journal: str, verbose: bool = False, use_cache: bool = True) -> None:
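        """Scan every day directory, then token usage, accumulating all statistics."""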
        days_map = day_dirs()
        sorted_days = sorted(days_map.items())
        cache_hits = 0
        cache_misses = 0

        for idx, (day, path) in enumerate(sorted_days, 1):
            if not os.path.isdir(path):
                continue

            day_dir = Path(path)

            # Try cache first
            cached_data = None
            if use_cache:
                cached_data = self._load_day_cache(day, day_dir)

            if cached_data:
                # Cache hit - apply cached data
                self._apply_day_stats(day, cached_data)
                cache_hits += 1
                if verbose:
                    print(
                        f"[{idx}/{len(sorted_days)}] {day} (cached)",
                        end="\r",
                        flush=True,
                    )
            else:
                # Cache miss - scan and save
                cache_misses += 1
                if verbose:
                    print(
                        f"[{idx}/{len(sorted_days)}] Scanning {day}...",
                        end="\r",
                        flush=True,
                    )
                day_data = self.scan_day(day, path)
                self._apply_day_stats(day, day_data)

                if use_cache:
                    self._save_day_cache(day_dir, day_data)

        # Scan tokens directory once after all days are processed
        self.scan_all_tokens(Path(journal), use_cache=use_cache)

        if verbose:
            cache_status = (
                f" (cache: {cache_hits} hits, {cache_misses} misses)"
                if use_cache
                else ""
            )
            logger.info(
                f"Scanned {len(self.days)} days, "
                f"{self.totals.get('transcript_sessions', 0)} transcript sessions, "
                f"{self.totals.get('percept_sessions', 0)} percept sessions"
                f"{cache_status}"
            )

    def to_dict(self) -> dict:
        """Return a dictionary with all collected statistics."""
        days = {
            day: {field: stats.get(field, 0) for field in DAY_FIELDS}
            for day, stats in self.days.items()
        }
        return {
            "schema_version": SCHEMA_VERSION,
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "day_count": len(self.days),
            "days": days,
            "totals": {
                **dict(self.totals),
                "total_transcript_duration": self.total_transcript_duration,
                "total_percept_duration": self.total_percept_duration,
            },
            "heatmap": self.heatmap,
            "tokens": {
                "by_day": self.token_usage,
                "by_model": self.token_totals,
            },
            "talents": {
                "counts": dict(self.agent_counts),
                "minutes": {k: round(v, 2) for k, v in self.agent_minutes.items()},
                "counts_by_day": self.agent_counts_by_day,
            },
            "facets": {
                "counts": dict(self.facet_counts),
                "minutes": {k: round(v, 2) for k, v in self.facet_minutes.items()},
                "counts_by_day": self.facet_counts_by_day,
            },
        }

    def save_json(self, journal: str) -> None:
        """Write full statistics to ``stats.json`` in ``journal``."""
        data = self.to_dict()
        errors = validate_stats(data)
        if errors:
            raise ValueError(f"Stats validation failed: {'; '.join(errors)}")
        path = os.path.join(journal, "stats.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)


def main() -> None:
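    """CLI entry point: scan the journal and write stats.json."""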
    parser = argparse.ArgumentParser(
        description="Scan a solstone journal and generate statistics"
    )
    parser.add_argument(
        "--no-cache",
        action="store_true",
        help="Disable per-day caching (force re-scan all days)",
    )
    args = setup_cli(parser)
    journal = get_journal()

    js = JournalStats()
    js.scan(journal, verbose=args.verbose, use_cache=not args.no_cache)

    try:
        js.save_json(journal)
        logger.info(f"Statistics saved to {journal}/stats.json")
    except Exception as e:
        logger.error(f"Error writing stats.json: {e}")
        raise


if __name__ == "__main__":
    main()