a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 286 lines 9.9 kB view raw
"""Show which bot version created each memory record.

Cross-references turbopuffer record timestamps against Fly.io deployment history
and git tags to attribute each record to a bot version.

Usage:
    uv run scripts/memory_versions.py                 # list user namespaces
    uv run scripts/memory_versions.py --all           # all user namespaces
    uv run scripts/memory_versions.py USER_HANDLE     # specific user
    uv run scripts/memory_versions.py --summary       # version counts only
    uv run scripts/memory_versions.py --episodic      # episodic memories
"""

import argparse
import json
import subprocess
import sys
from datetime import datetime, timezone

from turbopuffer import Turbopuffer

from bot.config import settings


def get_client() -> Turbopuffer:
    """Build a turbopuffer client from the API key and region in bot settings."""
    return Turbopuffer(api_key=settings.turbopuffer_api_key, region=settings.turbopuffer_region)


def _parse_fly_timestamp(value: str) -> datetime:
    """Parse a Fly.io ``CreatedAt`` string, accepting a trailing ``Z`` as UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def get_deploy_windows() -> list[dict]:
    """Build version windows from Fly.io releases and git tags.

    Shells out to ``fly releases`` and ``git tag``. Exits the process with
    status 1 if the fly command fails; a failing git command only produces a
    warning, and every window is then labelled ``"pre-tags"``.

    Returns a list of {start, end, fly_version, git_tag} dicts sorted by start.
    """
    # fly.io releases
    result = subprocess.run(
        ["fly", "releases", "-a", "zzstoatzz-phi", "--json"],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        print(f"error fetching fly releases: {result.stderr}", file=sys.stderr)
        sys.exit(1)

    releases = json.loads(result.stdout) or []
    # sort on the parsed instant rather than the raw string, so ordering does
    # not depend on every timestamp sharing one textual format
    timeline = sorted(
        ((_parse_fly_timestamp(rel["CreatedAt"]), rel) for rel in releases),
        key=lambda pair: pair[0],
    )

    # git tags with timestamps
    result = subprocess.run(
        ["git", "tag", "-l", "v*", "--format=%(creatordate:iso-strict) %(refname:short)"],
        capture_output=True, text=True, cwd=".",
    )
    if result.returncode != 0:
        # best-effort: keep going without tags, but say so instead of silently
        # attributing everything to "pre-tags"
        print(f"warning: could not list git tags: {result.stderr}", file=sys.stderr)

    tag_times: dict[str, datetime] = {}
    for line in result.stdout.strip().splitlines():
        if not line.strip():
            continue
        # format: "2026-03-25T01:21:17-05:00 v0.0.8"
        parts = line.strip().split(maxsplit=1)
        if len(parts) == 2:
            tag_times[parts[1]] = datetime.fromisoformat(parts[0])

    # newest-first list of (utc timestamp, tag), computed once for all releases
    tags_newest_first = sorted(
        ((tag_ts.astimezone(timezone.utc), tag) for tag, tag_ts in tag_times.items()),
        key=lambda pair: pair[0],
        reverse=True,
    )

    # build windows: each release's window is [its start, next release's start)
    windows = []
    for i, (start, rel) in enumerate(timeline):
        if i + 1 < len(timeline):
            end = timeline[i + 1][0]
        else:
            # the latest release is still live, so its window runs until now
            end = datetime.now(timezone.utc)

        # find the most recent git tag at or before this deploy
        matching_tag = next(
            (tag for tag_utc, tag in tags_newest_first if tag_utc <= start),
            None,
        )

        windows.append({
            "start": start,
            "end": end,
            "fly_version": rel["Version"],
            "git_tag": matching_tag or "pre-tags",
        })

    return windows


def classify_record(created_at: str, windows: list[dict]) -> dict:
    """Find which deploy window a record's created_at falls into.

    Returns a {fly_version, git_tag} dict. Both values are "?" when the
    timestamp is missing, unparseable, or not covered by any window. A
    timestamp before the first window is labelled "pre-deploy". Naive
    timestamps are treated as UTC.
    """
    unknown = {"fly_version": "?", "git_tag": "?"}
    if not created_at:
        return unknown

    try:
        ts = datetime.fromisoformat(created_at)
    except (TypeError, ValueError):
        # TypeError covers a non-string value stored in the attribute
        return unknown
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)

    for w in windows:
        if w["start"] <= ts < w["end"]:
            return {"fly_version": w["fly_version"], "git_tag": w["git_tag"]}

    # before earliest deploy
    if windows and ts < windows[0]["start"]:
        return {"fly_version": f"<{windows[0]['fly_version']}", "git_tag": "pre-deploy"}

    return unknown


def dump_with_versions(client: Turbopuffer, handle: str, windows: list[dict], summary_only: bool = False) -> None:
    """Dump records for a user, annotated with bot version.

    Queries the ``phi-users-<handle>`` namespace (``.`` and ``-`` in the handle
    become ``_``, ``@`` is dropped) with ``top_k=200`` and prints either
    per-version counts by kind (``summary_only``) or one entry per record in
    ``created_at`` order.
    """
    clean = handle.replace(".", "_").replace("@", "").replace("-", "_")
    ns_name = f"phi-users-{clean}"
    ns = client.namespace(ns_name)

    try:
        response = ns.query(
            rank_by=("vector", "ANN", [0.5] * 1536),
            top_k=200,
            include_attributes=["kind", "content", "tags", "created_at"],
        )
    except Exception as e:
        if "was not found" in str(e):
            print(f"no namespace found for @{handle}")
            return
        if "attribute" in str(e) and "not found" in str(e):
            # a requested attribute is missing from this namespace: retry
            # asking for whatever attributes the rows do have
            response = ns.query(
                rank_by=("vector", "ANN", [0.5] * 1536),
                top_k=200,
                include_attributes=True,
            )
        else:
            raise

    if not response.rows:
        print(f"no rows for @{handle}")
        return

    # classify each record
    records = []
    for row in response.rows:
        # "or" fallbacks: an attribute may be absent or null (notably after the
        # include_attributes=True retry), and the slicing and sorting below
        # need strings
        created_at = getattr(row, "created_at", "") or ""
        version_info = classify_record(created_at, windows)
        records.append({
            "id": row.id,
            "kind": getattr(row, "kind", "?"),
            "content": getattr(row, "content", "") or "",
            "tags": getattr(row, "tags", []),
            "created_at": created_at,
            **version_info,
        })

    if summary_only:
        print(f"\n@{handle} ({len(records)} records)")
        counts: dict[str, dict[str, int]] = {}
        for r in records:
            per_kind = counts.setdefault(r["git_tag"], {})
            per_kind[r["kind"]] = per_kind.get(r["kind"], 0) + 1
        for label in sorted(counts):
            kinds = ", ".join(f"{k}={v}" for k, v in sorted(counts[label].items()))
            print(f" {label:<15} {kinds}")
        return

    print(f"\n{'='*70}")
    print(f"@{handle} ({len(records)} records)")
    print(f"{'='*70}\n")

    for r in sorted(records, key=lambda x: x["created_at"]):
        kind = r["kind"]
        content = r["content"][:90].replace("\n", " ")
        tags = f" [{', '.join(r['tags'])}]" if r["tags"] else ""
        version = f"{r['git_tag']} (fly v{r['fly_version']})"
        print(f" {version:<25} ({kind:<11}) {content}{tags}")
        print(f" {'':25} [{r['id']}] {r['created_at']}")
        print()


def dump_episodic_with_versions(client: Turbopuffer, windows: list[dict], summary_only: bool = False) -> None:
    """Dump episodic memories annotated with bot version.

    Queries the ``phi-episodic`` namespace with ``top_k=200`` and prints either
    a record count per git tag (``summary_only``) or the records grouped by
    git tag, each group in ``created_at`` order.
    """
    ns = client.namespace("phi-episodic")

    try:
        response = ns.query(
            rank_by=("vector", "ANN", [0.5] * 1536),
            top_k=200,
            include_attributes=["content", "tags", "source", "created_at"],
        )
    except Exception as e:
        if "was not found" in str(e):
            print("no episodic memories found")
            return
        raise

    if not response.rows:
        print("no episodic memories")
        return

    records = []
    for row in response.rows:
        # same null/absent guards as the per-user dump
        created_at = getattr(row, "created_at", "") or ""
        version_info = classify_record(created_at, windows)
        records.append({
            "id": row.id,
            "content": getattr(row, "content", "") or "",
            "tags": getattr(row, "tags", []),
            "source": getattr(row, "source", "unknown"),
            "created_at": created_at,
            **version_info,
        })

    if summary_only:
        print(f"\nepisodic ({len(records)} records)")
        counts: dict[str, int] = {}
        for r in records:
            counts[r["git_tag"]] = counts.get(r["git_tag"], 0) + 1
        for tag in sorted(counts):
            print(f" {tag:<15} {counts[tag]} records")
        return

    print(f"\n{'='*60}")
    print(f"episodic memories ({len(records)} records)")
    print(f"{'='*60}\n")

    by_version: dict[str, list[dict]] = {}
    for r in records:
        by_version.setdefault(r["git_tag"], []).append(r)

    for tag in sorted(by_version):
        group = sorted(by_version[tag], key=lambda x: x["created_at"])
        # one git tag can cover several fly releases; list every distinct one
        # (in first-seen order) rather than only the first record's version
        fly_versions = ", ".join(
            f"v{v}" for v in dict.fromkeys(r["fly_version"] for r in group)
        )
        print(f"--- {tag} (fly {fly_versions}) ---\n")
        for r in group:
            content = r["content"][:100].replace("\n", " ")
            tags = f" [{', '.join(r['tags'])}]" if r["tags"] else ""
            print(f" [{r['id']}] {content}{tags}")
            print(f" source: {r['source']} {r['created_at']}")
            print()


def _list_user_namespaces(client: Turbopuffer, prefix: str) -> list:
    """Return the namespaces whose id starts with ``prefix``, sorted by id."""
    # NOTE(review): only the first result of client.namespaces() is read here;
    # confirm whether the client paginates for large accounts.
    page = client.namespaces(prefix=prefix)
    return sorted(
        (ns for ns in page.namespaces if ns.id.startswith(prefix)),
        key=lambda n: n.id,
    )


def main() -> None:
    """Parse CLI arguments and dispatch to the matching dump or listing mode."""
    parser = argparse.ArgumentParser(description="Show which bot version created each memory")
    parser.add_argument("handle", nargs="?", help="User handle to inspect")
    parser.add_argument("--summary", action="store_true", help="Version counts only")
    parser.add_argument("--episodic", action="store_true", help="Episodic memories")
    parser.add_argument("--all", action="store_true", help="All user namespaces")
    args = parser.parse_args()

    client = get_client()
    windows = get_deploy_windows()

    if args.episodic:
        dump_episodic_with_versions(client, windows, args.summary)
        return

    if args.handle:
        dump_with_versions(client, args.handle, windows, args.summary)
        return

    prefix = "phi-users-"
    user_ns = _list_user_namespaces(client, prefix)

    if args.all or args.summary:
        for ns in user_ns:
            # "_" -> "." is a display guess (the namespace id cannot tell
            # dots from hyphens), but dump_with_versions maps it back to the
            # same namespace id
            handle = ns.id.removeprefix(prefix).replace("_", ".")
            dump_with_versions(client, handle, windows, args.summary)
        return

    # default: list namespaces
    if not user_ns:
        print("no user namespaces found")
        return
    print(f"found {len(user_ns)} user namespaces:\n")
    for ns in user_ns:
        handle = ns.id.removeprefix(prefix).replace("_", ".")
        print(f" {handle:<40} ({ns.id})")
    print("\nuse: uv run scripts/memory_versions.py HANDLE")
    print(" or: uv run scripts/memory_versions.py --all --summary")


if __name__ == "__main__":
    main()