Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add atp-signals integration

+234
+234
src/Signals/ParitySignal.php
··· 1 + <?php 2 + 3 + namespace SocialDept\AtpParity\Signals; 4 + 5 + use SocialDept\AtpParity\Contracts\RecordMapper; 6 + use SocialDept\AtpParity\MapperRegistry; 7 + use SocialDept\AtpParity\Sync\ConflictDetector; 8 + use SocialDept\AtpParity\Sync\ConflictResolver; 9 + use SocialDept\AtpParity\Sync\ConflictStrategy; 10 + use SocialDept\AtpSignals\Events\SignalEvent; 11 + use SocialDept\AtpSignals\Signals\Signal; 12 + 13 + /** 14 + * Signal that automatically syncs firehose events to Eloquent models. 15 + * 16 + * This signal listens for commit events on collections that have registered 17 + * mappers and automatically creates, updates, or deletes the corresponding 18 + * Eloquent models. 19 + * 20 + * Supports selective sync via configuration or by extending this class: 21 + * - Filter by DID: config('parity.sync.dids') or override dids() 22 + * - Filter by operation: config('parity.sync.operations') or override operations() 23 + * - Custom filter: config('parity.sync.filter') or override shouldSync() 24 + * 25 + * Supports conflict resolution via configuration: 26 + * - Strategy: config('parity.conflicts.strategy') - 'remote', 'local', 'newest', 'manual' 27 + * 28 + * To use this signal, register it in your atp-signals config: 29 + * 30 + * // config/signal.php 31 + * return [ 32 + * 'signals' => [ 33 + * \SocialDept\AtpParity\Signals\ParitySignal::class, 34 + * ], 35 + * ]; 36 + */ 37 + class ParitySignal extends Signal 38 + { 39 + protected ConflictDetector $conflictDetector; 40 + 41 + protected ConflictResolver $conflictResolver; 42 + 43 + public function __construct( 44 + protected MapperRegistry $registry 45 + ) { 46 + $this->conflictDetector = new ConflictDetector; 47 + $this->conflictResolver = new ConflictResolver; 48 + } 49 + 50 + /** 51 + * Listen for commit events only. 52 + */ 53 + public function eventTypes(): array 54 + { 55 + return ['commit']; 56 + } 57 + 58 + /** 59 + * Only listen for collections that have registered mappers. 60 + */ 61 + public function collections(): ?array 62 + { 63 + $lexicons = $this->registry->lexicons(); 64 + 65 + // Return null if no mappers registered (don't match anything) 66 + return empty($lexicons) ? ['__none__'] : $lexicons; 67 + } 68 + 69 + /** 70 + * Get the DIDs to sync (null = all DIDs). 71 + * 72 + * Override this method for custom DID filtering logic. 73 + */ 74 + public function dids(): ?array 75 + { 76 + return config('parity.sync.dids'); 77 + } 78 + 79 + /** 80 + * Get the operations to sync (null = all operations). 81 + * 82 + * Possible values: 'create', 'update', 'delete' 83 + * Override this method for custom operation filtering. 84 + */ 85 + public function operations(): ?array 86 + { 87 + return config('parity.sync.operations'); 88 + } 89 + 90 + /** 91 + * Determine if the event should be synced. 92 + * 93 + * Override this method for custom filtering logic. 94 + */ 95 + public function shouldSync(SignalEvent $event): bool 96 + { 97 + // Check custom filter callback from config 98 + $filter = config('parity.sync.filter'); 99 + if ($filter && is_callable($filter)) { 100 + return $filter($event); 101 + } 102 + 103 + return true; 104 + } 105 + 106 + /** 107 + * Handle the firehose event. 108 + */ 109 + public function handle(SignalEvent $event): void 110 + { 111 + if (! $event->commit) { 112 + return; 113 + } 114 + 115 + // Apply DID filter 116 + $dids = $this->dids(); 117 + if ($dids !== null && ! in_array($event->did, $dids)) { 118 + return; 119 + } 120 + 121 + $commit = $event->commit; 122 + 123 + // Apply operation filter 124 + $operations = $this->operations(); 125 + if ($operations !== null) { 126 + $operation = $this->getOperationType($commit); 127 + if (! in_array($operation, $operations)) { 128 + return; 129 + } 130 + } 131 + 132 + // Apply custom filter 133 + if (! $this->shouldSync($event)) { 134 + return; 135 + } 136 + 137 + $mapper = $this->registry->forLexicon($commit->collection); 138 + 139 + if (! $mapper) { 140 + return; 141 + } 142 + 143 + if ($commit->isCreate() || $commit->isUpdate()) { 144 + $this->handleUpsert($event, $mapper); 145 + } elseif ($commit->isDelete()) { 146 + $this->handleDelete($event, $mapper); 147 + } 148 + } 149 + 150 + /** 151 + * Get the operation type from a commit. 152 + */ 153 + protected function getOperationType(object $commit): string 154 + { 155 + if ($commit->isCreate()) { 156 + return 'create'; 157 + } 158 + 159 + if ($commit->isUpdate()) { 160 + return 'update'; 161 + } 162 + 163 + if ($commit->isDelete()) { 164 + return 'delete'; 165 + } 166 + 167 + return 'unknown'; 168 + } 169 + 170 + /** 171 + * Handle create or update operations. 172 + */ 173 + protected function handleUpsert(SignalEvent $event, RecordMapper $mapper): void 174 + { 175 + $commit = $event->commit; 176 + 177 + if (! $commit->record) { 178 + return; 179 + } 180 + 181 + $recordClass = $mapper->recordClass(); 182 + $record = $recordClass::fromArray((array) $commit->record); 183 + 184 + $uri = $this->buildUri($event->did, $commit->collection, $commit->rkey); 185 + $meta = [ 186 + 'uri' => $uri, 187 + 'cid' => $commit->cid, 188 + ]; 189 + 190 + // Check for existing model and potential conflict 191 + $existing = $mapper->findByUri($uri); 192 + 193 + if ($existing && $this->conflictDetector->hasConflict($existing, $record, $commit->cid)) { 194 + $strategy = ConflictStrategy::fromConfig(); 195 + $resolution = $this->conflictResolver->resolve( 196 + $existing, 197 + $record, 198 + $meta, 199 + $mapper, 200 + $strategy 201 + ); 202 + 203 + // If conflict is pending manual resolution, don't apply changes 204 + if (! $resolution->isResolved()) { 205 + return; 206 + } 207 + 208 + // Conflict was resolved, model already updated if needed 209 + return; 210 + } 211 + 212 + // No conflict, proceed with normal upsert 213 + $mapper->upsert($record, $meta); 214 + } 215 + 216 + /** 217 + * Handle delete operations. 218 + */ 219 + protected function handleDelete(SignalEvent $event, RecordMapper $mapper): void 220 + { 221 + $commit = $event->commit; 222 + $uri = $this->buildUri($event->did, $commit->collection, $commit->rkey); 223 + 224 + $mapper->deleteByUri($uri); 225 + } 226 + 227 + /** 228 + * Build an AT Protocol URI. 229 + */ 230 + protected function buildUri(string $did, string $collection, string $rkey): string 231 + { 232 + return "at://{$did}/{$collection}/{$rkey}"; 233 + } 234 + }