Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

[Kysely] migrate rule-engine queries and related jobs to Kysely (phase 1) (#225)

* [Vulnerabilities] Upgrade Kysely to latest

* fix lint

* code review

* [Kysely] migrate rule-engine queries and related jobs to Kysely (phase 1)

* fixes

* fix lint by organizing errors to a file for simplifications

* lint fix again

* fix test

* [Kysely] Remove knex migrate backtest pagination and takeLast to Kysely (#226)

* [Kysely] Remove knex migrate backtest pagination and takeLast to Kysely

* code revie fix

* simplify enum uses

authored by

Juan Mrad and committed by
GitHub
74c206f5 ffc58854

+1053 -688
+1 -4
server/graphql/datasources/LocationBankApi.ts
··· 7 7 import { isUniqueConstraintError } from '../../models/errors.js'; 8 8 import { type LocationArea } from '../../models/types/locationArea.js'; 9 9 import { type User } from '../../models/UserModel.js'; 10 - // TODO: delete the import below when we move the location bank mutation logic 11 - // into the moderation config service, which is where it should be. 12 - // eslint-disable-next-line import/no-restricted-paths 13 - import { makeLocationBankNameExistsError } from '../../services/moderationConfigService/moderationConfigService.js'; 10 + import { makeLocationBankNameExistsError } from '../../services/moderationConfigService/index.js'; 14 11 import { type PlacesApiService } from '../../services/placesApiService/index.js'; 15 12 import { patchInPlace, safePick } from '../../utils/misc.js'; 16 13 import {
+69 -55
server/graphql/datasources/RuleApi.ts
··· 3 3 import { type Exception } from '@opentelemetry/api'; 4 4 import { makeEnumLike } from '@roostorg/types'; 5 5 6 - import { sql, type Kysely } from 'kysely'; 6 + import { type Kysely } from 'kysely'; 7 7 import Sequelize from 'sequelize'; 8 8 import { uid } from 'uid'; 9 9 ··· 29 29 makeRuleHasRunningBacktestsError, 30 30 makeRuleIsMissingContentTypeError, 31 31 makeRuleNameExistsError, 32 - // TODO: delete the import below when we move the rule mutation logic into the 33 - // moderation config service, which is where it should be. 34 - // eslint-disable-next-line import/no-restricted-paths 35 - } from '../../services/moderationConfigService/moderationConfigService.js'; 32 + } from '../../services/moderationConfigService/index.js'; 36 33 import { 37 34 isSignalId, 38 35 signalIsExternal, 39 36 type SignalId, 40 37 } from '../../services/signalsService/index.js'; 41 - import { type DataWarehousePublicSchema } from '../../storage/dataWarehouse/warehouseSchema.js'; 38 + import { 39 + type DataWarehousePublicSchema, 40 + warehouseDateToDate, 41 + } from '../../storage/dataWarehouse/warehouseSchema.js'; 42 42 import { toCorrelationId } from '../../utils/correlationIds.js'; 43 43 import { 44 + type JsonOf, 44 45 jsonParse, 45 46 jsonStringify, 46 47 tryJsonParse, ··· 84 85 userId?: string; 85 86 userTypeId?: string; 86 87 content: string; 87 - result: ConditionSetWithResultAsLogged; 88 + result: ConditionSetWithResultAsLogged | null; 88 89 environment: RuleStatus; 89 90 passed: boolean; 90 91 ruleId: string; ··· 270 271 private readonly warehouse: Kysely<DataWarehousePublicSchema>; 271 272 272 273 constructor( 273 - private readonly knex: Dependencies['Knex'], 274 274 dialect: Dependencies['DataWarehouseDialect'], 275 275 public readonly ruleInsights: Dependencies['RuleActionInsights'], 276 276 private readonly actionStats: Dependencies['ActionStatisticsService'], ··· 714 714 // (no cursor, after cursor, before cursor) x (sort asc, desc). 715 715 // But our pagination helpers let us handle reasonably simply, in steps. 716 716 // First, we must define the result query if we weren't doing any pagination: 717 - const allResultsQuery = this.knex('RULE_EXECUTIONS') 718 - .select({ 719 - // This select is aliasing each column to the corresponding object key, 720 - // so we have to do fewer renames from the warehouse ALL_CAPS_SNAKE_CASE 721 - // when we return the final result. 722 - date: 'DS', 723 - ts: 'TS', 724 - contentId: 'ITEM_ID', 725 - contentType: 'ITEM_TYPE_NAME', 726 - userId: 'ITEM_CREATOR_ID', 727 - content: 'ITEM_DATA', 728 - result: 'RESULT', 729 - }) 730 - .where( 731 - 'CORRELATION_ID', 732 - toCorrelationId({ type: 'backtest', id: backtestId }), 733 - ); 717 + const correlationId = toCorrelationId({ 718 + type: 'backtest', 719 + id: backtestId, 720 + }); 734 721 735 - // Now, we can filter down the results to those that satisfy 736 - // the cursor's before/after requirements, if there is a cursor. 737 - // Note that how we do this filtering depends on how the results are sorted, 738 - // because the sorting conceptually happens "before" pagination, and it 739 - // effects what's "before" and what's "after" a given cursor. 740 - // 741 - // Specifically, if the results are sorted descending and we're looking for 742 - // values _after_ the cursor, then we're looking for timestamp values that 743 - // are less than the cursor. Similarly, if we're sorting ascending and 744 - // looking for items before the cursor, then those items must have ts values 745 - // less than the cursor. In the other cases, it's the opposite. 746 - const filteredResultsQuery = !cursor 747 - ? allResultsQuery 748 - : allResultsQuery.andWhere( 749 - 'TS', 750 - (sortByTs === SortOrder.DESC && cursor.direction === 'after') || 751 - (sortByTs === SortOrder.ASC && cursor.direction === 'before') 752 - ? '<' 753 - : '>', 754 - new Date(cursor.value.ts), 755 - ); 722 + let filteredResultsQuery = this.warehouse 723 + .selectFrom('RULE_EXECUTIONS') 724 + .select([ 725 + 'DS as date', 726 + 'TS as ts', 727 + 'ITEM_ID as contentId', 728 + 'ITEM_TYPE_NAME as itemTypeName', 729 + 'ITEM_TYPE_ID as itemTypeId', 730 + 'ITEM_CREATOR_ID as userId', 731 + 'ITEM_CREATOR_TYPE_ID as userTypeId', 732 + 'ITEM_DATA as content', 733 + 'RESULT as result', 734 + 'ENVIRONMENT as environment', 735 + 'PASSED as passed', 736 + 'RULE_ID as ruleId', 737 + 'RULE as ruleName', 738 + 'TAGS as tags', 739 + ]) 740 + .where('CORRELATION_ID', '=', correlationId); 741 + 742 + if (cursor) { 743 + filteredResultsQuery = filteredResultsQuery.where( 744 + 'TS', 745 + (sortByTs === SortOrder.DESC && cursor.direction === 'after') || 746 + (sortByTs === SortOrder.ASC && cursor.direction === 'before') 747 + ? '<' 748 + : '>', 749 + new Date(cursor.value.ts), 750 + ); 751 + } 756 752 757 753 const desiredSort = { 758 754 column: 'ts', ··· 766 762 // have to use our helper that implements "takeLast" in SQL. 767 763 const finalQuery = 768 764 takeFrom === 'start' 769 - ? filteredResultsQuery.orderBy([desiredSort]).limit(count) 770 - : takeLast(filteredResultsQuery, [desiredSort], count); 765 + ? filteredResultsQuery 766 + .orderBy('ts', desiredSort.order) 767 + .limit(count) 768 + : takeLast(this.warehouse, filteredResultsQuery, [desiredSort], count); 771 769 772 - const results = ( 773 - await sql`${sql.raw(finalQuery.toString())}`.execute(this.warehouse) 774 - ).rows; 770 + const results = await finalQuery.execute(); 775 771 776 - return results.map((it: any) => ({ 777 - node: { ...it, result: it.result ? jsonParse(it.result) : null }, 778 - cursor: { ts: new Date(it.ts).valueOf() }, 772 + return results.map<Edge<RuleExecutionResult, { ts: number }>>((it) => ({ 773 + node: { 774 + date: warehouseDateToDate(it.date).toISOString(), 775 + ts: warehouseDateToDate(it.ts).toISOString(), 776 + contentId: it.contentId, 777 + itemTypeName: it.itemTypeName ?? '', 778 + itemTypeId: it.itemTypeId, 779 + userId: it.userId ?? undefined, 780 + userTypeId: it.userTypeId ?? undefined, 781 + content: (it.content ?? '') as string, 782 + result: it.result 783 + ? jsonParse( 784 + it.result as JsonOf<ConditionSetWithResultAsLogged>, 785 + ) 786 + : null, 787 + environment: it.environment as RuleStatus, 788 + passed: it.passed, 789 + ruleId: it.ruleId, 790 + ruleName: it.ruleName ?? '', 791 + tags: [...it.tags], 792 + }, 793 + cursor: { ts: warehouseDateToDate(it.ts).valueOf() }, 779 794 })); 780 795 } 781 796 ··· 910 925 911 926 export default inject( 912 927 [ 913 - 'Knex', 914 928 'DataWarehouseDialect', 915 929 'RuleActionInsights', 916 930 'ActionStatisticsService',
+10 -29
server/iocContainer/index.ts
··· 5 5 import { makeDateString, type ItemIdentifier } from '@roostorg/types'; 6 6 import { types as scyllaTypes } from 'cassandra-driver'; 7 7 import IORedis, { type Cluster } from 'ioredis'; 8 - import * as knexPkg from 'knex'; 9 - import { type Knex } from 'knex'; 10 8 import { 11 9 Kysely, 12 10 PostgresDialect, ··· 20 18 import { v1 as uuidv1 } from 'uuid'; 21 19 22 20 import makeDb from '../models/index.js'; 23 - import { type PolicyActionPenalties } from '../models/OrgModel.js'; 24 21 import type { IActionExecutionsAdapter } from '../plugins/warehouse/queries/IActionExecutionsAdapter.js'; 25 22 import type { IActionStatisticsAdapter } from '../plugins/warehouse/queries/IActionStatisticsAdapter.js'; 26 23 import type { IContentApiRequestsAdapter } from '../plugins/warehouse/queries/IContentApiRequestsAdapter.js'; ··· 39 36 makeItemSubmissionBulkWrite, 40 37 type ItemSubmissionBulkWrite, 41 38 } from '../queues/itemSubmissionQueue.js'; 39 + import { 40 + getPolicyActionPenaltiesForOrg, 41 + type PolicyActionPenalties, 42 + } from '../services/policyActionPenalties.js'; 42 43 import makeActionPublisher, { 43 44 type ActionPublisher, 44 45 type ActionTargetItem, ··· 50 51 makeGetItemTypesForOrgEventuallyConsistent, 51 52 makeGetLocationBankLocationsEventuallyConsistent, 52 53 makeGetPoliciesForRulesEventuallyConsistent, 53 - makeGetSequelizeItemTypeEventuallyConsistent, 54 54 makeGetTextBankStringsEventuallyConsistent, 55 55 makeRecordRuleActionLimitUsage, 56 56 type GetActionsForRuleEventuallyConsistent, ··· 58 58 type GetItemTypesForOrgEventuallyConsistent, 59 59 type GetLocationBankLocationsBankEventuallyConsistent, 60 60 type GetPoliciesForRulesEventuallyConsistent, 61 - type GetSequelizeItemTypeEventuallyConsistent, 62 61 type GetTextBankStringsEventuallyConsistent, 63 62 type RecordRuleActionLimitUsage, 64 63 } from '../rule_engine/ruleEngineQueries.js'; ··· 334 333 335 334 itemSubmissionQueueBulkWrite: ItemSubmissionBulkWrite; 336 335 itemSubmissionRetryQueueBulkWrite: ItemSubmissionBulkWrite; 337 - Knex: Knex; 338 336 IORedis: IORedis.Redis | Cluster; 339 337 340 338 // Loggers ··· 397 395 (input: { orgId: string; bankId: string }) => Promise<HashBank | null> 398 396 >; 399 397 400 - getSequelizeItemTypeEventuallyConsistent: GetSequelizeItemTypeEventuallyConsistent; 401 398 getItemTypesForOrgEventuallyConsistent: GetItemTypesForOrgEventuallyConsistent; 402 399 getItemTypeEventuallyConsistent: GetItemTypeEventuallyConsistent; 403 400 getEnabledRulesForItemTypeEventuallyConsistent: GetEnabledRulesForItemTypeEventuallyConsistent; ··· 467 464 // 468 465 // - 'KyselyPg' is for issuing raw pg queries w/o sequelize (e.g., the queries 469 466 // that some of the our "services" issue to pg, to the non-public schemas). 470 - // These queries go to our primary db, which accepts writes. Using knex for 471 - // query building is deprecated in favor of kysely, because the latter offers 472 - // better typings. 467 + // These queries go to our primary db, which accepts writes. 473 468 // 474 469 // - KyselyPgReadReplica gives us the same type safety, but sends queries to our 475 470 // replicas, for when we only need reads and we're ok w/ eventual consistency. ··· 618 613 makeItemSubmissionBulkWrite(container.IORedis, ITEM_SUBMISSION_DLQ_NAME), 619 614 ); 620 615 621 - // Legacy service deprecated in favor of kysely. 622 - bottle.value( 623 - 'Knex', 624 - knexPkg.default.knex({ 625 - client: 'pg', 626 - connection: getPgMasterConnectionInfo, 627 - }), 628 - ); 629 - 630 616 // Loggers 631 617 register(bottle, 'RuleExecutionLogger', makeRuleExecutionLogger); 632 618 ··· 1345 1331 bottle.factory( 1346 1332 'getPolicyActionPenaltiesEventuallyConsistent', 1347 1333 (container) => { 1348 - const Org = container.OrgModel; 1334 + const moderationConfigService = container.ModerationConfigService; 1349 1335 1350 1336 return cached({ 1351 1337 async producer(orgId) { 1352 - return Org.getPolicyActionPenaltiesEventuallyConsistent(orgId); 1338 + return getPolicyActionPenaltiesForOrg( 1339 + moderationConfigService, 1340 + orgId, 1341 + ); 1353 1342 }, 1354 1343 directives: { freshUntilAge: 60 }, 1355 1344 }); ··· 1392 1381 directives: { freshUntilAge: 600 }, 1393 1382 }); 1394 1383 }); 1395 - 1396 - register( 1397 - bottle, 1398 - 'getSequelizeItemTypeEventuallyConsistent', 1399 - makeGetSequelizeItemTypeEventuallyConsistent, 1400 - ); 1401 1384 1402 1385 register( 1403 1386 bottle, ··· 1567 1550 'itemSubmissionQueueBulkWrite', 1568 1551 'itemSubmissionRetryQueueBulkWrite', 1569 1552 'Sequelize', 1570 - 'Knex', 1571 1553 'IORedis', 1572 1554 // Storage abstractions 1573 1555 'DataWarehouse', ··· 1576 1558 'ReportingAnalyticsAdapter', 1577 1559 'KyselyPg', 1578 1560 'KyselyPgReadReplica', 1579 - 'getSequelizeItemTypeEventuallyConsistent', 1580 1561 'getEnabledRulesForItemTypeEventuallyConsistent', 1581 1562 'getPoliciesForRulesEventuallyConsistent', 1582 1563 'getActionsForRuleEventuallyConsistent',
-68
server/models/OrgModel.ts
··· 6 6 type Sequelize, 7 7 } from 'sequelize'; 8 8 9 - import { UserPenaltySeverity } from '../services/moderationConfigService/index.js'; 10 9 import { validateUrl } from '../utils/url.js'; 11 10 import { type LocationBank } from './banks/LocationBankModel.js'; 12 11 import { type DataTypes } from './index.js'; ··· 18 17 const { Model } = sequelize; 19 18 20 19 export type Org = InstanceType<ReturnType<typeof makeOrgModel>>; 21 - export type PolicyActionPenalties = { 22 - actionId: string; 23 - policyId: string; 24 - penalties: number[]; 25 - }; 26 20 27 21 /** 28 22 * Data Model for Organizations ··· 62 56 Org.hasMany(models.LocationBank, { as: 'LocationBanks' }); 63 57 Org.hasMany(models.Policy, { as: 'policies' }); 64 58 } 65 - 66 - static async getPolicyActionPenaltiesEventuallyConsistent(orgId: string) { 67 - const [actions, policies] = await Promise.all([ 68 - sequelize.models.Action.findAll({ where: { orgId } }), 69 - sequelize.models.Policy.findAll({ where: { orgId } }), 70 - ]); 71 - 72 - return (policies as Policy[]).flatMap((policy) => 73 - (actions as SequelizeAction[]).map( 74 - (action): PolicyActionPenalties => ({ 75 - actionId: action.id, 76 - policyId: policy.id, 77 - penalties: [ 78 - computeActionPolicyPenalty(action.penalty, policy.penalty), 79 - ], 80 - }), 81 - ), 82 - ); 83 - } 84 59 } 85 60 86 61 /* Fields */ ··· 137 112 138 113 return Org; 139 114 } 140 - 141 - /** 142 - * Computes the severity of the penalty we should apply for a given 143 - * (action, policy) pair. The general idea is to make the penalties 144 - * increase exponentially as severity levels increase, but the rate 145 - * of increase can't be so high that a (severe, severe) penalty is 146 - * 50x higher than a (high, high) penalty. 147 - * 148 - * The easiest way to achieve this exponential behavior is at the individual 149 - * severity levels, rather than trying to multiply the action penalty 150 - * by the severity penalty to compound their magnitudes. So the severity 151 - * levels apply penalty magnitudes as follows: 152 - * 153 - * NONE = 0 154 - * LOW = 1 155 - * MEDIUM = 3 156 - * HIGH = 9 157 - * SEVERE = 27 158 - * 159 - * To get the penalty value for an (action, policy) pair, we just add the 160 - * penalty values of the action and penalty because the exponential nature 161 - * of these penalties has already been taken into account. 162 - */ 163 - function computeActionPolicyPenalty( 164 - actionPenalty: UserPenaltySeverity, 165 - policyPenalty: UserPenaltySeverity, 166 - ) { 167 - // Type annotation makes sure that every possible severity has a score. 168 - const penaltySeverityMap: { [k in UserPenaltySeverity]: number } = { 169 - [UserPenaltySeverity.NONE]: 0, 170 - [UserPenaltySeverity.LOW]: 1, 171 - [UserPenaltySeverity.MEDIUM]: 3, 172 - [UserPenaltySeverity.HIGH]: 9, 173 - [UserPenaltySeverity.SEVERE]: 27, 174 - }; 175 - 176 - // If the action has no penalty (e.g., "Send to Moderation", "Restore 177 - // Content"), we never apply any penalty, regardless of the policy penalty. 178 - // Otherwise, the penalty accounts for both the action + policy penalties. 179 - return actionPenalty === UserPenaltySeverity.NONE 180 - ? 0 181 - : penaltySeverityMap[actionPenalty] + penaltySeverityMap[policyPenalty]; 182 - }
+12 -80
server/models/rules/RuleModel.ts
··· 1 - import { type ScalarType, type TaggedScalar } from '@roostorg/types'; 2 1 import _ from 'lodash'; 3 2 import sequelize, { 4 3 Sequelize, ··· 16 15 RuleStatus, 17 16 RuleType, 18 17 type ConditionSet, 19 - type LeafCondition, 20 18 } from '../../services/moderationConfigService/index.js'; 21 - import { type SerializableError } from '../../utils/errors.js'; 22 19 import { getUtcDateOnlyString } from '../../utils/time.js'; 23 - import { 24 - type NonEmptyArray, 25 - type WithUndefined, 26 - } from '../../utils/typescript-types.js'; 27 20 import { type DataTypes } from '../index.js'; 28 21 import { type User } from '../UserModel.js'; 29 22 import { type SequelizeAction } from './ActionModel.js'; 30 - import { type TaggedItemData } from './item-type-fields.js'; 31 23 import { type RuleLatestVersion } from './RuleLatestVersionModel.js'; 32 24 25 + export { 26 + ConditionCompletionOutcome, 27 + ConditionFailureOutcome, 28 + type ConditionOutcome, 29 + type ConditionCompletionMetadata, 30 + type ConditionFailureMetadata, 31 + type ConditionResult, 32 + type ConditionWithResult, 33 + type ConditionSetWithResult, 34 + type LeafConditionWithResult, 35 + } from './ruleTypes.js'; 36 + 33 37 const { Model, Op } = sequelize; 34 38 const { without } = _; 35 39 36 40 export type Rule = InstanceType<ReturnType<typeof makeRuleModel>>; 37 41 export type RuleWithLatestVersion = Rule & 38 42 Required<Pick<Rule, 'latestVersion'>>; 39 - 40 - export enum ConditionCompletionOutcome { 41 - PASSED = 'PASSED', 42 - FAILED = 'FAILED', 43 - INAPPLICABLE = 'INAPPLICABLE', 44 - } 45 - 46 - // We might add more kinds of errors here later... 47 - export enum ConditionFailureOutcome { 48 - ERRORED = 'ERRORED', 49 - } 50 - 51 - export type ConditionOutcome = 52 - | ConditionCompletionOutcome 53 - | ConditionFailureOutcome; 54 - 55 - // NB: For legacy reasons, score is stored as a string and matchingValue holds 56 - // only a single string value. In the future, its easy to imagine condition 57 - // completions having much richer metadata, which could vary by signal type. 58 - export type ConditionCompletionMetadata = { 59 - score?: string; 60 - matchedValue?: string; 61 - }; 62 - 63 - export type ConditionFailureMetadata = { 64 - error?: SerializableError; 65 - }; 66 - 67 - type ConditionResultCommonMetadata = { 68 - signalInputValues?: (TaggedScalar<ScalarType> | TaggedItemData)[]; 69 - }; 70 - 71 - // A completion outcome can have optional metadata about the signal's score etc, 72 - // while a failure outcome can have an error. In a completion outcome, the 73 - // failure metadata must be undefined (except in IS_UNAVAILABLE comparisons), 74 - // and vice-versa. 75 - // prettier-ignore 76 - export type ConditionResult = 77 - | ({ outcome: ConditionCompletionOutcome } 78 - & ConditionCompletionMetadata 79 - // Conditions that completed successfully can optionally have 80 - // some of the failure metadata (for now just the `error`) if the 81 - // condition used a signal result IS_UNAVAILABLE comparison. 82 - & Partial<Pick<ConditionFailureMetadata, 'error'>> 83 - & ConditionResultCommonMetadata) 84 - | ({ outcome: ConditionFailureOutcome; } 85 - & ConditionFailureMetadata 86 - // Failed condition must not have completion data 87 - & WithUndefined<ConditionCompletionMetadata> 88 - & ConditionResultCommonMetadata) 89 - 90 - export type ConditionWithResult = 91 - | LeafConditionWithResult 92 - | ConditionSetWithResult; 93 - 94 - export type ConditionSetWithResult = Omit<ConditionSet, 'conditions'> & { 95 - conditions: 96 - | NonEmptyArray<LeafConditionWithResult> 97 - | NonEmptyArray<ConditionSetWithResult>; 98 - result?: ConditionResult; 99 - }; 100 - 101 - // A useful type for logging. Gives us the result of the condition, 102 - // with the context (i.e., the Condition definition) to interpret that result. 103 - // The result is still optional because the conditions execution can be skipped 104 - // if the result of a parent condition can be determined without running the 105 - // child condition. Just leaving out the result property in that case is a very 106 - // nice way to model "skipped", because it means that the original condition 107 - // object can be used as a ConditionWithResult in that case. 108 - export type LeafConditionWithResult = LeafCondition & { 109 - result?: ConditionResult; 110 - }; 111 43 112 44 /** 113 45 * Data Model for Rules. Rules are comprised of
+124
server/models/rules/ruleTypes.ts
··· 1 + import { type ScalarType, type TaggedScalar } from '@roostorg/types'; 2 + 3 + import { 4 + type RuleAlarmStatus, 5 + RuleStatus, 6 + type RuleType, 7 + type ConditionSet, 8 + type LeafCondition, 9 + type Action, 10 + type Policy, 11 + } from '../../services/moderationConfigService/index.js'; 12 + import { type SerializableError } from '../../utils/errors.js'; 13 + import { 14 + type NonEmptyArray, 15 + type WithUndefined, 16 + } from '../../utils/typescript-types.js'; 17 + import { type User } from '../UserModel.js'; 18 + import { type TaggedItemData } from './item-type-fields.js'; 19 + 20 + export enum ConditionCompletionOutcome { 21 + PASSED = 'PASSED', 22 + FAILED = 'FAILED', 23 + INAPPLICABLE = 'INAPPLICABLE', 24 + } 25 + 26 + export enum ConditionFailureOutcome { 27 + ERRORED = 'ERRORED', 28 + } 29 + 30 + export type ConditionOutcome = 31 + | ConditionCompletionOutcome 32 + | ConditionFailureOutcome; 33 + 34 + export type ConditionCompletionMetadata = { 35 + score?: string; 36 + matchedValue?: string; 37 + }; 38 + 39 + export type ConditionFailureMetadata = { 40 + error?: SerializableError; 41 + }; 42 + 43 + type ConditionResultCommonMetadata = { 44 + signalInputValues?: (TaggedScalar<ScalarType> | TaggedItemData)[]; 45 + }; 46 + 47 + // prettier-ignore 48 + export type ConditionResult = 49 + | ({ outcome: ConditionCompletionOutcome } 50 + & ConditionCompletionMetadata 51 + & Partial<Pick<ConditionFailureMetadata, 'error'>> 52 + & ConditionResultCommonMetadata) 53 + | ({ outcome: ConditionFailureOutcome; } 54 + & ConditionFailureMetadata 55 + & WithUndefined<ConditionCompletionMetadata> 56 + & ConditionResultCommonMetadata) 57 + 58 + export type ConditionWithResult = 59 + | LeafConditionWithResult 60 + | ConditionSetWithResult; 61 + 62 + export type ConditionSetWithResult = Omit<ConditionSet, 'conditions'> & { 63 + conditions: 64 + | NonEmptyArray<LeafConditionWithResult> 65 + | NonEmptyArray<ConditionSetWithResult>; 66 + result?: ConditionResult; 67 + }; 68 + 69 + export type LeafConditionWithResult = LeafCondition & { 70 + result?: ConditionResult; 71 + }; 72 + 73 + export type RuleLatestVersionRow = { 74 + ruleId: string; 75 + version: string; 76 + }; 77 + 78 + /** 79 + * Rule row fields shared by the rule engine (no GraphQL resolver methods). 80 + */ 81 + export type PlainRuleWithLatestVersion = { 82 + id: string; 83 + name: string; 84 + description: string | null; 85 + statusIfUnexpired: Exclude<RuleStatus, typeof RuleStatus.EXPIRED>; 86 + status: RuleStatus; 87 + tags: string[]; 88 + maxDailyActions: number | null; 89 + dailyActionsRun: number; 90 + lastActionDate: string | null; 91 + createdAt: Date; 92 + updatedAt: Date; 93 + orgId: string; 94 + creatorId: string; 95 + expirationTime: Date | null; 96 + conditionSet: ConditionSet; 97 + alarmStatus: RuleAlarmStatus; 98 + alarmStatusSetAt: Date; 99 + ruleType: RuleType; 100 + parentId: string | null; 101 + latestVersion: RuleLatestVersionRow; 102 + }; 103 + 104 + export function computeRuleStatusFromRow( 105 + expirationTime: Date | null, 106 + statusIfUnexpired: Exclude<RuleStatus, typeof RuleStatus.EXPIRED>, 107 + ): RuleStatus { 108 + if (expirationTime && expirationTime.valueOf() < Date.now()) { 109 + return RuleStatus.EXPIRED; 110 + } 111 + return statusIfUnexpired; 112 + } 113 + 114 + export type RuleGraphqlMethods = { 115 + getCreator(): Promise<User>; 116 + getActions(): Promise<Action[]>; 117 + getPolicies(): Promise<Policy[]>; 118 + }; 119 + 120 + /** GraphQL parent for Rule / ContentRule / UserRule / RuleInsights. */ 121 + export type Rule = PlainRuleWithLatestVersion & RuleGraphqlMethods; 122 + 123 + /** @deprecated Use {@link PlainRuleWithLatestVersion} directly. Remove after Kysely migration. */ 124 + export type RuleWithLatestVersion = PlainRuleWithLatestVersion;
+1 -120
server/package-lock.json
··· 69 69 "helmet": "^4.6.0", 70 70 "ioredis": "^5.2.4", 71 71 "jsonwebtoken": "^9.0.3", 72 - "knex": "^2.3.0", 73 72 "kysely": "^0.28.16", 74 73 "latlon-geohash": "^2.0.0", 75 74 "lodash": "^4.18.1", ··· 13141 13140 "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", 13142 13141 "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==" 13143 13142 }, 13144 - "node_modules/colorette": { 13145 - "version": "2.0.19", 13146 - "resolved": "https://registry.npmjs.org/colorette/-/colorette-2.0.19.tgz", 13147 - "integrity": "sha512-3tlv/dIP7FWvj3BsbHrGLJ6l/oKh1O3TcgBqMn+yyCagOxc23fyzDS6HypQbgxWbkpDnf52p1LuR4eWDQ/K9WQ==" 13148 - }, 13149 13143 "node_modules/colors": { 13150 13144 "version": "1.4.0", 13151 13145 "resolved": "https://registry.npmjs.org/colors/-/colors-1.4.0.tgz", ··· 13163 13157 }, 13164 13158 "engines": { 13165 13159 "node": ">= 0.8" 13166 - } 13167 - }, 13168 - "node_modules/commander": { 13169 - "version": "10.0.1", 13170 - "resolved": "https://registry.npmjs.org/commander/-/commander-10.0.1.tgz", 13171 - "integrity": "sha512-y4Mg2tXshplEbSGzx7amzPwKKOCGuoSRP/CjEdwwk0FOGlUbq6lKuoyDZTNZkmxHdJtp54hdfY/JUrdL7Xfdug==", 13172 - "engines": { 13173 - "node": ">=14" 13174 13160 } 13175 13161 }, 13176 13162 "node_modules/comment-parser": { ··· 14558 14544 "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", 14559 14545 "license": "MIT" 14560 14546 }, 14561 - "node_modules/esm": { 14562 - "version": "3.2.25", 14563 - "resolved": "https://registry.npmjs.org/esm/-/esm-3.2.25.tgz", 14564 - "integrity": "sha512-U1suiZ2oDVWv4zPO56S0NcR5QriEahGtdN2OR6FiOG4WJvcjBVFB0qI4+eKoWFH483PKGuLuu6V8Z4T5g63UVA==", 14565 - "engines": { 14566 - "node": ">=6" 14567 - } 14568 - }, 14569 14547 "node_modules/espree": { 14570 14548 "version": "10.4.0", 14571 14549 "resolved": "https://registry.npmjs.org/espree/-/espree-10.4.0.tgz", ··· 15426 15404 "version": "0.1.0", 15427 15405 "resolved": "https://registry.npmjs.org/get-package-type/-/get-package-type-0.1.0.tgz", 15428 15406 "integrity": "sha512-pjzuKtY64GYfWizNAJ0fr9VqttZkNiK2iS430LtIHzjBEr6bX8Am2zm4sW4Ro5wjWW5cAlRL1qAMTcXbjNAO2Q==", 15407 + "dev": true, 15429 15408 "engines": { 15430 15409 "node": ">=8.0.0" 15431 15410 } ··· 15485 15464 "funding": { 15486 15465 "url": "https://github.com/privatenumber/get-tsconfig?sponsor=1" 15487 15466 } 15488 - }, 15489 - "node_modules/getopts": { 15490 - "version": "2.3.0", 15491 - "resolved": "https://registry.npmjs.org/getopts/-/getopts-2.3.0.tgz", 15492 - "integrity": "sha512-5eDf9fuSXwxBL6q5HX+dhDj+dslFGWzU5thZ9kNKUkcPtaPdatmUFKwHFrLb/uf/WpA4BHET+AX3Scl56cAjpA==" 15493 15467 }, 15494 15468 "node_modules/glob": { 15495 15469 "version": "7.2.3", ··· 16098 16072 "node": ">= 0.4" 16099 16073 } 16100 16074 }, 16101 - "node_modules/interpret": { 16102 - "version": "2.2.0", 16103 - "resolved": "https://registry.npmjs.org/interpret/-/interpret-2.2.0.tgz", 16104 - "integrity": "sha512-Ju0Bz/cEia55xDwUWEa8+olFpCiQoypjnQySseKtmjNrnps3P+xfpUmGr90T7yjlVJmOtybRvPXhKMbHr+fWnw==", 16105 - "engines": { 16106 - "node": ">= 0.10" 16107 - } 16108 - }, 16109 16075 "node_modules/ioredis": { 16110 16076 "version": "5.9.2", 16111 16077 "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.9.2.tgz", ··· 17608 17574 "node": ">=6" 17609 17575 } 17610 17576 }, 17611 - "node_modules/knex": { 17612 - "version": "2.5.1", 17613 - "resolved": "https://registry.npmjs.org/knex/-/knex-2.5.1.tgz", 17614 - "integrity": "sha512-z78DgGKUr4SE/6cm7ku+jHvFT0X97aERh/f0MUKAKgFnwCYBEW4TFBqtHWFYiJFid7fMrtpZ/gxJthvz5mEByA==", 17615 - "dependencies": { 17616 - "colorette": "2.0.19", 17617 - "commander": "^10.0.0", 17618 - "debug": "4.3.4", 17619 - "escalade": "^3.1.1", 17620 - "esm": "^3.2.25", 17621 - "get-package-type": "^0.1.0", 17622 - "getopts": "2.3.0", 17623 - "interpret": "^2.2.0", 17624 - "lodash": "^4.17.21", 17625 - "pg-connection-string": "2.6.1", 17626 - "rechoir": "^0.8.0", 17627 - "resolve-from": "^5.0.0", 17628 - "tarn": "^3.0.2", 17629 - "tildify": "2.0.0" 17630 - }, 17631 - "bin": { 17632 - "knex": "bin/cli.js" 17633 - }, 17634 - "engines": { 17635 - "node": ">=12" 17636 - }, 17637 - "peerDependenciesMeta": { 17638 - "better-sqlite3": { 17639 - "optional": true 17640 - }, 17641 - "mysql": { 17642 - "optional": true 17643 - }, 17644 - "mysql2": { 17645 - "optional": true 17646 - }, 17647 - "pg": { 17648 - "optional": true 17649 - }, 17650 - "pg-native": { 17651 - "optional": true 17652 - }, 17653 - "sqlite3": { 17654 - "optional": true 17655 - }, 17656 - "tedious": { 17657 - "optional": true 17658 - } 17659 - } 17660 - }, 17661 - "node_modules/knex/node_modules/resolve-from": { 17662 - "version": "5.0.0", 17663 - "resolved": "https://registry.npmjs.org/resolve-from/-/resolve-from-5.0.0.tgz", 17664 - "integrity": "sha512-qYg9KP24dD5qka9J47d0aVky0N+b4fTU89LN9iDnjB5waksiC49rvMB0PrUJQGoTmH50XPiqOvAjDfaijGxYZw==", 17665 - "engines": { 17666 - "node": ">=8" 17667 - } 17668 - }, 17669 17577 "node_modules/kysely": { 17670 17578 "version": "0.28.16", 17671 17579 "resolved": "https://registry.npmjs.org/kysely/-/kysely-0.28.16.tgz", ··· 19312 19220 "node": ">= 6" 19313 19221 } 19314 19222 }, 19315 - "node_modules/rechoir": { 19316 - "version": "0.8.0", 19317 - "resolved": "https://registry.npmjs.org/rechoir/-/rechoir-0.8.0.tgz", 19318 - "integrity": "sha512-/vxpCXddiX8NGfGO/mTafwjq4aFa/71pvamip0++IQk3zG8cbCj0fifNPrjjF1XMXUne91jL9OoxmdykoEtifQ==", 19319 - "dependencies": { 19320 - "resolve": "^1.20.0" 19321 - }, 19322 - "engines": { 19323 - "node": ">= 10.13.0" 19324 - } 19325 - }, 19326 19223 "node_modules/redis-errors": { 19327 19224 "version": "1.2.0", 19328 19225 "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", ··· 20621 20518 "node": ">=6" 20622 20519 } 20623 20520 }, 20624 - "node_modules/tarn": { 20625 - "version": "3.0.2", 20626 - "resolved": "https://registry.npmjs.org/tarn/-/tarn-3.0.2.tgz", 20627 - "integrity": "sha512-51LAVKUSZSVfI05vjPESNc5vwqqZpbXCsU+/+wxlOrUjk2SnFTt97v9ZgQrD4YmxYW1Px6w2KjaDitCfkvgxMQ==", 20628 - "engines": { 20629 - "node": ">=8.0.0" 20630 - } 20631 - }, 20632 20521 "node_modules/teeny-request": { 20633 20522 "version": "10.1.2", 20634 20523 "resolved": "https://registry.npmjs.org/teeny-request/-/teeny-request-10.1.2.tgz", ··· 20724 20613 "license": "MIT", 20725 20614 "dependencies": { 20726 20615 "safe-buffer": "~5.1.0" 20727 - } 20728 - }, 20729 - "node_modules/tildify": { 20730 - "version": "2.0.0", 20731 - "resolved": "https://registry.npmjs.org/tildify/-/tildify-2.0.0.tgz", 20732 - "integrity": "sha512-Cc+OraorugtXNfs50hU9KS369rFXCfgGLpfCfvlc+Ud5u6VWmUQsOAa9HbTvheQdYnrdJqqv1e5oIqXppMYnSw==", 20733 - "engines": { 20734 - "node": ">=8" 20735 20616 } 20736 20617 }, 20737 20618 "node_modules/tinyglobby": {
-1
server/package.json
··· 83 83 "helmet": "^4.6.0", 84 84 "ioredis": "^5.2.4", 85 85 "jsonwebtoken": "^9.0.3", 86 - "knex": "^2.3.0", 87 86 "kysely": "^0.28.16", 88 87 "latlon-geohash": "^2.0.0", 89 88 "lodash": "^4.18.1",
+1 -1
server/plugins/warehouse/IWarehouseAdapter.ts
··· 5 5 * 6 6 * Adapters power operational workloads (transactions, point queries, etc.). 7 7 * The interface intentionally mirrors a typical SQL client without enforcing 8 - * a specific library (Kysely, Knex, pg, etc.). 8 + * a specific library (Kysely, pg, etc.). 9 9 */ 10 10 export interface IWarehouseAdapter { 11 11 /** Human friendly provider name for logging / diagnostics. */
+2
server/rule_engine/ActionPublisher.test.ts
··· 42 42 orgId: 'org-123', 43 43 name: 'Action 1', 44 44 applyUserStrikes: false, 45 + penalty: 'NONE' as const, 45 46 actionType: ActionType.CUSTOM_ACTION, 46 47 callbackUrl: 'https://example.com/action1', 47 48 callbackUrlHeaders: null, ··· 73 74 orgId: 'org-123', 74 75 name: 'Action 2', 75 76 applyUserStrikes: false, 77 + penalty: 'NONE' as const, 76 78 actionType: ActionType.CUSTOM_ACTION, 77 79 callbackUrl: 'https://example.com/action2', 78 80 callbackUrlHeaders: null,
+7 -7
server/rule_engine/RuleEngine.ts
··· 7 7 } from '../condition_evaluator/conditionSet.js'; 8 8 import { type Dependencies } from '../iocContainer/index.js'; 9 9 import { inject } from '../iocContainer/utils.js'; 10 - import { 11 - ConditionCompletionOutcome, 12 - type RuleWithLatestVersion, 13 - type Rule as TRule, 14 - } from '../models/rules/RuleModel.js'; 10 + import { ConditionCompletionOutcome } from '../models/rules/RuleModel.js'; 11 + import { type PlainRuleWithLatestVersion } from '../models/rules/ruleTypes.js'; 15 12 import { evaluateAggregationRuntimeArgsForItem } from '../services/aggregationsService/index.js'; 16 13 import { type ItemSubmission } from '../services/itemProcessingService/index.js'; 17 14 import { ··· 182 179 * @param sync - whether the request should run synchronously 183 180 */ 184 181 async runRuleSet( 185 - rules: ReadonlyDeep<RuleWithLatestVersion[]>, 182 + rules: ReadonlyDeep<PlainRuleWithLatestVersion[]>, 186 183 evaluationContext: RuleEvaluationContext, 187 184 environment: RuleEnvironment, 188 185 executionsCorrelationId: RuleExecutionCorrelationId, 189 186 sync?: boolean, 190 187 ): Promise<{ 191 - rulesToResults: Map<ReadonlyDeep<TRule>, RuleExecutionResult>; 188 + rulesToResults: Map< 189 + ReadonlyDeep<PlainRuleWithLatestVersion>, 190 + RuleExecutionResult 191 + >; 192 192 actions: readonly Action[]; 193 193 }> { 194 194 if (!rules.length) {
+60 -98
server/rule_engine/ruleEngineQueries.ts
··· 4 4 * the signal execution service) to enable those services to make the queries 5 5 * they need to run a rule. Having these queries defined separately + injected 6 6 * into the consumers gives us a cleaner place to add optimizations to the query 7 - * logic (i.e., run the queries against replicas, add caching, etc) and makes 7 + * logic (i.e. run the queries against replicas, add caching, etc) and makes 8 8 * the consumers much more unit testable. 9 9 */ 10 - import { Op } from 'sequelize'; 10 + import { sql, type Kysely } from 'kysely'; 11 11 12 12 import { inject } from '../iocContainer/index.js'; 13 13 import { type LocationArea } from '../models/types/locationArea.js'; 14 - import { type Action } from '../services/moderationConfigService/index.js'; 14 + import { type CombinedPg } from '../services/combinedDbTypes.js'; 15 15 import { cached } from '../utils/caching.js'; 16 16 import { jsonParse, jsonStringify } from '../utils/encoding.js'; 17 + import { makeKyselyTransactionWithRetry } from '../utils/kyselyTransactionWithRetry.js'; 17 18 import { getUtcDateOnlyString } from '../utils/time.js'; 18 19 19 20 export const makeGetEnabledRulesForItemTypeEventuallyConsistent = inject( 20 - ['getSequelizeItemTypeEventuallyConsistent'], 21 - function (getSequelizeItemTypeEventuallyConsistent) { 21 + ['ModerationConfigService'], 22 + function (moderationConfigService) { 22 23 return cached({ 23 24 async producer(itemTypeId: string) { 24 - // Getting the enabledRules is currently coupled to sequelize, so, 25 - // annoyingly, we first have to convert the contentTypeId into a full 26 - // contentType model object. However, we don't want to incur too much 27 - // overhead for that, so we use a cached lookup. (Note: we can't just 28 - // take a model object as the argument because our caching library 29 - // requires the cache key to be a string; further, even if we could just 30 - // accept the model object, we wouldn't want to, because we want to move 31 - // away from this coupling to sequelize.) 32 - const itemType = await getSequelizeItemTypeEventuallyConsistent({ 33 - id: itemTypeId, 34 - }); 35 - 36 - return itemType ? itemType.getEnabledRules() : null; 25 + return moderationConfigService.getEnabledRulesForItemType(itemTypeId); 37 26 }, 38 27 directives: { freshUntilAge: 20 }, 39 28 }); ··· 44 33 typeof makeGetEnabledRulesForItemTypeEventuallyConsistent 45 34 >; 46 35 47 - export const makeGetSequelizeItemTypeEventuallyConsistent = inject( 48 - ['ItemTypeModel'], 49 - (ItemType) => { 50 - return cached({ 51 - async producer(key: { id: string } | { name: string; orgId: string }) { 52 - return 'id' in key 53 - ? ItemType.findByPk(key.id) 54 - : ItemType.findOne({ where: { name: key.name, orgId: key.orgId } }); 55 - }, 56 - directives: { freshUntilAge: 10, maxStale: [0, 2, 2] }, 57 - }); 58 - }, 59 - ); 60 - 61 - export type GetSequelizeItemTypeEventuallyConsistent = ReturnType< 62 - typeof makeGetSequelizeItemTypeEventuallyConsistent 63 - >; 64 - 65 36 export const makeGetItemTypesForOrgEventuallyConsistent = inject( 66 37 ['ModerationConfigService'], 67 38 (moderationConfigService) => async (orgId: string) => ··· 74 45 typeof makeGetItemTypesForOrgEventuallyConsistent 75 46 >; 76 47 77 - // TODO: this could probably be improved to increase cache hit rates, since 78 - // rn the cache will only be used if all the ids have previously been fetched. 79 48 export const makeGetPoliciesForRulesEventuallyConsistent = inject( 80 - ['PolicyModel'], 81 - function (Policy) { 49 + ['ModerationConfigService'], 50 + function (moderationConfigService) { 82 51 return cached({ 83 52 keyGeneration: { 84 53 toString: (ids: readonly string[]) => jsonStringify([...ids].sort()), 85 54 fromString: (it) => jsonParse(it), 86 55 }, 87 - async producer(key) { 88 - return Policy.getPoliciesForRuleIds(key); 56 + async producer(key: readonly string[]) { 57 + return moderationConfigService.getPoliciesByRuleIds(key); 89 58 }, 90 59 directives: { freshUntilAge: 120 }, 91 60 }); ··· 97 66 >; 98 67 99 68 export const makeGetActionsForRuleEventuallyConsistent = inject( 100 - ['ActionModel'], 101 - (Action) => { 69 + ['ModerationConfigService'], 70 + (moderationConfigService) => { 102 71 return cached({ 103 72 async producer(ruleId: string) { 104 - // This generates a pretty slow/overly-complex query, but I think it's 105 - // the best we can do with Sequelize. Eventually, we want to move off of 106 - // Sequelize, so we don't fetch full model instances + we cast the 107 - // result to be a plain data object, so that the rest of the code can't 108 - // depend on getting an Action model instance back, as that won't always 109 - // be the case. 110 - return Action.findAll({ 111 - where: { '$rules.id$': ruleId }, 112 - include: [{ association: 'rules', attributes: ['id'] }], 113 - raw: true, 114 - }) as Promise<Action[]>; 73 + return moderationConfigService.getActionsForRuleId(ruleId); 115 74 }, 116 75 directives: { freshUntilAge: 30 }, 117 76 }); ··· 123 82 >; 124 83 125 84 export const makeGetLocationBankLocationsEventuallyConsistent = inject( 126 - ['LocationBankLocationModel'], 127 - (LocationBankLocation) => { 85 + ['KyselyPgReadReplica'], 86 + (db) => { 128 87 return cached({ 129 88 async producer(bankId: string) { 130 - // NB: we use `raw: true` to get back plain JS objects, rather than 131 - // sequelize model instances. We do that because, with model instances, 132 - // every proprety access runs some extra getter code; see 133 - // https://github.com/sequelize/sequelize/blob/e77dcf78b341b62c97dbb29f16ce7a23f46ddc53/src/model.js#L42 134 - // This ends up killing the performance of our hot-path code that checks 135 - // whether a location is in each of these location areas. 136 - // 137 - // Meanwhile, we have to cast to LocationArea[] because findAll is still 138 - // typed (incorrectly) to return the model instance, even when `raw: 139 - // true` is provided. 140 - return LocationBankLocation.findAll({ 141 - where: { bankId }, 142 - raw: true, 143 - }) as Promise<LocationArea[]>; 89 + const rows = await db 90 + .selectFrom('public.location_bank_locations') 91 + .selectAll() 92 + .where('bank_id', '=', bankId) 93 + .execute(); 94 + return rows.map( 95 + (r) => 96 + ({ 97 + id: r.id, 98 + name: r.name ?? undefined, 99 + geometry: r.geometry as LocationArea['geometry'], 100 + bounds: r.bounds as LocationArea['bounds'], 101 + googlePlaceInfo: r.google_place_info as LocationArea['googlePlaceInfo'], 102 + }) satisfies LocationArea, 103 + ); 144 104 }, 145 105 directives(locations) { 146 106 const numLocations = locations.length; ··· 197 157 >; 198 158 199 159 export const makeRecordRuleActionLimitUsage = inject( 200 - ['Sequelize', 'Tracer'], 160 + ['KyselyPg', 'Tracer'], 201 161 (db, tracer) => { 202 - /** 203 - * Record that each of the rules given by ruleIds has used up one of its 204 - * daily action runs, against its maxDailyActions. 205 - */ 162 + const transactionWithRetry = makeKyselyTransactionWithRetry( 163 + db as Kysely<CombinedPg>, 164 + ); 165 + 206 166 async function recordRuleActionLimitUsage(ruleIds: readonly string[]) { 207 167 if (ruleIds.length === 0) { 208 168 return; 209 169 } 210 170 211 - const today = getUtcDateOnlyString(); 212 - await db.transactionWithRetry(async () => { 213 - // Using two queries like this isn't as efficient as, e.g., 214 - // UPDATE `rules` 215 - // SET `daily_actions_run` = 216 - // IF(last_action_date != $1, 1, daily_actions_run + 1) 217 - // SET `last_action_date` = $1 218 - // WHERE `id` IN (...); 219 - // But it lets us keep the code in Sequelize, which is probably worth it. 220 - await db.Rule.increment( 221 - { dailyActionsRun: 1 }, 222 - { where: { id: { [Op.in]: ruleIds }, lastActionDate: today } }, 223 - ); 171 + const today = String(getUtcDateOnlyString()); 172 + await transactionWithRetry(async (trx) => { 173 + await trx 174 + .updateTable('public.rules') 175 + .set({ 176 + daily_actions_run: sql`daily_actions_run + 1`, 177 + }) 178 + .where('id', 'in', [...ruleIds]) 179 + .where('last_action_date', '=', today) 180 + .execute(); 224 181 225 - await db.Rule.update( 226 - { dailyActionsRun: 1, lastActionDate: today }, 227 - { 228 - where: { 229 - id: { [Op.in]: ruleIds }, 230 - lastActionDate: { [Op.ne]: today }, 231 - }, 232 - }, 233 - ); 182 + await trx 183 + .updateTable('public.rules') 184 + .set({ 185 + daily_actions_run: 1, 186 + last_action_date: today, 187 + }) 188 + .where('id', 'in', [...ruleIds]) 189 + .where((eb) => 190 + eb.or([ 191 + eb('last_action_date', 'is', null), 192 + eb('last_action_date', '!=', today), 193 + ]), 194 + ) 195 + .execute(); 234 196 }); 235 197 } 236 198
+8 -2
server/services/combinedDbTypes.ts
··· 1 + import { type ApiKeyServicePg } from './apiKeyService/dbTypes.js'; 2 + import { type CoreAppTablesPg } from './coreAppTables.js'; 1 3 import { type ModerationConfigServicePg } from './moderationConfigService/dbTypes.js'; 2 - import { type ApiKeyServicePg } from './apiKeyService/dbTypes.js'; 3 4 import { type SigningKeyPairServicePg } from './signingKeyPairService/dbTypes.js'; 5 + import { type UserManagementPg } from './userManagementService/dbTypes.js'; 4 6 5 - export type CombinedPg = ModerationConfigServicePg & ApiKeyServicePg & SigningKeyPairServicePg; 7 + export type CombinedPg = ModerationConfigServicePg & 8 + ApiKeyServicePg & 9 + SigningKeyPairServicePg & 10 + UserManagementPg & 11 + CoreAppTablesPg;
+59
server/services/coreAppTables.ts
··· 1 + import { type Generated, type GeneratedAlways } from 'kysely'; 2 + 3 + /** Postgres enum for backtests.status (generated column — read-only in app). */ 4 + export type BacktestStatusDb = 'RUNNING' | 'COMPLETE' | 'CANCELED'; 5 + 6 + export type CoreAppTablesPg = { 7 + 'public.orgs': { 8 + id: string; 9 + email: string; 10 + name: string; 11 + website_url: string; 12 + api_key_id: string | null; 13 + created_at: Date; 14 + updated_at: Date; 15 + on_call_alert_email: string | null; 16 + }; 17 + 'public.location_banks': { 18 + id: string; 19 + name: string; 20 + description: string | null; 21 + org_id: string; 22 + owner_id: string; 23 + created_at: GeneratedAlways<Date>; 24 + updated_at: Date; 25 + full_places_api_responses: unknown[]; 26 + }; 27 + 'public.location_bank_locations': { 28 + id: string; 29 + bank_id: string; 30 + geometry: unknown; 31 + bounds: unknown | null; 32 + name: string | null; 33 + google_place_info: unknown | null; 34 + created_at: GeneratedAlways<Date>; 35 + updated_at: GeneratedAlways<Date>; 36 + }; 37 + 'public.backtests': { 38 + id: string; 39 + rule_id: string; 40 + creator_id: string; 41 + sample_desired_size: number; 42 + sample_actual_size: Generated<number>; 43 + sample_start_at: Date; 44 + sample_end_at: Date; 45 + sampling_complete: Generated<boolean>; 46 + content_items_processed: Generated<number>; 47 + content_items_matched: Generated<number>; 48 + created_at: GeneratedAlways<Date>; 49 + updated_at: Date; 50 + cancelation_date: Date | null; 51 + status: GeneratedAlways<BacktestStatusDb>; 52 + }; 53 + 'public.users_and_favorite_rules': { 54 + user_id: string; 55 + rule_id: string; 56 + created_at: GeneratedAlways<Date>; 57 + updated_at: Date; 58 + }; 59 + };
+20
server/services/moderationConfigService/dbTypes.ts
··· 62 62 created_at: GeneratedAlways<Date>; 63 63 updated_at: GeneratedAlways<Date>; 64 64 }; 65 + 'public.rules_and_actions': { 66 + action_id: string; 67 + rule_id: string; 68 + created_at: GeneratedAlways<Date>; 69 + updated_at: GeneratedAlways<Date>; 70 + sys_period: GeneratedAlways<unknown>; 71 + }; 72 + 'public.rules_and_policies': { 73 + policy_id: string; 74 + rule_id: string; 75 + created_at: Date; 76 + updated_at: Date; 77 + sys_period: GeneratedAlways<unknown>; 78 + }; 65 79 'public.actions_and_item_types': { 66 80 action_id: string; 67 81 item_type_id: string; ··· 100 114 ACCEPT_APPEAL: { callback_url: null }; 101 115 } 102 116 >; 117 + 'public.rules_latest_versions': { 118 + rule_id: string; 119 + version: string; 120 + }; 103 121 'public.rules': { 104 122 id: string; 105 123 name: string; 124 + description: string | null; 106 125 status_if_unexpired: RuleStatus; 107 126 tags: string[]; 108 127 max_daily_actions: number | null; ··· 117 136 alarm_status: Generated<RuleAlarmStatus>; 118 137 alarm_status_set_at: Generated<Date>; 119 138 rule_type: RuleType; 139 + parent_id: string | null; 120 140 }; 121 141 'public.policies': { 122 142 id: string;
+49
server/services/moderationConfigService/errors.ts
··· 1 + import { 2 + CoopError, 3 + ErrorType, 4 + type ErrorInstanceData, 5 + } from '../../utils/errors.js'; 6 + 7 + export type RuleErrorType = 8 + | 'RuleNameExistsError' 9 + | 'RuleHasRunningBacktestsError' 10 + | 'RuleIsMissingContentTypeError'; 11 + 12 + export const makeRuleNameExistsError = (data: ErrorInstanceData) => 13 + new CoopError({ 14 + status: 409, 15 + type: [ErrorType.UniqueViolation], 16 + title: 'A rule with that name already exists in this organization.', 17 + name: 'RuleNameExistsError', 18 + ...data, 19 + }); 20 + 21 + export const makeRuleIsMissingContentTypeError = (data: ErrorInstanceData) => 22 + new CoopError({ 23 + status: 400, 24 + type: [ErrorType.InvalidUserInput], 25 + title: 'This rule must contain a content type on which to operate.', 26 + name: 'RuleIsMissingContentTypeError', 27 + ...data, 28 + }); 29 + 30 + export const makeRuleHasRunningBacktestsError = (data: ErrorInstanceData) => 31 + new CoopError({ 32 + status: 409, 33 + type: [ErrorType.AttemptingToMutateActiveRule], 34 + title: 35 + "This rule cannot be updated while it has running backtests, which are using the rule's current conditions.", 36 + name: 'RuleHasRunningBacktestsError', 37 + ...data, 38 + }); 39 + 40 + export type LocationBankErrorType = 'LocationBankNameExistsError'; 41 + 42 + export const makeLocationBankNameExistsError = (data: ErrorInstanceData) => 43 + new CoopError({ 44 + status: 409, 45 + type: [ErrorType.UniqueViolation], 46 + title: 'A location bank with this name already exists', 47 + name: 'LocationBankNameExistsError', 48 + ...data, 49 + });
+7
server/services/moderationConfigService/index.ts
··· 44 44 ModerationConfigService, 45 45 ModerationConfigErrorType, 46 46 } from './moderationConfigService.js'; 47 + 48 + export { 49 + makeRuleNameExistsError, 50 + makeRuleIsMissingContentTypeError, 51 + makeRuleHasRunningBacktestsError, 52 + makeLocationBankNameExistsError, 53 + } from './errors.js';
+1
server/services/moderationConfigService/moderationConfigService.test.ts
··· 452 452 "id": Any<String>, 453 453 "name": "Test Action", 454 454 "orgId": Any<String>, 455 + "penalty": "NONE", 455 456 } 456 457 `, 457 458 );
+27 -54
server/services/moderationConfigService/moderationConfigService.ts
··· 4 4 5 5 import { type ConsumerDirectives } from '../../lib/cache/index.js'; 6 6 import type { Invoker } from '../../models/types/permissioning.js'; 7 - import { 8 - CoopError, 9 - ErrorType, 10 - type ErrorInstanceData, 11 - } from '../../utils/errors.js'; 12 - import { __throw } from '../../utils/misc.js'; 7 + import { type RuleErrorType, type LocationBankErrorType } from './errors.js'; 13 8 import { type ModerationConfigServicePg } from './dbTypes.js'; 14 9 import { type Action, type Policy } from './index.js'; 15 10 import ActionOperations, { ··· 22 17 import PolicyOperations, { 23 18 type PolicyErrorType, 24 19 } from './modules/PolicyOperations.js'; 20 + import RuleReadOperations from './modules/RuleReadOperations.js'; 25 21 import UserStrikeOperations, { 26 22 type UserStrikeThresholdErrorType, 27 23 } from './modules/UserStrikeOperations.js'; ··· 35 31 type UserItemType, 36 32 } from './types/itemTypes.js'; 37 33 import type { PolicyType } from './types/policies.js'; 34 + import { type PlainRuleWithLatestVersion } from '../../models/rules/ruleTypes.js'; 38 35 39 36 export type ModerationConfigErrorType = 40 37 | 'AttemptingToDeleteDefaultUserType' ··· 78 75 private readonly itemTypeOps: ItemTypeOperations; 79 76 private readonly userStrikeOps: UserStrikeOperations; 80 77 private readonly matchingBankOps: MatchingBankOperations; 78 + private readonly ruleReadOps: RuleReadOperations; 81 79 82 80 constructor( 83 81 pgQuery: Kysely<ModerationConfigServicePg>, ··· 94 92 onDeletePolicyId, 95 93 ); 96 94 this.itemTypeOps = new ItemTypeOperations(pgQuery, pgQueryReplica); 97 - // TODO: Remove Rule API and replace with kysely 98 95 this.userStrikeOps = new UserStrikeOperations(pgQuery, pgQueryReplica); 99 96 this.matchingBankOps = new MatchingBankOperations(pgQuery, pgQueryReplica); 97 + this.ruleReadOps = new RuleReadOperations(pgQuery, pgQueryReplica); 100 98 } 101 99 102 100 async getItemTypes(opts: { ··· 285 283 return this.actionOps.getActions(opts); 286 284 } 287 285 286 + async getActionsForRuleId(ruleId: string) { 287 + return this.actionOps.getActionsForRuleId({ 288 + ruleId, 289 + readFromReplica: true, 290 + }); 291 + } 292 + 293 + async getPoliciesByRuleIds(ruleIds: readonly string[]) { 294 + return this.policyOps.getPoliciesByRuleIds({ 295 + ruleIds, 296 + readFromReplica: true, 297 + }); 298 + } 299 + 300 + async getEnabledRulesForItemType(itemTypeId: string) { 301 + return this.ruleReadOps.getEnabledRulesForItemType(itemTypeId); 302 + } 303 + 304 + async findEnabledUserRules(): Promise<PlainRuleWithLatestVersion[]> { 305 + return this.ruleReadOps.findEnabledUserRules(); 306 + } 307 + 288 308 async getPolicies(opts: { orgId: string; readFromReplica?: boolean }) { 289 309 return this.policyOps.getPolicies(opts); 290 310 } ··· 429 449 } 430 450 } 431 451 432 - type RuleErrorType = 433 - | 'RuleNameExistsError' 434 - | 'RuleHasRunningBacktestsError' 435 - | 'RuleIsMissingContentTypeError'; 436 - 437 - // TODO: throw this error as appropriate on failed rule creation/update. 438 - export const makeRuleNameExistsError = (data: ErrorInstanceData) => 439 - new CoopError({ 440 - status: 409, 441 - type: [ErrorType.UniqueViolation], 442 - title: 'A rule with that name already exists in this organization.', 443 - name: 'RuleNameExistsError', 444 - ...data, 445 - }); 446 - 447 - // TODO: throw this error as appropriate on failed rule creation/update. 448 - export const makeRuleIsMissingContentTypeError = (data: ErrorInstanceData) => 449 - new CoopError({ 450 - status: 400, 451 - type: [ErrorType.InvalidUserInput], 452 - title: 'This rule must contain a content type on which to operate.', 453 - name: 'RuleIsMissingContentTypeError', 454 - ...data, 455 - }); 456 - 457 - // TODO: throw this error as appropriate on failed rule creation/update. 458 - export const makeRuleHasRunningBacktestsError = (data: ErrorInstanceData) => 459 - new CoopError({ 460 - status: 409, 461 - type: [ErrorType.AttemptingToMutateActiveRule], 462 - title: 463 - "This rule cannot be updated while it has running backtests, which are using the rule's current conditions.", 464 - name: 'RuleHasRunningBacktestsError', 465 - ...data, 466 - }); 467 - 468 - type LocationBankErrorType = 'LocationBankNameExistsError'; 469 - 470 - // TODO: throw this error as appropriate on failed bank creation/update. 471 - export const makeLocationBankNameExistsError = (data: ErrorInstanceData) => 472 - new CoopError({ 473 - status: 409, 474 - type: [ErrorType.UniqueViolation], 475 - title: 'A location bank with this name already exists', 476 - name: 'LocationBankNameExistsError', 477 - ...data, 478 - });
+31
server/services/moderationConfigService/modules/ActionOperations.ts
··· 29 29 'apply_user_strikes as applyUserStrikes', 30 30 ] as const; 31 31 32 + const actionJoinDbSelection = [ 33 + 'a.id', 34 + 'a.name', 35 + 'a.description', 36 + 'a.callback_url as callbackUrl', 37 + 'a.callback_url_headers as callbackUrlHeaders', 38 + 'a.callback_url_body as callbackUrlBody', 39 + 'a.org_id as orgId', 40 + 'a.penalty', 41 + 'a.action_type as actionType', 42 + 'a.applies_to_all_items_of_kind as appliesToAllItemsOfKind', 43 + 'a.apply_user_strikes as applyUserStrikes', 44 + ] as const; 45 + 32 46 type ActionDbResult = FixKyselyRowCorrelation< 33 47 ModerationConfigServicePg['public.actions'], 34 48 typeof actionDbSelection ··· 105 119 return results.map((it) => this.#dbResultToAction(it)); 106 120 } 107 121 122 + async getActionsForRuleId(opts: { 123 + ruleId: string; 124 + readFromReplica?: boolean; 125 + }) { 126 + const { ruleId, readFromReplica } = opts; 127 + const pgQuery = this.#getPgQuery(readFromReplica ?? true); 128 + const results = (await pgQuery 129 + .selectFrom('public.rules_and_actions as raa') 130 + .innerJoin('public.actions as a', 'a.id', 'raa.action_id') 131 + .select(actionJoinDbSelection) 132 + .where('raa.rule_id', '=', ruleId) 133 + .execute()) as ActionDbResult[]; 134 + 135 + return results.map((it) => this.#dbResultToAction(it)); 136 + } 137 + 108 138 #dbResultToAction(it: ActionDbResult) { 109 139 return { 110 140 id: it.id, 111 141 name: it.name, 112 142 orgId: it.orgId, 113 143 applyUserStrikes: it.applyUserStrikes, 144 + penalty: it.penalty, 114 145 ...(() => { 115 146 switch (it.actionType) { 116 147 case 'CUSTOM_ACTION':
+45 -1
server/services/moderationConfigService/modules/PolicyOperations.ts
··· 36 36 'semantic_version as semanticVersion', 37 37 'user_strike_count as userStrikeCount', 38 38 'apply_user_strike_count_config_to_children as applyUserStrikeCountConfigToChildren', 39 - 'penalty', // TODO: remove 39 + 'penalty', 40 + ] as const; 41 + 42 + const policyJoinDbSelection = [ 43 + 'rap.rule_id as ruleId', 44 + 'p.id', 45 + 'p.name', 46 + 'p.org_id as orgId', 47 + 'p.parent_id as parentId', 48 + 'p.created_at as createdAt', 49 + 'p.updated_at as updatedAt', 50 + 'p.policy_text as policyText', 51 + 'p.enforcement_guidelines as enforcementGuidelines', 52 + 'p.sys_period as sysPeriod', 53 + 'p.policy_type as policyType', 54 + 'p.semantic_version as semanticVersion', 55 + 'p.user_strike_count as userStrikeCount', 56 + 'p.apply_user_strike_count_config_to_children as applyUserStrikeCountConfigToChildren', 57 + 'p.penalty', 40 58 ] as const; 41 59 42 60 type PolicyDbResult = FixKyselyRowCorrelation< ··· 64 82 const results = (await query.execute()) as PolicyDbResult[]; 65 83 66 84 return results.map((it) => this.#dbResultToPolicy(it)); 85 + } 86 + 87 + async getPoliciesByRuleIds(opts: { 88 + ruleIds: readonly string[]; 89 + readFromReplica?: boolean; 90 + }): Promise<Record<string, Policy[]>> { 91 + const { ruleIds, readFromReplica } = opts; 92 + if (ruleIds.length === 0) { 93 + return {}; 94 + } 95 + const pgQuery = this.#getPgQuery(readFromReplica ?? true); 96 + type Row = PolicyDbResult & { ruleId: string }; 97 + const rows = (await pgQuery 98 + .selectFrom('public.rules_and_policies as rap') 99 + .innerJoin('public.policies as p', 'p.id', 'rap.policy_id') 100 + .select(policyJoinDbSelection) 101 + .where('rap.rule_id', 'in', [...ruleIds]) 102 + .execute()) as Row[]; 103 + 104 + const out: Record<string, Policy[]> = {}; 105 + for (const row of rows) { 106 + const { ruleId, ...policyFields } = row; 107 + const policy = this.#dbResultToPolicy(policyFields as PolicyDbResult); 108 + (out[ruleId] ??= []).push(policy); 109 + } 110 + return out; 67 111 } 68 112 69 113 async getPolicy(opts: {
+147
server/services/moderationConfigService/modules/RuleReadOperations.ts
··· 1 + import { type Kysely, sql } from 'kysely'; 2 + 3 + import { 4 + type RuleAlarmStatus, 5 + RuleStatus, 6 + RuleType, 7 + type ConditionSet, 8 + } from '../index.js'; 9 + import { type ModerationConfigServicePg } from '../dbTypes.js'; 10 + import { getUtcDateOnlyString } from '../../../utils/time.js'; 11 + import { 12 + type PlainRuleWithLatestVersion, 13 + computeRuleStatusFromRow, 14 + } from '../../../models/rules/ruleTypes.js'; 15 + 16 + const ruleSelect = [ 17 + 'r.id', 18 + 'r.name', 19 + 'r.description', 20 + 'r.status_if_unexpired as statusIfUnexpired', 21 + 'r.tags', 22 + 'r.max_daily_actions as maxDailyActions', 23 + 'r.daily_actions_run as dailyActionsRun', 24 + 'r.last_action_date as lastActionDate', 25 + 'r.created_at as createdAt', 26 + 'r.updated_at as updatedAt', 27 + 'r.org_id as orgId', 28 + 'r.creator_id as creatorId', 29 + 'r.expiration_time as expirationTime', 30 + 'r.condition_set as conditionSet', 31 + 'r.alarm_status as alarmStatus', 32 + 'r.alarm_status_set_at as alarmStatusSetAt', 33 + 'r.rule_type as ruleType', 34 + 'r.parent_id as parentId', 35 + 'rlv.version as latestVersionString', 36 + ] as const; 37 + 38 + type RuleRow = { 39 + id: string; 40 + name: string; 41 + description: string | null; 42 + statusIfUnexpired: Exclude<RuleStatus, typeof RuleStatus.EXPIRED>; 43 + tags: string[]; 44 + maxDailyActions: number | null; 45 + dailyActionsRun: number; 46 + lastActionDate: string | null; 47 + createdAt: Date; 48 + updatedAt: Date; 49 + orgId: string; 50 + creatorId: string; 51 + expirationTime: Date | null; 52 + conditionSet: ConditionSet; 53 + // Kysely returns the Postgres enum as a plain string; cast in rowToPlainRuleWithLatest. 54 + alarmStatus: string; 55 + alarmStatusSetAt: Date; 56 + ruleType: RuleType; 57 + parentId: string | null; 58 + latestVersionString: string | null; 59 + }; 60 + 61 + function enabledQuotaWhere(today: string) { 62 + return sql<boolean>`(r.max_daily_actions is null or r.last_action_date is distinct from ${today}::date or (r.max_daily_actions is not null and r.daily_actions_run < r.max_daily_actions))`; 63 + } 64 + 65 + function rowToPlainRuleWithLatest(row: RuleRow): PlainRuleWithLatestVersion { 66 + const status = computeRuleStatusFromRow(row.expirationTime, row.statusIfUnexpired); 67 + const version = row.latestVersionString ?? ''; 68 + return { 69 + id: row.id, 70 + name: row.name, 71 + description: row.description, 72 + statusIfUnexpired: row.statusIfUnexpired, 73 + status, 74 + tags: row.tags, 75 + maxDailyActions: row.maxDailyActions, 76 + dailyActionsRun: row.dailyActionsRun, 77 + lastActionDate: row.lastActionDate, 78 + createdAt: row.createdAt, 79 + updatedAt: row.updatedAt, 80 + orgId: row.orgId, 81 + creatorId: row.creatorId, 82 + expirationTime: row.expirationTime, 83 + conditionSet: row.conditionSet, 84 + alarmStatus: row.alarmStatus as RuleAlarmStatus, 85 + alarmStatusSetAt: row.alarmStatusSetAt, 86 + ruleType: row.ruleType, 87 + parentId: row.parentId, 88 + latestVersion: { ruleId: row.id, version }, 89 + }; 90 + } 91 + 92 + export default class RuleReadOperations { 93 + constructor( 94 + private readonly pgQuery: Kysely<ModerationConfigServicePg>, 95 + private readonly pgQueryReplica: Kysely<ModerationConfigServicePg>, 96 + ) {} 97 + 98 + async getEnabledRulesForItemType(itemTypeId: string) { 99 + const today = String(getUtcDateOnlyString()); 100 + const rows = (await this.pgQueryReplica 101 + .selectFrom('public.rules as r') 102 + .innerJoin('public.rules_and_item_types as rit', 'rit.rule_id', 'r.id') 103 + .leftJoin('public.rules_latest_versions as rlv', 'rlv.rule_id', 'r.id') 104 + .select(ruleSelect) 105 + .where('rit.item_type_id', '=', itemTypeId) 106 + .where((eb) => 107 + eb.or([ 108 + eb('r.expiration_time', 'is', null), 109 + eb('r.expiration_time', '>', sql<Date>`now()`), 110 + ]), 111 + ) 112 + .where('r.status_if_unexpired', 'in', [ 113 + RuleStatus.LIVE, 114 + RuleStatus.BACKGROUND, 115 + ]) 116 + .where(enabledQuotaWhere(today)) 117 + .execute()) as RuleRow[]; 118 + 119 + return rows.map(rowToPlainRuleWithLatest); 120 + } 121 + 122 + async findEnabledUserRules() { 123 + const today = String(getUtcDateOnlyString()); 124 + const rows = (await this.pgQueryReplica 125 + .selectFrom('public.rules as r') 126 + .leftJoin('public.rules_latest_versions as rlv', 'rlv.rule_id', 'r.id') 127 + .select(ruleSelect) 128 + .where('r.rule_type', '=', RuleType.USER) 129 + .where( 130 + sql<boolean>`not exists (select 1 from public.rules_and_item_types rit where rit.rule_id = r.id)`, 131 + ) 132 + .where((eb) => 133 + eb.or([ 134 + eb('r.expiration_time', 'is', null), 135 + eb('r.expiration_time', '>', sql<Date>`now()`), 136 + ]), 137 + ) 138 + .where('r.status_if_unexpired', 'in', [ 139 + RuleStatus.LIVE, 140 + RuleStatus.BACKGROUND, 141 + ]) 142 + .where(enabledQuotaWhere(today)) 143 + .execute()) as RuleRow[]; 144 + 145 + return rows.map(rowToPlainRuleWithLatest); 146 + } 147 + }
+3
server/services/moderationConfigService/types/actions.ts
··· 6 6 7 7 import { type TaggedUnionFromCases } from '../../../utils/typescript-types.js'; 8 8 9 + import { type UserPenaltySeverity } from './shared.js'; 10 + 9 11 export const ActionType = makeEnumLike([ 10 12 'CUSTOM_ACTION', 11 13 'ENQUEUE_TO_MRT', ··· 28 30 orgId: string; 29 31 name: string; 30 32 applyUserStrikes: boolean; 33 + penalty: UserPenaltySeverity; 31 34 } & TaggedUnionFromCases< 32 35 { actionType: ActionType }, 33 36 {
+3 -1
server/services/moderationConfigService/types/policies.ts
··· 1 1 import { makeEnumLike } from '@roostorg/types'; 2 2 3 + import { type UserPenaltySeverity } from './shared.js'; 4 + 3 5 export const PolicyType = makeEnumLike([ 4 6 'HATE', 5 7 'VIOLENCE', ··· 30 32 semanticVersion: number; 31 33 userStrikeCount: number; 32 34 applyUserStrikeCountConfigToChildren: boolean; 33 - penalty: string; // TODO: remove 35 + penalty: UserPenaltySeverity; 34 36 };
+1 -1
server/services/orgAwareSignalExecutionService/signalExecutionService.ts
··· 4 4 import { type ReadonlyDeep } from 'type-fest'; 5 5 6 6 import { inject } from '../../iocContainer/utils.js'; 7 - import { type PolicyActionPenalties } from '../../models/OrgModel.js'; 7 + import { type PolicyActionPenalties } from '../policyActionPenalties.js'; 8 8 import { type MatchingValues } from '../../models/rules/matchingValues.js'; 9 9 import { type LocationArea } from '../../models/types/locationArea.js'; 10 10 import { jsonStringify } from '../../utils/encoding.js';
+73
server/services/policyActionPenalties.ts
··· 1 + import { 2 + UserPenaltySeverity, 3 + type ModerationConfigService, 4 + } from './moderationConfigService/index.js'; 5 + 6 + export type PolicyActionPenalties = { 7 + actionId: string; 8 + policyId: string; 9 + penalties: number[]; 10 + }; 11 + 12 + /** 13 + * Computes the severity of the penalty we should apply for a given 14 + * (action, policy) pair. The general idea is to make the penalties 15 + * increase exponentially as severity levels increase, but the rate 16 + * of increase can't be so high that a (severe, severe) penalty is 17 + * 50x higher than a (high, high) penalty. 18 + * 19 + * The easiest way to achieve this exponential behavior is at the individual 20 + * severity levels, rather than trying to multiply the action penalty 21 + * by the severity penalty to compound their magnitudes. So the severity 22 + * levels apply penalty magnitudes as follows: 23 + * 24 + * NONE = 0 25 + * LOW = 1 26 + * MEDIUM = 3 27 + * HIGH = 9 28 + * SEVERE = 27 29 + * 30 + * To get the penalty value for an (action, policy) pair, we just add the 31 + * penalty values of the action and policy because the exponential nature 32 + * of these penalties has already been taken into account. 33 + * 34 + * If the action has no penalty (e.g., "Send to Moderation", "Restore 35 + * Content"), we never apply any penalty, regardless of the policy penalty. 36 + * Otherwise, the penalty accounts for both the action + policy penalties. 37 + */ 38 + export function computeActionPolicyPenalty( 39 + actionPenalty: UserPenaltySeverity, 40 + policyPenalty: UserPenaltySeverity, 41 + ): number { 42 + const penaltySeverityMap: { [k in UserPenaltySeverity]: number } = { 43 + [UserPenaltySeverity.NONE]: 0, 44 + [UserPenaltySeverity.LOW]: 1, 45 + [UserPenaltySeverity.MEDIUM]: 3, 46 + [UserPenaltySeverity.HIGH]: 9, 47 + [UserPenaltySeverity.SEVERE]: 27, 48 + }; 49 + 50 + return actionPenalty === UserPenaltySeverity.NONE 51 + ? 0 52 + : penaltySeverityMap[actionPenalty] + penaltySeverityMap[policyPenalty]; 53 + } 54 + 55 + export async function getPolicyActionPenaltiesForOrg( 56 + moderationConfigService: ModerationConfigService, 57 + orgId: string, 58 + ): Promise<PolicyActionPenalties[]> { 59 + const [actions, policies] = await Promise.all([ 60 + moderationConfigService.getActions({ orgId, readFromReplica: true }), 61 + moderationConfigService.getPolicies({ orgId, readFromReplica: true }), 62 + ]); 63 + 64 + return policies.flatMap((policy) => 65 + actions.map((action) => ({ 66 + actionId: action.id, 67 + policyId: policy.id, 68 + penalties: [ 69 + computeActionPolicyPenalty(action.penalty, policy.penalty), 70 + ], 71 + })), 72 + ); 73 + }
+94 -47
server/services/ruleAnomalyDetectionService/detectRulePassRateAnomaliesJob.test.ts
··· 1 - import _ from 'lodash'; 2 - 3 1 import getBottle, { 4 2 type Dependencies, 5 3 type PublicInterface, 6 4 } from '../../iocContainer/index.js'; 7 - import { type Rule as TRule } from '../../models/rules/RuleModel.js'; 8 5 import { type NotificationsService } from '../../services/notificationsService/notificationsService.js'; 9 6 import { type GetCurrentPeriodRuleAlarmStatuses } from '../../services/ruleAnomalyDetectionService/getCurrentPeriodRuleAlarmStatuses.js'; 10 7 import createOrg from '../../test/fixtureHelpers/createOrg.js'; 11 8 import createRule from '../../test/fixtureHelpers/createRule.js'; 12 9 import createUser from '../../test/fixtureHelpers/createUser.js'; 13 - import { mocked, type Mocked } from '../../test/mockHelpers/jestMocks.js'; 10 + import { type Mocked } from '../../test/mockHelpers/jestMocks.js'; 14 11 import { RuleAlarmStatus } from '../moderationConfigService/index.js'; 15 12 import DetectRulePassRateAnomaliesJob from './detectRulePassRateAnomaliesJob.js'; 16 13 14 + function makeMockKyselyForRules( 15 + fakeRules: Array<{ 16 + id: string; 17 + orgId: string; 18 + creatorId: string; 19 + name: string; 20 + alarmStatus: RuleAlarmStatus; 21 + statusIfUnexpired: string; 22 + }>, 23 + orgRows: Array<{ id: string; on_call_alert_email: string | null }>, 24 + ) { 25 + const updateExecute = jest.fn().mockResolvedValue(undefined); 26 + const mockDb = { 27 + selectFrom: jest.fn((table: string) => { 28 + const chain: { 29 + select: jest.Mock; 30 + where: jest.Mock; 31 + execute: jest.Mock; 32 + } = { 33 + select: jest.fn(), 34 + where: jest.fn(), 35 + execute: jest.fn(), 36 + }; 37 + chain.select.mockReturnValue(chain); 38 + chain.where.mockReturnValue(chain); 39 + chain.execute.mockImplementation(async () => { 40 + if (table === 'public.rules') { 41 + return fakeRules.map((r) => ({ 42 + id: r.id, 43 + org_id: r.orgId, 44 + creator_id: r.creatorId, 45 + name: r.name, 46 + alarm_status: r.alarmStatus, 47 + status_if_unexpired: r.statusIfUnexpired, 48 + })); 49 + } 50 + if (table === 'public.orgs') { 51 + return orgRows; 52 + } 53 + return []; 54 + }); 55 + return chain; 56 + }), 57 + updateTable: jest.fn(() => ({ 58 + set: jest.fn().mockReturnValue({ 59 + where: jest.fn().mockReturnValue({ 60 + execute: updateExecute, 61 + }), 62 + }), 63 + })), 64 + __updateExecute: updateExecute, 65 + }; 66 + return mockDb; 67 + } 68 + 17 69 describe('Detect Rule Anomalies', () => { 18 70 describe('worker', () => { 19 - let OrgModel: Dependencies['Sequelize']['Org'], 20 - deleteMockData: () => Promise<void>, 21 - mockDummyRules: Mocked<TRule, 'save'>[], 22 - mockRuleModel: Mocked<Dependencies['Sequelize']['Rule'], 'findAll'>, 71 + let deleteMockData: () => Promise<void>, 72 + mockDummyRules: Array<{ 73 + id: string; 74 + orgId: string; 75 + creatorId: string; 76 + name: string; 77 + alarmStatus: RuleAlarmStatus; 78 + statusIfUnexpired: string; 79 + }>, 80 + mockKysely: ReturnType<typeof makeMockKyselyForRules>, 23 81 mockGetCurrentPeriodRuleAlarmStatuses: GetCurrentPeriodRuleAlarmStatuses, 24 82 mockNotificationsService: Mocked< 25 83 PublicInterface<NotificationsService>, ··· 33 91 ModerationConfigService, 34 92 ApiKeyService, 35 93 } = (await getBottle()).container; 36 - OrgModel = models.Org; 37 94 38 95 // make some fake rules (w/ stable ids so we can match them in a snapshot) 39 96 // in different initial alarm statuses, to test all 9 combinations [i.e., ··· 107 164 }), 108 165 ]); 109 166 110 - mockDummyRules = fakeRules.map((it) => mocked(it, ['save'])); 167 + mockDummyRules = fakeRules.map((r) => ({ 168 + id: r.id, 169 + orgId: r.orgId, 170 + creatorId: r.creatorId, 171 + name: r.name, 172 + alarmStatus: r.alarmStatus, 173 + statusIfUnexpired: r.statusIfUnexpired, 174 + })); 175 + 111 176 mockGetCurrentPeriodRuleAlarmStatuses = async () => { 112 177 const newAlarmStatusByRule = 113 178 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions ··· 119 184 i % 3 === 0 120 185 ? RuleAlarmStatus.ALARM 121 186 : i % 3 === 1 122 - ? RuleAlarmStatus.OK 123 - : RuleAlarmStatus.INSUFFICIENT_DATA, 187 + ? RuleAlarmStatus.OK 188 + : RuleAlarmStatus.INSUFFICIENT_DATA, 124 189 meta: { lastPeriodPassRate: 0.5, secondToLastPeriodPassRate: 0.4 }, 125 190 }; 126 191 }); 127 192 return newAlarmStatusByRule; 128 193 }; 129 194 130 - mockNotificationsService = mocked( 131 - { 132 - async createNotifications(_it: any) {}, 133 - async getNotificationsForUser() { 134 - return []; 135 - }, 136 - }, 137 - ['createNotifications'], 138 - ); 195 + mockNotificationsService = { 196 + createNotifications: jest.fn(), 197 + getNotificationsForUser: jest.fn(), 198 + } as unknown as Mocked< 199 + PublicInterface<NotificationsService>, 200 + 'createNotifications' 201 + >; 139 202 140 - mockRuleModel = mocked(models.Rule, ['findAll']); 141 - mockRuleModel.findAll.mockResolvedValue(mockDummyRules); 203 + mockKysely = makeMockKyselyForRules(mockDummyRules, [ 204 + { id: org.id, on_call_alert_email: null }, 205 + { id: org2.id, on_call_alert_email: 'test@gmail.com' }, 206 + ]); 142 207 143 - // It might be nice if we could create the mock data at the start of a 144 - // transaction, run the tests with that transaction open, and then just 145 - // roll it back at the end to automatically delete and leave the db in a 146 - // consistent/clean state. That's a little tricky, though, as it requires 147 - // feeding the transacation object (or keeping a managed transaction 148 - // callback open) all the way into calling the worker. So, instead, we 149 - // settle for manually defining this compensating transacaction, which we 150 - // call at the end. 151 208 deleteMockData = async () => { 152 209 await Promise.all(fakeRules.map(async (it) => it.destroy())); 153 210 await Promise.all([ruleOwner.destroy(), ruleOwner2.destroy()]); ··· 163 220 164 221 test('should generate the proper notifications + update rules', async () => { 165 222 const worker = DetectRulePassRateAnomaliesJob( 166 - mockRuleModel, 167 - OrgModel, 223 + mockKysely as unknown as Dependencies['KyselyPg'], 168 224 mockNotificationsService, 169 225 mockGetCurrentPeriodRuleAlarmStatuses, 170 226 jest.fn<() => Promise<void>>(), 171 227 ); 172 228 await worker.run(); 173 229 174 - // We should've sent 4 notifications: one for each of the two rules that 175 - // was in alarm and transitioned to 'not alarm' (ok or insufficient data), 176 - // and one for each of the rules that was in 'not alarm' and went to alarm. 177 230 const mockCreateNotifications = 178 231 mockNotificationsService.createNotifications; 179 232 ··· 255 308 ] 256 309 `); 257 310 258 - // Except for the rules that stayed the same state (indexes 0, 4, 8), all 259 - // the rules should've been saved with their new state. 260 - const newRuleStatuses = await mockGetCurrentPeriodRuleAlarmStatuses(); 261 - mockDummyRules.forEach((rule, i) => { 262 - if (![0, 4, 8].includes(i)) { 263 - expect(rule.save).toHaveBeenCalledTimes(1); 264 - expect(rule.alarmStatus).toEqual(newRuleStatuses[rule.id].status); 265 - } else { 266 - expect(rule.save).toHaveBeenCalledTimes(0); 267 - } 268 - }); 311 + await mockGetCurrentPeriodRuleAlarmStatuses(); 312 + const expectedUpdates = mockDummyRules.filter( 313 + (_rule, i) => ![0, 4, 8].includes(i), 314 + ).length; 315 + expect(mockKysely.updateTable).toHaveBeenCalledTimes(expectedUpdates); 269 316 }); 270 317 }); 271 318 });
+57 -33
server/services/ruleAnomalyDetectionService/detectRulePassRateAnomaliesJob.ts
··· 7 7 8 8 const { capitalize, keyBy } = lodash; 9 9 10 + type OrgAlertRow = { 11 + id: string; 12 + on_call_alert_email: string | null; 13 + }; 14 + 10 15 export default inject( 11 16 [ 12 - 'RuleModel', 13 - 'OrgModel', 17 + 'KyselyPg', 14 18 'NotificationsService', 15 19 'getCurrentPeriodRuleAlarmStatuses', 16 20 'closeSharedResourcesForShutdown', 17 21 ], 18 22 ( 19 - Rule, 20 - Org, 23 + db, 21 24 notificationsService, 22 25 getCurrentPeriodRuleAlarmStatuses, 23 26 sharedResourceShutdown, ··· 27 30 const now = new Date(); 28 31 const newAlarmStatusByRule = await getCurrentPeriodRuleAlarmStatuses(); 29 32 30 - // TODO: at some point, we might have to chunk this, 31 - // but we're very far from that right now. We also don't have to use a 32 - // transaction, since there's no risk of concurrent updates to rule.alarmStatus. 33 33 const ruleIds = Object.keys(newAlarmStatusByRule); 34 - const rules = await Rule.findAll({ where: { id: ruleIds } }); 34 + if (ruleIds.length === 0) { 35 + return; 36 + } 37 + 38 + const rules = await db 39 + .selectFrom('public.rules') 40 + .select([ 41 + 'id', 42 + 'org_id', 43 + 'creator_id', 44 + 'name', 45 + 'alarm_status', 46 + 'status_if_unexpired', 47 + ]) 48 + .where('id', 'in', ruleIds) 49 + .execute(); 50 + 35 51 const alarmStatusChangedRules = rules.filter( 36 - (rule) => rule.alarmStatus !== newAlarmStatusByRule[rule.id].status, 52 + (rule) => rule.alarm_status !== newAlarmStatusByRule[rule.id].status, 37 53 ); 38 54 39 - const changedRuleOrgIds = alarmStatusChangedRules.map((it) => it.orgId); 40 - const orgsForChangedRules = changedRuleOrgIds.length 41 - ? keyBy( 42 - await Org.findAll({ 43 - where: { id: [...new Set(changedRuleOrgIds)] }, 44 - }), 45 - (it) => it.id, 46 - ) 47 - : {}; 55 + const changedRuleOrgIds = alarmStatusChangedRules.map((it) => it.org_id); 56 + let orgsForChangedRules: Record<string, OrgAlertRow> = {}; 57 + if (changedRuleOrgIds.length > 0) { 58 + const orgRows = (await db 59 + .selectFrom('public.orgs') 60 + .select(['id', 'on_call_alert_email']) 61 + .where('id', 'in', [...new Set(changedRuleOrgIds)]) 62 + .execute()) as OrgAlertRow[]; 63 + orgsForChangedRules = keyBy(orgRows, (r) => r.id); 64 + } 48 65 49 - // Notify the creator of each rule, and the on call alert email (if any). 50 66 const notifications = alarmStatusChangedRules 51 67 .filter( 52 - // Only alert if we're coming into an ALARM, or going out of one. 53 - // If we transitioned (e.g.) from "OK" to "INSUFFICIENT_DATA", b/c a 54 - // rule's conditions got updated, we don't care about that. Similarly, 55 - // we don't care about INSUFFICIENT_DATA going to "OK". 56 68 (it) => 57 - it.alarmStatus === RuleAlarmStatus.ALARM || 69 + it.alarm_status === RuleAlarmStatus.ALARM || 58 70 newAlarmStatusByRule[it.id].status === RuleAlarmStatus.ALARM, 59 71 ) 60 - .map((rule) => { 72 + .flatMap((rule) => { 61 73 const ruleNowInAlarm = 62 74 newAlarmStatusByRule[rule.id].status === RuleAlarmStatus.ALARM; 75 + 76 + const orgRow = orgsForChangedRules[rule.org_id]; 77 + if (!orgRow) { 78 + return []; 79 + } 63 80 64 81 return { 65 82 type: ruleNowInAlarm ··· 76 93 message: `${ 77 94 ruleNowInAlarm 78 95 ? `[Alarm Triggered - ${capitalize( 79 - rule.statusIfUnexpired, 96 + String(rule.status_if_unexpired), 80 97 )} Rule]` 81 - : `[Alarm Cleared - ${capitalize(rule.statusIfUnexpired)} Rule]` 98 + : `[Alarm Cleared - ${capitalize( 99 + String(rule.status_if_unexpired), 100 + )} Rule]` 82 101 } ${rule.name} has ${ 83 102 ruleNowInAlarm ? 'started' : 'stopped' 84 103 } passing at an anomalous rate.`, 85 104 recipients: [ 86 - { type: 'user_id', value: rule.creatorId }, 87 - ...(orgsForChangedRules[rule.orgId].onCallAlertEmail 105 + { type: 'user_id' as const, value: rule.creator_id }, 106 + ...(orgRow.on_call_alert_email 88 107 ? [ 89 108 { 90 109 type: 'email_address' as const, 91 - value: orgsForChangedRules[rule.orgId].onCallAlertEmail!, 110 + value: orgRow.on_call_alert_email, 92 111 }, 93 112 ] 94 113 : []), ··· 97 116 }); 98 117 99 118 const ruleUpdateTasks = alarmStatusChangedRules.map(async (rule) => { 100 - rule.alarmStatus = newAlarmStatusByRule[rule.id].status; 101 - rule.alarmStatusSetAt = now; 102 - return rule.save(); 119 + await db 120 + .updateTable('public.rules') 121 + .set({ 122 + alarm_status: newAlarmStatusByRule[rule.id].status, 123 + alarm_status_set_at: now, 124 + }) 125 + .where('id', '=', rule.id) 126 + .execute(); 103 127 }); 104 128 105 129 await Promise.all([
+1 -1
server/services/signalsService/signals/SignalBase.ts
··· 6 6 } from '@roostorg/types'; 7 7 import { type ReadonlyDeep, type Simplify } from 'type-fest'; 8 8 9 - import { type PolicyActionPenalties } from '../../../models/OrgModel.js'; 9 + import { type PolicyActionPenalties } from '../../policyActionPenalties.js'; 10 10 import { type TaggedItemData } from '../../../models/rules/item-type-fields.js'; 11 11 import { type CoopError } from '../../../utils/errors.js'; 12 12 import { type Language } from '../../../utils/language.js';
+2 -1
server/services/userManagementService/dbTypes.ts
··· 59 59 'public.users': { 60 60 id: GeneratedAlways<string>; 61 61 email: string; 62 - password: string; 62 + password: string | null; 63 63 first_name: string; 64 64 last_name: string; 65 65 role: UserRole; ··· 68 68 created_at: GeneratedAlways<Date>; 69 69 updated_at: GeneratedAlways<Date>; 70 70 org_id: string; 71 + login_methods: ('password' | 'saml')[]; 71 72 }; 72 73 'public.invite_user_tokens': { 73 74 id: GeneratedAlways<string>;
+1 -1
server/services/userStatisticsService/computeUserScore.ts
··· 1 1 import _ from 'lodash'; 2 2 import { type ReadonlyDeep } from 'type-fest'; 3 3 4 - import { type PolicyActionPenalties } from '../../models/OrgModel.js'; 4 + import { type PolicyActionPenalties } from '../policyActionPenalties.js'; 5 5 import { jsonStringify, type JsonOf } from '../../utils/encoding.js'; 6 6 import { unzip2 } from '../../utils/fp-helpers.js'; 7 7 import { type UserActionStatistics } from './fetchUserActionStatistics.js';
+1 -1
server/services/userStatisticsService/userStatisticsService.ts
··· 7 7 import { type ReadonlyDeep } from 'type-fest'; 8 8 9 9 import { inject, type Dependencies } from '../../iocContainer/index.js'; 10 - import { type PolicyActionPenalties } from '../../models/OrgModel.js'; 10 + import { type PolicyActionPenalties } from '../policyActionPenalties.js'; 11 11 import { initialUserScore, type UserScore } from './computeUserScore.js'; 12 12 import { 13 13 type UserStatisticsServicePg,
+3
server/services/userStrikeService/userStrikeService.test.ts
··· 65 65 name: 'testAction1', 66 66 applyUserStrikes: true, 67 67 orgId: 'fakeOrgId', 68 + penalty: 'NONE' as const, 68 69 callbackUrl: 'fakeCallbackUrl1', 69 70 callbackUrlHeaders: null, 70 71 callbackUrlBody: null, ··· 106 107 name: 'testAction1', 107 108 applyUserStrikes: false, 108 109 orgId: 'fakeOrgId', 110 + penalty: 'NONE' as const, 109 111 callbackUrl: 'fakeCallbackUrl1', 110 112 callbackUrlHeaders: null, 111 113 callbackUrlBody: null, ··· 137 139 name: 'testAction1', 138 140 applyUserStrikes: false, 139 141 orgId: 'fakeOrgId', 142 + penalty: 'NONE' as const, 140 143 callbackUrl: 'fakeCallbackUrl1', 141 144 callbackUrlHeaders: null, 142 145 callbackUrlBody: null,
+36
server/utils/kyselyTransactionWithRetry.ts
··· 1 + import { type Kysely } from 'kysely'; 2 + 3 + import { safeGet } from './misc.js'; 4 + 5 + function isSerializationFailure(error: unknown): boolean { 6 + return safeGet(error, ['code']) === '40001'; 7 + } 8 + 9 + /** 10 + * Like {@link server/models/sequelizeSetup.ts maketransactionWithRetry} but for Kysely. 11 + */ 12 + export function makeKyselyTransactionWithRetry<T>(kysely: Kysely<T>) { 13 + return async function transactionWithRetry<R>( 14 + callback: (trx: Kysely<T>) => Promise<R>, 15 + ): Promise<R> { 16 + let remainingTries = 3; 17 + let lastError: unknown; 18 + while (remainingTries > 0) { 19 + remainingTries -= 1; 20 + try { 21 + return await kysely.transaction().execute(callback); 22 + } catch (e: unknown) { 23 + if (!isSerializationFailure(e)) { 24 + throw e; 25 + } 26 + lastError = e; 27 + } 28 + } 29 + 30 + throw lastError; 31 + }; 32 + } 33 + 34 + export type KyselyTransactionWithRetry<T> = ReturnType< 35 + typeof makeKyselyTransactionWithRetry<T> 36 + >;
+47 -26
server/utils/sql.test.ts
··· 1 - import * as knexPkg from 'knex'; 1 + import { Kysely, PostgresDialect, type PostgresQueryResult } from 'kysely'; 2 2 3 3 import { takeLast } from './sql.js'; 4 4 5 - const { knex: Knex } = knexPkg.default; 5 + function makeCompileOnlyDb<T extends Record<string, Record<string, unknown>>>() { 6 + return new Kysely<T>({ 7 + dialect: new PostgresDialect({ 8 + pool: { 9 + async connect() { 10 + return { 11 + query: jest.fn().mockResolvedValue({ 12 + rows: [], 13 + command: 'SELECT', 14 + rowCount: 0, 15 + } as PostgresQueryResult<unknown>), 16 + async release() {}, 17 + }; 18 + }, 19 + async end() {}, 20 + }, 21 + }), 22 + }); 23 + } 6 24 7 25 describe('Sql Helpers', () => { 8 26 describe('takeLast', () => { 9 27 test('should work for simple queries', () => { 10 28 type User = { id: string; name: string; email: string }; 29 + type TestDb = { users: User }; 11 30 12 - const knex = Knex({ dialect: 'postgres' }); 13 - const users = knex<User>('users').select('id', 'name'); 31 + const db = makeCompileOnlyDb<TestDb>(); 32 + const users = db.selectFrom('users').select(['id', 'name']); 14 33 15 - const result = takeLast(users, [{ column: 'id', order: 'desc' }], 2); 34 + const result = takeLast(db, users, [{ column: 'id', order: 'desc' }], 2); 16 35 17 - expect(result.toString()).toEqual( 18 - 'select * from (select "id", "name" from "users" order by "id" asc limit 2) as "dc2d41a9-082e-48b0-a66f-345a22696b02" order by "id" desc', 36 + expect(result.compile().sql).toEqual( 37 + 'select * from (select "id", "name" from "users" order by "id" asc limit $1) as "dc2d41a9-082e-48b0-a66f-345a22696b02" order by "id" desc', 19 38 ); 39 + expect(result.compile().parameters).toEqual([2]); 20 40 }); 21 41 22 42 test('should work for arbitrarily complex queries', () => { 23 - // We'll test this with the real query we use for backtesting. 24 - // This is still only one case (notably, with no joins), but at least it 25 - // uses aliases and a WHERE, so it'll give us a bit more confidence. 26 - const knex = Knex({ dialect: 'postgres' }); 27 - type Result = { 28 - orgId: string; 29 - ts: string; 30 - content: string; 31 - correlationId: string; 43 + type RuleExecRow = { 44 + ORG_ID: string; 45 + TS: string; 46 + CONTENT: string; 47 + CORRELATION_ID: string; 32 48 }; 49 + type TestDb = { RULE_EXECUTIONS: RuleExecRow }; 33 50 34 - const backtestResults = knex<Result>('RULE_EXECUTIONS') 35 - .select({ 36 - orgId: 'ORG_ID', 37 - ts: 'TS', 38 - content: 'CONTENT', 39 - correlationId: 'CORRELATION_ID', 40 - }) 51 + const db = makeCompileOnlyDb<TestDb>(); 52 + const backtestResults = db 53 + .selectFrom('RULE_EXECUTIONS') 54 + .select([ 55 + 'ORG_ID as orgId', 56 + 'TS as ts', 57 + 'CONTENT as content', 58 + 'CORRELATION_ID as correlationId', 59 + ]) 41 60 .where('CORRELATION_ID', '=', '47') 42 - .andWhere('TS', '>', '2019-01-01'); 61 + .where('TS', '>', '2019-01-01'); 43 62 44 63 const result = takeLast( 64 + db, 45 65 backtestResults, 46 66 [{ column: 'ts', order: 'asc' }], 47 67 50, 48 68 ); 49 69 50 - expect(result.toString()).toMatchInlineSnapshot( 51 - `"select * from (select "ORG_ID" as "orgId", "TS" as "ts", "CONTENT" as "content", "CORRELATION_ID" as "correlationId" from "RULE_EXECUTIONS" where "CORRELATION_ID" = '47' and "TS" > '2019-01-01' order by "ts" desc limit 50) as "dc2d41a9-082e-48b0-a66f-345a22696b02" order by "ts" asc"`, 70 + expect(result.compile().sql).toMatchInlineSnapshot( 71 + `"select * from (select "ORG_ID" as "orgId", "TS" as "ts", "CONTENT" as "content", "CORRELATION_ID" as "correlationId" from "RULE_EXECUTIONS" where "CORRELATION_ID" = $1 and "TS" > $2 order by "ts" desc limit $3) as "dc2d41a9-082e-48b0-a66f-345a22696b02" order by "ts" asc"`, 52 72 ); 73 + expect(result.compile().parameters).toEqual(['47', '2019-01-01', 50]); 53 74 }); 54 75 }); 55 76 });
+41 -53
server/utils/sql.ts
··· 1 - import * as knexPkg from 'knex'; 2 - import type { Knex } from 'knex'; 3 - 4 - const { knex } = knexPkg.default; 1 + import type { Kysely, SelectQueryBuilder } from 'kysely'; 5 2 6 3 /** 7 4 * When paginating backwards (i.e., the user's on page 5 and asks for page 4 ··· 17 14 * last 2" generically, you need to sort the items in reverse order, apply a 18 15 * LIMIT, then reverse the result. 19 16 * 20 - * To do that generically, we need to use a query builder that'll let us build 21 - * queries programmatically/work with them as data structures, so we use knex. 22 - * This function, then implements the "take last n" operation given an unsorted 23 - * knex select query and some sort criteria. 17 + * This function implements that pattern on a Kysely select query. 18 + * 19 + * @param db Kysely instance used only to build the outer `select * from (…)`. 20 + * It must use the same dialect as `unsortedSelectQuery`. 24 21 * 25 22 * @param unsortedSelectQuery The query that selects the set of items (without 26 23 * them being sorted) from which we want to take the last n, after sorting. ··· 29 26 * returned by unsortedSelectQuery, before we can take the last n items. 30 27 * NB: the column names provided here must refer to one of the columns 31 28 * selected by `unsortedSelectQuery`, under that column's final alias. E.g., 32 - * if unsortedSelectQuery is `SELECT a as "hello" from table`, then you can 33 - * only provide "hello" as the sort criteria; not "a", and not some unselected 34 - * column "b". 29 + * if the select is `DS as date`, then sort criteria must use `date`, not 30 + * `DS`. 35 31 * 36 32 * @param size How many items to take. 37 33 * 38 - * @param client The name of the knex client to use. This effects the 39 - * SQL-dialect-specific settings that knex might apply to the generated query. 40 - * NB: these dialect-specific settings potentially include data escaping rules 41 - * that could be relevant for SQL injection. 42 - * 43 - * @param subqueryAlias Internally, this query generates a subquery, and SQL 44 - * mandates that that subquery be given an alias. In theory, there's maybe 45 - * some risk of that alias 46 - * 47 - * @returns A new knex query that selects the last n items, after sorting. 34 + * @returns A Kysely query that selects the last n items, after sorting. 48 35 */ 49 - export function takeLast<T extends object>( 50 - unsortedSelectQuery: Knex.QueryBuilder<T>, 51 - sortCriteria: { 52 - column: (keyof T & string) | Knex.Raw; 53 - order: 'desc' | 'asc'; 54 - }[], 36 + const SUBQUERY_ALIAS = 'dc2d41a9-082e-48b0-a66f-345a22696b02'; 37 + 38 + export function takeLast< 39 + DB, 40 + TB extends keyof DB, 41 + O extends Record<string, unknown>, 42 + >( 43 + db: Kysely<DB>, 44 + unsortedSelectQuery: SelectQueryBuilder<DB, TB, O>, 45 + sortCriteria: readonly { column: keyof O & string; order: 'desc' | 'asc' }[], 55 46 size: number, 56 - client: string = 'pg', 57 47 ) { 58 - // SQL requires that the subquery we create have an alias. I don't _think_ 59 - // there's risk of that name causing a naming conflict, but I haven't thought 60 - // too hard about all the scoping implications, so, to be safe, we give this 61 - // alias a very-unlikely-to-conflict name. 62 - const subqueryAlias = 'dc2d41a9-082e-48b0-a66f-345a22696b02'; 48 + let inner = unsortedSelectQuery.clearOrderBy(); 49 + for (const it of sortCriteria) { 50 + inner = inner.orderBy( 51 + it.column, 52 + it.order === 'desc' ? 'asc' : 'desc', 53 + ); 54 + } 55 + inner = inner.limit(size); 63 56 64 - const inner = unsortedSelectQuery 65 - .clone() 66 - .orderBy( 67 - sortCriteria.map((it) => ({ 68 - // Cast here is because I think the knex typings are just wrong. 69 - // They suggest that `column` has to be a string, but, actually, 70 - // we can sort on arbitrary expressesions contained in a `knex.raw`. 71 - column: it.column as keyof T & string, 72 - order: it.order === 'desc' ? ('asc' as const) : ('desc' as const), 73 - })), 74 - ) 75 - .limit(size); 76 - 77 - return knex({ client }) 78 - .select('*') 79 - .from<T>(inner.as(subqueryAlias)) 80 - .orderBy( 81 - // Cast here is same as above. 82 - sortCriteria as ((typeof sortCriteria)[number] & { column: string })[], 83 - ); 57 + // Chaining `orderBy` in a loop widens `outer` to an incompatible union; the 58 + // builder is still the same concrete Kysely select at runtime. 59 + let outer = db.selectFrom(inner.as(SUBQUERY_ALIAS)).selectAll() as SelectQueryBuilder< 60 + DB & { [K in typeof SUBQUERY_ALIAS]: O }, 61 + typeof SUBQUERY_ALIAS, 62 + O 63 + >; 64 + for (const it of sortCriteria) { 65 + outer = outer.orderBy(it.column, it.order) as typeof outer; 66 + } 67 + return outer as SelectQueryBuilder< 68 + DB & { [K in typeof SUBQUERY_ALIAS]: O }, 69 + typeof SUBQUERY_ALIAS, 70 + O 71 + >; 84 72 }
+9 -3
server/workers_jobs/RunUserRulesJob.ts
··· 11 11 'RuleEngine', 12 12 'UserStatisticsService', 13 13 'closeSharedResourcesForShutdown', 14 - 'RuleModel', 14 + 'ModerationConfigService', 15 15 'getItemTypeEventuallyConsistent', 16 16 ], 17 - (RuleEngine, userStatisticsService, sharedResourceShutdown, Rule, getItemTypeEventuallyConsistent) => ({ 17 + ( 18 + RuleEngine, 19 + userStatisticsService, 20 + sharedResourceShutdown, 21 + moderationConfigService, 22 + getItemTypeEventuallyConsistent, 23 + ) => ({ 18 24 type: 'Job' as const, 19 25 async run() { 20 26 // TODO: we may have to do only some orgs per job run at some point. 21 27 // For now, though, this is fine. 22 - const userRules = await Rule.findEnabledUserRules(); 28 + const userRules = await moderationConfigService.findEnabledUserRules(); 23 29 24 30 if (!userRules.length) { 25 31 return;