search for standard sites
pub-search.waow.tech
search
zig
blog
atproto
1#!/usr/bin/env -S uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = ["httpx", "pydantic-settings"]
5# ///
6"""
7Backfill cover_image column for existing documents.
8
9Fetches records from their PDS and extracts coverImage blob CIDs.
10For leaflet documents, falls back to the first image block CID.
11
12Usage:
13 ./scripts/backfill-cover-images # all platforms
14 ./scripts/backfill-cover-images --platform pckt # specific platform
15 ./scripts/backfill-cover-images --dry-run # preview only
16 ./scripts/backfill-cover-images --limit 50 # limit batch size
17"""
18
19import argparse
20import os
21import sys
22import time
23
24import httpx
25from pydantic_settings import BaseSettings, SettingsConfigDict
26
27
class Settings(BaseSettings):
    """Turso connection settings loaded by pydantic-settings.

    Values come from the environment and from an env file whose path is
    ``$ENV_FILE`` (default ``.env``); extra keys in that file are ignored.
    """

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )
    turso_url: str  # database URL; may carry a libsql:// or https:// scheme
    turso_token: str  # used as the Bearer token on pipeline requests

    @property
    def turso_host(self) -> str:
        """Return ``turso_url`` reduced to ``host[/path]`` for HTTPS requests.

        Callers format ``https://{turso_host}/v2/pipeline``, so any scheme is
        removed here. Surrounding whitespace and trailing slashes are removed
        as well, so the appended path never doubles a slash.
        """
        url = self.turso_url.strip()
        # https:// must be stripped too, otherwise callers build "https://https://..."
        for scheme in ("libsql://", "https://"):
            if url.startswith(scheme):
                url = url[len(scheme):]
                break
        return url.rstrip("/")
41
42
43def turso_query(settings: Settings, sql: str, args: list | None = None) -> list[dict]:
44 stmt: dict = {"sql": sql}
45 if args:
46 stmt["args"] = []
47 for a in args:
48 if a is None:
49 stmt["args"].append({"type": "null"})
50 else:
51 stmt["args"].append({"type": "text", "value": str(a)})
52
53 response = httpx.post(
54 f"https://{settings.turso_host}/v2/pipeline",
55 headers={
56 "Authorization": f"Bearer {settings.turso_token}",
57 "Content-Type": "application/json",
58 },
59 json={"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]},
60 timeout=30,
61 )
62 response.raise_for_status()
63 data = response.json()
64
65 result = data["results"][0]
66 if result.get("type") == "error":
67 raise RuntimeError(f"turso error: {result['error']}")
68
69 resp = result["response"]["result"]
70 cols = [c["name"] for c in resp["cols"]]
71 rows = []
72 for row in resp["rows"]:
73 rows.append({cols[i]: cell["value"] if cell["type"] != "null" else None for i, cell in enumerate(row)})
74 return rows
75
76
77def turso_exec(settings: Settings, sql: str, args: list | None = None) -> None:
78 stmt: dict = {"sql": sql}
79 if args:
80 stmt["args"] = []
81 for a in args:
82 if a is None:
83 stmt["args"].append({"type": "null"})
84 else:
85 stmt["args"].append({"type": "text", "value": str(a)})
86
87 response = httpx.post(
88 f"https://{settings.turso_host}/v2/pipeline",
89 headers={
90 "Authorization": f"Bearer {settings.turso_token}",
91 "Content-Type": "application/json",
92 },
93 json={"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]},
94 timeout=30,
95 )
96 if response.status_code != 200:
97 print(f"turso error: {response.text}", file=sys.stderr)
98 response.raise_for_status()
99
100
101# --- PDS helpers ---
102
103_pds_cache: dict[str, str] = {}
104
105
106def get_pds_endpoint(did: str) -> str | None:
107 if did in _pds_cache:
108 return _pds_cache[did]
109 try:
110 resp = httpx.get(f"https://plc.directory/{did}", timeout=10)
111 resp.raise_for_status()
112 data = resp.json()
113 for service in data.get("service", []):
114 if service.get("type") == "AtprotoPersonalDataServer":
115 _pds_cache[did] = service["serviceEndpoint"]
116 return _pds_cache[did]
117 except Exception as e:
118 print(f" warning: failed to resolve PDS for {did}: {e}", file=sys.stderr)
119 return None
120
121
def get_record(pds: str, did: str, collection: str, rkey: str) -> dict | None:
    """Fetch a record's ``value`` from *pds* via com.atproto.repo.getRecord.

    Returns None when the PDS answers 400 or 404, which is treated as
    "record not present". Also returns None, after printing a warning to
    stderr, when the request, status check or JSON handling fails.
    """
    endpoint = f"{pds}/xrpc/com.atproto.repo.getRecord"
    query = {"repo": did, "collection": collection, "rkey": rkey}
    try:
        response = httpx.get(endpoint, params=query, timeout=10)
        if response.status_code not in (400, 404):
            response.raise_for_status()
            return response.json().get("value")
    except Exception as exc:
        print(f" warning: failed to fetch {collection}/{rkey}: {exc}", file=sys.stderr)
        return None
    # 400/404 fall through here without a warning
    return None
136
137
138# platform-native collections to try when source_collection fails
139FALLBACK_COLLECTIONS: dict[str, list[str]] = {
140 "pckt": ["blog.pckt.document"],
141 "offprint": ["app.offprint.document.article"],
142 "greengale": ["app.greengale.document"],
143 "leaflet": ["pub.leaflet.document"],
144}
145
146
147def get_record_with_fallbacks(pds: str, did: str, collection: str, rkey: str, platform: str) -> dict | None:
148 """Try source_collection first, then platform-native collections."""
149 value = get_record(pds, did, collection, rkey)
150 if value:
151 return value
152 for alt in FALLBACK_COLLECTIONS.get(platform, []):
153 if alt != collection:
154 value = get_record(pds, did, alt, rkey)
155 if value:
156 return value
157 return None
158
159
160# --- cover image extraction (mirrors extractor.zig logic) ---
161
162def extract_cover_image(value: dict) -> str | None:
163 """Extract cover image CID from a record value."""
164 # try coverImage.ref.$link (site.standard/pckt/offprint/greengale)
165 cover = value.get("coverImage")
166 if isinstance(cover, dict):
167 ref = cover.get("ref")
168 if isinstance(ref, dict):
169 link = ref.get("$link")
170 if link:
171 return link
172
173 # fallback: first image block in leaflet pages
174 return extract_first_image_cid(value)
175
176
177def extract_first_image_cid(value: dict) -> str | None:
178 """Extract first image blob CID from leaflet page blocks."""
179 # pages at top level or nested in content
180 pages = value.get("pages")
181 if not isinstance(pages, list):
182 content = value.get("content")
183 if isinstance(content, dict):
184 pages = content.get("pages")
185 if not isinstance(pages, list):
186 return None
187
188 for page in pages:
189 if not isinstance(page, dict):
190 continue
191 blocks = page.get("blocks", [])
192 for wrapper in blocks:
193 if not isinstance(wrapper, dict):
194 continue
195 block = wrapper.get("block", {})
196 if not isinstance(block, dict):
197 continue
198 if block.get("$type") == "pub.leaflet.blocks.image":
199 image = block.get("image")
200 if isinstance(image, dict):
201 ref = image.get("ref")
202 if isinstance(ref, dict):
203 link = ref.get("$link")
204 if link:
205 return link
206 return None
207
208
209def collection_for_platform(platform: str) -> str:
210 return {
211 "leaflet": "pub.leaflet.document",
212 "pckt": "site.standard.document",
213 "offprint": "site.standard.document",
214 "greengale": "site.standard.document",
215 "other": "site.standard.document",
216 "whitewind": "com.whtwnd.blog.entry",
217 }.get(platform, "site.standard.document")
218
219
220def main():
221 parser = argparse.ArgumentParser(description="Backfill cover_image for existing documents")
222 parser.add_argument("--platform", help="Only backfill this platform")
223 parser.add_argument("--limit", type=int, default=500, help="Max documents to process (default 500)")
224 parser.add_argument("--dry-run", action="store_true", help="Preview without writing")
225 args = parser.parse_args()
226
227 try:
228 settings = Settings() # type: ignore
229 except Exception as e:
230 print(f"error loading settings: {e}", file=sys.stderr)
231 print("required env vars: TURSO_URL, TURSO_TOKEN", file=sys.stderr)
232 sys.exit(1)
233
234 # query docs missing cover_image
235 where = "WHERE (cover_image IS NULL OR cover_image = '')"
236 sql_args: list = []
237 if args.platform:
238 where += " AND platform = ?"
239 sql_args.append(args.platform)
240
241 sql = f"SELECT uri, did, rkey, platform, source_collection, title FROM documents {where} LIMIT ?"
242 sql_args.append(args.limit)
243
244 print(f"querying documents missing cover_image...")
245 docs = turso_query(settings, sql, sql_args)
246 print(f"found {len(docs)} documents to check")
247
248 if not docs:
249 return
250
251 updated = 0
252 skipped = 0
253 errors = 0
254
255 for i, doc in enumerate(docs):
256 did = doc["did"]
257 rkey = doc["rkey"]
258 platform = doc["platform"]
259 collection = doc.get("source_collection") or collection_for_platform(platform)
260 title = (doc.get("title") or "")[:50]
261
262 # resolve PDS
263 pds = get_pds_endpoint(did)
264 if not pds:
265 errors += 1
266 continue
267
268 # fetch record (try source_collection, then platform-native fallbacks)
269 value = get_record_with_fallbacks(pds, did, collection, rkey, platform)
270 if not value:
271 skipped += 1
272 continue
273
274 # extract cover image
275 cid = extract_cover_image(value)
276 if not cid:
277 skipped += 1
278 continue
279
280 if args.dry_run:
281 print(f" [{i+1}/{len(docs)}] would set cover_image={cid[:20]}... for {title}...")
282 else:
283 turso_exec(
284 settings,
285 "UPDATE documents SET cover_image = ? WHERE uri = ?",
286 [cid, doc["uri"]],
287 )
288 print(f" [{i+1}/{len(docs)}] {title}... -> {cid[:20]}...")
289
290 updated += 1
291
292 # be nice to PDS servers
293 if (i + 1) % 50 == 0:
294 time.sleep(1)
295
296 print(f"\ndone! {updated} updated, {skipped} skipped (no image), {errors} errors")
297
298
if __name__ == "__main__":
    # run the backfill only when executed as a script, never on import;
    # main() returns None, so the process exits with status 0 on success
    raise SystemExit(main())