use alloy_consensus::{Header, Transaction};
use alloy_primitives::U256;
use alloy_rpc_types_engine::{
CancunPayloadFields, ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, PayloadStatus,
};
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
use itertools::Either;
use reth_engine_primitives::{
BeaconEngineMessage, BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes,
OnForkChoiceUpdated,
};
use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
use reth_ethereum_forks::EthereumHardforks;
use reth_evm::{
state_change::post_block_withdrawals_balance_increments, system_calls::SystemCaller,
ConfigureEvm,
};
use reth_payload_validator::ExecutionPayloadValidator;
use reth_primitives::{proofs, Block, BlockBody, Receipt, Receipts};
use reth_provider::{BlockReader, ExecutionOutcome, ProviderError, StateProviderFactory};
use reth_revm::{
database::StateProviderDatabase,
db::{states::bundle_state::BundleRetention, State},
DatabaseCommit,
};
use reth_rpc_types_compat::engine::payload::block_to_payload;
use reth_trie::HashedPostState;
use revm_primitives::{
calc_excess_blob_gas, BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg,
};
use std::{
collections::VecDeque,
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
use tracing::*;
/// Internal mode of the [`EngineReorg`] stream.
#[derive(Debug)]
enum EngineReorgState<Engine: EngineTypes> {
/// Messages from the wrapped stream are passed through (and inspected for a reorg
/// opportunity).
Forward,
/// A reorg has been set up: `queue` holds the messages that must be yielded, front to
/// back, before the stream goes back to [`EngineReorgState::Forward`].
Reorg { queue: VecDeque<BeaconEngineMessage<Engine>> },
}
/// Outcome of awaiting the response channel of an injected reorg message.
///
/// * `Ok(Either::Left(..))` carries the result of the injected new-payload message.
/// * `Ok(Either::Right(..))` carries the result of the injected forkchoice update.
/// * `Err(RecvError)` is the error produced by the oneshot receiver itself.
type EngineReorgResponse = Result<
Either<Result<PayloadStatus, BeaconOnNewPayloadError>, RethResult<OnForkChoiceUpdated>>,
oneshot::error::RecvError,
>;
/// Boxed, type-erased future resolving to an [`EngineReorgResponse`]; this is the element
/// type stored in `EngineReorg::reorg_responses`.
type ReorgResponseFut = Pin<Box<dyn Future<Output = EngineReorgResponse> + Send + Sync>>;
/// Stream wrapper around a stream of [`BeaconEngineMessage`]s that injects synthetic reorgs.
///
/// After more than `frequency` forkchoice updates have been forwarded, the next new-payload
/// message that builds on the last seen forkchoice head is followed by a generated competing
/// payload and a forkchoice update pointing at it.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EngineReorg<S, Engine: EngineTypes, Provider, Evm, Spec> {
/// The wrapped stream of engine messages.
#[pin]
stream: S,
/// Provider used to look up blocks and historical state when building the reorg block.
provider: Provider,
/// EVM configuration used to execute transactions of the reorg block.
evm_config: Evm,
/// Validator used to decode the incoming payload; also the source of the chain spec.
payload_validator: ExecutionPayloadValidator<Spec>,
/// A reorg is attempted only once `forkchoice_states_forwarded` is strictly greater than
/// this value.
frequency: usize,
/// Passed to `create_reorg_head`: how many blocks below the current head the reorg
/// target lies (0 targets the head itself).
depth: usize,
/// Number of forkchoice updates forwarded so far; reset to 0 once a reorg queue has been
/// fully drained.
forkchoice_states_forwarded: usize,
/// Current mode of the stream (forwarding or draining a reorg queue).
state: EngineReorgState<Engine>,
/// The most recent forkchoice state seen on the wrapped stream, if any.
last_forkchoice_state: Option<ForkchoiceState>,
/// Response futures for injected reorg messages that have not resolved yet.
reorg_responses: FuturesUnordered<ReorgResponseFut>,
}
impl<S, Engine: EngineTypes, Provider, Evm, Spec> EngineReorg<S, Engine, Provider, Evm, Spec> {
    /// Wraps `stream` in a new [`EngineReorg`].
    ///
    /// The wrapper starts out in forwarding mode with a zeroed forkchoice counter, no
    /// remembered forkchoice state and no pending reorg responses. `frequency` and `depth`
    /// are stored as given and control when a reorg is attempted and how deep it goes.
    pub fn new(
        stream: S,
        provider: Provider,
        evm_config: Evm,
        payload_validator: ExecutionPayloadValidator<Spec>,
        frequency: usize,
        depth: usize,
    ) -> Self {
        // Initial bookkeeping: nothing observed, nothing in flight.
        let initial_state = EngineReorgState::Forward;
        let pending_responses = FuturesUnordered::new();

        Self {
            // Caller-supplied components.
            stream,
            provider,
            evm_config,
            payload_validator,
            // Caller-supplied reorg parameters.
            frequency,
            depth,
            // Internal bookkeeping.
            forkchoice_states_forwarded: 0,
            last_forkchoice_state: None,
            state: initial_state,
            reorg_responses: pending_responses,
        }
    }
}
impl<S, Engine, Provider, Evm, Spec> Stream for EngineReorg<S, Engine, Provider, Evm, Spec>
where
S: Stream<Item = BeaconEngineMessage<Engine>>,
Engine: EngineTypes,
Provider: BlockReader + StateProviderFactory,
Evm: ConfigureEvm<Header = Header>,
Spec: EthereumHardforks,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if let Poll::Ready(Some(response)) = this.reorg_responses.poll_next_unpin(cx) {
match response {
Ok(Either::Left(Ok(payload_status))) => {
debug!(target: "engine::stream::reorg", ?payload_status, "Received response for reorg new payload");
}
Ok(Either::Left(Err(payload_error))) => {
error!(target: "engine::stream::reorg", %payload_error, "Error on reorg new payload");
}
Ok(Either::Right(Ok(fcu_status))) => {
debug!(target: "engine::stream::reorg", ?fcu_status, "Received response for reorg forkchoice update");
}
Ok(Either::Right(Err(fcu_error))) => {
error!(target: "engine::stream::reorg", %fcu_error, "Error on reorg forkchoice update");
}
Err(_) => {}
};
continue
}
if let EngineReorgState::Reorg { queue } = &mut this.state {
match queue.pop_front() {
Some(msg) => return Poll::Ready(Some(msg)),
None => {
*this.forkchoice_states_forwarded = 0;
*this.state = EngineReorgState::Forward;
}
}
}
let next = ready!(this.stream.poll_next_unpin(cx));
let item = match (next, &this.last_forkchoice_state) {
(
Some(BeaconEngineMessage::NewPayload { payload, sidecar, tx }),
Some(last_forkchoice_state),
) if this.forkchoice_states_forwarded > this.frequency &&
last_forkchoice_state.head_block_hash == payload.parent_hash() =>
{
let (reorg_payload, reorg_sidecar) = match create_reorg_head(
this.provider,
this.evm_config,
this.payload_validator,
*this.depth,
payload.clone(),
sidecar.clone(),
) {
Ok(result) => result,
Err(error) => {
error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
payload,
sidecar,
tx,
}))
}
};
let reorg_forkchoice_state = ForkchoiceState {
finalized_block_hash: last_forkchoice_state.finalized_block_hash,
safe_block_hash: last_forkchoice_state.safe_block_hash,
head_block_hash: reorg_payload.block_hash(),
};
let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
this.reorg_responses.extend([
Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
]);
let queue = VecDeque::from([
BeaconEngineMessage::NewPayload { payload, sidecar, tx },
BeaconEngineMessage::NewPayload {
payload: reorg_payload,
sidecar: reorg_sidecar,
tx: reorg_payload_tx,
},
BeaconEngineMessage::ForkchoiceUpdated {
state: reorg_forkchoice_state,
payload_attrs: None,
tx: reorg_fcu_tx,
version: EngineApiMessageVersion::default(),
},
]);
*this.state = EngineReorgState::Reorg { queue };
continue
}
(
Some(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
}),
_,
) => {
*this.last_forkchoice_state = Some(state);
*this.forkchoice_states_forwarded += 1;
Some(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
})
}
(item, _) => item,
};
return Poll::Ready(item)
}
}
}
fn create_reorg_head<Provider, Evm, Spec>(
provider: &Provider,
evm_config: &Evm,
payload_validator: &ExecutionPayloadValidator<Spec>,
mut depth: usize,
next_payload: ExecutionPayload,
next_sidecar: ExecutionPayloadSidecar,
) -> RethResult<(ExecutionPayload, ExecutionPayloadSidecar)>
where
Provider: BlockReader + StateProviderFactory,
Evm: ConfigureEvm<Header = Header>,
Spec: EthereumHardforks,
{
let chain_spec = payload_validator.chain_spec();
let next_block = payload_validator
.ensure_well_formed_payload(next_payload, next_sidecar)
.map_err(RethError::msg)?;
let mut previous_hash = next_block.parent_hash;
let mut candidate_transactions = next_block.body.transactions;
let reorg_target = 'target: {
loop {
let reorg_target = provider
.block_by_hash(previous_hash)?
.ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
if depth == 0 {
break 'target reorg_target
}
depth -= 1;
previous_hash = reorg_target.parent_hash;
candidate_transactions = reorg_target.body.transactions;
}
};
let reorg_target_parent = provider
.block_by_hash(reorg_target.parent_hash)?
.ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.parent_hash.into()))?;
debug!(target: "engine::stream::reorg", number = reorg_target.number, hash = %previous_hash, "Selected reorg target");
let state_provider = provider.state_by_block_hash(reorg_target.parent_hash)?;
let mut state = State::builder()
.with_database_ref(StateProviderDatabase::new(&state_provider))
.with_bundle_update()
.build();
let mut cfg = CfgEnvWithHandlerCfg::new(Default::default(), Default::default());
let mut block_env = BlockEnv::default();
evm_config.fill_cfg_and_block_env(&mut cfg, &mut block_env, &reorg_target.header, U256::MAX);
let env = EnvWithHandlerCfg::new_with_cfg_env(cfg, block_env, Default::default());
let mut evm = evm_config.evm_with_env(&mut state, env);
let mut system_caller = SystemCaller::new(evm_config.clone(), chain_spec.clone());
system_caller.apply_beacon_root_contract_call(
reorg_target.timestamp,
reorg_target.number,
reorg_target.parent_beacon_block_root,
&mut evm,
)?;
let mut cumulative_gas_used = 0;
let mut sum_blob_gas_used = 0;
let mut transactions = Vec::new();
let mut receipts = Vec::new();
let mut versioned_hashes = Vec::new();
for tx in candidate_transactions {
if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit {
continue
}
let tx_recovered = tx.clone().try_into_ecrecovered().map_err(|_| {
BlockExecutionError::Validation(BlockValidationError::SenderRecoveryError)
})?;
evm_config.fill_tx_env(evm.tx_mut(), &tx_recovered, tx_recovered.signer());
let exec_result = match evm.transact() {
Ok(result) => result,
error @ Err(EVMError::Transaction(_) | EVMError::Header(_)) => {
trace!(target: "engine::stream::reorg", hash = %tx.hash(), ?error, "Error executing transaction from next block");
continue
}
Err(error) => {
return Err(RethError::Execution(BlockExecutionError::Validation(
BlockValidationError::EVM { hash: tx.hash, error: Box::new(error) },
)))
}
};
evm.db_mut().commit(exec_result.state);
if let Some(blob_tx) = tx.transaction.as_eip4844() {
sum_blob_gas_used += blob_tx.blob_gas();
versioned_hashes.extend(blob_tx.blob_versioned_hashes.clone());
}
cumulative_gas_used += exec_result.result.gas_used();
#[allow(clippy::needless_update)] receipts.push(Some(Receipt {
tx_type: tx.tx_type(),
success: exec_result.result.is_success(),
cumulative_gas_used,
logs: exec_result.result.into_logs().into_iter().map(Into::into).collect(),
..Default::default()
}));
transactions.push(tx);
}
drop(evm);
if let Some(withdrawals) = &reorg_target.body.withdrawals {
state.increment_balances(post_block_withdrawals_balance_increments(
chain_spec,
reorg_target.timestamp,
withdrawals,
))?;
}
state.merge_transitions(BundleRetention::PlainState);
let outcome: ExecutionOutcome = ExecutionOutcome::new(
state.take_bundle(),
Receipts::from(vec![receipts]),
reorg_target.number,
Default::default(),
);
let hashed_state = HashedPostState::from_bundle_state(&outcome.state().state);
let (blob_gas_used, excess_blob_gas) =
if chain_spec.is_cancun_active_at_timestamp(reorg_target.timestamp) {
(
Some(sum_blob_gas_used),
Some(calc_excess_blob_gas(
reorg_target_parent.excess_blob_gas.unwrap_or_default(),
reorg_target_parent.blob_gas_used.unwrap_or_default(),
)),
)
} else {
(None, None)
};
let reorg_block = Block {
header: Header {
parent_hash: reorg_target.header.parent_hash,
ommers_hash: reorg_target.header.ommers_hash,
beneficiary: reorg_target.header.beneficiary,
difficulty: reorg_target.header.difficulty,
number: reorg_target.header.number,
gas_limit: reorg_target.header.gas_limit,
timestamp: reorg_target.header.timestamp,
extra_data: reorg_target.header.extra_data,
mix_hash: reorg_target.header.mix_hash,
nonce: reorg_target.header.nonce,
base_fee_per_gas: reorg_target.header.base_fee_per_gas,
parent_beacon_block_root: reorg_target.header.parent_beacon_block_root,
withdrawals_root: reorg_target.header.withdrawals_root,
transactions_root: proofs::calculate_transaction_root(&transactions),
receipts_root: outcome.receipts_root_slow(reorg_target.header.number).unwrap(),
logs_bloom: outcome.block_logs_bloom(reorg_target.header.number).unwrap(),
requests_hash: None, gas_used: cumulative_gas_used,
blob_gas_used: blob_gas_used.map(Into::into),
excess_blob_gas: excess_blob_gas.map(Into::into),
state_root: state_provider.state_root(hashed_state)?,
},
body: BlockBody {
transactions,
ommers: reorg_target.body.ommers,
withdrawals: reorg_target.body.withdrawals,
},
}
.seal_slow();
Ok((
block_to_payload(reorg_block),
reorg_target
.header
.parent_beacon_block_root
.map(|root| {
ExecutionPayloadSidecar::v3(CancunPayloadFields {
parent_beacon_block_root: root,
versioned_hashes,
})
})
.unwrap_or_else(ExecutionPayloadSidecar::none),
))
}