Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 557ff54b2b435e5f1e789c6a8a4e1bebf2d7deb6 608 lines 21 kB view raw
/* eslint-disable max-lines */
import { sql, type Kysely } from 'kysely';
import { type ReadonlyDeep } from 'type-fest';

import { UserPermission } from '../../../models/types/permissioning.js';
import { MONTH_MS } from '../../../utils/time.js';
import { type ManualReviewToolServicePg } from '../dbTypes.js';
import {
  type ManualReviewJob,
  type ManualReviewJobEnqueueSource,
} from '../manualReviewToolService.js';
import { type ManualReviewDecisionType } from './JobDecisioning.js';

/** Number of rows `getRecentDecisions` returns per page. */
const RECENT_DECISIONS_PAGE_SIZE = 100;

/**
 * Lower bound on `created_at` applied by the single-job lookups
 * (`getDecidedJob`, `getDecidedJobFromJobId`). Rows older than this are
 * excluded so that the stored payload can be treated as a `ManualReviewJob`.
 */
const LEGACY_JOB_SCHEMA_CUTOFF = '2023-10-01';

/**
 * Filters accepted by `DecisionAnalytics.getRecentDecisions`.
 *
 * Every optional filter is skipped when it is omitted or (for list filters)
 * empty. `page` is zero-based: the query offset is `page * pageSize`.
 */
export type RecentDecisionsFilterInput = {
  userSearchString?: string;
  decisions?: readonly (
    | {
        type: Exclude<
          ManualReviewDecisionType,
          'CUSTOM_ACTION' | 'RELATED_ACTION'
        >;
        actionIds: undefined;
      }
    | {
        type: 'CUSTOM_ACTION' | 'RELATED_ACTION';
        actionIds: readonly string[];
      }
  )[];
  policyIds?: readonly string[];
  reviewerIds?: readonly string[];
  queueIds?: readonly string[];
  startTime?: Date;
  endTime?: Date;
  page: number;
};

/**
 * Read-only analytics queries over the `manual_review_tool` schema
 * (decisions, job creations and their materialized/flattened views).
 */
export default class DecisionAnalytics {
  constructor(private readonly pgQuery: Kysely<ManualReviewToolServicePg>) {}

  /**
   * Counts distinct `(item_id, item_type_id)` pairs in
   * `dim_mrt_decisions_materialized`, bucketed by
   * `DATE_TRUNC(timeDivision, decided_at, timeZone)` and additionally grouped
   * by every column named in `groupBy`.
   *
   * `filterBy.actionIds` and `filterBy.type` are combined with OR (a row
   * matches if either list matches); all other filters are ANDed.
   *
   * @returns One row per time bucket / group combination.
   */
  async getDecisionCounts(input: DecisionCountsInput) {
    const { orgId, groupBy, filterBy, timeDivision, timeZone } = input;
    const { ref } = this.pgQuery.dynamic;

    // The original code emitted the same `type NOT IN (...)` predicate once
    // per matching value; it only needs to be emitted once.
    const excludedTypes = filterBy.filteredDecisionActionType;
    const shouldExcludeTypes =
      (excludedTypes?.includes('RELATED_ACTION') ?? false) ||
      (excludedTypes?.includes('CUSTOM_ACTION') ?? false);

    return this.pgQuery
      .selectFrom('manual_review_tool.dim_mrt_decisions_materialized')
      .select([
        sql<string>`DATE_TRUNC(${timeDivision}, decided_at, ${timeZone})`.as(
          'time',
        ),
        sql<number>`COUNT(DISTINCT (item_id, item_type_id))`.as('count'),
      ])
      .$if(groupBy.includes('action_id'), (qb) => qb.select('action_id'))
      .$if(groupBy.includes('queue_id'), (qb) => qb.select('queue_id'))
      .$if(groupBy.includes('type'), (qb) => qb.select(['action_id', 'type']))
      .$if(groupBy.includes('reviewer_id'), (qb) => qb.select('reviewer_id'))
      .$if(groupBy.includes('policy_id'), (qb) => qb.select('policy_id'))
      .where((eb) => {
        return eb.and([
          eb('org_id', '=', orgId),
          eb(
            sql`decided_at AT TIME ZONE ${timeZone}`,
            '>=',
            filterBy.startDate,
          ),
          eb(sql`decided_at AT TIME ZONE ${timeZone}`, '<=', filterBy.endDate),
          ...(filterBy.actionIds.length > 0 || filterBy.type.length > 0
            ? [
                eb.or([
                  ...(filterBy.actionIds.length > 0
                    ? [eb('action_id', 'in', filterBy.actionIds)]
                    : []),
                  ...(filterBy.type.length > 0
                    ? [eb('type', 'in', filterBy.type)]
                    : []),
                ]),
              ]
            : []),
          ...(filterBy.itemTypeIds.length > 0
            ? [eb('item_type_id', 'in', filterBy.itemTypeIds)]
            : []),
          ...(filterBy.policyIds.length > 0
            ? [eb('policy_id', 'in', filterBy.policyIds)]
            : []),
          ...(filterBy.reviewerIds.length > 0
            ? [eb('reviewer_id', 'in', filterBy.reviewerIds)]
            : []),
          ...(filterBy.queueIds.length > 0
            ? [eb('queue_id', 'in', filterBy.queueIds)]
            : []),
          // Ignores, NCMEC reports and re-enqueueing don't make any sense when
          // grouping by policy ID
          ...(groupBy.includes('policy_id')
            ? [eb('type', 'in', ['CUSTOM_ACTION', 'RELATED_ACTION'])]
            : []),
          ...(excludedTypes !== undefined && shouldExcludeTypes
            ? [eb('type', 'not in', excludedTypes)]
            : []),
        ]);
      })
      .groupBy([
        ...groupBy.map((it) => ref(it as string)),
        'time',
        // Grouping by 'type' also selects action_id (see above), so it has to
        // appear in GROUP BY as well.
        ...(groupBy.includes('type') ? [ref('action_id')] : []),
      ])
      .execute();
  }

  /**
   * Counts distinct `(item_id, item_type_id)` pairs in
   * `dim_mrt_decisions_materialized` per `(groupBy, action_id, type)` within
   * the given date range, without any time bucketing.
   */
  async getDecisionCountsTable(input: DecisionCountsTableInput) {
    const { orgId, groupBy, filterBy, timeZone } = input;
    return this.pgQuery
      .selectFrom('manual_review_tool.dim_mrt_decisions_materialized')
      .select([
        sql<number>`COUNT(DISTINCT (item_id, item_type_id))`.as('count'),
        'action_id',
        'type',
      ])
      // `groupBy` is a single column name here (not a list), so compare it
      // directly instead of doing a substring test with String#includes.
      .$if(groupBy === 'queue_id', (qb) => qb.select('queue_id'))
      .$if(groupBy === 'reviewer_id', (qb) => qb.select('reviewer_id'))
      .where((eb) => {
        return eb.and([
          eb('org_id', '=', orgId),
          eb(
            sql`decided_at AT TIME ZONE ${timeZone}`,
            '>=',
            filterBy.startDate,
          ),
          eb(sql`decided_at AT TIME ZONE ${timeZone}`, '<=', filterBy.endDate),
          ...(filterBy.reviewerIds.length > 0
            ? [eb('reviewer_id', 'in', filterBy.reviewerIds)]
            : []),
          ...(filterBy.queueIds.length > 0
            ? [eb('queue_id', 'in', filterBy.queueIds)]
            : []),
        ]);
      })
      .groupBy([groupBy, 'action_id', 'type'])
      .execute();
  }

  /**
   * Computes the average time, in seconds, between a job's creation row and
   * its decision row (`date_part('EPOCH', avg(decision - creation))`),
   * joining `job_creations` to `manual_review_decisions` on the job id stored
   * in `job_payload->>'id'`.
   *
   * The date range is applied to the creation timestamp.
   *
   * NOTE(review): `filterBy.itemTypeIds` is accepted by `TimeToActionInput`
   * but is not applied by this query, and only `queue_id` is added to the
   * select list when grouping — confirm whether that is intended.
   */
  async getTimeToAction(input: TimeToActionInput) {
    const { orgId, groupBy, filterBy } = input;
    // Aliased so it is not shadowed by the expression builder's `ref` below.
    const { ref: dynamicRef } = this.pgQuery.dynamic;
    return this.pgQuery
      .selectFrom('manual_review_tool.job_creations as creations')
      .innerJoin(
        'manual_review_tool.manual_review_decisions as decisions',
        (join) =>
          join.on((eb) =>
            eb(
              'creations.id',
              '=',
              eb.ref('decisions.job_payload', '->>').key('id'),
            ),
          ),
      )
      .select(({ fn, val }) =>
        fn<number | null>('date_part', [
          val('EPOCH'),
          fn.avg<number | null>(({ eb, ref }) =>
            eb('decisions.created_at', '-', ref('creations.created_at')),
          ),
        ]).as('time_to_action'),
      )
      .$if(groupBy.includes('queue_id'), (qb) =>
        qb.select('decisions.queue_id as queue_id'),
      )
      .where((eb) => {
        return eb.and([
          eb('creations.org_id', '=', orgId),
          eb('creations.created_at', '>=', filterBy.startDate),
          eb('creations.created_at', '<=', filterBy.endDate),
          ...(filterBy.queueIds.length > 0
            ? [eb('decisions.queue_id', 'in', filterBy.queueIds)]
            : []),
        ]);
      })
      .$if(groupBy.length > 0, (qb) =>
        qb.groupBy(groupBy.map((it) => dynamicRef(`decisions.${it as string}`))),
      )
      .execute();
  }

  /**
   * Counts distinct `(item_id, item_type_id)` pairs in
   * `flattened_job_creations`, bucketed by
   * `date_trunc(timeDivision, created_at, timeZone)` and grouped by the
   * requested columns. Grouping by `'source'` selects `source_kind` (aliased
   * to `source`) together with `rule_id`.
   */
  async getJobCreations(input: JobCreationsInput) {
    const { groupBy, filterBy, orgId, timeDivision, timeZone } = input;

    const { ref } = this.pgQuery.dynamic;
    return this.pgQuery
      .selectFrom('manual_review_tool.flattened_job_creations as creations')
      .select([
        sql<string>`date_trunc(${timeDivision}, created_at, ${timeZone})`.as(
          'time',
        ),
        sql<number>`COUNT(DISTINCT(item_id, item_type_id))`.as('count'),
      ])
      .$if(groupBy.includes('item_type_id'), (qb) => qb.select('item_type_id'))
      .$if(groupBy.includes('queue_id'), (qb) => qb.select('queue_id'))
      .$if(groupBy.includes('policy_id'), (qb) => qb.select('policy_id'))
      .$if(groupBy.includes('source'), (qb) =>
        qb.select(['source_kind as source', 'rule_id']),
      )
      .where((eb) => {
        return eb.and([
          eb('org_id', '=', orgId),
          eb('creations.created_at', '>=', filterBy.startDate),
          eb('creations.created_at', '<=', filterBy.endDate),
          ...(filterBy.itemTypeIds.length > 0
            ? [eb('item_type_id', 'in', filterBy.itemTypeIds)]
            : []),
          ...(filterBy.queueIds.length > 0
            ? [eb('queue_id', 'in', filterBy.queueIds)]
            : []),
          ...(filterBy.policyIds.length > 0
            ? [eb('policy_id', 'in', filterBy.policyIds)]
            : []),
          ...(filterBy.sources.length > 0
            ? [eb('source_kind', 'in', filterBy.sources)]
            : []),
          ...(filterBy.ruleIds.length > 0
            ? [eb('rule_id', 'in', filterBy.ruleIds)]
            : []),
        ]);
      })
      .groupBy([
        ...groupBy.map((it) => ref(it as string)),
        'time',
        // 'source' also selects rule_id, so it must be grouped on too.
        ...(groupBy.includes('source') ? [ref('rule_id')] : []),
      ])
      .execute();
  }

  /**
   * Returns one page of an org's decisions, newest first.
   *
   * - List filters (`queueIds`, `reviewerIds`, `policyIds`, `decisions`) are
   *   skipped when omitted or empty.
   * - `userSearchString` is matched exactly against the job payload's
   *   `itemId`, and restricts the search to decisions newer than
   *   `3 * MONTH_MS` before now.
   * - Callers lacking `VIEW_CHILD_SAFETY_DATA` never see decisions whose job
   *   payload kind is `NCMEC`.
   *
   * @returns Up to `RECENT_DECISIONS_PAGE_SIZE` decisions, with custom-action
   *   components augmented with `actionIds` / `policyIds`.
   */
  async getRecentDecisions(opts: {
    userPermissions: UserPermission[];
    orgId: string;
    input: RecentDecisionsFilterInput;
  }) {
    const { userPermissions, orgId, input } = opts;
    const {
      userSearchString,
      decisions: decisionsFilter,
      policyIds,
      reviewerIds,
      queueIds,
      startTime,
      endTime,
      page,
    } = input;
    const limit = RECENT_DECISIONS_PAGE_SIZE;
    const decisions = await this.pgQuery
      .selectFrom('manual_review_tool.manual_review_decisions')
      .select([
        'id',
        'queue_id',
        'reviewer_id',
        'decision_components',
        'related_actions',
        'created_at',
        sql<string>`((job_payload->'payload'::text)->'item'::text) -> 'itemId'::text`.as(
          'item_id',
        ),
        sql<string>`(((job_payload->'payload'::text)->'item'::text) -> 'itemTypeIdentifier'::text) ->> 'id'::text`.as(
          'item_type_id',
        ),
        'decision_reason',
        sql<string>`(job_payload->>'id')::text`.as('job_id'),
      ])
      .where('org_id', '=', orgId)
      .where(({ eb, selectFrom }) => {
        return eb.and([
          ...(startTime ? [eb('created_at', '>=', new Date(startTime))] : []),
          ...(endTime ? [eb('created_at', '<=', new Date(endTime))] : []),
          ...(queueIds && queueIds.length > 0
            ? [eb('queue_id', 'in', queueIds)]
            : []),
          ...(reviewerIds && reviewerIds.length > 0
            ? [eb('reviewer_id', 'in', reviewerIds)]
            : []),
          // An empty list must be skipped like the filters above: `eb.or([])`
          // renders a `false` predicate, which would hide every decision.
          ...(policyIds && policyIds.length > 0
            ? [
                eb.exists(
                  selectFrom(
                    sql`unnest(manual_review_tool.manual_review_decisions.decision_components)`.as(
                      'decision_component',
                    ),
                  )
                    .selectAll()
                    .where(
                      eb.or(
                        // Substring match on the serialized policies JSON.
                        // NOTE(review): `%` / `_` inside an id would act as
                        // LIKE wildcards — confirm ids can never contain them.
                        policyIds.map((policyId) =>
                          eb(
                            sql<string>`decision_component->>'policies'`,
                            'like',
                            `%"${policyId}"%`,
                          ),
                        ),
                      ),
                    ),
                ),
              ]
            : []),
          // Same empty-list guard as for policyIds.
          ...(decisionsFilter && decisionsFilter.length > 0
            ? [
                eb.or(
                  decisionsFilter.map((decisionFilter) => {
                    const { actionIds } = decisionFilter;
                    return eb.exists(
                      selectFrom(
                        sql`unnest(manual_review_tool.manual_review_decisions.decision_components)`.as(
                          'decision_component',
                        ),
                      )
                        .selectAll()
                        .where(
                          sql<string>`decision_component->>'type'`,
                          '=',
                          decisionFilter.type,
                        )
                        // Only action-bearing decision types carry actionIds.
                        // An explicitly empty list keeps the original
                        // behaviour of matching nothing for that entry.
                        .$if(actionIds !== undefined, (qb) =>
                          qb.where(
                            eb.or(
                              (actionIds ?? []).map((actionId) =>
                                eb(
                                  sql<string>`decision_component->>'actions'`,
                                  'like',
                                  `%"${actionId}"%`,
                                ),
                              ),
                            ),
                          ),
                        ),
                    );
                  }),
                ),
              ]
            : []),
        ]);
      })
      .$if(userSearchString !== undefined, (qb) =>
        // See https://stackoverflow.com/a/55607847
        qb.where(({ and, eb, val }) =>
          and([
            eb('created_at', '>', val(new Date(Date.now() - 3 * MONTH_MS))),
            eb(
              sql<string>`(manual_review_tool.manual_review_decisions.job_payload->'payload'->'item'->>'itemId')`,
              '=',
              // `->>` extracts 'itemId' as plain text (no surrounding JSON
              // quotes), so it is compared against the raw search string.
              val(`${userSearchString}`),
            ),
          ]),
        ),
      )
      // If the user doesn't have the VIEW_CHILD_SAFETY_DATA permission, filter out decisions on
      // all NCMEC jobs
      .$if(
        !userPermissions.includes(UserPermission.VIEW_CHILD_SAFETY_DATA),
        (qb) =>
          qb.where(({ eb, val }) =>
            eb(
              // `->` yields jsonb, and casting jsonb to text keeps the JSON
              // quotes, hence the quoted literal on the right-hand side.
              sql<string>`(job_payload->'payload'->'kind')::text`,
              '!=',
              val('"NCMEC"'),
            ),
          ),
      )
      .orderBy('created_at', 'desc')
      .limit(limit)
      .offset(page * limit)
      .execute();
    return decisions.map((decision) => ({
      id: decision.id,
      itemId: decision.item_id,
      itemTypeId: decision.item_type_id,
      queueId: decision.queue_id,
      reviewerId: decision.reviewer_id,
      decisions: decision.decision_components.map((component) => {
        if (component.type !== 'CUSTOM_ACTION') {
          return component;
        }
        return {
          ...component,
          actionIds: component.actions.map((action) => action.id),
          policyIds: component.policies.map((policy) => policy.id),
          itemTypeId: component.itemTypeId,
        };
      }),
      relatedActions: decision.related_actions.map((action) => ({
        ...action,
        type: 'RELATED_ACTION' as const,
      })),
      createdAt: decision.created_at,
      decisionReason: decision.decision_reason,
      jobId: decision.job_id,
    }));
  }

  /**
   * Counts distinct decision ids in `manual_review_decisions`, bucketed by
   * `date_trunc(timeDivision, created_at, timeZone)` and grouped by the
   * requested columns.
   */
  async getResolvedJobCounts(input: JobCountsInput) {
    const { orgId, groupBy, filterBy, timeDivision, timeZone } = input;
    const { queueIds, reviewerIds, startDate, endDate } = filterBy;
    const { ref } = this.pgQuery.dynamic;
    return this.pgQuery
      .selectFrom('manual_review_tool.manual_review_decisions')
      .select([
        sql<string>`date_trunc(${timeDivision}, created_at, ${timeZone})`.as(
          'time',
        ),
        sql<number>`COUNT(DISTINCT id)`.as('count'),
      ])
      .$if(groupBy.includes('queue_id'), (qb) => qb.select('queue_id'))
      .$if(groupBy.includes('reviewer_id'), (qb) => qb.select('reviewer_id'))
      .where('org_id', '=', orgId)
      .where((eb) => {
        return eb.and([
          eb(sql`created_at AT TIME ZONE ${timeZone}`, '>=', startDate),
          eb(sql`created_at AT TIME ZONE ${timeZone}`, '<=', endDate),
        ]);
      })
      .where(({ eb }) => {
        // With both lists empty this is `eb.and([])`, i.e. no extra filter.
        return eb.and([
          ...(queueIds.length > 0 ? [eb('queue_id', 'in', queueIds)] : []),
          ...(reviewerIds.length > 0
            ? [eb('reviewer_id', 'in', reviewerIds)]
            : []),
        ]);
      })
      .groupBy(['time', ...groupBy.map((it) => ref(it as string))])
      .execute();
  }

  /**
   * Loads the stored job payload for a decision by decision `id`.
   *
   * @returns The job payload, or `null` when no matching decision exists on
   *   or after `LEGACY_JOB_SCHEMA_CUTOFF`.
   */
  async getDecidedJob(opts: { orgId: string; id: string }) {
    const { orgId, id } = opts;
    const payload = await this.pgQuery
      .selectFrom('manual_review_tool.manual_review_decisions')
      .select(['job_payload'])
      .where('created_at', '>=', new Date(LEGACY_JOB_SCHEMA_CUTOFF))
      .where('org_id', '=', orgId)
      .where('id', '=', id)
      .executeTakeFirst();
    // This is safe because only jobs created before Sept 2023 have
    // the old legacy schema defined in StoredManualReviewJob (see
    // the comment associated with the StoredManualReviewJob type).
    // This query has a hardcoded filter to not include those old jobs.
    return (payload?.job_payload ?? null) as ManualReviewJob | null;
  }

  /**
   * Loads a decision and its job payload by the job id stored in
   * `job_payload->>'id'`.
   *
   * Callers lacking `VIEW_CHILD_SAFETY_DATA` get `null` for NCMEC jobs, the
   * same as for a job that does not exist.
   *
   * @returns `{ job, decision }`, or `null` when nothing matches.
   */
  async getDecidedJobFromJobId(opts: {
    orgId: string;
    jobId: string;
    userPermissions: UserPermission[];
  }) {
    const { orgId, jobId, userPermissions } = opts;
    const decisionWithPayload = await this.pgQuery
      .selectFrom('manual_review_tool.manual_review_decisions')
      .select([
        'job_payload',
        'id',
        'queue_id',
        'reviewer_id',
        'decision_components',
        'related_actions',
        'created_at',
        sql<string>`((job_payload->'payload'::text)->'item'::text) -> 'itemId'::text`.as(
          'item_id',
        ),
        sql<string>`(((job_payload->'payload'::text)->'item'::text) -> 'itemTypeIdentifier'::text) ->> 'id'::text`.as(
          'item_type_id',
        ),
        sql<string>`(job_payload->>'id')::text`.as('job_id'),
      ])
      .where('created_at', '>=', new Date(LEGACY_JOB_SCHEMA_CUTOFF))
      .where('org_id', '=', orgId)
      .where(sql<string>`(job_payload->>'id')::text`, '=', jobId)
      .$if(
        !userPermissions.includes(UserPermission.VIEW_CHILD_SAFETY_DATA),
        (qb) =>
          qb.where(({ eb, val }) =>
            eb(
              // jsonb cast to text keeps the JSON quotes, hence '"NCMEC"'.
              sql<string>`(job_payload->'payload'->'kind')::text`,
              '!=',
              val('"NCMEC"'),
            ),
          ),
      )
      .executeTakeFirst();
    if (!decisionWithPayload) {
      return null;
    }
    return {
      // This is safe because only jobs created before Sept 2023 have
      // the old legacy schema defined in StoredManualReviewJob (see
      // the comment associated with the StoredManualReviewJob type).
      // This query has a hardcoded filter to not include those old jobs.
      job: decisionWithPayload.job_payload as ManualReviewJob,
      decision: {
        id: decisionWithPayload.id,
        itemId: decisionWithPayload.item_id,
        itemTypeId: decisionWithPayload.item_type_id,
        queueId: decisionWithPayload.queue_id,
        reviewerId: decisionWithPayload.reviewer_id,
        decisions: decisionWithPayload.decision_components.map((component) => {
          if (component.type !== 'CUSTOM_ACTION') {
            return component;
          }
          return {
            ...component,
            actionIds: component.actions.map((action) => action.id),
            policyIds: component.policies.map((policy) => policy.id),
            itemTypeId: component.itemTypeId,
          };
        }),
        relatedActions: decisionWithPayload.related_actions.map((action) => ({
          ...action,
          type: 'RELATED_ACTION' as const,
        })),
        createdAt: decisionWithPayload.created_at,
        jobId: decisionWithPayload.job_id,
      },
    };
  }
}

/**
 * These options are meant to be passed to the Postgres `DATE_TRUNC()`
 * function, so the strings must conform to valid `field`
 * value from the postgres docs:
 * https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC
 * microseconds, milliseconds, second, minute, hour, day, week, month, quarter, year, decade,
 * century, millennium
 */
type DecisionAnalyticsTimeDivisionOptions = 'DAY' | 'HOUR';

/** Input for `DecisionAnalytics.getTimeToAction`. */
export type TimeToActionInput = ReadonlyDeep<{
  orgId: string;
  groupBy: Array<'queue_id' | 'reviewer_id' | 'item_type_id'>;
  filterBy: {
    itemTypeIds: string[];
    queueIds: string[];
    startDate: Date;
    endDate: Date;
  };
}>;

/** Input for `DecisionAnalytics.getJobCreations`. */
export type JobCreationsInput = ReadonlyDeep<{
  orgId: string;
  groupBy: Array<'queue_id' | 'item_type_id' | 'policy_id' | 'source'>;
  timeDivision: DecisionAnalyticsTimeDivisionOptions;
  timeZone: string;
  filterBy: {
    itemTypeIds: string[];
    queueIds: string[];
    policyIds: string[];
    ruleIds: string[];
    sources: ManualReviewJobEnqueueSource[];
    startDate: Date;
    endDate: Date;
  };
}>;

/**
 * Input for `DecisionAnalytics.getDecisionCounts`.
 *
 * NOTE(review): `Omit<keyof T, ...>` removes *properties* from the string
 * type rather than members from the key union, so `groupBy` does not actually
 * exclude `'action_id'` / `'ds'`. `Exclude<keyof T, ...>` is probably what was
 * intended — confirm before changing, since that narrows what callers may
 * pass.
 */
export type DecisionCountsInput = ReadonlyDeep<{
  orgId: string;
  groupBy: Omit<
    keyof ManualReviewToolServicePg['manual_review_tool.dim_mrt_decisions_materialized'],
    'action_id' | 'ds'
  >[];
  timeDivision: DecisionAnalyticsTimeDivisionOptions;
  timeZone: string;
  filterBy: {
    actionIds: string[];
    itemTypeIds: string[];
    policyIds: string[];
    queueIds: string[];
    type: ManualReviewDecisionType[];
    reviewerIds: string[];
    startDate: Date;
    endDate: Date;
    filteredDecisionActionType?: ('CUSTOM_ACTION' | 'RELATED_ACTION')[];
  };
}>;

/** Input for `DecisionAnalytics.getDecisionCountsTable`. */
export type DecisionCountsTableInput = ReadonlyDeep<{
  orgId: string;
  groupBy: 'reviewer_id' | 'queue_id';
  timeZone: string;
  filterBy: {
    queueIds: string[];
    reviewerIds: string[];
    startDate: Date;
    endDate: Date;
  };
}>;

/** Input for `DecisionAnalytics.getResolvedJobCounts`. */
export type JobCountsInput = ReadonlyDeep<{
  orgId: string;
  groupBy: Array<'queue_id' | 'reviewer_id'>;
  timeDivision: DecisionAnalyticsTimeDivisionOptions;
  timeZone: string;
  filterBy: {
    startDate: Date;
    endDate: Date;
    queueIds: string[];
    reviewerIds: string[];
  };
}>;