reth_e2e_test_utils/testsuite/
actions.rs

1//! Actions that can be performed in tests.
2
3use crate::testsuite::Environment;
4use alloy_primitives::{Bytes, B256};
5use alloy_rpc_types_engine::{
6    ExecutionPayloadV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
7};
8use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction};
9use eyre::Result;
10use futures_util::future::BoxFuture;
11use reth_node_api::{EngineTypes, PayloadTypes};
12use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
13use std::{future::Future, marker::PhantomData, time::Duration};
14use tokio::time::sleep;
15use tracing::debug;
16
17/// An action that can be performed on an instance.
18///
19/// Actions execute operations and potentially make assertions in a single step.
20/// The action name indicates what it does (e.g., `AssertMineBlock` would both
21/// mine a block and assert it worked).
22pub trait Action<I>: Send + 'static {
23    /// Executes the action
24    fn execute<'a>(&'a mut self, env: &'a mut Environment<I>) -> BoxFuture<'a, Result<()>>;
25}
26
27/// Simplified action container for storage in tests
28#[expect(missing_debug_implementations)]
29pub struct ActionBox<I>(Box<dyn Action<I>>);
30
31impl<I: 'static> ActionBox<I> {
32    /// Constructor for [`ActionBox`].
33    pub fn new<A: Action<I>>(action: A) -> Self {
34        Self(Box::new(action))
35    }
36
37    /// Executes an [`ActionBox`] with the given [`Environment`] reference.
38    pub async fn execute(mut self, env: &mut Environment<I>) -> Result<()> {
39        self.0.execute(env).await
40    }
41}
42
43/// Implementation of `Action` for any function/closure that takes an Environment
44/// reference and returns a Future resolving to Result<()>.
45///
46/// This allows using closures directly as actions with `.with_action(async move |env| {...})`.
47impl<I, F, Fut> Action<I> for F
48where
49    F: FnMut(&Environment<I>) -> Fut + Send + 'static,
50    Fut: Future<Output = Result<()>> + Send + 'static,
51{
52    fn execute<'a>(&'a mut self, env: &'a mut Environment<I>) -> BoxFuture<'a, Result<()>> {
53        Box::pin(self(env))
54    }
55}
56
57/// Mine a single block with the given transactions and verify the block was created
58/// successfully.
59#[derive(Debug)]
60pub struct AssertMineBlock<Engine>
61where
62    Engine: PayloadTypes,
63{
64    /// The node index to mine
65    pub node_idx: usize,
66    /// Transactions to include in the block
67    pub transactions: Vec<Bytes>,
68    /// Expected block hash (optional)
69    pub expected_hash: Option<B256>,
70    /// Block's payload attributes
71    // TODO: refactor once we have actions to generate payload attributes.
72    pub payload_attributes: Engine::PayloadAttributes,
73    /// Tracks engine type
74    _phantom: PhantomData<Engine>,
75}
76
77impl<Engine> AssertMineBlock<Engine>
78where
79    Engine: PayloadTypes,
80{
81    /// Create a new `AssertMineBlock` action
82    pub fn new(
83        node_idx: usize,
84        transactions: Vec<Bytes>,
85        expected_hash: Option<B256>,
86        payload_attributes: Engine::PayloadAttributes,
87    ) -> Self {
88        Self {
89            node_idx,
90            transactions,
91            expected_hash,
92            payload_attributes,
93            _phantom: Default::default(),
94        }
95    }
96}
97
98impl<Engine> Action<Engine> for AssertMineBlock<Engine>
99where
100    Engine: EngineTypes,
101{
102    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
103        Box::pin(async move {
104            if self.node_idx >= env.node_clients.len() {
105                return Err(eyre::eyre!("Node index out of bounds: {}", self.node_idx));
106            }
107
108            let node_client = &env.node_clients[self.node_idx];
109            let rpc_client = &node_client.rpc;
110            let engine_client = &node_client.engine;
111
112            // get the latest block to use as parent
113            let latest_block =
114                EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
115                    rpc_client,
116                    alloy_eips::BlockNumberOrTag::Latest,
117                    false,
118                )
119                .await?;
120
121            let latest_block = latest_block.ok_or_else(|| eyre::eyre!("Latest block not found"))?;
122            let parent_hash = latest_block.header.hash;
123
124            debug!("Latest block hash: {parent_hash}");
125
126            // create a simple forkchoice state with the latest block as head
127            let fork_choice_state = ForkchoiceState {
128                head_block_hash: parent_hash,
129                safe_block_hash: parent_hash,
130                finalized_block_hash: parent_hash,
131            };
132
133            let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v2(
134                engine_client,
135                fork_choice_state,
136                Some(self.payload_attributes.clone()),
137            )
138            .await?;
139
140            debug!("FCU result: {:?}", fcu_result);
141
142            // check if we got a valid payload ID
143            match fcu_result.payload_status.status {
144                PayloadStatusEnum::Valid => {
145                    if let Some(payload_id) = fcu_result.payload_id {
146                        debug!("Got payload ID: {payload_id}");
147
148                        // get the payload that was built
149                        let _engine_payload =
150                            EngineApiClient::<Engine>::get_payload_v2(engine_client, payload_id)
151                                .await?;
152                        Ok(())
153                    } else {
154                        Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
155                    }
156                }
157                _ => Err(eyre::eyre!("Payload status not valid: {:?}", fcu_result.payload_status)),
158            }
159        })
160    }
161}
162/// Pick the next block producer based on the latest block information.
163#[derive(Debug, Default)]
164pub struct PickNextBlockProducer {}
165
166impl PickNextBlockProducer {
167    /// Create a new `PickNextBlockProducer` action
168    pub const fn new() -> Self {
169        Self {}
170    }
171}
172
173impl<Engine> Action<Engine> for PickNextBlockProducer
174where
175    Engine: EngineTypes,
176{
177    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
178        Box::pin(async move {
179            let num_clients = env.node_clients.len();
180            if num_clients == 0 {
181                return Err(eyre::eyre!("No node clients available"));
182            }
183
184            let latest_info = env
185                .latest_block_info
186                .as_ref()
187                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
188
189            // Calculate the starting index based on the latest block number
190            let start_idx = ((latest_info.number + 1) % num_clients as u64) as usize;
191
192            for i in 0..num_clients {
193                let idx = (start_idx + i) % num_clients;
194                let node_client = &env.node_clients[idx];
195                let rpc_client = &node_client.rpc;
196
197                let latest_block =
198                    EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
199                        rpc_client,
200                        alloy_eips::BlockNumberOrTag::Latest,
201                        false,
202                    )
203                    .await?;
204
205                if let Some(block) = latest_block {
206                    let block_number = block.header.number;
207                    let block_hash = block.header.hash;
208
209                    // Check if the block hash and number match the latest block info
210                    if block_hash == latest_info.hash && block_number == latest_info.number {
211                        env.last_producer_idx = Some(idx);
212                        debug!("Selected node {} as the next block producer", idx);
213                        return Ok(());
214                    }
215                }
216            }
217
218            Err(eyre::eyre!("No suitable block producer found"))
219        })
220    }
221}
222
223/// Store payload attributes for the next block.
224#[derive(Debug, Default)]
225pub struct GeneratePayloadAttributes {}
226
227impl<Engine> Action<Engine> for GeneratePayloadAttributes
228where
229    Engine: EngineTypes,
230{
231    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
232        Box::pin(async move {
233            let latest_block = env
234                .latest_block_info
235                .as_ref()
236                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
237            let block_number = latest_block.number;
238            let timestamp = env.latest_header_time + env.block_timestamp_increment;
239            let payload_attributes = alloy_rpc_types_engine::PayloadAttributes {
240                timestamp,
241                prev_randao: B256::random(),
242                suggested_fee_recipient: alloy_primitives::Address::random(),
243                withdrawals: Some(vec![]),
244                parent_beacon_block_root: Some(B256::ZERO),
245            };
246
247            env.payload_attributes.insert(latest_block.number + 1, payload_attributes);
248            debug!("Stored payload attributes for block {}", block_number + 1);
249            Ok(())
250        })
251    }
252}
253/// Action that generates the next payload
254#[derive(Debug, Default)]
255pub struct GenerateNextPayload {}
256
257impl<Engine> Action<Engine> for GenerateNextPayload
258where
259    Engine: EngineTypes + PayloadTypes<PayloadAttributes = PayloadAttributes>,
260    reth_node_ethereum::engine::EthPayloadAttributes:
261        From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
262{
263    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
264        Box::pin(async move {
265            let latest_block = env
266                .latest_block_info
267                .as_ref()
268                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
269
270            let parent_hash = latest_block.hash;
271            debug!("Latest block hash: {parent_hash}");
272
273            let fork_choice_state = ForkchoiceState {
274                head_block_hash: parent_hash,
275                safe_block_hash: parent_hash,
276                finalized_block_hash: parent_hash,
277            };
278
279            let payload_attributes: PayloadAttributes = env
280                .payload_attributes
281                .get(&latest_block.number)
282                .cloned()
283                .ok_or_else(|| eyre::eyre!("No payload attributes found for latest block"))?;
284
285            let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
286                &env.node_clients[0].engine,
287                fork_choice_state,
288                Some(payload_attributes.clone()),
289            )
290            .await?;
291
292            debug!("FCU result: {:?}", fcu_result);
293
294            let payload_id = fcu_result
295                .payload_id
296                .ok_or_else(|| eyre::eyre!("No payload ID returned from forkChoiceUpdated"))?;
297
298            debug!("Received payload ID: {:?}", payload_id);
299            env.next_payload_id = Some(payload_id);
300
301            sleep(Duration::from_secs(1)).await;
302
303            let built_payload: PayloadAttributes =
304                EngineApiClient::<Engine>::get_payload_v3(&env.node_clients[0].engine, payload_id)
305                    .await?
306                    .into();
307            env.payload_id_history.insert(latest_block.number + 1, payload_id);
308            env.latest_payload_built = Some(built_payload);
309
310            Ok(())
311        })
312    }
313}
314
315///Action that broadcasts the latest fork choice state to all clients
316#[derive(Debug, Default)]
317pub struct BroadcastLatestForkchoice {}
318
319impl<Engine> Action<Engine> for BroadcastLatestForkchoice
320where
321    Engine: EngineTypes + PayloadTypes<PayloadAttributes = PayloadAttributes>,
322    reth_node_ethereum::engine::EthPayloadAttributes:
323        From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
324{
325    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
326        Box::pin(async move {
327            let payload = env.latest_payload_executed.clone();
328
329            if env.node_clients.is_empty() {
330                return Err(eyre::eyre!("No node clients available"));
331            }
332            let latest_block = env
333                .latest_block_info
334                .as_ref()
335                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
336
337            let parent_hash = latest_block.hash;
338            debug!("Latest block hash: {parent_hash}");
339
340            let fork_choice_state = ForkchoiceState {
341                head_block_hash: parent_hash,
342                safe_block_hash: parent_hash,
343                finalized_block_hash: parent_hash,
344            };
345            debug!(
346                "Broadcasting forkchoice update to {} clients. Head: {:?}",
347                env.node_clients.len(),
348                fork_choice_state.head_block_hash
349            );
350
351            for (idx, client) in env.node_clients.iter().enumerate() {
352                let engine_client = &client.engine;
353
354                match EngineApiClient::<Engine>::fork_choice_updated_v3(
355                    engine_client,
356                    fork_choice_state,
357                    payload.clone(),
358                )
359                .await
360                {
361                    Ok(resp) => {
362                        debug!(
363                            "Client {}: Forkchoice update status: {:?}",
364                            idx, resp.payload_status.status
365                        );
366                    }
367                    Err(err) => {
368                        return Err(eyre::eyre!(
369                            "Client {}: Failed to broadcast forkchoice: {:?}",
370                            idx,
371                            err
372                        ));
373                    }
374                }
375            }
376            debug!("Forkchoice update broadcasted successfully");
377            Ok(())
378        })
379    }
380}
381
382/// Action that produces a sequence of blocks using the available clients
383#[derive(Debug)]
384pub struct ProduceBlocks<Engine> {
385    /// Number of blocks to produce
386    pub num_blocks: u64,
387    /// Tracks engine type
388    _phantom: PhantomData<Engine>,
389}
390
391impl<Engine> ProduceBlocks<Engine> {
392    /// Create a new `ProduceBlocks` action
393    pub fn new(num_blocks: u64) -> Self {
394        Self { num_blocks, _phantom: Default::default() }
395    }
396}
397
398impl<Engine> Default for ProduceBlocks<Engine> {
399    fn default() -> Self {
400        Self::new(0)
401    }
402}
403
404impl<Engine> Action<Engine> for ProduceBlocks<Engine>
405where
406    Engine: EngineTypes,
407{
408    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
409        Box::pin(async move {
410            // Create a sequence for producing a single block
411            let mut sequence = Sequence::new(vec![
412                Box::new(PickNextBlockProducer::default()),
413                Box::new(GeneratePayloadAttributes::default()),
414            ]);
415            for _ in 0..self.num_blocks {
416                sequence.execute(env).await?;
417            }
418            Ok(())
419        })
420    }
421}
422
423/// Run a sequence of actions in series.
424#[expect(missing_debug_implementations)]
425pub struct Sequence<I> {
426    /// Actions to execute in sequence
427    pub actions: Vec<Box<dyn Action<I>>>,
428}
429
430impl<I> Sequence<I> {
431    /// Create a new sequence of actions
432    pub fn new(actions: Vec<Box<dyn Action<I>>>) -> Self {
433        Self { actions }
434    }
435}
436
437impl<I: Sync + Send + 'static> Action<I> for Sequence<I> {
438    fn execute<'a>(&'a mut self, env: &'a mut Environment<I>) -> BoxFuture<'a, Result<()>> {
439        Box::pin(async move {
440            // Execute each action in sequence
441            for action in &mut self.actions {
442                action.execute(env).await?;
443            }
444
445            Ok(())
446        })
447    }
448}
449
450/// Action that braodcasts the next new payload
451#[derive(Debug, Default)]
452pub struct BroadcastNextNewPayload {}
453
454impl<Engine> Action<Engine> for BroadcastNextNewPayload
455where
456    Engine: EngineTypes + PayloadTypes<PayloadAttributes = PayloadAttributes>,
457    reth_node_ethereum::engine::EthPayloadAttributes:
458        From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
459{
460    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
461        Box::pin(async move {
462            // Get the next new payload to broadcast
463            let next_new_payload = env
464                .latest_payload_built
465                .as_ref()
466                .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
467            let parent_beacon_block_root = next_new_payload
468                .parent_beacon_block_root
469                .ok_or_else(|| eyre::eyre!("No parent beacon block root for next new payload"))?;
470
471            // Loop through all clients and broadcast the next new payload
472            let mut successful_broadcast: bool = false;
473
474            for client in &env.node_clients {
475                let engine = &client.engine;
476                let rpc_client = &client.rpc;
477
478                // Get latest block from the client
479                let rpc_latest_block =
480                    EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
481                        rpc_client,
482                        alloy_eips::BlockNumberOrTag::Latest,
483                        false,
484                    )
485                    .await?
486                    .ok_or_else(|| eyre::eyre!("No latest block found from rpc"))?;
487
488                let latest_block = reth_ethereum_primitives::Block {
489                    header: rpc_latest_block.header.inner,
490                    body: reth_ethereum_primitives::BlockBody {
491                        transactions: rpc_latest_block
492                            .transactions
493                            .into_transactions()
494                            .map(|tx| tx.inner.into_inner().into())
495                            .collect(),
496                        ommers: Default::default(),
497                        withdrawals: rpc_latest_block.withdrawals,
498                    },
499                };
500
501                // Validate block number matches expected
502                let latest_block_info = env
503                    .latest_block_info
504                    .as_ref()
505                    .ok_or_else(|| eyre::eyre!("No latest block info found"))?;
506
507                if latest_block.header.number != latest_block_info.number {
508                    return Err(eyre::eyre!(
509                        "Client block number {} does not match expected block number {}",
510                        latest_block.header.number,
511                        latest_block_info.number
512                    ));
513                }
514
515                // Validate parent beacon block root
516                let latest_block_parent_beacon_block_root =
517                    latest_block.parent_beacon_block_root.ok_or_else(|| {
518                        eyre::eyre!("No parent beacon block root for latest block")
519                    })?;
520
521                if parent_beacon_block_root != latest_block_parent_beacon_block_root {
522                    return Err(eyre::eyre!(
523                        "Parent beacon block root mismatch: expected {:?}, got {:?}",
524                        parent_beacon_block_root,
525                        latest_block_parent_beacon_block_root
526                    ));
527                }
528
529                // Construct and broadcast the execution payload from the latest block
530                // The latest block should contain the latest_payload_built
531                let execution_payload = ExecutionPayloadV3::from_block_slow(&latest_block);
532                let result = EngineApiClient::<Engine>::new_payload_v3(
533                    engine,
534                    execution_payload,
535                    vec![],
536                    parent_beacon_block_root,
537                )
538                .await?;
539
540                // Check if broadcast was successful
541                if result.status == PayloadStatusEnum::Valid {
542                    successful_broadcast = true;
543                    // We don't need to update the latest payload built since it should be the same.
544                    // env.latest_payload_built = Some(next_new_payload.clone());
545                    env.latest_payload_executed = Some(next_new_payload.clone());
546                    break;
547                } else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
548                    debug!(
549                        "Invalid payload status returned from broadcast: {:?}",
550                        validation_error
551                    );
552                }
553            }
554
555            if !successful_broadcast {
556                return Err(eyre::eyre!("Failed to successfully broadcast payload to any client"));
557            }
558
559            Ok(())
560        })
561    }
562}