Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 416 lines 14 kB view raw
1import _ from 'lodash'; 2import { type ReadonlyDeep } from 'type-fest'; 3 4import { 5 getAllAggregationsInConditionSet, 6 getConditionSetResults, 7} from '../condition_evaluator/conditionSet.js'; 8import { type Dependencies } from '../iocContainer/index.js'; 9import { inject } from '../iocContainer/utils.js'; 10import { type PlainRuleWithLatestVersion } from '../models/rules/ruleTypes.js'; 11import { evaluateAggregationRuntimeArgsForItem } from '../services/aggregationsService/index.js'; 12import { type ItemSubmission } from '../services/itemProcessingService/index.js'; 13import { 14 type Action, 15 ConditionCompletionOutcome, 16 type ConditionSet, 17 RuleStatus, 18} from '../services/moderationConfigService/index.js'; 19import { type RuleExecutionCorrelationId } from '../services/analyticsLoggers/index.js'; 20import { 21 type CorrelationId, 22 type CorrelationIdType, 23} from '../utils/correlationIds.js'; 24import { equalLengthZip } from '../utils/fp-helpers.js'; 25import { safePick } from '../utils/misc.js'; 26import type SafeTracer from '../utils/SafeTracer.js'; 27import { 28 isFullSubmission, 29 type RuleEvaluationContext, 30 // This is used for a {@link} in a jsdoc comment. 31 // eslint-disable-next-line @typescript-eslint/no-unused-vars 32 type RuleEvaluator, 33 type RuleExecutionResult, 34 type RuleInput, 35} from './RuleEvaluator.js'; 36 37const { partition, uniqBy } = _; 38 39// Represents the context from which a rule was triggered, which affects whether 40// it passing will trigger actions and, if so, whether those actions will count 41// against the rule's limit. Often, the rule environment is the same as the 42// rule's status (e.g., a `LIVE` content rule will run in the `LIVE` rule 43// environment when a new piece of content is submitted). However, sometimes the 44// two fields come apart. E.g., a background rule can be backtested and a live 45// rule can run under retroaction, among many other possibilities. 
export enum RuleEnvironment {
  /** Passing rules publish their actions, and those actions count against the rule's action limit. */
  LIVE = 'LIVE',
  /** Rules are evaluated and logged, but no actions are published. */
  BACKGROUND = 'BACKGROUND',
  /** Rules are evaluated and logged, but no actions are published. */
  BACKTEST = 'BACKTEST',
  /** Passing rules publish their actions, without counting against the rule's action limit. */
  MANUAL = 'MANUAL',
  /** Passing rules publish their actions, without counting against the rule's action limit. */
  RETROACTION = 'RETROACTION',
}

/**
 * This is the main Rule Engine class. It's responsible for running
 * all of the user's Rules on a single piece of content sent to
 * our API.
 *
 * For each rule set it: preprocesses aggregation conditions, evaluates every
 * rule, logs every execution, and (depending on the {@link RuleEnvironment})
 * publishes the deduplicated actions of the passing rules and records usage
 * against the rules' action limits.
 */
class RuleEngine {
  constructor(
    private readonly ruleExecutionLogger: Dependencies['RuleExecutionLogger'],
    private readonly ruleEvaluator: Dependencies['RuleEvaluator'],
    private readonly actionPublisher: Dependencies['ActionPublisher'],
    private readonly getEnabledRulesForItemTypeEventuallyConsistent: Dependencies['getEnabledRulesForItemTypeEventuallyConsistent'],
    private readonly getPoliciesForRulesEventuallyConsistent: Dependencies['getPoliciesForRulesEventuallyConsistent'],
    private readonly getRuleActionsEventuallyConsistent: Dependencies['getActionsForRuleEventuallyConsistent'],
    private readonly recordRuleActionLimitUsage: Dependencies['recordRuleActionLimitUsage'],
    private readonly aggregationsService: Dependencies['AggregationsService'],
    private readonly tracer: Dependencies['Tracer'],
  ) {}

  /**
   * The environments in which a passing rule's actions are actually
   * published. In every other environment rules are only evaluated + logged.
   */
  private readonly environmentsThatApplyActions = [
    RuleEnvironment.LIVE,
    RuleEnvironment.MANUAL,
    RuleEnvironment.RETROACTION,
  ];

  /**
   * Thin pass-through to the rule evaluator.
   *
   * @see {@link RuleEvaluator.makeRuleExecutionContext}.
   */
  makeRuleExecutionContext(args: {
    orgId: string;
    input: RuleInput;
  }): RuleEvaluationContext {
    return this.ruleEvaluator.makeRuleExecutionContext(args);
  }

  /**
   * Runs the rules that are "enabled" for this item type against the given
   * itemSubmission. Enabled rules whose status is `LIVE` run in the `LIVE`
   * environment; all other enabled rules run in the `BACKGROUND` environment.
   *
   * @param itemSubmission - the submission to run the rules against.
   * @param executionsCorrelationId - An id that should be associated with all
   *   rule executions generated as part of running these rules, for
   *   correlating the execution with the event in the system that caused it.
   * @param sync - forwarded to {@link RuleEngine.runRuleSet} for both rule
   *   sets. Defaults to `false`.
   * @returns `actionsTriggered`, a promise for the (deduped) actions triggered
   *   by the live rules, and `getDerivedFieldValue`, taken from the shared
   *   evaluation context.
   */
  async runEnabledRules(
    itemSubmission: ItemSubmission,
    executionsCorrelationId: RuleExecutionCorrelationId,
    sync: boolean = false,
  ) {
    // The lookup can return null when the item type can't be found. Even
    // though we hold a full item type object here (so we know it exists), we
    // still must handle `null` b/c the item type could have _just_ been
    // created and not yet be visible to this lookup, which, as the name
    // implies, is only eventually consistent.
    const enabledRules =
      (await this.getEnabledRulesForItemTypeEventuallyConsistent(
        itemSubmission.itemType.id,
      )) ?? [];

    const [liveRules, backgroundRules] = partition(
      enabledRules,
      (rule) => rule.status === RuleStatus.LIVE,
    );

    // Both rule sets share one context so they evaluate the same input.
    const evaluationContext = this.makeRuleExecutionContext({
      orgId: itemSubmission.itemType.orgId,
      input: itemSubmission,
    });

    const resultsPromise = Promise.all([
      this.runRuleSet(
        liveRules,
        evaluationContext,
        RuleEnvironment.LIVE,
        executionsCorrelationId,
        sync,
      ),
      this.runRuleSet(
        backgroundRules,
        evaluationContext,
        RuleEnvironment.BACKGROUND,
        executionsCorrelationId,
        sync,
      ),
    ]);

    return {
      // Just return a promise for the actions that were triggered (which
      // doesn't necessarily mean they've run just yet, w/ queueing +
      // retrying, etc.) and a way to get this submission's derived field
      // values (leveraging the cache), because that's actually all we need
      // right now. Only the live rule set (index 0) can contribute actions.
      actionsTriggered: resultsPromise.then<readonly Action[]>(
        (it) => it[0].actions,
      ),
      getDerivedFieldValue: evaluationContext.getDerivedFieldValue,
    };
  }

  /**
   * This function runs a rule set, which is an array of rules that all apply to
   * the same item/rule input and must be run as a group (because the actions
   * they'd trigger if they pass must be deduplicated).
   *
   * As part of running the rule set, it publishes all the Actions that the
   * Rules determine should be run. Actions are not published for rule sets
   * running in the `BACKGROUND` or `BACKTEST` environments.
   *
   * @param rules - the list of Rules that will be run
   * @param evaluationContext - the context needed to run each rule, including,
   *   most notably, the user-generated content to run the rule against and/or a
   *   user id that can be selected as an input to (future) user-scoring
   *   signals.
   * @param environment - the RuleEnvironment that this rule is running in. This
   *   influences whether actions should be executed, whether they should count
   *   against daily limits, etc.
   * @param executionsCorrelationId - An id that should be associated with all
   *   rule executions generated as part of this rule set, for correlating the
   *   execution with the event in the system that caused it.
   * @param sync - whether the request should run synchronously; forwarded to
   *   the rule execution logger and the action publisher.
   * @returns every rule mapped to its execution result, plus the deduplicated
   *   actions that were published (always empty when the environment does not
   *   apply actions).
   */
  async runRuleSet(
    rules: ReadonlyDeep<PlainRuleWithLatestVersion[]>,
    evaluationContext: RuleEvaluationContext,
    environment: RuleEnvironment,
    executionsCorrelationId: RuleExecutionCorrelationId,
    sync?: boolean,
  ): Promise<{
    rulesToResults: Map<
      ReadonlyDeep<PlainRuleWithLatestVersion>,
      RuleExecutionResult
    >;
    actions: readonly Action[];
  }> {
    if (!rules.length) {
      return { rulesToResults: new Map(), actions: [] };
    }

    const shouldRunActions =
      this.environmentsThatApplyActions.includes(environment);

    // In some cases, even when we run actions, we don't count the action
    // against a rule's daily limit. E.g., retroaction/manual.
    const shouldCountActions =
      shouldRunActions && environment === RuleEnvironment.LIVE;

    // Do aggregation preprocessing here, before any rule is evaluated.
    await Promise.all(
      rules.map(async (rule) =>
        this.preprocessAggregationConditions(
          rule.conditionSet,
          evaluationContext,
          this.tracer,
        ),
      ),
    );

    const ruleResults = await Promise.all(
      rules.map(async (it) =>
        this.ruleEvaluator.runRule(it.conditionSet, evaluationContext),
      ),
    );

    const rulesToResults = new Map(equalLengthZip(rules, ruleResults));

    const passingRules = [...rulesToResults.entries()]
      .filter(([_rule, result]) => result.passed)
      .map(([rule, _result]) => rule);

    // Only look up actions when this environment will actually publish them;
    // otherwise the lookups would be discarded by the early return below.
    const actionableRules = shouldRunActions ? passingRules : [];

    const actionableRulesToActions = new Map(
      await Promise.all(
        actionableRules.map(async (rule) => {
          const actions = (await this.getRuleActionsEventuallyConsistent({
            orgId: evaluationContext.org.id,
            ruleId: rule.id,
          })) satisfies readonly ReadonlyDeep<Action>[] as readonly Action[];

          return [rule, actions] as const;
        }),
      ),
    );

    // NB: while we only run _deduped_ actions, we record the actions and
    // update the rule action run counts as though no deduping took place,
    // since, logically, each rule triggered the action.
    const { org, input: ruleInput } = evaluationContext;

    const policiesByRule = await this.getPoliciesForRulesEventuallyConsistent(
      rules.map((it) => it.id),
    );

    // Every rule execution is logged, whether or not the rule passed.
    const logRuleExecutionsPromise = this.ruleExecutionLogger.logRuleExecutions(
      [...rulesToResults.entries()].map(([rule, result]) => ({
        orgId: org.id,
        rule: {
          id: rule.id,
          name: rule.name,
          version: rule.latestVersion.version,
          tags: rule.tags,
        },
        ruleInput,
        environment,
        result: result.conditionResults,
        correlationId: executionsCorrelationId,
        passed: result.passed,
        policies: policiesByRule[rule.id] ?? [],
      })),
      sync,
    );

    if (!shouldRunActions) {
      await logRuleExecutionsPromise;
      return { rulesToResults, actions: [] };
    }

    const dedupedActions = uniqBy(
      [...actionableRulesToActions.values()].flat(),
      (it) => it.id,
    ) satisfies Action[] as readonly Action[];

    // The policies attached to each published action do not depend on the
    // action, so compute them once rather than once per action.
    // NOTE(review): this covers every passing rule, not only the rules that
    // matched a particular action — confirm that is the intended scope.
    const actionableRulePolicies = uniqBy(
      [...actionableRulesToActions.keys()].flatMap(
        (rule) => policiesByRule[rule.id] ?? [],
      ),
      'id',
    );

    // Publish all (deduped) actions + update the rule action counts if appropriate.
    const publishActionsPromise = this.actionPublisher
      .publishActions(
        dedupedActions.map((action) => {
          return {
            action,
            ruleEnvironment: environment,
            // Actions were deduped by id, so rules must be matched to an
            // action by id as well. Each rule's actions are fetched
            // separately and may be distinct object instances, so an identity
            // check could miss every rule but the one whose instance survived
            // the dedup.
            matchingRules: [...actionableRulesToActions.entries()]
              .filter(([_rule, actions]) =>
                actions.some((it) => it.id === action.id),
              )
              .map(([rule, _actions]) => ({
                ...safePick(rule, ['id', 'name', 'tags']),
                version: rule.latestVersion.version,
                policies: policiesByRule[rule.id] ?? [],
              })),
            policies: actionableRulePolicies,
          };
        }),
        {
          orgId: org.id,
          targetItem: ruleInput,
          correlationId: executionsCorrelationId as CorrelationId<
            Exclude<CorrelationIdType<RuleExecutionCorrelationId>, 'backtest'>
          >,
          sync,
        },
      )
      .catch((e) => {
        this.tracer.logActiveSpanFailedIfAny(e);
        throw e;
      });

    const updateRuleActionCountsPromise = shouldCountActions
      ? this.recordRuleActionLimitUsage(
          actionableRules.map((it) => it.id),
        ).catch(() => {
          // This query sometimes fails from a Sequelize Acquire Connection
          // timeout. While we're debugging the root cause of that further,
          // swallow the error rather than crashing the process.
        })
      : undefined;

    await Promise.all([
      publishActionsPromise,
      logRuleExecutionsPromise,
      updateRuleActionCountsPromise,
    ]);

    return { rulesToResults, actions: dedupedActions };
  }

  /**
   * Updates the aggregations referenced by a rule's condition set for the
   * item being evaluated. Does nothing unless the context's input is a full
   * submission.
   *
   * An aggregation without its own condition set is always updated. One with
   * a condition set is updated only when that condition set's outcome is
   * `PASSED`. An update is skipped when no runtime args can be computed for
   * the item.
   *
   * @param ruleConditions - the rule's condition set to scan for aggregations.
   * @param context - the evaluation context holding the input.
   * @param tracer - passed through to condition evaluation and to the
   *   aggregations service.
   */
  async preprocessAggregationConditions(
    ruleConditions: ReadonlyDeep<ConditionSet>,
    context: RuleEvaluationContext,
    tracer: SafeTracer,
  ) {
    const { input } = context;

    if (isFullSubmission(input)) {
      const aggregations = getAllAggregationsInConditionSet(ruleConditions);

      // First decide, for each aggregation, whether this item should be
      // counted toward it.
      const aggregationAndConditionResults = await Promise.all(
        aggregations.map(async (aggregation) => {
          if (!aggregation.conditionSet) {
            return { aggregation, shouldUpdate: true };
          }

          const results = await getConditionSetResults(
            aggregation.conditionSet,
            context,
            tracer,
          );

          return {
            aggregation,
            shouldUpdate:
              results.result.outcome === ConditionCompletionOutcome.PASSED,
          };
        }),
      );

      // Then apply the updates for the aggregations that qualified.
      await Promise.all(
        aggregationAndConditionResults
          .filter(({ shouldUpdate }) => shouldUpdate)
          .map(async ({ aggregation }) => {
            const runtimeArgs = await evaluateAggregationRuntimeArgsForItem(
              context,
              input,
              aggregation,
            );

            if (!runtimeArgs) {
              return;
            }

            await this.aggregationsService.updateAggregation(
              aggregation,
              runtimeArgs,
              tracer,
            );
          }),
      );
    }
  }
}

// The dependency names below must stay in the same order as the constructor
// parameters above.
export default inject(
  [
    'RuleExecutionLogger',
    'RuleEvaluator',
    'ActionPublisher',
    'getEnabledRulesForItemTypeEventuallyConsistent',
    'getPoliciesForRulesEventuallyConsistent',
    'getActionsForRuleEventuallyConsistent',
    'recordRuleActionLimitUsage',
    'AggregationsService',
    'Tracer',
  ],
  RuleEngine,
);
export { type RuleEngine };