A personal app view to see Bsky posts of your followers (for when their app view goes down)
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

track and store reposts

Signed-off-by: Will Andrews <did:plc:dadhhalkfcq3gucaq25hjqon>

+98 -3
+1 -1
docker-compose.yaml
··· 26 26 volumes: 27 27 - ./data:/data 28 28 environment: 29 - TAP_COLLECTION_FILTERS: "app.bsky.feed.post,app.bsky.graph.*" 29 + TAP_COLLECTION_FILTERS: "app.bsky.feed.post,app.bsky.feed.repost,app.bsky.graph.follow"
+10
migrations/20260423190103_reposts.sql
··· 1 + -- Create a reposts table 2 + CREATE TABLE IF NOT EXISTS reposts 3 + ( 4 + id INTEGER PRIMARY KEY NOT NULL, 5 + created TIMESTAMP NOT NULL, 6 + indexed TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 7 + author TEXT NOT NULL, 8 + rkey TEXT NOT NULL, 9 + subject TEXT NOT NULL 10 + );
+87 -2
src/tap.rs
··· 1 - use ::chrono::{DateTime, ParseError, TimeZone, Utc}; 1 + use ::chrono::{DateTime, Utc}; 2 2 use atproto_tap::{RecordAction, RecordEvent, TapClient, TapEvent, connect_to}; 3 - use serde::{Deserialize, Serialize}; 3 + use serde::Deserialize; 4 4 use sqlx::{FromRow, Row, types::chrono}; 5 5 use tokio_stream::StreamExt; 6 6 ··· 61 61 } 62 62 "app.bsky.feed.post" => { 63 63 handle_post_event(record, pool).await; 64 + } 65 + "app.bsky.feed.repost" => { 66 + handle_repost_event(record, pool).await; 64 67 } 65 68 _ => {} 66 69 } ··· 143 146 } 144 147 } 145 148 149 + async fn handle_repost_event(record: &RecordEvent, pool: &sqlx::SqlitePool) { 150 + match record.action { 151 + RecordAction::Create => { 152 + let tap_post: TapRepost = record.parse_record().unwrap(); // TODO: better error handling here 153 + 154 + let mut repost = RepostRecord { 155 + created: Utc::now(), 156 + indexed: Utc::now(), 157 + author: record.did.clone().to_string(), 158 + rkey: record.rkey.to_string(), 159 + subject: tap_post.subject.uri, 160 + }; 161 + 162 + let created_at = DateTime::parse_from_rfc3339(&tap_post.created_at); 163 + match created_at { 164 + Ok(ca) => { 165 + repost.created = ca.to_utc(); 166 + } 167 + Err(e) => { 168 + println!("parsing created at: {e}"); 169 + } 170 + } 171 + 172 + insert_repost(repost, pool).await; 173 + } 174 + RecordAction::Delete => { 175 + delete_repost(record.rkey.to_string(), pool).await; 176 + } 177 + RecordAction::Update => {} 178 + } 179 + } 180 + 146 181 async fn get_follow_by_rkey(rkey: &String, pool: &sqlx::SqlitePool) -> String { 147 182 let result = sqlx::query("SELECT subject FROM follows where rkey = ? LIMIT 1") 148 183 .bind(rkey) ··· 171 206 created_at: String, 172 207 } 173 208 209 + #[derive(Debug, Deserialize)] 210 + struct TapRepost { 211 + #[serde(rename = "createdAt")] 212 + created_at: String, 213 + subject: Subject, 214 + } 215 + 216 + #[derive(Debug, Deserialize)] 217 + struct Subject { 218 + uri: String, 219 + } 220 + 174 221 #[derive(Debug, FromRow)] 175 222 struct PostRecord { 176 223 created: chrono::DateTime<chrono::Utc>, ··· 180 227 // TODO: other fields like reply to etc 181 228 } 182 229 230 + #[derive(Debug, FromRow)] 231 + struct RepostRecord { 232 + created: chrono::DateTime<chrono::Utc>, 233 + indexed: chrono::DateTime<chrono::Utc>, 234 + author: String, 235 + rkey: String, 236 + subject: String, 237 + // TODO: other fields like reply to etc 238 + } 239 + 183 240 async fn insert_post(post: PostRecord, pool: &sqlx::SqlitePool) { 184 241 let result = 185 242 sqlx::query("INSERT INTO posts (created, indexed, author, rkey) VALUES ($1, $2, $3, $4)") ··· 195 252 } 196 253 } 197 254 255 + async fn insert_repost(repost: RepostRecord, pool: &sqlx::SqlitePool) { 256 + let result = sqlx::query( 257 + "INSERT INTO reposts (created, indexed, author, rkey, subject) VALUES ($1, $2, $3, $4, $5)", 258 + ) 259 + .bind(repost.created) 260 + .bind(repost.indexed) 261 + .bind(repost.author) 262 + .bind(repost.rkey) 263 + .bind(repost.subject) 264 + .execute(pool) 265 + .await; 266 + 267 + if result.is_err() { 268 + println!("Error inserting repost into the database: {result:?}"); 269 + } 270 + } 271 + 198 272 async fn delete_post(rkey: String, pool: &sqlx::SqlitePool) { 199 273 let result = sqlx::query("DELETE FROM posts WHERE rkey = $1") 200 274 .bind(rkey) ··· 205 279 println!("Error deleting post from the database: {result:?}"); 206 280 } 207 281 } 282 + 283 + async fn delete_repost(rkey: String, pool: &sqlx::SqlitePool) { 284 + let result = sqlx::query("DELETE FROM reposts WHERE rkey = $1") 285 + .bind(rkey) 286 + .execute(pool) 287 + .await; 288 + 289 + if result.is_err() { 290 + println!("Error deleting repost from the database: {result:?}"); 291 + } 292 + }