// reth_engine_tree/tree/payload_processor/bal/worker.rs

1use super::BalExecutionError;
2use alloy_consensus::Transaction;
3use alloy_eip7928::BlockAccessIndex;
4use alloy_evm::{
5    block::{BlockExecutionError, BlockExecutor, BlockExecutorFactory},
6    Evm,
7};
8use alloy_primitives::Address;
9use crossbeam_channel::{Receiver, Sender};
10use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Database, EvmEnvFor, ExecutionCtxFor};
11use revm::database::State;
12use revm_state::bal::Bal as RevmBal;
13use std::sync::Arc;
14
15#[derive(Debug, thiserror::Error)]
16pub(super) enum BalWorkerError {
17    /// Worker state or provider setup failed.
18    #[error("BAL worker setup failed: {0}")]
19    Setup(#[source] BalExecutionError),
20    /// Transaction recovery or conversion failed before EVM execution.
21    #[error("BAL worker transaction conversion failed: {0}")]
22    Transaction(Box<dyn core::error::Error + Send + Sync + 'static>),
23    /// EVM transaction execution failed.
24    #[error("BAL worker EVM execution failed: {0}")]
25    Execution(BlockExecutionError),
26}
27
28impl From<BalWorkerError> for BalExecutionError {
29    fn from(err: BalWorkerError) -> Self {
30        match err {
31            BalWorkerError::Setup(err) => err,
32            BalWorkerError::Transaction(err) => Self::Other(err),
33            BalWorkerError::Execution(err) => Self::Execution(err),
34        }
35    }
36}
37
38pub(super) struct BalWorkerOutput<R> {
39    pub(super) index: usize,
40    pub(super) signer: Address,
41    pub(super) tx_gas_limit: u64,
42    pub(super) result: R,
43}
44
45type WorkerExecutorResult<Cfg> =
46    <<Cfg as ConfigureEvm>::BlockExecutorFactory as BlockExecutorFactory>::TxExecutionResult;
47
48type WorkerResultSender<Cfg> =
49    Sender<Result<BalWorkerOutput<WorkerExecutorResult<Cfg>>, BalWorkerError>>;
50
51#[expect(clippy::too_many_arguments)]
52pub(super) fn spawn_worker<'scope, Evm, Tx, Err, DB, MakeDb>(
53    scope: &rayon::Scope<'scope>,
54    tx_rx: Receiver<(usize, Result<Tx, Err>)>,
55    abort_rx: Receiver<()>,
56    result_tx: WorkerResultSender<Evm>,
57    evm_config: &'scope Evm,
58    make_db: &'scope MakeDb,
59    received_bal_revm: Arc<RevmBal>,
60    evm_env: EvmEnvFor<Evm>,
61    ctx: ExecutionCtxFor<'scope, Evm>,
62) where
63    Evm: ConfigureEvm + 'scope,
64    Tx: ExecutableTxFor<Evm> + Send + 'scope,
65    Err: core::error::Error + Send + Sync + 'static,
66    DB: Database + Send + 'scope,
67    MakeDb: Fn(bool) -> Result<DB, BalExecutionError> + Sync + 'scope,
68{
69    scope.spawn(move |_| {
70        let worker_result = (|| -> Result<(), BalWorkerError> {
71            // Create a database with fill_on_miss=true ensuring misses
72            // are inserted for the other workers.
73            let database = make_db(true).map_err(BalWorkerError::Setup)?;
74            let mut worker_state = State::builder()
75                .with_database(database)
76                .with_bal(received_bal_revm)
77                .with_bundle_update()
78                .build();
79            let evm = evm_config.evm_with_env(&mut worker_state, evm_env);
80            let mut executor = evm_config.create_executor_with_state(evm, ctx.clone());
81
82            loop {
83                let (index, tx) = crossbeam_channel::select_biased! {
84                    recv(abort_rx) -> _ => break,
85                    recv(tx_rx) -> msg => match msg {
86                        Ok(ix_tx) => ix_tx,
87                        Err(_) => break,
88                    },
89                };
90                let tx = tx.map_err(|e| BalWorkerError::Transaction(Box::new(e)))?;
91                let signer = *tx.signer();
92                let tx_gas_limit = tx.tx().gas_limit();
93
94                executor.evm_mut().db_mut().set_bal_index(BlockAccessIndex::new(index as u64 + 1));
95                let result = executor
96                    .execute_transaction_without_commit(tx)
97                    .map_err(BalWorkerError::Execution)?;
98
99                if result_tx
100                    .send(Ok(BalWorkerOutput { index, signer, tx_gas_limit, result }))
101                    .is_err()
102                {
103                    break;
104                }
105            }
106
107            Ok(())
108        })();
109
110        if let Err(err) = worker_result {
111            let _ = result_tx.send(Err(err));
112        }
113    });
114}