Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1import { type ItemIdentifier } from '@roostorg/types';
2import { type Kysely } from 'kysely';
3import _ from 'lodash';
4
5import { inject, type Dependencies } from '../../iocContainer/index.js';
6import { jsonStringify } from '../../utils/encoding.js';
7import { type UserStatisticsServiceWarehouse } from './dbTypes.js';
8
9const { chunk, uniqBy } = _;
10
/**
 * Aggregated content-submission count for one (user, item type) pair,
 * scoped to a single org. One row per group returned by the warehouse query.
 */
export type UserSubmissionStatistics = {
  /** ID of the user the statistics describe. */
  userId: string;
  /** Type ID of the user item (users are items in the item-identifier model). */
  userTypeId: string;
  /** Org the statistics are scoped to. */
  orgId: string;
  /** Type of the submitted content being counted. */
  itemTypeId: string;
  /** Sum of submissions for this user/item-type pair over the queried window. */
  numSubmissions: number;
};
18
19/**
20 * This is an internal function for querying a batch of users' content
21 * submission statistics from the data warehouse. It's not called directly by consumers
22 * of the user statistics service; instead, it's used by the service internally
23 * to freshen the cache that serves consumers' requests.
24 *
25 * @internal
26 */
27export const makeFetchUserSubmissionStatistics = inject(
28 ['DataWarehouseDialect'],
29 (dialect: Dependencies['DataWarehouseDialect']) => {
30 const warehouseKysely =
31 dialect.getKyselyInstance() as Kysely<UserStatisticsServiceWarehouse>;
32 return async (opts: {
33 readonly orgId: string;
34 readonly userItemIdentifiers: readonly ItemIdentifier[];
35 readonly startTime?: Date;
36 readonly endTime?: Date;
37 }): Promise<UserSubmissionStatistics[]> => {
38 const { startTime, endTime, orgId } = opts;
39
40 // The warehouse has a published limit of ~16,000 entries in a list expression
41 // (like we use in our `USER_ID IN (...)` filter), so we have to chunk the
42 // user ids. The warehouse also has a published limit of 1MB for the total
43 // length of the query (which must also coopr the size bind parameter
44 // values). However, we were getting warehouse errors in practice well
45 // below these limits, for not-totally-clear reasons. So, we set the limit
46 // here to the biggest chunk size that worked reliably.
47 const uniqUserItemIdentifiers = uniqBy(opts.userItemIdentifiers, (a) =>
48 jsonStringify([a.id, a.typeId]),
49 );
50 const userItemIdentifierBatches = chunk(uniqUserItemIdentifiers, 1000);
51
52 const makeQueryForBatch = (userItemIdentifiers: ItemIdentifier[]) => {
53 let query = warehouseKysely
54 .selectFrom('USER_STATISTICS_SERVICE.SUBMISSION_STATS')
55 .select([
56 'USER_ID as userId',
57 'USER_TYPE_ID as userTypeId',
58 'ITEM_TYPE_ID as itemTypeId',
59 // NB: we need the manual `number` type param to indicate that this
60 // isn't returned as a bigint or a numeric string (it shouldn't be,
61 // because a user can't possibly have that many submissions).
62 warehouseKysely.fn.sum<number>('NUM_SUBMISSIONS').as('numSubmissions'),
63 ])
64 .where('ORG_ID', '=', orgId)
65 .where(({ eb, and, or }) =>
66 or(
67 userItemIdentifiers.map((itemIdentifier) =>
68 and([
69 eb('USER_ID', '=', itemIdentifier.id),
70 eb('USER_TYPE_ID', '=', itemIdentifier.typeId),
71 ]),
72 ),
73 ),
74 )
75 .groupBy(['USER_ID', 'USER_TYPE_ID', 'ITEM_TYPE_ID']);
76
77 if (startTime) {
78 query = query.where('TS_START_INCLUSIVE', '>=', startTime);
79 }
80
81 if (endTime) {
82 query = query.where('TS_END_EXCLUSIVE', '<=', endTime);
83 }
84
85 return query;
86 };
87
88 const results = await Promise.all(
89 userItemIdentifierBatches.map(async (batch) =>
90 makeQueryForBatch(batch)
91 .execute()
92 .then((results) => results.map((it) => ({ ...it, orgId }))),
93 ),
94 );
95
96 return results.flat();
97 };
98 },
99);