···11+# Byte-compiled / optimized / DLL files
22+__pycache__/
33+*.py[cod]
44+*$py.class
55+66+# C extensions
77+*.so
88+99+# Distribution / packaging
1010+.Python
1111+build/
1212+develop-eggs/
1313+dist/
1414+downloads/
1515+eggs/
1616+.eggs/
1717+lib/
1818+lib64/
1919+parts/
2020+sdist/
2121+var/
2222+wheels/
2323+share/python-wheels/
2424+*.egg-info/
2525+.installed.cfg
2626+*.egg
2727+MANIFEST
2828+2929+# PyInstaller
3030+# Usually these files are written by a python script from a template
3131+# before PyInstaller builds the exe, so as to inject date/other infos into it.
3232+*.manifest
3333+*.spec
3434+3535+# Installer logs
3636+pip-log.txt
3737+pip-delete-this-directory.txt
3838+3939+# Unit test / coverage reports
4040+htmlcov/
4141+.tox/
4242+.nox/
4343+.coverage
4444+.coverage.*
4545+.cache
4646+nosetests.xml
4747+coverage.xml
4848+*.cover
4949+*.py,cover
5050+.hypothesis/
5151+.pytest_cache/
5252+cover/
5353+5454+# Translations
5555+*.mo
5656+*.pot
5757+5858+# Django stuff:
5959+*.log
6060+local_settings.py
6161+db.sqlite3
6262+db.sqlite3-journal
6363+6464+# Flask stuff:
6565+instance/
6666+.webassets-cache
6767+6868+# Scrapy stuff:
6969+.scrapy
7070+7171+# Sphinx documentation
7272+docs/_build/
7373+7474+# PyBuilder
7575+.pybuilder/
7676+target/
7777+7878+# Jupyter Notebook
7979+.ipynb_checkpoints
8080+8181+# IPython
8282+profile_default/
8383+ipython_config.py
8484+8585+# pyenv
8686+# For a library or package, you might want to ignore these files since the code is
8787+# intended to run in multiple environments; otherwise, check them in:
8888+# .python-version
8989+9090+# pipenv
9191+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
9292+# However, in case of collaboration, if having platform-specific dependencies or dependencies
9393+# having no cross-platform support, pipenv may install dependencies that don't work, or not
9494+# install all needed dependencies.
9595+#Pipfile.lock
9696+9797+# poetry
9898+# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
9999+# This is especially recommended for binary packages to ensure reproducibility, and is more
100100+# commonly ignored for libraries.
101101+# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
102102+#poetry.lock
103103+104104+# pdm
105105+# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
106106+#pdm.lock
107107+# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
108108+# in version control.
109109+# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
110110+.pdm.toml
111111+.pdm-python
112112+.pdm-build/
113113+114114+# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
115115+__pypackages__/
116116+117117+# Celery stuff
118118+celerybeat-schedule
119119+celerybeat.pid
120120+121121+# SageMath parsed files
122122+*.sage.py
123123+124124+# Environments
125125+.env
126126+.venv
127127+env/
128128+venv/
129129+ENV/
130130+env.bak/
131131+venv.bak/
132132+133133+# Spyder project settings
134134+.spyderproject
135135+.spyproject
136136+137137+# Rope project settings
138138+.ropeproject
139139+140140+# mkdocs documentation
141141+/site
142142+143143+# mypy
144144+.mypy_cache/
145145+.dmypy.json
146146+dmypy.json
147147+148148+# Pyre type checker
149149+.pyre/
150150+151151+# pytype static type analyzer
152152+.pytype/
153153+154154+# Cython debug symbols
155155+cython_debug/
156156+157157+# PyCharm
158158+# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
159159+# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
160160+# and can be added to the global gitignore or merged into this file. For a more nuclear
161161+# option (not recommended) you can uncomment the following to ignore the entire idea folder.
162162+#.idea/
163163+164164+car_files
+185
app.py
···11+import os
22+import json
33+import asyncio
44+import aiohttp
55+import logging
66+import multiprocessing
77+from fastapi import FastAPI, HTTPException
88+from pydantic import BaseModel
99+from typing import Dict
1010+from concurrent.futures import ThreadPoolExecutor
1111+from functools import partial
1212+from tenacity import (
1313+ retry,
1414+ stop_after_attempt,
1515+ wait_random_exponential,
1616+ retry_if_exception_type,
1717+)
1818+from atmst.cartool import print_all_records
# Logging: INFO and above, timestamped, sent to car_service.log and the console.
logging.basicConfig(
    handlers=[logging.FileHandler("car_service.log"), logging.StreamHandler()],
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

app = FastAPI(title="CAR File Fetcher and Parser")

# Directory that downloaded CAR files are written to; created at import time.
CAR_FILES_DIR = "car_files"
os.makedirs(CAR_FILES_DIR, exist_ok=True)

# Retry tuning for the CAR download (attempt cap and backoff seed).
# NOTE(review): BACKOFF_FACTOR is not referenced in the visible code -- confirm
# whether it is still needed.
MAX_RETRIES = 10
INITIAL_BACKOFF = 1
BACKOFF_FACTOR = 2

# Thread pool used to run CAR parsing off the event loop; one worker per CPU.
executor = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
class FetchRequest(BaseModel):
    """Request body accepted by the ``/fetch`` endpoint."""

    # DID of the repository whose CAR file should be fetched.
    did: str
    # Host of the PDS to fetch from; it is placed after "https://" in the URL.
    pds: str
class RetryableError(Exception):
    """Signals a fetch failure that is worth attempting again."""
class NonRetryableError(Exception):
    """Signals a fetch failure that retrying will not fix."""
def parse_car(car_file_path: str):
    """
    Parse the CAR file at ``car_file_path`` and return its records as JSON text.

    Args:
        car_file_path (str): Path of the CAR file on disk.

    Returns:
        str: ``json.dumps`` of whatever ``print_all_records`` returns.
    """
    # The second positional argument is passed as True; presumably it asks
    # atmst to return the records rather than print them -- confirm upstream.
    records = print_all_records(car_file_path, True)
    serialized = json.dumps(records)
    return serialized
@retry(
    reraise=True,
    stop=stop_after_attempt(MAX_RETRIES),
    wait=wait_random_exponential(
        multiplier=INITIAL_BACKOFF, min=INITIAL_BACKOFF, max=60
    ),
    retry=(
        retry_if_exception_type(RetryableError)
        | retry_if_exception_type(asyncio.TimeoutError)
    ),
)
async def fetch_car_with_retry(
    session: aiohttp.ClientSession, url: str, headers: Dict[str, str], did: str
) -> bytes:
    """
    Fetches the CAR file with retry logic using tenacity.

    A call is retried (up to MAX_RETRIES attempts, with randomized exponential
    backoff capped at 60 seconds) whenever it raises RetryableError or
    asyncio.TimeoutError. Because ``reraise=True`` is set, the last exception
    is re-raised as-is once the attempts are exhausted.

    Args:
        session (aiohttp.ClientSession): The HTTP session.
        url (str): The URL to fetch.
        headers (Dict[str, str]): The request headers.
        did (str): The DID being fetched (for logging purposes).

    Returns:
        bytes: The fetched CAR file bytes (never empty).

    Raises:
        NonRetryableError: For an empty 200 response, a malformed URL, or any
            HTTP status outside the retryable set.
        RetryableError: For HTTP 429/500/502/503/504, timeouts, and other
            aiohttp client errors.
    """
    try:
        async with session.get(url, headers=headers) as response:
            status = response.status

            if status == 200:
                car_bytes = await response.read()
                if not car_bytes:
                    logging.error("Received empty CAR file.")
                    raise NonRetryableError("Received empty CAR file.")
                return car_bytes  # Successful fetch

            if status in {429, 500, 502, 503, 504}:
                logging.warning(
                    f"Received HTTP {status} for DID {did}. Retrying..."
                )
                raise RetryableError(f"HTTP {status} error.")

            logging.error(f"Failed to fetch CAR file: HTTP {status}")
            raise NonRetryableError(f"HTTP {status} error.")
    except aiohttp.InvalidURL as e:
        # InvalidURL is a ClientError subclass, so it must be handled before
        # the generic handlers below: a malformed URL can never succeed and
        # retrying it would only stall the caller through the full backoff.
        logging.error(f"Invalid URL for DID {did}: {e}")
        raise NonRetryableError(f"Invalid URL: {e}") from e
    except aiohttp.ClientResponseError as e:
        logging.error(f"Client response error for DID {did}: {e}")
        raise RetryableError(str(e)) from e
    except asyncio.TimeoutError as e:
        logging.warning(f"Timeout while fetching DID {did}. Retrying...")
        raise RetryableError("Timeout error.") from e
    except aiohttp.ClientError as e:
        logging.error(f"Client error for DID {did}: {e}")
        raise RetryableError(str(e)) from e
def _car_filename_for_did(did: str) -> str:
    """Return a flat, path-safe ``.car`` file name derived from ``did``."""
    # Every character outside [A-Za-z0-9._%-] becomes "_". This maps ":" to
    # "_" and also neutralises path separators, so a caller-supplied DID
    # cannot escape CAR_FILES_DIR.
    safe_stem = "".join(
        ch if (ch.isascii() and ch.isalnum()) or ch in "._%-" else "_"
        for ch in did
    )
    return f"{safe_stem}.car"


@app.post("/fetch")
async def fetch_car_file(request: FetchRequest):
    """
    Fetches the CAR file for the given DID and PDS, parses it, saves it,
    and returns the extracted data as JSON.

    Args:
        request (FetchRequest): Body carrying the ``did`` and ``pds`` fields.

    Returns:
        str: The JSON text produced by ``parse_car``.

    Raises:
        HTTPException: 502 when the upstream fetch fails (retryable or not),
            500 for unexpected fetch errors, save errors, or parse errors.
    """
    from urllib.parse import quote

    did = request.did
    pds = request.pds

    # Percent-encode the DID so characters such as "&", "#" or "%" cannot alter
    # the query string; ":" is kept literal so ordinary DIDs yield the same URL.
    # NOTE(review): pds is caller-supplied and used as the host, so this
    # endpoint can be pointed at arbitrary hosts -- consider an allow-list.
    url = f"https://{pds}/xrpc/com.atproto.sync.getRepo?did={quote(did, safe=':')}"
    headers = {
        "Accept": "application/vnd.ipld.car",
        "User-Agent": "emojistats-backfiller/0.0.1",
    }

    logging.info(f"Fetching CAR file for DID: {did} from PDS: {pds}")

    # Fetch CAR file with retries using tenacity
    try:
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=60)
        ) as session:
            car_bytes = await fetch_car_with_retry(session, url, headers, did)
    except RetryableError as e:
        logging.error(f"Retryable error fetching CAR file for DID {did}: {e}")
        raise HTTPException(
            status_code=502,
            detail=f"Failed to fetch CAR file after {MAX_RETRIES} attempts.",
        ) from e
    except NonRetryableError as e:
        logging.error(f"Non-retryable error fetching CAR file for DID {did}: {e}")
        raise HTTPException(status_code=502, detail=str(e)) from e
    except Exception as e:
        logging.error(f"Unexpected error while fetching CAR file for DID {did}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error.") from e

    try:
        # Build the on-disk name from the DID (colons and any unsafe
        # characters become underscores).
        car_file_path = os.path.join(CAR_FILES_DIR, _car_filename_for_did(did))

        # Delete existing file if it exists
        if os.path.exists(car_file_path):
            os.remove(car_file_path)
            logging.info(f"Deleted existing CAR file: {car_file_path}")

        # Save the new CAR file
        with open(car_file_path, "wb") as f:
            f.write(car_bytes)
        logging.info(f"Saved CAR file to {car_file_path}")
    except Exception as e:
        logging.error(f"Error saving CAR file: {e}")
        raise HTTPException(status_code=500, detail="Error saving CAR file.") from e

    # Parse CAR file in thread pool so the event loop is not blocked
    try:
        loop = asyncio.get_running_loop()
        parsed_data = await loop.run_in_executor(
            executor, partial(parse_car, car_file_path)
        )
        logging.info(f"Parsed CAR file for DID: {did}")
    except Exception as e:
        logging.error(f"Error parsing CAR file: {e}")
        raise HTTPException(status_code=500, detail="Error parsing CAR file.") from e

    # NOTE(review): parsed_data is already a JSON string, so the framework will
    # presumably encode it again as a JSON string literal -- confirm this is
    # what clients expect before changing it.
    return parsed_data