A Wrapped / Replay like for teal.fm and rocksky.app (currently on hiatus)
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

xrpc endpoints

Mia 492b5357 efb27a70

+543 -3
+27 -2
Cargo.lock
··· 1277 1277 "inventory", 1278 1278 "jacquard", 1279 1279 "jacquard-api", 1280 + "jacquard-axum", 1280 1281 "jacquard-common", 1282 + "jacquard-identity", 1281 1283 "jacquard-lexicon", 1282 1284 "r2d2", 1283 1285 "reqwest", ··· 2136 2138 ] 2137 2139 2138 2140 [[package]] 2141 + name = "jacquard-axum" 2142 + version = "0.9.2" 2143 + source = "registry+https://github.com/rust-lang/crates.io-index" 2144 + checksum = "ee1b58111f0a2a08ee18525ac661a0448fdc84d4ecb423d64aaaf88074460380" 2145 + dependencies = [ 2146 + "axum", 2147 + "bytes", 2148 + "jacquard", 2149 + "jacquard-common", 2150 + "jacquard-derive", 2151 + "jacquard-identity", 2152 + "miette", 2153 + "multibase", 2154 + "serde", 2155 + "serde_html_form", 2156 + "serde_json", 2157 + "thiserror 2.0.17", 2158 + "tokio", 2159 + "tower-http", 2160 + "tracing", 2161 + ] 2162 + 2163 + [[package]] 2139 2164 name = "jacquard-common" 2140 2165 version = "0.9.2" 2141 2166 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2193 2218 2194 2219 [[package]] 2195 2220 name = "jacquard-identity" 2196 - version = "0.9.1" 2221 + version = "0.9.2" 2197 2222 source = "registry+https://github.com/rust-lang/crates.io-index" 2198 - checksum = "d7883c20ea50ac3de477a2363d466b8024d0c9680df31f7ab50b29fedd906ee8" 2223 + checksum = "1ef714cacebfca486558a9f8e205daf466bfba0466c4d0c450fd6d0252400a53" 2199 2224 dependencies = [ 2200 2225 "bon", 2201 2226 "bytes",
+2
Cargo.toml
··· 15 15 inventory = "0.3.21" 16 16 jacquard = { version = "0.9", default-features = false , features = ["api_bluesky", "derive", "dns", "websocket"] } 17 17 jacquard-api = { version = "0.9", features = ["fm_teal", "app_rocksky"] } 18 + jacquard-axum = "0.9.2" 18 19 jacquard-common = "0.9" 20 + jacquard-identity = { version = "0.9.2", features = ["cache"] } 19 21 jacquard-lexicon = "0.9.2" 20 22 r2d2 = "0.8" 21 23 reqwest = "0.12.24"
+125
src/analysis.rs
··· 1 + use chrono::prelude::*; 2 + use duckdb::{Connection, params}; 3 + 4 + #[derive(Debug)] 5 + pub struct TopAlbumsResp { 6 + pub mbid: String, 7 + pub name: String, 8 + pub artist_credit: Option<String>, 9 + pub release_group: i64, 10 + pub count: i64, 11 + } 12 + 13 + pub fn get_top_albums( 14 + conn: &Connection, 15 + did: &str, 16 + start: DateTime<Utc>, 17 + end: DateTime<Utc>, 18 + limit: i64, 19 + ) -> duckdb::Result<Vec<TopAlbumsResp>> { 20 + let mut stmt = conn.prepare_cached( 21 + r"WITH albums AS ( 22 + SELECT release_mbid, count: count(*) FROM scrobbles 23 + WHERE did=$1 AND created_at BETWEEN $2 AND $3 24 + GROUP BY release_mbid 25 + ORDER BY count DESC LIMIT $4) 26 + SELECT release.gid, release.name, artist_credit.name, release_group, count FROM mbz.release 27 + INNER JOIN albums ON release_mbid = release.gid 28 + INNER JOIN mbz.artist_credit ON artist_credit.id = release.artist_credit 29 + ORDER BY count DESC", 30 + )?; 31 + 32 + let rows = stmt 33 + .query_map(params![did, start, end, limit], |row| { 34 + Ok(TopAlbumsResp { 35 + mbid: row.get(0)?, 36 + name: row.get(1)?, 37 + artist_credit: row.get(2)?, 38 + release_group: row.get(3)?, 39 + count: row.get(4)?, 40 + }) 41 + })? 42 + .collect::<Result<_, _>>()?; 43 + 44 + Ok(rows) 45 + } 46 + 47 + #[derive(Debug)] 48 + pub struct TopArtistResp { 49 + pub mbid: String, 50 + pub name: String, 51 + pub count: i64, 52 + } 53 + pub fn get_top_artists( 54 + conn: &Connection, 55 + did: &str, 56 + start: DateTime<Utc>, 57 + end: DateTime<Utc>, 58 + limit: i64, 59 + ) -> duckdb::Result<Vec<TopArtistResp>> { 60 + let mut stmt = conn.prepare_cached( 61 + r"WITH artists AS ( 62 + SELECT id: artist_credit_name.artist, count: count(*) FROM scrobbles 63 + INNER JOIN mbz.track ON track.gid = track_mbid 64 + INNER JOIN mbz.artist_credit_name ON artist_credit_name.artist_credit = track.artist_credit 65 + WHERE did=$1 AND created_at BETWEEN $2 AND $3 66 + GROUP BY artist_credit_name.artist 67 + ORDER BY count DESC limit $4) 68 + SELECT gid, name, count FROM mbz.artist 69 + INNER JOIN artists ON artists.id = artist.id 70 + ORDER BY count DESC" 71 + )?; 72 + 73 + let rows = stmt 74 + .query_map(params![did, start, end, limit], |row| { 75 + Ok(TopArtistResp { 76 + mbid: row.get(0)?, 77 + name: row.get(1)?, 78 + count: row.get(2)?, 79 + }) 80 + })? 81 + .collect::<Result<_, _>>()?; 82 + 83 + Ok(rows) 84 + } 85 + 86 + #[derive(Debug)] 87 + pub struct TopTracksResp { 88 + pub mbid: String, 89 + pub name: String, 90 + pub artist_credit: Option<String>, 91 + pub count: i64, 92 + } 93 + 94 + pub fn get_top_tracks( 95 + conn: &Connection, 96 + did: &str, 97 + start: DateTime<Utc>, 98 + end: DateTime<Utc>, 99 + limit: i64, 100 + ) -> duckdb::Result<Vec<TopTracksResp>> { 101 + let mut stmt = conn.prepare_cached( 102 + r"WITH tracks AS ( 103 + SELECT track_mbid, count: count(*) FROM scrobbles 104 + WHERE did=$1 AND created_at BETWEEN $2 AND $3 105 + GROUP BY track_mbid 106 + ORDER BY count DESC LIMIT $4) 107 + SELECT track.gid, track.name, artist_credit.name, count FROM mbz.track 108 + INNER JOIN tracks on track_mbid = track.gid 109 + INNER JOIN mbz.artist_credit ON artist_credit.id = track.artist_credit 110 + ORDER BY count DESC", 111 + )?; 112 + 113 + let rows = stmt 114 + .query_map(params![did, start, end, limit], |row| { 115 + Ok(TopTracksResp { 116 + mbid: row.get(0)?, 117 + name: row.get(1)?, 118 + artist_credit: row.get(2)?, 119 + count: row.get(3)?, 120 + }) 121 + })? 122 + .collect::<Result<_, _>>()?; 123 + 124 + Ok(rows) 125 + }
+2
src/main.rs
··· 2 2 use std::sync::Arc; 3 3 use r2d2::ManageConnection; 4 4 5 + mod analysis; 5 6 mod config; 6 7 mod ingest; 7 8 mod lex; 8 9 mod mbz; 9 10 mod server; 11 + mod utils; 10 12 11 13 #[tokio::main] 12 14 async fn main() -> eyre::Result<()> {
+152
src/server/actor.rs
··· 1 + use super::GlobalState; 2 + use crate::analysis; 3 + use crate::lex::queries::{ 4 + GetTopAlbumsOutput, GetTopAlbumsRequest, GetTopArtistsOutput, GetTopArtistsRequest, 5 + GetTopTracksOutput, GetTopTracksRequest, 6 + }; 7 + use crate::lex::{Album, Artist, TopAlbumEntry, TopArtistEntry, TopTrackEntry, Track}; 8 + use crate::utils::{db_call, get_public_profile, resolve_atid}; 9 + use axum::Json; 10 + use axum::extract::State; 11 + use axum::http::StatusCode; 12 + use jacquard_axum::ExtractXrpc; 13 + 14 + pub async fn get_top_albums( 15 + State(state): State<GlobalState>, 16 + ExtractXrpc(req): ExtractXrpc<GetTopAlbumsRequest>, 17 + ) -> Result<Json<GetTopAlbumsOutput<'static>>, StatusCode> { 18 + let did = resolve_atid(req.actor).await.map_err(|err| { 19 + tracing::error!("Failed to resolve handle: {err}"); 20 + StatusCode::INTERNAL_SERVER_ERROR 21 + })?; 22 + 23 + let start = req.start.as_ref().to_utc(); 24 + let end = req.end.as_ref().to_utc(); 25 + let limit = req.limit.unwrap_or(50).clamp(1, 100); 26 + 27 + let profile = get_public_profile(&did).await.map_err(|err| { 28 + tracing::error!("Failed to load profile: {err}"); 29 + StatusCode::INTERNAL_SERVER_ERROR 30 + })?; 31 + 32 + let albums = db_call(&state.db, move |conn| { 33 + analysis::get_top_albums(&conn, &did, start, end, limit) 34 + }) 35 + .await 36 + .map_err(|_err| StatusCode::INTERNAL_SERVER_ERROR)? 37 + .map_err(|err| { 38 + tracing::error!("Failed to load top albums: {err}"); 39 + StatusCode::INTERNAL_SERVER_ERROR 40 + })? 41 + .into_iter() 42 + .map(|album| TopAlbumEntry { 43 + album: Album { 44 + album_name: album.name.into(), 45 + album_mbid: Some(album.mbid.into()), 46 + album_art_uri: None, 47 + }, 48 + count: album.count, 49 + }) 50 + .collect(); 51 + 52 + Ok(Json(GetTopAlbumsOutput { 53 + start: req.start, 54 + end: req.end, 55 + profile, 56 + albums, 57 + extra_data: None, 58 + })) 59 + } 60 + 61 + pub async fn get_top_artists( 62 + State(state): State<GlobalState>, 63 + ExtractXrpc(req): ExtractXrpc<GetTopArtistsRequest>, 64 + ) -> Result<Json<GetTopArtistsOutput<'static>>, StatusCode> { 65 + let did = resolve_atid(req.actor).await.map_err(|err| { 66 + tracing::error!("Failed to resolve handle: {err}"); 67 + StatusCode::INTERNAL_SERVER_ERROR 68 + })?; 69 + 70 + let start = req.start.as_ref().to_utc(); 71 + let end = req.end.as_ref().to_utc(); 72 + let limit = req.limit.unwrap_or(50).clamp(1, 100); 73 + 74 + let profile = get_public_profile(&did).await.map_err(|err| { 75 + tracing::error!("Failed to load profile: {err}"); 76 + StatusCode::INTERNAL_SERVER_ERROR 77 + })?; 78 + 79 + let artists = db_call(&state.db, move |conn| { 80 + analysis::get_top_artists(&conn, &did, start, end, limit) 81 + }) 82 + .await 83 + .map_err(|_err| StatusCode::INTERNAL_SERVER_ERROR)? 84 + .map_err(|err| { 85 + tracing::error!("Failed to load top artists: {err}"); 86 + StatusCode::INTERNAL_SERVER_ERROR 87 + })? 88 + .into_iter() 89 + .map(|artist| TopArtistEntry { 90 + artist: Artist { 91 + artist_name: artist.name.into(), 92 + artist_mbid: Some(artist.mbid.into()), 93 + artist_art_uri: None, 94 + }, 95 + count: artist.count, 96 + }) 97 + .collect(); 98 + 99 + Ok(Json(GetTopArtistsOutput { 100 + start: req.start, 101 + end: req.end, 102 + profile, 103 + artists, 104 + extra_data: None, 105 + })) 106 + } 107 + 108 + pub async fn get_top_tracks( 109 + State(state): State<GlobalState>, 110 + ExtractXrpc(req): ExtractXrpc<GetTopTracksRequest>, 111 + ) -> Result<Json<GetTopTracksOutput<'static>>, StatusCode> { 112 + let did = resolve_atid(req.actor).await.map_err(|err| { 113 + tracing::error!("Failed to resolve handle: {err}"); 114 + StatusCode::INTERNAL_SERVER_ERROR 115 + })?; 116 + 117 + let start = req.start.as_ref().to_utc(); 118 + let end = req.end.as_ref().to_utc(); 119 + let limit = req.limit.unwrap_or(50).clamp(1, 100); 120 + 121 + let profile = get_public_profile(&did).await.map_err(|err| { 122 + tracing::error!("Failed to load profile: {err}"); 123 + StatusCode::INTERNAL_SERVER_ERROR 124 + })?; 125 + 126 + let tracks = db_call(&state.db, move |conn| { 127 + analysis::get_top_tracks(&conn, &did, start, end, limit) 128 + }) 129 + .await 130 + .map_err(|_err| StatusCode::INTERNAL_SERVER_ERROR)? 131 + .map_err(|err| { 132 + tracing::error!("Failed to load top tracks: {err}"); 133 + StatusCode::INTERNAL_SERVER_ERROR 134 + })? 135 + .into_iter() 136 + .map(|track| TopTrackEntry { 137 + track: Track { 138 + track_name: track.name.into(), 139 + track_mbid: Some(track.mbid.into()), 140 + }, 141 + count: track.count, 142 + }) 143 + .collect(); 144 + 145 + Ok(Json(GetTopTracksOutput { 146 + start: req.start, 147 + end: req.end, 148 + profile, 149 + tracks, 150 + extra_data: None, 151 + })) 152 + }
+23 -1
src/server/mod.rs
··· 1 + use crate::lex::queries; 1 2 use axum::Router; 2 3 use duckdb::DuckdbConnectionManager; 4 + use jacquard_axum::IntoRouter; 3 5 use std::net::SocketAddr; 4 6 use std::sync::Arc; 5 7 use tokio::net::TcpListener; 6 8 use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer}; 7 9 use tower_http::trace::TraceLayer; 10 + 11 + mod actor; 12 + mod summary; 8 13 9 14 #[derive(Clone)] 10 15 pub struct GlobalState { ··· 20 25 .layer(TraceLayer::new_for_http()) 21 26 .layer(cors); 22 27 23 - let app = Router::new().layer(mw).with_state(GlobalState { db }); 28 + let app = Router::new() 29 + .merge(queries::GetTopAlbumsRequest::into_router( 30 + actor::get_top_albums, 31 + )) 32 + .merge(queries::GetTopArtistsRequest::into_router( 33 + actor::get_top_artists, 34 + )) 35 + .merge(queries::GetTopTracksRequest::into_router( 36 + actor::get_top_tracks, 37 + )) 38 + .merge(queries::GetAnnualSummaryRequest::into_router( 39 + summary::get_annual_summary, 40 + )) 41 + .merge(queries::GetPeriodSummaryRequest::into_router( 42 + summary::get_period_summary, 43 + )) 44 + .layer(mw) 45 + .with_state(GlobalState { db }); 24 46 25 47 let addr = SocketAddr::from(([0, 0, 0, 0], port)); 26 48 let listener = TcpListener::bind(addr).await?;
+147
src/server/summary.rs
··· 1 + use super::GlobalState; 2 + use crate::analysis; 3 + use crate::lex::queries::{ 4 + GetAnnualSummaryOutput, GetAnnualSummaryRequest, GetPeriodSummaryOutput, 5 + GetPeriodSummaryRequest, 6 + }; 7 + use crate::lex::{Album, Artist, Summary, TopAlbumEntry, TopArtistEntry, TopTrackEntry, Track}; 8 + use crate::utils::{db_call, get_public_profile, resolve_atid}; 9 + use axum::Json; 10 + use axum::extract::State; 11 + use axum::http::StatusCode; 12 + use chrono::prelude::*; 13 + use jacquard_axum::ExtractXrpc; 14 + 15 + pub async fn get_annual_summary( 16 + State(state): State<GlobalState>, 17 + ExtractXrpc(req): ExtractXrpc<GetAnnualSummaryRequest>, 18 + ) -> Result<Json<GetAnnualSummaryOutput<'static>>, StatusCode> { 19 + let did = resolve_atid(req.actor).await.map_err(|err| { 20 + tracing::error!("Failed to resolve handle: {err}"); 21 + StatusCode::INTERNAL_SERVER_ERROR 22 + })?; 23 + let profile = get_public_profile(&did).await.map_err(|err| { 24 + tracing::error!("Failed to load profile: {err}"); 25 + StatusCode::INTERNAL_SERVER_ERROR 26 + })?; 27 + 28 + // this sucks hard, but idk if there's a nicer way of doing it 29 + let start = Utc 30 + .with_ymd_and_hms(req.year as i32, 1, 1, 0, 0, 0) 31 + .unwrap(); 32 + let end = Utc 33 + .with_ymd_and_hms(req.year as i32, 12, 31, 23, 59, 59) 34 + .unwrap(); 35 + 36 + let (albums, artists, tracks) = db_call(&state.db, move |conn| { 37 + let (albums, artists, tracks) = get_core_summary(&conn, &did, start, end)?; 38 + 39 + Ok((albums, artists, tracks)) 40 + }) 41 + .await 42 + .map_err(|_err| StatusCode::INTERNAL_SERVER_ERROR)? 43 + .map_err(|err| { 44 + tracing::error!("Failed to load summary: {err}"); 45 + StatusCode::INTERNAL_SERVER_ERROR 46 + })?; 47 + 48 + Ok(Json(GetAnnualSummaryOutput { 49 + year: req.year, 50 + profile, 51 + summary: Summary { 52 + top_artists: artists, 53 + top_albums: albums, 54 + top_tracks: tracks, 55 + }, 56 + extra_data: None, 57 + })) 58 + } 59 + 60 + pub async fn get_period_summary( 61 + State(state): State<GlobalState>, 62 + ExtractXrpc(req): ExtractXrpc<GetPeriodSummaryRequest>, 63 + ) -> Result<Json<GetPeriodSummaryOutput<'static>>, StatusCode> { 64 + let did = resolve_atid(req.actor).await.map_err(|err| { 65 + tracing::error!("Failed to resolve handle: {err}"); 66 + StatusCode::INTERNAL_SERVER_ERROR 67 + })?; 68 + 69 + let start = req.start.as_ref().to_utc(); 70 + let end = req.end.as_ref().to_utc(); 71 + 72 + let profile = get_public_profile(&did).await.map_err(|err| { 73 + tracing::error!("Failed to load profile: {err}"); 74 + StatusCode::INTERNAL_SERVER_ERROR 75 + })?; 76 + 77 + let (albums, artists, tracks) = db_call(&state.db, move |conn| { 78 + let (albums, artists, tracks) = get_core_summary(&conn, &did, start, end)?; 79 + 80 + Ok((albums, artists, tracks)) 81 + }) 82 + .await 83 + .map_err(|_err| StatusCode::INTERNAL_SERVER_ERROR)? 84 + .map_err(|err| { 85 + tracing::error!("Failed to load summary: {err}"); 86 + StatusCode::INTERNAL_SERVER_ERROR 87 + })?; 88 + 89 + Ok(Json(GetPeriodSummaryOutput { 90 + start: req.start, 91 + end: req.end, 92 + profile, 93 + summary: Summary { 94 + top_artists: artists, 95 + top_albums: albums, 96 + top_tracks: tracks, 97 + }, 98 + })) 99 + } 100 + 101 + fn get_core_summary( 102 + conn: &duckdb::Connection, 103 + did: &str, 104 + start: DateTime<Utc>, 105 + end: DateTime<Utc>, 106 + ) -> duckdb::Result<( 107 + Vec<TopAlbumEntry<'static>>, 108 + Vec<TopArtistEntry<'static>>, 109 + Vec<TopTrackEntry<'static>>, 110 + )> { 111 + let albums = analysis::get_top_albums(conn, did, start, end, 10)? 112 + .into_iter() 113 + .map(|album| TopAlbumEntry { 114 + album: Album { 115 + album_name: album.name.into(), 116 + album_mbid: Some(album.mbid.into()), 117 + album_art_uri: None, 118 + }, 119 + count: album.count, 120 + }) 121 + .collect(); 122 + 123 + let artists = analysis::get_top_artists(conn, did, start, end, 10)? 124 + .into_iter() 125 + .map(|artist| TopArtistEntry { 126 + artist: Artist { 127 + artist_name: artist.name.into(), 128 + artist_mbid: Some(artist.mbid.into()), 129 + artist_art_uri: None, 130 + }, 131 + count: artist.count, 132 + }) 133 + .collect(); 134 + 135 + let tracks = analysis::get_top_tracks(conn, did, start, end, 10)? 136 + .into_iter() 137 + .map(|track| TopTrackEntry { 138 + track: Track { 139 + track_name: track.name.into(), 140 + track_mbid: Some(track.mbid.into()), 141 + }, 142 + count: track.count, 143 + }) 144 + .collect(); 145 + 146 + Ok((albums, artists, tracks)) 147 + }
+65
src/utils.rs
··· 1 + use duckdb::{Connection, DuckdbConnectionManager}; 2 + use jacquard_api::app_bsky::actor::ProfileViewBasic; 3 + use jacquard_api::app_bsky::actor::get_profile::GetProfile; 4 + use jacquard_common::types::did::Did; 5 + use jacquard_common::types::ident::AtIdentifier; 6 + use jacquard_common::xrpc::XrpcExt; 7 + use jacquard_identity::JacquardResolver; 8 + use jacquard_identity::resolver::{HandleStep, IdentityResolver, PlcSource, ResolverOptions}; 9 + use r2d2::ManageConnection; 10 + use std::sync::LazyLock; 11 + use tokio::task::{JoinHandle, spawn_blocking}; 12 + 13 + const BSKY_PUBLIC_API: &str = "https://public.api.bsky.app"; 14 + static CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new); 15 + static RESOLVER: LazyLock<JacquardResolver> = LazyLock::new(|| { 16 + JacquardResolver::new( 17 + CLIENT.clone(), 18 + ResolverOptions { 19 + plc_source: PlcSource::slingshot_default(), 20 + handle_order: vec![HandleStep::PdsResolveHandle], 21 + ..Default::default() 22 + }, 23 + ) 24 + }); 25 + 26 + pub fn db_call<F, R>(db: &DuckdbConnectionManager, func: F) -> JoinHandle<duckdb::Result<R>> 27 + where 28 + F: Fn(Connection) -> duckdb::Result<R> + Send + Sync + 'static, 29 + R: Send + Sync + 'static, 30 + { 31 + let conn = db.connect(); 32 + spawn_blocking(move || func(conn?)) 33 + } 34 + 35 + pub async fn get_public_profile(did: &Did<'_>) -> eyre::Result<ProfileViewBasic<'static>> { 36 + let resp = CLIENT 37 + .xrpc(BSKY_PUBLIC_API.parse()?) 38 + .send(&GetProfile::new().actor(AtIdentifier::raw(did)).build()) 39 + .await?; 40 + 41 + let out = resp.into_output()?; 42 + 43 + Ok(ProfileViewBasic { 44 + associated: out.value.associated, 45 + avatar: out.value.avatar, 46 + created_at: out.value.created_at, 47 + debug: out.value.debug, 48 + did: out.value.did, 49 + display_name: out.value.display_name, 50 + handle: out.value.handle, 51 + labels: out.value.labels, 52 + pronouns: out.value.pronouns, 53 + status: out.value.status, 54 + verification: out.value.verification, 55 + viewer: out.value.viewer, 56 + extra_data: out.value.extra_data, 57 + }) 58 + } 59 + 60 + pub async fn resolve_atid(atid: AtIdentifier<'_>) -> eyre::Result<Did<'_>> { 61 + match atid { 62 + AtIdentifier::Did(did) => Ok(did), 63 + AtIdentifier::Handle(handle) => Ok(RESOLVER.resolve_handle(&handle).await?), 64 + } 65 + }