Mirror of https://github.com/roostorg/osprey
github.com/roostorg/osprey
1use anyhow::Result;
2use async_trait::async_trait;
3use prost_types::Timestamp;
4use std::collections::HashMap;
5use tokio::time::Duration as TokioDuration;
6
7#[derive(Clone)]
8pub struct ConsumerConfig {
9 pub max_time_to_send_to_async_queue: TokioDuration,
10 pub max_acking_receiver_wait_time: TokioDuration,
11}
12
13impl Default for ConsumerConfig {
14 fn default() -> Self {
15 Self {
16 max_time_to_send_to_async_queue: TokioDuration::from_millis(
17 std::env::var("MAX_TIME_TO_SEND_TO_ASYNC_QUEUE_MS")
18 .unwrap_or("500".to_string())
19 .parse::<u64>()
20 .unwrap(),
21 ),
22 max_acking_receiver_wait_time: TokioDuration::from_millis(
23 std::env::var("MAX_ACKING_RECEIVER_WAIT_TIME_MS")
24 .unwrap_or("60000".to_string())
25 .parse::<u64>()
26 .unwrap(),
27 ),
28 }
29 }
30}
31
32pub trait ConsumerMessage {
33 fn data(&self) -> &[u8];
34 fn attributes(&self) -> &HashMap<String, String>;
35 fn timestamp(&self) -> Timestamp;
36 fn id(&self) -> String;
37}
38
39#[async_trait]
40pub trait MessageConsumer: Send {
41 type Message: ConsumerMessage;
42 type Error: std::error::Error + Send + Sync + 'static;
43
44 async fn receive(&mut self) -> Result<Self::Message, Self::Error>;
45
46 async fn ack(&self, message: &Self::Message) -> Result<(), Self::Error>;
47
48 async fn nack(&self, message: &Self::Message) -> Result<(), Self::Error>;
49}