this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Cleanup

alice 6df6e060 74cbfdc7

+11 -39
+11 -39
app.py
··· 1 1 import os 2 2 import json 3 3 import asyncio 4 - import aiohttp 5 4 import logging 6 5 import multiprocessing 6 + from functools import partial 7 + from typing import Dict 8 + 7 9 from fastapi import FastAPI, HTTPException 8 10 from pydantic import BaseModel 9 - from typing import Dict 10 11 from concurrent.futures import ThreadPoolExecutor 11 - from functools import partial 12 + import aiohttp 12 13 from tenacity import ( 13 14 retry, 14 15 stop_after_attempt, 15 16 wait_random_exponential, 16 17 retry_if_exception_type, 17 18 ) 19 + 18 20 from atmst.cartool import print_all_records 19 21 20 22 ··· 44 46 45 47 46 48 class RetryableError(Exception): 47 - """Exception raised for retryable errors.""" 48 - 49 49 pass 50 50 51 51 52 52 class NonRetryableError(Exception): 53 - """Exception raised for non-retryable errors.""" 54 - 55 53 pass 56 54 57 55 58 - def parse_car(car_file_path: str): 56 + def parse_car(car_file_path: str) -> str: 59 57 return json.dumps(print_all_records(car_file_path, True)) 60 58 61 59 ··· 73 71 async def fetch_car_with_retry( 74 72 session: aiohttp.ClientSession, url: str, headers: Dict[str, str], did: str 75 73 ) -> bytes: 76 - """ 77 - Fetches the CAR file with retry logic using tenacity. 78 - 79 - Args: 80 - session (aiohttp.ClientSession): The HTTP session. 81 - url (str): The URL to fetch. 82 - headers (Dict[str, str]): The request headers. 83 - did (str): The DID being fetched (for logging purposes). 84 - 85 - Returns: 86 - bytes: The fetched CAR file bytes. 87 - 88 - Raises: 89 - NonRetryableError: For non-retryable HTTP status codes. 90 - RetryableError: For retryable HTTP status codes or exceptions. 91 - """ 92 74 try: 93 75 async with session.get(url, headers=headers) as response: 94 76 if response.status == 200: ··· 96 78 if not car_bytes: 97 79 logging.error("Received empty CAR file.") 98 80 raise NonRetryableError("Received empty CAR file.") 99 - return car_bytes # Successful fetch 81 + return car_bytes 100 82 elif response.status in {429, 500, 502, 503, 504}: 101 83 logging.warning( 102 84 f"Received HTTP {response.status} for DID {did}. Retrying..." ··· 117 99 118 100 119 101 @app.post("/fetch") 120 - async def fetch_car_file(request: FetchRequest): 121 - """ 122 - Fetches the CAR file for the given DID and PDS, parses it, saves it, 123 - and returns the extracted data as JSON. 124 - """ 125 - did = request.did 126 - pds = request.pds 127 - 102 + async def fetch_car_file(request: FetchRequest) -> str: 103 + did, pds = request.did, request.pds 128 104 url = f"https://{pds}/xrpc/com.atproto.sync.getRepo?did={did}" 129 105 headers = { 130 106 "Accept": "application/vnd.ipld.car", ··· 133 109 134 110 logging.info(f"Fetching CAR file for DID: {did} from PDS: {pds}") 135 111 136 - # Fetch CAR file with retries using tenacity 137 112 try: 138 113 async with aiohttp.ClientSession( 139 114 timeout=aiohttp.ClientTimeout(total=60) ··· 152 127 logging.error(f"Unexpected error while fetching CAR file for DID {did}: {e}") 153 128 raise HTTPException(status_code=500, detail="Internal server error.") 154 129 155 - car_file_path = None 130 + # Save CAR file 156 131 try: 157 - # Create filename from DID, replacing colons with underscores 158 132 filename = f"{did.replace(':', '_')}.car" 159 133 car_file_path = os.path.join(CAR_FILES_DIR, filename) 160 134 161 - # Delete existing file if it exists 162 135 if os.path.exists(car_file_path): 163 136 os.remove(car_file_path) 164 137 logging.info(f"Deleted existing CAR file: {car_file_path}") 165 138 166 - # Save the new CAR file 167 139 with open(car_file_path, "wb") as f: 168 140 f.write(car_bytes) 169 141 logging.info(f"Saved CAR file to {car_file_path}") ··· 171 143 logging.error(f"Error saving CAR file: {e}") 172 144 raise HTTPException(status_code=500, detail="Error saving CAR file.") 173 145 174 - # Parse CAR file in thread pool 146 + # Parse CAR file 175 147 try: 176 148 loop = asyncio.get_running_loop() 177 149 parsed_data = await loop.run_in_executor(