1use alloy_consensus::{BlockHeader, Transaction};
4use alloy_rpc_types_engine::{ForkchoiceState, PayloadStatus};
5use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
6use itertools::Either;
7use reth_chainspec::{ChainSpecProvider, EthChainSpec};
8use reth_engine_primitives::{
9 BeaconEngineMessage, BeaconOnNewPayloadError, ExecutionPayload as _, OnForkChoiceUpdated,
10};
11use reth_engine_tree::tree::EngineValidator;
12use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
13use reth_evm::{
14 execute::{BlockBuilder, BlockBuilderOutcome},
15 ConfigureEvm,
16};
17use reth_payload_primitives::{BuiltPayload, PayloadTypes};
18use reth_primitives_traits::{
19 block::Block as _, BlockBody as _, BlockTy, HeaderTy, SealedBlock, SignedTransaction,
20};
21use reth_revm::{database::StateProviderDatabase, db::State};
22use reth_storage_api::{errors::ProviderError, BlockReader, StateProviderFactory};
23use std::{
24 collections::VecDeque,
25 future::Future,
26 pin::Pin,
27 task::{ready, Context, Poll},
28};
29use tokio::sync::oneshot;
30use tracing::*;
31
/// Internal mode of the [`EngineReorg`] stream.
#[derive(Debug)]
enum EngineReorgState<T: PayloadTypes> {
    /// No reorg is in progress: the next message is pulled from the underlying stream.
    Forward,
    /// A reorg is in progress: the messages in `queue` are yielded front-to-back, and once the
    /// queue is empty the stream switches back to [`EngineReorgState::Forward`].
    Reorg { queue: VecDeque<BeaconEngineMessage<T>> },
}
37
/// Result of awaiting a response channel of an injected reorg message.
///
/// `Either::Left` carries the response to the injected new payload message, `Either::Right`
/// the response to the injected forkchoice update. The outer `Err` is the oneshot receive
/// error.
type EngineReorgResponse = Result<
    Either<Result<PayloadStatus, BeaconOnNewPayloadError>, RethResult<OnForkChoiceUpdated>>,
    oneshot::error::RecvError,
>;
42
/// Boxed and pinned future that resolves to an [`EngineReorgResponse`].
type ReorgResponseFut = Pin<Box<dyn Future<Output = EngineReorgResponse> + Send + Sync>>;
44
/// Wrapper around a stream of engine messages that periodically injects an additional
/// new payload / forkchoice update pair pointing at an alternative head block.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EngineReorg<S, T: PayloadTypes, Provider, Evm, Validator> {
    /// Underlying stream of engine messages.
    #[pin]
    stream: S,
    /// Provider used to look up blocks, headers and state when building the reorg head.
    provider: Provider,
    /// EVM configuration used to build the reorg head.
    evm_config: Evm,
    /// Validator used to convert an incoming payload into a block.
    payload_validator: Validator,
    /// A reorg is only attempted once `forkchoice_states_forwarded` exceeds this value.
    frequency: usize,
    /// Depth passed to the reorg head construction: how many additional ancestors to walk
    /// back from the incoming payload's parent when selecting the reorg target.
    depth: usize,
    /// Number of forkchoice updates forwarded so far; reset to zero after a reorg queue has
    /// been fully drained.
    forkchoice_states_forwarded: usize,
    /// Current mode of the stream.
    state: EngineReorgState<T>,
    /// The most recent forkchoice state that was forwarded, if any.
    last_forkchoice_state: Option<ForkchoiceState>,
    /// Pending responses to the injected reorg messages.
    reorg_responses: FuturesUnordered<ReorgResponseFut>,
}
72
73impl<S, T: PayloadTypes, Provider, Evm, Validator> EngineReorg<S, T, Provider, Evm, Validator> {
74 pub fn new(
76 stream: S,
77 provider: Provider,
78 evm_config: Evm,
79 payload_validator: Validator,
80 frequency: usize,
81 depth: usize,
82 ) -> Self {
83 Self {
84 stream,
85 provider,
86 evm_config,
87 payload_validator,
88 frequency,
89 depth,
90 state: EngineReorgState::Forward,
91 forkchoice_states_forwarded: 0,
92 last_forkchoice_state: None,
93 reorg_responses: FuturesUnordered::new(),
94 }
95 }
96}
97
98impl<S, T, Provider, Evm, Validator> Stream for EngineReorg<S, T, Provider, Evm, Validator>
99where
100 S: Stream<Item = BeaconEngineMessage<T>>,
101 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = Evm::Primitives>>,
102 Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
103 + StateProviderFactory
104 + ChainSpecProvider,
105 Evm: ConfigureEvm,
106 Validator: EngineValidator<T, Evm::Primitives>,
107{
108 type Item = S::Item;
109
110 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
111 let mut this = self.project();
112
113 loop {
114 if let Poll::Ready(Some(response)) = this.reorg_responses.poll_next_unpin(cx) {
115 match response {
116 Ok(Either::Left(Ok(payload_status))) => {
117 debug!(target: "engine::stream::reorg", ?payload_status, "Received response for reorg new payload");
118 }
119 Ok(Either::Left(Err(payload_error))) => {
120 error!(target: "engine::stream::reorg", %payload_error, "Error on reorg new payload");
121 }
122 Ok(Either::Right(Ok(fcu_status))) => {
123 debug!(target: "engine::stream::reorg", ?fcu_status, "Received response for reorg forkchoice update");
124 }
125 Ok(Either::Right(Err(fcu_error))) => {
126 error!(target: "engine::stream::reorg", %fcu_error, "Error on reorg forkchoice update");
127 }
128 Err(_) => {}
129 };
130 continue
131 }
132
133 if let EngineReorgState::Reorg { queue } = &mut this.state {
134 match queue.pop_front() {
135 Some(msg) => return Poll::Ready(Some(msg)),
136 None => {
137 *this.forkchoice_states_forwarded = 0;
138 *this.state = EngineReorgState::Forward;
139 }
140 }
141 }
142
143 let next = ready!(this.stream.poll_next_unpin(cx));
144 let item = match (next, &this.last_forkchoice_state) {
145 (
146 Some(BeaconEngineMessage::NewPayload { payload, tx }),
147 Some(last_forkchoice_state),
148 ) if this.forkchoice_states_forwarded > this.frequency &&
149 last_forkchoice_state.head_block_hash == payload.parent_hash() =>
151 {
152 let reorg_block = match create_reorg_head(
162 this.provider,
163 this.evm_config,
164 this.payload_validator,
165 *this.depth,
166 payload.clone(),
167 ) {
168 Ok(result) => result,
169 Err(error) => {
170 error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
171 return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
174 payload,
175 tx,
176 }))
177 }
178 };
179 let reorg_forkchoice_state = ForkchoiceState {
180 finalized_block_hash: last_forkchoice_state.finalized_block_hash,
181 safe_block_hash: last_forkchoice_state.safe_block_hash,
182 head_block_hash: reorg_block.hash(),
183 };
184
185 let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
186 let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
187 this.reorg_responses.extend([
188 Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
189 Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
190 ]);
191
192 let queue = VecDeque::from([
193 BeaconEngineMessage::NewPayload { payload, tx },
195 BeaconEngineMessage::NewPayload {
197 payload: T::block_to_payload(reorg_block),
198 tx: reorg_payload_tx,
199 },
200 BeaconEngineMessage::ForkchoiceUpdated {
202 state: reorg_forkchoice_state,
203 payload_attrs: None,
204 tx: reorg_fcu_tx,
205 },
206 ]);
207 *this.state = EngineReorgState::Reorg { queue };
208 continue
209 }
210 (Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }), _) => {
211 *this.last_forkchoice_state = Some(state);
215 *this.forkchoice_states_forwarded += 1;
216 Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
217 }
218 (item, _) => item,
219 };
220 return Poll::Ready(item)
221 }
222 }
223}
224
225fn create_reorg_head<Provider, Evm, T, Validator>(
226 provider: &Provider,
227 evm_config: &Evm,
228 payload_validator: &Validator,
229 mut depth: usize,
230 next_payload: T::ExecutionData,
231) -> RethResult<SealedBlock<BlockTy<Evm::Primitives>>>
232where
233 Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
234 + StateProviderFactory
235 + ChainSpecProvider<ChainSpec: EthChainSpec>,
236 Evm: ConfigureEvm,
237 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = Evm::Primitives>>,
238 Validator: EngineValidator<T, Evm::Primitives>,
239{
240 let next_block =
242 payload_validator.convert_payload_to_block(next_payload).map_err(RethError::msg)?;
243
244 let mut previous_hash = next_block.parent_hash();
246 let mut candidate_transactions = next_block.into_body().transactions().to_vec();
247 let reorg_target = 'target: {
248 loop {
249 let reorg_target = provider
250 .block_by_hash(previous_hash)?
251 .ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
252 if depth == 0 {
253 break 'target reorg_target.seal_slow()
254 }
255
256 depth -= 1;
257 previous_hash = reorg_target.header().parent_hash();
258 candidate_transactions = reorg_target.into_body().into_transactions();
259 }
260 };
261 let reorg_target_parent = provider
262 .sealed_header_by_hash(reorg_target.header().parent_hash())?
263 .ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.header().parent_hash().into()))?;
264
265 debug!(target: "engine::stream::reorg", number = reorg_target.header().number(), hash = %previous_hash, "Selected reorg target");
266
267 let state_provider = provider.state_by_block_hash(reorg_target.header().parent_hash())?;
269 let mut state = State::builder()
270 .with_database_ref(StateProviderDatabase::new(&state_provider))
271 .with_bundle_update()
272 .build();
273
274 let ctx = evm_config.context_for_block(&reorg_target).map_err(RethError::other)?;
275 let evm = evm_config.evm_for_block(&mut state, &reorg_target).map_err(RethError::other)?;
276 let mut builder = evm_config.create_block_builder(evm, &reorg_target_parent, ctx);
277
278 builder.apply_pre_execution_changes()?;
279
280 let mut cumulative_gas_used = 0;
281 for tx in candidate_transactions {
282 if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit() {
284 continue
285 }
286
287 let tx_recovered =
288 tx.try_into_recovered().map_err(|_| ProviderError::SenderRecoveryError)?;
289 let gas_used = match builder.execute_transaction(tx_recovered) {
290 Ok(gas_used) => gas_used,
291 Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
292 hash,
293 error,
294 })) => {
295 trace!(target: "engine::stream::reorg", hash = %hash, ?error, "Error executing transaction from next block");
296 continue
297 }
298 Err(error) => return Err(RethError::Execution(error)),
300 };
301
302 cumulative_gas_used += gas_used;
303 }
304
305 let BlockBuilderOutcome { block, .. } = builder.finish(&state_provider)?;
306
307 Ok(block.into_sealed_block())
308}