Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)
1# atp-signals Integration
2
3Parity integrates with atp-signals to automatically sync firehose events to your Eloquent models in real-time. The `ParitySignal` class handles create, update, and delete operations for all registered mappers.
4
5## ParitySignal
6
7The `ParitySignal` is a pre-built signal that listens for commit events and syncs them to your database using your registered mappers.
8
9### How It Works
10
111. ParitySignal listens for `commit` events on the firehose
122. It filters for collections that have registered mappers
133. For each matching event:
14 - **Create/Update**: Upserts the record to your database
15 - **Delete**: Removes the record from your database
16
17### Setup
18
19Register the signal in your atp-signals config:
20
21```php
22// config/signal.php
23return [
24 'signals' => [
25 \SocialDept\AtpParity\Signals\ParitySignal::class,
26 ],
27];
28```
29
30Then start consuming:
31
32```bash
33php artisan signal:consume
34```
35
36That's it. Your models will automatically sync with the firehose.
37
38## What Gets Synced
39
40ParitySignal only syncs collections that have registered mappers:
41
42```php
43// config/parity.php
44return [
45 'mappers' => [
46 App\AtpMappers\PostMapper::class, // app.bsky.feed.post
47 App\AtpMappers\LikeMapper::class, // app.bsky.feed.like
48 App\AtpMappers\FollowMapper::class, // app.bsky.graph.follow
49 ],
50];
51```
52
53With this config, ParitySignal will sync posts, likes, and follows. All other collections are ignored.
54
55## Event Flow
56
57```
58Firehose Event
59 ↓
60ParitySignal.handle()
61 ↓
62Check: Is collection registered?
63 ↓
64 Yes → Get mapper for collection
65 ↓
66Create DTO from event record
67 ↓
68Call mapper.upsert() or mapper.deleteByUri()
69 ↓
70Model saved to database
71```
72
73## Example: Syncing Posts
74
75### 1. Create the Model
76
77```php
78// app/Models/Post.php
79namespace App\Models;
80
81use Illuminate\Database\Eloquent\Model;
82use SocialDept\AtpParity\Concerns\SyncsWithAtp;
83
84class Post extends Model
85{
86 use SyncsWithAtp;
87
88 protected $fillable = [
89 'content',
90 'author_did',
91 'published_at',
92 'atp_uri',
93 'atp_cid',
94 'atp_synced_at',
95 ];
96
97 protected $casts = [
98 'published_at' => 'datetime',
99 'atp_synced_at' => 'datetime',
100 ];
101}
102```
103
104### 2. Create the Migration
105
106```php
107Schema::create('posts', function (Blueprint $table) {
108 $table->id();
109 $table->text('content');
110 $table->string('author_did');
111 $table->timestamp('published_at');
112 $table->string('atp_uri')->unique();
113 $table->string('atp_cid');
114 $table->timestamp('atp_synced_at')->nullable();
115 $table->timestamps();
116});
117```
118
119### 3. Create the Mapper
120
121```php
122// app/AtpMappers/PostMapper.php
123namespace App\AtpMappers;
124
125use App\Models\Post;
126use Illuminate\Database\Eloquent\Model;
127use SocialDept\AtpParity\RecordMapper;
128use SocialDept\AtpSchema\Data\Data;
129use SocialDept\AtpSchema\Generated\App\Bsky\Feed\Post as PostRecord;
130
131class PostMapper extends RecordMapper
132{
133 public function recordClass(): string
134 {
135 return PostRecord::class;
136 }
137
138 public function modelClass(): string
139 {
140 return Post::class;
141 }
142
143 protected function recordToAttributes(Data $record): array
144 {
145 return [
146 'content' => $record->text,
147 'published_at' => $record->createdAt,
148 ];
149 }
150
151 protected function modelToRecordData(Model $model): array
152 {
153 return [
154 'text' => $model->content,
155 'createdAt' => $model->published_at->toIso8601String(),
156 ];
157 }
158}
159```
160
161### 4. Register Everything
162
163```php
164// config/parity.php
165return [
166 'mappers' => [
167 App\AtpMappers\PostMapper::class,
168 ],
169];
170```
171
172```php
173// config/signal.php
174return [
175 'signals' => [
176 \SocialDept\AtpParity\Signals\ParitySignal::class,
177 ],
178];
179```
180
181### 5. Start Syncing
182
183```bash
184php artisan signal:consume
185```
186
187Every new post on the AT Protocol network will now be saved to your `posts` table.
188
189## Filtering by User
190
191To only sync records from specific users, create a custom signal:
192
193```php
194namespace App\Signals;
195
196use SocialDept\AtpParity\Signals\ParitySignal;
197use SocialDept\AtpSignals\Events\SignalEvent;
198
199class FilteredParitySignal extends ParitySignal
200{
201 /**
202 * DIDs to sync.
203 */
204 protected array $allowedDids = [
205 'did:plc:abc123',
206 'did:plc:def456',
207 ];
208
209 public function handle(SignalEvent $event): void
210 {
211 // Only process events from allowed DIDs
212 if (!in_array($event->did, $this->allowedDids)) {
213 return;
214 }
215
216 parent::handle($event);
217 }
218}
219```
220
221Register your custom signal instead:
222
223```php
224// config/signal.php
225return [
226 'signals' => [
227 App\Signals\FilteredParitySignal::class,
228 ],
229];
230```
231
232## Filtering by Collection
233
234To only sync specific collections (even if more mappers are registered):
235
236```php
237namespace App\Signals;
238
239use SocialDept\AtpParity\Signals\ParitySignal;
240
241class PostsOnlySignal extends ParitySignal
242{
243 public function collections(): ?array
244 {
245 // Only sync posts, ignore other registered mappers
246 return ['app.bsky.feed.post'];
247 }
248}
249```
250
251## Custom Processing
252
253Add custom logic before or after syncing:
254
255```php
256namespace App\Signals;
257
258use SocialDept\AtpParity\Contracts\RecordMapper;
259use SocialDept\AtpParity\Signals\ParitySignal;
260use SocialDept\AtpSignals\Events\SignalEvent;
261
262class CustomParitySignal extends ParitySignal
263{
264 protected function handleUpsert(SignalEvent $event, RecordMapper $mapper): void
265 {
266 // Pre-processing
267 logger()->info('Syncing record', [
268 'did' => $event->did,
269 'collection' => $event->commit->collection,
270 'rkey' => $event->commit->rkey,
271 ]);
272
273 // Call parent to do the actual sync
274 parent::handleUpsert($event, $mapper);
275
276 // Post-processing
277 // e.g., dispatch a job, send notification, etc.
278 }
279
280 protected function handleDelete(SignalEvent $event, RecordMapper $mapper): void
281 {
282 logger()->info('Deleting record', [
283 'uri' => $this->buildUri($event->did, $event->commit->collection, $event->commit->rkey),
284 ]);
285
286 parent::handleDelete($event, $mapper);
287 }
288}
289```
290
291## Queue Integration
292
293For high-volume processing, enable queue mode:
294
295```php
296namespace App\Signals;
297
298use SocialDept\AtpParity\Signals\ParitySignal;
299
300class QueuedParitySignal extends ParitySignal
301{
302 public function shouldQueue(): bool
303 {
304 return true;
305 }
306
307 public function queue(): string
308 {
309 return 'parity-sync';
310 }
311}
312```
313
314Then run a dedicated queue worker:
315
316```bash
317php artisan queue:work --queue=parity-sync
318```
319
320## Multiple Signals
321
322You can run ParitySignal alongside other signals:
323
324```php
325// config/signal.php
326return [
327 'signals' => [
328 // Sync to database
329 \SocialDept\AtpParity\Signals\ParitySignal::class,
330
331 // Your custom analytics signal
332 App\Signals\AnalyticsSignal::class,
333
334 // Your moderation signal
335 App\Signals\ModerationSignal::class,
336 ],
337];
338```
339
340## Handling High Volume
341
342The AT Protocol firehose processes thousands of events per second. For production:
343
344### 1. Use Jetstream Mode
345
346Jetstream filters server-side, reducing bandwidth:
347
348```php
349// config/signal.php
350return [
351 'mode' => 'jetstream', // More efficient than firehose
352
353 'jetstream' => [
354 'collections' => [
355 'app.bsky.feed.post',
356 'app.bsky.feed.like',
357 ],
358 ],
359];
360```
361
362### 2. Enable Queues
363
364Process events asynchronously:
365
366```php
367class QueuedParitySignal extends ParitySignal
368{
369 public function shouldQueue(): bool
370 {
371 return true;
372 }
373}
374```
375
376### 3. Use Database Transactions
377
378Batch inserts for better performance:
379
380```php
381namespace App\Signals;
382
383use Illuminate\Support\Facades\DB;
384use SocialDept\AtpParity\Signals\ParitySignal;
385use SocialDept\AtpSignals\Events\SignalEvent;
386
387class BatchedParitySignal extends ParitySignal
388{
389 protected array $buffer = [];
390 protected int $batchSize = 100;
391
392 public function handle(SignalEvent $event): void
393 {
394 $this->buffer[] = $event;
395
396 if (count($this->buffer) >= $this->batchSize) {
397 $this->flush();
398 }
399 }
400
401 protected function flush(): void
402 {
403 DB::transaction(function () {
404 foreach ($this->buffer as $event) {
405 parent::handle($event);
406 }
407 });
408
409 $this->buffer = [];
410 }
411}
412```
413
414### 4. Monitor Performance
415
416Log sync statistics:
417
418```php
419namespace App\Signals;
420
421use SocialDept\AtpParity\Signals\ParitySignal;
422use SocialDept\AtpSignals\Events\SignalEvent;
423
424class MonitoredParitySignal extends ParitySignal
425{
426 protected int $processed = 0;
427 protected float $startTime;
428
429 public function handle(SignalEvent $event): void
430 {
431 $this->startTime ??= microtime(true);
432
433 parent::handle($event);
434
435 $this->processed++;
436
437 if ($this->processed % 1000 === 0) {
438 $elapsed = microtime(true) - $this->startTime;
439 $rate = $this->processed / $elapsed;
440
441 logger()->info("Parity sync stats", [
442 'processed' => $this->processed,
443 'elapsed' => round($elapsed, 2),
444 'rate' => round($rate, 2) . '/sec',
445 ]);
446 }
447 }
448}
449```
450
451## Cursor Management
452
453atp-signals handles cursor persistence automatically. If the consumer restarts, it resumes from where it left off.
454
455To reset and start fresh:
456
457```bash
458php artisan signal:consume --reset
459```
460
461## Testing
462
463Test your sync setup without connecting to the firehose:
464
465```php
466use App\AtpMappers\PostMapper;
467use SocialDept\AtpParity\MapperRegistry;
468use SocialDept\AtpSchema\Generated\App\Bsky\Feed\Post;
469
470// Create a test record
471$record = Post::fromArray([
472 'text' => 'Test post content',
473 'createdAt' => now()->toIso8601String(),
474]);
475
476// Get the mapper
477$registry = app(MapperRegistry::class);
478$mapper = $registry->forLexicon('app.bsky.feed.post');
479
480// Simulate a sync
481$model = $mapper->upsert($record, [
482 'uri' => 'at://did:plc:test/app.bsky.feed.post/test123',
483 'cid' => 'bafyretest...',
484]);
485
486// Assert
487$this->assertDatabaseHas('posts', [
488 'content' => 'Test post content',
489 'atp_uri' => 'at://did:plc:test/app.bsky.feed.post/test123',
490]);
491```