//! Stream wrapper that simulates reorgs. use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt}; use itertools::Either; use reth_beacon_consensus::{BeaconEngineMessage, BeaconOnNewPayloadError, OnForkChoiceUpdated}; use reth_engine_primitives::EngineTypes; use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult}; use reth_ethereum_forks::EthereumHardforks; use reth_evm::{system_calls::apply_beacon_root_contract_call, ConfigureEvm}; use reth_payload_validator::ExecutionPayloadValidator; use reth_primitives::{ eip4844::calculate_excess_blob_gas, proofs, Block, Header, Receipt, Receipts, U256, }; use reth_provider::{BlockReader, ExecutionOutcome, ProviderError, StateProviderFactory}; use reth_revm::{ database::StateProviderDatabase, db::{states::bundle_state::BundleRetention, State}, state_change::post_block_withdrawals_balance_increments, DatabaseCommit, }; use reth_rpc_types::{ engine::{CancunPayloadFields, ForkchoiceState, PayloadStatus}, ExecutionPayload, }; use reth_rpc_types_compat::engine::payload::block_to_payload; use revm_primitives::{BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg}; use std::{ collections::VecDeque, future::Future, pin::Pin, task::{ready, Context, Poll}, }; use tokio::sync::oneshot; use tracing::*; #[derive(Debug)] enum EngineReorgState { Forward, Reorg { queue: VecDeque> }, } type EngineReorgResponse = Result< Either, RethResult>, oneshot::error::RecvError, >; type ReorgResponseFut = Pin + Send + Sync>>; /// Engine API stream wrapper that simulates reorgs with specified frequency. #[derive(Debug)] #[pin_project::pin_project] pub struct EngineReorg { /// Underlying stream #[pin] stream: S, /// Database provider. provider: Provider, /// Evm configuration. evm_config: Evm, /// Payload validator. payload_validator: ExecutionPayloadValidator, /// The frequency of reorgs. frequency: usize, /// The number of forwarded forkchoice states. /// This is reset after a reorg. forkchoice_states_forwarded: usize, /// Current state of the stream. state: EngineReorgState, /// Last forkchoice state. last_forkchoice_state: Option, /// Pending engine responses to reorg messages. reorg_responses: FuturesUnordered, } impl EngineReorg { /// Creates new [`EngineReorg`] stream wrapper. pub fn new( stream: S, provider: Provider, evm_config: Evm, payload_validator: ExecutionPayloadValidator, frequency: usize, ) -> Self { Self { stream, provider, evm_config, payload_validator, frequency, state: EngineReorgState::Forward, forkchoice_states_forwarded: 0, last_forkchoice_state: None, reorg_responses: FuturesUnordered::new(), } } } impl Stream for EngineReorg where S: Stream>, Engine: EngineTypes, Provider: BlockReader + StateProviderFactory, Evm: ConfigureEvm, { type Item = S::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); loop { if let Poll::Ready(Some(response)) = this.reorg_responses.poll_next_unpin(cx) { match response { Ok(Either::Left(Ok(payload_status))) => { debug!(target: "engine::stream::reorg", ?payload_status, "Received response for reorg new payload"); } Ok(Either::Left(Err(payload_error))) => { error!(target: "engine::stream::reorg", %payload_error, "Error on reorg new payload"); } Ok(Either::Right(Ok(fcu_status))) => { debug!(target: "engine::stream::reorg", ?fcu_status, "Received response for reorg forkchoice update"); } Ok(Either::Right(Err(fcu_error))) => { error!(target: "engine::stream::reorg", %fcu_error, "Error on reorg forkchoice update"); } Err(_) => {} }; continue } if let EngineReorgState::Reorg { queue } = &mut this.state { match queue.pop_front() { Some(msg) => return Poll::Ready(Some(msg)), None => { *this.forkchoice_states_forwarded = 0; *this.state = EngineReorgState::Forward; } } } let next = ready!(this.stream.poll_next_unpin(cx)); let item = match next { Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }) => { if this.forkchoice_states_forwarded > this.frequency { if let Some(last_forkchoice_state) = this .last_forkchoice_state // Only enter reorg state if new payload attaches to current head. .filter(|state| state.head_block_hash == payload.parent_hash()) { // Enter the reorg state. // The current payload will be immediately forwarded by being in front // of the queue. Then we attempt to reorg the current head by generating // a payload that attaches to the head's parent and is based on the // non-conflicting transactions (txs from block `n + 1` that are valid // at block `n` according to consensus checks) from the current payload // as well as the corresponding forkchoice state. We will rely on CL to // reorg us back to canonical chain. // TODO: This is an expensive blocking operation, ideally it's spawned // as a task so that the stream could yield the control back. let (reorg_payload, reorg_cancun_fields) = match create_reorg_head( this.provider, this.evm_config, this.payload_validator, payload.clone(), cancun_fields.clone(), ) { Ok(result) => result, Err(error) => { error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head"); // Forward the payload and attempt to create reorg on top of the // next one return Poll::Ready(Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx, })) } }; let reorg_forkchoice_state = ForkchoiceState { finalized_block_hash: last_forkchoice_state.finalized_block_hash, safe_block_hash: last_forkchoice_state.safe_block_hash, head_block_hash: reorg_payload.block_hash(), }; let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel(); let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel(); this.reorg_responses.extend([ Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut, Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut, ]); *this.state = EngineReorgState::Reorg { queue: VecDeque::from([ // Current payload BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }, // Reorg payload BeaconEngineMessage::NewPayload { payload: reorg_payload, cancun_fields: reorg_cancun_fields, tx: reorg_payload_tx, }, // Reorg forkchoice state BeaconEngineMessage::ForkchoiceUpdated { state: reorg_forkchoice_state, payload_attrs: None, tx: reorg_fcu_tx, }, ]), }; continue } } Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }) } Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) => { // Record last forkchoice state forwarded to the engine. // We do not care if it's valid since engine should be able to handle // reorgs that rely on invalid forkchoice state. *this.last_forkchoice_state = Some(state); *this.forkchoice_states_forwarded += 1; Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) } item => item, }; return Poll::Ready(item) } } } fn create_reorg_head( provider: &Provider, evm_config: &Evm, payload_validator: &ExecutionPayloadValidator, next_payload: ExecutionPayload, next_cancun_fields: Option, ) -> RethResult<(ExecutionPayload, Option)> where Provider: BlockReader + StateProviderFactory, Evm: ConfigureEvm, { let chain_spec = payload_validator.chain_spec(); // Ensure next payload is valid. let next_block = payload_validator .ensure_well_formed_payload(next_payload, next_cancun_fields.into()) .map_err(RethError::msg)?; // Fetch reorg target block and its parent let reorg_target = provider .block_by_hash(next_block.parent_hash)? .ok_or_else(|| ProviderError::HeaderNotFound(next_block.parent_hash.into()))?; let reorg_target_parent = provider .block_by_hash(reorg_target.parent_hash)? .ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.parent_hash.into()))?; // Configure state let state_provider = provider.state_by_block_hash(reorg_target.parent_hash)?; let mut state = State::builder() .with_database_ref(StateProviderDatabase::new(&state_provider)) .with_bundle_update() .build(); // Configure environments let mut cfg = CfgEnvWithHandlerCfg::new(Default::default(), Default::default()); let mut block_env = BlockEnv::default(); evm_config.fill_cfg_and_block_env( &mut cfg, &mut block_env, chain_spec, &reorg_target.header, U256::MAX, ); let env = EnvWithHandlerCfg::new_with_cfg_env(cfg, block_env, Default::default()); let mut evm = evm_config.evm_with_env(&mut state, env); // apply eip-4788 pre block contract call apply_beacon_root_contract_call( evm_config, chain_spec, reorg_target.timestamp, reorg_target.number, reorg_target.parent_beacon_block_root, &mut evm, )?; let mut cumulative_gas_used = 0; let mut sum_blob_gas_used = 0; let mut transactions = Vec::new(); let mut receipts = Vec::new(); let mut versioned_hashes = Vec::new(); for tx in next_block.body { // ensure we still have capacity for this transaction if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit { continue } // Configure the environment for the block. let tx_recovered = tx.clone().try_into_ecrecovered().map_err(|_| { BlockExecutionError::Validation(BlockValidationError::SenderRecoveryError) })?; evm_config.fill_tx_env(evm.tx_mut(), &tx_recovered, tx_recovered.signer()); let exec_result = match evm.transact() { Ok(result) => result, error @ Err(EVMError::Transaction(_) | EVMError::Header(_)) => { trace!(target: "engine::stream::reorg", hash = %tx.hash(), ?error, "Error executing transaction from next block"); continue } // Treat error as fatal Err(error) => { return Err(RethError::Execution(BlockExecutionError::Validation( BlockValidationError::EVM { hash: tx.hash, error: Box::new(error) }, ))) } }; evm.db_mut().commit(exec_result.state); if let Some(blob_tx) = tx.transaction.as_eip4844() { sum_blob_gas_used += blob_tx.blob_gas(); versioned_hashes.extend(blob_tx.blob_versioned_hashes.clone()); } cumulative_gas_used += exec_result.result.gas_used(); #[allow(clippy::needless_update)] // side-effect of optimism fields receipts.push(Some(Receipt { tx_type: tx.tx_type(), success: exec_result.result.is_success(), cumulative_gas_used, logs: exec_result.result.into_logs().into_iter().map(Into::into).collect(), ..Default::default() })); // append transaction to the list of executed transactions transactions.push(tx); } drop(evm); if let Some(withdrawals) = &reorg_target.withdrawals { state.increment_balances(post_block_withdrawals_balance_increments( chain_spec, reorg_target.timestamp, withdrawals, ))?; } // merge all transitions into bundle state, this would apply the withdrawal balance changes // and 4788 contract call state.merge_transitions(BundleRetention::PlainState); let outcome = ExecutionOutcome::new( state.take_bundle(), Receipts::from(vec![receipts]), reorg_target.number, Default::default(), ); let (blob_gas_used, excess_blob_gas) = if chain_spec.is_cancun_active_at_timestamp(reorg_target.timestamp) { ( Some(sum_blob_gas_used), Some(calculate_excess_blob_gas( reorg_target_parent.excess_blob_gas.unwrap_or_default(), reorg_target_parent.blob_gas_used.unwrap_or_default(), )), ) } else { (None, None) }; let reorg_block = Block { header: Header { // Set same fields as the reorg target parent_hash: reorg_target.header.parent_hash, ommers_hash: reorg_target.header.ommers_hash, beneficiary: reorg_target.header.beneficiary, difficulty: reorg_target.header.difficulty, number: reorg_target.header.number, gas_limit: reorg_target.header.gas_limit, timestamp: reorg_target.header.timestamp, extra_data: reorg_target.header.extra_data, mix_hash: reorg_target.header.mix_hash, nonce: reorg_target.header.nonce, base_fee_per_gas: reorg_target.header.base_fee_per_gas, parent_beacon_block_root: reorg_target.header.parent_beacon_block_root, withdrawals_root: reorg_target.header.withdrawals_root, // Compute or add new fields transactions_root: proofs::calculate_transaction_root(&transactions), receipts_root: outcome.receipts_root_slow(reorg_target.header.number).unwrap(), logs_bloom: outcome.block_logs_bloom(reorg_target.header.number).unwrap(), requests_root: None, // TODO(prague) gas_used: cumulative_gas_used, blob_gas_used, excess_blob_gas, state_root: state_provider.state_root(outcome.state())?, }, body: transactions, ommers: reorg_target.ommers, withdrawals: reorg_target.withdrawals, requests: None, // TODO(prague) } .seal_slow(); Ok(( block_to_payload(reorg_block), reorg_target .header .parent_beacon_block_root .map(|root| CancunPayloadFields { parent_beacon_block_root: root, versioned_hashes }), )) }