Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client.
appview atproto bluesky rust appserver
67
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: clean up consumer main

feat: replace parakeet-index with pkdb

Mia e26332f2 885cb966

+189 -133
-18
crates/consumer/src/cmd.rs
··· 1 - use clap::Parser; 2 - 3 - #[derive(Debug, Parser)] 4 - pub struct Cli { 5 - /// Run backfill threads 6 - #[arg(long, default_value_t = false)] 7 - pub backfill: bool, 8 - /// Run the firehose consumer and indexer 9 - #[arg(long, default_value_t = false)] 10 - pub indexer: bool, 11 - /// Connect to label services and ingest labels 12 - #[arg(long, default_value_t = false)] 13 - pub labels: bool, 14 - } 15 - 16 - pub fn parse() -> Cli { 17 - Cli::parse() 18 - }
+1 -1
crates/consumer/src/config.rs
··· 2 2 use figment::Figment; 3 3 use serde::Deserialize; 4 4 5 - pub(crate) fn load_config() -> eyre::Result<Config> { 5 + pub fn load_config() -> eyre::Result<Config> { 6 6 let conf = Figment::new() 7 7 .merge(Toml::file("Config.toml")) 8 8 .merge(Env::prefixed("PKC_").split("__"))
+160
crates/consumer/src/lib.rs
··· 1 + use std::collections::HashSet; 2 + 3 + use clap::Parser; 4 + use deadpool_postgres::Pool; 5 + use jacquard_identity::JacquardResolver; 6 + use redis::aio::MultiplexedConnection; 7 + use tokio::sync::watch::Receiver as WatchReceiver; 8 + use tokio_util::task::TaskTracker; 9 + 10 + mod backfill; 11 + pub mod config; 12 + mod db; 13 + mod firehose; 14 + mod indexer; 15 + pub mod instrumentation; 16 + mod label_indexer; 17 + mod utils; 18 + 19 + #[derive(Debug, Parser)] 20 + pub struct Cli { 21 + /// Run backfill threads 22 + #[arg(long, default_value_t = false)] 23 + pub backfill: bool, 24 + /// Run the firehose consumer and indexer 25 + #[arg(long, default_value_t = false)] 26 + pub indexer: bool, 27 + /// Connect to label services and ingest labels 28 + #[arg(long, default_value_t = false)] 29 + pub labels: bool, 30 + } 31 + 32 + pub fn cli_parse() -> Cli { 33 + Cli::parse() 34 + } 35 + 36 + #[derive(Clone)] 37 + pub struct State { 38 + pub pool: Pool, 39 + pub rc: MultiplexedConnection, 40 + pub resolver: JacquardResolver, 41 + pub stop: WatchReceiver<bool>, 42 + } 43 + 44 + pub async fn run_labels( 45 + state: State, 46 + ua: &str, 47 + resume: sled::Db, 48 + src: String, 49 + ) -> eyre::Result<impl std::future::Future<Output = eyre::Result<()>>> { 50 + let mgr = label_indexer::LabelServiceManager::new(state.pool, resume, ua.to_string()).await?; 51 + 52 + let fut = mgr.run(src, state.stop); 53 + 54 + Ok(fut) 55 + } 56 + 57 + pub async fn run_backfill( 58 + state: State, 59 + idxc: Option<parakeet_index::Client>, 60 + conf: config::BackfillConfig, 61 + ) -> eyre::Result<impl std::future::Future<Output = eyre::Result<()>>> { 62 + let mgr = 63 + backfill::BackfillManager::new(state.pool, state.rc, state.resolver, idxc, conf).await?; 64 + 65 + let fut = mgr.run(state.stop); 66 + 67 + Ok(fut) 68 + } 69 + 70 + pub async fn run_indexer( 71 + state: State, 72 + tracker: &TaskTracker, 73 + idxc: parakeet_index::Client, 74 + ua: &str, 75 + resume: sled::Db, 76 + conf: config::IndexerConfig, 77 + ) -> eyre::Result<()> { 78 + let (idxc_tx, idxc_rx) = tokio::sync::mpsc::channel(128); 79 + 80 + let start_seq = resume 81 + .get("firehose")? 82 + .and_then(utils::u64_from_ivec) 83 + .or(conf.start_commit_seq); 84 + 85 + if let Some(start_seq) = start_seq { 86 + tracing::info!("starting firehose consumer from {start_seq}"); 87 + } 88 + 89 + let allowlist = conf.allowlist.map(HashSet::from_iter); 90 + if let Some(l) = &allowlist { 91 + tracing::info!("running with allowlist enabled ({} DIDs)", l.len()); 92 + } 93 + 94 + let relay_firehose = 95 + firehose::FirehoseConsumer::new_relay(&conf.relay_source, start_seq, ua).await?; 96 + 97 + let indexer_opts = indexer::RelayIndexerOpts { 98 + history_mode: conf.history_mode, 99 + skip_handle_validation: conf.skip_handle_validation, 100 + request_backfill: conf.request_backfill, 101 + allowlist, 102 + }; 103 + 104 + let relay_indexer = indexer::RelayIndexer::new( 105 + state.pool, 106 + state.rc, 107 + idxc_tx, 108 + state.resolver, 109 + relay_firehose, 110 + resume, 111 + indexer_opts, 112 + ) 113 + .await?; 114 + 115 + tracker.spawn(relay_indexer.run(conf.workers, state.stop)); 116 + tracker.spawn(index_transport(idxc, idxc_rx)); 117 + 118 + Ok(()) 119 + } 120 + 121 + async fn index_transport( 122 + mut idxc: parakeet_index::Client, 123 + rx: tokio::sync::mpsc::Receiver<parakeet_index::AggregateDeltaReq>, 124 + ) -> eyre::Result<()> { 125 + use tokio_stream::wrappers::ReceiverStream; 126 + 127 + idxc.submit_aggregate_delta_stream(ReceiverStream::new(rx)) 128 + .await?; 129 + 130 + Ok(()) 131 + } 132 + 133 + pub fn build_ua(contact: &Option<String>) -> String { 134 + let mut ua = format!("Parakeet {}", env!("CARGO_PKG_VERSION")); 135 + 136 + if let Some(contact) = contact { 137 + ua += &format!(" ({contact})"); 138 + } 139 + 140 + ua 141 + } 142 + 143 + pub fn build_resolver(plc_dir: Option<String>) -> JacquardResolver { 144 + use jacquard_identity::resolver::{DidStep, HandleStep, PlcSource, ResolverOptions}; 145 + 146 + let plc_source = plc_dir 147 + .and_then(|url| url.parse().ok()) 148 + .map(|base| PlcSource::PlcDirectory { base }) 149 + .unwrap_or_default(); 150 + 151 + let opts = ResolverOptions::new() 152 + .plc_source(plc_source) 153 + .handle_order(vec![HandleStep::DnsTxt, HandleStep::HttpsWellKnown]) 154 + .did_order(vec![DidStep::DidWebHttps, DidStep::PlcHttp]) 155 + .validate_doc_id(true) 156 + .public_fallback_for_handle(false) 157 + .build(); 158 + 159 + JacquardResolver::new_dns(reqwest::Client::new(), opts).with_cache() 160 + }
+28 -114
crates/consumer/src/main.rs
··· 1 1 use deadpool_postgres::Runtime; 2 2 use eyre::OptionExt; 3 - use jacquard_identity::{resolver::ResolverOptions, JacquardResolver}; 4 3 use metrics_exporter_prometheus::PrometheusBuilder; 5 - use std::collections::HashSet; 6 4 use tokio::signal::ctrl_c; 7 5 use tokio_postgres::NoTls; 8 6 9 - mod backfill; 10 - mod cmd; 11 - mod config; 12 - mod db; 13 - mod firehose; 14 - mod indexer; 15 - mod instrumentation; 16 - mod label_indexer; 17 - mod utils; 7 + use consumer::{build_resolver, build_ua, cli_parse, run_backfill, run_indexer, run_labels}; 18 8 19 9 #[tokio::main] 20 10 async fn main() -> eyre::Result<()> { 21 - let cli = cmd::parse(); 22 - let conf = config::load_config()?; 11 + let cli = cli_parse(); 12 + let conf = consumer::config::load_config()?; 23 13 24 14 PrometheusBuilder::new() 25 15 .with_http_listener(std::net::SocketAddr::new( ··· 28 18 )) 29 19 .install()?; 30 20 31 - instrumentation::init_instruments(&conf.instruments); 21 + consumer::instrumentation::init_instruments(&conf.instruments); 32 22 let user_agent = build_ua(&conf.ua_contact); 33 23 34 24 let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?; ··· 41 31 Ok::<_, eyre::Report>(()) 42 32 }); 43 33 44 - let resolver_opts = build_resolver_opts(conf.plc_directory); 45 - let resolver = JacquardResolver::new_dns(reqwest::Client::new(), resolver_opts).with_cache(); 34 + let resolver = build_resolver(conf.plc_directory); 46 35 47 36 let index_client = parakeet_index::Client::connect(conf.index_uri).await?; 48 37 ··· 50 39 let (stop_tx, stop) = tokio::sync::watch::channel(false); 51 40 52 41 let resume = (cli.labels || cli.indexer) 53 - .then::<Result<_, eyre::Report>, _>(|| { 42 + .then::<eyre::Result<_>, _>(|| { 54 43 let resume_path = conf.resume_path.ok_or_eyre( 55 44 "Config item resume_path must be specified when using --indexer or --labels", 56 45 )?; ··· 61 50 }) 62 51 .transpose()?; 63 52 53 + let state = consumer::State { 54 + pool, 55 + rc: redis_conn, 56 + resolver, 57 + stop, 58 + }; 59 + 64 60 if cli.labels { 65 61 let resume = resume.clone().unwrap(); 62 + let source = conf 63 + .label_source 64 + .ok_or_eyre("Label source must be specified")?; 66 65 67 - let label_mgr = 68 - label_indexer::LabelServiceManager::new(pool.clone(), resume, user_agent.clone()) 69 - .await?; 70 - 71 - if let Some(label_source) = conf.label_source { 72 - tracker.spawn(label_mgr.run(label_source, stop.clone())); 73 - } 66 + let fut = run_labels(state.clone(), &user_agent, resume, source).await?; 67 + tracker.spawn(fut); 74 68 } 75 69 76 70 if cli.backfill { 77 71 let bf_cfg = conf 78 72 .backfill 79 - .ok_or_eyre("Config item [backfill] must be specified when using --backfill")?; 73 + .ok_or_eyre("Config item [backfill] must be specified")?; 74 + let idxc = (!bf_cfg.skip_aggregation).then_some(index_client.clone()); 80 75 81 - let backfiller = backfill::BackfillManager::new( 82 - pool.clone(), 83 - redis_conn.clone(), 84 - resolver.clone(), 85 - (!bf_cfg.skip_aggregation).then_some(index_client.clone()), 86 - bf_cfg, 87 - ) 88 - .await?; 89 - 90 - tracker.spawn(backfiller.run(stop.clone())); 76 + let fut = run_backfill(state.clone(), idxc, bf_cfg).await?; 77 + tracker.spawn(fut); 91 78 } 92 79 93 80 if cli.indexer { ··· 95 82 96 83 let indexer_cfg = conf 97 84 .indexer 98 - .ok_or_eyre("Config item [indexer] must be specified when using --indexer")?; 85 + .ok_or_eyre("Config item [indexer] must be specified")?; 99 86 100 - let (idxc_tx, idxc_rx) = tokio::sync::mpsc::channel(128); 101 - 102 - let start_seq = resume 103 - .get("firehose")? 104 - .and_then(utils::u64_from_ivec) 105 - .or(indexer_cfg.start_commit_seq); 106 - 107 - if let Some(start_seq) = start_seq { 108 - tracing::info!("starting firehose consumer from {start_seq}"); 109 - } 110 - 111 - let relay_firehose = firehose::FirehoseConsumer::new_relay( 112 - &indexer_cfg.relay_source, 113 - start_seq, 87 + run_indexer( 88 + state.clone(), 89 + &tracker, 90 + index_client, 114 91 &user_agent, 115 - ) 116 - .await?; 117 - 118 - let allowlist = indexer_cfg.allowlist.map(HashSet::from_iter); 119 - if let Some(l) = &allowlist { 120 - tracing::info!("running with allowlist enabled ({} DIDs)", l.len()); 121 - } 122 - 123 - let indexer_opts = indexer::RelayIndexerOpts { 124 - history_mode: indexer_cfg.history_mode, 125 - skip_handle_validation: indexer_cfg.skip_handle_validation, 126 - request_backfill: indexer_cfg.request_backfill, 127 - allowlist, 128 - }; 129 - 130 - let relay_indexer = indexer::RelayIndexer::new( 131 - pool.clone(), 132 - redis_conn.clone(), 133 - idxc_tx, 134 - resolver.clone(), 135 - relay_firehose, 136 92 resume, 137 - indexer_opts, 93 + indexer_cfg, 138 94 ) 139 95 .await?; 140 - 141 - tracker.spawn(relay_indexer.run(indexer_cfg.workers, stop)); 142 - tracker.spawn(index_transport(index_client, idxc_rx)); 143 96 } 144 97 145 98 tokio::spawn(async move { ··· 153 106 154 107 Ok(()) 155 108 } 156 - 157 - async fn index_transport( 158 - mut idxc: parakeet_index::Client, 159 - rx: tokio::sync::mpsc::Receiver<parakeet_index::AggregateDeltaReq>, 160 - ) -> eyre::Result<()> { 161 - use tokio_stream::wrappers::ReceiverStream; 162 - 163 - idxc.submit_aggregate_delta_stream(ReceiverStream::new(rx)) 164 - .await?; 165 - 166 - Ok(()) 167 - } 168 - 169 - fn build_ua(contact: &Option<String>) -> String { 170 - let mut ua = format!("Parakeet {}", env!("CARGO_PKG_VERSION")); 171 - 172 - if let Some(contact) = contact { 173 - ua += &format!(" ({contact})"); 174 - } 175 - 176 - ua 177 - } 178 - 179 - fn build_resolver_opts(plc_dir: Option<String>) -> ResolverOptions { 180 - use jacquard_identity::resolver::{DidStep, HandleStep, PlcSource}; 181 - 182 - let plc_source = plc_dir 183 - .and_then(|url| url.parse().ok()) 184 - .map(|base| PlcSource::PlcDirectory { base }) 185 - .unwrap_or_default(); 186 - 187 - ResolverOptions::new() 188 - .plc_source(plc_source) 189 - .handle_order(vec![HandleStep::DnsTxt, HandleStep::HttpsWellKnown]) 190 - .did_order(vec![DidStep::DidWebHttps, DidStep::PlcHttp]) 191 - .validate_doc_id(true) 192 - .public_fallback_for_handle(false) 193 - .build() 194 - }