Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

move engine initialization to a standardized location (#18)

authored by

Ethan Breder and committed by
GitHub
c498c35a 8e841309

+63 -85
+2 -1
docker-compose.yaml
··· 118 118 - OSPREY_RULES_SINK_NUM_WORKERS=1 119 119 - BIGTABLE_EMULATOR_HOST=bigtable:8361 120 120 - SNOWFLAKE_API_ENDPOINT=http://snowflake:8080 121 + - OSPREY_RULES_PATH=./example_rules 121 122 volumes: 122 123 - ./osprey_worker:/osprey/osprey_worker 123 124 - ./osprey_rpc:/osprey/osprey_rpc 124 125 - ./example_rules:/osprey/example_rules 125 - 126 + - ./entrypoint.sh:/osprey/entrypoint.sh 126 127 osprey_ui_api: 127 128 container_name: osprey_ui_api 128 129 build:
+2 -3
docs/DEVELOPMENT.md
··· 155 155 156 156 #### Rules 157 157 158 - Rules are written in SML, some examples are provided in `example_rules/` with YAML config, the rules are mounted to the worker processes when the containers start via: 158 + Rules are written in SML, some examples are provided in `example_rules/` with YAML config, the rules are mounted to the worker processes when the containers start via environment variables. ex: 159 159 160 160 ```bash 161 - uv run python3.11 osprey_worker/src/osprey/worker/sinks/cli.py run-rules-sink \ 162 - --input kafka --output stdout --rules-path ./example_rules 161 + OSPREY_RULES=./example_rules uv run python3.11 osprey_worker/src/osprey/worker/sinks/cli.py run-rules-sink 163 162 ``` 164 163 165 164 #### Test Data
+3
druid/environment
··· 49 49 druid_processing_numMergeBuffers=2 50 50 51 51 DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="DEBUG"><AppenderRef ref="Console"/></Logger></Loggers></Configuration> 52 + 53 + # 100MB 54 + druid.coordinator.compaction.config=[{"dataSource":"osprey.execution_results","taskPriority":25,"inputSegmentSizeBytes":100000000}]
+2 -1
druid/specs/execution_results.json
··· 4 4 "ioConfig": { 5 5 "type": "kafka", 6 6 "consumerProperties": { 7 - "bootstrap.servers": "kafka:29092" 7 + "bootstrap.servers": "kafka:29092", 8 + "auto.offset.reset": "latest" 8 9 }, 9 10 "topic": "osprey.execution_results", 10 11 "inputFormat": {
+1 -1
entrypoint.sh
··· 32 32 } 33 33 34 34 cli-osprey-worker() { 35 - exec uv run python3.11 osprey_worker/src/osprey/worker/cli/sinks.py run-rules-sink --rules-path ./example_rules 35 + exec uv run python3.11 osprey_worker/src/osprey/worker/cli/sinks.py run-rules-sink 36 36 } 37 37 38 38 cli-run-tests() {
+7 -31
osprey_worker/src/osprey/worker/cli/sinks.py
··· 1 1 # mypy: ignore-errors 2 2 # ruff: noqa: E402, E501 3 + 3 4 from osprey.worker.lib.patcher import patch_all 4 5 from osprey.worker.sinks.input_stream_chooser import get_rules_sink_input_stream 5 6 from osprey.worker.sinks.sink.output_sink import EventEffectsOutputSink ··· 12 13 13 14 # this is required to avoid memory leaks with gRPC 14 15 from gevent import config as gevent_config 15 - from osprey.worker.adaptor.plugin_manager import bootstrap_ast_validators, bootstrap_output_sinks, bootstrap_udfs 16 + from osprey.worker.adaptor.plugin_manager import bootstrap_output_sinks 16 17 17 18 gevent_config.track_greenlet_tree = False 18 19 19 20 import multiprocessing 20 21 import os 21 22 from concurrent.futures import ProcessPoolExecutor 22 - from pathlib import Path 23 23 from typing import Optional, Set, TextIO, cast 24 24 25 25 import click ··· 36 36 37 37 from osprey.worker.lib.bulk_label import TaskStatus 38 38 from osprey.worker.lib.config import Config 39 - from osprey.worker.lib.osprey_engine import OspreyEngine, get_sources_provider 39 + from osprey.worker.lib.osprey_engine import bootstrap_engine, bootstrap_engine_with_helpers, get_sources_provider 40 40 from osprey.worker.lib.osprey_shared.logging import get_logger 41 41 from osprey.worker.lib.publisher import PubSubPublisher 42 42 from osprey.worker.lib.singletons import CONFIG 43 - from osprey.worker.lib.sources_provider import EtcdSourcesProvider 44 43 from osprey.worker.lib.storage import postgres 45 44 from osprey.worker.lib.storage.bigtable import osprey_bigtable 46 45 from osprey.worker.lib.storage.bulk_label_task import BulkLabelTask ··· 109 108 110 109 111 110 @cli.command() 112 - @click.option( 113 - '--rules-path', 114 - type=click.Path(dir_okay=True, file_okay=False, exists=True), 115 - help='Which rules to use. If not provided uses the rules in etcd.', 116 - ) 117 111 @click.option('--pooled/--no-pooled', default=True, help='Whether to run multiple sinks simultaneously in a pool.') 118 112 @click.option( 119 113 '--bootstrap-pubsub/--no-bootstrap-pubsub', ··· 128 122 help='Create base tables', 129 123 ) 130 124 def run_rules_sink( 131 - rules_path: Optional[str], 132 125 pooled: bool, 133 126 bootstrap_pubsub: bool, 134 127 bootstrap_bigtable: bool, ··· 140 133 _bootstrap_bigtable() 141 134 142 135 config = init_config() 143 - sources_provider = get_sources_provider(Path(rules_path) if rules_path is not None else None) 144 136 145 - udf_registry, udf_helpers = bootstrap_udfs() 146 - bootstrap_ast_validators() 147 - 148 - engine = OspreyEngine(sources_provider=sources_provider, udf_registry=udf_registry) 137 + engine, udf_helpers = bootstrap_engine_with_helpers() 149 138 150 139 input_stream_source_string = config.get_str('OSPREY_INPUT_STREAM_SOURCE', 'plugin') 151 140 try: ··· 202 191 203 192 # Sources and Engine 204 193 sources_provider = get_sources_provider(rules_path=None, input_stream_ready_signaler=input_stream_ready_signaler) 205 - udf_registry, udf_helpers = bootstrap_udfs() 206 - bootstrap_ast_validators() 207 194 208 - engine = OspreyEngine(sources_provider=sources_provider, udf_registry=udf_registry) 195 + engine, udf_helpers = bootstrap_engine_with_helpers(sources_provider=sources_provider) 209 196 210 197 # Output Sink 211 198 output_sink = bootstrap_output_sinks(config) ··· 274 261 275 262 postgres.init_from_config('osprey_db') 276 263 277 - sources_provider = EtcdSourcesProvider( 278 - etcd_key=config.get_str('OSPREY_ETCD_SOURCES_PROVIDER_KEY', '/config/osprey/rules-sink-sources') 279 - ) 280 - udf_registry, _ = bootstrap_udfs() 281 - bootstrap_ast_validators() 282 - 283 - engine = OspreyEngine(sources_provider, udf_registry) 264 + engine = bootstrap_engine() 284 265 285 266 analytics_pubsub_project_id = config.get_str('PUBSUB_DATA_PROJECT_ID', 'osprey-dev') 286 267 analytics_pubsub_topic_id = config.get_str('PUBSUB_ANALYTICS_EVENT_TOPIC_ID', 'osprey-analytics') ··· 325 306 ) -> None: 326 307 # TODO: Clean up this copy pasta. 327 308 config = init_config() 328 - sources_provider = EtcdSourcesProvider( 329 - etcd_key=config.get_str('OSPREY_ETCD_SOURCES_PROVIDER_KEY', '/config/osprey/rules-sink-sources') 330 - ) 331 309 postgres.init_from_config('osprey_db') 332 - udf_registry, _ = bootstrap_udfs() 333 - bootstrap_ast_validators() 334 310 335 - engine = OspreyEngine(sources_provider, udf_registry) 311 + engine = bootstrap_engine() 336 312 337 313 analytics_pubsub_project_id = config.get_str('PUBSUB_DATA_PROJECT_ID', 'osprey-dev') 338 314 analytics_pubsub_topic_id = config.get_str('PUBSUB_ANALYTICS_EVENT_TOPIC_ID', 'osprey-analytics')
+2 -9
osprey_worker/src/osprey/worker/lib/cli.py
··· 31 31 EntityMutation, 32 32 LabelStatus, 33 33 ) 34 - from osprey.worker.adaptor.plugin_manager import bootstrap_ast_validators, bootstrap_udfs # noqa: E402 35 - from osprey.worker.lib.osprey_engine import OspreyEngine # noqa: E402 34 + from osprey.worker.lib.osprey_engine import bootstrap_engine # noqa: E402 36 35 from osprey.worker.lib.publisher import PubSubPublisher # noqa: E402 37 36 from osprey.worker.lib.singletons import CONFIG # noqa: E402 38 - from osprey.worker.lib.sources_provider import EtcdSourcesProvider # noqa: E402 39 37 from osprey.worker.lib.sources_publisher import ( # noqa: E402 40 38 upload_dependencies_mapping, 41 39 validate_and_push, ··· 273 271 config.configure_from_env() 274 272 275 273 postgres.init_from_config('osprey_db') 276 - sources_provider = EtcdSourcesProvider( 277 - etcd_key=config.get_str('OSPREY_ETCD_SOURCES_PROVIDER_KEY', '/config/osprey/rules-sink-sources') 278 - ) 279 - udf_registery, _ = bootstrap_udfs() 280 - bootstrap_ast_validators() 281 - engine = OspreyEngine(sources_provider, udf_registery) 274 + engine = bootstrap_engine() 282 275 analytics_pubsub_project_id = config.get_str('PUBSUB_DATA_PROJECT_ID', 'osprey-dev') 283 276 analytics_pubsub_topic_id = config.get_str('PUBSUB_ANALYTICS_EVENT_TOPIC_ID', 'osprey-analytics') 284 277 analytics_publisher = PubSubPublisher(analytics_pubsub_project_id, analytics_pubsub_topic_id)
+31 -1
osprey_worker/src/osprey/worker/lib/osprey_engine.py
··· 2 2 from dataclasses import dataclass 3 3 from pathlib import Path 4 4 from time import time 5 - from typing import Callable, Dict, Generic, List, Optional, Set, Type, TypeVar 5 + from typing import Callable, Dict, Generic, List, Optional, Set, Tuple, Type, TypeVar 6 6 7 7 import gevent 8 8 import gevent.pool ··· 299 299 start_col = span.start_pos 300 300 301 301 return '\n'.join(extracted_snippet) 302 + 303 + 304 + def bootstrap_engine_with_helpers( 305 + sources_provider: Optional[BaseSourcesProvider] = None, 306 + ) -> Tuple[OspreyEngine, UDFHelpers]: 307 + # Avoid circular imports 308 + from osprey.worker.adaptor.plugin_manager import bootstrap_ast_validators, bootstrap_udfs 309 + 310 + udf_registry, udf_helpers = bootstrap_udfs() 311 + bootstrap_ast_validators() 312 + 313 + if not sources_provider: 314 + # Use static rules path if configured, otherwise use etcd 315 + config = CONFIG.instance() 316 + rules_path_str = config.get_optional_str('OSPREY_RULES_PATH') 317 + rules_path = Path(rules_path_str) if rules_path_str else None 318 + sources_provider = get_sources_provider(rules_path=rules_path) 319 + 320 + return ( 321 + OspreyEngine( 322 + sources_provider=sources_provider, 323 + udf_registry=udf_registry, 324 + should_yield_during_compilation=should_yield_during_compilation(), 325 + ), 326 + udf_helpers, 327 + ) 328 + 329 + 330 + def bootstrap_engine(sources_provider: Optional[BaseSourcesProvider] = None) -> OspreyEngine: 331 + return bootstrap_engine_with_helpers(sources_provider)[0]
+3 -25
osprey_worker/src/osprey/worker/lib/singletons.py
··· 13 13 CONFIG_REGISTRY: Singleton[ConfigRegistry] = Singleton(lambda: get_config_registry().clone()) 14 14 15 15 16 - def _init_engine() -> 'OspreyEngine': 17 - # Avoid circular imports 18 - from pathlib import Path 19 - 20 - from osprey.worker.adaptor.plugin_manager import bootstrap_ast_validators, bootstrap_udfs 21 - from osprey.worker.lib.data_exporters.validation_result_exporter import get_validation_result_exporter 22 - from osprey.worker.lib.osprey_engine import ( 23 - OspreyEngine, 24 - get_sources_provider, 25 - should_yield_during_compilation, 26 - ) 27 - 28 - udf_registry, _ = bootstrap_udfs() 29 - bootstrap_ast_validators() 30 - 31 - # Use static rules path if configured, otherwise use etcd 32 - config = CONFIG.instance() 33 - rules_path_str = config.get_str('OSPREY_RULES_PATH', '') 34 - rules_path = Path(rules_path_str) if rules_path_str else None 16 + def _init_engine(): 17 + from osprey.worker.lib.osprey_engine import bootstrap_engine 35 18 36 - return OspreyEngine( 37 - sources_provider=get_sources_provider(rules_path=rules_path), 38 - udf_registry=udf_registry, 39 - should_yield_during_compilation=should_yield_during_compilation(), 40 - validation_exporter=get_validation_result_exporter(), 41 - ) 19 + return bootstrap_engine() 42 20 43 21 44 22 ENGINE: Singleton['OspreyEngine'] = Singleton(_init_engine)
+8 -8
osprey_worker/src/osprey/worker/lib/storage/access_audit_log.py
··· 1 1 import dataclasses 2 - import datetime 3 2 from typing import Optional 4 3 5 4 from osprey.engine.utils.types import add_slots 6 - from osprey.worker.lib.storage.bigtable import osprey_bigtable 7 5 8 6 9 7 @add_slots ··· 25 23 response_body: bytes 26 24 27 25 def persist(self) -> None: 28 - row = osprey_bigtable.table('audit_log').row(self.id.to_bytes(8, 'big')) 29 - timestamp = datetime.datetime.utcnow() 30 - for k in dataclasses.fields(self): 31 - v = getattr(self, k.name) 32 - row.set_cell('audit_log', k.name.encode(), v.encode() if hasattr(v, 'encode') else v, timestamp=timestamp) 33 - osprey_bigtable.table('audit_log').mutate_rows([row]) 26 + pass 27 + # we should move this store to postgres, or some other plugin. 28 + # row = osprey_bigtable.table('audit_log').row(self.id.to_bytes(8, 'big')) 29 + # timestamp = datetime.datetime.utcnow() 30 + # for k in dataclasses.fields(self): 31 + # v = getattr(self, k.name) 32 + # row.set_cell('audit_log', k.name.encode(), v.encode() if hasattr(v, 'encode') else v, timestamp=timestamp) 33 + # osprey_bigtable.table('audit_log').mutate_rows([row])
+2 -5
osprey_worker/src/osprey/worker/lib/tests/test_utils.py
··· 5 5 import pytest 6 6 from flask import Flask 7 7 from osprey.engine.ast.sources import Sources 8 - from osprey.worker.adaptor.plugin_manager import bootstrap_ast_validators, bootstrap_udfs 9 - from osprey.worker.lib.osprey_engine import OspreyEngine 8 + from osprey.worker.lib.osprey_engine import bootstrap_engine 10 9 from osprey.worker.lib.singletons import CONFIG, ENGINE 11 10 from osprey.worker.lib.sources_provider import StaticSourcesProvider 12 11 from osprey.worker.lib.sources_publisher import validate_and_push ··· 86 85 sources = Sources.from_dict(sources_to_use) 87 86 assert validate_and_push(sources, quiet=True, dry_run=True) 88 87 sources_provider = StaticSourcesProvider(sources) 89 - udf_registry, _ = bootstrap_udfs() 90 - bootstrap_ast_validators() 91 - engine = OspreyEngine(sources_provider, udf_registry) 88 + engine = bootstrap_engine(sources_provider=sources_provider) 92 89 93 90 with ENGINE.override_instance_for_test(engine): 94 91 CONFIG.instance().unconfigure_for_tests()