reth_transaction_pool/validate/
task.rs
1use crate::{
4 blobstore::BlobStore,
5 validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
6 EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
7 TransactionValidator,
8};
9use futures_util::{lock::Mutex, StreamExt};
10use reth_primitives_traits::{Block, SealedBlock};
11use reth_tasks::TaskSpawner;
12use std::{future::Future, pin::Pin, sync::Arc};
13use tokio::{
14 sync,
15 sync::{mpsc, oneshot},
16};
17use tokio_stream::wrappers::ReceiverStream;
18
19type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
21
22type ValidationStream = ReceiverStream<ValidationFuture>;
24
25#[derive(Clone)]
31pub struct ValidationTask {
32 validation_jobs: Arc<Mutex<ValidationStream>>,
33}
34
35impl ValidationTask {
36 pub fn new() -> (ValidationJobSender, Self) {
38 let (tx, rx) = mpsc::channel(1);
39 (ValidationJobSender { tx }, Self::with_receiver(rx))
40 }
41
42 pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
44 Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
45 }
46
47 pub async fn run(self) {
51 while let Some(task) = self.validation_jobs.lock().await.next().await {
52 task.await;
53 }
54 }
55}
56
57impl std::fmt::Debug for ValidationTask {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
60 }
61}
62
63#[derive(Debug)]
65pub struct ValidationJobSender {
66 tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
67}
68
69impl ValidationJobSender {
70 pub async fn send(
72 &self,
73 job: Pin<Box<dyn Future<Output = ()> + Send>>,
74 ) -> Result<(), TransactionValidatorError> {
75 self.tx.send(job).await.map_err(|_| TransactionValidatorError::ValidationServiceUnreachable)
76 }
77}
78
79#[derive(Debug, Clone)]
82pub struct TransactionValidationTaskExecutor<V> {
83 pub validator: V,
85 pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
87}
88
89impl TransactionValidationTaskExecutor<()> {
92 pub fn eth_builder<Client>(client: Client) -> EthTransactionValidatorBuilder<Client> {
94 EthTransactionValidatorBuilder::new(client)
95 }
96}
97
98impl<V> TransactionValidationTaskExecutor<V> {
99 pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
101 where
102 F: FnMut(V) -> T,
103 {
104 TransactionValidationTaskExecutor {
105 validator: f(self.validator),
106 to_validation_task: self.to_validation_task,
107 }
108 }
109}
110
111impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
112 pub fn eth<T, S: BlobStore>(client: Client, blob_store: S, tasks: T) -> Self
117 where
118 T: TaskSpawner,
119 {
120 Self::eth_with_additional_tasks(client, blob_store, tasks, 0)
121 }
122
123 pub fn eth_with_additional_tasks<T, S: BlobStore>(
133 client: Client,
134 blob_store: S,
135 tasks: T,
136 num_additional_tasks: usize,
137 ) -> Self
138 where
139 T: TaskSpawner,
140 {
141 EthTransactionValidatorBuilder::new(client)
142 .with_additional_tasks(num_additional_tasks)
143 .build_with_tasks::<Tx, T, S>(tasks, blob_store)
144 }
145}
146
147impl<V> TransactionValidationTaskExecutor<V> {
148 pub fn new(validator: V) -> Self {
153 let (tx, _) = ValidationTask::new();
154 Self { validator, to_validation_task: Arc::new(sync::Mutex::new(tx)) }
155 }
156}
157
158impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
159where
160 V: TransactionValidator + Clone + 'static,
161{
162 type Transaction = <V as TransactionValidator>::Transaction;
163
164 async fn validate_transaction(
165 &self,
166 origin: TransactionOrigin,
167 transaction: Self::Transaction,
168 ) -> TransactionValidationOutcome<Self::Transaction> {
169 let hash = *transaction.hash();
170 let (tx, rx) = oneshot::channel();
171 {
172 let res = {
173 let to_validation_task = self.to_validation_task.clone();
174 let to_validation_task = to_validation_task.lock().await;
175 let validator = self.validator.clone();
176 to_validation_task
177 .send(Box::pin(async move {
178 let res = validator.validate_transaction(origin, transaction).await;
179 let _ = tx.send(res);
180 }))
181 .await
182 };
183 if res.is_err() {
184 return TransactionValidationOutcome::Error(
185 hash,
186 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
187 )
188 }
189 }
190
191 match rx.await {
192 Ok(res) => res,
193 Err(_) => TransactionValidationOutcome::Error(
194 hash,
195 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
196 ),
197 }
198 }
199
200 fn on_new_head_block<B>(&self, new_tip_block: &SealedBlock<B>)
201 where
202 B: Block,
203 {
204 self.validator.on_new_head_block(new_tip_block)
205 }
206}