Mirror of https://github.com/roostorg/osprey
github.com/roostorg/osprey
//! This crate is a vendored fork of <https://github.com/rnw159/tonic-mock>, which is no longer maintained.
//! We keep this fork so that we can keep moving our version of tonic forward.
3use futures::{Stream, StreamExt};
4use prost::Message;
5use std::pin::Pin;
6use tonic::{Request, Response, Status, Streaming};
7
8mod mock;
9
10pub use mock::{MockBody, ProstDecoder};
11
12pub type StreamResponseInner<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + Sync>>;
13pub type StreamResponse<T> = Response<StreamResponseInner<T>>;
14
15/// Generate streaming request for GRPC
16///
17/// When testing streaming RPC implemented with tonic, it is pretty clumsy
18/// to build the streaming request, this function extracted test code and prost
19/// decoder from tonic source code and wrap it with a nice interface. With it,
20/// testing your streaming RPC implementation is much easier.
21///
22/// Usage:
23/// ```
24/// use bytes::Bytes;
25/// use prost::Message;
26/// use tonic_mock::streaming_request;
27///
28/// // normally this should be generated from protos with prost
29/// #[derive(Clone, PartialEq, Message)]
30/// pub struct Event {
31/// #[prost(bytes = "bytes", tag = "1")]
32/// pub id: Bytes,
33/// #[prost(bytes = "bytes", tag = "2")]
34/// pub data: Bytes,
35/// }
36///
37/// let event = Event { id: Bytes::from("1"), data: Bytes::from("a".repeat(10)) };
38/// let mut events = vec![event.clone(), event.clone(), event];
39/// let stream = tonic_mock::streaming_request(events);
40///
41pub fn streaming_request<T>(messages: Vec<T>) -> Request<Streaming<T>>
42where
43 T: Message + Default + 'static,
44{
45 let body = MockBody::new(messages);
46 let decoder: ProstDecoder<T> = ProstDecoder::new();
47 let stream = Streaming::new_request(decoder, body, None, None);
48
49 Request::new(stream)
50}
51
52/// a simple wrapper to process and validate streaming response
53///
54/// Usage:
55/// ```
56/// use tonic::{Response, Status};
57/// use futures::Stream;
58/// use std::pin::Pin;
59///
60/// #[derive(Clone, PartialEq, ::prost::Message)]
61/// pub struct ResponsePush {
62/// #[prost(int32, tag = "1")]
63/// pub code: i32,
64/// }
65///
66/// // below code is to mimic a stream response from a GRPC service
67/// let output = async_stream::try_stream! {
68/// yield ResponsePush { code: 0 };
69/// yield ResponsePush { code: 1 };
70/// yield ResponsePush { code: 2 };
71/// };
72/// let response = Response::new(Box::pin(output) as tonic_mock::StreamResponseInner<ResponsePush>);
73/// let rt = tokio::runtime::Runtime::new().unwrap();
74///
75/// // now we process the events
76/// rt.block_on(async {
77/// tonic_mock::process_streaming_response(response, |msg, i| {
78/// assert!(msg.is_ok());
79/// assert_eq!(msg.as_ref().unwrap().code, i as i32);
80/// }).await;
81/// });
82/// ```
83
84pub async fn process_streaming_response<T, F>(response: StreamResponse<T>, f: F)
85where
86 T: Message + Default + 'static,
87 F: Fn(Result<T, Status>, usize),
88{
89 let mut i: usize = 0;
90 let mut messages = response.into_inner();
91 while let Some(v) = messages.next().await {
92 f(v, i);
93 i += 1;
94 }
95}
96
97/// convert a streaming response to a Vec for simplified testing
98///
99/// Usage:
100/// ```
101/// use tonic::{Response, Status};
102/// use futures::Stream;
103/// use std::pin::Pin;
104///
105/// #[derive(Clone, PartialEq, ::prost::Message)]
106/// pub struct ResponsePush {
107/// #[prost(int32, tag = "1")]
108/// pub code: i32,
109/// }
110///
111/// // below code is to mimic a stream response from a GRPC service
112/// let output = async_stream::try_stream! {
113/// yield ResponsePush { code: 0 };
114/// yield ResponsePush { code: 1 };
115/// yield ResponsePush { code: 2 };
116/// };
117/// let response = Response::new(Box::pin(output) as tonic_mock::StreamResponseInner<ResponsePush>);
118/// let rt = tokio::runtime::Runtime::new().unwrap();
119///
120/// // now we convert response to vec
121/// let result: Vec<Result<ResponsePush, Status>> = rt.block_on(async { tonic_mock::stream_to_vec(response).await });
122/// for (i, v) in result.iter().enumerate() {
123/// assert!(v.is_ok());
124/// assert_eq!(v.as_ref().unwrap().code, i as i32);
125/// }
126/// ```
127pub async fn stream_to_vec<T>(response: StreamResponse<T>) -> Vec<Result<T, Status>>
128where
129 T: Message + Default + 'static,
130{
131 let mut result = Vec::new();
132 let mut messages = response.into_inner();
133 while let Some(v) = messages.next().await {
134 result.push(v)
135 }
136 result
137}