A decentralized music tracking and discovery platform built on AT Protocol 🎵
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: update avatar script to use bun for script execution and improve user processing logic

+73 -24
+6 -6
apps/api/package.json
··· 9 9 "dev": "concurrently 'tsx --watch ./src/index.ts' 'tsx --watch ./src/server.ts'", 10 10 "prod": "tsx ./src/index.ts", 11 11 "build": "pkgroll", 12 - "sync": "tsx ./src/scripts/sync.ts", 13 - "meili:sync": "tsx ./src/scripts/meili.ts", 14 - "sync:library": "tsx ./src/scripts/sync-library.ts", 15 - "avatar": "tsx ./src/scripts/avatar.ts", 16 - "genres": "tsx ./src/scripts/genres.ts", 12 + "sync": "bun ./src/scripts/sync.ts", 13 + "meili:sync": "bun ./src/scripts/meili.ts", 14 + "sync:library": "bun ./src/scripts/sync-library.ts", 15 + "avatar": "bun ./src/scripts/avatar.ts", 16 + "genres": "bun ./src/scripts/genres.ts", 17 17 "exp": "tsx ./src/scripts/exp.ts", 18 18 "pkl:eval": "pkl eval -f json", 19 - "pkl:gen": "tsx ./scripts/pkl.ts", 19 + "pkl:gen": "bun ./scripts/pkl.ts", 20 20 "dev:xrpc": "tsx --watch ./src/server.ts", 21 21 "prod:xrpc": "tsx ./src/server.ts", 22 22 "db:migrate": "drizzle-kit migrate",
+67 -18
apps/api/src/scripts/avatar.ts
··· 1 + import chalk from "chalk"; 1 2 import { ctx } from "context"; 2 3 import { eq, or } from "drizzle-orm"; 3 4 import _ from "lodash"; 4 - import users from "schema/users"; 5 + import users, { type SelectUser } from "schema/users"; 5 6 6 7 const args = process.argv.slice(2); 7 - 8 - for (const did of args) { 9 - const [user] = await ctx.db 10 - .select() 11 - .from(users) 12 - .where(or(eq(users.did, did), eq(users.handle, did))) 13 - .limit(1) 14 - .execute(); 15 - if (!user) { 16 - console.log(`User ${did} not found`); 17 - continue; 18 - } 8 + const BATCH_SIZE = 100; // Process 100 users at a time 19 9 10 + async function processUser(user: SelectUser) { 20 11 if (!process.env.SKIP_AVATAR_UPDATE) { 21 12 const plc = await fetch(`https://plc.directory/${user.did}`).then((res) => 22 13 res.json() ··· 24 15 25 16 const serviceEndpoint = _.get(plc, "service.0.serviceEndpoint"); 26 17 if (!serviceEndpoint) { 27 - console.log(`Service endpoint not found for ${did}`); 28 - continue; 18 + console.log(`Service endpoint not found for ${user.did}`); 19 + return; 29 20 } 30 21 31 22 const profile = await fetch( ··· 42 33 .where(eq(users.did, user.did)) 43 34 .execute(); 44 35 } else { 45 - console.log(`Skipping avatar update for ${did}`); 36 + console.log(`Skipping avatar update for ${user.did}`); 46 37 } 47 38 48 39 const [u] = await ctx.db ··· 64 55 }; 65 56 66 57 console.log(userPayload); 58 + await ctx.nc.publish( 59 + "rocksky.user", 60 + Buffer.from(JSON.stringify(userPayload)) 61 + ); 62 + } 67 63 68 - ctx.nc.publish("rocksky.user", Buffer.from(JSON.stringify(userPayload))); 64 + if (args.length > 0) { 65 + for (const did of args) { 66 + const [user] = await ctx.db 67 + .select() 68 + .from(users) 69 + .where(or(eq(users.did, did), eq(users.handle, did))) 70 + .limit(1) 71 + .execute(); 72 + if (!user) { 73 + console.log(`User ${did} not found`); 74 + continue; 75 + } 76 + 77 + await processUser(user); 78 + } 79 + } else { 80 + let offset = 0; 81 + let processedCount = 0; 82 + 83 + console.log("Processing all users..."); 84 + 85 + while (true) { 86 + const batch = await ctx.db 87 + .select() 88 + .from(users) 89 + .limit(BATCH_SIZE) 90 + .offset(offset) 91 + .execute(); 69 92 70 - await new Promise((resolve) => setTimeout(resolve, 3000)); 93 + if (batch.length === 0) { 94 + break; // No more users to process 95 + } 96 + 97 + console.log( 98 + `Processing batch ${Math.floor(offset / BATCH_SIZE) + 1}, users ${offset + 1}-${offset + batch.length}` 99 + ); 100 + 101 + for (const user of batch) { 102 + try { 103 + await processUser(user); 104 + processedCount++; 105 + } catch (error) { 106 + console.error(`Error processing user ${user.did}:`, error); 107 + } 108 + } 109 + 110 + offset += BATCH_SIZE; 111 + 112 + // Small delay between batches to avoid overwhelming the API 113 + await new Promise((resolve) => setTimeout(resolve, 100)); 114 + } 115 + 116 + console.log(`Processed ${chalk.greenBright(processedCount)} users total`); 71 117 } 118 + 119 + // Ensure all messages are flushed before exiting 120 + await ctx.nc.flush(); 72 121 73 122 console.log("Done"); 74 123