//! Fetch data from the network. mod client; pub use client::FetchClient; use std::{ collections::{HashMap, VecDeque}, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, }, task::{Context, Poll}, }; use futures::StreamExt; use reth_eth_wire::{GetBlockBodies, GetBlockHeaders}; use reth_network_api::test_utils::PeersHandle; use reth_network_p2p::{ error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult}, headers::client::HeadersRequest, priority::Priority, }; use reth_network_peers::PeerId; use reth_network_types::ReputationChangeKind; use reth_primitives::{BlockBody, Header, B256}; use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use crate::message::BlockRequest; /// Manages data fetching operations. /// /// This type is hooked into the staged sync pipeline and delegates download request to available /// peers and sends the response once ready. /// /// This type maintains a list of connected peers that are available for requests. #[derive(Debug)] pub struct StateFetcher { /// Currently active [`GetBlockHeaders`] requests inflight_headers_requests: HashMap>>>, /// Currently active [`GetBlockBodies`] requests inflight_bodies_requests: HashMap, PeerRequestResult>>>, /// The list of _available_ peers for requests. peers: HashMap, /// The handle to the peers manager peers_handle: PeersHandle, /// Number of active peer sessions the node's currently handling. num_active_peers: Arc, /// Requests queued for processing queued_requests: VecDeque, /// Receiver for new incoming download requests download_requests_rx: UnboundedReceiverStream, /// Sender for download requests, used to detach a [`FetchClient`] download_requests_tx: UnboundedSender, } // === impl StateSyncer === impl StateFetcher { pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc) -> Self { let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel(); Self { inflight_headers_requests: Default::default(), inflight_bodies_requests: Default::default(), peers: Default::default(), peers_handle, num_active_peers, queued_requests: Default::default(), download_requests_rx: UnboundedReceiverStream::new(download_requests_rx), download_requests_tx, } } /// Invoked when connected to a new peer. pub(crate) fn new_active_peer( &mut self, peer_id: PeerId, best_hash: B256, best_number: u64, timeout: Arc, ) { self.peers.insert( peer_id, Peer { state: PeerState::Idle, best_hash, best_number, timeout, last_response_likely_bad: false, }, ); } /// Removes the peer from the peer list, after which it is no longer available for future /// requests. /// /// Invoked when an active session was closed. /// /// This cancels also inflight request and sends an error to the receiver. pub(crate) fn on_session_closed(&mut self, peer: &PeerId) { self.peers.remove(peer); if let Some(req) = self.inflight_headers_requests.remove(peer) { let _ = req.response.send(Err(RequestError::ConnectionDropped)); } if let Some(req) = self.inflight_bodies_requests.remove(peer) { let _ = req.response.send(Err(RequestError::ConnectionDropped)); } } /// Updates the block information for the peer. /// /// Returns `true` if this a newer block pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool { if let Some(peer) = self.peers.get_mut(peer_id) { if number > peer.best_number { peer.best_hash = hash; peer.best_number = number; return true } } false } /// Invoked when an active session is about to be disconnected. pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) { if let Some(peer) = self.peers.get_mut(peer_id) { peer.state = PeerState::Closing; } } /// Returns the _next_ idle peer that's ready to accept a request, /// prioritizing those with the lowest timeout/latency and those that recently responded with /// adequate data. fn next_best_peer(&self) -> Option { let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle()); let mut best_peer = idle.next()?; for maybe_better in idle { // replace best peer if our current best peer sent us a bad response last time if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad { best_peer = maybe_better; continue } // replace best peer if this peer has better rtt if maybe_better.1.timeout() < best_peer.1.timeout() && !maybe_better.1.last_response_likely_bad { best_peer = maybe_better; } } Some(*best_peer.0) } /// Returns the next action to return fn poll_action(&mut self) -> PollAction { // we only check and not pop here since we don't know yet whether a peer is available. if self.queued_requests.is_empty() { return PollAction::NoRequests } let Some(peer_id) = self.next_best_peer() else { return PollAction::NoPeersAvailable }; let request = self.queued_requests.pop_front().expect("not empty"); let request = self.prepare_block_request(peer_id, request); PollAction::Ready(FetchAction::BlockRequest { peer_id, request }) } /// Advance the state the syncer pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { // drain buffered actions first loop { let no_peers_available = match self.poll_action() { PollAction::Ready(action) => return Poll::Ready(action), PollAction::NoRequests => false, PollAction::NoPeersAvailable => true, }; loop { // poll incoming requests match self.download_requests_rx.poll_next_unpin(cx) { Poll::Ready(Some(request)) => match request.get_priority() { Priority::High => { // find the first normal request and queue before, add this request to // the back of the high-priority queue let pos = self .queued_requests .iter() .position(|req| req.is_normal_priority()) .unwrap_or(0); self.queued_requests.insert(pos, request); } Priority::Normal => { self.queued_requests.push_back(request); } }, Poll::Ready(None) => { unreachable!("channel can't close") } Poll::Pending => break, } } if self.queued_requests.is_empty() || no_peers_available { return Poll::Pending } } } /// Handles a new request to a peer. /// /// Caution: this assumes the peer exists and is idle fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest) -> BlockRequest { // update the peer's state if let Some(peer) = self.peers.get_mut(&peer_id) { peer.state = req.peer_state(); } match req { DownloadRequest::GetBlockHeaders { request, response, .. } => { let inflight = Request { request: request.clone(), response }; self.inflight_headers_requests.insert(peer_id, inflight); let HeadersRequest { start, limit, direction } = request; BlockRequest::GetBlockHeaders(GetBlockHeaders { start_block: start, limit, skip: 0, direction, }) } DownloadRequest::GetBlockBodies { request, response, .. } => { let inflight = Request { request: request.clone(), response }; self.inflight_bodies_requests.insert(peer_id, inflight); BlockRequest::GetBlockBodies(GetBlockBodies(request)) } } } /// Returns a new followup request for the peer. /// /// Caution: this expects that the peer is _not_ closed. fn followup_request(&mut self, peer_id: PeerId) -> Option { let req = self.queued_requests.pop_front()?; let req = self.prepare_block_request(peer_id, req); Some(BlockResponseOutcome::Request(peer_id, req)) } /// Called on a `GetBlockHeaders` response from a peer. /// /// This delegates the response and returns a [`BlockResponseOutcome`] to either queue in a /// direct followup request or get the peer reported if the response was a /// [`EthResponseValidator::reputation_change_err`] pub(crate) fn on_block_headers_response( &mut self, peer_id: PeerId, res: RequestResult>, ) -> Option { let is_error = res.is_err(); let maybe_reputation_change = res.reputation_change_err(); let resp = self.inflight_headers_requests.remove(&peer_id); let is_likely_bad_response = resp .as_ref() .map(|r| res.is_likely_bad_headers_response(&r.request)) .unwrap_or_default(); if let Some(resp) = resp { // delegate the response let _ = resp.response.send(res.map(|h| (peer_id, h).into())); } if let Some(peer) = self.peers.get_mut(&peer_id) { // update the peer's response state peer.last_response_likely_bad = is_likely_bad_response; // If the peer is still ready to accept new requests, we try to send a followup // request immediately. if peer.state.on_request_finished() && !is_error && !is_likely_bad_response { return self.followup_request(peer_id) } } // if the response was an `Err` worth reporting the peer for then we return a `BadResponse` // outcome maybe_reputation_change .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change)) } /// Called on a `GetBlockBodies` response from a peer pub(crate) fn on_block_bodies_response( &mut self, peer_id: PeerId, res: RequestResult>, ) -> Option { let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty()); if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) { let _ = resp.response.send(res.map(|b| (peer_id, b).into())); } if let Some(peer) = self.peers.get_mut(&peer_id) { // update the peer's response state peer.last_response_likely_bad = is_likely_bad_response; if peer.state.on_request_finished() && !is_likely_bad_response { return self.followup_request(peer_id) } } None } /// Returns a new [`FetchClient`] that can send requests to this type. pub(crate) fn client(&self) -> FetchClient { FetchClient { request_tx: self.download_requests_tx.clone(), peers_handle: self.peers_handle.clone(), num_active_peers: Arc::clone(&self.num_active_peers), } } } /// The outcome of [`StateFetcher::poll_action`] enum PollAction { Ready(FetchAction), NoRequests, NoPeersAvailable, } /// Represents a connected peer #[derive(Debug)] struct Peer { /// The state this peer currently resides in. state: PeerState, /// Best known hash that the peer has best_hash: B256, /// Tracks the best number of the peer. best_number: u64, /// Tracks the current timeout value we use for the peer. timeout: Arc, /// Tracks whether the peer has recently responded with a likely bad response. /// /// This is used to de-rank the peer if there are other peers available. /// This exists because empty responses may not be penalized (e.g. when blocks near the tip are /// downloaded), but we still want to avoid requesting from the same peer again if it has the /// lowest timeout. last_response_likely_bad: bool, } impl Peer { fn timeout(&self) -> u64 { self.timeout.load(Ordering::Relaxed) } } /// Tracks the state of an individual peer #[derive(Debug)] enum PeerState { /// Peer is currently not handling requests and is available. Idle, /// Peer is handling a `GetBlockHeaders` request. GetBlockHeaders, /// Peer is handling a `GetBlockBodies` request. GetBlockBodies, /// Peer session is about to close Closing, } // === impl PeerState === impl PeerState { /// Returns true if the peer is currently idle. const fn is_idle(&self) -> bool { matches!(self, Self::Idle) } /// Resets the state on a received response. /// /// If the state was already marked as `Closing` do nothing. /// /// Returns `true` if the peer is ready for another request. fn on_request_finished(&mut self) -> bool { if !matches!(self, Self::Closing) { *self = Self::Idle; return true } false } } /// A request that waits for a response from the network, so it can send it back through the /// response channel. #[derive(Debug)] struct Request { /// The issued request object // TODO: this can be attached to the response in error case #[allow(dead_code)] request: Req, response: oneshot::Sender, } /// Requests that can be sent to the Syncer from a [`FetchClient`] #[derive(Debug)] pub(crate) enum DownloadRequest { /// Download the requested headers and send response through channel GetBlockHeaders { request: HeadersRequest, response: oneshot::Sender>>, priority: Priority, }, /// Download the requested headers and send response through channel GetBlockBodies { request: Vec, response: oneshot::Sender>>, priority: Priority, }, } // === impl DownloadRequest === impl DownloadRequest { /// Returns the corresponding state for a peer that handles the request. const fn peer_state(&self) -> PeerState { match self { Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders, Self::GetBlockBodies { .. } => PeerState::GetBlockBodies, } } /// Returns the requested priority of this request const fn get_priority(&self) -> &Priority { match self { Self::GetBlockHeaders { priority, .. } | Self::GetBlockBodies { priority, .. } => { priority } } } /// Returns `true` if this request is normal priority. const fn is_normal_priority(&self) -> bool { self.get_priority().is_normal() } } /// An action the syncer can emit. pub(crate) enum FetchAction { /// Dispatch an eth request to the given peer. BlockRequest { /// The targeted recipient for the request peer_id: PeerId, /// The request to send request: BlockRequest, }, } /// Outcome of a processed response. /// /// Returned after processing a response. #[derive(Debug, PartialEq, Eq)] pub(crate) enum BlockResponseOutcome { /// Continue with another request to the peer. Request(PeerId, BlockRequest), /// How to handle a bad response and the reputation change to apply, if any. BadResponse(PeerId, ReputationChangeKind), } #[cfg(test)] mod tests { use super::*; use crate::{peers::PeersManager, PeersConfig}; use reth_primitives::{SealedHeader, B512}; use std::future::poll_fn; #[tokio::test(flavor = "multi_thread")] async fn test_poll_fetcher() { let manager = PeersManager::new(PeersConfig::default()); let mut fetcher = StateFetcher::new(manager.handle(), Default::default()); poll_fn(move |cx| { assert!(fetcher.poll(cx).is_pending()); let (tx, _rx) = oneshot::channel(); fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies { request: vec![], response: tx, priority: Priority::default(), }); assert!(fetcher.poll(cx).is_pending()); Poll::Ready(()) }) .await; } #[tokio::test] async fn test_peer_rotation() { let manager = PeersManager::new(PeersConfig::default()); let mut fetcher = StateFetcher::new(manager.handle(), Default::default()); // Add a few random peers let peer1 = B512::random(); let peer2 = B512::random(); fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(1))); fetcher.new_active_peer(peer2, B256::random(), 2, Arc::new(AtomicU64::new(1))); let first_peer = fetcher.next_best_peer().unwrap(); assert!(first_peer == peer1 || first_peer == peer2); // Pending disconnect for first_peer fetcher.on_pending_disconnect(&first_peer); // first_peer now isn't idle, so we should get other peer let second_peer = fetcher.next_best_peer().unwrap(); assert!(first_peer == peer1 || first_peer == peer2); assert_ne!(first_peer, second_peer); // without idle peers, returns None fetcher.on_pending_disconnect(&second_peer); assert_eq!(fetcher.next_best_peer(), None); } #[tokio::test] async fn test_peer_prioritization() { let manager = PeersManager::new(PeersConfig::default()); let mut fetcher = StateFetcher::new(manager.handle(), Default::default()); // Add a few random peers let peer1 = B512::random(); let peer2 = B512::random(); let peer3 = B512::random(); let peer2_timeout = Arc::new(AtomicU64::new(300)); fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(30))); fetcher.new_active_peer(peer2, B256::random(), 2, Arc::clone(&peer2_timeout)); fetcher.new_active_peer(peer3, B256::random(), 3, Arc::new(AtomicU64::new(50))); // Must always get peer1 (lowest timeout) assert_eq!(fetcher.next_best_peer(), Some(peer1)); assert_eq!(fetcher.next_best_peer(), Some(peer1)); // peer2's timeout changes below peer1's peer2_timeout.store(10, Ordering::Relaxed); // Then we get peer 2 always (now lowest) assert_eq!(fetcher.next_best_peer(), Some(peer2)); assert_eq!(fetcher.next_best_peer(), Some(peer2)); } #[tokio::test] async fn test_on_block_headers_response() { let manager = PeersManager::new(PeersConfig::default()); let mut fetcher = StateFetcher::new(manager.handle(), Default::default()); let peer_id = B512::random(); assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None); assert_eq!( fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)), Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout)) ); assert_eq!( fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)), None ); assert_eq!( fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)), None ); assert_eq!( fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)), None ); assert_eq!( fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)), None ); } #[tokio::test] async fn test_header_response_outcome() { let manager = PeersManager::new(PeersConfig::default()); let mut fetcher = StateFetcher::new(manager.handle(), Default::default()); let peer_id = B512::random(); let request_pair = || { let (tx, _rx) = oneshot::channel(); let req = Request { request: HeadersRequest { start: 0u64.into(), limit: 1, direction: Default::default(), }, response: tx, }; let mut header = SealedHeader::default().unseal(); header.number = 0u64; (req, header) }; fetcher.new_active_peer( peer_id, Default::default(), Default::default(), Default::default(), ); let (req, header) = request_pair(); fetcher.inflight_headers_requests.insert(peer_id, req); let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header])); assert!(outcome.is_none()); assert!(fetcher.peers[&peer_id].state.is_idle()); let outcome = fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap(); assert!(EthResponseValidator::reputation_change_err(&Err(RequestError::Timeout)).is_some()); match outcome { BlockResponseOutcome::BadResponse(peer, _) => { assert_eq!(peer, peer_id) } BlockResponseOutcome::Request(_, _) => { unreachable!() } }; assert!(fetcher.peers[&peer_id].state.is_idle()); } }