Skip to main content

reth_transaction_pool/validate/
task.rs

1//! A validation service for transactions.
2
3use crate::{
4    blobstore::BlobStore,
5    metrics::TxPoolValidatorMetrics,
6    validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
7    EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
8    TransactionValidator,
9};
10use futures_util::{lock::Mutex, StreamExt};
11use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
12use reth_evm::ConfigureEvm;
13use reth_primitives_traits::{HeaderTy, SealedBlock};
14use reth_storage_api::BlockReaderIdExt;
15use reth_tasks::TaskSpawner;
16use std::{future::Future, pin::Pin, sync::Arc};
17use tokio::{
18    sync,
19    sync::{mpsc, oneshot},
20};
21use tokio_stream::wrappers::ReceiverStream;
22
23/// Represents a future outputting unit type and is sendable.
24type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
25
26/// Represents a stream of validation futures.
27type ValidationStream = ReceiverStream<ValidationFuture>;
28
29/// A service that performs validation jobs.
30///
31/// This listens for incoming validation jobs and executes them.
32///
33/// This should be spawned as a task: [`ValidationTask::run`]
34#[derive(Clone)]
35pub struct ValidationTask {
36    validation_jobs: Arc<Mutex<ValidationStream>>,
37}
38
39impl ValidationTask {
40    /// Creates a new cloneable task pair.
41    ///
42    /// The sender sends new (transaction) validation tasks to an available validation task.
43    pub fn new() -> (ValidationJobSender, Self) {
44        Self::with_capacity(1)
45    }
46
47    /// Creates a new cloneable task pair with the given channel capacity.
48    pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
49        let (tx, rx) = mpsc::channel(capacity);
50        let metrics = TxPoolValidatorMetrics::default();
51        (ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
52    }
53
54    /// Creates a new task with the given receiver.
55    pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
56        Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
57    }
58
59    /// Executes all new validation jobs that come in.
60    ///
61    /// This will run as long as the channel is alive and is expected to be spawned as a task.
62    pub async fn run(self) {
63        while let Some(task) = self.validation_jobs.lock().await.next().await {
64            task.await;
65        }
66    }
67}
68
69impl std::fmt::Debug for ValidationTask {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
72    }
73}
74
75/// A sender new type for sending validation jobs to [`ValidationTask`].
76#[derive(Debug)]
77pub struct ValidationJobSender {
78    tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
79    metrics: TxPoolValidatorMetrics,
80}
81
82impl ValidationJobSender {
83    /// Sends the given job to the validation task.
84    pub async fn send(
85        &self,
86        job: Pin<Box<dyn Future<Output = ()> + Send>>,
87    ) -> Result<(), TransactionValidatorError> {
88        self.metrics.inflight_validation_jobs.increment(1);
89        let res = self
90            .tx
91            .send(job)
92            .await
93            .map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
94        self.metrics.inflight_validation_jobs.decrement(1);
95        res
96    }
97}
98
99/// A [`TransactionValidator`] implementation that validates ethereum transaction.
100/// This validator is non-blocking, all validation work is done in a separate task.
101#[derive(Debug)]
102pub struct TransactionValidationTaskExecutor<V> {
103    /// The validator that will validate transactions on a separate task.
104    pub validator: Arc<V>,
105    /// The sender half to validation tasks that perform the actual validation.
106    pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
107}
108
109impl<V> Clone for TransactionValidationTaskExecutor<V> {
110    fn clone(&self) -> Self {
111        Self {
112            validator: self.validator.clone(),
113            to_validation_task: self.to_validation_task.clone(),
114        }
115    }
116}
117
118// === impl TransactionValidationTaskExecutor ===
119
120impl TransactionValidationTaskExecutor<()> {
121    /// Convenience method to create a [`EthTransactionValidatorBuilder`]
122    pub fn eth_builder<Client, Evm>(
123        client: Client,
124        evm_config: Evm,
125    ) -> EthTransactionValidatorBuilder<Client, Evm>
126    where
127        Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
128            + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
129        Evm: ConfigureEvm,
130    {
131        EthTransactionValidatorBuilder::new(client, evm_config)
132    }
133}
134
135impl<V> TransactionValidationTaskExecutor<V> {
136    /// Maps the given validator to a new type.
137    pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
138    where
139        F: FnMut(V) -> T,
140    {
141        TransactionValidationTaskExecutor {
142            validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
143            to_validation_task: self.to_validation_task,
144        }
145    }
146
147    /// Returns the validator.
148    pub fn validator(&self) -> &V {
149        &self.validator
150    }
151}
152
153impl<Client, Tx, Evm> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx, Evm>> {
154    /// Creates a new instance for the given client
155    ///
156    /// This will spawn a single validation tasks that performs the actual validation.
157    /// See [`TransactionValidationTaskExecutor::eth_with_additional_tasks`]
158    pub fn eth<T, S: BlobStore>(client: Client, evm_config: Evm, blob_store: S, tasks: T) -> Self
159    where
160        T: TaskSpawner,
161        Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
162            + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
163        Evm: ConfigureEvm,
164    {
165        Self::eth_with_additional_tasks(client, evm_config, blob_store, tasks, 0)
166    }
167
168    /// Creates a new instance for the given client
169    ///
170    /// By default this will enable support for:
171    ///   - shanghai
172    ///   - eip1559
173    ///   - eip2930
174    ///
175    /// This will always spawn a validation task that performs the actual validation. It will spawn
176    /// `num_additional_tasks` additional tasks.
177    pub fn eth_with_additional_tasks<T, S: BlobStore>(
178        client: Client,
179        evm_config: Evm,
180        blob_store: S,
181        tasks: T,
182        num_additional_tasks: usize,
183    ) -> Self
184    where
185        T: TaskSpawner,
186        Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
187            + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
188        Evm: ConfigureEvm,
189    {
190        EthTransactionValidatorBuilder::new(client, evm_config)
191            .with_additional_tasks(num_additional_tasks)
192            .build_with_tasks(tasks, blob_store)
193    }
194}
195
196impl<V> TransactionValidationTaskExecutor<V> {
197    /// Creates a new executor instance with the given validator for transaction validation.
198    ///
199    /// Initializes the executor with the provided validator and sets up communication for
200    /// validation tasks.
201    pub fn new(validator: V) -> (Self, ValidationTask) {
202        let (tx, task) = ValidationTask::new();
203        (
204            Self {
205                validator: Arc::new(validator),
206                to_validation_task: Arc::new(sync::Mutex::new(tx)),
207            },
208            task,
209        )
210    }
211}
212
213impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
214where
215    V: TransactionValidator + 'static,
216{
217    type Transaction = <V as TransactionValidator>::Transaction;
218    type Block = V::Block;
219
220    async fn validate_transaction(
221        &self,
222        origin: TransactionOrigin,
223        transaction: Self::Transaction,
224    ) -> TransactionValidationOutcome<Self::Transaction> {
225        let hash = *transaction.hash();
226        let (tx, rx) = oneshot::channel();
227        {
228            let res = {
229                let to_validation_task = self.to_validation_task.clone();
230                let validator = self.validator.clone();
231                let fut = Box::pin(async move {
232                    let res = validator.validate_transaction(origin, transaction).await;
233                    let _ = tx.send(res);
234                });
235                let to_validation_task = to_validation_task.lock().await;
236                to_validation_task.send(fut).await
237            };
238            if res.is_err() {
239                return TransactionValidationOutcome::Error(
240                    hash,
241                    Box::new(TransactionValidatorError::ValidationServiceUnreachable),
242                );
243            }
244        }
245
246        match rx.await {
247            Ok(res) => res,
248            Err(_) => TransactionValidationOutcome::Error(
249                hash,
250                Box::new(TransactionValidatorError::ValidationServiceUnreachable),
251            ),
252        }
253    }
254
255    async fn validate_transactions(
256        &self,
257        transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction), IntoIter: Send>
258            + Send,
259    ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
260        let transactions: Vec<_> = transactions.into_iter().collect();
261        let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
262        let (tx, rx) = oneshot::channel();
263        {
264            let res = {
265                let to_validation_task = self.to_validation_task.clone();
266                let validator = self.validator.clone();
267                let fut = Box::pin(async move {
268                    let res = validator.validate_transactions(transactions).await;
269                    let _ = tx.send(res);
270                });
271                let to_validation_task = to_validation_task.lock().await;
272                to_validation_task.send(fut).await
273            };
274            if res.is_err() {
275                return hashes
276                    .into_iter()
277                    .map(|hash| {
278                        TransactionValidationOutcome::Error(
279                            hash,
280                            Box::new(TransactionValidatorError::ValidationServiceUnreachable),
281                        )
282                    })
283                    .collect();
284            }
285        }
286        match rx.await {
287            Ok(res) => res,
288            Err(_) => hashes
289                .into_iter()
290                .map(|hash| {
291                    TransactionValidationOutcome::Error(
292                        hash,
293                        Box::new(TransactionValidatorError::ValidationServiceUnreachable),
294                    )
295                })
296                .collect(),
297        }
298    }
299
300    fn on_new_head_block(&self, new_tip_block: &SealedBlock<Self::Block>) {
301        self.validator.on_new_head_block(new_tip_block)
302    }
303}
304
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        test_utils::MockTransaction,
        validate::{TransactionValidationOutcome, ValidTransaction},
        TransactionOrigin,
    };
    use alloy_primitives::{Address, U256};

    /// Validator that accepts every transaction it is given.
    #[derive(Debug)]
    struct NoopValidator;

    impl TransactionValidator for NoopValidator {
        type Transaction = MockTransaction;
        type Block = reth_ethereum_primitives::Block;

        /// Reports the transaction as valid with zero balance and nonce.
        async fn validate_transaction(
            &self,
            _origin: TransactionOrigin,
            transaction: Self::Transaction,
        ) -> TransactionValidationOutcome<Self::Transaction> {
            let authorities: Vec<Address> = Vec::new();
            TransactionValidationOutcome::Valid {
                balance: U256::ZERO,
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::Valid(transaction),
                propagate: false,
                authorities: Some(authorities),
            }
        }
    }

    /// A single transaction sent through the executor comes back as valid.
    #[tokio::test]
    async fn executor_new_spawns_and_validates_single() {
        let (executor, task) = TransactionValidationTaskExecutor::new(NoopValidator);
        tokio::spawn(task.run());

        let outcome = executor
            .validate_transaction(TransactionOrigin::External, MockTransaction::legacy())
            .await;

        assert!(matches!(outcome, TransactionValidationOutcome::Valid { .. }));
    }

    /// A batch sent through the executor yields one valid outcome per transaction.
    #[tokio::test]
    async fn executor_new_spawns_and_validates_batch() {
        let (executor, task) = TransactionValidationTaskExecutor::new(NoopValidator);
        tokio::spawn(task.run());

        let batch = vec![
            (TransactionOrigin::External, MockTransaction::legacy()),
            (TransactionOrigin::Local, MockTransaction::legacy()),
        ];
        let outcomes = executor.validate_transactions(batch).await;

        assert_eq!(outcomes.len(), 2);
        for outcome in &outcomes {
            assert!(matches!(outcome, TransactionValidationOutcome::Valid { .. }));
        }
    }
}