A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: add blob storage to profile hydration service

Initialize LocalBlobStorage or S3BlobStorage when HYDRATE_BLOBS=true.
Actually store profile blob files to disk and save storage_path to database.

Previously only computed hashes without saving actual image data.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+26 -1
+26 -1
src/hydration/profiles.service.ts
··· 3 3 import { ProfilesRepository } from "../database/profiles.repository.js"; 4 4 import { ProfileBlobsRepository } from "../database/profile-blobs.repository.js"; 5 5 import { computeBlobHashes } from "../blobs/hasher.js"; 6 + import { LocalBlobStorage } from "../blobs/storage/local.js"; 7 + import { S3BlobStorage } from "../blobs/storage/s3.js"; 8 + import { BlobStorage } from "../blobs/processor.js"; 6 9 import { pRateLimit } from "p-ratelimit"; 7 10 import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js"; 8 11 import { logger } from "../logger/index.js"; ··· 12 15 private agent: AtpAgent; 13 16 private profilesRepo: ProfilesRepository; 14 17 private profileBlobsRepo: ProfileBlobsRepository; 18 + private storage: BlobStorage | null = null; 15 19 private limit: ReturnType<typeof pRateLimit>; 16 20 17 21 constructor(db: Database) { 18 22 this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` }); 19 23 this.profilesRepo = new ProfilesRepository(db); 20 24 this.profileBlobsRepo = new ProfileBlobsRepository(db); 25 + 26 + if (config.blobs.hydrateBlobs) { 27 + if (config.blobs.storage.type === "s3") { 28 + this.storage = new S3BlobStorage( 29 + config.blobs.storage.s3Bucket!, 30 + config.blobs.storage.s3Region! 31 + ); 32 + } else { 33 + this.storage = new LocalBlobStorage( 34 + config.blobs.storage.localPath 35 + ); 36 + } 37 + } 38 + 21 39 this.limit = pRateLimit({ 22 40 interval: 300000, 23 41 rate: 3000, ··· 209 227 } 210 228 211 229 const blobData = Buffer.from(await blobResponse.arrayBuffer()); 230 + 231 + let storagePath: string | undefined; 232 + if (this.storage && config.blobs.hydrateBlobs) { 233 + storagePath = await this.storage.store(cid, blobData, "image/jpeg"); 234 + } 235 + 212 236 const hashes = await computeBlobHashes(blobData, "image/jpeg"); 213 237 214 238 await this.profileBlobsRepo.insert({ ··· 217 241 blob_cid: cid, 218 242 sha256: hashes.sha256, 219 243 phash: hashes.phash, 244 + storage_path: storagePath, 220 245 mimetype: "image/jpeg", 221 246 }); 222 247 223 - logger.info({ did, cid, type, sha256: hashes.sha256, pdsEndpoint }, "Profile blob processed successfully"); 248 + logger.info({ did, cid, type, sha256: hashes.sha256, pdsEndpoint, storagePath }, "Profile blob processed successfully"); 224 249 } 225 250 }