1use alloy_consensus::{BlockHeader, Transaction};
4use alloy_rpc_types_engine::{ForkchoiceState, PayloadStatus};
5use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
6use itertools::Either;
7use reth_chainspec::{ChainSpecProvider, EthChainSpec};
8use reth_engine_primitives::{
9 BeaconEngineMessage, BeaconOnNewPayloadError, ExecutionPayload as _, OnForkChoiceUpdated,
10 PayloadValidator,
11};
12use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
13use reth_evm::{
14 execute::{BlockBuilder, BlockBuilderOutcome},
15 ConfigureEvm,
16};
17use reth_payload_primitives::{BuiltPayload, EngineApiMessageVersion, PayloadTypes};
18use reth_primitives_traits::{
19 block::Block as _, BlockBody as _, BlockTy, HeaderTy, SealedBlock, SignedTransaction,
20};
21use reth_revm::{database::StateProviderDatabase, db::State};
22use reth_storage_api::{errors::ProviderError, BlockReader, StateProviderFactory};
23use std::{
24 collections::VecDeque,
25 future::Future,
26 pin::Pin,
27 task::{ready, Context, Poll},
28};
29use tokio::sync::oneshot;
30use tracing::*;
31
32#[derive(Debug)]
33enum EngineReorgState<T: PayloadTypes> {
34 Forward,
35 Reorg { queue: VecDeque<BeaconEngineMessage<T>> },
36}
37
38type EngineReorgResponse = Result<
39 Either<Result<PayloadStatus, BeaconOnNewPayloadError>, RethResult<OnForkChoiceUpdated>>,
40 oneshot::error::RecvError,
41>;
42
43type ReorgResponseFut = Pin<Box<dyn Future<Output = EngineReorgResponse> + Send + Sync>>;
44
45#[derive(Debug)]
47#[pin_project::pin_project]
48pub struct EngineReorg<S, T: PayloadTypes, Provider, Evm, Validator> {
49 #[pin]
51 stream: S,
52 provider: Provider,
54 evm_config: Evm,
56 payload_validator: Validator,
58 frequency: usize,
60 depth: usize,
62 forkchoice_states_forwarded: usize,
65 state: EngineReorgState<T>,
67 last_forkchoice_state: Option<ForkchoiceState>,
69 reorg_responses: FuturesUnordered<ReorgResponseFut>,
71}
72
73impl<S, T: PayloadTypes, Provider, Evm, Validator> EngineReorg<S, T, Provider, Evm, Validator> {
74 pub fn new(
76 stream: S,
77 provider: Provider,
78 evm_config: Evm,
79 payload_validator: Validator,
80 frequency: usize,
81 depth: usize,
82 ) -> Self {
83 Self {
84 stream,
85 provider,
86 evm_config,
87 payload_validator,
88 frequency,
89 depth,
90 state: EngineReorgState::Forward,
91 forkchoice_states_forwarded: 0,
92 last_forkchoice_state: None,
93 reorg_responses: FuturesUnordered::new(),
94 }
95 }
96}
97
98impl<S, T, Provider, Evm, Validator> Stream for EngineReorg<S, T, Provider, Evm, Validator>
99where
100 S: Stream<Item = BeaconEngineMessage<T>>,
101 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = Evm::Primitives>>,
102 Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
103 + StateProviderFactory
104 + ChainSpecProvider,
105 Evm: ConfigureEvm,
106 Validator: PayloadValidator<ExecutionData = T::ExecutionData, Block = BlockTy<Evm::Primitives>>,
107{
108 type Item = S::Item;
109
110 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
111 let mut this = self.project();
112
113 loop {
114 if let Poll::Ready(Some(response)) = this.reorg_responses.poll_next_unpin(cx) {
115 match response {
116 Ok(Either::Left(Ok(payload_status))) => {
117 debug!(target: "engine::stream::reorg", ?payload_status, "Received response for reorg new payload");
118 }
119 Ok(Either::Left(Err(payload_error))) => {
120 error!(target: "engine::stream::reorg", %payload_error, "Error on reorg new payload");
121 }
122 Ok(Either::Right(Ok(fcu_status))) => {
123 debug!(target: "engine::stream::reorg", ?fcu_status, "Received response for reorg forkchoice update");
124 }
125 Ok(Either::Right(Err(fcu_error))) => {
126 error!(target: "engine::stream::reorg", %fcu_error, "Error on reorg forkchoice update");
127 }
128 Err(_) => {}
129 };
130 continue
131 }
132
133 if let EngineReorgState::Reorg { queue } = &mut this.state {
134 match queue.pop_front() {
135 Some(msg) => return Poll::Ready(Some(msg)),
136 None => {
137 *this.forkchoice_states_forwarded = 0;
138 *this.state = EngineReorgState::Forward;
139 }
140 }
141 }
142
143 let next = ready!(this.stream.poll_next_unpin(cx));
144 let item = match (next, &this.last_forkchoice_state) {
145 (
146 Some(BeaconEngineMessage::NewPayload { payload, tx }),
147 Some(last_forkchoice_state),
148 ) if this.forkchoice_states_forwarded > this.frequency &&
149 last_forkchoice_state.head_block_hash == payload.parent_hash() =>
151 {
152 let reorg_block = match create_reorg_head(
162 this.provider,
163 this.evm_config,
164 this.payload_validator,
165 *this.depth,
166 payload.clone(),
167 ) {
168 Ok(result) => result,
169 Err(error) => {
170 error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
171 return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
174 payload,
175 tx,
176 }))
177 }
178 };
179 let reorg_forkchoice_state = ForkchoiceState {
180 finalized_block_hash: last_forkchoice_state.finalized_block_hash,
181 safe_block_hash: last_forkchoice_state.safe_block_hash,
182 head_block_hash: reorg_block.hash(),
183 };
184
185 let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
186 let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
187 this.reorg_responses.extend([
188 Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
189 Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
190 ]);
191
192 let queue = VecDeque::from([
193 BeaconEngineMessage::NewPayload { payload, tx },
195 BeaconEngineMessage::NewPayload {
197 payload: T::block_to_payload(reorg_block),
198 tx: reorg_payload_tx,
199 },
200 BeaconEngineMessage::ForkchoiceUpdated {
202 state: reorg_forkchoice_state,
203 payload_attrs: None,
204 tx: reorg_fcu_tx,
205 version: EngineApiMessageVersion::default(),
206 },
207 ]);
208 *this.state = EngineReorgState::Reorg { queue };
209 continue
210 }
211 (
212 Some(BeaconEngineMessage::ForkchoiceUpdated {
213 state,
214 payload_attrs,
215 tx,
216 version,
217 }),
218 _,
219 ) => {
220 *this.last_forkchoice_state = Some(state);
224 *this.forkchoice_states_forwarded += 1;
225 Some(BeaconEngineMessage::ForkchoiceUpdated {
226 state,
227 payload_attrs,
228 tx,
229 version,
230 })
231 }
232 (item, _) => item,
233 };
234 return Poll::Ready(item)
235 }
236 }
237}
238
239fn create_reorg_head<Provider, Evm, Validator>(
240 provider: &Provider,
241 evm_config: &Evm,
242 payload_validator: &Validator,
243 mut depth: usize,
244 next_payload: Validator::ExecutionData,
245) -> RethResult<SealedBlock<BlockTy<Evm::Primitives>>>
246where
247 Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
248 + StateProviderFactory
249 + ChainSpecProvider<ChainSpec: EthChainSpec>,
250 Evm: ConfigureEvm,
251 Validator: PayloadValidator<Block = BlockTy<Evm::Primitives>>,
252{
253 let next_block =
255 payload_validator.ensure_well_formed_payload(next_payload).map_err(RethError::msg)?;
256
257 let mut previous_hash = next_block.parent_hash();
259 let mut candidate_transactions = next_block.into_body().transactions().to_vec();
260 let reorg_target = 'target: {
261 loop {
262 let reorg_target = provider
263 .block_by_hash(previous_hash)?
264 .ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
265 if depth == 0 {
266 break 'target reorg_target.seal_slow()
267 }
268
269 depth -= 1;
270 previous_hash = reorg_target.header().parent_hash();
271 candidate_transactions = reorg_target.into_body().into_transactions();
272 }
273 };
274 let reorg_target_parent = provider
275 .sealed_header_by_hash(reorg_target.header().parent_hash())?
276 .ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.header().parent_hash().into()))?;
277
278 debug!(target: "engine::stream::reorg", number = reorg_target.header().number(), hash = %previous_hash, "Selected reorg target");
279
280 let state_provider = provider.state_by_block_hash(reorg_target.header().parent_hash())?;
282 let mut state = State::builder()
283 .with_database_ref(StateProviderDatabase::new(&state_provider))
284 .with_bundle_update()
285 .build();
286
287 let ctx = evm_config.context_for_block(&reorg_target);
288 let evm = evm_config.evm_for_block(&mut state, &reorg_target);
289 let mut builder = evm_config.create_block_builder(evm, &reorg_target_parent, ctx);
290
291 builder.apply_pre_execution_changes()?;
292
293 let mut cumulative_gas_used = 0;
294 for tx in candidate_transactions {
295 if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit() {
297 continue
298 }
299
300 let tx_recovered =
301 tx.try_clone_into_recovered().map_err(|_| ProviderError::SenderRecoveryError)?;
302 let gas_used = match builder.execute_transaction(tx_recovered) {
303 Ok(gas_used) => gas_used,
304 Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
305 hash,
306 error,
307 })) => {
308 trace!(target: "engine::stream::reorg", hash = %hash, ?error, "Error executing transaction from next block");
309 continue
310 }
311 Err(error) => return Err(RethError::Execution(error)),
313 };
314
315 cumulative_gas_used += gas_used;
316 }
317
318 let BlockBuilderOutcome { block, .. } = builder.finish(&state_provider)?;
319
320 Ok(block.into_sealed_block())
321}