Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1/* eslint-disable max-lines */
2
3import { type ItemIdentifier } from '@roostorg/types';
4import { Queue, Worker } from 'bullmq';
5import { type Job } from 'bullmq';
6import { type Cluster } from 'ioredis';
7import type IORedis from 'ioredis';
8import { type Kysely, type Transaction } from 'kysely';
9import pLimit from 'p-limit';
10import { type Opaque, type ReadonlyDeep } from 'type-fest';
11import { v1 as uuidv1 } from 'uuid';
12
13import { type Dependencies } from '../../../iocContainer/index.js';
14import {
15 UserPermission,
16 type Invoker,
17} from '../../../models/types/permissioning.js';
18import { cached, type Cached } from '../../../utils/caching.js';
19import { filterNullOrUndefined } from '../../../utils/collections.js';
20import {
21 b64UrlDecode,
22 b64UrlEncode,
23 type B64UrlOf,
24} from '../../../utils/encoding.js';
25import {
26 CoopError,
27 ErrorType,
28 makeUnauthorizedError,
29 type ErrorInstanceData,
30} from '../../../utils/errors.js';
31import { isUniqueViolationError } from '../../../utils/kysely.js';
32import { removeUndefinedKeys, safePick } from '../../../utils/misc.js';
33import { replaceEmptyStringWithNull } from '../../../utils/string.js';
34import { WEEK_MS } from '../../../utils/time.js';
35import {
36 instantiateOpaqueType,
37 type Bind1,
38} from '../../../utils/typescript-types.js';
39import {
40 getFieldValueForRole,
41 makeSubmissionId,
42} from '../../itemProcessingService/index.js';
43import { type ItemSubmissionWithTypeIdentifier } from '../../itemProcessingService/makeItemSubmissionWithTypeIdentifier.js';
44import { type ManualReviewToolServicePg } from '../dbTypes.js';
45import {
46 type AppealEnqueueSourceInfo,
47 type JobId,
48 type LegacyItemWithTypeIdentifier,
49 type ManualReviewAppealJob,
50 type ManualReviewAppealJobPayload,
51 type ManualReviewJob,
52 type ManualReviewJobEnqueueSourceInfo,
53 type ManualReviewJobPayload,
54 type OriginJobInfo,
55 type StoredManualReviewJob,
56} from '../manualReviewToolService.js';
57
/**
 * A manual review queue. The keys match the camelCase aliases produced by
 * {@link PgQueueSelection}.
 */
export type ManualReviewQueue = {
  id: string;
  orgId: string;
  name: string;
  /** Free-text description; stored as null when absent or empty. */
  description: string | null;
  createdAt: Date;
  /** Whether this is the org's default queue of its kind (regular vs. appeals). */
  isDefaultQueue: boolean;
  isAppealsQueue: boolean;
  autoCloseJobs: boolean;
};

/**
 * Columns selected (or returned) whenever a queue row is read from
 * `manual_review_tool.manual_review_queues`. The aliases rename the snake_case
 * columns to the keys of {@link ManualReviewQueue}.
 */
const PgQueueSelection = [
  'id',
  'org_id as orgId',
  'name',
  'description',
  'is_default_queue as isDefaultQueue',
  'created_at as createdAt',
  'is_appeals_queue as isAppealsQueue',
  'auto_close_jobs as autoCloseJobs',
] as const;
79
// A BullJobId is the id a job has *inside Bull*. It is unique only among the
// jobs one queue currently holds. If the same item is enqueued again, either
// into a different queue or after its earlier job was processed and dropped
// from Bull, it gets the same BullJobId. While a job with a given BullJobId is
// still in a queue, Bull ignores further enqueues with that id, and we rely on
// that dedupe on purpose.
type BullJobId = Opaque<string, 'BullJobId'>;

// TODO: use this.
export type ManualReviewQueueErrorType = 'ManualReviewQueueNameExistsError';

/**
 * Error type names for queue-level operations in this module. These
 * presumably correspond to the errors built by this file's `make*Error`
 * helpers; confirm against their definitions.
 */
export type QueueOperationsErrorType =
  | 'DeleteAllJobsUnauthorizedError'
  | 'QueueDoesNotExistError'
  | 'UnableToDeleteDefaultQueueError';

/**
 * Compound identifier for a queue. orgId is needed for security. It is also
 * needed because queues are (or will be) sharded across Redis instances by
 * orgId, so a queue can only be located with its orgId.
 */
type QueueKey = {
  orgId: string;
  queueId: string;
};
101
102/**
103 * This class handles everything that MRT does directly with queues: CRUDing
104 * them, enqueuing and dequeueing jobs on a given queue, looking up jobs within
105 * a given queue, etc. It does not deal with routing jobs to queues or forming
106 * the job payloads that should be enqueued.
107 *
108 * The state for queues is split between BullMQ and Postgres, and this class
109 * coordinates those two backends where needed to present a single, logical API
110 * (see, e.g., the use of {@link #checkQueueExists} inside
111 * {@link #getBullQueue}, and the logic in {@link deleteManualReviewQueue}).
112 *
113 * This class also implements (and hides the implementation details of)
114 * MRT-specific logic for ignoring an enqueue when a job already exists for the
115 * same item.
116 *
117 * Finally, it handles the mismatch between how Bull is normally used (i.e.,
118 * with a small number of long-running workers that automatically dequeue and
119 * process jobs, and a few long-lived queue object references that are used to
120 * push jobs to those workers) and how we use it in MRT, where users -- not an
121 * automatic worker -- manually dequeue jobs and mark them complete, and where
122 * there are many, many queues (not all of which we want to keep references to
123 * in memory or connected to Redis at all times).
124 *
125 * As part of handling that mismatch, this class exposes an API that solely
126 * accepts and returns plain data values, as opposed to the stateful
127 * Queue/Worker/Job objects that Bull usually deals with. While this
128 * occasionally adds some overhead, that overhead is minimized by smart caching
129 * internally, and this sort of API also makes the class much easier to mock.
130 */
131export default class QueueOperations {
// Cached Bull Queue/Worker handles for regular manual review jobs, keyed by
// `{ orgId, queueId }`.
private readonly getOrCreateBullQueue: Cached<
  Bind1<typeof getOrCreateBullQueue<StoredManualReviewJob>>
>;
private readonly getBullWorker: Cached<
  Bind1<typeof getBullWorker<StoredManualReviewJob>>
>;

// The same caches, specialized for appeal jobs.
private readonly getOrCreateBullAppealQueue: Cached<
  Bind1<typeof getOrCreateBullQueue<ManualReviewAppealJob>>
>;
private readonly getBullAppealWorker: Cached<
  Bind1<typeof getBullWorker<ManualReviewAppealJob>>
>;

/**
 * @param pgQuery Postgres primary, used for all writes and for existence checks.
 * @param pgQueryReadReplica Postgres read replica, used for decision lookups
 *   while dequeueing.
 * @param moderationConfigService Injected moderation config service.
 * @param redis Connection that every cached Bull queue/worker is bound to.
 */
constructor(
  private readonly pgQuery: Kysely<ManualReviewToolServicePg>,
  private readonly pgQueryReadReplica: Kysely<ManualReviewToolServicePg>,
  private readonly moderationConfigService: Dependencies['ModerationConfigService'],
  redis: RedisConnection,
) {
  // TypeScript doesn't allow a property access such as `.bind(...)` directly
  // after a generic instantiation expression, so each specialization is
  // captured in a local first.
  const manualReviewQueueFactory = getOrCreateBullQueue<StoredManualReviewJob>;
  const manualReviewWorkerFactory = getBullWorker<StoredManualReviewJob>;
  const appealQueueFactory = getOrCreateBullQueue<ManualReviewAppealJob>;
  const appealWorkerFactory = getBullWorker<ManualReviewAppealJob>;

  // All four caches share one policy: entries stay fresh for 600 (in
  // whatever unit `cached` uses for freshUntilAge; confirm), at most 128
  // handles are kept, and a handle is closed when it is evicted.
  this.getBullWorker = cached({
    producer: manualReviewWorkerFactory.bind(null, redis),
    directives: { freshUntilAge: 600 },
    numItemsLimit: 128,
    onItemEviction: async (evictedWorker) => {
      await evictedWorker.close();
    },
  });

  this.getOrCreateBullQueue = cached({
    producer: manualReviewQueueFactory.bind(null, redis),
    directives: { freshUntilAge: 600 },
    numItemsLimit: 128,
    onItemEviction: async (evictedQueue) => {
      await evictedQueue.close();
    },
  });

  this.getBullAppealWorker = cached({
    producer: appealWorkerFactory.bind(null, redis),
    directives: { freshUntilAge: 600 },
    numItemsLimit: 128,
    onItemEviction: async (evictedWorker) => {
      await evictedWorker.close();
    },
  });

  this.getOrCreateBullAppealQueue = cached({
    producer: appealQueueFactory.bind(null, redis),
    directives: { freshUntilAge: 600 },
    numItemsLimit: 128,
    onItemEviction: async (evictedQueue) => {
      await evictedQueue.close();
    },
  });
}
196
/**
 * Resolves if a queue with `queueId` exists for `orgId`. Otherwise it throws
 * the error built by makeQueueDoesNotExistError.
 */
async checkQueueExists(orgId: string, queueId: string) {
  // Read from the primary, not the replica, so that the answer is accurate
  // before a Bull queue gets created for this id.
  const matchingQueue = await this.pgQuery
    .selectFrom('manual_review_tool.manual_review_queues')
    .select(PgQueueSelection)
    .where('org_id', '=', orgId)
    .where('id', '=', queueId)
    .executeTakeFirst();

  if (!matchingQueue) {
    throw makeQueueDoesNotExistError({ shouldErrorSpan: true });
  }
}
210
/**
 * Returns the cached Bull queue for a regular manual review queue. Throws
 * (via checkQueueExists) if the queue is not in Postgres for `orgId`.
 */
async #getBullQueue(orgId: string, queueId: string) {
  // Check Postgres first so no Bull queue is created for an unknown id.
  await this.checkQueueExists(orgId, queueId);
  const bullQueue = this.getOrCreateBullQueue({ orgId, queueId });
  return bullQueue;
}

/**
 * Appeal-queue counterpart of #getBullQueue, with the same existence check.
 */
async #getBullAppealQueue(orgId: string, queueId: string) {
  await this.checkQueueExists(orgId, queueId);
  const bullAppealQueue = this.getOrCreateBullAppealQueue({ orgId, queueId });
  return bullAppealQueue;
}
219
// TODO: updateManualReviewQueue should also translate DB errors into
// ManualReviewQueueNameExistsError / NotFoundError (create already handles
// the former).
/**
 * Creates a queue in the invoker's org. In one transaction it also grants
 * `userIds` access to the queue and hides `hiddenActionIds` on it. The first
 * queue of a given kind (regular vs. appeals) in an org becomes that kind's
 * default queue.
 *
 * @returns The created queue row, shaped by PgQueueSelection.
 * @throws An unauthorized error if the invoker lacks EDIT_MRT_QUEUES.
 * @throws ManualReviewQueueNameExistsError on a unique-constraint violation
 *   (presumably a duplicate queue name; the constraint is defined elsewhere).
 */
async createManualReviewQueue(input: {
  name: string;
  description: string | null;
  userIds: readonly string[];
  hiddenActionIds: readonly string[];
  invokedBy: Invoker;
  isAppealsQueue?: boolean;
  autoCloseJobs?: boolean;
}) {
  const {
    name,
    description,
    userIds,
    hiddenActionIds,
    invokedBy,
    isAppealsQueue,
    autoCloseJobs,
  } = input;
  const { orgId } = invokedBy;
  const isAppeals = isAppealsQueue ?? false;

  if (!invokedBy.permissions.includes(UserPermission.EDIT_MRT_QUEUES)) {
    throw makeUnauthorizedError(
      'You do not have permission to create a queue',
      { shouldErrorSpan: true },
    );
  }

  try {
    return await this.pgQuery.transaction().execute(async (transaction) => {
      // In newer versions of kysely, this is greatly simplified with
      // `transaction.selectNoFrom(eb => eb.exists(...))`, but we're blocked on
      // updating by https://github.com/kysely-org/kysely/issues/577#issuecomment-1804900006
      const orgHasQueuesAlready = await transaction
        .selectFrom('manual_review_tool.manual_review_queues')
        .where('org_id', '=', orgId)
        .where('is_appeals_queue', '=', isAppeals)
        .limit(1)
        .execute()
        .then((queues) => queues.length > 0);

      const queue = await transaction
        .insertInto('manual_review_tool.manual_review_queues')
        .returning(PgQueueSelection)
        .values([
          {
            id: uuidv1(),
            name,
            org_id: orgId,
            is_default_queue: !orgHasQueuesAlready,
            description: replaceEmptyStringWithNull(description),
            is_appeals_queue: isAppeals,
            auto_close_jobs: autoCloseJobs ?? false,
          },
        ])
        .executeTakeFirstOrThrow();

      // An INSERT with an empty VALUES list is not valid SQL. Only write
      // access grants when there is at least one user to grant.
      if (userIds.length > 0) {
        await transaction
          .insertInto('manual_review_tool.users_and_accessible_queues')
          .values(
            userIds.map((userId) => ({
              queue_id: queue.id,
              user_id: userId,
            })),
          )
          .executeTakeFirstOrThrow();
      }

      await this.updateHiddenActionsForQueue({
        transaction,
        queueId: queue.id,
        actionIdsToHide: hiddenActionIds,
        actionIdsToUnhide: [],
        orgId,
      });
      return queue;
    });
  } catch (e) {
    if (isUniqueViolationError(e)) {
      throw makeManualReviewQueueNameExistsError({ shouldErrorSpan: true });
    }
    throw e;
  }
}
304
/**
 * Updates a queue's metadata and replaces its set of accessible users with
 * exactly `userIds`. It then hides/unhides actions, all in one transaction.
 * Keys left undefined (name, description, autoCloseJobs) are not written.
 *
 * Passing an empty `userIds` revokes every user's access to the queue.
 *
 * @returns The updated queue row, shaped by PgQueueSelection.
 * @throws Via executeTakeFirstOrThrow if no queue with `queueId` exists in
 *   `orgId`; the whole transaction is then rolled back.
 */
async updateManualReviewQueue(input: {
  orgId: string;
  queueId: string;
  name?: string;
  description?: string | null;
  userIds: readonly string[];
  actionIdsToHide: readonly string[];
  actionIdsToUnhide: readonly string[];
  autoCloseJobs?: boolean;
}) {
  const {
    queueId,
    orgId,
    name,
    description,
    userIds,
    actionIdsToHide,
    actionIdsToUnhide,
    autoCloseJobs,
  } = input;
  const hasUsers = userIds.length > 0;

  return this.pgQuery.transaction().execute(async (transaction) => {
    const [updatedQueue] = await Promise.all([
      transaction
        .updateTable('manual_review_tool.manual_review_queues')
        .set(
          removeUndefinedKeys({
            name,
            description: replaceEmptyStringWithNull(description),
            auto_close_jobs: autoCloseJobs,
          }),
        )
        .where('id', '=', queueId)
        .where('org_id', '=', orgId)
        .returning(PgQueueSelection)
        .executeTakeFirstOrThrow(),
      // An INSERT with an empty VALUES list is not valid SQL, so there is
      // nothing to add when no users are given.
      hasUsers
        ? transaction
            .insertInto('manual_review_tool.users_and_accessible_queues')
            .values(
              userIds.map((userId) => ({
                queue_id: queueId,
                user_id: userId,
              })),
            )
            .onConflict((oc) => oc.doNothing())
            .executeTakeFirstOrThrow()
        : undefined,
      // `not in ()` is not valid SQL either. With no users, every grant on
      // this queue should be removed, so the user filter is dropped.
      transaction
        .deleteFrom('manual_review_tool.users_and_accessible_queues')
        .where('queue_id', '=', queueId)
        .$if(hasUsers, (qb) => qb.where('user_id', 'not in', userIds))
        .executeTakeFirstOrThrow(),
    ]);

    await this.updateHiddenActionsForQueue({
      transaction,
      queueId,
      orgId,
      actionIdsToHide,
      actionIdsToUnhide,
    });
    return updatedQueue;
  });
}
368
/**
 * Deletes a queue. It first obliterates the queue's Bull jobs, then removes the
 * Postgres row and its user-access grants in one transaction.
 *
 * @returns true when a queue row in `orgId` was deleted, false when no such
 *   queue existed.
 * @throws UnableToDeleteDefaultQueueError when the queue is a default queue,
 *   either regular or appeals. getDefaultQueueIdForOrg and
 *   getDefaultAppealsQueueIdForOrg throw when an org has no default.
 *   Redis/Postgres failures propagate.
 */
async deleteManualReviewQueue(orgId: string, queueId: string) {
  // Check the target row itself rather than comparing against the default
  // regular queue's id. That comparison let the default appeals queue be
  // deleted, and it threw outright when the org had no default regular queue.
  const queueToDelete = await this.pgQuery
    .selectFrom('manual_review_tool.manual_review_queues')
    .select(['is_default_queue as isDefaultQueue'])
    .where('org_id', '=', orgId)
    .where('id', '=', queueId)
    .executeTakeFirst();
  if (queueToDelete?.isDefaultQueue) {
    throw makeUnableToDeleteDefaultQueueError({ shouldErrorSpan: true });
  }

  const queue = await this.getOrCreateBullQueue({ orgId, queueId });
  await queue.obliterate({ force: true });

  const [{ numDeletedRows }] = await this.pgQuery
    .transaction()
    .execute(async (transaction) =>
      Promise.all([
        transaction
          .deleteFrom('manual_review_tool.manual_review_queues')
          .where('id', '=', queueId)
          .where('org_id', '=', orgId)
          .executeTakeFirstOrThrow(),
        transaction
          .deleteFrom('manual_review_tool.users_and_accessible_queues')
          .where('queue_id', '=', queueId)
          .execute(),
      ]),
    );

  return numDeletedRows === 1n;
}
401
/**
 * Test-only variant of deleteManualReviewQueue that skips the default-queue
 * guard. It obliterates the Bull jobs, then deletes the queue row and its
 * access grants.
 *
 * @returns true if a queue row in `orgId` was deleted.
 */
async deleteManualReviewQueueForTestsDO_NOT_USE(
  orgId: string,
  queueId: string,
) {
  const bullQueue = await this.getOrCreateBullQueue({ orgId, queueId });
  await bullQueue.obliterate({ force: true });

  const [queueDeletion] = await this.pgQuery
    .transaction()
    .execute(async (trx) =>
      Promise.all([
        trx
          .deleteFrom('manual_review_tool.manual_review_queues')
          .where('id', '=', queueId)
          .where('org_id', '=', orgId)
          .executeTakeFirstOrThrow(),
        trx
          .deleteFrom('manual_review_tool.users_and_accessible_queues')
          .where('queue_id', '=', queueId)
          .execute(),
      ]),
    );

  return queueDeletion.numDeletedRows === 1n;
}
428
/**
 * Id of the org's default regular (non-appeals) queue. Throws via
 * executeTakeFirstOrThrow if there is none.
 */
async getDefaultQueueIdForOrg(orgId: string) {
  return this.#getDefaultQueueIdForOrgByKind(orgId, false);
}

/**
 * Id of the org's default appeals queue. Throws via executeTakeFirstOrThrow
 * if there is none.
 */
async getDefaultAppealsQueueIdForOrg(orgId: string) {
  return this.#getDefaultQueueIdForOrgByKind(orgId, true);
}

/**
 * Shared lookup behind the two default-queue getters. Reads the primary.
 */
async #getDefaultQueueIdForOrgByKind(orgId: string, isAppealsQueue: boolean) {
  const { id } = await this.pgQuery
    .selectFrom('manual_review_tool.manual_review_queues')
    .select(['id'])
    .where('org_id', '=', orgId)
    .where('is_appeals_queue', '=', isAppealsQueue)
    .where('is_default_queue', '=', true)
    .executeTakeFirstOrThrow();
  return id;
}
450
/**
 * Lists the queues the invoker may review.
 *
 * Invokers without VIEW_MRT get an empty list. Invokers with EDIT_MRT_QUEUES
 * see every queue in their org. Everyone else sees only the queues they have
 * been granted access to.
 */
async getReviewableQueuesForUser(opts: { invoker: Invoker }) {
  const { userId, permissions, orgId } = opts.invoker;

  if (!permissions.includes(UserPermission.VIEW_MRT)) {
    return [];
  }

  const canSeeAllQueues = permissions.includes(UserPermission.EDIT_MRT_QUEUES);
  const grantedQueueIds = this.pgQuery
    .selectFrom('manual_review_tool.users_and_accessible_queues')
    .select('queue_id')
    .where('user_id', '=', userId);

  return this.pgQuery
    .selectFrom('manual_review_tool.manual_review_queues')
    .select(PgQueueSelection)
    .where('org_id', '=', orgId)
    .$if(!canSeeAllQueues, (query) =>
      query.where('id', 'in', grantedQueueIds),
    )
    .execute();
}
480
/**
 * Fetches one queue in `orgId`, but only if `userId` has been granted access
 * to it. Resolves to undefined otherwise. Unlike getReviewableQueuesForUser,
 * there is no permission-based bypass here.
 */
async getQueueForOrg(opts: {
  orgId: string;
  userId: string;
  queueId: string;
}) {
  const { orgId, userId, queueId } = opts;

  const queueIdsVisibleToUser = this.pgQuery
    .selectFrom('manual_review_tool.users_and_accessible_queues')
    .select('queue_id')
    .where('user_id', '=', userId);

  return this.pgQuery
    .selectFrom('manual_review_tool.manual_review_queues')
    .select(PgQueueSelection)
    .where('org_id', '=', orgId)
    .where('id', '=', queueId)
    .where('id', 'in', queueIdsVisibleToUser)
    .executeTakeFirst();
}
502
/**
 * Lists the queues `userId` has favorited in `orgId`. Rows use the same
 * column aliases as PgQueueSelection.
 */
async getFavoriteQueuesForUser(opts: { orgId: string; userId: string }) {
  const { orgId, userId } = opts;
  return this.pgQuery
    .selectFrom(
      'manual_review_tool.users_and_favorite_mrt_queues as favorite_queues',
    )
    .innerJoin(
      'manual_review_tool.manual_review_queues as queues',
      'favorite_queues.queue_id',
      'queues.id',
    )
    .select([
      'queues.id',
      'queues.org_id as orgId',
      'queues.name',
      'queues.description',
      'queues.is_default_queue as isDefaultQueue',
      'queues.created_at as createdAt',
      'queues.is_appeals_queue as isAppealsQueue',
      'queues.auto_close_jobs as autoCloseJobs',
    ])
    .where('favorite_queues.user_id', '=', userId)
    .where('favorite_queues.org_id', '=', orgId)
    // addFavoriteQueueForUser stores whatever orgId/queueId pair it is
    // given. Unless a DB constraint prevents it, a favorite row could point
    // at another org's queue, so require the queue itself to be in the org.
    .where('queues.org_id', '=', orgId)
    .execute();
}
528
/**
 * Marks `queueId` as a favorite of `userId` in `orgId`. Favoriting an
 * already-favorited queue is a no-op (ON CONFLICT DO NOTHING).
 */
async addFavoriteQueueForUser(opts: {
  userId: string;
  orgId: string;
  queueId: string;
}) {
  await this.pgQuery
    .insertInto('manual_review_tool.users_and_favorite_mrt_queues')
    .values({
      user_id: opts.userId,
      org_id: opts.orgId,
      queue_id: opts.queueId,
    })
    .onConflict((oc) => oc.doNothing())
    .execute();
}

/**
 * Removes `queueId` from `userId`'s favorites in `orgId`. It is a no-op if
 * the queue wasn't a favorite.
 */
async removeFavoriteQueueForUser(opts: {
  userId: string;
  orgId: string;
  queueId: string;
}) {
  await this.pgQuery
    .deleteFrom('manual_review_tool.users_and_favorite_mrt_queues')
    .where('user_id', '=', opts.userId)
    .where('org_id', '=', opts.orgId)
    .where('queue_id', '=', opts.queueId)
    .execute();
}
555
/**
 * Every queue in `orgId`, with no per-user access check. Callers are
 * responsible for authorization.
 */
async getAllQueuesForOrgAndDangerouslyBypassPermissioning(orgId: string) {
  const queuesTable = 'manual_review_tool.manual_review_queues' as const;
  return this.pgQuery
    .selectFrom(queuesTable)
    .select(PgQueueSelection)
    .where('org_id', '=', orgId)
    .execute();
}

/**
 * One queue in an org, with no per-user access check. Resolves to undefined
 * if it doesn't exist. Callers are responsible for authorization.
 */
async getQueueForOrgAndDangerouslyBypassPermissioning(opts: {
  orgId: string;
  queueId: string;
}) {
  const { orgId, queueId } = opts;
  return this.pgQuery
    .selectFrom('manual_review_tool.manual_review_queues')
    .select(PgQueueSelection)
    .where('org_id', '=', orgId)
    .where('id', '=', queueId)
    .executeTakeFirst();
}
575
/**
 * Ids of the users explicitly granted access to `queueId`. The result is empty
 * if the queue doesn't belong to `orgId`.
 */
async getUsersWhoCanSeeQueue(opts: { orgId: string; queueId: string }) {
  const { orgId, queueId } = opts;

  // Restricting queue_id to this org's queues prevents cross-org lookups.
  const orgQueueIds = this.pgQuery
    .selectFrom('manual_review_tool.manual_review_queues')
    .select('id')
    .where('org_id', '=', orgId);

  return this.pgQuery
    .selectFrom('manual_review_tool.users_and_accessible_queues')
    .select(['user_id as userId'])
    .where('queue_id', '=', queueId)
    .where('queue_id', 'in', orgQueueIds)
    .execute();
}
592
/**
 * Grants every user in `userIds` access to every queue in `queueIds`.
 * Existing grants are left alone (ON CONFLICT DO NOTHING).
 *
 * @returns The insert results, or an empty array when either list is empty
 *   and there is nothing to insert.
 */
async addAccessibleQueuesForUser(
  userIds: string[],
  queueIds: readonly string[],
) {
  const grants = userIds.flatMap((userId) =>
    queueIds.map((queueId) => ({ queue_id: queueId, user_id: userId })),
  );

  // An INSERT with an empty VALUES list is not valid SQL, so skip the
  // round-trip entirely.
  if (grants.length === 0) {
    return [];
  }

  return this.pgQuery
    .insertInto('manual_review_tool.users_and_accessible_queues')
    .values(grants)
    .onConflict((oc) => oc.doNothing())
    .execute();
}
607
/**
 * Revokes `userId`'s access to each queue in `queueIds`.
 *
 * @returns The delete results, or an empty array when `queueIds` is empty and
 *   there is nothing to revoke.
 */
async removeAccessibleQueuesForUser(
  userId: string,
  queueIds: readonly string[],
) {
  // `in ()` is not valid SQL, and an empty list means nothing to delete.
  if (queueIds.length === 0) {
    return [];
  }

  return this.pgQuery
    .deleteFrom('manual_review_tool.users_and_accessible_queues')
    .where('user_id', '=', userId)
    .where('queue_id', 'in', queueIds)
    .execute();
}
618
/**
 * Looks up a manual review job by its external id. Resolves to undefined when
 * Bull has nothing under the derived Bull id, or when the job stored there
 * has a different id.
 */
async #getJob(
  externalId: JobId,
  queue: ReadonlyDeep<Queue<StoredManualReviewJob>>,
) {
  const candidate = await queue.getJob(parseExternalId(externalId).bullId);

  // A Bull id is reused when the same item is enqueued again, so make sure
  // this is the requested job and not a newer job about the same item.
  if (candidate?.data.id !== externalId) {
    return undefined;
  }
  return candidate;
}

/**
 * Appeal-queue counterpart of #getJob, with the same id check.
 */
async #getAppealJob(
  externalId: JobId,
  queue: ReadonlyDeep<Queue<ManualReviewAppealJob>>,
) {
  const candidate = await queue.getJob(parseExternalId(externalId).bullId);

  // Same reasoning as #getJob: reject a newer job that shares the Bull id.
  if (candidate?.data.id !== externalId) {
    return undefined;
  }
  return candidate;
}
644
/**
 * Returns whatever job currently sits in the queue for the given item,
 * converted out of the legacy format if needed, or undefined if there is
 * none. No job-id check is done: any job for the item matches.
 */
async getJobFromItemId(opts: {
  orgId: string;
  queueId: string;
  itemId: string;
  itemTypeId: string;
}): Promise<ManualReviewJob | undefined> {
  const { orgId, queueId, itemId, itemTypeId } = opts;
  const queue = await this.#getBullQueue(orgId, queueId);

  const bullJobId = itemIdToBullJobId({ id: itemId, typeId: itemTypeId });
  const storedJob = await queue.getJob(bullJobId);
  if (!storedJob) {
    return undefined;
  }

  const normalizedJob = await this.legacyJobToJob(storedJob, orgId);
  return normalizedJob.data;
}
663
/**
 * Enqueues a new job to the specified queue, or does nothing if a job already
 * exists in that queue for the same item. The Bull job id is derived from the
 * item, and Bull ignores an add whose id is already present.
 *
 * NOTE(review): when Bull dedupes, the returned data is presumably the
 * payload passed in here rather than the already-stored job's. Confirm
 * against BullMQ's Queue.add before relying on it.
 *
 * @throws Via #getBullQueue if the queue doesn't exist for `orgId`.
 */
async addJob(opts: {
  orgId: string;
  queueId: string;
  reenqueuedFrom?: OriginJobInfo;
  jobPayload: {
    createdAt?: Date;
    policyIds: string[];
    payload: ManualReviewJobPayload;
  };
  enqueueSourceInfo: ManualReviewJobEnqueueSourceInfo;
}) {
  const { orgId, queueId, jobPayload, reenqueuedFrom, enqueueSourceInfo } =
    opts;
  const { payload, policyIds } = jobPayload;

  const queue = await this.#getBullQueue(orgId, queueId);

  const { itemId, itemTypeIdentifier } = payload.item;
  const bullJobId = itemIdToBullJobId({
    id: itemId,
    typeId: itemTypeIdentifier.id,
  });

  const addedJob = await queue.add(
    bullJobId,
    {
      orgId,
      payload,
      id: bullJobIdtoExternalJobId(bullJobId),
      createdAt: jobPayload.createdAt ?? new Date(),
      policyIds,
      reenqueuedFrom,
      enqueueSourceInfo,
    },
    { removeOnComplete: true, jobId: bullJobId },
  );

  // New jobs are always written in the current (non-legacy) format, so this
  // cast is safe.
  return addedJob.data satisfies StoredManualReviewJob as ManualReviewJob;
}
710
/**
 * Enqueues a new appeal job to the specified queue, or does nothing if a job
 * already exists in that queue for the same item. Dedupe works as in addJob:
 * the Bull job id is derived from the item.
 *
 * @throws Via #getBullAppealQueue if the queue doesn't exist for `orgId`.
 */
async addAppealJob(opts: {
  orgId: string;
  queueId: string;
  reenqueuedFrom?: OriginJobInfo;
  jobPayload: {
    createdAt?: Date;
    policyIds: string[];
    payload: ManualReviewAppealJobPayload;
  };
  enqueueSourceInfo: AppealEnqueueSourceInfo;
}) {
  const { orgId, queueId, jobPayload, reenqueuedFrom, enqueueSourceInfo } =
    opts;
  const { payload, policyIds } = jobPayload;

  const appealQueue = await this.#getBullAppealQueue(orgId, queueId);

  const { itemId, itemTypeIdentifier } = payload.item;
  const bullJobId = itemIdToBullJobId({
    id: itemId,
    typeId: itemTypeIdentifier.id,
  });

  const addedJob = await appealQueue.add(
    bullJobId,
    {
      orgId,
      payload,
      id: bullJobIdtoExternalJobId(bullJobId),
      createdAt: jobPayload.createdAt ?? new Date(),
      policyIds,
      reenqueuedFrom,
      enqueueSourceInfo,
    },
    { removeOnComplete: true, jobId: bullJobId },
  );

  return addedJob.data;
}
755
/**
 * Fetches the given appeal jobs from a queue. Ids that no longer resolve to
 * that exact job are skipped.
 */
async getAppealJobs(opts: {
  orgId: string;
  queueId: string;
  jobIds: Readonly<JobId[]>;
}): Promise<ManualReviewAppealJob[]> {
  const { orgId, queueId, jobIds } = opts;
  // No more than 10 lookups in flight at once.
  const limit = pLimit(10);

  const appealQueue = await this.#getBullAppealQueue(orgId, queueId);
  const foundJobs = await Promise.all(
    jobIds.map(async (jobId) =>
      limit(async () => this.#getAppealJob(jobId, appealQueue)),
    ),
  );

  return filterNullOrUndefined(foundJobs).map(({ data }) => data);
}

/**
 * Fetches the given manual review jobs from a queue, converted out of the
 * legacy format if needed. Ids that no longer resolve to that exact job are
 * skipped.
 */
async getJobs(opts: {
  orgId: string;
  queueId: string;
  jobIds: Readonly<JobId[]>;
}): Promise<ManualReviewJob[]> {
  const { orgId, queueId, jobIds } = opts;
  // No more than 10 lookup+conversion tasks in flight at once.
  const limit = pLimit(10);

  const queue = await this.#getBullQueue(orgId, queueId);
  const foundJobs = await Promise.all(
    jobIds.map(async (jobId) =>
      limit(async () => {
        const storedJob = await this.#getJob(jobId, queue);
        if (!storedJob) {
          return undefined;
        }
        return this.legacyJobToJob(storedJob, orgId);
      }),
    ),
  );

  return filterNullOrUndefined(foundJobs).map(({ data }) => data);
}
796
/**
 * Returns up to 50 jobs from a queue, converted out of the legacy format if
 * needed.
 *
 * This is currently only being used for testing, and should be
 * deleted after we rework the list view.
 */
async getAllJobsForQueue(opts: { orgId: string; queueId: string }) {
  const maxJobs = 50;
  const { orgId, queueId } = opts;
  const queue = await this.#getBullQueue(orgId, queueId);

  // BullMQ's `end` index is inclusive, so 0..maxJobs-1 asks for maxJobs jobs;
  // the old `0, 50` returned 51. The range may be applied to each job state
  // separately, so the combined list is capped as well.
  const legacyJobs = (await queue.getJobs(undefined, 0, maxJobs - 1)).slice(
    0,
    maxJobs,
  );

  // A job may be removed between Bull listing its id and loading it, which
  // leaves an undefined entry. Drop those before converting, because
  // legacyJobToJob reads `job.data` unconditionally.
  const limit = pLimit(10);
  const jobs = await Promise.all(
    filterNullOrUndefined(legacyJobs).map(async (job) =>
      limit(async () => this.legacyJobToJob(job, orgId)),
    ),
  );
  return jobs.map((job) => job.data);
}
813
/**
 * Replaces the stored data of the job identified by `jobId`.
 *
 * @returns The job's new data, or undefined if no job with exactly this id is
 *   in the queue. That includes the case where a newer job for the same item
 *   now occupies the Bull id.
 */
async updateJobForQueue(opts: {
  orgId: string;
  queueId: string;
  jobId: JobId;
  data: ManualReviewJob;
}) {
  const { orgId, queueId, jobId, data } = opts;
  const queue = await this.#getBullQueue(orgId, queueId);

  // Bull ids are reused across enqueues of the same item, so a raw
  // `queue.getJob(bullId)` could return a newer job and overwrite its data
  // with this stale update. #getJob verifies the stored job's own id.
  const job = await this.#getJob(jobId, queue);
  await job?.updateData(data);

  // Because the `data` arg above is a ManualReviewJob, we know the stored
  // data for this particular job won't be in the legacy format.
  return job?.data satisfies StoredManualReviewJob | undefined as
    | ManualReviewJob
    | undefined;
}
832
/**
 * Removes every job from a queue with Bull's `obliterate({ force: true })`.
 *
 * @throws The error from makeDeleteAllJobsInsufficientPermissionsError when
 *   `userPermissions` lacks EDIT_MRT_QUEUES.
 * @throws Via #getBullQueue if the queue doesn't exist for `orgId`.
 */
async deleteAllJobsFromQueue(opts: {
  orgId: string;
  queueId: string;
  userPermissions: readonly UserPermission[];
}) {
  const { orgId, queueId, userPermissions } = opts;

  const canEditQueues = userPermissions.includes(
    UserPermission.EDIT_MRT_QUEUES,
  );
  if (!canEditQueues) {
    throw makeDeleteAllJobsInsufficientPermissionsError({
      shouldErrorSpan: true,
    });
  }

  const bullQueue = await this.#getBullQueue(orgId, queueId);
  await bullQueue.obliterate({ force: true });
}
848
/**
 * Takes the next appeal job off a queue and locks it with `lockToken`.
 *
 * Jobs that already have a decision recorded are removed (best effort) and
 * skipped. Resolves to null once the queue yields no more jobs.
 *
 * @throws Via checkQueueExists if the queue doesn't exist for `orgId`.
 */
async dequeueNextAppealJobWithLock(opts: {
  orgId: string;
  queueId: string;
  lockToken: string;
}): Promise<{
  job: ManualReviewAppealJob;
  lockToken: string;
} | null> {
  const { orgId, queueId, lockToken } = opts;

  await this.checkQueueExists(orgId, queueId);
  const worker = await this.getBullAppealWorker({ orgId, queueId });

  // Keep pulling until we hit an undecided job or the queue runs dry.
  for (;;) {
    const nextJob = await worker.getNextJob(lockToken);
    if (!nextJob) {
      return null;
    }

    // Because of a race in the locking mechanism, a job can be decided on but
    // not dequeued. Check for an existing decision; if there is one, use the
    // lock token to remove the job and try the next one. Several such jobs at
    // the front of the queue are unlikely but possible. Only decisions
    // created on or after 2023-10-01 are considered. NOTE(review): presumably
    // this bounds the scan; confirm.
    const existingDecision = await this.pgQueryReadReplica
      .selectFrom('manual_review_tool.manual_review_decisions')
      .where('created_at', '>=', new Date('2023-10-01'))
      .where('org_id', '=', orgId)
      .where('id', '=', jobIdToGuid(nextJob.data.id))
      .executeTakeFirst();

    if (existingDecision === undefined) {
      // The common case: a job that has never been decided.
      return { job: nextJob.data, lockToken };
    }

    // Best effort: a failed removal shouldn't stop us from handing out the
    // next job.
    await this.removeJob({
      orgId,
      queueId,
      lockToken,
      jobId: nextJob.data.id,
    }).catch(() => {});
  }
}
901
/**
 * Takes the next manual review job off a queue, locks it with `lockToken`,
 * and converts it out of the legacy format if needed.
 *
 * Jobs that already have a decision recorded are removed (best effort) and
 * skipped. Resolves to null once the queue yields no more jobs.
 *
 * @throws Via checkQueueExists if the queue doesn't exist for `orgId`.
 */
async dequeueNextJobWithLock(opts: {
  orgId: string;
  queueId: string;
  lockToken: string;
}): Promise<{
  job: ManualReviewJob;
  lockToken: string;
} | null> {
  const { orgId, queueId, lockToken } = opts;

  await this.checkQueueExists(orgId, queueId);
  const worker = await this.getBullWorker({ orgId, queueId });

  // Keep pulling until we hit an undecided job or the queue runs dry.
  for (;;) {
    const nextJob = await worker.getNextJob(lockToken);
    if (!nextJob) {
      return null;
    }

    const { data: normalizedData } = await this.legacyJobToJob(nextJob, orgId);

    // Because of a race in the locking mechanism, a job can be decided on but
    // not dequeued. Check for an existing decision; if there is one, use the
    // lock token to remove the job and try the next one. Several such jobs at
    // the front of the queue are unlikely but possible. Only decisions
    // created on or after 2023-10-01 are considered. NOTE(review): presumably
    // this bounds the scan; confirm.
    const existingDecision = await this.pgQueryReadReplica
      .selectFrom('manual_review_tool.manual_review_decisions')
      .select(['decision_components']) // only existence matters
      .where('created_at', '>=', new Date('2023-10-01'))
      .where('org_id', '=', orgId)
      .where('id', '=', jobIdToGuid(normalizedData.id))
      .executeTakeFirst();

    if (existingDecision === undefined) {
      // The common case: a job that has never been decided.
      return { job: normalizedData, lockToken };
    }

    // Best effort: a failed removal shouldn't stop us from handing out the
    // next job.
    await this.removeJob({
      orgId,
      queueId,
      lockToken,
      jobId: normalizedData.id,
    }).catch(() => {});
  }
}
957
/**
 * For a job already in the new format, strips `allMediaItems` from a DEFAULT
 * payload. Such jobs then resolve to the regular manual review view (not
 * NCMEC) and show full decision options. Any other job is returned untouched.
 *
 * Note: this replaces `job.data` with a new object on the given job and
 * returns that same job instance.
 */
#returnJobWithNormalizedDefaultPayloadIfNeeded(
  job: Job<ManualReviewJob>,
): Job<ManualReviewJob> {
  const payloadFields = job.data.payload as Record<string, unknown> & {
    kind?: string;
  };
  const isDefaultWithNcmecMedia =
    payloadFields.kind === 'DEFAULT' && 'allMediaItems' in payloadFields;

  if (isDefaultWithNcmecMedia) {
    const { allMediaItems: _discardedNcmecMedia, ...defaultOnlyPayload } =
      payloadFields;
    job.data = {
      ...job.data,
      payload: defaultOnlyPayload as ManualReviewJobPayload,
    };
  }
  return job;
}
979
/**
 * Normalizes a job read from Bull into the current `ManualReviewJob` shape.
 *
 * Jobs that already have `policyIds`, a `submissionId` on their item, and
 * `reportedForReasons` / `reportHistory` on their payload take a fast path
 * that only strips a stray `allMediaItems` key from DEFAULT payloads.
 * Every other job is converted field by field:
 * - legacy items are turned into item submissions, which may look up the
 *   item type through the moderation config service;
 * - the legacy single `reportedForReason` / `reporterIdentifier` pair is
 *   folded into `reportedForReasons`;
 * - a legacy single `policyId` becomes `policyIds`.
 *
 * NB: mutates `job.data` in place and returns the same job object.
 *
 * TODO: remove when we no longer need to support legacy jobs
 */
async legacyJobToJob(
  job: Job<StoredManualReviewJob>,
  orgId: string,
): Promise<Job<ManualReviewJob>> {
  // Fast path: every marker of the current format is present, so only the
  // DEFAULT-payload cleanup is needed.
  if (
    'policyIds' in job.data &&
    'submissionId' in job.data.payload.item &&
    'reportedForReasons' in job.data.payload &&
    'reportHistory' in job.data.payload
  ) {
    return this.#returnJobWithNormalizedDefaultPayloadIfNeeded(
      job as Job<ManualReviewJob>,
    );
  }

  // The helper returns the item unchanged if it already has a submissionId.
  const legacyItem = job.data.payload.item;
  const convertedItem =
    await this.legacyItemWithTypeIdentifierToItemSubmissionWithTypeIdentifier(
      orgId,
      legacyItem,
    );

  let payload: ManualReviewJobPayload;
  // The type complexity of both StoredManualReviewJob and
  // ManualReviewJobPayload is pretty overwhelming, even for TS. so we are
  // putting the payload kind in a variable to help TS do some type narrowing.
  const jobKind = job.data.payload.kind;
  if (jobKind === 'DEFAULT') {
    // DEFAULT payloads must not carry the NCMEC-only `allMediaItems` key;
    // drop it if a stored payload still has one.
    const { allMediaItems: _omitted, ...storedPayloadWithoutNcmec } =
      job.data.payload as Record<string, unknown> & { allMediaItems?: unknown };
    payload = {
      ...storedPayloadWithoutNcmec,
      kind: 'DEFAULT',
      item: convertedItem,

      // Convert each thread item from the legacy format if needed; a
      // present-but-falsy value is passed through unchanged.
      ...('itemThreadContentItems' in job.data.payload
        ? {
            itemThreadContentItems: job.data.payload.itemThreadContentItems
              ? await Promise.all(
                  job.data.payload.itemThreadContentItems.map(async (it) =>
                    'submissionId' in it
                      ? it
                      : this.legacyItemWithTypeIdentifierToItemSubmissionWithTypeIdentifier(
                          orgId,
                          it,
                        ),
                  ),
                )
              : job.data.payload.itemThreadContentItems,
          }
        : {}),
      // Same conversion for additional content items.
      ...('additionalContentItems' in job.data.payload
        ? {
            additionalContentItems: job.data.payload.additionalContentItems
              ? await Promise.all(
                  job.data.payload.additionalContentItems.map(async (it) =>
                    'submissionId' in it
                      ? it // leave as-is if we're already in the new format
                      : this.legacyItemWithTypeIdentifierToItemSubmissionWithTypeIdentifier(
                          orgId,
                          it,
                        ),
                  ),
                )
              : job.data.payload.additionalContentItems,
          }
        : {}),
      // Legacy payloads stored a single reason/reporter pair; wrap it in the
      // one-element `reportedForReasons` list used by the current format.
      ...(!('reportedForReasons' in job.data.payload)
        ? {
            reportedForReasons: [
              {
                reason:
                  'reportedForReason' in job.data.payload
                    ? job.data.payload.reportedForReason
                    : undefined,
                reporterId: job.data.payload.reporterIdentifier,
              },
            ],
          }
        : { reportedForReasons: job.data.payload.reportedForReasons ?? [] }),
      // Not worth reconstructing: eventually all jobs will have it, and until
      // then we can miss one report's worth of history.
      ...(!('reportHistory' in job.data.payload)
        ? { reportHistory: [] }
        : { reportHistory: job.data.payload.reportHistory }),
    };
  } else {
    // kind is inferred to be 'NCMEC' here
    payload = {
      ...job.data.payload,
      kind: 'NCMEC',
      item: convertedItem,
      // Convert each media item's contentItem from the legacy format when
      // needed; a payload without `allMediaItems` gets an empty list.
      allMediaItems: await Promise.all(
        'allMediaItems' in job.data.payload
          ? job.data.payload.allMediaItems.map(async (mediaItem) => {
              if ('submissionId' in mediaItem.contentItem) {
                return {
                  ...mediaItem,
                  contentItem: mediaItem.contentItem,
                };
              }

              return {
                ...mediaItem,
                contentItem:
                  await this.legacyItemWithTypeIdentifierToItemSubmissionWithTypeIdentifier(
                    orgId,
                    mediaItem.contentItem,
                  ),
              };
            })
          : [],
      ),
      ...(!('reportHistory' in job.data.payload)
        ? { reportHistory: [] }
        : { reportHistory: job.data.payload.reportHistory }),
    };
  }

  // Prefer the current `policyIds`; otherwise wrap a truthy legacy
  // `policyId`; otherwise fall back to an empty list.
  const policyIds =
    'policyIds' in job.data
      ? job.data.policyIds
      : 'policyId' in job.data.payload && job.data.payload.policyId
      ? [job.data.payload.policyId]
      : [];

  const convertedJobData = {
    ...safePick(job.data, [
      'id',
      'createdAt',
      'orgId',
      'reenqueuedFrom',
      'enqueueSourceInfo',
    ]),
    policyIds,
    payload,
  } satisfies ManualReviewJob;

  job.data = convertedJobData;
  // cast here is safe bc TS checked above that convertedJobData is a ManualReviewJob
  return job as Job<ManualReviewJob>;
}
1125
/**
 * Converts a legacy item (`id` / `typeIdentifier` / `data`) into an
 * `ItemSubmissionWithTypeIdentifier` with a freshly generated submission id.
 * Items that already have a `submissionId` are returned untouched.
 *
 * For CONTENT item types the `creator` is read from the field holding the
 * schema's `creatorId` role; for every other kind it is left undefined.
 *
 * @throws Error('Item type not found') if the org has no matching item type.
 *
 * TODO: remove when we no longer need to support legacy jobs
 */
async legacyItemWithTypeIdentifierToItemSubmissionWithTypeIdentifier(
  orgId: string,
  legacyItem: LegacyItemWithTypeIdentifier | ItemSubmissionWithTypeIdentifier,
): Promise<ItemSubmissionWithTypeIdentifier> {
  // Already in the new format: nothing to convert.
  if ('submissionId' in legacyItem) {
    return legacyItem;
  }

  const { id, typeIdentifier, data } = legacyItem;

  const itemType = await this.moderationConfigService.getItemType({
    orgId,
    itemTypeSelector: typeIdentifier,
  });
  if (!itemType) {
    throw new Error('Item type not found');
  }

  const submissionId = makeSubmissionId();
  const creator =
    itemType.kind === 'CONTENT'
      ? getFieldValueForRole(
          itemType.schema,
          itemType.schemaFieldRoles,
          'creatorId',
          data,
        )
      : undefined;

  return instantiateOpaqueType<ItemSubmissionWithTypeIdentifier>({
    itemId: id,
    itemTypeIdentifier: typeIdentifier,
    data,
    submissionId,
    creator,
  });
}
1162
/**
 * Removes a job from its queue.
 *
 * First tries to move the job to completed using the caller's lock token.
 * If that throws (most commonly because the lock token has expired), it
 * falls back to removing the job from the queue directly.
 *
 * @throws Error('Failed to remove job') if the fallback removal does not
 *   report exactly one removed job, e.g. when a different Worker holds a
 *   lock on the job.
 */
async removeJob(opts: {
  orgId: string;
  queueId: string;
  jobId: JobId;
  lockToken: string;
}) {
  try {
    await this.#markLockedJobCompleted(opts);
    return;
  } catch {
    // Fall through to the manual removal below.
  }

  const { orgId, queueId, jobId } = opts;
  const queue = await this.getOrCreateBullQueue({ orgId, queueId });
  const { bullId } = parseExternalId(jobId);
  const removedCount = await queue.remove(bullId);
  if (removedCount !== 1) {
    throw new Error('Failed to remove job');
  }
}
1188
/**
 * Moves the job to the completed state (with a null return value) using the
 * given lock token. It passes `false` as the third argument, so no next job
 * is fetched. If the job is not found, silently does nothing.
 */
async #markLockedJobCompleted(opts: {
  orgId: string;
  queueId: string;
  jobId: JobId;
  lockToken: string;
}) {
  const queue = await this.getOrCreateBullQueue({
    orgId: opts.orgId,
    queueId: opts.queueId,
  });
  const job = await this.#getJob(opts.jobId, queue);
  if (!job) {
    return;
  }
  await job.moveToCompleted(null, opts.lockToken, false);
}
1204
/**
 * Gives up the caller's lock on a job so it becomes available again. This is
 * used when a reviewer skips a job: rather than waiting for the lock to expire
 * and the job to be marked stalled, the job is moved to the delayed state with
 * a timestamp of "now".
 *
 * Best-effort: a missing job is a no-op, and any error is swallowed. An error
 * typically means the lock already expired or the job changed state, in which
 * case the stalled-job checker takes over.
 */
async releaseJobLock(opts: {
  orgId: string;
  queueId: string;
  jobId: JobId;
  lockToken: string;
}) {
  try {
    const queue = await this.getOrCreateBullQueue({
      orgId: opts.orgId,
      queueId: opts.queueId,
    });
    const job = await this.#getJob(opts.jobId, queue);

    if (job) {
      // Passing the lock token means only the current lock holder can do this.
      await job.moveToDelayed(Date.now(), opts.lockToken);
    }
  } catch {
    // Intentionally ignored; see the doc comment above.
  }
}
1237
/**
 * Returns the number of jobs in the queue that are still waiting to be
 * processed. It delegates to BullMQ's `Queue#count`, which counts waiting
 * and delayed jobs. See https://api.docs.bullmq.io/classes/Queue.html#count
 * for the exact set of states in the installed BullMQ version.
 */
async getPendingJobCount({
  orgId,
  queueId,
}: {
  orgId: string;
  queueId: string;
}) {
  return (await this.#getBullQueue(orgId, queueId)).count();
}
1246
/**
 * Returns the creation time of the oldest pending (waiting or delayed) job in
 * the queue, or null if the queue has no pending jobs.
 *
 * Only the job at the front of each state is fetched, and the two are then
 * compared. `getJobs` is called with `asc = true` so BullMQ returns jobs in
 * processing order (next-to-run first). Its default is descending order,
 * which returns the most recently added job instead of the oldest one.
 *
 * Job data is stored in Redis as JSON, so `createdAt` may come back as a
 * string at runtime despite its static type. It is always converted to a
 * real `Date` before being returned. Jobs whose `createdAt` cannot be parsed
 * are ignored.
 */
async getOldestJobCreatedAt(opts: {
  orgId: string;
  queueId: string;
  isAppealsQueue: boolean;
}): Promise<Date | null> {
  const { orgId, queueId, isAppealsQueue } = opts;
  const queue = isAppealsQueue
    ? await this.#getBullAppealQueue(orgId, queueId)
    : await this.#getBullQueue(orgId, queueId);

  // start = 0, end = 0: at most one job per state.
  // asc = true: take it from the front of the queue (the oldest end).
  const [waitingJobs, delayedJobs] = await Promise.all([
    queue.getJobs(['waiting'], 0, 0, true),
    queue.getJobs(['delayed'], 0, 0, true),
  ]);

  const candidateTimes = [...waitingJobs, ...delayedJobs]
    .map((job) => new Date(job.data.createdAt))
    .filter((createdAt) => !Number.isNaN(createdAt.getTime()));

  if (candidateTimes.length === 0) {
    return null;
  }

  return candidateTimes.reduce((oldest, candidate) =>
    candidate.getTime() < oldest.getTime() ? candidate : oldest,
  );
}
1281
/**
 * Closes every cached BullMQ queue and worker held by this service, for both
 * the non-appeal and the appeal caches. All four close calls start together
 * and are awaited as one group.
 */
async close() {
  const allClosed = Promise.all([
    // non-appeal queues and workers
    this.getOrCreateBullQueue.close(),
    this.getBullWorker.close(),
    // appeal queues and workers
    this.getOrCreateBullAppealQueue.close(),
    this.getBullAppealWorker.close(),
  ]);
  return allClosed;
}
1290
/**
 * Finds the existing Bull job for an item in every queue the item was
 * enqueued into during the last week, as recorded in
 * `manual_review_tool.job_creations`. Each job found is converted from the
 * legacy stored format if needed.
 *
 * The item may have several recorded creations in the same queue within the
 * window. Bull job ids are derived from the item, so each queue holds at most
 * one job for it. Queue ids are therefore de-duplicated first, so each queue
 * is checked once and each job is returned at most once.
 *
 * @returns one `{ job, queueId }` entry per queue that currently holds a job
 *   for the item.
 */
async getExistingJobsForItem(opts: {
  orgId: string;
  itemId: string;
  itemTypeId: string;
}) {
  const { orgId, itemId, itemTypeId } = opts;
  // Check postgres for creations within the last 7 days so we don't have to
  // search every bull queue for every item.
  const recentJobCreations = await this.pgQuery
    .selectFrom('manual_review_tool.job_creations')
    .select(['queue_id'])
    .where('org_id', '=', orgId)
    .where('item_id', '=', itemId)
    .where('item_type_id', '=', itemTypeId)
    .where('created_at', '>=', new Date(Date.now() - WEEK_MS))
    .execute();

  // Several creation rows can point at the same queue; look each queue up once.
  const queueIds = [
    ...new Set(recentJobCreations.map((row) => row.queue_id)),
  ];

  const jobsWithQueue = await Promise.all(
    queueIds.map(async (queueId) => {
      const queue = await this.getOrCreateBullQueue({ orgId, queueId });
      const legacyJob = await queue.getJob(
        itemIdToBullJobId({ id: itemId, typeId: itemTypeId }),
      );
      if (!legacyJob) {
        return undefined;
      }
      const job = await this.legacyJobToJob(legacyJob, orgId);
      return {
        job: job.data,
        queueId,
      };
    }),
  );

  return filterNullOrUndefined(jobsWithQueue);
}
1327
/**
 * Lists the ids of the actions hidden for the given queue within the given
 * org, as stored in `manual_review_tool.queues_and_hidden_actions`.
 */
async getHiddenActionsForQueue(opts: { queueId: string; orgId: string }) {
  const hiddenRows = await this.pgQuery
    .selectFrom('manual_review_tool.queues_and_hidden_actions')
    .select(['action_id'])
    .where('queue_id', '=', opts.queueId)
    .where('org_id', '=', opts.orgId)
    .execute();
  return hiddenRows.map((row) => row.action_id);
}
1339
/**
 * Hides and/or unhides actions for a queue. It inserts rows into, and
 * deletes rows from, `manual_review_tool.queues_and_hidden_actions`.
 *
 * If `transaction` is provided, both statements run inside it, so the change
 * commits or rolls back with the caller's surrounding work. Otherwise a new
 * transaction is opened, so the hide and unhide changes are applied
 * atomically rather than as two independent statements.
 *
 * Does nothing when both lists are empty. Hides are inserted before unhides
 * are deleted, so an action id present in both lists ends up unhidden.
 */
async updateHiddenActionsForQueue(opts: {
  queueId: string;
  actionIdsToHide: readonly string[];
  actionIdsToUnhide: readonly string[];
  orgId: string;
  transaction?: Transaction<ManualReviewToolServicePg>;
}) {
  const { transaction, queueId, actionIdsToHide, actionIdsToUnhide, orgId } =
    opts;

  if (actionIdsToHide.length === 0 && actionIdsToUnhide.length === 0) {
    return undefined;
  }

  // Runs both statements against whichever query interface is supplied.
  const applyChanges = async (
    queryInterface: Kysely<ManualReviewToolServicePg>,
  ) => {
    if (actionIdsToHide.length > 0) {
      await queryInterface
        .insertInto('manual_review_tool.queues_and_hidden_actions')
        .values(
          actionIdsToHide.map((actionId) => ({
            queue_id: queueId,
            action_id: actionId,
            org_id: orgId,
          })),
        )
        .execute();
    }

    if (actionIdsToUnhide.length > 0) {
      await queryInterface
        .deleteFrom('manual_review_tool.queues_and_hidden_actions')
        .where('queue_id', '=', queueId)
        .where('org_id', '=', orgId)
        .where('action_id', 'in', actionIdsToUnhide)
        .execute();
    }
  };

  if (transaction) {
    await applyChanges(transaction);
  } else {
    await this.pgQuery.transaction().execute(applyChanges);
  }
  return undefined;
}
1380}
1381
/**
 * Separator placed between the encoded parts of a Bull job id. '.' is used
 * because BullMQ v5 disallows ':' in custom job ids.
 */
const BULL_JOB_ID_SEPARATOR = '.';

/**
 * We want Bull to dedupe jobs on the same item, so this function maps each
 * distinct item identifier object to a string that can be used as a Bull job
 * id. The type id and the item id are each base64url-encoded, then joined
 * with BULL_JOB_ID_SEPARATOR.
 *
 * @throws Error if either the type id or the item id is empty.
 *
 * NB: only exported for use in tests.
 * @private
 */
export function itemIdToBullJobId({ typeId, id }: ItemIdentifier) {
  if (!(typeId && id)) {
    throw new Error('itemTypeId and itemId cannot be empty strings');
  }

  const encodedParts = [typeId, id].map(b64UrlEncode);
  return instantiateOpaqueType<BullJobId>(
    encodedParts.join(BULL_JOB_ID_SEPARATOR),
  );
}
1401
/**
 * Because BullJobIds are intentionally not globally unique (across time and
 * queues), this function combines a BullJobId with a guid to make a truly
 * unique external JobId. Both parts are base64url-encoded and joined with
 * ':'. This assumes b64UrlEncode never emits ':', so the id can be split
 * back apart on that character.
 *
 * @param guid defaults to a freshly generated v1 UUID.
 * @throws Error if `guid` is empty.
 *
 * NB: only exported for use in tests.
 * @private
 */
export function bullJobIdtoExternalJobId(
  bullJobId: BullJobId,
  guid = uuidv1(),
) {
  if (guid === '' || !guid) {
    throw new Error('guid cannot be empty.');
  }

  const externalId = [bullJobId, guid].map(b64UrlEncode).join(':');
  return instantiateOpaqueType<JobId>(externalId);
}
1422
/**
 * Splits an external JobId back into its components (the BullJobId and the
 * guid), so that we can look up the job in Bull by its BullJobId.
 *
 * Valid ids contain exactly one ':' (two base64url-encoded parts). Anything
 * else is rejected up front, rather than passing `undefined` or extra
 * segments on to the decoder.
 *
 * @throws Error if `externalId` does not consist of exactly two parts.
 *
 * NB: only exported for use in tests.
 * @private
 */
export function parseExternalId(externalId: JobId) {
  const idParts = externalId.split(':');
  if (idParts.length !== 2) {
    throw new Error(
      `Malformed job id: expected 2 ':'-separated parts, got ${idParts.length}.`,
    );
  }

  const [encodedBullId, encodedGuid] = idParts as [
    B64UrlOf<BullJobId>,
    B64UrlOf<string>,
  ];
  return { bullId: b64UrlDecode(encodedBullId), guid: b64UrlDecode(encodedGuid) };
}
1437
/**
 * Extracts the guid component of an external JobId. In theory, this shouldn't
 * be necessary, since anything logged as associated with the job could just
 * use the JobId directly. Historically, though, we've used this derived uuid
 * (e.g., when logging decisions) because it's shorter.
 */
export function jobIdToGuid(jobId: JobId) {
  const { guid } = parseExternalId(jobId);
  return guid;
}
1447
/** Redis client accepted as the `connection` for the BullMQ workers and queues below. */
type RedisConnection = Cluster | IORedis.Redis;
1449
/**
 * Creates a BullMQ Worker for manual job processing on the given org's queue.
 * The worker does not auto-run: jobs are pulled explicitly with
 * `getNextJob(lockToken)`. Only the stalled-job checker is started.
 *
 * The returned worker is re-typed so that `getNextJob` correctly reports that
 * it can resolve to undefined.
 */
export async function getBullWorker<JobData = unknown>(
  redisConnection: RedisConnection,
  queueKey: QueueKey,
) {
  const { orgId, queueId } = queueKey;

  // 10 minutes: how long a claimed job stays locked before it's considered stalled.
  const lockDurationMs = 10 * 60 * 1000;

  const worker = new Worker(
    queueId,
    // An empty processor function is necessary to create
    // the timer manager that's used in checkStalledJobs
    async (_job) => null,
    {
      connection: redisConnection,
      lockDuration: lockDurationMs,
      prefix: getPrefix(orgId),
      autorun: false,
      // A job stalls when a user claims it and then doesn't act on it within
      // lockDuration. Once maxStalledCount is hit, Bull moves the job to the
      // failed set. That set isn't visible to users, and the job is never
      // dequeued again, so a report could never get actioned and re-enqueues
      // could look broken. The default is 1; 50 should be large enough that no
      // reasonable user ever hits it.
      maxStalledCount: 50,
    },
  );

  // Start the stalled jobs checker for manual job processing
  await worker.startStalledCheckTimer();

  return worker as unknown as Omit<Worker<JobData>, 'getNextJob'> & {
    getNextJob: (lockToken: string) => Promise<Job<JobData> | undefined>;
  };
}
1486
/**
 * Returns a BullMQ Queue for the given org/queue pair, namespaced under the
 * org's key prefix.
 *
 * NB: this is called getOrCreateBullQueue because BullMQ will silently create
 * a queue object (certainly in memory and maybe even leaving some traces in
 * Redis) if this is called with a queue id that doesn't exist yet.
 */
export async function getOrCreateBullQueue<JobData = unknown>(
  redisConnection: RedisConnection,
  queueKey: QueueKey,
) {
  const queue = new Queue<JobData>(queueKey.queueId, {
    connection: redisConnection,
    prefix: getPrefix(queueKey.orgId),
  });
  return queue;
}
1502
/**
 * Builds the Redis key prefix for an org by wrapping its id in braces. Redis
 * Cluster hashes only the `{...}` part of a key, so every key for one org
 * lands in the same hash slot.
 */
function getPrefix(orgId: string) {
  return `{${orgId}}`;
}
1510
/**
 * Builds the 403 error returned when the caller may not delete all jobs.
 *
 * The title is the supplied `detail` when there is one. Previously
 * `String(data.detail)` turned a missing detail into the literal title
 * "undefined"; a descriptive default is used instead.
 */
export const makeDeleteAllJobsInsufficientPermissionsError = (
  data: ErrorInstanceData,
) =>
  new CoopError({
    status: 403,
    type: [ErrorType.Unauthorized],
    title:
      data.detail != null
        ? String(data.detail)
        : 'Insufficient permissions to delete all jobs',
    name: 'DeleteAllJobsUnauthorizedError',
    ...data,
  });
1521
/** Builds the 400 error returned when a referenced queue id does not exist. */
const makeQueueDoesNotExistError = (data: ErrorInstanceData) =>
  new CoopError({
    status: 400,
    type: [ErrorType.InvalidUserInput],
    title: 'The queue with this ID does not exist',
    name: 'QueueDoesNotExistError',
    ...data,
  });
1531
/** Builds the 403 error returned when someone tries to delete a default queue. */
export const makeUnableToDeleteDefaultQueueError = (data: ErrorInstanceData) =>
  new CoopError({
    status: 403,
    type: [ErrorType.Unauthorized],
    title: 'Unable to delete default queue',
    name: 'UnableToDeleteDefaultQueueError',
    ...data,
  });
1543
/**
 * Builds the 409 error returned when a manual review queue name is already
 * taken within the organization.
 */
export const makeManualReviewQueueNameExistsError = (
  data: ErrorInstanceData,
) => {
  return new CoopError({
    status: 409,
    type: [ErrorType.UniqueViolation],
    title:
      'A manual review queue with that name already exists in this organization.',
    name: 'ManualReviewQueueNameExistsError',
    ...data,
  });
};