// Mirror of https://github.com/roostorg/coop
1import _ from 'lodash';
2import { type ReadonlyDeep } from 'type-fest';
3
4import {
5 getAllAggregationsInConditionSet,
6 getConditionSetResults,
7} from '../condition_evaluator/conditionSet.js';
8import { type Dependencies } from '../iocContainer/index.js';
9import { inject } from '../iocContainer/utils.js';
10import { type PlainRuleWithLatestVersion } from '../models/rules/ruleTypes.js';
11import { evaluateAggregationRuntimeArgsForItem } from '../services/aggregationsService/index.js';
12import { type ItemSubmission } from '../services/itemProcessingService/index.js';
13import {
14 type Action,
15 ConditionCompletionOutcome,
16 type ConditionSet,
17 RuleStatus,
18} from '../services/moderationConfigService/index.js';
19import { type RuleExecutionCorrelationId } from '../services/analyticsLoggers/index.js';
20import {
21 type CorrelationId,
22 type CorrelationIdType,
23} from '../utils/correlationIds.js';
24import { equalLengthZip } from '../utils/fp-helpers.js';
25import { safePick } from '../utils/misc.js';
26import type SafeTracer from '../utils/SafeTracer.js';
27import {
28 isFullSubmission,
29 type RuleEvaluationContext,
30 // This is used for a {@link} in a jsdoc comments.
31 // eslint-disable-next-line @typescript-eslint/no-unused-vars
32 type RuleEvaluator,
33 type RuleExecutionResult,
34 type RuleInput,
35} from './RuleEvaluator.js';
36
// lodash helpers used by RuleEngine, pulled into module scope so call sites
// can use them unqualified.
const partition = _.partition;
const uniqBy = _.uniqBy;
38
/**
 * The context from which a rule was triggered. This affects whether the rule
 * passing will trigger actions and, if so, whether those actions will count
 * against the rule's action limit.
 *
 * Often the rule environment is the same as the rule's status (e.g., a `LIVE`
 * content rule runs in the `LIVE` rule environment when a new piece of content
 * is submitted). However, the two can come apart: e.g., a background rule can
 * be backtested and a live rule can run under retroaction, among many other
 * possibilities.
 *
 * As used by `RuleEngine.runRuleSet` in this file: actions are published only
 * in `LIVE`, `MANUAL`, and `RETROACTION`, and they count against a rule's
 * action limit only in `LIVE`.
 */
export enum RuleEnvironment {
  /** Normal submission-time run of `LIVE` rules; applies and counts actions. */
  LIVE = 'LIVE',
  /** Run of non-`LIVE` enabled rules; evaluated and logged, no actions. */
  BACKGROUND = 'BACKGROUND',
  /** Backtest run; evaluated and logged, no actions. */
  BACKTEST = 'BACKTEST',
  /** Manually triggered run; applies actions without counting them. */
  MANUAL = 'MANUAL',
  /** Retroaction run; applies actions without counting them. */
  RETROACTION = 'RETROACTION',
}
53
/**
 * This is the main Rule Engine class. It's responsible for running all of the
 * user's Rules on a single piece of content sent to our API.
 *
 * For each rule set it runs, it updates the aggregations the rules reference,
 * evaluates every rule, logs every rule execution, and — in environments that
 * apply actions — publishes the deduplicated actions of the passing rules.
 */
class RuleEngine {
  // NB: this parameter order mirrors the dependency-key list passed to
  // `inject` at the bottom of this file; keep the two in sync.
  constructor(
    private readonly ruleExecutionLogger: Dependencies['RuleExecutionLogger'],
    private readonly ruleEvaluator: Dependencies['RuleEvaluator'],
    private readonly actionPublisher: Dependencies['ActionPublisher'],
    private readonly getEnabledRulesForItemTypeEventuallyConsistent: Dependencies['getEnabledRulesForItemTypeEventuallyConsistent'],
    private readonly getPoliciesForRulesEventuallyConsistent: Dependencies['getPoliciesForRulesEventuallyConsistent'],
    private readonly getRuleActionsEventuallyConsistent: Dependencies['getActionsForRuleEventuallyConsistent'],
    private readonly recordRuleActionLimitUsage: Dependencies['recordRuleActionLimitUsage'],
    private readonly aggregationsService: Dependencies['AggregationsService'],
    private readonly tracer: Dependencies['Tracer'],
  ) {}

  /**
   * Environments in which passing rules publish their actions. In every other
   * environment (e.g. BACKGROUND, BACKTEST), rules are evaluated and logged,
   * but no actions are published.
   */
  private readonly environmentsThatApplyActions = [
    RuleEnvironment.LIVE,
    RuleEnvironment.MANUAL,
    RuleEnvironment.RETROACTION,
  ];

  /**
   * @see {@link RuleEvaluator.makeRuleExecutionContext}.
   */
  makeRuleExecutionContext(args: {
    orgId: string;
    input: RuleInput;
  }): RuleEvaluationContext {
    return this.ruleEvaluator.makeRuleExecutionContext(args);
  }

  /**
   * Runs the rules that are "enabled" ({@see ItemType.getEnabledRules}) for
   * this item's type, against the given itemSubmission. Rules with status
   * LIVE run in the LIVE environment; all other enabled rules run in the
   * BACKGROUND environment. Both sets share one evaluation context.
   *
   * @param itemSubmission - the submitted item to run the rules against.
   * @param executionsCorrelationId - An id that should be associated with all
   * rule executions generated as part of running these rules, for correlating
   * the execution with the event in the system that caused it.
   * {@see getCorrelationId}
   * @param sync - passed through to {@link RuleEngine.runRuleSet} for both the
   * live and background rule sets.
   * @returns a promise for the actions triggered by the LIVE rules (which
   * doesn't necessarily mean they've run yet, given queueing + retrying), and
   * the evaluation context's `getDerivedFieldValue`, so callers can read this
   * submission's derived field values (leveraging the context's cache).
   */
  async runEnabledRules(
    itemSubmission: ItemSubmission,
    executionsCorrelationId: RuleExecutionCorrelationId,
    sync: boolean = false,
  ) {
    // getEnabledRulesForItemTypeEventuallyConsistent has `null` in its return
    // type primarily in case the item type id points to an item type that
    // doesn't exist. Even though this item type does exist (the submission
    // carries the full item type), it may have been created so recently that
    // this lookup, which, as the name implies, is eventually consistent, can't
    // see it yet. So treat `null` as "no enabled rules".
    const enabledRules =
      (await this.getEnabledRulesForItemTypeEventuallyConsistent(
        itemSubmission.itemType.id,
      )) ?? [];

    const [liveRules, backgroundRules] = partition(
      enabledRules,
      (rule) => rule.status === RuleStatus.LIVE,
    );

    const evaluationContext = this.makeRuleExecutionContext({
      orgId: itemSubmission.itemType.orgId,
      input: itemSubmission,
    });

    const resultsPromise = Promise.all([
      this.runRuleSet(
        liveRules,
        evaluationContext,
        RuleEnvironment.LIVE,
        executionsCorrelationId,
        sync,
      ),
      this.runRuleSet(
        backgroundRules,
        evaluationContext,
        RuleEnvironment.BACKGROUND,
        executionsCorrelationId,
        sync,
      ),
    ]);

    return {
      // Only the LIVE rule set's actions are surfaced; the BACKGROUND run never
      // publishes actions.
      actionsTriggered: resultsPromise.then<readonly Action[]>(
        (it) => it[0].actions,
      ),
      getDerivedFieldValue: evaluationContext.getDerivedFieldValue,
    };
  }

  /**
   * Runs a rule set: an array of rules that all apply to the same item/rule
   * input and must be run as a group (because the actions they'd trigger if
   * they pass must be deduplicated).
   *
   * Every rule's execution is logged. If `environment` applies actions (LIVE,
   * MANUAL, RETROACTION), the actions of all passing rules are deduplicated by
   * id and published. Only in LIVE are they also recorded against the rules'
   * action limits. In other environments (e.g. BACKGROUND, BACKTEST) no
   * actions are fetched or published.
   *
   * @param rules - the list of Rules that will be run
   * @param evaluationContext - the context needed to run each rule, including,
   * most notably, the user-generated content to run the rule against and/or a
   * user id that can be selected as an input to (future) user-scoring
   * signals.
   * @param environment - the RuleEnvironment that this rule is running in. This
   * influences whether actions should be executed, whether they should count
   * against daily limits, etc.
   * @param executionsCorrelationId - An id that should be associated with all
   * rule executions generated as part of this rule set, for correlating the
   * execution with the event in the system that caused it.
   * {@see getCorrelationId}
   * @param sync - whether the request should run synchronously; forwarded to
   * the rule execution logger and the action publisher.
   * @returns each rule's execution result, plus the deduplicated actions that
   * were published (empty when the environment doesn't apply actions).
   * @throws rejects if publishing actions fails (after recording the failure
   * on the active span). A failure to record action-limit usage is recorded
   * on the active span but otherwise swallowed.
   */
  async runRuleSet(
    rules: ReadonlyDeep<PlainRuleWithLatestVersion[]>,
    evaluationContext: RuleEvaluationContext,
    environment: RuleEnvironment,
    executionsCorrelationId: RuleExecutionCorrelationId,
    sync?: boolean,
  ): Promise<{
    rulesToResults: Map<
      ReadonlyDeep<PlainRuleWithLatestVersion>,
      RuleExecutionResult
    >;
    actions: readonly Action[];
  }> {
    if (!rules.length) {
      return { rulesToResults: new Map(), actions: [] };
    }

    const { org, input: ruleInput } = evaluationContext;

    const shouldRunActions =
      this.environmentsThatApplyActions.includes(environment);

    // In some cases, even when we run actions, we don't count the action
    // against a rule's daily limit. E.g., retroaction/manual.
    const shouldCountActions =
      shouldRunActions && environment === RuleEnvironment.LIVE;

    // Aggregation preprocessing for every rule is awaited before any rule is
    // evaluated.
    await Promise.all(
      rules.map(async (rule) =>
        this.preprocessAggregationConditions(
          rule.conditionSet,
          evaluationContext,
          this.tracer,
        ),
      ),
    );

    const ruleResults = await Promise.all(
      rules.map(async (rule) =>
        this.ruleEvaluator.runRule(rule.conditionSet, evaluationContext),
      ),
    );

    const rulesToResults = new Map(equalLengthZip(rules, ruleResults));

    // The rules whose actions should be triggered: currently, exactly the
    // rules that passed.
    const actionableRules = [...rulesToResults.entries()]
      .filter(([_rule, result]) => result.passed)
      .map(([rule, _result]) => rule);

    // Actions are only needed when they'll be published; policies are needed
    // for logging in every environment. The lookups are independent, so run
    // them concurrently.
    const rulesNeedingActions = shouldRunActions ? actionableRules : [];
    const [ruleActionEntries, policiesByRule] = await Promise.all([
      Promise.all(
        rulesNeedingActions.map(async (rule) => {
          const actions = (await this.getRuleActionsEventuallyConsistent({
            orgId: org.id,
            ruleId: rule.id,
          })) satisfies readonly ReadonlyDeep<Action>[] as readonly Action[];

          return [rule, actions] as const;
        }),
      ),
      this.getPoliciesForRulesEventuallyConsistent(rules.map((it) => it.id)),
    ]);
    const actionableRulesToActions = new Map(ruleActionEntries);

    const logRuleExecutionsPromise = this.ruleExecutionLogger.logRuleExecutions(
      [...rulesToResults.entries()].map(([rule, result]) => ({
        orgId: org.id,
        rule: {
          id: rule.id,
          name: rule.name,
          version: rule.latestVersion.version,
          tags: rule.tags,
        },
        ruleInput,
        environment,
        result: result.conditionResults,
        correlationId: executionsCorrelationId,
        passed: result.passed,
        policies: policiesByRule[rule.id] ?? [],
      })),
      sync,
    );

    if (!shouldRunActions) {
      await logRuleExecutionsPromise;
      return { rulesToResults, actions: [] };
    }

    // NB: while we only publish _deduped_ actions, we record the actions and
    // update the rule action run counts as though no deduping took place,
    // since, logically, each rule triggered the action.
    const ruleActionPairs = [...actionableRulesToActions.entries()];

    const dedupedActions = uniqBy(
      [...actionableRulesToActions.values()].flat(),
      (it) => it.id,
    ) satisfies Action[] as readonly Action[];

    // Identical for every published action, so compute it once: the union of
    // the policies of all actionable rules.
    // NOTE(review): this is not limited to the rules that triggered a given
    // action (those are in `matchingRules`) — confirm that's intended.
    const actionableRulesPolicies = uniqBy(
      [...actionableRulesToActions.keys()].flatMap(
        (rule) => policiesByRule[rule.id] ?? [],
      ),
      'id',
    );

    // Publish all (deduped) actions + update the rule action counts if appropriate.
    const publishActionsPromise = this.actionPublisher
      .publishActions(
        dedupedActions.map((action) => ({
          action,
          ruleEnvironment: environment,
          // Match by id, not object identity: dedupedActions was deduplicated
          // by id, so another rule's copy of the same action may be a
          // different object, and `includes` would wrongly skip that rule.
          matchingRules: ruleActionPairs.flatMap(([rule, actions]) =>
            actions.some((it) => it.id === action.id)
              ? [
                  {
                    ...safePick(rule, ['id', 'name', 'tags']),
                    version: rule.latestVersion.version,
                    policies: policiesByRule[rule.id] ?? [],
                  },
                ]
              : [],
          ),
          policies: actionableRulesPolicies,
        })),
        {
          orgId: org.id,
          targetItem: ruleInput,
          correlationId: executionsCorrelationId as CorrelationId<
            Exclude<CorrelationIdType<RuleExecutionCorrelationId>, 'backtest'>
          >,
          sync,
        },
      )
      .catch((e) => {
        this.tracer.logActiveSpanFailedIfAny(e);
        throw e;
      });

    const updateRuleActionCountsPromise = shouldCountActions
      ? this.recordRuleActionLimitUsage(
          actionableRules.map((it) => it.id),
        ).catch((e) => {
          // This query sometimes fails from a Sequelize Acquire Connection
          // timeout. While we're debugging the root cause of that further,
          // record the failure on the active span but swallow it rather than
          // crashing the process.
          this.tracer.logActiveSpanFailedIfAny(e);
        })
      : undefined;

    await Promise.all([
      publishActionsPromise,
      logRuleExecutionsPromise,
      updateRuleActionCountsPromise,
    ]);

    return { rulesToResults, actions: dedupedActions };
  }

  /**
   * Updates the aggregations referenced by a rule's condition set with the
   * current input. Does nothing unless the input is a full submission.
   *
   * For each aggregation found in `ruleConditions`, if the aggregation has its
   * own condition set, that set is evaluated first. The aggregation is
   * updated only when that set PASSED, or when it has no condition set.
   * Runtime args are then computed for this item. If none are produced, the
   * aggregation is skipped; otherwise it is updated via the
   * AggregationsService.
   *
   * @param ruleConditions - the rule's condition set to scan for aggregations.
   * @param context - the evaluation context holding the input.
   * @param tracer - passed through to condition evaluation and the update.
   */
  async preprocessAggregationConditions(
    ruleConditions: ReadonlyDeep<ConditionSet>,
    context: RuleEvaluationContext,
    tracer: SafeTracer,
  ) {
    const { input } = context;

    if (!isFullSubmission(input)) {
      return;
    }

    const aggregations = getAllAggregationsInConditionSet(ruleConditions);

    const aggregationAndConditionResults = await Promise.all(
      aggregations.map(async (aggregation) => {
        if (!aggregation.conditionSet) {
          return { aggregation, shouldUpdate: true };
        }

        const results = await getConditionSetResults(
          aggregation.conditionSet,
          context,
          tracer,
        );

        return {
          aggregation,
          shouldUpdate:
            results.result.outcome === ConditionCompletionOutcome.PASSED,
        };
      }),
    );

    await Promise.all(
      aggregationAndConditionResults
        .filter(({ shouldUpdate }) => shouldUpdate)
        .map(async ({ aggregation }) => {
          const runtimeArgs = await evaluateAggregationRuntimeArgsForItem(
            context,
            input,
            aggregation,
          );

          if (!runtimeArgs) {
            return;
          }

          await this.aggregationsService.updateAggregation(
            aggregation,
            runtimeArgs,
            tracer,
          );
        }),
    );
  }
}
401
// The dependency keys below mirror, in order, the RuleEngine constructor's
// parameter list (presumably resolved and passed positionally by `inject`),
// so keep the two in sync.
const injectableRuleEngine = inject(
  [
    'RuleExecutionLogger',
    'RuleEvaluator',
    'ActionPublisher',
    'getEnabledRulesForItemTypeEventuallyConsistent',
    'getPoliciesForRulesEventuallyConsistent',
    'getActionsForRuleEventuallyConsistent',
    'recordRuleActionLimitUsage',
    'AggregationsService',
    'Tracer',
  ],
  RuleEngine,
);

export default injectableRuleEngine;

// The class itself is only exported as a type; consumers get instances
// through the injected default export.
export type { RuleEngine };