1use crate::{
2 changesets_utils::StorageRevertsIter,
3 providers::{
4 database::{chain::ChainStorage, metrics},
5 rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6 static_file::{StaticFileWriteCtx, StaticFileWriter},
7 NodeTypesForProvider, StaticFileProvider,
8 },
9 to_range,
10 traits::{
11 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12 },
13 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15 DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16 HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17 LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18 PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19 RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20 StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21 TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25 BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29 keccak256,
30 map::{hash_map, AddressSet, B256Map, HashMap},
31 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40 database::{Database, ReaderTxnTracker},
41 models::{
42 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43 BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44 StoredBlockBodyIndices,
45 },
46 table::Table,
47 tables,
48 transaction::{DbTx, DbTxMut},
49 BlockNumberList, PlainAccountState, PlainStorageState,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54 Account, Block as _, BlockBody as _, Bytecode, FastInstant as Instant, RecoveredBlock,
55 SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58 PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63 BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64 NodePrimitivesProvider, StateProvider, StateWriteConfig, StorageChangeSetReader, StoragePath,
65 StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70 HashedPostStateSorted,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
73use revm_database::states::{
74 PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use std::{
77 cmp::Ordering,
78 collections::{BTreeMap, BTreeSet},
79 fmt::Debug,
80 ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
81 path::PathBuf,
82 sync::Arc,
83};
84use tracing::{debug, instrument, trace};
85
/// Selects the order in which a provider commits its storage backends.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
    /// Regular commit ordering. This is the default variant.
    #[default]
    Normal,
    /// Commit ordering selected for providers created through the unwind constructor.
    Unwind,
}

impl CommitOrder {
    /// Returns `true` if this is [`CommitOrder::Unwind`].
    pub const fn is_unwind(&self) -> bool {
        match self {
            Self::Unwind => true,
            Self::Normal => false,
        }
    }
}
103
/// Alias for a [`DatabaseProvider`] parameterized with the database's `TX` transaction type.
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
106
/// Newtype wrapper around a [`DatabaseProvider`] that holds the database's `TXMut` transaction.
///
/// The wrapped provider is exposed through the public tuple field.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    /// The wrapped provider.
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
115
116impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
117 type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
118
119 fn deref(&self) -> &Self::Target {
120 &self.0
121 }
122}
123
124impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
125 fn deref_mut(&mut self) -> &mut Self::Target {
126 &mut self.0
127 }
128}
129
130impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
131 for DatabaseProviderRW<DB, N>
132{
133 fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
134 &self.0
135 }
136}
137
138impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
139 pub fn commit(self) -> ProviderResult<()> {
141 self.0.commit()
142 }
143
144 pub fn into_tx(self) -> <DB as Database>::TXMut {
146 self.0.into_tx()
147 }
148
149 #[cfg(any(test, feature = "test-utils"))]
151 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
152 self.0.minimum_pruning_distance = distance;
153 self
154 }
155}
156
157impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
158 for DatabaseProvider<<DB as Database>::TXMut, N>
159{
160 fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
161 provider.0
162 }
163}
164
/// Controls how much data `save_blocks` persists for each block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SaveBlocksMode {
    /// Persist block data together with everything gated on [`Self::with_state`].
    Full,
    /// Persist block data only; writes gated on [`Self::with_state`] are skipped.
    BlocksOnly,
}

impl SaveBlocksMode {
    /// Returns `true` only for [`SaveBlocksMode::Full`].
    pub const fn with_state(self) -> bool {
        match self {
            Self::Full => true,
            Self::BlocksOnly => false,
        }
    }
}
183
/// Provider bundling a database transaction with the auxiliary storage handles and settings
/// needed to read and write chain data.
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Database transaction.
    tx: TX,
    /// Chain specification, shared via `Arc`.
    chain_spec: Arc<N::ChainSpec>,
    /// Static file provider.
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration.
    prune_modes: PruneModes,
    /// Node storage handler; its reader/writer are used for block bodies.
    storage: Arc<N::Storage>,
    /// Shared, lock-protected storage settings.
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// `RocksDB` provider handle.
    rocksdb_provider: RocksDBProvider,
    /// Changeset cache; used when computing trie reverts during unwinds.
    changeset_cache: ChangesetCache,
    /// Task runtime; its storage pool runs the parallel writes in `save_blocks`.
    runtime: reth_tasks::Runtime,
    /// Database path.
    // NOTE(review): not read anywhere in the visible part of this file - confirm its consumers.
    db_path: PathBuf,
    /// `RocksDB` write batches queued until they are committed.
    pending_rocksdb_batches: PendingRocksDBBatches,
    /// Commit ordering selected at construction time.
    commit_order: CommitOrder,
    /// Distance used when deciding whether receipts are prunable; the constructors initialize it
    /// to `MINIMUM_UNWIND_SAFE_DISTANCE`.
    minimum_pruning_distance: u64,
    /// Provider metrics.
    metrics: metrics::DatabaseProviderMetrics,
    /// Optional reader transaction tracker; `commit_unwind` uses it to wait for pre-commit
    /// readers.
    reader_txn_tracker: Option<Arc<dyn ReaderTxnTracker>>,
}
218
219impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 let mut s = f.debug_struct("DatabaseProvider");
222 s.field("tx", &self.tx)
223 .field("chain_spec", &self.chain_spec)
224 .field("static_file_provider", &self.static_file_provider)
225 .field("prune_modes", &self.prune_modes)
226 .field("storage", &self.storage)
227 .field("storage_settings", &self.storage_settings)
228 .field("rocksdb_provider", &self.rocksdb_provider)
229 .field("changeset_cache", &self.changeset_cache)
230 .field("runtime", &self.runtime)
231 .field("pending_rocksdb_batches", &"<pending batches>")
232 .field("commit_order", &self.commit_order)
233 .field("minimum_pruning_distance", &self.minimum_pruning_distance)
234 .field("reader_txn_tracker", &"<reader txn tracker>")
235 .finish()
236 }
237}
238
239impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
240 pub const fn prune_modes_ref(&self) -> &PruneModes {
242 &self.prune_modes
243 }
244
245 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
247 self.minimum_pruning_distance = distance;
248 self
249 }
250
251 pub(crate) fn with_reader_txn_tracker<T>(mut self, reader_txn_tracker: T) -> Self
253 where
254 T: ReaderTxnTracker + 'static,
255 {
256 self.reader_txn_tracker = Some(Arc::new(reader_txn_tracker));
257 self
258 }
259}
260
261impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
262 fn commit_unwind(self) -> ProviderResult<()> {
273 let storage_v2 = self.cached_storage_settings().storage_v2;
274 let reader_txn_tracker = self.reader_txn_tracker.clone();
275 self.tx.commit()?;
276
277 if storage_v2 {
278 if let Some(reader_txn_tracker) = reader_txn_tracker.as_ref() {
279 reader_txn_tracker.wait_for_pre_commit_readers();
280 }
281
282 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
283 for batch in batches {
284 self.rocksdb_provider.commit_batch(batch)?;
285 }
286 }
287
288 self.static_file_provider.commit()?;
289 Ok(())
290 }
291
292 pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
294 trace!(target: "providers::db", "Returning latest state provider");
295 Box::new(LatestStateProviderRef::new(self))
296 }
297
298 pub fn history_by_block_hash<'a>(
300 &'a self,
301 block_hash: BlockHash,
302 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
303 let mut block_number =
304 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
305 if block_number == self.best_block_number().unwrap_or_default() &&
306 block_number == self.last_block_number().unwrap_or_default()
307 {
308 return Ok(Box::new(LatestStateProviderRef::new(self)))
309 }
310
311 block_number += 1;
313
314 let account_history_prune_checkpoint =
315 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
316 let storage_history_prune_checkpoint =
317 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
318
319 let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
320
321 if let Some(prune_checkpoint_block_number) =
324 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
325 {
326 state_provider = state_provider.with_lowest_available_account_history_block_number(
327 prune_checkpoint_block_number + 1,
328 );
329 }
330 if let Some(prune_checkpoint_block_number) =
331 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
332 {
333 state_provider = state_provider.with_lowest_available_storage_history_block_number(
334 prune_checkpoint_block_number + 1,
335 );
336 }
337
338 Ok(Box::new(state_provider))
339 }
340
    /// Replaces the provider's prune modes. Only available with the `test-utils` feature.
    #[cfg(feature = "test-utils")]
    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
346}
347
impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    /// The primitives are taken from the node types parameter.
    type Primitives = N::Primitives;
}
351
352impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
353 fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
355 self.static_file_provider.clone()
356 }
357
358 fn get_static_file_writer(
359 &self,
360 block: BlockNumber,
361 segment: StaticFileSegment,
362 ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
363 self.static_file_provider.get_writer(block, segment)
364 }
365}
366
367impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
368 fn rocksdb_provider(&self) -> RocksDBProvider {
370 self.rocksdb_provider.clone()
371 }
372
373 fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
374 self.pending_rocksdb_batches.lock().push(batch);
375 }
376
377 fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
378 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
379 for batch in batches {
380 self.rocksdb_provider.commit_batch(batch)?;
381 }
382 Ok(())
383 }
384}
385
386impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
387 for DatabaseProvider<TX, N>
388{
389 type ChainSpec = N::ChainSpec;
390
391 fn chain_spec(&self) -> Arc<Self::ChainSpec> {
392 self.chain_spec.clone()
393 }
394}
395
396impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
397 #[expect(clippy::too_many_arguments)]
399 fn new_rw_inner(
400 tx: TX,
401 chain_spec: Arc<N::ChainSpec>,
402 static_file_provider: StaticFileProvider<N::Primitives>,
403 prune_modes: PruneModes,
404 storage: Arc<N::Storage>,
405 storage_settings: Arc<RwLock<StorageSettings>>,
406 rocksdb_provider: RocksDBProvider,
407 changeset_cache: ChangesetCache,
408 runtime: reth_tasks::Runtime,
409 db_path: PathBuf,
410 commit_order: CommitOrder,
411 ) -> Self {
412 Self {
413 tx,
414 chain_spec,
415 static_file_provider,
416 prune_modes,
417 storage,
418 storage_settings,
419 rocksdb_provider,
420 changeset_cache,
421 runtime,
422 db_path,
423 pending_rocksdb_batches: Default::default(),
424 commit_order,
425 minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
426 metrics: metrics::DatabaseProviderMetrics::default(),
427 reader_txn_tracker: None,
428 }
429 }
430
431 #[expect(clippy::too_many_arguments)]
433 pub fn new_rw(
434 tx: TX,
435 chain_spec: Arc<N::ChainSpec>,
436 static_file_provider: StaticFileProvider<N::Primitives>,
437 prune_modes: PruneModes,
438 storage: Arc<N::Storage>,
439 storage_settings: Arc<RwLock<StorageSettings>>,
440 rocksdb_provider: RocksDBProvider,
441 changeset_cache: ChangesetCache,
442 runtime: reth_tasks::Runtime,
443 db_path: PathBuf,
444 ) -> Self {
445 Self::new_rw_inner(
446 tx,
447 chain_spec,
448 static_file_provider,
449 prune_modes,
450 storage,
451 storage_settings,
452 rocksdb_provider,
453 changeset_cache,
454 runtime,
455 db_path,
456 CommitOrder::Normal,
457 )
458 }
459
460 #[expect(clippy::too_many_arguments)]
462 pub fn new_unwind_rw(
463 tx: TX,
464 chain_spec: Arc<N::ChainSpec>,
465 static_file_provider: StaticFileProvider<N::Primitives>,
466 prune_modes: PruneModes,
467 storage: Arc<N::Storage>,
468 storage_settings: Arc<RwLock<StorageSettings>>,
469 rocksdb_provider: RocksDBProvider,
470 changeset_cache: ChangesetCache,
471 runtime: reth_tasks::Runtime,
472 db_path: PathBuf,
473 ) -> Self {
474 Self::new_rw_inner(
475 tx,
476 chain_spec,
477 static_file_provider,
478 prune_modes,
479 storage,
480 storage_settings,
481 rocksdb_provider,
482 changeset_cache,
483 runtime,
484 db_path,
485 CommitOrder::Unwind,
486 )
487 }
488}
489
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    /// Identity conversion: returns `self`.
    fn as_ref(&self) -> &Self {
        self
    }
}
495
496impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
497 pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
501 where
502 F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
503 {
504 let rocksdb = self.rocksdb_provider();
505 let rocksdb_batch = rocksdb.batch();
506
507 let (result, raw_batch) = f(rocksdb_batch)?;
508
509 if let Some(batch) = raw_batch {
510 self.set_pending_rocksdb_batch(batch);
511 }
512 let _ = raw_batch; Ok(result)
515 }
516
517 fn static_file_write_ctx(
519 &self,
520 save_mode: SaveBlocksMode,
521 first_block: BlockNumber,
522 last_block: BlockNumber,
523 ) -> ProviderResult<StaticFileWriteCtx> {
524 let tip = self.last_block_number()?.max(last_block);
525 Ok(StaticFileWriteCtx {
526 write_senders: EitherWriterDestination::senders(self).is_static_file() &&
527 self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
528 write_receipts: save_mode.with_state() &&
529 EitherWriter::receipts_destination(self).is_static_file(),
530 write_account_changesets: save_mode.with_state() &&
531 EitherWriterDestination::account_changesets(self).is_static_file(),
532 write_storage_changesets: save_mode.with_state() &&
533 EitherWriterDestination::storage_changesets(self).is_static_file(),
534 tip,
535 receipts_prune_mode: self.prune_modes.receipts,
536 receipts_prunable: self
538 .static_file_provider
539 .get_highest_static_file_tx(StaticFileSegment::Receipts)
540 .is_none() &&
541 PruneMode::Distance(self.minimum_pruning_distance)
542 .should_prune(first_block, tip),
543 })
544 }
545
546 fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
548 RocksDBWriteCtx {
549 first_block_number: first_block,
550 prune_tx_lookup: self.prune_modes.transaction_lookup,
551 storage_settings: self.cached_storage_settings(),
552 pending_batches: self.pending_rocksdb_batches.clone(),
553 }
554 }
555
    /// Persists a batch of executed blocks.
    ///
    /// Static-file writes (and `RocksDB` writes when `storage_v2` is enabled) are spawned on the
    /// runtime's storage pool, while the database writes run on the calling thread inside the
    /// same scope. With [`SaveBlocksMode::BlocksOnly`] all writes gated on
    /// `save_mode.with_state()` (state, hashed state, trie updates, history indices) are skipped.
    ///
    /// An empty `blocks` vector is a no-op.
    ///
    /// # Errors
    ///
    /// Propagates the first error from any of the writers. A spawned task that produced no
    /// result is reported as a thread panic error.
    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlock<N::Primitives>>,
        save_mode: SaveBlocksMode,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to write empty block range");
            return Ok(())
        }

        let total_start = Instant::now();
        let block_count = blocks.len() as u64;
        // `blocks` is non-empty here, so `first`/`last` cannot fail.
        let first_number = blocks.first().unwrap().recovered_block().number();
        let last_block_number = blocks.last().unwrap().recovered_block().number();

        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");

        // The next free transaction number is one past the last key of `TransactionBlocks`,
        // or zero when the table is empty.
        let first_tx_num = self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .last()?
            .map(|(n, _)| n + 1)
            .unwrap_or_default();

        // First transaction number of every block, obtained by accumulating transaction counts.
        let tx_nums: Vec<TxNumber> = {
            let mut nums = Vec::with_capacity(blocks.len());
            let mut current = first_tx_num;
            for block in &blocks {
                nums.push(current);
                current += block.recovered_block().body().transaction_count() as u64;
            }
            nums
        };

        let mut timings =
            metrics::SaveBlocksTimings { batch_size: block_count, ..Default::default() };

        // Inputs for the writers that run on the storage pool.
        let sf_provider = &self.static_file_provider;
        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
        let rocksdb_provider = self.rocksdb_provider.clone();
        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
        let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;

        // Filled in by the spawned tasks; `None` afterwards means the task did not finish.
        let mut sf_result = None;
        let mut rocksdb_result = None;

        let runtime = &self.runtime;
        // Captured so the spawned tasks can enter the caller's tracing span.
        let span = tracing::Span::current();
        runtime.storage_pool().in_place_scope(|s| {
            // Static file writes.
            s.spawn(|_| {
                let _guard = span.enter();
                let start = Instant::now();
                sf_result = Some(
                    sf_provider
                        .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
                        .map(|()| start.elapsed()),
                );
            });

            // RocksDB writes, only when `storage_v2` is enabled.
            if rocksdb_enabled {
                s.spawn(|_| {
                    let _guard = span.enter();
                    let start = Instant::now();
                    rocksdb_result = Some(
                        rocksdb_provider
                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
                            .map(|()| start.elapsed()),
                    );
                });
            }

            // Database writes run on the current thread from here on.
            let mdbx_start = Instant::now();

            // Without `storage_v2`, and unless transaction lookup is fully pruned, collect every
            // transaction hash with its number and write them in a single batch.
            if !self.cached_storage_settings().storage_v2 &&
                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
            {
                let start = Instant::now();
                let total_tx_count: usize =
                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
                for (i, block) in blocks.iter().enumerate() {
                    let recovered_block = block.recovered_block();
                    for (tx_num, transaction) in
                        (tx_nums[i]..).zip(recovered_block.body().transactions_iter())
                    {
                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
                    }
                }

                // Sort by hash before the batch write.
                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);

                self.with_rocksdb_batch(|batch| {
                    let mut tx_hash_writer =
                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
                    Ok(((), raw_batch))
                })?;
                self.metrics.record_duration(
                    metrics::Action::InsertTransactionHashNumbers,
                    start.elapsed(),
                );
            }

            // Per-block database writes: block indices first, then (optionally) state.
            for (i, block) in blocks.iter().enumerate() {
                let recovered_block = block.recovered_block();

                let start = Instant::now();
                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
                timings.insert_block += start.elapsed();

                if save_mode.with_state() {
                    let execution_output = block.execution_outcome();

                    let start = Instant::now();
                    // Parts that the static file context already writes are skipped here.
                    self.write_state(
                        WriteStateInput::Single {
                            outcome: execution_output,
                            block: recovered_block.number(),
                        },
                        OriginalValuesKnown::No,
                        StateWriteConfig {
                            write_receipts: !sf_ctx.write_receipts,
                            write_account_changesets: !sf_ctx.write_account_changesets,
                            write_storage_changesets: !sf_ctx.write_storage_changesets,
                        },
                    )?;
                    timings.write_state += start.elapsed();
                }
            }

            // Hashed state and trie updates are merged across the batch and written once each.
            if save_mode.with_state() {
                let start = Instant::now();
                // Blocks are fed in reverse order.
                // NOTE(review): presumably `merge_batch` expects newest-first input - confirm.
                let merged_hashed_state = HashedPostStateSorted::merge_batch(
                    blocks.iter().rev().map(|b| b.trie_data().hashed_state),
                );
                if !merged_hashed_state.is_empty() {
                    self.write_hashed_state(&merged_hashed_state)?;
                }
                timings.write_hashed_state += start.elapsed();

                let start = Instant::now();
                let merged_trie =
                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
                if !merged_trie.is_empty() {
                    self.write_trie_updates_sorted(&merged_trie)?;
                }
                timings.write_trie_updates += start.elapsed();
            }

            // History indices for the whole block range.
            if save_mode.with_state() {
                let start = Instant::now();
                self.update_history_indices(first_number..=last_block_number)?;
                timings.update_history_indices = start.elapsed();
            }

            // Advance the pipeline stage checkpoints to the last written block.
            // NOTE(review): the meaning of the `false` flag is not visible here - confirm.
            let start = Instant::now();
            self.update_pipeline_stages(last_block_number, false)?;
            timings.update_pipeline_stages = start.elapsed();

            timings.mdbx = mdbx_start.elapsed();

            Ok::<_, ProviderError>(())
        })?;

        // The scope has ended, so both spawned tasks are done; a missing result is treated as a
        // panic of that task.
        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;

        if rocksdb_enabled {
            timings.rocksdb = rocksdb_result.ok_or_else(|| {
                ProviderError::Database(reth_db_api::DatabaseError::Other(
                    "RocksDB thread panicked".into(),
                ))
            })??;
        }

        timings.total = total_start.elapsed();

        self.metrics.record_save_blocks(&timings);
        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");

        Ok(())
    }
767
768 #[instrument(level = "debug", target = "providers::db", skip_all)]
772 fn insert_block_mdbx_only(
773 &self,
774 block: &RecoveredBlock<BlockTy<N>>,
775 first_tx_num: TxNumber,
776 ) -> ProviderResult<StoredBlockBodyIndices> {
777 if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
778 EitherWriterDestination::senders(self).is_database()
779 {
780 let start = Instant::now();
781 let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
782 let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
783 for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
784 cursor.append(tx_num, &sender)?;
785 }
786 self.metrics
787 .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
788 }
789
790 let block_number = block.number();
791 let tx_count = block.body().transaction_count() as u64;
792
793 let start = Instant::now();
794 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
795 self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
796
797 self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
798
799 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
800 }
801
802 fn write_block_body_indices(
805 &self,
806 block_number: BlockNumber,
807 body: &BodyTy<N>,
808 first_tx_num: TxNumber,
809 tx_count: u64,
810 ) -> ProviderResult<()> {
811 let start = Instant::now();
813 self.tx
814 .cursor_write::<tables::BlockBodyIndices>()?
815 .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
816 self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
817
818 if tx_count > 0 {
820 let start = Instant::now();
821 self.tx
822 .cursor_write::<tables::TransactionBlocks>()?
823 .append(first_tx_num + tx_count - 1, &block_number)?;
824 self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
825 }
826
827 self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
829
830 Ok(())
831 }
832
833 pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
838 let changed_accounts = self.account_changesets_range(from..)?;
839
840 self.unwind_account_hashing(changed_accounts.iter())?;
842
843 self.unwind_account_history_indices(changed_accounts.iter())?;
845
846 let changed_storages = self.storage_changesets_range(from..)?;
847
848 self.unwind_storage_hashing(changed_storages.iter().copied())?;
850
851 self.unwind_storage_history_indices(changed_storages.iter().copied())?;
853
854 let db_tip_block = self
857 .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
858 .as_ref()
859 .map(|chk| chk.block_number)
860 .ok_or_else(|| ProviderError::InsufficientChangesets {
861 requested: from,
862 available: 0..=0,
863 })?;
864
865 let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
866 self.write_trie_updates_sorted(&trie_revert)?;
867
868 Ok(())
869 }
870
871 fn remove_receipts_from(
873 &self,
874 from_tx: TxNumber,
875 last_block: BlockNumber,
876 ) -> ProviderResult<()> {
877 self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
879
880 if EitherWriter::receipts_destination(self).is_static_file() {
881 let static_file_receipt_num =
882 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
883
884 let to_delete = static_file_receipt_num
885 .map(|static_num| (static_num + 1).saturating_sub(from_tx))
886 .unwrap_or_default();
887
888 self.static_file_provider
889 .latest_writer(StaticFileSegment::Receipts)?
890 .prune_receipts(to_delete, last_block)?;
891 }
892
893 Ok(())
894 }
895
896 fn write_bytecodes(
898 &self,
899 bytecodes: impl IntoIterator<Item = (B256, Bytecode)>,
900 ) -> ProviderResult<()> {
901 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
902 for (hash, bytecode) in bytecodes {
903 bytecodes_cursor.upsert(hash, &bytecode)?;
904 }
905 Ok(())
906 }
907}
908
909impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
910 fn try_into_history_at_block(
911 self,
912 mut block_number: BlockNumber,
913 ) -> ProviderResult<StateProviderBox> {
914 let best_block = self.best_block_number().unwrap_or_default();
915
916 if block_number > best_block {
918 return Err(ProviderError::BlockNotExecuted {
919 requested: block_number,
920 executed: best_block,
921 });
922 }
923
924 if block_number == best_block {
926 return Ok(Box::new(LatestStateProvider::new(self)));
927 }
928
929 block_number += 1;
931
932 let account_history_prune_checkpoint =
933 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
934 let storage_history_prune_checkpoint =
935 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
936
937 let mut state_provider = HistoricalStateProvider::new(self, block_number);
938
939 if let Some(prune_checkpoint_block_number) =
942 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
943 {
944 state_provider = state_provider.with_lowest_available_account_history_block_number(
945 prune_checkpoint_block_number + 1,
946 );
947 }
948 if let Some(prune_checkpoint_block_number) =
949 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
950 {
951 state_provider = state_provider.with_lowest_available_storage_history_block_number(
952 prune_checkpoint_block_number + 1,
953 );
954 }
955
956 Ok(Box::new(state_provider))
957 }
958}
959
960fn unwind_history_shards<S, T, C>(
975 cursor: &mut C,
976 start_key: T::Key,
977 block_number: BlockNumber,
978 mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
979) -> ProviderResult<Vec<u64>>
980where
981 T: Table<Value = BlockNumberList>,
982 T::Key: AsRef<ShardedKey<S>>,
983 C: DbCursorRO<T> + DbCursorRW<T>,
984{
985 let mut item = cursor.seek_exact(start_key)?;
987 while let Some((sharded_key, list)) = item {
988 if !shard_belongs_to_key(&sharded_key) {
990 break
991 }
992
993 cursor.delete_current()?;
996
997 let first = list.iter().next().expect("List can't be empty");
1000
1001 if first >= block_number {
1004 item = cursor.prev()?;
1005 continue
1006 }
1007 else if block_number <= sharded_key.as_ref().highest_block_number {
1010 return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
1013 }
1014 return Ok(list.iter().collect::<Vec<_>>())
1017 }
1018
1019 Ok(Vec::new())
1021}
1022
1023impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
1024 #[expect(clippy::too_many_arguments)]
1026 pub fn new(
1027 tx: TX,
1028 chain_spec: Arc<N::ChainSpec>,
1029 static_file_provider: StaticFileProvider<N::Primitives>,
1030 prune_modes: PruneModes,
1031 storage: Arc<N::Storage>,
1032 storage_settings: Arc<RwLock<StorageSettings>>,
1033 rocksdb_provider: RocksDBProvider,
1034 changeset_cache: ChangesetCache,
1035 runtime: reth_tasks::Runtime,
1036 db_path: PathBuf,
1037 ) -> Self {
1038 Self {
1039 tx,
1040 chain_spec,
1041 static_file_provider,
1042 prune_modes,
1043 storage,
1044 storage_settings,
1045 rocksdb_provider,
1046 changeset_cache,
1047 runtime,
1048 db_path,
1049 pending_rocksdb_batches: Default::default(),
1050 commit_order: CommitOrder::Normal,
1051 minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
1052 metrics: metrics::DatabaseProviderMetrics::default(),
1053 reader_txn_tracker: None,
1054 }
1055 }
1056
1057 pub fn into_tx(self) -> TX {
1059 self.tx
1060 }
1061
1062 pub const fn tx_mut(&mut self) -> &mut TX {
1064 &mut self.tx
1065 }
1066
1067 pub const fn tx_ref(&self) -> &TX {
1069 &self.tx
1070 }
1071
1072 pub fn chain_spec(&self) -> &N::ChainSpec {
1074 &self.chain_spec
1075 }
1076}
1077
1078impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
1079 fn recovered_block<H, HF, B, BF>(
1080 &self,
1081 id: BlockHashOrNumber,
1082 _transaction_kind: TransactionVariant,
1083 header_by_number: HF,
1084 construct_block: BF,
1085 ) -> ProviderResult<Option<B>>
1086 where
1087 H: AsRef<HeaderTy<N>>,
1088 HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
1089 BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
1090 {
1091 let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
1092 let Some(header) = header_by_number(block_number)? else { return Ok(None) };
1093
1094 let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
1101
1102 let tx_range = body.tx_num_range();
1103
1104 let transactions = if tx_range.is_empty() {
1105 vec![]
1106 } else {
1107 self.transactions_by_tx_range(tx_range.clone())?
1108 };
1109
1110 let body = self
1111 .storage
1112 .reader()
1113 .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
1114 .pop()
1115 .ok_or(ProviderError::InvalidStorageOutput)?;
1116
1117 let senders = if tx_range.is_empty() {
1118 vec![]
1119 } else {
1120 let known_senders: HashMap<TxNumber, Address> =
1121 EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1122
1123 let mut senders = Vec::with_capacity(body.transactions().len());
1124 for (tx_num, tx) in tx_range.zip(body.transactions()) {
1125 match known_senders.get(&tx_num) {
1126 None => {
1127 let sender = tx.recover_signer_unchecked()?;
1128 senders.push(sender);
1129 }
1130 Some(sender) => senders.push(*sender),
1131 }
1132 }
1133 senders
1134 };
1135
1136 construct_block(header, body, senders)
1137 }
1138
1139 fn block_range<F, H, HF, R>(
1149 &self,
1150 range: RangeInclusive<BlockNumber>,
1151 headers_range: HF,
1152 mut assemble_block: F,
1153 ) -> ProviderResult<Vec<R>>
1154 where
1155 H: AsRef<HeaderTy<N>>,
1156 HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1157 F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
1158 {
1159 if range.is_empty() {
1160 return Ok(Vec::new())
1161 }
1162
1163 let len = range.end().saturating_sub(*range.start()) as usize + 1;
1164 let mut blocks = Vec::with_capacity(len);
1165
1166 let headers = headers_range(range.clone())?;
1167
1168 let present_headers = self
1174 .block_body_indices_range(range)?
1175 .into_iter()
1176 .map(|b| b.tx_num_range())
1177 .zip(headers)
1178 .collect::<Vec<_>>();
1179
1180 let mut inputs = Vec::with_capacity(present_headers.len());
1181 for (tx_range, header) in &present_headers {
1182 let transactions = if tx_range.is_empty() {
1183 Vec::new()
1184 } else {
1185 self.transactions_by_tx_range(tx_range.clone())?
1186 };
1187
1188 inputs.push((header.as_ref(), transactions));
1189 }
1190
1191 let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
1192
1193 for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
1194 blocks.push(assemble_block(header, body, tx_range)?);
1195 }
1196
1197 Ok(blocks)
1198 }
1199
1200 fn block_with_senders_range<H, HF, B, BF>(
1211 &self,
1212 range: RangeInclusive<BlockNumber>,
1213 headers_range: HF,
1214 assemble_block: BF,
1215 ) -> ProviderResult<Vec<B>>
1216 where
1217 H: AsRef<HeaderTy<N>>,
1218 HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1219 BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
1220 {
1221 self.block_range(range, headers_range, |header, body, tx_range| {
1222 let senders = if tx_range.is_empty() {
1223 Vec::new()
1224 } else {
1225 let known_senders: HashMap<TxNumber, Address> =
1226 EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1227
1228 let mut senders = Vec::with_capacity(body.transactions().len());
1229 for (tx_num, tx) in tx_range.zip(body.transactions()) {
1230 match known_senders.get(&tx_num) {
1231 None => {
1232 let sender = tx.recover_signer_unchecked()?;
1234 senders.push(sender);
1235 }
1236 Some(sender) => senders.push(*sender),
1237 }
1238 }
1239
1240 senders
1241 };
1242
1243 assemble_block(header, body, senders)
1244 })
1245 }
1246
1247 fn populate_bundle_state<A, S>(
1251 &self,
1252 account_changeset: Vec<(u64, AccountBeforeTx)>,
1253 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1254 plain_accounts_cursor: &mut A,
1255 plain_storage_cursor: &mut S,
1256 ) -> ProviderResult<(BundleStateInit, RevertsInit)>
1257 where
1258 A: DbCursorRO<PlainAccountState>,
1259 S: DbDupCursorRO<PlainStorageState>,
1260 {
1261 let mut state: BundleStateInit = HashMap::default();
1265
1266 let mut reverts: RevertsInit = HashMap::default();
1272
1273 for (block_number, account_before) in account_changeset.into_iter().rev() {
1275 let AccountBeforeTx { info: old_info, address } = account_before;
1276 match state.entry(address) {
1277 hash_map::Entry::Vacant(entry) => {
1278 let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
1279 entry.insert((old_info, new_info, HashMap::default()));
1280 }
1281 hash_map::Entry::Occupied(mut entry) => {
1282 entry.get_mut().0 = old_info;
1284 }
1285 }
1286 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
1288 }
1289
1290 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
1292 let BlockNumberAddress((block_number, address)) = block_and_address;
1293 let account_state = match state.entry(address) {
1295 hash_map::Entry::Vacant(entry) => {
1296 let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
1297 entry.insert((present_info, present_info, HashMap::default()))
1298 }
1299 hash_map::Entry::Occupied(entry) => entry.into_mut(),
1300 };
1301
1302 match account_state.2.entry(old_storage.key) {
1304 hash_map::Entry::Vacant(entry) => {
1305 let new_storage = plain_storage_cursor
1306 .seek_by_key_subkey(address, old_storage.key)?
1307 .filter(|storage| storage.key == old_storage.key)
1308 .unwrap_or_default();
1309 entry.insert((old_storage.value, new_storage.value));
1310 }
1311 hash_map::Entry::Occupied(mut entry) => {
1312 entry.get_mut().0 = old_storage.value;
1313 }
1314 };
1315
1316 reverts
1317 .entry(block_number)
1318 .or_default()
1319 .entry(address)
1320 .or_default()
1321 .1
1322 .push(old_storage);
1323 }
1324
1325 Ok((state, reverts))
1326 }
1327
1328 fn populate_bundle_state_hashed(
1333 &self,
1334 account_changeset: Vec<(u64, AccountBeforeTx)>,
1335 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1336 hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
1337 hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
1338 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1339 let mut state: BundleStateInit = HashMap::default();
1340 let mut reverts: RevertsInit = HashMap::default();
1341
1342 for (block_number, account_before) in account_changeset.into_iter().rev() {
1344 let AccountBeforeTx { info: old_info, address } = account_before;
1345 match state.entry(address) {
1346 hash_map::Entry::Vacant(entry) => {
1347 let hashed_address = keccak256(address);
1348 let new_info =
1349 hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
1350 entry.insert((old_info, new_info, HashMap::default()));
1351 }
1352 hash_map::Entry::Occupied(mut entry) => {
1353 entry.get_mut().0 = old_info;
1354 }
1355 }
1356 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
1357 }
1358
1359 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
1361 let BlockNumberAddress((block_number, address)) = block_and_address;
1362 let account_state = match state.entry(address) {
1363 hash_map::Entry::Vacant(entry) => {
1364 let hashed_address = keccak256(address);
1365 let present_info =
1366 hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
1367 entry.insert((present_info, present_info, HashMap::default()))
1368 }
1369 hash_map::Entry::Occupied(entry) => entry.into_mut(),
1370 };
1371
1372 let hashed_storage_key = keccak256(old_storage.key);
1374 match account_state.2.entry(old_storage.key) {
1375 hash_map::Entry::Vacant(entry) => {
1376 let hashed_address = keccak256(address);
1377 let new_storage = hashed_storage_cursor
1378 .seek_by_key_subkey(hashed_address, hashed_storage_key)?
1379 .filter(|storage| storage.key == hashed_storage_key)
1380 .unwrap_or_default();
1381 entry.insert((old_storage.value, new_storage.value));
1382 }
1383 hash_map::Entry::Occupied(mut entry) => {
1384 entry.get_mut().0 = old_storage.value;
1385 }
1386 };
1387
1388 reverts
1389 .entry(block_number)
1390 .or_default()
1391 .entry(address)
1392 .or_default()
1393 .1
1394 .push(old_storage);
1395 }
1396
1397 Ok((state, reverts))
1398 }
1399}
1400
1401impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
1402 fn append_history_index<P, T>(
1410 &self,
1411 index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
1412 mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
1413 ) -> ProviderResult<()>
1414 where
1415 P: Copy,
1416 T: Table<Value = BlockNumberList>,
1417 {
1418 assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
1421
1422 let mut cursor = self.tx.cursor_write::<T>()?;
1423
1424 for (partial_key, indices) in index_updates {
1425 let last_key = sharded_key_factory(partial_key, u64::MAX);
1426 let mut last_shard = cursor
1427 .seek_exact(last_key.clone())?
1428 .map(|(_, list)| list)
1429 .unwrap_or_else(BlockNumberList::empty);
1430
1431 last_shard.append(indices).map_err(ProviderError::other)?;
1432
1433 if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
1435 cursor.upsert(last_key, &last_shard)?;
1436 continue;
1437 }
1438
1439 let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
1441 let mut chunks_peekable = chunks.into_iter().peekable();
1442
1443 while let Some(chunk) = chunks_peekable.next() {
1444 let shard = BlockNumberList::new_pre_sorted(chunk);
1445 let highest_block_number = if chunks_peekable.peek().is_some() {
1446 shard.iter().next_back().expect("`chunks` does not return empty list")
1447 } else {
1448 u64::MAX
1450 };
1451
1452 cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
1453 }
1454 }
1455
1456 Ok(())
1457 }
1458}
1459
1460impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1461 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1462 if self.cached_storage_settings().use_hashed_state() {
1463 let hashed_address = keccak256(address);
1464 Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1465 } else {
1466 Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1467 }
1468 }
1469}
1470
1471impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1472 fn changed_accounts_with_range(
1473 &self,
1474 range: RangeInclusive<BlockNumber>,
1475 ) -> ProviderResult<BTreeSet<Address>> {
1476 let mut reader = EitherReader::new_account_changesets(self)?;
1477
1478 reader.changed_accounts_with_range(range)
1479 }
1480
1481 fn basic_accounts(
1482 &self,
1483 iter: impl IntoIterator<Item = Address>,
1484 ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1485 if self.cached_storage_settings().use_hashed_state() {
1486 let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
1487 Ok(iter
1488 .into_iter()
1489 .map(|address| {
1490 let hashed_address = keccak256(address);
1491 hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
1492 })
1493 .collect::<Result<Vec<_>, _>>()?)
1494 } else {
1495 let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1496 Ok(iter
1497 .into_iter()
1498 .map(|address| {
1499 plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
1500 })
1501 .collect::<Result<Vec<_>, _>>()?)
1502 }
1503 }
1504
1505 fn changed_accounts_and_blocks_with_range(
1506 &self,
1507 range: RangeInclusive<BlockNumber>,
1508 ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1509 let highest_static_block = self
1510 .static_file_provider
1511 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1512
1513 if let Some(highest) = highest_static_block &&
1514 self.cached_storage_settings().storage_v2
1515 {
1516 let start = *range.start();
1517 let static_end = (*range.end()).min(highest);
1518
1519 let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1520 if start <= static_end {
1521 for block in start..=static_end {
1522 let block_changesets = self.account_block_changeset(block)?;
1523 for changeset in block_changesets {
1524 changed_accounts_and_blocks
1525 .entry(changeset.address)
1526 .or_default()
1527 .push(block);
1528 }
1529 }
1530 }
1531
1532 Ok(changed_accounts_and_blocks)
1533 } else {
1534 let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1535
1536 let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1537 BTreeMap::new(),
1538 |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1539 let (index, account) = entry?;
1540 accounts.entry(account.address).or_default().push(index);
1541 Ok(accounts)
1542 },
1543 )?;
1544
1545 Ok(account_transitions)
1546 }
1547 }
1548}
1549
1550impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1551 fn storage_changeset(
1552 &self,
1553 block_number: BlockNumber,
1554 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1555 if self.cached_storage_settings().storage_v2 {
1556 self.static_file_provider.storage_changeset(block_number)
1557 } else {
1558 let range = block_number..=block_number;
1559 let storage_range = BlockNumberAddress::range(range);
1560 self.tx
1561 .cursor_dup_read::<tables::StorageChangeSets>()?
1562 .walk_range(storage_range)?
1563 .map(|r| {
1564 let (bna, entry) = r?;
1565 Ok((bna, entry))
1566 })
1567 .collect()
1568 }
1569 }
1570
1571 fn get_storage_before_block(
1572 &self,
1573 block_number: BlockNumber,
1574 address: Address,
1575 storage_key: B256,
1576 ) -> ProviderResult<Option<StorageEntry>> {
1577 if self.cached_storage_settings().storage_v2 {
1578 self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1579 } else {
1580 Ok(self
1581 .tx
1582 .cursor_dup_read::<tables::StorageChangeSets>()?
1583 .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1584 .filter(|entry| entry.key == storage_key))
1585 }
1586 }
1587
1588 fn storage_changesets_range(
1589 &self,
1590 range: impl RangeBounds<BlockNumber>,
1591 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1592 if self.cached_storage_settings().storage_v2 {
1593 self.static_file_provider.storage_changesets_range(range)
1594 } else {
1595 self.tx
1596 .cursor_dup_read::<tables::StorageChangeSets>()?
1597 .walk_range(BlockNumberAddressRange::from(range))?
1598 .map(|r| {
1599 let (bna, entry) = r?;
1600 Ok((bna, entry))
1601 })
1602 .collect()
1603 }
1604 }
1605}
1606
1607impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1608 fn account_block_changeset(
1609 &self,
1610 block_number: BlockNumber,
1611 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1612 if self.cached_storage_settings().storage_v2 {
1613 let static_changesets =
1614 self.static_file_provider.account_block_changeset(block_number)?;
1615 Ok(static_changesets)
1616 } else {
1617 let range = block_number..=block_number;
1618 self.tx
1619 .cursor_read::<tables::AccountChangeSets>()?
1620 .walk_range(range)?
1621 .map(|result| -> ProviderResult<_> {
1622 let (_, account_before) = result?;
1623 Ok(account_before)
1624 })
1625 .collect()
1626 }
1627 }
1628
1629 fn get_account_before_block(
1630 &self,
1631 block_number: BlockNumber,
1632 address: Address,
1633 ) -> ProviderResult<Option<AccountBeforeTx>> {
1634 if self.cached_storage_settings().storage_v2 {
1635 Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1636 } else {
1637 self.tx
1638 .cursor_dup_read::<tables::AccountChangeSets>()?
1639 .seek_by_key_subkey(block_number, address)?
1640 .filter(|acc| acc.address == address)
1641 .map(Ok)
1642 .transpose()
1643 }
1644 }
1645
1646 fn account_changesets_range(
1647 &self,
1648 range: impl core::ops::RangeBounds<BlockNumber>,
1649 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1650 if self.cached_storage_settings().storage_v2 {
1651 self.static_file_provider.account_changesets_range(range)
1652 } else {
1653 self.tx
1654 .cursor_read::<tables::AccountChangeSets>()?
1655 .walk_range(to_range(range))?
1656 .map(|r| r.map_err(Into::into))
1657 .collect()
1658 }
1659 }
1660}
1661
1662impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
1663 for DatabaseProvider<TX, N>
1664{
1665 type Header = HeaderTy<N>;
1666
1667 fn local_tip_header(
1668 &self,
1669 highest_uninterrupted_block: BlockNumber,
1670 ) -> ProviderResult<SealedHeader<Self::Header>> {
1671 let static_file_provider = self.static_file_provider();
1672
1673 let next_static_file_block_num = static_file_provider
1676 .get_highest_static_file_block(StaticFileSegment::Headers)
1677 .map(|id| id + 1)
1678 .unwrap_or_default();
1679 let next_block = highest_uninterrupted_block + 1;
1680
1681 match next_static_file_block_num.cmp(&next_block) {
1682 Ordering::Greater => {
1685 let mut static_file_producer =
1686 static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1687 static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1688 static_file_producer.commit()?
1691 }
1692 Ordering::Less => {
1693 return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1695 }
1696 Ordering::Equal => {}
1697 }
1698
1699 let local_head = static_file_provider
1700 .sealed_header(highest_uninterrupted_block)?
1701 .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1702
1703 Ok(local_head)
1704 }
1705}
1706
1707impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1708 type Header = HeaderTy<N>;
1709
1710 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1711 if let Some(num) = self.block_number(block_hash)? {
1712 Ok(self.header_by_number(num)?)
1713 } else {
1714 Ok(None)
1715 }
1716 }
1717
1718 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1719 self.static_file_provider.header_by_number(num)
1720 }
1721
1722 fn headers_range(
1723 &self,
1724 range: impl RangeBounds<BlockNumber>,
1725 ) -> ProviderResult<Vec<Self::Header>> {
1726 self.static_file_provider.headers_range(range)
1727 }
1728
1729 fn sealed_header(
1730 &self,
1731 number: BlockNumber,
1732 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1733 self.static_file_provider.sealed_header(number)
1734 }
1735
1736 fn sealed_headers_while(
1737 &self,
1738 range: impl RangeBounds<BlockNumber>,
1739 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1740 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1741 self.static_file_provider.sealed_headers_while(range, predicate)
1742 }
1743}
1744
1745impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1746 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1747 self.static_file_provider.block_hash(number)
1748 }
1749
1750 fn canonical_hashes_range(
1751 &self,
1752 start: BlockNumber,
1753 end: BlockNumber,
1754 ) -> ProviderResult<Vec<B256>> {
1755 self.static_file_provider.canonical_hashes_range(start, end)
1756 }
1757}
1758
1759impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1760 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1761 let best_number = self.best_block_number()?;
1762 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1763 Ok(ChainInfo { best_hash, best_number })
1764 }
1765
1766 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1767 Ok(self
1770 .get_stage_checkpoint(StageId::Finish)?
1771 .map(|checkpoint| checkpoint.block_number)
1772 .unwrap_or_default())
1773 }
1774
1775 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1776 self.static_file_provider.last_block_number()
1777 }
1778
1779 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1780 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1781 }
1782}
1783
1784impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1785 type Block = BlockTy<N>;
1786
1787 fn find_block_by_hash(
1788 &self,
1789 hash: B256,
1790 source: BlockSource,
1791 ) -> ProviderResult<Option<Self::Block>> {
1792 if source.is_canonical() {
1793 self.block(hash.into())
1794 } else {
1795 Ok(None)
1796 }
1797 }
1798
1799 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1807 if let Some(number) = self.convert_hash_or_number(id)? {
1808 let earliest_available = self.static_file_provider.earliest_history_height();
1809 if number < earliest_available {
1810 return Err(ProviderError::BlockExpired { requested: number, earliest_available })
1811 }
1812
1813 let Some(header) = self.header_by_number(number)? else { return Ok(None) };
1814
1815 let Some(transactions) = self.transactions_by_block(number.into())? else {
1820 return Ok(None)
1821 };
1822
1823 let body = self
1824 .storage
1825 .reader()
1826 .read_block_bodies(self, vec![(&header, transactions)])?
1827 .pop()
1828 .ok_or(ProviderError::InvalidStorageOutput)?;
1829
1830 return Ok(Some(Self::Block::new(header, body)))
1831 }
1832
1833 Ok(None)
1834 }
1835
1836 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1837 Ok(None)
1838 }
1839
1840 fn pending_block_and_receipts(
1841 &self,
1842 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1843 Ok(None)
1844 }
1845
1846 fn recovered_block(
1855 &self,
1856 id: BlockHashOrNumber,
1857 transaction_kind: TransactionVariant,
1858 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1859 self.recovered_block(
1860 id,
1861 transaction_kind,
1862 |block_number| self.header_by_number(block_number),
1863 |header, body, senders| {
1864 Self::Block::new(header, body)
1865 .try_into_recovered_unchecked(senders)
1869 .map(Some)
1870 .map_err(|_| ProviderError::SenderRecoveryError)
1871 },
1872 )
1873 }
1874
1875 fn sealed_block_with_senders(
1876 &self,
1877 id: BlockHashOrNumber,
1878 transaction_kind: TransactionVariant,
1879 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1880 self.recovered_block(
1881 id,
1882 transaction_kind,
1883 |block_number| self.sealed_header(block_number),
1884 |header, body, senders| {
1885 Self::Block::new_sealed(header, body)
1886 .try_with_senders_unchecked(senders)
1890 .map(Some)
1891 .map_err(|_| ProviderError::SenderRecoveryError)
1892 },
1893 )
1894 }
1895
1896 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1897 self.block_range(
1898 range,
1899 |range| self.headers_range(range),
1900 |header, body, _| Ok(Self::Block::new(header, body)),
1901 )
1902 }
1903
1904 fn block_with_senders_range(
1905 &self,
1906 range: RangeInclusive<BlockNumber>,
1907 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1908 self.block_with_senders_range(
1909 range,
1910 |range| self.headers_range(range),
1911 |header, body, senders| {
1912 Self::Block::new(header, body)
1913 .try_into_recovered_unchecked(senders)
1914 .map_err(|_| ProviderError::SenderRecoveryError)
1915 },
1916 )
1917 }
1918
1919 fn recovered_block_range(
1920 &self,
1921 range: RangeInclusive<BlockNumber>,
1922 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1923 self.block_with_senders_range(
1924 range,
1925 |range| self.sealed_headers_range(range),
1926 |header, body, senders| {
1927 Self::Block::new_sealed(header, body)
1928 .try_with_senders(senders)
1929 .map_err(|_| ProviderError::SenderRecoveryError)
1930 },
1931 )
1932 }
1933
1934 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1935 Ok(self
1936 .tx
1937 .cursor_read::<tables::TransactionBlocks>()?
1938 .seek(id)
1939 .map(|b| b.map(|(_, bn)| bn))?)
1940 }
1941}
1942
1943impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1944 for DatabaseProvider<TX, N>
1945{
1946 fn transaction_hashes_by_range(
1949 &self,
1950 tx_range: Range<TxNumber>,
1951 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1952 self.static_file_provider.transaction_hashes_by_range(tx_range)
1953 }
1954}
1955
1956impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1958 type Transaction = TxTy<N>;
1959
1960 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1961 self.with_rocksdb_snapshot(|rocksdb_ref| {
1962 let mut reader = EitherReader::new_transaction_hash_numbers(self, rocksdb_ref)?;
1963 reader.get_transaction_hash_number(tx_hash)
1964 })
1965 }
1966
1967 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1968 self.static_file_provider.transaction_by_id(id)
1969 }
1970
1971 fn transaction_by_id_unhashed(
1972 &self,
1973 id: TxNumber,
1974 ) -> ProviderResult<Option<Self::Transaction>> {
1975 self.static_file_provider.transaction_by_id_unhashed(id)
1976 }
1977
1978 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1979 if let Some(id) = self.transaction_id(hash)? {
1980 Ok(self.transaction_by_id_unhashed(id)?)
1981 } else {
1982 Ok(None)
1983 }
1984 }
1985
1986 fn transaction_by_hash_with_meta(
1987 &self,
1988 tx_hash: TxHash,
1989 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1990 if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1991 let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1992 let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1993 let Some(sealed_header) = self.sealed_header(block_number)?
1994 {
1995 let (header, block_hash) = sealed_header.split();
1996 if let Some(block_body) = self.block_body_indices(block_number)? {
1997 let index = transaction_id - block_body.first_tx_num();
2002
2003 let meta = TransactionMeta {
2004 tx_hash,
2005 index,
2006 block_hash,
2007 block_number,
2008 base_fee: header.base_fee_per_gas(),
2009 excess_blob_gas: header.excess_blob_gas(),
2010 timestamp: header.timestamp(),
2011 };
2012
2013 return Ok(Some((transaction, meta)))
2014 }
2015 }
2016
2017 Ok(None)
2018 }
2019
2020 fn transactions_by_block(
2021 &self,
2022 id: BlockHashOrNumber,
2023 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2024 if let Some(block_number) = self.convert_hash_or_number(id)? &&
2025 let Some(body) = self.block_body_indices(block_number)?
2026 {
2027 let tx_range = body.tx_num_range();
2028 return if tx_range.is_empty() {
2029 Ok(Some(Vec::new()))
2030 } else {
2031 self.transactions_by_tx_range(tx_range).map(Some)
2032 }
2033 }
2034 Ok(None)
2035 }
2036
2037 fn transactions_by_block_range(
2038 &self,
2039 range: impl RangeBounds<BlockNumber>,
2040 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2041 let range = to_range(range);
2042
2043 self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
2044 .into_iter()
2045 .map(|body| {
2046 let tx_num_range = body.tx_num_range();
2047 if tx_num_range.is_empty() {
2048 Ok(Vec::new())
2049 } else {
2050 self.transactions_by_tx_range(tx_num_range)
2051 }
2052 })
2053 .collect()
2054 }
2055
2056 fn transactions_by_tx_range(
2057 &self,
2058 range: impl RangeBounds<TxNumber>,
2059 ) -> ProviderResult<Vec<Self::Transaction>> {
2060 self.static_file_provider.transactions_by_tx_range(range)
2061 }
2062
2063 fn senders_by_tx_range(
2064 &self,
2065 range: impl RangeBounds<TxNumber>,
2066 ) -> ProviderResult<Vec<Address>> {
2067 if EitherWriterDestination::senders(self).is_static_file() {
2068 self.static_file_provider.senders_by_tx_range(range)
2069 } else {
2070 self.cursor_read_collect::<tables::TransactionSenders>(range)
2071 }
2072 }
2073
2074 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2075 if EitherWriterDestination::senders(self).is_static_file() {
2076 self.static_file_provider.transaction_sender(id)
2077 } else {
2078 Ok(self.tx.get::<tables::TransactionSenders>(id)?)
2079 }
2080 }
2081}
2082
2083impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
2084 type Receipt = ReceiptTy<N>;
2085
2086 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2087 self.static_file_provider.get_with_static_file_or_database(
2088 StaticFileSegment::Receipts,
2089 id,
2090 |static_file| static_file.receipt(id),
2091 || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
2092 )
2093 }
2094
2095 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2096 if let Some(id) = self.transaction_id(hash)? {
2097 self.receipt(id)
2098 } else {
2099 Ok(None)
2100 }
2101 }
2102
2103 fn receipts_by_block(
2104 &self,
2105 block: BlockHashOrNumber,
2106 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2107 if let Some(number) = self.convert_hash_or_number(block)? &&
2108 let Some(body) = self.block_body_indices(number)?
2109 {
2110 let tx_range = body.tx_num_range();
2111 return if tx_range.is_empty() {
2112 Ok(Some(Vec::new()))
2113 } else {
2114 self.receipts_by_tx_range(tx_range).map(Some)
2115 }
2116 }
2117 Ok(None)
2118 }
2119
2120 fn receipts_by_tx_range(
2121 &self,
2122 range: impl RangeBounds<TxNumber>,
2123 ) -> ProviderResult<Vec<Self::Receipt>> {
2124 self.static_file_provider.get_range_with_static_file_or_database(
2125 StaticFileSegment::Receipts,
2126 to_range(range),
2127 |static_file, range, _| static_file.receipts_by_tx_range(range),
2128 |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
2129 |_| true,
2130 )
2131 }
2132
2133 fn receipts_by_block_range(
2134 &self,
2135 block_range: RangeInclusive<BlockNumber>,
2136 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2137 if block_range.is_empty() {
2138 return Ok(Vec::new());
2139 }
2140
2141 let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
2143 let mut block_body_indices = Vec::with_capacity(range_len);
2144 for block_num in block_range {
2145 if let Some(indices) = self.block_body_indices(block_num)? {
2146 block_body_indices.push(indices);
2147 } else {
2148 block_body_indices.push(StoredBlockBodyIndices::default());
2150 }
2151 }
2152
2153 if block_body_indices.is_empty() {
2154 return Ok(Vec::new());
2155 }
2156
2157 let non_empty_blocks: Vec<_> =
2159 block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
2160
2161 if non_empty_blocks.is_empty() {
2162 return Ok(vec![Vec::new(); block_body_indices.len()]);
2164 }
2165
2166 let first_tx = non_empty_blocks[0].first_tx_num();
2168 let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
2169
2170 let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
2172 let mut receipts_iter = all_receipts.into_iter();
2173
2174 let mut result = Vec::with_capacity(block_body_indices.len());
2176 for indices in &block_body_indices {
2177 if indices.tx_count == 0 {
2178 result.push(Vec::new());
2179 } else {
2180 let block_receipts =
2181 receipts_iter.by_ref().take(indices.tx_count as usize).collect();
2182 result.push(block_receipts);
2183 }
2184 }
2185
2186 Ok(result)
2187 }
2188}
2189
2190impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
2191 for DatabaseProvider<TX, N>
2192{
2193 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2194 Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
2195 }
2196
2197 fn block_body_indices_range(
2198 &self,
2199 range: RangeInclusive<BlockNumber>,
2200 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2201 self.cursor_read_collect::<tables::BlockBodyIndices>(range)
2202 }
2203}
2204
2205impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2206 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2207 Ok(if let Some(encoded) = id.get_pre_encoded() {
2208 self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2209 } else {
2210 self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2211 })
2212 }
2213
2214 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2216 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2217 }
2218
2219 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2220 self.tx
2221 .cursor_read::<tables::StageCheckpoints>()?
2222 .walk(None)?
2223 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2224 .map_err(ProviderError::Database)
2225 }
2226}
2227
2228impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2229 fn save_stage_checkpoint(
2231 &self,
2232 id: StageId,
2233 checkpoint: StageCheckpoint,
2234 ) -> ProviderResult<()> {
2235 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2236 }
2237
2238 fn save_stage_checkpoint_progress(
2240 &self,
2241 id: StageId,
2242 checkpoint: Vec<u8>,
2243 ) -> ProviderResult<()> {
2244 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2245 }
2246
2247 #[instrument(level = "debug", target = "providers::db", skip_all)]
2248 fn update_pipeline_stages(
2249 &self,
2250 block_number: BlockNumber,
2251 drop_stage_checkpoint: bool,
2252 ) -> ProviderResult<()> {
2253 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2255 for stage_id in StageId::ALL {
2256 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2257 cursor.upsert(
2258 stage_id.to_string(),
2259 &StageCheckpoint {
2260 block_number,
2261 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2262 },
2263 )?;
2264 }
2265
2266 Ok(())
2267 }
2268}
2269
2270impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
2271 fn plain_state_storages(
2272 &self,
2273 addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
2274 ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
2275 if self.cached_storage_settings().use_hashed_state() {
2276 let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;
2277
2278 addresses_with_keys
2279 .into_iter()
2280 .map(|(address, storage)| {
2281 let hashed_address = keccak256(address);
2282 storage
2283 .into_iter()
2284 .map(|key| -> ProviderResult<_> {
2285 let hashed_key = keccak256(key);
2286 let value = hashed_storage
2287 .seek_by_key_subkey(hashed_address, hashed_key)?
2288 .filter(|v| v.key == hashed_key)
2289 .map(|v| v.value)
2290 .unwrap_or_default();
2291 Ok(StorageEntry { key, value })
2292 })
2293 .collect::<ProviderResult<Vec<_>>>()
2294 .map(|storage| (address, storage))
2295 })
2296 .collect::<ProviderResult<Vec<(_, _)>>>()
2297 } else {
2298 let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
2299
2300 addresses_with_keys
2301 .into_iter()
2302 .map(|(address, storage)| {
2303 storage
2304 .into_iter()
2305 .map(|key| -> ProviderResult<_> {
2306 Ok(plain_storage
2307 .seek_by_key_subkey(address, key)?
2308 .filter(|v| v.key == key)
2309 .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
2310 })
2311 .collect::<ProviderResult<Vec<_>>>()
2312 .map(|storage| (address, storage))
2313 })
2314 .collect::<ProviderResult<Vec<(_, _)>>>()
2315 }
2316 }
2317
2318 fn changed_storages_with_range(
2319 &self,
2320 range: RangeInclusive<BlockNumber>,
2321 ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2322 if self.cached_storage_settings().storage_v2 {
2323 self.storage_changesets_range(range)?.into_iter().try_fold(
2324 BTreeMap::new(),
2325 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2326 let (BlockNumberAddress((_, address)), storage_entry) = entry;
2327 accounts.entry(address).or_default().insert(storage_entry.key);
2328 Ok(accounts)
2329 },
2330 )
2331 } else {
2332 self.tx
2333 .cursor_read::<tables::StorageChangeSets>()?
2334 .walk_range(BlockNumberAddress::range(range))?
2335 .try_fold(
2338 BTreeMap::new(),
2339 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2340 let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2341 accounts.entry(address).or_default().insert(storage_entry.key);
2342 Ok(accounts)
2343 },
2344 )
2345 }
2346 }
2347
2348 fn changed_storages_and_blocks_with_range(
2349 &self,
2350 range: RangeInclusive<BlockNumber>,
2351 ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2352 if self.cached_storage_settings().storage_v2 {
2353 self.storage_changesets_range(range)?.into_iter().try_fold(
2354 BTreeMap::new(),
2355 |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2356 storages
2357 .entry((index.address(), storage.key))
2358 .or_default()
2359 .push(index.block_number());
2360 Ok(storages)
2361 },
2362 )
2363 } else {
2364 let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2365
2366 let storage_changeset_lists =
2367 changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2368 BTreeMap::new(),
2369 |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2370 entry|
2371 -> ProviderResult<_> {
2372 let (index, storage) = entry?;
2373 storages
2374 .entry((index.address(), storage.key))
2375 .or_default()
2376 .push(index.block_number());
2377 Ok(storages)
2378 },
2379 )?;
2380
2381 Ok(storage_changeset_lists)
2382 }
2383 }
2384}
2385
2386impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2387 for DatabaseProvider<TX, N>
2388{
2389 type Receipt = ReceiptTy<N>;
2390
    /// Persists the state produced by executing a range of blocks.
    ///
    /// Writes the reverts and the plain state changes derived from `execution_outcome` and, when
    /// `config.write_receipts` is set, the receipts of every block in the outcome, subject to the
    /// provider's receipt prune modes.
    ///
    /// # Errors
    ///
    /// Returns [`ProviderError::BlockBodyIndicesNotFound`] when fewer block body indices are
    /// stored than there are blocks in the outcome, and propagates any error from the underlying
    /// readers and writers.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_state<'a>(
        &self,
        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
        is_value_known: OriginalValuesKnown,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        let execution_outcome = execution_outcome.into();

        // Fast path: with hashed state enabled and neither receipts nor changesets requested,
        // only the bytecodes are written. This skips the `to_plain_state_and_reverts` conversion
        // below entirely.
        if self.cached_storage_settings().use_hashed_state() &&
            !config.write_receipts &&
            !config.write_account_changesets &&
            !config.write_storage_changesets
        {
            self.write_bytecodes(
                execution_outcome.state().contracts.iter().map(|(h, b)| (*h, Bytecode(b.clone()))),
            )?;
            return Ok(());
        }

        let first_block = execution_outcome.first_block();
        let (plain_state, reverts) =
            execution_outcome.state().to_plain_state_and_reverts(is_value_known);

        // Reverts are written before the new state is applied.
        self.write_state_reverts(reverts, first_block, config)?;
        self.write_state_changes(plain_state)?;

        if !config.write_receipts {
            return Ok(());
        }

        let block_count = execution_outcome.len() as u64;
        let last_block = execution_outcome.last_block();
        let block_range = first_block..=last_block;

        // The tip used for prune decisions is the higher of the stored tip and the last block of
        // this outcome.
        let tip = self.last_block_number()?.max(last_block);

        // First transaction number of every block in the range.
        let block_indices: Vec<_> = self
            .block_body_indices_range(block_range)?
            .into_iter()
            .map(|b| b.first_tx_num)
            .collect();

        // Every block of the outcome needs stored body indices; report the first missing one.
        if block_indices.len() < block_count as usize {
            // `missing_blocks` is at least 1 here, so the subtraction below cannot underflow.
            let missing_blocks = block_count - block_indices.len() as u64;
            return Err(ProviderError::BlockBodyIndicesNotFound(
                last_block.saturating_sub(missing_blocks - 1),
            ));
        }

        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;

        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;

        // Receipts may only be skipped when they are written to the database, or when no receipt
        // exists in static files yet, AND the first block is further from the tip than the
        // minimum pruning distance.
        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
            self.static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none()) &&
            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);

        // Seed the set of addresses whose logs must be retained with every filter entry that
        // applies to a block before `first_block`.
        let mut allowed_addresses: AddressSet = AddressSet::default();
        for (_, addresses) in contract_log_pruner.range(..first_block) {
            allowed_addresses.extend(addresses.iter().copied());
        }

        for (idx, (receipts, first_tx_index)) in
            execution_outcome.receipts().zip(block_indices).enumerate()
        {
            let block_number = first_block + idx as u64;

            // The writer is advanced for every block, including blocks whose receipts are
            // skipped below.
            receipts_writer.increment_block(block_number)?;

            // Skip the whole block if the receipts prune mode says it should be pruned.
            if prunable_receipts &&
                self.prune_modes
                    .receipts
                    .is_some_and(|mode| mode.should_prune(block_number, tip))
            {
                continue
            }

            // Addresses whose filter entry is keyed at this block are retained from here on.
            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
                allowed_addresses.extend(new_addresses.iter().copied());
            }

            for (idx, receipt) in receipts.iter().enumerate() {
                let receipt_idx = first_tx_index + idx as u64;
                // With an active log filter, drop receipts that have no log from a retained
                // address.
                if prunable_receipts &&
                    has_contract_log_filter &&
                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
                {
                    continue
                }

                receipts_writer.append_receipt(receipt_idx, receipt)?;
            }
        }

        Ok(())
    }
2509
2510 fn write_state_reverts(
2511 &self,
2512 reverts: PlainStateReverts,
2513 first_block: BlockNumber,
2514 config: StateWriteConfig,
2515 ) -> ProviderResult<()> {
2516 if config.write_storage_changesets {
2518 tracing::trace!("Writing storage changes");
2519 let mut storages_cursor =
2520 self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2521 for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
2522 let block_number = first_block + block_index as BlockNumber;
2523
2524 tracing::trace!(block_number, "Writing block change");
2525 storage_changes.par_sort_unstable_by_key(|a| a.address);
2527 let total_changes =
2528 storage_changes.iter().map(|change| change.storage_revert.len()).sum();
2529 let mut changeset = Vec::with_capacity(total_changes);
2530 for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
2531 let mut storage = storage_revert
2532 .into_iter()
2533 .map(|(k, v)| (B256::from(k.to_be_bytes()), v))
2534 .collect::<Vec<_>>();
2535 storage.par_sort_unstable_by_key(|a| a.0);
2537
2538 let mut wiped_storage = Vec::new();
2546 if wiped {
2547 tracing::trace!(?address, "Wiping storage");
2548 if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
2549 wiped_storage.push((entry.key, entry.value));
2550 while let Some(entry) = storages_cursor.next_dup_val()? {
2551 wiped_storage.push((entry.key, entry.value))
2552 }
2553 }
2554 }
2555
2556 tracing::trace!(?address, ?storage, "Writing storage reverts");
2557 for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
2558 changeset.push(StorageBeforeTx { address, key, value });
2559 }
2560 }
2561
2562 let mut storage_changesets_writer =
2563 EitherWriter::new_storage_changesets(self, block_number)?;
2564 storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
2565 }
2566 }
2567
2568 if !config.write_account_changesets {
2569 return Ok(());
2570 }
2571
2572 tracing::trace!(?first_block, "Writing account changes");
2574 for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
2575 let block_number = first_block + block_index as BlockNumber;
2576 let changeset = account_block_reverts
2577 .into_iter()
2578 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
2579 .collect::<Vec<_>>();
2580 let mut account_changesets_writer =
2581 EitherWriter::new_account_changesets(self, block_number)?;
2582
2583 account_changesets_writer.append_account_changeset(block_number, changeset)?;
2584 }
2585
2586 Ok(())
2587 }
2588
2589 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
2590 changes.accounts.par_sort_by_key(|a| a.0);
2593 changes.storage.par_sort_by_key(|a| a.address);
2594 changes.contracts.par_sort_by_key(|a| a.0);
2595
2596 if !self.cached_storage_settings().use_hashed_state() {
2597 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
2599 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
2600 for (address, account) in changes.accounts {
2602 if let Some(account) = account {
2603 tracing::trace!(?address, "Updating plain state account");
2604 accounts_cursor.upsert(address, &account.into())?;
2605 } else if accounts_cursor.seek_exact(address)?.is_some() {
2606 tracing::trace!(?address, "Deleting plain state account");
2607 accounts_cursor.delete_current()?;
2608 }
2609 }
2610
2611 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
2613 let mut storages_cursor =
2614 self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2615 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2616 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2618 storages_cursor.delete_current_duplicates()?;
2619 }
2620 let mut storage = storage
2622 .into_iter()
2623 .map(|(k, value)| StorageEntry { key: k.into(), value })
2624 .collect::<Vec<_>>();
2625 storage.par_sort_unstable_by_key(|a| a.key);
2627
2628 for entry in storage {
2629 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2630 if let Some(db_entry) =
2631 storages_cursor.seek_by_key_subkey(address, entry.key)? &&
2632 db_entry.key == entry.key
2633 {
2634 storages_cursor.delete_current()?;
2635 }
2636
2637 if !entry.value.is_zero() {
2638 storages_cursor.upsert(address, &entry)?;
2639 }
2640 }
2641 }
2642 }
2643
2644 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
2646 self.write_bytecodes(
2647 changes.contracts.into_iter().map(|(hash, bytecode)| (hash, Bytecode(bytecode))),
2648 )?;
2649
2650 Ok(())
2651 }
2652
2653 #[instrument(level = "debug", target = "providers::db", skip_all)]
2654 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2655 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2657 for (hashed_address, account) in hashed_state.accounts() {
2658 if let Some(account) = account {
2659 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2660 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2661 hashed_accounts_cursor.delete_current()?;
2662 }
2663 }
2664
2665 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2667 let mut hashed_storage_cursor =
2668 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2669 for (hashed_address, storage) in sorted_storages {
2670 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2671 hashed_storage_cursor.delete_current_duplicates()?;
2672 }
2673
2674 for (hashed_slot, value) in storage.storage_slots_ref() {
2675 let entry = StorageEntry { key: *hashed_slot, value: *value };
2676
2677 if let Some(db_entry) =
2678 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
2679 db_entry.key == entry.key
2680 {
2681 hashed_storage_cursor.delete_current()?;
2682 }
2683
2684 if !entry.value.is_zero() {
2685 hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2686 }
2687 }
2688 }
2689
2690 Ok(())
2691 }
2692
2693 fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
2715 let range = block + 1..=self.last_block_number()?;
2716
2717 if range.is_empty() {
2718 return Ok(());
2719 }
2720
2721 let block_bodies = self.block_body_indices_range(range.clone())?;
2723
2724 let from_transaction_num =
2726 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2727
2728 let storage_range = BlockNumberAddress::range(range.clone());
2729 let storage_changeset = if self.cached_storage_settings().storage_v2 {
2730 let changesets = self.storage_changesets_range(range.clone())?;
2731 let mut changeset_writer =
2732 self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2733 changeset_writer.prune_storage_changesets(block)?;
2734 changesets
2735 } else {
2736 self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2737 };
2738 let account_changeset = if self.cached_storage_settings().storage_v2 {
2739 let changesets = self.account_changesets_range(range)?;
2740 let mut changeset_writer =
2741 self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2742 changeset_writer.prune_account_changesets(block)?;
2743 changesets
2744 } else {
2745 self.take::<tables::AccountChangeSets>(range)?
2746 };
2747
2748 if self.cached_storage_settings().use_hashed_state() {
2749 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2750 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2751
2752 let (state, _) = self.populate_bundle_state_hashed(
2753 account_changeset,
2754 storage_changeset,
2755 &mut hashed_accounts_cursor,
2756 &mut hashed_storage_cursor,
2757 )?;
2758
2759 for (address, (old_account, new_account, storage)) in &state {
2760 if old_account != new_account {
2761 let hashed_address = keccak256(address);
2762 let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2763 if let Some(account) = old_account {
2764 hashed_accounts_cursor.upsert(hashed_address, account)?;
2765 } else if existing_entry.is_some() {
2766 hashed_accounts_cursor.delete_current()?;
2767 }
2768 }
2769
2770 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2771 let hashed_address = keccak256(address);
2772 let hashed_storage_key = keccak256(storage_key);
2773 let storage_entry =
2774 StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2775 if hashed_storage_cursor
2776 .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2777 .is_some_and(|s| s.key == hashed_storage_key)
2778 {
2779 hashed_storage_cursor.delete_current()?
2780 }
2781
2782 if !old_storage_value.is_zero() {
2783 hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2784 }
2785 }
2786 }
2787 } else {
2788 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2793 let mut plain_storage_cursor =
2794 self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2795
2796 let (state, _) = self.populate_bundle_state(
2797 account_changeset,
2798 storage_changeset,
2799 &mut plain_accounts_cursor,
2800 &mut plain_storage_cursor,
2801 )?;
2802
2803 for (address, (old_account, new_account, storage)) in &state {
2804 if old_account != new_account {
2805 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2806 if let Some(account) = old_account {
2807 plain_accounts_cursor.upsert(*address, account)?;
2808 } else if existing_entry.is_some() {
2809 plain_accounts_cursor.delete_current()?;
2810 }
2811 }
2812
2813 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2814 let storage_entry =
2815 StorageEntry { key: *storage_key, value: *old_storage_value };
2816 if plain_storage_cursor
2817 .seek_by_key_subkey(*address, *storage_key)?
2818 .is_some_and(|s| s.key == *storage_key)
2819 {
2820 plain_storage_cursor.delete_current()?
2821 }
2822
2823 if !old_storage_value.is_zero() {
2824 plain_storage_cursor.upsert(*address, &storage_entry)?;
2825 }
2826 }
2827 }
2828 }
2829
2830 self.remove_receipts_from(from_transaction_num, block)?;
2831
2832 Ok(())
2833 }
2834
2835 fn take_state_above(
2857 &self,
2858 block: BlockNumber,
2859 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2860 let range = block + 1..=self.last_block_number()?;
2861
2862 if range.is_empty() {
2863 return Ok(ExecutionOutcome::default())
2864 }
2865 let start_block_number = *range.start();
2866
2867 let block_bodies = self.block_body_indices_range(range.clone())?;
2869
2870 let from_transaction_num =
2872 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2873 let to_transaction_num =
2874 block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2875
2876 let storage_range = BlockNumberAddress::range(range.clone());
2877 let storage_changeset = if let Some(highest_block) = self
2878 .static_file_provider
2879 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2880 self.cached_storage_settings().storage_v2
2881 {
2882 let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
2883 let mut changeset_writer =
2884 self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2885 changeset_writer.prune_storage_changesets(block)?;
2886 changesets
2887 } else {
2888 self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2889 };
2890
2891 let highest_changeset_block = self
2893 .static_file_provider
2894 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2895 let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2896 self.cached_storage_settings().storage_v2
2897 {
2898 let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2900 let mut changeset_writer =
2901 self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2902 changeset_writer.prune_account_changesets(block)?;
2903
2904 changesets
2905 } else {
2906 self.take::<tables::AccountChangeSets>(range)?
2909 };
2910
2911 let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
2912 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2913 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2914
2915 let (state, reverts) = self.populate_bundle_state_hashed(
2916 account_changeset,
2917 storage_changeset,
2918 &mut hashed_accounts_cursor,
2919 &mut hashed_storage_cursor,
2920 )?;
2921
2922 for (address, (old_account, new_account, storage)) in &state {
2923 if old_account != new_account {
2924 let hashed_address = keccak256(address);
2925 let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2926 if let Some(account) = old_account {
2927 hashed_accounts_cursor.upsert(hashed_address, account)?;
2928 } else if existing_entry.is_some() {
2929 hashed_accounts_cursor.delete_current()?;
2930 }
2931 }
2932
2933 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2934 let hashed_address = keccak256(address);
2935 let hashed_storage_key = keccak256(storage_key);
2936 let storage_entry =
2937 StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2938 if hashed_storage_cursor
2939 .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2940 .is_some_and(|s| s.key == hashed_storage_key)
2941 {
2942 hashed_storage_cursor.delete_current()?
2943 }
2944
2945 if !old_storage_value.is_zero() {
2946 hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2947 }
2948 }
2949 }
2950
2951 (state, reverts)
2952 } else {
2953 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2958 let mut plain_storage_cursor =
2959 self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2960
2961 let (state, reverts) = self.populate_bundle_state(
2962 account_changeset,
2963 storage_changeset,
2964 &mut plain_accounts_cursor,
2965 &mut plain_storage_cursor,
2966 )?;
2967
2968 for (address, (old_account, new_account, storage)) in &state {
2969 if old_account != new_account {
2970 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2971 if let Some(account) = old_account {
2972 plain_accounts_cursor.upsert(*address, account)?;
2973 } else if existing_entry.is_some() {
2974 plain_accounts_cursor.delete_current()?;
2975 }
2976 }
2977
2978 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2979 let storage_entry =
2980 StorageEntry { key: *storage_key, value: *old_storage_value };
2981 if plain_storage_cursor
2982 .seek_by_key_subkey(*address, *storage_key)?
2983 .is_some_and(|s| s.key == *storage_key)
2984 {
2985 plain_storage_cursor.delete_current()?
2986 }
2987
2988 if !old_storage_value.is_zero() {
2989 plain_storage_cursor.upsert(*address, &storage_entry)?;
2990 }
2991 }
2992 }
2993
2994 (state, reverts)
2995 };
2996
2997 let mut receipts_iter = self
2999 .static_file_provider
3000 .get_range_with_static_file_or_database(
3001 StaticFileSegment::Receipts,
3002 from_transaction_num..to_transaction_num + 1,
3003 |static_file, range, _| {
3004 static_file
3005 .receipts_by_tx_range(range.clone())
3006 .map(|r| range.into_iter().zip(r).collect())
3007 },
3008 |range, _| {
3009 self.tx
3010 .cursor_read::<tables::Receipts<Self::Receipt>>()?
3011 .walk_range(range)?
3012 .map(|r| r.map_err(Into::into))
3013 .collect()
3014 },
3015 |_| true,
3016 )?
3017 .into_iter()
3018 .peekable();
3019
3020 let mut receipts = Vec::with_capacity(block_bodies.len());
3021 for block_body in block_bodies {
3023 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
3024 for num in block_body.tx_num_range() {
3025 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
3026 block_receipts.push(receipts_iter.next().unwrap().1);
3027 }
3028 }
3029 receipts.push(block_receipts);
3030 }
3031
3032 self.remove_receipts_from(from_transaction_num, block)?;
3033
3034 Ok(ExecutionOutcome::new_init(
3035 state,
3036 reverts,
3037 Vec::new(),
3038 receipts,
3039 start_block_number,
3040 Vec::new(),
3041 ))
3042 }
3043}
3044
3045impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
3046 fn write_account_trie_updates<A: TrieTableAdapter>(
3047 tx: &TX,
3048 trie_updates: &TrieUpdatesSorted,
3049 num_entries: &mut usize,
3050 ) -> ProviderResult<()>
3051 where
3052 TX: DbTxMut,
3053 {
3054 let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
3055 for (key, updated_node) in trie_updates.account_nodes_ref() {
3057 let nibbles = A::AccountKey::from(*key);
3058 match updated_node {
3059 Some(node) => {
3060 if !key.is_empty() {
3061 *num_entries += 1;
3062 account_trie_cursor.upsert(nibbles, node)?;
3063 }
3064 }
3065 None => {
3066 *num_entries += 1;
3067 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
3068 account_trie_cursor.delete_current()?;
3069 }
3070 }
3071 }
3072 }
3073 Ok(())
3074 }
3075
3076 fn write_storage_tries<A: TrieTableAdapter>(
3077 tx: &TX,
3078 storage_tries: Vec<(&B256, &StorageTrieUpdatesSorted)>,
3079 num_entries: &mut usize,
3080 ) -> ProviderResult<()>
3081 where
3082 TX: DbTxMut,
3083 {
3084 let mut cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
3085 for (hashed_address, storage_trie_updates) in storage_tries {
3086 let mut db_storage_trie_cursor: DatabaseStorageTrieCursor<_, A> =
3087 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
3088 *num_entries +=
3089 db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
3090 cursor = db_storage_trie_cursor.cursor;
3091 }
3092 Ok(())
3093 }
3094}
3095
3096impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
3097 #[instrument(level = "debug", target = "providers::db", skip_all)]
3101 fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
3102 if trie_updates.is_empty() {
3103 return Ok(0)
3104 }
3105
3106 let mut num_entries = 0;
3108
3109 reth_trie_db::with_adapter!(self, |A| {
3110 Self::write_account_trie_updates::<A>(self.tx_ref(), trie_updates, &mut num_entries)?;
3111 });
3112
3113 num_entries +=
3114 self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
3115
3116 Ok(num_entries)
3117 }
3118}
3119
3120impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3121 fn write_storage_trie_updates_sorted<'a>(
3127 &self,
3128 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3129 ) -> ProviderResult<usize> {
3130 let mut num_entries = 0;
3131 let mut storage_tries = storage_tries.collect::<Vec<_>>();
3132 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3133 reth_trie_db::with_adapter!(self, |A| {
3134 Self::write_storage_tries::<A>(self.tx_ref(), storage_tries, &mut num_entries)?;
3135 });
3136 Ok(num_entries)
3137 }
3138}
3139
3140impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
3141 fn unwind_account_hashing<'a>(
3142 &self,
3143 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3144 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3145 let hashed_accounts = changesets
3149 .into_iter()
3150 .map(|(_, e)| (keccak256(e.address), e.info))
3151 .collect::<Vec<_>>()
3152 .into_iter()
3153 .rev()
3154 .collect::<BTreeMap<_, _>>();
3155
3156 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3158 for (hashed_address, account) in &hashed_accounts {
3159 if let Some(account) = account {
3160 hashed_accounts_cursor.upsert(*hashed_address, account)?;
3161 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3162 hashed_accounts_cursor.delete_current()?;
3163 }
3164 }
3165
3166 Ok(hashed_accounts)
3167 }
3168
3169 fn unwind_account_hashing_range(
3170 &self,
3171 range: impl RangeBounds<BlockNumber>,
3172 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3173 let changesets = self.account_changesets_range(range)?;
3174 self.unwind_account_hashing(changesets.iter())
3175 }
3176
3177 fn insert_account_for_hashing(
3178 &self,
3179 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
3180 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3181 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3182 let hashed_accounts =
3183 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
3184 for (hashed_address, account) in &hashed_accounts {
3185 if let Some(account) = account {
3186 hashed_accounts_cursor.upsert(*hashed_address, account)?;
3187 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3188 hashed_accounts_cursor.delete_current()?;
3189 }
3190 }
3191 Ok(hashed_accounts)
3192 }
3193
3194 fn unwind_storage_hashing(
3195 &self,
3196 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3197 ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3198 let mut hashed_storages = changesets
3200 .into_iter()
3201 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
3202 let hashed_key = keccak256(storage_entry.key);
3203 (keccak256(address), hashed_key, storage_entry.value)
3204 })
3205 .collect::<Vec<_>>();
3206 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
3207
3208 let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
3210 B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
3211 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3212 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
3213 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
3214
3215 if hashed_storage
3216 .seek_by_key_subkey(hashed_address, key)?
3217 .is_some_and(|entry| entry.key == key)
3218 {
3219 hashed_storage.delete_current()?;
3220 }
3221
3222 if !value.is_zero() {
3223 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
3224 }
3225 }
3226 Ok(hashed_storage_keys)
3227 }
3228
3229 fn unwind_storage_hashing_range(
3230 &self,
3231 range: impl RangeBounds<BlockNumber>,
3232 ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3233 let changesets = self.storage_changesets_range(range)?;
3234 self.unwind_storage_hashing(changesets.into_iter())
3235 }
3236
3237 fn insert_storage_for_hashing(
3238 &self,
3239 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
3240 ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3241 let hashed_storages =
3243 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
3244 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
3245 map.insert(keccak256(entry.key), entry.value);
3246 map
3247 });
3248 map.insert(keccak256(address), storage);
3249 map
3250 });
3251
3252 let hashed_storage_keys = hashed_storages
3253 .iter()
3254 .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
3255 .collect();
3256
3257 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3258 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
3261 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
3262 if hashed_storage_cursor
3263 .seek_by_key_subkey(hashed_address, key)?
3264 .is_some_and(|entry| entry.key == key)
3265 {
3266 hashed_storage_cursor.delete_current()?;
3267 }
3268
3269 if !value.is_zero() {
3270 hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
3271 }
3272 Ok(())
3273 })
3274 })?;
3275
3276 Ok(hashed_storage_keys)
3277 }
3278}
3279
3280impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
3281 fn unwind_account_history_indices<'a>(
3282 &self,
3283 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3284 ) -> ProviderResult<usize> {
3285 let mut last_indices = changesets
3286 .into_iter()
3287 .map(|(index, account)| (account.address, *index))
3288 .collect::<Vec<_>>();
3289 last_indices.sort_unstable_by_key(|(a, _)| *a);
3290
3291 if self.cached_storage_settings().storage_v2 {
3292 let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
3293 self.pending_rocksdb_batches.lock().push(batch);
3294 } else {
3295 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
3297 for &(address, rem_index) in &last_indices {
3298 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
3299 &mut cursor,
3300 ShardedKey::last(address),
3301 rem_index,
3302 |sharded_key| sharded_key.key == address,
3303 )?;
3304
3305 if !partial_shard.is_empty() {
3308 cursor.insert(
3309 ShardedKey::last(address),
3310 &BlockNumberList::new_pre_sorted(partial_shard),
3311 )?;
3312 }
3313 }
3314 }
3315
3316 let changesets = last_indices.len();
3317 Ok(changesets)
3318 }
3319
3320 fn unwind_account_history_indices_range(
3321 &self,
3322 range: impl RangeBounds<BlockNumber>,
3323 ) -> ProviderResult<usize> {
3324 let changesets = self.account_changesets_range(range)?;
3325 self.unwind_account_history_indices(changesets.iter())
3326 }
3327
    /// Appends the given block numbers to the account history index.
    ///
    /// `account_transitions` pairs each address with the block numbers to record for it. The
    /// work is delegated to `append_history_index` on the `AccountsHistory` table, with
    /// [`ShardedKey::new`] used to build the shard keys.
    fn insert_account_history_index(
        &self,
        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
    ) -> ProviderResult<()> {
        self.append_history_index::<_, tables::AccountsHistory>(
            account_transitions,
            ShardedKey::new,
        )
    }
3337
3338 fn unwind_storage_history_indices(
3339 &self,
3340 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3341 ) -> ProviderResult<usize> {
3342 let mut storage_changesets = changesets
3343 .into_iter()
3344 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
3345 .collect::<Vec<_>>();
3346 storage_changesets.sort_unstable_by_key(|(address, key, _)| (*address, *key));
3347
3348 if self.cached_storage_settings().storage_v2 {
3349 let batch =
3350 self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
3351 self.pending_rocksdb_batches.lock().push(batch);
3352 } else {
3353 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
3355 for &(address, storage_key, rem_index) in &storage_changesets {
3356 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
3357 &mut cursor,
3358 StorageShardedKey::last(address, storage_key),
3359 rem_index,
3360 |storage_sharded_key| {
3361 storage_sharded_key.address == address &&
3362 storage_sharded_key.sharded_key.key == storage_key
3363 },
3364 )?;
3365
3366 if !partial_shard.is_empty() {
3369 cursor.insert(
3370 StorageShardedKey::last(address, storage_key),
3371 &BlockNumberList::new_pre_sorted(partial_shard),
3372 )?;
3373 }
3374 }
3375 }
3376
3377 let changesets = storage_changesets.len();
3378 Ok(changesets)
3379 }
3380
3381 fn unwind_storage_history_indices_range(
3382 &self,
3383 range: impl RangeBounds<BlockNumber>,
3384 ) -> ProviderResult<usize> {
3385 let changesets = self.storage_changesets_range(range)?;
3386 self.unwind_storage_history_indices(changesets.into_iter())
3387 }
3388
3389 fn insert_storage_history_index(
3390 &self,
3391 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3392 ) -> ProviderResult<()> {
3393 self.append_history_index::<_, tables::StoragesHistory>(
3394 storage_transitions,
3395 |(address, storage_key), highest_block_number| {
3396 StorageShardedKey::new(address, storage_key, highest_block_number)
3397 },
3398 )
3399 }
3400
3401 #[instrument(level = "debug", target = "providers::db", skip_all)]
3402 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3403 let storage_settings = self.cached_storage_settings();
3404 if !storage_settings.storage_v2 {
3405 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3406 self.insert_account_history_index(indices)?;
3407 }
3408
3409 if !storage_settings.storage_v2 {
3410 let indices = self.changed_storages_and_blocks_with_range(range)?;
3411 self.insert_storage_history_index(indices)?;
3412 }
3413
3414 Ok(())
3415 }
3416}
3417
3418impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
3419 for DatabaseProvider<TX, N>
3420{
3421 fn take_block_and_execution_above(
3422 &self,
3423 block: BlockNumber,
3424 ) -> ProviderResult<Chain<Self::Primitives>> {
3425 let range = block + 1..=self.last_block_number()?;
3426
3427 self.unwind_trie_state_from(block + 1)?;
3428
3429 let execution_state = self.take_state_above(block)?;
3431
3432 let blocks = self.recovered_block_range(range)?;
3433
3434 self.remove_blocks_above(block)?;
3437
3438 self.update_pipeline_stages(block, true)?;
3440
3441 Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
3442 }
3443
3444 fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
3445 self.unwind_trie_state_from(block + 1)?;
3446
3447 self.remove_state_above(block)?;
3449
3450 self.remove_blocks_above(block)?;
3453
3454 self.update_pipeline_stages(block, true)?;
3456
3457 Ok(())
3458 }
3459}
3460
3461impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
3462 for DatabaseProvider<TX, N>
3463{
3464 type Block = BlockTy<N>;
3465 type Receipt = ReceiptTy<N>;
3466
3467 fn insert_block(
3472 &self,
3473 block: &RecoveredBlock<Self::Block>,
3474 ) -> ProviderResult<StoredBlockBodyIndices> {
3475 let block_number = block.number();
3476
3477 let executed_block = ExecutedBlock::new(
3479 Arc::new(block.clone()),
3480 Arc::new(BlockExecutionOutput {
3481 result: BlockExecutionResult {
3482 receipts: Default::default(),
3483 requests: Default::default(),
3484 gas_used: 0,
3485 blob_gas_used: 0,
3486 },
3487 state: Default::default(),
3488 }),
3489 ComputedTrieData::default(),
3490 );
3491
3492 self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
3494
3495 self.block_body_indices(block_number)?
3497 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
3498 }
3499
3500 fn append_block_bodies(
3501 &self,
3502 bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
3503 ) -> ProviderResult<()> {
3504 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
3505
3506 let mut tx_writer =
3508 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
3509
3510 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
3511 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
3512
3513 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
3515
3516 for (block_number, body) in &bodies {
3517 tx_writer.increment_block(*block_number)?;
3519
3520 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
3521 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
3522
3523 let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3524
3525 block_indices_cursor.append(*block_number, &block_indices)?;
3527
3528 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
3529
3530 let Some(body) = body else { continue };
3531
3532 if !body.transactions().is_empty() {
3534 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
3535 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
3536 }
3537
3538 for transaction in body.transactions() {
3540 tx_writer.append_transaction(next_tx_num, transaction)?;
3541
3542 next_tx_num += 1;
3544 }
3545 }
3546
3547 self.storage.writer().write_block_bodies(self, bodies)?;
3548
3549 Ok(())
3550 }
3551
3552 fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
3553 let last_block_number = self.last_block_number()?;
3554 for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
3556 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
3557 }
3558
3559 let highest_static_file_block = self
3561 .static_file_provider()
3562 .get_highest_static_file_block(StaticFileSegment::Headers)
3563 .expect("todo: error handling, headers should exist");
3564
3565 debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
3571 self.static_file_provider()
3572 .get_writer(block, StaticFileSegment::Headers)?
3573 .prune_headers(highest_static_file_block.saturating_sub(block))?;
3574
3575 let unwind_tx_from = self
3577 .block_body_indices(block)?
3578 .map(|b| b.next_tx_num())
3579 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3580
3581 let unwind_tx_to = self
3583 .tx
3584 .cursor_read::<tables::BlockBodyIndices>()?
3585 .last()?
3586 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3588 .1
3589 .last_tx_num();
3590
3591 if unwind_tx_from <= unwind_tx_to {
3592 let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
3593 self.with_rocksdb_batch(|batch| {
3594 let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
3595 for (hash, _) in hashes {
3596 writer.delete_transaction_hash_number(hash)?;
3597 }
3598 Ok(((), writer.into_raw_rocksdb_batch()))
3599 })?;
3600 }
3601
3602 if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) {
3605 EitherWriter::new_senders(self, last_block_number)?
3606 .prune_senders(unwind_tx_from, block)?;
3607 }
3608
3609 self.remove_bodies_above(block)?;
3610
3611 Ok(())
3612 }
3613
3614 fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3615 self.storage.writer().remove_block_bodies_above(self, block)?;
3616
3617 let unwind_tx_from = self
3619 .block_body_indices(block)?
3620 .map(|b| b.next_tx_num())
3621 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3622
3623 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3624 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3625
3626 let static_file_tx_num =
3627 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3628
3629 let to_delete = static_file_tx_num
3630 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3631 .unwrap_or_default();
3632
3633 self.static_file_provider
3634 .latest_writer(StaticFileSegment::Transactions)?
3635 .prune_transactions(to_delete, block)?;
3636
3637 Ok(())
3638 }
3639
3640 fn append_blocks_with_state(
3649 &self,
3650 blocks: Vec<RecoveredBlock<Self::Block>>,
3651 execution_outcome: &ExecutionOutcome<Self::Receipt>,
3652 hashed_state: HashedPostStateSorted,
3653 ) -> ProviderResult<()> {
3654 if blocks.is_empty() {
3655 debug!(target: "providers::db", "Attempted to append empty block range");
3656 return Ok(())
3657 }
3658
3659 let first_number = blocks[0].number();
3662
3663 let last_block_number = blocks[blocks.len() - 1].number();
3666
3667 let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3668
3669 let (account_transitions, storage_transitions) = {
3674 let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
3675 let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
3676 for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
3677 let block_number = first_number + block_idx as u64;
3678 for (address, account_revert) in block_reverts {
3679 account_transitions.entry(*address).or_default().push(block_number);
3680 for storage_key in account_revert.storage.keys() {
3681 let key = B256::from(storage_key.to_be_bytes());
3682 storage_transitions.entry((*address, key)).or_default().push(block_number);
3683 }
3684 }
3685 }
3686 (account_transitions, storage_transitions)
3687 };
3688
3689 for block in blocks {
3691 self.insert_block(&block)?;
3692 durations_recorder.record_relative(metrics::Action::InsertBlock);
3693 }
3694
3695 self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
3696 durations_recorder.record_relative(metrics::Action::InsertState);
3697
3698 self.write_hashed_state(&hashed_state)?;
3700 durations_recorder.record_relative(metrics::Action::InsertHashes);
3701
3702 let storage_settings = self.cached_storage_settings();
3707 if storage_settings.storage_v2 {
3708 self.with_rocksdb_batch(|mut batch| {
3709 for (address, blocks) in account_transitions {
3710 batch.append_account_history_shard(address, blocks)?;
3711 }
3712 Ok(((), Some(batch.into_inner())))
3713 })?;
3714 } else {
3715 self.insert_account_history_index(account_transitions)?;
3716 }
3717 if storage_settings.storage_v2 {
3718 self.with_rocksdb_batch(|mut batch| {
3719 for ((address, key), blocks) in storage_transitions {
3720 batch.append_storage_history_shard(address, key, blocks)?;
3721 }
3722 Ok(((), Some(batch.into_inner())))
3723 })?;
3724 } else {
3725 self.insert_storage_history_index(storage_transitions)?;
3726 }
3727 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3728
3729 self.update_pipeline_stages(last_block_number, false)?;
3731 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3732
3733 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3734
3735 Ok(())
3736 }
3737}
3738
3739impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3740 fn get_prune_checkpoint(
3741 &self,
3742 segment: PruneSegment,
3743 ) -> ProviderResult<Option<PruneCheckpoint>> {
3744 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3745 }
3746
3747 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3748 Ok(PruneSegment::variants()
3749 .filter_map(|segment| {
3750 self.tx
3751 .get::<tables::PruneCheckpoints>(segment)
3752 .transpose()
3753 .map(|chk| chk.map(|chk| (segment, chk)))
3754 })
3755 .collect::<Result<_, _>>()?)
3756 }
3757}
3758
3759impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3760 fn save_prune_checkpoint(
3761 &self,
3762 segment: PruneSegment,
3763 checkpoint: PruneCheckpoint,
3764 ) -> ProviderResult<()> {
3765 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3766 }
3767}
3768
3769impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3770 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3771 let db_entries = self.tx.entries::<T>()?;
3772 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3773 Ok(entries) => entries,
3774 Err(ProviderError::UnsupportedProvider) => 0,
3775 Err(err) => return Err(err),
3776 };
3777
3778 Ok(db_entries + static_file_entries)
3779 }
3780}
3781
3782impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3783 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3784 let mut finalized_blocks = self
3785 .tx
3786 .cursor_read::<tables::ChainState>()?
3787 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3788 .take(1)
3789 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3790
3791 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3792 Ok(last_finalized_block_number)
3793 }
3794
3795 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3796 let mut finalized_blocks = self
3797 .tx
3798 .cursor_read::<tables::ChainState>()?
3799 .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3800 .take(1)
3801 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3802
3803 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3804 Ok(last_finalized_block_number)
3805 }
3806}
3807
3808impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3809 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3810 Ok(self
3811 .tx
3812 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3813 }
3814
3815 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3816 Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3817 }
3818}
3819
3820impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3821 type Tx = TX;
3822
3823 fn tx_ref(&self) -> &Self::Tx {
3824 &self.tx
3825 }
3826
3827 fn tx_mut(&mut self) -> &mut Self::Tx {
3828 &mut self.tx
3829 }
3830
3831 fn into_tx(self) -> Self::Tx {
3832 self.tx
3833 }
3834
3835 fn prune_modes_ref(&self) -> &PruneModes {
3836 self.prune_modes_ref()
3837 }
3838
3839 #[instrument(
3841 name = "DatabaseProvider::commit",
3842 level = "debug",
3843 target = "providers::db",
3844 skip_all
3845 )]
3846 fn commit(self) -> ProviderResult<()> {
3847 if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
3848 self.commit_unwind()?;
3849 } else {
3850 let mut timings = metrics::CommitTimings::default();
3852
3853 let start = Instant::now();
3854 self.static_file_provider.finalize()?;
3855 timings.sf = start.elapsed();
3856
3857 let start = Instant::now();
3858 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3859 for batch in batches {
3860 self.rocksdb_provider.commit_batch(batch)?;
3861 }
3862 timings.rocksdb = start.elapsed();
3863
3864 let start = Instant::now();
3865 self.tx.commit()?;
3866 timings.mdbx = start.elapsed();
3867
3868 self.metrics.record_commit(&timings);
3869 }
3870
3871 Ok(())
3872 }
3873}
3874
3875impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3876 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3877 self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3878 }
3879}
3880
3881impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3882 fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3883 self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3884 }
3885}
3886
3887impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3888 fn cached_storage_settings(&self) -> StorageSettings {
3889 *self.storage_settings.read()
3890 }
3891
3892 fn set_storage_settings_cache(&self, settings: StorageSettings) {
3893 *self.storage_settings.write() = settings;
3894 }
3895}
3896
3897impl<TX: Send, N: NodeTypes> StoragePath for DatabaseProvider<TX, N> {
3898 fn storage_path(&self) -> PathBuf {
3899 self.db_path.clone()
3900 }
3901}
3902
3903#[cfg(test)]
3904mod tests {
3905 use super::*;
3906 use crate::{
3907 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3908 BlockWriter,
3909 };
3910 use alloy_consensus::Header;
3911 use alloy_primitives::{
3912 map::{AddressMap, B256Map},
3913 U256,
3914 };
3915 use reth_chain_state::ExecutedBlock;
3916 use reth_db_api::models::StorageSettings;
3917 use reth_ethereum_primitives::Receipt;
3918 use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3919 use reth_primitives_traits::SealedBlock;
3920 use reth_storage_api::MetadataWriter;
3921 use reth_testing_utils::generators::{self, random_block, BlockParams};
3922 use reth_trie::{
3923 HashedPostState, KeccakKeyHasher, Nibbles, StoredNibbles, StoredNibblesSubKey,
3924 };
3925 use revm_database::BundleState;
3926 use revm_state::AccountInfo;
3927 use std::{sync::mpsc, time::Duration};
3928
/// An inverted range (`start > end`) yields no per-block receipt lists.
#[test]
fn test_receipts_by_block_range_empty_range() {
    let factory = create_test_provider_factory();
    let provider = factory.provider().unwrap();

    // `10..=9` contains no block numbers.
    let inverted_range = 10u64..=9u64;
    let receipts = provider.receipts_by_block_range(inverted_range).unwrap();

    let expected: Vec<Vec<reth_ethereum_primitives::Receipt>> = Vec::new();
    assert_eq!(receipts, expected);
}
3940
/// With v2 storage settings, committing an unwind provider blocks while a read provider that
/// was opened earlier is still alive, and completes once that reader is dropped.
#[test]
fn unwind_commit_waits_for_pre_commit_readers() {
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Open the reader before the unwind provider so it predates the commit.
    let older_reader = factory.provider().unwrap();
    let unwind_provider = factory.unwind_provider_rw().unwrap();
    unwind_provider.write_metadata("unwind-wait-test", vec![1]).unwrap();

    let (result_tx, result_rx) = mpsc::channel();
    let committer = std::thread::spawn(move || {
        let commit_result = unwind_provider.commit();
        result_tx.send(commit_result).unwrap();
    });

    // No result may arrive while the older reader is still open.
    let early_result = result_rx.recv_timeout(Duration::from_millis(50));
    assert!(
        early_result.is_err(),
        "unwind commit should wait while an older read transaction is still open"
    );

    drop(older_reader);

    // After the reader is gone the commit must finish successfully within a second.
    let commit_result = result_rx.recv_timeout(Duration::from_secs(1)).unwrap();
    commit_result.unwrap();
    committer.join().unwrap();
}
3966
/// Blocks that were never stored each yield an empty receipt list.
#[test]
fn test_receipts_by_block_range_nonexistent_blocks() {
    let factory = create_test_provider_factory();
    let provider = factory.provider().unwrap();

    // Nothing was inserted, so all three requested blocks are missing.
    let receipts = provider.receipts_by_block_range(10..=12).unwrap();

    assert_eq!(receipts, vec![vec![], vec![], vec![]]);
}
3976
/// A single-block range returns exactly that block's receipt.
#[test]
fn test_receipts_by_block_range_single_block() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    // Genesis with an empty receipt list, followed by the first test block and its state.
    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };
    let first = &data.blocks[0];

    let provider_rw = factory.provider_rw().unwrap();
    provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();
    provider_rw.insert_block(&first.0).unwrap();
    provider_rw
        .write_state(&first.1, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();
    let result = provider.receipts_by_block_range(1..=1).unwrap();

    assert_eq!(result.len(), 1);
    assert_eq!(result[0].len(), 1);
    assert_eq!(result[0][0], first.1.receipts()[0][0]);
}
4009
/// A three-block range returns one receipt per block, matching the stored outcomes in order.
#[test]
fn test_receipts_by_block_range_multiple_blocks() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };

    let provider_rw = factory.provider_rw().unwrap();
    provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();
    // Store the first three test blocks together with their execution outcomes.
    for entry in &data.blocks[..3] {
        provider_rw.insert_block(&entry.0).unwrap();
        provider_rw
            .write_state(&entry.1, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();
    let result = provider.receipts_by_block_range(1..=3).unwrap();

    assert_eq!(result.len(), 3);
    // `result[i]` belongs to block `i + 1`, i.e. to `data.blocks[i]`.
    for (block_receipts, entry) in result.iter().zip(&data.blocks) {
        assert_eq!(block_receipts.len(), 1);
        assert_eq!(block_receipts[0], entry.1.receipts()[0][0]);
    }
}
4046
/// Each of the three stored test blocks reports exactly one receipt.
#[test]
fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };

    let provider_rw = factory.provider_rw().unwrap();
    provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();

    // Store the first three test blocks together with their execution outcomes.
    for entry in &data.blocks[..3] {
        provider_rw.insert_block(&entry.0).unwrap();
        provider_rw
            .write_state(&entry.1, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();
    let result = provider.receipts_by_block_range(1..=3).unwrap();

    assert_eq!(result.len(), 3);
    for block_receipts in result.iter() {
        assert_eq!(block_receipts.len(), 1);
    }
}
4084
/// A range that extends past the stored tip returns receipts for the stored blocks and empty
/// lists for the missing ones.
#[test]
fn test_receipts_by_block_range_partial_range() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };

    let provider_rw = factory.provider_rw().unwrap();
    provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();
    // Blocks 1 through 3 are stored; 4 and 5 are not.
    for entry in &data.blocks[..3] {
        provider_rw.insert_block(&entry.0).unwrap();
        provider_rw
            .write_state(&entry.1, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();

    let result = provider.receipts_by_block_range(2..=5).unwrap();
    assert_eq!(result.len(), 4);

    // Blocks 2 and 3 exist and carry one receipt each.
    assert_eq!(result[0].len(), 1);
    assert_eq!(result[1].len(), 1);
    // Blocks 4 and 5 were never stored.
    assert_eq!(result[2].len(), 0);
    assert_eq!(result[3].len(), 0);

    assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
    assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
}
4126
/// Blocks without transactions yield empty receipt lists.
#[test]
fn test_receipts_by_block_range_all_empty_blocks() {
    let factory = create_test_provider_factory();
    let mut rng = generators::rng();

    // Random blocks numbered 0, 1 and 2, each without transactions.
    let empty_blocks: Vec<_> = (0..3)
        .map(|number| {
            random_block(&mut rng, number, BlockParams { tx_count: Some(0), ..Default::default() })
        })
        .collect();

    let provider_rw = factory.provider_rw().unwrap();
    for block in empty_blocks {
        provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();
    let result = provider.receipts_by_block_range(1..=3).unwrap();

    assert_eq!(result.len(), 3);
    for block_receipts in result {
        assert_eq!(block_receipts.len(), 0);
    }
}
4154
/// The range query returns the same data as querying each block individually.
#[test]
fn test_receipts_by_block_range_consistency_with_individual_calls() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };

    let provider_rw = factory.provider_rw().unwrap();
    provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();
    for entry in &data.blocks[..3] {
        provider_rw.insert_block(&entry.0).unwrap();
        provider_rw
            .write_state(&entry.1, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();

    let range_result = provider.receipts_by_block_range(1..=3).unwrap();

    // One lookup per block; a missing block counts as an empty receipt list.
    let individual_results: Vec<_> = (1u64..=3)
        .map(|block_num| provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default())
        .collect();

    assert_eq!(range_result, individual_results);
}
4196
/// `write_trie_updates_sorted` applies account and storage trie updates: it overwrites
/// existing nodes, inserts new ones, removes nodes marked `None` and wipes storage tries
/// flagged as deleted.
#[test]
fn test_write_trie_updates_sorted() {
    use reth_trie::{
        updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
        BranchNodeCompact, StorageTrieEntry,
    };

    // Branch node with the given state mask, empty tree/hash masks, no hashes, no root hash.
    let branch = |state_mask: u16| {
        BranchNodeCompact::new(
            reth_trie::TrieMask::new(state_mask),
            reth_trie::TrieMask::new(0),
            reth_trie::TrieMask::new(0),
            vec![],
            None,
        )
    };

    let factory = create_test_provider_factory();
    let provider_rw = factory.provider_rw().unwrap();

    // Pre-populate the account trie with one node to delete and one node to update.
    {
        let tx = provider_rw.tx_ref();
        let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();

        let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
        cursor.upsert(to_delete, &branch(0b1010_1010_1010_1010)).unwrap();

        let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
        cursor.upsert(to_update, &branch(0b0101_0101_0101_0101)).unwrap();
    }

    // Pre-populate the storage tries: one node for address1, two nodes for address2.
    let storage_address1 = B256::from([1u8; 32]);
    let storage_address2 = B256::from([2u8; 32]);
    {
        let tx = provider_rw.tx_ref();
        let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();

        let seed_entries = [
            (storage_address1, [0x2, 0x0], 0b0011_0011_0011_0011),
            (storage_address2, [0xa, 0xb], 0b1100_1100_1100_1100),
            (storage_address2, [0xc, 0xd], 0b0011_1100_0011_1100),
        ];
        for (hashed_address, path, state_mask) in seed_entries {
            let entry = StorageTrieEntry {
                nibbles: StoredNibblesSubKey(Nibbles::from_nibbles(path)),
                node: branch(state_mask),
            };
            storage_cursor.upsert(hashed_address, &entry).unwrap();
        }
    }

    // Account updates: overwrite [1,2], delete [3,4], insert [5,6].
    let account_nodes = vec![
        (Nibbles::from_nibbles([0x1, 0x2]), Some(branch(0b1111_1111_1111_1111))),
        (Nibbles::from_nibbles([0x3, 0x4]), None),
        (Nibbles::from_nibbles([0x5, 0x6]), Some(branch(0b1111_1111_1111_1111))),
    ];

    // Storage updates for address1: insert [1,0], delete [2,0].
    let storage_trie1 = StorageTrieUpdatesSorted {
        is_deleted: false,
        storage_nodes: vec![
            (Nibbles::from_nibbles([0x1, 0x0]), Some(branch(0b1111_0000_0000_0000))),
            (Nibbles::from_nibbles([0x2, 0x0]), None),
        ],
    };

    // Storage updates for address2: wipe the whole trie.
    let storage_trie2 = StorageTrieUpdatesSorted { is_deleted: true, storage_nodes: vec![] };

    let mut storage_tries = B256Map::default();
    storage_tries.insert(storage_address1, storage_trie1);
    storage_tries.insert(storage_address2, storage_trie2);

    let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

    let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();

    assert_eq!(num_entries, 5);

    // Verify the account trie.
    let tx = provider_rw.tx_ref();
    let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();

    let updated_path = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
    let updated_entry = cursor.seek_exact(updated_path).unwrap();
    assert!(updated_entry.is_some(), "Updated account node should exist");
    let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
    assert_eq!(
        updated_entry.unwrap().1.state_mask,
        expected_mask,
        "Account node should have updated state_mask"
    );

    let deleted_path = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
    let deleted_entry = cursor.seek_exact(deleted_path).unwrap();
    assert!(deleted_entry.is_none(), "Deleted account node should not exist");

    let inserted_path = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
    let inserted_entry = cursor.seek_exact(inserted_path).unwrap();
    assert!(inserted_entry.is_some(), "New account node should exist");

    // Verify the storage tries.
    let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();

    let storage_entries1: Vec<_> = storage_cursor
        .walk_dup(Some(storage_address1), None)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(
        storage_entries1.len(),
        1,
        "Storage address1 should have 1 entry after deletion"
    );
    assert_eq!(
        storage_entries1[0].1.nibbles.0,
        Nibbles::from_nibbles([0x1, 0x0]),
        "Remaining entry should be [0x1, 0x0]"
    );

    let storage_entries2: Vec<_> = storage_cursor
        .walk_dup(Some(storage_address2), None)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");

    provider_rw.commit().unwrap();
}
4416
4417 #[test]
4418 fn test_prunable_receipts_logic() {
4419 let insert_blocks =
4420 |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
4421 let mut rng = generators::rng();
4422 for block_num in 0..=tip_block {
4423 let block = random_block(
4424 &mut rng,
4425 block_num,
4426 BlockParams { tx_count: Some(tx_count), ..Default::default() },
4427 );
4428 provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4429 }
4430 };
4431
4432 let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
4433 let outcome = ExecutionOutcome {
4434 first_block: block,
4435 receipts: vec![vec![Receipt {
4436 tx_type: Default::default(),
4437 success: true,
4438 cumulative_gas_used: block, logs: vec![],
4440 }]],
4441 ..Default::default()
4442 };
4443 provider_rw
4444 .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
4445 .unwrap();
4446 provider_rw.commit().unwrap();
4447 };
4448
4449 {
4451 let factory = create_test_provider_factory();
4452 let storage_settings = StorageSettings::v1();
4453 factory.set_storage_settings_cache(storage_settings);
4454 let factory = factory.with_prune_modes(PruneModes {
4455 receipts: Some(PruneMode::Before(100)),
4456 ..Default::default()
4457 });
4458
4459 let tip_block = 200u64;
4460 let first_block = 1u64;
4461
4462 let provider_rw = factory.provider_rw().unwrap();
4464 insert_blocks(&provider_rw, tip_block, 1);
4465 provider_rw.commit().unwrap();
4466
4467 write_receipts(
4468 factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4469 first_block,
4470 );
4471 write_receipts(
4472 factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4473 tip_block - 1,
4474 );
4475
4476 let provider = factory.provider().unwrap();
4477
4478 for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
4479 assert!(provider
4480 .receipts_by_block(block.into())
4481 .unwrap()
4482 .is_some_and(|r| r.len() == num_receipts));
4483 }
4484 }
4485
4486 {
4488 let factory = create_test_provider_factory();
4489 let storage_settings = StorageSettings::v2();
4490 factory.set_storage_settings_cache(storage_settings);
4491 let factory = factory.with_prune_modes(PruneModes {
4492 receipts: Some(PruneMode::Before(2)),
4493 ..Default::default()
4494 });
4495
4496 let tip_block = 200u64;
4497
4498 let provider_rw = factory.provider_rw().unwrap();
4500 insert_blocks(&provider_rw, tip_block, 1);
4501 provider_rw.commit().unwrap();
4502
4503 write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
4505 write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);
4506
4507 assert!(factory
4508 .static_file_provider()
4509 .get_highest_static_file_tx(StaticFileSegment::Receipts)
4510 .is_none(),);
4511 assert!(factory
4512 .static_file_provider()
4513 .get_highest_static_file_block(StaticFileSegment::Receipts)
4514 .is_some_and(|b| b == 1),);
4515
4516 write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
4519 assert!(factory
4520 .static_file_provider()
4521 .get_highest_static_file_tx(StaticFileSegment::Receipts)
4522 .is_some_and(|num| num == 2),);
4523
4524 let factory = factory.with_prune_modes(PruneModes {
4528 receipts: Some(PruneMode::Before(100)),
4529 ..Default::default()
4530 });
4531 let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
4532 assert!(PruneMode::Distance(1).should_prune(3, tip_block));
4533 write_receipts(provider_rw, 3);
4534
4535 let provider = factory.provider().unwrap();
4540 assert!(EitherWriter::receipts_destination(&provider).is_static_file());
4541 for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
4542 assert!(provider
4543 .receipts_by_block(num.into())
4544 .unwrap()
4545 .is_some_and(|r| r.len() == num_receipts));
4546
4547 let receipt = provider.receipt(num).unwrap();
4548 if num_receipts > 0 {
4549 assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
4550 } else {
4551 assert!(receipt.is_none());
4552 }
4553 }
4554 }
4555 }
4556
4557 #[test]
4558 fn test_try_into_history_rejects_unexecuted_blocks() {
4559 use reth_storage_api::TryIntoHistoricalStateProvider;
4560
4561 let factory = create_test_provider_factory();
4562
4563 let data = BlockchainTestData::default();
4565 let provider_rw = factory.provider_rw().unwrap();
4566 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4567 provider_rw
4568 .write_state(
4569 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4570 crate::OriginalValuesKnown::No,
4571 StateWriteConfig::default(),
4572 )
4573 .unwrap();
4574 provider_rw.commit().unwrap();
4575
4576 let provider = factory.provider().unwrap();
4578
4579 let result = provider.try_into_history_at_block(0);
4581 assert!(result.is_ok(), "Block 0 should be available");
4582
4583 let provider = factory.provider().unwrap();
4585 let result = provider.try_into_history_at_block(100);
4586
4587 match result {
4589 Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
4590 Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
4591 Ok(_) => panic!("Expected error, got Ok"),
4592 }
4593 }
4594
4595 #[test]
4596 fn test_unwind_storage_hashing_with_hashed_state() {
4597 let factory = create_test_provider_factory();
4598 let storage_settings = StorageSettings::v2();
4599 factory.set_storage_settings_cache(storage_settings);
4600
4601 let address = Address::random();
4602 let hashed_address = keccak256(address);
4603
4604 let plain_slot = B256::random();
4605 let hashed_slot = keccak256(plain_slot);
4606
4607 let current_value = U256::from(100);
4608 let old_value = U256::from(42);
4609
4610 let provider_rw = factory.provider_rw().unwrap();
4611 provider_rw
4612 .tx
4613 .cursor_dup_write::<tables::HashedStorages>()
4614 .unwrap()
4615 .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4616 .unwrap();
4617
4618 let changesets = vec![(
4619 BlockNumberAddress((1, address)),
4620 StorageEntry { key: plain_slot, value: old_value },
4621 )];
4622
4623 let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4624
4625 assert_eq!(result.len(), 1);
4626 assert!(result.contains_key(&hashed_address));
4627 assert!(result[&hashed_address].contains(&hashed_slot));
4628
4629 let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4630 let entry = cursor
4631 .seek_by_key_subkey(hashed_address, hashed_slot)
4632 .unwrap()
4633 .expect("entry should exist");
4634 assert_eq!(entry.key, hashed_slot);
4635 assert_eq!(entry.value, old_value);
4636 }
4637
4638 #[test]
4639 fn test_write_and_remove_state_roundtrip_legacy() {
4640 let factory = create_test_provider_factory();
4641 let storage_settings = StorageSettings::v1();
4642 assert!(!storage_settings.use_hashed_state());
4643 factory.set_storage_settings_cache(storage_settings);
4644
4645 let address = Address::with_last_byte(1);
4646 let hashed_address = keccak256(address);
4647 let slot = U256::from(5);
4648 let slot_key = B256::from(slot);
4649 let hashed_slot = keccak256(slot_key);
4650
4651 let mut rng = generators::rng();
4652 let block0 =
4653 random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
4654 let block1 =
4655 random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });
4656
4657 {
4658 let provider_rw = factory.provider_rw().unwrap();
4659 provider_rw.insert_block(&block0.try_recover().unwrap()).unwrap();
4660 provider_rw.insert_block(&block1.try_recover().unwrap()).unwrap();
4661 provider_rw
4662 .tx
4663 .cursor_write::<tables::PlainAccountState>()
4664 .unwrap()
4665 .upsert(address, &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })
4666 .unwrap();
4667 provider_rw.commit().unwrap();
4668 }
4669
4670 let provider_rw = factory.provider_rw().unwrap();
4671
4672 let mut state_init: BundleStateInit = AddressMap::default();
4673 let mut storage_map: B256Map<(U256, U256)> = B256Map::default();
4674 storage_map.insert(slot_key, (U256::ZERO, U256::from(10)));
4675 state_init.insert(
4676 address,
4677 (
4678 Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
4679 Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }),
4680 storage_map,
4681 ),
4682 );
4683
4684 let mut reverts_init: RevertsInit = HashMap::default();
4685 let mut block_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
4686 block_reverts.insert(
4687 address,
4688 (
4689 Some(Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })),
4690 vec![StorageEntry { key: slot_key, value: U256::ZERO }],
4691 ),
4692 );
4693 reverts_init.insert(1, block_reverts);
4694
4695 let execution_outcome =
4696 ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);
4697
4698 provider_rw
4699 .write_state(
4700 &execution_outcome,
4701 OriginalValuesKnown::Yes,
4702 StateWriteConfig {
4703 write_receipts: false,
4704 write_account_changesets: true,
4705 write_storage_changesets: true,
4706 },
4707 )
4708 .unwrap();
4709
4710 let hashed_state =
4711 execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
4712 provider_rw.write_hashed_state(&hashed_state).unwrap();
4713
4714 let account = provider_rw
4715 .tx
4716 .cursor_read::<tables::PlainAccountState>()
4717 .unwrap()
4718 .seek_exact(address)
4719 .unwrap()
4720 .unwrap()
4721 .1;
4722 assert_eq!(account.nonce, 1);
4723
4724 let storage_entry = provider_rw
4725 .tx
4726 .cursor_dup_read::<tables::PlainStorageState>()
4727 .unwrap()
4728 .seek_by_key_subkey(address, slot_key)
4729 .unwrap()
4730 .unwrap();
4731 assert_eq!(storage_entry.key, slot_key);
4732 assert_eq!(storage_entry.value, U256::from(10));
4733
4734 let hashed_entry = provider_rw
4735 .tx
4736 .cursor_dup_read::<tables::HashedStorages>()
4737 .unwrap()
4738 .seek_by_key_subkey(hashed_address, hashed_slot)
4739 .unwrap()
4740 .unwrap();
4741 assert_eq!(hashed_entry.key, hashed_slot);
4742 assert_eq!(hashed_entry.value, U256::from(10));
4743
4744 let account_cs_entries = provider_rw
4745 .tx
4746 .cursor_dup_read::<tables::AccountChangeSets>()
4747 .unwrap()
4748 .walk(Some(1))
4749 .unwrap()
4750 .collect::<Result<Vec<_>, _>>()
4751 .unwrap();
4752 assert!(!account_cs_entries.is_empty());
4753
4754 let storage_cs_entries = provider_rw
4755 .tx
4756 .cursor_read::<tables::StorageChangeSets>()
4757 .unwrap()
4758 .walk(Some(BlockNumberAddress((1, address))))
4759 .unwrap()
4760 .collect::<Result<Vec<_>, _>>()
4761 .unwrap();
4762 assert!(!storage_cs_entries.is_empty());
4763 assert_eq!(storage_cs_entries[0].1.key, slot_key);
4764
4765 provider_rw.remove_state_above(0).unwrap();
4766
4767 let restored_account = provider_rw
4768 .tx
4769 .cursor_read::<tables::PlainAccountState>()
4770 .unwrap()
4771 .seek_exact(address)
4772 .unwrap()
4773 .unwrap()
4774 .1;
4775 assert_eq!(restored_account.nonce, 0);
4776
4777 let storage_gone = provider_rw
4778 .tx
4779 .cursor_dup_read::<tables::PlainStorageState>()
4780 .unwrap()
4781 .seek_by_key_subkey(address, slot_key)
4782 .unwrap();
4783 assert!(storage_gone.is_none() || storage_gone.unwrap().key != slot_key);
4784
4785 let account_cs_after = provider_rw
4786 .tx
4787 .cursor_dup_read::<tables::AccountChangeSets>()
4788 .unwrap()
4789 .walk(Some(1))
4790 .unwrap()
4791 .collect::<Result<Vec<_>, _>>()
4792 .unwrap();
4793 assert!(account_cs_after.is_empty());
4794
4795 let storage_cs_after = provider_rw
4796 .tx
4797 .cursor_read::<tables::StorageChangeSets>()
4798 .unwrap()
4799 .walk(Some(BlockNumberAddress((1, address))))
4800 .unwrap()
4801 .collect::<Result<Vec<_>, _>>()
4802 .unwrap();
4803 assert!(storage_cs_after.is_empty());
4804 }
4805
4806 #[test]
4807 fn test_unwind_storage_hashing_legacy() {
4808 let factory = create_test_provider_factory();
4809 let storage_settings = StorageSettings::v1();
4810 assert!(!storage_settings.use_hashed_state());
4811 factory.set_storage_settings_cache(storage_settings);
4812
4813 let address = Address::random();
4814 let hashed_address = keccak256(address);
4815
4816 let plain_slot = B256::random();
4817 let hashed_slot = keccak256(plain_slot);
4818
4819 let current_value = U256::from(100);
4820 let old_value = U256::from(42);
4821
4822 let provider_rw = factory.provider_rw().unwrap();
4823 provider_rw
4824 .tx
4825 .cursor_dup_write::<tables::HashedStorages>()
4826 .unwrap()
4827 .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4828 .unwrap();
4829
4830 let changesets = vec![(
4831 BlockNumberAddress((1, address)),
4832 StorageEntry { key: plain_slot, value: old_value },
4833 )];
4834
4835 let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4836
4837 assert_eq!(result.len(), 1);
4838 assert!(result.contains_key(&hashed_address));
4839 assert!(result[&hashed_address].contains(&hashed_slot));
4840
4841 let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4842 let entry = cursor
4843 .seek_by_key_subkey(hashed_address, hashed_slot)
4844 .unwrap()
4845 .expect("entry should exist");
4846 assert_eq!(entry.key, hashed_slot);
4847 assert_eq!(entry.value, old_value);
4848 }
4849
4850 #[test]
4851 fn test_write_state_and_historical_read_hashed() {
4852 use reth_storage_api::StateProvider;
4853 use reth_trie::{HashedPostState, KeccakKeyHasher};
4854 use revm_database::BundleState;
4855 use revm_state::AccountInfo;
4856
4857 let factory = create_test_provider_factory();
4858 factory.set_storage_settings_cache(StorageSettings::v2());
4859
4860 let address = Address::with_last_byte(1);
4861 let slot = U256::from(5);
4862 let slot_key = B256::from(slot);
4863 let hashed_address = keccak256(address);
4864 let hashed_slot = keccak256(slot_key);
4865
4866 {
4867 let sf = factory.static_file_provider();
4868 let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
4869 let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
4870 hw.append_header(&h0, &B256::ZERO).unwrap();
4871 let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
4872 hw.append_header(&h1, &B256::ZERO).unwrap();
4873 hw.commit().unwrap();
4874
4875 let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
4876 aw.append_account_changeset(vec![], 0).unwrap();
4877 aw.commit().unwrap();
4878
4879 let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
4880 sw.append_storage_changeset(vec![], 0).unwrap();
4881 sw.commit().unwrap();
4882 }
4883
4884 let provider_rw = factory.provider_rw().unwrap();
4885
4886 let bundle = BundleState::builder(1..=1)
4887 .state_present_account_info(
4888 address,
4889 AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
4890 )
4891 .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
4892 .revert_account_info(1, address, Some(None))
4893 .revert_storage(1, address, vec![(slot, U256::ZERO)])
4894 .build();
4895
4896 let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
4897
4898 provider_rw
4899 .tx
4900 .put::<tables::BlockBodyIndices>(
4901 1,
4902 StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
4903 )
4904 .unwrap();
4905
4906 provider_rw
4907 .write_state(
4908 &execution_outcome,
4909 OriginalValuesKnown::Yes,
4910 StateWriteConfig {
4911 write_receipts: false,
4912 write_account_changesets: true,
4913 write_storage_changesets: true,
4914 },
4915 )
4916 .unwrap();
4917
4918 let hashed_state =
4919 HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
4920 provider_rw.write_hashed_state(&hashed_state).unwrap();
4921
4922 let plain_storage_entries = provider_rw
4923 .tx
4924 .cursor_dup_read::<tables::PlainStorageState>()
4925 .unwrap()
4926 .walk(None)
4927 .unwrap()
4928 .collect::<Result<Vec<_>, _>>()
4929 .unwrap();
4930 assert!(plain_storage_entries.is_empty());
4931
4932 let hashed_entry = provider_rw
4933 .tx
4934 .cursor_dup_read::<tables::HashedStorages>()
4935 .unwrap()
4936 .seek_by_key_subkey(hashed_address, hashed_slot)
4937 .unwrap()
4938 .unwrap();
4939 assert_eq!(hashed_entry.key, hashed_slot);
4940 assert_eq!(hashed_entry.value, U256::from(10));
4941
4942 provider_rw.static_file_provider().commit().unwrap();
4943
4944 let sf = factory.static_file_provider();
4945 let storage_cs = sf.storage_changeset(1).unwrap();
4946 assert!(!storage_cs.is_empty());
4947 assert_eq!(storage_cs[0].1.key, slot_key);
4948
4949 let account_cs = sf.account_block_changeset(1).unwrap();
4950 assert!(!account_cs.is_empty());
4951 assert_eq!(account_cs[0].address, address);
4952
4953 let historical_value =
4954 HistoricalStateProviderRef::new(&*provider_rw, 0).storage(address, slot_key).unwrap();
4955 assert_eq!(historical_value, None);
4956 }
4957
4958 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
4959 enum StorageMode {
4960 V1,
4961 V2,
4962 }
4963
4964 fn run_save_blocks_and_verify(mode: StorageMode) {
4965 use alloy_primitives::map::{FbBuildHasher, HashMap};
4966
4967 let factory = create_test_provider_factory();
4968
4969 match mode {
4970 StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
4971 StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
4972 }
4973
4974 let num_blocks = 3u64;
4975 let accounts_per_block = 5usize;
4976 let slots_per_account = 3usize;
4977
4978 let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
4979 SealedHeader::new(
4980 Header { number: 0, difficulty: U256::from(1), ..Default::default() },
4981 B256::ZERO,
4982 ),
4983 Default::default(),
4984 );
4985
4986 let genesis_executed = ExecutedBlock::new(
4987 Arc::new(genesis.try_recover().unwrap()),
4988 Arc::new(BlockExecutionOutput {
4989 result: BlockExecutionResult {
4990 receipts: vec![],
4991 requests: Default::default(),
4992 gas_used: 0,
4993 blob_gas_used: 0,
4994 },
4995 state: Default::default(),
4996 }),
4997 ComputedTrieData::default(),
4998 );
4999 let provider_rw = factory.provider_rw().unwrap();
5000 provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
5001 provider_rw.commit().unwrap();
5002
5003 let mut blocks: Vec<ExecutedBlock> = Vec::new();
5004 let mut parent_hash = B256::ZERO;
5005
5006 for block_num in 1..=num_blocks {
5007 let mut builder = BundleState::builder(block_num..=block_num);
5008
5009 for acct_idx in 0..accounts_per_block {
5010 let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5011 let info = AccountInfo {
5012 nonce: block_num,
5013 balance: U256::from(block_num * 100 + acct_idx as u64),
5014 ..Default::default()
5015 };
5016
5017 let storage: HashMap<U256, (U256, U256), FbBuildHasher<32>> = (1..=
5018 slots_per_account as u64)
5019 .map(|s| {
5020 (
5021 U256::from(s + acct_idx as u64 * 100),
5022 (U256::ZERO, U256::from(block_num * 1000 + s)),
5023 )
5024 })
5025 .collect();
5026
5027 let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
5028 .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
5029 .collect();
5030
5031 builder = builder
5032 .state_present_account_info(address, info)
5033 .revert_account_info(block_num, address, Some(None))
5034 .state_storage(address, storage)
5035 .revert_storage(block_num, address, revert_storage);
5036 }
5037
5038 let bundle = builder.build();
5039
5040 let hashed_state =
5041 HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5042
5043 let header = Header {
5044 number: block_num,
5045 parent_hash,
5046 difficulty: U256::from(1),
5047 ..Default::default()
5048 };
5049 let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
5050 header,
5051 Default::default(),
5052 );
5053 parent_hash = block.hash();
5054
5055 let executed = ExecutedBlock::new(
5056 Arc::new(block.try_recover().unwrap()),
5057 Arc::new(BlockExecutionOutput {
5058 result: BlockExecutionResult {
5059 receipts: vec![],
5060 requests: Default::default(),
5061 gas_used: 0,
5062 blob_gas_used: 0,
5063 },
5064 state: bundle,
5065 }),
5066 ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
5067 );
5068 blocks.push(executed);
5069 }
5070
5071 let provider_rw = factory.provider_rw().unwrap();
5072 provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
5073 provider_rw.commit().unwrap();
5074
5075 let provider = factory.provider().unwrap();
5076
5077 for block_num in 1..=num_blocks {
5078 for acct_idx in 0..accounts_per_block {
5079 let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5080 let hashed_address = keccak256(address);
5081
5082 let ha_entry = provider
5083 .tx_ref()
5084 .cursor_read::<tables::HashedAccounts>()
5085 .unwrap()
5086 .seek_exact(hashed_address)
5087 .unwrap();
5088 assert!(
5089 ha_entry.is_some(),
5090 "HashedAccounts missing for block {block_num} acct {acct_idx}"
5091 );
5092
5093 for s in 1..=slots_per_account as u64 {
5094 let slot = U256::from(s + acct_idx as u64 * 100);
5095 let slot_key = B256::from(slot);
5096 let hashed_slot = keccak256(slot_key);
5097
5098 let hs_entry = provider
5099 .tx_ref()
5100 .cursor_dup_read::<tables::HashedStorages>()
5101 .unwrap()
5102 .seek_by_key_subkey(hashed_address, hashed_slot)
5103 .unwrap();
5104 assert!(
5105 hs_entry.is_some(),
5106 "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
5107 );
5108 let entry = hs_entry.unwrap();
5109 assert_eq!(entry.key, hashed_slot);
5110 assert_eq!(entry.value, U256::from(block_num * 1000 + s));
5111 }
5112 }
5113 }
5114
5115 for block_num in 1..=num_blocks {
5116 let header = provider.header_by_number(block_num).unwrap();
5117 assert!(header.is_some(), "Header missing for block {block_num}");
5118
5119 let indices = provider.block_body_indices(block_num).unwrap();
5120 assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
5121 }
5122
5123 let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
5124 let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();
5125
5126 if mode == StorageMode::V2 {
5127 assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5128 assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5129
5130 let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5131 assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");
5132
5133 let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5134 assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");
5135
5136 provider.static_file_provider().commit().unwrap();
5137 let sf = factory.static_file_provider();
5138
5139 for block_num in 1..=num_blocks {
5140 let account_cs = sf.account_block_changeset(block_num).unwrap();
5141 assert!(
5142 !account_cs.is_empty(),
5143 "v2: static file AccountChangeSets should exist for block {block_num}"
5144 );
5145
5146 let storage_cs = sf.storage_changeset(block_num).unwrap();
5147 assert!(
5148 !storage_cs.is_empty(),
5149 "v2: static file StorageChangeSets should exist for block {block_num}"
5150 );
5151
5152 for (_, entry) in &storage_cs {
5153 assert!(
5154 entry.key != keccak256(entry.key),
5155 "v2: static file storage changeset should have plain slot keys"
5156 );
5157 }
5158 }
5159
5160 let rocksdb = factory.rocksdb_provider();
5161 for block_num in 1..=num_blocks {
5162 for acct_idx in 0..accounts_per_block {
5163 let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5164 let shards = rocksdb.account_history_shards(address).unwrap();
5165 assert!(
5166 !shards.is_empty(),
5167 "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
5168 );
5169
5170 for s in 1..=slots_per_account as u64 {
5171 let slot = U256::from(s + acct_idx as u64 * 100);
5172 let slot_key = B256::from(slot);
5173 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5174 assert!(
5175 !shards.is_empty(),
5176 "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
5177 );
5178 }
5179 }
5180 }
5181 } else {
5182 assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
5183 assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");
5184
5185 let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5186 assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");
5187
5188 let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5189 assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");
5190
5191 for block_num in 1..=num_blocks {
5192 let storage_entries: Vec<_> = provider
5193 .tx_ref()
5194 .cursor_dup_read::<tables::StorageChangeSets>()
5195 .unwrap()
5196 .walk_range(BlockNumberAddress::range(block_num..=block_num))
5197 .unwrap()
5198 .collect::<Result<Vec<_>, _>>()
5199 .unwrap();
5200 assert!(
5201 !storage_entries.is_empty(),
5202 "v1: MDBX StorageChangeSets should have entries for block {block_num}"
5203 );
5204
5205 for (_, entry) in &storage_entries {
5206 let slot_key = B256::from(entry.key);
5207 assert!(
5208 slot_key != keccak256(slot_key),
5209 "v1: storage changeset keys should be plain (not hashed)"
5210 );
5211 }
5212 }
5213
5214 let mdbx_account_history =
5215 provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
5216 assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");
5217
5218 let mdbx_storage_history =
5219 provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
5220 assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
5221 }
5222 }
5223
5224 #[test]
5225 fn test_save_blocks_v1_table_assertions() {
5226 run_save_blocks_and_verify(StorageMode::V1);
5227 }
5228
5229 #[test]
5230 fn test_save_blocks_v2_table_assertions() {
5231 run_save_blocks_and_verify(StorageMode::V2);
5232 }
5233
5234 #[test]
5235 fn test_write_and_remove_state_roundtrip_v2() {
5236 let factory = create_test_provider_factory();
5237 let storage_settings = StorageSettings::v2();
5238 assert!(storage_settings.use_hashed_state());
5239 factory.set_storage_settings_cache(storage_settings);
5240
5241 let address = Address::with_last_byte(1);
5242 let hashed_address = keccak256(address);
5243 let slot = U256::from(5);
5244 let slot_key = B256::from(slot);
5245 let hashed_slot = keccak256(slot_key);
5246
5247 {
5248 let sf = factory.static_file_provider();
5249 let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
5250 let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
5251 hw.append_header(&h0, &B256::ZERO).unwrap();
5252 let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
5253 hw.append_header(&h1, &B256::ZERO).unwrap();
5254 hw.commit().unwrap();
5255
5256 let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
5257 aw.append_account_changeset(vec![], 0).unwrap();
5258 aw.commit().unwrap();
5259
5260 let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
5261 sw.append_storage_changeset(vec![], 0).unwrap();
5262 sw.commit().unwrap();
5263 }
5264
5265 {
5266 let provider_rw = factory.provider_rw().unwrap();
5267 provider_rw
5268 .tx
5269 .put::<tables::BlockBodyIndices>(
5270 0,
5271 StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5272 )
5273 .unwrap();
5274 provider_rw
5275 .tx
5276 .put::<tables::BlockBodyIndices>(
5277 1,
5278 StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5279 )
5280 .unwrap();
5281 provider_rw
5282 .tx
5283 .cursor_write::<tables::HashedAccounts>()
5284 .unwrap()
5285 .upsert(
5286 hashed_address,
5287 &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None },
5288 )
5289 .unwrap();
5290 provider_rw.commit().unwrap();
5291 }
5292
5293 let provider_rw = factory.provider_rw().unwrap();
5294
5295 let bundle = BundleState::builder(1..=1)
5296 .state_present_account_info(
5297 address,
5298 AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
5299 )
5300 .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
5301 .revert_account_info(1, address, Some(None))
5302 .revert_storage(1, address, vec![(slot, U256::ZERO)])
5303 .build();
5304
5305 let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
5306
5307 provider_rw
5308 .write_state(
5309 &execution_outcome,
5310 OriginalValuesKnown::Yes,
5311 StateWriteConfig {
5312 write_receipts: false,
5313 write_account_changesets: true,
5314 write_storage_changesets: true,
5315 },
5316 )
5317 .unwrap();
5318
5319 let hashed_state =
5320 HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5321 provider_rw.write_hashed_state(&hashed_state).unwrap();
5322
5323 let hashed_account = provider_rw
5324 .tx
5325 .cursor_read::<tables::HashedAccounts>()
5326 .unwrap()
5327 .seek_exact(hashed_address)
5328 .unwrap()
5329 .unwrap()
5330 .1;
5331 assert_eq!(hashed_account.nonce, 1);
5332
5333 let hashed_entry = provider_rw
5334 .tx
5335 .cursor_dup_read::<tables::HashedStorages>()
5336 .unwrap()
5337 .seek_by_key_subkey(hashed_address, hashed_slot)
5338 .unwrap()
5339 .unwrap();
5340 assert_eq!(hashed_entry.key, hashed_slot);
5341 assert_eq!(hashed_entry.value, U256::from(10));
5342
5343 let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
5344 assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5345
5346 let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
5347 assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5348
5349 provider_rw.static_file_provider().commit().unwrap();
5350
5351 let sf = factory.static_file_provider();
5352 let storage_cs = sf.storage_changeset(1).unwrap();
5353 assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
5354 assert_eq!(storage_cs[0].1.key, slot_key, "v2: changeset key should be plain");
5355
5356 provider_rw.remove_state_above(0).unwrap();
5357
5358 let restored_account = provider_rw
5359 .tx
5360 .cursor_read::<tables::HashedAccounts>()
5361 .unwrap()
5362 .seek_exact(hashed_address)
5363 .unwrap();
5364 assert!(
5365 restored_account.is_none(),
5366 "v2: account should be removed (didn't exist before block 1)"
5367 );
5368
5369 let storage_gone = provider_rw
5370 .tx
5371 .cursor_dup_read::<tables::HashedStorages>()
5372 .unwrap()
5373 .seek_by_key_subkey(hashed_address, hashed_slot)
5374 .unwrap();
5375 assert!(
5376 storage_gone.is_none() || storage_gone.unwrap().key != hashed_slot,
5377 "v2: storage should be reverted (removed or different key)"
5378 );
5379
5380 let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
5381 assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");
5382
5383 let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
5384 assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
5385 }
5386
5387 #[test]
5388 fn test_unwind_storage_history_indices_v2() {
5389 let factory = create_test_provider_factory();
5390 factory.set_storage_settings_cache(StorageSettings::v2());
5391
5392 let address = Address::with_last_byte(1);
5393 let slot_key = B256::from(U256::from(42));
5394
5395 {
5396 let rocksdb = factory.rocksdb_provider();
5397 let mut batch = rocksdb.batch();
5398 batch.append_storage_history_shard(address, slot_key, vec![3u64, 7, 10]).unwrap();
5399 batch.commit().unwrap();
5400
5401 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5402 assert!(!shards.is_empty(), "history should be written to rocksdb");
5403 }
5404
5405 let provider_rw = factory.provider_rw().unwrap();
5406
5407 let changesets = vec![
5408 (
5409 BlockNumberAddress((7, address)),
5410 StorageEntry { key: slot_key, value: U256::from(5) },
5411 ),
5412 (
5413 BlockNumberAddress((10, address)),
5414 StorageEntry { key: slot_key, value: U256::from(8) },
5415 ),
5416 ];
5417
5418 let count = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
5419 assert_eq!(count, 2);
5420
5421 provider_rw.commit().unwrap();
5422
5423 let rocksdb = factory.rocksdb_provider();
5424 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5425
5426 assert!(
5427 !shards.is_empty(),
5428 "history shards should still exist with block 3 after partial unwind"
5429 );
5430
5431 let all_blocks: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
5432 assert!(all_blocks.contains(&3), "block 3 should remain");
5433 assert!(!all_blocks.contains(&7), "block 7 should be unwound");
5434 assert!(!all_blocks.contains(&10), "block 10 should be unwound");
5435 }
5436}