reth_transaction_pool/validate/
task.rs1use crate::{
4 blobstore::BlobStore,
5 metrics::TxPoolValidatorMetrics,
6 validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
7 EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
8 TransactionValidator,
9};
10use futures_util::{lock::Mutex, StreamExt};
11use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
12use reth_evm::ConfigureEvm;
13use reth_primitives_traits::{HeaderTy, SealedBlock};
14use reth_storage_api::BlockReaderIdExt;
15use reth_tasks::Runtime;
16use std::{future::Future, pin::Pin, sync::Arc};
17use tokio::{
18 sync,
19 sync::{mpsc, oneshot},
20};
21use tokio_stream::wrappers::ReceiverStream;
22
23type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
25
26type ValidationStream = ReceiverStream<ValidationFuture>;
28
29#[derive(Clone)]
35pub struct ValidationTask {
36 validation_jobs: Arc<Mutex<ValidationStream>>,
37}
38
39impl ValidationTask {
40 pub fn new() -> (ValidationJobSender, Self) {
44 Self::with_capacity(1)
45 }
46
47 pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
49 let (tx, rx) = mpsc::channel(capacity);
50 let metrics = TxPoolValidatorMetrics::default();
51 (ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
52 }
53
54 pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
56 Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
57 }
58
59 pub async fn run(self) {
63 while let Some(task) = self.validation_jobs.lock().await.next().await {
64 task.await;
65 }
66 }
67}
68
69impl std::fmt::Debug for ValidationTask {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
72 }
73}
74
75#[derive(Debug)]
77pub struct ValidationJobSender {
78 tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
79 metrics: TxPoolValidatorMetrics,
80}
81
82impl ValidationJobSender {
83 pub async fn send(
85 &self,
86 job: Pin<Box<dyn Future<Output = ()> + Send>>,
87 ) -> Result<(), TransactionValidatorError> {
88 self.metrics.inflight_validation_jobs.increment(1);
89 let res = self
90 .tx
91 .send(job)
92 .await
93 .map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
94 self.metrics.inflight_validation_jobs.decrement(1);
95 res
96 }
97}
98
99#[derive(Debug)]
102pub struct TransactionValidationTaskExecutor<V> {
103 pub validator: Arc<V>,
105 pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
107}
108
109impl<V> Clone for TransactionValidationTaskExecutor<V> {
110 fn clone(&self) -> Self {
111 Self {
112 validator: self.validator.clone(),
113 to_validation_task: self.to_validation_task.clone(),
114 }
115 }
116}
117
118impl TransactionValidationTaskExecutor<()> {
121 pub fn eth_builder<Client, Evm>(
123 client: Client,
124 evm_config: Evm,
125 ) -> EthTransactionValidatorBuilder<Client, Evm>
126 where
127 Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
128 + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
129 Evm: ConfigureEvm,
130 {
131 EthTransactionValidatorBuilder::new(client, evm_config)
132 }
133}
134
135impl<V> TransactionValidationTaskExecutor<V> {
136 pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
138 where
139 F: FnMut(V) -> T,
140 {
141 TransactionValidationTaskExecutor {
142 validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
143 to_validation_task: self.to_validation_task,
144 }
145 }
146
147 pub fn validator(&self) -> &V {
149 &self.validator
150 }
151}
152
153impl<Client, Tx, Evm> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx, Evm>> {
154 pub fn eth<S: BlobStore>(client: Client, evm_config: Evm, blob_store: S, tasks: Runtime) -> Self
159 where
160 Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
161 + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
162 Evm: ConfigureEvm,
163 {
164 Self::eth_with_additional_tasks(client, evm_config, blob_store, tasks, 0)
165 }
166
167 pub fn eth_with_additional_tasks<S: BlobStore>(
177 client: Client,
178 evm_config: Evm,
179 blob_store: S,
180 tasks: Runtime,
181 num_additional_tasks: usize,
182 ) -> Self
183 where
184 Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
185 + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
186 Evm: ConfigureEvm,
187 {
188 EthTransactionValidatorBuilder::new(client, evm_config)
189 .with_additional_tasks(num_additional_tasks)
190 .build_with_tasks(tasks, blob_store)
191 }
192}
193
194impl<V> TransactionValidationTaskExecutor<V> {
195 pub fn new(validator: V) -> (Self, ValidationTask) {
200 let (tx, task) = ValidationTask::new();
201 (
202 Self {
203 validator: Arc::new(validator),
204 to_validation_task: Arc::new(sync::Mutex::new(tx)),
205 },
206 task,
207 )
208 }
209}
210
211impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
212where
213 V: TransactionValidator + 'static,
214{
215 type Transaction = <V as TransactionValidator>::Transaction;
216 type Block = V::Block;
217
218 async fn validate_transaction(
219 &self,
220 origin: TransactionOrigin,
221 transaction: Self::Transaction,
222 ) -> TransactionValidationOutcome<Self::Transaction> {
223 let hash = *transaction.hash();
224 let (tx, rx) = oneshot::channel();
225 {
226 let res = {
227 let to_validation_task = self.to_validation_task.clone();
228 let validator = self.validator.clone();
229 let fut = Box::pin(async move {
230 let res = validator.validate_transaction(origin, transaction).await;
231 let _ = tx.send(res);
232 });
233 let to_validation_task = to_validation_task.lock().await;
234 to_validation_task.send(fut).await
235 };
236 if res.is_err() {
237 return TransactionValidationOutcome::Error(
238 hash,
239 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
240 );
241 }
242 }
243
244 match rx.await {
245 Ok(res) => res,
246 Err(_) => TransactionValidationOutcome::Error(
247 hash,
248 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
249 ),
250 }
251 }
252
253 async fn validate_transactions(
254 &self,
255 transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction), IntoIter: Send>
256 + Send,
257 ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
258 let transactions: Vec<_> = transactions.into_iter().collect();
259 let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
260 let (tx, rx) = oneshot::channel();
261 {
262 let res = {
263 let to_validation_task = self.to_validation_task.clone();
264 let validator = self.validator.clone();
265 let fut = Box::pin(async move {
266 let res = validator.validate_transactions(transactions).await;
267 let _ = tx.send(res);
268 });
269 let to_validation_task = to_validation_task.lock().await;
270 to_validation_task.send(fut).await
271 };
272 if res.is_err() {
273 return hashes
274 .into_iter()
275 .map(|hash| {
276 TransactionValidationOutcome::Error(
277 hash,
278 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
279 )
280 })
281 .collect();
282 }
283 }
284 match rx.await {
285 Ok(res) => res,
286 Err(_) => hashes
287 .into_iter()
288 .map(|hash| {
289 TransactionValidationOutcome::Error(
290 hash,
291 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
292 )
293 })
294 .collect(),
295 }
296 }
297
298 fn on_new_head_block(&self, new_tip_block: &SealedBlock<Self::Block>) {
299 self.validator.on_new_head_block(new_tip_block)
300 }
301}
302
303#[cfg(test)]
304mod tests {
305 use super::*;
306 use crate::{
307 test_utils::MockTransaction,
308 validate::{TransactionValidationOutcome, ValidTransaction},
309 TransactionOrigin,
310 };
311 use alloy_primitives::{Address, U256};
312
313 #[derive(Debug)]
314 struct NoopValidator;
315
316 impl TransactionValidator for NoopValidator {
317 type Transaction = MockTransaction;
318 type Block = reth_ethereum_primitives::Block;
319
320 async fn validate_transaction(
321 &self,
322 _origin: TransactionOrigin,
323 transaction: Self::Transaction,
324 ) -> TransactionValidationOutcome<Self::Transaction> {
325 TransactionValidationOutcome::Valid {
326 balance: U256::ZERO,
327 state_nonce: 0,
328 bytecode_hash: None,
329 transaction: ValidTransaction::Valid(transaction),
330 propagate: false,
331 authorities: Some(Vec::<Address>::new()),
332 }
333 }
334 }
335
336 #[tokio::test]
337 async fn executor_new_spawns_and_validates_single() {
338 let validator = NoopValidator;
339 let (executor, task) = TransactionValidationTaskExecutor::new(validator);
340 tokio::spawn(task.run());
341 let tx = MockTransaction::legacy();
342 let out = executor.validate_transaction(TransactionOrigin::External, tx).await;
343 assert!(matches!(out, TransactionValidationOutcome::Valid { .. }));
344 }
345
346 #[tokio::test]
347 async fn executor_new_spawns_and_validates_batch() {
348 let validator = NoopValidator;
349 let (executor, task) = TransactionValidationTaskExecutor::new(validator);
350 tokio::spawn(task.run());
351 let txs = vec![
352 (TransactionOrigin::External, MockTransaction::legacy()),
353 (TransactionOrigin::Local, MockTransaction::legacy()),
354 ];
355 let out = executor.validate_transactions(txs).await;
356 assert_eq!(out.len(), 2);
357 assert!(out.iter().all(|o| matches!(o, TransactionValidationOutcome::Valid { .. })));
358 }
359}