Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Allow osprey coordinator to change the service name through config (#59)

authored by

Leon Shi and committed by
GitHub
76bf0037 704347d5

+16 -4
+4 -1
osprey_worker/src/osprey/worker/cli/sinks.py
··· 182 182 183 183 # Input Stream 184 184 input_stream_ready_signaler = InputStreamReadySignaler() 185 + coordinator_service_name = config.get_str('OSPREY_COORDINATOR_SERVICE_NAME', 'osprey_coordinator') 185 186 input_stream = OspreyCoordinatorInputStream( 186 - client_id=f'{uuid1()}', input_stream_ready_signaler=input_stream_ready_signaler 187 + client_id=f'{uuid1()}', 188 + input_stream_ready_signaler=input_stream_ready_signaler, 189 + coordinator_service_name=coordinator_service_name, 187 190 ) 188 191 signal.signal(signal.SIGTERM, lambda *args: input_stream.stop()) 189 192 signal.signal(signal.SIGINT, lambda *args: input_stream.stop())
+5 -1
osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py
··· 43 43 ) 44 44 45 45 elif input_stream_source == InputStreamSource.OSPREY_COORDINATOR: 46 - return OspreyCoordinatorInputStream(client_id='meow') 46 + coordinator_service_name = config.get_str('OSPREY_COORDINATOR_SERVICE_NAME', 'osprey_coordinator') 47 + return OspreyCoordinatorInputStream( 48 + client_id='meow', 49 + coordinator_service_name=coordinator_service_name, 50 + ) 47 51 48 52 elif input_stream_source == InputStreamSource.SYNTHETIC: 49 53 random_actions = []
+7 -2
osprey_worker/src/osprey/worker/sinks/sink/osprey_coordinator_input_stream.py
··· 187 187 188 188 _STOP_STREAMING_SIGNAL = object() 189 189 190 - def __init__(self, client_id: str, input_stream_ready_signaler: Optional[InputStreamReadySignaler] = None) -> None: 190 + def __init__( 191 + self, 192 + client_id: str, 193 + input_stream_ready_signaler: Optional[InputStreamReadySignaler] = None, 194 + coordinator_service_name: str = 'osprey_coordinator', 195 + ) -> None: 191 196 super().__init__() 192 197 193 198 self._outgoing_request_queue: Queue[Request] = Queue() 194 199 self._soft_shutdown_signal_received = False 195 - self._channel_pool = GrpcConnectionDiscoveryPool('osprey_coordinator') 200 + self._channel_pool = GrpcConnectionDiscoveryPool(coordinator_service_name) 196 201 self._client_id = client_id 197 202 self._input_stream_ready_signaler = input_stream_ready_signaler 198 203 self._current_execution_result: Optional[ExecutionResult] = None