Skip to main content

reth_engine_util/
reorg.rs

1//! Stream wrapper that simulates reorgs.
2
3use alloy_consensus::{BlockHeader, Transaction};
4use alloy_primitives::Bytes;
5use alloy_rpc_types_engine::{ForkchoiceState, PayloadStatus};
6use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
7use itertools::Either;
8use reth_chainspec::{ChainSpecProvider, EthChainSpec};
9use reth_engine_primitives::{
10    BeaconEngineMessage, BeaconOnNewPayloadError, ExecutionPayload as _, OnForkChoiceUpdated,
11};
12use reth_engine_tree::tree::EngineValidator;
13use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
14use reth_evm::{
15    execute::{BlockBuilder, BlockBuilderOutcome},
16    ConfigureEvm,
17};
18use reth_payload_primitives::{BuiltPayload, PayloadTypes};
19use reth_primitives_traits::{
20    block::Block as _, BlockBody as _, BlockTy, HeaderTy, SealedBlock, SignedTransaction,
21};
22use reth_revm::{database::StateProviderDatabase, db::State};
23use reth_storage_api::{errors::ProviderError, BlockReader, StateProviderFactory};
24use std::{
25    collections::VecDeque,
26    future::Future,
27    pin::Pin,
28    task::{ready, Context, Poll},
29};
30use tokio::sync::oneshot;
31use tracing::*;
32
33#[derive(Debug)]
34enum EngineReorgState<T: PayloadTypes> {
35    Forward,
36    Reorg { queue: VecDeque<BeaconEngineMessage<T>> },
37}
38
39type EngineReorgResponse = Result<
40    Either<Result<PayloadStatus, BeaconOnNewPayloadError>, RethResult<OnForkChoiceUpdated>>,
41    oneshot::error::RecvError,
42>;
43
44type ReorgResponseFut = Pin<Box<dyn Future<Output = EngineReorgResponse> + Send + Sync>>;
45
46/// Engine API stream wrapper that simulates reorgs with specified frequency.
47#[derive(Debug)]
48#[pin_project::pin_project]
49pub struct EngineReorg<S, T: PayloadTypes, Provider, Evm, Validator> {
50    /// Underlying stream
51    #[pin]
52    stream: S,
53    /// Database provider.
54    provider: Provider,
55    /// Evm configuration.
56    evm_config: Evm,
57    /// Payload validator.
58    payload_validator: Validator,
59    /// The frequency of reorgs.
60    frequency: usize,
61    /// The depth of reorgs.
62    depth: usize,
63    /// The number of forwarded forkchoice states.
64    /// This is reset after a reorg.
65    forkchoice_states_forwarded: usize,
66    /// Current state of the stream.
67    state: EngineReorgState<T>,
68    /// Last forkchoice state.
69    last_forkchoice_state: Option<ForkchoiceState>,
70    /// Pending engine responses to reorg messages.
71    reorg_responses: FuturesUnordered<ReorgResponseFut>,
72}
73
74impl<S, T: PayloadTypes, Provider, Evm, Validator> EngineReorg<S, T, Provider, Evm, Validator> {
75    /// Creates new [`EngineReorg`] stream wrapper.
76    pub fn new(
77        stream: S,
78        provider: Provider,
79        evm_config: Evm,
80        payload_validator: Validator,
81        frequency: usize,
82        depth: usize,
83    ) -> Self {
84        Self {
85            stream,
86            provider,
87            evm_config,
88            payload_validator,
89            frequency,
90            depth,
91            state: EngineReorgState::Forward,
92            forkchoice_states_forwarded: 0,
93            last_forkchoice_state: None,
94            reorg_responses: FuturesUnordered::new(),
95        }
96    }
97}
98
99impl<S, T, Provider, Evm, Validator> Stream for EngineReorg<S, T, Provider, Evm, Validator>
100where
101    S: Stream<Item = BeaconEngineMessage<T>>,
102    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = Evm::Primitives>>,
103    Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
104        + StateProviderFactory
105        + ChainSpecProvider,
106    Evm: ConfigureEvm,
107    Validator: EngineValidator<T, Evm::Primitives>,
108{
109    type Item = S::Item;
110
111    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112        let mut this = self.project();
113
114        loop {
115            if let Poll::Ready(Some(response)) = this.reorg_responses.poll_next_unpin(cx) {
116                match response {
117                    Ok(Either::Left(Ok(payload_status))) => {
118                        debug!(target: "engine::stream::reorg", ?payload_status, "Received response for reorg new payload");
119                    }
120                    Ok(Either::Left(Err(payload_error))) => {
121                        error!(target: "engine::stream::reorg", %payload_error, "Error on reorg new payload");
122                    }
123                    Ok(Either::Right(Ok(fcu_status))) => {
124                        debug!(target: "engine::stream::reorg", ?fcu_status, "Received response for reorg forkchoice update");
125                    }
126                    Ok(Either::Right(Err(fcu_error))) => {
127                        error!(target: "engine::stream::reorg", %fcu_error, "Error on reorg forkchoice update");
128                    }
129                    Err(_) => {}
130                };
131                continue
132            }
133
134            if let EngineReorgState::Reorg { queue } = &mut this.state {
135                match queue.pop_front() {
136                    Some(msg) => return Poll::Ready(Some(msg)),
137                    None => {
138                        *this.forkchoice_states_forwarded = 0;
139                        *this.state = EngineReorgState::Forward;
140                    }
141                }
142            }
143
144            let next = ready!(this.stream.poll_next_unpin(cx));
145            let item = match (next, &this.last_forkchoice_state) {
146                (
147                    Some(BeaconEngineMessage::NewPayload { payload, tx }),
148                    Some(last_forkchoice_state),
149                ) if this.forkchoice_states_forwarded > this.frequency &&
150                        // Only enter reorg state if new payload attaches to current head.
151                        last_forkchoice_state.head_block_hash == payload.parent_hash() =>
152                {
153                    // Enter the reorg state.
154                    // The current payload will be immediately forwarded by being in front of the
155                    // queue. Then we attempt to reorg the current head by generating a payload that
156                    // attaches to the head's parent and is based on the non-conflicting
157                    // transactions (txs from block `n + 1` that are valid at block `n` according to
158                    // consensus checks) from the current payload as well as the corresponding
159                    // forkchoice state. We will rely on CL to reorg us back to canonical chain.
160                    // TODO: This is an expensive blocking operation, ideally it's spawned as a task
161                    // so that the stream could yield the control back.
162                    let (reorg_block, encoded_bal) = match create_reorg_head(
163                        this.provider,
164                        this.evm_config,
165                        this.payload_validator,
166                        *this.depth,
167                        payload.clone(),
168                    ) {
169                        Ok(result) => result,
170                        Err(error) => {
171                            error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
172                            // Forward the payload and attempt to create reorg on top of
173                            // the next one
174                            return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
175                                payload,
176                                tx,
177                            }))
178                        }
179                    };
180                    let reorg_forkchoice_state = ForkchoiceState {
181                        finalized_block_hash: last_forkchoice_state.finalized_block_hash,
182                        safe_block_hash: last_forkchoice_state.safe_block_hash,
183                        head_block_hash: reorg_block.hash(),
184                    };
185
186                    let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
187                    let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
188                    this.reorg_responses.extend([
189                        Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
190                        Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
191                    ]);
192
193                    let queue = VecDeque::from([
194                        // Current payload
195                        BeaconEngineMessage::NewPayload { payload, tx },
196                        // Reorg payload
197                        BeaconEngineMessage::NewPayload {
198                            payload: T::block_to_payload(reorg_block, encoded_bal),
199                            tx: reorg_payload_tx,
200                        },
201                        // Reorg forkchoice state
202                        BeaconEngineMessage::ForkchoiceUpdated {
203                            state: reorg_forkchoice_state,
204                            payload_attrs: None,
205                            tx: reorg_fcu_tx,
206                        },
207                    ]);
208                    *this.state = EngineReorgState::Reorg { queue };
209                    continue
210                }
211                (Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }), _) => {
212                    // Record last forkchoice state forwarded to the engine.
213                    // We do not care if it's valid since engine should be able to handle
214                    // reorgs that rely on invalid forkchoice state.
215                    *this.last_forkchoice_state = Some(state);
216                    *this.forkchoice_states_forwarded += 1;
217                    Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
218                }
219                (item, _) => item,
220            };
221            return Poll::Ready(item)
222        }
223    }
224}
225
226#[allow(clippy::type_complexity)]
227fn create_reorg_head<Provider, Evm, T, Validator>(
228    provider: &Provider,
229    evm_config: &Evm,
230    payload_validator: &Validator,
231    mut depth: usize,
232    next_payload: T::ExecutionData,
233) -> RethResult<(SealedBlock<BlockTy<Evm::Primitives>>, Option<Bytes>)>
234where
235    Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
236        + StateProviderFactory
237        + ChainSpecProvider<ChainSpec: EthChainSpec>,
238    Evm: ConfigureEvm,
239    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = Evm::Primitives>>,
240    Validator: EngineValidator<T, Evm::Primitives>,
241{
242    // Ensure next payload is valid.
243    let next_block =
244        payload_validator.convert_payload_to_block(next_payload).map_err(RethError::msg)?;
245
246    // Fetch reorg target block depending on its depth and its parent.
247    let mut previous_hash = next_block.parent_hash();
248    let mut candidate_transactions = next_block.into_body().transactions().to_vec();
249    let reorg_target = 'target: {
250        loop {
251            let reorg_target = provider
252                .block_by_hash(previous_hash)?
253                .ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
254            if depth == 0 {
255                break 'target reorg_target.seal_slow()
256            }
257
258            depth -= 1;
259            previous_hash = reorg_target.header().parent_hash();
260            candidate_transactions = reorg_target.into_body().into_transactions();
261        }
262    };
263    let reorg_target_parent = provider
264        .sealed_header_by_hash(reorg_target.header().parent_hash())?
265        .ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.header().parent_hash().into()))?;
266
267    debug!(target: "engine::stream::reorg", number = reorg_target.header().number(), hash = %previous_hash, "Selected reorg target");
268
269    // Configure state
270    let has_bal = reorg_target.header().block_access_list_hash().is_some();
271    let state_provider = provider.state_by_block_hash(reorg_target.header().parent_hash())?;
272    let mut state = State::builder()
273        .with_database_ref(StateProviderDatabase::new(&state_provider))
274        .with_bundle_update()
275        .with_bal_builder_if(has_bal)
276        .build();
277
278    let ctx = evm_config.context_for_block(&reorg_target).map_err(RethError::other)?;
279    let evm = evm_config.evm_for_block(&mut state, &reorg_target).map_err(RethError::other)?;
280    let mut builder = evm_config.create_block_builder(evm, &reorg_target_parent, ctx);
281
282    builder.apply_pre_execution_changes()?;
283
284    let mut cumulative_gas_used = 0;
285    for tx in candidate_transactions {
286        // ensure we still have capacity for this transaction
287        if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit() {
288            continue
289        }
290
291        let tx_recovered =
292            tx.try_into_recovered().map_err(|_| ProviderError::SenderRecoveryError)?;
293        let gas_used = match builder.execute_transaction(tx_recovered) {
294            Ok(gas_used) => gas_used.tx_gas_used(),
295            Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
296                hash,
297                error,
298            })) => {
299                trace!(target: "engine::stream::reorg", hash = %hash, ?error, "Error executing transaction from next block");
300                continue
301            }
302            // Treat error as fatal
303            Err(error) => return Err(RethError::Execution(error)),
304        };
305
306        cumulative_gas_used += gas_used;
307    }
308
309    let BlockBuilderOutcome { block, block_access_list, .. } =
310        builder.finish(&state_provider, None)?;
311
312    let encoded_bal: Option<Bytes> = block_access_list.map(|bal| alloy_rlp::encode(&bal).into());
313
314    Ok((block.into_sealed_block(), encoded_bal))
315}