reth_transaction_pool/validate/
task.rs1use crate::{
4 blobstore::BlobStore,
5 metrics::TxPoolValidatorMetrics,
6 validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
7 EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
8 TransactionValidator,
9};
10use futures_util::{lock::Mutex, StreamExt};
11use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
12use reth_evm::ConfigureEvm;
13use reth_primitives_traits::{HeaderTy, SealedBlock};
14use reth_storage_api::BlockReaderIdExt;
15use reth_tasks::Runtime;
16use std::{future::Future, pin::Pin, sync::Arc};
17use tokio::{
18 sync,
19 sync::{mpsc, oneshot},
20};
21use tokio_stream::wrappers::ReceiverStream;
22
23type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
25
26type ValidationStream = ReceiverStream<ValidationFuture>;
28
29#[derive(Clone)]
35pub struct ValidationTask {
36 validation_jobs: Arc<Mutex<ValidationStream>>,
37}
38
39impl ValidationTask {
40 pub fn new() -> (ValidationJobSender, Self) {
44 Self::with_capacity(1)
45 }
46
47 pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
49 let (tx, rx) = mpsc::channel(capacity);
50 let metrics = TxPoolValidatorMetrics::default();
51 (ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
52 }
53
54 pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
56 Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
57 }
58
59 pub async fn run(self) {
63 while let Some(task) = self.validation_jobs.lock().await.next().await {
64 task.await;
65 }
66 }
67}
68
69impl std::fmt::Debug for ValidationTask {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
72 }
73}
74
75#[derive(Debug)]
77pub struct ValidationJobSender {
78 tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
79 metrics: TxPoolValidatorMetrics,
80}
81
82impl ValidationJobSender {
83 pub async fn send(
85 &self,
86 job: Pin<Box<dyn Future<Output = ()> + Send>>,
87 ) -> Result<(), TransactionValidatorError> {
88 self.metrics.inflight_validation_jobs.increment(1);
89 let res = self
90 .tx
91 .send(job)
92 .await
93 .map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
94 self.metrics.inflight_validation_jobs.decrement(1);
95 res
96 }
97}
98
99#[derive(Debug)]
102pub struct TransactionValidationTaskExecutor<V> {
103 pub validator: Arc<V>,
105 pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
107}
108
109impl<V> Clone for TransactionValidationTaskExecutor<V> {
110 fn clone(&self) -> Self {
111 Self {
112 validator: self.validator.clone(),
113 to_validation_task: self.to_validation_task.clone(),
114 }
115 }
116}
117
118impl TransactionValidationTaskExecutor<()> {
121 pub fn eth_builder<Client, Evm>(
123 client: Client,
124 evm_config: Evm,
125 ) -> EthTransactionValidatorBuilder<Client, Evm>
126 where
127 Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
128 + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
129 Evm: ConfigureEvm,
130 {
131 EthTransactionValidatorBuilder::new(client, evm_config)
132 }
133}
134
135impl<V> TransactionValidationTaskExecutor<V> {
136 pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
138 where
139 F: FnMut(V) -> T,
140 {
141 TransactionValidationTaskExecutor {
142 validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
143 to_validation_task: self.to_validation_task,
144 }
145 }
146
147 pub fn validator(&self) -> &V {
149 &self.validator
150 }
151}
152
153impl<Client, Tx, Evm> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx, Evm>> {
154 pub fn eth<S: BlobStore>(client: Client, evm_config: Evm, blob_store: S, tasks: Runtime) -> Self
159 where
160 Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
161 + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
162 Evm: ConfigureEvm,
163 {
164 Self::eth_with_additional_tasks(client, evm_config, blob_store, tasks, 0)
165 }
166
167 pub fn eth_with_additional_tasks<S: BlobStore>(
177 client: Client,
178 evm_config: Evm,
179 blob_store: S,
180 tasks: Runtime,
181 num_additional_tasks: usize,
182 ) -> Self
183 where
184 Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
185 + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>,
186 Evm: ConfigureEvm,
187 {
188 EthTransactionValidatorBuilder::new(client, evm_config)
189 .with_additional_tasks(num_additional_tasks)
190 .build_with_tasks(tasks, blob_store)
191 }
192}
193
194impl<V> TransactionValidationTaskExecutor<V> {
195 pub fn new(validator: V) -> (Self, ValidationTask) {
200 let (tx, task) = ValidationTask::new();
201 (
202 Self {
203 validator: Arc::new(validator),
204 to_validation_task: Arc::new(sync::Mutex::new(tx)),
205 },
206 task,
207 )
208 }
209
210 pub fn spawn(validator: V, tasks: &Runtime, additional_tasks: usize) -> Self {
215 let (tx, task) = ValidationTask::new();
216
217 for _ in 0..additional_tasks {
218 let task = task.clone();
219 tasks.spawn_blocking_task(async move {
220 task.run().await;
221 });
222 }
223
224 tasks.spawn_critical_blocking_task("transaction-validation-service", async move {
225 task.run().await;
226 });
227
228 Self { validator: Arc::new(validator), to_validation_task: Arc::new(sync::Mutex::new(tx)) }
229 }
230}
231
232impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
233where
234 V: TransactionValidator + 'static,
235{
236 type Transaction = <V as TransactionValidator>::Transaction;
237 type Block = V::Block;
238
239 async fn validate_transaction(
240 &self,
241 origin: TransactionOrigin,
242 transaction: Self::Transaction,
243 ) -> TransactionValidationOutcome<Self::Transaction> {
244 let hash = *transaction.hash();
245 let (tx, rx) = oneshot::channel();
246 {
247 let res = {
248 let to_validation_task = self.to_validation_task.clone();
249 let validator = self.validator.clone();
250 let fut = Box::pin(async move {
251 let res = validator.validate_transaction(origin, transaction).await;
252 let _ = tx.send(res);
253 });
254 let to_validation_task = to_validation_task.lock().await;
255 to_validation_task.send(fut).await
256 };
257 if res.is_err() {
258 return TransactionValidationOutcome::Error(
259 hash,
260 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
261 );
262 }
263 }
264
265 match rx.await {
266 Ok(res) => res,
267 Err(_) => TransactionValidationOutcome::Error(
268 hash,
269 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
270 ),
271 }
272 }
273
274 async fn validate_transactions(
275 &self,
276 transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction), IntoIter: Send>
277 + Send,
278 ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
279 let transactions: Vec<_> = transactions.into_iter().collect();
280 let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
281 let (tx, rx) = oneshot::channel();
282 {
283 let res = {
284 let to_validation_task = self.to_validation_task.clone();
285 let validator = self.validator.clone();
286 let fut = Box::pin(async move {
287 let res = validator.validate_transactions(transactions).await;
288 let _ = tx.send(res);
289 });
290 let to_validation_task = to_validation_task.lock().await;
291 to_validation_task.send(fut).await
292 };
293 if res.is_err() {
294 return hashes
295 .into_iter()
296 .map(|hash| {
297 TransactionValidationOutcome::Error(
298 hash,
299 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
300 )
301 })
302 .collect();
303 }
304 }
305 match rx.await {
306 Ok(res) => res,
307 Err(_) => hashes
308 .into_iter()
309 .map(|hash| {
310 TransactionValidationOutcome::Error(
311 hash,
312 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
313 )
314 })
315 .collect(),
316 }
317 }
318
319 fn on_new_head_block(&self, new_tip_block: &SealedBlock<Self::Block>) {
320 self.validator.on_new_head_block(new_tip_block)
321 }
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327 use crate::{
328 test_utils::MockTransaction,
329 validate::{TransactionValidationOutcome, ValidTransaction},
330 TransactionOrigin,
331 };
332 use alloy_primitives::{Address, U256};
333
334 #[derive(Debug)]
335 struct NoopValidator;
336
337 impl TransactionValidator for NoopValidator {
338 type Transaction = MockTransaction;
339 type Block = reth_ethereum_primitives::Block;
340
341 async fn validate_transaction(
342 &self,
343 _origin: TransactionOrigin,
344 transaction: Self::Transaction,
345 ) -> TransactionValidationOutcome<Self::Transaction> {
346 TransactionValidationOutcome::Valid {
347 balance: U256::ZERO,
348 state_nonce: 0,
349 bytecode_hash: None,
350 transaction: ValidTransaction::Valid(transaction),
351 propagate: false,
352 authorities: Some(Vec::<Address>::new()),
353 }
354 }
355 }
356
357 #[tokio::test]
358 async fn executor_new_spawns_and_validates_single() {
359 let validator = NoopValidator;
360 let (executor, task) = TransactionValidationTaskExecutor::new(validator);
361 tokio::spawn(task.run());
362 let tx = MockTransaction::legacy();
363 let out = executor.validate_transaction(TransactionOrigin::External, tx).await;
364 assert!(matches!(out, TransactionValidationOutcome::Valid { .. }));
365 }
366
367 #[tokio::test]
368 async fn executor_new_spawns_and_validates_batch() {
369 let validator = NoopValidator;
370 let (executor, task) = TransactionValidationTaskExecutor::new(validator);
371 tokio::spawn(task.run());
372 let txs = vec![
373 (TransactionOrigin::External, MockTransaction::legacy()),
374 (TransactionOrigin::Local, MockTransaction::legacy()),
375 ];
376 let out = executor.validate_transactions(txs).await;
377 assert_eq!(out.len(), 2);
378 assert!(out.iter().all(|o| matches!(o, TransactionValidationOutcome::Valid { .. })));
379 }
380}