Rewild Your Web
18
fork

Configure Feed

Select the types of activity you want to include in your feed.

p2p: improve peer stream api

Signed-off-by: webbeef <me@webbeef.org>

webbeef ba52ef2e 9f41ba34

+106 -50
+50 -25
patches/components/constellation/constellation.rs.patch
··· 308 308 ScriptToConstellationMessage::MediaSessionEvent(pipeline_id, event) => { 309 309 // Unlikely at this point, but we may receive events coming from 310 310 // different media sessions, so we set the active media session based 311 - @@ -2057,7 +2185,333 @@ 311 + @@ -2057,9 +2185,358 @@ 312 312 let _ = event_loop.send(ScriptThreadMessage::TriggerGarbageCollection); 313 313 } 314 314 }, ··· 463 463 + peer_id, 464 464 + local_port_id, 465 465 + remote_port_id, 466 + + target_url, 466 467 + callback, 467 468 + ) => { 469 + + debug!("CreatePeerStream: peer_id={peer_id}, target_url={target_url}"); 468 470 + // Register the virtual remote port that only exists in the constellation, 469 471 + // not in any GlobalScope. 470 472 + self.message_ports.insert( ··· 494 496 + &P2pMessage::PortOffer { 495 497 + stream_id, 496 498 + port_id: port_id_bytes, 497 - + from_peer: peer_id.clone(), 499 + + target_url, 498 500 + }, 499 501 + ); 500 502 + // TODO: wait for PortOfferAccepted before resolving. ··· 520 522 + .send_message(&from_peer, &P2pMessage::PortOfferDenied { stream_id }); 521 523 + } 522 524 + }, 523 - + } 524 - + } 525 - + 525 + } 526 + } 527 + 526 528 + fn handle_pairing_event(&mut self, event: constellation_traits::PairingEvent) { 527 529 + if let constellation_traits::PairingEvent::MessageReceived { ref from, ref data } = event { 530 + + debug!("P2P message received from {from}, {} bytes", data.len()); 528 531 + if let Some((from, message)) = self.pairing.handle_incoming_message(from, data) { 532 + + debug!("Decoded P2P message: {message:?}"); 529 533 + match message { 530 534 + P2pMessage::BroadcastChannelMessage { 531 535 + ref origin, ··· 551 555 + P2pMessage::PortOffer { 552 556 + ref stream_id, 553 557 + ref port_id, 554 - + .. 558 + + ref target_url, 555 559 + } => { 560 + + debug!( 561 + + "PortOffer received: stream_id={stream_id}, target_url={target_url}, from={from}" 562 + + ); 556 563 + let Ok(remote_port_id) = 557 564 + postcard::from_bytes::<base::id::MessagePortId>(port_id) 558 565 + else { ··· 568 575 + entangled_with: None, 569 576 + }, 570 577 + ); 571 - + // Notify script threads to create a MessagePort and fire peerstream event. 572 - + // The script thread will report back via PeerStreamResponse whether 573 - + // the event was accepted or denied. 578 + + // Dispatch only to the pipeline whose URL matches the target URL. 574 579 + let remote_port_id_bytes = port_id.clone(); 575 - + for event_loop in self.event_loops() { 576 - + let _ = event_loop.send(ScriptThreadMessage::DispatchPeerStream( 577 - + from.clone(), 578 - + remote_port_id_bytes.clone(), 579 - + stream_id.clone(), 580 - + from.clone(), 581 - + )); 580 + + let mut dispatched = false; 581 + + for (_pipeline_id, pipeline) in &self.pipelines { 582 + + let pipeline_url = pipeline.load_data.url.as_str(); 583 + + debug!( 584 + + "Checking pipeline {:?} url={} against target={}", 585 + + _pipeline_id, pipeline_url, target_url 586 + + ); 587 + + if pipeline_url == target_url { 588 + + let _ = pipeline.event_loop.send( 589 + + ScriptThreadMessage::DispatchPeerStream( 590 + + from.clone(), 591 + + remote_port_id_bytes.clone(), 592 + + stream_id.clone(), 593 + + from.clone(), 594 + + target_url.clone(), 595 + + ), 596 + + ); 597 + + dispatched = true; 598 + + // Send to the first matching pipeline only — avoid entanglement 599 + + // conflicts from multiple ports being created for the same offer. 600 + + break; 601 + + } 602 + + } 603 + + if !dispatched { 604 + + warn!("No pipeline matches target_url {target_url} for PortOffer"); 582 605 + } 583 606 + }, 584 607 + P2pMessage::PortOfferAccepted { .. } => { ··· 632 655 + // Handle peer disconnect: clean up remote channel state. 633 656 + if let constellation_traits::PairingEvent::PeerExpired { ref id } = event { 634 657 + self.pairing.clear_remote_peer(id); 635 - } 658 + + } 636 659 + 637 660 + for event_loop in self.event_loops() { 638 661 + if self.embedder_error_listeners.contains(&event_loop.id()) { 639 662 + let _ = event_loop.send(ScriptThreadMessage::DispatchPairingEvent(event.clone())); 640 663 + } 641 664 + } 642 - } 643 - 665 + + } 666 + + 644 667 /// Check the origin of a message against that of the pipeline it came from. 645 - @@ -2376,6 +2830,29 @@ 668 + /// Note: this is still limited as a security check, 669 + /// see <https://github.com/servo/servo/issues/11722> 670 + @@ -2376,6 +2853,29 @@ 646 671 TransferState::TransferInProgress(queue) => queue.push_back(task), 647 672 TransferState::CompletionFailed(queue) => queue.push_back(task), 648 673 TransferState::CompletionRequested(_, queue) => queue.push_back(task), ··· 672 697 } 673 698 } 674 699 675 - @@ -3246,6 +3723,13 @@ 700 + @@ -3246,6 +3746,13 @@ 676 701 /// <https://html.spec.whatwg.org/multipage/#destroy-a-top-level-traversable> 677 702 fn handle_close_top_level_browsing_context(&mut self, webview_id: WebViewId) { 678 703 debug!("{webview_id}: Closing"); ··· 686 711 let browsing_context_id = BrowsingContextId::from(webview_id); 687 712 // Step 5. Remove traversable from the user agent's top-level traversable set. 688 713 let browsing_context = 689 - @@ -3522,8 +4006,27 @@ 714 + @@ -3522,8 +4029,27 @@ 690 715 opener_webview_id, 691 716 opener_pipeline_id, 692 717 response_sender, ··· 714 739 let Some((webview_id_sender, webview_id_receiver)) = generic_channel::channel() else { 715 740 warn!("Failed to create channel"); 716 741 let _ = response_sender.send(None); 717 - @@ -3622,6 +4125,361 @@ 742 + @@ -3622,6 +4148,361 @@ 718 743 }); 719 744 } 720 745 ··· 1076 1101 #[servo_tracing::instrument(skip_all)] 1077 1102 fn handle_refresh_cursor(&self, pipeline_id: PipelineId) { 1078 1103 let Some(pipeline) = self.pipelines.get(&pipeline_id) else { 1079 - @@ -4747,7 +5605,7 @@ 1104 + @@ -4747,7 +5628,7 @@ 1080 1105 } 1081 1106 1082 1107 #[servo_tracing::instrument(skip_all)] ··· 1085 1110 // Send a flat projection of the history to embedder. 1086 1111 // The final vector is a concatenation of the URLs of the past 1087 1112 // entries, the current entry and the future entries. 1088 - @@ -4850,9 +5708,23 @@ 1113 + @@ -4850,9 +5731,23 @@ 1089 1114 ); 1090 1115 self.embedder_proxy.send(EmbedderMsg::HistoryChanged( 1091 1116 webview_id,
+2 -2
patches/components/constellation/pairing.rs.patch
··· 42 42 + stream_id: String, 43 43 + /// The serialized MessagePortId of the port on the offering side. 44 44 + port_id: Vec<u8>, 45 - + /// The peer that is offering the port. 46 - + from_peer: String, 45 + + /// The URL of the page that should receive the peer stream event on the remote side. 46 + + target_url: String, 47 47 + }, 48 48 + /// Accept a port offer — the remote side created its port. 49 49 + PortOfferAccepted {
+3 -1
patches/components/script/dom/navigator.rs.patch
··· 101 101 /// <https://html.spec.whatwg.org/multipage/#dom-navigator-registerprotocolhandler> 102 102 fn RegisterProtocolHandler(&self, scheme: DOMString, url: USVString) -> Fallible<()> { 103 103 // Step 1. Let (normalizedScheme, normalizedURLString) be the result of 104 - @@ -602,6 +630,60 @@ 104 + @@ -602,6 +630,62 @@ 105 105 self.user_activation 106 106 .or_init(|| UserActivation::new(&self.global(), can_gc)) 107 107 } ··· 109 109 + fn CreatePeerStream( 110 110 + &self, 111 111 + peer_id: DOMString, 112 + + target_url: USVString, 112 113 + comp: InRealm, 113 114 + can_gc: CanGc, 114 115 + ) -> Fallible<Rc<Promise>> { ··· 147 148 + peer_id.to_string(), 148 149 + *port1.message_port_id(), 149 150 + remote_port_id, 151 + + target_url.to_string(), 150 152 + callback, 151 153 + )) 152 154 + .is_err()
+45 -17
patches/components/script/script_thread.rs.patch
··· 88 88 }, 89 89 ScriptThreadMessage::ForwardKeyboardScroll(pipeline_id, scroll) => { 90 90 if let Some(document) = self.documents.borrow().find_document(pipeline_id) { 91 - @@ -1986,6 +2004,33 @@ 91 + @@ -1986,6 +2004,35 @@ 92 92 ScriptThreadMessage::TriggerGarbageCollection => unsafe { 93 93 JS_GC(*GlobalScope::get_cx(), GCReason::API); 94 94 }, ··· 110 110 + remote_port_id_bytes, 111 111 + stream_id, 112 112 + from_peer, 113 + + target_url, 113 114 + ) => { 114 115 + self.handle_dispatch_peer_stream( 115 116 + peer_id, 116 117 + remote_port_id_bytes, 117 118 + stream_id, 118 119 + from_peer, 120 + + target_url, 119 121 + CanGc::from_cx(cx), 120 122 + ); 121 123 + }, 122 124 } 123 125 } 124 126 125 - @@ -3064,6 +3109,9 @@ 127 + @@ -3064,6 +3111,9 @@ 126 128 .documents 127 129 .borrow() 128 130 .find_iframe(parent_pipeline_id, browsing_context_id); ··· 132 134 if let Some(frame_element) = frame_element { 133 135 frame_element.update_pipeline_id(new_pipeline_id, reason, cx); 134 136 } 135 - @@ -3083,6 +3131,7 @@ 137 + @@ -3083,6 +3133,7 @@ 136 138 // is no need to pass along existing opener information that 137 139 // will be discarded. 138 140 None, ··· 140 142 ); 141 143 } 142 144 } 143 - @@ -3359,6 +3408,131 @@ 145 + @@ -3359,6 +3410,157 @@ 144 146 } 145 147 } 146 148 ··· 199 201 + } 200 202 + 201 203 + /// Handle an incoming peer stream: create a local MessagePort and fire "peerstream" on Window. 204 + + /// Only fires on documents whose URL matches the target_url specified by the sender. 202 205 + /// If preventDefault() is called on the event, deny the offer. 203 206 + fn handle_dispatch_peer_stream( 204 207 + &self, ··· 206 209 + remote_port_id_bytes: Vec<u8>, 207 210 + stream_id: String, 208 211 + from_peer: String, 212 + + target_url: String, 209 213 + can_gc: CanGc, 210 214 + ) { 215 + + log::debug!( 216 + + "handle_dispatch_peer_stream: peer_id={peer_id}, stream_id={stream_id}, target_url={target_url}" 217 + + ); 211 218 + let Ok(remote_port_id) = postcard::from_bytes::<MessagePortId>(&remote_port_id_bytes) 212 219 + else { 213 220 + log::warn!("Failed to deserialize remote port ID in DispatchPeerStream"); 214 221 + return; 215 222 + }; 216 223 + 217 - + // Fire on all windows. If any handler calls preventDefault(), deny the offer. 218 - + // TODO: figure out a better design that doesn't dispatch on all the windows. 224 + + // Find the first document whose URL matches the target. 219 225 + let mut accepted = false; 226 + + let mut responding_global = None; 227 + + let doc_count = self.documents.borrow().iter().count(); 228 + + log::debug!("Searching {doc_count} documents for URL match with {target_url}"); 220 229 + for (_, document) in self.documents.borrow().iter() { 221 230 + let window = document.window(); 222 231 + let global = window.upcast::<crate::dom::globalscope::GlobalScope>(); 232 + + let doc_url = global.get_url(); 233 + + log::debug!( 234 + + "Document url={} vs target={} match={}", 235 + + doc_url.as_str(), 236 + + target_url, 237 + + doc_url.as_str() == target_url 238 + + ); 239 + + if doc_url.as_str() != target_url { 240 + + continue; 241 + + } 223 242 + let _ac = enter_realm(window); 224 243 + 244 + + log::debug!("URL matched! Creating local port and firing peerstream event"); 245 + + 225 246 + // Create a new local port and set its entanglement to the remote port. 226 247 + let local_port = MessagePort::new(global, can_gc); 227 248 + global.track_message_port(&local_port, None); ··· 250 271 + .upcast::<Event>() 251 272 + .fire(window.upcast::<EventTarget>(), can_gc); 252 273 + 274 + + log::debug!( 275 + + "peerstream event fired, defaultPrevented={}", 276 + + event.upcast::<Event>().DefaultPrevented() 277 + + ); 278 + + 253 279 + if !event.upcast::<Event>().DefaultPrevented() { 254 280 + accepted = true; 255 281 + } else { 256 282 + // Clean up the port — the offer was denied. 257 283 + local_port.Close(can_gc); 258 284 + } 285 + + responding_global = Some(global.script_to_constellation_chan().clone()); 286 + + break; 259 287 + } 260 288 + 261 289 + // Report back to the constellation. 262 - + if let Some((_, document)) = self.documents.borrow().iter().next() { 263 - + let global = document 264 - + .window() 265 - + .upcast::<crate::dom::globalscope::GlobalScope>(); 266 - + let _ = global.script_to_constellation_chan().send( 267 - + ScriptToConstellationMessage::PeerStreamResponse(stream_id, from_peer, accepted), 268 - + ); 290 + + if let Some(chan) = responding_global { 291 + + log::debug!("Sending PeerStreamResponse: accepted={accepted}"); 292 + + let _ = chan.send(ScriptToConstellationMessage::PeerStreamResponse( 293 + + stream_id, from_peer, accepted, 294 + + )); 295 + + } else { 296 + + log::warn!("No document matched target_url {target_url} — peerstream event not fired"); 269 297 + } 270 298 + } 271 299 + 272 300 fn ask_constellation_for_top_level_info( 273 301 &self, 274 302 sender_webview_id: WebViewId, 275 - @@ -3473,7 +3647,13 @@ 303 + @@ -3473,7 +3675,13 @@ 276 304 self.senders.pipeline_to_embedder_sender.clone(), 277 305 self.senders.constellation_sender.clone(), 278 306 incomplete.pipeline_id, ··· 287 315 incomplete.viewport_details, 288 316 origin.clone(), 289 317 final_url.clone(), 290 - @@ -3495,6 +3675,8 @@ 318 + @@ -3495,6 +3703,8 @@ 291 319 #[cfg(feature = "webgpu")] 292 320 self.gpu_id_hub.clone(), 293 321 incomplete.load_data.inherited_secure_context, ··· 296 324 incomplete.theme, 297 325 self.this.clone(), 298 326 ); 299 - @@ -3518,6 +3700,7 @@ 327 + @@ -3518,6 +3728,7 @@ 300 328 incomplete.webview_id, 301 329 incomplete.parent_info, 302 330 incomplete.opener, ··· 304 332 ); 305 333 if window_proxy.parent().is_some() { 306 334 // https://html.spec.whatwg.org/multipage/#navigating-across-documents:delaying-load-events-mode-2 307 - @@ -4291,6 +4474,24 @@ 335 + @@ -4291,6 +4502,24 @@ 308 336 document.event_handler().handle_refresh_cursor(); 309 337 } 310 338
+1 -1
patches/components/script_bindings/webidls/PeerStream.webidl.patch
··· 16 16 +}; 17 17 + 18 18 +partial interface Navigator { 19 - + [Throws] Promise<MessagePort> createPeerStream(DOMString peerId); 19 + + [Throws] Promise<MessagePort> createPeerStream(DOMString peerId, USVString targetURL); 20 20 +}; 21 21 + 22 22 +partial interface Window {
+3 -2
patches/components/shared/constellation/from_script_message.rs.patch
··· 119 119 /// Mark a new document as active 120 120 ActivateDocument, 121 121 /// Set the document state for a pipeline (used by screenshot / reftests) 122 - @@ -726,6 +775,72 @@ 122 + @@ -726,6 +775,73 @@ 123 123 RespondToScreenshotReadinessRequest(ScreenshotReadinessResponse), 124 124 /// Request the constellation to force garbage collection in all `ScriptThread`'s. 125 125 TriggerGarbageCollection, ··· 179 179 + PairingRejectPairing(String, GenericCallback<Result<(), String>>), 180 180 + /// Create a peer stream: create a virtual remote port entangled with a local port, 181 181 + /// and send the offer to a remote peer. 182 - + /// Args: peer_id, local_port_id, remote_port_id, callback. 182 + + /// Args: peer_id, local_port_id, remote_port_id, target_url, callback. 183 183 + CreatePeerStream( 184 184 + String, 185 185 + base::id::MessagePortId, 186 186 + base::id::MessagePortId, 187 + + String, 187 188 + GenericCallback<Result<(), String>>, 188 189 + ), 189 190 + /// Response to a DispatchPeerStream — whether the peer stream was accepted or denied.
+2 -2
patches/components/shared/script/lib.rs.patch
··· 55 55 + /// Dispatch a pairing event to all `navigator.embedder.pairing` instances in this script thread. 56 56 + DispatchPairingEvent(constellation_traits::PairingEvent), 57 57 + /// Dispatch a peer stream event — a remote peer is offering a MessagePort. 58 - + /// Contains (peer_id, serialized remote port_id bytes, stream_id, from_peer_id). 59 - + DispatchPeerStream(String, Vec<u8>, String, String), 58 + + /// Contains (peer_id, serialized remote port_id bytes, stream_id, from_peer_id, target_url). 59 + + DispatchPeerStream(String, Vec<u8>, String, String, String), 60 60 } 61 61 62 62 impl fmt::Debug for ScriptThreadMessage {