1use alloy_consensus::{BlockHeader, Transaction};
4use alloy_rpc_types_engine::{ForkchoiceState, PayloadStatus};
5use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
6use itertools::Either;
7use reth_chainspec::{ChainSpecProvider, EthChainSpec};
8use reth_engine_primitives::{
9 BeaconEngineMessage, BeaconOnNewPayloadError, ExecutionPayload as _, OnForkChoiceUpdated,
10};
11use reth_engine_tree::tree::EngineValidator;
12use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
13use reth_evm::{
14 execute::{BlockBuilder, BlockBuilderOutcome},
15 ConfigureEvm,
16};
17use reth_payload_primitives::{BuiltPayload, EngineApiMessageVersion, PayloadTypes};
18use reth_primitives_traits::{
19 block::Block as _, BlockBody as _, BlockTy, HeaderTy, SealedBlock, SignedTransaction,
20};
21use reth_revm::{database::StateProviderDatabase, db::State};
22use reth_storage_api::{errors::ProviderError, BlockReader, StateProviderFactory};
23use std::{
24 collections::VecDeque,
25 future::Future,
26 pin::Pin,
27 task::{ready, Context, Poll},
28};
29use tokio::sync::oneshot;
30use tracing::*;
31
32#[derive(Debug)]
33enum EngineReorgState<T: PayloadTypes> {
34 Forward,
35 Reorg { queue: VecDeque<BeaconEngineMessage<T>> },
36}
37
38type EngineReorgResponse = Result<
39 Either<Result<PayloadStatus, BeaconOnNewPayloadError>, RethResult<OnForkChoiceUpdated>>,
40 oneshot::error::RecvError,
41>;
42
43type ReorgResponseFut = Pin<Box<dyn Future<Output = EngineReorgResponse> + Send + Sync>>;
44
45#[derive(Debug)]
47#[pin_project::pin_project]
48pub struct EngineReorg<S, T: PayloadTypes, Provider, Evm, Validator> {
49 #[pin]
51 stream: S,
52 provider: Provider,
54 evm_config: Evm,
56 payload_validator: Validator,
58 frequency: usize,
60 depth: usize,
62 forkchoice_states_forwarded: usize,
65 state: EngineReorgState<T>,
67 last_forkchoice_state: Option<ForkchoiceState>,
69 reorg_responses: FuturesUnordered<ReorgResponseFut>,
71}
72
73impl<S, T: PayloadTypes, Provider, Evm, Validator> EngineReorg<S, T, Provider, Evm, Validator> {
74 pub fn new(
76 stream: S,
77 provider: Provider,
78 evm_config: Evm,
79 payload_validator: Validator,
80 frequency: usize,
81 depth: usize,
82 ) -> Self {
83 Self {
84 stream,
85 provider,
86 evm_config,
87 payload_validator,
88 frequency,
89 depth,
90 state: EngineReorgState::Forward,
91 forkchoice_states_forwarded: 0,
92 last_forkchoice_state: None,
93 reorg_responses: FuturesUnordered::new(),
94 }
95 }
96}
97
98impl<S, T, Provider, Evm, Validator> Stream for EngineReorg<S, T, Provider, Evm, Validator>
99where
100 S: Stream<Item = BeaconEngineMessage<T>>,
101 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = Evm::Primitives>>,
102 Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
103 + StateProviderFactory
104 + ChainSpecProvider,
105 Evm: ConfigureEvm,
106 Validator: EngineValidator<T, Evm::Primitives>,
107{
108 type Item = S::Item;
109
110 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
111 let mut this = self.project();
112
113 loop {
114 if let Poll::Ready(Some(response)) = this.reorg_responses.poll_next_unpin(cx) {
115 match response {
116 Ok(Either::Left(Ok(payload_status))) => {
117 debug!(target: "engine::stream::reorg", ?payload_status, "Received response for reorg new payload");
118 }
119 Ok(Either::Left(Err(payload_error))) => {
120 error!(target: "engine::stream::reorg", %payload_error, "Error on reorg new payload");
121 }
122 Ok(Either::Right(Ok(fcu_status))) => {
123 debug!(target: "engine::stream::reorg", ?fcu_status, "Received response for reorg forkchoice update");
124 }
125 Ok(Either::Right(Err(fcu_error))) => {
126 error!(target: "engine::stream::reorg", %fcu_error, "Error on reorg forkchoice update");
127 }
128 Err(_) => {}
129 };
130 continue
131 }
132
133 if let EngineReorgState::Reorg { queue } = &mut this.state {
134 match queue.pop_front() {
135 Some(msg) => return Poll::Ready(Some(msg)),
136 None => {
137 *this.forkchoice_states_forwarded = 0;
138 *this.state = EngineReorgState::Forward;
139 }
140 }
141 }
142
143 let next = ready!(this.stream.poll_next_unpin(cx));
144 let item = match (next, &this.last_forkchoice_state) {
145 (
146 Some(BeaconEngineMessage::NewPayload { payload, tx }),
147 Some(last_forkchoice_state),
148 ) if this.forkchoice_states_forwarded > this.frequency &&
149 last_forkchoice_state.head_block_hash == payload.parent_hash() =>
151 {
152 let reorg_block = match create_reorg_head(
162 this.provider,
163 this.evm_config,
164 this.payload_validator,
165 *this.depth,
166 payload.clone(),
167 ) {
168 Ok(result) => result,
169 Err(error) => {
170 error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
171 return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
174 payload,
175 tx,
176 }))
177 }
178 };
179 let reorg_forkchoice_state = ForkchoiceState {
180 finalized_block_hash: last_forkchoice_state.finalized_block_hash,
181 safe_block_hash: last_forkchoice_state.safe_block_hash,
182 head_block_hash: reorg_block.hash(),
183 };
184
185 let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
186 let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
187 this.reorg_responses.extend([
188 Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
189 Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
190 ]);
191
192 let queue = VecDeque::from([
193 BeaconEngineMessage::NewPayload { payload, tx },
195 BeaconEngineMessage::NewPayload {
197 payload: T::block_to_payload(reorg_block),
198 tx: reorg_payload_tx,
199 },
200 BeaconEngineMessage::ForkchoiceUpdated {
202 state: reorg_forkchoice_state,
203 payload_attrs: None,
204 tx: reorg_fcu_tx,
205 version: EngineApiMessageVersion::default(),
206 },
207 ]);
208 *this.state = EngineReorgState::Reorg { queue };
209 continue
210 }
211 (
212 Some(BeaconEngineMessage::ForkchoiceUpdated {
213 state,
214 payload_attrs,
215 tx,
216 version,
217 }),
218 _,
219 ) => {
220 *this.last_forkchoice_state = Some(state);
224 *this.forkchoice_states_forwarded += 1;
225 Some(BeaconEngineMessage::ForkchoiceUpdated {
226 state,
227 payload_attrs,
228 tx,
229 version,
230 })
231 }
232 (item, _) => item,
233 };
234 return Poll::Ready(item)
235 }
236 }
237}
238
239fn create_reorg_head<Provider, Evm, T, Validator>(
240 provider: &Provider,
241 evm_config: &Evm,
242 payload_validator: &Validator,
243 mut depth: usize,
244 next_payload: T::ExecutionData,
245) -> RethResult<SealedBlock<BlockTy<Evm::Primitives>>>
246where
247 Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
248 + StateProviderFactory
249 + ChainSpecProvider<ChainSpec: EthChainSpec>,
250 Evm: ConfigureEvm,
251 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = Evm::Primitives>>,
252 Validator: EngineValidator<T, Evm::Primitives>,
253{
254 let next_block =
256 payload_validator.ensure_well_formed_payload(next_payload).map_err(RethError::msg)?;
257
258 let mut previous_hash = next_block.parent_hash();
260 let mut candidate_transactions = next_block.into_body().transactions().to_vec();
261 let reorg_target = 'target: {
262 loop {
263 let reorg_target = provider
264 .block_by_hash(previous_hash)?
265 .ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
266 if depth == 0 {
267 break 'target reorg_target.seal_slow()
268 }
269
270 depth -= 1;
271 previous_hash = reorg_target.header().parent_hash();
272 candidate_transactions = reorg_target.into_body().into_transactions();
273 }
274 };
275 let reorg_target_parent = provider
276 .sealed_header_by_hash(reorg_target.header().parent_hash())?
277 .ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.header().parent_hash().into()))?;
278
279 debug!(target: "engine::stream::reorg", number = reorg_target.header().number(), hash = %previous_hash, "Selected reorg target");
280
281 let state_provider = provider.state_by_block_hash(reorg_target.header().parent_hash())?;
283 let mut state = State::builder()
284 .with_database_ref(StateProviderDatabase::new(&state_provider))
285 .with_bundle_update()
286 .build();
287
288 let ctx = evm_config.context_for_block(&reorg_target);
289 let evm = evm_config.evm_for_block(&mut state, &reorg_target);
290 let mut builder = evm_config.create_block_builder(evm, &reorg_target_parent, ctx);
291
292 builder.apply_pre_execution_changes()?;
293
294 let mut cumulative_gas_used = 0;
295 for tx in candidate_transactions {
296 if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit() {
298 continue
299 }
300
301 let tx_recovered =
302 tx.try_into_recovered().map_err(|_| ProviderError::SenderRecoveryError)?;
303 let gas_used = match builder.execute_transaction(tx_recovered) {
304 Ok(gas_used) => gas_used,
305 Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
306 hash,
307 error,
308 })) => {
309 trace!(target: "engine::stream::reorg", hash = %hash, ?error, "Error executing transaction from next block");
310 continue
311 }
312 Err(error) => return Err(RethError::Execution(error)),
314 };
315
316 cumulative_gas_used += gas_used;
317 }
318
319 let BlockBuilderOutcome { block, .. } = builder.finish(&state_provider)?;
320
321 Ok(block.into_sealed_block())
322}