Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 557ff54b2b435e5f1e789c6a8a4e1bebf2d7deb6 1255 lines 43 kB view raw
/* eslint-disable max-lines */

import { SpanStatusCode } from '@opentelemetry/api';
import { type ConsumerDirectives } from '../../lib/cache/index.js';
import { type ItemIdentifier } from '@roostorg/types';
import { type Kysely } from 'kysely';
import _ from 'lodash';
import { type Opaque } from 'type-fest';

import { type Dependencies } from '../../iocContainer/index.js';
import {
  type Invoker,
  type UserPermission,
} from '../../models/types/permissioning.js';
import { jsonStringify } from '../../utils/encoding.js';
import { isCoopErrorOfType } from '../../utils/errors.js';
import { isUniqueViolationError } from '../../utils/kysely.js';
import type { OmitEach, ReplaceDeep } from '../../utils/typescript-types.js';
import {
  getFieldValueForRole,
  getFieldValueOrValues,
  type NormalizedItemData,
} from '../itemProcessingService/index.js';
import { type ItemSubmissionWithTypeIdentifier } from '../itemProcessingService/makeItemSubmissionWithTypeIdentifier.js';
import { type ModerationConfigService } from '../moderationConfigService/index.js';
import { type PartialItemsService } from '../partialItemsService/index.js';
import {
  type UserScore,
  type UserStatisticsService,
} from '../userStatisticsService/userStatisticsService.js';
import { type ManualReviewToolServicePg } from './dbTypes.js';
import AppealsJobRouting from './modules/AppealsJobRouting.js';
import CommentOperations from './modules/CommentOperations.js';
import DecisionAnalytics, {
  type DecisionCountsInput,
  type DecisionCountsTableInput,
  type JobCountsInput,
  type JobCreationsInput,
  type RecentDecisionsFilterInput,
  type TimeToActionInput,
} from './modules/DecisionAnalytics.js';
import JobDecisioning, {
  type OnRecordDecisionInput,
  type SubmitDecisionInput,
} from './modules/JobDecisioning.js';
import JobEnrichment, {
  type ManualReviewAppealJobInput,
  type ManualReviewJobInput,
} from './modules/JobEnrichment.js';
import JobRendering from './modules/JobRendering.js';
import JobRouting, {
  type CreateRoutingRuleInput,
  type ReorderRoutingRulesInput,
  type RoutingRuleWithoutVersion,
  type UpdateRoutingRuleInput,
} from './modules/JobRouting.js';
import ManualReviewToolSettings from './modules/ManualReviewToolSettings.js';
import QueueOperations, {
  type ManualReviewQueue,
} from './modules/QueueOperations.js';
import SkipOperations, {
  type SkippedJobCountInput,
} from './modules/SkipOperations.js';

// An id that's unique across all jobs ever added to any queue (pending or not).
// This is the id that's passed into the MRT Service by callers to identify a
// job, and that's exposed to callers when a job is returned from this service.
// Note that this is not the same as the id of the job within Bull, which is
// managed as an implementation detail of the QueueOperations submodule. That
// module creates these JobId values when a Job is added to Bull.
export type JobId = Opaque<string, 'JobId'>;

// A (non-appeal) job as exposed by this service.
export type ManualReviewJob = {
  id: JobId;
  orgId: string;
  createdAt: Date;
  payload: ManualReviewJobPayload;
  reenqueuedFrom?: OriginJobInfo;
  enqueueSourceInfo?: ManualReviewJobEnqueueSourceInfo;
  // NB: represents the policies under which the job was added (e.g., the
  // policy it was reported for violating).
  policyIds: string[];
};

// Same shape as `ManualReviewJob`, but carrying an appeal payload and
// appeal-specific enqueue source info.
export type ManualReviewAppealJob = {
  id: JobId;
  orgId: string;
  createdAt: Date;
  payload: ManualReviewAppealJobPayload;
  reenqueuedFrom?: OriginJobInfo;
  enqueueSourceInfo?: AppealEnqueueSourceInfo;
  // NB: represents the policies under which the job was added (e.g., the
  // policy it was reported for violating).
  policyIds: string[];
};

export type ManualReviewJobOrAppeal = ManualReviewJob | ManualReviewAppealJob;

// Old MRT jobs (created before roughly Sept 2023) included this
// "LegacyItemWithTypeIdentifier" type in their payloads, rather than including
// full `ItemSubmissionWithTypeIdentifier` objects. While these jobs will
// cycle out of redis over time, their payload was also saved forever in
// postgres (in the decisions table), so we can't remove this type and update
// the `ManualReviewJobPayload` type until _both_ all the old jobs have been
// dequeued from redis and we've manually migrated the records in the decisions
// table. We should do that soon because synthesizing submissionIds from the
// postgres decisions _on read_ is a little sketchy (the synthesized ids won't
// be stable across reads and will have the wrong embedded date if we aren't
// careful).
export type LegacyItemWithTypeIdentifier = {
  id: string;
  data: NormalizedItemData;
  typeIdentifier: {
    id: string;
    version: string;
    schemaVariant: 'original' | 'partial';
  };
};

// The type of stored, legacy MRT jobs -- in both redis and the pg decisions
// table -- per comment above.
// TODO: migrate and delete. Also delete the date filter in getDecidedJob().
// Either a current `ManualReviewJob`, or the legacy stored shape: no
// `policyIds`, an optional singular `payload.policyId`, and (presumably, going
// by `ReplaceDeep`'s name -- confirm against its definition) every
// `ItemSubmissionWithTypeIdentifier` position also admitting a
// `LegacyItemWithTypeIdentifier`.
export type StoredManualReviewJob =
  | ManualReviewJob
  | ReplaceDeep<
      Omit<ManualReviewJob, 'policyIds' | 'payload'> & {
        payload: ManualReviewJobPayload & {
          policyId?: string;
        };
      },
      ItemSubmissionWithTypeIdentifier,
      LegacyItemWithTypeIdentifier | ItemSubmissionWithTypeIdentifier
    >;

// One entry per report; only `reportId` and `reportedAt` are required.
export type ReportHistory = Array<{
  reporterId?: ItemIdentifier;
  reason?: string;
  reportId: string;
  reportedAt: Date;
  policyId?: string;
}>;

// NB: the Content, User and Thread payloads below all use `kind: 'DEFAULT'`,
// so `kind` alone does not tell them apart; code that needs to distinguish
// them checks for the presence of specific fields instead (e.g.
// `'reportedItems' in payload`).
export type ContentManualReviewJobPayload = {
  kind: 'DEFAULT';
  item: ItemSubmissionWithTypeIdentifier;
  userScore?: UserScore;
  itemThreadContentItems?: ItemSubmissionWithTypeIdentifier[];
  additionalContentItems?: ItemSubmissionWithTypeIdentifier[];
  reportedForReason?: string;
  reporterIdentifier?: ItemIdentifier;
  reportedForReasons?: Array<{ reporterId?: ItemIdentifier; reason?: string }>;
  enqueueSourceInfo?: ManualReviewJobEnqueueSourceInfo;
  reportHistory: ReportHistory;
};

// Differs from the content payload by having `reportedItems` and no
// `reportedForReason`.
export type UserManualReviewJobPayload = {
  kind: 'DEFAULT';
  item: ItemSubmissionWithTypeIdentifier;
  userScore?: UserScore;
  itemThreadContentItems?: ItemSubmissionWithTypeIdentifier[];
  additionalContentItems?: ItemSubmissionWithTypeIdentifier[];
  reporterIdentifier?: ItemIdentifier;
  reportedItems?: ItemIdentifier[];
  reportedForReasons?: Array<{ reporterId?: ItemIdentifier; reason?: string }>;
  reportHistory: ReportHistory;
  enqueueSourceInfo?: ManualReviewJobEnqueueSourceInfo;
};

// Carries no user score and no thread/additional context items.
export type ThreadManualReviewJobPayload = {
  kind: 'DEFAULT';
  item: ItemSubmissionWithTypeIdentifier;
  reportedForReason?: string;
  reporterIdentifier?: ItemIdentifier;
  reportedForReasons?: Array<{ reporterId?: ItemIdentifier; reason?: string }>;
  reportHistory: ReportHistory;
  enqueueSourceInfo?: ManualReviewJobEnqueueSourceInfo;
};

// One media item attached to an NCMEC job, with its review flags.
export type NcmecContentItemSubmission = {
  contentItem: ItemSubmissionWithTypeIdentifier;
  isConfirmedCSAM: boolean;
  isReported: boolean;
};

export type NcmecManualReviewJobPayload = {
  kind: 'NCMEC';
  item: ItemSubmissionWithTypeIdentifier; // the user being reviewed
  allMediaItems: NcmecContentItemSubmission[]; // all the user's media from the last 30 days
  userScore?: UserScore;
  enqueueSourceInfo?: ManualReviewJobEnqueueSourceInfo;
  reportHistory: ReportHistory;
};

// NB: as with the 'DEFAULT' payloads, all three appeal payloads share
// `kind: 'APPEAL'`. Unlike the non-appeal payloads, none of them carries a
// `reportHistory`.
export type ContentAppealReviewJobPayload = {
  kind: 'APPEAL';
  item: ItemSubmissionWithTypeIdentifier;
  appealId: string;
  userScore?: UserScore;
  additionalContentItems?: ItemSubmissionWithTypeIdentifier[];
  appealerIdentifier?: ItemIdentifier;
  enqueueSourceInfo?: AppealEnqueueSourceInfo;
  appliedRulesIds?: string[];
  actionsTaken?: string[];
  appealReason?: string;
};

export type UserAppealReviewJobPayload = {
  kind: 'APPEAL';
  item: ItemSubmissionWithTypeIdentifier;
  appealId: string;
  userScore?: UserScore;
  additionalContentItems?: ItemSubmissionWithTypeIdentifier[];
  reportedItems?: ItemIdentifier[];
  appealerIdentifier?: ItemIdentifier;
  appealReason?: string;
  enqueueSourceInfo?: AppealEnqueueSourceInfo;
  appliedRulesIds?: string[];
  actionsTaken?: string[];
};

export type ThreadAppealReviewJobPayload = {
  kind: 'APPEAL';
  item: ItemSubmissionWithTypeIdentifier;
  appealId: string;
  appealerIdentifier?: ItemIdentifier;
  enqueueSourceInfo?: AppealEnqueueSourceInfo;
  appliedRulesIds?: string[];
  actionsTaken?: string[];
  appealReason?: string;
};

export type ManualReviewAppealJobPayload =
  | ThreadAppealReviewJobPayload
  | ContentAppealReviewJobPayload
  | UserAppealReviewJobPayload;

export type ManualReviewJobPayload =
  | ContentManualReviewJobPayload
  | UserManualReviewJobPayload
  | ThreadManualReviewJobPayload
  | NcmecManualReviewJobPayload;

export type
ManualReviewJobEnqueueSource =
  | 'APPEAL'
  | 'REPORT'
  | 'RULE_EXECUTION'
  | 'MRT_JOB'
  | 'POST_ACTIONS';

export type RuleExecutionEnqueueSourceInfo = {
  kind: 'RULE_EXECUTION';
  rules: string[];
};
export type ReportEnqueueSourceInfo = {
  kind: 'REPORT';
};
export type AppealEnqueueSourceInfo = {
  kind: 'APPEAL';
};
export type MrtJobEnqueueSourceInfo = { kind: 'MRT_JOB' };
export type PostActionsEnqueueSourceInfo = { kind: 'POST_ACTIONS' };

// NB: `AppealEnqueueSourceInfo` is deliberately(?) not a member of this union;
// appeal jobs and appeal payloads reference it directly instead.
export type ManualReviewJobEnqueueSourceInfo =
  | ReportEnqueueSourceInfo
  | RuleExecutionEnqueueSourceInfo
  | MrtJobEnqueueSourceInfo
  | PostActionsEnqueueSourceInfo;

export type OriginJobInfo = {
  jobId: JobId;
};

// Resolves to the union of the non-appeal payload tags ('DEFAULT' | 'NCMEC').
export type ManualReviewJobKind = ManualReviewJobPayload['kind'];

/**
 * Entry point for the manual review tool (MRT). The constructor builds one
 * instance of each submodule (queue operations, routing, enrichment,
 * decisioning, rendering, analytics, settings, comments, skips) and the
 * public methods largely delegate to those submodules.
 */
export class ManualReviewToolService {
  private readonly queueOps: QueueOperations;
  private readonly jobRendering: JobRendering;
  private readonly jobRouting: JobRouting;
  private readonly appealsJobRouting: AppealsJobRouting;
  private readonly jobEnrichment: JobEnrichment;
  private readonly jobDecisioning: JobDecisioning;
  private readonly decisionAnalytics: DecisionAnalytics;
  private readonly manualReviewToolSettings: ManualReviewToolSettings;
  private readonly commentOps: CommentOperations;
  private readonly skipOps: SkipOperations;

  /**
   * Wires up the submodules from the injected dependencies.
   *
   * `queueOps` is created first because the routing and decisioning modules
   * receive it. Writes go through `pgQuery`; `pgQueryReadReplica` is handed
   * only to `QueueOperations` and `DecisionAnalytics`.
   *
   * @param onRecordDecision - Callback forwarded to `JobDecisioning`.
   * @param onEnqueue - Listener invoked (fire-and-forget) by `enqueue` and
   *   `enqueueAppeal` after a job was successfully placed in a queue.
   */
  constructor(
    readonly redis: Dependencies['IORedis'],
    readonly ruleEvaluator: Dependencies['RuleEvaluator'],
    readonly routingRuleExecutionLogger: Dependencies['RoutingRuleExecutionLogger'],
    readonly pgQuery: Kysely<ManualReviewToolServicePg>,
    readonly pgQueryReadReplica: Kysely<ManualReviewToolServicePg>,
    readonly userStatisticsService: UserStatisticsService,
    readonly getCustomActionsByIds: Dependencies['getActionsByIdEventuallyConsistent'],
    private readonly tracer: Dependencies['Tracer'],
    private readonly moderationConfigService: ModerationConfigService,
    readonly partialItemsService: PartialItemsService,
    readonly onRecordDecision: (params: OnRecordDecisionInput) => Promise<void>,
    private readonly onEnqueue: (
      input: ManualReviewJobInput | ManualReviewAppealJobInput,
      queueId: string,
    ) => Promise<void>,
  ) {
    this.queueOps = new QueueOperations(
      pgQuery,
      pgQueryReadReplica,
      moderationConfigService,
      redis,
    );
    this.jobEnrichment = new JobEnrichment(
      partialItemsService,
      userStatisticsService,
    );
    this.jobRouting = new JobRouting(
      pgQuery,
      this.queueOps,
      moderationConfigService,
      ruleEvaluator,
      routingRuleExecutionLogger,
    );
    // Unlike JobRouting, appeals routing is not given the execution logger
    // (the argument is intentionally left commented out below).
    this.appealsJobRouting = new AppealsJobRouting(
      pgQuery,
      this.queueOps,
      moderationConfigService,
      ruleEvaluator,
      //routingRuleExecutionLogger,
    );
    this.jobDecisioning = new JobDecisioning(
      this.queueOps,
      pgQuery,
      getCustomActionsByIds,
      onRecordDecision,
      moderationConfigService,
      this.tracer,
    );
    this.jobRendering = new JobRendering(pgQuery);
    this.decisionAnalytics = new DecisionAnalytics(pgQueryReadReplica);
    this.manualReviewToolSettings = new ManualReviewToolSettings(pgQuery);
    this.commentOps = new CommentOperations(pgQuery);
    this.skipOps = new SkipOperations(pgQuery);
  }

  /**
   * The payload for enqueue isn't exactly correct - we only accept
   * itemThreadContentItems if the item represents a content type and the items
   * inside them must also be content. Because of how we get the items out of
   * Kysely as individual fields losing their correlation and some limitations
   * of TS, we check in this function that these are true and throw if not
   * rather than updating the function signature to be narrower because we would
   * need to tell TS that the input matches what we expect rather than have it
   * check for us which defeats the purpose.
352 * 353 * If a queueId is provided, it will skip routing and insert into that queue directly. 354 */ 355 async enqueue(input: ManualReviewJobInput, queueId?: string) { 356 const MAX_ENQUEUE_ATTEMPTS = 5; 357 358 await this.tracer.addActiveSpan( 359 { 360 resource: 'mrtService', 361 operation: 'enqueue', 362 attributes: { 363 'mrtJob.enqueueSource': input.enqueueSource, 364 'mrtJob.orgId': input.orgId, 365 'mrtJob.itemIdentifier': jsonStringify({ 366 type: input.payload.item.itemTypeIdentifier.id, 367 id: input.payload.item.itemId, 368 }), 369 'mrtJob.submissionId': input.payload.item.submissionId, 370 }, 371 }, 372 async (span) => { 373 let numAttemptsToEnqueue = 0; 374 375 const type = await this.moderationConfigService.getItemType({ 376 orgId: input.orgId, 377 itemTypeSelector: input.payload.item.itemTypeIdentifier, 378 }); 379 if (type === undefined) { 380 throw new Error( 381 `No item type for org ${input.orgId} with ID ${input.payload.item.itemTypeIdentifier.id}`, 382 ); 383 } 384 const enrichedJobPayload = await this.jobEnrichment.enrichJobPayload( 385 input, 386 type, 387 ); 388 389 // This function attempts to enqueue an MRT job, after running routing 390 // rules on it to get the destination queue. We know that, in the db, 391 // foreign key constraints prevent a routing rule from pointing to a 392 // non-existent queue. However, it's possible for the destination queue 393 // to have been deleted _between the time when the routing rules were 394 // loaded from the db and when the attempted enqueue takes place_. 395 // Moreover, because the routing rules are cached in memory, the time 396 // between them being loaded from the db and the enqueue can actually be 397 // relatively long, making this a plausible edge case. 398 // 399 // In that case, the call to `addJob` or `updateJobForQueue` will throw 400 // a queue does not exist error. 
We know, per the above, that the cause 401 // must be that we use ran a stale copy of the rules, so we load the 402 // latest routing rules (bypassing the cache with `maxAge: 0`) and retry 403 // the enqueue operation until we hit a maximum number of attempts. 404 // 405 // It's still technically possible for the queue to be deleted between 406 // the time we reload the rules and attempt the enqueue, but that should 407 // be exceedingly rare, as the time window in that case is much shorter 408 // than the amount of time that old rules live in memory due to caching. 409 // Even if this did happen, subsequent retries would again reload the 410 // rules, and users won't be _repeatedly_ deleting queues right in 411 // this brief window while the rules are running. 412 const attemptEnqueue = async (): Promise< 413 { job: ManualReviewJob; targetQueueForNewJob: string } | undefined 414 > => { 415 try { 416 const targetQueueForNewJob = 417 queueId ?? 418 (await this.jobRouting.getQueueIdForJob({ 419 orgId: input.orgId, 420 payload: enrichedJobPayload, 421 correlationId: input.correlationId, 422 policyIds: input.policyIds, 423 routingRuleCacheDirectives: 424 numAttemptsToEnqueue > 0 ? { maxAge: 0 } : undefined, 425 })); 426 427 const existingJobInSameQueue = await this.queueOps.getJobFromItemId( 428 { 429 orgId: input.orgId, 430 itemId: input.payload.item.itemId, 431 itemTypeId: input.payload.item.itemTypeIdentifier.id, 432 queueId: targetQueueForNewJob, 433 }, 434 ); 435 436 const finalJobPayload = existingJobInSameQueue 437 ? await this.#mergeJobPayloads( 438 input.orgId, 439 enrichedJobPayload, 440 existingJobInSameQueue.payload, 441 ) 442 : enrichedJobPayload; 443 444 const job = existingJobInSameQueue 445 ? 
await this.queueOps.updateJobForQueue({ 446 orgId: input.orgId, 447 queueId: targetQueueForNewJob, 448 jobId: existingJobInSameQueue.id, 449 data: { 450 ...existingJobInSameQueue, 451 payload: finalJobPayload, 452 }, 453 }) 454 : await this.queueOps.addJob({ 455 orgId: input.orgId, 456 queueId: targetQueueForNewJob, 457 enqueueSourceInfo: input.enqueueSourceInfo, 458 jobPayload: { 459 ...input, 460 payload: finalJobPayload, 461 }, 462 }); 463 464 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition 465 if (!job) { 466 // this means that we tried to update a job that was deleted 467 // between when we looked up the existing job and did the update. 468 // Just do nothing 469 return; 470 } 471 472 // log job creation/enqueue to postgres 473 this.pgQuery 474 .insertInto('manual_review_tool.job_creations') 475 .values({ 476 id: job.id, 477 org_id: job.orgId, 478 item_id: job.payload.item.itemId, 479 item_type_id: job.payload.item.itemTypeIdentifier.id, 480 queue_id: targetQueueForNewJob, 481 // We use the Source Info from the input argument to account 482 // for the case that we are in fact updating a job which was 483 // never inserted into this table and did not have enqueue source 484 // info, but the updated job will. 485 enqueue_source_info: input.enqueueSourceInfo, 486 policy_ids: input.policyIds, 487 created_at: new Date(), 488 }) 489 .execute() 490 .catch(() => {}); // don't throw if logging fails 491 492 return { targetQueueForNewJob, job }; 493 } catch (e) { 494 if ( 495 isCoopErrorOfType(e, 'QueueDoesNotExistError') && 496 numAttemptsToEnqueue++ < MAX_ENQUEUE_ATTEMPTS 497 ) { 498 return attemptEnqueue(); 499 } 500 501 if (isUniqueViolationError(e)) { 502 // This is actually expected behavior, because we'll try to log it when an 503 // attempt to enqueue a single item happens more than once in the 504 // same queue. 
Because of that, don't throw an error here and just 505 // return undefined, since we don't end up re-enqueuing the item 506 // (or anything else). 507 span.setStatus({ code: SpanStatusCode.OK }); 508 return undefined; 509 } 510 511 throw e; 512 } 513 }; 514 515 const enqueueResult = await attemptEnqueue(); 516 517 // There are some edge cases where enqueue does nothing (see comment 518 // above). But, if the enqueue worked, call listeners asynchronously, 519 // not caring about whether they fail. 520 if (enqueueResult) { 521 const { targetQueueForNewJob } = enqueueResult; 522 this.onEnqueue(input, targetQueueForNewJob).catch(() => {}); 523 } 524 }, 525 ); 526 } 527 528 async enqueueAppeal(input: ManualReviewAppealJobInput, queueId?: string) { 529 const MAX_ENQUEUE_ATTEMPTS = 5; 530 531 await this.tracer.addActiveSpan( 532 { 533 resource: 'mrtService', 534 operation: 'enqueueAppeal', 535 attributes: { 536 'mrtJob.enqueueSource': input.enqueueSource, 537 'mrtJob.orgId': input.orgId, 538 'mrtJob.itemIdentifier': jsonStringify({ 539 type: input.payload.item.itemTypeIdentifier.id, 540 id: input.payload.item.itemId, 541 }), 542 'mrtJob.submissionId': input.payload.item.submissionId, 543 }, 544 }, 545 async (span) => { 546 let numAttemptsToEnqueue = 0; 547 548 const type = await this.moderationConfigService.getItemType({ 549 orgId: input.orgId, 550 itemTypeSelector: input.payload.item.itemTypeIdentifier, 551 }); 552 if (type === undefined) { 553 throw new Error( 554 `No item type for org ${input.orgId} with ID ${input.payload.item.itemTypeIdentifier.id}`, 555 ); 556 } 557 const enrichedJobPayload = await this.jobEnrichment.enrichAppealPayload( 558 input, 559 ); 560 561 const attemptAppealEnqueue = async (): Promise< 562 | { job: ManualReviewAppealJob; targetQueueForNewJob: string } 563 | undefined 564 > => { 565 try { 566 const targetQueueForNewJob = 567 queueId ?? 
568 (await this.appealsJobRouting.getQueueIdForJob({ 569 orgId: input.orgId, 570 payload: enrichedJobPayload, 571 correlationId: input.correlationId, 572 policyIds: input.policyIds, 573 routingRuleCacheDirectives: 574 numAttemptsToEnqueue > 0 ? { maxAge: 0 } : undefined, 575 })); 576 577 const job = await this.queueOps.addAppealJob({ 578 orgId: input.orgId, 579 queueId: targetQueueForNewJob, 580 enqueueSourceInfo: input.enqueueSourceInfo, 581 jobPayload: { 582 ...input, 583 payload: enrichedJobPayload, 584 }, 585 }); 586 587 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition 588 if (!job) { 589 // this means that we tried to update a job that was deleted 590 // between when we looked up the existing job and did the update. 591 // Just do nothing 592 return; 593 } 594 595 this.pgQuery 596 .insertInto('manual_review_tool.job_creations') 597 .values({ 598 id: job.id, 599 org_id: job.orgId, 600 item_id: job.payload.item.itemId, 601 item_type_id: job.payload.item.itemTypeIdentifier.id, 602 queue_id: targetQueueForNewJob, 603 enqueue_source_info: input.enqueueSourceInfo, 604 policy_ids: input.policyIds, 605 created_at: new Date(), 606 }) 607 .execute() 608 .catch(() => {}); // don't throw if logging fails 609 610 return { targetQueueForNewJob, job }; 611 } catch (e) { 612 if ( 613 isCoopErrorOfType(e, 'QueueDoesNotExistError') && 614 numAttemptsToEnqueue++ < MAX_ENQUEUE_ATTEMPTS 615 ) { 616 return attemptAppealEnqueue(); 617 } 618 619 if (isUniqueViolationError(e)) { 620 // This is actually expected behavior, because we'll try to log it when an 621 // attempt to enqueue a single item happens more than once in the 622 // same queue. Because of that, don't throw an error here and just 623 // return undefined, since we don't end up re-enqueuing the item 624 // (or anything else). 
625 span.setStatus({ code: SpanStatusCode.OK }); 626 return undefined; 627 } 628 629 throw e; 630 } 631 }; 632 633 const enqueueResult = await attemptAppealEnqueue(); 634 635 // There are some edge cases where enqueue does nothing (see comment 636 // above). But, if the enqueue worked, call listeners asynchronously, 637 // not caring about whether they fail. 638 if (enqueueResult) { 639 const { targetQueueForNewJob } = enqueueResult; 640 this.onEnqueue(input, targetQueueForNewJob).catch(() => {}); 641 } 642 }, 643 ); 644 } 645 646 async #mergeJobPayloads( 647 orgId: string, 648 newJob: ManualReviewJobPayload, 649 existingJob: ManualReviewJobPayload, 650 ): Promise<ManualReviewJobPayload> { 651 // we construct the set of reportIds here to make sure this logic is handled 652 // independently of any org's settings. 653 // Open question as to whether we 654 // should enforce that all org-specific merge logic happens strictly after 655 // any org-agnostic merge or whether it's fine to overwrite some of the 656 // org-specific as we do here. 
657 const allReportHistories = newJob.reportHistory.concat( 658 existingJob.reportHistory, 659 ); 660 const mergedJobPayload = this.#orgSpecificMergeJobs( 661 orgId, 662 newJob, 663 existingJob, 664 ); 665 const finalJobPayload = { 666 ...mergedJobPayload, 667 reportHistory: Array.from(allReportHistories), 668 }; 669 return finalJobPayload; 670 } 671 672 // eslint-disable-next-line complexity 673 #orgSpecificMergeJobs( 674 _orgId: string, // unused now, but keeping signature for compatibility / configurability 675 newJob: ManualReviewJobPayload, 676 existingJob: ManualReviewJobPayload, 677 ): ManualReviewJobPayload { 678 // For NCMEC jobs, always use the latest payload (re-sorted by ncmecEnqueueToMrt) 679 if (newJob.kind === 'NCMEC' && existingJob.kind === 'NCMEC') { 680 return newJob; 681 } 682 683 // For DEFAULT jobs, merge contextual fields 684 if (newJob.kind === 'DEFAULT' && existingJob.kind === 'DEFAULT') { 685 // Merge itemThreadContentItems (conversation context) 686 const mergedThreadItems = 687 ('itemThreadContentItems' in newJob || 'itemThreadContentItems' in existingJob) 688 ? _.uniqBy( 689 [ 690 ...('itemThreadContentItems' in newJob 691 ? (newJob.itemThreadContentItems ?? []) 692 : []), 693 ...('itemThreadContentItems' in existingJob 694 ? (existingJob.itemThreadContentItems ?? []) 695 : []), 696 ], 697 (it) => jsonStringify([it.itemId, it.itemTypeIdentifier.id]), 698 ) 699 : undefined; 700 701 // Merge reportedItems (list of items reported by users) 702 const mergedReportedItems = 703 ('reportedItems' in newJob || 'reportedItems' in existingJob) 704 ? _.uniqBy( 705 [ 706 ...('reportedItems' in newJob ? (newJob.reportedItems ?? []) : []), 707 ...('reportedItems' in existingJob 708 ? (existingJob.reportedItems ?? []) 709 : []), 710 ], 711 (it) => jsonStringify([it.id, it.typeId]), 712 ) 713 : undefined; 714 715 return { 716 ...newJob, // Use new job as base 717 ...(mergedThreadItems && mergedThreadItems.length > 0 718 ? 
{ itemThreadContentItems: mergedThreadItems } 719 : {}), 720 ...(mergedReportedItems && mergedReportedItems.length > 0 721 ? { reportedItems: mergedReportedItems } 722 : {}), 723 }; 724 } 725 726 // For all other cases, use new job 727 return newJob; 728 } 729 730 /** 731 * This method is used to get routing rules for a specific organization. 732 * @param orgId - The ID of the organization for which to fetch the routing rules. 733 * @returns A promise that resolves to an array of RoutingRule objects. 734 */ 735 async getRoutingRules(opts: { 736 orgId: string; 737 directives?: ConsumerDirectives; 738 }) { 739 return this.jobRouting.getRoutingRules(opts); 740 } 741 742 /** 743 * This method is used to get appeals routing rules for a specific organization. 744 * @param orgId - The ID of the organization for which to fetch the routing rules. 745 * @returns A promise that resolves to an array of RoutingRule objects. 746 */ 747 async getAppealsRoutingRules(opts: { 748 orgId: string; 749 directives?: ConsumerDirectives; 750 }) { 751 return this.appealsJobRouting.getAppealsRoutingRules(opts); 752 } 753 754 /** 755 * This method is used to create a new routing rule. 756 * @param input - An object containing the necessary parameters to create a new routing rule. 757 * @returns A promise that resolves to the newly created RoutingRule object. 758 * @throws {RoutingRuleNameExistsError} If the routing rule name already exists. 759 * @throws {QueueDoesNotExistError} If the destination queue does not exist. 760 */ 761 async createRoutingRule( 762 input: CreateRoutingRuleInput, 763 ): Promise<RoutingRuleWithoutVersion> { 764 return input.isAppealsRule 765 ? this.appealsJobRouting.createAppealsRoutingRule(input) 766 : this.jobRouting.createRoutingRule(input); 767 } 768 769 /** 770 * This method is used to update an existing routing rule. 771 * @param input - An object containing the necessary parameters to update a routing rule. 
   * @returns A promise that resolves to the updated RoutingRule object.
   * @throws {RoutingRuleNameExistsError} If the new routing rule name already exists.
   * @throws {NotFoundError} If the routing rule to be updated was not found.
   * @throws {QueueDoesNotExistError} If the destination queue does not exist.
   */
  async updateRoutingRule(
    input: UpdateRoutingRuleInput,
  ): Promise<RoutingRuleWithoutVersion> {
    return input.isAppealsRule
      ? this.appealsJobRouting.updateAppealsRoutingRule(input)
      : this.jobRouting.updateRoutingRule(input);
  }

  /**
   * This method is used to delete a routing rule.
   * @param input - An object containing the id of the routing rule to be
   *   deleted, and optionally `isAppealsRule` to delete from the appeals rules
   *   instead of the regular ones.
   * @returns A promise that resolves to a boolean indicating the success of the operation.
   */
  async deleteRoutingRule(input: { id: string; isAppealsRule?: boolean }) {
    return input.isAppealsRule
      ? this.appealsJobRouting.deleteAppealsRoutingRule({ id: input.id })
      : this.jobRouting.deleteRoutingRule({ id: input.id });
  }

  /**
   * Delegates to `QueueOperations.addAccessibleQueuesForUser`, which takes a
   * list of user ids; the single `userId` is wrapped in an array.
   */
  async addAccessibleQueuesForUser(
    userId: string,
    queueIds: readonly string[],
  ) {
    return this.queueOps.addAccessibleQueuesForUser([userId], queueIds);
  }

  /**
   * Delegates to `QueueOperations.removeAccessibleQueuesForUser`. Note that,
   * unlike the add counterpart, the user id is passed as a plain string.
   */
  async removeAccessibleQueuesForUser(
    userId: string,
    queueIds: readonly string[],
  ) {
    return this.queueOps.removeAccessibleQueuesForUser(userId, queueIds);
  }

  /**
   * This method is used to reorder routing rules (the appeals rules when
   * `input.isAppealsRule` is truthy, the regular rules otherwise).
   * Under the hood, it runs a query similar to the following:
   *
   * UPDATE manual_review_tool.routing_rules
   * SET sequence_number =
   *   CASE
   *     WHEN sequence_number = 2 THEN 1
   *     WHEN sequence_number = 1 THEN 2
   *     ELSE sequence_number
   *   END;
   *
   * @param input - An object containing the orgId and the new order of the routing rules.
   * @returns A promise that resolves to an array of reordered RoutingRule objects.
   */
  async reorderRoutingRules(input: ReorderRoutingRulesInput) {
    return input.isAppealsRule
      ? this.appealsJobRouting.reorderAppealsRoutingRules(input)
      : this.jobRouting.reorderRoutingRules(input);
  }

  /** Creates a queue; delegates to `QueueOperations.createManualReviewQueue`. */
  async createManualReviewQueue(input: {
    name: string;
    description: string | null;
    userIds: readonly string[];
    hiddenActionIds: readonly string[];
    invokedBy: Invoker;
    isAppealsQueue: boolean;
    autoCloseJobs?: boolean;
  }): Promise<ManualReviewQueue> {
    return this.queueOps.createManualReviewQueue(input);
  }

  /** Updates a queue; delegates to `QueueOperations.updateManualReviewQueue`. */
  async updateManualReviewQueue(input: {
    orgId: string;
    queueId: string;
    name?: string;
    description?: string | null;
    userIds: readonly string[];
    actionIdsToHide: readonly string[];
    actionIdsToUnhide: readonly string[];
    autoCloseJobs?: boolean;
  }): Promise<ManualReviewQueue> {
    return this.queueOps.updateManualReviewQueue(input);
  }

  /** Delegates to `QueueOperations.getDefaultQueueIdForOrg`. */
  async getDefaultQueueIdForOrg(orgId: string) {
    return this.queueOps.getDefaultQueueIdForOrg(orgId);
  }

  /**
   * This method returns all the queues that the given user can view
   * and review. Queues are only editable by users with the EditMrtQueues
   * permission.
   */
  async getReviewableQueuesForUser(opts: {
    invoker: Invoker;
  }): Promise<ManualReviewQueue[]> {
    return this.queueOps.getReviewableQueuesForUser(opts);
  }

  /**
   * Looks up a single queue on behalf of a user; delegates to
   * `QueueOperations.getQueueForOrg`. Resolves to `undefined` when that
   * lookup yields no queue.
   */
  async getQueueForOrg(opts: {
    orgId: string;
    userId: string;
    queueId: string;
  }): Promise<ManualReviewQueue | undefined> {
    return this.queueOps.getQueueForOrg(opts);
  }

  // As the name says: no user is involved in this lookup, so callers are
  // responsible for any permission checks.
  async getAllQueuesForOrgAndDangerouslyBypassPermissioning(opts: {
    orgId: string;
  }) {
    return this.queueOps.getAllQueuesForOrgAndDangerouslyBypassPermissioning(
      opts.orgId,
    );
  }

  // Single-queue variant of the method above; same caveat applies.
  async getQueueForOrgAndDangerouslyBypassPermissioning(opts: {
    orgId: string;
    queueId: string;
  }) {
    return this.queueOps.getQueueForOrgAndDangerouslyBypassPermissioning(opts);
  }

  /** Delegates to `QueueOperations.getFavoriteQueuesForUser`. */
  async getFavoriteQueuesForUser(opts: { orgId: string; userId: string }) {
    return this.queueOps.getFavoriteQueuesForUser(opts);
  }

  /** Delegates to `QueueOperations.addFavoriteQueueForUser`. */
  async addFavoriteQueueForUser(opts: {
    userId: string;
    orgId: string;
    queueId: string;
  }) {
    return this.queueOps.addFavoriteQueueForUser(opts);
  }

  /** Delegates to `QueueOperations.removeFavoriteQueueForUser`. */
  async removeFavoriteQueueForUser(opts: {
    userId: string;
    orgId: string;
    queueId: string;
  }) {
    return this.queueOps.removeFavoriteQueueForUser(opts);
  }

  /**
   * @returns true when the queue that was trying to be deleted
   * exists and is successfully deleted, false when the queue
   * did not exist and throws when the delete fails for some reason.
   * In case of failure, some jobs may still have been deleted.
*/
  async deleteManualReviewQueue(orgId: string, queueId: string) {
    return this.queueOps.deleteManualReviewQueue(orgId, queueId);
  }

  /**
   * Test-only deletion entry point (per its name); forwards both ids to the
   * identically named method on `queueOps`.
   */
  async deleteManualReviewQueueForTestsDO_NOT_USE(
    orgId: string,
    queueId: string,
  ) {
    return this.queueOps.deleteManualReviewQueueForTestsDO_NOT_USE(
      orgId,
      queueId,
    );
  }

  /**
   * Fetches specific jobs from a queue. Appeals queues are read through
   * `queueOps.getAppealJobs`, all other queues through `queueOps.getJobs`.
   * `isAppealsQueue` defaults to `false` when omitted.
   */
  async getJobsForQueue(opts: {
    orgId: string;
    queueId: string;
    jobIds: readonly string[];
    isAppealsQueue?: boolean;
  }) {
    const { orgId, queueId, jobIds, isAppealsQueue = false } = opts;

    // Callers pass plain strings; queueOps expects them typed as JobIds.
    const lookup = {
      orgId,
      queueId,
      jobIds: jobIds satisfies readonly string[] as readonly JobId[],
    };

    if (isAppealsQueue) {
      return this.queueOps.getAppealJobs(lookup);
    }
    return this.queueOps.getJobs(lookup);
  }

  /**
   * Returns all jobs of the given queue via `queueOps.getAllJobsForQueue`.
   */
  async getAllJobsForQueue(opts: { orgId: string; queueId: string }) {
    return this.queueOps.getAllJobsForQueue(opts);
  }

  /**
   * Returns the pending job count of the given queue.
   */
  async getPendingJobCount(opts: { orgId: string; queueId: string }) {
    return this.queueOps.getPendingJobCount(opts);
  }

  /**
   * Returns the creation time of the oldest job in the given queue, as
   * reported by `queueOps.getOldestJobCreatedAt`.
   */
  async getOldestJobCreatedAt(opts: {
    orgId: string;
    queueId: string;
    isAppealsQueue: boolean;
  }) {
    return this.queueOps.getOldestJobCreatedAt(opts);
  }

  /**
   * Reads the hidden fields configured for an item type (job rendering).
   */
  async getHiddenFieldsForItemType(opts: {
    orgId: string;
    itemTypeId: string;
  }) {
    return this.jobRendering.getHiddenFieldsForItemType(opts);
  }

  /**
   * Reads the org's "ignore callback" via `jobDecisioning`.
   */
  async getIgnoreCallbackForOrg(orgId: string) {
    return this.jobDecisioning.getIgnoreCallbackForOrg(orgId);
  }

  /**
   * Stores the hidden fields for an item type (job rendering).
   */
  async setHiddenFieldsForItemType(opts: {
    orgId: string;
    itemTypeId: string;
    hiddenFields: readonly string[];
  }) {
    return this.jobRendering.setHiddenFieldsForItemType(opts);
  }

  /**
   * Lists the users who can see the given queue. `userId` is accepted in the
   * input but is not forwarded; only the org and queue ids reach `queueOps`.
   */
  async getUsersWhoCanSeeQueue(opts: {
    queueId: string;
    userId: string;
    orgId: string;
  }) {
    return this.queueOps.getUsersWhoCanSeeQueue({
      orgId: opts.orgId,
      queueId: opts.queueId,
    });
  }

  /**
   * Time-to-action analytics; see `decisionAnalytics.getTimeToAction`.
   */
  async getDecisionTimeToAction(input: TimeToActionInput) {
    return this.decisionAnalytics.getTimeToAction(input);
  }

  /**
   * Decision count analytics; see `decisionAnalytics.getDecisionCounts`.
   */
  async getDecisionCounts(input: DecisionCountsInput) {
    return this.decisionAnalytics.getDecisionCounts(input);
  }

  /**
   * Job creation analytics; see `decisionAnalytics.getJobCreations`.
   */
  async getJobCreationCounts(input: JobCreationsInput) {
    return this.decisionAnalytics.getJobCreations(input);
  }

  /**
   * Resolved job analytics; see `decisionAnalytics.getResolvedJobCounts`.
   */
  async getResolvedJobCounts(input: JobCountsInput) {
    return this.decisionAnalytics.getResolvedJobCounts(input);
  }

  /**
   * Skipped job counts; served by `skipOps.getSkippedJobCount`.
   */
  async getSkippedJobCounts(input: SkippedJobCountInput) {
    return this.skipOps.getSkippedJobCount(input);
  }

  /**
   * Tabular decision counts; see `decisionAnalytics.getDecisionCountsTable`.
   */
  async getDecisionCountsTable(input: DecisionCountsTableInput) {
    return this.decisionAnalytics.getDecisionCountsTable(input);
  }

  /**
   * Recent decisions for the org, filtered by `input`. The caller's
   * permissions are passed along to `decisionAnalytics`.
   */
  async getRecentDecisions(opts: {
    userPermissions: UserPermission[];
    orgId: string;
    input: RecentDecisionsFilterInput;
  }) {
    return this.decisionAnalytics.getRecentDecisions(opts);
  }

  /**
   * Skipped jobs matching the recent-decisions filter. The filter excludes
   * paging and the time window (`page`, `startTime`, `endTime`).
   */
  async getSkippedJobsForRecentDecisions(opts: {
    orgId: string;
    input: Omit<RecentDecisionsFilterInput, 'page' | 'startTime' | 'endTime'>;
  }) {
    return this.skipOps.getSkippedJobsForRecentDecisions(opts);
  }

  /**
   * Finds the jobs that already exist for the given item.
   */
  async getExistingJobsForItem(opts: {
    orgId: string;
    itemId: string;
    itemTypeId: string;
  }) {
    return this.queueOps.getExistingJobsForItem(opts);
  }

  /**
   * Fetches a decided job by `id`; see `decisionAnalytics.getDecidedJob`.
   */
  async getDecidedJob(opts: { orgId: string; id: string }) {
    return this.decisionAnalytics.getDecidedJob(opts);
  }

  /**
   * Fetches a decided job by its job id. The caller's permissions are passed
   * along to `decisionAnalytics`.
   */
  async getDecidedJobFromJobId(opts: {
    orgId: string;
    jobId: string;
    userPermissions: UserPermission[];
  }) {
    return this.decisionAnalytics.getDecidedJobFromJobId(opts);
  }

  /**
   * Locks and returns the next job of a queue for a reviewer. The reviewer's
   * user id is used as the lock token.
   *
   * When the queue has `autoCloseJobs` set, each dequeued job is first checked
   * against the partial items service. If the item is reported as deleted,
   * the job is closed with an `AUTOMATIC_CLOSE` decision and the next job is
   * dequeued. If the partial item lookup fails or yields nothing, the job is
   * handed to the reviewer unchanged.
   *
   * @returns the first job that should be reviewed, or the falsy value from
   * `queueOps.dequeueNextJobWithLock` when the queue has no more jobs.
   */
  async dequeueNextJob(opts: {
    orgId: string;
    queueId: string;
    userId: string;
  }) {
    const { orgId, queueId, userId } = opts;
    const lockNextJob = () =>
      this.queueOps.dequeueNextJobWithLock({
        orgId,
        queueId,
        lockToken: userId,
      });

    const queue =
      await this.queueOps.getQueueForOrgAndDangerouslyBypassPermissioning({
        orgId,
        queueId,
      });

    let keepAutoClosing = queue?.autoCloseJobs ?? false;
    let job = await lockNextJob();
    if (!keepAutoClosing || !job) {
      return job;
    }

    // Some orgs have configured queues to auto-delete jobs in which
    // the reported item has already been deleted in their system.
    // In this loop we check the partialItems endpoint provided by the org,
    // as well as the queue settings, to auto-action these jobs and return
    // the first job with a reported item that has not been deleted.
    while (keepAutoClosing) {
      const reportedItem = job.job.payload.item;
      const freshItemInfo = await this.partialItemsService
        .getPartialItem(orgId, {
          id: reportedItem.itemId,
          typeId: reportedItem.itemTypeIdentifier.id,
        })
        .catch(() => null);

      // Best effort: without fresh item data we cannot tell whether the item
      // was deleted, so the job goes to the reviewer as-is.
      if (!freshItemInfo) {
        return job;
      }

      const isDeletedFieldRole =
        getFieldValueForRole(
          freshItemInfo.itemType.schema,
          freshItemInfo.itemType.schemaFieldRoles,
          'isDeleted',
          freshItemInfo.data,
        ) ?? false;

      // TODO: This is a temporary solution to get the deleted field value
      // from legacy items that have not been migrated to the new schema.
      // Remove after ~48 hours.
      const deletedFieldValue =
        getFieldValueOrValues<'BOOLEAN'>(freshItemInfo.data, {
          type: 'BOOLEAN',
          name: 'deleted',
          required: false,
          container: null,
        })?.value ?? false;

      // If a queue has auto-actioning enabled, and the reported item has been
      // deleted, we should auto-close the job and move on to the next one.
      keepAutoClosing = deletedFieldValue || isDeletedFieldRole;
      if (!keepAutoClosing) {
        return job;
      }

      await this.submitDecision({
        queueId,
        reportHistory: job.job.payload.reportHistory,
        jobId: job.job.id,
        lockToken: job.lockToken,
        automaticCloseDecision: {
          type: 'AUTOMATIC_CLOSE',
          reason: 'ITEM_DELETED_BEFORE_REVIEW',
        },
        relatedActions: [],
        orgId,
      });

      job = await lockNextJob();
      if (!job) {
        return job;
      }
    }
    // Every loop iteration returns, so this line is not expected to run; it is
    // kept so the inferred return type stays the same.
    return null;
  }

  /**
   * Deletes every job in the given queue. The caller's permissions are passed
   * along to `queueOps`.
   */
  async deleteAllJobsFromQueue(opts: {
    orgId: string;
    queueId: string;
    userPermissions: readonly UserPermission[];
  }) {
    return this.queueOps.deleteAllJobsFromQueue(opts);
  }

  /**
   * Submits a decision for a job by delegating to `jobDecisioning`.
   */
  async submitDecision(
    opts: OmitEach<SubmitDecisionInput, 'jobId'> & { jobId: string },
  ) {
    // As submitDecision is a public method, we assume the passed in jobId is an
    // external id (which are the only kind that should leave the mrt service).
    const decisionInput = opts as SubmitDecisionInput;
    return this.jobDecisioning.submitDecision(decisionInput);
  }

  /**
   * Fetches NCMEC decisions between `startDate` and `endDate`.
   */
  async getNcmecDecisions(opts: { startDate: Date; endDate: Date }) {
    return this.jobDecisioning.getNcmecDecisions(opts);
  }

  /**
   * Upserts the default manual review tool settings for an org.
   */
  async upsertDefaultSettings(opts: { orgId: string }) {
    return this.manualReviewToolSettings.upsertDefaultSettings(opts);
  }

  /**
   * Reads the org's "requires policy for decisions" setting.
   */
  async getRequiresPolicyForDecisions(orgId: string) {
    return this.manualReviewToolSettings.getRequiresPolicyForDecisions(orgId);
  }

  /**
   * Reads the org's "hide skip button for non-admins" setting.
   */
  async getHideSkipButtonForNonAdmins(orgId: string) {
    return this.manualReviewToolSettings.getHideSkipButtonForNonAdmins(orgId);
  }

  /**
   * Reads the org's "requires decision reason" setting.
   */
  async getRequiresDecisionReason(orgId: string) {
    return this.manualReviewToolSettings.getRequiresDecisionReason(orgId);
  }

  /**
   * Reads the org's "preview jobs view enabled" setting.
   */
  async getPreviewJobsViewEnabled(orgId: string) {
    return this.manualReviewToolSettings.getPreviewJobsViewEnabled(orgId);
  }

  /**
   * Lists the comments on a job.
   */
  async getJobComments(opts: { orgId: string; jobId: string }) {
    return this.commentOps.getComments(opts);
  }

  /**
   * Counts the comments on a job.
   */
  async getJobCommentCount(opts: { orgId: string; jobId: string }) {
    return this.commentOps.getCommentCount(opts);
  }

  /**
   * Adds a comment authored by `authorId` to a job.
   */
  async addJobComment(opts: {
    orgId: string;
    jobId: string;
    commentText: string;
    authorId: string;
  }) {
    return this.commentOps.addComment(opts);
  }

  /**
   * Deletes a comment from a job on behalf of `userId`.
   */
  async deleteJobComment(opts: {
    orgId: string;
    jobId: string;
    userId: string;
    commentId: string;
  }) {
    return this.commentOps.deleteComment(opts);
  }

  /**
   * Lists the actions hidden for the given queue.
   */
  async getHiddenActionsForQueue(opts: { orgId: string; queueId: string }) {
    return this.queueOps.getHiddenActionsForQueue(opts);
  }

  /**
   * Hides and/or unhides actions for the given queue.
   */
  async updateHiddenActionsForQueue(opts: {
    orgId: string;
    actionIdsToHide: string[];
    actionIdsToUnhide: string[];
    queueId: string;
  }) {
    return this.queueOps.updateHiddenActionsForQueue(opts);
  }

  /**
   * Records that a user skipped a job. Resolves with no value once
   * `skipOps.logSkip` has completed.
   */
  async logSkip(opts: {
    orgId: string;
    queueId: string;
    jobId: string;
    userId: string;
  }) {
    await this.skipOps.logSkip(opts);
  }

  /**
   * Releases the lock held on a job under the given lock token. Resolves with
   * no value once `queueOps.releaseJobLock` has completed.
   */
  async releaseJobLock(opts: {
    orgId: string;
    queueId: string;
    jobId: string;
    lockToken: string;
  }) {
    // As releaseJobLock is a public method, we assume the passed in jobId is an
    // external id (which are the only kind that should leave the mrt service).
    const lockRef = opts as {
      orgId: string;
      queueId: string;
      jobId: JobId;
      lockToken: string;
    };
    await this.queueOps.releaseJobLock(lockRef);
  }

  /**
   * Closes the underlying `queueOps` and `jobRouting` modules. Both `close()`
   * calls are started before either is awaited.
   */
  async close() {
    return Promise.all([this.queueOps.close(), this.jobRouting.close()]);
  }
}