use reth_evm::ConfigureEvm; use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider, StateProviderFactory}; use reth_rpc::{EthFilter, EthPubSub}; use reth_rpc_eth_types::{ cache::cache_new_blocks_task, EthApiBuilderCtx, EthConfig, EthStateCache, }; use reth_tasks::TaskSpawner; /// Alias for `eth` namespace API builder. pub type DynEthApiBuilder = Box) -> EthApi>; /// Handlers for core, filter and pubsub `eth` namespace APIs. #[derive(Debug, Clone)] pub struct EthHandlers { /// Main `eth_` request handler pub api: EthApi, /// The async caching layer used by the eth handlers pub cache: EthStateCache, /// Polling based filter handler available on all transports pub filter: EthFilter, /// Handler for subscriptions only available for transports that support it (ws, ipc) pub pubsub: EthPubSub, } impl EthHandlers { /// Returns a new [`EthHandlers`] builder. #[allow(clippy::too_many_arguments)] pub fn builder( provider: Provider, pool: Pool, network: Network, evm_config: EvmConfig, config: EthConfig, executor: Tasks, events: Events, eth_api_builder: DynEthApiBuilder< Provider, Pool, EvmConfig, Network, Tasks, Events, EthApi, >, ) -> EthHandlersBuilder { EthHandlersBuilder { provider, pool, network, evm_config, config, executor, events, eth_api_builder, } } } /// Builds [`EthHandlers`] for core, filter, and pubsub `eth_` apis. #[allow(missing_debug_implementations)] pub struct EthHandlersBuilder { provider: Provider, pool: Pool, network: Network, evm_config: EvmConfig, config: EthConfig, executor: Tasks, events: Events, eth_api_builder: DynEthApiBuilder, } impl EthHandlersBuilder where Provider: StateProviderFactory + BlockReader + EvmEnvProvider + Clone + Unpin + 'static, Pool: Send + Sync + Clone + 'static, EvmConfig: ConfigureEvm, Network: Clone + 'static, Tasks: TaskSpawner + Clone + 'static, Events: CanonStateSubscriptions + Clone + 'static, EthApi: 'static, { /// Returns a new instance with handlers for `eth` namespace. pub fn build(self) -> EthHandlers { let Self { provider, pool, network, evm_config, config, executor, events, eth_api_builder } = self; let cache = EthStateCache::spawn_with( provider.clone(), config.cache, executor.clone(), evm_config.clone(), ); let new_canonical_blocks = events.canonical_state_stream(); let c = cache.clone(); executor.spawn_critical( "cache canonical blocks task", Box::pin(async move { cache_new_blocks_task(c, new_canonical_blocks).await; }), ); let ctx = EthApiBuilderCtx { provider, pool, network, evm_config, config, executor, events, cache, }; let api = eth_api_builder(&ctx); let filter = EthFilterApiBuilder::build(&ctx); let pubsub = EthPubSubApiBuilder::build(&ctx); EthHandlers { api, cache: ctx.cache, filter, pubsub } } } /// Builds the `eth_` namespace API [`EthFilterApiServer`](reth_rpc_eth_api::EthFilterApiServer). #[derive(Debug)] pub struct EthFilterApiBuilder; impl EthFilterApiBuilder { /// Builds the [`EthFilterApiServer`](reth_rpc_eth_api::EthFilterApiServer), for given context. pub fn build( ctx: &EthApiBuilderCtx, ) -> EthFilter where Provider: Send + Sync + Clone + 'static, Pool: Send + Sync + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, { EthFilter::new( ctx.provider.clone(), ctx.pool.clone(), ctx.cache.clone(), ctx.config.filter_config(), Box::new(ctx.executor.clone()), ) } } /// Builds the `eth_` namespace API [`EthPubSubApiServer`](reth_rpc_eth_api::EthFilterApiServer). #[derive(Debug)] pub struct EthPubSubApiBuilder; impl EthPubSubApiBuilder { /// Builds the [`EthPubSubApiServer`](reth_rpc_eth_api::EthPubSubApiServer), for given context. pub fn build( ctx: &EthApiBuilderCtx, ) -> EthPubSub where Provider: Clone, Pool: Clone, Events: Clone, Network: Clone, Tasks: TaskSpawner + Clone + 'static, { EthPubSub::with_spawner( ctx.provider.clone(), ctx.pool.clone(), ctx.events.clone(), ctx.network.clone(), Box::new(ctx.executor.clone()), ) } }