···
| **Frontend** | React, TypeScript, Ant Design, TailwindCSS, Apollo Client |
| **Backend** | Node.js, Express, Apollo Server, TypeScript |
| **Databases** | PostgreSQL, ScyllaDB (5.2), ClickHouse, Redis |
-| **Messaging** | Kafka (optional), BullMQ |
+| **Messaging** | BullMQ (Redis) |
| **ORM** | Sequelize, Kysely |
| **Auth** | Passport.js, express-session, SAML (SSO) |
| **Observability** | OpenTelemetry |
···
Coop accepts both synchronous and asynchronous input.

* Synchronous input is handled via REST APIs and supports item submission, action execution, reporting workflows, policy retrieval, and related operations.
-* Asynchronous input is handled via Kafka-based event streaming using the ITEM_SUBMISSION_EVENT topic.
+* Asynchronous input is handled via BullMQ job queues backed by Redis.

All API requests require an organization API key passed via the `x-api-key` header.
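For example, a synchronous submission might look like the call below; the endpoint path, payload fields, and the `COOP_API_KEY` variable are illustrative placeholders, while the `x-api-key` header is the documented requirement:

```ts
// Hypothetical sketch of a synchronous item submission; only the x-api-key
// header requirement comes from the docs above, everything else is a placeholder.
const response = await fetch('https://coop.example.com/api/v1/items', {
  method: 'POST',
  headers: {
    'content-type': 'application/json',
    'x-api-key': process.env.COOP_API_KEY ?? '',
  },
  body: JSON.stringify({
    typeId: 'comment',             // placeholder item type
    data: { text: 'hello world' }, // placeholder item fields
  }),
});

if (!response.ok) {
  throw new Error(`Item submission failed with status ${response.status}`);
}
```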

···
* PostgreSQL
* Redis
-* Kafka
-  * Schema registry
-  * Zookeeper
* Clickhouse
* ScyllaDb
* Metrics
+13-6
docs/DEVELOPMENT.md
···
Copy `server/.env.example` to `server/.env`. The example file contains all available options with documentation. Key sections:

- **Database connections**: PostgreSQL, ClickHouse, ScyllaDB, Redis
-- **Kafka**: Broker and schema registry settings
- **External APIs**: OpenAI, SendGrid, Google APIs (optional)
- **Security**: Session secrets, JWT signing keys

···
PostgreSQL | 5432 | Primary DB (with pgvector)
ClickHouse | 8123, 9000 | Analytics warehouse
ScyllaDB | 9042 | Item submission history
-Redis | 6379 | Caching and job queues
-Kafka | 29092 | Event streaming
-Schema Registry | 8081 | Kafka schemas
-Zookeeper | 22181 | Kafka coordination
-Jaeger | 16686 | Tracing UI (opens automatically)
+Redis | 6379 | Caching and job queues
+Jaeger | 16686 | Tracing UI (opens automatically)
OTEL Collector | 4317 | Telemetry collection

Check service health:
···
# Terminal 3 (optional, for GraphQL schema changes)
npm run generate:watch
```

+### Background Workers
+
+Item submissions are processed asynchronously via a BullMQ worker that consumes from Redis. To process items locally, run the worker in a separate terminal:
+
+```bash
+cd server
+npm run runWorkerOrJob ItemProcessingWorker
+```
+
+Without this running, submitted items will be enqueued in Redis but not processed. Other available workers/jobs can be found in `server/iocContainer/services/workersAndJobs.ts`.
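For orientation, the moving parts look roughly like the sketch below. This is simplified: the real queue name constant, payload types, and worker logic live in `server/queues/itemSubmissionQueue.ts` and `server/workers_jobs/ItemProcessingWorker.ts`, and the literal `'item-submission'` name here is a placeholder.

```ts
import { Queue, Worker } from 'bullmq';
import IORedis from 'ioredis';

// BullMQ issues blocking Redis commands, so request retries must be disabled.
const connection = new IORedis({ maxRetriesPerRequest: null });

// API side: submissions are enqueued instead of being processed inline.
const queue = new Queue('item-submission', { connection });
await queue.add('item-submission', { /* item submission payload */ });

// Worker side: `npm run runWorkerOrJob ItemProcessingWorker` starts a worker
// that pulls jobs from the same queue and runs them through the rule engine.
const worker = new Worker(
  'item-submission',
  async (job) => {
    // process job.data here
  },
  { connection },
);
```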
### With Distributed Tracing

+1-1
package.json
···1818 "generate:watch": "graphql-codegen --watch \"server/graphql/**/**.ts\"",
1919 "prepare": "husky install",
2020 "lint": "cd client && npm run lint; cd ../server && npm run lint",
2121- "up": "docker compose up --detach postgres clickhouse hma scylla redis otel-collector kafka && open http://localhost:16686",
2121+ "up": "docker compose up --detach postgres clickhouse hma scylla redis otel-collector && open http://localhost:16686",
2222 "down": "docker compose down",
2323 "betterer": "betterer"
2424 },
-20
server/.env.example
···
HMA_SERVICE_URL=http://localhost:9876

-# Kafka authentication info.
-KAFKA_BROKER_HOST=localhost:29092
-KAFKA_BROKER_USERNAME=
-KAFKA_BROKER_PASSWORD=
-KAFKA_SCHEMA_REGISTRY_HOST=http://localhost:8081
-KAFKA_SCHEMA_REGISTRY_USERNAME=
-KAFKA_SCHEMA_REGISTRY_PASSWORD=
-KAFKAJS_NO_PARTITIONER_WARNING=1
-
-# NB: these schema ids are different on prod + staging clusters; may be
-# different in our future local dev setup. Eventually, we'll likely want
-# a more sophisticated approach than env vars for determining these values,
-# but we need to figure out our Kafka schema migration system first.
-KAFKA_TOPIC_KEY_SCHEMA_ID_DATA_WAREHOUSE_INGEST_EVENTS=
-KAFKA_TOPIC_VALUE_SCHEMA_ID_DATA_WAREHOUSE_INGEST_EVENTS=
-KAFKA_TOPIC_KEY_SCHEMA_ID_ITEM_SUBMISSION_EVENTS=
-KAFKA_TOPIC_VALUE_SCHEMA_ID_ITEM_SUBMISSION_EVENTS=
-KAFKA_TOPIC_KEY_SCHEMA_ID_ITEM_SUBMISSION_EVENTS_RETRY_0=
-KAFKA_TOPIC_VALUE_SCHEMA_ID_ITEM_SUBMISSION_EVENTS_RETRY_0=
-
# Scylla Cluster Details
SCYLLA_USERNAME=cassandra
SCYLLA_PASSWORD=cassandra
server/iocContainer/index.ts
···
/* eslint-disable max-lines */
import { createRequire } from 'module';
import Bottle from '@ethanresnick/bottlejs';
-import { SchemaType } from '@kafkajs/confluent-schema-registry';
import opentelemetry from '@opentelemetry/api';
import { makeDateString, type ItemIdentifier } from '@roostorg/types';
-import avro from 'avsc';
import { types as scyllaTypes } from 'cassandra-driver';
import IORedis, { type Cluster } from 'ioredis';
-import { logLevel } from 'kafkajs';
import * as knexPkg from 'knex';
import { type Knex } from 'knex';
import {
···
import { type JsonObject, type ReadonlyDeep } from 'type-fest';
import { v1 as uuidv1 } from 'uuid';

-import Kafka, { SchemaRegistry, type SchemaIdFor } from '../kafka/index.js';
-import {
-  makeItemQueueBulkWrite,
-  type ItemQueueBulkWrite,
-} from '../kafka/itemQueueBulkWrite.js';
-import logCreator from '../kafka/logger.js';
import makeDb from '../models/index.js';
+import {
+  makeItemSubmissionBulkWrite,
+  type ItemSubmissionBulkWrite,
+  ITEM_SUBMISSION_QUEUE_NAME,
+  ITEM_SUBMISSION_DLQ_NAME,
+} from '../queues/itemSubmissionQueue.js';
import { type PolicyActionPenalties } from '../models/OrgModel.js';
import { type HashBank, HashBankService } from '../services/hmaService/index.js';
import makeActionPublisher, {
···
import { registerWorkersAndJobs } from './services/workersAndJobs.js';
import { register, safeGetEnvVar } from './utils.js';

-type DataWarehouseOutboxKafkaMessageKey = {
-  orgId: string;
-  userId: string;
-};
-
-type DataWarehouseOutboxKafkaMessageValue = {
-  dataJSON: string;
-  table: string;
-  recordedAt: Date;
-};
-
// the otel instrumentation currently intercepts require statements. support for
// esm support is experimental so we should wait until it is stable
const require = createRequire(import.meta.url);
const { Client: ScyllaClient } = require('cassandra-driver');
export type { DataSources } from './services/gqlDataSources.js';

-// All Kafka topics and their schemas should be referenced here. Currently, we
-// have to create schemas and topics manually, and manually keep them in sync
-// across environments, which is hard to do reliably. Eventually, we'll have
-// an IaC solution that lets us keep these schemas in code somewhere and view
-// them in the repo. Until then, talk to Ethan if you need a new topic or schema.
-export type ItemSubmissionKafkaMessageKey = {
+export type ItemSubmissionMessageKey = {
  syntheticThreadId: string;
};
-export type ItemSubmissionKafkaMessageValue = {
+export type ItemSubmissionMessageValue = {
  metadata: {
    syntheticThreadId: string;
    requestId: CorrelationId<'post-items'>;
···
  };
};

-export type KafkaSchemaMap = {
-  ITEM_SUBMISSION_EVENTS: {
-    keySchema: SchemaIdFor<ItemSubmissionKafkaMessageKey>;
-    valueSchema: SchemaIdFor<ItemSubmissionKafkaMessageValue>;
-  };
-  ITEM_SUBMISSION_EVENTS_RETRY_0: {
-    keySchema: SchemaIdFor<ItemSubmissionKafkaMessageKey>;
-    valueSchema: SchemaIdFor<ItemSubmissionKafkaMessageValue>;
-  };
-  DATA_WAREHOUSE_INGEST_EVENTS: {
-    keySchema: SchemaIdFor<DataWarehouseOutboxKafkaMessageKey>;
-    valueSchema: SchemaIdFor<DataWarehouseOutboxKafkaMessageValue>;
-  };
-};
-
// Defines a global map type of all injectable dependencies, where the key is,
// conceptually, the name of the "interface"/name of the contract, and the value
// is the type that any implementation must sastify.
···
  ContentApiRequestsAdapter: IContentApiRequestsAdapter;
  OrgCreationAdapter: IOrgCreationAdapter;

-  itemSubmissionQueueBulkWrite: ItemQueueBulkWrite;
-  itemSubmissionRetryQueueBulkWrite: ItemQueueBulkWrite;
+  itemSubmissionQueueBulkWrite: ItemSubmissionBulkWrite;
+  itemSubmissionRetryQueueBulkWrite: ItemSubmissionBulkWrite;
  Knex: Knex;
  IORedis: IORedis.Redis | Cluster;
-  // We register the services as Kafka<any> so that each service that depends
-  // on Kafka can type its arg more specifically, based on the topic that
-  // it's supposed to be able to "see". E.g., a worker can type
-  // its argument as `Kafka<Pick<KafkaSchemaMap, 'ITEM_SUBMISSION_EVENTS'>>`, so that its code can only
-  // read messages from the topic with the intended schema, and the `Kafka`
-  // service will be assignable to that argument because of the `any`.
-  // eslint-disable-next-line @typescript-eslint/no-explicit-any
-  Kafka: Kafka<any>;

  // Loggers
  RuleExecutionLogger: RuleExecutionLogger;
···
          dnsLookup: (address, callback) => callback(null, address),
          redisOptions: {
            tls: {},
+           // Required by BullMQ: its workers use blocking Redis commands
+           // that would otherwise be misinterpreted as timed-out requests.
+           maxRetriesPerRequest: null,
            username: safeGetEnvVar('REDIS_USER'),
            password: safeGetEnvVar('REDIS_PASSWORD'),
          },
        },
      )
    : new IORedis.default({
+       // Required by BullMQ: its workers use blocking Redis commands
+       // that would otherwise be misinterpreted as timed-out requests.
        maxRetriesPerRequest: null,
        port: parseInt(process.env.REDIS_PORT ?? '6379'),
        host: safeGetEnvVar('REDIS_HOST'),
      }),
  );
571534572572- bottle.factory('Kafka', () => {
573573- // TODO: think about shutdown logic. Right now, creating this instance
574574- // doesn't open up any resources that need to be shutdown, so we're ok.
575575- // However, when a producer/consumer are created from this instance and then
576576- // they call .connect(), that opens a connection that we must terminate by
577577- // manually calling .disconnect() on shutdown. Maybe there's a more
578578- // elegant/robust way?
579579- return new Kafka(
580580- {
581581- // NB: Confluent Cloud exposes only one endpoint URL that load balances
582582- // between multiple brokers, so we don't need to worry about splitting
583583- // this to an array.
584584- brokers: [safeGetEnvVar('KAFKA_BROKER_HOST')],
585585- ...(['CI', 'development'].includes(process.env.NODE_ENV ?? 'production') ? {} : {
586586- ssl: true,
587587- sasl: {
588588- mechanism: 'plain',
589589- username: safeGetEnvVar('KAFKA_BROKER_USERNAME'),
590590- password: safeGetEnvVar('KAFKA_BROKER_PASSWORD'),
591591- },
592592- }),
593593- // Found experimentally. Confluent docs seem to recommend setting at
594594- // least some timeouts to a value above 10s, but they don't mention a
595595- // specific value to use, and the setting described in those docs may
596596- // not map 1:1 to a kafkajs setting. Nevertheless, the kafkajs default
597597- // of 1s was giving timeout errors, so we had to bump this. See
598598- // https://docs.confluent.io/cloud/current/cp-component/clients-cloud-config.html#prerequisitesq
599599- connectionTimeout: 10_000,
600600- // Default here is 30s but we set it to avoid long-running requests
601601- // wait that long.
602602- requestTimeout: 10_000,
603603- // Set clientId to help with monitoring/observability.
604604- // See https://kafka.js.org/docs/configuration#client-id
605605- clientId: getEnvVarOrWarn('OTEL_SERVICE_NAME'),
606606- logLevel: logLevel.WARN,
607607- logCreator,
608608- },
609609- {
610610- DATA_WAREHOUSE_INGEST_EVENTS: {
611611- keySchema: parseInt(
612612- safeGetEnvVar('KAFKA_TOPIC_KEY_SCHEMA_ID_DATA_WAREHOUSE_INGEST_EVENTS'),
613613- ) as SchemaIdFor<DataWarehouseOutboxKafkaMessageKey>,
614614- valueSchema: parseInt(
615615- safeGetEnvVar(
616616- 'KAFKA_TOPIC_VALUE_SCHEMA_ID_DATA_WAREHOUSE_INGEST_EVENTS',
617617- ),
618618- ) as SchemaIdFor<DataWarehouseOutboxKafkaMessageValue>,
619619- },
620620- ITEM_SUBMISSION_EVENTS: {
621621- keySchema: parseInt(
622622- safeGetEnvVar('KAFKA_TOPIC_KEY_SCHEMA_ID_ITEM_SUBMISSION_EVENTS'),
623623- ) as SchemaIdFor<ItemSubmissionKafkaMessageKey>,
624624- valueSchema: parseInt(
625625- safeGetEnvVar('KAFKA_TOPIC_VALUE_SCHEMA_ID_ITEM_SUBMISSION_EVENTS'),
626626- ) as SchemaIdFor<ItemSubmissionKafkaMessageValue>,
627627- },
628628- ITEM_SUBMISSION_EVENTS_RETRY_0: {
629629- keySchema: parseInt(
630630- safeGetEnvVar(
631631- 'KAFKA_TOPIC_KEY_SCHEMA_ID_ITEM_SUBMISSION_EVENTS_RETRY_0',
632632- ),
633633- ) as SchemaIdFor<ItemSubmissionKafkaMessageKey>,
634634- valueSchema: parseInt(
635635- safeGetEnvVar(
636636- 'KAFKA_TOPIC_VALUE_SCHEMA_ID_ITEM_SUBMISSION_EVENTS_RETRY_0',
637637- ),
638638- ) as SchemaIdFor<ItemSubmissionKafkaMessageValue>,
639639- },
640640- },
641641- new SchemaRegistry(
642642- {
643643- host: safeGetEnvVar('KAFKA_SCHEMA_REGISTRY_HOST'),
644644- auth: {
645645- username: safeGetEnvVar('KAFKA_SCHEMA_REGISTRY_USERNAME'),
646646- password: safeGetEnvVar('KAFKA_SCHEMA_REGISTRY_PASSWORD'),
647647- },
648648- },
649649- {
650650- [SchemaType.AVRO]: {
651651- logicalTypes: {
652652- // Implementation copied from avsc docs.
653653- // See https://gist.github.com/mtth/1aec40375fbcb077aee7#file-date-js
654654- 'timestamp-millis': class extends avro.types.LogicalType {
655655- override _fromValue(val: string) {
656656- return new Date(val);
657657- }
658658- override _toValue(date: Date) {
659659- return date instanceof Date ? Number(date) : undefined;
660660- }
661661- override _resolve(type: unknown) {
662662- return avro.Type.isType(
663663- type,
664664- 'long',
665665- 'string',
666666- 'logical:timestamp-millis',
667667- )
668668- ? this._fromValue
669669- : undefined;
670670- }
671671- },
672672- },
673673- },
674674- },
675675- ),
676676- );
677677- });

  bottle.factory('Sequelize', () => makeDb());
  bottle.factory('OrgModel', ({ Sequelize }) => Sequelize.Org);
···
  });

  bottle.factory('itemSubmissionQueueBulkWrite', (container) =>
-   makeItemQueueBulkWrite(container.Kafka, 'ITEM_SUBMISSION_EVENTS'),
+   makeItemSubmissionBulkWrite(container.IORedis, ITEM_SUBMISSION_QUEUE_NAME),
  );
  bottle.factory('itemSubmissionRetryQueueBulkWrite', (container) =>
-   makeItemQueueBulkWrite(container.Kafka, 'ITEM_SUBMISSION_EVENTS_RETRY_0'),
+   makeItemSubmissionBulkWrite(container.IORedis, ITEM_SUBMISSION_DLQ_NAME),
  );

  // Legacy service deprecated in favor of kysely.
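The new `server/queues/itemSubmissionQueue.ts` module that these factories depend on is not shown in this diff. A rough sketch of what a BullMQ-backed `makeItemSubmissionBulkWrite` could look like follows; the queue-name values, the job name, and the return shape are assumptions, while the exported identifiers and the `(redis, queueName)` call signature come from the hunks above.

```ts
import { Queue } from 'bullmq';
import { type Cluster } from 'ioredis';
import type IORedis from 'ioredis';

import { type ItemSubmissionMessageValue } from '../iocContainer/index.js';

// Queue name values are assumptions; only the exported identifiers appear in the diff.
export const ITEM_SUBMISSION_QUEUE_NAME = 'item-submission';
export const ITEM_SUBMISSION_DLQ_NAME = 'item-submission-dlq';

/**
 * Sketch of a factory for a service that writes a batch of item submissions
 * to a BullMQ queue, mirroring the old Kafka-based bulk-write contract.
 */
export function makeItemSubmissionBulkWrite(
  redis: IORedis.Redis | Cluster,
  queueName: string,
) {
  const queue = new Queue<ItemSubmissionMessageValue>(queueName, {
    connection: redis,
  });

  async function itemSubmissionBulkWrite(
    items: readonly ItemSubmissionMessageValue[],
  ) {
    if (items.length === 0) {
      return { error: false as const, results: [] };
    }
    try {
      // addBulk enqueues the whole batch in a single round trip to Redis.
      await queue.addBulk(
        items.map((value) => ({ name: 'item-submission', data: value })),
      );
      return { error: false as const, results: [] };
    } catch (err) {
      return { error: true as const, results: [err] };
    }
  }

  itemSubmissionBulkWrite.close = async () => queue.close();

  return itemSubmissionBulkWrite;
}

export type ItemSubmissionBulkWrite = ReturnType<typeof makeItemSubmissionBulkWrite>;
```

Using `addBulk` would preserve the old producer's batching behavior (one write per batch rather than per item), with Redis taking the place of the Kafka broker.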
-18
server/kafka/KafkajsZstdCompressionCodec.ts
···11-import { compress, decompress } from '@mongodb-js/zstd';
22-33-// The encoder class from Kafkajs doesn't have an exported type,
44-// but we can make a minimal stub in the meantime.
55-// See https://github.com/tulios/kafkajs/issues/1552
66-type Encoder = { buffer: Buffer };
77-88-export class KafkajsZstdCompressionCodec {
99- constructor(private readonly level: number) {}
1010-1111- async compress(encoder: Encoder) {
1212- return compress(encoder.buffer, this.level);
1313- }
1414-1515- async decompress(buffer: Buffer) {
1616- return decompress(buffer);
1717- }
1818-}
-99
server/kafka/SchemaAwareClient.ts
···11-import { SchemaRegistry as UntypedSchemaRegistry } from '@kafkajs/confluent-schema-registry';
22-import type { ConsumerConfig, Kafka as KafkaJS, KafkaConfig, ProducerConfig } from 'kafkajs';
33-44-import { createRequire } from 'module';
55-import SchemaAwareConsumer from './SchemaAwareConsumer.js';
66-import SchemaAwareProducer from './SchemaAwareProducer.js';
77-88-// NB: we import kafkajs using require() here instead of import because the
99-// open-telemetry instrumentations intercepts only require() calls in order
1010-// to patch modules. If kafkajs is imported using import, it won't be patched.
1111-const require = createRequire(import.meta.url);
1212-const {Kafka: KafkaClient} = require('kafkajs')
1313-1414-// Generic fake symbol for holding type-level metadata.
1515-declare const meta: unique symbol;
1616-1717-// This type holds the id of a schema in the Schema Registry, alongside
1818-// TS type-level metadata showing the expected shape of the decoded message.
1919-export type SchemaIdFor<T> = number & { readonly [meta]: T };
2020-2121-// Allows storing the Schema Registry, with some TS metadata reflecting
2222-// which schemas have been registered with the registry.
2323-export type SchemaRegistry<T extends AnyTopicSchemaMap> =
2424- UntypedSchemaRegistry & { readonly [meta]: T };
2525-2626-// Re-export the SchemaRegistry constructor w/ a type cast that lets us hold
2727-// registered schema metadata in the type param.
2828-export const SchemaRegistry = UntypedSchemaRegistry as new <
2929- T extends AnyTopicSchemaMap,
3030->(
3131- args: ConstructorParameters<typeof UntypedSchemaRegistry>[0],
3232- options?: ConstructorParameters<typeof UntypedSchemaRegistry>[1],
3333-) => SchemaRegistry<T>;
3434-3535-export type AnyTopicSchemaMap = {
3636- [topicName: string]: {
3737- keySchema: SchemaIdFor<unknown>;
3838- valueSchema: SchemaIdFor<unknown>;
3939- };
4040-};
4141-4242-// A union of the message key types for the given topics.
4343-export type KeyTypes<T extends AnyTopicSchemaMap> =
4444- T[keyof T]['keySchema'][typeof meta];
4545-4646-// A union of the message value types for the given topics.
4747-export type ValueTypes<T extends AnyTopicSchemaMap> =
4848- T[keyof T]['valueSchema'][typeof meta];
4949-5050-/**
5151- * Constructs a wrapped Kafka client instance that's aware of the Schema
5252- * Registry and our schemas in it.
5353- */
5454-export default class Kafka<TopicSchemaMap extends AnyTopicSchemaMap> {
5555- readonly #client: KafkaJS;
5656- readonly #schemaMap: TopicSchemaMap;
5757- readonly #registry: SchemaRegistry<TopicSchemaMap>;
5858-5959- constructor(
6060- config: KafkaConfig,
6161- schemaMap: TopicSchemaMap,
6262- registry: SchemaRegistry<TopicSchemaMap>,
6363- ) {
6464- this.#client = new KafkaClient(config);
6565- this.#schemaMap = schemaMap;
6666- this.#registry = registry;
6767- }
6868-6969- public producer(config?: ProducerConfig) {
7070- return new SchemaAwareProducer(
7171- this.#client,
7272- this.#registry,
7373- this.#schemaMap,
7474- // Unlike in Kafkajs, default allowAutoTopicCreation to false, since it's
7575- // not a super safe setting. We may have to revise this as we think about
7676- // the local dev story (and it may not be necessary if we have proper ACLs
7777- // in prod that bans our clients from creating topics).
7878- { allowAutoTopicCreation: false, ...config },
7979- );
8080- }
8181-8282- /**
8383- * The Topics type parameter should be filled in with the list of topic names
8484- * that the consumer might subscribe to. (It will only be allowed to subscribe
8585- * to these topics, and all of these topics must have a corresponding registered
8686- * schema.) In KafkaJS, choosing which topics to subscribe to and then actually
8787- * consuming the messages on those topics are two separate operations.
8888- * However, we have to link them in the types (i.e., the type of each decoded
8989- * message needs to depend on which topics the consumer has subscribed to), so
9090- * we use this Topics type parameter to do that.
9191- */
9292- public consumer<Topics extends keyof TopicSchemaMap>(config: ConsumerConfig) {
9393- return new SchemaAwareConsumer(
9494- this.#client,
9595- this.#registry as SchemaRegistry<Pick<TopicSchemaMap, Topics>>,
9696- config,
9797- );
9898- }
9999-}
-359
server/kafka/SchemaAwareConsumer.ts
···11-import {
22- type ConnectEvent,
33- type Consumer,
44- type ConsumerCommitOffsetsEvent,
55- type ConsumerConfig,
66- type ConsumerCrashEvent,
77- type ConsumerEndBatchProcessEvent,
88- type ConsumerEvents,
99- type ConsumerFetchEvent,
1010- type ConsumerFetchStartEvent,
1111- type ConsumerGroupJoinEvent,
1212- type ConsumerHeartbeatEvent,
1313- type ConsumerRebalancingEvent,
1414- type ConsumerReceivedUnsubcribedTopicsEvent,
1515- type ConsumerStartBatchProcessEvent,
1616- type DisconnectEvent,
1717- type InstrumentationEvent,
1818- type TopicPartition as KafakJSTopicPartition,
1919- type TopicPartitionOffset as KafakJSTopicPartitionOffset,
2020- type TopicPartitionOffsetAndMetadata as KafakJSTopicPartitionOffsetAndMetadata,
2121- type Kafka as KafkaJS,
2222- type ConsumerRunConfig as KafkaJSConsumerRunConfig,
2323- type ConsumerSubscribeTopics as KafkaJSConsumerSubscribeTopics,
2424- type EachBatchPayload as KafkaJSEachBatchPayload,
2525- type EachMessagePayload as KafkaJSEachMessagePayload,
2626- type KafkaMessage,
2727- type RemoveInstrumentationEventListener,
2828- type RequestEvent,
2929- type RequestQueueSizeEvent,
3030- type RequestTimeoutEvent,
3131- type KafkaJSError as KafkaJSErrorType,
3232- type KafkaJSProtocolError as KafkaJSProtocolErrorType
3333-} from 'kafkajs';
3434-import kafkaJs from 'kafkajs';
3535-3636-const { KafkaJSError, KafkaJSProtocolError } = kafkaJs;
3737-3838-3939-import { type Mutable } from '../utils/typescript-types.js';
4040-import {
4141- type AnyTopicSchemaMap,
4242- type KeyTypes,
4343- type SchemaRegistry,
4444- type ValueTypes,
4545-} from './SchemaAwareClient.js';
4646-4747-// Redefine a number of types to support subscribing to/processing messages
4848-// from topics that have a registered schema in a type-safe way.
4949-type ConsumerSubscribeTopics<T extends AnyTopicSchemaMap> = Pick<
5050- KafkaJSConsumerSubscribeTopics,
5151- 'fromBeginning'
5252-> & { topics: readonly (keyof T & string)[] };
5353-5454-export type DecodedMessage<EligibleTopics extends AnyTopicSchemaMap> = Omit<
5555- KafkaMessage,
5656- 'key' | 'value'
5757-> & {
5858- key: KeyTypes<EligibleTopics> | null;
5959- value: ValueTypes<EligibleTopics> | null;
6060-};
6161-6262-type EachMessagePayload<T extends AnyTopicSchemaMap> = Omit<
6363- KafkaJSEachMessagePayload,
6464- 'message' | 'topic'
6565-> & { topic: keyof T & string; message: DecodedMessage<T> };
6666-6767-type EachBatchPayload<T extends AnyTopicSchemaMap> = Omit<
6868- KafkaJSEachBatchPayload,
6969- 'batch'
7070-> & {
7171- batch: Omit<KafkaJSEachBatchPayload['batch'], 'topic'> & {
7272- topic: keyof T & string;
7373- decodedMessages: DecodedMessage<T>[];
7474- };
7575-};
7676-7777-export type ConsumerRunConfig<T extends AnyTopicSchemaMap> = Pick<
7878- KafkaJSConsumerRunConfig,
7979- | 'autoCommit'
8080- | 'autoCommitInterval'
8181- | 'autoCommitThreshold'
8282- | 'eachBatchAutoResolve'
8383- | 'partitionsConsumedConcurrently'
8484-> & {
8585- eachBatch?: (payload: EachBatchPayload<T>) => Promise<void>;
8686- eachMessage?: (payload: EachMessagePayload<T>) => Promise<void>;
8787-};
8888-8989-type TopicPartition<EligibleTopics extends AnyTopicSchemaMap> = Omit<
9090- KafakJSTopicPartition,
9191- 'topic'
9292-> & { topic: keyof EligibleTopics & string };
9393-9494-type TopicPartitionOffset<EligibleTopics extends AnyTopicSchemaMap> = Omit<
9595- KafakJSTopicPartitionOffset,
9696- 'topic'
9797-> & { topic: keyof EligibleTopics & string };
9898-9999-type TopicPartitionOffsetAndMetadata<EligibleTopics extends AnyTopicSchemaMap> =
100100- Omit<KafakJSTopicPartitionOffsetAndMetadata, 'topic'> & {
101101- topic: keyof EligibleTopics & string;
102102- };
103103-104104-/**
105105- * Returns a Kafka consumer whose received messages will be transparently
106106- * decoded using their schema in the schema registry.
107107- *
108108- * Note overridden argument types, to only allow subscribing to/processing
109109- * messages from known topics.
110110- */
111111-export default class SchemaAwareConsumer<
112112- EligibleTopicsSchemaMap extends AnyTopicSchemaMap,
113113-> {
114114- readonly #registry: SchemaRegistry<EligibleTopicsSchemaMap>;
115115- readonly #consumer: Consumer;
116116- public readonly config: ConsumerConfig;
117117-118118- constructor(
119119- client: KafkaJS,
120120- registry: SchemaRegistry<EligibleTopicsSchemaMap>,
121121- config: ConsumerConfig,
122122- ) {
123123- this.config = config;
124124- this.#registry = registry;
125125- this.#consumer = client.consumer(config);
126126- }
127127-128128- async #decodeMessage(message: KafkaMessage) {
129129- const [key, value] = await Promise.all([
130130- message.key ? this.#registry.decode(message.key) : message.key,
131131- message.value ? this.#registry.decode(message.value) : message.value,
132132- ]);
133133-134134- return {
135135- ...message,
136136- key: key as KeyTypes<EligibleTopicsSchemaMap>,
137137- value: value as ValueTypes<EligibleTopicsSchemaMap>,
138138- };
139139- }
140140-141141- async run(config?: ConsumerRunConfig<EligibleTopicsSchemaMap>) {
142142- return this.#consumer.run({
143143- // This cast helps TS understand that eachBatch and eachMessage, if
144144- // present on config, will always get overridden before being passed to
145145- // this.#consumer.run (i.e., will never be passed with the type defined in
146146- // ConsumerRunConfig<T>).
147147- ...(config as Omit<typeof config, 'eachBatch' | 'eachMessage'>),
148148- ...(config?.eachBatch
149149- ? {
150150- eachBatch: async (payload) => {
151151- // TODO: does this need plimit? It shouldn't bc the schema is
152152- // cached, but idk if the cache is smart enough to avoid a huge
153153- // spike in initial requests for the schema(s) if the batch kicks
154154- // off a lot of decodes at a time.
155155- const decodedMessages = await Promise.all(
156156- payload.batch.messages.map(async (msg) =>
157157- this.#decodeMessage(msg),
158158- ),
159159- );
160160-161161- // We have to create the new batch by putting the original batch
162162- // in the prototype chain, in order for methods on the batch
163163- // object (like `lastOffset()`) to continue to work. We can't use
164164- // something like { ...origBatch, messages: decodedMessages } as
165165- // the new batch.
166166- return config.eachBatch!({
167167- ...payload,
168168- batch: Object.create(payload.batch, {
169169- decodedMessages: {
170170- value: decodedMessages,
171171- writable: false,
172172- configurable: false,
173173- enumerable: true,
174174- },
175175- }) as typeof payload.batch & {
176176- decodedMessages: typeof decodedMessages;
177177- },
178178- });
179179- },
180180- }
181181- : {}),
182182- ...(config?.eachMessage
183183- ? {
184184- eachMessage: async (payload) => {
185185- return config.eachMessage!({
186186- ...payload,
187187- message: await this.#decodeMessage(payload.message),
188188- });
189189- },
190190- }
191191- : {}),
192192- });
193193- }
194194-195195- // Bunch of blindly delegated methods below, although with arg types redefined
196196- // for some of them to limit the set of applicable topics like above.
197197- //
198198- // These delegated methods are explicitly enumerated on purpose (rather than
199199- // just, e.g., putting the kakfajs consumer instance in the prototype chain)
200200- // to make sure that the abstraction isn't leaky; i.e., that some KafkaJS API
201201- // isn't automatically delegated to that exposes messages without calling
202202- // registry.decode() on them. This choice of explicit delegation reflects that
203203- // I'd rather have the API surface be missing some KafkaJS methods (which can
204204- // easily be added if needed) than have the abstraction inadvertently leak.
205205- async subscribe(opts: ConsumerSubscribeTopics<EligibleTopicsSchemaMap>) {
206206- return this.#consumer.subscribe(
207207- // cast bc kafkajs' typings incorrectly fail to mark the `topics` key as
208208- // readonly (which it should be, since kafkajs doesn't mutate this array)
209209- opts as Omit<typeof opts, 'topics'> & {
210210- topics: Mutable<typeof opts.topics>;
211211- },
212212- );
213213- }
214214-215215- async commitOffsets(
216216- topicPartitions: TopicPartitionOffsetAndMetadata<EligibleTopicsSchemaMap>[],
217217- ) {
218218- try {
219219- return await this.#consumer.commitOffsets(topicPartitions);
220220- } catch (e) {
221221- // We want to unwrap the underlying KafkaJSProtocolError and throw that
222222- // instead. This is because there is logic within the KafkaJS library
223223- // that handles KafkaJSProtocolErrors, and will e.g. recover and
224224- // rejoin the group on errors that are associated with rebalancing.
225225- // However, the error thrown by consumer.commitOffsets() is always
226226- // wrapped in a KafkaJSNonRetriableError because it went through the
227227- // retrier already. This prevents the KafkaJSProtocolError from being
228228- // gracefully handled by the library unless we unwrap and throw it here.
229229- //
230230- // Alternatively we could turn on autoCommit for the simpler
231231- // consumers, which currently throws protocol errors directly.
232232- if (e instanceof KafkaJSError) {
233233- throw unwrapProtocolError(e) ?? e;
234234- }
235235-236236- throw e;
237237- }
238238- }
239239-240240- async seek(
241241- topicPartitionOffset: TopicPartitionOffset<EligibleTopicsSchemaMap>,
242242- ) {
243243- return this.#consumer.seek(topicPartitionOffset);
244244- }
245245-246246- async pause(topics: TopicPartition<EligibleTopicsSchemaMap>[]) {
247247- return this.#consumer.pause(topics);
248248- }
249249-250250- async resume(topics: TopicPartition<EligibleTopicsSchemaMap>[]) {
251251- return this.#consumer.resume(topics);
252252- }
253253-254254- async stop() {
255255- return this.#consumer.stop();
256256- }
257257-258258- async connect() {
259259- return this.#consumer.connect();
260260- }
261261-262262- async disconnect() {
263263- return this.#consumer.disconnect();
264264- }
265265-266266- // Overloads copied straight from the KafkaJS typings.
267267- // This is hella ugly, but idk a better alternative.
268268- on(
269269- eventName: ConsumerEvents['HEARTBEAT'],
270270- listener: (event: ConsumerHeartbeatEvent) => void,
271271- ): RemoveInstrumentationEventListener<typeof eventName>;
272272- on(
273273- eventName: ConsumerEvents['COMMIT_OFFSETS'],
274274- listener: (event: ConsumerCommitOffsetsEvent) => void,
275275- ): RemoveInstrumentationEventListener<typeof eventName>;
276276- on(
277277- eventName: ConsumerEvents['GROUP_JOIN'],
278278- listener: (event: ConsumerGroupJoinEvent) => void,
279279- ): RemoveInstrumentationEventListener<typeof eventName>;
280280- on(
281281- eventName: ConsumerEvents['FETCH_START'],
282282- listener: (event: ConsumerFetchStartEvent) => void,
283283- ): RemoveInstrumentationEventListener<typeof eventName>;
284284- on(
285285- eventName: ConsumerEvents['FETCH'],
286286- listener: (event: ConsumerFetchEvent) => void,
287287- ): RemoveInstrumentationEventListener<typeof eventName>;
288288- on(
289289- eventName: ConsumerEvents['START_BATCH_PROCESS'],
290290- listener: (event: ConsumerStartBatchProcessEvent) => void,
291291- ): RemoveInstrumentationEventListener<typeof eventName>;
292292- on(
293293- eventName: ConsumerEvents['END_BATCH_PROCESS'],
294294- listener: (event: ConsumerEndBatchProcessEvent) => void,
295295- ): RemoveInstrumentationEventListener<typeof eventName>;
296296- on(
297297- eventName: ConsumerEvents['CONNECT'],
298298- listener: (event: ConnectEvent) => void,
299299- ): RemoveInstrumentationEventListener<typeof eventName>;
300300- on(
301301- eventName: ConsumerEvents['DISCONNECT'],
302302- listener: (event: DisconnectEvent) => void,
303303- ): RemoveInstrumentationEventListener<typeof eventName>;
304304- on(
305305- eventName: ConsumerEvents['STOP'],
306306- listener: (event: InstrumentationEvent<null>) => void,
307307- ): RemoveInstrumentationEventListener<typeof eventName>;
308308- on(
309309- eventName: ConsumerEvents['CRASH'],
310310- listener: (event: ConsumerCrashEvent) => void,
311311- ): RemoveInstrumentationEventListener<typeof eventName>;
312312- on(
313313- eventName: ConsumerEvents['REBALANCING'],
314314- listener: (event: ConsumerRebalancingEvent) => void,
315315- ): RemoveInstrumentationEventListener<typeof eventName>;
316316- on(
317317- eventName: ConsumerEvents['RECEIVED_UNSUBSCRIBED_TOPICS'],
318318- listener: (event: ConsumerReceivedUnsubcribedTopicsEvent) => void,
319319- ): RemoveInstrumentationEventListener<typeof eventName>;
320320- on(
321321- eventName: ConsumerEvents['REQUEST'],
322322- listener: (event: RequestEvent) => void,
323323- ): RemoveInstrumentationEventListener<typeof eventName>;
324324- on(
325325- eventName: ConsumerEvents['REQUEST_TIMEOUT'],
326326- listener: (event: RequestTimeoutEvent) => void,
327327- ): RemoveInstrumentationEventListener<typeof eventName>;
328328- on(
329329- eventName: ConsumerEvents['REQUEST_QUEUE_SIZE'],
330330- listener: (event: RequestQueueSizeEvent) => void,
331331- ): RemoveInstrumentationEventListener<typeof eventName>;
332332- on(
333333- eventName: ConsumerEvents[keyof ConsumerEvents],
334334- // The type parameter here has to be `any` (or some union that'd be hard to
335335- // generate), rather than unknown, for TS to allow the overloads.
336336- // eslint-disable-next-line @typescript-eslint/no-explicit-any
337337- listener: (event: InstrumentationEvent<any>) => void,
338338- ): RemoveInstrumentationEventListener<typeof eventName> {
339339- return this.#consumer.on(eventName, listener);
340340- }
341341-342342- public get events() {
343343- return this.#consumer.events;
344344- }
345345-}
346346-347347-// Helper function to unwrap the underlying KafkaJSProtocolError from a
348348-// KafkaJSError, if present.
349349-function unwrapProtocolError(e: KafkaJSErrorType): KafkaJSProtocolErrorType | undefined {
350350- if (e instanceof KafkaJSProtocolError) {
351351- return e;
352352- }
353353-354354- if (e.cause && e.cause instanceof KafkaJSError) {
355355- return unwrapProtocolError(e.cause);
356356- }
357357-358358- return undefined;
359359-}
-175
server/kafka/SchemaAwareProducer.ts
···11-import type {
22- ConnectEvent,
33- DisconnectEvent,
44- InstrumentationEvent,
55- Kafka as KafkaJS,
66- ProducerBatch as KafkaJSProducerBatch,
77- ProducerRecord as KafkaJSProducerRecord,
88- Message as KafkaJSWriteMessage,
99- Producer,
1010- ProducerConfig,
1111- ProducerEvents,
1212- RemoveInstrumentationEventListener,
1313- RequestEvent,
1414- RequestQueueSizeEvent,
1515- RequestTimeoutEvent,
1616-} from 'kafkajs';
1717-1818-import {
1919- type AnyTopicSchemaMap,
2020- type KeyTypes,
2121- type SchemaRegistry,
2222- type ValueTypes,
2323-} from './SchemaAwareClient.js';
2424-// This is imported just so that the docblock comment can link to it.
2525-// eslint-disable-next-line @typescript-eslint/no-unused-vars
2626-import SchemaAwareConsumer from './SchemaAwareConsumer.js';
2727-2828-// Represents a message to produce to a topic before it's encoded.
2929-// NB: for best accuracy, Topic should be instantiated w/ a single string
3030-// literal type (as we do in ProducerBatch) rather than a union of literals.
3131-type TopicMessage<T extends AnyTopicSchemaMap, Topic extends keyof T> = Omit<
3232- KafkaJSWriteMessage,
3333- 'key' | 'value'
3434-> & {
3535- key?: KeyTypes<Pick<T, Topic>>;
3636- value: ValueTypes<Pick<T, Topic>>;
3737-};
3838-3939-type TopicMessages<T extends AnyTopicSchemaMap, Topic extends keyof T> = {
4040- topic: Topic;
4141- messages: TopicMessage<T, Topic>[];
4242-};
4343-4444-type ProducerRecord<T extends AnyTopicSchemaMap, Topic extends keyof T> = Omit<
4545- KafkaJSProducerRecord,
4646- 'topic' | 'messages'
4747-> &
4848- TopicMessages<T, Topic>;
4949-5050-type ProducerBatch<T extends AnyTopicSchemaMap, Topics extends keyof T> = Omit<
5151- KafkaJSProducerBatch,
5252- 'topicMessages'
5353-> & { topicMessages: { [Topic in Topics]: TopicMessages<T, Topic> }[Topics][] };
5454-5555-/**
5656- * This class is analogous to the {@link SchemaAwareConsumer} class,
5757- * so see that class for details behind the implementation rationale.
5858- *
5959- * TODO: support producer transactions.
6060- */
6161-export default class SchemaAwareProducer<T extends AnyTopicSchemaMap> {
6262- readonly #schemaMap: T;
6363- readonly #registry: SchemaRegistry<T>;
6464- readonly #producer: Producer;
6565- public readonly config: ProducerConfig | undefined;
6666-6767- constructor(
6868- client: KafkaJS,
6969- registry: SchemaRegistry<T>,
7070- schemaMap: T,
7171- config?: ProducerConfig,
7272- ) {
7373- this.config = config;
7474- this.#registry = registry;
7575- this.#schemaMap = schemaMap;
7676- this.#producer = client.producer(config);
7777- }
7878-7979- async #encodeTopicMessage<Topic extends keyof T>(
8080- topic: Topic,
8181- message: TopicMessage<T, Topic>,
8282- ) {
8383- const { keySchema, valueSchema } = this.#schemaMap[topic];
8484- const [key, value] = await Promise.all([
8585- message.key != null
8686- ? this.#registry.encode(keySchema, message.key)
8787- : null,
8888- message.value != null
8989- ? this.#registry.encode(valueSchema, message.value)
9090- : null,
9191- ]);
9292-9393- return { ...message, key, value };
9494- }
9595-9696- async #encodeTopicMessages<Topic extends keyof T>(
9797- it: TopicMessages<T, Topic>,
9898- ) {
9999- return Promise.all(
100100- // We don't make the map callback async as that just wastefully allocates
101101- // (a lot) of extra promises. (We're already ensured that synchronosuly
102102- // thrown errors in `#encodeTopicMessage` will be handled correctly
103103- // because it's an async function.)
104104- // eslint-disable-next-line @typescript-eslint/promise-function-async
105105- it.messages.map((message) => this.#encodeTopicMessage(it.topic, message)),
106106- );
107107- }
108108-109109- async send<Topic extends keyof T & string>(record: ProducerRecord<T, Topic>) {
110110- return this.#producer.send({
111111- ...record,
112112- messages: await this.#encodeTopicMessages(record),
113113- });
114114- }
115115-116116- async sendBatch<Topics extends keyof T & string>(
117117- batch: ProducerBatch<T, Topics>,
118118- ) {
119119- return this.#producer.sendBatch({
120120- ...batch,
121121- topicMessages: await Promise.all(
122122- batch.topicMessages.map(async (it) => ({
123123- ...it,
124124- messages: await this.#encodeTopicMessages(it),
125125- })),
126126- ),
127127- });
128128- }
129129-130130- async connect() {
131131- return this.#producer.connect();
132132- }
133133-134134- async disconnect() {
135135- return this.#producer.disconnect();
136136- }
137137-138138- isIdempotent() {
139139- return this.#producer.isIdempotent();
140140- }
141141-142142- get events() {
143143- return this.#producer.events;
144144- }
145145-146146- on(
147147- eventName: ProducerEvents['CONNECT'],
148148- listener: (event: ConnectEvent) => void,
149149- ): RemoveInstrumentationEventListener<typeof eventName>;
150150- on(
151151- eventName: ProducerEvents['DISCONNECT'],
152152- listener: (event: DisconnectEvent) => void,
153153- ): RemoveInstrumentationEventListener<typeof eventName>;
154154- on(
155155- eventName: ProducerEvents['REQUEST'],
156156- listener: (event: RequestEvent) => void,
157157- ): RemoveInstrumentationEventListener<typeof eventName>;
158158- on(
159159- eventName: ProducerEvents['REQUEST_QUEUE_SIZE'],
160160- listener: (event: RequestQueueSizeEvent) => void,
161161- ): RemoveInstrumentationEventListener<typeof eventName>;
162162- on(
163163- eventName: ProducerEvents['REQUEST_TIMEOUT'],
164164- listener: (event: RequestTimeoutEvent) => void,
165165- ): RemoveInstrumentationEventListener<typeof eventName>;
166166- on(
167167- eventName: ProducerEvents[keyof ProducerEvents],
168168- // The type parameter here has to be `any` (or some union that'd be hard to
169169- // generate), rather than unknown, for TS to allow the overloads.
170170- // eslint-disable-next-line @typescript-eslint/no-explicit-any
171171- listener: (event: InstrumentationEvent<any>) => void,
172172- ): RemoveInstrumentationEventListener<typeof eventName> {
173173- return this.#producer.on(eventName, listener);
174174- }
175175-}
-46
server/kafka/index.ts
···11-import kafkaJs from 'kafkajs';
22-33-import { KafkajsZstdCompressionCodec } from './KafkajsZstdCompressionCodec.js';
44-import SchemaAwareKafkaClient, {
55- SchemaRegistry,
66- type SchemaIdFor,
77-} from './SchemaAwareClient.js';
88-import {
99- type ConsumerRunConfig,
1010- type DecodedMessage,
1111-} from './SchemaAwareConsumer.js';
1212-import type SchemaAwareConsumer from './SchemaAwareConsumer.js';
1313-import type SchemaAwareProducer from './SchemaAwareProducer.js';
1414-1515-// Only the wrapper client class is exported, not the consumer/producer classes.
1616-export default SchemaAwareKafkaClient;
1717-1818-const { CompressionCodecs, CompressionTypes } = kafkaJs;
1919-2020-// The line below will allow producers to generate, and consumers to read,
2121-// messages compressed w/ zstd. However, it doesn't require (or automatically
2222-// opt-in) the producers to using compression, nor does it stop the consumers
2323-// from reading uncompressed messages.
2424-//
2525-// In Kafkajs, the registered compression codecs are global, so there's no way
2626-// to (e.g.) provide different detailed compression options per client/
2727-// producer/topic/message batch. In other words, any messages that request
2828-// compression w/ zstd will get this compression level 5, which is a bit
2929-// annoying because different topics might warrant different compression levels.
3030-// See https://github.com/tulios/kafkajs/issues/1553
3131-//
3232-// Given that this setting is global, we also can't expose any
3333-// compression-related options on the classes we export from this module,
3434-// as they can't do any sort of local override.
3535-CompressionCodecs[CompressionTypes.ZSTD] = () =>
3636- new KafkajsZstdCompressionCodec(5);
3737-3838-export type {
3939- SchemaIdFor,
4040- DecodedMessage,
4141- SchemaAwareKafkaClient as Kafka,
4242- SchemaAwareProducer as KafkaProducer,
4343- SchemaAwareConsumer as KafkaConsumer,
4444- ConsumerRunConfig as KafkaConsumerRunConfig,
4545-};
4646-export { SchemaRegistry };
-117
server/kafka/itemQueueBulkWrite.ts
···11-import DataLoader from 'dataloader';
22-import { CompressionTypes } from 'kafkajs';
33-44-import {
55- type ItemSubmissionKafkaMessageValue,
66- type KafkaSchemaMap,
77-} from '../iocContainer/index.js';
88-import { type Kafka, type KafkaProducer } from '../kafka/index.js';
99-import { sleep } from '../utils/misc.js';
1010-1111-type ITEM_SUBMISSION_SCHEMAS =
1212- | 'ITEM_SUBMISSION_EVENTS'
1313- | 'ITEM_SUBMISSION_EVENTS_RETRY_0';
1414-1515-/**
1616- * Factory for a service that'll write to Kafka after batching the writes,
1717- * returns to the caller after the whole batch has been written.
1818- */
1919-function makeItemQueueBulkWrite(
2020- kafka: Kafka<Pick<KafkaSchemaMap, ITEM_SUBMISSION_SCHEMAS>>,
2121- topic: ITEM_SUBMISSION_SCHEMAS,
2222-) {
2323- const kafkaProducer = kafka.producer();
2424- let connectError: Error | undefined;
2525- const initialConnectPromise = kafkaProducer.connect().catch((err: unknown) => {
2626- // Store the error to prevent an unhandled promise rejection from crashing
2727- // the process. We re-throw it when callers attempt to write to Kafka.
2828- connectError = err instanceof Error ? err : new Error(String(err));
2929- });
3030- const batchTimeout = 500;
3131-3232- const loader: DataLoader<ItemSubmissionKafkaMessageValue, void> =
3333- new DataLoader(
3434- async (data) =>
3535- bulkWrite(kafkaProducer, data, topic).then(() =>
3636- new Array(data.length).fill(undefined),
3737- ),
3838- {
3939- cache: false,
4040- batch: true,
4141- maxBatchSize: 200,
4242- batchScheduleFn(cb) {
4343- setTimeout(cb, batchTimeout);
4444- },
4545- },
4646- );
4747-4848- async function itemQueueBulkWrite(
4949- items: readonly ItemSubmissionKafkaMessageValue[],
5050- skipBatch: boolean = false,
5151- ) {
5252- await initialConnectPromise;
5353- if (connectError) {
5454- throw connectError;
5555- }
5656- // bulkWrite and loader.loadMany have different return types, so we have to
5757- // handle their returns separately and construct a homogenous return type in
5858- // each case, in addition to the logical difference of using batching or not
5959- if (skipBatch) {
6060- try {
6161- await bulkWrite(kafkaProducer, items, topic);
6262- return { error: false, results: [] };
6363- } catch (err) {
6464- return { error: true, results: [err] };
6565- }
6666- } else {
6767- // loader.loadMany never throws, just return error objects in it's
6868- // response
6969- const response = await loader.loadMany(items);
7070- if (response.some((r) => r instanceof Error)) {
7171- return {
7272- error: true,
7373- results: response,
7474- };
7575- }
7676- return {
7777- error: false,
7878- results: [],
7979- };
8080- }
8181- }
8282-8383- itemQueueBulkWrite.close = async () => {
8484- // make sure the latest batch of writes has been flushed to kafka before we
8585- // attempt to disconnect. This should be the last batch, assuming
8686- // bulkWrite isn't called again after `close()` is called.
8787- await sleep(batchTimeout + 1000);
8888- await kafkaProducer.disconnect();
8989- };
9090-9191- return itemQueueBulkWrite;
9292-}
9393-9494-export type ItemQueueBulkWrite = ReturnType<typeof makeItemQueueBulkWrite>;
9595-9696-export { makeItemQueueBulkWrite };
9797-9898-async function bulkWrite(
9999- kafka: KafkaProducer<Pick<KafkaSchemaMap, ITEM_SUBMISSION_SCHEMAS>>,
100100- data: readonly ItemSubmissionKafkaMessageValue[],
101101- topic: ITEM_SUBMISSION_SCHEMAS,
102102-) {
103103- if (!data.length) {
104104- return;
105105- }
106106-107107- await kafka.send({
108108- topic,
109109- compression: CompressionTypes.ZSTD,
110110- messages: data.map((msg) => ({
111111- key: {
112112- syntheticThreadId: msg.metadata.syntheticThreadId,
113113- },
114114- value: msg,
115115- })),
116116- });
117117-}
-37
server/kafka/logger.ts
···11-/* eslint-disable no-console */
22-import util from 'util';
33-import { logLevel, type logCreator } from 'kafkajs';
44-55-import { assertUnreachable } from '../utils/misc.js';
66-77-// This is almost a carbon copy of the default logger in kafkajs, but it uses
88-// util.inspect instead of JSON.stringify to avoid errors when logging circular
99-// objects. See:
1010-// https://github.com/tulios/kafkajs/blob/master/src/loggers/console.js
1111-// https://github.com/tulios/kafkajs/issues/975
1212-const logCreator: logCreator =
1313- () =>
1414- ({ namespace, level, label, log }) => {
1515- const prefix = namespace ? `[${namespace}] ` : '';
1616- const message = util.inspect({
1717- level: label,
1818- ...log,
1919- message: `${prefix}${log.message}`,
2020- });
2121- switch (level) {
2222- case logLevel.INFO:
2323- return console.info(message);
2424- case logLevel.ERROR:
2525- return console.error(message);
2626- case logLevel.WARN:
2727- return console.warn(message);
2828- case logLevel.DEBUG:
2929- return console.log(message);
3030- case logLevel.NOTHING:
3131- return;
3232- default:
3333- assertUnreachable(level);
3434- }
3535- };
3636-3737-export default logCreator;
···
import {
  type Dependencies,
-  type ItemSubmissionKafkaMessageValue,
+  type ItemSubmissionMessageValue,
} from '../../iocContainer/index.js';
import { safeGetEnvVar } from '../../iocContainer/utils.js';
import {
···
      return next(new AggregateError(errors));
    }

-   // Send 5% of traffic to the async processing queue, otherwise handle in
-   // the traditional way (in this process, immediately after returning 202 to
-   // the user)
+   // Send a configurable percentage of traffic to the async processing queue
+   // (BullMQ), otherwise handle inline (in this process, immediately after
+   // returning 202 to the user). Set to 1 to route all traffic through the queue.
    const trafficPercentage = Number(
      safeGetEnvVar('ITEM_QUEUE_TRAFFIC_PERCENTAGE'),
    );
···
    // valid Date, but due to legacy data the type returned, ItemSubmission, an
    // optional `submissionTime` property. this variable is used to convince
    // typescript that the value we subsequently pass to itemSubmissionQueueBulkWrite has
-   // a valid Date in the `submissionTime` property, which is specified in the
-   // schema for the kafka topic that item submissions get written to.
+   // a valid Date in the `submissionTime` property.
    const backupSubmissiontime = new Date();
    const submissionsToProcess = itemSubmissionsOrErrors.map((it) => {
      // We checked for errors earlier so this should never happen
···
        // toItemSubmission specifies it is optional, as noted above
        it.itemSubmission.submissionTime ?? backupSubmissiontime,
      },
-   } satisfies ItemSubmissionKafkaMessageValue;
+   } satisfies ItemSubmissionMessageValue;
    });

    Meter.itemsEnqueued.add(submissionsToProcess.length);
···
    // TODO: start sending automatic close decisions when we are sending the
    // report decision callbacks
    if (newDecisionStored && automaticCloseDecision === undefined) {
-     // TODO: use proper publishing to a durable queue (kafka?) and retry
+     // TODO: use proper publishing to a durable queue and retry
      this.onRecordDecision({
        decisionComponents,
        relatedActions,
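The handler code that actually consumes `trafficPercentage` falls outside these hunks. Presumably it gates each request roughly like the sketch below; everything other than the `trafficPercentage` fraction and the idea of an enqueue-vs-inline split is illustrative, not code from this diff.

```ts
import { type ItemSubmissionMessageValue } from '../../iocContainer/index.js';

// Sketch of the routing gate: send a fraction of submissions to the BullMQ
// queue and handle the rest inline. `enqueue` / `processInline` stand in for
// the real code paths (e.g. itemSubmissionQueueBulkWrite and the legacy path).
async function routeSubmissions(
  submissions: readonly ItemSubmissionMessageValue[],
  trafficPercentage: number, // from ITEM_QUEUE_TRAFFIC_PERCENTAGE, 0..1
  enqueue: (items: readonly ItemSubmissionMessageValue[]) => Promise<unknown>,
  processInline: (items: readonly ItemSubmissionMessageValue[]) => Promise<void>,
) {
  if (Math.random() < trafficPercentage) {
    // Durable path: ItemProcessingWorker consumes these from Redis.
    await enqueue(submissions);
  } else {
    // Inline path: process in this process after the 202 has been returned.
    await processInline(submissions);
  }
}
```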
+1-1
server/storage/dataWarehouse/warehouseSchema.ts
···
/**
 * Transforms a table row type into the shape expected for bulk/eventual writes
- * (e.g. via Kafka ingestion). Keys are lowercased and nullable keys become
+ * (e.g. via bulk ingestion). Keys are lowercased and nullable keys become
 * optional. The `AcceptSlowQueries` flag controls whether JSON null values are
 * permitted (they can degrade columnar storage performance).
 */
+9-12
server/utils/CoopMeter.ts
···
  // indicates a bug in the processing code or an infrastructure/network issue
  // that is preventing progress from being made
  public readonly itemProcessingFailuresCounter: opentelemetry.Counter;
-  // Used to track the amount of time a worker spends processing one batch of
-  // item submissions (batch as seen by Kafka, not a user batch). Gives a
-  // rough idea of item processing performance.
-  public readonly itemProcessingBatchTime: opentelemetry.Histogram;
-  // Tracks the batch size for each batch of items processed by a worker.
-  // This metric can help tune # of workers, # of partitions assigned per
-  // worker, and timeouts for kafka batch writes
-  public readonly itemProcessingBatchSize: opentelemetry.Histogram;
+  // Tracks the time a worker spends processing a single job.
+  public readonly itemProcessingJobTime: opentelemetry.Histogram;
+  // Snapshot of waiting + active jobs in the queue, sampled after each
+  // job completes. Useful for detecting backpressure.
+  public readonly itemProcessingQueueDepth: opentelemetry.Histogram;
  // Counts the number of items sent to the processing queue
  // this is mostly for debugging, and should allow us to confirm
  // the percentage of traffic we are sending to the queue and
···
    this.itemsEnqueued = myMeter.createCounter(
      `${metricNamespace}.items.enqueued-to-processing-queue.counter`,
    );
-   this.itemProcessingBatchTime = myMeter.createHistogram(
-     `${metricNamespace}.items.batch-processing-time-ms.histogram`,
+   this.itemProcessingJobTime = myMeter.createHistogram(
+     `${metricNamespace}.items.job-processing-time-ms.histogram`,
    );
-   this.itemProcessingBatchSize = myMeter.createHistogram(
-     `${metricNamespace}.items.batch-size.histogram`,
+   this.itemProcessingQueueDepth = myMeter.createHistogram(
+     `${metricNamespace}.items.queue-depth.histogram`,
    );
  }
}
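Since `itemProcessingQueueDepth` is described as a snapshot of waiting plus active jobs sampled after each job completes, BullMQ's `getJobCounts` is the natural source for that number. A sketch of how the worker might record it follows; the metric name and sampling point come from this diff, while the wiring shown here is an assumption.

```ts
import { Queue } from 'bullmq';

// Sketch: record waiting + active job counts after a job completes.
// `queue` is the worker's BullMQ queue handle and the second argument stands
// in for the CoopMeter instance shown above.
async function recordQueueDepth(
  queue: Queue,
  Meter: { itemProcessingQueueDepth: { record(value: number): void } },
) {
  const counts = await queue.getJobCounts('waiting', 'active');
  Meter.itemProcessingQueueDepth.record(
    (counts.waiting ?? 0) + (counts.active ?? 0),
  );
}
```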
+137-334
server/workers_jobs/ItemProcessingWorker.ts
···
-import { type KafkaSchemaMap } from '../iocContainer/index.js';
+import { Queue, Worker as BullWorker, type Job as BullJob } from 'bullmq';
+import { type Cluster } from 'ioredis';
+import type IORedis from 'ioredis';
+
+import { type ItemSubmissionMessageValue } from '../iocContainer/index.js';
import { inject } from '../iocContainer/utils.js';
-import { type Kafka, type KafkaConsumerRunConfig } from '../kafka/index.js';
+import { ITEM_SUBMISSION_QUEUE_NAME } from '../queues/itemSubmissionQueue.js';
import {
  submissionDataToItemSubmission,
  type ItemSubmission,
···
import { withRetries } from '../utils/misc.js';
import { type Worker } from './index.js';

-const topicsToConsume = ['ITEM_SUBMISSION_EVENTS'] as const;
-
-type ConsumedTopic = (typeof topicsToConsume)[number];
-
export default inject(
  [
-   'Kafka',
+   'IORedis',
    'Tracer',
    'RuleEngine',
    'ContentApiLogger',
···
    'itemSubmissionRetryQueueBulkWrite',
  ],
  (
-   kafka: Kafka<Pick<KafkaSchemaMap, ConsumedTopic>>,
+   redis: IORedis.Redis | Cluster,
    tracer,
    ruleEngine,
    contentApiLogger,
···
    Meter,
    itemSubmissionRetryQueueBulkWrite,
  ) => {
-   let consumer: ReturnType<typeof kafka.consumer<ConsumedTopic>>;
+   let worker: BullWorker<ItemSubmissionMessageValue> | undefined;
+   let queue: Queue<ItemSubmissionMessageValue> | undefined;

    return {
      type: 'Worker' as const,
      async run(_signal) {
-       consumer = kafka.consumer<ConsumedTopic>({
-         // NB: don't rename lightly, as this has permissions
-         // associated w/ it through Kafka ACLS.
-         groupId: 'item-submission-worker',
-         maxBytesPerPartition: 1024 * 1024, // 1 mb = Default
-         sessionTimeout: 90_000,
-       });
+       queue = new Queue(ITEM_SUBMISSION_QUEUE_NAME, { connection: redis });
+       const insertWithRetries = tracer.traced(
+         {
+           resource: 'itemProcessingWorker',
+           operation: 'ItemInvestigationService.insertItem',
+         },
+         withRetries(
+           {
+             maxRetries: 1,
+             initialTimeMsBetweenRetries: 75,
+             maxTimeMsBetweenRetries: 250,
+           },
+           ItemInvestigationService.insertItem.bind(ItemInvestigationService),
+         ),
+       );

-       await consumer.connect();
-       await consumer.subscribe({ topics: topicsToConsume });
+       worker = new BullWorker<ItemSubmissionMessageValue>(
+         ITEM_SUBMISSION_QUEUE_NAME,
+         async (job: BullJob<ItemSubmissionMessageValue>) => {
+           const processJob = tracer.traced(
+             {
+               operation: 'processJob',
+               resource: 'itemsProcessingWorker',
+             },
+             async () => {
+               const jobStartTime = performance.now();
+               const { itemSubmissionWithTypeIdentifier, metadata } = job.data;
54725555- // An error thrown within eachBatch does not lead the promise returned
5656- // by `consumer.run()` to reject. Instead, that promise resolves
5757- // immediately once the consumer starts running and, if an error occurs
5858- // within `eachBatch`, kafkajs will simply retry the `eachBatch`
5959- // callback a few times (the exact number is configurable). However,
6060- // _even once that retry count limit is exhausted_, the `consumer.run()`
6161- // call still does not reject, as you might expect.
6262- //
6363- // Instead, once that retry count is exhausted, kafkajs switches from
6464- // silently + automatically retrying `eachBatch` to emitting a `crash`
6565- // event on the consumer. However, even after this `crash` event is
6666- // emitted, kafkajs does not stop the consumer or raise an exception.
6767- // Instead, kafkajs's default behavior is to simply restart the consumer
6868- // after the crash event. So the overall nodejs process will, by
6969- // default, basically never crash.
7070- //
7171- // However, for now, we _want_ Node to crash if we're getting repeated
7272- // errors (even after retrying) within `eachBatch`, so that we can take
7373- // advantage of simple, out-of-the-box monitoring to see these crashes.
7474- // Therefore, we register a crash listener that throws unconditionally
7575- // (again, this only applies once the automatic retries have failed and
7676- // the crash event is emitted). The unconditional part means that we're
7777- // ignoring `event.payload.restart`, which is the flag for whether
7878- // kafkajs should restart the consumer after the crash, and which is
7979- // always true by default. Kafkajs takes a `retryOnFailure` setting for
8080- // configuring that, but we don't even bother, because we always want to
8181- // crash nodejs/the whole process once the consumer crash event is
8282- // emitted.
8383- consumer.on('consumer.crash', (event) => {
8484- const { error } = event.payload;
8585- tracer.logActiveSpanFailedIfAny(error);
8686- throw error;
8787- });
7373+ Meter.itemProcessingAttemptsCounter.add(1, {
7474+ process: 'item-processing-worker',
7575+ });
88768989- // Error Cases
9090- //
9191- // 1. If the worker is shut down by kubernetes (eg. a new version is
9292- // deployed and being rolled out, or because k8s decides this pod needs
9393- // to be evicted/moved to another node), then shutdown() will run,
9494- // which will call `consumer.disconnect()`, which also makes the
9595- // consumer stop pulling new messages, so there's nothing else we have
9696- // to do. If a batch is interrupted by shutdown, KafkaJS will
9797-    //    automatically commit any resolved offsets so that we don’t lose
9898-    //    progress and another worker doesn’t re-process messages that this
9999-    //    worker has already seen, while the rest of the batch’s messages
100100-    //    should be picked up by another worker and processed eventually.
101101- //
102102- // 2. If Kafka is unavailable when the worker starts, then the consumer
103103- // will fail to connect or subscribe, the worker will throw an
104104- // exception (after some internal kafkajs retrying), no state will get
105105- // messed up, and k8s can restart the worker.
106106- //
107107- // 3. If Kafka becomes unavailable while the worker is running,
108108- // there are two cases:
109109- // a) KafkaJS detects that Kafka is unavailable when it tries to
110110- // fetch the next batch. By default kafkajs will try to reconnect
111111- // for a while; if that fails, I think an error is eventually
112112- // raised (either thrown or as the "CRASH" event), in which case
113113- // there shouldn’t be any buffered messages (because we only
114114- // request a new batch when the current one is completely finished
115115- // processing and the offset is committed) so when Kafka becomes
116116- // available again we should be able to start making progress with
117117- // no weird state.
118118- // b) kafkajs detects that kafka is unavailable when it tries to
119119- // commit the offsets, _after processing items and publishing
120120- // actions_. Kafkajs will already retry committing the offsets
121121- // but, if that fails, what do we do? If we shut down the worker,
122122- // the consumer will restart and reprocess the messages that
123123- // didn’t get their offsets committed. It is not a catastrophic
124124- // failure if one batch of messages is re-processed when there is
125125- // a connection issue with kafka, so this worker does not have
126126- // logic to prevent this situation. The main issue with this
127127- // failure is we may publish actions for those items more than
128128- // once, which again is not catastrophic but also not ideal. To
129129- // prevent this we can add an idempotency mechanism to the action
130130- // publisher that stores a key of either `topic:partition:offset`
131131- // or `requestId:SubmissionId` for each action with some
132132- // reasonable TTL, and also checks for that keys existence before
133133- // sending a request to a custom action callback API.
134134- //
135135- // 4. If a partition gets reassigned while a batch is in the middle of
136136-      //    processing, KafkaJS will automatically commit the resolved offsets
137137- // for the current batch, similar to case 1.
138138- //
139139- // 5. If the item processing takes a long time (which is very
140140- // possible since much of the rule engine is network I/O), we want to
141141- // make sure that Kafka doesn't think the consumer is dead and
142142- // needlessly reassign its partitions. So, we set a 5 second heartbeat
143143- // interval outside of KafkaJS’s automatic heartbeat flow.
144144- //
145145- // 6. An error is thrown while processing a message. This can happen
146146- // if any one of `itemDataToItemSubmission`, `runEnabledRules`, or
147147- // `logContentAPIRequest` throws. In all these cases we choose to
148148- // block the queue (or at least the current partition) from
149149- // progressing until the error is resolved, for the reasons explained
150150- // below:
151151- // a) `itemDataToItemSubmission` throws. This could happen if there
152152- // is an issue connecting to postgres, in which case we should retry
153153- // until it succeeds (this can be handled by simply throwing and
154154- // causing the batch to retry). If data is fundamentally malformed
155155- // and will always cause this error to throw, this is likely
156156- // due to a bug in the Kafka producer code, or somehow bad data
157157- // got through validation and is not reconstructible. In this
158158- // case we write to a separate queue to allow processing of
159159- // other messages to continue, and these bad messages can be
160160- // inspected from the dead letter queue
161161- //
162162- // b) `runEnabledRules` throws. This does not happen in the usual
163163- // lifecycle of our application, even if all signals associated with
164164- // a given rule fail. This usually happens when we push a bug or bad
165165- // code, or if some other infrastructure is down (e.g. postgres). In
166166- // this case we want to block progress until the external dependency
167167- // is back up or we deploy a fix for the bug. This ensures that all
168168- // items are processed normally when the issue is resolved.
169169- //
170170- // c) `logContentAPIRequest` throws. This will happen if a
171171- // connection to kafka is unavailable, in which case we generally
172172- // can’t make progress, or if this function throws. This is likely
173173- // to be a transient error and we can throw this error, causing the
174174- // batch to be retried. Although this is the same strategy as 6.a
175175- // and 6.b, in this case we have already processed the given item
176176- // and may have published actions related to it so we risk
177177- // publishing actions more than once (as well as doing duplicate
178178- // work more generally). This can be mitigated with the same
179179- // idempotency strategy described in 3.b, and again we don’t take
180180- // pains to prevent duplicate work in this case in the code for this
181181- // worker.
7777+ let itemSubmission;
7878+ try {
7979+ const { itemTypeIdentifier } =
8080+ itemSubmissionWithTypeIdentifier;
18281183183- const eachBatchTraced = tracer.traced(
184184- { operation: 'processBatch', resource: 'itemsProcessingWorker' },
185185- async function ({ batch, heartbeat }) {
186186- // Heartbeat every 5s while the upload is in progress/being retried
187187- // (kafkajs will dedupe these if we're calling heartbeat more often
188188- // than heartbeat interval), to avoid 30s session timeout if s3
189189- // upload has to be retried a few times or takes a long time.
190190- // Handles case (6) above.
191191- const heartbeatInverval = setInterval(() => {
192192- heartbeat().catch((reason) => {
193193- tracer.traced(
194194- {
195195- operation: 'consumerHeartbeat',
196196- resource: 'itemsProcessingWorker',
197197- },
198198- () => {
199199- tracer.logActiveSpanFailedIfAny(reason);
200200- },
201201- );
202202- });
203203- }, 5_000);
8282+ // BullMQ serializes job data as JSON, which converts Date
8383+ // objects to ISO strings. Re-hydrate here.
8484+ const submissionTime = new Date(
8585+ itemSubmissionWithTypeIdentifier.submissionTime,
8686+ );
20487205205- try {
206206- const { decodedMessages: messages, topic, partition } = batch;
207207- const insertWithRetries = tracer.traced(
208208- {
209209- resource: 'itemProcessingWorker',
210210- operation: 'ItemInvestigationService.insertItem',
211211- },
212212- withRetries(
213213- {
214214- maxRetries: 1,
215215- initialTimeMsBetweenRetries: 75,
216216- maxTimeMsBetweenRetries: 250,
217217- },
218218- ItemInvestigationService.insertItem.bind(
219219- ItemInvestigationService,
220220- ),
221221- ),
222222- );
8888+ try {
8989+ itemSubmission = (await submissionDataToItemSubmission(
9090+ async ({ typeSelector, orgId }) =>
9191+ moderationConfigService.getItemType({
9292+ orgId,
9393+ itemTypeSelector: typeSelector,
9494+ }),
9595+ {
9696+ orgId: metadata.orgId,
9797+ submissionId:
9898+ itemSubmissionWithTypeIdentifier.submissionId satisfies string as SubmissionId,
9999+ submissionTime,
100100+ itemId: itemSubmissionWithTypeIdentifier.itemId,
101101+ itemTypeId: itemTypeIdentifier.id,
102102+ itemTypeVersion: itemTypeIdentifier.version,
103103+ itemTypeSchemaVariant: itemTypeIdentifier.schemaVariant,
104104+ data: jsonParse(
105105+ itemSubmissionWithTypeIdentifier.dataJSON,
106106+ ),
107107+ creatorId: null,
108108+ creatorTypeId: null,
109109+ },
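+                      // The cast is safe: submissionTime was re-hydrated into a
+                      // Date above, and new submissions are always written with a
+                      // submissionTime (only legacy submissions lack one, and those
+                      // never reach this queue).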
110110+ )) as ItemSubmission & { submissionTime: Date };
111111+ } catch {
112112+ // If we can't reconstruct a message, it likely has
113113+ // bad data or was written in a bad format. Write to
114114+ // the DLQ for inspection and return without throwing
115115+ // so BullMQ marks this job as complete (not retried).
116116+ await itemSubmissionRetryQueueBulkWrite([job.data]);
117117+ return;
118118+ }
223119224224- Meter.itemProcessingBatchSize.record(messages.length);
225225- const batchStartTime = performance.now();
226226- await Promise.all(
227227- messages.map(async (data) => {
228228- // TODO: what to do if value is missing cuz we wrote incorrectly?
229229- // Add metric to count occurences of this, ideally we would only see this
230230- // failure on first deploy and quickly fix it.
231231- const { itemSubmissionWithTypeIdentifier, metadata } =
232232- data.value!;
233233-234234- Meter.itemProcessingAttemptsCounter.add(1, {
235235- process: 'item-processing-worker',
236236- });
237237-238238- // TODO: better way to do this?
239239- let itemSubmission;
240120 try {
241241- const { itemTypeIdentifier } =
242242- itemSubmissionWithTypeIdentifier;
121121+ await insertWithRetries({
122122+ requestId: metadata.requestId,
123123+ orgId: metadata.orgId,
124124+ itemSubmission,
125125+ });
126126+                      } catch {
127127+                        // Swallow the error for now if an item fails to make it
128128+                        // into Scylla; it shouldn't prevent rule processing.
129129+                      }
243130244244- try {
245245- // NB: could throw if item type can't be found (e.g.,
246246- // postgres briefly down)
247247- itemSubmission = (await submissionDataToItemSubmission(
248248- async ({ typeSelector, orgId }) =>
249249- moderationConfigService.getItemType({
250250- orgId,
251251- itemTypeSelector: typeSelector,
252252- }),
253253- {
254254- orgId: metadata.orgId,
255255- submissionId:
256256- itemSubmissionWithTypeIdentifier.submissionId satisfies string as SubmissionId,
257257- submissionTime:
258258- itemSubmissionWithTypeIdentifier.submissionTime,
259259- itemId: itemSubmissionWithTypeIdentifier.itemId,
260260- itemTypeId: itemTypeIdentifier.id,
261261- itemTypeVersion: itemTypeIdentifier.version,
262262- itemTypeSchemaVariant:
263263- itemTypeIdentifier.schemaVariant,
264264- data: jsonParse(
265265- itemSubmissionWithTypeIdentifier.dataJSON,
266266- ),
267267- creatorId: null,
268268- creatorTypeId: null,
269269- },
270270- // this cast is safe since new ItemSubmissions are
271271- // always written with a submissionTime, despite the
272272- // `...toItemSubmission` function annotation implying they
273273- // could have an undefined submissionTime. This is to support
274274- // legacy submissions, but none of those will end up in
275275- // Kafka
276276- )) as ItemSubmission & { submissionTime: Date };
277277- } catch {
278278-                        // If we can't reconstruct a message, it likely has
279279- // made it past validation with some bad data (shouldn't happen)
280280- // or the kafka message was written in a bad format. In this case
281281- // we hope it is not a problem with every single item, so we don't want
282282- // to block progress on the item submission queue - so we write to a
283283- // retry queue which can be inspected and optionally retried
284284- if (data.value) {
285285- await itemSubmissionRetryQueueBulkWrite([data.value]);
286286- }
287287- return;
288288- }
131131+ await ruleEngine.runEnabledRules(
132132+ itemSubmission,
133133+ metadata.requestId,
134134+ );
289135290290- try {
291291- await insertWithRetries({
292292- requestId: metadata.requestId,
293293- orgId: metadata.orgId,
294294- itemSubmission,
295295- });
296296- } catch (e: unknown) {
297297- //swallow error for now if an item fails to make it into
298298- //scylla, it is not really an issue for running most
299299- //rules and shouldn't prevent processing
300300- }
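+                      // Resolves as soon as the request is queued for the logger's
+                      // bulk write, not when it is actually written, so awaiting it
+                      // per job is cheap.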
136136+ await contentApiLogger.logContentApiRequest(
137137+ {
138138+ requestId: metadata.requestId,
139139+ orgId: metadata.orgId,
140140+ itemSubmission,
141141+ failureReason: undefined,
142142+ },
143143+ false,
144144+ );
301145302302- await ruleEngine.runEnabledRules(
303303- itemSubmission,
304304- metadata.requestId,
305305- );
146146+ Meter.itemProcessingJobTime.record(
147147+ performance.now() - jobStartTime,
148148+ );
306149307307- // This returns as soon as the item is loaded, not when the
308308- // batch is actually written, so it can be
309309- // safely/efficiently awaited on each message
310310- await contentApiLogger.logContentApiRequest(
311311- {
312312- requestId: metadata.requestId,
313313- orgId: metadata.orgId,
314314- itemSubmission,
315315- failureReason: undefined,
316316- },
317317- false,
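+                    // Record current queue depth (waiting + active) as a metric;
+                    // fire-and-forget so a Redis hiccup here can't fail a job that
+                    // was otherwise processed successfully.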
150150+ queue!.getJobCounts('waiting', 'active').then((counts) => {
151151+ Meter.itemProcessingQueueDepth.record(
152152+ counts.waiting + counts.active,
318153 );
154154+ }).catch(() => {});
155155+ } catch (e: unknown) {
156156+ tracer.logActiveSpanFailedIfAny(e);
157157+ Meter.itemProcessingFailuresCounter.add(1, {
158158+ process: 'item-processing-worker',
159159+ });
319160320320- } catch (e: unknown) {
321321- tracer.logActiveSpanFailedIfAny(e);
322322- Meter.itemProcessingFailuresCounter.add(1, {
323323- process: 'item-processing-worker',
324324- });
161161+                      // Transient errors (e.g. Postgres briefly down) are retried
162162+                      // by BullMQ per the job's attempts/backoff options. Bugs or
163163+                      // longer infrastructure outages exhaust those retries and move
164164+                      // the job to the failed state, where it is kept for inspection.
165165+ throw e;
166166+ }
167167+ },
168168+ );
325169326326- // When we reach this catch block we have hit one of the errors in
327327- // case 6 a, b, or c. These fall into two categories:
328328- //
329329- // Transient Errors: errors in connection to postgres, or
330330- // writing to ContentAPIRequests. these are cheaply retried
331331- // by throwing, which triggers another call to `eachBatch`.
332332- //
333333- // Bugs or Infrastructure outages: In these cases we want
334334- // to stop progressing through the queue until the issue is
335335- // resolved, either by deploying updated code which fixes
336336- // the issue, or when some external infrastructure (most
337337- // likely Kafka itself) is available and we can establish a
338338- // connection. We can also handle this by throwing, which
339339- // will retry continually until the process crashes.
340340- //
341341- // In both cases (if Kafka is available) KafkaJS will
342342- // automatically commit the offsets for any messages in the
343343- // batch that have already been processed, so we are not at
344344-                        // risk of re-processing them and duplicating effort.
345345- throw e;
346346- }
347347- }),
348348- );
349349- Meter.itemProcessingBatchTime.record(
350350- performance.now() - batchStartTime,
351351- );
170170+ await processJob();
171171+ },
172172+ {
173173+ connection: redis,
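+              // Process up to 30 jobs concurrently in this worker (mirrors the
+              // old partitionsConsumedConcurrently setting).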
174174+ concurrency: 30,
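+              // Keep no completed jobs in Redis, but retain the most recent 1000
+              // failed jobs so they can be inspected and manually retried.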
175175+ removeOnComplete: { count: 0 },
176176+ removeOnFail: { count: 1000 },
177177+ },
178178+ );
352179353353- // commit offset only after processing successfully. The +1 here
354354- // _is_ necessary, after extensive testing, because kafka starts
355355- // consuming at the committed offset so, if we commit the last
356356- // offset that was successfully processed, and then the worker
357357- // restarts or there's a rebalance, that message will get
358358- // processed twice. We use BigInt here since the offset is
359359- // uint64.
360360- await consumer.commitOffsets([
361361- {
362362- topic,
363363- partition,
364364- offset: (BigInt(batch.lastOffset()) + 1n).toString(),
365365- },
366366- ]);
180180+ // BullMQ Worker runs continuously; wait for it to be ready
181181+ await worker.waitUntilReady();
367182368368- // NB: no catch block means the error's rethrown, which addresses
369369- // Error cases 6 a, b, c
370370- // This will trigger eachBatch to be retried, until the process
371371- // eventually crashes. See comment on the crash listener.
372372- } finally {
373373- clearInterval(heartbeatInverval);
374374- }
375375- } satisfies KafkaConsumerRunConfig<
376376- Pick<KafkaSchemaMap, ConsumedTopic>
377377- >['eachBatch'],
378378- );
379379-380380- await consumer.run({
381381- autoCommit: false,
382382- partitionsConsumedConcurrently: 30,
383383- eachBatch: eachBatchTraced,
183183+ // Keep the run() promise pending until the worker is closed.
184184+ await new Promise<void>((resolve) => {
185185+            worker!.once('closed', () => resolve());
384186 });
385187 },
386188 async shutdown() {
387387- await consumer.disconnect();
189189+ await worker?.close();
190190+ await queue?.close();
388191 },
389192 } satisfies Worker;
390193 },