Build Reactive Signals for Bluesky's AT Protocol Firehose in Laravel
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at dev 132 lines 3.5 kB view raw
<?php

declare(strict_types=1);

namespace SocialDept\AtpSignals\CAR;

use Generator;
use SocialDept\AtpSignals\Binary\Reader;
use SocialDept\AtpSignals\Core\CID;

/**
 * CAR (Content Addressable aRchive) block reader.
 *
 * Reads blocks from CAR format data used in AT Protocol commits.
 *
 * Each call to blocks() (and therefore getBlockMap()) parses the archive from
 * its first byte using its own Reader cursor, so one instance can be traversed
 * repeatedly and an abandoned traversal cannot corrupt a later one.
 */
class BlockReader
{
    /** Raw CAR bytes exactly as handed to the constructor. */
    private string $data;

    /**
     * @param string $data Binary CAR payload (length-prefixed header followed by length-prefixed blocks)
     */
    public function __construct(string $data)
    {
        $this->data = $data;
    }

    /**
     * Read all blocks from CAR data.
     *
     * Yields [CID, block data] pairs. Zero-length sections are skipped rather
     * than yielded.
     *
     * @return Generator<array{0: CID, 1: string}>
     *
     * @throws \RuntimeException When a block starts with an unsupported CID version
     */
    public function blocks(): Generator
    {
        // One cursor per traversal: sharing a single cursor across calls made a
        // second traversal re-run the header skip in the middle of the stream
        // (swallowing a block) or yield nothing at all.
        $reader = new Reader($this->data);

        // Skip CAR header (we don't need it for Firehose processing)
        $this->skipHeader($reader);

        // Read blocks until end of data
        while ($reader->hasMore()) {
            $block = $this->readBlock($reader);
            if ($block !== null) {
                yield $block;
            }
        }
    }

    /**
     * Skip the CAR header: a varint length followed by that many header bytes.
     *
     * Does nothing when the input is empty.
     */
    private function skipHeader(Reader $reader): void
    {
        if (! $reader->hasMore()) {
            return;
        }

        // Read header length (varint)
        $headerLength = $reader->readVarint();

        // Skip header data
        $reader->skip($headerLength);
    }

    /**
     * Read a single block from the given cursor.
     *
     * @return array{0: CID, 1: string}|null [CID, block data], or null when the
     *                                       input is exhausted or the section has length zero
     *
     * @throws \RuntimeException When the block starts with an unsupported CID version
     */
    private function readBlock(Reader $reader): ?array
    {
        if (! $reader->hasMore()) {
            return null;
        }

        // Read block length (varint) - this is the total length of CID + data
        $blockLength = $reader->readVarint();

        if ($blockLength === 0) {
            return null;
        }

        // Read entire block data
        $blockData = $reader->readBytes($blockLength);

        // CIDs in CAR blocks are self-delimiting (no separate length prefix),
        // so walk the CID fields with a scratch reader to find where it ends.
        $cidReader = new Reader($blockData);

        // First varint: the CID version for CIDv1, or the multihash function
        // code (0x12, SHA-256) for a bare CIDv0 multihash.
        $version = $cidReader->readVarint();

        if ($version === 0x12) {
            // CIDv0 - multihash only (starting with 0x12 for SHA-256)
            $hashLength = $cidReader->readVarint();
            $cidReader->readBytes($hashLength); // Skip hash bytes
        } elseif ($version === 1) {
            // CIDv1 - version + codec + multihash
            $cidReader->readVarint(); // Codec: consumed only to advance the cursor
            $cidReader->readVarint(); // Hash function code: consumed only to advance the cursor
            $hashLength = $cidReader->readVarint();
            $cidReader->readBytes($hashLength); // Skip hash bytes
        } else {
            throw new \RuntimeException("Unsupported CID version in CAR block: {$version}");
        }

        // The scratch reader's position is now the CID length in bytes
        $cidLength = $cidReader->getPosition();
        $cid = CID::fromBinary(substr($blockData, 0, $cidLength));

        // Remaining data is the block content
        $content = substr($blockData, $cidLength);

        return [$cid, $content];
    }

    /**
     * Get all blocks as an associative array.
     *
     * If the same CID string occurs more than once, the last occurrence wins.
     *
     * @return array<string, string> Map of CID string => block data
     *
     * @throws \RuntimeException When a block starts with an unsupported CID version
     */
    public function getBlockMap(): array
    {
        $blocks = [];

        foreach ($this->blocks() as [$cid, $data]) {
            $blocks[$cid->toString()] = $data;
        }

        return $blocks;
    }
}