personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

link: delete wsgi bridge

Remove the in-process HTTP/WSGI bridge and its unit tests to clear the way for the
loopback TCP pipe replacement.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

-568
-210
tests/link/test_wsgi_bridge.py
··· 1 - # SPDX-License-Identifier: AGPL-3.0-only 2 - # Copyright (c) 2026 sol pbc 3 - 4 - from __future__ import annotations 5 - 6 - import asyncio 7 - import dataclasses 8 - import hashlib 9 - import json 10 - import os 11 - import time 12 - from collections.abc import Iterator 13 - 14 - import pytest 15 - from flask import Flask, Response, jsonify, request, stream_with_context 16 - 17 - from think.link.wsgi_bridge import ExchangeMetadata, serve_request 18 - 19 - 20 - class _WriterStub: 21 - def __init__(self) -> None: 22 - self.writes: list[bytes] = [] 23 - 24 - async def write(self, data: bytes) -> None: 25 - self.writes.append(data) 26 - 27 - async def drain(self) -> None: 28 - return None 29 - 30 - def close(self) -> None: 31 - return None 32 - 33 - async def wait_closed(self) -> None: 34 - return None 35 - 36 - def joined(self) -> bytes: 37 - return b"".join(self.writes) 38 - 39 - 40 - def _build_app(*, propagate_exceptions: bool = False) -> Flask: 41 - app = Flask(__name__) 42 - app.config["PROPAGATE_EXCEPTIONS"] = propagate_exceptions 43 - 44 - @app.get("/hello") 45 - def hello() -> Response: 46 - return Response(b"hello", mimetype="text/plain") 47 - 48 - @app.get("/stream") 49 - def stream() -> Response: 50 - @stream_with_context 51 - def generate() -> Iterator[bytes]: 52 - for chunk in (b"part-1\n", b"part-2\n", b"part-3\n"): 53 - time.sleep(0.01) 54 - yield chunk 55 - 56 - return Response(generate(), mimetype="text/plain") 57 - 58 - @app.post("/upload") 59 - def upload() -> Response: 60 - body = request.get_data() 61 - return jsonify( 62 - {"sha256": hashlib.sha256(body).hexdigest(), "length": len(body)} 63 - ) 64 - 65 - @app.get("/boom") 66 - def boom() -> Response: 67 - raise RuntimeError("bridge test failure") 68 - 69 - return app 70 - 71 - 72 - def _make_reader(request_bytes: bytes) -> asyncio.StreamReader: 73 - reader = asyncio.StreamReader() 74 - reader.feed_data(request_bytes) 75 - reader.feed_eof() 76 - return reader 77 - 78 - 79 - async def _serve( 80 - request_bytes: bytes, 81 - *, 82 - app: Flask | None = None, 83 - stream_id: int = 1, 84 - ) -> tuple[ExchangeMetadata, _WriterStub]: 85 - writer = _WriterStub() 86 - metadata = await serve_request( 87 - _make_reader(request_bytes), 88 - writer, 89 - (app or _build_app()).wsgi_app, 90 - stream_id=stream_id, 91 - ) 92 - return metadata, writer 93 - 94 - 95 - def _split_response(raw: bytes) -> tuple[bytes, bytes]: 96 - head, sep, body = raw.partition(b"\r\n\r\n") 97 - assert sep == b"\r\n\r\n" 98 - return head, body 99 - 100 - 101 - @pytest.mark.asyncio 102 - async def test_get_returns_200_and_body() -> None: 103 - meta, writer = await _serve( 104 - b"GET /hello HTTP/1.1\r\nHost: link.test\r\nContent-Length: 0\r\n\r\n", 105 - ) 106 - 107 - head, body = _split_response(writer.joined()) 108 - 109 - assert meta == ExchangeMetadata( 110 - method="GET", 111 - path="/hello", 112 - status=200, 113 - request_bytes=0, 114 - response_bytes=len(writer.joined()), 115 - stream_id=1, 116 - ) 117 - assert set(ExchangeMetadata.__dataclass_fields__) == { 118 - "method", 119 - "path", 120 - "status", 121 - "request_bytes", 122 - "response_bytes", 123 - "stream_id", 124 - } 125 - assert head.startswith(b"HTTP/1.1 200 OK\r\n") 126 - assert body == b"hello" 127 - 128 - 129 - @pytest.mark.asyncio 130 - async def test_post_upload_roundtrips_sha256() -> None: 131 - payload = os.urandom(1024 * 1024) 132 - digest = hashlib.sha256(payload).hexdigest() 133 - request_bytes = ( 134 - b"POST /upload HTTP/1.1\r\n" 135 - b"Host: link.test\r\n" 136 - b"Content-Type: application/octet-stream\r\n" 137 - b"Content-Length: " + str(len(payload)).encode("ascii") + b"\r\n\r\n" + payload 138 - ) 139 - 140 - meta, writer = await _serve(request_bytes) 141 - head, body = _split_response(writer.joined()) 142 - parsed = json.loads(body) 143 - 144 - assert meta.method == "POST" 145 - assert meta.path == "/upload" 146 - assert meta.status == 200 147 - assert meta.request_bytes == 1024 * 1024 148 - assert meta.response_bytes == len(writer.joined()) 149 - assert head.startswith(b"HTTP/1.1 200 OK\r\n") 150 - assert parsed == {"sha256": digest, "length": 1024 * 1024} 151 - 152 - 153 - @pytest.mark.asyncio 154 - async def test_streaming_response_arrives_in_chunks() -> None: 155 - meta, writer = await _serve( 156 - b"GET /stream HTTP/1.1\r\nHost: link.test\r\nContent-Length: 0\r\n\r\n", 157 - ) 158 - 159 - assert meta.status == 200 160 - assert meta.method == "GET" 161 - assert meta.path == "/stream" 162 - assert writer.writes[0].startswith(b"HTTP/1.1 200 OK\r\n") 163 - assert writer.writes[1:] == [b"part-1\n", b"part-2\n", b"part-3\n"] 164 - assert len(writer.writes) == 4 165 - assert meta.response_bytes == len(writer.joined()) 166 - 167 - 168 - @pytest.mark.asyncio 169 - async def test_malformed_request_line_returns_400() -> None: 170 - meta, writer = await _serve(b"NOTAREALREQUEST\r\n\r\n") 171 - 172 - head, body = _split_response(writer.joined()) 173 - 174 - assert meta.method == "-" 175 - assert meta.path == "-" 176 - assert meta.status == 400 177 - assert head.startswith(b"HTTP/1.1 400 bad request\r\n") 178 - assert body == b"bad request\n" 179 - 180 - 181 - @pytest.mark.asyncio 182 - async def test_wsgi_exception_returns_500() -> None: 183 - meta, writer = await _serve( 184 - b"GET /boom HTTP/1.1\r\nHost: link.test\r\nContent-Length: 0\r\n\r\n", 185 - app=_build_app(propagate_exceptions=True), 186 - ) 187 - 188 - head, body = _split_response(writer.joined()) 189 - 190 - assert meta.method == "GET" 191 - assert meta.path == "/boom" 192 - assert meta.status == 500 193 - assert head.startswith(b"HTTP/1.1 500 internal server error\r\n") 194 - assert body == b"internal server error\n" 195 - 196 - 197 - @pytest.mark.asyncio 198 - async def test_metadata_has_no_payload_fields() -> None: 199 - meta, _ = await _serve( 200 - b"GET /hello HTTP/1.1\r\nHost: link.test\r\nContent-Length: 0\r\n\r\n", 201 - ) 202 - 203 - assert [field.name for field in dataclasses.fields(meta)] == [ 204 - "method", 205 - "path", 206 - "status", 207 - "request_bytes", 208 - "response_bytes", 209 - "stream_id", 210 - ]
-358
think/link/wsgi_bridge.py
··· 1 - # SPDX-License-Identifier: AGPL-3.0-only 2 - # Copyright (c) 2026 sol pbc 3 - 4 - """HTTP/1.1 ⇄ WSGI bridge for the link tunnel. 5 - 6 - Each tunnel stream is one HTTP request/response exchange (no keep-alive). 7 - This module: 8 - 9 - 1. Parses an HTTP/1.1 request from the per-stream byte reader 10 - 2. Builds a WSGI environ dict 11 - 3. Invokes convey's real Flask app 12 - 4. Writes the response (status + headers + body) back to the stream writer 13 - 14 - Streaming responses (SSE, chunked) are supported — the WSGI iterable is 15 - pumped chunk-by-chunk so the client sees events as the app produces them. 16 - 17 - Request-body size is capped at 64 MiB. Privacy invariant: NO request or 18 - response bytes are ever logged. Only error counts, byte totals, and 19 - status codes are eligible for logging (see `metadata`), never payloads. 20 - """ 21 - 22 - from __future__ import annotations 23 - 24 - import asyncio 25 - import io 26 - import logging 27 - import urllib.parse 28 - from dataclasses import dataclass 29 - from typing import Any, Callable 30 - 31 - log = logging.getLogger("link.wsgi") 32 - 33 - MAX_REQUEST_BODY = 64 * 1024 * 1024 34 - WSGI_SERVER_NAME = "solstone-link" 35 - 36 - 37 - @dataclass 38 - class ExchangeMetadata: 39 - """Per-request rendezvous metadata — safe to log (no payload).""" 40 - 41 - method: str 42 - path: str 43 - status: int | None = None 44 - request_bytes: int = 0 45 - response_bytes: int = 0 46 - stream_id: int | None = None 47 - 48 - 49 - async def serve_request( 50 - reader: asyncio.StreamReader, 51 - writer: Any, # think.link.mux.StreamWriter 52 - wsgi_app: Callable[[dict[str, Any], Callable[..., Any]], Any], 53 - *, 54 - peer_fingerprint: str | None = None, 55 - tunnel_id: str | None = None, 56 - stream_id: int | None = None, 57 - ) -> ExchangeMetadata: 58 - """Read one HTTP/1.1 request; dispatch to `wsgi_app`; write response. 59 - 60 - Returns rendezvous metadata describing what happened (for callosum 61 - events + debug logs). Never returns payload bytes. 62 - """ 63 - meta = ExchangeMetadata(method="-", path="-", stream_id=stream_id) 64 - try: 65 - request = await _read_request(reader) 66 - except _BadRequest as exc: 67 - log.debug("tunnel %s stream %s: bad request: %s", tunnel_id, stream_id, exc) 68 - await _write_simple(writer, 400, "bad request", b"bad request\n") 69 - meta.status = 400 70 - meta.response_bytes = _byte_count_for_simple(b"bad request\n") 71 - return meta 72 - except asyncio.IncompleteReadError: 73 - log.debug("tunnel %s stream %s: incomplete request", tunnel_id, stream_id) 74 - return meta 75 - 76 - meta.method = request.method 77 - meta.path = request.path 78 - meta.request_bytes = len(request.body) 79 - 80 - environ = _build_environ( 81 - request, 82 - peer_fingerprint=peer_fingerprint, 83 - tunnel_id=tunnel_id, 84 - ) 85 - 86 - response_state = _ResponseState() 87 - 88 - def start_response( 89 - status: str, 90 - headers: list[tuple[str, str]], 91 - exc_info: Any = None, 92 - ) -> Callable[[bytes], None]: 93 - response_state.status_line = status 94 - response_state.headers = list(headers) 95 - 96 - # WSGI returns a write() callable, but we use the iterable instead. 97 - def write(_data: bytes) -> None: 98 - raise RuntimeError("write() callable not supported; return iterable") 99 - 100 - return write 101 - 102 - # Run the WSGI app in a thread — it may block on DB, file I/O, etc. 103 - loop = asyncio.get_running_loop() 104 - try: 105 - result = await loop.run_in_executor( 106 - None, lambda: wsgi_app(environ, start_response) 107 - ) 108 - except Exception: 109 - log.exception("tunnel %s stream %s: wsgi app raised", tunnel_id, stream_id) 110 - await _write_simple( 111 - writer, 500, "internal server error", b"internal server error\n" 112 - ) 113 - meta.status = 500 114 - meta.response_bytes = _byte_count_for_simple(b"internal server error\n") 115 - return meta 116 - 117 - if response_state.status_line is None: 118 - log.warning( 119 - "tunnel %s stream %s: wsgi app returned without calling start_response", 120 - tunnel_id, 121 - stream_id, 122 - ) 123 - await _write_simple(writer, 500, "internal server error", b"missing response\n") 124 - meta.status = 500 125 - return meta 126 - 127 - # Parse numeric status. 128 - try: 129 - code_str, reason = response_state.status_line.split(" ", 1) 130 - code = int(code_str) 131 - except (ValueError, IndexError): 132 - code = 500 133 - reason = "internal server error" 134 - meta.status = code 135 - 136 - # Detect if the response is Transfer-Encoding: chunked or uses Content-Length. 137 - headers_map = {k.lower(): v for k, v in response_state.headers} 138 - is_chunked = headers_map.get("transfer-encoding", "").lower() == "chunked" 139 - 140 - # Send status + headers. 141 - sent = await _write_status_headers(writer, code, reason, response_state.headers) 142 - meta.response_bytes += sent 143 - 144 - # Stream body. Iterate in a thread (WSGI iterables can block). 145 - iterator = ( 146 - iter(result) 147 - if not isinstance(result, (bytes, bytearray)) 148 - else iter([bytes(result)]) 149 - ) 150 - 151 - async def next_chunk() -> bytes | None: 152 - def _pull() -> bytes | None: 153 - try: 154 - return next(iterator) 155 - except StopIteration: 156 - return None 157 - 158 - return await loop.run_in_executor(None, _pull) 159 - 160 - try: 161 - while True: 162 - chunk = await next_chunk() 163 - if chunk is None: 164 - break 165 - if not chunk: 166 - continue 167 - if is_chunked: 168 - # The WSGI app is responsible for emitting valid chunked 169 - # framing (Flask's stream_with_context does this). Pass 170 - # through unchanged. 171 - await writer.write(chunk) 172 - else: 173 - await writer.write(chunk) 174 - meta.response_bytes += len(chunk) 175 - finally: 176 - # WSGI requires closing iterables that have a close() method. 177 - close = getattr(result, "close", None) 178 - if callable(close): 179 - try: 180 - await loop.run_in_executor(None, close) 181 - except Exception: 182 - log.debug("wsgi close() raised", exc_info=True) 183 - 184 - return meta 185 - 186 - 187 - @dataclass 188 - class _ResponseState: 189 - status_line: str | None = None 190 - headers: list[tuple[str, str]] | None = None 191 - 192 - 193 - @dataclass 194 - class _Request: 195 - method: str 196 - path: str 197 - query: str 198 - headers: list[tuple[str, str]] 199 - body: bytes 200 - 201 - 202 - class _BadRequest(Exception): 203 - pass 204 - 205 - 206 - async def _read_request(reader: asyncio.StreamReader) -> _Request: 207 - """Parse HTTP/1.1 request line + headers + body (Content-Length only).""" 208 - raw_line = await reader.readline() 209 - if not raw_line: 210 - raise asyncio.IncompleteReadError(b"", None) 211 - line = raw_line.decode("latin-1").rstrip("\r\n") 212 - parts = line.split(" ", 2) 213 - if len(parts) != 3: 214 - raise _BadRequest(f"bad request line: {line!r}") 215 - method, target, _version = parts 216 - path, _, query = target.partition("?") 217 - 218 - headers: list[tuple[str, str]] = [] 219 - while True: 220 - raw = await reader.readline() 221 - if raw in (b"\r\n", b"\n", b""): 222 - break 223 - try: 224 - header_line = raw.decode("latin-1").rstrip("\r\n") 225 - except UnicodeDecodeError as exc: 226 - raise _BadRequest("header decode failed") from exc 227 - if ":" not in header_line: 228 - raise _BadRequest(f"bad header: {header_line!r}") 229 - name, _, value = header_line.partition(":") 230 - headers.append((name.strip(), value.strip())) 231 - 232 - headers_map = {k.lower(): v for k, v in headers} 233 - body = b"" 234 - if ( 235 - "transfer-encoding" in headers_map 236 - and headers_map["transfer-encoding"].lower() == "chunked" 237 - ): 238 - body = await _read_chunked(reader) 239 - else: 240 - cl_raw = headers_map.get("content-length", "0") 241 - try: 242 - cl = int(cl_raw) 243 - except ValueError as exc: 244 - raise _BadRequest(f"bad content-length: {cl_raw!r}") from exc 245 - if cl < 0 or cl > MAX_REQUEST_BODY: 246 - raise _BadRequest(f"content-length out of bounds: {cl}") 247 - if cl: 248 - body = await reader.readexactly(cl) 249 - 250 - return _Request( 251 - method=method, 252 - path=urllib.parse.unquote(path), 253 - query=query, 254 - headers=headers, 255 - body=body, 256 - ) 257 - 258 - 259 - async def _read_chunked(reader: asyncio.StreamReader) -> bytes: 260 - """Minimal chunked-transfer decoder for uploads.""" 261 - out = bytearray() 262 - total = 0 263 - while True: 264 - size_line = (await reader.readline()).decode("latin-1").strip() 265 - if ";" in size_line: 266 - size_line = size_line.split(";", 1)[0].strip() 267 - try: 268 - size = int(size_line, 16) 269 - except ValueError as exc: 270 - raise _BadRequest(f"bad chunk size: {size_line!r}") from exc 271 - if size == 0: 272 - # Consume trailer headers until blank line. 273 - while True: 274 - line = await reader.readline() 275 - if line in (b"\r\n", b"\n", b""): 276 - break 277 - break 278 - chunk = await reader.readexactly(size) 279 - out.extend(chunk) 280 - total += size 281 - if total > MAX_REQUEST_BODY: 282 - raise _BadRequest("request body too large") 283 - # Trailing CRLF after each chunk. 284 - trailer = await reader.readexactly(2) 285 - if trailer not in (b"\r\n", b"\n\r"): 286 - # tolerate bare \n 287 - pass 288 - return bytes(out) 289 - 290 - 291 - def _build_environ( 292 - request: _Request, 293 - *, 294 - peer_fingerprint: str | None, 295 - tunnel_id: str | None, 296 - ) -> dict[str, Any]: 297 - """Build a PEP-3333-compliant WSGI environ from the parsed request.""" 298 - environ: dict[str, Any] = { 299 - "REQUEST_METHOD": request.method, 300 - "SCRIPT_NAME": "", 301 - "PATH_INFO": request.path, 302 - "QUERY_STRING": request.query, 303 - "SERVER_NAME": WSGI_SERVER_NAME, 304 - "SERVER_PORT": "443", 305 - "SERVER_PROTOCOL": "HTTP/1.1", 306 - "wsgi.version": (1, 0), 307 - "wsgi.url_scheme": "https", 308 - "wsgi.input": io.BytesIO(request.body), 309 - "wsgi.errors": io.StringIO(), 310 - "wsgi.multithread": True, 311 - "wsgi.multiprocess": False, 312 - "wsgi.run_once": False, 313 - } 314 - 315 - for name, value in request.headers: 316 - key = "HTTP_" + name.upper().replace("-", "_") 317 - if name.lower() == "content-length": 318 - environ["CONTENT_LENGTH"] = value 319 - elif name.lower() == "content-type": 320 - environ["CONTENT_TYPE"] = value 321 - environ[key] = value 322 - 323 - if peer_fingerprint: 324 - environ["LINK_PEER_FINGERPRINT"] = peer_fingerprint 325 - if tunnel_id: 326 - environ["LINK_TUNNEL_ID"] = tunnel_id 327 - 328 - return environ 329 - 330 - 331 - async def _write_status_headers( 332 - writer: Any, 333 - code: int, 334 - reason: str, 335 - headers: list[tuple[str, str]], 336 - ) -> int: 337 - lines = [f"HTTP/1.1 {code} {reason}\r\n"] 338 - for name, value in headers: 339 - lines.append(f"{name}: {value}\r\n") 340 - lines.append("\r\n") 341 - out = "".join(lines).encode("latin-1") 342 - await writer.write(out) 343 - return len(out) 344 - 345 - 346 - async def _write_simple(writer: Any, code: int, reason: str, body: bytes) -> None: 347 - headers = [ 348 - ("Content-Type", "text/plain; charset=utf-8"), 349 - ("Content-Length", str(len(body))), 350 - ] 351 - await _write_status_headers(writer, code, reason, headers) 352 - if body: 353 - await writer.write(body) 354 - 355 - 356 - def _byte_count_for_simple(body: bytes) -> int: 357 - # Just a rough tally for metadata — status line + headers + body. 358 - return len(body) + 64