personal memory agent
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

import argparse
import json
import logging
import os
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict

from observe.sense import scan_day as sense_scan_day
from observe.utils import VIDEO_EXTENSIONS, load_analysis_frames
from think.activities import estimate_duration_minutes, load_activity_records
from think.facets import get_facets
from think.stats_schema import DAY_FIELDS, SCHEMA_VERSION
from think.stats_schema import validate as validate_stats
from think.talents import scan_day as generate_scan_day
from think.utils import day_dirs, get_journal, segment_parse, setup_cli

logger = logging.getLogger(__name__)


class JournalStats:
    def __init__(self) -> None:
        self.days: Dict[str, Dict[str, float | int]] = {}
        self.totals: Counter[str] = Counter()
        self.total_transcript_duration = 0.0
        self.total_percept_duration = 0.0
        self.agent_counts: Counter[str] = Counter()
        self.agent_minutes: Counter[str] = Counter()
        self.facet_counts: Counter[str] = Counter()
        self.facet_minutes: Counter[str] = Counter()
        self.heatmap: list[list[float]] = [[0.0 for _ in range(24)] for _ in range(7)]
        # Token usage tracking: {day: {model: {token_type: count}}}
        self.token_usage: Dict[str, Dict[str, Dict[str, int]]] = {}
        # Total token usage by model: {model: {token_type: count}}
        self.token_totals: Dict[str, Dict[str, int]] = {}
        # Per-day agent counts: {day: {agent: count}}
        self.agent_counts_by_day: Dict[str, Dict[str, int]] = {}
        # Per-day facet counts: {day: {facet: count}}
        self.facet_counts_by_day: Dict[str, Dict[str, int]] = {}

    def _get_day_mtime(self, day_dir: Path) -> float:
        """Get latest modification time of files we scan."""
        files = []
        # Check segment subdirectories for processed files (day/stream/segment/)
        files.extend(day_dir.glob("*/*/*audio.jsonl"))
        files.extend(day_dir.glob("*/*/*_transcript.jsonl"))
        files.extend(day_dir.glob("*/*/*_transcript.md"))
        files.extend(day_dir.glob("*/*/*screen.jsonl"))
        # Check day root for unprocessed media files
        files.extend(day_dir.glob("*.flac"))
        files.extend(day_dir.glob("*.m4a"))
        for ext in VIDEO_EXTENSIONS:
            files.extend(day_dir.glob(f"*{ext}"))

        talents_dir = day_dir / "talents"
        if talents_dir.is_dir():
            files.extend(talents_dir.glob("*.json"))
            files.extend(talents_dir.glob("*.md"))
            files.extend(talents_dir.glob("*/*.json"))
            files.extend(talents_dir.glob("*/*.md"))

        if not files:
            return 0.0
        return max(f.stat().st_mtime for f in files)

    def _load_day_cache(self, day: str, day_dir: Path) -> dict | None:
        """Load cached day stats if fresh."""
        cache_file = day_dir / "stats.json"
        if not cache_file.exists():
            return None

        try:
            cache_mtime = cache_file.stat().st_mtime
            day_mtime = self._get_day_mtime(day_dir)

            if cache_mtime > day_mtime:
                with open(cache_file, encoding="utf-8") as f:
                    return json.load(f)
        except Exception as e:
            logger.debug(f"Cache load failed for {day}: {e}")

        return None

    def _save_day_cache(self, day_dir: Path, stats: dict) -> None:
        """Save day stats to cache."""
        try:
            cache_file = day_dir / "stats.json"
            with open(cache_file, "w", encoding="utf-8") as f:
                json.dump(stats, f, indent=2)
        except Exception as e:
            logger.debug(f"Cache save failed: {e}")

    def _parse_timestamp(self, ts: str) -> float:
98 """Parse HH:MM:SS timestamp to seconds since midnight.""" 99 try: 100 h, m, s = ts.split(":") 101 return int(h) * 3600 + int(m) * 60 + int(s) 102 except Exception: 103 return 0.0 104 105 def _calculate_audio_duration(self, segments: list) -> float: 106 """Calculate audio duration from min/max timestamps.""" 107 timestamps = [seg.get("start") for seg in segments if seg.get("start")] 108 if not timestamps: 109 return 0.0 110 111 times_seconds = [self._parse_timestamp(t) for t in timestamps] 112 return max(times_seconds) - min(times_seconds) 113 114 def _calculate_percept_duration(self, frames: list) -> float: 115 """Calculate screen duration from min/max frame timestamps.""" 116 # Skip header (first element if it has no frame_id) 117 frame_timestamps = [ 118 f["timestamp"] for f in frames if "timestamp" in f and "frame_id" in f 119 ] 120 if not frame_timestamps: 121 return 0.0 122 123 return max(frame_timestamps) - min(frame_timestamps) 124 125 def _apply_day_stats(self, day: str, cached_data: dict) -> None: 126 """Apply cached day stats to instance state.""" 127 # Extract components from cache 128 stats = cached_data.get("stats", {}) 129 agent_data = cached_data.get("agent_data", {}) 130 heatmap_data = cached_data.get("heatmap_data", {}) 131 132 # Apply day stats 133 self.days[day] = stats 134 135 # Update totals (excluding per-day durations) 136 counts_for_totals = { 137 k: v 138 for k, v in stats.items() 139 if k not in ("transcript_duration", "percept_duration") 140 } 141 self.totals.update(counts_for_totals) 142 143 # Accumulate durations 144 self.total_transcript_duration += stats.get("transcript_duration", 0.0) 145 self.total_percept_duration += stats.get("percept_duration", 0.0) 146 147 # Apply agent data 148 day_agent_counts: Dict[str, int] = {} 149 for agent, data in agent_data.items(): 150 count = data.get("count", 0) 151 self.agent_counts[agent] += count 152 self.agent_minutes[agent] += data.get("minutes", 0.0) 153 if count > 0: 154 day_agent_counts[agent] = count 155 if day_agent_counts: 156 self.agent_counts_by_day[day] = day_agent_counts 157 158 # Apply facet data 159 facet_data = cached_data.get("facet_data", {}) 160 day_facet_counts: Dict[str, int] = {} 161 for facet, data in facet_data.items(): 162 count = data.get("count", 0) 163 self.facet_counts[facet] += count 164 self.facet_minutes[facet] += data.get("minutes", 0.0) 165 if count > 0: 166 day_facet_counts[facet] = count 167 if day_facet_counts: 168 self.facet_counts_by_day[day] = day_facet_counts 169 170 # Apply heatmap data 171 weekday = heatmap_data.get("weekday") 172 hours = heatmap_data.get("hours", {}) 173 if weekday is not None: 174 for hour_str, minutes in hours.items(): 175 hour = int(hour_str) 176 self.heatmap[weekday][hour] += minutes 177 178 def scan_day(self, day: str, path: str) -> dict: 179 """Scan a single day and return stats dict for caching.""" 180 stats: Counter[str] = Counter() 181 transcript_duration = 0.0 182 percept_duration = 0.0 183 day_dir = Path(path) 184 185 # Track agent data for cache 186 agent_data = {} 187 facet_data = {} 188 heatmap_hours = {} 189 190 # --- Transcript sessions --- 191 # Check segment subdirectories for transcript JSONL files (day/stream/segment/) 192 transcript_files = list(day_dir.glob("*/*/audio.jsonl")) 193 transcript_files.extend(day_dir.glob("*/*/*_audio.jsonl")) 194 transcript_files.extend(day_dir.glob("*/*/*_transcript.jsonl")) 195 for jsonl_file in sorted(set(transcript_files)): 196 stats["transcript_sessions"] += 1 197 198 try: 199 with open(jsonl_file, 
encoding="utf-8") as f: 200 lines = [line.strip() for line in f if line.strip()] 201 202 if not lines: 203 logger.debug(f"Empty transcript file: {jsonl_file}") 204 continue 205 206 # First line is metadata, rest are segments 207 segments = [] 208 for i, line in enumerate(lines[1:], start=2): 209 try: 210 segments.append(json.loads(line)) 211 except json.JSONDecodeError as e: 212 logger.debug(f"Invalid JSON at line {i} in {jsonl_file}: {e}") 213 continue 214 215 stats["transcript_segments"] += len(segments) 216 217 # Calculate duration from timestamps 218 if segments: 219 duration = self._calculate_audio_duration(segments) 220 transcript_duration += duration 221 222 except (OSError, IOError) as e: 223 logger.warning(f"Error reading transcript file {jsonl_file}: {e}") 224 except Exception as e: 225 logger.warning(f"Unexpected error processing {jsonl_file}: {e}") 226 227 # --- Screen sessions --- 228 # Check segment subdirectories for screen files (day/stream/segment/) 229 screen_files = list(day_dir.glob("*/*/screen.jsonl")) 230 screen_files.extend(day_dir.glob("*/*/*_screen.jsonl")) 231 for jsonl_file in sorted(screen_files): 232 stats["percept_sessions"] += 1 233 234 try: 235 frames = load_analysis_frames(jsonl_file) 236 if not frames: 237 logger.debug(f"No valid frames in: {jsonl_file}") 238 continue 239 240 # Count frames (excluding header) 241 frame_count = sum(1 for f in frames if "frame_id" in f) 242 stats["percept_frames"] += frame_count 243 244 # Calculate duration from timestamps 245 if frame_count > 0: 246 duration = self._calculate_percept_duration(frames) 247 percept_duration += duration 248 249 except (OSError, IOError) as e: 250 logger.warning(f"Error reading screen file {jsonl_file}: {e}") 251 except Exception as e: 252 logger.warning(f"Unexpected error processing {jsonl_file}: {e}") 253 254 # --- Pending segments (unprocessed media files) --- 255 sense_info = sense_scan_day(day_dir) 256 stats["pending_segments"] = sense_info["pending_segments"] 257 258 # --- Insight summaries --- 259 output_info = generate_scan_day(day) 260 stats["outputs_processed"] = len(output_info["processed"]) 261 stats["outputs_pending"] = len(output_info["repairable"]) 262 263 # --- Activities and heatmap from facets/*/activities/YYYYMMDD.jsonl --- 264 weekday = datetime.strptime(day, "%Y%m%d").weekday() 265 for facet_name, _facet_meta in get_facets().items(): 266 activities_file = ( 267 Path(get_journal()) 268 / "facets" 269 / facet_name 270 / "activities" 271 / f"{day}.jsonl" 272 ) 273 try: 274 records = load_activity_records(facet_name, day) 275 for record in records: 276 activity_type = record.get("activity") or "unknown" 277 segments = record.get("segments") or [] 278 if not segments: 279 continue 280 281 if activity_type not in agent_data: 282 agent_data[activity_type] = {"count": 0, "minutes": 0.0} 283 agent_data[activity_type]["count"] += 1 284 285 duration_minutes = float(estimate_duration_minutes(segments)) 286 agent_data[activity_type]["minutes"] += duration_minutes 287 288 if facet_name not in facet_data: 289 facet_data[facet_name] = {"count": 0, "minutes": 0.0} 290 facet_data[facet_name]["count"] += 1 291 facet_data[facet_name]["minutes"] += duration_minutes 292 293 # Build heatmap hours for this day 294 for seg in segments: 295 start, end = segment_parse(seg) 296 if start is None or end is None: 297 continue 298 299 start_sec = start.hour * 3600 + start.minute * 60 + start.second 300 end_sec = end.hour * 3600 + end.minute * 60 + end.second 301 cur = start_sec 302 while cur < end_sec: 303 
                            hour = cur // 3600
                            if hour >= 24:
                                break
                            next_tick = min((hour + 1) * 3600, end_sec)
                            minutes = (next_tick - cur) / 60
                            heatmap_hours[str(hour)] = (
                                heatmap_hours.get(str(hour), 0.0) + minutes
                            )
                            cur = next_tick
            except (OSError, IOError) as e:
                logger.warning(f"Error reading {activities_file}: {e}")

        # --- Disk usage ---
        stats["day_bytes"] = sum(
            f.stat().st_size for f in day_dir.rglob("*") if f.is_file()
        )

        # --- Build return dict ---
        stats["transcript_duration"] = transcript_duration
        stats["percept_duration"] = percept_duration

        return {
            "stats": dict(stats),
            # NOTE: agent_data keys are now activity types (e.g., "meeting",
            # "coding"), not extractor agent names. Key name retained for
            # cache-format compatibility.
            "agent_data": agent_data,
            "facet_data": facet_data,
            "heatmap_data": {"weekday": weekday, "hours": heatmap_hours},
        }

    def scan_all_tokens(self, journal_path: Path, use_cache: bool = True) -> None:
        """Scan all token usage files in the tokens directory.

        Reads daily *.jsonl files (one JSON object per line).
        """
        tokens_dir = journal_path / "tokens"
        if not tokens_dir.is_dir():
            return

        today = datetime.now(timezone.utc).strftime("%Y%m%d")

        # Scan JSONL files only
        for token_file in tokens_dir.glob("*.jsonl"):
            day = token_file.stem
            cache_file = token_file.parent / f"{day}.tokens_cache.json"

            if use_cache and day != today and cache_file.exists():
                try:
                    if cache_file.stat().st_mtime > token_file.stat().st_mtime:
                        with open(cache_file, encoding="utf-8") as f:
                            cached = json.load(f)
                        self.token_usage[day] = cached
                        for model, counts in cached.items():
                            if model not in self.token_totals:
                                self.token_totals[model] = {}
                            for token_type, count in counts.items():
                                if token_type not in self.token_totals[model]:
                                    self.token_totals[model][token_type] = 0
                                self.token_totals[model][token_type] += count
                        continue
                except Exception as e:
                    logger.debug(f"Token cache load failed for {token_file}: {e}")

            try:
                with open(token_file, "r", encoding="utf-8") as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            data = json.loads(line)
                            self._process_token_entry(data)
                        except json.JSONDecodeError as e:
                            logger.debug(f"Invalid JSON in {token_file}: {e}")
                            continue

            except (OSError, IOError) as e:
                logger.warning(f"Error reading token file {token_file}: {e}")
                continue

            if use_cache and day != today:
                try:
                    with open(cache_file, "w", encoding="utf-8") as f:
                        json.dump(self.token_usage.get(day, {}), f)
                except Exception as e:
                    logger.debug(f"Token cache save failed for {token_file}: {e}")

    def _process_token_entry(self, data: dict) -> None:
        """Process a single token usage entry (expects normalized format)."""
        # Extract date from timestamp
        timestamp = data.get("timestamp")
        if not timestamp:
            return

        # Use UTC for consistent date extraction (timestamps are in UTC from time.time())
        file_date = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime(
            "%Y%m%d"
        )
        model = data.get("model", "unknown")
        usage = data.get("usage", {})

        # Initialize day's token usage if not exists
        if file_date not in self.token_usage:
            self.token_usage[file_date] = {}

        # Initialize model entry if not exists
        if model not in self.token_usage[file_date]:
            self.token_usage[file_date][model] = {}
        if model not in self.token_totals:
            self.token_totals[model] = {}

        # Add token counts (all fields are already normalized by migration)
        for token_type, count in usage.items():
            if not isinstance(count, int):
                continue

            # Add to day's model totals
            if token_type not in self.token_usage[file_date][model]:
                self.token_usage[file_date][model][token_type] = 0
            self.token_usage[file_date][model][token_type] += count

            # Add to overall model totals
            if token_type not in self.token_totals[model]:
                self.token_totals[model][token_type] = 0
            self.token_totals[model][token_type] += count

    def scan(self, journal: str, verbose: bool = False, use_cache: bool = True) -> None:
        days_map = day_dirs()
        sorted_days = sorted(days_map.items())
        cache_hits = 0
        cache_misses = 0

        for idx, (day, path) in enumerate(sorted_days, 1):
            if not os.path.isdir(path):
                continue

            day_dir = Path(path)

            # Try cache first
            cached_data = None
            if use_cache:
                cached_data = self._load_day_cache(day, day_dir)

            if cached_data:
                # Cache hit - apply cached data
                self._apply_day_stats(day, cached_data)
                cache_hits += 1
                if verbose:
                    print(
                        f"[{idx}/{len(sorted_days)}] {day} (cached)",
                        end="\r",
                        flush=True,
                    )
            else:
                # Cache miss - scan and save
                cache_misses += 1
                if verbose:
                    print(
                        f"[{idx}/{len(sorted_days)}] Scanning {day}...",
                        end="\r",
                        flush=True,
                    )
                day_data = self.scan_day(day, path)
                self._apply_day_stats(day, day_data)

                if use_cache:
                    self._save_day_cache(day_dir, day_data)

        # Scan tokens directory once after all days are processed
        self.scan_all_tokens(Path(journal), use_cache=use_cache)

        if verbose:
            cache_status = (
                f" (cache: {cache_hits} hits, {cache_misses} misses)"
                if use_cache
                else ""
            )
            logger.info(
                f"Scanned {len(self.days)} days, "
                f"{self.totals.get('transcript_sessions', 0)} transcript sessions, "
                f"{self.totals.get('percept_sessions', 0)} percept sessions"
                f"{cache_status}"
            )

    def to_dict(self) -> dict:
        """Return a dictionary with all collected statistics."""
        days = {
            day: {field: stats.get(field, 0) for field in DAY_FIELDS}
            for day, stats in self.days.items()
        }
        return {
            "schema_version": SCHEMA_VERSION,
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "day_count": len(self.days),
            "days": days,
            "totals": {
                **dict(self.totals),
                "total_transcript_duration": self.total_transcript_duration,
                "total_percept_duration": self.total_percept_duration,
            },
            "heatmap": self.heatmap,
            "tokens": {
                "by_day": self.token_usage,
                "by_model": self.token_totals,
            },
            "talents": {
                "counts": dict(self.agent_counts),
                "minutes": {k: round(v, 2) for k, v in self.agent_minutes.items()},
                "counts_by_day": self.agent_counts_by_day,
            },
            "facets": {
                "counts": dict(self.facet_counts),
                "minutes": {k: round(v, 2) for k, v in self.facet_minutes.items()},
                "counts_by_day": self.facet_counts_by_day,
            },
        }

    def save_json(self, journal: str) -> None:
        """Write full statistics to ``stats.json`` in ``journal``."""
        data = self.to_dict()
        errors = validate_stats(data)
        if errors:
            raise ValueError(f"Stats validation failed: {'; '.join(errors)}")
        path = os.path.join(journal, "stats.json")
        with open(path, "w", encoding="utf-8") as f:
encoding="utf-8") as f: 527 json.dump(data, f, indent=2) 528 529 530def main() -> None: 531 parser = argparse.ArgumentParser( 532 description="Scan a solstone journal and generate statistics" 533 ) 534 parser.add_argument( 535 "--no-cache", 536 action="store_true", 537 help="Disable per-day caching (force re-scan all days)", 538 ) 539 args = setup_cli(parser) 540 journal = get_journal() 541 542 js = JournalStats() 543 js.scan(journal, verbose=args.verbose, use_cache=not args.no_cache) 544 545 try: 546 js.save_json(journal) 547 logger.info(f"Statistics saved to {journal}/stats.json") 548 except Exception as e: 549 logger.error(f"Error writing stats.json: {e}") 550 raise 551 552 553if __name__ == "__main__": 554 main()