reth_e2e_test_utils/testsuite/
setup.rs1use crate::{
4 setup_engine_with_connection, testsuite::Environment, NodeBuilderHelper,
5 PayloadAttributesBuilder,
6};
7use alloy_eips::BlockNumberOrTag;
8use alloy_primitives::B256;
9use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
10use eyre::{eyre, Result};
11use reth_chainspec::ChainSpec;
12use reth_engine_local::LocalPayloadAttributesBuilder;
13use reth_ethereum_primitives::Block;
14use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState};
15use reth_node_api::{EngineTypes, NodeTypes, PayloadTypes, TreeConfig};
16use reth_node_core::primitives::RecoveredBlock;
17use reth_payload_builder::EthPayloadBuilderAttributes;
18use revm::state::EvmState;
19use std::{marker::PhantomData, path::Path, sync::Arc};
20use tokio::{
21 sync::mpsc,
22 time::{sleep, Duration},
23};
24use tracing::debug;
25
26#[derive(Debug)]
28pub struct Setup<I> {
29 pub chain_spec: Option<Arc<ChainSpec>>,
31 pub genesis: Option<Genesis>,
33 pub blocks: Vec<RecoveredBlock<Block>>,
35 pub state: Option<EvmState>,
37 pub network: NetworkSetup,
39 pub tree_config: TreeConfig,
41 shutdown_tx: Option<mpsc::Sender<()>>,
43 pub is_dev: bool,
45 _phantom: PhantomData<I>,
47 import_result_holder: Option<crate::setup_import::ChainImportResult>,
50 pub import_rlp_path: Option<std::path::PathBuf>,
52}
53
54impl<I> Default for Setup<I> {
55 fn default() -> Self {
56 Self {
57 chain_spec: None,
58 genesis: None,
59 blocks: Vec::new(),
60 state: None,
61 network: NetworkSetup::default(),
62 tree_config: TreeConfig::default(),
63 shutdown_tx: None,
64 is_dev: true,
65 _phantom: Default::default(),
66 import_result_holder: None,
67 import_rlp_path: None,
68 }
69 }
70}
71
72impl<I> Drop for Setup<I> {
73 fn drop(&mut self) {
74 if let Some(tx) = self.shutdown_tx.take() {
76 let _ = tx.try_send(());
77 }
78 }
79}
80
81impl<I> Setup<I>
82where
83 I: EngineTypes,
84{
85 pub fn with_chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
87 self.chain_spec = Some(chain_spec);
88 self
89 }
90
91 pub const fn with_genesis(mut self, genesis: Genesis) -> Self {
93 self.genesis = Some(genesis);
94 self
95 }
96
97 pub fn with_block(mut self, block: RecoveredBlock<Block>) -> Self {
99 self.blocks.push(block);
100 self
101 }
102
103 pub fn with_blocks(mut self, blocks: Vec<RecoveredBlock<Block>>) -> Self {
105 self.blocks.extend(blocks);
106 self
107 }
108
109 pub fn with_state(mut self, state: EvmState) -> Self {
111 self.state = Some(state);
112 self
113 }
114
115 pub const fn with_network(mut self, network: NetworkSetup) -> Self {
117 self.network = network;
118 self
119 }
120
121 pub const fn with_dev_mode(mut self, is_dev: bool) -> Self {
123 self.is_dev = is_dev;
124 self
125 }
126
127 pub const fn with_tree_config(mut self, tree_config: TreeConfig) -> Self {
129 self.tree_config = tree_config;
130 self
131 }
132
133 pub async fn apply_with_import<N>(
135 &mut self,
136 env: &mut Environment<I>,
137 rlp_path: &Path,
138 ) -> Result<()>
139 where
140 N: NodeBuilderHelper<Payload = I>,
141 LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
142 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
143 >,
144 {
145 Box::pin(self.apply_with_import_::<N>(env, rlp_path)).await
147 }
148
149 async fn apply_with_import_<N>(
151 &mut self,
152 env: &mut Environment<I>,
153 rlp_path: &Path,
154 ) -> Result<()>
155 where
156 N: NodeBuilderHelper<Payload = I>,
157 LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
158 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
159 >,
160 {
161 let import_result = self.create_nodes_with_import::<N>(rlp_path).await?;
163
164 let mut node_clients = Vec::new();
166 let nodes = &import_result.nodes;
167 for node in nodes {
168 let rpc = node
169 .rpc_client()
170 .ok_or_else(|| eyre!("Failed to create HTTP RPC client for node"))?;
171 let auth = node.auth_server_handle();
172 let url = node.rpc_url();
173 node_clients.push(crate::testsuite::NodeClient::new(rpc, auth, url));
175 }
176
177 self.import_result_holder = Some(import_result);
180
181 self.finalize_setup(env, node_clients, true).await
183 }
184
185 pub async fn apply<N>(&mut self, env: &mut Environment<I>) -> Result<()>
187 where
188 N: NodeBuilderHelper<Payload = I>,
189 LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
190 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
191 >,
192 {
193 Box::pin(self.apply_::<N>(env)).await
195 }
196
197 async fn apply_<N>(&mut self, env: &mut Environment<I>) -> Result<()>
199 where
200 N: NodeBuilderHelper<Payload = I>,
201 LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
202 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
203 >,
204 {
205 if let Some(rlp_path) = self.import_rlp_path.take() {
207 return self.apply_with_import::<N>(env, &rlp_path).await;
208 }
209 let chain_spec =
210 self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
211
212 let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
213 self.shutdown_tx = Some(shutdown_tx);
214
215 let is_dev = self.is_dev;
216 let node_count = self.network.node_count;
217
218 let attributes_generator = Self::create_static_attributes_generator::<N>();
219
220 let result = setup_engine_with_connection::<N>(
221 node_count,
222 Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
223 is_dev,
224 self.tree_config.clone(),
225 attributes_generator,
226 self.network.connect_nodes,
227 )
228 .await;
229
230 let mut node_clients = Vec::new();
231 match result {
232 Ok((nodes, executor, _wallet)) => {
233 for node in &nodes {
235 node_clients.push(node.to_node_client()?);
236 }
237
238 tokio::spawn(async move {
240 let _nodes = nodes;
242 let _executor = executor;
243 let _ = shutdown_rx.recv().await;
245 });
247 }
248 Err(e) => {
249 return Err(eyre!("Failed to setup nodes: {}", e));
250 }
251 }
252
253 self.finalize_setup(env, node_clients, false).await
255 }
256
257 async fn create_nodes_with_import<N>(
263 &self,
264 rlp_path: &Path,
265 ) -> Result<crate::setup_import::ChainImportResult>
266 where
267 N: NodeBuilderHelper<Payload = I>,
268 LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
269 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
270 >,
271 {
272 let chain_spec =
273 self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
274
275 let attributes_generator = move |timestamp| {
276 let attributes = PayloadAttributes {
277 timestamp,
278 prev_randao: B256::ZERO,
279 suggested_fee_recipient: alloy_primitives::Address::ZERO,
280 withdrawals: Some(vec![]),
281 parent_beacon_block_root: Some(B256::ZERO),
282 };
283 EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
284 };
285
286 crate::setup_import::setup_engine_with_chain_import(
287 self.network.node_count,
288 chain_spec,
289 self.is_dev,
290 self.tree_config.clone(),
291 rlp_path,
292 attributes_generator,
293 )
294 .await
295 }
296
297 fn create_static_attributes_generator<N>(
299 ) -> impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes
300 + Copy
301 + use<N, I>
302 where
303 N: NodeBuilderHelper<Payload = I>,
304 LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
305 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
306 >,
307 {
308 move |timestamp| {
309 let attributes = PayloadAttributes {
310 timestamp,
311 prev_randao: B256::ZERO,
312 suggested_fee_recipient: alloy_primitives::Address::ZERO,
313 withdrawals: Some(vec![]),
314 parent_beacon_block_root: Some(B256::ZERO),
315 };
316 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes::from(
317 EthPayloadBuilderAttributes::new(B256::ZERO, attributes),
318 )
319 }
320 }
321
322 async fn finalize_setup(
324 &self,
325 env: &mut Environment<I>,
326 node_clients: Vec<crate::testsuite::NodeClient<I>>,
327 use_latest_block: bool,
328 ) -> Result<()> {
329 if node_clients.is_empty() {
330 return Err(eyre!("No nodes were created"));
331 }
332
333 self.wait_for_nodes_ready(&node_clients).await?;
335
336 env.node_clients = node_clients;
337 env.initialize_node_states(self.network.node_count);
338
339 let (initial_block_info, genesis_block_info) = if use_latest_block {
341 let latest =
343 self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Latest).await?;
344 let genesis =
345 self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
346 (latest, genesis)
347 } else {
348 let genesis =
350 self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
351 (genesis, genesis)
352 };
353
354 for (node_idx, node_state) in env.node_states.iter_mut().enumerate() {
356 node_state.current_block_info = Some(initial_block_info);
357 node_state.latest_header_time = initial_block_info.timestamp;
358 node_state.latest_fork_choice_state = ForkchoiceState {
359 head_block_hash: initial_block_info.hash,
360 safe_block_hash: initial_block_info.hash,
361 finalized_block_hash: genesis_block_info.hash,
362 };
363
364 debug!(
365 "Node {} initialized with block {} (hash: {})",
366 node_idx, initial_block_info.number, initial_block_info.hash
367 );
368 }
369
370 debug!(
371 "Environment initialized with {} nodes, starting from block {} (hash: {})",
372 self.network.node_count, initial_block_info.number, initial_block_info.hash
373 );
374
375 if let Some(import_result) = &self.import_result_holder {
378 for (idx, node_ctx) in import_result.nodes.iter().enumerate() {
379 debug!("Setting sync state to Idle for node {}", idx);
380 node_ctx.inner.network.update_sync_state(SyncState::Idle);
381 }
382 }
383
384 Ok(())
385 }
386
387 async fn wait_for_nodes_ready<P>(
389 &self,
390 node_clients: &[crate::testsuite::NodeClient<P>],
391 ) -> Result<()>
392 where
393 P: PayloadTypes,
394 {
395 for (idx, client) in node_clients.iter().enumerate() {
396 let mut retry_count = 0;
397 const MAX_RETRIES: usize = 10;
398
399 while retry_count < MAX_RETRIES {
400 if client.is_ready().await {
401 debug!("Node {idx} RPC endpoint is ready");
402 break;
403 }
404
405 retry_count += 1;
406 debug!("Node {idx} RPC endpoint not ready, retry {retry_count}/{MAX_RETRIES}");
407 sleep(Duration::from_millis(500)).await;
408 }
409
410 if retry_count == MAX_RETRIES {
411 return Err(eyre!(
412 "Failed to connect to node {idx} RPC endpoint after {MAX_RETRIES} retries"
413 ));
414 }
415 }
416 Ok(())
417 }
418
419 async fn get_block_info<P>(
421 &self,
422 client: &crate::testsuite::NodeClient<P>,
423 block: BlockNumberOrTag,
424 ) -> Result<crate::testsuite::BlockInfo>
425 where
426 P: PayloadTypes,
427 {
428 let block = client
429 .get_block_by_number(block)
430 .await?
431 .ok_or_else(|| eyre!("Block {:?} not found", block))?;
432
433 Ok(crate::testsuite::BlockInfo {
434 hash: block.header.hash,
435 number: block.header.number,
436 timestamp: block.header.timestamp,
437 })
438 }
439}
440
/// Genesis description accepted by `Setup::with_genesis`.
///
/// The type currently has no fields, so every value is equal to every other one.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Genesis {}
444
/// Describes how many nodes a test starts and whether they are connected to each other.
///
/// Note that the derived `Default` yields zero nodes and `connect_nodes == false`; use one of
/// the constructors below to get a usable configuration.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct NetworkSetup {
    /// Number of nodes to start.
    pub node_count: usize,
    /// Whether the started nodes should be connected to each other.
    pub connect_nodes: bool,
}

impl NetworkSetup {
    /// Configuration for exactly one node (with `connect_nodes` set).
    #[must_use]
    pub const fn single_node() -> Self {
        Self { node_count: 1, connect_nodes: true }
    }

    /// Configuration for `count` nodes that are connected to each other.
    #[must_use]
    pub const fn multi_node(count: usize) -> Self {
        Self { node_count: count, connect_nodes: true }
    }

    /// Configuration for `count` nodes that are left unconnected.
    #[must_use]
    pub const fn multi_node_unconnected(count: usize) -> Self {
        Self { node_count: count, connect_nodes: false }
    }
}