ceres: a small planet in a giant solar system
33
fork

Configure Feed

Select the types of activity you want to include in your feed.

get profile wip

+114 -32
+20 -1
src/main.rs
··· 1 1 use std::env; 2 2 use std::net::SocketAddr; 3 3 use std::path::Path; 4 + use std::sync::Arc; 4 5 6 + use axum::http::{HeaderValue, header::USER_AGENT}; 5 7 use env_logger::Env; 8 + use jacquard::client::credential_session::CredentialSession; 9 + use jacquard::client::{Agent, MemorySessionStore}; 10 + use jacquard::xrpc::CallOptions; 6 11 use jacquard::{identity::resolver::ResolverOptions, prelude::JacquardResolver, types::did::Did}; 7 12 use jacquard_axum::service_auth::ServiceAuthConfig; 8 13 use log::{info, warn}; ··· 40 45 .build() 41 46 .map_err(|e| Error::Other(format!("reqwest build: {e}")))?; 42 47 48 + let store = MemorySessionStore::default(); 49 + 43 50 let resolver = JacquardResolver::new(reqwest_client.clone(), ResolverOptions::default()); 51 + let session = CredentialSession::new(Arc::new(store), Arc::new(resolver.clone())).with_options( 52 + CallOptions { 53 + auth: None, 54 + atproto_proxy: None, 55 + atproto_accept_labelers: None, 56 + extra_headers: vec![(USER_AGENT, HeaderValue::from_static(APP_USER_AGENT))], 57 + }, 58 + ); 59 + let agent = Arc::new(Agent::new(session)); 44 60 45 61 let app_view_domain = env::var("APP_VIEW_DOMAIN").expect("APP_VIEW_DOMAIN is not set"); 46 62 let service_did = Did::new_owned(format!("did:web:{app_view_domain}")) ··· 53 69 let state = AppState { 54 70 service_auth, 55 71 reqwest_client, 72 + agent, 56 73 resolver, 57 74 forwarded_app_view: env::var("FORWARDED_APP_VIEW").ok(), 58 75 db: db.clone(), ··· 66 83 let db = db.clone(); 67 84 let token = token.clone(); 68 85 tasks.spawn(async move { 69 - sync::firehose::Subscriber::new(fh_host, db).run(token).await 86 + sync::firehose::Subscriber::new(fh_host, db) 87 + .run(token) 88 + .await 70 89 }); 71 90 } else { 72 91 info!("firehose: CERES_FIREHOSE_HOST unset; not spawning subscriber");
+84 -27
src/server/xrpc/app_bsky_actor.rs
··· 1 - use crate::server::xrpc::XrpcErrorResponse; 1 + use std::str::FromStr; 2 + use std::time::SystemTime; 3 + 2 4 use crate::state::AppState; 3 5 use crate::storage; 6 + use crate::{server::xrpc::XrpcErrorResponse, sync::backfill::BackfillJob}; 7 + use axum::Form; 4 8 use axum::{Json, Router, extract::State}; 9 + use jacquard::client::AgentSessionExt; 10 + use jacquard::types::aturi::AtUri; 5 11 use jacquard::{ 6 12 IntoStatic, 7 13 prelude::IdentityResolver, ··· 11 17 PreferencesItem, ProfileViewDetailed, 12 18 get_preferences::{GetPreferencesOutput, GetPreferencesRequest}, 13 19 get_profile::{GetProfileOutput, GetProfileRequest}, 20 + profile::Profile, 14 21 put_preferences::PutPreferencesRequest, 15 22 }; 16 23 use jacquard_axum::{ExtractXrpc, IntoRouter, service_auth::ExtractOptionalServiceAuth}; ··· 49 56 XrpcErrorResponse::internal_server_error() 50 57 })?; 51 58 52 - info!("{:?}", auth); 59 + let pds_url = did_doc.pds_endpoint().ok_or_else(|| { 60 + log::error!("No PDS URL found in DID document"); 61 + XrpcErrorResponse::internal_server_error() 62 + })?; 63 + 64 + let at_uri = AtUri::from_str(format!("at://{did}/app.bsky.actor.profile/self").as_str()) 65 + .map_err(|err| { 66 + log::error!("{err}"); 67 + XrpcErrorResponse::internal_server_error() 68 + })?; 69 + let result = state 70 + .agent 71 + .fetch_record_slingshot(&at_uri) 72 + .await 73 + .map_err(|err| { 74 + log::error!("{err}"); 75 + XrpcErrorResponse::internal_server_error() 76 + })?; 77 + let profile_record: Profile<'static> = jacquard::common::from_data_owned(result.value) 78 + .map_err(|err| { 79 + log::error!("parse profile record: {err}"); 80 + XrpcErrorResponse::internal_server_error() 81 + })?; 82 + 83 + // if let Some(auth) = auth { 84 + // storage::backfill_queue::enqueue( 85 + // &state.db, 86 + // SystemTime::now(), 87 + // &BackfillJob { 88 + // did: auth.did().to_string(), 89 + // pds_host: "idk".to_string(), 90 + // cursor: None, 91 + // retry_count: 0, 92 + // reason: "".to_string(), 93 + // }, 94 + // ) 95 + // .map_err(|err| { 96 + // log::error!("{err}"); 97 + // XrpcErrorResponse::internal_server_error() 98 + // })?; 99 + // } 100 + 101 + // info!("{:?}", auth); 53 102 let profile = GetProfileOutput { 54 103 value: ProfileViewDetailed { 55 104 associated: None, 56 - avatar: Some( 57 - UriValue::new_owned( 58 - "https://cdn.bsky.app/img/avatar/plain/did:plc:exampleuser/avatar@jpeg", 59 - ) 60 - .map_err(|_| XrpcErrorResponse::internal_server_error())?, 61 - ), 62 - banner: Some( 63 - UriValue::new_owned( 64 - "https://cdn.bsky.app/img/banner/plain/did:plc:exampleuser/banner@jpeg", 65 - ) 66 - .map_err(|_| XrpcErrorResponse::internal_server_error())?, 67 - ), 68 - created_at: Some(Datetime::raw_str("2024-01-01T00:00:00.000Z")), 105 + avatar: profile_record 106 + .avatar 107 + .map(|blob_ref| { 108 + UriValue::new_owned(format!( 109 + "{pds_url}/xrpc/com.atproto.sync.getBlob?did={did}&cid={cid}", 110 + cid = blob_ref.blob().cid() 111 + )) 112 + }) 113 + .transpose() 114 + .map_err(|err| { 115 + log::error!("avatar uri: {err}"); 116 + XrpcErrorResponse::internal_server_error() 117 + })?, 118 + banner: profile_record 119 + .banner 120 + .map(|blob_ref| { 121 + UriValue::new_owned(format!( 122 + "{pds_url}/xrpc/com.atproto.sync.getBlob?did={did}&cid={cid}", 123 + cid = blob_ref.blob().cid() 124 + )) 125 + }) 126 + .transpose() 127 + .map_err(|err| { 128 + log::error!("banner uri: {err}"); 129 + XrpcErrorResponse::internal_server_error() 130 + })?, 131 + created_at: profile_record.created_at, 69 132 debug: None, 70 - description: Some( 71 - "An example profile served from the ceres reconnaissance server.".into(), 72 - ), 73 - did: Did::new_static("did:plc:z72i7hdynmk6r22z27h6tvur") 74 - .map_err(|_| XrpcErrorResponse::internal_server_error())?, 75 - display_name: Some("Example User".into()), 133 + description: profile_record.description, 134 + did: did, 135 + display_name: profile_record.display_name, 76 136 followers_count: Some(1234), 77 137 follows_count: Some(56), 78 138 handle: handle, 79 139 indexed_at: Some(Datetime::now()), 80 140 joined_via_starter_pack: None, 81 141 labels: None, 82 - pinned_post: None, 142 + pinned_post: profile_record.pinned_post, 83 143 posts_count: Some(42), 84 - pronouns: Some("they/them".into()), 144 + pronouns: profile_record.pronouns, 85 145 status: None, 86 146 verification: None, 87 147 viewer: None, 88 - website: Some( 89 - UriValue::new_owned("https://example.com") 90 - .map_err(|_| XrpcErrorResponse::internal_server_error())?, 91 - ), 148 + website: profile_record.website, 92 149 extra_data: Default::default(), 93 150 }, 94 151 extra_data: Default::default(),
+7
src/state.rs
··· 1 + use std::sync::Arc; 2 + 3 + use jacquard::client::credential_session::SessionKey; 4 + use jacquard::client::{AtpSession, CredentialAgent, MemorySessionStore}; 1 5 use jacquard::{prelude::JacquardResolver, types::did::Did}; 2 6 use jacquard_axum::service_auth::{ServiceAuth, ServiceAuthConfig}; 3 7 4 8 use crate::storage::DbRef; 5 9 10 + pub type AppAgent = CredentialAgent<MemorySessionStore<SessionKey, AtpSession>, JacquardResolver>; 11 + 6 12 #[derive(Clone)] 7 13 pub struct AppState { 8 14 pub service_auth: ServiceAuthConfig<JacquardResolver>, 9 15 pub reqwest_client: reqwest::Client, 16 + pub agent: Arc<AppAgent>, 10 17 pub resolver: JacquardResolver, 11 18 pub forwarded_app_view: Option<String>, 12 19 pub db: DbRef,
+3 -4
src/sync/firehose/mod.rs
··· 171 171 let Some(seq) = seq else { return }; 172 172 let db = db.clone(); 173 173 let host = host.to_string(); 174 - let res = tokio::task::spawn_blocking(move || { 175 - storage::firehose_cursor::set(&db, &host, seq as u64) 176 - }) 177 - .await; 174 + let res = 175 + tokio::task::spawn_blocking(move || storage::firehose_cursor::set(&db, &host, seq as u64)) 176 + .await; 178 177 match res { 179 178 Ok(Ok(())) => debug!("firehose: cursor saved seq={seq}"), 180 179 Ok(Err(e)) => warn!("firehose: cursor save failed: {e}"),