Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 366 lines 11 kB view raw
1import type { Directory, Entry, File, Record as FsRecord } from '@wispplace/lexicons/types/place/wisp/fs'; 2import type { Record as SubfsRecord } from '@wispplace/lexicons/types/place/wisp/subfs'; 3import { extractBlobCid, resolveDid, getPdsForDid } from '@wispplace/atproto-utils'; 4import { sanitizePath } from '@wispplace/fs-utils'; 5import { existsSync, mkdirSync, writeFileSync, rmSync, renameSync, readFileSync } from 'fs'; 6import { dirname, join } from 'path'; 7import { gunzipSync } from 'zlib'; 8import { createSpinner, formatBytes, pc } from '../lib/progress.ts'; 9import { loadMetadata, saveMetadata, type SiteMetadata } from '../lib/metadata.ts'; 10 11const MAX_CONCURRENT_DOWNLOADS = 20; 12 13export interface PullOptions { 14 site: string; 15 path: string; 16} 17 18async function fetchRecord(pdsEndpoint: string, did: string, collection: string, rkey: string): Promise<any> { 19 const url = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=${encodeURIComponent(collection)}&rkey=${encodeURIComponent(rkey)}`; 20 const res = await fetch(url); 21 if (!res.ok) { 22 throw new Error(`Failed to fetch record: ${res.status}`); 23 } 24 return res.json(); 25} 26 27function extractSubfsUris(directory: Directory, currentPath: string = ''): Array<{ uri: string; path: string }> { 28 const uris: Array<{ uri: string; path: string }> = []; 29 30 for (const entry of directory.entries) { 31 const fullPath = currentPath ? 
`${currentPath}/${entry.name}` : entry.name; 32 33 if ('type' in entry.node) { 34 if (entry.node.type === 'subfs') { 35 const subfsNode = entry.node as any; 36 if (subfsNode.subject) { 37 uris.push({ uri: subfsNode.subject, path: fullPath }); 38 } 39 } else if (entry.node.type === 'directory') { 40 const subUris = extractSubfsUris(entry.node as Directory, fullPath); 41 uris.push(...subUris); 42 } 43 } 44 } 45 46 return uris; 47} 48 49async function expandSubfsNodes( 50 directory: Directory, 51 pdsEndpoint: string, 52 depth: number = 0, 53 subfsCache: Map<string, SubfsRecord | null> = new Map() 54): Promise<Directory> { 55 const MAX_DEPTH = 10; 56 57 if (depth >= MAX_DEPTH) { 58 console.warn('Max subfs expansion depth reached'); 59 return directory; 60 } 61 62 const subfsUris = extractSubfsUris(directory); 63 if (subfsUris.length === 0) { 64 return directory; 65 } 66 67 // Fetch uncached subfs records 68 const uncachedUris = subfsUris.filter(({ uri }) => !subfsCache.has(uri)); 69 70 if (uncachedUris.length > 0) { 71 await Promise.all(uncachedUris.map(async ({ uri }) => { 72 try { 73 const parts = uri.replace('at://', '').split('/'); 74 const did = parts[0]!; 75 const collection = parts[1]!; 76 const rkey = parts[2]!; 77 78 const data = await fetchRecord(pdsEndpoint, did, collection, rkey); 79 subfsCache.set(uri, data.value as SubfsRecord); 80 } catch { 81 subfsCache.set(uri, null); 82 } 83 })); 84 } 85 86 // Build map of path -> entries 87 const subfsMap = new Map<string, Entry[]>(); 88 for (const { uri, path } of subfsUris) { 89 const record = subfsCache.get(uri); 90 if (record?.root?.entries) { 91 subfsMap.set(path, record.root.entries as unknown as Entry[]); 92 } 93 } 94 95 // Replace subfs nodes with their content 96 function replaceSubfsInEntries(entries: Entry[], currentPath: string = ''): Entry[] { 97 const result: Entry[] = []; 98 99 for (const entry of entries) { 100 const fullPath = currentPath ? 
`${currentPath}/${entry.name}` : entry.name; 101 const node = entry.node; 102 103 if ('type' in node && node.type === 'subfs') { 104 const subfsNode = node as any; 105 const isFlat = subfsNode.flat !== false; 106 const subfsEntries = subfsMap.get(fullPath); 107 108 if (subfsEntries) { 109 if (isFlat) { 110 const processedEntries = replaceSubfsInEntries(subfsEntries, currentPath); 111 result.push(...processedEntries); 112 } else { 113 const processedEntries = replaceSubfsInEntries(subfsEntries, fullPath); 114 result.push({ 115 name: entry.name, 116 node: { 117 type: 'directory', 118 entries: processedEntries 119 } as any 120 }); 121 } 122 } else { 123 result.push(entry); 124 } 125 } else if ('type' in node && node.type === 'directory' && 'entries' in node) { 126 result.push({ 127 ...entry, 128 node: { 129 ...node, 130 entries: replaceSubfsInEntries(node.entries, fullPath) 131 } 132 }); 133 } else { 134 result.push(entry); 135 } 136 } 137 138 return result; 139 } 140 141 const partiallyExpanded = { 142 ...directory, 143 entries: replaceSubfsInEntries(directory.entries) 144 }; 145 146 return expandSubfsNodes(partiallyExpanded, pdsEndpoint, depth + 1, subfsCache); 147} 148 149interface FileToDownload { 150 path: string; 151 cid: string; 152 encoding?: 'gzip'; 153 mimeType?: string; 154 base64?: boolean; 155} 156 157function collectFiles( 158 entries: Entry[], 159 pathPrefix: string, 160 existingCids: Record<string, string> 161): { toDownload: FileToDownload[]; toSkip: number } { 162 const toDownload: FileToDownload[] = []; 163 let toSkip = 0; 164 165 function collect(entries: Entry[], currentPath: string) { 166 for (const entry of entries) { 167 const fullPath = currentPath ? 
`${currentPath}/${entry.name}` : entry.name; 168 const node = entry.node; 169 170 if ('type' in node && node.type === 'directory' && 'entries' in node) { 171 collect(node.entries, fullPath); 172 } else if ('type' in node && node.type === 'file' && 'blob' in node) { 173 const fileNode = node as File; 174 const cid = extractBlobCid(fileNode.blob); 175 176 if (!cid) continue; 177 178 if (existingCids[fullPath] === cid) { 179 toSkip++; 180 } else { 181 toDownload.push({ 182 path: fullPath, 183 cid, 184 encoding: fileNode.encoding, 185 mimeType: fileNode.mimeType, 186 base64: fileNode.base64 187 }); 188 } 189 } 190 } 191 } 192 193 collect(entries, pathPrefix); 194 return { toDownload, toSkip }; 195} 196 197async function downloadBlob( 198 pdsEndpoint: string, 199 did: string, 200 file: FileToDownload 201): Promise<Buffer> { 202 const url = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`; 203 const res = await fetch(url); 204 205 if (!res.ok) { 206 throw new Error(`Failed to download blob ${file.cid}: ${res.status}`); 207 } 208 209 let content = Buffer.from(await res.arrayBuffer()); 210 211 // Decode base64 if needed 212 if (file.base64) { 213 const base64String = content.toString('utf-8'); 214 content = Buffer.from(base64String, 'base64'); 215 } 216 217 // Decompress gzip 218 if (file.encoding === 'gzip' && content.length >= 2 && content[0] === 0x1f && content[1] === 0x8b) { 219 try { 220 content = gunzipSync(content); 221 } catch { 222 // Keep original content if decompression fails 223 } 224 } 225 226 return content; 227} 228 229export async function pull( 230 identifier: string, 231 options: PullOptions 232): Promise<void> { 233 const { site, path: outputPath } = options; 234 235 console.log(pc.cyan(`\nPulling ${pc.bold(site)} from ${identifier}\n`)); 236 237 // 1. 
Resolve DID 238 const spinner = createSpinner('Resolving identity...').start(); 239 const did = await resolveDid(identifier); 240 241 if (!did) { 242 spinner.fail('Failed to resolve identity'); 243 throw new Error(`Could not resolve: ${identifier}`); 244 } 245 246 spinner.succeed(`Resolved to ${did}`); 247 248 // 2. Get PDS endpoint 249 const pdsSpinner = createSpinner('Getting PDS endpoint...').start(); 250 const pdsEndpoint = await getPdsForDid(did); 251 252 if (!pdsEndpoint) { 253 pdsSpinner.fail('Failed to get PDS endpoint'); 254 throw new Error(`Could not get PDS for: ${did}`); 255 } 256 257 pdsSpinner.succeed(`PDS: ${pdsEndpoint}`); 258 259 // 3. Fetch site record 260 const recordSpinner = createSpinner('Fetching site record...').start(); 261 let recordData; 262 263 try { 264 recordData = await fetchRecord(pdsEndpoint, did, 'place.wisp.fs', site); 265 } catch { 266 recordSpinner.fail('Site not found'); 267 throw new Error(`Site not found: ${site}`); 268 } 269 270 const record = recordData.value as FsRecord; 271 const recordCid = recordData.cid || ''; 272 recordSpinner.succeed('Fetched site record'); 273 274 // 4. Expand subfs nodes 275 const expandSpinner = createSpinner('Expanding subfs nodes...').start(); 276 const expandedRoot = await expandSubfsNodes(record.root, pdsEndpoint); 277 expandSpinner.succeed('Expanded subfs nodes'); 278 279 // 5. Load existing metadata for incremental updates 280 const existingMetadata = loadMetadata(outputPath); 281 const existingCids = existingMetadata?.fileCids || {}; 282 283 // 6. Collect files to download 284 const { toDownload, toSkip } = collectFiles(expandedRoot.entries, '', existingCids); 285 286 console.log(pc.dim(`Files to download: ${toDownload.length}, unchanged: ${toSkip}`)); 287 288 if (toDownload.length === 0 && toSkip > 0) { 289 console.log(pc.green('\n✓ Site is already up to date\n')); 290 return; 291 } 292 293 // 7. 
Create temp directory 294 const tempDir = `${outputPath}.tmp-${Date.now()}`; 295 mkdirSync(tempDir, { recursive: true }); 296 297 // 8. Download files 298 const downloadSpinner = createSpinner(`Downloading ${toDownload.length} files...`).start(); 299 const newFileCids: Record<string, string> = { ...existingCids }; 300 let downloaded = 0; 301 302 try { 303 for (let i = 0; i < toDownload.length; i += MAX_CONCURRENT_DOWNLOADS) { 304 const batch = toDownload.slice(i, i + MAX_CONCURRENT_DOWNLOADS); 305 306 await Promise.all(batch.map(async (file) => { 307 const content = await downloadBlob(pdsEndpoint, did, file); 308 const filePath = join(tempDir, sanitizePath(file.path)); 309 310 mkdirSync(dirname(filePath), { recursive: true }); 311 writeFileSync(filePath, content); 312 313 newFileCids[file.path] = file.cid; 314 downloaded++; 315 downloadSpinner.text = `Downloading files: ${downloaded}/${toDownload.length}`; 316 })); 317 } 318 319 downloadSpinner.succeed(`Downloaded ${downloaded} files`); 320 321 // 9. Copy unchanged files from existing directory 322 if (toSkip > 0 && existsSync(outputPath)) { 323 const copySpinner = createSpinner(`Copying ${toSkip} unchanged files...`).start(); 324 325 for (const [filePath, cid] of Object.entries(existingCids)) { 326 if (!toDownload.find(f => f.path === filePath)) { 327 const srcPath = join(outputPath, sanitizePath(filePath)); 328 const destPath = join(tempDir, sanitizePath(filePath)); 329 330 if (existsSync(srcPath)) { 331 mkdirSync(dirname(destPath), { recursive: true }); 332 const content = readFileSync(srcPath); 333 writeFileSync(destPath, content); 334 } 335 } 336 } 337 338 copySpinner.succeed(`Copied ${toSkip} unchanged files`); 339 } 340 341 // 10. 
Atomic replace 342 if (existsSync(outputPath)) { 343 const backupPath = `${outputPath}.backup-${Date.now()}`; 344 renameSync(outputPath, backupPath); 345 renameSync(tempDir, outputPath); 346 rmSync(backupPath, { recursive: true, force: true }); 347 } else { 348 renameSync(tempDir, outputPath); 349 } 350 351 // 11. Save metadata 352 const metadata: SiteMetadata = { 353 recordCid, 354 fileCids: newFileCids, 355 lastSync: Date.now() 356 }; 357 saveMetadata(outputPath, metadata); 358 359 console.log(pc.green(`\n✓ Pulled ${site} to ${outputPath}\n`)); 360 361 } catch (err) { 362 // Cleanup temp dir on error 363 rmSync(tempDir, { recursive: true, force: true }); 364 throw err; 365 } 366}