ive harnessed the harness
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

move daemon code into daemon crate

dawn b7df550e 1781bc44

+128 -87
+1 -1
Cargo.lock
··· 1105 1105 "anyhow", 1106 1106 "dirs", 1107 1107 "futures", 1108 - "klbr-ipc", 1109 1108 "reqwest", 1110 1109 "rusqlite", 1111 1110 "serde", ··· 1123 1122 "anyhow", 1124 1123 "klbr-core", 1125 1124 "klbr-ipc", 1125 + "serde_json", 1126 1126 "tokio", 1127 1127 "tracing", 1128 1128 "tracing-subscriber",
-1
klbr-core/Cargo.toml
··· 8 8 path = "src/lib.rs" 9 9 10 10 [dependencies] 11 - klbr-ipc = { path = "../klbr-ipc" } 12 11 sqlite-vec = "0.1.0" 13 12 tokio = { version = "1", features = ["full"] } 14 13 rusqlite = { version = "0.39", features = ["bundled"] }
+16 -21
klbr-core/src/agent.rs
··· 1 1 use anyhow::Result; 2 - use std::sync::Arc; 3 - use tokio::sync::{broadcast, mpsc, RwLock}; 2 + use tokio::sync::{broadcast, mpsc}; 4 3 5 - pub type MetricsSnapshot = Arc<RwLock<Option<klbr_ipc::ServerMsg>>>; 4 + use crate::MetricsSnapshot; 6 5 7 6 use crate::{ 8 7 config::Config, ··· 10 9 interrupt::Interrupt, 11 10 llm::{LlmClient, LlmEvent, Message}, 12 11 memory::MemoryStore, 12 + AgentEvent, AgentMetrics, 13 13 }; 14 - use klbr_ipc::ServerMsg; 15 14 16 15 pub async fn run( 17 16 config: Config, 18 17 llm: LlmClient, 19 18 memory: MemoryStore, 20 19 mut rx: mpsc::Receiver<Interrupt>, 21 - output: broadcast::Sender<ServerMsg>, 20 + output: broadcast::Sender<AgentEvent>, 22 21 snapshot: MetricsSnapshot, 23 22 ) -> Result<()> { 24 23 let mut ctx = Context::new(&config.anchor); ··· 50 49 .collect(); 51 50 52 51 if !memories.is_empty() { 53 - let _ = output.send(ServerMsg::Status { 54 - content: format!( 55 - "recalled {} memor{}", 56 - memories.len(), 57 - if memories.len() == 1 { "y" } else { "ies" } 58 - ), 59 - }); 52 + let _ = output.send(AgentEvent::Status(format!( 53 + "recalled {} memor{}", 54 + memories.len(), 55 + if memories.len() == 1 { "y" } else { "ies" } 56 + ))); 60 57 } 61 58 62 59 ctx.push_user(&source, &text, &memories); ··· 69 66 tokio::spawn(async move { 70 67 let _ = llm2.stream(&msgs, tok_tx).await; 71 68 }); 72 - let _ = output.send(ServerMsg::Started); 69 + let _ = output.send(AgentEvent::Started); 73 70 74 71 let mut response = String::new(); 75 72 let mut thinking = String::new(); ··· 77 74 match ev { 78 75 LlmEvent::ThinkToken(tok) => { 79 76 thinking.push_str(&tok); 80 - let _ = output.send(ServerMsg::ThinkToken { content: tok }); 77 + let _ = output.send(AgentEvent::ThinkToken(tok)); 81 78 } 82 79 LlmEvent::Token(tok) => { 83 80 response.push_str(&tok); 84 - let _ = output.send(ServerMsg::Token { content: tok }); 81 + let _ = output.send(AgentEvent::Token(tok)); 85 82 } 86 83 LlmEvent::Usage(usage) => { 87 84 ctx.update_tokens(usage.total_tokens); ··· 94 91 let _ = memory.log_turn("assistant", &response, thinking_ref); 95 92 turn_count += 1; 96 93 97 - let _ = output.send(ServerMsg::Done); 98 - let metrics = ServerMsg::Metrics { 94 + let _ = output.send(AgentEvent::Done); 95 + let metrics = AgentMetrics { 99 96 turn_count, 100 97 context_tokens: ctx.total_tokens, 101 98 watermark: config.watermark_tokens, 102 99 }; 103 100 *snapshot.write().await = Some(metrics.clone()); 104 - let _ = output.send(metrics); 101 + let _ = output.send(AgentEvent::Metrics(metrics)); 105 102 106 103 if ctx.total_tokens > config.watermark_tokens { 107 - let _ = output.send(ServerMsg::Status { 108 - content: "compacting...".into(), 109 - }); 104 + let _ = output.send(AgentEvent::Status("compacting...".into())); 110 105 compact(&config, &llm, &memory, &mut ctx).await; 111 106 // reset turn count after compaction since context was partially evicted 112 107 turn_count = ctx.turn_count();
+42 -14
klbr-core/src/daemon.rs klbr-daemon/src/daemon.rs
··· 1 1 use anyhow::Result; 2 - use std::sync::Arc; 3 2 use tokio::{ 4 3 io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}, 5 4 net::UnixListener, 6 - sync::{broadcast, mpsc, RwLock}, 5 + sync::{broadcast, mpsc}, 7 6 }; 8 7 9 - use crate::{interrupt::Interrupt, memory::MemoryStore}; 10 - use klbr_ipc::{sock_path, ClientMsg, ServerMsg}; 11 - 12 - /// last known metrics snapshot, updated by agent, read on new client connect 13 - pub type MetricsSnapshot = Arc<RwLock<Option<ServerMsg>>>; 8 + use klbr_core::{interrupt::Interrupt, memory::MemoryStore, AgentEvent, MetricsSnapshot}; 9 + use klbr_ipc::{sock_path, ClientMsg, HistoryEntry as IpcHistoryEntry, ServerMsg}; 14 10 15 11 const HISTORY_PAGE: usize = 50; 16 12 17 13 pub async fn serve( 18 14 interrupt_tx: mpsc::Sender<Interrupt>, 19 - output: broadcast::Sender<ServerMsg>, 15 + output: broadcast::Sender<AgentEvent>, 20 16 snapshot: MetricsSnapshot, 21 17 memory: MemoryStore, 22 18 history_window: usize, ··· 54 50 async fn handle( 55 51 sock: tokio::net::UnixStream, 56 52 interrupt_tx: mpsc::Sender<Interrupt>, 57 - output: broadcast::Sender<ServerMsg>, 53 + output: broadcast::Sender<AgentEvent>, 58 54 snapshot: MetricsSnapshot, 59 55 memory: MemoryStore, 60 56 history_window: usize, ··· 65 61 66 62 // push current context window so tui starts with history 67 63 let turns = memory.recent_turns(history_window).unwrap_or_default(); 64 + let turns = turns.into_iter().map(map_history).collect(); 68 65 send_msg(&mut sock_tx, &ServerMsg::History { turns }).await?; 69 66 70 67 // send last known metrics so status bar is populated immediately 71 - let snap: Option<ServerMsg> = snapshot.read().await.clone(); 68 + let snap = snapshot.read().await.clone(); 72 69 if let Some(metrics) = snap { 73 - send_msg(&mut sock_tx, &metrics).await?; 70 + send_msg( 71 + &mut sock_tx, 72 + &ServerMsg::Metrics { 73 + turn_count: metrics.turn_count, 74 + context_tokens: metrics.context_tokens, 75 + watermark: metrics.watermark, 76 + }, 77 + ) 78 + .await?; 74 79 } 75 80 76 81 loop { ··· 89 94 ClientMsg::FetchHistory { before_id, limit } => { 90 95 let limit = limit.min(HISTORY_PAGE); 91 96 let turns = memory.turns_before(before_id, limit).unwrap_or_default(); 97 + let turns = turns.into_iter().map(map_history).collect(); 92 98 send_msg(&mut sock_tx, &ServerMsg::History { turns }).await?; 93 99 } 94 100 } 95 101 } 96 - msg = rx.recv() => { 97 - let msg = msg?; 98 - tracing::info!(msg = ?msg, "received message from agent"); 102 + ev = rx.recv() => { 103 + let ev = ev?; 104 + let msg = match ev { 105 + AgentEvent::Started => ServerMsg::Started, 106 + AgentEvent::Token(content) => ServerMsg::Token { content }, 107 + AgentEvent::ThinkToken(content) => ServerMsg::ThinkToken { content }, 108 + AgentEvent::Done => ServerMsg::Done, 109 + AgentEvent::Status(content) => ServerMsg::Status { content }, 110 + AgentEvent::Metrics(m) => ServerMsg::Metrics { 111 + turn_count: m.turn_count, 112 + context_tokens: m.context_tokens, 113 + watermark: m.watermark, 114 + }, 115 + }; 116 + tracing::info!(msg = ?msg, "sending message to client"); 99 117 send_msg(&mut sock_tx, &msg).await?; 100 118 } 101 119 } ··· 103 121 104 122 Ok(()) 105 123 } 124 + 125 + fn map_history(e: klbr_core::memory::HistoryEntry) -> IpcHistoryEntry { 126 + IpcHistoryEntry { 127 + id: e.id, 128 + timestamp: e.timestamp, 129 + role: e.role, 130 + content: e.content, 131 + reasoning: e.reasoning, 132 + } 133 + }
+22 -48
klbr-core/src/lib.rs
··· 1 - mod agent; 2 - mod config; 3 - mod context; 4 - mod daemon; 5 - mod interrupt; 6 - mod llm; 7 - mod memory; 8 - use klbr_ipc::ServerMsg; 1 + pub mod agent; 2 + pub mod config; 3 + pub mod context; 4 + pub mod interrupt; 5 + pub mod llm; 6 + pub mod memory; 9 7 10 - use anyhow::Result; 11 8 use std::sync::Arc; 12 - use tokio::sync::{broadcast, mpsc, RwLock}; 9 + use tokio::sync::RwLock; 13 10 14 - use config::Config; 15 - 16 - pub async fn run_daemon() -> Result<()> { 17 - tracing_subscriber::fmt().init(); 18 - 19 - let config = Config::default(); 20 - let memory = memory::MemoryStore::open(&config.db_path)?; 21 - let llm = llm::LlmClient::new(config.clone()); 22 - let snapshot = Arc::new(RwLock::new(None)); 23 - 24 - let (interrupt_tx, interrupt_rx) = mpsc::channel::<interrupt::Interrupt>(32); 25 - let (output_tx, _) = broadcast::channel::<ServerMsg>(256); 26 - 27 - // --- register extra interrupt sources here --- 28 - // interrupt::spawn_source(interrupt_tx.clone(), |tx| async move { 29 - // bsky::run(tx).await 30 - // }); 11 + pub type MetricsSnapshot = Arc<RwLock<Option<AgentMetrics>>>; 31 12 32 - let history_window = config.compaction_keep; 33 - let agent = tokio::spawn(agent::run( 34 - config, 35 - llm, 36 - memory.clone(), 37 - interrupt_rx, 38 - output_tx.clone(), 39 - snapshot.clone(), 40 - )); 41 - let sock = tokio::spawn(daemon::serve( 42 - interrupt_tx, 43 - output_tx, 44 - snapshot, 45 - memory, 46 - history_window, 47 - )); 13 + #[derive(Debug, Clone)] 14 + pub struct AgentMetrics { 15 + pub turn_count: usize, 16 + pub context_tokens: usize, 17 + pub watermark: usize, 18 + } 48 19 49 - tokio::select! { 50 - r = agent => { r??; } 51 - r = sock => { r??; } 52 - } 53 - Ok(()) 20 + #[derive(Debug, Clone)] 21 + pub enum AgentEvent { 22 + Started, 23 + Token(String), 24 + ThinkToken(String), 25 + Done, 26 + Status(String), 27 + Metrics(AgentMetrics), 54 28 }
+9 -1
klbr-core/src/memory.rs
··· 1 1 use anyhow::Result; 2 - use klbr_ipc::HistoryEntry; 3 2 use rusqlite::{ffi::sqlite3_auto_extension, params, Connection}; 4 3 use sqlite_vec::sqlite3_vec_init; 5 4 use std::sync::{Arc, Mutex}; 5 + 6 + #[derive(Debug, Clone)] 7 + pub struct HistoryEntry { 8 + pub id: i64, 9 + pub timestamp: i64, 10 + pub role: String, 11 + pub content: String, 12 + pub reasoning: Option<String>, 13 + } 6 14 7 15 pub struct Memory { 8 16 pub content: String,
+1
klbr-daemon/Cargo.toml
··· 10 10 anyhow = "1" 11 11 tracing = "0.1.44" 12 12 tracing-subscriber = "0.3.23" 13 + serde_json = "1"
+37 -1
klbr-daemon/src/main.rs
··· 1 + mod daemon; 2 + 1 3 use anyhow::Result; 4 + use klbr_core::{agent, config::Config, llm, memory, AgentEvent, MetricsSnapshot}; 5 + use std::sync::Arc; 6 + use tokio::sync::{broadcast, mpsc, RwLock}; 2 7 3 8 #[tokio::main] 4 9 async fn main() -> Result<()> { 5 - klbr_core::run_daemon().await 10 + tracing_subscriber::fmt().init(); 11 + 12 + let config = Config::default(); 13 + let memory = memory::MemoryStore::open(&config.db_path)?; 14 + let llm = llm::LlmClient::new(config.clone()); 15 + let snapshot = Arc::new(RwLock::new(None)) as MetricsSnapshot; 16 + 17 + let (interrupt_tx, interrupt_rx) = mpsc::channel(32); 18 + let (output_tx, _) = broadcast::channel::<AgentEvent>(256); 19 + 20 + let history_window = config.compaction_keep; 21 + let agent = tokio::spawn(agent::run( 22 + config, 23 + llm, 24 + memory.clone(), 25 + interrupt_rx, 26 + output_tx.clone(), 27 + snapshot.clone(), 28 + )); 29 + let sock = tokio::spawn(daemon::serve( 30 + interrupt_tx, 31 + output_tx, 32 + snapshot, 33 + memory, 34 + history_window, 35 + )); 36 + 37 + tokio::select! { 38 + r = agent => { r??; } 39 + r = sock => { r??; } 40 + } 41 + Ok(()) 6 42 }