A Wrapped/Replay-like app for teal.fm and rocksky.app (currently on hiatus)
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

support importing the other dumps

Mia 12545ef5 da7acdd3

+170 -70
-3
src/config.rs
··· 8 8 /// Tap URL. 9 9 #[clap(env)] 10 10 pub tap: String, 11 - /// Optionally, an already downloaded Musicbrainz dump (in tar.bz2 format) 12 - #[clap(env)] 13 - pub mb_dump: Option<String>, 14 11 /// Location for the Flashback database 15 12 #[clap(env, default_value = "/data/flashback.db")] 16 13 pub db: String,
+4 -2
src/main.rs
··· 25 25 26 26 let ddb = Arc::new(ddb); 27 27 28 + let replication_agent = mbz::ReplicationAgent::new(ddb.clone(), config.mb_agent); 29 + 28 30 let mut tasks = tokio::task::JoinSet::new(); 29 31 30 32 tasks.spawn(server::start_server(ddb.clone(), config.port)); 31 - tasks.spawn(ingest::tapstream(ddb.clone(), config.tap)); 32 - tasks.spawn(mbz::start_replication(ddb, config.mb_dump, config.mb_agent)); 33 + tasks.spawn(ingest::tapstream(ddb, config.tap)); 34 + tasks.spawn(replication_agent.start()); 33 35 34 36 tasks.join_all().await; 35 37
+46
src/mbz/init.sql
··· 1 1 install fts; 2 2 load fts; 3 3 4 + begin; 4 5 create schema mbz; 5 6 6 7 -- TODO MAYBE: l_*_* link tables? ··· 78 79 created timestamptz not null 79 80 ); 80 81 82 + create table mbz.artist_tag 83 + ( 84 + artist bigint not null, 85 + tag bigint not null, 86 + count int, 87 + last_update timestamptz, 81 88 89 + primary key (artist, tag) 90 + ); 82 91 83 92 create table mbz.genre 84 93 ( ··· 173 182 created timestamptz not null 174 183 ); 175 184 185 + create table mbz.recording_tag 186 + ( 187 + recording bigint not null, 188 + tag bigint not null, 189 + count int, 190 + last_updated timestamptz, 191 + 192 + primary key (recording, tag) 193 + ); 194 + 176 195 create table mbz.release 177 196 ( 178 197 id bigint primary key, ··· 255 274 gid text primary key, 256 275 new_id bigint not null, 257 276 created timestamptz not null 277 + ); 278 + 279 + create table mbz.release_group_tag 280 + ( 281 + release_group bigint not null, 282 + tag bigint not null, 283 + count int, 284 + last_update timestamptz, 285 + 286 + primary key (release_group, tag) 287 + ); 288 + 289 + create table mbz.release_tag 290 + ( 291 + release bigint not null, 292 + tag bigint not null, 293 + count int, 294 + last_update timestamptz, 295 + 296 + primary key (release, tag) 297 + ); 298 + 299 + create table mbz.tag 300 + ( 301 + id bigint primary key, 302 + name text not null, 303 + ref_count bigint, 258 304 ); 259 305 260 306 create table mbz.track
+1 -1
src/mbz/mod.rs
··· 2 2 3 3 mod replica; 4 4 5 - pub use replica::start_replication; 5 + pub use replica::ReplicationAgent; 6 6 7 7 #[derive(Debug, Default)] 8 8 pub struct FindMbzData<'a> {
+119 -64
src/mbz/replica.rs
··· 3 3 use futures::{StreamExt, TryStreamExt}; 4 4 use r2d2::ManageConnection; 5 5 use reqwest::Client; 6 - use std::path::{Path, PathBuf}; 6 + use std::path::Path; 7 7 use std::str::FromStr; 8 8 use std::sync::Arc; 9 - use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, BufReader}; 9 + use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt}; 10 10 use tokio_util::compat::FuturesAsyncReadCompatExt; 11 11 use tracing::{debug, info}; 12 12 13 13 const CUR_SEQ_SCHEMA: u32 = 30; 14 14 const MBZ_API_BASE: &str = "https://metabrainz.org/api/musicbrainz"; 15 15 const MBZ_FTP_DUMP: &str = "https://ftp.musicbrainz.org/pub/musicbrainz/data/fullexport"; 16 - const WANTED_TABLES: &[&str] = &[ 16 + const WANTED_TABLES_PRIMARY: &[&str] = &[ 17 17 "artist", 18 18 "artist_alias", 19 19 "artist_credit", ··· 35 35 "track", 36 36 "track_gid_redirect", 37 37 ]; 38 + const WANTED_TABLES_DERIVED: &[&str] = &[ 39 + "artist_tag", 40 + "recording_tag", 41 + "release_tag", 42 + "release_group_tag", 43 + "tag", 44 + ]; 38 45 39 - fn check_mbz_schema(db: &DuckdbConnectionManager) -> eyre::Result<bool> { 40 - let conn = db.connect()?; 41 - let maybe_schema = conn 42 - .query_row( 43 - "SELECT schema_name FROM duckdb_schemas() WHERE schema_name='mbz'", 44 - params![], 45 - |row| row.get::<_, String>(0), 46 - ) 47 - .optional()?; 46 + #[derive(Debug, Default)] 47 + #[allow(unused)] 48 + enum MusicbrainzDump { 49 + #[default] 50 + Primary, 51 + CdStubs, 52 + CoverArtArchive, 53 + Derived, 54 + Documentation, 55 + Edit, 56 + Editor, 57 + EventArtArchive, 58 + Stats, 59 + WikiDocs, 60 + } 48 61 49 - Ok(maybe_schema.is_some()) 62 + impl std::fmt::Display for MusicbrainzDump { 63 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 64 + match self { 65 + MusicbrainzDump::Primary => write!(f, "mbdump"), 66 + MusicbrainzDump::CdStubs => write!(f, "mbdump-cdstubs"), 67 + MusicbrainzDump::CoverArtArchive => write!(f, "mbdump-cover-art-archive"), 68 + MusicbrainzDump::Derived => 
write!(f, "mbdump-derived"), 69 + MusicbrainzDump::Documentation => write!(f, "mbdump-documentation"), 70 + MusicbrainzDump::Edit => write!(f, "mbdump-edit"), 71 + MusicbrainzDump::Editor => write!(f, "mbdump-editor"), 72 + MusicbrainzDump::EventArtArchive => write!(f, "mbdump-event-art-archive"), 73 + MusicbrainzDump::Stats => write!(f, "mbdump-stats"), 74 + MusicbrainzDump::WikiDocs => write!(f, "mbdump-wikidocs"), 75 + } 76 + } 50 77 } 51 78 52 - pub async fn start_replication( 79 + pub struct ReplicationAgent { 53 80 db: Arc<DuckdbConnectionManager>, 54 - dump: Option<String>, 55 - agent: String, 56 - ) -> eyre::Result<()> { 57 - let client = Client::builder().user_agent(agent).build()?; 81 + client: Client, 82 + } 83 + 84 + impl ReplicationAgent { 85 + pub fn new(db: Arc<DuckdbConnectionManager>, agent: String) -> Self { 86 + let client = Client::builder().user_agent(agent).build().unwrap(); 58 87 59 - if !check_mbz_schema(&db)? { 60 - replica_init(db.clone(), &client, dump).await?; 61 - } else { 62 - debug!("skipping initial load - schema mbz exists"); 88 + ReplicationAgent { db, client } 63 89 } 64 90 65 - // TODO: start hourly replication syncs 91 + pub async fn start(self) -> eyre::Result<()> { 92 + if !check_mbz_schema(&self.db)? 
{ 93 + self.init().await?; 94 + } else { 95 + debug!("skipping initial load - schema mbz exists"); 96 + } 97 + 98 + // TODO: start hourly replication syncs 99 + 100 + Ok(()) 101 + } 102 + 103 + /// Prepares the replica for the first time 104 + async fn init(&self) -> eyre::Result<()> { 105 + // create a temp folder to use for downloading and extraction 106 + let tmp = tempfile::tempdir()?; 107 + let dir = tmp.path(); 108 + 109 + debug!("creating musicbrainz tables"); 110 + self.connect_and_run_sql_batch(include_str!("init.sql"))?; 111 + 112 + self.init_dump(MusicbrainzDump::Primary, dir).await?; 113 + self.init_dump(MusicbrainzDump::Derived, dir).await?; 66 114 67 - Ok(()) 68 - } 115 + debug!("musicbrainz dump imported successfully, building FTS"); 116 + self.connect_and_run_sql_batch(include_str!("init_fts.sql"))?; 117 + debug!("finished building FTS"); 69 118 70 - /// Prepares the replica for the first time 71 - async fn replica_init( 72 - db: Arc<DuckdbConnectionManager>, 73 - client: &Client, 74 - dump: Option<String>, 75 - ) -> eyre::Result<()> { 76 - // create a temp folder to use for downloading and extraction 77 - let tmp = tempfile::tempdir()?; 78 - let dir = tmp.path(); 119 + tmp.close()?; 79 120 80 - match dump { 81 - Some(dump) => { 82 - info!("skipping dump download as path provided"); 83 - let compressed = tokio::fs::File::open(&dump).await?; 84 - let compressed = BufReader::new(compressed); 85 - replica_extract(compressed, dir.to_path_buf()).await?; 86 - } 87 - None => { 88 - let compressed = replica_get_dump(client).await?; 89 - replica_extract(compressed, dir.to_path_buf()).await?; 90 - } 121 + info!("musicbrainz import complete!"); 122 + 123 + Ok(()) 124 + } 125 + 126 + async fn init_dump(&self, dump: MusicbrainzDump, dir: &Path) -> eyre::Result<()> { 127 + let wanted = match dump { 128 + MusicbrainzDump::Primary => WANTED_TABLES_PRIMARY, 129 + MusicbrainzDump::Derived => WANTED_TABLES_DERIVED, 130 + _ => eyre::bail!("tried to load a dump with no 
defined tables!"), 131 + }; 132 + 133 + let compressed = replica_get_dump(&self.client, dump).await?; 134 + replica_extract(compressed, wanted, dir).await?; 135 + replica_import_dump(&self.db, wanted, dir).await 91 136 } 92 137 93 - replica_import_dump(db, dir).await?; 138 + fn connect_and_run_sql_batch(&self, batch: &str) -> duckdb::Result<()> { 139 + self.db.connect()?.execute_batch(batch) 140 + } 141 + } 94 142 95 - tmp.close()?; 143 + fn check_mbz_schema(db: &DuckdbConnectionManager) -> eyre::Result<bool> { 144 + let conn = db.connect()?; 145 + let maybe_schema = conn 146 + .query_row( 147 + "SELECT schema_name FROM duckdb_schemas() WHERE schema_name='mbz'", 148 + params![], 149 + |row| row.get::<_, String>(0), 150 + ) 151 + .optional()?; 96 152 97 - Ok(()) 153 + Ok(maybe_schema.is_some()) 98 154 } 99 155 100 156 /// Downloads the full Musicbrainz dump file 101 - async fn replica_get_dump(client: &Client) -> eyre::Result<impl AsyncBufRead> { 157 + async fn replica_get_dump( 158 + client: &Client, 159 + dump: MusicbrainzDump, 160 + ) -> eyre::Result<impl AsyncBufRead> { 102 161 let latest = client 103 162 .get(format!("{MBZ_FTP_DUMP}/LATEST")) 104 163 .send() ··· 106 165 .error_for_status()? 107 166 .text() 108 167 .await?; 168 + let latest = latest.trim(); 109 169 110 - info!("downloading musicbrainz dump {latest}"); 170 + info!("downloading musicbrainz dump {dump}: {latest}"); 111 171 let dump_res = client 112 - .get(format!("{MBZ_FTP_DUMP}/{latest}/mbdump.tar.bz2")) 172 + .get(format!("{MBZ_FTP_DUMP}/{latest}/{dump}.tar.bz2")) 113 173 .send() 114 174 .await? 
115 175 .error_for_status()?; ··· 122 182 Ok(dump) 123 183 } 124 184 125 - async fn replica_extract<T>(compressed: T, tmpdir: PathBuf) -> eyre::Result<u32> 185 + async fn replica_extract<T>(compressed: T, wanted: &[&str], tmpdir: &Path) -> eyre::Result<u32> 126 186 where 127 187 T: AsyncRead + AsyncBufRead + Unpin, 128 188 { ··· 153 213 eyre::bail!("Current schema is not 30 - check if Flashback needs an update!"); 154 214 } 155 215 } else if let Ok(path) = path.strip_prefix("mbdump") 156 - && WANTED_TABLES.contains(&path.to_str().unwrap()) 216 + && wanted.contains(&path.to_str().unwrap()) 157 217 { 158 218 debug!("unpacking {} ({} bytes)", path.display(), hdr.size()?); 159 219 entry.unpack_in(&tmpdir).await?; ··· 164 224 } 165 225 166 226 /// Imports the downloaded dump into duckdb 167 - async fn replica_import_dump(db: Arc<DuckdbConnectionManager>, dump: &Path) -> eyre::Result<()> { 227 + async fn replica_import_dump( 228 + db: &DuckdbConnectionManager, 229 + wanted: &[&str], 230 + dump: &Path, 231 + ) -> eyre::Result<()> { 168 232 let db = db.connect()?; 169 233 let dump = dump.join("mbdump"); 170 234 171 - debug!("creating musicbrainz tables"); 172 - db.execute_batch(include_str!("init.sql"))?; 173 - 174 - for table in WANTED_TABLES { 235 + for table in wanted { 175 236 debug!("importing dump - mbz.{table}"); 176 237 db.execute( 177 238 &format!("COPY mbz.{table} FROM ? (NULLSTR '\\N', QUOTE '')"), 178 239 params![dump.join(table).to_string_lossy()], 179 240 )?; 180 241 } 181 - 182 - info!("musicbrainz dump imported successfully, building FTS"); 183 - 184 - db.execute_batch(include_str!("init_fts.sql"))?; 185 - 186 - info!("finished building FTS"); 187 242 188 243 Ok(()) 189 244 }