Sync reading position from Moon Reader app to Bookhive atproto records
atproto bookhive ereader moonreader
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 80 lines 2.5 kB view raw
"""Handle → DID → PDS resolution."""

from __future__ import annotations

import httpx
import pytest
import respx

from spacebee.atproto import identity

# Endpoint every test mocks for the handle → DID lookup.
RESOLVE_HANDLE_URL = (
    "https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle"
)


def _mock_resolve_handle(handle: str, did: str) -> None:
    """Register a respx route answering the lookup of *handle* with *did*.

    Must be called from inside a ``@respx.mock``-decorated test so the route
    lands on the active mock router.
    """
    respx.get(RESOLVE_HANDLE_URL, params={"handle": handle}).mock(
        return_value=httpx.Response(200, json={"did": did})
    )


# NOTE(review): the fixture and the tests below are ``async def`` without an
# explicit asyncio marker — presumably the project runs pytest-asyncio (or an
# equivalent plugin) in auto mode; confirm against the pytest configuration.
@pytest.fixture
async def http():
    """Yield an ``httpx.AsyncClient`` that is closed when the test finishes."""
    async with httpx.AsyncClient() as client:
        yield client


@respx.mock
async def test_resolve_plc_handle(http):
    """A ``did:plc`` handle resolves through plc.directory to the PDS host."""
    _mock_resolve_handle("alice.bsky.social", "did:plc:abc123")
    respx.get("https://plc.directory/did:plc:abc123").mock(
        return_value=httpx.Response(
            200,
            json={
                "id": "did:plc:abc123",
                "service": [
                    {
                        "id": "#atproto_pds",
                        "type": "AtprotoPersonalDataServer",
                        "serviceEndpoint": "https://morel.us-east.host.bsky.network",
                    }
                ],
            },
        )
    )

    host = await identity.resolve_pds(http, "alice.bsky.social")

    # Only the bare hostname is expected back, without the URL scheme.
    assert host == "morel.us-east.host.bsky.network"


@respx.mock
async def test_resolve_did_web_root(http):
    """A root ``did:web`` is fetched from ``/.well-known/did.json``."""
    _mock_resolve_handle("alice.example.com", "did:web:example.com")
    respx.get("https://example.com/.well-known/did.json").mock(
        return_value=httpx.Response(
            200,
            json={
                "service": [
                    {
                        "id": "#atproto_pds",
                        "type": "AtprotoPersonalDataServer",
                        "serviceEndpoint": "https://pds.example.com/",
                    }
                ]
            },
        )
    )

    host = await identity.resolve_pds(http, "alice.example.com")

    # The trailing slash on the service endpoint must not leak into the host.
    assert host == "pds.example.com"


@respx.mock
async def test_missing_pds_service_raises(http):
    """A DID document with no PDS service entry raises ``RuntimeError``."""
    _mock_resolve_handle("alice.bsky.social", "did:plc:abc")
    respx.get("https://plc.directory/did:plc:abc").mock(
        return_value=httpx.Response(200, json={"service": []})
    )

    with pytest.raises(RuntimeError, match="No AtprotoPersonalDataServer"):
        await identity.resolve_pds(http, "alice.bsky.social")