Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 557ff54b2b435e5f1e789c6a8a4e1bebf2d7deb6 415 lines 14 kB view raw
import _ from 'lodash';
import { type ReadonlyDeep } from 'type-fest';

import {
  getAllAggregationsInConditionSet,
  getConditionSetResults,
} from '../condition_evaluator/conditionSet.js';
import { type Dependencies } from '../iocContainer/index.js';
import { inject } from '../iocContainer/utils.js';
import {
  ConditionCompletionOutcome,
  type RuleWithLatestVersion,
  type Rule as TRule,
} from '../models/rules/RuleModel.js';
import { evaluateAggregationRuntimeArgsForItem } from '../services/aggregationsService/index.js';
import { type ItemSubmission } from '../services/itemProcessingService/index.js';
import {
  RuleStatus,
  type Action,
  type ConditionSet,
} from '../services/moderationConfigService/index.js';
import { type RuleExecutionCorrelationId } from '../services/analyticsLoggers/index.js';
import {
  type CorrelationId,
  type CorrelationIdType,
} from '../utils/correlationIds.js';
import { equalLengthZip } from '../utils/fp-helpers.js';
import { safePick } from '../utils/misc.js';
import type SafeTracer from '../utils/SafeTracer.js';
import {
  isFullSubmission,
  type RuleEvaluationContext,
  // This is used for a {@link} in a jsdoc comment.
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  type RuleEvaluator,
  type RuleExecutionResult,
  type RuleInput,
} from './RuleEvaluator.js';

// lodash is consumed through its default export; pull out the helpers used
// in this module once so call sites can use the bare names.
const { partition, uniqBy } = _;

// Represents the context from which a rule was triggered, which affects whether
// it passing will trigger actions and, if so, whether those actions will count
// against the rule's limit. Often, the rule environment is the same as the
// rule's status (e.g., a `LIVE` content rule will run in the `LIVE` rule
// environment when a new piece of content is submitted). However, sometimes the
// two fields come apart. E.g., a background rule can be backtested and a live
// rule can run under retroaction, among many other possibilities.
export enum RuleEnvironment {
  // Passing rules publish their actions, and those actions count against the
  // rule's action limit.
  LIVE = 'LIVE',
  // Rule executions are logged, but no actions are published.
  BACKGROUND = 'BACKGROUND',
  // Rule executions are logged, but no actions are published.
  BACKTEST = 'BACKTEST',
  // Passing rules publish their actions, without counting against the limit.
  MANUAL = 'MANUAL',
  // Passing rules publish their actions, without counting against the limit.
  RETROACTION = 'RETROACTION',
}

/**
 * This is the main Rule Engine class. It's responsible for running
 * all of the user's Rules on a single piece of content sent to
 * our API.
 */
class RuleEngine {
  constructor(
    private readonly ruleExecutionLogger: Dependencies['RuleExecutionLogger'],
    private readonly ruleEvaluator: Dependencies['RuleEvaluator'],
    private readonly actionPublisher: Dependencies['ActionPublisher'],
    private readonly getEnabledRulesForItemTypeEventuallyConsistent: Dependencies['getEnabledRulesForItemTypeEventuallyConsistent'],
    private readonly getPoliciesForRulesEventuallyConsistent: Dependencies['getPoliciesForRulesEventuallyConsistent'],
    private readonly getRuleActionsEventuallyConsistent: Dependencies['getActionsForRuleEventuallyConsistent'],
    private readonly recordRuleActionLimitUsage: Dependencies['recordRuleActionLimitUsage'],
    private readonly aggregationsService: Dependencies['AggregationsService'],
    private readonly tracer: Dependencies['Tracer'],
  ) {}

  // The environments in which a passing rule actually publishes its actions.
  // In every other environment, rule executions are only logged.
  private readonly environmentsThatApplyActions = [
    RuleEnvironment.LIVE,
    RuleEnvironment.MANUAL,
    RuleEnvironment.RETROACTION,
  ];

  /**
   * @see {@link RuleEvaluator.makeRuleExecutionContext}.
   */
  makeRuleExecutionContext(args: {
    orgId: string;
    input: RuleInput;
  }): RuleEvaluationContext {
    return this.ruleEvaluator.makeRuleExecutionContext(args);
  }

  /**
   * Runs the rules that are "enabled" ({@see ItemType.getEnabledRules}) for
   * this item type, against the given itemSubmission. Rules whose status is
   * `LIVE` run in the `LIVE` environment; every other enabled rule runs in the
   * `BACKGROUND` environment. Both sets share one evaluation context.
   *
   * @param itemSubmission - the submission to run the rules against.
   * @param executionsCorrelationId - An id that should be associated with all
   *   rule executions generated as part of running these rules, for correlating
   *   the execution with the event in the system that caused it.
   *   {@see getCorrelationId}
   * @param sync - forwarded unchanged to {@link RuleEngine.runRuleSet} for both
   *   rule sets.
   * @returns `actionsTriggered`, a promise for the (deduped) actions triggered
   *   by the live rules, which settles only once both the live and background
   *   rule sets have finished; and `getDerivedFieldValue`, taken from the
   *   shared evaluation context.
   */
  async runEnabledRules(
    itemSubmission: ItemSubmission,
    executionsCorrelationId: RuleExecutionCorrelationId,
    sync: boolean = false,
  ) {
    // enabledRules can be null when the item type can't be found.
    // getEnabledRulesForItemTypeEventuallyConsistent has `null` in its return
    // type primarily in case the item type id points to an item type that
    // doesn't exist. However, even though we know that the id is for an item
    // type that does exist (because the submission carries the full item type
    // object), we still must handle `null` b/c it could be that the item type
    // was _just_ created and can't be seen yet by
    // getEnabledRulesForItemTypeEventuallyConsistent, which, as the name
    // implies, is eventually consistent.
    const enabledRules =
      (await this.getEnabledRulesForItemTypeEventuallyConsistent(
        itemSubmission.itemType.id,
      )) ?? [];

    const [liveRules, backgroundRules] = partition(
      enabledRules,
      (rule) => rule.status === RuleStatus.LIVE,
    );

    const evaluationContext = this.makeRuleExecutionContext({
      orgId: itemSubmission.itemType.orgId,
      input: itemSubmission,
    });

    const resultsPromise = Promise.all([
      this.runRuleSet(
        liveRules,
        evaluationContext,
        RuleEnvironment.LIVE,
        executionsCorrelationId,
        sync,
      ),
      this.runRuleSet(
        backgroundRules,
        evaluationContext,
        RuleEnvironment.BACKGROUND,
        executionsCorrelationId,
        sync,
      ),
    ]);

    return {
      // Just return a promise for the actions that were triggered (which
      // doesn't necessarily mean they've run just yet, w/ queueing +
      // retrying, etc.) and a way to get this submission's derived field
      // values (leveraging the cache), because that's actually all we need
      // right now. Only the live rule set (index 0) can contribute actions.
      actionsTriggered: resultsPromise.then<readonly Action[]>(
        (it) => it[0].actions,
      ),
      getDerivedFieldValue: evaluationContext.getDerivedFieldValue,
    };
  }

  /**
   * This function runs a rule set, which is an array of rules that all apply to
   * the same item/rule input and must be run as a group (because the actions
   * they'd trigger if they pass must be deduplicated).
   *
   * As part of running the rule set, it publishes all the Actions that the
   * Rules determine should be run. Actions are only published in the
   * environments listed in `environmentsThatApplyActions`; in the other
   * environments (Background mode, Backtests) executions are only logged.
   *
   * @param rules - the list of Rules that will be run
   * @param evaluationContext - the context needed to run each rule, including,
   *   most notably, the user-generated content to run the rule against and/or a
   *   user id that can be selected as an input to (future) user-scoring
   *   signals.
   * @param environment - the RuleEnvironment that this rule is running in. This
   *   influences whether actions should be executed, whether they should count
   *   against daily limits, etc.
   * @param executionsCorrelationId - An id that should be associated with all
   *   rule executions generated as part of this rule set, for correlating the
   *   execution with the event in the system that caused it.
   *   {@see getCorrelationId}
   * @param sync - whether the request should run synchronously. Forwarded to
   *   the rule execution logger and the action publisher.
   * @returns a map from every rule in the set to its execution result, plus the
   *   deduped actions that were published (always empty in environments that
   *   don't apply actions).
   */
  async runRuleSet(
    rules: ReadonlyDeep<RuleWithLatestVersion[]>,
    evaluationContext: RuleEvaluationContext,
    environment: RuleEnvironment,
    executionsCorrelationId: RuleExecutionCorrelationId,
    sync?: boolean,
  ): Promise<{
    rulesToResults: Map<ReadonlyDeep<TRule>, RuleExecutionResult>;
    actions: readonly Action[];
  }> {
    if (!rules.length) {
      return { rulesToResults: new Map(), actions: [] };
    }

    const shouldRunActions =
      this.environmentsThatApplyActions.includes(environment);

    // In some cases, even when we run actions, we don't count the action
    // against a rule's daily limit. E.g., retroaction/manual.
    const shouldCountActions =
      shouldRunActions && environment === RuleEnvironment.LIVE;

    // Do aggregation preprocessing here, before any rule is evaluated.
    await Promise.all(
      rules.map(async (rule) =>
        this.preprocessAggregationConditions(
          rule.conditionSet,
          evaluationContext,
          this.tracer,
        ),
      ),
    );

    const ruleResults = await Promise.all(
      rules.map(async (it) =>
        this.ruleEvaluator.runRule(it.conditionSet, evaluationContext),
      ),
    );

    const rulesToResults = new Map(equalLengthZip(rules, ruleResults));

    const passingRules = [...rulesToResults.entries()]
      .filter(([_rule, result]) => result.passed)
      .map(([rule, _result]) => rule);

    // Only rules running in an environment that applies actions can trigger
    // anything, so don't bother loading actions for the other environments;
    // they'd be discarded by the early return below anyway.
    const actionableRules = shouldRunActions ? passingRules : [];

    const actionableRulesToActions = new Map(
      await Promise.all(
        actionableRules.map(async (rule) => {
          const actions = (await this.getRuleActionsEventuallyConsistent(
            rule.id,
          )) satisfies readonly ReadonlyDeep<Action>[] as readonly Action[];

          return [rule, actions] as const;
        }),
      ),
    );

    // NB: while we only run _deduped_ actions, we record the actions and
    // update the rule action run counts as though no deduping took place,
    // since, logically, each rule triggered the action.
    const { org, input: ruleInput } = evaluationContext;

    const policiesByRule = await this.getPoliciesForRulesEventuallyConsistent(
      rules.map((it) => it.id),
    );

    // Every rule's execution is logged, whether or not it passed and whether
    // or not this environment applies actions.
    const logRuleExecutionsPromise = this.ruleExecutionLogger.logRuleExecutions(
      [...rulesToResults.entries()].map(([rule, result]) => ({
        orgId: org.id,
        rule: {
          id: rule.id,
          name: rule.name,
          version: rule.latestVersion.version,
          tags: rule.tags,
        },
        ruleInput,
        environment,
        result: result.conditionResults,
        correlationId: executionsCorrelationId,
        passed: result.passed,
        policies: policiesByRule[rule.id] ?? [],
      })),
      sync,
    );

    if (!shouldRunActions) {
      await logRuleExecutionsPromise;
      return { rulesToResults, actions: [] };
    }

    const dedupedActions = uniqBy(
      [...actionableRulesToActions.values()].flat(),
      (it) => it.id,
    ) satisfies Action[] as readonly Action[];

    // The policies attached to each published action are those of _all_
    // actionable rules (deduped by id); the list doesn't depend on the action,
    // so compute it once.
    // NOTE(review): confirm this shouldn't be limited to the policies of the
    // rules that actually matched each action.
    const actionableRulesPolicies = uniqBy(
      actionableRules.flatMap((rule) => policiesByRule[rule.id] ?? []),
      'id',
    );

    // Publish all (deduped) actions + update the rule action counts if appropriate.
    const publishActionsPromise = this.actionPublisher
      .publishActions(
        dedupedActions.map((action) => {
          return {
            action,
            ruleEnvironment: environment,
            // Actions were deduped by id, and each rule's actions are loaded
            // separately, so two rules may hold distinct object instances of
            // the same action. Match on id, not on object identity, so that
            // every rule that triggered the action is reported.
            matchingRules: [...actionableRulesToActions.entries()].flatMap(
              ([rule, actions]) =>
                actions.some((it) => it.id === action.id)
                  ? [
                      {
                        ...safePick(rule, ['id', 'name', 'tags']),
                        version: rule.latestVersion.version,
                        policies: policiesByRule[rule.id] ?? [],
                      },
                    ]
                  : [],
            ),
            policies: actionableRulesPolicies,
          };
        }),
        {
          orgId: org.id,
          targetItem: ruleInput,
          // Backtests never reach this point (they don't apply actions), so
          // the 'backtest' correlation id type can be excluded here.
          correlationId: executionsCorrelationId as CorrelationId<
            Exclude<CorrelationIdType<RuleExecutionCorrelationId>, 'backtest'>
          >,
          sync,
        },
      )
      .catch((e) => {
        this.tracer.logActiveSpanFailedIfAny(e);
        throw e;
      });

    const updateRuleActionCountsPromise = shouldCountActions
      ? this.recordRuleActionLimitUsage(
          actionableRules.map((it) => it.id),
        ).catch(() => {
          // This query sometimes fails from a Sequelize Acquire Connection
          // timeout. While we're debugging the root cause of that further,
          // swallow the error rather than crashing the process.
        })
      : undefined;

    await Promise.all([
      publishActionsPromise,
      logRuleExecutionsPromise,
      updateRuleActionCountsPromise,
    ]);

    return { rulesToResults, actions: dedupedActions };
  }

  /**
   * Updates the aggregations referenced by a rule's condition set, for the
   * item in `context`. Does nothing unless the rule input is a full
   * submission.
   *
   * An aggregation is updated when it has no condition set of its own, or when
   * its condition set evaluates to `PASSED` for this input. Aggregations for
   * which no runtime args can be computed are skipped.
   *
   * @param ruleConditions - the rule's condition set, from which all
   *   aggregations are extracted.
   * @param context - the evaluation context holding the rule input.
   * @param tracer - passed through to condition evaluation and to the
   *   aggregations service.
   */
  async preprocessAggregationConditions(
    ruleConditions: ReadonlyDeep<ConditionSet>,
    context: RuleEvaluationContext,
    tracer: SafeTracer,
  ) {
    const { input } = context;

    if (isFullSubmission(input)) {
      const aggregations = getAllAggregationsInConditionSet(ruleConditions);

      // First decide, for every aggregation, whether this item should be
      // counted in it at all.
      const aggregationAndConditionResults = await Promise.all(
        aggregations.map(async (aggregation) => {
          if (!aggregation.conditionSet) {
            return { aggregation, shouldUpdate: true };
          }

          const results = await getConditionSetResults(
            aggregation.conditionSet,
            context,
            tracer,
          );

          return {
            aggregation,
            shouldUpdate:
              results.result.outcome === ConditionCompletionOutcome.PASSED,
          };
        }),
      );

      // Then apply the updates for the aggregations that qualified.
      await Promise.all(
        aggregationAndConditionResults
          .filter(({ shouldUpdate }) => shouldUpdate)
          .map(async ({ aggregation }) => {
            const runtimeArgs = await evaluateAggregationRuntimeArgsForItem(
              context,
              input,
              aggregation,
            );

            if (!runtimeArgs) {
              return;
            }

            await this.aggregationsService.updateAggregation(
              aggregation,
              runtimeArgs,
              tracer,
            );
          }),
      );
    }
  }
}

// NB: the dependency names must stay in the same order as the constructor's
// parameters.
export default inject(
  [
    'RuleExecutionLogger',
    'RuleEvaluator',
    'ActionPublisher',
    'getEnabledRulesForItemTypeEventuallyConsistent',
    'getPoliciesForRulesEventuallyConsistent',
    'getActionsForRuleEventuallyConsistent',
    'recordRuleActionLimitUsage',
    'AggregationsService',
    'Tracer',
  ],
  RuleEngine,
);
export { type RuleEngine };