use crate::{ identifier::{SenderId, TransactionId}, pool::size::SizeTracker, PoolTransaction, SubPoolLimit, ValidPoolTransaction, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER, }; use rustc_hash::FxHashMap; use smallvec::SmallVec; use std::{ cmp::Ordering, collections::{hash_map::Entry, BTreeMap, BTreeSet}, ops::{Bound::Unbounded, Deref}, sync::Arc, }; /// A pool of transactions that are currently parked and are waiting for external changes (e.g. /// basefee, ancestor transactions, balance) that eventually move the transaction into the pending /// pool. /// /// This pool is a bijection: at all times each set (`best`, `by_id`) contains the same /// transactions. /// /// Note: This type is generic over [`ParkedPool`] which enforces that the underlying transaction /// type is [`ValidPoolTransaction`] wrapped in an [Arc]. #[derive(Debug, Clone)] pub struct ParkedPool { /// Keeps track of transactions inserted in the pool. /// /// This way we can determine when transactions were submitted to the pool. submission_id: u64, /// _All_ Transactions that are currently inside the pool grouped by their identifier. by_id: BTreeMap>, /// All transactions sorted by their order function. /// /// The higher, the better. best: BTreeSet>, /// Keeps track of last submission id for each sender. /// /// This are sorted in Reverse order, so the last (highest) submission id is first, and the /// lowest(oldest) is the last. last_sender_submission: BTreeSet, /// Keeps track of the number of transactions in the pool by the sender and the last submission /// id. sender_transaction_count: FxHashMap, /// Keeps track of the size of this pool. /// /// See also [`PoolTransaction::size`]. size_of: SizeTracker, } // === impl ParkedPool === impl ParkedPool { /// Adds a new transactions to the pending queue. /// /// # Panics /// /// If the transaction is already included. pub fn add_transaction(&mut self, tx: Arc>) { let id = *tx.id(); assert!( !self.contains(&id), "transaction already included {:?}", self.get(&id).unwrap().transaction.transaction ); let submission_id = self.next_id(); // keep track of size self.size_of += tx.size(); // update or create sender entry self.add_sender_count(tx.sender_id(), submission_id); let transaction = ParkedPoolTransaction { submission_id, transaction: tx.into() }; self.by_id.insert(id, transaction.clone()); self.best.insert(transaction); } /// Increments the count of transactions for the given sender and updates the tracked submission /// id. fn add_sender_count(&mut self, sender: SenderId, submission_id: u64) { match self.sender_transaction_count.entry(sender) { Entry::Occupied(mut entry) => { let value = entry.get_mut(); // remove the __currently__ tracked submission id self.last_sender_submission .remove(&SubmissionSenderId::new(sender, value.last_submission_id)); value.count += 1; value.last_submission_id = submission_id; } Entry::Vacant(entry) => { entry .insert(SenderTransactionCount { count: 1, last_submission_id: submission_id }); } } // insert a new entry self.last_sender_submission.insert(SubmissionSenderId::new(sender, submission_id)); } /// Decrements the count of transactions for the given sender. /// /// If the count reaches zero, the sender is removed from the map. /// /// Note: this does not update the tracked submission id for the sender, because we're only /// interested in the __last__ submission id when truncating the pool. fn remove_sender_count(&mut self, sender_id: SenderId) { let removed_sender = match self.sender_transaction_count.entry(sender_id) { Entry::Occupied(mut entry) => { let value = entry.get_mut(); value.count -= 1; if value.count == 0 { entry.remove() } else { return } } Entry::Vacant(_) => { // This should never happen because the bisection between the two maps unreachable!("sender count not found {:?}", sender_id); } }; // all transactions for this sender have been removed assert!( self.last_sender_submission .remove(&SubmissionSenderId::new(sender_id, removed_sender.last_submission_id)), "last sender transaction not found {sender_id:?}" ); } /// Returns an iterator over all transactions in the pool pub(crate) fn all( &self, ) -> impl Iterator>> + '_ { self.by_id.values().map(|tx| tx.transaction.clone().into()) } /// Removes the transaction from the pool pub(crate) fn remove_transaction( &mut self, id: &TransactionId, ) -> Option>> { // remove from queues let tx = self.by_id.remove(id)?; self.best.remove(&tx); self.remove_sender_count(tx.transaction.sender_id()); // keep track of size self.size_of -= tx.transaction.size(); Some(tx.transaction.into()) } /// Retrieves transactions by sender, using `SmallVec` to efficiently handle up to /// `TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER` transactions. pub(crate) fn get_txs_by_sender( &self, sender: SenderId, ) -> SmallVec<[TransactionId; TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER]> { self.by_id .range((sender.start_bound(), Unbounded)) .take_while(move |(other, _)| sender == other.sender) .map(|(tx_id, _)| *tx_id) .collect() } #[cfg(test)] pub(crate) fn get_senders_by_submission_id( &self, ) -> impl Iterator + '_ { self.last_sender_submission.iter().cloned() } /// Truncates the pool by removing transactions, until the given [`SubPoolLimit`] has been met. /// /// This is done by first ordering senders by the last time they have submitted a transaction /// /// Uses sender ids sorted by each sender's last submission id. Senders with older last /// submission ids are first. Note that _last_ submission ids are the newest submission id for /// that sender, so this sorts senders by the last time they submitted a transaction in /// descending order. Senders that have least recently submitted a transaction are first. /// /// Then, for each sender, all transactions for that sender are removed, until the pool limits /// have been met. /// /// Any removed transactions are returned. pub fn truncate_pool( &mut self, limit: SubPoolLimit, ) -> Vec>> { if !self.exceeds(&limit) { // if we are below the limits, we don't need to drop anything return Vec::new() } let mut removed = Vec::new(); while limit.is_exceeded(self.len(), self.size()) && !self.last_sender_submission.is_empty() { // NOTE: This will not panic due to `!last_sender_transaction.is_empty()` let sender_id = self.last_sender_submission.last().expect("not empty").sender_id; let list = self.get_txs_by_sender(sender_id); // Drop transactions from this sender until the pool is under limits for txid in list.into_iter().rev() { if let Some(tx) = self.remove_transaction(&txid) { removed.push(tx); } if !self.exceeds(&limit) { break } } } removed } fn next_id(&mut self) -> u64 { let id = self.submission_id; self.submission_id = self.submission_id.wrapping_add(1); id } /// The reported size of all transactions in this pool. pub(crate) fn size(&self) -> usize { self.size_of.into() } /// Number of transactions in the entire pool pub(crate) fn len(&self) -> usize { self.by_id.len() } /// Returns true if the pool exceeds the given limit #[inline] pub(crate) fn exceeds(&self, limit: &SubPoolLimit) -> bool { limit.is_exceeded(self.len(), self.size()) } /// Returns whether the pool is empty #[cfg(test)] #[allow(dead_code)] pub(crate) fn is_empty(&self) -> bool { self.by_id.is_empty() } /// Returns `true` if the transaction with the given id is already included in this pool. pub(crate) fn contains(&self, id: &TransactionId) -> bool { self.by_id.contains_key(id) } /// Retrieves a transaction with the given ID from the pool, if it exists. fn get(&self, id: &TransactionId) -> Option<&ParkedPoolTransaction> { self.by_id.get(id) } /// Asserts that the bijection between `by_id` and `best` is valid. #[cfg(any(test, feature = "test-utils"))] pub(crate) fn assert_invariants(&self) { assert_eq!(self.by_id.len(), self.best.len(), "by_id.len() != best.len()"); assert_eq!( self.last_sender_submission.len(), self.sender_transaction_count.len(), "last_sender_transaction.len() != sender_to_last_transaction.len()" ); } } impl ParkedPool> { /// Returns all transactions that satisfy the given basefee. /// /// Note: this does _not_ remove the transactions #[allow(dead_code)] pub(crate) fn satisfy_base_fee_transactions( &self, basefee: u64, ) -> Vec>> { let ids = self.satisfy_base_fee_ids(basefee); let mut txs = Vec::with_capacity(ids.len()); for id in ids { txs.push(self.get(&id).expect("transaction exists").transaction.clone().into()); } txs } /// Returns all transactions that satisfy the given basefee. fn satisfy_base_fee_ids(&self, basefee: u64) -> Vec { let mut transactions = Vec::new(); { let mut iter = self.by_id.iter().peekable(); while let Some((id, tx)) = iter.next() { if tx.transaction.transaction.max_fee_per_gas() < basefee as u128 { // still parked -> skip descendant transactions 'this: while let Some((peek, _)) = iter.peek() { if peek.sender != id.sender { break 'this } iter.next(); } } else { transactions.push(*id); } } } transactions } /// Removes all transactions and their dependent transaction from the subpool that no longer /// satisfy the given basefee. /// /// Note: the transactions are not returned in a particular order. pub(crate) fn enforce_basefee(&mut self, basefee: u64) -> Vec>> { let to_remove = self.satisfy_base_fee_ids(basefee); let mut removed = Vec::with_capacity(to_remove.len()); for id in to_remove { removed.push(self.remove_transaction(&id).expect("transaction exists")); } removed } } impl Default for ParkedPool { fn default() -> Self { Self { submission_id: 0, by_id: Default::default(), best: Default::default(), last_sender_submission: Default::default(), sender_transaction_count: Default::default(), size_of: Default::default(), } } } /// Keeps track of the number of transactions and the latest submission id for each sender. #[derive(Debug, Clone, Default, PartialEq, Eq)] struct SenderTransactionCount { count: u64, last_submission_id: u64, } /// Represents a transaction in this pool. #[derive(Debug)] struct ParkedPoolTransaction { /// Identifier that tags when transaction was submitted in the pool. submission_id: u64, /// Actual transaction. transaction: T, } impl Clone for ParkedPoolTransaction { fn clone(&self) -> Self { Self { submission_id: self.submission_id, transaction: self.transaction.clone() } } } impl Eq for ParkedPoolTransaction {} impl PartialEq for ParkedPoolTransaction { fn eq(&self, other: &Self) -> bool { self.cmp(other) == Ordering::Equal } } impl PartialOrd for ParkedPoolTransaction { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } impl Ord for ParkedPoolTransaction { fn cmp(&self, other: &Self) -> Ordering { // This compares by the transactions first, and only if two tx are equal this compares // the unique `submission_id`. // "better" transactions are Greater self.transaction .cmp(&other.transaction) .then_with(|| other.submission_id.cmp(&self.submission_id)) } } /// Includes a [`SenderId`] and `submission_id`. This is used to sort senders by their last /// submission id. #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub(crate) struct SubmissionSenderId { /// The sender id pub(crate) sender_id: SenderId, /// The submission id pub(crate) submission_id: u64, } impl SubmissionSenderId { /// Creates a new [`SubmissionSenderId`] based on the [`SenderId`] and `submission_id`. const fn new(sender_id: SenderId, submission_id: u64) -> Self { Self { sender_id, submission_id } } } impl Ord for SubmissionSenderId { fn cmp(&self, other: &Self) -> Ordering { // Reverse ordering for `submission_id` other.submission_id.cmp(&self.submission_id) } } impl PartialOrd for SubmissionSenderId { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } /// Helper trait used for custom `Ord` wrappers around a transaction. /// /// This is effectively a wrapper for `Arc` with custom `Ord` implementation. pub trait ParkedOrd: Ord + Clone + From>> + Into>> + Deref>> { /// The wrapper transaction type. type Transaction: PoolTransaction; } /// Helper macro to implement necessary conversions for `ParkedOrd` trait macro_rules! impl_ord_wrapper { ($name:ident) => { impl Clone for $name { fn clone(&self) -> Self { Self(self.0.clone()) } } impl Eq for $name {} impl PartialEq for $name { fn eq(&self, other: &Self) -> bool { self.cmp(other) == Ordering::Equal } } impl PartialOrd for $name { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } impl Deref for $name { type Target = Arc>; fn deref(&self) -> &Self::Target { &self.0 } } impl ParkedOrd for $name { type Transaction = T; } impl From>> for $name { fn from(value: Arc>) -> Self { Self(value) } } impl From<$name> for Arc> { fn from(value: $name) -> Arc> { value.0 } } }; } /// A new type wrapper for [`ValidPoolTransaction`] /// /// This sorts transactions by their base fee. /// /// Caution: This assumes all transaction in the `BaseFee` sub-pool have a fee value. #[derive(Debug)] pub struct BasefeeOrd(Arc>); impl_ord_wrapper!(BasefeeOrd); impl Ord for BasefeeOrd { fn cmp(&self, other: &Self) -> Ordering { self.0.transaction.max_fee_per_gas().cmp(&other.0.transaction.max_fee_per_gas()) } } /// A new type wrapper for [`ValidPoolTransaction`] /// /// This sorts transactions by their distance. /// /// `Queued` transactions are transactions that are currently blocked by other parked (basefee, /// queued) or missing transactions. /// /// The primary order function always compares the transaction costs first. In case these /// are equal, it compares the timestamps when the transactions were created. #[derive(Debug)] pub struct QueuedOrd(Arc>); impl_ord_wrapper!(QueuedOrd); // TODO: temporary solution for ordering the queued pool. impl Ord for QueuedOrd { fn cmp(&self, other: &Self) -> Ordering { // Higher price is better self.max_fee_per_gas().cmp(&self.max_fee_per_gas()).then_with(|| // Lower timestamp is better other.timestamp.cmp(&self.timestamp)) } } #[cfg(test)] mod tests { use super::*; use crate::test_utils::{MockTransaction, MockTransactionFactory, MockTransactionSet}; use reth_primitives::{address, TxType}; use std::collections::HashSet; #[test] fn test_enforce_parked_basefee() { let mut f = MockTransactionFactory::default(); let mut pool = ParkedPool::>::default(); let tx = f.validated_arc(MockTransaction::eip1559().inc_price()); pool.add_transaction(tx.clone()); assert!(pool.contains(tx.id())); assert_eq!(pool.len(), 1); let removed = pool.enforce_basefee(u64::MAX); assert!(removed.is_empty()); let removed = pool.enforce_basefee((tx.max_fee_per_gas() - 1) as u64); assert_eq!(removed.len(), 1); assert!(pool.is_empty()); } #[test] fn test_enforce_parked_basefee_descendant() { let mut f = MockTransactionFactory::default(); let mut pool = ParkedPool::>::default(); let t = MockTransaction::eip1559().inc_price_by(10); let root_tx = f.validated_arc(t.clone()); pool.add_transaction(root_tx.clone()); let descendant_tx = f.validated_arc(t.inc_nonce().decr_price()); pool.add_transaction(descendant_tx.clone()); assert!(pool.contains(root_tx.id())); assert!(pool.contains(descendant_tx.id())); assert_eq!(pool.len(), 2); let removed = pool.enforce_basefee(u64::MAX); assert!(removed.is_empty()); assert_eq!(pool.len(), 2); // two dependent tx in the pool with decreasing fee { // TODO: test change might not be intended, re review let mut pool2 = pool.clone(); let removed = pool2.enforce_basefee(root_tx.max_fee_per_gas() as u64); assert_eq!(removed.len(), 1); assert_eq!(pool2.len(), 1); // root got popped - descendant should be skipped assert!(!pool2.contains(root_tx.id())); assert!(pool2.contains(descendant_tx.id())); } // remove root transaction via descendant tx fee let removed = pool.enforce_basefee(descendant_tx.max_fee_per_gas() as u64); assert_eq!(removed.len(), 2); assert!(pool.is_empty()); } #[test] fn truncate_parked_by_submission_id() { // this test ensures that we evict from the pending pool by sender let mut f = MockTransactionFactory::default(); let mut pool = ParkedPool::>::default(); let a_sender = address!("000000000000000000000000000000000000000a"); let b_sender = address!("000000000000000000000000000000000000000b"); let c_sender = address!("000000000000000000000000000000000000000c"); let d_sender = address!("000000000000000000000000000000000000000d"); // create a chain of transactions by sender A, B, C let mut tx_set = MockTransactionSet::dependent(a_sender, 0, 4, TxType::Eip1559); let a = tx_set.clone().into_vec(); let b = MockTransactionSet::dependent(b_sender, 0, 3, TxType::Eip1559).into_vec(); tx_set.extend(b.clone()); // C has the same number of txs as B let c = MockTransactionSet::dependent(c_sender, 0, 3, TxType::Eip1559).into_vec(); tx_set.extend(c.clone()); let d = MockTransactionSet::dependent(d_sender, 0, 1, TxType::Eip1559).into_vec(); tx_set.extend(d.clone()); let all_txs = tx_set.into_vec(); // just construct a list of all txs to add let expected_parked = vec![c[0].clone(), c[1].clone(), c[2].clone(), d[0].clone()] .into_iter() .map(|tx| (tx.sender(), tx.nonce())) .collect::>(); // we expect the truncate operation to go through the senders with the most txs, removing // txs based on when they were submitted, removing the oldest txs first, until the pool is // not over the limit let expected_removed = vec![ a[0].clone(), a[1].clone(), a[2].clone(), a[3].clone(), b[0].clone(), b[1].clone(), b[2].clone(), ] .into_iter() .map(|tx| (tx.sender(), tx.nonce())) .collect::>(); // add all the transactions to the pool for tx in all_txs { pool.add_transaction(f.validated_arc(tx)); } // we should end up with the most recently submitted transactions let pool_limit = SubPoolLimit { max_txs: 4, max_size: usize::MAX }; // truncate the pool let removed = pool.truncate_pool(pool_limit); assert_eq!(removed.len(), expected_removed.len()); // get the inner txs from the removed txs let removed = removed.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::>(); assert_eq!(removed, expected_removed); // get the parked pool let parked = pool.all().collect::>(); assert_eq!(parked.len(), expected_parked.len()); // get the inner txs from the parked txs let parked = parked.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::>(); assert_eq!(parked, expected_parked); } #[test] fn test_truncate_parked_with_large_tx() { let mut f = MockTransactionFactory::default(); let mut pool = ParkedPool::>::default(); let default_limits = SubPoolLimit::default(); // create a chain of transactions by sender A // make sure they are all one over half the limit let a_sender = address!("000000000000000000000000000000000000000a"); // 2 txs, that should put the pool over the size limit but not max txs let a_txs = MockTransactionSet::dependent(a_sender, 0, 2, TxType::Eip1559) .into_iter() .map(|mut tx| { tx.set_size(default_limits.max_size / 2 + 1); tx }) .collect::>(); // add all the transactions to the pool for tx in a_txs { pool.add_transaction(f.validated_arc(tx)); } // truncate the pool, it should remove at least one transaction let removed = pool.truncate_pool(default_limits); assert_eq!(removed.len(), 1); } #[test] fn test_senders_by_submission_id() { // this test ensures that we evict from the pending pool by sender let mut f = MockTransactionFactory::default(); let mut pool = ParkedPool::>::default(); let a_sender = address!("000000000000000000000000000000000000000a"); let b_sender = address!("000000000000000000000000000000000000000b"); let c_sender = address!("000000000000000000000000000000000000000c"); let d_sender = address!("000000000000000000000000000000000000000d"); // create a chain of transactions by sender A, B, C let mut tx_set = MockTransactionSet::dependent(a_sender, 0, 4, reth_primitives::TxType::Eip1559); let a = tx_set.clone().into_vec(); let b = MockTransactionSet::dependent(b_sender, 0, 3, reth_primitives::TxType::Eip1559) .into_vec(); tx_set.extend(b.clone()); // C has the same number of txs as B let c = MockTransactionSet::dependent(c_sender, 0, 3, reth_primitives::TxType::Eip1559) .into_vec(); tx_set.extend(c.clone()); let d = MockTransactionSet::dependent(d_sender, 0, 1, reth_primitives::TxType::Eip1559) .into_vec(); tx_set.extend(d.clone()); let all_txs = tx_set.into_vec(); // add all the transactions to the pool for tx in all_txs { pool.add_transaction(f.validated_arc(tx)); } // get senders by submission id - a4, b3, c3, d1, reversed let senders = pool.get_senders_by_submission_id().map(|s| s.sender_id).collect::>(); assert_eq!(senders.len(), 4); let expected_senders = vec![d_sender, c_sender, b_sender, a_sender] .into_iter() .map(|s| f.ids.sender_id(&s).unwrap()) .collect::>(); assert_eq!(senders, expected_senders); // manually order the txs let mut pool = ParkedPool::>::default(); let all_txs = vec![d[0].clone(), b[0].clone(), c[0].clone(), a[0].clone()]; // add all the transactions to the pool for tx in all_txs { pool.add_transaction(f.validated_arc(tx)); } let senders = pool.get_senders_by_submission_id().map(|s| s.sender_id).collect::>(); assert_eq!(senders.len(), 4); let expected_senders = vec![a_sender, c_sender, b_sender, d_sender] .into_iter() .map(|s| f.ids.sender_id(&s).unwrap()) .collect::>(); assert_eq!(senders, expected_senders); } #[test] fn test_add_sender_count_new_sender() { // Initialize a mock transaction factory let mut f = MockTransactionFactory::default(); // Create an empty transaction pool let mut pool = ParkedPool::>::default(); // Generate a validated transaction and add it to the pool let tx = f.validated_arc(MockTransaction::eip1559().inc_price()); pool.add_transaction(tx); // Define a new sender ID and submission ID let sender: SenderId = 11.into(); let submission_id = 1; // Add the sender count to the pool pool.add_sender_count(sender, submission_id); // Assert that the sender transaction count is updated correctly assert_eq!(pool.sender_transaction_count.len(), 2); let sender_info = pool.sender_transaction_count.get(&sender).unwrap(); assert_eq!(sender_info.count, 1); assert_eq!(sender_info.last_submission_id, submission_id); // Assert that the last sender submission is updated correctly assert_eq!(pool.last_sender_submission.len(), 2); let submission_info = pool.last_sender_submission.iter().next().unwrap(); assert_eq!(submission_info.sender_id, sender); assert_eq!(submission_info.submission_id, submission_id); } #[test] fn test_add_sender_count_existing_sender() { // Initialize a mock transaction factory let mut f = MockTransactionFactory::default(); // Create an empty transaction pool let mut pool = ParkedPool::>::default(); // Generate a validated transaction and add it to the pool let tx = f.validated_arc(MockTransaction::eip1559().inc_price()); pool.add_transaction(tx); // Define a sender ID and initial submission ID let sender: SenderId = 11.into(); let initial_submission_id = 1; // Add the sender count to the pool with the initial submission ID pool.add_sender_count(sender, initial_submission_id); // Define a new submission ID let new_submission_id = 2; // Add the sender count to the pool with the new submission ID pool.add_sender_count(sender, new_submission_id); // Assert that the sender transaction count is updated correctly assert_eq!(pool.sender_transaction_count.len(), 2); let sender_info = pool.sender_transaction_count.get(&sender).unwrap(); assert_eq!(sender_info.count, 2); assert_eq!(sender_info.last_submission_id, new_submission_id); // Assert that the last sender submission is updated correctly assert_eq!(pool.last_sender_submission.len(), 2); let submission_info = pool.last_sender_submission.iter().next().unwrap(); assert_eq!(submission_info.sender_id, sender); assert_eq!(submission_info.submission_id, new_submission_id); } #[test] fn test_add_sender_count_multiple_senders() { // Initialize a mock transaction factory let mut f = MockTransactionFactory::default(); // Create an empty transaction pool let mut pool = ParkedPool::>::default(); // Generate two validated transactions and add them to the pool let tx1 = f.validated_arc(MockTransaction::eip1559().inc_price()); let tx2 = f.validated_arc(MockTransaction::eip1559().inc_price()); pool.add_transaction(tx1); pool.add_transaction(tx2); // Define two different sender IDs and their corresponding submission IDs let sender1: SenderId = 11.into(); let sender2: SenderId = 22.into(); // Add the sender counts to the pool pool.add_sender_count(sender1, 1); pool.add_sender_count(sender2, 2); // Assert that the sender transaction counts are updated correctly assert_eq!(pool.sender_transaction_count.len(), 4); let sender1_info = pool.sender_transaction_count.get(&sender1).unwrap(); assert_eq!(sender1_info.count, 1); assert_eq!(sender1_info.last_submission_id, 1); let sender2_info = pool.sender_transaction_count.get(&sender2).unwrap(); assert_eq!(sender2_info.count, 1); assert_eq!(sender2_info.last_submission_id, 2); // Assert that the last sender submission is updated correctly assert_eq!(pool.last_sender_submission.len(), 3); // Verify that sender 1 is not in the last sender submission let submission_info1 = pool.last_sender_submission.iter().find(|info| info.sender_id == sender1); assert!(submission_info1.is_none()); // Verify that sender 2 is in the last sender submission let submission_info2 = pool.last_sender_submission.iter().find(|info| info.sender_id == sender2).unwrap(); assert_eq!(submission_info2.sender_id, sender2); assert_eq!(submission_info2.submission_id, 2); } }