A local-first private AI assistant for everyday use. Runs on-device models with encrypted P2P sync, and supports sharing chats publicly on ATProto.
10
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: Added get_last_row_counter fn + chat db migration + refactor

- row_counter field will be used as a kind of version vector for p2p chat sync

madclaws 10ceca31 7a3ec466

+146 -100
-13
Cargo.lock
··· 3203 3203 ] 3204 3204 3205 3205 [[package]] 3206 - name = "iroh-ping" 3207 - version = "0.9.0" 3208 - source = "registry+https://github.com/rust-lang/crates.io-index" 3209 - checksum = "ae0d3040396ce546281e3716e3fa88bcc425a99ab2fd2715484a86f050a5d36e" 3210 - dependencies = [ 3211 - "anyhow", 3212 - "iroh", 3213 - "iroh-metrics", 3214 - "n0-error", 3215 - ] 3216 - 3217 - [[package]] 3218 3206 name = "iroh-relay" 3219 3207 version = "0.97.0" 3220 3208 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 6096 6084 "hf-hub", 6097 6085 "iroh", 6098 6086 "iroh-gossip", 6099 - "iroh-ping", 6100 6087 "iroh-tickets", 6101 6088 "keyring", 6102 6089 "owo-colors",
-1
tiles/Cargo.toml
··· 22 22 uuid = {version = "1.21.0", features = ["v7"]} 23 23 axum = "0.8.8" 24 24 iroh = {version = "0.97.0", features = ["address-lookup-mdns"]} 25 - iroh-ping = "0.9.0" 26 25 iroh-tickets = "0.4.0" 27 26 axum-macros = "0.5.0" 28 27 iroh-gossip = "0.97.0"
+97 -49
tiles/src/core/chats.rs
··· 35 35 36 36 #[derive(Debug)] 37 37 pub struct Chats { 38 - pub id: Uuid, 38 + pub id: String, 39 39 content: String, 40 40 // The id of the responses api obj 41 41 response_id: Option<String>, ··· 43 43 role: Role, 44 44 user_id: String, 45 45 // The parent Id of a model's reply 46 - context_id: Option<Uuid>, 46 + context_id: Option<String>, 47 47 created_at: u64, 48 48 updated_at: u64, 49 + row_counter: i64, 49 50 } 50 51 51 52 pub fn save_chat( ··· 54 55 input: &str, 55 56 chat_resp: Option<&ChatResponse>, 56 57 ) -> Result<Chats> { 58 + let row_counter = get_last_row_counter(&conn, &user.user_id)?; 57 59 if let Some(chat_response) = chat_resp { 58 60 let chat_resp_cloned = chat_response.clone(); 61 + 59 62 let chat = Chats { 60 - id: Uuid::now_v7(), 63 + id: Uuid::now_v7().to_string(), 61 64 user_id: user.user_id.clone(), 62 65 content: input.to_owned(), 63 66 response_id: Some(chat_resp_cloned.prev_response_id), ··· 65 68 context_id: chat_resp_cloned.parent_chat_id, 66 69 created_at: get_unix_time_now(), 67 70 updated_at: get_unix_time_now(), 71 + row_counter: row_counter + 1, 68 72 }; 69 73 70 - conn.execute("insert into chats(id, user_id, content, resp_id, role, context_id, created_at, updated_at) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", (&chat.id.to_string(), &chat.user_id, &chat.content, &chat.response_id, Into::<String>::into(chat.role), &chat.context_id.unwrap_or(Uuid::nil()).to_string(), &chat.created_at.to_string(), &chat.updated_at.to_string()))?; 74 + conn.execute("insert into chats(id, user_id, content, resp_id, role, context_id, created_at, updated_at, row_counter) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", (&chat.id.to_string(), &chat.user_id, &chat.content, &chat.response_id, Into::<String>::into(chat.role), &chat.context_id, &chat.created_at.to_string(), &chat.updated_at.to_string(), &chat.row_counter))?; 71 75 72 76 Ok(chat) 73 77 } else { 74 78 let chat = Chats { 75 - id: Uuid::now_v7(), 79 + id: Uuid::now_v7().to_string(), 76 80 user_id: user.user_id.clone(), 77 81 content: input.to_owned(), 78 82 response_id: None, ··· 80 84 context_id: None, 81 85 created_at: get_unix_time_now(), 82 86 updated_at: get_unix_time_now(), 87 + row_counter: row_counter + 1, 83 88 }; 84 89 85 - conn.execute("insert into chats(id, user_id, content, resp_id, role, context_id, created_at, updated_at) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", (&chat.id.to_string(), &chat.user_id, &chat.content, &chat.response_id, Into::<String>::into(chat.role), &chat.context_id.unwrap_or(Uuid::nil()).to_string(), &chat.created_at.to_string(), &chat.updated_at.to_string()))?; 90 + conn.execute("insert into chats(id, user_id, content, resp_id, role, context_id, created_at, updated_at, row_counter) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", (&chat.id, &chat.user_id, &chat.content, &chat.response_id, Into::<String>::into(chat.role), &chat.context_id, &chat.created_at.to_string(), &chat.updated_at.to_string(), &chat.row_counter))?; 86 91 87 92 Ok(chat) 88 93 } 89 94 } 90 95 91 - fn get_last_entry_id(conn: &Connection, user_id: &str) -> Result<Option<Uuid>> { 96 + /// Returns the `id` of the last entry of the given user_id 97 + /// Used as the offset point for fetching the chat delta from the user_id 98 + pub fn get_last_entry_id(conn: &Connection, user_id: &str) -> Result<Option<Uuid>> { 92 99 match conn.query_row( 93 100 "select id from chats where user_id = ?1 order by id desc limit 1", 94 101 [user_id], 95 102 |row| row.get::<usize, String>(0), 96 103 ) { 97 - Ok(res) => Uuid::from_str(&res) 98 - .map_err(Into::into) 99 - .map(|uuid| Some(uuid)), 104 + Ok(res) => Uuid::from_str(&res).map_err(Into::into).map(Some), 100 105 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), 101 106 Err(err) => Err(<rusqlite::Error as Into<anyhow::Error>>::into(err)), 102 107 } 103 108 } 104 109 110 + /// Returns the `row_counter` of the last entry of the given user_id 111 + /// Used as the offset point for fetching the chat delta from the user_id 112 + pub fn get_last_row_counter(conn: &Connection, user_id: &str) -> Result<i64> { 113 + match conn.query_row( 114 + "select max(row_counter) from chats where user_id = ?1", 115 + [user_id], 116 + |row| row.get::<usize, i64>(0), 117 + ) { 118 + Ok(res) => Ok(res), 119 + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0), 120 + // It returns NULL, if there are now no rows 121 + Err(rusqlite::Error::InvalidColumnType(_, _, _)) => Ok(0), 122 + Err(err) => Err(<rusqlite::Error as Into<anyhow::Error>>::into(err)), 123 + } 124 + } 105 125 /// Return list of rows.. 106 126 /// encoding is the job of network modules 107 - /// 108 - fn get_delta_since_id(conn: &Connection, user_id: &str, last_entry_id: &str) -> Result<Vec<Chats>> { 109 - let mut stmt = conn.prepare("select id, user_id, content, resp_id, role, context_id, created_at, updated_at from chats where user_id = ?1 and id > ?2 order by id")?; 127 + pub fn get_delta_since_id( 128 + conn: &Connection, 129 + user_id: &str, 130 + last_entry_id: &str, 131 + ) -> Result<Vec<Chats>> { 132 + let mut stmt = conn.prepare("select id, user_id, content, resp_id, role, context_id, created_at, updated_at , row_counter from chats where user_id = ?1 and id > ?2 order by id")?; 110 133 111 134 let chat_rows = stmt.query_map([user_id, last_entry_id], |row| { 112 135 let id: String = row.get(0)?; 113 136 let role: String = row.get(4)?; 114 137 let created_at: f64 = row.get(6)?; 115 138 let updated_at: f64 = row.get(7)?; 116 - let ctx_id: String = row.get(5)?; 117 - let resp_id: String = row.get(3)?; 118 - let resp_id_opt = if resp_id.is_empty() { 119 - None 120 - } else { 121 - Some(resp_id) 122 - }; 123 - let ctx_id_opt = if ctx_id.is_empty() { 124 - None 125 - } else { 126 - Some(Uuid::from_str(&ctx_id).map_err(FromSqlError::other)?) 127 - }; 139 + let resp_id: Option<String> = row.get(3)?; 140 + let ctx_id = row.get(5)?; 128 141 Ok(Chats { 129 - id: Uuid::from_str(&id).map_err(FromSqlError::other)?, 142 + id, 130 143 content: row.get(2)?, 131 - response_id: resp_id_opt, 144 + response_id: resp_id, 132 145 role: Role::from_str(&role).map_err(FromSqlError::other)?, 133 146 user_id: row.get(1)?, 134 - context_id: ctx_id_opt, 147 + context_id: ctx_id, 135 148 created_at: created_at as u64, 136 149 updated_at: updated_at as u64, 150 + row_counter: row.get(8)?, 137 151 }) 138 152 })?; 139 153 ··· 146 160 Ok(chats) 147 161 } 148 162 163 + fn apply_delta() -> Result<()> { 164 + // bulk insert 165 + 166 + unimplemented!() 167 + } 168 + 149 169 #[cfg(test)] 150 170 mod tests { 151 171 use std::time::{SystemTime, UNIX_EPOCH}; ··· 157 177 use crate::{ 158 178 core::{ 159 179 accounts::{ACCOUNT, User}, 160 - chats::{get_delta_since_id, get_last_entry_id, save_chat}, 180 + chats::{get_delta_since_id, get_last_row_counter, save_chat}, 161 181 }, 162 182 runtime::mlx::ChatResponse, 163 183 }; ··· 178 198 assert_eq!(saved.resp_id, None); 179 199 assert_eq!(saved.role, Into::<String>::into(Role::User)); 180 200 assert_eq!(saved.user_id, user.user_id); 181 - assert_eq!(saved.context_id, Uuid::nil().to_string()); 201 + assert_eq!(saved.context_id, None); 182 202 } 183 203 184 204 #[test] 185 205 fn test_valid_response_save_chat() { 186 206 let conn = setup_db_schema(); 187 207 let user = create_user(); 188 - let parent_chat_id = Uuid::now_v7(); 208 + let parent_chat_id = Uuid::now_v7().to_string(); 189 209 let chat_resp = ChatResponse { 190 210 reply: "reply".to_owned(), 191 211 code: "code".to_owned(), 192 212 prev_response_id: String::from("resp_prev"), 193 - parent_chat_id: Some(parent_chat_id), 213 + parent_chat_id: Some(parent_chat_id.clone()), 194 214 metrics: None, 195 215 }; 196 216 let input = "2+2"; ··· 198 218 199 219 assert_eq!(chat.user_id, user.user_id); 200 220 assert_eq!(chat.response_id.as_deref(), Some("resp_prev")); 201 - assert_eq!(chat.context_id, Some(parent_chat_id)); 221 + assert_eq!(chat.context_id, Some(parent_chat_id.clone())); 202 222 203 223 let saved = fetch_saved_chat_row(&conn, &chat.id); 204 224 assert_eq!(saved.content, input); 205 225 assert_eq!(saved.resp_id, Some(String::from("resp_prev"))); 206 226 assert_eq!(saved.role, Into::<String>::into(Role::Assistant)); 207 227 assert_eq!(saved.user_id, user.user_id); 208 - assert_eq!(saved.context_id, parent_chat_id.to_string()); 228 + assert_eq!(saved.context_id, Some(parent_chat_id.clone())); 209 229 } 210 230 211 231 #[test] ··· 216 236 reply: "reply".to_owned(), 217 237 code: "code".to_owned(), 218 238 prev_response_id: String::from("resp_prev"), 219 - parent_chat_id: None, 239 + parent_chat_id: Some(Uuid::now_v7().to_string()), 220 240 metrics: None, 221 241 }; 222 242 223 243 let chat = 224 244 save_chat(&conn, &user, "hello", Some(&chat_resp)).expect("chat should be saved"); 225 245 226 - assert_eq!(chat.context_id, None); 246 + assert!(chat.context_id.is_some()); 227 247 let saved = fetch_saved_chat_row(&conn, &chat.id); 228 248 assert_eq!(saved.role, Into::<String>::into(Role::Assistant)); 229 - assert_eq!(saved.context_id, Uuid::nil().to_string()); 249 + assert!(saved.context_id.is_some()); 230 250 } 231 251 232 252 #[test] ··· 252 272 } 253 273 254 274 #[test] 255 - fn test_get_last_entry() { 275 + fn test_last_row_counter() { 256 276 let conn = setup_db_schema(); 257 277 let user = create_user(); 258 278 let input = "2+2"; ··· 262 282 assert!(chat.response_id.is_none()); 263 283 assert!(chat.context_id.is_none()); 264 284 265 - let saved = get_last_entry_id(&conn, &user.user_id); 266 - assert!(saved.is_ok()) 285 + let saved = get_last_row_counter(&conn, &user.user_id); 286 + assert_eq!(saved.unwrap(), 1); 267 287 } 268 288 269 289 #[test] 270 - fn test_get_last_entry_without_entry() { 290 + fn test_get_last_row_counter_without_entry() { 271 291 let conn = setup_db_schema(); 272 292 let user = create_user(); 273 - let saved = get_last_entry_id(&conn, &user.user_id); 274 - println!("{:?}", saved); 275 - assert!(saved.unwrap().is_none()) 293 + let saved = get_last_row_counter(&conn, &user.user_id); 294 + assert_eq!(saved.unwrap(), 0) 276 295 } 277 296 278 297 #[test] 279 298 fn test_get_delta_diff() { 280 299 let conn = setup_db_schema(); 281 - 282 300 let user = create_user(); 283 301 let input = "2+2"; 284 302 let chat_1 = save_chat(&conn, &user, input, None).expect("chat should be saved"); ··· 287 305 let _ = save_chat(&conn, &user, input, None).expect("chat should be saved"); 288 306 289 307 let rows = get_delta_since_id(&conn, &user.user_id, &chat_1.id.to_string()).unwrap(); 290 - println!("{:?}", rows); 308 + assert_eq!(rows.len(), 3); 309 + } 310 + 311 + #[test] 312 + fn test_get_delta_diff_empty_last_entry_id() { 313 + let conn = setup_db_schema(); 314 + let user = create_user(); 315 + let input = "2+2"; 316 + let _chat_1 = save_chat(&conn, &user, input, None).expect("chat should be saved"); 317 + let _ = save_chat(&conn, &user, input, None).expect("chat should be saved"); 318 + let _ = save_chat(&conn, &user, input, None).expect("chat should be saved"); 319 + let _ = save_chat(&conn, &user, input, None).expect("chat should be saved"); 320 + 321 + let rows = get_delta_since_id(&conn, &user.user_id, "").unwrap(); 322 + assert_eq!(rows.len(), 4); 323 + } 324 + 325 + #[test] 326 + fn test_get_delta_diff_empty_wrong_user_id() { 327 + let conn = setup_db_schema(); 328 + let user = create_user(); 329 + let input = "2+2"; 330 + let _chat_1 = save_chat(&conn, &user, input, None).expect("chat should be saved"); 331 + let _ = save_chat(&conn, &user, input, None).expect("chat should be saved"); 332 + let _ = save_chat(&conn, &user, input, None).expect("chat should be saved"); 333 + let _ = save_chat(&conn, &user, input, None).expect("chat should be saved"); 334 + 335 + let rows = get_delta_since_id(&conn, "", "").unwrap(); 336 + assert_eq!(rows.len(), 0); 291 337 } 292 338 293 339 struct SavedChatRow { ··· 295 341 resp_id: Option<String>, 296 342 role: String, 297 343 user_id: String, 298 - context_id: String, 344 + context_id: Option<String>, 299 345 } 300 346 301 - fn fetch_saved_chat_row(conn: &Connection, chat_id: &Uuid) -> SavedChatRow { 347 + fn fetch_saved_chat_row(conn: &Connection, chat_id: &str) -> SavedChatRow { 302 348 conn.query_row( 303 349 "SELECT content, resp_id, role, user_id, context_id FROM chats WHERE id = ?1", 304 350 [chat_id.to_string()], ··· 344 390 user_id TEXT NOT NULL, 345 391 context_id TEXT , 346 392 created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), 347 - updated_at INTEGER NOT NULL DEFAULT (strftime('%s','now')) 393 + updated_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), 394 + row_counter INTEGER, 395 + session_id TEXT 348 396 );", 349 397 [], 350 398 )
+30 -32
tiles/src/core/network/mod.rs
··· 21 21 Gossip, TopicId, 22 22 api::{Event, GossipReceiver, GossipSender}, 23 23 }; 24 - use iroh_ping::Ping; 25 - use iroh_tickets::endpoint::EndpointTicket; 26 24 use rusqlite::Connection; 27 25 use tilekit::accounts::{get_did_from_public_key, get_random_bytes, get_random_bytes_32}; 28 26 use tokio::task::spawn_blocking; ··· 75 73 } 76 74 77 75 // Entrypoint of network connection 78 - pub async fn init(ticket: Option<&str>) -> Result<()> { 79 - if let Some(ticket_addr) = ticket { 80 - let sender_endpoint = Endpoint::bind(presets::N0).await?; 81 - println!("{:?}", sender_endpoint.addr()); 82 - let se_clone = sender_endpoint.clone(); 83 - let send_pinger = Ping::new(); 84 - let rtt = send_pinger 85 - .ping( 86 - &sender_endpoint, 87 - EndpointTicket::from_str(ticket_addr)? 88 - .endpoint_addr() 89 - .clone(), 90 - ) 91 - .await?; 76 + // pub async fn init(ticket: Option<&str>) -> Result<()> { 77 + // if let Some(ticket_addr) = ticket { 78 + // let sender_endpoint = Endpoint::bind(presets::N0).await?; 79 + // println!("{:?}", sender_endpoint.addr()); 80 + // let se_clone = sender_endpoint.clone(); 81 + // let send_pinger = Ping::new(); 82 + // let rtt = send_pinger 83 + // .ping( 84 + // &sender_endpoint, 85 + // EndpointTicket::from_str(ticket_addr)? 86 + // .endpoint_addr() 87 + // .clone(), 88 + // ) 89 + // .await?; 92 90 93 - println!("ping took: {:?} to complete", rtt); 94 - se_clone.close().await; 95 - } else { 96 - let endpoint = Endpoint::bind(presets::N0).await?; 97 - let ep = endpoint.clone(); 98 - let ep2 = endpoint.clone(); 99 - endpoint.online().await; 91 + // println!("ping took: {:?} to complete", rtt); 92 + // se_clone.close().await; 93 + // } else { 94 + // let endpoint = Endpoint::bind(presets::N0).await?; 95 + // let ep = endpoint.clone(); 96 + // let ep2 = endpoint.clone(); 97 + // endpoint.online().await; 100 98 101 - let ping = Ping::new(); 99 + // let ping = Ping::new(); 102 100 103 - let ticket = EndpointTicket::new(endpoint.addr()); 101 + // let ticket = EndpointTicket::new(endpoint.addr()); 104 102 105 - println!("ticket\n{:?}", ticket.to_string()); 103 + // println!("ticket\n{:?}", ticket.to_string()); 106 104 107 - let recv_router = Router::builder(ep).accept(iroh_ping::ALPN, ping).spawn(); 108 - ep2.close().await; 109 - recv_router.shutdown().await?; 110 - } 111 - Ok(()) 112 - } 105 + // let recv_router = Router::builder(ep).accept(iroh_ping::ALPN, ping).spawn(); 106 + // ep2.close().await; 107 + // recv_router.shutdown().await?; 108 + // } 109 + // Ok(()) 110 + // } 113 111 114 112 pub async fn link(ticket: Option<String>) -> Result<()> { 115 113 let user_db_conn = get_db_conn(DBTYPE::COMMON)?;
+18 -3
tiles/src/core/storage/db.rs
··· 37 37 const COMMON_MIGRATIONS: Migrations = Migrations::from_slice(COMMON_MIGRATION_ARRAY); 38 38 39 39 // TODO: add the schema doc 40 - const CHATS_MIGRATION_ARRAY: &[M] = &[M::up( 41 - "CREATE TABLE IF NOT EXISTS chats ( 40 + const CHATS_MIGRATION_ARRAY: &[M] = &[ 41 + M::up( 42 + "CREATE TABLE IF NOT EXISTS chats ( 42 43 id TEXT PRIMARY KEY, 43 44 content TEXT NOT NULL, 44 45 resp_id TEXT, ··· 48 49 created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), 49 50 updated_at INTEGER NOT NULL DEFAULT (strftime('%s','now')) 50 51 )", 51 - )]; 52 + ), 53 + // After creating row_counter, we backfill the row_counter for existing rows 54 + // which doesnt have any 55 + M::up( 56 + " 57 + ALTER TABLE CHATS ADD COLUMN row_counter INTEGER; 58 + UPDATE chats SET row_counter = ( 59 + SELECT rn FROM ( 60 + SELECT id, ROW_NUMBER() OVER ( PARTITION BY user_id ORDER BY id ) as rn FROM chats 61 + ) t WHERE t.id = chats.id ); 62 + 63 + ALTER TABLE CHATS ADD COLUMN session_id TEXT; 64 + ", 65 + ), 66 + ]; 52 67 53 68 const CHATS_MIGRATIONS: Migrations = Migrations::from_slice(CHATS_MIGRATION_ARRAY); 54 69
+1 -2
tiles/src/runtime/mlx.rs
··· 25 25 use tilekit::modelfile::Modelfile; 26 26 use tilekit::modelfile::Role; 27 27 use tokio::time::sleep; 28 - use uuid::Uuid; 29 28 30 29 #[derive(Debug, Deserialize, Serialize, Clone)] 31 30 pub struct BenchmarkMetrics { ··· 56 55 pub reply: String, 57 56 pub code: String, 58 57 pub prev_response_id: String, 59 - pub parent_chat_id: Option<Uuid>, 58 + pub parent_chat_id: Option<String>, 60 59 pub metrics: Option<BenchmarkMetrics>, 61 60 } 62 61