this repo has no description
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Guard per-keeper SSE applies against a mid-event stop

The consumer loop read `started_flag` once at the top of each
iteration, then ran three separate lock-protected applies
(tree_keeper → workspace_keeper → inbox_keeper) against the same
event. If `stopSseConsumer` flipped the flag while the consumer was
awaiting any of those locks — and `wipeState` spawned its own task
to drain them — a final event could slot in *after* the drain and
re-inhabit the keepers with outgoing-session material.

Re-check the flag inside each apply step, *after* the relevant
keeper lock has been acquired. On a mid-event stop the consumer
bails instead of mutating drained state; the wipe task's drain then
races safely against nothing. Both orderings (consumer-wins-lock /
wipe-wins-lock) are now symmetric: the loser checks the flag and
returns.

Thread `started_flag` into `apply_keyring_to_workspace_keeper` and
`apply_grant_to_inbox_keeper` so they can perform the check-then-
act under their own lock. Inline the equivalent guard for the
tree_keeper apply in the main loop.

+36 -2
+36 -2
crates/opake-wasm/src/sse_wasm.rs
··· 400 400 } 401 401 } else { 402 402 let mut keeper = tree_keeper_rc.lock().await; 403 + // Re-check the flag after acquiring the lock: if the 404 + // consumer was stopped while we were waiting for it, 405 + // bail instead of re-inhabiting the tree the wipe 406 + // task is about to drain (or just drained). 407 + if !started_flag.get() { 408 + log::debug!("[sse] consumer stopped while awaiting tree_keeper lock"); 409 + break; 410 + } 403 411 if let Err(e) = keeper.apply_event(&event) { 404 412 log::warn!("[sse] tree_keeper apply failed: {e}"); 405 413 } ··· 409 417 // so subscribers see changes without an indexer round- 410 418 // trip. Idempotent upserts (same rotation + same data) 411 419 // don't re-fire watchers — see `WorkspaceKeeper::upsert`. 412 - apply_keyring_to_workspace_keeper(&opake_rc, &workspace_keeper_rc, &event).await; 420 + // 421 + // Same post-lock flag recheck as the tree apply above: 422 + // the helper bails internally if a stop landed while it 423 + // was waiting for the workspace_keeper / inbox_keeper 424 + // mutex. 425 + apply_keyring_to_workspace_keeper( 426 + &opake_rc, 427 + &workspace_keeper_rc, 428 + &started_flag, 429 + &event, 430 + ) 431 + .await; 413 432 414 433 // Grant events: apply to the inbox keeper so the 415 434 // "Shared with me" view updates live. 416 - apply_grant_to_inbox_keeper(&opake_rc, &inbox_keeper_rc, &event).await; 435 + apply_grant_to_inbox_keeper(&opake_rc, &inbox_keeper_rc, &started_flag, &event) 436 + .await; 417 437 } 418 438 // Task exited — clear the flag in case we broke on a 419 439 // transport error rather than an explicit stop, so a ··· 619 639 async fn apply_keyring_to_workspace_keeper( 620 640 opake_rc: &Rc<Mutex<Option<WasmOpake>>>, 621 641 workspace_keeper_rc: &Rc<Mutex<WorkspaceKeeper>>, 642 + started_flag: &Rc<std::cell::Cell<bool>>, 622 643 event: &SseEvent, 623 644 ) { 624 645 match event { ··· 645 666 wk::try_build_entry_from_sse_record(record, &did, &private_key) 646 667 }; 647 668 let mut keeper = workspace_keeper_rc.lock().await; 669 + if !started_flag.get() { 670 + return; 671 + } 648 672 keeper.apply_keyring_record(&record.uri, maybe_entry); 649 673 } 650 674 SseEvent::KeyringDelete(payload) => { 651 675 if let Some(uri) = payload.best_uri() { 652 676 let mut keeper = workspace_keeper_rc.lock().await; 677 + if !started_flag.get() { 678 + return; 679 + } 653 680 keeper.delete(uri); 654 681 } 655 682 } ··· 664 691 async fn apply_grant_to_inbox_keeper( 665 692 opake_rc: &Rc<Mutex<Option<WasmOpake>>>, 666 693 inbox_keeper_rc: &Rc<Mutex<InboxKeeper>>, 694 + started_flag: &Rc<std::cell::Cell<bool>>, 667 695 event: &SseEvent, 668 696 ) { 669 697 match event { ··· 684 712 return; 685 713 }; 686 714 let mut keeper = inbox_keeper_rc.lock().await; 715 + if !started_flag.get() { 716 + return; 717 + } 687 718 keeper.upsert(entry); 688 719 } 689 720 SseEvent::GrantDelete(payload) => { 690 721 if let Some(uri) = payload.best_uri() { 691 722 let mut keeper = inbox_keeper_rc.lock().await; 723 + if !started_flag.get() { 724 + return; 725 + } 692 726 keeper.delete(uri); 693 727 } 694 728 }