audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(infra): split docket worker onto its own fly process group (#1359)

* feat(infra): split docket worker onto its own fly process group

before: `relay-api` ran a single process group with uvicorn AND a
docket Worker in the same uvicorn process. an OOM in any background
task — most acutely `run_track_upload`, which holds full audio bytes
in RAM — killed uvicorn with it, producing the 502s and silent
logouts in the 2026-04-30 incident (#1357).

after: two fly process groups.
- `app`: just uvicorn. opens a Docket *client* (no Worker) so request
handlers can enqueue tasks. sized for HTTP fan-out only.
- `worker`: just the docket Worker. dedicated entrypoint at
`backend.worker`. sized for in-memory upload-pipeline work (2GB).

split structure in `_internal/background.py`:
- `docket_client_lifespan()`: opens Docket, registers tasks, no Worker.
used by main.py.
- `docket_worker_lifespan()`: opens Docket, registers tasks, runs
Worker. used by worker.py.

`backend/worker.py` is intentionally a thin Python entrypoint rather
than execing the upstream `docket` CLI: `configure_observability()`
must run before any task module imports so logfire spans cover the
worker's full lifecycle.

deploy notes (in PR body): after this lands, ops needs to set machine
counts per group via `fly scale count app=2 worker=1 -a relay-api`.
the toml only declares the groups; counts are managed out-of-band.

structural fixes still tracked in #1357.

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

* fix(worker): set up notification_service before tasks run

reviewer caught a regression: registered docket tasks
(`tasks/hooks._send_track_notification`,
`tasks/moderation.scan_image_moderation`, and
`_internal/moderation.scan_track_for_copyright` invoked from
`tasks/copyright`) call into `notification_service` for admin DMs.
without `setup()` running on the worker process, the global instance
keeps `recipient_did = None`, so each `send_*_notification` early-returns
None — and `_send_track_notification` then sets
`track.notification_sent = True` regardless, which would have made the
loss permanent and silent on every new upload.

`queue_service` and `jam_service` are HTTP-only — they stay in main.py's
lifespan and intentionally don't run on the worker.

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

* chore(staging): mirror process group split in fly.staging.toml

without this, the staging deploy that follows the merge would still run
the old single-process toml, so the split structure wouldn't actually
be exercised — and worse, the new main.py would only open a docket
client (no Worker), leaving staging background tasks stranded in Redis.

staging sized at 1GB per group (vs prod's app=1GB, worker=2GB) since
staging doesn't carry meaningful upload load.

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4 (1M context) <noreply@anthropic.com>

authored by

nate nowack
Claude Opus 4 (1M context)
and committed by
GitHub
522cc694 06e62d83

+173 -23
+16 -1
backend/fly.staging.toml
··· 21 21 R2_PUBLIC_BUCKET_URL = 'https://pub-0a0a2e70496c461581c9fafb442b269d.r2.dev' 22 22 STORAGE_BACKEND = 'r2' 23 23 24 + # two process groups so a runaway upload task in `worker` cannot OOM the 25 + # `app` process serving HTTP. mirrors backend/fly.toml — staging needs 26 + # the same structural split for smoke testing to exercise it. sizing is 27 + # smaller than prod since staging carries less load. 28 + [processes] 29 + app = "uv run --no-sync uvicorn backend.main:app --host 0.0.0.0 --port 8000" 30 + worker = "uv run --no-sync python -m backend.worker" 31 + 24 32 [http_service] 25 33 internal_port = 8000 26 34 force_https = true ··· 42 50 path = "/health" 43 51 44 52 [[vm]] 53 + processes = ['app'] 54 + memory = '1gb' 55 + cpu_kind = 'shared' 56 + cpus = 1 57 + 58 + [[vm]] 59 + processes = ['worker'] 45 60 memory = '1gb' 46 61 cpu_kind = 'shared' 47 62 cpus = 1 ··· 51 66 # - AWS_ACCESS_KEY_ID (cloudflare R2) 52 67 # - AWS_SECRET_ACCESS_KEY (cloudflare R2) 53 68 # - OAUTH_ENCRYPTION_KEY (generate: python -c 'from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())') 54 - # - DOCKET_URL (self-hosted redis: redis://plyr-redis-stg.internal:6379) 69 + # - DOCKET_URL (self-hosted redis: redis://plyr-redis-stg.internal:6379)
+21 -1
backend/fly.toml
··· 8 8 [deploy] 9 9 release_command = "uv run --no-dev alembic upgrade head" 10 10 11 + # two process groups so a runaway upload task in `worker` cannot OOM the 12 + # `app` process serving HTTP. see backend/worker.py for the worker 13 + # entrypoint and docs/internal/runbooks/upload-oom-cycle.md for the 14 + # incident that motivated the split. 15 + [processes] 16 + app = "uv run --no-sync uvicorn backend.main:app --host 0.0.0.0 --port 8000" 17 + worker = "uv run --no-sync python -m backend.worker" 18 + 11 19 [http_service] 12 20 internal_port = 8000 13 21 force_https = true ··· 28 36 method = "GET" 29 37 path = "/health" 30 38 39 + # `app` only serves HTTP; sized for request fan-out, not for in-memory 40 + # audio processing. 31 41 [[vm]] 42 + processes = ['app'] 32 43 memory = '1gb' 33 44 cpu_kind = 'shared' 34 45 cpus = 1 35 46 47 + # `worker` runs the docket Worker — runs run_track_upload, transcode 48 + # fetches, etc. headroom for the in-memory audio bytes the upload 49 + # pipeline currently holds (tracked in #1357). 50 + [[vm]] 51 + processes = ['worker'] 52 + memory = '2gb' 53 + cpu_kind = 'shared' 54 + cpus = 1 55 + 36 56 [env] 37 57 PORT = '8000' 38 58 STORAGE_BACKEND = 'r2' ··· 46 66 # - AWS_ACCESS_KEY_ID (cloudflare R2) 47 67 # - AWS_SECRET_ACCESS_KEY (cloudflare R2) 48 68 # - OAUTH_ENCRYPTION_KEY (generate: python -c 'from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())') 49 - # - DOCKET_URL (self-hosted redis: redis://plyr-redis.internal:6379) 69 + # - DOCKET_URL (self-hosted redis: redis://plyr-redis.internal:6379)
+47 -18
backend/src/backend/_internal/background.py
··· 1 1 """background task infrastructure using pydocket. 2 2 3 - provides a docket instance for scheduling background tasks and a worker 4 - that runs alongside the FastAPI server. requires DOCKET_URL to be set 5 - to a Redis URL. 3 + provides two lifespans plus a `get_docket()` accessor: 4 + 5 + - `docket_client_lifespan()`: opens a Docket connection and registers the 6 + task collection. used by the FastAPI `app` process so request handlers 7 + can enqueue background tasks via `get_docket().add(task)(...)` without 8 + running the Worker run loop in the same process. 6 9 7 - usage: 8 - from backend._internal.background import get_docket 10 + - `docket_worker_lifespan()`: opens the Docket connection AND runs a 11 + Worker. used by the dedicated `worker` process (see `backend/worker.py`). 9 12 10 - docket = get_docket() 11 - await docket.add(my_task_function)(arg1, arg2) 13 + splitting these means an upload-task OOM in the worker process can no 14 + longer kill the HTTP server. requires `DOCKET_URL` to be set to a Redis 15 + URL. 12 16 """ 13 17 14 18 import asyncio ··· 23 27 24 28 logger = logging.getLogger(__name__) 25 29 26 - # global docket instance - initialized in lifespan 30 + # global docket instance - initialized in either lifespan 27 31 _docket: Docket | None = None 28 32 29 33 ··· 39 43 40 44 41 45 @asynccontextmanager 42 - async def background_worker_lifespan() -> AsyncGenerator[Docket, None]: 43 - """lifespan context manager for docket and its worker. 46 + async def docket_client_lifespan() -> AsyncGenerator[Docket, None]: 47 + """open a Docket client (no Worker). 48 + 49 + used by the HTTP `app` process so request handlers can enqueue 50 + background tasks. the Worker run loop runs in a separate process 51 + (see `backend/worker.py`), so a runaway upload task can never OOM 52 + the HTTP server. 53 + """ 54 + global _docket 55 + 56 + logger.info( 57 + "initializing docket client", 58 + extra={"docket_name": settings.docket.name, "url": settings.docket.url}, 59 + ) 44 60 45 - initializes the docket connection and starts an in-process 46 - worker that processes background tasks. 61 + # WARNING: do not modify Docket() constructor args without reading 62 + # docs/backend/background-tasks.md - see 2025-12-30 incident 63 + async with Docket( 64 + name=settings.docket.name, 65 + url=settings.docket.url, 66 + ) as docket: 67 + _docket = docket 68 + _register_tasks(docket) 69 + try: 70 + yield docket 71 + finally: 72 + _docket = None 73 + logger.info("docket client closed") 47 74 48 - yields: 49 - Docket: the initialized docket instance 75 + 76 + @asynccontextmanager 77 + async def docket_worker_lifespan() -> AsyncGenerator[Docket, None]: 78 + """open a Docket connection AND run a Worker. 79 + 80 + used by the dedicated worker process. yields the Docket so the 81 + entrypoint can hold the lifespan open until a shutdown signal arrives. 50 82 """ 51 83 global _docket 52 84 53 85 logger.info( 54 - "initializing docket", 86 + "initializing docket worker", 55 87 extra={"docket_name": settings.docket.name, "url": settings.docket.url}, 56 88 ) 57 89 ··· 62 94 url=settings.docket.url, 63 95 ) as docket: 64 96 _docket = docket 65 - 66 - # register all background task functions 67 97 _register_tasks(docket) 68 98 69 - # start worker as background task 70 99 worker_task: asyncio.Task[None] | None = None 71 100 try: 72 101 async with Worker(
+6 -3
backend/src/backend/main.py
··· 15 15 from sqlalchemy.exc import SQLAlchemyError 16 16 17 17 from backend._internal import jam_service, notification_service, queue_service 18 - from backend._internal.background import background_worker_lifespan 18 + from backend._internal.background import docket_client_lifespan 19 19 from backend.api import ( 20 20 account_router, 21 21 activity_router, ··· 96 96 except (OSError, SQLAlchemyError): 97 97 logger.warning("failed to warm database connection pool") 98 98 99 - # start background task worker (docket) 100 - async with background_worker_lifespan() as docket: 99 + # open docket client (no Worker — the worker run loop lives in the 100 + # dedicated `worker` process group, see backend/worker.py + fly.toml). 101 + # request handlers enqueue tasks via this client; nothing here actually 102 + # consumes the queue. 103 + async with docket_client_lifespan() as docket: 101 104 # store docket on app state for access in routes if needed 102 105 app.state.docket = docket 103 106 yield
+83
backend/src/backend/worker.py
··· 1 + """docket worker entrypoint. 2 + 3 + run via: 4 + uv run --no-sync python -m backend.worker 5 + 6 + this is the standalone worker process — no uvicorn, no HTTP serving. 7 + matches the `worker` process group in `backend/fly.toml`. 8 + 9 + we don't shell out to the upstream `docket` CLI because observability is 10 + app-owned: `configure_observability()` (logfire + logging) needs to run 11 + before any task module is imported, and the upstream CLI does not run 12 + plyr-specific setup. 13 + 14 + services initialized here are deliberately a subset of `main.py`'s 15 + lifespan — only the ones used by *registered docket tasks*. specifically 16 + `notification_service`, which `tasks/hooks.py:200`, 17 + `tasks/moderation.py:46`, and `_internal/moderation.py:170` (the 18 + copyright scanner, called from `tasks/copyright.py`) all reach into. 19 + without setup the bsky DM client and `recipient_did` stay None, so 20 + notifications silently no-op while `track.notification_sent` still gets 21 + flipped to True — permanent silent loss. `queue_service` and 22 + `jam_service` are HTTP-only and intentionally omitted. 23 + """ 24 + 25 + import asyncio 26 + import logging 27 + import signal 28 + 29 + from backend._internal import notification_service 30 + from backend._internal.background import docket_worker_lifespan 31 + from backend.config import settings 32 + from backend.utilities.observability import ( 33 + configure_observability, 34 + suppress_warnings, 35 + ) 36 + 37 + # matches the order in main.py: suppress pydantic warnings before any 38 + # atproto-touching task module gets imported, then configure logfire. 39 + suppress_warnings() 40 + configure_observability(settings) 41 + 42 + logger = logging.getLogger(__name__) 43 + 44 + 45 + async def _run() -> None: 46 + """run the docket worker until SIGINT/SIGTERM.""" 47 + loop = asyncio.get_running_loop() 48 + stop = asyncio.Event() 49 + 50 + def request_stop(sig_name: str) -> None: 51 + logger.info("received %s, initiating graceful shutdown", sig_name) 52 + stop.set() 53 + 54 + loop.add_signal_handler(signal.SIGTERM, lambda: request_stop("SIGTERM")) 55 + loop.add_signal_handler(signal.SIGINT, lambda: request_stop("SIGINT")) 56 + 57 + try: 58 + # services used by registered tasks must be set up before the 59 + # worker starts processing. setup is best-effort: if bsky auth 60 + # fails, notification_service.setup() logs and leaves the client 61 + # disabled rather than raising — same as in the HTTP lifespan. 62 + await notification_service.setup() 63 + try: 64 + async with docket_worker_lifespan(): 65 + logger.info("worker process ready") 66 + await stop.wait() 67 + logger.info("shutdown signal received, draining worker") 68 + finally: 69 + try: 70 + await asyncio.wait_for(notification_service.shutdown(), timeout=2.0) 71 + except TimeoutError: 72 + logger.warning("notification_service.shutdown() timed out") 73 + finally: 74 + loop.remove_signal_handler(signal.SIGTERM) 75 + loop.remove_signal_handler(signal.SIGINT) 76 + 77 + 78 + def main() -> None: 79 + asyncio.run(_run()) 80 + 81 + 82 + if __name__ == "__main__": 83 + main()