this repo has no description
1use std::thread;
2use std::time::Duration;
3use std::{collections::VecDeque, sync::Arc};
4
5use futures_util::stream::StreamExt;
6use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage};
7use jacquard::url::Url;
8use jacquard::{common::xrpc::TungsteniteSubscriptionClient, xrpc::SubscriptionClient};
9use tokio::{
10 sync::Mutex,
11 task::{self, JoinHandle},
12};
13
14use crate::config;
15
16pub async fn queue() -> (
17 Arc<Mutex<VecDeque<SubscribeReposMessage<'static>>>>,
18 JoinHandle<()>,
19) {
20 let queue = Arc::new(Mutex::new(VecDeque::new()));
21
22 // USER_SUBSCRIBE_URL is formatted as a domain
23 let uri = Url::parse(&format!("wss://{}/", config::USER_SUBSCRIBE_URL))
24 .expect("Env var USER_SUBSCRIBE_URL should be formated as a domain.");
25 let client = TungsteniteSubscriptionClient::from_base_uri(uri);
26 let (_sink, mut messages) = client
27 .subscribe(&SubscribeRepos::new().build())
28 .await
29 .expect("Could not subscribe to new events")
30 .into_stream();
31
32 let queue_clone = queue.clone();
33 let handle = task::spawn(async move {
34 let queue = queue_clone;
35
36 loop {
37 if let Some(msg) = messages.next().await {
38 let msg = match msg {
39 Ok(val) => val,
40 Err(err) => {
41 eprintln!("Warning: Websocket error: {} ({:?})", err, err.source());
42 if &jacquard::StreamErrorKind::Closed == err.kind() {
43 let stream = client.subscribe(&SubscribeRepos::new().build()).await;
44 // if it reconnected successfully, just continue
45 let new_messages = match stream {
46 Ok(val) => val.into_stream().1,
47 // if it failed, try reconnect 10 times, waiting a second between each attempt
48 Err(_) => {
49 let mut new_messages = None;
50 for _ in 0..10 {
51 let Ok(stream) =
52 client.subscribe(&SubscribeRepos::new().build()).await
53 else {
54 // wait a second
55 thread::sleep(Duration::from_secs(1));
56 continue;
57 };
58 new_messages = Some(stream.into_stream().1)
59 }
60
61 if let Some(new_messages) = new_messages {
62 new_messages
63 } else {
64 // could not reconnect so just die lmao
65 panic!("Could not reconnect to client. Fatal");
66 }
67 }
68 };
69 // for some reason new_messages doesnt need to be mut ?
70 messages = new_messages;
71 }
72 continue;
73 }
74 };
75
76 // filter messages by user did
77 // note that #identity #account #info and #unknown will probably be ignored
78 let ev = match msg.clone() {
79 SubscribeReposMessage::Commit(commit) => {
80 if commit.repo != *config::USER_DID {
81 continue;
82 } else {
83 SubscribeReposMessage::Commit(commit)
84 }
85 }
86 SubscribeReposMessage::Sync(sync) => {
87 if sync.did != *config::USER_DID {
88 continue;
89 } else {
90 SubscribeReposMessage::Sync(sync)
91 }
92 }
93
94 SubscribeReposMessage::Identity(identity) => {
95 if identity.did != *config::USER_DID {
96 continue;
97 } else {
98 eprintln!(
99 "Warning: Recieved #identity event. Configuration may be out of date"
100 );
101 SubscribeReposMessage::Identity(identity)
102 }
103 }
104 SubscribeReposMessage::Account(account) => {
105 if account.did != *config::USER_DID {
106 continue;
107 } else {
108 eprintln!(
109 "Warning: Recieved #account event. Account active: `{}`. Account status: `{}`",
110 account.active,
111 account.status.clone().unwrap_or("Unknown".into())
112 );
113 SubscribeReposMessage::Account(account)
114 }
115 }
116 SubscribeReposMessage::Info(info) => SubscribeReposMessage::Info(info),
117 SubscribeReposMessage::Unknown(data) => SubscribeReposMessage::Unknown(data),
118 };
119
120 queue.lock().await.push_back(ev);
121 }
122 }
123 });
124
125 (queue, handle)
126}