# async

python's `async`/`await` syntax is straightforward. the interesting part is how you structure code around it.

## async with

the core insight from async python codebases: `async with` is how you manage resources. not try/finally, not callbacks - the context manager protocol.

when you open a connection, start a session, or acquire any resource that needs cleanup, you wrap it in an async context manager:

```python
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from atproto import AsyncClient


@asynccontextmanager
async def get_atproto_client(
    require_auth: bool = False,
    operation: str = "this operation",
    target_repo: str | None = None,
) -> AsyncIterator[AsyncClient]:
    """get an atproto client using credentials from context or environment."""
    client = AsyncClient(pds_url)
    if require_auth and handle and password:
        await client.login(handle, password)
    try:
        yield client
    finally:
        pass  # AsyncClient doesn't need explicit cleanup
```

the caller writes `async with get_atproto_client() as client:` and cleanup happens automatically. this pattern appears constantly - database connections, HTTP sessions, file handles, locks.

from [pdsx/mcp/client.py](https://github.com/zzstoatzz/pdsx/blob/main/src/pdsx/mcp/client.py)

the alternative - manual try/finally blocks scattered through the code, or worse, forgetting cleanup entirely - is why this pattern dominates. you encode the lifecycle once in the context manager, and every use site gets it right by default.

## ContextVar

python added `contextvars` in 3.7 to solve a specific problem: how do you have request-scoped state in async code without passing it through every function?

in sync code, you might use thread-locals. but async tasks can interleave on the same thread, so thread-locals don't work. `ContextVar` gives each task its own copy:

```python
import weakref
from contextvars import ContextVar

_current_docket: ContextVar[Docket | None] = ContextVar("docket", default=None)
_current_worker: ContextVar[Worker | None] = ContextVar("worker", default=None)
_current_server: ContextVar[weakref.ref[FastMCP] | None] = ContextVar("server", default=None)
```

set it at the start of handling a request, and any code called from that task can access it. this is how frameworks like fastapi and fastmcp pass request context without threading it through every function signature.

the pattern: set at the boundary (request handler, task entry), read anywhere inside. reset when you're done.

from [fastmcp/server/dependencies.py](https://github.com/jlowin/fastmcp/blob/main/src/fastmcp/server/dependencies.py)
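that lifecycle in code - a minimal sketch, not fastmcp's actual API (`handle_request`, `do_work`, and `current_docket` are made-up names): `set()` returns a token recording the previous value, and `reset(token)` restores it, so nested boundaries unwind correctly:

```python
async def handle_request(docket: Docket) -> None:
    token = _current_docket.set(docket)  # set at the boundary
    try:
        await do_work()  # anything awaited from here sees this docket
    finally:
        _current_docket.reset(token)  # reset when done, restoring the prior value


def current_docket() -> Docket:
    """read anywhere inside the task - no parameter threading."""
    docket = _current_docket.get()
    if docket is None:
        raise RuntimeError("no docket set in this context")
    return docket
```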
## concurrency control

`asyncio.gather()` runs tasks concurrently, but sometimes you need to limit how many run at once - rate limits, connection pools, memory constraints.

`asyncio.Semaphore` is the primitive for this. acquire before work, release after. the `async with` syntax makes it clean:

```python
semaphore = asyncio.Semaphore(concurrency)

async def delete_one(uri: str) -> None:
    """delete a single record with concurrency control."""
    async with semaphore:
        try:
            await delete_record(client, uri)
            successful.append(uri)
        except Exception as e:
            failed.append((uri, e))
            if fail_fast:
                raise

await asyncio.gather(*[delete_one(uri) for uri in uris])
```

at most `concurrency` delete operations run at once. the rest wait.

from [pdsx/_internal/batch.py](https://github.com/zzstoatzz/pdsx/blob/main/src/pdsx/_internal/batch.py)

## connection pools

module-level singleton pool, lazily initialized:

```python
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

import asyncpg

_pool: asyncpg.Pool | None = None


async def get_pool() -> asyncpg.Pool:
    global _pool
    if _pool is None:
        _pool = await asyncpg.create_pool(db_url, min_size=2, max_size=10)
    return _pool


@asynccontextmanager
async def get_conn() -> AsyncGenerator[asyncpg.Connection, None]:
    pool = await get_pool()
    async with pool.acquire() as conn:
        yield conn
```

callers use `async with get_conn() as conn:` - the pool handles connection lifecycle.

## batch writes with unnest

postgres `unnest()` turns arrays into rows. one round trip for thousands of inserts:

```python
async def batch_upsert_follows(follows: list[tuple[str, str, str]]) -> None:
    # asyncpg encodes bigint[] parameters as ints, so convert the id strings
    follower_ids = [int(f[0]) for f in follows]
    rkeys = [f[1] for f in follows]
    subject_ids = [int(f[2]) for f in follows]

    async with get_conn() as conn:
        await conn.execute(
            """
            INSERT INTO follows (follower_id, rkey, subject_id)
            SELECT * FROM unnest($1::bigint[], $2::text[], $3::bigint[])
            ON CONFLICT (follower_id, rkey) DO UPDATE
            SET subject_id = EXCLUDED.subject_id
            """,
            follower_ids,
            rkeys,
            subject_ids,
        )
```

from [follower-weight/db.py](https://github.com/zzstoatzz/follower-weight)
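a hypothetical caller, to make the batching concrete (the `on_follow` handler and the 1000-row flush threshold are made up, not from the repo): accumulate rows as events arrive, then flush thousands in a single statement:

```python
_buffer: list[tuple[str, str, str]] = []


async def on_follow(follower_id: str, rkey: str, subject_id: str) -> None:
    _buffer.append((follower_id, rkey, subject_id))
    if len(_buffer) >= 1000:  # flush a batch: one INSERT, one round trip
        await batch_upsert_follows(_buffer)
        _buffer.clear()
```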