search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 300 lines 9.7 kB view raw
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""
Backfill cover_image column for existing documents.

Fetches records from their PDS and extracts coverImage blob CIDs.
For leaflet documents, falls back to the first image block CID.

Usage:
    ./scripts/backfill-cover-images                  # all platforms
    ./scripts/backfill-cover-images --platform pckt  # specific platform
    ./scripts/backfill-cover-images --dry-run        # preview only
    ./scripts/backfill-cover-images --limit 50       # limit batch size
"""

import argparse
import os
import sys
import time

import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Turso connection settings, loaded from the environment / .env file."""

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )
    turso_url: str  # e.g. libsql://<db>.turso.io
    turso_token: str  # bearer token for the Turso HTTP API

    @property
    def turso_host(self) -> str:
        """Hostname for the HTTP API: turso_url with any libsql:// scheme stripped."""
        url = self.turso_url
        if url.startswith("libsql://"):
            url = url[len("libsql://"):]
        return url


def _encode_args(args: list) -> list[dict]:
    """Encode Python values as typed statement args for the Turso pipeline API.

    None -> null; int/bool -> integer (so e.g. `LIMIT ?` binds a real integer
    instead of text); everything else is stringified as text.
    """
    encoded: list[dict] = []
    for a in args:
        if a is None:
            encoded.append({"type": "null"})
        elif isinstance(a, bool):
            # bool is a subclass of int; store as 0/1
            encoded.append({"type": "integer", "value": str(int(a))})
        elif isinstance(a, int):
            # previously sent as text; integer is the correct Hrana type
            encoded.append({"type": "integer", "value": str(a)})
        else:
            encoded.append({"type": "text", "value": str(a)})
    return encoded


def _pipeline_post(settings: Settings, sql: str, args: list | None) -> httpx.Response:
    """POST a single execute+close request pair to the Turso v2 pipeline API.

    Shared transport for turso_query/turso_exec; returns the raw response
    without inspecting the status code.
    """
    stmt: dict = {"sql": sql}
    if args:
        stmt["args"] = _encode_args(args)
    return httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {settings.turso_token}",
            "Content-Type": "application/json",
        },
        json={"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]},
        timeout=30,
    )


def turso_query(settings: Settings, sql: str, args: list | None = None) -> list[dict]:
    """Run a statement and return result rows as dicts keyed by column name.

    Null cells become None; other cells keep the string value the HTTP API
    returns. Raises RuntimeError on a statement-level error, and
    httpx.HTTPStatusError on an HTTP-level failure.
    """
    response = _pipeline_post(settings, sql, args)
    response.raise_for_status()
    data = response.json()

    result = data["results"][0]
    if result.get("type") == "error":
        raise RuntimeError(f"turso error: {result['error']}")

    resp = result["response"]["result"]
    cols = [c["name"] for c in resp["cols"]]
    rows = []
    for row in resp["rows"]:
        rows.append(
            {
                cols[i]: cell["value"] if cell["type"] != "null" else None
                for i, cell in enumerate(row)
            }
        )
    return rows


def turso_exec(settings: Settings, sql: str, args: list | None = None) -> None:
    """Run a write statement; print the error body to stderr and raise on failure."""
    response = _pipeline_post(settings, sql, args)
    if response.status_code != 200:
        print(f"turso error: {response.text}", file=sys.stderr)
    response.raise_for_status()


# --- PDS helpers ---

# DID -> PDS endpoint, memoized across documents (only successful lookups cached)
_pds_cache: dict[str, str] = {}


def get_pds_endpoint(did: str) -> str | None:
    """Resolve a DID to its PDS service endpoint via plc.directory.

    Returns None (after printing a warning) on lookup failure, or silently
    when the DID document lists no AtprotoPersonalDataServer service.
    """
    if did in _pds_cache:
        return _pds_cache[did]
    try:
        resp = httpx.get(f"https://plc.directory/{did}", timeout=10)
        resp.raise_for_status()
        data = resp.json()
        for service in data.get("service", []):
            if service.get("type") == "AtprotoPersonalDataServer":
                _pds_cache[did] = service["serviceEndpoint"]
                return _pds_cache[did]
    except Exception as e:
        print(f" warning: failed to resolve PDS for {did}: {e}", file=sys.stderr)
    return None


def get_record(pds: str, did: str, collection: str, rkey: str) -> dict | None:
    """Fetch a record value via com.atproto.repo.getRecord.

    Returns None when the record is missing (HTTP 400/404) or on any
    request error (warning printed to stderr).
    """
    try:
        resp = httpx.get(
            f"{pds}/xrpc/com.atproto.repo.getRecord",
            params={"repo": did, "collection": collection, "rkey": rkey},
            timeout=10,
        )
        if resp.status_code in (400, 404):
            return None
        resp.raise_for_status()
        return resp.json().get("value")
    except Exception as e:
        print(f" warning: failed to fetch {collection}/{rkey}: {e}", file=sys.stderr)
        return None
source_collection fails 139FALLBACK_COLLECTIONS: dict[str, list[str]] = { 140 "pckt": ["blog.pckt.document"], 141 "offprint": ["app.offprint.document.article"], 142 "greengale": ["app.greengale.document"], 143 "leaflet": ["pub.leaflet.document"], 144} 145 146 147def get_record_with_fallbacks(pds: str, did: str, collection: str, rkey: str, platform: str) -> dict | None: 148 """Try source_collection first, then platform-native collections.""" 149 value = get_record(pds, did, collection, rkey) 150 if value: 151 return value 152 for alt in FALLBACK_COLLECTIONS.get(platform, []): 153 if alt != collection: 154 value = get_record(pds, did, alt, rkey) 155 if value: 156 return value 157 return None 158 159 160# --- cover image extraction (mirrors extractor.zig logic) --- 161 162def extract_cover_image(value: dict) -> str | None: 163 """Extract cover image CID from a record value.""" 164 # try coverImage.ref.$link (site.standard/pckt/offprint/greengale) 165 cover = value.get("coverImage") 166 if isinstance(cover, dict): 167 ref = cover.get("ref") 168 if isinstance(ref, dict): 169 link = ref.get("$link") 170 if link: 171 return link 172 173 # fallback: first image block in leaflet pages 174 return extract_first_image_cid(value) 175 176 177def extract_first_image_cid(value: dict) -> str | None: 178 """Extract first image blob CID from leaflet page blocks.""" 179 # pages at top level or nested in content 180 pages = value.get("pages") 181 if not isinstance(pages, list): 182 content = value.get("content") 183 if isinstance(content, dict): 184 pages = content.get("pages") 185 if not isinstance(pages, list): 186 return None 187 188 for page in pages: 189 if not isinstance(page, dict): 190 continue 191 blocks = page.get("blocks", []) 192 for wrapper in blocks: 193 if not isinstance(wrapper, dict): 194 continue 195 block = wrapper.get("block", {}) 196 if not isinstance(block, dict): 197 continue 198 if block.get("$type") == "pub.leaflet.blocks.image": 199 image = block.get("image") 
200 if isinstance(image, dict): 201 ref = image.get("ref") 202 if isinstance(ref, dict): 203 link = ref.get("$link") 204 if link: 205 return link 206 return None 207 208 209def collection_for_platform(platform: str) -> str: 210 return { 211 "leaflet": "pub.leaflet.document", 212 "pckt": "site.standard.document", 213 "offprint": "site.standard.document", 214 "greengale": "site.standard.document", 215 "other": "site.standard.document", 216 "whitewind": "com.whtwnd.blog.entry", 217 }.get(platform, "site.standard.document") 218 219 220def main(): 221 parser = argparse.ArgumentParser(description="Backfill cover_image for existing documents") 222 parser.add_argument("--platform", help="Only backfill this platform") 223 parser.add_argument("--limit", type=int, default=500, help="Max documents to process (default 500)") 224 parser.add_argument("--dry-run", action="store_true", help="Preview without writing") 225 args = parser.parse_args() 226 227 try: 228 settings = Settings() # type: ignore 229 except Exception as e: 230 print(f"error loading settings: {e}", file=sys.stderr) 231 print("required env vars: TURSO_URL, TURSO_TOKEN", file=sys.stderr) 232 sys.exit(1) 233 234 # query docs missing cover_image 235 where = "WHERE (cover_image IS NULL OR cover_image = '')" 236 sql_args: list = [] 237 if args.platform: 238 where += " AND platform = ?" 239 sql_args.append(args.platform) 240 241 sql = f"SELECT uri, did, rkey, platform, source_collection, title FROM documents {where} LIMIT ?" 
242 sql_args.append(args.limit) 243 244 print(f"querying documents missing cover_image...") 245 docs = turso_query(settings, sql, sql_args) 246 print(f"found {len(docs)} documents to check") 247 248 if not docs: 249 return 250 251 updated = 0 252 skipped = 0 253 errors = 0 254 255 for i, doc in enumerate(docs): 256 did = doc["did"] 257 rkey = doc["rkey"] 258 platform = doc["platform"] 259 collection = doc.get("source_collection") or collection_for_platform(platform) 260 title = (doc.get("title") or "")[:50] 261 262 # resolve PDS 263 pds = get_pds_endpoint(did) 264 if not pds: 265 errors += 1 266 continue 267 268 # fetch record (try source_collection, then platform-native fallbacks) 269 value = get_record_with_fallbacks(pds, did, collection, rkey, platform) 270 if not value: 271 skipped += 1 272 continue 273 274 # extract cover image 275 cid = extract_cover_image(value) 276 if not cid: 277 skipped += 1 278 continue 279 280 if args.dry_run: 281 print(f" [{i+1}/{len(docs)}] would set cover_image={cid[:20]}... for {title}...") 282 else: 283 turso_exec( 284 settings, 285 "UPDATE documents SET cover_image = ? WHERE uri = ?", 286 [cid, doc["uri"]], 287 ) 288 print(f" [{i+1}/{len(docs)}] {title}... -> {cid[:20]}...") 289 290 updated += 1 291 292 # be nice to PDS servers 293 if (i + 1) % 50 == 0: 294 time.sleep(1) 295 296 print(f"\ndone! {updated} updated, {skipped} skipped (no image), {errors} errors") 297 298 299if __name__ == "__main__": 300 main()