Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add postgres result store (#171)

Co-authored-by: Shalabh Agarwal <me@serendipty01.dev>

authored by

Shalabh Agarwal
Shalabh Agarwal
and committed by
GitHub
6bbc9a93 756ee78a

+145 -3
+9 -1
CHANGELOG.md
··· 7 7 8 8 ## [Unreleased] 9 9 10 - (No changes yet. Add entries here before cutting a release.) 10 + ### 📚 3rd party library updates 11 + 12 + ### 🛠 Breaking changes 13 + 14 + ### 🎉 New features 15 + - Add Postgres execution result store ([#171](https://github.com/roostorg/osprey/pull/171) by [@serendipty01](https://github.com/serendipty01)) 16 + 17 + ### 🐛 Bug fixes 18 +
+3
osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py
··· 8 8 StoredExecutionResultBigTable, 9 9 StoredExecutionResultGCS, 10 10 StoredExecutionResultMinIO, 11 + StoredExecutionResultPostgres, 11 12 ) 12 13 13 14 ··· 33 34 return StoredExecutionResultMinIO( 34 35 endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=secure, bucket_name=bucket_name 35 36 ) 37 + elif backend_type == ExecutionResultStorageBackendType.POSTGRES: 38 + return StoredExecutionResultPostgres() 36 39 elif backend_type == ExecutionResultStorageBackendType.PLUGIN: 37 40 store = bootstrap_execution_result_store(config=config) 38 41 if store is None:
+5
osprey_worker/src/osprey/worker/lib/storage/__init__.py
··· 26 26 Minio execution result store 27 27 """ 28 28 29 + POSTGRES = auto() 30 + """ 31 + Postgres execution result store 32 + """ 33 + 29 34 PLUGIN = auto() 30 35 """ 31 36 Execution result store that is defined via register_execution_result_store
+52
osprey_worker/src/osprey/worker/lib/storage/pg_stored_execution.py
··· 1 + from typing import Any, Dict, List, Optional 2 + 3 + from sqlalchemy import BigInteger, Column 4 + from sqlalchemy.dialects.postgresql import JSONB 5 + 6 + from .postgres import Model, scoped_session 7 + 8 + 9 + class PgStoredExecutionResult(Model): 10 + """ 11 + `stored_execution_result` stores results as jsonb 12 + """ 13 + 14 + __tablename__ = 'stored_execution_result' 15 + 16 + id = Column(BigInteger, primary_key=True) 17 + payload = Column(JSONB, nullable=False) 18 + 19 + @classmethod 20 + def insert( 21 + cls, 22 + id: int, 23 + payload: Dict[str, Any], 24 + ) -> None: 25 + """ 26 + Adds a single `execution_result` to the database 27 + """ 28 + with scoped_session(commit=True) as session: 29 + execution_result = PgStoredExecutionResult(id=id, payload=payload) 30 + session.add(execution_result) 31 + 32 + @classmethod 33 + def select_one(cls, id: int) -> Optional['PgStoredExecutionResult']: 34 + """ 35 + Gets stored execution result with id if it exists 36 + """ 37 + with scoped_session() as session: 38 + execution_result: Optional['PgStoredExecutionResult'] = ( 39 + session.query(PgStoredExecutionResult).filter(PgStoredExecutionResult.id == id).one_or_none() 40 + ) 41 + return execution_result 42 + 43 + @classmethod 44 + def select_many(cls, ids: List[int]) -> List['PgStoredExecutionResult']: 45 + """ 46 + Gets list of stored execution results with given ids 47 + """ 48 + with scoped_session() as session: 49 + execution_results: List['PgStoredExecutionResult'] = ( 50 + session.query(PgStoredExecutionResult).filter(PgStoredExecutionResult.id.in_(ids)).all() 51 + ) 52 + return execution_results
+1
osprey_worker/src/osprey/worker/lib/storage/postgres.py
··· 53 53 from . import ( # noqa: F401 54 54 bulk_action_task, 55 55 bulk_label_task, 56 + pg_stored_execution, 56 57 queries, 57 58 temporary_ability_token, 58 59 )
+75 -2
osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py
··· 5 5 from abc import ABC, abstractmethod 6 6 from datetime import datetime 7 7 from io import BytesIO 8 - from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence 8 + from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, cast 9 9 10 10 import gevent 11 11 import google.cloud.storage as storage ··· 19 19 from osprey.worker.lib.instruments import metrics 20 20 from osprey.worker.lib.osprey_shared.logging import get_logger 21 21 from osprey.worker.lib.snowflake import Snowflake 22 - from osprey.worker.lib.storage import ExecutionResultStorageBackendType 22 + from osprey.worker.lib.storage import ExecutionResultStorageBackendType, postgres 23 23 from osprey.worker.lib.storage.bigtable import osprey_bigtable 24 + from osprey.worker.lib.storage.pg_stored_execution import PgStoredExecutionResult 24 25 from pydantic.main import BaseModel 25 26 26 27 logger = get_logger() ··· 456 457 457 458 @staticmethod 458 459 def _execution_result_dict_from_minio_data(data: Dict[str, Any]) -> Dict[str, Any]: 460 + execution_result_dict = { 461 + 'id': data['id'], 462 + 'extracted_features': data['extracted_features'], 463 + 'error_traces': data['error_traces'], 464 + 'timestamp': datetime.fromisoformat(data['timestamp']), 465 + 'action_data': None, 466 + } 467 + 468 + action_data = data.get('action_data') 469 + if action_data: 470 + execution_result_dict['action_data'] = action_data 471 + 472 + return execution_result_dict 473 + 474 + 475 + class StoredExecutionResultPostgres(ExecutionResultStore): 476 + def __init__(self) -> None: 477 + postgres.init_from_config('osprey_db') 478 + 479 + def select_one(self, action_id: int) -> Optional[Dict[str, Any]]: 480 + try: 481 + with metrics.timed('pg_stored_execution_result.get_one'): 482 + result = PgStoredExecutionResult.select_one(action_id) 483 + if not result: 484 + metrics.increment( 485 + 'pg_stored_execution_result.select_one.not_found', tags=[f'action_id:{action_id}'] 486 + ) 487 + return None 488 + payload = cast(Dict[str, Any], result.payload) 489 + return StoredExecutionResultPostgres._execution_result_dict_from_pg_data(payload) 490 + 491 + except Exception as e: 492 + logger.error(f'Failed to retrieve execution result from PG for action_id {action_id}: {e}') 493 + return None 494 + 495 + def select_many(self, action_ids: List[int]) -> List[Dict[str, Any]]: 496 + try: 497 + with metrics.timed('pg_stored_execution_result.get_many'): 498 + results = PgStoredExecutionResult.select_many(action_ids) 499 + return [ 500 + StoredExecutionResultPostgres._execution_result_dict_from_pg_data( 501 + cast(Dict[str, Any], result.payload) 502 + ) 503 + for result in results 504 + ] 505 + except Exception as e: 506 + logger.error(f'Failed to retrieve execution results from PG for action_ids {action_ids}: {e}') 507 + return [] 508 + 509 + def insert( 510 + self, 511 + action_id: int, 512 + extracted_features_json: str, 513 + error_traces_json: str, 514 + timestamp: datetime, 515 + action_data_json: str, 516 + ) -> None: 517 + try: 518 + with metrics.timed('pg_stored_execution_result.insert'): 519 + payload: Dict[str, Any] = { 520 + 'id': action_id, 521 + 'extracted_features': extracted_features_json, 522 + 'error_traces': error_traces_json, 523 + 'timestamp': timestamp.isoformat(), 524 + 'action_data': action_data_json, 525 + } 526 + PgStoredExecutionResult.insert(action_id, payload) 527 + except Exception as e: 528 + logger.error(f'Failed to insert execution result into PG for action_id {action_id}: {e}') 529 + 530 + @staticmethod 531 + def _execution_result_dict_from_pg_data(data: Dict[str, Any]) -> Dict[str, Any]: 459 532 execution_result_dict = { 460 533 'id': data['id'], 461 534 'extracted_features': data['extracted_features'],