1use crate::{
2 changesets_utils::StorageRevertsIter,
3 providers::{
4 database::{chain::ChainStorage, metrics},
5 rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6 static_file::{StaticFileWriteCtx, StaticFileWriter},
7 NodeTypesForProvider, StaticFileProvider,
8 },
9 to_range,
10 traits::{
11 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12 },
13 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15 DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16 HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17 LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18 PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19 RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20 StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21 TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25 BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29 keccak256,
30 map::{hash_map, AddressSet, B256Map, HashMap},
31 Address, BlockHash, BlockNumber, StorageKey, StorageValue, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40 database::{Database, ReaderTxnTracker},
41 models::{
42 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43 BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44 StoredBlockBodyIndices,
45 },
46 table::Table,
47 tables,
48 transaction::{DbTx, DbTxMut},
49 BlockNumberList,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54 Account, Block as _, BlockBody as _, Bytecode, FastInstant as Instant, RecoveredBlock,
55 SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58 PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63 BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64 NodePrimitivesProvider, StateProvider, StateReader, StateWriteConfig, StorageChangeSetReader,
65 StoragePath, StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70 HashedPostStateSorted,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
73use revm_database::states::{
74 PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use std::{
77 cmp::Ordering,
78 collections::{BTreeMap, BTreeSet},
79 fmt::Debug,
80 ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
81 path::PathBuf,
82 sync::Arc,
83};
84use tracing::{debug, instrument, trace};
85
/// Determines the order in which the storage backends (database transaction,
/// RocksDB batches, static files) are committed.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
    /// Regular forward-sync commit order.
    #[default]
    Normal,
    /// Commit order used when unwinding blocks.
    Unwind,
}
96
97impl CommitOrder {
98 pub const fn is_unwind(&self) -> bool {
100 matches!(self, Self::Unwind)
101 }
102}
103
/// A read-only database provider: [`DatabaseProvider`] over the database's read transaction type.
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
106
/// A read-write database provider: a newtype over [`DatabaseProvider`]
/// parameterized with the database's read-write transaction type.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
115
impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;

    /// Gives shared access to the wrapped provider.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
123
impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
    /// Gives exclusive access to the wrapped provider.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
129
impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
    for DatabaseProviderRW<DB, N>
{
    /// Borrows the wrapped read-write provider.
    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
        &self.0
    }
}
137
impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
    /// Commits the underlying provider (database transaction plus any
    /// auxiliary storage writes).
    pub fn commit(self) -> ProviderResult<()> {
        self.0.commit()
    }

    /// Consumes the provider and returns the raw read-write transaction.
    pub fn into_tx(self) -> <DB as Database>::TXMut {
        self.0.into_tx()
    }

    /// Overrides the minimum pruning distance. Test-only helper.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
        self.0.minimum_pruning_distance = distance;
        self
    }
}
156
impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
    for DatabaseProvider<<DB as Database>::TXMut, N>
{
    /// Unwraps the newtype into the inner provider.
    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
        provider.0
    }
}
164
/// Selects how much data [`DatabaseProvider::save_blocks`] persists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SaveBlocksMode {
    /// Persist blocks together with execution state, hashed state, trie and history updates.
    Full,
    /// Persist only block data (headers, bodies, senders); skip all state writes.
    BlocksOnly,
}
176
177impl SaveBlocksMode {
178 pub const fn with_state(self) -> bool {
180 matches!(self, Self::Full)
181 }
182}
183
/// Provider over a database transaction plus the auxiliary storage backends
/// (static files, RocksDB) that together make up the node's storage.
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Underlying database transaction (read-only or read-write).
    tx: TX,
    /// Chain specification of the node.
    chain_spec: Arc<N::ChainSpec>,
    /// Provider for data stored in static files.
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Configured pruning modes; consulted before writing prunable data.
    prune_modes: PruneModes,
    /// Chain-specific storage reader/writer implementation.
    storage: Arc<N::Storage>,
    /// Shared, mutable storage settings (e.g. the `storage_v2` flag).
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// Handle to the RocksDB backend.
    rocksdb_provider: RocksDBProvider,
    /// Cache of trie changesets used for historical state and unwinds.
    changeset_cache: ChangesetCache,
    /// Task runtime providing the storage thread pool.
    runtime: reth_tasks::Runtime,
    /// Path to the database directory.
    db_path: PathBuf,
    /// RocksDB write batches staged for commit alongside this provider.
    pending_rocksdb_batches: PendingRocksDBBatches,
    /// Order in which the backends are committed (normal vs. unwind).
    commit_order: CommitOrder,
    /// Minimum distance from the tip below which data is considered prunable.
    minimum_pruning_distance: u64,
    /// Metrics recorder for provider operations.
    metrics: metrics::DatabaseProviderMetrics,
    /// Optional tracker used to wait for in-flight readers before committing
    /// auxiliary stores.
    reader_txn_tracker: Option<Arc<dyn ReaderTxnTracker>>,
}
218
219impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 let mut s = f.debug_struct("DatabaseProvider");
222 s.field("tx", &self.tx)
223 .field("chain_spec", &self.chain_spec)
224 .field("static_file_provider", &self.static_file_provider)
225 .field("prune_modes", &self.prune_modes)
226 .field("storage", &self.storage)
227 .field("storage_settings", &self.storage_settings)
228 .field("rocksdb_provider", &self.rocksdb_provider)
229 .field("changeset_cache", &self.changeset_cache)
230 .field("runtime", &self.runtime)
231 .field("pending_rocksdb_batches", &"<pending batches>")
232 .field("commit_order", &self.commit_order)
233 .field("minimum_pruning_distance", &self.minimum_pruning_distance)
234 .field("reader_txn_tracker", &"<reader txn tracker>")
235 .finish()
236 }
237}
238
impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Returns a reference to the configured prune modes.
    pub const fn prune_modes_ref(&self) -> &PruneModes {
        &self.prune_modes
    }

    /// Sets the minimum pruning distance, returning the modified provider.
    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
        self.minimum_pruning_distance = distance;
        self
    }

    /// Attaches a tracker used to wait for in-flight reader transactions
    /// before auxiliary storage is committed.
    pub(crate) fn with_reader_txn_tracker<T>(mut self, reader_txn_tracker: T) -> Self
    where
        T: ReaderTxnTracker + 'static,
    {
        self.reader_txn_tracker = Some(Arc::new(reader_txn_tracker));
        self
    }
}
260
impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Commits in unwind order: the database transaction first, then — after
    /// waiting for readers that started before the commit — any pending
    /// RocksDB batches, and the static files last.
    fn commit_unwind(self) -> ProviderResult<()> {
        let storage_v2 = self.cached_storage_settings().storage_v2;
        let reader_txn_tracker = self.reader_txn_tracker.clone();
        self.tx.commit()?;

        // Let in-flight pre-commit readers finish before mutating the
        // auxiliary stores they may still be reading from.
        if let Some(reader_txn_tracker) = reader_txn_tracker.as_ref() {
            reader_txn_tracker.wait_for_pre_commit_readers();
        }

        // Pending RocksDB batches only exist under the v2 storage schema.
        if storage_v2 {
            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
            for batch in batches {
                self.rocksdb_provider.commit_batch(batch)?;
            }
        }

        self.static_file_provider.commit()?;
        Ok(())
    }

    /// Returns a state provider for the latest (tip) state.
    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
        trace!(target: "providers::db", "Returning latest state provider");
        Box::new(LatestStateProviderRef::new(self))
    }

    /// Returns a historical state provider for the block with the given hash.
    ///
    /// # Errors
    /// Returns [`ProviderError::BlockHashNotFound`] if the hash is unknown.
    pub fn history_by_block_hash<'a>(
        &'a self,
        block_hash: BlockHash,
    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
        let block_number =
            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
        self.history_by_block_number(block_number)
    }

    /// Returns a historical state provider for the given block number,
    /// short-circuiting to the latest-state provider when the block is the
    /// current tip.
    pub fn history_by_block_number<'a>(
        &'a self,
        mut block_number: BlockNumber,
    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
        if block_number == self.best_block_number().unwrap_or_default() &&
            block_number == self.last_block_number().unwrap_or_default()
        {
            return Ok(Box::new(LatestStateProviderRef::new(self)))
        }

        // The historical provider is keyed by the first block whose changes
        // are NOT applied, hence the `+ 1`.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider =
            HistoricalStateProviderRef::new(self, block_number, self.changeset_cache.clone());
        // History at or below a prune checkpoint is gone; record the lowest
        // block for which history queries can still be answered.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }

    /// Replaces the configured prune modes. Test-only.
    #[cfg(feature = "test-utils")]
    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
}
355
/// Exposes the node's primitive types through the provider.
impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    type Primitives = N::Primitives;
}
359
impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
    /// Returns a cloned handle to the static file provider.
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }

    /// Returns a writer for `segment` positioned at `block`.
    fn get_static_file_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
        self.static_file_provider.get_writer(block, segment)
    }
}
374
impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
    /// Returns a cloned handle to the RocksDB provider.
    fn rocksdb_provider(&self) -> RocksDBProvider {
        self.rocksdb_provider.clone()
    }

    /// Stages a RocksDB write batch to be committed with this provider.
    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
        self.pending_rocksdb_batches.lock().push(batch);
    }

    /// Drains and commits all staged RocksDB batches, in insertion order.
    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
        let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
        for batch in batches {
            self.rocksdb_provider.commit_batch(batch)?;
        }
        Ok(())
    }
}
393
394impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
395 for DatabaseProvider<TX, N>
396{
397 type ChainSpec = N::ChainSpec;
398
399 fn chain_spec(&self) -> Arc<Self::ChainSpec> {
400 self.chain_spec.clone()
401 }
402}
403
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Shared constructor for read-write providers; `commit_order` selects
    /// whether backends are committed in normal or unwind order.
    #[expect(clippy::too_many_arguments)]
    fn new_rw_inner(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
        commit_order: CommitOrder,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            pending_rocksdb_batches: Default::default(),
            commit_order,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
            reader_txn_tracker: None,
        }
    }

    /// Creates a read-write provider with the normal commit order.
    #[expect(clippy::too_many_arguments)]
    pub fn new_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self::new_rw_inner(
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            CommitOrder::Normal,
        )
    }

    /// Creates a read-write provider with the unwind commit order.
    #[expect(clippy::too_many_arguments)]
    pub fn new_unwind_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self::new_rw_inner(
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            CommitOrder::Unwind,
        )
    }
}
497
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    /// Identity borrow; lets generic code take `impl AsRef<DatabaseProvider<..>>`.
    fn as_ref(&self) -> &Self {
        self
    }
}
503
504impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
505 pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
509 where
510 F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
511 {
512 let rocksdb = self.rocksdb_provider();
513 let rocksdb_batch = rocksdb.batch();
514
515 let (result, raw_batch) = f(rocksdb_batch)?;
516
517 if let Some(batch) = raw_batch {
518 self.set_pending_rocksdb_batch(batch);
519 }
520 let _ = raw_batch; Ok(result)
523 }
524
    /// Builds the context describing which data categories the static-file
    /// writer should persist for the block range `first_block..=last_block`.
    ///
    /// Each `write_*` flag is true only when that category's configured
    /// destination is static files (and, for state categories, when
    /// `save_mode` includes state).
    fn static_file_write_ctx(
        &self,
        save_mode: SaveBlocksMode,
        first_block: BlockNumber,
        last_block: BlockNumber,
    ) -> ProviderResult<StaticFileWriteCtx> {
        // The effective tip: whichever is higher, what's stored or what we're writing.
        let tip = self.last_block_number()?.max(last_block);
        Ok(StaticFileWriteCtx {
            // Skip senders entirely when sender recovery is fully pruned.
            write_senders: EitherWriterDestination::senders(self).is_static_file() &&
                self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
            write_receipts: save_mode.with_state() &&
                EitherWriter::receipts_destination(self).is_static_file(),
            write_account_changesets: save_mode.with_state() &&
                EitherWriterDestination::account_changesets(self).is_static_file(),
            write_storage_changesets: save_mode.with_state() &&
                EitherWriterDestination::storage_changesets(self).is_static_file(),
            tip,
            receipts_prune_mode: self.prune_modes.receipts,
            // Receipts are prunable only if none were ever written to static
            // files and the first block is far enough behind the tip.
            receipts_prunable: self
                .static_file_provider
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none() &&
                PruneMode::Distance(self.minimum_pruning_distance)
                    .should_prune(first_block, tip),
        })
    }
553
    /// Builds the context for the RocksDB writer, starting at `first_block`.
    fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
        RocksDBWriteCtx {
            first_block_number: first_block,
            prune_tx_lookup: self.prune_modes.transaction_lookup,
            storage_settings: self.cached_storage_settings(),
            // Shared handle: batches produced by the writer land in the same
            // pending list this provider commits.
            pending_batches: self.pending_rocksdb_batches.clone(),
        }
    }
563
    /// Persists a batch of executed blocks across all storage backends.
    ///
    /// Static-file and (when the v2 schema is active) RocksDB writes run on
    /// the storage thread pool concurrently with the MDBX writes performed on
    /// the current thread inside the same scope. `save_mode` controls whether
    /// execution state is written alongside block data.
    ///
    /// # Errors
    /// Propagates the first error from any backend; a worker that produced no
    /// result is reported as a thread panic.
    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlock<N::Primitives>>,
        save_mode: SaveBlocksMode,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to write empty block range");
            return Ok(())
        }

        let total_start = Instant::now();
        let block_count = blocks.len() as u64;
        // Safe: emptiness was checked above.
        let first_number = blocks.first().unwrap().recovered_block().number();
        let last_block_number = blocks.last().unwrap().recovered_block().number();

        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");

        // Next free global transaction number: one past the last stored one.
        let first_tx_num = self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .last()?
            .map(|(n, _)| n + 1)
            .unwrap_or_default();

        // Per-block starting transaction numbers (prefix sums of tx counts).
        let tx_nums: Vec<TxNumber> = {
            let mut nums = Vec::with_capacity(blocks.len());
            let mut current = first_tx_num;
            for block in &blocks {
                nums.push(current);
                current += block.recovered_block().body().transaction_count() as u64;
            }
            nums
        };

        let mut timings =
            metrics::SaveBlocksTimings { batch_size: block_count, ..Default::default() };

        let sf_provider = &self.static_file_provider;
        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
        let rocksdb_provider = self.rocksdb_provider.clone();
        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
        let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;

        // Results from the spawned workers; `None` after the scope ends means
        // the worker panicked before producing a value.
        let mut sf_result = None;
        let mut rocksdb_result = None;

        let runtime = &self.runtime;
        // Propagate the tracing span into the worker threads.
        let span = tracing::Span::current();
        runtime.storage_pool().in_place_scope(|s| {
            // Static-file writes run concurrently with the MDBX writes below.
            s.spawn(|_| {
                let _guard = span.enter();
                let start = Instant::now();
                sf_result = Some(
                    sf_provider
                        .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
                        .map(|()| start.elapsed()),
                );
            });

            if rocksdb_enabled {
                s.spawn(|_| {
                    let _guard = span.enter();
                    let start = Instant::now();
                    rocksdb_result = Some(
                        rocksdb_provider
                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
                            .map(|()| start.elapsed()),
                    );
                });
            }

            let mdbx_start = Instant::now();

            // Legacy (v1) schema keeps the tx-hash -> tx-number lookup in the
            // database; skip when tx lookup is fully pruned.
            if !self.cached_storage_settings().storage_v2 &&
                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
            {
                let start = Instant::now();
                let total_tx_count: usize =
                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
                for (i, block) in blocks.iter().enumerate() {
                    let recovered_block = block.recovered_block();
                    for (tx_num, transaction) in
                        (tx_nums[i]..).zip(recovered_block.body().transactions_iter())
                    {
                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
                    }
                }

                // Sorted insertion order is friendlier to the B-tree.
                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);

                self.with_rocksdb_batch(|batch| {
                    let mut tx_hash_writer =
                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
                    Ok(((), raw_batch))
                })?;
                self.metrics.record_duration(
                    metrics::Action::InsertTransactionHashNumbers,
                    start.elapsed(),
                );
            }

            // Per-block MDBX writes: block tables, then (optionally) state.
            for (i, block) in blocks.iter().enumerate() {
                let recovered_block = block.recovered_block();

                let start = Instant::now();
                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
                timings.insert_block += start.elapsed();

                if save_mode.with_state() {
                    let execution_output = block.execution_outcome();

                    let start = Instant::now();
                    // Categories handled by the static-file worker are
                    // inverted here so nothing is written twice.
                    self.write_state(
                        WriteStateInput::Single {
                            outcome: execution_output,
                            block: recovered_block.number(),
                        },
                        OriginalValuesKnown::No,
                        StateWriteConfig {
                            write_receipts: !sf_ctx.write_receipts,
                            write_account_changesets: !sf_ctx.write_account_changesets,
                            write_storage_changesets: !sf_ctx.write_storage_changesets,
                        },
                    )?;
                    timings.write_state += start.elapsed();
                }
            }

            if save_mode.with_state() {
                // Merge per-block updates (newest first) into one batch write.
                let start = Instant::now();
                let merged_hashed_state = HashedPostStateSorted::merge_batch(
                    blocks.iter().rev().map(|b| b.trie_data().hashed_state),
                );
                if !merged_hashed_state.is_empty() {
                    self.write_hashed_state(&merged_hashed_state)?;
                }
                timings.write_hashed_state += start.elapsed();

                let start = Instant::now();
                let merged_trie =
                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
                if !merged_trie.is_empty() {
                    self.write_trie_updates_sorted(&merged_trie)?;
                }
                timings.write_trie_updates += start.elapsed();
            }

            if save_mode.with_state() {
                let start = Instant::now();
                self.update_history_indices(first_number..=last_block_number)?;
                timings.update_history_indices = start.elapsed();
            }

            let start = Instant::now();
            self.update_pipeline_stages(last_block_number, false)?;
            timings.update_pipeline_stages = start.elapsed();

            timings.mdbx = mdbx_start.elapsed();

            Ok::<_, ProviderError>(())
        })?;

        // A missing result means the worker panicked before storing one.
        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;

        if rocksdb_enabled {
            timings.rocksdb = rocksdb_result.ok_or_else(|| {
                ProviderError::Database(reth_db_api::DatabaseError::Other(
                    "RocksDB thread panicked".into(),
                ))
            })??;
        }

        timings.total = total_start.elapsed();

        self.metrics.record_save_blocks(&timings);
        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");

        Ok(())
    }
775
    /// Writes the MDBX-only portion of a block: transaction senders (when the
    /// database is their destination), the hash -> number mapping, and the
    /// block body indices. Returns the body indices that were stored.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn insert_block_mdbx_only(
        &self,
        block: &RecoveredBlock<BlockTy<N>>,
        first_tx_num: TxNumber,
    ) -> ProviderResult<StoredBlockBodyIndices> {
        // Senders are skipped when fully pruned or destined for static files.
        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
            EitherWriterDestination::senders(self).is_database()
        {
            let start = Instant::now();
            // Consecutive tx numbers starting at `first_tx_num`.
            let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
            let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
            for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
                cursor.append(tx_num, &sender)?;
            }
            self.metrics
                .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
        }

        let block_number = block.number();
        let tx_count = block.body().transaction_count() as u64;

        let start = Instant::now();
        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
        self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());

        self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;

        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
    }
809
    /// Appends the block's body indices, its `TransactionBlocks` entry (last
    /// tx number -> block number, only when the block has transactions), and
    /// delegates the body itself to the chain-specific storage writer.
    fn write_block_body_indices(
        &self,
        block_number: BlockNumber,
        body: &BodyTy<N>,
        first_tx_num: TxNumber,
        tx_count: u64,
    ) -> ProviderResult<()> {
        let start = Instant::now();
        self.tx
            .cursor_write::<tables::BlockBodyIndices>()?
            .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
        self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());

        // Empty blocks get no `TransactionBlocks` entry.
        if tx_count > 0 {
            let start = Instant::now();
            self.tx
                .cursor_write::<tables::TransactionBlocks>()?
                .append(first_tx_num + tx_count - 1, &block_number)?;
            self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
        }

        self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;

        Ok(())
    }
840
    /// Unwinds hashed state, history indices and trie data for all blocks at
    /// or above `from`, using the recorded account/storage changesets.
    ///
    /// # Errors
    /// Returns [`ProviderError::InsufficientChangesets`] when no `Finish`
    /// stage checkpoint exists to bound the trie revert.
    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
        let changed_accounts = self.account_changesets_range(from..)?;

        self.unwind_account_hashing(changed_accounts.iter())?;

        self.unwind_account_history_indices(changed_accounts.iter())?;

        let changed_storages = self.storage_changesets_range(from..)?;

        self.unwind_storage_hashing(changed_storages.iter().copied())?;

        self.unwind_storage_history_indices(changed_storages.iter().copied())?;

        // The database tip (Finish checkpoint) bounds the changeset range the
        // trie revert must cover.
        let db_tip_block = self
            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
            .as_ref()
            .map(|chk| chk.block_number)
            .ok_or_else(|| ProviderError::InsufficientChangesets {
                requested: from,
                available: 0..=0,
            })?;

        let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
        self.write_trie_updates_sorted(&trie_revert)?;

        Ok(())
    }
878
    /// Removes all receipts with transaction number >= `from_tx`, from both
    /// the database table and (when receipts live there) the static files.
    fn remove_receipts_from(
        &self,
        from_tx: TxNumber,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;

        if EitherWriter::receipts_destination(self).is_static_file() {
            let static_file_receipt_num =
                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);

            // Number of static-file receipts at or above `from_tx`;
            // saturates to 0 when the static files end before `from_tx`.
            let to_delete = static_file_receipt_num
                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
                .unwrap_or_default();

            self.static_file_provider
                .latest_writer(StaticFileSegment::Receipts)?
                .prune_receipts(to_delete, last_block)?;
        }

        Ok(())
    }
903
904 fn write_bytecodes(
906 &self,
907 bytecodes: impl IntoIterator<Item = (B256, Bytecode)>,
908 ) -> ProviderResult<()> {
909 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
910 for (hash, bytecode) in bytecodes {
911 bytecodes_cursor.upsert(hash, &bytecode)?;
912 }
913 Ok(())
914 }
915}
916
impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
    /// Converts the provider into a historical state provider at
    /// `block_number`, or into a latest-state provider when the block is the
    /// best block.
    ///
    /// # Errors
    /// Returns [`ProviderError::BlockNotExecuted`] for blocks above the best
    /// (executed) block.
    fn try_into_history_at_block(
        self,
        mut block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        let best_block = self.best_block_number().unwrap_or_default();

        if block_number > best_block {
            return Err(ProviderError::BlockNotExecuted {
                requested: block_number,
                executed: best_block,
            });
        }

        if block_number == best_block {
            return Ok(Box::new(LatestStateProvider::new(self)));
        }

        // The historical provider is keyed by the first block whose changes
        // are NOT applied, hence the `+ 1`.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
        let changeset_cache = self.changeset_cache.clone();

        let mut state_provider = HistoricalStateProvider::new(self, block_number, changeset_cache);

        // History at or below a prune checkpoint is gone; record the lowest
        // block for which history queries can still be answered.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }
}
968
/// Unwinds history index shards for one key, deleting every shard starting at
/// `start_key` whose key satisfies `shard_belongs_to_key`, and returning the
/// block numbers (< `block_number`) that must be re-inserted as the new last
/// shard. An empty result means no shard content survives the unwind.
fn unwind_history_shards<S, T, C>(
    cursor: &mut C,
    start_key: T::Key,
    block_number: BlockNumber,
    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<S>>,
    C: DbCursorRO<T> + DbCursorRW<T>,
{
    // Walk shards backwards starting from `start_key`.
    let mut item = cursor.seek_exact(start_key)?;
    while let Some((sharded_key, list)) = item {
        // Stop once the cursor has moved past this key's shards.
        if !shard_belongs_to_key(&sharded_key) {
            break
        }

        // Every visited shard is deleted; surviving entries are returned to
        // the caller for re-insertion.
        cursor.delete_current()?;

        let first = list.iter().next().expect("List can't be empty");

        // Entire shard is at/above the unwind point: drop it and keep
        // walking to the previous (older) shard.
        if first >= block_number {
            item = cursor.prev()?;
            continue
        }
        // Shard straddles the unwind point: keep only entries below it.
        else if block_number <= sharded_key.as_ref().highest_block_number {
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        // Shard is entirely below the unwind point: keep all of it.
        return Ok(list.iter().collect::<Vec<_>>())
    }

    Ok(Vec::new())
}
1031
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Creates a provider over the given transaction with the normal commit
    /// order and default minimum pruning distance.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            pending_rocksdb_batches: Default::default(),
            commit_order: CommitOrder::Normal,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
            reader_txn_tracker: None,
        }
    }

    /// Consumes the provider and returns the underlying transaction.
    pub fn into_tx(self) -> TX {
        self.tx
    }

    /// Returns a mutable reference to the underlying transaction.
    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    /// Returns a shared reference to the underlying transaction.
    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    /// Returns the chain specification.
    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}
1086
1087impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Assembles a single block (header + body + senders) identified by `id`.
    ///
    /// `header_by_number` fetches the header; `construct_block` builds the
    /// final value from header, body and senders. Returns `Ok(None)` when the
    /// hash/number mapping, the header, or the body indices are missing.
    fn recovered_block<H, HF, B, BF>(
        &self,
        id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
        header_by_number: HF,
        construct_block: BF,
    ) -> ProviderResult<Option<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
    {
        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
        let Some(header) = header_by_number(block_number)? else { return Ok(None) };

        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };

        let tx_range = body.tx_num_range();

        let transactions = if tx_range.is_empty() {
            vec![]
        } else {
            self.transactions_by_tx_range(tx_range.clone())?
        };

        // Delegate body construction to the chain-specific storage reader.
        let body = self
            .storage
            .reader()
            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
            .pop()
            .ok_or(ProviderError::InvalidStorageOutput)?;

        // Use stored senders where available; recover the remainder from the
        // transaction signatures.
        let senders = if tx_range.is_empty() {
            vec![]
        } else {
            let known_senders: HashMap<TxNumber, Address> =
                EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;

            let mut senders = Vec::with_capacity(body.transactions().len());
            for (tx_num, tx) in tx_range.zip(body.transactions()) {
                match known_senders.get(&tx_num) {
                    None => {
                        let sender = tx.recover_signer_unchecked()?;
                        senders.push(sender);
                    }
                    Some(sender) => senders.push(*sender),
                }
            }
            senders
        };

        construct_block(header, body, senders)
    }
1147
    /// Assembles blocks for a range: fetches headers via `headers_range`,
    /// pairs them with their transaction-number ranges, reads the bodies
    /// through the chain-specific storage reader, and maps each triple through
    /// `assemble_block`.
    fn block_range<F, H, HF, R>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        mut assemble_block: F,
    ) -> ProviderResult<Vec<R>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
    {
        if range.is_empty() {
            return Ok(Vec::new())
        }

        let len = range.end().saturating_sub(*range.start()) as usize + 1;
        let mut blocks = Vec::with_capacity(len);

        let headers = headers_range(range.clone())?;

        // Pair each header with the tx-number range from its body indices;
        // `zip` drops headers without stored body indices.
        let present_headers = self
            .block_body_indices_range(range)?
            .into_iter()
            .map(|b| b.tx_num_range())
            .zip(headers)
            .collect::<Vec<_>>();

        let mut inputs = Vec::with_capacity(present_headers.len());
        for (tx_range, header) in &present_headers {
            let transactions = if tx_range.is_empty() {
                Vec::new()
            } else {
                self.transactions_by_tx_range(tx_range.clone())?
            };

            inputs.push((header.as_ref(), transactions));
        }

        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;

        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
            blocks.push(assemble_block(header, body, tx_range)?);
        }

        Ok(blocks)
    }
1208
1209 fn block_with_senders_range<H, HF, B, BF>(
1220 &self,
1221 range: RangeInclusive<BlockNumber>,
1222 headers_range: HF,
1223 assemble_block: BF,
1224 ) -> ProviderResult<Vec<B>>
1225 where
1226 H: AsRef<HeaderTy<N>>,
1227 HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1228 BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
1229 {
1230 self.block_range(range, headers_range, |header, body, tx_range| {
1231 let senders = if tx_range.is_empty() {
1232 Vec::new()
1233 } else {
1234 let known_senders: HashMap<TxNumber, Address> =
1235 EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1236
1237 let mut senders = Vec::with_capacity(body.transactions().len());
1238 for (tx_num, tx) in tx_range.zip(body.transactions()) {
1239 match known_senders.get(&tx_num) {
1240 None => {
1241 let sender = tx.recover_signer_unchecked()?;
1243 senders.push(sender);
1244 }
1245 Some(sender) => senders.push(*sender),
1246 }
1247 }
1248
1249 senders
1250 };
1251
1252 assemble_block(header, body, senders)
1253 })
1254 }
1255
    /// Walks the account and storage changesets (newest first) to build the
    /// initial bundle state plus the per-block reverts.
    ///
    /// For every touched account/slot, the *oldest* changeset value ends up as
    /// the "old" side of the state entry, while `get_account` / `get_storage`
    /// supply the current ("new") side the first time the account/slot is
    /// seen. Every changeset entry is also recorded in `reverts` under its
    /// block number.
    fn populate_bundle_state(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        mut get_account: impl FnMut(Address) -> ProviderResult<Option<Account>>,
        mut get_storage: impl FnMut(Address, StorageKey) -> ProviderResult<Option<StorageValue>>,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
        // address -> (old account info, new account info, storage diffs)
        let mut state: BundleStateInit = HashMap::default();

        // block number -> address -> (old account info, reverted storage entries)
        let mut reverts: RevertsInit = HashMap::default();

        // Iterate in reverse so later overwrites leave the *oldest* value in
        // the "old" slot, i.e. the account state before the whole range.
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // First sight of this account: pair the changeset's old
                    // value with the current value from storage.
                    let new_info = get_account(address)?;
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // Already tracked: only roll the "old" side further back.
                    entry.get_mut().0 = old_info;
                }
            }
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            // An account touched only via storage keeps its present info on
            // both sides of the state entry.
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let present_info = get_account(address)?;
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    // First sight of this slot: pair its oldest value with the
                    // current one (missing slot reads as zero).
                    let new_storage = get_storage(address, old_storage.key)?.unwrap_or_default();
                    entry.insert((old_storage.value, new_storage));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
1329
1330 fn populate_bundle_state_plain(
1333 &self,
1334 account_changeset: Vec<(u64, AccountBeforeTx)>,
1335 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1336 plain_accounts_cursor: &mut impl DbCursorRO<tables::PlainAccountState>,
1337 plain_storage_cursor: &mut impl DbDupCursorRO<tables::PlainStorageState>,
1338 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1339 self.populate_bundle_state(
1340 account_changeset,
1341 storage_changeset,
1342 |address| Ok(plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1)),
1343 |address, storage_key| {
1344 Ok(plain_storage_cursor
1345 .seek_by_key_subkey(address, storage_key)?
1346 .filter(|s| s.key == storage_key)
1347 .map(|s| s.value))
1348 },
1349 )
1350 }
1351
1352 fn populate_bundle_state_hashed(
1357 &self,
1358 account_changeset: Vec<(u64, AccountBeforeTx)>,
1359 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1360 hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
1361 hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
1362 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1363 self.populate_bundle_state(
1364 account_changeset,
1365 storage_changeset,
1366 |address| Ok(hashed_accounts_cursor.seek_exact(keccak256(address))?.map(|kv| kv.1)),
1367 |address, storage_key| {
1368 let hashed_storage_key = keccak256(storage_key);
1369 Ok(hashed_storage_cursor
1370 .seek_by_key_subkey(keccak256(address), hashed_storage_key)?
1371 .filter(|s| s.key == hashed_storage_key)
1372 .map(|s| s.value))
1373 },
1374 )
1375 }
1376
1377 fn populate_bundle_state_with_provider(
1378 &self,
1379 account_changeset: Vec<(u64, AccountBeforeTx)>,
1380 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1381 state_provider: impl StateProvider,
1382 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1383 self.populate_bundle_state(
1384 account_changeset,
1385 storage_changeset,
1386 |address| state_provider.basic_account(&address),
1387 |address, storage_key| state_provider.storage(address, storage_key),
1388 )
1389 }
1390}
1391
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Appends new block-number indices to the history index table `T`,
    /// re-sharding when the live shard overflows.
    ///
    /// Shards are keyed by `(partial_key, highest block number in shard)`;
    /// the live (most recent) shard is stored under `u64::MAX` so it can
    /// always be located with a single `seek_exact`.
    fn append_history_index<P, T>(
        &self,
        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
    ) -> ProviderResult<()>
    where
        P: Copy,
        T: Table<Value = BlockNumberList>,
    {
        // Shard rewriting relies on plain key replacement via `upsert`, which
        // DUPSORT tables do not provide.
        assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");

        let mut cursor = self.tx.cursor_write::<T>()?;

        for (partial_key, indices) in index_updates {
            // Load the live shard (keyed by u64::MAX) for this partial key,
            // or start a fresh list if none exists yet.
            let last_key = sharded_key_factory(partial_key, u64::MAX);
            let mut last_shard = cursor
                .seek_exact(last_key.clone())?
                .map(|(_, list)| list)
                .unwrap_or_else(BlockNumberList::empty);

            last_shard.append(indices).map_err(ProviderError::other)?;

            // Fast path: still fits into a single shard — write it back as the
            // live shard and move on.
            if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
                cursor.upsert(last_key, &last_shard)?;
                continue;
            }

            // Overflow: split into full shards keyed by their highest block
            // number; the final (possibly partial) chunk becomes the new live
            // shard under u64::MAX.
            let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
            let mut chunks_peekable = chunks.into_iter().peekable();

            while let Some(chunk) = chunks_peekable.next() {
                let shard = BlockNumberList::new_pre_sorted(chunk);
                let highest_block_number = if chunks_peekable.peek().is_some() {
                    shard.iter().next_back().expect("`chunks` does not return empty list")
                } else {
                    // Last chunk stays addressable as the live shard.
                    u64::MAX
                };

                cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
            }
        }

        Ok(())
    }
}
1450
1451impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1452 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1453 if self.cached_storage_settings().use_hashed_state() {
1454 let hashed_address = keccak256(address);
1455 Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1456 } else {
1457 Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1458 }
1459 }
1460}
1461
1462impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1463 fn changed_accounts_with_range(
1464 &self,
1465 range: RangeInclusive<BlockNumber>,
1466 ) -> ProviderResult<BTreeSet<Address>> {
1467 let mut reader = EitherReader::new_account_changesets(self)?;
1468
1469 reader.changed_accounts_with_range(range)
1470 }
1471
1472 fn basic_accounts(
1473 &self,
1474 iter: impl IntoIterator<Item = Address>,
1475 ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1476 if self.cached_storage_settings().use_hashed_state() {
1477 let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
1478 Ok(iter
1479 .into_iter()
1480 .map(|address| {
1481 let hashed_address = keccak256(address);
1482 hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
1483 })
1484 .collect::<Result<Vec<_>, _>>()?)
1485 } else {
1486 let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1487 Ok(iter
1488 .into_iter()
1489 .map(|address| {
1490 plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
1491 })
1492 .collect::<Result<Vec<_>, _>>()?)
1493 }
1494 }
1495
1496 fn changed_accounts_and_blocks_with_range(
1497 &self,
1498 range: RangeInclusive<BlockNumber>,
1499 ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1500 let highest_static_block = self
1501 .static_file_provider
1502 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1503
1504 if let Some(highest) = highest_static_block &&
1505 self.cached_storage_settings().storage_v2
1506 {
1507 let start = *range.start();
1508 let static_end = (*range.end()).min(highest);
1509
1510 let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1511 if start <= static_end {
1512 for block in start..=static_end {
1513 let block_changesets = self.account_block_changeset(block)?;
1514 for changeset in block_changesets {
1515 changed_accounts_and_blocks
1516 .entry(changeset.address)
1517 .or_default()
1518 .push(block);
1519 }
1520 }
1521 }
1522
1523 Ok(changed_accounts_and_blocks)
1524 } else {
1525 let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1526
1527 let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1528 BTreeMap::new(),
1529 |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1530 let (index, account) = entry?;
1531 accounts.entry(account.address).or_default().push(index);
1532 Ok(accounts)
1533 },
1534 )?;
1535
1536 Ok(account_transitions)
1537 }
1538 }
1539}
1540
1541impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1542 fn storage_changeset(
1543 &self,
1544 block_number: BlockNumber,
1545 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1546 if self.cached_storage_settings().storage_v2 {
1547 self.static_file_provider.storage_changeset(block_number)
1548 } else {
1549 let range = block_number..=block_number;
1550 let storage_range = BlockNumberAddress::range(range);
1551 self.tx
1552 .cursor_dup_read::<tables::StorageChangeSets>()?
1553 .walk_range(storage_range)?
1554 .map(|r| {
1555 let (bna, entry) = r?;
1556 Ok((bna, entry))
1557 })
1558 .collect()
1559 }
1560 }
1561
1562 fn get_storage_before_block(
1563 &self,
1564 block_number: BlockNumber,
1565 address: Address,
1566 storage_key: B256,
1567 ) -> ProviderResult<Option<StorageEntry>> {
1568 if self.cached_storage_settings().storage_v2 {
1569 self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1570 } else {
1571 Ok(self
1572 .tx
1573 .cursor_dup_read::<tables::StorageChangeSets>()?
1574 .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1575 .filter(|entry| entry.key == storage_key))
1576 }
1577 }
1578
1579 fn storage_changesets_range(
1580 &self,
1581 range: impl RangeBounds<BlockNumber>,
1582 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1583 if self.cached_storage_settings().storage_v2 {
1584 self.static_file_provider.storage_changesets_range(range)
1585 } else {
1586 self.tx
1587 .cursor_dup_read::<tables::StorageChangeSets>()?
1588 .walk_range(BlockNumberAddressRange::from(range))?
1589 .map(|r| {
1590 let (bna, entry) = r?;
1591 Ok((bna, entry))
1592 })
1593 .collect()
1594 }
1595 }
1596}
1597
1598impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1599 fn account_block_changeset(
1600 &self,
1601 block_number: BlockNumber,
1602 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1603 if self.cached_storage_settings().storage_v2 {
1604 let static_changesets =
1605 self.static_file_provider.account_block_changeset(block_number)?;
1606 Ok(static_changesets)
1607 } else {
1608 let range = block_number..=block_number;
1609 self.tx
1610 .cursor_read::<tables::AccountChangeSets>()?
1611 .walk_range(range)?
1612 .map(|result| -> ProviderResult<_> {
1613 let (_, account_before) = result?;
1614 Ok(account_before)
1615 })
1616 .collect()
1617 }
1618 }
1619
1620 fn get_account_before_block(
1621 &self,
1622 block_number: BlockNumber,
1623 address: Address,
1624 ) -> ProviderResult<Option<AccountBeforeTx>> {
1625 if self.cached_storage_settings().storage_v2 {
1626 Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1627 } else {
1628 self.tx
1629 .cursor_dup_read::<tables::AccountChangeSets>()?
1630 .seek_by_key_subkey(block_number, address)?
1631 .filter(|acc| acc.address == address)
1632 .map(Ok)
1633 .transpose()
1634 }
1635 }
1636
1637 fn account_changesets_range(
1638 &self,
1639 range: impl core::ops::RangeBounds<BlockNumber>,
1640 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1641 if self.cached_storage_settings().storage_v2 {
1642 self.static_file_provider.account_changesets_range(range)
1643 } else {
1644 self.tx
1645 .cursor_read::<tables::AccountChangeSets>()?
1646 .walk_range(to_range(range))?
1647 .map(|r| r.map_err(Into::into))
1648 .collect()
1649 }
1650 }
1651}
1652
1653impl<Tx: DbTx + 'static, N: NodeTypesForProvider> StateReader for DatabaseProvider<Tx, N> {
1654 type Receipt = ReceiptTy<N>;
1655
1656 fn get_state(
1657 &self,
1658 block: BlockNumber,
1659 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1660 let Some(block_body) = self.block_body_indices(block)? else { return Ok(None) };
1661
1662 let from_transaction_num = block_body.first_tx_num();
1663 let to_transaction_num = block_body.last_tx_num();
1664
1665 let account_changeset = self.account_changesets_range(block..=block)?;
1666 let storage_changeset = self.storage_changeset(block)?;
1667
1668 let Some(block_hash) = self.block_hash(block)? else { return Ok(None) };
1669 let state_provider = self.history_by_block_hash(block_hash)?;
1670 let (state, reverts) = self.populate_bundle_state_with_provider(
1671 account_changeset,
1672 storage_changeset,
1673 state_provider,
1674 )?;
1675
1676 let receipts = self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?;
1677
1678 Ok(Some(ExecutionOutcome::new_init(
1679 state,
1680 reverts,
1681 Vec::new(),
1683 vec![receipts],
1684 block,
1685 Vec::new(),
1686 )))
1687 }
1688}
1689
impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
    for DatabaseProvider<TX, N>
{
    type Header = HeaderTy<N>;

    /// Returns the sealed header of the local tip after reconciling the
    /// header static files with `highest_uninterrupted_block`.
    ///
    /// Headers persisted beyond the uninterrupted chain are pruned so the
    /// static files end exactly at the tip; static files ending *below* the
    /// expected next block indicate a gap and produce an error.
    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        let static_file_provider = self.static_file_provider();

        // Next block the header static files would append vs. the next block
        // of the uninterrupted chain.
        let next_static_file_block_num = static_file_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .map(|id| id + 1)
            .unwrap_or_default();
        let next_block = highest_uninterrupted_block + 1;

        match next_static_file_block_num.cmp(&next_block) {
            Ordering::Greater => {
                // Static files are ahead: drop the surplus headers and commit.
                let mut static_file_producer =
                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
                static_file_producer.commit()?
            }
            Ordering::Less => {
                // Static files are behind the claimed tip: a header is missing.
                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
            }
            Ordering::Equal => {}
        }

        let local_head = static_file_provider
            .sealed_header(highest_uninterrupted_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;

        Ok(local_head)
    }
}
1734
1735impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1736 type Header = HeaderTy<N>;
1737
1738 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1739 if let Some(num) = self.block_number(block_hash)? {
1740 Ok(self.header_by_number(num)?)
1741 } else {
1742 Ok(None)
1743 }
1744 }
1745
1746 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1747 self.static_file_provider.header_by_number(num)
1748 }
1749
1750 fn headers_range(
1751 &self,
1752 range: impl RangeBounds<BlockNumber>,
1753 ) -> ProviderResult<Vec<Self::Header>> {
1754 self.static_file_provider.headers_range(range)
1755 }
1756
1757 fn sealed_header(
1758 &self,
1759 number: BlockNumber,
1760 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1761 self.static_file_provider.sealed_header(number)
1762 }
1763
1764 fn sealed_headers_while(
1765 &self,
1766 range: impl RangeBounds<BlockNumber>,
1767 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1768 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1769 self.static_file_provider.sealed_headers_while(range, predicate)
1770 }
1771}
1772
impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
    /// Returns the canonical hash for `number`, delegated to the static file
    /// provider.
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.block_hash(number)
    }

    /// Returns the canonical hashes for the `start`/`end` span, delegated to
    /// the static file provider.
    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.canonical_hashes_range(start, end)
    }
}
1786
1787impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1788 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1789 let best_number = self.best_block_number()?;
1790 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1791 Ok(ChainInfo { best_hash, best_number })
1792 }
1793
1794 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1795 Ok(self
1798 .get_stage_checkpoint(StageId::Finish)?
1799 .map(|checkpoint| checkpoint.block_number)
1800 .unwrap_or_default())
1801 }
1802
1803 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1804 self.static_file_provider.last_block_number()
1805 }
1806
1807 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1808 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1809 }
1810}
1811
impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
    type Block = BlockTy<N>;

    /// Only canonical blocks are stored here, so any non-canonical `source`
    /// yields `None`.
    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        if source.is_canonical() {
            self.block(hash.into())
        } else {
            Ok(None)
        }
    }

    /// Assembles the full block (header + body) for `id`.
    ///
    /// Returns [`ProviderError::BlockExpired`] for blocks below the earliest
    /// retained history height, `Ok(None)` when the number, header, or
    /// transactions are unknown.
    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        if let Some(number) = self.convert_hash_or_number(id)? {
            let earliest_available = self.static_file_provider.earliest_history_height();
            if number < earliest_available {
                return Err(ProviderError::BlockExpired { requested: number, earliest_available })
            }

            let Some(header) = self.header_by_number(number)? else { return Ok(None) };

            let Some(transactions) = self.transactions_by_block(number.into())? else {
                return Ok(None)
            };

            // Body assembly is chain-specific and delegated to the storage
            // reader; an empty result here indicates broken storage output.
            let body = self
                .storage
                .reader()
                .read_block_bodies(self, vec![(&header, transactions)])?
                .pop()
                .ok_or(ProviderError::InvalidStorageOutput)?;

            return Ok(Some(Self::Block::new(header, body)))
        }

        Ok(None)
    }

    /// The database provider has no notion of a pending block.
    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(None)
    }

    /// The database provider has no notion of a pending block.
    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(None)
    }

    /// Builds a [`RecoveredBlock`] from an unsealed header, delegating lookup
    /// and sender collection to the generic inherent helper of the same name.
    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.header_by_number(block_number),
            |header, body, senders| {
                Self::Block::new(header, body)
                    // Senders were read from storage or recovered unchecked by
                    // the helper, so skip re-verification here.
                    .try_into_recovered_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Like [`Self::recovered_block`], but starts from the sealed header so
    /// the block hash is carried along.
    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.sealed_header(block_number),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Assembles plain blocks for `range` via the generic range helper.
    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        self.block_range(
            range,
            |range| self.headers_range(range),
            |header, body, _| Ok(Self::Block::new(header, body)),
        )
    }

    /// Assembles recovered blocks for `range` with unchecked sender recovery.
    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.headers_range(range),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Assembles sealed recovered blocks for `range`; unlike
    /// [`Self::block_with_senders_range`] this uses the checked sender
    /// attachment.
    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.sealed_headers_range(range),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Maps a transaction number to the block containing it via the
    /// `TransactionBlocks` index.
    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Ok(self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .seek(id)
            .map(|b| b.map(|(_, bn)| bn))?)
    }
}
1970
impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
    for DatabaseProvider<TX, N>
{
    /// Returns `(hash, tx number)` pairs for `tx_range`, delegated to the
    /// static file provider.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        self.static_file_provider.transaction_hashes_by_range(tx_range)
    }
}
1983
impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
    type Transaction = TxTy<N>;

    /// Resolves a transaction hash to its number via the hash→number index,
    /// read through a consistent RocksDB snapshot.
    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
        self.with_rocksdb_snapshot(|rocksdb_ref| {
            let mut reader = EitherReader::new_transaction_hash_numbers(self, rocksdb_ref)?;
            reader.get_transaction_hash_number(tx_hash)
        })
    }

    /// Fetches a transaction by number from the static files.
    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id(id)
    }

    /// Like [`Self::transaction_by_id`] but skips hash computation.
    fn transaction_by_id_unhashed(
        &self,
        id: TxNumber,
    ) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id_unhashed(id)
    }

    /// Resolves the hash to a number, then fetches the transaction.
    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
        if let Some(id) = self.transaction_id(hash)? {
            Ok(self.transaction_by_id_unhashed(id)?)
        } else {
            Ok(None)
        }
    }

    /// Fetches a transaction together with its block metadata (block
    /// hash/number, in-block index, header-derived fields).
    ///
    /// Returns `Ok(None)` if any step of the hash → number → block → header
    /// resolution comes up empty.
    fn transaction_by_hash_with_meta(
        &self,
        tx_hash: TxHash,
    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
            let Some(sealed_header) = self.sealed_header(block_number)?
        {
            let (header, block_hash) = sealed_header.split();
            if let Some(block_body) = self.block_body_indices(block_number)? {
                // Position of the transaction within its block.
                let index = transaction_id - block_body.first_tx_num();

                let meta = TransactionMeta {
                    tx_hash,
                    index,
                    block_hash,
                    block_number,
                    base_fee: header.base_fee_per_gas(),
                    excess_blob_gas: header.excess_blob_gas(),
                    timestamp: header.timestamp(),
                };

                return Ok(Some((transaction, meta)))
            }
        }

        Ok(None)
    }

    /// Returns all transactions of the block identified by `id`, or `None`
    /// when the block is unknown.
    fn transactions_by_block(
        &self,
        id: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
        if let Some(block_number) = self.convert_hash_or_number(id)? &&
            let Some(body) = self.block_body_indices(block_number)?
        {
            let tx_range = body.tx_num_range();
            return if tx_range.is_empty() {
                Ok(Some(Vec::new()))
            } else {
                self.transactions_by_tx_range(tx_range).map(Some)
            }
        }
        Ok(None)
    }

    /// Returns the transactions of every block in `range`, one `Vec` per
    /// block (empty for transaction-less blocks).
    fn transactions_by_block_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
        let range = to_range(range);

        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
            .into_iter()
            .map(|body| {
                let tx_num_range = body.tx_num_range();
                if tx_num_range.is_empty() {
                    Ok(Vec::new())
                } else {
                    self.transactions_by_tx_range(tx_num_range)
                }
            })
            .collect()
    }

    /// Fetches a raw range of transactions by number from the static files.
    fn transactions_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Transaction>> {
        self.static_file_provider.transactions_by_tx_range(range)
    }

    /// Reads senders for `range` from whichever destination (static file or
    /// database table) senders are currently written to.
    fn senders_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Address>> {
        if EitherWriterDestination::senders(self).is_static_file() {
            self.static_file_provider.senders_by_tx_range(range)
        } else {
            self.cursor_read_collect::<tables::TransactionSenders>(range)
        }
    }

    /// Reads a single sender, honoring the same destination split as
    /// [`Self::senders_by_tx_range`].
    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
        if EitherWriterDestination::senders(self).is_static_file() {
            self.static_file_provider.transaction_sender(id)
        } else {
            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
        }
    }
}
2110
2111impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
2112 type Receipt = ReceiptTy<N>;
2113
2114 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2115 self.static_file_provider.get_with_static_file_or_database(
2116 StaticFileSegment::Receipts,
2117 id,
2118 |static_file| static_file.receipt(id),
2119 || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
2120 )
2121 }
2122
2123 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2124 if let Some(id) = self.transaction_id(hash)? {
2125 self.receipt(id)
2126 } else {
2127 Ok(None)
2128 }
2129 }
2130
2131 fn receipts_by_block(
2132 &self,
2133 block: BlockHashOrNumber,
2134 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2135 if let Some(number) = self.convert_hash_or_number(block)? &&
2136 let Some(body) = self.block_body_indices(number)?
2137 {
2138 let tx_range = body.tx_num_range();
2139 return if tx_range.is_empty() {
2140 Ok(Some(Vec::new()))
2141 } else {
2142 self.receipts_by_tx_range(tx_range).map(Some)
2143 }
2144 }
2145 Ok(None)
2146 }
2147
2148 fn receipts_by_tx_range(
2149 &self,
2150 range: impl RangeBounds<TxNumber>,
2151 ) -> ProviderResult<Vec<Self::Receipt>> {
2152 self.static_file_provider.get_range_with_static_file_or_database(
2153 StaticFileSegment::Receipts,
2154 to_range(range),
2155 |static_file, range, _| static_file.receipts_by_tx_range(range),
2156 |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
2157 |_| true,
2158 )
2159 }
2160
2161 fn receipts_by_block_range(
2162 &self,
2163 block_range: RangeInclusive<BlockNumber>,
2164 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2165 if block_range.is_empty() {
2166 return Ok(Vec::new());
2167 }
2168
2169 let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
2171 let mut block_body_indices = Vec::with_capacity(range_len);
2172 for block_num in block_range {
2173 if let Some(indices) = self.block_body_indices(block_num)? {
2174 block_body_indices.push(indices);
2175 } else {
2176 block_body_indices.push(StoredBlockBodyIndices::default());
2178 }
2179 }
2180
2181 if block_body_indices.is_empty() {
2182 return Ok(Vec::new());
2183 }
2184
2185 let non_empty_blocks: Vec<_> =
2187 block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
2188
2189 if non_empty_blocks.is_empty() {
2190 return Ok(vec![Vec::new(); block_body_indices.len()]);
2192 }
2193
2194 let first_tx = non_empty_blocks[0].first_tx_num();
2196 let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
2197
2198 let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
2200 let mut receipts_iter = all_receipts.into_iter();
2201
2202 let mut result = Vec::with_capacity(block_body_indices.len());
2204 for indices in &block_body_indices {
2205 if indices.tx_count == 0 {
2206 result.push(Vec::new());
2207 } else {
2208 let block_receipts =
2209 receipts_iter.by_ref().take(indices.tx_count as usize).collect();
2210 result.push(block_receipts);
2211 }
2212 }
2213
2214 Ok(result)
2215 }
2216}
2217
impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
    for DatabaseProvider<TX, N>
{
    /// Returns the stored body indices (first tx number, tx count) for `num`,
    /// if the block is known.
    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
    }

    /// Returns the body indices for every block present in `range`.
    fn block_body_indices_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
    }
}
2232
2233impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2234 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2235 Ok(if let Some(encoded) = id.get_pre_encoded() {
2236 self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2237 } else {
2238 self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2239 })
2240 }
2241
2242 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2244 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2245 }
2246
2247 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2248 self.tx
2249 .cursor_read::<tables::StageCheckpoints>()?
2250 .walk(None)?
2251 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2252 .map_err(ProviderError::Database)
2253 }
2254}
2255
2256impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2257 fn save_stage_checkpoint(
2259 &self,
2260 id: StageId,
2261 checkpoint: StageCheckpoint,
2262 ) -> ProviderResult<()> {
2263 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2264 }
2265
2266 fn save_stage_checkpoint_progress(
2268 &self,
2269 id: StageId,
2270 checkpoint: Vec<u8>,
2271 ) -> ProviderResult<()> {
2272 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2273 }
2274
2275 #[instrument(level = "debug", target = "providers::db", skip_all)]
2276 fn update_pipeline_stages(
2277 &self,
2278 block_number: BlockNumber,
2279 drop_stage_checkpoint: bool,
2280 ) -> ProviderResult<()> {
2281 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2283 for stage_id in StageId::ALL {
2284 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2285 cursor.upsert(
2286 stage_id.to_string(),
2287 &StageCheckpoint {
2288 block_number,
2289 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2290 },
2291 )?;
2292 }
2293
2294 Ok(())
2295 }
2296}
2297
impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
    /// Reads the current value of every requested storage slot, grouped per address.
    ///
    /// When the hashed-state setting is active the lookup goes through `HashedStorages`
    /// with keccak-hashed address/slot keys; otherwise it reads `PlainStorageState` with
    /// the raw keys. In both cases the returned `StorageEntry` keeps the caller's
    /// original (un-hashed) key, and a missing slot yields a zero/default value.
    fn plain_state_storages(
        &self,
        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
        if self.cached_storage_settings().use_hashed_state() {
            let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;

            addresses_with_keys
                .into_iter()
                .map(|(address, storage)| {
                    let hashed_address = keccak256(address);
                    storage
                        .into_iter()
                        .map(|key| -> ProviderResult<_> {
                            let hashed_key = keccak256(key);
                            // `seek_by_key_subkey` may land on the next subkey in the dup
                            // list; the `filter` keeps only an exact slot match.
                            let value = hashed_storage
                                .seek_by_key_subkey(hashed_address, hashed_key)?
                                .filter(|v| v.key == hashed_key)
                                .map(|v| v.value)
                                .unwrap_or_default();
                            Ok(StorageEntry { key, value })
                        })
                        .collect::<ProviderResult<Vec<_>>>()
                        .map(|storage| (address, storage))
                })
                .collect::<ProviderResult<Vec<(_, _)>>>()
        } else {
            let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;

            addresses_with_keys
                .into_iter()
                .map(|(address, storage)| {
                    storage
                        .into_iter()
                        .map(|key| -> ProviderResult<_> {
                            // Same exact-subkey discipline as the hashed branch: a near
                            // miss from the dup cursor is treated as "slot not present".
                            Ok(plain_storage
                                .seek_by_key_subkey(address, key)?
                                .filter(|v| v.key == key)
                                .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
                        })
                        .collect::<ProviderResult<Vec<_>>>()
                        .map(|storage| (address, storage))
                })
                .collect::<ProviderResult<Vec<(_, _)>>>()
        }
    }

    /// Aggregates, for each address, the set of storage slots that changed anywhere in
    /// `range`, by folding over the storage changesets (static-file-backed under
    /// storage_v2, MDBX `StorageChangeSets` table otherwise).
    fn changed_storages_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
        if self.cached_storage_settings().storage_v2 {
            self.storage_changesets_range(range)?.into_iter().try_fold(
                BTreeMap::new(),
                |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
                    let (BlockNumberAddress((_, address)), storage_entry) = entry;
                    accounts.entry(address).or_default().insert(storage_entry.key);
                    Ok(accounts)
                },
            )
        } else {
            self.tx
                .cursor_read::<tables::StorageChangeSets>()?
                .walk_range(BlockNumberAddress::range(range))?
                .try_fold(
                    BTreeMap::new(),
                    |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
                        let (BlockNumberAddress((_, address)), storage_entry) = entry?;
                        accounts.entry(address).or_default().insert(storage_entry.key);
                        Ok(accounts)
                    },
                )
        }
    }

    /// Maps each changed (address, slot) pair to the list of block numbers in `range`
    /// at which it changed. Block numbers are pushed in changeset-walk order
    /// (ascending by block within the range).
    fn changed_storages_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
        if self.cached_storage_settings().storage_v2 {
            self.storage_changesets_range(range)?.into_iter().try_fold(
                BTreeMap::new(),
                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
                    storages
                        .entry((index.address(), storage.key))
                        .or_default()
                        .push(index.block_number());
                    Ok(storages)
                },
            )
        } else {
            let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;

            let storage_changeset_lists =
                changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
                    BTreeMap::new(),
                    |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
                     entry|
                     -> ProviderResult<_> {
                        let (index, storage) = entry?;
                        storages
                            .entry((index.address(), storage.key))
                            .or_default()
                            .push(index.block_number());
                        Ok(storages)
                    },
                )?;

            Ok(storage_changeset_lists)
        }
    }
}
2413
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
    for DatabaseProvider<TX, N>
{
    type Receipt = ReceiptTy<N>;

    /// Writes an execution outcome to storage: state reverts (changesets), plain state
    /// changes, bytecodes, and — when `config.write_receipts` is set — receipts, subject
    /// to the configured prune modes.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_state<'a>(
        &self,
        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
        is_value_known: OriginalValuesKnown,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        let execution_outcome = execution_outcome.into();

        // Fast path: with hashed state enabled and nothing else requested, only new
        // bytecodes need to be persisted here.
        if self.cached_storage_settings().use_hashed_state() &&
            !config.write_receipts &&
            !config.write_account_changesets &&
            !config.write_storage_changesets
        {
            self.write_bytecodes(
                execution_outcome.state().contracts.iter().map(|(h, b)| (*h, Bytecode(b.clone()))),
            )?;
            return Ok(());
        }

        let first_block = execution_outcome.first_block();
        // Split the bundle into forward (plain) state and the reverts needed to unwind it.
        let (plain_state, reverts) =
            execution_outcome.state().to_plain_state_and_reverts(is_value_known);

        self.write_state_reverts(reverts, first_block, config)?;
        self.write_state_changes(plain_state)?;

        if !config.write_receipts {
            return Ok(());
        }

        let block_count = execution_outcome.len() as u64;
        let last_block = execution_outcome.last_block();
        let block_range = first_block..=last_block;

        // Tip used for prune-distance decisions; at least the last block being written.
        let tip = self.last_block_number()?.max(last_block);

        // First tx number of each block in the range, used to compute receipt indices.
        let block_indices: Vec<_> = self
            .block_body_indices_range(block_range)?
            .into_iter()
            .map(|b| b.first_tx_num)
            .collect();

        // Every block in the outcome must have stored body indices; report the first
        // missing block number otherwise.
        if block_indices.len() < block_count as usize {
            let missing_blocks = block_count - block_indices.len() as u64;
            return Err(ProviderError::BlockBodyIndicesNotFound(
                last_block.saturating_sub(missing_blocks - 1),
            ));
        }

        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;

        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;

        // Receipts may be pruned only when writing to the database (or no receipt
        // static files exist yet) and the block is old enough per the minimum distance.
        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
            self.static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none()) &&
            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);

        // Seed the allow-list with contract addresses whose log filter activates before
        // the first block of this batch.
        let mut allowed_addresses: AddressSet = AddressSet::default();
        for (_, addresses) in contract_log_pruner.range(..first_block) {
            allowed_addresses.extend(addresses.iter().copied());
        }

        for (idx, (receipts, first_tx_index)) in
            execution_outcome.receipts().zip(block_indices).enumerate()
        {
            let block_number = first_block + idx as u64;

            // Advance the writer even for pruned blocks so block/tx bookkeeping stays aligned.
            receipts_writer.increment_block(block_number)?;

            // Skip the entire block if the receipts prune mode says so.
            if prunable_receipts &&
                self.prune_modes
                    .receipts
                    .is_some_and(|mode| mode.should_prune(block_number, tip))
            {
                continue
            }

            // Activate any log filters that start at this block.
            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
                allowed_addresses.extend(new_addresses.iter().copied());
            }

            for (idx, receipt) in receipts.iter().enumerate() {
                let receipt_idx = first_tx_index + idx as u64;
                // Drop receipts with no log from an allow-listed contract when a log
                // filter is in force and the block is prunable.
                if prunable_receipts &&
                    has_contract_log_filter &&
                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
                {
                    continue
                }

                receipts_writer.append_receipt(receipt_idx, receipt)?;
            }
        }

        Ok(())
    }

    /// Writes storage and account changesets (the pre-state needed to unwind), honoring
    /// the per-kind flags in `config`. Changesets for block `first_block + i` come from
    /// `reverts.storage[i]` / `reverts.accounts[i]`.
    fn write_state_reverts(
        &self,
        reverts: PlainStateReverts,
        first_block: BlockNumber,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        if config.write_storage_changesets {
            tracing::trace!("Writing storage changes");
            let mut storages_cursor =
                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
            for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
                let block_number = first_block + block_index as BlockNumber;

                tracing::trace!(block_number, "Writing block change");
                // Sort by address so changesets are emitted in key order.
                storage_changes.par_sort_unstable_by_key(|a| a.address);
                let total_changes =
                    storage_changes.iter().map(|change| change.storage_revert.len()).sum();
                let mut changeset = Vec::with_capacity(total_changes);
                for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
                    let mut storage = storage_revert
                        .into_iter()
                        .map(|(k, v)| (B256::from(k.to_be_bytes()), v))
                        .collect::<Vec<_>>();
                    storage.par_sort_unstable_by_key(|a| a.0);

                    // If the account's storage was wiped, capture the entire current
                    // plain storage so the wipe can be reverted later.
                    let mut wiped_storage = Vec::new();
                    if wiped {
                        tracing::trace!(?address, "Wiping storage");
                        if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
                            wiped_storage.push((entry.key, entry.value));
                            while let Some(entry) = storages_cursor.next_dup_val()? {
                                wiped_storage.push((entry.key, entry.value))
                            }
                        }
                    }

                    tracing::trace!(?address, ?storage, "Writing storage reverts");
                    // Merge explicit reverts with the wiped snapshot into one ordered stream.
                    for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
                        changeset.push(StorageBeforeTx { address, key, value });
                    }
                }

                let mut storage_changesets_writer =
                    EitherWriter::new_storage_changesets(self, block_number)?;
                storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
            }
        }

        if !config.write_account_changesets {
            return Ok(());
        }

        tracing::trace!(?first_block, "Writing account changes");
        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;
            let changeset = account_block_reverts
                .into_iter()
                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
                .collect::<Vec<_>>();
            let mut account_changesets_writer =
                EitherWriter::new_account_changesets(self, block_number)?;

            account_changesets_writer.append_account_changeset(block_number, changeset)?;
        }

        Ok(())
    }

    /// Applies forward plain-state changes (accounts, storage, bytecodes). When hashed
    /// state is in use, only bytecodes are written here — the plain tables are skipped.
    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
        // Sort all change lists so cursor writes proceed in key order.
        changes.accounts.par_sort_by_key(|a| a.0);
        changes.storage.par_sort_by_key(|a| a.address);
        changes.contracts.par_sort_by_key(|a| a.0);

        if !self.cached_storage_settings().use_hashed_state() {
            tracing::trace!(len = changes.accounts.len(), "Writing new account state");
            let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
            for (address, account) in changes.accounts {
                // `None` account means the account was destroyed — delete if present.
                if let Some(account) = account {
                    tracing::trace!(?address, "Updating plain state account");
                    accounts_cursor.upsert(address, &account.into())?;
                } else if accounts_cursor.seek_exact(address)?.is_some() {
                    tracing::trace!(?address, "Deleting plain state account");
                    accounts_cursor.delete_current()?;
                }
            }

            tracing::trace!(len = changes.storage.len(), "Writing new storage state");
            let mut storages_cursor =
                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
            for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
                // A wipe removes every existing slot before the new values are written.
                if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
                    storages_cursor.delete_current_duplicates()?;
                }
                let mut storage = storage
                    .into_iter()
                    .map(|(k, value)| StorageEntry { key: k.into(), value })
                    .collect::<Vec<_>>();
                storage.par_sort_unstable_by_key(|a| a.key);

                for entry in storage {
                    tracing::trace!(?address, ?entry.key, "Updating plain state storage");
                    // Delete-then-insert: dup tables cannot update a subkey in place.
                    if let Some(db_entry) =
                        storages_cursor.seek_by_key_subkey(address, entry.key)? &&
                        db_entry.key == entry.key
                    {
                        storages_cursor.delete_current()?;
                    }

                    // Zero values are not stored; deletion above already cleared the slot.
                    if !entry.value.is_zero() {
                        storages_cursor.upsert(address, &entry)?;
                    }
                }
            }
        }

        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
        self.write_bytecodes(
            changes.contracts.into_iter().map(|(hash, bytecode)| (hash, Bytecode(bytecode))),
        )?;

        Ok(())
    }

    /// Writes a sorted hashed post-state into `HashedAccounts`/`HashedStorages`,
    /// deleting entries whose new value is `None`/zero.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
        // Write hashed account updates; `None` means the account no longer exists.
        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in hashed_state.accounts() {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }

        // Iterate storages in hashed-address order for sequential cursor access.
        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
        let mut hashed_storage_cursor =
            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
        for (hashed_address, storage) in sorted_storages {
            // A wiped storage clears all existing slots before new ones are written.
            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_storage_cursor.delete_current_duplicates()?;
            }

            for (hashed_slot, value) in storage.storage_slots_ref() {
                let entry = StorageEntry { key: *hashed_slot, value: *value };

                // Delete-then-insert: dup tables cannot update a subkey in place.
                if let Some(db_entry) =
                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
                    db_entry.key == entry.key
                {
                    hashed_storage_cursor.delete_current()?;
                }

                if !entry.value.is_zero() {
                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
                }
            }
        }

        Ok(())
    }

    /// Unwinds state to `block` (exclusive of removal): restores pre-state from the
    /// changesets above `block`, prunes those changesets, and removes the affected
    /// receipts. Discards the unwound state rather than returning it.
    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
        let range = block + 1..=self.last_block_number()?;

        if range.is_empty() {
            return Ok(());
        }

        let block_bodies = self.block_body_indices_range(range.clone())?;

        // First tx of the first removed block — everything from here on is unwound.
        let from_transaction_num =
            block_bodies.first().expect("already checked if there are blocks").first_tx_num();

        let storage_range = BlockNumberAddress::range(range.clone());
        // storage_v2 keeps changesets in static files: read them, then prune the writer
        // back to `block`. Legacy storage takes (reads + deletes) the MDBX table rows.
        let storage_changeset = if self.cached_storage_settings().storage_v2 {
            let changesets = self.storage_changesets_range(range.clone())?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
            changeset_writer.prune_storage_changesets(block)?;
            changesets
        } else {
            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
        };
        let account_changeset = if self.cached_storage_settings().storage_v2 {
            let changesets = self.account_changesets_range(range)?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
            changeset_writer.prune_account_changesets(block)?;
            changesets
        } else {
            self.take::<tables::AccountChangeSets>(range)?
        };

        if self.cached_storage_settings().use_hashed_state() {
            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;

            // Resolve each touched key's (old, new) pair from the changesets.
            let (state, _) = self.populate_bundle_state_hashed(
                account_changeset,
                storage_changeset,
                &mut hashed_accounts_cursor,
                &mut hashed_storage_cursor,
            )?;

            // Restore the pre-state: write old values back, delete what didn't exist.
            for (address, (old_account, new_account, storage)) in &state {
                if old_account != new_account {
                    let hashed_address = keccak256(address);
                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
                    if let Some(account) = old_account {
                        hashed_accounts_cursor.upsert(hashed_address, account)?;
                    } else if existing_entry.is_some() {
                        hashed_accounts_cursor.delete_current()?;
                    }
                }

                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                    let hashed_address = keccak256(address);
                    let hashed_storage_key = keccak256(storage_key);
                    let storage_entry =
                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
                    if hashed_storage_cursor
                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
                        .is_some_and(|s| s.key == hashed_storage_key)
                    {
                        hashed_storage_cursor.delete_current()?
                    }

                    // Zero old value means the slot did not exist before — leave it deleted.
                    if !old_storage_value.is_zero() {
                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
                    }
                }
            }
        } else {
            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
            let mut plain_storage_cursor =
                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;

            let (state, _) = self.populate_bundle_state_plain(
                account_changeset,
                storage_changeset,
                &mut plain_accounts_cursor,
                &mut plain_storage_cursor,
            )?;

            // Same restore logic as the hashed branch, but on the plain (un-hashed) tables.
            for (address, (old_account, new_account, storage)) in &state {
                if old_account != new_account {
                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
                    if let Some(account) = old_account {
                        plain_accounts_cursor.upsert(*address, account)?;
                    } else if existing_entry.is_some() {
                        plain_accounts_cursor.delete_current()?;
                    }
                }

                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                    let storage_entry =
                        StorageEntry { key: *storage_key, value: *old_storage_value };
                    if plain_storage_cursor
                        .seek_by_key_subkey(*address, *storage_key)?
                        .is_some_and(|s| s.key == *storage_key)
                    {
                        plain_storage_cursor.delete_current()?
                    }

                    if !old_storage_value.is_zero() {
                        plain_storage_cursor.upsert(*address, &storage_entry)?;
                    }
                }
            }
        }

        self.remove_receipts_from(from_transaction_num, block)?;

        Ok(())
    }

    /// Like `remove_state_above`, but additionally collects the unwound state, reverts,
    /// and receipts into an `ExecutionOutcome` starting at `block + 1`.
    fn take_state_above(
        &self,
        block: BlockNumber,
    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
        let range = block + 1..=self.last_block_number()?;

        if range.is_empty() {
            return Ok(ExecutionOutcome::default())
        }
        let start_block_number = *range.start();

        let block_bodies = self.block_body_indices_range(range.clone())?;

        // Tx-number bounds of the unwound blocks, used to gather their receipts.
        let from_transaction_num =
            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
        let to_transaction_num =
            block_bodies.last().expect("already checked if there are blocks").last_tx_num();

        let storage_range = BlockNumberAddress::range(range.clone());
        // storage_v2 with existing static files: read the changesets from static files
        // (up to their highest block) and prune the writer back to `block`. Otherwise
        // take the rows from the MDBX table.
        let storage_changeset = if let Some(highest_block) = self
            .static_file_provider
            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
            self.cached_storage_settings().storage_v2
        {
            let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
            changeset_writer.prune_storage_changesets(block)?;
            changesets
        } else {
            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
        };

        let highest_changeset_block = self
            .static_file_provider
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
        let account_changeset = if let Some(highest_block) = highest_changeset_block &&
            self.cached_storage_settings().storage_v2
        {
            let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
            changeset_writer.prune_account_changesets(block)?;

            changesets
        } else {
            self.take::<tables::AccountChangeSets>(range)?
        };

        // Restore pre-state exactly as in `remove_state_above`, but keep the computed
        // (state, reverts) pair for the returned outcome.
        let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;

            let (state, reverts) = self.populate_bundle_state_hashed(
                account_changeset,
                storage_changeset,
                &mut hashed_accounts_cursor,
                &mut hashed_storage_cursor,
            )?;

            for (address, (old_account, new_account, storage)) in &state {
                if old_account != new_account {
                    let hashed_address = keccak256(address);
                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
                    if let Some(account) = old_account {
                        hashed_accounts_cursor.upsert(hashed_address, account)?;
                    } else if existing_entry.is_some() {
                        hashed_accounts_cursor.delete_current()?;
                    }
                }

                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                    let hashed_address = keccak256(address);
                    let hashed_storage_key = keccak256(storage_key);
                    let storage_entry =
                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
                    if hashed_storage_cursor
                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
                        .is_some_and(|s| s.key == hashed_storage_key)
                    {
                        hashed_storage_cursor.delete_current()?
                    }

                    if !old_storage_value.is_zero() {
                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
                    }
                }
            }

            (state, reverts)
        } else {
            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
            let mut plain_storage_cursor =
                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;

            let (state, reverts) = self.populate_bundle_state_plain(
                account_changeset,
                storage_changeset,
                &mut plain_accounts_cursor,
                &mut plain_storage_cursor,
            )?;

            for (address, (old_account, new_account, storage)) in &state {
                if old_account != new_account {
                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
                    if let Some(account) = old_account {
                        plain_accounts_cursor.upsert(*address, account)?;
                    } else if existing_entry.is_some() {
                        plain_accounts_cursor.delete_current()?;
                    }
                }

                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                    let storage_entry =
                        StorageEntry { key: *storage_key, value: *old_storage_value };
                    if plain_storage_cursor
                        .seek_by_key_subkey(*address, *storage_key)?
                        .is_some_and(|s| s.key == *storage_key)
                    {
                        plain_storage_cursor.delete_current()?
                    }

                    if !old_storage_value.is_zero() {
                        plain_storage_cursor.upsert(*address, &storage_entry)?;
                    }
                }
            }

            (state, reverts)
        };

        // Gather the receipts of the unwound transactions, preferring static files
        // when they cover the range.
        let mut receipts_iter = self
            .static_file_provider
            .get_range_with_static_file_or_database(
                StaticFileSegment::Receipts,
                from_transaction_num..to_transaction_num + 1,
                |static_file, range, _| {
                    static_file
                        .receipts_by_tx_range(range.clone())
                        .map(|r| range.into_iter().zip(r).collect())
                },
                |range, _| {
                    self.tx
                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
                        .walk_range(range)?
                        .map(|r| r.map_err(Into::into))
                        .collect()
                },
                |_| true,
            )?
            .into_iter()
            .peekable();

        // Regroup the flat (tx_num, receipt) stream per block; receipts missing from
        // storage (e.g. pruned) are simply absent from their block's list.
        let mut receipts = Vec::with_capacity(block_bodies.len());
        for block_body in block_bodies {
            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
            for num in block_body.tx_num_range() {
                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
                    block_receipts.push(receipts_iter.next().unwrap().1);
                }
            }
            receipts.push(block_receipts);
        }

        self.remove_receipts_from(from_transaction_num, block)?;

        Ok(ExecutionOutcome::new_init(
            state,
            reverts,
            Vec::new(),
            receipts,
            start_block_number,
            Vec::new(),
        ))
    }
}
3072
3073impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
3074 fn write_account_trie_updates<A: TrieTableAdapter>(
3075 tx: &TX,
3076 trie_updates: &TrieUpdatesSorted,
3077 num_entries: &mut usize,
3078 ) -> ProviderResult<()>
3079 where
3080 TX: DbTxMut,
3081 {
3082 let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
3083 for (key, updated_node) in trie_updates.account_nodes_ref() {
3085 let nibbles = A::AccountKey::from(*key);
3086 match updated_node {
3087 Some(node) => {
3088 if !key.is_empty() {
3089 *num_entries += 1;
3090 account_trie_cursor.upsert(nibbles, node)?;
3091 }
3092 }
3093 None => {
3094 *num_entries += 1;
3095 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
3096 account_trie_cursor.delete_current()?;
3097 }
3098 }
3099 }
3100 }
3101 Ok(())
3102 }
3103
3104 fn write_storage_tries<A: TrieTableAdapter>(
3105 tx: &TX,
3106 storage_tries: Vec<(&B256, &StorageTrieUpdatesSorted)>,
3107 num_entries: &mut usize,
3108 ) -> ProviderResult<()>
3109 where
3110 TX: DbTxMut,
3111 {
3112 let mut cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
3113 for (hashed_address, storage_trie_updates) in storage_tries {
3114 let mut db_storage_trie_cursor: DatabaseStorageTrieCursor<_, A> =
3115 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
3116 *num_entries +=
3117 db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
3118 cursor = db_storage_trie_cursor.cursor;
3119 }
3120 Ok(())
3121 }
3122}
3123
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
    /// Writes sorted account- and storage-trie updates and returns the total number
    /// of trie entries written (upserts plus deletes).
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
        if trie_updates.is_empty() {
            return Ok(0)
        }

        let mut num_entries = 0;

        // Dispatch to the concrete trie table set via the adapter selected for this
        // provider's storage settings.
        reth_trie_db::with_adapter!(self, |A| {
            Self::write_account_trie_updates::<A>(self.tx_ref(), trie_updates, &mut num_entries)?;
        });

        num_entries +=
            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;

        Ok(num_entries)
    }
}
3147
3148impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3149 fn write_storage_trie_updates_sorted<'a>(
3155 &self,
3156 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3157 ) -> ProviderResult<usize> {
3158 let mut num_entries = 0;
3159 let mut storage_tries = storage_tries.collect::<Vec<_>>();
3160 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3161 reth_trie_db::with_adapter!(self, |A| {
3162 Self::write_storage_tries::<A>(self.tx_ref(), storage_tries, &mut num_entries)?;
3163 });
3164 Ok(num_entries)
3165 }
3166}
3167
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
    /// Unwinds `HashedAccounts` using account changesets: writes each account's
    /// pre-state back (or deletes it if it did not exist) and returns the resulting
    /// hashed-address → pre-state map.
    fn unwind_account_hashing<'a>(
        &self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        // Collect then reverse before building the map: for an address touched in
        // several blocks, the reversed insertion order makes the earliest (oldest)
        // pre-state the one that survives BTreeMap key deduplication.
        let hashed_accounts = changesets
            .into_iter()
            .map(|(_, e)| (keccak256(e.address), e.info))
            .collect::<Vec<_>>()
            .into_iter()
            .rev()
            .collect::<BTreeMap<_, _>>();

        // Apply the pre-states: `None` means the account didn't exist — delete it.
        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in &hashed_accounts {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }

        Ok(hashed_accounts)
    }

    /// Convenience wrapper: loads the account changesets for `range` and unwinds them.
    fn unwind_account_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        let changesets = self.account_changesets_range(range)?;
        self.unwind_account_hashing(changesets.iter())
    }

    /// Inserts the given (address, account) pairs into `HashedAccounts` under their
    /// keccak-hashed addresses; `None` deletes the entry. Returns the hashed map.
    fn insert_account_for_hashing(
        &self,
        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
        let hashed_accounts =
            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
        for (hashed_address, account) in &hashed_accounts {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }
        Ok(hashed_accounts)
    }

    /// Unwinds `HashedStorages` using storage changesets: restores each slot's
    /// pre-state value (deleting zero-valued slots) and returns, per hashed address,
    /// the set of hashed slot keys that were touched.
    fn unwind_storage_hashing(
        &self,
        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        let mut hashed_storages = changesets
            .into_iter()
            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
                let hashed_key = keccak256(storage_entry.key);
                (keccak256(address), hashed_key, storage_entry.value)
            })
            .collect::<Vec<_>>();
        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));

        let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
            B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
        // Iterate in reverse sorted order; for duplicate (address, slot) pairs the
        // last write applied is the first entry in sort order.
        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
            hashed_storage_keys.entry(hashed_address).or_default().insert(key);

            // Delete-then-insert: dup tables cannot update a subkey in place.
            if hashed_storage
                .seek_by_key_subkey(hashed_address, key)?
                .is_some_and(|entry| entry.key == key)
            {
                hashed_storage.delete_current()?;
            }

            // Zero pre-state means the slot didn't exist — leave it deleted.
            if !value.is_zero() {
                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
            }
        }
        Ok(hashed_storage_keys)
    }

    /// Convenience wrapper: loads the storage changesets for `range` and unwinds them.
    fn unwind_storage_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        let changesets = self.storage_changesets_range(range)?;
        self.unwind_storage_hashing(changesets.into_iter())
    }

    /// Writes the given per-address storage entries into `HashedStorages` under
    /// keccak-hashed address/slot keys (zero values delete the slot). Returns, per
    /// hashed address, the set of hashed slot keys written.
    fn insert_storage_for_hashing(
        &self,
        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        // Hash and deduplicate: nested BTreeMaps keep one value per (address, slot),
        // with later entries in the input overriding earlier ones.
        let hashed_storages =
            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
                    map.insert(keccak256(entry.key), entry.value);
                    map
                });
                map.insert(keccak256(address), storage);
                map
            });

        let hashed_storage_keys = hashed_storages
            .iter()
            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
            .collect();

        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
                // Delete-then-insert: dup tables cannot update a subkey in place.
                if hashed_storage_cursor
                    .seek_by_key_subkey(hashed_address, key)?
                    .is_some_and(|entry| entry.key == key)
                {
                    hashed_storage_cursor.delete_current()?;
                }

                if !value.is_zero() {
                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
                }
                Ok(())
            })
        })?;

        Ok(hashed_storage_keys)
    }
}
3307
3308impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
    /// Unwinds account history indices for the given changesets and returns the
    /// number of changeset entries processed.
    ///
    /// Under storage_v2 the unwind is delegated to the RocksDB provider and the
    /// resulting write batch is queued for later commit; otherwise the MDBX
    /// `AccountsHistory` shards are truncated in place.
    fn unwind_account_history_indices<'a>(
        &self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
    ) -> ProviderResult<usize> {
        // (address, block) pairs, sorted by address so shards are visited in key order.
        let mut last_indices = changesets
            .into_iter()
            .map(|(index, account)| (account.address, *index))
            .collect::<Vec<_>>();
        last_indices.sort_unstable_by_key(|(a, _)| *a);

        if self.cached_storage_settings().storage_v2 {
            // RocksDB path: build the unwind batch now, apply it when batches flush.
            let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
            self.pending_rocksdb_batches.lock().push(batch);
        } else {
            let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
            for &(address, rem_index) in &last_indices {
                // Drop all shard entries at/after `rem_index`; what remains of the
                // address's last shard is returned so it can be re-inserted below.
                let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
                    &mut cursor,
                    ShardedKey::last(address),
                    rem_index,
                    |sharded_key| sharded_key.key == address,
                )?;

                // Re-insert the truncated shard under the address's "last" key.
                if !partial_shard.is_empty() {
                    cursor.insert(
                        ShardedKey::last(address),
                        &BlockNumberList::new_pre_sorted(partial_shard),
                    )?;
                }
            }
        }

        let changesets = last_indices.len();
        Ok(changesets)
    }
3347
3348 fn unwind_account_history_indices_range(
3349 &self,
3350 range: impl RangeBounds<BlockNumber>,
3351 ) -> ProviderResult<usize> {
3352 let changesets = self.account_changesets_range(range)?;
3353 self.unwind_account_history_indices(changesets.iter())
3354 }
3355
3356 fn insert_account_history_index(
3357 &self,
3358 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3359 ) -> ProviderResult<()> {
3360 self.append_history_index::<_, tables::AccountsHistory>(
3361 account_transitions,
3362 ShardedKey::new,
3363 )
3364 }
3365
3366 fn unwind_storage_history_indices(
3367 &self,
3368 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3369 ) -> ProviderResult<usize> {
3370 let mut storage_changesets = changesets
3371 .into_iter()
3372 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
3373 .collect::<Vec<_>>();
3374 storage_changesets.sort_unstable_by_key(|(address, key, _)| (*address, *key));
3375
3376 if self.cached_storage_settings().storage_v2 {
3377 let batch =
3378 self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
3379 self.pending_rocksdb_batches.lock().push(batch);
3380 } else {
3381 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
3383 for &(address, storage_key, rem_index) in &storage_changesets {
3384 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
3385 &mut cursor,
3386 StorageShardedKey::last(address, storage_key),
3387 rem_index,
3388 |storage_sharded_key| {
3389 storage_sharded_key.address == address &&
3390 storage_sharded_key.sharded_key.key == storage_key
3391 },
3392 )?;
3393
3394 if !partial_shard.is_empty() {
3397 cursor.insert(
3398 StorageShardedKey::last(address, storage_key),
3399 &BlockNumberList::new_pre_sorted(partial_shard),
3400 )?;
3401 }
3402 }
3403 }
3404
3405 let changesets = storage_changesets.len();
3406 Ok(changesets)
3407 }
3408
3409 fn unwind_storage_history_indices_range(
3410 &self,
3411 range: impl RangeBounds<BlockNumber>,
3412 ) -> ProviderResult<usize> {
3413 let changesets = self.storage_changesets_range(range)?;
3414 self.unwind_storage_history_indices(changesets.into_iter())
3415 }
3416
3417 fn insert_storage_history_index(
3418 &self,
3419 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3420 ) -> ProviderResult<()> {
3421 self.append_history_index::<_, tables::StoragesHistory>(
3422 storage_transitions,
3423 |(address, storage_key), highest_block_number| {
3424 StorageShardedKey::new(address, storage_key, highest_block_number)
3425 },
3426 )
3427 }
3428
3429 #[instrument(level = "debug", target = "providers::db", skip_all)]
3430 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3431 let storage_settings = self.cached_storage_settings();
3432 if !storage_settings.storage_v2 {
3433 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3434 self.insert_account_history_index(indices)?;
3435 }
3436
3437 if !storage_settings.storage_v2 {
3438 let indices = self.changed_storages_and_blocks_with_range(range)?;
3439 self.insert_storage_history_index(indices)?;
3440 }
3441
3442 Ok(())
3443 }
3444}
3445
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
    for DatabaseProvider<TX, N>
{
    /// Unwinds everything above `block` and returns the removed chain segment
    /// (blocks + execution state) so the caller can e.g. re-insert it elsewhere.
    ///
    /// Order matters: trie state is unwound first, then execution state is taken,
    /// then the blocks themselves are read (before removal) and removed, and finally
    /// the pipeline stage checkpoints are rewound.
    fn take_block_and_execution_above(
        &self,
        block: BlockNumber,
    ) -> ProviderResult<Chain<Self::Primitives>> {
        let range = block + 1..=self.last_block_number()?;

        self.unwind_trie_state_from(block + 1)?;

        let execution_state = self.take_state_above(block)?;

        // Read the blocks to return before deleting them below.
        let blocks = self.recovered_block_range(range)?;

        self.remove_blocks_above(block)?;

        // `true`: this is an unwind, so stage checkpoints move backwards.
        self.update_pipeline_stages(block, true)?;

        Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
    }

    /// Same unwind as [`Self::take_block_and_execution_above`], but discards the removed
    /// data instead of returning it.
    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
        self.unwind_trie_state_from(block + 1)?;

        self.remove_state_above(block)?;

        self.remove_blocks_above(block)?;

        self.update_pipeline_stages(block, true)?;

        Ok(())
    }
}
3488
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
    for DatabaseProvider<TX, N>
{
    type Block = BlockTy<N>;
    type Receipt = ReceiptTy<N>;

    /// Inserts a single recovered block (header + body only, no execution output) and
    /// returns its stored body indices.
    fn insert_block(
        &self,
        block: &RecoveredBlock<Self::Block>,
    ) -> ProviderResult<StoredBlockBodyIndices> {
        let block_number = block.number();

        // Wrap the block in an `ExecutedBlock` with an empty execution result so it can go
        // through the common `save_blocks` path; `BlocksOnly` skips state/receipt writing.
        let executed_block = ExecutedBlock::new(
            Arc::new(block.clone()),
            Arc::new(BlockExecutionOutput {
                result: BlockExecutionResult {
                    receipts: Default::default(),
                    requests: Default::default(),
                    gas_used: 0,
                    blob_gas_used: 0,
                },
                state: Default::default(),
            }),
            ComputedTrieData::default(),
        );

        self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;

        // `save_blocks` must have written the indices; missing indices are an invariant break.
        self.block_body_indices(block_number)?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
    }

    /// Appends block bodies (transactions to static files, indices to MDBX) for a contiguous
    /// run of blocks. A `None` body stands for an empty block that still advances the
    /// per-block static-file index.
    fn append_block_bodies(
        &self,
        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
    ) -> ProviderResult<()> {
        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };

        let mut tx_writer =
            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;

        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;

        // Next global transaction number continues from the last recorded tx-block mapping.
        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();

        for (block_number, body) in &bodies {
            // Advance the static-file block index even for empty/`None` bodies.
            tx_writer.increment_block(*block_number)?;

            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };

            let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

            block_indices_cursor.append(*block_number, &block_indices)?;

            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);

            let Some(body) = body else { continue };

            // Map the block's last tx number back to the block (only for non-empty blocks).
            if !body.transactions().is_empty() {
                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
            }

            for transaction in body.transactions() {
                tx_writer.append_transaction(next_tx_num, transaction)?;

                next_tx_num += 1;
            }
        }

        // Chain-specific body tables (e.g. withdrawals/ommers) are written by the storage impl.
        self.storage.writer().write_block_bodies(self, bodies)?;

        Ok(())
    }

    /// Removes all blocks above `block`: header-number mappings, static-file headers,
    /// transaction hash indices, senders, and finally the bodies themselves.
    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
        let last_block_number = self.last_block_number()?;
        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
        }

        let highest_static_file_block = self
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .expect("todo: error handling, headers should exist");

        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
        self.static_file_provider()
            .get_writer(block, StaticFileSegment::Headers)?
            .prune_headers(highest_static_file_block.saturating_sub(block))?;

        // First tx number that must be unwound (the one right after `block`'s last tx).
        let unwind_tx_from = self
            .block_body_indices(block)?
            .map(|b| b.next_tx_num())
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;

        // Highest tx number currently stored, taken from the last body-indices entry.
        let unwind_tx_to = self
            .tx
            .cursor_read::<tables::BlockBodyIndices>()?
            .last()?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
            .1
            .last_tx_num();

        if unwind_tx_from <= unwind_tx_to {
            let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
            self.with_rocksdb_batch(|batch| {
                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
                for (hash, _) in hashes {
                    writer.delete_transaction_hash_number(hash)?;
                }
                Ok(((), writer.into_raw_rocksdb_batch()))
            })?;
        }

        // Skip sender pruning when sender recovery is fully pruned (nothing stored).
        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) {
            EitherWriter::new_senders(self, last_block_number)?
                .prune_senders(unwind_tx_from, block)?;
        }

        self.remove_bodies_above(block)?;

        Ok(())
    }

    /// Removes block bodies above `block`: body indices, tx-block mappings, and the
    /// transactions stored in static files.
    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
        self.storage.writer().remove_block_bodies_above(self, block)?;

        let unwind_tx_from = self
            .block_body_indices(block)?
            .map(|b| b.next_tx_num())
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;

        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;

        let static_file_tx_num =
            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);

        // Number of transactions to drop from the static files (0 if already below the cut).
        let to_delete = static_file_tx_num
            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
            .unwrap_or_default();

        self.static_file_provider
            .latest_writer(StaticFileSegment::Transactions)?
            .prune_transactions(to_delete, block)?;

        Ok(())
    }

    /// Appends a contiguous run of blocks together with execution state, hashed state, and
    /// history indices, then advances pipeline stage checkpoints.
    fn append_blocks_with_state(
        &self,
        blocks: Vec<RecoveredBlock<Self::Block>>,
        execution_outcome: &ExecutionOutcome<Self::Receipt>,
        hashed_state: HashedPostStateSorted,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to append empty block range");
            return Ok(())
        }

        let first_number = blocks[0].number();

        let last_block_number = blocks[blocks.len() - 1].number();

        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

        // Derive history-index transitions from the bundle reverts: revert at index i
        // belongs to block `first_number + i`.
        let (account_transitions, storage_transitions) = {
            let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
            let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
            for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
                let block_number = first_number + block_idx as u64;
                for (address, account_revert) in block_reverts {
                    account_transitions.entry(*address).or_default().push(block_number);
                    for storage_key in account_revert.storage.keys() {
                        let key = B256::from(storage_key.to_be_bytes());
                        storage_transitions.entry((*address, key)).or_default().push(block_number);
                    }
                }
            }
            (account_transitions, storage_transitions)
        };

        for block in blocks {
            self.insert_block(&block)?;
            durations_recorder.record_relative(metrics::Action::InsertBlock);
        }

        self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
        durations_recorder.record_relative(metrics::Action::InsertState);

        self.write_hashed_state(&hashed_state)?;
        durations_recorder.record_relative(metrics::Action::InsertHashes);

        // History indices: storage v2 appends shards to RocksDB batches, v1 writes MDBX.
        let storage_settings = self.cached_storage_settings();
        if storage_settings.storage_v2 {
            self.with_rocksdb_batch(|mut batch| {
                for (address, blocks) in account_transitions {
                    batch.append_account_history_shard(address, blocks)?;
                }
                Ok(((), Some(batch.into_inner())))
            })?;
        } else {
            self.insert_account_history_index(account_transitions)?;
        }
        if storage_settings.storage_v2 {
            self.with_rocksdb_batch(|mut batch| {
                for ((address, key), blocks) in storage_transitions {
                    batch.append_storage_history_shard(address, key, blocks)?;
                }
                Ok(((), Some(batch.into_inner())))
            })?;
        } else {
            self.insert_storage_history_index(storage_transitions)?;
        }
        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);

        // `false`: forward progress, not an unwind.
        self.update_pipeline_stages(last_block_number, false)?;
        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);

        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");

        Ok(())
    }
}
3766
3767impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3768 fn get_prune_checkpoint(
3769 &self,
3770 segment: PruneSegment,
3771 ) -> ProviderResult<Option<PruneCheckpoint>> {
3772 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3773 }
3774
3775 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3776 Ok(PruneSegment::variants()
3777 .filter_map(|segment| {
3778 self.tx
3779 .get::<tables::PruneCheckpoints>(segment)
3780 .transpose()
3781 .map(|chk| chk.map(|chk| (segment, chk)))
3782 })
3783 .collect::<Result<_, _>>()?)
3784 }
3785}
3786
3787impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3788 fn save_prune_checkpoint(
3789 &self,
3790 segment: PruneSegment,
3791 checkpoint: PruneCheckpoint,
3792 ) -> ProviderResult<()> {
3793 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3794 }
3795}
3796
3797impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3798 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3799 let db_entries = self.tx.entries::<T>()?;
3800 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3801 Ok(entries) => entries,
3802 Err(ProviderError::UnsupportedProvider) => 0,
3803 Err(err) => return Err(err),
3804 };
3805
3806 Ok(db_entries + static_file_entries)
3807 }
3808}
3809
3810impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3811 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3812 let mut finalized_blocks = self
3813 .tx
3814 .cursor_read::<tables::ChainState>()?
3815 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3816 .take(1)
3817 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3818
3819 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3820 Ok(last_finalized_block_number)
3821 }
3822
3823 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3824 let mut finalized_blocks = self
3825 .tx
3826 .cursor_read::<tables::ChainState>()?
3827 .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3828 .take(1)
3829 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3830
3831 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3832 Ok(last_finalized_block_number)
3833 }
3834}
3835
3836impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3837 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3838 Ok(self
3839 .tx
3840 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3841 }
3842
3843 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3844 Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3845 }
3846}
3847
impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
    type Tx = TX;

    fn tx_ref(&self) -> &Self::Tx {
        &self.tx
    }

    fn tx_mut(&mut self) -> &mut Self::Tx {
        &mut self.tx
    }

    fn into_tx(self) -> Self::Tx {
        self.tx
    }

    fn prune_modes_ref(&self) -> &PruneModes {
        // NOTE(review): this resolves to the inherent `prune_modes_ref` on
        // `DatabaseProvider` (inherent methods take precedence over trait methods),
        // so it is delegation, not self-recursion — confirm the inherent method exists.
        self.prune_modes_ref()
    }

    /// Commits all pending writes. Order on the normal path: static files are finalized
    /// first, then staged RocksDB batches, and the MDBX transaction last; per-store
    /// durations are recorded into commit metrics.
    #[instrument(
        name = "DatabaseProvider::commit",
        level = "debug",
        target = "providers::db",
        skip_all
    )]
    fn commit(self) -> ProviderResult<()> {
        // Unwinds use a dedicated commit path (see `commit_unwind`).
        if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
            self.commit_unwind()?;
        } else {
            let mut timings = metrics::CommitTimings::default();

            let start = Instant::now();
            self.static_file_provider.finalize()?;
            timings.sf = start.elapsed();

            let start = Instant::now();
            // Drain the batches queued by write paths during this provider's lifetime.
            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
            for batch in batches {
                self.rocksdb_provider.commit_batch(batch)?;
            }
            timings.rocksdb = start.elapsed();

            let start = Instant::now();
            self.tx.commit()?;
            timings.mdbx = start.elapsed();

            self.metrics.record_commit(&timings);
        }

        Ok(())
    }
}
3902
3903impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3904 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3905 self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3906 }
3907}
3908
3909impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3910 fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3911 self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3912 }
3913}
3914
3915impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3916 fn cached_storage_settings(&self) -> StorageSettings {
3917 *self.storage_settings.read()
3918 }
3919
3920 fn set_storage_settings_cache(&self, settings: StorageSettings) {
3921 *self.storage_settings.write() = settings;
3922 }
3923}
3924
3925impl<TX: Send, N: NodeTypes> StoragePath for DatabaseProvider<TX, N> {
3926 fn storage_path(&self) -> PathBuf {
3927 self.db_path.clone()
3928 }
3929}
3930
3931#[cfg(test)]
3932mod tests {
3933 use super::*;
3934 use crate::{
3935 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3936 BlockWriter,
3937 };
3938 use alloy_consensus::Header;
3939 use alloy_primitives::{
3940 map::{AddressMap, B256Map},
3941 U256,
3942 };
3943 use reth_chain_state::ExecutedBlock;
3944 use reth_db_api::models::StorageSettings;
3945 use reth_ethereum_primitives::Receipt;
3946 use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3947 use reth_primitives_traits::SealedBlock;
3948 use reth_storage_api::MetadataWriter;
3949 use reth_testing_utils::generators::{self, random_block, BlockParams};
3950 use reth_trie::{
3951 HashedPostState, KeccakKeyHasher, Nibbles, StoredNibbles, StoredNibblesSubKey,
3952 };
3953 use revm_database::BundleState;
3954 use revm_state::AccountInfo;
3955 use std::{sync::mpsc, time::Duration};
3956
3957 #[test]
3958 fn test_receipts_by_block_range_empty_range() {
3959 let factory = create_test_provider_factory();
3960 let provider = factory.provider().unwrap();
3961
3962 let start = 10u64;
3964 let end = 9u64;
3965 let result = provider.receipts_by_block_range(start..=end).unwrap();
3966 assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3967 }
3968
    /// An unwind commit must block until read transactions opened before it are dropped.
    #[test]
    fn unwind_commit_waits_for_pre_commit_readers() {
        let factory = create_test_provider_factory();

        // Open a reader *before* the unwind write provider commits.
        let reader = factory.provider().unwrap();
        let provider_rw = factory.unwind_provider_rw().unwrap();
        provider_rw.write_metadata("unwind-wait-test", vec![1]).unwrap();
        let (done_tx, done_rx) = mpsc::channel();

        // Commit on a separate thread so we can observe whether it blocks.
        let handle = std::thread::spawn(move || {
            let result = provider_rw.commit();
            done_tx.send(result).unwrap();
        });

        // While the reader is alive, the commit must not have completed yet.
        assert!(
            done_rx.recv_timeout(Duration::from_millis(50)).is_err(),
            "unwind commit should wait while an older read transaction is still open"
        );

        drop(reader);

        // Once the reader is gone, the commit should finish promptly and succeed.
        done_rx.recv_timeout(Duration::from_secs(1)).unwrap().unwrap();
        handle.join().unwrap();
    }
3993
3994 #[test]
3995 fn test_receipts_by_block_range_nonexistent_blocks() {
3996 let factory = create_test_provider_factory();
3997 let provider = factory.provider().unwrap();
3998
3999 let result = provider.receipts_by_block_range(10..=12).unwrap();
4001 assert_eq!(result, vec![vec![], vec![], vec![]]);
4002 }
4003
    /// A single-block range returns exactly that block's receipts.
    #[test]
    fn test_receipts_by_block_range_single_block() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        // Write genesis (empty receipts) and block 1 with its execution outcome.
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        provider_rw.insert_block(&data.blocks[0].0).unwrap();
        provider_rw
            .write_state(
                &data.blocks[0].1,
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=1).unwrap();

        // One block in range, one receipt in that block, matching the test fixture.
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].len(), 1);
        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
    }
4036
    /// A multi-block range returns receipts grouped per block, in block order.
    #[test]
    fn test_receipts_by_block_range_multiple_blocks() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        // Write genesis plus blocks 1..=3 with their execution outcomes.
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=3).unwrap();

        // Each of the three blocks carries exactly one receipt from the fixture.
        assert_eq!(result.len(), 3);
        for (i, block_receipts) in result.iter().enumerate() {
            assert_eq!(block_receipts.len(), 1);
            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
        }
    }
4073
    /// Receipt counts per block follow the per-block transaction counts of the fixture.
    #[test]
    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();

        // Insert blocks 1..=3 together with their execution outcomes.
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=3).unwrap();

        // Every fixture block carries exactly one receipt.
        assert_eq!(result.len(), 3);
        for block_receipts in &result {
            assert_eq!(block_receipts.len(), 1);
        }
    }
4111
4112 #[test]
4113 fn test_receipts_by_block_range_partial_range() {
4114 let factory = create_test_provider_factory();
4115 let data = BlockchainTestData::default();
4116
4117 let provider_rw = factory.provider_rw().unwrap();
4118 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4119 provider_rw
4120 .write_state(
4121 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4122 crate::OriginalValuesKnown::No,
4123 StateWriteConfig::default(),
4124 )
4125 .unwrap();
4126 for i in 0..3 {
4127 provider_rw.insert_block(&data.blocks[i].0).unwrap();
4128 provider_rw
4129 .write_state(
4130 &data.blocks[i].1,
4131 crate::OriginalValuesKnown::No,
4132 StateWriteConfig::default(),
4133 )
4134 .unwrap();
4135 }
4136 provider_rw.commit().unwrap();
4137
4138 let provider = factory.provider().unwrap();
4139
4140 let result = provider.receipts_by_block_range(2..=5).unwrap();
4142 assert_eq!(result.len(), 4);
4143
4144 assert_eq!(result[0].len(), 1); assert_eq!(result[1].len(), 1); assert_eq!(result[2].len(), 0); assert_eq!(result[3].len(), 0); assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
4151 assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
4152 }
4153
4154 #[test]
4155 fn test_receipts_by_block_range_all_empty_blocks() {
4156 let factory = create_test_provider_factory();
4157 let mut rng = generators::rng();
4158
4159 let mut blocks = Vec::new();
4161 for i in 0..3 {
4162 let block =
4163 random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
4164 blocks.push(block);
4165 }
4166
4167 let provider_rw = factory.provider_rw().unwrap();
4168 for block in blocks {
4169 provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4170 }
4171 provider_rw.commit().unwrap();
4172
4173 let provider = factory.provider().unwrap();
4174 let result = provider.receipts_by_block_range(1..=3).unwrap();
4175
4176 assert_eq!(result.len(), 3);
4177 for block_receipts in result {
4178 assert_eq!(block_receipts.len(), 0);
4179 }
4180 }
4181
    /// The range query must agree with per-block `receipts_by_block` calls.
    #[test]
    fn test_receipts_by_block_range_consistency_with_individual_calls() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        // Write genesis plus blocks 1..=3 with their execution outcomes.
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();

        // Batched range query vs one call per block — results must match exactly.
        let range_result = provider.receipts_by_block_range(1..=3).unwrap();

        let mut individual_results = Vec::new();
        for block_num in 1..=3 {
            let receipts =
                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
            individual_results.push(receipts);
        }

        assert_eq!(range_result, individual_results);
    }
4223
    /// `write_trie_updates_sorted` must apply updates, deletions, and insertions to the
    /// account trie table, and honor per-account storage-trie deletions and full wipes.
    #[test]
    fn test_write_trie_updates_sorted() {
        use reth_trie::{
            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
            BranchNodeCompact, StorageTrieEntry,
        };

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        // Seed the accounts trie: one node that the update will delete ([0x3,0x4]) and one
        // it will overwrite ([0x1,0x2]).
        {
            let tx = provider_rw.tx_ref();
            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();

            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
            cursor
                .upsert(
                    to_delete,
                    &BranchNodeCompact::new(
                        0b1010_1010_1010_1010, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
                        None,
                    ),
                )
                .unwrap();

            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
            cursor
                .upsert(
                    to_update,
                    &BranchNodeCompact::new(
                        0b0101_0101_0101_0101, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
                        None,
                    ),
                )
                .unwrap();
        }

        // Seed storage tries: address1 gets one node to delete; address2 gets two nodes
        // that a full wipe (`is_deleted: true`) should remove.
        let storage_address1 = B256::from([1u8; 32]);
        let storage_address2 = B256::from([2u8; 32]);
        {
            let tx = provider_rw.tx_ref();
            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();

            storage_cursor
                .upsert(
                    storage_address1,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
                        node: BranchNodeCompact::new(
                            0b0011_0011_0011_0011, 0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();

            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
                        node: BranchNodeCompact::new(
                            0b1100_1100_1100_1100, 0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
                        node: BranchNodeCompact::new(
                            0b0011_1100_0011_1100, 0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
        }

        // Account updates: overwrite [0x1,0x2], delete [0x3,0x4] (None), insert [0x5,0x6].
        let account_nodes = vec![
            (
                Nibbles::from_nibbles([0x1, 0x2]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
                    None,
                )),
            ),
            (Nibbles::from_nibbles([0x3, 0x4]), None), (
                Nibbles::from_nibbles([0x5, 0x6]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
                    None,
                )),
            ),
        ];

        // address1: insert [0x1,0x0] and delete [0x2,0x0].
        let storage_trie1 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (
                    Nibbles::from_nibbles([0x1, 0x0]),
                    Some(BranchNodeCompact::new(
                        0b1111_0000_0000_0000, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
                        None,
                    )),
                ),
                (Nibbles::from_nibbles([0x2, 0x0]), None), ],
        };

        // address2: wipe the whole storage trie.
        let storage_trie2 = StorageTrieUpdatesSorted {
            is_deleted: true, storage_nodes: vec![],
        };

        let mut storage_tries = B256Map::default();
        storage_tries.insert(storage_address1, storage_trie1);
        storage_tries.insert(storage_address2, storage_trie2);

        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();

        // 3 account updates + 2 storage updates for address1 = 5 processed entries.
        assert_eq!(num_entries, 5);

        let tx = provider_rw.tx_ref();
        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();

        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
        let entry1 = cursor.seek_exact(nibbles1).unwrap();
        assert!(entry1.is_some(), "Updated account node should exist");
        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
        assert_eq!(
            entry1.unwrap().1.state_mask,
            expected_mask,
            "Account node should have updated state_mask"
        );

        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
        let entry2 = cursor.seek_exact(nibbles2).unwrap();
        assert!(entry2.is_none(), "Deleted account node should not exist");

        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
        let entry3 = cursor.seek_exact(nibbles3).unwrap();
        assert!(entry3.is_some(), "New account node should exist");

        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();

        let storage_entries1: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address1), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            storage_entries1.len(),
            1,
            "Storage address1 should have 1 entry after deletion"
        );
        assert_eq!(
            storage_entries1[0].1.nibbles.0,
            Nibbles::from_nibbles([0x1, 0x0]),
            "Remaining entry should be [0x1, 0x0]"
        );

        let storage_entries2: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address2), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");

        provider_rw.commit().unwrap();
    }
4443
    /// Exercises the receipt-pruning logic of `write_state` under both legacy (v1)
    /// and hashed (v2) storage settings.
    ///
    /// Each written receipt has `cumulative_gas_used == block number`, so reads can
    /// verify which block a stored receipt belongs to.
    #[test]
    fn test_prunable_receipts_logic() {
        // Helper: insert blocks 0..=tip_block, each with `tx_count` random transactions.
        let insert_blocks =
            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
                let mut rng = generators::rng();
                for block_num in 0..=tip_block {
                    let block = random_block(
                        &mut rng,
                        block_num,
                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
                    );
                    provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
                }
            };

        // Helper: write one receipt for `block` (tagged via cumulative_gas_used) and commit.
        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
            let outcome = ExecutionOutcome {
                first_block: block,
                receipts: vec![vec![Receipt {
                    tx_type: Default::default(),
                    success: true,
                    cumulative_gas_used: block, logs: vec![],
                }]],
                ..Default::default()
            };
            provider_rw
                .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
                .unwrap();
            provider_rw.commit().unwrap();
        };

        // Scenario 1: v1 settings with PruneMode::Before(100).
        {
            let factory = create_test_provider_factory();
            let storage_settings = StorageSettings::v1();
            factory.set_storage_settings_cache(storage_settings);
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(100)),
                ..Default::default()
            });

            let tip_block = 200u64;
            let first_block = 1u64;

            let provider_rw = factory.provider_rw().unwrap();
            insert_blocks(&provider_rw, tip_block, 1);
            provider_rw.commit().unwrap();

            // Block 1 is below the prune threshold; block 199 is above it.
            write_receipts(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                first_block,
            );
            write_receipts(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                tip_block - 1,
            );

            let provider = factory.provider().unwrap();

            // Only the non-prunable block keeps its receipt.
            for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
                assert!(provider
                    .receipts_by_block(block.into())
                    .unwrap()
                    .is_some_and(|r| r.len() == num_receipts));
            }
        }

        // Scenario 2: v2 settings (receipts destined for static files) with
        // PruneMode::Before(2).
        {
            let factory = create_test_provider_factory();
            let storage_settings = StorageSettings::v2();
            factory.set_storage_settings_cache(storage_settings);
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(2)),
                ..Default::default()
            });

            let tip_block = 200u64;

            let provider_rw = factory.provider_rw().unwrap();
            insert_blocks(&provider_rw, tip_block, 1);
            provider_rw.commit().unwrap();

            // Blocks 0 and 1 are prunable: their receipts are skipped, but the
            // static-file block marker still advances (asserted below).
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);

            assert!(factory
                .static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none(),);
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_block(StaticFileSegment::Receipts)
                .is_some_and(|b| b == 1),);

            // Block 2 is not prunable, so its receipt (tx number 2, one tx per block)
            // lands in the static file.
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_some_and(|num| num == 2),);

            // Re-configure pruning and use a tiny minimum distance: block 3 is
            // prunable by distance, yet its receipt is still persisted (asserted
            // below) — NOTE(review): presumably because the static-file receipt
            // segment must stay gapless once started; confirm against writer logic.
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(100)),
                ..Default::default()
            });
            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
            write_receipts(provider_rw, 3);

            let provider = factory.provider().unwrap();
            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
            for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
                assert!(provider
                    .receipts_by_block(num.into())
                    .unwrap()
                    .is_some_and(|r| r.len() == num_receipts));

                // Per-tx lookup must agree with the per-block view.
                let receipt = provider.receipt(num).unwrap();
                if num_receipts > 0 {
                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
                } else {
                    assert!(receipt.is_none());
                }
            }
        }
    }
4583
4584 #[test]
4585 fn test_try_into_history_rejects_unexecuted_blocks() {
4586 use reth_storage_api::TryIntoHistoricalStateProvider;
4587
4588 let factory = create_test_provider_factory();
4589
4590 let data = BlockchainTestData::default();
4592 let provider_rw = factory.provider_rw().unwrap();
4593 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4594 provider_rw
4595 .write_state(
4596 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4597 crate::OriginalValuesKnown::No,
4598 StateWriteConfig::default(),
4599 )
4600 .unwrap();
4601 provider_rw.commit().unwrap();
4602
4603 let provider = factory.provider().unwrap();
4605
4606 let result = provider.try_into_history_at_block(0);
4608 assert!(result.is_ok(), "Block 0 should be available");
4609
4610 let provider = factory.provider().unwrap();
4612 let result = provider.try_into_history_at_block(100);
4613
4614 match result {
4616 Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
4617 Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
4618 Ok(_) => panic!("Expected error, got Ok"),
4619 }
4620 }
4621
4622 #[test]
4623 fn test_unwind_storage_hashing_with_hashed_state() {
4624 let factory = create_test_provider_factory();
4625 let storage_settings = StorageSettings::v2();
4626 factory.set_storage_settings_cache(storage_settings);
4627
4628 let address = Address::random();
4629 let hashed_address = keccak256(address);
4630
4631 let plain_slot = B256::random();
4632 let hashed_slot = keccak256(plain_slot);
4633
4634 let current_value = U256::from(100);
4635 let old_value = U256::from(42);
4636
4637 let provider_rw = factory.provider_rw().unwrap();
4638 provider_rw
4639 .tx
4640 .cursor_dup_write::<tables::HashedStorages>()
4641 .unwrap()
4642 .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4643 .unwrap();
4644
4645 let changesets = vec![(
4646 BlockNumberAddress((1, address)),
4647 StorageEntry { key: plain_slot, value: old_value },
4648 )];
4649
4650 let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4651
4652 assert_eq!(result.len(), 1);
4653 assert!(result.contains_key(&hashed_address));
4654 assert!(result[&hashed_address].contains(&hashed_slot));
4655
4656 let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4657 let entry = cursor
4658 .seek_by_key_subkey(hashed_address, hashed_slot)
4659 .unwrap()
4660 .expect("entry should exist");
4661 assert_eq!(entry.key, hashed_slot);
4662 assert_eq!(entry.value, old_value);
4663 }
4664
    /// Roundtrip under legacy (v1) settings: write state + changesets + hashed
    /// state for block 1, verify plain/hashed tables and MDBX changesets, then
    /// `remove_state_above(0)` and verify everything is reverted.
    #[test]
    fn test_write_and_remove_state_roundtrip_legacy() {
        let factory = create_test_provider_factory();
        let storage_settings = StorageSettings::v1();
        assert!(!storage_settings.use_hashed_state());
        factory.set_storage_settings_cache(storage_settings);

        let address = Address::with_last_byte(1);
        let hashed_address = keccak256(address);
        let slot = U256::from(5);
        let slot_key = B256::from(slot);
        let hashed_slot = keccak256(slot_key);

        let mut rng = generators::rng();
        let block0 =
            random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
        let block1 =
            random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });

        // Pre-state: two empty blocks and an existing account with nonce 0.
        {
            let provider_rw = factory.provider_rw().unwrap();
            provider_rw.insert_block(&block0.try_recover().unwrap()).unwrap();
            provider_rw.insert_block(&block1.try_recover().unwrap()).unwrap();
            provider_rw
                .tx
                .cursor_write::<tables::PlainAccountState>()
                .unwrap()
                .upsert(address, &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })
                .unwrap();
            provider_rw.commit().unwrap();
        }

        let provider_rw = factory.provider_rw().unwrap();

        // Block-1 state change: nonce 0 -> 1 and slot 0 -> 10.
        let mut state_init: BundleStateInit = AddressMap::default();
        let mut storage_map: B256Map<(U256, U256)> = B256Map::default();
        storage_map.insert(slot_key, (U256::ZERO, U256::from(10)));
        state_init.insert(
            address,
            (
                Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
                Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }),
                storage_map,
            ),
        );

        // Revert info for block 1: pre-state account (nonce 0) and slot value 0.
        let mut reverts_init: RevertsInit = HashMap::default();
        let mut block_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
        block_reverts.insert(
            address,
            (
                Some(Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })),
                vec![StorageEntry { key: slot_key, value: U256::ZERO }],
            ),
        );
        reverts_init.insert(1, block_reverts);

        let execution_outcome =
            ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);

        // Write state plus both changeset kinds (no receipts).
        provider_rw
            .write_state(
                &execution_outcome,
                OriginalValuesKnown::Yes,
                StateWriteConfig {
                    write_receipts: false,
                    write_account_changesets: true,
                    write_storage_changesets: true,
                },
            )
            .unwrap();

        // v1 also maintains the hashed tables alongside the plain ones.
        let hashed_state =
            execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
        provider_rw.write_hashed_state(&hashed_state).unwrap();

        // Post-write checks: plain account, plain storage, hashed storage.
        let account = provider_rw
            .tx
            .cursor_read::<tables::PlainAccountState>()
            .unwrap()
            .seek_exact(address)
            .unwrap()
            .unwrap()
            .1;
        assert_eq!(account.nonce, 1);

        let storage_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .seek_by_key_subkey(address, slot_key)
            .unwrap()
            .unwrap();
        assert_eq!(storage_entry.key, slot_key);
        assert_eq!(storage_entry.value, U256::from(10));

        let hashed_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::HashedStorages>()
            .unwrap()
            .seek_by_key_subkey(hashed_address, hashed_slot)
            .unwrap()
            .unwrap();
        assert_eq!(hashed_entry.key, hashed_slot);
        assert_eq!(hashed_entry.value, U256::from(10));

        // v1 keeps changesets in MDBX tables; both must be populated for block 1.
        let account_cs_entries = provider_rw
            .tx
            .cursor_dup_read::<tables::AccountChangeSets>()
            .unwrap()
            .walk(Some(1))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(!account_cs_entries.is_empty());

        let storage_cs_entries = provider_rw
            .tx
            .cursor_read::<tables::StorageChangeSets>()
            .unwrap()
            .walk(Some(BlockNumberAddress((1, address))))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(!storage_cs_entries.is_empty());
        assert_eq!(storage_cs_entries[0].1.key, slot_key);

        // Unwind block 1 entirely.
        provider_rw.remove_state_above(0).unwrap();

        // Account reverts to its pre-state (nonce 0), storage slot is gone,
        // and both changeset tables are emptied again.
        let restored_account = provider_rw
            .tx
            .cursor_read::<tables::PlainAccountState>()
            .unwrap()
            .seek_exact(address)
            .unwrap()
            .unwrap()
            .1;
        assert_eq!(restored_account.nonce, 0);

        let storage_gone = provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .seek_by_key_subkey(address, slot_key)
            .unwrap();
        assert!(storage_gone.is_none() || storage_gone.unwrap().key != slot_key);

        let account_cs_after = provider_rw
            .tx
            .cursor_dup_read::<tables::AccountChangeSets>()
            .unwrap()
            .walk(Some(1))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(account_cs_after.is_empty());

        let storage_cs_after = provider_rw
            .tx
            .cursor_read::<tables::StorageChangeSets>()
            .unwrap()
            .walk(Some(BlockNumberAddress((1, address))))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(storage_cs_after.is_empty());
    }
4832
4833 #[test]
4834 fn test_unwind_storage_hashing_legacy() {
4835 let factory = create_test_provider_factory();
4836 let storage_settings = StorageSettings::v1();
4837 assert!(!storage_settings.use_hashed_state());
4838 factory.set_storage_settings_cache(storage_settings);
4839
4840 let address = Address::random();
4841 let hashed_address = keccak256(address);
4842
4843 let plain_slot = B256::random();
4844 let hashed_slot = keccak256(plain_slot);
4845
4846 let current_value = U256::from(100);
4847 let old_value = U256::from(42);
4848
4849 let provider_rw = factory.provider_rw().unwrap();
4850 provider_rw
4851 .tx
4852 .cursor_dup_write::<tables::HashedStorages>()
4853 .unwrap()
4854 .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4855 .unwrap();
4856
4857 let changesets = vec![(
4858 BlockNumberAddress((1, address)),
4859 StorageEntry { key: plain_slot, value: old_value },
4860 )];
4861
4862 let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4863
4864 assert_eq!(result.len(), 1);
4865 assert!(result.contains_key(&hashed_address));
4866 assert!(result[&hashed_address].contains(&hashed_slot));
4867
4868 let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4869 let entry = cursor
4870 .seek_by_key_subkey(hashed_address, hashed_slot)
4871 .unwrap()
4872 .expect("entry should exist");
4873 assert_eq!(entry.key, hashed_slot);
4874 assert_eq!(entry.value, old_value);
4875 }
4876
    /// v2 write path: state lands only in hashed tables, changesets land in static
    /// files (plain keys), and a historical read at block 0 sees the pre-state.
    #[test]
    fn test_write_state_and_historical_read_hashed() {
        use reth_storage_api::StateProvider;
        use reth_trie::{HashedPostState, KeccakKeyHasher};
        use revm_database::BundleState;
        use revm_state::AccountInfo;

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        let address = Address::with_last_byte(1);
        let slot = U256::from(5);
        let slot_key = B256::from(slot);
        let hashed_address = keccak256(address);
        let hashed_slot = keccak256(slot_key);

        // Seed static files: headers for blocks 0/1 and empty block-0 changesets,
        // so the block-1 changeset appends below have a valid predecessor.
        {
            let sf = factory.static_file_provider();
            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
            hw.append_header(&h0, &B256::ZERO).unwrap();
            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
            hw.append_header(&h1, &B256::ZERO).unwrap();
            hw.commit().unwrap();

            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            aw.append_account_changeset(vec![], 0).unwrap();
            aw.commit().unwrap();

            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
            sw.append_storage_changeset(vec![], 0).unwrap();
            sw.commit().unwrap();
        }

        let provider_rw = factory.provider_rw().unwrap();

        // Block-1 bundle: account created with nonce 1 and slot set 0 -> 10;
        // reverts record that neither existed before block 1.
        let bundle = BundleState::builder(1..=1)
            .state_present_account_info(
                address,
                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
            )
            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
            .revert_account_info(1, address, Some(None))
            .revert_storage(1, address, vec![(slot, U256::ZERO)])
            .build();

        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());

        // Body indices for block 1 are required for the changeset write path.
        provider_rw
            .tx
            .put::<tables::BlockBodyIndices>(
                1,
                StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
            )
            .unwrap();

        provider_rw
            .write_state(
                &execution_outcome,
                OriginalValuesKnown::Yes,
                StateWriteConfig {
                    write_receipts: false,
                    write_account_changesets: true,
                    write_storage_changesets: true,
                },
            )
            .unwrap();

        let hashed_state =
            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
        provider_rw.write_hashed_state(&hashed_state).unwrap();

        // v2 must not touch the plain storage table.
        let plain_storage_entries = provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .walk(None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(plain_storage_entries.is_empty());

        // The hashed storage table holds the new value under hashed keys.
        let hashed_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::HashedStorages>()
            .unwrap()
            .seek_by_key_subkey(hashed_address, hashed_slot)
            .unwrap()
            .unwrap();
        assert_eq!(hashed_entry.key, hashed_slot);
        assert_eq!(hashed_entry.value, U256::from(10));

        provider_rw.static_file_provider().commit().unwrap();

        // Changesets were written to static files with plain (unhashed) keys.
        let sf = factory.static_file_provider();
        let storage_cs = sf.storage_changeset(1).unwrap();
        assert!(!storage_cs.is_empty());
        assert_eq!(storage_cs[0].1.key, slot_key);

        let account_cs = sf.account_block_changeset(1).unwrap();
        assert!(!account_cs.is_empty());
        assert_eq!(account_cs[0].address, address);

        // Historical view at block 0 predates the write, so the slot is unset.
        let historical_value =
            HistoricalStateProviderRef::new(&*provider_rw, 0, ChangesetCache::new())
                .storage(address, slot_key)
                .unwrap();
        assert_eq!(historical_value, None);
    }
4986
    /// Storage schema flavor exercised by `run_save_blocks_and_verify`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum StorageMode {
        // Legacy layout: plain-state tables plus MDBX changesets/history.
        V1,
        // Hashed-state layout: static-file changesets plus RocksDB history.
        V2,
    }
4992
    /// Builds a small chain (3 blocks × 5 accounts × 3 storage slots), persists it
    /// via `save_blocks(.., SaveBlocksMode::Full)`, and verifies which tables are
    /// populated depending on the storage `mode`:
    /// shared checks (hashed accounts/storages, headers, body indices), then
    /// v2-specific (plain/MDBX tables empty, static-file changesets + RocksDB
    /// history populated) or v1-specific (plain + MDBX tables populated) checks.
    fn run_save_blocks_and_verify(mode: StorageMode) {
        use alloy_primitives::map::{FbBuildHasher, HashMap};

        let factory = create_test_provider_factory();

        match mode {
            StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
            StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
        }

        let num_blocks = 3u64;
        let accounts_per_block = 5usize;
        let slots_per_account = 3usize;

        // Persist an empty genesis block first so block 1 has a parent.
        let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
            SealedHeader::new(
                Header { number: 0, difficulty: U256::from(1), ..Default::default() },
                B256::ZERO,
            ),
            Default::default(),
        );

        let genesis_executed = ExecutedBlock::new(
            Arc::new(genesis.try_recover().unwrap()),
            Arc::new(BlockExecutionOutput {
                result: BlockExecutionResult {
                    receipts: vec![],
                    requests: Default::default(),
                    gas_used: 0,
                    blob_gas_used: 0,
                },
                state: Default::default(),
            }),
            ComputedTrieData::default(),
        );
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
        provider_rw.commit().unwrap();

        let mut blocks: Vec<ExecutedBlock> = Vec::new();
        let mut parent_hash = B256::ZERO;

        // Build blocks 1..=num_blocks. Addresses/values are derived from the block
        // and account indices so the verification loops can recompute them.
        for block_num in 1..=num_blocks {
            let mut builder = BundleState::builder(block_num..=block_num);

            for acct_idx in 0..accounts_per_block {
                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
                let info = AccountInfo {
                    nonce: block_num,
                    balance: U256::from(block_num * 100 + acct_idx as u64),
                    ..Default::default()
                };

                // Slot s of account acct_idx gets value block_num * 1000 + s.
                let storage: HashMap<U256, (U256, U256), FbBuildHasher<32>> = (1..=
                    slots_per_account as u64)
                    .map(|s| {
                        (
                            U256::from(s + acct_idx as u64 * 100),
                            (U256::ZERO, U256::from(block_num * 1000 + s)),
                        )
                    })
                    .collect();

                // Matching revert entries: everything was zero before this block.
                let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
                    .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
                    .collect();

                builder = builder
                    .state_present_account_info(address, info)
                    .revert_account_info(block_num, address, Some(None))
                    .state_storage(address, storage)
                    .revert_storage(block_num, address, revert_storage);
            }

            let bundle = builder.build();

            let hashed_state =
                HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();

            let header = Header {
                number: block_num,
                parent_hash,
                difficulty: U256::from(1),
                ..Default::default()
            };
            let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
                header,
                Default::default(),
            );
            parent_hash = block.hash();

            let executed = ExecutedBlock::new(
                Arc::new(block.try_recover().unwrap()),
                Arc::new(BlockExecutionOutput {
                    result: BlockExecutionResult {
                        receipts: vec![],
                        requests: Default::default(),
                        gas_used: 0,
                        blob_gas_used: 0,
                    },
                    state: bundle,
                }),
                ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
            );
            blocks.push(executed);
        }

        // Persist the whole batch in one call.
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();

        // Shared checks: hashed tables are populated in both modes.
        for block_num in 1..=num_blocks {
            for acct_idx in 0..accounts_per_block {
                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
                let hashed_address = keccak256(address);

                let ha_entry = provider
                    .tx_ref()
                    .cursor_read::<tables::HashedAccounts>()
                    .unwrap()
                    .seek_exact(hashed_address)
                    .unwrap();
                assert!(
                    ha_entry.is_some(),
                    "HashedAccounts missing for block {block_num} acct {acct_idx}"
                );

                for s in 1..=slots_per_account as u64 {
                    let slot = U256::from(s + acct_idx as u64 * 100);
                    let slot_key = B256::from(slot);
                    let hashed_slot = keccak256(slot_key);

                    let hs_entry = provider
                        .tx_ref()
                        .cursor_dup_read::<tables::HashedStorages>()
                        .unwrap()
                        .seek_by_key_subkey(hashed_address, hashed_slot)
                        .unwrap();
                    assert!(
                        hs_entry.is_some(),
                        "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
                    );
                    let entry = hs_entry.unwrap();
                    assert_eq!(entry.key, hashed_slot);
                    assert_eq!(entry.value, U256::from(block_num * 1000 + s));
                }
            }
        }

        // Shared checks: headers and body indices exist for every block.
        for block_num in 1..=num_blocks {
            let header = provider.header_by_number(block_num).unwrap();
            assert!(header.is_some(), "Header missing for block {block_num}");

            let indices = provider.block_body_indices(block_num).unwrap();
            assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
        }

        let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
        let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();

        if mode == StorageMode::V2 {
            // v2: plain state and MDBX changeset tables stay untouched.
            assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
            assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");

            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
            assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");

            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
            assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");

            // v2: changesets live in static files, with plain (unhashed) slot keys.
            provider.static_file_provider().commit().unwrap();
            let sf = factory.static_file_provider();

            for block_num in 1..=num_blocks {
                let account_cs = sf.account_block_changeset(block_num).unwrap();
                assert!(
                    !account_cs.is_empty(),
                    "v2: static file AccountChangeSets should exist for block {block_num}"
                );

                let storage_cs = sf.storage_changeset(block_num).unwrap();
                assert!(
                    !storage_cs.is_empty(),
                    "v2: static file StorageChangeSets should exist for block {block_num}"
                );

                for (_, entry) in &storage_cs {
                    // A key equal to its own keccak256 would indicate it was
                    // (double-)hashed before storage.
                    assert!(
                        entry.key != keccak256(entry.key),
                        "v2: static file storage changeset should have plain slot keys"
                    );
                }
            }

            // v2: history indices live in RocksDB.
            let rocksdb = factory.rocksdb_provider();
            for block_num in 1..=num_blocks {
                for acct_idx in 0..accounts_per_block {
                    let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
                    let shards = rocksdb.account_history_shards(address).unwrap();
                    assert!(
                        !shards.is_empty(),
                        "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
                    );

                    for s in 1..=slots_per_account as u64 {
                        let slot = U256::from(s + acct_idx as u64 * 100);
                        let slot_key = B256::from(slot);
                        let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
                        assert!(
                            !shards.is_empty(),
                            "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
                        );
                    }
                }
            }
        } else {
            // v1: plain state and MDBX changeset/history tables must be populated.
            assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
            assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");

            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
            assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");

            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
            assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");

            for block_num in 1..=num_blocks {
                let storage_entries: Vec<_> = provider
                    .tx_ref()
                    .cursor_dup_read::<tables::StorageChangeSets>()
                    .unwrap()
                    .walk_range(BlockNumberAddress::range(block_num..=block_num))
                    .unwrap()
                    .collect::<Result<Vec<_>, _>>()
                    .unwrap();
                assert!(
                    !storage_entries.is_empty(),
                    "v1: MDBX StorageChangeSets should have entries for block {block_num}"
                );

                for (_, entry) in &storage_entries {
                    let slot_key = B256::from(entry.key);
                    assert!(
                        slot_key != keccak256(slot_key),
                        "v1: storage changeset keys should be plain (not hashed)"
                    );
                }
            }

            let mdbx_account_history =
                provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
            assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");

            let mdbx_storage_history =
                provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
            assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
        }
    }
5252
    /// `save_blocks` end-to-end check against the legacy (v1) table layout.
    #[test]
    fn test_save_blocks_v1_table_assertions() {
        run_save_blocks_and_verify(StorageMode::V1);
    }
5257
    /// `save_blocks` end-to-end check against the hashed-state (v2) table layout.
    #[test]
    fn test_save_blocks_v2_table_assertions() {
        run_save_blocks_and_verify(StorageMode::V2);
    }
5262
    /// Roundtrip under v2 settings: write block-1 state (hashed tables + static
    /// file changesets), verify, then `remove_state_above(0)` and verify the
    /// revert. Plain-state and MDBX changeset tables must stay empty throughout.
    #[test]
    fn test_write_and_remove_state_roundtrip_v2() {
        let factory = create_test_provider_factory();
        let storage_settings = StorageSettings::v2();
        assert!(storage_settings.use_hashed_state());
        factory.set_storage_settings_cache(storage_settings);

        let address = Address::with_last_byte(1);
        let hashed_address = keccak256(address);
        let slot = U256::from(5);
        let slot_key = B256::from(slot);
        let hashed_slot = keccak256(slot_key);

        // Seed static files: headers for blocks 0/1 and empty block-0 changesets,
        // so block-1 changeset appends have a valid predecessor.
        {
            let sf = factory.static_file_provider();
            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
            hw.append_header(&h0, &B256::ZERO).unwrap();
            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
            hw.append_header(&h1, &B256::ZERO).unwrap();
            hw.commit().unwrap();

            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            aw.append_account_changeset(vec![], 0).unwrap();
            aw.commit().unwrap();

            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
            sw.append_storage_changeset(vec![], 0).unwrap();
            sw.commit().unwrap();
        }

        // Pre-state in MDBX: body indices for blocks 0/1 and an existing hashed
        // account with nonce 0.
        {
            let provider_rw = factory.provider_rw().unwrap();
            provider_rw
                .tx
                .put::<tables::BlockBodyIndices>(
                    0,
                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
                )
                .unwrap();
            provider_rw
                .tx
                .put::<tables::BlockBodyIndices>(
                    1,
                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
                )
                .unwrap();
            provider_rw
                .tx
                .cursor_write::<tables::HashedAccounts>()
                .unwrap()
                .upsert(
                    hashed_address,
                    &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None },
                )
                .unwrap();
            provider_rw.commit().unwrap();
        }

        let provider_rw = factory.provider_rw().unwrap();

        // Block-1 bundle: account set to nonce 1 and slot set 0 -> 10; reverts
        // record that the account did not exist before block 1.
        let bundle = BundleState::builder(1..=1)
            .state_present_account_info(
                address,
                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
            )
            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
            .revert_account_info(1, address, Some(None))
            .revert_storage(1, address, vec![(slot, U256::ZERO)])
            .build();

        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());

        provider_rw
            .write_state(
                &execution_outcome,
                OriginalValuesKnown::Yes,
                StateWriteConfig {
                    write_receipts: false,
                    write_account_changesets: true,
                    write_storage_changesets: true,
                },
            )
            .unwrap();

        let hashed_state =
            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
        provider_rw.write_hashed_state(&hashed_state).unwrap();

        // Post-write checks: updated hashed account and hashed storage slot.
        let hashed_account = provider_rw
            .tx
            .cursor_read::<tables::HashedAccounts>()
            .unwrap()
            .seek_exact(hashed_address)
            .unwrap()
            .unwrap()
            .1;
        assert_eq!(hashed_account.nonce, 1);

        let hashed_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::HashedStorages>()
            .unwrap()
            .seek_by_key_subkey(hashed_address, hashed_slot)
            .unwrap()
            .unwrap();
        assert_eq!(hashed_entry.key, hashed_slot);
        assert_eq!(hashed_entry.value, U256::from(10));

        // v2 must leave the plain-state tables untouched.
        let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
        assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");

        let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
        assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");

        provider_rw.static_file_provider().commit().unwrap();

        // Changesets landed in static files with the plain slot key.
        let sf = factory.static_file_provider();
        let storage_cs = sf.storage_changeset(1).unwrap();
        assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
        assert_eq!(storage_cs[0].1.key, slot_key, "v2: changeset key should be plain");

        // Unwind block 1.
        provider_rw.remove_state_above(0).unwrap();

        // Per the revert info the account did not exist before block 1, so the
        // unwind removes it entirely.
        let restored_account = provider_rw
            .tx
            .cursor_read::<tables::HashedAccounts>()
            .unwrap()
            .seek_exact(hashed_address)
            .unwrap();
        assert!(
            restored_account.is_none(),
            "v2: account should be removed (didn't exist before block 1)"
        );

        let storage_gone = provider_rw
            .tx
            .cursor_dup_read::<tables::HashedStorages>()
            .unwrap()
            .seek_by_key_subkey(hashed_address, hashed_slot)
            .unwrap();
        assert!(
            storage_gone.is_none() || storage_gone.unwrap().key != hashed_slot,
            "v2: storage should be reverted (removed or different key)"
        );

        // MDBX changeset tables must remain empty in v2 even after the unwind.
        let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
        assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");

        let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
        assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
    }
5415
5416 #[test]
5417 fn test_unwind_storage_history_indices_v2() {
5418 let factory = create_test_provider_factory();
5419 factory.set_storage_settings_cache(StorageSettings::v2());
5420
5421 let address = Address::with_last_byte(1);
5422 let slot_key = B256::from(U256::from(42));
5423
5424 {
5425 let rocksdb = factory.rocksdb_provider();
5426 let mut batch = rocksdb.batch();
5427 batch.append_storage_history_shard(address, slot_key, vec![3u64, 7, 10]).unwrap();
5428 batch.commit().unwrap();
5429
5430 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5431 assert!(!shards.is_empty(), "history should be written to rocksdb");
5432 }
5433
5434 let provider_rw = factory.provider_rw().unwrap();
5435
5436 let changesets = vec![
5437 (
5438 BlockNumberAddress((7, address)),
5439 StorageEntry { key: slot_key, value: U256::from(5) },
5440 ),
5441 (
5442 BlockNumberAddress((10, address)),
5443 StorageEntry { key: slot_key, value: U256::from(8) },
5444 ),
5445 ];
5446
5447 let count = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
5448 assert_eq!(count, 2);
5449
5450 provider_rw.commit().unwrap();
5451
5452 let rocksdb = factory.rocksdb_provider();
5453 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5454
5455 assert!(
5456 !shards.is_empty(),
5457 "history shards should still exist with block 3 after partial unwind"
5458 );
5459
5460 let all_blocks: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
5461 assert!(all_blocks.contains(&3), "block 3 should remain");
5462 assert!(!all_blocks.contains(&7), "block 7 should be unwound");
5463 assert!(!all_blocks.contains(&10), "block 10 should be unwound");
5464 }
5465}