use async_trait::async_trait; use jsonrpsee::core::RpcResult as Result; use reth_primitives::{Address, TransactionSignedEcRecovered}; use reth_rpc_api::TxPoolApiServer; use reth_rpc_types::{ txpool::{TxpoolContent, TxpoolContentFrom, TxpoolInspect, TxpoolInspectSummary, TxpoolStatus}, Transaction, }; use reth_transaction_pool::{AllPoolTransactions, PoolTransaction, TransactionPool}; use std::collections::BTreeMap; use tracing::trace; /// `txpool` API implementation. /// /// This type provides the functionality for handling `txpool` related requests. #[derive(Clone)] pub struct TxPoolApi { /// An interface to interact with the pool pool: Pool, } impl TxPoolApi { /// Creates a new instance of `TxpoolApi`. pub const fn new(pool: Pool) -> Self { Self { pool } } } impl TxPoolApi where Pool: TransactionPool + 'static, { fn content(&self) -> TxpoolContent { #[inline] fn insert( tx: &T, content: &mut BTreeMap>, ) { content.entry(tx.sender()).or_default().insert( tx.nonce().to_string(), reth_rpc_types_compat::transaction::from_recovered(tx.clone().into()), ); } let AllPoolTransactions { pending, queued } = self.pool.all_transactions(); let mut content = TxpoolContent::default(); for pending in pending { insert(&pending.transaction, &mut content.pending); } for queued in queued { insert(&queued.transaction, &mut content.queued); } content } } #[async_trait] impl TxPoolApiServer for TxPoolApi where Pool: TransactionPool + 'static, { /// Returns the number of transactions currently pending for inclusion in the next block(s), as /// well as the ones that are being scheduled for future execution only. /// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_status) /// /// Handler for `txpool_status` async fn txpool_status(&self) -> Result { trace!(target: "rpc::eth", "Serving txpool_status"); let all = self.pool.all_transactions(); Ok(TxpoolStatus { pending: all.pending.len() as u64, queued: all.queued.len() as u64 }) } /// Returns a summary of all the transactions currently pending for inclusion in the next /// block(s), as well as the ones that are being scheduled for future execution only. /// /// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_inspect) for more details /// /// Handler for `txpool_inspect` async fn txpool_inspect(&self) -> Result { trace!(target: "rpc::eth", "Serving txpool_inspect"); #[inline] fn insert( tx: &T, inspect: &mut BTreeMap>, ) { let entry = inspect.entry(tx.sender()).or_default(); let tx: TransactionSignedEcRecovered = tx.clone().into(); entry.insert( tx.nonce().to_string(), TxpoolInspectSummary { to: tx.to(), value: tx.value(), gas: tx.gas_limit() as u128, gas_price: tx.transaction.max_fee_per_gas(), }, ); } let AllPoolTransactions { pending, queued } = self.pool.all_transactions(); Ok(TxpoolInspect { pending: pending.iter().fold(Default::default(), |mut acc, tx| { insert(&tx.transaction, &mut acc); acc }), queued: queued.iter().fold(Default::default(), |mut acc, tx| { insert(&tx.transaction, &mut acc); acc }), }) } /// Retrieves the transactions contained within the txpool, returning pending as well as queued /// transactions of this address, grouped by nonce. /// /// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_contentFrom) for more details /// Handler for `txpool_contentFrom` async fn txpool_content_from(&self, from: Address) -> Result { trace!(target: "rpc::eth", ?from, "Serving txpool_contentFrom"); Ok(self.content().remove_from(&from)) } /// Returns the details of all transactions currently pending for inclusion in the next /// block(s), as well as the ones that are being scheduled for future execution only. /// /// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_content) for more details /// Handler for `txpool_content` async fn txpool_content(&self) -> Result { trace!(target: "rpc::eth", "Serving txpool_content"); Ok(self.content()) } } impl std::fmt::Debug for TxPoolApi { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("TxpoolApi").finish_non_exhaustive() } }