reth_transaction_pool/validate/
task.rs1use crate::{
4 blobstore::BlobStore,
5 metrics::TxPoolValidatorMetrics,
6 validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
7 EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
8 TransactionValidator,
9};
10use futures_util::{lock::Mutex, StreamExt};
11use reth_primitives_traits::{Block, SealedBlock};
12use reth_tasks::TaskSpawner;
13use std::{future::Future, pin::Pin, sync::Arc};
14use tokio::{
15 sync,
16 sync::{mpsc, oneshot},
17};
18use tokio_stream::wrappers::ReceiverStream;
19
20type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
22
23type ValidationStream = ReceiverStream<ValidationFuture>;
25
26#[derive(Clone)]
32pub struct ValidationTask {
33 validation_jobs: Arc<Mutex<ValidationStream>>,
34}
35
36impl ValidationTask {
37 pub fn new() -> (ValidationJobSender, Self) {
41 Self::with_capacity(1)
42 }
43
44 pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
46 let (tx, rx) = mpsc::channel(capacity);
47 let metrics = TxPoolValidatorMetrics::default();
48 (ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
49 }
50
51 pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
53 Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
54 }
55
56 pub async fn run(self) {
60 while let Some(task) = self.validation_jobs.lock().await.next().await {
61 task.await;
62 }
63 }
64}
65
66impl std::fmt::Debug for ValidationTask {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
69 }
70}
71
72#[derive(Debug)]
74pub struct ValidationJobSender {
75 tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
76 metrics: TxPoolValidatorMetrics,
77}
78
79impl ValidationJobSender {
80 pub async fn send(
82 &self,
83 job: Pin<Box<dyn Future<Output = ()> + Send>>,
84 ) -> Result<(), TransactionValidatorError> {
85 self.metrics.inflight_validation_jobs.increment(1);
86 let res = self
87 .tx
88 .send(job)
89 .await
90 .map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
91 self.metrics.inflight_validation_jobs.decrement(1);
92 res
93 }
94}
95
96#[derive(Debug)]
99pub struct TransactionValidationTaskExecutor<V> {
100 pub validator: Arc<V>,
102 pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
104}
105
106impl<V> Clone for TransactionValidationTaskExecutor<V> {
107 fn clone(&self) -> Self {
108 Self {
109 validator: self.validator.clone(),
110 to_validation_task: self.to_validation_task.clone(),
111 }
112 }
113}
114
115impl TransactionValidationTaskExecutor<()> {
118 pub fn eth_builder<Client>(client: Client) -> EthTransactionValidatorBuilder<Client> {
120 EthTransactionValidatorBuilder::new(client)
121 }
122}
123
124impl<V> TransactionValidationTaskExecutor<V> {
125 pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
127 where
128 F: FnMut(V) -> T,
129 {
130 TransactionValidationTaskExecutor {
131 validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
132 to_validation_task: self.to_validation_task,
133 }
134 }
135
136 pub fn validator(&self) -> &V {
138 &self.validator
139 }
140}
141
142impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
143 pub fn eth<T, S: BlobStore>(client: Client, blob_store: S, tasks: T) -> Self
148 where
149 T: TaskSpawner,
150 {
151 Self::eth_with_additional_tasks(client, blob_store, tasks, 0)
152 }
153
154 pub fn eth_with_additional_tasks<T, S: BlobStore>(
164 client: Client,
165 blob_store: S,
166 tasks: T,
167 num_additional_tasks: usize,
168 ) -> Self
169 where
170 T: TaskSpawner,
171 {
172 EthTransactionValidatorBuilder::new(client)
173 .with_additional_tasks(num_additional_tasks)
174 .build_with_tasks::<Tx, T, S>(tasks, blob_store)
175 }
176}
177
178impl<V> TransactionValidationTaskExecutor<V> {
179 pub fn new(validator: V) -> Self {
184 let (tx, _) = ValidationTask::new();
185 Self { validator: Arc::new(validator), to_validation_task: Arc::new(sync::Mutex::new(tx)) }
186 }
187}
188
189impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
190where
191 V: TransactionValidator + 'static,
192{
193 type Transaction = <V as TransactionValidator>::Transaction;
194
195 async fn validate_transaction(
196 &self,
197 origin: TransactionOrigin,
198 transaction: Self::Transaction,
199 ) -> TransactionValidationOutcome<Self::Transaction> {
200 let hash = *transaction.hash();
201 let (tx, rx) = oneshot::channel();
202 {
203 let res = {
204 let to_validation_task = self.to_validation_task.clone();
205 let validator = self.validator.clone();
206 let fut = Box::pin(async move {
207 let res = validator.validate_transaction(origin, transaction).await;
208 let _ = tx.send(res);
209 });
210 let to_validation_task = to_validation_task.lock().await;
211 to_validation_task.send(fut).await
212 };
213 if res.is_err() {
214 return TransactionValidationOutcome::Error(
215 hash,
216 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
217 );
218 }
219 }
220
221 match rx.await {
222 Ok(res) => res,
223 Err(_) => TransactionValidationOutcome::Error(
224 hash,
225 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
226 ),
227 }
228 }
229
230 async fn validate_transactions(
231 &self,
232 transactions: Vec<(TransactionOrigin, Self::Transaction)>,
233 ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
234 let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
235 let (tx, rx) = oneshot::channel();
236 {
237 let res = {
238 let to_validation_task = self.to_validation_task.clone();
239 let validator = self.validator.clone();
240 let fut = Box::pin(async move {
241 let res = validator.validate_transactions(transactions).await;
242 let _ = tx.send(res);
243 });
244 let to_validation_task = to_validation_task.lock().await;
245 to_validation_task.send(fut).await
246 };
247 if res.is_err() {
248 return hashes
249 .into_iter()
250 .map(|hash| {
251 TransactionValidationOutcome::Error(
252 hash,
253 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
254 )
255 })
256 .collect();
257 }
258 }
259 match rx.await {
260 Ok(res) => res,
261 Err(_) => hashes
262 .into_iter()
263 .map(|hash| {
264 TransactionValidationOutcome::Error(
265 hash,
266 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
267 )
268 })
269 .collect(),
270 }
271 }
272
273 async fn validate_transactions_with_origin(
274 &self,
275 origin: TransactionOrigin,
276 transactions: impl IntoIterator<Item = Self::Transaction> + Send,
277 ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
278 self.validate_transactions(transactions.into_iter().map(|tx| (origin, tx)).collect()).await
279 }
280
281 fn on_new_head_block<B>(&self, new_tip_block: &SealedBlock<B>)
282 where
283 B: Block,
284 {
285 self.validator.on_new_head_block(new_tip_block)
286 }
287}