Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1import { type Kysely, sql } from 'kysely';
2import { type JsonObject, type JsonValue, type Writable } from 'type-fest';
3import { uid } from 'uid';
4
5import {
6 CoopError,
7 ErrorType,
8 makeNotFoundError,
9 type ErrorInstanceData,
10} from '../../../utils/errors.js';
11import {
12 isUniqueViolationError,
13 type FixKyselyRowCorrelation,
14} from '../../../utils/kysely.js';
15import { makeKyselyTransactionWithRetry } from '../../../utils/kyselyTransactionWithRetry.js';
16import { assertUnreachable, removeUndefinedKeys } from '../../../utils/misc.js';
17import { type ModerationConfigServicePg } from '../dbTypes.js';
18import { type Action, type CustomAction } from '../index.js';
19import { type ItemTypeKind } from '../types/itemTypes.js';
20import {
21 type RawActionParameterInput,
22 serializeParameters,
23 validateActionParameters,
24} from './actionParametersValidation.js';
25
26function assertCustomAction(action: Action): asserts action is CustomAction {
27 if (action.actionType !== 'CUSTOM_ACTION') {
28 throw new Error(
29 `Expected CUSTOM_ACTION but received ${action.actionType}`,
30 );
31 }
32}
33
34// Seeded once per org by upsertBuiltInActions; not creatable/editable via the
35// CRUD APIs, which are scoped to action_type='CUSTOM_ACTION'.
36export const BUILT_IN_ACTIONS = [
37 {
38 actionType: 'ENQUEUE_TO_MRT',
39 name: 'Enqueue Item to Manual Review',
40 description:
41 'Sends the matched item directly to a manual review queue, routed by the org\u2019s MRT routing rules.',
42 appliesToAllItemsOfKind: ['CONTENT', 'USER', 'THREAD'] as const,
43 },
44 {
45 actionType: 'ENQUEUE_AUTHOR_TO_MRT',
46 name: 'Enqueue Author for Manual Review',
47 description:
48 'Sends the author of the matched content to a manual review queue, with the matched item attached as context.',
49 appliesToAllItemsOfKind: ['CONTENT'] as const,
50 },
51 {
52 actionType: 'ENQUEUE_TO_NCMEC',
53 name: 'Enqueue for NCMEC Review',
54 description:
55 'Sends the user associated with the matched item to the NCMEC review flow, gathering their media for reporting.',
56 appliesToAllItemsOfKind: ['CONTENT', 'USER'] as const,
57 },
58] as const satisfies readonly {
59 actionType: Exclude<Action['actionType'], 'CUSTOM_ACTION'>;
60 name: string;
61 description: string;
62 appliesToAllItemsOfKind: readonly ItemTypeKind[];
63}[];
64
65const actionDbSelection = [
66 'id',
67 'name',
68 'description',
69 'callback_url as callbackUrl',
70 'callback_url_headers as callbackUrlHeaders',
71 'callback_url_body as callbackUrlBody',
72 'org_id as orgId',
73 'penalty',
74 'action_type as actionType',
75 'applies_to_all_items_of_kind as appliesToAllItemsOfKind',
76 'apply_user_strikes as applyUserStrikes',
77 'custom_mrt_api_params as customMrtApiParams',
78] as const;
79
80const actionJoinDbSelection = [
81 'a.id',
82 'a.name',
83 'a.description',
84 'a.callback_url as callbackUrl',
85 'a.callback_url_headers as callbackUrlHeaders',
86 'a.callback_url_body as callbackUrlBody',
87 'a.org_id as orgId',
88 'a.penalty',
89 'a.action_type as actionType',
90 'a.applies_to_all_items_of_kind as appliesToAllItemsOfKind',
91 'a.apply_user_strikes as applyUserStrikes',
92 'a.custom_mrt_api_params as customMrtApiParams',
93] as const;
94
95type ActionDbResult = FixKyselyRowCorrelation<
96 ModerationConfigServicePg['public.actions'],
97 typeof actionDbSelection
98>;
99
100export default class ActionOperations {
101 private readonly transactionWithRetry: ReturnType<
102 typeof makeKyselyTransactionWithRetry<ModerationConfigServicePg>
103 >;
104
105 constructor(
106 private readonly pgQuery: Kysely<ModerationConfigServicePg>,
107 private readonly pgQueryReplica: Kysely<ModerationConfigServicePg>,
108 ) {
109 this.transactionWithRetry = makeKyselyTransactionWithRetry(this.pgQuery);
110 }
111
112 async createAction(
113 orgId: string,
114 input: {
115 name: string;
116 description: string | null;
117 // TODO: support other types? Need to figure out relationship between
118 // activating various org settings (e.g., to enable MRT or NCMEC reporting)
119 // and this moderationConfigService.
120 type: 'CUSTOM_ACTION';
121 callbackUrl: string;
122 callbackUrlHeaders: JsonObject | null;
123 callbackUrlBody: JsonObject | null;
124 applyUserStrikes?: boolean;
125 itemTypeIds?: readonly string[];
126 // AJV narrows this to `readonly ActionParameter[]` and rejects nulls /
127 // unknown fields; the GraphQL input shape is wider (nullable optionals)
128 // so we accept arbitrary property bags here.
129 parameters?: readonly RawActionParameterInput[] | null;
130 },
131 ): Promise<CustomAction> {
132 const parameters = validateActionParameters(input.parameters ?? null);
133
134 return this.transactionWithRetry(async (trx) => {
135 try {
136 const query = trx
137 .insertInto('public.actions')
138 .values({
139 id: uid(),
140 name: input.name,
141 description: input.description,
142 org_id: orgId,
143 action_type: input.type,
144 callback_url: input.callbackUrl,
145 callback_url_headers: input.callbackUrlHeaders,
146 callback_url_body: input.callbackUrlBody,
147 penalty: 'NONE',
148 apply_user_strikes: input.applyUserStrikes ?? false,
149 custom_mrt_api_params: serializeParameters(parameters),
150 updated_at: new Date(),
151 })
152 .returning(actionDbSelection);
153
154 const actionRow =
155 (await query.executeTakeFirstOrThrow()) as ActionDbResult;
156
157 if (input.itemTypeIds !== undefined && input.itemTypeIds.length > 0) {
158 await trx
159 .insertInto('public.actions_and_item_types')
160 .values(
161 input.itemTypeIds.map((item_type_id) => ({
162 action_id: actionRow.id,
163 item_type_id,
164 })),
165 )
166 .execute();
167 }
168
169 const action = this.#dbResultToAction(actionRow);
170 assertCustomAction(action);
171 return action;
172 } catch (e: unknown) {
173 if (isUniqueViolationError(e)) {
174 throw makeActionNameExistsError({ shouldErrorSpan: true });
175 }
176 throw e;
177 }
178 });
179 }
180
181 // Idempotent: existing built-ins are detected by (org_id, action_type).
182 async upsertBuiltInActions(orgId: string): Promise<readonly Action[]> {
183 return this.transactionWithRetry(async (trx) => {
184 const existingByType = new Set(
185 (
186 (await trx
187 .selectFrom('public.actions')
188 .select('action_type as actionType')
189 .where('org_id', '=', orgId)
190 .where('action_type', '!=', 'CUSTOM_ACTION')
191 .execute()) as { actionType: Action['actionType'] }[]
192 ).map((row) => row.actionType),
193 );
194
195 const toInsert = BUILT_IN_ACTIONS.filter(
196 (b) => !existingByType.has(b.actionType),
197 ).map((b) => ({
198 id: uid(),
199 name: b.name,
200 description: b.description,
201 org_id: orgId,
202 action_type: b.actionType,
203 callback_url: null,
204 callback_url_headers: null,
205 callback_url_body: null,
206 penalty: 'NONE' as const,
207 apply_user_strikes: false,
208 applies_to_all_items_of_kind: [...b.appliesToAllItemsOfKind],
209 updated_at: new Date(),
210 }));
211
212 if (toInsert.length > 0) {
213 await trx
214 .insertInto('public.actions')
215 .values(toInsert)
216 .onConflict((oc) => oc.doNothing())
217 .execute();
218 }
219
220 const refreshed = (await trx
221 .selectFrom('public.actions')
222 .select(actionDbSelection)
223 .where('org_id', '=', orgId)
224 .where('action_type', '!=', 'CUSTOM_ACTION')
225 .execute()) as ActionDbResult[];
226
227 return refreshed.map((row) => this.#dbResultToAction(row));
228 });
229 }
230
231 async updateCustomAction(opts: {
232 orgId: string;
233 actionId: string;
234 patch: {
235 name?: string;
236 description?: string | null;
237 callbackUrl?: string;
238 callbackUrlHeaders?: JsonObject | null;
239 callbackUrlBody?: JsonObject | null;
240 applyUserStrikes?: boolean;
241 // `undefined` = leave unchanged. Pass `[]` to clear all parameters.
242 parameters?: readonly RawActionParameterInput[] | null;
243 };
244 itemTypeIds?: readonly string[] | undefined;
245 }): Promise<CustomAction> {
246 const { orgId, actionId, patch, itemTypeIds } = opts;
247 const validatedParameters =
248 patch.parameters === undefined
249 ? undefined
250 : validateActionParameters(patch.parameters);
251 return this.transactionWithRetry(async (trx) => {
252 const existing = (await trx
253 .selectFrom('public.actions')
254 .select(actionDbSelection)
255 .where('id', '=', actionId)
256 .where('org_id', '=', orgId)
257 .where('action_type', '=', 'CUSTOM_ACTION')
258 .executeTakeFirst()) as ActionDbResult | undefined;
259
260 if (existing == null) {
261 throw makeNotFoundError('Action not found', { shouldErrorSpan: true });
262 }
263
264 const setPayload = removeUndefinedKeys({
265 name: patch.name,
266 description: patch.description,
267 callback_url: patch.callbackUrl,
268 callback_url_headers: patch.callbackUrlHeaders,
269 callback_url_body: patch.callbackUrlBody,
270 apply_user_strikes: patch.applyUserStrikes,
271 custom_mrt_api_params:
272 validatedParameters === undefined
273 ? undefined
274 : serializeParameters(validatedParameters),
275 });
276 const hasUserFields = Object.keys(setPayload).length > 0;
277 const touchesJunction = itemTypeIds !== undefined;
278
279 if (!hasUserFields && !touchesJunction) {
280 const action = this.#dbResultToAction(existing);
281 assertCustomAction(action);
282 return action;
283 }
284
285 try {
286 if (hasUserFields) {
287 await trx
288 .updateTable('public.actions')
289 .set({
290 ...setPayload,
291 updated_at: new Date(),
292 })
293 .where('id', '=', actionId)
294 .where('org_id', '=', orgId)
295 .execute();
296 }
297
298 if (itemTypeIds !== undefined) {
299 await trx
300 .deleteFrom('public.actions_and_item_types')
301 .where('action_id', '=', actionId)
302 .execute();
303 if (itemTypeIds.length > 0) {
304 await trx
305 .insertInto('public.actions_and_item_types')
306 .values(
307 itemTypeIds.map((item_type_id) => ({
308 action_id: actionId,
309 item_type_id,
310 })),
311 )
312 .execute();
313 }
314 }
315
316 const refreshed = (await trx
317 .selectFrom('public.actions')
318 .select(actionDbSelection)
319 .where('id', '=', actionId)
320 .where('org_id', '=', orgId)
321 .executeTakeFirstOrThrow()) as ActionDbResult;
322
323 const action = this.#dbResultToAction(refreshed);
324 assertCustomAction(action);
325 return action;
326 } catch (e: unknown) {
327 if (isUniqueViolationError(e)) {
328 throw makeActionNameExistsError({ shouldErrorSpan: true });
329 }
330 throw e;
331 }
332 });
333 }
334
335 async deleteCustomAction(opts: { orgId: string; actionId: string }) {
336 const { orgId, actionId } = opts;
337 return this.transactionWithRetry(async (trx) => {
338 const row = await trx
339 .selectFrom('public.actions')
340 .select('id')
341 .where('id', '=', actionId)
342 .where('org_id', '=', orgId)
343 .where('action_type', '=', 'CUSTOM_ACTION')
344 .executeTakeFirst();
345
346 if (row == null) {
347 return false;
348 }
349
350 await trx
351 .deleteFrom('public.rules_and_actions')
352 .where('action_id', '=', actionId)
353 .execute();
354 await trx
355 .deleteFrom('public.actions_and_item_types')
356 .where('action_id', '=', actionId)
357 .execute();
358 await trx
359 .deleteFrom('public.actions')
360 .where('id', '=', actionId)
361 .where('org_id', '=', orgId)
362 .execute();
363
364 return true;
365 });
366 }
367
368 async getActionsForItemType(opts: {
369 orgId: string;
370 itemTypeId: string;
371 itemTypeKind: ItemTypeKind;
372 readFromReplica?: boolean;
373 }) {
374 const { orgId, itemTypeId, itemTypeKind, readFromReplica } = opts;
375 const pgQuery = this.#getPgQuery(readFromReplica);
376
377 const [viaJunction, viaAppliesAll] = await Promise.all([
378 pgQuery
379 .selectFrom('public.actions_and_item_types as ait')
380 .innerJoin('public.actions as a', 'a.id', 'ait.action_id')
381 .select(actionJoinDbSelection)
382 .where('ait.item_type_id', '=', itemTypeId)
383 .where('a.org_id', '=', orgId)
384 .execute(),
385 pgQuery
386 .selectFrom('public.actions as a')
387 .select(actionJoinDbSelection)
388 .where('a.org_id', '=', orgId)
389 .where(
390 sql<boolean>`${itemTypeKind}::text = ANY(a.applies_to_all_items_of_kind::text[])`,
391 )
392 .execute(),
393 ]);
394
395 const junctionRows = viaJunction as ActionDbResult[];
396 const appliesAllRows = viaAppliesAll as ActionDbResult[];
397
398 const byId = new Map<string, ActionDbResult>();
399 for (const row of [...junctionRows, ...appliesAllRows]) {
400 byId.set(row.id, row);
401 }
402 return [...byId.values()].map((it) => this.#dbResultToAction(it));
403 }
404
405 async getActions(opts: {
406 orgId: string;
407 ids?: readonly string[];
408 readFromReplica?: boolean;
409 }) {
410 const { ids, orgId, readFromReplica } = opts;
411 const pgQuery = this.#getPgQuery(readFromReplica);
412 const query = pgQuery
413 .selectFrom('public.actions')
414 .select(actionDbSelection)
415 .where('org_id', '=', orgId)
416 .$if(ids !== undefined, (qb) => qb.where('id', 'in', ids!));
417
418 const results = (await query.execute()) as ActionDbResult[];
419
420 return results.map((it) => this.#dbResultToAction(it));
421 }
422
423 async getActionsForRuleId(opts: {
424 orgId: string;
425 ruleId: string;
426 readFromReplica?: boolean;
427 }) {
428 const { orgId, ruleId, readFromReplica } = opts;
429 const pgQuery = this.#getPgQuery(readFromReplica);
430 const results = (await pgQuery
431 .selectFrom('public.rules_and_actions as raa')
432 .innerJoin('public.actions as a', 'a.id', 'raa.action_id')
433 .select(actionJoinDbSelection)
434 .where('raa.rule_id', '=', ruleId)
435 .where('a.org_id', '=', orgId)
436 .execute()) as ActionDbResult[];
437
438 return results.map((it) => this.#dbResultToAction(it));
439 }
440
441 private static customMrtApiParamsFromDb(
442 value: JsonValue[] | null,
443 ): JsonValue | null {
444 if (value == null || value.length === 0) {
445 return null;
446 }
447 return value;
448 }
449
450 #dbResultToAction(it: ActionDbResult) {
451 return {
452 id: it.id,
453 name: it.name,
454 description: it.description,
455 orgId: it.orgId,
456 applyUserStrikes: it.applyUserStrikes,
457 penalty: it.penalty,
458 ...(() => {
459 switch (it.actionType) {
460 case 'CUSTOM_ACTION':
461 return {
462 actionType: it.actionType,
463 callbackUrl: it.callbackUrl,
464 callbackUrlBody: it.callbackUrlBody,
465 callbackUrlHeaders: it.callbackUrlHeaders,
466 customMrtApiParams:
467 ActionOperations.customMrtApiParamsFromDb(it.customMrtApiParams),
468 };
469 case 'ENQUEUE_TO_MRT':
470 case 'ENQUEUE_TO_NCMEC':
471 case 'ENQUEUE_AUTHOR_TO_MRT':
472 return { actionType: it.actionType };
473 default:
474 assertUnreachable(it);
475 }
476 })(),
477 } satisfies Writable<Action> as Action;
478 }
479
480 #getPgQuery(readFromReplica: boolean = false) {
481 return readFromReplica ? this.pgQueryReplica : this.pgQuery;
482 }
483}
484
485export type ActionErrorType = 'ActionNameExistsError';
486
487export const makeActionNameExistsError = (data: ErrorInstanceData) =>
488 new CoopError({
489 status: 409,
490 type: [ErrorType.UniqueViolation],
491 title: 'An action with this name already exists',
492 name: 'ActionNameExistsError',
493 ...data,
494 });