reth_e2e_test_utils/testsuite/
setup.rs

1//! Test setup utilities for configuring the initial state.
2
3use crate::{setup_engine_with_connection, testsuite::Environment, NodeBuilderHelper};
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::B256;
6use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
7use eyre::{eyre, Result};
8use reth_chainspec::ChainSpec;
9use reth_ethereum_primitives::Block;
10use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState};
11use reth_node_api::{EngineTypes, NodeTypes, PayloadTypes, TreeConfig};
12use reth_node_core::primitives::RecoveredBlock;
13use reth_payload_builder::EthPayloadBuilderAttributes;
14use revm::state::EvmState;
15use std::{marker::PhantomData, path::Path, sync::Arc};
16use tokio::{
17    sync::mpsc,
18    time::{sleep, Duration},
19};
20use tracing::debug;
21
22/// Configuration for setting up test environment
23#[derive(Debug)]
24pub struct Setup<I> {
25    /// Chain specification to use
26    pub chain_spec: Option<Arc<ChainSpec>>,
27    /// Genesis block to use
28    pub genesis: Option<Genesis>,
29    /// Blocks to replay during setup
30    pub blocks: Vec<RecoveredBlock<Block>>,
31    /// Initial state to load
32    pub state: Option<EvmState>,
33    /// Network configuration
34    pub network: NetworkSetup,
35    /// Engine tree configuration
36    pub tree_config: TreeConfig,
37    /// Shutdown channel to stop nodes when setup is dropped
38    shutdown_tx: Option<mpsc::Sender<()>>,
39    /// Is this setup in dev mode
40    pub is_dev: bool,
41    /// Tracks instance generic.
42    _phantom: PhantomData<I>,
43    /// Holds the import result to keep nodes alive when using imported chain
44    /// This is stored as an option to avoid lifetime issues with `tokio::spawn`
45    import_result_holder: Option<crate::setup_import::ChainImportResult>,
46    /// Path to RLP file to import during setup
47    pub import_rlp_path: Option<std::path::PathBuf>,
48}
49
50impl<I> Default for Setup<I> {
51    fn default() -> Self {
52        Self {
53            chain_spec: None,
54            genesis: None,
55            blocks: Vec::new(),
56            state: None,
57            network: NetworkSetup::default(),
58            tree_config: TreeConfig::default(),
59            shutdown_tx: None,
60            is_dev: true,
61            _phantom: Default::default(),
62            import_result_holder: None,
63            import_rlp_path: None,
64        }
65    }
66}
67
68impl<I> Drop for Setup<I> {
69    fn drop(&mut self) {
70        // Send shutdown signal if the channel exists
71        if let Some(tx) = self.shutdown_tx.take() {
72            let _ = tx.try_send(());
73        }
74    }
75}
76
77impl<I> Setup<I>
78where
79    I: EngineTypes,
80{
81    /// Set the chain specification
82    pub fn with_chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
83        self.chain_spec = Some(chain_spec);
84        self
85    }
86
87    /// Set the genesis block
88    pub const fn with_genesis(mut self, genesis: Genesis) -> Self {
89        self.genesis = Some(genesis);
90        self
91    }
92
93    /// Add a block to replay during setup
94    pub fn with_block(mut self, block: RecoveredBlock<Block>) -> Self {
95        self.blocks.push(block);
96        self
97    }
98
99    /// Add multiple blocks to replay during setup
100    pub fn with_blocks(mut self, blocks: Vec<RecoveredBlock<Block>>) -> Self {
101        self.blocks.extend(blocks);
102        self
103    }
104
105    /// Set the initial state
106    pub fn with_state(mut self, state: EvmState) -> Self {
107        self.state = Some(state);
108        self
109    }
110
111    /// Set the network configuration
112    pub const fn with_network(mut self, network: NetworkSetup) -> Self {
113        self.network = network;
114        self
115    }
116
117    /// Set dev mode
118    pub const fn with_dev_mode(mut self, is_dev: bool) -> Self {
119        self.is_dev = is_dev;
120        self
121    }
122
123    /// Set the engine tree configuration
124    pub const fn with_tree_config(mut self, tree_config: TreeConfig) -> Self {
125        self.tree_config = tree_config;
126        self
127    }
128
129    /// Apply setup using pre-imported chain data from RLP file
130    pub async fn apply_with_import<N>(
131        &mut self,
132        env: &mut Environment<I>,
133        rlp_path: &Path,
134    ) -> Result<()>
135    where
136        N: NodeBuilderHelper<Payload = I>,
137    {
138        // Note: this future is quite large so we box it
139        Box::pin(self.apply_with_import_(env, rlp_path)).await
140    }
141
142    /// Apply setup using pre-imported chain data from RLP file
143    async fn apply_with_import_(
144        &mut self,
145        env: &mut Environment<I>,
146        rlp_path: &Path,
147    ) -> Result<()> {
148        // Create nodes with imported chain data
149        let import_result = self.create_nodes_with_import(rlp_path).await?;
150
151        // Extract node clients
152        let mut node_clients = Vec::new();
153        let nodes = &import_result.nodes;
154        for node in nodes {
155            let rpc = node
156                .rpc_client()
157                .ok_or_else(|| eyre!("Failed to create HTTP RPC client for node"))?;
158            let auth = node.auth_server_handle();
159            let url = node.rpc_url();
160            // TODO: Pass beacon_engine_handle once import system supports generic types
161            node_clients.push(crate::testsuite::NodeClient::new(rpc, auth, url));
162        }
163
164        // Store the import result to keep nodes alive
165        // They will be dropped when the Setup is dropped
166        self.import_result_holder = Some(import_result);
167
168        // Finalize setup - this will wait for nodes and initialize states
169        self.finalize_setup(env, node_clients, true).await
170    }
171
172    /// Apply the setup to the environment
173    pub async fn apply<N>(&mut self, env: &mut Environment<I>) -> Result<()>
174    where
175        N: NodeBuilderHelper<Payload = I>,
176    {
177        // Note: this future is quite large so we box it
178        Box::pin(self.apply_::<N>(env)).await
179    }
180
181    /// Apply the setup to the environment
182    async fn apply_<N>(&mut self, env: &mut Environment<I>) -> Result<()>
183    where
184        N: NodeBuilderHelper<Payload = I>,
185    {
186        // If import_rlp_path is set, use apply_with_import instead
187        if let Some(rlp_path) = self.import_rlp_path.take() {
188            return self.apply_with_import::<N>(env, &rlp_path).await;
189        }
190        let chain_spec =
191            self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
192
193        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
194        self.shutdown_tx = Some(shutdown_tx);
195
196        let is_dev = self.is_dev;
197        let node_count = self.network.node_count;
198
199        let attributes_generator = Self::create_static_attributes_generator::<N>();
200
201        let result = setup_engine_with_connection::<N>(
202            node_count,
203            Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
204            is_dev,
205            self.tree_config.clone(),
206            attributes_generator,
207            self.network.connect_nodes,
208        )
209        .await;
210
211        let mut node_clients = Vec::new();
212        match result {
213            Ok((nodes, executor, _wallet)) => {
214                // create HTTP clients for each node's RPC and Engine API endpoints
215                for node in &nodes {
216                    node_clients.push(node.to_node_client()?);
217                }
218
219                // spawn a separate task just to handle the shutdown
220                tokio::spawn(async move {
221                    // keep nodes and executor in scope to ensure they're not dropped
222                    let _nodes = nodes;
223                    let _executor = executor;
224                    // Wait for shutdown signal
225                    let _ = shutdown_rx.recv().await;
226                    // nodes and executor will be dropped here when the test completes
227                });
228            }
229            Err(e) => {
230                return Err(eyre!("Failed to setup nodes: {}", e));
231            }
232        }
233
234        // Finalize setup
235        self.finalize_setup(env, node_clients, false).await
236    }
237
238    /// Create nodes with imported chain data
239    ///
240    /// Note: Currently this only supports `EthereumNode` due to the import process
241    /// being Ethereum-specific. The generic parameter N is kept for consistency
242    /// with other methods but is not used.
243    async fn create_nodes_with_import(
244        &self,
245        rlp_path: &Path,
246    ) -> Result<crate::setup_import::ChainImportResult> {
247        let chain_spec =
248            self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
249
250        let attributes_generator = move |timestamp| {
251            let attributes = PayloadAttributes {
252                timestamp,
253                prev_randao: B256::ZERO,
254                suggested_fee_recipient: alloy_primitives::Address::ZERO,
255                withdrawals: Some(vec![]),
256                parent_beacon_block_root: Some(B256::ZERO),
257            };
258            EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
259        };
260
261        crate::setup_import::setup_engine_with_chain_import(
262            self.network.node_count,
263            chain_spec,
264            self.is_dev,
265            self.tree_config.clone(),
266            rlp_path,
267            attributes_generator,
268        )
269        .await
270    }
271
272    /// Create a static attributes generator that doesn't capture any instance data
273    fn create_static_attributes_generator<N>(
274    ) -> impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes
275           + Copy
276           + use<N, I>
277    where
278        N: NodeBuilderHelper<Payload = I>,
279    {
280        move |timestamp| {
281            let attributes = PayloadAttributes {
282                timestamp,
283                prev_randao: B256::ZERO,
284                suggested_fee_recipient: alloy_primitives::Address::ZERO,
285                withdrawals: Some(vec![]),
286                parent_beacon_block_root: Some(B256::ZERO),
287            };
288            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes::from(
289                EthPayloadBuilderAttributes::new(B256::ZERO, attributes),
290            )
291        }
292    }
293
294    /// Common finalization logic for both apply methods
295    async fn finalize_setup(
296        &self,
297        env: &mut Environment<I>,
298        node_clients: Vec<crate::testsuite::NodeClient<I>>,
299        use_latest_block: bool,
300    ) -> Result<()> {
301        if node_clients.is_empty() {
302            return Err(eyre!("No nodes were created"));
303        }
304
305        // Wait for all nodes to be ready
306        self.wait_for_nodes_ready(&node_clients).await?;
307
308        env.node_clients = node_clients;
309        env.initialize_node_states(self.network.node_count);
310
311        // Get initial block info (genesis or latest depending on use_latest_block)
312        let (initial_block_info, genesis_block_info) = if use_latest_block {
313            // For imported chain, get both latest and genesis
314            let latest =
315                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Latest).await?;
316            let genesis =
317                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
318            (latest, genesis)
319        } else {
320            // For fresh chain, both are genesis
321            let genesis =
322                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
323            (genesis, genesis)
324        };
325
326        // Initialize all node states
327        for (node_idx, node_state) in env.node_states.iter_mut().enumerate() {
328            node_state.current_block_info = Some(initial_block_info);
329            node_state.latest_header_time = initial_block_info.timestamp;
330            node_state.latest_fork_choice_state = ForkchoiceState {
331                head_block_hash: initial_block_info.hash,
332                safe_block_hash: initial_block_info.hash,
333                finalized_block_hash: genesis_block_info.hash,
334            };
335
336            debug!(
337                "Node {} initialized with block {} (hash: {})",
338                node_idx, initial_block_info.number, initial_block_info.hash
339            );
340        }
341
342        debug!(
343            "Environment initialized with {} nodes, starting from block {} (hash: {})",
344            self.network.node_count, initial_block_info.number, initial_block_info.hash
345        );
346
347        // In test environments, explicitly set sync state to Idle after initialization
348        // This ensures that eth_syncing returns false as expected by tests
349        if let Some(import_result) = &self.import_result_holder {
350            for (idx, node_ctx) in import_result.nodes.iter().enumerate() {
351                debug!("Setting sync state to Idle for node {}", idx);
352                node_ctx.inner.network.update_sync_state(SyncState::Idle);
353            }
354        }
355
356        Ok(())
357    }
358
359    /// Wait for all nodes to be ready to accept RPC requests
360    async fn wait_for_nodes_ready<P>(
361        &self,
362        node_clients: &[crate::testsuite::NodeClient<P>],
363    ) -> Result<()>
364    where
365        P: PayloadTypes,
366    {
367        for (idx, client) in node_clients.iter().enumerate() {
368            let mut retry_count = 0;
369            const MAX_RETRIES: usize = 10;
370
371            while retry_count < MAX_RETRIES {
372                if client.is_ready().await {
373                    debug!("Node {idx} RPC endpoint is ready");
374                    break;
375                }
376
377                retry_count += 1;
378                debug!("Node {idx} RPC endpoint not ready, retry {retry_count}/{MAX_RETRIES}");
379                sleep(Duration::from_millis(500)).await;
380            }
381
382            if retry_count == MAX_RETRIES {
383                return Err(eyre!(
384                    "Failed to connect to node {idx} RPC endpoint after {MAX_RETRIES} retries"
385                ));
386            }
387        }
388        Ok(())
389    }
390
391    /// Get block info for a given block number or tag
392    async fn get_block_info<P>(
393        &self,
394        client: &crate::testsuite::NodeClient<P>,
395        block: BlockNumberOrTag,
396    ) -> Result<crate::testsuite::BlockInfo>
397    where
398        P: PayloadTypes,
399    {
400        let block = client
401            .get_block_by_number(block)
402            .await?
403            .ok_or_else(|| eyre!("Block {:?} not found", block))?;
404
405        Ok(crate::testsuite::BlockInfo {
406            hash: block.header.hash,
407            number: block.header.number,
408            timestamp: block.header.timestamp,
409        })
410    }
411}
412
/// Genesis block configuration.
///
/// Currently carries no fields.
#[derive(Debug)]
pub struct Genesis {}
416
/// How many nodes a setup launches and whether they are wired together.
///
/// The derived default is zero nodes, not connected.
#[derive(Debug, Default)]
pub struct NetworkSetup {
    /// How many nodes to launch
    pub node_count: usize,
    /// `true` if the nodes should be connected to each other
    pub connect_nodes: bool,
}

impl NetworkSetup {
    /// One node, with `connect_nodes` enabled
    pub const fn single_node() -> Self {
        Self::multi_node(1)
    }

    /// `count` nodes that are connected to each other
    pub const fn multi_node(count: usize) -> Self {
        Self { connect_nodes: true, node_count: count }
    }

    /// `count` nodes that are left disconnected
    pub const fn multi_node_unconnected(count: usize) -> Self {
        Self { connect_nodes: false, node_count: count }
    }
}