// Mirror of https://github.com/roostorg/coop
// (github.com/roostorg/coop)
1import _ from 'lodash';
2import { type ReadonlyDeep } from 'type-fest';
3
4import {
5 getAllAggregationsInConditionSet,
6 getConditionSetResults,
7} from '../condition_evaluator/conditionSet.js';
8import { type Dependencies } from '../iocContainer/index.js';
9import { inject } from '../iocContainer/utils.js';
10import {
11 ConditionCompletionOutcome,
12 type RuleWithLatestVersion,
13 type Rule as TRule,
14} from '../models/rules/RuleModel.js';
15import { evaluateAggregationRuntimeArgsForItem } from '../services/aggregationsService/index.js';
16import { type ItemSubmission } from '../services/itemProcessingService/index.js';
17import {
18 RuleStatus,
19 type Action,
20 type ConditionSet,
21} from '../services/moderationConfigService/index.js';
22import { type RuleExecutionCorrelationId } from '../services/analyticsLoggers/index.js';
23import {
24 type CorrelationId,
25 type CorrelationIdType,
26} from '../utils/correlationIds.js';
27import { equalLengthZip } from '../utils/fp-helpers.js';
28import { safePick } from '../utils/misc.js';
29import type SafeTracer from '../utils/SafeTracer.js';
30import {
31 isFullSubmission,
32 type RuleEvaluationContext,
  // This is used for a {@link} in a JSDoc comment.
34 // eslint-disable-next-line @typescript-eslint/no-unused-vars
35 type RuleEvaluator,
36 type RuleExecutionResult,
37 type RuleInput,
38} from './RuleEvaluator.js';
39
// Bind the lodash helpers used in this module to local names.
const partition = _.partition;
const uniqBy = _.uniqBy;
41
// Represents the context from which a rule was triggered, which affects whether
// it passing will trigger actions and, if so, whether those actions will count
// against the rule's limit. Often, the rule environment is the same as the
// rule's status (e.g., a `LIVE` content rule will run in the `LIVE` rule
// environment when a new piece of content is submitted). However, sometimes the
// two fields come apart. E.g., a background rule can be backtested and a live
// rule can run under retroaction, among many other possibilities.
export enum RuleEnvironment {
  // RuleEngine publishes actions in this environment and records them against
  // the rule's action limit. runEnabledRules uses it for rules whose status is
  // LIVE.
  LIVE = 'LIVE',
  // RuleEngine only logs executions here; no actions are published.
  // runEnabledRules uses it for enabled rules whose status is not LIVE.
  BACKGROUND = 'BACKGROUND',
  // RuleEngine only logs executions here; no actions are published.
  BACKTEST = 'BACKTEST',
  // RuleEngine publishes actions here, but does not record them against the
  // rule's action limit.
  MANUAL = 'MANUAL',
  // RuleEngine publishes actions here, but does not record them against the
  // rule's action limit.
  RETROACTION = 'RETROACTION',
}
56
/**
 * This is the main Rule Engine class. It's responsible for running
 * all of the user's Rules on a single piece of content sent to
 * our API.
 */
class RuleEngine {
  constructor(
    private readonly ruleExecutionLogger: Dependencies['RuleExecutionLogger'],
    private readonly ruleEvaluator: Dependencies['RuleEvaluator'],
    private readonly actionPublisher: Dependencies['ActionPublisher'],
    private readonly getEnabledRulesForItemTypeEventuallyConsistent: Dependencies['getEnabledRulesForItemTypeEventuallyConsistent'],
    private readonly getPoliciesForRulesEventuallyConsistent: Dependencies['getPoliciesForRulesEventuallyConsistent'],
    private readonly getRuleActionsEventuallyConsistent: Dependencies['getActionsForRuleEventuallyConsistent'],
    private readonly recordRuleActionLimitUsage: Dependencies['recordRuleActionLimitUsage'],
    private readonly aggregationsService: Dependencies['AggregationsService'],
    private readonly tracer: Dependencies['Tracer'],
  ) {}

  /**
   * The environments in which the actions of passing rules are published. In
   * every other environment, rule executions are logged but no action runs.
   */
  private readonly environmentsThatApplyActions = [
    RuleEnvironment.LIVE,
    RuleEnvironment.MANUAL,
    RuleEnvironment.RETROACTION,
  ];

  /**
   * @see {@link RuleEvaluator.makeRuleExecutionContext}.
   */
  makeRuleExecutionContext(args: {
    orgId: string;
    input: RuleInput;
  }): RuleEvaluationContext {
    return this.ruleEvaluator.makeRuleExecutionContext(args);
  }

  /**
   * Runs the rules that are "enabled" ({@see ItemType.getEnabledRules}) for
   * this item type, against the given itemSubmission. Rules whose status is
   * `LIVE` run in the `LIVE` environment; all other enabled rules run in the
   * `BACKGROUND` environment. Both sets share one evaluation context.
   *
   * @param itemSubmission - the submission to run the rules against.
   * @param executionsCorrelationId - An id that should be associated with all
   *   rule executions generated as part of running these rules, for correlating
   *   the execution with the event in the system that caused it.
   *   {@see getCorrelationId}
   * @param sync - forwarded to {@link RuleEngine.runRuleSet} for both rule
   *   sets. Defaults to `false`.
   * @returns `actionsTriggered`, a promise for the (deduped) actions triggered
   *   by the live rules, and `getDerivedFieldValue`, taken from the shared
   *   evaluation context.
   */
  async runEnabledRules(
    itemSubmission: ItemSubmission,
    executionsCorrelationId: RuleExecutionCorrelationId,
    sync: boolean = false,
  ) {
    // The lookup can yield `null` when the item type can't be found. Even
    // though we know the item type exists (we hold the full item type object),
    // we still must handle `null`, because the item type could have _just_
    // been created and not yet be visible to
    // getEnabledRulesForItemTypeEventuallyConsistent, which, as the name
    // implies, is eventually consistent.
    const enabledRules =
      (await this.getEnabledRulesForItemTypeEventuallyConsistent(
        itemSubmission.itemType.id,
      )) ?? [];

    const [liveRules, backgroundRules] = partition(
      enabledRules,
      (rule) => rule.status === RuleStatus.LIVE,
    );

    const evaluationContext = this.makeRuleExecutionContext({
      orgId: itemSubmission.itemType.orgId,
      input: itemSubmission,
    });

    const resultsPromise = Promise.all([
      this.runRuleSet(
        liveRules,
        evaluationContext,
        RuleEnvironment.LIVE,
        executionsCorrelationId,
        sync,
      ),
      this.runRuleSet(
        backgroundRules,
        evaluationContext,
        RuleEnvironment.BACKGROUND,
        executionsCorrelationId,
        sync,
      ),
    ]);

    return {
      // Just return a promise for the actions that were triggered (which
      // doesn't necessarily mean they've run just yet, w/ queueing +
      // retrying, etc.) and a way to get this submission's derived field
      // values (leveraging the cache), because that's actually all we need
      // right now. Only the live rule set (index 0) can trigger actions.
      actionsTriggered: resultsPromise.then<readonly Action[]>(
        (it) => it[0].actions,
      ),
      getDerivedFieldValue: evaluationContext.getDerivedFieldValue,
    };
  }

  /**
   * This function runs a rule set, which is an array of rules that all apply to
   * the same item/rule input and must be run as a group (because the actions
   * they'd trigger if they pass must be deduplicated).
   *
   * As part of running the rule set, it logs every rule execution and, in the
   * environments listed in `environmentsThatApplyActions`, publishes the
   * (deduped) Actions of the passing Rules. In all other environments no
   * Actions are looked up or published.
   *
   * @param rules - the list of Rules that will be run
   * @param evaluationContext - the context needed to run each rule, including,
   *   most notably, the user-generated content to run the rule against and/or a
   *   user id that can be selected as an input to (future) user-scoring
   *   signals.
   * @param environment - the RuleEnvironment that this rule is running in. This
   *   influences whether actions should be executed, whether they should count
   *   against daily limits, etc.
   * @param executionsCorrelationId - An id that should be associated with all
   *   rule executions generated as part of this rule set, for correlating the
   *   execution with the event in the system that caused it.
   *   {@see getCorrelationId}
   * @param sync - whether the request should run synchronously; forwarded to
   *   the execution logger and the action publisher.
   * @returns the result of every rule, keyed by rule, plus the deduped actions
   *   that were published (empty when the environment doesn't apply actions).
   */
  async runRuleSet(
    rules: ReadonlyDeep<RuleWithLatestVersion[]>,
    evaluationContext: RuleEvaluationContext,
    environment: RuleEnvironment,
    executionsCorrelationId: RuleExecutionCorrelationId,
    sync?: boolean,
  ): Promise<{
    rulesToResults: Map<ReadonlyDeep<TRule>, RuleExecutionResult>;
    actions: readonly Action[];
  }> {
    if (!rules.length) {
      return { rulesToResults: new Map(), actions: [] };
    }

    const shouldRunActions =
      this.environmentsThatApplyActions.includes(environment);

    // In some cases, even when we run actions, we don't count the action
    // against a rule's daily limit. E.g., retroaction/manual.
    const shouldCountActions =
      shouldRunActions && environment === RuleEnvironment.LIVE;

    // Do aggregation preprocessing here, before any rule is evaluated.
    await Promise.all(
      rules.map(async (rule) =>
        this.preprocessAggregationConditions(
          rule.conditionSet,
          evaluationContext,
          this.tracer,
        ),
      ),
    );

    const ruleResults = await Promise.all(
      rules.map(async (it) =>
        this.ruleEvaluator.runRule(it.conditionSet, evaluationContext),
      ),
    );

    const rulesToResults = new Map(equalLengthZip(rules, ruleResults));

    const passingRules = [...rulesToResults.entries()]
      .filter(([_rule, result]) => result.passed)
      .map(([rule, _result]) => rule);

    // Only rules running in an environment that applies actions can trigger
    // anything, so don't bother looking up actions for the others.
    const actionableRules = shouldRunActions ? passingRules : [];

    const actionableRulesToActions = new Map(
      await Promise.all(
        actionableRules.map(async (rule) => {
          const actions = (await this.getRuleActionsEventuallyConsistent(
            rule.id,
          )) satisfies readonly ReadonlyDeep<Action>[] as readonly Action[];

          return [rule, actions] as const;
        }),
      ),
    );

    const { org, input: ruleInput } = evaluationContext;

    const policiesByRule = await this.getPoliciesForRulesEventuallyConsistent(
      rules.map((it) => it.id),
    );

    // Executions are logged for every rule (passing or not), in every
    // environment.
    const logRuleExecutionsPromise = this.ruleExecutionLogger.logRuleExecutions(
      [...rulesToResults.entries()].map(([rule, result]) => ({
        orgId: org.id,
        rule: {
          id: rule.id,
          name: rule.name,
          version: rule.latestVersion.version,
          tags: rule.tags,
        },
        ruleInput,
        environment,
        result: result.conditionResults,
        correlationId: executionsCorrelationId,
        passed: result.passed,
        policies: policiesByRule[rule.id] ?? [],
      })),
      sync,
    );

    if (!shouldRunActions) {
      await logRuleExecutionsPromise;
      return { rulesToResults, actions: [] };
    }

    const dedupedActions = uniqBy(
      [...actionableRulesToActions.values()].flat(),
      (it) => it.id,
    ) satisfies Action[] as readonly Action[];

    // The policies of every rule that triggered something. This doesn't depend
    // on the individual action, so compute it once.
    const triggeringRulesPolicies = uniqBy(
      [...actionableRulesToActions.keys()].flatMap(
        (rule) => policiesByRule[rule.id] ?? [],
      ),
      'id',
    );

    // Publish all (deduped) actions + update the rule action counts if appropriate.
    const publishActionsPromise = this.actionPublisher
      .publishActions(
        dedupedActions.map((action) => {
          return {
            action,
            ruleEnvironment: environment,
            // Actions were deduped by id, so rules must be matched to the
            // surviving action by id as well. Matching by object identity
            // would drop every rule whose lookup returned a different object
            // for the same action.
            matchingRules: [...actionableRulesToActions.entries()].flatMap(
              ([rule, actions]) =>
                actions.some((it) => it.id === action.id)
                  ? [
                      {
                        ...safePick(rule, ['id', 'name', 'tags']),
                        version: rule.latestVersion.version,
                        policies: policiesByRule[rule.id] ?? [],
                      },
                    ]
                  : [],
            ),
            policies: triggeringRulesPolicies,
          };
        }),
        {
          orgId: org.id,
          targetItem: ruleInput,
          // NOTE(review): presumably 'backtest' correlation ids only occur in
          // the BACKTEST environment, which returned above — confirm.
          correlationId: executionsCorrelationId as CorrelationId<
            Exclude<CorrelationIdType<RuleExecutionCorrelationId>, 'backtest'>
          >,
          sync,
        },
      )
      .catch((e) => {
        this.tracer.logActiveSpanFailedIfAny(e);
        throw e;
      });

    // NB: while we only run _deduped_ actions, we update the rule action run
    // counts as though no deduping took place, since, logically, each rule
    // triggered the action.
    const updateRuleActionCountsPromise = shouldCountActions
      ? this.recordRuleActionLimitUsage(
          actionableRules.map((it) => it.id),
        ).catch(() => {
          // This query sometimes fails from a Sequelize Acquire Connection
          // timeout. While we're debugging the root cause of that further,
          // swallow the error rather than crashing the process.
        })
      : undefined;

    await Promise.all([
      publishActionsPromise,
      logRuleExecutionsPromise,
      updateRuleActionCountsPromise,
    ]);

    return { rulesToResults, actions: dedupedActions };
  }

  /**
   * Updates the aggregations referenced by a rule's condition set for the
   * current input. Does nothing unless the input is a full submission.
   *
   * An aggregation is updated when it has no condition set of its own, or when
   * its condition set evaluates to `PASSED` for this context. Aggregations for
   * which no runtime args can be computed are skipped.
   *
   * @param ruleConditions - the rule's condition set to scan for aggregations.
   * @param context - the evaluation context holding the input.
   * @param tracer - passed through to condition evaluation and to the
   *   aggregations service.
   */
  async preprocessAggregationConditions(
    ruleConditions: ReadonlyDeep<ConditionSet>,
    context: RuleEvaluationContext,
    tracer: SafeTracer,
  ) {
    const { input } = context;

    if (isFullSubmission(input)) {
      const aggregations = getAllAggregationsInConditionSet(ruleConditions);

      // First decide, for every aggregation, whether it should be updated.
      const aggregationAndConditionResults = await Promise.all(
        aggregations.map(async (aggregation) => {
          if (!aggregation.conditionSet) {
            return { aggregation, shouldUpdate: true };
          }

          const results = await getConditionSetResults(
            aggregation.conditionSet,
            context,
            tracer,
          );

          return {
            aggregation,
            shouldUpdate:
              results.result.outcome === ConditionCompletionOutcome.PASSED,
          };
        }),
      );

      // Then apply the updates for the aggregations that qualified.
      await Promise.all(
        aggregationAndConditionResults
          .filter(({ shouldUpdate }) => shouldUpdate)
          .map(async ({ aggregation }) => {
            const runtimeArgs = await evaluateAggregationRuntimeArgsForItem(
              context,
              input,
              aggregation,
            );

            if (!runtimeArgs) {
              return;
            }

            await this.aggregationsService.updateAggregation(
              aggregation,
              runtimeArgs,
              tracer,
            );
          }),
      );
    }
  }
}
400
// Default export: RuleEngine passed through `inject` (from
// ../iocContainer/utils.js) together with the names of its dependencies. The
// names are listed in the same order as the RuleEngine constructor's
// parameters; presumably `inject` resolves and passes them positionally, so
// keep the two lists in sync — confirm against `inject`'s implementation.
export default inject(
  [
    'RuleExecutionLogger',
    'RuleEvaluator',
    'ActionPublisher',
    'getEnabledRulesForItemTypeEventuallyConsistent',
    'getPoliciesForRulesEventuallyConsistent',
    'getActionsForRuleEventuallyConsistent',
    'recordRuleActionLimitUsage',
    'AggregationsService',
    'Tracer',
  ],
  RuleEngine,
);
// Type-only export of the class itself, alongside the injected default export.
export { type RuleEngine };