Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1import lodash from 'lodash';
2
3import { inject } from '../../iocContainer/utils.js';
4import { type NonEmptyArray } from '../../utils/typescript-types.js';
5import { RuleAlarmStatus } from '../moderationConfigService/index.js';
6import { NotificationType } from '../notificationsService/notificationsService.js';
7
// Local aliases for the two lodash helpers this module uses, read off the
// default-imported `lodash` object.
const capitalize = lodash.capitalize;
const keyBy = lodash.keyBy;
9
/**
 * The subset of `public.orgs` columns this job selects when building
 * alarm notifications.
 */
type OrgAlertRow = {
  /** Org primary key; matched against a rule's `org_id`. */
  id: string;
  /**
   * Address added as an extra `email_address` recipient on alarm
   * notifications. When null (or empty), no extra recipient is added.
   */
  on_call_alert_email: string | null;
};
14
/**
 * Job that reconciles each rule's stored `alarm_status` with the status
 * reported by `getCurrentPeriodRuleAlarmStatuses`, and creates notifications
 * for rules that have just entered or just left the ALARM state.
 *
 * Dependencies are resolved by name through `inject`, in the order listed:
 * the Kysely Postgres handle, the notifications service, the function that
 * returns the current-period alarm status per rule id, and the shared
 * resource shutdown hook.
 */
export default inject(
  [
    'KyselyPg',
    'NotificationsService',
    'getCurrentPeriodRuleAlarmStatuses',
    'closeSharedResourcesForShutdown',
  ],
  (
    db,
    notificationsService,
    getCurrentPeriodRuleAlarmStatuses,
    sharedResourceShutdown,
  ) => ({
    type: 'Job' as const,

    /**
     * Runs one reconciliation pass:
     *  1. fetch the current-period alarm status for every reported rule;
     *  2. load those rules and keep the ones whose stored status differs;
     *  3. load the orgs owning the changed rules (for the on-call email);
     *  4. build a notification for every change into or out of ALARM;
     *  5. persist the new statuses and create the notifications concurrently.
     *
     * Returns without touching the database when no rule statuses are
     * reported.
     */
    async run() {
      // Captured before the first await so every row updated by this run
      // receives the same `alarm_status_set_at` value.
      const statusSetAt = new Date();
      const latestStatusByRuleId = await getCurrentPeriodRuleAlarmStatuses();

      const reportedRuleIds = Object.keys(latestStatusByRuleId);
      if (reportedRuleIds.length === 0) {
        return;
      }

      const storedRules = await db
        .selectFrom('public.rules')
        .select([
          'id',
          'org_id',
          'creator_id',
          'name',
          'alarm_status',
          'status_if_unexpired',
        ])
        .where('id', 'in', reportedRuleIds)
        .execute();

      // Only rules whose stored status disagrees with the fresh one need work.
      const changedRules = storedRules.filter(
        (rule) => latestStatusByRuleId[rule.id].status !== rule.alarm_status,
      );

      // Look up each owning org once; skip the query when nothing changed.
      const uniqueOrgIds = [
        ...new Set(changedRules.map((rule) => rule.org_id)),
      ];
      const orgById: Record<string, OrgAlertRow> =
        uniqueOrgIds.length > 0
          ? keyBy(
              (await db
                .selectFrom('public.orgs')
                .select(['id', 'on_call_alert_email'])
                .where('id', 'in', uniqueOrgIds)
                .execute()) as OrgAlertRow[],
              (org) => org.id,
            )
          : {};

      const notifications = changedRules.flatMap((rule) => {
        const latest = latestStatusByRuleId[rule.id];
        const ruleWasInAlarm = rule.alarm_status === RuleAlarmStatus.ALARM;
        const ruleNowInAlarm = latest.status === RuleAlarmStatus.ALARM;

        // Transitions that neither start nor end an alarm are not notified.
        if (!ruleWasInAlarm && !ruleNowInAlarm) {
          return [];
        }

        // No org row was returned for this rule, so no notification is built.
        const org = orgById[rule.org_id];
        if (!org) {
          return [];
        }

        const ruleStatusLabel = capitalize(String(rule.status_if_unexpired));
        const headline = ruleNowInAlarm
          ? `[Alarm Triggered - ${ruleStatusLabel} Rule]`
          : `[Alarm Cleared - ${ruleStatusLabel} Rule]`;
        const transitionVerb = ruleNowInAlarm ? 'started' : 'stopped';

        // The rule's creator is always notified; the org's on-call address is
        // appended only when one is configured.
        const onCallRecipients = org.on_call_alert_email
          ? [
              {
                type: 'email_address' as const,
                value: org.on_call_alert_email,
              },
            ]
          : [];

        return {
          type: ruleNowInAlarm
            ? NotificationType.RulePassRateIncreaseAnomalyStart
            : NotificationType.RulePassRateIncreaseAnomalyEnd,
          data: {
            ruleId: rule.id,
            ruleName: rule.name,
            lastPeriodPassRate: latest.meta.lastPeriodPassRate,
            secondToLastPeriodPassRate: latest.meta.secondToLastPeriodPassRate,
          },
          message: `${headline} ${rule.name} has ${transitionVerb} passing at an anomalous rate.`,
          recipients: [
            { type: 'user_id' as const, value: rule.creator_id },
            ...onCallRecipients,
          ],
        } as const;
      });

      // Issue one UPDATE per changed rule; the queries start here and are
      // awaited together below.
      const ruleUpdateTasks = changedRules.map(async (rule) => {
        await db
          .updateTable('public.rules')
          .set({
            alarm_status: latestStatusByRuleId[rule.id].status,
            alarm_status_set_at: statusSetAt,
          })
          .where('id', '=', rule.id)
          .execute();
      });

      // createNotifications is only called with a non-empty list, which is
      // what makes the NonEmptyArray assertion sound.
      const pendingWork: unknown[] = [...ruleUpdateTasks];
      if (notifications.length > 0) {
        pendingWork.push(
          notificationsService.createNotifications(
            notifications as NonEmptyArray<(typeof notifications)[number]>,
          ),
        );
      }

      await Promise.all(pendingWork);
    },

    /** Delegates to the injected shared-resource shutdown hook. */
    async shutdown() {
      await sharedResourceShutdown();
    },
  }),
);