this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 3a1ce8e0e4e1e2a5a69cb057d8debe3a19c8dad9 126 lines 5.6 kB view raw
1use std::thread; 2use std::time::Duration; 3use std::{collections::VecDeque, sync::Arc}; 4 5use futures_util::stream::StreamExt; 6use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 7use jacquard::url::Url; 8use jacquard::{common::xrpc::TungsteniteSubscriptionClient, xrpc::SubscriptionClient}; 9use tokio::{ 10 sync::Mutex, 11 task::{self, JoinHandle}, 12}; 13 14use crate::config; 15 16pub async fn queue() -> ( 17 Arc<Mutex<VecDeque<SubscribeReposMessage<'static>>>>, 18 JoinHandle<()>, 19) { 20 let queue = Arc::new(Mutex::new(VecDeque::new())); 21 22 // USER_SUBSCRIBE_URL is formatted as a domain 23 let uri = Url::parse(&format!("wss://{}/", config::USER_SUBSCRIBE_URL)) 24 .expect("Env var USER_SUBSCRIBE_URL should be formated as a domain."); 25 let client = TungsteniteSubscriptionClient::from_base_uri(uri); 26 let (_sink, mut messages) = client 27 .subscribe(&SubscribeRepos::new().build()) 28 .await 29 .expect("Could not subscribe to new events") 30 .into_stream(); 31 32 let queue_clone = queue.clone(); 33 let handle = task::spawn(async move { 34 let queue = queue_clone; 35 36 loop { 37 if let Some(msg) = messages.next().await { 38 let msg = match msg { 39 Ok(val) => val, 40 Err(err) => { 41 eprintln!("Warning: Websocket error: {} ({:?})", err, err.source()); 42 if &jacquard::StreamErrorKind::Closed == err.kind() { 43 let stream = client.subscribe(&SubscribeRepos::new().build()).await; 44 // if it reconnected successfully, just continue 45 let new_messages = match stream { 46 Ok(val) => val.into_stream().1, 47 // if it failed, try reconnect 10 times, waiting a second between each attempt 48 Err(_) => { 49 let mut new_messages = None; 50 for _ in 0..10 { 51 let Ok(stream) = 52 client.subscribe(&SubscribeRepos::new().build()).await 53 else { 54 // wait a second 55 thread::sleep(Duration::from_secs(1)); 56 continue; 57 }; 58 new_messages = Some(stream.into_stream().1) 59 } 60 61 if let Some(new_messages) = new_messages { 62 new_messages 63 } else { 64 // could not reconnect so just die lmao 65 panic!("Could not reconnect to client. Fatal"); 66 } 67 } 68 }; 69 // for some reason new_messages doesnt need to be mut ? 70 messages = new_messages; 71 } 72 continue; 73 } 74 }; 75 76 // filter messages by user did 77 // note that #identity #account #info and #unknown will probably be ignored 78 let ev = match msg.clone() { 79 SubscribeReposMessage::Commit(commit) => { 80 if commit.repo != *config::USER_DID { 81 continue; 82 } else { 83 SubscribeReposMessage::Commit(commit) 84 } 85 } 86 SubscribeReposMessage::Sync(sync) => { 87 if sync.did != *config::USER_DID { 88 continue; 89 } else { 90 SubscribeReposMessage::Sync(sync) 91 } 92 } 93 94 SubscribeReposMessage::Identity(identity) => { 95 if identity.did != *config::USER_DID { 96 continue; 97 } else { 98 eprintln!( 99 "Warning: Recieved #identity event. Configuration may be out of date" 100 ); 101 SubscribeReposMessage::Identity(identity) 102 } 103 } 104 SubscribeReposMessage::Account(account) => { 105 if account.did != *config::USER_DID { 106 continue; 107 } else { 108 eprintln!( 109 "Warning: Recieved #account event. Account active: `{}`. Account status: `{}`", 110 account.active, 111 account.status.clone().unwrap_or("Unknown".into()) 112 ); 113 SubscribeReposMessage::Account(account) 114 } 115 } 116 SubscribeReposMessage::Info(info) => SubscribeReposMessage::Info(info), 117 SubscribeReposMessage::Unknown(data) => SubscribeReposMessage::Unknown(data), 118 }; 119 120 queue.lock().await.push_back(ev); 121 } 122 } 123 }); 124 125 (queue, handle) 126}