A local-first private AI assistant for everyday use. Runs on-device models with encrypted P2P sync, and supports sharing chats publicly on ATProto.
10
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: Added bi-directional p2p append-only chat sync over internet

madclaws edba53f8 18a068d1

+284 -128
+1 -1
Cargo.lock
··· 6539 6539 6540 6540 [[package]] 6541 6541 name = "tiles" 6542 - version = "0.4.5" 6542 + version = "0.4.6" 6543 6543 dependencies = [ 6544 6544 "anyhow", 6545 6545 "async-std",
+2
justfile
··· 31 31 bundle_pkg_full: 32 32 ./pkg/build.sh 33 33 ./pkg/build_full.sh 34 + 35 + # runtiles: RUST_LOG=tiles=info,iroh=off cargo run
+1 -1
tilekit/src/optimize.rs
··· 120 120 println!("Optimizing Modelfile: {}", modelfile_path); 121 121 122 122 // 1. Read Modelfile 123 - let content = fs::read_to_string(&modelfile_path) 123 + let content = fs::read_to_string(modelfile_path) 124 124 .map_err(|e| format!("Error reading Modelfile: {}", e))?; 125 125 126 126 let mut modelfile: Modelfile = content
+1 -1
tiles/Cargo.toml
··· 1 1 [package] 2 2 name = "tiles" 3 - version = "0.4.5" 3 + version = "0.4.6" 4 4 edition = "2024" 5 5 6 6 [dependencies]
+70 -59
tiles/src/core/accounts.rs
··· 2 2 // Stuff related to account and identity system 3 3 use anyhow::{Result, anyhow}; 4 4 use iroh::SecretKey; 5 - use rusqlite::{Connection, types::FromSqlError}; 5 + use rusqlite::{Connection, Row, types::FromSqlError}; 6 6 use std::{ 7 7 fmt::Display, 8 8 time::{SystemTime, UNIX_EPOCH}, ··· 71 71 #[allow(dead_code)] 72 72 #[derive(Debug, Clone)] 73 73 pub struct User { 74 - pub id: uuid::Uuid, 74 + pub id: String, 75 75 pub user_id: String, 76 76 pub username: String, 77 77 pub active_profile: bool, ··· 208 208 209 209 fetch_current_user 210 210 .query_one([], |row| { 211 - let id: String = row.get(0)?; 212 211 let account_type: String = row.get(3)?; 213 212 let created_at: f64 = row.get(6)?; 214 213 let updated_at: f64 = row.get(7)?; 215 214 Ok(User { 216 - id: Uuid::try_parse(&id).map_err(FromSqlError::other)?, 215 + id: row.get(0)?, 217 216 user_id: row.get(1)?, 218 217 username: row.get(2)?, 219 218 account_type: ACCOUNT::try_from(account_type).map_err(FromSqlError::other)?, ··· 232 231 233 232 fetch_current_user 234 233 .query_one([did], |row| { 235 - let id: String = row.get(0)?; 236 234 let account_type: String = row.get(3)?; 237 235 let created_at: f64 = row.get(6)?; 238 236 let updated_at: f64 = row.get(7)?; 239 237 Ok(User { 240 - id: Uuid::try_parse(&id).map_err(FromSqlError::other)?, 238 + id: row.get(0)?, 241 239 user_id: row.get(1)?, 242 240 username: row.get(2)?, 243 241 account_type: ACCOUNT::try_from(account_type).map_err(FromSqlError::other)?, ··· 256 254 let config = get_or_create_config()?; 257 255 let root_user = get_root_user_details(&config)?; 258 256 let user = User { 259 - id: Uuid::now_v7(), 257 + id: Uuid::now_v7().to_string(), 260 258 user_id: root_user.id, 261 259 username: root_user.nickname, 262 260 account_type: ACCOUNT::LOCAL, ··· 290 288 // we will wait for it until we solve the sync part 291 289 pub fn save_peer_account_db(db_conn: &Connection, user_id: &str, nickname: &str) -> Result<()> { 292 290 let user = User { 293 - id: Uuid::now_v7(), 291 + id: Uuid::now_v7().to_string(), 294 292 user_id: String::from(user_id), 295 293 username: String::from(nickname), 296 294 account_type: ACCOUNT::PEER, ··· 320 318 Ok(()) 321 319 } 322 320 323 - pub fn get_user_by_user_id(conn: &Connection, user_id: String) -> Result<()> { 324 - let mut fetch_root_user = conn.prepare("select id from users where user_id = ?1")?; 325 - 326 - match fetch_root_user.query_one([user_id], |_row| Ok(())) { 327 - Ok(_) => Ok(()), 328 - Err(rusqlite::Error::QueryReturnedNoRows) => Err(anyhow!("User doesnt exist")), 329 - Err(_err) => Err(anyhow!("Fetching user from db failed")), 330 - } 331 - } 332 - 333 321 fn create_root_user(root_user_config: &Table, nickname: Option<String>) -> Result<Table> { 334 322 let mut root_user_table = root_user_config.clone(); 335 323 let app_name = if cfg!(debug_assertions) { ··· 349 337 } 350 338 } 351 339 340 + fn parse_user_from_row(row: &Row<'_>) -> Result<User, rusqlite::Error> { 341 + let account_type: String = row.get(3)?; 342 + let created_at: f64 = row.get(6)?; 343 + let updated_at: f64 = row.get(7)?; 344 + Ok(User { 345 + id: row.get(0)?, 346 + user_id: row.get(1)?, 347 + username: row.get(2)?, 348 + account_type: ACCOUNT::try_from(account_type).map_err(FromSqlError::other)?, 349 + active_profile: row.get(4)?, 350 + root: row.get(5)?, 351 + 352 + created_at: created_at as u64, 353 + updated_at: updated_at as u64, 354 + }) 355 + } 356 + /// Gets a peer by its DID 357 + pub fn get_user_info(conn: &Connection, did: &str) -> Result<User> { 358 + let mut fetch_user = conn.prepare("select id, user_id, username, account_type, active_profile, root, created_at, updated_at from users where user_id = ?1")?; 359 + 360 + match fetch_user.query_one([did], parse_user_from_row) { 361 + Ok(user) => Ok(user), 362 + Err(rusqlite::Error::QueryReturnedNoRows) => Err(anyhow!("Peer doesnt exist")), 363 + Err(err) => { 364 + log::error!("{:?}", err); 365 + Err(anyhow!("Fetching user from db failed due to {:?}", err)) 366 + } 367 + } 368 + } 369 + 352 370 pub fn get_peer_list(db_conn: &Connection) -> Result<Vec<User>> { 353 371 let mut stmt= db_conn.prepare("select id, user_id, username, account_type, active_profile, root, created_at, updated_at from users where account_type != \'local\'")?; 354 372 355 373 let user_rows = stmt 356 - .query_map([], |row| { 357 - let id: String = row.get(0)?; 358 - let account_type: String = row.get(3)?; 359 - let created_at: f64 = row.get(6)?; 360 - let updated_at: f64 = row.get(7)?; 361 - Ok(User { 362 - id: Uuid::try_parse(&id).map_err(FromSqlError::other)?, 363 - user_id: row.get(1)?, 364 - username: row.get(2)?, 365 - account_type: ACCOUNT::try_from(account_type).map_err(FromSqlError::other)?, 366 - active_profile: row.get(4)?, 367 - root: row.get(5)?, 368 - 369 - created_at: created_at as u64, 370 - updated_at: updated_at as u64, 371 - }) 372 - }) 374 + .query_map([], parse_user_from_row) 373 375 .map_err(<rusqlite::Error as Into<anyhow::Error>>::into)?; 374 376 375 377 let mut peer_list: Vec<User> = vec![]; ··· 407 409 Ok(SecretKey::from_bytes(&signing_key)) 408 410 } 409 411 412 + pub fn create_dummy_user() -> User { 413 + let id = Uuid::now_v7().to_string(); 414 + let chunk = id.split('-').collect::<Vec<&str>>()[0]; 415 + let user_id = format!("did:key:{}", id.split('-').collect::<Vec<&str>>()[0]); 416 + let username = format!("nickname-{}", chunk); 417 + User { 418 + id, 419 + user_id, 420 + username, 421 + account_type: ACCOUNT::PEER, 422 + active_profile: true, 423 + root: true, 424 + created_at: SystemTime::now() 425 + .duration_since(UNIX_EPOCH) 426 + .expect("time went backwards") 427 + .as_secs(), 428 + updated_at: SystemTime::now() 429 + .duration_since(UNIX_EPOCH) 430 + .expect("time went backwards") 431 + .as_secs(), 432 + } 433 + } 410 434 #[cfg(test)] 411 435 mod tests { 412 436 use super::*; ··· 657 681 fn test_get_current_user_valid() { 658 682 let conn = setup_db_schema(); 659 683 let user = User { 660 - id: Uuid::now_v7(), 684 + id: Uuid::now_v7().to_string(), 661 685 user_id: String::from("did"), 662 686 username: String::from("nickname"), 663 687 account_type: ACCOUNT::LOCAL, ··· 691 715 } 692 716 693 717 #[test] 694 - fn test_get_current_user_invalid_uuid_fails() { 695 - let conn = setup_db_schema(); 696 - conn.execute( 697 - "insert into users (id, user_id, username, active_profile, account_type, root, created_at, updated_at) 698 - values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", 699 - ( 700 - "not-a-uuid", 701 - "did:key:test", 702 - "nickname", 703 - true, 704 - "local", 705 - true, 706 - 1_i64, 707 - 1_i64, 708 - ), 709 - ) 710 - .unwrap(); 711 - 712 - assert!(get_current_user(&conn).is_err()); 713 - } 714 - 715 - #[test] 716 718 fn test_get_current_user_invalid_account_type_fails() { 717 719 let conn = setup_db_schema(); 718 720 conn.execute( ··· 758 760 759 761 fn create_user(conn: &Connection, account_type: ACCOUNT) -> User { 760 762 let user = User { 761 - id: Uuid::now_v7(), 763 + id: Uuid::now_v7().to_string(), 762 764 user_id: String::from("did"), 763 765 username: String::from("nickname"), 764 766 account_type, ··· 819 821 let local_user = create_user(&conn, ACCOUNT::LOCAL); 820 822 821 823 assert!(unlink(&conn, &local_user.user_id).is_err()) 824 + } 825 + 826 + #[test] 827 + fn test_get_user_info() { 828 + let conn = setup_db_schema(); 829 + let _local_user = create_user(&conn, ACCOUNT::LOCAL); 830 + save_peer_account_db(&conn, "did:jey:varathan", "varathan").unwrap(); 831 + let user_info = get_user_info(&conn, "did:jey:varathan"); 832 + assert!(user_info.is_ok()) 822 833 } 823 834 }
+60 -4
tiles/src/core/chats.rs
··· 6 6 use std::str::FromStr; 7 7 8 8 use crate::core::accounts::User; 9 + use crate::core::storage::db::get_db_conn; 9 10 use crate::runtime::mlx::ChatResponse; 10 11 use crate::utils::get_unix_time_now; 11 - use anyhow::Result; 12 + use anyhow::{Result, anyhow}; 13 + use log::info; 12 14 use rusqlite::types::FromSqlError; 13 15 use rusqlite::{Connection, params}; 14 16 use tilekit::modelfile::Role; 17 + use tokio::sync::mpsc::{self, Sender}; 18 + use tokio::sync::oneshot; 15 19 use uuid::Uuid; 16 20 // model the chats table 17 21 ··· 47 51 created_at: u64, 48 52 updated_at: u64, 49 53 row_counter: i64, 54 + } 55 + 56 + type Responder<T> = oneshot::Sender<T>; 57 + pub enum SyncOp { 58 + GetLastRowCounter { 59 + user_id: String, 60 + resp: Responder<Result<i64>>, 61 + }, 62 + GetEncodedData { 63 + user_id: String, 64 + last_row_counter: i64, 65 + resp: Responder<Result<Vec<u8>>>, 66 + }, 67 + ApplyDelta { 68 + delta: Vec<u8>, 69 + resp: Responder<Result<()>>, 70 + }, 50 71 } 51 72 52 73 pub fn save_chat( ··· 178 199 Err(rusqlite::Error::SqliteFailure(_, Some(reason))) 179 200 if reason == "UNIQUE constraint failed: chats.id" => 180 201 { 181 - log::error!( 202 + log::warn!( 182 203 "err in writing row {:?}, already exists, skipping", 183 204 &chat.id 184 205 ); ··· 199 220 ) -> Result<Vec<u8>> { 200 221 let delta = get_delta(conn, user_id, last_row_couter)?; 201 222 Ok(encode_delta_to_bytes(&delta)) 223 + } 224 + 225 + pub fn create_sync_channel() -> Sender<SyncOp> { 226 + let (tx, mut rx) = mpsc::channel::<SyncOp>(32); 227 + 228 + tokio::spawn(async move { 229 + let mut chat_db_conn = get_db_conn(super::storage::db::DBTYPE::CHAT)?; 230 + info!("DB sync channel ready.."); 231 + while let Some(msg) = rx.recv().await { 232 + match msg { 233 + SyncOp::GetLastRowCounter { user_id, resp } => { 234 + let counter = get_last_row_counter(&chat_db_conn, &user_id); 235 + resp.send(counter) 236 + .map_err(|_op| anyhow!("Error sending counter"))?; 237 + } 238 + SyncOp::GetEncodedData { 239 + user_id, 240 + last_row_counter, 241 + resp, 242 + } => { 243 + let encoded_res = get_encoded_delta(&chat_db_conn, &user_id, last_row_counter); 244 + resp.send(encoded_res) 245 + .map_err(|_op| anyhow!("Error sending encoded_delta"))?; 246 + } 247 + SyncOp::ApplyDelta { delta, resp } => { 248 + let chat_rows = decode_delta_from_bytes(&delta)?; 249 + let apply_res = apply_delta(&mut chat_db_conn, &chat_rows); 250 + resp.send(apply_res) 251 + .map_err(|_| anyhow!("Error sending apply delta response"))?; 252 + } 253 + } 254 + } 255 + Ok::<(), anyhow::Error>(()) 256 + }); 257 + tx 202 258 } 203 259 204 260 fn encode_delta_to_bytes(delta_chats: &Vec<Chats>) -> Vec<u8> { ··· 597 653 598 654 fn create_user() -> User { 599 655 User { 600 - id: Uuid::now_v7(), 656 + id: Uuid::now_v7().to_string(), 601 657 user_id: String::from("did"), 602 658 username: String::from("nickname"), 603 659 account_type: ACCOUNT::LOCAL, ··· 615 671 } 616 672 fn create_user_by_id(user_id: &str) -> User { 617 673 User { 618 - id: Uuid::now_v7(), 674 + id: Uuid::now_v7().to_string(), 619 675 user_id: String::from(user_id), 620 676 username: String::from("nickname"), 621 677 account_type: ACCOUNT::LOCAL,
+139 -60
tiles/src/core/network/mod.rs
··· 24 24 api::{Event, GossipReceiver, GossipSender}, 25 25 }; 26 26 27 + use log::info; 27 28 use rusqlite::Connection; 28 29 use tilekit::accounts::{ 29 30 get_did_from_public_key, get_public_key_from_did, get_random_bytes, get_random_bytes_32, 30 31 }; 31 - use tokio::task::spawn_blocking; 32 + use tokio::{ 33 + sync::{mpsc::Sender, oneshot}, 34 + task::spawn_blocking, 35 + }; 32 36 use uuid::Uuid; 33 37 34 38 use crate::core::{ 35 39 accounts::{ 36 - self, get_app_secret_key, get_current_user, get_user_by_user_id, save_peer_account_db, 40 + self, create_dummy_user, get_app_secret_key, get_current_user, get_user_info, 41 + save_peer_account_db, 37 42 }, 38 - chats::{get_delta, get_encoded_delta, get_last_row_counter}, 43 + chats::{SyncOp, create_sync_channel}, 39 44 network::ticket::{EndpointUserData, LinkTicket}, 40 45 storage::db::{DBTYPE, get_db_conn}, 41 46 }; 47 + use owo_colors::OwoColorize; 42 48 use sha2::{Digest, Sha256}; 43 49 44 50 const DEVICE_LINK_LOCAL_TOPIC: &str = "com.tilesprivacy.tiles.link"; ··· 118 124 did = endpoint_user_data.did; 119 125 nickname = endpoint_user_data.nickname; 120 126 }; 121 - if get_user_by_user_id(&user_db_conn, did.to_owned()).is_ok() { 127 + if get_user_info(&user_db_conn, &did).is_ok() { 122 128 println!("Device {}({}) already linked", nickname, did); 123 129 return Ok(()); 124 130 } ··· 328 334 mut receiver: GossipReceiver, 329 335 sender: GossipSender, 330 336 user: accounts::User, 331 - user_db_conn: Connection, 332 337 store: MemStore, 333 338 endpoint: Endpoint, 339 + sync_channel_sender: Sender<SyncOp>, 334 340 ) -> Result<()> { 335 341 while let Some(event) = receiver.try_next().await? { 336 - if cfg!(debug_assertions) { 337 - println!("SYNC_LOOP: In {}:, some event {:?}", user.username, event); 338 - } 342 + info!( 343 + "SYNC_LOOP: Received by {}:, event {:?}", 344 + user.username, event 345 + ); 339 346 if let Event::Received(msg) = event { 340 347 let pub_key = msg.delivered_from; 341 348 let msg = NetworkMessage::from_bytes(&msg.content)?; ··· 350 357 MessageBody::SyncStart { 351 358 last_row_counter: _, 352 359 } => { 353 - // TODO: REJECT SYNC_REQUESTS FROM NON_PEERS 354 - println!("Received sync start"); 355 - on_sync_start_event(&sender, &store, &msg, pub_key, &user).await?; 360 + info!("Received sync start event..."); 361 + on_sync_start_event( 362 + &sender, 363 + &store, 364 + &msg, 365 + pub_key, 366 + &user, 367 + &sync_channel_sender, 368 + ) 369 + .await?; 356 370 } 357 371 MessageBody::SyncSendDeltaInfo { 358 372 blob_ticket: _, 359 373 last_row_counter: _, 360 374 } => { 361 - on_sync_send_delta_info(&sender, &store, &msg, pub_key, &user, &endpoint) 362 - .await?; 375 + on_sync_send_delta_info( 376 + &sender, 377 + &store, 378 + &msg, 379 + pub_key, 380 + &user, 381 + &endpoint, 382 + &sync_channel_sender, 383 + ) 384 + .await?; 363 385 } 364 386 MessageBody::SyncEnd => { 365 - println!("sync end, can close"); 387 + println!("Sync completed..., you can exit now"); 366 388 } 367 389 msg_body => { 368 - println!("Invalid sync message {:?}", msg_body) 390 + info!("Invalid sync message {:?}", msg_body) 369 391 } 370 392 } 371 393 } ··· 396 418 pub async fn sync(did: Option<String>) -> Result<()> { 397 419 let user_db_conn = get_db_conn(DBTYPE::COMMON)?; 398 420 let user = get_current_user(&user_db_conn)?; 399 - let chat_db_conn = get_db_conn(DBTYPE::CHAT)?; 400 421 let endpoint = create_endpoint(&user).await?; 401 422 let is_online = is_online(&endpoint).await; 423 + let tx = create_sync_channel(); 402 424 if let Some(receiver_did) = did { 403 425 // INITIATOR BLOCK 404 426 // The sync gossip topic is basically derived from the receiver's 405 427 // DID, so that initiator's can directly connect w/o any 406 428 // initial handshake 407 429 let receiver_pub_key = get_public_key_from_did(&receiver_did)?; 430 + let receiver_user = if let Ok(receiver_user) = get_user_info(&user_db_conn, &receiver_did) { 431 + receiver_user 432 + } else { 433 + if cfg!(debug_assertions) == false { 434 + eprintln!("The DID {} is not a linked peer", receiver_did); 435 + return Ok(()); 436 + } 437 + info!("creating a dummy user"); 438 + create_dummy_user() 439 + }; 440 + 408 441 let receiver_endpoint_id = PublicKey::from_bytes(&receiver_pub_key)?; 409 - println!("receiver endpoint id {:?}", receiver_endpoint_id); 442 + info!("receiver endpoint id {:?}", receiver_endpoint_id); 410 443 let sync_topic = format!("sync:{}", receiver_did); 411 444 let sync_topic_id = create_topic_id(&sync_topic); 412 445 ··· 418 451 receiver, 419 452 sender.clone(), 420 453 user.clone(), 421 - user_db_conn, 422 454 store, 423 455 endpoint.clone(), 456 + tx.clone(), 424 457 )); 425 458 426 - // get the last_row_counter 427 - // 428 - let receiver_last_row_counter = get_last_row_counter(&chat_db_conn, &receiver_did)?; 459 + let receiver_last_row_counter = fetch_last_row_counter(&receiver_did, &tx).await?; 429 460 let sync_start_msg = NetworkMessage::new( 430 461 &user, 431 462 is_online, ··· 434 465 }, 435 466 ); 436 467 sender.broadcast(sync_start_msg.to_bytes().into()).await?; 468 + info!("Sent SyncStart event"); 437 469 438 470 println!( 439 - "\nSent sync start request to {}({})", 440 - user.username, user.user_id 471 + "\nSyncing in progress with ....{}({})", 472 + receiver_user.username, receiver_did 441 473 ); 442 474 tokio::signal::ctrl_c().await?; 443 475 recv_router.shutdown().await?; ··· 447 479 // public-key, so that initiator's can directly connect w/o any 448 480 // initial handshake 449 481 450 - println!("endpointId {:?}", endpoint.id()); 451 482 let did = if cfg!(debug_assertions) { 452 483 let pub_key = endpoint.id(); 453 484 &get_did_from_public_key(pub_key.as_bytes())? ··· 459 490 let sync_topic_id = create_topic_id(&sync_topic); 460 491 let (sender, receiver, recv_router, store) = 461 492 create_sync_network(&endpoint, sync_topic_id, vec![]).await?; 462 - 493 + info!("sync gossip network created"); 463 494 tokio::spawn(sync_subscribe_loop( 464 495 receiver, 465 496 sender.clone(), 466 497 user.clone(), 467 - user_db_conn, 468 498 store, 469 499 endpoint.clone(), 500 + tx.clone(), 470 501 )); 471 - println!("Ready to accept sync requests from peers..."); 502 + println!("{}", "Ready to accept sync requests from peers...".blue()); 472 503 473 - // Since in dev, we use create endpoints, at the initiator side 504 + // Since in dev, we create endpoints randomly, at the initiator side 474 505 // we can use the DID derived from this, instead of actual ones 475 506 // for the network to form correctly 476 507 if cfg!(debug_assertions) { ··· 604 635 } 605 636 } 606 637 638 + async fn fetch_last_row_counter(user_id: &str, sender: &Sender<SyncOp>) -> Result<i64> { 639 + let (sendx, recvx) = oneshot::channel(); 640 + let sync_op_msg = SyncOp::GetLastRowCounter { 641 + user_id: user_id.to_owned(), 642 + resp: sendx, 643 + }; 644 + 645 + sender.send(sync_op_msg).await?; 646 + recvx.await? 647 + } 648 + 649 + async fn fetch_encoded_delta_ticket( 650 + user_id: &str, 651 + sender: &Sender<SyncOp>, 652 + lrc: i64, 653 + store: &MemStore, 654 + delivered_from: PublicKey, 655 + ) -> Result<BlobTicket> { 656 + let (sendx, recvx) = oneshot::channel(); 657 + 658 + let sync_op_msg = SyncOp::GetEncodedData { 659 + user_id: user_id.to_owned(), 660 + last_row_counter: lrc, 661 + resp: sendx, 662 + }; 663 + 664 + sender.send(sync_op_msg).await?; 665 + let encoded_data_result = recvx.await??; 666 + 667 + let tag = store 668 + .blobs() 669 + .add_bytes(Into::<Bytes>::into(encoded_data_result)) 670 + .await?; 671 + 672 + Ok(BlobTicket::new(delivered_from.into(), tag.hash, tag.format)) 673 + } 607 674 async fn on_sync_start_event( 608 675 sender: &GossipSender, 609 676 store: &MemStore, 610 677 msg: &NetworkMessage, 611 678 delivered_from: PublicKey, 612 679 user: &accounts::User, 613 - // chat_db_conn: &Connection, 680 + sync_channel_sender: &Sender<SyncOp>, 614 681 ) -> Result<()> { 615 - println!("Received sync start"); 616 682 if let MessageBody::SyncStart { 617 683 last_row_counter: lrc, 618 684 } = &msg.body 619 685 { 620 - // let chat_delta = get_encoded_delta( 621 - // &chat_db_conn, 622 - // &user.user_id, 623 - // lrc.expect("Expected a valid last row counter"), 624 - // ); 625 - // let chat_delta_bytes = 626 - let rand_bytes = get_random_bytes_32().to_vec(); 627 - println!("rand bytes\n{:?}", rand_bytes); 628 - let tag = store 629 - .blobs() 630 - .add_bytes(Into::<Bytes>::into(rand_bytes)) 631 - .await?; 686 + let sender_did = get_did_from_public_key(delivered_from.as_bytes())?; 687 + let ticket = fetch_encoded_delta_ticket( 688 + &user.user_id, 689 + sync_channel_sender, 690 + lrc.expect("lrc failed"), 691 + store, 692 + delivered_from, 693 + ) 694 + .await?; 695 + 696 + let receiver_last_row_counter = 697 + fetch_last_row_counter(&sender_did, sync_channel_sender).await?; 632 698 633 - let ticket = BlobTicket::new(delivered_from.into(), tag.hash, tag.format); 634 699 let delta_info = NetworkMessage::new( 635 - &user, 700 + user, 636 701 msg.is_online, 637 702 MessageBody::SyncSendDeltaInfo { 638 703 blob_ticket: ticket.to_string(), 639 - last_row_counter: Some(0), 704 + last_row_counter: Some(receiver_last_row_counter), 640 705 }, 641 706 ); 642 707 sender.broadcast(delta_info.to_bytes().into()).await?; 643 - println!("Sent blob ticket {}", ticket.to_string()); 708 + info!("Sent blob ticket {} to {}", ticket, sender_did); 644 709 } 645 710 Ok(()) 646 711 } ··· 652 717 delivered_from: PublicKey, 653 718 user: &accounts::User, 654 719 endpoint: &Endpoint, 720 + sync_channel_sender: &Sender<SyncOp>, 655 721 ) -> Result<()> { 656 722 if let MessageBody::SyncSendDeltaInfo { 657 723 blob_ticket, ··· 659 725 } = &msg.body 660 726 { 661 727 let ticket: BlobTicket = blob_ticket.parse()?; 662 - let downloader = store.downloader(&endpoint); 728 + let downloader = store.downloader(endpoint); 663 729 downloader 664 730 .download(ticket.hash(), Some(delivered_from)) 665 731 .await?; 666 732 667 733 let data = store.blobs().get_bytes(ticket.hash()).await?; 668 - println!("rand bytes received {:?}", data.to_vec()); 734 + 735 + info!("Downloaded data diff"); 736 + let (sendx, recvx) = oneshot::channel(); 737 + let sync_op_msg = SyncOp::ApplyDelta { 738 + delta: data.to_vec(), 739 + resp: sendx, 740 + }; 669 741 670 - println!("finished download"); 671 - if let Some(_row_counter) = last_row_counter { 672 - let rand_bytes = get_random_bytes_32().to_vec(); 673 - println!("rand bytes\n{:?}", rand_bytes); 674 - let tag = store 675 - .blobs() 676 - .add_bytes(Into::<Bytes>::into(rand_bytes)) 677 - .await?; 742 + sync_channel_sender.send(sync_op_msg).await?; 743 + 744 + recvx.await??; 745 + info!("Diff applied successfully"); 678 746 679 - let ticket = BlobTicket::new(delivered_from.into(), tag.hash, tag.format); 747 + // last_row_counter None means its end of sync relay 748 + if let Some(row_counter) = last_row_counter { 749 + let ticket = fetch_encoded_delta_ticket( 750 + &user.user_id, 751 + sync_channel_sender, 752 + *row_counter, 753 + store, 754 + delivered_from, 755 + ) 756 + .await?; 680 757 let delta_info = NetworkMessage::new( 681 - &user, 758 + user, 682 759 msg.is_online, 683 760 MessageBody::SyncSendDeltaInfo { 684 761 blob_ticket: ticket.to_string(), ··· 686 763 }, 687 764 ); 688 765 sender.broadcast(delta_info.to_bytes().into()).await?; 766 + info!("Sent blob ticket {} to {}", ticket, delivered_from); 689 767 } else { 690 - let stop_req = NetworkMessage::new(&user, msg.is_online, MessageBody::SyncEnd); 768 + let stop_req = NetworkMessage::new(user, msg.is_online, MessageBody::SyncEnd); 691 769 sender.broadcast(stop_req.to_bytes().into()).await?; 692 - println!("sync end, can close"); 770 + info!("sync ended"); 771 + println!("\nSync completed..., you can exit now"); 693 772 } 694 773 } 695 774 Ok(())
+10 -2
tiles/src/main.rs
··· 1 - #![warn(clippy::pedantic)] 1 + // #![warn(clippy::pedantic)] 2 2 3 3 use std::error::Error; 4 4 ··· 185 185 } 186 186 #[tokio::main] 187 187 pub async fn main() -> Result<(), Box<dyn Error>> { 188 - env_logger::init(); 188 + env_logger::try_init()?; 189 189 let cli = Cli::parse(); 190 190 let runtime = build_runtime(); 191 191 match cli.command { ··· 279 279 } 280 280 Ok(()) 281 281 } 282 + 283 + // fn build_logger() -> Result<(), SetLoggerError> { 284 + // let mut env_builder = env_logger::Builder::new(); 285 + // if !cfg!(debug_assertions) { 286 + // env_builder.filter_module("iroh", log::LevelFilter::Off); 287 + // } 288 + // env_builder.try_init() 289 + // }