A simple tool which lets you scrape twitter accounts and crosspost them to bluesky accounts! Comes with a CLI and a webapp for managing profiles! Works with images/videos/link embeds/threads.
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: prioritize queued backfills

jack 320d9451 b89e9e16

+191 -48
+50 -7
public/index.html
··· 93 93 } 94 94 95 95 .table > :not(caption) > * > * { padding: 1rem; } 96 - 96 + 97 + .status-dot { 98 + width: 8px; 99 + height: 8px; 100 + border-radius: 999px; 101 + display: inline-block; 102 + margin-right: 8px; 103 + background: #10b981; 104 + box-shadow: 0 0 0 3px rgba(16, 185, 129, 0.15); 105 + } 106 + .status-queued { 107 + background: #f59e0b; 108 + box-shadow: 0 0 0 3px rgba(245, 158, 11, 0.15); 109 + } 110 + .status-active { 111 + background: #10b981; 112 + box-shadow: 0 0 0 3px rgba(16, 185, 129, 0.15); 113 + } 114 + .status-backfilling { 115 + background: #f97316; 116 + box-shadow: 0 0 0 3px rgba(249, 115, 22, 0.18); 117 + } 118 + 97 119 .tweet-preview { 98 120 max-width: 300px; 99 121 white-space: nowrap; ··· 213 235 214 236 useEffect(() => { 215 237 if (view !== 'dashboard' || !token) return; 216 - const statusTimer = setInterval(fetchStatus, 5000); 217 - const activityTimer = setInterval(fetchActivity, 10000); 238 + const statusTimer = setInterval(fetchStatus, 2000); 239 + const activityTimer = setInterval(fetchActivity, 7000); 218 240 return () => { 219 241 clearInterval(statusTimer); 220 242 clearInterval(activityTimer); ··· 327 349 } 328 350 }; 329 351 330 - const runBackfill = async (id) => { 352 + const runBackfill = async (id, label = 'Backfill') => { 353 + const hasQueue = status.pendingBackfills && status.pendingBackfills.length > 0; 354 + const isActiveBackfill = status.currentStatus?.state === 'backfilling'; 355 + if (hasQueue || isActiveBackfill) { 356 + const confirmMsg = `${label} is already running or queued. This will add a new request and replace any existing one for this account. Continue?`; 357 + if (!confirm(confirmMsg)) return; 358 + } 331 359 const limit = prompt(`How many tweets to backfill per account?`, "15"); 332 360 if (limit === null) return; 333 361 try { ··· 349 377 }; 350 378 351 379 const resetAndBackfill = async (id) => { 380 + const hasQueue = status.pendingBackfills && status.pendingBackfills.length > 0; 381 + const isActiveBackfill = status.currentStatus?.state === 'backfilling'; 382 + if (hasQueue || isActiveBackfill) { 383 + const confirmMsg = 'Backfill is already running or queued. This will add a new request and replace any existing one for this account. Continue?'; 384 + if (!confirm(confirmMsg)) return; 385 + } 352 386 const limit = prompt(`Reset cache and backfill how many tweets?`, "15"); 353 387 if (limit === null) return; 354 388 try { ··· 484 518 } 485 519 486 520 const isBackfillQueued = (id) => status.pendingBackfills?.some(b => (b.id || b) === id); 521 + const backfillEntry = (id) => status.pendingBackfills?.find(b => (b.id || b) === id); 522 + const activeBackfillId = status.currentStatus?.backfillMappingId; 523 + const isBackfillActive = (id) => status.currentStatus?.state === 'backfilling' && activeBackfillId === id; 487 524 488 525 // Check if configs are set to collapse by default 489 526 const hasTwitterConfig = twitterConfig.authToken && twitterConfig.ct0; ··· 601 638 <td className="small text-muted fw-medium">{m.bskyIdentifier}</td> 602 639 <td> 603 640 <div className="d-flex align-items-center"> 604 - <span className={`status-dot ${isBackfillQueued(m.id) ? 'status-queued' : 'status-active'}`}></span> 605 - <span className="badge bg-success bg-opacity-10 text-success border border-success border-opacity-10">{isBackfillQueued(m.id) ? 'Backfilling' : 'Active'}</span> 641 + <span className={`status-dot ${isBackfillActive(m.id) ? 'status-backfilling' : (isBackfillQueued(m.id) ? 'status-queued' : 'status-active')}`}></span> 642 + {isBackfillActive(m.id) ? ( 643 + <span className="badge bg-warning bg-opacity-10 text-warning border border-warning border-opacity-10">Backfilling</span> 644 + ) : isBackfillQueued(m.id) ? ( 645 + <span className="badge bg-warning bg-opacity-10 text-warning border border-warning border-opacity-10">Queued #{backfillEntry(m.id)?.position || ''}</span> 646 + ) : ( 647 + <span className="badge bg-success bg-opacity-10 text-success border border-success border-opacity-10">Active</span> 648 + )} 606 649 </div> 607 650 </td> 608 651 <td className="text-end pe-4"> ··· 614 657 {isAdmin && ( 615 658 <> 616 659 <li><button className="dropdown-item" onClick={() => setEditingMapping(m)}>Edit</button></li> 617 - <li><button className="dropdown-item" onClick={() => runBackfill(m.id)}>Backfill History</button></li> 660 + <li><button className="dropdown-item" onClick={() => runBackfill(m.id, 'Backfill')}>Backfill History</button></li> 618 661 <li><button className="dropdown-item text-warning" onClick={() => resetAndBackfill(m.id)}>Reset Cache & Backfill</button></li> 619 662 <li><button className="dropdown-item text-danger fw-bold" onClick={() => deleteAllPosts(m.id)}>Danger: Delete All Posts</button></li> 620 663 <li><hr className="dropdown-divider"/></li>
+98 -33
src/index.ts
··· 1422 1422 1423 1423 import { getAgent } from './bsky.js'; 1424 1424 1425 - async function importHistory(twitterUsername: string, bskyIdentifier: string, limit = 15, dryRun = false, ignoreCancellation = false): Promise<void> { 1425 + async function importHistory( 1426 + twitterUsername: string, 1427 + bskyIdentifier: string, 1428 + limit = 15, 1429 + dryRun = false, 1430 + ignoreCancellation = false, 1431 + requestId?: string, 1432 + ): Promise<void> { 1426 1433 const config = getConfig(); 1427 1434 const mapping = config.mappings.find((m) => m.twitterUsernames.map(u => u.toLowerCase()).includes(twitterUsername.toLowerCase())); 1428 1435 if (!mapping) { ··· 1468 1475 1469 1476 for await (const scraperTweet of generator) { 1470 1477 if (!ignoreCancellation) { 1471 - const stillPending = getPendingBackfills().some(b => b.id === mapping.id); 1472 - if (!stillPending) { 1473 - console.log(`[${twitterUsername}] 🛑 Backfill cancelled.`); 1474 - break; 1475 - } 1478 + const stillPending = getPendingBackfills().some(b => b.id === mapping.id && (!requestId || b.requestId === requestId)); 1479 + if (!stillPending) { 1480 + console.log(`[${twitterUsername}] 🛑 Backfill cancelled.`); 1481 + break; 1482 + } 1483 + 1476 1484 } 1477 1485 1478 1486 const t = mapScraperTweetToLocalTweet(scraperTweet); ··· 1501 1509 // Task management 1502 1510 const activeTasks = new Map<string, Promise<void>>(); 1503 1511 1504 - async function runAccountTask(mapping: AccountMapping, forceBackfill = false, dryRun = false) { 1512 + async function runAccountTask(mapping: AccountMapping, backfillRequest?: PendingBackfill, dryRun = false) { 1505 1513 if (activeTasks.has(mapping.id)) return; // Already running 1506 1514 1507 1515 const task = (async () => { ··· 1509 1517 const agent = await getAgent(mapping); 1510 1518 if (!agent) return; 1511 1519 1512 - const backfillReq = getPendingBackfills().find(b => b.id === mapping.id); 1520 + const backfillReq = backfillRequest ?? getPendingBackfills().find(b => b.id === mapping.id); 1513 1521 1514 - if (forceBackfill || backfillReq) { 1515 - const limit = backfillReq?.limit || 15; 1522 + if (backfillReq) { 1523 + const limit = backfillReq.limit || 15; 1516 1524 console.log(`[${mapping.bskyIdentifier}] Running backfill for ${mapping.twitterUsernames.length} accounts (limit ${limit})...`); 1525 + updateAppStatus({ 1526 + state: 'backfilling', 1527 + currentAccount: mapping.twitterUsernames[0], 1528 + message: `Starting backfill (limit ${limit})...`, 1529 + backfillMappingId: mapping.id, 1530 + backfillRequestId: backfillReq.requestId, 1531 + }); 1517 1532 1518 1533 for (const twitterUsername of mapping.twitterUsernames) { 1534 + const stillPending = getPendingBackfills().some( 1535 + (b) => b.id === mapping.id && b.requestId === backfillReq.requestId, 1536 + ); 1537 + if (!stillPending) { 1538 + console.log(`[${mapping.bskyIdentifier}] 🛑 Backfill request replaced; stopping.`); 1539 + break; 1540 + } 1541 + 1519 1542 try { 1520 - updateAppStatus({ state: 'backfilling', currentAccount: twitterUsername, message: `Starting backfill (limit ${limit})...` }); 1521 - await importHistory(twitterUsername, mapping.bskyIdentifier, limit, dryRun); 1543 + updateAppStatus({ 1544 + state: 'backfilling', 1545 + currentAccount: twitterUsername, 1546 + message: `Starting backfill (limit ${limit})...`, 1547 + backfillMappingId: mapping.id, 1548 + backfillRequestId: backfillReq.requestId, 1549 + }); 1550 + await importHistory(twitterUsername, mapping.bskyIdentifier, limit, dryRun, false, backfillReq.requestId); 1522 1551 } catch (err) { 1523 1552 console.error(`❌ Error backfilling ${twitterUsername}:`, err); 1524 1553 } 1525 1554 } 1526 - clearBackfill(mapping.id); 1555 + clearBackfill(mapping.id, backfillReq.requestId); 1556 + updateAppStatus({ 1557 + state: 'idle', 1558 + message: `Backfill complete for ${mapping.bskyIdentifier}`, 1559 + backfillMappingId: undefined, 1560 + backfillRequestId: undefined, 1561 + }); 1527 1562 console.log(`[${mapping.bskyIdentifier}] Backfill complete.`); 1528 1563 } else { 1564 + updateAppStatus({ backfillMappingId: undefined, backfillRequestId: undefined }); 1565 + 1529 1566 // Pre-load processed IDs for optimization 1530 1567 const processedMap = loadProcessedTweets(mapping.bskyIdentifier); 1531 1568 const processedIds = new Set(Object.keys(processedMap)); ··· 1533 1570 for (const twitterUsername of mapping.twitterUsernames) { 1534 1571 try { 1535 1572 console.log(`[${twitterUsername}] 🏁 Starting check for new tweets...`); 1536 - updateAppStatus({ state: 'checking', currentAccount: twitterUsername, message: 'Fetching latest tweets...' }); 1573 + updateAppStatus({ 1574 + state: 'checking', 1575 + currentAccount: twitterUsername, 1576 + message: 'Fetching latest tweets...', 1577 + backfillMappingId: undefined, 1578 + backfillRequestId: undefined, 1579 + }); 1537 1580 1538 1581 // Use fetchUserTweets with early stopping optimization 1539 1582 // Increase limit slightly since we have early stopping now ··· 1570 1613 getNextCheckTime, 1571 1614 updateAppStatus, 1572 1615 } from './server.js'; 1616 + import type { PendingBackfill } from './server.js'; 1573 1617 import { AccountMapping } from './config-manager.js'; 1574 1618 1575 1619 async function main(): Promise<void> { ··· 1631 1675 // Concurrency limit for processing accounts 1632 1676 const runLimit = pLimit(3); 1633 1677 1678 + const findMappingById = (mappings: AccountMapping[], id: string) => 1679 + mappings.find((mapping) => mapping.id === id); 1680 + 1634 1681 // Main loop 1635 1682 while (true) { 1636 1683 const now = Date.now(); 1637 1684 const config = getConfig(); // Reload config to get new mappings/settings 1638 1685 const nextTime = getNextCheckTime(); 1639 - 1686 + 1640 1687 // Check if it's time for a scheduled run OR if we have pending backfills 1641 1688 const isScheduledRun = now >= nextTime; 1642 1689 const pendingBackfills = getPendingBackfills(); 1643 - 1690 + 1644 1691 if (isScheduledRun) { 1645 - console.log(`[${new Date().toISOString()}] ⏰ Scheduled check triggered.`); 1646 - updateLastCheckTime(); 1692 + console.log(`[${new Date().toISOString()}] ⏰ Scheduled check triggered.`); 1693 + updateLastCheckTime(); 1647 1694 } 1648 1695 1649 1696 const tasks: Promise<void>[] = []; 1650 1697 1651 - for (const mapping of config.mappings) { 1698 + if (pendingBackfills.length > 0) { 1699 + const [nextBackfill, ...rest] = pendingBackfills; 1700 + if (nextBackfill) { 1701 + const mapping = findMappingById(config.mappings, nextBackfill.id); 1702 + if (mapping && mapping.enabled) { 1703 + console.log(`[Scheduler] 🚧 Backfill priority: ${mapping.bskyIdentifier}`); 1704 + await runAccountTask(mapping, nextBackfill, options.dryRun); 1705 + } else { 1706 + clearBackfill(nextBackfill.id, nextBackfill.requestId); 1707 + } 1708 + } 1709 + if (pendingBackfills.length === 0 && getPendingBackfills().length === 0) { 1710 + updateAppStatus({ 1711 + state: 'idle', 1712 + message: 'Backfill queue empty', 1713 + backfillMappingId: undefined, 1714 + backfillRequestId: undefined, 1715 + }); 1716 + } 1717 + nextCheckTime = Date.now() + (config.checkIntervalMinutes || 5) * 60 * 1000; 1718 + } else if (isScheduledRun) { 1719 + for (const mapping of config.mappings) { 1652 1720 if (!mapping.enabled) continue; 1653 - 1654 - const hasPendingBackfill = pendingBackfills.some(b => b.id === mapping.id); 1655 - 1656 - // Run if scheduled OR backfill requested 1657 - if (isScheduledRun || hasPendingBackfill) { 1658 - // Queue task with concurrency limit 1659 - tasks.push(runLimit(async () => { 1660 - await runAccountTask(mapping, hasPendingBackfill, options.dryRun); 1661 - })); 1662 - } 1663 - } 1664 1721 1665 - if (tasks.length > 0) { 1722 + tasks.push(runLimit(async () => { 1723 + await runAccountTask(mapping, undefined, options.dryRun); 1724 + })); 1725 + } 1726 + 1727 + if (tasks.length > 0) { 1666 1728 await Promise.all(tasks); 1667 1729 console.log(`[Scheduler] ✅ All tasks for this cycle complete.`); 1730 + } 1731 + 1732 + updateAppStatus({ state: 'idle', message: 'Scheduled checks complete' }); 1668 1733 } 1669 - 1734 + 1670 1735 // Sleep for 5 seconds 1671 - await new Promise(resolve => setTimeout(resolve, 5000)); 1736 + await new Promise((resolve) => setTimeout(resolve, 5000)); 1672 1737 } 1673 1738 } 1674 1739
+43 -8
src/server.ts
··· 18 18 // In-memory state for triggers and scheduling 19 19 let lastCheckTime = Date.now(); 20 20 let nextCheckTime = Date.now() + (getConfig().checkIntervalMinutes || 5) * 60 * 1000; 21 - interface PendingBackfill { 21 + export interface PendingBackfill { 22 22 id: string; 23 23 limit?: number; 24 + queuedAt: number; 25 + sequence: number; 26 + requestId: string; 24 27 } 25 28 let pendingBackfills: PendingBackfill[] = []; 29 + let backfillSequence = 0; 26 30 27 31 interface AppStatus { 28 32 state: 'idle' | 'checking' | 'backfilling' | 'pacing' | 'processing'; ··· 30 34 processedCount?: number; 31 35 totalCount?: number; 32 36 message?: string; 37 + backfillMappingId?: string; 38 + backfillRequestId?: string; 33 39 lastUpdate: number; 34 40 } 35 41 ··· 285 291 nextCheckTime, 286 292 nextCheckMinutes: Math.ceil(nextRunMs / 60000), 287 293 checkIntervalMinutes: config.checkIntervalMinutes, 288 - pendingBackfills, 294 + pendingBackfills: pendingBackfills 295 + .slice() 296 + .sort((a, b) => a.sequence - b.sequence) 297 + .map((backfill, index) => ({ 298 + ...backfill, 299 + position: index + 1, 300 + })), 289 301 currentStatus: currentAppStatus, 290 302 }); 291 303 }); ··· 307 319 return; 308 320 } 309 321 310 - if (!pendingBackfills.find((b) => b.id === id)) { 311 - pendingBackfills.push({ id, limit: limit ? Number(limit) : undefined }); 312 - } 322 + const queuedAt = Date.now(); 323 + const sequence = backfillSequence++; 324 + const requestId = Math.random().toString(36).slice(2); 325 + pendingBackfills = pendingBackfills.filter((b) => b.id !== id); 326 + pendingBackfills.push({ 327 + id, 328 + limit: limit ? Number(limit) : undefined, 329 + queuedAt, 330 + sequence, 331 + requestId, 332 + }); 333 + pendingBackfills.sort((a, b) => a.sequence - b.sequence); 313 334 314 335 // Do not force a global run; the scheduler loop will pick up the pending backfill in ~5s 315 - res.json({ success: true, message: `Backfill queued for @${mapping.twitterUsernames.join(', ')}` }); 336 + res.json({ 337 + success: true, 338 + message: `Backfill queued for @${mapping.twitterUsernames.join(', ')}`, 339 + requestId, 340 + }); 316 341 }); 317 342 318 343 app.delete('/api/backfill/:id', authenticateToken, (req, res) => { ··· 323 348 324 349 app.post('/api/backfill/clear-all', authenticateToken, requireAdmin, (_req, res) => { 325 350 pendingBackfills = []; 351 + updateAppStatus({ 352 + state: 'idle', 353 + message: 'All backfills cleared', 354 + backfillMappingId: undefined, 355 + backfillRequestId: undefined, 356 + }); 326 357 res.json({ success: true, message: 'All backfills cleared' }); 327 358 }); 328 359 ··· 392 423 } 393 424 394 425 export function getPendingBackfills(): PendingBackfill[] { 395 - return [...pendingBackfills]; 426 + return [...pendingBackfills].sort((a, b) => a.sequence - b.sequence); 396 427 } 397 428 398 429 export function getNextCheckTime(): number { 399 430 return nextCheckTime; 400 431 } 401 432 402 - export function clearBackfill(id: string) { 433 + export function clearBackfill(id: string, requestId?: string) { 434 + if (requestId) { 435 + pendingBackfills = pendingBackfills.filter((bid) => !(bid.id === id && bid.requestId === requestId)); 436 + return; 437 + } 403 438 pendingBackfills = pendingBackfills.filter((bid) => bid.id !== id); 404 439 } 405 440