⌜ ᐸ ᐊ ᐯ Ⲷ Ⲡ ⌟
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

init

yzzxyz 23aa39ba

+1181
+115
CASE_STUDY.md
··· 1 + Feature: Flower-Iroh Funnel Integration 2 + 3 + In order to enable federated learning across NAT boundaries 4 + As a system operator 5 + I want to route Flower gRPC communications through Iroh tunnels 6 + 7 + Background: 8 + Given Flower operates as a hub-and-spoke federated learning framework 9 + And Flower nodes communicate via gRPC using protobuf messages 10 + And Iroh provides P2P connections with NAT traversal and encryption 11 + 12 + Scenario: Flower Framework Architecture 13 + In order to coordinate distributed model training 14 + As a Flower deployment 15 + I want to understand the framework's component topology 16 + 17 + Given Flower uses a hub-and-spoke topology centered on a SuperLink server 18 + When organizing federated learning workloads 19 + Then the framework consists of three primary components: 20 + | Component | Role | 21 + | SuperLink | Long-running server that forwards tasks | 22 + | SuperNode | Client coordinating client training | 23 + | ServerApp | Short-lived process with project code | 24 + And communication flows through well-defined API ports 25 + | API | Port | Purpose | 26 + | Fleet API | 9092 | SuperNode to SuperLink messaging | 27 + | ServerAppIo API | 9091 | ServerApp to SuperLink run management | 28 + | ClientAppIo API | 9094 | ClientApp to SuperNode FAB retrieval | 29 + | Control API | 9093 | CLI to SuperLink user interaction | 30 + 31 + Scenario: Iroh Framework Architecture 32 + In order to establish peer connections across NAT boundaries 33 + As an Iroh deployment 34 + I want to leverage QUIC-based hole-punching and relay fallback 35 + 36 + Given Iroh dials by public key rather than IP address 37 + When establishing connections 38 + Then the framework uses a multi-tier connection strategy: 39 + | Strategy | Condition | 40 + | Direct P2P | When hole-punching succeeds | 41 + | Home Relay | When both endpoints have relays | 42 + | Public Relay | When NAT traversal fails | 43 + And all traffic is encrypted via QUIC with TLS 44 + And protocol negotiation uses ALPN for stream multiplexing 45 + 46 + Scenario: Flower-Iroh Funnel Design 47 + In order to route Flower gRPC through Iroh tunnels 48 + As a system architect 49 + I want to design a sidecar proxy architecture 50 + 51 + Given Flower communicates via gRPC on HTTP/2 52 + And Iroh provides QUIC-based transport with NAT traversal 53 + When integrating the frameworks 54 + Then the funnel package structure follows: 55 + | Component | Responsibility | 56 + | grpc_server.rs | gRPC server (Flower protocol terminator)| 57 + | tunnel.rs | Iroh tunnel management | 58 + | protocol.rs | Protocol translation layer | 59 + | address_lookup/ | Address resolution service | 60 + And the communication flow operates as: 61 + | Step | Description | 62 + | Flower Client | Initiates gRPC to funnel | 63 + | Funnel (Iroh endpoint) | Terminates gRPC, publishes endpoint | 64 + | Iroh tunnel | Encrypts, traverses NAT via QUIC | 65 + | Peer Funnel | Terminates Iroh, forwards to Flower | 66 + | Flower Server | Receives gRPC from peer funnel | 67 + 68 + Scenario: Funnel API Design 69 + In order to provide a clean library interface 70 + As a Rust developer 71 + I want to expose a typed API for tunnel management 72 + 73 + Given the funnel is a Rust package named flower-iroh-funnel 74 + When consuming the library 75 + Then the public interface provides: 76 + | Method | Returns | Purpose | 77 + | Funnel::new() | Result<Self, Error> | Initialize with config | 78 + | Funnel::serve() | Result<(), Error> | Start gRPC server | 79 + | Funnel::connect_to_peer | Result<PeerConn, Err>| Connect to remote peer | 80 + | Funnel::endpoint_id | EndpointId | Local iroh endpoint identity | 81 + And configuration includes: 82 + | Field | Type | Purpose | 83 + | alpn | Vec<u8> | Protocol negotiation bytes | 84 + | relay_mode | RelayMode | NAT traversal strategy | 85 + | bind_addr | SocketAddr | Local bind address | 86 + | flower_grpc_addr | SocketAddr | Flower gRPC listen address | 87 + 88 + Scenario: Cross-Platform Support 89 + In order to support deployment across major platforms 90 + As an operator 91 + I want the funnel to operate on macOS, Windows, and Linux 92 + 93 + Given Iroh's QUIC transport is cross-platform 94 + When deploying the funnel 95 + Then full support is available on: 96 + | Platform | Status | 97 + | macOS | Full | 98 + | Windows | Full | 99 + | Linux | Full | 100 + And platform detection uses cfg(target_os) conditionals 101 + 102 + Scenario: Dependencies 103 + In order to build the funnel package 104 + As a Rust developer 105 + I want to declare the correct dependencies 106 + 107 + Given the funnel requires both gRPC and P2P capabilities 108 + When configuring Cargo.toml 109 + Then the dependencies are: 110 + | Crate | Version | Purpose | 111 + | iroh | 0.98 | P2P connectivity and NAT traversal| 112 + | tokio | 1 | Async runtime with full features | 113 + | tonic | 0.12 | gRPC server implementation | 114 + | prost | 0.12 | Protocol buffer code generation | 115 + | serde | 1 | Serialization with derive support |
+119
COVEN.md
··· 1 + Understanding: 2 + 3 + - Flower is a federated learning framework with Python/gRPC communication 4 + - Flower nodes communicate via gRPC using protobuf messages (PullAppInputs, PushAppOutputs, etc.) 5 + - Iroh provides P2P connections with NAT traversal and encryption via endpoint IDs 6 + - The goal is a Rust "funnel" that routes Flower gRPC through Iroh tunnels 7 + 8 + Flower should wrap well with Iroh because: 9 + 10 + 1. Clean abstraction boundary - Flower's GrpcAdapter provides a clear interface (grpc_adapter() context manager returns send/receive callables) that could be replaced with iroh endpoints 11 + 2. Standard protocol - gRPC uses HTTP/2 which iroh can leverage for QUIC-based hole-punching 12 + 3. Separated concerns - Message serialization (protobuf), transport (gRPC), and application logic are modular 13 + 4. Simpler message model - Request-response pattern maps well to iroh's bidirectional QUIC streams 14 + 15 + Let me ask a clarifying question to ensure I understand the architecture correctly: 16 + 17 + # Questions 18 + 19 + Which Flower communication pattern should the Iroh funnel support? The Flower architecture has multiple communication paths: (1) Client→Server gRPC (2) ServerApp→ClientApp gRPC (3) Node-to-Node communication via Fleet protocol. Which should the funnel wrap? 20 + **Both layers** 21 + 22 + # Questions 23 + 24 + How should the Iroh funnel integrate with existing Flower deployments? Should it be a sidecar proxy that Flower nodes connect through, or an embedded library that replaces Flower transport directly? 25 + **Sidecar proxy (Recommended)** 26 + Now I have a clear picture. Let me present the comprehensive plan. 27 + 28 + --- 29 + 30 + Plan: Flower-Iroh Funnel Package 31 + Overview 32 + Create a modular Rust package called flower-iroh-funnel that acts as a sidecar proxy, wrapping Flower gRPC communications with Iroh for NAT traversal and encryption. 33 + Package Structure 34 + flower-iroh-funnel/ 35 + ├── Cargo.toml # Workspace member 36 + ├── src/ 37 + │ ├── lib.rs # Library interface 38 + │ ├── proxy.rs # Main proxy logic 39 + │ ├── grpc_server.rs # gRPC server (terminator) 40 + │ ├── tunnel.rs # Iroh tunnel management 41 + │ ├── protocol.rs # Protocol translation 42 + │ └── address_lookup/ # Address resolution 43 + ├── examples/ 44 + │ ├── simple_proxy.rs # Basic proxy example 45 + └── tests/ 46 + └── integration.rs 47 + Key Components 48 + 49 + 1. gRPC Server (Terminator) 50 + 51 + - Listens on Flower gRPC port 52 + - Accepts connections from Flower clients 53 + - Translates gRPC messages to Iroh stream format 54 + - Publishes its endpoint via Iroh address lookup 55 + 56 + 2. Iroh Tunnel Layer 57 + 58 + - Manages Iroh connections to peer funnels 59 + - Provides NAT traversal via relay/hole punching 60 + - Encrypts all traffic with Iroh's QUIC+TLS 61 + - Maps Flower node IDs to Iroh endpoint IDs 62 + 63 + 3. Protocol Translation 64 + 65 + - Converts Flower protobuf messages to Iroh streams 66 + - Handles streaming request/response patterns 67 + - Maintains message ordering and delivery confirmation 68 + 69 + 4. Address Lookup 70 + 71 + - Publishes Flower node info via Iroh's address lookup 72 + - Resolves Flower node IDs to Iroh endpoints 73 + - Supports DNS and pkarr integration 74 + API Design 75 + // Library interface 76 + pub struct Funnel { 77 + inner: Arc<FunnelInner>, 78 + } 79 + impl Funnel { 80 + pub async fn new(config: FunnelConfig) -> Result<Self, FunnelError>; 81 + 82 + pub async fn serve(&self) -> Result<(), FunnelError>; 83 + 84 + pub async fn connect_to_peer(&self, node_id: &NodeId) -> Result<PeerConnection, ConnectError>; 85 + 86 + pub fn endpoint_id(&self) -> EndpointId; 87 + 88 + } 89 + pub struct FunnelConfig { 90 + pub alpn: Vec<u8>, 91 + pub relay_mode: RelayMode, 92 + pub bind_addr: SocketAddr, 93 + pub flower_grpc_addr: SocketAddr, 94 + } 95 + Communication Flow 96 + Flower Client → gRPC → Funnel (Iroh endpoint) → Iroh tunnel → Peer Funnel → gRPC → Flower Server 97 + Cross-Platform Support 98 + 99 + - macOS: Full support via Iroh's QUIC transport 100 + - Windows: Full support via Iroh's QUIC transport 101 + - Linux: Full support via Iroh's QUIC transport 102 + - Platform detection via cfg(target_os) conditionals 103 + Dependencies 104 + [dependencies] 105 + iroh = "0.98" 106 + tokio = { version = "1", features = ["full"] } 107 + serde = { version = "1", features = ["derive"] } 108 + tonic = "0.12" # gRPC server 109 + prost = "0.12" 110 + Next Steps 111 + 112 + 1. Create the Rust crate structure 113 + 2. Implement the gRPC server (Flower protocol terminator) 114 + 3. Implement Iroh tunnel management 115 + 4. Add protocol translation layer 116 + 5. Add address lookup integration 117 + 6. Test with existing Flower deployments 118 + 119 + ---
+31
flower-iroh-funnel/Cargo.toml
··· 1 + [package] 2 + name = "flower-iroh-funnel" 3 + version = "0.1.0" 4 + edition = "2021" 5 + authors = ["COVEN Team"] 6 + description = "Flower-Iroh funnel for routing Flower gRPC through Iroh tunnels" 7 + 8 + build = "build.rs" 9 + 10 + [dependencies] 11 + iroh = "0.98" 12 + tokio = { version = "1", features = ["full"] } 13 + serde = { version = "1", features = ["derive"] } 14 + tonic = "0.12" 15 + prost = "0.12" 16 + bytes = "1" 17 + tracing = "0.1" 18 + tracing-subscriber = { version = "0.2", features = ["env-filter"] } 19 + anyhow = "1" 20 + thiserror = "1" 21 + futures = "0.3" 22 + 23 + [build-dependencies] 24 + tonic-build = "0.12" 25 + 26 + [dev-dependencies] 27 + tokio-test = "0.4" 28 + 29 + [[example]] 30 + name = "simple_proxy" 31 + path = "examples/simple_proxy.rs"
+7
flower-iroh-funnel/build.rs
··· 1 + fn main() -> Result<(), Box<dyn std::error::Error>> { 2 + tonic_build::configure() 3 + .build_attrib(true) 4 + .out_dir("src/generated") 5 + .compile(&["proto/fleet.proto", "proto/serverappio.proto", "proto/clientappio.proto", "proto/control.proto"], &["proto/"])?; 6 + Ok(()) 7 + }
+23
flower-iroh-funnel/examples/simple_proxy.rs
··· 1 + use anyhow::Result; 2 + use flower_iroh_funnel::{Funnel, FunnelConfig, RelayMode}; 3 + use std::net::SocketAddr; 4 + 5 + #[tokio::main] 6 + async fn main() -> Result<(), Box<dyn std::error::Error>> { 7 + tracing_subscriber::fmt::init(); 8 + 9 + let config = FunnelConfig { 10 + alpn: b"flower/iroh/funnel/v1".to_vec(), 11 + relay_mode: RelayMode::Default, 12 + bind_addr: "0.0.0.0:0".parse()?, 13 + flower_grpc_addr: "0.0.0.0:9092".parse()?, 14 + }; 15 + 16 + let funnel = Funnel::new(config)?; 17 + let endpoint_id = funnel.endpoint_id(); 18 + 19 + tracing::info!("Funnel started with endpoint ID: {:?}", endpoint_id); 20 + tracing::info!("Share this ID with peers to establish tunnel connections"); 21 + 22 + Ok(()) 23 + }
+69
flower-iroh-funnel/proto/clientappio.proto
··· 1 + syntax = "proto3"; 2 + 3 + package clientappio; 4 + 5 + service ClientAppIo { 6 + rpc PullAppInputs(PullAppInputsRequest) returns (PullAppInputsResponse) {} 7 + rpc PushAppOutputs(PushAppOutputsRequest) returns (PushAppOutputsResponse) {} 8 + rpc GetStatus(GetStatusRequest) returns (GetStatusResponse) {} 9 + } 10 + 11 + message PullAppInputsRequest { 12 + int64 timeout = 1; 13 + string reply_to = 2; 14 + } 15 + 16 + message PullAppInputsResponse { 17 + repeated AppMessage messages = 1; 18 + } 19 + 20 + message PushAppOutputsRequest { 21 + repeated AppMessage messages = 1; 22 + } 23 + 24 + message PushAppOutputsResponse { 25 + repeated string message_ids = 1; 26 + } 27 + 28 + message GetStatusRequest {} 29 + 30 + message GetStatusResponse { 31 + string status = 1; 32 + } 33 + 34 + message AppMessage { 35 + Metadata metadata = 1; 36 + RecordDict content = 2; 37 + Error error = 3; 38 + } 39 + 40 + message Metadata { 41 + int64 run_id = 1; 42 + string message_id = 2; 43 + int64 src_node_id = 3; 44 + int64 dst_node_id = 4; 45 + string reply_to_message_id = 5; 46 + string group_id = 6; 47 + double ttl = 7; 48 + string message_type = 8; 49 + double created_at = 9; 50 + } 51 + 52 + message RecordDict { 53 + map<string, Value> values = 1; 54 + } 55 + 56 + message Value { 57 + oneof value { 58 + bytes bytes = 1; 59 + string string = 2; 60 + int64 integer = 3; 61 + double floating = 4; 62 + } 63 + } 64 + 65 + message Error { 66 + int64 code = 1; 67 + string message = 2; 68 + string traceback = 3; 69 + }
+40
flower-iroh-funnel/proto/common.proto
··· 1 + syntax = "proto3"; 2 + 3 + package common; 4 + 5 + message Message { 6 + Metadata metadata = 1; 7 + RecordDict content = 2; 8 + Error error = 3; 9 + } 10 + 11 + message Metadata { 12 + int64 run_id = 1; 13 + string message_id = 2; 14 + int64 src_node_id = 3; 15 + int64 dst_node_id = 4; 16 + string reply_to_message_id = 5; 17 + string group_id = 6; 18 + double ttl = 7; 19 + string message_type = 8; 20 + double created_at = 9; 21 + } 22 + 23 + message RecordDict { 24 + map<string, Value> values = 1; 25 + } 26 + 27 + message Value { 28 + oneof value { 29 + bytes bytes = 1; 30 + string string = 2; 31 + int64 integer = 3; 32 + double floating = 4; 33 + } 34 + } 35 + 36 + message Error { 37 + int64 code = 1; 38 + string message = 2; 39 + string traceback = 3; 40 + }
+83
flower-iroh-funnel/proto/control.proto
··· 1 + syntax = "proto3"; 2 + 3 + package control; 4 + 5 + service Control { 6 + rpc StartRun(StartRunRequest) returns (StartRunResponse) {} 7 + rpc StopRun(StopRunRequest) returns (StopRunResponse) {} 8 + rpc GetRunStatus(GetRunStatusRequest) returns (GetRunStatusResponse) {} 9 + rpc ListNodes(ListNodesRequest) returns (ListNodesResponse) {} 10 + rpc CreateFederation(CreateFederationRequest) returns (CreateFederationResponse) {} 11 + rpc JoinFederation(JoinFederationRequest) returns (JoinFederationResponse) {} 12 + } 13 + 14 + message StartRunRequest { 15 + string federation_id = 1; 16 + int64 num_nodes = 2; 17 + map<string, string> config = 3; 18 + } 19 + 20 + message StartRunResponse { 21 + int64 run_id = 1; 22 + bool success = 2; 23 + string message = 3; 24 + } 25 + 26 + message StopRunRequest { 27 + int64 run_id = 1; 28 + } 29 + 30 + message StopRunResponse { 31 + bool success = 1; 32 + string message = 2; 33 + } 34 + 35 + message GetRunStatusRequest { 36 + int64 run_id = 1; 37 + } 38 + 39 + message GetRunStatusResponse { 40 + RunStatus status = 1; 41 + } 42 + 43 + message ListNodesRequest { 44 + int64 run_id = 1; 45 + } 46 + 47 + message ListNodesResponse { 48 + repeated NodeInfo nodes = 1; 49 + } 50 + 51 + message CreateFederationRequest { 52 + string federation_id = 1; 53 + string superlink_address = 2; 54 + } 55 + 56 + message CreateFederationResponse { 57 + string ticket = 1; 58 + bool success = 2; 59 + } 60 + 61 + message JoinFederationRequest { 62 + string ticket = 1; 63 + } 64 + 65 + message JoinFederationResponse { 66 + bool success = 1; 67 + string federation_id = 2; 68 + } 69 + 70 + message RunStatus { 71 + int64 run_id = 1; 72 + string state = 2; 73 + int64 num_nodes = 3; 74 + repeated int64 active_nodes = 4; 75 + double started_at = 5; 76 + } 77 + 78 + message NodeInfo { 79 + int64 node_id = 1; 80 + string node_type = 2; 81 + string address = 3; 82 + bool is_active = 4; 83 + }
+156
flower-iroh-funnel/proto/fleet.proto
··· 1 + syntax = "proto3"; 2 + 3 + package fleet; 4 + 5 + service Fleet { 6 + rpc RegisterNode(RegisterNodeFleetRequest) returns (RegisterNodeFleetResponse) {} 7 + rpc ActivateNode(ActivateNodeRequest) returns (ActivateNodeResponse) {} 8 + rpc PullMessages(PullMessagesRequest) returns (PullMessagesResponse) {} 9 + rpc PushMessages(PushMessagesRequest) returns (PushMessagesResponse) {} 10 + rpc GetRun(GetRunRequest) returns (GetRunResponse) {} 11 + rpc GetFab(GetFabRequest) returns (GetFabResponse) {} 12 + rpc StopNode(StopNodeRequest) returns (StopNodeResponse) {} 13 + rpc PubSub(PubSubRequest) returns (PubSubResponse) {} 14 + rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {} 15 + } 16 + 17 + message RegisterNodeFleetRequest { 18 + NodeConfig config = 1; 19 + int64 node_id = 2; 20 + string node_type = 3; 21 + string pub_key = 4; 22 + } 23 + 24 + message RegisterNodeFleetResponse { 25 + int64 node_id = 1; 26 + bool success = 2; 27 + string message = 3; 28 + } 29 + 30 + message ActivateNodeRequest { 31 + int64 node_id = 1; 32 + string session_id = 2; 33 + } 34 + 35 + message ActivateNodeResponse { 36 + bool success = 1; 37 + string message = 2; 38 + } 39 + 40 + message PullMessagesRequest { 41 + int64 node_id = 1; 42 + int64 run_id = 2; 43 + repeated string groups = 3; 44 + string reply_to = 4; 45 + int64 timeout = 5; 46 + } 47 + 48 + message PullMessagesResponse { 49 + repeated Message messages = 1; 50 + } 51 + 52 + message PushMessagesRequest { 53 + repeated Message messages = 1; 54 + } 55 + 56 + message PushMessagesResponse { 57 + repeated string message_ids = 1; 58 + } 59 + 60 + message GetRunRequest { 61 + int64 run_id = 1; 62 + } 63 + 64 + message GetRunResponse { 65 + Run run = 1; 66 + } 67 + 68 + message GetFabRequest { 69 + int64 node_id = 1; 70 + int64 run_id = 2; 71 + } 72 + 73 + message GetFabResponse { 74 + Fab fab = 1; 75 + } 76 + 77 + message StopNodeRequest { 78 + int64 node_id = 1; 79 + int64 run_id = 2; 80 + } 81 + 82 + message StopNodeResponse { 83 + bool success = 1; 84 + string message = 2; 85 + } 86 + 87 + message PubSubRequest { 88 + string topic = 1; 89 + Message message = 2; 90 + } 91 + 92 + message PubSubResponse { 93 + bool success = 1; 94 + } 95 + 96 + message HeartbeatRequest { 97 + int64 node_id = 1; 98 + } 99 + 100 + message HeartbeatResponse { 101 + bool alive = 1; 102 + } 103 + 104 + message Message { 105 + Metadata metadata = 1; 106 + RecordDict content = 2; 107 + Error error = 3; 108 + } 109 + 110 + message Metadata { 111 + int64 run_id = 1; 112 + string message_id = 2; 113 + int64 src_node_id = 3; 114 + int64 dst_node_id = 4; 115 + string reply_to_message_id = 5; 116 + string group_id = 6; 117 + double ttl = 7; 118 + string message_type = 8; 119 + double created_at = 9; 120 + } 121 + 122 + message RecordDict { 123 + map<string, Value> values = 1; 124 + } 125 + 126 + message Value { 127 + oneof value { 128 + bytes bytes = 1; 129 + string string = 2; 130 + int64 integer = 3; 131 + double floating = 4; 132 + } 133 + } 134 + 135 + message Error { 136 + int64 code = 1; 137 + string message = 2; 138 + string traceback = 3; 139 + } 140 + 141 + message NodeConfig { 142 + string address = 1; 143 + int64 node_id = 2; 144 + string pub_key = 3; 145 + } 146 + 147 + message Run { 148 + int64 run_id = 1; 149 + string federation_id = 2; 150 + int64 num_nodes = 3; 151 + string state = 4; 152 + } 153 + 154 + message Fab { 155 + bytes data = 1; 156 + }
+87
flower-iroh-funnel/proto/serverappio.proto
··· 1 + syntax = "proto3"; 2 + 3 + package serverappio; 4 + 5 + service ServerAppIo { 6 + rpc PushAppInputs(PushAppInputsRequest) returns (PushAppInputsResponse) {} 7 + rpc PullAppOutputs(PullAppOutputsRequest) returns (PullAppOutputsResponse) {} 8 + rpc GetNodeIds(GetNodeIdsRequest) returns (GetNodeIdsResponse) {} 9 + rpc GetRunConfig(GetRunConfigRequest) returns (GetRunConfigResponse) {} 10 + } 11 + 12 + message PushAppInputsRequest { 13 + repeated AppMessage messages = 1; 14 + } 15 + 16 + message PushAppInputsResponse { 17 + repeated string message_ids = 1; 18 + } 19 + 20 + message PullAppOutputsRequest { 21 + int64 timeout = 1; 22 + int64 run_id = 2; 23 + } 24 + 25 + message PullAppOutputsResponse { 26 + repeated AppMessage messages = 1; 27 + } 28 + 29 + message GetNodeIdsRequest { 30 + int64 run_id = 1; 31 + } 32 + 33 + message GetNodeIdsResponse { 34 + repeated int64 node_ids = 1; 35 + } 36 + 37 + message GetRunConfigRequest { 38 + int64 run_id = 1; 39 + } 40 + 41 + message GetRunConfigResponse { 42 + RunConfig config = 1; 43 + } 44 + 45 + message AppMessage { 46 + Metadata metadata = 1; 47 + RecordDict content = 2; 48 + Error error = 3; 49 + } 50 + 51 + message Metadata { 52 + int64 run_id = 1; 53 + string message_id = 2; 54 + int64 src_node_id = 3; 55 + int64 dst_node_id = 4; 56 + string reply_to_message_id = 5; 57 + string group_id = 6; 58 + double ttl = 7; 59 + string message_type = 8; 60 + double created_at = 9; 61 + } 62 + 63 + message RecordDict { 64 + map<string, Value> values = 1; 65 + } 66 + 67 + message Value { 68 + oneof value { 69 + bytes bytes = 1; 70 + string string = 2; 71 + int64 integer = 3; 72 + double floating = 4; 73 + } 74 + } 75 + 76 + message Error { 77 + int64 code = 1; 78 + string message = 2; 79 + string traceback = 3; 80 + } 81 + 82 + message RunConfig { 83 + int64 run_id = 1; 84 + string federation_id = 2; 85 + int64 num_nodes = 3; 86 + string state = 4; 87 + }
+30
flower-iroh-funnel/src/address_lookup.rs
··· 1 + use anyhow::Result; 2 + use std::collections::HashMap; 3 + use crate::FunnelError; 4 + use iroh::EndpointId; 5 + 6 + pub struct AddressLookup { 7 + table: HashMap<u64, EndpointId>, 8 + } 9 + 10 + impl AddressLookup { 11 + pub fn new() -> Self { 12 + Self { 13 + table: HashMap::new(), 14 + } 15 + } 16 + 17 + pub fn register(&mut self, node_id: u64, endpoint_id: EndpointId) { 18 + self.table.insert(node_id, endpoint_id); 19 + } 20 + 21 + pub fn lookup(&self, node_id: u64) -> Result<EndpointId, FunnelError> { 22 + self.table.get(&node_id) 23 + .ok_or_else(|| FunnelError::AddressLookup("Node not found".into())) 24 + .map(|id| *id) 25 + } 26 + 27 + pub fn remove(&mut self, node_id: u64) { 28 + self.table.remove(&node_id); 29 + } 30 + }
+58
flower-iroh-funnel/src/grpc_terminator.rs
··· 1 + use std::net::SocketAddr; 2 + use anyhow::Result; 3 + use crate::FunnelError; 4 + use tonic::{Server, Request, Response, Status}; 5 + use crate::proxy::Proxy; 6 + use crate::proxy::ProxyMessage; 7 + use std::sync::Arc; 8 + 9 + pub struct GrpcTerminator { 10 + addr: SocketAddr, 11 + proxy: Arc<Proxy>, 12 + } 13 + 14 + impl GrpcTerminator { 15 + pub fn new(addr: SocketAddr, proxy: Arc<Proxy>) -> Self { 16 + Self { addr, proxy } 17 + } 18 + 19 + pub async fn serve(&mut self) -> Result<(), FunnelError> { 20 + tracing::info!("gRPC terminator listening on {}", self.addr); 21 + Ok(()) 22 + } 23 + 24 + pub fn proxy(&self) -> Arc<Proxy> { 25 + self.proxy.clone() 26 + } 27 + } 28 + 29 + pub mod fleet_terminator { 30 + use super::*; 31 + use crate::FunnelError; 32 + use crate::proxy::{Proxy, ProxyMessage}; 33 + use tonic::{Request, Response, Status}; 34 + 35 + pub struct FleetTerminator; 36 + 37 + impl FleetTerminator { 38 + pub fn new() -> Self { 39 + Self {} 40 + } 41 + 42 + pub async fn register_node(&self, req: Request<()>) -> Result<Response<()>, Status> { 43 + Ok(Response::new(())) 44 + } 45 + 46 + pub async fn activate_node(&self, req: Request<()>) -> Result<Response<()>, Status> { 47 + Ok(Response::new(())) 48 + } 49 + 50 + pub async fn pull_messages(&self, req: Request<()>) -> Result<Response<()>, Status> { 51 + Ok(Response::new(())) 52 + } 53 + 54 + pub async fn push_messages(&self, req: Request<()>) -> Result<Response<()>, Status> { 55 + Ok(Response::new(())) 56 + } 57 + } 58 + }
+101
flower-iroh-funnel/src/lib.rs
··· 1 + use std::sync::Arc; 2 + use std::net::SocketAddr; 3 + use anyhow::{Result, Context}; 4 + use thiserror::Error; 5 + use rand::RngCore; 6 + 7 + #[derive(Debug, Error)] 8 + pub enum FunnelError { 9 + #[error("Iroh error: {0}")] 10 + Iroh(#[from] iroh::Error), 11 + #[error("gRPC error: {0}")] 12 + Grpc(String), 13 + #[error("protocol error: {0}")] 14 + Protocol(String), 15 + #[error("address lookup error: {0}")] 16 + AddressLookup(String), 17 + #[error("io error: {0}")] 18 + Io(#[from] std::io::Error), 19 + } 20 + 21 + #[derive(Clone)] 22 + pub struct FunnelConfig { 23 + pub alpn: Vec<u8>, 24 + pub relay_mode: RelayMode, 25 + pub bind_addr: SocketAddr, 26 + pub flower_grpc_addr: SocketAddr, 27 + } 28 + 29 + #[derive(Clone, Debug)] 30 + pub enum RelayMode { 31 + Default, 32 + None, 33 + Custom(Vec<String>), 34 + } 35 + 36 + pub struct Funnel { 37 + inner: Arc<FunnelInner>, 38 + } 39 + 40 + struct FunnelInner { 41 + config: FunnelConfig, 42 + endpoint: iroh::Endpoint, 43 + endpoint_id: iroh::EndpointId, 44 + } 45 + 46 + impl Funnel { 47 + pub async fn new(config: FunnelConfig) -> Result<Self, FunnelError> { 48 + let secret_key = iroh::SecretKey::generate(&mut rand::rng()); 49 + let endpoint = iroh::Endpoint::builder() 50 + .secret_key(secret_key) 51 + .relay_mode(map_relay_mode(&config.relay_mode)) 52 + .bind(config.bind_addr) 53 + .await 54 + .context("Failed to create Iroh endpoint")?; 55 + 56 + let endpoint_id = endpoint.id(); 57 + 58 + Ok(Self { 59 + inner: Arc::new(FunnelInner { 60 + config, 61 + endpoint, 62 + endpoint_id, 63 + }), 64 + }) 65 + } 66 + 67 + pub async fn serve(&self) -> Result<(), FunnelError> { 68 + let addr = self.inner.config.flower_grpc_addr; 69 + tracing::info!("Funnel gRPC server listening on {}", addr); 70 + Ok(()) 71 + } 72 + 73 + pub async fn connect_to_peer(&self, node_id: &iroh::EndpointId) -> Result<proxy::PeerConnection, FunnelError> { 74 + let addr = iroh::Addr::from(*node_id); 75 + let connection = self.inner.endpoint.connect(addr).await?; 76 + let stream = connection.open_stream(&self.inner.config.alpn).await?; 77 + 78 + let (tx, rx) = tokio::sync::mpsc::channel(100); 79 + Ok(proxy::PeerConnection::new(*node_id, tx, rx)) 80 + } 81 + 82 + pub fn endpoint_id(&self) -> iroh::EndpointId { 83 + self.inner.endpoint_id 84 + } 85 + } 86 + 87 + fn map_relay_mode(mode: &RelayMode) -> iroh::RelayMode { 88 + match mode { 89 + RelayMode::Default => iroh::RelayMode::Default, 90 + RelayMode::None => iroh::RelayMode::None, 91 + RelayMode::Custom(urls) => iroh::RelayMode::Custom(urls.iter().map(|s| s).collect()), 92 + } 93 + } 94 + 95 + mod proxy; 96 + mod tunnel; 97 + mod protocol; 98 + mod address_lookup; 99 + 100 + pub use self::{proxy::Proxy, proxy::PeerConnection, proxy::ProxyMessage}; 101 + pub use self::{address_lookup::AddressLookup, protocol::ProtocolTranslator};
+61
flower-iroh-funnel/src/protocol.rs
··· 1 + use anyhow::Result; 2 + use bytes::{Bytes, BytesMut, Buf, BufMut}; 3 + use crate::FunnelError; 4 + 5 + pub struct ProtocolTranslator; 6 + 7 + impl ProtocolTranslator { 8 + pub fn new() -> Self { 9 + Self {} 10 + } 11 + 12 + pub fn grpc_to_iroh(&self, grpc_msg: &[u8]) -> Result<Vec<u8>, FunnelError> { 13 + Ok(grpc_msg.to_vec()) 14 + } 15 + 16 + pub fn iroh_to_grpc(&self, iroh_msg: &[u8]) -> Result<Vec<u8>, FunnelError> { 17 + Ok(iroh_msg.to_vec()) 18 + } 19 + 20 + pub fn frame_message(&self, msg: &[u8]) -> Bytes { 21 + let mut buf = BytesMut::with_capacity(msg.len() + 10); 22 + write_varint(&mut buf, msg.len() as u64); 23 + buf.extend_from_slice(msg); 24 + buf.freeze() 25 + } 26 + 27 + pub fn parse_framed_message(&self, data: &[u8]) -> Result<Bytes, FunnelError> { 28 + let mut buf = BytesMut::from(data); 29 + let len = read_varint(&mut buf).map_err(|_| FunnelError::Protocol("Invalid frame".into()))?; 30 + if buf.remaining() < len as usize { 31 + return Err(FunnelError::Protocol("Incomplete message".into())); 32 + } 33 + Ok(buf.slice(0..len as usize)) 34 + } 35 + } 36 + 37 + fn write_varint(buf: &mut BytesMut, mut val: u64) { 38 + loop { 39 + let byte = (val & 0x7f) as u8; 40 + val >>= 7; 41 + if val == 0 { 42 + buf.push(byte); 43 + break; 44 + } 45 + buf.push(byte | 0x80); 46 + } 47 + } 48 + 49 + fn read_varint(buf: &mut BytesMut) -> Result<u64, std::io::Error> { 50 + let mut val = 0u64; 51 + let mut shift = 0; 52 + loop { 53 + let byte = buf.read_u8().map_err(|_| std::io::Error::from(std::io::ErrorKind::UnexpectedEof))?; 54 + val |= ((byte & 0x7f) as u64) << shift; 55 + if byte & 0x80 == 0 { 56 + break; 57 + } 58 + shift += 7; 59 + } 60 + Ok(val) 61 + }
+47
flower-iroh-funnel/src/proxy.rs
··· 1 + use std::collections::HashMap; 2 + use std::sync::Arc; 3 + use tokio::sync::mpsc; 4 + use iroh::EndpointId; 5 + use crate::FunnelError; 6 + 7 + pub struct Proxy { 8 + connections: HashMap<EndpointId, PeerConnection>, 9 + } 10 + 11 + pub struct PeerConnection { 12 + pub peer_id: EndpointId, 13 + pub send: mpsc::Sender<ProxyMessage>, 14 + pub recv: mpsc::Receiver<ProxyMessage>, 15 + } 16 + 17 + pub struct ProxyMessage { 18 + pub content: Vec<u8>, 19 + pub src_node_id: u64, 20 + pub dst_node_id: u64, 21 + } 22 + 23 + impl PeerConnection { 24 + pub fn new(peer_id: EndpointId, send: mpsc::Sender<ProxyMessage>, recv: mpsc::Receiver<ProxyMessage>) -> Self { 25 + Self { peer_id, send, recv } 26 + } 27 + } 28 + 29 + impl Proxy { 30 + pub fn new() -> Self { 31 + Self { 32 + connections: HashMap::new(), 33 + } 34 + } 35 + 36 + pub fn add_peer(&mut self, peer_id: EndpointId, conn: PeerConnection) { 37 + self.connections.insert(peer_id, conn); 38 + } 39 + 40 + pub fn remove_peer(&mut self, peer_id: &EndpointId) { 41 + self.connections.remove(peer_id); 42 + } 43 + 44 + pub fn get_peer(&self, peer_id: &EndpointId) -> Option<&PeerConnection> { 45 + self.connections.get(peer_id) 46 + } 47 + }
+111
flower-iroh-funnel/src/tunnel.rs
··· 1 + use anyhow::Result; 2 + use std::pin::Pin; 3 + use std::future::Future; 4 + use crate::proxy::Proxy; 5 + use crate::proxy::PeerConnection; 6 + use crate::proxy::ProxyMessage; 7 + use crate::FunnelError; 8 + use iroh::Endpoint; 9 + use iroh::EndpointId; 10 + use iroh::Addr; 11 + use iroh::Connection; 12 + use iroh::Stream; 13 + 14 + pub const FLOWER_ALPN: &[u8] = b"flower/iroh/funnel/v1"; 15 + 16 + pub struct TunnelManager { 17 + endpoint: Endpoint, 18 + proxy: std::sync::Arc<Proxy>, 19 + } 20 + 21 + impl TunnelManager { 22 + pub fn new(endpoint: Endpoint, proxy: std::sync::Arc<Proxy>) -> Self { 23 + Self { endpoint, proxy } 24 + } 25 + 26 + pub async fn start(&mut self) -> Result<(), FunnelError> { 27 + let proxy = self.proxy.clone(); 28 + let endpoint_clone = self.endpoint.clone(); 29 + let alpn = FLOWER_ALPN.to_vec(); 30 + 31 + tokio::spawn(accept_loop(endpoint_clone, proxy, alpn)); 32 + 33 + tracing::info!("Tunnel manager started with endpoint ID: {:?}", self.endpoint.id()); 34 + Ok(()) 35 + } 36 + 37 + pub async fn connect_to_peer(&mut self, peer_id: &EndpointId) -> Result<PeerConnection, FunnelError> { 38 + let addr = Addr::from(*peer_id); 39 + let connection = self.endpoint.connect(addr).await?; 40 + let stream = connection.open_stream(FLOWER_ALPN).await?; 41 + 42 + let (tx, rx) = tokio::sync::mpsc::channel(100); 43 + let (tx_out, rx_out) = tokio::sync::mpsc::channel(100); 44 + 45 + let peer_id_val = *peer_id; 46 + tokio::spawn(handle_outgoing(rx, stream, tx_out)); 47 + 48 + Ok(PeerConnection::new(peer_id_val, tx, rx_out)) 49 + } 50 + } 51 + 52 + async fn accept_loop(endpoint: Endpoint, proxy: std::sync::Arc<Proxy>, _alpn: Vec<u8>) { 53 + loop { 54 + match endpoint.accept().await { 55 + Ok(Some((alpn_match, stream))) => { 56 + if alpn_match == FLOWER_ALPN { 57 + let proxy_clone = proxy.clone(); 58 + tokio::spawn(handle_incoming(stream, proxy_clone)); 59 + } 60 + } 61 + Ok(None) => break, 62 + Err(e) => { 63 + tracing::error!("Accept error: {}", e); 64 + break; 65 + } 66 + } 67 + } 68 + } 69 + 70 + async fn handle_incoming(stream: Stream, proxy: std::sync::Arc<Proxy>) { 71 + let mut stream = stream; 72 + loop { 73 + let msg = match read_message(&mut stream).await { 74 + Ok(m) => m, 75 + Err(_) => break, 76 + }; 77 + 78 + let _proxy_msg = ProxyMessage { 79 + content: msg, 80 + src_node_id: 0, 81 + dst_node_id: 0, 82 + }; 83 + } 84 + } 85 + 86 + async fn handle_outgoing( 87 + mut rx: tokio::sync::mpsc::Receiver<ProxyMessage>, 88 + mut stream: Stream, 89 + mut _tx_out: tokio::sync::mpsc::Sender<ProxyMessage>, 90 + ) { 91 + while let Some(msg) = rx.next().await { 92 + if let Err(e) = write_message(&mut stream, &msg.content).await { 93 + tracing::error!("Write error: {}", e); 94 + break; 95 + } 96 + } 97 + } 98 + 99 + async fn read_message(stream: &mut Stream) -> Result<Vec<u8>, std::io::Error> { 100 + let len = stream.read_uvarint().await?; 101 + let mut buf = vec![0u8; len as usize]; 102 + stream.read_exact(&mut buf).await?; 103 + Ok(buf) 104 + } 105 + 106 + async fn write_message(stream: &mut Stream, data: &[u8]) -> Result<(), std::io::Error> { 107 + stream.write_uvarint(data.len() as u64).await?; 108 + stream.write_all(data).await?; 109 + stream.flush().await?; 110 + Ok(()) 111 + }
+43
flower-iroh-funnel/tests/integration.rs
··· 1 + use anyhow::Result; 2 + use flower_iroh_funnel::{Funnel, FunnelConfig, RelayMode}; 3 + use std::net::SocketAddr; 4 + 5 + #[tokio::test] 6 + async fn test_funnel_creation() -> Result<(), Box<dyn std::error::Error>> { 7 + let config = FunnelConfig { 8 + alpn: b"flower/iroh/funnel/v1".to_vec(), 9 + relay_mode: RelayMode::Default, 10 + bind_addr: SocketAddr::new(0, 0), 11 + flower_grpc_addr: SocketAddr::new(0, 0), 12 + }; 13 + 14 + let funnel = Funnel::new(config)?; 15 + assert_ne!(funnel.endpoint_id(), iroh::EndpointId::new()); 16 + 17 + Ok(()) 18 + } 19 + 20 + #[tokio::test] 21 + async fn test_peer_connection() -> Result<(), Box<dyn std::error::Error>> { 22 + let config1 = FunnelConfig { 23 + alpn: b"flower/iroh/funnel/v1".to_vec(), 24 + relay_mode: RelayMode::Default, 25 + bind_addr: SocketAddr::new(0, 0), 26 + flower_grpc_addr: SocketAddr::new(0, 0), 27 + }; 28 + 29 + let config2 = FunnelConfig { 30 + alpn: b"flower/iroh/funnel/v1".to_vec(), 31 + relay_mode: RelayMode::Default, 32 + bind_addr: SocketAddr::new(0, 0), 33 + flower_grpc_addr: SocketAddr::new(0, 0), 34 + }; 35 + 36 + let funnel1 = Funnel::new(config1)?; 37 + let funnel2 = Funnel::new(config2)?; 38 + 39 + let peer_id = funnel2.endpoint_id(); 40 + let _connection = funnel1.connect_to_peer(&peer_id)?; 41 + 42 + Ok(()) 43 + }