Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 557ff54b2b435e5f1e789c6a8a4e1bebf2d7deb6 99 lines 3.7 kB view raw
import { type ItemIdentifier } from '@roostorg/types';
import { type Kysely } from 'kysely';
import _ from 'lodash';

import { inject, type Dependencies } from '../../iocContainer/index.js';
import { jsonStringify } from '../../utils/encoding.js';
import { type UserStatisticsServiceWarehouse } from './dbTypes.js';

const { chunk, uniqBy } = _;

/**
 * Maximum number of distinct users filtered for in a single warehouse query.
 *
 * The warehouse has a published limit of ~16,000 entries in a list expression
 * and a published limit of 1MB for the total length of a query (which must
 * also cover the size of the bind parameter values). In practice we saw
 * warehouse errors well below those limits, for not-totally-clear reasons, so
 * this is simply the biggest batch size that worked reliably.
 */
const MAX_USERS_PER_QUERY = 1000;

/**
 * Submission count for one user, within one org, for one item type.
 */
export type UserSubmissionStatistics = {
  userId: string;
  userTypeId: string;
  orgId: string;
  itemTypeId: string;
  numSubmissions: number;
};

/**
 * Builds the internal function that queries a batch of users' content
 * submission statistics from the data warehouse. Consumers of the user
 * statistics service don't call it directly; the service uses it internally
 * to freshen the cache that serves consumers' requests.
 *
 * The returned function:
 *  - de-duplicates the requested users by their (id, typeId) pair;
 *  - splits them into batches of at most `MAX_USERS_PER_QUERY`;
 *  - issues one query per batch, all started concurrently;
 *  - returns the concatenated rows, each tagged with the requested `orgId`.
 *
 * If `startTime` / `endTime` are given, only rows with
 * `TS_START_INCLUSIVE >= startTime` / `TS_END_EXCLUSIVE <= endTime` are summed.
 * If any batch query rejects, the returned promise rejects (`Promise.all`).
 *
 * @internal
 */
export const makeFetchUserSubmissionStatistics = inject(
  ['DataWarehouseDialect'],
  (dialect: Dependencies['DataWarehouseDialect']) => {
    const db =
      dialect.getKyselyInstance() as Kysely<UserStatisticsServiceWarehouse>;

    return async (opts: {
      readonly orgId: string;
      readonly userItemIdentifiers: readonly ItemIdentifier[];
      readonly startTime?: Date;
      readonly endTime?: Date;
    }): Promise<UserSubmissionStatistics[]> => {
      const { orgId, userItemIdentifiers, startTime, endTime } = opts;

      // Two identifiers refer to the same user only when both the id and the
      // type id match, so the pair is serialized to form the uniqueness key.
      const distinctUsers = uniqBy(userItemIdentifiers, (user) =>
        jsonStringify([user.id, user.typeId]),
      );
      const userBatches = chunk(distinctUsers, MAX_USERS_PER_QUERY);

      // Builds (without executing) the aggregate query for one batch of users.
      const buildBatchQuery = (batchUsers: ItemIdentifier[]) => {
        const aggregateQuery = db
          .selectFrom('USER_STATISTICS_SERVICE.SUBMISSION_STATS')
          .select([
            'USER_ID as userId',
            'USER_TYPE_ID as userTypeId',
            'ITEM_TYPE_ID as itemTypeId',
            // NB: the explicit `number` type param tells Kysely this sum isn't
            // returned as a bigint or a numeric string (it shouldn't be,
            // because a user can't possibly have that many submissions).
            db.fn.sum<number>('NUM_SUBMISSIONS').as('numSubmissions'),
          ])
          .where('ORG_ID', '=', orgId)
          // One `(USER_ID = ? AND USER_TYPE_ID = ?)` predicate per user in the
          // batch, all OR'd together.
          .where(({ eb, and, or }) =>
            or(
              batchUsers.map((user) =>
                and([
                  eb('USER_ID', '=', user.id),
                  eb('USER_TYPE_ID', '=', user.typeId),
                ]),
              ),
            ),
          )
          .groupBy(['USER_ID', 'USER_TYPE_ID', 'ITEM_TYPE_ID']);

        // The optional time-window bounds are appended after the mandatory
        // filters, lower bound first.
        const lowerBounded = startTime
          ? aggregateQuery.where('TS_START_INCLUSIVE', '>=', startTime)
          : aggregateQuery;

        return endTime
          ? lowerBounded.where('TS_END_EXCLUSIVE', '<=', endTime)
          : lowerBounded;
      };

      const rowsPerBatch = await Promise.all(
        userBatches.map(async (batchUsers) => {
          const rows = await buildBatchQuery(batchUsers).execute();
          // The query doesn't select ORG_ID (it's fixed by the filter), so the
          // requested org id is attached to each row here.
          return rows.map((row) => ({ ...row, orgId }));
        }),
      );

      return rowsPerBatch.flat();
    };
  },
);