Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add artisan commands

+590
+122
src/Commands/DiscoverCommand.php
··· 1 + <?php 2 + 3 + namespace SocialDept\AtpParity\Commands; 4 + 5 + use Illuminate\Console\Command; 6 + use SocialDept\AtpParity\Discovery\DiscoveryService; 7 + 8 + use function Laravel\Prompts\error; 9 + use function Laravel\Prompts\info; 10 + use function Laravel\Prompts\note; 11 + 12 + class DiscoverCommand extends Command 13 + { 14 + protected $signature = 'parity:discover 15 + {collection : The collection NSID to discover (e.g., app.bsky.feed.post)} 16 + {--limit= : Maximum number of DIDs to discover} 17 + {--import : Import records for discovered DIDs} 18 + {--output= : Output DIDs to file (one per line)} 19 + {--count : Only count DIDs without listing them}'; 20 + 21 + protected $description = 'Discover DIDs with records in a specific collection'; 22 + 23 + public function handle(DiscoveryService $service): int 24 + { 25 + $collection = $this->argument('collection'); 26 + $limit = $this->option('limit') ? (int) $this->option('limit') : null; 27 + 28 + if ($this->option('count')) { 29 + return $this->handleCount($service, $collection); 30 + } 31 + 32 + if ($this->option('import')) { 33 + return $this->handleDiscoverAndImport($service, $collection, $limit); 34 + } 35 + 36 + return $this->handleDiscover($service, $collection, $limit); 37 + } 38 + 39 + protected function handleCount(DiscoveryService $service, string $collection): int 40 + { 41 + info("Counting DIDs with records in {$collection}..."); 42 + 43 + $count = $service->count($collection); 44 + 45 + info("Found {$count} DIDs"); 46 + 47 + return self::SUCCESS; 48 + } 49 + 50 + protected function handleDiscover(DiscoveryService $service, string $collection, ?int $limit): int 51 + { 52 + $limitDisplay = $limit ? " (limit: {$limit})" : ''; 53 + info("Discovering DIDs with records in {$collection}{$limitDisplay}..."); 54 + 55 + $result = $service->discover($collection, $limit); 56 + 57 + if ($result->isFailed()) { 58 + error("Discovery failed: {$result->error}"); 59 + 60 + return self::FAILURE; 61 + } 62 + 63 + if ($result->total === 0) { 64 + note('No DIDs found'); 65 + 66 + return self::SUCCESS; 67 + } 68 + 69 + // Output to file if requested 70 + if ($output = $this->option('output')) { 71 + file_put_contents($output, implode("\n", $result->dids)."\n"); 72 + info("Found {$result->total} DIDs, written to {$output}"); 73 + 74 + if ($result->isIncomplete()) { 75 + note('Results may be incomplete due to limit'); 76 + } 77 + 78 + return self::SUCCESS; 79 + } 80 + 81 + // Output to console 82 + foreach ($result->dids as $did) { 83 + $this->line($did); 84 + } 85 + 86 + info("Found {$result->total} DIDs"); 87 + 88 + if ($result->isIncomplete()) { 89 + note('Results may be incomplete due to limit'); 90 + } 91 + 92 + return self::SUCCESS; 93 + } 94 + 95 + protected function handleDiscoverAndImport(DiscoveryService $service, string $collection, ?int $limit): int 96 + { 97 + $limitDisplay = $limit ? " (limit: {$limit})" : ''; 98 + info("Discovering and importing DIDs with records in {$collection}{$limitDisplay}..."); 99 + 100 + $result = $service->discoverAndImport( 101 + $collection, 102 + $limit, 103 + function (string $did, int $count) { 104 + note("[{$count}] Importing {$did}"); 105 + } 106 + ); 107 + 108 + if ($result->isFailed()) { 109 + error("Discovery failed: {$result->error}"); 110 + 111 + return self::FAILURE; 112 + } 113 + 114 + info("Imported records for {$result->total} DIDs"); 115 + 116 + if ($result->isIncomplete()) { 117 + note('Results may be incomplete due to limit'); 118 + } 119 + 120 + return self::SUCCESS; 121 + } 122 + }
+135
src/Commands/ExportCommand.php
··· 1 + <?php 2 + 3 + namespace SocialDept\AtpParity\Commands; 4 + 5 + use Illuminate\Console\Command; 6 + use SocialDept\AtpParity\Events\ImportProgress; 7 + use SocialDept\AtpParity\Export\ExportService; 8 + 9 + use function Laravel\Prompts\error; 10 + use function Laravel\Prompts\info; 11 + use function Laravel\Prompts\note; 12 + 13 + class ExportCommand extends Command 14 + { 15 + protected $signature = 'parity:export 16 + {did : The DID to export} 17 + {--output= : Output CAR file path} 18 + {--import : Import records to database instead of saving CAR file} 19 + {--collection=* : Specific collections to import (with --import)} 20 + {--since= : Only export changes since this revision} 21 + {--status : Show repository status instead of exporting}'; 22 + 23 + protected $description = 'Export an AT Protocol repository as CAR file or import to database'; 24 + 25 + public function handle(ExportService $service): int 26 + { 27 + $did = $this->argument('did'); 28 + 29 + if (! str_starts_with($did, 'did:')) { 30 + error("Invalid DID format: {$did}"); 31 + 32 + return self::FAILURE; 33 + } 34 + 35 + if ($this->option('status')) { 36 + return $this->handleStatus($service, $did); 37 + } 38 + 39 + if ($this->option('import')) { 40 + return $this->handleImport($service, $did); 41 + } 42 + 43 + return $this->handleExport($service, $did); 44 + } 45 + 46 + protected function handleStatus(ExportService $service, string $did): int 47 + { 48 + info("Getting repository status for {$did}..."); 49 + 50 + try { 51 + $commit = $service->getLatestCommit($did); 52 + $status = $service->getRepoStatus($did); 53 + 54 + $this->table(['Property', 'Value'], [ 55 + ['DID', $did], 56 + ['Latest CID', $commit['cid'] ?? 'N/A'], 57 + ['Latest Rev', $commit['rev'] ?? 'N/A'], 58 + ['Active', ($status['active'] ?? false) ? 'Yes' : 'No'], 59 + ['Status', $status['status'] ?? 'N/A'], 60 + ]); 61 + 62 + return self::SUCCESS; 63 + } catch (\Throwable $e) { 64 + error("Failed to get status: {$e->getMessage()}"); 65 + 66 + return self::FAILURE; 67 + } 68 + } 69 + 70 + protected function handleExport(ExportService $service, string $did): int 71 + { 72 + $output = $this->option('output') ?? "{$did}.car"; 73 + $since = $this->option('since'); 74 + 75 + // Sanitize filename if using DID as filename 76 + $output = str_replace([':', '/'], ['_', '_'], $output); 77 + 78 + info("Exporting repository {$did} to {$output}..."); 79 + 80 + $result = $service->exportToFile($did, $output, $since); 81 + 82 + if ($result->isFailed()) { 83 + error("Export failed: {$result->error}"); 84 + 85 + return self::FAILURE; 86 + } 87 + 88 + $size = $this->formatBytes($result->size); 89 + info("Exported {$size} to {$output}"); 90 + 91 + return self::SUCCESS; 92 + } 93 + 94 + protected function handleImport(ExportService $service, string $did): int 95 + { 96 + $collections = $this->option('collection') ?: null; 97 + $collectionDisplay = $collections ? implode(', ', $collections) : 'all registered'; 98 + 99 + info("Exporting and importing {$did} ({$collectionDisplay})..."); 100 + 101 + $result = $service->exportAndImport( 102 + $did, 103 + $collections, 104 + function (ImportProgress $progress) { 105 + $this->output->write("\r"); 106 + $this->output->write(" [{$progress->collection}] {$progress->recordsSynced} records synced"); 107 + } 108 + ); 109 + 110 + $this->output->write("\n"); 111 + 112 + if ($result->isFailed()) { 113 + error("Import failed: {$result->error}"); 114 + 115 + return self::FAILURE; 116 + } 117 + 118 + info("Imported {$result->size} records"); 119 + 120 + return self::SUCCESS; 121 + } 122 + 123 + protected function formatBytes(int $bytes): string 124 + { 125 + $units = ['B', 'KB', 'MB', 'GB']; 126 + $unit = 0; 127 + 128 + while ($bytes >= 1024 && $unit < count($units) - 1) { 129 + $bytes /= 1024; 130 + $unit++; 131 + } 132 + 133 + return round($bytes, 2).' '.$units[$unit]; 134 + } 135 + }
+190
src/Commands/ImportCommand.php
··· 1 + <?php 2 + 3 + namespace SocialDept\AtpParity\Commands; 4 + 5 + use Illuminate\Console\Command; 6 + use SocialDept\AtpParity\Events\ImportProgress; 7 + use SocialDept\AtpParity\Import\ImportService; 8 + use SocialDept\AtpParity\Jobs\ImportUserJob; 9 + use SocialDept\AtpParity\MapperRegistry; 10 + 11 + use function Laravel\Prompts\error; 12 + use function Laravel\Prompts\info; 13 + use function Laravel\Prompts\note; 14 + use function Laravel\Prompts\warning; 15 + 16 + class ImportCommand extends Command 17 + { 18 + protected $signature = 'parity:import 19 + {did? : The DID to import} 20 + {--collection= : Specific collection to import} 21 + {--file= : File containing DIDs to import (one per line)} 22 + {--resume : Resume all interrupted imports} 23 + {--queue : Queue the import job instead of running synchronously} 24 + {--progress : Show progress output}'; 25 + 26 + protected $description = 'Import AT Protocol records for a user or from a file of DIDs'; 27 + 28 + public function handle(ImportService $service, MapperRegistry $registry): int 29 + { 30 + if ($this->option('resume')) { 31 + return $this->handleResume($service); 32 + } 33 + 34 + $did = $this->argument('did'); 35 + $file = $this->option('file'); 36 + 37 + if (! $did && ! $file) { 38 + error('Please provide a DID or use --file to specify a file of DIDs'); 39 + 40 + return self::FAILURE; 41 + } 42 + 43 + if ($file) { 44 + return $this->handleFile($file, $service); 45 + } 46 + 47 + return $this->importDid($did, $service, $registry); 48 + } 49 + 50 + protected function handleResume(ImportService $service): int 51 + { 52 + info('Resuming interrupted imports...'); 53 + 54 + $results = $service->resumeAll($this->getProgressCallback()); 55 + 56 + if (empty($results)) { 57 + note('No interrupted imports found'); 58 + 59 + return self::SUCCESS; 60 + } 61 + 62 + $success = 0; 63 + $failed = 0; 64 + 65 + foreach ($results as $result) { 66 + if ($result->isSuccess()) { 67 + $success++; 68 + } else { 69 + $failed++; 70 + } 71 + } 72 + 73 + info("Resumed {$success} imports successfully"); 74 + 75 + if ($failed > 0) { 76 + warning("{$failed} imports failed"); 77 + } 78 + 79 + return $failed > 0 ? self::FAILURE : self::SUCCESS; 80 + } 81 + 82 + protected function handleFile(string $file, ImportService $service): int 83 + { 84 + if (! file_exists($file)) { 85 + error("File not found: {$file}"); 86 + 87 + return self::FAILURE; 88 + } 89 + 90 + $dids = array_filter(array_map('trim', file($file))); 91 + $total = count($dids); 92 + $success = 0; 93 + $failed = 0; 94 + 95 + info("Importing {$total} DIDs from {$file}"); 96 + 97 + foreach ($dids as $index => $did) { 98 + if (! str_starts_with($did, 'did:')) { 99 + warning("Skipping invalid DID: {$did}"); 100 + 101 + continue; 102 + } 103 + 104 + $current = $index + 1; 105 + note("[{$current}/{$total}] Importing {$did}"); 106 + 107 + if ($this->option('queue')) { 108 + ImportUserJob::dispatch($did, $this->option('collection')); 109 + $success++; 110 + } else { 111 + $result = $service->importUser($did, $this->getCollections(), $this->getProgressCallback()); 112 + 113 + if ($result->isSuccess()) { 114 + $success++; 115 + } else { 116 + $failed++; 117 + warning("Failed: {$result->error}"); 118 + } 119 + } 120 + } 121 + 122 + info("Completed: {$success} successful, {$failed} failed"); 123 + 124 + return $failed > 0 ? self::FAILURE : self::SUCCESS; 125 + } 126 + 127 + protected function importDid(string $did, ImportService $service, MapperRegistry $registry): int 128 + { 129 + if (! str_starts_with($did, 'did:')) { 130 + error("Invalid DID format: {$did}"); 131 + 132 + return self::FAILURE; 133 + } 134 + 135 + $collections = $this->getCollections(); 136 + $collectionDisplay = $collections ? implode(', ', $collections) : 'all registered'; 137 + 138 + info("Importing {$did} ({$collectionDisplay})"); 139 + 140 + if ($this->option('queue')) { 141 + ImportUserJob::dispatch($did, $this->option('collection')); 142 + note('Import job queued'); 143 + 144 + return self::SUCCESS; 145 + } 146 + 147 + $result = $service->importUser($did, $collections, $this->getProgressCallback()); 148 + 149 + if ($result->isSuccess()) { 150 + info("Import completed: {$result->recordsSynced} records synced"); 151 + 152 + if ($result->recordsSkipped > 0) { 153 + note("{$result->recordsSkipped} records skipped"); 154 + } 155 + 156 + if ($result->recordsFailed > 0) { 157 + warning("{$result->recordsFailed} records failed"); 158 + } 159 + 160 + return self::SUCCESS; 161 + } 162 + 163 + error("Import failed: {$result->error}"); 164 + 165 + if ($result->recordsSynced > 0) { 166 + note("Partial progress: {$result->recordsSynced} records synced before failure"); 167 + } 168 + 169 + return self::FAILURE; 170 + } 171 + 172 + protected function getCollections(): ?array 173 + { 174 + $collection = $this->option('collection'); 175 + 176 + return $collection ? [$collection] : null; 177 + } 178 + 179 + protected function getProgressCallback(): ?callable 180 + { 181 + if (! $this->option('progress')) { 182 + return null; 183 + } 184 + 185 + return function (ImportProgress $progress) { 186 + $this->output->write("\r"); 187 + $this->output->write(" [{$progress->collection}] {$progress->recordsSynced} records synced"); 188 + }; 189 + } 190 + }
+143
src/Commands/ImportStatusCommand.php
··· 1 + <?php 2 + 3 + namespace SocialDept\AtpParity\Commands; 4 + 5 + use Illuminate\Console\Command; 6 + use SocialDept\AtpParity\Import\ImportState; 7 + 8 + use function Laravel\Prompts\info; 9 + use function Laravel\Prompts\note; 10 + use function Laravel\Prompts\table; 11 + use function Laravel\Prompts\warning; 12 + 13 + class ImportStatusCommand extends Command 14 + { 15 + protected $signature = 'parity:import-status 16 + {did? : Show status for specific DID} 17 + {--pending : Show only pending/incomplete imports} 18 + {--failed : Show only failed imports} 19 + {--completed : Show only completed imports}'; 20 + 21 + protected $description = 'Show import status'; 22 + 23 + public function handle(): int 24 + { 25 + $did = $this->argument('did'); 26 + 27 + if ($did) { 28 + return $this->showDidStatus($did); 29 + } 30 + 31 + return $this->showAllStatus(); 32 + } 33 + 34 + protected function showDidStatus(string $did): int 35 + { 36 + $states = ImportState::where('did', $did)->get(); 37 + 38 + if ($states->isEmpty()) { 39 + note("No import records found for {$did}"); 40 + 41 + return self::SUCCESS; 42 + } 43 + 44 + info("Import status for {$did}"); 45 + 46 + table( 47 + headers: ['Collection', 'Status', 'Synced', 'Skipped', 'Failed', 'Started', 'Completed'], 48 + rows: $states->map(fn (ImportState $state) => [ 49 + $state->collection, 50 + $this->formatStatus($state->status), 51 + $state->records_synced, 52 + $state->records_skipped, 53 + $state->records_failed, 54 + $state->started_at?->diffForHumans() ?? '-', 55 + $state->completed_at?->diffForHumans() ?? '-', 56 + ])->toArray() 57 + ); 58 + 59 + return self::SUCCESS; 60 + } 61 + 62 + protected function showAllStatus(): int 63 + { 64 + $query = ImportState::query(); 65 + 66 + if ($this->option('pending')) { 67 + $query->incomplete(); 68 + } elseif ($this->option('failed')) { 69 + $query->failed(); 70 + } elseif ($this->option('completed')) { 71 + $query->completed(); 72 + } 73 + 74 + $states = $query->orderByDesc('updated_at')->limit(100)->get(); 75 + 76 + if ($states->isEmpty()) { 77 + note('No import records found'); 78 + 79 + return self::SUCCESS; 80 + } 81 + 82 + $this->displaySummary(); 83 + 84 + table( 85 + headers: ['DID', 'Collection', 'Status', 'Synced', 'Updated'], 86 + rows: $states->map(fn (ImportState $state) => [ 87 + $this->truncateDid($state->did), 88 + $state->collection, 89 + $this->formatStatus($state->status), 90 + $state->records_synced, 91 + $state->updated_at->diffForHumans(), 92 + ])->toArray() 93 + ); 94 + 95 + if ($states->count() >= 100) { 96 + note('Showing first 100 results. Use --pending, --failed, or --completed to filter.'); 97 + } 98 + 99 + return self::SUCCESS; 100 + } 101 + 102 + protected function displaySummary(): void 103 + { 104 + $counts = ImportState::query() 105 + ->selectRaw('status, count(*) as count') 106 + ->groupBy('status') 107 + ->pluck('count', 'status'); 108 + 109 + $pending = $counts->get('pending', 0); 110 + $inProgress = $counts->get('in_progress', 0); 111 + $completed = $counts->get('completed', 0); 112 + $failed = $counts->get('failed', 0); 113 + 114 + info("Import Status Summary"); 115 + note("Pending: {$pending} | In Progress: {$inProgress} | Completed: {$completed} | Failed: {$failed}"); 116 + 117 + if ($failed > 0) { 118 + warning("Use 'php artisan parity:import --resume' to retry failed imports"); 119 + } 120 + 121 + $this->newLine(); 122 + } 123 + 124 + protected function formatStatus(string $status): string 125 + { 126 + return match ($status) { 127 + ImportState::STATUS_PENDING => 'pending', 128 + ImportState::STATUS_IN_PROGRESS => 'running', 129 + ImportState::STATUS_COMPLETED => 'done', 130 + ImportState::STATUS_FAILED => 'FAILED', 131 + default => $status, 132 + }; 133 + } 134 + 135 + protected function truncateDid(string $did): string 136 + { 137 + if (strlen($did) <= 30) { 138 + return $did; 139 + } 140 + 141 + return substr($did, 0, 15).'...'.substr($did, -12); 142 + } 143 + }