Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver
66
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(consumer): do the rev check in rust instead of DB and don't transaction bf items

Mia 106a6694 bf5930db

+42 -50
+15 -20
consumer/src/indexer/db.rs
··· 57 57 .await 58 58 } 59 59 60 + 61 + pub async fn get_repo_info( 62 + conn: &mut AsyncPgConnection, 63 + repo: &str, 64 + ) -> QueryResult<Option<(Option<String>, types::ActorSyncState)>> { 65 + schema::actors::table 66 + .select((schema::actors::repo_rev, schema::actors::sync_state)) 67 + .find(repo) 68 + .get_result(conn) 69 + .await 70 + .optional() 71 + } 72 + 60 73 pub async fn write_backfill_job(conn: &mut AsyncPgConnection, repo: &str) -> QueryResult<usize> { 61 74 diesel::insert_into(schema::backfill_jobs::table) 62 75 .values(( ··· 92 105 .await 93 106 } 94 107 95 - pub async fn account_sync_status( 96 - conn: &mut AsyncPgConnection, 97 - did: &str, 98 - ) -> QueryResult<Option<types::ActorSyncState>> { 99 - schema::actors::table 100 - .select(schema::actors::sync_state) 101 - .for_update() 102 - .find(did) 103 - .get_result(conn) 104 - .await 105 - .optional() 106 - } 107 - 108 108 pub async fn account_status_and_rev( 109 109 conn: &mut AsyncPgConnection, 110 110 did: &str, ··· 125 125 repo: &str, 126 126 rev: &str, 127 127 cid: Cid, 128 - ) -> QueryResult<bool> { 128 + ) -> QueryResult<usize> { 129 129 diesel::update(schema::actors::table) 130 130 .set(( 131 131 schema::actors::repo_rev.eq(rev), 132 132 schema::actors::repo_cid.eq(cid.to_string()), 133 133 )) 134 134 .filter( 135 - schema::actors::did.eq(repo).and( 136 - schema::actors::repo_rev 137 - .lt(rev) 138 - .or(schema::actors::repo_rev.is_null()), 139 - ), 135 + schema::actors::did.eq(repo) 140 136 ) 141 137 .execute(conn) 142 138 .await 143 - .map(|v| v == 1) 144 139 } 145 140 146 141 pub async fn insert_block(
+27 -30
consumer/src/indexer/mod.rs
··· 277 277 conn: &mut AsyncPgConnection, 278 278 commit: AtpCommitEvent, 279 279 ) -> eyre::Result<()> { 280 + let (current_rev, sync_status) = db::get_repo_info(conn, &commit.repo).await?.unzip(); 281 + 280 282 // what's the backfill status of this account? this respects locks held by the backfiller. 281 283 // we should drop events for 'dirty' and queue 'processing' 282 284 // if we can't find the actor, we should add them and then drop the event as with 'dirty' 283 - let is_active = match db::account_sync_status(conn, &commit.repo).await? { 285 + let is_active = match sync_status { 284 286 Some(ActorSyncState::Synced) => true, 285 287 Some(ActorSyncState::Processing) => false, 286 288 Some(ActorSyncState::Dirty) => { ··· 325 327 } 326 328 }; 327 329 330 + if let Some(current_rev) = current_rev.flatten() { 331 + if current_rev >= commit.rev { 332 + tracing::debug!("Got a repo update older than the current rev. not processing ops."); 333 + return Ok(()); 334 + } 335 + } 336 + 328 337 // turn the car slice into a map of cid:block 329 338 let car_reader = iroh_car::CarReader::new(commit.blocks.as_slice()).await?; 330 339 let blocks = car_reader ··· 333 342 .collect::<HashMap<Cid, Vec<u8>>>() 334 343 .await; 335 344 336 - conn.transaction::<_, diesel::result::Error, _>(|t| { 337 - Box::pin(async move { 338 - if is_active { 339 - // ensure new repo rev > current rev, but only when the repo is synced 340 - // (if repo is processing, they'll end up on the queue anyway) 341 - let updated = 342 - db::update_repo_version(t, &commit.repo, &commit.rev, commit.commit).await?; 343 - 344 - if !updated { 345 - tracing::debug!( 346 - "Got a repo update older than the current rev. not processing ops." 347 - ); 348 - return Ok(false); 349 - } 345 + if is_active { 346 + conn.transaction::<_, diesel::result::Error, _>(|t| { 347 + Box::pin(async move { 348 + db::update_repo_version(t, &commit.repo, &commit.rev, commit.commit).await?; 350 349 351 350 for op in &commit.ops { 352 351 process_op(t, &mut state.idxc_tx, &commit.repo, op, &blocks).await?; 353 352 } 354 - } else { 355 - let items = commit 356 - .ops 357 - .iter() 358 - .filter_map(|op| process_op_bf(&commit.repo, op, &blocks)) 359 - .collect::<Vec<_>>(); 360 - 361 - let items = serde_json::to_value(items).unwrap_or_default(); 362 - 363 - db::write_backfill_row(t, &commit.repo, &commit.rev, commit.commit, items).await?; 364 - } 365 - 366 - Ok(true) 353 + Ok(true) 354 + }) 367 355 }) 368 - }) 369 - .await?; 356 + .await?; 357 + } else { 358 + let items = commit 359 + .ops 360 + .iter() 361 + .filter_map(|op| process_op_bf(&commit.repo, op, &blocks)) 362 + .collect::<Vec<_>>(); 363 + let items = serde_json::to_value(items).unwrap_or_default(); 364 + 365 + db::write_backfill_row(conn, &commit.repo, &commit.rev, commit.commit, items).await?; 366 + } 370 367 371 368 Ok(()) 372 369 }