CLI app for developers prototyping atproto functionality
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add subscribeLabels frame parser and types

Implements the CBOR frame decoding logic for the subscription stage,
including FrameHeader, three payload types (SubscribeLabelsPayload,
SubscribeInfoPayload, SubscribeErrorPayload), DecodedFrame enum, and
FrameDecodeError enum. Includes 7 unit tests covering valid frames,
header decode failures, payload decode failures, unknown message types,
and malformed error frames.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>

+331
+1
src/commands/test/labeler.rs
··· 4 4 pub mod identity; 5 5 pub mod pipeline; 6 6 pub mod report; 7 + pub mod subscription; 7 8 8 9 use std::io; 9 10 use std::process::ExitCode;
+330
src/commands/test/labeler/subscription.rs
··· 1 + //! Subscription stage for the labeler conformance suite. 2 + //! 3 + //! Performs `com.atproto.label.subscribeLabels` requests against the labeler endpoint, 4 + //! using a two-connection strategy: backfill with cursor=0, and live-tail if backfill 5 + //! did not complete within the budget. 6 + 7 + use std::sync::Arc; 8 + 9 + use atrium_api::com::atproto::label::defs::Label; 10 + use serde::{Deserialize, Serialize}; 11 + 12 + /// Frame header parsed from CBOR. 13 + #[derive(Debug, Clone, Serialize, Deserialize)] 14 + pub struct FrameHeader { 15 + /// Operation type: 1 for message, -1 for error. 16 + op: i64, 17 + /// Message type identifier (e.g., "#labels", "#info"), optional for error frames. 18 + #[serde(skip_serializing_if = "Option::is_none")] 19 + t: Option<String>, 20 + } 21 + 22 + /// Payload for `#labels` message frames. 23 + #[derive(Debug, Clone, Serialize, Deserialize)] 24 + pub struct SubscribeLabelsPayload { 25 + /// Sequence number of this label batch. 26 + seq: i64, 27 + /// Array of labels in this batch. 28 + labels: Vec<Label>, 29 + } 30 + 31 + /// Payload for `#info` message frames. 32 + #[derive(Debug, Clone, Serialize, Deserialize)] 33 + pub struct SubscribeInfoPayload { 34 + /// Service name. 35 + name: String, 36 + /// Optional additional message. 37 + message: Option<String>, 38 + } 39 + 40 + /// Payload for error frames (op == -1). 41 + #[derive(Debug, Clone, Serialize, Deserialize)] 42 + pub struct SubscribeErrorPayload { 43 + /// Error code or identifier. 44 + error: String, 45 + /// Optional error description. 46 + message: Option<String>, 47 + } 48 + 49 + /// A decoded WebSocket frame from subscribeLabels. 50 + #[derive(Debug, Clone)] 51 + pub enum DecodedFrame { 52 + /// A labels message frame. 53 + Labels(SubscribeLabelsPayload), 54 + /// An info message frame. 55 + Info(SubscribeInfoPayload), 56 + /// An error frame. 57 + Error(SubscribeErrorPayload), 58 + } 59 + 60 + /// Errors that can occur when decoding a WebSocket frame. 61 + #[derive(Debug, Clone)] 62 + pub enum FrameDecodeError { 63 + /// Failed to decode the header CBOR block. 64 + HeaderDecode { 65 + /// Raw bytes of the frame. 66 + raw: Arc<[u8]>, 67 + /// Human-readable error message. 68 + cause: String, 69 + }, 70 + /// Failed to decode the payload CBOR block. 71 + PayloadDecode { 72 + /// Header successfully decoded. 73 + header: FrameHeader, 74 + /// Raw bytes of the frame. 75 + raw: Arc<[u8]>, 76 + /// Human-readable error message. 77 + cause: String, 78 + }, 79 + /// Message type not recognized. 80 + UnknownMessageType { 81 + /// The unrecognized type identifier. 82 + t: String, 83 + /// Raw bytes of the frame. 84 + raw: Arc<[u8]>, 85 + }, 86 + /// Text frame received (not allowed). 87 + TextFrameRejected(Arc<[u8]>), 88 + } 89 + 90 + /// Decode a two-CBOR-block WebSocket frame into a typed message. 91 + pub fn decode_frame(bytes: &[u8]) -> Result<DecodedFrame, FrameDecodeError> { 92 + let mut cursor = bytes; 93 + 94 + // Decode the header CBOR block. 95 + let header = ciborium::de::from_reader::<FrameHeader, _>(&mut cursor).map_err(|e| { 96 + FrameDecodeError::HeaderDecode { 97 + raw: Arc::from(bytes), 98 + cause: e.to_string(), 99 + } 100 + })?; 101 + 102 + // Based on op and t, decode the payload block accordingly. 103 + match (header.op, &header.t) { 104 + (1, Some(t)) if t == "#labels" => { 105 + let payload = ciborium::de::from_reader::<SubscribeLabelsPayload, _>(&mut cursor) 106 + .map_err(|e| FrameDecodeError::PayloadDecode { 107 + header: header.clone(), 108 + raw: Arc::from(bytes), 109 + cause: e.to_string(), 110 + })?; 111 + Ok(DecodedFrame::Labels(payload)) 112 + } 113 + (1, Some(t)) if t == "#info" => { 114 + let payload = ciborium::de::from_reader::<SubscribeInfoPayload, _>(&mut cursor) 115 + .map_err(|e| FrameDecodeError::PayloadDecode { 116 + header: header.clone(), 117 + raw: Arc::from(bytes), 118 + cause: e.to_string(), 119 + })?; 120 + Ok(DecodedFrame::Info(payload)) 121 + } 122 + (-1, _) => { 123 + let payload = ciborium::de::from_reader::<SubscribeErrorPayload, _>(&mut cursor) 124 + .map_err(|e| FrameDecodeError::PayloadDecode { 125 + header: header.clone(), 126 + raw: Arc::from(bytes), 127 + cause: e.to_string(), 128 + })?; 129 + Ok(DecodedFrame::Error(payload)) 130 + } 131 + (_, Some(t)) => Err(FrameDecodeError::UnknownMessageType { 132 + t: t.clone(), 133 + raw: Arc::from(bytes), 134 + }), 135 + _ => Err(FrameDecodeError::UnknownMessageType { 136 + t: format!("unknown op={} t={:?}", header.op, header.t), 137 + raw: Arc::from(bytes), 138 + }), 139 + } 140 + } 141 + 142 + /// Outcome of the backfill phase. 143 + #[derive(Debug, Clone)] 144 + pub enum BackfillOutcome { 145 + /// Backfill completed with an idle gap (no frames for 500ms). 146 + CompletedWithIdleGap { 147 + /// Number of frames observed during backfill. 148 + frames_observed: usize, 149 + /// Duration of idle gap in milliseconds. 150 + idle_gap_ms: u64, 151 + }, 152 + /// Backfill exceeded the time budget while still producing frames. 153 + ExceededBudget { 154 + /// Number of frames observed before timeout. 155 + frames_observed: usize, 156 + }, 157 + /// No frames received during the entire budget. 158 + NoFramesWithinBudget, 159 + } 160 + 161 + /// Outcome of the live-tail phase. 162 + #[derive(Debug, Clone)] 163 + pub enum LiveTailOutcome { 164 + /// Live tail observed after backfill completed (implicit pass). 165 + FromBackfill, 166 + /// Live-tail connection held open, frames may have been observed. 167 + CleanHold { 168 + /// Number of frames observed during live tail. 169 + frames_observed: usize, 170 + }, 171 + /// Live tail skipped because no frames were observed in backfill. 172 + SkippedEmpty, 173 + } 174 + 175 + /// Facts gathered from the subscription stage. 176 + #[derive(Debug, Clone)] 177 + pub struct SubscriptionFacts { 178 + /// Outcome of the backfill phase. 179 + pub backfill_outcome: BackfillOutcome, 180 + /// Outcome of the live-tail phase (if applicable). 181 + pub live_tail_outcome: Option<LiveTailOutcome>, 182 + /// Any frame decode errors encountered. 183 + pub decode_errors: Vec<FrameDecodeError>, 184 + } 185 + 186 + #[cfg(test)] 187 + mod tests { 188 + use super::*; 189 + 190 + /// Helper to encode a struct into CBOR bytes. 191 + fn encode_cbor<T: Serialize>(value: &T) -> Vec<u8> { 192 + let mut buf = Vec::new(); 193 + ciborium::ser::into_writer(value, &mut buf).expect("failed to encode CBOR"); 194 + buf 195 + } 196 + 197 + #[test] 198 + fn decode_labels_frame_valid() { 199 + let header = FrameHeader { 200 + op: 1, 201 + t: Some("#labels".to_string()), 202 + }; 203 + let payload = SubscribeLabelsPayload { 204 + seq: 0, 205 + labels: vec![], 206 + }; 207 + 208 + let mut frame_bytes = encode_cbor(&header); 209 + frame_bytes.extend(encode_cbor(&payload)); 210 + 211 + match decode_frame(&frame_bytes) { 212 + Ok(DecodedFrame::Labels(p)) => { 213 + assert_eq!(p.seq, 0); 214 + assert!(p.labels.is_empty()); 215 + } 216 + other => panic!("expected DecodedFrame::Labels, got {other:?}"), 217 + } 218 + } 219 + 220 + #[test] 221 + fn decode_info_frame_valid() { 222 + let header = FrameHeader { 223 + op: 1, 224 + t: Some("#info".to_string()), 225 + }; 226 + let payload = SubscribeInfoPayload { 227 + name: "test-service".to_string(), 228 + message: Some("info message".to_string()), 229 + }; 230 + 231 + let mut frame_bytes = encode_cbor(&header); 232 + frame_bytes.extend(encode_cbor(&payload)); 233 + 234 + match decode_frame(&frame_bytes) { 235 + Ok(DecodedFrame::Info(p)) => { 236 + assert_eq!(p.name, "test-service"); 237 + assert_eq!(p.message, Some("info message".to_string())); 238 + } 239 + other => panic!("expected DecodedFrame::Info, got {other:?}"), 240 + } 241 + } 242 + 243 + #[test] 244 + fn decode_error_frame_valid() { 245 + let header = FrameHeader { op: -1, t: None }; 246 + let payload = SubscribeErrorPayload { 247 + error: "TestError".to_string(), 248 + message: Some("Test error message".to_string()), 249 + }; 250 + 251 + let mut frame_bytes = encode_cbor(&header); 252 + frame_bytes.extend(encode_cbor(&payload)); 253 + 254 + match decode_frame(&frame_bytes) { 255 + Ok(DecodedFrame::Error(p)) => { 256 + assert_eq!(p.error, "TestError"); 257 + assert_eq!(p.message, Some("Test error message".to_string())); 258 + } 259 + other => panic!("expected DecodedFrame::Error, got {other:?}"), 260 + } 261 + } 262 + 263 + #[test] 264 + fn decode_frame_header_decode_failure() { 265 + let garbage = vec![0x1f, 0x2f, 0x3f]; // Invalid CBOR 266 + match decode_frame(&garbage) { 267 + Err(FrameDecodeError::HeaderDecode { raw, cause: _ }) => { 268 + assert_eq!(raw.as_ref(), &garbage); 269 + } 270 + other => panic!("expected HeaderDecode error, got {other:?}"), 271 + } 272 + } 273 + 274 + #[test] 275 + fn decode_frame_payload_decode_failure() { 276 + let header = FrameHeader { 277 + op: 1, 278 + t: Some("#labels".to_string()), 279 + }; 280 + let mut frame_bytes = encode_cbor(&header); 281 + frame_bytes.push(0xff); // Garbage after header 282 + 283 + match decode_frame(&frame_bytes) { 284 + Err(FrameDecodeError::PayloadDecode { 285 + header: _, 286 + raw, 287 + cause: _, 288 + }) => { 289 + assert_eq!(raw.as_ref(), &frame_bytes); 290 + } 291 + other => panic!("expected PayloadDecode error, got {other:?}"), 292 + } 293 + } 294 + 295 + #[test] 296 + fn decode_frame_unknown_message_type() { 297 + let header = FrameHeader { 298 + op: 1, 299 + t: Some("#futureType".to_string()), 300 + }; 301 + let mut frame_bytes = encode_cbor(&header); 302 + // Add some dummy payload bytes. 303 + frame_bytes.extend(encode_cbor(&serde_json::json!({}))); 304 + 305 + match decode_frame(&frame_bytes) { 306 + Err(FrameDecodeError::UnknownMessageType { t, raw: _ }) => { 307 + assert_eq!(t, "#futureType"); 308 + } 309 + other => panic!("expected UnknownMessageType error, got {other:?}"), 310 + } 311 + } 312 + 313 + #[test] 314 + fn decode_frame_error_payload_malformed() { 315 + let header = FrameHeader { op: -1, t: None }; 316 + let mut frame_bytes = encode_cbor(&header); 317 + frame_bytes.push(0xff); // Garbage payload 318 + 319 + match decode_frame(&frame_bytes) { 320 + Err(FrameDecodeError::PayloadDecode { 321 + header: _, 322 + raw, 323 + cause: _, 324 + }) => { 325 + assert_eq!(raw.as_ref(), &frame_bytes); 326 + } 327 + other => panic!("expected PayloadDecode error, got {other:?}"), 328 + } 329 + } 330 + }