Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 494 lines 16 kB view raw
import { type Kysely, sql } from 'kysely';
import { type JsonObject, type JsonValue, type Writable } from 'type-fest';
import { uid } from 'uid';

import {
  CoopError,
  ErrorType,
  makeNotFoundError,
  type ErrorInstanceData,
} from '../../../utils/errors.js';
import {
  isUniqueViolationError,
  type FixKyselyRowCorrelation,
} from '../../../utils/kysely.js';
import { makeKyselyTransactionWithRetry } from '../../../utils/kyselyTransactionWithRetry.js';
import { assertUnreachable, removeUndefinedKeys } from '../../../utils/misc.js';
import { type ModerationConfigServicePg } from '../dbTypes.js';
import { type Action, type CustomAction } from '../index.js';
import { type ItemTypeKind } from '../types/itemTypes.js';
import {
  type RawActionParameterInput,
  serializeParameters,
  validateActionParameters,
} from './actionParametersValidation.js';

/**
 * Narrows an `Action` to `CustomAction`, failing loudly when the
 * discriminant says otherwise.
 *
 * @param action - The action to check.
 * @throws Error when `action.actionType` is anything other than
 *   `'CUSTOM_ACTION'`.
 */
function assertCustomAction(action: Action): asserts action is CustomAction {
  const isCustom = action.actionType === 'CUSTOM_ACTION';
  if (isCustom) {
    return;
  }

  throw new Error(`Expected CUSTOM_ACTION but received ${action.actionType}`);
}

// Seeded once per org by upsertBuiltInActions; not creatable/editable via the
// CRUD APIs, which are scoped to action_type='CUSTOM_ACTION'.
export const BUILT_IN_ACTIONS = [
  {
    actionType: 'ENQUEUE_TO_MRT',
    name: 'Enqueue Item to Manual Review',
    description:
      'Sends the matched item directly to a manual review queue, routed by the org\u2019s MRT routing rules.',
    appliesToAllItemsOfKind: ['CONTENT', 'USER', 'THREAD'] as const,
  },
  {
    actionType: 'ENQUEUE_AUTHOR_TO_MRT',
    name: 'Enqueue Author for Manual Review',
    description:
      'Sends the author of the matched content to a manual review queue, with the matched item attached as context.',
    appliesToAllItemsOfKind: ['CONTENT'] as const,
  },
  {
    actionType: 'ENQUEUE_TO_NCMEC',
    name: 'Enqueue for NCMEC Review',
    description:
      'Sends the user associated with the matched item to the NCMEC review flow, gathering their media for reporting.',
    appliesToAllItemsOfKind: ['CONTENT', 'USER'] as const,
  },
] as const satisfies readonly {
  actionType: Exclude<Action['actionType'], 'CUSTOM_ACTION'>;
  name: string;
  description: string;
  appliesToAllItemsOfKind: readonly ItemTypeKind[];
}[];

/**
 * Columns read from `public.actions` when the table is queried directly,
 * aliased from snake_case to the camelCase names used by `ActionDbResult`.
 */
const actionDbSelection = [
  'id',
  'name',
  'description',
  'callback_url as callbackUrl',
  'callback_url_headers as callbackUrlHeaders',
  'callback_url_body as callbackUrlBody',
  'org_id as orgId',
  'penalty',
  'action_type as actionType',
  'applies_to_all_items_of_kind as appliesToAllItemsOfKind',
  'apply_user_strikes as applyUserStrikes',
  'custom_mrt_api_params as customMrtApiParams',
] as const;

/**
 * The same column list as `actionDbSelection`, qualified with the `a` table
 * alias for queries that join `public.actions as a` with another table.
 */
const actionJoinDbSelection = [
  'a.id',
  'a.name',
  'a.description',
  'a.callback_url as callbackUrl',
  'a.callback_url_headers as callbackUrlHeaders',
  'a.callback_url_body as callbackUrlBody',
  'a.org_id as orgId',
  'a.penalty',
  'a.action_type as actionType',
  'a.applies_to_all_items_of_kind as appliesToAllItemsOfKind',
  'a.apply_user_strikes as applyUserStrikes',
  'a.custom_mrt_api_params as customMrtApiParams',
] as const;

/** Row shape produced by selecting `actionDbSelection` from `public.actions`. */
type ActionDbResult = FixKyselyRowCorrelation<
  ModerationConfigServicePg['public.actions'],
  typeof actionDbSelection
>;

/**
 * Data-access operations for `public.actions` and the junction tables that
 * reference it (`public.actions_and_item_types`, `public.rules_and_actions`).
 *
 * Writes always go through `pgQuery` inside a retrying transaction; reads use
 * `pgQuery` unless the caller opts into the replica with `readFromReplica`.
 */
export default class ActionOperations {
  private readonly transactionWithRetry: ReturnType<
    typeof makeKyselyTransactionWithRetry<ModerationConfigServicePg>
  >;

  constructor(
    private readonly pgQuery: Kysely<ModerationConfigServicePg>,
    private readonly pgQueryReplica: Kysely<ModerationConfigServicePg>,
  ) {
    this.transactionWithRetry = makeKyselyTransactionWithRetry(this.pgQuery);
  }

  /**
   * Inserts a new custom action for `orgId` and links it to the given item
   * types.
   *
   * @param orgId - Organization that will own the action.
   * @param input - Fields of the new action. `parameters` is validated before
   *   the transaction starts; `itemTypeIds` is de-duplicated before the
   *   junction rows are inserted.
   * @returns The created action.
   * @throws ActionNameExistsError (via `makeActionNameExistsError`) when a
   *   statement in the transaction fails with a unique violation.
   */
  async createAction(
    orgId: string,
    input: {
      name: string;
      description: string | null;
      // TODO: support other types? Need to figure out relationship between
      // activating various org settings (e.g., to enable MRT or NCMEC reporting)
      // and this moderationConfigService.
      type: 'CUSTOM_ACTION';
      callbackUrl: string;
      callbackUrlHeaders: JsonObject | null;
      callbackUrlBody: JsonObject | null;
      applyUserStrikes?: boolean;
      itemTypeIds?: readonly string[];
      // AJV narrows this to `readonly ActionParameter[]` and rejects nulls /
      // unknown fields; the GraphQL input shape is wider (nullable optionals)
      // so we accept arbitrary property bags here.
      parameters?: readonly RawActionParameterInput[] | null;
    },
  ): Promise<CustomAction> {
    const parameters = validateActionParameters(input.parameters ?? null);
    // A repeated id would insert the same (action_id, item_type_id) pair twice.
    // NOTE(review): presumably the junction table enforces uniqueness on that
    // pair, in which case the catch below would misreport the failure as a
    // name collision — confirm against the schema.
    const itemTypeIds = [...new Set(input.itemTypeIds ?? [])];

    return this.transactionWithRetry(async (trx) => {
      try {
        const query = trx
          .insertInto('public.actions')
          .values({
            id: uid(),
            name: input.name,
            description: input.description,
            org_id: orgId,
            action_type: input.type,
            callback_url: input.callbackUrl,
            callback_url_headers: input.callbackUrlHeaders,
            callback_url_body: input.callbackUrlBody,
            penalty: 'NONE',
            apply_user_strikes: input.applyUserStrikes ?? false,
            custom_mrt_api_params: serializeParameters(parameters),
            updated_at: new Date(),
          })
          .returning(actionDbSelection);

        const actionRow =
          (await query.executeTakeFirstOrThrow()) as ActionDbResult;

        if (itemTypeIds.length > 0) {
          await trx
            .insertInto('public.actions_and_item_types')
            .values(
              itemTypeIds.map((item_type_id) => ({
                action_id: actionRow.id,
                item_type_id,
              })),
            )
            .execute();
        }

        const action = this.#dbResultToAction(actionRow);
        assertCustomAction(action);
        return action;
      } catch (e: unknown) {
        if (isUniqueViolationError(e)) {
          throw makeActionNameExistsError({ shouldErrorSpan: true });
        }
        throw e;
      }
    });
  }

  /**
   * Ensures every entry of `BUILT_IN_ACTIONS` has a row for `orgId`.
   *
   * Idempotent: existing built-ins are detected by (org_id, action_type).
   *
   * @param orgId - Organization to seed.
   * @returns Every non-custom action stored for the org after seeding.
   */
  async upsertBuiltInActions(orgId: string): Promise<readonly Action[]> {
    return this.transactionWithRetry(async (trx) => {
      const existingByType = new Set(
        (
          (await trx
            .selectFrom('public.actions')
            .select('action_type as actionType')
            .where('org_id', '=', orgId)
            .where('action_type', '!=', 'CUSTOM_ACTION')
            .execute()) as { actionType: Action['actionType'] }[]
        ).map((row) => row.actionType),
      );

      const toInsert = BUILT_IN_ACTIONS.filter(
        (b) => !existingByType.has(b.actionType),
      ).map((b) => ({
        id: uid(),
        name: b.name,
        description: b.description,
        org_id: orgId,
        action_type: b.actionType,
        callback_url: null,
        callback_url_headers: null,
        callback_url_body: null,
        penalty: 'NONE' as const,
        apply_user_strikes: false,
        // Copy into a mutable array; the constant's tuple is readonly.
        applies_to_all_items_of_kind: [...b.appliesToAllItemsOfKind],
        updated_at: new Date(),
      }));

      if (toInsert.length > 0) {
        await trx
          .insertInto('public.actions')
          .values(toInsert)
          // Skip rows that conflict instead of failing the whole insert.
          .onConflict((oc) => oc.doNothing())
          .execute();
      }

      const refreshed = (await trx
        .selectFrom('public.actions')
        .select(actionDbSelection)
        .where('org_id', '=', orgId)
        .where('action_type', '!=', 'CUSTOM_ACTION')
        .execute()) as ActionDbResult[];

      return refreshed.map((row) => this.#dbResultToAction(row));
    });
  }

  /**
   * Applies a partial update to a custom action and, optionally, replaces the
   * set of item types it is linked to.
   *
   * @param opts.orgId - Organization that owns the action.
   * @param opts.actionId - Id of the action to update.
   * @param opts.patch - Fields to change; `undefined` fields are left as-is.
   * @param opts.itemTypeIds - When provided, replaces all existing junction
   *   rows for the action (after de-duplication). `undefined` leaves them
   *   untouched; `[]` removes them all.
   * @returns The action as stored after the update, or the existing action
   *   unchanged when there is nothing to update.
   * @throws A not-found error (via `makeNotFoundError`) when no custom action
   *   with this id exists for the org.
   * @throws ActionNameExistsError (via `makeActionNameExistsError`) when a
   *   write fails with a unique violation.
   */
  async updateCustomAction(opts: {
    orgId: string;
    actionId: string;
    patch: {
      name?: string;
      description?: string | null;
      callbackUrl?: string;
      callbackUrlHeaders?: JsonObject | null;
      callbackUrlBody?: JsonObject | null;
      applyUserStrikes?: boolean;
      // `undefined` = leave unchanged. Pass `[]` to clear all parameters.
      parameters?: readonly RawActionParameterInput[] | null;
    };
    itemTypeIds?: readonly string[] | undefined;
  }): Promise<CustomAction> {
    const { orgId, actionId, patch, itemTypeIds } = opts;
    const validatedParameters =
      patch.parameters === undefined
        ? undefined
        : validateActionParameters(patch.parameters);
    // De-duplicate so a repeated id cannot produce two identical junction
    // rows (see the matching note in createAction).
    const uniqueItemTypeIds =
      itemTypeIds === undefined ? undefined : [...new Set(itemTypeIds)];

    return this.transactionWithRetry(async (trx) => {
      const existing = (await trx
        .selectFrom('public.actions')
        .select(actionDbSelection)
        .where('id', '=', actionId)
        .where('org_id', '=', orgId)
        .where('action_type', '=', 'CUSTOM_ACTION')
        .executeTakeFirst()) as ActionDbResult | undefined;

      if (existing == null) {
        throw makeNotFoundError('Action not found', { shouldErrorSpan: true });
      }

      const setPayload = removeUndefinedKeys({
        name: patch.name,
        description: patch.description,
        callback_url: patch.callbackUrl,
        callback_url_headers: patch.callbackUrlHeaders,
        callback_url_body: patch.callbackUrlBody,
        apply_user_strikes: patch.applyUserStrikes,
        custom_mrt_api_params:
          validatedParameters === undefined
            ? undefined
            : serializeParameters(validatedParameters),
      });
      const hasUserFields = Object.keys(setPayload).length > 0;
      const touchesJunction = uniqueItemTypeIds !== undefined;

      // Nothing to write: hand back the row we already loaded.
      if (!hasUserFields && !touchesJunction) {
        const action = this.#dbResultToAction(existing);
        assertCustomAction(action);
        return action;
      }

      try {
        if (hasUserFields) {
          await trx
            .updateTable('public.actions')
            .set({
              ...setPayload,
              updated_at: new Date(),
            })
            .where('id', '=', actionId)
            .where('org_id', '=', orgId)
            .execute();
        }

        if (uniqueItemTypeIds !== undefined) {
          // Replace-all semantics: clear the links, then insert the new set.
          await trx
            .deleteFrom('public.actions_and_item_types')
            .where('action_id', '=', actionId)
            .execute();
          if (uniqueItemTypeIds.length > 0) {
            await trx
              .insertInto('public.actions_and_item_types')
              .values(
                uniqueItemTypeIds.map((item_type_id) => ({
                  action_id: actionId,
                  item_type_id,
                })),
              )
              .execute();
          }
        }

        const refreshed = (await trx
          .selectFrom('public.actions')
          .select(actionDbSelection)
          .where('id', '=', actionId)
          .where('org_id', '=', orgId)
          .executeTakeFirstOrThrow()) as ActionDbResult;

        const action = this.#dbResultToAction(refreshed);
        assertCustomAction(action);
        return action;
      } catch (e: unknown) {
        if (isUniqueViolationError(e)) {
          throw makeActionNameExistsError({ shouldErrorSpan: true });
        }
        throw e;
      }
    });
  }

  /**
   * Deletes a custom action together with its rows in
   * `public.rules_and_actions` and `public.actions_and_item_types`.
   *
   * @param opts.orgId - Organization that owns the action.
   * @param opts.actionId - Id of the action to delete.
   * @returns `false` when no custom action with this id exists for the org,
   *   `true` once the deletes have run.
   */
  async deleteCustomAction(opts: { orgId: string; actionId: string }) {
    const { orgId, actionId } = opts;
    return this.transactionWithRetry(async (trx) => {
      const row = await trx
        .selectFrom('public.actions')
        .select('id')
        .where('id', '=', actionId)
        .where('org_id', '=', orgId)
        .where('action_type', '=', 'CUSTOM_ACTION')
        .executeTakeFirst();

      if (row == null) {
        return false;
      }

      // Referencing rows are removed before the action row itself.
      await trx
        .deleteFrom('public.rules_and_actions')
        .where('action_id', '=', actionId)
        .execute();
      await trx
        .deleteFrom('public.actions_and_item_types')
        .where('action_id', '=', actionId)
        .execute();
      await trx
        .deleteFrom('public.actions')
        .where('id', '=', actionId)
        .where('org_id', '=', orgId)
        .execute();

      return true;
    });
  }

  /**
   * Returns the org's actions that apply to an item type, either through an
   * explicit `public.actions_and_item_types` link or because the action's
   * `applies_to_all_items_of_kind` array contains `itemTypeKind`.
   *
   * The two lookups run concurrently and are merged by action id, so an
   * action matched by both appears once.
   */
  async getActionsForItemType(opts: {
    orgId: string;
    itemTypeId: string;
    itemTypeKind: ItemTypeKind;
    readFromReplica?: boolean;
  }) {
    const { orgId, itemTypeId, itemTypeKind, readFromReplica } = opts;
    const pgQuery = this.#getPgQuery(readFromReplica);

    const [viaJunction, viaAppliesAll] = await Promise.all([
      pgQuery
        .selectFrom('public.actions_and_item_types as ait')
        .innerJoin('public.actions as a', 'a.id', 'ait.action_id')
        .select(actionJoinDbSelection)
        .where('ait.item_type_id', '=', itemTypeId)
        .where('a.org_id', '=', orgId)
        .execute(),
      pgQuery
        .selectFrom('public.actions as a')
        .select(actionJoinDbSelection)
        .where('a.org_id', '=', orgId)
        .where(
          sql<boolean>`${itemTypeKind}::text = ANY(a.applies_to_all_items_of_kind::text[])`,
        )
        .execute(),
    ]);

    const junctionRows = viaJunction as ActionDbResult[];
    const appliesAllRows = viaAppliesAll as ActionDbResult[];

    const byId = new Map<string, ActionDbResult>();
    for (const row of [...junctionRows, ...appliesAllRows]) {
      byId.set(row.id, row);
    }
    return [...byId.values()].map((it) => this.#dbResultToAction(it));
  }

  /**
   * Returns the org's actions, optionally restricted to a set of ids.
   *
   * @param opts.orgId - Organization whose actions are read.
   * @param opts.ids - When provided, only actions with these ids are
   *   returned. An empty array yields an empty result without querying.
   * @param opts.readFromReplica - Read from the replica connection.
   */
  async getActions(opts: {
    orgId: string;
    ids?: readonly string[];
    readFromReplica?: boolean;
  }): Promise<Action[]> {
    const { ids, orgId, readFromReplica } = opts;

    // An empty id filter can match nothing. It must not reach the query
    // builder: an empty list is rendered as `id in ()`, which PostgreSQL
    // rejects as a syntax error.
    if (ids !== undefined && ids.length === 0) {
      return [];
    }

    const pgQuery = this.#getPgQuery(readFromReplica);
    const orgQuery = pgQuery
      .selectFrom('public.actions')
      .select(actionDbSelection)
      .where('org_id', '=', orgId);
    const query =
      ids === undefined ? orgQuery : orgQuery.where('id', 'in', ids);

    const results = (await query.execute()) as ActionDbResult[];

    return results.map((it) => this.#dbResultToAction(it));
  }

  /**
   * Returns the org's actions linked to `ruleId` through
   * `public.rules_and_actions`.
   */
  async getActionsForRuleId(opts: {
    orgId: string;
    ruleId: string;
    readFromReplica?: boolean;
  }) {
    const { orgId, ruleId, readFromReplica } = opts;
    const pgQuery = this.#getPgQuery(readFromReplica);
    const results = (await pgQuery
      .selectFrom('public.rules_and_actions as raa')
      .innerJoin('public.actions as a', 'a.id', 'raa.action_id')
      .select(actionJoinDbSelection)
      .where('raa.rule_id', '=', ruleId)
      .where('a.org_id', '=', orgId)
      .execute()) as ActionDbResult[];

    return results.map((it) => this.#dbResultToAction(it));
  }

  /** Normalizes a stored parameter list: `null` and `[]` both become `null`. */
  private static customMrtApiParamsFromDb(
    value: JsonValue[] | null,
  ): JsonValue | null {
    if (value == null || value.length === 0) {
      return null;
    }
    return value;
  }

  /**
   * Converts a database row into an `Action`. Callback and parameter fields
   * are only included for `CUSTOM_ACTION` rows; the other action types carry
   * just the shared fields plus their `actionType`.
   */
  #dbResultToAction(it: ActionDbResult) {
    return {
      id: it.id,
      name: it.name,
      description: it.description,
      orgId: it.orgId,
      applyUserStrikes: it.applyUserStrikes,
      penalty: it.penalty,
      ...(() => {
        switch (it.actionType) {
          case 'CUSTOM_ACTION':
            return {
              actionType: it.actionType,
              callbackUrl: it.callbackUrl,
              callbackUrlBody: it.callbackUrlBody,
              callbackUrlHeaders: it.callbackUrlHeaders,
              customMrtApiParams:
                ActionOperations.customMrtApiParamsFromDb(it.customMrtApiParams),
            };
          case 'ENQUEUE_TO_MRT':
          case 'ENQUEUE_TO_NCMEC':
          case 'ENQUEUE_AUTHOR_TO_MRT':
            return { actionType: it.actionType };
          default:
            assertUnreachable(it);
        }
      })(),
    } satisfies Writable<Action> as Action;
  }

  /** Picks the replica connection when asked to, the primary otherwise. */
  #getPgQuery(readFromReplica: boolean = false) {
    return readFromReplica ? this.pgQueryReplica : this.pgQuery;
  }
}

export type ActionErrorType = 'ActionNameExistsError';

/**
 * Builds the 409 error reported when an action write hits a unique
 * violation.
 *
 * @param data - Extra error fields spread into the `CoopError` options.
 */
export const makeActionNameExistsError = (data: ErrorInstanceData) =>
  new CoopError({
    status: 409,
    type: [ErrorType.UniqueViolation],
    title: 'An action with this name already exists',
    name: 'ActionNameExistsError',
    ...data,
  });