audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(atproto): always refresh on 401 + widen transient-error retry net (#1332)

authored by

nate nowack and committed by
GitHub
9ecb6a93 608cb733

+212 -47
+92 -44
backend/src/backend/_internal/atproto/client.py
··· 1 1 """low-level ATProto PDS client with OAuth and token refresh.""" 2 2 3 3 import asyncio 4 - import json 5 4 import logging 6 5 from datetime import UTC, datetime, timedelta 7 6 from typing import Any, BinaryIO ··· 25 24 def pds_blob_url(pds_url: str, did: str, cid: str) -> str: 26 25 """construct a public URL to fetch a blob from a PDS.""" 27 26 return f"{pds_url}/xrpc/com.atproto.sync.getBlob?did={did}&cid={cid}" 27 + 28 + 29 + def _describe_exc(e: BaseException) -> str: 30 + """produce a non-empty, type-qualified description of an exception. 31 + 32 + some exception types (notably httpx.RemoteProtocolError with an empty 33 + h11 reason, asyncio.CancelledError, and bare HTTPError subclasses) 34 + stringify to "", which makes downstream error logs and user-visible 35 + messages useless. always surface the exception type; fall back to the 36 + repr if str is empty. 37 + """ 38 + msg = str(e) 39 + if msg: 40 + return f"{type(e).__name__}: {msg}" 41 + return f"{type(e).__name__}: {e!r}" if repr(e) else type(e).__name__ 42 + 43 + 44 + # httpx / httpcore exception classes we treat as transient and retry once 45 + # on before giving up. covers connection drops, read-half failures, 46 + # protocol-level errors (remote closed before fully responding), 47 + # timeouts, and pool exhaustion. 48 + _TRANSIENT_HTTP_ERRORS: tuple[type[BaseException], ...] = ( 49 + httpx.ReadError, 50 + httpx.ConnectError, 51 + httpx.RemoteProtocolError, 52 + httpx.TimeoutException, 53 + httpx.PoolTimeout, 54 + httpcore.ReadError, 55 + httpcore.ConnectError, 56 + httpcore.RemoteProtocolError, 57 + ) 28 58 29 59 30 60 class PayloadTooLargeError(Exception): ··· 218 248 219 249 oauth_session = reconstruct_oauth_session(oauth_data) 220 250 url = f"{oauth_data['pds_url']}/xrpc/{endpoint}" 251 + response = None # defensive: bind before the loop so error paths can read it 221 252 222 253 for attempt in range(2): 223 254 kwargs: dict[str, Any] = {} ··· 233 264 url=url, 234 265 **kwargs, 235 266 ) 236 - except ( 237 - httpx.ReadError, 238 - httpx.ConnectError, 239 - httpcore.ReadError, 240 - httpcore.ConnectError, 241 - ) as e: 267 + except _TRANSIENT_HTTP_ERRORS as e: 242 268 if attempt == 0: 243 269 logger.warning( 244 - f"PDS network error for {auth_session.did}, retrying: {type(e).__name__}: {e}" 270 + f"PDS network error for {auth_session.did}, retrying: {_describe_exc(e)}" 245 271 ) 246 272 await asyncio.sleep(1) 247 273 continue 248 274 raise Exception( 249 - f"PDS request failed after retry: {type(e).__name__}: {e}" 275 + f"PDS request failed after retry: {_describe_exc(e)}" 250 276 ) from e 251 277 252 278 if response.status_code in success_codes: ··· 254 280 return {} 255 281 return response.json() 256 282 257 - # token expired - refresh and retry 283 + # token expired - refresh and retry. previously gated on the response 284 + # body containing "exp" in its message, but under concurrent load the 285 + # PDS can return 401 with an empty body, a body that can't be parsed, 286 + # or a body whose message differs across PDS implementations — in 287 + # which case we'd silently skip the refresh and raise a useless error. 288 + # always attempt refresh on a first-attempt 401; if the refresh itself 289 + # is transient-flaky, retry the refresh once before giving up. 258 290 if response.status_code == 401 and attempt == 0: 291 + logger.info( 292 + f"access token expired or rejected for {auth_session.did}; refreshing" 293 + ) 259 294 try: 260 - error_data = response.json() 261 - if "exp" in error_data.get("message", ""): 262 - logger.info( 263 - f"access token expired for {auth_session.did}, attempting refresh" 264 - ) 265 - oauth_session = await _refresh_session_tokens( 266 - auth_session, oauth_session 267 - ) 268 - continue 269 - except (json.JSONDecodeError, KeyError): 270 - pass 295 + oauth_session = await _refresh_session_tokens( 296 + auth_session, oauth_session 297 + ) 298 + except _TRANSIENT_HTTP_ERRORS as refresh_exc: 299 + logger.warning( 300 + f"token refresh hit transient error, retrying once: {_describe_exc(refresh_exc)}" 301 + ) 302 + await asyncio.sleep(1) 303 + oauth_session = await _refresh_session_tokens( 304 + auth_session, oauth_session 305 + ) 306 + continue 271 307 272 - raise Exception(f"PDS request failed: {response.status_code} {response.text}") 308 + # response should always be bound here (attempt==1 branch), but defensive 309 + # check keeps the error path sane if the loop structure changes. 310 + if response is None: 311 + raise Exception("PDS request failed: no response received") 312 + raise Exception( 313 + f"PDS request failed: {response.status_code} {response.text or '<empty body>'}" 314 + ) 273 315 274 316 275 317 async def upload_blob( ··· 304 346 # read data if it's a file-like object 305 347 blob_data = data if isinstance(data, bytes) else data.read() 306 348 349 + response = None # defensive: bind before the loop 350 + 307 351 for attempt in range(2): 308 352 try: 309 353 response = await get_oauth_client().make_authenticated_request( ··· 313 357 content=blob_data, 314 358 headers={"Content-Type": content_type}, 315 359 ) 316 - except ( 317 - httpx.ReadError, 318 - httpx.ConnectError, 319 - httpcore.ReadError, 320 - httpcore.ConnectError, 321 - ) as e: 360 + except _TRANSIENT_HTTP_ERRORS as e: 322 361 if attempt == 0: 323 362 logger.warning( 324 - f"PDS blob upload network error for {auth_session.did}, retrying: {type(e).__name__}: {e}" 363 + f"PDS blob upload network error for {auth_session.did}, retrying: {_describe_exc(e)}" 325 364 ) 326 365 await asyncio.sleep(1) 327 366 continue 328 367 raise Exception( 329 - f"blob upload failed after retry: {type(e).__name__}: {e}" 368 + f"blob upload failed after retry: {_describe_exc(e)}" 330 369 ) from e 331 370 332 371 if response.status_code == 200: ··· 335 374 # payload too large - PDS rejects due to size limit 336 375 if response.status_code == 413: 337 376 raise PayloadTooLargeError( 338 - f"blob too large for PDS (limit exceeded): {response.text}" 377 + f"blob too large for PDS (limit exceeded): {response.text or '<empty body>'}" 339 378 ) 340 379 341 - # token expired - refresh and retry 380 + # token expired - refresh and retry. unconditional on first-attempt 381 + # 401 (see rationale in make_pds_request). 342 382 if response.status_code == 401 and attempt == 0: 383 + logger.info( 384 + f"access token expired or rejected for {auth_session.did}; refreshing" 385 + ) 343 386 try: 344 - error_data = response.json() 345 - if "exp" in error_data.get("message", ""): 346 - logger.info( 347 - f"access token expired for {auth_session.did}, attempting refresh" 348 - ) 349 - oauth_session = await _refresh_session_tokens( 350 - auth_session, oauth_session 351 - ) 352 - continue 353 - except (json.JSONDecodeError, KeyError): 354 - pass 387 + oauth_session = await _refresh_session_tokens( 388 + auth_session, oauth_session 389 + ) 390 + except _TRANSIENT_HTTP_ERRORS as refresh_exc: 391 + logger.warning( 392 + f"token refresh hit transient error, retrying once: {_describe_exc(refresh_exc)}" 393 + ) 394 + await asyncio.sleep(1) 395 + oauth_session = await _refresh_session_tokens( 396 + auth_session, oauth_session 397 + ) 398 + continue 355 399 356 - raise Exception(f"blob upload failed: {response.status_code} {response.text}") 400 + if response is None: 401 + raise Exception("blob upload failed: no response received") 402 + raise Exception( 403 + f"blob upload failed: {response.status_code} {response.text or '<empty body>'}" 404 + ) 357 405 358 406 359 407 def parse_at_uri(uri: str) -> tuple[str, str, str]:
+7 -2
backend/src/backend/api/tracks/uploads.py
··· 771 771 raise ValueError("PDS returned no record data") 772 772 _, atproto_cid = atproto_result 773 773 except Exception as e: 774 - logger.error("ATProto sync failed for upload %s: %s", ctx.upload_id, e) 774 + # always include the exception type in the surfaced message — some 775 + # exception classes (notably httpx.RemoteProtocolError with an empty 776 + # h11 reason) stringify to "", which makes downstream error logs and 777 + # the failed-job error field useless. 778 + err_detail = f"{type(e).__name__}: {e!s}" if str(e) else type(e).__name__ 779 + logger.error("ATProto sync failed for upload %s: %s", ctx.upload_id, err_detail) 775 780 # only delete the row if it's still pending — on ambiguous failures 776 781 # (timeouts, connection drops) Jetstream may have already finalized it 777 782 deleted_pending = False ··· 797 802 await storage.delete(image_id) 798 803 # else: Jetstream finalized the row — media belongs to the published track 799 804 800 - raise UploadPhaseError(f"failed to sync track to ATProto: {e}") from e 805 + raise UploadPhaseError(f"failed to sync track to ATProto: {err_detail}") from e 801 806 802 807 # step 3: atomic CAS update pending → published + deferred album linkage 803 808 async with db_session() as db:
+112
backend/tests/test_pds_network_retry.py
··· 182 182 await upload_blob(mock_auth_session, b"huge-audio", "audio/mpeg") 183 183 184 184 assert mock_client.make_authenticated_request.call_count == 1 185 + 186 + 187 + class TestMakePdsRequestAuthRefresh: 188 + """make_pds_request refreshes and retries on 401 regardless of body shape. 189 + 190 + regression for the 2026-04-24 concurrent-upload flake: under load the PDS 191 + can return 401 with an empty body or a body whose message doesn't contain 192 + 'exp'. the previous implementation silently skipped refresh in those 193 + cases, raising an error with an empty/cryptic message. the refresh path 194 + is now unconditional on first-attempt 401s. 195 + """ 196 + 197 + async def test_refreshes_on_401_with_empty_body( 198 + self, mock_auth_session: AuthSession 199 + ) -> None: 200 + unauthorized_empty_body = _mock_response(401, json_data={}) 201 + unauthorized_empty_body.text = "" 202 + ok_response = _mock_response(200, {"uri": "at://test"}) 203 + mock_client = AsyncMock() 204 + mock_client.make_authenticated_request = AsyncMock( 205 + side_effect=[unauthorized_empty_body, ok_response] 206 + ) 207 + 208 + with ( 209 + patch( 210 + "backend._internal.atproto.client.get_oauth_client", 211 + return_value=mock_client, 212 + ), 213 + patch( 214 + "backend._internal.atproto.client._refresh_session_tokens", 215 + new_callable=AsyncMock, 216 + return_value=mock_auth_session.oauth_session, 217 + ) as mock_refresh, 218 + ): 219 + # _refresh_session_tokens returns the oauth_session-equivalent, 220 + # so spoof it with something reconstruct_oauth_session-compatible 221 + mock_refresh.return_value = MagicMock() 222 + result = await make_pds_request( 223 + mock_auth_session, 224 + "POST", 225 + "com.atproto.repo.createRecord", 226 + ) 227 + 228 + assert result == {"uri": "at://test"} 229 + assert mock_client.make_authenticated_request.call_count == 2 230 + mock_refresh.assert_awaited_once() 231 + 232 + async def test_refreshes_on_401_with_non_exp_message( 233 + self, mock_auth_session: AuthSession 234 + ) -> None: 235 + # PDSes vary on their 401 body — some return "invalid_token", some 236 + # omit the message, some say "unauthorized". refresh must fire for 237 + # all of them, not just when 'exp' happens to be in the string. 238 + unauthorized = _mock_response( 239 + 401, json_data={"error": "InvalidToken", "message": "unauthorized"} 240 + ) 241 + ok_response = _mock_response(200, {"uri": "at://test"}) 242 + mock_client = AsyncMock() 243 + mock_client.make_authenticated_request = AsyncMock( 244 + side_effect=[unauthorized, ok_response] 245 + ) 246 + 247 + with ( 248 + patch( 249 + "backend._internal.atproto.client.get_oauth_client", 250 + return_value=mock_client, 251 + ), 252 + patch( 253 + "backend._internal.atproto.client._refresh_session_tokens", 254 + new_callable=AsyncMock, 255 + ) as mock_refresh, 256 + ): 257 + mock_refresh.return_value = MagicMock() 258 + result = await make_pds_request( 259 + mock_auth_session, 260 + "POST", 261 + "com.atproto.repo.createRecord", 262 + ) 263 + 264 + assert result == {"uri": "at://test"} 265 + mock_refresh.assert_awaited_once() 266 + 267 + 268 + class TestMakePdsRequestTransientErrors: 269 + """make_pds_request retries the newly-covered transient httpx errors.""" 270 + 271 + async def test_retries_on_remote_protocol_error( 272 + self, mock_auth_session: AuthSession 273 + ) -> None: 274 + # httpx.RemoteProtocolError stringifies to "" when the h11 reason is 275 + # blank — the exact class that surfaced the silent failure today. 276 + ok_response = _mock_response(200, {"uri": "at://test"}) 277 + mock_client = AsyncMock() 278 + mock_client.make_authenticated_request = AsyncMock( 279 + side_effect=[ 280 + httpx.RemoteProtocolError(""), 281 + ok_response, 282 + ] 283 + ) 284 + 285 + with patch( 286 + "backend._internal.atproto.client.get_oauth_client", 287 + return_value=mock_client, 288 + ): 289 + result = await make_pds_request( 290 + mock_auth_session, 291 + "POST", 292 + "com.atproto.repo.createRecord", 293 + ) 294 + 295 + assert result == {"uri": "at://test"} 296 + assert mock_client.make_authenticated_request.call_count == 2
+1 -1
loq.toml
··· 40 40 41 41 [[rules]] 42 42 path = "backend/src/backend/api/tracks/uploads.py" 43 - max_lines = 1333 43 + max_lines = 1338 44 44 45 45 [[rules]] 46 46 path = "backend/src/backend/config.py"