//! Blocks/Headers management for the p2p network. use std::{ future::Future, pin::Pin, task::{Context, Poll}, time::Duration, }; use alloy_rlp::Encodable; use futures::StreamExt; use reth_eth_wire::{ BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData, GetReceipts, HeadersDirection, NodeData, Receipts, }; use reth_network_api::test_utils::PeersHandle; use reth_network_p2p::error::RequestResult; use reth_network_peers::PeerId; use reth_primitives::{BlockBody, BlockHashOrNumber, Header}; use reth_storage_api::{BlockReader, HeaderProvider, ReceiptProvider}; use tokio::sync::{mpsc::Receiver, oneshot}; use tokio_stream::wrappers::ReceiverStream; use crate::{ budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget, metrics::EthRequestHandlerMetrics, }; // Limits: /// Maximum number of receipts to serve. /// /// Used to limit lookups. const MAX_RECEIPTS_SERVE: usize = 1024; /// Maximum number of block headers to serve. /// /// Used to limit lookups. const MAX_HEADERS_SERVE: usize = 1024; /// Maximum number of block headers to serve. /// /// Used to limit lookups. With 24KB block sizes nowadays, the practical limit will always be /// `SOFT_RESPONSE_LIMIT`. const MAX_BODIES_SERVE: usize = 1024; /// Maximum size of replies to data retrievals. const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024; /// Manages eth related requests on top of the p2p network. /// /// This can be spawned to another task and is supposed to be run as background service. #[derive(Debug)] #[must_use = "Manager does nothing unless polled."] pub struct EthRequestHandler { /// The client type that can interact with the chain. client: C, /// Used for reporting peers. // TODO use to report spammers #[allow(dead_code)] peers: PeersHandle, /// Incoming request from the [`NetworkManager`](crate::NetworkManager). incoming_requests: ReceiverStream, /// Metrics for the eth request handler. metrics: EthRequestHandlerMetrics, } // === impl EthRequestHandler === impl EthRequestHandler { /// Create a new instance pub fn new(client: C, peers: PeersHandle, incoming: Receiver) -> Self { Self { client, peers, incoming_requests: ReceiverStream::new(incoming), metrics: Default::default(), } } } impl EthRequestHandler where C: BlockReader + HeaderProvider + ReceiptProvider, { /// Returns the list of requested headers fn get_headers_response(&self, request: GetBlockHeaders) -> Vec
{ let GetBlockHeaders { start_block, limit, skip, direction } = request; let mut headers = Vec::new(); let mut block: BlockHashOrNumber = match start_block { BlockHashOrNumber::Hash(start) => start.into(), BlockHashOrNumber::Number(num) => { let Some(hash) = self.client.block_hash(num).unwrap_or_default() else { return headers }; hash.into() } }; let skip = skip as u64; let mut total_bytes = 0; for _ in 0..limit { if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() { match direction { HeadersDirection::Rising => { if let Some(next) = (header.number + 1).checked_add(skip) { block = next.into() } else { break } } HeadersDirection::Falling => { if skip > 0 { // prevent under flows for block.number == 0 and `block.number - skip < // 0` if let Some(next) = header.number.checked_sub(1).and_then(|num| num.checked_sub(skip)) { block = next.into() } else { break } } else { block = header.parent_hash.into() } } } total_bytes += header.length(); headers.push(header); if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT { break } } else { break } } headers } fn on_headers_request( &self, _peer_id: PeerId, request: GetBlockHeaders, response: oneshot::Sender>, ) { self.metrics.eth_headers_requests_received_total.increment(1); let headers = self.get_headers_response(request); let _ = response.send(Ok(BlockHeaders(headers))); } fn on_bodies_request( &self, _peer_id: PeerId, request: GetBlockBodies, response: oneshot::Sender>, ) { self.metrics.eth_bodies_requests_received_total.increment(1); let mut bodies = Vec::new(); let mut total_bytes = 0; for hash in request.0 { if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() { let body: BlockBody = block.into(); total_bytes += body.length(); bodies.push(body); if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT { break } } else { break } } let _ = response.send(Ok(BlockBodies(bodies))); } fn on_receipts_request( &self, _peer_id: PeerId, request: GetReceipts, response: oneshot::Sender>, ) { self.metrics.eth_receipts_requests_received_total.increment(1); let mut receipts = Vec::new(); let mut total_bytes = 0; for hash in request.0 { if let Some(receipts_by_block) = self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default() { let receipt = receipts_by_block .into_iter() .map(|receipt| receipt.with_bloom()) .collect::>(); total_bytes += receipt.length(); receipts.push(receipt); if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT { break } } else { break } } let _ = response.send(Ok(Receipts(receipts))); } } /// An endless future. /// /// This should be spawned or used as part of `tokio::select!`. impl Future for EthRequestHandler where C: BlockReader + HeaderProvider + Unpin, { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); let mut acc = Duration::ZERO; let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!( acc, "net::eth", "Incoming eth requests stream", DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, this.incoming_requests.poll_next_unpin(cx), |incoming| { match incoming { IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => { this.on_headers_request(peer_id, request, response) } IncomingEthRequest::GetBlockBodies { peer_id, request, response } => { this.on_bodies_request(peer_id, request, response) } IncomingEthRequest::GetNodeData { .. } => { this.metrics.eth_node_data_requests_received_total.increment(1); } IncomingEthRequest::GetReceipts { peer_id, request, response } => { this.on_receipts_request(peer_id, request, response) } } }, ); this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64()); // stream is fully drained and import futures pending if maybe_more_incoming_requests { // make sure we're woken up again cx.waker().wake_by_ref(); return Poll::Pending } Poll::Pending } } /// All `eth` request related to blocks delegated by the network. #[derive(Debug)] pub enum IncomingEthRequest { /// Request Block headers from the peer. /// /// The response should be sent through the channel. GetBlockHeaders { /// The ID of the peer to request block headers from. peer_id: PeerId, /// The specific block headers requested. request: GetBlockHeaders, /// The channel sender for the response containing block headers. response: oneshot::Sender>, }, /// Request Block bodies from the peer. /// /// The response should be sent through the channel. GetBlockBodies { /// The ID of the peer to request block bodies from. peer_id: PeerId, /// The specific block bodies requested. request: GetBlockBodies, /// The channel sender for the response containing block bodies. response: oneshot::Sender>, }, /// Request Node Data from the peer. /// /// The response should be sent through the channel. GetNodeData { /// The ID of the peer to request node data from. peer_id: PeerId, /// The specific node data requested. request: GetNodeData, /// The channel sender for the response containing node data. response: oneshot::Sender>, }, /// Request Receipts from the peer. /// /// The response should be sent through the channel. GetReceipts { /// The ID of the peer to request receipts from. peer_id: PeerId, /// The specific receipts requested. request: GetReceipts, /// The channel sender for the response containing receipts. response: oneshot::Sender>, }, }