//! Rlpx protocol multiplexer and satellite stream //! //! A Satellite is a Stream that primarily drives a single `RLPx` subprotocol but can also handle //! additional subprotocols. //! //! Most of other subprotocols are "dependent satellite" protocols of "eth" and not a fully standalone protocol, for example "snap", See also [snap protocol](https://github.com/ethereum/devp2p/blob/298d7a77c3bf833641579ecbbb5b13f0311eeeea/caps/snap.md?plain=1#L71) //! Hence it is expected that the primary protocol is "eth" and the additional protocols are //! "dependent satellite" protocols. use std::{ collections::VecDeque, fmt, future::Future, io, pin::{pin, Pin}, task::{ready, Context, Poll}, }; use crate::{ capability::{SharedCapabilities, SharedCapability, UnsupportedCapabilityError}, errors::{EthStreamError, P2PStreamError}, p2pstream::DisconnectP2P, CanDisconnect, Capability, DisconnectReason, EthStream, P2PStream, Status, UnauthedEthStream, }; use bytes::{Bytes, BytesMut}; use futures::{Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt}; use reth_primitives::ForkFilter; use tokio::sync::{mpsc, mpsc::UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; /// A Stream and Sink type that wraps a raw rlpx stream [`P2PStream`] and handles message ID /// multiplexing. #[derive(Debug)] pub struct RlpxProtocolMultiplexer { inner: MultiplexInner, } impl RlpxProtocolMultiplexer { /// Wraps the raw p2p stream pub fn new(conn: P2PStream) -> Self { Self { inner: MultiplexInner { conn, protocols: Default::default(), out_buffer: Default::default(), }, } } /// Installs a new protocol on top of the raw p2p stream. /// /// This accepts a closure that receives a [`ProtocolConnection`] that will yield messages for /// the given capability. pub fn install_protocol( &mut self, cap: &Capability, f: F, ) -> Result<(), UnsupportedCapabilityError> where F: FnOnce(ProtocolConnection) -> Proto, Proto: Stream + Send + 'static, { self.inner.install_protocol(cap, f) } /// Returns the [`SharedCapabilities`] of the underlying raw p2p stream pub const fn shared_capabilities(&self) -> &SharedCapabilities { self.inner.shared_capabilities() } /// Converts this multiplexer into a [`RlpxSatelliteStream`] with the given primary protocol. pub fn into_satellite_stream( self, cap: &Capability, primary: F, ) -> Result, P2PStreamError> where F: FnOnce(ProtocolProxy) -> Primary, { let Ok(shared_cap) = self.shared_capabilities().ensure_matching_capability(cap).cloned() else { return Err(P2PStreamError::CapabilityNotShared) }; let (to_primary, from_wire) = mpsc::unbounded_channel(); let (to_wire, from_primary) = mpsc::unbounded_channel(); let proxy = ProtocolProxy { shared_cap: shared_cap.clone(), from_wire: UnboundedReceiverStream::new(from_wire), to_wire, }; let st = primary(proxy); Ok(RlpxSatelliteStream { inner: self.inner, primary: PrimaryProtocol { to_primary, from_primary: UnboundedReceiverStream::new(from_primary), st, shared_cap, }, }) } /// Converts this multiplexer into a [`RlpxSatelliteStream`] with the given primary protocol. /// /// Returns an error if the primary protocol is not supported by the remote or the handshake /// failed. pub async fn into_satellite_stream_with_handshake( self, cap: &Capability, handshake: F, ) -> Result, Err> where F: FnOnce(ProtocolProxy) -> Fut, Fut: Future>, St: Stream> + Sink + Unpin, P2PStreamError: Into, { self.into_satellite_stream_with_tuple_handshake(cap, move |proxy| async move { let st = handshake(proxy).await?; Ok((st, ())) }) .await .map(|(st, _)| st) } /// Converts this multiplexer into a [`RlpxSatelliteStream`] with the given primary protocol. /// /// Returns an error if the primary protocol is not supported by the remote or the handshake /// failed. /// /// This accepts a closure that does a handshake with the remote peer and returns a tuple of the /// primary stream and extra data. /// /// See also [`UnauthedEthStream::handshake`] pub async fn into_satellite_stream_with_tuple_handshake( mut self, cap: &Capability, handshake: F, ) -> Result<(RlpxSatelliteStream, Extra), Err> where F: FnOnce(ProtocolProxy) -> Fut, Fut: Future>, St: Stream> + Sink + Unpin, P2PStreamError: Into, { let Ok(shared_cap) = self.shared_capabilities().ensure_matching_capability(cap).cloned() else { return Err(P2PStreamError::CapabilityNotShared.into()) }; let (to_primary, from_wire) = mpsc::unbounded_channel(); let (to_wire, mut from_primary) = mpsc::unbounded_channel(); let proxy = ProtocolProxy { shared_cap: shared_cap.clone(), from_wire: UnboundedReceiverStream::new(from_wire), to_wire, }; let f = handshake(proxy); let mut f = pin!(f); // this polls the connection and the primary stream concurrently until the handshake is // complete loop { tokio::select! { Some(Ok(msg)) = self.inner.conn.next() => { // Ensure the message belongs to the primary protocol let Some(offset) = msg.first().copied() else { return Err(P2PStreamError::EmptyProtocolMessage.into()) }; if let Some(cap) = self.shared_capabilities().find_by_relative_offset(offset).cloned() { if cap == shared_cap { // delegate to primary let _ = to_primary.send(msg); } else { // delegate to satellite self.inner.delegate_message(&cap, msg); } } else { return Err(P2PStreamError::UnknownReservedMessageId(offset).into()) } } Some(msg) = from_primary.recv() => { self.inner.conn.send(msg).await.map_err(Into::into)?; } res = &mut f => { let (st, extra) = res?; return Ok((RlpxSatelliteStream { inner: self.inner, primary: PrimaryProtocol { to_primary, from_primary: UnboundedReceiverStream::new(from_primary), st, shared_cap, } }, extra)) } } } } /// Converts this multiplexer into a [`RlpxSatelliteStream`] with eth protocol as the given /// primary protocol. pub async fn into_eth_satellite_stream( self, status: Status, fork_filter: ForkFilter, ) -> Result<(RlpxSatelliteStream>, Status), EthStreamError> where St: Stream> + Sink + Unpin, { let eth_cap = self.inner.conn.shared_capabilities().eth_version()?; self.into_satellite_stream_with_tuple_handshake( &Capability::eth(eth_cap), move |proxy| async move { UnauthedEthStream::new(proxy).handshake(status, fork_filter).await }, ) .await } } #[derive(Debug)] struct MultiplexInner { /// The raw p2p stream conn: P2PStream, /// All the subprotocols that are multiplexed on top of the raw p2p stream protocols: Vec, /// Buffer for outgoing messages on the wire. out_buffer: VecDeque, } impl MultiplexInner { const fn shared_capabilities(&self) -> &SharedCapabilities { self.conn.shared_capabilities() } /// Delegates a message to the matching protocol. fn delegate_message(&self, cap: &SharedCapability, msg: BytesMut) -> bool { for proto in &self.protocols { if proto.shared_cap == *cap { proto.send_raw(msg); return true } } false } fn install_protocol( &mut self, cap: &Capability, f: F, ) -> Result<(), UnsupportedCapabilityError> where F: FnOnce(ProtocolConnection) -> Proto, Proto: Stream + Send + 'static, { let shared_cap = self.conn.shared_capabilities().ensure_matching_capability(cap).cloned()?; let (to_satellite, rx) = mpsc::unbounded_channel(); let proto_conn = ProtocolConnection { from_wire: UnboundedReceiverStream::new(rx) }; let st = f(proto_conn); let st = ProtocolStream { shared_cap, to_satellite, satellite_st: Box::pin(st) }; self.protocols.push(st); Ok(()) } } /// Represents a protocol in the multiplexer that is used as the primary protocol. #[derive(Debug)] struct PrimaryProtocol { /// Channel to send messages to the primary protocol. to_primary: UnboundedSender, /// Receiver for messages from the primary protocol. from_primary: UnboundedReceiverStream, /// Shared capability of the primary protocol. shared_cap: SharedCapability, /// The primary stream. st: Primary, } /// A Stream and Sink type that acts as a wrapper around a primary `RLPx` subprotocol (e.g. "eth") /// /// Only emits and sends _non-empty_ messages #[derive(Debug)] pub struct ProtocolProxy { shared_cap: SharedCapability, /// Receives _non-empty_ messages from the wire from_wire: UnboundedReceiverStream, /// Sends _non-empty_ messages from the wire to_wire: UnboundedSender, } impl ProtocolProxy { /// Sends a _non-empty_ message on the wire. fn try_send(&self, msg: Bytes) -> Result<(), io::Error> { if msg.is_empty() { // message must not be empty return Err(io::ErrorKind::InvalidInput.into()) } self.to_wire.send(self.mask_msg_id(msg)?).map_err(|_| io::ErrorKind::BrokenPipe.into()) } /// Masks the message ID of a message to be sent on the wire. #[inline] fn mask_msg_id(&self, msg: Bytes) -> Result { if msg.is_empty() { // message must not be empty return Err(io::ErrorKind::InvalidInput.into()) } let offset = self.shared_cap.relative_message_id_offset(); if offset == 0 { return Ok(msg); } let mut masked = Vec::from(msg); masked[0] = masked[0].checked_add(offset).ok_or(io::ErrorKind::InvalidInput)?; Ok(masked.into()) } /// Unmasks the message ID of a message received from the wire. #[inline] fn unmask_id(&self, mut msg: BytesMut) -> Result { if msg.is_empty() { // message must not be empty return Err(io::ErrorKind::InvalidInput.into()) } msg[0] = msg[0] .checked_sub(self.shared_cap.relative_message_id_offset()) .ok_or(io::ErrorKind::InvalidInput)?; Ok(msg) } } impl Stream for ProtocolProxy { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let msg = ready!(self.from_wire.poll_next_unpin(cx)); Poll::Ready(msg.map(|msg| self.get_mut().unmask_id(msg))) } } impl Sink for ProtocolProxy { type Error = io::Error; fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> { self.get_mut().try_send(item) } fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } impl CanDisconnect for ProtocolProxy { async fn disconnect( &mut self, _reason: DisconnectReason, ) -> Result<(), >::Error> { // TODO handle disconnects Ok(()) } } /// A connection channel to receive _`non_empty`_ messages for the negotiated protocol. /// /// This is a [Stream] that returns raw bytes of the received messages for this protocol. #[derive(Debug)] pub struct ProtocolConnection { from_wire: UnboundedReceiverStream, } impl Stream for ProtocolConnection { type Item = BytesMut; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.from_wire.poll_next_unpin(cx) } } /// A Stream and Sink type that acts as a wrapper around a primary `RLPx` subprotocol (e.g. "eth") /// [`EthStream`] and can also handle additional subprotocols. #[derive(Debug)] pub struct RlpxSatelliteStream { inner: MultiplexInner, primary: PrimaryProtocol, } impl RlpxSatelliteStream { /// Installs a new protocol on top of the raw p2p stream. /// /// This accepts a closure that receives a [`ProtocolConnection`] that will yield messages for /// the given capability. pub fn install_protocol( &mut self, cap: &Capability, f: F, ) -> Result<(), UnsupportedCapabilityError> where F: FnOnce(ProtocolConnection) -> Proto, Proto: Stream + Send + 'static, { self.inner.install_protocol(cap, f) } /// Returns the primary protocol. #[inline] pub const fn primary(&self) -> &Primary { &self.primary.st } /// Returns mutable access to the primary protocol. #[inline] pub fn primary_mut(&mut self) -> &mut Primary { &mut self.primary.st } /// Returns the underlying [`P2PStream`]. #[inline] pub const fn inner(&self) -> &P2PStream { &self.inner.conn } /// Returns mutable access to the underlying [`P2PStream`]. #[inline] pub fn inner_mut(&mut self) -> &mut P2PStream { &mut self.inner.conn } /// Consumes this type and returns the wrapped [`P2PStream`]. #[inline] pub fn into_inner(self) -> P2PStream { self.inner.conn } } impl Stream for RlpxSatelliteStream where St: Stream> + Sink + Unpin, Primary: TryStream + Unpin, P2PStreamError: Into, { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); loop { // first drain the primary stream if let Poll::Ready(Some(msg)) = this.primary.st.try_poll_next_unpin(cx) { return Poll::Ready(Some(msg)) } let mut conn_ready = true; loop { match this.inner.conn.poll_ready_unpin(cx) { Poll::Ready(Ok(())) => { if let Some(msg) = this.inner.out_buffer.pop_front() { if let Err(err) = this.inner.conn.start_send_unpin(msg) { return Poll::Ready(Some(Err(err.into()))) } } else { break } } Poll::Ready(Err(err)) => { if let Err(disconnect_err) = this.inner.conn.start_disconnect(DisconnectReason::DisconnectRequested) { return Poll::Ready(Some(Err(disconnect_err.into()))) } return Poll::Ready(Some(Err(err.into()))) } Poll::Pending => { conn_ready = false; break } } } // advance primary out loop { match this.primary.from_primary.poll_next_unpin(cx) { Poll::Ready(Some(msg)) => { this.inner.out_buffer.push_back(msg); } Poll::Ready(None) => { // primary closed return Poll::Ready(None) } Poll::Pending => break, } } // advance all satellites for idx in (0..this.inner.protocols.len()).rev() { let mut proto = this.inner.protocols.swap_remove(idx); loop { match proto.poll_next_unpin(cx) { Poll::Ready(Some(Err(err))) => { return Poll::Ready(Some(Err(P2PStreamError::Io(err).into()))) } Poll::Ready(Some(Ok(msg))) => { this.inner.out_buffer.push_back(msg); } Poll::Ready(None) => return Poll::Ready(None), Poll::Pending => { this.inner.protocols.push(proto); break } } } } let mut delegated = false; loop { // pull messages from connection match this.inner.conn.poll_next_unpin(cx) { Poll::Ready(Some(Ok(msg))) => { delegated = true; let Some(offset) = msg.first().copied() else { return Poll::Ready(Some(Err( P2PStreamError::EmptyProtocolMessage.into() ))) }; // delegate the multiplexed message to the correct protocol if let Some(cap) = this.inner.conn.shared_capabilities().find_by_relative_offset(offset) { if cap == &this.primary.shared_cap { // delegate to primary let _ = this.primary.to_primary.send(msg); } else { // delegate to installed satellite if any for proto in &this.inner.protocols { if proto.shared_cap == *cap { proto.send_raw(msg); break } } } } else { return Poll::Ready(Some(Err(P2PStreamError::UnknownReservedMessageId( offset, ) .into()))) } } Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))), Poll::Ready(None) => { // connection closed return Poll::Ready(None) } Poll::Pending => break, } } if !conn_ready || (!delegated && this.inner.out_buffer.is_empty()) { return Poll::Pending } } } } impl Sink for RlpxSatelliteStream where St: Stream> + Sink + Unpin, Primary: Sink + Unpin, P2PStreamError: Into<>::Error>, { type Error = >::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); if let Err(err) = ready!(this.inner.conn.poll_ready_unpin(cx)) { return Poll::Ready(Err(err.into())) } if let Err(err) = ready!(this.primary.st.poll_ready_unpin(cx)) { return Poll::Ready(Err(err)) } Poll::Ready(Ok(())) } fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { self.get_mut().primary.st.start_send_unpin(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.get_mut().inner.conn.poll_flush_unpin(cx).map_err(Into::into) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.get_mut().inner.conn.poll_close_unpin(cx).map_err(Into::into) } } /// Wraps a `RLPx` subprotocol and handles message ID multiplexing. struct ProtocolStream { shared_cap: SharedCapability, /// the channel shared with the satellite stream to_satellite: UnboundedSender, satellite_st: Pin + Send>>, } impl ProtocolStream { /// Masks the message ID of a message to be sent on the wire. #[inline] fn mask_msg_id(&self, mut msg: BytesMut) -> Result { if msg.is_empty() { // message must not be empty return Err(io::ErrorKind::InvalidInput.into()) } msg[0] = msg[0] .checked_add(self.shared_cap.relative_message_id_offset()) .ok_or(io::ErrorKind::InvalidInput)?; Ok(msg.freeze()) } /// Unmasks the message ID of a message received from the wire. #[inline] fn unmask_id(&self, mut msg: BytesMut) -> Result { if msg.is_empty() { // message must not be empty return Err(io::ErrorKind::InvalidInput.into()) } msg[0] = msg[0] .checked_sub(self.shared_cap.relative_message_id_offset()) .ok_or(io::ErrorKind::InvalidInput)?; Ok(msg) } /// Sends the message to the satellite stream. fn send_raw(&self, msg: BytesMut) { let _ = self.unmask_id(msg).map(|msg| self.to_satellite.send(msg)); } } impl Stream for ProtocolStream { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); let msg = ready!(this.satellite_st.as_mut().poll_next(cx)); Poll::Ready(msg.map(|msg| this.mask_msg_id(msg))) } } impl fmt::Debug for ProtocolStream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ProtocolStream").field("cap", &self.shared_cap).finish_non_exhaustive() } } #[cfg(test)] mod tests { use super::*; use crate::{ test_utils::{ connect_passthrough, eth_handshake, eth_hello, proto::{test_hello, TestProtoMessage}, }, UnauthedP2PStream, }; use tokio::{net::TcpListener, sync::oneshot}; use tokio_util::codec::Decoder; #[tokio::test] async fn eth_satellite() { reth_tracing::init_test_tracing(); let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let local_addr = listener.local_addr().unwrap(); let (status, fork_filter) = eth_handshake(); let other_status = status; let other_fork_filter = fork_filter.clone(); let _handle = tokio::spawn(async move { let (incoming, _) = listener.accept().await.unwrap(); let stream = crate::PassthroughCodec::default().framed(incoming); let (server_hello, _) = eth_hello(); let (p2p_stream, _) = UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap(); let (_eth_stream, _) = UnauthedEthStream::new(p2p_stream) .handshake(other_status, other_fork_filter) .await .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; }); let conn = connect_passthrough(local_addr, eth_hello().0).await; let eth = conn.shared_capabilities().eth().unwrap().clone(); let multiplexer = RlpxProtocolMultiplexer::new(conn); let _satellite = multiplexer .into_satellite_stream_with_handshake( eth.capability().as_ref(), move |proxy| async move { UnauthedEthStream::new(proxy).handshake(status, fork_filter).await }, ) .await .unwrap(); } /// A test that install a satellite stream eth+test protocol and sends messages between them. #[tokio::test(flavor = "multi_thread")] async fn eth_test_protocol_satellite() { reth_tracing::init_test_tracing(); let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let local_addr = listener.local_addr().unwrap(); let (status, fork_filter) = eth_handshake(); let other_status = status; let other_fork_filter = fork_filter.clone(); let _handle = tokio::spawn(async move { let (incoming, _) = listener.accept().await.unwrap(); let stream = crate::PassthroughCodec::default().framed(incoming); let (server_hello, _) = test_hello(); let (conn, _) = UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap(); let (mut st, _their_status) = RlpxProtocolMultiplexer::new(conn) .into_eth_satellite_stream(other_status, other_fork_filter) .await .unwrap(); st.install_protocol(&TestProtoMessage::capability(), |mut conn| { async_stream::stream! { yield TestProtoMessage::ping().encoded(); let msg = conn.next().await.unwrap(); let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap(); assert_eq!(msg, TestProtoMessage::pong()); yield TestProtoMessage::message("hello").encoded(); let msg = conn.next().await.unwrap(); let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap(); assert_eq!(msg, TestProtoMessage::message("good bye!")); yield TestProtoMessage::message("good bye!").encoded(); futures::future::pending::<()>().await; unreachable!() } }) .unwrap(); loop { let _ = st.next().await; } }); let conn = connect_passthrough(local_addr, test_hello().0).await; let (mut st, _their_status) = RlpxProtocolMultiplexer::new(conn) .into_eth_satellite_stream(status, fork_filter) .await .unwrap(); let (tx, mut rx) = oneshot::channel(); st.install_protocol(&TestProtoMessage::capability(), |mut conn| { async_stream::stream! { let msg = conn.next().await.unwrap(); let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap(); assert_eq!(msg, TestProtoMessage::ping()); yield TestProtoMessage::pong().encoded(); let msg = conn.next().await.unwrap(); let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap(); assert_eq!(msg, TestProtoMessage::message("hello")); yield TestProtoMessage::message("good bye!").encoded(); let msg = conn.next().await.unwrap(); let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap(); assert_eq!(msg, TestProtoMessage::message("good bye!")); tx.send(()).unwrap(); futures::future::pending::<()>().await; unreachable!() } }) .unwrap(); loop { tokio::select! { _ = &mut rx => { break } _ = st.next() => { } } } } }