personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

link: test websocket flow through tunnel

Exercise the TCP-pipe path with a live websocket upgrade over the relay tunnel and
broadcast a server-side event back through the same stream.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+69
+69
tests/link/test_integration.py
··· 9 9 from pathlib import Path 10 10 11 11 import pytest 12 + from wsproto import WSConnection 13 + from wsproto.connection import ConnectionType 14 + from wsproto.events import AcceptConnection, CloseConnection, Request, TextMessage 12 15 16 + import convey.bridge as convey_bridge 13 17 from tests.link.client import Client, StreamResetError 14 18 from tests.link.live_helpers import ( 15 19 CONVEY_PASSWORD, ··· 99 103 "/app/link/api/status", 100 104 headers={"authorization": f"Basic {auth}"}, 101 105 ) 106 + 107 + 108 + @pytest.mark.asyncio 109 + @pytest.mark.timeout(60) 110 + async def test_websocket_upgrade_and_bidirectional_flow( 111 + tmp_path: Path, 112 + monkeypatch: pytest.MonkeyPatch, 113 + ) -> None: 114 + tmp_journal = tmp_path / "journal" 115 + tmp_journal.mkdir() 116 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_journal)) 117 + 118 + with ( 119 + running_convey_server(tmp_journal) as base_url, 120 + running_link_service(tmp_journal), 121 + ): 122 + identity = Client.pair(base_url, device_label="pytest-device") 123 + enrolled = Client.enroll_device(RELAY_URL, identity) 124 + 125 + session = await Client.dial(RELAY_URL, enrolled) 126 + async with session: 127 + raw_stream = await session._mux.open_stream() 128 + ws = WSConnection(ConnectionType.CLIENT) 129 + await raw_stream.write( 130 + ws.send(Request(host="127.0.0.1", target="/ws/events")) 131 + ) 132 + 133 + accepted = False 134 + while not accepted: 135 + chunk = await asyncio.wait_for(_read_next_chunk(raw_stream), timeout=5) 136 + ws.receive_data(chunk) 137 + for event in ws.events(): 138 + if isinstance(event, AcceptConnection): 139 + accepted = True 140 + break 141 + 142 + await _wait_for_ws_client() 143 + convey_bridge._broadcast_to_websockets({"tract": "test", "event": "ping"}) 144 + 145 + payload = None 146 + while payload is None: 147 + chunk = await asyncio.wait_for(_read_next_chunk(raw_stream), timeout=5) 148 + ws.receive_data(chunk) 149 + for event in ws.events(): 150 + if isinstance(event, TextMessage): 151 + payload = event.data 152 + break 153 + 154 + assert payload == json.dumps({"tract": "test", "event": "ping"}) 155 + 156 + await raw_stream.write(ws.send(CloseConnection(code=1000))) 157 + await raw_stream.close() 158 + 159 + 160 + async def _read_next_chunk(raw_stream: object) -> bytes: 161 + return await anext(raw_stream.read()) 162 + 163 + 164 + async def _wait_for_ws_client() -> None: 165 + deadline = asyncio.get_running_loop().time() + 5 166 + while asyncio.get_running_loop().time() < deadline: 167 + if convey_bridge._WEBSOCKET_CLIENTS: 168 + return 169 + await asyncio.sleep(0.05) 170 + raise AssertionError("websocket client never registered")