A social pastebin built on atproto.
6
fork

Configure Feed

Select the types of activity you want to include in your feed.

use slingshot for resolveMiniDoc

+26 -21
+26 -21
identity.py
··· 2 2 from concurrent.futures import ThreadPoolExecutor 3 3 4 4 import requests 5 - from atproto_identity.resolver import IdResolver 6 5 7 6 CONSTELLATION_URL = "https://constellation.microcosm.blue" 8 7 SLINGSHOT_URL = "https://slingshot.microcosm.blue" ··· 18 17 19 18 20 19 def resolve_did(identifier: str) -> str | None: 21 - """Resolve a handle to a DID. Returns the DID, or None if resolution fails.""" 20 + """Resolve a handle to a DID via Slingshot. Returns the DID, or None if resolution fails.""" 22 21 if identifier.startswith("did:"): 23 22 return identifier 24 - resolver = IdResolver() 25 - return resolver.handle.resolve(identifier) 23 + try: 24 + resp = requests.get( 25 + f"{SLINGSHOT_URL}/xrpc/com.atproto.identity.resolveHandle", 26 + params={"handle": identifier}, 27 + timeout=5, 28 + ) 29 + if resp.status_code != 200: 30 + return None 31 + return resp.json().get("did") 32 + except (requests.RequestException, ValueError): 33 + return None 26 34 27 35 28 36 def resolve_identity(did: str) -> tuple[str | None, str | None]: 29 - """Resolve a DID to its handle and PDS URL.""" 37 + """Resolve a DID to its handle and PDS URL via Slingshot.""" 30 38 now = time.time() 31 39 if did in _identity_cache: 32 40 result, ts = _identity_cache[did] 33 41 if now - ts < _identity_ttl: 34 42 return result 35 43 36 - resolver = IdResolver() 37 - did_doc = resolver.did.resolve(did) 38 - 39 - if did_doc is None: 40 - _identity_cache[did] = ((None, None), now) 44 + try: 45 + resp = requests.get( 46 + f"{SLINGSHOT_URL}/xrpc/blue.microcosm.identity.resolveMiniDoc", 47 + params={"identifier": did}, 48 + timeout=5, 49 + ) 50 + if resp.status_code != 200: 51 + _identity_cache[did] = ((None, None), now) 52 + return None, None 53 + data = resp.json() 54 + except (requests.RequestException, ValueError): 41 55 return None, None 42 56 43 - handle = None 44 - for aka in did_doc.also_known_as: # type: ignore[union-attr] 45 - if aka.startswith("at://"): 46 - handle = aka[5:] 47 - break 48 - 49 - pds_url = None 50 - for service in did_doc.service: # type: ignore[union-attr] 51 - if service.id == "#atproto_pds": 52 - pds_url = service.service_endpoint 53 - break 57 + handle = data.get("handle") 58 + pds_url = data.get("pds") 54 59 55 60 _identity_cache[did] = ((handle, pds_url), now) 56 61 return handle, pds_url