Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1/* eslint-disable max-lines */
2import { sql, type Kysely } from 'kysely';
3import { type ReadonlyDeep } from 'type-fest';
4
5import { UserPermission } from '../../../models/types/permissioning.js';
6import { MONTH_MS } from '../../../utils/time.js';
7import { type ManualReviewToolServicePg } from '../dbTypes.js';
8import {
9 type ManualReviewJob,
10 type ManualReviewJobEnqueueSource,
11} from '../manualReviewToolService.js';
12import { type ManualReviewDecisionType } from './JobDecisioning.js';
13
/**
 * Filters accepted by `DecisionAnalytics.getRecentDecisions`.
 *
 * Every field except `page` is optional; leaving a field out applies no
 * restriction on that dimension.
 */
export type RecentDecisionsFilterInput = {
  /** Zero-based page index; `getRecentDecisions` returns 100 rows per page. */
  page: number;
  startTime?: Date;
  endTime?: Date;
  /** Matched exactly against the decided item's ID (last 3 months only). */
  userSearchString?: string;
  queueIds?: readonly string[];
  reviewerIds?: readonly string[];
  policyIds?: readonly string[];
  /**
   * Decision kinds to match (a decision matches if any entry matches).
   * Action-bearing kinds carry the action IDs to look for; all other kinds
   * carry `actionIds: undefined`.
   */
  decisions?: ReadonlyArray<
    | {
        type: 'CUSTOM_ACTION' | 'RELATED_ACTION';
        actionIds: readonly string[];
      }
    | {
        type: Exclude<
          ManualReviewDecisionType,
          'CUSTOM_ACTION' | 'RELATED_ACTION'
        >;
        actionIds: undefined;
      }
  >;
};
36
/**
 * Read-only analytics queries over manual review tool (MRT) decisions and
 * job creations. Every public method scopes its query to a single org via
 * `orgId`.
 *
 * Convention shared by all filters in this class: an empty (or omitted)
 * filter list means "no filter on that dimension".
 */
export default class DecisionAnalytics {
  constructor(private readonly pgQuery: Kysely<ManualReviewToolServicePg>) {}

  /**
   * Counts distinct decided items — distinct `(item_id, item_type_id)` pairs —
   * per `DATE_TRUNC(timeDivision, decided_at, timeZone)` bucket, split by each
   * column listed in `input.groupBy`.
   *
   * When grouping by `type`, rows are additionally split by `action_id`. When
   * grouping by `policy_id`, only `CUSTOM_ACTION` / `RELATED_ACTION` decisions
   * are counted.
   */
  async getDecisionCounts(input: DecisionCountsInput) {
    const { orgId, groupBy, filterBy, timeDivision, timeZone } = input;
    const { ref } = this.pgQuery.dynamic;
    return this.pgQuery
      .selectFrom('manual_review_tool.dim_mrt_decisions_materialized')
      .select([
        sql<string>`DATE_TRUNC(${timeDivision}, decided_at, ${timeZone})`.as(
          'time',
        ),
        sql<number>`COUNT(DISTINCT (item_id, item_type_id))`.as('count'),
      ])
      .$if(groupBy.includes('action_id'), (qb) => qb.select('action_id'))
      .$if(groupBy.includes('queue_id'), (qb) => qb.select('queue_id'))
      .$if(groupBy.includes('type'), (qb) => qb.select(['action_id', 'type']))
      .$if(groupBy.includes('reviewer_id'), (qb) => qb.select('reviewer_id'))
      .$if(groupBy.includes('policy_id'), (qb) => qb.select('policy_id'))
      .where((eb) => {
        return eb.and([
          eb('org_id', '=', orgId),
          // The window bounds are compared in the caller's time zone, the same
          // zone used for bucketing above.
          eb(
            sql`decided_at AT TIME ZONE ${timeZone}`,
            '>=',
            filterBy.startDate,
          ),
          eb(sql`decided_at AT TIME ZONE ${timeZone}`, '<=', filterBy.endDate),
          // Action IDs and decision types are OR-ed: a row matching either
          // filter is kept.
          ...(filterBy.actionIds.length > 0 || filterBy.type.length > 0
            ? [
                eb.or([
                  ...(filterBy.actionIds.length > 0
                    ? [eb('action_id', 'in', filterBy.actionIds)]
                    : []),
                  ...(filterBy.type.length > 0
                    ? [eb('type', 'in', filterBy.type)]
                    : []),
                ]),
              ]
            : []),
          ...(filterBy.itemTypeIds.length > 0
            ? [eb('item_type_id', 'in', filterBy.itemTypeIds)]
            : []),
          ...(filterBy.policyIds.length > 0
            ? [eb('policy_id', 'in', filterBy.policyIds)]
            : []),
          ...(filterBy.reviewerIds.length > 0
            ? [eb('reviewer_id', 'in', filterBy.reviewerIds)]
            : []),
          ...(filterBy.queueIds.length > 0
            ? [eb('queue_id', 'in', filterBy.queueIds)]
            : []),
          // Ignores, NCMEC reports and re-enqueueing don't make any sense when
          // grouping by policy ID
          ...(groupBy.includes('policy_id')
            ? [eb('type', 'in', ['CUSTOM_ACTION', 'RELATED_ACTION'])]
            : []),
          // Drop the requested action decision types. A single clause suffices
          // for any non-empty list of them.
          ...(filterBy.filteredDecisionActionType !== undefined &&
          filterBy.filteredDecisionActionType.length > 0
            ? [eb('type', 'not in', filterBy.filteredDecisionActionType)]
            : []),
        ]);
      })
      .groupBy([
        ...groupBy.map((it) => ref(it as string)),
        'time',
        // `type` rows are also split by action (see the `type` select above).
        ...(groupBy.includes('type') ? [ref('action_id')] : []),
      ])
      .execute();
  }

  /**
   * Counts distinct decided items per (`action_id`, `type`) pair, further
   * split by the single column named in `input.groupBy`, over the
   * `[startDate, endDate]` window evaluated in `timeZone`.
   */
  async getDecisionCountsTable(input: DecisionCountsTableInput) {
    const { orgId, groupBy, filterBy, timeZone } = input;
    return this.pgQuery
      .selectFrom('manual_review_tool.dim_mrt_decisions_materialized')
      .select([
        sql<number>`COUNT(DISTINCT (item_id, item_type_id))`.as('count'),
        'action_id',
        'type',
      ])
      // `groupBy` is one column name (not an array), so compare it exactly
      // rather than with the substring check `String#includes` would do.
      .$if(groupBy === 'queue_id', (qb) => qb.select('queue_id'))
      .$if(groupBy === 'reviewer_id', (qb) => qb.select('reviewer_id'))
      .where((eb) => {
        return eb.and([
          eb('org_id', '=', orgId),
          eb(
            sql`decided_at AT TIME ZONE ${timeZone}`,
            '>=',
            filterBy.startDate,
          ),
          eb(sql`decided_at AT TIME ZONE ${timeZone}`, '<=', filterBy.endDate),
          ...(filterBy.reviewerIds.length > 0
            ? [eb('reviewer_id', 'in', filterBy.reviewerIds)]
            : []),
          ...(filterBy.queueIds.length > 0
            ? [eb('queue_id', 'in', filterBy.queueIds)]
            : []),
        ]);
      })
      .groupBy([groupBy, 'action_id', 'type'])
      .execute();
  }

  /**
   * Average time, in seconds, between a job's creation and its decision, for
   * jobs created in `[startDate, endDate]`. Jobs are joined to decisions on
   * `creations.id = decisions.job_payload->>'id'`.
   *
   * NOTE(review): `filterBy.itemTypeIds` is not applied by this query, and
   * grouping by `item_type_id` resolves to `decisions.item_type_id`. Other
   * queries in this class read the item type out of `job_payload`, so confirm
   * that column exists on `manual_review_decisions`.
   */
  async getTimeToAction(input: TimeToActionInput) {
    const { orgId, groupBy, filterBy } = input;
    const { ref } = this.pgQuery.dynamic;
    return this.pgQuery
      .selectFrom('manual_review_tool.job_creations as creations')
      .innerJoin(
        'manual_review_tool.manual_review_decisions as decisions',
        (join) =>
          join.on((eb) =>
            eb(
              'creations.id',
              '=',
              eb.ref('decisions.job_payload', '->>').key('id'),
            ),
          ),
      )
      .select(({ fn, val }) =>
        // EPOCH of the averaged interval is that interval in seconds.
        fn<number | null>('date_part', [
          val('EPOCH'),
          fn.avg<number | null>(({ eb, ref }) =>
            eb('decisions.created_at', '-', ref('creations.created_at')),
          ),
        ]).as('time_to_action'),
      )
      .$if(groupBy.includes('queue_id'), (qb) =>
        qb.select('decisions.queue_id as queue_id'),
      )
      // Select the reviewer when grouping by it; otherwise each per-reviewer
      // row would come back with no way to tell which reviewer it belongs to.
      .$if(groupBy.includes('reviewer_id'), (qb) =>
        qb.select('decisions.reviewer_id as reviewer_id'),
      )
      .where((eb) => {
        return eb.and([
          eb('creations.org_id', '=', orgId),
          eb('creations.created_at', '>=', filterBy.startDate),
          eb('creations.created_at', '<=', filterBy.endDate),
          ...(filterBy.queueIds.length > 0
            ? [eb('decisions.queue_id', 'in', filterBy.queueIds)]
            : []),
        ]);
      })
      .$if(groupBy.length > 0, (qb) =>
        qb.groupBy(groupBy.map((it) => ref(`decisions.${it as string}`))),
      )
      .execute();
  }

  /**
   * Counts distinct enqueued items per `date_trunc(timeDivision, created_at,
   * timeZone)` bucket, split by `input.groupBy`. Grouping by `source` selects
   * `source_kind` (as `source`) together with `rule_id`, and groups by both.
   */
  async getJobCreations(input: JobCreationsInput) {
    const { groupBy, filterBy, orgId, timeDivision, timeZone } = input;

    const { ref } = this.pgQuery.dynamic;
    return this.pgQuery
      .selectFrom('manual_review_tool.flattened_job_creations as creations')
      .select([
        sql<string>`date_trunc(${timeDivision}, created_at, ${timeZone})`.as(
          'time',
        ),
        sql<number>`COUNT(DISTINCT(item_id, item_type_id))`.as('count'),
      ])
      .$if(groupBy.includes('item_type_id'), (qb) => qb.select('item_type_id'))
      .$if(groupBy.includes('queue_id'), (qb) => qb.select('queue_id'))
      .$if(groupBy.includes('policy_id'), (qb) => qb.select('policy_id'))
      .$if(groupBy.includes('source'), (qb) =>
        qb.select(['source_kind as source', 'rule_id']),
      )
      .where((eb) => {
        return eb.and([
          eb('org_id', '=', orgId),
          eb('creations.created_at', '>=', filterBy.startDate),
          eb('creations.created_at', '<=', filterBy.endDate),
          ...(filterBy.itemTypeIds.length > 0
            ? [eb('item_type_id', 'in', filterBy.itemTypeIds)]
            : []),
          ...(filterBy.queueIds.length > 0
            ? [eb('queue_id', 'in', filterBy.queueIds)]
            : []),
          ...(filterBy.policyIds.length > 0
            ? [eb('policy_id', 'in', filterBy.policyIds)]
            : []),
          ...(filterBy.sources.length > 0
            ? [eb('source_kind', 'in', filterBy.sources)]
            : []),
          ...(filterBy.ruleIds.length > 0
            ? [eb('rule_id', 'in', filterBy.ruleIds)]
            : []),
        ]);
      })
      .groupBy([
        ...groupBy.map((it) => ref(it as string)),
        'time',
        ...(groupBy.includes('source') ? [ref('rule_id')] : []),
      ])
      .execute();
  }

  /**
   * Lists decisions newest-first, 100 per page, with optional filters.
   *
   * Omitted or empty filter lists apply no restriction. Users lacking
   * `VIEW_CHILD_SAFETY_DATA` never see decisions on NCMEC jobs.
   *
   * @param opts.input.page - zero-based page index.
   * @returns decisions in API shape; `CUSTOM_ACTION` components are expanded
   *   with flat `actionIds` / `policyIds` lists.
   */
  async getRecentDecisions(opts: {
    userPermissions: UserPermission[];
    orgId: string;
    input: RecentDecisionsFilterInput;
  }) {
    const { userPermissions, orgId, input } = opts;
    const {
      userSearchString,
      decisions: decisionsFilter,
      policyIds,
      reviewerIds,
      queueIds,
      startTime,
      endTime,
      page,
    } = input;
    const limit = 100;
    const decisions = await this.pgQuery
      .selectFrom('manual_review_tool.manual_review_decisions')
      .select([
        'id',
        'queue_id',
        'reviewer_id',
        'decision_components',
        'related_actions',
        'created_at',
        sql<string>`((job_payload->'payload'::text)->'item'::text) -> 'itemId'::text`.as(
          'item_id',
        ),
        sql<string>`(((job_payload->'payload'::text)->'item'::text) -> 'itemTypeIdentifier'::text) ->> 'id'::text`.as(
          'item_type_id',
        ),
        'decision_reason',
        sql<string>`(job_payload->>'id')::text`.as('job_id'),
      ])
      .where('org_id', '=', orgId)
      .where(({ eb, selectFrom }) => {
        // Fresh subquery over the elements of this decision's
        // `decision_components` array; wrapped in EXISTS below to ask
        // "does any component match?".
        const anyDecisionComponent = () =>
          selectFrom(
            sql`unnest(manual_review_tool.manual_review_decisions.decision_components)`.as(
              'decision_component',
            ),
          ).selectAll();

        return eb.and([
          ...(startTime ? [eb('created_at', '>=', new Date(startTime))] : []),
          ...(endTime ? [eb('created_at', '<=', new Date(endTime))] : []),
          ...(queueIds && queueIds.length > 0
            ? [eb('queue_id', 'in', queueIds)]
            : []),
          ...(reviewerIds && reviewerIds.length > 0
            ? [eb('reviewer_id', 'in', reviewerIds)]
            : []),
          // Guard on length: Kysely renders `eb.or([])` as a predicate that
          // matches nothing, which would hide every decision instead of
          // applying no filter.
          ...(policyIds && policyIds.length > 0
            ? [
                eb.exists(
                  anyDecisionComponent().where(
                    eb.or(
                      policyIds.map((policyId) =>
                        eb(
                          sql<string>`decision_component->>'policies'`,
                          'like',
                          `%"${policyId}"%`,
                        ),
                      ),
                    ),
                  ),
                ),
              ]
            : []),
          ...(decisionsFilter && decisionsFilter.length > 0
            ? [
                eb.or(
                  decisionsFilter.map(({ type, actionIds }) => {
                    const componentsOfType = anyDecisionComponent().where(
                      sql<string>`decision_component->>'type'`,
                      '=',
                      type,
                    );
                    // Action-bearing decision types additionally require one
                    // of the requested actions to appear on the component.
                    return eb.exists(
                      actionIds === undefined
                        ? componentsOfType
                        : componentsOfType.where(
                            eb.or(
                              actionIds.map((actionId) =>
                                eb(
                                  sql<string>`decision_component->>'actions'`,
                                  'like',
                                  `%"${actionId}"%`,
                                ),
                              ),
                            ),
                          ),
                    );
                  }),
                ),
              ]
            : []),
        ]);
      })
      .$if(userSearchString !== undefined, (qb) =>
        // See https://stackoverflow.com/a/55607847
        // Item search is limited to decisions from the last 3 months.
        qb.where(({ and, eb, val }) =>
          and([
            eb('created_at', '>', val(new Date(Date.now() - 3 * MONTH_MS))),
            eb(
              sql<string>`(manual_review_tool.manual_review_decisions.job_payload->'payload'->'item'->>'itemId')`,
              '=',
              // `->>` extracts `itemId` as plain (unquoted) text, so the
              // search string is compared as-is, without adding JSON quotes.
              val(`${userSearchString}`),
            ),
          ]),
        ),
      )
      // If the user doesn't have the VIEW_CHILD_SAFETY_DATA permission, filter
      // out decisions on all NCMEC jobs. `kind` is compared as JSON text, hence
      // the embedded quotes. Rows with no `kind` compare as NULL and are
      // excluded as well (fail-closed).
      .$if(
        !userPermissions.includes(UserPermission.VIEW_CHILD_SAFETY_DATA),
        (qb) =>
          qb.where(({ eb, val }) =>
            eb(
              sql<string>`(job_payload->'payload'->'kind')::text`,
              '!=',
              val('"NCMEC"'),
            ),
          ),
      )
      .orderBy('created_at', 'desc')
      .limit(limit)
      .offset(page * limit)
      .execute();
    return decisions.map((decision) => ({
      id: decision.id,
      itemId: decision.item_id,
      itemTypeId: decision.item_type_id,
      queueId: decision.queue_id,
      reviewerId: decision.reviewer_id,
      decisions: decision.decision_components.map((component) => {
        if (component.type !== 'CUSTOM_ACTION') {
          return component;
        }
        return {
          ...component,
          actionIds: component.actions.map((action) => action.id),
          policyIds: component.policies.map((policy) => policy.id),
          itemTypeId: component.itemTypeId,
        };
      }),
      relatedActions: decision.related_actions.map((action) => ({
        ...action,
        type: 'RELATED_ACTION' as const,
      })),
      createdAt: decision.created_at,
      decisionReason: decision.decision_reason,
      jobId: decision.job_id,
    }));
  }

  /**
   * Counts decisions (distinct decision `id`s) per
   * `date_trunc(timeDivision, created_at, timeZone)` bucket, split by
   * `input.groupBy`. The `[startDate, endDate]` window is compared in
   * `timeZone`.
   */
  async getResolvedJobCounts(input: JobCountsInput) {
    const { orgId, groupBy, filterBy, timeDivision, timeZone } = input;
    const { queueIds, reviewerIds, startDate, endDate } = filterBy;
    const { ref } = this.pgQuery.dynamic;
    return this.pgQuery
      .selectFrom('manual_review_tool.manual_review_decisions')
      .select([
        sql<string>`date_trunc(${timeDivision}, created_at, ${timeZone})`.as(
          'time',
        ),
        sql<number>`COUNT(DISTINCT id)`.as('count'),
      ])
      .$if(groupBy.includes('queue_id'), (qb) => qb.select('queue_id'))
      .$if(groupBy.includes('reviewer_id'), (qb) => qb.select('reviewer_id'))
      .where('org_id', '=', orgId)
      .where((eb) => {
        return eb.and([
          eb(sql`created_at AT TIME ZONE ${timeZone}`, '>=', startDate),
          eb(sql`created_at AT TIME ZONE ${timeZone}`, '<=', endDate),
        ]);
      })
      .where(({ eb }) => {
        return eb.and([
          ...(queueIds.length > 0 ? [eb('queue_id', 'in', queueIds)] : []),
          ...(reviewerIds.length > 0
            ? [eb('reviewer_id', 'in', reviewerIds)]
            : []),
        ]);
      })
      .groupBy(['time', ...groupBy.map((it) => ref(it as string))])
      .execute();
  }

  /**
   * Returns the stored job payload for decision `id`, or `null` when no such
   * decision exists (searching only decisions created on/after 2023-10-01).
   *
   * NOTE(review): unlike `getDecidedJobFromJobId`, this method takes no user
   * permissions and does not hide NCMEC jobs; presumably callers enforce that
   * — confirm.
   */
  async getDecidedJob(opts: { orgId: string; id: string }) {
    const { orgId, id } = opts;
    const payload = await this.pgQuery
      .selectFrom('manual_review_tool.manual_review_decisions')
      .select(['job_payload'])
      .where('created_at', '>=', new Date('2023-10-01'))
      .where('org_id', '=', orgId)
      .where('id', '=', id)
      .executeTakeFirst();
    // This is safe because only jobs created before Sept 2023 have
    // the old legacy schema defined in StoredManualReviewJob (see
    // the comment associated with the StoredManualReviewJob type).
    // This query has a hardcoded filter to not include those old jobs.
    return (payload?.job_payload ?? null) as ManualReviewJob | null;
  }

  /**
   * Looks up the decision made on job `jobId` and returns both the stored
   * job and the decision in API shape, or `null` if not found. Only decisions
   * created on/after 2023-10-01 are searched. NCMEC jobs are hidden from
   * users lacking `VIEW_CHILD_SAFETY_DATA`.
   */
  async getDecidedJobFromJobId(opts: {
    orgId: string;
    jobId: string;
    userPermissions: UserPermission[];
  }) {
    const { orgId, jobId, userPermissions } = opts;
    const decisionWithPayload = await this.pgQuery
      .selectFrom('manual_review_tool.manual_review_decisions')
      .select([
        'job_payload',
        'id',
        'queue_id',
        'reviewer_id',
        'decision_components',
        'related_actions',
        'created_at',
        sql<string>`((job_payload->'payload'::text)->'item'::text) -> 'itemId'::text`.as(
          'item_id',
        ),
        sql<string>`(((job_payload->'payload'::text)->'item'::text) -> 'itemTypeIdentifier'::text) ->> 'id'::text`.as(
          'item_type_id',
        ),
        sql<string>`(job_payload->>'id')::text`.as('job_id'),
      ])
      .where('created_at', '>=', new Date('2023-10-01'))
      .where('org_id', '=', orgId)
      .where(sql<string>`(job_payload->>'id')::text`, '=', jobId)
      // Same NCMEC gating as getRecentDecisions (NULL `kind` also excluded).
      .$if(
        !userPermissions.includes(UserPermission.VIEW_CHILD_SAFETY_DATA),
        (qb) =>
          qb.where(({ eb, val }) =>
            eb(
              sql<string>`(job_payload->'payload'->'kind')::text`,
              '!=',
              val('"NCMEC"'),
            ),
          ),
      )
      .executeTakeFirst();
    if (!decisionWithPayload) {
      return null;
    }
    return {
      // This is safe because only jobs created before Sept 2023 have
      // the old legacy schema defined in StoredManualReviewJob (see
      // the comment associated with the StoredManualReviewJob type).
      // This query has a hardcoded filter to not include those old jobs.
      job: decisionWithPayload.job_payload as ManualReviewJob,
      decision: {
        id: decisionWithPayload.id,
        itemId: decisionWithPayload.item_id,
        itemTypeId: decisionWithPayload.item_type_id,
        queueId: decisionWithPayload.queue_id,
        reviewerId: decisionWithPayload.reviewer_id,
        decisions: decisionWithPayload.decision_components.map((component) => {
          if (component.type !== 'CUSTOM_ACTION') {
            return component;
          }
          return {
            ...component,
            actionIds: component.actions.map((action) => action.id),
            policyIds: component.policies.map((policy) => policy.id),
            itemTypeId: component.itemTypeId,
          };
        }),
        relatedActions: decisionWithPayload.related_actions.map((action) => ({
          ...action,
          type: 'RELATED_ACTION' as const,
        })),
        createdAt: decisionWithPayload.created_at,
        jobId: decisionWithPayload.job_id,
      },
    };
  }
}
526
/**
 * Bucket granularity for the time-series analytics queries.
 *
 * Each value is passed verbatim as the `field` argument of PostgreSQL's
 * `DATE_TRUNC()`, so it must be one of the field names documented at
 * https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC
 * (microseconds, milliseconds, second, minute, hour, day, week, month,
 * quarter, year, decade, century, millennium).
 */
type DecisionAnalyticsTimeDivisionOptions = 'HOUR' | 'DAY';
536
/**
 * Input for `DecisionAnalytics.getTimeToAction`.
 *
 * NOTE(review): `filterBy.itemTypeIds` is not read by `getTimeToAction` in
 * this file — confirm whether it should filter results.
 */
export type TimeToActionInput = ReadonlyDeep<{
  orgId: string;
  /** Columns to split the average by; empty means one overall average. */
  groupBy: ('queue_id' | 'reviewer_id' | 'item_type_id')[];
  filterBy: {
    /** Window applied to job creation time. */
    startDate: Date;
    endDate: Date;
    /** Empty list applies no queue filter. */
    queueIds: string[];
    itemTypeIds: string[];
  };
}>;
547
/**
 * Input for `DecisionAnalytics.getJobCreations`. Every list in `filterBy`
 * applies no restriction when empty.
 */
export type JobCreationsInput = ReadonlyDeep<{
  orgId: string;
  timeZone: string;
  timeDivision: DecisionAnalyticsTimeDivisionOptions;
  groupBy: ('queue_id' | 'item_type_id' | 'policy_id' | 'source')[];
  filterBy: {
    startDate: Date;
    endDate: Date;
    queueIds: string[];
    itemTypeIds: string[];
    policyIds: string[];
    sources: ManualReviewJobEnqueueSource[];
    ruleIds: string[];
  };
}>;
563
/**
 * Input for `DecisionAnalytics.getDecisionCounts`. Empty lists in
 * `filterBy` apply no restriction; `actionIds` and `type` are OR-ed together,
 * and `filteredDecisionActionType` lists decision types to exclude.
 *
 * NOTE(review): `groupBy` applies `Omit` to a union of column-name strings.
 * `Omit` is designed for object types; on a string union it produces an
 * object type derived from `string`'s members rather than a narrowed union.
 * So it does not actually exclude `'action_id'` / `'ds'`, which is presumably
 * why `getDecisionCounts` casts elements with `as string`. `Exclude` was likely
 * intended — confirm before changing, because `getDecisionCounts` checks
 * `groupBy.includes('action_id')`.
 */
export type DecisionCountsInput = ReadonlyDeep<{
  orgId: string;
  timeZone: string;
  timeDivision: DecisionAnalyticsTimeDivisionOptions;
  groupBy: Array<
    Omit<
      keyof ManualReviewToolServicePg['manual_review_tool.dim_mrt_decisions_materialized'],
      'action_id' | 'ds'
    >
  >;
  filterBy: {
    startDate: Date;
    endDate: Date;
    type: ManualReviewDecisionType[];
    actionIds: string[];
    policyIds: string[];
    itemTypeIds: string[];
    queueIds: string[];
    reviewerIds: string[];
    filteredDecisionActionType?: Array<'CUSTOM_ACTION' | 'RELATED_ACTION'>;
  };
}>;
584
/**
 * Input for `DecisionAnalytics.getDecisionCountsTable`. `groupBy` names the
 * single extra column to split counts by; empty ID lists apply no filter.
 */
export type DecisionCountsTableInput = ReadonlyDeep<{
  orgId: string;
  timeZone: string;
  groupBy: 'queue_id' | 'reviewer_id';
  filterBy: {
    startDate: Date;
    endDate: Date;
    reviewerIds: string[];
    queueIds: string[];
  };
}>;
596
/**
 * Input for `DecisionAnalytics.getResolvedJobCounts`. The date window is
 * compared in `timeZone`; empty ID lists apply no filter.
 */
export type JobCountsInput = ReadonlyDeep<{
  orgId: string;
  timeZone: string;
  timeDivision: DecisionAnalyticsTimeDivisionOptions;
  groupBy: ('queue_id' | 'reviewer_id')[];
  filterBy: {
    queueIds: string[];
    reviewerIds: string[];
    startDate: Date;
    endDate: Date;
  };
}>;