Skip to main content

reth_engine_tree/tree/payload_processor/bal/
worker.rs

1use super::BalExecutionError;
2use alloy_consensus::Transaction;
3use alloy_evm::{
4    block::{BlockExecutionError, BlockExecutor, BlockExecutorFactory},
5    Evm,
6};
7use alloy_primitives::Address;
8use crossbeam_channel::{Receiver, Sender};
9use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Database, EvmEnvFor, ExecutionCtxFor};
10use revm::database::State;
11use revm_state::bal::Bal as RevmBal;
12use std::sync::Arc;
13
14pub(super) struct BalWorkerOutput<R> {
15    pub(super) index: usize,
16    pub(super) signer: Address,
17    pub(super) tx_gas_limit: u64,
18    pub(super) result: R,
19}
20
21type WorkerExecutorResult<Cfg> =
22    <<Cfg as ConfigureEvm>::BlockExecutorFactory as BlockExecutorFactory>::TxExecutionResult;
23
24type WorkerResultSender<Cfg> =
25    Sender<Result<BalWorkerOutput<WorkerExecutorResult<Cfg>>, BalExecutionError>>;
26
27#[expect(clippy::too_many_arguments)]
28pub(super) fn spawn_worker<'scope, Evm, Tx, Err, DB, MakeDb>(
29    scope: &rayon::Scope<'scope>,
30    tx_rx: Receiver<(usize, Result<Tx, Err>)>,
31    abort_rx: Receiver<()>,
32    result_tx: WorkerResultSender<Evm>,
33    evm_config: &'scope Evm,
34    make_db: &'scope MakeDb,
35    received_bal_revm: Arc<RevmBal>,
36    evm_env: EvmEnvFor<Evm>,
37    ctx: ExecutionCtxFor<'scope, Evm>,
38) where
39    Evm: ConfigureEvm + 'scope,
40    Tx: ExecutableTxFor<Evm> + Send + 'scope,
41    Err: core::error::Error + Send + Sync + 'static,
42    DB: Database + Send + 'scope,
43    MakeDb: Fn() -> Result<DB, BalExecutionError> + Sync + 'scope,
44{
45    scope.spawn(move |_| {
46        let worker_result = (|| -> Result<(), BalExecutionError> {
47            let mut worker_state = State::builder()
48                .with_database(make_db()?)
49                .with_bal(received_bal_revm)
50                .with_bundle_update()
51                .build();
52            let evm = evm_config.evm_with_env(&mut worker_state, evm_env);
53            let mut executor = evm_config.create_executor_with_state(evm, ctx.clone());
54
55            loop {
56                let (index, tx) = crossbeam_channel::select_biased! {
57                    recv(abort_rx) -> _ => break,
58                    recv(tx_rx) -> msg => match msg {
59                        Ok(ix_tx) => ix_tx,
60                        Err(_) => break,
61                    },
62                };
63                let tx = tx.map_err(|e| BalExecutionError::Evm(BlockExecutionError::other(e)))?;
64                let signer = *tx.signer();
65                let tx_gas_limit = tx.tx().gas_limit();
66
67                executor.evm_mut().db_mut().set_bal_index(index as u64 + 1);
68                let result = executor
69                    .execute_transaction_without_commit(tx)
70                    .map_err(BalExecutionError::Evm)?;
71
72                if result_tx
73                    .send(Ok(BalWorkerOutput { index, signer, tx_gas_limit, result }))
74                    .is_err()
75                {
76                    break;
77                }
78            }
79
80            Ok(())
81        })();
82
83        if let Err(err) = worker_result {
84            let _ = result_tx.send(Err(err));
85        }
86    });
87}