A personal app view for reading Bsky posts from the accounts you follow (for when Bsky's own app view goes down)
1use ::chrono::{DateTime, Utc};
2use atproto_tap::{RecordAction, RecordEvent, TapClient, TapEvent, connect_to};
3use serde::Deserialize;
4use tokio_stream::StreamExt;
5
6use crate::store;
7
8#[derive(Deserialize)]
9struct FollowRecord {
10 subject: String,
11}
12
13pub async fn add_repo(did: &String) -> anyhow::Result<()> {
14 let tap_url = std::env::var("TAP_URL").unwrap_or("localhost:2480".to_string());
15 let client = TapClient::new(tap_url.as_str(), Some("password".to_string()));
16
17 // Add repositories to track
18 let res = client.add_repos(&[did.as_str()]).await;
19 match res {
20 Err(e) => {
21 println!("failed to add repo with tap: {e}");
22 }
23 Ok(()) => {
24 println!("added repo {did}");
25 }
26 }
27
28 Ok(())
29}
30
31pub async fn remove_repo(did: &String) {
32 let tap_url = std::env::var("TAP_URL").unwrap_or("localhost:2480".to_string());
33 let client = TapClient::new(tap_url.as_str(), Some("password".to_string()));
34
35 let res = client.remove_repos(&[did.as_str()]).await;
36
37 match res {
38 Err(e) => {
39 println!("failed to remove repo with tap: {e}");
40 }
41 Ok(()) => {
42 println!("removed repo {did}");
43 }
44 }
45}
46
47pub async fn run_tap(users_did: String, pool: &sqlx::SqlitePool) {
48 let tap_url = std::env::var("TAP_URL").unwrap_or("localhost:2480".to_string());
49 let mut stream = connect_to(tap_url.as_str());
50
51 while let Some(result) = stream.next().await {
52 match result {
53 Ok(event) => match event.as_ref() {
54 TapEvent::Record { record, .. } => {
55 let collection = String::from(record.collection.clone());
56
57 match collection.as_str() {
58 "app.bsky.graph.follow" => {
59 if record.did.clone().into_string() == users_did {
60 handle_user_follow_event(record, pool).await;
61 }
62 }
63 "app.bsky.feed.post" => {
64 handle_post_event(record, pool).await;
65 }
66 "app.bsky.feed.repost" => {
67 handle_repost_event(record, pool).await;
68 }
69 _ => {}
70 }
71
72 println!("{} {} {}", record.action, record.collection, record.did);
73 }
74 TapEvent::Identity { identity, .. } => {
75 println!("Identity: {} = {}", identity.did, identity.handle);
76 }
77 },
78 Err(e) => eprintln!("Error: {}", e),
79 }
80 }
81}
82
83async fn handle_user_follow_event(record: &RecordEvent, pool: &sqlx::SqlitePool) {
84 match record.action {
85 RecordAction::Create => {
86 let follow: FollowRecord = record.parse_record().unwrap(); // TODO - bad error handling there
87 let result = sqlx::query("INSERT INTO follows (subject, rkey) VALUES ($1, $2)")
88 .bind(&follow.subject)
89 .bind(record.rkey.to_string())
90 .execute(pool)
91 .await;
92
93 if result.is_err() {
94 println!("Error inserting follow into the database: {result:?}");
95 }
96
97 // track the follow anyway
98 _ = add_repo(&follow.subject).await;
99 }
100 RecordAction::Delete => {
101 let follow_subject = store::get_follow_by_rkey(&record.rkey.to_string(), pool).await;
102 if follow_subject != "" {
103 _ = remove_repo(&follow_subject).await;
104 }
105
106 let result = sqlx::query("DELETE FROM follows WHERE rkey = $1")
107 .bind(record.rkey.to_string())
108 .execute(pool)
109 .await;
110
111 if result.is_err() {
112 println!("Error deleting follow from the database: {result:?}");
113 }
114 }
115 RecordAction::Update => {}
116 }
117}
118
119async fn handle_post_event(record: &RecordEvent, pool: &sqlx::SqlitePool) {
120 match record.action {
121 RecordAction::Create => {
122 let cid: String = match record.cid.clone() {
123 Some(x) => x.to_string(),
124 None => "".to_string(),
125 };
126 let mut post = store::PostRecord {
127 created: Utc::now(),
128 indexed: Utc::now(),
129 author: record.did.clone().to_string(),
130 rkey: record.rkey.to_string(),
131 cid: cid,
132 };
133
134 let tap_post: TapPost = record.parse_record().unwrap(); // TODO: better error handling here
135
136 let created_at = DateTime::parse_from_rfc3339(&tap_post.created_at);
137 match created_at {
138 Ok(ca) => {
139 post.created = ca.to_utc();
140 }
141 Err(e) => {
142 println!("parsing created at: {e}");
143 }
144 }
145
146 store::insert_post(post, pool).await;
147 }
148 RecordAction::Delete => {
149 store::delete_post(record.rkey.to_string(), pool).await;
150 }
151 RecordAction::Update => {}
152 }
153}
154
155async fn handle_repost_event(record: &RecordEvent, pool: &sqlx::SqlitePool) {
156 match record.action {
157 RecordAction::Create => {
158 let tap_post: TapPost = record.parse_record().unwrap(); // TODO: better error handling here
159 let cid: String = match record.cid.clone() {
160 Some(x) => x.to_string(),
161 None => "".to_string(),
162 };
163
164 let subject: String = match tap_post.subject {
165 Some(x) => x.uri.clone().to_string(),
166 None => "".to_string(),
167 };
168
169 let mut repost = store::RepostRecord {
170 created: Utc::now(),
171 indexed: Utc::now(),
172 author: record.did.clone().to_string(),
173 rkey: record.rkey.to_string(),
174 subject: subject,
175 cid: cid,
176 };
177
178 let created_at = DateTime::parse_from_rfc3339(&tap_post.created_at);
179 match created_at {
180 Ok(ca) => {
181 repost.created = ca.to_utc();
182 }
183 Err(e) => {
184 println!("parsing created at: {e}");
185 }
186 }
187
188 store::insert_repost(repost, pool).await;
189 }
190 RecordAction::Delete => {
191 store::delete_repost(record.rkey.to_string(), pool).await;
192 }
193 RecordAction::Update => {}
194 }
195}
196
197#[derive(Debug, Deserialize)]
198struct TapPost {
199 #[serde(rename = "createdAt")]
200 created_at: String,
201 subject: Option<Subject>,
202}
203
204#[derive(Debug, Deserialize)]
205struct Subject {
206 uri: String,
207}