Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Revert "run bigtable rpcs in their own thread" (#90)

authored by

Leon Shi and committed by
GitHub
f0164353 63599d6c

+4 -19
+4 -19
osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py
··· 10 10 import gevent 11 11 import google.cloud.storage as storage 12 12 import pytz 13 - from gevent.threadpool import ThreadPool 14 13 from google.api_core import retry 15 14 from google.cloud.bigtable import row_filters, row_set 16 15 from google.cloud.bigtable.row import Row ··· 32 31 BIGTABLE_CONCURRENCY_LIMIT = 100 33 32 GCS_CONCURRENCY_LIMIT = 100 34 33 MINIO_CONCURRENCY_LIMIT = 100 35 - 36 - # Thread pool for BigTable operations to avoid gevent/gRPC conflicts 37 - # Using a dedicated thread pool ensures gRPC calls run in real OS threads 38 - _bigtable_threadpool = ThreadPool(maxsize=BIGTABLE_CONCURRENCY_LIMIT) 39 34 40 35 41 36 class ExecutionResultStore(ABC): ··· 187 182 retry_policy = retry.Retry(initial=1.0, maximum=2.0, multiplier=1.25, deadline=120.0) 188 183 189 184 def select_one(self, action_id: int) -> Optional[Dict[str, Any]]: 190 - # Run read_row in a real thread to avoid gevent/gRPC conflicts 191 - def _read_row(): 192 - return osprey_bigtable.table('stored_execution_result').read_row( 193 - StoredExecutionResultBigTable._encode_action_id(action_id), row_filters.CellsColumnLimitFilter(1) 194 - ) 195 - 196 - row = _bigtable_threadpool.apply(_read_row) 185 + row = osprey_bigtable.table('stored_execution_result').read_row( 186 + StoredExecutionResultBigTable._encode_action_id(action_id), row_filters.CellsColumnLimitFilter(1) 187 + ) 197 188 if not row: 198 189 return None 199 190 ··· 238 229 row.set_cell('execution_result', b'error_traces', error_traces_json.encode(), timestamp=timestamp) 239 230 row.set_cell('execution_result', b'timestamp', timestamp.isoformat().encode(), timestamp=timestamp) 240 231 row.set_cell('execution_result', b'action_data', action_data_json.encode(), timestamp=timestamp) 241 - 242 - # Run mutate_rows in a real thread to avoid gevent/gRPC conflicts 243 - # This prevents the greenlet context-switching issue with gRPC streaming calls 244 - def _mutate_rows(): 245 - osprey_bigtable.table('stored_execution_result').mutate_rows([row], retry=self.retry_policy) 246 - 247 - _bigtable_threadpool.apply(_mutate_rows) 232 + osprey_bigtable.table('stored_execution_result').mutate_rows([row], retry=self.retry_policy) 248 233 249 234 @staticmethod 250 235 def _encode_action_id(action_id_snowflake: int) -> bytes: