reth_engine_tree/tree/payload_processor/bal/
worker.rs1use super::BalExecutionError;
2use alloy_consensus::Transaction;
3use alloy_evm::{
4 block::{BlockExecutionError, BlockExecutor, BlockExecutorFactory},
5 Evm,
6};
7use alloy_primitives::Address;
8use crossbeam_channel::{Receiver, Sender};
9use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Database, EvmEnvFor, ExecutionCtxFor};
10use revm::database::State;
11use revm_state::bal::Bal as RevmBal;
12use std::sync::Arc;
13
14pub(super) struct BalWorkerOutput<R> {
15 pub(super) index: usize,
16 pub(super) signer: Address,
17 pub(super) tx_gas_limit: u64,
18 pub(super) result: R,
19}
20
21type WorkerExecutorResult<Cfg> =
22 <<Cfg as ConfigureEvm>::BlockExecutorFactory as BlockExecutorFactory>::TxExecutionResult;
23
24type WorkerResultSender<Cfg> =
25 Sender<Result<BalWorkerOutput<WorkerExecutorResult<Cfg>>, BalExecutionError>>;
26
27#[expect(clippy::too_many_arguments)]
28pub(super) fn spawn_worker<'scope, Evm, Tx, Err, DB, MakeDb>(
29 scope: &rayon::Scope<'scope>,
30 tx_rx: Receiver<(usize, Result<Tx, Err>)>,
31 abort_rx: Receiver<()>,
32 result_tx: WorkerResultSender<Evm>,
33 evm_config: &'scope Evm,
34 make_db: &'scope MakeDb,
35 received_bal_revm: Arc<RevmBal>,
36 evm_env: EvmEnvFor<Evm>,
37 ctx: ExecutionCtxFor<'scope, Evm>,
38) where
39 Evm: ConfigureEvm + 'scope,
40 Tx: ExecutableTxFor<Evm> + Send + 'scope,
41 Err: core::error::Error + Send + Sync + 'static,
42 DB: Database + Send + 'scope,
43 MakeDb: Fn() -> Result<DB, BalExecutionError> + Sync + 'scope,
44{
45 scope.spawn(move |_| {
46 let worker_result = (|| -> Result<(), BalExecutionError> {
47 let mut worker_state = State::builder()
48 .with_database(make_db()?)
49 .with_bal(received_bal_revm)
50 .with_bundle_update()
51 .build();
52 let evm = evm_config.evm_with_env(&mut worker_state, evm_env);
53 let mut executor = evm_config.create_executor_with_state(evm, ctx.clone());
54
55 loop {
56 let (index, tx) = crossbeam_channel::select_biased! {
57 recv(abort_rx) -> _ => break,
58 recv(tx_rx) -> msg => match msg {
59 Ok(ix_tx) => ix_tx,
60 Err(_) => break,
61 },
62 };
63 let tx = tx.map_err(|e| BalExecutionError::Evm(BlockExecutionError::other(e)))?;
64 let signer = *tx.signer();
65 let tx_gas_limit = tx.tx().gas_limit();
66
67 executor.evm_mut().db_mut().set_bal_index(index as u64 + 1);
68 let result = executor
69 .execute_transaction_without_commit(tx)
70 .map_err(BalExecutionError::Evm)?;
71
72 if result_tx
73 .send(Ok(BalWorkerOutput { index, signer, tx_gas_limit, result }))
74 .is_err()
75 {
76 break;
77 }
78 }
79
80 Ok(())
81 })();
82
83 if let Err(err) = worker_result {
84 let _ = result_tx.send(Err(err));
85 }
86 });
87}