A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: resolve user PDS endpoint from DID document for blob fetching

Query plc.wtf to get each user's actual PDS endpoint instead of assuming bsky.social. Fixes RepoNotFound errors when fetching blobs from users on different PDS instances.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+37 -8
+37 -8
src/hydration/profiles.service.ts
··· 157 157 } 158 158 } 159 159 160 + private async resolvePds(did: string): Promise<string | null> { 161 + try { 162 + const didDocResponse = await fetch(`https://plc.wtf/${did}`); 163 + if (!didDocResponse.ok) { 164 + logger.warn({ did, status: didDocResponse.status }, "Failed to fetch DID document"); 165 + return null; 166 + } 167 + 168 + const didDoc = await didDocResponse.json(); 169 + const pdsService = didDoc.service?.find((s: any) => 170 + s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer" 171 + ); 172 + 173 + if (!pdsService?.serviceEndpoint) { 174 + logger.warn({ did }, "No PDS endpoint found in DID document"); 175 + return null; 176 + } 177 + 178 + return pdsService.serviceEndpoint; 179 + } catch (error) { 180 + logger.error({ error, did }, "Failed to resolve PDS from DID"); 181 + return null; 182 + } 183 + } 184 + 160 185 private async processProfileBlob( 161 186 did: string, 162 187 cid: string, ··· 170 195 return; 171 196 } 172 197 173 - const blobResponse = await this.agent.com.atproto.sync.getBlob({ 174 - did, 175 - cid, 176 - }); 198 + const pdsEndpoint = await this.resolvePds(did); 199 + if (!pdsEndpoint) { 200 + logger.warn({ did, cid, type }, "Cannot fetch blob without PDS endpoint"); 201 + return; 202 + } 177 203 178 - if (!blobResponse.success) { 179 - logger.warn({ did, cid, type }, "Failed to fetch blob from PDS"); 204 + const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`; 205 + const blobResponse = await fetch(blobUrl); 206 + 207 + if (!blobResponse.ok) { 208 + logger.warn({ did, cid, type, pdsEndpoint, status: blobResponse.status }, "Failed to fetch blob from PDS"); 180 209 return; 181 210 } 182 211 183 - const blobData = Buffer.from(await blobResponse.data.arrayBuffer()); 212 + const blobData = Buffer.from(await blobResponse.arrayBuffer()); 184 213 const hashes = await computeBlobHashes(blobData, "image/jpeg"); 185 214 186 215 await this.blobsRepo.insert({ ··· 191 220 mimetype: "image/jpeg", 192 221 }); 193 222 194 - logger.info({ did, cid, type, sha256: hashes.sha256 }, "Profile blob processed successfully"); 223 + logger.info({ did, cid, type, sha256: hashes.sha256, pdsEndpoint }, "Profile blob processed successfully"); 195 224 } 196 225 }