Skip to main content

reth_e2e_test_utils/testsuite/
setup.rs

1//! Test setup utilities for configuring the initial state.
2
3use crate::{testsuite::Environment, E2ETestSetupBuilder, NodeBuilderHelper};
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::B256;
6use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
7use eyre::{eyre, Result};
8use reth_chainspec::ChainSpec;
9use reth_ethereum_primitives::Block;
10use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState};
11use reth_node_api::{EngineTypes, NodeTypes, PayloadTypes, TreeConfig};
12use reth_node_core::primitives::RecoveredBlock;
13use revm::state::EvmState;
14use std::{marker::PhantomData, path::Path, sync::Arc};
15use tokio::{
16    sync::mpsc,
17    time::{sleep, Duration},
18};
19use tracing::debug;
20
21/// Configuration for setting up test environment
22#[derive(Debug)]
23pub struct Setup<I> {
24    /// Chain specification to use
25    pub chain_spec: Option<Arc<ChainSpec>>,
26    /// Genesis block to use
27    pub genesis: Option<Genesis>,
28    /// Blocks to replay during setup
29    pub blocks: Vec<RecoveredBlock<Block>>,
30    /// Initial state to load
31    pub state: Option<EvmState>,
32    /// Network configuration
33    pub network: NetworkSetup,
34    /// Engine tree configuration
35    pub tree_config: TreeConfig,
36    /// Shutdown channel to stop nodes when setup is dropped
37    shutdown_tx: Option<mpsc::Sender<()>>,
38    /// Is this setup in dev mode
39    pub is_dev: bool,
40    /// Whether to use v2 storage mode (hashed keys, static file changesets, rocksdb history)
41    pub storage_v2: bool,
42    /// Tracks instance generic.
43    _phantom: PhantomData<I>,
44    /// Holds the import result to keep nodes alive when using imported chain
45    /// This is stored as an option to avoid lifetime issues with `tokio::spawn`
46    import_result_holder: Option<crate::setup_import::ChainImportResult>,
47    /// Path to RLP file to import during setup
48    pub import_rlp_path: Option<std::path::PathBuf>,
49}
50
51impl<I> Default for Setup<I> {
52    fn default() -> Self {
53        Self {
54            chain_spec: None,
55            genesis: None,
56            blocks: Vec::new(),
57            state: None,
58            network: NetworkSetup::default(),
59            tree_config: TreeConfig::default(),
60            shutdown_tx: None,
61            is_dev: true,
62            storage_v2: false,
63            _phantom: Default::default(),
64            import_result_holder: None,
65            import_rlp_path: None,
66        }
67    }
68}
69
70impl<I> Drop for Setup<I> {
71    fn drop(&mut self) {
72        // Send shutdown signal if the channel exists
73        if let Some(tx) = self.shutdown_tx.take() {
74            let _ = tx.try_send(());
75        }
76    }
77}
78
79impl<I> Setup<I>
80where
81    I: EngineTypes,
82{
83    /// Set the chain specification
84    pub fn with_chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
85        self.chain_spec = Some(chain_spec);
86        self
87    }
88
89    /// Set the genesis block
90    pub const fn with_genesis(mut self, genesis: Genesis) -> Self {
91        self.genesis = Some(genesis);
92        self
93    }
94
95    /// Add a block to replay during setup
96    pub fn with_block(mut self, block: RecoveredBlock<Block>) -> Self {
97        self.blocks.push(block);
98        self
99    }
100
101    /// Add multiple blocks to replay during setup
102    pub fn with_blocks(mut self, blocks: Vec<RecoveredBlock<Block>>) -> Self {
103        self.blocks.extend(blocks);
104        self
105    }
106
107    /// Set the initial state
108    pub fn with_state(mut self, state: EvmState) -> Self {
109        self.state = Some(state);
110        self
111    }
112
113    /// Set the network configuration
114    pub const fn with_network(mut self, network: NetworkSetup) -> Self {
115        self.network = network;
116        self
117    }
118
119    /// Set dev mode
120    pub const fn with_dev_mode(mut self, is_dev: bool) -> Self {
121        self.is_dev = is_dev;
122        self
123    }
124
125    /// Set the engine tree configuration
126    pub const fn with_tree_config(mut self, tree_config: TreeConfig) -> Self {
127        self.tree_config = tree_config;
128        self
129    }
130
131    /// Enable v2 storage mode (hashed keys, static file changesets, rocksdb history)
132    pub const fn with_storage_v2(mut self) -> Self {
133        self.storage_v2 = true;
134        self
135    }
136
137    /// Apply setup using pre-imported chain data from RLP file
138    pub async fn apply_with_import<N>(
139        &mut self,
140        env: &mut Environment<I>,
141        rlp_path: &Path,
142    ) -> Result<()>
143    where
144        N: NodeBuilderHelper<Payload = I>,
145    {
146        // Note: this future is quite large so we box it
147        Box::pin(self.apply_with_import_(env, rlp_path)).await
148    }
149
150    /// Apply setup using pre-imported chain data from RLP file
151    async fn apply_with_import_(
152        &mut self,
153        env: &mut Environment<I>,
154        rlp_path: &Path,
155    ) -> Result<()> {
156        // Create nodes with imported chain data
157        let import_result = self.create_nodes_with_import(rlp_path).await?;
158
159        // Extract node clients
160        let mut node_clients = Vec::new();
161        let nodes = &import_result.nodes;
162        for node in nodes {
163            let rpc = node
164                .rpc_client()
165                .ok_or_else(|| eyre!("Failed to create HTTP RPC client for node"))?;
166            let auth = node.auth_server_handle();
167            let url = node.rpc_url();
168            // TODO: Pass beacon_engine_handle once import system supports generic types
169            node_clients.push(crate::testsuite::NodeClient::new(rpc, auth, url));
170        }
171
172        // Store the import result to keep nodes alive
173        // They will be dropped when the Setup is dropped
174        self.import_result_holder = Some(import_result);
175
176        // Finalize setup - this will wait for nodes and initialize states
177        self.finalize_setup(env, node_clients, true).await
178    }
179
180    /// Apply the setup to the environment
181    pub async fn apply<N>(&mut self, env: &mut Environment<I>) -> Result<()>
182    where
183        N: NodeBuilderHelper<Payload = I>,
184    {
185        // Note: this future is quite large so we box it
186        Box::pin(self.apply_::<N>(env)).await
187    }
188
189    /// Apply the setup to the environment
190    async fn apply_<N>(&mut self, env: &mut Environment<I>) -> Result<()>
191    where
192        N: NodeBuilderHelper<Payload = I>,
193    {
194        // If import_rlp_path is set, use apply_with_import instead
195        if let Some(rlp_path) = self.import_rlp_path.take() {
196            return self.apply_with_import::<N>(env, &rlp_path).await;
197        }
198        let chain_spec =
199            self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
200
201        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
202        self.shutdown_tx = Some(shutdown_tx);
203
204        let is_dev = self.is_dev;
205        let storage_v2 = self.storage_v2;
206        let node_count = self.network.node_count;
207        let tree_config = self.tree_config.clone();
208
209        let attributes_generator = Self::create_static_attributes_generator::<N>();
210
211        let mut builder = E2ETestSetupBuilder::<N, _>::new(
212            node_count,
213            Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
214            attributes_generator,
215        )
216        .with_tree_config_modifier(move |base| {
217            tree_config.clone().with_cross_block_cache_size(base.cross_block_cache_size())
218        })
219        .with_node_config_modifier(move |config| config.set_dev(is_dev))
220        .with_connect_nodes(self.network.connect_nodes);
221
222        if storage_v2 {
223            builder = builder.with_storage_v2();
224        }
225
226        let result = builder.build().await;
227
228        let mut node_clients = Vec::new();
229        match result {
230            Ok((nodes, _wallet)) => {
231                // create HTTP clients for each node's RPC and Engine API endpoints
232                for node in &nodes {
233                    node_clients.push(node.to_node_client()?);
234                }
235
236                // spawn a separate task just to handle the shutdown
237                tokio::spawn(async move {
238                    // keep nodes in scope to ensure they're not dropped
239                    let _nodes = nodes;
240                    // Wait for shutdown signal
241                    let _ = shutdown_rx.recv().await;
242                    // nodes will be dropped here when the test completes
243                });
244            }
245            Err(e) => {
246                return Err(eyre!("Failed to setup nodes: {}", e));
247            }
248        }
249
250        // Finalize setup
251        self.finalize_setup(env, node_clients, false).await
252    }
253
254    /// Create nodes with imported chain data
255    ///
256    /// Note: Currently this only supports `EthereumNode` due to the import process
257    /// being Ethereum-specific. The generic parameter N is kept for consistency
258    /// with other methods but is not used.
259    async fn create_nodes_with_import(
260        &self,
261        rlp_path: &Path,
262    ) -> Result<crate::setup_import::ChainImportResult> {
263        let chain_spec =
264            self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
265
266        let attributes_generator = move |timestamp| PayloadAttributes {
267            timestamp,
268            prev_randao: B256::ZERO,
269            suggested_fee_recipient: alloy_primitives::Address::ZERO,
270            withdrawals: Some(vec![]),
271            parent_beacon_block_root: Some(B256::ZERO),
272            slot_number: None,
273        };
274
275        crate::setup_import::setup_engine_with_chain_import(
276            self.network.node_count,
277            chain_spec,
278            self.is_dev,
279            self.tree_config.clone(),
280            rlp_path,
281            attributes_generator,
282        )
283        .await
284    }
285
286    /// Create a static attributes generator that doesn't capture any instance data
287    fn create_static_attributes_generator<N>(
288    ) -> impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes + Copy + use<N, I>
289    where
290        N: NodeBuilderHelper<Payload = I>,
291    {
292        move |timestamp| {
293            PayloadAttributes {
294                timestamp,
295                prev_randao: B256::ZERO,
296                suggested_fee_recipient: alloy_primitives::Address::ZERO,
297                withdrawals: Some(vec![]),
298                parent_beacon_block_root: Some(B256::ZERO),
299                slot_number: None,
300            }
301            .into()
302        }
303    }
304
305    /// Common finalization logic for both apply methods
306    async fn finalize_setup(
307        &self,
308        env: &mut Environment<I>,
309        node_clients: Vec<crate::testsuite::NodeClient<I>>,
310        use_latest_block: bool,
311    ) -> Result<()> {
312        if node_clients.is_empty() {
313            return Err(eyre!("No nodes were created"));
314        }
315
316        // Wait for all nodes to be ready
317        self.wait_for_nodes_ready(&node_clients).await?;
318
319        env.node_clients = node_clients;
320        env.initialize_node_states(self.network.node_count);
321
322        // Get initial block info (genesis or latest depending on use_latest_block)
323        let (initial_block_info, genesis_block_info) = if use_latest_block {
324            // For imported chain, get both latest and genesis
325            let latest =
326                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Latest).await?;
327            let genesis =
328                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
329            (latest, genesis)
330        } else {
331            // For fresh chain, both are genesis
332            let genesis =
333                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
334            (genesis, genesis)
335        };
336
337        // Initialize all node states
338        for (node_idx, node_state) in env.node_states.iter_mut().enumerate() {
339            node_state.current_block_info = Some(initial_block_info);
340            node_state.latest_header_time = initial_block_info.timestamp;
341            node_state.latest_fork_choice_state = ForkchoiceState {
342                head_block_hash: initial_block_info.hash,
343                safe_block_hash: initial_block_info.hash,
344                finalized_block_hash: genesis_block_info.hash,
345            };
346
347            debug!(
348                "Node {} initialized with block {} (hash: {})",
349                node_idx, initial_block_info.number, initial_block_info.hash
350            );
351        }
352
353        debug!(
354            "Environment initialized with {} nodes, starting from block {} (hash: {})",
355            self.network.node_count, initial_block_info.number, initial_block_info.hash
356        );
357
358        // In test environments, explicitly set sync state to Idle after initialization
359        // This ensures that eth_syncing returns false as expected by tests
360        if let Some(import_result) = &self.import_result_holder {
361            for (idx, node_ctx) in import_result.nodes.iter().enumerate() {
362                debug!("Setting sync state to Idle for node {}", idx);
363                node_ctx.inner.network.update_sync_state(SyncState::Idle);
364            }
365        }
366
367        Ok(())
368    }
369
370    /// Wait for all nodes to be ready to accept RPC requests
371    async fn wait_for_nodes_ready<P>(
372        &self,
373        node_clients: &[crate::testsuite::NodeClient<P>],
374    ) -> Result<()>
375    where
376        P: PayloadTypes,
377    {
378        for (idx, client) in node_clients.iter().enumerate() {
379            let mut retry_count = 0;
380            const MAX_RETRIES: usize = 10;
381
382            while retry_count < MAX_RETRIES {
383                if client.is_ready().await {
384                    debug!("Node {idx} RPC endpoint is ready");
385                    break;
386                }
387
388                retry_count += 1;
389                debug!("Node {idx} RPC endpoint not ready, retry {retry_count}/{MAX_RETRIES}");
390                sleep(Duration::from_millis(500)).await;
391            }
392
393            if retry_count == MAX_RETRIES {
394                return Err(eyre!(
395                    "Failed to connect to node {idx} RPC endpoint after {MAX_RETRIES} retries"
396                ));
397            }
398        }
399        Ok(())
400    }
401
402    /// Get block info for a given block number or tag
403    async fn get_block_info<P>(
404        &self,
405        client: &crate::testsuite::NodeClient<P>,
406        block: BlockNumberOrTag,
407    ) -> Result<crate::testsuite::BlockInfo>
408    where
409        P: PayloadTypes,
410    {
411        let block = client
412            .get_block_by_number(block)
413            .await?
414            .ok_or_else(|| eyre!("Block {:?} not found", block))?;
415
416        Ok(crate::testsuite::BlockInfo {
417            hash: block.header.hash,
418            number: block.header.number,
419            timestamp: block.header.timestamp,
420        })
421    }
422}
423
424/// Genesis block configuration
425#[derive(Debug)]
426pub struct Genesis {}
427
428/// Network configuration for setup
429#[derive(Debug, Default)]
430pub struct NetworkSetup {
431    /// Number of nodes to create
432    pub node_count: usize,
433    /// Whether nodes should be connected to each other
434    pub connect_nodes: bool,
435}
436
437impl NetworkSetup {
438    /// Create a new network setup with a single node
439    pub const fn single_node() -> Self {
440        Self { node_count: 1, connect_nodes: true }
441    }
442
443    /// Create a new network setup with multiple nodes (connected)
444    pub const fn multi_node(count: usize) -> Self {
445        Self { node_count: count, connect_nodes: true }
446    }
447
448    /// Create a new network setup with multiple nodes (disconnected)
449    pub const fn multi_node_unconnected(count: usize) -> Self {
450        Self { node_count: count, connect_nodes: false }
451    }
452}