decentralized and customizable links page on top of atproto
ligo.at
atproto
link-in-bio
python
uv
1import asyncio
2import json
3from typing import Any, NamedTuple, cast
4
5from aiohttp.client import ClientSession
6from atproto import (
7 get_record,
8 is_valid_did,
9 resolve_did_from_handle,
10 resolve_pds_from_did,
11)
12from atproto.oauth import pds_authed_req
13from atproto.types import DID, Handle, OAuthSession, PdsUrl
14from flask import Flask, g, redirect, render_template, request, session, url_for
15from flask_htmx import HTMX
16from flask_htmx import make_response as htmx_response
17from xrpc import xrpc
18
19from src.auth import (
20 get_auth_session,
21 refresh_auth_session,
22 save_auth_session,
23)
24from src.config import AuthServer, Config
25from src.db import KV, close_db_connection, get_db, init_db
26from src.oauth import oauth, oauth_metadata
27
28app = Flask(__name__)
29_ = app.config.from_prefixed_env()
30app.register_blueprint(oauth)
31app.register_blueprint(xrpc)
32htmx = HTMX()
33htmx.init_app(app)
34init_db(app, name="config")
35init_db(app, name="keyval")
36
37
38@app.before_request
39async def load_user_to_context():
40 g.user = get_auth_session(session)
41
42
43async def get_user() -> OAuthSession | None:
44 user = cast(OAuthSession | None, g.user)
45 if user is not None and user.is_expired():
46 async with ClientSession() as client:
47 user = await refresh_auth_session(session, client, user)
48 return user
49
50
51@app.teardown_appcontext
52async def app_teardown(exception: BaseException | None):
53 close_db_connection(exception)
54
55
56@app.get("/")
57def page_home():
58 return render_template("index.html")
59
60
61@app.get("/<string:atid>")
62async def page_profile(atid: str):
63 reload = request.args.get("reload") is not None
64
65 db = get_db(app, name="keyval")
66 didkv = KV[Handle, DID](db, app.logger, "did_from_handle")
67 pdskv = KV[DID, PdsUrl](db, app.logger, "pds_from_did")
68
69 async with ClientSession() as client:
70 if atid.startswith("@"):
71 handle = atid[1:].lower()
72 did = await resolve_did_from_handle(client, handle, kv=didkv, reload=reload)
73 if did is None:
74 return render_template("error.html", message="did not found"), 404
75 elif is_valid_did(atid):
76 handle = None
77 did = atid
78 else:
79 return render_template("error.html", message="invalid did or handle"), 400
80
81 if _is_did_blocked(did):
82 return render_template("error.html", message="profile not found"), 404
83
84 pds = await resolve_pds_from_did(client, did=did, kv=pdskv, reload=reload)
85 if pds is None:
86 return render_template("error.html", message="pds not found"), 404
87 (profile, _), link_sections = await asyncio.gather(
88 load_profile(client, pds, did, reload=reload),
89 load_links(client, pds, did, reload=reload),
90 )
91
92 if profile is None:
93 profile = {}
94 if link_sections is None:
95 link_sections = []
96
97 if reload:
98 # remove the ?reload parameter
99 return redirect(request.path)
100
101 if handle:
102 profile["handle"] = handle
103 athref = f"at://{did}/at.ligo.actor.links/self"
104 return render_template(
105 "profile.html",
106 did=did,
107 profile=profile,
108 sections=link_sections,
109 athref=athref,
110 )
111
112
113@app.get("/login")
114async def page_login():
115 if await get_user() is not None:
116 return redirect("/editor")
117 config = Config(app)
118 auth_servers = config.auth_servers()
119 if app.debug:
120 auth_servers.append(AuthServer("pds.rip", "https://pds.rip"))
121 return render_template("login.html", auth_servers=auth_servers)
122
123
124@app.post("/login")
125def auth_login():
126 value = request.form.get("username") or request.form.get("authserver")
127 if value and value[0] == "@":
128 value = value[1:]
129 if not value:
130 return redirect(url_for("page_login"), 303)
131 return redirect(url_for("oauth.oauth_start", username_or_authserver=value), 303)
132
133
134@app.route("/auth/logout")
135def auth_logout():
136 session.clear()
137 return redirect(url_for("page_login"), 303)
138
139
140@app.get("/editor")
141async def page_editor():
142 user = await get_user()
143 if user is None:
144 return redirect("/login", 302)
145
146 did: str = user.did
147 pds: str = user.pds_url
148 handle: str | None = user.handle
149
150 async with ClientSession() as client:
151 (profile, from_bluesky), link_sections = await asyncio.gather(
152 load_profile(client, pds, did),
153 load_links(client, pds, did),
154 )
155
156 links = []
157 if link_sections:
158 links = link_sections[0].links
159
160 return render_template(
161 "editor.html",
162 handle=handle,
163 profile=profile,
164 profile_from_bluesky=from_bluesky,
165 links=json.dumps(links),
166 )
167
168
169@app.post("/editor/profile")
170async def post_editor_profile():
171 user = await get_user()
172 if user is None:
173 url = url_for("auth_logout")
174 return htmx_response(redirect=url) if htmx else redirect(url, 303)
175
176 display_name = request.form.get("displayName")
177 description = request.form.get("description", "")
178 if not display_name:
179 return redirect("/editor", 303)
180
181 record = {
182 "$type": "at.ligo.actor.profile",
183 "displayName": display_name,
184 "description": description,
185 }
186
187 success = await put_record(
188 user=user,
189 pds=user.pds_url,
190 repo=user.did,
191 collection="at.ligo.actor.profile",
192 rkey="self",
193 record=record,
194 )
195
196 if success:
197 kv = KV(app, app.logger, "profile_from_did")
198 kv.set(user.did, json.dumps(record))
199 else:
200 app.logger.warning("log out user for now")
201 url = url_for("auth_logout")
202 return htmx_response(redirect=url) if htmx else redirect(url, 303)
203
204 if htmx:
205 return htmx_response(
206 render_template("_editor_profile.html", profile=record),
207 reswap="outerHTML",
208 )
209
210 return redirect(url_for("page_editor"), 303)
211
212
213@app.post("/editor/links")
214async def post_editor_links():
215 user = await get_user()
216 if user is None:
217 url = url_for("auth_logout")
218 return htmx_response(redirect=url) if htmx else redirect(url, 303)
219
220 links: list[dict[str, str]] = []
221 hrefs = request.form.getlist("link-href")
222 titles = request.form.getlist("link-title")
223 subtitles = request.form.getlist("link-subtitle")
224 backgrounds = request.form.getlist("link-background-color")
225 for href, title, subtitle, background in zip(hrefs, titles, subtitles, backgrounds):
226 if not href or not title or not background:
227 break
228 link: dict[str, str] = {
229 "href": href,
230 "title": title,
231 "backgroundColor": background,
232 }
233 if subtitle:
234 link["subtitle"] = subtitle
235 links.append(link)
236
237 record = {
238 "$type": "at.ligo.actor.links",
239 "sections": [
240 {
241 "title": "",
242 "links": links,
243 }
244 ],
245 }
246
247 success = await put_record(
248 user=user,
249 pds=user.pds_url,
250 repo=user.did,
251 collection="at.ligo.actor.links",
252 rkey="self",
253 record=record,
254 )
255
256 if success:
257 kv = KV(app, app.logger, "links_from_did")
258 kv.set(user.did, json.dumps(record))
259 else:
260 app.logger.warning("log out user for now")
261 url = url_for("auth_logout")
262 return htmx_response(redirect=url) if htmx else redirect(url, 303)
263
264 if htmx:
265 return htmx_response(
266 render_template("_editor_links.html", links=links),
267 reswap="outerHTML",
268 )
269
270 return redirect(url_for("page_editor"), 303)
271
272
273@app.get("/terms")
274def page_terms():
275 return render_template("terms.html")
276
277
278@app.get("/.well-known/oauth-client-metadata.json")
279def well_known_oauth_metadata():
280 response = oauth_metadata()
281 return response, 200, {"Access-Control-Allow-Origin": "*"}
282
283
284class LinkSection(NamedTuple):
285 title: str
286 links: list[dict[str, str]]
287
288
289async def load_links(
290 client: ClientSession,
291 pds: str,
292 did: str,
293 reload: bool = False,
294) -> list[LinkSection] | None:
295 kv = KV(app, app.logger, "links_from_did")
296 record_json = kv.get(did)
297
298 if record_json is not None and not reload:
299 parsed = json.loads(record_json)
300 return _links_or_sections(parsed)
301
302 record = await get_record(client, pds, did, "at.ligo.actor.links", "self")
303 if record is None:
304 return None
305
306 kv.set(did, value=json.dumps(record))
307 return _links_or_sections(record)
308
309
310def _links_or_sections(raw: dict[str, Any]) -> list[LinkSection] | None:
311 if "sections" in raw:
312 return list(map(lambda s: LinkSection(**s), raw["sections"]))
313 elif "links" in raw:
314 return [LinkSection("", raw["links"])]
315 else:
316 return None
317
318
319async def load_profile(
320 client: ClientSession,
321 pds: str,
322 did: str,
323 fallback_with_bluesky: bool = True,
324 reload: bool = False,
325) -> tuple[dict[str, str] | None, bool]:
326 kv = KV(app, app.logger, "profile_from_did")
327 record_json = kv.get(did)
328
329 if record_json is not None and not reload:
330 return json.loads(record_json), False
331
332 (record, bsky_record) = await asyncio.gather(
333 get_record(client, pds, did, "at.ligo.actor.profile", "self"),
334 get_record(client, pds, did, "app.bsky.actor.profile", "self"),
335 )
336
337 from_bluesky = False
338 if record is None and fallback_with_bluesky:
339 record = bsky_record
340 from_bluesky = True
341
342 if record is not None:
343 kv.set(did, value=json.dumps(record))
344
345 return record, from_bluesky
346
347
348# TODO: move to .atproto
349async def put_record(
350 user: OAuthSession,
351 pds: PdsUrl,
352 repo: str,
353 collection: str,
354 rkey: str,
355 record: dict[str, Any],
356) -> bool:
357 """Writes the record onto the users PDS. Returns bool for success."""
358
359 endpoint = f"{pds}/xrpc/com.atproto.repo.putRecord"
360 body = {
361 "repo": repo,
362 "collection": collection,
363 "rkey": rkey,
364 "record": record,
365 }
366
367 def update_dpop_pds_nonce(nonce: str):
368 session_ = user._replace(dpop_pds_nonce=nonce)
369 save_auth_session(session, session_)
370
371 response = await pds_authed_req(
372 method="POST",
373 url=endpoint,
374 body=body,
375 user=user,
376 update_dpop_pds_nonce=update_dpop_pds_nonce,
377 )
378
379 if not response.ok:
380 app.logger.warning(f"put_record failed with status {response.status}")
381 app.logger.warning(await response.text())
382
383 return response.ok
384
385
386def _is_did_blocked(did: str) -> bool:
387 kv = KV(app, app.logger, "blockeddids")
388 return kv.get(did) is not None