use futures_util::future::join_all; use reqwest::Client; use serde::Deserialize; use thiserror::Error; #[derive(Debug, Error)] pub enum PdsError { #[error("Network request failed: {0}")] RequestError(#[from] reqwest::Error), #[error("Failed to join task: {0}")] JoinError(#[from] tokio::task::JoinError), #[error("Environment variable not found: {0}")] EnvVarError(#[from] std::env::VarError), } #[derive(Deserialize, Debug)] struct Repo { did: String, } #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] struct ListReposResponse { cursor: Option, repos: Vec, } #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] struct DescribeRepoResponse { handle_is_correct: bool, } pub async fn get_all_active_dids_from_pdses(pds_hosts: &[String]) -> Result, PdsError> { let client = Client::new(); let mut tasks = Vec::new(); for host in pds_hosts { let host_clone = host.clone(); let client_clone = client.clone(); tasks.push(tokio::spawn(async move { fetch_active_dids_from_single_pds(host_clone, client_clone).await })); } let results = join_all(tasks).await; let mut all_dids = Vec::new(); for result in results { let pds_dids = result??; all_dids.extend(pds_dids); } tracing::info!("--- Sample of fetched ACTIVE DIDs (first 10) ---"); for did in all_dids.iter().take(10) { tracing::info!("{}", did); } tracing::info!("... and {} more.", all_dids.len().saturating_sub(10)); tracing::info!("--- Total Active DIDs fetched: {} ---", all_dids.len()); Ok(all_dids) } async fn fetch_active_dids_from_single_pds( host: String, client: Client, ) -> Result, PdsError> { let mut active_dids = Vec::new(); let mut cursor: Option = None; let limit = 1000; tracing::info!("Fetching active DIDs from PDS: {}", host); loop { let url = match &cursor { Some(c) => format!( "https://{}/xrpc/com.atproto.sync.listRepos?limit={}&cursor={}", host, limit, c ), None => format!( "https://{}/xrpc/com.atproto.sync.listRepos?limit={}", host, limit ), }; let response: ListReposResponse = client.get(&url).send().await?.json().await?; let dids_on_page: Vec = response.repos.into_iter().map(|r| r.did).collect(); if !dids_on_page.is_empty() { let mut check_tasks = Vec::new(); for did in dids_on_page { let host_clone = host.clone(); let client_clone = client.clone(); check_tasks.push(tokio::spawn(async move { let url = format!( "https://{}/xrpc/com.atproto.repo.describeRepo?repo={}", host_clone, did ); let repo_info_result = async { client_clone .get(&url) .send() .await? .json::() .await } .await; if let Ok(repo_info) = repo_info_result && repo_info.handle_is_correct { Some(did) } else { None } })); } let checked_results = join_all(check_tasks).await; for result in checked_results { if let Ok(Some(active_did)) = result { active_dids.push(active_did); } } } if let Some(next_cursor) = response.cursor { cursor = Some(next_cursor); } else { break; } } tracing::info!( "Finished fetching {} active DIDs from {}.", active_dids.len(), host ); Ok(active_dids) }