reth_transaction_pool/validate/
task.rs1use crate::{
4 blobstore::BlobStore,
5 metrics::TxPoolValidatorMetrics,
6 validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
7 EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
8 TransactionValidator,
9};
10use futures_util::{lock::Mutex, StreamExt};
11use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
12use reth_evm::ConfigureEvm;
13use reth_primitives_traits::{HeaderTy, SealedBlock};
14use reth_storage_api::BlockReaderIdExt;
15use reth_tasks::TaskSpawner;
16use std::{future::Future, pin::Pin, sync::Arc};
17use tokio::{
18 sync,
19 sync::{mpsc, oneshot},
20};
21use tokio_stream::wrappers::ReceiverStream;
22
23type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
25
26type ValidationStream = ReceiverStream<ValidationFuture>;
28
29#[derive(Clone)]
35pub struct ValidationTask {
36 validation_jobs: Arc<Mutex<ValidationStream>>,
37}
38
39impl ValidationTask {
40 pub fn new() -> (ValidationJobSender, Self) {
44 Self::with_capacity(1)
45 }
46
47 pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
49 let (tx, rx) = mpsc::channel(capacity);
50 let metrics = TxPoolValidatorMetrics::default();
51 (ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
52 }
53
54 pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
56 Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
57 }
58
59 pub async fn run(self) {
63 while let Some(task) = self.validation_jobs.lock().await.next().await {
64 task.await;
65 }
66 }
67}
68
69impl std::fmt::Debug for ValidationTask {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
72 }
73}
74
75#[derive(Debug)]
77pub struct ValidationJobSender {
78 tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
79 metrics: TxPoolValidatorMetrics,
80}
81
82impl ValidationJobSender {
83 pub async fn send(
85 &self,
86 job: Pin<Box<dyn Future<Output = ()> + Send>>,
87 ) -> Result<(), TransactionValidatorError> {
88 self.metrics.inflight_validation_jobs.increment(1);
89 let res = self
90 .tx
91 .send(job)
92 .await
93 .map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
94 self.metrics.inflight_validation_jobs.decrement(1);
95 res
96 }
97}
98
99#[derive(Debug)]
102pub struct TransactionValidationTaskExecutor<V> {
103 pub validator: Arc<V>,
105 pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
107}
108
109impl<V> Clone for TransactionValidationTaskExecutor<V> {
110 fn clone(&self) -> Self {
111 Self {
112 validator: self.validator.clone(),
113 to_validation_task: self.to_validation_task.clone(),
114 }
115 }
116}
117
118impl TransactionValidationTaskExecutor<()> {
121 pub fn eth_builder<Client, Evm>(
123 client: Client,
124 evm_config: Evm,
125 ) -> EthTransactionValidatorBuilder<Client, Evm>
126 where
127 Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
128 + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
129 Evm: ConfigureEvm,
130 {
131 EthTransactionValidatorBuilder::new(client, evm_config)
132 }
133}
134
135impl<V> TransactionValidationTaskExecutor<V> {
136 pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
138 where
139 F: FnMut(V) -> T,
140 {
141 TransactionValidationTaskExecutor {
142 validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
143 to_validation_task: self.to_validation_task,
144 }
145 }
146
147 pub fn validator(&self) -> &V {
149 &self.validator
150 }
151}
152
153impl<Client, Tx, Evm> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx, Evm>> {
154 pub fn eth<T, S: BlobStore>(client: Client, evm_config: Evm, blob_store: S, tasks: T) -> Self
159 where
160 T: TaskSpawner,
161 Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
162 + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
163 Evm: ConfigureEvm,
164 {
165 Self::eth_with_additional_tasks(client, evm_config, blob_store, tasks, 0)
166 }
167
168 pub fn eth_with_additional_tasks<T, S: BlobStore>(
178 client: Client,
179 evm_config: Evm,
180 blob_store: S,
181 tasks: T,
182 num_additional_tasks: usize,
183 ) -> Self
184 where
185 T: TaskSpawner,
186 Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
187 + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
188 Evm: ConfigureEvm,
189 {
190 EthTransactionValidatorBuilder::new(client, evm_config)
191 .with_additional_tasks(num_additional_tasks)
192 .build_with_tasks(tasks, blob_store)
193 }
194}
195
196impl<V> TransactionValidationTaskExecutor<V> {
197 pub fn new(validator: V) -> (Self, ValidationTask) {
202 let (tx, task) = ValidationTask::new();
203 (
204 Self {
205 validator: Arc::new(validator),
206 to_validation_task: Arc::new(sync::Mutex::new(tx)),
207 },
208 task,
209 )
210 }
211}
212
213impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
214where
215 V: TransactionValidator + 'static,
216{
217 type Transaction = <V as TransactionValidator>::Transaction;
218 type Block = V::Block;
219
220 async fn validate_transaction(
221 &self,
222 origin: TransactionOrigin,
223 transaction: Self::Transaction,
224 ) -> TransactionValidationOutcome<Self::Transaction> {
225 let hash = *transaction.hash();
226 let (tx, rx) = oneshot::channel();
227 {
228 let res = {
229 let to_validation_task = self.to_validation_task.clone();
230 let validator = self.validator.clone();
231 let fut = Box::pin(async move {
232 let res = validator.validate_transaction(origin, transaction).await;
233 let _ = tx.send(res);
234 });
235 let to_validation_task = to_validation_task.lock().await;
236 to_validation_task.send(fut).await
237 };
238 if res.is_err() {
239 return TransactionValidationOutcome::Error(
240 hash,
241 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
242 );
243 }
244 }
245
246 match rx.await {
247 Ok(res) => res,
248 Err(_) => TransactionValidationOutcome::Error(
249 hash,
250 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
251 ),
252 }
253 }
254
255 async fn validate_transactions(
256 &self,
257 transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction), IntoIter: Send>
258 + Send,
259 ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
260 let transactions: Vec<_> = transactions.into_iter().collect();
261 let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
262 let (tx, rx) = oneshot::channel();
263 {
264 let res = {
265 let to_validation_task = self.to_validation_task.clone();
266 let validator = self.validator.clone();
267 let fut = Box::pin(async move {
268 let res = validator.validate_transactions(transactions).await;
269 let _ = tx.send(res);
270 });
271 let to_validation_task = to_validation_task.lock().await;
272 to_validation_task.send(fut).await
273 };
274 if res.is_err() {
275 return hashes
276 .into_iter()
277 .map(|hash| {
278 TransactionValidationOutcome::Error(
279 hash,
280 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
281 )
282 })
283 .collect();
284 }
285 }
286 match rx.await {
287 Ok(res) => res,
288 Err(_) => hashes
289 .into_iter()
290 .map(|hash| {
291 TransactionValidationOutcome::Error(
292 hash,
293 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
294 )
295 })
296 .collect(),
297 }
298 }
299
300 fn on_new_head_block(&self, new_tip_block: &SealedBlock<Self::Block>) {
301 self.validator.on_new_head_block(new_tip_block)
302 }
303}
304
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        test_utils::MockTransaction,
        validate::{TransactionValidationOutcome, ValidTransaction},
        TransactionOrigin,
    };
    use alloy_primitives::{Address, U256};

    /// Validator that reports every transaction as valid without inspecting it.
    #[derive(Debug)]
    struct NoopValidator;

    impl TransactionValidator for NoopValidator {
        type Transaction = MockTransaction;
        type Block = reth_ethereum_primitives::Block;

        async fn validate_transaction(
            &self,
            _origin: TransactionOrigin,
            transaction: Self::Transaction,
        ) -> TransactionValidationOutcome<Self::Transaction> {
            let authorities: Vec<Address> = Vec::new();
            TransactionValidationOutcome::Valid {
                balance: U256::ZERO,
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::Valid(transaction),
                propagate: false,
                authorities: Some(authorities),
            }
        }
    }

    /// A single transaction sent through the executor comes back as `Valid`.
    #[tokio::test]
    async fn executor_new_spawns_and_validates_single() {
        let (executor, task) = TransactionValidationTaskExecutor::new(NoopValidator);
        tokio::spawn(task.run());

        let outcome = executor
            .validate_transaction(TransactionOrigin::External, MockTransaction::legacy())
            .await;

        assert!(matches!(outcome, TransactionValidationOutcome::Valid { .. }));
    }

    /// A batch sent through the executor yields one `Valid` outcome per transaction.
    #[tokio::test]
    async fn executor_new_spawns_and_validates_batch() {
        let (executor, task) = TransactionValidationTaskExecutor::new(NoopValidator);
        tokio::spawn(task.run());

        let batch = vec![
            (TransactionOrigin::External, MockTransaction::legacy()),
            (TransactionOrigin::Local, MockTransaction::legacy()),
        ];
        let outcomes = executor.validate_transactions(batch).await;

        assert_eq!(outcomes.len(), 2);
        assert!(outcomes
            .iter()
            .all(|outcome| matches!(outcome, TransactionValidationOutcome::Valid { .. })));
    }
}