Skip to main content

reth_e2e_test_utils/testsuite/
setup.rs

1//! Test setup utilities for configuring the initial state.
2
3use crate::{testsuite::Environment, E2ETestSetupBuilder, NodeBuilderHelper};
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::B256;
6use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
7use eyre::{eyre, Result};
8use reth_chainspec::ChainSpec;
9use reth_ethereum_primitives::Block;
10use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState};
11use reth_node_api::{EngineTypes, NodeTypes, PayloadTypes, TreeConfig};
12use reth_node_core::primitives::RecoveredBlock;
13use reth_payload_builder::EthPayloadBuilderAttributes;
14use revm::state::EvmState;
15use std::{marker::PhantomData, path::Path, sync::Arc};
16use tokio::{
17    sync::mpsc,
18    time::{sleep, Duration},
19};
20use tracing::debug;
21
22/// Configuration for setting up test environment
23#[derive(Debug)]
24pub struct Setup<I> {
25    /// Chain specification to use
26    pub chain_spec: Option<Arc<ChainSpec>>,
27    /// Genesis block to use
28    pub genesis: Option<Genesis>,
29    /// Blocks to replay during setup
30    pub blocks: Vec<RecoveredBlock<Block>>,
31    /// Initial state to load
32    pub state: Option<EvmState>,
33    /// Network configuration
34    pub network: NetworkSetup,
35    /// Engine tree configuration
36    pub tree_config: TreeConfig,
37    /// Shutdown channel to stop nodes when setup is dropped
38    shutdown_tx: Option<mpsc::Sender<()>>,
39    /// Is this setup in dev mode
40    pub is_dev: bool,
41    /// Whether to use v2 storage mode (hashed keys, static file changesets, rocksdb history)
42    pub storage_v2: bool,
43    /// Tracks instance generic.
44    _phantom: PhantomData<I>,
45    /// Holds the import result to keep nodes alive when using imported chain
46    /// This is stored as an option to avoid lifetime issues with `tokio::spawn`
47    import_result_holder: Option<crate::setup_import::ChainImportResult>,
48    /// Path to RLP file to import during setup
49    pub import_rlp_path: Option<std::path::PathBuf>,
50}
51
52impl<I> Default for Setup<I> {
53    fn default() -> Self {
54        Self {
55            chain_spec: None,
56            genesis: None,
57            blocks: Vec::new(),
58            state: None,
59            network: NetworkSetup::default(),
60            tree_config: TreeConfig::default(),
61            shutdown_tx: None,
62            is_dev: true,
63            storage_v2: false,
64            _phantom: Default::default(),
65            import_result_holder: None,
66            import_rlp_path: None,
67        }
68    }
69}
70
71impl<I> Drop for Setup<I> {
72    fn drop(&mut self) {
73        // Send shutdown signal if the channel exists
74        if let Some(tx) = self.shutdown_tx.take() {
75            let _ = tx.try_send(());
76        }
77    }
78}
79
80impl<I> Setup<I>
81where
82    I: EngineTypes,
83{
84    /// Set the chain specification
85    pub fn with_chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
86        self.chain_spec = Some(chain_spec);
87        self
88    }
89
90    /// Set the genesis block
91    pub const fn with_genesis(mut self, genesis: Genesis) -> Self {
92        self.genesis = Some(genesis);
93        self
94    }
95
96    /// Add a block to replay during setup
97    pub fn with_block(mut self, block: RecoveredBlock<Block>) -> Self {
98        self.blocks.push(block);
99        self
100    }
101
102    /// Add multiple blocks to replay during setup
103    pub fn with_blocks(mut self, blocks: Vec<RecoveredBlock<Block>>) -> Self {
104        self.blocks.extend(blocks);
105        self
106    }
107
108    /// Set the initial state
109    pub fn with_state(mut self, state: EvmState) -> Self {
110        self.state = Some(state);
111        self
112    }
113
114    /// Set the network configuration
115    pub const fn with_network(mut self, network: NetworkSetup) -> Self {
116        self.network = network;
117        self
118    }
119
120    /// Set dev mode
121    pub const fn with_dev_mode(mut self, is_dev: bool) -> Self {
122        self.is_dev = is_dev;
123        self
124    }
125
126    /// Set the engine tree configuration
127    pub const fn with_tree_config(mut self, tree_config: TreeConfig) -> Self {
128        self.tree_config = tree_config;
129        self
130    }
131
132    /// Enable v2 storage mode (hashed keys, static file changesets, rocksdb history)
133    pub const fn with_storage_v2(mut self) -> Self {
134        self.storage_v2 = true;
135        self
136    }
137
138    /// Apply setup using pre-imported chain data from RLP file
139    pub async fn apply_with_import<N>(
140        &mut self,
141        env: &mut Environment<I>,
142        rlp_path: &Path,
143    ) -> Result<()>
144    where
145        N: NodeBuilderHelper<Payload = I>,
146    {
147        // Note: this future is quite large so we box it
148        Box::pin(self.apply_with_import_(env, rlp_path)).await
149    }
150
151    /// Apply setup using pre-imported chain data from RLP file
152    async fn apply_with_import_(
153        &mut self,
154        env: &mut Environment<I>,
155        rlp_path: &Path,
156    ) -> Result<()> {
157        // Create nodes with imported chain data
158        let import_result = self.create_nodes_with_import(rlp_path).await?;
159
160        // Extract node clients
161        let mut node_clients = Vec::new();
162        let nodes = &import_result.nodes;
163        for node in nodes {
164            let rpc = node
165                .rpc_client()
166                .ok_or_else(|| eyre!("Failed to create HTTP RPC client for node"))?;
167            let auth = node.auth_server_handle();
168            let url = node.rpc_url();
169            // TODO: Pass beacon_engine_handle once import system supports generic types
170            node_clients.push(crate::testsuite::NodeClient::new(rpc, auth, url));
171        }
172
173        // Store the import result to keep nodes alive
174        // They will be dropped when the Setup is dropped
175        self.import_result_holder = Some(import_result);
176
177        // Finalize setup - this will wait for nodes and initialize states
178        self.finalize_setup(env, node_clients, true).await
179    }
180
181    /// Apply the setup to the environment
182    pub async fn apply<N>(&mut self, env: &mut Environment<I>) -> Result<()>
183    where
184        N: NodeBuilderHelper<Payload = I>,
185    {
186        // Note: this future is quite large so we box it
187        Box::pin(self.apply_::<N>(env)).await
188    }
189
190    /// Apply the setup to the environment
191    async fn apply_<N>(&mut self, env: &mut Environment<I>) -> Result<()>
192    where
193        N: NodeBuilderHelper<Payload = I>,
194    {
195        // If import_rlp_path is set, use apply_with_import instead
196        if let Some(rlp_path) = self.import_rlp_path.take() {
197            return self.apply_with_import::<N>(env, &rlp_path).await;
198        }
199        let chain_spec =
200            self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
201
202        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
203        self.shutdown_tx = Some(shutdown_tx);
204
205        let is_dev = self.is_dev;
206        let storage_v2 = self.storage_v2;
207        let node_count = self.network.node_count;
208        let tree_config = self.tree_config.clone();
209
210        let attributes_generator = Self::create_static_attributes_generator::<N>();
211
212        let mut builder = E2ETestSetupBuilder::<N, _>::new(
213            node_count,
214            Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
215            attributes_generator,
216        )
217        .with_tree_config_modifier(move |base| {
218            tree_config.clone().with_cross_block_cache_size(base.cross_block_cache_size())
219        })
220        .with_node_config_modifier(move |config| config.set_dev(is_dev))
221        .with_connect_nodes(self.network.connect_nodes);
222
223        if storage_v2 {
224            builder = builder.with_storage_v2();
225        }
226
227        let result = builder.build().await;
228
229        let mut node_clients = Vec::new();
230        match result {
231            Ok((nodes, _wallet)) => {
232                // create HTTP clients for each node's RPC and Engine API endpoints
233                for node in &nodes {
234                    node_clients.push(node.to_node_client()?);
235                }
236
237                // spawn a separate task just to handle the shutdown
238                tokio::spawn(async move {
239                    // keep nodes in scope to ensure they're not dropped
240                    let _nodes = nodes;
241                    // Wait for shutdown signal
242                    let _ = shutdown_rx.recv().await;
243                    // nodes will be dropped here when the test completes
244                });
245            }
246            Err(e) => {
247                return Err(eyre!("Failed to setup nodes: {}", e));
248            }
249        }
250
251        // Finalize setup
252        self.finalize_setup(env, node_clients, false).await
253    }
254
255    /// Create nodes with imported chain data
256    ///
257    /// Note: Currently this only supports `EthereumNode` due to the import process
258    /// being Ethereum-specific. The generic parameter N is kept for consistency
259    /// with other methods but is not used.
260    async fn create_nodes_with_import(
261        &self,
262        rlp_path: &Path,
263    ) -> Result<crate::setup_import::ChainImportResult> {
264        let chain_spec =
265            self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
266
267        let attributes_generator = move |timestamp| {
268            let attributes = PayloadAttributes {
269                timestamp,
270                prev_randao: B256::ZERO,
271                suggested_fee_recipient: alloy_primitives::Address::ZERO,
272                withdrawals: Some(vec![]),
273                parent_beacon_block_root: Some(B256::ZERO),
274            };
275            EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
276        };
277
278        crate::setup_import::setup_engine_with_chain_import(
279            self.network.node_count,
280            chain_spec,
281            self.is_dev,
282            self.tree_config.clone(),
283            rlp_path,
284            attributes_generator,
285        )
286        .await
287    }
288
289    /// Create a static attributes generator that doesn't capture any instance data
290    fn create_static_attributes_generator<N>(
291    ) -> impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes
292           + Copy
293           + use<N, I>
294    where
295        N: NodeBuilderHelper<Payload = I>,
296    {
297        move |timestamp| {
298            let attributes = PayloadAttributes {
299                timestamp,
300                prev_randao: B256::ZERO,
301                suggested_fee_recipient: alloy_primitives::Address::ZERO,
302                withdrawals: Some(vec![]),
303                parent_beacon_block_root: Some(B256::ZERO),
304            };
305            <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes::from(
306                EthPayloadBuilderAttributes::new(B256::ZERO, attributes),
307            )
308        }
309    }
310
311    /// Common finalization logic for both apply methods
312    async fn finalize_setup(
313        &self,
314        env: &mut Environment<I>,
315        node_clients: Vec<crate::testsuite::NodeClient<I>>,
316        use_latest_block: bool,
317    ) -> Result<()> {
318        if node_clients.is_empty() {
319            return Err(eyre!("No nodes were created"));
320        }
321
322        // Wait for all nodes to be ready
323        self.wait_for_nodes_ready(&node_clients).await?;
324
325        env.node_clients = node_clients;
326        env.initialize_node_states(self.network.node_count);
327
328        // Get initial block info (genesis or latest depending on use_latest_block)
329        let (initial_block_info, genesis_block_info) = if use_latest_block {
330            // For imported chain, get both latest and genesis
331            let latest =
332                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Latest).await?;
333            let genesis =
334                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
335            (latest, genesis)
336        } else {
337            // For fresh chain, both are genesis
338            let genesis =
339                self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
340            (genesis, genesis)
341        };
342
343        // Initialize all node states
344        for (node_idx, node_state) in env.node_states.iter_mut().enumerate() {
345            node_state.current_block_info = Some(initial_block_info);
346            node_state.latest_header_time = initial_block_info.timestamp;
347            node_state.latest_fork_choice_state = ForkchoiceState {
348                head_block_hash: initial_block_info.hash,
349                safe_block_hash: initial_block_info.hash,
350                finalized_block_hash: genesis_block_info.hash,
351            };
352
353            debug!(
354                "Node {} initialized with block {} (hash: {})",
355                node_idx, initial_block_info.number, initial_block_info.hash
356            );
357        }
358
359        debug!(
360            "Environment initialized with {} nodes, starting from block {} (hash: {})",
361            self.network.node_count, initial_block_info.number, initial_block_info.hash
362        );
363
364        // In test environments, explicitly set sync state to Idle after initialization
365        // This ensures that eth_syncing returns false as expected by tests
366        if let Some(import_result) = &self.import_result_holder {
367            for (idx, node_ctx) in import_result.nodes.iter().enumerate() {
368                debug!("Setting sync state to Idle for node {}", idx);
369                node_ctx.inner.network.update_sync_state(SyncState::Idle);
370            }
371        }
372
373        Ok(())
374    }
375
376    /// Wait for all nodes to be ready to accept RPC requests
377    async fn wait_for_nodes_ready<P>(
378        &self,
379        node_clients: &[crate::testsuite::NodeClient<P>],
380    ) -> Result<()>
381    where
382        P: PayloadTypes,
383    {
384        for (idx, client) in node_clients.iter().enumerate() {
385            let mut retry_count = 0;
386            const MAX_RETRIES: usize = 10;
387
388            while retry_count < MAX_RETRIES {
389                if client.is_ready().await {
390                    debug!("Node {idx} RPC endpoint is ready");
391                    break;
392                }
393
394                retry_count += 1;
395                debug!("Node {idx} RPC endpoint not ready, retry {retry_count}/{MAX_RETRIES}");
396                sleep(Duration::from_millis(500)).await;
397            }
398
399            if retry_count == MAX_RETRIES {
400                return Err(eyre!(
401                    "Failed to connect to node {idx} RPC endpoint after {MAX_RETRIES} retries"
402                ));
403            }
404        }
405        Ok(())
406    }
407
408    /// Get block info for a given block number or tag
409    async fn get_block_info<P>(
410        &self,
411        client: &crate::testsuite::NodeClient<P>,
412        block: BlockNumberOrTag,
413    ) -> Result<crate::testsuite::BlockInfo>
414    where
415        P: PayloadTypes,
416    {
417        let block = client
418            .get_block_by_number(block)
419            .await?
420            .ok_or_else(|| eyre!("Block {:?} not found", block))?;
421
422        Ok(crate::testsuite::BlockInfo {
423            hash: block.header.hash,
424            number: block.header.number,
425            timestamp: block.header.timestamp,
426        })
427    }
428}
429
430/// Genesis block configuration
431#[derive(Debug)]
432pub struct Genesis {}
433
434/// Network configuration for setup
435#[derive(Debug, Default)]
436pub struct NetworkSetup {
437    /// Number of nodes to create
438    pub node_count: usize,
439    /// Whether nodes should be connected to each other
440    pub connect_nodes: bool,
441}
442
443impl NetworkSetup {
444    /// Create a new network setup with a single node
445    pub const fn single_node() -> Self {
446        Self { node_count: 1, connect_nodes: true }
447    }
448
449    /// Create a new network setup with multiple nodes (connected)
450    pub const fn multi_node(count: usize) -> Self {
451        Self { node_count: count, connect_nodes: true }
452    }
453
454    /// Create a new network setup with multiple nodes (disconnected)
455    pub const fn multi_node_unconnected(count: usize) -> Self {
456        Self { node_count: count, connect_nodes: false }
457    }
458}