A lexicon-driven AppView for ATProto. happyview.dev
backfill firehose jetstream atproto appview oauth lexicon
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add backfill system for historical record discovery and indexing

Trezy 66552c7b 86c83e41

+480
+13
migrations/20260216000000_create_backfill_jobs.sql
··· 1 + CREATE TABLE IF NOT EXISTS backfill_jobs ( 2 + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), 3 + collection TEXT, 4 + did TEXT, 5 + status TEXT NOT NULL DEFAULT 'pending', 6 + total_repos INT DEFAULT 0, 7 + processed_repos INT DEFAULT 0, 8 + total_records INT DEFAULT 0, 9 + error TEXT, 10 + started_at TIMESTAMPTZ, 11 + completed_at TIMESTAMPTZ, 12 + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() 13 + );
+99
src/admin.rs
··· 19 19 .route("/lexicons", post(upload_lexicon).get(list_lexicons)) 20 20 .route("/lexicons/{id}", get(get_lexicon).delete(delete_lexicon)) 21 21 .route("/stats", get(stats)) 22 + .route("/backfill", post(create_backfill)) 23 + .route("/backfill/status", get(backfill_status)) 22 24 } 23 25 24 26 /// Extract and validate the admin Bearer token from request headers. ··· 299 301 .collect(), 300 302 })) 301 303 } 304 + 305 + // --------------------------------------------------------------------------- 306 + // Backfill endpoints 307 + // --------------------------------------------------------------------------- 308 + 309 + #[derive(Deserialize)] 310 + struct CreateBackfillBody { 311 + collection: Option<String>, 312 + did: Option<String>, 313 + } 314 + 315 + #[derive(Serialize)] 316 + struct BackfillJob { 317 + id: String, 318 + collection: Option<String>, 319 + did: Option<String>, 320 + status: String, 321 + total_repos: Option<i32>, 322 + processed_repos: Option<i32>, 323 + total_records: Option<i32>, 324 + error: Option<String>, 325 + started_at: Option<chrono::DateTime<chrono::Utc>>, 326 + completed_at: Option<chrono::DateTime<chrono::Utc>>, 327 + created_at: chrono::DateTime<chrono::Utc>, 328 + } 329 + 330 + /// POST /admin/backfill — create a new backfill job. 331 + async fn create_backfill( 332 + State(state): State<AppState>, 333 + _admin: AdminAuth, 334 + Json(body): Json<CreateBackfillBody>, 335 + ) -> Result<(StatusCode, Json<Value>), AppError> { 336 + let row: (String,) = sqlx::query_as( 337 + "INSERT INTO backfill_jobs (collection, did) VALUES ($1, $2) RETURNING id::text", 338 + ) 339 + .bind(&body.collection) 340 + .bind(&body.did) 341 + .fetch_one(&state.db) 342 + .await 343 + .map_err(|e| AppError::Internal(format!("failed to create backfill job: {e}")))?; 344 + 345 + Ok(( 346 + StatusCode::CREATED, 347 + Json(serde_json::json!({ 348 + "id": row.0, 349 + "status": "pending", 350 + })), 351 + )) 352 + } 353 + 354 + /// GET /admin/backfill/status — list all backfill jobs. 355 + async fn backfill_status( 356 + State(state): State<AppState>, 357 + _admin: AdminAuth, 358 + ) -> Result<Json<Vec<BackfillJob>>, AppError> { 359 + let rows: Vec<( 360 + String, 361 + Option<String>, 362 + Option<String>, 363 + String, 364 + Option<i32>, 365 + Option<i32>, 366 + Option<i32>, 367 + Option<String>, 368 + Option<chrono::DateTime<chrono::Utc>>, 369 + Option<chrono::DateTime<chrono::Utc>>, 370 + chrono::DateTime<chrono::Utc>, 371 + )> = sqlx::query_as( 372 + "SELECT id::text, collection, did, status, total_repos, processed_repos, total_records, error, started_at, completed_at, created_at FROM backfill_jobs ORDER BY created_at DESC", 373 + ) 374 + .fetch_all(&state.db) 375 + .await 376 + .map_err(|e| AppError::Internal(format!("failed to list backfill jobs: {e}")))?; 377 + 378 + let jobs: Vec<BackfillJob> = rows 379 + .into_iter() 380 + .map( 381 + |(id, collection, did, status, total_repos, processed_repos, total_records, error, started_at, completed_at, created_at)| { 382 + BackfillJob { 383 + id, 384 + collection, 385 + did, 386 + status, 387 + total_repos, 388 + processed_repos, 389 + total_records, 390 + error, 391 + started_at, 392 + completed_at, 393 + created_at, 394 + } 395 + }, 396 + ) 397 + .collect(); 398 + 399 + Ok(Json(jobs)) 400 + }
+363
src/backfill.rs
··· 1 + use serde::Deserialize; 2 + use serde_json::Value; 3 + use sqlx::PgPool; 4 + use std::sync::Arc; 5 + use tokio::sync::Semaphore; 6 + use tracing::{debug, error, info, warn}; 7 + 8 + use crate::profile; 9 + 10 + // --------------------------------------------------------------------------- 11 + // Relay / PDS response types 12 + // --------------------------------------------------------------------------- 13 + 14 + #[derive(Deserialize)] 15 + struct ListReposResponse { 16 + repos: Vec<RepoEntry>, 17 + cursor: Option<String>, 18 + } 19 + 20 + #[derive(Deserialize)] 21 + struct RepoEntry { 22 + did: String, 23 + } 24 + 25 + #[derive(Deserialize)] 26 + struct ListRecordsResponse { 27 + records: Vec<RecordEntry>, 28 + cursor: Option<String>, 29 + } 30 + 31 + #[derive(Deserialize)] 32 + struct RecordEntry { 33 + uri: String, 34 + cid: String, 35 + value: Value, 36 + } 37 + 38 + // --------------------------------------------------------------------------- 39 + // Relay discovery 40 + // --------------------------------------------------------------------------- 41 + 42 + /// Discover all DIDs that have records in `collection` via the relay's 43 + /// `com.atproto.sync.listReposByCollection` endpoint. Paginates until done. 44 + async fn list_repos_by_collection( 45 + http: &reqwest::Client, 46 + relay_url: &str, 47 + collection: &str, 48 + ) -> Result<Vec<String>, String> { 49 + let base = relay_url.trim_end_matches('/'); 50 + let mut dids = Vec::new(); 51 + let mut cursor: Option<String> = None; 52 + 53 + loop { 54 + let mut url = format!( 55 + "{base}/xrpc/com.atproto.sync.listReposByCollection?collection={collection}&limit=1000" 56 + ); 57 + if let Some(ref c) = cursor { 58 + url.push_str(&format!("&cursor={c}")); 59 + } 60 + 61 + let resp = http 62 + .get(&url) 63 + .send() 64 + .await 65 + .map_err(|e| format!("relay request failed: {e}"))?; 66 + 67 + if !resp.status().is_success() { 68 + return Err(format!("relay returned {}", resp.status())); 69 + } 70 + 71 + let body: ListReposResponse = resp 72 + .json() 73 + .await 74 + .map_err(|e| format!("invalid relay response: {e}"))?; 75 + 76 + let page_count = body.repos.len(); 77 + for repo in body.repos { 78 + dids.push(repo.did); 79 + } 80 + 81 + match body.cursor { 82 + Some(c) if page_count > 0 => cursor = Some(c), 83 + _ => break, 84 + } 85 + } 86 + 87 + Ok(dids) 88 + } 89 + 90 + // --------------------------------------------------------------------------- 91 + // PDS record fetching 92 + // --------------------------------------------------------------------------- 93 + 94 + /// Fetch all records for a DID + collection from their PDS via 95 + /// `com.atproto.repo.listRecords`. Paginates until done. 96 + async fn fetch_records( 97 + http: &reqwest::Client, 98 + pds_url: &str, 99 + did: &str, 100 + collection: &str, 101 + ) -> Result<Vec<(String, String, String, Value)>, String> { 102 + let base = pds_url.trim_end_matches('/'); 103 + let mut records = Vec::new(); 104 + let mut cursor: Option<String> = None; 105 + 106 + loop { 107 + let mut url = format!( 108 + "{base}/xrpc/com.atproto.repo.listRecords?repo={did}&collection={collection}&limit=100" 109 + ); 110 + if let Some(ref c) = cursor { 111 + url.push_str(&format!("&cursor={c}")); 112 + } 113 + 114 + let resp = http 115 + .get(&url) 116 + .send() 117 + .await 118 + .map_err(|e| format!("PDS listRecords failed: {e}"))?; 119 + 120 + if !resp.status().is_success() { 121 + return Err(format!("PDS returned {} for {did}", resp.status())); 122 + } 123 + 124 + let body: ListRecordsResponse = resp 125 + .json() 126 + .await 127 + .map_err(|e| format!("invalid PDS listRecords response: {e}"))?; 128 + 129 + let page_count = body.records.len(); 130 + for entry in body.records { 131 + let rkey = entry.uri.split('/').last().unwrap_or_default().to_string(); 132 + records.push((entry.uri, rkey, entry.cid, entry.value)); 133 + } 134 + 135 + match body.cursor { 136 + Some(c) if page_count > 0 => cursor = Some(c), 137 + _ => break, 138 + } 139 + } 140 + 141 + Ok(records) 142 + } 143 + 144 + // --------------------------------------------------------------------------- 145 + // Job runner 146 + // --------------------------------------------------------------------------- 147 + 148 + /// Run a single backfill job: discover repos, fetch records, upsert into DB. 149 + async fn run_job( 150 + db: &PgPool, 151 + http: &reqwest::Client, 152 + relay_url: &str, 153 + job_id: &str, 154 + ) -> Result<(), String> { 155 + // Fetch the job 156 + let job: (Option<String>, Option<String>) = sqlx::query_as( 157 + "SELECT collection, did FROM backfill_jobs WHERE id::text = $1", 158 + ) 159 + .bind(job_id) 160 + .fetch_one(db) 161 + .await 162 + .map_err(|e| format!("failed to fetch job: {e}"))?; 163 + 164 + let (job_collection, job_did) = job; 165 + 166 + // Mark as running 167 + let _ = sqlx::query( 168 + "UPDATE backfill_jobs SET status = 'running', started_at = NOW() WHERE id::text = $1", 169 + ) 170 + .bind(job_id) 171 + .execute(db) 172 + .await; 173 + 174 + // Determine target collections 175 + let collections: Vec<String> = if let Some(ref col) = job_collection { 176 + vec![col.clone()] 177 + } else { 178 + // All backfill-eligible collections 179 + let rows: Vec<(String,)> = sqlx::query_as( 180 + "SELECT id FROM lexicons WHERE backfill = TRUE AND lexicon_json->'defs'->'main'->>'type' = 'record'", 181 + ) 182 + .fetch_all(db) 183 + .await 184 + .map_err(|e| format!("failed to query backfill-eligible lexicons: {e}"))?; 185 + rows.into_iter().map(|(id,)| id).collect() 186 + }; 187 + 188 + if collections.is_empty() { 189 + let _ = sqlx::query( 190 + "UPDATE backfill_jobs SET status = 'completed', completed_at = NOW(), error = 'no backfill-eligible collections' WHERE id::text = $1", 191 + ) 192 + .bind(job_id) 193 + .execute(db) 194 + .await; 195 + return Ok(()); 196 + } 197 + 198 + info!(job = job_id, ?collections, "starting backfill"); 199 + 200 + let semaphore = Arc::new(Semaphore::new(8)); 201 + let mut total_repos = 0i32; 202 + let mut processed_repos = 0i32; 203 + let mut total_records = 0i32; 204 + 205 + for collection in &collections { 206 + // Discover DIDs 207 + let dids = if let Some(ref did) = job_did { 208 + vec![did.clone()] 209 + } else { 210 + match list_repos_by_collection(http, relay_url, collection).await { 211 + Ok(dids) => dids, 212 + Err(e) => { 213 + warn!(collection, error = %e, "failed to discover repos, skipping"); 214 + continue; 215 + } 216 + } 217 + }; 218 + 219 + total_repos += dids.len() as i32; 220 + let _ = sqlx::query("UPDATE backfill_jobs SET total_repos = $2 WHERE id::text = $1") 221 + .bind(job_id) 222 + .bind(total_repos) 223 + .execute(db) 224 + .await; 225 + 226 + // Process each DID concurrently (bounded by semaphore) 227 + let mut tasks = Vec::new(); 228 + 229 + for did in dids { 230 + let permit = semaphore.clone().acquire_owned().await.unwrap(); 231 + let http = http.clone(); 232 + let db = db.clone(); 233 + let collection = collection.clone(); 234 + 235 + let task = tokio::spawn(async move { 236 + let _permit = permit; 237 + backfill_repo(&db, &http, &did, &collection).await 238 + }); 239 + tasks.push(task); 240 + } 241 + 242 + for task in tasks { 243 + match task.await { 244 + Ok(Ok(count)) => { 245 + total_records += count; 246 + processed_repos += 1; 247 + } 248 + Ok(Err(e)) => { 249 + warn!(error = %e, "repo backfill failed"); 250 + processed_repos += 1; 251 + } 252 + Err(e) => { 253 + warn!(error = %e, "repo backfill task panicked"); 254 + processed_repos += 1; 255 + } 256 + } 257 + 258 + // Update progress periodically 259 + let _ = sqlx::query( 260 + "UPDATE backfill_jobs SET processed_repos = $2, total_records = $3 WHERE id::text = $1", 261 + ) 262 + .bind(job_id) 263 + .bind(processed_repos) 264 + .bind(total_records) 265 + .execute(db) 266 + .await; 267 + } 268 + } 269 + 270 + // Mark completed 271 + let _ = sqlx::query( 272 + "UPDATE backfill_jobs SET status = 'completed', completed_at = NOW(), processed_repos = $2, total_records = $3 WHERE id::text = $1", 273 + ) 274 + .bind(job_id) 275 + .bind(processed_repos) 276 + .bind(total_records) 277 + .execute(db) 278 + .await; 279 + 280 + info!(job = job_id, processed_repos, total_records, "backfill completed"); 281 + Ok(()) 282 + } 283 + 284 + /// Backfill a single repo's records for a collection. Returns the number of 285 + /// records upserted. 286 + async fn backfill_repo( 287 + db: &PgPool, 288 + http: &reqwest::Client, 289 + did: &str, 290 + collection: &str, 291 + ) -> Result<i32, String> { 292 + // Resolve PDS 293 + let pds = profile::resolve_pds_endpoint(http, did) 294 + .await 295 + .map_err(|e| format!("PDS resolution failed for {did}: {e}"))?; 296 + 297 + // Fetch records 298 + let records = fetch_records(http, &pds, did, collection).await?; 299 + let count = records.len() as i32; 300 + 301 + debug!(did, collection, count, "fetched records from PDS"); 302 + 303 + // Upsert into DB 304 + for (uri, rkey, cid, value) in records { 305 + let _ = sqlx::query( 306 + r#" 307 + INSERT INTO records (uri, did, collection, rkey, record, cid) 308 + VALUES ($1, $2, $3, $4, $5, $6) 309 + ON CONFLICT (uri) DO UPDATE 310 + SET record = EXCLUDED.record, 311 + cid = EXCLUDED.cid 312 + "#, 313 + ) 314 + .bind(&uri) 315 + .bind(did) 316 + .bind(collection) 317 + .bind(&rkey) 318 + .bind(&value) 319 + .bind(&cid) 320 + .execute(db) 321 + .await 322 + .map_err(|e| format!("DB upsert failed for {uri}: {e}"))?; 323 + } 324 + 325 + Ok(count) 326 + } 327 + 328 + // --------------------------------------------------------------------------- 329 + // Background worker 330 + // --------------------------------------------------------------------------- 331 + 332 + /// Spawn a background task that polls for pending backfill jobs and runs them. 333 + pub fn spawn_worker(db: PgPool, http: reqwest::Client, relay_url: String) { 334 + tokio::spawn(async move { 335 + info!("backfill worker started"); 336 + loop { 337 + // Poll for a pending job 338 + let job: Option<(String,)> = sqlx::query_as( 339 + "SELECT id::text FROM backfill_jobs WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1", 340 + ) 341 + .fetch_optional(&db) 342 + .await 343 + .unwrap_or(None); 344 + 345 + if let Some((job_id,)) = job { 346 + info!(job = %job_id, "picked up backfill job"); 347 + if let Err(e) = run_job(&db, &http, &relay_url, &job_id).await { 348 + error!(job = %job_id, error = %e, "backfill job failed"); 349 + let _ = sqlx::query( 350 + "UPDATE backfill_jobs SET status = 'failed', completed_at = NOW(), error = $2 WHERE id::text = $1", 351 + ) 352 + .bind(&job_id) 353 + .bind(&e) 354 + .execute(&db) 355 + .await; 356 + } 357 + } else { 358 + // No pending jobs, wait before polling again 359 + tokio::time::sleep(std::time::Duration::from_secs(5)).await; 360 + } 361 + } 362 + }); 363 + }
+3
src/config.rs
··· 9 9 pub aip_url: String, 10 10 pub jetstream_url: String, 11 11 pub admin_secret: Option<String>, 12 + pub relay_url: String, 12 13 } 13 14 14 15 impl Config { ··· 26 27 jetstream_url: env::var("JETSTREAM_URL") 27 28 .unwrap_or_else(|_| "wss://jetstream2.us-west.bsky.network/subscribe".into()), 28 29 admin_secret: env::var("ADMIN_SECRET").ok(), 30 + relay_url: env::var("RELAY_URL") 31 + .unwrap_or_else(|_| "https://bsky.network".into()), 29 32 } 30 33 } 31 34
+2
src/main.rs
··· 1 1 mod admin; 2 2 mod auth; 3 + mod backfill; 3 4 mod config; 4 5 mod error; 5 6 mod jetstream; ··· 66 67 }; 67 68 68 69 jetstream::spawn(state.db.clone(), config.jetstream_url.clone(), collections_rx); 70 + backfill::spawn_worker(state.db.clone(), state.http.clone(), config.relay_url.clone()); 69 71 70 72 let app = server::router(state); 71 73 let addr = config.listen_addr();