1use crate::{node::NodeTestContext, NodeHelperType, Wallet};
4use reth_chainspec::ChainSpec;
5use reth_cli_commands::import_core::{import_blocks_from_file, ImportConfig};
6use reth_config::Config;
7use reth_db::DatabaseEnv;
8use reth_node_api::{NodeTypesWithDBAdapter, TreeConfig};
9use reth_node_builder::{EngineNodeLauncher, Node, NodeBuilder, NodeConfig, NodeHandle};
10use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
11use reth_node_ethereum::EthereumNode;
12use reth_provider::{
13 providers::BlockchainProvider, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
14 StaticFileProviderFactory,
15};
16use reth_rpc_server_types::RpcModuleSelection;
17use reth_stages_types::StageId;
18use std::{path::Path, sync::Arc};
19use tempfile::TempDir;
20use tracing::{debug, info, span, Level};
21
/// Outcome of [`setup_engine_with_chain_import`]: the launched nodes together with the
/// resources that back them.
pub struct ChainImportResult {
    /// One test context per launched node, in the order the nodes were launched.
    pub nodes: Vec<NodeHelperType<EthereumNode>>,
    /// Wallet configured with the chain id of the chain spec the nodes were set up with.
    pub wallet: Wallet,
    /// Temporary directories containing each node's datadir. They are stored here so they
    /// are not dropped (and thereby removed) while the nodes are still in use.
    pub _temp_dirs: Vec<TempDir>,
}
31
32impl std::fmt::Debug for ChainImportResult {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 f.debug_struct("ChainImportResult")
35 .field("nodes", &self.nodes.len())
36 .field("wallet", &self.wallet)
37 .field("temp_dirs", &self._temp_dirs.len())
38 .finish()
39 }
40}
41
42pub async fn setup_engine_with_chain_import(
57 num_nodes: usize,
58 chain_spec: Arc<ChainSpec>,
59 is_dev: bool,
60 tree_config: TreeConfig,
61 rlp_path: &Path,
62 attributes_generator: impl Fn(u64) -> reth_payload_builder::EthPayloadBuilderAttributes
63 + Send
64 + Sync
65 + Copy
66 + 'static,
67) -> eyre::Result<ChainImportResult> {
68 let runtime = reth_tasks::Runtime::test();
69
70 let network_config = NetworkArgs {
71 discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
72 ..NetworkArgs::default()
73 };
74
75 let mut nodes: Vec<NodeHelperType<EthereumNode>> = Vec::with_capacity(num_nodes);
77 let mut temp_dirs = Vec::with_capacity(num_nodes); for idx in 0..num_nodes {
80 let temp_dir = TempDir::new()?;
82 let datadir = temp_dir.path().to_path_buf();
83
84 let mut node_config = NodeConfig::new(chain_spec.clone())
85 .with_network(network_config.clone())
86 .with_unused_ports()
87 .with_rpc(
88 RpcServerArgs::default()
89 .with_unused_ports()
90 .with_http()
91 .with_http_api(RpcModuleSelection::All),
92 )
93 .set_dev(is_dev);
94
95 node_config.datadir.datadir =
97 reth_node_core::dirs::MaybePlatformPath::from(datadir.clone());
98 debug!(target: "e2e::import", "Node {idx} datadir: {datadir:?}");
99
100 let span = span!(Level::INFO, "node", idx);
101 let _enter = span.enter();
102
103 info!(target: "test", "Importing chain data from {:?} for node {} into {:?}", rlp_path, idx, datadir);
105
106 let db_path = datadir.join("db");
108 let static_files_path = datadir.join("static_files");
109 let rocksdb_dir_path = datadir.join("rocksdb");
110
111 let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
113 let db = reth_db::init_db(&db_path, db_args)?;
114
115 let provider_factory =
118 ProviderFactory::<NodeTypesWithDBAdapter<EthereumNode, DatabaseEnv>>::new(
119 db.clone(),
120 chain_spec.clone(),
121 reth_provider::providers::StaticFileProvider::read_write(
122 static_files_path.clone(),
123 )?,
124 reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
125 .with_default_tables()
126 .build()
127 .unwrap(),
128 reth_tasks::Runtime::test(),
129 )?;
130
131 reth_db_common::init::init_genesis(&provider_factory)?;
133
134 let import_config = ImportConfig::default();
137 let config = Config::default();
138
139 let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
141 let consensus = reth_consensus::noop::NoopConsensus::arc();
143
144 let result = import_blocks_from_file(
145 rlp_path,
146 import_config,
147 provider_factory.clone(),
148 &config,
149 evm_config,
150 consensus,
151 runtime.clone(),
152 )
153 .await?;
154
155 info!(
156 target: "test",
157 "Imported {} blocks and {} transactions for node {}",
158 result.total_imported_blocks,
159 result.total_imported_txns,
160 idx
161 );
162
163 debug!(target: "e2e::import",
164 "Import result for node {}: decoded {} blocks, imported {} blocks, complete: {}",
165 idx,
166 result.total_decoded_blocks,
167 result.total_imported_blocks,
168 result.is_complete()
169 );
170
171 if result.total_decoded_blocks != result.total_imported_blocks {
172 debug!(target: "e2e::import",
173 "Import block count mismatch: decoded {} != imported {}",
174 result.total_decoded_blocks, result.total_imported_blocks
175 );
176 return Err(eyre::eyre!("Chain import block count mismatch for node {}", idx));
177 }
178
179 if result.total_decoded_txns != result.total_imported_txns {
180 debug!(target: "e2e::import",
181 "Import transaction count mismatch: decoded {} != imported {}",
182 result.total_decoded_txns, result.total_imported_txns
183 );
184 return Err(eyre::eyre!("Chain import transaction count mismatch for node {}", idx));
185 }
186
187 {
189 let provider = provider_factory.database_provider_ro()?;
190 let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers)?;
191 if headers_checkpoint.is_none() {
192 return Err(eyre::eyre!("Headers stage checkpoint is missing after import!"));
193 }
194 debug!(target: "e2e::import", "Headers stage checkpoint after import: {headers_checkpoint:?}");
195 drop(provider);
196 }
197
198 {
202 let static_file_provider = provider_factory.static_file_provider();
203 drop(static_file_provider);
205 }
206
207 drop(provider_factory);
209 drop(db);
210
211 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
213
214 debug!(target: "e2e::import", "Launching node with datadir: {:?}", datadir);
216
217 let node = EthereumNode::default();
220
221 let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
222 .testing_node_with_datadir(runtime.clone(), datadir.clone())
223 .with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
224 .with_components(node.components_builder())
225 .with_add_ons(node.add_ons())
226 .launch_with_fn(|builder| {
227 let launcher = EngineNodeLauncher::new(
228 builder.task_executor().clone(),
229 builder.config().datadir(),
230 tree_config.clone(),
231 );
232 builder.launch_with(launcher)
233 })
234 .await?;
235
236 let node_ctx = NodeTestContext::new(node, attributes_generator).await?;
237
238 nodes.push(node_ctx);
239 temp_dirs.push(temp_dir); }
241
242 Ok(ChainImportResult {
243 nodes,
244 wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
245 _temp_dirs: temp_dirs,
246 })
247}
248
249pub fn load_forkchoice_state(path: &Path) -> eyre::Result<alloy_rpc_types_engine::ForkchoiceState> {
251 let json_str = std::fs::read_to_string(path)?;
252 let fcu_data: serde_json::Value = serde_json::from_str(&json_str)?;
253
254 let state = &fcu_data["params"][0];
256 Ok(alloy_rpc_types_engine::ForkchoiceState {
257 head_block_hash: state["headBlockHash"]
258 .as_str()
259 .ok_or_else(|| eyre::eyre!("missing headBlockHash"))?
260 .parse()?,
261 safe_block_hash: state["safeBlockHash"]
262 .as_str()
263 .ok_or_else(|| eyre::eyre!("missing safeBlockHash"))?
264 .parse()?,
265 finalized_block_hash: state["finalizedBlockHash"]
266 .as_str()
267 .ok_or_else(|| eyre::eyre!("missing finalizedBlockHash"))?
268 .parse()?,
269 })
270}
271
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_rlp_utils::{create_fcu_json, generate_test_blocks, write_blocks_to_rlp};
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
    use reth_db::mdbx::DatabaseArguments;
    use reth_ethereum_primitives::Block;
    use reth_payload_builder::EthPayloadBuilderAttributes;
    use reth_primitives_traits::SealedBlock;
    use reth_provider::{
        test_utils::MockNodeTypesWithDB, BlockHashReader, BlockNumReader, BlockReaderIdExt,
    };
    use std::path::PathBuf;

    /// Builds the chain spec shared by all tests: the mainnet chain with the bundled test
    /// genesis and the London and Shanghai forks activated.
    fn create_test_chain_spec() -> Arc<ChainSpec> {
        Arc::new(
            ChainSpecBuilder::default()
                .chain(MAINNET.chain)
                .genesis(
                    serde_json::from_str(include_str!("testsuite/assets/genesis.json")).unwrap(),
                )
                .london_activated()
                .shanghai_activated()
                .build(),
        )
    }

    /// Generates `block_count` test blocks for `chain_spec` and writes them to
    /// `test_chain.rlp` inside `temp_dir`.
    ///
    /// Returns the generated blocks together with the path of the RLP file.
    fn setup_test_blocks_and_rlp(
        chain_spec: &ChainSpec,
        block_count: u64,
        temp_dir: &Path,
    ) -> (Vec<SealedBlock<Block>>, PathBuf) {
        let test_blocks = generate_test_blocks(chain_spec, block_count);
        assert_eq!(
            test_blocks.len(),
            block_count as usize,
            "Should have generated expected blocks"
        );

        let rlp_path = temp_dir.join("test_chain.rlp");
        write_blocks_to_rlp(&test_blocks, &rlp_path).expect("Failed to write RLP data");

        let rlp_size = std::fs::metadata(&rlp_path).expect("RLP file should exist").len();
        debug!(target: "e2e::import", "Wrote RLP file with size: {rlp_size} bytes");

        (test_blocks, rlp_path)
    }

    /// Imports five blocks, closes all storage handles, reopens the datadir and checks that
    /// the `Headers` stage checkpoint and the imported tip are still present.
    #[tokio::test]
    async fn test_stage_checkpoints_persistence() {
        reth_tracing::init_test_tracing();

        let chain_spec = create_test_chain_spec();
        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
        let (test_blocks, rlp_path) = setup_test_blocks_and_rlp(&chain_spec, 5, temp_dir.path());

        let datadir = temp_dir.path().join("datadir");
        std::fs::create_dir_all(&datadir).unwrap();
        let db_path = datadir.join("db");
        let static_files_path = datadir.join("static_files");
        let rocksdb_dir_path = datadir.join("rocksdb");

        // Phase 1: import the chain and verify the checkpoint right after the import.
        {
            let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
            let db = reth_db::init_db(&db_path, db_args).unwrap();

            let provider_factory: ProviderFactory<
                NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
            > = ProviderFactory::new(
                db.clone(),
                chain_spec.clone(),
                reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())
                    .unwrap(),
                reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path.clone())
                    .with_default_tables()
                    .build()
                    .unwrap(),
                reth_tasks::Runtime::test(),
            )
            .expect("failed to create provider factory");

            reth_db_common::init::init_genesis(&provider_factory).unwrap();

            let import_config = ImportConfig::default();
            let config = Config::default();
            let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
            let consensus = reth_consensus::noop::NoopConsensus::arc();
            let runtime = reth_tasks::Runtime::test();

            let result = import_blocks_from_file(
                &rlp_path,
                import_config,
                provider_factory.clone(),
                &config,
                evm_config,
                consensus,
                runtime,
            )
            .await
            .unwrap();

            assert_eq!(result.total_decoded_blocks, 5);
            assert_eq!(result.total_imported_blocks, 5);

            let provider = provider_factory.database_provider_ro().unwrap();
            let headers_checkpoint = provider
                .get_stage_checkpoint(StageId::Headers)
                .unwrap()
                .expect("Headers checkpoint should exist after import");
            assert_eq!(
                headers_checkpoint.block_number, 5,
                "Headers checkpoint should be at block 5"
            );
            drop(provider);

            // Obtain and immediately release a static file provider handle.
            let static_file_provider = provider_factory.static_file_provider();
            drop(static_file_provider);

            // Release every storage handle before the datadir is reopened below.
            drop(provider_factory);
            drop(db);
        }

        // NOTE(review): presumably gives the storage layer time to release its resources
        // before reopening — confirm whether this delay is still required.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Phase 2: reopen the same datadir and verify that the data persisted.
        {
            let db = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();

            let provider_factory: ProviderFactory<
                NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
            > = ProviderFactory::new(
                db,
                chain_spec.clone(),
                reth_provider::providers::StaticFileProvider::read_only(static_files_path, false)
                    .unwrap(),
                reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
                    .with_default_tables()
                    .build()
                    .unwrap(),
                reth_tasks::Runtime::test(),
            )
            .expect("failed to create provider factory");

            let provider = provider_factory.database_provider_ro().unwrap();

            let headers_checkpoint = provider
                .get_stage_checkpoint(StageId::Headers)
                .unwrap()
                .expect("Headers checkpoint should still exist after reopening database");
            assert_eq!(
                headers_checkpoint.block_number, 5,
                "Headers checkpoint should still be at block 5"
            );

            let block_5_hash =
                provider.block_hash(5).unwrap().expect("Block 5 should exist in database");
            assert_eq!(block_5_hash, test_blocks[4].hash(), "Block 5 hash should match");

            // Log every stage checkpoint to help diagnose failures.
            debug!(target: "e2e::import", "All stage checkpoints after reopening:");
            for stage in StageId::ALL {
                let checkpoint = provider.get_stage_checkpoint(stage).unwrap();
                debug!(target: "e2e::import", "  Stage {stage:?}: {checkpoint:?}");
            }
        }
    }

    /// Imports ten transaction-less blocks and checks the import counters as well as the
    /// resulting chain tip in the database.
    #[tokio::test]
    async fn test_import_blocks_only() {
        reth_tracing::init_test_tracing();

        let chain_spec = create_test_chain_spec();
        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
        let (test_blocks, rlp_path) = setup_test_blocks_and_rlp(&chain_spec, 10, temp_dir.path());

        let datadir = temp_dir.path().join("datadir");
        std::fs::create_dir_all(&datadir).unwrap();
        let db_path = datadir.join("db");
        let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
        let db_env = reth_db::init_db(&db_path, db_args).unwrap();
        let db = Arc::new(reth_db::test_utils::TempDatabase::new(db_env, db_path));

        let static_files_path = datadir.join("static_files");
        let rocksdb_dir_path = datadir.join("rocksdb");

        let provider_factory: ProviderFactory<MockNodeTypesWithDB> = ProviderFactory::new(
            db.clone(),
            chain_spec.clone(),
            reth_provider::providers::StaticFileProvider::read_write(static_files_path).unwrap(),
            reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
                .with_default_tables()
                .build()
                .unwrap(),
            reth_tasks::Runtime::test(),
        )
        .expect("failed to create provider factory");

        reth_db_common::init::init_genesis(&provider_factory).unwrap();

        let import_config = ImportConfig::default();
        let config = Config::default();
        let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
        let consensus = reth_consensus::noop::NoopConsensus::arc();
        let runtime = reth_tasks::Runtime::test();

        let result = import_blocks_from_file(
            &rlp_path,
            import_config,
            provider_factory.clone(),
            &config,
            evm_config,
            consensus,
            runtime,
        )
        .await
        .unwrap();

        debug!(target: "e2e::import",
            "Import result: decoded {} blocks, imported {} blocks",
            result.total_decoded_blocks, result.total_imported_blocks
        );

        assert_eq!(result.total_decoded_blocks, 10);
        assert_eq!(result.total_imported_blocks, 10);
        assert_eq!(result.total_decoded_txns, 0);
        assert_eq!(result.total_imported_txns, 0);

        let provider = provider_factory.database_provider_ro().unwrap();
        let latest_block = provider.last_block_number().unwrap();
        assert_eq!(latest_block, 10, "Should have imported up to block 10");

        let block_10_hash = provider.block_hash(10).unwrap().expect("Block 10 should exist");
        assert_eq!(block_10_hash, test_blocks[9].hash(), "Block 10 hash should match");
    }

    /// Imports ten blocks into a single node via [`setup_engine_with_chain_import`], sends a
    /// forkchoice update for the imported tip and checks that the node reports it as latest.
    #[tokio::test]
    async fn test_import_with_node_integration() {
        reth_tracing::init_test_tracing();

        let chain_spec = create_test_chain_spec();
        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
        let (test_blocks, rlp_path) = setup_test_blocks_and_rlp(&chain_spec, 10, temp_dir.path());

        // Persist a forkchoice update for the tip so it can be loaded back from disk.
        let tip = test_blocks.last().expect("Should have generated blocks");
        let fcu_path = temp_dir.path().join("test_fcu.json");
        std::fs::write(&fcu_path, create_fcu_json(tip).to_string())
            .expect("Failed to write FCU data");

        let result = setup_engine_with_chain_import(
            1,
            chain_spec,
            false,
            TreeConfig::default(),
            &rlp_path,
            |_| EthPayloadBuilderAttributes::default(),
        )
        .await
        .expect("Failed to setup nodes with chain import");

        let fcu_state = load_forkchoice_state(&fcu_path).expect("Failed to load forkchoice state");

        let node = &result.nodes[0];

        node.update_forkchoice(fcu_state.finalized_block_hash, fcu_state.head_block_hash)
            .await
            .expect("Failed to update forkchoice");

        node.sync_to(fcu_state.head_block_hash).await.expect("Failed to sync to head");

        let latest = node
            .inner
            .provider
            .sealed_header_by_id(alloy_eips::BlockId::latest())
            .expect("Failed to get latest header")
            .expect("No latest header found");

        assert_eq!(
            latest.hash(),
            fcu_state.head_block_hash,
            "Chain tip does not match expected head"
        );
    }
}