Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 137 lines 4.4 kB view raw
1//! This crate is a vendored fork of https://github.com/rnw159/tonic-mock which is not maintained anymore. 2//! We keep this fork so we can push our version of tonic forward 3use futures::{Stream, StreamExt}; 4use prost::Message; 5use std::pin::Pin; 6use tonic::{Request, Response, Status, Streaming}; 7 8mod mock; 9 10pub use mock::{MockBody, ProstDecoder}; 11 12pub type StreamResponseInner<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + Sync>>; 13pub type StreamResponse<T> = Response<StreamResponseInner<T>>; 14 15/// Generate streaming request for GRPC 16/// 17/// When testing streaming RPC implemented with tonic, it is pretty clumsy 18/// to build the streaming request, this function extracted test code and prost 19/// decoder from tonic source code and wrap it with a nice interface. With it, 20/// testing your streaming RPC implementation is much easier. 21/// 22/// Usage: 23/// ``` 24/// use bytes::Bytes; 25/// use prost::Message; 26/// use tonic_mock::streaming_request; 27/// 28/// // normally this should be generated from protos with prost 29/// #[derive(Clone, PartialEq, Message)] 30/// pub struct Event { 31/// #[prost(bytes = "bytes", tag = "1")] 32/// pub id: Bytes, 33/// #[prost(bytes = "bytes", tag = "2")] 34/// pub data: Bytes, 35/// } 36/// 37/// let event = Event { id: Bytes::from("1"), data: Bytes::from("a".repeat(10)) }; 38/// let mut events = vec![event.clone(), event.clone(), event]; 39/// let stream = tonic_mock::streaming_request(events); 40/// 41pub fn streaming_request<T>(messages: Vec<T>) -> Request<Streaming<T>> 42where 43 T: Message + Default + 'static, 44{ 45 let body = MockBody::new(messages); 46 let decoder: ProstDecoder<T> = ProstDecoder::new(); 47 let stream = Streaming::new_request(decoder, body, None, None); 48 49 Request::new(stream) 50} 51 52/// a simple wrapper to process and validate streaming response 53/// 54/// Usage: 55/// ``` 56/// use tonic::{Response, Status}; 57/// use futures::Stream; 58/// use std::pin::Pin; 59/// 60/// #[derive(Clone, PartialEq, ::prost::Message)] 61/// pub struct ResponsePush { 62/// #[prost(int32, tag = "1")] 63/// pub code: i32, 64/// } 65/// 66/// // below code is to mimic a stream response from a GRPC service 67/// let output = async_stream::try_stream! { 68/// yield ResponsePush { code: 0 }; 69/// yield ResponsePush { code: 1 }; 70/// yield ResponsePush { code: 2 }; 71/// }; 72/// let response = Response::new(Box::pin(output) as tonic_mock::StreamResponseInner<ResponsePush>); 73/// let rt = tokio::runtime::Runtime::new().unwrap(); 74/// 75/// // now we process the events 76/// rt.block_on(async { 77/// tonic_mock::process_streaming_response(response, |msg, i| { 78/// assert!(msg.is_ok()); 79/// assert_eq!(msg.as_ref().unwrap().code, i as i32); 80/// }).await; 81/// }); 82/// ``` 83 84pub async fn process_streaming_response<T, F>(response: StreamResponse<T>, f: F) 85where 86 T: Message + Default + 'static, 87 F: Fn(Result<T, Status>, usize), 88{ 89 let mut i: usize = 0; 90 let mut messages = response.into_inner(); 91 while let Some(v) = messages.next().await { 92 f(v, i); 93 i += 1; 94 } 95} 96 97/// convert a streaming response to a Vec for simplified testing 98/// 99/// Usage: 100/// ``` 101/// use tonic::{Response, Status}; 102/// use futures::Stream; 103/// use std::pin::Pin; 104/// 105/// #[derive(Clone, PartialEq, ::prost::Message)] 106/// pub struct ResponsePush { 107/// #[prost(int32, tag = "1")] 108/// pub code: i32, 109/// } 110/// 111/// // below code is to mimic a stream response from a GRPC service 112/// let output = async_stream::try_stream! { 113/// yield ResponsePush { code: 0 }; 114/// yield ResponsePush { code: 1 }; 115/// yield ResponsePush { code: 2 }; 116/// }; 117/// let response = Response::new(Box::pin(output) as tonic_mock::StreamResponseInner<ResponsePush>); 118/// let rt = tokio::runtime::Runtime::new().unwrap(); 119/// 120/// // now we convert response to vec 121/// let result: Vec<Result<ResponsePush, Status>> = rt.block_on(async { tonic_mock::stream_to_vec(response).await }); 122/// for (i, v) in result.iter().enumerate() { 123/// assert!(v.is_ok()); 124/// assert_eq!(v.as_ref().unwrap().code, i as i32); 125/// } 126/// ``` 127pub async fn stream_to_vec<T>(response: StreamResponse<T>) -> Vec<Result<T, Status>> 128where 129 T: Message + Default + 'static, 130{ 131 let mut result = Vec::new(); 132 let mut messages = response.into_inner(); 133 while let Some(v) = messages.next().await { 134 result.push(v) 135 } 136 result 137}