audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #1107 from zzstoatzz/fix/jetstream-otel-and-upload-retry

fix: jetstream crash loop + PDS upload retry for network errors

authored by

nate nowack and committed by
GitHub
4234764c 671d2bb2

+232 -15
+47 -13
backend/src/backend/_internal/atproto/client.py
··· 6 6 from datetime import UTC, datetime, timedelta 7 7 from typing import Any, BinaryIO 8 8 9 + import httpcore 10 + import httpx 9 11 from atproto import AtUri 10 12 from atproto_oauth.models import OAuthSession 11 13 from cachetools import LRUCache ··· 224 226 if params: 225 227 kwargs["params"] = params 226 228 227 - response = await get_oauth_client().make_authenticated_request( 228 - session=oauth_session, 229 - method=method, 230 - url=url, 231 - **kwargs, 232 - ) 229 + try: 230 + response = await get_oauth_client().make_authenticated_request( 231 + session=oauth_session, 232 + method=method, 233 + url=url, 234 + **kwargs, 235 + ) 236 + except ( 237 + httpx.ReadError, 238 + httpx.ConnectError, 239 + httpcore.ReadError, 240 + httpcore.ConnectError, 241 + ) as e: 242 + if attempt == 0: 243 + logger.warning( 244 + f"PDS network error for {auth_session.did}, retrying: {type(e).__name__}: {e}" 245 + ) 246 + await asyncio.sleep(1) 247 + continue 248 + raise Exception( 249 + f"PDS request failed after retry: {type(e).__name__}: {e}" 250 + ) from e 233 251 234 252 if response.status_code in success_codes: 235 253 if response.status_code == 204: ··· 287 305 blob_data = data if isinstance(data, bytes) else data.read() 288 306 289 307 for attempt in range(2): 290 - response = await get_oauth_client().make_authenticated_request( 291 - session=oauth_session, 292 - method="POST", 293 - url=url, 294 - content=blob_data, 295 - headers={"Content-Type": content_type}, 296 - ) 308 + try: 309 + response = await get_oauth_client().make_authenticated_request( 310 + session=oauth_session, 311 + method="POST", 312 + url=url, 313 + content=blob_data, 314 + headers={"Content-Type": content_type}, 315 + ) 316 + except ( 317 + httpx.ReadError, 318 + httpx.ConnectError, 319 + httpcore.ReadError, 320 + httpcore.ConnectError, 321 + ) as e: 322 + if attempt == 0: 323 + logger.warning( 324 + f"PDS blob upload network error for {auth_session.did}, retrying: {type(e).__name__}: {e}" 325 + ) 326 + await asyncio.sleep(1) 327 + continue 328 + raise Exception( 329 + f"blob upload failed after retry: {type(e).__name__}: {e}" 330 + ) from e 297 331 298 332 if response.status_code == 200: 299 333 return response.json()["blob"]
+1 -2
backend/src/backend/_internal/jetstream.py
··· 201 201 # profile updates are a special case (nested collection) 202 202 if collection.endswith(".actor.profile") and operation == "update": 203 203 await docket.add(ingest_profile_update)(did=did, record=record or {}) 204 - logfire.info( 204 + logfire.debug( 205 205 "jetstream dispatched profile.update", 206 206 did=did, 207 - _level="debug", 208 207 ) 209 208 return 210 209
+184
backend/tests/test_pds_network_retry.py
··· 1 + """tests for PDS network error retry in make_pds_request and upload_blob.""" 2 + 3 + from unittest.mock import AsyncMock, MagicMock, patch 4 + 5 + import httpx 6 + import pytest 7 + from cryptography.hazmat.backends import default_backend 8 + from cryptography.hazmat.primitives import serialization 9 + from cryptography.hazmat.primitives.asymmetric import ec 10 + 11 + from backend._internal import Session as AuthSession 12 + from backend._internal.atproto.client import make_pds_request, upload_blob 13 + 14 + 15 + @pytest.fixture 16 + def mock_auth_session() -> AuthSession: 17 + """create mock auth session with valid OAuth data.""" 18 + private_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) 19 + dpop_key_pem = private_key.private_bytes( 20 + encoding=serialization.Encoding.PEM, 21 + format=serialization.PrivateFormat.PKCS8, 22 + encryption_algorithm=serialization.NoEncryption(), 23 + ).decode("utf-8") 24 + 25 + return AuthSession( 26 + session_id="test-session", 27 + did="did:plc:testgoose", 28 + handle="goose.art", 29 + oauth_session={ 30 + "did": "did:plc:testgoose", 31 + "handle": "goose.art", 32 + "pds_url": "https://selfhosted.social", 33 + "authserver_iss": "https://selfhosted.social", 34 + "scope": "atproto transition:generic", 35 + "access_token": "test-token", 36 + "refresh_token": "test-refresh", 37 + "dpop_private_key_pem": dpop_key_pem, 38 + "dpop_authserver_nonce": "nonce1", 39 + "dpop_pds_nonce": "nonce2", 40 + }, 41 + ) 42 + 43 + 44 + def _mock_response(status_code: int = 200, json_data: dict | None = None) -> MagicMock: 45 + resp = MagicMock() 46 + resp.status_code = status_code 47 + resp.json.return_value = json_data or {} 48 + resp.text = "ok" 49 + return resp 50 + 51 + 52 + class TestMakePdsRequestNetworkRetry: 53 + """make_pds_request retries on transient network errors.""" 54 + 55 + async def test_retries_on_read_error_then_succeeds( 56 + self, mock_auth_session: AuthSession 57 + ) -> None: 58 + ok_response = _mock_response( 59 + 200, {"uri": "at://did:plc:testgoose/fm.plyr.track/abc"} 60 + ) 61 + mock_client = AsyncMock() 62 + mock_client.make_authenticated_request = AsyncMock( 63 + side_effect=[httpx.ReadError(""), ok_response] 64 + ) 65 + 66 + with patch( 67 + "backend._internal.atproto.client.get_oauth_client", 68 + return_value=mock_client, 69 + ): 70 + result = await make_pds_request( 71 + mock_auth_session, 72 + "POST", 73 + "com.atproto.repo.createRecord", 74 + payload={"repo": "did:plc:testgoose"}, 75 + ) 76 + 77 + assert result == {"uri": "at://did:plc:testgoose/fm.plyr.track/abc"} 78 + assert mock_client.make_authenticated_request.call_count == 2 79 + 80 + async def test_raises_after_two_read_errors( 81 + self, mock_auth_session: AuthSession 82 + ) -> None: 83 + mock_client = AsyncMock() 84 + mock_client.make_authenticated_request = AsyncMock( 85 + side_effect=httpx.ReadError("") 86 + ) 87 + 88 + with ( 89 + patch( 90 + "backend._internal.atproto.client.get_oauth_client", 91 + return_value=mock_client, 92 + ), 93 + pytest.raises(Exception, match="PDS request failed after retry"), 94 + ): 95 + await make_pds_request( 96 + mock_auth_session, 97 + "POST", 98 + "com.atproto.repo.createRecord", 99 + ) 100 + 101 + async def test_retries_on_connect_error( 102 + self, mock_auth_session: AuthSession 103 + ) -> None: 104 + ok_response = _mock_response(200, {"uri": "at://test"}) 105 + mock_client = AsyncMock() 106 + mock_client.make_authenticated_request = AsyncMock( 107 + side_effect=[httpx.ConnectError("connection reset"), ok_response] 108 + ) 109 + 110 + with patch( 111 + "backend._internal.atproto.client.get_oauth_client", 112 + return_value=mock_client, 113 + ): 114 + result = await make_pds_request( 115 + mock_auth_session, 116 + "GET", 117 + "com.atproto.repo.getRecord", 118 + ) 119 + 120 + assert result == {"uri": "at://test"} 121 + 122 + 123 + class TestUploadBlobNetworkRetry: 124 + """upload_blob retries on transient network errors.""" 125 + 126 + async def test_retries_on_read_error_then_succeeds( 127 + self, mock_auth_session: AuthSession 128 + ) -> None: 129 + blob_ref = { 130 + "$type": "blob", 131 + "ref": {"$link": "bafytest"}, 132 + "mimeType": "audio/mpeg", 133 + "size": 1024, 134 + } 135 + ok_response = _mock_response(200, {"blob": blob_ref}) 136 + mock_client = AsyncMock() 137 + mock_client.make_authenticated_request = AsyncMock( 138 + side_effect=[httpx.ReadError(""), ok_response] 139 + ) 140 + 141 + with patch( 142 + "backend._internal.atproto.client.get_oauth_client", 143 + return_value=mock_client, 144 + ): 145 + result = await upload_blob(mock_auth_session, b"fake-audio", "audio/mpeg") 146 + 147 + assert result == blob_ref 148 + assert mock_client.make_authenticated_request.call_count == 2 149 + 150 + async def test_raises_after_two_read_errors( 151 + self, mock_auth_session: AuthSession 152 + ) -> None: 153 + mock_client = AsyncMock() 154 + mock_client.make_authenticated_request = AsyncMock( 155 + side_effect=httpx.ReadError("") 156 + ) 157 + 158 + with ( 159 + patch( 160 + "backend._internal.atproto.client.get_oauth_client", 161 + return_value=mock_client, 162 + ), 163 + pytest.raises(Exception, match="blob upload failed after retry"), 164 + ): 165 + await upload_blob(mock_auth_session, b"fake-audio", "audio/mpeg") 166 + 167 + async def test_does_not_retry_payload_too_large( 168 + self, mock_auth_session: AuthSession 169 + ) -> None: 170 + response_413 = _mock_response(413) 171 + response_413.text = "payload too large" 172 + mock_client = AsyncMock() 173 + mock_client.make_authenticated_request = AsyncMock(return_value=response_413) 174 + 175 + with ( 176 + patch( 177 + "backend._internal.atproto.client.get_oauth_client", 178 + return_value=mock_client, 179 + ), 180 + pytest.raises(Exception, match="blob too large"), 181 + ): 182 + await upload_blob(mock_auth_session, b"huge-audio", "audio/mpeg") 183 + 184 + assert mock_client.make_authenticated_request.call_count == 1