1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
//! A validation service for transactions.

use crate::{
    blobstore::BlobStore,
    validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
    EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
    TransactionValidator,
};
use futures_util::{lock::Mutex, StreamExt};
use reth_chainspec::ChainSpec;
use reth_primitives::SealedBlock;
use reth_provider::BlockReaderIdExt;
use reth_tasks::TaskSpawner;
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
    sync,
    sync::{mpsc, oneshot},
};
use tokio_stream::wrappers::ReceiverStream;

/// Represents a future outputting unit type and is sendable.
type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Represents a stream of validation futures.
type ValidationStream = ReceiverStream<ValidationFuture>;

/// A service that performs validation jobs.
///
/// This listens for incoming validation jobs and executes them.
///
/// This should be spawned as a task: [`ValidationTask::run`]
#[derive(Clone)]
pub struct ValidationTask {
    validation_jobs: Arc<Mutex<ValidationStream>>,
}

impl ValidationTask {
    /// Creates a new clonable task pair
    pub fn new() -> (ValidationJobSender, Self) {
        let (tx, rx) = mpsc::channel(1);
        (ValidationJobSender { tx }, Self::with_receiver(rx))
    }

    /// Creates a new task with the given receiver.
    pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
        Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
    }

    /// Executes all new validation jobs that come in.
    ///
    /// This will run as long as the channel is alive and is expected to be spawned as a task.
    pub async fn run(self) {
        while let Some(task) = self.validation_jobs.lock().await.next().await {
            task.await;
        }
    }
}

impl std::fmt::Debug for ValidationTask {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
    }
}

/// A sender new type for sending validation jobs to [`ValidationTask`].
#[derive(Debug)]
pub struct ValidationJobSender {
    tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

impl ValidationJobSender {
    /// Sends the given job to the validation task.
    pub async fn send(
        &self,
        job: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Result<(), TransactionValidatorError> {
        self.tx.send(job).await.map_err(|_| TransactionValidatorError::ValidationServiceUnreachable)
    }
}

/// A [`TransactionValidator`] implementation that validates ethereum transaction.
///
/// This validator is non-blocking, all validation work is done in a separate task.
#[derive(Debug, Clone)]
pub struct TransactionValidationTaskExecutor<V> {
    /// The validator that will validate transactions on a separate task.
    pub validator: V,
    /// The sender half to validation tasks that perform the actual validation.
    pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
}

// === impl TransactionValidationTaskExecutor ===

impl TransactionValidationTaskExecutor<()> {
    /// Convenience method to create a [`EthTransactionValidatorBuilder`]
    pub fn eth_builder(chain_spec: Arc<ChainSpec>) -> EthTransactionValidatorBuilder {
        EthTransactionValidatorBuilder::new(chain_spec)
    }
}

impl<V> TransactionValidationTaskExecutor<V> {
    /// Maps the given validator to a new type.
    pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
    where
        F: FnMut(V) -> T,
    {
        TransactionValidationTaskExecutor {
            validator: f(self.validator),
            to_validation_task: self.to_validation_task,
        }
    }
}

impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>>
where
    Client: BlockReaderIdExt,
{
    /// Creates a new instance for the given [`ChainSpec`]
    ///
    /// This will spawn a single validation tasks that performs the actual validation.
    /// See [`TransactionValidationTaskExecutor::eth_with_additional_tasks`]
    pub fn eth<T, S: BlobStore>(
        client: Client,
        chain_spec: Arc<ChainSpec>,
        blob_store: S,
        tasks: T,
    ) -> Self
    where
        T: TaskSpawner,
    {
        Self::eth_with_additional_tasks(client, chain_spec, blob_store, tasks, 0)
    }

    /// Creates a new instance for the given [`ChainSpec`]
    ///
    /// By default this will enable support for:
    ///   - shanghai
    ///   - eip1559
    ///   - eip2930
    ///
    /// This will always spawn a validation task that performs the actual validation. It will spawn
    /// `num_additional_tasks` additional tasks.
    pub fn eth_with_additional_tasks<T, S: BlobStore>(
        client: Client,
        chain_spec: Arc<ChainSpec>,
        blob_store: S,
        tasks: T,
        num_additional_tasks: usize,
    ) -> Self
    where
        T: TaskSpawner,
    {
        EthTransactionValidatorBuilder::new(chain_spec)
            .with_additional_tasks(num_additional_tasks)
            .build_with_tasks::<Client, Tx, T, S>(client, tasks, blob_store)
    }
}

impl<V> TransactionValidationTaskExecutor<V> {
    /// Creates a new executor instance with the given validator for transaction validation.
    ///
    /// Initializes the executor with the provided validator and sets up communication for
    /// validation tasks.
    pub fn new(validator: V) -> Self {
        let (tx, _) = ValidationTask::new();
        Self { validator, to_validation_task: Arc::new(sync::Mutex::new(tx)) }
    }
}

impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
where
    V: TransactionValidator + Clone + 'static,
{
    type Transaction = <V as TransactionValidator>::Transaction;

    async fn validate_transaction(
        &self,
        origin: TransactionOrigin,
        transaction: Self::Transaction,
    ) -> TransactionValidationOutcome<Self::Transaction> {
        let hash = *transaction.hash();
        let (tx, rx) = oneshot::channel();
        {
            let res = {
                let to_validation_task = self.to_validation_task.clone();
                let to_validation_task = to_validation_task.lock().await;
                let validator = self.validator.clone();
                to_validation_task
                    .send(Box::pin(async move {
                        let res = validator.validate_transaction(origin, transaction).await;
                        let _ = tx.send(res);
                    }))
                    .await
            };
            if res.is_err() {
                return TransactionValidationOutcome::Error(
                    hash,
                    Box::new(TransactionValidatorError::ValidationServiceUnreachable),
                )
            }
        }

        match rx.await {
            Ok(res) => res,
            Err(_) => TransactionValidationOutcome::Error(
                hash,
                Box::new(TransactionValidatorError::ValidationServiceUnreachable),
            ),
        }
    }

    fn on_new_head_block(&self, new_tip_block: &SealedBlock) {
        self.validator.on_new_head_block(new_tip_block)
    }
}