Mirror of https://github.com/roostorg/osprey
Add dockerignore, add kafka support for coordinator (#70)

Authored by Chenyu, committed by GitHub · 41688547 · c819200c

+1617 -547
+39
.dockerignore
```
# Rust build artifacts
**/target/
**/Cargo.lock

# Python
**/__pycache__/
**/*.pyc
**/*.pyo
**/*.pyd
.Python
**/.venv/
**/venv/
**/.pytest_cache/
**/.coverage
**/*.egg-info/

# Node
**/node_modules/
**/npm-debug.log
**/yarn-error.log

# IDEs
.idea/
.vscode/
*.swp
*.swo
*~

# OS
.DS_Store
Thumbs.db

# Git
.git/
.gitignore

# Misc
*.log
.env
```
+10
README.md
````diff
     docker compose up -d
     ```
 
+    or using the wrapper script:
+
+    ```bash
+    ./start.sh
+    ```
+
+    This starts the osprey-worker on its own, along with all of its required dependencies.
+
+    Alternatively, you can start Osprey with `osprey-coordinator`; refer to the [Coordinator README](./example_docker_compose/run_osprey_with_coordinator/README.md) for more information.
+
 6. (Optional) **Port Forward the UI/UI API:**
 
     If you are running the docker compose on a headless machine, you will need to port forward the UI and UI API.
````
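A quick way to confirm the stack came up after the wrapper script (a sketch; `start.sh` forwards extra arguments to `docker compose`, as the coordinator README below shows):

```bash
./start.sh up -d                        # start detached
docker compose ps                       # list services and their health
docker compose logs -f osprey-worker    # tail the worker
```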
+1 -42
docker-compose.yaml
```diff
     ports:
       - "5432:5432"
     volumes:
-      - metadata_data:/var/lib/postgresql
+      - metadata_data:/var/lib/postgresql/data
     environment:
       - POSTGRES_PASSWORD=FoolishPassword
       - POSTGRES_USER=osprey
···
       - ./druid/specs:/specs
     command: ["/bin/sh", "/specs/submit-specs.sh"]
     restart: "no"
-
-  osprey_coordinator:
-    container_name: osprey_coordinator
-    build:
-      context: .
-      dockerfile: osprey_coordinator/Dockerfile
-    ports:
-      - "19950:19950"
-      - "19951:19951"
-    environment:
-      - RUST_LOG=info
-      - ETCD_PEERS=http://etcd:2379
-      - SNOWFLAKE_API_ENDPOINT=http://snowflake-id-worker:8088
-      - POD_IP=127.0.0.1
-      - PUBSUB_EMULATOR_HOST=127.0.0.1:8085
-    depends_on:
-      etcd:
-        condition: service_healthy
-      snowflake-id-worker:
-        condition: service_started
-
-  etcd:
-    image: quay.io/coreos/etcd:v3.5.15
-    ports:
-      - "2379:2379"
-    environment:
-      - ETCD_ENABLE_V2=true
-      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
-      - ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379
-    healthcheck:
-      test:
-        [
-          "CMD",
-          "etcdctl",
-          "--endpoints=http://localhost:2379",
-          "endpoint",
-          "health",
-        ]
-      interval: 5s
-      timeout: 3s
-      retries: 3
```
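The coordinator and etcd services removed here reappear in the override file added below; one way to sanity-check that layering with the stock compose CLI is to render the merged configuration:

```bash
# Render the effective config after applying the coordinator override
docker compose \
  -f docker-compose.yaml \
  -f example_docker_compose/run_osprey_with_coordinator/docker-compose.coordinator.yaml \
  config
```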
+119
example_data/generate_coordinator_test_data.sh
```bash
#!/bin/bash

# continuously generate and send test actions to the osprey coordinator via gRPC
# this mimics the Kafka test data generator but sends directly to the coordinator

set -e

COORDINATOR_HOST="${COORDINATOR_HOST:-localhost:19951}"

# Check if grpcurl is installed
if ! command -v grpcurl &> /dev/null; then
    echo "Error: grpcurl is not installed."
    echo "Install it with: brew install grpcurl (macOS) or go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest"
    exit 1
fi

# Check if jq is installed
if ! command -v jq &> /dev/null; then
    echo "Error: jq is not installed."
    echo "Install it with: brew install jq (macOS)"
    exit 1
fi

# Initialize action_id counter
action_id=1

# Words to randomly generate post content
words=(hello the quick brown fox jumps over lazy dog and cat runs fast)

# Get script directory
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

# Function to generate random user ID
generate_random_user_id() {
    echo "user_$(shuf -i 100-9999 -n 1)"
}

# Function to generate current timestamp in RFC3339 format
generate_timestamp() {
    date -u +"%Y-%m-%dT%H:%M:%S.000000000Z"
}

# Function to generate random post text
generate_random_text() {
    echo "${words[RANDOM % ${#words[@]}]} ${words[RANDOM % ${#words[@]}]} ${words[RANDOM % ${#words[@]}]} ${words[RANDOM % ${#words[@]}]} ${words[RANDOM % ${#words[@]}]}."
}

# Function to generate action data from template
generate_action() {
    local text=$(generate_random_text)
    local timestamp=$(generate_timestamp)
    local user_id=$(generate_random_user_id)
    local ip_address="192.168.1.$(shuf -i 1-254 -n 1)"

    local sed_commands=()
    sed_commands+=("s/\$text/$text/g")
    sed_commands+=("s/\$timestamp/$timestamp/g")
    sed_commands+=("s/\$user_id/$user_id/g")
    sed_commands+=("s/\$ip_address/$ip_address/g")
    sed_commands+=("s/\$action_id/$action_id/g")

    # Apply all sed commands to template.json
    local cmd="sed"
    for sed_cmd in "${sed_commands[@]}"; do
        cmd="$cmd -e '$sed_cmd'"
    done
    eval "$cmd" "$SCRIPT_DIR/template.json"
}

# Function to send a single action
send_action() {
    local kafka_format_json=$(generate_action)

    # Extract the data object from the Kafka format and convert to coordinator format
    local action_data=$(echo "$kafka_format_json" | jq -c '.data')
    local timestamp=$(echo "$kafka_format_json" | jq -r '.send_time')
    local action_name=$(echo "$action_data" | jq -r '.action_name')
    local data_payload=$(echo "$action_data" | jq -c '.data')

    echo "[$action_id] Sending action - Name: $action_name, Timestamp: $timestamp"

    # Build gRPC request format
    jq -n \
        --arg action_id "$action_id" \
        --arg action_name "$action_name" \
        --argjson data_payload "$data_payload" \
        --arg timestamp "$timestamp" \
        '{
            action_id: ($action_id | tonumber),
            action_name: $action_name,
            action_data_json: ($data_payload | tostring),
            timestamp: $timestamp
        }' | grpcurl -plaintext -d @ "$COORDINATOR_HOST" \
        osprey.rpc.osprey_coordinator.sync_action.v1.OspreyCoordinatorSyncActionService/ProcessAction

    # Increment action_id
    ((action_id++))
}

# Function to handle cleanup on script termination
cleanup() {
    echo
    echo "Stopping data generation..."
    exit 0
}

# Set up signal handlers for graceful shutdown
trap cleanup SIGINT SIGTERM

# Main execution
echo "Generating actions every second to Osprey Coordinator at $COORDINATOR_HOST"
echo "Press Ctrl+C to stop..."
echo

# Infinite loop to generate and send actions
while true; do
    send_action
    sleep 1
done
```
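A usage sketch; `COORDINATOR_HOST` is the script's only knob and defaults to `localhost:19951`, as defined above:

```bash
# Against the local coordinator
./example_data/generate_coordinator_test_data.sh

# Against a coordinator reachable elsewhere
COORDINATOR_HOST=osprey-coordinator:19951 ./example_data/generate_coordinator_test_data.sh
```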
+195
example_docker_compose/run_osprey_with_coordinator/README.md
While the Osprey worker can stand on its own by directly ingesting data from Kafka, the Osprey Coordinator offers an alternative that adds features such as load balancing and synchronous actions.

## Quick Start

The easiest way to run Osprey with the Coordinator is using the helper script from the repository root:

```bash
# Start with coordinator
./start.sh --with-coordinator

# Start in detached mode
./start.sh --with-coordinator up -d

# Start with test data producer
./start.sh --with-coordinator --profile coordinator_test_data up
```

Or manually, using docker compose override files:

```bash
# From the repository root
docker compose -f docker-compose.yaml -f example_docker_compose/run_osprey_with_coordinator/docker-compose.coordinator.yaml up
```

## Overview

The **Osprey Coordinator** is a Rust-based service that acts as a central hub for distributing actions to Osprey Workers for rule evaluation. It provides two primary modes for receiving actions:

1. **Bidirectional gRPC Streaming** - workers connect to the coordinator via persistent bidirectional streams
2. **Synchronous gRPC API** - external services send actions directly for immediate processing

The coordinator can consume actions from Kafka or Pub/Sub and/or receive them via gRPC; it manages action distribution across connected workers, handles acknowledgments, and ensures reliable action processing.

## Architecture

### Components

```
┌───────────────────────────────┐
│  Kafka Topics and/or Pubsub   │
│       (actions_input)         │
└──────────┬────────────────────┘
           │
           ▼
┌─────────────────────────────────────────────┐
│        Osprey Coordinator (Rust)            │
│  ┌─────────────────────────────────────┐    │
│  │         Priority Queue              │    │
│  │  - Sync Actions (high priority)     │    │
│  │  - Async Actions (lower priority)   │    │
│  └─────────────────────────────────────┘    │
│                                             │
│  gRPC Services:                             │
│  - Bidirectional Stream (port 19950)        │
│  - Sync Action API (port 19951)             │
└──────────────┬──────────────────────────────┘
               │
               ▼
┌──────────────────────┐
│   Osprey Workers     │
│      (Python)        │
│  - Process rules     │
│  - Send verdicts     │
└──────────────────────┘
```

## Configuration

The coordinator is configured via the `docker-compose.coordinator.yaml` override file in this directory (`example_docker_compose/run_osprey_with_coordinator/`). This file adds the coordinator service and modifies the worker configuration to connect to it.
### Environment Variables

Configure the coordinator via environment variables in `docker-compose.yaml`:

| Variable | Default | Description |
|----------|---------|-------------|
| `OSPREY_COORDINATOR_BIDI_STREAM_PORT` | `19950` | Port for bidirectional streaming |
| `OSPREY_COORDINATOR_SYNC_ACTION_PORT` | `19951` | Port for synchronous action API |
| `SNOWFLAKE_API_ENDPOINT` | `http://snowflake-id-worker:8088` | Snowflake ID service endpoint |
| `ETCD_PEERS` | `http://etcd:2379` | etcd connection string |
| `OSPREY_KAFKA_BOOTSTRAP_SERVERS` | `kafka:29092` | Kafka broker addresses |
| `OSPREY_KAFKA_INPUT_STREAM_TOPIC` | `osprey.actions_input` | Kafka topic to consume |
| `OSPREY_KAFKA_GROUP_ID` | `osprey_coordinator_group` | Kafka consumer group ID |
| `OSPREY_COORDINATOR_CONSUMER_TYPE` | `kafka` | Consumer type: `kafka` or `pubsub` |
| `MAX_TIME_TO_SEND_TO_ASYNC_QUEUE_MS` | `500` | Max time to wait before queuing async actions |
| `MAX_ACKING_RECEIVER_WAIT_TIME_MS` | `60000` | Max time to wait for worker ack/nack |

### Example Configuration

To customize the coordinator, edit `example_docker_compose/run_osprey_with_coordinator/docker-compose.coordinator.yaml`:

```yaml
services:
  osprey-coordinator:
    environment:
      - RUST_LOG=info
      - ETCD_PEERS=http://etcd:2379
      - SNOWFLAKE_API_ENDPOINT=http://snowflake-id-worker:8088
      - OSPREY_COORDINATOR_CONSUMER_TYPE=kafka # or 'pubsub'
      - OSPREY_COORDINATOR_BIDI_STREAM_PORT=19950
      - OSPREY_COORDINATOR_SYNC_ACTION_PORT=19951
```

## Using the Coordinator

### Worker Connection

When using `docker-compose.coordinator.yaml`, workers are automatically configured to connect to the coordinator. The override file sets:

```yaml
osprey-worker:
  environment:
    - OSPREY_INPUT_STREAM_SOURCE=osprey_coordinator
    - OSPREY_COORDINATOR_SERVICE_NAME=osprey_coordinator
```

**How It Works:**

1. Worker connects to coordinator on port 19950
2. Worker sends initial connection request with client ID
3. Coordinator sends actions to worker via the bidirectional stream
4. Worker processes actions through rules
5. Worker sends ack/nack with optional verdicts back to coordinator
6. Connection automatically reconnects every 60-120 seconds for load balancing

### Direct Action Submission (Sync API)

External services can submit actions directly to the coordinator for synchronous processing.

**Using grpcurl:**

```bash
# Send a single action for immediate processing
grpcurl -plaintext \
  -d '{
    "action_id": 12345,
    "action_name": "user_login",
    "action_data_json": "{\"user_id\":\"user_123\",\"ip_address\":\"192.168.1.1\"}",
    "timestamp": "2024-11-25T10:30:00.000000000Z"
  }' \
  localhost:19951 \
  osprey.rpc.osprey_coordinator.sync_action.v1.OspreyCoordinatorSyncActionService/ProcessAction
```

Or use the test data producer:

```bash
./start.sh --with-coordinator --profile coordinator_test_data up
```

### Kafka/Pubsub Integration

The coordinator can automatically consume from either Kafka or Pub/Sub (but not both simultaneously).
Set `OSPREY_COORDINATOR_CONSUMER_TYPE` to choose:

- `kafka` (default) - consume from Kafka
- `pubsub` - consume from Google Cloud Pub/Sub

Configure the appropriate environment variables for your chosen consumer in `example_docker_compose/run_osprey_with_coordinator/docker-compose.coordinator.yaml`:

```yaml
osprey-coordinator:
  environment:
    # Consumer selection (kafka or pubsub)
    - OSPREY_COORDINATOR_CONSUMER_TYPE=kafka

    # Kafka configuration (when using kafka)
    - OSPREY_KAFKA_BOOTSTRAP_SERVERS=kafka:29092
    - OSPREY_KAFKA_INPUT_STREAM_TOPIC=osprey.actions_input
    - OSPREY_KAFKA_GROUP_ID=osprey_coordinator_group

    # Pubsub configuration (when using pubsub)
    - OSPREY_COORDINATOR_SERVICE_ACCOUNT
    - PUBSUB_SUBSCRIPTION_PROJECT_ID
    - PUBSUB_SUBSCRIPTION_ID
    - PUBSUB_ENCRYPTION_KEY_URI
    # Optional
    - PUBSUB_MAX_MESSAGES=5000 # default
    - PUBSUB_MAX_PROCESSING_MESSAGES=5000 # default

    # Shared by both Kafka and Pubsub, optional
    - MAX_TIME_TO_SEND_TO_ASYNC_QUEUE_MS=500 # default
    - MAX_ACKING_RECEIVER_WAIT_TIME_MS=60000 # default
```

**Sending Actions via Kafka:**

Use the test data producer:

```bash
# Start the Kafka test data producer
./start.sh --with-coordinator --profile test_data up kafka-test-data-producer -d
```
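To verify the sync API is reachable, `grpcurl` can probe the coordinator directly (a sketch; `list` and `describe` require the server to expose gRPC reflection, which may not be enabled here):

```bash
# List exposed services on the sync action port (needs server reflection)
grpcurl -plaintext localhost:19951 list

# Inspect the sync action service
grpcurl -plaintext localhost:19951 describe \
  osprey.rpc.osprey_coordinator.sync_action.v1.OspreyCoordinatorSyncActionService
```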
+109
example_docker_compose/run_osprey_with_coordinator/docker-compose.coordinator.yaml
```yaml
# Docker Compose override for running Osprey with Coordinator
# Use with: docker compose -f docker-compose.yaml -f docker-compose.coordinator.yaml up
# Or use the helper script: ./start.sh --with-coordinator

services:
  # Override worker to connect to coordinator instead of Kafka directly
  osprey-worker:
    depends_on:
      kafka:
        condition: service_healthy
      kafka-topic-creator:
        condition: service_completed_successfully
      bigtable:
        condition: service_healthy
      bigtable-initializer:
        condition: service_completed_successfully
      minio:
        condition: service_healthy
      minio-bucket-init:
        condition: service_completed_successfully
      etcd:
        condition: service_healthy
      postgres:
        condition: service_healthy
      osprey-coordinator:
        condition: service_started
    environment:
      - ETCD_PEERS=http://etcd:2379
      - OSPREY_INPUT_STREAM_SOURCE=osprey_coordinator
      - OSPREY_COORDINATOR_SERVICE_NAME=osprey_coordinator

  # Add Osprey Coordinator service
  osprey-coordinator:
    container_name: osprey-coordinator
    hostname: osprey-coordinator
    build:
      context: .
      dockerfile: osprey_coordinator/Dockerfile
    ports:
      - "19950:19950"
      - "19951:19951"
    environment:
      - RUST_LOG=info
      - ETCD_PEERS=http://etcd:2379
      - SNOWFLAKE_API_ENDPOINT=http://snowflake-id-worker:8088
      - OSPREY_COORDINATOR_CONSUMER_TYPE=kafka
      - OSPREY_COORDINATOR_BIDI_STREAM_PORT=19950
      - OSPREY_COORDINATOR_SYNC_ACTION_PORT=19951
      - POD_IP=osprey-coordinator
      - OSPREY_KAFKA_BOOTSTRAP_SERVERS=kafka:29092
      - OSPREY_KAFKA_INPUT_STREAM_TOPIC=osprey.actions_input
      - OSPREY_KAFKA_GROUP_ID=osprey_coordinator_group
      - MAX_TIME_TO_SEND_TO_ASYNC_QUEUE_MS=500
      - MAX_ACKING_RECEIVER_WAIT_TIME_MS=60000
    depends_on:
      etcd:
        condition: service_healthy
      snowflake-id-worker:
        condition: service_started
      kafka:
        condition: service_healthy
      kafka-topic-creator:
        condition: service_completed_successfully

  # Add etcd service (required by coordinator)
  etcd:
    image: quay.io/coreos/etcd:v3.5.15
    ports:
      - "2379:2379"
    environment:
      - ETCD_ENABLE_V2=true
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379
    healthcheck:
      test:
        [
          "CMD",
          "etcdctl",
          "--endpoints=http://localhost:2379",
          "endpoint",
          "health",
        ]
      interval: 5s
      timeout: 3s
      retries: 3

  # Optional coordinator test data generator
  # Run with: docker compose -f docker-compose.yaml -f docker-compose.coordinator.yaml --profile coordinator_test_data up coordinator-test-data-producer
  coordinator-test-data-producer:
    image: alpine:latest
    hostname: coordinator-test-data-producer
    container_name: coordinator-test-data-producer
    depends_on:
      osprey-coordinator:
        condition: service_started
    profiles:
      - coordinator_test_data
      - coordinator-test-data
    environment:
      COORDINATOR_HOST: "osprey-coordinator:19951"
    volumes:
      - ./example_data:/osprey/example_data
    command:
      - /bin/sh
      - -c
      - |
        apk add --no-cache bash jq curl &&
        wget -qO- https://github.com/fullstorydev/grpcurl/releases/download/v1.8.7/grpcurl_1.8.7_linux_x86_64.tar.gz | tar xz -C /usr/local/bin &&
        /osprey/example_data/generate_coordinator_test_data.sh
```
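A quick smoke test once the override is up (the container name comes from the file above):

```bash
# Bring up the stack with the coordinator override, detached
./start.sh --with-coordinator up -d

# Watch the coordinator consume (RUST_LOG=info is set in the override)
docker logs -f osprey-coordinator
```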
+1
osprey_coordinator/Cargo.toml
```diff
 trust-dns-resolver = "0.21"
 uuid = { version = "1.0", features = ["v4"] }
 which = "4.4.0"
+rdkafka = "0.38.0"
 
 [build-dependencies]
 glob = "0.3"
```
+350
osprey_coordinator/src/consumer/kafka.rs
```rust
use crate::consumer::message_consumer::{ConsumerConfig, ConsumerMessage, MessageConsumer};
use crate::consumer::message_decoder;
use crate::coordinator_metrics::OspreyCoordinatorMetrics;
use crate::metrics::counters::StaticCounter;
use crate::metrics::histograms::StaticHistogram;
use crate::priority_queue::{AckOrNack, AckableAction, PriorityQueueSender};
use crate::signals::exit_signal;
use crate::snowflake_client::SnowflakeClient;
use anyhow::Result;
use async_trait::async_trait;
use prost_types::Timestamp;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::message::{Headers, Message as KafkaRawMessage};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{timeout, Instant};

pub struct KafkaConsumer {
    consumer: StreamConsumer,
}

pub struct KafkaMessage {
    data: Vec<u8>,
    attributes: HashMap<String, String>,
    timestamp: Timestamp,
    id: String,
}

impl KafkaMessage {
    pub fn new(
        data: Vec<u8>,
        attributes: HashMap<String, String>,
        timestamp: Timestamp,
        id: String,
    ) -> Self {
        Self {
            data,
            attributes,
            timestamp,
            id,
        }
    }
}

impl ConsumerMessage for KafkaMessage {
    fn data(&self) -> &[u8] {
        &self.data
    }

    fn attributes(&self) -> &HashMap<String, String> {
        &self.attributes
    }

    fn timestamp(&self) -> Timestamp {
        self.timestamp.clone()
    }

    fn id(&self) -> String {
        self.id.clone()
    }
}

#[async_trait]
impl MessageConsumer for KafkaConsumer {
    type Message = KafkaMessage;
    type Error = KafkaError;

    async fn receive(&mut self) -> Result<Self::Message, Self::Error> {
        let msg = self.consumer.recv().await?;

        let data = msg.payload().unwrap_or(&[]).to_vec();

        // Kafka headers map onto the generic attribute map (e.g. "encoding").
        let attributes: HashMap<String, String> = msg
            .headers()
            .map(|headers| {
                headers
                    .iter()
                    .filter_map(|header| {
                        let key = header.key.to_string();
                        let value = header
                            .value
                            .and_then(|v| String::from_utf8(v.to_vec()).ok())
                            .unwrap_or_default();
                        Some((key, value))
                    })
                    .collect()
            })
            .unwrap_or_default();

        // Fall back to wall-clock time when the broker supplies no timestamp.
        let timestamp_millis = msg.timestamp().to_millis().unwrap_or_else(|| {
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_millis() as i64
        });

        let timestamp = Timestamp {
            seconds: timestamp_millis / 1000,
            nanos: ((timestamp_millis % 1000) * 1_000_000) as i32,
        };

        let partition = msg.partition();
        let offset = msg.offset();
        let id = format!("kafka-{}-{}", partition, offset);

        Ok(KafkaMessage::new(data, attributes, timestamp, id))
    }

    async fn ack(&self, _message: &Self::Message) -> Result<(), Self::Error> {
        // Commits the consumer's current position across partitions, not a
        // single message offset.
        self.consumer
            .commit_consumer_state(rdkafka::consumer::CommitMode::Async)?;
        Ok(())
    }

    async fn nack(&self, _message: &Self::Message) -> Result<(), Self::Error> {
        // No-op: an uncommitted offset is redelivered after a rebalance/restart.
        Ok(())
    }
}

impl KafkaConsumer {
    pub async fn new() -> Result<Self> {
        let input_topic = std::env::var("OSPREY_KAFKA_INPUT_STREAM_TOPIC")
            .unwrap_or("osprey.actions_input".to_string());
        let input_bootstrap_servers =
            std::env::var("OSPREY_KAFKA_BOOTSTRAP_SERVERS").unwrap_or("localhost:9092".to_string());
        let group_id = std::env::var("OSPREY_KAFKA_GROUP_ID")
            .unwrap_or("osprey_coordinator_group".to_string());

        tracing::info!(
            "Creating Kafka consumer for topic: {} with bootstrap servers: {}",
            input_topic,
            input_bootstrap_servers
        );

        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", &group_id)
            .set("bootstrap.servers", &input_bootstrap_servers)
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .create::<StreamConsumer>()?;

        consumer.subscribe(&[&input_topic])?;

        Ok(Self { consumer })
    }
}

pub async fn start_kafka_consumer(
    snowflake_client: Arc<SnowflakeClient>,
    priority_queue_sender: PriorityQueueSender,
    metrics: Arc<OspreyCoordinatorMetrics>,
) -> Result<()> {
    tracing::info!("Kafka consumer starting...");

    let mut consumer = KafkaConsumer::new().await?;
    let config = ConsumerConfig::default();

    loop {
        tokio::select! {
            _ = exit_signal() => {
                tracing::info!("Received exit signal, shutting down Kafka consumer");
                return Ok(());
            }
            message_result = consumer.receive() => {
                let message = match message_result {
                    Ok(msg) => msg,
                    Err(e) => {
                        tracing::error!({error = %e}, "[kafka] error receiving message");
                        continue;
                    }
                };

                let ack_id: u64 = rand::Rng::gen(&mut rand::thread_rng());
                let message_id = message.id();

                let action = match message.attributes().get("encoding").map(|s| s.as_str()) {
                    Some("proto") => {
                        message_decoder::decode_proto_message(
                            message.data(),
                            ack_id,
                            message.timestamp(),
                            &snowflake_client,
                            &metrics,
                        )
                        .await
                    }
                    _ => {
                        message_decoder::decode_msgpack_json_message(
                            message.data(),
                            ack_id,
                            message.timestamp(),
                            &snowflake_client,
                            &metrics,
                        )
                        .await
                    }
                };

                let action = match action {
                    Ok(action) => action,
                    Err(e) => {
                        tracing::error!(
                            {error = %e, ack_id = %ack_id, message_id = %message_id},
                            "[kafka] failed to decode message"
                        );
                        if let Err(nack_err) = consumer.nack(&message).await {
                            tracing::error!(
                                {error = %nack_err, message_id = %message_id},
                                "[kafka] failed to nack message"
                            );
                        }
                        continue;
                    }
                };

                let (ackable_action, acking_receiver) = AckableAction::new(action);

                tracing::debug!(
                    {ack_id = %ack_id, message_id = %message_id},
                    "[kafka] received message"
                );

                let send_start_time = Instant::now();
                match timeout(
                    config.max_time_to_send_to_async_queue,
                    priority_queue_sender.send_async(ackable_action),
                )
                .await
                {
                    Ok(Ok(())) => {
                        tracing::debug!(
                            {message_id = %message_id, ack_id = %ack_id},
                            "[kafka] sent message to priority queue"
                        );
                        metrics.async_classification_added_to_queue.incr();
                    }
                    Ok(Err(e)) => {
                        tracing::error!(
                            {error = %e, message_id = %message_id},
                            "[kafka] priority queue send error"
                        );
                        if let Err(nack_err) = consumer.nack(&message).await {
                            tracing::error!(
                                {error = %nack_err, message_id = %message_id},
                                "[kafka] failed to nack message"
                            );
                        }
                        continue;
                    }
                    Err(_) => {
                        tracing::error!(
                            {message_id = %message_id},
                            "[kafka] sending to priority queue timed out"
                        );
                        if let Err(nack_err) = consumer.nack(&message).await {
                            tracing::error!(
                                {error = %nack_err, message_id = %message_id},
                                "[kafka] failed to nack message"
                            );
                        }
                        continue;
                    }
                }
                metrics
                    .priority_queue_send_time_async
                    .record(send_start_time.elapsed());

                tracing::debug!(
                    {message_id = %message_id, ack_id = %ack_id},
                    "[kafka] waiting on ack or nack"
                );

                let receive_start_time = Instant::now();
                match timeout(config.max_acking_receiver_wait_time, acking_receiver).await {
                    Ok(Ok(ack_or_nack)) => match ack_or_nack {
                        AckOrNack::Ack(_) => {
                            tracing::debug!(
                                {message_id = %message_id, ack_id = %ack_id},
                                "[kafka] acking message"
                            );
                            metrics.async_classification_result_ack.incr();
                            metrics
                                .receiver_ack_time_async
                                .record(receive_start_time.elapsed());

                            if let Err(e) = consumer.ack(&message).await {
                                tracing::error!(
                                    {error = %e, message_id = %message_id},
                                    "[kafka] failed to ack message"
                                );
                            }
                        }
                        AckOrNack::Nack => {
                            tracing::debug!(
                                {message_id = %message_id, ack_id = %ack_id},
                                "[kafka] nacking message"
                            );
                            metrics.async_classification_result_nack.incr();
                            metrics
                                .receiver_ack_time_async
                                .record(receive_start_time.elapsed());

                            if let Err(e) = consumer.nack(&message).await {
                                tracing::error!(
                                    {error = %e, message_id = %message_id},
                                    "[kafka] failed to nack message"
                                );
                            }
                        }
                    },
                    Ok(Err(recv_error)) => {
                        tracing::error!(
                            {message_id = %message_id, recv_error = %recv_error, ack_id = %ack_id},
                            "[kafka] acking sender dropped"
                        );
                        metrics
                            .receiver_ack_time_async
                            .record(receive_start_time.elapsed());

                        if let Err(e) = consumer.nack(&message).await {
                            tracing::error!(
                                {error = %e, message_id = %message_id},
                                "[kafka] failed to nack message"
                            );
                        }
                    }
                    Err(_) => {
                        tracing::error!(
                            {message_id = %message_id, ack_id = %ack_id},
                            "[kafka] waiting for ack/nack timed out"
                        );
                        metrics
                            .receiver_ack_time_async
                            .record(receive_start_time.elapsed());

                        if let Err(e) = consumer.nack(&message).await {
                            tracing::error!(
                                {error = %e, message_id = %message_id},
                                "[kafka] failed to nack message"
                            );
                        }
                    }
                }
            }
        }
    }
}
```
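Because the consumer sets `enable.auto.commit=false` and only commits after a worker ack, offset lag for `osprey_coordinator_group` is the thing to watch when debugging. A sketch using the standard Kafka CLI (assuming the compose Kafka image ships the usual tools; the binary may be named `kafka-consumer-groups.sh` depending on the distribution):

```bash
# Describe the coordinator's consumer group: current offsets, end offsets, lag
docker compose exec kafka kafka-consumer-groups \
  --bootstrap-server kafka:29092 \
  --describe --group osprey_coordinator_group
```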
+49
osprey_coordinator/src/consumer/message_consumer.rs
```rust
use anyhow::Result;
use async_trait::async_trait;
use prost_types::Timestamp;
use std::collections::HashMap;
use tokio::time::Duration as TokioDuration;

#[derive(Clone)]
pub struct ConsumerConfig {
    pub max_time_to_send_to_async_queue: TokioDuration,
    pub max_acking_receiver_wait_time: TokioDuration,
}

impl Default for ConsumerConfig {
    fn default() -> Self {
        Self {
            max_time_to_send_to_async_queue: TokioDuration::from_millis(
                std::env::var("MAX_TIME_TO_SEND_TO_ASYNC_QUEUE_MS")
                    .unwrap_or("500".to_string())
                    .parse::<u64>()
                    .unwrap(),
            ),
            max_acking_receiver_wait_time: TokioDuration::from_millis(
                std::env::var("MAX_ACKING_RECEIVER_WAIT_TIME_MS")
                    .unwrap_or("60000".to_string())
                    .parse::<u64>()
                    .unwrap(),
            ),
        }
    }
}

pub trait ConsumerMessage {
    fn data(&self) -> &[u8];
    fn attributes(&self) -> &HashMap<String, String>;
    fn timestamp(&self) -> Timestamp;
    fn id(&self) -> String;
}

#[async_trait]
pub trait MessageConsumer: Send {
    type Message: ConsumerMessage;
    type Error: std::error::Error + Send + Sync + 'static;

    async fn receive(&mut self) -> Result<Self::Message, Self::Error>;

    async fn ack(&self, message: &Self::Message) -> Result<(), Self::Error>;

    async fn nack(&self, message: &Self::Message) -> Result<(), Self::Error>;
}
```
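Both limits are read from the environment at startup, so they can be tuned per deployment without a rebuild. A sketch (the variable names and the 500 / 60000 ms defaults come from the code above; the values shown are illustrative overrides):

```bash
# Allow 1s to enqueue into the priority queue, 2 minutes for a worker ack/nack
export MAX_TIME_TO_SEND_TO_ASYNC_QUEUE_MS=1000
export MAX_ACKING_RECEIVER_WAIT_TIME_MS=120000
```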
+90
osprey_coordinator/src/consumer/message_decoder.rs
```rust
use anyhow::{anyhow, Result};
use convert_case::{Case, Casing};
use prost::Message as ProstMessage;
use prost_types::Timestamp;
use serde::Deserialize;
use serde_json::Value;

use crate::{
    coordinator_metrics::OspreyCoordinatorMetrics,
    metrics::counters::StaticCounter,
    proto::{
        self, osprey_coordinator_action::ActionData, osprey_coordinator_action::SecretData,
        Action as OspreyProtoAction,
    },
    snowflake_client::SnowflakeClient,
};

pub async fn decode_proto_message(
    message_data: &[u8],
    ack_id: u64,
    message_timestamp: Timestamp,
    snowflake_client: &SnowflakeClient,
    metrics: &OspreyCoordinatorMetrics,
) -> Result<proto::OspreyCoordinatorAction> {
    let osprey_proto_action = OspreyProtoAction::decode(message_data)?;
    // An id of 0 means "unset": mint one from the Snowflake service instead.
    let action_id = if osprey_proto_action.id == 0 {
        metrics.action_id_snowflake_generation_proto.incr();
        snowflake_client.generate_id().await?
    } else {
        osprey_proto_action.id
    };
    // The action name is derived from the oneof data variant, snake_cased.
    let action_name = osprey_proto_action
        .data
        .ok_or_else(|| anyhow!("missing action data"))?
        .to_string()
        .to_case(Case::Snake);
    Ok(proto::OspreyCoordinatorAction {
        ack_id,
        action_id,
        action_name,
        action_data: Some(ActionData::ProtoActionData(message_data.into())),
        secret_data: None,
        timestamp: Some(message_timestamp),
    })
}

pub async fn decode_msgpack_json_message(
    message_data: &[u8],
    ack_id: u64,
    message_timestamp: Timestamp,
    snowflake_client: &SnowflakeClient,
    metrics: &OspreyCoordinatorMetrics,
) -> Result<proto::OspreyCoordinatorAction> {
    use msgpack_simple::MsgPack;

    #[derive(Deserialize, Debug)]
    struct MsgpackAction {
        id: Option<String>,
        name: String,
        data: Value,
        secret_data: Option<Value>,
    }

    // The payload is a msgpack string whose contents are a JSON document.
    let decoded = MsgPack::parse(message_data)?;
    let decoded = decoded.as_string()?;
    let action: MsgpackAction = serde_json::from_str(decoded.as_str())?;

    let serde_json_vec = serde_json::to_vec(&action.data)?;
    let optional_secret_data = match &action.secret_data {
        Some(secret_data) => Some(SecretData::JsonSecretData(serde_json::to_vec(secret_data)?)),
        _ => None,
    };

    let action_id = match action.id {
        Some(id) => id.parse::<u64>()?,
        None => {
            metrics.action_id_snowflake_generation_json.incr();
            snowflake_client.generate_id().await?
        }
    };

    Ok(proto::OspreyCoordinatorAction {
        ack_id,
        action_id,
        action_name: action.name,
        action_data: Some(ActionData::JsonActionData(serde_json_vec)),
        secret_data: optional_secret_data,
        timestamp: Some(message_timestamp),
    })
}
```
+7
osprey_coordinator/src/consumer/mod.rs
```rust
pub mod kafka;
pub mod message_consumer;
pub mod message_decoder;
pub mod pubsub;

pub use kafka::start_kafka_consumer;
pub use pubsub::start_pubsub_subscriber;
```
+520
osprey_coordinator/src/consumer/pubsub.rs
```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use crate::consumer::message_consumer::{ConsumerConfig, ConsumerMessage};
use crate::gcloud::grpc::connection::Connection;
use crate::gcloud::{
    auth::AuthorizationHeaderInterceptor,
    gcp_metadata::GCPMetadataClient,
    google::pubsub::v1::subscriber_client::SubscriberClient,
    kms::{AesGcmEnvelope, GOOGLE_KMS_DOMAIN},
    pubsub::{PubSubSubscription, GOOGLE_PUBSUB_DOMAIN},
};
use crate::metrics::counters::StaticCounter;
use crate::metrics::histograms::StaticHistogram;
use crate::metrics::MetricsClientBuilder;
use crate::{
    consumer::message_decoder,
    coordinator_metrics::OspreyCoordinatorMetrics,
    priority_queue::{AckOrNack, AckableAction, PriorityQueueSender},
    proto,
    pub_sub_streaming_pull::DetachedMessage,
    pub_sub_streaming_pull::{FlowControl, SpawnTaskPerMessageHandler, StreamingPullManager},
};
use anyhow::{anyhow, Result};
use prost_types::Timestamp;
use rand::Rng;
use tokio::time::{timeout, Instant};
use tonic::{codegen::InterceptedService, transport::Channel};

use crate::signals::exit_signal;
use crate::snowflake_client::SnowflakeClient;

pub struct PubSubMessage {
    inner: DetachedMessage,
}

impl ConsumerMessage for PubSubMessage {
    fn data(&self) -> &[u8] {
        &self.inner.data
    }

    fn attributes(&self) -> &HashMap<String, String> {
        self.inner.attributes()
    }

    fn timestamp(&self) -> Timestamp {
        self.inner.publish_time()
    }

    fn id(&self) -> String {
        self.inner.message_id.clone()
    }
}

impl From<DetachedMessage> for PubSubMessage {
    fn from(msg: DetachedMessage) -> Self {
        PubSubMessage { inner: msg }
    }
}

async fn decrypt_pubsub_message(
    kms_envelope: Arc<AesGcmEnvelope>,
    message_data: &[u8],
) -> Result<Vec<u8>> {
    kms_envelope
        .decrypt(message_data)
        .await
        .map_err(|err| anyhow!("message decryption failed: {}", err.to_string()))
}

async fn create_action_from_pubsub_message(
    kms_envelope: Arc<AesGcmEnvelope>,
    message_data: &[u8],
    message_attributes: &HashMap<String, String>,
    ack_id: u64,
    message_timestamp: Timestamp,
    snowflake_client: &SnowflakeClient,
    metrics: &OspreyCoordinatorMetrics,
) -> Result<proto::OspreyCoordinatorAction> {
    let decrypted_message_vector = match message_attributes.get("encrypted") {
        Some(is_encrypted) if is_encrypted == "true" => {
            Some(decrypt_pubsub_message(kms_envelope, message_data).await?)
        }
        _ => None,
    };
    let message_data = match &decrypted_message_vector {
        Some(data) => &data[..],
        None => message_data,
    };

    match message_attributes.get("encoding") {
        Some(encoding) if encoding == "proto" => {
            message_decoder::decode_proto_message(
                message_data,
                ack_id,
                message_timestamp,
                snowflake_client,
                metrics,
            )
            .await
        }
        _ => {
            message_decoder::decode_msgpack_json_message(
                message_data,
                ack_id,
                message_timestamp,
                snowflake_client,
                metrics,
            )
            .await
        }
    }
}

async fn create_pubsub_subscription_client(
) -> SubscriberClient<InterceptedService<Channel, AuthorizationHeaderInterceptor>> {
    let emulator_host = std::env::var("PUBSUB_EMULATOR_HOST").ok();

    let timeout = Duration::from_secs(5);

    if let Some(emulator_host) = emulator_host {
        tracing::info!("Creating subscription client to emulator");
        Connection::new_no_auth(
            format!("http://{}", emulator_host).try_into().unwrap(),
            timeout,
        )
        .create_subscriber_client()
    } else {
        tracing::info!("Creating subscription client to real pubsub");
        let service_account =
            std::env::var("OSPREY_COORDINATOR_SERVICE_ACCOUNT").unwrap_or("default".to_string());
        let client = GCPMetadataClient::new(service_account).unwrap();
        Connection::from_metadata_client(
            client,
            timeout,
            Duration::from_secs(24000),
            GOOGLE_PUBSUB_DOMAIN,
        )
        .await
        .unwrap()
        .create_subscriber_client()
    }
}

pub async fn start_pubsub_subscriber(
    snowflake_client: Arc<SnowflakeClient>,
    priority_queue_sender: PriorityQueueSender,
    metrics: Arc<OspreyCoordinatorMetrics>,
) -> Result<()> {
    let subscriber_client = create_pubsub_subscription_client().await;
    let subscription_name = {
        let project_id =
            std::env::var("PUBSUB_SUBSCRIPTION_PROJECT_ID").unwrap_or("osprey-dev".to_string());

        let subscription_id = std::env::var("PUBSUB_SUBSCRIPTION_ID")
            .unwrap_or("osprey-coordinator-actions".to_string());

        PubSubSubscription::new(project_id, subscription_id)
    };

    let kek_uri = std::env::var("PUBSUB_ENCRYPTION_KEY_URI").unwrap_or("".to_string());

    let kms_envelope = Connection::from_metadata_client(
        GCPMetadataClient::new("default".into())?,
        Duration::from_secs(5),
        Duration::from_secs(24000),
        GOOGLE_KMS_DOMAIN,
    )
    .await?
    .create_kms_aes_gcm_envelope(kek_uri, Vec::new(), true)?;

    let kms_envelope = Arc::new(kms_envelope);
    let max_messages = std::env::var("PUBSUB_MAX_MESSAGES")
        .unwrap_or("5000".to_string())
        .parse::<usize>()
        .unwrap();
    let max_processing_messages = std::env::var("PUBSUB_MAX_PROCESSING_MESSAGES")
        .unwrap_or("5000".to_string())
        .parse::<usize>()
        .unwrap();

    let config = ConsumerConfig::default();
    let max_time_to_send_to_async_queue = config.max_time_to_send_to_async_queue;
    let max_acking_receiver_wait_time = config.max_acking_receiver_wait_time;

    tracing::info!(
        {subscription_name = %subscription_name},
        "creating streaming pull manager"
    );
    let flow_control = FlowControl::default()
        .set_max_messages(max_messages)
        .set_max_processing_messages(max_processing_messages)
        .set_max_bytes(1024 * 1024 * 1024);
    StreamingPullManager::new(
        subscriber_client,
        subscription_name,
        flow_control,
        SpawnTaskPerMessageHandler::new(move |message: DetachedMessage| {
            let metrics = metrics.clone();
            let pubsub_message = PubSubMessage::from(message);
            let message_id = pubsub_message.id();
            let priority_queue_sender = priority_queue_sender.clone();
            let snowflake_client = snowflake_client.clone();
            let kms_envelope = kms_envelope.clone();

            async move {
                let ack_id: u64 = rand::thread_rng().gen();

                let action = create_action_from_pubsub_message(
                    kms_envelope,
                    pubsub_message.data(),
                    pubsub_message.attributes(),
                    ack_id,
                    pubsub_message.timestamp(),
                    snowflake_client.as_ref(),
                    &metrics,
                )
                .await
                .map_err(|_| ())?;

                let (ackable_action, acking_receiver) = AckableAction::new(action);

                tracing::debug!(
                    {ack_id = %ack_id, message_id = %message_id},
                    "[pubsub] received message"
                );

                let send_start_time = Instant::now();
                match timeout(
                    max_time_to_send_to_async_queue,
                    priority_queue_sender.send_async(ackable_action),
                )
                .await
                {
                    Ok(Ok(())) => {
                        tracing::debug!(
                            {message_id = %message_id, ack_id = %ack_id},
                            "[pubsub] sent message to priority queue"
                        );
                        metrics.async_classification_added_to_queue.incr();
                    }
                    Ok(Err(e)) => {
                        tracing::error!(
                            {error = %e, message_id = %message_id},
                            "[pubsub] priority queue send error"
                        );
                        return Err(());
                    }
                    Err(_) => {
                        tracing::error!(
                            {message_id = %message_id},
                            "[pubsub] sending to priority queue timed out"
                        );
                        return Err(());
                    }
                }
                metrics
                    .priority_queue_send_time_async
                    .record(send_start_time.elapsed());

                tracing::debug!(
                    {message_id = %message_id, ack_id = %ack_id},
                    "[pubsub] waiting on ack or nack"
                );

                let receive_start_time = Instant::now();
                match timeout(max_acking_receiver_wait_time, acking_receiver).await {
                    Ok(Ok(ack_or_nack)) => match ack_or_nack {
                        AckOrNack::Ack(_) => {
                            tracing::debug!(
                                {message_id = %message_id, ack_id = %ack_id},
                                "[pubsub] acking message"
                            );
                            metrics.async_classification_result_ack.incr();
                            metrics
                                .receiver_ack_time_async
                                .record(receive_start_time.elapsed());
                            Ok(())
                        }
                        AckOrNack::Nack => {
                            tracing::debug!(
                                {message_id = %message_id, ack_id = %ack_id},
                                "[pubsub] nacking message"
                            );
                            metrics.async_classification_result_nack.incr();
                            metrics
                                .receiver_ack_time_async
                                .record(receive_start_time.elapsed());
                            Err(())
                        }
                    },
                    Ok(Err(recv_error)) => {
                        tracing::error!(
                            {message_id = %message_id, recv_error = %recv_error, ack_id = %ack_id},
                            "[pubsub] acking sender dropped"
                        );
                        metrics
                            .receiver_ack_time_async
                            .record(receive_start_time.elapsed());
                        Err(())
                    }
                    Err(_) => {
                        tracing::error!(
                            {message_id = %message_id, ack_id = %ack_id},
                            "[pubsub] waiting for ack/nack timed out"
                        );
                        metrics
                            .receiver_ack_time_async
                            .record(receive_start_time.elapsed());
                        Err(())
                    }
                }
            }
        }),
        MetricsClientBuilder::new("osprey_coordinator.pull"),
    )
    .gracefully_stop_on_signal(exit_signal(), Duration::from_secs(30))
    .await;
    Result::Ok(())
}

#[cfg(test)]
mod tests {
    use base64::Engine;
    use prost_types::Timestamp;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::Arc;

    use super::create_action_from_pubsub_message;
    use crate::coordinator_metrics::OspreyCoordinatorMetrics;
    use crate::proto;
    use crate::snowflake_client::SnowflakeClient;

    #[tokio::test]
    async fn test_create_action_from_pubsub_message_1() {
        use crate::gcloud::grpc::connection::Connection;
        use msgpack_simple::MsgPack;

        let action_json = json!({
            "id": "123456789",
            "name": "guild_invite_created",
            "data": {
                "char": "abc",
                "int": 1i64,
                "float2": 1.1_f64
            },
        });
        let encoded = MsgPack::String(action_json.to_string()).encode();

        let snowflake = SnowflakeClient::new("http://localhost:8088".to_string());
        let metrics = OspreyCoordinatorMetrics::new();
        // Create a mock KMS envelope (won't be used since encrypted != true)
        let connection = Connection::new_no_auth(
            "http://localhost:8080".try_into().unwrap(),
            std::time::Duration::from_secs(5),
        );
        let kms_envelope = Arc::new(
            connection
                .create_kms_aes_gcm_envelope("gcp-kms://test".to_string(), Vec::new(), false)
                .unwrap(),
        );

        let attributes = HashMap::new();

        let prost_action = create_action_from_pubsub_message(
            kms_envelope,
            encoded.as_slice(),
            &attributes,
            12344242,
            Timestamp::default(),
            &snowflake,
            &metrics,
        )
        .await;

        println!("{:?}", prost_action);
        assert!(prost_action.is_ok(), "prost action decoding failed");
        let prost_action = prost_action.unwrap();
        assert_eq!(prost_action.action_name, "guild_invite_created");

        let action_data = prost_action
            .action_data
            .expect("action_data should be present");
        let proto::osprey_coordinator_action::ActionData::JsonActionData(json_bytes) = action_data
        else {
            panic!()
        };
        let data: serde_json::Value = serde_json::from_slice(&json_bytes).unwrap();
        assert_eq!(data["char"], "abc");
        assert_eq!(data["int"], 1);
        assert_eq!(data["float2"], 1.1);
    }

    #[tokio::test]
    async fn test_create_action_from_pubsub_message_2() {
        use crate::gcloud::grpc::connection::Connection;
        use msgpack_simple::MsgPack;

        let action_json = json!({
            "id": "123456789",
            "name": "guild_invite_created",
            "data": {
                "char": "abc",
                "int": 1i64,
                "float2": 1.1_f64
            },
        });

        let encoded = MsgPack::String(action_json.to_string()).encode();

        let snowflake = SnowflakeClient::new("http://localhost:8088".to_string());
        let metrics = OspreyCoordinatorMetrics::new();
        let connection = Connection::new_no_auth(
            "http://localhost:8080".try_into().unwrap(),
            std::time::Duration::from_secs(5),
        );
        let kms_envelope = Arc::new(
            connection
                .create_kms_aes_gcm_envelope("gcp-kms://test".to_string(), Vec::new(), false)
                .unwrap(),
        );
        let attributes = HashMap::new();

        let prost_action = create_action_from_pubsub_message(
            kms_envelope,
            encoded.as_slice(),
            &attributes,
            12344242,
            Timestamp::default(),
            &snowflake,
            &metrics,
        )
        .await;

        println!("{:?}", prost_action);
        assert!(prost_action.is_ok(), "prost action decoding failed");
        let prost_action = prost_action.unwrap();
        assert_eq!(prost_action.action_name, "guild_invite_created");

        let proto::osprey_coordinator_action::ActionData::JsonActionData(json_bytes) = prost_action
            .action_data
            .expect("action_data should be present")
        else {
            panic!("Expected JsonActionData variant")
        };

        let data: serde_json::Value = serde_json::from_slice(&json_bytes).unwrap();
        assert_eq!(data["char"], "abc");
        assert_eq!(data["int"], 1);
        assert_eq!(data["float2"], 1.1);
    }

    #[tokio::test]
    async fn test_create_action_from_pubsub_proto_action() {
        use crate::gcloud::grpc::connection::Connection;
        use std::fs::File;
        use std::io::Read;

        let mut file = File::open("test_data/pubsub_proto_message.json").unwrap();
        let mut data = String::new();
        file.read_to_string(&mut data).unwrap();
        let json: serde_json::Value = serde_json::from_str(&data).unwrap();
        let action_jsons = json.as_array().expect("was not array");
        let action_json = action_jsons[0].as_object().expect("is not map");

        let action_data_str = action_json
            .get("message")
            .unwrap()
            .as_object()
            .unwrap()
            .get("data")
            .unwrap()
            .as_str()
            .unwrap();

        let action_bytes = base64::engine::general_purpose::STANDARD
            .decode(action_data_str)
            .unwrap();

        let snowflake = SnowflakeClient::new("http://localhost:8088".to_string());
        let metrics = OspreyCoordinatorMetrics::new();
        let connection = Connection::new_no_auth(
            "http://localhost:8080".try_into().unwrap(),
            std::time::Duration::from_secs(5),
        );
        let kms_envelope = Arc::new(
            connection
                .create_kms_aes_gcm_envelope("gcp-kms://test".to_string(), Vec::new(), false)
                .unwrap(),
        );
        let mut attributes = HashMap::new();
        attributes.insert("encoding".to_string(), "proto".to_string());

        let prost_action = create_action_from_pubsub_message(
            kms_envelope,
            action_bytes.as_slice(),
            &attributes,
            12344242,
            Timestamp::default(),
            &snowflake,
            &metrics,
        )
        .await;

        assert!(prost_action.is_ok(), "proto action decoding failed");
        let prost_action = prost_action.unwrap();

        // Validate we got ProtoActionData (not JsonActionData) and the bytes match input
        let proto::osprey_coordinator_action::ActionData::ProtoActionData(proto_bytes) =
            prost_action
                .action_data
                .expect("action_data should be present")
        else {
            panic!("Expected ProtoActionData variant")
        };
        assert_eq!(proto_bytes, action_bytes);
    }
}
```
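The subscriber's knobs are all environment-driven; for reference, the variables this file reads and their in-code defaults (the emulator line is optional and switches the client to unauthenticated mode):

```bash
export PUBSUB_SUBSCRIPTION_PROJECT_ID=osprey-dev          # default
export PUBSUB_SUBSCRIPTION_ID=osprey-coordinator-actions  # default
export PUBSUB_ENCRYPTION_KEY_URI=""                       # KMS key URI for envelope decryption
export PUBSUB_MAX_MESSAGES=5000                           # flow control: max outstanding messages
export PUBSUB_MAX_PROCESSING_MESSAGES=5000                # flow control: max in-flight handlers
export PUBSUB_EMULATOR_HOST=localhost:8085                # optional: target the Pub/Sub emulator
```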
+46 -11
osprey_coordinator/src/main.rs
```diff
 mod backoff_utils;
 mod cached_futures;
+mod consumer;
 mod coordinator_metrics;
 mod discovery;
 mod etcd;
···
 mod priority_queue;
 mod proto;
 mod pub_sub_streaming_pull;
-mod pubsub;
 mod shutdown_handler;
 mod signals;
 mod snowflake_client;
···
 use crate::metrics::emit_worker::SpawnEmitWorker;
 use crate::metrics::new_client;
 
+use consumer::{start_kafka_consumer, start_pubsub_subscriber};
 use priority_queue::{create_ackable_action_priority_queue, spawn_priority_queue_metrics_worker};
-use pubsub::start_pubsub_subscriber;
 use tokio::join;
 
 use crate::osprey_bidirectional_stream::OspreyCoordinatorServer;
···
         metrics.clone(),
     ));
 
-    let pubsub_fut = start_pubsub_subscriber(
-        snowflake_client,
-        priority_queue_sender.clone(),
-        metrics.clone(),
-    );
+    let consumer_type = std::env::var("OSPREY_COORDINATOR_CONSUMER_TYPE").ok();
+
+    let consumer_fut = match consumer_type.as_deref() {
+        Some("kafka") => {
+            tracing::info!("starting Kafka consumer");
+            Box::pin(start_kafka_consumer(
+                snowflake_client.clone(),
+                priority_queue_sender.clone(),
+                metrics.clone(),
+            ))
+                as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
+        }
+        Some("pubsub") => {
+            tracing::info!("starting PubSub subscriber");
+            Box::pin(start_pubsub_subscriber(
+                snowflake_client.clone(),
+                priority_queue_sender.clone(),
+                metrics.clone(),
+            ))
+                as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
+        }
+        Some(invalid) => {
+            anyhow::bail!(
+                "invalid OSPREY_COORDINATOR_CONSUMER_TYPE '{}', must be 'kafka' or 'pubsub'",
+                invalid
+            );
+        }
+        None => {
+            tracing::info!(
+                "OSPREY_COORDINATOR_CONSUMER_TYPE not set, defaulting to Kafka consumer"
+            );
+            Box::pin(start_kafka_consumer(
+                snowflake_client.clone(),
+                priority_queue_sender.clone(),
+                metrics.clone(),
+            ))
+                as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
+        }
+    };
+
     let grpc_bidi_stream_service_fut = pigeon::serve(
         osprey_coordinator_grpc_bidi_stream_service,
         "osprey_coordinator",
···
         priority_queue_receiver.clone(),
     );
 
-    tracing::info!("starting pubsub listener/bidi stream/sync classification rpc");
-    let (pubsub_result, grpc_bidi_stream_service_result, sync_action_service_result) = join!(
-        pubsub_fut,
+    tracing::info!("starting consumer/bidi stream/sync classification rpc");
+    let (consumer_result, grpc_bidi_stream_service_result, sync_action_service_result) = join!(
+        consumer_fut,
         grpc_bidi_stream_service_fut,
         sync_action_service_fut
     );
     tracing::info!({
-        pubsub_result=?pubsub_result,
+        consumer_result=?consumer_result,
         bidi_stream_result=?grpc_bidi_stream_service_result,
         sync_action_result=?sync_action_service_result},
         "osprey coordinator terminated");
```
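For reference, how the match above reacts to the env var (a bash summary; the invalid value is just an example):

```bash
unset OSPREY_COORDINATOR_CONSUMER_TYPE             # -> Kafka consumer (default)
export OSPREY_COORDINATOR_CONSUMER_TYPE=kafka      # -> Kafka consumer
export OSPREY_COORDINATOR_CONSUMER_TYPE=pubsub     # -> Pub/Sub subscriber
export OSPREY_COORDINATOR_CONSUMER_TYPE=rabbitmq   # -> startup error: invalid consumer type
```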
-494
osprey_coordinator/src/pubsub.rs
```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use crate::gcloud::grpc::connection::Connection;
use crate::gcloud::{
    auth::AuthorizationHeaderInterceptor,
    gcp_metadata::GCPMetadataClient,
    google::pubsub::v1::subscriber_client::SubscriberClient,
    kms::{AesGcmEnvelope, GOOGLE_KMS_DOMAIN},
    pubsub::{PubSubSubscription, GOOGLE_PUBSUB_DOMAIN},
};
use crate::metrics::counters::StaticCounter;
use crate::metrics::histograms::StaticHistogram;
use crate::metrics::MetricsClientBuilder;
use crate::{
    coordinator_metrics::OspreyCoordinatorMetrics,
    priority_queue::{AckOrNack, AckableAction, PriorityQueueSender},
    proto::{self, osprey_coordinator_action::SecretData},
    pub_sub_streaming_pull::DetachedMessage,
    pub_sub_streaming_pull::{FlowControl, SpawnTaskPerMessageHandler, StreamingPullManager},
};
use anyhow::{anyhow, Result};
use msgpack_simple::MsgPack;
use prost::Message;
use prost_types::Timestamp;
use rand::Rng;
use serde::Deserialize;
use serde_json::Value;
use tokio::time::{timeout, Duration as TokioDuration, Instant};
use tonic::{codegen::InterceptedService, transport::Channel};

use crate::proto::Action as OspreyProtoAction;
use crate::signals::exit_signal;
use crate::snowflake_client::SnowflakeClient;
use convert_case::{Case, Casing};
use proto::osprey_coordinator_action::ActionData;

async fn decode_proto_message(
    message_data: &[u8],
    ack_id: u64,
    message_timestamp: Timestamp,
    snowflake_client: &SnowflakeClient,
    metrics: &OspreyCoordinatorMetrics,
) -> Result<proto::OspreyCoordinatorAction> {
    let osprey_proto_action = OspreyProtoAction::decode(message_data).unwrap();
    let action_id = if osprey_proto_action.id == 0 {
        metrics.action_id_snowflake_generation_proto.incr();
        snowflake_client.generate_id().await?
    } else {
        osprey_proto_action.id
    };
    let action_name = osprey_proto_action
        .data
        .unwrap()
        .to_string()
        .to_case(Case::Snake);
    Ok(proto::OspreyCoordinatorAction {
        ack_id,
        action_id,
        action_name,
        action_data: Some(ActionData::ProtoActionData(message_data.into())),
        secret_data: None,
        timestamp: Some(message_timestamp),
    })
}

async fn decode_msgpack_json_message(
    message_data: &[u8],
    ack_id: u64,
    message_timestamp: Timestamp,
    snowflake_client: &SnowflakeClient,
    metrics: &OspreyCoordinatorMetrics,
) -> Result<proto::OspreyCoordinatorAction> {
    // This whole function can probably be optimized way better, but in the interest of time I am leaving
    // it in a working state for now.
    #[derive(Deserialize, Debug)]
    struct PubsubAction {
        id: Option<String>,
        name: String,
        data: Value,
        secret_data: Option<Value>,
    }

    let decoded = MsgPack::parse(message_data)?;
    let decoded = decoded.as_string()?;
    let pubsub_action: PubsubAction = serde_json::from_str(decoded.as_str())?;

    let serde_json_vec = serde_json::to_vec(&pubsub_action.data)?;
    let optional_secret_data = match &pubsub_action.secret_data {
        Some(secret_data) => Some(SecretData::JsonSecretData(serde_json::to_vec(secret_data)?)),
        _ => None,
    };

    // old msgpack parsing
    // let mut out = Vec::with_capacity(1024 * 6);
    // let mut de = serde_json::Deserializer::from_slice(serde_json_vec.as_slice());
    // let mut se = rmp_serde::Serializer::new(&mut out);
    // serde_transcode::transcode(&mut de, &mut se).unwrap();

    let action_id = match pubsub_action.id {
        Some(id) => id.parse::<u64>()?,
        None => {
            metrics.action_id_snowflake_generation_json.incr();
            snowflake_client.generate_id().await?
        }
    };

    Ok(proto::OspreyCoordinatorAction {
        ack_id,
        action_id,
        action_name: pubsub_action.name,
        action_data: Some(ActionData::JsonActionData(serde_json_vec)),
        secret_data: optional_secret_data,
        timestamp: Some(message_timestamp),
    })
}

async fn decrypt_pubsub_message(
    kms_envelope: Arc<AesGcmEnvelope>,
    message_data: &[u8],
) -> Result<Vec<u8>> {
    kms_envelope
        .decrypt(message_data)
        .await
        .map_err(|err| anyhow!("message decryption failed: {}", err.to_string()))
}

async fn create_action_from_pubsub_message(
    kms_envelope: Arc<AesGcmEnvelope>,
    message_data: &[u8],
    message_attributes: &HashMap<String, String>,
    ack_id: u64,
    message_timestamp: Timestamp,
    snowflake_client: &SnowflakeClient,
    metrics: &OspreyCoordinatorMetrics,
) -> Result<proto::OspreyCoordinatorAction> {
    let decrypted_message_vector = match message_attributes.get("encrypted") {
        Some(is_encrypted) if is_encrypted == "true" => {
            Some(decrypt_pubsub_message(kms_envelope, message_data).await?)
        }
        _ => None,
    };
    let message_data = match &decrypted_message_vector {
        Some(data) => &data[..],
        None => message_data,
    };
    match message_attributes.get("encoding") {
        Some(encoding) if encoding == "proto" => {
            decode_proto_message(
                message_data,
                ack_id,
                message_timestamp,
                snowflake_client,
                metrics,
            )
            .await
        }
        _ => {
            decode_msgpack_json_message(
                message_data,
                ack_id,
                message_timestamp,
                snowflake_client,
                metrics,
            )
            .await
        }
    }
}

async fn create_pubsub_subscription_client(
) -> SubscriberClient<InterceptedService<Channel, AuthorizationHeaderInterceptor>> {
    let emulator_host = std::env::var("PUBSUB_EMULATOR_HOST").ok();

    let timeout = Duration::from_secs(5);

    if let Some(emulator_host) = emulator_host {
        tracing::info!("Creating subscription client to emulator");
        Connection::new_no_auth(
            format!("http://{}", emulator_host).try_into().unwrap(),
            timeout,
        )
        .create_subscriber_client()
    } else {
        tracing::info!("Creating subscription client to real pubsub");
        let service_account =
            std::env::var("OSPREY_COORDINATOR_SERVICE_ACCOUNT").unwrap_or("default".to_string());
        let client = GCPMetadataClient::new(service_account).unwrap();
        Connection::from_metadata_client(
            client,
            timeout,
            Duration::from_secs(24000),
            GOOGLE_PUBSUB_DOMAIN,
        )
        .await
        .unwrap()
        .create_subscriber_client()
    }
}

pub async fn start_pubsub_subscriber(
    snowflake_client: Arc<SnowflakeClient>,
    priority_queue_sender: PriorityQueueSender,
    metrics: Arc<OspreyCoordinatorMetrics>,
) -> Result<()> {
    let subscriber_client = create_pubsub_subscription_client().await;
    let subscription_name = {
        let project_id =
            std::env::var("PUBSUB_SUBSCRIPTION_PROJECT_ID").unwrap_or("osprey-dev".to_string());

        let subscription_id = std::env::var("PUBSUB_SUBSCRIPTION_ID")
            .unwrap_or("osprey-coordinator-actions".to_string());

        PubSubSubscription::new(project_id, subscription_id)
    };

    let kek_uri = std::env::var("PUBSUB_ENCRYPTION_KEY_URI").unwrap_or("".to_string());

    let kms_envelope = Connection::from_metadata_client(
        GCPMetadataClient::new("default".into())?,
        Duration::from_secs(5),
        Duration::from_secs(24000),
        GOOGLE_KMS_DOMAIN,
    )
    .await?
    .create_kms_aes_gcm_envelope(kek_uri, Vec::new(), true)?;

    let kms_envelope = Arc::new(kms_envelope);
    let max_messages = std::env::var("PUBSUB_MAX_MESSAGES")
        .unwrap_or("5000".to_string())
        .parse::<usize>()
        .unwrap();
    let max_processing_messages = std::env::var("PUBSUB_MAX_PROCESSING_MESSAGES")
        .unwrap_or("5000".to_string())
        .parse::<usize>()
        .unwrap();
    let max_time_to_send_to_async_queue = TokioDuration::from_millis(
        std::env::var("MAX_TIME_TO_SEND_TO_ASYNC_QUEUE_MS")
            .unwrap_or("500".to_string())
            .parse::<u64>()
            .unwrap(),
    );
    let max_acking_receiver_wait_time = TokioDuration::from_millis(
        std::env::var("MAX_ACKING_RECEIVER_WAIT_TIME_MS")
            .unwrap_or("60000".to_string())
            .parse::<u64>()
            .unwrap(),
    );

    tracing::info!(
        {subscription_name = %subscription_name},
        "creating streaming pull manager"
    );
    let flow_control = FlowControl::default()
        .set_max_messages(max_messages)
        .set_max_processing_messages(max_processing_messages)
        .set_max_bytes(1024 * 1024 * 1024);
    StreamingPullManager::new(
        subscriber_client,
        subscription_name,
        flow_control,
        SpawnTaskPerMessageHandler::new(move |message: DetachedMessage| {
            let metrics = metrics.clone();
            let message_id = message.message_id.clone();
            let priority_queue_sender = priority_queue_sender.clone();
            let snowflake_client = snowflake_client.clone();
            let kms_envelope = kms_envelope.clone();
            async move {
                let message_attributes = message.attributes();

                let ack_id: u64 = {
                    let mut rng = rand::thread_rng();
                    rng.gen()
                };

                let action = create_action_from_pubsub_message(
                    kms_envelope,
                    message.data.as_slice(),
                    message_attributes,
                    ack_id,
                    message.publish_time(),
                    snowflake_client.as_ref(),
                    &metrics,
                ).await
                .map_err(|_| ())?;
                let (ackable_action, acking_receiver) = AckableAction::new(action);

                tracing::debug!({ack_id = %ack_id, message_id=%message_id}, "[pubsub] received pubsub message");
                let send_start_time = Instant::now();
                match timeout(max_time_to_send_to_async_queue, priority_queue_sender.send_async(ackable_action)).await {
                    Ok(Ok(())) => {
                        tracing::debug!({message_id=%message_id, ack_id=ack_id}, "[pubsub] sent pubsub message to priority queue");
                        metrics.async_classification_added_to_queue.incr();
                    },
                    Ok(Err(e)) => {
                        tracing::error!({error=%e},"[pubsub] priority queue send error");
                    },
                    Err(_) => {
                        tracing::error!({message_id=%message_id}, "[pubsub] sending to priority queue timed out");
                    }
                };
                metrics.priority_queue_send_time_async.record(send_start_time.elapsed());
                tracing::debug!({message_id=%message_id, ack_id=ack_id},"[pubsub] waiting on ack or nack");

                let receive_start_time = Instant::now();
                match timeout(max_acking_receiver_wait_time, acking_receiver).await { // 5 Minutes to return
                    Ok(Ok(ack_or_nack)) => match ack_or_nack {
                        AckOrNack::Ack(_optional_execution_result) => {
                            tracing::debug!({message_id=%message_id, ack_id=ack_id},"[pubsub] acking message");
                            metrics.async_classification_result_ack.incr();
                            metrics.receiver_ack_time_async.record(receive_start_time.elapsed());
                            Ok(())
                        },
                        AckOrNack::Nack => {
                            tracing::debug!({message_id=%message_id,
```
ack_id=ack_id},"[pubsub] nacking message"); 317 - metrics.async_classification_result_nack.incr(); 318 - metrics.receiver_ack_time_async.record(receive_start_time.elapsed()); 319 - Err(()) 320 - }, 321 - }, 322 - Ok(Err(recv_error)) => { 323 - tracing::error!({message_id=%message_id, recv_error=%recv_error, ack_id=ack_id},"[pubsub] acking sender dropped"); 324 - metrics.receiver_ack_time_async.record(receive_start_time.elapsed()); 325 - Err(()) 326 - }, 327 - Err(_) => { 328 - tracing::error!({message_id=%message_id, ack_id=ack_id}, "[pubsub] waiting for ack/nack timed out"); 329 - metrics.receiver_ack_time_async.record(receive_start_time.elapsed()); 330 - Err(()) 331 - }, 332 - } 333 - } 334 - }), 335 - MetricsClientBuilder::new("osprey_coordinator.pull"), 336 - ) 337 - .gracefully_stop_on_signal(exit_signal(), Duration::from_secs(30)) 338 - .await; 339 - Result::Ok(()) 340 - } 341 - 342 - // TODO: Fix these tests 343 - 344 - // #[cfg(test)] 345 - // mod tests { 346 - // use std::{collections::HashMap, fs::File, io::Read}; 347 - 348 - // use msgpack_simple::MsgPack; 349 - // use prost::Message; 350 - // use prost_types::Timestamp; 351 - // // use protobuf_json_mapping; 352 - // use serde_json::json; 353 - 354 - // // use discord_smite_rpc_actions_proto::SmiteCoordinatorAction as SmiteRpcAction; 355 - // use crate::proto; 356 - // use std::io::Cursor; 357 - // use std::str; 358 - 359 - // use super::create_action_from_pubsub_message; 360 - 361 - // // #[test] 362 - // // fn test_create_action_from_pubsub_message_1() { 363 - // // let action_json = json!({ 364 - // // "id": "123456789", 365 - // // "name": "guild_invite_created", 366 - // // "data": { 367 - // // "char": "abc", 368 - // // "int": 1i64, 369 - // // "float2": 1.1_f64 370 - // // }, 371 - 372 - // // }); 373 - // // let encoded = MsgPack::String(action_json.to_string()).encode(); 374 - 375 - // // let prost_action = 376 - // // create_action_from_pubsub_message(encoded.as_slice(), 12344242, Timestamp::default()); 377 - // // println!("{:?}", prost_action); 378 - // // assert!(prost_action.is_ok(), "prost action decoding failed"); 379 - // // let prost_action = prost_action.unwrap(); 380 - // // let data = MsgPack::parse(&prost_action.action_data).unwrap(); 381 - // // println!("{:?}", data); 382 - // // let mut data: HashMap<String, MsgPack> = data 383 - // // .as_map() 384 - // // .unwrap() 385 - // // .into_iter() 386 - // // .map(|v| (v.key.as_string().unwrap(), v.value)) 387 - // // .collect(); 388 - 389 - // // let x = data.remove("char").unwrap().as_string().unwrap(); 390 - 391 - // // assert_eq!(x, "abc".to_string()); 392 - // // } 393 - 394 - // // #[test] 395 - // // fn test_create_action_from_pubsub_message_2() { 396 - // // let action_json = json!({ 397 - // // "id": "123456789", 398 - // // "name": "guild_invite_created", 399 - // // "data": { 400 - // // "char": "abc", 401 - // // "int": 1i64, 402 - // // "float2": 1.1_f64 403 - // // }, 404 - 405 - // // }); 406 - // // // let encoded = MsgPack::String(action_json.to_string()).encode(); 407 - 408 - // // let prost_action = create_action_from_pubsub_message( 409 - // // action_json.to_string().bytes().collect(), 410 - // // 12344242, 411 - // // Timestamp::default(), 412 - // // ); 413 - // // println!("{:?}", prost_action); 414 - // // assert!(prost_action.is_ok(), "prost action decoding failed"); 415 - // // let prost_action = prost_action.unwrap(); 416 - // // let data = MsgPack::parse(&prost_action.action_data).unwrap(); 417 - // // println!("{:?}", data); 418 
- // // let mut data: HashMap<String, MsgPack> = data 419 - // // .as_map() 420 - // // .unwrap() 421 - // // .into_iter() 422 - // // .map(|v| (v.key.as_string().unwrap(), v.value)) 423 - // // .collect(); 424 - 425 - // // let x = data.remove("char").unwrap().as_string().unwrap(); 426 - 427 - // // assert_eq!(x, "abc".to_string()); 428 - // // } 429 - // #[test] 430 - // fn test_create_action_from_pubsub_proto_action() { 431 - // let mut file = File::open("pubsub_messages.json").unwrap(); 432 - // let mut data = String::new(); 433 - // file.read_to_string(&mut data).unwrap(); 434 - // let json: serde_json::Value = serde_json::from_str(&data).unwrap(); 435 - // let action_jsons = json.as_array().expect("was not array"); 436 - // let action_json = action_jsons[0].as_object().expect("is not map"); 437 - // println!("{:?}", action_json); 438 - // let action_data = action_json 439 - // .get("message") 440 - // .unwrap() 441 - // .as_object() 442 - // .unwrap() 443 - // .get("data") 444 - // .unwrap() 445 - // .as_str() 446 - // .unwrap(); 447 - 448 - // println!("{:?}", action_data); 449 - // let action_data = str::from_utf8(action_data.as_bytes()).unwrap(); 450 - // println!("{:?}", action_data); 451 - // // let action_data = base64::decode(action_data).unwrap(); 452 - 453 - // // let mut action_prost: SmiteRpcAction = 454 - // // SmiteRpcAction::decode(&mut Cursor::new(action_data.to_string())).unwrap(); 455 - // println!("--------"); 456 - // let test_object_data = "CWUQhNrjpmQOGsgBCgsJCwCCONT0dQYQARK4AQogfnvDH9y83BpR5FraKYJSCxTDrQkSb1axRgl9i82pYwgQARoLCI2fiZYGEPeX1CciCwjksNCbBhC8i4FgKgsI5MaGmwYQvIuBYDItCg0KCzk1LjIuMTIuMTY2EgkKB0FuZHJvaWQaEQoPRGlzY29yZCBBbmRyb2lkOi8KDwoNMTc2LjIxOS40Mi4zNBIJCgdBbmRyb2lkGhEKD0Rpc2NvcmQgQW5kcm9pZEILCPqihpsGEOj7p0c="; 457 - // let decoded_base64 = base64::decode(test_object_data).unwrap(); 458 - // let mut test_action = proto::SmiteCoordinatorAction::default(); 459 - // test_action.id = 20; 460 - 461 - // println!("{:?}", test_action); 462 - // let x = test_action.encode_to_vec(); 463 - // println!("{:?}", x); 464 - // println!("--------"); 465 - // println!("{:?}", decoded_base64); 466 - // let mut action_prost: SmiteRpcAction = 467 - // SmiteRpcAction::decode(decoded_base64.as_slice()).unwrap(); 468 - // println!("{:?}", action_prost); 469 - 470 - // let output = protobuf_json_mapping::print_to_string(action_prost); 471 - // println!("{:?}", output); 472 - 473 - // // let prost_action = create_action_from_pubsub_message( 474 - // // action_json.to_string().bytes().collect(), 475 - // // 12344242, 476 - // // Timestamp::default(), 477 - // // ); 478 - // // println!("{:?}", prost_action); 479 - // // assert!(prost_action.is_ok(), "prost action decoding failed"); 480 - // // let prost_action = prost_action.unwrap(); 481 - // // let data = MsgPack::parse(&prost_action.action_data).unwrap(); 482 - // // println!("{:?}", data); 483 - // // let mut data: HashMap<String, MsgPack> = data 484 - // // .as_map() 485 - // // .unwrap() 486 - // // .into_iter() 487 - // // .map(|v| (v.key.as_string().unwrap(), v.value)) 488 - // // .collect(); 489 - 490 - // // let x = data.remove("char").unwrap().as_string().unwrap(); 491 - 492 - // // assert_eq!(x, "abc".to_string()); 493 - // } 494 - // }
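For reference while the coordinator moves to Kafka: the Pub/Sub subscriber removed above was configured entirely through environment variables. A summary sketch of those knobs and the defaults the deleted code falls back to (variable names and defaults are taken from the removed code above; the emulator host value is an illustrative placeholder, since the code only checks whether it is set):

```bash
# Environment read by the removed Pub/Sub subscriber, with its hard-coded defaults.
export PUBSUB_EMULATOR_HOST=localhost:8085            # optional, placeholder value; if set, connects unauthenticated to the emulator
export OSPREY_COORDINATOR_SERVICE_ACCOUNT=default     # GCP service account used for metadata-based auth
export PUBSUB_SUBSCRIPTION_PROJECT_ID=osprey-dev
export PUBSUB_SUBSCRIPTION_ID=osprey-coordinator-actions
export PUBSUB_ENCRYPTION_KEY_URI=""                   # KMS key used to decrypt messages published with the encrypted=true attribute
export PUBSUB_MAX_MESSAGES=5000                       # flow control: max outstanding pulled messages
export PUBSUB_MAX_PROCESSING_MESSAGES=5000            # flow control: max messages in flight in handlers
export MAX_TIME_TO_SEND_TO_ASYNC_QUEUE_MS=500         # timeout handing a message to the priority queue
export MAX_ACKING_RECEIVER_WAIT_TIME_MS=60000         # timeout waiting for the worker's ack/nack
```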
+12
osprey_coordinator/test_data/pubsub_proto_message.json
··· 1 + [ 2 + { 3 + "ackId": "test-ack-id-proto", 4 + "message": { 5 + "attributes": { 6 + "encoding": "proto", 7 + "user_id": "987654321" 8 + }, 9 + "data": "CbFo3joAAAAAMgA=" 10 + } 11 + } 12 + ]
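Per the deleted subscriber above, a message whose attributes carry `encoding: proto` has its `data` field decoded as a protobuf `Action`. To sanity-check a fixture like this one locally, one option (assuming `protoc` is installed; on some macOS versions `base64` wants `-D` instead of `-d`) is to dump the raw wire format:

```bash
# Decode the fixture's base64 payload and print the raw protobuf fields.
echo 'CbFo3joAAAAAMgA=' | base64 -d | protoc --decode_raw
```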
+69
start.sh
··· 1 + #!/bin/bash
2 + set -e
3 + 
4 + # Helper script to start Osprey with different configurations
5 + # Usage:
6 + #   ./start.sh                       # Start with worker directly consuming from Kafka
7 + #   ./start.sh --with-coordinator    # Start with Osprey Coordinator
8 + #   ./start.sh --help                # Show this help
9 + 
10 + show_help() {
11 +     echo "Osprey Startup Helper"
12 +     echo ""
13 +     echo "Usage: ./start.sh [OPTIONS] [COMPOSE_ARGS...]"
14 +     echo ""
15 +     echo "Options:"
16 +     echo "  --with-coordinator    Start Osprey with Coordinator (workers connect to coordinator)"
17 +     echo "  --help, -h            Show this help message"
18 +     echo ""
19 +     echo "Examples:"
20 +     echo "  ./start.sh                                            # Direct Kafka consumption"
21 +     echo "  ./start.sh --with-coordinator                         # With coordinator"
22 +     echo "  ./start.sh --with-coordinator up -d                   # With coordinator in detached mode"
23 +     echo "  ./start.sh --with-coordinator --profile test_data up  # With test data producer"
24 +     echo ""
25 +     echo "When using --with-coordinator, the following services are added:"
26 +     echo "  - osprey-coordinator: Action distribution and load balancing"
27 +     echo "  - etcd: Service discovery for coordinator"
28 +     echo ""
29 + }
30 + 
31 + USE_COORDINATOR=false
32 + COMPOSE_FILES="-f docker-compose.yaml"
33 + COMPOSE_ARGS=()
34 + 
35 + # Parse arguments
36 + while [[ $# -gt 0 ]]; do
37 +     case $1 in
38 +         --with-coordinator)
39 +             USE_COORDINATOR=true
40 +             shift
41 +             ;;
42 +         --help|-h)
43 +             show_help
44 +             exit 0
45 +             ;;
46 +         *)
47 +             # Pass remaining args to docker compose
48 +             COMPOSE_ARGS+=("$1")
49 +             shift
50 +             ;;
51 +     esac
52 + done
53 + 
54 + if [ "$USE_COORDINATOR" = true ]; then
55 +     echo "Starting Osprey with Coordinator..."
56 +     COMPOSE_FILES="$COMPOSE_FILES -f example_docker_compose/run_osprey_with_coordinator/docker-compose.coordinator.yaml"
57 + else
58 +     echo "Starting Osprey without Coordinator (direct Kafka consumption)..."
59 + fi
60 + 
61 + # If no compose args provided, default to 'up'
62 + if [ ${#COMPOSE_ARGS[@]} -eq 0 ]; then
63 +     COMPOSE_ARGS=("up")
64 + fi
65 + 
66 + echo "Running: docker compose $COMPOSE_FILES ${COMPOSE_ARGS[@]}"
67 + echo ""
68 + 
69 + exec docker compose $COMPOSE_FILES "${COMPOSE_ARGS[@]}"
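One operational note on the script above: `--with-coordinator` works by layering a second compose file, so any follow-up `docker compose` commands (`logs`, `ps`, `down`) must be given the same `-f` flags or they will not see the coordinator services. A sketch (the `osprey-coordinator` service name is taken from the script's help text):

```bash
# Start everything with the coordinator overlay, detached.
./start.sh --with-coordinator up -d

# Follow-up commands need the same pair of compose files.
docker compose -f docker-compose.yaml \
  -f example_docker_compose/run_osprey_with_coordinator/docker-compose.coordinator.yaml \
  logs -f osprey-coordinator

docker compose -f docker-compose.yaml \
  -f example_docker_compose/run_osprey_with_coordinator/docker-compose.coordinator.yaml \
  down
```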