A tool for tailing a labeler's firehose, rehydrating the labeled records, and storing them for future analysis of moderation decisions.
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement Phase 4: Blob processing with hashing and storage

- Add blob hashing utilities (SHA-256 and perceptual hash)
- Implement blob processor to extract references from embeds
- Create local and S3 storage backends
- Integrate blob processing into post hydration pipeline
- Update config schema for blob hydration settings
- Fix decoder tests for plural extractLabelsFromMessage

Blobs are always hashed for fingerprinting but only downloaded
if HYDRATE_BLOBS=true for safety (CSAM/sensitive content).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+474 -47
+68
src/blobs/hasher.ts
import crypto from "crypto";
import sharp from "sharp";
import { logger } from "../logger/index.js";

/** Side length of the square thumbnail the average hash is computed over. */
const HASH_GRID_SIZE = 8;

/** Number of luminance samples (and therefore bits) in the hash. */
const HASH_BITS = HASH_GRID_SIZE * HASH_GRID_SIZE;

/**
 * Computes the SHA-256 digest of `buffer`.
 *
 * @param buffer - Bytes to digest.
 * @returns The digest as a lowercase hex string.
 */
export async function computeSha256(buffer: Buffer): Promise<string> {
  return crypto.createHash("sha256").update(buffer).digest("hex");
}

/**
 * Computes a 64-bit average hash of an image.
 *
 * The image is squashed to an 8x8 greyscale grid; each cell contributes one
 * bit, set when the cell is brighter than the mean of all cells. The bits are
 * emitted row by row and returned as 16 zero-padded hex characters.
 *
 * @param buffer - Encoded image bytes in any format `sharp` can decode.
 * @returns The hash as a 16-character hex string.
 * @throws If the image cannot be decoded, has no dimensions, or does not yield
 *   exactly 64 samples. The error is logged before being rethrown.
 */
export async function computePerceptualHash(buffer: Buffer): Promise<string> {
  try {
    const image = sharp(buffer);
    const metadata = await image.metadata();

    if (!metadata.width || !metadata.height) {
      throw new Error("Invalid image metadata");
    }

    const { data, info } = await image
      .resize(HASH_GRID_SIZE, HASH_GRID_SIZE, { fit: "fill" })
      .grayscale()
      .raw()
      .toBuffer({ resolveWithObject: true });

    // The raw buffer may carry more than one channel per pixel (for example
    // an alpha channel next to the grey value). Sample only the first channel
    // of each pixel so the hash is always built from one value per cell.
    const luminance: number[] = [];
    for (let offset = 0; offset < data.length; offset += info.channels) {
      luminance.push(data[offset] ?? 0);
    }

    if (luminance.length !== HASH_BITS) {
      throw new Error(
        `Expected ${HASH_BITS} luminance samples, got ${luminance.length}`
      );
    }

    const mean =
      luminance.reduce((sum, value) => sum + value, 0) / luminance.length;
    const bits = luminance.map((value) => (value > mean ? "1" : "0")).join("");

    return BigInt("0b" + bits).toString(16).padStart(16, "0");
  } catch (error) {
    logger.error({ error }, "Failed to compute perceptual hash");
    throw error;
  }
}

/** Hashes recorded for a blob; `phash` is present only for hashable images. */
export interface BlobHashes {
  sha256: string;
  phash?: string;
}

/**
 * Computes every hash applicable to a blob.
 *
 * SHA-256 is always computed. A perceptual hash is attempted only when
 * `mimetype` is an `image/*` type other than SVG; if that attempt fails the
 * failure is logged and only the SHA-256 is returned.
 *
 * @param buffer - Blob bytes.
 * @param mimetype - Declared MIME type of the blob, matched case-insensitively.
 * @returns The SHA-256 and, when available, the perceptual hash.
 */
export async function computeBlobHashes(
  buffer: Buffer,
  mimetype?: string
): Promise<BlobHashes> {
  const sha256 = await computeSha256(buffer);

  const normalized = mimetype?.toLowerCase();
  const isRasterImage =
    normalized !== undefined &&
    normalized.startsWith("image/") &&
    !normalized.includes("svg");

  if (!isRasterImage) {
    return { sha256 };
  }

  try {
    const phash = await computePerceptualHash(buffer);
    return { sha256, phash };
  } catch (error) {
    logger.warn(
      { error, mimetype },
      "Failed to compute pHash, returning SHA256 only"
    );
    return { sha256 };
  }
}
+196
src/blobs/processor.ts
import { AtpAgent } from "@atproto/api";
import { Database } from "duckdb";
import { BlobsRepository } from "../database/blobs.repository.js";
import { computeBlobHashes } from "./hasher.js";
import { LocalBlobStorage } from "./storage/local.js";
import { S3BlobStorage } from "./storage/s3.js";
import { config } from "../config/index.js";
import { logger } from "../logger/index.js";

/** A blob referenced from a post embed. */
export interface BlobReference {
  cid: string;
  mimeType?: string;
}

/** Backend capable of persisting and reading back blob bytes keyed by CID. */
export interface BlobStorage {
  /** Persists `data` under `cid` and returns a locator for the stored object. */
  store(cid: string, data: Buffer, mimeType?: string): Promise<string>;
  /** Returns the stored bytes for `cid`, or `null` when nothing is stored. */
  retrieve(cid: string): Promise<Buffer | null>;
}

type CdnVariant = "feed_thumbnail" | "feed_fullsize";

/** Builds the CDN URL for a JPEG rendition of the blob `cid` owned by `did`. */
function cdnImageUrl(variant: CdnVariant, did: string, cid: string): string {
  return `https://cdn.bsky.app/img/${variant}/plain/${did}/${cid}@jpeg`;
}

/**
 * Collects blob references from an `images` array of an embed.
 * Anything that is not an array, and entries without a blob link, are ignored.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- embeds are untyped record JSON
function imageRefsFrom(images: any): BlobReference[] {
  if (!Array.isArray(images)) {
    return [];
  }

  const refs: BlobReference[] = [];
  for (const entry of images) {
    const link = entry?.image?.ref?.$link;
    if (link) {
      refs.push({ cid: link, mimeType: entry.image.mimeType });
    }
  }
  return refs;
}

/**
 * Finds blobs referenced by a post's embeds, fetches a rendition of each from
 * the CDN, hashes it, optionally stores it, and records the result.
 */
export class BlobProcessor {
  private blobsRepo: BlobsRepository;
  private storage: BlobStorage | null = null;
  private agent: AtpAgent;

  /**
   * @param db - Database handle used for the blobs repository.
   * @param agent - ATP agent; retained but not used by the methods below.
   * @throws If S3 storage is selected without a bucket and region.
   */
  constructor(db: Database, agent: AtpAgent) {
    this.blobsRepo = new BlobsRepository(db);
    this.agent = agent;

    // A storage backend exists only when blob hydration is enabled.
    if (config.blobs.hydrateBlobs) {
      if (config.blobs.storage.type === "s3") {
        const { s3Bucket, s3Region } = config.blobs.storage;
        if (!s3Bucket || !s3Region) {
          throw new Error(
            "S3 blob storage requires both S3_BUCKET and S3_REGION"
          );
        }
        this.storage = new S3BlobStorage(s3Bucket, s3Region);
      } else {
        this.storage = new LocalBlobStorage(config.blobs.storage.localPath);
      }
    }
  }

  /**
   * Extracts blob references from a list of embeds.
   *
   * For each embed, in order: `images[]`, then `media.images[]`, then `video`.
   * Entries without a `ref.$link` are skipped, as are null or non-object
   * embeds.
   *
   * @param embedsJson - Expected to be an array of embed objects; any other
   *   value yields an empty result.
   * @returns The references found, in encounter order.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- embeds are untyped record JSON
  extractBlobReferences(embedsJson: any): BlobReference[] {
    if (!Array.isArray(embedsJson)) {
      return [];
    }

    const refs: BlobReference[] = [];
    for (const embed of embedsJson) {
      if (!embed || typeof embed !== "object") {
        continue;
      }

      refs.push(...imageRefsFrom(embed.images));
      refs.push(...imageRefsFrom(embed.media?.images));

      if (embed.video?.ref?.$link) {
        refs.push({
          cid: embed.video.ref.$link,
          mimeType: embed.video.mimeType,
        });
      }
    }

    return refs;
  }

  /**
   * Processes every blob referenced by a post, one at a time.
   * A failure on one blob is logged and does not stop the remaining blobs.
   *
   * @param postUri - AT URI of the post the embeds belong to.
   * @param embedsJson - Embeds to scan; see {@link extractBlobReferences}.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- embeds are untyped record JSON
  async processBlobs(postUri: string, embedsJson: any): Promise<void> {
    const blobRefs = this.extractBlobReferences(embedsJson);

    for (const ref of blobRefs) {
      try {
        await this.processBlob(postUri, ref);
      } catch (error) {
        logger.error(
          { error, postUri, cid: ref.cid },
          "Failed to process blob"
        );
      }
    }
  }

  /**
   * Fetches, hashes, optionally stores, and records a single blob.
   *
   * Hashes are computed over the JPEG bytes the CDN returns (full size when a
   * storage backend is configured, thumbnail otherwise), not over the original
   * uploaded blob.
   *
   * @throws Rethrows any error from fetching, storing, hashing, or inserting
   *   after logging it.
   */
  private async processBlob(
    postUri: string,
    ref: BlobReference
  ): Promise<void> {
    // NOTE(review): this looks up a CID through a SHA-256 finder; confirm the
    // repository treats the argument as intended, otherwise the skip never hits.
    const existing = await this.blobsRepo.findBySha256(ref.cid);
    if (existing) {
      logger.debug(
        { postUri, cid: ref.cid },
        "Blob already processed, skipping"
      );
      return;
    }

    // at://<did>/<collection>/<rkey>: the DID is the first path segment.
    const did = postUri.replace("at://", "").split("/")[0];
    if (!did) {
      logger.warn({ postUri, cid: ref.cid }, "Could not parse DID from post URI");
      return;
    }

    try {
      // Cheap existence probe before downloading any bytes.
      const probe = await fetch(cdnImageUrl("feed_thumbnail", did, ref.cid), {
        method: "HEAD",
      });

      if (!probe.ok) {
        logger.warn(
          { postUri, cid: ref.cid, status: probe.status },
          "Failed to fetch blob metadata"
        );
        return;
      }

      let blobData: Buffer | null = null;
      let storagePath: string | undefined;

      if (this.storage && config.blobs.hydrateBlobs) {
        const fullResponse = await fetch(
          cdnImageUrl("feed_fullsize", did, ref.cid)
        );

        if (fullResponse.ok) {
          blobData = Buffer.from(await fullResponse.arrayBuffer());
          storagePath = await this.storage.store(
            ref.cid,
            blobData,
            ref.mimeType
          );
        }
      } else {
        // Hydration disabled: hash the thumbnail and keep nothing on disk.
        const thumbResponse = await fetch(
          cdnImageUrl("feed_thumbnail", did, ref.cid)
        );

        if (thumbResponse.ok) {
          blobData = Buffer.from(await thumbResponse.arrayBuffer());
        }
      }

      if (!blobData) {
        logger.warn({ postUri, cid: ref.cid }, "Could not fetch blob data");
        return;
      }

      const hashes = await computeBlobHashes(blobData, ref.mimeType);

      await this.blobsRepo.insert({
        post_uri: postUri,
        blob_cid: ref.cid,
        sha256: hashes.sha256,
        phash: hashes.phash,
        storage_path: storagePath,
        mimetype: ref.mimeType,
      });

      logger.info(
        { postUri, cid: ref.cid, sha256: hashes.sha256 },
        "Blob processed successfully"
      );
    } catch (error) {
      logger.error(
        { error, postUri, cid: ref.cid },
        "Failed to download or hash blob"
      );
      throw error;
    }
  }
}
+82
src/blobs/storage/local.ts
import * as fs from "fs/promises";
import * as path from "path";
import type { BlobStorage } from "../processor.js";
import { logger } from "../../logger/index.js";

/** File extension written for each recognised MIME type. */
const MIME_EXTENSIONS: Readonly<Record<string, string>> = {
  "image/jpeg": ".jpg",
  "image/jpg": ".jpg",
  "image/png": ".png",
  "image/webp": ".webp",
  "image/gif": ".gif",
  "video/mp4": ".mp4",
  "video/webm": ".webm",
};

/**
 * Extensions probed on retrieval: no extension, everything `store` can write,
 * plus `.jpeg` which earlier lookups also accepted.
 */
const RETRIEVABLE_EXTENSIONS: readonly string[] = [
  ...new Set(["", ...Object.values(MIME_EXTENSIONS), ".jpeg"]),
];

/**
 * CIDs end up in filesystem paths, so only plain alphanumerics are accepted;
 * this rejects separators and `..` segments in attacker-supplied values.
 */
const SAFE_CID = /^[A-Za-z0-9]+$/;

/** True when `error` is a Node filesystem "no such file" error. */
function isFileNotFound(error: unknown): boolean {
  return (
    typeof error === "object" &&
    error !== null &&
    (error as { code?: unknown }).code === "ENOENT"
  );
}

/**
 * Stores blobs on the local filesystem under
 * `<basePath>/<cid[0..2]>/<cid[2..4]>/<cid><ext>`.
 */
export class LocalBlobStorage implements BlobStorage {
  constructor(private basePath: string) {}

  /**
   * Writes a blob to disk, creating the shard directories as needed.
   *
   * @param cid - Blob CID; must be alphanumeric.
   * @param data - Blob bytes.
   * @param mimeType - Used to choose the file extension; unknown or missing
   *   types get no extension.
   * @returns The path of the written file.
   * @throws If the CID is malformed or the write fails (logged, then rethrown).
   */
  async store(
    cid: string,
    data: Buffer,
    mimeType?: string
  ): Promise<string> {
    try {
      if (!SAFE_CID.test(cid)) {
        throw new Error(`Invalid blob CID: ${JSON.stringify(cid)}`);
      }

      const dir = this.shardDir(cid);
      await fs.mkdir(dir, { recursive: true });

      const fullPath = path.join(
        dir,
        `${cid}${this.getExtensionFromMime(mimeType)}`
      );
      await fs.writeFile(fullPath, data);

      logger.debug({ cid, path: fullPath }, "Blob stored locally");

      return fullPath;
    } catch (error) {
      logger.error({ error, cid }, "Failed to store blob locally");
      throw error;
    }
  }

  /**
   * Reads a blob back by probing each known extension in turn.
   *
   * @param cid - Blob CID.
   * @returns The blob bytes, or `null` when no file exists for the CID or the
   *   CID is malformed.
   * @throws On any filesystem error other than "file not found" (logged, then
   *   rethrown).
   */
  async retrieve(cid: string): Promise<Buffer | null> {
    if (!SAFE_CID.test(cid)) {
      logger.warn({ cid }, "Blob not found in local storage");
      return null;
    }

    try {
      const dir = this.shardDir(cid);

      for (const ext of RETRIEVABLE_EXTENSIONS) {
        try {
          return await fs.readFile(path.join(dir, `${cid}${ext}`));
        } catch (error) {
          // Only a missing file means "try the next extension"; permission or
          // I/O errors must surface instead of looking like a missing blob.
          if (!isFileNotFound(error)) {
            throw error;
          }
        }
      }

      logger.warn({ cid }, "Blob not found in local storage");
      return null;
    } catch (error) {
      logger.error({ error, cid }, "Failed to retrieve blob from local storage");
      throw error;
    }
  }

  /** Directory a CID is sharded into: two levels keyed by its first four characters. */
  private shardDir(cid: string): string {
    return path.join(this.basePath, cid.substring(0, 2), cid.substring(2, 4));
  }

  /** Maps a MIME type (case-insensitive) to a file extension, or `""` if unknown. */
  private getExtensionFromMime(mimeType?: string): string {
    if (!mimeType) return "";
    return MIME_EXTENSIONS[mimeType.toLowerCase()] ?? "";
  }
}
+71
src/blobs/storage/s3.ts
import {
  S3Client,
  PutObjectCommand,
  GetObjectCommand,
} from "@aws-sdk/client-s3";
import type { BlobStorage } from "../processor.js";
import { logger } from "../../logger/index.js";

/**
 * Stores blobs in an S3 bucket under
 * `blobs/<cid[0..2]>/<cid[2..4]>/<cid>`.
 */
export class S3BlobStorage implements BlobStorage {
  private readonly client: S3Client;
  private readonly bucket: string;

  /**
   * @param bucket - Target bucket name.
   * @param region - Region the client is created for; no credentials are
   *   passed explicitly.
   */
  constructor(bucket: string, region: string) {
    this.bucket = bucket;
    this.client = new S3Client({ region });
  }

  /**
   * Uploads a blob.
   *
   * @param cid - Blob CID; determines the object key.
   * @param data - Blob bytes.
   * @param mimeType - Sent as the object's content type when provided.
   * @returns An `s3://<bucket>/<key>` locator for the uploaded object.
   * @throws If the upload fails (logged, then rethrown).
   */
  async store(
    cid: string,
    data: Buffer,
    mimeType?: string
  ): Promise<string> {
    try {
      const key = this.keyFor(cid);

      await this.client.send(
        new PutObjectCommand({
          Bucket: this.bucket,
          Key: key,
          Body: data,
          ContentType: mimeType,
        })
      );

      logger.debug({ cid, key }, "Blob stored in S3");

      return `s3://${this.bucket}/${key}`;
    } catch (error) {
      logger.error({ error, cid }, "Failed to store blob in S3");
      throw error;
    }
  }

  /**
   * Downloads a blob.
   *
   * @param cid - Blob CID.
   * @returns The blob bytes, or `null` when the object is missing or the
   *   request fails. Failures other than a missing key are logged at error
   *   level; this method does not throw.
   */
  async retrieve(cid: string): Promise<Buffer | null> {
    try {
      const response = await this.client.send(
        new GetObjectCommand({
          Bucket: this.bucket,
          Key: this.keyFor(cid),
        })
      );

      if (!response.Body) {
        logger.warn({ cid }, "Blob not found in S3");
        return null;
      }

      // Drain the response stream into memory chunk by chunk.
      const chunks: Uint8Array[] = [];
      for await (const chunk of response.Body as AsyncIterable<Uint8Array>) {
        chunks.push(chunk);
      }

      return Buffer.concat(chunks);
    } catch (error) {
      // A missing key is an expected outcome, not a storage failure.
      if (error instanceof Error && error.name === "NoSuchKey") {
        logger.warn({ cid }, "Blob not found in S3");
        return null;
      }

      logger.error({ error, cid }, "Failed to retrieve blob from S3");
      return null;
    }
  }

  /** Object key for a CID, sharded by its first four characters. */
  private keyFor(cid: string): string {
    return `blobs/${cid.substring(0, 2)}/${cid.substring(2, 4)}/${cid}`;
  }
}
+18 -29
src/config/index.ts
··· 13 13 wssUrl: z.string().url("WSS_URL must be a valid URL"), 14 14 }), 15 15 blobs: z.object({ 16 - hydrate: z.boolean().default(false), 17 - storageType: z.enum(["local", "s3"]).default("local"), 18 - storagePath: z.string().default("./data/blobs"), 16 + hydrateBlobs: z.boolean().default(false), 17 + storage: z.object({ 18 + type: z.enum(["local", "s3"]).default("local"), 19 + localPath: z.string().default("./data/blobs"), 20 + s3Bucket: z.string().optional(), 21 + s3Region: z.string().optional(), 22 + }), 19 23 }), 20 - s3: z 21 - .object({ 22 - bucket: z.string().optional(), 23 - region: z.string().optional(), 24 - accessKeyId: z.string().optional(), 25 - secretAccessKey: z.string().optional(), 26 - }) 27 - .optional(), 28 24 database: z.object({ 29 25 path: z.string().default("./data/skywatch.duckdb"), 30 26 }), ··· 51 47 wssUrl: process.env.WSS_URL, 52 48 }, 53 49 blobs: { 54 - hydrate: process.env.HYDRATE_BLOBS === "true", 55 - storageType: process.env.BLOB_STORAGE_TYPE, 56 - storagePath: process.env.BLOB_STORAGE_PATH, 50 + hydrateBlobs: process.env.HYDRATE_BLOBS === "true", 51 + storage: { 52 + type: process.env.BLOB_STORAGE_TYPE, 53 + localPath: process.env.BLOB_STORAGE_PATH, 54 + s3Bucket: process.env.S3_BUCKET, 55 + s3Region: process.env.S3_REGION, 56 + }, 57 57 }, 58 - s3: 59 - process.env.BLOB_STORAGE_TYPE === "s3" 60 - ? 
{ 61 - bucket: process.env.S3_BUCKET, 62 - region: process.env.S3_REGION, 63 - accessKeyId: process.env.AWS_ACCESS_KEY_ID, 64 - secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY, 65 - } 66 - : undefined, 67 58 database: { 68 59 path: process.env.DB_PATH, 69 60 }, ··· 85 76 process.exit(1); 86 77 } 87 78 88 - if (result.data.blobs.storageType === "s3") { 79 + if (result.data.blobs.storage.type === "s3") { 89 80 if ( 90 - !result.data.s3?.bucket || 91 - !result.data.s3?.region || 92 - !result.data.s3?.accessKeyId || 93 - !result.data.s3?.secretAccessKey 81 + !result.data.blobs.storage.s3Bucket || 82 + !result.data.blobs.storage.s3Region 94 83 ) { 95 84 console.error( 96 - "S3 configuration is incomplete. Required: S3_BUCKET, S3_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY" 85 + "S3 configuration is incomplete. Required: S3_BUCKET, S3_REGION" 97 86 ); 98 87 process.exit(1); 99 88 }
+14 -1
src/hydration/posts.service.ts
··· 1 1 import { AtpAgent } from "@atproto/api"; 2 2 import { Database } from "duckdb"; 3 3 import { PostsRepository } from "../database/posts.repository.js"; 4 + import { BlobProcessor } from "../blobs/processor.js"; 4 5 import { logger } from "../logger/index.js"; 5 6 import { config } from "../config/index.js"; 6 7 7 8 export class PostHydrationService { 8 9 private agent: AtpAgent; 9 10 private postsRepo: PostsRepository; 11 + private blobProcessor: BlobProcessor; 10 12 11 13 constructor(db: Database) { 12 14 this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` }); 13 15 this.postsRepo = new PostsRepository(db); 16 + this.blobProcessor = new BlobProcessor(db, this.agent); 14 17 } 15 18 16 19 async initialize(): Promise<void> { ··· 57 60 58 61 const isReply = !!record.reply; 59 62 63 + const embeds = record.embed ? [record.embed] : null; 64 + 60 65 await this.postsRepo.insert({ 61 66 uri, 62 67 did, 63 68 text: record.text || "", 64 69 facets: record.facets || null, 65 - embeds: record.embed || null, 70 + embeds, 66 71 langs: record.langs || null, 67 72 tags: record.tags || null, 68 73 created_at: record.createdAt, ··· 70 75 }); 71 76 72 77 logger.info({ uri }, "Post hydrated successfully"); 78 + 79 + if (embeds) { 80 + try { 81 + await this.blobProcessor.processBlobs(uri, embeds); 82 + } catch (error) { 83 + logger.warn({ error, uri }, "Failed to process blobs for post"); 84 + } 85 + } 73 86 } catch (error) { 74 87 logger.error({ error, uri }, "Failed to hydrate post"); 75 88 throw error;
+25 -17
tests/unit/decoder.test.ts
··· 1 1 import { describe, test, expect } from "bun:test"; 2 2 import { 3 - extractLabelFromMessage, 3 + extractLabelsFromMessage, 4 4 validateLabel, 5 5 LabelEvent, 6 6 } from "../../src/firehose/decoder.js"; 7 7 8 8 describe("Firehose Decoder", () => { 9 - describe("extractLabelFromMessage", () => { 10 - test("should extract label from valid message", () => { 9 + describe("extractLabelsFromMessage", () => { 10 + test("should extract labels from valid message", () => { 11 11 const message = { 12 12 op: 1, 13 13 t: "#labels", ··· 21 21 ], 22 22 }; 23 23 24 - const label = extractLabelFromMessage(message); 24 + const labels = extractLabelsFromMessage(message); 25 25 26 - expect(label).not.toBeNull(); 27 - expect(label?.val).toBe("spam"); 28 - expect(label?.src).toBe("did:plc:labeler"); 26 + expect(labels).toHaveLength(1); 27 + expect(labels[0].val).toBe("spam"); 28 + expect(labels[0].src).toBe("did:plc:labeler"); 29 29 }); 30 30 31 - test("should return null for non-label messages", () => { 31 + test("should return empty array for non-label messages", () => { 32 32 const message = { 33 33 op: 1, 34 34 t: "#info", 35 35 }; 36 36 37 - const label = extractLabelFromMessage(message); 37 + const labels = extractLabelsFromMessage(message); 38 38 39 - expect(label).toBeNull(); 39 + expect(labels).toHaveLength(0); 40 40 }); 41 41 42 - test("should return null for messages with wrong op", () => { 42 + test("should extract all labels from message with multiple labels", () => { 43 43 const message = { 44 - op: 0, 44 + op: 1, 45 45 t: "#labels", 46 46 labels: [ 47 47 { ··· 50 50 val: "spam", 51 51 cts: "2025-01-15T12:00:00Z", 52 52 }, 53 + { 54 + src: "did:plc:labeler", 55 + uri: "at://did:plc:user/app.bsky.feed.post/456", 56 + val: "csam", 57 + cts: "2025-01-15T12:01:00Z", 58 + }, 53 59 ], 54 60 }; 55 61 56 - const label = extractLabelFromMessage(message); 62 + const labels = extractLabelsFromMessage(message); 57 63 58 - expect(label).toBeNull(); 64 + 
expect(labels).toHaveLength(2); 65 + expect(labels[0].val).toBe("spam"); 66 + expect(labels[1].val).toBe("csam"); 59 67 }); 60 68 61 - test("should return null for messages with empty labels array", () => { 69 + test("should return empty array for messages with empty labels array", () => { 62 70 const message = { 63 71 op: 1, 64 72 t: "#labels", 65 73 labels: [], 66 74 }; 67 75 68 - const label = extractLabelFromMessage(message); 76 + const labels = extractLabelsFromMessage(message); 69 77 70 - expect(label).toBeNull(); 78 + expect(labels).toHaveLength(0); 71 79 }); 72 80 }); 73 81