reth_transaction_pool/validate/
task.rs

1//! A validation service for transactions.
2
3use crate::{
4    blobstore::BlobStore,
5    metrics::TxPoolValidatorMetrics,
6    validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
7    EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
8    TransactionValidator,
9};
10use futures_util::{lock::Mutex, StreamExt};
11use reth_primitives_traits::SealedBlock;
12use reth_tasks::TaskSpawner;
13use std::{future::Future, pin::Pin, sync::Arc};
14use tokio::{
15    sync,
16    sync::{mpsc, oneshot},
17};
18use tokio_stream::wrappers::ReceiverStream;
19
20/// Represents a future outputting unit type and is sendable.
21type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
22
23/// Represents a stream of validation futures.
24type ValidationStream = ReceiverStream<ValidationFuture>;
25
26/// A service that performs validation jobs.
27///
28/// This listens for incoming validation jobs and executes them.
29///
30/// This should be spawned as a task: [`ValidationTask::run`]
31#[derive(Clone)]
32pub struct ValidationTask {
33    validation_jobs: Arc<Mutex<ValidationStream>>,
34}
35
36impl ValidationTask {
37    /// Creates a new cloneable task pair.
38    ///
39    /// The sender sends new (transaction) validation tasks to an available validation task.
40    pub fn new() -> (ValidationJobSender, Self) {
41        Self::with_capacity(1)
42    }
43
44    /// Creates a new cloneable task pair with the given channel capacity.
45    pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
46        let (tx, rx) = mpsc::channel(capacity);
47        let metrics = TxPoolValidatorMetrics::default();
48        (ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
49    }
50
51    /// Creates a new task with the given receiver.
52    pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
53        Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
54    }
55
56    /// Executes all new validation jobs that come in.
57    ///
58    /// This will run as long as the channel is alive and is expected to be spawned as a task.
59    pub async fn run(self) {
60        while let Some(task) = self.validation_jobs.lock().await.next().await {
61            task.await;
62        }
63    }
64}
65
66impl std::fmt::Debug for ValidationTask {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
69    }
70}
71
72/// A sender new type for sending validation jobs to [`ValidationTask`].
73#[derive(Debug)]
74pub struct ValidationJobSender {
75    tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
76    metrics: TxPoolValidatorMetrics,
77}
78
79impl ValidationJobSender {
80    /// Sends the given job to the validation task.
81    pub async fn send(
82        &self,
83        job: Pin<Box<dyn Future<Output = ()> + Send>>,
84    ) -> Result<(), TransactionValidatorError> {
85        self.metrics.inflight_validation_jobs.increment(1);
86        let res = self
87            .tx
88            .send(job)
89            .await
90            .map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
91        self.metrics.inflight_validation_jobs.decrement(1);
92        res
93    }
94}
95
96/// A [`TransactionValidator`] implementation that validates ethereum transaction.
97/// This validator is non-blocking, all validation work is done in a separate task.
98#[derive(Debug)]
99pub struct TransactionValidationTaskExecutor<V> {
100    /// The validator that will validate transactions on a separate task.
101    pub validator: Arc<V>,
102    /// The sender half to validation tasks that perform the actual validation.
103    pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
104}
105
106impl<V> Clone for TransactionValidationTaskExecutor<V> {
107    fn clone(&self) -> Self {
108        Self {
109            validator: self.validator.clone(),
110            to_validation_task: self.to_validation_task.clone(),
111        }
112    }
113}
114
115// === impl TransactionValidationTaskExecutor ===
116
117impl TransactionValidationTaskExecutor<()> {
118    /// Convenience method to create a [`EthTransactionValidatorBuilder`]
119    pub fn eth_builder<Client>(client: Client) -> EthTransactionValidatorBuilder<Client> {
120        EthTransactionValidatorBuilder::new(client)
121    }
122}
123
124impl<V> TransactionValidationTaskExecutor<V> {
125    /// Maps the given validator to a new type.
126    pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
127    where
128        F: FnMut(V) -> T,
129    {
130        TransactionValidationTaskExecutor {
131            validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
132            to_validation_task: self.to_validation_task,
133        }
134    }
135
136    /// Returns the validator.
137    pub fn validator(&self) -> &V {
138        &self.validator
139    }
140}
141
142impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
143    /// Creates a new instance for the given client
144    ///
145    /// This will spawn a single validation tasks that performs the actual validation.
146    /// See [`TransactionValidationTaskExecutor::eth_with_additional_tasks`]
147    pub fn eth<T, S: BlobStore>(client: Client, blob_store: S, tasks: T) -> Self
148    where
149        T: TaskSpawner,
150    {
151        Self::eth_with_additional_tasks(client, blob_store, tasks, 0)
152    }
153
154    /// Creates a new instance for the given client
155    ///
156    /// By default this will enable support for:
157    ///   - shanghai
158    ///   - eip1559
159    ///   - eip2930
160    ///
161    /// This will always spawn a validation task that performs the actual validation. It will spawn
162    /// `num_additional_tasks` additional tasks.
163    pub fn eth_with_additional_tasks<T, S: BlobStore>(
164        client: Client,
165        blob_store: S,
166        tasks: T,
167        num_additional_tasks: usize,
168    ) -> Self
169    where
170        T: TaskSpawner,
171    {
172        EthTransactionValidatorBuilder::new(client)
173            .with_additional_tasks(num_additional_tasks)
174            .build_with_tasks(tasks, blob_store)
175    }
176}
177
178impl<V> TransactionValidationTaskExecutor<V> {
179    /// Creates a new executor instance with the given validator for transaction validation.
180    ///
181    /// Initializes the executor with the provided validator and sets up communication for
182    /// validation tasks.
183    pub fn new(validator: V) -> (Self, ValidationTask) {
184        let (tx, task) = ValidationTask::new();
185        (
186            Self {
187                validator: Arc::new(validator),
188                to_validation_task: Arc::new(sync::Mutex::new(tx)),
189            },
190            task,
191        )
192    }
193}
194
195impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
196where
197    V: TransactionValidator + 'static,
198{
199    type Transaction = <V as TransactionValidator>::Transaction;
200    type Block = V::Block;
201
202    async fn validate_transaction(
203        &self,
204        origin: TransactionOrigin,
205        transaction: Self::Transaction,
206    ) -> TransactionValidationOutcome<Self::Transaction> {
207        let hash = *transaction.hash();
208        let (tx, rx) = oneshot::channel();
209        {
210            let res = {
211                let to_validation_task = self.to_validation_task.clone();
212                let validator = self.validator.clone();
213                let fut = Box::pin(async move {
214                    let res = validator.validate_transaction(origin, transaction).await;
215                    let _ = tx.send(res);
216                });
217                let to_validation_task = to_validation_task.lock().await;
218                to_validation_task.send(fut).await
219            };
220            if res.is_err() {
221                return TransactionValidationOutcome::Error(
222                    hash,
223                    Box::new(TransactionValidatorError::ValidationServiceUnreachable),
224                );
225            }
226        }
227
228        match rx.await {
229            Ok(res) => res,
230            Err(_) => TransactionValidationOutcome::Error(
231                hash,
232                Box::new(TransactionValidatorError::ValidationServiceUnreachable),
233            ),
234        }
235    }
236
237    async fn validate_transactions(
238        &self,
239        transactions: Vec<(TransactionOrigin, Self::Transaction)>,
240    ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
241        let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
242        let (tx, rx) = oneshot::channel();
243        {
244            let res = {
245                let to_validation_task = self.to_validation_task.clone();
246                let validator = self.validator.clone();
247                let fut = Box::pin(async move {
248                    let res = validator.validate_transactions(transactions).await;
249                    let _ = tx.send(res);
250                });
251                let to_validation_task = to_validation_task.lock().await;
252                to_validation_task.send(fut).await
253            };
254            if res.is_err() {
255                return hashes
256                    .into_iter()
257                    .map(|hash| {
258                        TransactionValidationOutcome::Error(
259                            hash,
260                            Box::new(TransactionValidatorError::ValidationServiceUnreachable),
261                        )
262                    })
263                    .collect();
264            }
265        }
266        match rx.await {
267            Ok(res) => res,
268            Err(_) => hashes
269                .into_iter()
270                .map(|hash| {
271                    TransactionValidationOutcome::Error(
272                        hash,
273                        Box::new(TransactionValidatorError::ValidationServiceUnreachable),
274                    )
275                })
276                .collect(),
277        }
278    }
279
280    async fn validate_transactions_with_origin(
281        &self,
282        origin: TransactionOrigin,
283        transactions: impl IntoIterator<Item = Self::Transaction> + Send,
284    ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
285        self.validate_transactions(transactions.into_iter().map(|tx| (origin, tx)).collect()).await
286    }
287
288    fn on_new_head_block(&self, new_tip_block: &SealedBlock<Self::Block>) {
289        self.validator.on_new_head_block(new_tip_block)
290    }
291}
292
293#[cfg(test)]
294mod tests {
295    use super::*;
296    use crate::{
297        test_utils::MockTransaction,
298        validate::{TransactionValidationOutcome, ValidTransaction},
299        TransactionOrigin,
300    };
301    use alloy_primitives::{Address, U256};
302
303    #[derive(Debug)]
304    struct NoopValidator;
305
306    impl TransactionValidator for NoopValidator {
307        type Transaction = MockTransaction;
308        type Block = reth_ethereum_primitives::Block;
309
310        async fn validate_transaction(
311            &self,
312            _origin: TransactionOrigin,
313            transaction: Self::Transaction,
314        ) -> TransactionValidationOutcome<Self::Transaction> {
315            TransactionValidationOutcome::Valid {
316                balance: U256::ZERO,
317                state_nonce: 0,
318                bytecode_hash: None,
319                transaction: ValidTransaction::Valid(transaction),
320                propagate: false,
321                authorities: Some(Vec::<Address>::new()),
322            }
323        }
324    }
325
326    #[tokio::test]
327    async fn executor_new_spawns_and_validates_single() {
328        let validator = NoopValidator;
329        let (executor, task) = TransactionValidationTaskExecutor::new(validator);
330        tokio::spawn(task.run());
331        let tx = MockTransaction::legacy();
332        let out = executor.validate_transaction(TransactionOrigin::External, tx).await;
333        assert!(matches!(out, TransactionValidationOutcome::Valid { .. }));
334    }
335
336    #[tokio::test]
337    async fn executor_new_spawns_and_validates_batch() {
338        let validator = NoopValidator;
339        let (executor, task) = TransactionValidationTaskExecutor::new(validator);
340        tokio::spawn(task.run());
341        let txs = vec![
342            (TransactionOrigin::External, MockTransaction::legacy()),
343            (TransactionOrigin::Local, MockTransaction::legacy()),
344        ];
345        let out = executor.validate_transactions(txs).await;
346        assert_eq!(out.len(), 2);
347        assert!(out.iter().all(|o| matches!(o, TransactionValidationOutcome::Valid { .. })));
348    }
349}