Mirror of https://github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

add option to suppress cached errors to reduce metric bloat (#180)

authored by

Li Yu and committed by
GitHub
3c77e998 759af1dd

+107 -2
+15 -1
osprey_worker/src/osprey/engine/executor/external_service_utils.py
···
 32  32           """
 33  33           return None
 34  34
     35 +     def count_error_once(self) -> bool:
     36 +         """
     37 +         When True, only the caller that initiated the external service call
     38 +         receives the exception. Subsequent callers that would hit the cached
     39 +         error receive None instead.
     40 +
     41 +         Only enable this when ValueT is Optional and None is a safe fallback.
     42 +         """
     43 +         return False
     44 +
 35  45
 36  46   class ExternalServiceAccessor(Generic[KeyT, ValueT]):
 37  47       """Facilitates accessing an external service in a way that caches and debounces requests based on a key."""
···
 84  94               try:
 85  95                   cache_entry[0].set(self._service.get_from_service(key))
 86  96               except Exception as e:
 87     -                 cache_entry[0].set_exception(e)
     97 +                 if self._service.count_error_once():
     98 +                     cache_entry[0].set(None)
     99 +                 else:
    100 +                     cache_entry[0].set_exception(e)
    101 +                 raise
 88 102
 89 103           return cast(ValueT, cache_entry[0].get())
 90 104
+92 -1
osprey_worker/src/osprey/engine/stdlib/udfs/tests/test_external_service_utils.py
···
  1     - from typing import List
      1 + from typing import List, Optional
  2   2
  3   3   import gevent
      4 + import pytest
  4   5   from gevent.event import Event
  5   6   from osprey.engine.executor.external_service_utils import ExternalService, ExternalServiceAccessor
  6   7
···
 30  31           return super().get_from_service(key)
 31  32
 32  33
     34 + class FailingService(ExternalService[str, int]):
     35 +     """Always raises on the first call for a key."""
     36 +
     37 +     def __init__(self) -> None:
     38 +         self.calls: List[str] = []
     39 +
     40 +     def get_from_service(self, key: str) -> int:
     41 +         self.calls.append(key)
     42 +         raise RuntimeError(f'timeout for {key}')
     43 +
     44 +
     45 + class CountErrorOnceFailingService(ExternalService[str, Optional[int]]):
     46 +     """Always raises, but opts in to count_error_once."""
     47 +
     48 +     def __init__(self) -> None:
     49 +         self.calls: List[str] = []
     50 +
     51 +     def count_error_once(self) -> bool:
     52 +         return True
     53 +
     54 +     def get_from_service(self, key: str) -> Optional[int]:
     55 +         self.calls.append(key)
     56 +         raise RuntimeError(f'timeout for {key}')
     57 +
     58 +
 33  59   def test_accessor_caches_values() -> None:
 34  60       service = CountingService()
 35  61       accessor = ExternalServiceAccessor(service)
···
 89 115       assert g1.get() == 1
 90 116       assert g2.get() == 2
 91 117       assert g3.get() == 1
    118 +
    119 +
    120 + def test_cached_errors_re_raise_by_default() -> None:
    121 +     """Without count_error_once, cached exceptions re-raise for every caller."""
    122 +     service = FailingService()
    123 +     accessor = ExternalServiceAccessor(service)
    124 +
    125 +     with pytest.raises(RuntimeError, match='timeout for a'):
    126 +         accessor.get('a')
    127 +
    128 +     # Subsequent call also raises from cache
    129 +     with pytest.raises(RuntimeError, match='timeout for a'):
    130 +         accessor.get('a')
    131 +
    132 +     # Only one service call was made
    133 +     assert service.calls == ['a']
    134 +
    135 +
    136 + def test_count_error_once_returns_none_on_subsequent_calls() -> None:
    137 +     """With count_error_once, the first caller gets the exception but
    138 +     subsequent callers receive None from the cache."""
    139 +     service = CountErrorOnceFailingService()
    140 +     accessor = ExternalServiceAccessor(service)
    141 +
    142 +     with pytest.raises(RuntimeError, match='timeout for a'):
    143 +         accessor.get('a')
    144 +
    145 +     # Subsequent call returns None from cache
    146 +     assert accessor.get('a') is None
    147 +
    148 +     # Only one service call was made
    149 +     assert service.calls == ['a']
    150 +
    151 +
    152 + def test_count_error_once_concurrent_waiters() -> None:
    153 +     """With count_error_once, concurrent waiters get None while the
    154 +     initiating greenlet gets the exception."""
    155 +
    156 +     class BlockingThenFailService(ExternalService[str, Optional[int]]):
    157 +         def __init__(self) -> None:
    158 +             self.event = Event()
    159 +
    160 +         def count_error_once(self) -> bool:
    161 +             return True
    162 +
    163 +         def get_from_service(self, key: str) -> Optional[int]:
    164 +             self.event.wait()
    165 +             raise RuntimeError('timeout')
    166 +
    167 +     service = BlockingThenFailService()
    168 +     accessor = ExternalServiceAccessor(service)
    169 +
    170 +     g1 = gevent.spawn(lambda: accessor.get('a'))
    171 +     g2 = gevent.spawn(lambda: accessor.get('a'))
    172 +     gevent.idle()
    173 +
    174 +     service.event.set()
    175 +     gevent.idle()
    176 +
    177 +     # The initiating greenlet gets the exception
    178 +     assert g1.exception is not None or g2.exception is not None
    179 +
    180 +     # The waiting greenlet gets None
    181 +     results = [g.value for g in [g1, g2] if g.exception is None]
    182 +     assert None in results