reth_transaction_pool/validate/
task.rs1use crate::{
4 blobstore::BlobStore,
5 validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
6 EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
7 TransactionValidator,
8};
9use futures_util::{lock::Mutex, StreamExt};
10use reth_primitives_traits::{Block, SealedBlock};
11use reth_tasks::TaskSpawner;
12use std::{future::Future, pin::Pin, sync::Arc};
13use tokio::{
14 sync,
15 sync::{mpsc, oneshot},
16};
17use tokio_stream::wrappers::ReceiverStream;
18
19type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
21
22type ValidationStream = ReceiverStream<ValidationFuture>;
24
25#[derive(Clone)]
31pub struct ValidationTask {
32 validation_jobs: Arc<Mutex<ValidationStream>>,
33}
34
35impl ValidationTask {
36 pub fn new() -> (ValidationJobSender, Self) {
38 let (tx, rx) = mpsc::channel(1);
39 (ValidationJobSender { tx }, Self::with_receiver(rx))
40 }
41
42 pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
44 Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
45 }
46
47 pub async fn run(self) {
51 while let Some(task) = self.validation_jobs.lock().await.next().await {
52 task.await;
53 }
54 }
55}
56
57impl std::fmt::Debug for ValidationTask {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
60 }
61}
62
63#[derive(Debug)]
65pub struct ValidationJobSender {
66 tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
67}
68
69impl ValidationJobSender {
70 pub async fn send(
72 &self,
73 job: Pin<Box<dyn Future<Output = ()> + Send>>,
74 ) -> Result<(), TransactionValidatorError> {
75 self.tx.send(job).await.map_err(|_| TransactionValidatorError::ValidationServiceUnreachable)
76 }
77}
78
79#[derive(Debug)]
82pub struct TransactionValidationTaskExecutor<V> {
83 pub validator: Arc<V>,
85 pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
87}
88
89impl<V> Clone for TransactionValidationTaskExecutor<V> {
90 fn clone(&self) -> Self {
91 Self {
92 validator: self.validator.clone(),
93 to_validation_task: self.to_validation_task.clone(),
94 }
95 }
96}
97
98impl TransactionValidationTaskExecutor<()> {
101 pub fn eth_builder<Client>(client: Client) -> EthTransactionValidatorBuilder<Client> {
103 EthTransactionValidatorBuilder::new(client)
104 }
105}
106
107impl<V> TransactionValidationTaskExecutor<V> {
108 pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
110 where
111 F: FnMut(V) -> T,
112 {
113 TransactionValidationTaskExecutor {
114 validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
115 to_validation_task: self.to_validation_task,
116 }
117 }
118
119 pub fn validator(&self) -> &V {
121 &self.validator
122 }
123}
124
125impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
126 pub fn eth<T, S: BlobStore>(client: Client, blob_store: S, tasks: T) -> Self
131 where
132 T: TaskSpawner,
133 {
134 Self::eth_with_additional_tasks(client, blob_store, tasks, 0)
135 }
136
137 pub fn eth_with_additional_tasks<T, S: BlobStore>(
147 client: Client,
148 blob_store: S,
149 tasks: T,
150 num_additional_tasks: usize,
151 ) -> Self
152 where
153 T: TaskSpawner,
154 {
155 EthTransactionValidatorBuilder::new(client)
156 .with_additional_tasks(num_additional_tasks)
157 .build_with_tasks::<Tx, T, S>(tasks, blob_store)
158 }
159}
160
161impl<V> TransactionValidationTaskExecutor<V> {
162 pub fn new(validator: V) -> Self {
167 let (tx, _) = ValidationTask::new();
168 Self { validator: Arc::new(validator), to_validation_task: Arc::new(sync::Mutex::new(tx)) }
169 }
170}
171
172impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
173where
174 V: TransactionValidator + 'static,
175{
176 type Transaction = <V as TransactionValidator>::Transaction;
177
178 async fn validate_transaction(
179 &self,
180 origin: TransactionOrigin,
181 transaction: Self::Transaction,
182 ) -> TransactionValidationOutcome<Self::Transaction> {
183 let hash = *transaction.hash();
184 let (tx, rx) = oneshot::channel();
185 {
186 let res = {
187 let to_validation_task = self.to_validation_task.clone();
188 let validator = self.validator.clone();
189 let fut = Box::pin(async move {
190 let res = validator.validate_transaction(origin, transaction).await;
191 let _ = tx.send(res);
192 });
193 let to_validation_task = to_validation_task.lock().await;
194 to_validation_task.send(fut).await
195 };
196 if res.is_err() {
197 return TransactionValidationOutcome::Error(
198 hash,
199 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
200 );
201 }
202 }
203
204 match rx.await {
205 Ok(res) => res,
206 Err(_) => TransactionValidationOutcome::Error(
207 hash,
208 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
209 ),
210 }
211 }
212
213 async fn validate_transactions(
214 &self,
215 transactions: Vec<(TransactionOrigin, Self::Transaction)>,
216 ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
217 let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
218 let (tx, rx) = oneshot::channel();
219 {
220 let res = {
221 let to_validation_task = self.to_validation_task.clone();
222 let validator = self.validator.clone();
223 let fut = Box::pin(async move {
224 let res = validator.validate_transactions(transactions).await;
225 let _ = tx.send(res);
226 });
227 let to_validation_task = to_validation_task.lock().await;
228 to_validation_task.send(fut).await
229 };
230 if res.is_err() {
231 return hashes
232 .into_iter()
233 .map(|hash| {
234 TransactionValidationOutcome::Error(
235 hash,
236 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
237 )
238 })
239 .collect();
240 }
241 }
242 match rx.await {
243 Ok(res) => res,
244 Err(_) => hashes
245 .into_iter()
246 .map(|hash| {
247 TransactionValidationOutcome::Error(
248 hash,
249 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
250 )
251 })
252 .collect(),
253 }
254 }
255
256 fn on_new_head_block<B>(&self, new_tip_block: &SealedBlock<B>)
257 where
258 B: Block,
259 {
260 self.validator.on_new_head_block(new_tip_block)
261 }
262}