Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)

# atp-signals Integration

Parity integrates with atp-signals to automatically sync firehose events to your Eloquent models in real-time. The `ParitySignal` class handles create, update, and delete operations for all registered mappers.

## ParitySignal

The `ParitySignal` is a pre-built signal that listens for commit events and syncs them to your database using your registered mappers.

### How It Works

1. ParitySignal listens for `commit` events on the firehose
2. It filters for collections that have registered mappers
3. For each matching event:
   - **Create/Update**: Upserts the record to your database
   - **Delete**: Removes the record from your database

### Setup

Register the signal in your atp-signals config:

```php
// config/signal.php
return [
    'signals' => [
        \SocialDept\AtpParity\Signals\ParitySignal::class,
    ],
];
```

Then start consuming:

```bash
php artisan signal:consume
```

That's it. Your models will automatically sync with the firehose.

## What Gets Synced

ParitySignal only syncs collections that have registered mappers:

```php
// config/parity.php
return [
    'mappers' => [
        App\AtpMappers\PostMapper::class,    // app.bsky.feed.post
        App\AtpMappers\LikeMapper::class,    // app.bsky.feed.like
        App\AtpMappers\FollowMapper::class,  // app.bsky.graph.follow
    ],
];
```

With this config, ParitySignal will sync posts, likes, and follows. All other collections are ignored.

## Event Flow

```
Firehose Event

ParitySignal.handle()

Check: Is collection registered?

    Yes → Get mapper for collection

Create DTO from event record

Call mapper.upsert() or mapper.deleteByUri()

Model saved to database
```

## Example: Syncing Posts

### 1. Create the Model

```php
// app/Models/Post.php
namespace App\Models;

use Illuminate\Database\Eloquent\Model;
use SocialDept\AtpParity\Concerns\SyncsWithAtp;

class Post extends Model
{
    use SyncsWithAtp;

    protected $fillable = [
        'content',
        'author_did',
        'published_at',
        'atp_uri',
        'atp_cid',
        'atp_synced_at',
    ];

    protected $casts = [
        'published_at' => 'datetime',
        'atp_synced_at' => 'datetime',
    ];
}
```

### 2. Create the Migration

```php
Schema::create('posts', function (Blueprint $table) {
    $table->id();
    $table->text('content');
    $table->string('author_did');
    $table->timestamp('published_at');
    $table->string('atp_uri')->unique();
    $table->string('atp_cid');
    $table->timestamp('atp_synced_at')->nullable();
    $table->timestamps();
});
```

### 3. Create the Mapper

```php
// app/AtpMappers/PostMapper.php
namespace App\AtpMappers;

use App\Models\Post;
use Illuminate\Database\Eloquent\Model;
use SocialDept\AtpParity\RecordMapper;
use SocialDept\AtpSchema\Data\Data;
use SocialDept\AtpSchema\Generated\App\Bsky\Feed\Post as PostRecord;

class PostMapper extends RecordMapper
{
    public function recordClass(): string
    {
        return PostRecord::class;
    }

    public function modelClass(): string
    {
        return Post::class;
    }

    protected function recordToAttributes(Data $record): array
    {
        return [
            'content' => $record->text,
            'published_at' => $record->createdAt,
        ];
    }

    protected function modelToRecordData(Model $model): array
    {
        return [
            'text' => $model->content,
            'createdAt' => $model->published_at->toIso8601String(),
        ];
    }
}
```

### 4. Register Everything

```php
// config/parity.php
return [
    'mappers' => [
        App\AtpMappers\PostMapper::class,
    ],
];
```

```php
// config/signal.php
return [
    'signals' => [
        \SocialDept\AtpParity\Signals\ParitySignal::class,
    ],
];
```

### 5. Start Syncing

```bash
php artisan signal:consume
```

Every new post on the AT Protocol network will now be saved to your `posts` table.

## Filtering by User

To only sync records from specific users, create a custom signal:

```php
namespace App\Signals;

use SocialDept\AtpParity\Signals\ParitySignal;
use SocialDept\AtpSignals\Events\SignalEvent;

class FilteredParitySignal extends ParitySignal
{
    /**
     * DIDs to sync.
     */
    protected array $allowedDids = [
        'did:plc:abc123',
        'did:plc:def456',
    ];

    public function handle(SignalEvent $event): void
    {
        // Only process events from allowed DIDs
        if (!in_array($event->did, $this->allowedDids)) {
            return;
        }

        parent::handle($event);
    }
}
```

Register your custom signal instead:

```php
// config/signal.php
return [
    'signals' => [
        App\Signals\FilteredParitySignal::class,
    ],
];
```

## Filtering by Collection

To only sync specific collections (even if more mappers are registered):

```php
namespace App\Signals;

use SocialDept\AtpParity\Signals\ParitySignal;

class PostsOnlySignal extends ParitySignal
{
    public function collections(): ?array
    {
        // Only sync posts, ignore other registered mappers
        return ['app.bsky.feed.post'];
    }
}
```

## Custom Processing

Add custom logic before or after syncing:

```php
namespace App\Signals;

use SocialDept\AtpParity\Contracts\RecordMapper;
use SocialDept\AtpParity\Signals\ParitySignal;
use SocialDept\AtpSignals\Events\SignalEvent;

class CustomParitySignal extends ParitySignal
{
    protected function handleUpsert(SignalEvent $event, RecordMapper $mapper): void
    {
        // Pre-processing
        logger()->info('Syncing record', [
            'did' => $event->did,
            'collection' => $event->commit->collection,
            'rkey' => $event->commit->rkey,
        ]);

        // Call parent to do the actual sync
        parent::handleUpsert($event, $mapper);

        // Post-processing
        // e.g., dispatch a job, send notification, etc.
    }

    protected function handleDelete(SignalEvent $event, RecordMapper $mapper): void
    {
        logger()->info('Deleting record', [
            'uri' => $this->buildUri($event->did, $event->commit->collection, $event->commit->rkey),
        ]);

        parent::handleDelete($event, $mapper);
    }
}
```

## Queue Integration

For high-volume processing, enable queue mode:

```php
namespace App\Signals;

use SocialDept\AtpParity\Signals\ParitySignal;

class QueuedParitySignal extends ParitySignal
{
    public function shouldQueue(): bool
    {
        return true;
    }

    public function queue(): string
    {
        return 'parity-sync';
    }
}
```

Then run a dedicated queue worker:

```bash
php artisan queue:work --queue=parity-sync
```

## Multiple Signals

You can run ParitySignal alongside other signals:

```php
// config/signal.php
return [
    'signals' => [
        // Sync to database
        \SocialDept\AtpParity\Signals\ParitySignal::class,

        // Your custom analytics signal
        App\Signals\AnalyticsSignal::class,

        // Your moderation signal
        App\Signals\ModerationSignal::class,
    ],
];
```

## Handling High Volume

The AT Protocol firehose processes thousands of events per second. For production:

### 1. Use Jetstream Mode

Jetstream filters server-side, reducing bandwidth:

```php
// config/signal.php
return [
    'mode' => 'jetstream', // More efficient than firehose

    'jetstream' => [
        'collections' => [
            'app.bsky.feed.post',
            'app.bsky.feed.like',
        ],
    ],
];
```

### 2. Enable Queues

Process events asynchronously:

```php
class QueuedParitySignal extends ParitySignal
{
    public function shouldQueue(): bool
    {
        return true;
    }
}
```

### 3. Use Database Transactions

Batch inserts for better performance:

```php
namespace App\Signals;

use Illuminate\Support\Facades\DB;
use SocialDept\AtpParity\Signals\ParitySignal;
use SocialDept\AtpSignals\Events\SignalEvent;

class BatchedParitySignal extends ParitySignal
{
    protected array $buffer = [];
    protected int $batchSize = 100;

    public function handle(SignalEvent $event): void
    {
        $this->buffer[] = $event;

        if (count($this->buffer) >= $this->batchSize) {
            $this->flush();
        }
    }

    protected function flush(): void
    {
        DB::transaction(function () {
            foreach ($this->buffer as $event) {
                parent::handle($event);
            }
        });

        $this->buffer = [];
    }
}
```

### 4. Monitor Performance

Log sync statistics:

```php
namespace App\Signals;

use SocialDept\AtpParity\Signals\ParitySignal;
use SocialDept\AtpSignals\Events\SignalEvent;

class MonitoredParitySignal extends ParitySignal
{
    protected int $processed = 0;
    protected float $startTime;

    public function handle(SignalEvent $event): void
    {
        $this->startTime ??= microtime(true);

        parent::handle($event);

        $this->processed++;

        if ($this->processed % 1000 === 0) {
            $elapsed = microtime(true) - $this->startTime;
            $rate = $this->processed / $elapsed;

            logger()->info("Parity sync stats", [
                'processed' => $this->processed,
                'elapsed' => round($elapsed, 2),
                'rate' => round($rate, 2) . '/sec',
            ]);
        }
    }
}
```

## Cursor Management

atp-signals handles cursor persistence automatically. If the consumer restarts, it resumes from where it left off.

To reset and start fresh:

```bash
php artisan signal:consume --reset
```

## Testing

Test your sync setup without connecting to the firehose:

```php
use App\AtpMappers\PostMapper;
use SocialDept\AtpParity\MapperRegistry;
use SocialDept\AtpSchema\Generated\App\Bsky\Feed\Post;

// Create a test record
$record = Post::fromArray([
    'text' => 'Test post content',
    'createdAt' => now()->toIso8601String(),
]);

// Get the mapper
$registry = app(MapperRegistry::class);
$mapper = $registry->forLexicon('app.bsky.feed.post');

// Simulate a sync
$model = $mapper->upsert($record, [
    'uri' => 'at://did:plc:test/app.bsky.feed.post/test123',
    'cid' => 'bafyretest...',
]);

// Assert
$this->assertDatabaseHas('posts', [
    'content' => 'Test post content',
    'atp_uri' => 'at://did:plc:test/app.bsky.feed.post/test123',
]);
```
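
When writing tests like the one above, the `uri` you pass should match what a real commit event would produce. The following is a minimal sketch, assuming the URI follows the usual AT URI shape `at://{did}/{collection}/{rkey}` (which is what the test URI above looks like). `buildAtUri()` and `parseAtUri()` are hypothetical test helpers introduced here for illustration; they are not part of atp-parity or atp-signals, and the package's own `buildUri()` may behave differently.

```php
// Hypothetical test helper (not part of atp-parity): builds an AT URI
// from the three parts a commit event carries: DID, collection, and rkey.
function buildAtUri(string $did, string $collection, string $rkey): string
{
    return sprintf('at://%s/%s/%s', $did, $collection, $rkey);
}

// Hypothetical inverse of buildAtUri(): splits an AT URI into its parts.
// Returns null when the string is not of the form at://did/collection/rkey.
function parseAtUri(string $uri): ?array
{
    if (!preg_match('#^at://([^/]+)/([^/]+)/([^/]+)$#', $uri, $m)) {
        return null;
    }

    return ['did' => $m[1], 'collection' => $m[2], 'rkey' => $m[3]];
}

// Build the same URI used in the test above from its parts.
$uri = buildAtUri('did:plc:test', 'app.bsky.feed.post', 'test123');
```

Keeping the URI construction in one helper makes it harder for the `uri` passed to `upsert()` and the `atp_uri` asserted against the database to drift apart within a test.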