reth_transaction_pool/validate/
task.rs

1//! A validation service for transactions.
2
3use crate::{
4    blobstore::BlobStore,
5    validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
6    EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
7    TransactionValidator,
8};
9use futures_util::{lock::Mutex, StreamExt};
10use reth_primitives_traits::{Block, SealedBlock};
11use reth_tasks::TaskSpawner;
12use std::{future::Future, pin::Pin, sync::Arc};
13use tokio::{
14    sync,
15    sync::{mpsc, oneshot},
16};
17use tokio_stream::wrappers::ReceiverStream;
18
19/// Represents a future outputting unit type and is sendable.
20type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
21
22/// Represents a stream of validation futures.
23type ValidationStream = ReceiverStream<ValidationFuture>;
24
25/// A service that performs validation jobs.
26///
27/// This listens for incoming validation jobs and executes them.
28///
29/// This should be spawned as a task: [`ValidationTask::run`]
30#[derive(Clone)]
31pub struct ValidationTask {
32    validation_jobs: Arc<Mutex<ValidationStream>>,
33}
34
35impl ValidationTask {
36    /// Creates a new cloneable task pair
37    pub fn new() -> (ValidationJobSender, Self) {
38        let (tx, rx) = mpsc::channel(1);
39        (ValidationJobSender { tx }, Self::with_receiver(rx))
40    }
41
42    /// Creates a new task with the given receiver.
43    pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
44        Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
45    }
46
47    /// Executes all new validation jobs that come in.
48    ///
49    /// This will run as long as the channel is alive and is expected to be spawned as a task.
50    pub async fn run(self) {
51        while let Some(task) = self.validation_jobs.lock().await.next().await {
52            task.await;
53        }
54    }
55}
56
57impl std::fmt::Debug for ValidationTask {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
60    }
61}
62
63/// A sender new type for sending validation jobs to [`ValidationTask`].
64#[derive(Debug)]
65pub struct ValidationJobSender {
66    tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
67}
68
69impl ValidationJobSender {
70    /// Sends the given job to the validation task.
71    pub async fn send(
72        &self,
73        job: Pin<Box<dyn Future<Output = ()> + Send>>,
74    ) -> Result<(), TransactionValidatorError> {
75        self.tx.send(job).await.map_err(|_| TransactionValidatorError::ValidationServiceUnreachable)
76    }
77}
78
79/// A [`TransactionValidator`] implementation that validates ethereum transaction.
80/// This validator is non-blocking, all validation work is done in a separate task.
81#[derive(Debug)]
82pub struct TransactionValidationTaskExecutor<V> {
83    /// The validator that will validate transactions on a separate task.
84    pub validator: Arc<V>,
85    /// The sender half to validation tasks that perform the actual validation.
86    pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
87}
88
89impl<V> Clone for TransactionValidationTaskExecutor<V> {
90    fn clone(&self) -> Self {
91        Self {
92            validator: self.validator.clone(),
93            to_validation_task: self.to_validation_task.clone(),
94        }
95    }
96}
97
98// === impl TransactionValidationTaskExecutor ===
99
100impl TransactionValidationTaskExecutor<()> {
101    /// Convenience method to create a [`EthTransactionValidatorBuilder`]
102    pub fn eth_builder<Client>(client: Client) -> EthTransactionValidatorBuilder<Client> {
103        EthTransactionValidatorBuilder::new(client)
104    }
105}
106
107impl<V> TransactionValidationTaskExecutor<V> {
108    /// Maps the given validator to a new type.
109    pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
110    where
111        F: FnMut(V) -> T,
112    {
113        TransactionValidationTaskExecutor {
114            validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
115            to_validation_task: self.to_validation_task,
116        }
117    }
118
119    /// Returns the validator.
120    pub fn validator(&self) -> &V {
121        &self.validator
122    }
123}
124
125impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
126    /// Creates a new instance for the given client
127    ///
128    /// This will spawn a single validation tasks that performs the actual validation.
129    /// See [`TransactionValidationTaskExecutor::eth_with_additional_tasks`]
130    pub fn eth<T, S: BlobStore>(client: Client, blob_store: S, tasks: T) -> Self
131    where
132        T: TaskSpawner,
133    {
134        Self::eth_with_additional_tasks(client, blob_store, tasks, 0)
135    }
136
137    /// Creates a new instance for the given client
138    ///
139    /// By default this will enable support for:
140    ///   - shanghai
141    ///   - eip1559
142    ///   - eip2930
143    ///
144    /// This will always spawn a validation task that performs the actual validation. It will spawn
145    /// `num_additional_tasks` additional tasks.
146    pub fn eth_with_additional_tasks<T, S: BlobStore>(
147        client: Client,
148        blob_store: S,
149        tasks: T,
150        num_additional_tasks: usize,
151    ) -> Self
152    where
153        T: TaskSpawner,
154    {
155        EthTransactionValidatorBuilder::new(client)
156            .with_additional_tasks(num_additional_tasks)
157            .build_with_tasks::<Tx, T, S>(tasks, blob_store)
158    }
159}
160
161impl<V> TransactionValidationTaskExecutor<V> {
162    /// Creates a new executor instance with the given validator for transaction validation.
163    ///
164    /// Initializes the executor with the provided validator and sets up communication for
165    /// validation tasks.
166    pub fn new(validator: V) -> Self {
167        let (tx, _) = ValidationTask::new();
168        Self { validator: Arc::new(validator), to_validation_task: Arc::new(sync::Mutex::new(tx)) }
169    }
170}
171
172impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
173where
174    V: TransactionValidator + 'static,
175{
176    type Transaction = <V as TransactionValidator>::Transaction;
177
178    async fn validate_transaction(
179        &self,
180        origin: TransactionOrigin,
181        transaction: Self::Transaction,
182    ) -> TransactionValidationOutcome<Self::Transaction> {
183        let hash = *transaction.hash();
184        let (tx, rx) = oneshot::channel();
185        {
186            let res = {
187                let to_validation_task = self.to_validation_task.clone();
188                let validator = self.validator.clone();
189                let fut = Box::pin(async move {
190                    let res = validator.validate_transaction(origin, transaction).await;
191                    let _ = tx.send(res);
192                });
193                let to_validation_task = to_validation_task.lock().await;
194                to_validation_task.send(fut).await
195            };
196            if res.is_err() {
197                return TransactionValidationOutcome::Error(
198                    hash,
199                    Box::new(TransactionValidatorError::ValidationServiceUnreachable),
200                );
201            }
202        }
203
204        match rx.await {
205            Ok(res) => res,
206            Err(_) => TransactionValidationOutcome::Error(
207                hash,
208                Box::new(TransactionValidatorError::ValidationServiceUnreachable),
209            ),
210        }
211    }
212
213    async fn validate_transactions(
214        &self,
215        transactions: Vec<(TransactionOrigin, Self::Transaction)>,
216    ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
217        let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
218        let (tx, rx) = oneshot::channel();
219        {
220            let res = {
221                let to_validation_task = self.to_validation_task.clone();
222                let validator = self.validator.clone();
223                let fut = Box::pin(async move {
224                    let res = validator.validate_transactions(transactions).await;
225                    let _ = tx.send(res);
226                });
227                let to_validation_task = to_validation_task.lock().await;
228                to_validation_task.send(fut).await
229            };
230            if res.is_err() {
231                return hashes
232                    .into_iter()
233                    .map(|hash| {
234                        TransactionValidationOutcome::Error(
235                            hash,
236                            Box::new(TransactionValidatorError::ValidationServiceUnreachable),
237                        )
238                    })
239                    .collect();
240            }
241        }
242        match rx.await {
243            Ok(res) => res,
244            Err(_) => hashes
245                .into_iter()
246                .map(|hash| {
247                    TransactionValidationOutcome::Error(
248                        hash,
249                        Box::new(TransactionValidatorError::ValidationServiceUnreachable),
250                    )
251                })
252                .collect(),
253        }
254    }
255
256    fn on_new_head_block<B>(&self, new_tip_block: &SealedBlock<B>)
257    where
258        B: Block,
259    {
260        self.validator.on_new_head_block(new_tip_block)
261    }
262}