P2P support library for the beaver compute environment
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

messaging tests

webbeef 88398c62 3a064c53

+167 -43
+2 -2
Cargo.lock
··· 268 268 269 269 [[package]] 270 270 name = "bumpalo" 271 - version = "3.19.1" 271 + version = "3.20.1" 272 272 source = "registry+https://github.com/rust-lang/crates.io-index" 273 - checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" 273 + checksum = "5c6f81257d10a0f602a294ae4182251151ff97dbb504ef9afcdda4a64b24d9b4" 274 274 275 275 [[package]] 276 276 name = "byteorder"
+114 -18
src/lib.rs
··· 10 10 use std::sync::Arc; 11 11 use tokio::task::AbortHandle; 12 12 13 + use crate::packet::BasePacket; 13 14 use iroh::address_lookup::UserData; 14 15 use iroh::address_lookup::mdns::MdnsAddressLookup; 15 16 use iroh::endpoint::{ClosedStream, ConnectError, ConnectionError, WriteError}; 16 17 use iroh::{Endpoint, EndpointId, RelayMode, protocol::Router}; 17 18 use log::{error, info}; 18 19 use n0_future::StreamExt; 19 - use parking_lot::Mutex; 20 20 use std::sync::mpsc::Sender; 21 21 use thiserror::Error; 22 + use tokio::sync::Mutex; 22 23 23 - use crate::pairing_hook::PAIRING_ALPN; 24 + use crate::pairing_hook::{MESSAGE_ALPN, PAIRING_ALPN}; 25 + pub use crate::state::EndpointStatus; 24 26 pub use crate::state::PeerEvent; 25 - use crate::state::{EndpointDescription, EndpointStatus, PairingCommand, SharedState, State}; 27 + use crate::state::{EndpointDescription, PairingCommand, SharedState, State}; 26 28 27 29 #[derive(Debug, Error)] 28 30 pub enum PairingError { ··· 48 50 Postcard(#[from] postcard::Error), 49 51 } 50 52 53 + #[derive(Debug, Error)] 54 + pub enum MessageError { 55 + #[error("Invalid state for messaging")] 56 + InvalidState, 57 + #[error("Unknown remote endpoint")] 58 + UnknownRemote, 59 + #[error("Peer is not paired")] 60 + Unpaired, 61 + #[error("Packet sending error")] 62 + Packet(#[from] packet::PacketError), 63 + #[error("Failed to connect")] 64 + Connect(#[from] ConnectError), 65 + #[error("Connection error")] 66 + Connection(#[from] ConnectionError), 67 + } 68 + 51 69 #[derive(Clone)] 52 70 pub struct PairingManagerInner { 53 71 state: SharedState, ··· 73 91 .await 74 92 .unwrap(); 75 93 76 - info!("Endpoint {name}, {}", endpoint.id()); 94 + println!("Endpoint {name}, {}", endpoint.id()); 77 95 78 96 let mdns = MdnsAddressLookup::builder().build(endpoint.id()).unwrap(); 79 97 endpoint.address_lookup().add(mdns.clone()); ··· 83 101 let mut events = mdns.subscribe().await; 84 102 while let Some(event) = events.next().await { 85 103 // Update the state. 86 - disco_state.lock().on_discovery(&event); 104 + disco_state.lock().await.on_discovery(&event); 87 105 } 88 106 }); 89 107 ··· 92 110 PAIRING_ALPN, 93 111 pairing_protocol::PairingProtocol::new(Arc::clone(&state)), 94 112 ) 113 + .accept( 114 + MESSAGE_ALPN, 115 + message_protocol::MessageProtocol::new(Arc::clone(&state)), 116 + ) 95 117 .spawn(); 96 118 97 119 let inner = PairingManagerInner { ··· 111 133 }; 112 134 113 135 let addr = { 114 - let state = inner.state.lock(); 136 + let state = inner.state.lock().await; 115 137 let Some(remote) = state.by_id(id) else { 116 138 error!("Can't request pairing from unknown remote {id}"); 117 139 return Err(PairingError::UnknownRemote); ··· 121 143 122 144 // Don't request pairing twice with the same endpoint. 123 145 { 124 - let mut state = inner.state.lock(); 146 + let mut state = inner.state.lock().await; 125 147 if !state.has_requested(id) { 126 148 state.set_pairing_requested(id); 127 149 } else { ··· 163 185 .await 164 186 .expect("Failed to send Ack"); 165 187 166 - let mut state = inner.state.lock(); 188 + let mut state = inner.state.lock().await; 167 189 state.remove_pairing_requested(id); 168 190 sender.finish()?; 169 191 ··· 190 212 return Err(PairingError::InvalidState); 191 213 }; 192 214 193 - if inner.state.lock().by_id(from).is_none() { 215 + if inner.state.lock().await.by_id(from).is_none() { 194 216 error!("Can't send pairing response to unknown remote {from}"); 195 217 return Err(PairingError::UnknownRemote); 196 218 }; 197 219 198 220 let (sender, mut ack_receiver) = { 199 - let mut state = inner.state.lock(); 221 + let mut state = inner.state.lock().await; 200 222 let Some((sender, ack_receiver)) = state.take_pairing_responder(from) else { 201 223 error!("No responder for {from}"); 202 224 return Err(PairingError::UnknownRemote); ··· 226 248 return Err(PairingError::InvalidState); 227 249 }; 228 250 229 - self.send_pairing_response(from, PairingCommand::Accept) 230 - .await 231 - .map(|_| { 232 - // Add the endpoint to the set of pending acks if successfully sending. 233 - inner.state.lock().set_pending_ack(from); 234 - }) 251 + let res = self 252 + .send_pairing_response(from, PairingCommand::Accept) 253 + .await; 254 + println!("accept_pairing ok 3"); 255 + let mut state = inner.state.lock().await; 256 + let res = res.map(|_| { 257 + // Add the endpoint to the set of pending acks if successfully sending. 258 + state.set_pending_ack(from); 259 + }); 260 + res 235 261 } 236 262 237 263 // Reject a pairing request ··· 241 267 } 242 268 243 269 // Get the list of current known peers. 244 - pub fn peers(&self) -> Vec<EndpointDescription> { 270 + pub async fn peers(&self) -> Vec<EndpointDescription> { 245 271 let Some(ref inner) = self.inner else { 246 272 error!("Not initialized"); 247 273 return vec![]; ··· 250 276 inner 251 277 .state 252 278 .lock() 279 + .await 253 280 .endpoints() 254 281 .values() 255 282 .map(|e| e.into()) 256 283 .collect() 257 284 } 258 285 286 + // Send a message to a paired peer. 287 + pub async fn send_message(&self, to: &EndpointId, message: &[u8]) -> Result<(), MessageError> { 288 + let Some(ref inner) = self.inner else { 289 + error!("Not initialized"); 290 + return Err(MessageError::InvalidState); 291 + }; 292 + 293 + let addr = { 294 + let state = inner.state.lock().await; 295 + let Some(remote) = state.by_id(to) else { 296 + error!("Can't send messages to unknown remote {to}"); 297 + return Err(MessageError::UnknownRemote); 298 + }; 299 + if !remote.is_paired() { 300 + return Err(MessageError::Unpaired); 301 + } 302 + remote.addr() 303 + }; 304 + 305 + if inner.state.lock().await.by_id(to).is_none() { 306 + error!("Can't send message to unknown remote {to}"); 307 + return Err(MessageError::UnknownRemote); 308 + }; 309 + 310 + // If we have an existing sender for that endpoint, use it. 311 + if let Some(ref mut sender) = inner.state.lock().await.get_message_sender(to) { 312 + BasePacket::send(message, sender).await?; 313 + return Ok(()); 314 + } 315 + 316 + // Otherwise, create a new connection 317 + let connection = inner.router.endpoint().connect(addr, MESSAGE_ALPN).await?; 318 + let (mut sender, mut receiver) = connection.open_bi().await?; 319 + BasePacket::send(message, &mut sender).await?; 320 + inner.state.lock().await.set_message_sender(to, sender); 321 + 322 + // create the relaying task 323 + // TODO: share with message_protocol 324 + let state = Arc::clone(&inner.state); 325 + let remote_id = *to; 326 + tokio::spawn(async move { 327 + loop { 328 + match BasePacket::recv(&mut receiver).await { 329 + Ok(payload) => { 330 + state 331 + .lock() 332 + .await 333 + .notify(PeerEvent::Message(remote_id, payload)); 334 + } 335 + Err(err) => { 336 + error!("Error reading message packet: {err}"); 337 + break; 338 + } 339 + } 340 + } 341 + }); 342 + 343 + Ok(()) 344 + } 345 + 259 346 pub async fn stop(&mut self) { 260 347 let Some(ref inner) = self.inner else { 261 348 error!("Not initialized"); ··· 263 350 }; 264 351 265 352 if !inner.router.is_shutdown() { 266 - let _ = inner.mdns_handle.abort(); 353 + inner.mdns_handle.abort(); 267 354 let _ = inner.router.shutdown().await; 268 355 } 269 356 270 357 // Force dropping the mdns advertising tasks. 271 358 self.inner = None; 359 + } 360 + 361 + pub async fn set_status(&self, endpoint: &EndpointId, status: EndpointStatus) { 362 + let Some(ref inner) = self.inner else { 363 + error!("Not initialized"); 364 + return; 365 + }; 366 + 367 + inner.state.lock().await.set_status(endpoint, status); 272 368 } 273 369 }
+14 -3
src/message_protocol.rs
··· 25 25 impl ProtocolHandler for MessageProtocol { 26 26 async fn accept(&self, connection: Connection) -> Result<(), AcceptError> { 27 27 let remote_id = connection.remote_id(); 28 - info!( 28 + println!( 29 29 "accepted connection on {:?} from {:?}", 30 30 String::from_utf8_lossy(connection.alpn()), 31 31 remote_id 32 32 ); 33 33 34 34 // 1. Accept the bidirectional stream. 35 - let (mut sender, mut receiver) = connection.accept_bi().await?; 35 + let (sender, mut receiver) = connection.accept_bi().await?; 36 + 37 + // 2. Store the sender in the state for that remote ID. 38 + self.state 39 + .lock() 40 + .await 41 + .set_message_sender(&remote_id, sender); 36 42 37 - // 2. Relay all the incoming messages. 43 + // 3. Relay all the incoming messages. 38 44 loop { 39 45 match BasePacket::recv(&mut receiver).await { 40 46 Ok(payload) => { 41 47 self.state 42 48 .lock() 49 + .await 43 50 .notify(PeerEvent::Message(remote_id, payload)); 44 51 } 45 52 Err(err) => { ··· 48 55 } 49 56 } 50 57 } 58 + 59 + // 4. Done with that connection? 60 + self.state.lock().await.remove_message_sender(&remote_id); 61 + 51 62 Ok(()) 52 63 } 53 64 }
+6
src/pairing_hook.rs
··· 22 22 23 23 impl EndpointHooks for PairingHook { 24 24 async fn after_handshake<'a>(&'a self, conn: &'a ConnectionInfo) -> AfterHandshakeOutcome { 25 + println!( 26 + "after_handshake alpn={} side={:?} remote={}", 27 + String::from_utf8_lossy(conn.alpn()), conn.side(), conn.remote_id() 28 + ); 29 + 25 30 // Always allow outgoing connections. 26 31 if conn.side() == Side::Client { 27 32 return AfterHandshakeOutcome::Accept; ··· 37 42 && !self 38 43 .state 39 44 .lock() 45 + .await 40 46 .has(&conn.remote_id(), EndpointStatus::PairedConnected) 41 47 { 42 48 return AfterHandshakeOutcome::Reject {
+5 -3
src/pairing_protocol.rs
··· 50 50 51 51 // If we don't know about this endpoint yet, add it to our set as Discovered. 52 52 { 53 - let mut state = self.state.lock(); 53 + let mut state = self.state.lock().await; 54 54 if !state.has_any(&remote_id) { 55 55 info!("Registering auto-discovered endpoint at {remote_id}"); 56 56 let description = EndpointProxy::new( ··· 74 74 PairingCommand::Request => { 75 75 self.state 76 76 .lock() 77 + .await 77 78 .notify(PeerEvent::PairingRequest(remote_id)); 78 79 } 79 80 _ => { ··· 87 88 let (tokio_sender, mut tokio_receiver) = tokio_channel(2); 88 89 let (ack_sender, ack_receiver) = tokio_channel(2); 89 90 { 90 - let mut state = self.state.lock(); 91 + let mut state = self.state.lock().await; 91 92 state.set_pairing_responder(&remote_id, (tokio_sender, ack_receiver)); 92 93 } 93 94 ··· 111 112 match command { 112 113 PairingCommand::Ack => { 113 114 { 114 - let mut state = self.state.lock(); 115 + let mut state = self.state.lock().await; 115 116 if accepted { 116 117 state.notify(PeerEvent::PairingAccepted(remote_id)); 117 118 state.set_status(&remote_id, EndpointStatus::PairedConnected); ··· 124 125 _ => { 125 126 self.state 126 127 .lock() 128 + .await 127 129 .notify(PeerEvent::PairingFailed(remote_id)); 128 130 let _ = ack_sender.send(false).await; 129 131 error!("Unexpected command: {command:?}");
+18 -9
src/state.rs
··· 5 5 sync::Arc, 6 6 }; 7 7 8 - use iroh::{ 9 - EndpointAddr, EndpointId, 10 - address_lookup::DiscoveryEvent, 11 - endpoint::{RecvStream, SendStream}, 12 - }; 8 + use iroh::{EndpointAddr, EndpointId, address_lookup::DiscoveryEvent, endpoint::SendStream}; 13 9 14 10 use log::{info, warn}; 15 - use parking_lot::Mutex; 16 11 use serde::{Deserialize, Serialize}; 17 12 use std::sync::mpsc::Sender; 13 + use tokio::sync::Mutex; 18 14 use tokio::sync::mpsc::{Receiver as TokioReceiver, Sender as TokioSender}; 19 15 20 16 pub(crate) type SharedState = Arc<Mutex<State>>; ··· 27 23 } 28 24 29 25 #[derive(Debug)] 30 - 31 26 pub(crate) struct EndpointProxy { 32 27 name: String, 33 28 id: EndpointId, ··· 54 49 55 50 pub(crate) fn addr(&self) -> EndpointAddr { 56 51 self.addr.clone() 52 + } 53 + 54 + pub(crate) fn is_paired(&self) -> bool { 55 + self.status == EndpointStatus::PairedConnected 56 + || self.status == EndpointStatus::PairedDisconnected 57 57 } 58 58 } 59 59 ··· 151 151 self.endpoints.insert(*id, description); 152 152 } 153 153 154 - fn set_message_sender(&mut self, id: &EndpointId, stream: SendStream) { 154 + pub(crate) fn set_message_sender(&mut self, id: &EndpointId, stream: SendStream) { 155 155 info!("set_message_sender for {id}"); 156 156 if let Some(desc) = self.endpoints.get_mut(id) { 157 157 desc.message_sender = Some(stream); 158 158 } 159 159 } 160 160 161 - fn remove_message_sender(&mut self, id: &EndpointId) { 161 + pub(crate) fn get_message_sender(&mut self, id: &EndpointId) -> Option<&mut SendStream> { 162 + info!("get_message_sender for {id}"); 163 + if let Some(desc) = self.endpoints.get_mut(id) { 164 + desc.message_sender.as_mut() 165 + } else { 166 + None 167 + } 168 + } 169 + 170 + pub(crate) fn remove_message_sender(&mut self, id: &EndpointId) { 162 171 info!("remove_message_sender for {id}"); 163 172 if let Some(desc) = self.endpoints.get_mut(id) { 164 173 desc.message_sender = None;
+8 -8
tests/pairing.rs
··· 156 156 } 157 157 }); 158 158 159 - assert_eq!(manager2.peers().len(), 1); 160 - assert_eq!(manager1.peers().len(), 1); 159 + assert_eq!(manager2.peers().await.len(), 1); 160 + assert_eq!(manager1.peers().await.len(), 1); 161 161 manager2.stop().await; 162 162 163 163 let _ = handle1.join(); 164 - assert_eq!(manager1.peers().len(), 0); 164 + assert_eq!(manager1.peers().await.len(), 0); 165 165 manager1.stop().await; 166 166 } 167 167 ··· 179 179 let (endpoint1, endpoint2) = wait_for_discovery(Arc::clone(&receiver1), Arc::clone(&receiver2)); 180 180 181 181 let handle1 = std::thread::spawn(move || { 182 - // let mut peer2_id = None; 183 182 loop { 184 183 match receiver1.lock().recv() { 185 184 Ok(event) => { ··· 257 256 258 257 let (endpoint1, endpoint2) = wait_for_discovery(Arc::clone(&receiver1), Arc::clone(&receiver2)); 259 258 259 + println!("Discovery done"); 260 + 260 261 let handle1 = std::thread::spawn(move || { 261 - // let mut peer2_id = None; 262 262 loop { 263 263 match receiver1.lock().recv() { 264 264 Ok(event) => { ··· 296 296 rt.block_on(async { 297 297 mgr.accept_pairing(&endpoint) 298 298 .await 299 - .expect("failed to reject pairing"); 299 + .expect("failed to accept pairing"); 300 300 }); 301 301 } 302 302 PeerEvent::PairingAccepted(endpoint) => { ··· 319 319 let _ = handle1.join(); 320 320 let _ = handle2.join(); 321 321 322 - assert_eq!(manager1.peers().len(), 1); 323 - assert_eq!(manager2.peers().len(), 1); 322 + assert_eq!(manager1.peers().await.len(), 1); 323 + assert_eq!(manager2.peers().await.len(), 1); 324 324 325 325 manager1.stop().await; 326 326 manager2.stop().await;