Mirror of https://github.com/roostorg/coop github.com/roostorg/coop
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

[Scylla] Stop connect failures and add visibility on connection errors (#395)

* [Scylla] Stop connect failures from pinning worker CPU and surface errors

* reduce cap to allow 6 retries

authored by

Juan Mrad and committed by
GitHub
f04c777f 379da56c

+60 -2
+56 -2
server/iocContainer/index.ts
··· 3 3 import Bottle from '@ethanresnick/bottlejs'; 4 4 import opentelemetry from '@opentelemetry/api'; 5 5 import { makeDateString, type ItemIdentifier } from '@roostorg/types'; 6 - import { types as scyllaTypes } from 'cassandra-driver'; 6 + import { types as scyllaTypes, type Host as ScyllaHost } from 'cassandra-driver'; 7 7 import IORedis, { type Cluster } from 'ioredis'; 8 8 import { 9 9 Kysely, ··· 234 234 } from '../utils/correlationIds.js'; 235 235 import { getUsableCoreCount } from '../utils/cpu-helpers.js'; 236 236 import { jsonStringify, type JsonOf } from '../utils/encoding.js'; 237 + import { logErrorJson, logJson } from '../utils/logging.js'; 237 238 import { __throw, assertUnreachable } from '../utils/misc.js'; 238 239 import SafeTracer from '../utils/SafeTracer.js'; 239 240 import { ··· 310 311 // that each dependent service can type its arg more specifically with the set 311 312 // of tables it is responsible for / allowed to query. 312 313 // eslint-disable-next-line @typescript-eslint/no-explicit-any 313 - Scylla: Scylla<any> & { close: () => Promise<void> }; 314 + Scylla: Scylla<any> & { 315 + connect: () => Promise<void>; 316 + close: () => Promise<void>; 317 + }; 314 318 315 319 Sequelize: ReturnType<typeof makeDb>; 316 320 OrgModel: ReturnType<typeof makeDb>['Org']; ··· 723 727 consistency: scyllaTypes.consistencies.localQuorum, 724 728 }, 725 729 }); 730 + 731 + // Surface cluster state changes so reconnect storms are visible in logs. 732 + scyllaDriver.on('hostUp', (host: ScyllaHost) => { 733 + // eslint-disable-next-line no-restricted-syntax 734 + logJson(`scylla.hostUp address=${host.address}`); 735 + }); 736 + scyllaDriver.on('hostDown', (host: ScyllaHost) => { 737 + // eslint-disable-next-line no-restricted-syntax 738 + logJson(`scylla.hostDown address=${host.address}`); 739 + }); 740 + // Forward driver-internal warnings/errors (auth, TLS, connection drops, 741 + // etc.); skip the very chatty `info`/`verbose` levels. 742 + scyllaDriver.on( 743 + 'log', 744 + ( 745 + level: 'verbose' | 'info' | 'warning' | 'error', 746 + source: string, 747 + message: string, 748 + furtherInfo?: unknown, 749 + ) => { 750 + if (level !== 'warning' && level !== 'error') { 751 + return; 752 + } 753 + const wrapped = new Error(`scylla.${level}: [${source}] ${message}`); 754 + if (furtherInfo instanceof Error) { 755 + wrapped.stack = furtherInfo.stack ?? wrapped.stack; 756 + } 757 + // eslint-disable-next-line no-restricted-syntax 758 + logErrorJson({ 759 + message: `scylla.driver.${level}`, 760 + error: wrapped, 761 + }); 762 + }, 763 + ); 764 + 765 + // cassandra-driver leaks ~4 HostMap listeners per failed `Client._connect()` 766 + // retry and never recreates the HostMap, so the default cap of 10 trips 767 + // after ~3 failures. Raise it so transient blips don't spam the warning, 768 + // but keep it bounded so a true runaway is still noticeable. 769 + const controlConnection = ( 770 + scyllaDriver as unknown as { 771 + controlConnection?: { hosts?: { setMaxListeners?: (n: number) => void } }; 772 + } 773 + ).controlConnection; 774 + controlConnection?.hosts?.setMaxListeners?.(15); 775 + 726 776 class ClosableScylla< 727 777 DB extends Record<string, Record<string, unknown>>, 728 778 > extends Scylla<DB> { 779 + /** Eagerly connect; idempotent once `connected` is true. */ 780 + async connect() { 781 + return scyllaDriver.connect(); 782 + } 729 783 async close() { 730 784 return scyllaDriver.shutdown(); 731 785 }
+4
server/workers_jobs/ItemProcessingWorker.ts
··· 23 23 'ContentApiLogger', 24 24 'ModerationConfigService', 25 25 'ItemInvestigationService', 26 + 'Scylla', 26 27 'Meter', 27 28 'itemSubmissionRetryQueueBulkWrite', 28 29 ], ··· 33 34 contentApiLogger, 34 35 moderationConfigService, 35 36 ItemInvestigationService, 37 + scylla, 36 38 Meter, 37 39 itemSubmissionRetryQueueBulkWrite, 38 40 ) => { ··· 42 44 return { 43 45 type: 'Worker' as const, 44 46 async run(_signal) { 47 + // Fail fast if Scylla is unreachable, instead of retry-looping per job. 48 + await scylla.connect(); 45 49 queue = new Queue(ITEM_SUBMISSION_QUEUE_NAME, { connection: redis }); 46 50 const insertWithRetries = tracer.traced( 47 51 {