···3333 pub content: String,
3434}
35353636-#[derive(Debug)]
3636+#[derive(Debug, serde::Serialize, serde::Deserialize)]
3737pub struct Chats {
3838 pub id: String,
3939 content: String,
···122122 Err(err) => Err(<rusqlite::Error as Into<anyhow::Error>>::into(err)),
123123 }
124124}
125125-/// Return list of rows..
126126-/// encoding is the job of network modules
125125+/// Return list of rows for the given `user_id` since `last_row_counter`
127126pub fn get_delta(conn: &Connection, user_id: &str, last_row_couter: i64) -> Result<Vec<Chats>> {
128127 let mut stmt = conn.prepare("select id, user_id, content, resp_id, role, context_id, created_at, updated_at , row_counter from chats where user_id = ?1 and row_counter > ?2 order by id")?;
129128···156155 Ok(chats)
157156}
158157159159-pub fn apply_delta(chat_conn: &mut Connection, delta_chats: Vec<Chats>) -> Result<()> {
160160- // bulk insert
161161- // TODO: Handle primary key conflict, for now upsert it, later
162162- // do LWW based on iss of UCAN
158158+pub fn apply_delta(chat_conn: &mut Connection, delta_chats: &Vec<Chats>) -> Result<()> {
159159+ // TODO: Handle primary key conflict, for now reject it (in a way its impossible to have this scenario, and if its occuring then that means
160160+ // some issue in syncing, so ignore it, by rejecting it), later
161161+ // do LWW based on issuer of UCAN
163162 let txn = chat_conn.transaction()?;
164163 {
165164 let mut stmt = txn.prepare("insert into chats(id, user_id, content, resp_id, role, context_id, created_at, updated_at, row_counter) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)")?;
166165167166 for chat in delta_chats {
168168- stmt.execute(params![
167167+ match stmt.execute(params![
169168 &chat.id.to_string(),
170169 &chat.user_id,
171170 &chat.content,
···175174 &chat.created_at.to_string(),
176175 &chat.updated_at.to_string(),
177176 &chat.row_counter,
178178- ])?;
177177+ ]) {
178178+ Err(rusqlite::Error::SqliteFailure(_, Some(reason)))
179179+ if reason == "UNIQUE constraint failed: chats.id" =>
180180+ {
181181+ log::error!(
182182+ "err in writing row {:?}, already exists, skipping",
183183+ &chat.id
184184+ );
185185+ }
186186+ _ => (),
187187+ }
179188 }
180189 }
181190 txn.commit()?;
···183192 Ok(())
184193}
185194195195+pub fn get_encoded_delta(
196196+ conn: &Connection,
197197+ user_id: &str,
198198+ last_row_couter: i64,
199199+) -> Result<Vec<u8>> {
200200+ let delta = get_delta(conn, user_id, last_row_couter)?;
201201+ Ok(encode_delta_to_bytes(&delta))
202202+}
203203+204204+fn encode_delta_to_bytes(delta_chats: &Vec<Chats>) -> Vec<u8> {
205205+ postcard::to_stdvec(delta_chats).expect("Failed to convert to bytes with postcard")
206206+}
207207+208208+fn decode_delta_from_bytes(bytes: &[u8]) -> Result<Vec<Chats>> {
209209+ postcard::from_bytes(bytes).map_err(Into::into)
210210+}
211211+186212#[cfg(test)]
187213mod tests {
214214+188215 use std::time::{SystemTime, UNIX_EPOCH};
189216190217 use rusqlite::Connection;
···194221 use crate::{
195222 core::{
196223 accounts::{ACCOUNT, User},
197197- chats::{apply_delta, get_delta, get_last_row_counter, save_chat},
224224+ chats::{
225225+ apply_delta, decode_delta_from_bytes, encode_delta_to_bytes, get_delta,
226226+ get_last_row_counter, save_chat,
227227+ },
198228 },
199229 runtime::mlx::ChatResponse,
230230+ utils::test_logger,
200231 };
201232202233 #[test]
···366397367398 let rows = get_delta(&conn, &user.user_id, 0).unwrap();
368399 assert_eq!(rows.len(), 4);
369369- assert!(apply_delta(&mut conn_2, rows).is_ok());
400400+ assert!(apply_delta(&mut conn_2, &rows).is_ok());
401401+ let rows = get_delta(&conn_2, &user.user_id, 0).unwrap();
402402+ assert_eq!(rows.len(), 4);
403403+ }
404404+405405+ #[test]
406406+ fn test_e2e_delta_roundtrip() {
407407+ let conn = setup_db_schema();
408408+ let mut conn_2 = setup_db_schema();
409409+ let user = create_user();
410410+ let input = "2+2";
411411+ let _chat_1 = save_chat(&conn, &user, input, None).expect("chat should be saved");
412412+ let _ = save_chat(&conn, &user, input, None).expect("chat should be saved");
413413+ let _ = save_chat(&conn, &user, input, None).expect("chat should be saved");
414414+ let _ = save_chat(&conn, &user, input, None).expect("chat should be saved");
415415+416416+ let rows = get_delta(&conn, &user.user_id, 0).unwrap();
417417+ assert_eq!(rows.len(), 4);
418418+ let chat_bytes = encode_delta_to_bytes(&rows);
419419+ let decoded_chat = decode_delta_from_bytes(&chat_bytes).unwrap();
420420+ assert!(apply_delta(&mut conn_2, &decoded_chat).is_ok());
421421+ let rows = get_delta(&conn_2, &user.user_id, 0).unwrap();
422422+ assert_eq!(rows.len(), 4);
423423+ }
424424+425425+ #[test]
426426+ fn test_e2e_delta_roundtrip_w_empty_bytes() {
427427+ let conn = setup_db_schema();
428428+ let mut conn_2 = setup_db_schema();
429429+ let user = create_user();
430430+ let input = "2+2";
431431+ let _chat_1 = save_chat(&conn, &user, input, None).expect("chat should be saved");
432432+ let _ = save_chat(&conn, &user, input, None).expect("chat should be saved");
433433+ let _ = save_chat(&conn, &user, input, None).expect("chat should be saved");
434434+ let _ = save_chat(&conn, &user, input, None).expect("chat should be saved");
435435+436436+ let rows = get_delta(&conn, &user.user_id, 4).unwrap();
437437+ assert_eq!(rows.len(), 0);
438438+ let chat_bytes = encode_delta_to_bytes(&rows);
439439+ let decoded_chat = decode_delta_from_bytes(&chat_bytes).unwrap();
440440+ assert!(apply_delta(&mut conn_2, &decoded_chat).is_ok());
441441+ let rows = get_delta(&conn_2, &user.user_id, 0).unwrap();
442442+ assert_eq!(rows.len(), 0);
443443+ }
444444+445445+ #[test]
446446+ fn test_non_zero_last_counter_delta() {
447447+ let conn = setup_db_schema();
448448+ let mut _conn_2 = setup_db_schema();
449449+ let user = create_user();
450450+ let input = "2+2";
451451+ let chat_1 = save_chat(&conn, &user, input, None).expect("chat should be saved");
452452+ let _ = save_chat(&conn, &user, input, None).expect("chat should be saved");
453453+ let _ = save_chat(&conn, &user, input, None).expect("chat should be saved");
454454+ let _ = save_chat(&conn, &user, input, None).expect("chat should be saved");
455455+ let rows = get_delta(&conn, &user.user_id, chat_1.row_counter).unwrap();
456456+ assert_eq!(rows.len(), 3);
457457+ }
458458+459459+ #[test]
460460+ fn test_duplicate_row_apply() {
461461+ test_logger();
462462+ let conn = setup_db_schema();
463463+ let mut conn_2 = setup_db_schema();
464464+ let user = create_user();
465465+ let input = "2+2";
466466+ let _chat_1 = save_chat(&conn, &user, input, None).expect("chat should be saved");
467467+ let _ = save_chat(&conn, &user, input, None).expect("chat should be saved");
468468+ let _ = save_chat(&conn, &user, input, None).expect("chat should be saved");
469469+ let _ = save_chat(&conn, &user, input, None).expect("chat should be saved");
470470+471471+ let rows = get_delta(&conn, &user.user_id, 0).unwrap();
472472+ assert_eq!(rows.len(), 4);
473473+ let chat_bytes = encode_delta_to_bytes(&rows);
474474+ let decoded_chat = decode_delta_from_bytes(&chat_bytes).unwrap();
475475+ assert!(apply_delta(&mut conn_2, &decoded_chat).is_ok());
476476+ let rows = get_delta(&conn_2, &user.user_id, 0).unwrap();
477477+ assert_eq!(rows.len(), 4);
478478+ assert!(apply_delta(&mut conn_2, &decoded_chat).is_ok());
370479 let rows = get_delta(&conn_2, &user.user_id, 0).unwrap();
371480 assert_eq!(rows.len(), 4);
372481 }
373482483483+ #[test]
484484+ fn test_e2e_syncing_both_ways_w_eventual_consistency() {
485485+ test_logger();
486486+ let mut conn = setup_db_schema();
487487+ let mut conn_2 = setup_db_schema();
488488+ let user_a = create_user_by_id("user_a");
489489+ let user_b = create_user_by_id("user_b");
490490+491491+ // Node user A adds stuff
492492+ let input = "2+2";
493493+ let _chat_1 = save_chat(&conn, &user_a, input, None).expect("chat should be saved");
494494+ let _ = save_chat(&conn, &user_a, input, None).expect("chat should be saved");
495495+ let _ = save_chat(&conn, &user_a, input, None).expect("chat should be saved");
496496+ let _ = save_chat(&conn, &user_a, input, None).expect("chat should be saved");
497497+498498+ // Node user B adds stuff
499499+ let input = "4+4";
500500+ let _chat_1 = save_chat(&conn_2, &user_b, input, None).expect("chat should be saved");
501501+ let _ = save_chat(&conn_2, &user_b, input, None).expect("chat should be saved");
502502+ let _ = save_chat(&conn_2, &user_b, input, None).expect("chat should be saved");
503503+ let _ = save_chat(&conn_2, &user_b, input, None).expect("chat should be saved");
504504+505505+ // Node A wants to sync with Node B
506506+507507+ // 1. So its sends last_row_counter of Node B to Node B and hopefully
508508+ // it sends the diff since then and last_row_counter of Node A back..
509509+510510+ let user_b_last_entry_of_user_a = get_last_row_counter(&conn, &user_b.user_id).unwrap();
511511+512512+ // user_b is extracting the row's of it since the given last_row_counter
513513+ let user_bs_diff_rows =
514514+ get_delta(&conn_2, &user_b.user_id, user_b_last_entry_of_user_a).unwrap();
515515+516516+ assert_eq!(user_bs_diff_rows.len(), 4);
517517+518518+ // user_bs diff is encoded
519519+ let user_b_chat_bytes = encode_delta_to_bytes(&user_bs_diff_rows);
520520+521521+ // send to user_a and its decoded
522522+ let user_b_decoded_chat = decode_delta_from_bytes(&user_b_chat_bytes).unwrap();
523523+524524+ // Now user_a is gonna apply the user_b diff
525525+ assert!(apply_delta(&mut conn, &user_b_decoded_chat).is_ok());
526526+527527+ // Just checking if we user_a has all 8 rows
528528+529529+ let user_a_rows = conn
530530+ .query_row("select count(*) from chats", [], |row| {
531531+ row.get::<usize, i64>(0)
532532+ })
533533+ .unwrap();
534534+535535+ assert_eq!(user_a_rows, 8);
536536+537537+ // cool, now lets do the reverse sync, user B syncs user A stuff
538538+539539+ let user_a_last_entry_of_user_b = get_last_row_counter(&conn_2, &user_a.user_id).unwrap();
540540+541541+ // user_a is extracting the row's of it since the given last_row_counter
542542+ let user_as_diff_rows =
543543+ get_delta(&conn, &user_a.user_id, user_a_last_entry_of_user_b).unwrap();
544544+545545+ assert_eq!(user_as_diff_rows.len(), 4);
546546+547547+ // user_as diff is encoded
548548+ let user_a_chat_bytes = encode_delta_to_bytes(&user_as_diff_rows);
549549+550550+ // send to user_b and its decoded
551551+ let user_a_decoded_chat = decode_delta_from_bytes(&user_a_chat_bytes).unwrap();
552552+553553+ // Now user_b is gonna apply the user_b diff
554554+ assert!(apply_delta(&mut conn_2, &user_a_decoded_chat).is_ok());
555555+556556+ // Just checking eventual consistency
557557+558558+ let user_a_rows = conn
559559+ .query_row("select count(*) from chats", [], |row| {
560560+ row.get::<usize, i64>(0)
561561+ })
562562+ .unwrap();
563563+564564+ let user_b_rows = conn_2
565565+ .query_row("select count(*) from chats", [], |row| {
566566+ row.get::<usize, i64>(0)
567567+ })
568568+ .unwrap();
569569+570570+ assert_eq!(user_a_rows, user_b_rows);
571571+ }
572572+374573 struct SavedChatRow {
375574 content: String,
376575 resp_id: Option<String>,
···400599 User {
401600 id: Uuid::now_v7(),
402601 user_id: String::from("did"),
602602+ username: String::from("nickname"),
603603+ account_type: ACCOUNT::LOCAL,
604604+ active_profile: true,
605605+ root: true,
606606+ created_at: SystemTime::now()
607607+ .duration_since(UNIX_EPOCH)
608608+ .expect("time went backwards")
609609+ .as_secs(),
610610+ updated_at: SystemTime::now()
611611+ .duration_since(UNIX_EPOCH)
612612+ .expect("time went backwards")
613613+ .as_secs(),
614614+ }
615615+ }
616616+ fn create_user_by_id(user_id: &str) -> User {
617617+ User {
618618+ id: Uuid::now_v7(),
619619+ user_id: String::from(user_id),
403620 username: String::from("nickname"),
404621 account_type: ACCOUNT::LOCAL,
405622 active_profile: true,
+275-44
tiles/src/core/network/mod.rs
···99};
10101111use anyhow::Result;
1212+use axum::body::Bytes;
1213use futures_util::{StreamExt, TryStreamExt};
1314use iroh::{
1415 Endpoint, EndpointId, NET_REPORT_TIMEOUT, PublicKey,
···1718 endpoint_info::UserData,
1819 protocol::Router,
1920};
2121+use iroh_blobs::{BlobsProtocol, store::mem::MemStore, ticket::BlobTicket};
2022use iroh_gossip::{
2123 Gossip, TopicId,
2224 api::{Event, GossipReceiver, GossipSender},
2325};
2626+2427use rusqlite::Connection;
2525-use tilekit::accounts::{get_did_from_public_key, get_random_bytes, get_random_bytes_32};
2828+use tilekit::accounts::{
2929+ get_did_from_public_key, get_public_key_from_did, get_random_bytes, get_random_bytes_32,
3030+};
2631use tokio::task::spawn_blocking;
2732use uuid::Uuid;
2833···3035 accounts::{
3136 self, get_app_secret_key, get_current_user, get_user_by_user_id, save_peer_account_db,
3237 },
3838+ chats::{get_delta, get_encoded_delta, get_last_row_counter},
3339 network::ticket::{EndpointUserData, LinkTicket},
3440 storage::db::{DBTYPE, get_db_conn},
3541};
···6470 }
6571}
66726767-#[derive(serde::Serialize, serde::Deserialize)]
7373+#[derive(serde::Serialize, serde::Deserialize, Debug)]
6874#[allow(clippy::enum_variant_names)]
6975enum MessageBody {
7070- LinkRequest { ticket: String },
7676+ LinkRequest {
7777+ ticket: String,
7878+ },
7179 LinkAccepted,
7272- LinkRejected { reason: String },
8080+ LinkRejected {
8181+ reason: String,
8282+ },
8383+ SyncStart {
8484+ last_row_counter: Option<i64>,
8585+ },
8686+ SyncSendDeltaInfo {
8787+ blob_ticket: String,
8888+ last_row_counter: Option<i64>,
8989+ },
9090+ SyncEnd,
7391}
74927575-// Entrypoint of network connection
7676-// pub async fn init(ticket: Option<&str>) -> Result<()> {
7777-// if let Some(ticket_addr) = ticket {
7878-// let sender_endpoint = Endpoint::bind(presets::N0).await?;
7979-// println!("{:?}", sender_endpoint.addr());
8080-// let se_clone = sender_endpoint.clone();
8181-// let send_pinger = Ping::new();
8282-// let rtt = send_pinger
8383-// .ping(
8484-// &sender_endpoint,
8585-// EndpointTicket::from_str(ticket_addr)?
8686-// .endpoint_addr()
8787-// .clone(),
8888-// )
8989-// .await?;
9090-9191-// println!("ping took: {:?} to complete", rtt);
9292-// se_clone.close().await;
9393-// } else {
9494-// let endpoint = Endpoint::bind(presets::N0).await?;
9595-// let ep = endpoint.clone();
9696-// let ep2 = endpoint.clone();
9797-// endpoint.online().await;
9898-9999-// let ping = Ping::new();
100100-101101-// let ticket = EndpointTicket::new(endpoint.addr());
102102-103103-// println!("ticket\n{:?}", ticket.to_string());
104104-105105-// let recv_router = Router::builder(ep).accept(iroh_ping::ALPN, ping).spawn();
106106-// ep2.close().await;
107107-// recv_router.shutdown().await?;
108108-// }
109109-// Ok(())
110110-// }
111111-11293pub async fn link(ticket: Option<String>) -> Result<()> {
11394 let user_db_conn = get_db_conn(DBTYPE::COMMON)?;
11495 let user = get_current_user(&user_db_conn)?;
11596 let endpoint = create_endpoint(&user).await?;
11697 let is_online = is_online(&endpoint).await;
11798 let mut bootstrap_ids: Vec<EndpointId> = vec![];
118118- // if ticket's there, then this is link enable sender's command, e;se receiver end
9999+ // if ticket's there, then this is link enable sender's command, else receiver end
119100 if let Some(ticket) = ticket {
120101 let (endpoint_id, mut did, mut nickname, topic_value) = parse_link_ticket(&ticket)?;
121102···242223 if cfg!(debug_assertions) {
243224 println!("In {}:, some event {:?}", user.username, event);
244225 }
226226+ // TODO: Damn refactor the loop, its getting bigger
245227 if let Event::Received(msg) = event {
246228 let pub_key = msg.delivered_from;
247229 let msg = NetworkMessage::from_bytes(&msg.content)?;
···333315 msg.from_nickname, msg.from_did, reason
334316 );
335317 }
318318+ msg_body => {
319319+ eprintln!("Invalid link message {:?}", msg_body)
320320+ }
336321 }
337322 }
338323 }
339324 Ok(())
340325}
341326327327+async fn sync_subscribe_loop(
328328+ mut receiver: GossipReceiver,
329329+ sender: GossipSender,
330330+ user: accounts::User,
331331+ user_db_conn: Connection,
332332+ store: MemStore,
333333+ endpoint: Endpoint,
334334+) -> Result<()> {
335335+ while let Some(event) = receiver.try_next().await? {
336336+ if cfg!(debug_assertions) {
337337+ println!("SYNC_LOOP: In {}:, some event {:?}", user.username, event);
338338+ }
339339+ if let Event::Received(msg) = event {
340340+ let pub_key = msg.delivered_from;
341341+ let msg = NetworkMessage::from_bytes(&msg.content)?;
342342+ if !is_did_valid(&msg.from_did, pub_key)? {
343343+ eprintln!(
344344+ "Incoming peer DID {} invalid, blocking request",
345345+ msg.from_did
346346+ );
347347+ continue;
348348+ }
349349+ match msg.body {
350350+ MessageBody::SyncStart {
351351+ last_row_counter: _,
352352+ } => {
353353+ // TODO: REJECT SYNC_REQUESTS FROM NON_PEERS
354354+ println!("Received sync start");
355355+ on_sync_start_event(&sender, &store, &msg, pub_key, &user).await?;
356356+ }
357357+ MessageBody::SyncSendDeltaInfo {
358358+ blob_ticket: _,
359359+ last_row_counter: _,
360360+ } => {
361361+ on_sync_send_delta_info(&sender, &store, &msg, pub_key, &user, &endpoint)
362362+ .await?;
363363+ }
364364+ MessageBody::SyncEnd => {
365365+ println!("sync end, can close");
366366+ }
367367+ msg_body => {
368368+ println!("Invalid sync message {:?}", msg_body)
369369+ }
370370+ }
371371+ }
372372+ }
373373+ Ok(())
374374+}
342375async fn create_endpoint(user: &accounts::User) -> Result<Endpoint> {
343376 // In release mode, we will build the endpoint using
344377 // tiles keypair in keychain
···360393 }
361394}
362395396396+pub async fn sync(did: Option<String>) -> Result<()> {
397397+ let user_db_conn = get_db_conn(DBTYPE::COMMON)?;
398398+ let user = get_current_user(&user_db_conn)?;
399399+ let chat_db_conn = get_db_conn(DBTYPE::CHAT)?;
400400+ let endpoint = create_endpoint(&user).await?;
401401+ let is_online = is_online(&endpoint).await;
402402+ if let Some(receiver_did) = did {
403403+ // INITIATOR BLOCK
404404+ // The sync gossip topic is basically derived from the receiver's
405405+ // DID, so that initiator's can directly connect w/o any
406406+ // initial handshake
407407+ let receiver_pub_key = get_public_key_from_did(&receiver_did)?;
408408+ let receiver_endpoint_id = PublicKey::from_bytes(&receiver_pub_key)?;
409409+ println!("receiver endpoint id {:?}", receiver_endpoint_id);
410410+ let sync_topic = format!("sync:{}", receiver_did);
411411+ let sync_topic_id = create_topic_id(&sync_topic);
412412+413413+ let (sender, mut receiver, recv_router, store) =
414414+ create_sync_network(&endpoint, sync_topic_id, vec![receiver_endpoint_id]).await?;
415415+ println!("\nConnecting to {}.....", receiver_did);
416416+ receiver.joined().await?;
417417+ tokio::spawn(sync_subscribe_loop(
418418+ receiver,
419419+ sender.clone(),
420420+ user.clone(),
421421+ user_db_conn,
422422+ store,
423423+ endpoint.clone(),
424424+ ));
425425+426426+ // get the last_row_counter
427427+ //
428428+ let receiver_last_row_counter = get_last_row_counter(&chat_db_conn, &receiver_did)?;
429429+ let sync_start_msg = NetworkMessage::new(
430430+ &user,
431431+ is_online,
432432+ MessageBody::SyncStart {
433433+ last_row_counter: Some(receiver_last_row_counter),
434434+ },
435435+ );
436436+ sender.broadcast(sync_start_msg.to_bytes().into()).await?;
437437+438438+ println!(
439439+ "\nSent sync start request to {}({})",
440440+ user.username, user.user_id
441441+ );
442442+ tokio::signal::ctrl_c().await?;
443443+ recv_router.shutdown().await?;
444444+ } else {
445445+ // RECEIVER BLOCK
446446+ // The sync gossip topic is basically derived from the receiver's
447447+ // public-key, so that initiator's can directly connect w/o any
448448+ // initial handshake
449449+450450+ println!("endpointId {:?}", endpoint.id());
451451+ let did = if cfg!(debug_assertions) {
452452+ let pub_key = endpoint.id();
453453+ &get_did_from_public_key(pub_key.as_bytes())?
454454+ } else {
455455+ &user.user_id
456456+ };
457457+458458+ let sync_topic = format!("sync:{}", did);
459459+ let sync_topic_id = create_topic_id(&sync_topic);
460460+ let (sender, receiver, recv_router, store) =
461461+ create_sync_network(&endpoint, sync_topic_id, vec![]).await?;
462462+463463+ tokio::spawn(sync_subscribe_loop(
464464+ receiver,
465465+ sender.clone(),
466466+ user.clone(),
467467+ user_db_conn,
468468+ store,
469469+ endpoint.clone(),
470470+ ));
471471+ println!("Ready to accept sync requests from peers...");
472472+473473+ // Since in dev, we use create endpoints, at the initiator side
474474+ // we can use the DID derived from this, instead of actual ones
475475+ // for the network to form correctly
476476+ if cfg!(debug_assertions) {
477477+ println!("Use this DID {} in dev for testing", did);
478478+ }
479479+ tokio::signal::ctrl_c().await?;
480480+ recv_router.shutdown().await?;
481481+ }
482482+ endpoint.close().await;
483483+ Ok(())
484484+}
485485+486486+// Router with gossip and blob protocol
487487+async fn create_sync_network(
488488+ endpoint: &Endpoint,
489489+ topic_id: TopicId,
490490+ bootstrap_ids: Vec<iroh::PublicKey>,
491491+) -> Result<(GossipSender, GossipReceiver, Router, MemStore)> {
492492+ let gossip = Gossip::builder().spawn(endpoint.clone());
493493+ let store = MemStore::new();
494494+ let blobs = BlobsProtocol::new(&store, None);
495495+ let recv_router = Router::builder(endpoint.clone())
496496+ .accept(iroh_gossip::ALPN, gossip.clone())
497497+ .accept(iroh_blobs::ALPN, blobs.clone())
498498+ .spawn();
499499+500500+ let (goss_sender, goss_receiver) = gossip.subscribe(topic_id, bootstrap_ids).await?.split();
501501+502502+ Ok((goss_sender, goss_receiver, recv_router, store))
503503+}
504504+363505fn create_topic_id(topic_name: &str) -> TopicId {
364506 let mut hasher = Sha256::new();
365507 hasher.update(topic_name.as_bytes());
···461603 Ok(get_did_from_public_key(&pub_key)? == did)
462604 }
463605}
464464-// fn subsribe_mdns_events(mdns_events) {}
465465-//TODO: Add tests, can we get some from iroh reference?
606606+607607+async fn on_sync_start_event(
608608+ sender: &GossipSender,
609609+ store: &MemStore,
610610+ msg: &NetworkMessage,
611611+ delivered_from: PublicKey,
612612+ user: &accounts::User,
613613+ // chat_db_conn: &Connection,
614614+) -> Result<()> {
615615+ println!("Received sync start");
616616+ if let MessageBody::SyncStart {
617617+ last_row_counter: lrc,
618618+ } = &msg.body
619619+ {
620620+ // let chat_delta = get_encoded_delta(
621621+ // &chat_db_conn,
622622+ // &user.user_id,
623623+ // lrc.expect("Expected a valid last row counter"),
624624+ // );
625625+ // let chat_delta_bytes =
626626+ let rand_bytes = get_random_bytes_32().to_vec();
627627+ println!("rand bytes\n{:?}", rand_bytes);
628628+ let tag = store
629629+ .blobs()
630630+ .add_bytes(Into::<Bytes>::into(rand_bytes))
631631+ .await?;
632632+633633+ let ticket = BlobTicket::new(delivered_from.into(), tag.hash, tag.format);
634634+ let delta_info = NetworkMessage::new(
635635+ &user,
636636+ msg.is_online,
637637+ MessageBody::SyncSendDeltaInfo {
638638+ blob_ticket: ticket.to_string(),
639639+ last_row_counter: Some(0),
640640+ },
641641+ );
642642+ sender.broadcast(delta_info.to_bytes().into()).await?;
643643+ println!("Sent blob ticket {}", ticket.to_string());
644644+ }
645645+ Ok(())
646646+}
647647+648648+async fn on_sync_send_delta_info(
649649+ sender: &GossipSender,
650650+ store: &MemStore,
651651+ msg: &NetworkMessage,
652652+ delivered_from: PublicKey,
653653+ user: &accounts::User,
654654+ endpoint: &Endpoint,
655655+) -> Result<()> {
656656+ if let MessageBody::SyncSendDeltaInfo {
657657+ blob_ticket,
658658+ last_row_counter,
659659+ } = &msg.body
660660+ {
661661+ let ticket: BlobTicket = blob_ticket.parse()?;
662662+ let downloader = store.downloader(&endpoint);
663663+ downloader
664664+ .download(ticket.hash(), Some(delivered_from))
665665+ .await?;
666666+667667+ let data = store.blobs().get_bytes(ticket.hash()).await?;
668668+ println!("rand bytes received {:?}", data.to_vec());
669669+670670+ println!("finished download");
671671+ if let Some(_row_counter) = last_row_counter {
672672+ let rand_bytes = get_random_bytes_32().to_vec();
673673+ println!("rand bytes\n{:?}", rand_bytes);
674674+ let tag = store
675675+ .blobs()
676676+ .add_bytes(Into::<Bytes>::into(rand_bytes))
677677+ .await?;
678678+679679+ let ticket = BlobTicket::new(delivered_from.into(), tag.hash, tag.format);
680680+ let delta_info = NetworkMessage::new(
681681+ &user,
682682+ msg.is_online,
683683+ MessageBody::SyncSendDeltaInfo {
684684+ blob_ticket: ticket.to_string(),
685685+ last_row_counter: None,
686686+ },
687687+ );
688688+ sender.broadcast(delta_info.to_bytes().into()).await?;
689689+ } else {
690690+ let stop_req = NetworkMessage::new(&user, msg.is_online, MessageBody::SyncEnd);
691691+ sender.broadcast(stop_req.to_bytes().into()).await?;
692692+ println!("sync end, can close");
693693+ }
694694+ }
695695+ Ok(())
696696+}
+15-2
tiles/src/main.rs
···11+#![warn(clippy::pedantic)]
22+13use std::error::Error;
2435use clap::{Args, Parser, Subcommand};
46use tiles::{
55- core::{self, network::link},
77+ core::{
88+ self,
99+ network::{link, sync},
1010+ },
611 daemon::{start_cmd, start_server, stop_cmd},
712 runtime::{RunArgs, build_runtime},
813 utils::installer,
···66716772 /// Link with other devices p2p
6873 Link(LinkArgs),
7474+7575+ /// Syncs the chats to peers
7676+ Sync {
7777+ /// The DID of the peer you want to sync
7878+ did: Option<String>,
7979+ },
6980}
70817182#[derive(Debug, Args)]
···174185}
175186#[tokio::main]
176187pub async fn main() -> Result<(), Box<dyn Error>> {
188188+ env_logger::init();
177189 let cli = Cli::parse();
178190 let runtime = build_runtime();
179191 match cli.command {
···232244 data,
233245 model,
234246 }) => {
235235- let modelfile = commands::optimize(modelfile_path.clone(), data, model).await?;
247247+ let modelfile = commands::optimize(&modelfile_path, data, &model).await?;
236248 std::fs::write(&modelfile_path, modelfile.to_string())?;
237249 println!("Successfully updated {}", modelfile_path);
238250 }
···263275 show_peers()?;
264276 }
265277 },
278278+ Some(Commands::Sync { did }) => sync(did).await?,
266279 }
267280 Ok(())
268281}
+1-2
tiles/src/utils/config.rs
···9696 .get("data")
9797 .expect("Failed to get data")
9898 .as_table()
9999- .expect("Failed to parse to table (data)")
100100- .clone();
9999+ .expect("Failed to parse to table (data)");
101100102101 if let Some(path) = data_config
103102 .get("path")
+4
tiles/src/utils/mod.rs
···1010 .expect("time went backwards")
1111 .as_secs()
1212}
1313+1414+pub fn test_logger() {
1515+ let _ = env_logger::builder().is_test(true).try_init();
1616+}