···44edition = "2024"
5566[dependencies]
77+anyhow = "1.0"
78async-trait = "0.1.83"
99+atproto-jetstream = "0.13"
810axum = "0.8.8"
911base64 = "0.22"
1012chrono = { version = "0.4.42", features = ["serde"] }
···1820reqwest = { version = "0.12.28", features = ["json"] }
1921serde = "1.0.228"
2022serde_json = "1.0.148"
2121-serde_qs = "0.13"
2323+serde_qs = "0.15"
2224sha2 = "0.10"
2325tokio = { version = "1.48.0", features = ["full"] }
2626+tokio-util = { version = "0.7", features = ["rt"] }
2427urlencoding = "2.1"
2528tokio-postgres = { version = "0.7.13", features = [
2629 "with-serde_json-1",
+8-4
crates/server/src/api/card.rs
···8787 use std::sync::Arc;
88888989 fn create_test_state() -> SharedState {
9090- let pool = crate::db::create_pool().unwrap_or_else(|_| panic!("For testing without DB, use mock pool"));
9090+ let pool = crate::db::create_mock_pool();
9191 let card_repo = Arc::new(MockCardRepository::new()) as Arc<dyn crate::repository::card::CardRepository>;
9292 let note_repo = Arc::new(crate::repository::note::mock::MockNoteRepository::new())
9393 as Arc<dyn crate::repository::note::NoteRepository>;
9494- AppState::new_with_repos(pool, card_repo, note_repo)
9494+ let oauth_repo = Arc::new(crate::repository::oauth::mock::MockOAuthRepository::new())
9595+ as Arc<dyn crate::repository::oauth::OAuthRepository>;
9696+ AppState::new_with_repos(pool, card_repo, note_repo, oauth_repo)
9597 }
96989799 #[tokio::test]
···133135134136 #[tokio::test]
135137 async fn test_list_cards_success() {
136136- let pool = crate::db::create_pool().unwrap_or_else(|_| panic!("For testing without DB, use mock pool"));
138138+ let pool = crate::db::create_mock_pool();
137139138140 let test_deck_id = "550e8400-e29b-41d4-a716-446655440000".to_string();
139141 let test_cards = vec![
···159161 Arc::new(MockCardRepository::with_cards(test_cards)) as Arc<dyn crate::repository::card::CardRepository>;
160162 let note_repo = Arc::new(crate::repository::note::mock::MockNoteRepository::new())
161163 as Arc<dyn crate::repository::note::NoteRepository>;
164164+ let oauth_repo = Arc::new(crate::repository::oauth::mock::MockOAuthRepository::new())
165165+ as Arc<dyn crate::repository::oauth::OAuthRepository>;
162166163163- let state = AppState::new_with_repos(pool, card_repo, note_repo);
167167+ let state = AppState::new_with_repos(pool, card_repo, note_repo, oauth_repo);
164168165169 let response = list_cards(axum::extract::State(state), None, Path(test_deck_id))
166170 .await
+132-53
crates/server/src/api/deck.rs
···294294 };
295295296296 let deck_row = match client
297297- .query_opt("SELECT owner_did FROM decks WHERE id = $1", &[&deck_id])
297297+ .query_opt(
298298+ "SELECT id, owner_did, title, description, tags, visibility, published_at, fork_of
299299+ FROM decks WHERE id = $1",
300300+ &[&deck_id],
301301+ )
298302 .await
299303 {
300304 Ok(Some(row)) => row,
···314318 return (StatusCode::FORBIDDEN, Json(json!({"error": "Only owner can publish"}))).into_response();
315319 }
316320317317- let (new_visibility, published_at) = if payload.published {
318318- (
319319- serde_json::to_value(&Visibility::Public).unwrap(),
320320- Some(chrono::Utc::now()),
321321- )
322322- } else {
323323- (serde_json::to_value(&Visibility::Private).unwrap(), None)
324324- };
325325-326326- match client
327327- .execute(
328328- "UPDATE decks SET visibility = $1, published_at = $2 WHERE id = $3",
329329- &[&new_visibility, &published_at, &deck_id],
330330- )
331331- .await
332332- {
333333- Ok(_) => (),
321321+ let visibility_json: serde_json::Value = deck_row.get("visibility");
322322+ let visibility: Visibility = match serde_json::from_value(visibility_json) {
323323+ Ok(v) => v,
334324 Err(e) => {
335335- tracing::error!("Failed to update deck: {}", e);
325325+ tracing::error!("Failed to parse visibility: {}", e);
336326 return (
337327 StatusCode::INTERNAL_SERVER_ERROR,
338338- Json(json!({"error": "Failed to update deck"})),
328328+ Json(json!({"error": "Invalid deck data"})),
339329 )
340330 .into_response();
341331 }
342342- }
332332+ };
333333+334334+ let fork_of: Option<uuid::Uuid> = deck_row.get("fork_of");
335335+ let mut deck = Deck {
336336+ id: deck_id.to_string(),
337337+ owner_did: owner_did.clone(),
338338+ title: deck_row.get("title"),
339339+ description: deck_row.get("description"),
340340+ tags: deck_row.get("tags"),
341341+ visibility: visibility.clone(),
342342+ published_at: deck_row
343343+ .get::<_, Option<chrono::DateTime<chrono::Utc>>>("published_at")
344344+ .map(|dt| dt.to_rfc3339()),
345345+ fork_of: fork_of.map(|u| u.to_string()),
346346+ };
347347+348348+ let mut deck_at_uri: Option<String> = None;
349349+350350+ if payload.published {
351351+ let card_rows = match client
352352+ .query(
353353+ "SELECT id, owner_did, deck_id, front, back, media_url
354354+ FROM cards WHERE deck_id = $1 ORDER BY created_at ASC",
355355+ &[&deck_id],
356356+ )
357357+ .await
358358+ {
359359+ Ok(rows) => rows,
360360+ Err(e) => {
361361+ tracing::error!("Failed to fetch cards: {}", e);
362362+ return (
363363+ StatusCode::INTERNAL_SERVER_ERROR,
364364+ Json(json!({"error": "Failed to fetch cards"})),
365365+ )
366366+ .into_response();
367367+ }
368368+ };
369369+370370+ let cards: Vec<malfestio_core::model::Card> = card_rows
371371+ .iter()
372372+ .map(|row| {
373373+ let card_id: uuid::Uuid = row.get("id");
374374+ let card_deck_id: uuid::Uuid = row.get("deck_id");
375375+ malfestio_core::model::Card {
376376+ id: card_id.to_string(),
377377+ owner_did: row.get("owner_did"),
378378+ deck_id: card_deck_id.to_string(),
379379+ front: row.get("front"),
380380+ back: row.get("back"),
381381+ media_url: row.get("media_url"),
382382+ }
383383+ })
384384+ .collect();
385385+386386+ match crate::pds::publish::publish_deck_to_pds(state.oauth_repo.clone(), &user.did, &deck, &cards).await {
387387+ Ok(result) => {
388388+ deck_at_uri = Some(result.deck_at_uri.clone());
389389+390390+ if let Err(e) = client
391391+ .execute(
392392+ "UPDATE decks SET at_uri = $1, visibility = $2, published_at = $3 WHERE id = $4",
393393+ &[
394394+ &result.deck_at_uri,
395395+ &serde_json::to_value(&Visibility::Public).unwrap(),
396396+ &Some(chrono::Utc::now()),
397397+ &deck_id,
398398+ ],
399399+ )
400400+ .await
401401+ {
402402+ tracing::error!("Failed to store deck AT-URI: {}", e);
403403+ }
343404344344- let row = match client
345345- .query_one(
346346- "SELECT id, owner_did, title, description, tags, visibility, published_at, fork_of
347347- FROM decks WHERE id = $1",
348348- &[&deck_id],
349349- )
350350- .await
351351- {
352352- Ok(row) => row,
353353- Err(e) => {
354354- tracing::error!("Failed to fetch updated deck: {}", e);
405405+ for (i, at_uri) in result.card_at_uris.iter().enumerate() {
406406+ if i < cards.len()
407407+ && let Ok(card_uuid) = uuid::Uuid::parse_str(&cards[i].id)
408408+ && let Err(e) = client
409409+ .execute("UPDATE cards SET at_uri = $1 WHERE id = $2", &[at_uri, &card_uuid])
410410+ .await
411411+ {
412412+ tracing::warn!("Failed to store card AT-URI: {}", e);
413413+ }
414414+ }
415415+416416+ deck.visibility = Visibility::Public;
417417+ deck.published_at = Some(chrono::Utc::now().to_rfc3339());
418418+ }
419419+ Err(e) => {
420420+ tracing::error!("Failed to publish to PDS: {}", e);
421421+ return (
422422+ StatusCode::SERVICE_UNAVAILABLE,
423423+ Json(json!({"error": format!("Failed to publish to PDS: {}", e)})),
424424+ )
425425+ .into_response();
426426+ }
427427+ }
428428+ } else {
429429+ // Unpublish - just update local visibility
430430+ let (new_visibility, published_at) = (
431431+ serde_json::to_value(&Visibility::Private).unwrap(),
432432+ None::<chrono::DateTime<chrono::Utc>>,
433433+ );
434434+ if let Err(e) = client
435435+ .execute(
436436+ "UPDATE decks SET visibility = $1, published_at = $2 WHERE id = $3",
437437+ &[&new_visibility, &published_at, &deck_id],
438438+ )
439439+ .await
440440+ {
441441+ tracing::error!("Failed to update deck: {}", e);
355442 return (
356443 StatusCode::INTERNAL_SERVER_ERROR,
357357- Json(json!({"error": "Failed to retrieve updated deck"})),
444444+ Json(json!({"error": "Failed to update deck"})),
358445 )
359446 .into_response();
360447 }
361361- };
362362-363363- let visibility_json: serde_json::Value = row.get("visibility");
364364- let visibility: Visibility = serde_json::from_value(visibility_json).unwrap();
365365- let uuid_id: uuid::Uuid = row.get("id");
366366- let fork_of: Option<uuid::Uuid> = row.get("fork_of");
367367-368368- let deck = Deck {
369369- id: uuid_id.to_string(),
370370- owner_did: row.get("owner_did"),
371371- title: row.get("title"),
372372- description: row.get("description"),
373373- tags: row.get("tags"),
374374- visibility,
375375- published_at: row
376376- .get::<_, Option<chrono::DateTime<chrono::Utc>>>("published_at")
377377- .map(|dt| dt.to_rfc3339()),
378378- fork_of: fork_of.map(|u| u.to_string()),
379379- };
448448+ deck.visibility = Visibility::Private;
449449+ deck.published_at = None;
450450+ }
380451381381- Json(deck).into_response()
452452+ if let Some(at_uri) = deck_at_uri {
453453+ Json(json!({
454454+ "deck": deck,
455455+ "at_uri": at_uri
456456+ }))
457457+ .into_response()
458458+ } else {
459459+ Json(deck).into_response()
460460+ }
382461}
+12-6
crates/server/src/api/note.rs
···140140 use std::sync::Arc;
141141142142 fn create_test_state() -> SharedState {
143143- let pool = crate::db::create_pool().unwrap_or_else(|_| panic!("For testing without DB, use mock pool"));
143143+ let pool = crate::db::create_mock_pool();
144144 let card_repo = Arc::new(crate::repository::card::mock::MockCardRepository::new())
145145 as Arc<dyn crate::repository::card::CardRepository>;
146146 let note_repo = Arc::new(MockNoteRepository::new()) as Arc<dyn crate::repository::note::NoteRepository>;
147147- AppState::new_with_repos(pool, card_repo, note_repo)
147147+ let oauth_repo = Arc::new(crate::repository::oauth::mock::MockOAuthRepository::new())
148148+ as Arc<dyn crate::repository::oauth::OAuthRepository>;
149149+ AppState::new_with_repos(pool, card_repo, note_repo, oauth_repo)
148150 }
149151150152 #[tokio::test]
···186188187189 #[tokio::test]
188190 async fn test_list_notes_with_visibility_filtering() {
189189- let pool = crate::db::create_pool().unwrap_or_else(|_| panic!("For testing without DB, use mock pool"));
191191+ let pool = crate::db::create_mock_pool();
190192191193 let test_notes = vec![
192194 Note {
···215217 Arc::new(MockNoteRepository::with_notes(test_notes)) as Arc<dyn crate::repository::note::NoteRepository>;
216218 let card_repo = Arc::new(crate::repository::card::mock::MockCardRepository::new())
217219 as Arc<dyn crate::repository::card::CardRepository>;
220220+ let oauth_repo = Arc::new(crate::repository::oauth::mock::MockOAuthRepository::new())
221221+ as Arc<dyn crate::repository::oauth::OAuthRepository>;
218222219219- let state = AppState::new_with_repos(pool, card_repo, note_repo);
223223+ let state = AppState::new_with_repos(pool, card_repo, note_repo, oauth_repo);
220224221225 let response = list_notes(axum::extract::State(state.clone()), None)
222226 .await
···227231228232 #[tokio::test]
229233 async fn test_get_note_access_control() {
230230- let pool = crate::db::create_pool().unwrap_or_else(|_| panic!("For testing without DB, use mock pool"));
234234+ let pool = crate::db::create_mock_pool();
231235232236 let note_id = "test-note-id".to_string();
233237 let test_notes = vec![Note {
···245249 Arc::new(MockNoteRepository::with_notes(test_notes)) as Arc<dyn crate::repository::note::NoteRepository>;
246250 let card_repo = Arc::new(crate::repository::card::mock::MockCardRepository::new())
247251 as Arc<dyn crate::repository::card::CardRepository>;
252252+ let oauth_repo = Arc::new(crate::repository::oauth::mock::MockOAuthRepository::new())
253253+ as Arc<dyn crate::repository::oauth::OAuthRepository>;
248254249249- let state = AppState::new_with_repos(pool, card_repo, note_repo);
255255+ let state = AppState::new_with_repos(pool, card_repo, note_repo, oauth_repo);
250256251257 let owner = UserContext { did: "did:plc:owner".to_string(), handle: "owner.handle".to_string() };
252258
+11
crates/server/src/db.rs
···2727 Ok(Pool::builder(mgr).max_size(16).build()?)
2828}
29293030+/// Create a mock pool for testing that won't actually connect
3131+#[cfg(test)]
3232+pub fn create_mock_pool() -> DbPool {
3333+ let config = "host=localhost user=test dbname=test"
3434+ .parse::<tokio_postgres::Config>()
3535+ .unwrap();
3636+ let mgr_config = ManagerConfig { recycling_method: RecyclingMethod::Fast };
3737+ let mgr = Manager::from_config(config, NoTls, mgr_config);
3838+ Pool::builder(mgr).max_size(1).build().unwrap()
3939+}
4040+3041/// Retry wrapper for getting database connections with exponential backoff
3142pub async fn get_connection_with_retry(
3243 pool: &DbPool, max_retries: u32,
+196
crates/server/src/firehose.rs
···11+//! Firehose consumption via AT Protocol Jetstream.
22+//!
33+//! Provides WebSocket subscription to Jetstream for indexing public records.
44+//! Filters for `app.malfestio.*` collections and indexes them locally.
55+66+use crate::db::DbPool;
77+use async_trait::async_trait;
88+use atproto_jetstream::{Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent};
99+use tokio_util::sync::CancellationToken;
1010+1111+/// Default Jetstream endpoint (Bluesky's public instance).
1212+pub const DEFAULT_JETSTREAM_URL: &str = "wss://jetstream2.us-west.bsky.network/subscribe";
1313+1414+/// Collections we're interested in indexing.
1515+pub const MALFESTIO_COLLECTIONS: &[&str] = &["app.malfestio.deck", "app.malfestio.card", "app.malfestio.note"];
1616+1717+/// Event handler for Malfestio records from Jetstream.
1818+pub struct MalfestioEventHandler {
1919+ pool: DbPool,
2020+ handler_id: String,
2121+}
2222+2323+impl MalfestioEventHandler {
2424+ /// Create a new event handler with database connection.
2525+ pub fn new(pool: DbPool) -> Self {
2626+ Self { pool, handler_id: "malfestio-indexer".to_string() }
2727+ }
2828+2929+ /// Index a record into the database.
3030+ async fn index_record(
3131+ &self, did: &str, collection: &str, rkey: &str,
3232+ ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
3333+ let at_uri = format!("at://{}/{}/{}", did, collection, rkey);
3434+3535+ let client = self.pool.get().await?;
3636+3737+ // Upsert into indexed_records table
3838+ client
3939+ .execute(
4040+ "INSERT INTO indexed_records (at_uri, did, collection, rkey, indexed_at)
4141+ VALUES ($1, $2, $3, $4, NOW())
4242+ ON CONFLICT (at_uri) DO UPDATE SET indexed_at = NOW()",
4343+ &[&at_uri, &did, &collection, &rkey],
4444+ )
4545+ .await?;
4646+4747+ tracing::debug!("Indexed record: {}", at_uri);
4848+ Ok(())
4949+ }
5050+5151+ /// Update cursor position in database for reconnection.
5252+ #[allow(dead_code)]
5353+ pub async fn save_cursor(&self, cursor_us: i64) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
5454+ let client = self.pool.get().await?;
5555+ client
5656+ .execute(
5757+ "INSERT INTO firehose_cursors (endpoint, cursor_us)
5858+ VALUES ($1, $2)
5959+ ON CONFLICT (endpoint) DO UPDATE SET cursor_us = $2, updated_at = NOW()",
6060+ &[&DEFAULT_JETSTREAM_URL, &cursor_us],
6161+ )
6262+ .await?;
6363+ Ok(())
6464+ }
6565+6666+ /// Get saved cursor position for reconnection.
6767+ pub async fn get_cursor(&self) -> Option<i64> {
6868+ let client = self.pool.get().await.ok()?;
6969+ let row = client
7070+ .query_opt(
7171+ "SELECT cursor_us FROM firehose_cursors WHERE endpoint = $1",
7272+ &[&DEFAULT_JETSTREAM_URL],
7373+ )
7474+ .await
7575+ .ok()??;
7676+ row.get("cursor_us")
7777+ }
7878+}
7979+8080+#[async_trait]
8181+impl EventHandler for MalfestioEventHandler {
8282+ fn handler_id(&self) -> String {
8383+ self.handler_id.clone()
8484+ }
8585+8686+ async fn handle_event(&self, event: JetstreamEvent) -> Result<(), anyhow::Error> {
8787+ match event {
8888+ JetstreamEvent::Commit { did, commit, .. } => {
8989+ let collection = &commit.collection;
9090+9191+ // Only process our collections
9292+ if !MALFESTIO_COLLECTIONS.iter().any(|c| collection == *c) {
9393+ return Ok(());
9494+ }
9595+9696+ let rkey = &commit.rkey;
9797+9898+ tracing::info!("Received {} event: did={}, rkey={}", collection, did, rkey);
9999+100100+ // Index the record
101101+ if let Err(e) = self.index_record(&did, collection, rkey).await {
102102+ tracing::warn!("Failed to index record: {}", e);
103103+ }
104104+ }
105105+ JetstreamEvent::Identity { .. } | JetstreamEvent::Account { .. } | JetstreamEvent::Delete { .. } => {
106106+ // Ignore identity, account, and delete events
107107+ }
108108+ }
109109+ Ok(())
110110+ }
111111+}
112112+113113+/// Configuration for the firehose consumer.
114114+pub struct FirehoseConfig {
115115+ /// Jetstream WebSocket URL
116116+ pub jetstream_url: String,
117117+ /// Collections to filter for
118118+ pub collections: Vec<String>,
119119+ /// Enable zstd compression
120120+ pub compress: bool,
121121+}
122122+123123+impl Default for FirehoseConfig {
124124+ fn default() -> Self {
125125+ Self {
126126+ jetstream_url: DEFAULT_JETSTREAM_URL.to_string(),
127127+ collections: MALFESTIO_COLLECTIONS.iter().map(|s| s.to_string()).collect(),
128128+ compress: true,
129129+ }
130130+ }
131131+}
132132+133133+/// Start the firehose consumer as a background task.
134134+///
135135+/// Returns a `CancellationToken` that can be used to stop the consumer.
136136+pub async fn start_firehose(pool: DbPool, config: FirehoseConfig) -> CancellationToken {
137137+ let cancel = CancellationToken::new();
138138+ let cancel_clone = cancel.clone();
139139+140140+ let handler = MalfestioEventHandler::new(pool);
141141+142142+ // Build consumer config
143143+ let task_config = ConsumerTaskConfig {
144144+ user_agent: "malfestio-indexer/0.1.0".to_string(),
145145+ compression: config.compress,
146146+ zstd_dictionary_location: String::new(),
147147+ jetstream_hostname: config.jetstream_url.replace("wss://", "").replace("/subscribe", ""),
148148+ collections: config.collections,
149149+ dids: vec![],
150150+ max_message_size_bytes: None,
151151+ cursor: None,
152152+ require_hello: false,
153153+ };
154154+155155+ tokio::spawn(async move {
156156+ tracing::info!("Starting Jetstream firehose consumer...");
157157+158158+ if let Some(cursor) = handler.get_cursor().await {
159159+ tracing::info!("Resuming from cursor: {}", cursor);
160160+ }
161161+162162+ let consumer = Consumer::new(task_config);
163163+ if let Err(e) = consumer.register_handler(std::sync::Arc::new(handler)).await {
164164+ tracing::error!("Failed to register handler: {}", e);
165165+ return;
166166+ }
167167+168168+ if let Err(e) = consumer.run_background(cancel_clone).await {
169169+ tracing::error!("Firehose consumer error: {}", e);
170170+ }
171171+172172+ tracing::info!("Firehose consumer stopped");
173173+ });
174174+175175+ cancel
176176+}
177177+178178+#[cfg(test)]
179179+mod tests {
180180+ use super::*;
181181+182182+ #[test]
183183+ fn test_default_firehose_config() {
184184+ let config = FirehoseConfig::default();
185185+ assert_eq!(config.jetstream_url, DEFAULT_JETSTREAM_URL);
186186+ assert_eq!(config.collections.len(), 3);
187187+ assert!(config.compress);
188188+ }
189189+190190+ #[test]
191191+ fn test_malfestio_collections() {
192192+ assert!(MALFESTIO_COLLECTIONS.contains(&"app.malfestio.deck"));
193193+ assert!(MALFESTIO_COLLECTIONS.contains(&"app.malfestio.card"));
194194+ assert!(MALFESTIO_COLLECTIONS.contains(&"app.malfestio.note"));
195195+ }
196196+}
+4
crates/server/src/lib.rs
···11pub mod api;
22pub mod db;
33+pub mod firehose;
34pub mod middleware;
45pub mod oauth;
56pub mod pds;
67pub mod repository;
78pub mod state;
99+pub mod well_known;
810911use axum::http::Method;
1012use axum::{
···4547 let auth_routes = Router::new()
4648 .route("/me", get(api::auth::me))
4749 .route("/decks", post(api::deck::create_deck))
5050+ .route("/decks/{id}/publish", post(api::deck::publish_deck))
4851 .route("/notes", post(api::note::create_note))
4952 .route("/cards", post(api::card::create_card))
5053 .layer(axum_middleware::from_fn(middleware::auth::auth_middleware));
···6972 "/.well-known/oauth-client-metadata",
7073 get(oauth::client_metadata::client_metadata_handler),
7174 )
7575+ .route("/.well-known/atproto-did", get(well_known::atproto_did_handler))
7276 .route("/api/auth/login", post(api::auth::login))
7377 .route("/api/import/article", post(api::importer::import_article))
7478 .nest("/api/oauth", oauth_routes)
+1
crates/server/src/pds/mod.rs
···66//! - uploadBlob - Upload media attachments
7788pub mod client;
99+pub mod publish;
910pub mod records;
+122
crates/server/src/pds/publish.rs
···11+//! Publishing service for AT Protocol PDS operations.
22+//!
33+//! Encapsulates the logic for publishing records to a user's PDS.
44+55+use crate::pds::client::{PdsClient, PdsError};
66+use crate::pds::records::{prepare_card_record, prepare_deck_record};
77+use crate::repository::oauth::{OAuthRepoError, OAuthRepository, StoredToken};
88+use malfestio_core::model::{Card, Deck};
99+use std::sync::Arc;
1010+1111+/// Error type for publishing operations.
1212+#[derive(Debug)]
1313+pub enum PublishError {
1414+ /// User has no stored OAuth tokens
1515+ NoTokens(String),
1616+ /// OAuth token retrieval failed
1717+ TokenError(String),
1818+ /// Invalid DPoP keypair
1919+ InvalidKeypair,
2020+ /// PDS operation failed
2121+ PdsError(PdsError),
2222+ /// Database error storing AT-URI
2323+ DatabaseError(String),
2424+}
2525+2626+impl std::fmt::Display for PublishError {
2727+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2828+ match self {
2929+ PublishError::NoTokens(did) => write!(f, "No OAuth tokens for DID: {}", did),
3030+ PublishError::TokenError(e) => write!(f, "Token error: {}", e),
3131+ PublishError::InvalidKeypair => write!(f, "Invalid DPoP keypair"),
3232+ PublishError::PdsError(e) => write!(f, "PDS error: {}", e),
3333+ PublishError::DatabaseError(e) => write!(f, "Database error: {}", e),
3434+ }
3535+ }
3636+}
3737+3838+impl std::error::Error for PublishError {}
3939+4040+impl From<OAuthRepoError> for PublishError {
4141+ fn from(e: OAuthRepoError) -> Self {
4242+ match e {
4343+ OAuthRepoError::NotFound(did) => PublishError::NoTokens(did),
4444+ _ => PublishError::TokenError(e.to_string()),
4545+ }
4646+ }
4747+}
4848+4949+impl From<PdsError> for PublishError {
5050+ fn from(e: PdsError) -> Self {
5151+ PublishError::PdsError(e)
5252+ }
5353+}
5454+5555+/// Result of publishing a deck to PDS.
5656+pub struct PublishDeckResult {
5757+ /// The AT-URI of the published deck
5858+ pub deck_at_uri: String,
5959+ /// The AT-URIs of the published cards
6060+ pub card_at_uris: Vec<String>,
6161+}
6262+6363+/// Publish a deck and its cards to the user's PDS.
6464+///
6565+/// This function:
6666+/// 1. Retrieves OAuth tokens for the user
6767+/// 2. Creates a PDS client
6868+/// 3. Publishes each card (with placeholder deck ref initially)
6969+/// 4. Publishes the deck with card AT-URIs
7070+///
7171+/// Note: Cards are published with an empty deck_ref since we don't have the
7272+/// deck's AT-URI yet. This is acceptable per the Lexicon - the deck holds
7373+/// the authoritative list of card references.
7474+pub async fn publish_deck_to_pds(
7575+ oauth_repo: Arc<dyn OAuthRepository>, did: &str, deck: &Deck, cards: &[Card],
7676+) -> Result<PublishDeckResult, PublishError> {
7777+ let stored_token: StoredToken = oauth_repo.get_tokens(did).await?;
7878+ let dpop_keypair = stored_token.dpop_keypair().ok_or(PublishError::InvalidKeypair)?;
7979+8080+ let pds_client = PdsClient::new(
8181+ stored_token.pds_url.clone(),
8282+ stored_token.access_token.clone(),
8383+ dpop_keypair,
8484+ );
8585+8686+ let mut card_at_uris = Vec::with_capacity(cards.len());
8787+ for card in cards {
8888+ let prepared = prepare_card_record(card, "");
8989+ let at_uri = pds_client
9090+ .put_record(did, &prepared.collection, &prepared.rkey, prepared.record)
9191+ .await?;
9292+ card_at_uris.push(at_uri.to_string());
9393+ }
9494+9595+ let prepared = prepare_deck_record(deck, card_at_uris.clone());
9696+ let deck_at_uri = pds_client
9797+ .put_record(did, &prepared.collection, &prepared.rkey, prepared.record)
9898+ .await?;
9999+100100+ Ok(PublishDeckResult { deck_at_uri: deck_at_uri.to_string(), card_at_uris })
101101+}
102102+103103+#[cfg(test)]
104104+mod tests {
105105+ use super::*;
106106+107107+ #[test]
108108+ fn test_publish_error_display() {
109109+ let err = PublishError::NoTokens("did:plc:test".to_string());
110110+ assert!(err.to_string().contains("did:plc:test"));
111111+112112+ let err = PublishError::InvalidKeypair;
113113+ assert!(err.to_string().contains("Invalid DPoP keypair"));
114114+ }
115115+116116+ #[test]
117117+ fn test_publish_error_from_oauth_error() {
118118+ let oauth_err = OAuthRepoError::NotFound("did:plc:test".to_string());
119119+ let publish_err: PublishError = oauth_err.into();
120120+ assert!(matches!(publish_err, PublishError::NoTokens(_)));
121121+ }
122122+}
···11+//! Well-known endpoints for AT Protocol.
22+//!
33+//! Provides:
44+//! - `/.well-known/atproto-did` - Returns the server's DID for domain verification
55+66+use axum::response::IntoResponse;
77+88+/// Handler for `/.well-known/atproto-did`.
99+///
1010+/// Returns the server's DID from the `ATPROTO_SERVER_DID` environment variable.
1111+/// Used for domain verification in AT Protocol.
1212+pub async fn atproto_did_handler() -> impl IntoResponse {
1313+ std::env::var("ATPROTO_SERVER_DID").unwrap_or_default()
1414+}
1515+1616+#[cfg(test)]
1717+mod tests {
1818+ use super::*;
1919+2020+ #[tokio::test]
2121+ async fn test_atproto_did_handler_empty_when_not_set() {
2222+ let original = std::env::var("ATPROTO_SERVER_DID").ok();
2323+ unsafe {
2424+ std::env::remove_var("ATPROTO_SERVER_DID");
2525+ }
2626+2727+ let result = atproto_did_handler().await.into_response();
2828+ assert_eq!(result.status(), axum::http::StatusCode::OK);
2929+3030+ if let Some(val) = original {
3131+ unsafe {
3232+ std::env::set_var("ATPROTO_SERVER_DID", val);
3333+ }
3434+ }
3535+ }
3636+}
+5-20
docs/todo.md
···3838## Roadmap Milestones
39394040- **(Done) Milestone A**: Defined core user journeys, information architecture, and privacy rules for the platform.
4141-4241- **(Done) Milestone B**: Designed AT Protocol Lexicons for all core types and documented data model mapping + publishing pipeline.
4343-4442- **(Done) Milestone C**: Foundations: Repo, CI, Axum API Skeleton, Solid Shell.
4543 - Monorepo layout, CI, Axum/Solid skeletons implemented.
4644 - Backend running on 8080, Frontend on 3000.
4747-4845- **(Done) Milestone D**: Identity + Permissions + Publishing Model.
4946 - Auth MVP, Permission model (Private/Public/SharedWith), and basic Publishing flow implemented.
5047 - Backend API and Frontend Editor updated with tests covering permissions and publishing.
4848+- **(Done) Milestone F**: OAuth + PDS Record Publishing.
4949+ - OAuth 2.1 client flow (PKCE, DPoP, handle/DID resolution, token refresh).
5050+ - PDS client for `putRecord`, `deleteRecord`, `uploadBlob`.
5151+ - TID generation and AT-URI builder in core crate.
5252+ - Database migration for token storage and AT-URI columns.
51535254### Milestone E - Content Authoring (Notes + Cards + Deck Builder)
5355···6668#### Acceptance
67696870- A creator can build a deck from an article and publish it.
6969-7070-### Milestone F - OAuth + PDS Record Publishing
7171-7272-#### Deliverables
7373-7474-- OAuth 2.1 client flow (PKCE + DPoP)
7575- - client_metadata.json endpoint
7676- - handle/DID resolution
7777- - token refresh
7878-- Record publishing to user's PDS
7979- - putRecord for decks, cards, notes
8080- - blob uploads for media attachments
8181- - AT-URI generation for cross-references
8282-8383-#### Acceptance
8484-8585-- A user can authenticate via OAuth, create a deck, and see it in their PDS repository.
86718772### Milestone G - Study Engine (SRS) + Daily Review UX
8873
+25
migrations/003_2025_12_28_firehose_cursor.sql
···11+-- Firehose cursor and indexed records for AT Protocol Jetstream consumption
22+-- Tracks cursor position for reconnection and indexes discovered records
33+44+-- Table for tracking Jetstream cursor position
55+CREATE TABLE firehose_cursors (
66+ id SERIAL PRIMARY KEY,
77+ endpoint TEXT NOT NULL UNIQUE,
88+ cursor_us BIGINT NOT NULL, -- Unix microseconds timestamp
99+ updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
1010+);
1111+1212+CREATE INDEX idx_firehose_cursors_endpoint ON firehose_cursors(endpoint);
1313+1414+CREATE TABLE indexed_records (
1515+ id SERIAL PRIMARY KEY,
1616+ at_uri TEXT NOT NULL UNIQUE,
1717+ did TEXT NOT NULL,
1818+ collection TEXT NOT NULL,
1919+ rkey TEXT NOT NULL,
2020+ indexed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
2121+);
2222+2323+CREATE INDEX idx_indexed_records_did ON indexed_records(did);
2424+CREATE INDEX idx_indexed_records_collection ON indexed_records(collection);
2525+CREATE INDEX idx_indexed_records_at_uri ON indexed_records(at_uri);