Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1import { type ItemIdentifier } from '@roostorg/types';
2import { type Kysely } from 'kysely';
3import _ from 'lodash';
4import { type ReadonlyDeep } from 'type-fest';
5
6import { inject, type Dependencies } from '../../iocContainer/index.js';
7import { type UserStatisticsServiceWarehouse } from './dbTypes.js';
8
/**
 * One row of a user's lifetime action statistics, as returned from the
 * warehouse query below (column aliases map warehouse columns onto these
 * fields), with `orgId` stamped on from the request.
 */
export type UserActionStatistics = {
  userId: string;
  userTypeId: string;
  orgId: string;
  actionId: string;
  policyId: string | null;
  actorId: string | null;
  // NOTE(review): presumably the submission ids of the items the action was
  // taken on — confirm against the warehouse table's docs.
  itemSubmissionIds: string[];
  count: number;
};
19
20const { chunk, uniqBy } = _;
21
22/**
23 * This is an internal function for querying a batch of users' lifetime action
24 * statistics from the data warehouse. It's not called directly by consumers of the user
25 * statistics service; instead, it's used by the service internally to freshen
26 * the cache that serves consumers' requests.
27 *
28 * @internal
29 */
30export const makeFetchUserActionStatistics = inject(
31 ['DataWarehouseDialect'],
32 (dialect: Dependencies['DataWarehouseDialect']) => {
33 const warehouseKysely =
34 dialect.getKyselyInstance() as Kysely<UserStatisticsServiceWarehouse>;
35 return async (
36 opts: ReadonlyDeep<{
37 orgId: string;
38 userItemIdentifiers: ItemIdentifier[];
39 }>,
40 ): Promise<UserActionStatistics[]> => {
41 const { orgId, userItemIdentifiers } = opts;
42
43 // The warehouse has a published limit of ~16,000 entries in a list expression
44 // (like we use in our `USER_ID IN (...)` filter), so we have to chunk the
45 // user ids. The warehouse also has a published limit of 1MB for the total
46 // length of the query (which must also coopr the size bind parameter
47 // values). However, we were getting warehouse errors in practice well
48 // below these limits, for not-totally-clear reasons. So, we set the limit
49 // here to the biggest chunk size that worked reliably.
50 const uniqUserItemIdentifiers = uniqBy(
51 userItemIdentifiers,
52 (it) => `${it.id}:${it.typeId}`,
53 );
54 const userItemIdentifierBatches = chunk(uniqUserItemIdentifiers, 1000);
55
56 const makeQueryForBatch = (userItemIdentifiers: ItemIdentifier[]) =>
57 warehouseKysely
58 .selectFrom('USER_STATISTICS_SERVICE.LIFETIME_ACTION_STATS')
59 .select([
60 'USER_ID as userId',
61 'USER_TYPE_ID as userTypeId',
62 'ACTION_ID as actionId',
63 'POLICY_ID as policyId',
64 'ITEM_SUBMISSION_IDS as itemSubmissionIds',
65 'ACTOR_ID as actorId',
66 'COUNT as count',
67 ])
68 .where('ORG_ID', '=', orgId)
69 .where(({ eb, and, or }) =>
70 or(
71 userItemIdentifiers.map((itemIdentifier) =>
72 and([
73 eb('USER_ID', '=', itemIdentifier.id),
74 eb('USER_TYPE_ID', '=', itemIdentifier.typeId),
75 ]),
76 ),
77 ),
78 );
79
80 const results = await Promise.all(
81 userItemIdentifierBatches.map(async (it) =>
82 makeQueryForBatch(it)
83 .execute()
84 .then((results) => results.map((it) => ({ ...it, orgId }))),
85 ),
86 );
87
88 return results.flat();
89 };
90 },
91);