Skip to main content

reth_e2e_test_utils/testsuite/
setup.rs

1//! Test setup utilities for configuring the initial state.
2
3use crate::{testsuite::Environment, E2ETestSetupBuilder, NodeBuilderHelper};
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::B256;
6use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
7use eyre::{eyre, Result};
8use reth_chainspec::ChainSpec;
9use reth_ethereum_primitives::Block;
10use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState};
11use reth_node_api::{EngineTypes, NodeTypes, PayloadTypes, TreeConfig};
12use reth_node_core::primitives::RecoveredBlock;
13use revm::state::EvmState;
14use std::{marker::PhantomData, path::Path, sync::Arc};
15use tokio::{
16    sync::mpsc,
17    time::{sleep, Duration},
18};
19use tracing::debug;
20
21/// Configuration for setting up test environment
22#[derive(Debug)]
23pub struct Setup<I> {
24    /// Chain specification to use
25    pub chain_spec: Option<Arc<ChainSpec>>,
26    /// Genesis block to use
27    pub genesis: Option<Genesis>,
28    /// Blocks to replay during setup
29    pub blocks: Vec<RecoveredBlock<Block>>,
30    /// Initial state to load
31    pub state: Option<EvmState>,
32    /// Network configuration
33    pub network: NetworkSetup,
34    /// Engine tree configuration
35    pub tree_config: TreeConfig,
36    /// Shutdown channel to stop nodes when setup is dropped
37    shutdown_tx: Option<mpsc::Sender<()>>,
38    /// Is this setup in dev mode
39    pub is_dev: bool,
40    /// Whether to use v2 storage mode (hashed keys, static file changesets, rocksdb history)
41    pub storage_v2: bool,
42    /// Tracks instance generic.
43    _phantom: PhantomData<I>,
44    /// Holds the import result to keep nodes alive when using imported chain
45    /// This is stored as an option to avoid lifetime issues with `tokio::spawn`
46    import_result_holder: Option<crate::setup_import::ChainImportResult>,
47    /// Path to RLP file to import during setup
48    pub import_rlp_path: Option<std::path::PathBuf>,
49}
50
51impl<I> Default for Setup<I> {
52    fn default() -> Self {
53        Self {
54            chain_spec: None,
55            genesis: None,
56            blocks: Vec::new(),
57            state: None,
58            network: NetworkSetup::default(),
59            tree_config: TreeConfig::default(),
60            shutdown_tx: None,
61            is_dev: true,
62            storage_v2: false,
63            _phantom: Default::default(),
64            import_result_holder: None,
65            import_rlp_path: None,
66        }
67    }
68}
69
70impl<I> Drop for Setup<I> {
71    fn drop(&mut self) {
72        // Send shutdown signal if the channel exists
73        if let Some(tx) = self.shutdown_tx.take() {
74            let _ = tx.try_send(());
75        }
76    }
77}
78
79impl<I> Setup<I>
80where
81    I: EngineTypes,
82{
83    /// Set the chain specification
84    pub fn with_chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
85        self.chain_spec = Some(chain_spec);
86        self
87    }
88
89    /// Set the genesis block
90    pub const fn with_genesis(mut self, genesis: Genesis) -> Self {
91        self.genesis = Some(genesis);
92        self
93    }
94
95    /// Add a block to replay during setup
96    pub fn with_block(mut self, block: RecoveredBlock<Block>) -> Self {
97        self.blocks.push(block);
98        self
99    }
100
101    /// Add multiple blocks to replay during setup
102    pub fn with_blocks(mut self, blocks: Vec<RecoveredBlock<Block>>) -> Self {
103        self.blocks.extend(blocks);
104        self
105    }
106
107    /// Set the initial state
108    pub fn with_state(mut self, state: EvmState) -> Self {
109        self.state = Some(state);
110        self
111    }
112
113    /// Set the network configuration
114    pub const fn with_network(mut self, network: NetworkSetup) -> Self {
115        self.network = network;
116        self
117    }
118
119    /// Set dev mode
120    pub const fn with_dev_mode(mut self, is_dev: bool) -> Self {
121        self.is_dev = is_dev;
122        self
123    }
124
125    /// Set the engine tree configuration
126    pub const fn with_tree_config(mut self, tree_config: TreeConfig) -> Self {
127        self.tree_config = tree_config;
128        self
129    }
130
131    /// Enable v2 storage mode (hashed keys, static file changesets, rocksdb history)
132    pub const fn with_storage_v2(mut self) -> Self {
133        self.storage_v2 = true;
134        self
135    }
136
137    /// Apply setup using pre-imported chain data from RLP file
138    pub async fn apply_with_import<N>(
139        &mut self,
140        env: &mut Environment<I>,
141        rlp_path: &Path,
142    ) -> Result<()>
143    where
144        N: NodeBuilderHelper<Payload = I>,
145    {
146        // Note: this future is quite large so we box it
147        Box::pin(self.apply_with_import_(env, rlp_path)).await
148    }
149
150    /// Apply setup using pre-imported chain data from RLP file
151    async fn apply_with_import_(
152        &mut self,
153        env: &mut Environment<I>,
154        rlp_path: &Path,
155    ) -> Result<()> {
156        // Create nodes with imported chain data
157        let import_result = self.create_nodes_with_import(rlp_path).await?;
158
159        // Extract node clients
160        let mut node_clients = Vec::new();
161        let nodes = &import_result.nodes;
162        for node in nodes {
163            let rpc = node
164                .rpc_client()
165                .ok_or_else(|| eyre!("Failed to create HTTP RPC client for node"))?;
166            let auth = node.auth_server_handle();
167            let url = node.rpc_url();
168            // TODO: Pass beacon_engine_handle once import system supports generic types
169            node_clients.push(crate::testsuite::NodeClient::new(rpc, auth, url));
170        }
171
172        // Store the import result to keep nodes alive
173        // They will be dropped when the Setup is dropped
174        self.import_result_holder = Some(import_result);
175
176        // Finalize setup - this will wait for nodes and initialize states
177        self.finalize_setup(env, node_clients, true).await
178    }
179
180    /// Apply the setup to the environment
181    pub async fn apply<N>(&mut self, env: &mut Environment<I>) -> Result<()>
182    where
183        N: NodeBuilderHelper<Payload = I>,
184    {
185        // Note: this future is quite large so we box it
186        Box::pin(self.apply_::<N>(env)).await
187    }
188
189    /// Apply the setup to the environment
190    async fn apply_<N>(&mut self, env: &mut Environment<I>) -> Result<()>
191    where
192        N: NodeBuilderHelper<Payload = I>,
193    {
194        // If import_rlp_path is set, use apply_with_import instead
195        if let Some(rlp_path) = self.import_rlp_path.take() {
196            return self.apply_with_import::<N>(env, &rlp_path).await;
197        }
198        let chain_spec =
199            self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
200
201        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
202        self.shutdown_tx = Some(shutdown_tx);
203
204        let is_dev = self.is_dev;
205        let storage_v2 = self.storage_v2;
206        let node_count = self.network.node_count;
207        let tree_config = self.tree_config.clone();
208
209        let attributes_generator = Self::create_static_attributes_generator::<N>();
210
211        let mut builder = E2ETestSetupBuilder::<N, _>::new(
212            node_count,
213            Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
214            attributes_generator,
215        )
216        .with_tree_config_modifier(move |base| {
217            tree_config.clone().with_cross_block_cache_size(base.cross_block_cache_size())
218        })
219        .with_node_config_modifier(move |config| config.set_dev(is_dev))
220        .with_connect_nodes(self.network.connect_nodes);
221
222        if storage_v2 {
223            builder = builder.with_storage_v2();
224        }
225
226        let result = builder.build().await;
227
228        let mut node_clients = Vec::new();
229        match result {
230            Ok((nodes, _wallet)) => {
231                // create HTTP clients for each node's RPC and Engine API endpoints
232                for node in &nodes {
233                    node_clients.push(node.to_node_client()?);
234                }
235
236                // spawn a separate task just to handle the shutdown
237                tokio::spawn(async move {
238                    // keep nodes in scope to ensure they're not dropped
239                    let _nodes = nodes;
240                    // Wait for shutdown signal
241                    let _ = shutdown_rx.recv().await;
242                    // nodes will be dropped here when the test completes
243                });
244            }
245            Err(e) => {
246                return Err(eyre!("Failed to setup nodes: {}", e));
247            }
248        }
249
250        // Finalize setup
251        self.finalize_setup(env, node_clients, false).await
252    }
253
254    /// Create nodes with imported chain data
255    ///
256    /// Note: Currently this only supports `EthereumNode` due to the import process
257    /// being Ethereum-specific. The generic parameter N is kept for consistency
258    /// with other methods but is not used.
259    async fn create_nodes_with_import(
260        &self,
261        rlp_path: &Path,
262    ) -> Result<crate::setup_import::ChainImportResult> {
263        let chain_spec =
264            self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
265
266        let attributes_generator = move |timestamp| PayloadAttributes {
267            timestamp,
268            prev_randao: B256::ZERO,
269            suggested_fee_recipient: alloy_primitives::Address::ZERO,
270            withdrawals: Some(vec![]),
271            parent_beacon_block_root: Some(B256::ZERO),
272        };
273
274        crate::setup_import::setup_engine_with_chain_import(
275            self.network.node_count,
276            chain_spec,
277            self.is_dev,
278            self.tree_config.clone(),
279            rlp_path,
280            attributes_generator,
281        )
282        .await
283    }
284
285    /// Create a static attributes generator that doesn't capture any instance data
286    fn create_static_attributes_generator<N>(
287    ) -> impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes + Copy + use<N, I>
288    where
289        N: NodeBuilderHelper<Payload = I>,
290    {
291        move |timestamp| {
292            PayloadAttributes {
293                timestamp,
294                prev_randao: B256::ZERO,
295                suggested_fee_recipient: alloy_primitives::Address::ZERO,
296                withdrawals: Some(vec![]),
297                parent_beacon_block_root: Some(B256::ZERO),
298            }
299            .into()
300        }
301    }
302
303    /// Common finalization logic for both apply methods
304    async fn finalize_setup(
305        &self,
306        env: &mut Environment<I>,
307        node_clients: Vec<crate::testsuite::NodeClient<I>>,
308        use_latest_block: bool,
309    ) -> Result<()> {
310        if node_clients.is_empty() {
311            return Err(eyre!("No nodes were created"));
312        }
313
314        // Wait for all nodes to be ready
315        self.wait_for_nodes_ready(&node_clients).await?;
316
317        env.node_clients = node_clients;
318        env.initialize_node_states(self.network.node_count);
319
320        // Get initial block info (genesis or latest depending on use_latest_block)
321        let (initial_block_info, genesis_block_info) = if use_latest_block {
322            // For imported chain, get both latest and genesis
323            let latest =
324                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Latest).await?;
325            let genesis =
326                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
327            (latest, genesis)
328        } else {
329            // For fresh chain, both are genesis
330            let genesis =
331                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
332            (genesis, genesis)
333        };
334
335        // Initialize all node states
336        for (node_idx, node_state) in env.node_states.iter_mut().enumerate() {
337            node_state.current_block_info = Some(initial_block_info);
338            node_state.latest_header_time = initial_block_info.timestamp;
339            node_state.latest_fork_choice_state = ForkchoiceState {
340                head_block_hash: initial_block_info.hash,
341                safe_block_hash: initial_block_info.hash,
342                finalized_block_hash: genesis_block_info.hash,
343            };
344
345            debug!(
346                "Node {} initialized with block {} (hash: {})",
347                node_idx, initial_block_info.number, initial_block_info.hash
348            );
349        }
350
351        debug!(
352            "Environment initialized with {} nodes, starting from block {} (hash: {})",
353            self.network.node_count, initial_block_info.number, initial_block_info.hash
354        );
355
356        // In test environments, explicitly set sync state to Idle after initialization
357        // This ensures that eth_syncing returns false as expected by tests
358        if let Some(import_result) = &self.import_result_holder {
359            for (idx, node_ctx) in import_result.nodes.iter().enumerate() {
360                debug!("Setting sync state to Idle for node {}", idx);
361                node_ctx.inner.network.update_sync_state(SyncState::Idle);
362            }
363        }
364
365        Ok(())
366    }
367
368    /// Wait for all nodes to be ready to accept RPC requests
369    async fn wait_for_nodes_ready<P>(
370        &self,
371        node_clients: &[crate::testsuite::NodeClient<P>],
372    ) -> Result<()>
373    where
374        P: PayloadTypes,
375    {
376        for (idx, client) in node_clients.iter().enumerate() {
377            let mut retry_count = 0;
378            const MAX_RETRIES: usize = 10;
379
380            while retry_count < MAX_RETRIES {
381                if client.is_ready().await {
382                    debug!("Node {idx} RPC endpoint is ready");
383                    break;
384                }
385
386                retry_count += 1;
387                debug!("Node {idx} RPC endpoint not ready, retry {retry_count}/{MAX_RETRIES}");
388                sleep(Duration::from_millis(500)).await;
389            }
390
391            if retry_count == MAX_RETRIES {
392                return Err(eyre!(
393                    "Failed to connect to node {idx} RPC endpoint after {MAX_RETRIES} retries"
394                ));
395            }
396        }
397        Ok(())
398    }
399
400    /// Get block info for a given block number or tag
401    async fn get_block_info<P>(
402        &self,
403        client: &crate::testsuite::NodeClient<P>,
404        block: BlockNumberOrTag,
405    ) -> Result<crate::testsuite::BlockInfo>
406    where
407        P: PayloadTypes,
408    {
409        let block = client
410            .get_block_by_number(block)
411            .await?
412            .ok_or_else(|| eyre!("Block {:?} not found", block))?;
413
414        Ok(crate::testsuite::BlockInfo {
415            hash: block.header.hash,
416            number: block.header.number,
417            timestamp: block.header.timestamp,
418        })
419    }
420}
421
/// Genesis block configuration.
///
/// The type currently has no fields.
#[derive(Debug)]
pub struct Genesis {}
425
/// Describes how many nodes a setup launches and whether they are connected.
///
/// The derived `Default` yields zero nodes that are not connected.
#[derive(Debug, Default)]
pub struct NetworkSetup {
    /// How many nodes to create.
    pub node_count: usize,
    /// `true` if the nodes should be connected to each other.
    pub connect_nodes: bool,
}

impl NetworkSetup {
    /// Network made of exactly one node, with `connect_nodes` enabled.
    pub const fn single_node() -> Self {
        Self::multi_node(1)
    }

    /// Network of `count` nodes that are connected to each other.
    pub const fn multi_node(count: usize) -> Self {
        let node_count = count;
        Self { node_count, connect_nodes: true }
    }

    /// Network of `count` nodes that are left disconnected.
    pub const fn multi_node_unconnected(count: usize) -> Self {
        let node_count = count;
        Self { node_count, connect_nodes: false }
    }
}