reth_transaction_pool/validate/
task.rsuse crate::{
blobstore::BlobStore,
validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
TransactionValidator,
};
use futures_util::{lock::Mutex, StreamExt};
use reth_chainspec::ChainSpec;
use reth_primitives::SealedBlock;
use reth_tasks::TaskSpawner;
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
sync,
sync::{mpsc, oneshot},
};
use tokio_stream::wrappers::ReceiverStream;
type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
type ValidationStream = ReceiverStream<ValidationFuture>;
#[derive(Clone)]
pub struct ValidationTask {
validation_jobs: Arc<Mutex<ValidationStream>>,
}
impl ValidationTask {
pub fn new() -> (ValidationJobSender, Self) {
let (tx, rx) = mpsc::channel(1);
(ValidationJobSender { tx }, Self::with_receiver(rx))
}
pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
}
pub async fn run(self) {
while let Some(task) = self.validation_jobs.lock().await.next().await {
task.await;
}
}
}
impl std::fmt::Debug for ValidationTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
}
}
#[derive(Debug)]
pub struct ValidationJobSender {
tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
impl ValidationJobSender {
pub async fn send(
&self,
job: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> Result<(), TransactionValidatorError> {
self.tx.send(job).await.map_err(|_| TransactionValidatorError::ValidationServiceUnreachable)
}
}
#[derive(Debug, Clone)]
pub struct TransactionValidationTaskExecutor<V> {
pub validator: V,
pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
}
impl TransactionValidationTaskExecutor<()> {
pub fn eth_builder(chain_spec: Arc<ChainSpec>) -> EthTransactionValidatorBuilder {
EthTransactionValidatorBuilder::new(chain_spec)
}
}
impl<V> TransactionValidationTaskExecutor<V> {
pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
where
F: FnMut(V) -> T,
{
TransactionValidationTaskExecutor {
validator: f(self.validator),
to_validation_task: self.to_validation_task,
}
}
}
impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
pub fn eth<T, S: BlobStore>(
client: Client,
chain_spec: Arc<ChainSpec>,
blob_store: S,
tasks: T,
) -> Self
where
T: TaskSpawner,
{
Self::eth_with_additional_tasks(client, chain_spec, blob_store, tasks, 0)
}
pub fn eth_with_additional_tasks<T, S: BlobStore>(
client: Client,
chain_spec: Arc<ChainSpec>,
blob_store: S,
tasks: T,
num_additional_tasks: usize,
) -> Self
where
T: TaskSpawner,
{
EthTransactionValidatorBuilder::new(chain_spec)
.with_additional_tasks(num_additional_tasks)
.build_with_tasks::<Client, Tx, T, S>(client, tasks, blob_store)
}
}
impl<V> TransactionValidationTaskExecutor<V> {
pub fn new(validator: V) -> Self {
let (tx, _) = ValidationTask::new();
Self { validator, to_validation_task: Arc::new(sync::Mutex::new(tx)) }
}
}
impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
where
V: TransactionValidator + Clone + 'static,
{
type Transaction = <V as TransactionValidator>::Transaction;
async fn validate_transaction(
&self,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> TransactionValidationOutcome<Self::Transaction> {
let hash = *transaction.hash();
let (tx, rx) = oneshot::channel();
{
let res = {
let to_validation_task = self.to_validation_task.clone();
let to_validation_task = to_validation_task.lock().await;
let validator = self.validator.clone();
to_validation_task
.send(Box::pin(async move {
let res = validator.validate_transaction(origin, transaction).await;
let _ = tx.send(res);
}))
.await
};
if res.is_err() {
return TransactionValidationOutcome::Error(
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
)
}
}
match rx.await {
Ok(res) => res,
Err(_) => TransactionValidationOutcome::Error(
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
),
}
}
fn on_new_head_block(&self, new_tip_block: &SealedBlock) {
self.validator.on_new_head_block(new_tip_block)
}
}