reth_transaction_pool/validate/
task.rs

//! A validation service for transactions.
2
3use crate::{
4    blobstore::BlobStore,
5    validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
6    EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
7    TransactionValidator,
8};
9use futures_util::{lock::Mutex, StreamExt};
10use reth_primitives_traits::{Block, SealedBlock};
11use reth_tasks::TaskSpawner;
12use std::{future::Future, pin::Pin, sync::Arc};
13use tokio::{
14    sync,
15    sync::{mpsc, oneshot},
16};
17use tokio_stream::wrappers::ReceiverStream;
18
/// A single validation job: a pinned, heap-allocated future that resolves to `()` and can be
/// sent across threads.
type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
21
22/// Represents a stream of validation futures.
23type ValidationStream = ReceiverStream<ValidationFuture>;
24
25/// A service that performs validation jobs.
26///
27/// This listens for incoming validation jobs and executes them.
28///
29/// This should be spawned as a task: [`ValidationTask::run`]
30#[derive(Clone)]
31pub struct ValidationTask {
32    validation_jobs: Arc<Mutex<ValidationStream>>,
33}
34
35impl ValidationTask {
36    /// Creates a new cloneable task pair
37    pub fn new() -> (ValidationJobSender, Self) {
38        let (tx, rx) = mpsc::channel(1);
39        (ValidationJobSender { tx }, Self::with_receiver(rx))
40    }
41
42    /// Creates a new task with the given receiver.
43    pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
44        Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
45    }
46
47    /// Executes all new validation jobs that come in.
48    ///
49    /// This will run as long as the channel is alive and is expected to be spawned as a task.
50    pub async fn run(self) {
51        while let Some(task) = self.validation_jobs.lock().await.next().await {
52            task.await;
53        }
54    }
55}
56
57impl std::fmt::Debug for ValidationTask {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
60    }
61}
62
63/// A sender new type for sending validation jobs to [`ValidationTask`].
64#[derive(Debug)]
65pub struct ValidationJobSender {
66    tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
67}
68
69impl ValidationJobSender {
70    /// Sends the given job to the validation task.
71    pub async fn send(
72        &self,
73        job: Pin<Box<dyn Future<Output = ()> + Send>>,
74    ) -> Result<(), TransactionValidatorError> {
75        self.tx.send(job).await.map_err(|_| TransactionValidatorError::ValidationServiceUnreachable)
76    }
77}
78
79/// A [`TransactionValidator`] implementation that validates ethereum transaction.
80/// This validator is non-blocking, all validation work is done in a separate task.
81#[derive(Debug, Clone)]
82pub struct TransactionValidationTaskExecutor<V> {
83    /// The validator that will validate transactions on a separate task.
84    pub validator: V,
85    /// The sender half to validation tasks that perform the actual validation.
86    pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
87}
88
// === impl TransactionValidationTaskExecutor ===
90
91impl TransactionValidationTaskExecutor<()> {
92    /// Convenience method to create a [`EthTransactionValidatorBuilder`]
93    pub fn eth_builder<Client>(client: Client) -> EthTransactionValidatorBuilder<Client> {
94        EthTransactionValidatorBuilder::new(client)
95    }
96}
97
98impl<V> TransactionValidationTaskExecutor<V> {
99    /// Maps the given validator to a new type.
100    pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
101    where
102        F: FnMut(V) -> T,
103    {
104        TransactionValidationTaskExecutor {
105            validator: f(self.validator),
106            to_validation_task: self.to_validation_task,
107        }
108    }
109}
110
111impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
112    /// Creates a new instance for the given client
113    ///
114    /// This will spawn a single validation tasks that performs the actual validation.
115    /// See [`TransactionValidationTaskExecutor::eth_with_additional_tasks`]
116    pub fn eth<T, S: BlobStore>(client: Client, blob_store: S, tasks: T) -> Self
117    where
118        T: TaskSpawner,
119    {
120        Self::eth_with_additional_tasks(client, blob_store, tasks, 0)
121    }
122
123    /// Creates a new instance for the given client
124    ///
125    /// By default this will enable support for:
126    ///   - shanghai
127    ///   - eip1559
128    ///   - eip2930
129    ///
130    /// This will always spawn a validation task that performs the actual validation. It will spawn
131    /// `num_additional_tasks` additional tasks.
132    pub fn eth_with_additional_tasks<T, S: BlobStore>(
133        client: Client,
134        blob_store: S,
135        tasks: T,
136        num_additional_tasks: usize,
137    ) -> Self
138    where
139        T: TaskSpawner,
140    {
141        EthTransactionValidatorBuilder::new(client)
142            .with_additional_tasks(num_additional_tasks)
143            .build_with_tasks::<Tx, T, S>(tasks, blob_store)
144    }
145}
146
147impl<V> TransactionValidationTaskExecutor<V> {
148    /// Creates a new executor instance with the given validator for transaction validation.
149    ///
150    /// Initializes the executor with the provided validator and sets up communication for
151    /// validation tasks.
152    pub fn new(validator: V) -> Self {
153        let (tx, _) = ValidationTask::new();
154        Self { validator, to_validation_task: Arc::new(sync::Mutex::new(tx)) }
155    }
156}
157
158impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
159where
160    V: TransactionValidator + Clone + 'static,
161{
162    type Transaction = <V as TransactionValidator>::Transaction;
163
164    async fn validate_transaction(
165        &self,
166        origin: TransactionOrigin,
167        transaction: Self::Transaction,
168    ) -> TransactionValidationOutcome<Self::Transaction> {
169        let hash = *transaction.hash();
170        let (tx, rx) = oneshot::channel();
171        {
172            let res = {
173                let to_validation_task = self.to_validation_task.clone();
174                let to_validation_task = to_validation_task.lock().await;
175                let validator = self.validator.clone();
176                to_validation_task
177                    .send(Box::pin(async move {
178                        let res = validator.validate_transaction(origin, transaction).await;
179                        let _ = tx.send(res);
180                    }))
181                    .await
182            };
183            if res.is_err() {
184                return TransactionValidationOutcome::Error(
185                    hash,
186                    Box::new(TransactionValidatorError::ValidationServiceUnreachable),
187                )
188            }
189        }
190
191        match rx.await {
192            Ok(res) => res,
193            Err(_) => TransactionValidationOutcome::Error(
194                hash,
195                Box::new(TransactionValidatorError::ValidationServiceUnreachable),
196            ),
197        }
198    }
199
200    fn on_new_head_block<B>(&self, new_tip_block: &SealedBlock<B>)
201    where
202        B: Block,
203    {
204        self.validator.on_new_head_block(new_tip_block)
205    }
206}