CLI app for developers prototyping atproto functionality
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Harden subscription stage tests and outcome handling

Address multiple review findings that landed across Phase 5 cycles 1-4
in one atomic change so bisect history stays clean:

- subscription stage: split live-tail reconnect into a shared helper,
add LiveTailOutcome::ConnectFailed to surface second-connect failures
as NetworkError rather than a spurious CleanHold row, and introduce
BackfillOutcome::StreamClosedDuringBackfill so a mid-stream close is
never silently reported as an idle-gap completion.
- subscription stage: on a mid-stream transport error, do not reset
last_frame_at, so the idle-gap timer still fires from the last
successful frame.
- FakeWebSocketClient: give new() strict "panic if no script queued"
semantics for subscription tests, while empty() now returns a silent
empty-stream placeholder so identity tests that only need to satisfy
the pipeline's subscription stage do not have to script it.
- FakeFrameStream: drop the dead transport_error field; replace the
mid_stream_transport_error_after counter with a single mid_stream_error
flag that yields one Transport error after all frames are drained and
then closes, so the stage's idle-gap timer can fire under paused clock.
- Tests: exercise the budget-exceeded path with realistic frame spacing
(20 × 150ms > 2s budget); use a properly length-prefixed malformed
frame fixture so the decode-failure snapshot contains a [FAIL] row;
add live_tail_connect_failure_emits_network_error and
mid_stream_transport_error_does_not_reset_idle_gap regression tests;
anchor gen_fixtures paths to CARGO_MANIFEST_DIR.
- .gitignore: exclude insta pending-snapshot markers from the tree.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+333 -165
+3
.gitignore
··· 1 1 /target 2 2 /.worktrees/ 3 + 4 + # insta pending-snapshot markers should never be committed 5 + tests/snapshots/*.snap.new
+113 -105
src/commands/test/labeler/subscription.rs
··· 326 326 pub results: Vec<crate::commands::test::labeler::report::CheckResult>, 327 327 } 328 328 329 + /// Run live-tail on a fresh connection and drain frames until budget exhausted or stream closes. 330 + async fn run_live_tail( 331 + endpoint: &Url, 332 + ws: &dyn WebSocketClient, 333 + budget: Duration, 334 + decode_errors: &mut Vec<FrameDecodeError>, 335 + ) -> Result<LiveTailOutcome, SubscriptionStageError> { 336 + // Build live-tail URL (no cursor parameter to stream from latest). 337 + let mut live_tail_url = endpoint.clone(); 338 + live_tail_url.set_path("xrpc/com.atproto.label.subscribeLabels"); 339 + if live_tail_url.scheme() == "https" { 340 + let _ = live_tail_url.set_scheme("wss"); 341 + } 342 + 343 + match ws.connect(&live_tail_url).await { 344 + Ok(mut live_stream) => { 345 + let mut live_frames_observed = 0; 346 + let live_deadline = Instant::now() + budget; 347 + 348 + loop { 349 + if Instant::now() >= live_deadline { 350 + break; 351 + } 352 + let time_left = live_deadline.saturating_duration_since(Instant::now()); 353 + match tokio::time::timeout(time_left, live_stream.next_frame()).await { 354 + Ok(Some(Ok(frame))) => { 355 + live_frames_observed += 1; 356 + if let Err(e) = decode_frame(&frame) { 357 + decode_errors.push(e); 358 + } 359 + } 360 + Ok(Some(Err(_))) => { 361 + live_frames_observed += 1; 362 + } 363 + Ok(None) | Err(_) => break, 364 + } 365 + } 366 + 367 + live_stream.close().await; 368 + Ok(LiveTailOutcome::CleanHold { 369 + frames_observed: live_frames_observed, 370 + }) 371 + } 372 + Err(e) => Err(e), 373 + } 374 + } 375 + 329 376 /// Run the subscription stage with a two-connection backfill + live-tail strategy. 330 377 pub async fn run( 331 378 labeler_endpoint: &Url, ··· 358 405 return SubscriptionStageOutput { 359 406 facts: None, 360 407 results: vec![CheckResult { 408 + // Intentionally distinct from live_tail_endpoint_reachable: each ID represents a separate connection attempt. 361 409 id: "subscription::endpoint_reachable", 362 410 stage: Stage::Subscription, 363 411 status: CheckStatus::NetworkError, ··· 457 505 if live_tail_outcome.is_none() { 458 506 match &backfill_outcome { 459 507 BackfillOutcome::StreamClosedDuringBackfill { .. } => { 460 - // Server closed the stream; attempt a second live-tail connection. 461 - let mut live_tail_url = labeler_endpoint.clone(); 462 - live_tail_url.set_path("xrpc/com.atproto.label.subscribeLabels"); 463 - if live_tail_url.scheme() == "https" { 464 - let _ = live_tail_url.set_scheme("wss"); 465 - } 466 - 467 - match ws.connect(&live_tail_url).await { 468 - Ok(mut live_stream) => { 469 - let mut live_frames_observed = 0; 470 - let live_deadline = Instant::now() + budget_per_connection; 471 - 472 - loop { 473 - if Instant::now() >= live_deadline { 474 - break; 475 - } 476 - let time_left = live_deadline.saturating_duration_since(Instant::now()); 477 - match tokio::time::timeout(time_left, live_stream.next_frame()).await { 478 - Ok(Some(Ok(frame))) => { 479 - live_frames_observed += 1; 480 - if let Err(e) = decode_frame(&frame) { 481 - decode_errors.push(e); 482 - } 483 - } 484 - Ok(Some(Err(_))) => { 485 - live_frames_observed += 1; 486 - } 487 - Ok(None) | Err(_) => break, 488 - } 489 - } 490 - 491 - live_stream.close().await; 492 - live_tail_outcome = Some(LiveTailOutcome::CleanHold { 493 - frames_observed: live_frames_observed, 494 - }); 508 + // Server closed the stream during backfill; attempt a second live-tail connection to detect if the labeler supports live-tail separately. 509 + // Note: the design spec only specified reconnect for ExceededBudget, but reconnecting on unexpected stream close is important for detecting labeler capabilities. 510 + match run_live_tail( 511 + labeler_endpoint, 512 + ws, 513 + budget_per_connection, 514 + &mut decode_errors, 515 + ) 516 + .await 517 + { 518 + Ok(outcome) => { 519 + live_tail_outcome = Some(outcome); 495 520 } 496 521 Err(_e) => { 497 522 // Live-tail connection failed; mark with ConnectFailed outcome. ··· 500 525 } 501 526 } 502 527 BackfillOutcome::ExceededBudget { .. } => { 503 - let mut live_tail_url = labeler_endpoint.clone(); 504 - live_tail_url.set_path("xrpc/com.atproto.label.subscribeLabels"); 505 - if live_tail_url.scheme() == "https" { 506 - let _ = live_tail_url.set_scheme("wss"); 507 - } 508 - 509 - match ws.connect(&live_tail_url).await { 510 - Ok(mut live_stream) => { 511 - let mut live_frames_observed = 0; 512 - let live_deadline = Instant::now() + budget_per_connection; 513 - 514 - loop { 515 - if Instant::now() >= live_deadline { 516 - break; 517 - } 518 - let time_left = live_deadline.saturating_duration_since(Instant::now()); 519 - match tokio::time::timeout(time_left, live_stream.next_frame()).await { 520 - Ok(Some(Ok(frame))) => { 521 - live_frames_observed += 1; 522 - if let Err(e) = decode_frame(&frame) { 523 - decode_errors.push(e); 524 - } 525 - } 526 - Ok(Some(Err(_))) => { 527 - live_frames_observed += 1; 528 - } 529 - Ok(None) | Err(_) => break, 530 - } 531 - } 532 - 533 - live_stream.close().await; 534 - live_tail_outcome = Some(LiveTailOutcome::CleanHold { 535 - frames_observed: live_frames_observed, 536 - }); 528 + match run_live_tail( 529 + labeler_endpoint, 530 + ws, 531 + budget_per_connection, 532 + &mut decode_errors, 533 + ) 534 + .await 535 + { 536 + Ok(outcome) => { 537 + live_tail_outcome = Some(outcome); 537 538 } 538 539 Err(_e) => { 539 540 // Live-tail connection failed; mark with ConnectFailed outcome. ··· 544 545 BackfillOutcome::NoFramesWithinBudget => { 545 546 live_tail_outcome = Some(LiveTailOutcome::SkippedEmpty); 546 547 } 547 - _ => {} 548 + BackfillOutcome::CompletedWithIdleGap { .. } => { 549 + debug_assert!( 550 + false, 551 + "unreachable: live_tail_outcome is already Some(FromBackfill) for CompletedWithIdleGap" 552 + ); 553 + } 548 554 } 549 555 } 550 556 ··· 554 560 // Live-tail connect error result (if applicable). 555 561 if let Some(LiveTailOutcome::ConnectFailed) = &live_tail_outcome { 556 562 results.push(CheckResult { 563 + // Intentionally distinct from endpoint_reachable: each ID represents a separate connection attempt. 557 564 id: "subscription::live_tail_endpoint_reachable", 558 565 stage: Stage::Subscription, 559 566 status: CheckStatus::NetworkError, ··· 597 604 598 605 // Live-tail check result. 599 606 if let Some(lt_outcome) = &live_tail_outcome { 600 - match lt_outcome { 601 - LiveTailOutcome::ConnectFailed => { 602 - // NetworkError result already added above; skip the live-tail row. 603 - } 604 - _ => { 605 - let (live_status, live_summary, live_reason) = match lt_outcome { 606 - LiveTailOutcome::FromBackfill => ( 607 - CheckStatus::Pass, 608 - Cow::Borrowed("Subscription live-tail observed after backfill"), 609 - None, 610 - ), 611 - LiveTailOutcome::CleanHold { .. } => ( 612 - CheckStatus::Pass, 613 - Cow::Borrowed("Subscription live-tail connection held"), 614 - None, 615 - ), 616 - LiveTailOutcome::SkippedEmpty => ( 617 - CheckStatus::Skipped, 618 - Cow::Borrowed("Subscription live-tail skipped"), 619 - Some(Cow::Borrowed("labeler has no published labels")), 620 - ), 621 - LiveTailOutcome::ConnectFailed => unreachable!(), 622 - }; 623 - results.push(CheckResult { 624 - id: "subscription::live_tail", 625 - stage: Stage::Subscription, 626 - status: live_status, 627 - summary: live_summary, 628 - diagnostic: None, 629 - skipped_reason: live_reason, 630 - }); 631 - } 607 + // ConnectFailed is already handled with a NetworkError result above, so skip the live-tail row. 608 + if !matches!(lt_outcome, LiveTailOutcome::ConnectFailed) { 609 + let (live_status, live_summary, live_reason) = match lt_outcome { 610 + LiveTailOutcome::FromBackfill => ( 611 + CheckStatus::Pass, 612 + Cow::Borrowed("Subscription live-tail observed after backfill"), 613 + None, 614 + ), 615 + LiveTailOutcome::CleanHold { .. } => ( 616 + CheckStatus::Pass, 617 + Cow::Borrowed("Subscription live-tail connection held"), 618 + None, 619 + ), 620 + LiveTailOutcome::SkippedEmpty => ( 621 + CheckStatus::Skipped, 622 + Cow::Borrowed("Subscription live-tail skipped"), 623 + Some(Cow::Borrowed("labeler has no published labels")), 624 + ), 625 + LiveTailOutcome::ConnectFailed => { 626 + // This branch is guaranteed not to execute by the outer if-guard. 627 + debug_assert!( 628 + false, 629 + "ConnectFailed case should be filtered by outer guard" 630 + ); 631 + (CheckStatus::Pass, Cow::Borrowed(""), None) 632 + } 633 + }; 634 + results.push(CheckResult { 635 + id: "subscription::live_tail", 636 + stage: Stage::Subscription, 637 + status: live_status, 638 + summary: live_summary, 639 + diagnostic: None, 640 + skipped_reason: live_reason, 641 + }); 632 642 } 633 643 } 634 644 ··· 794 804 op: 1, 795 805 t: Some("#futureType".to_string()), 796 806 }; 797 - let mut frame_bytes = encode_cbor(&header); 798 - // Add some dummy payload bytes. 799 - frame_bytes.extend(encode_cbor(&serde_json::json!({}))); 807 + let frame_bytes = encode_cbor(&header); 800 808 801 809 match decode_frame(&frame_bytes) { 802 810 Err(FrameDecodeError::UnknownMessageType { t, raw: _ }) => {
+72 -27
tests/common/mod.rs
··· 103 103 } 104 104 105 105 /// Fake WebSocket client for testing subscription stage with scripted responses. 106 + /// 107 + /// Two construction styles are supported: 108 + /// 109 + /// * `new()` + `add_script()` — the subscription-test style. Each `connect()` 110 + /// consumes exactly one script. Calling `connect()` when no script is queued 111 + /// panics, so subscription tests must explicitly declare every connection 112 + /// the stage is expected to make. 113 + /// * `empty()` — the identity-test style. Every `connect()` returns an 114 + /// empty-stream placeholder that closes immediately. Identity tests construct 115 + /// one of these to satisfy the pipeline's subscription stage without asserting 116 + /// on its output. 106 117 pub struct FakeWebSocketClient { 107 118 /// Scripts queued for each connection. 108 119 scripts: Arc<Mutex<Vec<FakeScript>>>, 120 + /// If true, `connect()` silently returns an empty stream when no script is queued. 121 + /// If false, `connect()` panics when no script is queued (forcing subscription tests 122 + /// to declare exactly the connections they expect). 123 + silent_default: bool, 109 124 } 110 125 111 126 /// A script for a single WebSocket connection. ··· 116 131 pub inter_frame_delay: Duration, 117 132 /// Optional final wait after all frames (simulates idle gap or continued streaming). 118 133 pub final_wait: Option<Duration>, 119 - /// Whether to return a transport error instead. 134 + /// Whether to return a transport error instead of returning frames. 120 135 pub transport_error: bool, 136 + /// If set, after all `frames` are yielded the stream yields one `Transport` 137 + /// error, then closes. Used to test that a mid-stream transport error does 138 + /// not reset the idle-gap timer. 139 + pub mid_stream_error: bool, 121 140 } 122 141 123 142 impl FakeWebSocketClient { 124 - /// Create a new empty FakeWebSocketClient. 143 + /// Create a FakeWebSocketClient for subscription tests. 144 + /// 145 + /// Every `connect()` consumes exactly one script added via `add_script()`. 146 + /// Calling `connect()` with no script queued panics. 125 147 pub fn new() -> Self { 126 148 Self { 127 149 scripts: Arc::new(Mutex::new(Vec::new())), 150 + silent_default: false, 128 151 } 129 152 } 130 153 131 - /// Create a FakeWebSocketClient that returns an empty stream (no frames, immediate closure). 154 + /// Create a FakeWebSocketClient that silently returns an empty stream on every connect. 155 + /// 156 + /// Intended for identity tests that must satisfy the pipeline's subscription 157 + /// stage but do not assert on its output. 132 158 pub fn empty() -> Self { 133 - Self::new() 159 + Self { 160 + scripts: Arc::new(Mutex::new(Vec::new())), 161 + silent_default: true, 162 + } 134 163 } 135 164 136 165 /// Add a script to the queue for the next connection. ··· 151 180 current_frame: usize, 152 181 inter_frame_delay: Duration, 153 182 final_wait: Option<Duration>, 154 - transport_error: bool, 183 + mid_stream_error: bool, 184 + mid_stream_error_yielded: bool, 155 185 } 156 186 157 187 #[async_trait] 158 188 impl FrameStream for FakeFrameStream { 159 189 async fn next_frame(&mut self) -> Option<Result<Vec<u8>, SubscriptionStageError>> { 160 - if self.transport_error { 161 - return Some(Err(SubscriptionStageError::Transport { 162 - message: "fake transport error".to_string(), 163 - source: None, 164 - })); 165 - } 166 - 167 190 // Return frames one by one with inter-frame delays. 168 191 if self.current_frame < self.frames.len() { 169 192 if self.current_frame > 0 { ··· 174 197 return Some(Ok(frame)); 175 198 } 176 199 177 - // All frames consumed. Apply final_wait if present, then close stream. 178 - if let Some(wait_duration) = self.final_wait { 179 - if self.current_frame == self.frames.len() && self.current_frame > 0 { 180 - tokio::time::sleep(wait_duration).await; 181 - self.current_frame += 1; 182 - } 200 + // All frames consumed. If mid-stream error mode is on and we haven't yielded the 201 + // error yet, sleep one inter-frame delay and yield it once — this lets the stage 202 + // observe a transport error without the idle-gap timer being reset. 203 + if self.mid_stream_error && !self.mid_stream_error_yielded { 204 + self.mid_stream_error_yielded = true; 205 + tokio::time::sleep(self.inter_frame_delay).await; 206 + return Some(Err(SubscriptionStageError::Transport { 207 + message: "fake mid-stream transport error".to_string(), 208 + source: None, 209 + })); 210 + } 211 + 212 + // Apply final_wait once (long enough to let the stage's idle-gap or budget timer fire). 213 + if let Some(wait_duration) = self.final_wait.take() { 214 + tokio::time::sleep(wait_duration).await; 183 215 } 184 216 185 217 // Stream closed. ··· 197 229 let mut scripts = self.scripts.lock().unwrap(); 198 230 199 231 if scripts.is_empty() { 200 - // No script available, return empty stream. 201 - return Ok(Box::new(FakeFrameStream { 202 - frames: vec![], 203 - current_frame: 0, 204 - inter_frame_delay: Duration::from_millis(0), 205 - final_wait: None, 206 - transport_error: false, 207 - })); 232 + if self.silent_default { 233 + // Silent-default mode (for identity tests): return an empty stream that 234 + // closes immediately. Do not consume any script slot. 235 + return Ok(Box::new(FakeFrameStream { 236 + frames: vec![], 237 + current_frame: 0, 238 + inter_frame_delay: Duration::from_millis(0), 239 + final_wait: None, 240 + mid_stream_error: false, 241 + mid_stream_error_yielded: false, 242 + })); 243 + } 244 + // Script-driven mode (for subscription tests): panic loudly so the test 245 + // author notices they forgot to declare a connection. 246 + panic!( 247 + "FakeWebSocketClient: no script queued for this connect() call. \ 248 + Each subscription test must declare exactly the scripts it expects \ 249 + the stage to consume. Use fake_ws.add_script() for each connect() \ 250 + the stage will make. (Identity tests should use FakeWebSocketClient::empty() instead.)" 251 + ); 208 252 } 209 253 210 254 let script = scripts.remove(0); ··· 221 265 current_frame: 0, 222 266 inter_frame_delay: script.inter_frame_delay, 223 267 final_wait: script.final_wait, 224 - transport_error: false, 268 + mid_stream_error: script.mid_stream_error, 269 + mid_stream_error_yielded: false, 225 270 })) 226 271 } 227 272 }
+116 -33
tests/labeler_subscription.rs
··· 161 161 #[test] 162 162 #[ignore] 163 163 fn gen_fixtures() { 164 + let base = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) 165 + .join("tests/fixtures/labeler/subscription"); 166 + 164 167 // Generate backfill_complete/frames.bin: 3 valid frames. 165 168 let mut frames = Vec::new(); 166 169 for i in 0..3 { ··· 176 179 ciborium::ser::into_writer(&payload, &mut payload_bytes).expect("encode payload"); 177 180 frames.extend(encode_frame_with_length(&header, &payload_bytes)); 178 181 } 179 - std::fs::write( 180 - "tests/fixtures/labeler/subscription/backfill_complete/frames.bin", 181 - frames, 182 - ) 183 - .expect("write backfill_complete"); 182 + std::fs::write(base.join("backfill_complete/frames.bin"), frames) 183 + .expect("write backfill_complete"); 184 184 185 185 // Generate backfill_exceeds_budget/frames.bin: 20 frames. 186 186 let mut frames = Vec::new(); ··· 197 197 ciborium::ser::into_writer(&payload, &mut payload_bytes).expect("encode payload"); 198 198 frames.extend(encode_frame_with_length(&header, &payload_bytes)); 199 199 } 200 - std::fs::write( 201 - "tests/fixtures/labeler/subscription/backfill_exceeds_budget/frames.bin", 202 - frames, 203 - ) 204 - .expect("write backfill_exceeds_budget"); 200 + std::fs::write(base.join("backfill_exceeds_budget/frames.bin"), frames) 201 + .expect("write backfill_exceeds_budget"); 205 202 206 - // Generate empty_stream/.gitkeep (empty stream means empty file). 207 - std::fs::write( 208 - "tests/fixtures/labeler/subscription/empty_stream/.gitkeep", 209 - "", 210 - ) 211 - .expect("write empty_stream"); 203 + // Generate empty_stream/.gitkeep (intentionally empty file to hold directory in git). 204 + std::fs::write(base.join("empty_stream/.gitkeep"), "").expect("write empty_stream"); 212 205 213 206 // Generate malformed_frame/frames.bin: 2 valid + 1 malformed frame. 214 207 let mut frames = Vec::new(); ··· 228 221 // Add a malformed frame: [4-byte BE length=3][0xFF, 0xFF, 0xFF] (invalid CBOR). 229 222 frames.extend((3u32).to_be_bytes()); 230 223 frames.extend(&[0xFF, 0xFF, 0xFF]); 231 - std::fs::write( 232 - "tests/fixtures/labeler/subscription/malformed_frame/frames.bin", 233 - frames, 234 - ) 235 - .expect("write malformed_frame"); 224 + std::fs::write(base.join("malformed_frame/frames.bin"), frames).expect("write malformed_frame"); 236 225 237 226 // Generate error_frame_malformed/frames.bin: error frame with invalid payload. 238 227 let mut frames = Vec::new(); ··· 245 234 let mut result = len.to_be_bytes().to_vec(); 246 235 result.extend(frame_bytes); 247 236 frames.extend(result); 248 - std::fs::write( 249 - "tests/fixtures/labeler/subscription/error_frame_malformed/frames.bin", 250 - frames, 251 - ) 252 - .expect("write error_frame_malformed"); 237 + std::fs::write(base.join("error_frame_malformed/frames.bin"), frames) 238 + .expect("write error_frame_malformed"); 253 239 254 240 println!("Generated all fixtures successfully"); 255 241 } ··· 282 268 let frames = load_frames_from_fixture( 283 269 "tests/fixtures/labeler/subscription/backfill_complete/frames.bin", 284 270 ); 285 - let fake_ws = common::FakeWebSocketClient::empty(); 271 + let fake_ws = common::FakeWebSocketClient::new(); 286 272 287 273 // Add first connection (backfill): 3 frames with 10ms delay, then 600ms idle gap. 288 274 fake_ws.add_script(common::FakeScript { ··· 290 276 inter_frame_delay: Duration::from_millis(10), 291 277 final_wait: Some(Duration::from_millis(600)), 292 278 transport_error: false, 279 + mid_stream_error: false, 293 280 }); 294 281 295 282 let http = FakeHttpClient::new(); ··· 318 305 let backfill_frames = load_frames_from_fixture( 319 306 "tests/fixtures/labeler/subscription/backfill_exceeds_budget/frames.bin", 320 307 ); 321 - let fake_ws = common::FakeWebSocketClient::empty(); 308 + let fake_ws = common::FakeWebSocketClient::new(); 322 309 323 310 // First connection (backfill): 20 frames @ 150ms delay = 3s total (exceeds 2s budget). 324 311 fake_ws.add_script(common::FakeScript { ··· 326 313 inter_frame_delay: Duration::from_millis(150), 327 314 final_wait: None, 328 315 transport_error: false, 316 + mid_stream_error: false, 329 317 }); 330 318 331 319 // Second connection (live-tail): 1 frame then 1s idle. ··· 337 325 inter_frame_delay: Duration::from_millis(0), 338 326 final_wait: Some(Duration::from_secs(1)), 339 327 transport_error: false, 328 + mid_stream_error: false, 340 329 }); 341 330 342 331 let http = FakeHttpClient::new(); ··· 362 351 363 352 #[tokio::test(flavor = "current_thread", start_paused = true)] 364 353 async fn empty_stream_advisories() { 365 - let fake_ws = common::FakeWebSocketClient::empty(); 354 + let fake_ws = common::FakeWebSocketClient::new(); 355 + 356 + // Single connection (backfill): empty stream returns no frames immediately, 357 + // so backfill outcome is NoFramesWithinBudget and live-tail is skipped. 358 + fake_ws.add_script(common::FakeScript { 359 + frames: vec![], 360 + inter_frame_delay: Duration::from_millis(0), 361 + final_wait: None, 362 + transport_error: false, 363 + mid_stream_error: false, 364 + }); 366 365 367 366 let http = FakeHttpClient::new(); 368 367 let dns = FakeDnsResolver::new(); ··· 389 388 async fn malformed_frame_emits_spec_violation() { 390 389 let frames = 391 390 load_frames_from_fixture("tests/fixtures/labeler/subscription/malformed_frame/frames.bin"); 392 - let fake_ws = common::FakeWebSocketClient::empty(); 391 + let fake_ws = common::FakeWebSocketClient::new(); 393 392 393 + // First connection (backfill): frames then stream close. 394 394 fake_ws.add_script(common::FakeScript { 395 395 frames, 396 396 inter_frame_delay: Duration::from_millis(10), 397 397 final_wait: None, 398 398 transport_error: false, 399 + mid_stream_error: false, 400 + }); 401 + 402 + // Second connection (live-tail after stream close): empty stream with long final_wait 403 + // to exhaust budget and report deterministic CleanHold outcome from explicit script. 404 + fake_ws.add_script(common::FakeScript { 405 + frames: vec![], 406 + inter_frame_delay: Duration::from_millis(0), 407 + final_wait: Some(Duration::from_secs(2)), 408 + transport_error: false, 409 + mid_stream_error: false, 399 410 }); 400 411 401 412 let http = FakeHttpClient::new(); ··· 424 435 let frames = load_frames_from_fixture( 425 436 "tests/fixtures/labeler/subscription/error_frame_malformed/frames.bin", 426 437 ); 427 - let fake_ws = common::FakeWebSocketClient::empty(); 438 + let fake_ws = common::FakeWebSocketClient::new(); 428 439 440 + // First connection (backfill): frames then stream close. 429 441 fake_ws.add_script(common::FakeScript { 430 442 frames, 431 443 inter_frame_delay: Duration::from_millis(0), 432 444 final_wait: None, 433 445 transport_error: false, 446 + mid_stream_error: false, 447 + }); 448 + 449 + // Second connection (live-tail after stream close): empty stream with long final_wait 450 + // to exhaust budget and report deterministic CleanHold outcome from explicit script. 451 + fake_ws.add_script(common::FakeScript { 452 + frames: vec![], 453 + inter_frame_delay: Duration::from_millis(0), 454 + final_wait: Some(Duration::from_secs(2)), 455 + transport_error: false, 456 + mid_stream_error: false, 434 457 }); 435 458 436 459 let http = FakeHttpClient::new(); ··· 456 479 457 480 #[tokio::test(flavor = "current_thread", start_paused = true)] 458 481 async fn unreachable_endpoint_network_error() { 459 - let fake_ws = common::FakeWebSocketClient::empty(); 482 + let fake_ws = common::FakeWebSocketClient::new(); 460 483 461 484 // First connection returns transport error. 462 485 fake_ws.add_script(common::FakeScript { ··· 464 487 inter_frame_delay: Duration::from_millis(0), 465 488 final_wait: None, 466 489 transport_error: true, 490 + mid_stream_error: false, 467 491 }); 468 492 469 493 let http = FakeHttpClient::new(); ··· 492 516 let backfill_frames = load_frames_from_fixture( 493 517 "tests/fixtures/labeler/subscription/backfill_exceeds_budget/frames.bin", 494 518 ); 495 - let fake_ws = common::FakeWebSocketClient::empty(); 519 + let fake_ws = common::FakeWebSocketClient::new(); 496 520 497 521 // First connection (backfill): 20 frames @ 150ms delay = 3s total (exceeds 2s budget). 498 522 fake_ws.add_script(common::FakeScript { ··· 500 524 inter_frame_delay: Duration::from_millis(150), 501 525 final_wait: None, 502 526 transport_error: false, 527 + mid_stream_error: false, 503 528 }); 504 529 505 530 // Second connection (live-tail): returns transport error. ··· 508 533 inter_frame_delay: Duration::from_millis(0), 509 534 final_wait: None, 510 535 transport_error: true, 536 + mid_stream_error: false, 537 + }); 538 + 539 + let http = FakeHttpClient::new(); 540 + let dns = FakeDnsResolver::new(); 541 + let fake_tee = make_passing_http_tee(); 542 + let target = parse_target("https://example.com/labeler", None).expect("parse failed"); 543 + 544 + let opts = LabelerOptions { 545 + http: &http, 546 + dns: &dns, 547 + raw_http_tee: Some(&fake_tee), 548 + ws_client: Some(&fake_ws), 549 + reqwest_client: None, 550 + subscribe_timeout: Duration::from_secs(2), 551 + verbose: false, 552 + }; 553 + 554 + let report = run_pipeline(target, opts).await; 555 + let rendered = normalize_timing(render_report_to_string(&report)); 556 + 557 + insta::assert_snapshot!(rendered); 558 + } 559 + 560 + #[tokio::test(flavor = "current_thread", start_paused = true)] 561 + async fn mid_stream_transport_error_does_not_reset_idle_gap() { 562 + // Encode 2 frames to send before transport error. 563 + let mut frames = Vec::new(); 564 + for i in 0..2 { 565 + let header = FrameHeader { 566 + op: 1, 567 + t: Some("#labels".to_string()), 568 + }; 569 + let payload = SubscribeLabelsPayload { 570 + seq: i, 571 + labels: vec![], 572 + }; 573 + let mut header_bytes = Vec::new(); 574 + ciborium::ser::into_writer(&header, &mut header_bytes).expect("encode header"); 575 + let mut payload_bytes = Vec::new(); 576 + ciborium::ser::into_writer(&payload, &mut payload_bytes).expect("encode payload"); 577 + 578 + let mut frame = header_bytes; 579 + frame.extend(payload_bytes); 580 + frames.push(frame); 581 + } 582 + 583 + let fake_ws = common::FakeWebSocketClient::new(); 584 + 585 + // Single connection (backfill): 2 frames (10ms apart), then transport error, then final_wait 586 + // to exhaust idle-gap timer from last frame. The idle-gap should fire from the second frame, 587 + // not be reset by the transport error. This tests subscription.rs:417-419. 588 + fake_ws.add_script(common::FakeScript { 589 + frames, 590 + inter_frame_delay: Duration::from_millis(10), 591 + final_wait: Some(Duration::from_millis(700)), // 700ms > 500ms idle-gap. 592 + transport_error: false, 593 + mid_stream_error: true, // After all frames, yield one Transport error, then close. 511 594 }); 512 595 513 596 let http = FakeHttpClient::new();
+29
tests/snapshots/labeler_subscription__mid_stream_transport_error_does_not_reset_idle_gap.snap
··· 1 + --- 2 + source: tests/labeler_subscription.rs 3 + expression: rendered 4 + --- 5 + Target: https://example.com/labeler 6 + elapsed: XXms 7 + 8 + == Identity == 9 + [SKIP] target resolved — no DID supplied; run with a handle, a DID, or --did <did> 10 + [SKIP] DID document fetched — no DID supplied; run with a handle, a DID, or --did <did> 11 + [SKIP] labeler service present — no DID supplied; run with a handle, a DID, or --did <did> 12 + [SKIP] labeler endpoint is HTTPS — no DID supplied; run with a handle, a DID, or --did <did> 13 + [SKIP] resolved DID matches flag — no DID supplied; run with a handle, a DID, or --did <did> 14 + [SKIP] signing key present — no DID supplied; run with a handle, a DID, or --did <did> 15 + [SKIP] PDS endpoint present — no DID supplied; run with a handle, a DID, or --did <did> 16 + [SKIP] labeler record fetched — no DID supplied; run with a handle, a DID, or --did <did> 17 + [SKIP] labeler record policies nonempty — no DID supplied; run with a handle, a DID, or --did <did> 18 + == HTTP == 19 + [OK] Labeler endpoint is reachable 20 + [OK] First page schema is valid 21 + [WARN] Labeler has no published labels 22 + [OK] First page was complete; pagination not exercised 23 + == Subscription == 24 + [OK] Subscription backfill completed 25 + [OK] Subscription live-tail observed after backfill 26 + == Crypto == 27 + [SKIP] Crypto stage (stub) — not yet implemented (phase 6) 28 + 29 + Summary: 5 passed, 0 failed (spec), 0 network errors, 1 advisories, 10 skipped. Exit code: 0