Mirror of https://github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

run bigtable rpcs in their own thread (#36)

Authored by Ethan Breder and committed by GitHub
e96e371f a8e67633

+19 -4
+19 -4
osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py
··· 10 10 import gevent 11 11 import google.cloud.storage as storage 12 12 import pytz 13 + from gevent.threadpool import ThreadPool 13 14 from google.api_core import retry 14 15 from google.cloud.bigtable import row_filters, row_set 15 16 from google.cloud.bigtable.row import Row ··· 31 32 BIGTABLE_CONCURRENCY_LIMIT = 100 32 33 GCS_CONCURRENCY_LIMIT = 100 33 34 MINIO_CONCURRENCY_LIMIT = 100 35 + 36 + # Thread pool for BigTable operations to avoid gevent/gRPC conflicts 37 + # Using a dedicated thread pool ensures gRPC calls run in real OS threads 38 + _bigtable_threadpool = ThreadPool(maxsize=BIGTABLE_CONCURRENCY_LIMIT) 34 39 35 40 36 41 class ExecutionResultStore(ABC): ··· 182 187 retry_policy = retry.Retry(initial=1.0, maximum=2.0, multiplier=1.25, deadline=120.0) 183 188 184 189 def select_one(self, action_id: int) -> Optional[Dict[str, Any]]: 185 - row = osprey_bigtable.table('stored_execution_result').read_row( 186 - StoredExecutionResultBigTable._encode_action_id(action_id), row_filters.CellsColumnLimitFilter(1) 187 - ) 190 + # Run read_row in a real thread to avoid gevent/gRPC conflicts 191 + def _read_row(): 192 + return osprey_bigtable.table('stored_execution_result').read_row( 193 + StoredExecutionResultBigTable._encode_action_id(action_id), row_filters.CellsColumnLimitFilter(1) 194 + ) 195 + 196 + row = _bigtable_threadpool.apply(_read_row) 188 197 if not row: 189 198 return None 190 199 ··· 229 238 row.set_cell('execution_result', b'error_traces', error_traces_json.encode(), timestamp=timestamp) 230 239 row.set_cell('execution_result', b'timestamp', timestamp.isoformat().encode(), timestamp=timestamp) 231 240 row.set_cell('execution_result', b'action_data', action_data_json.encode(), timestamp=timestamp) 232 - osprey_bigtable.table('stored_execution_result').mutate_rows([row], retry=self.retry_policy) 241 + 242 + # Run mutate_rows in a real thread to avoid gevent/gRPC conflicts 243 + # This prevents the greenlet context-switching issue with gRPC 
streaming calls 244 + def _mutate_rows(): 245 + osprey_bigtable.table('stored_execution_result').mutate_rows([row], retry=self.retry_policy) 246 + 247 + _bigtable_threadpool.apply(_mutate_rows) 233 248 234 249 @staticmethod 235 250 def _encode_action_id(action_id_snowflake: int) -> bytes: