reth_engine_util/
reorg.rs

1//! Stream wrapper that simulates reorgs.
2
3use alloy_consensus::{BlockHeader, Transaction};
4use alloy_rpc_types_engine::{ForkchoiceState, PayloadStatus};
5use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
6use itertools::Either;
7use reth_chainspec::{ChainSpecProvider, EthChainSpec};
8use reth_engine_primitives::{
9    BeaconEngineMessage, BeaconOnNewPayloadError, ExecutionPayload as _, OnForkChoiceUpdated,
10    PayloadValidator,
11};
12use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
13use reth_evm::{
14    execute::{BlockBuilder, BlockBuilderOutcome},
15    ConfigureEvm,
16};
17use reth_payload_primitives::{BuiltPayload, EngineApiMessageVersion, PayloadTypes};
18use reth_primitives_traits::{
19    block::Block as _, BlockBody as _, BlockTy, HeaderTy, SealedBlock, SignedTransaction,
20};
21use reth_revm::{database::StateProviderDatabase, db::State};
22use reth_storage_api::{errors::ProviderError, BlockReader, StateProviderFactory};
23use std::{
24    collections::VecDeque,
25    future::Future,
26    pin::Pin,
27    task::{ready, Context, Poll},
28};
29use tokio::sync::oneshot;
30use tracing::*;
31
32#[derive(Debug)]
33enum EngineReorgState<T: PayloadTypes> {
34    Forward,
35    Reorg { queue: VecDeque<BeaconEngineMessage<T>> },
36}
37
38type EngineReorgResponse = Result<
39    Either<Result<PayloadStatus, BeaconOnNewPayloadError>, RethResult<OnForkChoiceUpdated>>,
40    oneshot::error::RecvError,
41>;
42
43type ReorgResponseFut = Pin<Box<dyn Future<Output = EngineReorgResponse> + Send + Sync>>;
44
45/// Engine API stream wrapper that simulates reorgs with specified frequency.
46#[derive(Debug)]
47#[pin_project::pin_project]
48pub struct EngineReorg<S, T: PayloadTypes, Provider, Evm, Validator> {
49    /// Underlying stream
50    #[pin]
51    stream: S,
52    /// Database provider.
53    provider: Provider,
54    /// Evm configuration.
55    evm_config: Evm,
56    /// Payload validator.
57    payload_validator: Validator,
58    /// The frequency of reorgs.
59    frequency: usize,
60    /// The depth of reorgs.
61    depth: usize,
62    /// The number of forwarded forkchoice states.
63    /// This is reset after a reorg.
64    forkchoice_states_forwarded: usize,
65    /// Current state of the stream.
66    state: EngineReorgState<T>,
67    /// Last forkchoice state.
68    last_forkchoice_state: Option<ForkchoiceState>,
69    /// Pending engine responses to reorg messages.
70    reorg_responses: FuturesUnordered<ReorgResponseFut>,
71}
72
73impl<S, T: PayloadTypes, Provider, Evm, Validator> EngineReorg<S, T, Provider, Evm, Validator> {
74    /// Creates new [`EngineReorg`] stream wrapper.
75    pub fn new(
76        stream: S,
77        provider: Provider,
78        evm_config: Evm,
79        payload_validator: Validator,
80        frequency: usize,
81        depth: usize,
82    ) -> Self {
83        Self {
84            stream,
85            provider,
86            evm_config,
87            payload_validator,
88            frequency,
89            depth,
90            state: EngineReorgState::Forward,
91            forkchoice_states_forwarded: 0,
92            last_forkchoice_state: None,
93            reorg_responses: FuturesUnordered::new(),
94        }
95    }
96}
97
98impl<S, T, Provider, Evm, Validator> Stream for EngineReorg<S, T, Provider, Evm, Validator>
99where
100    S: Stream<Item = BeaconEngineMessage<T>>,
101    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = Evm::Primitives>>,
102    Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
103        + StateProviderFactory
104        + ChainSpecProvider,
105    Evm: ConfigureEvm,
106    Validator: PayloadValidator<ExecutionData = T::ExecutionData, Block = BlockTy<Evm::Primitives>>,
107{
108    type Item = S::Item;
109
110    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
111        let mut this = self.project();
112
113        loop {
114            if let Poll::Ready(Some(response)) = this.reorg_responses.poll_next_unpin(cx) {
115                match response {
116                    Ok(Either::Left(Ok(payload_status))) => {
117                        debug!(target: "engine::stream::reorg", ?payload_status, "Received response for reorg new payload");
118                    }
119                    Ok(Either::Left(Err(payload_error))) => {
120                        error!(target: "engine::stream::reorg", %payload_error, "Error on reorg new payload");
121                    }
122                    Ok(Either::Right(Ok(fcu_status))) => {
123                        debug!(target: "engine::stream::reorg", ?fcu_status, "Received response for reorg forkchoice update");
124                    }
125                    Ok(Either::Right(Err(fcu_error))) => {
126                        error!(target: "engine::stream::reorg", %fcu_error, "Error on reorg forkchoice update");
127                    }
128                    Err(_) => {}
129                };
130                continue
131            }
132
133            if let EngineReorgState::Reorg { queue } = &mut this.state {
134                match queue.pop_front() {
135                    Some(msg) => return Poll::Ready(Some(msg)),
136                    None => {
137                        *this.forkchoice_states_forwarded = 0;
138                        *this.state = EngineReorgState::Forward;
139                    }
140                }
141            }
142
143            let next = ready!(this.stream.poll_next_unpin(cx));
144            let item = match (next, &this.last_forkchoice_state) {
145                (
146                    Some(BeaconEngineMessage::NewPayload { payload, tx }),
147                    Some(last_forkchoice_state),
148                ) if this.forkchoice_states_forwarded > this.frequency &&
149                        // Only enter reorg state if new payload attaches to current head.
150                        last_forkchoice_state.head_block_hash == payload.parent_hash() =>
151                {
152                    // Enter the reorg state.
153                    // The current payload will be immediately forwarded by being in front of the
154                    // queue. Then we attempt to reorg the current head by generating a payload that
155                    // attaches to the head's parent and is based on the non-conflicting
156                    // transactions (txs from block `n + 1` that are valid at block `n` according to
157                    // consensus checks) from the current payload as well as the corresponding
158                    // forkchoice state. We will rely on CL to reorg us back to canonical chain.
159                    // TODO: This is an expensive blocking operation, ideally it's spawned as a task
160                    // so that the stream could yield the control back.
161                    let reorg_block = match create_reorg_head(
162                        this.provider,
163                        this.evm_config,
164                        this.payload_validator,
165                        *this.depth,
166                        payload.clone(),
167                    ) {
168                        Ok(result) => result,
169                        Err(error) => {
170                            error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
171                            // Forward the payload and attempt to create reorg on top of
172                            // the next one
173                            return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
174                                payload,
175                                tx,
176                            }))
177                        }
178                    };
179                    let reorg_forkchoice_state = ForkchoiceState {
180                        finalized_block_hash: last_forkchoice_state.finalized_block_hash,
181                        safe_block_hash: last_forkchoice_state.safe_block_hash,
182                        head_block_hash: reorg_block.hash(),
183                    };
184
185                    let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
186                    let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
187                    this.reorg_responses.extend([
188                        Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
189                        Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
190                    ]);
191
192                    let queue = VecDeque::from([
193                        // Current payload
194                        BeaconEngineMessage::NewPayload { payload, tx },
195                        // Reorg payload
196                        BeaconEngineMessage::NewPayload {
197                            payload: T::block_to_payload(reorg_block),
198                            tx: reorg_payload_tx,
199                        },
200                        // Reorg forkchoice state
201                        BeaconEngineMessage::ForkchoiceUpdated {
202                            state: reorg_forkchoice_state,
203                            payload_attrs: None,
204                            tx: reorg_fcu_tx,
205                            version: EngineApiMessageVersion::default(),
206                        },
207                    ]);
208                    *this.state = EngineReorgState::Reorg { queue };
209                    continue
210                }
211                (
212                    Some(BeaconEngineMessage::ForkchoiceUpdated {
213                        state,
214                        payload_attrs,
215                        tx,
216                        version,
217                    }),
218                    _,
219                ) => {
220                    // Record last forkchoice state forwarded to the engine.
221                    // We do not care if it's valid since engine should be able to handle
222                    // reorgs that rely on invalid forkchoice state.
223                    *this.last_forkchoice_state = Some(state);
224                    *this.forkchoice_states_forwarded += 1;
225                    Some(BeaconEngineMessage::ForkchoiceUpdated {
226                        state,
227                        payload_attrs,
228                        tx,
229                        version,
230                    })
231                }
232                (item, _) => item,
233            };
234            return Poll::Ready(item)
235        }
236    }
237}
238
239fn create_reorg_head<Provider, Evm, Validator>(
240    provider: &Provider,
241    evm_config: &Evm,
242    payload_validator: &Validator,
243    mut depth: usize,
244    next_payload: Validator::ExecutionData,
245) -> RethResult<SealedBlock<BlockTy<Evm::Primitives>>>
246where
247    Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
248        + StateProviderFactory
249        + ChainSpecProvider<ChainSpec: EthChainSpec>,
250    Evm: ConfigureEvm,
251    Validator: PayloadValidator<Block = BlockTy<Evm::Primitives>>,
252{
253    // Ensure next payload is valid.
254    let next_block =
255        payload_validator.ensure_well_formed_payload(next_payload).map_err(RethError::msg)?;
256
257    // Fetch reorg target block depending on its depth and its parent.
258    let mut previous_hash = next_block.parent_hash();
259    let mut candidate_transactions = next_block.into_body().transactions().to_vec();
260    let reorg_target = 'target: {
261        loop {
262            let reorg_target = provider
263                .block_by_hash(previous_hash)?
264                .ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
265            if depth == 0 {
266                break 'target reorg_target.seal_slow()
267            }
268
269            depth -= 1;
270            previous_hash = reorg_target.header().parent_hash();
271            candidate_transactions = reorg_target.into_body().into_transactions();
272        }
273    };
274    let reorg_target_parent = provider
275        .sealed_header_by_hash(reorg_target.header().parent_hash())?
276        .ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.header().parent_hash().into()))?;
277
278    debug!(target: "engine::stream::reorg", number = reorg_target.header().number(), hash = %previous_hash, "Selected reorg target");
279
280    // Configure state
281    let state_provider = provider.state_by_block_hash(reorg_target.header().parent_hash())?;
282    let mut state = State::builder()
283        .with_database_ref(StateProviderDatabase::new(&state_provider))
284        .with_bundle_update()
285        .build();
286
287    let ctx = evm_config.context_for_block(&reorg_target);
288    let evm = evm_config.evm_for_block(&mut state, &reorg_target);
289    let mut builder = evm_config.create_block_builder(evm, &reorg_target_parent, ctx);
290
291    builder.apply_pre_execution_changes()?;
292
293    let mut cumulative_gas_used = 0;
294    for tx in candidate_transactions {
295        // ensure we still have capacity for this transaction
296        if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit() {
297            continue
298        }
299
300        let tx_recovered =
301            tx.try_clone_into_recovered().map_err(|_| ProviderError::SenderRecoveryError)?;
302        let gas_used = match builder.execute_transaction(tx_recovered) {
303            Ok(gas_used) => gas_used,
304            Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
305                hash,
306                error,
307            })) => {
308                trace!(target: "engine::stream::reorg", hash = %hash, ?error, "Error executing transaction from next block");
309                continue
310            }
311            // Treat error as fatal
312            Err(error) => return Err(RethError::Execution(error)),
313        };
314
315        cumulative_gas_used += gas_used;
316    }
317
318    let BlockBuilderOutcome { block, .. } = builder.finish(&state_provider)?;
319
320    Ok(block.into_sealed_block())
321}