reth_e2e_test_utils/testsuite/
setup.rs

1//! Test setup utilities for configuring the initial state.
2
3use crate::{
4    setup_engine_with_connection, testsuite::Environment, NodeBuilderHelper,
5    PayloadAttributesBuilder,
6};
7use alloy_eips::BlockNumberOrTag;
8use alloy_primitives::B256;
9use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
10use eyre::{eyre, Result};
11use reth_chainspec::ChainSpec;
12use reth_engine_local::LocalPayloadAttributesBuilder;
13use reth_ethereum_primitives::Block;
14use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState};
15use reth_node_api::{EngineTypes, NodeTypes, PayloadTypes, TreeConfig};
16use reth_node_core::primitives::RecoveredBlock;
17use reth_payload_builder::EthPayloadBuilderAttributes;
18use revm::state::EvmState;
19use std::{marker::PhantomData, path::Path, sync::Arc};
20use tokio::{
21    sync::mpsc,
22    time::{sleep, Duration},
23};
24use tracing::debug;
25
/// Configuration for setting up a test environment.
///
/// Values are supplied through the `with_*` builder methods and consumed by
/// [`Setup::apply`] or [`Setup::apply_with_import`], which launch the nodes and populate
/// the test `Environment`.
#[derive(Debug)]
pub struct Setup<I> {
    /// Chain specification to use.
    ///
    /// Required by both apply paths; they fail with "Chain specification is required"
    /// when this is `None`.
    pub chain_spec: Option<Arc<ChainSpec>>,
    /// Genesis block to use.
    ///
    /// NOTE(review): stored by `with_genesis` but not read by any code visible in this file.
    pub genesis: Option<Genesis>,
    /// Blocks to replay during setup.
    ///
    /// NOTE(review): collected by `with_block`/`with_blocks` but not read by any code
    /// visible in this file.
    pub blocks: Vec<RecoveredBlock<Block>>,
    /// Initial state to load.
    ///
    /// NOTE(review): stored by `with_state` but not read by any code visible in this file.
    pub state: Option<EvmState>,
    /// Network configuration (node count and whether nodes are connected).
    pub network: NetworkSetup,
    /// Engine tree configuration, cloned and handed to the node setup helpers.
    pub tree_config: TreeConfig,
    /// Shutdown channel to stop nodes when setup is dropped.
    ///
    /// Created in `apply`; the receiving half lives in a spawned task that owns the nodes.
    shutdown_tx: Option<mpsc::Sender<()>>,
    /// Is this setup in dev mode. Defaults to `true`.
    pub is_dev: bool,
    /// Tracks instance generic.
    _phantom: PhantomData<I>,
    /// Holds the import result to keep nodes alive when using imported chain.
    /// This is stored as an option to avoid lifetime issues with `tokio::spawn`.
    import_result_holder: Option<crate::setup_import::ChainImportResult>,
    /// Path to RLP file to import during setup.
    ///
    /// When set, `apply` takes this path and runs the import-based setup instead.
    pub import_rlp_path: Option<std::path::PathBuf>,
}
53
54impl<I> Default for Setup<I> {
55    fn default() -> Self {
56        Self {
57            chain_spec: None,
58            genesis: None,
59            blocks: Vec::new(),
60            state: None,
61            network: NetworkSetup::default(),
62            tree_config: TreeConfig::default(),
63            shutdown_tx: None,
64            is_dev: true,
65            _phantom: Default::default(),
66            import_result_holder: None,
67            import_rlp_path: None,
68        }
69    }
70}
71
72impl<I> Drop for Setup<I> {
73    fn drop(&mut self) {
74        // Send shutdown signal if the channel exists
75        if let Some(tx) = self.shutdown_tx.take() {
76            let _ = tx.try_send(());
77        }
78    }
79}
80
81impl<I> Setup<I>
82where
83    I: EngineTypes,
84{
85    /// Set the chain specification
86    pub fn with_chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
87        self.chain_spec = Some(chain_spec);
88        self
89    }
90
    /// Set the genesis block configuration, replacing any previously stored one.
    ///
    /// Consumes and returns `self` so calls can be chained.
    pub const fn with_genesis(mut self, genesis: Genesis) -> Self {
        self.genesis = Some(genesis);
        self
    }
96
97    /// Add a block to replay during setup
98    pub fn with_block(mut self, block: RecoveredBlock<Block>) -> Self {
99        self.blocks.push(block);
100        self
101    }
102
103    /// Add multiple blocks to replay during setup
104    pub fn with_blocks(mut self, blocks: Vec<RecoveredBlock<Block>>) -> Self {
105        self.blocks.extend(blocks);
106        self
107    }
108
109    /// Set the initial state
110    pub fn with_state(mut self, state: EvmState) -> Self {
111        self.state = Some(state);
112        self
113    }
114
    /// Set the network configuration (node count and connectivity), replacing the current one.
    ///
    /// Consumes and returns `self` so calls can be chained.
    pub const fn with_network(mut self, network: NetworkSetup) -> Self {
        self.network = network;
        self
    }
120
    /// Set dev mode.
    ///
    /// The flag is forwarded unchanged to the node setup helpers when the setup is applied.
    /// Consumes and returns `self` so calls can be chained.
    pub const fn with_dev_mode(mut self, is_dev: bool) -> Self {
        self.is_dev = is_dev;
        self
    }
126
    /// Set the engine tree configuration, replacing the current one.
    ///
    /// Consumes and returns `self` so calls can be chained.
    pub const fn with_tree_config(mut self, tree_config: TreeConfig) -> Self {
        self.tree_config = tree_config;
        self
    }
132
133    /// Apply setup using pre-imported chain data from RLP file
134    pub async fn apply_with_import<N>(
135        &mut self,
136        env: &mut Environment<I>,
137        rlp_path: &Path,
138    ) -> Result<()>
139    where
140        N: NodeBuilderHelper<Payload = I>,
141        LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
142            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
143        >,
144    {
145        // Note: this future is quite large so we box it
146        Box::pin(self.apply_with_import_::<N>(env, rlp_path)).await
147    }
148
149    /// Apply setup using pre-imported chain data from RLP file
150    async fn apply_with_import_<N>(
151        &mut self,
152        env: &mut Environment<I>,
153        rlp_path: &Path,
154    ) -> Result<()>
155    where
156        N: NodeBuilderHelper<Payload = I>,
157        LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
158            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
159        >,
160    {
161        // Create nodes with imported chain data
162        let import_result = self.create_nodes_with_import::<N>(rlp_path).await?;
163
164        // Extract node clients
165        let mut node_clients = Vec::new();
166        let nodes = &import_result.nodes;
167        for node in nodes {
168            let rpc = node
169                .rpc_client()
170                .ok_or_else(|| eyre!("Failed to create HTTP RPC client for node"))?;
171            let auth = node.auth_server_handle();
172            let url = node.rpc_url();
173            // TODO: Pass beacon_engine_handle once import system supports generic types
174            node_clients.push(crate::testsuite::NodeClient::new(rpc, auth, url));
175        }
176
177        // Store the import result to keep nodes alive
178        // They will be dropped when the Setup is dropped
179        self.import_result_holder = Some(import_result);
180
181        // Finalize setup - this will wait for nodes and initialize states
182        self.finalize_setup(env, node_clients, true).await
183    }
184
185    /// Apply the setup to the environment
186    pub async fn apply<N>(&mut self, env: &mut Environment<I>) -> Result<()>
187    where
188        N: NodeBuilderHelper<Payload = I>,
189        LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
190            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
191        >,
192    {
193        // Note: this future is quite large so we box it
194        Box::pin(self.apply_::<N>(env)).await
195    }
196
197    /// Apply the setup to the environment
198    async fn apply_<N>(&mut self, env: &mut Environment<I>) -> Result<()>
199    where
200        N: NodeBuilderHelper<Payload = I>,
201        LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
202            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
203        >,
204    {
205        // If import_rlp_path is set, use apply_with_import instead
206        if let Some(rlp_path) = self.import_rlp_path.take() {
207            return self.apply_with_import::<N>(env, &rlp_path).await;
208        }
209        let chain_spec =
210            self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
211
212        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
213        self.shutdown_tx = Some(shutdown_tx);
214
215        let is_dev = self.is_dev;
216        let node_count = self.network.node_count;
217
218        let attributes_generator = Self::create_static_attributes_generator::<N>();
219
220        let result = setup_engine_with_connection::<N>(
221            node_count,
222            Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
223            is_dev,
224            self.tree_config.clone(),
225            attributes_generator,
226            self.network.connect_nodes,
227        )
228        .await;
229
230        let mut node_clients = Vec::new();
231        match result {
232            Ok((nodes, executor, _wallet)) => {
233                // create HTTP clients for each node's RPC and Engine API endpoints
234                for node in &nodes {
235                    node_clients.push(node.to_node_client()?);
236                }
237
238                // spawn a separate task just to handle the shutdown
239                tokio::spawn(async move {
240                    // keep nodes and executor in scope to ensure they're not dropped
241                    let _nodes = nodes;
242                    let _executor = executor;
243                    // Wait for shutdown signal
244                    let _ = shutdown_rx.recv().await;
245                    // nodes and executor will be dropped here when the test completes
246                });
247            }
248            Err(e) => {
249                return Err(eyre!("Failed to setup nodes: {}", e));
250            }
251        }
252
253        // Finalize setup
254        self.finalize_setup(env, node_clients, false).await
255    }
256
257    /// Create nodes with imported chain data
258    ///
259    /// Note: Currently this only supports `EthereumNode` due to the import process
260    /// being Ethereum-specific. The generic parameter N is kept for consistency
261    /// with other methods but is not used.
262    async fn create_nodes_with_import<N>(
263        &self,
264        rlp_path: &Path,
265    ) -> Result<crate::setup_import::ChainImportResult>
266    where
267        N: NodeBuilderHelper<Payload = I>,
268        LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
269            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
270        >,
271    {
272        let chain_spec =
273            self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
274
275        let attributes_generator = move |timestamp| {
276            let attributes = PayloadAttributes {
277                timestamp,
278                prev_randao: B256::ZERO,
279                suggested_fee_recipient: alloy_primitives::Address::ZERO,
280                withdrawals: Some(vec![]),
281                parent_beacon_block_root: Some(B256::ZERO),
282            };
283            EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
284        };
285
286        crate::setup_import::setup_engine_with_chain_import(
287            self.network.node_count,
288            chain_spec,
289            self.is_dev,
290            self.tree_config.clone(),
291            rlp_path,
292            attributes_generator,
293        )
294        .await
295    }
296
297    /// Create a static attributes generator that doesn't capture any instance data
298    fn create_static_attributes_generator<N>(
299    ) -> impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes
300           + Copy
301           + use<N, I>
302    where
303        N: NodeBuilderHelper<Payload = I>,
304        LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
305            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
306        >,
307    {
308        move |timestamp| {
309            let attributes = PayloadAttributes {
310                timestamp,
311                prev_randao: B256::ZERO,
312                suggested_fee_recipient: alloy_primitives::Address::ZERO,
313                withdrawals: Some(vec![]),
314                parent_beacon_block_root: Some(B256::ZERO),
315            };
316            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes::from(
317                EthPayloadBuilderAttributes::new(B256::ZERO, attributes),
318            )
319        }
320    }
321
322    /// Common finalization logic for both apply methods
323    async fn finalize_setup(
324        &self,
325        env: &mut Environment<I>,
326        node_clients: Vec<crate::testsuite::NodeClient<I>>,
327        use_latest_block: bool,
328    ) -> Result<()> {
329        if node_clients.is_empty() {
330            return Err(eyre!("No nodes were created"));
331        }
332
333        // Wait for all nodes to be ready
334        self.wait_for_nodes_ready(&node_clients).await?;
335
336        env.node_clients = node_clients;
337        env.initialize_node_states(self.network.node_count);
338
339        // Get initial block info (genesis or latest depending on use_latest_block)
340        let (initial_block_info, genesis_block_info) = if use_latest_block {
341            // For imported chain, get both latest and genesis
342            let latest =
343                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Latest).await?;
344            let genesis =
345                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
346            (latest, genesis)
347        } else {
348            // For fresh chain, both are genesis
349            let genesis =
350                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
351            (genesis, genesis)
352        };
353
354        // Initialize all node states
355        for (node_idx, node_state) in env.node_states.iter_mut().enumerate() {
356            node_state.current_block_info = Some(initial_block_info);
357            node_state.latest_header_time = initial_block_info.timestamp;
358            node_state.latest_fork_choice_state = ForkchoiceState {
359                head_block_hash: initial_block_info.hash,
360                safe_block_hash: initial_block_info.hash,
361                finalized_block_hash: genesis_block_info.hash,
362            };
363
364            debug!(
365                "Node {} initialized with block {} (hash: {})",
366                node_idx, initial_block_info.number, initial_block_info.hash
367            );
368        }
369
370        debug!(
371            "Environment initialized with {} nodes, starting from block {} (hash: {})",
372            self.network.node_count, initial_block_info.number, initial_block_info.hash
373        );
374
375        // In test environments, explicitly set sync state to Idle after initialization
376        // This ensures that eth_syncing returns false as expected by tests
377        if let Some(import_result) = &self.import_result_holder {
378            for (idx, node_ctx) in import_result.nodes.iter().enumerate() {
379                debug!("Setting sync state to Idle for node {}", idx);
380                node_ctx.inner.network.update_sync_state(SyncState::Idle);
381            }
382        }
383
384        Ok(())
385    }
386
387    /// Wait for all nodes to be ready to accept RPC requests
388    async fn wait_for_nodes_ready<P>(
389        &self,
390        node_clients: &[crate::testsuite::NodeClient<P>],
391    ) -> Result<()>
392    where
393        P: PayloadTypes,
394    {
395        for (idx, client) in node_clients.iter().enumerate() {
396            let mut retry_count = 0;
397            const MAX_RETRIES: usize = 10;
398
399            while retry_count < MAX_RETRIES {
400                if client.is_ready().await {
401                    debug!("Node {idx} RPC endpoint is ready");
402                    break;
403                }
404
405                retry_count += 1;
406                debug!("Node {idx} RPC endpoint not ready, retry {retry_count}/{MAX_RETRIES}");
407                sleep(Duration::from_millis(500)).await;
408            }
409
410            if retry_count == MAX_RETRIES {
411                return Err(eyre!(
412                    "Failed to connect to node {idx} RPC endpoint after {MAX_RETRIES} retries"
413                ));
414            }
415        }
416        Ok(())
417    }
418
419    /// Get block info for a given block number or tag
420    async fn get_block_info<P>(
421        &self,
422        client: &crate::testsuite::NodeClient<P>,
423        block: BlockNumberOrTag,
424    ) -> Result<crate::testsuite::BlockInfo>
425    where
426        P: PayloadTypes,
427    {
428        let block = client
429            .get_block_by_number(block)
430            .await?
431            .ok_or_else(|| eyre!("Block {:?} not found", block))?;
432
433        Ok(crate::testsuite::BlockInfo {
434            hash: block.header.hash,
435            number: block.header.number,
436            timestamp: block.header.timestamp,
437        })
438    }
439}
440
/// Genesis block configuration.
///
/// Currently a field-less placeholder: it carries no data.
#[derive(Debug)]
pub struct Genesis {}
444
/// Describes the node topology to launch for a test.
///
/// The derived default is zero nodes with no connections between them.
#[derive(Debug, Default)]
pub struct NetworkSetup {
    /// How many nodes to launch.
    pub node_count: usize,
    /// Whether the launched nodes should be connected to each other.
    pub connect_nodes: bool,
}

impl NetworkSetup {
    /// A network made of exactly one node (with the connect flag set).
    pub const fn single_node() -> Self {
        Self::multi_node(1)
    }

    /// A network of `count` nodes that are connected to each other.
    pub const fn multi_node(count: usize) -> Self {
        Self { connect_nodes: true, node_count: count }
    }

    /// A network of `count` nodes that are left unconnected.
    pub const fn multi_node_unconnected(count: usize) -> Self {
        Self { connect_nodes: false, node_count: count }
    }
}