very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[indexer,relay] use ahash for calculating shard indices, suppress wrong hosts messages after a host accumulates enough of them

dawn 07832329 30f53f5d

+57 -16
+21
Cargo.lock
··· 18 18 checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" 19 19 20 20 [[package]] 21 + name = "ahash" 22 + version = "0.8.12" 23 + source = "registry+https://github.com/rust-lang/crates.io-index" 24 + checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" 25 + dependencies = [ 26 + "cfg-if", 27 + "getrandom 0.3.4", 28 + "once_cell", 29 + "version_check", 30 + "zerocopy", 31 + ] 32 + 33 + [[package]] 21 34 name = "aho-corasick" 22 35 version = "1.1.4" 23 36 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1470 1483 name = "hydrant" 1471 1484 version = "0.1.0" 1472 1485 dependencies = [ 1486 + "ahash", 1473 1487 "arc-swap", 1474 1488 "axum", 1475 1489 "bytes", ··· 1492 1506 "miette", 1493 1507 "mimalloc", 1494 1508 "multibase", 1509 + "nohash-hasher", 1495 1510 "parking_lot", 1496 1511 "rand 0.10.0", 1497 1512 "reqwest", ··· 2295 2310 "wasm-bindgen-futures", 2296 2311 "web-time", 2297 2312 ] 2313 + 2314 + [[package]] 2315 + name = "nohash-hasher" 2316 + version = "0.2.0" 2317 + source = "registry+https://github.com/rust-lang/crates.io-index" 2318 + checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" 2298 2319 2299 2320 [[package]] 2300 2321 name = "nu-ansi-term"
+2
Cargo.toml
··· 60 60 sha2 = "0.10.9" 61 61 parking_lot = "0.12.5" 62 62 hyper = "1.8.1" 63 + nohash-hasher = "0.2.0" 64 + ahash = "0.8.12" 63 65 64 66 [dev-dependencies] 65 67 tempfile = "3.26.0"
+2 -8
src/ingest/indexer.rs
··· 2 2 use crate::db::{self, keys, ser_repo_metadata}; 3 3 use crate::ingest::stream::{Account, Commit, Identity}; 4 4 use crate::ingest::validation; 5 - use crate::ops; 6 5 use crate::resolver::{NoSigningKeyError, ResolverError}; 7 6 use crate::state::AppState; 8 7 use crate::types::{ 9 8 AccountEvt, BroadcastEvent, GaugeState, IdentityEvt, RepoMetadata, RepoState, RepoStatus, 10 9 }; 10 + use crate::{ops, util}; 11 11 12 12 use fjall::OwnedWriteBatch; 13 13 ··· 16 16 use jacquard_common::types::did::Did; 17 17 use jacquard_repo::error::CommitError; 18 18 use miette::{Diagnostic, IntoDiagnostic, Result}; 19 - use std::collections::hash_map::DefaultHasher; 20 - use std::hash::{Hash, Hasher}; 21 19 use std::sync::Arc; 22 20 use std::sync::atomic::Ordering::SeqCst; 23 21 use thiserror::Error; ··· 178 176 IndexerMessage::BackfillFinished(did) => did, 179 177 }; 180 178 181 - let mut hasher = DefaultHasher::new(); 182 - did.hash(&mut hasher); 183 - let hash = hasher.finish(); 184 - let shard_idx = (hash as usize) % self.num_shards; 185 - 179 + let shard_idx = (util::hash(did) as usize) % self.num_shards; 186 180 if let Err(e) = shards[shard_idx].send(msg) { 187 181 error!(shard = shard_idx, err = %e, "failed to send message to shard, shard panicked?"); 188 182 break;
+23 -7
src/ingest/relay.rs
··· 1 - use std::collections::hash_map::DefaultHasher; 2 - use std::hash::{Hash, Hasher}; 1 + use std::collections::HashMap; 3 2 use std::sync::Arc; 4 3 #[cfg(feature = "relay")] 5 4 use std::sync::atomic::Ordering; ··· 33 32 #[cfg(feature = "relay")] 34 33 use crate::types::RelayBroadcast; 35 34 use crate::types::{RepoState, RepoStatus}; 35 + use crate::util; 36 36 use smol_str::{SmolStr, ToSmolStr}; 37 37 38 38 struct WorkerContext<'a> { ··· 47 47 #[cfg(feature = "indexer")] 48 48 hook: crate::ingest::indexer::IndexerTx, 49 49 http: reqwest::Client, 50 + error_counts: HashMap<u64, u32, nohash_hasher::BuildNoHashHasher<u64>>, 50 51 } 51 52 52 53 struct WorkerMessage { ··· 151 152 SubscribeReposMessage::Sync(s) => &s.did, 152 153 _ => continue, 153 154 }; 154 - let mut hasher = DefaultHasher::new(); 155 - did.hash(&mut hasher); 156 - let idx = (hasher.finish() as usize) % self.num_shards; 157 - idx 155 + (util::hash(did) as usize) % self.num_shards 158 156 }; 159 157 160 158 if let Err(e) = shards[shard_idx].send(WorkerMessage { ··· 199 197 #[cfg(feature = "indexer")] 200 198 hook, 201 199 http, 200 + error_counts: Default::default(), 202 201 }; 203 202 204 203 while let Some(msg) = rx.blocking_recv() { ··· 254 253 { 255 254 let outcome = ctx.check_host_authority(did, &mut repo_state, host)?; 256 255 if let AuthorityOutcome::WrongHost { expected } = outcome { 257 - warn!(got = host, expected = %expected, "message rejected: wrong host"); 256 + if !ctx.inc_error(host) { 257 + warn!(got = host, expected = %expected, "message rejected: wrong host"); 258 + } 258 259 return Ok(()); 259 260 } 261 + ctx.reset_error(host); 260 262 } 261 263 262 264 match msg.msg { ··· 567 569 } 568 570 569 571 impl WorkerContext<'_> { 572 + /// increments host error counter, returns if host is suppressed or not 573 + fn inc_error(&mut self, host: &str) -> bool { 574 + let error_count = self.error_counts.entry(util::hash(&host)).or_default(); 575 + let is_suppressed = *error_count > 50; 576 + *error_count += 1; 577 + is_suppressed 578 + } 579 + 580 + fn reset_error(&mut self, host: &str) { 581 + if let Some(count) = self.error_counts.get_mut(&util::hash(&host)) { 582 + *count = 0; 583 + } 584 + } 585 + 570 586 fn check_host_authority( 571 587 &mut self, 572 588 did: &Did,
+9 -1
src/util/mod.rs
··· 1 - use std::time::Duration; 1 + use std::{hash::Hash, time::Duration}; 2 2 3 3 use jacquard_common::{deps::fluent_uri, types::string::Handle}; 4 4 use rand::RngExt; ··· 217 217 pub(crate) fn invalid_handle() -> Handle<'static> { 218 218 unsafe { Handle::unchecked("handle.invalid") } 219 219 } 220 + 221 + /// returns hash of value using ahash 222 + pub fn hash<T: Hash>(val: &T) -> u64 { 223 + use std::hash::Hasher; 224 + let mut hasher = ahash::AHasher::default(); 225 + val.hash(&mut hasher); 226 + hasher.finish() 227 + }