Code and data for arewedecentralizedyet.online and related projects
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Rust re-write of at-mau-watcher

Significantly more efficient than the Python version due to the
immutable/copy-on-write data structures from the im crate. This prevents
all that wasteful copying to make a snapshot of the in-memory
data structure, and doesn't require a lock.

It does use almost as much RAM as the Python version, which is
surprising and could use more investigation.

Needs more testing before I switch to it.

+952
+19
data-fetchers/at-mau-watcher/rs/Cargo.toml
··· 1 + [package] 2 + name = "at-mau-watcher" 3 + version = "0.1.0" 4 + edition = "2021" 5 + 6 + [dependencies] 7 + im = { version = "15", features = ["serde"] } 8 + serde = { version = "1", features = ["derive"] } 9 + serde_json = "1" 10 + chrono = { version = "0.4", features = ["serde"] } 11 + tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync", "time", "signal"] } 12 + reqwest = { version = "0.12", features = ["json", "rustls-tls"] } 13 + futures = "0.3" 14 + ipld-core = { version = "0.4.1", default-features = false, features = ["std"] } 15 + serde_ipld_dagcbor = { version = "0.6", features = ["std"] } 16 + tokio-tungstenite = { version = "0.23", features = ["rustls-tls-webpki-roots"] } 17 + # NOTE: Adjust these versions if your Atrium dependency differs. 18 + atrium-api = "0.25.7" 19 + atrium-xrpc = "0.12.4"
+21
data-fetchers/at-mau-watcher/rs/README.md
··· 1 + # at-mau-watcher (Rust) 2 + 3 + Track accounts seen on the ATProto firehose and write periodic JSON snapshots. 4 + 5 + ## Usage 6 + 7 + ```sh 8 + cargo run --release -- --relay wss://bsky.network/xrpc 9 + ``` 10 + 11 + Load a large snapshot produced elsewhere, but write to a new file: 12 + 13 + ```sh 14 + cargo run --release -- --snapshot-read-file /path/to/python_snapshot.json --snapshot-file accounts_snapshot.json 15 + ``` 16 + 17 + Show all options: 18 + 19 + ```sh 20 + cargo run --release -- --help 21 + ```
+912
data-fetchers/at-mau-watcher/rs/src/main.rs
··· 1 + use std::collections::HashMap; 2 + use std::fs; 3 + use std::borrow::Borrow; 4 + use std::hash::{Hash, Hasher}; 5 + use std::io::{BufWriter, Write}; 6 + use std::path::Path; 7 + use std::sync::Arc; 8 + use std::time::Duration; 9 + use std::{io::Cursor}; 10 + use std::sync::atomic::{AtomicU64, Ordering}; 11 + 12 + use chrono::{DateTime, TimeZone, Utc}; 13 + use futures::StreamExt; 14 + use im::hashset::HashSet; 15 + use ipld_core::ipld::Ipld; 16 + use serde::{Deserialize, Serialize, Serializer}; 17 + use serde::ser::SerializeMap; 18 + use serde_ipld_dagcbor as dagcbor; 19 + use tokio_tungstenite::tungstenite::Message; 20 + use tokio::sync::{mpsc, Mutex, Semaphore}; 21 + 22 + // Atrium imports (adjust if your Atrium API differs). 23 + use atrium_api::com::atproto::sync::subscribe_repos::{Account, Commit, Identity, NSID}; 24 + 25 + const SNAPSHOT_FILE: &str = "accounts_snapshot.json"; 26 + const SNAPSHOT_INTERVAL: u64 = 300; 27 + const RESOLVE_TTL_SECONDS: i64 = 24 * 60 * 60; 28 + const RESOLUTION_QUEUE_SIZE: usize = 10_000; 29 + const RESOLUTION_WORKERS: usize = 10; 30 + 31 + #[derive(Clone, Debug)] 32 + struct AccountEntry { 33 + // Box/Arc reduce per-entry allocation overhead; PDS is interned for sharing. 
34 + did: Box<str>, 35 + pds: Option<Arc<str>>, 36 + handle: Option<Box<str>>, 37 + last_seen: i64, 38 + last_resolved: Option<i64>, 39 + } 40 + 41 + impl PartialEq for AccountEntry { 42 + fn eq(&self, other: &Self) -> bool { 43 + self.did == other.did 44 + } 45 + } 46 + 47 + impl Eq for AccountEntry {} 48 + 49 + impl Hash for AccountEntry { 50 + fn hash<H: Hasher>(&self, state: &mut H) { 51 + self.did.hash(state); 52 + } 53 + } 54 + 55 + impl Borrow<str> for AccountEntry { 56 + fn borrow(&self) -> &str { 57 + self.did.as_ref() 58 + } 59 + } 60 + 61 + #[derive(Clone, Debug, Serialize, Deserialize)] 62 + struct SnapshotEntry { 63 + pds: Option<Box<str>>, 64 + handle: Option<Box<str>>, 65 + #[serde(with = "iso8601")] 66 + last_seen: DateTime<Utc>, 67 + #[serde(with = "iso8601::option")] 68 + last_resolved: Option<DateTime<Utc>>, 69 + } 70 + 71 + #[derive(Default)] 72 + struct AccountStore { 73 + // Persistent set snapshot for lock-minimized writes. 74 + accounts: Arc<HashSet<AccountEntry>>, 75 + // Intern PDS strings to reduce memory when many accounts share the same PDS. 76 + pds_pool: HashMap<String, Arc<str>>, 77 + } 78 + 79 + impl AccountStore { 80 + fn intern_pds(&mut self, value: &str) -> Arc<str> { 81 + if let Some(existing) = self.pds_pool.get(value) { 82 + return existing.clone(); 83 + } 84 + let arc: Arc<str> = Arc::from(value); 85 + self.pds_pool.insert(value.to_string(), arc.clone()); 86 + arc 87 + } 88 + 89 + fn upsert(&mut self, entry: AccountEntry) { 90 + // Copy-on-write update to preserve snapshot sharing. 91 + let accounts = Arc::make_mut(&mut self.accounts); 92 + let _ = accounts.remove(entry.did.as_ref()); 93 + accounts.insert(entry); 94 + } 95 + 96 + fn update_last_seen( 97 + &mut self, 98 + did: &str, 99 + now: i64, 100 + force_resolve: bool, 101 + resolve_ttl_seconds: i64, 102 + ) -> bool { 103 + let mut need_resolve = false; 104 + // Copy-on-write update keeps snapshots cheap to clone. 
105 + let accounts = Arc::make_mut(&mut self.accounts); 106 + let mut entry = accounts.remove(did).unwrap_or_else(|| AccountEntry { 107 + did: did.to_string().into_boxed_str(), 108 + pds: None, 109 + handle: None, 110 + last_seen: now, 111 + last_resolved: None, 112 + }); 113 + 114 + entry.last_seen = now; 115 + 116 + if force_resolve { 117 + need_resolve = true; 118 + } else if let Some(last_resolved) = entry.last_resolved { 119 + let age = now - last_resolved; 120 + if age >= resolve_ttl_seconds { 121 + need_resolve = true; 122 + } 123 + } else { 124 + need_resolve = true; 125 + } 126 + 127 + self.upsert(entry); 128 + need_resolve 129 + } 130 + } 131 + 132 + mod iso8601 { 133 + use chrono::{DateTime, Utc}; 134 + use serde::{Deserialize, Deserializer, Serializer}; 135 + 136 + pub fn serialize<S>(value: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error> 137 + where 138 + S: Serializer, 139 + { 140 + serializer.serialize_str(&value.to_rfc3339()) 141 + } 142 + 143 + pub fn deserialize<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error> 144 + where 145 + D: Deserializer<'de>, 146 + { 147 + let raw = String::deserialize(deserializer)?; 148 + DateTime::parse_from_rfc3339(&raw) 149 + .map(|dt| dt.with_timezone(&Utc)) 150 + .map_err(serde::de::Error::custom) 151 + } 152 + 153 + pub mod option { 154 + use chrono::{DateTime, Utc}; 155 + use serde::{Deserialize, Deserializer, Serializer}; 156 + 157 + pub fn serialize<S>(value: &Option<DateTime<Utc>>, serializer: S) -> Result<S::Ok, S::Error> 158 + where 159 + S: Serializer, 160 + { 161 + match value { 162 + Some(dt) => serializer.serialize_some(&dt.to_rfc3339()), 163 + None => serializer.serialize_none(), 164 + } 165 + } 166 + 167 + pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<DateTime<Utc>>, D::Error> 168 + where 169 + D: Deserializer<'de>, 170 + { 171 + let raw = Option::<String>::deserialize(deserializer)?; 172 + match raw { 173 + Some(raw) => DateTime::parse_from_rfc3339(&raw) 174 + 
.map(|dt| Some(dt.with_timezone(&Utc))) 175 + .map_err(serde::de::Error::custom), 176 + None => Ok(None), 177 + } 178 + } 179 + } 180 + } 181 + 182 + fn load_snapshot(path: &str) -> HashMap<Box<str>, SnapshotEntry> { 183 + if !Path::new(path).exists() { 184 + return HashMap::new(); 185 + } 186 + 187 + let data = match fs::read_to_string(path) { 188 + Ok(data) => data, 189 + Err(_) => return HashMap::new(), 190 + }; 191 + 192 + let parsed: HashMap<Box<str>, SnapshotEntry> = match serde_json::from_str(&data) { 193 + Ok(parsed) => parsed, 194 + Err(_) => return HashMap::new(), 195 + }; 196 + 197 + parsed 198 + } 199 + 200 + fn write_snapshot_file(path: &str, accounts: Arc<HashSet<AccountEntry>>, verbose: bool) { 201 + let tmp_path = format!("{path}.tmp"); 202 + 203 + if let Ok(file) = fs::File::create(&tmp_path) { 204 + let mut writer = BufWriter::new(file); 205 + let write_result = (|| { 206 + // Stream JSON without building a full map in memory. 207 + let mut serializer = serde_json::Serializer::new(&mut writer); 208 + let mut map = serializer.serialize_map(Some(accounts.len()))?; 209 + for entry in accounts.iter() { 210 + let last_seen = Utc.timestamp_opt(entry.last_seen, 0).single().unwrap_or_else(|| { 211 + Utc.timestamp_opt(0, 0).single().unwrap() 212 + }); 213 + let last_resolved = entry.last_resolved.and_then(|ts| { 214 + Utc.timestamp_opt(ts, 0).single() 215 + }); 216 + let snapshot = SnapshotEntry { 217 + pds: entry.pds.as_deref().map(|pds| pds.to_string().into_boxed_str()), 218 + handle: entry.handle.clone(), 219 + last_seen, 220 + last_resolved, 221 + }; 222 + map.serialize_entry(&entry.did, &snapshot)?; 223 + } 224 + map.end() 225 + })(); 226 + if write_result.is_ok() { 227 + let _ = writer.flush(); 228 + drop(writer); 229 + let _ = fs::rename(&tmp_path, path); 230 + if verbose { 231 + println!("Saved snapshot to {}: {} accounts", path, accounts.len()); 232 + } 233 + } 234 + } 235 + } 236 + 237 + async fn save_snapshot(path: &str, state: 
Arc<Mutex<AccountStore>>, verbose: bool) { 238 + let snapshot = { 239 + let guard = state.lock().await; 240 + guard.accounts.clone() 241 + }; 242 + let path = path.to_string(); 243 + 244 + // Snapshot is written in a dedicated thread to avoid blocking async tasks. 245 + let handle = std::thread::spawn(move || write_snapshot_file(&path, snapshot, verbose)); 246 + let _ = tokio::task::spawn_blocking(move || { 247 + let _ = handle.join(); 248 + }) 249 + .await; 250 + } 251 + 252 + async fn periodic_snapshot(path: String, state: Arc<Mutex<AccountStore>>, interval: u64) { 253 + let mut ticker = tokio::time::interval(Duration::from_secs(interval)); 254 + loop { 255 + ticker.tick().await; 256 + save_snapshot(&path, state.clone(), false).await; 257 + } 258 + } 259 + 260 + async fn resolve_did_document( 261 + did: &str, 262 + client: &reqwest::Client, 263 + timeout_seconds: u64, 264 + ) -> Option<serde_json::Value> { 265 + let url = if did.starts_with("did:plc:") { 266 + format!("https://plc.directory/{did}") 267 + } else if did.starts_with("did:web:") { 268 + let domain = &did["did:web:".len()..]; 269 + format!("https://{domain}/.well-known/did.json") 270 + } else { 271 + return None; 272 + }; 273 + 274 + let response = client.get(url).timeout(Duration::from_secs(timeout_seconds)).send().await; 275 + let response = match response { 276 + Ok(response) => response, 277 + Err(_) => return None, 278 + }; 279 + 280 + if !response.status().is_success() { 281 + return None; 282 + } 283 + 284 + response.json::<serde_json::Value>().await.ok() 285 + } 286 + 287 + fn extract_pds_from_diddoc(doc: &serde_json::Value) -> Option<String> { 288 + let services = doc.get("service").or_else(|| doc.get("services"))?; 289 + let services = services.as_array()?; 290 + for svc in services { 291 + let svc_id = svc.get("id").and_then(|v| v.as_str()).unwrap_or(""); 292 + let svc_type = svc.get("type").and_then(|v| v.as_str()).unwrap_or(""); 293 + let endpoint = svc 294 + .get("serviceEndpoint") 295 + 
.or_else(|| svc.get("endpoint")) 296 + .and_then(|v| v.as_str()); 297 + if svc_id.ends_with("#atproto_pds") 298 + && svc_type == "AtprotoPersonalDataServer" 299 + && endpoint.is_some() 300 + { 301 + return endpoint.map(|s| s.to_string()); 302 + } 303 + } 304 + None 305 + } 306 + 307 + fn extract_handle_from_diddoc(doc: &serde_json::Value) -> Option<String> { 308 + let also_known_as = doc.get("alsoKnownAs")?.as_array()?; 309 + for aka in also_known_as { 310 + if let Some(value) = aka.as_str() { 311 + if let Some(handle) = value.strip_prefix("at://") { 312 + return Some(handle.to_string()); 313 + } 314 + } 315 + } 316 + None 317 + } 318 + 319 + async fn resolve_if_needed( 320 + did: String, 321 + state: Arc<Mutex<AccountStore>>, 322 + client: reqwest::Client, 323 + force_resolve: bool, 324 + resolve_ttl_seconds: i64, 325 + ) { 326 + let now = Utc::now().timestamp(); 327 + { 328 + let mut guard = state.lock().await; 329 + // Update bookkeeping before doing network I/O. 330 + let accounts = Arc::make_mut(&mut guard.accounts); 331 + let mut entry = match accounts.remove(did.as_str()) { 332 + Some(entry) => entry, 333 + None => return, 334 + }; 335 + 336 + let mut need_resolve = false; 337 + if force_resolve { 338 + need_resolve = true; 339 + } else if let Some(last_resolved) = entry.last_resolved { 340 + let age = now - last_resolved; 341 + if age >= resolve_ttl_seconds { 342 + need_resolve = true; 343 + } 344 + } else { 345 + need_resolve = true; 346 + } 347 + 348 + if !need_resolve { 349 + guard.upsert(entry); 350 + return; 351 + } 352 + 353 + entry.last_resolved = Some(now); 354 + guard.upsert(entry); 355 + } 356 + 357 + let doc = resolve_did_document(&did, &client, 5).await; 358 + let doc = match doc { 359 + Some(doc) => doc, 360 + None => return, 361 + }; 362 + 363 + let pds = extract_pds_from_diddoc(&doc); 364 + let handle = extract_handle_from_diddoc(&doc); 365 + 366 + let mut guard = state.lock().await; 367 + // Apply resolved values under lock. 
368 + let accounts = Arc::make_mut(&mut guard.accounts); 369 + let mut entry = match accounts.remove(did.as_str()) { 370 + Some(entry) => entry, 371 + None => return, 372 + }; 373 + 374 + if let Some(pds) = pds { 375 + entry.pds = Some(guard.intern_pds(&pds)); 376 + } 377 + if let Some(handle) = handle { 378 + entry.handle = Some(handle.into_boxed_str()); 379 + } 380 + 381 + guard.upsert(entry); 382 + } 383 + 384 + async fn resolution_dispatcher( 385 + mut resolve_rx: mpsc::Receiver<(String, bool)>, 386 + state: Arc<Mutex<AccountStore>>, 387 + client: reqwest::Client, 388 + resolve_ttl_seconds: i64, 389 + workers: usize, 390 + resolve_processed: Arc<AtomicU64>, 391 + ) { 392 + let semaphore = Arc::new(Semaphore::new(workers)); 393 + while let Some((did, force)) = resolve_rx.recv().await { 394 + let permit = match semaphore.clone().acquire_owned().await { 395 + Ok(permit) => permit, 396 + Err(_) => continue, 397 + }; 398 + let state = state.clone(); 399 + let client = client.clone(); 400 + let resolve_processed = resolve_processed.clone(); 401 + tokio::spawn(async move { 402 + let _permit = permit; 403 + resolve_if_needed(did, state, client, force, resolve_ttl_seconds).await; 404 + resolve_processed.fetch_add(1, Ordering::Relaxed); 405 + }); 406 + } 407 + } 408 + 409 + async fn account_update_worker( 410 + mut update_rx: mpsc::UnboundedReceiver<(String, bool)>, 411 + resolve_tx: mpsc::Sender<(String, bool)>, 412 + state: Arc<Mutex<AccountStore>>, 413 + resolve_ttl_seconds: i64, 414 + update_processed: Arc<AtomicU64>, 415 + resolve_enqueued: Arc<AtomicU64>, 416 + resolve_dropped: Arc<AtomicU64>, 417 + ) { 418 + while let Some((did, force)) = update_rx.recv().await { 419 + update_processed.fetch_add(1, Ordering::Relaxed); 420 + let now = Utc::now().timestamp(); 421 + let need_resolve = { 422 + let mut guard = state.lock().await; 423 + guard.update_last_seen(&did, now, force, resolve_ttl_seconds) 424 + }; 425 + 426 + if need_resolve { 427 + if resolve_tx.try_send((did, 
force)).is_ok() { 428 + resolve_enqueued.fetch_add(1, Ordering::Relaxed); 429 + } else { 430 + resolve_dropped.fetch_add(1, Ordering::Relaxed); 431 + } 432 + } 433 + } 434 + } 435 + 436 + async fn run_firehose( 437 + relay: Option<String>, 438 + update_tx: mpsc::UnboundedSender<(String, bool)>, 439 + update_enqueued: Arc<AtomicU64>, 440 + update_processed: Arc<AtomicU64>, 441 + resolve_enqueued: Arc<AtomicU64>, 442 + resolve_processed: Arc<AtomicU64>, 443 + resolve_dropped: Arc<AtomicU64>, 444 + ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 445 + let base = relay.unwrap_or_else(|| "wss://bsky.network/xrpc".to_string()); 446 + if base.contains(NSID) { 447 + println!( 448 + "Warning: relay already includes subscribeRepos; using as-is: {}", 449 + base 450 + ); 451 + } 452 + let url = if base.contains(NSID) { 453 + base 454 + } else { 455 + format!("{}/{}", base.trim_end_matches('/'), NSID) 456 + }; 457 + println!("Connecting to firehose: {}", url); 458 + let (ws_stream, _) = tokio_tungstenite::connect_async(url).await?; 459 + 460 + let (_, mut stream) = ws_stream.split(); 461 + let mut total_messages: u64 = 0; 462 + let mut binary_messages: u64 = 0; 463 + let mut text_messages: u64 = 0; 464 + let mut close_messages: u64 = 0; 465 + let mut other_messages: u64 = 0; 466 + let mut frame_parse_failures: u64 = 0; 467 + let mut body_parse_failures: u64 = 0; 468 + let mut commit_events: u64 = 0; 469 + let mut account_events: u64 = 0; 470 + let mut identity_events: u64 = 0; 471 + let mut error_frames: u64 = 0; 472 + let mut other_events: u64 = 0; 473 + let mut enqueued_updates: u64 = 0; 474 + 475 + while let Some(message) = stream.next().await { 476 + let message = message?; 477 + total_messages += 1; 478 + let bytes = match message { 479 + Message::Binary(bytes) => { 480 + binary_messages += 1; 481 + bytes 482 + } 483 + Message::Text(text) => { 484 + text_messages += 1; 485 + text.into_bytes() 486 + } 487 + Message::Close(_) => { 488 + close_messages += 1; 489 + 
break; 490 + } 491 + _ => { 492 + other_messages += 1; 493 + continue; 494 + } 495 + }; 496 + 497 + // Firehose frames are CBOR header + CBOR body (per atrium firehose example). 498 + let frame = match decode_firehose_frame(&bytes) { 499 + Ok(frame) => frame, 500 + Err(err) => { 501 + frame_parse_failures += 1; 502 + if frame_parse_failures <= 5 { 503 + let prefix_len = bytes.len().min(16); 504 + let prefix = bytes[..prefix_len] 505 + .iter() 506 + .map(|b| format!("{:02x}", b)) 507 + .collect::<Vec<_>>() 508 + .join(" "); 509 + println!( 510 + "Firehose frame parse failed ({} bytes, prefix: {}): {}", 511 + bytes.len(), 512 + prefix, 513 + err 514 + ); 515 + } 516 + continue; 517 + } 518 + }; 519 + match frame { 520 + FirehoseFrame::Message(t, body_start) => { 521 + let mut did = None; 522 + let mut force_resolve = false; 523 + let body = &bytes[body_start..]; 524 + 525 + match t.as_deref() { 526 + Some("#commit") => match dagcbor::from_reader::<Commit, _>(body) { 527 + Ok(commit) => { 528 + commit_events += 1; 529 + did = Some(commit.repo.as_str().to_string()); 530 + } 531 + Err(err) => { 532 + body_parse_failures += 1; 533 + if body_parse_failures <= 5 { 534 + println!("Commit decode failed: {err}"); 535 + } 536 + } 537 + }, 538 + Some("#account") => match dagcbor::from_reader::<Account, _>(body) { 539 + Ok(account) => { 540 + account_events += 1; 541 + did = Some(account.did.as_str().to_string()); 542 + force_resolve = true; 543 + } 544 + Err(err) => { 545 + body_parse_failures += 1; 546 + if body_parse_failures <= 5 { 547 + println!("Account decode failed: {err}"); 548 + } 549 + } 550 + }, 551 + Some("#identity") => { 552 + match dagcbor::from_reader::<Identity, _>(body) { 553 + Ok(identity) => { 554 + identity_events += 1; 555 + did = Some(identity.did.as_str().to_string()); 556 + force_resolve = true; 557 + } 558 + Err(err) => { 559 + body_parse_failures += 1; 560 + if body_parse_failures <= 5 { 561 + println!("Identity decode failed: {err}"); 562 + } 563 + } 
564 + } 565 + } 566 + _ => { 567 + other_events += 1; 568 + } 569 + } 570 + 571 + if let Some(did) = did { 572 + let _ = update_tx.send((did, force_resolve)); 573 + update_enqueued.fetch_add(1, Ordering::Relaxed); 574 + enqueued_updates += 1; 575 + } 576 + } 577 + FirehoseFrame::Error => { 578 + error_frames += 1; 579 + } 580 + } 581 + 582 + if total_messages % 1000 == 0 { 583 + let updates_backlog = update_enqueued.load(Ordering::Relaxed) 584 + .saturating_sub(update_processed.load(Ordering::Relaxed)); 585 + let resolves_backlog = resolve_enqueued.load(Ordering::Relaxed) 586 + .saturating_sub(resolve_processed.load(Ordering::Relaxed)); 587 + println!( 588 + "Firehose stats: total={}, binary={}, text={}, close={}, other={}, frame_failures={}, body_failures={}, commit={}, account={}, identity={}, other_events={}, error_frames={}, enqueued={}, updates_backlog={}, resolves_backlog={}, resolve_dropped={}", 589 + total_messages, 590 + binary_messages, 591 + text_messages, 592 + close_messages, 593 + other_messages, 594 + frame_parse_failures, 595 + body_parse_failures, 596 + commit_events, 597 + account_events, 598 + identity_events, 599 + other_events, 600 + error_frames, 601 + enqueued_updates, 602 + updates_backlog, 603 + resolves_backlog, 604 + resolve_dropped.load(Ordering::Relaxed) 605 + ); 606 + } 607 + } 608 + 609 + println!( 610 + "Firehose final: total={}, binary={}, text={}, close={}, other={}, frame_failures={}, body_failures={}, commit={}, account={}, identity={}, other_events={}, error_frames={}, enqueued={}, updates_backlog={}, resolves_backlog={}, resolve_dropped={}", 611 + total_messages, 612 + binary_messages, 613 + text_messages, 614 + close_messages, 615 + other_messages, 616 + frame_parse_failures, 617 + body_parse_failures, 618 + commit_events, 619 + account_events, 620 + identity_events, 621 + other_events, 622 + error_frames, 623 + enqueued_updates, 624 + update_enqueued.load(Ordering::Relaxed) 625 + 
.saturating_sub(update_processed.load(Ordering::Relaxed)), 626 + resolve_enqueued 627 + .load(Ordering::Relaxed) 628 + .saturating_sub(resolve_processed.load(Ordering::Relaxed)), 629 + resolve_dropped.load(Ordering::Relaxed) 630 + ); 631 + 632 + Ok(()) 633 + } 634 + 635 + #[derive(Debug)] 636 + enum FirehoseFrame { 637 + Message(Option<String>, usize), 638 + Error, 639 + } 640 + 641 + #[derive(Debug)] 642 + enum FrameHeader { 643 + Message(Option<String>), 644 + Error, 645 + } 646 + 647 + impl TryFrom<Ipld> for FrameHeader { 648 + type Error = String; 649 + 650 + fn try_from(value: Ipld) -> Result<Self, String> { 651 + if let Ipld::Map(map) = value { 652 + if let Some(Ipld::Integer(op)) = map.get("op") { 653 + match op { 654 + 1 => { 655 + let t = if let Some(Ipld::String(t)) = map.get("t") { 656 + Some(t.clone()) 657 + } else { 658 + None 659 + }; 660 + return Ok(FrameHeader::Message(t)); 661 + } 662 + -1 => return Ok(FrameHeader::Error), 663 + _ => {} 664 + } 665 + } 666 + } 667 + Err("invalid frame header".to_string()) 668 + } 669 + } 670 + 671 + fn decode_firehose_frame(bytes: &[u8]) -> Result<FirehoseFrame, String> { 672 + let mut cursor = Cursor::new(bytes); 673 + match dagcbor::from_reader::<Ipld, _>(&mut cursor) { 674 + Err(dagcbor::DecodeError::TrailingData) => { 675 + let split = cursor.position() as usize; 676 + let (left, _right) = bytes.split_at(split); 677 + // Header is an IPLD map with `op` and optional `t`. 
678 + let header_ipld = 679 + dagcbor::from_slice::<Ipld>(left).map_err(|err| err.to_string())?; 680 + let header = FrameHeader::try_from(header_ipld)?; 681 + match header { 682 + FrameHeader::Message(t) => Ok(FirehoseFrame::Message(t, split)), 683 + FrameHeader::Error => Ok(FirehoseFrame::Error), 684 + } 685 + } 686 + Err(err) => Err(err.to_string()), 687 + Ok(_) => Err("missing frame body".to_string()), 688 + } 689 + } 690 + 691 + #[derive(Clone)] 692 + struct Args { 693 + relay: Option<String>, 694 + snapshot_read_file: Option<String>, 695 + snapshot_file: String, 696 + snapshot_interval: u64, 697 + resolve_workers: usize, 698 + resolve_ttl_seconds: i64, 699 + } 700 + 701 + fn parse_args() -> Args { 702 + let mut relay = None; 703 + let mut snapshot_read_file = None; 704 + let mut snapshot_file = SNAPSHOT_FILE.to_string(); 705 + let mut snapshot_interval = SNAPSHOT_INTERVAL; 706 + let mut resolve_workers = RESOLUTION_WORKERS; 707 + let mut resolve_ttl_seconds = RESOLVE_TTL_SECONDS; 708 + 709 + let mut args = std::env::args().skip(1).peekable(); 710 + while let Some(arg) = args.next() { 711 + match arg.as_str() { 712 + "--help" | "-h" => { 713 + println!( 714 + "Usage: at-mau-watcher [options]\n\ 715 + \n\ 716 + --relay <url> Relay base URL, e.g. 
wss://bsky.network/xrpc\n\ 717 + --snapshot-file <path> Output snapshot JSON path (default: {default_snapshot})\n\ 718 + --snapshot-read-file <path> Input snapshot JSON path (defaults to --snapshot-file)\n\ 719 + --snapshot-interval <seconds> Seconds between periodic snapshots (default: {default_interval})\n\ 720 + --resolve-workers <n> Number of DID resolution workers (default: {default_workers})\n\ 721 + --resolve-ttl-seconds <n> Re-resolve DIDs older than this many seconds (default: {default_ttl})\n", 722 + default_snapshot = SNAPSHOT_FILE, 723 + default_interval = SNAPSHOT_INTERVAL, 724 + default_workers = RESOLUTION_WORKERS, 725 + default_ttl = RESOLVE_TTL_SECONDS 726 + ); 727 + std::process::exit(0); 728 + } 729 + "--relay" => { 730 + if let Some(value) = args.next() { 731 + relay = Some(value); 732 + } 733 + } 734 + "--snapshot-file" => { 735 + if let Some(value) = args.next() { 736 + snapshot_file = value; 737 + } 738 + } 739 + "--snapshot-read-file" => { 740 + if let Some(value) = args.next() { 741 + snapshot_read_file = Some(value); 742 + } 743 + } 744 + "--snapshot-interval" => { 745 + if let Some(value) = args.next() { 746 + if let Ok(parsed) = value.parse::<u64>() { 747 + snapshot_interval = parsed; 748 + } 749 + } 750 + } 751 + "--resolve-workers" => { 752 + if let Some(value) = args.next() { 753 + if let Ok(parsed) = value.parse::<usize>() { 754 + resolve_workers = parsed; 755 + } 756 + } 757 + } 758 + "--resolve-ttl-seconds" => { 759 + if let Some(value) = args.next() { 760 + if let Ok(parsed) = value.parse::<i64>() { 761 + resolve_ttl_seconds = parsed; 762 + } 763 + } 764 + } 765 + _ => {} 766 + } 767 + } 768 + 769 + Args { 770 + relay, 771 + snapshot_read_file, 772 + snapshot_file, 773 + snapshot_interval, 774 + resolve_workers, 775 + resolve_ttl_seconds, 776 + } 777 + } 778 + 779 + #[tokio::main] 780 + async fn main() { 781 + let args = parse_args(); 782 + 783 + let snapshot_read_path = args 784 + .snapshot_read_file 785 + .as_deref() 786 + 
.unwrap_or(&args.snapshot_file); 787 + let snapshot = load_snapshot(snapshot_read_path); 788 + if !snapshot.is_empty() { 789 + println!( 790 + "Loaded snapshot from {}: {} accounts", 791 + snapshot_read_path, 792 + snapshot.len() 793 + ); 794 + } 795 + 796 + let mut store = AccountStore { 797 + accounts: Arc::new(HashSet::new()), 798 + pds_pool: HashMap::new(), 799 + }; 800 + let mut accounts = HashSet::new(); 801 + for (did, entry) in snapshot { 802 + let last_seen = entry.last_seen.timestamp(); 803 + let last_resolved = entry.last_resolved.map(|dt| dt.timestamp()); 804 + let pds = entry.pds.as_deref().map(|value| store.intern_pds(value)); 805 + accounts.insert(AccountEntry { 806 + did, 807 + pds, 808 + handle: entry.handle, 809 + last_seen, 810 + last_resolved, 811 + }); 812 + } 813 + store.accounts = Arc::new(accounts); 814 + let state = Arc::new(Mutex::new(store)); 815 + 816 + let update_enqueued = Arc::new(AtomicU64::new(0)); 817 + let update_processed = Arc::new(AtomicU64::new(0)); 818 + let resolve_enqueued = Arc::new(AtomicU64::new(0)); 819 + let resolve_processed = Arc::new(AtomicU64::new(0)); 820 + let resolve_dropped = Arc::new(AtomicU64::new(0)); 821 + 822 + let (resolve_tx, resolve_rx) = mpsc::channel(RESOLUTION_QUEUE_SIZE); 823 + let (update_tx, update_rx) = mpsc::unbounded_channel(); 824 + 825 + let client = reqwest::Client::new(); 826 + 827 + tokio::spawn(resolution_dispatcher( 828 + resolve_rx, 829 + state.clone(), 830 + client.clone(), 831 + args.resolve_ttl_seconds, 832 + args.resolve_workers, 833 + resolve_processed.clone(), 834 + )); 835 + 836 + tokio::spawn(account_update_worker( 837 + update_rx, 838 + resolve_tx.clone(), 839 + state.clone(), 840 + args.resolve_ttl_seconds, 841 + update_processed.clone(), 842 + resolve_enqueued.clone(), 843 + resolve_dropped.clone(), 844 + )); 845 + 846 + let snapshot_path = args.snapshot_file.clone(); 847 + tokio::spawn(periodic_snapshot( 848 + snapshot_path, 849 + state.clone(), 850 + args.snapshot_interval, 
851 + )); 852 + 853 + #[cfg(unix)] 854 + { 855 + let snapshot_path = args.snapshot_file.clone(); 856 + let state = state.clone(); 857 + tokio::spawn(async move { 858 + use tokio::signal::unix::{signal, SignalKind}; 859 + if let Ok(mut sig) = signal(SignalKind::user_defined1()) { 860 + while sig.recv().await.is_some() { 861 + println!("Received SIGUSR1, scheduling snapshot to {}", snapshot_path); 862 + save_snapshot(&snapshot_path, state.clone(), true).await; 863 + } 864 + } 865 + }); 866 + } 867 + { 868 + let snapshot_path = args.snapshot_file.clone(); 869 + let state = state.clone(); 870 + tokio::spawn(async move { 871 + if tokio::signal::ctrl_c().await.is_ok() { 872 + println!("Received Ctrl+C, saving snapshot to {}", snapshot_path); 873 + save_snapshot(&snapshot_path, state.clone(), true).await; 874 + std::process::exit(0); 875 + } 876 + }); 877 + } 878 + 879 + let mut reconnect_delay = 1u64; 880 + let max_reconnect_delay = 60u64; 881 + 882 + loop { 883 + if let Some(relay) = args.relay.clone() { 884 + println!("Using relay: {}", relay); 885 + } else { 886 + println!("Using default relay: wss://bsky.network/xrpc"); 887 + } 888 + 889 + let relay = args.relay.clone(); 890 + let update_tx = update_tx.clone(); 891 + let firehose = run_firehose( 892 + relay, 893 + update_tx, 894 + update_enqueued.clone(), 895 + update_processed.clone(), 896 + resolve_enqueued.clone(), 897 + resolve_processed.clone(), 898 + resolve_dropped.clone(), 899 + ) 900 + .await; 901 + 902 + if let Err(err) = firehose { 903 + println!("Firehose client error: {err}; reconnecting in {reconnect_delay}s"); 904 + } else { 905 + println!("Firehose client stopped; reconnecting..."); 906 + } 907 + 908 + save_snapshot(&args.snapshot_file, state.clone(), true).await; 909 + tokio::time::sleep(Duration::from_secs(reconnect_delay)).await; 910 + reconnect_delay = std::cmp::min(reconnect_delay * 2, max_reconnect_delay); 911 + } 912 + }