···2121 topic: ITEM_SUBMISSION_SCHEMAS,
2222) {
2323 const kafkaProducer = kafka.producer();
2424- const initialConnectPromise = kafkaProducer.connect();
2424+ let connectError: Error | undefined;
2525+ const initialConnectPromise = kafkaProducer.connect().catch((err: unknown) => {
2626+ // Store the error to prevent an unhandled promise rejection from crashing
2727+ // the process. We re-throw it when callers attempt to write to Kafka.
2828+ connectError = err instanceof Error ? err : new Error(String(err));
2929+ });
2530 const batchTimeout = 500;
26312732 const loader: DataLoader<ItemSubmissionKafkaMessageValue, void> =
···4550 skipBatch: boolean = false,
4651 ) {
4752 await initialConnectPromise;
5353+ if (connectError) {
5454+ throw connectError;
5555+ }
4856 // bulkWrite and loader.loadMany have different return types, so we have to
4957 // handle their returns separately and construct a homogenous return type in
5058 // each case, in addition to the logical difference of using batching or not