Build Reactive Signals for Bluesky's AT Protocol Firehose in Laravel
1<?php
2
3declare(strict_types=1);
4
5namespace SocialDept\AtpSignals\CAR;
6
7use Generator;
8use SocialDept\AtpSignals\Binary\Reader;
9use SocialDept\AtpSignals\Core\CID;
10
/**
 * CAR (Content Addressable aRchive) block reader.
 *
 * Reads blocks from CAR format data used in AT Protocol commits.
 *
 * The raw bytes are retained and every call to blocks() parses them with its
 * own Reader, so blocks() and getBlockMap() can be called repeatedly (and in
 * any order) and always see the complete archive.
 */
class BlockReader
{
    /**
     * Raw CAR bytes exactly as passed to the constructor.
     */
    private string $data;

    /**
     * @param string $data Raw CAR bytes (length-prefixed header followed by length-prefixed blocks)
     */
    public function __construct(string $data)
    {
        $this->data = $data;
    }

    /**
     * Read all blocks from CAR data.
     *
     * Yields [CID, block data] pairs, lazily, in the order they appear.
     * Each call starts again from the beginning of the data, so a partially
     * consumed generator never affects a later call.
     *
     * @return Generator<int, array{0: CID, 1: string}>
     *
     * @throws \RuntimeException If a block starts with an unsupported CID version
     */
    public function blocks(): Generator
    {
        // A private cursor per iteration: sharing one cursor between calls
        // would make a second call resume mid-stream and misread a block
        // length prefix as the header length.
        $reader = new Reader($this->data);

        // Skip CAR header (we don't need it for Firehose processing)
        $this->skipHeader($reader);

        // Read blocks until end of data
        while ($reader->hasMore()) {
            $block = $this->readBlock($reader);

            // A zero-length section yields no block; keep scanning.
            if ($block !== null) {
                yield $block;
            }
        }
    }

    /**
     * Skip CAR header.
     *
     * The header is a varint length followed by that many bytes. Nothing is
     * consumed when the reader is already at the end of the data.
     *
     * @param Reader $reader Cursor positioned at the start of the CAR data
     */
    private function skipHeader(Reader $reader): void
    {
        if (! $reader->hasMore()) {
            return;
        }

        // Read header length (varint), then jump over the header payload
        $headerLength = $reader->readVarint();
        $reader->skip($headerLength);
    }

    /**
     * Read a single block.
     *
     * @param Reader $reader Cursor positioned at the start of a block's length prefix
     *
     * @return array{0: CID, 1: string}|null [CID, block data], or null when there is
     *                                       no more data or the block length is zero
     *
     * @throws \RuntimeException If the block's CID is neither CIDv0 (0x12 prefix) nor CIDv1
     */
    private function readBlock(Reader $reader): ?array
    {
        if (! $reader->hasMore()) {
            return null;
        }

        // Read block length (varint) - this is the total length of CID + data
        $blockLength = $reader->readVarint();

        if ($blockLength === 0) {
            return null;
        }

        // Read entire block data
        $blockData = $reader->readBytes($blockLength);

        // CIDs in CAR blocks are self-delimiting (no separate length prefix),
        // so walk the CID with a scratch reader purely to measure its length.
        $cidReader = new Reader($blockData);

        // First varint: CID version for CIDv1, or the multihash code for CIDv0
        $version = $cidReader->readVarint();

        if ($version === 0x12) {
            // CIDv0 - multihash only (starting with 0x12 for SHA-256)
            $hashLength = $cidReader->readVarint();
            $cidReader->readBytes($hashLength); // Skip hash bytes
        } elseif ($version === 1) {
            // CIDv1 - version + codec + multihash
            $cidReader->readVarint(); // Codec (value not needed, only its width)
            $cidReader->readVarint(); // Multihash type (value not needed)
            $hashLength = $cidReader->readVarint();
            $cidReader->readBytes($hashLength); // Skip hash bytes
        } else {
            throw new \RuntimeException("Unsupported CID version in CAR block: {$version}");
        }

        // The scratch reader's position is the CID length in bytes
        $cidLength = $cidReader->getPosition();
        $cid = CID::fromBinary(substr($blockData, 0, $cidLength));

        // Remaining data is the block content
        $content = substr($blockData, $cidLength);

        return [$cid, $content];
    }

    /**
     * Get all blocks as an associative array.
     *
     * When the same CID string occurs more than once, the last block wins.
     *
     * @return array<string, string> Map of CID string => block data
     *
     * @throws \RuntimeException If a block starts with an unsupported CID version
     */
    public function getBlockMap(): array
    {
        $blocks = [];

        foreach ($this->blocks() as [$cid, $data]) {
            $blocks[$cid->toString()] = $data;
        }

        return $blocks;
    }
}
132}