reth_transaction_pool/validate/
task.rs1use crate::{
4 blobstore::BlobStore,
5 metrics::TxPoolValidatorMetrics,
6 validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
7 EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
8 TransactionValidator,
9};
10use futures_util::{lock::Mutex, StreamExt};
11use reth_primitives_traits::{Block, SealedBlock};
12use reth_tasks::TaskSpawner;
13use std::{future::Future, pin::Pin, sync::Arc};
14use tokio::{
15 sync,
16 sync::{mpsc, oneshot},
17};
18use tokio_stream::wrappers::ReceiverStream;
19
/// A single validation job: a pinned, boxed, `Send` future that produces no output.
type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
22
23type ValidationStream = ReceiverStream<ValidationFuture>;
25
26#[derive(Clone)]
32pub struct ValidationTask {
33 validation_jobs: Arc<Mutex<ValidationStream>>,
34}
35
36impl ValidationTask {
37 pub fn new() -> (ValidationJobSender, Self) {
41 Self::with_capacity(1)
42 }
43
44 pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
46 let (tx, rx) = mpsc::channel(capacity);
47 let metrics = TxPoolValidatorMetrics::default();
48 (ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
49 }
50
51 pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
53 Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
54 }
55
56 pub async fn run(self) {
60 while let Some(task) = self.validation_jobs.lock().await.next().await {
61 task.await;
62 }
63 }
64}
65
66impl std::fmt::Debug for ValidationTask {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
69 }
70}
71
72#[derive(Debug)]
74pub struct ValidationJobSender {
75 tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
76 metrics: TxPoolValidatorMetrics,
77}
78
79impl ValidationJobSender {
80 pub async fn send(
82 &self,
83 job: Pin<Box<dyn Future<Output = ()> + Send>>,
84 ) -> Result<(), TransactionValidatorError> {
85 self.metrics.inflight_validation_jobs.increment(1);
86 let res = self
87 .tx
88 .send(job)
89 .await
90 .map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
91 self.metrics.inflight_validation_jobs.decrement(1);
92 res
93 }
94}
95
96#[derive(Debug)]
99pub struct TransactionValidationTaskExecutor<V> {
100 pub validator: Arc<V>,
102 pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
104}
105
106impl<V> Clone for TransactionValidationTaskExecutor<V> {
107 fn clone(&self) -> Self {
108 Self {
109 validator: self.validator.clone(),
110 to_validation_task: self.to_validation_task.clone(),
111 }
112 }
113}
114
115impl TransactionValidationTaskExecutor<()> {
118 pub fn eth_builder<Client>(client: Client) -> EthTransactionValidatorBuilder<Client> {
120 EthTransactionValidatorBuilder::new(client)
121 }
122}
123
124impl<V> TransactionValidationTaskExecutor<V> {
125 pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
127 where
128 F: FnMut(V) -> T,
129 {
130 TransactionValidationTaskExecutor {
131 validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
132 to_validation_task: self.to_validation_task,
133 }
134 }
135
136 pub fn validator(&self) -> &V {
138 &self.validator
139 }
140}
141
142impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
143 pub fn eth<T, S: BlobStore>(client: Client, blob_store: S, tasks: T) -> Self
148 where
149 T: TaskSpawner,
150 {
151 Self::eth_with_additional_tasks(client, blob_store, tasks, 0)
152 }
153
154 pub fn eth_with_additional_tasks<T, S: BlobStore>(
164 client: Client,
165 blob_store: S,
166 tasks: T,
167 num_additional_tasks: usize,
168 ) -> Self
169 where
170 T: TaskSpawner,
171 {
172 EthTransactionValidatorBuilder::new(client)
173 .with_additional_tasks(num_additional_tasks)
174 .build_with_tasks::<Tx, T, S>(tasks, blob_store)
175 }
176}
177
178impl<V> TransactionValidationTaskExecutor<V> {
179 pub fn new(validator: V) -> (Self, ValidationTask) {
184 let (tx, task) = ValidationTask::new();
185 (
186 Self {
187 validator: Arc::new(validator),
188 to_validation_task: Arc::new(sync::Mutex::new(tx)),
189 },
190 task,
191 )
192 }
193}
194
195impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
196where
197 V: TransactionValidator + 'static,
198{
199 type Transaction = <V as TransactionValidator>::Transaction;
200
201 async fn validate_transaction(
202 &self,
203 origin: TransactionOrigin,
204 transaction: Self::Transaction,
205 ) -> TransactionValidationOutcome<Self::Transaction> {
206 let hash = *transaction.hash();
207 let (tx, rx) = oneshot::channel();
208 {
209 let res = {
210 let to_validation_task = self.to_validation_task.clone();
211 let validator = self.validator.clone();
212 let fut = Box::pin(async move {
213 let res = validator.validate_transaction(origin, transaction).await;
214 let _ = tx.send(res);
215 });
216 let to_validation_task = to_validation_task.lock().await;
217 to_validation_task.send(fut).await
218 };
219 if res.is_err() {
220 return TransactionValidationOutcome::Error(
221 hash,
222 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
223 );
224 }
225 }
226
227 match rx.await {
228 Ok(res) => res,
229 Err(_) => TransactionValidationOutcome::Error(
230 hash,
231 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
232 ),
233 }
234 }
235
236 async fn validate_transactions(
237 &self,
238 transactions: Vec<(TransactionOrigin, Self::Transaction)>,
239 ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
240 let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
241 let (tx, rx) = oneshot::channel();
242 {
243 let res = {
244 let to_validation_task = self.to_validation_task.clone();
245 let validator = self.validator.clone();
246 let fut = Box::pin(async move {
247 let res = validator.validate_transactions(transactions).await;
248 let _ = tx.send(res);
249 });
250 let to_validation_task = to_validation_task.lock().await;
251 to_validation_task.send(fut).await
252 };
253 if res.is_err() {
254 return hashes
255 .into_iter()
256 .map(|hash| {
257 TransactionValidationOutcome::Error(
258 hash,
259 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
260 )
261 })
262 .collect();
263 }
264 }
265 match rx.await {
266 Ok(res) => res,
267 Err(_) => hashes
268 .into_iter()
269 .map(|hash| {
270 TransactionValidationOutcome::Error(
271 hash,
272 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
273 )
274 })
275 .collect(),
276 }
277 }
278
279 async fn validate_transactions_with_origin(
280 &self,
281 origin: TransactionOrigin,
282 transactions: impl IntoIterator<Item = Self::Transaction> + Send,
283 ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
284 self.validate_transactions(transactions.into_iter().map(|tx| (origin, tx)).collect()).await
285 }
286
287 fn on_new_head_block<B>(&self, new_tip_block: &SealedBlock<B>)
288 where
289 B: Block,
290 {
291 self.validator.on_new_head_block(new_tip_block)
292 }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        test_utils::MockTransaction,
        validate::{TransactionValidationOutcome, ValidTransaction},
        TransactionOrigin,
    };
    use alloy_primitives::{Address, U256};

    /// Validator that reports every transaction as valid without inspecting it.
    #[derive(Debug)]
    struct NoopValidator;

    impl TransactionValidator for NoopValidator {
        type Transaction = MockTransaction;

        async fn validate_transaction(
            &self,
            _origin: TransactionOrigin,
            transaction: Self::Transaction,
        ) -> TransactionValidationOutcome<Self::Transaction> {
            let authorities = Some(Vec::<Address>::new());
            TransactionValidationOutcome::Valid {
                balance: U256::ZERO,
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::Valid(transaction),
                propagate: false,
                authorities,
            }
        }
    }

    /// A single transaction sent through the executor comes back as valid once the task runs.
    #[tokio::test]
    async fn executor_new_spawns_and_validates_single() {
        let (executor, task) = TransactionValidationTaskExecutor::new(NoopValidator);
        tokio::spawn(task.run());

        let outcome = executor
            .validate_transaction(TransactionOrigin::External, MockTransaction::legacy())
            .await;

        assert!(matches!(outcome, TransactionValidationOutcome::Valid { .. }));
    }

    /// A batch sent through the executor yields one valid outcome per transaction.
    #[tokio::test]
    async fn executor_new_spawns_and_validates_batch() {
        let (executor, task) = TransactionValidationTaskExecutor::new(NoopValidator);
        tokio::spawn(task.run());

        let batch = vec![
            (TransactionOrigin::External, MockTransaction::legacy()),
            (TransactionOrigin::Local, MockTransaction::legacy()),
        ];
        let outcomes = executor.validate_transactions(batch).await;

        assert_eq!(outcomes.len(), 2);
        for outcome in &outcomes {
            assert!(matches!(outcome, TransactionValidationOutcome::Valid { .. }));
        }
    }
}