1use crate::{node::NodeTestContext, NodeHelperType, Wallet};
4use alloy_rpc_types_engine::PayloadAttributes;
5use reth_chainspec::ChainSpec;
6use reth_cli_commands::import_core::{import_blocks_from_file, ImportConfig};
7use reth_config::Config;
8use reth_db::DatabaseEnv;
9use reth_node_api::{NodeTypesWithDBAdapter, TreeConfig};
10use reth_node_builder::{EngineNodeLauncher, Node, NodeBuilder, NodeConfig, NodeHandle};
11use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
12use reth_node_ethereum::EthereumNode;
13use reth_provider::{
14 providers::BlockchainProvider, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
15 StaticFileProviderFactory,
16};
17use reth_rpc_server_types::RpcModuleSelection;
18use reth_stages_types::StageId;
19use std::{path::Path, sync::Arc};
20use tempfile::TempDir;
21use tracing::{debug, info, span, Level};
22
/// Result of [`setup_engine_with_chain_import`]: the launched nodes together with the resources
/// that belong to them.
pub struct ChainImportResult {
    /// One test context per launched node, in creation order.
    pub nodes: Vec<NodeHelperType<EthereumNode>>,
    /// Default test wallet, configured with the chain id of the chain spec used for setup.
    pub wallet: Wallet,
    /// Temporary directories holding each node's data directory. They are stored here so they
    /// live as long as this value (a `TempDir` is expected to remove its directory on drop).
    pub _temp_dirs: Vec<TempDir>,
}
32
33impl std::fmt::Debug for ChainImportResult {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 f.debug_struct("ChainImportResult")
36 .field("nodes", &self.nodes.len())
37 .field("wallet", &self.wallet)
38 .field("temp_dirs", &self._temp_dirs.len())
39 .finish()
40 }
41}
42
43pub async fn setup_engine_with_chain_import(
58 num_nodes: usize,
59 chain_spec: Arc<ChainSpec>,
60 is_dev: bool,
61 tree_config: TreeConfig,
62 rlp_path: &Path,
63 attributes_generator: impl Fn(u64) -> PayloadAttributes + Send + Sync + Copy + 'static,
64) -> eyre::Result<ChainImportResult> {
65 let runtime = reth_tasks::Runtime::test();
66
67 let network_config = NetworkArgs {
68 discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
69 ..NetworkArgs::default()
70 };
71
72 let mut nodes: Vec<NodeHelperType<EthereumNode>> = Vec::with_capacity(num_nodes);
74 let mut temp_dirs = Vec::with_capacity(num_nodes); for idx in 0..num_nodes {
77 let temp_dir = TempDir::new()?;
79 let datadir = temp_dir.path().to_path_buf();
80
81 let mut node_config = NodeConfig::new(chain_spec.clone())
82 .with_network(network_config.clone())
83 .with_unused_ports()
84 .with_rpc(
85 RpcServerArgs::default()
86 .with_unused_ports()
87 .with_http()
88 .with_http_api(RpcModuleSelection::All),
89 )
90 .set_dev(is_dev);
91
92 node_config.datadir.datadir =
94 reth_node_core::dirs::MaybePlatformPath::from(datadir.clone());
95 debug!(target: "e2e::import", "Node {idx} datadir: {datadir:?}");
96
97 let span = span!(Level::INFO, "node", idx);
98 let _enter = span.enter();
99
100 info!(target: "test", "Importing chain data from {:?} for node {} into {:?}", rlp_path, idx, datadir);
102
103 let db_path = datadir.join("db");
105 let static_files_path = datadir.join("static_files");
106 let rocksdb_dir_path = datadir.join("rocksdb");
107
108 let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
110 let db = reth_db::init_db(&db_path, db_args)?;
111
112 let provider_factory =
115 ProviderFactory::<NodeTypesWithDBAdapter<EthereumNode, DatabaseEnv>>::new(
116 db.clone(),
117 chain_spec.clone(),
118 reth_provider::providers::StaticFileProvider::read_write(
119 static_files_path.clone(),
120 )?,
121 reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
122 .with_default_tables()
123 .build()
124 .unwrap(),
125 reth_tasks::Runtime::test(),
126 )?;
127
128 reth_db_common::init::init_genesis(&provider_factory)?;
130
131 let import_config = ImportConfig::default();
134 let config = Config::default();
135
136 let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
138 let consensus = reth_consensus::noop::NoopConsensus::arc();
140
141 let result = import_blocks_from_file(
142 rlp_path,
143 import_config,
144 provider_factory.clone(),
145 &config,
146 evm_config,
147 consensus,
148 runtime.clone(),
149 )
150 .await?;
151
152 info!(
153 target: "test",
154 "Imported {} blocks and {} transactions for node {}",
155 result.total_imported_blocks,
156 result.total_imported_txns,
157 idx
158 );
159
160 debug!(target: "e2e::import",
161 "Import result for node {}: decoded {} blocks, imported {} blocks, complete: {}",
162 idx,
163 result.total_decoded_blocks,
164 result.total_imported_blocks,
165 result.is_complete()
166 );
167
168 if result.total_decoded_blocks != result.total_imported_blocks {
169 debug!(target: "e2e::import",
170 "Import block count mismatch: decoded {} != imported {}",
171 result.total_decoded_blocks, result.total_imported_blocks
172 );
173 return Err(eyre::eyre!("Chain import block count mismatch for node {}", idx));
174 }
175
176 if result.total_decoded_txns != result.total_imported_txns {
177 debug!(target: "e2e::import",
178 "Import transaction count mismatch: decoded {} != imported {}",
179 result.total_decoded_txns, result.total_imported_txns
180 );
181 return Err(eyre::eyre!("Chain import transaction count mismatch for node {}", idx));
182 }
183
184 {
186 let provider = provider_factory.database_provider_ro()?;
187 let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers)?;
188 if headers_checkpoint.is_none() {
189 return Err(eyre::eyre!("Headers stage checkpoint is missing after import!"));
190 }
191 debug!(target: "e2e::import", "Headers stage checkpoint after import: {headers_checkpoint:?}");
192 drop(provider);
193 }
194
195 {
199 let static_file_provider = provider_factory.static_file_provider();
200 drop(static_file_provider);
202 }
203
204 drop(provider_factory);
206 drop(db);
207
208 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
210
211 debug!(target: "e2e::import", "Launching node with datadir: {:?}", datadir);
213
214 let node = EthereumNode::default();
217
218 let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
219 .testing_node_with_datadir(runtime.clone(), datadir.clone())
220 .with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
221 .with_components(node.components_builder())
222 .with_add_ons(node.add_ons())
223 .launch_with_fn(|builder| {
224 let launcher = EngineNodeLauncher::new(
225 builder.task_executor().clone(),
226 builder.config().datadir(),
227 tree_config.clone(),
228 );
229 builder.launch_with(launcher)
230 })
231 .await?;
232
233 let node_ctx = NodeTestContext::new(node, attributes_generator).await?;
234
235 nodes.push(node_ctx);
236 temp_dirs.push(temp_dir); }
238
239 Ok(ChainImportResult {
240 nodes,
241 wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
242 _temp_dirs: temp_dirs,
243 })
244}
245
246pub fn load_forkchoice_state(path: &Path) -> eyre::Result<alloy_rpc_types_engine::ForkchoiceState> {
248 let json_str = std::fs::read_to_string(path)?;
249 let fcu_data: serde_json::Value = serde_json::from_str(&json_str)?;
250
251 let state = &fcu_data["params"][0];
253 Ok(alloy_rpc_types_engine::ForkchoiceState {
254 head_block_hash: state["headBlockHash"]
255 .as_str()
256 .ok_or_else(|| eyre::eyre!("missing headBlockHash"))?
257 .parse()?,
258 safe_block_hash: state["safeBlockHash"]
259 .as_str()
260 .ok_or_else(|| eyre::eyre!("missing safeBlockHash"))?
261 .parse()?,
262 finalized_block_hash: state["finalizedBlockHash"]
263 .as_str()
264 .ok_or_else(|| eyre::eyre!("missing finalizedBlockHash"))?
265 .parse()?,
266 })
267}
268
269#[cfg(test)]
270mod tests {
271 use super::*;
272 use crate::test_rlp_utils::{create_fcu_json, generate_test_blocks, write_blocks_to_rlp};
273 use alloy_rpc_types_engine::PayloadAttributes;
274 use reth_chainspec::{ChainSpecBuilder, MAINNET};
275 use reth_db::mdbx::DatabaseArguments;
276 use reth_ethereum_primitives::Block;
277 use reth_primitives_traits::SealedBlock;
278 use reth_provider::{
279 test_utils::MockNodeTypesWithDB, BlockHashReader, BlockNumReader, BlockReaderIdExt,
280 };
281 use std::path::PathBuf;
282
283 #[tokio::test]
284 async fn test_stage_checkpoints_persistence() {
285 reth_tracing::init_test_tracing();
288
289 let chain_spec = Arc::new(
290 ChainSpecBuilder::default()
291 .chain(MAINNET.chain)
292 .genesis(
293 serde_json::from_str(include_str!("testsuite/assets/genesis.json")).unwrap(),
294 )
295 .london_activated()
296 .shanghai_activated()
297 .build(),
298 );
299
300 let test_blocks = generate_test_blocks(&chain_spec, 5);
302
303 let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
305 let rlp_path = temp_dir.path().join("test_chain.rlp");
306 write_blocks_to_rlp(&test_blocks, &rlp_path).expect("Failed to write RLP data");
307
308 let datadir = temp_dir.path().join("datadir");
310 std::fs::create_dir_all(&datadir).unwrap();
311 let db_path = datadir.join("db");
312 let static_files_path = datadir.join("static_files");
313 let rocksdb_dir_path = datadir.join("rocksdb");
314
315 {
317 let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
318 let db = reth_db::init_db(&db_path, db_args).unwrap();
319
320 let provider_factory: ProviderFactory<
321 NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
322 > = ProviderFactory::new(
323 db.clone(),
324 chain_spec.clone(),
325 reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())
326 .unwrap(),
327 reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path.clone())
328 .with_default_tables()
329 .build()
330 .unwrap(),
331 reth_tasks::Runtime::test(),
332 )
333 .expect("failed to create provider factory");
334
335 reth_db_common::init::init_genesis(&provider_factory).unwrap();
337
338 let import_config = ImportConfig::default();
340 let config = Config::default();
341 let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
342 let consensus = reth_consensus::noop::NoopConsensus::arc();
344 let runtime = reth_tasks::Runtime::test();
345
346 let result = import_blocks_from_file(
347 &rlp_path,
348 import_config,
349 provider_factory.clone(),
350 &config,
351 evm_config,
352 consensus,
353 runtime,
354 )
355 .await
356 .unwrap();
357
358 assert_eq!(result.total_decoded_blocks, 5);
359 assert_eq!(result.total_imported_blocks, 5);
360
361 let provider = provider_factory.database_provider_ro().unwrap();
363 let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers).unwrap();
364 assert!(headers_checkpoint.is_some(), "Headers checkpoint should exist after import");
365 assert_eq!(
366 headers_checkpoint.unwrap().block_number,
367 5,
368 "Headers checkpoint should be at block 5"
369 );
370 drop(provider);
371
372 let static_file_provider = provider_factory.static_file_provider();
374 drop(static_file_provider);
375
376 drop(provider_factory);
377 drop(db);
378 }
379
380 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
382
383 {
385 let db = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
386
387 let provider_factory: ProviderFactory<
388 NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
389 > = ProviderFactory::new(
390 db,
391 chain_spec.clone(),
392 reth_provider::providers::StaticFileProvider::read_only(static_files_path).unwrap(),
393 reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
394 .with_default_tables()
395 .build()
396 .unwrap(),
397 reth_tasks::Runtime::test(),
398 )
399 .expect("failed to create provider factory");
400
401 let provider = provider_factory.database_provider_ro().unwrap();
402
403 let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers).unwrap();
405 assert!(
406 headers_checkpoint.is_some(),
407 "Headers checkpoint should still exist after reopening database"
408 );
409 assert_eq!(
410 headers_checkpoint.unwrap().block_number,
411 5,
412 "Headers checkpoint should still be at block 5"
413 );
414
415 let block_5_hash = provider.block_hash(5).unwrap();
417 assert!(block_5_hash.is_some(), "Block 5 should exist in database");
418 assert_eq!(block_5_hash.unwrap(), test_blocks[4].hash(), "Block 5 hash should match");
419
420 debug!(target: "e2e::import", "All stage checkpoints after reopening:");
422 for stage in StageId::ALL {
423 let checkpoint = provider.get_stage_checkpoint(stage).unwrap();
424 debug!(target: "e2e::import", " Stage {stage:?}: {checkpoint:?}");
425 }
426 }
427 }
428
429 fn create_test_chain_spec() -> Arc<ChainSpec> {
431 Arc::new(
432 ChainSpecBuilder::default()
433 .chain(MAINNET.chain)
434 .genesis(
435 serde_json::from_str(include_str!("testsuite/assets/genesis.json")).unwrap(),
436 )
437 .london_activated()
438 .shanghai_activated()
439 .build(),
440 )
441 }
442
443 async fn setup_test_blocks_and_rlp(
445 chain_spec: &ChainSpec,
446 block_count: u64,
447 temp_dir: &Path,
448 ) -> (Vec<SealedBlock<Block>>, PathBuf) {
449 let test_blocks = generate_test_blocks(chain_spec, block_count);
450 assert_eq!(
451 test_blocks.len(),
452 block_count as usize,
453 "Should have generated expected blocks"
454 );
455
456 let rlp_path = temp_dir.join("test_chain.rlp");
457 write_blocks_to_rlp(&test_blocks, &rlp_path).expect("Failed to write RLP data");
458
459 let rlp_size = std::fs::metadata(&rlp_path).expect("RLP file should exist").len();
460 debug!(target: "e2e::import", "Wrote RLP file with size: {rlp_size} bytes");
461
462 (test_blocks, rlp_path)
463 }
464
465 #[tokio::test]
466 async fn test_import_blocks_only() {
467 reth_tracing::init_test_tracing();
469
470 let chain_spec = create_test_chain_spec();
471 let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
472 let (test_blocks, rlp_path) =
473 setup_test_blocks_and_rlp(&chain_spec, 10, temp_dir.path()).await;
474
475 let datadir = temp_dir.path().join("datadir");
477 std::fs::create_dir_all(&datadir).unwrap();
478 let db_path = datadir.join("db");
479 let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
480 let db_env = reth_db::init_db(&db_path, db_args).unwrap();
481 let db = Arc::new(reth_db::test_utils::TempDatabase::new(db_env, db_path));
482
483 let static_files_path = datadir.join("static_files");
485
486 let rocksdb_dir_path = datadir.join("rocksdb");
488
489 let provider_factory: ProviderFactory<MockNodeTypesWithDB> = ProviderFactory::new(
491 db.clone(),
492 chain_spec.clone(),
493 reth_provider::providers::StaticFileProvider::read_write(static_files_path).unwrap(),
494 reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
495 .with_default_tables()
496 .build()
497 .unwrap(),
498 reth_tasks::Runtime::test(),
499 )
500 .expect("failed to create provider factory");
501
502 reth_db_common::init::init_genesis(&provider_factory).unwrap();
504
505 let import_config = ImportConfig::default();
507 let config = Config::default();
508 let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
509 let consensus = reth_consensus::noop::NoopConsensus::arc();
511 let runtime = reth_tasks::Runtime::test();
512
513 let result = import_blocks_from_file(
514 &rlp_path,
515 import_config,
516 provider_factory.clone(),
517 &config,
518 evm_config,
519 consensus,
520 runtime,
521 )
522 .await
523 .unwrap();
524
525 debug!(target: "e2e::import",
526 "Import result: decoded {} blocks, imported {} blocks",
527 result.total_decoded_blocks, result.total_imported_blocks
528 );
529
530 assert_eq!(result.total_decoded_blocks, 10);
532 assert_eq!(result.total_imported_blocks, 10);
533 assert_eq!(result.total_decoded_txns, 0);
534 assert_eq!(result.total_imported_txns, 0);
535
536 let provider = provider_factory.database_provider_ro().unwrap();
538 let latest_block = provider.last_block_number().unwrap();
539 assert_eq!(latest_block, 10, "Should have imported up to block 10");
540
541 let block_10_hash = provider.block_hash(10).unwrap().expect("Block 10 should exist");
542 assert_eq!(block_10_hash, test_blocks[9].hash(), "Block 10 hash should match");
543 }
544
545 #[tokio::test]
546 async fn test_import_with_node_integration() {
547 reth_tracing::init_test_tracing();
549
550 let chain_spec = create_test_chain_spec();
551 let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
552 let (test_blocks, rlp_path) =
553 setup_test_blocks_and_rlp(&chain_spec, 10, temp_dir.path()).await;
554
555 let tip = test_blocks.last().expect("Should have generated blocks");
557 let fcu_path = temp_dir.path().join("test_fcu.json");
558 std::fs::write(&fcu_path, create_fcu_json(tip).to_string())
559 .expect("Failed to write FCU data");
560
561 let result = setup_engine_with_chain_import(
563 1,
564 chain_spec,
565 false,
566 TreeConfig::default(),
567 &rlp_path,
568 |_| PayloadAttributes::default(),
569 )
570 .await
571 .expect("Failed to setup nodes with chain import");
572
573 let fcu_state = load_forkchoice_state(&fcu_path).expect("Failed to load forkchoice state");
575
576 let node = &result.nodes[0];
577
578 node.update_forkchoice(fcu_state.finalized_block_hash, fcu_state.head_block_hash)
580 .await
581 .expect("Failed to update forkchoice");
582
583 node.sync_to(fcu_state.head_block_hash).await.expect("Failed to sync to head");
585
586 let latest = node
588 .inner
589 .provider
590 .sealed_header_by_id(alloy_eips::BlockId::latest())
591 .expect("Failed to get latest header")
592 .expect("No latest header found");
593
594 assert_eq!(
595 latest.hash(),
596 fcu_state.head_block_hash,
597 "Chain tip does not match expected head"
598 );
599 }
600}