reth_transaction_pool/validate/
task.rs

1//! A validation service for transactions.
2
3use crate::{
4    blobstore::BlobStore,
5    metrics::TxPoolValidatorMetrics,
6    validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
7    EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
8    TransactionValidator,
9};
10use futures_util::{lock::Mutex, StreamExt};
11use reth_primitives_traits::{Block, SealedBlock};
12use reth_tasks::TaskSpawner;
13use std::{future::Future, pin::Pin, sync::Arc};
14use tokio::{
15    sync,
16    sync::{mpsc, oneshot},
17};
18use tokio_stream::wrappers::ReceiverStream;
19
20/// Represents a future outputting unit type and is sendable.
21type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
22
23/// Represents a stream of validation futures.
24type ValidationStream = ReceiverStream<ValidationFuture>;
25
26/// A service that performs validation jobs.
27///
28/// This listens for incoming validation jobs and executes them.
29///
30/// This should be spawned as a task: [`ValidationTask::run`]
31#[derive(Clone)]
32pub struct ValidationTask {
33    validation_jobs: Arc<Mutex<ValidationStream>>,
34}
35
36impl ValidationTask {
37    /// Creates a new cloneable task pair.
38    ///
39    /// The sender sends new (transaction) validation tasks to an available validation task.
40    pub fn new() -> (ValidationJobSender, Self) {
41        Self::with_capacity(1)
42    }
43
44    /// Creates a new cloneable task pair with the given channel capacity.
45    pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
46        let (tx, rx) = mpsc::channel(capacity);
47        let metrics = TxPoolValidatorMetrics::default();
48        (ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
49    }
50
51    /// Creates a new task with the given receiver.
52    pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
53        Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
54    }
55
56    /// Executes all new validation jobs that come in.
57    ///
58    /// This will run as long as the channel is alive and is expected to be spawned as a task.
59    pub async fn run(self) {
60        while let Some(task) = self.validation_jobs.lock().await.next().await {
61            task.await;
62        }
63    }
64}
65
66impl std::fmt::Debug for ValidationTask {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
69    }
70}
71
72/// A sender new type for sending validation jobs to [`ValidationTask`].
73#[derive(Debug)]
74pub struct ValidationJobSender {
75    tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
76    metrics: TxPoolValidatorMetrics,
77}
78
79impl ValidationJobSender {
80    /// Sends the given job to the validation task.
81    pub async fn send(
82        &self,
83        job: Pin<Box<dyn Future<Output = ()> + Send>>,
84    ) -> Result<(), TransactionValidatorError> {
85        self.metrics.inflight_validation_jobs.increment(1);
86        let res = self
87            .tx
88            .send(job)
89            .await
90            .map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
91        self.metrics.inflight_validation_jobs.decrement(1);
92        res
93    }
94}
95
96/// A [`TransactionValidator`] implementation that validates ethereum transaction.
97/// This validator is non-blocking, all validation work is done in a separate task.
98#[derive(Debug)]
99pub struct TransactionValidationTaskExecutor<V> {
100    /// The validator that will validate transactions on a separate task.
101    pub validator: Arc<V>,
102    /// The sender half to validation tasks that perform the actual validation.
103    pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
104}
105
106impl<V> Clone for TransactionValidationTaskExecutor<V> {
107    fn clone(&self) -> Self {
108        Self {
109            validator: self.validator.clone(),
110            to_validation_task: self.to_validation_task.clone(),
111        }
112    }
113}
114
115// === impl TransactionValidationTaskExecutor ===
116
117impl TransactionValidationTaskExecutor<()> {
118    /// Convenience method to create a [`EthTransactionValidatorBuilder`]
119    pub fn eth_builder<Client>(client: Client) -> EthTransactionValidatorBuilder<Client> {
120        EthTransactionValidatorBuilder::new(client)
121    }
122}
123
124impl<V> TransactionValidationTaskExecutor<V> {
125    /// Maps the given validator to a new type.
126    pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
127    where
128        F: FnMut(V) -> T,
129    {
130        TransactionValidationTaskExecutor {
131            validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
132            to_validation_task: self.to_validation_task,
133        }
134    }
135
136    /// Returns the validator.
137    pub fn validator(&self) -> &V {
138        &self.validator
139    }
140}
141
142impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
143    /// Creates a new instance for the given client
144    ///
145    /// This will spawn a single validation tasks that performs the actual validation.
146    /// See [`TransactionValidationTaskExecutor::eth_with_additional_tasks`]
147    pub fn eth<T, S: BlobStore>(client: Client, blob_store: S, tasks: T) -> Self
148    where
149        T: TaskSpawner,
150    {
151        Self::eth_with_additional_tasks(client, blob_store, tasks, 0)
152    }
153
154    /// Creates a new instance for the given client
155    ///
156    /// By default this will enable support for:
157    ///   - shanghai
158    ///   - eip1559
159    ///   - eip2930
160    ///
161    /// This will always spawn a validation task that performs the actual validation. It will spawn
162    /// `num_additional_tasks` additional tasks.
163    pub fn eth_with_additional_tasks<T, S: BlobStore>(
164        client: Client,
165        blob_store: S,
166        tasks: T,
167        num_additional_tasks: usize,
168    ) -> Self
169    where
170        T: TaskSpawner,
171    {
172        EthTransactionValidatorBuilder::new(client)
173            .with_additional_tasks(num_additional_tasks)
174            .build_with_tasks::<Tx, T, S>(tasks, blob_store)
175    }
176}
177
178impl<V> TransactionValidationTaskExecutor<V> {
179    /// Creates a new executor instance with the given validator for transaction validation.
180    ///
181    /// Initializes the executor with the provided validator and sets up communication for
182    /// validation tasks.
183    pub fn new(validator: V) -> (Self, ValidationTask) {
184        let (tx, task) = ValidationTask::new();
185        (
186            Self {
187                validator: Arc::new(validator),
188                to_validation_task: Arc::new(sync::Mutex::new(tx)),
189            },
190            task,
191        )
192    }
193}
194
195impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
196where
197    V: TransactionValidator + 'static,
198{
199    type Transaction = <V as TransactionValidator>::Transaction;
200
201    async fn validate_transaction(
202        &self,
203        origin: TransactionOrigin,
204        transaction: Self::Transaction,
205    ) -> TransactionValidationOutcome<Self::Transaction> {
206        let hash = *transaction.hash();
207        let (tx, rx) = oneshot::channel();
208        {
209            let res = {
210                let to_validation_task = self.to_validation_task.clone();
211                let validator = self.validator.clone();
212                let fut = Box::pin(async move {
213                    let res = validator.validate_transaction(origin, transaction).await;
214                    let _ = tx.send(res);
215                });
216                let to_validation_task = to_validation_task.lock().await;
217                to_validation_task.send(fut).await
218            };
219            if res.is_err() {
220                return TransactionValidationOutcome::Error(
221                    hash,
222                    Box::new(TransactionValidatorError::ValidationServiceUnreachable),
223                );
224            }
225        }
226
227        match rx.await {
228            Ok(res) => res,
229            Err(_) => TransactionValidationOutcome::Error(
230                hash,
231                Box::new(TransactionValidatorError::ValidationServiceUnreachable),
232            ),
233        }
234    }
235
236    async fn validate_transactions(
237        &self,
238        transactions: Vec<(TransactionOrigin, Self::Transaction)>,
239    ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
240        let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
241        let (tx, rx) = oneshot::channel();
242        {
243            let res = {
244                let to_validation_task = self.to_validation_task.clone();
245                let validator = self.validator.clone();
246                let fut = Box::pin(async move {
247                    let res = validator.validate_transactions(transactions).await;
248                    let _ = tx.send(res);
249                });
250                let to_validation_task = to_validation_task.lock().await;
251                to_validation_task.send(fut).await
252            };
253            if res.is_err() {
254                return hashes
255                    .into_iter()
256                    .map(|hash| {
257                        TransactionValidationOutcome::Error(
258                            hash,
259                            Box::new(TransactionValidatorError::ValidationServiceUnreachable),
260                        )
261                    })
262                    .collect();
263            }
264        }
265        match rx.await {
266            Ok(res) => res,
267            Err(_) => hashes
268                .into_iter()
269                .map(|hash| {
270                    TransactionValidationOutcome::Error(
271                        hash,
272                        Box::new(TransactionValidatorError::ValidationServiceUnreachable),
273                    )
274                })
275                .collect(),
276        }
277    }
278
279    async fn validate_transactions_with_origin(
280        &self,
281        origin: TransactionOrigin,
282        transactions: impl IntoIterator<Item = Self::Transaction> + Send,
283    ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
284        self.validate_transactions(transactions.into_iter().map(|tx| (origin, tx)).collect()).await
285    }
286
287    fn on_new_head_block<B>(&self, new_tip_block: &SealedBlock<B>)
288    where
289        B: Block,
290    {
291        self.validator.on_new_head_block(new_tip_block)
292    }
293}
294
295#[cfg(test)]
296mod tests {
297    use super::*;
298    use crate::{
299        test_utils::MockTransaction,
300        validate::{TransactionValidationOutcome, ValidTransaction},
301        TransactionOrigin,
302    };
303    use alloy_primitives::{Address, U256};
304
305    #[derive(Debug)]
306    struct NoopValidator;
307
308    impl TransactionValidator for NoopValidator {
309        type Transaction = MockTransaction;
310
311        async fn validate_transaction(
312            &self,
313            _origin: TransactionOrigin,
314            transaction: Self::Transaction,
315        ) -> TransactionValidationOutcome<Self::Transaction> {
316            TransactionValidationOutcome::Valid {
317                balance: U256::ZERO,
318                state_nonce: 0,
319                bytecode_hash: None,
320                transaction: ValidTransaction::Valid(transaction),
321                propagate: false,
322                authorities: Some(Vec::<Address>::new()),
323            }
324        }
325    }
326
327    #[tokio::test]
328    async fn executor_new_spawns_and_validates_single() {
329        let validator = NoopValidator;
330        let (executor, task) = TransactionValidationTaskExecutor::new(validator);
331        tokio::spawn(task.run());
332        let tx = MockTransaction::legacy();
333        let out = executor.validate_transaction(TransactionOrigin::External, tx).await;
334        assert!(matches!(out, TransactionValidationOutcome::Valid { .. }));
335    }
336
337    #[tokio::test]
338    async fn executor_new_spawns_and_validates_batch() {
339        let validator = NoopValidator;
340        let (executor, task) = TransactionValidationTaskExecutor::new(validator);
341        tokio::spawn(task.run());
342        let txs = vec![
343            (TransactionOrigin::External, MockTransaction::legacy()),
344            (TransactionOrigin::Local, MockTransaction::legacy()),
345        ];
346        let out = executor.validate_transactions(txs).await;
347        assert_eq!(out.len(), 2);
348        assert!(out.iter().all(|o| matches!(o, TransactionValidationOutcome::Valid { .. })));
349    }
350}