i've harnessed the harness
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

compaction stuff

dawn c11c95c5 b7df550e

+266 -63
+20
Cargo.lock
··· 481 481 ] 482 482 483 483 [[package]] 484 + name = "fastrand" 485 + version = "2.4.0" 486 + source = "registry+https://github.com/rust-lang/crates.io-index" 487 + checksum = "a043dc74da1e37d6afe657061213aa6f425f855399a11d3463c6ecccc4dfda1f" 488 + 489 + [[package]] 484 490 name = "filedescriptor" 485 491 version = "0.8.3" 486 492 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1110 1116 "serde", 1111 1117 "serde_json", 1112 1118 "sqlite-vec", 1119 + "tempfile", 1113 1120 "tokio", 1114 1121 "tracing", 1115 1122 "tracing-subscriber", ··· 2347 2354 dependencies = [ 2348 2355 "core-foundation-sys", 2349 2356 "libc", 2357 + ] 2358 + 2359 + [[package]] 2360 + name = "tempfile" 2361 + version = "3.27.0" 2362 + source = "registry+https://github.com/rust-lang/crates.io-index" 2363 + checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" 2364 + dependencies = [ 2365 + "fastrand", 2366 + "getrandom 0.4.2", 2367 + "once_cell", 2368 + "rustix", 2369 + "windows-sys 0.61.2", 2350 2370 ] 2351 2371 2352 2372 [[package]]
+2
klbr-core/Cargo.toml
··· 19 19 dirs = "5" 20 20 tracing = "0.1.44" 21 21 tracing-subscriber = "0.3.23" 22 + [dev-dependencies] 23 + tempfile = "3"
+59 -35
klbr-core/src/agent.rs
··· 34 34 } 35 35 36 36 while let Some(interrupt) = rx.recv().await { 37 - let text = interrupt.to_text().to_string(); 38 - let source = interrupt.source_tag().to_string(); 37 + match interrupt { 38 + Interrupt::Reset => { 39 + ctx.clear(); 40 + turn_count = 0; 41 + let _ = output.send(AgentEvent::Status("context reset".into())); 42 + continue; 43 + } 44 + Interrupt::Compact => { 45 + let _ = output.send(AgentEvent::Status("compacting...".into())); 46 + if let Err(e) = compact(&llm, &memory, &mut ctx, 0).await { 47 + let _ = output.send(AgentEvent::Status(format!("compaction failed: {e}"))); 48 + } else { 49 + turn_count = ctx.turn_count(); 50 + let _ = output.send(AgentEvent::Status("done".into())); 51 + } 52 + continue; 53 + } 54 + Interrupt::UserMessage(ref text) => { 55 + let source = interrupt.source_tag().to_string(); 39 56 40 - let memories: Vec<String> = llm 41 - .embed(&text) 42 - .await 43 - .ok() 44 - .and_then(|emb| memory.top_k(&emb, config.memory_top_k).ok()) 45 - .unwrap_or_default() 46 - .into_iter() 47 - .filter(|(dist, _)| *dist < config.memory_sim_threshold) 48 - .map(|(_, m)| m.content) 49 - .collect(); 57 + let memories: Vec<String> = llm 58 + .embed(&text) 59 + .await 60 + .ok() 61 + .and_then(|emb| memory.top_k(&emb, config.memory_top_k).ok()) 62 + .unwrap_or_default() 63 + .into_iter() 64 + .filter(|(dist, _)| *dist < config.memory_sim_threshold) 65 + .map(|(_, m)| m.content) 66 + .collect(); 50 67 51 - if !memories.is_empty() { 52 - let _ = output.send(AgentEvent::Status(format!( 53 - "recalled {} memor{}", 54 - memories.len(), 55 - if memories.len() == 1 { "y" } else { "ies" } 56 - ))); 57 - } 68 + if !memories.is_empty() { 69 + let _ = output.send(AgentEvent::Status(format!( 70 + "recalled {} memor{}", 71 + memories.len(), 72 + if memories.len() == 1 { "y" } else { "ies" } 73 + ))); 74 + } 58 75 59 - ctx.push_user(&source, &text, &memories); 60 - let _ = memory.log_turn("user", &text, None); 61 - turn_count += 1; 76 + 
ctx.push_user(&source, &text, &memories); 77 + let _ = memory.log_turn("user", &text, None); 78 + turn_count += 1; 79 + } 80 + } 62 81 63 82 let (tok_tx, mut tok_rx) = mpsc::channel(256); 64 83 let llm2 = llm.clone(); ··· 102 121 103 122 if ctx.total_tokens > config.watermark_tokens { 104 123 let _ = output.send(AgentEvent::Status("compacting...".into())); 105 - compact(&config, &llm, &memory, &mut ctx).await; 106 - // reset turn count after compaction since context was partially evicted 107 - turn_count = ctx.turn_count(); 124 + if let Err(e) = compact(&llm, &memory, &mut ctx, config.compaction_keep).await { 125 + let _ = output.send(AgentEvent::Status(format!("compaction failed: {e}"))); 126 + } else { 127 + // reset turn count after compaction since context was partially evicted 128 + turn_count = ctx.turn_count(); 129 + } 108 130 } 109 131 } 110 132 111 133 Ok(()) 112 134 } 113 135 114 - async fn compact(config: &Config, llm: &LlmClient, memory: &MemoryStore, ctx: &mut Context) { 115 - let drained = ctx.drain_oldest(config.compaction_keep); 136 + async fn compact( 137 + llm: &LlmClient, 138 + memory: &MemoryStore, 139 + ctx: &mut Context, 140 + keep: usize, 141 + ) -> Result<()> { 142 + let drained = ctx.drain_oldest(keep); 116 143 if drained.is_empty() { 117 - return; 144 + return Ok(()); 118 145 } 119 146 120 147 let turns_text = drained ··· 127 154 "summarize these conversation turns concisely, preserving key facts and topics:\n\n{turns_text}" 128 155 ))]; 129 156 130 - let Ok((summary, _)) = llm.complete(&prompt).await else { 131 - return; 132 - }; 133 - let Ok(emb) = llm.embed(&summary).await else { 134 - return; 135 - }; 136 - let _ = memory.store(&summary, &emb); 157 + let (summary, _) = llm.complete(&prompt).await?; 158 + let emb = llm.embed(&summary).await?; 159 + memory.store(&summary, &emb)?; 160 + Ok(()) 137 161 }
+2
klbr-core/src/config.rs
··· 16 16 pub memory_sim_threshold: f32, 17 17 pub db_path: String, 18 18 pub anchor: String, 19 + pub embed_dim: usize, 19 20 } 20 21 21 22 const ANCHOR: &str = r#" ··· 37 38 memory_sim_threshold: 0.3, 38 39 db_path: "agent.db".into(), 39 40 anchor: ANCHOR.into(), 41 + embed_dim: 768, 40 42 } 41 43 } 42 44 }
+5
klbr-core/src/context.rs
··· 27 27 } 28 28 } 29 29 30 + pub fn clear(&mut self) { 31 + self.turns.clear(); 32 + self.total_tokens = 0; 33 + } 34 + 30 35 /// push a user turn, optionally prepending recalled memories 31 36 pub fn push_user(&mut self, source: &str, content: &str, memories: &[String]) { 32 37 let text = if memories.is_empty() {
+4 -6
klbr-core/src/interrupt.rs
··· 3 3 #[derive(Debug, Clone)] 4 4 pub enum Interrupt { 5 5 UserMessage(String), 6 - // add new sources here, e.g.: 7 - // BskyPost { author: String, content: String }, 8 - // FileChanged(std::path::PathBuf), 9 - // ShellCommand(String), 6 + Reset, 7 + Compact, 10 8 } 11 9 12 10 impl Interrupt { 13 11 pub fn to_text(&self) -> &str { 14 12 match self { 15 13 Interrupt::UserMessage(s) => s, 16 - // Interrupt::BskyPost { author, content } => content, 14 + Interrupt::Reset | Interrupt::Compact => "", 17 15 } 18 16 } 19 17 ··· 21 19 pub fn source_tag(&self) -> &str { 22 20 match self { 23 21 Interrupt::UserMessage(_) => "user", 24 - // Interrupt::BskyPost { .. } => "bsky", 22 + Interrupt::Reset | Interrupt::Compact => "system", 25 23 } 26 24 } 27 25 }
+100 -16
klbr-core/src/memory.rs
··· 1 + use serde::{Deserialize, Serialize}; 1 2 use anyhow::Result; 2 3 use rusqlite::{ffi::sqlite3_auto_extension, params, Connection}; 3 4 use sqlite_vec::sqlite3_vec_init; ··· 12 13 pub reasoning: Option<String>, 13 14 } 14 15 16 + #[derive(Debug, Clone, Serialize, Deserialize)] 15 17 pub struct Memory { 16 18 pub content: String, 19 + pub embedding: Vec<f32>, 17 20 } 18 21 19 22 /// sqlite-backed episodic memory store using sqlite-vec for cosine ANN. 20 23 #[derive(Clone)] 21 - pub struct MemoryStore(Arc<Mutex<Connection>>); 24 + pub struct MemoryStore { 25 + conn: Arc<Mutex<Connection>>, 26 + embed_dim: usize, 27 + } 22 28 23 29 impl MemoryStore { 24 - pub fn open(path: &str) -> Result<Self> { 30 + pub fn open(path: &str, embed_dim: usize) -> Result<Self> { 25 31 // register sqlite-vec extension for all connections opened after this point 26 32 unsafe { 27 33 sqlite3_auto_extension(Some(std::mem::transmute(sqlite3_vec_init as *const ()))); 28 34 } 29 35 30 36 let conn = Connection::open(path)?; 31 - conn.execute_batch( 37 + let store = Self { 38 + conn: Arc::new(Mutex::new(conn)), 39 + embed_dim, 40 + }; 41 + store.init_schema()?; 42 + Ok(store) 43 + } 44 + 45 + fn init_schema(&self) -> Result<()> { 46 + let conn = self.conn.lock().unwrap(); 47 + conn.execute_batch(&format!( 32 48 " 33 49 CREATE TABLE IF NOT EXISTS memories ( 34 50 id INTEGER PRIMARY KEY, ··· 37 53 ); 38 54 39 55 CREATE VIRTUAL TABLE IF NOT EXISTS vec_memories USING vec0( 40 - embedding float[1024] distance_metric=cosine 56 + embedding float[{}] distance_metric=cosine 41 57 ); 42 58 43 59 CREATE TABLE IF NOT EXISTS turns ( ··· 48 64 ts INTEGER NOT NULL DEFAULT (unixepoch()) 49 65 ); 50 66 ", 67 + self.embed_dim 68 + ))?; 69 + Ok(()) 70 + } 71 + 72 + pub fn reset(&self) -> Result<()> { 73 + let conn = self.conn.lock().unwrap(); 74 + conn.execute_batch( 75 + " 76 + DROP TABLE IF EXISTS vec_memories; 77 + DROP TABLE IF EXISTS memories; 78 + DROP TABLE IF EXISTS turns; 79 + ", 51 80 )?; 52 - 
Ok(Self(Arc::new(Mutex::new(conn)))) 81 + drop(conn); 82 + self.init_schema()?; 83 + Ok(()) 53 84 } 54 85 55 86 pub fn store(&self, content: &str, emb: &[f32]) -> Result<()> { 56 - let conn = self.0.lock().unwrap(); 87 + let conn = self.conn.lock().unwrap(); 57 88 conn.execute( 58 89 "INSERT INTO memories (content) VALUES (?1)", 59 90 params![content], ··· 69 100 70 101 /// k-nearest memories by cosine distance, returns (distance, memory) pairs. 71 102 pub fn top_k(&self, query: &[f32], k: usize) -> Result<Vec<(f32, Memory)>> { 72 - let conn = self.0.lock().unwrap(); 103 + let conn = self.conn.lock().unwrap(); 73 104 let blob = f32s_to_bytes(query); 74 105 let mut stmt = conn.prepare( 75 106 " ··· 83 114 )?; 84 115 let results = stmt 85 116 .query_map(params![blob, k as i64], |row| { 86 - Ok((row.get::<_, f32>(1)?, row.get::<_, String>(0)?)) 117 + let content = row.get::<_, String>(0)?; 118 + let dist = row.get::<_, f32>(1)?; 119 + // fetch embedding too since we want Memory to be complete 120 + let mut stmt_emb = conn.prepare("SELECT embedding FROM vec_memories WHERE rowid = (SELECT id FROM memories WHERE content = ?1 LIMIT 1)")?; 121 + let emb_blob: Vec<u8> = stmt_emb.query_row(params![content], |r| r.get(0))?; 122 + let embedding = bytes_to_f32s(&emb_blob); 123 + Ok((dist, Memory { content, embedding })) 87 124 })? 88 125 .filter_map(|r| r.ok()) 89 - .map(|(dist, content)| (dist, Memory { content })) 90 126 .collect(); 91 127 Ok(results) 92 128 } 93 129 94 130 /// all memories, newest first. 
95 131 pub fn get_all(&self) -> Result<Vec<Memory>> { 96 - let conn = self.0.lock().unwrap(); 97 - let mut stmt = conn.prepare("SELECT content FROM memories ORDER BY ts DESC")?; 132 + let conn = self.conn.lock().unwrap(); 133 + let mut stmt = conn.prepare( 134 + " 135 + SELECT m.content, v.embedding 136 + FROM memories m 137 + JOIN vec_memories v ON v.rowid = m.id 138 + ORDER BY m.ts DESC 139 + ", 140 + )?; 98 141 let results = stmt 99 - .query_map([], |row| row.get::<_, String>(0))? 142 + .query_map([], |row| { 143 + let content = row.get::<_, String>(0)?; 144 + let emb_bytes = row.get::<_, Vec<u8>>(1)?; 145 + let embedding = bytes_to_f32s(&emb_bytes); 146 + Ok(Memory { content, embedding }) 147 + })? 100 148 .filter_map(|r| r.ok()) 101 - .map(|content| Memory { content }) 102 149 .collect(); 103 150 Ok(results) 104 151 } 105 152 106 153 pub fn log_turn(&self, role: &str, content: &str, thinking: Option<&str>) -> Result<()> { 107 - self.0.lock().unwrap().execute( 154 + self.conn.lock().unwrap().execute( 108 155 "INSERT INTO turns (role, content, thinking) VALUES (?1, ?2, ?3)", 109 156 params![role, content, thinking], 110 157 )?; ··· 113 160 114 161 /// last `n` turns in chronological order (oldest first, ready to replay into context). 115 162 pub fn recent_turns(&self, n: usize) -> Result<Vec<HistoryEntry>> { 116 - let conn = self.0.lock().unwrap(); 163 + let conn = self.conn.lock().unwrap(); 117 164 let mut stmt = conn.prepare( 118 165 "SELECT id, role, content, thinking, ts FROM (SELECT id, role, content, thinking, ts FROM turns ORDER BY ts DESC LIMIT ?1) ORDER BY ts ASC", 119 166 )?; ··· 134 181 135 182 /// turns older than `before_id`, newest-first then reversed, for scroll-back paging. 
136 183 pub fn turns_before(&self, before_id: i64, limit: usize) -> Result<Vec<HistoryEntry>> { 137 - let conn = self.0.lock().unwrap(); 184 + let conn = self.conn.lock().unwrap(); 138 185 let mut stmt = conn.prepare( 139 186 "SELECT id, role, content, thinking, ts FROM (SELECT id, role, content, thinking, ts FROM turns WHERE id < ?1 ORDER BY id DESC LIMIT ?2) ORDER BY id ASC", 140 187 )?; ··· 157 204 fn f32s_to_bytes(v: &[f32]) -> Vec<u8> { 158 205 v.iter().flat_map(|f| f.to_le_bytes()).collect() 159 206 } 207 + 208 + fn bytes_to_f32s(b: &[u8]) -> Vec<f32> { 209 + b.chunks_exact(4) 210 + .map(|c| f32::from_le_bytes(c.try_into().unwrap())) 211 + .collect() 212 + } 213 + 214 + #[cfg(test)] 215 + mod tests { 216 + use super::*; 217 + use tempfile::NamedTempFile; 218 + 219 + #[test] 220 + fn test_memory_reset() -> Result<()> { 221 + let tmp = NamedTempFile::new()?; 222 + let path = tmp.path().to_str().unwrap(); 223 + let store = MemoryStore::open(path, 768)?; 224 + 225 + // add some data 226 + store.log_turn("user", "hello", None)?; 227 + store.store("mem", &[1.0; 768])?; 228 + 229 + assert_eq!(store.recent_turns(10)?.len(), 1); 230 + let all = store.get_all()?; 231 + assert_eq!(all.len(), 1); 232 + assert_eq!(all[0].content, "mem"); 233 + assert_eq!(all[0].embedding.len(), 768); 234 + 235 + // reset 236 + store.reset()?; 237 + 238 + assert_eq!(store.recent_turns(10)?.len(), 0); 239 + assert_eq!(store.get_all()?.len(), 0); 240 + 241 + Ok(()) 242 + } 243 + }
+39
klbr-daemon/src/daemon.rs
··· 97 97 let turns = turns.into_iter().map(map_history).collect(); 98 98 send_msg(&mut sock_tx, &ServerMsg::History { turns }).await?; 99 99 } 100 + ClientMsg::Compact => { 101 + interrupt_tx.send(Interrupt::Compact).await?; 102 + } 103 + ClientMsg::Reset => { 104 + memory.reset()?; 105 + interrupt_tx.send(Interrupt::Reset).await?; 106 + send_msg( 107 + &mut sock_tx, 108 + &ServerMsg::Status { 109 + content: "database reset".into(), 110 + }, 111 + ) 112 + .await?; 113 + } 114 + ClientMsg::DumpMemories { path } => { 115 + let path = path.unwrap_or_else(|| "memories.json".into()); 116 + let memories = memory.get_all()?; 117 + let json = serde_json::to_string_pretty(&memories)?; 118 + match std::fs::write(&path, json) { 119 + Ok(_) => { 120 + send_msg( 121 + &mut sock_tx, 122 + &ServerMsg::Status { 123 + content: format!("memories dumped to {path}"), 124 + }, 125 + ) 126 + .await?; 127 + } 128 + Err(e) => { 129 + send_msg( 130 + &mut sock_tx, 131 + &ServerMsg::Status { 132 + content: format!("dump failed: {e}"), 133 + }, 134 + ) 135 + .await?; 136 + } 137 + } 138 + } 100 139 } 101 140 } 102 141 ev = rx.recv() => {
+1 -1
klbr-daemon/src/main.rs
··· 10 10 tracing_subscriber::fmt().init(); 11 11 12 12 let config = Config::default(); 13 - let memory = memory::MemoryStore::open(&config.db_path)?; 13 + let memory = memory::MemoryStore::open(&config.db_path, config.embed_dim)?; 14 14 let llm = llm::LlmClient::new(config.clone()); 15 15 let snapshot = Arc::new(RwLock::new(None)) as MetricsSnapshot; 16 16
+3
klbr-ipc/src/lib.rs
··· 7 7 pub enum ClientMsg { 8 8 Message { source: String, content: String }, 9 9 FetchHistory { before_id: i64, limit: usize }, 10 + Compact, 11 + Reset, 12 + DumpMemories { path: Option<String> }, 10 13 } 11 14 12 15 #[derive(Debug, Serialize, Deserialize, Clone)]
+31 -5
klbr-tui/src/main.rs
··· 299 299 enum Cmd { 300 300 Send(String), 301 301 Clear, 302 + Compact, 303 + Reset, 304 + Dump(Option<String>), 302 305 ToggleThink, 303 306 Help, 304 307 Unknown(String), ··· 310 313 } 311 314 match raw[1..].splitn(2, ' ').next().unwrap_or("") { 312 315 "clear" | "c" => Cmd::Clear, 316 + "compact" | "cp" => Cmd::Compact, 317 + "reset" => Cmd::Reset, 318 + "dump" | "d" => { 319 + let path = raw.splitn(2, ' ').nth(1).map(|s| s.to_string()); 320 + Cmd::Dump(path) 321 + } 313 322 "think" | "t" => Cmd::ToggleThink, 314 323 "help" | "h" => Cmd::Help, 315 324 other => Cmd::Unknown(other.to_string()), ··· 317 326 } 318 327 319 328 fn help_lines() -> &'static str { 320 - "/clear (/c) clear chat history\n\ 321 - /think (/t) toggle last reasoning trace\n\ 322 - /help (/h) show this help\n\ 323 - pgup/dn scroll history\n\ 324 - ctrl+c quit" 329 + "/clear (/c) clear chat history display\n\ 330 + /compact (/cp) manually trigger agent context compaction\n\ 331 + /reset reset entire database and agent context\n\ 332 + /dump (/d) dump all memories to a file [path]\n\ 333 + /think (/t) toggle last reasoning trace\n\ 334 + /help (/h) show this help\n\ 335 + pgup/dn scroll history\n\ 336 + ctrl+c quit" 325 337 } 326 338 327 339 // ── rendering ──────────────────────────────────────────────────────────────── ··· 607 619 Cmd::Clear => { 608 620 app.history.clear(); 609 621 app.snap_to_bottom(); 622 + } 623 + Cmd::Compact => { 624 + let payload = serde_json::to_string(&ClientMsg::Compact)? + "\n"; 625 + sock_tx.write_all(payload.as_bytes()).await?; 626 + } 627 + Cmd::Reset => { 628 + app.history.clear(); 629 + app.snap_to_bottom(); 630 + let payload = serde_json::to_string(&ClientMsg::Reset)? + "\n"; 631 + sock_tx.write_all(payload.as_bytes()).await?; 632 + } 633 + Cmd::Dump(path) => { 634 + let payload = serde_json::to_string(&ClientMsg::DumpMemories { path })? 
+ "\n"; 635 + sock_tx.write_all(payload.as_bytes()).await?; 610 636 } 611 637 Cmd::ToggleThink => app.toggle_last_think(), 612 638 Cmd::Help => {