Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 415 lines 14 kB view raw
import { type ConsumerDirectives } from '../../lib/cache/index.js';
import { type ItemIdentifier } from '@roostorg/types';
import { type Kysely } from 'kysely';
import { match } from 'ts-pattern';

import { inject, type Dependencies } from '../../iocContainer/index.js';
import {
  fromCorrelationId,
  type CorrelationId,
} from '../../utils/correlationIds.js';
import { YEAR_MS } from '../../utils/time.js';
import { type Bind1 } from '../../utils/typescript-types.js';
import {
  itemSubmissionToItemSubmissionWithTypeIdentifier,
  type ItemSubmission,
} from '../itemProcessingService/index.js';
import { type ReportingServicePg } from './dbTypes.js';
import ReportingRuleEngine from './reportingRuleEngine.js';
import {
  buildSimplifiedHistoryQuery,
  getSimplifiedRuleHistory,
  type VersionedField,
} from './reportingRuleHistoryHelpers.js';
import ReportingRules, {
  type CreateReportingRuleInput,
  type UpdateReportingRuleInput,
} from './ReportingRules.js';
import { jsonStringify } from '../../utils/encoding.js';
import {
  type IReportingAnalyticsAdapter,
  type ReportingRulePassRateInput,
  type ReportingRulePassingContentSampleInput,
} from '../../plugins/warehouse/queries/IReportingAnalyticsAdapter.js';

/** Number of milliseconds in seven days. */
const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000;

/** The kinds of entity that can file a report. */
export type ReporterKind = 'rule' | 'user';

/**
 * Who filed a report: either a rule (identified by its id) or a user
 * (identified by an item type id plus an id).
 */
export type Reporter =
  | { kind: 'rule'; id: string }
  | { kind: 'user'; typeId: string; id: string };

/** Who filed an appeal, identified by an item type id plus an id. */
export type Appealer = { typeId: string; id: string };

/** Input accepted by `submitReport`. */
export type ReportSubmission = {
  requestId: CorrelationId<'submit-report'>;
  orgId: string;
  reporter: Reporter;
  reportedAt: Date;
  reportedForReason?: {
    policyId?: string;
    reason?: string;
  };
  reportedItem: ItemSubmission;
  reportedItemThread?: ItemSubmission[];
  reportedItemsInThread?: ItemIdentifier[];
  additionalItemSubmissions?: ItemSubmission[];
  skipJobEnqueue: boolean;
};

/** Input accepted by `submitAppeal`. */
export type AppealSubmission = {
  requestId: CorrelationId<'submit-appeal'>;
  appealId: string;
  orgId: string;
  appealedBy: Appealer;
  appealedAt: Date;
  appealReason?: string;
  actionsTaken: string[];
  actionedItem: ItemSubmission;
  additionalItemSubmissions?: ItemSubmission[];
  skipJobEnqueue: boolean;
};

/** The correlation-id source types that can trigger reporting rule runs. */
export type ReportingRuleExecutionSourceType = 'submit-report';

/** Correlation id attached to a batch of reporting rule executions. */
export type ReportingRuleExecutionCorrelationId =
  CorrelationId<ReportingRuleExecutionSourceType>;

/**
 * Builds the reporting service: an object of async methods for recording
 * reports and appeals in the data warehouse, querying reporting analytics,
 * reading rule history, managing reporting rules and running enabled rules.
 *
 * The parameters are the collaborators the service delegates to. A
 * `ReportingRules` instance (backed by `pgQuery`) and a `ReportingRuleEngine`
 * are constructed once here and shared by every method of the returned object.
 *
 * @returns The reporting service object.
 */
function makeReportingService(
  dataWarehouseAnalytics: Dependencies['DataWarehouseAnalytics'],
  reportingAnalyticsAdapter: IReportingAnalyticsAdapter,
  pgQuery: Kysely<ReportingServicePg>,
  ruleEvaluator: Dependencies['RuleEvaluator'],
  reportingRuleExecutionLogger: Dependencies['ReportingRuleExecutionLogger'],
  actionPublisher: Dependencies['ActionPublisher'],
  getActionsByIdEventuallyConsistent: Dependencies['getActionsByIdEventuallyConsistent'],
  getPoliciesByIdEventuallyConsistent: Dependencies['getPoliciesByIdEventuallyConsistent'],
  tracer: Dependencies['Tracer'],
) {
  const reportingRules = new ReportingRules(pgQuery);
  const reportingRuleEngine = new ReportingRuleEngine(
    ruleEvaluator,
    reportingRuleExecutionLogger,
    actionPublisher,
    getActionsByIdEventuallyConsistent,
    getPoliciesByIdEventuallyConsistent,
    tracer,
    reportingRules,
  );

  return {
    /**
     * Builds one `REPORTING_SERVICE.REPORTS` row from the submission and
     * writes it through `dataWarehouseAnalytics.bulkWrite`.
     *
     * Optional parts of the submission (user reporter details, thread items,
     * additional items, policy id, reason) only add their columns to the row
     * when they are present.
     *
     * @throws Rethrows whatever `bulkWrite` throws, after logging the failure
     *   (including the serialized row) with `console.error`.
     */
    async submitReport(submission: ReportSubmission): Promise<void> {
      const {
        requestId,
        orgId,
        reportedItem,
        reporter,
        reportedForReason,
        reportedItemThread,
        reportedItemsInThread,
        additionalItemSubmissions,
        skipJobEnqueue,
      } = submission;

      const reportRow = {
        ts: new Date(),
        org_id: orgId,
        request_id: fromCorrelationId(requestId),
        reporter_kind: reporter.kind,
        reported_at: submission.reportedAt,
        reported_item_id: reportedItem.itemId,
        reported_item_data: reportedItem.data,
        reported_item_type_id: reportedItem.itemType.id,
        reported_item_type_kind: reportedItem.itemType.kind,
        // nb: this is intentionally logged as a string not json, because it
        // contains JSON nulls, which are not safe for the data warehouse JSON columns.
        // NOTE(review): no string conversion happens in this function; presumably
        // the warehouse writer serializes the value. Confirm.
        reported_item_type_schema: reportedItem.itemType.schema,
        reported_item_type_schema_variant:
          reportedItem.itemType.schemaVariant,
        reported_item_type_version: reportedItem.itemType.version,
        reported_item_type_schema_field_roles:
          reportedItem.itemType.schemaFieldRoles,
        ...(reporter.kind === 'user'
          ? {
              reporter_user_id: reporter.id,
              reporter_user_item_type_id: reporter.typeId,
            }
          : {}),
        ...(reportedItemThread
          ? {
              reported_item_thread: reportedItemThread.map((it) =>
                itemSubmissionToLegacyReportItem(it),
              ),
            }
          : {}),
        ...(reportedItemsInThread
          ? {
              reported_items_in_thread: reportedItemsInThread,
            }
          : {}),
        ...(additionalItemSubmissions
          ? {
              additional_items: additionalItemSubmissions.map((it) =>
                itemSubmissionToLegacyReportItem(it),
              ),
            }
          : {}),
        ...(reportedForReason?.policyId
          ? { policy_id: reportedForReason.policyId }
          : {}),
        ...(reportedForReason?.reason
          ? { reported_for_reason: reportedForReason.reason }
          : {}),
        skip_job_enqueue: skipJobEnqueue,
      } satisfies Record<string, unknown>;

      try {
        await dataWarehouseAnalytics.bulkWrite(
          'REPORTING_SERVICE.REPORTS',
          [reportRow],
        );
      } catch (error) {
        // eslint-disable-next-line no-console
        console.error(
          '[ReportingService] Failed to write REPORTING_SERVICE.REPORTS row',
          {
            orgId,
            requestId: fromCorrelationId(requestId),
            error: error instanceof Error ? error.message : error,
            row: jsonStringify(reportRow),
          },
        );
        throw error;
      }
    },

    /**
     * Builds one `REPORTING_SERVICE.APPEALS` row from the submission and
     * writes it through `dataWarehouseAnalytics.bulkWrite`.
     *
     * Unlike `submitReport`, additional items are converted with
     * `itemSubmissionToItemSubmissionWithTypeIdentifier`.
     *
     * @throws Rethrows whatever `bulkWrite` throws, after logging the failure
     *   (including the serialized row) with `console.error`.
     */
    async submitAppeal(submission: AppealSubmission): Promise<void> {
      const {
        appealId,
        requestId,
        orgId,
        actionedItem,
        appealedBy,
        appealReason,
        actionsTaken,
        additionalItemSubmissions,
        skipJobEnqueue,
      } = submission;

      const appealRow = {
        ts: new Date(),
        org_id: orgId,
        request_id: fromCorrelationId(requestId),
        appeal_id: appealId,
        appealed_at: submission.appealedAt,
        appeal_reason: appealReason,
        actions_taken: actionsTaken,
        actioned_item_id: actionedItem.itemId,
        actioned_item_data: actionedItem.data,
        actioned_item_type_id: actionedItem.itemType.id,
        actioned_item_type_kind: actionedItem.itemType.kind,
        // nb: this is intentionally logged as a string not json, because it
        // contains JSON nulls, which are not safe for the data warehouse JSON columns.
        // NOTE(review): no string conversion happens in this function; presumably
        // the warehouse writer serializes the value. Confirm.
        actioned_item_type_schema: actionedItem.itemType.schema,
        actioned_item_type_schema_variant:
          actionedItem.itemType.schemaVariant,
        actioned_item_type_version: actionedItem.itemType.version,
        actioned_item_type_schema_field_roles:
          actionedItem.itemType.schemaFieldRoles,
        appealed_by_user_id: appealedBy.id,
        appealed_by_user_item_type_id: appealedBy.typeId,
        ...(additionalItemSubmissions
          ? {
              additional_items: additionalItemSubmissions.map((it) =>
                itemSubmissionToItemSubmissionWithTypeIdentifier(it),
              ),
            }
          : {}),
        skip_job_enqueue: skipJobEnqueue,
      } satisfies Record<string, unknown>;

      try {
        await dataWarehouseAnalytics.bulkWrite(
          'REPORTING_SERVICE.APPEALS',
          [appealRow],
        );
      } catch (error) {
        // eslint-disable-next-line no-console
        console.error(
          '[ReportingService] Failed to write REPORTING_SERVICE.APPEALS row',
          {
            orgId,
            appealId,
            requestId: fromCorrelationId(requestId),
            error: error instanceof Error ? error.message : error,
            row: jsonStringify(appealRow),
          },
        );
        throw error;
      }
    },

    /** Delegates to the analytics adapter's method of the same name. */
    async getTotalIngestedReportsByDay(orgId: string) {
      return reportingAnalyticsAdapter.getTotalIngestedReportsByDay(orgId);
    },

    /**
     * Fetches pass-rate data for one reporting rule from the analytics
     * adapter.
     *
     * @param opts.startDate Start of the window; when omitted it defaults to
     *   `YEAR_MS` milliseconds before now.
     */
    async getReportingRulePassRateData(opts: {
      orgId: string;
      ruleId: string;
      startDate?: Date;
    }) {
      const {
        orgId,
        ruleId,
        startDate = new Date(Date.now() - YEAR_MS),
      } = opts;
      const input: ReportingRulePassRateInput = {
        orgId,
        ruleId,
        startDate,
      };
      return reportingAnalyticsAdapter.getReportingRulePassRateData(input);
    },

    /**
     * Calls `getSimplifiedRuleHistory` with a query runner that builds the
     * history query against this service's `pgQuery` and executes it. The
     * remaining arguments are forwarded unchanged.
     */
    async getReportingRuleHistory<K extends VersionedField>(
      ...getHistoryArgs: Parameters<Bind1<typeof getSimplifiedRuleHistory<K>>>
    ) {
      return getSimplifiedRuleHistory<K>(
        async (...buildQueryArgs) =>
          buildSimplifiedHistoryQuery(pgQuery, ...buildQueryArgs).execute(),
        ...getHistoryArgs,
      );
    },

    /**
     * Fetches samples of content that passed a reporting rule, restricted to
     * either the rule's latest `conditionSet` version or the one before it.
     *
     * @param opts.source `'latestVersion'` samples from the most recent
     *   version, looking back no further than one week; `'priorVersion'`
     *   samples from the window between the previous version and the most
     *   recent one.
     * @returns The adapter's samples, or an empty array when the rule has no
     *   history at all, or when `'priorVersion'` is requested and there is no
     *   prior version.
     */
    async getReportingRulePassingContentSamples(opts: {
      orgId: string;
      ruleId: string;
      itemIds?: readonly string[];
      numSamples: number;
      source: 'latestVersion' | 'priorVersion';
    }) {
      const { orgId, ruleId, itemIds, numSamples, source } = opts;
      // We only wanna show samples generated by the rule's current + prior
      // conditionSet, as showing other samples will give a misleading impression
      // of the rule's behavior. The only way to do that is to use the rule history
      // service. Note that, even if we wanted to just use the rule's latest
      // version, we'd have to use the history service (rather than reading the
      // latest version from the data warehouse), b/c the warehouse is only eventually
      // consistent (i.e., after a rule update, it won't see the new version for
      // up to 5 minutes, so we'll show clearly outdated samples.)
      const history = await this.getReportingRuleHistory(
        ['conditionSet'],
        [ruleId],
      );

      // Without any history entry there is no version to sample from. Bail
      // out with no samples instead of failing while destructuring undefined.
      const latestEntry = history.at(-1);
      if (latestEntry === undefined) {
        return [];
      }

      // The last history entry is the most recent version and the one before
      // it (if any) is the prior version. The history lookup is keyed by rule
      // id only; orgId is passed to the analytics adapter below.
      const { exactVersion: mostRecentVersion } = latestEntry;
      const priorVersion = history.at(-2)?.exactVersion;

      if (source === 'priorVersion' && !priorVersion) {
        return [];
      }

      const filter = match(source)
        .with('latestVersion', () => {
          // Use whichever is later: the version's own date or one week ago.
          const mostRecentVersionDate = new Date(mostRecentVersion);
          const oneWeekAgo = new Date(Date.now() - ONE_WEEK_MS);

          return {
            type: 'latestVersion' as const,
            minVersion: mostRecentVersion,
            minDate:
              mostRecentVersionDate > oneWeekAgo
                ? mostRecentVersionDate
                : oneWeekAgo,
          };
        })
        // The non-null assertions are safe: the guard above already returned
        // when source is 'priorVersion' and priorVersion is missing.
        .with('priorVersion', () => ({
          type: 'priorVersion' as const,
          fromVersion: priorVersion!,
          toVersion: mostRecentVersion,
          fromDate: new Date(priorVersion!),
          toDate: new Date(mostRecentVersion),
        }))
        .exhaustive();

      const adapterInput: ReportingRulePassingContentSampleInput = {
        orgId,
        ruleId,
        itemIds,
        numSamples,
        filter,
      };

      return reportingAnalyticsAdapter.getReportingRulePassingContentSamples(
        adapterInput,
      );
    },

    /** Delegates to `ReportingRules.getReportingRules`. */
    async getReportingRules(opts: {
      orgId: string;
      directives?: ConsumerDirectives;
    }) {
      return reportingRules.getReportingRules(opts);
    },

    /** Delegates to `ReportingRules.createReportingRule`. */
    async createReportingRule(opts: CreateReportingRuleInput) {
      return reportingRules.createReportingRule(opts);
    },

    /** Delegates to `ReportingRules.updateReportingRule`. */
    async updateReportingRule(opts: UpdateReportingRuleInput) {
      return reportingRules.updateReportingRule(opts);
    },

    /** Delegates to `ReportingRules.deleteReportingRule`. */
    async deleteReportingRule(opts: { orgId: string; id: string }) {
      return reportingRules.deleteReportingRule(opts);
    },

    /** Delegates to `ReportingRuleEngine.runEnabledRules`. */
    async runEnabledRules(
      itemSubmission: ItemSubmission,
      executionsCorrelationId: ReportingRuleExecutionCorrelationId,
    ) {
      return reportingRuleEngine.runEnabledRules(
        itemSubmission,
        executionsCorrelationId,
      );
    },

    /** Delegates to the analytics adapter's `getNumTimesReported`. */
    async getNumTimesReported(opts: { orgId: string; itemId: string }) {
      const { orgId, itemId } = opts;
      return reportingAnalyticsAdapter.getNumTimesReported(orgId, itemId);
    },
  };
}

// The dependency names below are listed in the same order as the parameters
// of makeReportingService.
export default inject(
  [
    'DataWarehouseAnalytics',
    'ReportingAnalyticsAdapter',
    'KyselyPg',
    'RuleEvaluator',
    'ReportingRuleExecutionLogger',
    'ActionPublisher',
    'getActionsByIdEventuallyConsistent',
    'getPoliciesByIdEventuallyConsistent',
    'Tracer',
  ],
  makeReportingService,
);

/** The object returned by `makeReportingService`. */
export type ReportingService = ReturnType<typeof makeReportingService>;

/**
 * Converts an item submission to the item shape stored in report rows: id,
 * data, submission id and a type identifier (id, version, schema variant).
 *
 * The `submisssionId` key is misspelled but is kept as is, because it is part
 * of the data written to the warehouse.
 */
function itemSubmissionToLegacyReportItem(it: ItemSubmission) {
  return {
    id: it.itemId,
    data: it.data,
    submisssionId: it.submissionId,
    typeIdentifier: {
      id: it.itemType.id,
      version: it.itemType.version,
      schemaVariant: it.itemType.schemaVariant,
    },
  };
}