A Wrapped / Replay like for teal.fm and rocksky.app (currently on hiatus)
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

split jetstream consumer up

we really need to run the database bit in a blocking thread until duckdb gets async.
paves the way for write batching (which we need for perf reasons)

Mia 5643fce5 c4566e1e

+70 -35
+44 -13
src/ingest/mod.rs
··· 1 - use std::sync::Arc; 2 1 use duckdb::DuckdbConnectionManager; 3 2 use futures::StreamExt; 3 + use jacquard::StreamErrorKind; 4 4 use jacquard::jetstream::{ 5 5 CommitOperation, JetstreamAccount, JetstreamCommit, JetstreamIdentity, JetstreamMessage, 6 6 JetstreamParams, ··· 10 10 use jacquard::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 11 11 use jacquard_api::app_rocksky::scrobble::Scrobble as RockskyScrobble; 12 12 use jacquard_api::fm_teal::alpha::feed::play::Play as TealAlphaPlay; 13 + use std::sync::Arc; 13 14 use tracing::instrument; 14 15 15 16 mod scrobbles; 16 17 17 - pub async fn jetstream(db: Arc<DuckdbConnectionManager>, jetstream: String, cursor: Option<i64>) -> eyre::Result<()> { 18 + pub async fn jetstream( 19 + db: Arc<DuckdbConnectionManager>, 20 + jetstream: String, 21 + cursor: Option<i64>, 22 + ) -> eyre::Result<()> { 18 23 let client = TungsteniteSubscriptionClient::from_base_uri(jetstream.parse()?); 19 24 20 25 let params = JetstreamParams::new() ··· 26 31 27 32 let (_sink, mut messages) = stream.into_stream(); 28 33 34 + let (tx, mut rx) = tokio::sync::mpsc::channel(64); 35 + 36 + let blocking_handle = tokio::task::spawn_blocking(move || { 37 + let db = db.clone(); 38 + while let Some(msg) = rx.blocking_recv() { 39 + if let Err(e) = handle_message(&db, msg) { 40 + tracing::error!("Error handling jetstream message: {e:?}"); 41 + } 42 + } 43 + }); 44 + 29 45 while let Some(msg) = messages.next().await { 30 46 match msg { 31 - Ok(msg) => match handle_message(&db, msg).await { 47 + Ok(msg) => match tx.send(msg).await { 32 48 Ok(_) => {} 33 - Err(e) => println!("Error handling jetstream message: {:?}", e), 49 + Err(e) => tracing::error!("Error sending jetstream message: {:?}", e), 34 50 }, 51 + Err(e) if *e.kind() == StreamErrorKind::Closed => break, 35 52 Err(e) => { 36 53 tracing::error!("Failed to read jetstream: {e}"); 37 54 } 38 55 } 39 56 } 57 + 58 + // when the jetstream exits, drop tx and 59 + drop(tx); 60 + blocking_handle.await?; 40 61 41 62 Ok(()) 42 63 } 43 64 44 - async fn handle_message(db: &DuckdbConnectionManager, message: JetstreamMessage<'_>) -> eyre::Result<()> { 65 + fn handle_message(db: &DuckdbConnectionManager, message: JetstreamMessage<'_>) -> eyre::Result<()> { 45 66 match message { 46 - JetstreamMessage::Commit { did, commit, time_us, .. } => handle_js_commit(db, did, commit, time_us).await, 47 - JetstreamMessage::Identity { identity, .. } => handle_js_identity(identity).await, 48 - JetstreamMessage::Account { account, .. } => handle_js_account(account).await, 67 + JetstreamMessage::Commit { 68 + did, 69 + commit, 70 + time_us, 71 + .. 72 + } => handle_js_commit(db, did, commit, time_us), 73 + JetstreamMessage::Identity { identity, .. } => handle_js_identity(identity), 74 + JetstreamMessage::Account { account, .. } => handle_js_account(account), 49 75 } 50 76 } 51 77 52 78 #[instrument(skip(db, commit), fields(nsid=commit.collection.as_str(), rkey=commit.rkey.as_str()))] 53 - async fn handle_js_commit(db: &DuckdbConnectionManager, did: Did<'_>, commit: JetstreamCommit<'_>, time_us: i64) -> eyre::Result<()> { 79 + fn handle_js_commit( 80 + db: &DuckdbConnectionManager, 81 + did: Did<'_>, 82 + commit: JetstreamCommit<'_>, 83 + time_us: i64, 84 + ) -> eyre::Result<()> { 54 85 if commit.operation == CommitOperation::Delete { 55 86 return Ok(()); 56 87 } ··· 62 93 match data.type_discriminator() { 63 94 Some(RockskyScrobble::NSID) => { 64 95 let scrobble = jacquard::from_data(&data)?; 65 - scrobbles::scrobble_rocksky(db, &did, &commit.rkey, scrobble).await?; 96 + scrobbles::scrobble_rocksky(db, &did, &commit.rkey, scrobble)?; 66 97 } 67 98 Some(TealAlphaPlay::NSID) => { 68 99 let scrobble = jacquard::from_data(&data)?; 69 - scrobbles::scrobble_teal(db, &did, &commit.rkey, scrobble).await?; 100 + scrobbles::scrobble_teal(db, &did, &commit.rkey, scrobble)?; 70 101 } 71 102 _ => unreachable!("only rocksky or teal (alpha) scrobbles supported"), 72 103 }; ··· 74 105 Ok(()) 75 106 } 76 107 77 - async fn handle_js_identity(account: JetstreamIdentity<'_>) -> eyre::Result<()> { 108 + fn handle_js_identity(account: JetstreamIdentity<'_>) -> eyre::Result<()> { 78 109 Ok(()) 79 110 } 80 111 81 - async fn handle_js_account(account: JetstreamAccount<'_>) -> eyre::Result<()> { 112 + fn handle_js_account(account: JetstreamAccount<'_>) -> eyre::Result<()> { 82 113 Ok(()) 83 114 }
+26 -22
src/ingest/scrobbles.rs
··· 6 6 use jacquard_api::fm_teal::alpha::feed::play::Play as TealAlphaPlay; 7 7 use r2d2::ManageConnection; 8 8 9 - pub async fn scrobble_teal<'a>( 9 + pub fn scrobble_teal<'a>( 10 10 db: &DuckdbConnectionManager, 11 11 did: &str, 12 12 rkey: &str, 13 13 scrobble: TealAlphaPlay<'a>, 14 14 ) -> eyre::Result<()> { 15 15 let conn = db.connect()?; 16 - let created = scrobble.played_time.unwrap_or(Datetime::now()); 16 + let created = scrobble.played_time.clone().unwrap_or(Datetime::now()); 17 17 let created = created.as_ref().to_utc(); 18 18 19 19 if let Some(rkey) = check_duplicate_scrobble(&conn, did, &scrobble.track_name, created)? { ··· 35 35 artist_name: artist, 36 36 track_discrim: scrobble.track_discriminant.as_deref(), 37 37 }; 38 - resolve_and_insert_scrobble(&conn, did, rkey, &scrobble.track_name, find, created) 38 + resolve_and_insert_scrobble(&conn, did, rkey, &scrobble.track_name, find, created)?; 39 + 40 + Ok(()) 39 41 } 40 42 41 - pub async fn scrobble_rocksky<'a>( 43 + pub fn scrobble_rocksky<'a>( 42 44 db: &DuckdbConnectionManager, 43 45 did: &str, 44 46 rkey: &str, ··· 59 61 artist_name: Some(&scrobble.artist), 60 62 ..Default::default() 61 63 }; 62 - resolve_and_insert_scrobble(&conn, did, rkey, &scrobble.title, find, created) 64 + resolve_and_insert_scrobble(&conn, did, rkey, &scrobble.title, find, created)?; 65 + 66 + Ok(()) 63 67 } 64 68 65 69 fn resolve_and_insert_scrobble( ··· 67 71 did: &str, 68 72 rkey: &str, 69 73 track: &str, 70 - find: FindMbzData, 74 + find: FindMbzData<'_>, 71 75 created: DateTime<Utc>, 72 - ) -> eyre::Result<()> { 76 + ) -> duckdb::Result<()> { 73 77 if let Some(data) = try_find_mbz_data(conn, track, &find)? { 74 78 conn.execute( 75 79 r"INSERT INTO scrobbles (did, rkey, track_name, track_mbid, release_name, 76 - release_mbid, release_group_name, release_group_mbid, artists, created_at, debug) 77 - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING", 80 + release_mbid, release_group_name, release_group_mbid, artists, created_at, debug) 81 + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING", 78 82 params![ 79 83 did, 80 84 rkey, ··· 92 96 } else { 93 97 // oh dear, oh dear, oh dear - run with the data from the search... 94 98 conn.execute( 95 - r"INSERT INTO scrobbles (did, rkey, track_name, release_name, release_mbid, artists, created_at) 96 - VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING", 97 - params![ 98 - did, 99 - rkey, 100 - track, 101 - find.release_name, 102 - find.release_mbid, 103 - find.artist_name, 104 - created, 105 - ], 106 - )?; 99 + r"INSERT INTO scrobbles (did, rkey, track_name, release_name, release_mbid, artists, created_at) 100 + VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING", 101 + params![ 102 + did, 103 + rkey, 104 + track, 105 + find.release_name, 106 + find.release_mbid, 107 + find.artist_name, 108 + created, 109 + ], 110 + )?; 107 111 } 108 112 109 113 Ok(()) ··· 114 118 did: &str, 115 119 track: &str, 116 120 created: DateTime<Utc>, 117 - ) -> eyre::Result<Option<String>> { 121 + ) -> duckdb::Result<Option<String>> { 118 122 let mut stmt = conn.prepare_cached( 119 123 "SELECT rkey FROM scrobbles WHERE did = $1 AND track_name = $2 AND created_at = $3", 120 124 )?;