Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add import service and models

+369
+138
src/Import/ImportResult.php
··· 1 + <?php 2 + 3 + namespace SocialDept\AtpParity\Import; 4 + 5 + /** 6 + * Immutable value object representing the result of an import operation. 7 + */ 8 + readonly class ImportResult 9 + { 10 + public function __construct( 11 + public string $did, 12 + public string $collection, 13 + public int $recordsSynced, 14 + public int $recordsSkipped, 15 + public int $recordsFailed, 16 + public bool $completed, 17 + public ?string $cursor = null, 18 + public ?string $error = null, 19 + ) {} 20 + 21 + /** 22 + * Check if the import completed successfully. 23 + */ 24 + public function isSuccess(): bool 25 + { 26 + return $this->completed && $this->error === null; 27 + } 28 + 29 + /** 30 + * Check if the import was partially completed. 31 + */ 32 + public function isPartial(): bool 33 + { 34 + return ! $this->completed && $this->recordsSynced > 0; 35 + } 36 + 37 + /** 38 + * Check if the import failed. 39 + */ 40 + public function isFailed(): bool 41 + { 42 + return $this->error !== null; 43 + } 44 + 45 + /** 46 + * Get total records processed. 47 + */ 48 + public function totalProcessed(): int 49 + { 50 + return $this->recordsSynced + $this->recordsSkipped + $this->recordsFailed; 51 + } 52 + 53 + /** 54 + * Create a successful result. 55 + */ 56 + public static function success(string $did, string $collection, int $synced, int $skipped = 0, int $failed = 0): self 57 + { 58 + return new self( 59 + did: $did, 60 + collection: $collection, 61 + recordsSynced: $synced, 62 + recordsSkipped: $skipped, 63 + recordsFailed: $failed, 64 + completed: true, 65 + ); 66 + } 67 + 68 + /** 69 + * Create a partial result (incomplete). 70 + */ 71 + public static function partial(string $did, string $collection, int $synced, string $cursor, int $skipped = 0, int $failed = 0): self 72 + { 73 + return new self( 74 + did: $did, 75 + collection: $collection, 76 + recordsSynced: $synced, 77 + recordsSkipped: $skipped, 78 + recordsFailed: $failed, 79 + completed: false, 80 + cursor: $cursor, 81 + ); 82 + } 83 + 84 + /** 85 + * Create a failed result. 86 + */ 87 + public static function failed(string $did, string $collection, string $error, int $synced = 0, int $skipped = 0, int $failed = 0, ?string $cursor = null): self 88 + { 89 + return new self( 90 + did: $did, 91 + collection: $collection, 92 + recordsSynced: $synced, 93 + recordsSkipped: $skipped, 94 + recordsFailed: $failed, 95 + completed: false, 96 + cursor: $cursor, 97 + error: $error, 98 + ); 99 + } 100 + 101 + /** 102 + * Merge multiple results for the same DID into one aggregate result. 103 + * 104 + * @param ImportResult[] $results 105 + */ 106 + public static function aggregate(string $did, array $results): self 107 + { 108 + $synced = 0; 109 + $skipped = 0; 110 + $failed = 0; 111 + $errors = []; 112 + $allCompleted = true; 113 + 114 + foreach ($results as $result) { 115 + $synced += $result->recordsSynced; 116 + $skipped += $result->recordsSkipped; 117 + $failed += $result->recordsFailed; 118 + 119 + if (! $result->completed) { 120 + $allCompleted = false; 121 + } 122 + 123 + if ($result->error) { 124 + $errors[] = "{$result->collection}: {$result->error}"; 125 + } 126 + } 127 + 128 + return new self( 129 + did: $did, 130 + collection: '*', 131 + recordsSynced: $synced, 132 + recordsSkipped: $skipped, 133 + recordsFailed: $failed, 134 + completed: $allCompleted, 135 + error: $errors ? implode('; ', $errors) : null, 136 + ); 137 + } 138 + }
+231
src/Import/ImportState.php
··· 1 + <?php 2 + 3 + namespace SocialDept\AtpParity\Import; 4 + 5 + use Illuminate\Database\Eloquent\Builder; 6 + use Illuminate\Database\Eloquent\Model; 7 + 8 + /** 9 + * Tracks import progress for a DID/collection pair. 10 + * 11 + * @property int $id 12 + * @property string $did 13 + * @property string $collection 14 + * @property string $status 15 + * @property string|null $cursor 16 + * @property int $records_synced 17 + * @property int $records_skipped 18 + * @property int $records_failed 19 + * @property \Carbon\Carbon|null $started_at 20 + * @property \Carbon\Carbon|null $completed_at 21 + * @property string|null $error 22 + * @property \Carbon\Carbon $created_at 23 + * @property \Carbon\Carbon $updated_at 24 + */ 25 + class ImportState extends Model 26 + { 27 + public const STATUS_PENDING = 'pending'; 28 + 29 + public const STATUS_IN_PROGRESS = 'in_progress'; 30 + 31 + public const STATUS_COMPLETED = 'completed'; 32 + 33 + public const STATUS_FAILED = 'failed'; 34 + 35 + protected $fillable = [ 36 + 'did', 37 + 'collection', 38 + 'status', 39 + 'cursor', 40 + 'records_synced', 41 + 'records_skipped', 42 + 'records_failed', 43 + 'started_at', 44 + 'completed_at', 45 + 'error', 46 + ]; 47 + 48 + protected $casts = [ 49 + 'records_synced' => 'integer', 50 + 'records_skipped' => 'integer', 51 + 'records_failed' => 'integer', 52 + 'started_at' => 'datetime', 53 + 'completed_at' => 'datetime', 54 + ]; 55 + 56 + public function getTable(): string 57 + { 58 + return config('parity.import.state_table', 'parity_import_states'); 59 + } 60 + 61 + /** 62 + * Start the import process for this state. 63 + */ 64 + public function markStarted(): self 65 + { 66 + $this->update([ 67 + 'status' => self::STATUS_IN_PROGRESS, 68 + 'started_at' => now(), 69 + 'error' => null, 70 + ]); 71 + 72 + return $this; 73 + } 74 + 75 + /** 76 + * Mark the import as completed. 77 + */ 78 + public function markCompleted(): self 79 + { 80 + $this->update([ 81 + 'status' => self::STATUS_COMPLETED, 82 + 'completed_at' => now(), 83 + 'cursor' => null, 84 + ]); 85 + 86 + return $this; 87 + } 88 + 89 + /** 90 + * Mark the import as failed. 91 + */ 92 + public function markFailed(string $error): self 93 + { 94 + $this->update([ 95 + 'status' => self::STATUS_FAILED, 96 + 'error' => $error, 97 + ]); 98 + 99 + return $this; 100 + } 101 + 102 + /** 103 + * Update progress during import. 104 + */ 105 + public function updateProgress(int $synced, int $skipped = 0, int $failed = 0, ?string $cursor = null): self 106 + { 107 + $this->increment('records_synced', $synced); 108 + 109 + if ($skipped > 0) { 110 + $this->increment('records_skipped', $skipped); 111 + } 112 + 113 + if ($failed > 0) { 114 + $this->increment('records_failed', $failed); 115 + } 116 + 117 + if ($cursor !== null) { 118 + $this->update(['cursor' => $cursor]); 119 + } 120 + 121 + return $this; 122 + } 123 + 124 + /** 125 + * Check if this import can be resumed. 126 + */ 127 + public function canResume(): bool 128 + { 129 + return $this->status === self::STATUS_IN_PROGRESS 130 + || $this->status === self::STATUS_FAILED; 131 + } 132 + 133 + /** 134 + * Check if this import is complete. 135 + */ 136 + public function isComplete(): bool 137 + { 138 + return $this->status === self::STATUS_COMPLETED; 139 + } 140 + 141 + /** 142 + * Check if this import is currently running. 143 + */ 144 + public function isRunning(): bool 145 + { 146 + return $this->status === self::STATUS_IN_PROGRESS; 147 + } 148 + 149 + /** 150 + * Scope to pending imports. 151 + */ 152 + public function scopePending(Builder $query): Builder 153 + { 154 + return $query->where('status', self::STATUS_PENDING); 155 + } 156 + 157 + /** 158 + * Scope to in-progress imports. 159 + */ 160 + public function scopeInProgress(Builder $query): Builder 161 + { 162 + return $query->where('status', self::STATUS_IN_PROGRESS); 163 + } 164 + 165 + /** 166 + * Scope to completed imports. 167 + */ 168 + public function scopeCompleted(Builder $query): Builder 169 + { 170 + return $query->where('status', self::STATUS_COMPLETED); 171 + } 172 + 173 + /** 174 + * Scope to failed imports. 175 + */ 176 + public function scopeFailed(Builder $query): Builder 177 + { 178 + return $query->where('status', self::STATUS_FAILED); 179 + } 180 + 181 + /** 182 + * Scope to incomplete imports (pending, in_progress, or failed). 183 + */ 184 + public function scopeIncomplete(Builder $query): Builder 185 + { 186 + return $query->whereIn('status', [ 187 + self::STATUS_PENDING, 188 + self::STATUS_IN_PROGRESS, 189 + self::STATUS_FAILED, 190 + ]); 191 + } 192 + 193 + /** 194 + * Scope to resumable imports (in_progress or failed with cursor). 195 + */ 196 + public function scopeResumable(Builder $query): Builder 197 + { 198 + return $query->whereIn('status', [ 199 + self::STATUS_IN_PROGRESS, 200 + self::STATUS_FAILED, 201 + ]); 202 + } 203 + 204 + /** 205 + * Find or create an import state for a DID/collection pair. 206 + */ 207 + public static function findOrCreateFor(string $did, string $collection): self 208 + { 209 + return static::firstOrCreate( 210 + ['did' => $did, 'collection' => $collection], 211 + ['status' => self::STATUS_PENDING] 212 + ); 213 + } 214 + 215 + /** 216 + * Convert to ImportResult. 217 + */ 218 + public function toResult(): ImportResult 219 + { 220 + return new ImportResult( 221 + did: $this->did, 222 + collection: $this->collection, 223 + recordsSynced: $this->records_synced, 224 + recordsSkipped: $this->records_skipped, 225 + recordsFailed: $this->records_failed, 226 + completed: $this->isComplete(), 227 + cursor: $this->cursor, 228 + error: $this->error, 229 + ); 230 + } 231 + }