audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(uploads): per-DID concurrency cap + exponential backoff on PDS calls (#1333)

authored by

nate nowack and committed by
GitHub
9ed734c3 9ecb6a93

+100 -31
+69 -25
backend/src/backend/_internal/atproto/client.py
··· 41 41 return f"{type(e).__name__}: {e!r}" if repr(e) else type(e).__name__ 42 42 43 43 44 - # httpx / httpcore exception classes we treat as transient and retry once 45 - # on before giving up. covers connection drops, read-half failures, 44 + # httpx / httpcore exception classes we treat as transient and retry on 45 + # before giving up. covers connection drops, read-half failures, 46 46 # protocol-level errors (remote closed before fully responding), 47 47 # timeouts, and pool exhaustion. 48 48 _TRANSIENT_HTTP_ERRORS: tuple[type[BaseException], ...] = ( ··· 55 55 httpcore.ConnectError, 56 56 httpcore.RemoteProtocolError, 57 57 ) 58 + 59 + # max attempts for a single PDS request (including the initial try). 60 + # backoff schedule between attempts: element N is the sleep BEFORE 61 + # attempt N+1 runs. 4 attempts with 1s/2s/4s gives exponential-ish 62 + # backoff that totals ~7s of deliberate sleep across all retries, 63 + # on top of whatever time the underlying connect/read took. 64 + _PDS_MAX_ATTEMPTS = 4 65 + _PDS_BACKOFF_SCHEDULE: tuple[float, ...] = (1.0, 2.0, 4.0) 66 + 67 + 68 + def _backoff_for_attempt(attempt: int) -> float: 69 + """seconds to sleep AFTER a failed attempt of index `attempt`.""" 70 + return _PDS_BACKOFF_SCHEDULE[min(attempt, len(_PDS_BACKOFF_SCHEDULE) - 1)] 58 71 59 72 60 73 class PayloadTooLargeError(Exception): ··· 249 262 oauth_session = reconstruct_oauth_session(oauth_data) 250 263 url = f"{oauth_data['pds_url']}/xrpc/{endpoint}" 251 264 response = None # defensive: bind before the loop so error paths can read it 265 + has_refreshed = False 252 266 253 - for attempt in range(2): 267 + for attempt in range(_PDS_MAX_ATTEMPTS): 254 268 kwargs: dict[str, Any] = {} 255 269 if payload: 256 270 kwargs["json"] = payload ··· 265 279 **kwargs, 266 280 ) 267 281 except _TRANSIENT_HTTP_ERRORS as e: 268 - if attempt == 0: 282 + if attempt < _PDS_MAX_ATTEMPTS - 1: 283 + backoff = _backoff_for_attempt(attempt) 269 284 logger.warning( 270 - f"PDS network error for {auth_session.did}, retrying: {_describe_exc(e)}" 285 + f"PDS network error for {auth_session.did} on attempt " 286 + f"{attempt + 1}/{_PDS_MAX_ATTEMPTS}, backing off {backoff}s: " 287 + f"{_describe_exc(e)}" 271 288 ) 272 - await asyncio.sleep(1) 289 + await asyncio.sleep(backoff) 273 290 continue 274 291 raise Exception( 275 - f"PDS request failed after retry: {_describe_exc(e)}" 292 + f"PDS request failed after {_PDS_MAX_ATTEMPTS} attempts: {_describe_exc(e)}" 276 293 ) from e 277 294 278 295 if response.status_code in success_codes: ··· 280 297 return {} 281 298 return response.json() 282 299 283 - # token expired - refresh and retry. previously gated on the response 284 - # body containing "exp" in its message, but under concurrent load the 285 - # PDS can return 401 with an empty body, a body that can't be parsed, 286 - # or a body whose message differs across PDS implementations — in 287 - # which case we'd silently skip the refresh and raise a useless error. 288 - # always attempt refresh on a first-attempt 401; if the refresh itself 289 - # is transient-flaky, retry the refresh once before giving up. 290 - if response.status_code == 401 and attempt == 0: 300 + # 401: token expired or rejected. always attempt refresh on the first 301 + # 401 we see (under concurrent load PDSes return 401 bodies with 302 + # varying shapes, including empty — gating on "exp" in the message 303 + # silently skipped refresh before). if the refresh itself is flaky, 304 + # retry it once before giving up. 305 + if response.status_code == 401 and not has_refreshed: 306 + has_refreshed = True 291 307 logger.info( 292 308 f"access token expired or rejected for {auth_session.did}; refreshing" 293 309 ) ··· 305 321 ) 306 322 continue 307 323 308 - # response should always be bound here (attempt==1 branch), but defensive 309 - # check keeps the error path sane if the loop structure changes. 324 + # 5xx: upstream is failing, worth a backoff + retry 325 + if 500 <= response.status_code < 600 and attempt < _PDS_MAX_ATTEMPTS - 1: 326 + backoff = _backoff_for_attempt(attempt) 327 + logger.warning( 328 + f"PDS {response.status_code} for {auth_session.did} on attempt " 329 + f"{attempt + 1}/{_PDS_MAX_ATTEMPTS}, backing off {backoff}s" 330 + ) 331 + await asyncio.sleep(backoff) 332 + continue 333 + 334 + # 4xx other than 401, or 5xx on the last attempt, or a repeat 401 335 + # post-refresh: stop retrying and surface the error. 336 + break 337 + 310 338 if response is None: 311 339 raise Exception("PDS request failed: no response received") 312 340 raise Exception( ··· 347 375 blob_data = data if isinstance(data, bytes) else data.read() 348 376 349 377 response = None # defensive: bind before the loop 378 + has_refreshed = False 350 379 351 - for attempt in range(2): 380 + for attempt in range(_PDS_MAX_ATTEMPTS): 352 381 try: 353 382 response = await get_oauth_client().make_authenticated_request( 354 383 session=oauth_session, ··· 358 387 headers={"Content-Type": content_type}, 359 388 ) 360 389 except _TRANSIENT_HTTP_ERRORS as e: 361 - if attempt == 0: 390 + if attempt < _PDS_MAX_ATTEMPTS - 1: 391 + backoff = _backoff_for_attempt(attempt) 362 392 logger.warning( 363 - f"PDS blob upload network error for {auth_session.did}, retrying: {_describe_exc(e)}" 393 + f"PDS blob upload network error for {auth_session.did} on " 394 + f"attempt {attempt + 1}/{_PDS_MAX_ATTEMPTS}, backing off " 395 + f"{backoff}s: {_describe_exc(e)}" 364 396 ) 365 - await asyncio.sleep(1) 397 + await asyncio.sleep(backoff) 366 398 continue 367 399 raise Exception( 368 - f"blob upload failed after retry: {_describe_exc(e)}" 400 + f"blob upload failed after {_PDS_MAX_ATTEMPTS} attempts: {_describe_exc(e)}" 369 401 ) from e 370 402 371 403 if response.status_code == 200: ··· 377 409 f"blob too large for PDS (limit exceeded): {response.text or '<empty body>'}" 378 410 ) 379 411 380 - # token expired - refresh and retry. unconditional on first-attempt 381 - # 401 (see rationale in make_pds_request). 382 - if response.status_code == 401 and attempt == 0: 412 + # 401: refresh once, then retry (same rationale as make_pds_request). 413 + if response.status_code == 401 and not has_refreshed: 414 + has_refreshed = True 383 415 logger.info( 384 416 f"access token expired or rejected for {auth_session.did}; refreshing" 385 417 ) ··· 396 428 auth_session, oauth_session 397 429 ) 398 430 continue 431 + 432 + # 5xx: backoff and retry 433 + if 500 <= response.status_code < 600 and attempt < _PDS_MAX_ATTEMPTS - 1: 434 + backoff = _backoff_for_attempt(attempt) 435 + logger.warning( 436 + f"PDS blob upload {response.status_code} for {auth_session.did} " 437 + f"on attempt {attempt + 1}/{_PDS_MAX_ATTEMPTS}, backing off {backoff}s" 438 + ) 439 + await asyncio.sleep(backoff) 440 + continue 441 + 442 + break 399 443 400 444 if response is None: 401 445 raise Exception("blob upload failed: no response received")
+9
backend/src/backend/api/tracks/audio_replace.py
··· 29 29 from urllib.parse import urljoin 30 30 31 31 import logfire 32 + from docket import ConcurrencyLimit 32 33 from fastapi import ( 33 34 Depends, 34 35 File, ··· 545 546 async def run_track_audio_replace( 546 547 job_id: str, 547 548 session_id: str, 549 + user_did: str, 548 550 track_id: int, 549 551 file_path: str, 550 552 filename: str, 553 + concurrency: ConcurrencyLimit = ConcurrencyLimit("user_did", max_concurrent=3), 551 554 ) -> None: 552 555 """docket task entry point for audio replace. 553 556 ··· 556 559 ReplaceContext, and delegates to the phase orchestrator 557 560 (`_process_replace_background`). this is the function registered with 558 561 docket; the HTTP handler enqueues it via `schedule_track_audio_replace`. 562 + 563 + `ConcurrencyLimit("user_did", max_concurrent=3)` caps concurrent 564 + replaces per user's DID at 3 — same as the upload task. prevents a 565 + user kicking off many replaces at once from overwhelming their PDS's 566 + connection-limit tolerance. 559 567 """ 560 568 auth_session = await get_session(session_id) 561 569 if auth_session is None: ··· 589 597 await docket.add(run_track_audio_replace)( 590 598 job_id=ctx.job_id, 591 599 session_id=ctx.auth_session.session_id, 600 + user_did=ctx.auth_session.did, 592 601 track_id=ctx.track_id, 593 602 file_path=ctx.file_path, 594 603 filename=ctx.filename,
+11
backend/src/backend/api/tracks/uploads.py
··· 12 12 13 13 import aiofiles 14 14 import logfire 15 + from docket import ConcurrencyLimit 15 16 from fastapi import ( 16 17 Depends, 17 18 File, ··· 995 996 support_gate: dict | None, 996 997 auto_tag: bool, 997 998 unlisted: bool, 999 + concurrency: ConcurrencyLimit = ConcurrencyLimit("artist_did", max_concurrent=3), 998 1000 ) -> None: 999 1001 """docket task entry point for track uploads. 1000 1002 ··· 1007 1009 rehydrating the session at task start rather than passing the cached 1008 1010 AuthSession over the wire means we pick up any token refresh that 1009 1011 happened between the HTTP request and the worker picking up the task. 1012 + 1013 + the `ConcurrencyLimit("artist_did", max_concurrent=3)` caps concurrent 1014 + uploads per user's DID at 3. a 12-track album upload does not produce 1015 + 12 parallel `createRecord` calls against the user's PDS (which would 1016 + exceed the typical PDS's connection-limit + rate-limit tolerance and 1017 + cause ConnectTimeouts). instead the task queue trickles uploads 1018 + through 3 at a time. user-visible latency for the slowest track in a 1019 + large album goes up, but every track publishes successfully rather 1020 + than 1-2 silently failing on upstream PDS throttling. 1010 1021 """ 1011 1022 auth_session = await get_session(session_id) 1012 1023 if auth_session is None:
+9 -4
backend/tests/test_pds_network_retry.py
··· 77 77 assert result == {"uri": "at://did:plc:testgoose/fm.plyr.track/abc"} 78 78 assert mock_client.make_authenticated_request.call_count == 2 79 79 80 - async def test_raises_after_two_read_errors( 80 + async def test_raises_after_all_attempts_exhausted( 81 81 self, mock_auth_session: AuthSession 82 82 ) -> None: 83 + # with exponential-backoff retries the client now makes 84 + # _PDS_MAX_ATTEMPTS attempts before giving up. supply enough 85 + # ReadErrors to exhaust them all. backoffs are real sleeps 86 + # (1s + 2s + 4s = 7s between attempts) — unavoidable for 87 + # this test, but only one test pays the cost. 83 88 mock_client = AsyncMock() 84 89 mock_client.make_authenticated_request = AsyncMock( 85 90 side_effect=httpx.ReadError("") ··· 90 95 "backend._internal.atproto.client.get_oauth_client", 91 96 return_value=mock_client, 92 97 ), 93 - pytest.raises(Exception, match="PDS request failed after retry"), 98 + pytest.raises(Exception, match=r"PDS request failed after \d+ attempts"), 94 99 ): 95 100 await make_pds_request( 96 101 mock_auth_session, ··· 147 152 assert result == blob_ref 148 153 assert mock_client.make_authenticated_request.call_count == 2 149 154 150 - async def test_raises_after_two_read_errors( 155 + async def test_raises_after_all_attempts_exhausted( 151 156 self, mock_auth_session: AuthSession 152 157 ) -> None: 153 158 mock_client = AsyncMock() ··· 160 165 "backend._internal.atproto.client.get_oauth_client", 161 166 return_value=mock_client, 162 167 ), 163 - pytest.raises(Exception, match="blob upload failed after retry"), 168 + pytest.raises(Exception, match=r"blob upload failed after \d+ attempts"), 164 169 ): 165 170 await upload_blob(mock_auth_session, b"fake-audio", "audio/mpeg") 166 171
+2 -2
loq.toml
··· 40 40 41 41 [[rules]] 42 42 path = "backend/src/backend/api/tracks/uploads.py" 43 - max_lines = 1338 43 + max_lines = 1349 44 44 45 45 [[rules]] 46 46 path = "backend/src/backend/config.py" ··· 256 256 257 257 [[rules]] 258 258 path = "backend/src/backend/api/tracks/audio_replace.py" 259 - max_lines = 704 259 + max_lines = 713 260 260 261 261 [[rules]] 262 262 path = "backend/tests/conftest.py"