Build Reactive Signals for Bluesky's AT Protocol Firehose in Laravel
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add CAR (Content Addressable aRchive) parser

+351
+133
src/CAR/BlockReader.php
··· 1 + <?php 2 + 3 + declare(strict_types=1); 4 + 5 + namespace SocialDept\Signal\CAR; 6 + 7 + use Generator; 8 + use SocialDept\Signal\Binary\Reader; 9 + use SocialDept\Signal\Core\CID; 10 + use SocialDept\Signal\Core\CBOR; 11 + 12 + /** 13 + * CAR (Content Addressable aRchive) block reader. 14 + * 15 + * Reads blocks from CAR format data used in AT Protocol commits. 16 + */ 17 + class BlockReader 18 + { 19 + private Reader $reader; 20 + 21 + public function __construct(string $data) 22 + { 23 + $this->reader = new Reader($data); 24 + } 25 + 26 + /** 27 + * Read all blocks from CAR data. 28 + * 29 + * Yields [CID, block data] pairs. 30 + * 31 + * @return Generator<array{0: CID, 1: string}> 32 + */ 33 + public function blocks(): Generator 34 + { 35 + // Skip CAR header (we don't need it for Firehose processing) 36 + $this->skipHeader(); 37 + 38 + // Read blocks until end of data 39 + while ($this->reader->hasMore()) { 40 + $block = $this->readBlock(); 41 + if ($block !== null) { 42 + yield $block; 43 + } 44 + } 45 + } 46 + 47 + /** 48 + * Skip CAR header. 49 + */ 50 + private function skipHeader(): void 51 + { 52 + if (!$this->reader->hasMore()) { 53 + return; 54 + } 55 + 56 + // Read header length (varint) 57 + $headerLength = $this->reader->readVarint(); 58 + 59 + // Skip header data 60 + $this->reader->skip($headerLength); 61 + } 62 + 63 + /** 64 + * Read a single block. 65 + * 66 + * @return array{0: CID, 1: string}|null [CID, block data] or null if no more blocks 67 + */ 68 + private function readBlock(): ?array 69 + { 70 + if (!$this->reader->hasMore()) { 71 + return null; 72 + } 73 + 74 + // Read block length (varint) - this is the total length of CID + data 75 + $blockLength = $this->reader->readVarint(); 76 + 77 + if ($blockLength === 0) { 78 + return null; 79 + } 80 + 81 + // Read entire block data 82 + $blockData = $this->reader->readBytes($blockLength); 83 + 84 + // Parse CID from the beginning of block data 85 + // CIDs in CAR blocks are self-delimiting (no separate length prefix) 86 + // We need to parse the CID to find out its length 87 + $cidReader = new Reader($blockData); 88 + 89 + // Read CID version 90 + $version = $cidReader->readVarint(); 91 + 92 + if ($version === 0x12) { 93 + // CIDv0 - multihash only (starting with 0x12 for SHA-256) 94 + $hashLength = $cidReader->readVarint(); 95 + $cidReader->readBytes($hashLength); // Skip hash bytes 96 + } elseif ($version === 1) { 97 + // CIDv1 - version + codec + multihash 98 + $codec = $cidReader->readVarint(); 99 + $hashType = $cidReader->readVarint(); 100 + $hashLength = $cidReader->readVarint(); 101 + $cidReader->readBytes($hashLength); // Skip hash bytes 102 + } else { 103 + throw new \RuntimeException("Unsupported CID version in CAR block: {$version}"); 104 + } 105 + 106 + // Now we know the CID length 107 + $cidLength = $cidReader->getPosition(); 108 + $cidBytes = substr($blockData, 0, $cidLength); 109 + $cid = CID::fromBinary($cidBytes); 110 + 111 + // Remaining data is the block content 112 + $content = substr($blockData, $cidLength); 113 + 114 + return [$cid, $content]; 115 + } 116 + 117 + /** 118 + * Get all blocks as an associative array. 119 + * 120 + * @return array<string, string> Map of CID string => block data 121 + */ 122 + public function getBlockMap(): array 123 + { 124 + $blocks = []; 125 + 126 + foreach ($this->blocks() as [$cid, $data]) { 127 + $cidString = $cid->toString(); 128 + $blocks[$cidString] = $data; 129 + } 130 + 131 + return $blocks; 132 + } 133 + }
+132
src/CAR/RecordExtractor.php
··· 1 + <?php 2 + 3 + declare(strict_types=1); 4 + 5 + namespace SocialDept\Signal\CAR; 6 + 7 + use Generator; 8 + use SocialDept\Signal\Core\CID; 9 + use SocialDept\Signal\Core\CBOR; 10 + 11 + /** 12 + * Extract records from AT Protocol MST (Merkle Search Tree) blocks. 13 + * 14 + * Walks MST structure to extract collection/rkey records with their values. 15 + */ 16 + class RecordExtractor 17 + { 18 + /** 19 + * @param array<string, string> $blocks Map of CID string => block data 20 + */ 21 + public function __construct( 22 + private readonly array $blocks, 23 + private readonly string $did, 24 + ) {} 25 + 26 + /** 27 + * Extract all records from blocks. 28 + * 29 + * Yields records in format: "collection/rkey" => record data 30 + * 31 + * @return Generator<string, array> 32 + */ 33 + public function extractRecords(CID $rootCid): Generator 34 + { 35 + yield from $this->walkTree($rootCid, ''); 36 + } 37 + 38 + /** 39 + * Recursively walk MST tree. 40 + * 41 + * @param CID $cid Current node CID 42 + * @param string $prefix Path prefix accumulated from parent nodes 43 + * @return Generator<string, array> 44 + */ 45 + private function walkTree(CID $cid, string $prefix): Generator 46 + { 47 + $cidStr = $cid->toString(); 48 + 49 + // Get block data 50 + if (!isset($this->blocks[$cidStr])) { 51 + // Block not found - might be a pruned tree, skip it 52 + return; 53 + } 54 + 55 + $blockData = $this->blocks[$cidStr]; 56 + 57 + // Decode CBOR block 58 + $node = CBOR::decode($blockData); 59 + 60 + if (!is_array($node)) { 61 + return; 62 + } 63 + 64 + // Process left subtree if exists 65 + if (isset($node['l']) && $node['l'] instanceof CID) { 66 + yield from $this->walkTree($node['l'], $prefix); 67 + } 68 + 69 + // Process entries 70 + if (isset($node['e']) && is_array($node['e'])) { 71 + foreach ($node['e'] as $entry) { 72 + if (!is_array($entry)) { 73 + continue; 74 + } 75 + 76 + // Build full key from prefix + entry key 77 + $entryPrefix = $entry['p'] ?? 0; 78 + $keyPart = $entry['k'] ?? ''; 79 + $fullKey = substr($prefix, 0, $entryPrefix) . $keyPart; 80 + 81 + // If entry has a tree link, walk it 82 + if (isset($entry['t']) && $entry['t'] instanceof CID) { 83 + yield from $this->walkTree($entry['t'], $fullKey); 84 + } 85 + 86 + // If entry has a value (record), yield it 87 + if (isset($entry['v']) && $entry['v'] instanceof CID) { 88 + $recordCid = $entry['v']; 89 + $record = $this->getRecord($recordCid); 90 + 91 + if ($record !== null) { 92 + // Parse collection/rkey from key 93 + $parts = explode('/', $fullKey, 2); 94 + if (count($parts) === 2) { 95 + [$collection, $rkey] = $parts; 96 + $path = "{$collection}/{$rkey}"; 97 + 98 + yield $path => [ 99 + 'uri' => "at://{$this->did}/{$path}", 100 + 'cid' => $recordCid->toString(), 101 + 'value' => $record, 102 + ]; 103 + } else { 104 + // Debug: log when key format doesn't match expected pattern 105 + \Illuminate\Support\Facades\Log::debug('Signal: MST key parse failed', [ 106 + 'fullKey' => $fullKey, 107 + 'parts' => $parts, 108 + 'did' => $this->did, 109 + ]); 110 + } 111 + } 112 + } 113 + } 114 + } 115 + } 116 + 117 + /** 118 + * Get record data from block. 119 + */ 120 + private function getRecord(CID $cid): ?array 121 + { 122 + $cidStr = $cid->toString(); 123 + 124 + if (!isset($this->blocks[$cidStr])) { 125 + return null; 126 + } 127 + 128 + $data = CBOR::decode($this->blocks[$cidStr]); 129 + 130 + return is_array($data) ? $data : null; 131 + } 132 + }
+86
src/Core/CAR.php
··· 1 + <?php 2 + 3 + declare(strict_types=1); 4 + 5 + namespace SocialDept\Signal\Core; 6 + 7 + use Generator; 8 + use SocialDept\Signal\CAR\BlockReader; 9 + use SocialDept\Signal\CAR\RecordExtractor; 10 + 11 + /** 12 + * CAR (Content Addressable aRchive) facade. 13 + * 14 + * Provides static methods for parsing CAR data from AT Protocol Firehose. 15 + */ 16 + class CAR 17 + { 18 + /** 19 + * Parse CAR blocks. 20 + * 21 + * Returns array of blocks keyed by CID string. 22 + * The blocks contain raw CBOR data, not decoded. 23 + * 24 + * @param string $data Binary CAR data 25 + * @param string|null $did DID for constructing URIs (not used, kept for compatibility) 26 + * @return array<string, string> Map of CID string => block data 27 + */ 28 + public static function blockMap(string $data, ?string $did = null): array 29 + { 30 + // Read all blocks from CAR 31 + $blockReader = new BlockReader($data); 32 + return $blockReader->getBlockMap(); 33 + } 34 + 35 + /** 36 + * Extract DID from commit block. 37 + */ 38 + private static function extractDidFromBlocks(array $blocks): ?string 39 + { 40 + // The first block is typically the commit 41 + $firstBlock = reset($blocks); 42 + 43 + if ($firstBlock === false) { 44 + return null; 45 + } 46 + 47 + $decoded = CBOR::decode($firstBlock); 48 + 49 + if (!is_array($decoded)) { 50 + return null; 51 + } 52 + 53 + return $decoded['did'] ?? null; 54 + } 55 + 56 + /** 57 + * Find MST root CID from blocks. 58 + */ 59 + private static function findMstRoot(array $blocks, array $cids): ?CID 60 + { 61 + // Try to parse commit block to get data CID 62 + $firstBlock = reset($blocks); 63 + 64 + if ($firstBlock === false) { 65 + return null; 66 + } 67 + 68 + $commit = CBOR::decode($firstBlock); 69 + 70 + if (!is_array($commit)) { 71 + return null; 72 + } 73 + 74 + // MST root is in the 'data' field of commit 75 + if (isset($commit['data']) && $commit['data'] instanceof CID) { 76 + return $commit['data']; 77 + } 78 + 79 + // Fallback: second block is often the MST root 80 + if (count($cids) >= 2) { 81 + return CID::fromString($cids[1]); 82 + } 83 + 84 + return null; 85 + } 86 + }