reth_engine_tree/tree/payload_processor/bal/
worker.rs1use super::BalExecutionError;
2use alloy_consensus::Transaction;
3use alloy_eip7928::BlockAccessIndex;
4use alloy_evm::{
5 block::{BlockExecutionError, BlockExecutor, BlockExecutorFactory},
6 Evm,
7};
8use alloy_primitives::Address;
9use crossbeam_channel::{Receiver, Sender};
10use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Database, EvmEnvFor, ExecutionCtxFor};
11use revm::database::State;
12use revm_state::bal::Bal as RevmBal;
13use std::sync::Arc;
14
15#[derive(Debug, thiserror::Error)]
16pub(super) enum BalWorkerError {
17 #[error("BAL worker setup failed: {0}")]
19 Setup(#[source] BalExecutionError),
20 #[error("BAL worker transaction conversion failed: {0}")]
22 Transaction(Box<dyn core::error::Error + Send + Sync + 'static>),
23 #[error("BAL worker EVM execution failed: {0}")]
25 Execution(BlockExecutionError),
26}
27
28impl From<BalWorkerError> for BalExecutionError {
29 fn from(err: BalWorkerError) -> Self {
30 match err {
31 BalWorkerError::Setup(err) => err,
32 BalWorkerError::Transaction(err) => Self::Other(err),
33 BalWorkerError::Execution(err) => Self::Execution(err),
34 }
35 }
36}
37
38pub(super) struct BalWorkerOutput<R> {
39 pub(super) index: usize,
40 pub(super) signer: Address,
41 pub(super) tx_gas_limit: u64,
42 pub(super) result: R,
43}
44
45type WorkerExecutorResult<Cfg> =
46 <<Cfg as ConfigureEvm>::BlockExecutorFactory as BlockExecutorFactory>::TxExecutionResult;
47
48type WorkerResultSender<Cfg> =
49 Sender<Result<BalWorkerOutput<WorkerExecutorResult<Cfg>>, BalWorkerError>>;
50
51#[expect(clippy::too_many_arguments)]
52pub(super) fn spawn_worker<'scope, Evm, Tx, Err, DB, MakeDb>(
53 scope: &rayon::Scope<'scope>,
54 tx_rx: Receiver<(usize, Result<Tx, Err>)>,
55 abort_rx: Receiver<()>,
56 result_tx: WorkerResultSender<Evm>,
57 evm_config: &'scope Evm,
58 make_db: &'scope MakeDb,
59 received_bal_revm: Arc<RevmBal>,
60 evm_env: EvmEnvFor<Evm>,
61 ctx: ExecutionCtxFor<'scope, Evm>,
62) where
63 Evm: ConfigureEvm + 'scope,
64 Tx: ExecutableTxFor<Evm> + Send + 'scope,
65 Err: core::error::Error + Send + Sync + 'static,
66 DB: Database + Send + 'scope,
67 MakeDb: Fn(bool) -> Result<DB, BalExecutionError> + Sync + 'scope,
68{
69 scope.spawn(move |_| {
70 let worker_result = (|| -> Result<(), BalWorkerError> {
71 let database = make_db(true).map_err(BalWorkerError::Setup)?;
74 let mut worker_state = State::builder()
75 .with_database(database)
76 .with_bal(received_bal_revm)
77 .with_bundle_update()
78 .build();
79 let evm = evm_config.evm_with_env(&mut worker_state, evm_env);
80 let mut executor = evm_config.create_executor_with_state(evm, ctx.clone());
81
82 loop {
83 let (index, tx) = crossbeam_channel::select_biased! {
84 recv(abort_rx) -> _ => break,
85 recv(tx_rx) -> msg => match msg {
86 Ok(ix_tx) => ix_tx,
87 Err(_) => break,
88 },
89 };
90 let tx = tx.map_err(|e| BalWorkerError::Transaction(Box::new(e)))?;
91 let signer = *tx.signer();
92 let tx_gas_limit = tx.tx().gas_limit();
93
94 executor.evm_mut().db_mut().set_bal_index(BlockAccessIndex::new(index as u64 + 1));
95 let result = executor
96 .execute_transaction_without_commit(tx)
97 .map_err(BalWorkerError::Execution)?;
98
99 if result_tx
100 .send(Ok(BalWorkerOutput { index, signer, tx_gas_limit, result }))
101 .is_err()
102 {
103 break;
104 }
105 }
106
107 Ok(())
108 })();
109
110 if let Err(err) = worker_result {
111 let _ = result_tx.send(Err(err));
112 }
113 });
114}