reth_e2e_test_utils/testsuite/
setup.rs

1//! Test setup utilities for configuring the initial state.
2
3use crate::{
4    setup_engine_with_connection, testsuite::Environment, NodeBuilderHelper,
5    PayloadAttributesBuilder,
6};
7use alloy_eips::BlockNumberOrTag;
8use alloy_primitives::B256;
9use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
10use eyre::{eyre, Result};
11use reth_chainspec::ChainSpec;
12use reth_engine_local::LocalPayloadAttributesBuilder;
13use reth_ethereum_primitives::Block;
14use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState};
15use reth_node_api::{EngineTypes, NodeTypes, PayloadTypes, TreeConfig};
16use reth_node_core::primitives::RecoveredBlock;
17use reth_payload_builder::EthPayloadBuilderAttributes;
18use revm::state::EvmState;
19use std::{marker::PhantomData, path::Path, sync::Arc};
20use tokio::{
21    sync::mpsc,
22    time::{sleep, Duration},
23};
24use tracing::debug;
25
/// Configuration for setting up test environment
///
/// Values are supplied through the `with_*` builder methods and consumed by
/// [`Setup::apply`] / [`Setup::apply_with_import`]. Dropping a `Setup` sends a
/// shutdown signal to the task that keeps the spawned nodes alive (see the
/// `Drop` impl).
#[derive(Debug)]
pub struct Setup<I> {
    /// Chain specification to use
    ///
    /// Required: both apply paths return an error when this is `None`.
    pub chain_spec: Option<Arc<ChainSpec>>,
    /// Genesis block to use
    // NOTE(review): not read by any method visible in this file; confirm whether it is consumed
    // elsewhere.
    pub genesis: Option<Genesis>,
    /// Blocks to replay during setup
    // NOTE(review): the apply methods visible in this file never read this list; confirm that
    // replay is implemented elsewhere.
    pub blocks: Vec<RecoveredBlock<Block>>,
    /// Initial state to load
    // NOTE(review): not read by any method visible in this file; confirm before relying on it.
    pub state: Option<EvmState>,
    /// Network configuration
    pub network: NetworkSetup,
    /// Engine tree configuration
    pub tree_config: TreeConfig,
    /// Shutdown channel to stop nodes when setup is dropped
    ///
    /// Populated by the non-import apply path; `None` until then.
    shutdown_tx: Option<mpsc::Sender<()>>,
    /// Is this setup in dev mode
    ///
    /// Defaults to `true`.
    pub is_dev: bool,
    /// Tracks instance generic.
    _phantom: PhantomData<I>,
    /// Holds the import result to keep nodes alive when using imported chain
    /// This is stored as an option to avoid lifetime issues with `tokio::spawn`
    import_result_holder: Option<crate::setup_import::ChainImportResult>,
    /// Path to RLP file to import during setup
    ///
    /// When set, `apply` takes this value (leaving `None`) and delegates to the import path.
    pub import_rlp_path: Option<std::path::PathBuf>,
}
53
54impl<I> Default for Setup<I> {
55    fn default() -> Self {
56        Self {
57            chain_spec: None,
58            genesis: None,
59            blocks: Vec::new(),
60            state: None,
61            network: NetworkSetup::default(),
62            tree_config: TreeConfig::default(),
63            shutdown_tx: None,
64            is_dev: true,
65            _phantom: Default::default(),
66            import_result_holder: None,
67            import_rlp_path: None,
68        }
69    }
70}
71
72impl<I> Drop for Setup<I> {
73    fn drop(&mut self) {
74        // Send shutdown signal if the channel exists
75        if let Some(tx) = self.shutdown_tx.take() {
76            let _ = tx.try_send(());
77        }
78    }
79}
80
81impl<I> Setup<I>
82where
83    I: EngineTypes,
84{
    /// Create a new setup with default values
    ///
    /// Equivalent to [`Setup::default`]; configure the result with the `with_*` methods.
    pub fn new() -> Self {
        Self::default()
    }
89
    /// Set the chain specification
    ///
    /// A chain specification is mandatory: the apply methods return an error if none was set.
    pub fn with_chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
        self.chain_spec = Some(chain_spec);
        self
    }
95
    /// Set the genesis block
    // NOTE(review): the stored genesis is not read by any method visible in this file; confirm
    // that it has an effect before relying on it.
    pub const fn with_genesis(mut self, genesis: Genesis) -> Self {
        self.genesis = Some(genesis);
        self
    }
101
    /// Add a block to replay during setup
    ///
    /// The block is appended after any blocks added earlier.
    // NOTE(review): the apply methods visible in this file do not read `blocks`; confirm that
    // replay actually happens.
    pub fn with_block(mut self, block: RecoveredBlock<Block>) -> Self {
        self.blocks.push(block);
        self
    }
107
    /// Add multiple blocks to replay during setup
    ///
    /// The blocks are appended, in order, after any blocks added earlier; existing entries are
    /// kept.
    // NOTE(review): the apply methods visible in this file do not read `blocks`; confirm that
    // replay actually happens.
    pub fn with_blocks(mut self, blocks: Vec<RecoveredBlock<Block>>) -> Self {
        self.blocks.extend(blocks);
        self
    }
113
    /// Set the initial state
    ///
    /// Replaces any previously configured state.
    // NOTE(review): the stored state is not read by any method visible in this file; confirm
    // that it has an effect before relying on it.
    pub fn with_state(mut self, state: EvmState) -> Self {
        self.state = Some(state);
        self
    }
119
    /// Set the network configuration
    ///
    /// Replaces the previous value. A freshly created setup holds `NetworkSetup::default()`,
    /// whose `node_count` is `0`, so callers normally want to set this explicitly.
    pub const fn with_network(mut self, network: NetworkSetup) -> Self {
        self.network = network;
        self
    }
125
    /// Set dev mode
    ///
    /// The flag defaults to `true` and is forwarded unchanged to the node setup helpers when
    /// the setup is applied.
    pub const fn with_dev_mode(mut self, is_dev: bool) -> Self {
        self.is_dev = is_dev;
        self
    }
131
    /// Set the engine tree configuration
    ///
    /// Replaces the previous value; a clone of it is handed to the node setup helpers when the
    /// setup is applied.
    pub const fn with_tree_config(mut self, tree_config: TreeConfig) -> Self {
        self.tree_config = tree_config;
        self
    }
137
138    /// Apply setup using pre-imported chain data from RLP file
139    pub async fn apply_with_import<N>(
140        &mut self,
141        env: &mut Environment<I>,
142        rlp_path: &Path,
143    ) -> Result<()>
144    where
145        N: NodeBuilderHelper,
146        LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
147            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
148        >,
149    {
150        // Note: this future is quite large so we box it
151        Box::pin(self.apply_with_import_::<N>(env, rlp_path)).await
152    }
153
154    /// Apply setup using pre-imported chain data from RLP file
155    async fn apply_with_import_<N>(
156        &mut self,
157        env: &mut Environment<I>,
158        rlp_path: &Path,
159    ) -> Result<()>
160    where
161        N: NodeBuilderHelper,
162        LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
163            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
164        >,
165    {
166        // Create nodes with imported chain data
167        let import_result = self.create_nodes_with_import::<N>(rlp_path).await?;
168
169        // Extract node clients
170        let mut node_clients = Vec::new();
171        let nodes = &import_result.nodes;
172        for node in nodes {
173            let rpc = node
174                .rpc_client()
175                .ok_or_else(|| eyre!("Failed to create HTTP RPC client for node"))?;
176            let auth = node.auth_server_handle();
177            let url = node.rpc_url();
178            node_clients.push(crate::testsuite::NodeClient::new(rpc, auth, url));
179        }
180
181        // Store the import result to keep nodes alive
182        // They will be dropped when the Setup is dropped
183        self.import_result_holder = Some(import_result);
184
185        // Finalize setup - this will wait for nodes and initialize states
186        self.finalize_setup(env, node_clients, true).await
187    }
188
189    /// Apply the setup to the environment
190    pub async fn apply<N>(&mut self, env: &mut Environment<I>) -> Result<()>
191    where
192        N: NodeBuilderHelper,
193        LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
194            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
195        >,
196    {
197        // Note: this future is quite large so we box it
198        Box::pin(self.apply_::<N>(env)).await
199    }
200
201    /// Apply the setup to the environment
202    async fn apply_<N>(&mut self, env: &mut Environment<I>) -> Result<()>
203    where
204        N: NodeBuilderHelper,
205        LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
206            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
207        >,
208    {
209        // If import_rlp_path is set, use apply_with_import instead
210        if let Some(rlp_path) = self.import_rlp_path.take() {
211            return self.apply_with_import::<N>(env, &rlp_path).await;
212        }
213        let chain_spec =
214            self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
215
216        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
217        self.shutdown_tx = Some(shutdown_tx);
218
219        let is_dev = self.is_dev;
220        let node_count = self.network.node_count;
221
222        let attributes_generator = self.create_attributes_generator::<N>();
223
224        let result = setup_engine_with_connection::<N>(
225            node_count,
226            Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
227            is_dev,
228            self.tree_config.clone(),
229            attributes_generator,
230            self.network.connect_nodes,
231        )
232        .await;
233
234        let mut node_clients = Vec::new();
235        match result {
236            Ok((nodes, executor, _wallet)) => {
237                // create HTTP clients for each node's RPC and Engine API endpoints
238                for node in &nodes {
239                    let rpc = node
240                        .rpc_client()
241                        .ok_or_else(|| eyre!("Failed to create HTTP RPC client for node"))?;
242                    let auth = node.auth_server_handle();
243                    let url = node.rpc_url();
244
245                    node_clients.push(crate::testsuite::NodeClient::new(rpc, auth, url));
246                }
247
248                // spawn a separate task just to handle the shutdown
249                tokio::spawn(async move {
250                    // keep nodes and executor in scope to ensure they're not dropped
251                    let _nodes = nodes;
252                    let _executor = executor;
253                    // Wait for shutdown signal
254                    let _ = shutdown_rx.recv().await;
255                    // nodes and executor will be dropped here when the test completes
256                });
257            }
258            Err(e) => {
259                return Err(eyre!("Failed to setup nodes: {}", e));
260            }
261        }
262
263        // Finalize setup
264        self.finalize_setup(env, node_clients, false).await
265    }
266
267    /// Create nodes with imported chain data
268    ///
269    /// Note: Currently this only supports `EthereumNode` due to the import process
270    /// being Ethereum-specific. The generic parameter N is kept for consistency
271    /// with other methods but is not used.
272    async fn create_nodes_with_import<N>(
273        &self,
274        rlp_path: &Path,
275    ) -> Result<crate::setup_import::ChainImportResult>
276    where
277        N: NodeBuilderHelper,
278        LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
279            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
280        >,
281    {
282        let chain_spec =
283            self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
284
285        let attributes_generator = move |timestamp| {
286            let attributes = PayloadAttributes {
287                timestamp,
288                prev_randao: B256::ZERO,
289                suggested_fee_recipient: alloy_primitives::Address::ZERO,
290                withdrawals: Some(vec![]),
291                parent_beacon_block_root: Some(B256::ZERO),
292            };
293            EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
294        };
295
296        crate::setup_import::setup_engine_with_chain_import(
297            self.network.node_count,
298            chain_spec,
299            self.is_dev,
300            self.tree_config.clone(),
301            rlp_path,
302            attributes_generator,
303        )
304        .await
305    }
306
307    /// Create the attributes generator function
308    fn create_attributes_generator<N>(
309        &self,
310    ) -> impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Copy
311    where
312        N: NodeBuilderHelper,
313        LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
314            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
315        >,
316    {
317        move |timestamp| {
318            let attributes = PayloadAttributes {
319                timestamp,
320                prev_randao: B256::ZERO,
321                suggested_fee_recipient: alloy_primitives::Address::ZERO,
322                withdrawals: Some(vec![]),
323                parent_beacon_block_root: Some(B256::ZERO),
324            };
325            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes::from(
326                EthPayloadBuilderAttributes::new(B256::ZERO, attributes),
327            )
328        }
329    }
330
331    /// Common finalization logic for both apply methods
332    async fn finalize_setup(
333        &self,
334        env: &mut Environment<I>,
335        node_clients: Vec<crate::testsuite::NodeClient>,
336        use_latest_block: bool,
337    ) -> Result<()> {
338        if node_clients.is_empty() {
339            return Err(eyre!("No nodes were created"));
340        }
341
342        // Wait for all nodes to be ready
343        self.wait_for_nodes_ready(&node_clients).await?;
344
345        env.node_clients = node_clients;
346        env.initialize_node_states(self.network.node_count);
347
348        // Get initial block info (genesis or latest depending on use_latest_block)
349        let (initial_block_info, genesis_block_info) = if use_latest_block {
350            // For imported chain, get both latest and genesis
351            let latest =
352                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Latest).await?;
353            let genesis =
354                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
355            (latest, genesis)
356        } else {
357            // For fresh chain, both are genesis
358            let genesis =
359                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
360            (genesis, genesis)
361        };
362
363        // Initialize all node states
364        for (node_idx, node_state) in env.node_states.iter_mut().enumerate() {
365            node_state.current_block_info = Some(initial_block_info);
366            node_state.latest_header_time = initial_block_info.timestamp;
367            node_state.latest_fork_choice_state = ForkchoiceState {
368                head_block_hash: initial_block_info.hash,
369                safe_block_hash: initial_block_info.hash,
370                finalized_block_hash: genesis_block_info.hash,
371            };
372
373            debug!(
374                "Node {} initialized with block {} (hash: {})",
375                node_idx, initial_block_info.number, initial_block_info.hash
376            );
377        }
378
379        debug!(
380            "Environment initialized with {} nodes, starting from block {} (hash: {})",
381            self.network.node_count, initial_block_info.number, initial_block_info.hash
382        );
383
384        // In test environments, explicitly set sync state to Idle after initialization
385        // This ensures that eth_syncing returns false as expected by tests
386        if let Some(import_result) = &self.import_result_holder {
387            for (idx, node_ctx) in import_result.nodes.iter().enumerate() {
388                debug!("Setting sync state to Idle for node {}", idx);
389                node_ctx.inner.network.update_sync_state(SyncState::Idle);
390            }
391        }
392
393        Ok(())
394    }
395
396    /// Wait for all nodes to be ready to accept RPC requests
397    async fn wait_for_nodes_ready(
398        &self,
399        node_clients: &[crate::testsuite::NodeClient],
400    ) -> Result<()> {
401        for (idx, client) in node_clients.iter().enumerate() {
402            let mut retry_count = 0;
403            const MAX_RETRIES: usize = 10;
404
405            while retry_count < MAX_RETRIES {
406                if client.is_ready().await {
407                    debug!("Node {idx} RPC endpoint is ready");
408                    break;
409                }
410
411                retry_count += 1;
412                debug!("Node {idx} RPC endpoint not ready, retry {retry_count}/{MAX_RETRIES}");
413                sleep(Duration::from_millis(500)).await;
414            }
415
416            if retry_count == MAX_RETRIES {
417                return Err(eyre!(
418                    "Failed to connect to node {idx} RPC endpoint after {MAX_RETRIES} retries"
419                ));
420            }
421        }
422        Ok(())
423    }
424
425    /// Get block info for a given block number or tag
426    async fn get_block_info(
427        &self,
428        client: &crate::testsuite::NodeClient,
429        block: BlockNumberOrTag,
430    ) -> Result<crate::testsuite::BlockInfo> {
431        let block = client
432            .get_block_by_number(block)
433            .await?
434            .ok_or_else(|| eyre!("Block {:?} not found", block))?;
435
436        Ok(crate::testsuite::BlockInfo {
437            hash: block.header.hash,
438            number: block.header.number,
439            timestamp: block.header.timestamp,
440        })
441    }
442}
443
/// Genesis block configuration
///
/// Currently an empty placeholder: the struct declares no fields.
#[derive(Debug)]
pub struct Genesis {}
447
/// Describes how many nodes a test setup starts and whether they are wired together.
///
/// The derived `Default` yields zero nodes and no connections.
#[derive(Debug, Default)]
pub struct NetworkSetup {
    /// How many nodes to start
    pub node_count: usize,
    /// `true` when the started nodes should be connected to each other
    pub connect_nodes: bool,
}

impl NetworkSetup {
    /// Shared constructor behind the public presets.
    const fn with_topology(node_count: usize, connect_nodes: bool) -> Self {
        Self { node_count, connect_nodes }
    }

    /// Network setup consisting of exactly one node (with `connect_nodes` set)
    pub const fn single_node() -> Self {
        Self::with_topology(1, true)
    }

    /// Network setup with `count` nodes that are connected to each other
    pub const fn multi_node(count: usize) -> Self {
        Self::with_topology(count, true)
    }

    /// Network setup with `count` nodes that are left disconnected
    pub const fn multi_node_unconnected(count: usize) -> Self {
        Self::with_topology(count, false)
    }
}