reth_transaction_pool/validate/
task.rs

1//! A validation service for transactions.
2
3use crate::{
4    blobstore::BlobStore,
5    metrics::TxPoolValidatorMetrics,
6    validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
7    EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
8    TransactionValidator,
9};
10use futures_util::{lock::Mutex, StreamExt};
11use reth_primitives_traits::{Block, SealedBlock};
12use reth_tasks::TaskSpawner;
13use std::{future::Future, pin::Pin, sync::Arc};
14use tokio::{
15    sync,
16    sync::{mpsc, oneshot},
17};
18use tokio_stream::wrappers::ReceiverStream;
19
20/// Represents a future outputting unit type and is sendable.
21type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
22
23/// Represents a stream of validation futures.
24type ValidationStream = ReceiverStream<ValidationFuture>;
25
26/// A service that performs validation jobs.
27///
28/// This listens for incoming validation jobs and executes them.
29///
30/// This should be spawned as a task: [`ValidationTask::run`]
31#[derive(Clone)]
32pub struct ValidationTask {
33    validation_jobs: Arc<Mutex<ValidationStream>>,
34}
35
36impl ValidationTask {
37    /// Creates a new cloneable task pair.
38    ///
39    /// The sender sends new (transaction) validation tasks to an available validation task.
40    pub fn new() -> (ValidationJobSender, Self) {
41        Self::with_capacity(1)
42    }
43
44    /// Creates a new cloneable task pair with the given channel capacity.
45    pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
46        let (tx, rx) = mpsc::channel(capacity);
47        let metrics = TxPoolValidatorMetrics::default();
48        (ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
49    }
50
51    /// Creates a new task with the given receiver.
52    pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
53        Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
54    }
55
56    /// Executes all new validation jobs that come in.
57    ///
58    /// This will run as long as the channel is alive and is expected to be spawned as a task.
59    pub async fn run(self) {
60        while let Some(task) = self.validation_jobs.lock().await.next().await {
61            task.await;
62        }
63    }
64}
65
66impl std::fmt::Debug for ValidationTask {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
69    }
70}
71
72/// A sender new type for sending validation jobs to [`ValidationTask`].
73#[derive(Debug)]
74pub struct ValidationJobSender {
75    tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
76    metrics: TxPoolValidatorMetrics,
77}
78
79impl ValidationJobSender {
80    /// Sends the given job to the validation task.
81    pub async fn send(
82        &self,
83        job: Pin<Box<dyn Future<Output = ()> + Send>>,
84    ) -> Result<(), TransactionValidatorError> {
85        self.metrics.inflight_validation_jobs.increment(1);
86        let res = self
87            .tx
88            .send(job)
89            .await
90            .map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
91        self.metrics.inflight_validation_jobs.decrement(1);
92        res
93    }
94}
95
96/// A [`TransactionValidator`] implementation that validates ethereum transaction.
97/// This validator is non-blocking, all validation work is done in a separate task.
98#[derive(Debug)]
99pub struct TransactionValidationTaskExecutor<V> {
100    /// The validator that will validate transactions on a separate task.
101    pub validator: Arc<V>,
102    /// The sender half to validation tasks that perform the actual validation.
103    pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
104}
105
106impl<V> Clone for TransactionValidationTaskExecutor<V> {
107    fn clone(&self) -> Self {
108        Self {
109            validator: self.validator.clone(),
110            to_validation_task: self.to_validation_task.clone(),
111        }
112    }
113}
114
115// === impl TransactionValidationTaskExecutor ===
116
117impl TransactionValidationTaskExecutor<()> {
118    /// Convenience method to create a [`EthTransactionValidatorBuilder`]
119    pub fn eth_builder<Client>(client: Client) -> EthTransactionValidatorBuilder<Client> {
120        EthTransactionValidatorBuilder::new(client)
121    }
122}
123
124impl<V> TransactionValidationTaskExecutor<V> {
125    /// Maps the given validator to a new type.
126    pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
127    where
128        F: FnMut(V) -> T,
129    {
130        TransactionValidationTaskExecutor {
131            validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
132            to_validation_task: self.to_validation_task,
133        }
134    }
135
136    /// Returns the validator.
137    pub fn validator(&self) -> &V {
138        &self.validator
139    }
140}
141
142impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
143    /// Creates a new instance for the given client
144    ///
145    /// This will spawn a single validation tasks that performs the actual validation.
146    /// See [`TransactionValidationTaskExecutor::eth_with_additional_tasks`]
147    pub fn eth<T, S: BlobStore>(client: Client, blob_store: S, tasks: T) -> Self
148    where
149        T: TaskSpawner,
150    {
151        Self::eth_with_additional_tasks(client, blob_store, tasks, 0)
152    }
153
154    /// Creates a new instance for the given client
155    ///
156    /// By default this will enable support for:
157    ///   - shanghai
158    ///   - eip1559
159    ///   - eip2930
160    ///
161    /// This will always spawn a validation task that performs the actual validation. It will spawn
162    /// `num_additional_tasks` additional tasks.
163    pub fn eth_with_additional_tasks<T, S: BlobStore>(
164        client: Client,
165        blob_store: S,
166        tasks: T,
167        num_additional_tasks: usize,
168    ) -> Self
169    where
170        T: TaskSpawner,
171    {
172        EthTransactionValidatorBuilder::new(client)
173            .with_additional_tasks(num_additional_tasks)
174            .build_with_tasks::<Tx, T, S>(tasks, blob_store)
175    }
176}
177
178impl<V> TransactionValidationTaskExecutor<V> {
179    /// Creates a new executor instance with the given validator for transaction validation.
180    ///
181    /// Initializes the executor with the provided validator and sets up communication for
182    /// validation tasks.
183    pub fn new(validator: V) -> Self {
184        let (tx, _) = ValidationTask::new();
185        Self { validator: Arc::new(validator), to_validation_task: Arc::new(sync::Mutex::new(tx)) }
186    }
187}
188
189impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
190where
191    V: TransactionValidator + 'static,
192{
193    type Transaction = <V as TransactionValidator>::Transaction;
194
195    async fn validate_transaction(
196        &self,
197        origin: TransactionOrigin,
198        transaction: Self::Transaction,
199    ) -> TransactionValidationOutcome<Self::Transaction> {
200        let hash = *transaction.hash();
201        let (tx, rx) = oneshot::channel();
202        {
203            let res = {
204                let to_validation_task = self.to_validation_task.clone();
205                let validator = self.validator.clone();
206                let fut = Box::pin(async move {
207                    let res = validator.validate_transaction(origin, transaction).await;
208                    let _ = tx.send(res);
209                });
210                let to_validation_task = to_validation_task.lock().await;
211                to_validation_task.send(fut).await
212            };
213            if res.is_err() {
214                return TransactionValidationOutcome::Error(
215                    hash,
216                    Box::new(TransactionValidatorError::ValidationServiceUnreachable),
217                );
218            }
219        }
220
221        match rx.await {
222            Ok(res) => res,
223            Err(_) => TransactionValidationOutcome::Error(
224                hash,
225                Box::new(TransactionValidatorError::ValidationServiceUnreachable),
226            ),
227        }
228    }
229
230    async fn validate_transactions(
231        &self,
232        transactions: Vec<(TransactionOrigin, Self::Transaction)>,
233    ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
234        let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
235        let (tx, rx) = oneshot::channel();
236        {
237            let res = {
238                let to_validation_task = self.to_validation_task.clone();
239                let validator = self.validator.clone();
240                let fut = Box::pin(async move {
241                    let res = validator.validate_transactions(transactions).await;
242                    let _ = tx.send(res);
243                });
244                let to_validation_task = to_validation_task.lock().await;
245                to_validation_task.send(fut).await
246            };
247            if res.is_err() {
248                return hashes
249                    .into_iter()
250                    .map(|hash| {
251                        TransactionValidationOutcome::Error(
252                            hash,
253                            Box::new(TransactionValidatorError::ValidationServiceUnreachable),
254                        )
255                    })
256                    .collect();
257            }
258        }
259        match rx.await {
260            Ok(res) => res,
261            Err(_) => hashes
262                .into_iter()
263                .map(|hash| {
264                    TransactionValidationOutcome::Error(
265                        hash,
266                        Box::new(TransactionValidatorError::ValidationServiceUnreachable),
267                    )
268                })
269                .collect(),
270        }
271    }
272
273    async fn validate_transactions_with_origin(
274        &self,
275        origin: TransactionOrigin,
276        transactions: impl IntoIterator<Item = Self::Transaction> + Send,
277    ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
278        self.validate_transactions(transactions.into_iter().map(|tx| (origin, tx)).collect()).await
279    }
280
281    fn on_new_head_block<B>(&self, new_tip_block: &SealedBlock<B>)
282    where
283        B: Block,
284    {
285        self.validator.on_new_head_block(new_tip_block)
286    }
287}