reth_e2e_test_utils/testsuite/
setup.rs1use crate::{
4 setup_engine_with_connection, testsuite::Environment, NodeBuilderHelper,
5 PayloadAttributesBuilder,
6};
7use alloy_eips::BlockNumberOrTag;
8use alloy_primitives::B256;
9use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
10use eyre::{eyre, Result};
11use reth_chainspec::ChainSpec;
12use reth_engine_local::LocalPayloadAttributesBuilder;
13use reth_ethereum_primitives::Block;
14use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState};
15use reth_node_api::{EngineTypes, NodeTypes, PayloadTypes, TreeConfig};
16use reth_node_core::primitives::RecoveredBlock;
17use reth_payload_builder::EthPayloadBuilderAttributes;
18use revm::state::EvmState;
19use std::{marker::PhantomData, path::Path, sync::Arc};
20use tokio::{
21 sync::mpsc,
22 time::{sleep, Duration},
23};
24use tracing::debug;
25
26#[derive(Debug)]
28pub struct Setup<I> {
29 pub chain_spec: Option<Arc<ChainSpec>>,
31 pub genesis: Option<Genesis>,
33 pub blocks: Vec<RecoveredBlock<Block>>,
35 pub state: Option<EvmState>,
37 pub network: NetworkSetup,
39 pub tree_config: TreeConfig,
41 shutdown_tx: Option<mpsc::Sender<()>>,
43 pub is_dev: bool,
45 _phantom: PhantomData<I>,
47 import_result_holder: Option<crate::setup_import::ChainImportResult>,
50 pub import_rlp_path: Option<std::path::PathBuf>,
52}
53
54impl<I> Default for Setup<I> {
55 fn default() -> Self {
56 Self {
57 chain_spec: None,
58 genesis: None,
59 blocks: Vec::new(),
60 state: None,
61 network: NetworkSetup::default(),
62 tree_config: TreeConfig::default(),
63 shutdown_tx: None,
64 is_dev: true,
65 _phantom: Default::default(),
66 import_result_holder: None,
67 import_rlp_path: None,
68 }
69 }
70}
71
72impl<I> Drop for Setup<I> {
73 fn drop(&mut self) {
74 if let Some(tx) = self.shutdown_tx.take() {
76 let _ = tx.try_send(());
77 }
78 }
79}
80
81impl<I> Setup<I>
82where
83 I: EngineTypes,
84{
85 pub fn new() -> Self {
87 Self::default()
88 }
89
90 pub fn with_chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
92 self.chain_spec = Some(chain_spec);
93 self
94 }
95
96 pub const fn with_genesis(mut self, genesis: Genesis) -> Self {
98 self.genesis = Some(genesis);
99 self
100 }
101
102 pub fn with_block(mut self, block: RecoveredBlock<Block>) -> Self {
104 self.blocks.push(block);
105 self
106 }
107
108 pub fn with_blocks(mut self, blocks: Vec<RecoveredBlock<Block>>) -> Self {
110 self.blocks.extend(blocks);
111 self
112 }
113
114 pub fn with_state(mut self, state: EvmState) -> Self {
116 self.state = Some(state);
117 self
118 }
119
120 pub const fn with_network(mut self, network: NetworkSetup) -> Self {
122 self.network = network;
123 self
124 }
125
126 pub const fn with_dev_mode(mut self, is_dev: bool) -> Self {
128 self.is_dev = is_dev;
129 self
130 }
131
132 pub const fn with_tree_config(mut self, tree_config: TreeConfig) -> Self {
134 self.tree_config = tree_config;
135 self
136 }
137
138 pub async fn apply_with_import<N>(
140 &mut self,
141 env: &mut Environment<I>,
142 rlp_path: &Path,
143 ) -> Result<()>
144 where
145 N: NodeBuilderHelper,
146 LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
147 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
148 >,
149 {
150 Box::pin(self.apply_with_import_::<N>(env, rlp_path)).await
152 }
153
154 async fn apply_with_import_<N>(
156 &mut self,
157 env: &mut Environment<I>,
158 rlp_path: &Path,
159 ) -> Result<()>
160 where
161 N: NodeBuilderHelper,
162 LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
163 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
164 >,
165 {
166 let import_result = self.create_nodes_with_import::<N>(rlp_path).await?;
168
169 let mut node_clients = Vec::new();
171 let nodes = &import_result.nodes;
172 for node in nodes {
173 let rpc = node
174 .rpc_client()
175 .ok_or_else(|| eyre!("Failed to create HTTP RPC client for node"))?;
176 let auth = node.auth_server_handle();
177 let url = node.rpc_url();
178 node_clients.push(crate::testsuite::NodeClient::new(rpc, auth, url));
179 }
180
181 self.import_result_holder = Some(import_result);
184
185 self.finalize_setup(env, node_clients, true).await
187 }
188
189 pub async fn apply<N>(&mut self, env: &mut Environment<I>) -> Result<()>
191 where
192 N: NodeBuilderHelper,
193 LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
194 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
195 >,
196 {
197 Box::pin(self.apply_::<N>(env)).await
199 }
200
201 async fn apply_<N>(&mut self, env: &mut Environment<I>) -> Result<()>
203 where
204 N: NodeBuilderHelper,
205 LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
206 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
207 >,
208 {
209 if let Some(rlp_path) = self.import_rlp_path.take() {
211 return self.apply_with_import::<N>(env, &rlp_path).await;
212 }
213 let chain_spec =
214 self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
215
216 let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
217 self.shutdown_tx = Some(shutdown_tx);
218
219 let is_dev = self.is_dev;
220 let node_count = self.network.node_count;
221
222 let attributes_generator = self.create_attributes_generator::<N>();
223
224 let result = setup_engine_with_connection::<N>(
225 node_count,
226 Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
227 is_dev,
228 self.tree_config.clone(),
229 attributes_generator,
230 self.network.connect_nodes,
231 )
232 .await;
233
234 let mut node_clients = Vec::new();
235 match result {
236 Ok((nodes, executor, _wallet)) => {
237 for node in &nodes {
239 let rpc = node
240 .rpc_client()
241 .ok_or_else(|| eyre!("Failed to create HTTP RPC client for node"))?;
242 let auth = node.auth_server_handle();
243 let url = node.rpc_url();
244
245 node_clients.push(crate::testsuite::NodeClient::new(rpc, auth, url));
246 }
247
248 tokio::spawn(async move {
250 let _nodes = nodes;
252 let _executor = executor;
253 let _ = shutdown_rx.recv().await;
255 });
257 }
258 Err(e) => {
259 return Err(eyre!("Failed to setup nodes: {}", e));
260 }
261 }
262
263 self.finalize_setup(env, node_clients, false).await
265 }
266
267 async fn create_nodes_with_import<N>(
273 &self,
274 rlp_path: &Path,
275 ) -> Result<crate::setup_import::ChainImportResult>
276 where
277 N: NodeBuilderHelper,
278 LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
279 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
280 >,
281 {
282 let chain_spec =
283 self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
284
285 let attributes_generator = move |timestamp| {
286 let attributes = PayloadAttributes {
287 timestamp,
288 prev_randao: B256::ZERO,
289 suggested_fee_recipient: alloy_primitives::Address::ZERO,
290 withdrawals: Some(vec![]),
291 parent_beacon_block_root: Some(B256::ZERO),
292 };
293 EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
294 };
295
296 crate::setup_import::setup_engine_with_chain_import(
297 self.network.node_count,
298 chain_spec,
299 self.is_dev,
300 self.tree_config.clone(),
301 rlp_path,
302 attributes_generator,
303 )
304 .await
305 }
306
307 fn create_attributes_generator<N>(
309 &self,
310 ) -> impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Copy
311 where
312 N: NodeBuilderHelper,
313 LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
314 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
315 >,
316 {
317 move |timestamp| {
318 let attributes = PayloadAttributes {
319 timestamp,
320 prev_randao: B256::ZERO,
321 suggested_fee_recipient: alloy_primitives::Address::ZERO,
322 withdrawals: Some(vec![]),
323 parent_beacon_block_root: Some(B256::ZERO),
324 };
325 <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes::from(
326 EthPayloadBuilderAttributes::new(B256::ZERO, attributes),
327 )
328 }
329 }
330
331 async fn finalize_setup(
333 &self,
334 env: &mut Environment<I>,
335 node_clients: Vec<crate::testsuite::NodeClient>,
336 use_latest_block: bool,
337 ) -> Result<()> {
338 if node_clients.is_empty() {
339 return Err(eyre!("No nodes were created"));
340 }
341
342 self.wait_for_nodes_ready(&node_clients).await?;
344
345 env.node_clients = node_clients;
346 env.initialize_node_states(self.network.node_count);
347
348 let (initial_block_info, genesis_block_info) = if use_latest_block {
350 let latest =
352 self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Latest).await?;
353 let genesis =
354 self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
355 (latest, genesis)
356 } else {
357 let genesis =
359 self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
360 (genesis, genesis)
361 };
362
363 for (node_idx, node_state) in env.node_states.iter_mut().enumerate() {
365 node_state.current_block_info = Some(initial_block_info);
366 node_state.latest_header_time = initial_block_info.timestamp;
367 node_state.latest_fork_choice_state = ForkchoiceState {
368 head_block_hash: initial_block_info.hash,
369 safe_block_hash: initial_block_info.hash,
370 finalized_block_hash: genesis_block_info.hash,
371 };
372
373 debug!(
374 "Node {} initialized with block {} (hash: {})",
375 node_idx, initial_block_info.number, initial_block_info.hash
376 );
377 }
378
379 debug!(
380 "Environment initialized with {} nodes, starting from block {} (hash: {})",
381 self.network.node_count, initial_block_info.number, initial_block_info.hash
382 );
383
384 if let Some(import_result) = &self.import_result_holder {
387 for (idx, node_ctx) in import_result.nodes.iter().enumerate() {
388 debug!("Setting sync state to Idle for node {}", idx);
389 node_ctx.inner.network.update_sync_state(SyncState::Idle);
390 }
391 }
392
393 Ok(())
394 }
395
396 async fn wait_for_nodes_ready(
398 &self,
399 node_clients: &[crate::testsuite::NodeClient],
400 ) -> Result<()> {
401 for (idx, client) in node_clients.iter().enumerate() {
402 let mut retry_count = 0;
403 const MAX_RETRIES: usize = 10;
404
405 while retry_count < MAX_RETRIES {
406 if client.is_ready().await {
407 debug!("Node {idx} RPC endpoint is ready");
408 break;
409 }
410
411 retry_count += 1;
412 debug!("Node {idx} RPC endpoint not ready, retry {retry_count}/{MAX_RETRIES}");
413 sleep(Duration::from_millis(500)).await;
414 }
415
416 if retry_count == MAX_RETRIES {
417 return Err(eyre!(
418 "Failed to connect to node {idx} RPC endpoint after {MAX_RETRIES} retries"
419 ));
420 }
421 }
422 Ok(())
423 }
424
425 async fn get_block_info(
427 &self,
428 client: &crate::testsuite::NodeClient,
429 block: BlockNumberOrTag,
430 ) -> Result<crate::testsuite::BlockInfo> {
431 let block = client
432 .get_block_by_number(block)
433 .await?
434 .ok_or_else(|| eyre!("Block {:?} not found", block))?;
435
436 Ok(crate::testsuite::BlockInfo {
437 hash: block.header.hash,
438 number: block.header.number,
439 timestamp: block.header.timestamp,
440 })
441 }
442}
443
444#[derive(Debug)]
446pub struct Genesis {}
447
448#[derive(Debug, Default)]
450pub struct NetworkSetup {
451 pub node_count: usize,
453 pub connect_nodes: bool,
455}
456
457impl NetworkSetup {
458 pub const fn single_node() -> Self {
460 Self { node_count: 1, connect_nodes: true }
461 }
462
463 pub const fn multi_node(count: usize) -> Self {
465 Self { node_count: count, connect_nodes: true }
466 }
467
468 pub const fn multi_node_unconnected(count: usize) -> Self {
470 Self { node_count: count, connect_nodes: false }
471 }
472}