personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

link: pipe tunnel streams to convey

Add a loopback TCP pipe for relay streams, switch the link service off the in-process
WSGI app path, and log stream-close metadata around the new conduit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+159 -38
+2 -2
think/link/README.md
··· 12 12 |------|---------| 13 13 | `service.py` | Entry point + runtime. `sol link` runs `main()` here. | 14 14 | `relay_client.py` | Listen-WS + per-tunnel TLS pump. Spawns a task per incoming tunnel. | 15 - | `wsgi_bridge.py` | HTTP/1.1 ⇄ WSGI adapter that pipes tunnel bytes to convey's real Flask app. | 15 + | `tcp_pipe.py` | Byte pump that opens a loopback TCP connection to convey and forwards stream bytes both ways. | 16 16 | `tls_adapter.py` | pyOpenSSL memory-BIO adapter. Runs TLS 1.3 over opaque byte streams. | 17 17 | `ca.py` | Local CA lifecycle + CSR signing + home-attestation minting. | 18 18 | `auth.py` | `authorized_clients.json` reader/writer with mtime-reload and last-seen tracking. | ··· 28 28 29 29 ## privacy 30 30 31 - No payload bytes are ever logged. The tunnel pump only emits rendezvous metadata (tunnel_id, stream_id, method, path, status, byte counts) to logs and callosum. The CA private key never leaves `journal/link/ca/private.pem`; account tokens live in `journal/link/tokens/` and device tokens live on the phone. 31 + No payload bytes are ever logged. The tunnel pump only emits rendezvous metadata (tunnel_id, stream_id, bytes_in, bytes_out, closed_reason) to logs and callosum. The CA private key never leaves `journal/link/ca/private.pem`; account tokens live in `journal/link/tokens/` and device tokens live on the phone.
+1
think/link/mux.py
··· 73 73 def __init__(self, mux: Multiplexer, state: _StreamState) -> None: 74 74 self._mux = mux 75 75 self._state = state 76 + self.stream_id = state.stream_id 76 77 77 78 async def write(self, data: bytes) -> None: 78 79 if self._state.writer_closed:
+18 -20
think/link/relay_client.py
··· 9 9 3. Loop: wait for {"type":"incoming","tunnel_id":...} control messages. 10 10 On each signal, spawn a tunnel task that opens /tunnel/<id>, drives 11 11 pyOpenSSL TLS 1.3 in memory-BIO mode, and hands the plaintext byte 12 - stream to the multiplexer + WSGI bridge. 12 + stream to the multiplexer + loopback TCP pipe into convey. 13 13 4. On disconnect, reconnect with exponential backoff (1s → 60s, ±25%). 14 14 15 15 All WebSocket I/O uses the `websockets` library in asyncio mode. The TLS ··· 17 17 task pumping bytes between the WS and the TLS engine. 18 18 19 19 Privacy invariant: NO payload bytes ever appear in logs. Only rendezvous 20 - metadata (tunnel_id, stream_id, byte_count, status code, duration) is 20 + metadata (tunnel_id, stream_id, bytes_in, bytes_out, closed_reason) is 21 21 eligible for logging, and everything emitted to callosum is the same 22 22 rendezvous-only subset. 23 23 """ ··· 40 40 from .auth import AuthorizedClients 41 41 from .ca import LoadedCa 42 42 from .mux import Multiplexer, StreamWriter 43 + from .tcp_pipe import ConveyUnreachable, PipeMetadata, pump_stream 43 44 from .tls_adapter import ( 44 45 TlsError, 45 46 build_server_context, ··· 47 48 issue_server_cert, 48 49 new_server, 49 50 ) 50 - from .wsgi_bridge import serve_request 51 51 52 52 log = logging.getLogger("link.relay_client") 53 53 ··· 70 70 on_account_token: Callable[[str], None], 71 71 ca: LoadedCa, 72 72 authorized: AuthorizedClients, 73 - wsgi_app: Callable[..., Any], 74 73 callosum_emit: CallosumEmit | None = None, 75 74 ) -> None: 76 75 self._instance_id = instance_id ··· 80 79 self._on_account_token = on_account_token 81 80 self._ca = ca 82 81 self._authorized = authorized 83 - self._wsgi_app = wsgi_app 84 82 self._emit = callosum_emit or (lambda _event, _fields: None) 85 83 self._running = False 86 84 self._listen_state = "offline" ··· 228 226 reader: asyncio.StreamReader, 229 227 writer: StreamWriter, 230 228 ) -> None: 231 - meta = await serve_request( 232 - reader, 233 - writer, 234 - self._wsgi_app, 235 - peer_fingerprint=tls.peer_fingerprint, 236 - tunnel_id=tunnel_id, 237 - ) 238 - await writer.close() 229 + try: 230 + meta: PipeMetadata = await pump_stream( 231 + reader, 232 + writer, 233 + tunnel_id=tunnel_id, 234 + stream_id=writer.stream_id, 235 + ) 236 + except ConveyUnreachable: 237 + raise 239 238 log.debug( 240 - "tunnel %s exchange: method=%s path=%s status=%s in=%s out=%s", 241 - tunnel_id, 242 - meta.method, 243 - meta.path, 244 - meta.status, 245 - meta.request_bytes, 246 - meta.response_bytes, 239 + "link stream closed tunnel=%s stream_id=%d bytes_in=%d bytes_out=%d reason=%s", 240 + meta.tunnel_id, 241 + meta.stream_id, 242 + meta.bytes_in, 243 + meta.bytes_out, 244 + meta.closed_reason, 247 245 ) 248 246 249 247 mux = Multiplexer(send_frame, handle_stream, is_listener=True)
+1 -16
think/link/service.py
··· 9 9 10 10 start → load state + CA → ensure account_token (enroll once) → 11 11 open listen WS to spl-relay → accept tunnel pairs → pump bytes through 12 - TLS → convey WSGI. On disconnect, reconnect with exponential backoff. 12 + TLS → convey (TCP pipe). On disconnect, reconnect with exponential backoff. 13 13 14 14 Exits on SIGINT/SIGTERM with a clean close of the listen WS and all 15 15 in-flight tunnel WSes. ··· 55 55 authorized = AuthorizedClients(authorized_clients_path()) 56 56 token = load_account_token() 57 57 58 - wsgi_app = _build_convey_wsgi() 59 - 60 58 callosum = CallosumConnection() 61 59 callosum.start() 62 60 ··· 74 72 on_account_token=save_account_token, 75 73 ca=ca, 76 74 authorized=authorized, 77 - wsgi_app=wsgi_app, 78 75 callosum_emit=emit, 79 76 ) 80 77 ··· 96 93 except asyncio.CancelledError: 97 94 pass 98 95 callosum.stop() 99 - 100 - 101 - def _build_convey_wsgi() -> Any: 102 - """Return convey's Flask app as a WSGI callable. 103 - 104 - Imported lazily so `sol call link status` (and other dry reads) don't 105 - pay the convey import cost. The returned object is the Flask app — 106 - calling it as `app(environ, start_response)` invokes its WSGI entry. 107 - """ 108 - from convey import create_app 109 - 110 - return create_app() 111 96 112 97 113 98 class _suppress_not_implemented:
+137
think/link/tcp_pipe.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Byte pump for tunnel→convey. 5 + 6 + Opens a loopback TCP connection to the running convey server and pumps 7 + bytes bidirectionally with half-close handling. Does not parse HTTP or 8 + inspect payloads — the pipe carries whatever the tunnel stream carries. 9 + """ 10 + 11 + from __future__ import annotations 12 + 13 + import asyncio 14 + import logging 15 + from dataclasses import dataclass 16 + 17 + from think.utils import read_service_port 18 + 19 + from .mux import StreamWriter 20 + 21 + log = logging.getLogger("link.pipe") 22 + 23 + _BUF = 65536 24 + 25 + 26 + @dataclass 27 + class PipeMetadata: 28 + tunnel_id: str 29 + stream_id: int 30 + bytes_in: int 31 + bytes_out: int 32 + closed_reason: str 33 + 34 + 35 + class ConveyUnreachable(RuntimeError): ... 36 + 37 + 38 + async def _pump_up(src: asyncio.StreamReader, dst: asyncio.StreamWriter) -> int: 39 + total = 0 40 + try: 41 + while True: 42 + data = await src.read(_BUF) 43 + if not data: 44 + try: 45 + dst.write_eof() 46 + except (OSError, RuntimeError): 47 + pass 48 + return total 49 + dst.write(data) 50 + await dst.drain() 51 + total += len(data) 52 + except (BrokenPipeError, ConnectionResetError): 53 + return total 54 + 55 + 56 + async def _pump_down(src: asyncio.StreamReader, dst: StreamWriter) -> int: 57 + total = 0 58 + try: 59 + while True: 60 + data = await src.read(_BUF) 61 + if not data: 62 + await dst.close() 63 + return total 64 + await dst.write(data) 65 + total += len(data) 66 + except (BrokenPipeError, ConnectionResetError): 67 + return total 68 + 69 + 70 + async def pump_stream( 71 + reader: asyncio.StreamReader, 72 + writer: StreamWriter, 73 + *, 74 + tunnel_id: str, 75 + stream_id: int, 76 + ) -> PipeMetadata: 77 + port = read_service_port("convey") 78 + if port is None: 79 + log.debug( 80 + "convey unreachable for stream tunnel=%s stream_id=%d: no convey port", 81 + tunnel_id, 82 + stream_id, 83 + ) 84 + raise ConveyUnreachable("convey port not published") 85 + 86 + try: 87 + tcp_reader, tcp_writer = await asyncio.open_connection("127.0.0.1", port) 88 + except (ConnectionRefusedError, OSError) as err: 89 + log.debug( 90 + "convey unreachable for stream tunnel=%s stream_id=%d: %s", 91 + tunnel_id, 92 + stream_id, 93 + err, 94 + ) 95 + raise ConveyUnreachable(str(err)) from err 96 + 97 + bytes_in = 0 98 + bytes_out = 0 99 + closed_reason = "both_eof" 100 + try: 101 + if hasattr(asyncio, "TaskGroup"): 102 + async with asyncio.TaskGroup() as tg: 103 + up = tg.create_task(_pump_up(reader, tcp_writer)) 104 + down = tg.create_task(_pump_down(tcp_reader, writer)) 105 + bytes_in, bytes_out = up.result(), down.result() 106 + else: 107 + tasks = [ 108 + asyncio.create_task(_pump_up(reader, tcp_writer)), 109 + asyncio.create_task(_pump_down(tcp_reader, writer)), 110 + ] 111 + try: 112 + bytes_in, bytes_out = await asyncio.gather(*tasks) 113 + except BaseException: 114 + for task in tasks: 115 + task.cancel() 116 + await asyncio.gather(*tasks, return_exceptions=True) 117 + raise 118 + if reader.at_eof() and tcp_reader.at_eof(): 119 + closed_reason = "both_eof" 120 + elif reader.at_eof(): 121 + closed_reason = "client_eof" 122 + elif tcp_reader.at_eof(): 123 + closed_reason = "server_eof" 124 + except asyncio.CancelledError: 125 + closed_reason = "cancelled" 126 + raise 127 + except Exception: 128 + closed_reason = "error" 129 + raise 130 + finally: 131 + try: 132 + tcp_writer.close() 133 + await tcp_writer.wait_closed() 134 + except (OSError, RuntimeError): 135 + pass 136 + 137 + return PipeMetadata(tunnel_id, stream_id, bytes_in, bytes_out, closed_reason)