//! Utils for `stages`.
use reth_config::config::EtlConfig;
use reth_db::{BlockNumberList, Database};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    models::sharded_key::NUM_OF_INDICES_IN_SHARD,
    table::{Decompress, Table},
    transaction::{DbTx, DbTxMut},
    DatabaseError,
};
use reth_etl::Collector;
use reth_primitives::BlockNumber;
use reth_provider::DatabaseProviderRW;
use reth_stages_api::StageError;
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
use tracing::info;

/// Number of blocks before pushing indices from cache to [`Collector`].
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;

/// Collects all history (`H`) indices for a range of changesets (`CS`) and stores them in a
/// [`Collector`].
///
/// ## Process
/// The function uses a `HashMap` cache mapping a `PartialKey` (`P`), i.e. an `Address` or an
/// `Address.StorageKey`, to a `BlockNumberList`. When the cache exceeds its capacity, its
/// contents are moved to a [`Collector`]. There, each entry's key is the concatenation of the
/// `PartialKey` and the highest block number in its list.
///
/// ## Example
/// 1. Initial cache state: `{ Address1: [1,2,3], ... }`
/// 2. The cache is flushed to the `Collector`.
/// 3. Updated cache state: `{ Address1: [100,300], ... }`
/// 4. The cache is flushed again.
///
/// As a result, the `Collector` will contain entries such as `(Address1.3, [1,2,3])` and
/// `(Address1.300, [100,300])`. The entries may be stored across one or more files.
pub(crate) fn collect_history_indices<DB, CS, H, P>(
    provider: &DatabaseProviderRW<DB>,
    range: impl RangeBounds<CS::Key>,
    sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key,
    partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P),
    etl_config: &EtlConfig,
) -> Result<Collector<H::Key, H::Value>, StageError>
where
    DB: Database,
    CS: Table,
    H: Table<Value = BlockNumberList>,
    P: Copy + Eq + Hash,
{
    let mut changeset_cursor = provider.tx_ref().cursor_read::<CS>()?;

    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
    let mut cache: HashMap<P, Vec<u64>> = HashMap::new();

    // Moves the cached lists into the collector, keyed by partial key and highest block number.
    let mut collect = |cache: &HashMap<P, Vec<u64>>| {
        for (key, indice_list) in cache {
            let last = indice_list.last().expect("qed");
            collector.insert(
                sharded_key_factory(*key, *last),
                BlockNumberList::new_pre_sorted(indice_list),
            )?;
        }
        Ok::<(), StageError>(())
    };

    // observability
    let total_changesets = provider.tx_ref().entries::<CS>()?;
    let interval = (total_changesets / 1000).max(1);

    let mut flush_counter = 0;
    let mut current_block_number = u64::MAX;
    for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
        let (block_number, key) = partial_key_factory(entry?);
        cache.entry(key).or_default().push(block_number);

        if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
        }

        // Make sure we only flush the cache every `DEFAULT_CACHE_THRESHOLD` blocks.
        if current_block_number != block_number {
            current_block_number = block_number;
            flush_counter += 1;
            if flush_counter > DEFAULT_CACHE_THRESHOLD {
                collect(&cache)?;
                cache.clear();
                flush_counter = 0;
            }
        }
    }
    collect(&cache)?;

    Ok(collector)
}

/// Given a [`Collector`] created by [`collect_history_indices`], iterates over all of its entries
/// and loads the indices into the database in shards.
///
/// ## Process
/// Iterates over the elements, grouping indices by their partial keys (e.g., `Address` or
/// `Address.StorageKey`). It flushes indices to disk when a shard reaches its maximum length
/// (`NUM_OF_INDICES_IN_SHARD`) or when the partial key changes, ensuring the last shard of the
/// previous partial key is stored.
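///
/// ## Example
/// A minimal sketch of how this pairs with [`collect_history_indices`] for the account history
/// index. The table and key types (`tables::AccountChangeSets`, `tables::AccountsHistory`,
/// `ShardedKey`) follow the account history stage, but the closures, the decode helper, and the
/// surrounding variables are illustrative assumptions rather than the stage's exact code:
///
/// ```ignore
/// // `provider`, `block_range`, `etl_config` and `append_only` come from the stage context.
/// let collector = collect_history_indices::<_, tables::AccountChangeSets, tables::AccountsHistory, _>(
///     provider,
///     block_range,
///     ShardedKey::new,
///     // `AccountChangeSets` is keyed by block number; its value carries the changed address.
///     |(block_number, account_before_tx)| (block_number, account_before_tx.address),
///     &etl_config,
/// )?;
///
/// load_history_indices::<_, tables::AccountsHistory, _>(
///     provider,
///     collector,
///     append_only,
///     ShardedKey::new,
///     // Assumption: a decode helper that turns the collector's raw key bytes back into a
///     // `ShardedKey<Address>`.
///     ShardedKey::<Address>::decode,
///     |sharded_key| sharded_key.key,
/// )?;
/// ```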
pub(crate) fn load_history_indices<DB, H, P>(
    provider: &DatabaseProviderRW<DB>,
    mut collector: Collector<H::Key, H::Value>,
    append_only: bool,
    sharded_key_factory: impl Clone + Fn(P, u64) -> <H as Table>::Key,
    decode_key: impl Fn(Vec<u8>) -> Result<<H as Table>::Key, DatabaseError>,
    get_partial: impl Fn(<H as Table>::Key) -> P,
) -> Result<(), StageError>
where
    DB: Database,
    H: Table<Value = BlockNumberList>,
    P: Copy + Default + Eq,
{
    let mut write_cursor = provider.tx_ref().cursor_write::<H>()?;
    let mut current_partial = P::default();
    let mut current_list = Vec::<u64>::new();

    // observability
    let total_entries = collector.len();
    let interval = (total_entries / 100).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = decode_key(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index % interval == 0 && total_entries > 100 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        // AccountsHistory: `Address`.
        // StorageHistory: `Address.StorageKey`.
        let partial_key = get_partial(sharded_key);

        if current_partial != partial_key {
            // We have reached the end of this subset of keys, so
            // we need to flush its last index shard.
            load_indices(
                &mut write_cursor,
                current_partial,
                &mut current_list,
                &sharded_key_factory,
                append_only,
                LoadMode::Flush,
            )?;

            current_partial = partial_key;
            current_list.clear();

            // If it's not the first sync, there might be an existing shard already, so we need
            // to merge it with the one coming from the collector.
            if !append_only {
                if let Some((_, last_database_shard)) =
                    write_cursor.seek_exact(sharded_key_factory(current_partial, u64::MAX))?
                {
                    current_list.extend(last_database_shard.iter());
                }
            }
        }

        current_list.extend(new_list.iter());
        load_indices(
            &mut write_cursor,
            current_partial,
            &mut current_list,
            &sharded_key_factory,
            append_only,
            LoadMode::KeepLast,
        )?;
    }

    // There will be one remaining shard that needs to be flushed to the database.
    load_indices(
        &mut write_cursor,
        current_partial,
        &mut current_list,
        &sharded_key_factory,
        append_only,
        LoadMode::Flush,
    )?;

    Ok(())
}

/// Shards and inserts the indices list according to [`LoadMode`] and its length.
pub(crate) fn load_indices<H, C, P>(
    cursor: &mut C,
    partial_key: P,
    list: &mut Vec<BlockNumber>,
    sharded_key_factory: &impl Fn(P, BlockNumber) -> <H as Table>::Key,
    append_only: bool,
    mode: LoadMode,
) -> Result<(), StageError>
where
    C: DbCursorRO<H> + DbCursorRW<H>,
    H: Table<Value = BlockNumberList>,
    P: Copy,
{
    if list.len() > NUM_OF_INDICES_IN_SHARD || mode.is_flush() {
        let chunks = list
            .chunks(NUM_OF_INDICES_IN_SHARD)
            .map(|chunks| chunks.to_vec())
            .collect::<Vec<Vec<u64>>>();

        let mut iter = chunks.into_iter().peekable();
        while let Some(chunk) = iter.next() {
            let mut highest = *chunk.last().expect("at least one index");

            if !mode.is_flush() && iter.peek().is_none() {
                // Keep the last (possibly partial) shard in memory for the next call.
                *list = chunk;
            } else {
                if iter.peek().is_none() {
                    // The most recent shard is always keyed by `u64::MAX`.
                    highest = u64::MAX;
                }
                let key = sharded_key_factory(partial_key, highest);
                let value = BlockNumberList::new_pre_sorted(chunk);

                if append_only {
                    cursor.append(key, value)?;
                } else {
                    cursor.upsert(key, value)?;
                }
            }
        }
    }

    Ok(())
}

/// Mode on how to load index shards into the database.
pub(crate) enum LoadMode {
    /// Keep the last shard in memory and don't flush it to the database.
    KeepLast,
    /// Flush all shards into the database.
    Flush,
}

impl LoadMode {
    const fn is_flush(&self) -> bool {
        matches!(self, Self::Flush)
    }
}
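
// The test-style sketch below restates the shard keying scheme used by `load_indices` above:
// full shards are keyed by their highest contained block number, while the most recent shard is
// keyed by `u64::MAX` so it can always be found with a single `seek_exact`. It is an illustrative
// re-statement only: the `shard_keys` helper is hypothetical and does not exercise the
// cursor-backed code.
#[cfg(test)]
mod shard_layout_sketch {
    use super::*;

    /// Hypothetical helper: splits a sorted list of block numbers into shards and returns the
    /// block number each shard would be keyed under, mirroring the chunking in `load_indices`
    /// when flushing.
    fn shard_keys(list: &[u64]) -> Vec<u64> {
        let chunks: Vec<&[u64]> = list.chunks(NUM_OF_INDICES_IN_SHARD).collect();
        let last = chunks.len().saturating_sub(1);
        chunks
            .iter()
            .enumerate()
            .map(|(i, chunk)| {
                if i == last {
                    u64::MAX
                } else {
                    *chunk.last().expect("chunks are non-empty")
                }
            })
            .collect()
    }

    #[test]
    fn latest_shard_is_keyed_by_u64_max() {
        // Two full shards plus a partial one.
        let list: Vec<u64> = (0..(2 * NUM_OF_INDICES_IN_SHARD as u64 + 3)).collect();
        let keys = shard_keys(&list);
        // Full shards are keyed by the highest block number they contain ...
        assert_eq!(keys[0], NUM_OF_INDICES_IN_SHARD as u64 - 1);
        assert_eq!(keys[1], 2 * NUM_OF_INDICES_IN_SHARD as u64 - 1);
        // ... while the most recent shard is always keyed by `u64::MAX`.
        assert_eq!(*keys.last().expect("at least one shard"), u64::MAX);
    }
}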