Malachite is a tool to import your Last.fm and Spotify listening history to the AT Protocol network using the fm.teal.alpha.feed.play lexicon.
malachite scrobbles importer atproto music
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(import): add CAR sync fallback to applyWrites when dedup check fails

Prefer CAR export (com.atproto.sync.getRepo) for deduplication since it uses a
separate, more generous rate-limit bucket and avoids write-quota costs. If the
CAR sync check fails (network/auth/PDS unsupported), fall back gracefully by
skipping deduplication and allowing applyWrites to handle existing records.

The PDS rejects creates for already-existing rkeys while still accepting new
records, ensuring imports continue without aborting. Added logging and
conditional sync stats to reflect whether CAR-based deduplication succeeded.

Applied to both CLI and web import flows.

+71 -22
+38 -11
src/lib/cli.ts
··· 600 600 601 601 if (agent) { 602 602 const originalRecords = [...records]; 603 - const existingRecords = await fetchExistingRecords(agent, cfg, args.fresh ?? false); 604 - records = filterNewRecords(records, existingRecords); 605 - if (records.length === 0) { 603 + 604 + // CAR export (com.atproto.sync.getRepo) sits on a separate, far more 605 + // generous rate-limit bucket and costs zero write-quota points — prefer 606 + // it for the dedup check so we only call applyWrites for new records. 607 + // If CAR is unavailable for any reason (network error, auth, PDS doesn't 608 + // support it), fall back gracefully: proceed with the full record set and 609 + // let applyWrites handle everything. The PDS rejects creates for rkeys 610 + // that already exist, but new records still land and the import continues. 611 + let carSyncOk = true; 612 + let existingMap: Awaited<ReturnType<typeof fetchExistingRecords>>; 613 + try { 614 + existingMap = await fetchExistingRecords(agent, cfg, args.fresh ?? false); 615 + } catch (carErr) { 616 + carSyncOk = false; 617 + const msg = (carErr as Error)?.message ?? String(carErr); 618 + log.warn(`⚠️ CAR sync check unavailable: ${msg}`); 619 + log.warn(` Falling back to full applyWrites — existing records will be rejected by the PDS, new ones will land correctly.`); 620 + log.blank(); 621 + existingMap = new Map(); 622 + } 623 + 624 + records = filterNewRecords(records, existingMap); 625 + 626 + if (records.length === 0 && carSyncOk) { 606 627 log.success('All records already exist in Teal. Nothing to import!'); 607 628 process.exit(0); 608 629 } 609 - if (mode === 'sync' || mode === 'combined') { 610 - displaySyncStats(originalRecords, existingRecords, records); 611 - } else { 612 - const skipped = originalRecords.length - records.length; 613 - if (skipped > 0) { 614 - log.info(`Found ${skipped.toLocaleString()} record(s) already in Teal (skipping)`); 615 - log.info(`New records to import: ${records.length.toLocaleString()}`); 630 + 631 + if (carSyncOk) { 632 + if (mode === 'sync' || mode === 'combined') { 633 + displaySyncStats(originalRecords, existingMap, records); 616 634 } else { 617 - log.info(`All ${records.length.toLocaleString()} records are new`); 635 + const skipped = originalRecords.length - records.length; 636 + if (skipped > 0) { 637 + log.info(`Found ${skipped.toLocaleString()} record(s) already in Teal (skipping)`); 638 + log.info(`New records to import: ${records.length.toLocaleString()}`); 639 + } else { 640 + log.info(`All ${records.length.toLocaleString()} records are new`); 641 + } 642 + log.blank(); 618 643 } 644 + } else { 645 + log.info(`${records.length.toLocaleString()} record(s) queued (deduplication skipped — CAR unavailable)`); 619 646 log.blank(); 620 647 } 621 648 }
+33 -11
web/src/lib/core/import.ts
··· 19 19 fetchAllRecordsForDedup, 20 20 findDuplicateGroups, 21 21 removeDuplicateRecords, 22 + type ExistingRecord, 22 23 } from '$core/sync.js'; 23 24 import { publishRecords, type PublishProgress } from '$core/publisher.js'; 24 25 ··· 121 122 records = unique; 122 123 if (inputDups > 0) onLog('warn', `Removed ${inputDups.toLocaleString()} duplicate(s) from input`); 123 124 124 - // ── Sync check ─────────────────────────────────────────────────────────── 125 + // ── Sync check (CAR primary; applyWrites fallback) ─────────────────────── 126 + // CAR export (com.atproto.sync.getRepo) sits on a separate, far more 127 + // generous rate-limit bucket and costs zero write-quota points — it's the 128 + // preferred way to identify records we already have so we skip them. 129 + // If CAR is unavailable for any reason we fall back gracefully: the record 130 + // list is left unfiltered and applyWrites handles everything. The PDS 131 + // will reject creates for rkeys that already exist, but new records still 132 + // land correctly and the import isn't aborted. 125 133 onLog('section', '── Sync Check ───────────────────────────────────────'); 126 - onLog('info', 'Fetching existing records from Teal…'); 127 - const existing = await fetchExistingRecords( 128 - agent, 129 - (n) => onLog('progress', ` Fetched ${n.toLocaleString()} existing records…`), 130 - fresh, 131 - sig, 132 - ); 134 + onLog('info', 'Fetching existing records via CAR export…'); 135 + let existing: Map<string, ExistingRecord>; 136 + let carSyncOk = true; 137 + try { 138 + existing = await fetchExistingRecords( 139 + agent, 140 + (n) => onLog('progress', ` Fetched ${n.toLocaleString()} existing records…`), 141 + fresh, 142 + sig, 143 + ); 144 + } catch (carErr) { 145 + carSyncOk = false; 146 + const msg = carErr instanceof Error ? carErr.message : String(carErr); 147 + onLog('warn', `⚠️ CAR sync check unavailable: ${msg}`); 148 + onLog('warn', ` Falling back to full applyWrites — existing records will be rejected by the PDS, new ones will land correctly.`); 149 + existing = new Map(); 150 + } 133 151 const before = records.length; 134 152 records = filterNewRecords(records, existing); 135 - const skipped = before - records.length; 136 - if (skipped > 0) onLog('info', `Skipped ${skipped.toLocaleString()} already-imported record(s)`); 137 - onLog('success', `${records.length.toLocaleString()} new record(s) to import`); 153 + if (carSyncOk) { 154 + const skipped = before - records.length; 155 + if (skipped > 0) onLog('info', `Skipped ${skipped.toLocaleString()} already-imported record(s)`); 156 + onLog('success', `${records.length.toLocaleString()} new record(s) to import`); 157 + } else { 158 + onLog('info', `${records.length.toLocaleString()} record(s) queued for import (deduplication skipped — CAR unavailable)`); 159 + } 138 160 139 161 if (records.length === 0) { 140 162 onLog('success', '✓ Nothing to import — all records already exist in Teal!');