Skip to main content

reth_engine_util/
reorg.rs

1//! Stream wrapper that simulates reorgs.
2
3use alloy_consensus::{BlockHeader, Transaction};
4use alloy_rpc_types_engine::{ForkchoiceState, PayloadStatus};
5use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
6use itertools::Either;
7use reth_chainspec::{ChainSpecProvider, EthChainSpec};
8use reth_engine_primitives::{
9    BeaconEngineMessage, BeaconOnNewPayloadError, ExecutionPayload as _, OnForkChoiceUpdated,
10};
11use reth_engine_tree::tree::EngineValidator;
12use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
13use reth_evm::{
14    execute::{BlockBuilder, BlockBuilderOutcome},
15    ConfigureEvm,
16};
17use reth_payload_primitives::{BuiltPayload, PayloadTypes};
18use reth_primitives_traits::{
19    block::Block as _, BlockBody as _, BlockTy, HeaderTy, SealedBlock, SignedTransaction,
20};
21use reth_revm::{database::StateProviderDatabase, db::State};
22use reth_storage_api::{errors::ProviderError, BlockReader, StateProviderFactory};
23use std::{
24    collections::VecDeque,
25    future::Future,
26    pin::Pin,
27    task::{ready, Context, Poll},
28};
29use tokio::sync::oneshot;
30use tracing::*;
31
32#[derive(Debug)]
33enum EngineReorgState<T: PayloadTypes> {
34    Forward,
35    Reorg { queue: VecDeque<BeaconEngineMessage<T>> },
36}
37
38type EngineReorgResponse = Result<
39    Either<Result<PayloadStatus, BeaconOnNewPayloadError>, RethResult<OnForkChoiceUpdated>>,
40    oneshot::error::RecvError,
41>;
42
43type ReorgResponseFut = Pin<Box<dyn Future<Output = EngineReorgResponse> + Send + Sync>>;
44
45/// Engine API stream wrapper that simulates reorgs with specified frequency.
46#[derive(Debug)]
47#[pin_project::pin_project]
48pub struct EngineReorg<S, T: PayloadTypes, Provider, Evm, Validator> {
49    /// Underlying stream
50    #[pin]
51    stream: S,
52    /// Database provider.
53    provider: Provider,
54    /// Evm configuration.
55    evm_config: Evm,
56    /// Payload validator.
57    payload_validator: Validator,
58    /// The frequency of reorgs.
59    frequency: usize,
60    /// The depth of reorgs.
61    depth: usize,
62    /// The number of forwarded forkchoice states.
63    /// This is reset after a reorg.
64    forkchoice_states_forwarded: usize,
65    /// Current state of the stream.
66    state: EngineReorgState<T>,
67    /// Last forkchoice state.
68    last_forkchoice_state: Option<ForkchoiceState>,
69    /// Pending engine responses to reorg messages.
70    reorg_responses: FuturesUnordered<ReorgResponseFut>,
71}
72
73impl<S, T: PayloadTypes, Provider, Evm, Validator> EngineReorg<S, T, Provider, Evm, Validator> {
74    /// Creates new [`EngineReorg`] stream wrapper.
75    pub fn new(
76        stream: S,
77        provider: Provider,
78        evm_config: Evm,
79        payload_validator: Validator,
80        frequency: usize,
81        depth: usize,
82    ) -> Self {
83        Self {
84            stream,
85            provider,
86            evm_config,
87            payload_validator,
88            frequency,
89            depth,
90            state: EngineReorgState::Forward,
91            forkchoice_states_forwarded: 0,
92            last_forkchoice_state: None,
93            reorg_responses: FuturesUnordered::new(),
94        }
95    }
96}
97
98impl<S, T, Provider, Evm, Validator> Stream for EngineReorg<S, T, Provider, Evm, Validator>
99where
100    S: Stream<Item = BeaconEngineMessage<T>>,
101    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = Evm::Primitives>>,
102    Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
103        + StateProviderFactory
104        + ChainSpecProvider,
105    Evm: ConfigureEvm,
106    Validator: EngineValidator<T, Evm::Primitives>,
107{
108    type Item = S::Item;
109
110    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
111        let mut this = self.project();
112
113        loop {
114            if let Poll::Ready(Some(response)) = this.reorg_responses.poll_next_unpin(cx) {
115                match response {
116                    Ok(Either::Left(Ok(payload_status))) => {
117                        debug!(target: "engine::stream::reorg", ?payload_status, "Received response for reorg new payload");
118                    }
119                    Ok(Either::Left(Err(payload_error))) => {
120                        error!(target: "engine::stream::reorg", %payload_error, "Error on reorg new payload");
121                    }
122                    Ok(Either::Right(Ok(fcu_status))) => {
123                        debug!(target: "engine::stream::reorg", ?fcu_status, "Received response for reorg forkchoice update");
124                    }
125                    Ok(Either::Right(Err(fcu_error))) => {
126                        error!(target: "engine::stream::reorg", %fcu_error, "Error on reorg forkchoice update");
127                    }
128                    Err(_) => {}
129                };
130                continue
131            }
132
133            if let EngineReorgState::Reorg { queue } = &mut this.state {
134                match queue.pop_front() {
135                    Some(msg) => return Poll::Ready(Some(msg)),
136                    None => {
137                        *this.forkchoice_states_forwarded = 0;
138                        *this.state = EngineReorgState::Forward;
139                    }
140                }
141            }
142
143            let next = ready!(this.stream.poll_next_unpin(cx));
144            let item = match (next, &this.last_forkchoice_state) {
145                (
146                    Some(BeaconEngineMessage::NewPayload { payload, tx }),
147                    Some(last_forkchoice_state),
148                ) if this.forkchoice_states_forwarded > this.frequency &&
149                        // Only enter reorg state if new payload attaches to current head.
150                        last_forkchoice_state.head_block_hash == payload.parent_hash() =>
151                {
152                    // Enter the reorg state.
153                    // The current payload will be immediately forwarded by being in front of the
154                    // queue. Then we attempt to reorg the current head by generating a payload that
155                    // attaches to the head's parent and is based on the non-conflicting
156                    // transactions (txs from block `n + 1` that are valid at block `n` according to
157                    // consensus checks) from the current payload as well as the corresponding
158                    // forkchoice state. We will rely on CL to reorg us back to canonical chain.
159                    // TODO: This is an expensive blocking operation, ideally it's spawned as a task
160                    // so that the stream could yield the control back.
161                    let reorg_block = match create_reorg_head(
162                        this.provider,
163                        this.evm_config,
164                        this.payload_validator,
165                        *this.depth,
166                        payload.clone(),
167                    ) {
168                        Ok(result) => result,
169                        Err(error) => {
170                            error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
171                            // Forward the payload and attempt to create reorg on top of
172                            // the next one
173                            return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
174                                payload,
175                                tx,
176                            }))
177                        }
178                    };
179                    let reorg_forkchoice_state = ForkchoiceState {
180                        finalized_block_hash: last_forkchoice_state.finalized_block_hash,
181                        safe_block_hash: last_forkchoice_state.safe_block_hash,
182                        head_block_hash: reorg_block.hash(),
183                    };
184
185                    let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
186                    let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
187                    this.reorg_responses.extend([
188                        Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
189                        Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
190                    ]);
191
192                    let queue = VecDeque::from([
193                        // Current payload
194                        BeaconEngineMessage::NewPayload { payload, tx },
195                        // Reorg payload
196                        BeaconEngineMessage::NewPayload {
197                            payload: T::block_to_payload(reorg_block),
198                            tx: reorg_payload_tx,
199                        },
200                        // Reorg forkchoice state
201                        BeaconEngineMessage::ForkchoiceUpdated {
202                            state: reorg_forkchoice_state,
203                            payload_attrs: None,
204                            tx: reorg_fcu_tx,
205                        },
206                    ]);
207                    *this.state = EngineReorgState::Reorg { queue };
208                    continue
209                }
210                (Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }), _) => {
211                    // Record last forkchoice state forwarded to the engine.
212                    // We do not care if it's valid since engine should be able to handle
213                    // reorgs that rely on invalid forkchoice state.
214                    *this.last_forkchoice_state = Some(state);
215                    *this.forkchoice_states_forwarded += 1;
216                    Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
217                }
218                (item, _) => item,
219            };
220            return Poll::Ready(item)
221        }
222    }
223}
224
225fn create_reorg_head<Provider, Evm, T, Validator>(
226    provider: &Provider,
227    evm_config: &Evm,
228    payload_validator: &Validator,
229    mut depth: usize,
230    next_payload: T::ExecutionData,
231) -> RethResult<SealedBlock<BlockTy<Evm::Primitives>>>
232where
233    Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
234        + StateProviderFactory
235        + ChainSpecProvider<ChainSpec: EthChainSpec>,
236    Evm: ConfigureEvm,
237    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = Evm::Primitives>>,
238    Validator: EngineValidator<T, Evm::Primitives>,
239{
240    // Ensure next payload is valid.
241    let next_block =
242        payload_validator.convert_payload_to_block(next_payload).map_err(RethError::msg)?;
243
244    // Fetch reorg target block depending on its depth and its parent.
245    let mut previous_hash = next_block.parent_hash();
246    let mut candidate_transactions = next_block.into_body().transactions().to_vec();
247    let reorg_target = 'target: {
248        loop {
249            let reorg_target = provider
250                .block_by_hash(previous_hash)?
251                .ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
252            if depth == 0 {
253                break 'target reorg_target.seal_slow()
254            }
255
256            depth -= 1;
257            previous_hash = reorg_target.header().parent_hash();
258            candidate_transactions = reorg_target.into_body().into_transactions();
259        }
260    };
261    let reorg_target_parent = provider
262        .sealed_header_by_hash(reorg_target.header().parent_hash())?
263        .ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.header().parent_hash().into()))?;
264
265    debug!(target: "engine::stream::reorg", number = reorg_target.header().number(), hash = %previous_hash, "Selected reorg target");
266
267    // Configure state
268    let state_provider = provider.state_by_block_hash(reorg_target.header().parent_hash())?;
269    let mut state = State::builder()
270        .with_database_ref(StateProviderDatabase::new(&state_provider))
271        .with_bundle_update()
272        .build();
273
274    let ctx = evm_config.context_for_block(&reorg_target).map_err(RethError::other)?;
275    let evm = evm_config.evm_for_block(&mut state, &reorg_target).map_err(RethError::other)?;
276    let mut builder = evm_config.create_block_builder(evm, &reorg_target_parent, ctx);
277
278    builder.apply_pre_execution_changes()?;
279
280    let mut cumulative_gas_used = 0;
281    for tx in candidate_transactions {
282        // ensure we still have capacity for this transaction
283        if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit() {
284            continue
285        }
286
287        let tx_recovered =
288            tx.try_into_recovered().map_err(|_| ProviderError::SenderRecoveryError)?;
289        let gas_used = match builder.execute_transaction(tx_recovered) {
290            Ok(gas_used) => gas_used,
291            Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
292                hash,
293                error,
294            })) => {
295                trace!(target: "engine::stream::reorg", hash = %hash, ?error, "Error executing transaction from next block");
296                continue
297            }
298            // Treat error as fatal
299            Err(error) => return Err(RethError::Execution(error)),
300        };
301
302        cumulative_gas_used += gas_used;
303    }
304
305    let BlockBuilderOutcome { block, .. } = builder.finish(&state_provider)?;
306
307    Ok(block.into_sealed_block())
308}