Build Reactive Signals for Bluesky's AT Protocol Firehose in Laravel
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add integration tests for firehose consumer

+239 -4
+228
tests/Integration/FirehoseConsumerTest.php
··· 1 + <?php 2 + 3 + declare(strict_types=1); 4 + 5 + namespace SocialDept\Signal\Tests\Integration; 6 + 7 + use Orchestra\Testbench\TestCase; 8 + use SocialDept\Signal\Core\CAR; 9 + use SocialDept\Signal\Core\CBOR; 10 + use SocialDept\Signal\Core\CID; 11 + 12 + class FirehoseConsumerTest extends TestCase 13 + { 14 + public function test_cbor_can_decode_firehose_message_header(): void 15 + { 16 + // Simulate a Firehose message header 17 + // Map with 't' => '#commit', 'op' => 1 18 + $header = [ 19 + 't' => '#commit', 20 + 'op' => 1, 21 + ]; 22 + 23 + // Encode it manually for testing 24 + $cbor = "\xA2"; // Map with 2 items 25 + $cbor .= "\x61t"; // Text string 't' 26 + $cbor .= "\x67#commit"; // Text string '#commit' 27 + $cbor .= "\x62op"; // Text string 'op' 28 + $cbor .= "\x01"; // Integer 1 29 + 30 + [$decoded, $remainder] = CBOR::decodeFirst($cbor); 31 + 32 + $this->assertIsArray($decoded); 33 + $this->assertArrayHasKey('t', $decoded); 34 + $this->assertArrayHasKey('op', $decoded); 35 + $this->assertSame('#commit', $decoded['t']); 36 + $this->assertSame(1, $decoded['op']); 37 + } 38 + 39 + public function test_cbor_can_decode_commit_payload(): void 40 + { 41 + // Simplified commit payload structure 42 + $payload = [ 43 + 'repo' => 'did:plc:test123', 44 + 'rev' => 'test-rev', 45 + 'seq' => 12345, 46 + 'time' => '2024-01-01T00:00:00Z', 47 + 'ops' => [], 48 + ]; 49 + 50 + // Encode a simple payload 51 + $cbor = "\xA5"; // Map with 5 items 52 + 53 + // 'repo' key 54 + $cbor .= "\x64repo"; // Text string 'repo' 55 + $cbor .= "\x6Fdid:plc:test123"; // Text string 'did:plc:test123' 56 + 57 + // 'rev' key 58 + $cbor .= "\x63rev"; // Text string 'rev' 59 + $cbor .= "\x68test-rev"; // Text string 'test-rev' 60 + 61 + // 'seq' key 62 + $cbor .= "\x63seq"; // Text string 'seq' 63 + $cbor .= "\x19\x30\x39"; // Integer 12345 64 + 65 + // 'time' key 66 + $cbor .= "\x64time"; // Text string 'time' 67 + $cbor .= "\x78\x182024-01-01T00:00:00Z"; // Text string (length 24) 68 + 69 + // 'ops' key 70 + $cbor .= "\x63ops"; // Text string 'ops' 71 + $cbor .= "\x80"; // Empty array 72 + 73 + $decoded = CBOR::decode($cbor); 74 + 75 + $this->assertIsArray($decoded); 76 + $this->assertSame('did:plc:test123', $decoded['repo']); 77 + $this->assertSame('test-rev', $decoded['rev']); 78 + $this->assertSame(12345, $decoded['seq']); 79 + } 80 + 81 + public function test_cid_can_be_decoded_from_cbor_tag(): void 82 + { 83 + // Create a CID and encode it as CBOR tag 42 84 + $hash = hash('sha256', 'test-content', true); 85 + $cidBinary = "\x01\x71\x12\x20" . $hash; // CIDv1, dag-cbor, sha256 86 + $cidBytes = "\x00" . $cidBinary; // Add 0x00 prefix 87 + 88 + // CBOR tag 42 + byte string 89 + $length = strlen($cidBytes); 90 + $cbor = "\xD8\x2A\x58" . chr($length) . $cidBytes; 91 + 92 + $decoded = CBOR::decode($cbor); 93 + 94 + $this->assertInstanceOf(CID::class, $decoded); 95 + $this->assertSame(1, $decoded->version); 96 + $this->assertSame(0x71, $decoded->codec); 97 + } 98 + 99 + public function test_car_can_extract_blocks(): void 100 + { 101 + // Create a minimal CAR with header and one block 102 + $car = ''; 103 + 104 + // CAR header (minimal) 105 + $headerCbor = "\xA1\x67version\x01"; // {version: 1} 106 + $headerLength = strlen($headerCbor); 107 + $car .= chr($headerLength) . $headerCbor; 108 + 109 + // Create a block with CID 110 + $blockData = "\xA1\x64test\x65value"; // {test: "value"} 111 + $cid = CID::fromBinary("\x01\x71\x12\x20" . str_repeat("\x00", 32)); 112 + $cidBinary = $cid->toBinary(); 113 + $cidLength = strlen($cidBinary); 114 + 115 + $block = chr($cidLength) . $cidBinary . $blockData; 116 + $blockLength = strlen($block); 117 + 118 + // Add varint-encoded block length 119 + $car .= chr($blockLength) . $block; 120 + 121 + // This should not throw an error 122 + $blocks = []; 123 + foreach (CAR::blockMap($car, 'did:plc:test') as $key => $value) { 124 + $blocks[$key] = $value; 125 + } 126 + 127 + // Even if empty, it shouldn't crash 128 + $this->assertIsArray($blocks); 129 + } 130 + 131 + public function test_firehose_consumer_message_structure(): void 132 + { 133 + // Test the exact structure FirehoseConsumer expects 134 + 135 + // 1. Create CBOR header 136 + $headerMap = [ 137 + 't' => '#commit', 138 + 'op' => 1, 139 + ]; 140 + 141 + $header = "\xA2"; // Map with 2 items 142 + $header .= "\x61t\x67#commit"; // 't' => '#commit' 143 + $header .= "\x62op\x01"; // 'op' => 1 144 + 145 + // 2. Create CBOR payload 146 + $payload = "\xA6"; // Map with 6 items 147 + $payload .= "\x63seq\x19\x30\x39"; // 'seq' => 12345 148 + $payload .= "\x66rebase\xF4"; // 'rebase' => false 149 + $payload .= "\x64repo\x6Fdid:plc:test123"; // 'repo' => 'did:plc:test123' 150 + $payload .= "\x66commit\xA0"; // 'commit' => {} 151 + $payload .= "\x63rev\x68test-rev"; // 'rev' => 'test-rev' 152 + $payload .= "\x65since\x66origin"; // 'since' => 'origin' 153 + 154 + // Add required fields 155 + $payload .= "\x66blocks\x40"; // 'blocks' => empty byte string 156 + $payload .= "\x63ops\x80"; // 'ops' => [] 157 + $payload .= "\x64time\x78\x182024-01-01T00:00:00Z"; // 'time' => timestamp 158 + 159 + // Combine header + payload 160 + $message = $header . $payload; 161 + 162 + // Test decoding header 163 + [$decodedHeader, $remainder] = CBOR::decodeFirst($message); 164 + 165 + $this->assertIsArray($decodedHeader); 166 + $this->assertSame('#commit', $decodedHeader['t']); 167 + $this->assertSame(1, $decodedHeader['op']); 168 + 169 + // Test decoding payload 170 + $decodedPayload = CBOR::decode($remainder); 171 + 172 + $this->assertIsArray($decodedPayload); 173 + $this->assertArrayHasKey('seq', $decodedPayload); 174 + $this->assertArrayHasKey('repo', $decodedPayload); 175 + $this->assertArrayHasKey('rev', $decodedPayload); 176 + } 177 + 178 + public function test_complete_firehose_message_flow(): void 179 + { 180 + // This test simulates the complete flow that FirehoseConsumer::handleMessage() uses 181 + 182 + // Step 1: CBOR header 183 + $header = "\xA2\x61t\x67#commit\x62op\x01"; 184 + 185 + // Step 2: CBOR payload with all required fields 186 + $payload = "\xA9"; // Map with 9 items 187 + $payload .= "\x63seq\x19\x30\x39"; // seq: 12345 188 + $payload .= "\x66rebase\xF4"; // rebase: false 189 + $payload .= "\x64repo\x6Fdid:plc:test123"; // repo: "did:plc:test123" 190 + $payload .= "\x66commit\xA0"; // commit: {} 191 + $payload .= "\x63rev\x68test-rev"; // rev: "test-rev" 192 + $payload .= "\x65since\x66origin"; // since: "origin" 193 + $payload .= "\x66blocks\x40"; // blocks: b'' 194 + $payload .= "\x63ops\x80"; // ops: [] 195 + $payload .= "\x64time\x78\x182024-01-01T00:00:00Z"; // time: "2024-01-01T00:00:00Z" 196 + 197 + $message = $header . $payload; 198 + 199 + // Simulate FirehoseConsumer::handleMessage() logic 200 + 201 + // 1. Decode CBOR header 202 + [$decodedHeader, $remainder] = CBOR::decodeFirst($message); 203 + 204 + $this->assertArrayHasKey('t', $decodedHeader); 205 + $this->assertArrayHasKey('op', $decodedHeader); 206 + 207 + // 2. Check operation 208 + $this->assertSame(1, $decodedHeader['op']); 209 + 210 + // 3. Decode payload 211 + $decodedPayload = CBOR::decode($remainder); 212 + 213 + // 4. Verify required fields exist 214 + $requiredFields = ['seq', 'rebase', 'repo', 'commit', 'rev', 'since', 'blocks', 'ops', 'time']; 215 + foreach ($requiredFields as $field) { 216 + $this->assertArrayHasKey($field, $decodedPayload); 217 + } 218 + 219 + // 5. Verify data types 220 + $this->assertIsInt($decodedPayload['seq']); 221 + $this->assertIsBool($decodedPayload['rebase']); 222 + $this->assertIsString($decodedPayload['repo']); 223 + $this->assertIsArray($decodedPayload['ops']); 224 + 225 + // Success! The message structure is valid 226 + $this->assertTrue(true); 227 + } 228 + }
+11 -4
tests/Unit/SignalTest.php
··· 66 66 /** @test */ 67 67 public function it_can_filter_by_wildcard_collection() 68 68 { 69 - $signal = new class extends Signal { 69 + $signalClass = new class extends Signal { 70 70 public function eventTypes(): array 71 71 { 72 72 return ['commit']; ··· 83 83 } 84 84 }; 85 85 86 + // Create registry and register the signal 87 + $registry = new \SocialDept\Signal\Services\SignalRegistry; 88 + $registry->register($signalClass::class); 89 + 86 90 // Test that it matches app.bsky.feed.post 87 91 $postEvent = new SignalEvent( 88 92 did: 'did:plc:test', ··· 96 100 ), 97 101 ); 98 102 99 - $this->assertTrue($signal->shouldHandle($postEvent)); 103 + $matchingSignals = $registry->getMatchingSignals($postEvent); 104 + $this->assertCount(1, $matchingSignals); 100 105 101 106 // Test that it matches app.bsky.feed.like 102 107 $likeEvent = new SignalEvent( ··· 111 116 ), 112 117 ); 113 118 114 - $this->assertTrue($signal->shouldHandle($likeEvent)); 119 + $matchingSignals = $registry->getMatchingSignals($likeEvent); 120 + $this->assertCount(1, $matchingSignals); 115 121 116 122 // Test that it does NOT match app.bsky.graph.follow 117 123 $followEvent = new SignalEvent( ··· 126 132 ), 127 133 ); 128 134 129 - $this->assertFalse($signal->shouldHandle($followEvent)); 135 + $matchingSignals = $registry->getMatchingSignals($followEvent); 136 + $this->assertCount(0, $matchingSignals); 130 137 } 131 138 }