// Mirror of https://github.com/roostorg/osprey
// Upstream: github.com/roostorg/osprey
1use crate::metrics::{
2 define_metrics, emit_worker::SpawnEmitWorker, string_interner::StringInterner, SharedMetrics,
3};
4use crate::tokio_utils::AbortOnDrop;
5use anyhow::Result;
6use prost::Message;
7use std::{
8 collections::HashMap,
9 fmt::Display,
10 marker::PhantomData,
11 mem::take,
12 sync::{mpsc::SendError, Arc},
13 time::Duration,
14};
15// Use backoff 0.4 explicitly (there's also 0.1.6 in the dep tree)
16extern crate backoff as backoff_v04;
17use backoff_v04 as backoff;
18use tokio::{
19 sync::{broadcast, mpsc},
20 time::MissedTickBehavior,
21};
22use tonic::codegen::InterceptedService;
23
24use crate::gcloud::{
25 auth::AuthorizationHeaderInterceptor,
26 google::pubsub::v1::{self as proto, publisher_client, subscriber_client},
27 grpc::connection::Connection,
28};
29
/// Hostname of the Google Cloud Pub/Sub gRPC endpoint.
pub const GOOGLE_PUBSUB_DOMAIN: &str = "pubsub.googleapis.com";
31
32impl Connection {
33 /// Creates a pubsub publisher for a specific topic. The publisher ends when all handles are dropped.
34 ///
35 /// Messages are published when:
36 /// - `max_interval` time has passed
37 /// - `max_buffer_size` for the queue buffer has been reached
38 pub fn create_pubsub_publisher(
39 &self,
40 topic: PubSubTopic,
41 max_interval: Duration,
42 max_buffer_size: usize,
43 metrics: SharedMetrics,
44 ) -> TopicPublisherHandle {
45 let publisher = TopicPublisher {
46 client: self.create_publisher_client(),
47 topic,
48 pubsub_stats: PubsubStats::new(),
49 string_interner: StringInterner::new(8),
50 };
51
52 let (send_queue_tx, closed_rx) =
53 publisher.spawn_send_queue_flusher(max_interval, max_buffer_size, metrics);
54
55 TopicPublisherHandle {
56 send_queue_tx,
57 closed_rx,
58 }
59 }
60
61 /// Creates a **synchronous** pubsub publisher for a specific topic.
62 ///
63 /// Synchronous, not meaning "blocking", but rather, meaning, that there is no batching. When you call `publish`,
64 /// if it succeeds, it means that your message was accepted by the remote server.
65 pub fn create_synchronous_pubsub_publisher(
66 &self,
67 topic: PubSubTopic,
68 metrics: SharedMetrics,
69 ) -> SynchronousTopicPublisher {
70 SynchronousTopicPublisher::new(topic, self.create_publisher_client(), metrics)
71 }
72
73 /// Creates a subscriber client, which is used to provide a client to `pub_sub_streaming_pull::StreamingPullManager`
74 pub fn create_subscriber_client(
75 &self,
76 ) -> subscriber_client::SubscriberClient<
77 InterceptedService<tonic::transport::Channel, AuthorizationHeaderInterceptor>,
78 > {
79 subscriber_client::SubscriberClient::with_interceptor(
80 self.channel.clone(),
81 self.authorization_header_interceptor.clone(),
82 )
83 }
84
85 /// Creates a publisher client, which is used to provide a client to `pub_sub_streaming_pull::StreamingPullManager`
86 pub fn create_publisher_client(
87 &self,
88 ) -> publisher_client::PublisherClient<
89 InterceptedService<tonic::transport::Channel, AuthorizationHeaderInterceptor>,
90 > {
91 publisher_client::PublisherClient::with_interceptor(
92 self.channel.clone(),
93 self.authorization_header_interceptor.clone(),
94 )
95 }
96}
97
/// Represents a pub-sub subscription
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct PubSubSubscription {
    project: String,
    subscription: String,
}

impl PubSubSubscription {
    /// Builds a subscription reference from a project id and a subscription name.
    pub fn new(project: impl Into<String>, subscription: impl Into<String>) -> Self {
        let project = project.into();
        let subscription = subscription.into();
        Self {
            project,
            subscription,
        }
    }

    /// The GCP project this subscription belongs to.
    pub fn project(&self) -> &str {
        self.project.as_str()
    }

    /// The short (unqualified) subscription name.
    pub fn subscription(&self) -> &str {
        self.subscription.as_str()
    }
}

impl Display for PubSubSubscription {
    // Formats as the fully-qualified resource path used by the pub/sub API.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            project,
            subscription,
        } = self;
        write!(f, "projects/{project}/subscriptions/{subscription}")
    }
}
131
/// Represents a pub-sub topic
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct PubSubTopic {
    project: String,
    topic: String,
}

impl PubSubTopic {
    /// Builds a topic reference from a project id and a topic name.
    pub fn new(project: impl Into<String>, topic: impl Into<String>) -> Self {
        let project = project.into();
        let topic = topic.into();
        Self { project, topic }
    }

    /// The GCP project this topic belongs to.
    pub fn project(&self) -> &str {
        self.project.as_str()
    }

    /// The short (unqualified) topic name.
    pub fn topic(&self) -> &str {
        self.topic.as_str()
    }
}

impl Display for PubSubTopic {
    // Formats as the fully-qualified resource path used by the pub/sub API.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { project, topic } = self;
        write!(f, "projects/{project}/topics/{topic}")
    }
}
161
/// Batching publisher state; consumed by `spawn_send_queue_flusher`, which owns
/// it inside the background flush task (see [`Connection::create_pubsub_publisher`]).
pub struct TopicPublisher {
    // gRPC publisher client; cloned per publish attempt.
    client: publisher_client::PublisherClient<
        InterceptedService<tonic::transport::Channel, AuthorizationHeaderInterceptor>,
    >,
    // Destination topic for every batch this publisher sends.
    topic: PubSubTopic,
    // Shared counters (publish_sends / publish_failed) emitted by the metrics worker.
    pubsub_stats: Arc<PubsubStats>,
    // Interns the project/topic strings used as metric tags.
    string_interner: StringInterner,
}
170
impl TopicPublisher {
    // FIXME: To make this a more generalizable abstraction, we should add a few things:
    // - the ability to "close" the sender,
    // - ability to wait for the send queue after close to finish flushing, to allow an application to delay shutdown
    //   until all messages are successfully delivered to pub-sub.

    /// Starts a background task that flushes the queue when:
    /// - `max_queue_flush_interval` time has passed, or:
    /// - `max_buffer_size` for the queue buffer has been reached.
    ///
    /// Consumes `self`: the publisher lives inside the spawned task until every
    /// clone of the returned sender has been dropped and the queue is drained.
    ///
    /// Returns the sender used to enqueue messages, plus a broadcast receiver
    /// whose `recv()` completes (with a closed error) once the task has exited.
    fn spawn_send_queue_flusher(
        mut self,
        max_queue_flush_interval: Duration,
        max_buffer_size: usize,
        metrics: SharedMetrics,
    ) -> (
        mpsc::UnboundedSender<proto::PubsubMessage>,
        broadcast::Receiver<()>,
    ) {
        let (send_queue_tx, mut send_queue_rx) = mpsc::unbounded_channel();

        // We use a broadcast channel to notify any outstanding [`TopicPublisherHandle`] instances that we have flushed
        // everything. The sender (closed_tx) is moved into the background thread and dropped when the publisher shuts
        // down.
        let (closed_tx, closed_rx) = broadcast::channel(1);

        tokio::spawn(async move {
            // Held (never sent on) so receivers only observe closure when this task ends.
            let _closed_tx = closed_tx;
            let mut interval = tokio::time::interval(max_queue_flush_interval);
            // Skip (rather than burst) any ticks missed while a flush was in flight.
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            // Keeps the stats-emitting task alive for the lifetime of this task.
            let _metrics_emitter = self.pubsub_stats.clone().spawn_emit_worker(metrics);

            let mut messages = Vec::new();

            tracing::info!(
                "Pubsub: publisher created for {} with {} buffer @ {:?} flush interval",
                self.topic,
                max_buffer_size,
                max_queue_flush_interval
            );

            let mut sender_dropped = false;

            while !sender_dropped {
                let should_flush_queue = tokio::select! {
                    // Force a flush when the interval is elapsed, if we have something in the queue.
                    _ = interval.tick() => !messages.is_empty(),
                    message = send_queue_rx.recv() => {
                        if let Some(message) = message {
                            messages.push(message);
                            messages.len() >= max_buffer_size
                        } else {
                            // Getting None back from send_queue_rx, means that we have no more senders, so we can stop,
                            // after we've completed all our work.
                            tracing::info!(
                                "Pubsub: publisher for {} shutting down, because no more senders are alive, {} messages in queue to flush, before shutdown",
                                self.topic,
                                messages.len()
                            );

                            sender_dropped = true;
                            // Do one final flush iteration if anything is still queued.
                            !messages.is_empty()
                        }

                    },
                };

                if should_flush_queue {
                    // Reset since we are flushing, and this could've been triggered
                    // by a request rather than the interval
                    interval.reset();

                    let req = proto::PublishRequest {
                        topic: self.topic.to_string(),
                        messages: take(&mut messages),
                    };

                    self.send_publish(req).await;
                }
            }
        });

        (send_queue_tx, closed_rx)
    }

    /// Publishes `req`, retrying transient gRPC failures with exponential backoff.
    ///
    /// Errors classified as permanent abort the retry loop and the batch is
    /// dropped after logging. Per-attempt outcomes are recorded in `pubsub_stats`.
    async fn send_publish(&mut self, req: proto::PublishRequest) {
        let queue_size = req.messages.len() as i64;

        let req_ref = &req;
        let publish = || {
            // NOTE(rossdylan): In order to handle the lifetime requirements on future we return here, we need to either
            // clone or take references to the fields we need out of self.
            let mut client_clone = self.client.clone();
            let stats = self.pubsub_stats.clone();
            let project = self.string_interner.intern(&self.topic.project);
            let topic = self.string_interner.intern(&self.topic.topic);
            async move {
                // NOTE(rossdylan): Tonic requires a brand new message for every attempt so we unfortunately need this clone
                match client_clone.publish(req_ref.clone()).await {
                    Err(e) => {
                        tracing::error!("failed to flush pubsub messages due to {:?}", e);
                        stats.publish_failed.incr_by(queue_size, project, topic);
                        // Mapping of transient/permanent errors taken from
                        // https://cloud.google.com/pubsub/docs/reference/error-codes
                        let err = match e.code() {
                            tonic::Code::NotFound => backoff::Error::Permanent(e),
                            tonic::Code::AlreadyExists => backoff::Error::Permanent(e),
                            tonic::Code::PermissionDenied => backoff::Error::Permanent(e),
                            tonic::Code::FailedPrecondition => backoff::Error::Permanent(e),
                            tonic::Code::Unauthenticated => backoff::Error::Permanent(e),
                            _ => backoff::Error::transient(e),
                        };
                        Err(err)
                    }
                    _ => {
                        stats.publish_sends.incr_by(queue_size, project, topic);
                        Ok(())
                    }
                }
            }
        };
        let final_res =
            backoff::future::retry(backoff::ExponentialBackoff::default(), publish).await;
        if let Err(err) = final_res {
            tracing::error!("permanent failure to flush {queue_size} messages due to {err:?}");
        }
    }
}
298
/// A pub/sub message under construction. `State` is a type-state marker
/// ([`PubSubMessageValid`] / [`PubSubMessageInvalid`]) tracking whether the
/// message is complete enough to publish.
#[derive(Debug, Clone)]
pub struct PubSubMessage<State = PubSubMessageValid> {
    // Message payload bytes; may be empty only when `attributes` is non-empty.
    data: Vec<u8>,
    // Key/value metadata delivered alongside the payload.
    attributes: HashMap<String, String>,
    // Optional ordering key; `None` is flattened to the proto's empty-string default.
    ordering_key: Option<String>,
    // Zero-sized marker carrying the validity type-state.
    _state: PhantomData<State>,
}

// Two different type-states to represent a message that is fully formed (valid), versus partially formed (invalid),
// a message is fully formed when it has a non-empty data, or when it has an empty data, but non-empty attributes.
pub struct PubSubMessageInvalid;

#[derive(Clone, Debug)]
pub struct PubSubMessageValid;
313
314impl PubSubMessage<PubSubMessageInvalid> {
315 /// Constructs a message with no data, you must call [`PubSubMessage::set_attribute`] at least once, before you
316 /// can publish this message. If data is empty, an attribute, *must* be specified.
317 pub fn empty() -> Self {
318 Self {
319 data: Default::default(),
320 attributes: Default::default(),
321 ordering_key: Default::default(),
322 _state: PhantomData,
323 }
324 }
325}
326
/// Returned from `PubsubMessage::with_data` when the provided data is an empty Vec.
#[derive(Debug, Clone)]
pub struct PubsubMessageDataEmpty;

impl std::fmt::Display for PubsubMessageDataEmpty {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "creating an empty PubsubMessage via with_data is not supported, use PubsubMessage::empty() instead"
        )
    }
}

impl std::error::Error for PubsubMessageDataEmpty {}
337
338impl PubSubMessage<PubSubMessageValid> {
339 /// Constructs a message with the given data.
340 ///
341 /// Returns an error if the provided `data` is Empty. Prefer to use `PubsubMessage::empty()` instead.
342 pub fn with_data(data: impl Into<Vec<u8>>) -> Result<Self, PubsubMessageDataEmpty> {
343 let data: Vec<_> = data.into();
344 if data.is_empty() {
345 return Err(PubsubMessageDataEmpty);
346 }
347
348 Ok(Self {
349 data,
350 attributes: Default::default(),
351 ordering_key: Default::default(),
352 _state: PhantomData,
353 })
354 }
355
356 /// Constructs a message with the given proto data.
357 ///
358 /// The program will panic if the provided proto message `data` is empty.
359 pub fn with_proto_data<T: prost::Message>(data: T) -> Self {
360 PubSubMessage::with_data(data.encode_to_vec()).expect("proto message can't be empty")
361 }
362
363 /// Creates a `PubSubMessage` with no data, and the given attribute set.
364 pub fn with_attribute(key: impl Into<String>, value: impl Into<String>) -> Self {
365 let mut attributes = HashMap::new();
366 attributes.insert(key.into(), value.into());
367
368 Self {
369 attributes,
370 data: Default::default(),
371 ordering_key: Default::default(),
372 _state: PhantomData,
373 }
374 }
375
376 pub fn with_serde_data<T: serde::Serialize>(data: T) -> Result<Self, PubsubMessageDataEmpty> {
377 let data = serde_json::to_string(&data).map_err(|_| PubsubMessageDataEmpty)?;
378 Self::with_data(data)
379 }
380
381 /// Creates a `PubSubMessage` with no data, and the given attribute item set.
382 pub fn with_attribute_item<MessageAttributeItemT: MessageAttributeItem>(
383 attribute_item: MessageAttributeItemT,
384 ) -> Self {
385 Self::with_attribute(MessageAttributeItemT::KEY, attribute_item.into_value())
386 }
387
388 /// Converts this to an encoded proto message.
389 pub fn encode_to_vec(self) -> Vec<u8> {
390 self.to_proto_message().encode_to_vec()
391 }
392
393 // Converts this to a proto message.
394 fn to_proto_message(self) -> proto::PubsubMessage {
395 proto::PubsubMessage {
396 attributes: self.attributes,
397 data: self.data,
398 ordering_key: self.ordering_key.unwrap_or_default(),
399 // These fields are set by the server.
400 message_id: Default::default(),
401 publish_time: None,
402 }
403 }
404}
405
406impl<T> PubSubMessage<T> {
407 /// Sets a given attribute on the message.
408 pub fn set_attribute(
409 mut self,
410 key: impl Into<String>,
411 value: impl Into<String>,
412 ) -> PubSubMessage<PubSubMessageValid> {
413 self.attributes.insert(key.into(), value.into());
414
415 PubSubMessage {
416 data: self.data,
417 attributes: self.attributes,
418 ordering_key: self.ordering_key,
419 _state: PhantomData,
420 }
421 }
422
423 pub fn set_attribute_item<MessageAttributeItemT: MessageAttributeItem>(
424 self,
425 attribute_item: MessageAttributeItemT,
426 ) -> PubSubMessage<PubSubMessageValid> {
427 self.set_attribute(MessageAttributeItemT::KEY, attribute_item.into_value())
428 }
429
430 /// Sets the ordering key for the message.
431 pub fn set_ordering_key(mut self, ordering_key: impl Into<String>) -> Self {
432 self.ordering_key = Some(ordering_key.into());
433
434 self
435 }
436}
437
/// A message attribute item which encodes the key and value in a single struct.
pub trait MessageAttributeItem {
    /// Attribute key under which this item's value is stored.
    const KEY: &'static str;

    /// Consumes the item, producing the attribute value string.
    fn into_value(self) -> String;
}
444
/// Cloneable handle for enqueueing messages onto a [`TopicPublisher`]'s
/// background flush queue (see [`Connection::create_pubsub_publisher`]).
#[derive(Debug)]
pub struct TopicPublisherHandle {
    // Used to wait for the background flusher task to exit (its sender is
    // dropped on shutdown, so recv() completes with a closed error).
    closed_rx: broadcast::Receiver<()>,
    // Unbounded queue feeding the background flusher.
    send_queue_tx: mpsc::UnboundedSender<proto::PubsubMessage>,
}
450
451impl Clone for TopicPublisherHandle {
452 fn clone(&self) -> Self {
453 Self {
454 closed_rx: self.closed_rx.resubscribe(),
455 send_queue_tx: self.send_queue_tx.clone(),
456 }
457 }
458}
459
460impl TopicPublisherHandle {
461 pub fn new(
462 send_queue_tx: mpsc::UnboundedSender<proto::PubsubMessage>,
463 closed_rx: broadcast::Receiver<()>,
464 ) -> Self {
465 Self {
466 send_queue_tx,
467 closed_rx,
468 }
469 }
470}
471
472impl TopicPublisherHandle {
473 pub fn publish(&self, message: PubSubMessage) -> Result<(), SendError<PubSubMessage>> {
474 let did_have_ordering_key = message.ordering_key.is_some();
475
476 self.send_queue_tx
477 .send(message.to_proto_message())
478 .map_err(|e| {
479 SendError(PubSubMessage {
480 data: e.0.data,
481 attributes: e.0.attributes,
482 ordering_key: did_have_ordering_key.then_some(e.0.ordering_key),
483 _state: PhantomData,
484 })
485 })
486 }
487
488 /// Consume this handle and wait for the underlying publisher to exit.
489 /// NOTE: If you clone and move this handle around this method will block until all handles have been dropped
490 pub async fn close(self) {
491 // Split our fields out so we can explicitly drop the send queue and wait for the closed notification
492 let Self {
493 mut closed_rx,
494 send_queue_tx,
495 } = self;
496 drop(send_queue_tx);
497 // This is fine because all we care about is that this completed
498 let _res = closed_rx.recv().await;
499 }
500
501 pub async fn close_timeout(self, timeout: Duration) -> Result<()> {
502 // Split our fields out so we can explicitly drop the send queue and wait for the closed notification
503 let Self {
504 mut closed_rx,
505 send_queue_tx,
506 } = self;
507 drop(send_queue_tx);
508 let _res = tokio::time::timeout(timeout, closed_rx.recv()).await?;
509 Ok(())
510 }
511}
512
/// Shared state behind [`SynchronousTopicPublisher`]; held in an `Arc` so
/// clones of the publisher share one client, stats set, and metrics worker.
struct SynchronousTopicPublisherInner {
    // Destination topic for every publish call.
    topic: PubSubTopic,
    // gRPC publisher client; cloned per request.
    client: publisher_client::PublisherClient<
        InterceptedService<tonic::transport::Channel, AuthorizationHeaderInterceptor>,
    >,
    // Counters for publish successes/failures.
    pubsub_stats: Arc<PubsubStats>,
    // Interns the project/topic strings used as metric tags.
    string_interner: StringInterner,
    // Keeps the stats-emitting task alive; aborted when the inner state drops.
    _metrics_emitter: AbortOnDrop,
}
522
/// Cheaply-cloneable publisher that sends each `publish` call directly to the
/// server with no batching (see [`Connection::create_synchronous_pubsub_publisher`]).
#[derive(Clone)]
pub struct SynchronousTopicPublisher {
    inner: Arc<SynchronousTopicPublisherInner>,
}
527
528impl SynchronousTopicPublisher {
529 pub fn new(
530 topic: PubSubTopic,
531 client: publisher_client::PublisherClient<
532 InterceptedService<tonic::transport::Channel, AuthorizationHeaderInterceptor>,
533 >,
534 metrics: SharedMetrics,
535 ) -> Self {
536 let pubsub_stats = PubsubStats::new();
537 let _metrics_emitter = pubsub_stats.clone().spawn_emit_worker(metrics);
538
539 Self {
540 inner: Arc::new(SynchronousTopicPublisherInner {
541 topic,
542 client,
543 pubsub_stats,
544 string_interner: StringInterner::new(8),
545 _metrics_emitter,
546 }),
547 }
548 }
549
550 pub async fn publish(&self, messages: &[PubSubMessage]) -> Result<(), tonic::Status> {
551 let mut client = self.inner.client.clone();
552 let num_messages = messages.len() as i64;
553 match client
554 .publish(proto::PublishRequest {
555 topic: self.inner.topic.to_string(),
556 messages: messages
557 .iter()
558 .cloned()
559 .map(|x| x.to_proto_message())
560 .collect(),
561 })
562 .await
563 {
564 Ok(_) => {
565 self.inner.pubsub_stats.publish_sends.incr_by(
566 num_messages,
567 self.inner.string_interner.intern(&self.inner.topic.project),
568 self.inner.string_interner.intern(&self.inner.topic.topic),
569 );
570 Ok(())
571 }
572 Err(err) => {
573 self.inner.pubsub_stats.publish_failed.incr_by(
574 num_messages,
575 self.inner.string_interner.intern(&self.inner.topic.project),
576 self.inner.string_interner.intern(&self.inner.topic.topic),
577 );
578 Err(err)
579 }
580 }
581 }
582}
583
// Per-(project, topic) counters for publish outcomes, emitted by the metrics
// worker spawned alongside each publisher.
define_metrics!(PubsubStats, [
    publish_failed => DynamicCounter("osprey_gcloud.pubsub.publish.failed", [project, topic]),
    publish_sends => DynamicCounter("osprey_gcloud.pubsub.publish.sends", [project, topic]),
]);