Skip to main content

reth_transaction_pool/validate/
task.rs

1//! A validation service for transactions.
2
3use crate::{
4    blobstore::BlobStore,
5    metrics::TxPoolValidatorMetrics,
6    validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
7    EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
8    TransactionValidator,
9};
10use futures_util::{lock::Mutex, StreamExt};
11use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
12use reth_evm::ConfigureEvm;
13use reth_primitives_traits::{HeaderTy, SealedBlock};
14use reth_storage_api::BlockReaderIdExt;
15use reth_tasks::Runtime;
16use std::{future::Future, pin::Pin, sync::Arc};
17use tokio::{
18    sync,
19    sync::{mpsc, oneshot},
20};
21use tokio_stream::wrappers::ReceiverStream;
22
23/// Represents a future outputting unit type and is sendable.
24type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
25
26/// Represents a stream of validation futures.
27type ValidationStream = ReceiverStream<ValidationFuture>;
28
29/// A service that performs validation jobs.
30///
31/// This listens for incoming validation jobs and executes them.
32///
33/// This should be spawned as a task: [`ValidationTask::run`]
34#[derive(Clone)]
35pub struct ValidationTask {
36    validation_jobs: Arc<Mutex<ValidationStream>>,
37}
38
39impl ValidationTask {
40    /// Creates a new cloneable task pair.
41    ///
42    /// The sender sends new (transaction) validation tasks to an available validation task.
43    pub fn new() -> (ValidationJobSender, Self) {
44        Self::with_capacity(1)
45    }
46
47    /// Creates a new cloneable task pair with the given channel capacity.
48    pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
49        let (tx, rx) = mpsc::channel(capacity);
50        let metrics = TxPoolValidatorMetrics::default();
51        (ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
52    }
53
54    /// Creates a new task with the given receiver.
55    pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
56        Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
57    }
58
59    /// Executes all new validation jobs that come in.
60    ///
61    /// This will run as long as the channel is alive and is expected to be spawned as a task.
62    pub async fn run(self) {
63        while let Some(task) = self.validation_jobs.lock().await.next().await {
64            task.await;
65        }
66    }
67}
68
69impl std::fmt::Debug for ValidationTask {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
72    }
73}
74
75/// A sender new type for sending validation jobs to [`ValidationTask`].
76#[derive(Debug)]
77pub struct ValidationJobSender {
78    tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
79    metrics: TxPoolValidatorMetrics,
80}
81
82impl ValidationJobSender {
83    /// Sends the given job to the validation task.
84    pub async fn send(
85        &self,
86        job: Pin<Box<dyn Future<Output = ()> + Send>>,
87    ) -> Result<(), TransactionValidatorError> {
88        self.metrics.inflight_validation_jobs.increment(1);
89        let res = self
90            .tx
91            .send(job)
92            .await
93            .map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
94        self.metrics.inflight_validation_jobs.decrement(1);
95        res
96    }
97}
98
99/// A [`TransactionValidator`] implementation that validates ethereum transaction.
100/// This validator is non-blocking, all validation work is done in a separate task.
101#[derive(Debug)]
102pub struct TransactionValidationTaskExecutor<V> {
103    /// The validator that will validate transactions on a separate task.
104    pub validator: Arc<V>,
105    /// The sender half to validation tasks that perform the actual validation.
106    pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
107}
108
109impl<V> Clone for TransactionValidationTaskExecutor<V> {
110    fn clone(&self) -> Self {
111        Self {
112            validator: self.validator.clone(),
113            to_validation_task: self.to_validation_task.clone(),
114        }
115    }
116}
117
118// === impl TransactionValidationTaskExecutor ===
119
120impl TransactionValidationTaskExecutor<()> {
121    /// Convenience method to create a [`EthTransactionValidatorBuilder`]
122    pub fn eth_builder<Client, Evm>(
123        client: Client,
124        evm_config: Evm,
125    ) -> EthTransactionValidatorBuilder<Client, Evm>
126    where
127        Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
128            + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
129        Evm: ConfigureEvm,
130    {
131        EthTransactionValidatorBuilder::new(client, evm_config)
132    }
133}
134
135impl<V> TransactionValidationTaskExecutor<V> {
136    /// Maps the given validator to a new type.
137    pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
138    where
139        F: FnMut(V) -> T,
140    {
141        TransactionValidationTaskExecutor {
142            validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
143            to_validation_task: self.to_validation_task,
144        }
145    }
146
147    /// Returns the validator.
148    pub fn validator(&self) -> &V {
149        &self.validator
150    }
151}
152
153impl<Client, Tx, Evm> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx, Evm>> {
154    /// Creates a new instance for the given client
155    ///
156    /// This will spawn a single validation tasks that performs the actual validation.
157    /// See [`TransactionValidationTaskExecutor::eth_with_additional_tasks`]
158    pub fn eth<S: BlobStore>(client: Client, evm_config: Evm, blob_store: S, tasks: Runtime) -> Self
159    where
160        Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
161            + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
162        Evm: ConfigureEvm,
163    {
164        Self::eth_with_additional_tasks(client, evm_config, blob_store, tasks, 0)
165    }
166
167    /// Creates a new instance for the given client
168    ///
169    /// By default this will enable support for:
170    ///   - shanghai
171    ///   - eip1559
172    ///   - eip2930
173    ///
174    /// This will always spawn a validation task that performs the actual validation. It will spawn
175    /// `num_additional_tasks` additional tasks.
176    pub fn eth_with_additional_tasks<S: BlobStore>(
177        client: Client,
178        evm_config: Evm,
179        blob_store: S,
180        tasks: Runtime,
181        num_additional_tasks: usize,
182    ) -> Self
183    where
184        Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
185            + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
186        Evm: ConfigureEvm,
187    {
188        EthTransactionValidatorBuilder::new(client, evm_config)
189            .with_additional_tasks(num_additional_tasks)
190            .build_with_tasks(tasks, blob_store)
191    }
192}
193
194impl<V> TransactionValidationTaskExecutor<V> {
195    /// Creates a new executor instance with the given validator for transaction validation.
196    ///
197    /// Initializes the executor with the provided validator and sets up communication for
198    /// validation tasks.
199    pub fn new(validator: V) -> (Self, ValidationTask) {
200        let (tx, task) = ValidationTask::new();
201        (
202            Self {
203                validator: Arc::new(validator),
204                to_validation_task: Arc::new(sync::Mutex::new(tx)),
205            },
206            task,
207        )
208    }
209
210    /// Creates a new executor and spawns the validation tasks on the given runtime.
211    ///
212    /// This spawns `additional_tasks` extra blocking tasks plus one critical blocking task
213    /// for the validation service.
214    pub fn spawn(validator: V, tasks: &Runtime, additional_tasks: usize) -> Self {
215        let (tx, task) = ValidationTask::new();
216
217        for _ in 0..additional_tasks {
218            let task = task.clone();
219            tasks.spawn_blocking_task(async move {
220                task.run().await;
221            });
222        }
223
224        tasks.spawn_critical_blocking_task("transaction-validation-service", async move {
225            task.run().await;
226        });
227
228        Self { validator: Arc::new(validator), to_validation_task: Arc::new(sync::Mutex::new(tx)) }
229    }
230}
231
232impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
233where
234    V: TransactionValidator + 'static,
235{
236    type Transaction = <V as TransactionValidator>::Transaction;
237    type Block = V::Block;
238
239    async fn validate_transaction(
240        &self,
241        origin: TransactionOrigin,
242        transaction: Self::Transaction,
243    ) -> TransactionValidationOutcome<Self::Transaction> {
244        let hash = *transaction.hash();
245        let (tx, rx) = oneshot::channel();
246        {
247            let res = {
248                let to_validation_task = self.to_validation_task.clone();
249                let validator = self.validator.clone();
250                let fut = Box::pin(async move {
251                    let res = validator.validate_transaction(origin, transaction).await;
252                    let _ = tx.send(res);
253                });
254                let to_validation_task = to_validation_task.lock().await;
255                to_validation_task.send(fut).await
256            };
257            if res.is_err() {
258                return TransactionValidationOutcome::Error(
259                    hash,
260                    Box::new(TransactionValidatorError::ValidationServiceUnreachable),
261                );
262            }
263        }
264
265        match rx.await {
266            Ok(res) => res,
267            Err(_) => TransactionValidationOutcome::Error(
268                hash,
269                Box::new(TransactionValidatorError::ValidationServiceUnreachable),
270            ),
271        }
272    }
273
274    async fn validate_transactions(
275        &self,
276        transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction), IntoIter: Send>
277            + Send,
278    ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
279        let transactions: Vec<_> = transactions.into_iter().collect();
280        let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
281        let (tx, rx) = oneshot::channel();
282        {
283            let res = {
284                let to_validation_task = self.to_validation_task.clone();
285                let validator = self.validator.clone();
286                let fut = Box::pin(async move {
287                    let res = validator.validate_transactions(transactions).await;
288                    let _ = tx.send(res);
289                });
290                let to_validation_task = to_validation_task.lock().await;
291                to_validation_task.send(fut).await
292            };
293            if res.is_err() {
294                return hashes
295                    .into_iter()
296                    .map(|hash| {
297                        TransactionValidationOutcome::Error(
298                            hash,
299                            Box::new(TransactionValidatorError::ValidationServiceUnreachable),
300                        )
301                    })
302                    .collect();
303            }
304        }
305        match rx.await {
306            Ok(res) => res,
307            Err(_) => hashes
308                .into_iter()
309                .map(|hash| {
310                    TransactionValidationOutcome::Error(
311                        hash,
312                        Box::new(TransactionValidatorError::ValidationServiceUnreachable),
313                    )
314                })
315                .collect(),
316        }
317    }
318
319    fn on_new_head_block(&self, new_tip_block: &SealedBlock<Self::Block>) {
320        self.validator.on_new_head_block(new_tip_block)
321    }
322}
323
324#[cfg(test)]
325mod tests {
326    use super::*;
327    use crate::{
328        test_utils::MockTransaction,
329        validate::{TransactionValidationOutcome, ValidTransaction},
330        TransactionOrigin,
331    };
332    use alloy_primitives::{Address, U256};
333
334    #[derive(Debug)]
335    struct NoopValidator;
336
337    impl TransactionValidator for NoopValidator {
338        type Transaction = MockTransaction;
339        type Block = reth_ethereum_primitives::Block;
340
341        async fn validate_transaction(
342            &self,
343            _origin: TransactionOrigin,
344            transaction: Self::Transaction,
345        ) -> TransactionValidationOutcome<Self::Transaction> {
346            TransactionValidationOutcome::Valid {
347                balance: U256::ZERO,
348                state_nonce: 0,
349                bytecode_hash: None,
350                transaction: ValidTransaction::Valid(transaction),
351                propagate: false,
352                authorities: Some(Vec::<Address>::new()),
353            }
354        }
355    }
356
357    #[tokio::test]
358    async fn executor_new_spawns_and_validates_single() {
359        let validator = NoopValidator;
360        let (executor, task) = TransactionValidationTaskExecutor::new(validator);
361        tokio::spawn(task.run());
362        let tx = MockTransaction::legacy();
363        let out = executor.validate_transaction(TransactionOrigin::External, tx).await;
364        assert!(matches!(out, TransactionValidationOutcome::Valid { .. }));
365    }
366
367    #[tokio::test]
368    async fn executor_new_spawns_and_validates_batch() {
369        let validator = NoopValidator;
370        let (executor, task) = TransactionValidationTaskExecutor::new(validator);
371        tokio::spawn(task.run());
372        let txs = vec![
373            (TransactionOrigin::External, MockTransaction::legacy()),
374            (TransactionOrigin::Local, MockTransaction::legacy()),
375        ];
376        let out = executor.validate_transactions(txs).await;
377        assert_eq!(out.len(), 2);
378        assert!(out.iter().all(|o| matches!(o, TransactionValidationOutcome::Valid { .. })));
379    }
380}