reth_e2e_test_utils/
setup_import.rs

1//! Setup utilities for importing RLP chain data before starting nodes.
2
3use crate::{node::NodeTestContext, NodeHelperType, Wallet};
4use reth_chainspec::ChainSpec;
5use reth_cli_commands::import_core::{import_blocks_from_file, ImportConfig};
6use reth_config::Config;
7use reth_db::DatabaseEnv;
8use reth_node_api::{NodeTypesWithDBAdapter, TreeConfig};
9use reth_node_builder::{EngineNodeLauncher, Node, NodeBuilder, NodeConfig, NodeHandle};
10use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
11use reth_node_ethereum::EthereumNode;
12use reth_provider::{
13    providers::BlockchainProvider, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
14    StaticFileProviderFactory,
15};
16use reth_rpc_server_types::RpcModuleSelection;
17use reth_stages_types::StageId;
18use reth_tasks::TaskManager;
19use std::{path::Path, sync::Arc};
20use tempfile::TempDir;
21use tracing::{debug, info, span, Level};
22
23/// Setup result containing nodes and temporary directories that must be kept alive
24pub struct ChainImportResult {
25    /// The nodes that were created
26    pub nodes: Vec<NodeHelperType<EthereumNode>>,
27    /// The task manager
28    pub task_manager: TaskManager,
29    /// The wallet for testing
30    pub wallet: Wallet,
31    /// Temporary directories that must be kept alive for the duration of the test
32    pub _temp_dirs: Vec<TempDir>,
33}
34
35impl std::fmt::Debug for ChainImportResult {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("ChainImportResult")
38            .field("nodes", &self.nodes.len())
39            .field("wallet", &self.wallet)
40            .field("temp_dirs", &self._temp_dirs.len())
41            .finish()
42    }
43}
44
45/// Creates a test setup with Ethereum nodes that have pre-imported chain data from RLP files.
46///
47/// This function:
48/// 1. Creates a temporary datadir for each node
49/// 2. Imports the specified RLP chain data into the datadir
50/// 3. Starts the nodes with the pre-populated database
51/// 4. Returns the running nodes ready for testing
52///
53/// Note: This function is currently specific to `EthereumNode` because the import process
54/// uses Ethereum-specific consensus and block format. It can be made generic in the future
55/// by abstracting the import process.
56/// It uses `NoopConsensus` during import to bypass validation checks like gas limit constraints,
57/// which allows importing test chains that may not strictly conform to mainnet consensus rules. The
58/// nodes themselves still run with proper consensus when started.
59pub async fn setup_engine_with_chain_import(
60    num_nodes: usize,
61    chain_spec: Arc<ChainSpec>,
62    is_dev: bool,
63    tree_config: TreeConfig,
64    rlp_path: &Path,
65    attributes_generator: impl Fn(u64) -> reth_payload_builder::EthPayloadBuilderAttributes
66        + Send
67        + Sync
68        + Copy
69        + 'static,
70) -> eyre::Result<ChainImportResult> {
71    let tasks = TaskManager::current();
72    let exec = tasks.executor();
73
74    let network_config = NetworkArgs {
75        discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
76        ..NetworkArgs::default()
77    };
78
79    // Create nodes with imported data
80    let mut nodes: Vec<NodeHelperType<EthereumNode>> = Vec::with_capacity(num_nodes);
81    let mut temp_dirs = Vec::with_capacity(num_nodes); // Keep temp dirs alive
82
83    for idx in 0..num_nodes {
84        // Create a temporary datadir for this node
85        let temp_dir = TempDir::new()?;
86        let datadir = temp_dir.path().to_path_buf();
87
88        let mut node_config = NodeConfig::new(chain_spec.clone())
89            .with_network(network_config.clone())
90            .with_unused_ports()
91            .with_rpc(
92                RpcServerArgs::default()
93                    .with_unused_ports()
94                    .with_http()
95                    .with_http_api(RpcModuleSelection::All),
96            )
97            .set_dev(is_dev);
98
99        // Set the datadir
100        node_config.datadir.datadir =
101            reth_node_core::dirs::MaybePlatformPath::from(datadir.clone());
102        debug!(target: "e2e::import", "Node {idx} datadir: {datadir:?}");
103
104        let span = span!(Level::INFO, "node", idx);
105        let _enter = span.enter();
106
107        // First, import the chain data into this datadir
108        info!(target: "test", "Importing chain data from {:?} for node {} into {:?}", rlp_path, idx, datadir);
109
110        // Create database path and static files path
111        let db_path = datadir.join("db");
112        let static_files_path = datadir.join("static_files");
113        let rocksdb_dir_path = datadir.join("rocksdb");
114
115        // Initialize the database using init_db (same as CLI import command)
116        // Use the same database arguments as the node will use
117        let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
118        let db_env = reth_db::init_db(&db_path, db_args)?;
119        let db = Arc::new(db_env);
120
121        // Create a provider factory with the initialized database (use regular DB, not
122        // TempDatabase) We need to specify the node types properly for the adapter
123        let provider_factory = ProviderFactory::<
124            NodeTypesWithDBAdapter<EthereumNode, Arc<DatabaseEnv>>,
125        >::new(
126            db.clone(),
127            chain_spec.clone(),
128            reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())?,
129            reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
130        )?;
131
132        // Initialize genesis if needed
133        reth_db_common::init::init_genesis(&provider_factory)?;
134
135        // Import the chain data
136        // Use no_state to skip state validation for test chains
137        let import_config = ImportConfig::default();
138        let config = Config::default();
139
140        // Create EVM and consensus for Ethereum
141        let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
142        // Use NoopConsensus to skip gas limit validation for test imports
143        let consensus = reth_consensus::noop::NoopConsensus::arc();
144
145        let result = import_blocks_from_file(
146            rlp_path,
147            import_config,
148            provider_factory.clone(),
149            &config,
150            evm_config,
151            consensus,
152        )
153        .await?;
154
155        info!(
156            target: "test",
157            "Imported {} blocks and {} transactions for node {}",
158            result.total_imported_blocks,
159            result.total_imported_txns,
160            idx
161        );
162
163        debug!(target: "e2e::import",
164            "Import result for node {}: decoded {} blocks, imported {} blocks, complete: {}",
165            idx,
166            result.total_decoded_blocks,
167            result.total_imported_blocks,
168            result.is_complete()
169        );
170
171        if result.total_decoded_blocks != result.total_imported_blocks {
172            debug!(target: "e2e::import",
173                "Import block count mismatch: decoded {} != imported {}",
174                result.total_decoded_blocks, result.total_imported_blocks
175            );
176            return Err(eyre::eyre!("Chain import block count mismatch for node {}", idx));
177        }
178
179        if result.total_decoded_txns != result.total_imported_txns {
180            debug!(target: "e2e::import",
181                "Import transaction count mismatch: decoded {} != imported {}",
182                result.total_decoded_txns, result.total_imported_txns
183            );
184            return Err(eyre::eyre!("Chain import transaction count mismatch for node {}", idx));
185        }
186
187        // Verify the database was properly initialized by checking stage checkpoints
188        {
189            let provider = provider_factory.database_provider_ro()?;
190            let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers)?;
191            if headers_checkpoint.is_none() {
192                return Err(eyre::eyre!("Headers stage checkpoint is missing after import!"));
193            }
194            debug!(target: "e2e::import", "Headers stage checkpoint after import: {headers_checkpoint:?}");
195            drop(provider);
196        }
197
198        // IMPORTANT: We need to properly flush and close the static files provider
199        // The static files provider may have open file handles that need to be closed
200        // before we can reopen the database in the node launcher
201        {
202            let static_file_provider = provider_factory.static_file_provider();
203            // This will ensure all static file writers are properly closed
204            drop(static_file_provider);
205        }
206
207        // Close all database handles to release locks before launching the node
208        drop(provider_factory);
209        drop(db);
210
211        // Give the OS a moment to release file locks
212        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
213
214        // Now launch the node with the pre-populated datadir
215        debug!(target: "e2e::import", "Launching node with datadir: {:?}", datadir);
216
217        // Use the testing_node_with_datadir method which properly handles opening existing
218        // databases
219        let node = EthereumNode::default();
220
221        let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
222            .testing_node_with_datadir(exec.clone(), datadir.clone())
223            .with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
224            .with_components(node.components_builder())
225            .with_add_ons(node.add_ons())
226            .launch_with_fn(|builder| {
227                let launcher = EngineNodeLauncher::new(
228                    builder.task_executor().clone(),
229                    builder.config().datadir(),
230                    tree_config.clone(),
231                );
232                builder.launch_with(launcher)
233            })
234            .await?;
235
236        let node_ctx = NodeTestContext::new(node, attributes_generator).await?;
237
238        nodes.push(node_ctx);
239        temp_dirs.push(temp_dir); // Keep temp dir alive
240    }
241
242    Ok(ChainImportResult {
243        nodes,
244        task_manager: tasks,
245        wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
246        _temp_dirs: temp_dirs,
247    })
248}
249
250/// Helper to load forkchoice state from a JSON file
251pub fn load_forkchoice_state(path: &Path) -> eyre::Result<alloy_rpc_types_engine::ForkchoiceState> {
252    let json_str = std::fs::read_to_string(path)?;
253    let fcu_data: serde_json::Value = serde_json::from_str(&json_str)?;
254
255    // The headfcu.json file contains a JSON-RPC request with the forkchoice state in params[0]
256    let state = &fcu_data["params"][0];
257    Ok(alloy_rpc_types_engine::ForkchoiceState {
258        head_block_hash: state["headBlockHash"]
259            .as_str()
260            .ok_or_else(|| eyre::eyre!("missing headBlockHash"))?
261            .parse()?,
262        safe_block_hash: state["safeBlockHash"]
263            .as_str()
264            .ok_or_else(|| eyre::eyre!("missing safeBlockHash"))?
265            .parse()?,
266        finalized_block_hash: state["finalizedBlockHash"]
267            .as_str()
268            .ok_or_else(|| eyre::eyre!("missing finalizedBlockHash"))?
269            .parse()?,
270    })
271}
272
273#[cfg(test)]
274mod tests {
275    use super::*;
276    use crate::test_rlp_utils::{create_fcu_json, generate_test_blocks, write_blocks_to_rlp};
277    use reth_chainspec::{ChainSpecBuilder, MAINNET};
278    use reth_db::mdbx::DatabaseArguments;
279    use reth_payload_builder::EthPayloadBuilderAttributes;
280    use reth_primitives::SealedBlock;
281    use reth_provider::{
282        test_utils::MockNodeTypesWithDB, BlockHashReader, BlockNumReader, BlockReaderIdExt,
283    };
284    use std::path::PathBuf;
285
286    #[tokio::test]
287    async fn test_stage_checkpoints_persistence() {
288        // This test specifically verifies that stage checkpoints are persisted correctly
289        // when reopening the database
290        reth_tracing::init_test_tracing();
291
292        let chain_spec = Arc::new(
293            ChainSpecBuilder::default()
294                .chain(MAINNET.chain)
295                .genesis(
296                    serde_json::from_str(include_str!("testsuite/assets/genesis.json")).unwrap(),
297                )
298                .london_activated()
299                .shanghai_activated()
300                .build(),
301        );
302
303        // Generate test blocks
304        let test_blocks = generate_test_blocks(&chain_spec, 5);
305
306        // Create temporary files for RLP data
307        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
308        let rlp_path = temp_dir.path().join("test_chain.rlp");
309        write_blocks_to_rlp(&test_blocks, &rlp_path).expect("Failed to write RLP data");
310
311        // Create a persistent datadir that won't be deleted
312        let datadir = temp_dir.path().join("datadir");
313        std::fs::create_dir_all(&datadir).unwrap();
314        let db_path = datadir.join("db");
315        let static_files_path = datadir.join("static_files");
316        let rocksdb_dir_path = datadir.join("rocksdb");
317
318        // Import the chain
319        {
320            let db_env = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
321            let db = Arc::new(db_env);
322
323            let provider_factory: ProviderFactory<
324                NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, Arc<DatabaseEnv>>,
325            > = ProviderFactory::new(
326                db.clone(),
327                chain_spec.clone(),
328                reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())
329                    .unwrap(),
330                reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path.clone())
331                    .build()
332                    .unwrap(),
333            )
334            .expect("failed to create provider factory");
335
336            // Initialize genesis
337            reth_db_common::init::init_genesis(&provider_factory).unwrap();
338
339            // Import the chain data
340            let import_config = ImportConfig::default();
341            let config = Config::default();
342            let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
343            // Use NoopConsensus to skip gas limit validation for test imports
344            let consensus = reth_consensus::noop::NoopConsensus::arc();
345
346            let result = import_blocks_from_file(
347                &rlp_path,
348                import_config,
349                provider_factory.clone(),
350                &config,
351                evm_config,
352                consensus,
353            )
354            .await
355            .unwrap();
356
357            assert_eq!(result.total_decoded_blocks, 5);
358            assert_eq!(result.total_imported_blocks, 5);
359
360            // Verify stage checkpoints exist
361            let provider = provider_factory.database_provider_ro().unwrap();
362            let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers).unwrap();
363            assert!(headers_checkpoint.is_some(), "Headers checkpoint should exist after import");
364            assert_eq!(
365                headers_checkpoint.unwrap().block_number,
366                5,
367                "Headers checkpoint should be at block 5"
368            );
369            drop(provider);
370
371            // Properly close static files to release all file handles
372            let static_file_provider = provider_factory.static_file_provider();
373            drop(static_file_provider);
374
375            drop(provider_factory);
376            drop(db);
377        }
378
379        // Give the OS a moment to release file locks
380        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
381
382        // Now reopen the database and verify checkpoints are still there
383        {
384            let db_env = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
385            let db = Arc::new(db_env);
386
387            let provider_factory: ProviderFactory<
388                NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, Arc<DatabaseEnv>>,
389            > = ProviderFactory::new(
390                db,
391                chain_spec.clone(),
392                reth_provider::providers::StaticFileProvider::read_only(static_files_path, false)
393                    .unwrap(),
394                reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
395                    .build()
396                    .unwrap(),
397            )
398            .expect("failed to create provider factory");
399
400            let provider = provider_factory.database_provider_ro().unwrap();
401
402            // Check that stage checkpoints are still present
403            let headers_checkpoint = provider.get_stage_checkpoint(StageId::Headers).unwrap();
404            assert!(
405                headers_checkpoint.is_some(),
406                "Headers checkpoint should still exist after reopening database"
407            );
408            assert_eq!(
409                headers_checkpoint.unwrap().block_number,
410                5,
411                "Headers checkpoint should still be at block 5"
412            );
413
414            // Verify we can read blocks
415            let block_5_hash = provider.block_hash(5).unwrap();
416            assert!(block_5_hash.is_some(), "Block 5 should exist in database");
417            assert_eq!(block_5_hash.unwrap(), test_blocks[4].hash(), "Block 5 hash should match");
418
419            // Check all stage checkpoints
420            debug!(target: "e2e::import", "All stage checkpoints after reopening:");
421            for stage in StageId::ALL {
422                let checkpoint = provider.get_stage_checkpoint(stage).unwrap();
423                debug!(target: "e2e::import", "  Stage {stage:?}: {checkpoint:?}");
424            }
425        }
426    }
427
428    /// Helper to create test chain spec
429    fn create_test_chain_spec() -> Arc<ChainSpec> {
430        Arc::new(
431            ChainSpecBuilder::default()
432                .chain(MAINNET.chain)
433                .genesis(
434                    serde_json::from_str(include_str!("testsuite/assets/genesis.json")).unwrap(),
435                )
436                .london_activated()
437                .shanghai_activated()
438                .build(),
439        )
440    }
441
442    /// Helper to setup test blocks and write to RLP
443    async fn setup_test_blocks_and_rlp(
444        chain_spec: &ChainSpec,
445        block_count: u64,
446        temp_dir: &Path,
447    ) -> (Vec<SealedBlock>, PathBuf) {
448        let test_blocks = generate_test_blocks(chain_spec, block_count);
449        assert_eq!(
450            test_blocks.len(),
451            block_count as usize,
452            "Should have generated expected blocks"
453        );
454
455        let rlp_path = temp_dir.join("test_chain.rlp");
456        write_blocks_to_rlp(&test_blocks, &rlp_path).expect("Failed to write RLP data");
457
458        let rlp_size = std::fs::metadata(&rlp_path).expect("RLP file should exist").len();
459        debug!(target: "e2e::import", "Wrote RLP file with size: {rlp_size} bytes");
460
461        (test_blocks, rlp_path)
462    }
463
464    #[tokio::test]
465    async fn test_import_blocks_only() {
466        // Tests just the block import functionality without full node setup
467        reth_tracing::init_test_tracing();
468
469        let chain_spec = create_test_chain_spec();
470        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
471        let (test_blocks, rlp_path) =
472            setup_test_blocks_and_rlp(&chain_spec, 10, temp_dir.path()).await;
473
474        // Create a test database
475        let datadir = temp_dir.path().join("datadir");
476        std::fs::create_dir_all(&datadir).unwrap();
477        let db_path = datadir.join("db");
478        let db_env = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
479        let db = Arc::new(reth_db::test_utils::TempDatabase::new(db_env, db_path));
480
481        // Create static files path
482        let static_files_path = datadir.join("static_files");
483
484        // Create rocksdb path
485        let rocksdb_dir_path = datadir.join("rocksdb");
486
487        // Create a provider factory
488        let provider_factory: ProviderFactory<MockNodeTypesWithDB> = ProviderFactory::new(
489            db.clone(),
490            chain_spec.clone(),
491            reth_provider::providers::StaticFileProvider::read_write(static_files_path).unwrap(),
492            reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
493        )
494        .expect("failed to create provider factory");
495
496        // Initialize genesis
497        reth_db_common::init::init_genesis(&provider_factory).unwrap();
498
499        // Import the chain data
500        let import_config = ImportConfig::default();
501        let config = Config::default();
502        let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
503        // Use NoopConsensus to skip gas limit validation for test imports
504        let consensus = reth_consensus::noop::NoopConsensus::arc();
505
506        let result = import_blocks_from_file(
507            &rlp_path,
508            import_config,
509            provider_factory.clone(),
510            &config,
511            evm_config,
512            consensus,
513        )
514        .await
515        .unwrap();
516
517        debug!(target: "e2e::import",
518            "Import result: decoded {} blocks, imported {} blocks",
519            result.total_decoded_blocks, result.total_imported_blocks
520        );
521
522        // Verify the import was successful
523        assert_eq!(result.total_decoded_blocks, 10);
524        assert_eq!(result.total_imported_blocks, 10);
525        assert_eq!(result.total_decoded_txns, 0);
526        assert_eq!(result.total_imported_txns, 0);
527
528        // Verify we can read the imported blocks
529        let provider = provider_factory.database_provider_ro().unwrap();
530        let latest_block = provider.last_block_number().unwrap();
531        assert_eq!(latest_block, 10, "Should have imported up to block 10");
532
533        let block_10_hash = provider.block_hash(10).unwrap().expect("Block 10 should exist");
534        assert_eq!(block_10_hash, test_blocks[9].hash(), "Block 10 hash should match");
535    }
536
537    #[tokio::test]
538    async fn test_import_with_node_integration() {
539        // Tests the full integration with node setup, forkchoice updates, and syncing
540        reth_tracing::init_test_tracing();
541
542        let chain_spec = create_test_chain_spec();
543        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
544        let (test_blocks, rlp_path) =
545            setup_test_blocks_and_rlp(&chain_spec, 10, temp_dir.path()).await;
546
547        // Create FCU data for the tip
548        let tip = test_blocks.last().expect("Should have generated blocks");
549        let fcu_path = temp_dir.path().join("test_fcu.json");
550        std::fs::write(&fcu_path, create_fcu_json(tip).to_string())
551            .expect("Failed to write FCU data");
552
553        // Setup nodes with imported chain
554        let result = setup_engine_with_chain_import(
555            1,
556            chain_spec,
557            false,
558            TreeConfig::default(),
559            &rlp_path,
560            |_| EthPayloadBuilderAttributes::default(),
561        )
562        .await
563        .expect("Failed to setup nodes with chain import");
564
565        // Load and apply forkchoice state
566        let fcu_state = load_forkchoice_state(&fcu_path).expect("Failed to load forkchoice state");
567
568        let node = &result.nodes[0];
569
570        // Send forkchoice update to make the imported chain canonical
571        node.update_forkchoice(fcu_state.finalized_block_hash, fcu_state.head_block_hash)
572            .await
573            .expect("Failed to update forkchoice");
574
575        // Wait for the node to sync to the head
576        node.sync_to(fcu_state.head_block_hash).await.expect("Failed to sync to head");
577
578        // Verify the chain tip
579        let latest = node
580            .inner
581            .provider
582            .sealed_header_by_id(alloy_eips::BlockId::latest())
583            .expect("Failed to get latest header")
584            .expect("No latest header found");
585
586        assert_eq!(
587            latest.hash(),
588            fcu_state.head_block_hash,
589            "Chain tip does not match expected head"
590        );
591    }
592}