Rewild Your Web
18
fork

Configure Feed

Select the types of activity you want to include in your feed.

beaver_p2p: more reliable messaging

Signed-off-by: webbeef <me@webbeef.org>

webbeef 7c5a0ed7 83f3c2b0

+95 -86
+53 -33
crates/beaver_p2p/src/lib.rs
··· 11 11 12 12 use iroh::address_lookup::UserData; 13 13 use iroh::address_lookup::mdns::MdnsAddressLookup; 14 - use iroh::endpoint::{ClosedStream, ConnectError, ConnectionError, WriteError}; 14 + use iroh::endpoint::{ClosedStream, ConnectError, Connection, ConnectionError, WriteError}; 15 15 use iroh::protocol::Router; 16 16 use iroh::{Endpoint, EndpointAddr, EndpointId, RelayMode, SecretKey}; 17 17 use log::{error, info}; ··· 74 74 Connect(#[from] ConnectError), 75 75 #[error("Connection error")] 76 76 Connection(#[from] ConnectionError), 77 + #[error("Stream closed")] 78 + ClosedStream(#[from] ClosedStream), 79 + #[error("Write error")] 80 + Write(#[from] WriteError), 77 81 } 78 82 79 83 #[derive(Clone)] ··· 304 308 } 305 309 306 310 // Send a message to a paired peer. 311 + // Caches the QUIC connection per peer and opens a fresh uni stream per message. 312 + // If sending fails (stale connection), retries once with a fresh connection. 307 313 pub async fn send_message(&self, to: &EndpointId, message: &[u8]) -> Result<(), MessageError> { 308 314 let Some(ref inner) = self.inner else { 309 315 error!("Not initialized"); ··· 322 328 remote.addr() 323 329 }; 324 330 325 - if inner.state.lock().await.by_id(to).is_none() { 326 - error!("Can't send message to unknown remote {to}"); 327 - return Err(MessageError::UnknownRemote); 328 - }; 331 + // Get or create a connection to the peer. 332 + let connection = self.get_or_connect(inner, to, &addr).await?; 329 333 330 - // If we have an existing sender for that endpoint, use it. 331 - if let Some(ref mut sender) = inner.state.lock().await.get_message_sender(to) { 332 - BasePacket::send(message, sender).await?; 334 + // Try sending. If it fails, clear the stale connection and retry once. 335 + if Self::try_send(&connection, message).await.is_ok() { 333 336 return Ok(()); 334 337 } 335 338 336 - // Otherwise, create a new connection 337 - let connection = inner.router.endpoint().connect(addr, MESSAGE_ALPN).await?; 338 - let (mut sender, mut receiver) = connection.open_bi().await?; 339 - BasePacket::send(message, &mut sender).await?; 340 - inner.state.lock().await.set_message_sender(to, sender); 339 + info!("Send failed on cached connection to {to}, retrying with fresh connection"); 340 + inner 341 + .state 342 + .lock() 343 + .await 344 + .by_id_mut(to) 345 + .map(|r| r.clear_connection()); 341 346 342 - // create the relaying task 343 - // TODO: share with message_protocol 344 - let state = Arc::clone(&inner.state); 345 - let remote_id = *to; 346 - tokio::spawn(async move { 347 - loop { 348 - match BasePacket::recv(&mut receiver).await { 349 - Ok(payload) => { 350 - state 351 - .lock() 352 - .await 353 - .notify(PeerEvent::Message(remote_id, payload)); 354 - }, 355 - Err(err) => { 356 - error!("Error reading message packet: {err}"); 357 - break; 358 - }, 359 - } 360 - } 361 - }); 347 + let connection = self.get_or_connect(inner, to, &addr).await?; 348 + Self::try_send(&connection, message).await 349 + } 350 + 351 + async fn get_or_connect( 352 + &self, 353 + inner: &PairingManagerInner, 354 + to: &EndpointId, 355 + addr: &EndpointAddr, 356 + ) -> Result<Connection, MessageError> { 357 + let mut state = inner.state.lock().await; 358 + if let Some(conn) = state.by_id(to).and_then(|r| r.connection()).cloned() { 359 + return Ok(conn); 360 + } 361 + drop(state); 362 362 363 + let conn = inner 364 + .router 365 + .endpoint() 366 + .connect(addr.clone(), MESSAGE_ALPN) 367 + .await?; 368 + 369 + inner 370 + .state 371 + .lock() 372 + .await 373 + .by_id_mut(to) 374 + .map(|r| r.set_connection(conn.clone())); 375 + 376 + Ok(conn) 377 + } 378 + 379 + async fn try_send(connection: &Connection, message: &[u8]) -> Result<(), MessageError> { 380 + let mut sender = connection.open_uni().await?; 381 + BasePacket::send(message, &mut sender).await?; 382 + sender.finish()?; 363 383 Ok(()) 364 384 } 365 385
+21 -26
crates/beaver_p2p/src/message_protocol.rs
··· 2 2 3 3 use iroh::endpoint::Connection; 4 4 use iroh::protocol::{AcceptError, ProtocolHandler}; 5 - use log::{error, info}; 5 + use log::{debug, error}; 6 6 7 7 use crate::PeerEvent; 8 8 use crate::packet::BasePacket; ··· 25 25 impl ProtocolHandler for MessageProtocol { 26 26 async fn accept(&self, connection: Connection) -> Result<(), AcceptError> { 27 27 let remote_id = connection.remote_id(); 28 - info!( 29 - "accepted connection on {:?} from {:?}", 28 + debug!( 29 + "[P2P MSG] accepted connection on {:?} from {:?}", 30 30 String::from_utf8_lossy(connection.alpn()), 31 31 remote_id 32 32 ); 33 33 34 - // 1. Accept the bidirectional stream. 35 - let (sender, mut receiver) = connection.accept_bi().await?; 36 - 37 - // 2. Store the sender in the state for that remote ID. 38 - self.state 39 - .lock() 40 - .await 41 - .set_message_sender(&remote_id, sender); 42 - 43 - // 3. Relay all the incoming messages. 34 + // Accept uni streams — each incoming message arrives on its own stream. 44 35 loop { 45 - match BasePacket::recv(&mut receiver).await { 46 - Ok(payload) => { 47 - self.state 48 - .lock() 49 - .await 50 - .notify(PeerEvent::Message(remote_id, payload)); 36 + match connection.accept_uni().await { 37 + Ok(mut receiver) => match BasePacket::recv(&mut receiver).await { 38 + Ok(payload) => { 39 + debug!( 40 + "[P2P MSG] received {} bytes from {:?}", 41 + payload.len(), 42 + remote_id 43 + ); 44 + self.state 45 + .lock() 46 + .await 47 + .notify(PeerEvent::Message(remote_id, payload)); 48 + }, 49 + Err(err) => { 50 + error!("Error reading message packet: {err}"); 51 + }, 51 52 }, 52 - Err(err) => { 53 - error!("Error reading message packet: {err}"); 54 - break; 55 - }, 53 + Err(_) => break, 56 54 } 57 55 } 58 - 59 - // 4. Done with that connection? 60 - self.state.lock().await.remove_message_sender(&remote_id); 61 56 62 57 Ok(()) 63 58 }
+21 -27
crates/beaver_p2p/src/state.rs
··· 5 5 use std::sync::mpsc::Sender; 6 6 7 7 use iroh::address_lookup::DiscoveryEvent; 8 - use iroh::endpoint::SendStream; 8 + use iroh::endpoint::Connection; 9 9 use iroh::{EndpointAddr, EndpointId}; 10 10 use log::{info, warn}; 11 11 use serde::{Deserialize, Serialize}; ··· 27 27 id: EndpointId, 28 28 addr: EndpointAddr, 29 29 status: EndpointStatus, 30 - message_sender: Option<SendStream>, 30 + /// Cached QUIC connection for sending messages. 31 + connection: Option<Connection>, 31 32 } 32 33 33 34 impl EndpointProxy { ··· 42 43 id: id.to_owned(), 43 44 addr, 44 45 status, 45 - message_sender: None, 46 + connection: None, 46 47 } 47 48 } 48 49 ··· 57 58 pub(crate) fn is_paired(&self) -> bool { 58 59 self.status == EndpointStatus::PairedConnected || 59 60 self.status == EndpointStatus::PairedDisconnected 61 + } 62 + 63 + pub(crate) fn connection(&self) -> Option<&Connection> { 64 + self.connection.as_ref() 65 + } 66 + 67 + pub(crate) fn set_connection(&mut self, connection: Connection) { 68 + self.connection = Some(connection); 69 + } 70 + 71 + pub(crate) fn clear_connection(&mut self) { 72 + self.connection = None; 60 73 } 61 74 } 62 75 ··· 150 163 self.endpoints.get(id) 151 164 } 152 165 153 - pub(crate) fn add_endpoint(&mut self, id: &EndpointId, description: EndpointProxy) { 154 - self.endpoints.insert(*id, description); 155 - } 156 - 157 - pub(crate) fn set_message_sender(&mut self, id: &EndpointId, stream: SendStream) { 158 - info!("set_message_sender for {id}"); 159 - if let Some(desc) = self.endpoints.get_mut(id) { 160 - desc.message_sender = Some(stream); 161 - } 162 - } 163 - 164 - pub(crate) fn get_message_sender(&mut self, id: &EndpointId) -> Option<&mut SendStream> { 165 - info!("get_message_sender for {id}"); 166 - if let Some(desc) = self.endpoints.get_mut(id) { 167 - desc.message_sender.as_mut() 168 - } else { 169 - None 170 - } 166 + pub(crate) fn by_id_mut(&mut self, id: &EndpointId) -> Option<&mut EndpointProxy> { 167 + self.endpoints.get_mut(id) 171 168 } 172 169 173 - pub(crate) fn remove_message_sender(&mut self, id: &EndpointId) { 174 - info!("remove_message_sender for {id}"); 175 - if let Some(desc) = self.endpoints.get_mut(id) { 176 - desc.message_sender = None; 177 - } 170 + pub(crate) fn add_endpoint(&mut self, id: &EndpointId, description: EndpointProxy) { 171 + self.endpoints.insert(*id, description); 178 172 } 179 173 180 174 pub(crate) fn set_status(&mut self, id: &EndpointId, status: EndpointStatus) { ··· 259 253 id: endpoint_info.endpoint_id, 260 254 addr, 261 255 status: EndpointStatus::Discovered, 262 - message_sender: None, 256 + connection: None, 263 257 }; 264 258 self.endpoints 265 259 .insert(endpoint_info.endpoint_id, description);