A Wrapped/Replay-like experience for teal.fm and rocksky.app (currently on hiatus)
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

musicbrainz mirror to duckdb

Mia ddda7139 6b669ba3

+653 -6
+168 -1
Cargo.lock
··· 291 291 ] 292 292 293 293 [[package]] 294 + name = "astral-tokio-tar" 295 + version = "0.5.6" 296 + source = "registry+https://github.com/rust-lang/crates.io-index" 297 + checksum = "ec179a06c1769b1e42e1e2cbe74c7dcdb3d6383c838454d063eaac5bbb7ebbe5" 298 + dependencies = [ 299 + "filetime", 300 + "futures-core", 301 + "libc", 302 + "portable-atomic", 303 + "rustc-hash", 304 + "tokio", 305 + "tokio-stream", 306 + "xattr", 307 + ] 308 + 309 + [[package]] 294 310 name = "async-compression" 295 311 version = "0.4.32" 296 312 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 570 586 ] 571 587 572 588 [[package]] 589 + name = "bzip2" 590 + version = "0.6.1" 591 + source = "registry+https://github.com/rust-lang/crates.io-index" 592 + checksum = "f3a53fac24f34a81bc9954b5d6cfce0c21e18ec6959f44f56e8e90e4bb7c346c" 593 + dependencies = [ 594 + "libbz2-rs-sys", 595 + ] 596 + 597 + [[package]] 573 598 name = "camino" 574 599 version = "1.2.1" 575 600 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 766 791 source = "registry+https://github.com/rust-lang/crates.io-index" 767 792 checksum = "ef8a506ec4b81c460798f572caead636d57d3d7e940f998160f52bd254bf2d23" 768 793 dependencies = [ 794 + "bzip2", 769 795 "compression-core", 770 796 "flate2", 771 797 "memchr", ··· 1240 1266 name = "flashback" 1241 1267 version = "0.1.0" 1242 1268 dependencies = [ 1269 + "astral-tokio-tar", 1270 + "async-compression", 1243 1271 "axum", 1244 1272 "chrono", 1245 1273 "clap", ··· 1249 1277 "jacquard", 1250 1278 "jacquard-api", 1251 1279 "r2d2", 1280 + "reqwest", 1281 + "tempfile", 1252 1282 "tokio", 1283 + "tokio-util", 1253 1284 "tower-http", 1254 1285 "tracing", 1255 1286 "tracing-subscriber", ··· 1278 1309 checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" 1279 1310 1280 1311 [[package]] 1312 + name = "foreign-types" 1313 + version = "0.3.2" 1314 + source = "registry+https://github.com/rust-lang/crates.io-index" 1315 + checksum = 
"f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" 1316 + dependencies = [ 1317 + "foreign-types-shared", 1318 + ] 1319 + 1320 + [[package]] 1321 + name = "foreign-types-shared" 1322 + version = "0.1.1" 1323 + source = "registry+https://github.com/rust-lang/crates.io-index" 1324 + checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" 1325 + 1326 + [[package]] 1281 1327 name = "form_urlencoded" 1282 1328 version = "1.2.2" 1283 1329 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1732 1778 "tokio-rustls", 1733 1779 "tower-service", 1734 1780 "webpki-roots", 1781 + ] 1782 + 1783 + [[package]] 1784 + name = "hyper-tls" 1785 + version = "0.6.0" 1786 + source = "registry+https://github.com/rust-lang/crates.io-index" 1787 + checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" 1788 + dependencies = [ 1789 + "bytes", 1790 + "http-body-util", 1791 + "hyper", 1792 + "hyper-util", 1793 + "native-tls", 1794 + "tokio", 1795 + "tokio-native-tls", 1796 + "tower-service", 1735 1797 ] 1736 1798 1737 1799 [[package]] ··· 2324 2386 ] 2325 2387 2326 2388 [[package]] 2389 + name = "libbz2-rs-sys" 2390 + version = "0.2.2" 2391 + source = "registry+https://github.com/rust-lang/crates.io-index" 2392 + checksum = "2c4a545a15244c7d945065b5d392b2d2d7f21526fba56ce51467b06ed445e8f7" 2393 + 2394 + [[package]] 2327 2395 name = "libc" 2328 2396 version = "0.2.177" 2329 2397 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2598 2666 "wasm-bindgen", 2599 2667 "wasm-bindgen-futures", 2600 2668 "web-time", 2669 + ] 2670 + 2671 + [[package]] 2672 + name = "native-tls" 2673 + version = "0.2.14" 2674 + source = "registry+https://github.com/rust-lang/crates.io-index" 2675 + checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" 2676 + dependencies = [ 2677 + "libc", 2678 + "log", 2679 + "openssl", 2680 + "openssl-probe", 2681 + "openssl-sys", 2682 + "schannel", 2683 + 
"security-framework 2.11.1", 2684 + "security-framework-sys", 2685 + "tempfile", 2601 2686 ] 2602 2687 2603 2688 [[package]] ··· 2734 2819 checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" 2735 2820 2736 2821 [[package]] 2822 + name = "openssl" 2823 + version = "0.10.74" 2824 + source = "registry+https://github.com/rust-lang/crates.io-index" 2825 + checksum = "24ad14dd45412269e1a30f52ad8f0664f0f4f4a89ee8fe28c3b3527021ebb654" 2826 + dependencies = [ 2827 + "bitflags", 2828 + "cfg-if", 2829 + "foreign-types", 2830 + "libc", 2831 + "once_cell", 2832 + "openssl-macros", 2833 + "openssl-sys", 2834 + ] 2835 + 2836 + [[package]] 2837 + name = "openssl-macros" 2838 + version = "0.1.1" 2839 + source = "registry+https://github.com/rust-lang/crates.io-index" 2840 + checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" 2841 + dependencies = [ 2842 + "proc-macro2", 2843 + "quote", 2844 + "syn 2.0.108", 2845 + ] 2846 + 2847 + [[package]] 2737 2848 name = "openssl-probe" 2738 2849 version = "0.1.6" 2739 2850 source = "registry+https://github.com/rust-lang/crates.io-index" 2740 2851 checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" 2852 + 2853 + [[package]] 2854 + name = "openssl-sys" 2855 + version = "0.9.110" 2856 + source = "registry+https://github.com/rust-lang/crates.io-index" 2857 + checksum = "0a9f0075ba3c21b09f8e8b2026584b1d18d49388648f2fbbf3c97ea8deced8e2" 2858 + dependencies = [ 2859 + "cc", 2860 + "libc", 2861 + "pkg-config", 2862 + "vcpkg", 2863 + ] 2741 2864 2742 2865 [[package]] 2743 2866 name = "ouroboros" ··· 2927 3050 checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" 2928 3051 2929 3052 [[package]] 3053 + name = "portable-atomic" 3054 + version = "1.11.1" 3055 + source = "registry+https://github.com/rust-lang/crates.io-index" 3056 + checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" 3057 + 3058 + [[package]] 2930 3059 name = 
"potential_utf" 2931 3060 version = "0.1.4" 2932 3061 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3298 3427 "http-body-util", 3299 3428 "hyper", 3300 3429 "hyper-rustls", 3430 + "hyper-tls", 3301 3431 "hyper-util", 3302 3432 "js-sys", 3303 3433 "log", 3304 3434 "mime", 3435 + "native-tls", 3305 3436 "percent-encoding", 3306 3437 "pin-project-lite", 3307 3438 "quinn", ··· 3312 3443 "serde_urlencoded", 3313 3444 "sync_wrapper", 3314 3445 "tokio", 3446 + "tokio-native-tls", 3315 3447 "tokio-rustls", 3316 3448 "tokio-util", 3317 3449 "tower", ··· 3462 3594 "openssl-probe", 3463 3595 "rustls-pki-types", 3464 3596 "schannel", 3465 - "security-framework", 3597 + "security-framework 3.5.1", 3466 3598 ] 3467 3599 3468 3600 [[package]] ··· 3579 3711 "pkcs8", 3580 3712 "subtle", 3581 3713 "zeroize", 3714 + ] 3715 + 3716 + [[package]] 3717 + name = "security-framework" 3718 + version = "2.11.1" 3719 + source = "registry+https://github.com/rust-lang/crates.io-index" 3720 + checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" 3721 + dependencies = [ 3722 + "bitflags", 3723 + "core-foundation 0.9.4", 3724 + "core-foundation-sys", 3725 + "libc", 3726 + "security-framework-sys", 3582 3727 ] 3583 3728 3584 3729 [[package]] ··· 4278 4423 ] 4279 4424 4280 4425 [[package]] 4426 + name = "tokio-native-tls" 4427 + version = "0.3.1" 4428 + source = "registry+https://github.com/rust-lang/crates.io-index" 4429 + checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" 4430 + dependencies = [ 4431 + "native-tls", 4432 + "tokio", 4433 + ] 4434 + 4435 + [[package]] 4281 4436 name = "tokio-rustls" 4282 4437 version = "0.26.4" 4283 4438 source = "registry+https://github.com/rust-lang/crates.io-index" 4284 4439 checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" 4285 4440 dependencies = [ 4286 4441 "rustls", 4442 + "tokio", 4443 + ] 4444 + 4445 + [[package]] 4446 + name = "tokio-stream" 4447 + version 
= "0.1.17" 4448 + source = "registry+https://github.com/rust-lang/crates.io-index" 4449 + checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" 4450 + dependencies = [ 4451 + "futures-core", 4452 + "pin-project-lite", 4287 4453 "tokio", 4288 4454 ] 4289 4455 ··· 4330 4496 dependencies = [ 4331 4497 "bytes", 4332 4498 "futures-core", 4499 + "futures-io", 4333 4500 "futures-sink", 4334 4501 "futures-util", 4335 4502 "pin-project-lite",
+5
Cargo.toml
··· 4 4 edition = "2024" 5 5 6 6 [dependencies] 7 + astral-tokio-tar = "0.5.6" 8 + async-compression = { version = "0.4.32", features = ["bzip2", "tokio"] } 7 9 axum = { version = "0.8", features = ["json"] } 8 10 chrono = { version = "0.4", features = ["serde"] } 9 11 clap = { version = "4.5", features = ["derive", "env"] } ··· 13 15 jacquard = { version = "0.9", default-features = false , features = ["api_bluesky", "derive", "dns", "websocket"] } 14 16 jacquard-api = { version = "0.9", features = ["fm_teal", "app_rocksky"] } 15 17 r2d2 = "0.8" 18 + reqwest = "0.12.24" 19 + tempfile = "3.23.0" 16 20 tokio = { version = "1.42", features = ["full"] } 21 + tokio-util = { version = "0.7.17", features = ["compat"] } 17 22 tower-http = { version = "0.6", features = ["cors", "trace"] } 18 23 tracing = "0.1" 19 24 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+5 -2
src/config.rs
··· 2 2 3 3 #[derive(Debug, Parser)] 4 4 pub struct Config { 5 - /// URI of the Postgres instance containing musicbrainz data 5 + /// Identification for Musicbrainz - goes in user-agent header. 6 + #[clap(env)] 7 + pub mb_agent: String, 8 + /// Optionally, an already downloaded Musicbrainz dump (in tar.bz2 format) 6 9 #[clap(env)] 7 - pub mb_pg_uri: String, 10 + pub mb_dump: Option<String>, 8 11 /// Location for the Flashback database 9 12 #[clap(env, default_value = "/data/flashback.db")] 10 13 pub db: String,
+9 -3
src/main.rs
··· 1 1 use clap::Parser; 2 + use std::sync::Arc; 2 3 3 4 mod config; 4 5 mod ingest; 6 + mod mbz; 5 7 6 8 #[tokio::main] 7 9 async fn main() -> eyre::Result<()> { ··· 10 12 let config = config::Config::parse(); 11 13 12 14 let ddb = duckdb::DuckdbConnectionManager::file(config.db)?; 13 - 14 - let jetstream_handle = tokio::spawn(ingest::jetstream(config.jetstream, None)); 15 + let ddb = Arc::new(ddb); 15 16 16 - jetstream_handle.await??; 17 + let mut tasks = tokio::task::JoinSet::new(); 18 + 19 + tasks.spawn(ingest::jetstream(config.jetstream, None)); 20 + tasks.spawn(mbz::start_replication(ddb, config.mb_dump, config.mb_agent)); 21 + 22 + tasks.join_all().await; 17 23 18 24 Ok(()) 19 25 }
+280
src/mbz/init.sql
-- Schema for the local MusicBrainz mirror.
--
-- Column order in every table matters: the dump files are loaded with a
-- positional COPY, so columns must stay in the same order as the dump.
--
-- The whole script runs inside one explicit transaction so the `mbz` schema
-- either appears complete or not at all. The trailing `commit;` needs this
-- matching `begin`; without it the commit has no active transaction to close.
begin transaction;

create schema mbz;

-- TODO MAYBE: l_*_* link tables?
-- TODO MAYBE: label, label_alias, label_gid_redirect, release_label ??

create table mbz.artist
(
    id bigint primary key,
    gid uuid not null,
    name text not null,
    sort_name text not null,
    begin_date_year int,
    begin_date_month int,
    begin_date_day int,
    end_date_year int,
    end_date_month int,
    end_date_day int,
    type int,
    area int,
    gender int,
    comment text,
    edits_pending int,
    last_update timestamptz,
    ended bool not null,
    begin_area int,
    end_area int,
);

create table mbz.artist_alias
(
    id bigint primary key,
    artist bigint not null,
    name text not null,
    locale text,
    edits_pending int,
    last_update timestamptz,
    type int,
    sort_name text not null,
    begin_date_year int,
    begin_date_month int,
    begin_date_day int,
    end_date_year int,
    end_date_month int,
    end_date_day int,
    primary_for_locale bool not null,
    ended bool not null,
);

create table mbz.artist_credit
(
    id bigint primary key,
    name text not null,
    artist_count int not null,
    ref_count int not null,
    created timestamptz not null,
    edits_pending int,
    gid uuid not null,
);

create table mbz.artist_credit_name
(
    artist_credit bigint not null,
    position int not null,
    artist bigint not null,
    name text not null,
    join_phrase text,

    primary key (artist_credit, position)
);

create table mbz.artist_gid_redirect
(
    gid text primary key,
    new_id bigint not null,
    created timestamptz not null
);



create table mbz.genre
(
    id bigint primary key,
    gid uuid not null,
    name text not null,
    comment text,
    edits_pending int,
    last_update timestamptz
);

create table mbz.genre_alias
(
    id bigint primary key,
    genre bigint not null,
    name text not null,
    locale text,
    edits_pending int,
    last_update timestamptz,
    type int,
    sort_name text not null,
    begin_date_year int,
    begin_date_month int,
    begin_date_day int,
    end_date_year int,
    end_date_month int,
    end_date_day int,
    primary_for_locale bool not null,
    ended bool not null,
);

create table mbz.isrc
(
    id bigint primary key,
    recording bigint not null,
    isrc text not null,
    source int,
    edits_pending int,
    created timestamptz
);

create table mbz.medium
(
    id bigint primary key,
    release bigint not null,
    position int not null,
    format int,
    name text,
    edits_pending int,
    last_update timestamptz,
    track_count int,
    gid uuid not null,
);

create table mbz.recording
(
    id bigint primary key,
    gid uuid not null,
    name text not null,
    artist_credit bigint not null,
    length int,
    comment text,
    edits_pending int,
    last_update timestamptz,
    video bool not null,
);

create table mbz.recording_alias
(
    id bigint primary key,
    recording bigint not null,
    name text not null,
    locale text,
    edits_pending int,
    last_update timestamptz,
    type int,
    sort_name text not null,
    begin_date_year int,
    begin_date_month int,
    begin_date_day int,
    end_date_year int,
    end_date_month int,
    end_date_day int,
    primary_for_locale bool not null,
    ended bool not null,
);

create table mbz.recording_gid_redirect
(
    gid text primary key,
    new_id bigint not null,
    created timestamptz not null
);

create table mbz.release
(
    id bigint primary key,
    gid uuid not null,
    name text not null,
    artist_credit bigint not null,
    release_group bigint not null,
    status int,
    packaging int,
    language int,
    script int,
    barcode text,
    comment text,
    edits_pending int,
    quality int,
    last_update timestamptz
);

create table mbz.release_alias
(
    id bigint primary key,
    release bigint not null,
    name text not null,
    locale text,
    edits_pending int,
    last_update timestamptz,
    type int,
    sort_name text not null,
    begin_date_year int,
    begin_date_month int,
    begin_date_day int,
    end_date_year int,
    end_date_month int,
    end_date_day int,
    primary_for_locale bool not null,
    ended bool not null,
);

create table mbz.release_gid_redirect
(
    gid text primary key,
    new_id bigint not null,
    created timestamptz not null
);

create table mbz.release_group
(
    id bigint primary key,
    gid uuid not null,
    name text not null,
    artist_credit bigint not null,
    type int,
    comment text,
    edits_pending int,
    last_update timestamptz
);

create table mbz.release_group_alias
(
    id bigint primary key,
    release_group bigint not null,
    name text not null,
    locale text,
    edits_pending int,
    last_update timestamptz,
    type int,
    sort_name text not null,
    begin_date_year int,
    begin_date_month int,
    begin_date_day int,
    end_date_year int,
    end_date_month int,
    end_date_day int,
    primary_for_locale bool not null,
    ended bool not null,
);

create table mbz.release_group_gid_redirect
(
    gid text primary key,
    new_id bigint not null,
    created timestamptz not null
);

create table mbz.track
(
    id bigint primary key,
    gid uuid not null,
    recording bigint not null,
    medium bigint not null,
    position int,
    number text,
    name text not null,
    artist_credit bigint not null,
    length int,
    edits_pending int,
    last_updated timestamptz,
    is_data_track bool not null

);

create table mbz.track_gid_redirect
(
    gid text primary key,
    new_id bigint not null,
    created timestamptz not null
);

-- Closes the transaction opened at the top of this script.
commit;
+3
src/mbz/mod.rs
··· 1 + mod replica; 2 + 3 + pub use replica::start_replication;
+183
src/mbz/replica.rs
··· 1 + use async_compression::tokio::bufread::BzDecoder; 2 + use duckdb::{DuckdbConnectionManager, params, OptionalExt}; 3 + use futures::{StreamExt, TryStreamExt}; 4 + use r2d2::ManageConnection; 5 + use reqwest::Client; 6 + use std::path::{Path, PathBuf}; 7 + use std::str::FromStr; 8 + use std::sync::Arc; 9 + use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, BufReader}; 10 + use tokio_util::compat::FuturesAsyncReadCompatExt; 11 + use tracing::{debug, info}; 12 + 13 + const CUR_SEQ_SCHEMA: u32 = 30; 14 + const MBZ_API_BASE: &str = "https://metabrainz.org/api/musicbrainz"; 15 + const MBZ_FTP_DUMP: &str = "https://ftp.musicbrainz.org/pub/musicbrainz/data/fullexport"; 16 + const WANTED_TABLES: &[&str] = &[ 17 + "artist", 18 + "artist_alias", 19 + "artist_credit", 20 + "artist_credit_name", 21 + "artist_gid_redirect", 22 + "genre", 23 + "genre_alias", 24 + "isrc", 25 + "medium", 26 + "recording", 27 + "recording_alias", 28 + "recording_gid_redirect", 29 + "release", 30 + "release_alias", 31 + "release_gid_redirect", 32 + "release_group", 33 + "release_group_alias", 34 + "release_group_gid_redirect", 35 + "track", 36 + "track_gid_redirect", 37 + ]; 38 + 39 + fn check_mbz_schema(db: &DuckdbConnectionManager) -> eyre::Result<bool> { 40 + let conn = db.connect()?; 41 + let maybe_schema = conn.query_row( 42 + "SELECT schema_name FROM duckdb_schemas() WHERE schema_name='mbz'", 43 + params![], 44 + |row| {row.get::<_, String>(0)}, 45 + ).optional()?; 46 + 47 + Ok(maybe_schema.is_some()) 48 + } 49 + 50 + pub async fn start_replication( 51 + db: Arc<DuckdbConnectionManager>, 52 + dump: Option<String>, 53 + agent: String, 54 + ) -> eyre::Result<()> { 55 + let client = Client::builder().user_agent(agent).build()?; 56 + 57 + if !check_mbz_schema(&db)? 
{ 58 + replica_init(db.clone(), &client, dump).await?; 59 + } else { 60 + debug!("skipping initial load - schema mbz exists"); 61 + } 62 + 63 + // TODO: start hourly replication syncs 64 + 65 + Ok(()) 66 + } 67 + 68 + /// Prepares the replica for the first time 69 + async fn replica_init( 70 + db: Arc<DuckdbConnectionManager>, 71 + client: &Client, 72 + dump: Option<String>, 73 + ) -> eyre::Result<()> { 74 + // create a temp folder to use for downloading and extraction 75 + let tmp = tempfile::tempdir()?; 76 + let dir = tmp.path(); 77 + 78 + match dump { 79 + Some(dump) => { 80 + info!("skipping dump download as path provided"); 81 + let compressed = tokio::fs::File::open(&dump).await?; 82 + let compressed = BufReader::new(compressed); 83 + replica_extract(compressed, dir.to_path_buf()).await?; 84 + } 85 + None => { 86 + let compressed = replica_get_dump(client).await?; 87 + replica_extract(compressed, dir.to_path_buf()).await?; 88 + } 89 + } 90 + 91 + replica_import_dump(db, &dir).await?; 92 + 93 + tmp.close()?; 94 + 95 + Ok(()) 96 + } 97 + 98 + /// Downloads the full Musicbrainz dump file 99 + async fn replica_get_dump(client: &Client) -> eyre::Result<impl AsyncRead + AsyncBufRead> { 100 + let latest = client 101 + .get(format!("{MBZ_FTP_DUMP}/LATEST")) 102 + .send() 103 + .await? 104 + .error_for_status()? 105 + .text() 106 + .await?; 107 + 108 + info!("downloading musicbrainz dump {latest}"); 109 + let dump_res = client 110 + .get(format!("{MBZ_FTP_DUMP}/{latest}/mbdump.tar.bz2")) 111 + .send() 112 + .await? 
113 + .error_for_status()?; 114 + let dump_stream = dump_res.bytes_stream(); 115 + let dump = dump_stream 116 + .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e)) 117 + .into_async_read() 118 + .compat(); 119 + 120 + Ok(dump) 121 + } 122 + 123 + async fn replica_extract<T>(compressed: T, tmpdir: PathBuf) -> eyre::Result<u32> 124 + where 125 + T: AsyncRead + AsyncBufRead + Unpin, 126 + { 127 + debug!("extracting musicbrainz dump to {}", tmpdir.display()); 128 + 129 + let bzd = BzDecoder::new(compressed); 130 + let mut seq_replication = 0; 131 + 132 + let mut archive = tokio_tar::Archive::new(bzd); 133 + let mut entries = archive.entries()?; 134 + 135 + while let Some(entry) = entries.next().await { 136 + let mut entry = entry?; 137 + let hdr = entry.header(); 138 + let path = hdr.path()?; 139 + 140 + // check for some files we need 141 + if path.as_os_str() == "REPLICATION_SEQUENCE" { 142 + let mut out = String::new(); 143 + entry.read_to_string(&mut out).await?; 144 + seq_replication = u32::from_str(out.trim())?; 145 + 146 + debug!("got replication sequence {seq_replication}"); 147 + } else if path.as_os_str() == "SCHEMA_SEQUENCE" { 148 + let mut out = String::new(); 149 + entry.read_to_string(&mut out).await?; 150 + if u32::from_str(out.trim())? 
!= CUR_SEQ_SCHEMA { 151 + eyre::bail!("Current schema is not 30 - check if Flashback needs an update!"); 152 + } 153 + } else if let Some(path) = path.strip_prefix("mbdump").ok() { 154 + if WANTED_TABLES.contains(&path.to_str().unwrap()) { 155 + debug!("unpacking {} ({} bytes)", path.display(), hdr.size()?); 156 + entry.unpack_in(&tmpdir).await?; 157 + } 158 + } 159 + } 160 + 161 + Ok(seq_replication) 162 + } 163 + 164 + /// Imports the downloaded dump into duckdb 165 + async fn replica_import_dump(db: Arc<DuckdbConnectionManager>, dump: &Path) -> eyre::Result<()> { 166 + let db = db.connect()?; 167 + let dump = dump.join("mbdump"); 168 + 169 + debug!("creating musicbrainz tables"); 170 + db.execute_batch(include_str!("init.sql"))?; 171 + 172 + for table in WANTED_TABLES { 173 + debug!("importing dump - mbz.{table}"); 174 + db.execute( 175 + &format!("COPY mbz.{table} FROM ? (NULLSTR '\\N', QUOTE '')"), 176 + params![dump.join(table).to_string_lossy()], 177 + )?; 178 + } 179 + 180 + info!("musicbrainz dump imported successfully!"); 181 + 182 + Ok(()) 183 + }