this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 3a1ce8e0e4e1e2a5a69cb057d8debe3a19c8dad9 198 lines 7.1 kB view raw
1use std::{collections::VecDeque, sync::Arc}; 2 3use futures_util::future; 4use ipld_core::ipld::Ipld; 5use jacquard::{ 6 api::com_atproto::sync::subscribe_repos::{Commit, SubscribeReposMessage, Sync}, 7 types::string::Handle, 8}; 9use jacquard_repo::{BlockStore, MemoryBlockStore}; 10use sqlx::{Pool, Postgres, query}; 11use thiserror::Error; 12use tokio::{sync::Mutex, task::JoinHandle}; 13 14use crate::{backfill::backfill, utils::ipld_json::ipld_to_json_value}; 15 16trait Ingest { 17 type Error; 18 async fn ingest(&self, conn: Arc<Pool<Postgres>>) -> Result<(), Self::Error>; 19} 20 21#[derive(Debug, Error)] 22enum CommitError { 23 #[error("Error parsing #commit event: {}", .0)] 24 ParseCarBytes(#[from] jacquard_repo::RepoError), 25} 26 27impl Ingest for Commit<'_> { 28 type Error = CommitError; 29 async fn ingest(&self, conn: Arc<Pool<Postgres>>) -> Result<(), Self::Error> { 30 let car = jacquard_repo::car::parse_car_bytes(&self.blocks).await?; 31 let storage = Arc::new(MemoryBlockStore::new_from_blocks(car.blocks)); 32 33 let ops = future::join_all(self.ops.clone().into_iter().map(|op| async { 34 // get block data by cid, or None if errors/not found 35 if let Some(cid) = &op.cid { 36 if let Ok(cid) = cid.0.to_ipld() 37 && let Ok(contents) = storage.get(&cid).await 38 && let Some(contents) = contents 39 && let Ok(val) = serde_ipld_dagcbor::from_slice::<Ipld>(&contents) 40 { 41 (op, ipld_to_json_value(&val).ok()) 42 } else { 43 (op, None) 44 } 45 } else { 46 (op, None) 47 } 48 })) 49 .await; 50 51 future::join_all(ops.into_iter().map(|(op, val)| async { 52 let mut path = op.path.split("/"); 53 let Some(collection) = path.next() else { 54 eprintln!("Invalid path ({})", op.path.as_str()); 55 return; 56 }; 57 let Some(rkey) = path.next() else { 58 eprintln!("Invalid path ({})", op.path.as_str()); 59 return; 60 }; 61 // assert the path is only collection/rkey 62 if path.next().is_some() { 63 eprintln!("Invalid path ({})", op.path.as_str()); 64 return; 65 }; 66 match op.action.clone().as_str() { 67 "create" | "update" => { 68 let Some(cid) = op.cid.map(|x| x.0.to_string()) else { 69 eprintln!( 70 "Missing cid for {} {}/{}", 71 op.action.clone().as_str(), 72 collection, 73 rkey 74 ); 75 return; 76 }; 77 let Some(val) = val else { 78 eprintln!( 79 "Missing value for {} {}/{}/{}", 80 op.action.clone().as_str(), 81 collection, 82 rkey, 83 cid 84 ); 85 return; 86 }; 87 if let Err(err) = query!( 88 "INSERT INTO records (collection, rkey, cid, record) 89 VALUES ($1, $2, $3, $4) 90 ON CONFLICT (collection, rkey) 91 DO UPDATE SET 92 cid = EXCLUDED.cid, 93 record = EXCLUDED.record;", 94 collection, 95 rkey, 96 cid, 97 val 98 ) 99 .execute(&*conn) 100 .await 101 { 102 eprintln!( 103 "Error applying {} to {}/{}/{}\n{}", 104 op.action.clone().as_str(), 105 collection, 106 rkey, 107 cid, 108 err 109 ); 110 } else { 111 println!( 112 "{} {}/{}/{}", 113 op.action.clone().as_str(), 114 collection, 115 rkey, 116 cid 117 ); 118 }; 119 } 120 "delete" => { 121 if let Err(err) = query!( 122 "DELETE FROM records WHERE 123 collection = $1 124 and rkey = $2", 125 collection, 126 rkey, 127 ) 128 .execute(&*conn) 129 .await 130 { 131 eprintln!("Error deleting {}/{}\n{}", collection, rkey, err); 132 } else { 133 println!("delete {}/{}", collection, rkey); 134 }; 135 } 136 _ => { 137 println!( 138 "unknown action {} for {:#?} {:#?}", 139 op.action.as_str(), 140 op, 141 val 142 ) 143 } 144 } 145 })) 146 .await; 147 148 Ok(()) 149 } 150} 151 152impl Ingest for Sync<'_> { 153 type Error = crate::backfill::Error; 154 async fn ingest(&self, conn: Arc<Pool<Postgres>>) -> Result<(), Self::Error> { 155 backfill(conn, None).await 156 } 157} 158 159pub fn ingest( 160 queue: Arc<Mutex<VecDeque<SubscribeReposMessage<'static>>>>, 161 conn: Arc<Pool<Postgres>>, 162) -> JoinHandle<()> { 163 tokio::spawn(async move { 164 loop { 165 let Some(next) = queue.lock().await.pop_front() else { 166 continue; 167 }; 168 169 match next { 170 SubscribeReposMessage::Commit(commit) => { 171 commit.ingest(conn.clone()).await.unwrap_or_else(|err| { 172 eprintln!("error handling #commit({}): {:?}", commit.clone().rev, err) 173 }) 174 } 175 SubscribeReposMessage::Sync(sync) => { 176 sync.ingest(conn.clone()).await.unwrap_or_else(|err| { 177 eprintln!("error handling #sync({}): {:?}", sync.clone().rev, err) 178 }) 179 } 180 SubscribeReposMessage::Identity(identity) => println!( 181 "ignoring #identity({}) event. has user migrated?", 182 identity.handle.unwrap_or(Handle::raw("handle.invalid")) 183 ), 184 SubscribeReposMessage::Account(account) => println!( 185 "ignoring #account({} {}) event. has user deactivated?", 186 account.active, 187 account.status.unwrap_or("unknown".into()) 188 ), 189 SubscribeReposMessage::Info(info) => { 190 println!("ignoring #info({}) event", info.name) 191 } 192 SubscribeReposMessage::Unknown(_) => { 193 println!("ignoring unknown event. is meview outdated?") 194 } 195 }; 196 } 197 }) 198}