Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 557ff54b2b435e5f1e789c6a8a4e1bebf2d7deb6 956 lines 32 kB view raw
/* eslint-disable max-lines */

import { type Exception } from '@opentelemetry/api';
import { makeEnumLike } from '@roostorg/types';
import { DataSource } from 'apollo-datasource';
import { AuthenticationError } from 'apollo-server-express';
import { sql, type Kysely } from 'kysely';
import Sequelize from 'sequelize';
import { uid } from 'uid';

import { inject, type Dependencies } from '../../iocContainer/index.js';
import {
  isEmptyResultSetError,
  isUniqueConstraintError,
} from '../../models/errors.js';
import { type User } from '../../models/UserModel.js';
import { type ActionCountsInput } from '../../services/actionStatisticsService/index.js';
import { type AggregationClause } from '../../services/aggregationsService/index.js';
import {
  RuleType,
  type Condition,
  type ConditionInput,
  type ConditionSet,
  type LeafCondition,
  type CoopInput,
  type RuleStatus,
} from '../../services/moderationConfigService/index.js';
import {
  makeRuleHasRunningBacktestsError,
  makeRuleIsMissingContentTypeError,
  makeRuleNameExistsError,
  // TODO: delete the import below when we move the rule mutation logic into the
  // moderation config service, which is where it should be.
  // eslint-disable-next-line import/no-restricted-paths
} from '../../services/moderationConfigService/moderationConfigService.js';
import {
  isSignalId,
  signalIsExternal,
  type SignalId,
} from '../../services/signalsService/index.js';
import { type ConditionSetWithResultAsLogged } from '../../services/analyticsLoggers/index.js';
import { type DataWarehousePublicSchema } from '../../storage/dataWarehouse/warehouseSchema.js';
import { toCorrelationId } from '../../utils/correlationIds.js';
import {
  jsonParse,
  jsonStringify,
  tryJsonParse,
} from '../../utils/encoding.js';
import { makeNotFoundError } from '../../utils/errors.js';
import { assertUnreachable, patchInPlace } from '../../utils/misc.js';
import { takeLast } from '../../utils/sql.js';
import {
  type Mutable,
  type NonEmptyString,
  type RequiredWithoutNull,
} from '../../utils/typescript-types.js';
import {
  type GQLAggregationClauseInput,
  type GQLConditionInput,
  type GQLConditionInputFieldInput,
  type GQLConditionSetInput,
  type GQLCreateContentRuleInput,
  type GQLCreateUserRuleInput,
  type GQLRunRetroactionInput,
  type GQLUpdateContentRuleInput,
  type GQLUpdateUserRuleInput,
} from '../generated.js';
import { oneOfInputToTaggedUnion } from '../utils/inputHelpers.js';
import { type CursorInfo, type Edge } from '../utils/paginationHandler.js';
import { locationAreaInputToLocationArea } from './LocationBankApi.js';

const { Op, Transaction } = Sequelize;
const SortOrder = makeEnumLike(['ASC', 'DESC']);
type SortOrder = (typeof SortOrder)[keyof typeof SortOrder];

// GraphQl exposed type for a rule execution.
// TODO: make sure schema matches result here.
export type RuleExecutionResult = {
  date: string;
  ts: string;
  contentId: string;
  itemTypeName: string;
  itemTypeId: string;
  userId?: string;
  userTypeId?: string;
  content: string;
  result: ConditionSetWithResultAsLogged;
  environment: RuleStatus;
  passed: boolean;
  ruleId: string;
  ruleName: string;
  tags: string[];
};

/**
 * Converts a GraphQL condition (or condition set) input into the shape we
 * persist on a rule. Condition sets are converted recursively; anything else
 * is treated as a leaf condition.
 *
 * @param condition The GraphQL condition or condition set input.
 * @returns A `ConditionSet` when given a condition set input, else a
 *   `Condition`.
 * @throws Error if the input fails `conditionInputIsValid`, or if any nested
 *   condition fails conversion.
 */
export function transformConditionForDB<
  T extends GQLConditionInput | GQLConditionSetInput,
>(condition: T): T extends GQLConditionSetInput ? ConditionSet : Condition {
  if (!conditionInputIsValid(condition)) {
    throw new Error('Invalid condition input');
  }

  if ('conditions' in condition) {
    return {
      ...condition,
      conjunction: condition.conjunction,
      conditions: condition.conditions.map(
        transformConditionForDB,
      ) as ConditionSet['conditions'],
    };
  }

  return transformLeafConditionForDB(
    condition,
  ) as T extends GQLConditionSetInput ? ConditionSet : Condition;
}

/**
 * When a LeafCondition is sent to us as input in a graphql mutation,
 * the shape of the GQL input objects needs to be mapped to our internal
 * representation of a LeafCondition (as used in the RuleModel/db/TS).
 *
 * NB: for google place locations stored in matchingValues, we convert them
 * to valid LocationArea objects, but don't bother fetching the extra google
 * place info (as that'd be quite a lot of extra data to store in the rule's
 * json blob, which could have performance impacts, and it'd be quite
 * slow/wasteful to fetch it for every location on every rule update).
 *
 * @throws Error if the comparator is IS_NOT_PROVIDED but a signal was given,
 *   if the signal id does not parse to an accepted signal id, or if an
 *   AGGREGATION signal is missing its aggregation clause.
 */
function transformLeafConditionForDB(
  leafCondition: ValidatedGQLLeafConditionInput,
): LeafCondition {
  return {
    ...leafCondition,
    input: transformConditionInput(leafCondition.input),
    ...(() => {
      const { comparator, signal, matchingValues } = leafCondition;

      if (comparator === 'IS_NOT_PROVIDED') {
        if (signal) {
          throw new Error(
            'Cannot use is not provided on a condition with a signal',
          );
        }
        // Explicitly blank out everything that's meaningless for this
        // comparator, overriding whatever the spread above copied in.
        return {
          comparator,
          signal: undefined,
          matchingValues: undefined,
          threshold: undefined,
        };
      }

      return {
        comparator,
        matchingValues: matchingValues
          ? {
              ...matchingValues,
              locations: matchingValues.locations?.map(
                locationAreaInputToLocationArea,
              ),
            }
          : undefined,
        signal:
          signal &&
          (() => {
            const { id, name, subcategory, type } = signal;
            const idParsed = tryJsonParse(id);
            if (!isSignalId(idParsed) || !signalIsExternal(idParsed)) {
              throw new Error('Invalid signal id');
            }
            // Re-serialize the parsed id rather than storing the raw input.
            const signalInfo = {
              id: jsonStringify(idParsed),
              name,
              subcategory,
            };

            // eslint-disable-next-line switch-statement/require-appropriate-default-case
            switch (type) {
              // Braces give the `const` below its own block scope, so it
              // can't leak into (or clash with) other case clauses.
              case 'AGGREGATION': {
                const aggregationClauseInput =
                  signal.args?.AGGREGATION?.aggregationClause;
                if (!aggregationClauseInput) {
                  throw new Error('Missing signal args');
                }
                return {
                  ...signalInfo,
                  type,
                  args: {
                    aggregationClause: parseAggregationClauseInput(
                      aggregationClauseInput,
                    ),
                  },
                };
              }
              default:
                return {
                  ...signalInfo,
                  type,
                  args: undefined,
                };
            }
          })(),
        threshold: leafCondition.threshold,
      };
    })(),
  };
}

/**
 * Maps the `input` field of a GraphQL leaf condition to our internal
 * ConditionInput representation. Only CONTENT_DERIVED_FIELD inputs are
 * reshaped (their oneOf-style `spec.source` becomes a tagged union); every
 * other input type is passed through with a cast.
 *
 * @throws Error if a CONTENT_DERIVED_FIELD input has no `spec`.
 */
function transformConditionInput(conditionInput: GQLConditionInputFieldInput) {
  // TODO: fix the logic here rather than disabling the lint rule. We
  // genuinely have some validation gaps.
  // eslint-disable-next-line switch-statement/require-appropriate-default-case
  switch (conditionInput.type) {
    case 'CONTENT_DERIVED_FIELD': {
      const { spec } = conditionInput;
      // Fail with a descriptive error instead of a TypeError from
      // dereferencing a missing spec.
      if (spec == null) {
        throw new Error(
          'CONTENT_DERIVED_FIELD condition input is missing its spec',
        );
      }
      const specSource = oneOfInputToTaggedUnion(spec.source, {
        contentField: 'CONTENT_FIELD',
        fullItem: 'FULL_ITEM',
        contentCoopInput: 'CONTENT_COOP_INPUT',
      });

      return {
        ...(conditionInput as GQLConditionInputFieldInput & {
          type: 'CONTENT_DERIVED_FIELD';
        }),
        spec: {
          ...spec,
          // This cast is needed because TS (from the generated types)
          // thinks that input.spec.name is a GQLCoopInput enum, and the
          // values of that type are things like ALL_TEXT etc, whereas the
          // runtime values for our CoopInput type are 'All text' etc.
          // What TS doesn't know is that an apollo resolver has mapped the
          // GQL output values back to our saved runtime values, which makes
          // this safe.
          source: specSource as
            | Exclude<typeof specSource, { type: 'CONTENT_COOP_INPUT' }>
            | { type: 'CONTENT_COOP_INPUT'; name: CoopInput },
        },
      };
    }
    default:
      // TS is actually right to complain here, because our GQL types for
      // LeafCondition.input let a lot of invalid values through (the GQL
      // types are pretty loose, because we haven't yet made them a proper
      // GQL input union), and our coarse validation routine doesn't fully
      // compensate for the looseness. But, for now, we just assume the
      // frontend is sending valid data and do this cast.
      return conditionInput as ConditionInput;
  }
}

/**
 * Converts a GraphQL aggregation clause input into an AggregationClause,
 * assigning it a freshly generated id and converting its optional condition
 * set and its groupBy inputs with the same helpers used for rule conditions.
 */
function parseAggregationClauseInput(
  aggregationClause: GQLAggregationClauseInput,
): AggregationClause {
  return {
    id: uid(),
    conditionSet:
      aggregationClause.conditionSet &&
      transformConditionForDB(aggregationClause.conditionSet),
    aggregation: {
      type: aggregationClause.aggregation.type,
    },
    groupBy: aggregationClause.groupBy.map((it) => transformConditionInput(it)),
    window: {
      sizeMs: aggregationClause.window.sizeMs,
      hopMs: aggregationClause.window.hopMs,
    },
  };
}

/**
 * GraphQL Object for a Rule
 *
 * Data source exposing rule CRUD, rule/action insights and backtest results
 * to the GraphQL resolvers.
 */
class RuleAPI extends DataSource {
  private readonly warehouse: Kysely<DataWarehousePublicSchema>;

  constructor(
    private readonly knex: Dependencies['Knex'],
    dialect: Dependencies['DataWarehouseDialect'],
    public readonly ruleInsights: Dependencies['RuleActionInsights'],
    private readonly actionStats: Dependencies['ActionStatisticsService'],
    private readonly models: Dependencies['Sequelize'],
    private readonly tracer: Dependencies['Tracer'],
    private readonly signalsService: Dependencies['SignalsService'],
  ) {
    super();
    this.warehouse = dialect.getKyselyInstance() as Kysely<DataWarehousePublicSchema>;
  }

  /**
   * Loads a rule by primary key and checks that it belongs to `orgId`.
   *
   * @throws AuthenticationError if the rule belongs to a different org.
   *   Also rejects if no rule has this id (`rejectOnEmpty`).
   */
  async getGraphQLRuleFromId(id: string, orgId: string) {
    const rule = await this.models.Rule.findByPk(id, { rejectOnEmpty: true });
    if (rule.orgId !== orgId) {
      throw new AuthenticationError(
        'User not authenticated to fetch this rule',
      );
    }

    return rule;
  }

  /** Creates a rule of type CONTENT. See {@link RuleAPI.createRule}. */
  async createContentRule(
    input: GQLCreateContentRuleInput,
    userId: string,
    orgId: string,
  ) {
    return this.createRule(
      { ...input, ruleType: RuleType.CONTENT },
      userId,
      orgId,
    );
  }

  /** Creates a rule of type USER. See {@link RuleAPI.createRule}. */
  async createUserRule(
    input: GQLCreateUserRuleInput,
    userId: string,
    orgId: string,
  ) {
    return this.createRule(
      { ...input, ruleType: RuleType.USER },
      userId,
      orgId,
    );
  }

  /**
   * Builds and saves a new rule and its content type/action/policy
   * associations.
   *
   * @param input The create input, tagged with the rule type to create.
   * @param userId Stored as the rule's creator.
   * @param orgId Stored as the rule's owning org.
   * @returns The saved rule model.
   * @throws A "missing content type" error for content rules given an empty
   *   `contentTypeIds`; a "rule name exists" error when saving hits a unique
   *   constraint; an Error when the rule has actions and uses a signal not
   *   allowed in automated rules.
   */
  private async createRule(
    input:
      | (GQLCreateContentRuleInput & { ruleType: typeof RuleType.CONTENT })
      | (GQLCreateUserRuleInput & { ruleType: typeof RuleType.USER }),
    userId: string,
    orgId: string,
  ) {
    const {
      name,
      description,
      status,
      conditionSet,
      actionIds,
      policyIds,
      tags,
      ruleType,
      maxDailyActions,
      expirationTime,
      parentId,
    } = input;

    if (ruleType === RuleType.CONTENT && input.contentTypeIds.length === 0) {
      throw makeRuleIsMissingContentTypeError({ shouldErrorSpan: true });
    }

    // Validate that signals used in automated rules are allowed
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    if (actionIds && actionIds.length > 0) {
      await this.validateSignalsAllowedInAutomatedRules(conditionSet, orgId);
    }

    const rule = this.models.Rule.build({
      id: uid(),
      name,
      description,
      tags: tags.slice(),
      status,
      conditionSet: transformConditionForDB(conditionSet),
      maxDailyActions,
      expirationTime: (expirationTime as Date | null | undefined) ?? undefined,
      creatorId: userId,
      orgId,
      ruleType,
      parentId,
    });

    try {
      await this.models.transactionWithRetry(async () => {
        // Save rule to 'rules' table before adding assocs, to give it
        // a record for foreign keys to reference and test name uniqueness.
        await rule.save();

        if (ruleType === RuleType.CONTENT) {
          await rule.setContentTypes(
            input.contentTypeIds as Mutable<typeof input.contentTypeIds>,
          );
        }

        // The Mutable casts are used to work around a sequelize typings bug.
        await rule.setActions(actionIds as Mutable<typeof actionIds>);
        await rule.setPolicies(policyIds as Mutable<typeof policyIds>);

        // TODO: is this needed?
        await rule.save();
      });
    } catch (e) {
      throw isUniqueConstraintError(e)
        ? makeRuleNameExistsError({ shouldErrorSpan: true })
        : e;
    }

    return rule;
  }

  /** Updates a rule as type CONTENT. See {@link RuleAPI.updateRule}. */
  async updateContentRule(opts: {
    input: GQLUpdateContentRuleInput;
    orgId: string;
  }) {
    const { input, orgId } = opts;
    return this.updateRule({
      input: { ...input, ruleType: RuleType.CONTENT },
      orgId,
    });
  }

  /** Updates a rule as type USER. See {@link RuleAPI.updateRule}. */
  async updateUserRule(opts: { input: GQLUpdateUserRuleInput; orgId: string }) {
    const { input, orgId } = opts;
    return this.updateRule({
      input: { ...input, ruleType: RuleType.USER },
      orgId,
    });
  }

  /**
   * Applies an update to the rule with the given id in the given org.
   *
   * Note that `maxDailyActions`, `expirationTime` and `parentId` are written
   * back as null whenever the input omits them (see the comparisons below),
   * whereas name/description/conditionSet/tags go through `patchInPlace`.
   *
   * @returns The updated rule model.
   * @throws A not-found error if no rule matches `id` + `orgId`; an Error for
   *   an invalid condition set; a "missing content type" error for a content
   *   rule given an empty `contentTypeIds`; a "running backtests" error when
   *   the rule has running backtests and `cancelRunningBacktests` is falsy;
   *   a "rule name exists" error when saving hits a unique constraint.
   */
  // eslint-disable-next-line complexity
  private async updateRule(opts: {
    input:
      | (GQLUpdateContentRuleInput & { ruleType: typeof RuleType.CONTENT })
      | (GQLUpdateUserRuleInput & { ruleType: typeof RuleType.USER });
    orgId: string;
  }) {
    const { input, orgId } = opts;
    const {
      id,
      name,
      description,
      status,
      conditionSet,
      actionIds,
      policyIds,
      tags,
      ruleType,
      maxDailyActions,
      expirationTime,
      cancelRunningBacktests,
      parentId,
    } = input;

    const rule = await this.models.Rule.findOne({
      where: { id, orgId },
      rejectOnEmpty: true,
    }).catch((e) => {
      throw isEmptyResultSetError(e)
        ? makeNotFoundError('Rule not found', {
            detail: `Could not find rule with id ${id}`,
            shouldErrorSpan: true,
          })
        : e;
    });

    if (conditionSet != null && !conditionInputIsValid(conditionSet)) {
      throw new Error('Invalid condition set input');
    }

    // In the case of a content rule update, it's okay if the contentTypeIds isn't
    // provided, since that will just be a no-op via the patchInPlace, but if it
    // is provided, we need to check to make sure there are actually content type
    // IDs present, since an empty list is invalid for content rules.
    if (
      ruleType === RuleType.CONTENT &&
      input.contentTypeIds &&
      input.contentTypeIds.length === 0
    ) {
      throw makeRuleIsMissingContentTypeError({ shouldErrorSpan: true });
    }

    patchInPlace(rule, {
      name: name ?? undefined,
      description,
      conditionSet:
        conditionSet == null
          ? undefined
          : transformConditionForDB(conditionSet),
      tags: tags?.slice() ?? undefined,
      ruleType,
    });

    if (status && rule.status !== status) {
      rule.status = status;
    }

    if (rule.maxDailyActions !== maxDailyActions) {
      // If maxDailyActions is undefined, it needs to be explicitly converted
      // to null because postgres doesn't understand undefined
      rule.maxDailyActions = maxDailyActions ?? null;
    }

    if (rule.expirationTime !== expirationTime) {
      // If expirationTime is undefined, it needs to be explicitly converted
      // to null because postgres doesn't understand undefined
      rule.expirationTime = (expirationTime as Date | null | undefined) ?? null;
    }

    if (rule.parentId !== parentId) {
      rule.parentId = parentId ?? null;
    }

    // Validate that signals used in automated rules are allowed
    // Check if the rule will have actions meaning automated rule.
    // This ensures we don't allow creating automated rules with signals
    // that are restricted to routing rules only.
    //
    // We only look up the rule's current actions when a condition set was
    // actually provided, since that's the only case in which we validate.
    //
    // NOTE(review): if an update adds actions but doesn't resend the
    // condition set, the rule's existing conditions are not re-validated
    // here. Confirm whether that gap is acceptable.
    if (conditionSet) {
      const willHaveActions = actionIds
        ? actionIds.length > 0
        : (await rule.getActions()).length > 0;

      if (willHaveActions) {
        await this.validateSignalsAllowedInAutomatedRules(conditionSet, orgId);
      }
    }

    // Before we actually send any updates (which will happen as soon as we call
    // setXXX to set the associations), we need to make sure that there are no
    // active backtests for this rule because, if there are, we should fail the
    // update unless the user's asked to cancel the backtests explicitly.
    if (!cancelRunningBacktests) {
      if (await this.models.Backtest.hasRunningBacktestsForRule(rule.id)) {
        throw makeRuleHasRunningBacktestsError({ shouldErrorSpan: true });
      }
    }

    // Do our updates, in a transaction so that we don't end up with
    // inconsistent state if the name check fails. Technically, I think we'd
    // need to put the hasRunningBacktests call above in this transaction and
    // use SERIALIZABLE to make the update + backtest cancelation logically
    // linearizable w/r/t concurrently started backtests, but that's overkill.
    try {
      await this.models.sequelize.transaction(
        { isolationLevel: Transaction.ISOLATION_LEVELS.REPEATABLE_READ },
        async () => {
          if (ruleType === RuleType.CONTENT && input.contentTypeIds) {
            await rule.setContentTypes(
              input.contentTypeIds as Mutable<typeof input.contentTypeIds>,
            );
          }

          // TODO: this is not safe. Lets one org link to a different org's
          // policies/actions.
          if (actionIds) {
            await rule.setActions(actionIds as Mutable<typeof actionIds>);
          }
          if (policyIds) {
            await rule.setPolicies(policyIds as Mutable<typeof policyIds>);
          }

          await rule.save();

          // Finally, if the user asked to delete any running backtests, do it.
          if (cancelRunningBacktests) {
            await this.models.Backtest.cancelRunningBacktestsForRule(rule.id);
          }
        },
      );
    } catch (e) {
      throw isUniqueConstraintError(e)
        ? makeRuleNameExistsError({ shouldErrorSpan: true })
        : e;
    }

    return rule;
  }

  /**
   * Deletes the rule with the given id in the given org.
   *
   * @returns `false` if the lookup or delete threw (the exception is recorded
   *   on the active span, if it's recording); `true` otherwise, including
   *   when no matching rule was found.
   */
  async deleteRule(opts: { id: string; orgId: string }) {
    const { id, orgId } = opts;

    try {
      const rule = await this.models.Rule.findOne({ where: { id, orgId } });
      await rule?.destroy();
    } catch (exception) {
      const activeSpan = this.tracer.getActiveSpan();
      if (activeSpan?.isRecording()) {
        activeSpan.recordException(exception as Exception);
      }
      return false;
    }
    return true;
  }

  /**
   * Fetches all the per-day insight series for an org. The five queries run
   * independently; a query that fails contributes an empty list rather than
   * failing the whole call, and its error is recorded on the active span.
   */
  async getAllRuleInsights(orgId: string) {
    const results = await Promise.allSettled([
      this.actionStats.getActionedSubmissionCountsByDay(orgId),
      this.actionStats.getActionedSubmissionCountsByPolicyByDay(orgId),
      this.actionStats.getActionedSubmissionCountsByTagByDay(orgId),
      this.actionStats.getActionedSubmissionCountsByActionByDay(orgId),
      this.ruleInsights.getContentSubmissionCountsByDay(orgId),
    ]);

    const activeSpan = this.tracer.getActiveSpan();
    const valueOrEmpty = <T,>(
      r: PromiseSettledResult<readonly T[]>,
    ): readonly T[] => {
      if (r.status === 'fulfilled') {
        return r.value;
      }

      // Still degrade to an empty series, but don't lose the failure.
      if (activeSpan?.isRecording()) {
        activeSpan.recordException(r.reason as Exception);
      }
      return [];
    };

    return {
      actionedSubmissionsByDay: valueOrEmpty(results[0]),
      actionedSubmissionsByPolicyByDay: valueOrEmpty(results[1]),
      actionedSubmissionsByTagByDay: valueOrEmpty(results[2]),
      actionedSubmissionsByActionByDay: valueOrEmpty(results[3]),
      totalSubmissionsByDay: valueOrEmpty(
        results[4] as PromiseSettledResult<readonly { date: string; count: number }[]>,
      ),
    };
  }

  /** Delegates to the action statistics service. */
  async getPoliciesSortedByViolationCount(input: {
    filterBy: {
      startDate: Date;
      endDate: Date;
    };
    timeZone: string;
    orgId: string;
  }) {
    return this.actionStats.getPoliciesSortedByViolationCount(input);
  }

  /**
   * Returns action counts grouped by `input.groupBy`, dispatching to the
   * generic group-by query when any filter is set and to the per-dimension
   * query otherwise.
   */
  async getActionStatistics(input: ActionCountsInput) {
    const { filterBy } = input;
    // if we need to filter some actions when also grouping, we must use the base table
    // and can't use the materialized views that only aggregate by one field
    //
    // NOTE(review): this condition used to test `filterBy.itemTypeIds.length`
    // twice. The duplicate was dropped (no behavior change), but it may have
    // been meant to be a different filter field. Confirm against
    // ActionCountsInput.
    if (
      filterBy.actionIds.length ||
      filterBy.itemTypeIds.length ||
      filterBy.sources.length
    ) {
      return this.actionStats.getAllActionCountsGroupBy(input);
    }
    switch (input.groupBy) {
      case 'RULE_ID':
        return this.actionStats.getAllActionCountsGroupByRule(input);
      case 'POLICY_ID':
        return this.actionStats.getAllActionCountsGroupByPolicy(input);
      case 'ACTION_ID':
        return this.actionStats.getAllActionCountsGroupByActionId(input);
      case 'ITEM_TYPE_ID':
        return this.actionStats.getAllActionCountsGroupByItemTypeId(input);
      case 'ACTION_SOURCE':
        return this.actionStats.getAllActionCountsGroupBySource(input);
      default:
        assertUnreachable(input.groupBy);
    }
  }

  /**
   * Not implemented: always throws. The previous implementation is kept
   * below, commented out, for reference.
   */
  async createBacktest(_input: unknown, _user: User) {
    throw new Error('Not Implemented');

    // const id = uid();
    // const rule = await this.models.Rule.findByPk(input.ruleId, {
    //   rejectOnEmpty: true,
    // });
    // const ruleContentTypes = await rule.getContentTypes();

    // if (!ruleContentTypes.length) {
    //   throw new Error(
    //     "Rule is not attached to any content types, so we're " +
    //       'unable to select content to use for the backtest.',
    //   );
    // }

    // const backest = this.models.Backtest.build({
    //   id,
    //   ruleId: input.ruleId,
    //   sampleDesiredSize: input.sampleDesiredSize,
    //   sampleStartAt: new Date(input.sampleStartAt),
    //   sampleEndAt: new Date(input.sampleEndAt),
    //   creatorId: user.id,
    // });

    // await backest.save();

    // // Start sampling and enqueueing the sampled items, but do this without
    // // awaiting so that we can return to the frontend immediately.
    // //
    // // Our query ignores legacy submissions that didn't store their content type
    // // schema at the time of submission, as we can't interpret those reliably
    // // when backtesting. This also has the effect of excluding all rows which
    // // didn't log their submission id or item type id (which is what we want,
    // // since those fields are required, and we started logging them before
    // // logging `schema`). The use of FixSingleTableSelectRowType gets the types
    // // to be aware of all our WHERE clause filters and their implications for
    // // the other columns.
    // // prettier-ignore
    // this.getItemSubmissionsFromWarehouse({
    //   orgId: user.orgId,
    //   randomSample: true,
    //   numRows: input.sampleDesiredSize,
    //   startAt: new Date(input.sampleStartAt),
    //   endAt: new Date(input.sampleEndAt),
    //   itemTypeIds: ruleContentTypes.map(ct => ct.id),
    // }).then(async (submissions) => {
    //     const ruleSetExecutionJobs = submissions.map((it) => ({
    //       orgId: user.orgId,
    //       ruleIds: [input.ruleId],
    //       itemSubmission: it,
    //       environment: RuleEnvironment.BACKTEST,
    //       correlationId: toCorrelationId({ type: 'backtest', id }),
    //     }));

    //     const { failures } = await this.ruleScheduler.enqueueRuleSetExecutions(
    //       ruleSetExecutionJobs,
    //     );

    //     const sampleActualSize = submissions.length - failures.length;
    //     await backest.update({ sampleActualSize, samplingComplete: true });
    //   })
    //   .catch((e) => {
    //     const span = this.tracer.getActiveSpan();
    //     span?.recordException(e);
    //   });

    // return backest;
  }

  /**
   * Returns one page of rule execution results logged for a backtest.
   *
   * @param backtestId Id of the backtest whose executions to return.
   * @param count Maximum number of results in the page.
   * @param takeFrom Whether to take the page from the start or the end of
   *   the (sorted, cursor-filtered) results.
   * @param cursor Optional cursor restricting results to those before/after
   *   a timestamp.
   * @param sortByTs Sort direction on the execution timestamp.
   * @returns Edges whose cursor is the row's timestamp in epoch milliseconds.
   */
  async getBacktestResults(
    backtestId: string,
    count: number,
    takeFrom: 'start' | 'end',
    cursor?: CursorInfo<{ ts: number }>,
    sortByTs: SortOrder = SortOrder.DESC,
  ): Promise<Edge<RuleExecutionResult, { ts: number }>[]> {
    // There are 12 cases here, i.e., (takeFrom start or end) x
    // (no cursor, after cursor, before cursor) x (sort asc, desc).
    // But our pagination helpers let us handle them reasonably simply, in steps.
    // First, we must define the result query if we weren't doing any pagination:
    const allResultsQuery = this.knex('RULE_EXECUTIONS')
      .select({
        // This select is aliasing each column to the corresponding object key,
        // so we have to do fewer renames from the warehouse ALL_CAPS_SNAKE_CASE
        // when we return the final result.
        date: 'DS',
        ts: 'TS',
        contentId: 'ITEM_ID',
        contentType: 'ITEM_TYPE_NAME',
        userId: 'ITEM_CREATOR_ID',
        content: 'ITEM_DATA',
        result: 'RESULT',
      })
      .where(
        'CORRELATION_ID',
        toCorrelationId({ type: 'backtest', id: backtestId }),
      );

    // Now, we can filter down the results to those that satisfy
    // the cursor's before/after requirements, if there is a cursor.
    // Note that how we do this filtering depends on how the results are sorted,
    // because the sorting conceptually happens "before" pagination, and it
    // affects what's "before" and what's "after" a given cursor.
    //
    // Specifically, if the results are sorted descending and we're looking for
    // values _after_ the cursor, then we're looking for timestamp values that
    // are less than the cursor. Similarly, if we're sorting ascending and
    // looking for items before the cursor, then those items must have ts values
    // less than the cursor. In the other cases, it's the opposite.
    const filteredResultsQuery = !cursor
      ? allResultsQuery
      : allResultsQuery.andWhere(
          'TS',
          (sortByTs === SortOrder.DESC && cursor.direction === 'after') ||
            (sortByTs === SortOrder.ASC && cursor.direction === 'before')
            ? '<'
            : '>',
          new Date(cursor.value.ts),
        );

    const desiredSort = {
      column: 'ts',
      order: sortByTs === SortOrder.ASC ? 'asc' : 'desc',
    } as const;

    // Finally, filteredResultsQuery represents the _set_ of potentially valid
    // items, but it's not yet sorted or limited to the page size. So, to do
    // that... if we're taking from the start, then we add simple SQL sorting
    // and limiting to our results; however, if we're taking from the end, we
    // have to use our helper that implements "takeLast" in SQL.
    const finalQuery =
      takeFrom === 'start'
        ? filteredResultsQuery.orderBy([desiredSort]).limit(count)
        : takeLast(filteredResultsQuery, [desiredSort], count);

    // NOTE(review): the knex query is rendered to a string and run as raw SQL
    // through the warehouse connection, so the caller-supplied values
    // (backtest id, cursor timestamp, count) end up inlined in the SQL text
    // rather than sent as bound parameters. Confirm knex's value escaping is
    // adequate for the warehouse dialect.
    const results = (
      await sql`${sql.raw(finalQuery.toString())}`.execute(this.warehouse)
    ).rows;

    return results.map((it: any) => ({
      node: { ...it, result: it.result ? jsonParse(it.result) : null },
      cursor: { ts: new Date(it.ts).valueOf() },
    }));
  }

  /**
   * Returns the backtests for a rule, optionally restricted to the given
   * backtest ids.
   */
  async getBacktestsForRule(
    ruleId: string,
    backtestIds?: readonly string[] | null,
  ) {
    return this.models.Backtest.findAll({
      where: {
        ruleId,
        ...(backtestIds ? { id: { [Op.in]: backtestIds } } : {}),
      },
    });
  }

  /**
   * Not implemented: always throws. The previous implementation is kept
   * below, commented out, for reference.
   *
   * NB: This retroaction code is not production-ready. It should only
   * be used for our Slack demo because it has a limit of 100 pieces
   * of content on which it will run. That prevents us from accidentally
   * turning this on and overloading our node servers, and is sufficient
   * for the Slack demo.
   */
  async runRetroaction(_input: GQLRunRetroactionInput, _user: User) {
    throw new Error('Not Implemented');

    // const rule = await this.models.Rule.findByPk(input.ruleId, {
    //   rejectOnEmpty: true,
    // });
    // const ruleContentTypes = await rule.getContentTypes();

    // if (!ruleContentTypes.length) {
    //   throw new Error(
    //     "Rule is not attached to any content types, so we're " +
    //       'unable to select content to use for the backtest.',
    //   );
    // }

    // const id = uid();
    // const submissions = await this.getItemSubmissionsFromWarehouse({
    //   orgId: user.orgId,
    //   itemTypeIds: ruleContentTypes.map((ct) => ct.id),
    //   randomSample: false,
    //   numRows: 100, // TODO: Remove the limit, and instead batch this query
    //   startAt: new Date(input.startAt),
    //   endAt: new Date(input.endAt),
    // });

    // try {
    //   const { failures } = await this.ruleScheduler.enqueueRuleSetExecutions(
    //     submissions.map((it) => ({
    //       orgId: user.orgId,
    //       ruleIds: [input.ruleId],
    //       itemSubmission: it,
    //       environment: RuleEnvironment.RETROACTION,
    //       correlationId: toCorrelationId({ type: 'retroaction', id }),
    //     })),
    //   );

    //   return { _: !failures.length };
    // } catch (e) {
    //   this.tracer.getActiveSpan()?.recordException(e as Exception);
    //   return { _: false };
    // }
  }

  /**
   * Validates that all signals used in the condition set are allowed in automated rules.
   * Throws an error if any restricted signal is found.
   *
   * Each distinct signal is looked up once, and the lookups run concurrently.
   * When several signals are restricted, the error names the one that occurs
   * first in the condition set.
   */
  private async validateSignalsAllowedInAutomatedRules(
    conditionSet: GQLConditionSetInput,
    orgId: string,
  ): Promise<void> {
    // De-duplicate by serialized id; a Map keeps first-occurrence order.
    const uniqueSignalIds = [
      ...new Map(
        this.extractSignalIdsFromConditionSet(conditionSet).map(
          (signalId) => [jsonStringify(signalId), signalId] as const,
        ),
      ).values(),
    ];

    const signals = await Promise.all(
      uniqueSignalIds.map(async (signalId) =>
        this.signalsService.getSignal({ signalId, orgId }),
      ),
    );

    // Signals that couldn't be resolved are skipped rather than rejected.
    for (const signal of signals) {
      if (signal && !signal.allowedInAutomatedRules) {
        throw new Error(
          `Signal "${signal.displayName}" cannot be used in automated rules with actions. ` +
            `This signal is restricted to routing rules only.`,
        );
      }
    }
  }

  /**
   * Extracts all signal IDs from a condition set recursively
   *
   * The ids are returned in traversal order and may contain duplicates.
   */
  private extractSignalIdsFromConditionSet(
    conditionSet: GQLConditionSetInput,
  ): SignalId[] {
    const signalIds: SignalId[] = [];

    const processCondition = (condition: GQLConditionInput) => {
      if ('conditions' in condition && condition.conditions) {
        // It's a condition set, recurse
        for (const subCondition of condition.conditions) {
          processCondition(subCondition);
        }
      } else if ('signal' in condition && condition.signal) {
        // It's a leaf condition with a signal (type is String to support plugin signals)
        const { type, id } = condition.signal;
        let signalId: SignalId;
        if (type === 'CUSTOM') {
          // CUSTOM signals require an id field. The id comes from validated GraphQL
          // input where it's a required Scalars['ID'], so we can safely cast it.
          signalId = { type: 'CUSTOM' as const, id: id as NonEmptyString };
        } else {
          // Built-in and plugin signals: type is the signal type string
          signalId = { type };
        }
        signalIds.push(signalId);
      }
    };

    // Start processing from the root
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    if (conditionSet.conditions) {
      for (const condition of conditionSet.conditions) {
        processCondition(condition);
      }
    }

    return signalIds;
  }
}

export default inject(
  [
    'Knex',
    'DataWarehouseDialect',
    'RuleActionInsights',
    'ActionStatisticsService',
    'Sequelize',
    'Tracer',
    'SignalsService',
  ],
  RuleAPI,
);
export type { RuleAPI };

/**
 * Our ConditionInput in GraphQL is forced to be not type safe, so we must
 * validate it here. For convenience, we also allow this to accept a
 * ConditionSetInput, which has the same shape as valid ConditionInputs that are
 * used to represent ConditionSets.
 *
 * An input is valid if it is either a condition set (exactly the two keys
 * `conjunction` and `conditions`, both non-null) or a leaf (neither of those
 * keys present, and a non-null `input`).
 */
function conditionInputIsValid(
  it: GQLConditionInput | GQLConditionSetInput,
): it is ValidatedGQLConditionInput {
  return (
    (it.conjunction != null &&
      it.conditions != null &&
      Object.keys(it).length === 2) ||
    (!('conjunction' in it) && !('conditions' in it) && it.input != null)
  );
}

type ValidatedGQLConditionInput =
  | ValidatedGQLConditionSetInput
  | ValidatedGQLLeafConditionInput;

type ValidatedGQLConditionSetInput = RequiredWithoutNull<
  Pick<GQLConditionInput, 'conditions' | 'conjunction'>
>;

type ValidatedGQLLeafConditionInput = Omit<
  GQLConditionInput,
  'conditions' | 'conjunction'
> &
  RequiredWithoutNull<Pick<GQLConditionInput, 'input'>>;