Our Personal Data Server from scratch!
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 220 lines 6.2 kB view raw
1import type { AtprotoClient } from "./atproto-client.ts"; 2import type { MigrationProgress } from "./types.ts"; 3 4export interface BlobMigrationResult { 5 migrated: number; 6 failed: string[]; 7 total: number; 8 sourceUnreachable: boolean; 9} 10 11const MAX_RETRIES = 3; 12const RETRY_DELAYS = [1000, 2000, 4000]; 13 14const sleep = (ms: number): Promise<void> => 15 new Promise((resolve) => setTimeout(resolve, ms)); 16 17const safeProgress = ( 18 onProgress: (update: Partial<MigrationProgress>) => void, 19 update: Partial<MigrationProgress>, 20): void => { 21 try { 22 onProgress(update); 23 } catch (e) { 24 console.warn("[blob-migration] Progress callback failed:", e); 25 } 26}; 27 28interface MigrateBlobResult { 29 cid: string; 30 success: boolean; 31 error?: string; 32} 33 34const migrateSingleBlob = async ( 35 cid: string, 36 userDid: string, 37 sourceClient: AtprotoClient, 38 localClient: AtprotoClient, 39 attempt = 0, 40): Promise<MigrateBlobResult> => { 41 try { 42 console.log( 43 `[blob-migration] Fetching blob ${cid} from source (attempt ${ 44 attempt + 1 45 })`, 46 ); 47 const { data: blobData, contentType } = await sourceClient 48 .getBlobWithContentType(userDid, cid); 49 console.log( 50 `[blob-migration] Got blob ${cid}, size: ${blobData.byteLength}, type: ${contentType}`, 51 ); 52 53 console.log(`[blob-migration] Uploading blob ${cid} to local PDS...`); 54 const uploadResult = await localClient.uploadBlob(blobData, contentType); 55 console.log( 56 `[blob-migration] Upload response for ${cid}:`, 57 JSON.stringify(uploadResult), 58 ); 59 60 return { cid, success: true }; 61 } catch (e) { 62 const errorMessage = (e as Error).message || String(e); 63 console.error( 64 `[blob-migration] Failed to migrate blob ${cid} (attempt ${ 65 attempt + 1 66 }):`, 67 errorMessage, 68 ); 69 70 const isRetryable = attempt < MAX_RETRIES - 1 && 71 !errorMessage.includes("404") && 72 !errorMessage.includes("not found") && 73 !errorMessage.includes("BlobNotFound"); 74 75 if (isRetryable) { 76 const delay = RETRY_DELAYS[attempt] ?? 4000; 77 console.log(`[blob-migration] Retrying ${cid} in ${delay}ms...`); 78 await sleep(delay); 79 return migrateSingleBlob( 80 cid, 81 userDid, 82 sourceClient, 83 localClient, 84 attempt + 1, 85 ); 86 } 87 88 return { cid, success: false, error: errorMessage }; 89 } 90}; 91 92const collectMissingBlobs = async ( 93 localClient: AtprotoClient, 94): Promise<string[]> => { 95 const allBlobs: string[] = []; 96 let cursor: string | undefined; 97 98 do { 99 const { blobs, cursor: nextCursor } = await localClient.listMissingBlobs( 100 cursor, 101 500, 102 ); 103 console.log( 104 `[blob-migration] listMissingBlobs returned ${blobs.length} blobs, cursor: ${nextCursor}`, 105 ); 106 allBlobs.push(...blobs.map((blob) => blob.cid)); 107 cursor = nextCursor; 108 } while (cursor); 109 110 return allBlobs; 111}; 112 113export async function migrateBlobs( 114 localClient: AtprotoClient, 115 sourceClient: AtprotoClient | null, 116 userDid: string, 117 onProgress: (update: Partial<MigrationProgress>) => void, 118): Promise<BlobMigrationResult> { 119 console.log("[blob-migration] Starting blob migration for", userDid); 120 console.log( 121 "[blob-migration] Source client:", 122 sourceClient 123 ? `available (baseUrl: ${sourceClient.getBaseUrl()})` 124 : "NOT AVAILABLE", 125 ); 126 console.log( 127 "[blob-migration] Local client baseUrl:", 128 localClient.getBaseUrl(), 129 ); 130 console.log( 131 "[blob-migration] Local client has access token:", 132 localClient.getAccessToken() ? "yes" : "NO", 133 ); 134 135 safeProgress(onProgress, { 136 currentOperation: "Checking for missing blobs...", 137 }); 138 139 const missingBlobs = await collectMissingBlobs(localClient); 140 141 console.log("[blob-migration] Total missing blobs:", missingBlobs.length); 142 safeProgress(onProgress, { blobsTotal: missingBlobs.length }); 143 144 if (missingBlobs.length === 0) { 145 console.log("[blob-migration] No blobs to migrate"); 146 safeProgress(onProgress, { currentOperation: "No blobs to migrate" }); 147 return { migrated: 0, failed: [], total: 0, sourceUnreachable: false }; 148 } 149 150 if (!sourceClient) { 151 console.warn( 152 "[blob-migration] No source client available, cannot fetch blobs", 153 ); 154 safeProgress(onProgress, { 155 currentOperation: 156 `${missingBlobs.length} media files missing. No source PDS URL available - your old server may have shut down. Posts will work, but some images/media may be unavailable.`, 157 }); 158 return { 159 migrated: 0, 160 failed: missingBlobs, 161 total: missingBlobs.length, 162 sourceUnreachable: true, 163 }; 164 } 165 166 safeProgress(onProgress, { 167 currentOperation: `Migrating ${missingBlobs.length} blobs...`, 168 }); 169 170 const results = await missingBlobs.reduce< 171 Promise<{ migrated: number; failed: string[] }> 172 >( 173 async (accPromise, cid, index) => { 174 const acc = await accPromise; 175 176 safeProgress(onProgress, { 177 currentOperation: `Migrating blob ${ 178 index + 1 179 }/${missingBlobs.length}...`, 180 blobsMigrated: acc.migrated, 181 }); 182 183 const result = await migrateSingleBlob( 184 cid, 185 userDid, 186 sourceClient, 187 localClient, 188 ); 189 190 return result.success 191 ? { migrated: acc.migrated + 1, failed: acc.failed } 192 : { migrated: acc.migrated, failed: [...acc.failed, cid] }; 193 }, 194 Promise.resolve({ migrated: 0, failed: [] as string[] }), 195 ); 196 197 const { migrated, failed } = results; 198 199 safeProgress(onProgress, { blobsMigrated: migrated }); 200 201 const statusMessage = migrated === missingBlobs.length 202 ? `All ${migrated} blobs migrated successfully` 203 : migrated > 0 204 ? `${migrated}/${missingBlobs.length} blobs migrated. ${failed.length} failed.` 205 : `Could not migrate blobs (${failed.length} missing)`; 206 207 safeProgress(onProgress, { currentOperation: statusMessage }); 208 209 console.log( 210 `[blob-migration] Complete: ${migrated} migrated, ${failed.length} failed`, 211 ); 212 failed.length > 0 && console.log("[blob-migration] Failed CIDs:", failed); 213 214 return { 215 migrated, 216 failed, 217 total: missingBlobs.length, 218 sourceUnreachable: false, 219 }; 220}