Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

configurable execution result store (#29)

Co-authored-by: ayu <ayu@ayu.dev>

authored by

hailey
ayu
and committed by
GitHub
e794aa81 d6839e99

+105 -14
+43
osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py
··· 1 + from typing import Optional 2 + 3 + from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store 4 + from osprey.worker.lib.singletons import CONFIG 5 + from osprey.worker.lib.storage import ExecutionResultStorageBackendType 6 + from osprey.worker.lib.storage.stored_execution_result import ( 7 + ExecutionResultStore, 8 + StoredExecutionResultBigTable, 9 + StoredExecutionResultGCS, 10 + StoredExecutionResultMinIO, 11 + ) 12 + 13 + 14 + def get_rules_execution_result_storage_backend( 15 + backend_type: ExecutionResultStorageBackendType, 16 + ) -> Optional[ExecutionResultStore]: 17 + """Based on the `backend_type` constructs a configured execution result store that can be used to store execution 18 + results. For more details, see `ExecutionResultStore`.""" 19 + 20 + config = CONFIG.instance() 21 + 22 + if backend_type == ExecutionResultStorageBackendType.BIGTABLE: 23 + return StoredExecutionResultBigTable() 24 + elif backend_type == ExecutionResultStorageBackendType.GCS: 25 + return StoredExecutionResultGCS() 26 + elif backend_type == ExecutionResultStorageBackendType.MINIO: 27 + endpoint = config.get_str('OSPREY_MINIO_ENDPOINT', 'minio:9000') 28 + access_key = config.get_str('OSPREY_MINIO_ACCESS_KEY', 'minioadmin') 29 + secret_key = config.get_str('OSPREY_MINIO_SECRET_KEY', 'minioadmin123') 30 + secure = config.get_bool('OSPREY_MINIO_SECURE', False) 31 + bucket_name = config.get_str('OSPREY_MINIO_EXECUTION_RESULTS_BUCKET', 'execution-output') 32 + 33 + return StoredExecutionResultMinIO( 34 + endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=secure, bucket_name=bucket_name 35 + ) 36 + elif backend_type == ExecutionResultStorageBackendType.PLUGIN: 37 + store = bootstrap_execution_result_store(config=config) 38 + if store is None: 39 + raise AssertionError('No execution result store registered') 40 + elif backend_type == ExecutionResultStorageBackendType.NONE: 41 + return None 42 + 43 + return None
+12 -3
osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py
··· 1 1 from typing import List, Sequence 2 2 3 3 from kafka import KafkaProducer 4 - from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store, hookimpl_osprey 4 + from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend 5 + from osprey.worker.adaptor.plugin_manager import hookimpl_osprey 5 6 from osprey.worker.lib.config import Config 7 + from osprey.worker.lib.storage import ExecutionResultStorageBackendType 6 8 from osprey.worker.sinks.sink.kafka_output_sink import KafkaOutputSink 7 9 from osprey.worker.sinks.sink.output_sink import BaseOutputSink, StdoutOutputSink 8 10 from osprey.worker.sinks.sink.stored_execution_result_output_sink import StoredExecutionResultOutputSink ··· 23 25 kafka_producer=KafkaProducer(bootstrap_servers=bootstrap_servers, client_id=client_id), 24 26 ) 25 27 ) 26 - execution_result_store = bootstrap_execution_result_store(config=config) 27 - if execution_result_store is not None: 28 + 29 + storage_backend_type = ExecutionResultStorageBackendType( 30 + config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none').lower() 31 + ) 32 + storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) 33 + 34 + # There may not be an execution result store configured, so check before adding the output sink 35 + if storage_backend is not None: 28 36 sinks.append(StoredExecutionResultOutputSink()) 37 + 29 38 return sinks
+8 -9
osprey_worker/src/osprey/worker/_stdlibplugin/storage_register.py
··· 1 1 from typing import Optional 2 2 3 + from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend 3 4 from osprey.worker.adaptor.plugin_manager import hookimpl_osprey 4 5 from osprey.worker.lib.config import Config 5 - from osprey.worker.lib.storage.stored_execution_result import ExecutionResultStore, StoredExecutionResultMinIO 6 + from osprey.worker.lib.storage import ExecutionResultStorageBackendType 7 + from osprey.worker.lib.storage.stored_execution_result import ExecutionResultStore 6 8 7 9 8 10 @hookimpl_osprey(trylast=True) 9 11 def register_execution_result_store(config: Config) -> Optional[ExecutionResultStore]: 10 - endpoint = config.get_str('OSPREY_MINIO_ENDPOINT', 'minio:9000') 11 - access_key = config.get_str('OSPREY_MINIO_ACCESS_KEY', 'minioadmin') 12 - secret_key = config.get_str('OSPREY_MINIO_SECRET_KEY', 'minioadmin123') 13 - secure = config.get_bool('OSPREY_MINIO_SECURE', False) 14 - bucket_name = config.get_str('OSPREY_MINIO_EXECUTION_RESULTS_BUCKET', 'execution-output') 12 + storage_backend_type = ExecutionResultStorageBackendType( 13 + config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none').lower() 14 + ) 15 + storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) 15 16 16 - return StoredExecutionResultMinIO( 17 - endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=secure, bucket_name=bucket_name 18 - ) 17 + return storage_backend
+31
osprey_worker/src/osprey/worker/lib/storage/__init__.py
··· 1 + from enum import StrEnum, auto 2 + 1 3 # Import all models to ensure they're registered with SQLAlchemy 2 4 # This is required for metadata.create_all() to create all tables 3 5 from .bulk_action_task import BulkActionJob, BulkActionTask # noqa: F401 4 6 from .bulk_label_task import BulkLabelTask # noqa: F401 5 7 from .queries import Query, SavedQuery # noqa: F401 6 8 from .temporary_ability_token import TemporaryAbilityToken # noqa: F401 9 + 10 + 11 + class ExecutionResultStorageBackendType(StrEnum): 12 + """Type of store used for execution results.""" 13 + 14 + BIGTABLE = auto() 15 + """ 16 + Bigtable execution result store 17 + """ 18 + 19 + GCS = auto() 20 + """ 21 + Google Cloud Storage execution result store 22 + """ 23 + 24 + MINIO = auto() 25 + """ 26 + Minio execution result store 27 + """ 28 + 29 + PLUGIN = auto() 30 + """ 31 + Execution result store that is defined via register_execution_result_store 32 + """ 33 + 34 + NONE = auto() 35 + """ 36 + Disable execution results from being stored. This may cause certain elements of Osprey to break, such as the events stream and individual event details in the UI 37 + """
+11 -2
osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py
··· 19 19 from osprey.worker.lib.instruments import metrics 20 20 from osprey.worker.lib.osprey_shared.logging import get_logger 21 21 from osprey.worker.lib.snowflake import Snowflake 22 + from osprey.worker.lib.storage import ExecutionResultStorageBackendType 22 23 from osprey.worker.lib.storage.bigtable import osprey_bigtable 23 24 from pydantic.main import BaseModel 24 25 ··· 497 498 498 499 def bootstrap_execution_result_storage_service() -> ExecutionResultStorageService: 499 500 """Create an ExecutionResultStorageService with the configured storage backend.""" 500 - from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store 501 + from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend 501 502 from osprey.worker.lib.singletons import CONFIG 502 503 503 504 config = CONFIG.instance() 504 - storage_backend = bootstrap_execution_result_store(config) 505 + 506 + storage_backend_type = ExecutionResultStorageBackendType( 507 + config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none').lower() 508 + ) 509 + storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) 510 + 511 + if storage_backend is None: 512 + raise AssertionError('No storage backend registered') 513 + 505 514 return ExecutionResultStorageService(storage_backend)