use super::{
    manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
};
use crate::providers::static_file::metrics::StaticFileProviderOperation;
use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
use reth_codecs::Compact;
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{ConsistencyFailStrategy, NippyJar, NippyJarError, NippyJarWriter};
use reth_primitives::{
    static_file::{find_fixed_range, SegmentHeader, SegmentRangeInclusive},
    BlockHash, BlockNumber, Header, Receipt, StaticFileSegment, TransactionSignedNoHash, TxNumber,
    U256,
};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{
    borrow::Borrow,
    path::{Path, PathBuf},
    sync::{Arc, Weak},
    time::Instant,
};
use tracing::debug;

/// Static file writers for every known [`StaticFileSegment`].
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**.
#[derive(Debug, Default)]
pub(crate) struct StaticFileWriters {
    headers: RwLock<Option<StaticFileProviderRW>>,
    transactions: RwLock<Option<StaticFileProviderRW>>,
    receipts: RwLock<Option<StaticFileProviderRW>>,
}

impl StaticFileWriters {
    pub(crate) fn get_or_create(
        &self,
        segment: StaticFileSegment,
        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW>,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_>> {
        let mut write_guard = match segment {
            StaticFileSegment::Headers => self.headers.write(),
            StaticFileSegment::Transactions => self.transactions.write(),
            StaticFileSegment::Receipts => self.receipts.write(),
        };

        if write_guard.is_none() {
            *write_guard = Some(create_fn()?);
        }

        Ok(StaticFileProviderRWRefMut(write_guard))
    }

    pub(crate) fn commit(&self) -> ProviderResult<()> {
        for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
            let mut writer = writer_lock.write();
            if let Some(writer) = writer.as_mut() {
                writer.commit()?;
            }
        }
        Ok(())
    }
}

/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a>(
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW>>,
);

impl<'a> std::ops::DerefMut for StaticFileProviderRWRefMut<'a> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`]
        self.0.as_mut().expect("static file writer provider should be init")
    }
}

impl<'a> std::ops::Deref for StaticFileProviderRWRefMut<'a> {
    type Target = StaticFileProviderRW;

    fn deref(&self) -> &Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`]
        self.0.as_ref().expect("static file writer provider should be init")
    }
}

#[derive(Debug)]
/// Extends `StaticFileProvider` with writing capabilities.
pub struct StaticFileProviderRW {
    /// Reference back to the provider. We need [`Weak`] here because [`StaticFileProviderRW`] is
    /// stored in a [`dashmap::DashMap`] inside the parent [`StaticFileProvider`], which is an
    /// [`Arc`]. If we were to use an [`Arc`] here, we would create a reference cycle.
    reader: Weak<StaticFileProviderInner>,
    /// A [`NippyJarWriter`] instance.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path to the opened file.
    data_path: PathBuf,
    /// Reusable buffer for encoding appended data.
    buf: Vec<u8>,
    /// Metrics.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// On commit, does the instructed pruning: number of rows and, if it applies, the last block
    /// it ends at.
    prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}

impl StaticFileProviderRW {
    /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
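    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `weak: Weak<StaticFileProviderInner>` handle to an already
    /// initialized, writable [`StaticFileProvider`]. In practice the writer is usually obtained
    /// through the parent provider rather than constructed directly.
    ///
    /// ```ignore
    /// use reth_primitives::StaticFileSegment;
    ///
    /// // Open (or create) the static file that contains block 0 of the Headers segment,
    /// // without metrics.
    /// let mut writer = StaticFileProviderRW::new(StaticFileSegment::Headers, 0, weak, None)?;
    /// ```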
    pub fn new(
        segment: StaticFileSegment,
        block: BlockNumber,
        reader: Weak<StaticFileProviderInner>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<Self> {
        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
        Ok(Self {
            writer,
            data_path,
            buf: Vec::with_capacity(100),
            reader,
            metrics,
            prune_on_commit: None,
        })
    }

    fn open(
        segment: StaticFileSegment,
        block: u64,
        reader: Weak<StaticFileProviderInner>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
        let start = Instant::now();

        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);

        let block_range = find_fixed_range(block);
        let (jar, path) = match static_file_provider.get_segment_provider_from_block(
            segment,
            block_range.start(),
            None,
        ) {
            Ok(provider) => (
                NippyJar::load(provider.data_path())
                    .map_err(|e| ProviderError::NippyJar(e.to_string()))?,
                provider.data_path().into(),
            ),
            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
                let path = static_file_provider.directory().join(segment.filename(&block_range));
                (create_jar(segment, &path, block_range), path)
            }
            Err(err) => return Err(err),
        };

        let reader = Self::upgrade_provider_to_strong_reference(&reader);
        let access = if reader.is_read_only() {
            ConsistencyFailStrategy::ThrowError
        } else {
            ConsistencyFailStrategy::Heal
        };

        let result = match NippyJarWriter::new(jar, access) {
            Ok(writer) => Ok((writer, path)),
            Err(NippyJarError::FrozenJar) => {
                // This static file has been frozen, so it can no longer be written to.
                Err(ProviderError::FinalizedStaticFile(segment, block))
            }
            Err(e) => Err(ProviderError::NippyJar(e.to_string())),
        }?;

        if let Some(metrics) = &metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::OpenWriter,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }

    /// Checks the consistency of the file and heals it if necessary and `read_only` is set to
    /// false. If the check fails, it will return an error.
    ///
    /// If healing does happen, it will update the end range on the [`SegmentHeader`]. However, for
    /// transaction based segments, the block end range has to be found and healed externally.
    ///
    /// Check [`NippyJarWriter::ensure_file_consistency`] for more on healing.
    pub fn ensure_file_consistency(&mut self, read_only: bool) -> ProviderResult<()> {
        let inconsistent_error = || {
            ProviderError::NippyJar(
                "Inconsistent state found. Restart the node to heal.".to_string(),
            )
        };

        let check_mode = if read_only {
            ConsistencyFailStrategy::ThrowError
        } else {
            ConsistencyFailStrategy::Heal
        };

        self.writer.ensure_file_consistency(check_mode).map_err(|error| {
            if matches!(error, NippyJarError::InconsistentState) {
                return inconsistent_error()
            }
            ProviderError::NippyJar(error.to_string())
        })?;

        // If we have lost rows (in this run or a previous one), we need to update the
        // [`SegmentHeader`].
        let expected_rows = if self.user_header().segment().is_headers() {
            self.user_header().block_len().unwrap_or_default()
        } else {
            self.user_header().tx_len().unwrap_or_default()
        };
        let pruned_rows = expected_rows - self.writer.rows() as u64;
        if pruned_rows > 0 {
            if read_only {
                return Err(inconsistent_error())
            }
            self.user_header_mut().prune(pruned_rows);
        }

        self.writer.commit().map_err(|error| ProviderError::NippyJar(error.to_string()))?;

        // Updates the [`StaticFileProvider`] index.
        self.update_index()?;

        Ok(())
    }

    /// Commits configuration changes to disk and updates the reader index with the new changes.
    pub fn commit(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Truncates the data file if instructed to.
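        // Any prune queued via `prune_headers`, `prune_transactions` or `prune_receipts` is
        // applied here first, so the offsets and `SegmentHeader` written below already reflect
        // the truncated data.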
        if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
            match self.writer.user_header().segment() {
                StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
                StaticFileSegment::Transactions => self
                    .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
                StaticFileSegment::Receipts => {
                    self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
                }
            }
        }

        if self.writer.is_dirty() {
            // Commits offsets and new user_header to disk
            self.writer.commit().map_err(|e| ProviderError::NippyJar(e.to_string()))?;

            if let Some(metrics) = &self.metrics {
                metrics.record_segment_operation(
                    self.writer.user_header().segment(),
                    StaticFileProviderOperation::CommitWriter,
                    Some(start.elapsed()),
                );
            }

            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                path = ?self.data_path,
                duration = ?start.elapsed(),
                "Commit"
            );

            self.update_index()?;
        }

        Ok(())
    }

    /// Commits configuration changes to disk and updates the reader index with the new changes.
    ///
    /// CAUTION: does not call `sync_all` on the files.
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Commits offsets and new user_header to disk
        self.writer
            .commit_without_sync_all()
            .map_err(|e| ProviderError::NippyJar(e.to_string()))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                self.writer.user_header().segment(),
                StaticFileProviderOperation::CommitWriter,
                Some(start.elapsed()),
            );
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            duration = ?start.elapsed(),
            "Commit"
        );

        self.update_index()?;

        Ok(())
    }

    /// Updates the `self.reader` internal index.
    fn update_index(&self) -> ProviderResult<()> {
        // We find the maximum block of the segment by checking this writer's last block.
        //
        // However, if there's no block range (because there's no data), we try to calculate it by
        // subtracting 1 from the expected block start, resulting in the last block of the
        // previous file.
        //
        // If that expected block start is 0, then it means that there's no actual block data in
        // the static files.
        let segment_max_block = match self.writer.user_header().block_range() {
            Some(block_range) => Some(block_range.end()),
            None => {
                if self.writer.user_header().expected_block_start() > 0 {
                    Some(self.writer.user_header().expected_block_start() - 1)
                } else {
                    None
                }
            }
        };

        self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
    }

    /// Increments the [`SegmentHeader`] end block. It will commit the current static file and
    /// create the next one if we are past the end range.
    ///
    /// Returns the current [`BlockNumber`] as seen in the static file.
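    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `writer` is a Headers writer whose highest block so far is 99
    /// (the numbers are illustrative only):
    ///
    /// ```ignore
    /// // Block 100 is the next expected block, so this succeeds and returns 100.
    /// let block = writer.increment_block(100)?;
    /// // Passing anything other than the next expected block returns
    /// // `ProviderError::UnexpectedStaticFileBlockNumber`.
    /// ```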
    pub fn increment_block(
        &mut self,
        expected_block_number: BlockNumber,
    ) -> ProviderResult<BlockNumber> {
        let segment = self.writer.user_header().segment();

        self.check_next_block_number(expected_block_number, segment)?;

        let start = Instant::now();
        if let Some(last_block) = self.writer.user_header().block_end() {
            // We have finished the previous static file and must freeze it
            if last_block == self.writer.user_header().expected_block_end() {
                // Commits offsets and new user_header to disk
                self.commit()?;

                // Opens the new static file
                let (writer, data_path) =
                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
                self.writer = writer;
                self.data_path = data_path;

                *self.writer.user_header_mut() =
                    SegmentHeader::new(find_fixed_range(last_block + 1), None, None, segment);
            }
        }

        let block = self.writer.user_header_mut().increment_block();
        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::IncrementBlock,
                Some(start.elapsed()),
            );
        }

        Ok(block)
    }

    /// Verifies that the incoming block number matches the next expected block number
    /// for the static file. This ensures data continuity when adding new blocks.
    fn check_next_block_number(
        &self,
        expected_block_number: u64,
        segment: StaticFileSegment,
    ) -> ProviderResult<()> {
        // The next static file block number can be found by checking the one after block_end.
        // However, if it's a new file to which no data has been added yet, its block range will
        // actually be None. In that case, the next block is found on `expected_block_start`.
        let next_static_file_block = self
            .writer
            .user_header()
            .block_end()
            .map(|b| b + 1)
            .unwrap_or_else(|| self.writer.user_header().expected_block_start());

        if expected_block_number != next_static_file_block {
            return Err(ProviderError::UnexpectedStaticFileBlockNumber(
                segment,
                expected_block_number,
                next_static_file_block,
            ))
        }
        Ok(())
    }

    /// Truncates a number of rows from disk. It deletes the current static file and loads an
    /// older one if the truncation goes beyond the start of the current block range.
    ///
    /// **`last_block`** should be passed only with transaction based segments.
    ///
    /// # Note
    /// Commits to the configuration file at the end.
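    ///
    /// # Example
    ///
    /// A minimal sketch of the public path that ends up here, assuming `writer` is a Headers
    /// writer (the row count is illustrative only):
    ///
    /// ```ignore
    /// // Queue the removal of the last 10 headers; nothing is touched on disk yet.
    /// writer.prune_headers(10)?;
    /// // `commit` applies the queued prune, which truncates the rows and, if a file becomes
    /// // empty, deletes it and re-opens the previous static file.
    /// writer.commit()?;
    /// ```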
    fn truncate(
        &mut self,
        segment: StaticFileSegment,
        num_rows: u64,
        last_block: Option<u64>,
    ) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        while remaining_rows > 0 {
            let len = match segment {
                StaticFileSegment::Headers => {
                    self.writer.user_header().block_len().unwrap_or_default()
                }
                StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
                    self.writer.user_header().tx_len().unwrap_or_default()
                }
            };

            if remaining_rows >= len {
                // If there are more rows to delete than this static file contains, then just
                // delete the whole file and go to the next static file
                let block_start = self.writer.user_header().expected_block_start();

                if block_start != 0 {
                    self.delete_current_and_open_previous()?;
                } else {
                    // Update `SegmentHeader`
                    self.writer.user_header_mut().prune(len);
                    self.writer
                        .prune_rows(len as usize)
                        .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
                    break
                }

                remaining_rows -= len;
            } else {
                // Update `SegmentHeader`
                self.writer.user_header_mut().prune(remaining_rows);

                // Truncate data
                self.writer
                    .prune_rows(remaining_rows as usize)
                    .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
                remaining_rows = 0;
            }
        }

        // Only Transactions and Receipts
        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            if num_rows == 0 {
                // Edge case for when we are unwinding a chain of empty blocks that goes across
                // files, and therefore, the only reference point to know which file
                // we are supposed to be at is `last_block`.
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }

            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        // Commits new changes to disk.
        self.commit()?;

        Ok(())
    }

    /// Deletes the current static file, and replaces this provider writer with the previous
    /// static file.
    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
        let current_path = self.data_path.clone();
        let (previous_writer, data_path) = Self::open(
            self.user_header().segment(),
            self.writer.user_header().expected_block_start() - 1,
            self.reader.clone(),
            self.metrics.clone(),
        )?;
        self.writer = previous_writer;
        self.data_path = data_path;
        NippyJar::<SegmentHeader>::load(&current_path)
            .map_err(|e| ProviderError::NippyJar(e.to_string()))?
            .delete()
            .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
        Ok(())
    }

    /// Appends a column to the static file.
    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
        self.buf.clear();
        column.to_compact(&mut self.buf);

        self.writer
            .append_column(Some(Ok(&self.buf)))
            .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
        Ok(())
    }

    /// Appends to a tx number-based static file.
    ///
    /// Returns the current [`TxNumber`] as seen in the static file.
    fn append_with_tx_number<V: Compact>(
        &mut self,
        segment: StaticFileSegment,
        tx_num: TxNumber,
        value: V,
    ) -> ProviderResult<TxNumber> {
        debug_assert!(self.writer.user_header().segment() == segment);

        if self.writer.user_header().tx_range().is_none() {
            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
        } else {
            self.writer.user_header_mut().increment_tx();
        }

        self.append_column(value)?;

        Ok(self.writer.user_header().tx_end().expect("qed"))
    }

    /// Appends a header to the static file.
    ///
    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
    /// blocks.
    ///
    /// Returns the current [`BlockNumber`] as seen in the static file.
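    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `writer` is a Headers writer whose next expected block matches
    /// `header.number`, and that `header`, `total_difficulty` and `hash` are already available:
    ///
    /// ```ignore
    /// // Appends the header, its total difficulty and its hash as one row, after incrementing
    /// // the block range of the segment header.
    /// let block = writer.append_header(&header, total_difficulty, &hash)?;
    /// writer.commit()?;
    /// ```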
    pub fn append_header(
        &mut self,
        header: &Header,
        total_difficulty: U256,
        hash: &BlockHash,
    ) -> ProviderResult<BlockNumber> {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        let block_number = self.increment_block(header.number)?;

        self.append_column(header)?;
        self.append_column(CompactU256::from(total_difficulty))?;
        self.append_column(hash)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(block_number)
    }

    /// Appends a transaction to the static file.
    ///
    /// It **DOES NOT CALL** `increment_block()`; that should be handled elsewhere. There might be
    /// empty blocks, and this function wouldn't be called for them.
    ///
    /// Returns the current [`TxNumber`] as seen in the static file.
    pub fn append_transaction(
        &mut self,
        tx_num: TxNumber,
        tx: &TransactionSignedNoHash,
    ) -> ProviderResult<TxNumber> {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        let result = self.append_with_tx_number(StaticFileSegment::Transactions, tx_num, tx)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Transactions,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }

    /// Appends a receipt to the static file.
    ///
    /// It **DOES NOT** call `increment_block()`; that should be handled elsewhere. There might be
    /// empty blocks, and this function wouldn't be called for them.
    ///
    /// Returns the current [`TxNumber`] as seen in the static file.
    pub fn append_receipt(
        &mut self,
        tx_num: TxNumber,
        receipt: &Receipt,
    ) -> ProviderResult<TxNumber> {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        let result = self.append_with_tx_number(StaticFileSegment::Receipts, tx_num, receipt)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }

    /// Appends multiple receipts to the static file.
    ///
    /// Returns the current [`TxNumber`] as seen in the static file, if any.
    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
    where
        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
        R: Borrow<Receipt>,
    {
        let mut receipts_iter = receipts.into_iter().peekable();
        // If receipts are empty, we can simply return None
        if receipts_iter.peek().is_none() {
            return Ok(None);
        }

        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        // At this point receipts contains at least one receipt, so this would be overwritten.
        let mut tx_number = 0;
        let mut count: u64 = 0;

        for receipt_result in receipts_iter {
            let (tx_num, receipt) = receipt_result?;
            tx_number =
                self.append_with_tx_number(StaticFileSegment::Receipts, tx_num, receipt.borrow())?;
            count += 1;
        }

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operations(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Append,
                count,
                Some(start.elapsed()),
            );
        }

        Ok(Some(tx_number))
    }

    /// Adds an instruction to prune `to_delete` transactions during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at.
    pub fn prune_transactions(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
        self.queue_prune(to_delete, Some(last_block))
    }

    /// Adds an instruction to prune `to_delete` receipts during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at.
    pub fn prune_receipts(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
        self.queue_prune(to_delete, Some(last_block))
    }

    /// Adds an instruction to prune `to_delete` headers during commit.
    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
        self.queue_prune(to_delete, None)
    }

    /// Adds an instruction to prune `to_delete` elements during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at when dealing with
    /// transaction-based data.
    fn queue_prune(
        &mut self,
        to_delete: u64,
        last_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        self.ensure_no_queued_prune()?;
        self.prune_on_commit = Some((to_delete, last_block));
        Ok(())
    }

    /// Returns an error if there is a pruning instruction that still needs to be applied.
    fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
        if self.prune_on_commit.is_some() {
            return Err(ProviderError::NippyJar(
                "Pruning should be committed before appending or pruning more data".to_string(),
            ))
        }
        Ok(())
    }

    /// Removes the last `to_delete` transactions from the data file.
    fn prune_transaction_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        let segment = StaticFileSegment::Transactions;
        debug_assert!(self.writer.user_header().segment() == segment);

        self.truncate(segment, to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Transactions,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Prunes the last `to_delete` receipts from the data file.
    fn prune_receipt_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        let segment = StaticFileSegment::Receipts;
        debug_assert!(self.writer.user_header().segment() == segment);

        self.truncate(segment, to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Prunes the last `to_delete` headers from the data file.
    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
        let start = Instant::now();

        let segment = StaticFileSegment::Headers;
        debug_assert!(self.writer.user_header().segment() == segment);

        self.truncate(segment, to_delete, None)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    fn reader(&self) -> StaticFileProvider {
        Self::upgrade_provider_to_strong_reference(&self.reader)
    }

    /// Upgrades a weak reference of [`StaticFileProviderInner`] to a strong reference
    /// [`StaticFileProvider`].
    ///
    /// # Panics
    ///
    /// Panics if the parent [`StaticFileProvider`] is fully dropped while the child writer is
    /// still active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the
    /// [`StaticFileProvider`].
    fn upgrade_provider_to_strong_reference(
        provider: &Weak<StaticFileProviderInner>,
    ) -> StaticFileProvider {
        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
    }

    /// Helper function to access [`SegmentHeader`].
    pub const fn user_header(&self) -> &SegmentHeader {
        self.writer.user_header()
    }

    /// Helper function to access a mutable reference to [`SegmentHeader`].
    pub fn user_header_mut(&mut self) -> &mut SegmentHeader {
        self.writer.user_header_mut()
    }

    /// Helper function to override the block range for testing.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
    }

    /// Helper function to access a mutable reference to the underlying [`NippyJarWriter`] for
    /// testing.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
        &mut self.writer
    }
}

fn create_jar(
    segment: StaticFileSegment,
    path: &Path,
    expected_block_range: SegmentRangeInclusive,
) -> NippyJar<SegmentHeader> {
    let mut jar = NippyJar::new(
        segment.columns(),
        path,
        SegmentHeader::new(expected_block_range, None, None, segment),
    );

    // Transactions and Receipts already have the compression scheme (zstd-dictionary) used
    // natively in their encoding, so only Headers are compressed with lz4 here.
    if segment.is_headers() {
        jar = jar.with_lz4();
    }

    jar
}