// Mirror of https://github.com/roostorg/coop
// github.com/roostorg/coop
1import { type ConsumerDirectives } from '../../lib/cache/index.js';
2import { type ItemIdentifier } from '@roostorg/types';
3import { type Kysely } from 'kysely';
4import { match } from 'ts-pattern';
5
6import { inject, type Dependencies } from '../../iocContainer/index.js';
7import {
8 fromCorrelationId,
9 type CorrelationId,
10} from '../../utils/correlationIds.js';
11import { YEAR_MS } from '../../utils/time.js';
12import { type Bind1 } from '../../utils/typescript-types.js';
13import {
14 itemSubmissionToItemSubmissionWithTypeIdentifier,
15 type ItemSubmission,
16} from '../itemProcessingService/index.js';
17import { type ReportingServicePg } from './dbTypes.js';
18import ReportingRuleEngine from './reportingRuleEngine.js';
19import {
20 buildSimplifiedHistoryQuery,
21 getSimplifiedRuleHistory,
22 type VersionedField,
23} from './reportingRuleHistoryHelpers.js';
24import ReportingRules, {
25 type CreateReportingRuleInput,
26 type UpdateReportingRuleInput,
27} from './ReportingRules.js';
28import { jsonStringify } from '../../utils/encoding.js';
29import {
30 type IReportingAnalyticsAdapter,
31 type ReportingRulePassRateInput,
32 type ReportingRulePassingContentSampleInput,
33} from '../../plugins/warehouse/queries/IReportingAnalyticsAdapter.js';
34
/**
 * Who filed a report: either a rule (identified by its id) or a user
 * (identified by the user's item type id plus item id).
 */
export type Reporter =
  | { kind: 'rule'; id: string }
  | { kind: 'user'; typeId: string; id: string };

/** Discriminant of {@link Reporter}, i.e. `'rule' | 'user'`. */
export type ReporterKind = Reporter['kind'];

/** The user (item type id + item id) who filed an appeal. */
export type Appealer = { typeId: string; id: string };

/** Input accepted by the reporting service's `submitReport`. */
export type ReportSubmission = {
  requestId: CorrelationId<'submit-report'>;
  orgId: string;
  reporter: Reporter;
  /** When the report was made; stored as `reported_at`. */
  reportedAt: Date;
  /** Optional policy and/or free-text reason; each is only stored when truthy. */
  reportedForReason?: {
    policyId?: string;
    reason?: string;
  };
  reportedItem: ItemSubmission;
  /** Surrounding thread items; stored in the legacy report-item format. */
  reportedItemThread?: ItemSubmission[];
  /** Identifiers of the reported items within the thread; stored as-is. */
  reportedItemsInThread?: ItemIdentifier[];
  /** Extra items submitted alongside the report. */
  additionalItemSubmissions?: ItemSubmission[];
  /** Stored as `skip_job_enqueue` on the warehouse row. */
  skipJobEnqueue: boolean;
};

/** Input accepted by the reporting service's `submitAppeal`. */
export type AppealSubmission = {
  requestId: CorrelationId<'submit-appeal'>;
  appealId: string;
  orgId: string;
  appealedBy: Appealer;
  /** When the appeal was made; stored as `appealed_at`. */
  appealedAt: Date;
  appealReason?: string;
  /** Stored as `actions_taken` on the warehouse row. */
  actionsTaken: string[];
  /** The item whose moderation action is being appealed. */
  actionedItem: ItemSubmission;
  /** Extra items submitted alongside the appeal. */
  additionalItemSubmissions?: ItemSubmission[];
  /** Stored as `skip_job_enqueue` on the warehouse row. */
  skipJobEnqueue: boolean;
};

/** Sources that can trigger a reporting-rule execution (currently only reports). */
export type ReportingRuleExecutionSourceType = 'submit-report';

/** Correlation id tying a batch of reporting-rule executions to its source. */
export type ReportingRuleExecutionCorrelationId =
  CorrelationId<ReportingRuleExecutionSourceType>;
75
/**
 * Builds the reporting service.
 *
 * The service covers:
 * - ingesting reports and appeals as rows in the data warehouse;
 * - CRUD for reporting rules (Postgres via Kysely);
 * - running enabled reporting rules against an item;
 * - reporting analytics queries (delegated to the analytics adapter).
 *
 * The returned object literal is the public `ReportingService` API.
 */
function makeReportingService(
  dataWarehouseAnalytics: Dependencies['DataWarehouseAnalytics'],
  reportingAnalyticsAdapter: IReportingAnalyticsAdapter,
  pgQuery: Kysely<ReportingServicePg>,
  ruleEvaluator: Dependencies['RuleEvaluator'],
  reportingRuleExecutionLogger: Dependencies['ReportingRuleExecutionLogger'],
  actionPublisher: Dependencies['ActionPublisher'],
  getActionsByIdEventuallyConsistent: Dependencies['getActionsByIdEventuallyConsistent'],
  getPoliciesByIdEventuallyConsistent: Dependencies['getPoliciesByIdEventuallyConsistent'],
  tracer: Dependencies['Tracer'],
) {
  // Rule storage is shared between the CRUD methods below and the engine.
  const reportingRules = new ReportingRules(pgQuery);
  const reportingRuleEngine = new ReportingRuleEngine(
    ruleEvaluator,
    reportingRuleExecutionLogger,
    actionPublisher,
    getActionsByIdEventuallyConsistent,
    getPoliciesByIdEventuallyConsistent,
    tracer,
    reportingRules,
  );

  return {
    /**
     * Records a report as a single row in `REPORTING_SERVICE.REPORTS`.
     *
     * Optional inputs (user-reporter ids, thread, items in thread, additional
     * items, policy id, reason) are only added to the row when present/truthy.
     *
     * @throws Rethrows any error from the warehouse write, after logging it
     *   together with the serialized row.
     */
    async submitReport(submission: ReportSubmission): Promise<void> {
      const {
        requestId,
        orgId,
        reportedItem,
        reporter,
        reportedForReason,
        reportedItemThread,
        reportedItemsInThread,
        additionalItemSubmissions,
        skipJobEnqueue,
      } = submission;

      const reportRow = {
        ts: new Date(),
        org_id: orgId,
        request_id: fromCorrelationId(requestId),
        reporter_kind: reporter.kind,
        reported_at: submission.reportedAt,
        reported_item_id: reportedItem.itemId,
        reported_item_data: reportedItem.data,
        reported_item_type_id: reportedItem.itemType.id,
        reported_item_type_kind: reportedItem.itemType.kind,
        // nb: this is intentionally logged as a string not json, because it
        // contains JSON nulls, which are not safe for the data warehouse JSON columns.
        reported_item_type_schema: reportedItem.itemType.schema,
        reported_item_type_schema_variant:
          reportedItem.itemType.schemaVariant,
        reported_item_type_version: reportedItem.itemType.version,
        reported_item_type_schema_field_roles:
          reportedItem.itemType.schemaFieldRoles,
        ...(reporter.kind === 'user'
          ? {
              reporter_user_id: reporter.id,
              reporter_user_item_type_id: reporter.typeId,
            }
          : {}),
        ...(reportedItemThread
          ? {
              reported_item_thread: reportedItemThread.map((it) =>
                itemSubmissionToLegacyReportItem(it),
              ),
            }
          : {}),
        ...(reportedItemsInThread
          ? {
              reported_items_in_thread: reportedItemsInThread,
            }
          : {}),
        ...(additionalItemSubmissions
          ? {
              additional_items: additionalItemSubmissions.map((it) =>
                itemSubmissionToLegacyReportItem(it),
              ),
            }
          : {}),
        ...(reportedForReason?.policyId
          ? { policy_id: reportedForReason.policyId }
          : {}),
        ...(reportedForReason?.reason
          ? { reported_for_reason: reportedForReason.reason }
          : {}),
        skip_job_enqueue: skipJobEnqueue,
      } satisfies Record<string, unknown>;

      try {
        await dataWarehouseAnalytics.bulkWrite(
          'REPORTING_SERVICE.REPORTS',
          [reportRow],
        );
      } catch (error) {
        // eslint-disable-next-line no-console
        console.error(
          '[ReportingService] Failed to write REPORTING_SERVICE.REPORTS row',
          {
            orgId,
            requestId: fromCorrelationId(requestId),
            error: error instanceof Error ? error.message : error,
            row: jsonStringify(reportRow),
          },
        );
        throw error;
      }
    },

    /**
     * Records an appeal as a single row in `REPORTING_SERVICE.APPEALS`.
     *
     * Additional items are stored via
     * `itemSubmissionToItemSubmissionWithTypeIdentifier`, unlike reports,
     * which use the legacy report-item format.
     *
     * @throws Rethrows any error from the warehouse write, after logging it
     *   together with the serialized row.
     */
    async submitAppeal(submission: AppealSubmission): Promise<void> {
      const {
        appealId,
        requestId,
        orgId,
        actionedItem,
        appealedBy,
        appealReason,
        actionsTaken,
        additionalItemSubmissions,
        skipJobEnqueue,
      } = submission;

      const appealRow = {
        ts: new Date(),
        org_id: orgId,
        request_id: fromCorrelationId(requestId),
        appeal_id: appealId,
        appealed_at: submission.appealedAt,
        appeal_reason: appealReason,
        actions_taken: actionsTaken,
        actioned_item_id: actionedItem.itemId,
        actioned_item_data: actionedItem.data,
        actioned_item_type_id: actionedItem.itemType.id,
        actioned_item_type_kind: actionedItem.itemType.kind,
        // nb: this is intentionally logged as a string not json, because it
        // contains JSON nulls, which are not safe for the data warehouse JSON columns.
        actioned_item_type_schema: actionedItem.itemType.schema,
        actioned_item_type_schema_variant:
          actionedItem.itemType.schemaVariant,
        actioned_item_type_version: actionedItem.itemType.version,
        actioned_item_type_schema_field_roles:
          actionedItem.itemType.schemaFieldRoles,
        // Appealers are always users, so these are set unconditionally.
        appealed_by_user_id: appealedBy.id,
        appealed_by_user_item_type_id: appealedBy.typeId,
        ...(additionalItemSubmissions
          ? {
              additional_items: additionalItemSubmissions.map((it) =>
                itemSubmissionToItemSubmissionWithTypeIdentifier(it),
              ),
            }
          : {}),
        skip_job_enqueue: skipJobEnqueue,
      } satisfies Record<string, unknown>;

      try {
        await dataWarehouseAnalytics.bulkWrite(
          'REPORTING_SERVICE.APPEALS',
          [appealRow],
        );
      } catch (error) {
        // eslint-disable-next-line no-console
        console.error(
          '[ReportingService] Failed to write REPORTING_SERVICE.APPEALS row',
          {
            orgId,
            appealId,
            requestId: fromCorrelationId(requestId),
            error: error instanceof Error ? error.message : error,
            row: jsonStringify(appealRow),
          },
        );
        throw error;
      }
    },

    /** Per-day counts of ingested reports for an org (delegated to the adapter). */
    async getTotalIngestedReportsByDay(orgId: string) {
      return reportingAnalyticsAdapter.getTotalIngestedReportsByDay(orgId);
    },

    /**
     * Pass-rate data for a reporting rule.
     *
     * @param opts.startDate Defaults to one year (`YEAR_MS`) before now.
     */
    async getReportingRulePassRateData(opts: {
      orgId: string;
      ruleId: string;
      startDate?: Date;
    }) {
      const {
        orgId,
        ruleId,
        startDate = new Date(Date.now() - YEAR_MS),
      } = opts;
      const input: ReportingRulePassRateInput = {
        orgId,
        ruleId,
        startDate,
      };
      return reportingAnalyticsAdapter.getReportingRulePassRateData(input);
    },

    /**
     * Simplified version history for reporting rules, read from Postgres.
     * Arguments are those of `getSimplifiedRuleHistory` minus its first
     * (query-executor) parameter, which is bound here.
     */
    async getReportingRuleHistory<K extends VersionedField>(
      ...getHistoryArgs: Parameters<Bind1<typeof getSimplifiedRuleHistory<K>>>
    ) {
      return getSimplifiedRuleHistory<K>(
        async (...buildQueryArgs) =>
          buildSimplifiedHistoryQuery(pgQuery, ...buildQueryArgs).execute(),
        ...getHistoryArgs,
      );
    },

    /**
     * Samples of content that passed a reporting rule, restricted to the
     * rule's latest (or prior) `conditionSet` version.
     *
     * Returns `[]` when the rule has no recorded history. It also returns
     * `[]` when `source` is `'priorVersion'` and the rule has only one
     * version.
     *
     * NOTE(review): this calls `this.getReportingRuleHistory`, so it must be
     * invoked as a method on the service object (not detached).
     */
    async getReportingRulePassingContentSamples(opts: {
      orgId: string;
      ruleId: string;
      itemIds?: readonly string[];
      numSamples: number;
      source: 'latestVersion' | 'priorVersion';
    }) {
      const { orgId, ruleId, itemIds, numSamples, source } = opts;
      // We only want to show samples generated by the rule's current + prior
      // conditionSet, as showing other samples would give a misleading
      // impression of the rule's behavior. The only way to do that is to use
      // the rule history service. Note that, even if we only wanted the rule's
      // latest version, we'd still have to use the history service (rather
      // than reading the latest version from the data warehouse), because the
      // warehouse is only eventually consistent: after a rule update it won't
      // see the new version for up to 5 minutes, so we'd show clearly
      // outdated samples.
      const history = await this.getReportingRuleHistory(
        ['conditionSet'],
        [ruleId],
      );

      // With no recorded versions there is nothing to sample. Previously this
      // crashed with a TypeError when destructuring `undefined`.
      const latestEntry = history.at(-1);
      if (latestEntry === undefined) {
        return [];
      }
      const mostRecentVersion = latestEntry.exactVersion;
      const priorVersion = history.at(-2)?.exactVersion;

      if (source === 'priorVersion' && !priorVersion) {
        return [];
      }

      // The history lookup above is keyed by ruleId only; org scoping is
      // passed to the analytics adapter via `orgId` below (presumably
      // enforced there).
      const filter = match(source)
        .with('latestVersion', () => {
          const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000;
          // Look back to whichever is more recent: the latest version's
          // timestamp or one week ago.
          const dateFilter = (() => {
            const mostRecentVersionDate = new Date(mostRecentVersion);
            const oneWeekAgo = new Date(Date.now() - ONE_WEEK_MS);
            return mostRecentVersionDate > oneWeekAgo
              ? mostRecentVersionDate
              : oneWeekAgo;
          })();

          return {
            type: 'latestVersion' as const,
            minVersion: mostRecentVersion,
            minDate: dateFilter,
          };
        })
        // `priorVersion` is guaranteed defined here by the early return
        // above; TS can't correlate that check with `source`, hence `!`.
        .with('priorVersion', () => ({
          type: 'priorVersion' as const,
          fromVersion: priorVersion!,
          toVersion: mostRecentVersion,
          fromDate: new Date(priorVersion!),
          toDate: new Date(mostRecentVersion),
        }))
        .exhaustive();

      const adapterInput: ReportingRulePassingContentSampleInput = {
        orgId,
        ruleId,
        itemIds,
        numSamples,
        filter,
      };

      return reportingAnalyticsAdapter.getReportingRulePassingContentSamples(
        adapterInput,
      );
    },

    /** Lists reporting rules for an org (delegated to `ReportingRules`). */
    async getReportingRules(opts: {
      orgId: string;
      directives?: ConsumerDirectives;
    }) {
      return reportingRules.getReportingRules(opts);
    },

    /** Creates a reporting rule (delegated to `ReportingRules`). */
    async createReportingRule(opts: CreateReportingRuleInput) {
      return reportingRules.createReportingRule(opts);
    },

    /** Updates a reporting rule (delegated to `ReportingRules`). */
    async updateReportingRule(opts: UpdateReportingRuleInput) {
      return reportingRules.updateReportingRule(opts);
    },

    /** Deletes a reporting rule (delegated to `ReportingRules`). */
    async deleteReportingRule(opts: { orgId: string; id: string }) {
      return reportingRules.deleteReportingRule(opts);
    },

    /** Runs the enabled reporting rules against one item submission. */
    async runEnabledRules(
      itemSubmission: ItemSubmission,
      executionsCorrelationId: ReportingRuleExecutionCorrelationId,
    ) {
      return reportingRuleEngine.runEnabledRules(
        itemSubmission,
        executionsCorrelationId,
      );
    },

    /** How many times an item has been reported (delegated to the adapter). */
    async getNumTimesReported(opts: { orgId: string; itemId: string }) {
      const { orgId, itemId } = opts;
      return reportingAnalyticsAdapter.getNumTimesReported(orgId, itemId);
    },
  };
}
387
/** Public API of the reporting service as produced by `makeReportingService`. */
export type ReportingService = ReturnType<typeof makeReportingService>;

/**
 * Container registration for the reporting service.
 *
 * NOTE(review): the dependency names are listed in the same order as
 * `makeReportingService`'s parameters, which suggests `inject` resolves them
 * positionally — keep the two lists in sync.
 */
export default inject(
  [
    'DataWarehouseAnalytics',
    'ReportingAnalyticsAdapter',
    'KyselyPg',
    'RuleEvaluator',
    'ReportingRuleExecutionLogger',
    'ActionPublisher',
    'getActionsByIdEventuallyConsistent',
    'getPoliciesByIdEventuallyConsistent',
    'Tracer',
  ],
  makeReportingService,
);
403
/**
 * Converts an item submission into the legacy report-item shape. Reports use
 * this shape for `reported_item_thread` and `additional_items`.
 *
 * NOTE(review): the `submisssionId` key (three s's) looks like a typo, but it
 * is part of the JSON written to the data warehouse. Renaming it would change
 * the stored shape, so confirm with downstream consumers before "fixing" it.
 */
function itemSubmissionToLegacyReportItem(it: ItemSubmission) {
  const { itemId, data, submissionId, itemType } = it;
  const typeIdentifier = {
    id: itemType.id,
    version: itemType.version,
    schemaVariant: itemType.schemaVariant,
  };
  return {
    id: itemId,
    data,
    submisssionId: submissionId,
    typeIdentifier,
  };
}