Skip to main content

reth_transaction_pool/validate/
task.rs

//! A validation service for transactions.
2
3use crate::{
4    blobstore::BlobStore,
5    metrics::TxPoolValidatorMetrics,
6    validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
7    EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
8    TransactionValidator,
9};
10use futures_util::{lock::Mutex, StreamExt};
11use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
12use reth_evm::ConfigureEvm;
13use reth_primitives_traits::{HeaderTy, SealedBlock};
14use reth_storage_api::BlockReaderIdExt;
15use reth_tasks::Runtime;
16use std::{future::Future, pin::Pin, sync::Arc};
17use tokio::{
18    sync,
19    sync::{mpsc, oneshot},
20};
21use tokio_stream::wrappers::ReceiverStream;
22
/// A boxed, pinned, sendable future that resolves to `()`; the unit of work of a validation job.
type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
25
26/// Represents a stream of validation futures.
27type ValidationStream = ReceiverStream<ValidationFuture>;
28
29/// A service that performs validation jobs.
30///
31/// This listens for incoming validation jobs and executes them.
32///
33/// This should be spawned as a task: [`ValidationTask::run`]
34#[derive(Clone)]
35pub struct ValidationTask {
36    validation_jobs: Arc<Mutex<ValidationStream>>,
37}
38
39impl ValidationTask {
40    /// Creates a new cloneable task pair.
41    ///
42    /// The sender sends new (transaction) validation tasks to an available validation task.
43    pub fn new() -> (ValidationJobSender, Self) {
44        Self::with_capacity(1)
45    }
46
47    /// Creates a new cloneable task pair with the given channel capacity.
48    pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
49        let (tx, rx) = mpsc::channel(capacity);
50        let metrics = TxPoolValidatorMetrics::default();
51        (ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
52    }
53
54    /// Creates a new task with the given receiver.
55    pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
56        Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
57    }
58
59    /// Executes all new validation jobs that come in.
60    ///
61    /// This will run as long as the channel is alive and is expected to be spawned as a task.
62    pub async fn run(self) {
63        while let Some(task) = self.validation_jobs.lock().await.next().await {
64            task.await;
65        }
66    }
67}
68
69impl std::fmt::Debug for ValidationTask {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
72    }
73}
74
75/// A sender new type for sending validation jobs to [`ValidationTask`].
76#[derive(Debug)]
77pub struct ValidationJobSender {
78    tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
79    metrics: TxPoolValidatorMetrics,
80}
81
82impl ValidationJobSender {
83    /// Sends the given job to the validation task.
84    pub async fn send(
85        &self,
86        job: Pin<Box<dyn Future<Output = ()> + Send>>,
87    ) -> Result<(), TransactionValidatorError> {
88        self.metrics.inflight_validation_jobs.increment(1);
89        let res = self
90            .tx
91            .send(job)
92            .await
93            .map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
94        self.metrics.inflight_validation_jobs.decrement(1);
95        res
96    }
97}
98
99/// A [`TransactionValidator`] implementation that validates ethereum transaction.
100/// This validator is non-blocking, all validation work is done in a separate task.
101#[derive(Debug)]
102pub struct TransactionValidationTaskExecutor<V> {
103    /// The validator that will validate transactions on a separate task.
104    pub validator: Arc<V>,
105    /// The sender half to validation tasks that perform the actual validation.
106    pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
107}
108
109impl<V> Clone for TransactionValidationTaskExecutor<V> {
110    fn clone(&self) -> Self {
111        Self {
112            validator: self.validator.clone(),
113            to_validation_task: self.to_validation_task.clone(),
114        }
115    }
116}
117
// === impl TransactionValidationTaskExecutor ===
119
120impl TransactionValidationTaskExecutor<()> {
121    /// Convenience method to create a [`EthTransactionValidatorBuilder`]
122    pub fn eth_builder<Client, Evm>(
123        client: Client,
124        evm_config: Evm,
125    ) -> EthTransactionValidatorBuilder<Client, Evm>
126    where
127        Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
128            + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
129        Evm: ConfigureEvm,
130    {
131        EthTransactionValidatorBuilder::new(client, evm_config)
132    }
133}
134
135impl<V> TransactionValidationTaskExecutor<V> {
136    /// Maps the given validator to a new type.
137    pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
138    where
139        F: FnMut(V) -> T,
140    {
141        TransactionValidationTaskExecutor {
142            validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
143            to_validation_task: self.to_validation_task,
144        }
145    }
146
147    /// Returns the validator.
148    pub fn validator(&self) -> &V {
149        &self.validator
150    }
151}
152
153impl<Client, Tx, Evm> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx, Evm>> {
154    /// Creates a new instance for the given client
155    ///
156    /// This will spawn a single validation tasks that performs the actual validation.
157    /// See [`TransactionValidationTaskExecutor::eth_with_additional_tasks`]
158    pub fn eth<S: BlobStore>(client: Client, evm_config: Evm, blob_store: S, tasks: Runtime) -> Self
159    where
160        Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
161            + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
162        Evm: ConfigureEvm,
163    {
164        Self::eth_with_additional_tasks(client, evm_config, blob_store, tasks, 0)
165    }
166
167    /// Creates a new instance for the given client
168    ///
169    /// By default this will enable support for:
170    ///   - shanghai
171    ///   - eip1559
172    ///   - eip2930
173    ///
174    /// This will always spawn a validation task that performs the actual validation. It will spawn
175    /// `num_additional_tasks` additional tasks.
176    pub fn eth_with_additional_tasks<S: BlobStore>(
177        client: Client,
178        evm_config: Evm,
179        blob_store: S,
180        tasks: Runtime,
181        num_additional_tasks: usize,
182    ) -> Self
183    where
184        Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
185            + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
186        Evm: ConfigureEvm,
187    {
188        EthTransactionValidatorBuilder::new(client, evm_config)
189            .with_additional_tasks(num_additional_tasks)
190            .build_with_tasks(tasks, blob_store)
191    }
192}
193
194impl<V> TransactionValidationTaskExecutor<V> {
195    /// Creates a new executor instance with the given validator for transaction validation.
196    ///
197    /// Initializes the executor with the provided validator and sets up communication for
198    /// validation tasks.
199    pub fn new(validator: V) -> (Self, ValidationTask) {
200        let (tx, task) = ValidationTask::new();
201        (
202            Self {
203                validator: Arc::new(validator),
204                to_validation_task: Arc::new(sync::Mutex::new(tx)),
205            },
206            task,
207        )
208    }
209}
210
211impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
212where
213    V: TransactionValidator + 'static,
214{
215    type Transaction = <V as TransactionValidator>::Transaction;
216    type Block = V::Block;
217
218    async fn validate_transaction(
219        &self,
220        origin: TransactionOrigin,
221        transaction: Self::Transaction,
222    ) -> TransactionValidationOutcome<Self::Transaction> {
223        let hash = *transaction.hash();
224        let (tx, rx) = oneshot::channel();
225        {
226            let res = {
227                let to_validation_task = self.to_validation_task.clone();
228                let validator = self.validator.clone();
229                let fut = Box::pin(async move {
230                    let res = validator.validate_transaction(origin, transaction).await;
231                    let _ = tx.send(res);
232                });
233                let to_validation_task = to_validation_task.lock().await;
234                to_validation_task.send(fut).await
235            };
236            if res.is_err() {
237                return TransactionValidationOutcome::Error(
238                    hash,
239                    Box::new(TransactionValidatorError::ValidationServiceUnreachable),
240                );
241            }
242        }
243
244        match rx.await {
245            Ok(res) => res,
246            Err(_) => TransactionValidationOutcome::Error(
247                hash,
248                Box::new(TransactionValidatorError::ValidationServiceUnreachable),
249            ),
250        }
251    }
252
253    async fn validate_transactions(
254        &self,
255        transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction), IntoIter: Send>
256            + Send,
257    ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
258        let transactions: Vec<_> = transactions.into_iter().collect();
259        let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
260        let (tx, rx) = oneshot::channel();
261        {
262            let res = {
263                let to_validation_task = self.to_validation_task.clone();
264                let validator = self.validator.clone();
265                let fut = Box::pin(async move {
266                    let res = validator.validate_transactions(transactions).await;
267                    let _ = tx.send(res);
268                });
269                let to_validation_task = to_validation_task.lock().await;
270                to_validation_task.send(fut).await
271            };
272            if res.is_err() {
273                return hashes
274                    .into_iter()
275                    .map(|hash| {
276                        TransactionValidationOutcome::Error(
277                            hash,
278                            Box::new(TransactionValidatorError::ValidationServiceUnreachable),
279                        )
280                    })
281                    .collect();
282            }
283        }
284        match rx.await {
285            Ok(res) => res,
286            Err(_) => hashes
287                .into_iter()
288                .map(|hash| {
289                    TransactionValidationOutcome::Error(
290                        hash,
291                        Box::new(TransactionValidatorError::ValidationServiceUnreachable),
292                    )
293                })
294                .collect(),
295        }
296    }
297
298    fn on_new_head_block(&self, new_tip_block: &SealedBlock<Self::Block>) {
299        self.validator.on_new_head_block(new_tip_block)
300    }
301}
302
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        test_utils::MockTransaction,
        validate::{TransactionValidationOutcome, ValidTransaction},
        TransactionOrigin,
    };
    use alloy_primitives::{Address, U256};

    /// Validator stub that reports every transaction as valid.
    #[derive(Debug)]
    struct NoopValidator;

    impl TransactionValidator for NoopValidator {
        type Transaction = MockTransaction;
        type Block = reth_ethereum_primitives::Block;

        async fn validate_transaction(
            &self,
            _origin: TransactionOrigin,
            transaction: Self::Transaction,
        ) -> TransactionValidationOutcome<Self::Transaction> {
            TransactionValidationOutcome::Valid {
                balance: U256::ZERO,
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::Valid(transaction),
                propagate: false,
                authorities: Some(Vec::<Address>::new()),
            }
        }
    }

    /// Builds an executor around [`NoopValidator`] and spawns its validation task on the
    /// current tokio runtime.
    fn spawn_noop_executor() -> TransactionValidationTaskExecutor<NoopValidator> {
        let (executor, task) = TransactionValidationTaskExecutor::new(NoopValidator);
        tokio::spawn(task.run());
        executor
    }

    #[tokio::test]
    async fn executor_new_spawns_and_validates_single() {
        let executor = spawn_noop_executor();
        let outcome = executor
            .validate_transaction(TransactionOrigin::External, MockTransaction::legacy())
            .await;
        assert!(matches!(outcome, TransactionValidationOutcome::Valid { .. }));
    }

    #[tokio::test]
    async fn executor_new_spawns_and_validates_batch() {
        let executor = spawn_noop_executor();
        let batch = vec![
            (TransactionOrigin::External, MockTransaction::legacy()),
            (TransactionOrigin::Local, MockTransaction::legacy()),
        ];
        let outcomes = executor.validate_transactions(batch).await;
        assert_eq!(outcomes.len(), 2);
        let all_valid = outcomes
            .iter()
            .all(|outcome| matches!(outcome, TransactionValidationOutcome::Valid { .. }));
        assert!(all_valid);
    }
}