Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 1009 lines 36 kB view raw
/* eslint-disable max-lines */
import type { ItemIdentifier } from '@roostorg/types';
import _ from 'lodash';

import { type Dependencies } from '../../iocContainer/index.js';
import {
  isRealItemIdentifier,
  itemIdentifierToScyllaItemIdentifier,
  ScyllaNilItemIdentifier,
  type Scylla,
  type ScyllaItemIdentifier,
} from '../../scylla/index.js';
import type { ContentApiRequestLogEntry } from '../analyticsLoggers/ContentApiLogger.js';
import { type RuleExecutionCorrelationId } from '../analyticsLoggers/ruleExecutionLoggingUtils.js';
import { filterNullOrUndefined } from '../../utils/collections.js';
import {
  fromCorrelationId,
  type CorrelationId,
} from '../../utils/correlationIds.js';
import { jsonStringify, type JsonOf } from '../../utils/encoding.js';
import {
  chunkAsyncIterableByKey,
  mapAsyncIterable,
} from '../../utils/iterables.js';
import { DAY_MS, MONTH_MS } from '../../utils/time.js';
import { tryParseNonEmptyString } from '../../utils/typescript-types.js';
import {
  getFieldValueForRole,
  itemSubmissionToItemSubmissionWithTypeIdentifier,
  type ItemSubmissionWithTypeIdentifier,
  type NormalizedItemData,
} from '../itemProcessingService/index.js';
import { type ReportingRuleExecutionCorrelationId } from '../reportingService/index.js';
import {
  type ScyllaItemSubmissionsRow,
  type ScyllaRelations,
} from './dbTypes.js';
import {
  RETURN_UNLIMITED_RESULTS_AND_POTENTIALLY_HANG_DB,
  type ReturnUnlimitedResultsAndPotentiallyHangDb,
} from './itemInvestigationServiceAdapter.js';
import {
  dbRowToItemSubmissionWithItemTypeIdentifier,
  getEmptyAsyncIterable,
  getSyntheticThreadId,
  partitionLatestAndPriorSubmissions,
} from './utils.js';
import { type IActionExecutionsAdapter } from '../../plugins/warehouse/queries/IActionExecutionsAdapter.js';
import {
  type ContentApiRequestRecord,
  type IContentApiRequestsAdapter,
} from '../../plugins/warehouse/queries/IContentApiRequestsAdapter.js';
import { type SubmissionId } from '../itemProcessingService/makeItemSubmission.js';

/**
 * The ItemInvestigationService API exposes `limit` parameters in almost all of
 * its data retrieval methods. `limit` refers to the number of unique items the
 * consumer would like to receive. Because the underlying data stores hold item
 * submissions, of which there can be multiple per item, the service often
 * needs to retrieve more than `limit` rows from the data stores and reduce
 * them down to a set of unique items before returning to the consumer, which
 * often necessitates issuing multiple queries.
 *
 * This ARTIFICIAL_LIMIT_MULTIPLIER is used to minimize the number of queries
 * the service dispatches to the databases by inflating the `limit` we provide
 * in queries by our observed ratio of
 * ((Item Submissions) / (Unique Item Identifiers)).
 */
const ARTIFICIAL_LIMIT_MULTIPLIER = 1;

/**
 * For all methods exposed by this class, the consumers are likely calling with
 * "Items" in mind, meaning submissions with unique ItemIdentifiers. The
 * datastores that support this service deal with Item Submissions, which are
 * many-to-one with Items, i.e. one item can be submitted multiple times. This
 * service's API accounts for this by providing a distinction between the
 * latest submission of an item and all the prior submissions, and in all
 * streams emits an object with those two properties to represent that, i.e.
 * {
 *   latestSubmission: ItemSubmissionWithTypeIdentifier,
 *   priorSubmissions?: ItemSubmissionWithTypeIdentifier[],
 * }
 *
 * The Service makes a best effort to de-duplicate submissions and represent
 * them in the above format. However, due to the implementation there is still
 * a possibility that a submission will be represented twice in the output
 * stream. This occurs when the service has to make multiple queries to the
 * datastores to fulfill a request, and the full set of submissions that make
 * up an item's submission history ends up split across the query boundary.
 *
 * The first explanation of this was left in a github comment:
 * https://github.com/roostorg/coop/pull/1605#issuecomment-1721584595
 *
 * and there is a JIRA task for the fix:
 * https://coop.atlassian.net/browse/COOP-1291
 */
export type SubmissionsForItemWithTypeIdentifier = {
  latestSubmission: ItemSubmissionWithTypeIdentifier;
  priorSubmissions?: ItemSubmissionWithTypeIdentifier[];
};

export class ItemInvestigationService {
  private readonly scylla: Scylla<ScyllaRelations>;
  private readonly selectStream: Scylla<ScyllaRelations>['selectStream'];
  private readonly select: Scylla<ScyllaRelations>['select'];
  private readonly insert: Scylla<ScyllaRelations>['insert'];

  /**
   * Wraps the Scylla client's `selectStream`, `select` and `insert` methods
   * with the tracer so every query issued by this service is recorded with
   * its target relation and (for reads) its query options.
   */
  constructor(
    scylla: Scylla<ScyllaRelations>,
    tracer: Dependencies['Tracer'],
    private readonly partialItemsEndpoint: Dependencies['PartialItemsService'],
    private readonly actionExecutionsAdapter: IActionExecutionsAdapter,
    private readonly contentApiRequestsAdapter: IContentApiRequestsAdapter,
    private readonly meter: Dependencies['Meter'],
  ) {
    this.scylla = scylla;

    this.selectStream = tracer.traced(
      {
        resource: 'scylla.client',
        operation: 'scylla.selectStream',
        attributesFromArgs(args) {
          const { from, select, where, ...rest } = args[0];
          return {
            'query.from': from,
            'query.select': jsonStringify(select),
            'query.where': jsonStringify(where),
            'query.otherOpts': jsonStringify(rest),
          };
        },
      },
      this.scylla.selectStream.bind(this.scylla),
    );
    this.select = tracer.traced(
      {
        resource: 'scylla.client',
        operation: 'scylla.select',
        attributesFromArgs(args) {
          const { from, select, where, ...rest } = args[0];
          return {
            'query.from': from,
            'query.select': jsonStringify(select),
            'query.where': jsonStringify(where),
            'query.otherOpts': jsonStringify(rest),
          };
        },
      },
      this.scylla.select.bind(this.scylla),
    );
    this.insert = tracer.traced(
      {
        resource: 'scylla.client',
        operation: 'scylla.insert',
        attributesFromArgs(args) {
          const { into } = args[0];
          return {
            'query.into': into,
          };
        },
      },
      this.scylla.insert.bind(this.scylla),
    );
  }

  /**
   * Writes a single item submission row into `item_submission_by_thread`.
   *
   * - `item_synthetic_created_at` is the value of the schema field playing the
   *   `createdAt` role when present, otherwise the submission time.
   * - Thread and parent identifiers are only looked up for `CONTENT` item
   *   types; for every other kind they are stored as the nil identifier.
   * - `item_submission_time` is currently the time of the insert (see TODO
   *   below), not `data.itemSubmission.submissionTime`.
   */
  async insertItem(
    // The failure Reason would always be set to 'null'
    // so we omit it from the argument type.
    data: Omit<
      ContentApiRequestLogEntry<false>,
      'failureReason' | 'requestId'
    > & {
      itemSubmission: { submissionTime: Date };
      requestId: CorrelationId<
        | RuleExecutionCorrelationId
        | ReportingRuleExecutionCorrelationId
        | CorrelationId<'submit-appeal'>
      >;
    },
  ): Promise<void> {
    const item = data.itemSubmission;
    const { itemType } = data.itemSubmission;

    const createdAtFromSchema = getFieldValueForRole(
      itemType.schema,
      itemType.schemaFieldRoles,
      'createdAt',
      item.data,
    );
    const syntheticCreatedAt = createdAtFromSchema
      ? new Date(createdAtFromSchema)
      : item.submissionTime;

    const [threadId, parentId] =
      itemType.kind === 'CONTENT'
        ? [
            getFieldValueForRole(
              itemType.schema,
              itemType.schemaFieldRoles,
              'threadId',
              item.data,
            ),
            getFieldValueForRole(
              itemType.schema,
              itemType.schemaFieldRoles,
              'parentId',
              item.data,
            ),
          ]
        : [];

    const itemIdentifier = { id: item.itemId, typeId: itemType.id };
    const syntheticThreadId = getSyntheticThreadId(itemIdentifier, threadId);

    await this.insert({
      into: 'item_submission_by_thread',
      row: {
        org_id: data.orgId,
        request_id: fromCorrelationId(data.requestId),
        submission_id: item.submissionId,
        item_identifier: itemIdentifierToScyllaItemIdentifier(itemIdentifier),
        item_type_name: itemType.name,
        item_type_version: itemType.version,
        item_creator_identifier: item.creator
          ? itemIdentifierToScyllaItemIdentifier(item.creator)
          : ScyllaNilItemIdentifier,
        item_data: jsonStringify(item.data),
        //TODO: create datestamp in submitItems route and use
        // for write to both Scylla and the data warehouse
        item_submission_time: new Date(),
        item_synthetic_created_at: syntheticCreatedAt,
        synthetic_thread_id: syntheticThreadId,
        thread_identifier: threadId
          ? itemIdentifierToScyllaItemIdentifier(threadId)
          : ScyllaNilItemIdentifier,
        parent_identifier: parentId
          ? itemIdentifierToScyllaItemIdentifier(parentId)
          : ScyllaNilItemIdentifier,
        item_type_schema_field_roles: jsonStringify(itemType.schemaFieldRoles),
        item_type_schema: jsonStringify(itemType.schema),
        item_type_schema_variant: itemType.schemaVariant,
      },
    });

    //TODO: if parent_id, read parent by id and update ttl
  }

  /**
   * Returns three lazily-evaluated streams describing an item's surroundings
   * within a thread:
   *
   * {
   *   parents: ancestors of `parentId`, from the direct parent back towards
   *     the root, limited to `numParentLevels`
   *   priorSiblings: items sharing `parentId` created before
   *     `siblingsSplitAtDate`, limited to `numPriorSiblings`
   *   subsequentSiblings: items sharing `parentId` created after
   *     `siblingsSplitAtDate`, limited to `numSubsequentSiblings`
   * }
   *
   * `latestSubmissionsOnly` (default `true`) applies to all three streams;
   * when `false`, each emitted item also carries its `priorSubmissions`.
   * `oldestReturnedSubmissionDate` (default: 7 days ago) only bounds the
   * `parents` stream.
   */
  getThreadSubmissionsByPosition(opts: {
    orgId: string;
    threadId: ItemIdentifier;
    parentId: ItemIdentifier | null;
    siblingsSplitAtDate: Date;
    numPriorSiblings?: number;
    numSubsequentSiblings?: number;
    numParentLevels?: number;
    oldestReturnedSubmissionDate?: Date;
    latestSubmissionsOnly?: boolean;
  }): {
    parents: AsyncIterable<SubmissionsForItemWithTypeIdentifier>;
    priorSiblings: AsyncIterable<SubmissionsForItemWithTypeIdentifier>;
    subsequentSiblings: AsyncIterable<SubmissionsForItemWithTypeIdentifier>;
  } {
    const {
      orgId,
      threadId,
      parentId,
      numPriorSiblings = 5,
      numSubsequentSiblings = 0,
      numParentLevels = 3,
      siblingsSplitAtDate,
      latestSubmissionsOnly = true,
      oldestReturnedSubmissionDate = new Date(
        Date.now() - 1000 * 60 * 60 * 24 * 7,
      ),
    } = opts;

    const syntheticThreadId = getSyntheticThreadId(threadId);
    const parentIdentifier = parentId
      ? itemIdentifierToScyllaItemIdentifier(parentId)
      : ScyllaNilItemIdentifier;

    return {
      parents: mapAsyncIterable(
        this.#getParentStream({
          orgId,
          syntheticThreadId,
          parentIdentifier,
          numParentLevels,
          oldestReturnedSubmissionDate,
          latestSubmissionsOnly,
        }),
        scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier,
      ),
      subsequentSiblings: mapAsyncIterable(
        this.#getSiblingStream({
          orgId,
          syntheticThreadId,
          parentIdentifier,
          referenceTime: siblingsSplitAtDate,
          itemLimit: numSubsequentSiblings,
          siblingAge: 'younger',
          latestSubmissionsOnly,
        }),
        scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier,
      ),
      priorSiblings: mapAsyncIterable(
        this.#getSiblingStream({
          orgId,
          syntheticThreadId,
          parentIdentifier,
          referenceTime: siblingsSplitAtDate,
          itemLimit: numPriorSiblings,
          siblingAge: 'older',
          // Forward the caller's choice so prior siblings behave the same as
          // subsequent siblings and parents; omitting it silently forced the
          // stream's own default of `true`.
          latestSubmissionsOnly,
        }),
        scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier,
      ),
    };
  }

  /**
   * Retrieves n messages going backwards from a given time and optionally
   * returns the ancestors of each retrieved item. Useful for retrieving the
   * last n messages in a conversation-like thread.
   *
   * The output contains items ordered reverse chronologically (most recent to
   * oldest), each one bundled with an iterable of its parents, ordered from
   * the leaf back to the root ancestor.
   */
  async *getThreadSubmissionsByTime(opts: {
    orgId: string;
    threadId: ItemIdentifier;
    limit?: number;
    numParentLevels?: number;
    newestReturnedSubmissionDate?: Date;
    oldestReturnedSubmissionDate?: Date;
    latestSubmissionsOnly?: boolean;
  }): AsyncIterable<{
    latestSubmission: ItemSubmissionWithTypeIdentifier;
    priorSubmissions?: ItemSubmissionWithTypeIdentifier[];
    parents: AsyncIterable<SubmissionsForItemWithTypeIdentifier>;
  }> {
    const {
      orgId,
      threadId,
      limit = 10,
      numParentLevels = 1,
      oldestReturnedSubmissionDate = new Date(Date.now() - MONTH_MS * 3),
      newestReturnedSubmissionDate = new Date(),
      latestSubmissionsOnly = true,
    } = opts;

    const syntheticThreadId = getSyntheticThreadId(threadId);
    /**
     * The `emptyQueryResult` variable lets us distinguish between the two
     * cases which cause the loop to yield no results:
     *  1. The datastore returned items which were for some reason filtered
     *     out, leaving fewer items to return than the `limit` option
     *     specified. In this case we want to perform another query looking
     *     further back in time to find submissions that can be returned
     *     to the client.
     *  2. The datastore did not have any results at all, in which case
     *     subsequent queries will also not produce any results and the
     *     search should halt.
     */
    let emptyQueryResult = true;
    let returnedItemCount = 0;
    // Add a millisecond so that the starting date is inclusive in the "<"
    // query
    let startingDate = new Date(newestReturnedSubmissionDate.getTime() + 1);
    const now = Date.now();

    while (
      returnedItemCount < limit &&
      startingDate > oldestReturnedSubmissionDate
    ) {
      const threadSubmissions = this.selectStream({
        from: 'item_submission_by_thread_and_time',
        select: '*',
        where: [
          ['org_id', '=', orgId],
          ['synthetic_thread_id', '=', syntheticThreadId],
          // we don't want to return the item that triggered the search,
          // so the query should not be inclusive of the startingDate.
          // Using `<` instead of `<=` also prevents searching infinitely
          // for a date which only returns a threadItem, which gets filtered
          // out of the result set
          ['item_synthetic_created_at', '<', startingDate],
          ['item_synthetic_created_at', '>=', oldestReturnedSubmissionDate],
        ],
        limit: Math.floor(
          (limit - returnedItemCount) * ARTIFICIAL_LIMIT_MULTIPLIER,
        ),
        // the materialized view is ordered this way,
        // but this makes the query more explicit
        sortOrder: [['item_synthetic_created_at', 'DESC']],
      });

      const submissionGroups = chunkAsyncIterableByKey(
        threadSubmissions,
        (it: ScyllaItemSubmissionsRow) => jsonStringify(it.item_identifier),
      );

      const scyllaThreadId = itemIdentifierToScyllaItemIdentifier(threadId);
      for await (const submissionsForItem of submissionGroups) {
        const { latestSubmission, priorSubmissions } =
          partitionLatestAndPriorSubmissions(submissionsForItem);

        // Counting old records to find out how often we would be missing data
        // if we reduced the Scylla TTL to 14 days
        const difference = Math.abs(
          now - latestSubmission.item_submission_time.getTime(),
        );
        const daysOld = Math.floor(difference / DAY_MS);
        this.meter.scyllaRecordAgeHistogram.record(daysOld);

        // if the item identifier is identical to the thread identifier, then
        // the thread item itself is present in the scylla partition for this
        // thread, but we don't want to return it to the client since it is
        // not a logically valid item within the thread (itself).
        // So if we find an identical item_id/thread_id pair we don't yield
        // anything to the stream and move on with the search
        if (!_.isEqual(latestSubmission.item_identifier, scyllaThreadId)) {
          const parents = isRealItemIdentifier(
            latestSubmission.parent_identifier,
          )
            ? mapAsyncIterable(
                this.#getParentStream({
                  orgId,
                  syntheticThreadId,
                  parentIdentifier: latestSubmission.parent_identifier,
                  numParentLevels,
                  oldestReturnedSubmissionDate,
                  latestSubmissionsOnly,
                }),
                scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier,
              )
            : getEmptyAsyncIterable<SubmissionsForItemWithTypeIdentifier>();

          yield {
            ...scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier({
              latestSubmission,
              priorSubmissions: latestSubmissionsOnly
                ? undefined
                : priorSubmissions,
            }),
            parents,
          };

          // manually track how many distinct items have been returned
          returnedItemCount++;
        }
        emptyQueryResult = false;
        if (returnedItemCount >= limit) {
          break;
        }
        // set the new date to work backwards from
        startingDate = latestSubmission.item_synthetic_created_at;
      }
      if (emptyQueryResult) {
        break;
      }
      emptyQueryResult = true;
    }
  }

  /**
   * Looks an item up by identifier, trying in order: Scylla, the partial items
   * endpoint, and finally the data warehouse. Resolves to `null` when none of
   * them has the item.
   *
   * NB: in this function, getting priorSubmissions is best effort. You aren't
   * guaranteed to get all prior submissions, you may receive some or none of
   * them, even if they exist.
   *
   * NB: Right now, we check the partial items endpoint before checking
   * the data warehouse. This is likely fine for now, but the partial items
   * endpoint could return an entirely different submission (if the data has
   * been mutated on their side), which could lead to unexpected behavior. It's
   * fine to keep for now but we should keep an eye out for bugs that could
   * stem from this.
   *
   * @throws whatever `signal.throwIfAborted()` throws if `signal` is aborted
   *   between lookup attempts, or any error raised by the data warehouse
   *   adapter (errors from the first two sources are swallowed).
   */
  async getItemByIdentifier(opts: {
    orgId: string;
    itemIdentifier: ItemIdentifier;
    latestSubmissionOnly?: boolean;
    signal?: AbortSignal;
  }): Promise<SubmissionsForItemWithTypeIdentifier | null> {
    const { orgId, itemIdentifier, latestSubmissionOnly = true, signal } = opts;

    // Attempt #1: Pull the item from scylla
    // If this fails for any reason, just coerce the error to an empty result
    // (in `.catch()`) so that we'll move on to checking the next fallback source.
    const queryResults = await this.select({
      from: 'item_submission_by_thread',
      select: '*',
      where: [
        [
          'item_identifier',
          '=',
          itemIdentifierToScyllaItemIdentifier(itemIdentifier),
        ],
      ],
    }).catch((_e) => []);

    // The lookup by item_identifier can't also constrain org_id (see the
    // ancestor/parent queries below), so drop any rows that belong to another
    // org before returning anything to the caller.
    const submissionsForOrg = [...queryResults].filter(
      (row) => row.org_id === orgId,
    );

    if (submissionsForOrg.length) {
      const { latestSubmission, priorSubmissions } =
        partitionLatestAndPriorSubmissions(submissionsForOrg);

      return scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier({
        latestSubmission,
        priorSubmissions: latestSubmissionOnly ? undefined : priorSubmissions,
      });
    }

    signal?.throwIfAborted();

    // Attempt #2: Check if there's a partial items endpoint, and if there is,
    // attempt to fetch the item's data from there. Most users won't have a
    // partial items endpoint, so this should only apply in a handful of cases
    // If this fails for any reason, just coerce the error to an empty array so
    // that we'll move on to trying the data warehouse.
    const partialItemsResult = await this.partialItemsEndpoint
      .getPartialItems(orgId, [itemIdentifier])
      .catch((_e) => []);

    if (partialItemsResult.length > 0) {
      const itemFromPartialItemsEndpoint = partialItemsResult[0];

      return {
        latestSubmission: itemSubmissionToItemSubmissionWithTypeIdentifier(
          itemFromPartialItemsEndpoint,
        ),
        priorSubmissions: latestSubmissionOnly ? undefined : [],
      };
    }

    signal?.throwIfAborted();

    // Attempt #3: Fetch the item from the data warehouse via the content API adapter.
    const records =
      await this.contentApiRequestsAdapter.getSuccessfulRequestsForItem(
        orgId,
        itemIdentifier,
        {
          latestOnly: latestSubmissionOnly,
          lookbackWindowMs: 6 * MONTH_MS,
        },
      );

    if (records.length > 0) {
      const toItemSubmissionWithTypeIdentifier = (
        record: ContentApiRequestRecord,
      ) => {
        const submissionId = record.submissionId as SubmissionId;
        const itemData = record.itemData as JsonOf<NormalizedItemData>;
        const schemaVariant = record.itemTypeSchemaVariant as
          | 'original'
          | 'partial';

        return dbRowToItemSubmissionWithItemTypeIdentifier({
          submission_id: submissionId,
          item_identifier: {
            id: tryParseNonEmptyString(itemIdentifier.id),
            type_id: tryParseNonEmptyString(itemIdentifier.typeId),
          },
          item_type_version: record.itemTypeVersion,
          item_creator_identifier:
            record.itemCreatorId && record.itemCreatorTypeId
              ? {
                  id: tryParseNonEmptyString(record.itemCreatorId),
                  type_id: tryParseNonEmptyString(record.itemCreatorTypeId),
                }
              : ({ id: '', type_id: '' } as const),
          item_data: itemData,
          item_submission_time: record.occurredAt,
          item_type_schema_variant: schemaVariant,
        });
      };

      // NOTE(review): this treats the first record as the latest submission;
      // presumably the adapter returns records newest-first — confirm.
      const [latestRecord, ...priorRecords] = records;

      return {
        latestSubmission: toItemSubmissionWithTypeIdentifier(latestRecord),
        priorSubmissions: latestSubmissionOnly
          ? undefined
          : priorRecords.map(toItemSubmissionWithTypeIdentifier),
      };
    }

    return null;
  }

  /**
   * Same as `getAncestorItemStream`, but with every emitted Scylla row
   * converted to an `ItemSubmissionWithTypeIdentifier`.
   */
  getAncestorItems(opts: {
    orgId: string;
    itemIdentifier: ItemIdentifier;
    numParentLevels: number;
    oldestReturnedSubmissionDate?: Date;
    latestSubmissionsOnly?: boolean;
  }): AsyncIterable<SubmissionsForItemWithTypeIdentifier> {
    return mapAsyncIterable(
      this.getAncestorItemStream(opts),
      scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier,
    );
  }

  /**
   * This differs from the `getParentStream` method in that it doesn't ensure
   * items are part of the same thread, to support parent-child relationships
   * that don't have a connecting thread ID.
   * This method returns an async iterable of a given item's ancestors, i.e.
   * its parent, that post's parent, etc.
   *
   * The given item itself is looked up first (to discover its parent) but is
   * never yielded. The walk stops after `numParentLevels` ancestors, when an
   * item has no real parent identifier, or when a lookup yields no rows for
   * this org within the `oldestReturnedSubmissionDate` window (default: 7
   * days ago).
   */
  async *getAncestorItemStream(opts: {
    orgId: string;
    itemIdentifier: ItemIdentifier;
    numParentLevels: number;
    oldestReturnedSubmissionDate?: Date;
    latestSubmissionsOnly?: boolean;
  }): AsyncIterable<{
    latestSubmission: ScyllaItemSubmissionsRow;
    priorSubmissions?: ScyllaItemSubmissionsRow[];
  }> {
    const {
      orgId,
      numParentLevels,
      oldestReturnedSubmissionDate = new Date(
        Date.now() - 1000 * 60 * 60 * 24 * 7,
      ),
      latestSubmissionsOnly = true,
      itemIdentifier,
    } = opts;
    // This is the given item, so we should not yield the first item found in
    // the results
    let currentParent: ScyllaItemIdentifier =
      itemIdentifierToScyllaItemIdentifier(itemIdentifier);

    for (
      let i = 0;
      i < numParentLevels + 1 && isRealItemIdentifier(currentParent);
      i++
    ) {
      const potentialParents = await this.select({
        select: '*',
        from: 'item_submission_by_thread',
        where: [
          // using the GSI on item_identifier, we can't also constrain by
          // synthetic_created_at or org_id :(
          ['item_identifier', '=', currentParent],
        ],
      });

      // This first filters out any parents that are somehow part of a
      // different org. Then splits the submissions between the most recent
      // submission of that item and all its prior submissions. Because the
      // GSI we are using disallows constraining the synthetic_created_at
      // column, we also use this filter to filter out submissions older than
      // the oldestReturnedSubmissionDate
      const parentSubmissions = potentialParents.filter(
        (row) =>
          row.item_synthetic_created_at >= oldestReturnedSubmissionDate &&
          row.org_id === orgId,
      );

      const { latestSubmission, priorSubmissions } =
        partitionLatestAndPriorSubmissions(parentSubmissions);

      // the no-unnecessary condition check assumes that all array accesses are
      // valid since the type of the array does not include `| undefined` but
      // out of bounds accesses in js allow this behavior. So while TS thinks
      // the parentItemRow will always be of type `ScyllaItemSubmissionsRow`
      // (or similar), it is possible that it doesn't exist and we want to
      // prevent the generator from yielding `undefined`
      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
      if (latestSubmission) {
        currentParent = latestSubmission.parent_identifier;
        if (i === 0) {
          // to avoid yielding the given item as its own parent, don't yield on
          // the first iteration
          continue;
        }
        yield {
          latestSubmission,
          priorSubmissions: latestSubmissionsOnly
            ? undefined
            : priorSubmissions,
        };
      } else {
        break;
      }
    }
  }

  /**
   * Walks up the parent chain starting at `parentIdentifier`, yielding each
   * ancestor (direct parent first) for up to `numParentLevels` levels. Only
   * rows in the same org and the same synthetic thread, and no older than
   * `oldestReturnedSubmissionDate`, are considered; the walk stops at the
   * first level with no matching rows or no real parent identifier.
   */
  async *#getParentStream(opts: {
    orgId: string;
    parentIdentifier: ScyllaItemIdentifier;
    syntheticThreadId: string;
    numParentLevels: number;
    oldestReturnedSubmissionDate: Date;
    latestSubmissionsOnly?: boolean;
  }): AsyncIterable<{
    latestSubmission: ScyllaItemSubmissionsRow;
    priorSubmissions?: ScyllaItemSubmissionsRow[];
  }> {
    const {
      orgId,
      parentIdentifier,
      syntheticThreadId,
      numParentLevels,
      oldestReturnedSubmissionDate,
      latestSubmissionsOnly = true,
    } = opts;
    let currentParent = parentIdentifier;

    for (
      let i = 0;
      i < numParentLevels && isRealItemIdentifier(currentParent);
      i++
    ) {
      const potentialParents = await this.select({
        select: '*',
        from: 'item_submission_by_thread',
        where: [
          // using the GSI on item_identifier, we can't also constrain by
          // synthetic_created_at or org_id :(
          ['item_identifier', '=', currentParent],
        ],
      });

      // This first filters out any parents that are somehow part of a
      // different thread, or from a different org. Then splits the
      // submissions between the most recent submission of that item and all
      // its prior submissions. Because the GSI we are using disallows
      // constraining the synthetic_created_at column, we also use this filter
      // to filter out submissions older than the oldestReturnedSubmissionDate
      //
      // TODO: Peter/Nick , we should consider item type relationships
      // where there is a parent-child relationship but no connecting
      // thread ID, maybe that can be handled with a different query
      // specifically for ancestors of a given item
      const parentSubmissions = potentialParents.filter(
        (row) =>
          row.item_synthetic_created_at >= oldestReturnedSubmissionDate &&
          row.synthetic_thread_id === syntheticThreadId &&
          row.org_id === orgId,
      );

      const { latestSubmission, priorSubmissions } =
        partitionLatestAndPriorSubmissions(parentSubmissions);

      // the no-unnecessary condition check assumes that all array accesses are
      // valid since the type of the array does not include `| undefined` but
      // out of bounds accesses in js allow this behavior. So while TS thinks
      // the parentItemRow will always be of type `ScyllaItemSubmissionsRow`
      // (or similar), it is possible that it doesn't exist and we want to
      // prevent the generator from yielding `undefined`
      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
      if (latestSubmission) {
        yield {
          latestSubmission,
          priorSubmissions: latestSubmissionsOnly
            ? undefined
            : priorSubmissions,
        };
        currentParent = latestSubmission.parent_identifier;
      } else {
        break;
      }
    }
    //TODO: Fallback to the data warehouse for parents not found
    // in the initial Scylla query
  }

  /**
   * Streams up to `itemLimit` items that share `parentIdentifier` within the
   * given thread and were created before (`siblingAge: 'older'`) or after
   * (`siblingAge: 'younger'`) `referenceTime`. Yields nothing when
   * `itemLimit` is zero or negative. Queries are repeated, moving the upper
   * time bound back to the last item seen, until the limit is reached or a
   * query returns no rows.
   */
  async *#getSiblingStream(opts: {
    orgId: string;
    syntheticThreadId: string;
    parentIdentifier: ScyllaItemIdentifier;
    referenceTime: Date;
    siblingAge: 'older' | 'younger';
    itemLimit: number;
    latestSubmissionsOnly?: boolean;
  }): AsyncIterable<{
    latestSubmission: ScyllaItemSubmissionsRow;
    priorSubmissions?: ScyllaItemSubmissionsRow[];
  }> {
    const {
      orgId,
      syntheticThreadId,
      parentIdentifier,
      referenceTime,
      itemLimit,
      siblingAge,
      latestSubmissionsOnly = true,
    } = opts;

    if (itemLimit <= 0) {
      // Ending the generator without yielding already produces an empty
      // stream; a value returned from a generator is not seen by `for await`.
      return;
    }

    let returnedItemCount = 0;
    let emptyQueryResult = true;
    let searchStartDate = siblingAge === 'older' ? referenceTime : new Date();
    while (returnedItemCount < itemLimit) {
      const stream = this.selectStream({
        select: '*',
        from: 'item_submission_by_thread',
        where: [
          ['org_id', '=', orgId],
          ['synthetic_thread_id', '=', syntheticThreadId],
          ['parent_identifier', '=', parentIdentifier],
          [
            'item_synthetic_created_at',
            siblingAge === 'older' ? '<' : '>',
            referenceTime,
          ],
          // Time must be compared to the option provided, referenceTime,
          // and the internal searchStartDate
          ['item_synthetic_created_at', '<', searchStartDate],
        ],
        limit: Math.floor(itemLimit * ARTIFICIAL_LIMIT_MULTIPLIER),
      });

      const submissionGroups = chunkAsyncIterableByKey(stream, (it) =>
        jsonStringify(it.item_identifier),
      );

      for await (const submissionsForItem of submissionGroups) {
        const { latestSubmission, priorSubmissions } =
          partitionLatestAndPriorSubmissions(submissionsForItem);

        yield {
          latestSubmission,
          priorSubmissions: latestSubmissionsOnly
            ? undefined
            : priorSubmissions,
        };

        returnedItemCount++;
        emptyQueryResult = false;
        if (returnedItemCount >= itemLimit) {
          break;
        }
        searchStartDate = latestSubmission.item_synthetic_created_at;
      }
      if (emptyQueryResult) {
        break;
      }
      emptyQueryResult = true;
    }
  }

  /**
   * Items are returned in reverse chronological order starting just before
   * `earliestReturnedSubmissionDate` (default: the present moment) and going
   * back in time until the limit is reached, the
   * `oldestReturnedSubmissionDate` (default: 30 days ago) is passed, or a
   * query returns no more rows.
   *
   * Passing `RETURN_UNLIMITED_RESULTS_AND_POTENTIALLY_HANG_DB` as `limit`
   * removes both the per-query row limit and the overall item cap.
   */
  async *getItemSubmissionsByCreator(opts: {
    orgId: string;
    itemCreatorIdentifier: ItemIdentifier;
    limit?: number | ReturnUnlimitedResultsAndPotentiallyHangDb;
    oldestReturnedSubmissionDate?: Date;
    earliestReturnedSubmissionDate?: Date;
    latestSubmissionsOnly?: boolean;
  }): AsyncIterable<SubmissionsForItemWithTypeIdentifier> {
    const {
      orgId,
      itemCreatorIdentifier,
      limit = 100,
      oldestReturnedSubmissionDate = new Date(
        Date.now() - 30 * 24 * 60 * 60 * 1000,
      ),
      earliestReturnedSubmissionDate = new Date(),
      latestSubmissionsOnly = true,
    } = opts;

    let returnedItemCount = 0;
    let emptyQueryResult = true;
    let searchStartDate = earliestReturnedSubmissionDate;
    const now = Date.now();
    while (
      limit === RETURN_UNLIMITED_RESULTS_AND_POTENTIALLY_HANG_DB ||
      returnedItemCount < limit
    ) {
      const stream = this.selectStream({
        from: 'item_submission_by_creator',
        select: '*',
        where: [
          ['org_id', '=', orgId],
          [
            'item_creator_identifier',
            '=',
            itemIdentifierToScyllaItemIdentifier(itemCreatorIdentifier),
          ],
          ['item_synthetic_created_at', '<', searchStartDate],
          ['item_synthetic_created_at', '>', oldestReturnedSubmissionDate],
        ],
        limit:
          limit === RETURN_UNLIMITED_RESULTS_AND_POTENTIALLY_HANG_DB
            ? undefined
            : Math.floor(
                (limit - returnedItemCount) * ARTIFICIAL_LIMIT_MULTIPLIER,
              ),
        sortOrder: [['item_synthetic_created_at', 'DESC']],
      });

      const groupedStream = chunkAsyncIterableByKey(
        stream,
        (it: ScyllaItemSubmissionsRow) => jsonStringify(it.item_identifier),
      );

      for await (const itemGroup of groupedStream) {
        const { latestSubmission, priorSubmissions } =
          partitionLatestAndPriorSubmissions(itemGroup);
        // Counting old records to find out how often we would be missing data
        // if we reduced the Scylla TTL to 14 days
        const difference = Math.abs(
          now - latestSubmission.item_submission_time.getTime(),
        );
        const daysOld = Math.floor(difference / DAY_MS);
        this.meter.scyllaRecordAgeHistogram.record(daysOld);
        yield {
          latestSubmission:
            dbRowToItemSubmissionWithItemTypeIdentifier(latestSubmission),
          priorSubmissions: latestSubmissionsOnly
            ? undefined
            : priorSubmissions.map(dbRowToItemSubmissionWithItemTypeIdentifier),
        };
        // manually track how many distinct items have been returned
        returnedItemCount++;
        emptyQueryResult = false;
        if (
          limit !== RETURN_UNLIMITED_RESULTS_AND_POTENTIALLY_HANG_DB &&
          returnedItemCount >= limit
        ) {
          break;
        }
        searchStartDate = latestSubmission.item_synthetic_created_at;
      }
      if (emptyQueryResult) {
        break;
      }
      emptyQueryResult = true;
    }
  }

  /**
   * Placeholder for refreshing the TTL of an item's submissions.
   *
   * @throws always — not implemented yet.
   */
  async #updateItemSubmissionTTL(_opts: {
    org_id: string;
    itemID: ItemIdentifier;
    submissionID: string;
  }) {
    // find by ID
    // INSERT all rows WITH TTL 30 days
    throw new Error('Not Implemented');
  }

  /**
   * Fetches the action executions recorded for an item from the action
   * executions adapter and maps them to a flat history entry. Records missing
   * an item id or item type id are dropped, and `null` optional fields are
   * normalised to `undefined`.
   */
  async getItemActionHistory(opts: {
    orgId: string;
    itemId: string;
    itemTypeId: string;
    itemSubmissionTime: Date | undefined;
  }) {
    const { itemId, itemTypeId, orgId, itemSubmissionTime } = opts;

    const records = await this.actionExecutionsAdapter.getItemActionHistory({
      orgId,
      itemId,
      itemTypeId,
      itemSubmissionTime,
    });

    return filterNullOrUndefined(
      records.map((record) => {
        if (!record.itemId || !record.itemTypeId) {
          return undefined;
        }

        return {
          actionId: record.actionId,
          itemId: record.itemId,
          itemTypeId: record.itemTypeId,
          actorId: record.actorId ?? undefined,
          jobId: record.jobId ?? undefined,
          itemCreatorId: record.userId ?? undefined,
          itemCreatorTypeId: record.userTypeId ?? undefined,
          policies: record.policies,
          ruleIds: record.ruleIds,
          ts: record.occurredAt,
        };
      }),
    );
  }
}

/**
 * Converts a `{ latestSubmission, priorSubmissions }` pair of raw Scylla rows
 * into the same shape holding `ItemSubmissionWithTypeIdentifier` values.
 * `priorSubmissions` stays `undefined` when it was not provided.
 */
function scyllaSubmissionsForItemToSubmissionsForItemWithTypeIdentifier(it: {
  latestSubmission: ScyllaItemSubmissionsRow;
  priorSubmissions?: ScyllaItemSubmissionsRow[];
}) {
  return {
    latestSubmission: dbRowToItemSubmissionWithItemTypeIdentifier(
      it.latestSubmission,
    ),
    priorSubmissions: it.priorSubmissions?.map(
      dbRowToItemSubmissionWithItemTypeIdentifier,
    ),
  };
}