Mirror of https://github.com/roostorg/coop
at 557ff54b2b435e5f1e789c6a8a4e1bebf2d7deb6 (91 lines, 3.2 kB)
import { type ItemIdentifier } from '@roostorg/types';
import { type Kysely } from 'kysely';
import _ from 'lodash';
import { type ReadonlyDeep } from 'type-fest';

import { inject, type Dependencies } from '../../iocContainer/index.js';
import { type UserStatisticsServiceWarehouse } from './dbTypes.js';

export type UserActionStatistics = {
  userId: string;
  userTypeId: string;
  orgId: string;
  actionId: string;
  policyId: string | null;
  actorId: string | null;
  itemSubmissionIds: string[];
  count: number;
};

const { chunk, uniqBy } = _;

/**
 * This is an internal function for querying a batch of users' lifetime action
 * statistics from the data warehouse. It's not called directly by consumers
 * of the user statistics service; instead, it's used by the service
 * internally to freshen the cache that serves consumers' requests.
 *
 * @internal
 */
export const makeFetchUserActionStatistics = inject(
  ['DataWarehouseDialect'],
  (dialect: Dependencies['DataWarehouseDialect']) => {
    const warehouseKysely =
      dialect.getKyselyInstance() as Kysely<UserStatisticsServiceWarehouse>;
    return async (
      opts: ReadonlyDeep<{
        orgId: string;
        userItemIdentifiers: ItemIdentifier[];
      }>,
    ): Promise<UserActionStatistics[]> => {
      const { orgId, userItemIdentifiers } = opts;

      // The warehouse has a published limit of ~16,000 entries in a list
      // expression (like we use in our `USER_ID IN (...)` filter), so we have
      // to chunk the user ids. The warehouse also has a published limit of
      // 1MB for the total length of the query (which must also cover the size
      // of bind parameter values). However, we were getting warehouse errors
      // in practice well below these limits, for not-totally-clear reasons.
      // So, we set the limit here to the biggest chunk size that worked
      // reliably.
      const uniqUserItemIdentifiers = uniqBy(
        userItemIdentifiers,
        (it) => `${it.id}:${it.typeId}`,
      );
      const userItemIdentifierBatches = chunk(uniqUserItemIdentifiers, 1000);

      // Build one SELECT per batch, matching each (USER_ID, USER_TYPE_ID)
      // pair with an OR of ANDed equality checks.
      const makeQueryForBatch = (userItemIdentifiers: ItemIdentifier[]) =>
        warehouseKysely
          .selectFrom('USER_STATISTICS_SERVICE.LIFETIME_ACTION_STATS')
          .select([
            'USER_ID as userId',
            'USER_TYPE_ID as userTypeId',
            'ACTION_ID as actionId',
            'POLICY_ID as policyId',
            'ITEM_SUBMISSION_IDS as itemSubmissionIds',
            'ACTOR_ID as actorId',
            'COUNT as count',
          ])
          .where('ORG_ID', '=', orgId)
          .where(({ eb, and, or }) =>
            or(
              userItemIdentifiers.map((itemIdentifier) =>
                and([
                  eb('USER_ID', '=', itemIdentifier.id),
                  eb('USER_TYPE_ID', '=', itemIdentifier.typeId),
                ]),
              ),
            ),
          );

      // Run the batch queries concurrently, tagging each row with the orgId
      // (which is filtered on but not selected from the warehouse).
      const results = await Promise.all(
        userItemIdentifierBatches.map(async (it) =>
          makeQueryForBatch(it)
            .execute()
            .then((results) => results.map((it) => ({ ...it, orgId }))),
        ),
      );

      return results.flat();
    };
  },
);
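
For context, a minimal usage sketch follows. It is not part of the file above: the import path, the identifier values, and the assumption that `ItemIdentifier` is the `{ id, typeId }` shape the query reads are all made up for illustration, and it presumes the IoC container has already applied the factory so that we hold the resolved async fetcher.

// --- Usage sketch (illustrative only, not part of this module) ---

import { type ItemIdentifier } from '@roostorg/types';
// Hypothetical path; wherever the module above actually lives.
import { type UserActionStatistics } from './fetchUserActionStatistics.js';

// Assume the IoC container has resolved `makeFetchUserActionStatistics`
// into the inner async function returned by the factory.
declare const fetchUserActionStatistics: (opts: {
  orgId: string;
  userItemIdentifiers: ItemIdentifier[];
}) => Promise<UserActionStatistics[]>;

async function example(): Promise<void> {
  // Duplicate identifiers are fine: the fetcher de-duplicates on
  // `id:typeId` before chunking, so they don't waste batch slots.
  const users: ItemIdentifier[] = [
    { id: 'user-123', typeId: 'member' },
    { id: 'user-456', typeId: 'member' },
    { id: 'user-123', typeId: 'member' },
  ];

  const stats = await fetchUserActionStatistics({
    orgId: 'org-1',
    userItemIdentifiers: users,
  });

  // Each row is one (user, action, policy, actor) combination with its
  // lifetime count; e.g., total actions ever recorded for user-123:
  const totalForUser123 = stats
    .filter((row) => row.userId === 'user-123' && row.userTypeId === 'member')
    .reduce((sum, row) => sum + row.count, 0);
  console.log(totalForUser123);
}

Under the hood, each batch of up to 1,000 identifiers compiles to a single SELECT whose filter has the shape `ORG_ID = ? AND ((USER_ID = ? AND USER_TYPE_ID = ?) OR ...)`, which is why de-duplicating before chunking matters: repeated identifiers would otherwise consume slots against the warehouse's list-expression and query-size limits.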