this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

pretty good

alice e6629d08 16dd73ce

+261 -213
+6 -4
src/constants.ts
··· 6 6 export const SQL_OUTPUT_FILE = 'dids_pds.jsonl'; 7 7 export const HEALTH_CHECK_FILE = 'pds_health.json'; 8 8 export const DATA_OUTPUT_FILE = 'bsky_data.jsonl'; 9 - export const PDS_HEALTH_CHECK_CONCURRENCY = 20; 10 - export const PDS_DATA_FETCH_CONCURRENCY = 150; 9 + export const PDS_HEALTH_CHECK_CONCURRENCY = 50; 10 + export const PDS_HEALTH_CHECK_TIMEOUT_MS = 20000; 11 + export const PDS_DATA_FETCH_CONCURRENCY = 120; 11 12 export const DIDS_TO_PROCESS = parseInt(process.argv[2], 10) || 10000; 12 - export const BATCH_SIZE = 5000; // Number of records per batch 13 - export const BATCH_TIMEOUT_MS = 1000; // 1 second 13 + export const PYTHON_SERVICE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes 14 + export const BATCH_SIZE = 5000; // Number of postgres records per batch 15 + export const BATCH_TIMEOUT_MS = 5000; // how long to wait for a batch to fill before flushing 14 16 export const MAX_FLUSH_RETRIES = 5; // Maximum number of retry attempts for flushing
+39
src/db/insertFunctions.ts
··· 1 + import { PostData, ProfileData } from '../types.js'; 2 + import { db } from './postgres.js'; 3 + 4 + export const insertPosts = async (batch: PostData[]): Promise<void> => { 5 + await db.transaction().execute(async (tx) => { 6 + await tx 7 + .insertInto('posts') 8 + .values( 9 + batch.map((post) => ({ 10 + cid: post.cid, 11 + did: post.did, 12 + rkey: post.rkey, 13 + has_emojis: post.hasEmojis, 14 + langs: post.langs, 15 + text: post.post, 16 + created_at: new Date(post.createdAt), 17 + })), 18 + ) 19 + .execute(); 20 + }); 21 + }; 22 + 23 + export const insertProfiles = async (batch: ProfileData[]): Promise<void> => { 24 + await db.transaction().execute(async (tx) => { 25 + await tx 26 + .insertInto('profiles') 27 + .values( 28 + batch.map((profile) => ({ 29 + cid: profile.cid, 30 + did: profile.did, 31 + rkey: profile.rkey, 32 + display_name: profile.displayName, 33 + description: profile.description, 34 + created_at: new Date(profile.createdAt), 35 + })), 36 + ) 37 + .execute(); 38 + }); 39 + };
+119
src/db/postgresBatchQueue.ts
··· 1 + import { Mutex } from 'async-mutex'; 2 + 3 + import { MAX_FLUSH_RETRIES } from '../constants.js'; 4 + import { concurrentPostgresInserts } from '../metrics.js'; 5 + 6 + export class PostgresBatchQueue<T> { 7 + private queue: T[] = []; 8 + private mutex = new Mutex(); 9 + private batchSize: number; 10 + private batchTimeoutMs: number; 11 + private batchTimer: NodeJS.Timeout | null = null; 12 + private isShuttingDown = false; 13 + private insertFn: (batch: T[]) => Promise<void>; 14 + 15 + constructor(batchSize: number, batchTimeoutMs: number, insertFn: (batch: T[]) => Promise<void>) { 16 + this.batchSize = batchSize; 17 + this.batchTimeoutMs = batchTimeoutMs; 18 + this.insertFn = insertFn; 19 + } 20 + 21 + public async enqueue(data: T): Promise<void> { 22 + if (this.isShuttingDown) { 23 + throw new Error('Cannot enqueue data, the queue is shutting down.'); 24 + } 25 + 26 + const shouldFlush = await this.mutex.runExclusive(() => { 27 + this.queue.push(data); 28 + 29 + if (this.queue.length >= this.batchSize) { 30 + return true; 31 + } else if (!this.batchTimer) { 32 + this.scheduleFlush(); 33 + } 34 + 35 + return false; 36 + }); 37 + 38 + if (shouldFlush) { 39 + await this.flushQueue(); 40 + } 41 + } 42 + 43 + private scheduleFlush(): void { 44 + this.batchTimer = setTimeout(() => { 45 + this.flushQueue().catch((err: unknown) => { 46 + console.error(`Scheduled flush error: ${(err as Error).message}`); 47 + }); 48 + }, this.batchTimeoutMs); 49 + } 50 + 51 + private async flushQueue(): Promise<void> { 52 + if (this.batchTimer) { 53 + clearTimeout(this.batchTimer); 54 + this.batchTimer = null; 55 + } 56 + 57 + let currentBatch: T[] = []; 58 + 59 + await this.mutex.runExclusive(() => { 60 + if (this.queue.length === 0) { 61 + return; 62 + } 63 + currentBatch = this.queue.splice(0, this.batchSize); 64 + }); 65 + 66 + if (currentBatch.length === 0) { 67 + return; 68 + } 69 + 70 + concurrentPostgresInserts.inc(); 71 + 72 + try { 73 + await this.attemptFlush(currentBatch); 74 + console.log(`Flushed batch of ${currentBatch.length} items.`); 75 + } catch (error) { 76 + console.error(`Error flushing PostgreSQL batch: ${(error as Error).message}`); 77 + // Re-add the failed batch back for retry 78 + await this.mutex.runExclusive(() => { 79 + this.queue = currentBatch.concat(this.queue); 80 + }); 81 + } finally { 82 + concurrentPostgresInserts.dec(); 83 + } 84 + } 85 + 86 + private async attemptFlush(batch: T[]): Promise<void> { 87 + let attempt = 0; 88 + let success = false; 89 + 90 + while (attempt < MAX_FLUSH_RETRIES && !success) { 91 + try { 92 + await this.insertFn(batch); 93 + success = true; 94 + } catch (error) { 95 + attempt++; 96 + console.error(`Flush attempt ${attempt} failed: ${(error as Error).message}`); 97 + 98 + if (attempt < MAX_FLUSH_RETRIES) { 99 + const backoffTime = 2 ** attempt * 1000; 100 + console.log(`Retrying in ${backoffTime} ms...`); 101 + await new Promise((resolve) => setTimeout(resolve, backoffTime)); 102 + } else { 103 + console.error('Max retries reached. Re-queueing the batch.'); 104 + throw error; // Let the caller handle re-queueing 105 + } 106 + } 107 + } 108 + } 109 + 110 + public async shutdown(): Promise<void> { 111 + this.isShuttingDown = true; 112 + if (this.batchTimer) { 113 + clearTimeout(this.batchTimer); 114 + this.batchTimer = null; 115 + } 116 + await this.flushQueue(); 117 + console.log('Flushed all remaining items.'); 118 + } 119 + }
+7
src/db/postgresBatchQueues.ts
··· 1 + import { BATCH_SIZE, BATCH_TIMEOUT_MS } from '../constants.js'; 2 + import { PostData, ProfileData } from '../types.js'; 3 + import { insertPosts, insertProfiles } from './insertFunctions.js'; 4 + import { PostgresBatchQueue } from './postgresBatchQueue.js'; 5 + 6 + export const postBatchQueue = new PostgresBatchQueue<PostData>(BATCH_SIZE, BATCH_TIMEOUT_MS, insertPosts); 7 + export const profileBatchQueue = new PostgresBatchQueue<ProfileData>(BATCH_SIZE, BATCH_TIMEOUT_MS, insertProfiles);
+5 -16
src/helpers.ts
··· 1 1 import ky from 'ky'; 2 2 3 - import { RELAY_URL } from './constants.js'; 3 + import { PDS_HEALTH_CHECK_TIMEOUT_MS, RELAY_URL } from './constants.js'; 4 4 import { ServerDescription } from './types.js'; 5 5 6 6 export function sanitizePDSName(pds: string): string { 7 7 try { 8 - const originalPDS = pds; 9 - pds = pds.trim(); 10 - pds = pds.replace(/<\/?[^>]+(>|$)/g, ''); 11 - // eslint-disable-next-line no-control-regex 12 - pds = pds.replace(/[\x00-\x1F\x7F-\x9F]/g, ''); 13 - pds = pds.replace(/[^a-zA-Z0-9-._~:/?#[\]!$&'()*+,;=]/g, ''); 14 8 const hostnameWithPortRegex = /^(?!-)[A-Za-z0-9-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))*(:\d{1,5})?$/; 15 9 16 10 if (!hostnameWithPortRegex.test(pds)) { 17 - throw new Error(`Invalid PDS hostname format. Original: ${originalPDS}`); 11 + throw new Error(`Invalid PDS hostname: ${pds}`); 18 12 } 19 13 20 - try { 21 - // eslint-disable-next-line @typescript-eslint/no-unused-vars 22 - const url = new URL(`https://${pds}/`); // if this still throws, the PDS name is invalid 23 - return pds; 24 - } catch (error) { 25 - throw new Error(`sanitizePDSName Error: ${error instanceof Error ? error.message : String(error)}`); 26 - } 14 + new URL(`https://${pds}/`); // if this still throws, the PDS name is invalid 15 + return pds; 27 16 } catch (error) { 28 17 throw new Error(`sanitizePDSName Error: ${error instanceof Error ? error.message : String(error)}`); 29 18 } ··· 41 30 42 31 try { 43 32 const res = await ky.get(`https://${pds}/xrpc/com.atproto.server.describeServer`, { 44 - timeout: 30000, 33 + timeout: PDS_HEALTH_CHECK_TIMEOUT_MS, 45 34 retry: { 46 35 limit: 3, 47 36 statusCodes: [429, 500, 502, 503, 504],
-2
src/index.ts
··· 27 27 await processDidsAndFetchData(didsToProcess); 28 28 29 29 await gracefulShutdown(); 30 - 31 - // console.log(`Fetched data array length: ${fetchedData.length}`); 32 30 } 33 31 34 32 main().catch(async (error: unknown) => {
+1 -1
src/metrics.ts
··· 3 3 import { Server } from 'http'; 4 4 import { Gauge, Registry, collectDefaultMetrics } from 'prom-client'; 5 5 6 - import { pool } from './postgres.js'; 6 + import { pool } from './db/postgres.js'; 7 7 8 8 const register = new Registry(); 9 9 collectDefaultMetrics({ register });
src/migrations/001-create.ts src/db/migrations/001-create.ts
src/postgres.ts src/db/postgres.ts
-159
src/postgresBatchQueue.ts
··· 1 - // src/postgresBatchQueue.ts 2 - import { Mutex } from 'async-mutex'; 3 - 4 - import { BATCH_SIZE, BATCH_TIMEOUT_MS, MAX_FLUSH_RETRIES } from './constants.js'; 5 - import { concurrentPostgresInserts } from './metrics.js'; 6 - import { closeDatabase, db } from './postgres.js'; 7 - import { PostData } from './types.js'; 8 - 9 - export class PostgresBatchQueue { 10 - private queue: PostData[] = []; 11 - private mutex = new Mutex(); 12 - private batchSize: number; 13 - private batchTimeoutMs: number; 14 - private batchTimer: NodeJS.Timeout | null = null; 15 - private isShuttingDown = false; 16 - 17 - constructor(batchSize: number, batchTimeoutMs: number) { 18 - this.batchSize = batchSize; 19 - this.batchTimeoutMs = batchTimeoutMs; 20 - } 21 - 22 - /** 23 - * Adds a PostData item to the queue and triggers flush if necessary. 24 - * @param data PostData to enqueue 25 - */ 26 - public async enqueue(data: PostData): Promise<void> { 27 - if (this.isShuttingDown) { 28 - throw new Error('Cannot enqueue data, the queue is shutting down.'); 29 - } 30 - 31 - const shouldFlush = await this.mutex.runExclusive(() => { 32 - this.queue.push(data); 33 - 34 - if (this.queue.length >= this.batchSize) { 35 - return true; 36 - } else if (!this.batchTimer) { 37 - this.scheduleFlush(); 38 - } 39 - 40 - return false; 41 - }); 42 - 43 - if (shouldFlush) { 44 - await this.flushQueue(); 45 - } 46 - } 47 - 48 - /** 49 - * Schedules a flush after the specified timeout. 50 - */ 51 - private scheduleFlush(): void { 52 - this.batchTimer = setTimeout(() => { 53 - this.flushQueue().catch((err: unknown) => { 54 - console.error(`Scheduled flush error: ${(err as Error).message}`); 55 - }); 56 - }, this.batchTimeoutMs); 57 - } 58 - 59 - /** 60 - * Flushes the current queue to PostgreSQL. 61 - */ 62 - private async flushQueue(): Promise<void> { 63 - // Clear the existing timer 64 - if (this.batchTimer) { 65 - clearTimeout(this.batchTimer); 66 - this.batchTimer = null; 67 - } 68 - 69 - let currentBatch: PostData[] = []; 70 - 71 - await this.mutex.runExclusive(() => { 72 - if (this.queue.length === 0) { 73 - return; 74 - } 75 - currentBatch = this.queue.splice(0, this.batchSize); 76 - }); 77 - 78 - if (currentBatch.length === 0) { 79 - return; 80 - } 81 - 82 - concurrentPostgresInserts.inc(); 83 - 84 - try { 85 - await this.attemptFlush(currentBatch); 86 - process.stdout.write('.'); 87 - } catch (error) { 88 - console.error(`Error flushing PostgreSQL batch: ${(error as Error).message}`); 89 - // Re-add the failed batch back for retry 90 - await this.mutex.runExclusive(() => { 91 - this.queue = currentBatch.concat(this.queue); 92 - }); 93 - } finally { 94 - concurrentPostgresInserts.dec(); 95 - } 96 - } 97 - 98 - /** 99 - * Attempts to flush the batch with retry logic. 100 - * @param batch The batch of PostData to flush 101 - */ 102 - private async attemptFlush(batch: PostData[]): Promise<void> { 103 - let attempt = 0; 104 - let success = false; 105 - 106 - while (attempt < MAX_FLUSH_RETRIES && !success) { 107 - try { 108 - await db.transaction().execute(async (tx) => { 109 - await tx 110 - .insertInto('posts') // Ensure 'posts' is your table name 111 - .values( 112 - batch.map((post) => ({ 113 - cid: post.cid, 114 - did: post.did, 115 - rkey: post.rkey, 116 - has_emojis: post.hasEmojis, 117 - langs: post.langs, 118 - text: post.post, 119 - created_at: post.createdAt, 120 - })), 121 - ) 122 - .execute(); 123 - }); 124 - 125 - success = true; 126 - } catch (error) { 127 - attempt++; 128 - console.error(`Flush attempt ${attempt} failed: ${(error as Error).message}`); 129 - 130 - if (attempt < MAX_FLUSH_RETRIES) { 131 - const backoffTime = 2 ** attempt * 1000; // Exponential backoff 132 - console.log(`Retrying in ${backoffTime} ms...`); 133 - await new Promise((resolve) => setTimeout(resolve, backoffTime)); 134 - } else { 135 - console.error('Max retries reached. Re-queueing the batch.'); 136 - throw error; // Let the caller handle re-queueing 137 - } 138 - } 139 - } 140 - } 141 - 142 - /** 143 - * Gracefully shuts down the queue by flushing remaining items. 144 - */ 145 - public async shutdown(): Promise<void> { 146 - this.isShuttingDown = true; 147 - if (this.batchTimer) { 148 - clearTimeout(this.batchTimer); 149 - this.batchTimer = null; 150 - } 151 - await this.flushQueue(); 152 - console.log('Flushed all remaining items.'); 153 - await closeDatabase(); 154 - console.log('Database connections closed.'); 155 - } 156 - } 157 - 158 - // Instantiate the queue 159 - export const postgresBatchQueue = new PostgresBatchQueue(BATCH_SIZE, BATCH_TIMEOUT_MS);
src/schema.d.ts src/db/schema.d.ts
+3 -3
src/shutdown.ts
··· 1 1 // src/shutdown.ts 2 + import { closeDatabase } from './db/postgres.js'; 3 + import { postBatchQueue, profileBatchQueue } from './db/postgresBatchQueues.js'; 2 4 import { stopMetricsServer } from './metrics.js'; 3 - import { closeDatabase } from './postgres.js'; 4 - import { postgresBatchQueue } from './postgresBatchQueue.js'; 5 5 6 6 export async function gracefulShutdown(): Promise<void> { 7 7 console.log('Initiating graceful shutdown...'); 8 8 try { 9 9 await stopMetricsServer(); 10 - await postgresBatchQueue.shutdown(); 10 + await Promise.all([postBatchQueue.shutdown(), profileBatchQueue.shutdown()]); 11 11 console.log('All pending batches have been flushed.'); 12 12 await closeDatabase(); 13 13 console.log('Database connections closed.');
+4 -1
src/stages/stage1.ts
··· 13 13 console.log(`${SQL_OUTPUT_FILE} already exists. Skipping DID fetching.`); 14 14 return; 15 15 } catch { 16 - console.log('fetchAndDumpDidsPdses'); 17 16 console.log('Fetching DIDs from database'); 18 17 const startTime = performance.now(); 19 18 ··· 54 53 const sanitizedPds = pds 55 54 .replace(/^(https?:\/\/)/, '') 56 55 .replace(/\/+$/, '') 56 + .replace(/<\/?[^>]+(>|$)/g, '') 57 + // eslint-disable-next-line no-control-regex 58 + .replace(/[\x00-\x1F\x7F-\x9F]/g, '') 59 + .replace(/[^a-zA-Z0-9-._~:/?#[\]!$&'()*+,;=]/g, '') 57 60 .trim(); 58 61 const finalPds = 59 62 sanitizedPds.includes('bsky.social') || sanitizedPds.includes('bsky.network') ? RELAY_URL : sanitizedPds;
+39 -8
src/stages/stage3.ts
··· 1 1 import axios from 'axios'; 2 2 import pLimit from 'p-limit'; 3 3 4 - import { PDS_DATA_FETCH_CONCURRENCY } from '../constants.js'; 4 + import { PDS_DATA_FETCH_CONCURRENCY, PYTHON_SERVICE_TIMEOUT_MS } from '../constants.js'; 5 + import { postBatchQueue, profileBatchQueue } from '../db/postgresBatchQueues.js'; 5 6 import { sanitizeTimestamp } from '../helpers.js'; 6 - import { postgresBatchQueue } from '../postgresBatchQueue.js'; 7 - import { BskyData, BskyPost, BskyPostData, DidAndPds, PostData } from '../types.js'; 7 + import { 8 + BskyData, 9 + BskyPost, 10 + BskyPostData, 11 + BskyProfile, 12 + BskyProfileData, 13 + DidAndPds, 14 + PostData, 15 + ProfileData, 16 + } from '../types.js'; 8 17 9 18 export async function processDidsAndFetchData(dids: DidAndPds[]): Promise<void> { 10 19 const limit = pLimit(PDS_DATA_FETCH_CONCURRENCY); ··· 21 30 { did, pds }, 22 31 { 23 32 responseType: 'stream', 24 - timeout: 30 * 60 * 1000, // 30 minutes 33 + timeout: PYTHON_SERVICE_TIMEOUT_MS, // 30 minutes 25 34 }, 26 35 ); 27 36 ··· 46 55 const postData = post.value as unknown as BskyPostData; 47 56 const rkeyParts = k.split('/'); 48 57 const rkey = rkeyParts.length > 1 ? rkeyParts[1] : k; 49 - const sanitizedCreatedAt = sanitizeTimestamp(postData.createdAt); 58 + const sanitizedCreatedAt = 59 + postData.createdAt ? sanitizeTimestamp(postData.createdAt) : '1970-01-01T00:00:00.000Z'; 50 60 const data: PostData = { 51 61 cid: postData.cid, 52 62 did: did, ··· 56 66 post: postData.text, 57 67 createdAt: sanitizedCreatedAt, 58 68 }; 59 - postgresBatchQueue.enqueue(data).catch((err: unknown) => { 69 + postBatchQueue.enqueue(data).catch((err: unknown) => { 70 + console.error(`Enqueue error for DID ${did}: ${(err as Error).message}`); 71 + }); 72 + } else if (k.includes('app.bsky.actor.profile')) { 73 + const profile = v as BskyProfile; 74 + const profileData = profile.value as unknown as BskyProfileData; 75 + const rkeyParts = k.split('/'); 76 + const rkey = rkeyParts.length > 1 ? rkeyParts[1] : k; 77 + const sanitizedCreatedAt = 78 + profileData.createdAt ? sanitizeTimestamp(profileData.createdAt) : '1970-01-01T00:00:00.000Z'; 79 + const data: ProfileData = { 80 + cid: profileData.cid, 81 + did: did, 82 + rkey: rkey, 83 + displayName: profileData.displayName ?? '', 84 + description: profileData.description ?? '', 85 + createdAt: sanitizedCreatedAt, 86 + }; 87 + 88 + profileBatchQueue.enqueue(data).catch((err: unknown) => { 60 89 console.error(`Enqueue error for DID ${did}: ${(err as Error).message}`); 61 90 }); 62 91 } ··· 84 113 } 85 114 successfulDids++; 86 115 if (successfulDids % 100 === 0) { 87 - process.stdout.write('#'); 116 + // process.stdout.write('#'); 117 + console.log(`Processed ${successfulDids} DIDs.`); 88 118 } 89 119 resolve(); 90 120 }); ··· 94 124 console.error(`Stream error for DID ${did}: ${err.message}`); 95 125 failedDids++; 96 126 if (failedDids % 100 === 0) { 97 - process.stdout.write('*'); 127 + // process.stdout.write('*'); 128 + console.log(`Failed ${failedDids} DIDs.`); 98 129 } 99 130 reject(err); 100 131 });
+31 -19
src/types.ts
··· 22 22 cid: string; 23 23 } 24 24 25 + export interface BskyProfileData { 26 + $type: string; 27 + avatar?: { 28 + ref: { $link: string }; 29 + size: number; 30 + $type: string; 31 + mimeType: string; 32 + }; 33 + banner?: { 34 + ref: { $link: string }; 35 + size: number; 36 + $type: string; 37 + mimeType: string; 38 + }; 39 + createdAt: string; 40 + description?: string; 41 + displayName?: string; 42 + cid: string; 43 + } 44 + 25 45 export type BskyPost = Record< 26 46 string, 27 47 { ··· 34 54 string, 35 55 { 36 56 cid: string; 37 - value: { 38 - $type: string; 39 - avatar?: { 40 - ref: { $link: string }; 41 - size: number; 42 - $type: string; 43 - mimeType: string; 44 - }; 45 - banner?: { 46 - ref: { $link: string }; 47 - size: number; 48 - $type: string; 49 - mimeType: string; 50 - }; 51 - createdAt: string; 52 - description?: string; 53 - displayName?: string; 54 - cid: string; 55 - }; 57 + value: BskyProfileData; 56 58 } 57 59 >; 58 60 59 61 export type BskyData = BskyPost | BskyProfile; 60 62 63 + // TODO: redundant 61 64 export interface PostData { 62 65 cid: string; 63 66 did: string; ··· 67 70 post: string; 68 71 createdAt: string; 69 72 } 73 + 74 + export interface ProfileData { 75 + cid: string; 76 + did: string; 77 + rkey: string; 78 + displayName: string; 79 + description: string; 80 + createdAt: string; 81 + }
+7
wipe-db.sh
··· 1 + #!/bin/bash 2 + direnv allow 3 + psql "$DATABASE_URL" -c ' 4 + TRUNCATE TABLE posts RESTART IDENTITY CASCADE; 5 + TRUNCATE TABLE profiles RESTART IDENTITY CASCADE; 6 + TRUNCATE TABLE emojis RESTART IDENTITY CASCADE; 7 + '