···77anyhow = "1.0.102"
88atproto-tap = "0.14.5"
99axum = "0.8.9"
1010+chrono = "0.4.44"
1011dotenv = "0.15.0"
1112serde = { version = "1.0.228", features = ["derive"] }
1213serde_json = "1.0.149"
1313-sqlx = { version = "0.8.6", features = ["runtime-tokio-native-tls", "sqlite"] }
1414+sqlx = { version = "0.8.6", features = ["chrono", "runtime-tokio-native-tls", "sqlite"] }
1515+time = "0.3.47"
1416tokio = { version = "1.52.1", features = ["full"] }
1517tokio-stream = "0.1.18"
+9
migrations/20260422183446_posts.sql
···11+-- Create a posts table
22+CREATE TABLE IF NOT EXISTS posts
33+(
44+ id INTEGER PRIMARY KEY NOT NULL,
55+ created TIMESTAMP NOT NULL,
66+ indexed TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
77+ author TEXT NOT NULL,
88+ rkey TEXT NOT NULL
99+);
+70-6
src/tap.rs
···11+use ::chrono::{DateTime, ParseError, TimeZone, Utc};
12use atproto_tap::{RecordAction, RecordEvent, TapClient, TapEvent, connect_to};
22-use serde::Deserialize;
33-use sqlx::Row;
33+use serde::{Deserialize, Serialize};
44+use sqlx::{FromRow, Row, types::chrono};
45use tokio_stream::StreamExt;
5667#[derive(Deserialize)]
···7879async fn handle_user_follow_event(record: &RecordEvent, pool: &sqlx::SqlitePool) {
7980 match record.action {
8081 RecordAction::Create => {
8181- let follow: FollowRecord = record.parse_record().unwrap();
8282+ let follow: FollowRecord = record.parse_record().unwrap(); // TODO - bad error handling there
8283 let result = sqlx::query("INSERT INTO follows (subject, rkey) VALUES ($1, $2)")
8384 .bind(&follow.subject)
8485 .bind(record.rkey.to_string())
···111112 }
112113}
113114114114-async fn handle_post_event(record: &RecordEvent, _pool: &sqlx::SqlitePool) {
115115+async fn handle_post_event(record: &RecordEvent, pool: &sqlx::SqlitePool) {
115116 match record.action {
116116- RecordAction::Create => {}
117117- RecordAction::Delete => {}
117117+ RecordAction::Create => {
118118+ let mut post = PostRecord {
119119+ created: Utc::now(),
120120+ indexed: Utc::now(),
121121+ author: record.did.clone().to_string(),
122122+ rkey: record.rkey.to_string(),
123123+ };
124124+125125+ let tap_post: TapPost = record.parse_record().unwrap(); // TODO: better error handling here
126126+127127+ let created_at = DateTime::parse_from_rfc3339(&tap_post.created_at);
128128+ match created_at {
129129+ Ok(ca) => {
130130+ post.created = ca.to_utc();
131131+ }
132132+ Err(e) => {
133133+ println!("parsing created at: {e}");
134134+ }
135135+ }
136136+137137+ insert_post(post, pool).await;
138138+ }
139139+ RecordAction::Delete => {
140140+ delete_post(record.rkey.to_string(), pool).await;
141141+ }
118142 RecordAction::Update => {}
119143 }
120144}
···141165 }
142166 }
143167}
168168+#[derive(Debug, Deserialize)]
169169+struct TapPost {
170170+ #[serde(rename = "createdAt")]
171171+ created_at: String,
172172+}
173173+174174+#[derive(Debug, FromRow)]
175175+struct PostRecord {
176176+ created: chrono::DateTime<chrono::Utc>,
177177+ indexed: chrono::DateTime<chrono::Utc>,
178178+ author: String,
179179+ rkey: String,
180180+ // TODO: other fields like reply to etc
181181+}
182182+183183+async fn insert_post(post: PostRecord, pool: &sqlx::SqlitePool) {
184184+ let result =
185185+ sqlx::query("INSERT INTO posts (created, indexed, author, rkey) VALUES ($1, $2, $3, $4)")
186186+ .bind(post.created)
187187+ .bind(post.indexed)
188188+ .bind(post.author)
189189+ .bind(post.rkey)
190190+ .execute(pool)
191191+ .await;
192192+193193+ if result.is_err() {
194194+ println!("Error inserting post into the database: {result:?}");
195195+ }
196196+}
197197+198198+async fn delete_post(rkey: String, pool: &sqlx::SqlitePool) {
199199+ let result = sqlx::query("DELETE FROM posts WHERE rkey = $1")
200200+ .bind(rkey)
201201+ .execute(pool)
202202+ .await;
203203+204204+ if result.is_err() {
205205+ println!("Error deleting post from the database: {result:?}");
206206+ }
207207+}