Skip to main content

reth_e2e_test_utils/
setup_import.rs

1//! Setup utilities for importing RLP chain data before starting nodes.
2
3use crate::{node::NodeTestContext, NodeHelperType, Wallet};
4use alloy_rpc_types_engine::PayloadAttributes;
5use reth_chainspec::ChainSpec;
6use reth_cli_commands::import_core::{import_blocks_from_file, ImportConfig};
7use reth_config::Config;
8use reth_db::DatabaseEnv;
9use reth_node_api::{NodeTypesWithDBAdapter, TreeConfig};
10use reth_node_builder::{EngineNodeLauncher, Node, NodeBuilder, NodeConfig, NodeHandle};
11use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
12use reth_node_ethereum::EthereumNode;
13use reth_provider::{
14    providers::BlockchainProvider, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
15    StaticFileProviderFactory,
16};
17use reth_rpc_server_types::RpcModuleSelection;
18use reth_stages_types::StageId;
19use std::{path::Path, sync::Arc};
20use tempfile::TempDir;
21use tracing::{debug, info, span, Level};
22
/// Result of [`setup_engine_with_chain_import`]: the launched nodes, a test wallet, and the
/// temporary directories backing each node's datadir.
///
/// The temporary directories are owned by this struct, so the value must be kept alive for as
/// long as the nodes are in use.
pub struct ChainImportResult {
    /// The nodes that were created, in creation order (one per requested node)
    pub nodes: Vec<NodeHelperType<EthereumNode>>,
    /// The wallet for testing, configured with the chain id of the imported chain spec
    pub wallet: Wallet,
    /// Temporary directories that must be kept alive for the duration of the test.
    ///
    /// Each entry is the datadir of one node; the field exists only to tie the directories'
    /// lifetime to this struct, hence the leading underscore.
    pub _temp_dirs: Vec<TempDir>,
}
32
33impl std::fmt::Debug for ChainImportResult {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        f.debug_struct("ChainImportResult")
36            .field("nodes", &self.nodes.len())
37            .field("wallet", &self.wallet)
38            .field("temp_dirs", &self._temp_dirs.len())
39            .finish()
40    }
41}
42
43/// Creates a test setup with Ethereum nodes that have pre-imported chain data from RLP files.
44///
45/// This function:
46/// 1. Creates a temporary datadir for each node
47/// 2. Imports the specified RLP chain data into the datadir
48/// 3. Starts the nodes with the pre-populated database
49/// 4. Returns the running nodes ready for testing
50///
51/// Note: This function is currently specific to `EthereumNode` because the import process
52/// uses Ethereum-specific consensus and block format. It can be made generic in the future
53/// by abstracting the import process.
54/// It uses `NoopConsensus` during import to bypass validation checks like gas limit constraints,
55/// which allows importing test chains that may not strictly conform to mainnet consensus rules. The
56/// nodes themselves still run with proper consensus when started.
57pub async fn setup_engine_with_chain_import(
58    num_nodes: usize,
59    chain_spec: Arc<ChainSpec>,
60    is_dev: bool,
61    tree_config: TreeConfig,
62    rlp_path: &Path,
63    attributes_generator: impl Fn(u64) -> PayloadAttributes + Send + Sync + Copy + 'static,
64) -> eyre::Result<ChainImportResult> {
65    let runtime = reth_tasks::Runtime::test();
66
67    let network_config = NetworkArgs {
68        discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
69        ..NetworkArgs::default()
70    };
71
72    // Create nodes with imported data
73    let mut nodes: Vec<NodeHelperType<EthereumNode>> = Vec::with_capacity(num_nodes);
74    let mut temp_dirs = Vec::with_capacity(num_nodes); // Keep temp dirs alive
75
76    for idx in 0..num_nodes {
77        // Create a temporary datadir for this node
78        let temp_dir = TempDir::new()?;
79        let datadir = temp_dir.path().to_path_buf();
80
81        let mut node_config = NodeConfig::new(chain_spec.clone())
82            .with_network(network_config.clone())
83            .with_unused_ports()
84            .with_rpc(
85                RpcServerArgs::default()
86                    .with_unused_ports()
87                    .with_http()
88                    .with_http_api(RpcModuleSelection::All),
89            )
90            .set_dev(is_dev);
91
92        // Set the datadir
93        node_config.datadir.datadir =
94            reth_node_core::dirs::MaybePlatformPath::from(datadir.clone());
95        debug!(target: "e2e::import", "Node {idx} datadir: {datadir:?}");
96
97        let span = span!(Level::INFO, "node", idx);
98        let _enter = span.enter();
99
100        // First, import the chain data into this datadir
101        info!(target: "test", "Importing chain data from {:?} for node {} into {:?}", rlp_path, idx, datadir);
102
103        // Create database path and static files path
104        let db_path = datadir.join("db");
105        let static_files_path = datadir.join("static_files");
106        let rocksdb_dir_path = datadir.join("rocksdb");
107
108        // Initialize the database using init_db (same as CLI import command)
109        let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
110        let db = reth_db::init_db(&db_path, db_args)?;
111
112        // Create a provider factory with the initialized database (use regular DB, not
113        // TempDatabase) We need to specify the node types properly for the adapter
114        let provider_factory =
115            ProviderFactory::<NodeTypesWithDBAdapter<EthereumNode, DatabaseEnv>>::new(
116                db.clone(),
117                chain_spec.clone(),
118                reth_provider::providers::StaticFileProvider::read_write(
119                    static_files_path.clone(),
120                )?,
121                reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
122                    .with_default_tables()
123                    .build()
124                    .unwrap(),
125                reth_tasks::Runtime::test(),
126            )?;
127
128        // Initialize genesis if needed
129        reth_db_common::init::init_genesis(&provider_factory)?;
130
131        // Import the chain data
132        // Use no_state to skip state validation for test chains
133        let import_config = ImportConfig::default();
134        let config = Config::default();
135
136        // Create EVM and consensus for Ethereum
137        let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
138        // Use NoopConsensus to skip gas limit validation for test imports
139        let consensus = reth_consensus::noop::NoopConsensus::arc();
140
141        let result = import_blocks_from_file(
142            rlp_path,
143            import_config,
144            provider_factory.clone(),
145            &config,
146            evm_config,
147            consensus,
148            runtime.clone(),
149        )
150        .await?;
151
152        info!(
153            target: "test",
154            "Imported {} blocks and {} transactions for node {}",
155            result.total_imported_blocks,
156            result.total_imported_txns,
157            idx
158        );
159
160        debug!(target: "e2e::import",
161            "Import result for node {}: decoded {} blocks, imported {} blocks, complete: {}",
162            idx,
163            result.total_decoded_blocks,
164            result.total_imported_blocks,
165            result.is_complete()
166        );
167
168        if result.total_decoded_blocks != result.total_imported_blocks {
169            debug!(target: "e2e::import",
170                "Import block count mismatch: decoded {} != imported {}",
171                result.total_decoded_blocks, result.total_imported_blocks
172            );
173            return Err(eyre::eyre!("Chain import block count mismatch for node {}", idx));
174        }
175
176        if result.total_decoded_txns != result.total_imported_txns {
177            debug!(target: "e2e::import",
178                "Import transaction count mismatch: decoded {} != imported {}",
179                result.total_decoded_txns, result.total_imported_txns
180            );
181            return Err(eyre::eyre!("Chain import transaction count mismatch for node {}", idx));
182        }
183
184        // Verify the database was properly initialized by checking stage checkpoints
185        {
186            let provider = provider_factory.database_provider_ro()?;
187            let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers)?;
188            if headers_checkpoint.is_none() {
189                return Err(eyre::eyre!("Headers stage checkpoint is missing after import!"));
190            }
191            debug!(target: "e2e::import", "Headers stage checkpoint after import: {headers_checkpoint:?}");
192            drop(provider);
193        }
194
195        // IMPORTANT: We need to properly flush and close the static files provider
196        // The static files provider may have open file handles that need to be closed
197        // before we can reopen the database in the node launcher
198        {
199            let static_file_provider = provider_factory.static_file_provider();
200            // This will ensure all static file writers are properly closed
201            drop(static_file_provider);
202        }
203
204        // Close all database handles to release locks before launching the node
205        drop(provider_factory);
206        drop(db);
207
208        // Give the OS a moment to release file locks
209        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
210
211        // Now launch the node with the pre-populated datadir
212        debug!(target: "e2e::import", "Launching node with datadir: {:?}", datadir);
213
214        // Use the testing_node_with_datadir method which properly handles opening existing
215        // databases
216        let node = EthereumNode::default();
217
218        let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
219            .testing_node_with_datadir(runtime.clone(), datadir.clone())
220            .with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
221            .with_components(node.components_builder())
222            .with_add_ons(node.add_ons())
223            .launch_with_fn(|builder| {
224                let launcher = EngineNodeLauncher::new(
225                    builder.task_executor().clone(),
226                    builder.config().datadir(),
227                    tree_config.clone(),
228                );
229                builder.launch_with(launcher)
230            })
231            .await?;
232
233        let node_ctx = NodeTestContext::new(node, attributes_generator).await?;
234
235        nodes.push(node_ctx);
236        temp_dirs.push(temp_dir); // Keep temp dir alive
237    }
238
239    Ok(ChainImportResult {
240        nodes,
241        wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
242        _temp_dirs: temp_dirs,
243    })
244}
245
246/// Helper to load forkchoice state from a JSON file
247pub fn load_forkchoice_state(path: &Path) -> eyre::Result<alloy_rpc_types_engine::ForkchoiceState> {
248    let json_str = std::fs::read_to_string(path)?;
249    let fcu_data: serde_json::Value = serde_json::from_str(&json_str)?;
250
251    // The headfcu.json file contains a JSON-RPC request with the forkchoice state in params[0]
252    let state = &fcu_data["params"][0];
253    Ok(alloy_rpc_types_engine::ForkchoiceState {
254        head_block_hash: state["headBlockHash"]
255            .as_str()
256            .ok_or_else(|| eyre::eyre!("missing headBlockHash"))?
257            .parse()?,
258        safe_block_hash: state["safeBlockHash"]
259            .as_str()
260            .ok_or_else(|| eyre::eyre!("missing safeBlockHash"))?
261            .parse()?,
262        finalized_block_hash: state["finalizedBlockHash"]
263            .as_str()
264            .ok_or_else(|| eyre::eyre!("missing finalizedBlockHash"))?
265            .parse()?,
266    })
267}
268
269#[cfg(test)]
270mod tests {
271    use super::*;
272    use crate::test_rlp_utils::{create_fcu_json, generate_test_blocks, write_blocks_to_rlp};
273    use alloy_rpc_types_engine::PayloadAttributes;
274    use reth_chainspec::{ChainSpecBuilder, MAINNET};
275    use reth_db::mdbx::DatabaseArguments;
276    use reth_ethereum_primitives::Block;
277    use reth_primitives_traits::SealedBlock;
278    use reth_provider::{
279        test_utils::MockNodeTypesWithDB, BlockHashReader, BlockNumReader, BlockReaderIdExt,
280    };
281    use std::path::PathBuf;
282
283    #[tokio::test]
284    async fn test_stage_checkpoints_persistence() {
285        // This test specifically verifies that stage checkpoints are persisted correctly
286        // when reopening the database
287        reth_tracing::init_test_tracing();
288
289        let chain_spec = Arc::new(
290            ChainSpecBuilder::default()
291                .chain(MAINNET.chain)
292                .genesis(
293                    serde_json::from_str(include_str!("testsuite/assets/genesis.json")).unwrap(),
294                )
295                .london_activated()
296                .shanghai_activated()
297                .build(),
298        );
299
300        // Generate test blocks
301        let test_blocks = generate_test_blocks(&chain_spec, 5);
302
303        // Create temporary files for RLP data
304        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
305        let rlp_path = temp_dir.path().join("test_chain.rlp");
306        write_blocks_to_rlp(&test_blocks, &rlp_path).expect("Failed to write RLP data");
307
308        // Create a persistent datadir that won't be deleted
309        let datadir = temp_dir.path().join("datadir");
310        std::fs::create_dir_all(&datadir).unwrap();
311        let db_path = datadir.join("db");
312        let static_files_path = datadir.join("static_files");
313        let rocksdb_dir_path = datadir.join("rocksdb");
314
315        // Import the chain
316        {
317            let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
318            let db = reth_db::init_db(&db_path, db_args).unwrap();
319
320            let provider_factory: ProviderFactory<
321                NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
322            > = ProviderFactory::new(
323                db.clone(),
324                chain_spec.clone(),
325                reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())
326                    .unwrap(),
327                reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path.clone())
328                    .with_default_tables()
329                    .build()
330                    .unwrap(),
331                reth_tasks::Runtime::test(),
332            )
333            .expect("failed to create provider factory");
334
335            // Initialize genesis
336            reth_db_common::init::init_genesis(&provider_factory).unwrap();
337
338            // Import the chain data
339            let import_config = ImportConfig::default();
340            let config = Config::default();
341            let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
342            // Use NoopConsensus to skip gas limit validation for test imports
343            let consensus = reth_consensus::noop::NoopConsensus::arc();
344            let runtime = reth_tasks::Runtime::test();
345
346            let result = import_blocks_from_file(
347                &rlp_path,
348                import_config,
349                provider_factory.clone(),
350                &config,
351                evm_config,
352                consensus,
353                runtime,
354            )
355            .await
356            .unwrap();
357
358            assert_eq!(result.total_decoded_blocks, 5);
359            assert_eq!(result.total_imported_blocks, 5);
360
361            // Verify stage checkpoints exist
362            let provider = provider_factory.database_provider_ro().unwrap();
363            let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers).unwrap();
364            assert!(headers_checkpoint.is_some(), "Headers checkpoint should exist after import");
365            assert_eq!(
366                headers_checkpoint.unwrap().block_number,
367                5,
368                "Headers checkpoint should be at block 5"
369            );
370            drop(provider);
371
372            // Properly close static files to release all file handles
373            let static_file_provider = provider_factory.static_file_provider();
374            drop(static_file_provider);
375
376            drop(provider_factory);
377            drop(db);
378        }
379
380        // Give the OS a moment to release file locks
381        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
382
383        // Now reopen the database and verify checkpoints are still there
384        {
385            let db = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
386
387            let provider_factory: ProviderFactory<
388                NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
389            > = ProviderFactory::new(
390                db,
391                chain_spec.clone(),
392                reth_provider::providers::StaticFileProvider::read_only(static_files_path, false)
393                    .unwrap(),
394                reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
395                    .with_default_tables()
396                    .build()
397                    .unwrap(),
398                reth_tasks::Runtime::test(),
399            )
400            .expect("failed to create provider factory");
401
402            let provider = provider_factory.database_provider_ro().unwrap();
403
404            // Check that stage checkpoints are still present
405            let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers).unwrap();
406            assert!(
407                headers_checkpoint.is_some(),
408                "Headers checkpoint should still exist after reopening database"
409            );
410            assert_eq!(
411                headers_checkpoint.unwrap().block_number,
412                5,
413                "Headers checkpoint should still be at block 5"
414            );
415
416            // Verify we can read blocks
417            let block_5_hash = provider.block_hash(5).unwrap();
418            assert!(block_5_hash.is_some(), "Block 5 should exist in database");
419            assert_eq!(block_5_hash.unwrap(), test_blocks[4].hash(), "Block 5 hash should match");
420
421            // Check all stage checkpoints
422            debug!(target: "e2e::import", "All stage checkpoints after reopening:");
423            for stage in StageId::ALL {
424                let checkpoint = provider.get_stage_checkpoint(stage).unwrap();
425                debug!(target: "e2e::import", "  Stage {stage:?}: {checkpoint:?}");
426            }
427        }
428    }
429
430    /// Helper to create test chain spec
431    fn create_test_chain_spec() -> Arc<ChainSpec> {
432        Arc::new(
433            ChainSpecBuilder::default()
434                .chain(MAINNET.chain)
435                .genesis(
436                    serde_json::from_str(include_str!("testsuite/assets/genesis.json")).unwrap(),
437                )
438                .london_activated()
439                .shanghai_activated()
440                .build(),
441        )
442    }
443
444    /// Helper to setup test blocks and write to RLP
445    async fn setup_test_blocks_and_rlp(
446        chain_spec: &ChainSpec,
447        block_count: u64,
448        temp_dir: &Path,
449    ) -> (Vec<SealedBlock<Block>>, PathBuf) {
450        let test_blocks = generate_test_blocks(chain_spec, block_count);
451        assert_eq!(
452            test_blocks.len(),
453            block_count as usize,
454            "Should have generated expected blocks"
455        );
456
457        let rlp_path = temp_dir.join("test_chain.rlp");
458        write_blocks_to_rlp(&test_blocks, &rlp_path).expect("Failed to write RLP data");
459
460        let rlp_size = std::fs::metadata(&rlp_path).expect("RLP file should exist").len();
461        debug!(target: "e2e::import", "Wrote RLP file with size: {rlp_size} bytes");
462
463        (test_blocks, rlp_path)
464    }
465
466    #[tokio::test]
467    async fn test_import_blocks_only() {
468        // Tests just the block import functionality without full node setup
469        reth_tracing::init_test_tracing();
470
471        let chain_spec = create_test_chain_spec();
472        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
473        let (test_blocks, rlp_path) =
474            setup_test_blocks_and_rlp(&chain_spec, 10, temp_dir.path()).await;
475
476        // Create a test database
477        let datadir = temp_dir.path().join("datadir");
478        std::fs::create_dir_all(&datadir).unwrap();
479        let db_path = datadir.join("db");
480        let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
481        let db_env = reth_db::init_db(&db_path, db_args).unwrap();
482        let db = Arc::new(reth_db::test_utils::TempDatabase::new(db_env, db_path));
483
484        // Create static files path
485        let static_files_path = datadir.join("static_files");
486
487        // Create rocksdb path
488        let rocksdb_dir_path = datadir.join("rocksdb");
489
490        // Create a provider factory
491        let provider_factory: ProviderFactory<MockNodeTypesWithDB> = ProviderFactory::new(
492            db.clone(),
493            chain_spec.clone(),
494            reth_provider::providers::StaticFileProvider::read_write(static_files_path).unwrap(),
495            reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
496                .with_default_tables()
497                .build()
498                .unwrap(),
499            reth_tasks::Runtime::test(),
500        )
501        .expect("failed to create provider factory");
502
503        // Initialize genesis
504        reth_db_common::init::init_genesis(&provider_factory).unwrap();
505
506        // Import the chain data
507        let import_config = ImportConfig::default();
508        let config = Config::default();
509        let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
510        // Use NoopConsensus to skip gas limit validation for test imports
511        let consensus = reth_consensus::noop::NoopConsensus::arc();
512        let runtime = reth_tasks::Runtime::test();
513
514        let result = import_blocks_from_file(
515            &rlp_path,
516            import_config,
517            provider_factory.clone(),
518            &config,
519            evm_config,
520            consensus,
521            runtime,
522        )
523        .await
524        .unwrap();
525
526        debug!(target: "e2e::import",
527            "Import result: decoded {} blocks, imported {} blocks",
528            result.total_decoded_blocks, result.total_imported_blocks
529        );
530
531        // Verify the import was successful
532        assert_eq!(result.total_decoded_blocks, 10);
533        assert_eq!(result.total_imported_blocks, 10);
534        assert_eq!(result.total_decoded_txns, 0);
535        assert_eq!(result.total_imported_txns, 0);
536
537        // Verify we can read the imported blocks
538        let provider = provider_factory.database_provider_ro().unwrap();
539        let latest_block = provider.last_block_number().unwrap();
540        assert_eq!(latest_block, 10, "Should have imported up to block 10");
541
542        let block_10_hash = provider.block_hash(10).unwrap().expect("Block 10 should exist");
543        assert_eq!(block_10_hash, test_blocks[9].hash(), "Block 10 hash should match");
544    }
545
546    #[tokio::test]
547    async fn test_import_with_node_integration() {
548        // Tests the full integration with node setup, forkchoice updates, and syncing
549        reth_tracing::init_test_tracing();
550
551        let chain_spec = create_test_chain_spec();
552        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
553        let (test_blocks, rlp_path) =
554            setup_test_blocks_and_rlp(&chain_spec, 10, temp_dir.path()).await;
555
556        // Create FCU data for the tip
557        let tip = test_blocks.last().expect("Should have generated blocks");
558        let fcu_path = temp_dir.path().join("test_fcu.json");
559        std::fs::write(&fcu_path, create_fcu_json(tip).to_string())
560            .expect("Failed to write FCU data");
561
562        // Setup nodes with imported chain
563        let result = setup_engine_with_chain_import(
564            1,
565            chain_spec,
566            false,
567            TreeConfig::default(),
568            &rlp_path,
569            |_| PayloadAttributes::default(),
570        )
571        .await
572        .expect("Failed to setup nodes with chain import");
573
574        // Load and apply forkchoice state
575        let fcu_state = load_forkchoice_state(&fcu_path).expect("Failed to load forkchoice state");
576
577        let node = &result.nodes[0];
578
579        // Send forkchoice update to make the imported chain canonical
580        node.update_forkchoice(fcu_state.finalized_block_hash, fcu_state.head_block_hash)
581            .await
582            .expect("Failed to update forkchoice");
583
584        // Wait for the node to sync to the head
585        node.sync_to(fcu_state.head_block_hash).await.expect("Failed to sync to head");
586
587        // Verify the chain tip
588        let latest = node
589            .inner
590            .provider
591            .sealed_header_by_id(alloy_eips::BlockId::latest())
592            .expect("Failed to get latest header")
593            .expect("No latest header found");
594
595        assert_eq!(
596            latest.hash(),
597            fcu_state.head_block_hash,
598            "Chain tip does not match expected head"
599        );
600    }
601}