//! Implementation of [EIP-1459](https://eips.ethereum.org/EIPS/eip-1459) Node Discovery via DNS. //! //! ## Feature Flags //! //! - `serde` (default): Enable serde support //! - `test-utils`: Export utilities for testing #![doc( html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" )] #![cfg_attr(not(test), warn(unused_crate_dependencies))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] pub use crate::resolver::{DnsResolver, MapResolver, Resolver}; use crate::{ query::{QueryOutcome, QueryPool, ResolveEntryResult, ResolveRootResult}, sync::{ResolveKind, SyncAction}, tree::{DnsEntry, LinkEntry}, }; pub use config::DnsDiscoveryConfig; use enr::Enr; pub use error::ParseDnsEntryError; use reth_ethereum_forks::{EnrForkIdEntry, ForkId}; use reth_network_peers::{pk2id, NodeRecord}; use schnellru::{ByLength, LruMap}; use secp256k1::SecretKey; use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, net::IpAddr, pin::Pin, sync::Arc, task::{ready, Context, Poll}, time::{Duration, Instant}, }; use sync::SyncTree; use tokio::{ sync::{ mpsc, mpsc::{error::TrySendError, UnboundedSender}, oneshot, }, task::JoinHandle, }; use tokio_stream::{ wrappers::{ReceiverStream, UnboundedReceiverStream}, Stream, StreamExt, }; use tracing::{debug, trace}; mod config; mod error; mod query; pub mod resolver; mod sync; pub mod tree; /// [`DnsDiscoveryService`] front-end. #[derive(Clone, Debug)] pub struct DnsDiscoveryHandle { /// Channel for sending commands to the service. to_service: UnboundedSender, } // === impl DnsDiscovery === impl DnsDiscoveryHandle { /// Starts syncing the given link to a tree. pub fn sync_tree(&self, link: &str) -> Result<(), ParseDnsEntryError> { self.sync_tree_with_link(link.parse()?); Ok(()) } /// Starts syncing the given link to a tree. pub fn sync_tree_with_link(&self, link: LinkEntry) { let _ = self.to_service.send(DnsDiscoveryCommand::SyncTree(link)); } /// Returns the receiver half of new listener channel that streams discovered [`NodeRecord`]s. pub async fn node_record_stream( &self, ) -> Result, oneshot::error::RecvError> { let (tx, rx) = oneshot::channel(); let cmd = DnsDiscoveryCommand::NodeRecordUpdates(tx); let _ = self.to_service.send(cmd); rx.await } } /// A client that discovers nodes via DNS. #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct DnsDiscoveryService { /// Copy of the sender half, so new [`DnsDiscoveryHandle`] can be created on demand. command_tx: UnboundedSender, /// Receiver half of the command channel. command_rx: UnboundedReceiverStream, /// All subscribers for resolved [`NodeRecord`]s. node_record_listeners: Vec>, /// All the trees that can be synced. trees: HashMap, /// All queries currently in progress queries: QueryPool, /// Cached dns records dns_record_cache: LruMap>, /// all buffered events queued_events: VecDeque, /// The rate at which trees should be updated. recheck_interval: Duration, /// Links to the DNS networks to bootstrap. bootstrap_dns_networks: HashSet, } // === impl DnsDiscoveryService === impl DnsDiscoveryService { /// Creates a new instance of the [`DnsDiscoveryService`] using the given settings. /// /// ``` /// use reth_dns_discovery::{DnsDiscoveryService, DnsResolver}; /// use std::sync::Arc; /// # fn t() { /// let service = DnsDiscoveryService::new( /// Arc::new(DnsResolver::from_system_conf().unwrap()), /// Default::default(), /// ); /// # } /// ``` pub fn new(resolver: Arc, config: DnsDiscoveryConfig) -> Self { let DnsDiscoveryConfig { lookup_timeout, max_requests_per_sec, recheck_interval, dns_record_cache_limit, bootstrap_dns_networks, } = config; let queries = QueryPool::new(resolver, max_requests_per_sec, lookup_timeout); let (command_tx, command_rx) = mpsc::unbounded_channel(); Self { command_tx, command_rx: UnboundedReceiverStream::new(command_rx), node_record_listeners: Default::default(), trees: Default::default(), queries, dns_record_cache: LruMap::new(ByLength::new(dns_record_cache_limit.get())), queued_events: Default::default(), recheck_interval, bootstrap_dns_networks: bootstrap_dns_networks.unwrap_or_default(), } } /// Spawns this services onto a new task /// /// Note: requires a running runtime pub fn spawn(mut self) -> JoinHandle<()> { tokio::task::spawn(async move { self.bootstrap(); while let Some(event) = self.next().await { trace!(target: "disc::dns", ?event, "processed"); } }) } /// Starts discovery with all configured bootstrap links pub fn bootstrap(&mut self) { for link in self.bootstrap_dns_networks.clone() { self.sync_tree_with_link(link); } } /// Same as [`DnsDiscoveryService::new`] but also returns a new handle that's connected to the /// service pub fn new_pair(resolver: Arc, config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) { let service = Self::new(resolver, config); let handle = service.handle(); (service, handle) } /// Returns a new [`DnsDiscoveryHandle`] that can send commands to this type. pub fn handle(&self) -> DnsDiscoveryHandle { DnsDiscoveryHandle { to_service: self.command_tx.clone() } } /// Creates a new channel for [`NodeRecord`]s. pub fn node_record_stream(&mut self) -> ReceiverStream { let (tx, rx) = mpsc::channel(256); self.node_record_listeners.push(tx); ReceiverStream::new(rx) } /// Sends the event to all listeners. /// /// Remove channels that got closed. fn notify(&mut self, record: DnsNodeRecordUpdate) { self.node_record_listeners.retain_mut(|listener| match listener.try_send(record.clone()) { Ok(()) => true, Err(err) => match err { TrySendError::Full(_) => true, TrySendError::Closed(_) => false, }, }); } /// Starts syncing the given link to a tree. pub fn sync_tree(&mut self, link: &str) -> Result<(), ParseDnsEntryError> { self.sync_tree_with_link(link.parse()?); Ok(()) } /// Starts syncing the given link to a tree. pub fn sync_tree_with_link(&mut self, link: LinkEntry) { self.queries.resolve_root(link); } /// Resolves an entry fn resolve_entry(&mut self, link: LinkEntry, hash: String, kind: ResolveKind) { if let Some(entry) = self.dns_record_cache.get(&hash).cloned() { // already resolved let cached = ResolveEntryResult { entry: Some(Ok(entry)), link, hash, kind }; self.on_resolved_entry(cached); return } self.queries.resolve_entry(link, hash, kind) } fn on_resolved_root(&mut self, resp: ResolveRootResult) { match resp { Ok((root, link)) => match self.trees.entry(link.clone()) { Entry::Occupied(mut entry) => { entry.get_mut().update_root(root); } Entry::Vacant(entry) => { entry.insert(SyncTree::new(root, link)); } }, Err((err, link)) => { debug!(target: "disc::dns",%err, ?link, "Failed to lookup root") } } } fn on_resolved_enr(&mut self, enr: Enr) { if let Some(record) = convert_enr_node_record(&enr) { self.notify(record); } self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr)) } fn on_resolved_entry(&mut self, resp: ResolveEntryResult) { let ResolveEntryResult { entry, link, hash, kind } = resp; match entry { Some(Err(err)) => { debug!(target: "disc::dns",%err, domain=%link.domain, ?hash, "Failed to lookup entry") } None => { trace!(target: "disc::dns",domain=%link.domain, ?hash, "No dns entry") } Some(Ok(entry)) => { // cache entry self.dns_record_cache.insert(hash.clone(), entry.clone()); match entry { DnsEntry::Root(root) => { debug!(target: "disc::dns",%root, domain=%link.domain, ?hash, "resolved unexpected root entry"); } DnsEntry::Link(link_entry) => { if kind.is_link() { if let Some(tree) = self.trees.get_mut(&link) { tree.resolved_links_mut().insert(hash, link_entry.clone()); } self.sync_tree_with_link(link_entry) } else { debug!(target: "disc::dns",%link_entry, domain=%link.domain, ?hash, "resolved unexpected Link entry"); } } DnsEntry::Branch(branch_entry) => { if let Some(tree) = self.trees.get_mut(&link) { tree.extend_children(kind, branch_entry.children) } } DnsEntry::Node(entry) => { if kind.is_link() { debug!(target: "disc::dns",domain=%link.domain, ?hash, "resolved unexpected enr entry"); } else { self.on_resolved_enr(entry.enr) } } } } } } /// Advances the state of the DNS discovery service by polling,triggering lookups pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { loop { // drain buffered events first if let Some(event) = self.queued_events.pop_front() { return Poll::Ready(event) } // process all incoming commands while let Poll::Ready(Some(cmd)) = Pin::new(&mut self.command_rx).poll_next(cx) { match cmd { DnsDiscoveryCommand::SyncTree(link) => { self.sync_tree_with_link(link); } DnsDiscoveryCommand::NodeRecordUpdates(tx) => { let _ = tx.send(self.node_record_stream()); } } } while let Poll::Ready(outcome) = self.queries.poll(cx) { // handle query outcome match outcome { QueryOutcome::Root(resp) => self.on_resolved_root(resp), QueryOutcome::Entry(resp) => self.on_resolved_entry(resp), } } let mut progress = false; let now = Instant::now(); let mut pending_resolves = Vec::new(); let mut pending_updates = Vec::new(); for tree in self.trees.values_mut() { while let Some(action) = tree.poll(now, self.recheck_interval) { progress = true; match action { SyncAction::UpdateRoot => { pending_updates.push(tree.link().clone()); } SyncAction::Enr(hash) => { pending_resolves.push((tree.link().clone(), hash, ResolveKind::Enr)); } SyncAction::Link(hash) => { pending_resolves.push((tree.link().clone(), hash, ResolveKind::Link)); } } } } for (domain, hash, kind) in pending_resolves { self.resolve_entry(domain, hash, kind) } for link in pending_updates { self.sync_tree_with_link(link) } if !progress && self.queued_events.is_empty() { return Poll::Pending } } } } /// A Stream events, mainly used for debugging impl Stream for DnsDiscoveryService { type Item = DnsDiscoveryEvent; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Poll::Ready(Some(ready!(self.get_mut().poll(cx)))) } } /// The converted discovered [Enr] object #[derive(Debug, Clone, Eq, PartialEq)] pub struct DnsNodeRecordUpdate { /// Discovered node and it's addresses pub node_record: NodeRecord, /// The forkid of the node, if present in the ENR pub fork_id: Option, /// Original [`Enr`]. pub enr: Enr, } /// Commands sent from [`DnsDiscoveryHandle`] to [`DnsDiscoveryService`] enum DnsDiscoveryCommand { /// Sync a tree SyncTree(LinkEntry), NodeRecordUpdates(oneshot::Sender>), } /// Represents dns discovery related update events. #[derive(Debug, Clone)] pub enum DnsDiscoveryEvent { /// Resolved an Enr entry via DNS. Enr(Enr), } /// Converts an [Enr] into a [`NodeRecord`] fn convert_enr_node_record(enr: &Enr) -> Option { let node_record = NodeRecord { address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?, tcp_port: enr.tcp4().or_else(|| enr.tcp6())?, udp_port: enr.udp4().or_else(|| enr.udp6())?, id: pk2id(&enr.public_key()), } .into_ipv4_mapped(); let fork_id = enr.get_decodable::(b"eth").transpose().ok().flatten().map(Into::into); Some(DnsNodeRecordUpdate { node_record, fork_id, enr: enr.clone() }) } #[cfg(test)] mod tests { use super::*; use crate::tree::TreeRootEntry; use alloy_chains::Chain; use alloy_rlp::{Decodable, Encodable}; use enr::EnrKey; use reth_chainspec::MAINNET; use reth_ethereum_forks::{EthereumHardfork, ForkHash}; use secp256k1::rand::thread_rng; use std::{future::poll_fn, net::Ipv4Addr}; #[test] fn test_convert_enr_node_record() { // rig let secret_key = SecretKey::new(&mut secp256k1::rand::thread_rng()); let enr = Enr::builder() .ip("127.0.0.1".parse().unwrap()) .udp4(9000) .tcp4(30303) .add_value(b"eth", &EnrForkIdEntry::from(MAINNET.latest_fork_id())) .build(&secret_key) .unwrap(); // test let node_record_update = convert_enr_node_record(&enr).unwrap(); assert_eq!(node_record_update.node_record.address, "127.0.0.1".parse::().unwrap()); assert_eq!(node_record_update.node_record.tcp_port, 30303); assert_eq!(node_record_update.node_record.udp_port, 9000); assert_eq!(node_record_update.fork_id, Some(MAINNET.latest_fork_id())); assert_eq!(node_record_update.enr, enr); } #[test] fn test_decode_and_convert_enr_node_record() { // rig let secret_key = SecretKey::new(&mut secp256k1::rand::thread_rng()); let enr = Enr::builder() .ip("127.0.0.1".parse().unwrap()) .udp4(9000) .tcp4(30303) .add_value(b"eth", &EnrForkIdEntry::from(MAINNET.latest_fork_id())) .add_value(b"opstack", &ForkId { hash: ForkHash(rand::random()), next: rand::random() }) .build(&secret_key) .unwrap(); let mut encoded_enr = vec![]; enr.encode(&mut encoded_enr); // test let decoded_enr = Enr::decode(&mut &encoded_enr[..]).unwrap(); let node_record_update = convert_enr_node_record(&decoded_enr).unwrap(); assert_eq!(node_record_update.node_record.address, "127.0.0.1".parse::().unwrap()); assert_eq!(node_record_update.node_record.tcp_port, 30303); assert_eq!(node_record_update.node_record.udp_port, 9000); assert_eq!(node_record_update.fork_id, Some(MAINNET.latest_fork_id())); assert_eq!(node_record_update.enr, enr); } #[tokio::test] async fn test_start_root_sync() { reth_tracing::init_test_tracing(); let secret_key = SecretKey::new(&mut thread_rng()); let resolver = MapResolver::default(); let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE"; let mut root: TreeRootEntry = s.parse().unwrap(); root.sign(&secret_key).unwrap(); let link = LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() }; resolver.insert(link.domain.clone(), root.to_string()); let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default()); service.sync_tree_with_link(link.clone()); poll_fn(|cx| { let _ = service.poll(cx); Poll::Ready(()) }) .await; let tree = service.trees.get(&link).unwrap(); assert_eq!(tree.root().clone(), root); } #[tokio::test(flavor = "multi_thread")] async fn test_get_node() { reth_tracing::init_test_tracing(); let secret_key = SecretKey::new(&mut thread_rng()); let resolver = MapResolver::default(); let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE"; let mut root: TreeRootEntry = s.parse().unwrap(); root.sign(&secret_key).unwrap(); let link = LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() }; resolver.insert(link.domain.clone(), root.to_string()); let mut builder = Enr::builder(); let fork_id = MAINNET.hardfork_fork_id(EthereumHardfork::Frontier).unwrap(); builder .ip4(Ipv4Addr::LOCALHOST) .udp4(30303) .tcp4(30303) .add_value(b"eth", &EnrForkIdEntry::from(fork_id)); let enr = builder.build(&secret_key).unwrap(); resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64()); let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default()); let mut node_records = service.node_record_stream(); let task = tokio::task::spawn(async move { let record = node_records.next().await.unwrap(); assert_eq!(record.fork_id, Some(fork_id)); }); service.sync_tree_with_link(link.clone()); let event = poll_fn(|cx| service.poll(cx)).await; match event { DnsDiscoveryEvent::Enr(discovered) => { assert_eq!(discovered, enr); } } poll_fn(|cx| { assert!(service.poll(cx).is_pending()); Poll::Ready(()) }) .await; task.await.unwrap(); } #[tokio::test] async fn test_recheck_tree() { reth_tracing::init_test_tracing(); let config = DnsDiscoveryConfig { recheck_interval: Duration::from_millis(750), ..Default::default() }; let secret_key = SecretKey::new(&mut thread_rng()); let resolver = Arc::new(MapResolver::default()); let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE"; let mut root: TreeRootEntry = s.parse().unwrap(); root.sign(&secret_key).unwrap(); let link = LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() }; resolver.insert(link.domain.clone(), root.to_string()); let mut service = DnsDiscoveryService::new(Arc::clone(&resolver), config.clone()); service.sync_tree_with_link(link.clone()); poll_fn(|cx| { assert!(service.poll(cx).is_pending()); Poll::Ready(()) }) .await; // await recheck timeout tokio::time::sleep(config.recheck_interval).await; let enr = Enr::empty(&secret_key).unwrap(); resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64()); let event = poll_fn(|cx| service.poll(cx)).await; match event { DnsDiscoveryEvent::Enr(discovered) => { assert_eq!(discovered, enr); } } poll_fn(|cx| { assert!(service.poll(cx).is_pending()); Poll::Ready(()) }) .await; } #[tokio::test] #[ignore] async fn test_dns_resolver() { reth_tracing::init_test_tracing(); let mut service = DnsDiscoveryService::new( Arc::new(DnsResolver::from_system_conf().unwrap()), Default::default(), ); service.sync_tree(&Chain::mainnet().public_dns_network_protocol().unwrap()).unwrap(); while let Some(event) = service.next().await { match event { DnsDiscoveryEvent::Enr(enr) => { println!("discovered enr {}", enr.to_base64()); } } } } }