1use alloy_consensus::{BlockHeader, Transaction};
4use alloy_primitives::Bytes;
5use alloy_rpc_types_engine::{ForkchoiceState, PayloadStatus};
6use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
7use itertools::Either;
8use reth_chainspec::{ChainSpecProvider, EthChainSpec};
9use reth_engine_primitives::{
10 BeaconEngineMessage, BeaconOnNewPayloadError, ExecutionPayload as _, OnForkChoiceUpdated,
11};
12use reth_engine_tree::tree::EngineValidator;
13use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
14use reth_evm::{
15 execute::{BlockBuilder, BlockBuilderOutcome},
16 ConfigureEvm,
17};
18use reth_payload_primitives::{BuiltPayload, PayloadTypes};
19use reth_primitives_traits::{
20 block::Block as _, BlockBody as _, BlockTy, HeaderTy, SealedBlock, SignedTransaction,
21};
22use reth_revm::{database::StateProviderDatabase, db::State};
23use reth_storage_api::{errors::ProviderError, BlockReader, StateProviderFactory};
24use std::{
25 collections::VecDeque,
26 future::Future,
27 pin::Pin,
28 task::{ready, Context, Poll},
29};
30use tokio::sync::oneshot;
31use tracing::*;
32
33#[derive(Debug)]
34enum EngineReorgState<T: PayloadTypes> {
35 Forward,
36 Reorg { queue: VecDeque<BeaconEngineMessage<T>> },
37}
38
39type EngineReorgResponse = Result<
40 Either<Result<PayloadStatus, BeaconOnNewPayloadError>, RethResult<OnForkChoiceUpdated>>,
41 oneshot::error::RecvError,
42>;
43
44type ReorgResponseFut = Pin<Box<dyn Future<Output = EngineReorgResponse> + Send + Sync>>;
45
46#[derive(Debug)]
48#[pin_project::pin_project]
49pub struct EngineReorg<S, T: PayloadTypes, Provider, Evm, Validator> {
50 #[pin]
52 stream: S,
53 provider: Provider,
55 evm_config: Evm,
57 payload_validator: Validator,
59 frequency: usize,
61 depth: usize,
63 forkchoice_states_forwarded: usize,
66 state: EngineReorgState<T>,
68 last_forkchoice_state: Option<ForkchoiceState>,
70 reorg_responses: FuturesUnordered<ReorgResponseFut>,
72}
73
74impl<S, T: PayloadTypes, Provider, Evm, Validator> EngineReorg<S, T, Provider, Evm, Validator> {
75 pub fn new(
77 stream: S,
78 provider: Provider,
79 evm_config: Evm,
80 payload_validator: Validator,
81 frequency: usize,
82 depth: usize,
83 ) -> Self {
84 Self {
85 stream,
86 provider,
87 evm_config,
88 payload_validator,
89 frequency,
90 depth,
91 state: EngineReorgState::Forward,
92 forkchoice_states_forwarded: 0,
93 last_forkchoice_state: None,
94 reorg_responses: FuturesUnordered::new(),
95 }
96 }
97}
98
99impl<S, T, Provider, Evm, Validator> Stream for EngineReorg<S, T, Provider, Evm, Validator>
100where
101 S: Stream<Item = BeaconEngineMessage<T>>,
102 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = Evm::Primitives>>,
103 Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
104 + StateProviderFactory
105 + ChainSpecProvider,
106 Evm: ConfigureEvm,
107 Validator: EngineValidator<T, Evm::Primitives>,
108{
109 type Item = S::Item;
110
111 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112 let mut this = self.project();
113
114 loop {
115 if let Poll::Ready(Some(response)) = this.reorg_responses.poll_next_unpin(cx) {
116 match response {
117 Ok(Either::Left(Ok(payload_status))) => {
118 debug!(target: "engine::stream::reorg", ?payload_status, "Received response for reorg new payload");
119 }
120 Ok(Either::Left(Err(payload_error))) => {
121 error!(target: "engine::stream::reorg", %payload_error, "Error on reorg new payload");
122 }
123 Ok(Either::Right(Ok(fcu_status))) => {
124 debug!(target: "engine::stream::reorg", ?fcu_status, "Received response for reorg forkchoice update");
125 }
126 Ok(Either::Right(Err(fcu_error))) => {
127 error!(target: "engine::stream::reorg", %fcu_error, "Error on reorg forkchoice update");
128 }
129 Err(_) => {}
130 };
131 continue
132 }
133
134 if let EngineReorgState::Reorg { queue } = &mut this.state {
135 match queue.pop_front() {
136 Some(msg) => return Poll::Ready(Some(msg)),
137 None => {
138 *this.forkchoice_states_forwarded = 0;
139 *this.state = EngineReorgState::Forward;
140 }
141 }
142 }
143
144 let next = ready!(this.stream.poll_next_unpin(cx));
145 let item = match (next, &this.last_forkchoice_state) {
146 (
147 Some(BeaconEngineMessage::NewPayload { payload, tx }),
148 Some(last_forkchoice_state),
149 ) if this.forkchoice_states_forwarded > this.frequency &&
150 last_forkchoice_state.head_block_hash == payload.parent_hash() =>
152 {
153 let (reorg_block, encoded_bal) = match create_reorg_head(
163 this.provider,
164 this.evm_config,
165 this.payload_validator,
166 *this.depth,
167 payload.clone(),
168 ) {
169 Ok(result) => result,
170 Err(error) => {
171 error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
172 return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
175 payload,
176 tx,
177 }))
178 }
179 };
180 let reorg_forkchoice_state = ForkchoiceState {
181 finalized_block_hash: last_forkchoice_state.finalized_block_hash,
182 safe_block_hash: last_forkchoice_state.safe_block_hash,
183 head_block_hash: reorg_block.hash(),
184 };
185
186 let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
187 let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
188 this.reorg_responses.extend([
189 Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
190 Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
191 ]);
192
193 let queue = VecDeque::from([
194 BeaconEngineMessage::NewPayload { payload, tx },
196 BeaconEngineMessage::NewPayload {
198 payload: T::block_to_payload(reorg_block, encoded_bal),
199 tx: reorg_payload_tx,
200 },
201 BeaconEngineMessage::ForkchoiceUpdated {
203 state: reorg_forkchoice_state,
204 payload_attrs: None,
205 tx: reorg_fcu_tx,
206 },
207 ]);
208 *this.state = EngineReorgState::Reorg { queue };
209 continue
210 }
211 (Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }), _) => {
212 *this.last_forkchoice_state = Some(state);
216 *this.forkchoice_states_forwarded += 1;
217 Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
218 }
219 (item, _) => item,
220 };
221 return Poll::Ready(item)
222 }
223 }
224}
225
226#[allow(clippy::type_complexity)]
227fn create_reorg_head<Provider, Evm, T, Validator>(
228 provider: &Provider,
229 evm_config: &Evm,
230 payload_validator: &Validator,
231 mut depth: usize,
232 next_payload: T::ExecutionData,
233) -> RethResult<(SealedBlock<BlockTy<Evm::Primitives>>, Option<Bytes>)>
234where
235 Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
236 + StateProviderFactory
237 + ChainSpecProvider<ChainSpec: EthChainSpec>,
238 Evm: ConfigureEvm,
239 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = Evm::Primitives>>,
240 Validator: EngineValidator<T, Evm::Primitives>,
241{
242 let next_block =
244 payload_validator.convert_payload_to_block(next_payload).map_err(RethError::msg)?;
245
246 let mut previous_hash = next_block.parent_hash();
248 let mut candidate_transactions = next_block.into_body().transactions().to_vec();
249 let reorg_target = 'target: {
250 loop {
251 let reorg_target = provider
252 .block_by_hash(previous_hash)?
253 .ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
254 if depth == 0 {
255 break 'target reorg_target.seal_slow()
256 }
257
258 depth -= 1;
259 previous_hash = reorg_target.header().parent_hash();
260 candidate_transactions = reorg_target.into_body().into_transactions();
261 }
262 };
263 let reorg_target_parent = provider
264 .sealed_header_by_hash(reorg_target.header().parent_hash())?
265 .ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.header().parent_hash().into()))?;
266
267 debug!(target: "engine::stream::reorg", number = reorg_target.header().number(), hash = %previous_hash, "Selected reorg target");
268
269 let has_bal = reorg_target.header().block_access_list_hash().is_some();
271 let state_provider = provider.state_by_block_hash(reorg_target.header().parent_hash())?;
272 let mut state = State::builder()
273 .with_database_ref(StateProviderDatabase::new(&state_provider))
274 .with_bundle_update()
275 .with_bal_builder_if(has_bal)
276 .build();
277
278 let ctx = evm_config.context_for_block(&reorg_target).map_err(RethError::other)?;
279 let evm = evm_config.evm_for_block(&mut state, &reorg_target).map_err(RethError::other)?;
280 let mut builder = evm_config.create_block_builder(evm, &reorg_target_parent, ctx);
281
282 builder.apply_pre_execution_changes()?;
283
284 let mut cumulative_gas_used = 0;
285 for tx in candidate_transactions {
286 if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit() {
288 continue
289 }
290
291 let tx_recovered =
292 tx.try_into_recovered().map_err(|_| ProviderError::SenderRecoveryError)?;
293 let gas_used = match builder.execute_transaction(tx_recovered) {
294 Ok(gas_used) => gas_used.tx_gas_used(),
295 Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
296 hash,
297 error,
298 })) => {
299 trace!(target: "engine::stream::reorg", hash = %hash, ?error, "Error executing transaction from next block");
300 continue
301 }
302 Err(error) => return Err(RethError::Execution(error)),
304 };
305
306 cumulative_gas_used += gas_used;
307 }
308
309 let BlockBuilderOutcome { block, block_access_list, .. } =
310 builder.finish(&state_provider, None)?;
311
312 let encoded_bal: Option<Bytes> = block_access_list.map(|bal| alloy_rlp::encode(&bal).into());
313
314 Ok((block.into_sealed_block(), encoded_bal))
315}