CLI app for developers prototyping atproto functionality
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement subscription stage with two-connection strategy

Adds FrameStream and WebSocketClient traits for testable WebSocket
access. Implements RealWebSocketClient and RealFrameStream using
tokio-tungstenite. Implements the main subscription::run() function
with two-connection backfill + live-tail strategy, idle-gap detection,
and frame decode error tracking. Wires the subscription stage into
the pipeline driver, replacing the stub with real calls.

Fix identity test regression after subscription stage landing

Add WebSocketClient injection seam to LabelerOptions following the
raw_http_tee pattern. Exit the subscription drain loop on stream
closure (next_frame returning None), treating zero frames observed
as NoFramesWithinBudget without waiting for the full budget.
Identity integration tests inject an empty FakeWebSocketClient so the
subscription stage produces deterministic Advisory/Skipped rows in
their snapshots instead of triggering a real WebSocket connection.

All 12 identity tests now pass with updated snapshots reflecting the
new subscription stage output.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>

+659 -31
+1
src/commands/test/labeler.rs
··· 70 70 http: &http, 71 71 dns: &dns, 72 72 raw_http_tee: None, 73 + ws_client: None, 73 74 reqwest_client: Some(&reqwest_client), 74 75 subscribe_timeout: self.subscribe_timeout, 75 76 verbose: self.verbose,
+36 -13
src/commands/test/labeler/pipeline.rs
··· 12 12 use crate::commands::test::labeler::report::{ 13 13 CheckResult, CheckStatus, LabelerReport, ReportHeader, Stage, 14 14 }; 15 + use crate::commands::test::labeler::subscription::{self, RealWebSocketClient}; 15 16 use crate::common::identity::{Did, DnsResolver, HttpClient}; 16 17 17 18 /// A labeler target: either a resolvable identifier (handle or DID) or a raw endpoint URL. ··· 50 51 pub dns: &'a dyn DnsResolver, 51 52 /// HTTP tee for the labeler endpoint (optional, defaults to RealHttpTee). 52 53 pub raw_http_tee: Option<&'a dyn http::RawHttpTee>, 54 + /// WebSocket client for subscription stage (optional, defaults to RealWebSocketClient). 55 + pub ws_client: Option<&'a dyn subscription::WebSocketClient>, 53 56 /// Shared reqwest client for both identity and HTTP stages (optional). 54 57 pub reqwest_client: Option<&'a reqwest::Client>, 55 58 /// Per-connection time budget for subscription checks. ··· 238 241 }; 239 242 240 243 // Run the HTTP stage if we have an endpoint. 241 - if let Some(endpoint) = labeler_endpoint { 244 + if let Some(ref endpoint) = labeler_endpoint { 242 245 let http_output = if let Some(tee) = opts.raw_http_tee { 243 246 // Use the supplied tee (for testing). 244 247 http::run(tee).await ··· 277 280 }); 278 281 } 279 282 280 - // Stub subscription stage. 281 - let sub_reason = blocked_reason 282 - .clone() 283 - .or(Some(Cow::Borrowed("not yet implemented (phase 5)"))); 284 - report.record(CheckResult { 285 - id: "subscription::stub", 286 - stage: Stage::Subscription, 287 - status: CheckStatus::Skipped, 288 - summary: Cow::Borrowed("Subscription stage (stub)"), 289 - diagnostic: None, 290 - skipped_reason: sub_reason, 291 - }); 283 + // Run the subscription stage if we have an endpoint. 284 + if let Some(endpoint) = &labeler_endpoint { 285 + let ws: &dyn subscription::WebSocketClient = if let Some(injected_ws) = opts.ws_client { 286 + injected_ws 287 + } else { 288 + &RealWebSocketClient 289 + }; 290 + let sub_output = subscription::run(endpoint, ws, opts.subscribe_timeout).await; 291 + for result in sub_output.results { 292 + report.record(result); 293 + } 294 + } else if identity_output.facts.is_none() && !is_no_did_supplied { 295 + // Subscription stage blocked by identity failures. 296 + report.record(CheckResult { 297 + id: "subscription::not_run", 298 + stage: Stage::Subscription, 299 + status: CheckStatus::Skipped, 300 + summary: Cow::Borrowed("Subscription stage (not run)"), 301 + diagnostic: None, 302 + skipped_reason: Some(Cow::Borrowed("blocked by identity stage failures")), 303 + }); 304 + } else { 305 + // Subscription stage not run because no endpoint could be derived. 306 + report.record(CheckResult { 307 + id: "subscription::not_run", 308 + stage: Stage::Subscription, 309 + status: CheckStatus::Skipped, 310 + summary: Cow::Borrowed("Subscription stage (not run)"), 311 + diagnostic: None, 312 + skipped_reason: Some(Cow::Borrowed("identity stage produced no labeler endpoint")), 313 + }); 314 + } 292 315 293 316 // Stub crypto stage. 294 317 let crypto_reason = blocked_reason.or(Some(Cow::Borrowed("not yet implemented (phase 6)")));
+419
src/commands/test/labeler/subscription.rs
··· 5 5 //! did not complete within the budget. 6 6 7 7 use std::sync::Arc; 8 + use std::time::{Duration, Instant}; 8 9 10 + use async_trait::async_trait; 9 11 use atrium_api::com::atproto::label::defs::Label; 12 + use futures_util::StreamExt; 13 + use miette::{Diagnostic, NamedSource, SourceSpan}; 10 14 use serde::{Deserialize, Serialize}; 15 + use thiserror::Error; 16 + use url::Url; 11 17 12 18 /// Frame header parsed from CBOR. 13 19 #[derive(Debug, Clone, Serialize, Deserialize)] ··· 181 187 pub live_tail_outcome: Option<LiveTailOutcome>, 182 188 /// Any frame decode errors encountered. 183 189 pub decode_errors: Vec<FrameDecodeError>, 190 + } 191 + 192 + /// Diagnostic for frame decode failures with source context. 193 + #[derive(Debug, Error, Diagnostic)] 194 + #[error("{message}")] 195 + #[diagnostic(code = "labeler::subscription::frame_decode")] 196 + pub struct FrameDecodeFailureDiagnostic { 197 + /// The error message. 198 + pub message: String, 199 + /// The raw frame bytes. 200 + #[source_code] 201 + pub source_code: NamedSource<Arc<[u8]>>, 202 + /// Span highlighting the first byte of the frame. 203 + #[label("frame decode failure")] 204 + pub span: SourceSpan, 205 + } 206 + 207 + /// Errors that can occur in the subscription stage. 208 + #[derive(Debug, Error)] 209 + pub enum SubscriptionStageError { 210 + /// Network or WebSocket transport error. 211 + #[error("Subscription transport error: {message}")] 212 + Transport { 213 + /// Human-readable error message. 214 + message: String, 215 + /// The underlying error, if available. 216 + #[source] 217 + source: Option<Box<dyn std::error::Error + Send + Sync>>, 218 + }, 219 + } 220 + 221 + /// A stream of WebSocket frames from a subscription connection. 222 + #[async_trait] 223 + pub trait FrameStream: Send { 224 + /// Retrieve the next frame from the stream, or None if the stream is closed. 225 + async fn next_frame(&mut self) -> Option<Result<Vec<u8>, SubscriptionStageError>>; 226 + 227 + /// Close the stream gracefully. 228 + async fn close(&mut self); 229 + } 230 + 231 + /// A WebSocket client for connecting to subscription endpoints. 232 + #[async_trait] 233 + pub trait WebSocketClient: Send + Sync { 234 + /// Connect to a WebSocket endpoint and return a frame stream. 235 + async fn connect(&self, url: &Url) -> Result<Box<dyn FrameStream>, SubscriptionStageError>; 236 + } 237 + 238 + /// Real WebSocket client using tokio-tungstenite. 239 + pub struct RealWebSocketClient; 240 + 241 + /// Real frame stream wrapping a tokio-tungstenite WebSocketStream. 242 + pub struct RealFrameStream { 243 + /// The underlying WebSocket stream. 244 + stream: tokio_tungstenite::WebSocketStream< 245 + tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>, 246 + >, 247 + } 248 + 249 + #[async_trait] 250 + impl FrameStream for RealFrameStream { 251 + async fn next_frame(&mut self) -> Option<Result<Vec<u8>, SubscriptionStageError>> { 252 + use tokio_tungstenite::tungstenite::Message; 253 + 254 + loop { 255 + match self.stream.next().await? { 256 + Ok(Message::Binary(data)) => { 257 + return Some(Ok(data.to_vec())); 258 + } 259 + Ok(Message::Text(_)) => { 260 + return Some(Err(SubscriptionStageError::Transport { 261 + message: "received text frame, expected binary".to_string(), 262 + source: None, 263 + })); 264 + } 265 + Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => { 266 + continue; 267 + } 268 + Ok(Message::Close(_)) => { 269 + return None; 270 + } 271 + Ok(Message::Frame(_)) => { 272 + continue; 273 + } 274 + Err(e) => { 275 + return Some(Err(SubscriptionStageError::Transport { 276 + message: e.to_string(), 277 + source: Some(Box::new(e)), 278 + })); 279 + } 280 + } 281 + } 282 + } 283 + 284 + async fn close(&mut self) { 285 + let _ = self.stream.close(None).await; 286 + } 287 + } 288 + 289 + #[async_trait] 290 + impl WebSocketClient for RealWebSocketClient { 291 + async fn connect(&self, url: &Url) -> Result<Box<dyn FrameStream>, SubscriptionStageError> { 292 + use tokio_tungstenite::tungstenite::client::IntoClientRequest; 293 + 294 + let request = url.to_string().into_client_request().map_err(|e| { 295 + SubscriptionStageError::Transport { 296 + message: e.to_string(), 297 + source: Some(Box::new(e)), 298 + } 299 + })?; 300 + 301 + let (stream, _response) = tokio_tungstenite::connect_async(request) 302 + .await 303 + .map_err(|e| SubscriptionStageError::Transport { 304 + message: e.to_string(), 305 + source: Some(Box::new(e)), 306 + })?; 307 + 308 + Ok(Box::new(RealFrameStream { stream })) 309 + } 310 + } 311 + 312 + /// Output from the subscription stage: facts (if any) plus all check results. 313 + #[derive(Debug)] 314 + pub struct SubscriptionStageOutput { 315 + /// Facts populated only when the stage completes without blocking errors. 316 + pub facts: Option<SubscriptionFacts>, 317 + /// All check results from this stage. 318 + pub results: Vec<crate::commands::test::labeler::report::CheckResult>, 319 + } 320 + 321 + /// Run the subscription stage with a two-connection backfill + live-tail strategy. 322 + pub async fn run( 323 + labeler_endpoint: &Url, 324 + ws: &dyn WebSocketClient, 325 + budget_per_connection: Duration, 326 + ) -> SubscriptionStageOutput { 327 + use crate::commands::test::labeler::report::{CheckResult, CheckStatus, Stage}; 328 + use std::borrow::Cow; 329 + use std::collections::HashSet; 330 + 331 + // Build the subscription URL with cursor=0 for backfill. 332 + let backfill_url = { 333 + let mut url = labeler_endpoint.clone(); 334 + url.set_path("xrpc/com.atproto.label.subscribeLabels"); 335 + { 336 + let mut query = url.query_pairs_mut(); 337 + query.append_pair("cursor", "0"); 338 + } 339 + // Ensure the scheme is wss. 340 + if url.scheme() == "https" { 341 + let _ = url.set_scheme("wss"); 342 + } 343 + url 344 + }; 345 + 346 + // Attempt to connect for backfill. 347 + let mut stream = match ws.connect(&backfill_url).await { 348 + Ok(s) => s, 349 + Err(_e) => { 350 + return SubscriptionStageOutput { 351 + facts: None, 352 + results: vec![CheckResult { 353 + id: "subscription::endpoint_reachable", 354 + stage: Stage::Subscription, 355 + status: CheckStatus::NetworkError, 356 + summary: Cow::Borrowed("Subscription endpoint reachable"), 357 + diagnostic: None, 358 + skipped_reason: None, 359 + }], 360 + }; 361 + } 362 + }; 363 + 364 + // Backfill phase: drain frames with a budget and idle-gap detection. 365 + let mut backfill_outcome = BackfillOutcome::NoFramesWithinBudget; 366 + let mut live_tail_outcome: Option<LiveTailOutcome> = None; 367 + let mut decode_errors: Vec<FrameDecodeError> = vec![]; 368 + let mut frames_observed = 0; 369 + let mut last_frame_at: Option<Instant> = None; 370 + 371 + let backfill_deadline = Instant::now() + budget_per_connection; 372 + 373 + loop { 374 + // Check if the deadline has been exceeded. 375 + if Instant::now() >= backfill_deadline { 376 + if frames_observed > 0 { 377 + backfill_outcome = BackfillOutcome::ExceededBudget { frames_observed }; 378 + } 379 + break; 380 + } 381 + 382 + // Compute the timeout for the next frame: either budget remaining or idle gap. 383 + let idle_gap_deadline = last_frame_at.map(|t| t + Duration::from_millis(500)); 384 + let timeout = if let Some(idle_deadline) = idle_gap_deadline { 385 + if idle_deadline <= Instant::now() { 386 + backfill_outcome = BackfillOutcome::CompletedWithIdleGap { 387 + frames_observed, 388 + idle_gap_ms: 500, 389 + }; 390 + live_tail_outcome = Some(LiveTailOutcome::FromBackfill); 391 + break; 392 + } 393 + let idle_time_left = idle_deadline.saturating_duration_since(Instant::now()); 394 + let budget_time_left = backfill_deadline.saturating_duration_since(Instant::now()); 395 + idle_time_left.min(budget_time_left) 396 + } else { 397 + backfill_deadline.saturating_duration_since(Instant::now()) 398 + }; 399 + 400 + // Wait for the next frame with timeout. 401 + match tokio::time::timeout(timeout, stream.next_frame()).await { 402 + Ok(Some(Ok(frame_bytes))) => { 403 + last_frame_at = Some(Instant::now()); 404 + frames_observed += 1; 405 + if let Err(e) = decode_frame(&frame_bytes) { 406 + decode_errors.push(e); 407 + } 408 + } 409 + Ok(Some(Err(_e))) => { 410 + last_frame_at = Some(Instant::now()); 411 + } 412 + Ok(None) => { 413 + // Stream closed. Exit immediately without waiting for budget. 414 + if frames_observed == 0 { 415 + backfill_outcome = BackfillOutcome::NoFramesWithinBudget; 416 + } else { 417 + backfill_outcome = BackfillOutcome::CompletedWithIdleGap { 418 + frames_observed, 419 + idle_gap_ms: 500, 420 + }; 421 + live_tail_outcome = Some(LiveTailOutcome::FromBackfill); 422 + } 423 + break; 424 + } 425 + Err(_e) => { 426 + if frames_observed > 0 { 427 + if let Some(idle_deadline) = idle_gap_deadline { 428 + if Instant::now() >= idle_deadline { 429 + backfill_outcome = BackfillOutcome::CompletedWithIdleGap { 430 + frames_observed, 431 + idle_gap_ms: 500, 432 + }; 433 + live_tail_outcome = Some(LiveTailOutcome::FromBackfill); 434 + } else { 435 + backfill_outcome = BackfillOutcome::ExceededBudget { frames_observed }; 436 + } 437 + } else { 438 + backfill_outcome = BackfillOutcome::ExceededBudget { frames_observed }; 439 + } 440 + } 441 + break; 442 + } 443 + } 444 + } 445 + 446 + stream.close().await; 447 + 448 + // Determine the live-tail outcome if not already set. 449 + if live_tail_outcome.is_none() { 450 + match &backfill_outcome { 451 + BackfillOutcome::ExceededBudget { .. } => { 452 + let mut live_tail_url = labeler_endpoint.clone(); 453 + live_tail_url.set_path("xrpc/com.atproto.label.subscribeLabels"); 454 + if live_tail_url.scheme() == "https" { 455 + let _ = live_tail_url.set_scheme("wss"); 456 + } 457 + 458 + match ws.connect(&live_tail_url).await { 459 + Ok(mut live_stream) => { 460 + let mut live_frames_observed = 0; 461 + let live_deadline = Instant::now() + budget_per_connection; 462 + 463 + loop { 464 + if Instant::now() >= live_deadline { 465 + break; 466 + } 467 + let time_left = live_deadline.saturating_duration_since(Instant::now()); 468 + match tokio::time::timeout(time_left, live_stream.next_frame()).await { 469 + Ok(Some(Ok(frame))) => { 470 + live_frames_observed += 1; 471 + if let Err(e) = decode_frame(&frame) { 472 + decode_errors.push(e); 473 + } 474 + } 475 + Ok(Some(Err(_))) => { 476 + live_frames_observed += 1; 477 + } 478 + Ok(None) | Err(_) => break, 479 + } 480 + } 481 + 482 + live_stream.close().await; 483 + live_tail_outcome = Some(LiveTailOutcome::CleanHold { 484 + frames_observed: live_frames_observed, 485 + }); 486 + } 487 + Err(_) => { 488 + live_tail_outcome = Some(LiveTailOutcome::CleanHold { frames_observed: 0 }); 489 + } 490 + } 491 + } 492 + BackfillOutcome::NoFramesWithinBudget => { 493 + live_tail_outcome = Some(LiveTailOutcome::SkippedEmpty); 494 + } 495 + _ => {} 496 + } 497 + } 498 + 499 + // Build check results. 500 + let mut results = vec![]; 501 + 502 + // Backfill check result. 503 + let (backfill_status, backfill_summary, backfill_reason) = match &backfill_outcome { 504 + BackfillOutcome::CompletedWithIdleGap { .. } => ( 505 + CheckStatus::Pass, 506 + Cow::Borrowed("Subscription backfill completed"), 507 + None, 508 + ), 509 + BackfillOutcome::ExceededBudget { .. } => ( 510 + CheckStatus::Advisory, 511 + Cow::Borrowed("Subscription backfill exceeded budget"), 512 + None, 513 + ), 514 + BackfillOutcome::NoFramesWithinBudget => ( 515 + CheckStatus::Advisory, 516 + Cow::Borrowed("Subscription backfill had no frames"), 517 + Some(Cow::Borrowed("labeler has no published labels")), 518 + ), 519 + }; 520 + results.push(CheckResult { 521 + id: "subscription::backfill", 522 + stage: Stage::Subscription, 523 + status: backfill_status, 524 + summary: backfill_summary, 525 + diagnostic: None, 526 + skipped_reason: backfill_reason, 527 + }); 528 + 529 + // Live-tail check result. 530 + if let Some(lt_outcome) = &live_tail_outcome { 531 + let (live_status, live_summary, live_reason) = match lt_outcome { 532 + LiveTailOutcome::FromBackfill => ( 533 + CheckStatus::Pass, 534 + Cow::Borrowed("Subscription live-tail observed after backfill"), 535 + None, 536 + ), 537 + LiveTailOutcome::CleanHold { .. } => ( 538 + CheckStatus::Pass, 539 + Cow::Borrowed("Subscription live-tail connection held"), 540 + None, 541 + ), 542 + LiveTailOutcome::SkippedEmpty => ( 543 + CheckStatus::Skipped, 544 + Cow::Borrowed("Subscription live-tail skipped"), 545 + Some(Cow::Borrowed("labeler has no published labels")), 546 + ), 547 + }; 548 + results.push(CheckResult { 549 + id: "subscription::live_tail", 550 + stage: Stage::Subscription, 551 + status: live_status, 552 + summary: live_summary, 553 + diagnostic: None, 554 + skipped_reason: live_reason, 555 + }); 556 + } 557 + 558 + // Add spec violation results for unique decode error variants. 559 + let mut seen_variants = HashSet::new(); 560 + for err in decode_errors.iter() { 561 + let variant_key = std::mem::discriminant(err); 562 + if seen_variants.insert(variant_key) { 563 + let (raw_bytes, msg) = match err { 564 + FrameDecodeError::HeaderDecode { raw, cause } => { 565 + (raw.clone(), format!("Header decode failed: {cause}")) 566 + } 567 + FrameDecodeError::PayloadDecode { raw, cause, .. } => { 568 + (raw.clone(), format!("Payload decode failed: {cause}")) 569 + } 570 + FrameDecodeError::UnknownMessageType { t, raw } => { 571 + (raw.clone(), format!("Unknown message type: {t}")) 572 + } 573 + FrameDecodeError::TextFrameRejected(raw) => ( 574 + raw.clone(), 575 + "Text frame rejected (expected binary)".to_string(), 576 + ), 577 + }; 578 + 579 + let diagnostic = FrameDecodeFailureDiagnostic { 580 + message: msg, 581 + source_code: NamedSource::new("frame", raw_bytes), 582 + span: SourceSpan::new(0.into(), 1), 583 + }; 584 + 585 + results.push(CheckResult { 586 + id: "subscription::frame_decode", 587 + stage: Stage::Subscription, 588 + status: CheckStatus::SpecViolation, 589 + summary: Cow::Borrowed("Subscription frame decode failure"), 590 + diagnostic: Some(Box::new(diagnostic)), 591 + skipped_reason: None, 592 + }); 593 + } 594 + } 595 + 596 + let facts = Some(SubscriptionFacts { 597 + backfill_outcome, 598 + live_tail_outcome, 599 + decode_errors, 600 + }); 601 + 602 + SubscriptionStageOutput { facts, results } 184 603 } 185 604 186 605 #[cfg(test)]
+145
tests/common/mod.rs
··· 5 5 6 6 use async_trait::async_trait; 7 7 use atproto_devtool::commands::test::labeler::http::{HttpStageError, RawHttpTee, RawXrpcResponse}; 8 + use atproto_devtool::commands::test::labeler::subscription::{ 9 + FrameStream, SubscriptionStageError, WebSocketClient, 10 + }; 8 11 use std::collections::HashMap; 9 12 use std::sync::{Arc, Mutex}; 13 + use std::time::Duration; 14 + use url::Url; 10 15 11 16 /// Type alias for HTTP response map in tests. 12 17 pub type FakeHttpResponses = Arc<Mutex<HashMap<Option<String>, (reqwest::StatusCode, Vec<u8>)>>>; ··· 96 101 } 97 102 } 98 103 } 104 + 105 + /// Fake WebSocket client for testing subscription stage with scripted responses. 106 + pub struct FakeWebSocketClient { 107 + /// Scripts queued for each connection. 108 + scripts: Arc<Mutex<Vec<FakeScript>>>, 109 + } 110 + 111 + /// A script for a single WebSocket connection. 112 + pub struct FakeScript { 113 + /// Frames to return (if no transport error). 114 + pub frames: Vec<Vec<u8>>, 115 + /// Delay between frames. 116 + pub inter_frame_delay: Duration, 117 + /// Optional final wait after all frames (simulates idle gap or continued streaming). 118 + pub final_wait: Option<Duration>, 119 + /// Whether to return a transport error instead. 120 + pub transport_error: bool, 121 + } 122 + 123 + impl FakeWebSocketClient { 124 + /// Create a new empty FakeWebSocketClient. 125 + #[allow( 126 + dead_code, 127 + reason = "shared integration-test helper; cargo compiles each tests/*.rs as a separate binary so unused-in-one-binary triggers dead_code — tests/common/mod.rs is the cargo-idiomatic shared module" 128 + )] 129 + pub fn new() -> Self { 130 + Self { 131 + scripts: Arc::new(Mutex::new(Vec::new())), 132 + } 133 + } 134 + 135 + /// Create a FakeWebSocketClient that returns an empty stream (no frames, immediate closure). 136 + #[allow( 137 + dead_code, 138 + reason = "shared integration-test helper; cargo compiles each tests/*.rs as a separate binary so unused-in-one-binary triggers dead_code — tests/common/mod.rs is the cargo-idiomatic shared module" 139 + )] 140 + pub fn empty() -> Self { 141 + Self::new() 142 + } 143 + 144 + /// Add a script to the queue for the next connection. 145 + #[allow( 146 + dead_code, 147 + reason = "shared integration-test helper; cargo compiles each tests/*.rs as a separate binary so unused-in-one-binary triggers dead_code — tests/common/mod.rs is the cargo-idiomatic shared module" 148 + )] 149 + pub fn add_script(&self, script: FakeScript) { 150 + self.scripts.lock().unwrap().push(script); 151 + } 152 + } 153 + 154 + impl Default for FakeWebSocketClient { 155 + fn default() -> Self { 156 + Self::new() 157 + } 158 + } 159 + 160 + /// A fake frame stream returned by FakeWebSocketClient. 161 + #[allow( 162 + dead_code, 163 + reason = "shared integration-test helper; cargo compiles each tests/*.rs as a separate binary so unused-in-one-binary triggers dead_code — tests/common/mod.rs is the cargo-idiomatic shared module" 164 + )] 165 + struct FakeFrameStream { 166 + frames: Vec<Vec<u8>>, 167 + current_frame: usize, 168 + inter_frame_delay: Duration, 169 + final_wait: Option<Duration>, 170 + transport_error: bool, 171 + } 172 + 173 + #[async_trait] 174 + impl FrameStream for FakeFrameStream { 175 + async fn next_frame(&mut self) -> Option<Result<Vec<u8>, SubscriptionStageError>> { 176 + if self.transport_error { 177 + return Some(Err(SubscriptionStageError::Transport { 178 + message: "fake transport error".to_string(), 179 + source: None, 180 + })); 181 + } 182 + 183 + // Return frames one by one with inter-frame delays. 184 + if self.current_frame < self.frames.len() { 185 + if self.current_frame > 0 { 186 + tokio::time::sleep(self.inter_frame_delay).await; 187 + } 188 + let frame = self.frames[self.current_frame].clone(); 189 + self.current_frame += 1; 190 + return Some(Ok(frame)); 191 + } 192 + 193 + // All frames consumed. Apply final_wait if present, then close stream. 194 + if let Some(wait_duration) = self.final_wait { 195 + if self.current_frame == self.frames.len() && self.current_frame > 0 { 196 + tokio::time::sleep(wait_duration).await; 197 + self.current_frame += 1; 198 + } 199 + } 200 + 201 + // Stream closed. 202 + None 203 + } 204 + 205 + async fn close(&mut self) { 206 + // Noop for fake. 207 + } 208 + } 209 + 210 + #[async_trait] 211 + impl WebSocketClient for FakeWebSocketClient { 212 + async fn connect(&self, _url: &Url) -> Result<Box<dyn FrameStream>, SubscriptionStageError> { 213 + let mut scripts = self.scripts.lock().unwrap(); 214 + 215 + if scripts.is_empty() { 216 + // No script available, return empty stream. 217 + return Ok(Box::new(FakeFrameStream { 218 + frames: vec![], 219 + current_frame: 0, 220 + inter_frame_delay: Duration::from_millis(0), 221 + final_wait: None, 222 + transport_error: false, 223 + })); 224 + } 225 + 226 + let script = scripts.remove(0); 227 + 228 + if script.transport_error { 229 + return Err(SubscriptionStageError::Transport { 230 + message: "fake transport error".to_string(), 231 + source: None, 232 + }); 233 + } 234 + 235 + Ok(Box::new(FakeFrameStream { 236 + frames: script.frames, 237 + current_frame: 0, 238 + inter_frame_delay: script.inter_frame_delay, 239 + final_wait: script.final_wait, 240 + transport_error: false, 241 + })) 242 + } 243 + }
+34
tests/labeler_identity.rs
··· 9 9 use atproto_devtool::common::identity::{DnsResolver, HttpClient, IdentityError}; 10 10 use std::collections::HashMap; 11 11 use std::sync::{Arc, Mutex}; 12 + 12 13 use url::Url; 13 14 14 15 /// Type alias for the response map in FakeHttpClient. ··· 144 145 let target = parse_target("did:plc:test123456789abcdefghijklmnop", None).expect("parse failed"); 145 146 let fake_tee = common::FakeRawHttpTee::new(); 146 147 fake_tee.add_response(None, 200, healthy_labels_response()); 148 + let fake_ws = common::FakeWebSocketClient::empty(); 147 149 148 150 let opts = LabelerOptions { 149 151 http: &http, 150 152 dns: &dns, 151 153 raw_http_tee: Some(&fake_tee), 154 + ws_client: Some(&fake_ws), 152 155 reqwest_client: None, 153 156 subscribe_timeout: std::time::Duration::from_secs(5), 154 157 verbose: false, ··· 170 173 let empty_response = br#"{"cursor":null,"labels":[]}"#.to_vec(); 171 174 fake_tee.add_response(None, 200, empty_response); 172 175 176 + let fake_ws = common::FakeWebSocketClient::empty(); 177 + 173 178 let opts = LabelerOptions { 174 179 http: &http, 175 180 dns: &dns, 176 181 raw_http_tee: Some(&fake_tee), 182 + ws_client: Some(&fake_ws), 177 183 reqwest_client: None, 178 184 subscribe_timeout: std::time::Duration::from_secs(5), 179 185 verbose: false, ··· 218 224 let empty_response = br#"{"cursor":null,"labels":[]}"#.to_vec(); 219 225 fake_tee.add_response(None, 200, empty_response); 220 226 227 + let fake_ws = common::FakeWebSocketClient::empty(); 228 + 221 229 let opts = LabelerOptions { 222 230 http: &http, 223 231 dns: &dns, 224 232 raw_http_tee: Some(&fake_tee), 233 + ws_client: Some(&fake_ws), 225 234 reqwest_client: None, 226 235 subscribe_timeout: std::time::Duration::from_secs(5), 227 236 verbose: false, ··· 258 267 let target = parse_target("did:plc:test123456789abcdefghijklmnop", None).expect("parse failed"); 259 268 let fake_tee = common::FakeRawHttpTee::new(); 260 269 fake_tee.add_response(None, 200, healthy_labels_response()); 270 + let fake_ws = common::FakeWebSocketClient::empty(); 261 271 262 272 let opts = LabelerOptions { 263 273 http: &http, 264 274 dns: &dns, 265 275 raw_http_tee: Some(&fake_tee), 276 + ws_client: Some(&fake_ws), 266 277 reqwest_client: None, 267 278 subscribe_timeout: std::time::Duration::from_secs(5), 268 279 verbose: false, ··· 300 311 let fake_tee = common::FakeRawHttpTee::new(); 301 312 let empty_response = br#"{"cursor":null,"labels":[]}"#.to_vec(); 302 313 fake_tee.add_response(None, 200, empty_response); 314 + 315 + let fake_ws = common::FakeWebSocketClient::empty(); 303 316 304 317 let opts = LabelerOptions { 305 318 http: &http, 306 319 dns: &dns, 307 320 raw_http_tee: Some(&fake_tee), 321 + ws_client: Some(&fake_ws), 308 322 reqwest_client: None, 309 323 subscribe_timeout: std::time::Duration::from_secs(5), 310 324 verbose: false, ··· 328 342 let empty_response = br#"{"cursor":null,"labels":[]}"#.to_vec(); 329 343 fake_tee.add_response(None, 200, empty_response); 330 344 345 + let fake_ws = common::FakeWebSocketClient::empty(); 346 + 331 347 let opts = LabelerOptions { 332 348 http: &http, 333 349 dns: &dns, 334 350 raw_http_tee: Some(&fake_tee), 351 + ws_client: Some(&fake_ws), 335 352 reqwest_client: None, 336 353 subscribe_timeout: std::time::Duration::from_secs(5), 337 354 verbose: false, ··· 366 383 let target = parse_target("did:plc:test123456789abcdefghijklmnop", None).expect("parse failed"); 367 384 let fake_tee = common::FakeRawHttpTee::new(); 368 385 fake_tee.add_response(None, 200, healthy_labels_response()); 386 + let fake_ws = common::FakeWebSocketClient::empty(); 369 387 370 388 let opts = LabelerOptions { 371 389 http: &http, 372 390 dns: &dns, 373 391 raw_http_tee: Some(&fake_tee), 392 + ws_client: Some(&fake_ws), 374 393 reqwest_client: None, 375 394 subscribe_timeout: std::time::Duration::from_secs(5), 376 395 verbose: false, ··· 410 429 let empty_response = br#"{"cursor":null,"labels":[]}"#.to_vec(); 411 430 fake_tee.add_response(None, 200, empty_response); 412 431 432 + let fake_ws = common::FakeWebSocketClient::empty(); 433 + 413 434 let opts = LabelerOptions { 414 435 http: &http, 415 436 dns: &dns, 416 437 raw_http_tee: Some(&fake_tee), 438 + ws_client: Some(&fake_ws), 417 439 reqwest_client: None, 418 440 subscribe_timeout: std::time::Duration::from_secs(5), 419 441 verbose: false, ··· 455 477 let empty_response = br#"{"cursor":null,"labels":[]}"#.to_vec(); 456 478 fake_tee.add_response(None, 200, empty_response); 457 479 480 + let fake_ws = common::FakeWebSocketClient::empty(); 481 + 458 482 let opts = LabelerOptions { 459 483 http: &http, 460 484 dns: &dns, 461 485 raw_http_tee: Some(&fake_tee), 486 + ws_client: Some(&fake_ws), 462 487 reqwest_client: None, 463 488 subscribe_timeout: std::time::Duration::from_secs(5), 464 489 verbose: false, ··· 498 523 let empty_response = br#"{"cursor":null,"labels":[]}"#.to_vec(); 499 524 fake_tee.add_response(None, 200, empty_response); 500 525 526 + let fake_ws = common::FakeWebSocketClient::empty(); 527 + 501 528 let opts = LabelerOptions { 502 529 http: &http, 503 530 dns: &dns, 504 531 raw_http_tee: Some(&fake_tee), 532 + ws_client: Some(&fake_ws), 505 533 reqwest_client: None, 506 534 subscribe_timeout: std::time::Duration::from_secs(5), 507 535 verbose: false, ··· 540 568 let fake_tee = common::FakeRawHttpTee::new(); 541 569 let empty_response = br#"{"cursor":null,"labels":[]}"#.to_vec(); 542 570 fake_tee.add_response(None, 200, empty_response); 571 + 572 + let fake_ws = common::FakeWebSocketClient::empty(); 543 573 544 574 let opts = LabelerOptions { 545 575 http: &http, 546 576 dns: &dns, 547 577 raw_http_tee: Some(&fake_tee), 578 + ws_client: Some(&fake_ws), 548 579 reqwest_client: None, 549 580 subscribe_timeout: std::time::Duration::from_secs(5), 550 581 verbose: false, ··· 588 619 let empty_response = br#"{"cursor":null,"labels":[]}"#.to_vec(); 589 620 fake_tee.add_response(None, 200, empty_response); 590 621 622 + let fake_ws = common::FakeWebSocketClient::empty(); 623 + 591 624 let opts = LabelerOptions { 592 625 http: &http, 593 626 dns: &dns, 594 627 raw_http_tee: Some(&fake_tee), 628 + ws_client: Some(&fake_ws), 595 629 reqwest_client: None, 596 630 subscribe_timeout: std::time::Duration::from_secs(5), 597 631 verbose: false,
+3 -2
tests/snapshots/labeler_identity__did_plc_direct_happy_path.snap
··· 23 23 [OK] First page schema is valid 24 24 [OK] First page was complete; pagination not exercised 25 25 == Subscription == 26 - [SKIP] Subscription stage (stub) — not yet implemented (phase 5) 26 + [WARN] Subscription backfill had no frames — labeler has no published labels 27 + [SKIP] Subscription live-tail skipped — labeler has no published labels 27 28 == Crypto == 28 29 [SKIP] Crypto stage (stub) — not yet implemented (phase 6) 29 30 30 - Summary: 11 passed, 0 failed (spec), 0 network errors, 0 advisories, 3 skipped. Exit code: 0 31 + Summary: 11 passed, 0 failed (spec), 0 network errors, 1 advisories, 3 skipped. Exit code: 0
+3 -2
tests/snapshots/labeler_identity__did_web_direct_happy_path.snap
··· 24 24 [WARN] Labeler has no published labels 25 25 [OK] First page was complete; pagination not exercised 26 26 == Subscription == 27 - [SKIP] Subscription stage (stub) — not yet implemented (phase 5) 27 + [WARN] Subscription backfill had no frames — labeler has no published labels 28 + [SKIP] Subscription live-tail skipped — labeler has no published labels 28 29 == Crypto == 29 30 [SKIP] Crypto stage (stub) — not yet implemented (phase 6) 30 31 31 - Summary: 11 passed, 0 failed (spec), 0 network errors, 1 advisories, 3 skipped. Exit code: 0 32 + Summary: 11 passed, 0 failed (spec), 0 network errors, 2 advisories, 3 skipped. Exit code: 0
+1 -1
tests/snapshots/labeler_identity__empty_policies_renders_spec_violation_with_span.snap
··· 28 28 == HTTP == 29 29 [SKIP] HTTP stage (not run) — blocked by identity stage failures 30 30 == Subscription == 31 - [SKIP] Subscription stage (stub) — blocked by identity stage failures 31 + [SKIP] Subscription stage (not run) — blocked by identity stage failures 32 32 == Crypto == 33 33 [SKIP] Crypto stage (stub) — blocked by identity stage failures 34 34
+3 -2
tests/snapshots/labeler_identity__endpoint_mismatch_spec_violation.snap
··· 31 31 [WARN] Labeler has no published labels 32 32 [OK] First page was complete; pagination not exercised 33 33 == Subscription == 34 - [SKIP] Subscription stage (stub) — blocked by identity stage failures 34 + [WARN] Subscription backfill had no frames — labeler has no published labels 35 + [SKIP] Subscription live-tail skipped — labeler has no published labels 35 36 == Crypto == 36 37 [SKIP] Crypto stage (stub) — blocked by identity stage failures 37 38 38 - Summary: 11 passed, 1 failed (spec), 0 network errors, 1 advisories, 2 skipped. Exit code: 1 39 + Summary: 11 passed, 1 failed (spec), 0 network errors, 2 advisories, 2 skipped. Exit code: 1
+3 -2
tests/snapshots/labeler_identity__endpoint_only_no_did_skips_identity.snap
··· 21 21 [WARN] Labeler has no published labels 22 22 [OK] First page was complete; pagination not exercised 23 23 == Subscription == 24 - [SKIP] Subscription stage (stub) — not yet implemented (phase 5) 24 + [WARN] Subscription backfill had no frames — labeler has no published labels 25 + [SKIP] Subscription live-tail skipped — labeler has no published labels 25 26 == Crypto == 26 27 [SKIP] Crypto stage (stub) — not yet implemented (phase 6) 27 28 28 - Summary: 3 passed, 0 failed (spec), 0 network errors, 1 advisories, 11 skipped. Exit code: 0 29 + Summary: 3 passed, 0 failed (spec), 0 network errors, 2 advisories, 11 skipped. Exit code: 0
+3 -2
tests/snapshots/labeler_identity__handle_resolution_happy_path.snap
··· 24 24 [WARN] Labeler has no published labels 25 25 [OK] First page was complete; pagination not exercised 26 26 == Subscription == 27 - [SKIP] Subscription stage (stub) — not yet implemented (phase 5) 27 + [WARN] Subscription backfill had no frames — labeler has no published labels 28 + [SKIP] Subscription live-tail skipped — labeler has no published labels 28 29 == Crypto == 29 30 [SKIP] Crypto stage (stub) — not yet implemented (phase 6) 30 31 31 - Summary: 11 passed, 0 failed (spec), 0 network errors, 1 advisories, 3 skipped. Exit code: 0 32 + Summary: 11 passed, 0 failed (spec), 0 network errors, 2 advisories, 3 skipped. Exit code: 0
+3 -2
tests/snapshots/labeler_identity__healthy_plc_renders_all_ok.snap
··· 23 23 [OK] First page schema is valid 24 24 [OK] First page was complete; pagination not exercised 25 25 == Subscription == 26 - [SKIP] Subscription stage (stub) — not yet implemented (phase 5) 26 + [WARN] Subscription backfill had no frames — labeler has no published labels 27 + [SKIP] Subscription live-tail skipped — labeler has no published labels 27 28 == Crypto == 28 29 [SKIP] Crypto stage (stub) — not yet implemented (phase 6) 29 30 30 - Summary: 11 passed, 0 failed (spec), 0 network errors, 0 advisories, 3 skipped. Exit code: 0 31 + Summary: 11 passed, 0 failed (spec), 0 network errors, 1 advisories, 3 skipped. Exit code: 0
+1 -1
tests/snapshots/labeler_identity__missing_labeler_record_renders_404_distinct_from_transport.snap
··· 21 21 == HTTP == 22 22 [SKIP] HTTP stage (not run) — blocked by identity stage failures 23 23 == Subscription == 24 - [SKIP] Subscription stage (stub) — blocked by identity stage failures 24 + [SKIP] Subscription stage (not run) — blocked by identity stage failures 25 25 == Crypto == 26 26 [SKIP] Crypto stage (stub) — blocked by identity stage failures 27 27
+1 -1
tests/snapshots/labeler_identity__missing_service_renders_spec_violation_with_span.snap
··· 28 28 == HTTP == 29 29 [SKIP] HTTP stage (not run) — blocked by identity stage failures 30 30 == Subscription == 31 - [SKIP] Subscription stage (stub) — blocked by identity stage failures 31 + [SKIP] Subscription stage (not run) — blocked by identity stage failures 32 32 == Crypto == 33 33 [SKIP] Crypto stage (stub) — blocked by identity stage failures 34 34
+1 -1
tests/snapshots/labeler_identity__missing_signing_key_renders_spec_violation.snap
··· 28 28 == HTTP == 29 29 [SKIP] HTTP stage (not run) — blocked by identity stage failures 30 30 == Subscription == 31 - [SKIP] Subscription stage (stub) — blocked by identity stage failures 31 + [SKIP] Subscription stage (not run) — blocked by identity stage failures 32 32 == Crypto == 33 33 [SKIP] Crypto stage (stub) — blocked by identity stage failures 34 34
+1 -1
tests/snapshots/labeler_identity__non_https_endpoint_renders_spec_violation.snap
··· 28 28 == HTTP == 29 29 [SKIP] HTTP stage (not run) — blocked by identity stage failures 30 30 == Subscription == 31 - [SKIP] Subscription stage (stub) — blocked by identity stage failures 31 + [SKIP] Subscription stage (not run) — blocked by identity stage failures 32 32 == Crypto == 33 33 [SKIP] Crypto stage (stub) — blocked by identity stage failures 34 34
+1 -1
tests/snapshots/labeler_identity__plc_directory_unreachable_renders_network_error.snap
··· 18 18 == HTTP == 19 19 [SKIP] HTTP stage (not run) — blocked by identity stage failures 20 20 == Subscription == 21 - [SKIP] Subscription stage (stub) — blocked by identity stage failures 21 + [SKIP] Subscription stage (not run) — blocked by identity stage failures 22 22 == Crypto == 23 23 [SKIP] Crypto stage (stub) — blocked by identity stage failures 24 24