A Wrapped/Replay-style recap for teal.fm and rocksky.app (currently on hiatus)
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Ingest scrobbles

Still need to do dedup and batching.

Mia 05ed06cb 59a87bd8

+147 -12
+15 -11
src/ingest/mod.rs
··· 1 + use std::sync::Arc; 2 + use duckdb::DuckdbConnectionManager; 1 3 use futures::StreamExt; 2 4 use jacquard::jetstream::{ 3 5 CommitOperation, JetstreamAccount, JetstreamCommit, JetstreamIdentity, JetstreamMessage, ··· 10 12 use jacquard_api::fm_teal::alpha::feed::play::Play as TealAlphaPlay; 11 13 use tracing::instrument; 12 14 13 - pub async fn jetstream(jetstream: String, cursor: Option<i64>) -> eyre::Result<()> { 15 + mod scrobbles; 16 + 17 + pub async fn jetstream(db: Arc<DuckdbConnectionManager>, jetstream: String, cursor: Option<i64>) -> eyre::Result<()> { 14 18 let client = TungsteniteSubscriptionClient::from_base_uri(jetstream.parse()?); 15 19 16 20 let params = JetstreamParams::new() ··· 24 28 25 29 while let Some(msg) = messages.next().await { 26 30 match msg { 27 - Ok(msg) => match handle_message(msg).await { 31 + Ok(msg) => match handle_message(&db, msg).await { 28 32 Ok(_) => {} 29 33 Err(e) => println!("Error handling jetstream message: {:?}", e), 30 34 }, ··· 37 41 Ok(()) 38 42 } 39 43 40 - async fn handle_message(message: JetstreamMessage<'_>) -> eyre::Result<()> { 44 + async fn handle_message(db: &DuckdbConnectionManager, message: JetstreamMessage<'_>) -> eyre::Result<()> { 41 45 match message { 42 - JetstreamMessage::Commit { did, commit, .. } => handle_js_commit(did, commit).await, 46 + JetstreamMessage::Commit { did, commit, time_us, .. } => handle_js_commit(db, did, commit, time_us).await, 43 47 JetstreamMessage::Identity { identity, .. } => handle_js_identity(identity).await, 44 48 JetstreamMessage::Account { account, .. 
} => handle_js_account(account).await, 45 49 } 46 50 } 47 51 48 - #[instrument(skip(commit), fields(nsid=commit.collection.as_str(), rkey=commit.rkey.as_str()))] 49 - async fn handle_js_commit(did: Did<'_>, commit: JetstreamCommit<'_>) -> eyre::Result<()> { 52 + #[instrument(skip(db, commit), fields(nsid=commit.collection.as_str(), rkey=commit.rkey.as_str()))] 53 + async fn handle_js_commit(db: &DuckdbConnectionManager, did: Did<'_>, commit: JetstreamCommit<'_>, time_us: i64) -> eyre::Result<()> { 50 54 if commit.operation == CommitOperation::Delete { 51 55 return Ok(()); 52 56 } ··· 57 61 58 62 match data.type_discriminator() { 59 63 Some(RockskyScrobble::NSID) => { 60 - let scrobble: RockskyScrobble = jacquard::from_data(&data)?; 61 - dbg!(&scrobble); 64 + let scrobble = jacquard::from_data(&data)?; 65 + scrobbles::scrobble_rocksky(db, &did, &commit.rkey, scrobble).await?; 62 66 } 63 67 Some(TealAlphaPlay::NSID) => { 64 - let scrobble: TealAlphaPlay = jacquard::from_data(&data)?; 65 - dbg!(&scrobble); 68 + let scrobble = jacquard::from_data(&data)?; 69 + scrobbles::scrobble_teal(db, &did, &commit.rkey, scrobble).await?; 66 70 } 67 71 _ => unreachable!("only rocksky or teal (alpha) scrobbles supported"), 68 - } 72 + }; 69 73 70 74 Ok(()) 71 75 }
+106
src/ingest/scrobbles.rs
··· 1 + use crate::mbz::{FindMbzData, try_find_mbz_data}; 2 + use chrono::prelude::*; 3 + use duckdb::{Connection, DuckdbConnectionManager, params}; 4 + use jacquard::types::datetime::Datetime; 5 + use jacquard_api::app_rocksky::scrobble::Scrobble as RockskyScrobble; 6 + use jacquard_api::fm_teal::alpha::feed::play::Play as TealAlphaPlay; 7 + use r2d2::ManageConnection; 8 + 9 + pub async fn scrobble_teal<'a>( 10 + db: &DuckdbConnectionManager, 11 + did: &str, 12 + rkey: &str, 13 + scrobble: TealAlphaPlay<'a>, 14 + ) -> eyre::Result<()> { 15 + let conn = db.connect()?; 16 + let created = scrobble.played_time.unwrap_or(Datetime::now()); 17 + let created = created.as_ref().to_utc(); 18 + 19 + // TODO: check track name and time (and artist?) to see if there's already been a scrobble. 20 + // this is notably an issue with the dual rocksky-teal records. 21 + 22 + let artist = scrobble 23 + .artists 24 + .as_ref() 25 + .and_then(|artists| artists.first()) 26 + .map(|v| v.artist_name.as_str()); 27 + 28 + let find = FindMbzData { 29 + release_name: scrobble.release_name.as_deref(), 30 + release_mbid: scrobble.release_mb_id.as_deref(), 31 + release_discrim: scrobble.release_discriminant.as_deref(), 32 + recording_mbid: scrobble.recording_mb_id.as_deref(), 33 + artist_name: artist, 34 + track_discrim: scrobble.track_discriminant.as_deref(), 35 + }; 36 + resolve_and_insert_scrobble(&conn, did, rkey, &scrobble.track_name, find, created) 37 + } 38 + 39 + pub async fn scrobble_rocksky<'a>( 40 + db: &DuckdbConnectionManager, 41 + did: &str, 42 + rkey: &str, 43 + scrobble: RockskyScrobble<'a>, 44 + ) -> eyre::Result<()> { 45 + let conn = db.connect()?; 46 + let created = scrobble.created_at.as_ref().to_utc(); 47 + 48 + // TODO: check track name and time (and artist?) to see if there's already been a scrobble. 49 + // this is notably an issue with the dual rocksky-teal records. 
50 + 51 + let find = FindMbzData { 52 + release_name: Some(&scrobble.album), 53 + release_mbid: None, 54 + recording_mbid: None, 55 + artist_name: Some(&scrobble.artist), 56 + ..Default::default() 57 + }; 58 + resolve_and_insert_scrobble(&conn, did, rkey, &scrobble.title, find, created) 59 + } 60 + 61 + fn resolve_and_insert_scrobble( 62 + conn: &Connection, 63 + did: &str, 64 + rkey: &str, 65 + track: &str, 66 + find: FindMbzData, 67 + created: DateTime<Utc>, 68 + ) -> eyre::Result<()> { 69 + if let Some(data) = try_find_mbz_data(conn, track, &find)? { 70 + conn.execute( 71 + r"INSERT INTO scrobbles (did, rkey, track_name, track_mbid, release_name, 72 + release_mbid, release_group_name, release_group_mbid, artists, created_at, debug) 73 + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING", 74 + params![ 75 + did, 76 + rkey, 77 + track, 78 + data.track_gid, 79 + data.release, 80 + data.release_gid, 81 + data.release_group, 82 + data.release_group_gid, 83 + data.artists, 84 + created, 85 + data.debug, 86 + ], 87 + )?; 88 + } else { 89 + // oh dear, oh dear, oh dear - run with the data from the search... 90 + conn.execute( 91 + r"INSERT INTO scrobbles (did, rkey, track_name, release_name, release_mbid, artists, created_at) 92 + VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING", 93 + params![ 94 + did, 95 + rkey, 96 + track, 97 + find.release_name, 98 + find.release_mbid, 99 + find.artist_name, 100 + created, 101 + ], 102 + )?; 103 + } 104 + 105 + Ok(()) 106 + }
+20
src/init.sql
-- One row per ingested scrobble record; (did, rkey) is the primary key.
CREATE TABLE IF NOT EXISTS scrobbles
(
    did                TEXT        NOT NULL,
    rkey               TEXT        NOT NULL,

    -- Track / release details; everything except the track name may be NULL.
    track_name         TEXT        NOT NULL,
    track_mbid         TEXT,
    release_name       TEXT,
    release_mbid       TEXT,
    release_group_name TEXT,
    release_group_mbid TEXT,
    artists            TEXT,

    debug              TEXT,

    -- created_at is supplied by the inserter; indexed_at defaults to insert time.
    created_at         TIMESTAMPTZ NOT NULL,
    indexed_at         TIMESTAMPTZ NOT NULL DEFAULT now(),

    PRIMARY KEY (did, rkey)
);
+6 -1
src/main.rs
··· 1 1 use clap::Parser; 2 2 use std::sync::Arc; 3 + use r2d2::ManageConnection; 3 4 4 5 mod config; 5 6 mod ingest; ··· 12 13 let config = config::Config::parse(); 13 14 14 15 let ddb = duckdb::DuckdbConnectionManager::file(config.db)?; 16 + { 17 + ddb.connect()?.execute_batch(include_str!("init.sql"))?; 18 + } 19 + 15 20 let ddb = Arc::new(ddb); 16 21 17 22 let mut tasks = tokio::task::JoinSet::new(); 18 23 19 - tasks.spawn(ingest::jetstream(config.jetstream, None)); 24 + tasks.spawn(ingest::jetstream(ddb.clone(), config.jetstream, None)); 20 25 tasks.spawn(mbz::start_replication(ddb, config.mb_dump, config.mb_agent)); 21 26 22 27 tasks.join_all().await;