A personal app view to see Bsky posts of your followers (for when their app view goes down)
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 207 lines 6.8 kB view raw
1use ::chrono::{DateTime, Utc}; 2use atproto_tap::{RecordAction, RecordEvent, TapClient, TapEvent, connect_to}; 3use serde::Deserialize; 4use tokio_stream::StreamExt; 5 6use crate::store; 7 8#[derive(Deserialize)] 9struct FollowRecord { 10 subject: String, 11} 12 13pub async fn add_repo(did: &String) -> anyhow::Result<()> { 14 let tap_url = std::env::var("TAP_URL").unwrap_or("localhost:2480".to_string()); 15 let client = TapClient::new(tap_url.as_str(), Some("password".to_string())); 16 17 // Add repositories to track 18 let res = client.add_repos(&[did.as_str()]).await; 19 match res { 20 Err(e) => { 21 println!("failed to add repo with tap: {e}"); 22 } 23 Ok(()) => { 24 println!("added repo {did}"); 25 } 26 } 27 28 Ok(()) 29} 30 31pub async fn remove_repo(did: &String) { 32 let tap_url = std::env::var("TAP_URL").unwrap_or("localhost:2480".to_string()); 33 let client = TapClient::new(tap_url.as_str(), Some("password".to_string())); 34 35 let res = client.remove_repos(&[did.as_str()]).await; 36 37 match res { 38 Err(e) => { 39 println!("failed to remove repo with tap: {e}"); 40 } 41 Ok(()) => { 42 println!("removed repo {did}"); 43 } 44 } 45} 46 47pub async fn run_tap(users_did: String, pool: &sqlx::SqlitePool) { 48 let tap_url = std::env::var("TAP_URL").unwrap_or("localhost:2480".to_string()); 49 let mut stream = connect_to(tap_url.as_str()); 50 51 while let Some(result) = stream.next().await { 52 match result { 53 Ok(event) => match event.as_ref() { 54 TapEvent::Record { record, .. } => { 55 let collection = String::from(record.collection.clone()); 56 57 match collection.as_str() { 58 "app.bsky.graph.follow" => { 59 if record.did.clone().into_string() == users_did { 60 handle_user_follow_event(record, pool).await; 61 } 62 } 63 "app.bsky.feed.post" => { 64 handle_post_event(record, pool).await; 65 } 66 "app.bsky.feed.repost" => { 67 handle_repost_event(record, pool).await; 68 } 69 _ => {} 70 } 71 72 println!("{} {} {}", record.action, record.collection, record.did); 73 } 74 TapEvent::Identity { identity, .. } => { 75 println!("Identity: {} = {}", identity.did, identity.handle); 76 } 77 }, 78 Err(e) => eprintln!("Error: {}", e), 79 } 80 } 81} 82 83async fn handle_user_follow_event(record: &RecordEvent, pool: &sqlx::SqlitePool) { 84 match record.action { 85 RecordAction::Create => { 86 let follow: FollowRecord = record.parse_record().unwrap(); // TODO - bad error handling there 87 let result = sqlx::query("INSERT INTO follows (subject, rkey) VALUES ($1, $2)") 88 .bind(&follow.subject) 89 .bind(record.rkey.to_string()) 90 .execute(pool) 91 .await; 92 93 if result.is_err() { 94 println!("Error inserting follow into the database: {result:?}"); 95 } 96 97 // track the follow anyway 98 _ = add_repo(&follow.subject).await; 99 } 100 RecordAction::Delete => { 101 let follow_subject = store::get_follow_by_rkey(&record.rkey.to_string(), pool).await; 102 if follow_subject != "" { 103 _ = remove_repo(&follow_subject).await; 104 } 105 106 let result = sqlx::query("DELETE FROM follows WHERE rkey = $1") 107 .bind(record.rkey.to_string()) 108 .execute(pool) 109 .await; 110 111 if result.is_err() { 112 println!("Error deleting follow from the database: {result:?}"); 113 } 114 } 115 RecordAction::Update => {} 116 } 117} 118 119async fn handle_post_event(record: &RecordEvent, pool: &sqlx::SqlitePool) { 120 match record.action { 121 RecordAction::Create => { 122 let cid: String = match record.cid.clone() { 123 Some(x) => x.to_string(), 124 None => "".to_string(), 125 }; 126 let mut post = store::PostRecord { 127 created: Utc::now(), 128 indexed: Utc::now(), 129 author: record.did.clone().to_string(), 130 rkey: record.rkey.to_string(), 131 cid: cid, 132 }; 133 134 let tap_post: TapPost = record.parse_record().unwrap(); // TODO: better error handling here 135 136 let created_at = DateTime::parse_from_rfc3339(&tap_post.created_at); 137 match created_at { 138 Ok(ca) => { 139 post.created = ca.to_utc(); 140 } 141 Err(e) => { 142 println!("parsing created at: {e}"); 143 } 144 } 145 146 store::insert_post(post, pool).await; 147 } 148 RecordAction::Delete => { 149 store::delete_post(record.rkey.to_string(), pool).await; 150 } 151 RecordAction::Update => {} 152 } 153} 154 155async fn handle_repost_event(record: &RecordEvent, pool: &sqlx::SqlitePool) { 156 match record.action { 157 RecordAction::Create => { 158 let tap_post: TapPost = record.parse_record().unwrap(); // TODO: better error handling here 159 let cid: String = match record.cid.clone() { 160 Some(x) => x.to_string(), 161 None => "".to_string(), 162 }; 163 164 let subject: String = match tap_post.subject { 165 Some(x) => x.uri.clone().to_string(), 166 None => "".to_string(), 167 }; 168 169 let mut repost = store::RepostRecord { 170 created: Utc::now(), 171 indexed: Utc::now(), 172 author: record.did.clone().to_string(), 173 rkey: record.rkey.to_string(), 174 subject: subject, 175 cid: cid, 176 }; 177 178 let created_at = DateTime::parse_from_rfc3339(&tap_post.created_at); 179 match created_at { 180 Ok(ca) => { 181 repost.created = ca.to_utc(); 182 } 183 Err(e) => { 184 println!("parsing created at: {e}"); 185 } 186 } 187 188 store::insert_repost(repost, pool).await; 189 } 190 RecordAction::Delete => { 191 store::delete_repost(record.rkey.to_string(), pool).await; 192 } 193 RecordAction::Update => {} 194 } 195} 196 197#[derive(Debug, Deserialize)] 198struct TapPost { 199 #[serde(rename = "createdAt")] 200 created_at: String, 201 subject: Option<Subject>, 202} 203 204#[derive(Debug, Deserialize)] 205struct Subject { 206 uri: String, 207}