Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 587 lines 21 kB view raw
use crate::metrics::{
    define_metrics, emit_worker::SpawnEmitWorker, string_interner::StringInterner, SharedMetrics,
};
use crate::tokio_utils::AbortOnDrop;
use anyhow::Result;
use prost::Message;
use std::{
    collections::HashMap,
    fmt::Display,
    marker::PhantomData,
    mem::take,
    sync::{mpsc::SendError, Arc},
    time::Duration,
};
// Use backoff 0.4 explicitly (there's also 0.1.6 in the dep tree)
extern crate backoff as backoff_v04;
use backoff_v04 as backoff;
use tokio::{
    sync::{broadcast, mpsc},
    time::MissedTickBehavior,
};
use tonic::codegen::InterceptedService;

use crate::gcloud::{
    auth::AuthorizationHeaderInterceptor,
    google::pubsub::v1::{self as proto, publisher_client, subscriber_client},
    grpc::connection::Connection,
};

/// Hostname of the Google Cloud Pub/Sub gRPC endpoint.
pub const GOOGLE_PUBSUB_DOMAIN: &str = "pubsub.googleapis.com";

impl Connection {
    /// Creates a pubsub publisher for a specific topic. The publisher ends when all handles are dropped.
    ///
    /// Messages are published when:
    /// - `max_interval` time has passed
    /// - `max_buffer_size` for the queue buffer has been reached
    pub fn create_pubsub_publisher(
        &self,
        topic: PubSubTopic,
        max_interval: Duration,
        max_buffer_size: usize,
        metrics: SharedMetrics,
    ) -> TopicPublisherHandle {
        let publisher = TopicPublisher {
            client: self.create_publisher_client(),
            topic,
            pubsub_stats: PubsubStats::new(),
            string_interner: StringInterner::new(8),
        };

        // The publisher itself is consumed by the background flusher task; callers only ever
        // interact with it through the returned handle's channels.
        let (send_queue_tx, closed_rx) =
            publisher.spawn_send_queue_flusher(max_interval, max_buffer_size, metrics);

        TopicPublisherHandle {
            send_queue_tx,
            closed_rx,
        }
    }

    /// Creates a **synchronous** pubsub publisher for a specific topic.
    ///
    /// Synchronous, not meaning "blocking", but rather, meaning, that there is no batching. When you call `publish`,
    /// if it succeeds, it means that your message was accepted by the remote server.
    pub fn create_synchronous_pubsub_publisher(
        &self,
        topic: PubSubTopic,
        metrics: SharedMetrics,
    ) -> SynchronousTopicPublisher {
        SynchronousTopicPublisher::new(topic, self.create_publisher_client(), metrics)
    }

    /// Creates a subscriber client, which is used to provide a client to `pub_sub_streaming_pull::StreamingPullManager`
    pub fn create_subscriber_client(
        &self,
    ) -> subscriber_client::SubscriberClient<
        InterceptedService<tonic::transport::Channel, AuthorizationHeaderInterceptor>,
    > {
        subscriber_client::SubscriberClient::with_interceptor(
            self.channel.clone(),
            self.authorization_header_interceptor.clone(),
        )
    }

    /// Creates a publisher client, which is used to provide a client to `pub_sub_streaming_pull::StreamingPullManager`
    pub fn create_publisher_client(
        &self,
    ) -> publisher_client::PublisherClient<
        InterceptedService<tonic::transport::Channel, AuthorizationHeaderInterceptor>,
    > {
        publisher_client::PublisherClient::with_interceptor(
            self.channel.clone(),
            self.authorization_header_interceptor.clone(),
        )
    }
}

/// Represents a pub-sub subscription
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct PubSubSubscription {
    project: String,
    subscription: String,
}

impl PubSubSubscription {
    /// Creates a subscription identifier from a project id and a subscription name.
    pub fn new(project: impl Into<String>, subscription: impl Into<String>) -> Self {
        Self {
            project: project.into(),
            subscription: subscription.into(),
        }
    }

    /// Returns the GCP project id component.
    pub fn project(&self) -> &str {
        &self.project
    }

    /// Returns the subscription name component.
    pub fn subscription(&self) -> &str {
        &self.subscription
    }
}

impl Display for PubSubSubscription {
    /// Formats as the fully-qualified resource path expected by the Pub/Sub API:
    /// `projects/{project}/subscriptions/{subscription}`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "projects/{}/subscriptions/{}",
            self.project, self.subscription
        )
    }
}

/// Represents a pub-sub topic
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct PubSubTopic {
    project: String,
    topic: String,
}

impl PubSubTopic {
    /// Creates a topic identifier from a project id and a topic name.
    pub fn new(project: impl Into<String>, topic: impl Into<String>) -> Self {
        Self {
            project: project.into(),
            topic: topic.into(),
        }
    }

    /// Returns the GCP project id component.
    pub fn project(&self) -> &str {
        &self.project
    }

    /// Returns the topic name component.
    pub fn topic(&self) -> &str {
        &self.topic
    }
}

impl Display for PubSubTopic {
    /// Formats as the fully-qualified resource path expected by the Pub/Sub API:
    /// `projects/{project}/topics/{topic}`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "projects/{}/topics/{}", self.project, self.topic)
    }
}

/// Batching publisher for a single topic. Consumed by [`TopicPublisher::spawn_send_queue_flusher`],
/// which moves it into a background task; callers hold a [`TopicPublisherHandle`] instead.
pub struct TopicPublisher {
    client: publisher_client::PublisherClient<
        InterceptedService<tonic::transport::Channel, AuthorizationHeaderInterceptor>,
    >,
    topic: PubSubTopic,
    pubsub_stats: Arc<PubsubStats>,
    string_interner: StringInterner,
}

impl TopicPublisher {
    // FIXME: To make this a more generalizable abstraction, we should add a few things:
    // - the ability to "close" the sender,
    // - ability to wait for the send queue after close to finish flushing, to allow an application to delay shutdown
    //   until all messages are successfully delivered to pub-sub.

    /// Starts a background task that flushes the queue when:
    /// - `max_queue_flush_interval` time has passed, or:
    /// - `max_buffer_size` for the queue buffer has been reached.
    ///
    /// Returns the sender used to enqueue messages and a broadcast receiver that resolves
    /// (by sender-drop) once the background task has flushed everything and exited.
    fn spawn_send_queue_flusher(
        mut self,
        max_queue_flush_interval: Duration,
        max_buffer_size: usize,
        metrics: SharedMetrics,
    ) -> (
        mpsc::UnboundedSender<proto::PubsubMessage>,
        broadcast::Receiver<()>,
    ) {
        let (send_queue_tx, mut send_queue_rx) = mpsc::unbounded_channel();

        // We use a broadcast channel to notify any outstanding [`TopicPublisherHandle`] instances that we have flushed
        // everything. The sender (closed_tx) is moved into the background thread and dropped when the publisher shuts
        // down.
        let (closed_tx, closed_rx) = broadcast::channel(1);

        tokio::spawn(async move {
            // Held for the lifetime of the task; dropping it on exit signals `closed_rx` waiters.
            let _closed_tx = closed_tx;
            let mut interval = tokio::time::interval(max_queue_flush_interval);
            // Skip (rather than burst) ticks missed while a flush was in flight.
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            let _metrics_emitter = self.pubsub_stats.clone().spawn_emit_worker(metrics);

            let mut messages = Vec::new();

            tracing::info!(
                "Pubsub: publisher created for {} with {} buffer @ {:?} flush interval",
                self.topic,
                max_buffer_size,
                max_queue_flush_interval
            );

            let mut sender_dropped = false;

            while !sender_dropped {
                let should_flush_queue = tokio::select! {
                    // Force a flush when the interval is elapsed, if we have something in the queue.
                    _ = interval.tick() => !messages.is_empty(),
                    message = send_queue_rx.recv() => {
                        if let Some(message) = message {
                            messages.push(message);
                            messages.len() >= max_buffer_size
                        } else {
                            // Getting None back from send_queue_rx, means that we have no more senders, so we can stop,
                            // after we've completed all our work.
                            tracing::info!(
                                "Pubsub: publisher for {} shutting down, because no more senders are alive, {} messages in queue to flush, before shutdown",
                                self.topic,
                                messages.len()
                            );

                            sender_dropped = true;
                            !messages.is_empty()
                        }

                    },
                };

                if should_flush_queue {
                    // Reset since we are flushing, and this could've been triggered
                    // by a request rather than the interval
                    interval.reset();

                    let req = proto::PublishRequest {
                        topic: self.topic.to_string(),
                        messages: take(&mut messages),
                    };

                    self.send_publish(req).await;
                }
            }
        });

        (send_queue_tx, closed_rx)
    }

    /// Publishes one batch with exponential-backoff retries, classifying tonic status codes as
    /// permanent or transient. Failures are logged and counted; they are never returned to the
    /// caller (messages in a permanently-failed batch are dropped).
    async fn send_publish(&mut self, req: proto::PublishRequest) {
        let queue_size = req.messages.len() as i64;

        let req_ref = &req;
        let publish = || {
            // NOTE(rossdylan): In order to handle the lifetime requirements on future we return here, we need to either
            // clone or take references to the fields we need out of self.
            let mut client_clone = self.client.clone();
            let stats = self.pubsub_stats.clone();
            let project = self.string_interner.intern(&self.topic.project);
            let topic = self.string_interner.intern(&self.topic.topic);
            async move {
                // NOTE(rossdylan): Tonic requires a brand new message for every attempt so we unfortunately need this clone
                match client_clone.publish(req_ref.clone()).await {
                    Err(e) => {
                        tracing::error!("failed to flush pubsub messages due to {:?}", e);
                        stats.publish_failed.incr_by(queue_size, project, topic);
                        // Mapping of transient/permanent errors taken from
                        // https://cloud.google.com/pubsub/docs/reference/error-codes
                        let err = match e.code() {
                            tonic::Code::NotFound => backoff::Error::Permanent(e),
                            tonic::Code::AlreadyExists => backoff::Error::Permanent(e),
                            tonic::Code::PermissionDenied => backoff::Error::Permanent(e),
                            tonic::Code::FailedPrecondition => backoff::Error::Permanent(e),
                            tonic::Code::Unauthenticated => backoff::Error::Permanent(e),
                            _ => backoff::Error::transient(e),
                        };
                        Err(err)
                    }
                    _ => {
                        stats.publish_sends.incr_by(queue_size, project, topic);
                        Ok(())
                    }
                }
            }
        };
        let final_res =
            backoff::future::retry(backoff::ExponentialBackoff::default(), publish).await;
        if let Err(err) = final_res {
            tracing::error!("permanent failure to flush {queue_size} messages due to {err:?}");
        }
    }
}

/// A Pub/Sub message under construction. The `State` type parameter is a compile-time
/// validity marker (see [`PubSubMessageValid`] / [`PubSubMessageInvalid`] below).
#[derive(Debug, Clone)]
pub struct PubSubMessage<State = PubSubMessageValid> {
    data: Vec<u8>,
    attributes: HashMap<String, String>,
    ordering_key: Option<String>,
    _state: PhantomData<State>,
}

// Two different type-states to represent a message that is fully formed (valid), versus partially formed (invalid),
// a message is fully formed when it has a non-empty data, or when it has an empty data, but non-empty attributes.
pub struct PubSubMessageInvalid;

#[derive(Clone, Debug)]
pub struct PubSubMessageValid;

impl PubSubMessage<PubSubMessageInvalid> {
    /// Constructs a message with no data, you must call [`PubSubMessage::set_attribute`] at least once, before you
    /// can publish this message. If data is empty, an attribute, *must* be specified.
    pub fn empty() -> Self {
        Self {
            data: Default::default(),
            attributes: Default::default(),
            ordering_key: Default::default(),
            _state: PhantomData,
        }
    }
}

/// Returned from `PubsubMessage::with_data` when the provided data is an empty Vec.
#[derive(Debug, Clone)]
pub struct PubsubMessageDataEmpty;

impl std::fmt::Display for PubsubMessageDataEmpty {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("creating an empty PubsubMessage via with_data is not supported, use PubsubMessage::empty() instead")
    }
}
impl std::error::Error for PubsubMessageDataEmpty {}

impl PubSubMessage<PubSubMessageValid> {
    /// Constructs a message with the given data.
    ///
    /// Returns an error if the provided `data` is Empty. Prefer to use `PubsubMessage::empty()` instead.
    pub fn with_data(data: impl Into<Vec<u8>>) -> Result<Self, PubsubMessageDataEmpty> {
        let data: Vec<_> = data.into();
        if data.is_empty() {
            return Err(PubsubMessageDataEmpty);
        }

        Ok(Self {
            data,
            attributes: Default::default(),
            ordering_key: Default::default(),
            _state: PhantomData,
        })
    }

    /// Constructs a message with the given proto data.
    ///
    /// The program will panic if the provided proto message `data` is empty.
    pub fn with_proto_data<T: prost::Message>(data: T) -> Self {
        PubSubMessage::with_data(data.encode_to_vec()).expect("proto message can't be empty")
    }

    /// Creates a `PubSubMessage` with no data, and the given attribute set.
    pub fn with_attribute(key: impl Into<String>, value: impl Into<String>) -> Self {
        let mut attributes = HashMap::new();
        attributes.insert(key.into(), value.into());

        Self {
            attributes,
            data: Default::default(),
            ordering_key: Default::default(),
            _state: PhantomData,
        }
    }

    /// Constructs a message whose data is the JSON serialization of `data`.
    ///
    /// NOTE(review): a serde_json failure is also reported as `PubsubMessageDataEmpty`,
    /// which conflates "unserializable" with "empty" — the serialization error itself is discarded.
    pub fn with_serde_data<T: serde::Serialize>(data: T) -> Result<Self, PubsubMessageDataEmpty> {
        let data = serde_json::to_string(&data).map_err(|_| PubsubMessageDataEmpty)?;
        Self::with_data(data)
    }

    /// Creates a `PubSubMessage` with no data, and the given attribute item set.
    pub fn with_attribute_item<MessageAttributeItemT: MessageAttributeItem>(
        attribute_item: MessageAttributeItemT,
    ) -> Self {
        Self::with_attribute(MessageAttributeItemT::KEY, attribute_item.into_value())
    }

    /// Converts this to an encoded proto message.
    pub fn encode_to_vec(self) -> Vec<u8> {
        self.to_proto_message().encode_to_vec()
    }

    // Converts this to a proto message.
    fn to_proto_message(self) -> proto::PubsubMessage {
        proto::PubsubMessage {
            attributes: self.attributes,
            data: self.data,
            ordering_key: self.ordering_key.unwrap_or_default(),
            // These fields are set by the server.
            message_id: Default::default(),
            publish_time: None,
        }
    }
}

impl<T> PubSubMessage<T> {
    /// Sets a given attribute on the message.
    ///
    /// Having at least one attribute makes the message publishable, so this transitions
    /// any state (including `PubSubMessageInvalid`) to `PubSubMessageValid`.
    pub fn set_attribute(
        mut self,
        key: impl Into<String>,
        value: impl Into<String>,
    ) -> PubSubMessage<PubSubMessageValid> {
        self.attributes.insert(key.into(), value.into());

        PubSubMessage {
            data: self.data,
            attributes: self.attributes,
            ordering_key: self.ordering_key,
            _state: PhantomData,
        }
    }

    /// Sets a given attribute item on the message; like [`PubSubMessage::set_attribute`], this
    /// transitions the message to the valid state.
    pub fn set_attribute_item<MessageAttributeItemT: MessageAttributeItem>(
        self,
        attribute_item: MessageAttributeItemT,
    ) -> PubSubMessage<PubSubMessageValid> {
        self.set_attribute(MessageAttributeItemT::KEY, attribute_item.into_value())
    }

    /// Sets the ordering key for the message.
    pub fn set_ordering_key(mut self, ordering_key: impl Into<String>) -> Self {
        self.ordering_key = Some(ordering_key.into());

        self
    }
}

/// A message attribute item which encodes the key and value in a single struct.
pub trait MessageAttributeItem {
    const KEY: &'static str;

    fn into_value(self) -> String;
}

/// Cloneable handle to a background [`TopicPublisher`]. The publisher shuts down (after a final
/// flush) once every handle has been dropped.
#[derive(Debug)]
pub struct TopicPublisherHandle {
    closed_rx: broadcast::Receiver<()>,
    send_queue_tx: mpsc::UnboundedSender<proto::PubsubMessage>,
}

impl Clone for TopicPublisherHandle {
    // Manual impl because broadcast::Receiver is not Clone; resubscribe() creates a new
    // receiver on the same channel.
    fn clone(&self) -> Self {
        Self {
            closed_rx: self.closed_rx.resubscribe(),
            send_queue_tx: self.send_queue_tx.clone(),
        }
    }
}

impl TopicPublisherHandle {
    /// Builds a handle from an existing send queue and closed-notification receiver.
    pub fn new(
        send_queue_tx: mpsc::UnboundedSender<proto::PubsubMessage>,
        closed_rx: broadcast::Receiver<()>,
    ) -> Self {
        Self {
            send_queue_tx,
            closed_rx,
        }
    }
}

impl TopicPublisherHandle {
    /// Enqueues a message for batched publishing. Fails only if the background publisher has
    /// already exited; the message is reconstructed and handed back inside the `SendError`.
    pub fn publish(&self, message: PubSubMessage) -> Result<(), SendError<PubSubMessage>> {
        // Remember whether an ordering key was set, since the proto conversion collapses
        // `None` to an empty string and we want to give the caller back an equivalent message.
        let did_have_ordering_key = message.ordering_key.is_some();

        self.send_queue_tx
            .send(message.to_proto_message())
            .map_err(|e| {
                SendError(PubSubMessage {
                    data: e.0.data,
                    attributes: e.0.attributes,
                    ordering_key: did_have_ordering_key.then_some(e.0.ordering_key),
                    _state: PhantomData,
                })
            })
    }

    /// Consume this handle and wait for the underlying publisher to exit.
    /// NOTE: If you clone and move this handle around this method will block until all handles have been dropped
    pub async fn close(self) {
        // Split our fields out so we can explicitly drop the send queue and wait for the closed notification
        let Self {
            mut closed_rx,
            send_queue_tx,
        } = self;
        drop(send_queue_tx);
        // This is fine because all we care about is that this completed
        let _res = closed_rx.recv().await;
    }

    /// Like [`TopicPublisherHandle::close`], but gives up (with an error) after `timeout`.
    pub async fn close_timeout(self, timeout: Duration) -> Result<()> {
        // Split our fields out so we can explicitly drop the send queue and wait for the closed notification
        let Self {
            mut closed_rx,
            send_queue_tx,
        } = self;
        drop(send_queue_tx);
        let _res = tokio::time::timeout(timeout, closed_rx.recv()).await?;
        Ok(())
    }
}

/// Shared state behind [`SynchronousTopicPublisher`]; wrapped in an `Arc` so the publisher
/// is cheaply cloneable and the metrics emit worker is aborted when the last clone drops.
struct SynchronousTopicPublisherInner {
    topic: PubSubTopic,
    client: publisher_client::PublisherClient<
        InterceptedService<tonic::transport::Channel, AuthorizationHeaderInterceptor>,
    >,
    pubsub_stats: Arc<PubsubStats>,
    string_interner: StringInterner,
    _metrics_emitter: AbortOnDrop,
}

/// Unbatched publisher: each `publish` call round-trips to the server before returning.
#[derive(Clone)]
pub struct SynchronousTopicPublisher {
    inner: Arc<SynchronousTopicPublisherInner>,
}

impl SynchronousTopicPublisher {
    /// Creates a publisher for `topic` and starts its metrics emit worker.
    pub fn new(
        topic: PubSubTopic,
        client: publisher_client::PublisherClient<
            InterceptedService<tonic::transport::Channel, AuthorizationHeaderInterceptor>,
        >,
        metrics: SharedMetrics,
    ) -> Self {
        let pubsub_stats = PubsubStats::new();
        let _metrics_emitter = pubsub_stats.clone().spawn_emit_worker(metrics);

        Self {
            inner: Arc::new(SynchronousTopicPublisherInner {
                topic,
                client,
                pubsub_stats,
                string_interner: StringInterner::new(8),
                _metrics_emitter,
            }),
        }
    }

    /// Publishes `messages` in a single request, returning once the server has accepted them.
    /// Unlike the batched publisher there are no retries; the tonic status is returned as-is.
    pub async fn publish(&self, messages: &[PubSubMessage]) -> Result<(), tonic::Status> {
        let mut client = self.inner.client.clone();
        let num_messages = messages.len() as i64;
        match client
            .publish(proto::PublishRequest {
                topic: self.inner.topic.to_string(),
                messages: messages
                    .iter()
                    .cloned()
                    .map(|x| x.to_proto_message())
                    .collect(),
            })
            .await
        {
            Ok(_) => {
                self.inner.pubsub_stats.publish_sends.incr_by(
                    num_messages,
                    self.inner.string_interner.intern(&self.inner.topic.project),
                    self.inner.string_interner.intern(&self.inner.topic.topic),
                );
                Ok(())
            }
            Err(err) => {
                self.inner.pubsub_stats.publish_failed.incr_by(
                    num_messages,
                    self.inner.string_interner.intern(&self.inner.topic.project),
                    self.inner.string_interner.intern(&self.inner.topic.topic),
                );
                Err(err)
            }
        }
    }
}

// Per-(project, topic) success/failure counters for publishes, emitted by the
// spawn_emit_worker tasks above.
define_metrics!(PubsubStats, [
    publish_failed => DynamicCounter("osprey_gcloud.pubsub.publish.failed", [project, topic]),
    publish_sends => DynamicCounter("osprey_gcloud.pubsub.publish.sends", [project, topic]),
]);