my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add rebuild-atlas flow (every 6h)

builds the 2D semantic map (UMAP + HDBSCAN on tpuf vectors) and
deploys to Cloudflare Pages via Direct Upload API.

secrets: tpuf-token, cloudflare-api-token

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 9aecaa99 934ea1a4

+154
+148
flows/atlas.py
"""
Rebuild the atlas (2D semantic map) and deploy to Cloudflare Pages.

Clones leaflet-search, runs the build-atlas script (UMAP + HDBSCAN),
then deploys the site to Cloudflare Pages via the Direct Upload API.

Requires:
  - Secret block "tpuf-token" (turbopuffer API key)
  - Secret block "cloudflare-api-token" (Pages edit permission)
"""

import hashlib
import json
import os
import subprocess
import tempfile
from pathlib import Path

import httpx
from prefect import flow, get_run_logger, task
from prefect.blocks.system import Secret

REPO_URL = "https://github.com/zzstoatzz/leaflet-search.git"
CF_API = "https://api.cloudflare.com/client/v4"
CF_ACCOUNT_ID = "8feb33b5fb57ce2bc093bc6f4141f40a"
CF_PROJECT = "leaflet-search"

# Maximum number of files sent in a single asset-upload request.
UPLOAD_BATCH_SIZE = 50


@task
def clone_repo(dest: Path) -> Path:
    """Shallow-clone leaflet-search to get site files + build script.

    Args:
        dest: Directory to clone the repository into.

    Returns:
        ``dest``, unchanged, so the result can be passed straight on.

    Raises:
        subprocess.CalledProcessError: if ``git clone`` exits non-zero.
    """
    logger = get_run_logger()
    try:
        subprocess.run(
            ["git", "clone", "--depth", "1", REPO_URL, str(dest)],
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as err:
        # Output is captured, so git's own explanation would otherwise be
        # invisible in the flow run logs.
        logger.error(f"git clone failed:\n{err.stderr}")
        raise
    return dest


@task
def build_atlas(repo_dir: Path, tpuf_key: str) -> Path:
    """Run the build-atlas script. Returns path to atlas.json.

    Args:
        repo_dir: Root of the cloned repository.
        tpuf_key: API key exported to the script as ``TURBOPUFFER_API_KEY``.

    Returns:
        Path of the generated ``site/atlas.json`` inside ``repo_dir``.

    Raises:
        RuntimeError: if the script exits non-zero (stderr is included).
        subprocess.TimeoutExpired: if the script runs longer than 300 s.
    """
    logger = get_run_logger()
    output = repo_dir / "site" / "atlas.json"

    result = subprocess.run(
        [
            "uv", "run", "--script", str(repo_dir / "scripts" / "build-atlas"),
            "--output", str(output),
        ],
        # Inherit the worker environment and add the key for the script.
        env={**os.environ, "TURBOPUFFER_API_KEY": tpuf_key},
        capture_output=True,
        text=True,
        timeout=300,
    )
    if result.returncode != 0:
        raise RuntimeError(f"build-atlas failed:\n{result.stderr}")

    # Relay the script's stdout line by line into the run logs.
    for line in result.stdout.strip().splitlines():
        logger.info(line)

    logger.info(f"atlas.json: {output.stat().st_size / 1024:.0f} KB")
    return output


def _missing_hashes(payload) -> set[str]:
    """Extract the set of hashes from a check-missing response body.

    Accepts either a bare JSON list of hashes or an object that carries the
    list under ``"result"`` (the shape the create-deployment response in this
    module is read with).
    NOTE(review): which shape the endpoint really returns is not visible
    here - confirm against the Cloudflare API and drop the unused branch.
    """
    if isinstance(payload, dict):
        payload = payload.get("result") or []
    return set(payload)


def _upload_assets(
    hashes: list[str],
    content_by_hash: dict[str, bytes],
    headers: dict[str, str],
) -> None:
    """POST the given assets as multipart uploads, UPLOAD_BATCH_SIZE at a time."""
    for start in range(0, len(hashes), UPLOAD_BATCH_SIZE):
        batch = [
            (h, ("blob", content_by_hash[h], "application/octet-stream"))
            for h in hashes[start:start + UPLOAD_BATCH_SIZE]
        ]
        httpx.post(
            f"{CF_API}/pages/assets/upload",
            headers=headers,
            files=batch,
            timeout=120,
        ).raise_for_status()


@task
def deploy_to_pages(site_dir: Path, api_token: str) -> str:
    """Deploy site/ to Cloudflare Pages via Direct Upload API.

    Args:
        site_dir: Directory whose files (recursively) make up the site.
        api_token: Cloudflare API token used to create the deployment.

    Returns:
        The deployment URL reported by the API, or ``""`` if none is given.

    Raises:
        httpx.HTTPStatusError: if any API request returns an error status.
    """
    logger = get_run_logger()
    headers = {"Authorization": f"Bearer {api_token}"}

    # build manifest: path -> truncated SHA-256
    manifest: dict[str, str] = {}
    content_by_hash: dict[str, bytes] = {}

    for path in sorted(site_dir.rglob("*")):
        if path.is_dir() or path.name.startswith("."):
            continue
        content = path.read_bytes()
        h = hashlib.sha256(content).hexdigest()[:32]
        # as_posix() keeps manifest keys as URL-style paths on every OS.
        rel = "/" + path.relative_to(site_dir).as_posix()
        manifest[rel] = h
        content_by_hash[h] = content

    logger.info(f"deploying {len(manifest)} files")

    # create deployment
    resp = httpx.post(
        f"{CF_API}/accounts/{CF_ACCOUNT_ID}/pages/projects/{CF_PROJECT}/deployments",
        headers={**headers, "Content-Type": "application/json"},
        json={"manifest": manifest, "branch": "main"},
        timeout=60,
    )
    resp.raise_for_status()
    deployment = resp.json()["result"]
    jwt = deployment["jwt"]
    logger.info(f"deployment {deployment['id']} created")

    # check which files need uploading
    jwt_headers = {"Authorization": f"Bearer {jwt}"}
    resp = httpx.post(
        f"{CF_API}/pages/assets/check-missing",
        headers=jwt_headers,
        json={"hashes": list(content_by_hash)},
        timeout=30,
    )
    resp.raise_for_status()
    missing = _missing_hashes(resp.json())
    # Count unique hashes, not paths: files with identical content share one.
    cached = len(content_by_hash) - len(missing)
    logger.info(f"{len(missing)} files to upload ({cached} cached)")

    # upload missing files (sorted only to make batch contents deterministic)
    if missing:
        _upload_assets(sorted(missing), content_by_hash, jwt_headers)

    url = deployment.get("url", "")
    logger.info(f"deployed: {url}")
    return url


@flow(name="rebuild-atlas", log_prints=True)
def rebuild_atlas():
    """Rebuild the 2D semantic map and deploy to Cloudflare Pages.

    Loads both secrets, then clones, builds and deploys inside a temporary
    directory that is removed when the flow finishes.
    """
    tpuf_key = Secret.load("tpuf-token").get()
    cf_token = Secret.load("cloudflare-api-token").get()

    with tempfile.TemporaryDirectory() as tmpdir:
        repo_dir = clone_repo(Path(tmpdir) / "repo")
        build_atlas(repo_dir, tpuf_key)
        deploy_to_pages(repo_dir / "site", cf_token)


if __name__ == "__main__":
    rebuild_atlas()
+6
prefect.yaml
··· 91 91 work_pool: *k8s 92 92 schedules: 93 93 - cron: "0 13 * * *" # 8am CT daily 94 + 95 + - name: rebuild-atlas 96 + entrypoint: flows/atlas.py:rebuild_atlas 97 + work_pool: *k8s 98 + schedules: 99 + - cron: "0 */6 * * *" # every 6 hours