Skip to main content

reth_e2e_test_utils/
setup_import.rs

1//! Setup utilities for importing RLP chain data before starting nodes.
2
3use crate::{node::NodeTestContext, NodeHelperType, Wallet};
4use reth_chainspec::ChainSpec;
5use reth_cli_commands::import_core::{import_blocks_from_file, ImportConfig};
6use reth_config::Config;
7use reth_db::DatabaseEnv;
8use reth_node_api::{NodeTypesWithDBAdapter, TreeConfig};
9use reth_node_builder::{EngineNodeLauncher, Node, NodeBuilder, NodeConfig, NodeHandle};
10use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
11use reth_node_ethereum::EthereumNode;
12use reth_provider::{
13    providers::BlockchainProvider, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
14    StaticFileProviderFactory,
15};
16use reth_rpc_server_types::RpcModuleSelection;
17use reth_stages_types::StageId;
18use reth_tasks::Runtime;
19use std::{path::Path, sync::Arc};
20use tempfile::TempDir;
21use tracing::{debug, info, span, Level};
22
23/// Setup result containing nodes and temporary directories that must be kept alive
24pub struct ChainImportResult {
25    /// The nodes that were created
26    pub nodes: Vec<NodeHelperType<EthereumNode>>,
27    /// The wallet for testing
28    pub wallet: Wallet,
29    /// Temporary directories that must be kept alive for the duration of the test
30    pub _temp_dirs: Vec<TempDir>,
31}
32
33impl std::fmt::Debug for ChainImportResult {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        f.debug_struct("ChainImportResult")
36            .field("nodes", &self.nodes.len())
37            .field("wallet", &self.wallet)
38            .field("temp_dirs", &self._temp_dirs.len())
39            .finish()
40    }
41}
42
43/// Creates a test setup with Ethereum nodes that have pre-imported chain data from RLP files.
44///
45/// This function:
46/// 1. Creates a temporary datadir for each node
47/// 2. Imports the specified RLP chain data into the datadir
48/// 3. Starts the nodes with the pre-populated database
49/// 4. Returns the running nodes ready for testing
50///
51/// Note: This function is currently specific to `EthereumNode` because the import process
52/// uses Ethereum-specific consensus and block format. It can be made generic in the future
53/// by abstracting the import process.
54/// It uses `NoopConsensus` during import to bypass validation checks like gas limit constraints,
55/// which allows importing test chains that may not strictly conform to mainnet consensus rules. The
56/// nodes themselves still run with proper consensus when started.
57pub async fn setup_engine_with_chain_import(
58    num_nodes: usize,
59    chain_spec: Arc<ChainSpec>,
60    is_dev: bool,
61    tree_config: TreeConfig,
62    rlp_path: &Path,
63    attributes_generator: impl Fn(u64) -> reth_payload_builder::EthPayloadBuilderAttributes
64        + Send
65        + Sync
66        + Copy
67        + 'static,
68) -> eyre::Result<ChainImportResult> {
69    let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
70
71    let network_config = NetworkArgs {
72        discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
73        ..NetworkArgs::default()
74    };
75
76    // Create nodes with imported data
77    let mut nodes: Vec<NodeHelperType<EthereumNode>> = Vec::with_capacity(num_nodes);
78    let mut temp_dirs = Vec::with_capacity(num_nodes); // Keep temp dirs alive
79
80    for idx in 0..num_nodes {
81        // Create a temporary datadir for this node
82        let temp_dir = TempDir::new()?;
83        let datadir = temp_dir.path().to_path_buf();
84
85        let mut node_config = NodeConfig::new(chain_spec.clone())
86            .with_network(network_config.clone())
87            .with_unused_ports()
88            .with_rpc(
89                RpcServerArgs::default()
90                    .with_unused_ports()
91                    .with_http()
92                    .with_http_api(RpcModuleSelection::All),
93            )
94            .set_dev(is_dev);
95
96        // Set the datadir
97        node_config.datadir.datadir =
98            reth_node_core::dirs::MaybePlatformPath::from(datadir.clone());
99        debug!(target: "e2e::import", "Node {idx} datadir: {datadir:?}");
100
101        let span = span!(Level::INFO, "node", idx);
102        let _enter = span.enter();
103
104        // First, import the chain data into this datadir
105        info!(target: "test", "Importing chain data from {:?} for node {} into {:?}", rlp_path, idx, datadir);
106
107        // Create database path and static files path
108        let db_path = datadir.join("db");
109        let static_files_path = datadir.join("static_files");
110        let rocksdb_dir_path = datadir.join("rocksdb");
111
112        // Initialize the database using init_db (same as CLI import command)
113        let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
114        let db = reth_db::init_db(&db_path, db_args)?;
115
116        // Create a provider factory with the initialized database (use regular DB, not
117        // TempDatabase) We need to specify the node types properly for the adapter
118        let provider_factory =
119            ProviderFactory::<NodeTypesWithDBAdapter<EthereumNode, DatabaseEnv>>::new(
120                db.clone(),
121                chain_spec.clone(),
122                reth_provider::providers::StaticFileProvider::read_write(
123                    static_files_path.clone(),
124                )?,
125                reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
126                    .with_default_tables()
127                    .build()
128                    .unwrap(),
129                reth_tasks::Runtime::test(),
130            )?;
131
132        // Initialize genesis if needed
133        reth_db_common::init::init_genesis(&provider_factory)?;
134
135        // Import the chain data
136        // Use no_state to skip state validation for test chains
137        let import_config = ImportConfig::default();
138        let config = Config::default();
139
140        // Create EVM and consensus for Ethereum
141        let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
142        // Use NoopConsensus to skip gas limit validation for test imports
143        let consensus = reth_consensus::noop::NoopConsensus::arc();
144
145        let result = import_blocks_from_file(
146            rlp_path,
147            import_config,
148            provider_factory.clone(),
149            &config,
150            evm_config,
151            consensus,
152        )
153        .await?;
154
155        info!(
156            target: "test",
157            "Imported {} blocks and {} transactions for node {}",
158            result.total_imported_blocks,
159            result.total_imported_txns,
160            idx
161        );
162
163        debug!(target: "e2e::import",
164            "Import result for node {}: decoded {} blocks, imported {} blocks, complete: {}",
165            idx,
166            result.total_decoded_blocks,
167            result.total_imported_blocks,
168            result.is_complete()
169        );
170
171        if result.total_decoded_blocks != result.total_imported_blocks {
172            debug!(target: "e2e::import",
173                "Import block count mismatch: decoded {} != imported {}",
174                result.total_decoded_blocks, result.total_imported_blocks
175            );
176            return Err(eyre::eyre!("Chain import block count mismatch for node {}", idx));
177        }
178
179        if result.total_decoded_txns != result.total_imported_txns {
180            debug!(target: "e2e::import",
181                "Import transaction count mismatch: decoded {} != imported {}",
182                result.total_decoded_txns, result.total_imported_txns
183            );
184            return Err(eyre::eyre!("Chain import transaction count mismatch for node {}", idx));
185        }
186
187        // Verify the database was properly initialized by checking stage checkpoints
188        {
189            let provider = provider_factory.database_provider_ro()?;
190            let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers)?;
191            if headers_checkpoint.is_none() {
192                return Err(eyre::eyre!("Headers stage checkpoint is missing after import!"));
193            }
194            debug!(target: "e2e::import", "Headers stage checkpoint after import: {headers_checkpoint:?}");
195            drop(provider);
196        }
197
198        // IMPORTANT: We need to properly flush and close the static files provider
199        // The static files provider may have open file handles that need to be closed
200        // before we can reopen the database in the node launcher
201        {
202            let static_file_provider = provider_factory.static_file_provider();
203            // This will ensure all static file writers are properly closed
204            drop(static_file_provider);
205        }
206
207        // Close all database handles to release locks before launching the node
208        drop(provider_factory);
209        drop(db);
210
211        // Give the OS a moment to release file locks
212        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
213
214        // Now launch the node with the pre-populated datadir
215        debug!(target: "e2e::import", "Launching node with datadir: {:?}", datadir);
216
217        // Use the testing_node_with_datadir method which properly handles opening existing
218        // databases
219        let node = EthereumNode::default();
220
221        let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
222            .testing_node_with_datadir(runtime.clone(), datadir.clone())
223            .with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
224            .with_components(node.components_builder())
225            .with_add_ons(node.add_ons())
226            .launch_with_fn(|builder| {
227                let launcher = EngineNodeLauncher::new(
228                    builder.task_executor().clone(),
229                    builder.config().datadir(),
230                    tree_config.clone(),
231                );
232                builder.launch_with(launcher)
233            })
234            .await?;
235
236        let node_ctx = NodeTestContext::new(node, attributes_generator).await?;
237
238        nodes.push(node_ctx);
239        temp_dirs.push(temp_dir); // Keep temp dir alive
240    }
241
242    Ok(ChainImportResult {
243        nodes,
244        wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
245        _temp_dirs: temp_dirs,
246    })
247}
248
249/// Helper to load forkchoice state from a JSON file
250pub fn load_forkchoice_state(path: &Path) -> eyre::Result<alloy_rpc_types_engine::ForkchoiceState> {
251    let json_str = std::fs::read_to_string(path)?;
252    let fcu_data: serde_json::Value = serde_json::from_str(&json_str)?;
253
254    // The headfcu.json file contains a JSON-RPC request with the forkchoice state in params[0]
255    let state = &fcu_data["params"][0];
256    Ok(alloy_rpc_types_engine::ForkchoiceState {
257        head_block_hash: state["headBlockHash"]
258            .as_str()
259            .ok_or_else(|| eyre::eyre!("missing headBlockHash"))?
260            .parse()?,
261        safe_block_hash: state["safeBlockHash"]
262            .as_str()
263            .ok_or_else(|| eyre::eyre!("missing safeBlockHash"))?
264            .parse()?,
265        finalized_block_hash: state["finalizedBlockHash"]
266            .as_str()
267            .ok_or_else(|| eyre::eyre!("missing finalizedBlockHash"))?
268            .parse()?,
269    })
270}
271
272#[cfg(test)]
273mod tests {
274    use super::*;
275    use crate::test_rlp_utils::{create_fcu_json, generate_test_blocks, write_blocks_to_rlp};
276    use reth_chainspec::{ChainSpecBuilder, MAINNET};
277    use reth_db::mdbx::DatabaseArguments;
278    use reth_payload_builder::EthPayloadBuilderAttributes;
279    use reth_primitives::SealedBlock;
280    use reth_provider::{
281        test_utils::MockNodeTypesWithDB, BlockHashReader, BlockNumReader, BlockReaderIdExt,
282    };
283    use std::path::PathBuf;
284
285    #[tokio::test]
286    async fn test_stage_checkpoints_persistence() {
287        // This test specifically verifies that stage checkpoints are persisted correctly
288        // when reopening the database
289        reth_tracing::init_test_tracing();
290
291        let chain_spec = Arc::new(
292            ChainSpecBuilder::default()
293                .chain(MAINNET.chain)
294                .genesis(
295                    serde_json::from_str(include_str!("testsuite/assets/genesis.json")).unwrap(),
296                )
297                .london_activated()
298                .shanghai_activated()
299                .build(),
300        );
301
302        // Generate test blocks
303        let test_blocks = generate_test_blocks(&chain_spec, 5);
304
305        // Create temporary files for RLP data
306        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
307        let rlp_path = temp_dir.path().join("test_chain.rlp");
308        write_blocks_to_rlp(&test_blocks, &rlp_path).expect("Failed to write RLP data");
309
310        // Create a persistent datadir that won't be deleted
311        let datadir = temp_dir.path().join("datadir");
312        std::fs::create_dir_all(&datadir).unwrap();
313        let db_path = datadir.join("db");
314        let static_files_path = datadir.join("static_files");
315        let rocksdb_dir_path = datadir.join("rocksdb");
316
317        // Import the chain
318        {
319            let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
320            let db = reth_db::init_db(&db_path, db_args).unwrap();
321
322            let provider_factory: ProviderFactory<
323                NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
324            > = ProviderFactory::new(
325                db.clone(),
326                chain_spec.clone(),
327                reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())
328                    .unwrap(),
329                reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path.clone())
330                    .with_default_tables()
331                    .build()
332                    .unwrap(),
333                reth_tasks::Runtime::test(),
334            )
335            .expect("failed to create provider factory");
336
337            // Initialize genesis
338            reth_db_common::init::init_genesis(&provider_factory).unwrap();
339
340            // Import the chain data
341            let import_config = ImportConfig::default();
342            let config = Config::default();
343            let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
344            // Use NoopConsensus to skip gas limit validation for test imports
345            let consensus = reth_consensus::noop::NoopConsensus::arc();
346
347            let result = import_blocks_from_file(
348                &rlp_path,
349                import_config,
350                provider_factory.clone(),
351                &config,
352                evm_config,
353                consensus,
354            )
355            .await
356            .unwrap();
357
358            assert_eq!(result.total_decoded_blocks, 5);
359            assert_eq!(result.total_imported_blocks, 5);
360
361            // Verify stage checkpoints exist
362            let provider = provider_factory.database_provider_ro().unwrap();
363            let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers).unwrap();
364            assert!(headers_checkpoint.is_some(), "Headers checkpoint should exist after import");
365            assert_eq!(
366                headers_checkpoint.unwrap().block_number,
367                5,
368                "Headers checkpoint should be at block 5"
369            );
370            drop(provider);
371
372            // Properly close static files to release all file handles
373            let static_file_provider = provider_factory.static_file_provider();
374            drop(static_file_provider);
375
376            drop(provider_factory);
377            drop(db);
378        }
379
380        // Give the OS a moment to release file locks
381        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
382
383        // Now reopen the database and verify checkpoints are still there
384        {
385            let db = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
386
387            let provider_factory: ProviderFactory<
388                NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
389            > = ProviderFactory::new(
390                db,
391                chain_spec.clone(),
392                reth_provider::providers::StaticFileProvider::read_only(static_files_path, false)
393                    .unwrap(),
394                reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
395                    .with_default_tables()
396                    .build()
397                    .unwrap(),
398                reth_tasks::Runtime::test(),
399            )
400            .expect("failed to create provider factory");
401
402            let provider = provider_factory.database_provider_ro().unwrap();
403
404            // Check that stage checkpoints are still present
405            let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers).unwrap();
406            assert!(
407                headers_checkpoint.is_some(),
408                "Headers checkpoint should still exist after reopening database"
409            );
410            assert_eq!(
411                headers_checkpoint.unwrap().block_number,
412                5,
413                "Headers checkpoint should still be at block 5"
414            );
415
416            // Verify we can read blocks
417            let block_5_hash = provider.block_hash(5).unwrap();
418            assert!(block_5_hash.is_some(), "Block 5 should exist in database");
419            assert_eq!(block_5_hash.unwrap(), test_blocks[4].hash(), "Block 5 hash should match");
420
421            // Check all stage checkpoints
422            debug!(target: "e2e::import", "All stage checkpoints after reopening:");
423            for stage in StageId::ALL {
424                let checkpoint = provider.get_stage_checkpoint(stage).unwrap();
425                debug!(target: "e2e::import", "  Stage {stage:?}: {checkpoint:?}");
426            }
427        }
428    }
429
430    /// Helper to create test chain spec
431    fn create_test_chain_spec() -> Arc<ChainSpec> {
432        Arc::new(
433            ChainSpecBuilder::default()
434                .chain(MAINNET.chain)
435                .genesis(
436                    serde_json::from_str(include_str!("testsuite/assets/genesis.json")).unwrap(),
437                )
438                .london_activated()
439                .shanghai_activated()
440                .build(),
441        )
442    }
443
444    /// Helper to setup test blocks and write to RLP
445    async fn setup_test_blocks_and_rlp(
446        chain_spec: &ChainSpec,
447        block_count: u64,
448        temp_dir: &Path,
449    ) -> (Vec<SealedBlock>, PathBuf) {
450        let test_blocks = generate_test_blocks(chain_spec, block_count);
451        assert_eq!(
452            test_blocks.len(),
453            block_count as usize,
454            "Should have generated expected blocks"
455        );
456
457        let rlp_path = temp_dir.join("test_chain.rlp");
458        write_blocks_to_rlp(&test_blocks, &rlp_path).expect("Failed to write RLP data");
459
460        let rlp_size = std::fs::metadata(&rlp_path).expect("RLP file should exist").len();
461        debug!(target: "e2e::import", "Wrote RLP file with size: {rlp_size} bytes");
462
463        (test_blocks, rlp_path)
464    }
465
466    #[tokio::test]
467    async fn test_import_blocks_only() {
468        // Tests just the block import functionality without full node setup
469        reth_tracing::init_test_tracing();
470
471        let chain_spec = create_test_chain_spec();
472        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
473        let (test_blocks, rlp_path) =
474            setup_test_blocks_and_rlp(&chain_spec, 10, temp_dir.path()).await;
475
476        // Create a test database
477        let datadir = temp_dir.path().join("datadir");
478        std::fs::create_dir_all(&datadir).unwrap();
479        let db_path = datadir.join("db");
480        let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
481        let db_env = reth_db::init_db(&db_path, db_args).unwrap();
482        let db = Arc::new(reth_db::test_utils::TempDatabase::new(db_env, db_path));
483
484        // Create static files path
485        let static_files_path = datadir.join("static_files");
486
487        // Create rocksdb path
488        let rocksdb_dir_path = datadir.join("rocksdb");
489
490        // Create a provider factory
491        let provider_factory: ProviderFactory<MockNodeTypesWithDB> = ProviderFactory::new(
492            db.clone(),
493            chain_spec.clone(),
494            reth_provider::providers::StaticFileProvider::read_write(static_files_path).unwrap(),
495            reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
496                .with_default_tables()
497                .build()
498                .unwrap(),
499            reth_tasks::Runtime::test(),
500        )
501        .expect("failed to create provider factory");
502
503        // Initialize genesis
504        reth_db_common::init::init_genesis(&provider_factory).unwrap();
505
506        // Import the chain data
507        let import_config = ImportConfig::default();
508        let config = Config::default();
509        let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
510        // Use NoopConsensus to skip gas limit validation for test imports
511        let consensus = reth_consensus::noop::NoopConsensus::arc();
512
513        let result = import_blocks_from_file(
514            &rlp_path,
515            import_config,
516            provider_factory.clone(),
517            &config,
518            evm_config,
519            consensus,
520        )
521        .await
522        .unwrap();
523
524        debug!(target: "e2e::import",
525            "Import result: decoded {} blocks, imported {} blocks",
526            result.total_decoded_blocks, result.total_imported_blocks
527        );
528
529        // Verify the import was successful
530        assert_eq!(result.total_decoded_blocks, 10);
531        assert_eq!(result.total_imported_blocks, 10);
532        assert_eq!(result.total_decoded_txns, 0);
533        assert_eq!(result.total_imported_txns, 0);
534
535        // Verify we can read the imported blocks
536        let provider = provider_factory.database_provider_ro().unwrap();
537        let latest_block = provider.last_block_number().unwrap();
538        assert_eq!(latest_block, 10, "Should have imported up to block 10");
539
540        let block_10_hash = provider.block_hash(10).unwrap().expect("Block 10 should exist");
541        assert_eq!(block_10_hash, test_blocks[9].hash(), "Block 10 hash should match");
542    }
543
544    #[tokio::test]
545    async fn test_import_with_node_integration() {
546        // Tests the full integration with node setup, forkchoice updates, and syncing
547        reth_tracing::init_test_tracing();
548
549        let chain_spec = create_test_chain_spec();
550        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
551        let (test_blocks, rlp_path) =
552            setup_test_blocks_and_rlp(&chain_spec, 10, temp_dir.path()).await;
553
554        // Create FCU data for the tip
555        let tip = test_blocks.last().expect("Should have generated blocks");
556        let fcu_path = temp_dir.path().join("test_fcu.json");
557        std::fs::write(&fcu_path, create_fcu_json(tip).to_string())
558            .expect("Failed to write FCU data");
559
560        // Setup nodes with imported chain
561        let result = setup_engine_with_chain_import(
562            1,
563            chain_spec,
564            false,
565            TreeConfig::default(),
566            &rlp_path,
567            |_| EthPayloadBuilderAttributes::default(),
568        )
569        .await
570        .expect("Failed to setup nodes with chain import");
571
572        // Load and apply forkchoice state
573        let fcu_state = load_forkchoice_state(&fcu_path).expect("Failed to load forkchoice state");
574
575        let node = &result.nodes[0];
576
577        // Send forkchoice update to make the imported chain canonical
578        node.update_forkchoice(fcu_state.finalized_block_hash, fcu_state.head_block_hash)
579            .await
580            .expect("Failed to update forkchoice");
581
582        // Wait for the node to sync to the head
583        node.sync_to(fcu_state.head_block_hash).await.expect("Failed to sync to head");
584
585        // Verify the chain tip
586        let latest = node
587            .inner
588            .provider
589            .sealed_header_by_id(alloy_eips::BlockId::latest())
590            .expect("Failed to get latest header")
591            .expect("No latest header found");
592
593        assert_eq!(
594            latest.hash(),
595            fcu_state.head_block_hash,
596            "Chain tip does not match expected head"
597        );
598    }
599}