Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Input Stream Plugin (#10)

authored by

Ethan Breder and committed by
GitHub
6b8ea467 4c095160

+165 -146
+1
docker-compose.yaml
··· 73 73 environment: 74 74 - PYTHONPATH=/osprey 75 75 - PORT=5000 76 + - OSPREY_INPUT_STREAM_SOURCE=kafka 76 77 - OSPREY_STDOUT_OUTPUT_SINK=True 77 78 - OSPREY_KAFKA_BOOTSTRAP_SERVERS=["kafka:29092"] 78 79 - OSPREY_KAFKA_INPUT_STREAM_TOPIC=osprey.actions_input
+1 -1
entrypoint.sh
··· 32 32 } 33 33 34 34 cli-osprey-worker() { 35 - exec uv run python3.11 osprey_worker/src/osprey/worker/cli/sinks.py run-rules-sink --input kafka --rules-path ./example_rules 35 + exec uv run python3.11 osprey_worker/src/osprey/worker/cli/sinks.py run-rules-sink --rules-path ./example_rules 36 36 } 37 37 38 38 cli-run-tests() {
+8
osprey_worker/src/osprey/worker/adaptor/hookspecs/osprey_hooks.py
··· 4 4 5 5 import pluggy 6 6 from osprey.engine.ast_validator.base_validator import BaseValidator 7 + from osprey.engine.executor.execution_context import Action 7 8 from osprey.engine.udf.base import UDFBase 8 9 from osprey.worker.adaptor.constants import OSPREY_ADAPTOR 9 10 from osprey.worker.lib.action_proto_deserializer import ActionProtoDeserializer 11 + from osprey.worker.sinks.sink.input_stream import BaseInputStream 12 + from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext 10 13 11 14 if TYPE_CHECKING: 12 15 from osprey.worker.lib.config import Config ··· 37 40 def register_action_proto_deserializer() -> ActionProtoDeserializer | None: 38 41 """Register a custom deserializer to convert custom Action proto into JSON.""" 39 42 raise NotImplementedError('register_action_proto_deserializers must be implemented by the plugin') 43 + 44 + 45 + @hookspec(firstresult=True) 46 + def register_input_stream() -> BaseInputStream[BaseAckingContext[Action]]: 47 + raise NotImplementedError('register_input_stream must be implemented by the plugin')
+14
osprey_worker/src/osprey/worker/adaptor/plugin_manager.py
import pluggy
from osprey.engine.ast_validator import ValidatorRegistry
from osprey.engine.executor.execution_context import Action
from osprey.engine.executor.udf_execution_helpers import HasHelper, UDFHelpers
from osprey.engine.udf.base import UDFBase
from osprey.engine.udf.registry import UDFRegistry
# ... (one import line elided in this view)
from osprey.worker.adaptor.hookspecs import osprey_hooks
from osprey.worker.lib.action_proto_deserializer import ActionProtoDeserializer
from osprey.worker.lib.storage.labels import HasLabelProvider
from osprey.worker.sinks.sink.input_stream import BaseInputStream
from osprey.worker.sinks.sink.output_sink import BaseOutputSink, MultiOutputSink
from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext

if TYPE_CHECKING:
    from osprey.worker.lib.config import Config

# ... (preceding definitions are elided in this view)


def bootstrap_input_stream() -> BaseInputStream[BaseAckingContext[Action]] | None:
    """Load all Osprey plugins and return the input stream a plugin registers.

    Returns:
        The value produced by the ``register_input_stream`` hook, or ``None``
        when the hook call yields no result.
    """
    load_all_osprey_plugins()

    # The hookspec is declared with firstresult=True, so pluggy hands back the
    # first non-None result itself (or None) rather than a list of results.
    # Indexing that value with [0] would subscript the stream object.
    result = plugin_manager.hook.register_input_stream()

    # Defensive: if the spec ever stops using firstresult=True the hook call
    # returns a list of results; keep "first one wins" semantics in that case.
    if isinstance(result, list):
        return result[0] if result else None

    # Compare against None explicitly so a stream object that happens to be
    # falsy (e.g. defines __len__/__bool__) is still returned.
    return result
+8 -19
osprey_worker/src/osprey/worker/cli/sinks.py
··· 1 1 # mypy: ignore-errors 2 2 # ruff: noqa: E402, E501 3 3 from osprey.worker.lib.patcher import patch_all 4 + from osprey.worker.sinks.input_stream_chooser import get_rules_sink_input_stream 4 5 from osprey.worker.sinks.sink.output_sink import EventEffectsOutputSink 5 6 6 7 patch_all(ddtrace_args={'cassandra': True, 'psycopg': True}) ··· 46 47 from osprey.worker.lib.storage import postgres 47 48 from osprey.worker.lib.storage.bigtable import osprey_bigtable 48 49 from osprey.worker.lib.storage.bulk_label_task import BulkLabelTask 49 - from osprey.worker.lib.utils.click_utils import EnumChoice 50 50 from osprey.worker.lib.utils.input_stream_ready_signaler import InputStreamReadySignaler 51 51 from osprey.worker.sinks import ( 52 52 InputStreamSource, 53 - OutputSinkDestination, 54 - get_rules_sink_input_stream, 55 53 ) 56 54 from osprey.worker.sinks.sink.base_sink import BaseSink, PooledSink 57 55 from osprey.worker.sinks.sink.bulk_label_sink import BulkLabelSink ··· 115 113 116 114 @cli.command() 117 115 @click.option( 118 - '--input', 119 - 'input_stream_source', 120 - default=InputStreamSource.PUBSUB, 121 - type=EnumChoice(InputStreamSource), 122 - help='Where to ingest data from.', 123 - ) 124 - @click.option( 125 - '--output', 126 - 'output_sink_destination', 127 - default=OutputSinkDestination.OSPREY, 128 - type=EnumChoice(OutputSinkDestination), 129 - help='Where to send the processed events to.', 130 - ) 131 - @click.option( 132 116 '--rules-path', 133 117 type=click.Path(dir_okay=True, file_okay=False, exists=True), 134 118 help='Which rules to use. 
If not provided uses the rules in etcd.', ··· 147 131 help='Create base tables', 148 132 ) 149 133 def run_rules_sink( 150 - input_stream_source: InputStreamSource, 151 - output_sink_destination: OutputSinkDestination, 152 134 rules_path: Optional[str], 153 135 pooled: bool, 154 136 bootstrap_pubsub: bool, ··· 167 149 bootstrap_ast_validators() 168 150 169 151 engine = OspreyEngine(sources_provider=sources_provider, udf_registry=udf_registry) 152 + 153 + input_stream_source_string = config.get_str('OSPREY_INPUT_STREAM_SOURCE', 'plugin') 154 + try: 155 + input_stream_source = InputStreamSource(input_stream_source_string.lower()) 156 + except ValueError: 157 + raise NotImplementedError(f'{input_stream_source_string} is not a valid input stream source.') 158 + 170 159 input_stream = get_rules_sink_input_stream(input_stream_source) 171 160 output_sink = bootstrap_output_sinks(config=config) 172 161
+6 -126
osprey_worker/src/osprey/worker/sinks/__init__.py
··· 8 8 but not ideal as clients should be responsible for patching 9 9 """ 10 10 11 - import platform 12 - 13 11 from osprey.worker.lib.patcher import patch_all 14 12 15 13 patch_all() 16 14 17 - import random 18 - from datetime import datetime, timedelta 19 - from enum import Enum, auto 20 - 21 - from google.cloud import pubsub_v1 22 - from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor 23 - from osprey.engine.executor.execution_context import Action 24 - from osprey.worker.lib.singletons import CONFIG 25 - from osprey.worker.sinks.sink.input_stream import ( 26 - AsyncPubSubOspreyActionInputStream, 27 - BaseInputStream, 28 - StaticInputStream, 29 - ) 30 - from osprey.worker.sinks.sink.osprey_coordinator_input_stream import OspreyCoordinatorInputStream 31 - from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext, NoopAckingContext 32 - from osprey.worker.sinks.utils.kafka import PatchedKafkaConsumer 15 + from enum import StrEnum, auto 33 16 34 17 35 - class InputStreamSource(Enum): 18 + class InputStreamSource(StrEnum): 36 19 """Where data for the input to the classification engine are sourced.""" 37 20 38 21 PUBSUB = auto() ··· 51 34 KAFKA = auto() 52 35 """Sources events from kafka.""" 53 36 37 + PLUGIN = auto() 38 + """Sources events from whatever a plugin defines via register_input_stream.""" 54 39 55 - class OutputSinkDestination(Enum): 40 + 41 + class OutputSinkDestination(StrEnum): 56 42 """Where the data of a classified event should be sent.""" 57 43 58 44 OSPREY = auto() ··· 61 47 62 48 STDOUT = auto() 63 49 """Prints the output to standard out. Good for local development or debugging.""" 64 - 65 - 66 - def get_rules_sink_input_stream( 67 - input_stream_source: InputStreamSource, 68 - ) -> BaseInputStream[BaseAckingContext[Action]]: 69 - """Based on the `input_stream_source` constructs a configured input stream that can be used to source events to 70 - classify. 
For more details, see `InputStreamSource`.""" 71 - 72 - if input_stream_source == InputStreamSource.PUBSUB: 73 - config = CONFIG.instance() 74 - gcloud_project = config.get_str('PUBSUB_OSPREY_PROJECT_ID', 'osprey-dev') 75 - pubsub_subscription = config.get_str('PUBSUB_OSPREY_RULES_SINK_SUBSCRIPTION', 'rules-sink') 76 - subscriber = pubsub_v1.SubscriberClient() 77 - subscription_path = subscriber.subscription_path(gcloud_project, pubsub_subscription) 78 - batch_size = config.get_int('PUBSUB_OSPREY_INPUT_BATCH_SIZE', 100) 79 - gevent_queue_size = config.get_int('PUBSUB_OSPREY_INPUT_STREAM_GEVENT_QUEUE_SIZE', 1000) 80 - kek_uri = config.get_str('PUBSUB_ENCRYPTION_KEY_URI', '') 81 - return AsyncPubSubOspreyActionInputStream( 82 - subscriber=subscriber, 83 - subscription_path=subscription_path, 84 - kek_uri=kek_uri, 85 - max_messages=batch_size, 86 - gevent_queue_size=gevent_queue_size, 87 - ) 88 - 89 - elif input_stream_source == InputStreamSource.OSPREY_COORDINATOR: 90 - return OspreyCoordinatorInputStream(client_id='meow') 91 - 92 - elif input_stream_source == InputStreamSource.SYNTHETIC: 93 - random_actions = [] 94 - action_types = [ 95 - 'dm_channel_created', 96 - 'user_phone_verification_completed', 97 - 'guild_joined', 98 - 'guild_left', 99 - 'report_submitted', 100 - ] 101 - usernames = ['person_a', 'person_b', 'person_c', 'person_d', 'person_e', 'person_f', 'person_g'] 102 - http_headers_user_agent = [ 103 - 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) osprey/0.0.306 Chrome/78.0.3', 104 - 'Osprey/20414 CFNetwork/1126 Darwin/19.5.0', 105 - ] 106 - emails = [ 107 - 'watchanimeintheoffice@watchanimeintheoffice.com', 108 - 'dorime@interimoayapare.com', 109 - 'icarus@pippa.org', 110 - '/@test-entity-id-uri-encoding.com', 111 - ] 112 - ips = [ 113 - '127.0.0.1', 114 - '231.11.46.123', 115 - '148.123.7.13', 116 - '191.201.12.201', 117 - ] 118 - 119 - now = datetime.utcnow() 120 - 121 - for i in range(random.randint(1000, 5000)): 122 - 
data = { 123 - 'user': { 124 - 'id': random.getrandbits(32), 125 - 'username': random.choice(usernames), 126 - 'email': random.choice(emails), 127 - }, 128 - 'http_request': { 129 - 'headers': {'User-Agent': random.choice(http_headers_user_agent)}, 130 - 'remote_addr': random.choice(ips), 131 - }, 132 - 'remote_addr_mobile': random.choice(ips), 133 - 'guild': {'id': random.getrandbits(32), 'name': str(random.random())}, 134 - 'target': {'ip': random.choice(ips)}, 135 - } 136 - 137 - random_actions.append( 138 - NoopAckingContext( 139 - item=Action(action_id=i, action_name=random.choice(action_types), data=data, timestamp=now) 140 - ) 141 - ) 142 - now += timedelta(minutes=1) 143 - 144 - return StaticInputStream(random_actions) 145 - elif input_stream_source == InputStreamSource.KAFKA: 146 - config = CONFIG.instance() 147 - client_id = config.get_str('OSPREY_KAFKA_INPUT_STREAM_CLIENT_ID', platform.node()) 148 - client_id_suffix = config.get_optional_str('OSPREY_KAFKA_INPUT_STREAM_CLIENT_ID_SUFFIX') 149 - input_topic: str = config.get_str('OSPREY_KAFKA_INPUT_STREAM_TOPIC', 'osprey.actions_input') 150 - input_bootstrap_servers: list[str] = config.get_str_list('OSPREY_KAFKA_BOOTSTRAP_SERVERS', ['localhost']) 151 - group_id = config.get_optional_str('OSPREY_KAFKA_GROUP_ID') 152 - 153 - if client_id_suffix: 154 - client_id = f'{client_id}-{client_id_suffix}' 155 - 156 - consumer: PatchedKafkaConsumer = PatchedKafkaConsumer( 157 - input_topic, 158 - bootstrap_servers=input_bootstrap_servers, 159 - client_id=client_id, 160 - group_id=group_id, 161 - partition_assignment_strategy=(RoundRobinPartitionAssignor,), 162 - ) 163 - from osprey.worker.sinks.sink.input_stream import KafkaInputStream 164 - 165 - return KafkaInputStream( 166 - kafka_consumer=consumer, 167 - ) 168 - else: 169 - raise AssertionError(f'Unknown rules sink input source: {input_stream_source}')
+127
osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py
"""Builds the rules-sink input stream for a given `InputStreamSource`."""

import platform
import random
from datetime import datetime, timedelta

from google.cloud import pubsub_v1
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from osprey.engine.executor.execution_context import Action
from osprey.worker.adaptor.plugin_manager import bootstrap_input_stream
from osprey.worker.lib.singletons import CONFIG
from osprey.worker.sinks import InputStreamSource
from osprey.worker.sinks.sink.input_stream import (
    AsyncPubSubOspreyActionInputStream,
    BaseInputStream,
    StaticInputStream,
)
from osprey.worker.sinks.sink.osprey_coordinator_input_stream import OspreyCoordinatorInputStream
from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext, NoopAckingContext
from osprey.worker.sinks.utils.kafka import PatchedKafkaConsumer


def _pubsub_input_stream() -> BaseInputStream[BaseAckingContext[Action]]:
    """Build a Pub/Sub-backed input stream from the `PUBSUB_*` config values."""
    config = CONFIG.instance()
    gcloud_project = config.get_str('PUBSUB_OSPREY_PROJECT_ID', 'osprey-dev')
    pubsub_subscription = config.get_str('PUBSUB_OSPREY_RULES_SINK_SUBSCRIPTION', 'rules-sink')
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(gcloud_project, pubsub_subscription)
    batch_size = config.get_int('PUBSUB_OSPREY_INPUT_BATCH_SIZE', 100)
    gevent_queue_size = config.get_int('PUBSUB_OSPREY_INPUT_STREAM_GEVENT_QUEUE_SIZE', 1000)
    kek_uri = config.get_str('PUBSUB_ENCRYPTION_KEY_URI', '')
    return AsyncPubSubOspreyActionInputStream(
        subscriber=subscriber,
        subscription_path=subscription_path,
        kek_uri=kek_uri,
        max_messages=batch_size,
        gevent_queue_size=gevent_queue_size,
    )


def _synthetic_input_stream() -> BaseInputStream[BaseAckingContext[Action]]:
    """Build a static stream of randomly generated actions.

    Generates between 1000 and 5000 actions; each action's timestamp is one
    minute after the previous one, starting from the current UTC time.
    """
    action_types = [
        'dm_channel_created',
        'user_phone_verification_completed',
        'guild_joined',
        'guild_left',
        'report_submitted',
    ]
    usernames = ['person_a', 'person_b', 'person_c', 'person_d', 'person_e', 'person_f', 'person_g']
    http_headers_user_agent = [
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) osprey/0.0.306 Chrome/78.0.3',
        'Osprey/20414 CFNetwork/1126 Darwin/19.5.0',
    ]
    emails = [
        'watchanimeintheoffice@watchanimeintheoffice.com',
        'dorime@interimoayapare.com',
        'icarus@pippa.org',
        '/@test-entity-id-uri-encoding.com',
    ]
    ips = [
        '127.0.0.1',
        '231.11.46.123',
        '148.123.7.13',
        '191.201.12.201',
    ]

    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12. It is
    # kept because switching to an aware datetime would change the generated
    # Action timestamps - confirm downstream expectations before migrating.
    now = datetime.utcnow()

    random_actions = []
    for i in range(random.randint(1000, 5000)):
        data = {
            'user': {
                'id': random.getrandbits(32),
                'username': random.choice(usernames),
                'email': random.choice(emails),
            },
            'http_request': {
                'headers': {'User-Agent': random.choice(http_headers_user_agent)},
                'remote_addr': random.choice(ips),
            },
            'remote_addr_mobile': random.choice(ips),
            'guild': {'id': random.getrandbits(32), 'name': str(random.random())},
            'target': {'ip': random.choice(ips)},
        }

        random_actions.append(
            NoopAckingContext(
                item=Action(action_id=i, action_name=random.choice(action_types), data=data, timestamp=now)
            )
        )
        now += timedelta(minutes=1)

    return StaticInputStream(random_actions)


def _kafka_input_stream() -> BaseInputStream[BaseAckingContext[Action]]:
    """Build a Kafka-backed input stream from the `OSPREY_KAFKA_*` config values."""
    config = CONFIG.instance()
    # The client id defaults to this machine's network name.
    client_id = config.get_str('OSPREY_KAFKA_INPUT_STREAM_CLIENT_ID', platform.node())
    client_id_suffix = config.get_optional_str('OSPREY_KAFKA_INPUT_STREAM_CLIENT_ID_SUFFIX')
    input_topic: str = config.get_str('OSPREY_KAFKA_INPUT_STREAM_TOPIC', 'osprey.actions_input')
    input_bootstrap_servers: list[str] = config.get_str_list('OSPREY_KAFKA_BOOTSTRAP_SERVERS', ['localhost'])
    group_id = config.get_optional_str('OSPREY_KAFKA_GROUP_ID')

    if client_id_suffix:
        client_id = f'{client_id}-{client_id_suffix}'

    consumer: PatchedKafkaConsumer = PatchedKafkaConsumer(
        input_topic,
        bootstrap_servers=input_bootstrap_servers,
        client_id=client_id,
        group_id=group_id,
        partition_assignment_strategy=(RoundRobinPartitionAssignor,),
    )
    # Imported here, as in the original code, rather than at module import time.
    from osprey.worker.sinks.sink.input_stream import KafkaInputStream

    return KafkaInputStream(
        kafka_consumer=consumer,
    )


def _plugin_input_stream() -> BaseInputStream[BaseAckingContext[Action]]:
    """Return the plugin-registered input stream, failing if there is none."""
    stream = bootstrap_input_stream()
    if stream is None:
        raise AssertionError('No input stream plugin registered')
    return stream


def get_rules_sink_input_stream(
    input_stream_source: InputStreamSource,
) -> BaseInputStream[BaseAckingContext[Action]]:
    """Based on the `input_stream_source` constructs a configured input stream that can be used to source events to
    classify. For more details, see `InputStreamSource`.

    Raises:
        AssertionError: if `input_stream_source` is `PLUGIN` and no plugin registered a stream, or if
            `input_stream_source` is not one of the sources handled here.
    """
    if input_stream_source == InputStreamSource.PUBSUB:
        return _pubsub_input_stream()
    if input_stream_source == InputStreamSource.OSPREY_COORDINATOR:
        return OspreyCoordinatorInputStream(client_id='meow')
    if input_stream_source == InputStreamSource.SYNTHETIC:
        return _synthetic_input_stream()
    if input_stream_source == InputStreamSource.KAFKA:
        return _kafka_input_stream()
    if input_stream_source == InputStreamSource.PLUGIN:
        return _plugin_input_stream()

    # Without this, an unhandled source would fall off the end of the function
    # and hand the caller None instead of a stream.
    raise AssertionError(f'Unknown rules sink input source: {input_stream_source}')