Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1/**
2 * @fileoverview This file is the public entrypoint for our user statistics
3 * service.
4 */
5import { type ItemIdentifier } from '@roostorg/types';
6import { sql, type Kysely } from 'kysely';
7import { type ReadonlyDeep } from 'type-fest';
8
9import { inject, type Dependencies } from '../../iocContainer/index.js';
10import { type PolicyActionPenalties } from '../../models/OrgModel.js';
11import { initialUserScore, type UserScore } from './computeUserScore.js';
12import {
13 type UserStatisticsServicePg,
14 type UserStatisticsServiceWarehouse,
15} from './dbTypes.js';
16import {
17 makeFetchUserActionStatistics,
18 type UserActionStatistics,
19} from './fetchUserActionStatistics.js';
20import {
21 makeFetchUserSubmissionStatistics,
22 type UserSubmissionStatistics,
23} from './fetchUserSubmissionStatistics.js';
24
/**
 * Production factory for the user statistics service. This is the function
 * registered with `inject` as this module's default export, and it is what
 * the outside world uses to construct a UserStatisticsService.
 *
 * NB: unlike `internalMakeUserStatisticsService`, this factory deliberately
 * gives callers no way to supply their own `fetchUserActionStatistics` or
 * `fetchUserSubmissionStatistics`; customizing those is not part of the
 * service's public API. Any implementation other than the ones hardcoded
 * below wouldn't work irl: those functions read from specific warehouse
 * tables that other methods of the service also depend on (e.g.,
 * `refreshUserScoresCache`), so swapping them out would make the methods
 * disagree with each other. `internalMakeUserStatisticsService` -- exported
 * from this file but _not_ from index.js -- does accept them as arguments, so
 * that the tests can replace them with mocks.
 *
 * @param pgQuery Kysely instance for Postgres; forwarded unchanged.
 * @param pgQueryReplica Kysely instance for the Postgres read replica;
 *   forwarded unchanged.
 * @param dialect Data warehouse dialect. Supplies the warehouse Kysely
 *   instance and is handed to both statistics-fetcher factories.
 * @param _tracer Accepted to match the injected dependency list; not used.
 * @returns The service object built by `internalMakeUserStatisticsService`.
 */
function makeUserStatisticsService(
  pgQuery: Kysely<UserStatisticsServicePg>,
  pgQueryReplica: Kysely<UserStatisticsServicePg>,
  dialect: Dependencies['DataWarehouseDialect'],
  _tracer: Dependencies['Tracer'],
) {
  // The dialect hands back a Kysely instance that is narrowed here to the
  // warehouse schema this service queries.
  const warehouseQuery =
    dialect.getKyselyInstance() as Kysely<UserStatisticsServiceWarehouse>;

  // The real (non-mock) fetchers, both built from the same dialect.
  const fetchUserActionStatistics = makeFetchUserActionStatistics(dialect);
  const fetchUserSubmissionStatistics =
    makeFetchUserSubmissionStatistics(dialect);

  return internalMakeUserStatisticsService(
    pgQuery,
    pgQueryReplica,
    warehouseQuery,
    fetchUserActionStatistics,
    fetchUserSubmissionStatistics,
  );
}
54
/**
 * Builds the user statistics service from explicitly supplied dependencies.
 * Every collaborator is a parameter so that tests can pass in mocks.
 *
 * @param _pgQuery Kysely instance for Postgres; currently unused.
 * @param pgQueryReplica Kysely instance used to read the user scores cache.
 * @param warehouseQuery Kysely instance used for the warehouse statistics
 *   tables.
 * @param _fetchUserActionStatistics Action statistics fetcher; currently
 *   unused.
 * @param _fetchUserSubmissionStatistics Submission statistics fetcher;
 *   currently unused.
 * @returns An object exposing the service's methods.
 */
export function internalMakeUserStatisticsService(
  _pgQuery: Kysely<UserStatisticsServicePg>,
  pgQueryReplica: Kysely<UserStatisticsServicePg>,
  warehouseQuery: Kysely<UserStatisticsServiceWarehouse>,
  _fetchUserActionStatistics: ReturnType<typeof makeFetchUserActionStatistics>,
  _fetchUserSubmissionStatistics: ReturnType<
    typeof makeFetchUserSubmissionStatistics
  >,
) {
  const service = {
    /**
     * Looks up a user's score in the "user scores cache" -- the postgres
     * table holding recent (but not necessarily fully up-to-date) scores.
     * When the cache has no row for the user, `initialUserScore` is returned.
     */
    async getUserScore(orgId: string, userItemIdentifier: ItemIdentifier) {
      const { id: userId, typeId: userTypeId } = userItemIdentifier;

      const cachedRow = await pgQueryReplica
        .selectFrom('user_statistics_service.user_scores')
        .select('score')
        .where('org_id', '=', orgId)
        .where('user_type_id', '=', userTypeId)
        .where('user_id', '=', userId)
        .executeTakeFirst();

      // Only a missing row triggers the default; a row that exists is used
      // as-is.
      const { score } = cachedRow ?? { score: initialUserScore };
      return score as UserScore;
    },

    /**
     * Currently a no-op: neither argument is used and the callback is never
     * invoked.
     */
    async handleUsersWithChangedScores(
      _consumerId: string,
      _cb: (
        changedUsers: { userId: string; userTypeId: string; orgId: string }[],
      ) => Promise<void>,
    ) {
      // This used to consume warehouse streams on USER_SCORES. It stays
      // disabled until a warehouse-agnostic change feed replaces them.
    },

    /**
     * Currently a no-op: the penalties getter is never called and nothing is
     * refreshed.
     */
    async refreshUserScoresCache(
      _getActionPenalties: (
        orgId: string,
      ) => Promise<ReadonlyDeep<PolicyActionPenalties[]>>,
    ) {
      // This used to consume warehouse streams on SUBMISSION_STATS. It stays
      // disabled until a warehouse-agnostic incremental refresh replaces them.
    },

    /**
     * Reads the user's rows from the warehouse's lifetime action stats table,
     * returning `actionId`, `policyId`, `actorId`, `count` and
     * `itemSubmissionIds` for each row. The user id is compared
     * case-insensitively by lowercasing both sides.
     */
    async getUserActionCountsByPolicy(
      orgId: string,
      userItemIdentifier: ItemIdentifier,
    ) {
      const lowercasedUserId = userItemIdentifier.id.toLowerCase();

      const actionRows = await warehouseQuery
        .selectFrom('USER_STATISTICS_SERVICE.LIFETIME_ACTION_STATS')
        .select([
          'ACTION_ID as actionId',
          'POLICY_ID as policyId',
          'ACTOR_ID as actorId',
          'COUNT as count',
          'ITEM_SUBMISSION_IDS as itemSubmissionIds',
        ])
        .where('ORG_ID', '=', orgId)
        .where('USER_TYPE_ID', '=', userItemIdentifier.typeId)
        .where(sql`LOWER(USER_ID)`, '=', lowercasedUserId)
        .execute();

      return actionRows;
    },

    /**
     * Sums the user's `NUM_SUBMISSIONS` from the warehouse's submission stats
     * table, grouped by item type, returning `itemTypeId` and `count` for
     * each group. The user id is compared case-insensitively by lowercasing
     * both sides.
     */
    async getUserSubmissionCount(
      orgId: string,
      userItemIdentifier: ItemIdentifier,
    ) {
      const lowercasedUserId = userItemIdentifier.id.toLowerCase();
      const submissionTotal = warehouseQuery.fn
        .sum<number>('NUM_SUBMISSIONS')
        .as('count');

      const submissionRows = await warehouseQuery
        .selectFrom('USER_STATISTICS_SERVICE.SUBMISSION_STATS')
        .select(['ITEM_TYPE_ID as itemTypeId', submissionTotal])
        .where('ORG_ID', '=', orgId)
        .where('USER_TYPE_ID', '=', userItemIdentifier.typeId)
        .where(sql`LOWER(USER_ID)`, '=', lowercasedUserId)
        .groupBy('ITEM_TYPE_ID')
        .execute();

      return submissionRows;
    },
  };

  return service;
}
137
/**
 * The shape of the user statistics service, derived from whatever
 * `makeUserStatisticsService` returns so the type cannot drift from the
 * implementation.
 */
export type UserStatisticsService = ReturnType<
  typeof makeUserStatisticsService
>;
141
// Default export: `makeUserStatisticsService` registered with `inject`. The
// dependency keys are listed in the same order as the factory's parameters.
export default inject(
  ['KyselyPg', 'KyselyPgReadReplica', 'DataWarehouseDialect', 'Tracer'],
  makeUserStatisticsService,
);
151
// Re-export the statistics and score types imported at the top of this file
// so they are available from this module as well.
export type { UserActionStatistics, UserSubmissionStatistics, UserScore };