Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 76 lines 2.5 kB view raw
1import { type Kysely } from 'kysely'; 2import _ from 'lodash'; 3 4import { inject } from '../iocContainer/utils.js'; 5import { type ManualReviewToolServicePg } from '../services/manualReviewToolService/index.js'; 6import { MINUTE_MS } from '../utils/time.js'; 7 8export default inject( 9 ['closeSharedResourcesForShutdown', 'KyselyPg'], 10 (sharedResourceShutdown, pgQuery: Kysely<ManualReviewToolServicePg>) => ({ 11 type: 'Job' as const, 12 async run() { 13 await pgQuery.transaction().execute(async (trx) => { 14 const lastTimestamp = await trx 15 .selectFrom('public.view_maintenance_metadata') 16 .select('last_insert') 17 .where( 18 'table_name', 19 '=', 20 'manual_review_tool.dim_mrt_decisions_materialized', 21 ) 22 .executeTakeFirst(); 23 24 if (!lastTimestamp) { 25 throw new Error('No last_insert timestamp found for the table'); 26 } 27 28 // When last_insert is null (fresh deploy), do a full backfill 29 const isInitialBackfill = lastTimestamp.last_insert == null; 30 const oneMinutePrevious = isInitialBackfill 31 ? new Date(0) 32 : new Date(lastTimestamp.last_insert.valueOf() - MINUTE_MS); 33 34 const insertedRows = await trx 35 .insertInto('manual_review_tool.dim_mrt_decisions_materialized') 36 .expression( 37 trx 38 .selectFrom('manual_review_tool.dim_mrt_decisions') 39 .selectAll() 40 .where('decided_at', '>', oneMinutePrevious), 41 ) 42 // since we are going back in time by 1 minute, we will likely be 43 // re-inserting some rows, which should lead to unique constraint 44 // violations. In this case we just want to ignore those rows to avoid 45 // duplicate data 46 .onConflict((oc) => oc.doNothing()) 47 .returning('decided_at') 48 .execute(); 49 50 if (insertedRows.length === 0) { 51 return; 52 } 53 54 const latestDecidedAt = insertedRows.reduce( 55 (max, row) => (row.decided_at > max ? row.decided_at : max), 56 insertedRows[0].decided_at, 57 ); 58 59 await trx 60 .updateTable('public.view_maintenance_metadata') 61 .set({ 62 last_insert: latestDecidedAt, 63 }) 64 .where( 65 'table_name', 66 '=', 67 'manual_review_tool.dim_mrt_decisions_materialized', 68 ) 69 .execute(); 70 }); 71 }, 72 async shutdown() { 73 await sharedResourceShutdown(); 74 }, 75 }), 76);