decentralized and customizable links page on top of atproto ligo.at
atproto link-in-bio python uv
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

add some types to atproto

+122 -57
+71 -22
src/atproto/__init__.py
··· 1 + import asyncio 1 2 from os import getenv 2 3 from re import match as regex_match 3 - from typing import Any 4 + from typing import Any, TypeGuard 4 5 5 6 from aiodns import DNSResolver 6 7 from aiodns import error as dns_error ··· 9 10 from src.security import is_safe_url 10 11 11 12 from .kv import KV, nokv 13 + from .types import DID, AuthserverUrl, Handle, PdsUrl 12 14 from .validator import is_valid_authserver_meta 13 15 14 16 PLC_DIRECTORY = getenv("PLC_DIRECTORY_URL") or "https://plc.directory" ··· 16 18 DID_REGEX = r"^did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]$" 17 19 18 20 19 - type AuthserverUrl = str 20 - type PdsUrl = str 21 - type DID = str 22 - 23 - 24 - def is_valid_handle(handle: str) -> bool: 21 + def is_valid_handle(handle: str) -> TypeGuard[Handle]: 25 22 return regex_match(HANDLE_REGEX, handle) is not None 26 23 27 24 28 - def is_valid_did(did: str) -> bool: 25 + def is_valid_did(did: str) -> TypeGuard[DID]: 29 26 return regex_match(DID_REGEX, did) is not None 30 27 31 28 32 29 async def resolve_identity( 33 30 client: ClientSession, 34 31 query: str, 35 - didkv: KV = nokv, 36 - ) -> tuple[str, str, dict[str, Any]] | None: 32 + didkv: KV[Handle, DID] = nokv, 33 + pdskv: KV[DID, PdsUrl] = nokv, 34 + ) -> tuple[DID, Handle, PdsUrl] | None: 35 + ((done,), _) = await asyncio.wait( 36 + ( 37 + asyncio.create_task( 38 + resolve_identity_microcosm(client, query, didkv=didkv, pdskv=pdskv), 39 + name="microcosm", 40 + ), 41 + asyncio.create_task( 42 + resolve_identity_raw(client, query, didkv=didkv, pdskv=pdskv), 43 + name="raw", 44 + ), 45 + ), 46 + return_when=asyncio.FIRST_COMPLETED, 47 + ) 48 + return done.result() 49 + 50 + 51 + async def resolve_identity_raw( 52 + client: ClientSession, 53 + query: str, 54 + didkv: KV[Handle, DID], 55 + pdskv: KV[DID, PdsUrl], 56 + ) -> tuple[DID, Handle, PdsUrl] | None: 37 57 """Resolves an identity to a DID, handle and DID document, verifies handles bi directionally.""" 38 58 39 59 if is_valid_handle(query): 40 - handle = query.lower() 60 + handle = Handle(query.lower()) 41 61 did = await resolve_did_from_handle(client, handle, didkv) 42 62 if not did: 43 63 return None ··· 47 67 doc_handle = handle_from_doc(doc) 48 68 if not doc_handle or doc_handle != handle: 49 69 return None 50 - return (did, handle, doc) 51 70 52 - if is_valid_did(query): 71 + elif is_valid_did(query): 53 72 did = query 54 73 doc = await resolve_doc_from_did(client, did) 55 74 if not doc: ··· 59 78 return None 60 79 if await resolve_did_from_handle(client, handle, didkv) != did: 61 80 return None 62 - return (did, handle, doc) 81 + 82 + else: 83 + return None 63 84 64 - return None 85 + pds_url = pds_endpoint_from_doc(doc) 86 + if not pds_url: 87 + return None 88 + pdskv.set(did, value=pds_url) 89 + 90 + return (did, handle, pds_url) 65 91 66 92 67 - def handle_from_doc(doc: dict[str, list[str]]) -> str | None: 93 + async def resolve_identity_microcosm( 94 + client: ClientSession, 95 + query: str, 96 + didkv: KV[Handle, DID], 97 + pdskv: KV[DID, PdsUrl], 98 + ) -> tuple[DID, Handle, PdsUrl] | None: 99 + base = "https://slingshot.microcosm.blue" 100 + url = f"{base}/xrpc/com.bad-example.identity.resolveMiniDoc?identifier={query}" 101 + response = await client.get(url) 102 + if not response.ok: 103 + return None 104 + mini_doc: dict[str, str] = await response.json() 105 + did, handle, pds = mini_doc["did"], mini_doc["handle"], mini_doc["pds"] 106 + assert is_valid_did(did) 107 + assert is_valid_handle(handle) 108 + didkv.set(handle, value=did) 109 + pds = PdsUrl(pds) 110 + pdskv.set(did, value=pds) 111 + return did, handle, pds 112 + 113 + 114 + def handle_from_doc(doc: dict[str, list[str]]) -> Handle | None: 68 115 """Return all possible handles inside the DID document.""" 69 116 70 117 for aka in doc.get("alsoKnownAs", []): ··· 78 125 async def resolve_did_from_handle( 79 126 client: ClientSession, 80 127 handle: str, 81 - kv: KV = nokv, 128 + kv: KV[Handle, DID] = nokv, 82 129 reload: bool = False, 83 - ) -> str | None: 130 + ) -> DID | None: 84 131 """Returns the DID for a given handle.""" 85 132 86 133 if not is_valid_handle(handle): ··· 96 143 97 144 if did is not None and is_valid_did(did): 98 145 kv.set(handle, value=did) 99 - return did 146 + return DID(did) 100 147 101 148 return None 102 149 ··· 130 177 return None 131 178 132 179 133 - def pds_endpoint_from_doc(doc: dict[str, list[dict[str, str]]]) -> str | None: 180 + def pds_endpoint_from_doc(doc: dict[str, list[dict[str, str]]]) -> PdsUrl | None: 134 181 """Returns the PDS endpoint from the DID document.""" 135 182 136 183 for service in doc.get("service", []): 137 184 if service.get("id") == "#atproto_pds": 138 - return service.get("serviceEndpoint") 185 + url = service.get("serviceEndpoint") 186 + if url is not None: 187 + return PdsUrl(url) 139 188 return None 140 189 141 190 ··· 199 248 parsed: dict[str, list[str]] = await response.json() 200 249 authserver_url = parsed["authorization_servers"][0] 201 250 kv.set(pds_url, value=authserver_url) 202 - return authserver_url 251 + return AuthserverUrl(authserver_url) 203 252 204 253 205 254 async def fetch_authserver_meta(
+7 -4
src/atproto/kv.py
··· 1 1 from abc import ABC, abstractmethod 2 2 from logging import Logger 3 - from typing import override 3 + from typing import Generic, TypeVar, override 4 4 5 + K = TypeVar("K", bound=str) 6 + V = TypeVar("V", bound=str) 5 7 6 - class KV(ABC): 8 + 9 + class KV(ABC, Generic[K, V]): 7 10 @abstractmethod 8 - def get(self, key: str) -> str | None: 11 + def get(self, key: K) -> V | None: 9 12 pass 10 13 11 14 @abstractmethod 12 - def set(self, key: str, value: str): 15 + def set(self, key: K, value: V): 13 16 pass 14 17 15 18
+12 -7
src/atproto/types.py
··· 1 - from typing import NamedTuple 1 + from typing import NamedTuple, NewType 2 + 3 + AuthserverUrl = NewType("AuthserverUrl", str) 4 + PdsUrl = NewType("PdsUrl", str) 5 + Handle = NewType("Handle", str) 6 + DID = NewType("DID", str) 2 7 3 8 4 9 class OAuthAuthRequest(NamedTuple): 5 10 state: str 6 11 authserver_iss: str 7 - did: str | None 8 - handle: str | None 9 - pds_url: str | None 12 + did: DID | None 13 + handle: Handle | None 14 + pds_url: PdsUrl | None 10 15 pkce_verifier: str 11 16 scope: str 12 17 dpop_authserver_nonce: str ··· 14 19 15 20 16 21 class OAuthSession(NamedTuple): 17 - did: str 18 - handle: str | None 19 - pds_url: str 22 + did: DID 23 + handle: Handle | None 24 + pds_url: PdsUrl 20 25 authserver_iss: str 21 26 access_token: str | None 22 27 refresh_token: str | None
+6 -5
src/db.py
··· 1 1 import sqlite3 2 2 from logging import Logger 3 3 from sqlite3 import Connection 4 - from typing import override 4 + from typing import Generic, cast, override 5 5 6 6 from flask import Flask, g 7 7 8 8 from src.atproto.kv import KV as BaseKV 9 + from src.atproto.kv import K, V 9 10 10 11 11 - class KV(BaseKV): 12 + class KV(BaseKV, Generic[K, V]): 12 13 db: Connection 13 14 logger: Logger 14 15 prefix: str ··· 19 20 self.prefix = prefix 20 21 21 22 @override 22 - def get(self, key: str) -> str | None: 23 + def get(self, key: K) -> V | None: 23 24 cursor = self.db.cursor() 24 25 row: dict[str, str] | None = cursor.execute( 25 26 "select value from keyval where prefix = ? and key = ?", ··· 27 28 ).fetchone() 28 29 if row is not None: 29 30 self.logger.debug(f"returning cached {self.prefix}({key})") 30 - return row["value"] 31 + return cast(V, row["value"]) 31 32 return None 32 33 33 34 @override 34 - def set(self, key: str, value: str): 35 + def set(self, key: K, value: V): 35 36 self.logger.debug(f"caching {self.prefix}({key}): {value}") 36 37 cursor = self.db.cursor() 37 38 _ = cursor.execute(
+3 -4
src/main.py
··· 8 8 from flask_htmx import make_response as htmx_response 9 9 10 10 from src.atproto import ( 11 - PdsUrl, 12 11 get_record, 13 12 is_valid_did, 14 13 resolve_did_from_handle, 15 14 resolve_pds_from_did, 16 15 ) 17 16 from src.atproto.oauth import pds_authed_req 18 - from src.atproto.types import OAuthSession 17 + from src.atproto.types import DID, Handle, OAuthSession, PdsUrl 19 18 from src.auth import get_auth_session, save_auth_session 20 19 from src.db import KV, close_db_connection, get_db, init_db 21 20 from src.oauth import oauth ··· 52 51 reload = request.args.get("reload") is not None 53 52 54 53 db = get_db(app) 55 - didkv = KV(db, app.logger, "did_from_handle") 56 - pdskv = KV(db, app.logger, "pds_from_did") 54 + didkv = KV[Handle, DID](db, app.logger, "did_from_handle") 55 + pdskv = KV[DID, PdsUrl](db, app.logger, "pds_from_did") 57 56 58 57 async with ClientSession() as client: 59 58 if atid.startswith("@"):
+23 -15
src/oauth.py
··· 9 9 fetch_authserver_meta, 10 10 is_valid_did, 11 11 is_valid_handle, 12 - pds_endpoint_from_doc, 13 12 resolve_authserver_from_pds, 14 13 resolve_identity, 15 14 ) 16 15 from src.atproto.oauth import initial_token_request, send_par_auth_request 17 - from src.atproto.types import OAuthAuthRequest, OAuthSession 16 + from src.atproto.types import ( 17 + DID, 18 + AuthserverUrl, 19 + Handle, 20 + OAuthAuthRequest, 21 + OAuthSession, 22 + PdsUrl, 23 + ) 18 24 from src.auth import ( 19 25 delete_auth_request, 20 26 get_auth_request, ··· 35 41 return redirect(url_for("page_login"), 303) 36 42 37 43 db = get_db(current_app) 38 - pdskv = KV(db, current_app.logger, "authserver_from_pds") 44 + didkv = KV[Handle, DID](db, current_app.logger, "did_from_handle") 45 + pdskv = KV[DID, PdsUrl](db, current_app.logger, "pds_from_did") 46 + authserverkv = KV[PdsUrl, AuthserverUrl]( 47 + db, 48 + current_app.logger, 49 + "authserver_from_pds", 50 + ) 39 51 40 52 client = ClientSession() 41 53 42 54 if is_valid_handle(username) or is_valid_did(username): 43 55 login_hint = username 44 - kv = KV(db, current_app.logger, "did_from_handle") 45 - identity = await resolve_identity(client, username, didkv=kv) 56 + identity = await resolve_identity(client, username, didkv=didkv, pdskv=pdskv) 46 57 if identity is None: 47 58 return "couldnt resolve identity", 500 48 - did, handle, doc = identity 49 - pds_url = pds_endpoint_from_doc(doc) 50 - if not pds_url: 51 - return "pds not found", 404 59 + did, handle, pds_url = identity 52 60 current_app.logger.debug(f"account PDS: {pds_url}") 53 - authserver_url = await resolve_authserver_from_pds(client, pds_url, pdskv) 61 + authserver_url = await resolve_authserver_from_pds( 62 + client, pds_url, authserverkv 63 + ) 54 64 if not authserver_url: 55 65 return "authserver not found", 404 56 66 ··· 58 68 did, handle, pds_url = None, None, None 59 69 login_hint = None 60 70 authserver_url = ( 61 - await resolve_authserver_from_pds(client, username, pdskv) or username 71 + await resolve_authserver_from_pds(client, PdsUrl(username), authserverkv) 72 + or username 62 73 ) 63 74 64 75 else: ··· 175 186 identity = await resolve_identity(client, did, didkv=didkv) 176 187 if not identity: 177 188 return "could not resolve identity", 500 178 - did, handle, did_doc = identity 179 - pds_url = pds_endpoint_from_doc(did_doc) 180 - if not pds_url: 181 - return "could not resolve pds", 500 189 + did, handle, pds_url = identity 182 190 authserver_url = await resolve_authserver_from_pds( 183 191 client, 184 192 pds_url,