//! Command that initializes the node by importing OP Mainnet chain segment below Bedrock, from a //! file. use clap::Parser; use reth_cli_commands::common::{AccessRights, Environment, EnvironmentArgs}; use reth_consensus::noop::NoopConsensus; use reth_db::tables; use reth_db_api::transaction::DbTx; use reth_downloaders::file_client::{ ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE, }; use reth_node_core::version::SHORT_VERSION; use reth_optimism_primitives::bedrock_import::is_dup_tx; use reth_provider::StageCheckpointReader; use reth_prune::PruneModes; use reth_stages::StageId; use reth_static_file::StaticFileProducer; use std::{path::PathBuf, sync::Arc}; use tracing::{debug, error, info}; use crate::commands::build_pipeline::build_import_pipeline; /// Syncs RLP encoded blocks from a file. #[derive(Debug, Parser)] pub struct ImportOpCommand { #[command(flatten)] env: EnvironmentArgs, /// Chunk byte length to read from file. #[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)] chunk_len: Option, /// The path to a block file for import. /// /// The online stages (headers and bodies) are replaced by a file import, after which the /// remaining stages are executed. #[arg(value_name = "IMPORT_PATH", verbatim_doc_comment)] path: PathBuf, } impl ImportOpCommand { /// Execute `import` command pub async fn execute(self) -> eyre::Result<()> { info!(target: "reth::cli", "reth {} starting", SHORT_VERSION); info!(target: "reth::cli", "Disabled stages requiring state, since cannot execute OVM state changes" ); debug!(target: "reth::cli", chunk_byte_len=self.chunk_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE), "Chunking chain import" ); let Environment { provider_factory, config, .. } = self.env.init(AccessRights::RW)?; // we use noop here because we expect the inputs to be valid let consensus = Arc::new(NoopConsensus::default()); // open file let mut reader = ChunkedFileReader::new(&self.path, self.chunk_len).await?; let mut total_decoded_blocks = 0; let mut total_decoded_txns = 0; let mut total_filtered_out_dup_txns = 0; while let Some(mut file_client) = reader.next_chunk::().await? { // create a new FileClient from chunk read from file info!(target: "reth::cli", "Importing chain file chunk" ); let tip = file_client.tip().ok_or_else(|| eyre::eyre!("file client has no tip"))?; info!(target: "reth::cli", "Chain file chunk read"); total_decoded_blocks += file_client.headers_len(); total_decoded_txns += file_client.total_transactions(); for (block_number, body) in file_client.bodies_iter_mut() { body.transactions.retain(|_| { if is_dup_tx(block_number) { total_filtered_out_dup_txns += 1; return false } true }) } let (mut pipeline, events) = build_import_pipeline( &config, provider_factory.clone(), &consensus, Arc::new(file_client), StaticFileProducer::new(provider_factory.clone(), PruneModes::default()), true, ) .await?; // override the tip pipeline.set_tip(tip); debug!(target: "reth::cli", ?tip, "Tip manually set"); let provider = provider_factory.provider()?; let latest_block_number = provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number); tokio::spawn(reth_node_events::node::handle_events( None, latest_block_number, events, provider_factory.db_ref().clone(), )); // Run pipeline info!(target: "reth::cli", "Starting sync pipeline"); tokio::select! { res = pipeline.run() => res?, _ = tokio::signal::ctrl_c() => {}, } } let provider = provider_factory.provider()?; let total_imported_blocks = provider.tx_ref().entries::()?; let total_imported_txns = provider.tx_ref().entries::()?; if total_decoded_blocks != total_imported_blocks || total_decoded_txns != total_imported_txns + total_filtered_out_dup_txns { error!(target: "reth::cli", total_decoded_blocks, total_imported_blocks, total_decoded_txns, total_filtered_out_dup_txns, total_imported_txns, "Chain was partially imported" ); } info!(target: "reth::cli", total_imported_blocks, total_imported_txns, total_decoded_blocks, total_decoded_txns, total_filtered_out_dup_txns, "Chain file imported" ); Ok(()) } }