Mirror of https://github.com/roostorg/coop
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 557ff54b2b435e5f1e789c6a8a4e1bebf2d7deb6 1552 lines 50 kB view raw
1/* eslint-disable max-lines */ 2 3import { type ItemIdentifier } from '@roostorg/types'; 4import { Queue, Worker } from 'bullmq'; 5import { type Job } from 'bullmq'; 6import { type Cluster } from 'ioredis'; 7import type IORedis from 'ioredis'; 8import { type Kysely, type Transaction } from 'kysely'; 9import pLimit from 'p-limit'; 10import { type Opaque, type ReadonlyDeep } from 'type-fest'; 11import { v1 as uuidv1 } from 'uuid'; 12 13import { type Dependencies } from '../../../iocContainer/index.js'; 14import { 15 UserPermission, 16 type Invoker, 17} from '../../../models/types/permissioning.js'; 18import { cached, type Cached } from '../../../utils/caching.js'; 19import { filterNullOrUndefined } from '../../../utils/collections.js'; 20import { 21 b64UrlDecode, 22 b64UrlEncode, 23 type B64UrlOf, 24} from '../../../utils/encoding.js'; 25import { 26 CoopError, 27 ErrorType, 28 makeUnauthorizedError, 29 type ErrorInstanceData, 30} from '../../../utils/errors.js'; 31import { isUniqueViolationError } from '../../../utils/kysely.js'; 32import { removeUndefinedKeys, safePick } from '../../../utils/misc.js'; 33import { replaceEmptyStringWithNull } from '../../../utils/string.js'; 34import { WEEK_MS } from '../../../utils/time.js'; 35import { 36 instantiateOpaqueType, 37 type Bind1, 38} from '../../../utils/typescript-types.js'; 39import { 40 getFieldValueForRole, 41 makeSubmissionId, 42} from '../../itemProcessingService/index.js'; 43import { type ItemSubmissionWithTypeIdentifier } from '../../itemProcessingService/makeItemSubmissionWithTypeIdentifier.js'; 44import { type ManualReviewToolServicePg } from '../dbTypes.js'; 45import { 46 type AppealEnqueueSourceInfo, 47 type JobId, 48 type LegacyItemWithTypeIdentifier, 49 type ManualReviewAppealJob, 50 type ManualReviewAppealJobPayload, 51 type ManualReviewJob, 52 type ManualReviewJobEnqueueSourceInfo, 53 type ManualReviewJobPayload, 54 type OriginJobInfo, 55 type StoredManualReviewJob, 56} from 
'../manualReviewToolService.js'; 57 58export type ManualReviewQueue = { 59 id: string; 60 name: string; 61 description: string | null; 62 orgId: string; 63 createdAt: Date; 64 isDefaultQueue: boolean; 65 isAppealsQueue: boolean; 66 autoCloseJobs: boolean; 67}; 68 69const PgQueueSelection = [ 70 'id', 71 'org_id as orgId', 72 'name', 73 'description', 74 'is_default_queue as isDefaultQueue', 75 'created_at as createdAt', 76 'is_appeals_queue as isAppealsQueue', 77 'auto_close_jobs as autoCloseJobs', 78] as const; 79 80// BullJobId represents the ID we give a job within Bull. It's only unique among 81// the pending jobs within a single queue. Over time, multiple jobs will end up 82// with the same BullJobId if the same item is enqueued multiple times (with the 83// second enqueue happening in a different queue or after the first job has been 84// processed and is no longer saved in Bull). However, if one attempts to 85// enqueue a new job with the same BullJobId while another job with that id is 86// still in the queue, Bull will ignore the second enqueue, because it dedupes 87// on the basis of this id, which is exactly what we want. 88type BullJobId = Opaque<string, 'BullJobId'>; 89 90// TODO: use this. 91export type ManualReviewQueueErrorType = 'ManualReviewQueueNameExistsError'; 92export type QueueOperationsErrorType = 93 | 'DeleteAllJobsUnauthorizedError' 94 | 'QueueDoesNotExistError' 95 | 'UnableToDeleteDefaultQueueError'; 96 97// Compound identifier for a queue. orgId is needed for security, but also 98// because queues are/will be actually sharded across redis instances for 99// scaling by orgId, so you need the orgId to find the queue. 100type QueueKey = { orgId: string; queueId: string }; 101 102/** 103 * This class handles everything that MRT does directly with queues: CRUDing 104 * them, enqueuing and dequeueing jobs on a given queue, looking up jobs within 105 * a given queue, etc. 
It does not deal with routing jobs to queues or forming 106 * the job payloads that should be enqueued. 107 * 108 * The state for queues is split between BullMQ and Postgres, and this class 109 * coordinates those two backends where needed to present a single, logical API 110 * (see, e.g., the use of {@link #checkQueueExists} inside 111 * {@link #getBullQueue}, and the logic in {@link deleteManualReviewQueue}). 112 * 113 * This class also implements (and hides the implementation details of) 114 * MRT-specific logic for ignoring an enqueue when a job already exists for the 115 * same item. 116 * 117 * Finally, it handles the mismatch between how Bull is normally used (i.e., 118 * with a small number of long-running workers that automatically dequeue and 119 * process jobs, and a few long-lived queue object references that are used to 120 * push jobs to those workers) and how we use it in MRT, where users -- not an 121 * automatic worker -- manually dequeue jobs and mark them complete, and where 122 * there are many, many queues (not all of which we want to keep references to 123 * in memory or connected to Redis at all times). 124 * 125 * As part of handling that mismatch, this class exposes an API that solely 126 * accepts and returns plain data values, as opposed to the stateful 127 * Queue/Worker/Job objects that Bull usually deals with. While this 128 * occasionally adds some overhead, that overhead is minimized by smart caching 129 * internally, and this sort of API also makes the class much easier to mock. 
130 */ 131export default class QueueOperations { 132 private readonly getOrCreateBullQueue: Cached< 133 Bind1<typeof getOrCreateBullQueue<StoredManualReviewJob>> 134 >; 135 private readonly getBullWorker: Cached< 136 Bind1<typeof getBullWorker<StoredManualReviewJob>> 137 >; 138 139 private readonly getOrCreateBullAppealQueue: Cached< 140 Bind1<typeof getOrCreateBullQueue<ManualReviewAppealJob>> 141 >; 142 private readonly getBullAppealWorker: Cached< 143 Bind1<typeof getBullWorker<ManualReviewAppealJob>> 144 >; 145 146 constructor( 147 private readonly pgQuery: Kysely<ManualReviewToolServicePg>, 148 private readonly pgQueryReadReplica: Kysely<ManualReviewToolServicePg>, 149 private readonly moderationConfigService: Dependencies['ModerationConfigService'], 150 redis: RedisConnection, 151 ) { 152 // Reassingment here is a hack to work around TS syntax limitations 153 // with generic instantiation expressions. 154 const getOrCreateBullQueue_ = getOrCreateBullQueue<StoredManualReviewJob>; 155 const getBullWorker_ = getBullWorker<StoredManualReviewJob>; 156 const getOrCreateBullAppealQueue_ = 157 getOrCreateBullQueue<ManualReviewAppealJob>; 158 const getBullAppealWorker_ = getBullWorker<ManualReviewAppealJob>; 159 160 this.getBullWorker = cached({ 161 producer: getBullWorker_.bind(null, redis), 162 directives: { freshUntilAge: 600 }, 163 numItemsLimit: 128, 164 onItemEviction: async (workerPromise) => { 165 await workerPromise.close(); 166 }, 167 }); 168 169 this.getOrCreateBullQueue = cached({ 170 producer: getOrCreateBullQueue_.bind(null, redis), 171 directives: { freshUntilAge: 600 }, 172 numItemsLimit: 128, 173 onItemEviction: async (queuePromise) => { 174 await queuePromise.close(); 175 }, 176 }); 177 178 this.getBullAppealWorker = cached({ 179 producer: getBullAppealWorker_.bind(null, redis), 180 directives: { freshUntilAge: 600 }, 181 numItemsLimit: 128, 182 onItemEviction: async (workerPromise) => { 183 await workerPromise.close(); 184 }, 185 }); 186 187 
this.getOrCreateBullAppealQueue = cached({ 188 producer: getOrCreateBullAppealQueue_.bind(null, redis), 189 directives: { freshUntilAge: 600 }, 190 numItemsLimit: 128, 191 onItemEviction: async (queuePromise) => { 192 await queuePromise.close(); 193 }, 194 }); 195 } 196 197 async checkQueueExists(orgId: string, queueId: string) { 198 // NB: this intentionally hits the primary, to make sure we're giving an 199 // accurate result before creating the Bull queue. 200 const queue = await this.pgQuery 201 .selectFrom('manual_review_tool.manual_review_queues') 202 .select(PgQueueSelection) 203 .where('org_id', '=', orgId) 204 .where('id', '=', queueId) 205 .executeTakeFirst(); 206 if (queue === undefined) { 207 throw makeQueueDoesNotExistError({ shouldErrorSpan: true }); 208 } 209 } 210 211 async #getBullQueue(orgId: string, queueId: string) { 212 await this.checkQueueExists(orgId, queueId); 213 return this.getOrCreateBullQueue({ orgId, queueId }); 214 } 215 async #getBullAppealQueue(orgId: string, queueId: string) { 216 await this.checkQueueExists(orgId, queueId); 217 return this.getOrCreateBullAppealQueue({ orgId, queueId }); 218 } 219 220 // TODO: try/catch create and update and throw 221 // ManualReviewQueueNameExistsError and NotFoundError 222 async createManualReviewQueue(input: { 223 name: string; 224 description: string | null; 225 userIds: readonly string[]; 226 hiddenActionIds: readonly string[]; 227 invokedBy: Invoker; 228 isAppealsQueue?: boolean; 229 autoCloseJobs?: boolean; 230 }) { 231 const { 232 name, 233 description, 234 userIds, 235 hiddenActionIds, 236 invokedBy, 237 isAppealsQueue, 238 autoCloseJobs, 239 } = input; 240 const { orgId } = invokedBy; 241 242 if (!invokedBy.permissions.includes(UserPermission.EDIT_MRT_QUEUES)) { 243 throw makeUnauthorizedError( 244 'You do not have permission to create a queue', 245 { shouldErrorSpan: true }, 246 ); 247 } 248 249 try { 250 return await this.pgQuery.transaction().execute(async (transaction) => { 251 // In 
newer versions of kysely, this is greatly simplified with 252 // `transaction.selectNoFrom(eb => eb.exists(...))`, but we're blocked on 253 // updating by https://github.com/kysely-org/kysely/issues/577#issuecomment-1804900006 254 const orgHasQueuesAlready = await transaction 255 .selectFrom('manual_review_tool.manual_review_queues') 256 .where('org_id', '=', orgId) 257 .where('is_appeals_queue', '=', isAppealsQueue ?? false) 258 .limit(1) 259 .execute() 260 .then((queues) => queues.length > 0); 261 262 const queue = await transaction 263 .insertInto('manual_review_tool.manual_review_queues') 264 .returning(PgQueueSelection) 265 .values([ 266 { 267 id: uuidv1(), 268 name, 269 org_id: orgId, 270 is_default_queue: !orgHasQueuesAlready, 271 description: replaceEmptyStringWithNull(description), 272 is_appeals_queue: isAppealsQueue ?? false, 273 auto_close_jobs: autoCloseJobs ?? false, 274 }, 275 ]) 276 .executeTakeFirstOrThrow(); 277 278 await transaction 279 .insertInto('manual_review_tool.users_and_accessible_queues') 280 .values( 281 userIds.map((userId) => ({ 282 queue_id: queue.id, 283 user_id: userId, 284 })), 285 ) 286 .executeTakeFirstOrThrow(); 287 288 await this.updateHiddenActionsForQueue({ 289 transaction, 290 queueId: queue.id, 291 actionIdsToHide: hiddenActionIds, 292 actionIdsToUnhide: [], 293 orgId, 294 }); 295 return queue; 296 }); 297 } catch (e) { 298 if (isUniqueViolationError(e)) { 299 throw makeManualReviewQueueNameExistsError({ shouldErrorSpan: true }); 300 } 301 throw e; 302 } 303 } 304 305 async updateManualReviewQueue(input: { 306 orgId: string; 307 queueId: string; 308 name?: string; 309 description?: string | null; 310 userIds: readonly string[]; 311 actionIdsToHide: readonly string[]; 312 actionIdsToUnhide: readonly string[]; 313 autoCloseJobs?: boolean; 314 }) { 315 const { 316 queueId, 317 orgId, 318 name, 319 description, 320 userIds, 321 actionIdsToHide, 322 actionIdsToUnhide, 323 autoCloseJobs, 324 } = input; 325 326 return 
this.pgQuery.transaction().execute(async (transaction) => { 327 const [updatedQueue, _, __] = await Promise.all([ 328 transaction 329 .updateTable('manual_review_tool.manual_review_queues') 330 .set( 331 removeUndefinedKeys({ 332 name, 333 description: replaceEmptyStringWithNull(description), 334 auto_close_jobs: autoCloseJobs, 335 }), 336 ) 337 .where('id', '=', queueId) 338 .where('org_id', '=', orgId) 339 .returning(PgQueueSelection) 340 .executeTakeFirstOrThrow(), 341 transaction 342 .insertInto('manual_review_tool.users_and_accessible_queues') 343 .values( 344 userIds.map((userId) => ({ 345 queue_id: queueId, 346 user_id: userId, 347 })), 348 ) 349 .onConflict((oc) => oc.doNothing()) 350 .executeTakeFirstOrThrow(), 351 transaction 352 .deleteFrom('manual_review_tool.users_and_accessible_queues') 353 .where('queue_id', '=', queueId) 354 .where('user_id', 'not in', userIds) 355 .executeTakeFirstOrThrow(), 356 ]); 357 358 await this.updateHiddenActionsForQueue({ 359 transaction, 360 queueId, 361 orgId, 362 actionIdsToHide, 363 actionIdsToUnhide, 364 }); 365 return updatedQueue; 366 }); 367 } 368 369 /** 370 * @returns true when the queue that was trying to be deleted 371 * exists and is successfully deleted, false when the queue 372 * did not exist and throws when the delete fails for some reason 373 */ 374 async deleteManualReviewQueue(orgId: string, queueId: string) { 375 const defaultQueueId = await this.getDefaultQueueIdForOrg(orgId); 376 if (queueId === defaultQueueId) { 377 throw makeUnableToDeleteDefaultQueueError({ shouldErrorSpan: true }); 378 } 379 const queue = await this.getOrCreateBullQueue({ orgId, queueId }); 380 381 await queue.obliterate({ force: true }); 382 383 const [{ numDeletedRows }, _] = await this.pgQuery 384 .transaction() 385 .execute(async (transaction) => { 386 return Promise.all([ 387 transaction 388 .deleteFrom('manual_review_tool.manual_review_queues') 389 .where('id', '=', queueId) 390 .where('org_id', '=', orgId) 391 
.executeTakeFirstOrThrow(), 392 transaction 393 .deleteFrom('manual_review_tool.users_and_accessible_queues') 394 .where('queue_id', '=', queueId) 395 .execute(), 396 ]); 397 }); 398 399 return numDeletedRows === 1n; 400 } 401 402 async deleteManualReviewQueueForTestsDO_NOT_USE( 403 orgId: string, 404 queueId: string, 405 ) { 406 const queue = await this.getOrCreateBullQueue({ orgId, queueId }); 407 408 await queue.obliterate({ force: true }); 409 410 const [{ numDeletedRows }, _] = await this.pgQuery 411 .transaction() 412 .execute(async (transaction) => { 413 return Promise.all([ 414 transaction 415 .deleteFrom('manual_review_tool.manual_review_queues') 416 .where('id', '=', queueId) 417 .where('org_id', '=', orgId) 418 .executeTakeFirstOrThrow(), 419 transaction 420 .deleteFrom('manual_review_tool.users_and_accessible_queues') 421 .where('queue_id', '=', queueId) 422 .execute(), 423 ]); 424 }); 425 426 return numDeletedRows === 1n; 427 } 428 429 async getDefaultQueueIdForOrg(orgId: string) { 430 const queue = await this.pgQuery 431 .selectFrom('manual_review_tool.manual_review_queues') 432 .select(['id']) 433 .where('org_id', '=', orgId) 434 .where('is_appeals_queue', '=', false) 435 .where('is_default_queue', '=', true) 436 .executeTakeFirstOrThrow(); 437 return queue.id; 438 } 439 440 async getDefaultAppealsQueueIdForOrg(orgId: string) { 441 const queue = await this.pgQuery 442 .selectFrom('manual_review_tool.manual_review_queues') 443 .select(['id']) 444 .where('org_id', '=', orgId) 445 .where('is_appeals_queue', '=', true) 446 .where('is_default_queue', '=', true) 447 .executeTakeFirstOrThrow(); 448 return queue.id; 449 } 450 451 async getReviewableQueuesForUser(opts: { invoker: Invoker }) { 452 const { invoker } = opts; 453 const { userId, permissions, orgId } = invoker; 454 455 const canSeeQueues = permissions.includes(UserPermission.VIEW_MRT); 456 const bypassQueuePermissions = permissions.includes( 457 UserPermission.EDIT_MRT_QUEUES, 458 ); 459 460 if 
(!canSeeQueues) { 461 return []; 462 } 463 464 return this.pgQuery 465 .selectFrom('manual_review_tool.manual_review_queues') 466 .select(PgQueueSelection) 467 .where('org_id', '=', orgId) 468 .$if(!bypassQueuePermissions, (query) => 469 query.where( 470 'id', 471 'in', 472 this.pgQuery 473 .selectFrom('manual_review_tool.users_and_accessible_queues') 474 .select('queue_id') 475 .where('user_id', '=', userId), 476 ), 477 ) 478 .execute(); 479 } 480 481 async getQueueForOrg(opts: { 482 orgId: string; 483 userId: string; 484 queueId: string; 485 }) { 486 const { orgId, userId, queueId } = opts; 487 return this.pgQuery 488 .selectFrom('manual_review_tool.manual_review_queues') 489 .select(PgQueueSelection) 490 .where('org_id', '=', orgId) 491 .where('id', '=', queueId) 492 .where( 493 'id', 494 'in', 495 this.pgQuery 496 .selectFrom('manual_review_tool.users_and_accessible_queues') 497 .select('queue_id') 498 .where('user_id', '=', userId), 499 ) 500 .executeTakeFirst(); 501 } 502 503 async getFavoriteQueuesForUser(opts: { orgId: string; userId: string }) { 504 const { orgId, userId } = opts; 505 return this.pgQuery 506 .selectFrom( 507 'manual_review_tool.users_and_favorite_mrt_queues as favorite_queues', 508 ) 509 .innerJoin( 510 'manual_review_tool.manual_review_queues as queues', 511 'favorite_queues.queue_id', 512 'queues.id', 513 ) 514 .select([ 515 'queues.id', 516 'queues.org_id as orgId', 517 'queues.name', 518 'queues.description', 519 'queues.is_default_queue as isDefaultQueue', 520 'queues.created_at as createdAt', 521 'queues.is_appeals_queue as isAppealsQueue', 522 'queues.auto_close_jobs as autoCloseJobs', 523 ]) 524 .where('favorite_queues.user_id', '=', userId) 525 .where('favorite_queues.org_id', '=', orgId) 526 .execute(); 527 } 528 529 async addFavoriteQueueForUser(opts: { 530 userId: string; 531 orgId: string; 532 queueId: string; 533 }) { 534 const { userId, orgId, queueId } = opts; 535 await this.pgQuery 536 
.insertInto('manual_review_tool.users_and_favorite_mrt_queues') 537 .values({ user_id: userId, org_id: orgId, queue_id: queueId }) 538 .onConflict((oc) => oc.doNothing()) 539 .execute(); 540 } 541 542 async removeFavoriteQueueForUser(opts: { 543 userId: string; 544 orgId: string; 545 queueId: string; 546 }) { 547 const { userId, orgId, queueId } = opts; 548 await this.pgQuery 549 .deleteFrom('manual_review_tool.users_and_favorite_mrt_queues') 550 .where('user_id', '=', userId) 551 .where('org_id', '=', orgId) 552 .where('queue_id', '=', queueId) 553 .execute(); 554 } 555 556 async getAllQueuesForOrgAndDangerouslyBypassPermissioning(orgId: string) { 557 return this.pgQuery 558 .selectFrom('manual_review_tool.manual_review_queues') 559 .select(PgQueueSelection) 560 .where('org_id', '=', orgId) 561 .execute(); 562 } 563 564 async getQueueForOrgAndDangerouslyBypassPermissioning(opts: { 565 orgId: string; 566 queueId: string; 567 }) { 568 return this.pgQuery 569 .selectFrom('manual_review_tool.manual_review_queues') 570 .select(PgQueueSelection) 571 .where('org_id', '=', opts.orgId) 572 .where('id', '=', opts.queueId) 573 .executeTakeFirst(); 574 } 575 576 async getUsersWhoCanSeeQueue(opts: { orgId: string; queueId: string }) { 577 const { orgId, queueId } = opts; 578 return this.pgQuery 579 .selectFrom('manual_review_tool.users_and_accessible_queues') 580 .select(['user_id as userId']) 581 .where('queue_id', '=', queueId) 582 .where( 583 'queue_id', 584 'in', 585 this.pgQuery 586 .selectFrom('manual_review_tool.manual_review_queues') 587 .select('id') 588 .where('org_id', '=', orgId), 589 ) 590 .execute(); 591 } 592 593 async addAccessibleQueuesForUser( 594 userIds: string[], 595 queueIds: readonly string[], 596 ) { 597 return this.pgQuery 598 .insertInto('manual_review_tool.users_and_accessible_queues') 599 .values( 600 userIds.flatMap((userId) => 601 queueIds.map((queueId) => ({ queue_id: queueId, user_id: userId })), 602 ), 603 ) 604 .onConflict((oc) => 
oc.doNothing()) 605 .execute(); 606 } 607 608 async removeAccessibleQueuesForUser( 609 userId: string, 610 queueIds: readonly string[], 611 ) { 612 return this.pgQuery 613 .deleteFrom('manual_review_tool.users_and_accessible_queues') 614 .where('user_id', '=', userId) 615 .where('queue_id', 'in', queueIds) 616 .execute(); 617 } 618 619 async #getJob( 620 externalId: JobId, 621 queue: ReadonlyDeep<Queue<StoredManualReviewJob>>, 622 ) { 623 const { bullId } = parseExternalId(externalId); 624 625 const job = await queue.getJob(bullId); 626 627 // NB: this check ensures that the job is actually the requested one, 628 // not a new job about the same item. 629 return job?.data.id === externalId ? job : undefined; 630 } 631 632 async #getAppealJob( 633 externalId: JobId, 634 queue: ReadonlyDeep<Queue<ManualReviewAppealJob>>, 635 ) { 636 const { bullId } = parseExternalId(externalId); 637 638 const job = await queue.getJob(bullId); 639 640 // NB: this check ensures that the job is actually the requested one, 641 // not a new job about the same item. 642 return job?.data.id === externalId ? job : undefined; 643 } 644 645 async getJobFromItemId(opts: { 646 orgId: string; 647 queueId: string; 648 itemId: string; 649 itemTypeId: string; 650 }): Promise<ManualReviewJob | undefined> { 651 const { orgId, queueId, itemId, itemTypeId } = opts; 652 const queue = await this.#getBullQueue(orgId, queueId); 653 const job = await queue.getJob( 654 itemIdToBullJobId({ id: itemId, typeId: itemTypeId }), 655 ); 656 if (job) { 657 const converted = await this.legacyJobToJob(job, orgId); 658 return converted.data; 659 } else { 660 return undefined; 661 } 662 } 663 664 /** 665 * Enqueues a new job to the specified queue, or does nothing if a job already 666 * exists in that queue for the same item. 
667 */ 668 async addJob(opts: { 669 orgId: string; 670 queueId: string; 671 reenqueuedFrom?: OriginJobInfo; 672 jobPayload: { 673 createdAt?: Date; 674 policyIds: string[]; 675 payload: ManualReviewJobPayload; 676 }; 677 enqueueSourceInfo: ManualReviewJobEnqueueSourceInfo; 678 }) { 679 const { orgId, queueId, jobPayload, reenqueuedFrom, enqueueSourceInfo } = 680 opts; 681 const { payload, policyIds } = jobPayload; 682 683 const queue = await this.#getBullQueue(orgId, queueId); 684 685 const bullJobId = itemIdToBullJobId({ 686 id: payload.item.itemId, 687 typeId: payload.item.itemTypeIdentifier.id, 688 }); 689 const jobId = bullJobIdtoExternalJobId(bullJobId); 690 const createdAt = jobPayload.createdAt ?? new Date(); 691 692 const newJob = await queue.add( 693 bullJobId, 694 { 695 orgId, 696 payload, 697 id: jobId, 698 createdAt, 699 policyIds, 700 reenqueuedFrom, 701 enqueueSourceInfo, 702 }, 703 { removeOnComplete: true, jobId: bullJobId }, 704 ); 705 706 // Again, because new job data comes in in the non-legacy format, it's safe 707 // to cast. 708 return newJob.data satisfies StoredManualReviewJob as ManualReviewJob; 709 } 710 711 /** 712 * Enqueues a new appeal job to the specified queue, or does nothing if a job already 713 * exists in that queue for the same item. 
714 */ 715 async addAppealJob(opts: { 716 orgId: string; 717 queueId: string; 718 reenqueuedFrom?: OriginJobInfo; 719 jobPayload: { 720 createdAt?: Date; 721 policyIds: string[]; 722 payload: ManualReviewAppealJobPayload; 723 }; 724 enqueueSourceInfo: AppealEnqueueSourceInfo; 725 }) { 726 const { orgId, queueId, jobPayload, reenqueuedFrom, enqueueSourceInfo } = 727 opts; 728 const { payload, policyIds } = jobPayload; 729 730 const queue = await this.#getBullAppealQueue(orgId, queueId); 731 732 const bullJobId = itemIdToBullJobId({ 733 id: payload.item.itemId, 734 typeId: payload.item.itemTypeIdentifier.id, 735 }); 736 const jobId = bullJobIdtoExternalJobId(bullJobId); 737 const createdAt = jobPayload.createdAt ?? new Date(); 738 739 const newJob = await queue.add( 740 bullJobId, 741 { 742 orgId, 743 payload, 744 id: jobId, 745 createdAt, 746 policyIds, 747 reenqueuedFrom, 748 enqueueSourceInfo, 749 }, 750 { removeOnComplete: true, jobId: bullJobId }, 751 ); 752 753 return newJob.data; 754 } 755 756 async getAppealJobs(opts: { 757 orgId: string; 758 queueId: string; 759 jobIds: Readonly<JobId[]>; 760 }): Promise<ManualReviewAppealJob[]> { 761 const limit = pLimit(10); 762 const { orgId, queueId, jobIds } = opts; 763 764 const queue = await this.#getBullAppealQueue(orgId, queueId); 765 const jobs = await Promise.all( 766 jobIds.map(async (jobId) => 767 limit(async () => { 768 return this.#getAppealJob(jobId, queue); 769 }), 770 ), 771 ); 772 773 return filterNullOrUndefined(jobs).map((job) => job.data); 774 } 775 776 async getJobs(opts: { 777 orgId: string; 778 queueId: string; 779 jobIds: Readonly<JobId[]>; 780 }): Promise<ManualReviewJob[]> { 781 const limit = pLimit(10); 782 const { orgId, queueId, jobIds } = opts; 783 784 const queue = await this.#getBullQueue(orgId, queueId); 785 const jobs = await Promise.all( 786 jobIds.map(async (jobId) => 787 limit(async () => { 788 const job = await this.#getJob(jobId, queue); 789 return job ? 
this.legacyJobToJob(job, orgId) : undefined; 790 }), 791 ), 792 ); 793 794 return filterNullOrUndefined(jobs).map((job) => job.data); 795 } 796 797 /** 798 * This is currently only being used for testing, and should be 799 * deleted after we rework the list view. 800 */ 801 async getAllJobsForQueue(opts: { orgId: string; queueId: string }) { 802 const limit = pLimit(10); 803 const { orgId, queueId } = opts; 804 const queue = await this.#getBullQueue(orgId, queueId); 805 // Get up to 50 jobs per queue. 806 const legacyJobs = await queue.getJobs(undefined, 0, 50); 807 const jobs = await Promise.all( 808 // eslint-disable-next-line @typescript-eslint/promise-function-async 809 legacyJobs.map((job) => limit(() => this.legacyJobToJob(job, orgId))), 810 ); 811 return filterNullOrUndefined(jobs).map((job) => job.data); 812 } 813 814 async updateJobForQueue(opts: { 815 orgId: string; 816 queueId: string; 817 jobId: JobId; 818 data: ManualReviewJob; 819 }) { 820 const { orgId, queueId, jobId, data } = opts; 821 const queue = await this.#getBullQueue(orgId, queueId); 822 const { bullId } = parseExternalId(jobId); 823 const job = await queue.getJob(bullId); 824 await job?.updateData(data); 825 826 // Because the `data` arg above is a ManualReviewJob, we know the stored 827 // data for this particular job won't be in the legacy format. 
828 return job?.data satisfies StoredManualReviewJob | undefined as 829 | ManualReviewJob 830 | undefined; 831 } 832 833 async deleteAllJobsFromQueue(opts: { 834 orgId: string; 835 queueId: string; 836 userPermissions: readonly UserPermission[]; 837 }) { 838 const { orgId, queueId, userPermissions } = opts; 839 if (!userPermissions.includes(UserPermission.EDIT_MRT_QUEUES)) { 840 throw makeDeleteAllJobsInsufficientPermissionsError({ 841 shouldErrorSpan: true, 842 }); 843 } 844 845 const queue = await this.#getBullQueue(orgId, queueId); 846 await queue.obliterate({ force: true }); 847 } 848 849 async dequeueNextAppealJobWithLock(opts: { 850 orgId: string; 851 queueId: string; 852 lockToken: string; 853 }): Promise<{ 854 job: ManualReviewAppealJob; 855 lockToken: string; 856 } | null> { 857 const { orgId, queueId, lockToken } = opts; 858 859 await this.checkQueueExists(orgId, queueId); 860 const worker = await this.getBullAppealWorker({ orgId, queueId }); 861 862 let hasDecision = true; 863 while (hasDecision) { 864 const job = await worker.getNextJob(lockToken); 865 866 if (!job) { 867 return null; 868 } 869 870 // There is a race condition due to the locking mechanism where a job can 871 // be decided on but not dequeued, so we check here if the first job in the 872 // queue has a decision, and if so use the lock token to immediately 873 // remove it, then grab a new job and return to the caller. it is very 874 // unlikely that there are multiple jobs like this at the front of the 875 // queue, but not impossible. 
876 const decision = await this.pgQueryReadReplica 877 .selectFrom('manual_review_tool.manual_review_decisions') 878 .where('created_at', '>=', new Date('2023-10-01')) 879 .where('org_id', '=', orgId) 880 .where('id', '=', jobIdToGuid(job.data.id)) 881 .executeTakeFirst(); 882 883 hasDecision = decision !== undefined; 884 885 if (hasDecision) { 886 await this.removeJob({ 887 orgId, 888 queueId, 889 lockToken, 890 jobId: job.data.id, 891 }).catch(() => {}); 892 // then continue while loop 893 } else { 894 // this is the most likely case, where there is a job 895 // and it has never been decided before 896 return { job: job.data, lockToken }; 897 } 898 } 899 return null; 900 } 901 902 async dequeueNextJobWithLock(opts: { 903 orgId: string; 904 queueId: string; 905 lockToken: string; 906 }): Promise<{ 907 job: ManualReviewJob; 908 lockToken: string; 909 } | null> { 910 const { orgId, queueId, lockToken } = opts; 911 912 await this.checkQueueExists(orgId, queueId); 913 const worker = await this.getBullWorker({ orgId, queueId }); 914 915 let hasDecision = true; 916 while (hasDecision) { 917 const job = await worker.getNextJob(lockToken); 918 919 if (!job) { 920 return null; 921 } 922 923 const convertedJob = await this.legacyJobToJob(job, orgId); 924 925 // There is a race condition due to the locking mechanism where a job can 926 // be decided on but not dequeued, so we check here if the first job in the 927 // queue has a decision, and if so use the lock token to immediately 928 // remove it, then grab a new job and return to the caller. it is very 929 // unlikely that there are multiple jobs like this at the front of the 930 // queue, but not impossible. 
931 const decision = await this.pgQueryReadReplica 932 .selectFrom('manual_review_tool.manual_review_decisions') 933 .select(['decision_components']) // not really necessary to return anything 934 .where('created_at', '>=', new Date('2023-10-01')) 935 .where('org_id', '=', orgId) 936 .where('id', '=', jobIdToGuid(convertedJob.data.id)) 937 .executeTakeFirst(); 938 939 hasDecision = decision !== undefined; 940 941 if (hasDecision) { 942 await this.removeJob({ 943 orgId, 944 queueId, 945 lockToken, 946 jobId: convertedJob.data.id, 947 }).catch(() => {}); 948 // then continue while loop 949 } else { 950 // this is the most likely case, where there is a job 951 // and it has never been decided before 952 return { job: convertedJob.data, lockToken }; 953 } 954 } 955 return null; 956 } 957 958 /** 959 * When returning a job that's already in the new format, ensure DEFAULT 960 * payloads never carry allMediaItems so they resolve to the regular manual 961 * review view (not NCMEC) and show full decision options. 
962 */ 963 #returnJobWithNormalizedDefaultPayloadIfNeeded( 964 job: Job<ManualReviewJob>, 965 ): Job<ManualReviewJob> { 966 const p = job.data.payload as Record<string, unknown> & { kind?: string }; 967 if (p.kind !== 'DEFAULT' || !('allMediaItems' in p)) { 968 return job; 969 } 970 const payloadWithUnknownKeys = job.data.payload as Record<string, unknown>; 971 const { allMediaItems: _omitted, ...payloadWithoutNcmec } = 972 payloadWithUnknownKeys; 973 job.data = { 974 ...job.data, 975 payload: payloadWithoutNcmec as ManualReviewJobPayload, 976 }; 977 return job; 978 } 979 980 /** 981 * TODO: remove when we no longer need to support legacy jobs 982 */ 983 async legacyJobToJob( 984 job: Job<StoredManualReviewJob>, 985 orgId: string, 986 ): Promise<Job<ManualReviewJob>> { 987 if ( 988 'policyIds' in job.data && 989 'submissionId' in job.data.payload.item && 990 'reportedForReasons' in job.data.payload && 991 'reportHistory' in job.data.payload 992 ) { 993 return this.#returnJobWithNormalizedDefaultPayloadIfNeeded( 994 job as Job<ManualReviewJob>, 995 ); 996 } 997 998 const legacyItem = job.data.payload.item; 999 const convertedItem = 1000 await this.legacyItemWithTypeIdentifierToItemSubmissionWithTypeIdentifier( 1001 orgId, 1002 legacyItem, 1003 ); 1004 1005 let payload: ManualReviewJobPayload; 1006 // The type complexity of both StoredManualReviewJob and 1007 // ManualReviewJobPayload is pretty overwhelming, even for TS. so we are 1008 // putting the payload kind in a variable to help TS do some type narrowing. 1009 const jobKind = job.data.payload.kind; 1010 if (jobKind === 'DEFAULT') { 1011 const { allMediaItems: _omitted, ...storedPayloadWithoutNcmec } = 1012 job.data.payload as Record<string, unknown> & { allMediaItems?: unknown }; 1013 payload = { 1014 ...storedPayloadWithoutNcmec, 1015 kind: 'DEFAULT', 1016 item: convertedItem, 1017 1018 ...('itemThreadContentItems' in job.data.payload 1019 ? 
{ 1020 itemThreadContentItems: job.data.payload.itemThreadContentItems 1021 ? await Promise.all( 1022 job.data.payload.itemThreadContentItems.map(async (it) => 1023 'submissionId' in it 1024 ? it 1025 : this.legacyItemWithTypeIdentifierToItemSubmissionWithTypeIdentifier( 1026 orgId, 1027 it, 1028 ), 1029 ), 1030 ) 1031 : job.data.payload.itemThreadContentItems, 1032 } 1033 : {}), 1034 ...('additionalContentItems' in job.data.payload 1035 ? { 1036 additionalContentItems: job.data.payload.additionalContentItems 1037 ? await Promise.all( 1038 job.data.payload.additionalContentItems.map(async (it) => 1039 'submissionId' in it 1040 ? it // leave as-is if we're already in the new format 1041 : this.legacyItemWithTypeIdentifierToItemSubmissionWithTypeIdentifier( 1042 orgId, 1043 it, 1044 ), 1045 ), 1046 ) 1047 : job.data.payload.additionalContentItems, 1048 } 1049 : {}), 1050 ...(!('reportedForReasons' in job.data.payload) 1051 ? { 1052 reportedForReasons: [ 1053 { 1054 reason: 1055 'reportedForReason' in job.data.payload 1056 ? job.data.payload.reportedForReason 1057 : undefined, 1058 reporterId: job.data.payload.reporterIdentifier, 1059 }, 1060 ], 1061 } 1062 : { reportedForReasons: job.data.payload.reportedForReasons ?? [] }), 1063 // Not worth building this as slowly all lobs will have it, we can miss 1064 // one report's worth of history until then 1065 ...(!('reportHistory' in job.data.payload) 1066 ? { reportHistory: [] } 1067 : { reportHistory: job.data.payload.reportHistory }), 1068 }; 1069 } else { 1070 // kind is inferred to be 'NCMEC' here 1071 payload = { 1072 ...job.data.payload, 1073 kind: 'NCMEC', 1074 item: convertedItem, 1075 allMediaItems: await Promise.all( 1076 'allMediaItems' in job.data.payload 1077 ? 
job.data.payload.allMediaItems.map(async (mediaItem) => { 1078 if ('submissionId' in mediaItem.contentItem) { 1079 return { 1080 ...mediaItem, 1081 contentItem: mediaItem.contentItem, 1082 }; 1083 } 1084 1085 return { 1086 ...mediaItem, 1087 contentItem: 1088 await this.legacyItemWithTypeIdentifierToItemSubmissionWithTypeIdentifier( 1089 orgId, 1090 mediaItem.contentItem, 1091 ), 1092 }; 1093 }) 1094 : [], 1095 ), 1096 ...(!('reportHistory' in job.data.payload) 1097 ? { reportHistory: [] } 1098 : { reportHistory: job.data.payload.reportHistory }), 1099 }; 1100 } 1101 1102 const policyIds = 1103 'policyIds' in job.data 1104 ? job.data.policyIds 1105 : 'policyId' in job.data.payload && job.data.payload.policyId 1106 ? [job.data.payload.policyId] 1107 : []; 1108 1109 const convertedJobData = { 1110 ...safePick(job.data, [ 1111 'id', 1112 'createdAt', 1113 'orgId', 1114 'reenqueuedFrom', 1115 'enqueueSourceInfo', 1116 ]), 1117 policyIds, 1118 payload, 1119 } satisfies ManualReviewJob; 1120 1121 job.data = convertedJobData; 1122 // cast here is safe bc TS checked above that convertedJobData is a ManualReviewJob 1123 return job as Job<ManualReviewJob>; 1124 } 1125 1126 /** 1127 * TODO: remove when we no longer need to support legacy jobs 1128 */ 1129 async legacyItemWithTypeIdentifierToItemSubmissionWithTypeIdentifier( 1130 orgId: string, 1131 legacyItem: LegacyItemWithTypeIdentifier | ItemSubmissionWithTypeIdentifier, 1132 ): Promise<ItemSubmissionWithTypeIdentifier> { 1133 if ('submissionId' in legacyItem) { 1134 return legacyItem; 1135 } 1136 1137 const itemType = await this.moderationConfigService.getItemType({ 1138 orgId, 1139 itemTypeSelector: legacyItem.typeIdentifier, 1140 }); 1141 1142 if (!itemType) { 1143 throw new Error('Item type not found'); 1144 } 1145 1146 return instantiateOpaqueType<ItemSubmissionWithTypeIdentifier>({ 1147 itemId: legacyItem.id, 1148 itemTypeIdentifier: legacyItem.typeIdentifier, 1149 data: legacyItem.data, 1150 submissionId: 
makeSubmissionId(), 1151 creator: 1152 itemType.kind === 'CONTENT' 1153 ? getFieldValueForRole( 1154 itemType.schema, 1155 itemType.schemaFieldRoles, 1156 'creatorId', 1157 legacyItem.data, 1158 ) 1159 : undefined, 1160 }); 1161 } 1162 1163 /** 1164 * Attempts to move the job to completed using the given lock, but falls back 1165 * to removing the job manually. This will fail if a different Worker has a 1166 * lock on the job. 1167 */ 1168 async removeJob(opts: { 1169 orgId: string; 1170 queueId: string; 1171 jobId: JobId; 1172 lockToken: string; 1173 }) { 1174 try { 1175 await this.#markLockedJobCompleted(opts); 1176 } catch (error: unknown) { 1177 // The most common case where this throws if the lock token has expired, 1178 // so try to remove it manually. 1179 const { orgId, queueId, jobId } = opts; 1180 const queue = await this.getOrCreateBullQueue({ orgId, queueId }); 1181 const bullJobId = parseExternalId(jobId).bullId; 1182 const removeJobStatus = await queue.remove(bullJobId); 1183 if (removeJobStatus !== 1) { 1184 throw new Error('Failed to remove job'); 1185 } 1186 } 1187 } 1188 1189 /** 1190 * If the job is not found, silently does nothing. 1191 */ 1192 async #markLockedJobCompleted(opts: { 1193 orgId: string; 1194 queueId: string; 1195 jobId: JobId; 1196 lockToken: string; 1197 }) { 1198 const { orgId, queueId, lockToken, jobId } = opts; 1199 const queue = await this.getOrCreateBullQueue({ orgId, queueId }); 1200 const job = await this.#getJob(jobId, queue); 1201 1202 await job?.moveToCompleted(null, lockToken, false); 1203 } 1204 1205 /** 1206 * Releases the lock on a job and moves it back to the waiting state. 1207 * This is used when a job is skipped - we want to release the lock 1208 * so the job can be reviewed later, rather than waiting for it to 1209 * become stalled. 
1210 */ 1211 async releaseJobLock(opts: { 1212 orgId: string; 1213 queueId: string; 1214 jobId: JobId; 1215 lockToken: string; 1216 }) { 1217 const { orgId, queueId, lockToken, jobId } = opts; 1218 try { 1219 const queue = await this.getOrCreateBullQueue({ orgId, queueId }); 1220 const job = await this.#getJob(jobId, queue); 1221 1222 if (!job) { 1223 // Job not found, nothing to release 1224 return; 1225 } 1226 1227 // Move the job to delayed state with timestamp = now (0 delay) 1228 // This releases the lock and makes the job immediately available 1229 // The token parameter ensures only the holder of the lock can release it 1230 await job.moveToDelayed(Date.now(), lockToken); 1231 } catch (error: unknown) { 1232 // If the lock has already expired or the job is in a different state, 1233 // we can safely ignore the error as the job is already released 1234 // or will be handled by the stalled job checker 1235 } 1236 } 1237 1238 async getPendingJobCount(opts: { orgId: string; queueId: string }) { 1239 const { orgId, queueId } = opts; 1240 const queue = await this.#getBullQueue(orgId, queueId); 1241 1242 // Returns the number of waiting or delayed jobs 1243 // https://api.docs.bullmq.io/classes/Queue.html#count 1244 return queue.count(); 1245 } 1246 1247 async getOldestJobCreatedAt(opts: { 1248 orgId: string; 1249 queueId: string; 1250 isAppealsQueue: boolean; 1251 }): Promise<Date | null> { 1252 const { orgId, queueId, isAppealsQueue } = opts; 1253 const queue = isAppealsQueue 1254 ? 
await this.#getBullAppealQueue(orgId, queueId) 1255 : await this.#getBullQueue(orgId, queueId); 1256 1257 // Get the first waiting job and first delayed job 1258 // BullMQ maintains FIFO order within each state, so we only need to compare 1259 // the first job from each state to find the oldest overall 1260 const [waitingJobs, delayedJobs] = await Promise.all([ 1261 queue.getJobs(['waiting'], 0, 0), 1262 queue.getJobs(['delayed'], 0, 0), 1263 ]); 1264 1265 // If no jobs exist in either state, return null 1266 if (waitingJobs.length === 0 && delayedJobs.length === 0) { 1267 return null; 1268 } 1269 1270 // If only one type exists, return it 1271 if (waitingJobs.length === 0) return delayedJobs[0].data.createdAt; 1272 if (delayedJobs.length === 0) return waitingJobs[0].data.createdAt; 1273 1274 // Both exist, return the older one 1275 const waitingTime = new Date(waitingJobs[0].data.createdAt).getTime(); 1276 const delayedTime = new Date(delayedJobs[0].data.createdAt).getTime(); 1277 return waitingTime < delayedTime 1278 ? waitingJobs[0].data.createdAt 1279 : delayedJobs[0].data.createdAt; 1280 } 1281 1282 async close() { 1283 return Promise.all([ 1284 this.getOrCreateBullQueue.close(), 1285 this.getBullWorker.close(), 1286 this.getOrCreateBullAppealQueue.close(), 1287 this.getBullAppealWorker.close(), 1288 ]); 1289 } 1290 1291 async getExistingJobsForItem(opts: { 1292 orgId: string; 1293 itemId: string; 1294 itemTypeId: string; 1295 }) { 1296 const { orgId, itemId, itemTypeId } = opts; 1297 // Check postgres for creations within the last 7 days so we don't have to 1298 // search every bull queue for every item. 
1299 const recentJobCreationQueues = await this.pgQuery 1300 .selectFrom('manual_review_tool.job_creations') 1301 .select(['queue_id']) 1302 .where('org_id', '=', orgId) 1303 .where('item_id', '=', itemId) 1304 .where('item_type_id', '=', itemTypeId) 1305 .where('created_at', '>=', new Date(Date.now() - WEEK_MS)) 1306 .execute(); 1307 const jobsWithQueue = await Promise.all( 1308 recentJobCreationQueues.map(async (rows) => { 1309 const queueId = rows.queue_id; 1310 const queue = await this.getOrCreateBullQueue({ orgId, queueId }); 1311 const legacyJob = await queue.getJob( 1312 itemIdToBullJobId({ id: itemId, typeId: itemTypeId }), 1313 ); 1314 if (legacyJob) { 1315 const job = await this.legacyJobToJob(legacyJob, orgId); 1316 return { 1317 job: job.data, 1318 queueId, 1319 }; 1320 } 1321 return undefined; 1322 }), 1323 ); 1324 1325 return filterNullOrUndefined(jobsWithQueue); 1326 } 1327 1328 async getHiddenActionsForQueue(opts: { queueId: string; orgId: string }) { 1329 const { queueId, orgId } = opts; 1330 return ( 1331 await this.pgQuery 1332 .selectFrom('manual_review_tool.queues_and_hidden_actions') 1333 .select(['action_id']) 1334 .where('queue_id', '=', queueId) 1335 .where('org_id', '=', orgId) 1336 .execute() 1337 ).map((it) => it.action_id); 1338 } 1339 1340 // NB: Making the transaction required means this function should only be used 1341 // inside another transaction. 1342 async updateHiddenActionsForQueue(opts: { 1343 queueId: string; 1344 actionIdsToHide: readonly string[]; 1345 actionIdsToUnhide: readonly string[]; 1346 orgId: string; 1347 transaction?: Transaction<ManualReviewToolServicePg>; 1348 }) { 1349 const { transaction, queueId, actionIdsToHide, actionIdsToUnhide, orgId } = 1350 opts; 1351 1352 if (actionIdsToHide.length === 0 && actionIdsToUnhide.length === 0) { 1353 return undefined; 1354 } 1355 1356 const queryInterface = transaction ?? 
this.pgQuery; 1357 1358 if (actionIdsToHide.length > 0) { 1359 await queryInterface 1360 .insertInto('manual_review_tool.queues_and_hidden_actions') 1361 .values( 1362 actionIdsToHide.map((actionId) => ({ 1363 queue_id: queueId, 1364 action_id: actionId, 1365 org_id: orgId, 1366 })), 1367 ) 1368 .execute(); 1369 } 1370 1371 if (actionIdsToUnhide.length > 0) { 1372 await queryInterface 1373 .deleteFrom('manual_review_tool.queues_and_hidden_actions') 1374 .where('queue_id', '=', queueId) 1375 .where('org_id', '=', orgId) 1376 .where('action_id', 'in', actionIdsToUnhide) 1377 .execute(); 1378 } 1379 } 1380} 1381 1382/** 1383 * We want Bull to dedupe jobs on the same item, so this function maps each 1384 * distinct item identifier object to a string that can be used as a Bull job id. 1385 * Uses '.' as separator because BullMQ v5 disallows ':' in custom job ids. 1386 * 1387 * NB: only exported for use in tests. 1388 * @private 1389 */ 1390const BULL_JOB_ID_SEPARATOR = '.'; 1391 1392export function itemIdToBullJobId({ typeId, id }: ItemIdentifier) { 1393 if (!typeId || !id) { 1394 throw new Error('itemTypeId and itemId cannot be empty strings'); 1395 } 1396 1397 return instantiateOpaqueType<BullJobId>( 1398 [typeId, id].map(b64UrlEncode).join(BULL_JOB_ID_SEPARATOR), 1399 ); 1400} 1401 1402/** 1403 * Because BullJobIds are intentionally not globally unique (across time and 1404 * queues), this function takes a BullJobId and combines it with a guid to make 1405 * a truly unique id. 1406 * 1407 * NB: only exported for use in tests. 1408 * @private 1409 */ 1410export function bullJobIdtoExternalJobId( 1411 bullJobId: BullJobId, 1412 guid = uuidv1(), 1413) { 1414 if (!guid) { 1415 throw new Error('guid cannot be empty.'); 1416 } 1417 1418 return instantiateOpaqueType<JobId>( 1419 [bullJobId, guid].map(b64UrlEncode).join(':'), 1420 ); 1421} 1422 1423/** 1424 * This function extracts the components of a JobId, so that we can look up the 1425 * job in Bull by its BullJobId. 
1426 * 1427 * NB: only exported for use in tests. 1428 * @private 1429 */ 1430export function parseExternalId(externalId: JobId) { 1431 const idParts = externalId.split(':') as [ 1432 B64UrlOf<BullJobId>, 1433 B64UrlOf<string>, 1434 ]; 1435 return { bullId: b64UrlDecode(idParts[0]), guid: b64UrlDecode(idParts[1]) }; 1436} 1437 1438/** 1439 * This returns a UUID derived from an external JobId. In theory, this shouldn't 1440 * be necessary -- anything that needs to be logged as associated with the job 1441 * can just use the JobId directly -- but, historically, we've just used this 1442 * derived uuid (e.g., when logging decisions), because it's shorter. 1443 */ 1444export function jobIdToGuid(jobId: JobId) { 1445 return parseExternalId(jobId).guid; 1446} 1447 1448type RedisConnection = IORedis.Redis | Cluster; 1449 1450export async function getBullWorker<JobData = unknown>( 1451 redisConnection: RedisConnection, 1452 queueKey: QueueKey, 1453) { 1454 const { queueId, orgId } = queueKey; 1455 const worker = new Worker( 1456 queueId, 1457 // An empty processor function is necessary to create 1458 // the timer manager that's used in checkStalledJobs 1459 async (_) => { 1460 return null; 1461 }, 1462 { 1463 connection: redisConnection, 1464 lockDuration: 600000, 1465 prefix: getPrefix(orgId), 1466 autorun: false, 1467 // A job is put into stalled when a user claims it and then doesn't action 1468 // on it for the lockDuration. When the max limit is hit, Bull puts the 1469 // the job into a failed queue which isn't visible to the user and then 1470 // never dequeues again. This can lead to bad situations like a report 1471 // never being actioned on, or re-enqueues of a job looking like they're 1472 // not working. The default is 1, and 50 should be a large enough number 1473 // that no reasonable user should ever hit it. 
1474 maxStalledCount: 50, 1475 }, 1476 ); 1477 // Start the stalled jobs checker for manual job processing 1478 await worker.startStalledCheckTimer(); 1479 1480 // Cast worker to a version of its original type, but fixed to correctly 1481 // indicate that getNextJob() can return undefined 1482 return worker as unknown as Omit<Worker<JobData>, 'getNextJob'> & { 1483 getNextJob: (lockToken: string) => Promise<Job<JobData> | undefined>; 1484 }; 1485} 1486 1487/** 1488 * NB: this is called getOrCreateBullQueue because BullMQ will silently create 1489 * a queue object (certainly in memory and maybe even leaving some traces in 1490 * Redis) if this is called with a queue id that doesn't exist yet. 1491 */ 1492export async function getOrCreateBullQueue<JobData = unknown>( 1493 redisConnection: RedisConnection, 1494 queueKey: QueueKey, 1495) { 1496 const { orgId, queueId } = queueKey; 1497 return new Queue<JobData>(queueId, { 1498 connection: redisConnection, 1499 prefix: getPrefix(orgId), 1500 }); 1501} 1502 1503/** 1504 * Constructs a prefix out of the orgId to use for automatic sharding. 1505 * Using the org ID ensures that all hashes in the same Org are stored together. 
1506 */ 1507function getPrefix(orgId: string) { 1508 return '{' + orgId + '}'; 1509} 1510 1511export const makeDeleteAllJobsInsufficientPermissionsError = ( 1512 data: ErrorInstanceData, 1513) => 1514 new CoopError({ 1515 status: 403, 1516 type: [ErrorType.Unauthorized], 1517 title: String(data.detail), 1518 name: 'DeleteAllJobsUnauthorizedError', 1519 ...data, 1520 }); 1521 1522const makeQueueDoesNotExistError = (data: ErrorInstanceData) => { 1523 return new CoopError({ 1524 status: 400, 1525 type: [ErrorType.InvalidUserInput], 1526 title: 'The queue with this ID does not exist', 1527 name: 'QueueDoesNotExistError', 1528 ...data, 1529 }); 1530}; 1531 1532export const makeUnableToDeleteDefaultQueueError = ( 1533 data: ErrorInstanceData, 1534) => { 1535 return new CoopError({ 1536 status: 403, 1537 type: [ErrorType.Unauthorized], 1538 title: 'Unable to delete default queue', 1539 name: 'UnableToDeleteDefaultQueueError', 1540 ...data, 1541 }); 1542}; 1543 1544export const makeManualReviewQueueNameExistsError = (data: ErrorInstanceData) => 1545 new CoopError({ 1546 status: 409, 1547 type: [ErrorType.UniqueViolation], 1548 title: 1549 'A manual review queue with that name already exists in this organization.', 1550 name: 'ManualReviewQueueNameExistsError', 1551 ...data, 1552 });