//! Support for metering senders. Facilitates debugging by exposing metrics for number of messages //! sent, number of errors, etc. use futures::Stream; use metrics::Counter; use reth_metrics_derive::Metrics; use std::{ pin::Pin, task::{ready, Context, Poll}, }; use tokio::sync::mpsc::{ self, error::{SendError, TryRecvError, TrySendError}, OwnedPermit, }; use tokio_util::sync::{PollSendError, PollSender}; /// Wrapper around [`mpsc::unbounded_channel`] that returns a new unbounded metered channel. pub fn metered_unbounded_channel( scope: &'static str, ) -> (UnboundedMeteredSender, UnboundedMeteredReceiver) { let (tx, rx) = mpsc::unbounded_channel(); (UnboundedMeteredSender::new(tx, scope), UnboundedMeteredReceiver::new(rx, scope)) } /// Wrapper around [`mpsc::channel`] that returns a new bounded metered channel with the given /// buffer size. pub fn metered_channel( buffer: usize, scope: &'static str, ) -> (MeteredSender, MeteredReceiver) { let (tx, rx) = mpsc::channel(buffer); (MeteredSender::new(tx, scope), MeteredReceiver::new(rx, scope)) } /// A wrapper type around [`UnboundedSender`](mpsc::UnboundedSender) that updates metrics on send. #[derive(Debug)] pub struct UnboundedMeteredSender { /// The [`UnboundedSender`](mpsc::UnboundedSender) that this wraps around sender: mpsc::UnboundedSender, /// Holds metrics for this type metrics: MeteredSenderMetrics, } impl UnboundedMeteredSender { /// Creates a new [`MeteredSender`] wrapping around the provided that updates metrics on send. // #[derive(Debug)] pub fn new(sender: mpsc::UnboundedSender, scope: &'static str) -> Self { Self { sender, metrics: MeteredSenderMetrics::new(scope) } } /// Calls the underlying that updates metrics on send. // #[derive(Debug)]'s `try_send`, incrementing the appropriate /// metrics depending on the result. pub fn send(&self, message: T) -> Result<(), SendError> { match self.sender.send(message) { Ok(()) => { self.metrics.messages_sent_total.increment(1); Ok(()) } Err(error) => { self.metrics.send_errors_total.increment(1); Err(error) } } } } impl Clone for UnboundedMeteredSender { fn clone(&self) -> Self { Self { sender: self.sender.clone(), metrics: self.metrics.clone() } } } /// A wrapper type around [Receiver](mpsc::UnboundedReceiver) that updates metrics on receive. #[derive(Debug)] pub struct UnboundedMeteredReceiver { /// The [Sender](mpsc::Sender) that this wraps around receiver: mpsc::UnboundedReceiver, /// Holds metrics for this type metrics: MeteredReceiverMetrics, } // === impl MeteredReceiver === impl UnboundedMeteredReceiver { /// Creates a new [`UnboundedMeteredReceiver`] wrapping around the provided /// [Receiver](mpsc::UnboundedReceiver) pub fn new(receiver: mpsc::UnboundedReceiver, scope: &'static str) -> Self { Self { receiver, metrics: MeteredReceiverMetrics::new(scope) } } /// Receives the next value for this receiver. pub async fn recv(&mut self) -> Option { let msg = self.receiver.recv().await; if msg.is_some() { self.metrics.messages_received_total.increment(1); } msg } /// Tries to receive the next value for this receiver. pub fn try_recv(&mut self) -> Result { let msg = self.receiver.try_recv()?; self.metrics.messages_received_total.increment(1); Ok(msg) } /// Closes the receiving half of a channel without dropping it. pub fn close(&mut self) { self.receiver.close(); } /// Polls to receive the next message on this channel. pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { let msg = ready!(self.receiver.poll_recv(cx)); if msg.is_some() { self.metrics.messages_received_total.increment(1); } Poll::Ready(msg) } } impl Stream for UnboundedMeteredReceiver { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.poll_recv(cx) } } /// A wrapper type around [Sender](mpsc::Sender) that updates metrics on send. #[derive(Debug)] pub struct MeteredSender { /// The [Sender](mpsc::Sender) that this wraps around sender: mpsc::Sender, /// Holds metrics for this type metrics: MeteredSenderMetrics, } impl MeteredSender { /// Creates a new [`MeteredSender`] wrapping around the provided [Sender](mpsc::Sender) pub fn new(sender: mpsc::Sender, scope: &'static str) -> Self { Self { sender, metrics: MeteredSenderMetrics::new(scope) } } /// Tries to acquire a permit to send a message. /// /// See also [Sender](mpsc::Sender)'s `try_reserve_owned`. pub fn try_reserve_owned(&self) -> Result, TrySendError>> { self.sender.clone().try_reserve_owned() } /// Returns the underlying [Sender](mpsc::Sender). pub const fn inner(&self) -> &mpsc::Sender { &self.sender } /// Calls the underlying [Sender](mpsc::Sender)'s `try_send`, incrementing the appropriate /// metrics depending on the result. pub fn try_send(&self, message: T) -> Result<(), TrySendError> { match self.sender.try_send(message) { Ok(()) => { self.metrics.messages_sent_total.increment(1); Ok(()) } Err(error) => { self.metrics.send_errors_total.increment(1); Err(error) } } } /// Calls the underlying [Sender](mpsc::Sender)'s `send`, incrementing the appropriate /// metrics depending on the result. pub async fn send(&self, value: T) -> Result<(), SendError> { match self.sender.send(value).await { Ok(()) => { self.metrics.messages_sent_total.increment(1); Ok(()) } Err(error) => { self.metrics.send_errors_total.increment(1); Err(error) } } } } impl Clone for MeteredSender { fn clone(&self) -> Self { Self { sender: self.sender.clone(), metrics: self.metrics.clone() } } } /// A wrapper type around [Receiver](mpsc::Receiver) that updates metrics on receive. #[derive(Debug)] pub struct MeteredReceiver { /// The [Sender](mpsc::Sender) that this wraps around receiver: mpsc::Receiver, /// Holds metrics for this type metrics: MeteredReceiverMetrics, } // === impl MeteredReceiver === impl MeteredReceiver { /// Creates a new [`MeteredReceiver`] wrapping around the provided [Receiver](mpsc::Receiver) pub fn new(receiver: mpsc::Receiver, scope: &'static str) -> Self { Self { receiver, metrics: MeteredReceiverMetrics::new(scope) } } /// Receives the next value for this receiver. pub async fn recv(&mut self) -> Option { let msg = self.receiver.recv().await; if msg.is_some() { self.metrics.messages_received_total.increment(1); } msg } /// Tries to receive the next value for this receiver. pub fn try_recv(&mut self) -> Result { let msg = self.receiver.try_recv()?; self.metrics.messages_received_total.increment(1); Ok(msg) } /// Closes the receiving half of a channel without dropping it. pub fn close(&mut self) { self.receiver.close(); } /// Polls to receive the next message on this channel. pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { let msg = ready!(self.receiver.poll_recv(cx)); if msg.is_some() { self.metrics.messages_received_total.increment(1); } Poll::Ready(msg) } } impl Stream for MeteredReceiver { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.poll_recv(cx) } } /// Throughput metrics for [`MeteredSender`] #[derive(Clone, Metrics)] #[metrics(dynamic = true)] struct MeteredSenderMetrics { /// Number of messages sent messages_sent_total: Counter, /// Number of failed message deliveries send_errors_total: Counter, } /// Throughput metrics for [`MeteredReceiver`] #[derive(Clone, Metrics)] #[metrics(dynamic = true)] struct MeteredReceiverMetrics { /// Number of messages received messages_received_total: Counter, } /// A wrapper type around [`PollSender`] that updates metrics on send. #[derive(Debug)] pub struct MeteredPollSender { /// The [`PollSender`] that this wraps around. sender: PollSender, /// Holds metrics for this type. metrics: MeteredPollSenderMetrics, } impl MeteredPollSender { /// Creates a new [`MeteredPollSender`] wrapping around the provided [`PollSender`]. pub fn new(sender: PollSender, scope: &'static str) -> Self { Self { sender, metrics: MeteredPollSenderMetrics::new(scope) } } /// Returns the underlying [`PollSender`]. pub const fn inner(&self) -> &PollSender { &self.sender } /// Calls the underlying [`PollSender`]'s `poll_reserve`, incrementing the appropriate /// metrics depending on the result. pub fn poll_reserve(&mut self, cx: &mut Context<'_>) -> Poll>> { match self.sender.poll_reserve(cx) { Poll::Ready(Ok(permit)) => Poll::Ready(Ok(permit)), Poll::Ready(Err(error)) => Poll::Ready(Err(error)), Poll::Pending => { self.metrics.back_pressure_total.increment(1); Poll::Pending } } } /// Calls the underlying [`PollSender`]'s `send_item`, incrementing the appropriate /// metrics depending on the result. pub fn send_item(&mut self, item: T) -> Result<(), PollSendError> { match self.sender.send_item(item) { Ok(()) => { self.metrics.messages_sent_total.increment(1); Ok(()) } Err(error) => Err(error), } } } impl Clone for MeteredPollSender { fn clone(&self) -> Self { Self { sender: self.sender.clone(), metrics: self.metrics.clone() } } } /// Throughput metrics for [`MeteredPollSender`] #[derive(Clone, Metrics)] #[metrics(dynamic = true)] struct MeteredPollSenderMetrics { /// Number of messages sent messages_sent_total: Counter, /// Number of delayed message deliveries caused by a full channel back_pressure_total: Counter, }