// reth_db_common/init.rs
1//! Reth genesis initialization utility functions.
2
3use alloy_consensus::BlockHeader;
4use alloy_genesis::GenesisAccount;
5use alloy_primitives::{
6    keccak256,
7    map::{AddressMap, B256Map, B256Set, HashMap},
8    Address, B256, U256,
9};
10use reth_chainspec::EthChainSpec;
11use reth_codecs::Compact;
12use reth_config::config::EtlConfig;
13use reth_db_api::{
14    cursor::{DbCursorRW, DbDupCursorRW},
15    models::{
16        storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress, IntegerList,
17        ShardedKey,
18    },
19    tables,
20    transaction::DbTxMut,
21    DatabaseError,
22};
23use reth_etl::Collector;
24use reth_execution_errors::StateRootError;
25use reth_primitives_traits::{
26    Account, Bytecode, GotExpected, NodePrimitives, SealedHeader, StorageEntry,
27};
28use reth_provider::{
29    errors::provider::ProviderResult, providers::StaticFileWriter, BlockHashReader, BlockNumReader,
30    BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, ExecutionOutcome,
31    HashingWriter, HeaderProvider, HistoryWriter, MetadataProvider, MetadataWriter,
32    NodePrimitivesProvider, OriginalValuesKnown, ProviderError, RevertsInit,
33    RocksDBProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig,
34    StateWriter, StaticFileProviderFactory, StorageSettings, StorageSettingsCache, TrieWriter,
35};
36use reth_stages_types::{StageCheckpoint, StageId};
37use reth_static_file_types::StaticFileSegment;
38use reth_trie::{
39    prefix_set::TriePrefixSets, IntermediateStateRootState, StateRoot as StateRootComputer,
40    StateRootProgress,
41};
42use reth_trie_db::DatabaseStateRoot;
43
/// Shorthand for a [`StateRootComputer`] wired to database-backed cursor factories:
/// trie nodes come from `DatabaseTrieCursorFactory` (parameterized over the trie adapter `A`)
/// and hashed state from `DatabaseHashedCursorFactory`, both borrowing the transaction `TX`.
type DbStateRoot<'a, TX, A> = StateRootComputer<
    reth_trie_db::DatabaseTrieCursorFactory<&'a TX, A>,
    reth_trie_db::DatabaseHashedCursorFactory<&'a TX>,
>;
48
49use serde::{Deserialize, Serialize};
50use std::io::BufRead;
51use tracing::{debug, error, info, trace, warn};
52
53pub use reth_provider::init::{
54    insert_account_history, insert_genesis_account_history, insert_genesis_history,
55    insert_genesis_storage_history, insert_history, insert_storage_history,
56};
57
/// Default soft limit for number of bytes to read from state dump file, before inserting into
/// database.
///
/// Default is 1 GB.
pub const DEFAULT_SOFT_LIMIT_BYTE_LEN_ACCOUNTS_CHUNK: usize = 1_000_000_000;

/// Soft limit for the number of flushed updates after which to log progress summary.
// NOTE(review): not referenced in this chunk of the file — presumably consumed by the
// history/hashing flush loops further down; confirm before removing.
const SOFT_LIMIT_COUNT_FLUSHED_UPDATES: usize = 1_000_000;

/// Max number of storage "units" (1 per account + 1 per storage slot) before committing
/// the current MDBX transaction and opening a new one. This bounds dirty page accumulation
/// and prevents OOM on large state imports.
///
/// Used by [`dump_state`] when importing state dumps.
const STORAGE_COMMIT_THRESHOLD: usize = 100_000;

/// Max number of trie updates retained before init-state state root computation commits progress.
const STATE_ROOT_COMMIT_THRESHOLD: u64 = 25_000;
74
/// Storage initialization error type.
///
/// Returned by the genesis / state-dump initialization routines in this module.
#[derive(Debug, thiserror::Error, Clone)]
pub enum InitStorageError {
    /// Genesis header found on static files but the database is empty.
    ///
    /// Typically hit when a user deletes the database but keeps the static files in an
    /// attempt to re-sync from scratch.
    #[error(
        "static files found, but the database is uninitialized. If attempting to re-syncing, delete both."
    )]
    UninitializedDatabase,
    /// An existing genesis block was found in the database, and its hash did not match the hash of
    /// the chainspec.
    #[error(
        "genesis hash in the storage does not match the specified chainspec: chainspec is {chainspec_hash}, database is {storage_hash}"
    )]
    GenesisHashMismatch {
        /// Expected genesis hash (computed from the chainspec).
        chainspec_hash: B256,
        /// Actual genesis hash (read from storage).
        storage_hash: B256,
    },
    /// Provider error.
    #[error(transparent)]
    Provider(#[from] ProviderError),
    /// State root error while computing the state root
    #[error(transparent)]
    StateRootError(#[from] StateRootError),
    /// State root doesn't match the expected one.
    #[error("state root mismatch: {_0}")]
    StateRootMismatch(GotExpected<B256>),
}
104
105impl From<DatabaseError> for InitStorageError {
106    fn from(error: DatabaseError) -> Self {
107        Self::Provider(ProviderError::Database(error))
108    }
109}
110
/// Write the genesis block if it has not already been written
///
/// Convenience wrapper around [`init_genesis_with_settings`] that uses
/// [`StorageSettings::base`]. Returns the genesis hash on success.
pub fn init_genesis<PF>(factory: &PF) -> Result<B256, InitStorageError>
where
    PF: DatabaseProviderFactory
        + StaticFileProviderFactory<Primitives: NodePrimitives<BlockHeader: Compact>>
        + ChainSpecProvider
        + StageCheckpointReader
        + BlockNumReader
        + MetadataProvider
        + StorageSettingsCache,
    PF::ProviderRW: StaticFileProviderFactory<Primitives = PF::Primitives>
        + StageCheckpointWriter
        + HistoryWriter
        + HeaderProvider
        + HashingWriter
        + StateWriter
        + TrieWriter
        + MetadataWriter
        + ChainSpecProvider
        + StorageSettingsCache
        + RocksDBProviderFactory
        + NodePrimitivesProvider
        + AsRef<PF::ProviderRW>,
    PF::ChainSpec: EthChainSpec<Header = <PF::Primitives as NodePrimitives>::BlockHeader>,
{
    init_genesis_with_settings(factory, StorageSettings::base())
}
138
/// Write the genesis block if it has not already been written with [`StorageSettings`].
///
/// Delegates to [`init_genesis_with_settings_and_validate`] with genesis-hash validation
/// enabled.
pub fn init_genesis_with_settings<PF>(
    factory: &PF,
    genesis_storage_settings: StorageSettings,
) -> Result<B256, InitStorageError>
where
    PF: DatabaseProviderFactory
        + StaticFileProviderFactory<Primitives: NodePrimitives<BlockHeader: Compact>>
        + ChainSpecProvider
        + StageCheckpointReader
        + BlockNumReader
        + MetadataProvider
        + StorageSettingsCache,
    PF::ProviderRW: StaticFileProviderFactory<Primitives = PF::Primitives>
        + StageCheckpointWriter
        + HistoryWriter
        + HeaderProvider
        + HashingWriter
        + StateWriter
        + TrieWriter
        + MetadataWriter
        + ChainSpecProvider
        + StorageSettingsCache
        + RocksDBProviderFactory
        + NodePrimitivesProvider
        + AsRef<PF::ProviderRW>,
    PF::ChainSpec: EthChainSpec<Header = <PF::Primitives as NodePrimitives>::BlockHeader>,
{
    init_genesis_with_settings_and_validate(factory, genesis_storage_settings, true)
}
169
170/// Write the genesis block if it has not already been written with [`StorageSettings`],
171/// optionally validating the DB-resident genesis hash against the chainspec hash.
172pub fn init_genesis_with_settings_and_validate<PF>(
173    factory: &PF,
174    genesis_storage_settings: StorageSettings,
175    validate_genesis_hash: bool,
176) -> Result<B256, InitStorageError>
177where
178    PF: DatabaseProviderFactory
179        + StaticFileProviderFactory<Primitives: NodePrimitives<BlockHeader: Compact>>
180        + ChainSpecProvider
181        + StageCheckpointReader
182        + BlockNumReader
183        + MetadataProvider
184        + StorageSettingsCache,
185    PF::ProviderRW: StaticFileProviderFactory<Primitives = PF::Primitives>
186        + StageCheckpointWriter
187        + HistoryWriter
188        + HeaderProvider
189        + HashingWriter
190        + StateWriter
191        + TrieWriter
192        + MetadataWriter
193        + ChainSpecProvider
194        + StorageSettingsCache
195        + RocksDBProviderFactory
196        + NodePrimitivesProvider
197        + AsRef<PF::ProviderRW>,
198    PF::ChainSpec: EthChainSpec<Header = <PF::Primitives as NodePrimitives>::BlockHeader>,
199{
200    let chain = factory.chain_spec();
201
202    let genesis = chain.genesis();
203    let hash = chain.genesis_hash();
204
205    // Get the genesis block number from the chain spec
206    let genesis_block_number = chain.genesis_header().number();
207
208    // Check if we already have the genesis header or if we have the wrong one.
209    match factory.block_hash(genesis_block_number) {
210        Ok(None) | Err(ProviderError::MissingStaticFileBlock(StaticFileSegment::Headers, _)) => {}
211        Ok(Some(block_hash)) => {
212            if block_hash == hash {
213                // Some users will at times attempt to re-sync from scratch by just deleting the
214                // database. Since `factory.block_hash` will only query the static files, we need to
215                // make sure that our database has been written to, and throw error if it's empty.
216                if factory.get_stage_checkpoint(StageId::Headers)?.is_none() {
217                    error!(target: "reth::storage", "Genesis header found on static files, but database is uninitialized.");
218                    return Err(InitStorageError::UninitializedDatabase)
219                }
220
221                let stored = factory.storage_settings()?.unwrap_or_else(StorageSettings::v1);
222                if stored != genesis_storage_settings {
223                    warn!(
224                        target: "reth::storage",
225                        ?stored,
226                        requested = ?genesis_storage_settings,
227                        "Storage settings mismatch detected. Using the stored settings from the existing database."
228                    );
229                }
230
231                debug!("Genesis already written, skipping.");
232                return Ok(hash)
233            }
234
235            if !validate_genesis_hash {
236                warn!(
237                    target: "reth::storage",
238                    chainspec_hash = %hash,
239                    storage_hash = %block_hash,
240                    "Genesis hash mismatch with chainspec; trusting DB per --debug.skip-genesis-validation"
241                );
242                return Ok(block_hash)
243            }
244            return Err(InitStorageError::GenesisHashMismatch {
245                chainspec_hash: hash,
246                storage_hash: block_hash,
247            })
248        }
249        Err(e) => {
250            debug!(?e);
251            return Err(e.into());
252        }
253    }
254
255    debug!("Writing genesis block.");
256
257    // Make sure to set storage settings before anything writes
258    factory.set_storage_settings_cache(genesis_storage_settings);
259
260    let alloc = &genesis.alloc;
261
262    // use transaction to insert genesis header
263    let provider_rw = factory.database_provider_rw()?;
264
265    // Behaviour reserved only for new nodes should be set in the storage settings.
266    provider_rw.write_storage_settings(genesis_storage_settings)?;
267
268    // For non-zero genesis blocks, set expected_block_start BEFORE insert_genesis_state.
269    // When block_range is None, next_block_number() uses expected_block_start. By default,
270    // expected_block_start comes from find_fixed_range which returns the file range start (0),
271    // not the genesis block number. This would cause increment_block(N) to fail.
272    let static_file_provider = provider_rw.static_file_provider();
273    if genesis_block_number > 0 {
274        if genesis_storage_settings.storage_v2 {
275            static_file_provider
276                .get_writer(genesis_block_number, StaticFileSegment::AccountChangeSets)?
277                .user_header_mut()
278                .set_expected_block_start(genesis_block_number);
279        }
280        if genesis_storage_settings.storage_v2 {
281            static_file_provider
282                .get_writer(genesis_block_number, StaticFileSegment::StorageChangeSets)?
283                .user_header_mut()
284                .set_expected_block_start(genesis_block_number);
285        }
286    }
287
288    insert_genesis_hashes(&provider_rw, alloc.iter())?;
289    insert_genesis_history(&provider_rw, alloc.iter())?;
290
291    // Insert header
292    insert_genesis_header(&provider_rw, &chain)?;
293
294    insert_genesis_state(&provider_rw, alloc.iter())?;
295
296    // compute state root to populate trie tables
297    compute_state_root(&provider_rw, None)?;
298
299    // set stage checkpoint to genesis block number for all stages
300    let checkpoint = StageCheckpoint::new(genesis_block_number);
301    for stage in StageId::ALL {
302        provider_rw.save_stage_checkpoint(stage, checkpoint)?;
303    }
304
305    // Static file segments start empty, so we need to initialize the block range.
306    // For genesis blocks with non-zero block numbers, we use get_writer() instead of
307    // latest_writer() and set_block_range() to ensure static files start at the correct block.
308    let static_file_provider = provider_rw.static_file_provider();
309
310    static_file_provider
311        .get_writer(genesis_block_number, StaticFileSegment::Receipts)?
312        .user_header_mut()
313        .set_block_range(genesis_block_number, genesis_block_number);
314    static_file_provider
315        .get_writer(genesis_block_number, StaticFileSegment::Transactions)?
316        .user_header_mut()
317        .set_block_range(genesis_block_number, genesis_block_number);
318
319    if genesis_storage_settings.storage_v2 {
320        static_file_provider
321            .get_writer(genesis_block_number, StaticFileSegment::TransactionSenders)?
322            .user_header_mut()
323            .set_block_range(genesis_block_number, genesis_block_number);
324    }
325
326    // `commit_unwind`` will first commit the DB and then the static file provider, which is
327    // necessary on `init_genesis`.
328    provider_rw.commit()?;
329
330    Ok(hash)
331}
332
333/// Inserts the genesis state into the database.
334pub fn insert_genesis_state<'a, 'b, Provider>(
335    provider: &Provider,
336    alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)>,
337) -> ProviderResult<()>
338where
339    Provider: StaticFileProviderFactory
340        + DBProvider<Tx: DbTxMut>
341        + HeaderProvider
342        + StateWriter
343        + ChainSpecProvider
344        + AsRef<Provider>,
345{
346    let genesis_block_number = provider.chain_spec().genesis_header().number();
347    insert_state(provider, alloc, genesis_block_number)
348}
349
350/// Inserts state at given block into database.
351pub fn insert_state<'a, 'b, Provider>(
352    provider: &Provider,
353    alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)>,
354    block: u64,
355) -> ProviderResult<()>
356where
357    Provider: StaticFileProviderFactory
358        + DBProvider<Tx: DbTxMut>
359        + HeaderProvider
360        + StateWriter
361        + AsRef<Provider>,
362{
363    let capacity = alloc.size_hint().1.unwrap_or(0);
364    let mut state_init: BundleStateInit =
365        AddressMap::with_capacity_and_hasher(capacity, Default::default());
366    let mut reverts_init: AddressMap<_> =
367        AddressMap::with_capacity_and_hasher(capacity, Default::default());
368    let mut contracts: B256Map<Bytecode> =
369        B256Map::with_capacity_and_hasher(capacity, Default::default());
370
371    for (address, account) in alloc {
372        let bytecode_hash = if let Some(code) = &account.code {
373            match Bytecode::new_raw_checked(code.clone()) {
374                Ok(bytecode) => {
375                    let hash = bytecode.hash_slow();
376                    contracts.insert(hash, bytecode);
377                    Some(hash)
378                }
379                Err(err) => {
380                    error!(%address, %err, "Failed to decode genesis bytecode.");
381                    return Err(DatabaseError::Other(err.to_string()).into());
382                }
383            }
384        } else {
385            None
386        };
387
388        // get state
389        let storage = account
390            .storage
391            .as_ref()
392            .map(|m| {
393                m.iter()
394                    .map(|(key, value)| {
395                        let value = U256::from_be_bytes(value.0);
396                        (*key, (U256::ZERO, value))
397                    })
398                    .collect::<B256Map<_>>()
399            })
400            .unwrap_or_default();
401
402        reverts_init.insert(
403            *address,
404            (Some(None), storage.keys().map(|k| StorageEntry::new(*k, U256::ZERO)).collect()),
405        );
406
407        state_init.insert(
408            *address,
409            (
410                None,
411                Some(Account {
412                    nonce: account.nonce.unwrap_or_default(),
413                    balance: account.balance,
414                    bytecode_hash,
415                }),
416                storage,
417            ),
418        );
419    }
420    let all_reverts_init: RevertsInit = HashMap::from_iter([(block, reverts_init)]);
421
422    let execution_outcome = ExecutionOutcome::new_init(
423        state_init,
424        all_reverts_init,
425        contracts,
426        Vec::default(),
427        block,
428        Vec::new(),
429    );
430
431    provider.write_state(
432        &execution_outcome,
433        OriginalValuesKnown::Yes,
434        StateWriteConfig::default(),
435    )?;
436
437    trace!(target: "reth::cli", "Inserted state");
438
439    Ok(())
440}
441
442/// Inserts hashes for the genesis state.
443pub fn insert_genesis_hashes<'a, 'b, Provider>(
444    provider: &Provider,
445    alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)> + Clone,
446) -> ProviderResult<()>
447where
448    Provider: DBProvider<Tx: DbTxMut> + HashingWriter,
449{
450    // insert and hash accounts to hashing table
451    let alloc_accounts = alloc.clone().map(|(addr, account)| (*addr, Some(Account::from(account))));
452    provider.insert_account_for_hashing(alloc_accounts)?;
453
454    trace!(target: "reth::cli", "Inserted account hashes");
455
456    let alloc_storage = alloc.filter_map(|(addr, account)| {
457        // only return Some if there is storage
458        account.storage.as_ref().map(|storage| {
459            (*addr, storage.iter().map(|(&key, &value)| StorageEntry { key, value: value.into() }))
460        })
461    });
462    provider.insert_storage_for_hashing(alloc_storage)?;
463
464    trace!(target: "reth::cli", "Inserted storage hashes");
465
466    Ok(())
467}
468
/// Inserts header for the genesis state.
///
/// Writes the genesis header to the `Headers` static file segment (if not already present
/// there) and records its hash-to-number mapping plus empty block body indices in the
/// database tables.
pub fn insert_genesis_header<Provider, Spec>(
    provider: &Provider,
    chain: &Spec,
) -> ProviderResult<()>
where
    Provider: StaticFileProviderFactory<Primitives: NodePrimitives<BlockHeader: Compact>>
        + DBProvider<Tx: DbTxMut>,
    Spec: EthChainSpec<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>,
{
    let (header, block_hash) = (chain.genesis_header(), chain.genesis_hash());
    let static_file_provider = provider.static_file_provider();

    // Get the actual genesis block number from the header
    let genesis_block_number = header.number();

    match static_file_provider.block_hash(genesis_block_number) {
        Ok(None) | Err(ProviderError::MissingStaticFileBlock(StaticFileSegment::Headers, _)) => {
            let difficulty = header.difficulty();

            // For genesis blocks with non-zero block numbers, we need to ensure they are stored
            // in the correct static file range. We use get_writer() with the genesis block number
            // to ensure the genesis block is stored in the correct static file range.
            let mut writer = static_file_provider
                .get_writer(genesis_block_number, StaticFileSegment::Headers)?;

            // For non-zero genesis blocks, we need to set block range to genesis_block_number and
            // append header without increment block
            if genesis_block_number > 0 {
                writer
                    .user_header_mut()
                    .set_block_range(genesis_block_number, genesis_block_number);
                writer.append_header_direct(header, difficulty, &block_hash)?;
            } else {
                // For zero genesis blocks, use normal append_header
                writer.append_header(header, &block_hash)?;
            }
        }
        // Header already present in the static files — nothing to write there.
        Ok(Some(_)) => {}
        Err(e) => return Err(e),
    }

    // Database side: hash -> number lookup and empty body indices for the genesis block.
    provider.tx_ref().put::<tables::HeaderNumbers>(block_hash, genesis_block_number)?;
    provider.tx_ref().put::<tables::BlockBodyIndices>(genesis_block_number, Default::default())?;

    Ok(())
}
516
/// Reads account state from a [`BufRead`] reader and initializes it at the highest block that can
/// be found on database.
///
/// It's similar to [`init_genesis`] but supports importing state too big to fit in memory, and can
/// be set to the highest block present. One practical usecase is to import OP mainnet state at
/// bedrock transition block.
///
/// The dump's first line must carry the expected state root (see [`parse_state_root`]),
/// followed by a stream of JSON-encoded accounts. Returns the hash of the block the state
/// was initialized at.
pub fn init_from_state_dump<PF>(
    mut reader: impl BufRead,
    provider_factory: &PF,
    etl_config: EtlConfig,
) -> eyre::Result<B256>
where
    PF: DatabaseProviderFactory<
        ProviderRW: StaticFileProviderFactory
                        + DBProvider<Tx: DbTxMut>
                        + BlockNumReader
                        + BlockHashReader
                        + ChainSpecProvider
                        + StageCheckpointWriter
                        + HistoryWriter
                        + HeaderProvider
                        + HashingWriter
                        + TrieWriter
                        + StateWriter
                        + StorageSettingsCache
                        + RocksDBProviderFactory
                        + NodePrimitivesProvider
                        + AsRef<PF::ProviderRW>,
    >,
{
    // A zero ETL file size would make the collector unable to buffer anything.
    if etl_config.file_size == 0 {
        return Err(eyre::eyre!("ETL file size cannot be zero"))
    }

    // Resolve the target block (latest in DB), its hash, and its header's state root.
    let (block, hash, expected_state_root) = {
        let provider_rw = provider_factory.database_provider_rw()?;
        let block = provider_rw.last_block_number()?;
        let hash = provider_rw
            .block_hash(block)?
            .ok_or_else(|| eyre::eyre!("Block hash not found for block {}", block))?;
        let header = provider_rw
            .header_by_number(block)?
            .map(|h| SealedHeader::new(h, hash))
            .ok_or_else(|| ProviderError::HeaderNotFound(block.into()))?;
        let state_root = header.state_root();

        debug!(target: "reth::cli",
            block,
            chain=%provider_rw.chain_spec().chain(),
            "Initializing state at block"
        );

        (block, hash, state_root)
    };

    // first line can be state root
    let dump_state_root = parse_state_root(&mut reader)?;
    if expected_state_root != dump_state_root {
        error!(target: "reth::cli",
            ?dump_state_root,
            ?expected_state_root,
            "State root from state dump does not match state root in current header."
        );
        return Err(InitStorageError::StateRootMismatch(GotExpected {
            got: dump_state_root,
            expected: expected_state_root,
        })
        .into())
    }

    // remaining lines are accounts
    let collector = parse_accounts(&mut reader, etl_config)?;

    // write state to db with chunked commits to avoid OOM
    dump_state(collector, provider_factory, block)?;

    info!(target: "reth::cli", "All accounts written to database, starting state root computation (may take some time)");

    // clear trie tables so state root is computed from scratch
    {
        let provider_rw = provider_factory.database_provider_rw()?;
        provider_rw.tx_ref().clear::<tables::AccountsTrie>()?;
        provider_rw.tx_ref().clear::<tables::StoragesTrie>()?;
        provider_rw.commit()?;
    }

    // compute and compare state root
    let computed_state_root = compute_state_root_chunked(provider_factory)?;
    if computed_state_root == expected_state_root {
        info!(target: "reth::cli",
            ?computed_state_root,
            "Computed state root matches state root in state dump"
        );
    } else {
        error!(target: "reth::cli",
            ?computed_state_root,
            ?expected_state_root,
            "Computed state root does not match state root in state dump"
        );

        return Err(InitStorageError::StateRootMismatch(GotExpected {
            got: computed_state_root,
            expected: expected_state_root,
        })
        .into())
    }

    // insert sync stages for stages that require state
    {
        let provider_rw = provider_factory.database_provider_rw()?;
        for stage in StageId::STATE_REQUIRED {
            provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(block))?;
        }
        provider_rw.commit()?;
    }

    Ok(hash)
}
635
636/// Parses and returns expected state root.
637fn parse_state_root(reader: &mut impl BufRead) -> eyre::Result<B256> {
638    let mut line = String::new();
639    reader.read_line(&mut line)?;
640
641    let expected_state_root = serde_json::from_str::<StateRoot>(&line)?.root;
642    trace!(target: "reth::cli",
643        root=%expected_state_root,
644        "Read state root from file"
645    );
646    Ok(expected_state_root)
647}
648
649/// Parses accounts and pushes them to a [`Collector`].
650fn parse_accounts(
651    reader: impl BufRead,
652    etl_config: EtlConfig,
653) -> Result<Collector<Address, GenesisAccount>, eyre::Error> {
654    let mut collector = Collector::new(etl_config.file_size, etl_config.dir);
655    let mut parsed_accounts = 0usize;
656
657    let stream =
658        serde_json::Deserializer::from_reader(reader).into_iter::<GenesisAccountWithAddress>();
659    for account in stream {
660        let GenesisAccountWithAddress { genesis_account, address } = account?;
661        collector.insert(address, genesis_account)?;
662
663        parsed_accounts += 1;
664        if parsed_accounts.is_multiple_of(100_000) {
665            info!(target: "reth::cli", parsed_accounts, "Parsed accounts");
666        }
667    }
668
669    Ok(collector)
670}
671
/// Takes a [`Collector`] and writes all accounts directly to database tables.
///
/// This bypasses the higher-level `insert_state`/`insert_genesis_hashes`/`insert_history`
/// functions which build intermediate structures (`BundleStateInit`, `RevertsInit`,
/// `ExecutionOutcome`) that duplicate all storage data 2-3x in memory. For accounts with
/// millions of storage entries this causes OOM.
///
/// Instead, each account is written directly to all required tables using cursor operations,
/// using `append`/`append_dup` for sorted tables where possible (MDBX fast path that skips
/// B-tree traversal). Commits happen every [`STORAGE_COMMIT_THRESHOLD`] storage units to
/// bound MDBX dirty page accumulation.
///
/// NOTE: This function is not idempotent. If the process crashes mid-import, the database
/// must be wiped before retrying.
fn dump_state<PF>(
    mut collector: Collector<Address, GenesisAccount>,
    provider_factory: &PF,
    block: u64,
) -> Result<(), eyre::Error>
where
    PF: DatabaseProviderFactory<ProviderRW: DBProvider<Tx: DbTxMut>>,
{
    let accounts_len = collector.len();
    let mut total_accounts: usize = 0;
    // Running count of "units" (1 per account + 1 per slot) since the last commit.
    let mut storage_units: usize = 0;

    // pre-allocate the history list once — every entry uses the same single-block bitmap
    let history_list = IntegerList::new([block])?;

    // track seen bytecode hashes to avoid re-hashing and re-writing duplicates
    let mut seen_bytecodes: B256Set = B256Set::default();

    let mut provider_rw = provider_factory.database_provider_rw()?;

    for entry in collector.iter()? {
        // ETL yields compact-encoded (address, account) pairs; decode both.
        let (address_raw, account_raw) = entry?;
        let (address, _) = Address::from_compact(address_raw.as_slice(), address_raw.len());
        let (account, _) = GenesisAccount::from_compact(account_raw.as_slice(), account_raw.len());

        let account_storage_len = account.storage.as_ref().map_or(0, |s| s.len());
        let account_units = 1 + account_storage_len;

        // commit before this account would push us over the threshold
        if storage_units > 0 && storage_units + account_units > STORAGE_COMMIT_THRESHOLD {
            provider_rw.commit()?;
            provider_rw = provider_factory.database_provider_rw()?;
            info!(target: "reth::cli",
                total_accounts,
                accounts_len,
                storage_units,
                "Committed chunk"
            );
            storage_units = 0;
            // Reset the dedup set each chunk so its memory stays bounded.
            // NOTE(review): a bytecode seen again in a later chunk will be re-put with the
            // same key/value — presumably harmless (overwrite), confirm MDBX put semantics.
            seen_bytecodes = B256Set::default();
        }

        write_account_to_db(
            provider_rw.tx_ref(),
            &address,
            &account,
            block,
            &history_list,
            &mut seen_bytecodes,
        )?;

        total_accounts += 1;
        storage_units += account_units;

        if total_accounts.is_multiple_of(100_000) {
            info!(target: "reth::cli", total_accounts, accounts_len, "Writing accounts...");
        }
    }

    // commit final batch
    provider_rw.commit()?;

    info!(target: "reth::cli", total_accounts, "All accounts written to database");

    Ok(())
}
752
/// Writes a single account and all its storage to every required DB table directly,
/// without building intermediary structures.
///
/// Uses `append_dup` for `DupSort` tables where insertion order matches key order (the ETL
/// collector sorts by address, so `AccountChangeSets`, `PlainStorageState`, and
/// `StorageChangeSets` receive data in sorted order within each account). For `HashedAccounts`
/// and `HashedStorages`, insertion order is unsorted (keccak scrambles address order), so we
/// use `put`/`upsert` which do a full B-tree lookup.
fn write_account_to_db<TX: DbTxMut>(
    tx: &TX,
    address: &Address,
    genesis_account: &GenesisAccount,
    block: u64,
    history_list: &IntegerList,
    seen_bytecodes: &mut B256Set,
) -> Result<(), eyre::Error> {
    // Decode bytecode (if any); write it only the first time this hash is seen.
    let bytecode_hash = if let Some(code) = &genesis_account.code {
        let bytecode = Bytecode::new_raw_checked(code.clone())
            .map_err(|e| eyre::eyre!("Invalid bytecode for {address}: {e}"))?;
        let hash = bytecode.hash_slow();
        if seen_bytecodes.insert(hash) {
            tx.put::<tables::Bytecodes>(hash, bytecode)?;
        }
        Some(hash)
    } else {
        None
    };

    let account = Account {
        nonce: genesis_account.nonce.unwrap_or_default(),
        balance: genesis_account.balance,
        bytecode_hash,
    };

    let hashed_address = keccak256(address);

    // plain state — written with put (addresses arrive in ETL-sorted order)
    tx.put::<tables::PlainAccountState>(*address, account)?;

    // hashed state — unsorted (keccak scrambles order), must use put
    tx.put::<tables::HashedAccounts>(hashed_address, account)?;

    // account changeset — DupSort keyed by block, subkey sorted by address (ETL order)
    let mut acct_cs_cursor = tx.cursor_dup_write::<tables::AccountChangeSets>()?;
    acct_cs_cursor.append_dup(block, AccountBeforeTx { address: *address, info: None })?;

    // account history — single shard covering all blocks (key capped at u64::MAX)
    tx.put::<tables::AccountsHistory>(ShardedKey::new(*address, u64::MAX), history_list.clone())?;

    // storage entries
    if let Some(storage) = &genesis_account.storage {
        let mut hashed_storage_cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
        let mut plain_storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
        let mut storage_cs_cursor = tx.cursor_dup_write::<tables::StorageChangeSets>()?;

        for (&key, &value) in storage {
            let value_u256 = U256::from_be_bytes(value.0);

            // plain storage — sorted by (address, key), use append_dup
            plain_storage_cursor.append_dup(*address, StorageEntry { key, value: value_u256 })?;

            // hashed storage — unsorted keccak order, use upsert
            let hashed_key = keccak256(key);
            hashed_storage_cursor
                .upsert(hashed_address, &StorageEntry { key: hashed_key, value: value_u256 })?;

            // storage changeset — sorted by (block, address), then by key via append_dup;
            // "before" value is zero since the slot did not exist prior to this block
            storage_cs_cursor.append_dup(
                BlockNumberAddress((block, *address)),
                StorageEntry { key, value: U256::ZERO },
            )?;

            // storage history — single shard per (address, key), capped at u64::MAX
            tx.put::<tables::StoragesHistory>(
                StorageShardedKey::new(*address, key, u64::MAX),
                history_list.clone(),
            )?;
        }
    }

    Ok(())
}
835
/// Computes the state root (from scratch) based on the accounts and storages present in the
/// database.
///
/// Selects the concrete trie table adapter for this provider via
/// `reth_trie_db::with_adapter!` and delegates the actual computation to
/// [`compute_state_root_inner`] instantiated with that adapter type.
fn compute_state_root<Provider>(
    provider: &Provider,
    prefix_sets: Option<TriePrefixSets>,
) -> Result<B256, InitStorageError>
where
    Provider: DBProvider<Tx: DbTxMut> + TrieWriter + StorageSettingsCache,
{
    reth_trie_db::with_adapter!(provider, |A| {
        compute_state_root_inner::<_, A>(provider, prefix_sets)
    })
}
849
850fn compute_state_root_inner<Provider, A>(
851    provider: &Provider,
852    prefix_sets: Option<TriePrefixSets>,
853) -> Result<B256, InitStorageError>
854where
855    Provider: DBProvider<Tx: DbTxMut> + TrieWriter + StorageSettingsCache,
856    A: reth_trie_db::TrieTableAdapter,
857{
858    trace!(target: "reth::cli", "Computing state root");
859
860    let tx = provider.tx_ref();
861    let mut intermediate_state: Option<IntermediateStateRootState> = None;
862    let mut total_flushed_updates = 0;
863
864    loop {
865        let mut state_root =
866            DbStateRoot::<_, A>::from_tx(tx).with_intermediate_state(intermediate_state);
867
868        if let Some(sets) = prefix_sets.clone() {
869            state_root = state_root.with_prefix_sets(sets);
870        }
871
872        match state_root.root_with_progress()? {
873            StateRootProgress::Progress(state, _, updates) => {
874                let updated_len = provider.write_trie_updates(updates)?;
875                total_flushed_updates += updated_len;
876
877                trace!(target: "reth::cli",
878                    last_account_key = %state.account_root_state.last_hashed_key,
879                    updated_len,
880                    total_flushed_updates,
881                    "Flushing trie updates"
882                );
883
884                intermediate_state = Some(*state);
885
886                if total_flushed_updates.is_multiple_of(SOFT_LIMIT_COUNT_FLUSHED_UPDATES) {
887                    info!(target: "reth::cli",
888                        total_flushed_updates,
889                        "Flushing trie updates"
890                    );
891                }
892            }
893            StateRootProgress::Complete(root, _, updates) => {
894                let updated_len = provider.write_trie_updates(updates)?;
895                total_flushed_updates += updated_len;
896
897                trace!(target: "reth::cli",
898                    %root,
899                    updated_len,
900                    total_flushed_updates,
901                    "State root has been computed"
902                );
903
904                return Ok(root)
905            }
906        }
907    }
908}
909
/// Computes the state root (from scratch) with periodic commits to free MDBX dirty pages.
///
/// Opens a fresh transaction each iteration to release dirty pages, preventing OOM on large
/// states where trie updates accumulate gigabytes of MDBX dirty pages.
fn compute_state_root_chunked<PF>(provider_factory: &PF) -> Result<B256, InitStorageError>
where
    PF: DatabaseProviderFactory<
        ProviderRW: DBProvider<Tx: DbTxMut> + TrieWriter + StorageSettingsCache,
    >,
{
    // This provider exists only so `with_adapter!` can select the adapter type `A`.
    let provider_rw = provider_factory.database_provider_rw().map_err(provider_db_err)?;

    reth_trie_db::with_adapter!(&provider_rw, |A| {
        // Release the probe provider before the inner loop opens its own RW transactions.
        drop(provider_rw);
        compute_state_root_chunked_inner::<PF, A>(provider_factory)
    })
}
927
928fn compute_state_root_chunked_inner<PF, A>(provider_factory: &PF) -> Result<B256, InitStorageError>
929where
930    PF: DatabaseProviderFactory<
931        ProviderRW: DBProvider<Tx: DbTxMut> + TrieWriter + StorageSettingsCache,
932    >,
933    A: reth_trie_db::TrieTableAdapter,
934{
935    trace!(target: "reth::cli", "Computing state root");
936
937    let mut intermediate_state: Option<IntermediateStateRootState> = None;
938    let mut total_flushed_updates = 0;
939
940    loop {
941        let provider_rw = provider_factory.database_provider_rw().map_err(provider_db_err)?;
942        let tx = provider_rw.tx_ref();
943
944        let state_root = DbStateRoot::<_, A>::from_tx(tx)
945            .with_intermediate_state(intermediate_state.take())
946            .with_threshold(STATE_ROOT_COMMIT_THRESHOLD);
947
948        match state_root.root_with_progress()? {
949            StateRootProgress::Progress(state, _, updates) => {
950                let updated_len = provider_rw.write_trie_updates(updates)?;
951                total_flushed_updates += updated_len;
952
953                info!(target: "reth::cli",
954                    last_account_key = %state.account_root_state.last_hashed_key,
955                    updated_len,
956                    total_flushed_updates,
957                    "Flushing trie updates (committing to free memory)"
958                );
959
960                intermediate_state = Some(*state);
961                provider_rw.commit().map_err(provider_db_err)?;
962            }
963            StateRootProgress::Complete(root, _, updates) => {
964                let updated_len = provider_rw.write_trie_updates(updates)?;
965                total_flushed_updates += updated_len;
966
967                info!(target: "reth::cli",
968                    %root,
969                    updated_len,
970                    total_flushed_updates,
971                    "State root computation complete"
972                );
973
974                provider_rw.commit().map_err(provider_db_err)?;
975                return Ok(root)
976            }
977        }
978    }
979}
980
981/// Converts a provider error into an [`InitStorageError`].
982fn provider_db_err(e: impl std::fmt::Display) -> InitStorageError {
983    InitStorageError::from(StateRootError::Database(DatabaseError::Other(e.to_string())))
984}
985
/// Type to deserialize state root from state dump file.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct StateRoot {
    /// The state root hash recorded in the dump file.
    root: B256,
}
991
/// An account as in the state dump file. This contains a [`GenesisAccount`] and the account's
/// address.
#[derive(Debug, Serialize, Deserialize)]
struct GenesisAccountWithAddress {
    /// The account's balance, nonce, code, and storage.
    // `flatten` inlines the genesis account's fields next to `address` in the serialized
    // JSON object, matching the dump file's flat per-line layout.
    #[serde(flatten)]
    genesis_account: GenesisAccount,
    /// The account's address.
    address: Address,
}
1002
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::constants::{
        HOLESKY_GENESIS_HASH, MAINNET_GENESIS_HASH, SEPOLIA_GENESIS_HASH,
    };
    use alloy_genesis::Genesis;
    use reth_chainspec::{Chain, ChainSpec, HOLESKY, MAINNET, SEPOLIA};
    use reth_db::DatabaseEnv;
    use reth_db_api::{
        cursor::DbCursorRO,
        models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
        table::{Table, TableRow},
        transaction::DbTx,
        Database,
    };
    use reth_provider::{
        test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
        ProviderFactory, RocksDBProviderFactory,
    };
    use std::{collections::BTreeMap, sync::Arc};

    /// Reads every `(key, value)` row of table `T` into a vector via a read cursor.
    fn collect_table_entries<DB, T>(
        tx: &<DB as Database>::TX,
    ) -> Result<Vec<TableRow<T>>, InitStorageError>
    where
        DB: Database,
        T: Table,
    {
        Ok(tx.cursor_read::<T>()?.walk_range(..)?.collect::<Result<Vec<_>, _>>()?)
    }

    #[test]
    fn parse_accounts_streams_jsonl_accounts() {
        // Input lines are deliberately out of address order; the ETL collector is
        // expected to return them sorted by address.
        let input = br#"{"address":"0x0000000000000000000000000000000000000002","balance":"0x2"}
{"address":"0x0000000000000000000000000000000000000001","balance":"0x1"}
"#;

        let mut collector =
            parse_accounts(&input[..], EtlConfig::new(None, 128)).expect("parse succeeds");

        // Decode each collected (address, account) pair from its Compact encoding.
        let accounts = collector
            .iter()
            .unwrap()
            .map(|entry| {
                let (address_raw, account_raw) = entry.unwrap();
                let (address, _) = Address::from_compact(address_raw.as_slice(), address_raw.len());
                let (account, _) =
                    GenesisAccount::from_compact(account_raw.as_slice(), account_raw.len());
                (address, account.balance)
            })
            .collect::<Vec<_>>();

        // Sorted ascending by address despite the reversed input order.
        assert_eq!(
            accounts,
            vec![
                (Address::with_last_byte(1), U256::from(1)),
                (Address::with_last_byte(2), U256::from(2))
            ]
        );
    }

    #[test]
    fn success_init_genesis_mainnet() {
        let genesis_hash =
            init_genesis(&create_test_provider_factory_with_chain_spec(MAINNET.clone())).unwrap();

        // actual, expected
        assert_eq!(genesis_hash, MAINNET_GENESIS_HASH);
    }

    #[test]
    fn success_init_genesis_sepolia() {
        let genesis_hash =
            init_genesis(&create_test_provider_factory_with_chain_spec(SEPOLIA.clone())).unwrap();

        // actual, expected
        assert_eq!(genesis_hash, SEPOLIA_GENESIS_HASH);
    }

    #[test]
    fn success_init_genesis_holesky() {
        let genesis_hash =
            init_genesis(&create_test_provider_factory_with_chain_spec(HOLESKY.clone())).unwrap();

        // actual, expected
        assert_eq!(genesis_hash, HOLESKY_GENESIS_HASH);
    }

    #[test]
    fn fail_init_inconsistent_db() {
        // Initialize with Sepolia first so the DB holds the Sepolia genesis.
        let factory = create_test_provider_factory_with_chain_spec(SEPOLIA.clone());
        let static_file_provider = factory.static_file_provider();
        let rocksdb_provider = factory.rocksdb_provider();
        init_genesis(&factory).unwrap();

        // Try to init db with a different genesis block
        let genesis_hash = init_genesis(
            &ProviderFactory::<MockNodeTypesWithDB>::new(
                factory.into_db(),
                MAINNET.clone(),
                static_file_provider,
                rocksdb_provider,
                reth_tasks::Runtime::test(),
            )
            .unwrap(),
        );

        // The mismatch must surface as a GenesisHashMismatch carrying both hashes.
        assert!(matches!(
            genesis_hash.unwrap_err(),
            InitStorageError::GenesisHashMismatch {
                chainspec_hash: MAINNET_GENESIS_HASH,
                storage_hash: SEPOLIA_GENESIS_HASH
            }
        ))
    }

    #[test]
    fn skip_genesis_hash_validation_accepts_mismatched_db() {
        let factory = create_test_provider_factory_with_chain_spec(SEPOLIA.clone());
        let static_file_provider = factory.static_file_provider();
        let rocksdb_provider = factory.rocksdb_provider();
        init_genesis(&factory).unwrap();

        // Same mismatch setup as `fail_init_inconsistent_db`, but with validation disabled.
        let result = init_genesis_with_settings_and_validate(
            &ProviderFactory::<MockNodeTypesWithDB>::new(
                factory.into_db(),
                MAINNET.clone(),
                static_file_provider,
                rocksdb_provider,
                reth_tasks::Runtime::test(),
            )
            .unwrap(),
            StorageSettings::base(),
            false,
        );

        let returned = result.expect("skip_genesis_validation should suppress mismatch error");
        assert_eq!(
            returned, SEPOLIA_GENESIS_HASH,
            "bypass returns the DB-resident hash, not the chainspec hash",
        );
    }

    #[test]
    fn init_genesis_history() {
        let address_with_balance = Address::with_last_byte(1);
        let address_with_storage = Address::with_last_byte(2);
        let storage_key = B256::with_last_byte(1);
        let chain_spec = Arc::new(ChainSpec {
            chain: Chain::from_id(1),
            genesis: Genesis {
                alloc: BTreeMap::from([
                    (
                        address_with_balance,
                        GenesisAccount { balance: U256::from(1), ..Default::default() },
                    ),
                    (
                        address_with_storage,
                        GenesisAccount {
                            storage: Some(BTreeMap::from([(storage_key, B256::random())])),
                            ..Default::default()
                        },
                    ),
                ]),
                ..Default::default()
            },
            hardforks: Default::default(),
            paris_block_and_final_difficulty: None,
            deposit_contract: None,
            ..Default::default()
        });

        let factory = create_test_provider_factory_with_chain_spec(chain_spec);
        init_genesis(&factory).unwrap();

        // Both accounts get an open account-history shard pointing at block 0; only the
        // account with storage gets a storage-history shard.
        let expected_accounts = vec![
            (ShardedKey::new(address_with_balance, u64::MAX), IntegerList::new([0]).unwrap()),
            (ShardedKey::new(address_with_storage, u64::MAX), IntegerList::new([0]).unwrap()),
        ];
        let expected_storages = vec![(
            StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
            IntegerList::new([0]).unwrap(),
        )];

        let collect_from_mdbx = |factory: &ProviderFactory<MockNodeTypesWithDB>| {
            let provider = factory.provider().unwrap();
            let tx = provider.tx_ref();
            (
                collect_table_entries::<DatabaseEnv, tables::AccountsHistory>(tx).unwrap(),
                collect_table_entries::<DatabaseEnv, tables::StoragesHistory>(tx).unwrap(),
            )
        };

        {
            // History tables are read from whichever backend the active storage
            // settings selected: RocksDB for storage v2, MDBX otherwise.
            let settings = factory.cached_storage_settings();
            let rocksdb = factory.rocksdb_provider();

            let collect_rocksdb = |rocksdb: &reth_provider::providers::RocksDBProvider| {
                (
                    rocksdb
                        .iter::<tables::AccountsHistory>()
                        .unwrap()
                        .collect::<Result<Vec<_>, _>>()
                        .unwrap(),
                    rocksdb
                        .iter::<tables::StoragesHistory>()
                        .unwrap()
                        .collect::<Result<Vec<_>, _>>()
                        .unwrap(),
                )
            };

            let (accounts, storages) = if settings.storage_v2 {
                collect_rocksdb(&rocksdb)
            } else {
                collect_from_mdbx(&factory)
            };
            assert_eq!(accounts, expected_accounts);
            assert_eq!(storages, expected_storages);
        }
    }

    #[test]
    fn warn_storage_settings_mismatch() {
        let factory = create_test_provider_factory_with_chain_spec(MAINNET.clone());
        init_genesis_with_settings(&factory, StorageSettings::v1()).unwrap();

        // Request different settings - should warn but succeed
        let result = init_genesis_with_settings(&factory, StorageSettings::v2());

        // Should succeed (warning is logged, not an error)
        assert!(result.is_ok());
    }

    #[test]
    fn allow_same_storage_settings() {
        let factory = create_test_provider_factory_with_chain_spec(MAINNET.clone());
        let settings = StorageSettings::v2();
        init_genesis_with_settings(&factory, settings).unwrap();

        let result = init_genesis_with_settings(&factory, settings);

        assert!(result.is_ok());
    }
}