Skip to main content

reth_e2e_test_utils/
setup_import.rs

1//! Setup utilities for importing RLP chain data before starting nodes.
2
3use crate::{node::NodeTestContext, NodeHelperType, Wallet};
4use reth_chainspec::ChainSpec;
5use reth_cli_commands::import_core::{import_blocks_from_file, ImportConfig};
6use reth_config::Config;
7use reth_db::DatabaseEnv;
8use reth_node_api::{NodeTypesWithDBAdapter, TreeConfig};
9use reth_node_builder::{EngineNodeLauncher, Node, NodeBuilder, NodeConfig, NodeHandle};
10use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
11use reth_node_ethereum::EthereumNode;
12use reth_provider::{
13    providers::BlockchainProvider, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
14    StaticFileProviderFactory,
15};
16use reth_rpc_server_types::RpcModuleSelection;
17use reth_stages_types::StageId;
18use std::{path::Path, sync::Arc};
19use tempfile::TempDir;
20use tracing::{debug, info, span, Level};
21
22/// Setup result containing nodes and temporary directories that must be kept alive
23pub struct ChainImportResult {
24    /// The nodes that were created
25    pub nodes: Vec<NodeHelperType<EthereumNode>>,
26    /// The wallet for testing
27    pub wallet: Wallet,
28    /// Temporary directories that must be kept alive for the duration of the test
29    pub _temp_dirs: Vec<TempDir>,
30}
31
32impl std::fmt::Debug for ChainImportResult {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        f.debug_struct("ChainImportResult")
35            .field("nodes", &self.nodes.len())
36            .field("wallet", &self.wallet)
37            .field("temp_dirs", &self._temp_dirs.len())
38            .finish()
39    }
40}
41
42/// Creates a test setup with Ethereum nodes that have pre-imported chain data from RLP files.
43///
44/// This function:
45/// 1. Creates a temporary datadir for each node
46/// 2. Imports the specified RLP chain data into the datadir
47/// 3. Starts the nodes with the pre-populated database
48/// 4. Returns the running nodes ready for testing
49///
50/// Note: This function is currently specific to `EthereumNode` because the import process
51/// uses Ethereum-specific consensus and block format. It can be made generic in the future
52/// by abstracting the import process.
53/// It uses `NoopConsensus` during import to bypass validation checks like gas limit constraints,
54/// which allows importing test chains that may not strictly conform to mainnet consensus rules. The
55/// nodes themselves still run with proper consensus when started.
56pub async fn setup_engine_with_chain_import(
57    num_nodes: usize,
58    chain_spec: Arc<ChainSpec>,
59    is_dev: bool,
60    tree_config: TreeConfig,
61    rlp_path: &Path,
62    attributes_generator: impl Fn(u64) -> reth_payload_builder::EthPayloadBuilderAttributes
63        + Send
64        + Sync
65        + Copy
66        + 'static,
67) -> eyre::Result<ChainImportResult> {
68    let runtime = reth_tasks::Runtime::test();
69
70    let network_config = NetworkArgs {
71        discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
72        ..NetworkArgs::default()
73    };
74
75    // Create nodes with imported data
76    let mut nodes: Vec<NodeHelperType<EthereumNode>> = Vec::with_capacity(num_nodes);
77    let mut temp_dirs = Vec::with_capacity(num_nodes); // Keep temp dirs alive
78
79    for idx in 0..num_nodes {
80        // Create a temporary datadir for this node
81        let temp_dir = TempDir::new()?;
82        let datadir = temp_dir.path().to_path_buf();
83
84        let mut node_config = NodeConfig::new(chain_spec.clone())
85            .with_network(network_config.clone())
86            .with_unused_ports()
87            .with_rpc(
88                RpcServerArgs::default()
89                    .with_unused_ports()
90                    .with_http()
91                    .with_http_api(RpcModuleSelection::All),
92            )
93            .set_dev(is_dev);
94
95        // Set the datadir
96        node_config.datadir.datadir =
97            reth_node_core::dirs::MaybePlatformPath::from(datadir.clone());
98        debug!(target: "e2e::import", "Node {idx} datadir: {datadir:?}");
99
100        let span = span!(Level::INFO, "node", idx);
101        let _enter = span.enter();
102
103        // First, import the chain data into this datadir
104        info!(target: "test", "Importing chain data from {:?} for node {} into {:?}", rlp_path, idx, datadir);
105
106        // Create database path and static files path
107        let db_path = datadir.join("db");
108        let static_files_path = datadir.join("static_files");
109        let rocksdb_dir_path = datadir.join("rocksdb");
110
111        // Initialize the database using init_db (same as CLI import command)
112        let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
113        let db = reth_db::init_db(&db_path, db_args)?;
114
115        // Create a provider factory with the initialized database (use regular DB, not
116        // TempDatabase) We need to specify the node types properly for the adapter
117        let provider_factory =
118            ProviderFactory::<NodeTypesWithDBAdapter<EthereumNode, DatabaseEnv>>::new(
119                db.clone(),
120                chain_spec.clone(),
121                reth_provider::providers::StaticFileProvider::read_write(
122                    static_files_path.clone(),
123                )?,
124                reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
125                    .with_default_tables()
126                    .build()
127                    .unwrap(),
128                reth_tasks::Runtime::test(),
129            )?;
130
131        // Initialize genesis if needed
132        reth_db_common::init::init_genesis(&provider_factory)?;
133
134        // Import the chain data
135        // Use no_state to skip state validation for test chains
136        let import_config = ImportConfig::default();
137        let config = Config::default();
138
139        // Create EVM and consensus for Ethereum
140        let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
141        // Use NoopConsensus to skip gas limit validation for test imports
142        let consensus = reth_consensus::noop::NoopConsensus::arc();
143
144        let result = import_blocks_from_file(
145            rlp_path,
146            import_config,
147            provider_factory.clone(),
148            &config,
149            evm_config,
150            consensus,
151            runtime.clone(),
152        )
153        .await?;
154
155        info!(
156            target: "test",
157            "Imported {} blocks and {} transactions for node {}",
158            result.total_imported_blocks,
159            result.total_imported_txns,
160            idx
161        );
162
163        debug!(target: "e2e::import",
164            "Import result for node {}: decoded {} blocks, imported {} blocks, complete: {}",
165            idx,
166            result.total_decoded_blocks,
167            result.total_imported_blocks,
168            result.is_complete()
169        );
170
171        if result.total_decoded_blocks != result.total_imported_blocks {
172            debug!(target: "e2e::import",
173                "Import block count mismatch: decoded {} != imported {}",
174                result.total_decoded_blocks, result.total_imported_blocks
175            );
176            return Err(eyre::eyre!("Chain import block count mismatch for node {}", idx));
177        }
178
179        if result.total_decoded_txns != result.total_imported_txns {
180            debug!(target: "e2e::import",
181                "Import transaction count mismatch: decoded {} != imported {}",
182                result.total_decoded_txns, result.total_imported_txns
183            );
184            return Err(eyre::eyre!("Chain import transaction count mismatch for node {}", idx));
185        }
186
187        // Verify the database was properly initialized by checking stage checkpoints
188        {
189            let provider = provider_factory.database_provider_ro()?;
190            let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers)?;
191            if headers_checkpoint.is_none() {
192                return Err(eyre::eyre!("Headers stage checkpoint is missing after import!"));
193            }
194            debug!(target: "e2e::import", "Headers stage checkpoint after import: {headers_checkpoint:?}");
195            drop(provider);
196        }
197
198        // IMPORTANT: We need to properly flush and close the static files provider
199        // The static files provider may have open file handles that need to be closed
200        // before we can reopen the database in the node launcher
201        {
202            let static_file_provider = provider_factory.static_file_provider();
203            // This will ensure all static file writers are properly closed
204            drop(static_file_provider);
205        }
206
207        // Close all database handles to release locks before launching the node
208        drop(provider_factory);
209        drop(db);
210
211        // Give the OS a moment to release file locks
212        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
213
214        // Now launch the node with the pre-populated datadir
215        debug!(target: "e2e::import", "Launching node with datadir: {:?}", datadir);
216
217        // Use the testing_node_with_datadir method which properly handles opening existing
218        // databases
219        let node = EthereumNode::default();
220
221        let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
222            .testing_node_with_datadir(runtime.clone(), datadir.clone())
223            .with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
224            .with_components(node.components_builder())
225            .with_add_ons(node.add_ons())
226            .launch_with_fn(|builder| {
227                let launcher = EngineNodeLauncher::new(
228                    builder.task_executor().clone(),
229                    builder.config().datadir(),
230                    tree_config.clone(),
231                );
232                builder.launch_with(launcher)
233            })
234            .await?;
235
236        let node_ctx = NodeTestContext::new(node, attributes_generator).await?;
237
238        nodes.push(node_ctx);
239        temp_dirs.push(temp_dir); // Keep temp dir alive
240    }
241
242    Ok(ChainImportResult {
243        nodes,
244        wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
245        _temp_dirs: temp_dirs,
246    })
247}
248
249/// Helper to load forkchoice state from a JSON file
250pub fn load_forkchoice_state(path: &Path) -> eyre::Result<alloy_rpc_types_engine::ForkchoiceState> {
251    let json_str = std::fs::read_to_string(path)?;
252    let fcu_data: serde_json::Value = serde_json::from_str(&json_str)?;
253
254    // The headfcu.json file contains a JSON-RPC request with the forkchoice state in params[0]
255    let state = &fcu_data["params"][0];
256    Ok(alloy_rpc_types_engine::ForkchoiceState {
257        head_block_hash: state["headBlockHash"]
258            .as_str()
259            .ok_or_else(|| eyre::eyre!("missing headBlockHash"))?
260            .parse()?,
261        safe_block_hash: state["safeBlockHash"]
262            .as_str()
263            .ok_or_else(|| eyre::eyre!("missing safeBlockHash"))?
264            .parse()?,
265        finalized_block_hash: state["finalizedBlockHash"]
266            .as_str()
267            .ok_or_else(|| eyre::eyre!("missing finalizedBlockHash"))?
268            .parse()?,
269    })
270}
271
272#[cfg(test)]
273mod tests {
274    use super::*;
275    use crate::test_rlp_utils::{create_fcu_json, generate_test_blocks, write_blocks_to_rlp};
276    use reth_chainspec::{ChainSpecBuilder, MAINNET};
277    use reth_db::mdbx::DatabaseArguments;
278    use reth_ethereum_primitives::Block;
279    use reth_payload_builder::EthPayloadBuilderAttributes;
280    use reth_primitives_traits::SealedBlock;
281    use reth_provider::{
282        test_utils::MockNodeTypesWithDB, BlockHashReader, BlockNumReader, BlockReaderIdExt,
283    };
284    use std::path::PathBuf;
285
286    #[tokio::test]
287    async fn test_stage_checkpoints_persistence() {
288        // This test specifically verifies that stage checkpoints are persisted correctly
289        // when reopening the database
290        reth_tracing::init_test_tracing();
291
292        let chain_spec = Arc::new(
293            ChainSpecBuilder::default()
294                .chain(MAINNET.chain)
295                .genesis(
296                    serde_json::from_str(include_str!("testsuite/assets/genesis.json")).unwrap(),
297                )
298                .london_activated()
299                .shanghai_activated()
300                .build(),
301        );
302
303        // Generate test blocks
304        let test_blocks = generate_test_blocks(&chain_spec, 5);
305
306        // Create temporary files for RLP data
307        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
308        let rlp_path = temp_dir.path().join("test_chain.rlp");
309        write_blocks_to_rlp(&test_blocks, &rlp_path).expect("Failed to write RLP data");
310
311        // Create a persistent datadir that won't be deleted
312        let datadir = temp_dir.path().join("datadir");
313        std::fs::create_dir_all(&datadir).unwrap();
314        let db_path = datadir.join("db");
315        let static_files_path = datadir.join("static_files");
316        let rocksdb_dir_path = datadir.join("rocksdb");
317
318        // Import the chain
319        {
320            let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
321            let db = reth_db::init_db(&db_path, db_args).unwrap();
322
323            let provider_factory: ProviderFactory<
324                NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
325            > = ProviderFactory::new(
326                db.clone(),
327                chain_spec.clone(),
328                reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())
329                    .unwrap(),
330                reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path.clone())
331                    .with_default_tables()
332                    .build()
333                    .unwrap(),
334                reth_tasks::Runtime::test(),
335            )
336            .expect("failed to create provider factory");
337
338            // Initialize genesis
339            reth_db_common::init::init_genesis(&provider_factory).unwrap();
340
341            // Import the chain data
342            let import_config = ImportConfig::default();
343            let config = Config::default();
344            let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
345            // Use NoopConsensus to skip gas limit validation for test imports
346            let consensus = reth_consensus::noop::NoopConsensus::arc();
347            let runtime = reth_tasks::Runtime::test();
348
349            let result = import_blocks_from_file(
350                &rlp_path,
351                import_config,
352                provider_factory.clone(),
353                &config,
354                evm_config,
355                consensus,
356                runtime,
357            )
358            .await
359            .unwrap();
360
361            assert_eq!(result.total_decoded_blocks, 5);
362            assert_eq!(result.total_imported_blocks, 5);
363
364            // Verify stage checkpoints exist
365            let provider = provider_factory.database_provider_ro().unwrap();
366            let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers).unwrap();
367            assert!(headers_checkpoint.is_some(), "Headers checkpoint should exist after import");
368            assert_eq!(
369                headers_checkpoint.unwrap().block_number,
370                5,
371                "Headers checkpoint should be at block 5"
372            );
373            drop(provider);
374
375            // Properly close static files to release all file handles
376            let static_file_provider = provider_factory.static_file_provider();
377            drop(static_file_provider);
378
379            drop(provider_factory);
380            drop(db);
381        }
382
383        // Give the OS a moment to release file locks
384        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
385
386        // Now reopen the database and verify checkpoints are still there
387        {
388            let db = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
389
390            let provider_factory: ProviderFactory<
391                NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
392            > = ProviderFactory::new(
393                db,
394                chain_spec.clone(),
395                reth_provider::providers::StaticFileProvider::read_only(static_files_path, false)
396                    .unwrap(),
397                reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
398                    .with_default_tables()
399                    .build()
400                    .unwrap(),
401                reth_tasks::Runtime::test(),
402            )
403            .expect("failed to create provider factory");
404
405            let provider = provider_factory.database_provider_ro().unwrap();
406
407            // Check that stage checkpoints are still present
408            let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers).unwrap();
409            assert!(
410                headers_checkpoint.is_some(),
411                "Headers checkpoint should still exist after reopening database"
412            );
413            assert_eq!(
414                headers_checkpoint.unwrap().block_number,
415                5,
416                "Headers checkpoint should still be at block 5"
417            );
418
419            // Verify we can read blocks
420            let block_5_hash = provider.block_hash(5).unwrap();
421            assert!(block_5_hash.is_some(), "Block 5 should exist in database");
422            assert_eq!(block_5_hash.unwrap(), test_blocks[4].hash(), "Block 5 hash should match");
423
424            // Check all stage checkpoints
425            debug!(target: "e2e::import", "All stage checkpoints after reopening:");
426            for stage in StageId::ALL {
427                let checkpoint = provider.get_stage_checkpoint(stage).unwrap();
428                debug!(target: "e2e::import", "  Stage {stage:?}: {checkpoint:?}");
429            }
430        }
431    }
432
433    /// Helper to create test chain spec
434    fn create_test_chain_spec() -> Arc<ChainSpec> {
435        Arc::new(
436            ChainSpecBuilder::default()
437                .chain(MAINNET.chain)
438                .genesis(
439                    serde_json::from_str(include_str!("testsuite/assets/genesis.json")).unwrap(),
440                )
441                .london_activated()
442                .shanghai_activated()
443                .build(),
444        )
445    }
446
447    /// Helper to setup test blocks and write to RLP
448    async fn setup_test_blocks_and_rlp(
449        chain_spec: &ChainSpec,
450        block_count: u64,
451        temp_dir: &Path,
452    ) -> (Vec<SealedBlock<Block>>, PathBuf) {
453        let test_blocks = generate_test_blocks(chain_spec, block_count);
454        assert_eq!(
455            test_blocks.len(),
456            block_count as usize,
457            "Should have generated expected blocks"
458        );
459
460        let rlp_path = temp_dir.join("test_chain.rlp");
461        write_blocks_to_rlp(&test_blocks, &rlp_path).expect("Failed to write RLP data");
462
463        let rlp_size = std::fs::metadata(&rlp_path).expect("RLP file should exist").len();
464        debug!(target: "e2e::import", "Wrote RLP file with size: {rlp_size} bytes");
465
466        (test_blocks, rlp_path)
467    }
468
469    #[tokio::test]
470    async fn test_import_blocks_only() {
471        // Tests just the block import functionality without full node setup
472        reth_tracing::init_test_tracing();
473
474        let chain_spec = create_test_chain_spec();
475        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
476        let (test_blocks, rlp_path) =
477            setup_test_blocks_and_rlp(&chain_spec, 10, temp_dir.path()).await;
478
479        // Create a test database
480        let datadir = temp_dir.path().join("datadir");
481        std::fs::create_dir_all(&datadir).unwrap();
482        let db_path = datadir.join("db");
483        let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
484        let db_env = reth_db::init_db(&db_path, db_args).unwrap();
485        let db = Arc::new(reth_db::test_utils::TempDatabase::new(db_env, db_path));
486
487        // Create static files path
488        let static_files_path = datadir.join("static_files");
489
490        // Create rocksdb path
491        let rocksdb_dir_path = datadir.join("rocksdb");
492
493        // Create a provider factory
494        let provider_factory: ProviderFactory<MockNodeTypesWithDB> = ProviderFactory::new(
495            db.clone(),
496            chain_spec.clone(),
497            reth_provider::providers::StaticFileProvider::read_write(static_files_path).unwrap(),
498            reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
499                .with_default_tables()
500                .build()
501                .unwrap(),
502            reth_tasks::Runtime::test(),
503        )
504        .expect("failed to create provider factory");
505
506        // Initialize genesis
507        reth_db_common::init::init_genesis(&provider_factory).unwrap();
508
509        // Import the chain data
510        let import_config = ImportConfig::default();
511        let config = Config::default();
512        let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
513        // Use NoopConsensus to skip gas limit validation for test imports
514        let consensus = reth_consensus::noop::NoopConsensus::arc();
515        let runtime = reth_tasks::Runtime::test();
516
517        let result = import_blocks_from_file(
518            &rlp_path,
519            import_config,
520            provider_factory.clone(),
521            &config,
522            evm_config,
523            consensus,
524            runtime,
525        )
526        .await
527        .unwrap();
528
529        debug!(target: "e2e::import",
530            "Import result: decoded {} blocks, imported {} blocks",
531            result.total_decoded_blocks, result.total_imported_blocks
532        );
533
534        // Verify the import was successful
535        assert_eq!(result.total_decoded_blocks, 10);
536        assert_eq!(result.total_imported_blocks, 10);
537        assert_eq!(result.total_decoded_txns, 0);
538        assert_eq!(result.total_imported_txns, 0);
539
540        // Verify we can read the imported blocks
541        let provider = provider_factory.database_provider_ro().unwrap();
542        let latest_block = provider.last_block_number().unwrap();
543        assert_eq!(latest_block, 10, "Should have imported up to block 10");
544
545        let block_10_hash = provider.block_hash(10).unwrap().expect("Block 10 should exist");
546        assert_eq!(block_10_hash, test_blocks[9].hash(), "Block 10 hash should match");
547    }
548
549    #[tokio::test]
550    async fn test_import_with_node_integration() {
551        // Tests the full integration with node setup, forkchoice updates, and syncing
552        reth_tracing::init_test_tracing();
553
554        let chain_spec = create_test_chain_spec();
555        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
556        let (test_blocks, rlp_path) =
557            setup_test_blocks_and_rlp(&chain_spec, 10, temp_dir.path()).await;
558
559        // Create FCU data for the tip
560        let tip = test_blocks.last().expect("Should have generated blocks");
561        let fcu_path = temp_dir.path().join("test_fcu.json");
562        std::fs::write(&fcu_path, create_fcu_json(tip).to_string())
563            .expect("Failed to write FCU data");
564
565        // Setup nodes with imported chain
566        let result = setup_engine_with_chain_import(
567            1,
568            chain_spec,
569            false,
570            TreeConfig::default(),
571            &rlp_path,
572            |_| EthPayloadBuilderAttributes::default(),
573        )
574        .await
575        .expect("Failed to setup nodes with chain import");
576
577        // Load and apply forkchoice state
578        let fcu_state = load_forkchoice_state(&fcu_path).expect("Failed to load forkchoice state");
579
580        let node = &result.nodes[0];
581
582        // Send forkchoice update to make the imported chain canonical
583        node.update_forkchoice(fcu_state.finalized_block_hash, fcu_state.head_block_hash)
584            .await
585            .expect("Failed to update forkchoice");
586
587        // Wait for the node to sync to the head
588        node.sync_to(fcu_state.head_block_hash).await.expect("Failed to sync to head");
589
590        // Verify the chain tip
591        let latest = node
592            .inner
593            .provider
594            .sealed_header_by_id(alloy_eips::BlockId::latest())
595            .expect("Failed to get latest header")
596            .expect("No latest header found");
597
598        assert_eq!(
599            latest.hash(),
600            fcu_state.head_block_hash,
601            "Chain tip does not match expected head"
602        );
603    }
604}