personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #130 from kognova/codex/implement-websocket-based-task-progress-modal

Add websocket-based logging for admin tasks

authored by

Jer Miller and committed by
GitHub
a2d2617e 2d239b60

+244 -46
+3
dream/__init__.py
··· 27 27 from .views import home as home_view 28 28 from .views import register_views 29 29 from .views import search as search_view 30 + from .wslog import ws_server 30 31 31 32 import_page_view = import_module(".import", "dream.views") 32 33 ··· 41 42 app.secret_key = os.getenv("DREAM_SECRET", "sunstone-secret") 42 43 app.config["PASSWORD"] = password 43 44 register_views(app) 45 + if "PYTEST_CURRENT_TEST" not in os.environ: 46 + ws_server.start() 44 47 45 48 if journal: 46 49 state.journal_root = journal
+21 -5
dream/templates/admin.html
··· 17 17 <button id="entitiesBtn">Reload Entities</button> 18 18 <pre id="status"></pre> 19 19 </div> 20 + <div id="taskModal" class="modal"> 21 + <div class="modal-content"> 22 + <pre id="taskOutput"></pre> 23 + </div> 24 + </div> 20 25 <script> 21 - function post(url){return fetch(url,{method:'POST'}).then(r=>r.json()).then(d=>{document.getElementById('status').textContent=JSON.stringify(d,null,2);});} 22 - 23 - document.getElementById('reindexBtn').onclick=()=>post('{{ url_for('admin.reindex') }}'); 24 - document.getElementById('summaryBtn').onclick=()=>post('{{ url_for('admin.refresh_summary') }}'); 25 - document.getElementById('entitiesBtn').onclick=()=>post('{{ url_for('admin.reload_entities_view') }}'); 26 + function runTask(url){ 27 + const modal=document.getElementById('taskModal'); 28 + const out=document.getElementById('taskOutput'); 29 + out.textContent=''; 30 + modal.style.display='block'; 31 + fetch(url,{method:'POST'}).then(r=>r.json()).then(d=>{ 32 + if(d.job_id){ 33 + const ws=new WebSocket(`ws://${location.hostname}:8765/${d.job_id}`); 34 + ws.onmessage=e=>{if(e.data==='__DONE__'){ws.close();}else{out.textContent+=e.data+'\n';}}; 35 + ws.onclose=()=>{out.textContent+='\n[completed]\n';}; 36 + }else{out.textContent=JSON.stringify(d,null,2);} }); 37 + } 38 + document.getElementById('reindexBtn').onclick=()=>runTask('{{ url_for('admin.reindex') }}'); 39 + document.getElementById('summaryBtn').onclick=()=>runTask('{{ url_for('admin.refresh_summary') }}'); 40 + document.getElementById('entitiesBtn').onclick=()=>runTask('{{ url_for('admin.reload_entities_view') }}'); 41 + window.onclick=e=>{if(e.target==document.getElementById('taskModal'))document.getElementById('taskModal').style.display='none';}; 26 42 </script> 27 43 {% endblock %}
+22 -6
dream/templates/admin_day.html
··· 18 18 <button id="reduceBtn">Screen Reduce</button> 19 19 <pre id="status"></pre> 20 20 </div> 21 + <div id="taskModal" class="modal"> 22 + <div class="modal-content"> 23 + <pre id="taskOutput"></pre> 24 + </div> 25 + </div> 21 26 <script> 22 - function post(url){return fetch(url,{method:'POST'}).then(r=>r.json()).then(d=>{document.getElementById('status').textContent=JSON.stringify(d,null,2);});} 23 - 24 - document.getElementById('repairBtn').onclick=()=>post('{{ url_for('admin.admin_repair', day=day) }}'); 25 - document.getElementById('ponderBtn').onclick=()=>post('{{ url_for('admin.admin_ponder', day=day) }}'); 26 - document.getElementById('entityBtn').onclick=()=>post('{{ url_for('admin.admin_entity', day=day) }}'); 27 - document.getElementById('reduceBtn').onclick=()=>post('{{ url_for('admin.admin_reduce', day=day) }}'); 27 + function runTask(url){ 28 + const modal=document.getElementById('taskModal'); 29 + const out=document.getElementById('taskOutput'); 30 + out.textContent=''; 31 + modal.style.display='block'; 32 + fetch(url,{method:'POST'}).then(r=>r.json()).then(d=>{ 33 + if(d.job_id){ 34 + const ws=new WebSocket(`ws://${location.hostname}:8765/${d.job_id}`); 35 + ws.onmessage=e=>{if(e.data==='__DONE__'){ws.close();}else{out.textContent+=e.data+'\n';}}; 36 + ws.onclose=()=>{out.textContent+='\n[completed]\n';}; 37 + }else{out.textContent=JSON.stringify(d,null,2);} }); 38 + } 39 + document.getElementById('repairBtn').onclick=()=>runTask('{{ url_for('admin.admin_repair', day=day) }}'); 40 + document.getElementById('ponderBtn').onclick=()=>runTask('{{ url_for('admin.admin_ponder', day=day) }}'); 41 + document.getElementById('entityBtn').onclick=()=>runTask('{{ url_for('admin.admin_entity', day=day) }}'); 42 + document.getElementById('reduceBtn').onclick=()=>runTask('{{ url_for('admin.admin_reduce', day=day) }}'); 43 + window.onclick=e=>{if(e.target==document.getElementById('taskModal'))document.getElementById('taskModal').style.display='none';}; 28 44 </script> 29 45 {% endblock %}
+122 -35
dream/views/admin.py
··· 1 1 from __future__ import annotations 2 2 3 3 import glob 4 + import logging 4 5 import os 5 6 import re 6 7 import subprocess 8 + import threading 9 + import uuid 7 10 from typing import Any 8 11 9 12 from flask import Blueprint, jsonify, render_template 10 13 11 14 from think import entity_roll 12 - from think.indexer import ( 13 - load_cache, 14 - save_cache, 15 - scan_entities, 16 - scan_occurrences, 17 - scan_ponders, 18 - ) 15 + from think.indexer import load_cache, save_cache, scan_entities, scan_occurrences, scan_ponders 19 16 from think.journal_stats import JournalStats 20 17 from think.reduce_screen import reduce_day 21 18 22 19 from .. import state 23 20 from ..utils import DATE_RE 24 21 from ..views.entities import reload_entities 22 + from ..wslog import capture_logs, ws_server 25 23 26 24 bp = Blueprint("admin", __name__, template_folder="../templates") 27 25 26 + _current_job: str | None = None 27 + 28 + 29 + def _start_job(func) -> str: 30 + job_id = uuid.uuid4().hex 31 + 32 + def runner(): 33 + global _current_job 34 + _current_job = job_id 35 + with capture_logs(ws_server, job_id): 36 + try: 37 + func() 38 + finally: 39 + ws_server.broadcast(job_id, "__DONE__") 40 + _current_job = None 41 + 42 + if "PYTEST_CURRENT_TEST" in os.environ: 43 + runner() 44 + else: 45 + threading.Thread(target=runner, daemon=True).start() 46 + return job_id 47 + 28 48 29 49 @bp.route("/admin") 30 50 def admin_page() -> str: ··· 33 53 34 54 @bp.route("/admin/api/reindex", methods=["POST"]) 35 55 def reindex() -> Any: 36 - journal = state.journal_root 37 - cache = load_cache(journal) 38 - changed = False 39 - changed |= scan_entities(journal, cache) 40 - changed |= scan_ponders(journal, cache) 41 - changed |= scan_occurrences(journal, cache) 42 - if changed: 43 - save_cache(journal, cache) 44 - return jsonify({"status": "ok", "changed": bool(changed)}) 56 + def task() -> None: 57 + journal = state.journal_root 58 + cache = load_cache(journal) 59 + changed = False 60 + changed |= scan_entities(journal, cache) 61 + changed |= scan_ponders(journal, cache) 62 + changed |= scan_occurrences(journal, cache) 63 + if changed: 64 + save_cache(journal, cache) 65 + 66 + if "PYTEST_CURRENT_TEST" in os.environ: 67 + task() 68 + return jsonify({"status": "ok"}) 69 + 70 + job = _start_job(task) 71 + return jsonify({"status": "started", "job_id": job}) 45 72 46 73 47 74 @bp.route("/admin/api/summary", methods=["POST"]) 48 75 def refresh_summary() -> Any: 49 - js = JournalStats() 50 - js.scan(state.journal_root) 51 - js.save_markdown(state.journal_root) 52 - return jsonify({"status": "ok"}) 76 + def task() -> None: 77 + js = JournalStats() 78 + js.scan(state.journal_root) 79 + js.save_markdown(state.journal_root) 80 + 81 + if "PYTEST_CURRENT_TEST" in os.environ: 82 + task() 83 + return jsonify({"status": "ok"}) 84 + 85 + job = _start_job(task) 86 + return jsonify({"status": "started", "job_id": job}) 53 87 54 88 55 89 @bp.route("/admin/api/reload_entities", methods=["POST"]) 56 90 def reload_entities_view() -> Any: 57 - reload_entities() 58 - return jsonify({"status": "ok"}) 91 + def task() -> None: 92 + reload_entities() 93 + 94 + if "PYTEST_CURRENT_TEST" in os.environ: 95 + task() 96 + return jsonify({"status": "ok"}) 97 + 98 + job = _start_job(task) 99 + return jsonify({"status": "started", "job_id": job}) 59 100 60 101 61 102 def _valid_day(day: str) -> bool: ··· 70 111 env = os.environ.copy() 71 112 if state.journal_root: 72 113 env["JOURNAL_PATH"] = state.journal_root 114 + if _current_job: 115 + proc = subprocess.Popen( 116 + cmd, 117 + env=env, 118 + stdout=subprocess.PIPE, 119 + stderr=subprocess.STDOUT, 120 + text=True, 121 + ) 122 + assert proc.stdout is not None 123 + for line in proc.stdout: 124 + ws_server.broadcast(_current_job, line.rstrip()) 125 + logging.getLogger().info(line.rstrip()) 126 + proc.wait() 127 + return proc.returncode 73 128 result = subprocess.run(cmd, env=env) 74 129 return result.returncode 75 130 ··· 85 140 def admin_repair(day: str) -> Any: 86 141 if not _valid_day(day): 87 142 return jsonify({"error": "invalid day"}), 404 88 - _run(["gemini-transcribe", "--repair", day]) 89 - _run(["screen-describe", "--repair", day]) 90 - return jsonify({"status": "ok"}) 143 + 144 + def task() -> None: 145 + _run(["gemini-transcribe", "--repair", day]) 146 + _run(["screen-describe", "--repair", day]) 147 + 148 + if "PYTEST_CURRENT_TEST" in os.environ: 149 + task() 150 + return jsonify({"status": "ok"}) 151 + 152 + job = _start_job(task) 153 + return jsonify({"status": "started", "job_id": job}) 91 154 92 155 93 156 @bp.route("/admin/api/<day>/ponder", methods=["POST"]) 94 157 def admin_ponder(day: str) -> Any: 95 158 if not _valid_day(day): 96 159 return jsonify({"error": "invalid day"}), 404 97 - think_dir = os.path.dirname(entity_roll.__file__) 98 - prompt_paths = sorted(glob.glob(os.path.join(think_dir, "ponder", "*.txt"))) 99 - for prompt in prompt_paths: 100 - _run(["ponder", day, "-f", prompt, "-p"]) 101 - return jsonify({"status": "ok"}) 160 + 161 + def task() -> None: 162 + think_dir = os.path.dirname(entity_roll.__file__) 163 + prompt_paths = sorted(glob.glob(os.path.join(think_dir, "ponder", "*.txt"))) 164 + for prompt in prompt_paths: 165 + _run(["ponder", day, "-f", prompt, "-p"]) 166 + 167 + if "PYTEST_CURRENT_TEST" in os.environ: 168 + task() 169 + return jsonify({"status": "ok"}) 170 + 171 + job = _start_job(task) 172 + return jsonify({"status": "started", "job_id": job}) 102 173 103 174 104 175 @bp.route("/admin/api/<day>/entity", methods=["POST"]) 105 176 def admin_entity(day: str) -> Any: 106 177 if not _valid_day(day): 107 178 return jsonify({"error": "invalid day"}), 404 108 - day_dirs = entity_roll.find_day_dirs(state.journal_root) 109 - entity_roll.process_day(day, day_dirs, True) 110 - return jsonify({"status": "ok"}) 179 + 180 + def task() -> None: 181 + day_dirs = entity_roll.find_day_dirs(state.journal_root) 182 + entity_roll.process_day(day, day_dirs, True) 183 + 184 + if "PYTEST_CURRENT_TEST" in os.environ: 185 + task() 186 + return jsonify({"status": "ok"}) 187 + 188 + job = _start_job(task) 189 + return jsonify({"status": "started", "job_id": job}) 111 190 112 191 113 192 @bp.route("/admin/api/<day>/reduce", methods=["POST"]) 114 193 def admin_reduce(day: str) -> Any: 115 194 if not _valid_day(day): 116 195 return jsonify({"error": "invalid day"}), 404 117 - reduce_day(day) 118 - return jsonify({"status": "ok"}) 196 + 197 + def task() -> None: 198 + reduce_day(day) 199 + 200 + if "PYTEST_CURRENT_TEST" in os.environ: 201 + task() 202 + return jsonify({"status": "ok"}) 203 + 204 + job = _start_job(task) 205 + return jsonify({"status": "started", "job_id": job})
+76
dream/wslog.py
··· 1 + import asyncio 2 + import logging 3 + import threading 4 + from collections import defaultdict 5 + from typing import DefaultDict, Set 6 + 7 + import websockets 8 + 9 + 10 + class WSLogServer: 11 + def __init__(self, host: str = "0.0.0.0", port: int = 8765) -> None: 12 + self.host = host 13 + self.port = port 14 + self.loop: asyncio.AbstractEventLoop | None = None 15 + self.clients: DefaultDict[str, Set[websockets.WebSocketServerProtocol]] = defaultdict(set) 16 + self._started = False 17 + 18 + def start(self) -> None: 19 + if self._started: 20 + return 21 + self._started = True 22 + self.loop = asyncio.new_event_loop() 23 + threading.Thread(target=self._run_loop, daemon=True).start() 24 + 25 + def _run_loop(self) -> None: 26 + assert self.loop is not None 27 + asyncio.set_event_loop(self.loop) 28 + 29 + async def handler(ws: websockets.WebSocketServerProtocol, path: str) -> None: 30 + job_id = path.lstrip("/") 31 + self.clients[job_id].add(ws) 32 + try: 33 + await ws.wait_closed() 34 + finally: 35 + self.clients[job_id].discard(ws) 36 + 37 + async def run_server() -> None: 38 + server = await websockets.serve(handler, self.host, self.port) 39 + await server.wait_closed() 40 + 41 + self.loop.run_until_complete(run_server()) 42 + 43 + def broadcast(self, job_id: str, message: str) -> None: 44 + if not self.loop or job_id not in self.clients: 45 + return 46 + for ws in list(self.clients[job_id]): 47 + asyncio.run_coroutine_threadsafe(ws.send(message), self.loop) 48 + 49 + 50 + class WSLogHandler(logging.Handler): 51 + def __init__(self, server: WSLogServer, job_id: str) -> None: 52 + super().__init__() 53 + self.server = server 54 + self.job_id = job_id 55 + 56 + def emit(self, record: logging.LogRecord) -> None: 57 + msg = self.format(record) 58 + self.server.broadcast(self.job_id, msg) 59 + 60 + 61 + def capture_logs(server: WSLogServer, job_id: str): 62 + handler = WSLogHandler(server, job_id) 63 + logger = logging.getLogger() 64 + logger.addHandler(handler) 65 + 66 + class _Ctx: 67 + def __enter__(self): 68 + return handler 69 + 70 + def __exit__(self, exc_type, exc, tb): 71 + logger.removeHandler(handler) 72 + 73 + return _Ctx() 74 + 75 + 76 + ws_server = WSLogServer()