//! Cursor wrapper for libmdbx-sys. use crate::{ metrics::{DatabaseEnvMetrics, Operation}, tables::utils::*, DatabaseError, }; use reth_db_api::{ common::{PairResult, ValueOnlyResult}, cursor::{ DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, RangeWalker, ReverseWalker, Walker, }, table::{Compress, Decode, Decompress, DupSort, Encode, Table}, }; use reth_libmdbx::{Error as MDBXError, TransactionKind, WriteFlags, RO, RW}; use reth_storage_errors::db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation}; use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds, sync::Arc}; /// Read only Cursor. pub type CursorRO = Cursor; /// Read write cursor. pub type CursorRW = Cursor; /// Cursor wrapper to access KV items. #[derive(Debug)] pub struct Cursor { /// Inner `libmdbx` cursor. pub(crate) inner: reth_libmdbx::Cursor, /// Cache buffer that receives compressed values. buf: Vec, /// Reference to metric handles in the DB environment. If `None`, metrics are not recorded. metrics: Option>, /// Phantom data to enforce encoding/decoding. _dbi: PhantomData, } impl Cursor { pub(crate) const fn new_with_metrics( inner: reth_libmdbx::Cursor, metrics: Option>, ) -> Self { Self { inner, buf: Vec::new(), metrics, _dbi: PhantomData } } /// If `self.metrics` is `Some(...)`, record a metric with the provided operation and value /// size. /// /// Otherwise, just execute the closure. fn execute_with_operation_metric( &mut self, operation: Operation, value_size: Option, f: impl FnOnce(&mut Self) -> R, ) -> R { if let Some(metrics) = self.metrics.as_ref().cloned() { metrics.record_operation(T::NAME, operation, value_size, || f(self)) } else { f(self) } } } /// Decodes a `(key, value)` pair from the database. #[allow(clippy::type_complexity)] pub fn decode( res: Result, Cow<'_, [u8]>)>, impl Into>, ) -> PairResult where T: Table, T::Key: Decode, T::Value: Decompress, { res.map_err(|e| DatabaseError::Read(e.into()))?.map(decoder::).transpose() } /// Some types don't support compression (eg. B256), and we don't want to be copying them to the /// allocated buffer when we can just use their reference. macro_rules! compress_to_buf_or_ref { ($self:expr, $value:expr) => { if let Some(value) = $value.uncompressable_ref() { Some(value) } else { $self.buf.truncate(0); $value.compress_to_buf(&mut $self.buf); None } }; } impl DbCursorRO for Cursor { fn first(&mut self) -> PairResult { decode::(self.inner.first()) } fn seek_exact(&mut self, key: ::Key) -> PairResult { decode::(self.inner.set_key(key.encode().as_ref())) } fn seek(&mut self, key: ::Key) -> PairResult { decode::(self.inner.set_range(key.encode().as_ref())) } fn next(&mut self) -> PairResult { decode::(self.inner.next()) } fn prev(&mut self) -> PairResult { decode::(self.inner.prev()) } fn last(&mut self) -> PairResult { decode::(self.inner.last()) } fn current(&mut self) -> PairResult { decode::(self.inner.get_current()) } fn walk(&mut self, start_key: Option) -> Result, DatabaseError> { let start = if let Some(start_key) = start_key { decode::(self.inner.set_range(start_key.encode().as_ref())).transpose() } else { self.first().transpose() }; Ok(Walker::new(self, start)) } fn walk_range( &mut self, range: impl RangeBounds, ) -> Result, DatabaseError> { let start = match range.start_bound().cloned() { Bound::Included(key) => self.inner.set_range(key.encode().as_ref()), Bound::Excluded(_key) => { unreachable!("Rust doesn't allow for Bound::Excluded in starting bounds"); } Bound::Unbounded => self.inner.first(), }; let start = decode::(start).transpose(); Ok(RangeWalker::new(self, start, range.end_bound().cloned())) } fn walk_back( &mut self, start_key: Option, ) -> Result, DatabaseError> { let start = if let Some(start_key) = start_key { decode::(self.inner.set_range(start_key.encode().as_ref())) } else { self.last() } .transpose(); Ok(ReverseWalker::new(self, start)) } } impl DbDupCursorRO for Cursor { /// Returns the next `(key, value)` pair of a DUPSORT table. fn next_dup(&mut self) -> PairResult { decode::(self.inner.next_dup()) } /// Returns the next `(key, value)` pair skipping the duplicates. fn next_no_dup(&mut self) -> PairResult { decode::(self.inner.next_nodup()) } /// Returns the next `value` of a duplicate `key`. fn next_dup_val(&mut self) -> ValueOnlyResult { self.inner .next_dup() .map_err(|e| DatabaseError::Read(e.into()))? .map(decode_value::) .transpose() } fn seek_by_key_subkey( &mut self, key: ::Key, subkey: ::SubKey, ) -> ValueOnlyResult { self.inner .get_both_range(key.encode().as_ref(), subkey.encode().as_ref()) .map_err(|e| DatabaseError::Read(e.into()))? .map(decode_one::) .transpose() } /// Depending on its arguments, returns an iterator starting at: /// - Some(key), Some(subkey): a `key` item whose data is >= than `subkey` /// - Some(key), None: first item of a specified `key` /// - None, Some(subkey): like first case, but in the first key /// - None, None: first item in the table of a DUPSORT table. fn walk_dup( &mut self, key: Option, subkey: Option, ) -> Result, DatabaseError> { let start = match (key, subkey) { (Some(key), Some(subkey)) => { // encode key and decode it after. let key: Vec = key.encode().into(); self.inner .get_both_range(key.as_ref(), subkey.encode().as_ref()) .map_err(|e| DatabaseError::Read(e.into()))? .map(|val| decoder::((Cow::Owned(key), val))) } (Some(key), None) => { let key: Vec = key.encode().into(); self.inner .set(key.as_ref()) .map_err(|e| DatabaseError::Read(e.into()))? .map(|val| decoder::((Cow::Owned(key), val))) } (None, Some(subkey)) => { if let Some((key, _)) = self.first()? { let key: Vec = key.encode().into(); self.inner .get_both_range(key.as_ref(), subkey.encode().as_ref()) .map_err(|e| DatabaseError::Read(e.into()))? .map(|val| decoder::((Cow::Owned(key), val))) } else { Some(Err(DatabaseError::Read(MDBXError::NotFound.into()))) } } (None, None) => self.first().transpose(), }; Ok(DupWalker::<'_, T, Self> { cursor: self, start }) } } impl DbCursorRW for Cursor { /// Database operation that will update an existing row if a specified value already /// exists in a table, and insert a new row if the specified value doesn't already exist /// /// For a DUPSORT table, `upsert` will not actually update-or-insert. If the key already exists, /// it will append the value to the subkey, even if the subkeys are the same. So if you want /// to properly upsert, you'll need to `seek_exact` & `delete_current` if the key+subkey was /// found, before calling `upsert`. fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> { let key = key.encode(); let value = compress_to_buf_or_ref!(self, value); self.execute_with_operation_metric( Operation::CursorUpsert, Some(value.unwrap_or(&self.buf).len()), |this| { this.inner .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::UPSERT) .map_err(|e| { DatabaseWriteError { info: e.into(), operation: DatabaseWriteOperation::CursorUpsert, table_name: T::NAME, key: key.into(), } .into() }) }, ) } fn insert(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> { let key = key.encode(); let value = compress_to_buf_or_ref!(self, value); self.execute_with_operation_metric( Operation::CursorInsert, Some(value.unwrap_or(&self.buf).len()), |this| { this.inner .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::NO_OVERWRITE) .map_err(|e| { DatabaseWriteError { info: e.into(), operation: DatabaseWriteOperation::CursorInsert, table_name: T::NAME, key: key.into(), } .into() }) }, ) } /// Appends the data to the end of the table. Consequently, the append operation /// will fail if the inserted key is less than the last table key fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> { let key = key.encode(); let value = compress_to_buf_or_ref!(self, value); self.execute_with_operation_metric( Operation::CursorAppend, Some(value.unwrap_or(&self.buf).len()), |this| { this.inner .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND) .map_err(|e| { DatabaseWriteError { info: e.into(), operation: DatabaseWriteOperation::CursorAppend, table_name: T::NAME, key: key.into(), } .into() }) }, ) } fn delete_current(&mut self) -> Result<(), DatabaseError> { self.execute_with_operation_metric(Operation::CursorDeleteCurrent, None, |this| { this.inner.del(WriteFlags::CURRENT).map_err(|e| DatabaseError::Delete(e.into())) }) } } impl DbDupCursorRW for Cursor { fn delete_current_duplicates(&mut self) -> Result<(), DatabaseError> { self.execute_with_operation_metric(Operation::CursorDeleteCurrentDuplicates, None, |this| { this.inner.del(WriteFlags::NO_DUP_DATA).map_err(|e| DatabaseError::Delete(e.into())) }) } fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> { let key = key.encode(); let value = compress_to_buf_or_ref!(self, value); self.execute_with_operation_metric( Operation::CursorAppendDup, Some(value.unwrap_or(&self.buf).len()), |this| { this.inner .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND_DUP) .map_err(|e| { DatabaseWriteError { info: e.into(), operation: DatabaseWriteOperation::CursorAppendDup, table_name: T::NAME, key: key.into(), } .into() }) }, ) } }