this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

redis

alice cf62173a e6629d08

+81 -5
+2
.husky/pre-commit
··· 1 + # shellcheck disable=SC2148 2 + bunx lint-staged
+6
package.json
··· 24 24 "cli-progress": "^3.12.0", 25 25 "dotenv": "^16.4.5", 26 26 "express": "^4.21.1", 27 + "husky": "^9.1.6", 27 28 "ky": "^1.7.2", 28 29 "kysely": "^0.27.4", 29 30 "kysely-codegen": "^0.16.8", 30 31 "libsql": "^0.4.6", 32 + "lint-staged": "^15.2.10", 31 33 "p-limit": "^6.1.0", 32 34 "pg": "^8.13.0", 33 35 "pg-native": "^3.2.0", 34 36 "pino": "^9.5.0", 35 37 "pino-pretty": "^11.3.0", 36 38 "prom-client": "^15.1.3", 39 + "redis": "^4.7.0", 37 40 "terminal-kit": "^3.1.1" 38 41 }, 39 42 "devDependencies": { ··· 46 49 "tsx": "^4.19.1", 47 50 "typescript": "^5.6.3", 48 51 "typescript-eslint": "^8.10.0" 52 + }, 53 + "lint-staged": { 54 + "*": "prettier --ignore-unknown --write --ignore-path .prettierignore" 49 55 }, 50 56 "author": "alice", 51 57 "license": "MIT"
+1
src/constants.ts
··· 3 3 export const RELAY_URL = process.env.RELAY_URL!; 4 4 export const PLC_DB_PATH = process.env.PLC_DB_PATH!; 5 5 export const METRICS_PORT = parseInt(process.env.METRICS_PORT!, 10); 6 + export const REDIS_URL = process.env.REDIS_URL ?? 'redis://localhost:6379'; 6 7 export const SQL_OUTPUT_FILE = 'dids_pds.jsonl'; 7 8 export const HEALTH_CHECK_FILE = 'pds_health.json'; 8 9 export const DATA_OUTPUT_FILE = 'bsky_data.jsonl';
+4 -2
src/db/migrations/001-create.ts
··· 19 19 has_emojis BOOLEAN NOT NULL DEFAULT FALSE, 20 20 langs TEXT[] NOT NULL DEFAULT '{}', 21 21 text TEXT, 22 - created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT (now() at time zone 'utc') 22 + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT (now() at time zone 'utc'), 23 + UNIQUE (did, cid, rkey) 23 24 ); 24 25 25 26 CREATE TABLE IF NOT EXISTS emojis ( ··· 38 39 rkey TEXT NOT NULL, -- ~13 characters 39 40 description TEXT, 40 41 display_name TEXT, 41 - created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT (now() at time zone 'utc') 42 + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT (now() at time zone 'utc'), 43 + UNIQUE (did, cid, rkey) 42 44 ); 43 45 `); 44 46 }
+4 -3
src/db/schema.d.ts
··· 14 14 export interface Emojis { 15 15 created_at: Generated<Timestamp>; 16 16 emoji: string; 17 - id: Generated<number>; 17 + id: Generated<Int8>; 18 18 lang: string; 19 - post_id: number; 19 + post_id: Int8 | null; 20 + profile_id: Int8 | null; 20 21 } 21 22 22 23 export interface Posts { ··· 24 25 created_at: Generated<Timestamp>; 25 26 did: string; 26 27 has_emojis: Generated<boolean>; 27 - id: Generated<number>; 28 + id: Generated<Int8>; 28 29 langs: Generated<string[]>; 29 30 rkey: string; 30 31 text: string | null;
+4
src/index.ts
··· 1 1 import { DIDS_TO_PROCESS, METRICS_PORT } from './constants.js'; 2 2 import { startMetricsServer } from './metrics.js'; 3 + import { redis } from './redis.js'; 3 4 import { gracefulShutdown, registerShutdownHandlers } from './shutdown.js'; 4 5 import { fetchAndDumpDidsPdses } from './stages/stage1.js'; 5 6 import { checkAllPDSHealth, selectAllDids } from './stages/stage2.js'; ··· 12 13 13 14 // start metrics server 14 15 startMetricsServer(METRICS_PORT); 16 + 17 + // connect to redis 18 + await redis.connect(); 15 19 16 20 // stage 1 17 21 await fetchAndDumpDidsPdses();
+34
src/redis.ts
··· 1 + import { createClient } from 'redis'; 2 + 3 + import { REDIS_URL } from './constants.js'; 4 + 5 + const redis = createClient({ url: REDIS_URL }); 6 + 7 + redis.on('error', (err: Error) => { 8 + console.error('Redis Client Error', { error: err }); 9 + }); 10 + 11 + redis.on('connect', () => { 12 + console.info('Connected to Redis.'); 13 + }); 14 + 15 + redis.on('ready', () => { 16 + console.info('Redis client ready.'); 17 + }); 18 + 19 + redis.on('end', () => { 20 + console.info('Redis client disconnected.'); 21 + }); 22 + 23 + // let SCRIPT_SHA: string; 24 + 25 + // const loadRedisScripts = async () => { 26 + // const scriptPath = new URL('lua/incrementEmojis.lua', import.meta.url); 27 + // const incrementEmojisScript = fs.readFileSync(scriptPath, 'utf8'); 28 + // SCRIPT_SHA = await redis.scriptLoad(incrementEmojisScript); 29 + // console.info(`Loaded Redis script with SHA: ${SCRIPT_SHA}`); 30 + // }; 31 + 32 + // export { redis, loadRedisScripts, SCRIPT_SHA }; 33 + 34 + export { redis };
+3
src/shutdown.ts
··· 2 2 import { closeDatabase } from './db/postgres.js'; 3 3 import { postBatchQueue, profileBatchQueue } from './db/postgresBatchQueues.js'; 4 4 import { stopMetricsServer } from './metrics.js'; 5 + import { redis } from './redis.js'; 5 6 6 7 export async function gracefulShutdown(): Promise<void> { 7 8 console.log('Initiating graceful shutdown...'); ··· 11 12 console.log('All pending batches have been flushed.'); 12 13 await closeDatabase(); 13 14 console.log('Database connections closed.'); 15 + await redis.quit(); 16 + console.log('Redis client disconnected.'); 14 17 process.exit(0); 15 18 } catch (err) { 16 19 console.error(`Error during shutdown: ${(err as Error).message}`);
+20
src/stages/stage3.ts
··· 4 4 import { PDS_DATA_FETCH_CONCURRENCY, PYTHON_SERVICE_TIMEOUT_MS } from '../constants.js'; 5 5 import { postBatchQueue, profileBatchQueue } from '../db/postgresBatchQueues.js'; 6 6 import { sanitizeTimestamp } from '../helpers.js'; 7 + import { redis } from '../redis.js'; 7 8 import { 8 9 BskyData, 9 10 BskyPost, ··· 24 25 25 26 const tasks = dids.map(({ did, pds }) => 26 27 limit(async () => { 28 + const status = await redis.get(`${did}:status`); 29 + if (status === 'completed' || status === 'failed') { 30 + process.stdout.write('~'); 31 + return; 32 + } 33 + 34 + // in theory this happens when we're resuming processing after a crash 35 + if (status === 'processing') { 36 + console.log(`Resuming processing for DID ${did}`); 37 + } 38 + 39 + await redis.set(`${did}:status`, 'processing'); 40 + 27 41 try { 28 42 const res = await axios.post( 29 43 'http://localhost:8000/fetch', ··· 112 126 } 113 127 } 114 128 successfulDids++; 129 + redis.set(`${did}:status`, 'completed').catch((err: unknown) => { 130 + console.error(`Redis set error for DID ${did}: ${(err as Error).message}`); 131 + }); 115 132 if (successfulDids % 100 === 0) { 116 133 // process.stdout.write('#'); 117 134 console.log(`Processed ${successfulDids} DIDs.`); ··· 122 139 // eslint-disable-next-line @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access 123 140 res.data.on('error', (err: Error) => { 124 141 console.error(`Stream error for DID ${did}: ${err.message}`); 142 + redis.set(`${did}:status`, 'failed').catch((err: unknown) => { 143 + console.error(`Redis set error for DID ${did}: ${(err as Error).message}`); 144 + }); 125 145 failedDids++; 126 146 if (failedDids % 100 === 0) { 127 147 // process.stdout.write('*');
+2
src/types.ts
··· 1 1 export interface DidAndPds { 2 2 did: string; 3 3 pds: string; 4 + status?: 'pending' | 'processing' | 'completed' | 'failed'; 5 + error?: string; 4 6 } 5 7 6 8 export type PdsToDidsMap = Record<string, string[] | undefined>;
+1
wipe-db.sh
··· 1 1 #!/bin/bash 2 2 direnv allow 3 + valkey-cli flushall 3 4 psql "$DATABASE_URL" -c ' 4 5 TRUNCATE TABLE posts RESTART IDENTITY CASCADE; 5 6 TRUNCATE TABLE profiles RESTART IDENTITY CASCADE;