this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add Postgres persistence

alice f11b2c94 c2e1f6f5

+218 -24
+1
.gitignore
··· 1 1 node_modules 2 2 .env 3 3 bun.lockb 4 + CURSOR_OVERRIDE.TXT
+1 -2
package.json
··· 16 16 }, 17 17 "lint-staged": { 18 18 "*": "prettier --ignore-unknown --write --ignore-path packages/backend/.prettierignore" 19 - }, 20 - "dependencies": {} 19 + } 21 20 }
+1
packages/backend/.env.example
··· 4 4 PORT=3100 5 5 ORIGINS=http://localhost:5173,https://production.example.com 6 6 METRICS_PORT=3101 7 + DATABASE_URL=postgresql://localhost:5432/emojistats
+7 -1
packages/backend/package.json
··· 8 8 "dev": "bunx tsx --watch src/index.ts", 9 9 "format": "bunx prettier --write .", 10 10 "lint": "bunx eslint .", 11 - "lint:fix": "bunx eslint --fix ." 11 + "lint:fix": "bunx eslint --fix .", 12 + "db:generate": "bunx kysely-codegen --out-file ./src/lib/schema.d.ts", 13 + "db:create": "bunx tsx ./src/migrations/002-postgresCreate.ts" 12 14 }, 13 15 "dependencies": { 14 16 "@skyware/jetstream": "^0.1.9", 17 + "@types/pg": "^8.11.10", 15 18 "dotenv": "^16.4.5", 16 19 "emoji-regex": "^10.4.0", 17 20 "express": "^4.21.0", 21 + "kysely": "^0.27.4", 22 + "pg": "^8.13.0", 18 23 "pino": "^9.4.0", 19 24 "pino-pretty": "^11.2.2", 20 25 "prom-client": "^15.1.3", ··· 30 35 "@types/express": "^4.17.21", 31 36 "@types/node": "^22.7.5", 32 37 "eslint": "^9.12.0", 38 + "kysely-codegen": "^0.16.8", 33 39 "prettier": "^3.3.3", 34 40 "tsx": "^4.19.1", 35 41 "typescript": "^5.6.2",
+82 -17
packages/backend/src/lib/emojiStats.ts
··· 1 1 import { CommitCreateEvent } from '@skyware/jetstream'; 2 2 import emojiRegexFunc from 'emoji-regex'; 3 3 import fs from 'fs'; 4 + import { sql } from 'kysely'; 4 5 5 6 import { MAX_EMOJIS, MAX_TOP_LANGUAGES } from '../config.js'; 6 7 import { batchNormalizeEmojis } from './emojiNormalization.js'; ··· 12 13 totalPostsWithEmojis, 13 14 totalPostsWithoutEmojis, 14 15 } from './metrics.js'; 16 + import { db } from './postgres.js'; 15 17 import { SCRIPT_SHA, redis } from './redis.js'; 16 18 import { Emoji, LanguageStat } from './types.js'; 17 19 ··· 33 35 export async function handleCreate(event: CommitCreateEvent<'app.bsky.feed.post'>) { 34 36 const timer = postProcessingDuration.startTimer(); 35 37 try { 36 - const { commit } = event; 38 + const { commit, did } = event; 37 39 38 40 if (!commit.rkey) return; 39 41 40 - const { record } = commit; 42 + const { record, cid, rkey } = commit; 41 43 42 44 try { 43 45 let langs = new Set<string>(); ··· 48 50 } 49 51 50 52 const emojiMatches: string[] = record.text.match(emojiRegex) ?? []; 53 + await db.transaction().execute(async (tx) => { 54 + if (emojiMatches.length > 0) { 55 + const stringifiedLangs = JSON.stringify(Array.from(langs)); 51 56 52 - if (emojiMatches.length > 0) { 53 - const stringifiedLangs = JSON.stringify(Array.from(langs)); 57 + const normalizedEmojis = JSON.stringify(batchNormalizeEmojis(emojiMatches)); 58 + 59 + await redis.evalSha(SCRIPT_SHA, { 60 + arguments: [normalizedEmojis, stringifiedLangs], 61 + }); 54 62 55 - const normalizedEmojis = JSON.stringify(batchNormalizeEmojis(emojiMatches)); 63 + logger.debug(`Emojis updated for languages: ${Array.from(langs).join(', ')}`); 64 + incrementTotalEmojis(emojiMatches.length); 65 + totalPostsWithEmojis.inc(); 66 + } else { 67 + await redis.incr(POSTS_WITHOUT_EMOJIS); 68 + totalPostsWithoutEmojis.inc(); 69 + } 56 70 57 - await redis.evalSha(SCRIPT_SHA, { 58 - arguments: [normalizedEmojis, stringifiedLangs], 59 - }); 71 + await redis.incr(PROCESSED_POSTS); 72 + incrementTotalPosts(); 73 + 74 + // const createdAt = new Date().toUTCString(); 75 + const { id } = await tx 76 + .insertInto('posts') 77 + .values({ 78 + cid: cid, 79 + did: did, 80 + rkey: rkey, 81 + has_emojis: emojiMatches.length > 0, 82 + langs: Array.from(langs), 83 + // created_at: createdAt, 84 + }) 85 + .returning('id') 86 + .executeTakeFirstOrThrow(); 87 + 88 + for (const emoji of emojiMatches) { 89 + for (const lang of langs) { 90 + if (lang === 'nn') console.dir(commit, { depth: null }); 91 + await tx 92 + .insertInto('emojis') 93 + .values({ 94 + post_id: id, 95 + emoji: emoji, 96 + lang: lang, 97 + }) 98 + .returning('id') 99 + .executeTakeFirstOrThrow(); 60 100 61 - logger.debug(`Emojis updated for languages: ${Array.from(langs).join(', ')}`); 62 - incrementTotalEmojis(emojiMatches.length); 63 - totalPostsWithEmojis.inc(); 64 - } else { 65 - await redis.incr(POSTS_WITHOUT_EMOJIS); 66 - totalPostsWithoutEmojis.inc(); 67 - } 101 + await tx 102 + .insertInto('emoji_stats') 103 + .values({ 104 + lang: lang, 105 + emoji: emoji, 106 + count: 1, 107 + }) 108 + .onConflict((b) => 109 + b.columns(['lang', 'emoji']).doUpdateSet({ 110 + count: sql`emoji_stats.count + 1`, 111 + // created_at: createdAt, 112 + }), 113 + ) 114 + .execute(); 115 + } 116 + } 68 117 69 - await redis.incr(PROCESSED_POSTS); 70 - incrementTotalPosts(); 118 + // Update global emoji_stats (lang = 'emojiStats') 119 + for (const emoji of emojiMatches) { 120 + await tx 121 + .insertInto('emoji_stats') 122 + .values({ 123 + lang: 'emojiStats', 124 + emoji: emoji, 125 + count: 1, 126 + }) 127 + .onConflict((b) => 128 + b.columns(['lang', 'emoji']).doUpdateSet({ 129 + count: sql`emoji_stats.count + 1`, 130 + // created_at: createdAt, 131 + }), 132 + ) 133 + .execute(); 134 + } 135 + }); 71 136 } catch (error) { 72 137 logger.error(`Error processing "create" commit: ${(error as Error).message}`, { commit, record }); 73 138 logger.error(`Malformed record data: ${JSON.stringify(record)}`);
+27 -3
packages/backend/src/lib/jetstream.ts
··· 4 4 import { handleCreate } from './emojiStats.js'; 5 5 import logger from './logger.js'; 6 6 import { redis } from './redis.js'; 7 + import fs from 'node:fs'; 7 8 8 9 let jetstream: Jetstream; 9 10 let cursor = 0; ··· 14 15 } 15 16 16 17 export const initializeJetstream = async () => { 17 - const result = await redis.get('cursor'); 18 - if (result === null) { 18 + console.log(import.meta.url); 19 + const cursorOverridePath = new URL('../../../../CURSOR_OVERRIDE.TXT', import.meta.url); 20 + 21 + if (fs.existsSync(cursorOverridePath)) { 22 + try { 23 + const overrideCursor = fs.readFileSync(cursorOverridePath, 'utf8').trim(); 24 + cursor = parseInt(overrideCursor, 10); 25 + 26 + if (isNaN(cursor)) { 27 + throw new Error('Invalid cursor value in CURSOR_OVERRIDE.TXT'); 28 + } 29 + 30 + await redis.set('cursor', cursor.toString()); 31 + logger.info(`Cursor overridden with value: ${cursor} (${epochUsToDateTime(cursor)})`); 32 + 33 + fs.unlinkSync(cursorOverridePath); 34 + logger.info('CURSOR_OVERRIDE.TXT file deleted after successful override'); 35 + } catch (error) { 36 + logger.error(`Error processing CURSOR_OVERRIDE.TXT: ${(error as Error).message}`); 37 + } 38 + } else { 39 + const result = await redis.get('cursor'); 40 + if (result === null) { 19 41 logger.info('No cursor found, initializing with current epoch in microseconds...'); 20 42 cursor = Math.floor(Date.now() * 1000); 21 43 await redis.set('cursor', cursor.toString()); 22 44 logger.info(`Initialized new cursor with value: ${cursor} (${epochUsToDateTime(cursor)})`); 45 + } else { 46 + logger.info(`Found existing cursor in Redis: ${result} (${epochUsToDateTime(Number(result))})`); 47 + } 23 48 } 24 - logger.info(`Found existing cursor in Redis: ${result} (${epochUsToDateTime(Number(result))})`); 25 49 26 50 jetstream = new Jetstream({ 27 51 wantedCollections: ['app.bsky.feed.post'],
+1 -1
packages/backend/src/lib/lua/incrementEmojis.lua
··· 12 12 -- Increment per-language emoji counts and global language stats 13 13 for _, lang in ipairs(langs) do 14 14 for _, emoji in ipairs(emojis) do 15 - redis.call('ZINCRBY', lang, 1, emoji) -- langKey being pt, ja, UNKNOWN, etc. 15 + redis.call('ZINCRBY', lang, 1, emoji) -- langKey being pt, ja, unknown, etc. 16 16 redis.call('ZINCRBY', 'languageStats', 1, lang) -- languageStats is the counter for per-language emoji count 17 17 end 18 18 end
+14
packages/backend/src/lib/postgres.ts
··· 1 + import { Kysely, PostgresDialect } from 'kysely'; 2 + import type { DB } from './schema.js'; 3 + import pg from 'pg'; 4 + const { Pool } = pg; 5 + 6 + const db = new Kysely<DB>({ 7 + dialect: new PostgresDialect({ 8 + pool: new Pool({ 9 + connectionString: process.env.DATABASE_URL, 10 + }), 11 + }), 12 + }); 13 + 14 + export { db };
+43
packages/backend/src/lib/schema.d.ts
··· 1 + /** 2 + * This file was generated by kysely-codegen. 3 + * Please do not edit it manually. 4 + */ 5 + 6 + import type { ColumnType } from "kysely"; 7 + 8 + export type Generated<T> = T extends ColumnType<infer S, infer I, infer U> 9 + ? ColumnType<S, I | undefined, U> 10 + : ColumnType<T, T | undefined, T>; 11 + 12 + export type Timestamp = ColumnType<Date, Date | string>; 13 + 14 + export interface Emojis { 15 + created_at: Generated<Timestamp>; 16 + emoji: string; 17 + id: Generated<number>; 18 + lang: string; 19 + post_id: number; 20 + } 21 + 22 + export interface EmojiStats { 23 + count: Generated<number>; 24 + created_at: Generated<Timestamp>; 25 + emoji: string; 26 + lang: string; 27 + } 28 + 29 + export interface Posts { 30 + cid: string; 31 + created_at: Generated<Timestamp>; 32 + did: string; 33 + has_emojis: Generated<boolean>; 34 + id: Generated<number>; 35 + langs: Generated<string[]>; 36 + rkey: string; 37 + } 38 + 39 + export interface DB { 40 + emoji_stats: EmojiStats; 41 + emojis: Emojis; 42 + posts: Posts; 43 + }
+41
packages/backend/src/migrations/002-postgresCreate.ts
··· 1 + import pg from 'pg' 2 + const { Client } = pg 3 + 4 + const client = new Client({ 5 + connectionString: process.env.DATABASE_URL!, 6 + }); 7 + 8 + await client.connect(); 9 + 10 + export async function createTables() { 11 + await client.query(` 12 + CREATE TABLE IF NOT EXISTS posts ( 13 + id SERIAL PRIMARY KEY, 14 + cid TEXT NOT NULL, -- 64 characters 15 + did TEXT NOT NULL, -- 32 characters 16 + rkey TEXT NOT NULL, -- 13 characters 17 + has_emojis BOOLEAN NOT NULL DEFAULT FALSE, 18 + langs TEXT[] NOT NULL DEFAULT '{}', 19 + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT (now() at time zone 'utc') 20 + ); 21 + 22 + CREATE TABLE IF NOT EXISTS emojis ( 23 + id SERIAL PRIMARY KEY, 24 + post_id INTEGER NOT NULL, 25 + emoji TEXT NOT NULL, 26 + lang TEXT NOT NULL, -- 2 or 5 characters 27 + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT (now() at time zone 'utc') 28 + ); 29 + 30 + CREATE TABLE IF NOT EXISTS emoji_stats ( 31 + lang TEXT NOT NULL, -- 2 or 5 characters 32 + emoji TEXT NOT NULL, 33 + count INTEGER NOT NULL DEFAULT 0, 34 + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT (now() at time zone 'utc'), 35 + PRIMARY KEY (lang, emoji) 36 + ); 37 + `); 38 + } 39 + 40 + createTables().catch((e: unknown) => { console.error(e); }).finally(() => void client.end()); 41 +