···11+-- Create a follows table
22+CREATE TABLE IF NOT EXISTS follows
33+(
44+ id INTEGER PRIMARY KEY NOT NULL,
55+ subject TEXT NOT NULL,
66+ rkey TEXT NOT NULL
77+);
+13-1
src/main.rs
···11mod server;
22mod tap;
33+use sqlx::migrate::MigrateDatabase;
3445#[tokio::main]
56async fn main() -> anyhow::Result<()> {
67 dotenv::dotenv().ok();
7899+ let db_url = std::env::var("DATABASE_URL")?;
1010+1111+ if !sqlx::Sqlite::database_exists(&db_url).await? {
1212+ sqlx::Sqlite::create_database(&db_url).await?;
1313+ }
1414+1515+ let pool = sqlx::SqlitePool::connect(&db_url).await?;
1616+1717+ // run migrations
1818+ sqlx::migrate!("./migrations").run(&pool).await?;
1919+820 // add the users repo to tap to ensure it has all of that when the tap subscribing starts,
921 // it has all the users data
1022 let users_did = std::env::var("USERS_DID").unwrap();
1123 tap::add_repo(&users_did).await?;
12241313- tokio::join!(server::run_server(), tap::run_tap(users_did));
2525+ tokio::join!(server::run_server(), tap::run_tap(users_did, &pool));
1426 Ok(())
1527}
+64-13
src/tap.rs
···11-use atproto_tap::{TapClient, TapEvent, connect_to};
11+use atproto_tap::{RecordAction, RecordEvent, TapClient, TapEvent, connect_to};
22+use axum::response::sse::Event;
33+use serde::Deserialize;
24use tokio_stream::StreamExt;
3544-pub async fn run_tap(users_did: String) {
66+#[derive(Deserialize)]
77+struct FollowRecord {
88+ subject: String,
99+}
1010+1111+pub async fn add_repo(did: &String) -> anyhow::Result<()> {
1212+ let tap_url = std::env::var("TAP_URL").unwrap_or("localhost:2480".to_string());
1313+ let client = TapClient::new(tap_url.as_str(), Some("password".to_string()));
1414+1515+ // Add repositories to track
1616+ client.add_repos(&[did.as_str()]).await?;
1717+1818+ Ok(())
1919+}
2020+2121+pub async fn run_tap(users_did: String, pool: &sqlx::SqlitePool) {
522 let tap_url = std::env::var("TAP_URL").unwrap_or("localhost:2480".to_string());
623 let mut stream = connect_to(tap_url.as_str());
724···926 match result {
1027 Ok(event) => match event.as_ref() {
1128 TapEvent::Record { record, .. } => {
1212- // TODO: If the record collection is something other than bsky post and the appview users did
1313- // handle -> such as a follow, block etc
1414- // Otherwise the record collection is a bsky post in which case index
1515- if record.did.clone().into_string() == users_did {
1616- println!("event from user")
2929+ let collection = String::from(record.collection.clone());
3030+3131+ match collection.as_str() {
3232+ "app.bsky.graph.follow" => {
3333+ if record.did.clone().into_string() == users_did {
3434+ handle_user_follow_event(record, pool).await;
3535+ }
3636+ }
3737+ "app.bsky.feed.post" => {
3838+ handle_post_event(record, pool).await;
3939+ }
4040+ _ => {}
1741 }
4242+1843 println!("{} {} {}", record.action, record.collection, record.did);
1944 }
2045 TapEvent::Identity { identity, .. } => {
···2651 }
2752}
28532929-pub async fn add_repo(did: &String) -> anyhow::Result<()> {
3030- let tap_url = std::env::var("TAP_URL").unwrap_or("localhost:2480".to_string());
3131- let client = TapClient::new(tap_url.as_str(), Some("password".to_string()));
5454+async fn handle_user_follow_event(record: &RecordEvent, pool: &sqlx::SqlitePool) {
5555+ match record.action {
5656+ RecordAction::Create => {
5757+ let follow: FollowRecord = record.parse_record().unwrap();
5858+ let result = sqlx::query("INSERT INTO follows (subject, rkey) VALUES ($1, $2)")
5959+ .bind(follow.subject)
6060+ .bind(record.rkey.to_string())
6161+ .execute(pool)
6262+ .await;
32633333- // Add repositories to track
3434- client.add_repos(&[did.as_str()]).await?;
6464+ if result.is_err() {
6565+ println!("Error inserting follow into the database: {result:?}");
6666+ }
6767+ }
6868+ RecordAction::Delete => {
6969+ let result = sqlx::query("DELETE FROM follows WHERE rkey = $1")
7070+ .bind(record.rkey.to_string())
7171+ .execute(pool)
7272+ .await;
35733636- Ok(())
7474+ if result.is_err() {
7575+ println!("Error deleting follow from the database: {result:?}");
7676+ }
7777+ }
7878+ RecordAction::Update => {}
7979+ }
8080+}
8181+8282+async fn handle_post_event(record: &RecordEvent, pool: &sqlx::SqlitePool) {
8383+ match record.action {
8484+ RecordAction::Create => {}
8585+ RecordAction::Delete => {}
8686+ RecordAction::Update => {}
8787+ }
3788}