A simple tool which lets you scrape twitter accounts and crosspost them to bluesky accounts! Comes with a CLI and a webapp for managing profiles! Works with images/videos/link embeds/threads.
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add 5-subbranch concurrent account/backfill scheduling

jack 2123725f ac561bdd

+97 -99
+97 -99
src/index.ts
··· 235 235 // Custom Twitter Client 236 236 // ============================================================================ 237 237 238 - let scraper: Scraper | null = null; 239 - let currentTwitterCookies = { authToken: '', ct0: '' }; 238 + const scraperSessions = new Map<string, Scraper>(); 239 + const sessionCookies = new Map<string, { authToken: string; ct0: string }>(); 240 240 let useBackupCredentials = false; 241 241 const lastCreatedAtByBsky = new Map<string, number>(); 242 + const SUBBRANCH_COUNT = 5; 242 243 243 244 function getUniqueCreatedAtIso(bskyIdentifier: string, desiredMs: number): string { 244 245 const key = bskyIdentifier.toLowerCase(); ··· 248 249 return new Date(nextMs).toISOString(); 249 250 } 250 251 251 - async function getTwitterScraper(forceReset = false): Promise<Scraper | null> { 252 + async function getTwitterScraper(sessionKey = 'default', forceReset = false): Promise<Scraper | null> { 252 253 const config = getConfig(); 253 254 let authToken = config.twitter.authToken; 254 255 let ct0 = config.twitter.ct0; ··· 262 263 if (!authToken || !ct0) return null; 263 264 264 265 // Re-initialize if config changed, not yet initialized, or forced reset 265 - if (!scraper || forceReset || currentTwitterCookies.authToken !== authToken || currentTwitterCookies.ct0 !== ct0) { 266 + const existingScraper = scraperSessions.get(sessionKey); 267 + const existingCookies = sessionCookies.get(sessionKey); 268 + if ( 269 + !existingScraper || 270 + forceReset || 271 + existingCookies?.authToken !== authToken || 272 + existingCookies?.ct0 !== ct0 273 + ) { 266 274 console.log(`🔄 Initializing Twitter scraper with ${useBackupCredentials ? 'BACKUP' : 'PRIMARY'} credentials...`); 267 - scraper = new Scraper(); 275 + const scraper = new Scraper(); 268 276 await scraper.setCookies([`auth_token=${authToken}`, `ct0=${ct0}`]); 269 - 270 - currentTwitterCookies = { 277 + scraperSessions.set(sessionKey, scraper); 278 + sessionCookies.set(sessionKey, { 271 279 authToken: authToken, 272 280 ct0: ct0, 273 - }; 281 + }); 274 282 } 275 - return scraper; 283 + return scraperSessions.get(sessionKey) ?? null; 276 284 } 277 285 278 286 async function switchCredentials() { ··· 280 288 if (config.twitter.backupAuthToken && config.twitter.backupCt0) { 281 289 useBackupCredentials = !useBackupCredentials; 282 290 console.log(`⚠️ Switching to ${useBackupCredentials ? 'BACKUP' : 'PRIMARY'} Twitter credentials...`); 283 - await getTwitterScraper(true); 291 + scraperSessions.clear(); 292 + sessionCookies.clear(); 284 293 return true; 285 294 } 286 295 console.log('⚠️ No backup credentials available to switch to.'); ··· 1190 1199 return [...existingFacets, ...newFacets].sort((a, b) => a.index.byteStart - b.index.byteStart); 1191 1200 } 1192 1201 1193 - // Simple p-limit implementation for concurrency control 1194 - const pLimit = (concurrency: number) => { 1195 - const queue: (() => Promise<void>)[] = []; 1196 - let activeCount = 0; 1197 - 1198 - const next = () => { 1199 - activeCount--; 1200 - if (queue.length > 0) { 1201 - queue.shift()!(); 1202 - } 1203 - }; 1204 - 1205 - return <T>(fn: () => Promise<T>): Promise<T> => { 1206 - return new Promise<T>((resolve, reject) => { 1207 - const run = async () => { 1208 - activeCount++; 1209 - try { 1210 - resolve(await fn()); 1211 - } catch (e) { 1212 - reject(e); 1213 - } finally { 1214 - next(); 1215 - } 1216 - }; 1217 - 1218 - if (activeCount < concurrency) { 1219 - run(); 1220 - } else { 1221 - queue.push(run); 1222 - } 1223 - }); 1224 - }; 1225 - }; 1226 - 1227 1202 // Replaced safeSearch with fetchUserTweets to use UserTweets endpoint instead of Search 1228 1203 // Added processedIds for early stopping optimization 1229 - async function fetchUserTweets(username: string, limit: number, processedIds?: Set<string>): Promise<Tweet[]> { 1230 - const client = await getTwitterScraper(); 1204 + async function fetchUserTweets( 1205 + username: string, 1206 + limit: number, 1207 + processedIds?: Set<string>, 1208 + sessionKey = 'default', 1209 + ): Promise<Tweet[]> { 1210 + const client = await getTwitterScraper(sessionKey); 1231 1211 if (!client) return []; 1232 1212 1233 1213 let retries = 3; ··· 1316 1296 dryRun = false, 1317 1297 sharedProcessedMap?: ProcessedTweetsMap, 1318 1298 sharedTweetMap?: Map<string, Tweet>, 1299 + sessionKey = 'default', 1319 1300 ): Promise<void> { 1320 1301 // Filter tweets to ensure they're actually from this user 1321 1302 const filteredTweets = tweets.filter((t) => { ··· 1413 1394 1414 1395 let parentBackfilled = false; 1415 1396 try { 1416 - const scraper = await getTwitterScraper(); 1397 + const scraper = await getTwitterScraper(sessionKey); 1417 1398 if (scraper) { 1418 1399 const parentRaw = await scraper.getTweet(replyStatusId); 1419 1400 if (parentRaw) { ··· 1432 1413 dryRun, 1433 1414 localProcessedMap, 1434 1415 tweetMap, 1416 + sessionKey, 1435 1417 ); 1436 1418 1437 1419 // Check if it was saved ··· 1946 1928 dryRun = false, 1947 1929 ignoreCancellation = false, 1948 1930 requestId?: string, 1931 + sessionKey = 'default', 1949 1932 ): Promise<void> { 1950 1933 const config = getConfig(); 1951 1934 const mapping = config.mappings.find((m) => ··· 1983 1966 console.log(`Fetching tweets for ${twitterUsername}...`); 1984 1967 updateAppStatus({ message: `Fetching tweets...` }); 1985 1968 1986 - const client = await getTwitterScraper(); 1969 + const client = await getTwitterScraper(sessionKey); 1987 1970 if (client) { 1988 1971 try { 1989 1972 // Use getTweets which reliably fetches user timeline ··· 2021 2004 2022 2005 console.log(`Fetch complete. Found ${allFoundTweets.length} new tweets to import.`); 2023 2006 if (allFoundTweets.length > 0) { 2024 - await processTweets(agent as BskyAgent, twitterUsername, bskyIdentifier, allFoundTweets, dryRun); 2007 + await processTweets(agent as BskyAgent, twitterUsername, bskyIdentifier, allFoundTweets, dryRun, undefined, undefined, sessionKey); 2025 2008 console.log('History import complete.'); 2026 2009 } 2027 2010 } ··· 2198 2181 } 2199 2182 } 2200 2183 2201 - async function runAccountTask(mapping: AccountMapping, backfillRequest?: PendingBackfill, dryRun = false) { 2184 + async function runAccountTask( 2185 + mapping: AccountMapping, 2186 + backfillRequest?: PendingBackfill, 2187 + dryRun = false, 2188 + sessionKey = 'default', 2189 + ) { 2202 2190 const logPrefix = getMappingLogPrefix(mapping); 2203 2191 const existingTask = activeTasks.get(mapping.id); 2204 2192 if (existingTask) { ··· 2297 2285 backfillRequestId: backfillReq.requestId, 2298 2286 }); 2299 2287 await withTimeout( 2300 - importHistory(twitterUsername, mapping.bskyIdentifier, limit, dryRun, false, backfillReq.requestId), 2288 + importHistory(twitterUsername, mapping.bskyIdentifier, limit, dryRun, false, backfillReq.requestId, sessionKey), 2301 2289 backfillAccountTimeoutMs, 2302 2290 `[${twitterUsername}] Backfill timed out after ${Math.round(backfillAccountTimeoutMs / 1000)}s`, 2303 2291 ); ··· 2350 2338 // Use fetchUserTweets with early stopping optimization 2351 2339 // Increase limit slightly since we have early stopping now 2352 2340 const tweets = await withTimeout( 2353 - fetchUserTweets(twitterUsername, 50, processedIds), 2341 + fetchUserTweets(twitterUsername, 50, processedIds, sessionKey), 2354 2342 scheduledAccountTimeoutMs, 2355 2343 `[${twitterUsername}] Scheduled fetch timed out after ${Math.round(scheduledAccountTimeoutMs / 1000)}s`, 2356 2344 ); ··· 2362 2350 2363 2351 console.log(`[${twitterUsername}] 📥 Fetched ${tweets.length} tweets.`); 2364 2352 await withTimeout( 2365 - processTweets(agent, twitterUsername, mapping.bskyIdentifier, tweets, dryRun), 2353 + processTweets(agent, twitterUsername, mapping.bskyIdentifier, tweets, dryRun, undefined, undefined, sessionKey), 2366 2354 scheduledAccountTimeoutMs, 2367 2355 `[${twitterUsername}] Scheduled processing timed out after ${Math.round(scheduledAccountTimeoutMs / 1000)}s`, 2368 2356 ); ··· 2466 2454 ); 2467 2455 }; 2468 2456 2457 + const createSubbranches = <T,>(items: T[], branchCount = SUBBRANCH_COUNT): T[][] => { 2458 + const branches = Array.from({ length: Math.max(1, branchCount) }, () => [] as T[]); 2459 + for (let index = 0; index < items.length; index += 1) { 2460 + branches[index % branches.length]?.push(items[index] as T); 2461 + } 2462 + return branches; 2463 + }; 2464 + 2465 + const runMappingsWithSubbranches = async ( 2466 + mappings: AccountMapping[], 2467 + dryRun: boolean, 2468 + modeLabel: 'scheduled' | 'run-once', 2469 + ) => { 2470 + const enabledMappings = mappings.filter((mapping) => mapping.enabled); 2471 + if (enabledMappings.length === 0) { 2472 + const logPrefix = modeLabel === 'run-once' ? '[CLI]' : '[Scheduler]'; 2473 + console.log(`${logPrefix} ℹ️ No enabled mappings found for ${modeLabel} cycle.`); 2474 + return; 2475 + } 2476 + 2477 + const branches = createSubbranches(enabledMappings); 2478 + const tasks = branches.map(async (branchMappings, branchIndex) => { 2479 + const sessionKey = `subbranch-${branchIndex + 1}`; 2480 + if (branchMappings.length === 0) return; 2481 + console.log( 2482 + `[${modeLabel}] 🌿 Subbranch ${branchIndex + 1}/${branches.length} processing ${branchMappings.length} mapping(s).`, 2483 + ); 2484 + for (const mapping of branchMappings) { 2485 + await runAccountTask(mapping, undefined, dryRun, sessionKey); 2486 + } 2487 + }); 2488 + 2489 + await Promise.all(tasks); 2490 + }; 2491 + 2469 2492 const runSingleCycle = async (cycleConfig: ReturnType<typeof getConfig>) => { 2470 - const runLimit = pLimit(3); 2471 - const tasks: Promise<void>[] = []; 2472 2493 2473 2494 if (options.backfillMapping) { 2474 2495 const mapping = findMappingByRef(cycleConfig.mappings, options.backfillMapping); ··· 2491 2512 }; 2492 2513 2493 2514 console.log(`[CLI] 🚧 Running backfill for ${mapping.bskyIdentifier}...`); 2494 - await runAccountTask(mapping, backfillRequest, options.dryRun); 2515 + await runAccountTask(mapping, backfillRequest, options.dryRun, 'subbranch-1'); 2495 2516 updateAppStatus({ state: 'idle', message: `Backfill complete for ${mapping.bskyIdentifier}` }); 2496 2517 return; 2497 2518 } 2498 2519 2499 - for (const mapping of cycleConfig.mappings) { 2500 - if (!mapping.enabled) continue; 2501 - tasks.push( 2502 - runLimit(async () => { 2503 - await runAccountTask(mapping, undefined, options.dryRun); 2504 - }), 2505 - ); 2506 - } 2507 - 2508 - if (tasks.length === 0) { 2509 - console.log('[CLI] No enabled mappings found.'); 2510 - updateAppStatus({ state: 'idle', message: 'No enabled mappings found' }); 2511 - return; 2512 - } 2513 - 2514 - await Promise.all(tasks); 2520 + await runMappingsWithSubbranches(cycleConfig.mappings, options.dryRun, 'run-once'); 2515 2521 updateAppStatus({ state: 'idle', message: options.dryRun ? 'Dry run cycle complete' : 'Run-once cycle complete' }); 2516 2522 }; 2517 2523 ··· 2524 2530 console.log(`Scheduler started. Base interval: ${config.checkIntervalMinutes} minutes.`); 2525 2531 updateLastCheckTime(); // Initialize next time 2526 2532 2527 - // Concurrency limit for processing accounts 2528 - const runLimit = pLimit(3); 2529 2533 let deferredScheduledRun = false; 2530 2534 let lastWakeSignal = getSchedulerWakeSignal(); 2531 2535 ··· 2581 2585 message: `Backfill queue priority: ${pendingBackfills.length} job(s), ~${estimatedPendingTweets} tweets pending`, 2582 2586 }); 2583 2587 2584 - const [nextBackfill] = pendingBackfills; 2585 - if (nextBackfill) { 2586 - const mapping = findMappingById(config.mappings, nextBackfill.id); 2588 + const selectedBackfills: PendingBackfill[] = []; 2589 + const mappingIds = new Set<string>(); 2590 + for (const backfill of pendingBackfills) { 2591 + if (mappingIds.has(backfill.id)) continue; 2592 + mappingIds.add(backfill.id); 2593 + selectedBackfills.push(backfill); 2594 + if (selectedBackfills.length >= SUBBRANCH_COUNT) break; 2595 + } 2596 + 2597 + const backfillTasks = selectedBackfills.map(async (backfill, branchIndex) => { 2598 + const mapping = findMappingById(config.mappings, backfill.id); 2587 2599 if (mapping && mapping.enabled) { 2588 - const limit = nextBackfill.limit || 15; 2600 + const limit = backfill.limit || 15; 2589 2601 console.log( 2590 - `[Scheduler] 🚧 Backfill priority 1/${pendingBackfills.length}: ${mapping.bskyIdentifier} (limit ${limit})`, 2602 + `[Scheduler] 🚧 Backfill subbranch ${branchIndex + 1}/${SUBBRANCH_COUNT}: ${mapping.bskyIdentifier} (limit ${limit})`, 2591 2603 ); 2592 - await runAccountTask(mapping, nextBackfill, options.dryRun); 2604 + await runAccountTask(mapping, backfill, options.dryRun, `subbranch-${branchIndex + 1}`); 2593 2605 } else { 2594 - clearBackfill(nextBackfill.id, nextBackfill.requestId); 2606 + clearBackfill(backfill.id, backfill.requestId); 2595 2607 } 2596 - } 2608 + }); 2609 + await Promise.all(backfillTasks); 2597 2610 2598 2611 const remainingBackfills = getPendingBackfills(); 2599 2612 if (remainingBackfills.length === 0) { ··· 2616 2629 deferredScheduledRun = false; 2617 2630 updateLastCheckTime(); 2618 2631 2619 - const tasks: Promise<void>[] = []; 2620 - for (const mapping of config.mappings) { 2621 - if (!mapping.enabled) continue; 2622 - 2623 - tasks.push( 2624 - runLimit(async () => { 2625 - await runAccountTask(mapping, undefined, options.dryRun); 2626 - }), 2627 - ); 2628 - } 2629 - 2630 - if (tasks.length > 0) { 2631 - await Promise.all(tasks); 2632 - console.log(`[Scheduler] ✅ All tasks for this cycle complete.`); 2633 - } else { 2634 - console.log('[Scheduler] ℹ️ No enabled mappings found for scheduled cycle.'); 2635 - } 2632 + await runMappingsWithSubbranches(config.mappings, options.dryRun, 'scheduled'); 2633 + console.log(`[Scheduler] ✅ All subbranches for this cycle complete.`); 2636 2634 2637 2635 updateAppStatus({ state: 'idle', message: 'Scheduled checks complete' }); 2638 2636 }