Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
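// The `pull` command: download a published site from its owner's PDS into a
// local directory, reusing unchanged files from a previous pull where possible.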
import type { Directory, Entry, File, Record as FsRecord } from '@wispplace/lexicons/types/place/wisp/fs';
import type { Record as SubfsRecord } from '@wispplace/lexicons/types/place/wisp/subfs';
import { extractBlobCid, resolveDid, getPdsForDid } from '@wispplace/atproto-utils';
import { sanitizePath } from '@wispplace/fs-utils';
import { existsSync, mkdirSync, writeFileSync, rmSync, renameSync, readFileSync } from 'fs';
import { dirname, join } from 'path';
import { gunzipSync } from 'zlib';
import { createSpinner, pc } from '../lib/progress.ts';
import { loadMetadata, saveMetadata, type SiteMetadata } from '../lib/metadata.ts';

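/** Upper bound on the number of blobs fetched in parallel per batch. */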
const MAX_CONCURRENT_DOWNLOADS = 20;

export interface PullOptions {
    site: string;
    path: string;
}

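/**
 * Fetch a record from the PDS via com.atproto.repo.getRecord and return the
 * raw JSON response (including `value` and `cid`).
 */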
async function fetchRecord(pdsEndpoint: string, did: string, collection: string, rkey: string): Promise<any> {
    const url = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=${encodeURIComponent(collection)}&rkey=${encodeURIComponent(rkey)}`;
    const res = await fetch(url);
    if (!res.ok) {
        throw new Error(`Failed to fetch record: ${res.status}`);
    }
    return res.json();
}

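/**
 * Walk a directory tree and collect the AT URI of every subfs node, paired
 * with the path at which it is mounted.
 */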
function extractSubfsUris(directory: Directory, currentPath: string = ''): Array<{ uri: string; path: string }> {
    const uris: Array<{ uri: string; path: string }> = [];

    for (const entry of directory.entries) {
        const fullPath = currentPath ? `${currentPath}/${entry.name}` : entry.name;

        if ('type' in entry.node) {
            if (entry.node.type === 'subfs') {
                const subfsNode = entry.node as any;
                if (subfsNode.subject) {
                    uris.push({ uri: subfsNode.subject, path: fullPath });
                }
            } else if (entry.node.type === 'directory') {
                const subUris = extractSubfsUris(entry.node as Directory, fullPath);
                uris.push(...subUris);
            }
        }
    }

    return uris;
}

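/**
 * Recursively replace subfs nodes with the contents of the records they
 * reference. Fetched records are cached by URI so repeated references cost
 * one fetch, and expansion is capped at MAX_DEPTH to guard against cycles.
 */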
async function expandSubfsNodes(
    directory: Directory,
    pdsEndpoint: string,
    depth: number = 0,
    subfsCache: Map<string, SubfsRecord | null> = new Map()
): Promise<Directory> {
    const MAX_DEPTH = 10;

    if (depth >= MAX_DEPTH) {
        console.warn('Max subfs expansion depth reached');
        return directory;
    }

    const subfsUris = extractSubfsUris(directory);
    if (subfsUris.length === 0) {
        return directory;
    }

    // Fetch uncached subfs records in parallel. Failures are cached as null
    // so a broken reference is not refetched at every recursion depth.
    const uncachedUris = subfsUris.filter(({ uri }) => !subfsCache.has(uri));

    if (uncachedUris.length > 0) {
        await Promise.all(uncachedUris.map(async ({ uri }) => {
            try {
                // AT URI format: at://<did>/<collection>/<rkey>
                const parts = uri.replace('at://', '').split('/');
                const did = parts[0]!;
                const collection = parts[1]!;
                const rkey = parts[2]!;

                // Note: fetches from the site owner's PDS, which assumes the
                // subfs record lives in the same repo as the site record.
                const data = await fetchRecord(pdsEndpoint, did, collection, rkey);
                subfsCache.set(uri, data.value as SubfsRecord);
            } catch {
                subfsCache.set(uri, null);
            }
        }));
    }

    // Build map of mount path -> entries
    const subfsMap = new Map<string, Entry[]>();
    for (const { uri, path } of subfsUris) {
        const record = subfsCache.get(uri);
        if (record?.root?.entries) {
            subfsMap.set(path, record.root.entries as unknown as Entry[]);
        }
    }

    // Replace subfs nodes with their content. A "flat" subfs (the default)
    // splices its entries directly into the parent directory; a non-flat
    // one becomes a directory named after the entry.
    function replaceSubfsInEntries(entries: Entry[], currentPath: string = ''): Entry[] {
        const result: Entry[] = [];

        for (const entry of entries) {
            const fullPath = currentPath ? `${currentPath}/${entry.name}` : entry.name;
            const node = entry.node;

            if ('type' in node && node.type === 'subfs') {
                const subfsNode = node as any;
                const isFlat = subfsNode.flat !== false;
                const subfsEntries = subfsMap.get(fullPath);

                if (subfsEntries) {
                    if (isFlat) {
                        const processedEntries = replaceSubfsInEntries(subfsEntries, currentPath);
                        result.push(...processedEntries);
                    } else {
                        const processedEntries = replaceSubfsInEntries(subfsEntries, fullPath);
                        result.push({
                            name: entry.name,
                            node: {
                                type: 'directory',
                                entries: processedEntries
                            } as any
                        });
                    }
                } else {
                    // Record could not be fetched; keep the unexpanded node
                    result.push(entry);
                }
            } else if ('type' in node && node.type === 'directory' && 'entries' in node) {
                result.push({
                    ...entry,
                    node: {
                        ...node,
                        entries: replaceSubfsInEntries(node.entries, fullPath)
                    }
                });
            } else {
                result.push(entry);
            }
        }

        return result;
    }

    const partiallyExpanded = {
        ...directory,
        entries: replaceSubfsInEntries(directory.entries)
    };

    // Spliced-in entries may themselves contain subfs nodes; recurse until
    // none remain or MAX_DEPTH is reached.
    return expandSubfsNodes(partiallyExpanded, pdsEndpoint, depth + 1, subfsCache);
}

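/** A file whose blob must be downloaded, plus the hints needed to decode it. */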
interface FileToDownload {
    path: string;
    cid: string;
    encoding?: 'gzip';
    mimeType?: string;
    base64?: boolean;
}

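/**
 * Diff the expanded tree against the file CIDs recorded by the previous
 * pull: new or changed files land in `toDownload`, while files whose CID is
 * unchanged land in `unchanged` so they can be copied from the local copy.
 */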
function collectFiles(
    entries: Entry[],
    pathPrefix: string,
    existingCids: Record<string, string>
): { toDownload: FileToDownload[]; unchanged: Array<{ path: string; cid: string }> } {
    const toDownload: FileToDownload[] = [];
    const unchanged: Array<{ path: string; cid: string }> = [];

    function collect(entries: Entry[], currentPath: string) {
        for (const entry of entries) {
            const fullPath = currentPath ? `${currentPath}/${entry.name}` : entry.name;
            const node = entry.node;

            if ('type' in node && node.type === 'directory' && 'entries' in node) {
                collect(node.entries, fullPath);
            } else if ('type' in node && node.type === 'file' && 'blob' in node) {
                const fileNode = node as File;
                const cid = extractBlobCid(fileNode.blob);

                if (!cid) continue;

                if (existingCids[fullPath] === cid) {
                    unchanged.push({ path: fullPath, cid });
                } else {
                    toDownload.push({
                        path: fullPath,
                        cid,
                        encoding: fileNode.encoding,
                        mimeType: fileNode.mimeType,
                        base64: fileNode.base64
                    });
                }
            }
        }
    }

    collect(entries, pathPrefix);
    return { toDownload, unchanged };
}

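/**
 * Download a blob via com.atproto.sync.getBlob and decode it: base64
 * payloads are decoded first, then gzip content is decompressed (only when
 * the gzip magic bytes 0x1f 0x8b are actually present).
 */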
async function downloadBlob(
    pdsEndpoint: string,
    did: string,
    file: FileToDownload
): Promise<Buffer> {
    const url = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`;
    const res = await fetch(url);

    if (!res.ok) {
        throw new Error(`Failed to download blob ${file.cid}: ${res.status}`);
    }

    let content = Buffer.from(await res.arrayBuffer());

    // Decode base64 if needed
    if (file.base64) {
        const base64String = content.toString('utf-8');
        content = Buffer.from(base64String, 'base64');
    }

    // Decompress gzip
    if (file.encoding === 'gzip' && content.length >= 2 && content[0] === 0x1f && content[1] === 0x8b) {
        try {
            content = gunzipSync(content);
        } catch {
            // Keep original content if decompression fails
        }
    }

    return content;
}

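/**
 * Pull a site from its owner's PDS into a local directory.
 *
 * Resolves the identity, fetches and expands the place.wisp.fs record,
 * downloads new and changed blobs into a temp directory, copies unchanged
 * files over from the previous pull, then atomically swaps the temp
 * directory into place and saves per-file CIDs for the next incremental pull.
 */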
export async function pull(
    identifier: string,
    options: PullOptions
): Promise<void> {
    const { site, path: outputPath } = options;

    console.log(pc.cyan(`\nPulling ${pc.bold(site)} from ${identifier}\n`));

    // 1. Resolve DID
    const spinner = createSpinner('Resolving identity...').start();
    const did = await resolveDid(identifier);

    if (!did) {
        spinner.fail('Failed to resolve identity');
        throw new Error(`Could not resolve: ${identifier}`);
    }

    spinner.succeed(`Resolved to ${did}`);

    // 2. Get PDS endpoint
    const pdsSpinner = createSpinner('Getting PDS endpoint...').start();
    const pdsEndpoint = await getPdsForDid(did);

    if (!pdsEndpoint) {
        pdsSpinner.fail('Failed to get PDS endpoint');
        throw new Error(`Could not get PDS for: ${did}`);
    }

    pdsSpinner.succeed(`PDS: ${pdsEndpoint}`);

    // 3. Fetch site record
    const recordSpinner = createSpinner('Fetching site record...').start();
    let recordData;

    try {
        recordData = await fetchRecord(pdsEndpoint, did, 'place.wisp.fs', site);
    } catch {
        recordSpinner.fail('Site not found');
        throw new Error(`Site not found: ${site}`);
    }

    const record = recordData.value as FsRecord;
    const recordCid = recordData.cid || '';
    recordSpinner.succeed('Fetched site record');

    // 4. Expand subfs nodes
    const expandSpinner = createSpinner('Expanding subfs nodes...').start();
    const expandedRoot = await expandSubfsNodes(record.root, pdsEndpoint);
    expandSpinner.succeed('Expanded subfs nodes');

    // 5. Load existing metadata for incremental updates
    const existingMetadata = loadMetadata(outputPath);
    const existingCids = existingMetadata?.fileCids || {};

    // 6. Diff the expanded tree against the previous pull
    const { toDownload, unchanged } = collectFiles(expandedRoot.entries, '', existingCids);

    console.log(pc.dim(`Files to download: ${toDownload.length}, unchanged: ${unchanged.length}`));

    // Up to date only if nothing is new and nothing was deleted remotely
    const staleCount = Object.keys(existingCids).length - unchanged.length;
    if (toDownload.length === 0 && staleCount === 0 && unchanged.length > 0) {
        console.log(pc.green('\n✓ Site is already up to date\n'));
        return;
    }

    // 7. Create temp directory
    const tempDir = `${outputPath}.tmp-${Date.now()}`;
    mkdirSync(tempDir, { recursive: true });

    // 8. Download files in batches of MAX_CONCURRENT_DOWNLOADS
    const downloadSpinner = createSpinner(`Downloading ${toDownload.length} files...`).start();
    // Rebuilt from scratch (not spread from existingCids) so entries for
    // remotely deleted files are dropped from the metadata.
    const newFileCids: Record<string, string> = {};
    let downloaded = 0;

    try {
        for (let i = 0; i < toDownload.length; i += MAX_CONCURRENT_DOWNLOADS) {
            const batch = toDownload.slice(i, i + MAX_CONCURRENT_DOWNLOADS);

            await Promise.all(batch.map(async (file) => {
                const content = await downloadBlob(pdsEndpoint, did, file);
                const filePath = join(tempDir, sanitizePath(file.path));

                mkdirSync(dirname(filePath), { recursive: true });
                writeFileSync(filePath, content);

                newFileCids[file.path] = file.cid;
                downloaded++;
                downloadSpinner.text = `Downloading files: ${downloaded}/${toDownload.length}`;
            }));
        }

        downloadSpinner.succeed(`Downloaded ${downloaded} files`);

        // 9. Copy unchanged files from the existing directory
        if (unchanged.length > 0 && existsSync(outputPath)) {
            const copySpinner = createSpinner(`Copying ${unchanged.length} unchanged files...`).start();

            for (const { path: filePath, cid } of unchanged) {
                const srcPath = join(outputPath, sanitizePath(filePath));
                const destPath = join(tempDir, sanitizePath(filePath));

                if (existsSync(srcPath)) {
                    mkdirSync(dirname(destPath), { recursive: true });
                    writeFileSync(destPath, readFileSync(srcPath));
                    // Only record the CID if the file actually made it over
                    newFileCids[filePath] = cid;
                }
            }

            copySpinner.succeed(`Copied ${unchanged.length} unchanged files`);
        }

        // 10. Atomic replace: move the old directory aside, move the new one
        // into place, then delete the old copy
        if (existsSync(outputPath)) {
            const backupPath = `${outputPath}.backup-${Date.now()}`;
            renameSync(outputPath, backupPath);
            renameSync(tempDir, outputPath);
            rmSync(backupPath, { recursive: true, force: true });
        } else {
            renameSync(tempDir, outputPath);
        }

        // 11. Save metadata
        const metadata: SiteMetadata = {
            recordCid,
            fileCids: newFileCids,
            lastSync: Date.now()
        };
        saveMetadata(outputPath, metadata);

        console.log(pc.green(`\n✓ Pulled ${site} to ${outputPath}\n`));

    } catch (err) {
        // Clean up the temp dir on error; the existing output directory is
        // left untouched
        rmSync(tempDir, { recursive: true, force: true });
        throw err;
    }
}
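
// Example usage (hypothetical wiring; the actual CLI registration lives
// elsewhere in the repo):
//
//   await pull('alice.example.com', { site: 'my-site', path: './my-site' });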