reth_transaction_pool/validate/
task.rs1use crate::{
4 blobstore::BlobStore,
5 metrics::TxPoolValidatorMetrics,
6 validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
7 EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
8 TransactionValidator,
9};
10use futures_util::{lock::Mutex, StreamExt};
11use reth_primitives_traits::SealedBlock;
12use reth_tasks::TaskSpawner;
13use std::{future::Future, pin::Pin, sync::Arc};
14use tokio::{
15 sync,
16 sync::{mpsc, oneshot},
17};
18use tokio_stream::wrappers::ReceiverStream;
19
20type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
22
23type ValidationStream = ReceiverStream<ValidationFuture>;
25
26#[derive(Clone)]
32pub struct ValidationTask {
33 validation_jobs: Arc<Mutex<ValidationStream>>,
34}
35
36impl ValidationTask {
37 pub fn new() -> (ValidationJobSender, Self) {
41 Self::with_capacity(1)
42 }
43
44 pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
46 let (tx, rx) = mpsc::channel(capacity);
47 let metrics = TxPoolValidatorMetrics::default();
48 (ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
49 }
50
51 pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
53 Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
54 }
55
56 pub async fn run(self) {
60 while let Some(task) = self.validation_jobs.lock().await.next().await {
61 task.await;
62 }
63 }
64}
65
66impl std::fmt::Debug for ValidationTask {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
69 }
70}
71
72#[derive(Debug)]
74pub struct ValidationJobSender {
75 tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
76 metrics: TxPoolValidatorMetrics,
77}
78
79impl ValidationJobSender {
80 pub async fn send(
82 &self,
83 job: Pin<Box<dyn Future<Output = ()> + Send>>,
84 ) -> Result<(), TransactionValidatorError> {
85 self.metrics.inflight_validation_jobs.increment(1);
86 let res = self
87 .tx
88 .send(job)
89 .await
90 .map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
91 self.metrics.inflight_validation_jobs.decrement(1);
92 res
93 }
94}
95
96#[derive(Debug)]
99pub struct TransactionValidationTaskExecutor<V> {
100 pub validator: Arc<V>,
102 pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
104}
105
106impl<V> Clone for TransactionValidationTaskExecutor<V> {
107 fn clone(&self) -> Self {
108 Self {
109 validator: self.validator.clone(),
110 to_validation_task: self.to_validation_task.clone(),
111 }
112 }
113}
114
115impl TransactionValidationTaskExecutor<()> {
118 pub fn eth_builder<Client>(client: Client) -> EthTransactionValidatorBuilder<Client> {
120 EthTransactionValidatorBuilder::new(client)
121 }
122}
123
124impl<V> TransactionValidationTaskExecutor<V> {
125 pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
127 where
128 F: FnMut(V) -> T,
129 {
130 TransactionValidationTaskExecutor {
131 validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
132 to_validation_task: self.to_validation_task,
133 }
134 }
135
136 pub fn validator(&self) -> &V {
138 &self.validator
139 }
140}
141
142impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
143 pub fn eth<T, S: BlobStore>(client: Client, blob_store: S, tasks: T) -> Self
148 where
149 T: TaskSpawner,
150 {
151 Self::eth_with_additional_tasks(client, blob_store, tasks, 0)
152 }
153
154 pub fn eth_with_additional_tasks<T, S: BlobStore>(
164 client: Client,
165 blob_store: S,
166 tasks: T,
167 num_additional_tasks: usize,
168 ) -> Self
169 where
170 T: TaskSpawner,
171 {
172 EthTransactionValidatorBuilder::new(client)
173 .with_additional_tasks(num_additional_tasks)
174 .build_with_tasks(tasks, blob_store)
175 }
176}
177
178impl<V> TransactionValidationTaskExecutor<V> {
179 pub fn new(validator: V) -> (Self, ValidationTask) {
184 let (tx, task) = ValidationTask::new();
185 (
186 Self {
187 validator: Arc::new(validator),
188 to_validation_task: Arc::new(sync::Mutex::new(tx)),
189 },
190 task,
191 )
192 }
193}
194
195impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
196where
197 V: TransactionValidator + 'static,
198{
199 type Transaction = <V as TransactionValidator>::Transaction;
200 type Block = V::Block;
201
202 async fn validate_transaction(
203 &self,
204 origin: TransactionOrigin,
205 transaction: Self::Transaction,
206 ) -> TransactionValidationOutcome<Self::Transaction> {
207 let hash = *transaction.hash();
208 let (tx, rx) = oneshot::channel();
209 {
210 let res = {
211 let to_validation_task = self.to_validation_task.clone();
212 let validator = self.validator.clone();
213 let fut = Box::pin(async move {
214 let res = validator.validate_transaction(origin, transaction).await;
215 let _ = tx.send(res);
216 });
217 let to_validation_task = to_validation_task.lock().await;
218 to_validation_task.send(fut).await
219 };
220 if res.is_err() {
221 return TransactionValidationOutcome::Error(
222 hash,
223 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
224 );
225 }
226 }
227
228 match rx.await {
229 Ok(res) => res,
230 Err(_) => TransactionValidationOutcome::Error(
231 hash,
232 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
233 ),
234 }
235 }
236
237 async fn validate_transactions(
238 &self,
239 transactions: Vec<(TransactionOrigin, Self::Transaction)>,
240 ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
241 let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
242 let (tx, rx) = oneshot::channel();
243 {
244 let res = {
245 let to_validation_task = self.to_validation_task.clone();
246 let validator = self.validator.clone();
247 let fut = Box::pin(async move {
248 let res = validator.validate_transactions(transactions).await;
249 let _ = tx.send(res);
250 });
251 let to_validation_task = to_validation_task.lock().await;
252 to_validation_task.send(fut).await
253 };
254 if res.is_err() {
255 return hashes
256 .into_iter()
257 .map(|hash| {
258 TransactionValidationOutcome::Error(
259 hash,
260 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
261 )
262 })
263 .collect();
264 }
265 }
266 match rx.await {
267 Ok(res) => res,
268 Err(_) => hashes
269 .into_iter()
270 .map(|hash| {
271 TransactionValidationOutcome::Error(
272 hash,
273 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
274 )
275 })
276 .collect(),
277 }
278 }
279
280 async fn validate_transactions_with_origin(
281 &self,
282 origin: TransactionOrigin,
283 transactions: impl IntoIterator<Item = Self::Transaction> + Send,
284 ) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
285 self.validate_transactions(transactions.into_iter().map(|tx| (origin, tx)).collect()).await
286 }
287
288 fn on_new_head_block(&self, new_tip_block: &SealedBlock<Self::Block>) {
289 self.validator.on_new_head_block(new_tip_block)
290 }
291}
292
293#[cfg(test)]
294mod tests {
295 use super::*;
296 use crate::{
297 test_utils::MockTransaction,
298 validate::{TransactionValidationOutcome, ValidTransaction},
299 TransactionOrigin,
300 };
301 use alloy_primitives::{Address, U256};
302
303 #[derive(Debug)]
304 struct NoopValidator;
305
306 impl TransactionValidator for NoopValidator {
307 type Transaction = MockTransaction;
308 type Block = reth_ethereum_primitives::Block;
309
310 async fn validate_transaction(
311 &self,
312 _origin: TransactionOrigin,
313 transaction: Self::Transaction,
314 ) -> TransactionValidationOutcome<Self::Transaction> {
315 TransactionValidationOutcome::Valid {
316 balance: U256::ZERO,
317 state_nonce: 0,
318 bytecode_hash: None,
319 transaction: ValidTransaction::Valid(transaction),
320 propagate: false,
321 authorities: Some(Vec::<Address>::new()),
322 }
323 }
324 }
325
326 #[tokio::test]
327 async fn executor_new_spawns_and_validates_single() {
328 let validator = NoopValidator;
329 let (executor, task) = TransactionValidationTaskExecutor::new(validator);
330 tokio::spawn(task.run());
331 let tx = MockTransaction::legacy();
332 let out = executor.validate_transaction(TransactionOrigin::External, tx).await;
333 assert!(matches!(out, TransactionValidationOutcome::Valid { .. }));
334 }
335
336 #[tokio::test]
337 async fn executor_new_spawns_and_validates_batch() {
338 let validator = NoopValidator;
339 let (executor, task) = TransactionValidationTaskExecutor::new(validator);
340 tokio::spawn(task.run());
341 let txs = vec![
342 (TransactionOrigin::External, MockTransaction::legacy()),
343 (TransactionOrigin::Local, MockTransaction::legacy()),
344 ];
345 let out = executor.validate_transactions(txs).await;
346 assert_eq!(out.len(), 2);
347 assert!(out.iter().all(|o| matches!(o, TransactionValidationOutcome::Valid { .. })));
348 }
349}