Mirror of https://github.com/roostorg/osprey
github.com/roostorg/osprey
1use bytes::{Buf, BufMut, Bytes, BytesMut};
2use http_body::Body;
3use prost::Message;
4use std::{
5 collections::VecDeque,
6 marker::PhantomData,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use tonic::{
12 codec::{DecodeBuf, Decoder},
13 Status,
14};
15
16#[derive(Clone)]
17pub struct MockBody {
18 data: VecDeque<Bytes>,
19}
20
21impl MockBody {
22 pub fn new(data: Vec<impl Message>) -> Self {
23 let mut queue: VecDeque<Bytes> = VecDeque::with_capacity(16);
24 for msg in data {
25 let buf = Self::encode(msg);
26 queue.push_back(buf);
27 }
28
29 MockBody { data: queue }
30 }
31
32 pub fn len(&self) -> usize {
33 self.data.len()
34 }
35
36 pub fn is_empty(&self) -> bool {
37 self.data.is_empty()
38 }
39
40 // see: https://github.com/hyperium/tonic/blob/1b03ece2a81cb7e8b1922b3c3c1f496bd402d76c/tonic/src/codec/encode.rs#L52
41 fn encode(msg: impl Message) -> Bytes {
42 let mut buf = BytesMut::with_capacity(256);
43
44 buf.reserve(5);
45 unsafe {
46 buf.advance_mut(5);
47 }
48 msg.encode(&mut buf).unwrap();
49 {
50 let len = buf.len() - 5;
51 let mut buf = &mut buf[..5];
52 buf.put_u8(0); // byte must be 0, reserve doesn't auto-zero
53 buf.put_u32(len as u32);
54 }
55 buf.freeze()
56 }
57}
58
59impl Body for MockBody {
60 type Data = Bytes;
61 type Error = Status;
62
63 fn poll_data(
64 mut self: Pin<&mut Self>,
65 _: &mut Context<'_>,
66 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
67 if !self.is_empty() {
68 let msg = self.data.pop_front().unwrap();
69 Poll::Ready(Some(Ok(msg)))
70 } else {
71 Poll::Ready(None)
72 }
73 }
74
75 fn poll_trailers(
76 self: Pin<&mut Self>,
77 _: &mut Context<'_>,
78 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
79 Poll::Ready(Ok(None))
80 }
81}
/// A [`Decoder`] that turns raw message bytes into values of type `U`.
///
/// The decoder holds no state of its own; `U` is tracked purely at the type level.
#[derive(Clone, Debug, Default)]
pub struct ProstDecoder<U>(PhantomData<U>);

impl<U> ProstDecoder<U> {
    /// Creates a decoder for `U`.
    pub fn new() -> Self {
        ProstDecoder(PhantomData)
    }
}
91
92impl<U: Message + Default> Decoder for ProstDecoder<U> {
93 type Item = U;
94 type Error = Status;
95
96 fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
97 let item = Message::decode(buf.chunk())
98 .map(Option::Some)
99 .map_err(|e| Status::internal(e.to_string()))?;
100
101 buf.advance(buf.chunk().len());
102 Ok(item)
103 }
104}