Mirror of https://github.com/roostorg/coop
github.com/roostorg/coop
1/* eslint-disable max-lines */
2import type { ItemIdentifier } from '@roostorg/types';
3import _ from 'lodash';
4
5import { type Dependencies } from '../../iocContainer/index.js';
6import {
7 isRealItemIdentifier,
8 itemIdentifierToScyllaItemIdentifier,
9 ScyllaNilItemIdentifier,
10 type Scylla,
11 type ScyllaItemIdentifier,
12} from '../../scylla/index.js';
13import type { ContentApiRequestLogEntry } from '../analyticsLoggers/ContentApiLogger.js';
14import { type RuleExecutionCorrelationId } from '../analyticsLoggers/ruleExecutionLoggingUtils.js';
15import { filterNullOrUndefined } from '../../utils/collections.js';
16import {
17 fromCorrelationId,
18 type CorrelationId,
19} from '../../utils/correlationIds.js';
20import { jsonStringify, type JsonOf } from '../../utils/encoding.js';
21import {
22 chunkAsyncIterableByKey,
23 mapAsyncIterable,
24} from '../../utils/iterables.js';
25import { DAY_MS, MONTH_MS } from '../../utils/time.js';
26import { tryParseNonEmptyString } from '../../utils/typescript-types.js';
27import {
28 getFieldValueForRole,
29 itemSubmissionToItemSubmissionWithTypeIdentifier,
30 type ItemSubmissionWithTypeIdentifier,
31 type NormalizedItemData,
32} from '../itemProcessingService/index.js';
33import { type ReportingRuleExecutionCorrelationId } from '../reportingService/index.js';
34import {
35 type ScyllaItemSubmissionsRow,
36 type ScyllaRelations,
37} from './dbTypes.js';
38import {
39 RETURN_UNLIMITED_RESULTS_AND_POTENTIALLY_HANG_DB,
40 type ReturnUnlimitedResultsAndPotentiallyHangDb,
41} from './itemInvestigationServiceAdapter.js';
42import {
43 dbRowToItemSubmissionWithItemTypeIdentifier,
44 getEmptyAsyncIterable,
45 getSyntheticThreadId,
46 partitionLatestAndPriorSubmissions,
47} from './utils.js';
48import { type IActionExecutionsAdapter } from '../../plugins/warehouse/queries/IActionExecutionsAdapter.js';
49import {
50 type ContentApiRequestRecord,
51 type IContentApiRequestsAdapter,
52} from '../../plugins/warehouse/queries/IContentApiRequestsAdapter.js';
53import { type SubmissionId } from '../itemProcessingService/makeItemSubmission.js';
54
/**
 * The ItemInvestigationService API exposes `limit` parameters
 * in almost all of its data retrieval methods, which refer to the number of
 * unique items the consumer would like to receive. Because the underlying data
 * stores hold item submissions, of which there can be multiple per item,
 * the service often needs to retrieve more than `limit` number of rows from the
 * data stores and reduce them down to a set of unique items before returning to
 * the consumer, which often necessitates issuing multiple queries.
 * This ARTIFICIAL_LIMIT_MULTIPLIER is used to minimize the number
 * of queries the service dispatches to the databases by inflating the `limit`
 * we provide in queries by our observed ratio of
 * ((Item Submissions) / (Unique Item Identifiers))
 *
 * NOTE(review): the multiplier is currently 1, i.e. no inflation is applied;
 * `Math.floor(limit * 1)` leaves query limits unchanged.
 */
const ARTIFICIAL_LIMIT_MULTIPLIER = 1;
69
/**
 * For all methods exposed by this class, the consumers are likely calling
 * with "Items" in mind, meaning submissions with unique ItemIdentifiers. The
 * datastores that support this service deal with Item Submissions, which are
 * many-to-one with Items, i.e. one item can be submitted multiple times. This
 * service's API accounts for this by providing a distinction between the latest
 * submission of an item, and all the prior submissions, and in all streams
 * emits an object with those two properties to represent that, i.e.
 * {
 *   latestSubmission: ItemSubmissionWithTypeIdentifier,
 *   priorSubmissions?: ItemSubmissionWithTypeIdentifier[],
 * }
 *
 * The Service makes a best effort to de-duplicate submissions and represent
 * them in the above format. However, due to the implementation there is still a
 * possibility that a submission will be represented twice in the output stream.
 * This occurs when the service has to make multiple queries to the datastores
 * to fulfill a request, and the full set of submissions that make up an item's
 * submission history ends up split across the query-boundary.
 *
 * The first explanation of this was left in a github comment:
 * https://github.com/roostorg/coop/pull/1605#issuecomment-1721584595
 *
 * and there is a JIRA task for the fix:
 * https://coop.atlassian.net/browse/COOP-1291
 */
export type SubmissionsForItemWithTypeIdentifier = {
  latestSubmission: ItemSubmissionWithTypeIdentifier;
  priorSubmissions?: ItemSubmissionWithTypeIdentifier[];
};
101
102export class ItemInvestigationService {
103 private readonly scylla: Scylla<ScyllaRelations>;
104 private readonly selectStream: Scylla<ScyllaRelations>['selectStream'];
105 private readonly select: Scylla<ScyllaRelations>['select'];
106 private readonly insert: Scylla<ScyllaRelations>['insert'];
107 constructor(
108 scylla: Scylla<ScyllaRelations>,
109 tracer: Dependencies['Tracer'],
110 private readonly partialItemsEndpoint: Dependencies['PartialItemsService'],
111 private readonly actionExecutionsAdapter: IActionExecutionsAdapter,
112 private readonly contentApiRequestsAdapter: IContentApiRequestsAdapter,
113 private readonly meter: Dependencies['Meter'],
114 ) {
115 this.scylla = scylla;
116
117 this.selectStream = tracer.traced(
118 {
119 resource: 'scylla.client',
120 operation: 'scylla.selectStream',
121 attributesFromArgs(args) {
122 const { from, select, where, ...rest } = args[0];
123 return {
124 'query.from': from,
125 'query.select': jsonStringify(select),
126 'query.where': jsonStringify(where),
127 'query.otherOpts': jsonStringify(rest),
128 };
129 },
130 },
131 this.scylla.selectStream.bind(this.scylla),
132 );
133 this.select = tracer.traced(
134 {
135 resource: 'scylla.client',
136 operation: 'scylla.select',
137 attributesFromArgs(args) {
138 const { from, select, where, ...rest } = args[0];
139 return {
140 'query.from': from,
141 'query.select': jsonStringify(select),
142 'query.where': jsonStringify(where),
143 'query.otherOpts': jsonStringify(rest),
144 };
145 },
146 },
147 this.scylla.select.bind(this.scylla),
148 );
149 this.insert = tracer.traced(
150 {
151 resource: 'scylla.client',
152 operation: 'scylla.insert',
153 attributesFromArgs(args) {
154 const { into } = args[0];
155 return {
156 'query.into': into,
157 };
158 },
159 },
160 this.scylla.insert.bind(this.scylla),
161 );
162 }
163
164 async insertItem(
165 // The failure Reason would always be set to 'null'
166 // so we omit it from the argument type.
167 data: Omit<
168 ContentApiRequestLogEntry<false>,
169 'failureReason' | 'requestId'
170 > & {
171 itemSubmission: { submissionTime: Date };
172 requestId: CorrelationId<
173 | RuleExecutionCorrelationId
174 | ReportingRuleExecutionCorrelationId
175 | CorrelationId<'submit-appeal'>
176 >;
177 },
178 ): Promise<void> {
179 const item = data.itemSubmission;
180 const { itemType } = data.itemSubmission;
181
182 const createdAtFromSchema = getFieldValueForRole(
183 itemType.schema,
184 itemType.schemaFieldRoles,
185 'createdAt',
186 item.data,
187 );
188 const syntheticCreatedAt = createdAtFromSchema
189 ? new Date(createdAtFromSchema)
190 : item.submissionTime;
191
192 const [threadId, parentId] =
193 itemType.kind === 'CONTENT'
194 ? [
195 getFieldValueForRole(
196 itemType.schema,
197 itemType.schemaFieldRoles,
198 'threadId',
199 item.data,
200 ),
201 getFieldValueForRole(
202 itemType.schema,
203 itemType.schemaFieldRoles,
204 'parentId',
205 item.data,
206 ),
207 ]
208 : [];
209
210 const itemIdentifier = { id: item.itemId, typeId: itemType.id };
211 const syntheticThreadId = getSyntheticThreadId(itemIdentifier, threadId);
212
213 await this.insert({
214 into: 'item_submission_by_thread',
215 row: {
216 org_id: data.orgId,
217 request_id: fromCorrelationId(data.requestId),
218 submission_id: item.submissionId,
219 item_identifier: itemIdentifierToScyllaItemIdentifier(itemIdentifier),
220 item_type_name: itemType.name,
221 item_type_version: itemType.version,
222 item_creator_identifier: item.creator
223 ? itemIdentifierToScyllaItemIdentifier(item.creator)
224 : ScyllaNilItemIdentifier,
225 item_data: jsonStringify(item.data),
226 //TODO: create datestamp in submitItems route and use
227 // for write to both Scylla and the data warehouse
228 item_submission_time: new Date(),
229 item_synthetic_created_at: syntheticCreatedAt,
230 synthetic_thread_id: syntheticThreadId,
231 thread_identifier: threadId
232 ? itemIdentifierToScyllaItemIdentifier(threadId)
233 : ScyllaNilItemIdentifier,
234 parent_identifier: parentId
235 ? itemIdentifierToScyllaItemIdentifier(parentId)
236 : ScyllaNilItemIdentifier,
237 item_type_schema_field_roles: jsonStringify(itemType.schemaFieldRoles),
238 item_type_schema: jsonStringify(itemType.schema),
239 item_type_schema_variant: itemType.schemaVariant,
240 },
241 });
242
243 //TODO: if parent_id, read parent by id and update ttl
244 }
245
246 /**
247 * the returned object's fields (when all streams are closed)
248 * will have the following order:
249 *
250 * {
251 * parents: from child back to root
252 * subsequentItems: chronological order
253 * priorItems: chronological order
254 * }
255 */
256 getThreadSubmissionsByPosition(opts: {
257 orgId: string;
258 threadId: ItemIdentifier;
259 parentId: ItemIdentifier | null;
260 siblingsSplitAtDate: Date;
261 numPriorSiblings?: number;
262 numSubsequentSiblings?: number;
263 numParentLevels?: number;
264 oldestReturnedSubmissionDate?: Date;
265 latestSubmissionsOnly?: boolean;
266 }): {
267 parents: AsyncIterable<SubmissionsForItemWithTypeIdentifier>;
268 priorSiblings: AsyncIterable<SubmissionsForItemWithTypeIdentifier>;
269 subsequentSiblings: AsyncIterable<SubmissionsForItemWithTypeIdentifier>;
270 } {
271 const {
272 orgId,
273 threadId,
274 parentId,
275 numPriorSiblings = 5,
276 numSubsequentSiblings = 0,
277 numParentLevels = 3,
278 siblingsSplitAtDate,
279 latestSubmissionsOnly = true,
280 oldestReturnedSubmissionDate = new Date(
281 Date.now() - 1000 * 60 * 60 * 24 * 7,
282 ),
283 } = opts;
284
285 const syntheticThreadId = getSyntheticThreadId(threadId);
286 const parentIdentifier = parentId
287 ? itemIdentifierToScyllaItemIdentifier(parentId)
288 : ScyllaNilItemIdentifier;
289
290 return {
291 parents: mapAsyncIterable(
292 this.#getParentStream({
293 orgId,
294 syntheticThreadId,
295 parentIdentifier,
296 numParentLevels,
297 oldestReturnedSubmissionDate,
298 latestSubmissionsOnly,
299 }),
300 scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier,
301 ),
302 subsequentSiblings: mapAsyncIterable(
303 this.#getSiblingStream({
304 orgId,
305 syntheticThreadId,
306 parentIdentifier,
307 referenceTime: siblingsSplitAtDate,
308 itemLimit: numSubsequentSiblings,
309 siblingAge: 'younger',
310 latestSubmissionsOnly,
311 }),
312 scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier,
313 ),
314 priorSiblings: mapAsyncIterable(
315 this.#getSiblingStream({
316 orgId,
317 syntheticThreadId,
318 parentIdentifier,
319 referenceTime: siblingsSplitAtDate,
320 itemLimit: numPriorSiblings,
321 siblingAge: 'older',
322 }),
323 scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier,
324 ),
325 };
326 }
327
328 /**
329 * Retrieves n messages going backwards from a given time and optionally
330 * returns the ancestors of each retrieved item Useful for retrieving the last
331 * n messages in a conversation-like thread.
332 *
333 * The output contains items ordered reverse chronologically (most recent to
334 * oldest), each one bundled with an iterable of its parents, ordered from the
335 * leaf back to the root ancestor.
336 */
337 async *getThreadSubmissionsByTime(opts: {
338 orgId: string;
339 threadId: ItemIdentifier;
340 limit?: number;
341 numParentLevels?: number;
342 newestReturnedSubmissionDate?: Date;
343 oldestReturnedSubmissionDate?: Date;
344 latestSubmissionsOnly?: boolean;
345 }): AsyncIterable<{
346 latestSubmission: ItemSubmissionWithTypeIdentifier;
347 priorSubmissions?: ItemSubmissionWithTypeIdentifier[];
348 parents: AsyncIterable<SubmissionsForItemWithTypeIdentifier>;
349 }> {
350 const {
351 orgId,
352 threadId,
353 limit = 10,
354 numParentLevels = 1,
355 oldestReturnedSubmissionDate = new Date(Date.now() - MONTH_MS * 3),
356 newestReturnedSubmissionDate = new Date(),
357 latestSubmissionsOnly = true,
358 } = opts;
359
360 const syntheticThreadId = getSyntheticThreadId(threadId);
361 /**
362 * The `emptyQueryResult` variable lets us distinguish between the two
363 * cases which cause the loop to yield no results:
364 * 1. The datastore returned items which were for some reason filtered
365 * out, leaving fewer items to return than the `limit` option
366 * specified. In this case we want to perform another query looking
367 * further back in time to find submissions that can be returned
368 * to the client.
369 * 2. The datastore did not have any results at all, in which case
370 * subsequent queries will also not produce any results and the
371 * search should halt
372 */
373 let emptyQueryResult = true;
374 let returnedItemCount = 0;
375 // Add a millisecond so that the startingi date is inclusive in the "<"
376 // query
377 let startingDate = new Date(newestReturnedSubmissionDate.getTime() + 1);
378 const now = Date.now();
379
380 while (
381 returnedItemCount < limit &&
382 startingDate > oldestReturnedSubmissionDate
383 ) {
384 const threadSubmissions = this.selectStream({
385 from: 'item_submission_by_thread_and_time',
386 select: '*',
387 where: [
388 ['org_id', '=', orgId],
389 ['synthetic_thread_id', '=', syntheticThreadId],
390 // we don't want tor return the item that triggered the search,
391 // so the query shohuld not be inclusive of the startingDate
392 // using `<` instead of `<=` also prevents searching infinitely
393 // for a date which only returns a threadItem, which gets filtered out
394 // of the result set
395 ['item_synthetic_created_at', '<', startingDate],
396 ['item_synthetic_created_at', '>=', oldestReturnedSubmissionDate],
397 ],
398 limit: Math.floor(
399 (limit - returnedItemCount) * ARTIFICIAL_LIMIT_MULTIPLIER,
400 ),
401 // the materialized view is ordered this way,
402 // but this makes the query more explicit
403 sortOrder: [['item_synthetic_created_at', 'DESC']],
404 });
405
406 const submissionGroups = chunkAsyncIterableByKey(
407 threadSubmissions,
408 (it: ScyllaItemSubmissionsRow) => jsonStringify(it.item_identifier),
409 );
410
411 const scyllaThreadId = itemIdentifierToScyllaItemIdentifier(threadId);
412 for await (const submissionsForItem of submissionGroups) {
413 const { latestSubmission, priorSubmissions } =
414 partitionLatestAndPriorSubmissions(submissionsForItem);
415
416 // Counting old records to find out how often we would be missing data
417 // if we reduced the Scylla TTL to 14 days
418 const difference = Math.abs(
419 now - latestSubmission.item_submission_time.getTime(),
420 );
421 const daysOld = Math.floor(difference / DAY_MS);
422 this.meter.scyllaRecordAgeHistogram.record(daysOld);
423
424 // if the item identifier is identical to the thread identifier, then
425 // the thread item itself is present in the scylla partition for this thread
426 // but we don't want to return it to the client since it is not a logically
427 // valid item within the thread (itself).
428 // So if we find an identical item_id/thread_id pair we don't yield anything
429 // to the stream and move on with the search
430 if (!_.isEqual(latestSubmission.item_identifier, scyllaThreadId)) {
431 const parents = isRealItemIdentifier(
432 latestSubmission.parent_identifier,
433 )
434 ? mapAsyncIterable(
435 this.#getParentStream({
436 orgId,
437 syntheticThreadId,
438 parentIdentifier: latestSubmission.parent_identifier,
439 numParentLevels,
440 oldestReturnedSubmissionDate,
441 latestSubmissionsOnly,
442 }),
443 scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier,
444 )
445 : getEmptyAsyncIterable<SubmissionsForItemWithTypeIdentifier>();
446
447 yield {
448 ...scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier({
449 latestSubmission,
450 priorSubmissions: latestSubmissionsOnly
451 ? undefined
452 : priorSubmissions,
453 }),
454 parents,
455 };
456
457 // manually track how many distinct items have been returned
458 returnedItemCount++;
459 }
460 emptyQueryResult = false;
461 if (returnedItemCount >= limit) {
462 break;
463 }
464 // set the new date to work backwards from
465 startingDate = latestSubmission.item_synthetic_created_at;
466 }
467 if (emptyQueryResult) {
468 break;
469 }
470 emptyQueryResult = true;
471 }
472 }
473
474 /**
475 * NB: in this function, getting priorSubmissions is best effort. You aren't
476 * guaranteed to get all prior submissions, you may receive some or none of
477 * them, even if they exist.
478 *
479 * NB: Right now, we check the partial items endpoint before checking
480 * the data warehouse. This is likely fine for now, but the partial items endpoint
481 * could return an entirely different submission (if the data has been mutated
482 * on their side), which could lead to unexpected behavior. It's fine to keep
483 * for now but we should keep an eye out for bugs that could stem from this.
484 */
485 async getItemByIdentifier(opts: {
486 orgId: string;
487 itemIdentifier: ItemIdentifier;
488 latestSubmissionOnly?: boolean;
489 signal?: AbortSignal;
490 }): Promise<SubmissionsForItemWithTypeIdentifier | null> {
491 const { orgId, itemIdentifier, latestSubmissionOnly = true, signal } = opts;
492
493 // Attempt #1: Pull the item from scylla
494 // If this fails for any reason, just coerce the error to an empty result
495 // (in `.catch()`) so that we'll move on to checking the next fallback source.
496 const queryResults = await this.select({
497 from: 'item_submission_by_thread',
498 select: '*',
499 where: [
500 [
501 'item_identifier',
502 '=',
503 itemIdentifierToScyllaItemIdentifier(itemIdentifier),
504 ],
505 ],
506 }).catch((_) => []);
507
508 if (queryResults.length) {
509 const { latestSubmission, priorSubmissions } =
510 partitionLatestAndPriorSubmissions(queryResults);
511
512 return scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier({
513 latestSubmission,
514 priorSubmissions: latestSubmissionOnly ? undefined : priorSubmissions,
515 });
516 }
517
518 signal?.throwIfAborted();
519
520 // Attempt #2: Check if there's a partial items endpoint, and if there is,
521 // attempt to fetch the item's data from there. Most users won't have a
522 // partial items endpoint, so this should only apply in a handful of cases
523 // If this fails for any reason, just coerce the error to an empty array so
524 // that we'll move on to trying the data warehouse.
525 const partialItemsResult = await this.partialItemsEndpoint
526 .getPartialItems(orgId, [itemIdentifier])
527 .catch((_e) => []);
528
529 if (partialItemsResult.length > 0) {
530 const itemFromPartialItemsEndpoint = partialItemsResult[0];
531
532 return {
533 latestSubmission: itemSubmissionToItemSubmissionWithTypeIdentifier(
534 itemFromPartialItemsEndpoint,
535 ),
536 priorSubmissions: latestSubmissionOnly ? undefined : [],
537 };
538 }
539
540 signal?.throwIfAborted();
541
542 // Attempt #3: Fetch the item from the data warehouse via the content API adapter.
543 const records =
544 await this.contentApiRequestsAdapter.getSuccessfulRequestsForItem(
545 orgId,
546 itemIdentifier,
547 {
548 latestOnly: latestSubmissionOnly,
549 lookbackWindowMs: 6 * MONTH_MS,
550 },
551 );
552
553 if (records.length > 0) {
554 const toItemSubmissionWithTypeIdentifier = (
555 record: ContentApiRequestRecord,
556 ) => {
557 const submissionId = record.submissionId as SubmissionId;
558 const itemData = record.itemData as JsonOf<NormalizedItemData>;
559 const schemaVariant = record.itemTypeSchemaVariant as
560 | 'original'
561 | 'partial';
562
563 return dbRowToItemSubmissionWithItemTypeIdentifier({
564 submission_id: submissionId,
565 item_identifier: {
566 id: tryParseNonEmptyString(itemIdentifier.id),
567 type_id: tryParseNonEmptyString(itemIdentifier.typeId),
568 },
569 item_type_version: record.itemTypeVersion,
570 item_creator_identifier:
571 record.itemCreatorId && record.itemCreatorTypeId
572 ? {
573 id: tryParseNonEmptyString(record.itemCreatorId),
574 type_id: tryParseNonEmptyString(record.itemCreatorTypeId),
575 }
576 : ({ id: '', type_id: '' } as const),
577 item_data: itemData,
578 item_submission_time: record.occurredAt,
579 item_type_schema_variant: schemaVariant,
580 });
581 };
582
583 const [latestRecord, ...priorRecords] = records;
584
585 return {
586 latestSubmission: toItemSubmissionWithTypeIdentifier(latestRecord),
587 priorSubmissions: latestSubmissionOnly
588 ? undefined
589 : priorRecords.map(toItemSubmissionWithTypeIdentifier),
590 };
591 }
592
593 return null;
594 }
595
596 getAncestorItems(opts: {
597 orgId: string;
598 itemIdentifier: ItemIdentifier;
599 numParentLevels: number;
600 oldestReturnedSubmissionDate?: Date;
601 latestSubmissionsOnly?: boolean;
602 }): AsyncIterable<SubmissionsForItemWithTypeIdentifier> {
603 return mapAsyncIterable(
604 this.getAncestorItemStream(opts),
605 scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier,
606 );
607 }
608
609 /**
610 * This differs from the `getParentStream` method in that it doesn't ensure
611 * items are part of the same thread, to support parent-child relationships
612 * that don't have a connecting thread ID.
613 * This method returns an async iterable of a given item's ancestors, i.e.
614 * its parent, that posts parent, etc
615 */
616 async *getAncestorItemStream(opts: {
617 orgId: string;
618 itemIdentifier: ItemIdentifier;
619 numParentLevels: number;
620 oldestReturnedSubmissionDate?: Date;
621 latestSubmissionsOnly?: boolean;
622 }): AsyncIterable<{
623 latestSubmission: ScyllaItemSubmissionsRow;
624 priorSubmissions?: ScyllaItemSubmissionsRow[];
625 }> {
626 const {
627 orgId,
628 numParentLevels,
629 oldestReturnedSubmissionDate = new Date(
630 Date.now() - 1000 * 60 * 60 * 24 * 7,
631 ),
632 latestSubmissionsOnly = true,
633 itemIdentifier,
634 } = opts;
635 // This is the given item, so we should not yield the first item found in
636 // the results
637 let currentParent: ScyllaItemIdentifier =
638 itemIdentifierToScyllaItemIdentifier(itemIdentifier);
639
640 for (
641 let i = 0;
642 i < numParentLevels + 1 && isRealItemIdentifier(currentParent);
643 i++
644 ) {
645 const potentialParents = await this.select({
646 select: '*',
647 from: 'item_submission_by_thread',
648 where: [
649 // using the GSI on item_identifier, we can't also constrain by
650 // synthetic_created_at or org_id :(
651 ['item_identifier', '=', currentParent],
652 ],
653 });
654
655 // This first filters out any parents that are somehow part of a different
656 // org. Then splits the submissions between
657 // the most recent submission of that item and all its prior submissions
658 // because the GSI we are using disallows constraining the
659 // synthetic_created_at column, we also use this filter to filter out
660 // submissions older than the oldestReturnedSubmissionDate
661 const parentSubmissions = potentialParents.filter(
662 (row) =>
663 row.item_synthetic_created_at >= oldestReturnedSubmissionDate &&
664 row.org_id === orgId,
665 );
666
667 const { latestSubmission, priorSubmissions } =
668 partitionLatestAndPriorSubmissions(parentSubmissions);
669
670 // the no-unnecessary condition check assumes that all array accesses are
671 // valid since the type of the array does not include `| undefined` but
672 // out of bounds accesses in js allow this behavior. So while TS thinks
673 // the parentItemRow will always be of type `ScyllaItemSubmissionsRow` (or similar), it
674 // is possible that it doesn't exist and we want to prevent the generator
675 // from yielding `undefined`
676 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
677 if (latestSubmission) {
678 currentParent = latestSubmission.parent_identifier;
679 if (i === 0) {
680 // to avoid yielding the given item as its own parent, don't yield on
681 // the first iteration
682 continue;
683 }
684 yield {
685 latestSubmission,
686 priorSubmissions: latestSubmissionsOnly
687 ? undefined
688 : priorSubmissions,
689 };
690 } else {
691 break;
692 }
693 }
694 }
695
696 async *#getParentStream(opts: {
697 orgId: string;
698 parentIdentifier: ScyllaItemIdentifier;
699 syntheticThreadId: string;
700 numParentLevels: number;
701 oldestReturnedSubmissionDate: Date;
702 latestSubmissionsOnly?: boolean;
703 }): AsyncIterable<{
704 latestSubmission: ScyllaItemSubmissionsRow;
705 priorSubmissions?: ScyllaItemSubmissionsRow[];
706 }> {
707 const {
708 orgId,
709 parentIdentifier,
710 syntheticThreadId,
711 numParentLevels,
712 oldestReturnedSubmissionDate,
713 latestSubmissionsOnly = true,
714 } = opts;
715 let currentParent = parentIdentifier;
716
717 for (
718 let i = 0;
719 i < numParentLevels && isRealItemIdentifier(currentParent);
720 i++
721 ) {
722 const potentialParents = await this.select({
723 select: '*',
724 from: 'item_submission_by_thread',
725 where: [
726 // using the GSI on item_identifier, we can't also constrain by
727 // synthetic_created_at or org_id :(
728 ['item_identifier', '=', currentParent],
729 ],
730 });
731
732 // This first filters out any parents that are somehow part of a different
733 // thread, or from a different org. Then splits the submissions between
734 // the most recent submission of that item and all its prior submissions
735 // because the GSI we are using disallows constraining the
736 // synthetic_created_at column, we also use this filter to filter out
737 // submissions older than the oldestReturnedSubmissionDate
738 //
739 // TODO: Peter/Nick , we should consider item type relationships
740 // where there is a parent-child relationship but no connecting
741 // thread ID, maybe that can be handled with a different query
742 // specifically for ancestors of a given item
743 const parentSubmissions = potentialParents.filter(
744 (row) =>
745 row.item_synthetic_created_at >= oldestReturnedSubmissionDate &&
746 row.synthetic_thread_id === syntheticThreadId &&
747 row.org_id === orgId,
748 );
749
750 const { latestSubmission, priorSubmissions } =
751 partitionLatestAndPriorSubmissions(parentSubmissions);
752
753 // the no-unnecessary condition check assumes that all array accesses are
754 // valid since the type of the array does not include `| undefined` but
755 // out of bounds accesses in js allow this behavior. So while TS thinks
756 // the parentItemRow will always be of type `ScyllaItemSubmissionsRow` (or similar), it
757 // is possible that it doesn't exist and we want to prevent the generator
758 // from yielding `undefined`
759 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
760 if (latestSubmission) {
761 yield {
762 latestSubmission,
763 priorSubmissions: latestSubmissionsOnly
764 ? undefined
765 : priorSubmissions,
766 };
767 currentParent = latestSubmission.parent_identifier;
768 } else {
769 break;
770 }
771 }
772 //TODO: Fallback to the data warehouse for parents not found
773 // in the initial Scylla query
774 }
775
776 async *#getSiblingStream(opts: {
777 orgId: string;
778 syntheticThreadId: string;
779 parentIdentifier: ScyllaItemIdentifier;
780 referenceTime: Date;
781 siblingAge: 'older' | 'younger';
782 itemLimit: number;
783 latestSubmissionsOnly?: boolean;
784 }): AsyncIterable<{
785 latestSubmission: ScyllaItemSubmissionsRow;
786 priorSubmissions?: ScyllaItemSubmissionsRow[];
787 }> {
788 const {
789 orgId,
790 syntheticThreadId,
791 parentIdentifier,
792 referenceTime,
793 itemLimit,
794 siblingAge,
795 latestSubmissionsOnly = true,
796 } = opts;
797
798 if (itemLimit <= 0) {
799 return getEmptyAsyncIterable();
800 }
801
802 let returnedItemCount = 0;
803 let emptyQueryResult = true;
804 let searchStartDate = siblingAge === 'older' ? referenceTime : new Date();
805 while (returnedItemCount < itemLimit) {
806 const stream = this.selectStream({
807 select: '*',
808 from: 'item_submission_by_thread',
809 where: [
810 ['org_id', '=', orgId],
811 ['synthetic_thread_id', '=', syntheticThreadId],
812 ['parent_identifier', '=', parentIdentifier],
813 [
814 'item_synthetic_created_at',
815 siblingAge === 'older' ? '<' : '>',
816 referenceTime,
817 ],
818 // Time must be compared to the option provided, referenceTime,
819 // and the internal searchStartDate
820 ['item_synthetic_created_at', '<', searchStartDate],
821 ],
822 limit: Math.floor(itemLimit * ARTIFICIAL_LIMIT_MULTIPLIER),
823 });
824
825 const submissionGroups = chunkAsyncIterableByKey(stream, (it) =>
826 jsonStringify(it.item_identifier),
827 );
828
829 for await (const submissionsForItem of submissionGroups) {
830 const { latestSubmission, priorSubmissions } =
831 partitionLatestAndPriorSubmissions(submissionsForItem);
832
833 yield {
834 latestSubmission,
835 priorSubmissions: latestSubmissionsOnly
836 ? undefined
837 : priorSubmissions,
838 };
839
840 returnedItemCount++;
841 emptyQueryResult = false;
842 if (returnedItemCount >= itemLimit) {
843 break;
844 }
845 searchStartDate = latestSubmission.item_synthetic_created_at;
846 }
847 if (emptyQueryResult) {
848 break;
849 }
850 emptyQueryResult = true;
851 }
852 }
853
854 /**
855 * Items are returned in reverse chronological order starting at the
856 * present moment and going back in time until the limit is reached,
857 * the oldestReturnedSubmissionDate is passed, or no more items are
858 * found in any of the available datastores
859 */
860 async *getItemSubmissionsByCreator(opts: {
861 orgId: string;
862 itemCreatorIdentifier: ItemIdentifier;
863 limit?: number | ReturnUnlimitedResultsAndPotentiallyHangDb;
864 oldestReturnedSubmissionDate?: Date;
865 earliestReturnedSubmissionDate?: Date;
866 latestSubmissionsOnly?: boolean;
867 }): AsyncIterable<SubmissionsForItemWithTypeIdentifier> {
868 const {
869 orgId,
870 itemCreatorIdentifier,
871 limit = 100,
872 oldestReturnedSubmissionDate = new Date(
873 Date.now() - 30 * 24 * 60 * 60 * 1000,
874 ),
875 earliestReturnedSubmissionDate = new Date(),
876 latestSubmissionsOnly = true,
877 } = opts;
878
879 let returnedItemCount = 0;
880 let emptyQueryResult = true;
881 let searchStartDate = earliestReturnedSubmissionDate;
882 const now = Date.now();
883 while (
884 limit === RETURN_UNLIMITED_RESULTS_AND_POTENTIALLY_HANG_DB ||
885 returnedItemCount < limit
886 ) {
887 const stream = this.selectStream({
888 from: 'item_submission_by_creator',
889 select: '*',
890 where: [
891 ['org_id', '=', orgId],
892 [
893 'item_creator_identifier',
894 '=',
895 itemIdentifierToScyllaItemIdentifier(itemCreatorIdentifier),
896 ],
897 ['item_synthetic_created_at', '<', searchStartDate],
898 ['item_synthetic_created_at', '>', oldestReturnedSubmissionDate],
899 ],
900 limit:
901 limit === RETURN_UNLIMITED_RESULTS_AND_POTENTIALLY_HANG_DB
902 ? undefined
903 : Math.floor(
904 (limit - returnedItemCount) * ARTIFICIAL_LIMIT_MULTIPLIER,
905 ),
906 sortOrder: [['item_synthetic_created_at', 'DESC']],
907 });
908
909 const groupedStream = chunkAsyncIterableByKey(
910 stream,
911 (it: ScyllaItemSubmissionsRow) => jsonStringify(it.item_identifier),
912 );
913
914 for await (const itemGroup of groupedStream) {
915 const { latestSubmission, priorSubmissions } =
916 partitionLatestAndPriorSubmissions(itemGroup);
917 // Counting old records to find out how often we would be missing data
918 // if we reduced the Scylla TTL to 14 days
919 const difference = Math.abs(
920 now - latestSubmission.item_submission_time.getTime(),
921 );
922 const daysOld = Math.floor(difference / DAY_MS);
923 this.meter.scyllaRecordAgeHistogram.record(daysOld);
924 yield {
925 latestSubmission:
926 dbRowToItemSubmissionWithItemTypeIdentifier(latestSubmission),
927 priorSubmissions: latestSubmissionsOnly
928 ? undefined
929 : priorSubmissions.map(dbRowToItemSubmissionWithItemTypeIdentifier),
930 };
931 // manually track how many distinct items have been returned
932 returnedItemCount++;
933 emptyQueryResult = false;
934 if (
935 limit !== RETURN_UNLIMITED_RESULTS_AND_POTENTIALLY_HANG_DB &&
936 returnedItemCount >= limit
937 ) {
938 break;
939 }
940 searchStartDate = latestSubmission.item_synthetic_created_at;
941 }
942 if (emptyQueryResult) {
943 break;
944 }
945 emptyQueryResult = true;
946 }
947 }
948
949 async #updateItemSubmissionTTL(_opts: {
950 org_id: string;
951 itemID: ItemIdentifier;
952 submissionID: string;
953 }) {
954 // find by ID
955 // INSERT all rows WITH TTL 30 days
956 throw new Error('Not Implemented');
957 }
958
959 async getItemActionHistory(opts: {
960 orgId: string;
961 itemId: string;
962 itemTypeId: string;
963 itemSubmissionTime: Date | undefined;
964 }) {
965 const { itemId, itemTypeId, orgId, itemSubmissionTime } = opts;
966
967 const records = await this.actionExecutionsAdapter.getItemActionHistory({
968 orgId,
969 itemId,
970 itemTypeId,
971 itemSubmissionTime,
972 });
973
974 return filterNullOrUndefined(
975 records.map((record) => {
976 if (!record.itemId || !record.itemTypeId) {
977 return undefined;
978 }
979
980 return {
981 actionId: record.actionId,
982 itemId: record.itemId,
983 itemTypeId: record.itemTypeId,
984 actorId: record.actorId ?? undefined,
985 jobId: record.jobId ?? undefined,
986 itemCreatorId: record.userId ?? undefined,
987 itemCreatorTypeId: record.userTypeId ?? undefined,
988 policies: record.policies,
989 ruleIds: record.ruleIds,
990 ts: record.occurredAt,
991 };
992 }),
993 );
994 }
995}
996
997function scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier(it: {
998 latestSubmission: ScyllaItemSubmissionsRow;
999 priorSubmissions?: ScyllaItemSubmissionsRow[];
1000}) {
1001 return {
1002 latestSubmission: dbRowToItemSubmissionWithItemTypeIdentifier(
1003 it.latestSubmission,
1004 ),
1005 priorSubmissions: it.priorSubmissions?.map(
1006 dbRowToItemSubmissionWithItemTypeIdentifier,
1007 ),
1008 };
1009}