Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1 fork

Configure Feed

Select the types of activity you want to include in your feed.

Feature: Implement retriability in output sinks (#122)

Authored by Leon Shi and committed by GitHub
7533c23b 2e977211

+44 -9
+39 -9
osprey_worker/src/osprey/worker/sinks/sink/output_sink.py
···
  1   1   import abc
  2   2   from collections import defaultdict
  3   3   from datetime import datetime
  4     - from typing import Any, DefaultDict, Dict, Mapping, Optional, Sequence
      4 + from typing import Any, Callable, DefaultDict, Dict, Mapping, Optional, Sequence
  5   5
  6   6   import gevent
  7   7   import sentry_sdk
···
 16  16   from osprey.worker.lib.osprey_shared.labels import EntityLabelMutation
 17  17   from osprey.worker.lib.osprey_shared.logging import DynamicLogSampler, get_logger
 18  18   from osprey.worker.lib.storage.labels import LabelsProvider
     19 + from tenacity import RetryCallState, retry, stop_after_attempt, wait_exponential
 19  20
 20  21   logger = get_logger()
 21  22
 22  23   DEFAULT_GEVENT_TIMEOUT = 2
     24 + DEFAULT_MAX_RETRIES = 0 # No retries by default (1 attempt total)
 23  25
 24  26
 25  27   class BaseOutputSink(abc.ABC):
 26  28       # Default timeout for sink operations. Subclasses can override this.
 27  29       timeout: float = DEFAULT_GEVENT_TIMEOUT
 28  30
     31 +     # Retry configuration. Subclasses can override this.
     32 +     # 0 = no retries (1 attempt), 2 = up to 3 total attempts
     33 +     max_retries: int = DEFAULT_MAX_RETRIES
     34 +
 29  35       @abc.abstractmethod
 30  36       def will_do_work(self, result: ExecutionResult) -> bool:
 31  37           """A quick way to determine if this sink needs to do anything for this result."""
···
 52  58       def will_do_work(self, result: ExecutionResult) -> bool:
 53  59           return any(sink.will_do_work(result) for sink in self._sinks)
 54  60
     61 +     def _create_push_with_retry(self, sink: BaseOutputSink) -> Callable[[ExecutionResult], None]:
     62 +         """Create a retry-wrapped push function for a sink.
     63 +
     64 +         Uses tenacity for exponential backoff retries.
     65 +         """
     66 +         sink_name = sink.__class__.__name__
     67 +
     68 +         def log_retry_attempt(retry_state: RetryCallState) -> None:
     69 +             attempt = retry_state.attempt_number
     70 +             exception = retry_state.outcome.exception() if retry_state.outcome else None
     71 +             logger.warning(f'Retrying sink {sink_name}, attempt {attempt}, error: {exception}')
     72 +             metrics.increment('output_sink.retry', tags=[f'sink:{sink_name}', f'attempt:{attempt}'])
     73 +
     74 +         # stop_after_attempt(1) = no retries, stop_after_attempt(3) = 2 retries
     75 +         @retry(
     76 +             stop=stop_after_attempt(sink.max_retries + 1),
     77 +             wait=wait_exponential(multiplier=0.5, min=0.5, max=5),
     78 +             before_sleep=log_retry_attempt,
     79 +             reraise=True,
     80 +         )
     81 +         def push_with_retry(result: ExecutionResult) -> None:
     82 +             with (
     83 +                 trace(f'{sink_name}.push'),
     84 +                 metrics.timed('handled_message_output', tags=[f'sink:{sink_name}'], use_ms=True),
     85 +                 gevent.Timeout(sink.timeout),
     86 +             ):
     87 +                 sink.push(result)
     88 +
     89 +         return push_with_retry
     90 +
 55  91       def push(self, result: ExecutionResult) -> None:
 56  92           errors: Dict[BaseOutputSink, BaseException] = {}
 57  93
 58  94           for sink in self._sinks:
 59  95               if sink.will_do_work(result):
 60  96                   sink_name = sink.__class__.__name__
     97 +                 push_fn = self._create_push_with_retry(sink)
 61  98                   try:
 62     -                     with (
 63     -                         trace(f'{sink_name}.push'),
 64     -                         metrics.timed('handled_message_output', tags=[f'sink:{sink_name}'], use_ms=True),
 65     -                         gevent.Timeout(sink.timeout),
 66     -                     ):
 67     -                         sink.push(result)
     99 +                     push_fn(result)
 68 100                   except gevent.Timeout as timeout_exc:
 69 101                       logger.exception(f'Timeout exception raised when pushing event to sink: {sink_name}')
 70 102                       errors[sink] = timeout_exc
 71 103                       metrics.increment('output_sink.timeout', tags=[f'sink:{sink_name}'])
 72     -                     # Capture the Timeout exception
 73 104                       sentry_sdk.capture_exception()
 74 105                   except Exception as exc:
 75 106                       errors[sink] = exc
 76 107                       metrics.increment(
 77 108                           'output_sink.error', tags=[f'sink:{sink_name}', f'error:{exc.__class__.__name__}']
 78 109                       )
 79     -                     # Capture the current exception for now until we fix PartialSinkFailure
 80 110                       sentry_sdk.capture_exception()
 81 111
 82 112       def stop(self) -> None:
+5
osprey_worker/src/osprey/worker/sinks/sink/stored_execution_result_output_sink.py
···
  6   6   class StoredExecutionResultOutputSink(BaseOutputSink):
  7   7       """An output sink that persists the execution result to an EventRecord."""
  8   8
      9 +     # BigTable operations can be slow and may fail transiently.
     10 +     # Allow more time and retry on failure.
     11 +     timeout: float = 5.0
     12 +     max_retries: int = 2 # Up to 3 total attempts with exponential backoff
     13 +
  9  14       def __init__(self):
 10  15           self._service = bootstrap_execution_result_storage_service()
 11  16