don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(knot): cleanup jetstream init and make consumer optional

Signed-off-by: tjh <x@tjh.dev>

tjh 236184c9 414b4bca

+77 -74
+11 -13
crates/knot/src/cli.rs
··· 3 3 use gix::bstr::BString; 4 4 use knot::model::config::{DEFAULT_READMES, KnotConfiguration, RepoCacheConfig}; 5 5 use std::{env, path::PathBuf, time::Duration}; 6 - use url::Url; 7 6 8 7 pub fn parse() -> Arguments { 9 8 Arguments::parse() ··· 49 50 #[arg(default_value = "https://plc.directory")] 50 51 pub plc_directory: String, 51 52 52 - #[arg(long, short, value_delimiter = ',', value_hint = ValueHint::Url, env = "KNOT_JETSREAM")] 53 + #[arg(long, short, value_delimiter = ',', value_hint = ValueHint::Url, env = "KNOT_JETSTREAM")] 53 54 #[arg(default_value = default_jetstream_instances())] 54 - pub jetstream: Vec<Url>, 55 + pub jetstream: Vec<String>, 55 56 56 57 /// Acceptable authorization methods for git pushes over http. 57 58 #[arg(hide = true, long, require_equals = true, value_delimiter = ',')] ··· 82 83 pub repo_cache_live: u64, 83 84 } 84 85 85 - #[derive(Debug, thiserror::Error)] 86 - pub enum Error { 87 - #[error("unable to build 'did:web:{{name}}' from knot fqdn: {0}")] 88 - Name(#[from] atproto::did::Error), 89 - } 90 - 91 - impl TryFrom<Arguments> for KnotConfiguration { 92 - type Error = Error; 93 - fn try_from(value: Arguments) -> Result<KnotConfiguration, Self::Error> { 86 + impl Arguments { 87 + pub fn to_knot_config(&self) -> Result<KnotConfiguration, Error> { 94 88 let Arguments { 95 89 name, 96 90 owner, ··· 99 107 repo_cache_size, 100 108 repo_cache_idle, 101 109 repo_cache_live, 102 - } = value; 110 + } = self.clone(); 103 111 104 112 // @TODO Validate? 105 113 ··· 122 130 }, 123 131 }) 124 132 } 133 + } 134 + 135 + #[derive(Debug, thiserror::Error)] 136 + pub enum Error { 137 + #[error("unable to build 'did:web:{{name}}' from knot fqdn: {0}")] 138 + Name(#[from] atproto::did::Error), 125 139 } 126 140 127 141 #[derive(Clone, Debug, ValueEnum)]
+8 -58
crates/knot/src/main.rs
··· 1 1 mod cli; 2 2 mod hooks; 3 3 4 - use anyhow::Context; 4 + use anyhow::Context as _; 5 5 use atproto::tid::Tid; 6 6 use axum::{ 7 7 extract::Query, 8 8 http::{HeaderName, Request, Response}, 9 9 }; 10 - use futures_util::StreamExt as _; 10 + use futures_util::FutureExt as _; 11 11 use identity::Resolver; 12 - use jetstream::client_config::JetstreamConfig; 13 12 use knot::{ 14 13 model::{Knot, KnotState, config::KnotConfiguration}, 15 14 services::database::DataStore, 16 15 }; 17 16 use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; 18 17 use std::{ 19 - borrow::Cow, 20 18 collections::BTreeMap, 21 19 env, 22 20 ffi::OsStr, 23 - net::ToSocketAddrs, 21 + net::ToSocketAddrs as _, 24 22 sync::{Arc, atomic::AtomicU64}, 25 23 time::Duration, 26 24 }; ··· 188 190 189 191 tracing::info!(?private_addrs, "bound internal API"); 190 192 191 - let jetstream_instances: Vec<_> = arguments 192 - .jetstream 193 - .iter() 194 - .map(|url| Cow::Owned(url.to_string())) 195 - .collect(); 196 - 197 - let config: KnotConfiguration = arguments.try_into()?; 193 + let config: KnotConfiguration = arguments.to_knot_config()?; 198 194 let knot_state = KnotState::new(config, resolver, public_http, database, &private_addrs)?; 199 195 let knot = Knot::from(knot_state); 200 196 ··· 209 217 } 210 218 211 219 // Spawn the jetstream consumer. 212 - { 213 - let cursor = knot 214 - .database() 215 - .get_jetstream_cursor() 216 - .await? 217 - .map(|odt| (odt, (odt.unix_timestamp_nanos() / 1000).unsigned_abs())); 218 - 219 - if let Some((cursor, cursor_us)) = &cursor { 220 - tracing::info!(?cursor, ?cursor_us, "found jetstream cursor"); 221 - } 222 - 223 - let mut config = JetstreamConfig::default() 224 - .with_instances(jetstream_instances) 225 - .with_cursor(cursor.map(|(_, us)| us)) 226 - .with_collections([ 227 - "sh.tangled.knot.member", 228 - "sh.tangled.publicKey", 229 - "sh.tangled.repo", 230 - "sh.tangled.repo.collaborator", 231 - ]); 232 - 233 - let mut member_dids = knot.database().members(); 234 - while let Some(did) = member_dids.next().await { 235 - config 236 - .subscriber_options 237 - .add_did(did?) 238 - .expect("knot members shouldn't exceed maximum DID filters"); 239 - } 240 - 241 - let (jetstream, jetstream_rx, jetstream_task) = config.connect(); 242 - let knot = knot.clone(); 243 - service.spawn(async move { 244 - tokio::join!(knot::services::jetstream::jetstream_task( 245 - jetstream, 246 - knot.clone(), 247 - jetstream_rx, 248 - ),); 249 - 250 - panic!("jetstream consumer/task completed"); 251 - }); 252 - 253 - service.spawn(async move { 254 - jetstream_task.await; 255 - Ok(()) 256 - }); 257 - }; 220 + service.spawn( 221 + knot::services::jetstream::init_consumer(&knot, arguments.jetstream.as_slice()) 222 + .map(|_| Ok(())), 223 + ); 258 224 259 225 // Build the public API. 260 226 let router = knot::public::router()
+58 -3
crates/knot/src/services/jetstream.rs
··· 6 6 RepositoryDeletePolicy, RepositoryRef, 7 7 }, 8 8 }; 9 - use jetstream::{CommitEvent, Event, JetstreamClient}; 9 + use futures_util::StreamExt as _; 10 + use jetstream::{CommitEvent, Event, JetstreamClient, client_config::JetstreamConfig}; 10 11 use lexicon::Lexicon; 11 - use std::time::Duration; 12 + use std::{borrow::Cow, time::Duration}; 12 13 use tokio::time::Instant; 13 14 14 - pub async fn jetstream_task( 15 + pub fn init_consumer<T: AsRef<str>>( 16 + knot: &Knot, 17 + instances: &[T], 18 + ) -> impl Future<Output = anyhow::Result<()>> + use<T> { 19 + let knot = knot.clone(); 20 + let jetstream_instances: Vec<_> = instances 21 + .iter() 22 + .filter(|s| !s.as_ref().is_empty()) 23 + .map(|url| Cow::Owned(url.as_ref().to_string())) 24 + .collect(); 25 + 26 + async move { 27 + if jetstream_instances.is_empty() { 28 + tracing::warn!("no jetstream instances provided"); 29 + return Ok(()); 30 + } 31 + 32 + let cursor = knot 33 + .database() 34 + .get_jetstream_cursor() 35 + .await? 36 + .map(|odt| (odt, (odt.unix_timestamp_nanos() / 1000).unsigned_abs())); 37 + 38 + if let Some((cursor, cursor_us)) = &cursor { 39 + tracing::info!(?cursor, ?cursor_us, "found jetstream cursor"); 40 + } 41 + 42 + let mut config = JetstreamConfig::default() 43 + .with_instances(jetstream_instances) 44 + .with_cursor(cursor.map(|(_, us)| us)) 45 + .with_collections([ 46 + "sh.tangled.knot.member", 47 + "sh.tangled.publicKey", 48 + "sh.tangled.repo", 49 + "sh.tangled.repo.collaborator", 50 + ]); 51 + 52 + { 53 + let mut member_dids = knot.database().members(); 54 + while let Some(did) = member_dids.next().await { 55 + config 56 + .subscriber_options 57 + .add_did(did?) 58 + .expect("knot members shouldn't exceed maximum DID filters"); 59 + } 60 + } 61 + 62 + let (jetstream, jetstream_rx, jetstream_rx_task) = config.connect(); 63 + 64 + tokio::join!(consume(jetstream, knot, jetstream_rx), jetstream_rx_task); 65 + Ok(()) 66 + } 67 + } 68 + 69 + pub async fn consume( 15 70 client: JetstreamClient, 16 71 knot: Knot, 17 72 jetstream_rx: jetstream::JetstreamReceiver,