Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1import { type Kysely } from 'kysely';
2import _ from 'lodash';
3
4import { inject } from '../iocContainer/utils.js';
5import { type ManualReviewToolServicePg } from '../services/manualReviewToolService/index.js';
6import { MINUTE_MS } from '../utils/time.js';
7
// Table this job fills incrementally. The same string is the `table_name` key
// of the job's bookkeeping row in `public.view_maintenance_metadata`.
const MATERIALIZED_DECISIONS_TABLE =
  'manual_review_tool.dim_mrt_decisions_materialized';

/**
 * Job that incrementally copies rows from `manual_review_tool.dim_mrt_decisions`
 * into `manual_review_tool.dim_mrt_decisions_materialized`.
 *
 * Progress is tracked in the `last_insert` column of the
 * `public.view_maintenance_metadata` row for the materialized table. Each run
 * copies every source row whose `decided_at` is later than one minute before
 * that watermark (or every row, when the watermark is null), then advances
 * the watermark. The read, the insert and the watermark update all happen in
 * a single transaction.
 *
 * `run` throws an `Error` when the metadata row for the table does not exist.
 * `shutdown` delegates to the injected `closeSharedResourcesForShutdown`.
 */
export default inject(
  ['closeSharedResourcesForShutdown', 'KyselyPg'],
  (sharedResourceShutdown, pgQuery: Kysely<ManualReviewToolServicePg>) => ({
    type: 'Job' as const,
    async run() {
      await pgQuery.transaction().execute(async (trx) => {
        const metadata = await trx
          .selectFrom('public.view_maintenance_metadata')
          .select('last_insert')
          .where('table_name', '=', MATERIALIZED_DECISIONS_TABLE)
          .executeTakeFirst();

        if (!metadata) {
          throw new Error('No last_insert timestamp found for the table');
        }

        // A null watermark (fresh deploy) means nothing has been copied yet,
        // so do a full backfill. Otherwise re-scan the last minute before the
        // watermark as a safety margin.
        const previousWatermark = metadata.last_insert;
        const copyAfter =
          previousWatermark == null
            ? new Date(0)
            : new Date(previousWatermark.valueOf() - MINUTE_MS);

        const insertedRows = await trx
          .insertInto(MATERIALIZED_DECISIONS_TABLE)
          .expression(
            trx
              .selectFrom('manual_review_tool.dim_mrt_decisions')
              .selectAll()
              .where('decided_at', '>', copyAfter),
          )
          // Because the scan overlaps the previous run by one minute, some
          // rows will already be present. This relies on a unique constraint
          // on the target table (not visible from this file) so that those
          // rows conflict and are skipped instead of being duplicated.
          .onConflict((oc) => oc.doNothing())
          .returning('decided_at')
          .execute();

        if (insertedRows.length === 0) {
          return;
        }

        const newestInserted = insertedRows.reduce(
          (max, row) => (row.decided_at > max ? row.decided_at : max),
          insertedRows[0].decided_at,
        );

        // Only rows that were actually inserted are returned, and they may
        // all be late arrivals inside the one-minute overlap, i.e. older than
        // the stored watermark. Never move the watermark backwards for them.
        if (previousWatermark != null && newestInserted <= previousWatermark) {
          return;
        }

        await trx
          .updateTable('public.view_maintenance_metadata')
          .set({
            last_insert: newestInserted,
          })
          .where('table_name', '=', MATERIALIZED_DECISIONS_TABLE)
          .execute();
      });
    },
    async shutdown() {
      await sharedResourceShutdown();
    },
  }),
);