Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 104 lines 2.5 kB view raw
1use bytes::{Buf, BufMut, Bytes, BytesMut}; 2use http_body::Body; 3use prost::Message; 4use std::{ 5 collections::VecDeque, 6 marker::PhantomData, 7 pin::Pin, 8 task::{Context, Poll}, 9}; 10 11use tonic::{ 12 codec::{DecodeBuf, Decoder}, 13 Status, 14}; 15 16#[derive(Clone)] 17pub struct MockBody { 18 data: VecDeque<Bytes>, 19} 20 21impl MockBody { 22 pub fn new(data: Vec<impl Message>) -> Self { 23 let mut queue: VecDeque<Bytes> = VecDeque::with_capacity(16); 24 for msg in data { 25 let buf = Self::encode(msg); 26 queue.push_back(buf); 27 } 28 29 MockBody { data: queue } 30 } 31 32 pub fn len(&self) -> usize { 33 self.data.len() 34 } 35 36 pub fn is_empty(&self) -> bool { 37 self.data.is_empty() 38 } 39 40 // see: https://github.com/hyperium/tonic/blob/1b03ece2a81cb7e8b1922b3c3c1f496bd402d76c/tonic/src/codec/encode.rs#L52 41 fn encode(msg: impl Message) -> Bytes { 42 let mut buf = BytesMut::with_capacity(256); 43 44 buf.reserve(5); 45 unsafe { 46 buf.advance_mut(5); 47 } 48 msg.encode(&mut buf).unwrap(); 49 { 50 let len = buf.len() - 5; 51 let mut buf = &mut buf[..5]; 52 buf.put_u8(0); // byte must be 0, reserve doesn't auto-zero 53 buf.put_u32(len as u32); 54 } 55 buf.freeze() 56 } 57} 58 59impl Body for MockBody { 60 type Data = Bytes; 61 type Error = Status; 62 63 fn poll_data( 64 mut self: Pin<&mut Self>, 65 _: &mut Context<'_>, 66 ) -> Poll<Option<Result<Self::Data, Self::Error>>> { 67 if !self.is_empty() { 68 let msg = self.data.pop_front().unwrap(); 69 Poll::Ready(Some(Ok(msg))) 70 } else { 71 Poll::Ready(None) 72 } 73 } 74 75 fn poll_trailers( 76 self: Pin<&mut Self>, 77 _: &mut Context<'_>, 78 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { 79 Poll::Ready(Ok(None)) 80 } 81} 82/// A [`Decoder`] that knows how to decode `U`. 83#[derive(Debug, Clone, Default)] 84pub struct ProstDecoder<U>(PhantomData<U>); 85 86impl<U> ProstDecoder<U> { 87 pub fn new() -> Self { 88 Self(PhantomData) 89 } 90} 91 92impl<U: Message + Default> Decoder for ProstDecoder<U> { 93 type Item = U; 94 type Error = Status; 95 96 fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> { 97 let item = Message::decode(buf.chunk()) 98 .map(Option::Some) 99 .map_err(|e| Status::internal(e.to_string()))?; 100 101 buf.advance(buf.chunk().len()); 102 Ok(item) 103 } 104}