this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

make ingestion (hopefully) more robust

- if it misses a create/update/delete it should handle that fine now (upsert)
- if it gets a stream closed error, reconnect asterisk
- remove default pds from .env.template and just use resolver logic

+76 -42
+3 -3
.env.template
··· 3 3 DATABASE_URL=postgres://admin@localhost:5432/meview # postgres uri 4 4 5 5 # optional 6 - USER_PDS_URL=katproto.girlonthemoon.xyz # if ommited, will resolve from did 7 - USER_EXPORT_URL=katproto.girlonthemoon.xyz # if ommited, will copy USER_PDS_URL 8 - USER_SUBSCRIBE_URL=katproto.girlonthemoon.xyz # if ommited, will copy USER_PDS_URL 6 + USER_PDS_URL=abnormal.zip # if ommited, will resolve from did 7 + USER_EXPORT_URL=abnormal.zip # if ommited, will copy USER_PDS_URL 8 + USER_SUBSCRIBE_URL=abnormal.zip # if ommited, will copy USER_PDS_URL
+41 -39
src/ingest/ingest.rs
··· 64 64 return; 65 65 }; 66 66 match op.action.clone().as_str() { 67 - "create" => { 67 + "create" | "update" => { 68 68 let Some(cid) = op.cid.map(|x| x.0.to_string()) else { 69 - eprintln!("Missing cid for create {}/{}", collection, rkey); 69 + eprintln!( 70 + "Missing cid for {} {}/{}", 71 + op.action.clone().as_str(), 72 + collection, 73 + rkey 74 + ); 70 75 return; 71 76 }; 72 77 let Some(val) = val else { 73 - eprintln!("Missing value for create {}/{}/{}", collection, rkey, cid); 78 + eprintln!( 79 + "Missing value for {} {}/{}/{}", 80 + op.action.clone().as_str(), 81 + collection, 82 + rkey, 83 + cid 84 + ); 74 85 return; 75 86 }; 76 87 if let Err(err) = query!( 77 88 "INSERT INTO records (collection, rkey, cid, record) 78 - VALUES ($1, $2, $3, $4)", 79 - collection, 80 - rkey, 81 - cid, 82 - val 83 - ) 84 - .execute(&*conn) 85 - .await 86 - { 87 - eprintln!("Error creating {}/{}/{}\n{}", collection, rkey, cid, err); 88 - } else { 89 - println!("wrote {}/{}/{} successfully.", collection, rkey, cid); 90 - }; 91 - } 92 - "update" => { 93 - let Some(cid) = op.cid.map(|x| x.0.to_string()) else { 94 - eprintln!("Missing cid for update {}/{}", collection, rkey); 95 - return; 96 - }; 97 - let Some(val) = val else { 98 - eprintln!("Missing value for update {}/{}/{}", collection, rkey, cid); 99 - return; 100 - }; 101 - if let Err(err) = query!( 102 - "UPDATE records SET 103 - collection = $1, 104 - rkey = $2, 105 - cid = $3, 106 - record = $4 107 - WHERE 108 - collection = $1 109 - and rkey = $2", 89 + VALUES ($1, $2, $3, $4) 90 + ON CONFLICT (collection, rkey) 91 + DO UPDATE SET 92 + cid = EXCLUDED.cid, 93 + record = EXCLUDED.record;", 110 94 collection, 111 95 rkey, 112 96 cid, ··· 115 99 .execute(&*conn) 116 100 .await 117 101 { 118 - eprintln!("Error updating {}/{}/{}\n{}", collection, rkey, cid, err); 102 + eprintln!( 103 + "Error applying {} to {}/{}/{}\n{}", 104 + op.action.clone().as_str(), 105 + collection, 106 + rkey, 107 + cid, 108 + err 109 + ); 119 110 } else { 120 - println!("updated {}/{}/{} successfully.", collection, rkey, cid); 111 + println!( 112 + "{} {}/{}/{}", 113 + op.action.clone().as_str(), 114 + collection, 115 + rkey, 116 + cid 117 + ); 121 118 }; 122 119 } 123 120 "delete" => { ··· 133 130 { 134 131 eprintln!("Error deleting {}/{}\n{}", collection, rkey, err); 135 132 } else { 136 - println!("deleted {}/{} successfully.", collection, rkey); 133 + println!("delete {}/{}", collection, rkey); 137 134 }; 138 135 } 139 136 _ => { 140 - println!("missing #{} {:#?} {:#?}", op.action.as_str(), op, val) 137 + println!( 138 + "unknown action {} for {:#?} {:#?}", 139 + op.action.as_str(), 140 + op, 141 + val 142 + ) 141 143 } 142 144 } 143 145 }))
+32
src/ingest/queue.rs
··· 1 + use std::thread; 2 + use std::time::Duration; 1 3 use std::{collections::VecDeque, sync::Arc}; 2 4 3 5 use futures_util::stream::StreamExt; ··· 37 39 Ok(val) => val, 38 40 Err(err) => { 39 41 eprintln!("Warning: Websocket error: {} ({:?})", err, err.source()); 42 + if &jacquard::StreamErrorKind::Closed == err.kind() { 43 + let stream = client.subscribe(&SubscribeRepos::new().build()).await; 44 + // if it reconnected successfully, just continue 45 + let new_messages = match stream { 46 + Ok(val) => val.into_stream().1, 47 + // if it failed, try reconnect 10 times, waiting a second between each attempt 48 + Err(_) => { 49 + let mut new_messages = None; 50 + for _ in 0..10 { 51 + let Ok(stream) = 52 + client.subscribe(&SubscribeRepos::new().build()).await 53 + else { 54 + // wait a second 55 + thread::sleep(Duration::from_secs(1)); 56 + continue; 57 + }; 58 + new_messages = Some(stream.into_stream().1) 59 + } 60 + 61 + if let Some(new_messages) = new_messages { 62 + new_messages 63 + } else { 64 + // could not reconnect so just die lmao 65 + panic!("Could not reconnect to client. Fatal"); 66 + } 67 + } 68 + }; 69 + // for some reason new_messages doesnt need to be mut ? 70 + messages = new_messages; 71 + } 40 72 continue; 41 73 } 42 74 };