A tool for tailing a labeler's firehose, rehydrating the labeled content, and storing the records for future analysis of moderation decisions.
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: Phase 3 - Content hydration with queuing

Implements automatic post and profile hydration:

- Post hydration service via @atproto/api
  * Fetches full post records (text, facets, embeds, langs, tags)
  * Detects reply status
  * Skips already-hydrated content
- Profile hydration service
  * Fetches profile records (displayName, description)
  * Resolves handles via getProfile
  * Links DID to handle
- Asynchronous hydration queue
  * Deduplicates tasks
  * Prevents concurrent processing of same resource
  * FIFO ordering
- Automatic URI parsing and routing
  * at:// URIs with 3 parts → post hydration
  * at:// URIs with 1 part (a bare DID) → profile hydration
  * did: URIs → profile hydration

Integration:
- Labels trigger hydration on receipt
- Queue processes tasks asynchronously
- Both services authenticate with Bluesky on startup

Tests: 4 new queue tests (27 total, all passing)

All Phase 3 deliverables complete.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+347 -2
+78
src/hydration/posts.service.ts
import { AtpAgent } from "@atproto/api";
import { Database } from "duckdb";
import { PostsRepository } from "../database/posts.repository.js";
import { logger } from "../logger/index.js";
import { config } from "../config/index.js";

/**
 * Fetches post records over XRPC and stores them through `PostsRepository`.
 *
 * Call {@link initialize} once before {@link hydratePost}; it logs the agent
 * in with the credentials from `config.bsky`.
 */
export class PostHydrationService {
  /** The only record collection this service stores. */
  private static readonly POST_COLLECTION = "app.bsky.feed.post";

  private agent: AtpAgent;
  private postsRepo: PostsRepository;

  /**
   * @param db DuckDB handle passed straight to the posts repository.
   */
  constructor(db: Database) {
    this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` });
    this.postsRepo = new PostsRepository(db);
  }

  /**
   * Logs the agent in using `config.bsky.handle` / `config.bsky.password`.
   *
   * @throws Re-throws the login error after logging it.
   */
  async initialize(): Promise<void> {
    try {
      await this.agent.login({
        identifier: config.bsky.handle,
        password: config.bsky.password,
      });
      logger.info("Post hydration service authenticated");
    } catch (error) {
      logger.error({ error }, "Failed to authenticate post hydration service");
      throw error;
    }
  }

  /**
   * Fetches the post behind `uri` and inserts it, unless it is already stored.
   *
   * Returns without inserting when the URI does not have exactly three
   * path segments, when it points at a collection other than
   * `app.bsky.feed.post`, when the post is already in the repository, or
   * when the fetch yields no record value.
   *
   * @param uri AT URI of the form `at://<did>/<collection>/<rkey>`.
   * @throws Re-throws any repository or network error after logging it.
   */
  async hydratePost(uri: string): Promise<void> {
    try {
      // Validate before touching the database so malformed input costs nothing.
      const uriParts = uri.replace("at://", "").split("/");
      if (uriParts.length !== 3) {
        logger.warn({ uri }, "Invalid post URI format");
        return;
      }

      const [did, collection, rkey] = uriParts;

      // Callers route every three-segment URI here, which includes labeled
      // lists, feed generators, etc. Storing those as posts would create rows
      // with empty text, so only genuine post records are accepted.
      if (collection !== PostHydrationService.POST_COLLECTION) {
        logger.debug({ uri, collection }, "Not a post record, skipping");
        return;
      }

      const existingPost = await this.postsRepo.findByUri(uri);
      if (existingPost) {
        logger.debug({ uri }, "Post already hydrated, skipping");
        return;
      }

      const response = await this.agent.com.atproto.repo.getRecord({
        repo: did,
        collection,
        rkey,
      });

      if (!response.success || !response.data.value) {
        logger.warn({ uri }, "Failed to fetch post record");
        return;
      }

      // NOTE(review): the record value is unvalidated remote data; narrow it
      // with a schema once the repository's insert type is confirmed.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const record = response.data.value as any;

      await this.postsRepo.insert({
        uri,
        did,
        text: record.text ?? "",
        facets: record.facets ?? null,
        // The record field is singular (`embed`); the stored column is plural.
        embeds: record.embed ?? null,
        langs: record.langs ?? null,
        tags: record.tags ?? null,
        created_at: record.createdAt,
        // A post is a reply exactly when it carries a `reply` field.
        is_reply: Boolean(record.reply),
      });

      logger.info({ uri }, "Post hydrated successfully");
    } catch (error) {
      logger.error({ error, uri }, "Failed to hydrate post");
      throw error;
    }
  }
}
+72
src/hydration/profiles.service.ts
import { AtpAgent } from "@atproto/api";
import { Database } from "duckdb";
import { ProfilesRepository } from "../database/profiles.repository.js";
import { logger } from "../logger/index.js";
import { config } from "../config/index.js";

/**
 * Fetches profile data over XRPC and stores it through `ProfilesRepository`.
 *
 * Call {@link initialize} once before {@link hydrateProfile}; it logs the
 * agent in with the credentials from `config.bsky`.
 */
export class ProfileHydrationService {
  private agent: AtpAgent;
  private profilesRepo: ProfilesRepository;

  /**
   * @param db DuckDB handle passed straight to the profiles repository.
   */
  constructor(db: Database) {
    this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` });
    this.profilesRepo = new ProfilesRepository(db);
  }

  /**
   * Logs the agent in using `config.bsky.handle` / `config.bsky.password`.
   *
   * @throws Re-throws the login error after logging it.
   */
  async initialize(): Promise<void> {
    try {
      await this.agent.login({
        identifier: config.bsky.handle,
        password: config.bsky.password,
      });
      logger.info("Profile hydration service authenticated");
    } catch (error) {
      logger.error({ error }, "Failed to authenticate profile hydration service");
      throw error;
    }
  }

  /**
   * Stores the profile for `did` unless one is already stored.
   *
   * The display name and description come from the `app.bsky.actor.profile`
   * record, the handle from `getProfile`. Either lookup may come back empty
   * (the server answers 400); the row is then inserted with the missing
   * fields left `undefined` instead of failing the whole hydration.
   *
   * @param did DID of the account to hydrate.
   * @throws Re-throws repository errors and any lookup failure other than a
   *   400 response, after logging it.
   */
  async hydrateProfile(did: string): Promise<void> {
    try {
      const existingProfile = await this.profilesRepo.findByDid(did);
      if (existingProfile) {
        logger.debug({ did }, "Profile already hydrated, skipping");
        return;
      }

      const { displayName, description } = await this.fetchProfileRecord(did);
      const handle = await this.resolveHandle(did);

      await this.profilesRepo.insert({
        did,
        handle,
        display_name: displayName,
        description,
      });

      logger.info({ did, handle }, "Profile hydrated successfully");
    } catch (error) {
      logger.error({ error, did }, "Failed to hydrate profile");
      throw error;
    }
  }

  /** Reads displayName/description from the profile record; empty when the record is absent. */
  private async fetchProfileRecord(
    did: string
  ): Promise<{ displayName?: string; description?: string }> {
    try {
      const response = await this.agent.com.atproto.repo.getRecord({
        repo: did,
        collection: "app.bsky.actor.profile",
        rkey: "self",
      });

      if (!response.success || !response.data.value) {
        return {};
      }

      const record = response.data.value as {
        displayName?: unknown;
        description?: unknown;
      };

      // Remote data is unvalidated: keep only fields that really are strings.
      return {
        displayName:
          typeof record.displayName === "string" ? record.displayName : undefined,
        description:
          typeof record.description === "string" ? record.description : undefined,
      };
    } catch (error) {
      if (ProfileHydrationService.isBadRequest(error)) {
        logger.debug({ did }, "No profile record available");
        return {};
      }
      throw error;
    }
  }

  /** Resolves the account's handle via `getProfile`; `undefined` when the lookup is refused. */
  private async resolveHandle(did: string): Promise<string | undefined> {
    try {
      const profileLookup = await this.agent.getProfile({ actor: did });
      return profileLookup.success ? profileLookup.data.handle : undefined;
    } catch (error) {
      if (ProfileHydrationService.isBadRequest(error)) {
        logger.debug({ did }, "Profile lookup returned no result");
        return undefined;
      }
      throw error;
    }
  }

  /**
   * True when `error` carries `status === 400`.
   *
   * NOTE(review): assumes the XRPC client rejects with an error object that
   * exposes the HTTP status as a numeric `status` property, and that
   * "record/profile not found" is reported as 400 — confirm against the
   * installed @atproto/xrpc version. Anything else (network failures, 5xx,
   * rate limits) is deliberately not matched so it still propagates.
   */
  private static isBadRequest(error: unknown): boolean {
    return (
      typeof error === "object" &&
      error !== null &&
      "status" in error &&
      (error as { status: unknown }).status === 400
    );
  }
}
+85
src/hydration/queue.ts
import { EventEmitter } from "events";
import { logger } from "../logger/index.js";

/** A unit of hydration work: which kind of resource, and its identifier. */
export interface HydrationTask {
  type: "post" | "profile";
  identifier: string;
}

/**
 * FIFO queue that hands tasks to listeners one at a time via the `"task"`
 * event, spaced `intervalMs` apart.
 *
 * The queue paces dispatch; it does not wait for listeners to finish. A task
 * counts as "being processed" from the moment it is emitted until the pacing
 * timer fires, and duplicates of it are rejected during that window.
 */
export class HydrationQueue extends EventEmitter {
  private queue: HydrationTask[] = [];
  private processing = false;
  private processingTask: HydrationTask | null = null;
  /** Pending pacing timer, kept so `clear()` can cancel it. */
  private timer: ReturnType<typeof setTimeout> | null = null;

  /**
   * @param intervalMs Delay between consecutive `"task"` emissions.
   */
  constructor(private readonly intervalMs = 100) {
    super();
  }

  /**
   * Adds `task` to the queue and starts draining if idle.
   *
   * The task is dropped when an identical one (same type and identifier) is
   * already waiting in the queue or is the task currently being processed.
   * When the queue is idle the task is emitted synchronously, before this
   * method returns.
   */
  enqueue(task: HydrationTask): void {
    const isDuplicate = this.queue.some((queued) =>
      HydrationQueue.sameTask(queued, task)
    );

    if (isDuplicate) {
      logger.debug(
        { type: task.type, identifier: task.identifier },
        "Skipping duplicate task"
      );
      return;
    }

    if (
      this.processingTask !== null &&
      HydrationQueue.sameTask(this.processingTask, task)
    ) {
      logger.debug(
        { type: task.type, identifier: task.identifier },
        "Task already being processed"
      );
      return;
    }

    this.queue.push(task);
    logger.debug(
      { type: task.type, identifier: task.identifier, queueSize: this.queue.length },
      "Task enqueued"
    );

    if (!this.processing) {
      this.processNext();
    }
  }

  /** Number of tasks waiting; the task currently being processed is not counted. */
  getQueueSize(): number {
    return this.queue.length;
  }

  /**
   * Drops all waiting tasks, forgets the current one and cancels the pacing
   * timer, returning the queue to its idle state.
   */
  clear(): void {
    // Without cancelling, a later enqueue() would start a second timer chain
    // next to the stale one, and tasks would be dispatched concurrently.
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }

    this.queue = [];
    this.processing = false;
    this.processingTask = null;
    logger.info("Hydration queue cleared");
  }

  /** Emits the next waiting task and schedules the following one; goes idle when empty. */
  private processNext(): void {
    this.timer = null;

    const next = this.queue.shift();
    if (next === undefined) {
      this.processing = false;
      return;
    }

    this.processing = true;
    this.processingTask = next;

    logger.debug(
      {
        type: next.type,
        identifier: next.identifier,
        remaining: this.queue.length,
      },
      "Processing task"
    );

    try {
      this.emit("task", next);
    } catch (error) {
      // A listener that throws synchronously must not stall the queue.
      logger.error({ error, task: next }, "Hydration task listener threw");
    }

    // A listener may have called clear() during emit; the queue is idle then
    // and must not be re-armed.
    if (this.processingTask !== next) {
      return;
    }

    this.timer = setTimeout(() => {
      this.processingTask = null;
      this.processNext();
    }, this.intervalMs);
  }

  /** Two tasks are the same when both type and identifier match. */
  private static sameTask(a: HydrationTask, b: HydrationTask): boolean {
    return a.type === b.type && a.identifier === b.identifier;
  }
}
+51 -2
src/index.ts
··· 1 1 import { config } from "./config/index.js"; 2 2 import { logger } from "./logger/index.js"; 3 - import { initializeDatabase, closeDatabase, getDatabase } from "./database/connection.js"; 3 + import { 4 + initializeDatabase, 5 + closeDatabase, 6 + getDatabase, 7 + } from "./database/connection.js"; 4 8 import { initializeSchema } from "./database/schema.js"; 5 9 import { LabelsRepository } from "./database/labels.repository.js"; 6 10 import { FirehoseSubscriber } from "./firehose/subscriber.js"; 11 + import { PostHydrationService } from "./hydration/posts.service.js"; 12 + import { ProfileHydrationService } from "./hydration/profiles.service.js"; 13 + import { HydrationQueue } from "./hydration/queue.js"; 7 14 8 15 async function main() { 9 16 logger.info("Starting Skywatch Tail..."); ··· 15 22 const db = getDatabase(); 16 23 const labelsRepo = new LabelsRepository(db); 17 24 25 + const postHydration = new PostHydrationService(db); 26 + const profileHydration = new ProfileHydrationService(db); 27 + const hydrationQueue = new HydrationQueue(); 28 + 29 + await postHydration.initialize(); 30 + await profileHydration.initialize(); 31 + 32 + hydrationQueue.on("task", async (task) => { 33 + try { 34 + if (task.type === "post") { 35 + await postHydration.hydratePost(task.identifier); 36 + } else if (task.type === "profile") { 37 + await profileHydration.hydrateProfile(task.identifier); 38 + } 39 + } catch (error) { 40 + logger.error({ error, task }, "Hydration task failed"); 41 + } 42 + }); 43 + 18 44 const subscriber = new FirehoseSubscriber(); 19 45 20 46 subscriber.on("label", async (label) => { ··· 32 58 }); 33 59 34 60 logger.debug({ uri: label.uri }, "Label stored"); 61 + 62 + if (label.uri.startsWith("at://")) { 63 + const uriParts = label.uri.replace("at://", "").split("/"); 64 + 65 + if (uriParts.length === 3) { 66 + hydrationQueue.enqueue({ 67 + type: "post", 68 + identifier: label.uri, 69 + }); 70 + } else if (uriParts.length === 1) { 71 + 
hydrationQueue.enqueue({ 72 + type: "profile", 73 + identifier: label.uri.replace("at://", ""), 74 + }); 75 + } 76 + } else if (label.uri.startsWith("did:")) { 77 + hydrationQueue.enqueue({ 78 + type: "profile", 79 + identifier: label.uri, 80 + }); 81 + } 35 82 } catch (error) { 36 - logger.error({ error, label }, "Failed to store label"); 83 + logger.error({ error, label }, "Failed to process label"); 37 84 } 38 85 }); 39 86 ··· 56 103 process.on("SIGINT", async () => { 57 104 logger.info("Shutting down gracefully..."); 58 105 subscriber.stop(); 106 + hydrationQueue.clear(); 59 107 await closeDatabase(); 60 108 process.exit(0); 61 109 }); ··· 63 111 process.on("SIGTERM", async () => { 64 112 logger.info("Shutting down gracefully..."); 65 113 subscriber.stop(); 114 + hydrationQueue.clear(); 66 115 await closeDatabase(); 67 116 process.exit(0); 68 117 });
+61
tests/unit/queue.test.ts
import { describe, test, expect, beforeEach, afterEach } from "bun:test";
import { HydrationQueue, type HydrationTask } from "../../src/hydration/queue.js";

// Note on sizes: enqueueing into an idle queue dispatches the task right
// away, so it never shows up in getQueueSize(). Tests that need something to
// actually wait in the queue enqueue a "blocker" task first.
describe("Hydration Queue", () => {
  const post: HydrationTask = {
    type: "post",
    identifier: "at://did:plc:user/app.bsky.feed.post/123",
  };
  const profile: HydrationTask = {
    type: "profile",
    identifier: "did:plc:user",
  };
  const blocker: HydrationTask = {
    type: "profile",
    identifier: "did:plc:blocker",
  };

  let queue: HydrationQueue;

  beforeEach(() => {
    queue = new HydrationQueue();
  });

  afterEach(() => {
    // Leave no queued work behind for the next test.
    queue.clear();
  });

  test("should enqueue and process tasks", (done) => {
    queue.on("task", (processedTask) => {
      expect(processedTask).toEqual(post);
      done();
    });

    queue.enqueue(post);
  });

  test("should track queue size", () => {
    queue.enqueue(post); // dispatched immediately
    queue.enqueue(profile); // waits behind it

    expect(queue.getQueueSize()).toBe(1);
  });

  test("should not enqueue a duplicate of the task being processed", () => {
    queue.enqueue(post);
    queue.enqueue(post);

    expect(queue.getQueueSize()).toBe(0);
  });

  test("should not enqueue a duplicate of a waiting task", () => {
    queue.enqueue(blocker);
    queue.enqueue(post);
    queue.enqueue(post);

    expect(queue.getQueueSize()).toBe(1);
  });

  test("should clear queue", () => {
    queue.enqueue(blocker);
    queue.enqueue(post);
    expect(queue.getQueueSize()).toBe(1);

    queue.clear();

    expect(queue.getQueueSize()).toBe(0);
  });
});