Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1/* eslint-disable max-lines */
2
3import { type Exception } from '@opentelemetry/api';
4import { makeEnumLike } from '@roostorg/types';
5import { DataSource } from 'apollo-datasource';
6import { AuthenticationError } from 'apollo-server-express';
7import { sql, type Kysely } from 'kysely';
8import Sequelize from 'sequelize';
9import { uid } from 'uid';
10
11import { inject, type Dependencies } from '../../iocContainer/index.js';
12import {
13 isEmptyResultSetError,
14 isUniqueConstraintError,
15} from '../../models/errors.js';
16import { type User } from '../../models/UserModel.js';
17import { type ActionCountsInput } from '../../services/actionStatisticsService/index.js';
18import { type AggregationClause } from '../../services/aggregationsService/index.js';
19import {
20 RuleType,
21 type Condition,
22 type ConditionInput,
23 type ConditionSet,
24 type LeafCondition,
25 type CoopInput,
26 type RuleStatus,
27} from '../../services/moderationConfigService/index.js';
28import {
29 makeRuleHasRunningBacktestsError,
30 makeRuleIsMissingContentTypeError,
31 makeRuleNameExistsError,
32 // TODO: delete the import below when we move the rule mutation logic into the
33 // moderation config service, which is where it should be.
34 // eslint-disable-next-line import/no-restricted-paths
35} from '../../services/moderationConfigService/moderationConfigService.js';
36import {
37 isSignalId,
38 signalIsExternal,
39 type SignalId,
40} from '../../services/signalsService/index.js';
41import { type ConditionSetWithResultAsLogged } from '../../services/analyticsLoggers/index.js';
42import { type DataWarehousePublicSchema } from '../../storage/dataWarehouse/warehouseSchema.js';
43import { toCorrelationId } from '../../utils/correlationIds.js';
44import {
45 jsonParse,
46 jsonStringify,
47 tryJsonParse,
48} from '../../utils/encoding.js';
49import { makeNotFoundError } from '../../utils/errors.js';
50import { assertUnreachable, patchInPlace } from '../../utils/misc.js';
51import { takeLast } from '../../utils/sql.js';
52import {
53 type Mutable,
54 type NonEmptyString,
55 type RequiredWithoutNull,
56} from '../../utils/typescript-types.js';
57import {
58 type GQLAggregationClauseInput,
59 type GQLConditionInput,
60 type GQLConditionInputFieldInput,
61 type GQLConditionSetInput,
62 type GQLCreateContentRuleInput,
63 type GQLCreateUserRuleInput,
64 type GQLRunRetroactionInput,
65 type GQLUpdateContentRuleInput,
66 type GQLUpdateUserRuleInput,
67} from '../generated.js';
68import { oneOfInputToTaggedUnion } from '../utils/inputHelpers.js';
69import { type CursorInfo, type Edge } from '../utils/paginationHandler.js';
70import { locationAreaInputToLocationArea } from './LocationBankApi.js';
71
// Sequelize query operators and transaction helpers, pulled off the default export.
const Op = Sequelize.Op;
const Transaction = Sequelize.Transaction;

// Sort direction for paginated backtest results ('ASC' | 'DESC').
const SortOrder = makeEnumLike(['ASC', 'DESC']);
type SortOrder = (typeof SortOrder)[keyof typeof SortOrder];
75
/**
 * GraphQL-exposed shape of a single rule execution (the node type of the
 * edges returned by `RuleAPI.getBacktestResults`).
 *
 * TODO: make sure the GraphQL schema matches this result type.
 * NOTE(review): `getBacktestResults` only selects `date`, `ts`, `contentId`,
 * `contentType` (aliased from ITEM_TYPE_NAME), `userId`, `content` and
 * `result`, and casts the rows through `any`. The keys `itemTypeName`,
 * `itemTypeId`, `userTypeId`, `environment`, `passed`, `ruleId`, `ruleName`
 * and `tags` are therefore not populated by that query. Confirm which side
 * is authoritative.
 */
export type RuleExecutionResult = {
  date: string;
  ts: string;
  contentId: string;
  itemTypeName: string;
  itemTypeId: string;
  userId?: string;
  userTypeId?: string;
  content: string;
  result: ConditionSetWithResultAsLogged;
  environment: RuleStatus;
  passed: boolean;
  ruleId: string;
  ruleName: string;
  tags: string[];
};
94
/**
 * Converts a GraphQL condition (or condition set) input into the shape that
 * is persisted on a rule.
 *
 * Condition sets are converted recursively, child by child. Leaf conditions
 * are handed to `transformLeafConditionForDB`.
 *
 * @throws Error('Invalid condition input') when the input is neither a
 *   well-formed condition set nor a leaf condition (see `conditionInputIsValid`).
 */
export function transformConditionForDB<
  T extends GQLConditionInput | GQLConditionSetInput,
>(condition: T): T extends GQLConditionSetInput ? ConditionSet : Condition {
  type Transformed = T extends GQLConditionSetInput ? ConditionSet : Condition;

  if (!conditionInputIsValid(condition)) {
    throw new Error('Invalid condition input');
  }

  if ('conditions' in condition) {
    // A condition set: convert every child with this same function.
    const transformedChildren = condition.conditions.map(
      transformConditionForDB,
    ) as ConditionSet['conditions'];

    return {
      ...condition,
      conjunction: condition.conjunction,
      conditions: transformedChildren,
    };
  }

  const leaf = transformLeafConditionForDB(condition);
  return leaf as Transformed;
}
116
/**
 * When a LeafCondition is sent to us as input in a GraphQL mutation, the shape
 * of the GQL input object must be mapped to our internal representation of a
 * LeafCondition (as used in the RuleModel/db/TS).
 *
 * - `IS_NOT_PROVIDED` conditions clear `signal`, `matchingValues` and
 *   `threshold`. They may not carry a signal.
 * - Other comparators keep `matchingValues` (with location inputs converted
 *   via `locationAreaInputToLocationArea`), the converted signal, and the
 *   threshold.
 *
 * NB: for Google place locations stored in matchingValues, we convert them to
 * valid LocationArea objects but don't fetch the extra Google place info. That
 * data would noticeably grow the rule's JSON blob, which could hurt
 * performance, and fetching it for every location on every rule update would
 * be slow and wasteful.
 *
 * @throws Error if `IS_NOT_PROVIDED` is combined with a signal, or if the
 *   signal input is invalid (see `transformSignalInputForDB`).
 */
function transformLeafConditionForDB(
  leafCondition: ValidatedGQLLeafConditionInput,
): LeafCondition {
  return {
    ...leafCondition,
    input: transformConditionInput(leafCondition.input),
    ...(() => {
      const { comparator, signal, matchingValues } = leafCondition;

      if (comparator === 'IS_NOT_PROVIDED') {
        if (signal) {
          throw new Error(
            'Cannot use is not provided on a condition with a signal',
          );
        }
        // Explicitly clear every comparison-related field, overriding
        // whatever was spread in from the raw input above.
        return {
          comparator,
          signal: undefined,
          matchingValues: undefined,
          threshold: undefined,
        };
      }

      return {
        comparator,
        matchingValues: matchingValues
          ? {
              ...matchingValues,
              locations: matchingValues.locations?.map(
                locationAreaInputToLocationArea,
              ),
            }
          : undefined,
        signal: signal && transformSignalInputForDB(signal),
        threshold: leafCondition.threshold,
      };
    })(),
  };
}

/**
 * Converts the signal attached to a leaf condition input into its stored form.
 *
 * The input `id` is a JSON-encoded SignalId. It is decoded, validated as an
 * external signal id, and re-encoded with `jsonStringify`. AGGREGATION signals
 * also get their aggregation clause converted. All other signal types are
 * stored with `args: undefined`.
 *
 * @throws Error('Invalid signal id') if the id doesn't decode to an external
 *   SignalId; Error('Missing signal args') if an AGGREGATION signal has no
 *   aggregation clause.
 */
function transformSignalInputForDB(
  signal: NonNullable<ValidatedGQLLeafConditionInput['signal']>,
) {
  const { id, name, subcategory, type } = signal;
  const idParsed = tryJsonParse(id);
  if (!isSignalId(idParsed) || !signalIsExternal(idParsed)) {
    throw new Error('Invalid signal id');
  }
  const signalInfo = {
    id: jsonStringify(idParsed),
    name,
    subcategory,
  };

  // eslint-disable-next-line switch-statement/require-appropriate-default-case
  switch (type) {
    case 'AGGREGATION': {
      // Braced so this binding is scoped to this case only.
      const aggregationClauseInput =
        signal.args?.AGGREGATION?.aggregationClause;
      if (!aggregationClauseInput) {
        throw new Error('Missing signal args');
      }
      return {
        ...signalInfo,
        type,
        args: {
          aggregationClause: parseAggregationClauseInput(
            aggregationClauseInput,
          ),
        },
      };
    }
    default:
      return {
        ...signalInfo,
        type,
        args: undefined,
      };
  }
}
205
/**
 * Maps a GraphQL condition input field to our internal ConditionInput.
 *
 * For CONTENT_DERIVED_FIELD inputs, the one-of `spec.source` input object is
 * converted into a tagged union with `oneOfInputToTaggedUnion`. Every other
 * input type is passed through with a cast (see the comment on that branch).
 *
 * @throws Error if a CONTENT_DERIVED_FIELD input has no `spec`. Previously
 *   this surfaced as an opaque TypeError from the `spec!` non-null assertion.
 */
function transformConditionInput(conditionInput: GQLConditionInputFieldInput) {
  // TODO: fix the logic here rather than disabling the lint rule. We
  // genuinely have some validation gaps.
  // eslint-disable-next-line switch-statement/require-appropriate-default-case
  switch (conditionInput.type) {
    case 'CONTENT_DERIVED_FIELD': {
      const { spec } = conditionInput;
      if (spec == null) {
        throw new Error(
          'Missing spec for CONTENT_DERIVED_FIELD condition input',
        );
      }
      const specSource = oneOfInputToTaggedUnion(spec.source, {
        contentField: 'CONTENT_FIELD',
        fullItem: 'FULL_ITEM',
        contentCoopInput: 'CONTENT_COOP_INPUT',
      });

      return {
        ...(conditionInput as GQLConditionInputFieldInput & {
          type: 'CONTENT_DERIVED_FIELD';
        }),
        spec: {
          ...spec,
          // This cast is needed because TS (from the generated types) thinks
          // that input.spec.name is a GQLCoopInput enum, whose values are
          // things like ALL_TEXT, whereas the runtime values of our CoopInput
          // type are 'All text' etc. TS doesn't know that an Apollo resolver
          // has already mapped the GQL values back to our saved runtime
          // values, which makes this safe.
          source: specSource as
            | Exclude<typeof specSource, { type: 'CONTENT_COOP_INPUT' }>
            | { type: 'CONTENT_COOP_INPUT'; name: CoopInput },
        },
      };
    }
    default:
      // TS is right to complain here. Our GQL types for LeafCondition.input
      // let many invalid values through: they are loose because they aren't
      // yet a proper GQL input union, and our coarse validation routine
      // doesn't fully compensate. For now we assume the frontend sends valid
      // data and cast.
      return conditionInput as ConditionInput;
  }
}
247
/**
 * Converts a GraphQL aggregation clause input into an AggregationClause.
 *
 * A fresh `uid()` is assigned as the clause id. The optional condition set and
 * each group-by input are converted with the same helpers used for rule
 * conditions. The aggregation type and window sizes are copied verbatim.
 */
function parseAggregationClauseInput(
  aggregationClause: GQLAggregationClauseInput,
): AggregationClause {
  // Built piece by piece, in the same order the fields were evaluated before.
  const id = uid();
  const conditionSet =
    aggregationClause.conditionSet &&
    transformConditionForDB(aggregationClause.conditionSet);
  const aggregation = { type: aggregationClause.aggregation.type };
  const groupBy = aggregationClause.groupBy.map((field) =>
    transformConditionInput(field),
  );
  const windowSpec = {
    sizeMs: aggregationClause.window.sizeMs,
    hopMs: aggregationClause.window.hopMs,
  };

  return { id, conditionSet, aggregation, groupBy, window: windowSpec };
}
266
/**
 * GraphQL data source for rules.
 *
 * Covers rule create/update/delete scoped to an org, rule insights and action
 * statistics (delegated to the injected statistics services), and reading
 * backtest results from the data warehouse.
 */
class RuleAPI extends DataSource {
  private readonly warehouse: Kysely<DataWarehousePublicSchema>;

  constructor(
    private readonly knex: Dependencies['Knex'],
    dialect: Dependencies['DataWarehouseDialect'],
    public readonly ruleInsights: Dependencies['RuleActionInsights'],
    private readonly actionStats: Dependencies['ActionStatisticsService'],
    private readonly models: Dependencies['Sequelize'],
    private readonly tracer: Dependencies['Tracer'],
    private readonly signalsService: Dependencies['SignalsService'],
  ) {
    super();
    this.warehouse =
      dialect.getKyselyInstance() as Kysely<DataWarehousePublicSchema>;
  }

  /**
   * Loads a rule by primary key. The lookup uses `rejectOnEmpty`, so a missing
   * rule rejects.
   *
   * @throws AuthenticationError if the rule belongs to a different org.
   */
  async getGraphQLRuleFromId(id: string, orgId: string) {
    const rule = await this.models.Rule.findByPk(id, { rejectOnEmpty: true });
    if (rule.orgId !== orgId) {
      throw new AuthenticationError(
        'User not authenticated to fetch this rule',
      );
    }

    return rule;
  }

  /** Creates a CONTENT rule; see `createRule`. */
  async createContentRule(
    input: GQLCreateContentRuleInput,
    userId: string,
    orgId: string,
  ) {
    return this.createRule(
      { ...input, ruleType: RuleType.CONTENT },
      userId,
      orgId,
    );
  }

  /** Creates a USER rule; see `createRule`. */
  async createUserRule(
    input: GQLCreateUserRuleInput,
    userId: string,
    orgId: string,
  ) {
    return this.createRule(
      { ...input, ruleType: RuleType.USER },
      userId,
      orgId,
    );
  }

  /**
   * Builds and saves a new rule, then links its content types (CONTENT rules
   * only), actions and policies inside a retried transaction.
   *
   * @throws the rule-is-missing-content-type error for CONTENT rules with no
   *   content types.
   * @throws Error if an automated rule (one with actions) uses a signal that
   *   is restricted to routing rules.
   * @throws the rule-name-exists error when saving hits a unique constraint.
   */
  private async createRule(
    input:
      | (GQLCreateContentRuleInput & { ruleType: typeof RuleType.CONTENT })
      | (GQLCreateUserRuleInput & { ruleType: typeof RuleType.USER }),
    userId: string,
    orgId: string,
  ) {
    const {
      name,
      description,
      status,
      conditionSet,
      actionIds,
      policyIds,
      tags,
      ruleType,
      maxDailyActions,
      expirationTime,
      parentId,
    } = input;

    if (ruleType === RuleType.CONTENT && input.contentTypeIds.length === 0) {
      throw makeRuleIsMissingContentTypeError({ shouldErrorSpan: true });
    }

    // Rules with actions are automated rules; make sure none of their signals
    // are restricted to routing rules.
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    if (actionIds && actionIds.length > 0) {
      await this.validateSignalsAllowedInAutomatedRules(conditionSet, orgId);
    }

    const rule = this.models.Rule.build({
      id: uid(),
      name,
      description,
      tags: tags.slice(),
      status,
      conditionSet: transformConditionForDB(conditionSet),
      maxDailyActions,
      expirationTime: (expirationTime as Date | null | undefined) ?? undefined,
      creatorId: userId,
      orgId,
      ruleType,
      parentId,
    });

    try {
      await this.models.transactionWithRetry(async () => {
        // Save rule to 'rules' table before adding assocs, to give it
        // a record for foreign keys to reference and test name uniqueness.
        await rule.save();

        if (ruleType === RuleType.CONTENT) {
          await rule.setContentTypes(
            input.contentTypeIds as Mutable<typeof input.contentTypeIds>,
          );
        }

        // The Mutable casts are used to work around a sequelize typings bug.
        await rule.setActions(actionIds as Mutable<typeof actionIds>);
        await rule.setPolicies(policyIds as Mutable<typeof policyIds>);

        // TODO: is this needed?
        await rule.save();
      });
    } catch (e) {
      throw isUniqueConstraintError(e)
        ? makeRuleNameExistsError({ shouldErrorSpan: true })
        : e;
    }

    return rule;
  }

  /** Updates a CONTENT rule; see `updateRule`. */
  async updateContentRule(opts: {
    input: GQLUpdateContentRuleInput;
    orgId: string;
  }) {
    const { input, orgId } = opts;
    return this.updateRule({
      input: { ...input, ruleType: RuleType.CONTENT },
      orgId,
    });
  }

  /** Updates a USER rule; see `updateRule`. */
  async updateUserRule(opts: { input: GQLUpdateUserRuleInput; orgId: string }) {
    const { input, orgId } = opts;
    return this.updateRule({
      input: { ...input, ruleType: RuleType.USER },
      orgId,
    });
  }

  /**
   * Applies a partial update to an existing rule in `orgId`.
   *
   * Scalar fields are patched in memory first. Associations and the final
   * save happen inside a REPEATABLE READ transaction.
   *
   * @throws a not-found error if the rule doesn't exist in this org.
   * @throws Error if the provided condition set is malformed or uses a signal
   *   restricted to routing rules while the rule has actions.
   * @throws the running-backtests error if backtests are running and
   *   `cancelRunningBacktests` wasn't requested.
   * @throws the rule-name-exists error on a unique-constraint violation.
   */
  // eslint-disable-next-line complexity
  private async updateRule(opts: {
    input:
      | (GQLUpdateContentRuleInput & { ruleType: typeof RuleType.CONTENT })
      | (GQLUpdateUserRuleInput & { ruleType: typeof RuleType.USER });
    orgId: string;
  }) {
    const { input, orgId } = opts;
    const {
      id,
      name,
      description,
      status,
      conditionSet,
      actionIds,
      policyIds,
      tags,
      ruleType,
      maxDailyActions,
      expirationTime,
      cancelRunningBacktests,
      parentId,
    } = input;

    const rule = await this.models.Rule.findOne({
      where: { id, orgId },
      rejectOnEmpty: true,
    }).catch((e) => {
      throw isEmptyResultSetError(e)
        ? makeNotFoundError('Rule not found', {
            detail: `Could not find rule with id ${id}`,
            shouldErrorSpan: true,
          })
        : e;
    });

    if (conditionSet != null && !conditionInputIsValid(conditionSet)) {
      throw new Error('Invalid condition set input');
    }

    // For a content rule update, omitting contentTypeIds is fine (it's a
    // no-op via patchInPlace). If it is provided, it must not be empty, since
    // an empty list is invalid for content rules.
    if (
      ruleType === 'CONTENT' &&
      input.contentTypeIds &&
      input.contentTypeIds.length === 0
    ) {
      throw makeRuleIsMissingContentTypeError({ shouldErrorSpan: true });
    }

    patchInPlace(rule, {
      name: name ?? undefined,
      description,
      conditionSet:
        conditionSet == null
          ? undefined
          : transformConditionForDB(conditionSet),
      tags: tags?.slice() ?? undefined,
      ruleType,
    });

    if (status && rule.status !== status) {
      rule.status = status;
    }

    if (rule.maxDailyActions !== maxDailyActions) {
      // If maxDailyActions is undefined, it needs to be explicitly converted
      // to null because postgres doesn't understand undefined
      rule.maxDailyActions = maxDailyActions ?? null;
    }

    if (rule.expirationTime !== expirationTime) {
      // If expirationTime is undefined, it needs to be explicitly converted
      // to null because postgres doesn't understand undefined
      rule.expirationTime = (expirationTime as Date | null | undefined) ?? null;
    }

    if (rule.parentId !== parentId) {
      rule.parentId = parentId ?? null;
    }

    // Automated rules (rules that will have actions after this update) may
    // not use signals restricted to routing rules. Only a newly supplied
    // condition set is checked. The current actions are only fetched from the
    // DB when that check actually needs them.
    // NOTE(review): if `conditionSet` is omitted but `actionIds` turns this
    // into an automated rule, the rule's existing conditions are not
    // re-validated here.
    if (conditionSet) {
      const willHaveActions = actionIds
        ? actionIds.length > 0
        : (await rule.getActions()).length > 0;

      if (willHaveActions) {
        await this.validateSignalsAllowedInAutomatedRules(conditionSet, orgId);
      }
    }

    // Before we actually send any updates (which will happen as soon as we call
    // setXXX to set the associations), we need to make sure that there are no
    // active backtests for this rule because, if there are, we should fail the
    // update unless the user's asked to cancel the backtests explicitly.
    if (!cancelRunningBacktests) {
      if (await this.models.Backtest.hasRunningBacktestsForRule(rule.id)) {
        throw makeRuleHasRunningBacktestsError({ shouldErrorSpan: true });
      }
    }

    // Do our updates in a transaction so that we don't end up with
    // inconsistent state if the name check fails. Technically, we'd also need
    // the hasRunningBacktests call above inside this transaction, at
    // SERIALIZABLE, to make the update + backtest cancelation linearizable
    // w.r.t. concurrently started backtests, but that's overkill.
    try {
      await this.models.sequelize.transaction(
        { isolationLevel: Transaction.ISOLATION_LEVELS.REPEATABLE_READ },
        async () => {
          if (ruleType === 'CONTENT' && input.contentTypeIds) {
            await rule.setContentTypes(
              input.contentTypeIds as Mutable<typeof input.contentTypeIds>,
            );
          }

          // TODO: this is not safe. It lets one org link to a different org's
          // policies/actions.
          if (actionIds) {
            await rule.setActions(actionIds as Mutable<typeof actionIds>);
          }
          if (policyIds) {
            await rule.setPolicies(policyIds as Mutable<typeof policyIds>);
          }

          await rule.save();

          // Finally, if the user asked to delete any running backtests, do it.
          if (cancelRunningBacktests) {
            await this.models.Backtest.cancelRunningBacktestsForRule(rule.id);
          }
        },
      );
    } catch (e) {
      throw isUniqueConstraintError(e)
        ? makeRuleNameExistsError({ shouldErrorSpan: true })
        : e;
    }

    return rule;
  }

  /**
   * Deletes the rule with `id` in `orgId`, if one exists.
   *
   * Best-effort: any error is recorded on the active span (when it is
   * recording) and reported as `false`. Returns `true` otherwise, including
   * when no matching rule was found.
   */
  async deleteRule(opts: { id: string; orgId: string }) {
    const { id, orgId } = opts;

    try {
      const rule = await this.models.Rule.findOne({ where: { id, orgId } });
      await rule?.destroy();
    } catch (exception) {
      const activeSpan = this.tracer.getActiveSpan();
      if (activeSpan?.isRecording()) {
        activeSpan.recordException(exception as Exception);
      }
      return false;
    }
    return true;
  }

  /**
   * Fetches all rule insight series in parallel. Any series whose query
   * fails is returned as an empty array rather than failing the whole call.
   */
  async getAllRuleInsights(orgId: string) {
    const results = await Promise.allSettled([
      this.actionStats.getActionedSubmissionCountsByDay(orgId),
      this.actionStats.getActionedSubmissionCountsByPolicyByDay(orgId),
      this.actionStats.getActionedSubmissionCountsByTagByDay(orgId),
      this.actionStats.getActionedSubmissionCountsByActionByDay(orgId),
      this.ruleInsights.getContentSubmissionCountsByDay(orgId),
    ]);

    const valueOrEmpty = <T,>(r: PromiseSettledResult<readonly T[]>): readonly T[] =>
      r.status === 'fulfilled' ? r.value : [];

    return {
      actionedSubmissionsByDay: valueOrEmpty(results[0]),
      actionedSubmissionsByPolicyByDay: valueOrEmpty(results[1]),
      actionedSubmissionsByTagByDay: valueOrEmpty(results[2]),
      actionedSubmissionsByActionByDay: valueOrEmpty(results[3]),
      totalSubmissionsByDay: valueOrEmpty(
        results[4] as PromiseSettledResult<readonly { date: string; count: number }[]>,
      ),
    };
  }

  /** Delegates to the action statistics service. */
  async getPoliciesSortedByViolationCount(input: {
    filterBy: {
      startDate: Date;
      endDate: Date;
    };
    timeZone: string;
    orgId: string;
  }) {
    return this.actionStats.getPoliciesSortedByViolationCount(input);
  }

  /**
   * Returns action counts grouped by `input.groupBy`.
   *
   * Uses the per-dimension query (backed by single-field aggregates) unless a
   * filter is present, in which case the generic base-table query is used.
   */
  async getActionStatistics(input: ActionCountsInput) {
    const { filterBy } = input;
    // If we need to filter some actions while also grouping, we must use the
    // base table and can't use the materialized views that only aggregate by
    // one field.
    // NOTE(review): this condition previously tested `itemTypeIds` twice;
    // confirm whether another filter field was meant to be checked here.
    const needsBaseTable =
      filterBy.actionIds.length > 0 ||
      filterBy.itemTypeIds.length > 0 ||
      filterBy.sources.length > 0;

    if (needsBaseTable) {
      return this.actionStats.getAllActionCountsGroupBy(input);
    }
    switch (input.groupBy) {
      case 'RULE_ID':
        return this.actionStats.getAllActionCountsGroupByRule(input);
      case 'POLICY_ID':
        return this.actionStats.getAllActionCountsGroupByPolicy(input);
      case 'ACTION_ID':
        return this.actionStats.getAllActionCountsGroupByActionId(input);
      case 'ITEM_TYPE_ID':
        return this.actionStats.getAllActionCountsGroupByItemTypeId(input);
      case 'ACTION_SOURCE':
        return this.actionStats.getAllActionCountsGroupBySource(input);
      default:
        assertUnreachable(input.groupBy);
    }
  }

  /**
   * Not implemented: always throws. The previous implementation is kept
   * below for reference.
   */
  async createBacktest(_input: any, _user: User) {
    throw new Error('Not Implemented');

    // const id = uid();
    // const rule = await this.models.Rule.findByPk(input.ruleId, {
    //   rejectOnEmpty: true,
    // });
    // const ruleContentTypes = await rule.getContentTypes();

    // if (!ruleContentTypes.length) {
    //   throw new Error(
    //     "Rule is not attached to any content types, so we're " +
    //       'unable to select content to use for the backtest.',
    //   );
    // }

    // const backest = this.models.Backtest.build({
    //   id,
    //   ruleId: input.ruleId,
    //   sampleDesiredSize: input.sampleDesiredSize,
    //   sampleStartAt: new Date(input.sampleStartAt),
    //   sampleEndAt: new Date(input.sampleEndAt),
    //   creatorId: user.id,
    // });

    // await backest.save();

    // // Start sampling and enqueueing the sampled items, but do this without
    // // awaiting so that we can return to the frontend immediately.
    // //
    // // Our query ignores legacy submissions that didn't store their content type
    // // schema at the time of submission, as we can't interpret those reliably
    // // when backtesting. This also has the effect of excluding all rows which
    // // didn't log their submission id or item type id (which is what we want,
    // // since those fields are required, and we started logging them before
    // // logging `schema`). The use of FixSingleTableSelectRowType gets the types
    // // to be aware of all our WHERE clause filters and their implications for
    // // the other columns.
    // // prettier-ignore
    // this.getItemSubmissionsFromWarehouse({
    //   orgId: user.orgId,
    //   randomSample: true,
    //   numRows: input.sampleDesiredSize,
    //   startAt: new Date(input.sampleStartAt),
    //   endAt: new Date(input.sampleEndAt),
    //   itemTypeIds: ruleContentTypes.map(ct => ct.id),
    // }).then(async (submissions) => {
    //   const ruleSetExecutionJobs = submissions.map((it) => ({
    //     orgId: user.orgId,
    //     ruleIds: [input.ruleId],
    //     itemSubmission: it,
    //     environment: RuleEnvironment.BACKTEST,
    //     correlationId: toCorrelationId({ type: 'backtest', id }),
    //   }));

    //   const { failures } = await this.ruleScheduler.enqueueRuleSetExecutions(
    //     ruleSetExecutionJobs,
    //   );

    //   const sampleActualSize = submissions.length - failures.length;
    //   await backest.update({ sampleActualSize, samplingComplete: true });
    // })
    // .catch((e) => {
    //   const span = this.tracer.getActiveSpan();
    //   span?.recordException(e);
    // });

    // return backest;
  }

  /**
   * Returns one page of a backtest's rule execution results from the
   * warehouse, as cursor-paginated edges keyed by execution timestamp.
   *
   * @param count page size.
   * @param takeFrom whether to take the page from the start or the end of the
   *   filtered, sorted result set.
   * @param cursor optional before/after cursor (a `ts` in epoch millis).
   * @param sortByTs sort direction on `ts`; defaults to descending.
   */
  async getBacktestResults(
    backtestId: string,
    count: number,
    takeFrom: 'start' | 'end',
    cursor?: CursorInfo<{ ts: number }>,
    sortByTs: SortOrder = SortOrder.DESC,
  ): Promise<Edge<RuleExecutionResult, { ts: number }>[]> {
    // There are 12 cases here, i.e., (takeFrom start or end) x
    // (no cursor, after cursor, before cursor) x (sort asc, desc).
    // But our pagination helpers let us handle them reasonably simply, in
    // steps. First, we define the result query as if we weren't paginating:
    const allResultsQuery = this.knex('RULE_EXECUTIONS')
      .select({
        // This select aliases each column to the corresponding object key,
        // so we have to do fewer renames from the warehouse
        // ALL_CAPS_SNAKE_CASE when we return the final result.
        date: 'DS',
        ts: 'TS',
        contentId: 'ITEM_ID',
        contentType: 'ITEM_TYPE_NAME',
        userId: 'ITEM_CREATOR_ID',
        content: 'ITEM_DATA',
        result: 'RESULT',
      })
      .where(
        'CORRELATION_ID',
        toCorrelationId({ type: 'backtest', id: backtestId }),
      );

    // Now, we can filter down the results to those that satisfy the cursor's
    // before/after requirements, if there is a cursor. How we filter depends
    // on how the results are sorted, because sorting conceptually happens
    // "before" pagination and affects what's "before" and "after" a cursor.
    //
    // Specifically, if the results are sorted descending and we're looking for
    // values _after_ the cursor, then we're looking for timestamp values that
    // are less than the cursor. Similarly, if we're sorting ascending and
    // looking for items before the cursor, then those items must have ts
    // values less than the cursor. In the other cases, it's the opposite.
    const filteredResultsQuery = !cursor
      ? allResultsQuery
      : allResultsQuery.andWhere(
          'TS',
          (sortByTs === SortOrder.DESC && cursor.direction === 'after') ||
            (sortByTs === SortOrder.ASC && cursor.direction === 'before')
            ? '<'
            : '>',
          new Date(cursor.value.ts),
        );

    const desiredSort = {
      column: 'ts',
      order: sortByTs === SortOrder.ASC ? 'asc' : 'desc',
    } as const;

    // filteredResultsQuery represents the _set_ of potentially valid items,
    // but it's not yet sorted or limited to the page size. If we're taking
    // from the start, simple SQL sorting and limiting suffices; if we're
    // taking from the end, we use our helper that implements "takeLast" in SQL.
    const finalQuery =
      takeFrom === 'start'
        ? filteredResultsQuery.orderBy([desiredSort]).limit(count)
        : takeLast(filteredResultsQuery, [desiredSort], count);

    const results = (
      await sql`${sql.raw(finalQuery.toString())}`.execute(this.warehouse)
    ).rows;

    return results.map((it: any) => ({
      node: { ...it, result: it.result ? jsonParse(it.result) : null },
      cursor: { ts: new Date(it.ts).valueOf() },
    }));
  }

  /**
   * Lists the backtests of a rule, optionally restricted to `backtestIds`.
   */
  async getBacktestsForRule(
    ruleId: string,
    backtestIds?: readonly string[] | null,
  ) {
    return this.models.Backtest.findAll({
      where: {
        ruleId,
        ...(backtestIds ? { id: { [Op.in]: backtestIds } } : {}),
      },
    });
  }

  /**
   * Not implemented: always throws. The previous implementation is kept
   * below for reference.
   *
   * NB: That retroaction code was not production-ready. It was only meant
   * for our Slack demo, because it has a limit of 100 pieces of content on
   * which it will run. That prevents us from accidentally turning this on and
   * overloading our node servers, and is sufficient for the Slack demo.
   */
  async runRetroaction(_input: GQLRunRetroactionInput, _user: User) {
    throw new Error('Not Implemented');

    // const rule = await this.models.Rule.findByPk(input.ruleId, {
    //   rejectOnEmpty: true,
    // });
    // const ruleContentTypes = await rule.getContentTypes();

    // if (!ruleContentTypes.length) {
    //   throw new Error(
    //     "Rule is not attached to any content types, so we're " +
    //       'unable to select content to use for the backtest.',
    //   );
    // }

    // const id = uid();
    // const submissions = await this.getItemSubmissionsFromWarehouse({
    //   orgId: user.orgId,
    //   itemTypeIds: ruleContentTypes.map((ct) => ct.id),
    //   randomSample: false,
    //   numRows: 100, // TODO: Remove the limit, and instead batch this query
    //   startAt: new Date(input.startAt),
    //   endAt: new Date(input.endAt),
    // });

    // try {
    //   const { failures } = await this.ruleScheduler.enqueueRuleSetExecutions(
    //     submissions.map((it) => ({
    //       orgId: user.orgId,
    //       ruleIds: [input.ruleId],
    //       itemSubmission: it,
    //       environment: RuleEnvironment.RETROACTION,
    //       correlationId: toCorrelationId({ type: 'retroaction', id }),
    //     })),
    //   );

    //   return { _: !failures.length };
    // } catch (e) {
    //   this.tracer.getActiveSpan()?.recordException(e as Exception);
    //   return { _: false };
    // }
  }

  /**
   * Validates that every signal used in the condition set is allowed in
   * automated rules. Signals are looked up one at a time, in the order they
   * appear. Signals the service doesn't return are skipped.
   *
   * @throws Error naming the first signal that is restricted to routing rules.
   */
  private async validateSignalsAllowedInAutomatedRules(
    conditionSet: GQLConditionSetInput,
    orgId: string,
  ): Promise<void> {
    const signalIds = this.extractSignalIdsFromConditionSet(conditionSet);

    for (const signalId of signalIds) {
      const signal = await this.signalsService.getSignal({
        signalId,
        orgId,
      });

      if (signal && !signal.allowedInAutomatedRules) {
        throw new Error(
          `Signal "${signal.displayName}" cannot be used in automated rules with actions. ` +
            `This signal is restricted to routing rules only.`
        );
      }
    }
  }

  /**
   * Recursively collects the SignalIds referenced by leaf conditions in a
   * condition set, in document order.
   *
   * Signal ids in GraphQL condition input are JSON-encoded SignalIds. The leaf
   * transform in this file decodes them the same way before persisting. They
   * are decoded here too, so that a CUSTOM signal is looked up by its real id
   * rather than by the encoded string. Ids that don't decode to a SignalId
   * fall back to being built from the raw `type`/`id` fields.
   */
  private extractSignalIdsFromConditionSet(
    conditionSet: GQLConditionSetInput,
  ): SignalId[] {
    const signalIds: SignalId[] = [];

    const processCondition = (condition: GQLConditionInput) => {
      if ('conditions' in condition && condition.conditions) {
        // It's a condition set, recurse
        for (const subCondition of condition.conditions) {
          processCondition(subCondition);
        }
      } else if ('signal' in condition && condition.signal) {
        // It's a leaf condition with a signal (type is String to support plugin signals)
        const { type, id } = condition.signal;

        const parsedId = tryJsonParse(id);
        if (isSignalId(parsedId)) {
          signalIds.push(parsedId);
          return;
        }

        let signalId: SignalId;
        if (type === 'CUSTOM') {
          // CUSTOM signals require an id field, which is a required
          // Scalars['ID'] in the GraphQL input, so this cast is safe.
          signalId = { type: 'CUSTOM' as const, id: id as NonEmptyString };
        } else {
          // Built-in and plugin signals: type is the signal type string
          signalId = { type };
        }
        signalIds.push(signalId);
      }
    };

    // Start processing from the root
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    if (conditionSet.conditions) {
      for (const condition of conditionSet.conditions) {
        processCondition(condition);
      }
    }

    return signalIds;
  }
}
911
// Register RuleAPI with the IoC container. The dependency names listed here
// line up, in order, with the RuleAPI constructor parameters. This presumably
// means `inject` resolves and passes them positionally, so keep both lists in
// sync (TODO confirm against iocContainer).
export default inject(
  [
    'Knex',
    'DataWarehouseDialect',
    'RuleActionInsights',
    'ActionStatisticsService',
    'Sequelize',
    'Tracer',
    'SignalsService',
  ],
  RuleAPI,
);
// The class itself is exported for typing only; instances come from the
// container registration above.
export type { RuleAPI };
925
/**
 * Type guard for GraphQL condition input, which is not type safe in the
 * schema and must therefore be validated here. It also accepts a
 * ConditionSetInput, because a valid condition set has the same shape as a
 * ConditionInput that represents a set.
 *
 * Accepted shapes:
 * - a set: exactly two keys, `conjunction` and `conditions`, both non-null;
 * - a leaf: no `conjunction`/`conditions` key at all (not even an explicit
 *   null) and a non-null `input`.
 */
function conditionInputIsValid(
  it: GQLConditionInput | GQLConditionSetInput,
): it is ValidatedGQLConditionInput {
  if (
    it.conjunction != null &&
    it.conditions != null &&
    Object.keys(it).length === 2
  ) {
    return true;
  }

  return !('conjunction' in it || 'conditions' in it) && it.input != null;
}
942
// Narrowed form of a GraphQL condition input, as guaranteed by
// `conditionInputIsValid`: either a condition set or a leaf condition.
type ValidatedGQLConditionInput =
  | ValidatedGQLConditionSetInput
  | ValidatedGQLLeafConditionInput;

// A condition set: `conditions` and `conjunction` are both present and
// non-null.
type ValidatedGQLConditionSetInput = RequiredWithoutNull<
  Pick<GQLConditionInput, 'conditions' | 'conjunction'>
>;

// A leaf condition: the set-only keys are removed from the type (so `in`
// checks for them narrow the union), and `input` is required and non-null.
type ValidatedGQLLeafConditionInput = Omit<
  GQLConditionInput,
  'conditions' | 'conjunction'
> &
  RequiredWithoutNull<Pick<GQLConditionInput, 'input'>>;
956