ive harnessed the harness
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

update memory to use sqlite-vec, store turns, restore on daemon restart

dawn 6062e8f7 bb153323

+239 -73
+15 -2
src/agent.rs
··· 24 24 let mut ctx = Context::new(&config.anchor); 25 25 let mut turn_count = 0usize; 26 26 27 + // resume: replay the sliding window from the last run 28 + if let Ok(prior) = memory.recent_turns(config.compaction_keep) { 29 + let pairs: Vec<(String, String)> = prior.into_iter().map(|(_, r, c)| (r, c)).collect(); 30 + ctx.load_turns(&pairs); 31 + turn_count = pairs.len(); 32 + } 33 + 27 34 while let Some(interrupt) = rx.recv().await { 28 35 let text = interrupt.to_text().to_string(); 29 36 let source = interrupt.source_tag().to_string(); ··· 32 39 .embed(&text) 33 40 .await 34 41 .ok() 35 - .and_then(|emb| memory.retrieve(&emb, config.memory_top_k).ok()) 42 + .and_then(|emb| memory.top_k(&emb, config.memory_top_k).ok()) 36 43 .unwrap_or_default() 37 44 .into_iter() 38 - .map(|m| m.content) 45 + .filter(|(dist, _)| *dist < config.memory_sim_threshold) 46 + .map(|(_, m)| m.content) 39 47 .collect(); 40 48 41 49 if !memories.is_empty() { ··· 49 57 } 50 58 51 59 ctx.push_user(&source, &text, &memories); 60 + let _ = memory.log_turn("user", &text, None); 52 61 turn_count += 1; 53 62 54 63 let (tok_tx, mut tok_rx) = mpsc::channel(256); ··· 59 68 }); 60 69 61 70 let mut response = String::new(); 71 + let mut thinking = String::new(); 62 72 while let Some((is_think, tok)) = tok_rx.recv().await { 63 73 if is_think { 74 + thinking.push_str(&tok); 64 75 let _ = output.send(ServerMsg::ThinkToken { content: tok }); 65 76 } else { 66 77 response.push_str(&tok); ··· 69 80 } 70 81 71 82 ctx.push_assistant(&response); 83 + let thinking_ref = (!thinking.is_empty()).then_some(thinking.as_str()); 84 + let _ = memory.log_turn("assistant", &response, thinking_ref); 72 85 turn_count += 1; 73 86 74 87 let _ = output.send(ServerMsg::Done);
+4
src/config.rs
··· 12 12 pub compaction_keep: usize, 13 13 /// memories to inject per turn 14 14 pub memory_top_k: usize, 15 + /// cosine distance cutoff — only inject memories below this (0=identical, 2=opposite). 16 + /// 0.3 ≈ cosine similarity ≥ 0.7, a reasonable bar for nomic-embed. 17 + pub memory_sim_threshold: f32, 15 18 pub db_path: String, 16 19 pub anchor: String, 17 20 } ··· 32 35 watermark_chars: 48_000, 33 36 compaction_keep: 10, 34 37 memory_top_k: 3, 38 + memory_sim_threshold: 0.3, 35 39 db_path: "agent.db".into(), 36 40 anchor: ANCHOR.into(), 37 41 }
+10
src/context.rs
··· 15 15 } 16 16 } 17 17 18 + /// replay persisted turns into context on startup 19 + pub fn load_turns(&mut self, turns: &[(String, String)]) { 20 + for (role, content) in turns { 21 + self.turns.push(Message { 22 + role: role.clone(), 23 + content: content.clone(), 24 + }); 25 + } 26 + } 27 + 18 28 /// push a user turn, optionally prepending recalled memories 19 29 pub fn push_user(&mut self, source: &str, content: &str, memories: &[String]) { 20 30 let text = if memories.is_empty() {
+29 -9
src/daemon.rs
··· 1 1 use anyhow::Result; 2 2 use std::sync::Arc; 3 3 use tokio::{ 4 - io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, 4 + io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}, 5 5 net::UnixListener, 6 6 sync::{broadcast, mpsc, RwLock}, 7 7 }; ··· 9 9 use crate::{ 10 10 interrupt::Interrupt, 11 11 ipc::{sock_path, ClientMsg, ServerMsg}, 12 + memory::MemoryStore, 12 13 }; 13 14 14 15 /// last known metrics snapshot, updated by agent, read on new client connect 15 16 pub type MetricsSnapshot = Arc<RwLock<Option<ServerMsg>>>; 17 + 18 + const HISTORY_PAGE: usize = 50; 16 19 17 20 pub async fn serve( 18 21 interrupt_tx: mpsc::Sender<Interrupt>, 19 22 output: broadcast::Sender<ServerMsg>, 20 23 snapshot: MetricsSnapshot, 24 + memory: MemoryStore, 25 + history_window: usize, 21 26 ) -> Result<()> { 22 27 let path = sock_path(); 23 28 if path.exists() { ··· 32 37 let tx = interrupt_tx.clone(); 33 38 let bcast = output.clone(); 34 39 let snapshot = snapshot.clone(); 40 + let memory = memory.clone(); 35 41 36 42 tokio::spawn(async move { 37 - if let Err(e) = handle(stream, tx, bcast, snapshot).await { 43 + if let Err(e) = handle(stream, tx, bcast, snapshot, memory, history_window).await { 38 44 tracing::error!(err = %e, "client error"); 39 45 } 40 46 }); 41 47 } 42 48 } 43 49 50 + async fn send_msg(sock_tx: &mut (impl AsyncWrite + Unpin), msg: &ServerMsg) -> Result<()> { 51 + let mut line = serde_json::to_string(msg)?; 52 + line.push('\n'); 53 + sock_tx.write_all(line.as_bytes()).await?; 54 + Ok(()) 55 + } 56 + 44 57 async fn handle( 45 58 sock: tokio::net::UnixStream, 46 59 interrupt_tx: mpsc::Sender<Interrupt>, 47 60 output: broadcast::Sender<ServerMsg>, 48 61 snapshot: MetricsSnapshot, 62 + memory: MemoryStore, 63 + history_window: usize, 49 64 ) -> Result<()> { 50 65 let mut rx = output.subscribe(); 51 66 let (sock_rx, mut sock_tx) = sock.into_split(); 52 67 let mut sock_rx = BufReader::new(sock_rx).lines(); 53 68 54 - // send current metrics immediately so tui doesn't start blank 69 + // push current context window so tui starts with history 70 + let turns = memory.recent_turns(history_window).unwrap_or_default(); 71 + send_msg(&mut sock_tx, &ServerMsg::History { turns }).await?; 72 + 73 + // send last known metrics so status bar is populated immediately 55 74 if let Some(metrics) = snapshot.read().await.clone() { 56 - let mut line = serde_json::to_string(&metrics)?; 57 - line.push('\n'); 58 - sock_tx.write_all(line.as_bytes()).await?; 75 + send_msg(&mut sock_tx, &metrics).await?; 59 76 } 60 77 61 78 loop { ··· 71 88 }; 72 89 interrupt_tx.send(int).await?; 73 90 } 91 + ClientMsg::FetchHistory { before_id, limit } => { 92 + let limit = limit.min(HISTORY_PAGE); 93 + let turns = memory.turns_before(before_id, limit).unwrap_or_default(); 94 + send_msg(&mut sock_tx, &ServerMsg::HistoryPage { turns }).await?; 95 + } 74 96 } 75 97 } 76 98 msg = rx.recv() => { 77 99 let msg = msg?; 78 100 tracing::info!(msg = ?msg, "received message from agent"); 79 - let mut line = serde_json::to_string(&msg)?; 80 - line.push('\n'); 81 - sock_tx.write_all(line.as_bytes()).await?; 101 + send_msg(&mut sock_tx, &msg).await?; 82 102 } 83 103 } 84 104 }
+9
src/ipc.rs
··· 6 6 #[serde(tag = "type", rename_all = "snake_case")] 7 7 pub enum ClientMsg { 8 8 Message { source: String, content: String }, 9 + FetchHistory { before_id: i64, limit: usize }, 9 10 } 10 11 11 12 /// daemon → client ··· 27 28 turn_count: usize, 28 29 context_chars: usize, 29 30 watermark: usize, 31 + }, 32 + /// pushed on connect: current context window turns 33 + History { 34 + turns: Vec<(i64, String, String)>, 35 + }, 36 + /// response to FetchHistory 37 + HistoryPage { 38 + turns: Vec<(i64, String, String)>, 30 39 }, 31 40 } 32 41
+3 -2
src/main.rs
··· 42 42 // bsky::run(tx).await 43 43 // }); 44 44 45 + let history_window = config.compaction_keep; 45 46 let agent = tokio::spawn(agent::run( 46 47 config, 47 48 llm, 48 - memory, 49 + memory.clone(), 49 50 interrupt_rx, 50 51 output_tx.clone(), 51 52 snapshot.clone(), 52 53 )); 53 - let sock = tokio::spawn(daemon::serve(interrupt_tx, output_tx, snapshot)); 54 + let sock = tokio::spawn(daemon::serve(interrupt_tx, output_tx, snapshot, memory, history_window)); 54 55 55 56 tokio::select! { 56 57 r = agent => { r??; }
+98 -55
src/memory.rs
··· 1 1 use anyhow::Result; 2 - use rusqlite::{params, Connection}; 2 + use rusqlite::{ffi::sqlite3_auto_extension, params, Connection}; 3 + use sqlite_vec::sqlite3_vec_init; 3 4 use std::sync::{Arc, Mutex}; 4 5 5 6 pub struct Memory { 6 7 pub content: String, 7 8 } 8 9 9 - /// sqlite-backed episodic memory store. 10 - /// embeddings stored as raw little-endian f32 blobs, 11 - /// retrieval does brute force cosine similarity (fast enough at personal scale). 10 + /// sqlite-backed episodic memory store using sqlite-vec for cosine ANN. 12 11 #[derive(Clone)] 13 12 pub struct MemoryStore(Arc<Mutex<Connection>>); 14 13 15 14 impl MemoryStore { 16 15 pub fn open(path: &str) -> Result<Self> { 16 + // register sqlite-vec extension for all connections opened after this point 17 + unsafe { 18 + sqlite3_auto_extension(Some(std::mem::transmute(sqlite3_vec_init as *const ()))); 19 + } 20 + 17 21 let conn = Connection::open(path)?; 18 22 conn.execute_batch( 19 23 " 20 24 CREATE TABLE IF NOT EXISTS memories ( 21 25 id INTEGER PRIMARY KEY, 22 26 content TEXT NOT NULL, 23 - emb BLOB NOT NULL, 24 - ts INTEGER NOT NULL DEFAULT (unixepoch()), 25 - hits INTEGER NOT NULL DEFAULT 0 27 + ts INTEGER NOT NULL DEFAULT (unixepoch()) 28 + ); 29 + 30 + CREATE VIRTUAL TABLE IF NOT EXISTS vec_memories USING vec0( 31 + embedding float[1024], 32 + distance_metric=cosine 33 + ); 34 + 35 + CREATE TABLE IF NOT EXISTS turns ( 36 + id INTEGER PRIMARY KEY, 37 + role TEXT NOT NULL, 38 + content TEXT NOT NULL, 39 + thinking TEXT, 40 + ts INTEGER NOT NULL DEFAULT (unixepoch()) 26 41 ); 27 42 ", 28 43 )?; ··· 30 45 } 31 46 32 47 pub fn store(&self, content: &str, emb: &[f32]) -> Result<()> { 48 + let conn = self.0.lock().unwrap(); 49 + conn.execute( 50 + "INSERT INTO memories (content) VALUES (?1)", 51 + params![content], 52 + )?; 53 + let id = conn.last_insert_rowid(); 33 54 let blob = f32s_to_bytes(emb); 34 - self.0.lock().unwrap().execute( 35 - "INSERT INTO memories (content, emb) VALUES (?1, ?2)", 36 - params![content, blob], 55 + conn.execute( 56 + "INSERT INTO vec_memories (rowid, embedding) VALUES (?1, ?2)", 57 + params![id, blob], 37 58 )?; 38 59 Ok(()) 39 60 } 40 61 41 - pub fn retrieve(&self, query: &[f32], k: usize) -> Result<Vec<Memory>> { 62 + /// k-nearest memories by cosine distance, returns (distance, memory) pairs. 63 + pub fn top_k(&self, query: &[f32], k: usize) -> Result<Vec<(f32, Memory)>> { 42 64 let conn = self.0.lock().unwrap(); 65 + let blob = f32s_to_bytes(query); 66 + let mut stmt = conn.prepare( 67 + " 68 + SELECT m.content, v.distance 69 + FROM vec_memories v 70 + JOIN memories m ON m.id = v.rowid 71 + WHERE v.embedding MATCH ?1 72 + ORDER BY v.distance 73 + LIMIT ?2 74 + ", 75 + )?; 76 + let results = stmt 77 + .query_map(params![blob, k as i64], |row| { 78 + Ok((row.get::<_, f32>(1)?, row.get::<_, String>(0)?)) 79 + })? 80 + .filter_map(|r| r.ok()) 81 + .map(|(dist, content)| (dist, Memory { content })) 82 + .collect(); 83 + Ok(results) 84 + } 43 85 44 - // load recent candidates (cap at 1000 to bound brute force cost) 45 - let mut stmt = 46 - conn.prepare("SELECT id, content, emb FROM memories ORDER BY ts DESC LIMIT 1000")?; 86 + /// all memories, newest first. 87 + pub fn get_all(&self) -> Result<Vec<Memory>> { 88 + let conn = self.0.lock().unwrap(); 89 + let mut stmt = conn.prepare("SELECT content FROM memories ORDER BY ts DESC")?; 90 + let results = stmt 91 + .query_map([], |row| row.get::<_, String>(0))? 92 + .filter_map(|r| r.ok()) 93 + .map(|content| Memory { content }) 94 + .collect(); 95 + Ok(results) 96 + } 97 + 98 + pub fn log_turn(&self, role: &str, content: &str, thinking: Option<&str>) -> Result<()> { 99 + self.0.lock().unwrap().execute( 100 + "INSERT INTO turns (role, content, thinking) VALUES (?1, ?2, ?3)", 101 + params![role, content, thinking], 102 + )?; 103 + Ok(()) 104 + } 47 105 48 - let mut scored: Vec<(f32, i64, String)> = stmt 49 - .query_map([], |row| { 106 + /// last `n` turns in chronological order (oldest first, ready to replay into context). 107 + pub fn recent_turns(&self, n: usize) -> Result<Vec<(i64, String, String)>> { 108 + let conn = self.0.lock().unwrap(); 109 + let mut stmt = conn.prepare( 110 + "SELECT id, role, content FROM (SELECT id, role, content, ts FROM turns ORDER BY ts DESC LIMIT ?1) ORDER BY ts ASC", 111 + )?; 112 + let results = stmt 113 + .query_map(params![n as i64], |row| { 50 114 Ok(( 51 115 row.get::<_, i64>(0)?, 52 116 row.get::<_, String>(1)?, 53 - row.get::<_, Vec<u8>>(2)?, 117 + row.get::<_, String>(2)?, 54 118 )) 55 119 })? 56 120 .filter_map(|r| r.ok()) 57 - .map(|(id, content, blob)| { 58 - let score = cosine(query, &bytes_to_f32s(&blob)); 59 - (score, id, content) 60 - }) 61 121 .collect(); 62 - 63 - scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); 122 + Ok(results) 123 + } 64 124 65 - let results: Vec<Memory> = scored 66 - .into_iter() 67 - .take(k) 68 - .map(|(_, id, content)| { 69 - // bump hit count + recency for recalled memories 70 - let _ = conn.execute( 71 - "UPDATE memories SET hits = hits + 1, ts = unixepoch() WHERE id = ?1", 72 - params![id], 73 - ); 74 - Memory { content } 75 - }) 125 + /// turns older than `before_id`, newest-first then reversed, for scroll-back paging. 126 + pub fn turns_before(&self, before_id: i64, limit: usize) -> Result<Vec<(i64, String, String)>> { 127 + let conn = self.0.lock().unwrap(); 128 + let mut stmt = conn.prepare( 129 + "SELECT id, role, content FROM (SELECT id, role, content FROM turns WHERE id < ?1 ORDER BY id DESC LIMIT ?2) ORDER BY id ASC", 130 + )?; 131 + let results = stmt 132 + .query_map(params![before_id, limit as i64], |row| { 133 + Ok(( 134 + row.get::<_, i64>(0)?, 135 + row.get::<_, String>(1)?, 136 + row.get::<_, String>(2)?, 137 + )) 138 + })? 139 + .filter_map(|r| r.ok()) 76 140 .collect(); 77 - 78 141 Ok(results) 79 142 } 80 143 } ··· 82 145 fn f32s_to_bytes(v: &[f32]) -> Vec<u8> { 83 146 v.iter().flat_map(|f| f.to_le_bytes()).collect() 84 147 } 85 - 86 - fn bytes_to_f32s(b: &[u8]) -> Vec<f32> { 87 - b.chunks_exact(4) 88 - .map(|c| f32::from_le_bytes(c.try_into().unwrap())) 89 - .collect() 90 - } 91 - 92 - fn cosine(a: &[f32], b: &[f32]) -> f32 { 93 - if a.len() != b.len() || a.is_empty() { 94 - return 0.0; 95 - } 96 - let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum(); 97 - let na: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt(); 98 - let nb: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt(); 99 - if na == 0.0 || nb == 0.0 { 100 - 0.0 101 - } else { 102 - dot / (na * nb) 103 - } 104 - }
+71 -5
src/tui.rs
··· 104 104 status: String, 105 105 cmd_mode: bool, 106 106 107 + // history scrollback 108 + oldest_turn_id: Option<i64>, 109 + history_exhausted: bool, 110 + loading_history: bool, 111 + 107 112 // metrics 108 113 turn_count: usize, 109 114 context_chars: usize, ··· 123 128 cursor: 0, 124 129 status: String::new(), 125 130 cmd_mode: false, 131 + oldest_turn_id: None, 132 + history_exhausted: false, 133 + loading_history: false, 126 134 turn_count: 0, 127 135 context_chars: 0, 128 136 watermark: 0, ··· 181 189 self.at_bottom = false; 182 190 } 183 191 192 + fn prepend_turns(&mut self, turns: Vec<(i64, String, String)>) -> bool { 193 + if turns.is_empty() { 194 + return false; 195 + } 196 + let msgs: Vec<ChatMsg> = turns 197 + .iter() 198 + .filter_map(|(id, role, content)| { 199 + // track oldest id for next page request 200 + if self.oldest_turn_id.map_or(true, |cur| *id < cur) { 201 + self.oldest_turn_id = Some(*id); 202 + } 203 + match role.as_str() { 204 + "user" => Some(ChatMsg::user(content.clone())), 205 + "assistant" => { 206 + let mut m = ChatMsg::assistant(); 207 + m.content = content.clone(); 208 + m.role = Role::Assistant { 209 + reason: None, 210 + done: true, 211 + }; 212 + Some(m) 213 + } 214 + _ => None, 215 + } 216 + }) 217 + .collect(); 218 + let mut new_history = msgs; 219 + new_history.extend(std::mem::take(&mut self.history)); 220 + self.history = new_history; 221 + true 222 + } 223 + 184 224 fn snap_to_bottom(&mut self) { 185 225 self.scroll.scroll_to_bottom(); 186 226 self.at_bottom = true; ··· 356 396 pub async fn run() -> Result<()> { 357 397 let path = sock_path(); 358 398 if !path.exists() { 359 - tracing::error!("daemon not running, start it with `agent daemon`"); 399 + tracing::error!("daemon not running, start it with `klbr daemon`"); 360 400 std::process::exit(1); 361 401 } 362 402 ··· 436 476 437 477 let context_str = if app.watermark > 0 { 438 478 let remaining = app.watermark.saturating_sub(app.context_chars); 439 - // rough estimate: ~4 chars per token average 440 - // todo: fix this 441 479 let tokens_left = remaining / 4; 442 480 format!("ctx {ctx_pct}% (~{tokens_left} tok until compact)") 443 481 } else { 444 482 String::new() 445 483 }; 446 484 485 + let history_str = if app.loading_history { 486 + "loading history... ".to_string() 487 + } else { 488 + String::new() 489 + }; 490 + 447 491 let metrics = format!( 448 - "{}{}(turns: {}){}", 492 + "{}{}{}(turns: {}){}", 493 + history_str, 449 494 tps_str 450 495 .is_empty() 451 496 .not() ··· 504 549 match event { 505 550 Event::Key(k) if k.kind == KeyEventKind::Press => match (k.code, k.modifiers) { 506 551 (KeyCode::Char('c'), KeyModifiers::CONTROL) => return Ok(true), 507 - (KeyCode::PageUp, _) => app.scroll_page_up(), 552 + (KeyCode::PageUp, _) => { 553 + app.scroll_page_up(); 554 + // request older history if not exhausted and not already loading 555 + if !app.history_exhausted && !app.loading_history { 556 + app.loading_history = true; 557 + let before_id = app.oldest_turn_id.unwrap_or(i64::MAX); 558 + let payload = serde_json::to_string(&ClientMsg::FetchHistory { 559 + before_id, 560 + limit: 50, 561 + })? + "\n"; 562 + sock_tx.write_all(payload.as_bytes()).await?; 563 + } 564 + } 508 565 (KeyCode::PageDown, _) => app.scroll_page_down(), 509 566 (KeyCode::Enter, _) => { 510 567 let raw = app.take_input(); ··· 587 644 app.turn_count = turn_count; 588 645 app.context_chars = context_chars; 589 646 app.watermark = watermark; 647 + } 648 + ServerMsg::History { turns } => { 649 + app.prepend_turns(turns); 650 + } 651 + ServerMsg::HistoryPage { turns } => { 652 + app.loading_history = false; 653 + if !app.prepend_turns(turns) { 654 + app.history_exhausted = true; 655 + } 590 656 } 591 657 } 592 658 Ok(())