A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix CBOR decoding for framed firehose messages

- Switch from decode to decodeFirst for proper frame handling
- Decode header and body separately to handle message framing
- Update label extraction to return array (messages can have multiple labels)
- Process seq for cursor tracking before label processing
- Improve error logging with actual error messages

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+33 -27
+25 -17
src/firehose/decoder.ts
··· 1 - import { decode as decodeCBOR } from "@atcute/cbor"; 1 + import { decodeFirst } from "@atcute/cbor"; 2 2 import { logger } from "../logger/index.js"; 3 3 4 4 export interface LabelEvent { ··· 14 14 } 15 15 16 16 export interface FirehoseMessage { 17 - op: number; 17 + op?: number; 18 18 t?: string; 19 + seq?: number; 20 + labels?: LabelEvent[]; 19 21 [key: string]: any; 20 22 } 21 23 22 24 export function decodeFirehoseMessage(data: Buffer): FirehoseMessage | null { 23 25 try { 24 - const decoded = decodeCBOR(data); 25 - return decoded as FirehoseMessage; 26 - } catch (error) { 27 - logger.error({ error }, "Failed to decode CBOR message"); 28 - return null; 29 - } 30 - } 26 + const buffer = new Uint8Array(data); 27 + const [header, remainder] = decodeFirst(buffer); 28 + const [body] = decodeFirst(remainder); 31 29 32 - export function extractLabelFromMessage(message: FirehoseMessage): LabelEvent | null { 33 - if (!message || message.op !== 1) { 30 + return body as FirehoseMessage; 31 + } catch (err) { 32 + logger.error( 33 + { 34 + err: err instanceof Error ? err.message : String(err), 35 + errorStack: err instanceof Error ? err.stack : undefined, 36 + dataLength: data.length, 37 + dataPreview: data.slice(0, 50).toString("hex") 38 + }, 39 + "Failed to decode CBOR message" 40 + ); 34 41 return null; 35 42 } 43 + } 36 44 37 - if (message.t !== "#labels") { 38 - return null; 45 + export function extractLabelsFromMessage(message: FirehoseMessage): LabelEvent[] { 46 + if (!message) { 47 + return []; 39 48 } 40 49 41 - const labels = message.labels; 42 - if (!Array.isArray(labels) || labels.length === 0) { 43 - return null; 50 + if (message.labels && Array.isArray(message.labels)) { 51 + return message.labels; 44 52 } 45 53 46 - return labels[0] as LabelEvent; 54 + return []; 47 55 } 48 56 49 57 export function validateLabel(label: LabelEvent): boolean {
+8 -10
src/firehose/subscriber.ts
··· 4 4 import { logger } from "../logger/index.js"; 5 5 import { 6 6 decodeFirehoseMessage, 7 - extractLabelFromMessage, 7 + extractLabelsFromMessage, 8 8 validateLabel, 9 9 LabelEvent, 10 10 } from "./decoder.js"; ··· 86 86 return; 87 87 } 88 88 89 - const label = extractLabelFromMessage(message); 90 - if (!label) return; 91 - 92 - if (!validateLabel(label)) return; 93 - 94 - if (!this.filter.shouldCapture(label)) return; 95 - 96 - this.emit("label", label); 97 - 98 89 if (message.seq) { 99 90 await this.saveCursor(message.seq); 91 + } 92 + 93 + const labels = extractLabelsFromMessage(message); 94 + for (const label of labels) { 95 + if (!validateLabel(label)) continue; 96 + if (!this.filter.shouldCapture(label)) continue; 97 + this.emit("label", label); 100 98 } 101 99 } catch (error) { 102 100 logger.error({ error }, "Error processing message");