Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1/* eslint-disable max-lines */
2
3import { SpanStatusCode } from '@opentelemetry/api';
4import { type ConsumerDirectives } from '../../lib/cache/index.js';
5import { type ItemIdentifier } from '@roostorg/types';
6import { type Kysely } from 'kysely';
7import _ from 'lodash';
8import { type Opaque } from 'type-fest';
9
10import { type Dependencies } from '../../iocContainer/index.js';
11import {
12 type Invoker,
13 type UserPermission,
14} from '../../models/types/permissioning.js';
15import { jsonStringify } from '../../utils/encoding.js';
16import { isCoopErrorOfType } from '../../utils/errors.js';
17import { isUniqueViolationError } from '../../utils/kysely.js';
18import type { OmitEach, ReplaceDeep } from '../../utils/typescript-types.js';
19import {
20 getFieldValueForRole,
21 getFieldValueOrValues,
22 type NormalizedItemData,
23} from '../itemProcessingService/index.js';
24import { type ItemSubmissionWithTypeIdentifier } from '../itemProcessingService/makeItemSubmissionWithTypeIdentifier.js';
25import { type ModerationConfigService } from '../moderationConfigService/index.js';
26import { type PartialItemsService } from '../partialItemsService/index.js';
27import {
28 type UserScore,
29 type UserStatisticsService,
30} from '../userStatisticsService/userStatisticsService.js';
31import { type ManualReviewToolServicePg } from './dbTypes.js';
32import AppealsJobRouting from './modules/AppealsJobRouting.js';
33import CommentOperations from './modules/CommentOperations.js';
34import DecisionAnalytics, {
35 type DecisionCountsInput,
36 type DecisionCountsTableInput,
37 type JobCountsInput,
38 type JobCreationsInput,
39 type RecentDecisionsFilterInput,
40 type TimeToActionInput,
41} from './modules/DecisionAnalytics.js';
42import JobDecisioning, {
43 type OnRecordDecisionInput,
44 type SubmitDecisionInput,
45} from './modules/JobDecisioning.js';
46import JobEnrichment, {
47 type ManualReviewAppealJobInput,
48 type ManualReviewJobInput,
49} from './modules/JobEnrichment.js';
50import JobRendering from './modules/JobRendering.js';
51import JobRouting, {
52 type CreateRoutingRuleInput,
53 type ReorderRoutingRulesInput,
54 type RoutingRuleWithoutVersion,
55 type UpdateRoutingRuleInput,
56} from './modules/JobRouting.js';
57import ManualReviewToolSettings from './modules/ManualReviewToolSettings.js';
58import QueueOperations, {
59 type ManualReviewQueue,
60} from './modules/QueueOperations.js';
61import SkipOperations, {
62 type SkippedJobCountInput,
63} from './modules/SkipOperations.js';
64
65// An id that's unique across all jobs ever added to any queue (pending or not).
66// This is the id that's passed into the MRT Service by callers to identify a
67// job, and that's exposed to callers when a job is returned from this service.
68// Note that this is not the same as the id of the job within Bull, which is
69// managed as an implementation detail of the QueueOperations submodule. That
70// module creates these JobId values when a Job is added to Bull.
71export type JobId = Opaque<string, 'JobId'>;
72
73export type ManualReviewJob = {
74 id: JobId;
75 orgId: string;
76 createdAt: Date;
77 payload: ManualReviewJobPayload;
78 reenqueuedFrom?: OriginJobInfo;
79 enqueueSourceInfo?: ManualReviewJobEnqueueSourceInfo;
80 // NB: represents the policy under which the job was added (e.g., the policy
81 // it was reported for violating.
82 policyIds: string[];
83};
84
85export type ManualReviewAppealJob = {
86 id: JobId;
87 orgId: string;
88 createdAt: Date;
89 payload: ManualReviewAppealJobPayload;
90 reenqueuedFrom?: OriginJobInfo;
91 enqueueSourceInfo?: AppealEnqueueSourceInfo;
92 // NB: represents the policy under which the job was added (e.g., the policy
93 // it was reported for violating.
94 policyIds: string[];
95};
96
97export type ManualReviewJobOrAppeal = ManualReviewJob | ManualReviewAppealJob;
98
99// Old MRT jobs (created before roughly Sept 2023) included this
100// "LegacyItemWithTypeIdentifier" type in their payloads, rather than including
101// full `ItemSubmissionWithTypeIdentifier` objects. While these these jobs will
102// cycle out of redis over time, their payload was also saved forever in
103// postgres (in the decisions table), so we can't remove this type and update
104// the `ManualReviewJobPayload` type until _both_ all the old jobs have been
105// dequeued from redis and we've manually migrated the records in the decisions
106// table. We should do that soon because synthesizing submissionIds from the
107// postgres decisions _on read_ is a little sketch (the synthesized ids won't be
108// stable across reads and will have the wrong embedded date if we aren't
109// careful.)
110export type LegacyItemWithTypeIdentifier = {
111 id: string;
112 data: NormalizedItemData;
113 typeIdentifier: {
114 id: string;
115 version: string;
116 schemaVariant: 'original' | 'partial';
117 };
118};
119
120// The type of stored, legacy MRT jobs -- in both redis and the pg decisions
121// table -- per comment above.
122// TODO: migrate and delete. Also delete the date filter in getDecidedJob().
123export type StoredManualReviewJob =
124 | ManualReviewJob
125 | ReplaceDeep<
126 Omit<ManualReviewJob, 'policyIds' | 'payload'> & {
127 payload: ManualReviewJobPayload & {
128 policyId?: string;
129 };
130 },
131 ItemSubmissionWithTypeIdentifier,
132 LegacyItemWithTypeIdentifier | ItemSubmissionWithTypeIdentifier
133 >;
134
135export type ReportHistory = Array<{
136 reporterId?: ItemIdentifier;
137 reason?: string;
138 reportId: string;
139 reportedAt: Date;
140 policyId?: string;
141}>;
142
143export type ContentManualReviewJobPayload = {
144 kind: 'DEFAULT';
145 item: ItemSubmissionWithTypeIdentifier;
146 userScore?: UserScore;
147 itemThreadContentItems?: ItemSubmissionWithTypeIdentifier[];
148 additionalContentItems?: ItemSubmissionWithTypeIdentifier[];
149 reportedForReason?: string;
150 reporterIdentifier?: ItemIdentifier;
151 reportedForReasons?: Array<{ reporterId?: ItemIdentifier; reason?: string }>;
152 enqueueSourceInfo?: ManualReviewJobEnqueueSourceInfo;
153 reportHistory: ReportHistory;
154};
155
156export type UserManualReviewJobPayload = {
157 kind: 'DEFAULT';
158 item: ItemSubmissionWithTypeIdentifier;
159 userScore?: UserScore;
160 itemThreadContentItems?: ItemSubmissionWithTypeIdentifier[];
161 additionalContentItems?: ItemSubmissionWithTypeIdentifier[];
162 reporterIdentifier?: ItemIdentifier;
163 reportedItems?: ItemIdentifier[];
164 reportedForReasons?: Array<{ reporterId?: ItemIdentifier; reason?: string }>;
165 reportHistory: ReportHistory;
166 enqueueSourceInfo?: ManualReviewJobEnqueueSourceInfo;
167};
168
169export type ThreadManualReviewJobPayload = {
170 kind: 'DEFAULT';
171 item: ItemSubmissionWithTypeIdentifier;
172 reportedForReason?: string;
173 reporterIdentifier?: ItemIdentifier;
174 reportedForReasons?: Array<{ reporterId?: ItemIdentifier; reason?: string }>;
175 reportHistory: ReportHistory;
176 enqueueSourceInfo?: ManualReviewJobEnqueueSourceInfo;
177};
178
179export type NcmecContentItemSubmission = {
180 contentItem: ItemSubmissionWithTypeIdentifier;
181 isConfirmedCSAM: boolean;
182 isReported: boolean;
183};
184
185export type NcmecManualReviewJobPayload = {
186 kind: 'NCMEC';
187 item: ItemSubmissionWithTypeIdentifier; // the user being reviewed
188 allMediaItems: NcmecContentItemSubmission[]; // all the user's media from the last 30 days
189 userScore?: UserScore;
190 enqueueSourceInfo?: ManualReviewJobEnqueueSourceInfo;
191 reportHistory: ReportHistory;
192};
193
194export type ContentAppealReviewJobPayload = {
195 kind: 'APPEAL';
196 item: ItemSubmissionWithTypeIdentifier;
197 appealId: string;
198 userScore?: UserScore;
199 additionalContentItems?: ItemSubmissionWithTypeIdentifier[];
200 appealerIdentifier?: ItemIdentifier;
201 enqueueSourceInfo?: AppealEnqueueSourceInfo;
202 appliedRulesIds?: string[];
203 actionsTaken?: string[];
204 appealReason?: string;
205};
206
207export type UserAppealReviewJobPayload = {
208 kind: 'APPEAL';
209 item: ItemSubmissionWithTypeIdentifier;
210 appealId: string;
211 userScore?: UserScore;
212 additionalContentItems?: ItemSubmissionWithTypeIdentifier[];
213 reportedItems?: ItemIdentifier[];
214 appealerIdentifier?: ItemIdentifier;
215 appealReason?: string;
216 enqueueSourceInfo?: AppealEnqueueSourceInfo;
217 appliedRulesIds?: string[];
218 actionsTaken?: string[];
219};
220
221export type ThreadAppealReviewJobPayload = {
222 kind: 'APPEAL';
223 item: ItemSubmissionWithTypeIdentifier;
224 appealId: string;
225 appealerIdentifier?: ItemIdentifier;
226 enqueueSourceInfo?: AppealEnqueueSourceInfo;
227 appliedRulesIds?: string[];
228 actionsTaken?: string[];
229 appealReason?: string;
230};
231
232export type ManualReviewAppealJobPayload =
233 | ThreadAppealReviewJobPayload
234 | ContentAppealReviewJobPayload
235 | UserAppealReviewJobPayload;
236
237export type ManualReviewJobPayload =
238 | ContentManualReviewJobPayload
239 | UserManualReviewJobPayload
240 | ThreadManualReviewJobPayload
241 | NcmecManualReviewJobPayload;
242
243export type ManualReviewJobEnqueueSource =
244 | 'APPEAL'
245 | 'REPORT'
246 | 'RULE_EXECUTION'
247 | 'MRT_JOB'
248 | 'POST_ACTIONS';
249
250export type RuleExecutionEnqueueSourceInfo = {
251 kind: 'RULE_EXECUTION';
252 rules: string[];
253};
254export type ReportEnqueueSourceInfo = {
255 kind: 'REPORT';
256};
257export type AppealEnqueueSourceInfo = {
258 kind: 'APPEAL';
259};
260export type MrtJobEnqueueSourceInfo = { kind: 'MRT_JOB' };
261export type PostActionsEnqueueSourceInfo = { kind: 'POST_ACTIONS' };
262
263export type ManualReviewJobEnqueueSourceInfo =
264 | ReportEnqueueSourceInfo
265 | RuleExecutionEnqueueSourceInfo
266 | MrtJobEnqueueSourceInfo
267 | PostActionsEnqueueSourceInfo;
268
269export type OriginJobInfo = {
270 jobId: JobId;
271};
272
273export type ManualReviewJobKind = ManualReviewJobPayload['kind'];
274
275export class ManualReviewToolService {
276 private readonly queueOps: QueueOperations;
277 private readonly jobRendering: JobRendering;
278 private readonly jobRouting: JobRouting;
279 private readonly appealsJobRouting: AppealsJobRouting;
280 private readonly jobEnrichment: JobEnrichment;
281 private readonly jobDecisioning: JobDecisioning;
282 private readonly decisionAnalytics: DecisionAnalytics;
283 private readonly manualReviewToolSettings: ManualReviewToolSettings;
284 private readonly commentOps: CommentOperations;
285 private readonly skipOps: SkipOperations;
286
287 constructor(
288 readonly redis: Dependencies['IORedis'],
289 readonly ruleEvaluator: Dependencies['RuleEvaluator'],
290 readonly routingRuleExecutionLogger: Dependencies['RoutingRuleExecutionLogger'],
291 readonly pgQuery: Kysely<ManualReviewToolServicePg>,
292 readonly pgQueryReadReplica: Kysely<ManualReviewToolServicePg>,
293 readonly userStatisticsService: UserStatisticsService,
294 readonly getCustomActionsByIds: Dependencies['getActionsByIdEventuallyConsistent'],
295 private readonly tracer: Dependencies['Tracer'],
296 private readonly moderationConfigService: ModerationConfigService,
297 readonly partialItemsService: PartialItemsService,
298 readonly onRecordDecision: (params: OnRecordDecisionInput) => Promise<void>,
299 private readonly onEnqueue: (
300 input: ManualReviewJobInput | ManualReviewAppealJobInput,
301 queueId: string,
302 ) => Promise<void>,
303 ) {
304 this.queueOps = new QueueOperations(
305 pgQuery,
306 pgQueryReadReplica,
307 moderationConfigService,
308 redis,
309 );
310 this.jobEnrichment = new JobEnrichment(
311 partialItemsService,
312 userStatisticsService,
313 );
314 this.jobRouting = new JobRouting(
315 pgQuery,
316 this.queueOps,
317 moderationConfigService,
318 ruleEvaluator,
319 routingRuleExecutionLogger,
320 );
321 this.appealsJobRouting = new AppealsJobRouting(
322 pgQuery,
323 this.queueOps,
324 moderationConfigService,
325 ruleEvaluator,
326 //routingRuleExecutionLogger,
327 );
328 this.jobDecisioning = new JobDecisioning(
329 this.queueOps,
330 pgQuery,
331 getCustomActionsByIds,
332 onRecordDecision,
333 moderationConfigService,
334 this.tracer,
335 );
336 this.jobRendering = new JobRendering(pgQuery);
337 this.decisionAnalytics = new DecisionAnalytics(pgQueryReadReplica);
338 this.manualReviewToolSettings = new ManualReviewToolSettings(pgQuery);
339 this.commentOps = new CommentOperations(pgQuery);
340 this.skipOps = new SkipOperations(pgQuery);
341 }
342
343 /**
344 * The payload for enqueue isn't exactly correct - we only accept
345 * itemThreadContentItems if the item represents a content type and the items
346 * inside them must also be content. Because of how we get the items out of
347 * Kysely as individual fields losing their correlation and some limitations
348 * of TS, we check in this function that these are true and throw if not
349 * rather than updating the function signature to be narrower because we would
350 * need to tell TS that the input matches what we expect rather than have it
351 * check for us which defeats the purpse.
352 *
353 * If a queueId is provided, it will skip routing and insert into that queue directly.
354 */
355 async enqueue(input: ManualReviewJobInput, queueId?: string) {
356 const MAX_ENQUEUE_ATTEMPTS = 5;
357
358 await this.tracer.addActiveSpan(
359 {
360 resource: 'mrtService',
361 operation: 'enqueue',
362 attributes: {
363 'mrtJob.enqueueSource': input.enqueueSource,
364 'mrtJob.orgId': input.orgId,
365 'mrtJob.itemIdentifier': jsonStringify({
366 type: input.payload.item.itemTypeIdentifier.id,
367 id: input.payload.item.itemId,
368 }),
369 'mrtJob.submissionId': input.payload.item.submissionId,
370 },
371 },
372 async (span) => {
373 let numAttemptsToEnqueue = 0;
374
375 const type = await this.moderationConfigService.getItemType({
376 orgId: input.orgId,
377 itemTypeSelector: input.payload.item.itemTypeIdentifier,
378 });
379 if (type === undefined) {
380 throw new Error(
381 `No item type for org ${input.orgId} with ID ${input.payload.item.itemTypeIdentifier.id}`,
382 );
383 }
384 const enrichedJobPayload = await this.jobEnrichment.enrichJobPayload(
385 input,
386 type,
387 );
388
389 // This function attempts to enqueue an MRT job, after running routing
390 // rules on it to get the destination queue. We know that, in the db,
391 // foreign key constraints prevent a routing rule from pointing to a
392 // non-existent queue. However, it's possible for the destination queue
393 // to have been deleted _between the time when the routing rules were
394 // loaded from the db and when the attempted enqueue takes place_.
395 // Moreover, because the routing rules are cached in memory, the time
396 // between them being loaded from the db and the enqueue can actually be
397 // relatively long, making this a plausible edge case.
398 //
399 // In that case, the call to `addJob` or `updateJobForQueue` will throw
400 // a queue does not exist error. We know, per the above, that the cause
401 // must be that we use ran a stale copy of the rules, so we load the
402 // latest routing rules (bypassing the cache with `maxAge: 0`) and retry
403 // the enqueue operation until we hit a maximum number of attempts.
404 //
405 // It's still technically possible for the queue to be deleted between
406 // the time we reload the rules and attempt the enqueue, but that should
407 // be exceedingly rare, as the time window in that case is much shorter
408 // than the amount of time that old rules live in memory due to caching.
409 // Even if this did happen, subsequent retries would again reload the
410 // rules, and users won't be _repeatedly_ deleting queues right in
411 // this brief window while the rules are running.
412 const attemptEnqueue = async (): Promise<
413 { job: ManualReviewJob; targetQueueForNewJob: string } | undefined
414 > => {
415 try {
416 const targetQueueForNewJob =
417 queueId ??
418 (await this.jobRouting.getQueueIdForJob({
419 orgId: input.orgId,
420 payload: enrichedJobPayload,
421 correlationId: input.correlationId,
422 policyIds: input.policyIds,
423 routingRuleCacheDirectives:
424 numAttemptsToEnqueue > 0 ? { maxAge: 0 } : undefined,
425 }));
426
427 const existingJobInSameQueue = await this.queueOps.getJobFromItemId(
428 {
429 orgId: input.orgId,
430 itemId: input.payload.item.itemId,
431 itemTypeId: input.payload.item.itemTypeIdentifier.id,
432 queueId: targetQueueForNewJob,
433 },
434 );
435
436 const finalJobPayload = existingJobInSameQueue
437 ? await this.#mergeJobPayloads(
438 input.orgId,
439 enrichedJobPayload,
440 existingJobInSameQueue.payload,
441 )
442 : enrichedJobPayload;
443
444 const job = existingJobInSameQueue
445 ? await this.queueOps.updateJobForQueue({
446 orgId: input.orgId,
447 queueId: targetQueueForNewJob,
448 jobId: existingJobInSameQueue.id,
449 data: {
450 ...existingJobInSameQueue,
451 payload: finalJobPayload,
452 },
453 })
454 : await this.queueOps.addJob({
455 orgId: input.orgId,
456 queueId: targetQueueForNewJob,
457 enqueueSourceInfo: input.enqueueSourceInfo,
458 jobPayload: {
459 ...input,
460 payload: finalJobPayload,
461 },
462 });
463
464 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
465 if (!job) {
466 // this means that we tried to update a job that was deleted
467 // between when we looked up the existing job and did the update.
468 // Just do nothing
469 return;
470 }
471
472 // log job creation/enqueue to postgres
473 this.pgQuery
474 .insertInto('manual_review_tool.job_creations')
475 .values({
476 id: job.id,
477 org_id: job.orgId,
478 item_id: job.payload.item.itemId,
479 item_type_id: job.payload.item.itemTypeIdentifier.id,
480 queue_id: targetQueueForNewJob,
481 // We use the Source Info from the input argument to account
482 // for the case that we are in fact updating a job which was
483 // never inserted into this table and did not have enqueue source
484 // info, but the updated job will.
485 enqueue_source_info: input.enqueueSourceInfo,
486 policy_ids: input.policyIds,
487 created_at: new Date(),
488 })
489 .execute()
490 .catch(() => {}); // don't throw if logging fails
491
492 return { targetQueueForNewJob, job };
493 } catch (e) {
494 if (
495 isCoopErrorOfType(e, 'QueueDoesNotExistError') &&
496 numAttemptsToEnqueue++ < MAX_ENQUEUE_ATTEMPTS
497 ) {
498 return attemptEnqueue();
499 }
500
501 if (isUniqueViolationError(e)) {
502 // This is actually expected behavior, because we'll try to log it when an
503 // attempt to enqueue a single item happens more than once in the
504 // same queue. Because of that, don't throw an error here and just
505 // return undefined, since we don't end up re-enqueuing the item
506 // (or anything else).
507 span.setStatus({ code: SpanStatusCode.OK });
508 return undefined;
509 }
510
511 throw e;
512 }
513 };
514
515 const enqueueResult = await attemptEnqueue();
516
517 // There are some edge cases where enqueue does nothing (see comment
518 // above). But, if the enqueue worked, call listeners asynchronously,
519 // not caring about whether they fail.
520 if (enqueueResult) {
521 const { targetQueueForNewJob } = enqueueResult;
522 this.onEnqueue(input, targetQueueForNewJob).catch(() => {});
523 }
524 },
525 );
526 }
527
528 async enqueueAppeal(input: ManualReviewAppealJobInput, queueId?: string) {
529 const MAX_ENQUEUE_ATTEMPTS = 5;
530
531 await this.tracer.addActiveSpan(
532 {
533 resource: 'mrtService',
534 operation: 'enqueueAppeal',
535 attributes: {
536 'mrtJob.enqueueSource': input.enqueueSource,
537 'mrtJob.orgId': input.orgId,
538 'mrtJob.itemIdentifier': jsonStringify({
539 type: input.payload.item.itemTypeIdentifier.id,
540 id: input.payload.item.itemId,
541 }),
542 'mrtJob.submissionId': input.payload.item.submissionId,
543 },
544 },
545 async (span) => {
546 let numAttemptsToEnqueue = 0;
547
548 const type = await this.moderationConfigService.getItemType({
549 orgId: input.orgId,
550 itemTypeSelector: input.payload.item.itemTypeIdentifier,
551 });
552 if (type === undefined) {
553 throw new Error(
554 `No item type for org ${input.orgId} with ID ${input.payload.item.itemTypeIdentifier.id}`,
555 );
556 }
557 const enrichedJobPayload = await this.jobEnrichment.enrichAppealPayload(
558 input,
559 );
560
561 const attemptAppealEnqueue = async (): Promise<
562 | { job: ManualReviewAppealJob; targetQueueForNewJob: string }
563 | undefined
564 > => {
565 try {
566 const targetQueueForNewJob =
567 queueId ??
568 (await this.appealsJobRouting.getQueueIdForJob({
569 orgId: input.orgId,
570 payload: enrichedJobPayload,
571 correlationId: input.correlationId,
572 policyIds: input.policyIds,
573 routingRuleCacheDirectives:
574 numAttemptsToEnqueue > 0 ? { maxAge: 0 } : undefined,
575 }));
576
577 const job = await this.queueOps.addAppealJob({
578 orgId: input.orgId,
579 queueId: targetQueueForNewJob,
580 enqueueSourceInfo: input.enqueueSourceInfo,
581 jobPayload: {
582 ...input,
583 payload: enrichedJobPayload,
584 },
585 });
586
587 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
588 if (!job) {
589 // this means that we tried to update a job that was deleted
590 // between when we looked up the existing job and did the update.
591 // Just do nothing
592 return;
593 }
594
595 this.pgQuery
596 .insertInto('manual_review_tool.job_creations')
597 .values({
598 id: job.id,
599 org_id: job.orgId,
600 item_id: job.payload.item.itemId,
601 item_type_id: job.payload.item.itemTypeIdentifier.id,
602 queue_id: targetQueueForNewJob,
603 enqueue_source_info: input.enqueueSourceInfo,
604 policy_ids: input.policyIds,
605 created_at: new Date(),
606 })
607 .execute()
608 .catch(() => {}); // don't throw if logging fails
609
610 return { targetQueueForNewJob, job };
611 } catch (e) {
612 if (
613 isCoopErrorOfType(e, 'QueueDoesNotExistError') &&
614 numAttemptsToEnqueue++ < MAX_ENQUEUE_ATTEMPTS
615 ) {
616 return attemptAppealEnqueue();
617 }
618
619 if (isUniqueViolationError(e)) {
620 // This is actually expected behavior, because we'll try to log it when an
621 // attempt to enqueue a single item happens more than once in the
622 // same queue. Because of that, don't throw an error here and just
623 // return undefined, since we don't end up re-enqueuing the item
624 // (or anything else).
625 span.setStatus({ code: SpanStatusCode.OK });
626 return undefined;
627 }
628
629 throw e;
630 }
631 };
632
633 const enqueueResult = await attemptAppealEnqueue();
634
635 // There are some edge cases where enqueue does nothing (see comment
636 // above). But, if the enqueue worked, call listeners asynchronously,
637 // not caring about whether they fail.
638 if (enqueueResult) {
639 const { targetQueueForNewJob } = enqueueResult;
640 this.onEnqueue(input, targetQueueForNewJob).catch(() => {});
641 }
642 },
643 );
644 }
645
646 async #mergeJobPayloads(
647 orgId: string,
648 newJob: ManualReviewJobPayload,
649 existingJob: ManualReviewJobPayload,
650 ): Promise<ManualReviewJobPayload> {
651 // we construct the set of reportIds here to make sure this logic is handled
652 // independently of any org's settings.
653 // Open question as to whether we
654 // should enforce that all org-specific merge logic happens strictly after
655 // any org-agnostic merge or whether it's fine to overwrite some of the
656 // org-specific as we do here.
657 const allReportHistories = newJob.reportHistory.concat(
658 existingJob.reportHistory,
659 );
660 const mergedJobPayload = this.#orgSpecificMergeJobs(
661 orgId,
662 newJob,
663 existingJob,
664 );
665 const finalJobPayload = {
666 ...mergedJobPayload,
667 reportHistory: Array.from(allReportHistories),
668 };
669 return finalJobPayload;
670 }
671
672 // eslint-disable-next-line complexity
673 #orgSpecificMergeJobs(
674 _orgId: string, // unused now, but keeping signature for compatibility / configurability
675 newJob: ManualReviewJobPayload,
676 existingJob: ManualReviewJobPayload,
677 ): ManualReviewJobPayload {
678 // For NCMEC jobs, always use the latest payload (re-sorted by ncmecEnqueueToMrt)
679 if (newJob.kind === 'NCMEC' && existingJob.kind === 'NCMEC') {
680 return newJob;
681 }
682
683 // For DEFAULT jobs, merge contextual fields
684 if (newJob.kind === 'DEFAULT' && existingJob.kind === 'DEFAULT') {
685 // Merge itemThreadContentItems (conversation context)
686 const mergedThreadItems =
687 ('itemThreadContentItems' in newJob || 'itemThreadContentItems' in existingJob)
688 ? _.uniqBy(
689 [
690 ...('itemThreadContentItems' in newJob
691 ? (newJob.itemThreadContentItems ?? [])
692 : []),
693 ...('itemThreadContentItems' in existingJob
694 ? (existingJob.itemThreadContentItems ?? [])
695 : []),
696 ],
697 (it) => jsonStringify([it.itemId, it.itemTypeIdentifier.id]),
698 )
699 : undefined;
700
701 // Merge reportedItems (list of items reported by users)
702 const mergedReportedItems =
703 ('reportedItems' in newJob || 'reportedItems' in existingJob)
704 ? _.uniqBy(
705 [
706 ...('reportedItems' in newJob ? (newJob.reportedItems ?? []) : []),
707 ...('reportedItems' in existingJob
708 ? (existingJob.reportedItems ?? [])
709 : []),
710 ],
711 (it) => jsonStringify([it.id, it.typeId]),
712 )
713 : undefined;
714
715 return {
716 ...newJob, // Use new job as base
717 ...(mergedThreadItems && mergedThreadItems.length > 0
718 ? { itemThreadContentItems: mergedThreadItems }
719 : {}),
720 ...(mergedReportedItems && mergedReportedItems.length > 0
721 ? { reportedItems: mergedReportedItems }
722 : {}),
723 };
724 }
725
726 // For all other cases, use new job
727 return newJob;
728 }
729
730 /**
731 * This method is used to get routing rules for a specific organization.
732 * @param orgId - The ID of the organization for which to fetch the routing rules.
733 * @returns A promise that resolves to an array of RoutingRule objects.
734 */
735 async getRoutingRules(opts: {
736 orgId: string;
737 directives?: ConsumerDirectives;
738 }) {
739 return this.jobRouting.getRoutingRules(opts);
740 }
741
742 /**
743 * This method is used to get appeals routing rules for a specific organization.
744 * @param orgId - The ID of the organization for which to fetch the routing rules.
745 * @returns A promise that resolves to an array of RoutingRule objects.
746 */
747 async getAppealsRoutingRules(opts: {
748 orgId: string;
749 directives?: ConsumerDirectives;
750 }) {
751 return this.appealsJobRouting.getAppealsRoutingRules(opts);
752 }
753
754 /**
755 * This method is used to create a new routing rule.
756 * @param input - An object containing the necessary parameters to create a new routing rule.
757 * @returns A promise that resolves to the newly created RoutingRule object.
758 * @throws {RoutingRuleNameExistsError} If the routing rule name already exists.
759 * @throws {QueueDoesNotExistError} If the destination queue does not exist.
760 */
761 async createRoutingRule(
762 input: CreateRoutingRuleInput,
763 ): Promise<RoutingRuleWithoutVersion> {
764 return input.isAppealsRule
765 ? this.appealsJobRouting.createAppealsRoutingRule(input)
766 : this.jobRouting.createRoutingRule(input);
767 }
768
769 /**
770 * This method is used to update an existing routing rule.
771 * @param input - An object containing the necessary parameters to update a routing rule.
772 * @returns A promise that resolves to the updated RoutingRule object.
773 * @throws {RoutingRuleNameExistsError} If the new routing rule name already exists.
774 * @throws {NotFoundError} If the routing rule to be updated was not found.
775 * @throws {QueueDoesNotExistError} If the destination queue does not exist.
776 */
777 async updateRoutingRule(
778 input: UpdateRoutingRuleInput,
779 ): Promise<RoutingRuleWithoutVersion> {
780 return input.isAppealsRule
781 ? this.appealsJobRouting.updateAppealsRoutingRule(input)
782 : this.jobRouting.updateRoutingRule(input);
783 }
784
785 /**
786 * This method is used to delete a routing rule.
787 * @param input - An object containing the id of the routing rule to be deleted.
788 * @returns A promise that resolves to a boolean indicating the success of the operation.
789 */
790 async deleteRoutingRule(input: { id: string; isAppealsRule?: boolean }) {
791 return input.isAppealsRule
792 ? this.appealsJobRouting.deleteAppealsRoutingRule({ id: input.id })
793 : this.jobRouting.deleteRoutingRule({ id: input.id });
794 }
795
796 async addAccessibleQueuesForUser(
797 userId: string,
798 queueIds: readonly string[],
799 ) {
800 return this.queueOps.addAccessibleQueuesForUser([userId], queueIds);
801 }
802
803 async removeAccessibleQueuesForUser(
804 userId: string,
805 queueIds: readonly string[],
806 ) {
807 return this.queueOps.removeAccessibleQueuesForUser(userId, queueIds);
808 }
809
810 /**
811 * This method is used to reorder routing rules.
812 * Under the hood, it runs a query similar to the following:
813 *
814 * UPDATE manual_review_tool.routing_rules
815 * SET sequence_number =
816 * CASE
817 * WHEN sequence_number = 2 THEN 1
818 * WHEN sequence_number = 1 THEN 2
819 * ELSE sequence_number
820 * END;
821 *
822 * @param input - An object containing the orgId and the new order of the routing rules.
823 * @returns A promise that resolves to an array of reordered RoutingRule objects.
824 */
825 async reorderRoutingRules(input: ReorderRoutingRulesInput) {
826 return input.isAppealsRule
827 ? this.appealsJobRouting.reorderAppealsRoutingRules(input)
828 : this.jobRouting.reorderRoutingRules(input);
829 }
830
831 async createManualReviewQueue(input: {
832 name: string;
833 description: string | null;
834 userIds: readonly string[];
835 hiddenActionIds: readonly string[];
836 invokedBy: Invoker;
837 isAppealsQueue: boolean;
838 autoCloseJobs?: boolean;
839 }): Promise<ManualReviewQueue> {
840 return this.queueOps.createManualReviewQueue(input);
841 }
842
843 async updateManualReviewQueue(input: {
844 orgId: string;
845 queueId: string;
846 name?: string;
847 description?: string | null;
848 userIds: readonly string[];
849 actionIdsToHide: readonly string[];
850 actionIdsToUnhide: readonly string[];
851 autoCloseJobs?: boolean;
852 }): Promise<ManualReviewQueue> {
853 return this.queueOps.updateManualReviewQueue(input);
854 }
855
856 async getDefaultQueueIdForOrg(orgId: string) {
857 return this.queueOps.getDefaultQueueIdForOrg(orgId);
858 }
859
860 /**
861 * This method returns all the queues that are the given user can view
862 * and review. Queues are only editable by users with the EditMrtQueues
863 * permission.
864 */
865 async getReviewableQueuesForUser(opts: {
866 invoker: Invoker;
867 }): Promise<ManualReviewQueue[]> {
868 return this.queueOps.getReviewableQueuesForUser(opts);
869 }
870
871 async getQueueForOrg(opts: {
872 orgId: string;
873 userId: string;
874 queueId: string;
875 }): Promise<ManualReviewQueue | undefined> {
876 return this.queueOps.getQueueForOrg(opts);
877 }
878
879 async getAllQueuesForOrgAndDangerouslyBypassPermissioning(opts: {
880 orgId: string;
881 }) {
882 return this.queueOps.getAllQueuesForOrgAndDangerouslyBypassPermissioning(
883 opts.orgId,
884 );
885 }
886
887 async getQueueForOrgAndDangerouslyBypassPermissioning(opts: {
888 orgId: string;
889 queueId: string;
890 }) {
891 return this.queueOps.getQueueForOrgAndDangerouslyBypassPermissioning(opts);
892 }
893
894 async getFavoriteQueuesForUser(opts: { orgId: string; userId: string }) {
895 return this.queueOps.getFavoriteQueuesForUser(opts);
896 }
897
898 async addFavoriteQueueForUser(opts: {
899 userId: string;
900 orgId: string;
901 queueId: string;
902 }) {
903 return this.queueOps.addFavoriteQueueForUser(opts);
904 }
905
906 async removeFavoriteQueueForUser(opts: {
907 userId: string;
908 orgId: string;
909 queueId: string;
910 }) {
911 return this.queueOps.removeFavoriteQueueForUser(opts);
912 }
913
914 /**
915 * @returns true when the queue that was trying to be deleted
916 * exists and is successfully deleted, false when the queue
917 * did not exist and throws when the delete fails for some reason.
918 * In case of failure, some jobs may still have been deleted.
919 */
920 async deleteManualReviewQueue(orgId: string, queueId: string) {
921 return this.queueOps.deleteManualReviewQueue(orgId, queueId);
922 }
923
924 async deleteManualReviewQueueForTestsDO_NOT_USE(
925 orgId: string,
926 queueId: string,
927 ) {
928 return this.queueOps.deleteManualReviewQueueForTestsDO_NOT_USE(
929 orgId,
930 queueId,
931 );
932 }
933
934 async getJobsForQueue(opts: {
935 orgId: string;
936 queueId: string;
937 jobIds: readonly string[];
938 isAppealsQueue?: boolean;
939 }) {
940 const { orgId, queueId, jobIds, isAppealsQueue = false } = opts;
941 return isAppealsQueue
942 ? this.queueOps.getAppealJobs({
943 orgId,
944 queueId,
945 jobIds: jobIds satisfies readonly string[] as readonly JobId[],
946 })
947 : this.queueOps.getJobs({
948 orgId,
949 queueId,
950 jobIds: jobIds satisfies readonly string[] as readonly JobId[],
951 });
952 }
953
954 async getAllJobsForQueue(opts: { orgId: string; queueId: string }) {
955 return this.queueOps.getAllJobsForQueue(opts);
956 }
957
958 async getPendingJobCount(opts: { orgId: string; queueId: string }) {
959 return this.queueOps.getPendingJobCount(opts);
960 }
961
962 async getOldestJobCreatedAt(opts: {
963 orgId: string;
964 queueId: string;
965 isAppealsQueue: boolean;
966 }) {
967 return this.queueOps.getOldestJobCreatedAt(opts);
968 }
969
970 async getHiddenFieldsForItemType(opts: {
971 orgId: string;
972 itemTypeId: string;
973 }) {
974 return this.jobRendering.getHiddenFieldsForItemType(opts);
975 }
976
977 async getIgnoreCallbackForOrg(orgId: string) {
978 return this.jobDecisioning.getIgnoreCallbackForOrg(orgId);
979 }
980
981 async setHiddenFieldsForItemType(opts: {
982 orgId: string;
983 itemTypeId: string;
984 hiddenFields: readonly string[];
985 }) {
986 return this.jobRendering.setHiddenFieldsForItemType(opts);
987 }
988
989 async getUsersWhoCanSeeQueue(opts: {
990 queueId: string;
991 userId: string;
992 orgId: string;
993 }) {
994 const { orgId, queueId } = opts;
995
996 return this.queueOps.getUsersWhoCanSeeQueue({ orgId, queueId });
997 }
998
999 async getDecisionTimeToAction(input: TimeToActionInput) {
1000 return this.decisionAnalytics.getTimeToAction(input);
1001 }
1002
1003 async getDecisionCounts(input: DecisionCountsInput) {
1004 return this.decisionAnalytics.getDecisionCounts(input);
1005 }
1006
1007 async getJobCreationCounts(input: JobCreationsInput) {
1008 return this.decisionAnalytics.getJobCreations(input);
1009 }
1010
1011 async getResolvedJobCounts(input: JobCountsInput) {
1012 return this.decisionAnalytics.getResolvedJobCounts(input);
1013 }
1014
1015 async getSkippedJobCounts(input: SkippedJobCountInput) {
1016 return this.skipOps.getSkippedJobCount(input);
1017 }
1018
1019 async getDecisionCountsTable(input: DecisionCountsTableInput) {
1020 return this.decisionAnalytics.getDecisionCountsTable(input);
1021 }
1022
1023 async getRecentDecisions(opts: {
1024 userPermissions: UserPermission[];
1025 orgId: string;
1026 input: RecentDecisionsFilterInput;
1027 }) {
1028 return this.decisionAnalytics.getRecentDecisions(opts);
1029 }
1030
1031 async getSkippedJobsForRecentDecisions(opts: {
1032 orgId: string;
1033 input: Omit<RecentDecisionsFilterInput, 'page' | 'startTime' | 'endTime'>;
1034 }) {
1035 return this.skipOps.getSkippedJobsForRecentDecisions(opts);
1036 }
1037
1038 async getExistingJobsForItem(opts: {
1039 orgId: string;
1040 itemId: string;
1041 itemTypeId: string;
1042 }) {
1043 return this.queueOps.getExistingJobsForItem(opts);
1044 }
1045
1046 async getDecidedJob(opts: { orgId: string; id: string }) {
1047 return this.decisionAnalytics.getDecidedJob(opts);
1048 }
1049
1050 async getDecidedJobFromJobId(opts: {
1051 orgId: string;
1052 jobId: string;
1053 userPermissions: UserPermission[];
1054 }) {
1055 return this.decisionAnalytics.getDecidedJobFromJobId(opts);
1056 }
1057
1058 async dequeueNextJob(opts: {
1059 orgId: string;
1060 queueId: string;
1061 userId: string;
1062 }) {
1063 const { orgId, queueId, userId } = opts;
1064 const queue =
1065 await this.queueOps.getQueueForOrgAndDangerouslyBypassPermissioning({
1066 orgId,
1067 queueId,
1068 });
1069
1070 let shouldBeAutoActioned = queue?.autoCloseJobs ?? false;
1071 let job = await this.queueOps.dequeueNextJobWithLock({
1072 orgId,
1073 queueId,
1074 lockToken: userId,
1075 });
1076 if (!shouldBeAutoActioned || !job) {
1077 return job;
1078 }
1079
1080 // Some orgs have configured queues to auto-delete jobs in which
1081 // the reported item has already been deleted in their system.
1082 // in this loop we check the partialItems endpoint provided by the org,
1083 // as well as the queue settings, to auto-action these jobs and return
1084 // the first job with a reported item that has not been deleted.
1085 while (shouldBeAutoActioned) {
1086 const freshItemInfo = await this.partialItemsService
1087 .getPartialItem(orgId, {
1088 id: job.job.payload.item.itemId,
1089 typeId: job.job.payload.item.itemTypeIdentifier.id,
1090 })
1091 .catch(() => null);
1092 if (!freshItemInfo) {
1093 return job;
1094 }
1095
1096 const isDeletedFieldRole =
1097 getFieldValueForRole(
1098 freshItemInfo.itemType.schema,
1099 freshItemInfo.itemType.schemaFieldRoles,
1100 'isDeleted',
1101 freshItemInfo.data,
1102 ) ?? false;
1103
1104 // TODO: This is a temporary solution to get the deleted field value
1105 // from legacy items that have not been migrated to the new schema.
1106 // Remove after ~48 hours.
1107 const deletedFieldValue =
1108 getFieldValueOrValues<'BOOLEAN'>(freshItemInfo.data, {
1109 type: 'BOOLEAN',
1110 name: 'deleted',
1111 required: false,
1112 container: null,
1113 })?.value ?? false;
1114
1115 // If a queue has auto-actioning enabled, and the reported item has been
1116 // deleted, we should auto-close the job and move on to the next one.
1117 shouldBeAutoActioned = deletedFieldValue || isDeletedFieldRole;
1118 if (!shouldBeAutoActioned) {
1119 return job;
1120 } else {
1121 await this.submitDecision({
1122 queueId,
1123 reportHistory: job.job.payload.reportHistory,
1124 jobId: job.job.id,
1125 lockToken: job.lockToken,
1126 automaticCloseDecision: {
1127 type: 'AUTOMATIC_CLOSE',
1128 reason: 'ITEM_DELETED_BEFORE_REVIEW',
1129 },
1130 relatedActions: [],
1131 orgId,
1132 });
1133
1134 job = await this.queueOps.dequeueNextJobWithLock({
1135 orgId,
1136 queueId,
1137 lockToken: userId,
1138 });
1139
1140 if (!job) {
1141 return job;
1142 }
1143 }
1144 }
1145 return null;
1146 }
1147
1148 async deleteAllJobsFromQueue(opts: {
1149 orgId: string;
1150 queueId: string;
1151 userPermissions: readonly UserPermission[];
1152 }) {
1153 return this.queueOps.deleteAllJobsFromQueue(opts);
1154 }
1155
1156 async submitDecision(
1157 opts: OmitEach<SubmitDecisionInput, 'jobId'> & { jobId: string },
1158 ) {
1159 // As submitDecision is a public method, we assume the passed in jobId is an
1160 // external id (which are the only kind that should leave the mrt service).
1161 return this.jobDecisioning.submitDecision(opts as SubmitDecisionInput);
1162 }
1163
1164 async getNcmecDecisions(opts: { startDate: Date; endDate: Date }) {
1165 return this.jobDecisioning.getNcmecDecisions(opts);
1166 }
1167
1168 async upsertDefaultSettings(opts: { orgId: string }) {
1169 return this.manualReviewToolSettings.upsertDefaultSettings(opts);
1170 }
1171
1172 async getRequiresPolicyForDecisions(orgId: string) {
1173 return this.manualReviewToolSettings.getRequiresPolicyForDecisions(orgId);
1174 }
1175
1176 async getHideSkipButtonForNonAdmins(orgId: string) {
1177 return this.manualReviewToolSettings.getHideSkipButtonForNonAdmins(orgId);
1178 }
1179
1180 async getRequiresDecisionReason(orgId: string) {
1181 return this.manualReviewToolSettings.getRequiresDecisionReason(orgId);
1182 }
1183
1184 async getPreviewJobsViewEnabled(orgId: string) {
1185 return this.manualReviewToolSettings.getPreviewJobsViewEnabled(orgId);
1186 }
1187
1188 async getJobComments(opts: { orgId: string; jobId: string }) {
1189 return this.commentOps.getComments(opts);
1190 }
1191
1192 async getJobCommentCount(opts: { orgId: string; jobId: string }) {
1193 return this.commentOps.getCommentCount(opts);
1194 }
1195
1196 async addJobComment(opts: {
1197 orgId: string;
1198 jobId: string;
1199 commentText: string;
1200 authorId: string;
1201 }) {
1202 return this.commentOps.addComment(opts);
1203 }
1204
1205 async deleteJobComment(opts: {
1206 orgId: string;
1207 jobId: string;
1208 userId: string;
1209 commentId: string;
1210 }) {
1211 return this.commentOps.deleteComment(opts);
1212 }
1213
1214 async getHiddenActionsForQueue(opts: { orgId: string; queueId: string }) {
1215 return this.queueOps.getHiddenActionsForQueue(opts);
1216 }
1217
1218 async updateHiddenActionsForQueue(opts: {
1219 orgId: string;
1220 actionIdsToHide: string[];
1221 actionIdsToUnhide: string[];
1222 queueId: string;
1223 }) {
1224 return this.queueOps.updateHiddenActionsForQueue(opts);
1225 }
1226
1227 async logSkip(opts: {
1228 orgId: string;
1229 queueId: string;
1230 jobId: string;
1231 userId: string;
1232 }) {
1233 await this.skipOps.logSkip(opts);
1234 }
1235
1236 async releaseJobLock(opts: {
1237 orgId: string;
1238 queueId: string;
1239 jobId: string;
1240 lockToken: string;
1241 }) {
1242 // As releaseJobLock is a public method, we assume the passed in jobId is an
1243 // external id (which are the only kind that should leave the mrt service).
1244 await this.queueOps.releaseJobLock(opts as {
1245 orgId: string;
1246 queueId: string;
1247 jobId: JobId;
1248 lockToken: string;
1249 });
1250 }
1251
1252 async close() {
1253 return Promise.all([this.queueOps.close(), this.jobRouting.close()]);
1254 }
1255}