A Wrapped / Replay like for teal.fm and rocksky.app (currently on hiatus)
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

switch from jetstream to tap (prev. nexus (prev. sync-tool))

Mia 3b076610 8e5615b2

+96 -48
+3 -3
src/config.rs
··· 5 5 /// Identification for Musicbrainz - goes in user-agent header. 6 6 #[clap(env)] 7 7 pub mb_agent: String, 8 + /// Tap URL. 9 + #[clap(env)] 10 + pub tap: String, 8 11 /// Optionally, an already downloaded Musicbrainz dump (in tar.bz2 format) 9 12 #[clap(env)] 10 13 pub mb_dump: Option<String>, 11 14 /// Location for the Flashback database 12 15 #[clap(env, default_value = "/data/flashback.db")] 13 16 pub db: String, 14 - /// Jetstream URL to use for record ingest. 15 - #[clap(env, default_value = "wss://jetstream1.us-east.bsky.network")] 16 - pub jetstream: String, 17 17 /// Port for the Flashback server 18 18 #[clap(env, default_value_t = 8080)] 19 19 pub port: u16,
+21 -44
src/ingest/mod.rs
··· 1 1 use duckdb::DuckdbConnectionManager; 2 2 use futures::StreamExt; 3 3 use jacquard::StreamErrorKind; 4 - use jacquard::jetstream::{ 5 - CommitOperation, JetstreamAccount, JetstreamCommit, JetstreamIdentity, JetstreamMessage, 6 - JetstreamParams, 7 - }; 4 + use jacquard::jetstream::CommitOperation; 8 5 use jacquard::types::collection::Collection; 9 - use jacquard::types::did::Did; 10 6 use jacquard::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 11 7 use jacquard_api::app_rocksky::scrobble::Scrobble as RockskyScrobble; 12 8 use jacquard_api::fm_teal::alpha::feed::play::Play as TealAlphaPlay; ··· 14 10 use tracing::instrument; 15 11 16 12 mod scrobbles; 13 + mod tap; 17 14 18 - pub async fn jetstream( 19 - db: Arc<DuckdbConnectionManager>, 20 - jetstream: String, 21 - cursor: Option<i64>, 22 - ) -> eyre::Result<()> { 23 - let client = TungsteniteSubscriptionClient::from_base_uri(jetstream.parse()?); 15 + pub async fn tapstream(db: Arc<DuckdbConnectionManager>, tap: String) -> eyre::Result<()> { 16 + let client = TungsteniteSubscriptionClient::from_base_uri(tap.parse()?); 24 17 25 - let params = JetstreamParams::new() 26 - .maybe_cursor(cursor) 27 - .wanted_collections(vec![RockskyScrobble::nsid(), TealAlphaPlay::nsid()]) 28 - .build(); 29 - 30 - let stream = client.subscribe(&params).await?; 18 + let stream = client.subscribe(&tap::TapParams {}).await?; 31 19 32 20 let (_sink, mut messages) = stream.into_stream(); 33 21 ··· 37 25 let db = db.clone(); 38 26 while let Some(msg) = rx.blocking_recv() { 39 27 if let Err(e) = handle_message(&db, msg) { 40 - tracing::error!("Error handling jetstream message: {e:?}"); 28 + tracing::error!("Error handling tap message: {e:?}"); 41 29 } 42 30 } 43 31 }); ··· 45 33 while let Some(msg) = messages.next().await { 46 34 match msg { 47 35 Ok(msg) => match tx.send(msg).await { 48 - Ok(_) => {metrics::counter!("jetstream_events").increment(1)} 49 - Err(e) => tracing::error!("Error sending jetstream message: {:?}", e), 36 + Ok(_) => metrics::counter!("tapstream_events").increment(1), 37 + Err(e) => tracing::error!("Error sending tap message: {:?}", e), 50 38 }, 51 39 Err(e) if *e.kind() == StreamErrorKind::Closed => break, 52 40 Err(e) => { 53 - tracing::error!("Failed to read jetstream: {e}"); 41 + tracing::error!("Failed to read tap stream: {e}"); 54 42 } 55 43 } 56 44 } ··· 62 50 Ok(()) 63 51 } 64 52 65 - fn handle_message(db: &DuckdbConnectionManager, message: JetstreamMessage<'_>) -> eyre::Result<()> { 53 + fn handle_message(db: &DuckdbConnectionManager, message: tap::TapMessage<'_>) -> eyre::Result<()> { 66 54 match message { 67 - JetstreamMessage::Commit { 68 - did, 69 - commit, 70 - time_us, 71 - .. 72 - } => handle_js_commit(db, did, commit, time_us), 73 - JetstreamMessage::Identity { identity, .. } => handle_js_identity(identity), 74 - JetstreamMessage::Account { account, .. } => handle_js_account(account), 55 + tap::TapMessage::Record { id, record, .. } => handle_record(db, id, record), 56 + tap::TapMessage::User { id, user, .. } => handle_user(id, user), 75 57 } 76 58 } 77 59 78 - #[instrument(skip(db, commit), fields(nsid=commit.collection.as_str(), rkey=commit.rkey.as_str()))] 79 - fn handle_js_commit( 60 + #[instrument(skip(db, commit), fields(nsid=commit.collection.as_str(), did=commit.did.as_str(), rkey=commit.rkey.as_str()))] 61 + fn handle_record( 80 62 db: &DuckdbConnectionManager, 81 - did: Did<'_>, 82 - commit: JetstreamCommit<'_>, 83 - time_us: i64, 63 + id: i64, 64 + commit: tap::TapRecord<'_>, 84 65 ) -> eyre::Result<()> { 85 - if commit.operation == CommitOperation::Delete { 66 + if commit.action == CommitOperation::Delete { 86 67 return Ok(()); 87 68 } 88 69 89 70 let Some(data) = commit.record else { 90 - eyre::bail!("got no data for a {:?} operation", commit.operation); 71 + eyre::bail!("got no data for a {:?} operation", commit.action); 91 72 }; 92 73 93 74 match data.type_discriminator() { 94 75 Some(RockskyScrobble::NSID) => { 95 76 let scrobble = jacquard::from_data(&data)?; 96 - scrobbles::scrobble_rocksky(db, &did, &commit.rkey, scrobble)?; 77 + scrobbles::scrobble_rocksky(db, &commit.did, &commit.rkey, scrobble)?; 97 78 } 98 79 Some(TealAlphaPlay::NSID) => { 99 80 let scrobble = jacquard::from_data(&data)?; 100 - scrobbles::scrobble_teal(db, &did, &commit.rkey, scrobble)?; 81 + scrobbles::scrobble_teal(db, &commit.did, &commit.rkey, scrobble)?; 101 82 } 102 83 _ => unreachable!("only rocksky or teal (alpha) scrobbles supported"), 103 84 }; ··· 105 86 Ok(()) 106 87 } 107 88 108 - fn handle_js_identity(account: JetstreamIdentity<'_>) -> eyre::Result<()> { 109 - Ok(()) 110 - } 111 - 112 - fn handle_js_account(account: JetstreamAccount<'_>) -> eyre::Result<()> { 89 + fn handle_user(id: i64, account: tap::TapUser<'_>) -> eyre::Result<()> { 113 90 Ok(()) 114 91 }
+71
src/ingest/tap.rs
··· 1 + use jacquard::IntoStatic; 2 + use jacquard_common::jetstream::CommitOperation; 3 + use jacquard_common::types::cid::Cid; 4 + use jacquard_common::types::did::Did; 5 + use jacquard_common::types::handle::Handle; 6 + use jacquard_common::types::nsid::Nsid; 7 + use jacquard_common::types::recordkey::Rkey; 8 + use jacquard_common::xrpc::{GenericError, MessageEncoding, SubscriptionResp, XrpcSubscription}; 9 + use jacquard_common::{CowStr, Data}; 10 + use serde::{Deserialize, Serialize}; 11 + 12 + #[derive(Debug, Serialize, Deserialize, IntoStatic)] 13 + #[serde(tag = "type")] 14 + #[serde(rename_all = "lowercase")] 15 + pub enum TapMessage<'a> { 16 + Record { 17 + id: i64, 18 + #[serde(borrow)] 19 + record: TapRecord<'a>, 20 + }, 21 + User { 22 + id: i64, 23 + #[serde(borrow)] 24 + user: TapUser<'a>, 25 + }, 26 + } 27 + 28 + #[derive(Debug, Serialize, Deserialize, IntoStatic)] 29 + pub struct TapRecord<'a> { 30 + #[serde(borrow)] 31 + pub did: Did<'a>, 32 + #[serde(borrow)] 33 + pub collection: Nsid<'a>, 34 + #[serde(borrow)] 35 + pub rkey: Rkey<'a>, 36 + pub action: CommitOperation, 37 + #[serde(borrow)] 38 + pub cid: Option<Cid<'a>>, 39 + #[serde(borrow)] 40 + pub record: Option<Data<'a>>, 41 + } 42 + 43 + #[derive(Debug, Serialize, Deserialize, IntoStatic)] 44 + #[serde(rename_all = "camelCase")] 45 + pub struct TapUser<'a> { 46 + #[serde(borrow)] 47 + pub did: Did<'a>, 48 + #[serde(borrow)] 49 + pub handle: Handle<'a>, 50 + pub is_active: Option<bool>, 51 + #[serde(borrow)] 52 + pub status: CowStr<'a>, 53 + } 54 + 55 + pub struct TapStream; 56 + impl SubscriptionResp for TapStream { 57 + const NSID: &'static str = "tap"; 58 + const ENCODING: MessageEncoding = MessageEncoding::Json; 59 + type Message<'de> = TapMessage<'de>; 60 + type Error<'de> = GenericError<'de>; 61 + } 62 + 63 + #[derive(Debug, Serialize)] 64 + pub struct TapParams; 65 + 66 + impl XrpcSubscription for TapParams { 67 + const NSID: &'static str = "tap"; 68 + const ENCODING: MessageEncoding = MessageEncoding::Json; 69 + const CUSTOM_PATH: Option<&'static str> = Some("/channel"); 70 + type Stream = TapStream; 71 + }
+1 -1
src/main.rs
··· 28 28 let mut tasks = tokio::task::JoinSet::new(); 29 29 30 30 tasks.spawn(server::start_server(ddb.clone(), config.port)); 31 - tasks.spawn(ingest::jetstream(ddb.clone(), config.jetstream, None)); 31 + tasks.spawn(ingest::tapstream(ddb.clone(), config.tap)); 32 32 tasks.spawn(mbz::start_replication(ddb, config.mb_dump, config.mb_agent)); 33 33 34 34 tasks.join_all().await;