Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 557ff54b2b435e5f1e789c6a8a4e1bebf2d7deb6 152 lines 5.5 kB view raw
1/** 2 * @fileoverview This file is the public entrypoint for our user statistics 3 * service. 4 */ 5import { type ItemIdentifier } from '@roostorg/types'; 6import { sql, type Kysely } from 'kysely'; 7import { type ReadonlyDeep } from 'type-fest'; 8 9import { inject, type Dependencies } from '../../iocContainer/index.js'; 10import { type PolicyActionPenalties } from '../../models/OrgModel.js'; 11import { initialUserScore, type UserScore } from './computeUserScore.js'; 12import { 13 type UserStatisticsServicePg, 14 type UserStatisticsServiceWarehouse, 15} from './dbTypes.js'; 16import { 17 makeFetchUserActionStatistics, 18 type UserActionStatistics, 19} from './fetchUserActionStatistics.js'; 20import { 21 makeFetchUserSubmissionStatistics, 22 type UserSubmissionStatistics, 23} from './fetchUserSubmissionStatistics.js'; 24 25// NB: This function -- which is exported both from this file and from index.js, 26// and is used for constructing the UserStatisticsService from the outside world 27// -- doesn't allow the caller to provide a custom implementation for 28// `fetchUserActionStatistics` or `fetchUserSubmissionStatistics`, because 29// customizing those isn't part of the user statistics service's public API. 30// That's mostly because any implementation other than the one hardcoded below 31// wouldn't work irl: those functions read data from specific warehouse tables 32// which other methods in this service also depend on (e.g., 33// `refreshUserScoresCache`), so any alternate implementation would lead to 34// inconsistent data between methods. However, the 35// `internalMakeUserStatisticsService` function defined in this file, which is 36// exported here but is _not_ exported from index.js, does make those arguments 37// customizable, so that we can replace them with mocks in the tests. 38function makeUserStatisticsService( 39 pgQuery: Kysely<UserStatisticsServicePg>, 40 pgQueryReplica: Kysely<UserStatisticsServicePg>, 41 dialect: Dependencies['DataWarehouseDialect'], 42 _tracer: Dependencies['Tracer'], 43) { 44 const warehouseQuery = 45 dialect.getKyselyInstance() as Kysely<UserStatisticsServiceWarehouse>; 46 return internalMakeUserStatisticsService( 47 pgQuery, 48 pgQueryReplica, 49 warehouseQuery, 50 makeFetchUserActionStatistics(dialect), 51 makeFetchUserSubmissionStatistics(dialect), 52 ); 53} 54 55export function internalMakeUserStatisticsService( 56 _pgQuery: Kysely<UserStatisticsServicePg>, 57 pgQueryReplica: Kysely<UserStatisticsServicePg>, 58 warehouseQuery: Kysely<UserStatisticsServiceWarehouse>, 59 _fetchUserActionStatistics: ReturnType<typeof makeFetchUserActionStatistics>, 60 _fetchUserSubmissionStatistics: ReturnType< 61 typeof makeFetchUserSubmissionStatistics 62 >, 63) { 64 return { 65 /** 66 * Gets a user's score from the "user scores cache", which is the postgres 67 * table that stores recent (but not necessarily fully up-to-date) scores. 68 */ 69 async getUserScore(orgId: string, userItemIdentifier: ItemIdentifier) { 70 const { score } = (await pgQueryReplica 71 .selectFrom('user_statistics_service.user_scores') 72 .select('score') 73 .where('org_id', '=', orgId) 74 .where('user_type_id', '=', userItemIdentifier.typeId) 75 .where('user_id', '=', userItemIdentifier.id) 76 .executeTakeFirst()) ?? { score: initialUserScore }; 77 78 return score as UserScore; 79 }, 80 81 async handleUsersWithChangedScores( 82 _consumerId: string, 83 _cb: ( 84 changedUsers: { userId: string; userTypeId: string; orgId: string }[], 85 ) => Promise<void>, 86 ) { 87 // Previously consumed warehouse streams on USER_SCORES; disabled until 88 // replaced with a warehouse-agnostic change feed. 89 }, 90 91 async refreshUserScoresCache( 92 _getActionPenalties: ( 93 orgId: string, 94 ) => Promise<ReadonlyDeep<PolicyActionPenalties[]>>, 95 ) { 96 // Previously consumed warehouse streams on SUBMISSION_STATS; disabled until 97 // replaced with a warehouse-agnostic incremental refresh. 98 }, 99 100 async getUserActionCountsByPolicy( 101 orgId: string, 102 userItemIdentifier: ItemIdentifier, 103 ) { 104 return warehouseQuery 105 .selectFrom('USER_STATISTICS_SERVICE.LIFETIME_ACTION_STATS') 106 .select([ 107 'ACTION_ID as actionId', 108 'POLICY_ID as policyId', 109 'ACTOR_ID as actorId', 110 'COUNT as count', 111 'ITEM_SUBMISSION_IDS as itemSubmissionIds', 112 ]) 113 .where('ORG_ID', '=', orgId) 114 .where('USER_TYPE_ID', '=', userItemIdentifier.typeId) 115 .where(sql`LOWER(USER_ID)`, '=', userItemIdentifier.id.toLowerCase()) 116 .execute(); 117 }, 118 119 async getUserSubmissionCount( 120 orgId: string, 121 userItemIdentifier: ItemIdentifier, 122 ) { 123 return warehouseQuery 124 .selectFrom('USER_STATISTICS_SERVICE.SUBMISSION_STATS') 125 .select([ 126 'ITEM_TYPE_ID as itemTypeId', 127 warehouseQuery.fn.sum<number>('NUM_SUBMISSIONS').as('count'), 128 ]) 129 .where('ORG_ID', '=', orgId) 130 .where('USER_TYPE_ID', '=', userItemIdentifier.typeId) 131 .where(sql`LOWER(USER_ID)`, '=', userItemIdentifier.id.toLowerCase()) 132 .groupBy('ITEM_TYPE_ID') 133 .execute(); 134 }, 135 }; 136} 137 138export type UserStatisticsService = ReturnType< 139 typeof makeUserStatisticsService 140>; 141 142export default inject( 143 [ 144 'KyselyPg', 145 'KyselyPgReadReplica', 146 'DataWarehouseDialect', 147 'Tracer', 148 ], 149 makeUserStatisticsService, 150); 151 152export type { UserActionStatistics, UserSubmissionStatistics, UserScore };