1use crate::{
2 changesets_utils::StorageRevertsIter,
3 providers::{
4 database::{chain::ChainStorage, metrics},
5 rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6 static_file::{StaticFileWriteCtx, StaticFileWriter},
7 NodeTypesForProvider, StaticFileProvider,
8 },
9 to_range,
10 traits::{
11 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12 },
13 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15 DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16 HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17 LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18 PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19 RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20 StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21 TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25 BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29 keccak256,
30 map::{hash_map, AddressSet, B256Map, HashMap},
31 Address, BlockHash, BlockNumber, StorageKey, StorageValue, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40 database::{Database, ReaderTxnTracker},
41 models::{
42 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43 BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44 StoredBlockBodyIndices,
45 },
46 table::Table,
47 tables,
48 transaction::{DbTx, DbTxMut},
49 BlockNumberList,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54 Account, Block as _, BlockBody as _, Bytecode, FastInstant as Instant, RecoveredBlock,
55 SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58 PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63 BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64 NodePrimitivesProvider, StateProvider, StateReader, StateWriteConfig, StorageChangeSetReader,
65 StoragePath, StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70 HashedPostStateSorted,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
73use revm_database::states::{
74 PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use smallvec::SmallVec;
77use std::{
78 cmp::Ordering,
79 collections::{BTreeMap, BTreeSet},
80 fmt::Debug,
81 ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
82 path::PathBuf,
83 sync::Arc,
84};
85use tracing::{debug, instrument, trace};
86
87#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
89pub enum CommitOrder {
90 #[default]
92 Normal,
93 Unwind,
96}
97
98impl CommitOrder {
99 pub const fn is_unwind(&self) -> bool {
101 matches!(self, Self::Unwind)
102 }
103}
104
105pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
107
108#[derive(Debug)]
113pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
114 pub DatabaseProvider<<DB as Database>::TXMut, N>,
115);
116
117impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
118 type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
119
120 fn deref(&self) -> &Self::Target {
121 &self.0
122 }
123}
124
125impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
126 fn deref_mut(&mut self) -> &mut Self::Target {
127 &mut self.0
128 }
129}
130
131impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
132 for DatabaseProviderRW<DB, N>
133{
134 fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
135 &self.0
136 }
137}
138
139impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
140 pub fn commit(self) -> ProviderResult<()> {
142 self.0.commit()
143 }
144
145 pub fn into_tx(self) -> <DB as Database>::TXMut {
147 self.0.into_tx()
148 }
149
150 #[cfg(any(test, feature = "test-utils"))]
152 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
153 self.0.minimum_pruning_distance = distance;
154 self
155 }
156}
157
158impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
159 for DatabaseProvider<<DB as Database>::TXMut, N>
160{
161 fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
162 provider.0
163 }
164}
165
166#[derive(Debug, Clone, Copy, PartialEq, Eq)]
168pub enum SaveBlocksMode {
169 Full,
172 BlocksOnly,
176}
177
178impl SaveBlocksMode {
179 pub const fn with_state(self) -> bool {
181 matches!(self, Self::Full)
182 }
183}
184
185pub struct DatabaseProvider<TX, N: NodeTypes> {
188 tx: TX,
190 chain_spec: Arc<N::ChainSpec>,
192 static_file_provider: StaticFileProvider<N::Primitives>,
194 prune_modes: PruneModes,
196 storage: Arc<N::Storage>,
198 storage_settings: Arc<RwLock<StorageSettings>>,
200 rocksdb_provider: RocksDBProvider,
202 changeset_cache: ChangesetCache,
204 runtime: reth_tasks::Runtime,
206 db_path: PathBuf,
208 pending_rocksdb_batches: PendingRocksDBBatches,
210 commit_order: CommitOrder,
212 minimum_pruning_distance: u64,
214 metrics: metrics::DatabaseProviderMetrics,
216 reader_txn_tracker: Option<Arc<dyn ReaderTxnTracker>>,
218}
219
220impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 let mut s = f.debug_struct("DatabaseProvider");
223 s.field("tx", &self.tx)
224 .field("chain_spec", &self.chain_spec)
225 .field("static_file_provider", &self.static_file_provider)
226 .field("prune_modes", &self.prune_modes)
227 .field("storage", &self.storage)
228 .field("storage_settings", &self.storage_settings)
229 .field("rocksdb_provider", &self.rocksdb_provider)
230 .field("changeset_cache", &self.changeset_cache)
231 .field("runtime", &self.runtime)
232 .field("pending_rocksdb_batches", &"<pending batches>")
233 .field("commit_order", &self.commit_order)
234 .field("minimum_pruning_distance", &self.minimum_pruning_distance)
235 .field("reader_txn_tracker", &"<reader txn tracker>")
236 .finish()
237 }
238}
239
240impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
241 pub const fn prune_modes_ref(&self) -> &PruneModes {
243 &self.prune_modes
244 }
245
246 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
248 self.minimum_pruning_distance = distance;
249 self
250 }
251
252 pub(crate) fn with_reader_txn_tracker<T>(mut self, reader_txn_tracker: T) -> Self
254 where
255 T: ReaderTxnTracker + 'static,
256 {
257 self.reader_txn_tracker = Some(Arc::new(reader_txn_tracker));
258 self
259 }
260}
261
262impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
263 fn commit_unwind(self) -> ProviderResult<()> {
274 let storage_v2 = self.cached_storage_settings().storage_v2;
275 let reader_txn_tracker = self.reader_txn_tracker.clone();
276 self.tx.commit()?;
277
278 if let Some(reader_txn_tracker) = reader_txn_tracker.as_ref() {
279 reader_txn_tracker.wait_for_pre_commit_readers();
280 }
281
282 if storage_v2 {
283 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
284 for batch in batches {
285 self.rocksdb_provider.commit_batch(batch)?;
286 }
287 }
288
289 self.static_file_provider.commit()?;
290 Ok(())
291 }
292
293 pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
295 trace!(target: "providers::db", "Returning latest state provider");
296 Box::new(LatestStateProviderRef::new(self))
297 }
298
299 pub fn history_by_block_hash<'a>(
301 &'a self,
302 block_hash: BlockHash,
303 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
304 let block_number =
305 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
306 self.history_by_block_number(block_number)
307 }
308
309 pub fn history_by_block_number<'a>(
311 &'a self,
312 mut block_number: BlockNumber,
313 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
314 if block_number == self.best_block_number().unwrap_or_default() &&
315 block_number == self.last_block_number().unwrap_or_default()
316 {
317 return Ok(Box::new(LatestStateProviderRef::new(self)))
318 }
319
320 block_number += 1;
322
323 let account_history_prune_checkpoint =
324 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
325 let storage_history_prune_checkpoint =
326 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
327
328 let mut state_provider =
329 HistoricalStateProviderRef::new(self, block_number, self.changeset_cache.clone());
330 if let Some(prune_checkpoint_block_number) =
333 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
334 {
335 state_provider = state_provider.with_lowest_available_account_history_block_number(
336 prune_checkpoint_block_number + 1,
337 );
338 }
339 if let Some(prune_checkpoint_block_number) =
340 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
341 {
342 state_provider = state_provider.with_lowest_available_storage_history_block_number(
343 prune_checkpoint_block_number + 1,
344 );
345 }
346
347 Ok(Box::new(state_provider))
348 }
349
350 #[cfg(feature = "test-utils")]
351 pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
353 self.prune_modes = prune_modes;
354 }
355}
356
357impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
358 type Primitives = N::Primitives;
359}
360
361impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
362 fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
364 self.static_file_provider.clone()
365 }
366
367 fn get_static_file_writer(
368 &self,
369 block: BlockNumber,
370 segment: StaticFileSegment,
371 ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
372 self.static_file_provider.get_writer(block, segment)
373 }
374}
375
376impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
377 fn rocksdb_provider(&self) -> RocksDBProvider {
379 self.rocksdb_provider.clone()
380 }
381
382 fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
383 self.pending_rocksdb_batches.lock().push(batch);
384 }
385
386 fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
387 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
388 for batch in batches {
389 self.rocksdb_provider.commit_batch(batch)?;
390 }
391 Ok(())
392 }
393}
394
395impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
396 for DatabaseProvider<TX, N>
397{
398 type ChainSpec = N::ChainSpec;
399
400 fn chain_spec(&self) -> Arc<Self::ChainSpec> {
401 self.chain_spec.clone()
402 }
403}
404
405impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
406 #[expect(clippy::too_many_arguments)]
408 fn new_rw_inner(
409 tx: TX,
410 chain_spec: Arc<N::ChainSpec>,
411 static_file_provider: StaticFileProvider<N::Primitives>,
412 prune_modes: PruneModes,
413 storage: Arc<N::Storage>,
414 storage_settings: Arc<RwLock<StorageSettings>>,
415 rocksdb_provider: RocksDBProvider,
416 changeset_cache: ChangesetCache,
417 runtime: reth_tasks::Runtime,
418 db_path: PathBuf,
419 commit_order: CommitOrder,
420 ) -> Self {
421 Self {
422 tx,
423 chain_spec,
424 static_file_provider,
425 prune_modes,
426 storage,
427 storage_settings,
428 rocksdb_provider,
429 changeset_cache,
430 runtime,
431 db_path,
432 pending_rocksdb_batches: Default::default(),
433 commit_order,
434 minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
435 metrics: metrics::DatabaseProviderMetrics::default(),
436 reader_txn_tracker: None,
437 }
438 }
439
440 #[expect(clippy::too_many_arguments)]
442 pub fn new_rw(
443 tx: TX,
444 chain_spec: Arc<N::ChainSpec>,
445 static_file_provider: StaticFileProvider<N::Primitives>,
446 prune_modes: PruneModes,
447 storage: Arc<N::Storage>,
448 storage_settings: Arc<RwLock<StorageSettings>>,
449 rocksdb_provider: RocksDBProvider,
450 changeset_cache: ChangesetCache,
451 runtime: reth_tasks::Runtime,
452 db_path: PathBuf,
453 ) -> Self {
454 Self::new_rw_inner(
455 tx,
456 chain_spec,
457 static_file_provider,
458 prune_modes,
459 storage,
460 storage_settings,
461 rocksdb_provider,
462 changeset_cache,
463 runtime,
464 db_path,
465 CommitOrder::Normal,
466 )
467 }
468
469 #[expect(clippy::too_many_arguments)]
471 pub fn new_unwind_rw(
472 tx: TX,
473 chain_spec: Arc<N::ChainSpec>,
474 static_file_provider: StaticFileProvider<N::Primitives>,
475 prune_modes: PruneModes,
476 storage: Arc<N::Storage>,
477 storage_settings: Arc<RwLock<StorageSettings>>,
478 rocksdb_provider: RocksDBProvider,
479 changeset_cache: ChangesetCache,
480 runtime: reth_tasks::Runtime,
481 db_path: PathBuf,
482 ) -> Self {
483 Self::new_rw_inner(
484 tx,
485 chain_spec,
486 static_file_provider,
487 prune_modes,
488 storage,
489 storage_settings,
490 rocksdb_provider,
491 changeset_cache,
492 runtime,
493 db_path,
494 CommitOrder::Unwind,
495 )
496 }
497}
498
499impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
500 fn as_ref(&self) -> &Self {
501 self
502 }
503}
504
505impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
506 pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
510 where
511 F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
512 {
513 let rocksdb = self.rocksdb_provider();
514 let rocksdb_batch = rocksdb.batch();
515
516 let (result, raw_batch) = f(rocksdb_batch)?;
517
518 if let Some(batch) = raw_batch {
519 self.set_pending_rocksdb_batch(batch);
520 }
521 let _ = raw_batch; Ok(result)
524 }
525
526 fn static_file_write_ctx(
528 &self,
529 save_mode: SaveBlocksMode,
530 first_block: BlockNumber,
531 last_block: BlockNumber,
532 ) -> ProviderResult<StaticFileWriteCtx> {
533 let tip = self.last_block_number()?.max(last_block);
534 Ok(StaticFileWriteCtx {
535 write_senders: EitherWriterDestination::senders(self).is_static_file() &&
536 self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
537 write_receipts: save_mode.with_state() &&
538 EitherWriter::receipts_destination(self).is_static_file(),
539 write_account_changesets: save_mode.with_state() &&
540 EitherWriterDestination::account_changesets(self).is_static_file(),
541 write_storage_changesets: save_mode.with_state() &&
542 EitherWriterDestination::storage_changesets(self).is_static_file(),
543 tip,
544 receipts_prune_mode: self.prune_modes.receipts,
545 receipts_prunable: self
547 .static_file_provider
548 .get_highest_static_file_tx(StaticFileSegment::Receipts)
549 .is_none() &&
550 PruneMode::Distance(self.minimum_pruning_distance)
551 .should_prune(first_block, tip),
552 })
553 }
554
555 fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
557 RocksDBWriteCtx {
558 first_block_number: first_block,
559 prune_tx_lookup: self.prune_modes.transaction_lookup,
560 storage_settings: self.cached_storage_settings(),
561 pending_batches: self.pending_rocksdb_batches.clone(),
562 }
563 }
564
565 #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
574 pub fn save_blocks(
575 &self,
576 blocks: Vec<ExecutedBlock<N::Primitives>>,
577 save_mode: SaveBlocksMode,
578 ) -> ProviderResult<()> {
579 if blocks.is_empty() {
580 debug!(target: "providers::db", "Attempted to write empty block range");
581 return Ok(())
582 }
583
584 let total_start = Instant::now();
585 let block_count = blocks.len() as u64;
586 let first_number = blocks.first().unwrap().recovered_block().number();
587 let last_block_number = blocks.last().unwrap().recovered_block().number();
588
589 debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");
590
591 let first_tx_num = self
593 .tx
594 .cursor_read::<tables::TransactionBlocks>()?
595 .last()?
596 .map(|(n, _)| n + 1)
597 .unwrap_or_default();
598
599 let tx_nums: SmallVec<[TxNumber; 4]> = {
600 let mut nums = SmallVec::with_capacity(blocks.len());
601 let mut current = first_tx_num;
602 for block in &blocks {
603 nums.push(current);
604 current += block.recovered_block().body().transaction_count() as u64;
605 }
606 nums
607 };
608
609 let mut timings =
610 metrics::SaveBlocksTimings { batch_size: block_count, ..Default::default() };
611
612 let sf_provider = &self.static_file_provider;
614 let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
615 let rocksdb_provider = self.rocksdb_provider.clone();
616 let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
617 let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;
618
619 let mut sf_result = None;
620 let mut rocksdb_result = None;
621
622 let runtime = &self.runtime;
624 let span = tracing::Span::current();
627 runtime.storage_pool().in_place_scope(|s| {
628 s.spawn(|_| {
630 let _guard = span.enter();
631 let start = Instant::now();
632 sf_result = Some(
633 sf_provider
634 .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
635 .map(|()| start.elapsed()),
636 );
637 });
638
639 if rocksdb_enabled {
641 s.spawn(|_| {
642 let _guard = span.enter();
643 let start = Instant::now();
644 rocksdb_result = Some(
645 rocksdb_provider
646 .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
647 .map(|()| start.elapsed()),
648 );
649 });
650 }
651
652 let mdbx_start = Instant::now();
654
655 if !self.cached_storage_settings().storage_v2 &&
657 self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
658 {
659 let start = Instant::now();
660 let total_tx_count: usize =
661 blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
662 let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
663 for (i, block) in blocks.iter().enumerate() {
664 let recovered_block = block.recovered_block();
665 for (tx_num, transaction) in
666 (tx_nums[i]..).zip(recovered_block.body().transactions_iter())
667 {
668 all_tx_hashes.push((*transaction.tx_hash(), tx_num));
669 }
670 }
671
672 all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);
674
675 self.with_rocksdb_batch(|batch| {
677 let mut tx_hash_writer =
678 EitherWriter::new_transaction_hash_numbers(self, batch)?;
679 tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
680 let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
681 Ok(((), raw_batch))
682 })?;
683 self.metrics.record_duration(
684 metrics::Action::InsertTransactionHashNumbers,
685 start.elapsed(),
686 );
687 }
688
689 for (i, block) in blocks.iter().enumerate() {
690 let recovered_block = block.recovered_block();
691
692 let start = Instant::now();
693 self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
694 timings.insert_block += start.elapsed();
695
696 if save_mode.with_state() {
697 let execution_output = block.execution_outcome();
698
699 let start = Instant::now();
703 self.write_state(
704 WriteStateInput::Single {
705 outcome: execution_output,
706 block: recovered_block.number(),
707 },
708 OriginalValuesKnown::No,
709 StateWriteConfig {
710 write_receipts: !sf_ctx.write_receipts,
711 write_account_changesets: !sf_ctx.write_account_changesets,
712 write_storage_changesets: !sf_ctx.write_storage_changesets,
713 },
714 )?;
715 timings.write_state += start.elapsed();
716 }
717 }
718
719 if save_mode.with_state() {
722 let start = Instant::now();
724 let merged_hashed_state = HashedPostStateSorted::merge_batch(
725 blocks.iter().rev().map(|b| b.trie_data().hashed_state),
726 );
727 if !merged_hashed_state.is_empty() {
728 self.write_hashed_state(&merged_hashed_state)?;
729 }
730 timings.write_hashed_state += start.elapsed();
731
732 let start = Instant::now();
733 let merged_trie =
734 TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
735 if !merged_trie.is_empty() {
736 self.write_trie_updates_sorted(&merged_trie)?;
737 }
738 timings.write_trie_updates += start.elapsed();
739 }
740
741 if save_mode.with_state() {
743 let start = Instant::now();
744 self.update_history_indices(first_number..=last_block_number)?;
745 timings.update_history_indices = start.elapsed();
746 }
747
748 let start = Instant::now();
750 self.update_pipeline_stages(last_block_number, false)?;
751 timings.update_pipeline_stages = start.elapsed();
752
753 timings.mdbx = mdbx_start.elapsed();
754
755 Ok::<_, ProviderError>(())
756 })?;
757
758 timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;
760
761 if rocksdb_enabled {
762 timings.rocksdb = rocksdb_result.ok_or_else(|| {
763 ProviderError::Database(reth_db_api::DatabaseError::Other(
764 "RocksDB thread panicked".into(),
765 ))
766 })??;
767 }
768
769 timings.total = total_start.elapsed();
770
771 self.metrics.record_save_blocks(&timings);
772 debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
773
774 Ok(())
775 }
776
777 #[instrument(level = "debug", target = "providers::db", skip_all)]
781 fn insert_block_mdbx_only(
782 &self,
783 block: &RecoveredBlock<BlockTy<N>>,
784 first_tx_num: TxNumber,
785 ) -> ProviderResult<StoredBlockBodyIndices> {
786 if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
787 EitherWriterDestination::senders(self).is_database()
788 {
789 let start = Instant::now();
790 let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
791 let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
792 for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
793 cursor.append(tx_num, &sender)?;
794 }
795 self.metrics
796 .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
797 }
798
799 let block_number = block.number();
800 let tx_count = block.body().transaction_count() as u64;
801
802 let start = Instant::now();
803 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
804 self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
805
806 self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
807
808 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
809 }
810
811 fn write_block_body_indices(
814 &self,
815 block_number: BlockNumber,
816 body: &BodyTy<N>,
817 first_tx_num: TxNumber,
818 tx_count: u64,
819 ) -> ProviderResult<()> {
820 let start = Instant::now();
822 self.tx
823 .cursor_write::<tables::BlockBodyIndices>()?
824 .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
825 self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
826
827 if tx_count > 0 {
829 let start = Instant::now();
830 self.tx
831 .cursor_write::<tables::TransactionBlocks>()?
832 .append(first_tx_num + tx_count - 1, &block_number)?;
833 self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
834 }
835
836 self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
838
839 Ok(())
840 }
841
842 pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
847 let changed_accounts = self.account_changesets_range(from..)?;
848
849 self.unwind_account_hashing(changed_accounts.iter())?;
851
852 self.unwind_account_history_indices(changed_accounts.iter())?;
854
855 let changed_storages = self.storage_changesets_range(from..)?;
856
857 self.unwind_storage_hashing(changed_storages.iter().copied())?;
859
860 self.unwind_storage_history_indices(changed_storages.iter().copied())?;
862
863 let db_tip_block = self
866 .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
867 .as_ref()
868 .map(|chk| chk.block_number)
869 .ok_or_else(|| ProviderError::InsufficientChangesets {
870 requested: from,
871 available: 0..=0,
872 })?;
873
874 let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
875 self.write_trie_updates_sorted(&trie_revert)?;
876
877 Ok(())
878 }
879
880 fn remove_receipts_from(
882 &self,
883 from_tx: TxNumber,
884 last_block: BlockNumber,
885 ) -> ProviderResult<()> {
886 self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
888
889 if EitherWriter::receipts_destination(self).is_static_file() {
890 let static_file_receipt_num =
891 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
892
893 let to_delete = static_file_receipt_num
894 .map(|static_num| (static_num + 1).saturating_sub(from_tx))
895 .unwrap_or_default();
896
897 self.static_file_provider
898 .latest_writer(StaticFileSegment::Receipts)?
899 .prune_receipts(to_delete, last_block)?;
900 }
901
902 Ok(())
903 }
904
905 fn write_bytecodes(
907 &self,
908 bytecodes: impl IntoIterator<Item = (B256, Bytecode)>,
909 ) -> ProviderResult<()> {
910 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
911 for (hash, bytecode) in bytecodes {
912 bytecodes_cursor.upsert(hash, &bytecode)?;
913 }
914 Ok(())
915 }
916}
917
918impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
919 fn try_into_history_at_block(
920 self,
921 mut block_number: BlockNumber,
922 ) -> ProviderResult<StateProviderBox> {
923 let best_block = self.best_block_number().unwrap_or_default();
924
925 if block_number > best_block {
927 return Err(ProviderError::BlockNotExecuted {
928 requested: block_number,
929 executed: best_block,
930 });
931 }
932
933 if block_number == best_block {
935 return Ok(Box::new(LatestStateProvider::new(self)));
936 }
937
938 block_number += 1;
940
941 let account_history_prune_checkpoint =
942 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
943 let storage_history_prune_checkpoint =
944 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
945 let changeset_cache = self.changeset_cache.clone();
946
947 let mut state_provider = HistoricalStateProvider::new(self, block_number, changeset_cache);
948
949 if let Some(prune_checkpoint_block_number) =
952 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
953 {
954 state_provider = state_provider.with_lowest_available_account_history_block_number(
955 prune_checkpoint_block_number + 1,
956 );
957 }
958 if let Some(prune_checkpoint_block_number) =
959 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
960 {
961 state_provider = state_provider.with_lowest_available_storage_history_block_number(
962 prune_checkpoint_block_number + 1,
963 );
964 }
965
966 Ok(Box::new(state_provider))
967 }
968}
969
970fn unwind_history_shards<S, T, C>(
985 cursor: &mut C,
986 start_key: T::Key,
987 block_number: BlockNumber,
988 mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
989) -> ProviderResult<Vec<u64>>
990where
991 T: Table<Value = BlockNumberList>,
992 T::Key: AsRef<ShardedKey<S>>,
993 C: DbCursorRO<T> + DbCursorRW<T>,
994{
995 let mut item = cursor.seek_exact(start_key)?;
997 while let Some((sharded_key, list)) = item {
998 if !shard_belongs_to_key(&sharded_key) {
1000 break
1001 }
1002
1003 cursor.delete_current()?;
1006
1007 let first = list.iter().next().expect("List can't be empty");
1010
1011 if first >= block_number {
1014 item = cursor.prev()?;
1015 continue
1016 }
1017 else if block_number <= sharded_key.as_ref().highest_block_number {
1020 return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
1023 }
1024 return Ok(list.iter().collect::<Vec<_>>())
1027 }
1028
1029 Ok(Vec::new())
1031}
1032
1033impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
1034 #[expect(clippy::too_many_arguments)]
1036 pub fn new(
1037 tx: TX,
1038 chain_spec: Arc<N::ChainSpec>,
1039 static_file_provider: StaticFileProvider<N::Primitives>,
1040 prune_modes: PruneModes,
1041 storage: Arc<N::Storage>,
1042 storage_settings: Arc<RwLock<StorageSettings>>,
1043 rocksdb_provider: RocksDBProvider,
1044 changeset_cache: ChangesetCache,
1045 runtime: reth_tasks::Runtime,
1046 db_path: PathBuf,
1047 ) -> Self {
1048 Self {
1049 tx,
1050 chain_spec,
1051 static_file_provider,
1052 prune_modes,
1053 storage,
1054 storage_settings,
1055 rocksdb_provider,
1056 changeset_cache,
1057 runtime,
1058 db_path,
1059 pending_rocksdb_batches: Default::default(),
1060 commit_order: CommitOrder::Normal,
1061 minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
1062 metrics: metrics::DatabaseProviderMetrics::default(),
1063 reader_txn_tracker: None,
1064 }
1065 }
1066
1067 pub fn into_tx(self) -> TX {
1069 self.tx
1070 }
1071
1072 pub const fn tx_mut(&mut self) -> &mut TX {
1074 &mut self.tx
1075 }
1076
1077 pub const fn tx_ref(&self) -> &TX {
1079 &self.tx
1080 }
1081
1082 pub fn chain_spec(&self) -> &N::ChainSpec {
1084 &self.chain_spec
1085 }
1086}
1087
1088impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
1089 fn recovered_block<H, HF, B, BF>(
1090 &self,
1091 id: BlockHashOrNumber,
1092 _transaction_kind: TransactionVariant,
1093 header_by_number: HF,
1094 construct_block: BF,
1095 ) -> ProviderResult<Option<B>>
1096 where
1097 H: AsRef<HeaderTy<N>>,
1098 HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
1099 BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
1100 {
1101 let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
1102 let Some(header) = header_by_number(block_number)? else { return Ok(None) };
1103
1104 let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
1111
1112 let tx_range = body.tx_num_range();
1113
1114 let transactions = if tx_range.is_empty() {
1115 vec![]
1116 } else {
1117 self.transactions_by_tx_range(tx_range.clone())?
1118 };
1119
1120 let body = self
1121 .storage
1122 .reader()
1123 .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
1124 .pop()
1125 .ok_or(ProviderError::InvalidStorageOutput)?;
1126
1127 let senders = if tx_range.is_empty() {
1128 vec![]
1129 } else {
1130 let known_senders: HashMap<TxNumber, Address> =
1131 EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1132
1133 let mut senders = Vec::with_capacity(body.transactions().len());
1134 for (tx_num, tx) in tx_range.zip(body.transactions()) {
1135 match known_senders.get(&tx_num) {
1136 None => {
1137 let sender = tx.recover_signer_unchecked()?;
1138 senders.push(sender);
1139 }
1140 Some(sender) => senders.push(*sender),
1141 }
1142 }
1143 senders
1144 };
1145
1146 construct_block(header, body, senders)
1147 }
1148
1149 fn block_range<F, H, HF, R>(
1159 &self,
1160 range: RangeInclusive<BlockNumber>,
1161 headers_range: HF,
1162 mut assemble_block: F,
1163 ) -> ProviderResult<Vec<R>>
1164 where
1165 H: AsRef<HeaderTy<N>>,
1166 HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1167 F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
1168 {
1169 if range.is_empty() {
1170 return Ok(Vec::new())
1171 }
1172
1173 let len = range.end().saturating_sub(*range.start()) as usize + 1;
1174 let mut blocks = Vec::with_capacity(len);
1175
1176 let headers = headers_range(range.clone())?;
1177
1178 let present_headers = self
1184 .block_body_indices_range(range)?
1185 .into_iter()
1186 .map(|b| b.tx_num_range())
1187 .zip(headers)
1188 .collect::<Vec<_>>();
1189
1190 let mut inputs = Vec::with_capacity(present_headers.len());
1191 for (tx_range, header) in &present_headers {
1192 let transactions = if tx_range.is_empty() {
1193 Vec::new()
1194 } else {
1195 self.transactions_by_tx_range(tx_range.clone())?
1196 };
1197
1198 inputs.push((header.as_ref(), transactions));
1199 }
1200
1201 let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
1202
1203 for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
1204 blocks.push(assemble_block(header, body, tx_range)?);
1205 }
1206
1207 Ok(blocks)
1208 }
1209
1210 fn block_with_senders_range<H, HF, B, BF>(
1221 &self,
1222 range: RangeInclusive<BlockNumber>,
1223 headers_range: HF,
1224 assemble_block: BF,
1225 ) -> ProviderResult<Vec<B>>
1226 where
1227 H: AsRef<HeaderTy<N>>,
1228 HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1229 BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
1230 {
1231 self.block_range(range, headers_range, |header, body, tx_range| {
1232 let senders = if tx_range.is_empty() {
1233 Vec::new()
1234 } else {
1235 let known_senders: HashMap<TxNumber, Address> =
1236 EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1237
1238 let mut senders = Vec::with_capacity(body.transactions().len());
1239 for (tx_num, tx) in tx_range.zip(body.transactions()) {
1240 match known_senders.get(&tx_num) {
1241 None => {
1242 let sender = tx.recover_signer_unchecked()?;
1244 senders.push(sender);
1245 }
1246 Some(sender) => senders.push(*sender),
1247 }
1248 }
1249
1250 senders
1251 };
1252
1253 assemble_block(header, body, senders)
1254 })
1255 }
1256
1257 fn populate_bundle_state(
1261 &self,
1262 account_changeset: Vec<(u64, AccountBeforeTx)>,
1263 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1264 mut get_account: impl FnMut(Address) -> ProviderResult<Option<Account>>,
1265 mut get_storage: impl FnMut(Address, StorageKey) -> ProviderResult<Option<StorageValue>>,
1266 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1267 let mut state: BundleStateInit = HashMap::default();
1271
1272 let mut reverts: RevertsInit = HashMap::default();
1278
1279 for (block_number, account_before) in account_changeset.into_iter().rev() {
1281 let AccountBeforeTx { info: old_info, address } = account_before;
1282 match state.entry(address) {
1283 hash_map::Entry::Vacant(entry) => {
1284 let new_info = get_account(address)?;
1285 entry.insert((old_info, new_info, HashMap::default()));
1286 }
1287 hash_map::Entry::Occupied(mut entry) => {
1288 entry.get_mut().0 = old_info;
1290 }
1291 }
1292 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
1294 }
1295
1296 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
1298 let BlockNumberAddress((block_number, address)) = block_and_address;
1299 let account_state = match state.entry(address) {
1301 hash_map::Entry::Vacant(entry) => {
1302 let present_info = get_account(address)?;
1303 entry.insert((present_info, present_info, HashMap::default()))
1304 }
1305 hash_map::Entry::Occupied(entry) => entry.into_mut(),
1306 };
1307
1308 match account_state.2.entry(old_storage.key) {
1310 hash_map::Entry::Vacant(entry) => {
1311 let new_storage = get_storage(address, old_storage.key)?.unwrap_or_default();
1312 entry.insert((old_storage.value, new_storage));
1313 }
1314 hash_map::Entry::Occupied(mut entry) => {
1315 entry.get_mut().0 = old_storage.value;
1316 }
1317 };
1318
1319 reverts
1320 .entry(block_number)
1321 .or_default()
1322 .entry(address)
1323 .or_default()
1324 .1
1325 .push(old_storage);
1326 }
1327
1328 Ok((state, reverts))
1329 }
1330
1331 fn populate_bundle_state_plain(
1334 &self,
1335 account_changeset: Vec<(u64, AccountBeforeTx)>,
1336 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1337 plain_accounts_cursor: &mut impl DbCursorRO<tables::PlainAccountState>,
1338 plain_storage_cursor: &mut impl DbDupCursorRO<tables::PlainStorageState>,
1339 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1340 self.populate_bundle_state(
1341 account_changeset,
1342 storage_changeset,
1343 |address| Ok(plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1)),
1344 |address, storage_key| {
1345 Ok(plain_storage_cursor
1346 .seek_by_key_subkey(address, storage_key)?
1347 .filter(|s| s.key == storage_key)
1348 .map(|s| s.value))
1349 },
1350 )
1351 }
1352
1353 fn populate_bundle_state_hashed(
1358 &self,
1359 account_changeset: Vec<(u64, AccountBeforeTx)>,
1360 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1361 hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
1362 hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
1363 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1364 self.populate_bundle_state(
1365 account_changeset,
1366 storage_changeset,
1367 |address| Ok(hashed_accounts_cursor.seek_exact(keccak256(address))?.map(|kv| kv.1)),
1368 |address, storage_key| {
1369 let hashed_storage_key = keccak256(storage_key);
1370 Ok(hashed_storage_cursor
1371 .seek_by_key_subkey(keccak256(address), hashed_storage_key)?
1372 .filter(|s| s.key == hashed_storage_key)
1373 .map(|s| s.value))
1374 },
1375 )
1376 }
1377
1378 fn populate_bundle_state_with_provider(
1379 &self,
1380 account_changeset: Vec<(u64, AccountBeforeTx)>,
1381 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1382 state_provider: impl StateProvider,
1383 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1384 self.populate_bundle_state(
1385 account_changeset,
1386 storage_changeset,
1387 |address| state_provider.basic_account(&address),
1388 |address, storage_key| state_provider.storage(address, storage_key),
1389 )
1390 }
1391}
1392
1393impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
1394 fn append_history_index<P, T>(
1402 &self,
1403 index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
1404 mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
1405 ) -> ProviderResult<()>
1406 where
1407 P: Copy,
1408 T: Table<Value = BlockNumberList>,
1409 {
1410 assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
1413
1414 let mut cursor = self.tx.cursor_write::<T>()?;
1415
1416 for (partial_key, indices) in index_updates {
1417 let last_key = sharded_key_factory(partial_key, u64::MAX);
1418 let mut last_shard = cursor
1419 .seek_exact(last_key.clone())?
1420 .map(|(_, list)| list)
1421 .unwrap_or_else(BlockNumberList::empty);
1422
1423 last_shard.append(indices).map_err(ProviderError::other)?;
1424
1425 if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
1427 cursor.upsert(last_key, &last_shard)?;
1428 continue;
1429 }
1430
1431 let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
1433 let mut chunks_peekable = chunks.into_iter().peekable();
1434
1435 while let Some(chunk) = chunks_peekable.next() {
1436 let shard = BlockNumberList::new_pre_sorted(chunk);
1437 let highest_block_number = if chunks_peekable.peek().is_some() {
1438 shard.iter().next_back().expect("`chunks` does not return empty list")
1439 } else {
1440 u64::MAX
1442 };
1443
1444 cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
1445 }
1446 }
1447
1448 Ok(())
1449 }
1450}
1451
1452impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1453 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1454 if self.cached_storage_settings().use_hashed_state() {
1455 let hashed_address = keccak256(address);
1456 Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1457 } else {
1458 Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1459 }
1460 }
1461}
1462
1463impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1464 fn changed_accounts_with_range(
1465 &self,
1466 range: RangeInclusive<BlockNumber>,
1467 ) -> ProviderResult<BTreeSet<Address>> {
1468 let mut reader = EitherReader::new_account_changesets(self)?;
1469
1470 reader.changed_accounts_with_range(range)
1471 }
1472
1473 fn basic_accounts(
1474 &self,
1475 iter: impl IntoIterator<Item = Address>,
1476 ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1477 if self.cached_storage_settings().use_hashed_state() {
1478 let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
1479 Ok(iter
1480 .into_iter()
1481 .map(|address| {
1482 let hashed_address = keccak256(address);
1483 hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
1484 })
1485 .collect::<Result<Vec<_>, _>>()?)
1486 } else {
1487 let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1488 Ok(iter
1489 .into_iter()
1490 .map(|address| {
1491 plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
1492 })
1493 .collect::<Result<Vec<_>, _>>()?)
1494 }
1495 }
1496
1497 fn changed_accounts_and_blocks_with_range(
1498 &self,
1499 range: RangeInclusive<BlockNumber>,
1500 ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1501 let highest_static_block = self
1502 .static_file_provider
1503 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1504
1505 if let Some(highest) = highest_static_block &&
1506 self.cached_storage_settings().storage_v2
1507 {
1508 let start = *range.start();
1509 let static_end = (*range.end()).min(highest);
1510
1511 let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1512 if start <= static_end {
1513 for block in start..=static_end {
1514 let block_changesets = self.account_block_changeset(block)?;
1515 for changeset in block_changesets {
1516 changed_accounts_and_blocks
1517 .entry(changeset.address)
1518 .or_default()
1519 .push(block);
1520 }
1521 }
1522 }
1523
1524 Ok(changed_accounts_and_blocks)
1525 } else {
1526 let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1527
1528 let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1529 BTreeMap::new(),
1530 |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1531 let (index, account) = entry?;
1532 accounts.entry(account.address).or_default().push(index);
1533 Ok(accounts)
1534 },
1535 )?;
1536
1537 Ok(account_transitions)
1538 }
1539 }
1540}
1541
1542impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1543 fn storage_changeset(
1544 &self,
1545 block_number: BlockNumber,
1546 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1547 if self.cached_storage_settings().storage_v2 {
1548 self.static_file_provider.storage_changeset(block_number)
1549 } else {
1550 let range = block_number..=block_number;
1551 let storage_range = BlockNumberAddress::range(range);
1552 self.tx
1553 .cursor_dup_read::<tables::StorageChangeSets>()?
1554 .walk_range(storage_range)?
1555 .map(|r| {
1556 let (bna, entry) = r?;
1557 Ok((bna, entry))
1558 })
1559 .collect()
1560 }
1561 }
1562
1563 fn get_storage_before_block(
1564 &self,
1565 block_number: BlockNumber,
1566 address: Address,
1567 storage_key: B256,
1568 ) -> ProviderResult<Option<StorageEntry>> {
1569 if self.cached_storage_settings().storage_v2 {
1570 self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1571 } else {
1572 Ok(self
1573 .tx
1574 .cursor_dup_read::<tables::StorageChangeSets>()?
1575 .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1576 .filter(|entry| entry.key == storage_key))
1577 }
1578 }
1579
1580 fn storage_changesets_range(
1581 &self,
1582 range: impl RangeBounds<BlockNumber>,
1583 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1584 if self.cached_storage_settings().storage_v2 {
1585 self.static_file_provider.storage_changesets_range(range)
1586 } else {
1587 self.tx
1588 .cursor_dup_read::<tables::StorageChangeSets>()?
1589 .walk_range(BlockNumberAddressRange::from(range))?
1590 .map(|r| {
1591 let (bna, entry) = r?;
1592 Ok((bna, entry))
1593 })
1594 .collect()
1595 }
1596 }
1597}
1598
1599impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1600 fn account_block_changeset(
1601 &self,
1602 block_number: BlockNumber,
1603 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1604 if self.cached_storage_settings().storage_v2 {
1605 let static_changesets =
1606 self.static_file_provider.account_block_changeset(block_number)?;
1607 Ok(static_changesets)
1608 } else {
1609 let range = block_number..=block_number;
1610 self.tx
1611 .cursor_read::<tables::AccountChangeSets>()?
1612 .walk_range(range)?
1613 .map(|result| -> ProviderResult<_> {
1614 let (_, account_before) = result?;
1615 Ok(account_before)
1616 })
1617 .collect()
1618 }
1619 }
1620
1621 fn get_account_before_block(
1622 &self,
1623 block_number: BlockNumber,
1624 address: Address,
1625 ) -> ProviderResult<Option<AccountBeforeTx>> {
1626 if self.cached_storage_settings().storage_v2 {
1627 Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1628 } else {
1629 self.tx
1630 .cursor_dup_read::<tables::AccountChangeSets>()?
1631 .seek_by_key_subkey(block_number, address)?
1632 .filter(|acc| acc.address == address)
1633 .map(Ok)
1634 .transpose()
1635 }
1636 }
1637
1638 fn account_changesets_range(
1639 &self,
1640 range: impl core::ops::RangeBounds<BlockNumber>,
1641 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1642 if self.cached_storage_settings().storage_v2 {
1643 self.static_file_provider.account_changesets_range(range)
1644 } else {
1645 self.tx
1646 .cursor_read::<tables::AccountChangeSets>()?
1647 .walk_range(to_range(range))?
1648 .map(|r| r.map_err(Into::into))
1649 .collect()
1650 }
1651 }
1652}
1653
1654impl<Tx: DbTx + 'static, N: NodeTypesForProvider> StateReader for DatabaseProvider<Tx, N> {
1655 type Receipt = ReceiptTy<N>;
1656
1657 fn get_state(
1658 &self,
1659 block: BlockNumber,
1660 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1661 let Some(block_body) = self.block_body_indices(block)? else { return Ok(None) };
1662
1663 let from_transaction_num = block_body.first_tx_num();
1664 let to_transaction_num = block_body.last_tx_num();
1665
1666 let account_changeset = self.account_changesets_range(block..=block)?;
1667 let storage_changeset = self.storage_changeset(block)?;
1668
1669 let Some(block_hash) = self.block_hash(block)? else { return Ok(None) };
1670 let state_provider = self.history_by_block_hash(block_hash)?;
1671 let (state, reverts) = self.populate_bundle_state_with_provider(
1672 account_changeset,
1673 storage_changeset,
1674 state_provider,
1675 )?;
1676
1677 let receipts = self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?;
1678
1679 Ok(Some(ExecutionOutcome::new_init(
1680 state,
1681 reverts,
1682 Vec::new(),
1684 vec![receipts],
1685 block,
1686 Vec::new(),
1687 )))
1688 }
1689}
1690
1691impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
1692 for DatabaseProvider<TX, N>
1693{
1694 type Header = HeaderTy<N>;
1695
1696 fn local_tip_header(
1697 &self,
1698 highest_uninterrupted_block: BlockNumber,
1699 ) -> ProviderResult<SealedHeader<Self::Header>> {
1700 let static_file_provider = self.static_file_provider();
1701
1702 let next_static_file_block_num = static_file_provider
1705 .get_highest_static_file_block(StaticFileSegment::Headers)
1706 .map(|id| id + 1)
1707 .unwrap_or_default();
1708 let next_block = highest_uninterrupted_block + 1;
1709
1710 match next_static_file_block_num.cmp(&next_block) {
1711 Ordering::Greater => {
1714 let mut static_file_producer =
1715 static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1716 static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1717 static_file_producer.commit()?
1720 }
1721 Ordering::Less => {
1722 return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1724 }
1725 Ordering::Equal => {}
1726 }
1727
1728 let local_head = static_file_provider
1729 .sealed_header(highest_uninterrupted_block)?
1730 .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1731
1732 Ok(local_head)
1733 }
1734}
1735
1736impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1737 type Header = HeaderTy<N>;
1738
1739 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1740 if let Some(num) = self.block_number(block_hash)? {
1741 Ok(self.header_by_number(num)?)
1742 } else {
1743 Ok(None)
1744 }
1745 }
1746
1747 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1748 self.static_file_provider.header_by_number(num)
1749 }
1750
1751 fn headers_range(
1752 &self,
1753 range: impl RangeBounds<BlockNumber>,
1754 ) -> ProviderResult<Vec<Self::Header>> {
1755 self.static_file_provider.headers_range(range)
1756 }
1757
1758 fn sealed_header(
1759 &self,
1760 number: BlockNumber,
1761 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1762 self.static_file_provider.sealed_header(number)
1763 }
1764
1765 fn sealed_headers_while(
1766 &self,
1767 range: impl RangeBounds<BlockNumber>,
1768 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1769 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1770 self.static_file_provider.sealed_headers_while(range, predicate)
1771 }
1772}
1773
1774impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1775 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1776 self.static_file_provider.block_hash(number)
1777 }
1778
1779 fn canonical_hashes_range(
1780 &self,
1781 start: BlockNumber,
1782 end: BlockNumber,
1783 ) -> ProviderResult<Vec<B256>> {
1784 self.static_file_provider.canonical_hashes_range(start, end)
1785 }
1786}
1787
1788impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1789 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1790 let best_number = self.best_block_number()?;
1791 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1792 Ok(ChainInfo { best_hash, best_number })
1793 }
1794
1795 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1796 Ok(self
1799 .get_stage_checkpoint(StageId::Finish)?
1800 .map(|checkpoint| checkpoint.block_number)
1801 .unwrap_or_default())
1802 }
1803
1804 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1805 self.static_file_provider.last_block_number()
1806 }
1807
1808 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1809 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1810 }
1811}
1812
1813impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1814 type Block = BlockTy<N>;
1815
1816 fn find_block_by_hash(
1817 &self,
1818 hash: B256,
1819 source: BlockSource,
1820 ) -> ProviderResult<Option<Self::Block>> {
1821 if source.is_canonical() {
1822 self.block(hash.into())
1823 } else {
1824 Ok(None)
1825 }
1826 }
1827
1828 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1836 if let Some(number) = self.convert_hash_or_number(id)? {
1837 let earliest_available = self.static_file_provider.earliest_history_height();
1838 if number < earliest_available {
1839 return Err(ProviderError::BlockExpired { requested: number, earliest_available })
1840 }
1841
1842 let Some(header) = self.header_by_number(number)? else { return Ok(None) };
1843
1844 let Some(transactions) = self.transactions_by_block(number.into())? else {
1849 return Ok(None)
1850 };
1851
1852 let body = self
1853 .storage
1854 .reader()
1855 .read_block_bodies(self, vec![(&header, transactions)])?
1856 .pop()
1857 .ok_or(ProviderError::InvalidStorageOutput)?;
1858
1859 return Ok(Some(Self::Block::new(header, body)))
1860 }
1861
1862 Ok(None)
1863 }
1864
1865 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1866 Ok(None)
1867 }
1868
1869 fn pending_block_and_receipts(
1870 &self,
1871 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1872 Ok(None)
1873 }
1874
1875 fn recovered_block(
1884 &self,
1885 id: BlockHashOrNumber,
1886 transaction_kind: TransactionVariant,
1887 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1888 self.recovered_block(
1889 id,
1890 transaction_kind,
1891 |block_number| self.header_by_number(block_number),
1892 |header, body, senders| {
1893 Self::Block::new(header, body)
1894 .try_into_recovered_unchecked(senders)
1898 .map(Some)
1899 .map_err(|_| ProviderError::SenderRecoveryError)
1900 },
1901 )
1902 }
1903
1904 fn sealed_block_with_senders(
1905 &self,
1906 id: BlockHashOrNumber,
1907 transaction_kind: TransactionVariant,
1908 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1909 self.recovered_block(
1910 id,
1911 transaction_kind,
1912 |block_number| self.sealed_header(block_number),
1913 |header, body, senders| {
1914 Self::Block::new_sealed(header, body)
1915 .try_with_senders_unchecked(senders)
1919 .map(Some)
1920 .map_err(|_| ProviderError::SenderRecoveryError)
1921 },
1922 )
1923 }
1924
1925 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1926 self.block_range(
1927 range,
1928 |range| self.headers_range(range),
1929 |header, body, _| Ok(Self::Block::new(header, body)),
1930 )
1931 }
1932
1933 fn block_with_senders_range(
1934 &self,
1935 range: RangeInclusive<BlockNumber>,
1936 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1937 self.block_with_senders_range(
1938 range,
1939 |range| self.headers_range(range),
1940 |header, body, senders| {
1941 Self::Block::new(header, body)
1942 .try_into_recovered_unchecked(senders)
1943 .map_err(|_| ProviderError::SenderRecoveryError)
1944 },
1945 )
1946 }
1947
1948 fn recovered_block_range(
1949 &self,
1950 range: RangeInclusive<BlockNumber>,
1951 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1952 self.block_with_senders_range(
1953 range,
1954 |range| self.sealed_headers_range(range),
1955 |header, body, senders| {
1956 Self::Block::new_sealed(header, body)
1957 .try_with_senders(senders)
1958 .map_err(|_| ProviderError::SenderRecoveryError)
1959 },
1960 )
1961 }
1962
1963 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1964 Ok(self
1965 .tx
1966 .cursor_read::<tables::TransactionBlocks>()?
1967 .seek(id)
1968 .map(|b| b.map(|(_, bn)| bn))?)
1969 }
1970}
1971
1972impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1973 for DatabaseProvider<TX, N>
1974{
1975 fn transaction_hashes_by_range(
1978 &self,
1979 tx_range: Range<TxNumber>,
1980 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1981 self.static_file_provider.transaction_hashes_by_range(tx_range)
1982 }
1983}
1984
1985impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1987 type Transaction = TxTy<N>;
1988
1989 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1990 self.with_rocksdb_snapshot(|rocksdb_ref| {
1991 let mut reader = EitherReader::new_transaction_hash_numbers(self, rocksdb_ref)?;
1992 reader.get_transaction_hash_number(tx_hash)
1993 })
1994 }
1995
1996 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1997 self.static_file_provider.transaction_by_id(id)
1998 }
1999
2000 fn transaction_by_id_unhashed(
2001 &self,
2002 id: TxNumber,
2003 ) -> ProviderResult<Option<Self::Transaction>> {
2004 self.static_file_provider.transaction_by_id_unhashed(id)
2005 }
2006
2007 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2008 if let Some(id) = self.transaction_id(hash)? {
2009 Ok(self.transaction_by_id_unhashed(id)?)
2010 } else {
2011 Ok(None)
2012 }
2013 }
2014
2015 fn transaction_by_hash_with_meta(
2016 &self,
2017 tx_hash: TxHash,
2018 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2019 if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
2020 let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
2021 let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
2022 let Some(sealed_header) = self.sealed_header(block_number)?
2023 {
2024 let (header, block_hash) = sealed_header.split();
2025 if let Some(block_body) = self.block_body_indices(block_number)? {
2026 let index = transaction_id - block_body.first_tx_num();
2031
2032 let meta = TransactionMeta {
2033 tx_hash,
2034 index,
2035 block_hash,
2036 block_number,
2037 base_fee: header.base_fee_per_gas(),
2038 excess_blob_gas: header.excess_blob_gas(),
2039 timestamp: header.timestamp(),
2040 };
2041
2042 return Ok(Some((transaction, meta)))
2043 }
2044 }
2045
2046 Ok(None)
2047 }
2048
2049 fn transactions_by_block(
2050 &self,
2051 id: BlockHashOrNumber,
2052 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2053 if let Some(block_number) = self.convert_hash_or_number(id)? &&
2054 let Some(body) = self.block_body_indices(block_number)?
2055 {
2056 let tx_range = body.tx_num_range();
2057 return if tx_range.is_empty() {
2058 Ok(Some(Vec::new()))
2059 } else {
2060 self.transactions_by_tx_range(tx_range).map(Some)
2061 }
2062 }
2063 Ok(None)
2064 }
2065
2066 fn transactions_by_block_range(
2067 &self,
2068 range: impl RangeBounds<BlockNumber>,
2069 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2070 let range = to_range(range);
2071
2072 self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
2073 .into_iter()
2074 .map(|body| {
2075 let tx_num_range = body.tx_num_range();
2076 if tx_num_range.is_empty() {
2077 Ok(Vec::new())
2078 } else {
2079 self.transactions_by_tx_range(tx_num_range)
2080 }
2081 })
2082 .collect()
2083 }
2084
2085 fn transactions_by_tx_range(
2086 &self,
2087 range: impl RangeBounds<TxNumber>,
2088 ) -> ProviderResult<Vec<Self::Transaction>> {
2089 self.static_file_provider.transactions_by_tx_range(range)
2090 }
2091
2092 fn senders_by_tx_range(
2093 &self,
2094 range: impl RangeBounds<TxNumber>,
2095 ) -> ProviderResult<Vec<Address>> {
2096 if EitherWriterDestination::senders(self).is_static_file() {
2097 self.static_file_provider.senders_by_tx_range(range)
2098 } else {
2099 self.cursor_read_collect::<tables::TransactionSenders>(range)
2100 }
2101 }
2102
2103 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2104 if EitherWriterDestination::senders(self).is_static_file() {
2105 self.static_file_provider.transaction_sender(id)
2106 } else {
2107 Ok(self.tx.get::<tables::TransactionSenders>(id)?)
2108 }
2109 }
2110}
2111
2112impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
2113 type Receipt = ReceiptTy<N>;
2114
2115 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2116 self.static_file_provider.get_with_static_file_or_database(
2117 StaticFileSegment::Receipts,
2118 id,
2119 |static_file| static_file.receipt(id),
2120 || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
2121 )
2122 }
2123
2124 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2125 if let Some(id) = self.transaction_id(hash)? {
2126 self.receipt(id)
2127 } else {
2128 Ok(None)
2129 }
2130 }
2131
2132 fn receipts_by_block(
2133 &self,
2134 block: BlockHashOrNumber,
2135 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2136 if let Some(number) = self.convert_hash_or_number(block)? &&
2137 let Some(body) = self.block_body_indices(number)?
2138 {
2139 let tx_range = body.tx_num_range();
2140 return if tx_range.is_empty() {
2141 Ok(Some(Vec::new()))
2142 } else {
2143 self.receipts_by_tx_range(tx_range).map(Some)
2144 }
2145 }
2146 Ok(None)
2147 }
2148
2149 fn receipts_by_tx_range(
2150 &self,
2151 range: impl RangeBounds<TxNumber>,
2152 ) -> ProviderResult<Vec<Self::Receipt>> {
2153 self.static_file_provider.get_range_with_static_file_or_database(
2154 StaticFileSegment::Receipts,
2155 to_range(range),
2156 |static_file, range, _| static_file.receipts_by_tx_range(range),
2157 |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
2158 |_| true,
2159 )
2160 }
2161
2162 fn receipts_by_block_range(
2163 &self,
2164 block_range: RangeInclusive<BlockNumber>,
2165 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2166 if block_range.is_empty() {
2167 return Ok(Vec::new());
2168 }
2169
2170 let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
2172 let mut block_body_indices = Vec::with_capacity(range_len);
2173 for block_num in block_range {
2174 if let Some(indices) = self.block_body_indices(block_num)? {
2175 block_body_indices.push(indices);
2176 } else {
2177 block_body_indices.push(StoredBlockBodyIndices::default());
2179 }
2180 }
2181
2182 if block_body_indices.is_empty() {
2183 return Ok(Vec::new());
2184 }
2185
2186 let non_empty_blocks: Vec<_> =
2188 block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
2189
2190 if non_empty_blocks.is_empty() {
2191 return Ok(vec![Vec::new(); block_body_indices.len()]);
2193 }
2194
2195 let first_tx = non_empty_blocks[0].first_tx_num();
2197 let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
2198
2199 let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
2201 let mut receipts_iter = all_receipts.into_iter();
2202
2203 let mut result = Vec::with_capacity(block_body_indices.len());
2205 for indices in &block_body_indices {
2206 if indices.tx_count == 0 {
2207 result.push(Vec::new());
2208 } else {
2209 let block_receipts =
2210 receipts_iter.by_ref().take(indices.tx_count as usize).collect();
2211 result.push(block_receipts);
2212 }
2213 }
2214
2215 Ok(result)
2216 }
2217}
2218
2219impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
2220 for DatabaseProvider<TX, N>
2221{
2222 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2223 Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
2224 }
2225
2226 fn block_body_indices_range(
2227 &self,
2228 range: RangeInclusive<BlockNumber>,
2229 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2230 self.cursor_read_collect::<tables::BlockBodyIndices>(range)
2231 }
2232}
2233
2234impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2235 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2236 Ok(if let Some(encoded) = id.get_pre_encoded() {
2237 self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2238 } else {
2239 self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2240 })
2241 }
2242
2243 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2245 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2246 }
2247
2248 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2249 self.tx
2250 .cursor_read::<tables::StageCheckpoints>()?
2251 .walk(None)?
2252 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2253 .map_err(ProviderError::Database)
2254 }
2255}
2256
2257impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2258 fn save_stage_checkpoint(
2260 &self,
2261 id: StageId,
2262 checkpoint: StageCheckpoint,
2263 ) -> ProviderResult<()> {
2264 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2265 }
2266
2267 fn save_stage_checkpoint_progress(
2269 &self,
2270 id: StageId,
2271 checkpoint: Vec<u8>,
2272 ) -> ProviderResult<()> {
2273 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2274 }
2275
2276 #[instrument(level = "debug", target = "providers::db", skip_all)]
2277 fn update_pipeline_stages(
2278 &self,
2279 block_number: BlockNumber,
2280 drop_stage_checkpoint: bool,
2281 ) -> ProviderResult<()> {
2282 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2284 for stage_id in StageId::ALL {
2285 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2286 cursor.upsert(
2287 stage_id.to_string(),
2288 &StageCheckpoint {
2289 block_number,
2290 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2291 },
2292 )?;
2293 }
2294
2295 Ok(())
2296 }
2297}
2298
2299impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
2300 fn plain_state_storages(
2301 &self,
2302 addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
2303 ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
2304 if self.cached_storage_settings().use_hashed_state() {
2305 let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;
2306
2307 addresses_with_keys
2308 .into_iter()
2309 .map(|(address, storage)| {
2310 let hashed_address = keccak256(address);
2311 storage
2312 .into_iter()
2313 .map(|key| -> ProviderResult<_> {
2314 let hashed_key = keccak256(key);
2315 let value = hashed_storage
2316 .seek_by_key_subkey(hashed_address, hashed_key)?
2317 .filter(|v| v.key == hashed_key)
2318 .map(|v| v.value)
2319 .unwrap_or_default();
2320 Ok(StorageEntry { key, value })
2321 })
2322 .collect::<ProviderResult<Vec<_>>>()
2323 .map(|storage| (address, storage))
2324 })
2325 .collect::<ProviderResult<Vec<(_, _)>>>()
2326 } else {
2327 let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
2328
2329 addresses_with_keys
2330 .into_iter()
2331 .map(|(address, storage)| {
2332 storage
2333 .into_iter()
2334 .map(|key| -> ProviderResult<_> {
2335 Ok(plain_storage
2336 .seek_by_key_subkey(address, key)?
2337 .filter(|v| v.key == key)
2338 .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
2339 })
2340 .collect::<ProviderResult<Vec<_>>>()
2341 .map(|storage| (address, storage))
2342 })
2343 .collect::<ProviderResult<Vec<(_, _)>>>()
2344 }
2345 }
2346
2347 fn changed_storages_with_range(
2348 &self,
2349 range: RangeInclusive<BlockNumber>,
2350 ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2351 if self.cached_storage_settings().storage_v2 {
2352 self.storage_changesets_range(range)?.into_iter().try_fold(
2353 BTreeMap::new(),
2354 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2355 let (BlockNumberAddress((_, address)), storage_entry) = entry;
2356 accounts.entry(address).or_default().insert(storage_entry.key);
2357 Ok(accounts)
2358 },
2359 )
2360 } else {
2361 self.tx
2362 .cursor_read::<tables::StorageChangeSets>()?
2363 .walk_range(BlockNumberAddress::range(range))?
2364 .try_fold(
2367 BTreeMap::new(),
2368 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2369 let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2370 accounts.entry(address).or_default().insert(storage_entry.key);
2371 Ok(accounts)
2372 },
2373 )
2374 }
2375 }
2376
2377 fn changed_storages_and_blocks_with_range(
2378 &self,
2379 range: RangeInclusive<BlockNumber>,
2380 ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2381 if self.cached_storage_settings().storage_v2 {
2382 self.storage_changesets_range(range)?.into_iter().try_fold(
2383 BTreeMap::new(),
2384 |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2385 storages
2386 .entry((index.address(), storage.key))
2387 .or_default()
2388 .push(index.block_number());
2389 Ok(storages)
2390 },
2391 )
2392 } else {
2393 let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2394
2395 let storage_changeset_lists =
2396 changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2397 BTreeMap::new(),
2398 |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2399 entry|
2400 -> ProviderResult<_> {
2401 let (index, storage) = entry?;
2402 storages
2403 .entry((index.address(), storage.key))
2404 .or_default()
2405 .push(index.block_number());
2406 Ok(storages)
2407 },
2408 )?;
2409
2410 Ok(storage_changeset_lists)
2411 }
2412 }
2413}
2414
2415impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2416 for DatabaseProvider<TX, N>
2417{
2418 type Receipt = ReceiptTy<N>;
2419
2420 #[instrument(level = "debug", target = "providers::db", skip_all)]
2421 fn write_state<'a>(
2422 &self,
2423 execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
2424 is_value_known: OriginalValuesKnown,
2425 config: StateWriteConfig,
2426 ) -> ProviderResult<()> {
2427 let execution_outcome = execution_outcome.into();
2428
2429 if self.cached_storage_settings().use_hashed_state() &&
2430 !config.write_receipts &&
2431 !config.write_account_changesets &&
2432 !config.write_storage_changesets
2433 {
2434 self.write_bytecodes(
2438 execution_outcome.state().contracts.iter().map(|(h, b)| (*h, Bytecode(b.clone()))),
2439 )?;
2440 return Ok(());
2441 }
2442
2443 let first_block = execution_outcome.first_block();
2444 let (plain_state, reverts) =
2445 execution_outcome.state().to_plain_state_and_reverts(is_value_known);
2446
2447 self.write_state_reverts(reverts, first_block, config)?;
2448 self.write_state_changes(plain_state)?;
2449
2450 if !config.write_receipts {
2451 return Ok(());
2452 }
2453
2454 let block_count = execution_outcome.len() as u64;
2455 let last_block = execution_outcome.last_block();
2456 let block_range = first_block..=last_block;
2457
2458 let tip = self.last_block_number()?.max(last_block);
2459
2460 let block_indices: Vec<_> = self
2462 .block_body_indices_range(block_range)?
2463 .into_iter()
2464 .map(|b| b.first_tx_num)
2465 .collect();
2466
2467 if block_indices.len() < block_count as usize {
2469 let missing_blocks = block_count - block_indices.len() as u64;
2470 return Err(ProviderError::BlockBodyIndicesNotFound(
2471 last_block.saturating_sub(missing_blocks - 1),
2472 ));
2473 }
2474
2475 let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;
2476
2477 let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
2478 let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
2479
2480 let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
2488 self.static_file_provider()
2489 .get_highest_static_file_tx(StaticFileSegment::Receipts)
2490 .is_none()) &&
2491 PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);
2492
2493 let mut allowed_addresses: AddressSet = AddressSet::default();
2495 for (_, addresses) in contract_log_pruner.range(..first_block) {
2496 allowed_addresses.extend(addresses.iter().copied());
2497 }
2498
2499 for (idx, (receipts, first_tx_index)) in
2500 execution_outcome.receipts().zip(block_indices).enumerate()
2501 {
2502 let block_number = first_block + idx as u64;
2503
2504 receipts_writer.increment_block(block_number)?;
2506
2507 if prunable_receipts &&
2509 self.prune_modes
2510 .receipts
2511 .is_some_and(|mode| mode.should_prune(block_number, tip))
2512 {
2513 continue
2514 }
2515
2516 if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
2518 allowed_addresses.extend(new_addresses.iter().copied());
2519 }
2520
2521 for (idx, receipt) in receipts.iter().enumerate() {
2522 let receipt_idx = first_tx_index + idx as u64;
2523 if prunable_receipts &&
2526 has_contract_log_filter &&
2527 !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
2528 {
2529 continue
2530 }
2531
2532 receipts_writer.append_receipt(receipt_idx, receipt)?;
2533 }
2534 }
2535
2536 Ok(())
2537 }
2538
2539 fn write_state_reverts(
2540 &self,
2541 reverts: PlainStateReverts,
2542 first_block: BlockNumber,
2543 config: StateWriteConfig,
2544 ) -> ProviderResult<()> {
2545 if config.write_storage_changesets {
2547 tracing::trace!("Writing storage changes");
2548 let mut storages_cursor =
2549 self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2550 for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
2551 let block_number = first_block + block_index as BlockNumber;
2552
2553 tracing::trace!(block_number, "Writing block change");
2554 storage_changes.par_sort_unstable_by_key(|a| a.address);
2556 let total_changes =
2557 storage_changes.iter().map(|change| change.storage_revert.len()).sum();
2558 let mut changeset = Vec::with_capacity(total_changes);
2559 for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
2560 let mut storage = storage_revert
2561 .into_iter()
2562 .map(|(k, v)| (B256::from(k.to_be_bytes()), v))
2563 .collect::<Vec<_>>();
2564 storage.par_sort_unstable_by_key(|a| a.0);
2566
2567 let mut wiped_storage = Vec::new();
2575 if wiped {
2576 tracing::trace!(?address, "Wiping storage");
2577 if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
2578 wiped_storage.push((entry.key, entry.value));
2579 while let Some(entry) = storages_cursor.next_dup_val()? {
2580 wiped_storage.push((entry.key, entry.value))
2581 }
2582 }
2583 }
2584
2585 tracing::trace!(?address, ?storage, "Writing storage reverts");
2586 for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
2587 changeset.push(StorageBeforeTx { address, key, value });
2588 }
2589 }
2590
2591 let mut storage_changesets_writer =
2592 EitherWriter::new_storage_changesets(self, block_number)?;
2593 storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
2594 }
2595 }
2596
2597 if !config.write_account_changesets {
2598 return Ok(());
2599 }
2600
2601 tracing::trace!(?first_block, "Writing account changes");
2603 for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
2604 let block_number = first_block + block_index as BlockNumber;
2605 let changeset = account_block_reverts
2606 .into_iter()
2607 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
2608 .collect::<Vec<_>>();
2609 let mut account_changesets_writer =
2610 EitherWriter::new_account_changesets(self, block_number)?;
2611
2612 account_changesets_writer.append_account_changeset(block_number, changeset)?;
2613 }
2614
2615 Ok(())
2616 }
2617
2618 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
2619 changes.accounts.par_sort_by_key(|a| a.0);
2622 changes.storage.par_sort_by_key(|a| a.address);
2623 changes.contracts.par_sort_by_key(|a| a.0);
2624
2625 if !self.cached_storage_settings().use_hashed_state() {
2626 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
2628 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
2629 for (address, account) in changes.accounts {
2631 if let Some(account) = account {
2632 tracing::trace!(?address, "Updating plain state account");
2633 accounts_cursor.upsert(address, &account.into())?;
2634 } else if accounts_cursor.seek_exact(address)?.is_some() {
2635 tracing::trace!(?address, "Deleting plain state account");
2636 accounts_cursor.delete_current()?;
2637 }
2638 }
2639
2640 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
2642 let mut storages_cursor =
2643 self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2644 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2645 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2647 storages_cursor.delete_current_duplicates()?;
2648 }
2649 let mut storage = storage
2651 .into_iter()
2652 .map(|(k, value)| StorageEntry { key: k.into(), value })
2653 .collect::<Vec<_>>();
2654 storage.par_sort_unstable_by_key(|a| a.key);
2656
2657 for entry in storage {
2658 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2659 if let Some(db_entry) =
2660 storages_cursor.seek_by_key_subkey(address, entry.key)? &&
2661 db_entry.key == entry.key
2662 {
2663 storages_cursor.delete_current()?;
2664 }
2665
2666 if !entry.value.is_zero() {
2667 storages_cursor.upsert(address, &entry)?;
2668 }
2669 }
2670 }
2671 }
2672
2673 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
2675 self.write_bytecodes(
2676 changes.contracts.into_iter().map(|(hash, bytecode)| (hash, Bytecode(bytecode))),
2677 )?;
2678
2679 Ok(())
2680 }
2681
2682 #[instrument(level = "debug", target = "providers::db", skip_all)]
2683 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2684 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2686 for (hashed_address, account) in hashed_state.accounts() {
2687 if let Some(account) = account {
2688 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2689 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2690 hashed_accounts_cursor.delete_current()?;
2691 }
2692 }
2693
2694 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2696 let mut hashed_storage_cursor =
2697 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2698 for (hashed_address, storage) in sorted_storages {
2699 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2700 hashed_storage_cursor.delete_current_duplicates()?;
2701 }
2702
2703 for (hashed_slot, value) in storage.storage_slots_ref() {
2704 let entry = StorageEntry { key: *hashed_slot, value: *value };
2705
2706 if let Some(db_entry) =
2707 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
2708 db_entry.key == entry.key
2709 {
2710 hashed_storage_cursor.delete_current()?;
2711 }
2712
2713 if !entry.value.is_zero() {
2714 hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2715 }
2716 }
2717 }
2718
2719 Ok(())
2720 }
2721
2722 fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
2744 let range = block + 1..=self.last_block_number()?;
2745
2746 if range.is_empty() {
2747 return Ok(());
2748 }
2749
2750 let block_bodies = self.block_body_indices_range(range.clone())?;
2752
2753 let from_transaction_num =
2755 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2756
2757 let storage_range = BlockNumberAddress::range(range.clone());
2758 let storage_changeset = if self.cached_storage_settings().storage_v2 {
2759 let changesets = self.storage_changesets_range(range.clone())?;
2760 let mut changeset_writer =
2761 self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2762 changeset_writer.prune_storage_changesets(block)?;
2763 changesets
2764 } else {
2765 self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2766 };
2767 let account_changeset = if self.cached_storage_settings().storage_v2 {
2768 let changesets = self.account_changesets_range(range)?;
2769 let mut changeset_writer =
2770 self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2771 changeset_writer.prune_account_changesets(block)?;
2772 changesets
2773 } else {
2774 self.take::<tables::AccountChangeSets>(range)?
2775 };
2776
2777 if self.cached_storage_settings().use_hashed_state() {
2778 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2779 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2780
2781 let (state, _) = self.populate_bundle_state_hashed(
2782 account_changeset,
2783 storage_changeset,
2784 &mut hashed_accounts_cursor,
2785 &mut hashed_storage_cursor,
2786 )?;
2787
2788 for (address, (old_account, new_account, storage)) in &state {
2789 if old_account != new_account {
2790 let hashed_address = keccak256(address);
2791 let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2792 if let Some(account) = old_account {
2793 hashed_accounts_cursor.upsert(hashed_address, account)?;
2794 } else if existing_entry.is_some() {
2795 hashed_accounts_cursor.delete_current()?;
2796 }
2797 }
2798
2799 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2800 let hashed_address = keccak256(address);
2801 let hashed_storage_key = keccak256(storage_key);
2802 let storage_entry =
2803 StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2804 if hashed_storage_cursor
2805 .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2806 .is_some_and(|s| s.key == hashed_storage_key)
2807 {
2808 hashed_storage_cursor.delete_current()?
2809 }
2810
2811 if !old_storage_value.is_zero() {
2812 hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2813 }
2814 }
2815 }
2816 } else {
2817 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2822 let mut plain_storage_cursor =
2823 self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2824
2825 let (state, _) = self.populate_bundle_state_plain(
2826 account_changeset,
2827 storage_changeset,
2828 &mut plain_accounts_cursor,
2829 &mut plain_storage_cursor,
2830 )?;
2831
2832 for (address, (old_account, new_account, storage)) in &state {
2833 if old_account != new_account {
2834 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2835 if let Some(account) = old_account {
2836 plain_accounts_cursor.upsert(*address, account)?;
2837 } else if existing_entry.is_some() {
2838 plain_accounts_cursor.delete_current()?;
2839 }
2840 }
2841
2842 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2843 let storage_entry =
2844 StorageEntry { key: *storage_key, value: *old_storage_value };
2845 if plain_storage_cursor
2846 .seek_by_key_subkey(*address, *storage_key)?
2847 .is_some_and(|s| s.key == *storage_key)
2848 {
2849 plain_storage_cursor.delete_current()?
2850 }
2851
2852 if !old_storage_value.is_zero() {
2853 plain_storage_cursor.upsert(*address, &storage_entry)?;
2854 }
2855 }
2856 }
2857 }
2858
2859 self.remove_receipts_from(from_transaction_num, block)?;
2860
2861 Ok(())
2862 }
2863
2864 fn take_state_above(
2886 &self,
2887 block: BlockNumber,
2888 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2889 let range = block + 1..=self.last_block_number()?;
2890
2891 if range.is_empty() {
2892 return Ok(ExecutionOutcome::default())
2893 }
2894 let start_block_number = *range.start();
2895
2896 let block_bodies = self.block_body_indices_range(range.clone())?;
2898
2899 let from_transaction_num =
2901 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2902 let to_transaction_num =
2903 block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2904
2905 let storage_range = BlockNumberAddress::range(range.clone());
2906 let storage_changeset = if let Some(highest_block) = self
2907 .static_file_provider
2908 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2909 self.cached_storage_settings().storage_v2
2910 {
2911 let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
2912 let mut changeset_writer =
2913 self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2914 changeset_writer.prune_storage_changesets(block)?;
2915 changesets
2916 } else {
2917 self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2918 };
2919
2920 let highest_changeset_block = self
2922 .static_file_provider
2923 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2924 let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2925 self.cached_storage_settings().storage_v2
2926 {
2927 let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2929 let mut changeset_writer =
2930 self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2931 changeset_writer.prune_account_changesets(block)?;
2932
2933 changesets
2934 } else {
2935 self.take::<tables::AccountChangeSets>(range)?
2938 };
2939
2940 let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
2941 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2942 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2943
2944 let (state, reverts) = self.populate_bundle_state_hashed(
2945 account_changeset,
2946 storage_changeset,
2947 &mut hashed_accounts_cursor,
2948 &mut hashed_storage_cursor,
2949 )?;
2950
2951 for (address, (old_account, new_account, storage)) in &state {
2952 if old_account != new_account {
2953 let hashed_address = keccak256(address);
2954 let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2955 if let Some(account) = old_account {
2956 hashed_accounts_cursor.upsert(hashed_address, account)?;
2957 } else if existing_entry.is_some() {
2958 hashed_accounts_cursor.delete_current()?;
2959 }
2960 }
2961
2962 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2963 let hashed_address = keccak256(address);
2964 let hashed_storage_key = keccak256(storage_key);
2965 let storage_entry =
2966 StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2967 if hashed_storage_cursor
2968 .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2969 .is_some_and(|s| s.key == hashed_storage_key)
2970 {
2971 hashed_storage_cursor.delete_current()?
2972 }
2973
2974 if !old_storage_value.is_zero() {
2975 hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2976 }
2977 }
2978 }
2979
2980 (state, reverts)
2981 } else {
2982 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2987 let mut plain_storage_cursor =
2988 self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2989
2990 let (state, reverts) = self.populate_bundle_state_plain(
2991 account_changeset,
2992 storage_changeset,
2993 &mut plain_accounts_cursor,
2994 &mut plain_storage_cursor,
2995 )?;
2996
2997 for (address, (old_account, new_account, storage)) in &state {
2998 if old_account != new_account {
2999 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
3000 if let Some(account) = old_account {
3001 plain_accounts_cursor.upsert(*address, account)?;
3002 } else if existing_entry.is_some() {
3003 plain_accounts_cursor.delete_current()?;
3004 }
3005 }
3006
3007 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
3008 let storage_entry =
3009 StorageEntry { key: *storage_key, value: *old_storage_value };
3010 if plain_storage_cursor
3011 .seek_by_key_subkey(*address, *storage_key)?
3012 .is_some_and(|s| s.key == *storage_key)
3013 {
3014 plain_storage_cursor.delete_current()?
3015 }
3016
3017 if !old_storage_value.is_zero() {
3018 plain_storage_cursor.upsert(*address, &storage_entry)?;
3019 }
3020 }
3021 }
3022
3023 (state, reverts)
3024 };
3025
3026 let mut receipts_iter = self
3028 .static_file_provider
3029 .get_range_with_static_file_or_database(
3030 StaticFileSegment::Receipts,
3031 from_transaction_num..to_transaction_num + 1,
3032 |static_file, range, _| {
3033 static_file
3034 .receipts_by_tx_range(range.clone())
3035 .map(|r| range.into_iter().zip(r).collect())
3036 },
3037 |range, _| {
3038 self.tx
3039 .cursor_read::<tables::Receipts<Self::Receipt>>()?
3040 .walk_range(range)?
3041 .map(|r| r.map_err(Into::into))
3042 .collect()
3043 },
3044 |_| true,
3045 )?
3046 .into_iter()
3047 .peekable();
3048
3049 let mut receipts = Vec::with_capacity(block_bodies.len());
3050 for block_body in block_bodies {
3052 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
3053 for num in block_body.tx_num_range() {
3054 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
3055 block_receipts.push(receipts_iter.next().unwrap().1);
3056 }
3057 }
3058 receipts.push(block_receipts);
3059 }
3060
3061 self.remove_receipts_from(from_transaction_num, block)?;
3062
3063 Ok(ExecutionOutcome::new_init(
3064 state,
3065 reverts,
3066 Vec::new(),
3067 receipts,
3068 start_block_number,
3069 Vec::new(),
3070 ))
3071 }
3072}
3073
3074impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
3075 fn write_account_trie_updates<A: TrieTableAdapter>(
3076 tx: &TX,
3077 trie_updates: &TrieUpdatesSorted,
3078 num_entries: &mut usize,
3079 ) -> ProviderResult<()>
3080 where
3081 TX: DbTxMut,
3082 {
3083 let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
3084 for (key, updated_node) in trie_updates.account_nodes_ref() {
3086 let nibbles = A::AccountKey::from(*key);
3087 match updated_node {
3088 Some(node) => {
3089 if !key.is_empty() {
3090 *num_entries += 1;
3091 account_trie_cursor.upsert(nibbles, node)?;
3092 }
3093 }
3094 None => {
3095 *num_entries += 1;
3096 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
3097 account_trie_cursor.delete_current()?;
3098 }
3099 }
3100 }
3101 }
3102 Ok(())
3103 }
3104
3105 fn write_storage_tries<A: TrieTableAdapter>(
3106 tx: &TX,
3107 storage_tries: Vec<(&B256, &StorageTrieUpdatesSorted)>,
3108 num_entries: &mut usize,
3109 ) -> ProviderResult<()>
3110 where
3111 TX: DbTxMut,
3112 {
3113 let mut cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
3114 for (hashed_address, storage_trie_updates) in storage_tries {
3115 let mut db_storage_trie_cursor: DatabaseStorageTrieCursor<_, A> =
3116 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
3117 *num_entries +=
3118 db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
3119 cursor = db_storage_trie_cursor.cursor;
3120 }
3121 Ok(())
3122 }
3123}
3124
3125impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
3126 #[instrument(level = "debug", target = "providers::db", skip_all)]
3130 fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
3131 if trie_updates.is_empty() {
3132 return Ok(0)
3133 }
3134
3135 let mut num_entries = 0;
3137
3138 reth_trie_db::with_adapter!(self, |A| {
3139 Self::write_account_trie_updates::<A>(self.tx_ref(), trie_updates, &mut num_entries)?;
3140 });
3141
3142 num_entries +=
3143 self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
3144
3145 Ok(num_entries)
3146 }
3147}
3148
3149impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3150 fn write_storage_trie_updates_sorted<'a>(
3156 &self,
3157 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3158 ) -> ProviderResult<usize> {
3159 let mut num_entries = 0;
3160 let mut storage_tries = storage_tries.collect::<Vec<_>>();
3161 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3162 reth_trie_db::with_adapter!(self, |A| {
3163 Self::write_storage_tries::<A>(self.tx_ref(), storage_tries, &mut num_entries)?;
3164 });
3165 Ok(num_entries)
3166 }
3167}
3168
3169impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
3170 fn unwind_account_hashing<'a>(
3171 &self,
3172 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3173 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3174 let hashed_accounts = changesets
3178 .into_iter()
3179 .map(|(_, e)| (keccak256(e.address), e.info))
3180 .collect::<Vec<_>>()
3181 .into_iter()
3182 .rev()
3183 .collect::<BTreeMap<_, _>>();
3184
3185 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3187 for (hashed_address, account) in &hashed_accounts {
3188 if let Some(account) = account {
3189 hashed_accounts_cursor.upsert(*hashed_address, account)?;
3190 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3191 hashed_accounts_cursor.delete_current()?;
3192 }
3193 }
3194
3195 Ok(hashed_accounts)
3196 }
3197
3198 fn unwind_account_hashing_range(
3199 &self,
3200 range: impl RangeBounds<BlockNumber>,
3201 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3202 let changesets = self.account_changesets_range(range)?;
3203 self.unwind_account_hashing(changesets.iter())
3204 }
3205
3206 fn insert_account_for_hashing(
3207 &self,
3208 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
3209 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3210 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3211 let hashed_accounts =
3212 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
3213 for (hashed_address, account) in &hashed_accounts {
3214 if let Some(account) = account {
3215 hashed_accounts_cursor.upsert(*hashed_address, account)?;
3216 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3217 hashed_accounts_cursor.delete_current()?;
3218 }
3219 }
3220 Ok(hashed_accounts)
3221 }
3222
3223 fn unwind_storage_hashing(
3224 &self,
3225 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3226 ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3227 let mut hashed_storages = changesets
3229 .into_iter()
3230 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
3231 let hashed_key = keccak256(storage_entry.key);
3232 (keccak256(address), hashed_key, storage_entry.value)
3233 })
3234 .collect::<Vec<_>>();
3235 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
3236
3237 let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
3239 B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
3240 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3241 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
3242 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
3243
3244 if hashed_storage
3245 .seek_by_key_subkey(hashed_address, key)?
3246 .is_some_and(|entry| entry.key == key)
3247 {
3248 hashed_storage.delete_current()?;
3249 }
3250
3251 if !value.is_zero() {
3252 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
3253 }
3254 }
3255 Ok(hashed_storage_keys)
3256 }
3257
3258 fn unwind_storage_hashing_range(
3259 &self,
3260 range: impl RangeBounds<BlockNumber>,
3261 ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3262 let changesets = self.storage_changesets_range(range)?;
3263 self.unwind_storage_hashing(changesets.into_iter())
3264 }
3265
3266 fn insert_storage_for_hashing(
3267 &self,
3268 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
3269 ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3270 let hashed_storages =
3272 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
3273 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
3274 map.insert(keccak256(entry.key), entry.value);
3275 map
3276 });
3277 map.insert(keccak256(address), storage);
3278 map
3279 });
3280
3281 let hashed_storage_keys = hashed_storages
3282 .iter()
3283 .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
3284 .collect();
3285
3286 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3287 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
3290 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
3291 if hashed_storage_cursor
3292 .seek_by_key_subkey(hashed_address, key)?
3293 .is_some_and(|entry| entry.key == key)
3294 {
3295 hashed_storage_cursor.delete_current()?;
3296 }
3297
3298 if !value.is_zero() {
3299 hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
3300 }
3301 Ok(())
3302 })
3303 })?;
3304
3305 Ok(hashed_storage_keys)
3306 }
3307}
3308
3309impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
3310 fn unwind_account_history_indices<'a>(
3311 &self,
3312 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3313 ) -> ProviderResult<usize> {
3314 let mut last_indices = changesets
3315 .into_iter()
3316 .map(|(index, account)| (account.address, *index))
3317 .collect::<Vec<_>>();
3318 last_indices.sort_unstable_by_key(|(a, _)| *a);
3319
3320 if self.cached_storage_settings().storage_v2 {
3321 let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
3322 self.pending_rocksdb_batches.lock().push(batch);
3323 } else {
3324 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
3326 for &(address, rem_index) in &last_indices {
3327 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
3328 &mut cursor,
3329 ShardedKey::last(address),
3330 rem_index,
3331 |sharded_key| sharded_key.key == address,
3332 )?;
3333
3334 if !partial_shard.is_empty() {
3337 cursor.insert(
3338 ShardedKey::last(address),
3339 &BlockNumberList::new_pre_sorted(partial_shard),
3340 )?;
3341 }
3342 }
3343 }
3344
3345 let changesets = last_indices.len();
3346 Ok(changesets)
3347 }
3348
3349 fn unwind_account_history_indices_range(
3350 &self,
3351 range: impl RangeBounds<BlockNumber>,
3352 ) -> ProviderResult<usize> {
3353 let changesets = self.account_changesets_range(range)?;
3354 self.unwind_account_history_indices(changesets.iter())
3355 }
3356
3357 fn insert_account_history_index(
3358 &self,
3359 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3360 ) -> ProviderResult<()> {
3361 self.append_history_index::<_, tables::AccountsHistory>(
3362 account_transitions,
3363 ShardedKey::new,
3364 )
3365 }
3366
3367 fn unwind_storage_history_indices(
3368 &self,
3369 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3370 ) -> ProviderResult<usize> {
3371 let mut storage_changesets = changesets
3372 .into_iter()
3373 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
3374 .collect::<Vec<_>>();
3375 storage_changesets.sort_unstable_by_key(|(address, key, _)| (*address, *key));
3376
3377 if self.cached_storage_settings().storage_v2 {
3378 let batch =
3379 self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
3380 self.pending_rocksdb_batches.lock().push(batch);
3381 } else {
3382 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
3384 for &(address, storage_key, rem_index) in &storage_changesets {
3385 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
3386 &mut cursor,
3387 StorageShardedKey::last(address, storage_key),
3388 rem_index,
3389 |storage_sharded_key| {
3390 storage_sharded_key.address == address &&
3391 storage_sharded_key.sharded_key.key == storage_key
3392 },
3393 )?;
3394
3395 if !partial_shard.is_empty() {
3398 cursor.insert(
3399 StorageShardedKey::last(address, storage_key),
3400 &BlockNumberList::new_pre_sorted(partial_shard),
3401 )?;
3402 }
3403 }
3404 }
3405
3406 let changesets = storage_changesets.len();
3407 Ok(changesets)
3408 }
3409
3410 fn unwind_storage_history_indices_range(
3411 &self,
3412 range: impl RangeBounds<BlockNumber>,
3413 ) -> ProviderResult<usize> {
3414 let changesets = self.storage_changesets_range(range)?;
3415 self.unwind_storage_history_indices(changesets.into_iter())
3416 }
3417
3418 fn insert_storage_history_index(
3419 &self,
3420 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3421 ) -> ProviderResult<()> {
3422 self.append_history_index::<_, tables::StoragesHistory>(
3423 storage_transitions,
3424 |(address, storage_key), highest_block_number| {
3425 StorageShardedKey::new(address, storage_key, highest_block_number)
3426 },
3427 )
3428 }
3429
3430 #[instrument(level = "debug", target = "providers::db", skip_all)]
3431 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3432 let storage_settings = self.cached_storage_settings();
3433 if !storage_settings.storage_v2 {
3434 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3435 self.insert_account_history_index(indices)?;
3436 }
3437
3438 if !storage_settings.storage_v2 {
3439 let indices = self.changed_storages_and_blocks_with_range(range)?;
3440 self.insert_storage_history_index(indices)?;
3441 }
3442
3443 Ok(())
3444 }
3445}
3446
3447impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
3448 for DatabaseProvider<TX, N>
3449{
3450 fn take_block_and_execution_above(
3451 &self,
3452 block: BlockNumber,
3453 ) -> ProviderResult<Chain<Self::Primitives>> {
3454 let range = block + 1..=self.last_block_number()?;
3455
3456 self.unwind_trie_state_from(block + 1)?;
3457
3458 let execution_state = self.take_state_above(block)?;
3460
3461 let blocks = self.recovered_block_range(range)?;
3462
3463 self.remove_blocks_above(block)?;
3466
3467 self.update_pipeline_stages(block, true)?;
3469
3470 Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
3471 }
3472
3473 fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
3474 self.unwind_trie_state_from(block + 1)?;
3475
3476 self.remove_state_above(block)?;
3478
3479 self.remove_blocks_above(block)?;
3482
3483 self.update_pipeline_stages(block, true)?;
3485
3486 Ok(())
3487 }
3488}
3489
3490impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
3491 for DatabaseProvider<TX, N>
3492{
3493 type Block = BlockTy<N>;
3494 type Receipt = ReceiptTy<N>;
3495
3496 fn insert_block(
3501 &self,
3502 block: &RecoveredBlock<Self::Block>,
3503 ) -> ProviderResult<StoredBlockBodyIndices> {
3504 let block_number = block.number();
3505
3506 let executed_block = ExecutedBlock::new(
3508 Arc::new(block.clone()),
3509 Arc::new(BlockExecutionOutput {
3510 result: BlockExecutionResult {
3511 receipts: Default::default(),
3512 requests: Default::default(),
3513 gas_used: 0,
3514 blob_gas_used: 0,
3515 },
3516 state: Default::default(),
3517 }),
3518 ComputedTrieData::default(),
3519 );
3520
3521 self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
3523
3524 self.block_body_indices(block_number)?
3526 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
3527 }
3528
3529 fn append_block_bodies(
3530 &self,
3531 bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
3532 ) -> ProviderResult<()> {
3533 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
3534
3535 let mut tx_writer =
3537 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
3538
3539 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
3540 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
3541
3542 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
3544
3545 for (block_number, body) in &bodies {
3546 tx_writer.increment_block(*block_number)?;
3548
3549 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
3550 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
3551
3552 let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3553
3554 block_indices_cursor.append(*block_number, &block_indices)?;
3556
3557 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
3558
3559 let Some(body) = body else { continue };
3560
3561 if !body.transactions().is_empty() {
3563 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
3564 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
3565 }
3566
3567 for transaction in body.transactions() {
3569 tx_writer.append_transaction(next_tx_num, transaction)?;
3570
3571 next_tx_num += 1;
3573 }
3574 }
3575
3576 self.storage.writer().write_block_bodies(self, bodies)?;
3577
3578 Ok(())
3579 }
3580
3581 fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
3582 let last_block_number = self.last_block_number()?;
3583 for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
3585 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
3586 }
3587
3588 let highest_static_file_block = self
3590 .static_file_provider()
3591 .get_highest_static_file_block(StaticFileSegment::Headers)
3592 .expect("todo: error handling, headers should exist");
3593
3594 debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
3600 self.static_file_provider()
3601 .get_writer(block, StaticFileSegment::Headers)?
3602 .prune_headers(highest_static_file_block.saturating_sub(block))?;
3603
3604 let unwind_tx_from = self
3606 .block_body_indices(block)?
3607 .map(|b| b.next_tx_num())
3608 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3609
3610 let unwind_tx_to = self
3612 .tx
3613 .cursor_read::<tables::BlockBodyIndices>()?
3614 .last()?
3615 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3617 .1
3618 .last_tx_num();
3619
3620 if unwind_tx_from <= unwind_tx_to {
3621 let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
3622 self.with_rocksdb_batch(|batch| {
3623 let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
3624 for (hash, _) in hashes {
3625 writer.delete_transaction_hash_number(hash)?;
3626 }
3627 Ok(((), writer.into_raw_rocksdb_batch()))
3628 })?;
3629 }
3630
3631 if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) {
3634 EitherWriter::new_senders(self, last_block_number)?
3635 .prune_senders(unwind_tx_from, block)?;
3636 }
3637
3638 self.remove_bodies_above(block)?;
3639
3640 Ok(())
3641 }
3642
3643 fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3644 self.storage.writer().remove_block_bodies_above(self, block)?;
3645
3646 let unwind_tx_from = self
3648 .block_body_indices(block)?
3649 .map(|b| b.next_tx_num())
3650 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3651
3652 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3653 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3654
3655 let static_file_tx_num =
3656 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3657
3658 let to_delete = static_file_tx_num
3659 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3660 .unwrap_or_default();
3661
3662 self.static_file_provider
3663 .latest_writer(StaticFileSegment::Transactions)?
3664 .prune_transactions(to_delete, block)?;
3665
3666 Ok(())
3667 }
3668
3669 fn append_blocks_with_state(
3678 &self,
3679 blocks: Vec<RecoveredBlock<Self::Block>>,
3680 execution_outcome: &ExecutionOutcome<Self::Receipt>,
3681 hashed_state: HashedPostStateSorted,
3682 ) -> ProviderResult<()> {
3683 if blocks.is_empty() {
3684 debug!(target: "providers::db", "Attempted to append empty block range");
3685 return Ok(())
3686 }
3687
3688 let first_number = blocks[0].number();
3691
3692 let last_block_number = blocks[blocks.len() - 1].number();
3695
3696 let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3697
3698 let (account_transitions, storage_transitions) = {
3703 let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
3704 let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
3705 for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
3706 let block_number = first_number + block_idx as u64;
3707 for (address, account_revert) in block_reverts {
3708 account_transitions.entry(*address).or_default().push(block_number);
3709 for storage_key in account_revert.storage.keys() {
3710 let key = B256::from(storage_key.to_be_bytes());
3711 storage_transitions.entry((*address, key)).or_default().push(block_number);
3712 }
3713 }
3714 }
3715 (account_transitions, storage_transitions)
3716 };
3717
3718 for block in blocks {
3720 self.insert_block(&block)?;
3721 durations_recorder.record_relative(metrics::Action::InsertBlock);
3722 }
3723
3724 self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
3725 durations_recorder.record_relative(metrics::Action::InsertState);
3726
3727 self.write_hashed_state(&hashed_state)?;
3729 durations_recorder.record_relative(metrics::Action::InsertHashes);
3730
3731 let storage_settings = self.cached_storage_settings();
3736 if storage_settings.storage_v2 {
3737 self.with_rocksdb_batch(|mut batch| {
3738 for (address, blocks) in account_transitions {
3739 batch.append_account_history_shard(address, blocks)?;
3740 }
3741 Ok(((), Some(batch.into_inner())))
3742 })?;
3743 } else {
3744 self.insert_account_history_index(account_transitions)?;
3745 }
3746 if storage_settings.storage_v2 {
3747 self.with_rocksdb_batch(|mut batch| {
3748 for ((address, key), blocks) in storage_transitions {
3749 batch.append_storage_history_shard(address, key, blocks)?;
3750 }
3751 Ok(((), Some(batch.into_inner())))
3752 })?;
3753 } else {
3754 self.insert_storage_history_index(storage_transitions)?;
3755 }
3756 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3757
3758 self.update_pipeline_stages(last_block_number, false)?;
3760 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3761
3762 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3763
3764 Ok(())
3765 }
3766}
3767
3768impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3769 fn get_prune_checkpoint(
3770 &self,
3771 segment: PruneSegment,
3772 ) -> ProviderResult<Option<PruneCheckpoint>> {
3773 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3774 }
3775
3776 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3777 Ok(PruneSegment::variants()
3778 .filter_map(|segment| {
3779 self.tx
3780 .get::<tables::PruneCheckpoints>(segment)
3781 .transpose()
3782 .map(|chk| chk.map(|chk| (segment, chk)))
3783 })
3784 .collect::<Result<_, _>>()?)
3785 }
3786}
3787
3788impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3789 fn save_prune_checkpoint(
3790 &self,
3791 segment: PruneSegment,
3792 checkpoint: PruneCheckpoint,
3793 ) -> ProviderResult<()> {
3794 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3795 }
3796}
3797
3798impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3799 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3800 let db_entries = self.tx.entries::<T>()?;
3801 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3802 Ok(entries) => entries,
3803 Err(ProviderError::UnsupportedProvider) => 0,
3804 Err(err) => return Err(err),
3805 };
3806
3807 Ok(db_entries + static_file_entries)
3808 }
3809}
3810
3811impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3812 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3813 let mut finalized_blocks = self
3814 .tx
3815 .cursor_read::<tables::ChainState>()?
3816 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3817 .take(1)
3818 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3819
3820 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3821 Ok(last_finalized_block_number)
3822 }
3823
3824 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3825 let mut finalized_blocks = self
3826 .tx
3827 .cursor_read::<tables::ChainState>()?
3828 .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3829 .take(1)
3830 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3831
3832 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3833 Ok(last_finalized_block_number)
3834 }
3835}
3836
3837impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3838 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3839 Ok(self
3840 .tx
3841 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3842 }
3843
3844 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3845 Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3846 }
3847}
3848
3849impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3850 type Tx = TX;
3851
3852 fn tx_ref(&self) -> &Self::Tx {
3853 &self.tx
3854 }
3855
3856 fn tx_mut(&mut self) -> &mut Self::Tx {
3857 &mut self.tx
3858 }
3859
3860 fn into_tx(self) -> Self::Tx {
3861 self.tx
3862 }
3863
3864 fn prune_modes_ref(&self) -> &PruneModes {
3865 self.prune_modes_ref()
3866 }
3867
3868 #[instrument(
3870 name = "DatabaseProvider::commit",
3871 level = "debug",
3872 target = "providers::db",
3873 skip_all
3874 )]
3875 fn commit(self) -> ProviderResult<()> {
3876 if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
3877 self.commit_unwind()?;
3878 } else {
3879 let mut timings = metrics::CommitTimings::default();
3881
3882 let start = Instant::now();
3883 self.static_file_provider.finalize()?;
3884 timings.sf = start.elapsed();
3885
3886 let start = Instant::now();
3887 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3888 for batch in batches {
3889 self.rocksdb_provider.commit_batch(batch)?;
3890 }
3891 timings.rocksdb = start.elapsed();
3892
3893 let start = Instant::now();
3894 self.tx.commit()?;
3895 timings.mdbx = start.elapsed();
3896
3897 self.metrics.record_commit(&timings);
3898 }
3899
3900 Ok(())
3901 }
3902}
3903
3904impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3905 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3906 self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3907 }
3908}
3909
3910impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3911 fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3912 self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3913 }
3914}
3915
3916impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3917 fn cached_storage_settings(&self) -> StorageSettings {
3918 *self.storage_settings.read()
3919 }
3920
3921 fn set_storage_settings_cache(&self, settings: StorageSettings) {
3922 *self.storage_settings.write() = settings;
3923 }
3924}
3925
3926impl<TX: Send, N: NodeTypes> StoragePath for DatabaseProvider<TX, N> {
3927 fn storage_path(&self) -> PathBuf {
3928 self.db_path.clone()
3929 }
3930}
3931
3932#[cfg(test)]
3933mod tests {
3934 use super::*;
3935 use crate::{
3936 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3937 BlockWriter,
3938 };
3939 use alloy_consensus::Header;
3940 use alloy_primitives::{
3941 map::{AddressMap, B256Map},
3942 U256,
3943 };
3944 use reth_chain_state::ExecutedBlock;
3945 use reth_db_api::models::StorageSettings;
3946 use reth_ethereum_primitives::Receipt;
3947 use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3948 use reth_primitives_traits::SealedBlock;
3949 use reth_storage_api::MetadataWriter;
3950 use reth_testing_utils::generators::{self, random_block, BlockParams};
3951 use reth_trie::{
3952 HashedPostState, KeccakKeyHasher, Nibbles, StoredNibbles, StoredNibblesSubKey,
3953 };
3954 use revm_database::BundleState;
3955 use revm_state::AccountInfo;
3956 use std::{sync::mpsc, time::Duration};
3957
3958 #[test]
3959 fn test_receipts_by_block_range_empty_range() {
3960 let factory = create_test_provider_factory();
3961 let provider = factory.provider().unwrap();
3962
3963 let start = 10u64;
3965 let end = 9u64;
3966 let result = provider.receipts_by_block_range(start..=end).unwrap();
3967 assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3968 }
3969
3970 #[test]
3971 fn unwind_commit_waits_for_pre_commit_readers() {
3972 let factory = create_test_provider_factory();
3973
3974 let reader = factory.provider().unwrap();
3975 let provider_rw = factory.unwind_provider_rw().unwrap();
3976 provider_rw.write_metadata("unwind-wait-test", vec![1]).unwrap();
3977 let (done_tx, done_rx) = mpsc::channel();
3978
3979 let handle = std::thread::spawn(move || {
3980 let result = provider_rw.commit();
3981 done_tx.send(result).unwrap();
3982 });
3983
3984 assert!(
3985 done_rx.recv_timeout(Duration::from_millis(50)).is_err(),
3986 "unwind commit should wait while an older read transaction is still open"
3987 );
3988
3989 drop(reader);
3990
3991 done_rx.recv_timeout(Duration::from_secs(1)).unwrap().unwrap();
3992 handle.join().unwrap();
3993 }
3994
3995 #[test]
3996 fn test_receipts_by_block_range_nonexistent_blocks() {
3997 let factory = create_test_provider_factory();
3998 let provider = factory.provider().unwrap();
3999
4000 let result = provider.receipts_by_block_range(10..=12).unwrap();
4002 assert_eq!(result, vec![vec![], vec![], vec![]]);
4003 }
4004
4005 #[test]
4006 fn test_receipts_by_block_range_single_block() {
4007 let factory = create_test_provider_factory();
4008 let data = BlockchainTestData::default();
4009
4010 let provider_rw = factory.provider_rw().unwrap();
4011 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4012 provider_rw
4013 .write_state(
4014 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4015 crate::OriginalValuesKnown::No,
4016 StateWriteConfig::default(),
4017 )
4018 .unwrap();
4019 provider_rw.insert_block(&data.blocks[0].0).unwrap();
4020 provider_rw
4021 .write_state(
4022 &data.blocks[0].1,
4023 crate::OriginalValuesKnown::No,
4024 StateWriteConfig::default(),
4025 )
4026 .unwrap();
4027 provider_rw.commit().unwrap();
4028
4029 let provider = factory.provider().unwrap();
4030 let result = provider.receipts_by_block_range(1..=1).unwrap();
4031
4032 assert_eq!(result.len(), 1);
4034 assert_eq!(result[0].len(), 1);
4035 assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
4036 }
4037
4038 #[test]
4039 fn test_receipts_by_block_range_multiple_blocks() {
4040 let factory = create_test_provider_factory();
4041 let data = BlockchainTestData::default();
4042
4043 let provider_rw = factory.provider_rw().unwrap();
4044 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4045 provider_rw
4046 .write_state(
4047 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4048 crate::OriginalValuesKnown::No,
4049 StateWriteConfig::default(),
4050 )
4051 .unwrap();
4052 for i in 0..3 {
4053 provider_rw.insert_block(&data.blocks[i].0).unwrap();
4054 provider_rw
4055 .write_state(
4056 &data.blocks[i].1,
4057 crate::OriginalValuesKnown::No,
4058 StateWriteConfig::default(),
4059 )
4060 .unwrap();
4061 }
4062 provider_rw.commit().unwrap();
4063
4064 let provider = factory.provider().unwrap();
4065 let result = provider.receipts_by_block_range(1..=3).unwrap();
4066
4067 assert_eq!(result.len(), 3);
4069 for (i, block_receipts) in result.iter().enumerate() {
4070 assert_eq!(block_receipts.len(), 1);
4071 assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
4072 }
4073 }
4074
4075 #[test]
4076 fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
4077 let factory = create_test_provider_factory();
4078 let data = BlockchainTestData::default();
4079
4080 let provider_rw = factory.provider_rw().unwrap();
4081 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4082 provider_rw
4083 .write_state(
4084 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4085 crate::OriginalValuesKnown::No,
4086 StateWriteConfig::default(),
4087 )
4088 .unwrap();
4089
4090 for i in 0..3 {
4092 provider_rw.insert_block(&data.blocks[i].0).unwrap();
4093 provider_rw
4094 .write_state(
4095 &data.blocks[i].1,
4096 crate::OriginalValuesKnown::No,
4097 StateWriteConfig::default(),
4098 )
4099 .unwrap();
4100 }
4101 provider_rw.commit().unwrap();
4102
4103 let provider = factory.provider().unwrap();
4104 let result = provider.receipts_by_block_range(1..=3).unwrap();
4105
4106 assert_eq!(result.len(), 3);
4108 for block_receipts in &result {
4109 assert_eq!(block_receipts.len(), 1);
4110 }
4111 }
4112
4113 #[test]
4114 fn test_receipts_by_block_range_partial_range() {
4115 let factory = create_test_provider_factory();
4116 let data = BlockchainTestData::default();
4117
4118 let provider_rw = factory.provider_rw().unwrap();
4119 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4120 provider_rw
4121 .write_state(
4122 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4123 crate::OriginalValuesKnown::No,
4124 StateWriteConfig::default(),
4125 )
4126 .unwrap();
4127 for i in 0..3 {
4128 provider_rw.insert_block(&data.blocks[i].0).unwrap();
4129 provider_rw
4130 .write_state(
4131 &data.blocks[i].1,
4132 crate::OriginalValuesKnown::No,
4133 StateWriteConfig::default(),
4134 )
4135 .unwrap();
4136 }
4137 provider_rw.commit().unwrap();
4138
4139 let provider = factory.provider().unwrap();
4140
4141 let result = provider.receipts_by_block_range(2..=5).unwrap();
4143 assert_eq!(result.len(), 4);
4144
4145 assert_eq!(result[0].len(), 1); assert_eq!(result[1].len(), 1); assert_eq!(result[2].len(), 0); assert_eq!(result[3].len(), 0); assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
4152 assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
4153 }
4154
4155 #[test]
4156 fn test_receipts_by_block_range_all_empty_blocks() {
4157 let factory = create_test_provider_factory();
4158 let mut rng = generators::rng();
4159
4160 let mut blocks = Vec::new();
4162 for i in 0..3 {
4163 let block =
4164 random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
4165 blocks.push(block);
4166 }
4167
4168 let provider_rw = factory.provider_rw().unwrap();
4169 for block in blocks {
4170 provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4171 }
4172 provider_rw.commit().unwrap();
4173
4174 let provider = factory.provider().unwrap();
4175 let result = provider.receipts_by_block_range(1..=3).unwrap();
4176
4177 assert_eq!(result.len(), 3);
4178 for block_receipts in result {
4179 assert_eq!(block_receipts.len(), 0);
4180 }
4181 }
4182
4183 #[test]
4184 fn test_receipts_by_block_range_consistency_with_individual_calls() {
4185 let factory = create_test_provider_factory();
4186 let data = BlockchainTestData::default();
4187
4188 let provider_rw = factory.provider_rw().unwrap();
4189 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4190 provider_rw
4191 .write_state(
4192 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4193 crate::OriginalValuesKnown::No,
4194 StateWriteConfig::default(),
4195 )
4196 .unwrap();
4197 for i in 0..3 {
4198 provider_rw.insert_block(&data.blocks[i].0).unwrap();
4199 provider_rw
4200 .write_state(
4201 &data.blocks[i].1,
4202 crate::OriginalValuesKnown::No,
4203 StateWriteConfig::default(),
4204 )
4205 .unwrap();
4206 }
4207 provider_rw.commit().unwrap();
4208
4209 let provider = factory.provider().unwrap();
4210
4211 let range_result = provider.receipts_by_block_range(1..=3).unwrap();
4213
4214 let mut individual_results = Vec::new();
4216 for block_num in 1..=3 {
4217 let receipts =
4218 provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
4219 individual_results.push(receipts);
4220 }
4221
4222 assert_eq!(range_result, individual_results);
4223 }
4224
4225 #[test]
4226 fn test_write_trie_updates_sorted() {
4227 use reth_trie::{
4228 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
4229 BranchNodeCompact, StorageTrieEntry,
4230 };
4231
4232 let factory = create_test_provider_factory();
4233 let provider_rw = factory.provider_rw().unwrap();
4234
4235 {
4237 let tx = provider_rw.tx_ref();
4238 let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
4239
4240 let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4242 cursor
4243 .upsert(
4244 to_delete,
4245 &BranchNodeCompact::new(
4246 0b1010_1010_1010_1010, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4250 None,
4251 ),
4252 )
4253 .unwrap();
4254
4255 let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4257 cursor
4258 .upsert(
4259 to_update,
4260 &BranchNodeCompact::new(
4261 0b0101_0101_0101_0101, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4265 None,
4266 ),
4267 )
4268 .unwrap();
4269 }
4270
4271 let storage_address1 = B256::from([1u8; 32]);
4273 let storage_address2 = B256::from([2u8; 32]);
4274 {
4275 let tx = provider_rw.tx_ref();
4276 let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
4277
4278 storage_cursor
4280 .upsert(
4281 storage_address1,
4282 &StorageTrieEntry {
4283 nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
4284 node: BranchNodeCompact::new(
4285 0b0011_0011_0011_0011, 0b0000_0000_0000_0000,
4287 0b0000_0000_0000_0000,
4288 vec![],
4289 None,
4290 ),
4291 },
4292 )
4293 .unwrap();
4294
4295 storage_cursor
4297 .upsert(
4298 storage_address2,
4299 &StorageTrieEntry {
4300 nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
4301 node: BranchNodeCompact::new(
4302 0b1100_1100_1100_1100, 0b0000_0000_0000_0000,
4304 0b0000_0000_0000_0000,
4305 vec![],
4306 None,
4307 ),
4308 },
4309 )
4310 .unwrap();
4311 storage_cursor
4312 .upsert(
4313 storage_address2,
4314 &StorageTrieEntry {
4315 nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
4316 node: BranchNodeCompact::new(
4317 0b0011_1100_0011_1100, 0b0000_0000_0000_0000,
4319 0b0000_0000_0000_0000,
4320 vec![],
4321 None,
4322 ),
4323 },
4324 )
4325 .unwrap();
4326 }
4327
4328 let account_nodes = vec![
4330 (
4331 Nibbles::from_nibbles([0x1, 0x2]),
4332 Some(BranchNodeCompact::new(
4333 0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4337 None,
4338 )),
4339 ),
4340 (Nibbles::from_nibbles([0x3, 0x4]), None), (
4342 Nibbles::from_nibbles([0x5, 0x6]),
4343 Some(BranchNodeCompact::new(
4344 0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4348 None,
4349 )),
4350 ),
4351 ];
4352
4353 let storage_trie1 = StorageTrieUpdatesSorted {
4355 is_deleted: false,
4356 storage_nodes: vec![
4357 (
4358 Nibbles::from_nibbles([0x1, 0x0]),
4359 Some(BranchNodeCompact::new(
4360 0b1111_0000_0000_0000, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4364 None,
4365 )),
4366 ),
4367 (Nibbles::from_nibbles([0x2, 0x0]), None), ],
4369 };
4370
4371 let storage_trie2 = StorageTrieUpdatesSorted {
4372 is_deleted: true, storage_nodes: vec![],
4374 };
4375
4376 let mut storage_tries = B256Map::default();
4377 storage_tries.insert(storage_address1, storage_trie1);
4378 storage_tries.insert(storage_address2, storage_trie2);
4379
4380 let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
4381
4382 let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
4384
4385 assert_eq!(num_entries, 5);
4388
4389 let tx = provider_rw.tx_ref();
4391 let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
4392
4393 let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4395 let entry1 = cursor.seek_exact(nibbles1).unwrap();
4396 assert!(entry1.is_some(), "Updated account node should exist");
4397 let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
4398 assert_eq!(
4399 entry1.unwrap().1.state_mask,
4400 expected_mask,
4401 "Account node should have updated state_mask"
4402 );
4403
4404 let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4406 let entry2 = cursor.seek_exact(nibbles2).unwrap();
4407 assert!(entry2.is_none(), "Deleted account node should not exist");
4408
4409 let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
4411 let entry3 = cursor.seek_exact(nibbles3).unwrap();
4412 assert!(entry3.is_some(), "New account node should exist");
4413
4414 let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
4416
4417 let storage_entries1: Vec<_> = storage_cursor
4419 .walk_dup(Some(storage_address1), None)
4420 .unwrap()
4421 .collect::<Result<Vec<_>, _>>()
4422 .unwrap();
4423 assert_eq!(
4424 storage_entries1.len(),
4425 1,
4426 "Storage address1 should have 1 entry after deletion"
4427 );
4428 assert_eq!(
4429 storage_entries1[0].1.nibbles.0,
4430 Nibbles::from_nibbles([0x1, 0x0]),
4431 "Remaining entry should be [0x1, 0x0]"
4432 );
4433
4434 let storage_entries2: Vec<_> = storage_cursor
4436 .walk_dup(Some(storage_address2), None)
4437 .unwrap()
4438 .collect::<Result<Vec<_>, _>>()
4439 .unwrap();
4440 assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
4441
4442 provider_rw.commit().unwrap();
4443 }
4444
4445 #[test]
4446 fn test_prunable_receipts_logic() {
4447 let insert_blocks =
4448 |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
4449 let mut rng = generators::rng();
4450 for block_num in 0..=tip_block {
4451 let block = random_block(
4452 &mut rng,
4453 block_num,
4454 BlockParams { tx_count: Some(tx_count), ..Default::default() },
4455 );
4456 provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4457 }
4458 };
4459
4460 let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
4461 let outcome = ExecutionOutcome {
4462 first_block: block,
4463 receipts: vec![vec![Receipt {
4464 tx_type: Default::default(),
4465 success: true,
4466 cumulative_gas_used: block, logs: vec![],
4468 }]],
4469 ..Default::default()
4470 };
4471 provider_rw
4472 .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
4473 .unwrap();
4474 provider_rw.commit().unwrap();
4475 };
4476
4477 {
4479 let factory = create_test_provider_factory();
4480 let storage_settings = StorageSettings::v1();
4481 factory.set_storage_settings_cache(storage_settings);
4482 let factory = factory.with_prune_modes(PruneModes {
4483 receipts: Some(PruneMode::Before(100)),
4484 ..Default::default()
4485 });
4486
4487 let tip_block = 200u64;
4488 let first_block = 1u64;
4489
4490 let provider_rw = factory.provider_rw().unwrap();
4492 insert_blocks(&provider_rw, tip_block, 1);
4493 provider_rw.commit().unwrap();
4494
4495 write_receipts(
4496 factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4497 first_block,
4498 );
4499 write_receipts(
4500 factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4501 tip_block - 1,
4502 );
4503
4504 let provider = factory.provider().unwrap();
4505
4506 for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
4507 assert!(provider
4508 .receipts_by_block(block.into())
4509 .unwrap()
4510 .is_some_and(|r| r.len() == num_receipts));
4511 }
4512 }
4513
4514 {
4516 let factory = create_test_provider_factory();
4517 let storage_settings = StorageSettings::v2();
4518 factory.set_storage_settings_cache(storage_settings);
4519 let factory = factory.with_prune_modes(PruneModes {
4520 receipts: Some(PruneMode::Before(2)),
4521 ..Default::default()
4522 });
4523
4524 let tip_block = 200u64;
4525
4526 let provider_rw = factory.provider_rw().unwrap();
4528 insert_blocks(&provider_rw, tip_block, 1);
4529 provider_rw.commit().unwrap();
4530
4531 write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
4533 write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);
4534
4535 assert!(factory
4536 .static_file_provider()
4537 .get_highest_static_file_tx(StaticFileSegment::Receipts)
4538 .is_none(),);
4539 assert!(factory
4540 .static_file_provider()
4541 .get_highest_static_file_block(StaticFileSegment::Receipts)
4542 .is_some_and(|b| b == 1),);
4543
4544 write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
4547 assert!(factory
4548 .static_file_provider()
4549 .get_highest_static_file_tx(StaticFileSegment::Receipts)
4550 .is_some_and(|num| num == 2),);
4551
4552 let factory = factory.with_prune_modes(PruneModes {
4556 receipts: Some(PruneMode::Before(100)),
4557 ..Default::default()
4558 });
4559 let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
4560 assert!(PruneMode::Distance(1).should_prune(3, tip_block));
4561 write_receipts(provider_rw, 3);
4562
4563 let provider = factory.provider().unwrap();
4568 assert!(EitherWriter::receipts_destination(&provider).is_static_file());
4569 for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
4570 assert!(provider
4571 .receipts_by_block(num.into())
4572 .unwrap()
4573 .is_some_and(|r| r.len() == num_receipts));
4574
4575 let receipt = provider.receipt(num).unwrap();
4576 if num_receipts > 0 {
4577 assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
4578 } else {
4579 assert!(receipt.is_none());
4580 }
4581 }
4582 }
4583 }
4584
4585 #[test]
4586 fn test_try_into_history_rejects_unexecuted_blocks() {
4587 use reth_storage_api::TryIntoHistoricalStateProvider;
4588
4589 let factory = create_test_provider_factory();
4590
4591 let data = BlockchainTestData::default();
4593 let provider_rw = factory.provider_rw().unwrap();
4594 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4595 provider_rw
4596 .write_state(
4597 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4598 crate::OriginalValuesKnown::No,
4599 StateWriteConfig::default(),
4600 )
4601 .unwrap();
4602 provider_rw.commit().unwrap();
4603
4604 let provider = factory.provider().unwrap();
4606
4607 let result = provider.try_into_history_at_block(0);
4609 assert!(result.is_ok(), "Block 0 should be available");
4610
4611 let provider = factory.provider().unwrap();
4613 let result = provider.try_into_history_at_block(100);
4614
4615 match result {
4617 Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
4618 Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
4619 Ok(_) => panic!("Expected error, got Ok"),
4620 }
4621 }
4622
4623 #[test]
4624 fn test_unwind_storage_hashing_with_hashed_state() {
4625 let factory = create_test_provider_factory();
4626 let storage_settings = StorageSettings::v2();
4627 factory.set_storage_settings_cache(storage_settings);
4628
4629 let address = Address::random();
4630 let hashed_address = keccak256(address);
4631
4632 let plain_slot = B256::random();
4633 let hashed_slot = keccak256(plain_slot);
4634
4635 let current_value = U256::from(100);
4636 let old_value = U256::from(42);
4637
4638 let provider_rw = factory.provider_rw().unwrap();
4639 provider_rw
4640 .tx
4641 .cursor_dup_write::<tables::HashedStorages>()
4642 .unwrap()
4643 .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4644 .unwrap();
4645
4646 let changesets = vec![(
4647 BlockNumberAddress((1, address)),
4648 StorageEntry { key: plain_slot, value: old_value },
4649 )];
4650
4651 let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4652
4653 assert_eq!(result.len(), 1);
4654 assert!(result.contains_key(&hashed_address));
4655 assert!(result[&hashed_address].contains(&hashed_slot));
4656
4657 let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4658 let entry = cursor
4659 .seek_by_key_subkey(hashed_address, hashed_slot)
4660 .unwrap()
4661 .expect("entry should exist");
4662 assert_eq!(entry.key, hashed_slot);
4663 assert_eq!(entry.value, old_value);
4664 }
4665
4666 #[test]
4667 fn test_write_and_remove_state_roundtrip_legacy() {
4668 let factory = create_test_provider_factory();
4669 let storage_settings = StorageSettings::v1();
4670 assert!(!storage_settings.use_hashed_state());
4671 factory.set_storage_settings_cache(storage_settings);
4672
4673 let address = Address::with_last_byte(1);
4674 let hashed_address = keccak256(address);
4675 let slot = U256::from(5);
4676 let slot_key = B256::from(slot);
4677 let hashed_slot = keccak256(slot_key);
4678
4679 let mut rng = generators::rng();
4680 let block0 =
4681 random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
4682 let block1 =
4683 random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });
4684
4685 {
4686 let provider_rw = factory.provider_rw().unwrap();
4687 provider_rw.insert_block(&block0.try_recover().unwrap()).unwrap();
4688 provider_rw.insert_block(&block1.try_recover().unwrap()).unwrap();
4689 provider_rw
4690 .tx
4691 .cursor_write::<tables::PlainAccountState>()
4692 .unwrap()
4693 .upsert(address, &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })
4694 .unwrap();
4695 provider_rw.commit().unwrap();
4696 }
4697
4698 let provider_rw = factory.provider_rw().unwrap();
4699
4700 let mut state_init: BundleStateInit = AddressMap::default();
4701 let mut storage_map: B256Map<(U256, U256)> = B256Map::default();
4702 storage_map.insert(slot_key, (U256::ZERO, U256::from(10)));
4703 state_init.insert(
4704 address,
4705 (
4706 Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
4707 Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }),
4708 storage_map,
4709 ),
4710 );
4711
4712 let mut reverts_init: RevertsInit = HashMap::default();
4713 let mut block_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
4714 block_reverts.insert(
4715 address,
4716 (
4717 Some(Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })),
4718 vec![StorageEntry { key: slot_key, value: U256::ZERO }],
4719 ),
4720 );
4721 reverts_init.insert(1, block_reverts);
4722
4723 let execution_outcome =
4724 ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);
4725
4726 provider_rw
4727 .write_state(
4728 &execution_outcome,
4729 OriginalValuesKnown::Yes,
4730 StateWriteConfig {
4731 write_receipts: false,
4732 write_account_changesets: true,
4733 write_storage_changesets: true,
4734 },
4735 )
4736 .unwrap();
4737
4738 let hashed_state =
4739 execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
4740 provider_rw.write_hashed_state(&hashed_state).unwrap();
4741
4742 let account = provider_rw
4743 .tx
4744 .cursor_read::<tables::PlainAccountState>()
4745 .unwrap()
4746 .seek_exact(address)
4747 .unwrap()
4748 .unwrap()
4749 .1;
4750 assert_eq!(account.nonce, 1);
4751
4752 let storage_entry = provider_rw
4753 .tx
4754 .cursor_dup_read::<tables::PlainStorageState>()
4755 .unwrap()
4756 .seek_by_key_subkey(address, slot_key)
4757 .unwrap()
4758 .unwrap();
4759 assert_eq!(storage_entry.key, slot_key);
4760 assert_eq!(storage_entry.value, U256::from(10));
4761
4762 let hashed_entry = provider_rw
4763 .tx
4764 .cursor_dup_read::<tables::HashedStorages>()
4765 .unwrap()
4766 .seek_by_key_subkey(hashed_address, hashed_slot)
4767 .unwrap()
4768 .unwrap();
4769 assert_eq!(hashed_entry.key, hashed_slot);
4770 assert_eq!(hashed_entry.value, U256::from(10));
4771
4772 let account_cs_entries = provider_rw
4773 .tx
4774 .cursor_dup_read::<tables::AccountChangeSets>()
4775 .unwrap()
4776 .walk(Some(1))
4777 .unwrap()
4778 .collect::<Result<Vec<_>, _>>()
4779 .unwrap();
4780 assert!(!account_cs_entries.is_empty());
4781
4782 let storage_cs_entries = provider_rw
4783 .tx
4784 .cursor_read::<tables::StorageChangeSets>()
4785 .unwrap()
4786 .walk(Some(BlockNumberAddress((1, address))))
4787 .unwrap()
4788 .collect::<Result<Vec<_>, _>>()
4789 .unwrap();
4790 assert!(!storage_cs_entries.is_empty());
4791 assert_eq!(storage_cs_entries[0].1.key, slot_key);
4792
4793 provider_rw.remove_state_above(0).unwrap();
4794
4795 let restored_account = provider_rw
4796 .tx
4797 .cursor_read::<tables::PlainAccountState>()
4798 .unwrap()
4799 .seek_exact(address)
4800 .unwrap()
4801 .unwrap()
4802 .1;
4803 assert_eq!(restored_account.nonce, 0);
4804
4805 let storage_gone = provider_rw
4806 .tx
4807 .cursor_dup_read::<tables::PlainStorageState>()
4808 .unwrap()
4809 .seek_by_key_subkey(address, slot_key)
4810 .unwrap();
4811 assert!(storage_gone.is_none() || storage_gone.unwrap().key != slot_key);
4812
4813 let account_cs_after = provider_rw
4814 .tx
4815 .cursor_dup_read::<tables::AccountChangeSets>()
4816 .unwrap()
4817 .walk(Some(1))
4818 .unwrap()
4819 .collect::<Result<Vec<_>, _>>()
4820 .unwrap();
4821 assert!(account_cs_after.is_empty());
4822
4823 let storage_cs_after = provider_rw
4824 .tx
4825 .cursor_read::<tables::StorageChangeSets>()
4826 .unwrap()
4827 .walk(Some(BlockNumberAddress((1, address))))
4828 .unwrap()
4829 .collect::<Result<Vec<_>, _>>()
4830 .unwrap();
4831 assert!(storage_cs_after.is_empty());
4832 }
4833
4834 #[test]
4835 fn test_unwind_storage_hashing_legacy() {
4836 let factory = create_test_provider_factory();
4837 let storage_settings = StorageSettings::v1();
4838 assert!(!storage_settings.use_hashed_state());
4839 factory.set_storage_settings_cache(storage_settings);
4840
4841 let address = Address::random();
4842 let hashed_address = keccak256(address);
4843
4844 let plain_slot = B256::random();
4845 let hashed_slot = keccak256(plain_slot);
4846
4847 let current_value = U256::from(100);
4848 let old_value = U256::from(42);
4849
4850 let provider_rw = factory.provider_rw().unwrap();
4851 provider_rw
4852 .tx
4853 .cursor_dup_write::<tables::HashedStorages>()
4854 .unwrap()
4855 .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4856 .unwrap();
4857
4858 let changesets = vec![(
4859 BlockNumberAddress((1, address)),
4860 StorageEntry { key: plain_slot, value: old_value },
4861 )];
4862
4863 let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4864
4865 assert_eq!(result.len(), 1);
4866 assert!(result.contains_key(&hashed_address));
4867 assert!(result[&hashed_address].contains(&hashed_slot));
4868
4869 let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4870 let entry = cursor
4871 .seek_by_key_subkey(hashed_address, hashed_slot)
4872 .unwrap()
4873 .expect("entry should exist");
4874 assert_eq!(entry.key, hashed_slot);
4875 assert_eq!(entry.value, old_value);
4876 }
4877
4878 #[test]
4879 fn test_write_state_and_historical_read_hashed() {
4880 use reth_storage_api::StateProvider;
4881 use reth_trie::{HashedPostState, KeccakKeyHasher};
4882 use revm_database::BundleState;
4883 use revm_state::AccountInfo;
4884
4885 let factory = create_test_provider_factory();
4886 factory.set_storage_settings_cache(StorageSettings::v2());
4887
4888 let address = Address::with_last_byte(1);
4889 let slot = U256::from(5);
4890 let slot_key = B256::from(slot);
4891 let hashed_address = keccak256(address);
4892 let hashed_slot = keccak256(slot_key);
4893
4894 {
4895 let sf = factory.static_file_provider();
4896 let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
4897 let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
4898 hw.append_header(&h0, &B256::ZERO).unwrap();
4899 let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
4900 hw.append_header(&h1, &B256::ZERO).unwrap();
4901 hw.commit().unwrap();
4902
4903 let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
4904 aw.append_account_changeset(vec![], 0).unwrap();
4905 aw.commit().unwrap();
4906
4907 let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
4908 sw.append_storage_changeset(vec![], 0).unwrap();
4909 sw.commit().unwrap();
4910 }
4911
4912 let provider_rw = factory.provider_rw().unwrap();
4913
4914 let bundle = BundleState::builder(1..=1)
4915 .state_present_account_info(
4916 address,
4917 AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
4918 )
4919 .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
4920 .revert_account_info(1, address, Some(None))
4921 .revert_storage(1, address, vec![(slot, U256::ZERO)])
4922 .build();
4923
4924 let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
4925
4926 provider_rw
4927 .tx
4928 .put::<tables::BlockBodyIndices>(
4929 1,
4930 StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
4931 )
4932 .unwrap();
4933
4934 provider_rw
4935 .write_state(
4936 &execution_outcome,
4937 OriginalValuesKnown::Yes,
4938 StateWriteConfig {
4939 write_receipts: false,
4940 write_account_changesets: true,
4941 write_storage_changesets: true,
4942 },
4943 )
4944 .unwrap();
4945
4946 let hashed_state =
4947 HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
4948 provider_rw.write_hashed_state(&hashed_state).unwrap();
4949
4950 let plain_storage_entries = provider_rw
4951 .tx
4952 .cursor_dup_read::<tables::PlainStorageState>()
4953 .unwrap()
4954 .walk(None)
4955 .unwrap()
4956 .collect::<Result<Vec<_>, _>>()
4957 .unwrap();
4958 assert!(plain_storage_entries.is_empty());
4959
4960 let hashed_entry = provider_rw
4961 .tx
4962 .cursor_dup_read::<tables::HashedStorages>()
4963 .unwrap()
4964 .seek_by_key_subkey(hashed_address, hashed_slot)
4965 .unwrap()
4966 .unwrap();
4967 assert_eq!(hashed_entry.key, hashed_slot);
4968 assert_eq!(hashed_entry.value, U256::from(10));
4969
4970 provider_rw.static_file_provider().commit().unwrap();
4971
4972 let sf = factory.static_file_provider();
4973 let storage_cs = sf.storage_changeset(1).unwrap();
4974 assert!(!storage_cs.is_empty());
4975 assert_eq!(storage_cs[0].1.key, slot_key);
4976
4977 let account_cs = sf.account_block_changeset(1).unwrap();
4978 assert!(!account_cs.is_empty());
4979 assert_eq!(account_cs[0].address, address);
4980
4981 let historical_value =
4982 HistoricalStateProviderRef::new(&*provider_rw, 0, ChangesetCache::new())
4983 .storage(address, slot_key)
4984 .unwrap();
4985 assert_eq!(historical_value, None);
4986 }
4987
4988 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
4989 enum StorageMode {
4990 V1,
4991 V2,
4992 }
4993
4994 fn run_save_blocks_and_verify(mode: StorageMode) {
4995 use alloy_primitives::map::{FbBuildHasher, HashMap};
4996
4997 let factory = create_test_provider_factory();
4998
4999 match mode {
5000 StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
5001 StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
5002 }
5003
5004 let num_blocks = 3u64;
5005 let accounts_per_block = 5usize;
5006 let slots_per_account = 3usize;
5007
5008 let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
5009 SealedHeader::new(
5010 Header { number: 0, difficulty: U256::from(1), ..Default::default() },
5011 B256::ZERO,
5012 ),
5013 Default::default(),
5014 );
5015
5016 let genesis_executed = ExecutedBlock::new(
5017 Arc::new(genesis.try_recover().unwrap()),
5018 Arc::new(BlockExecutionOutput {
5019 result: BlockExecutionResult {
5020 receipts: vec![],
5021 requests: Default::default(),
5022 gas_used: 0,
5023 blob_gas_used: 0,
5024 },
5025 state: Default::default(),
5026 }),
5027 ComputedTrieData::default(),
5028 );
5029 let provider_rw = factory.provider_rw().unwrap();
5030 provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
5031 provider_rw.commit().unwrap();
5032
5033 let mut blocks: Vec<ExecutedBlock> = Vec::new();
5034 let mut parent_hash = B256::ZERO;
5035
5036 for block_num in 1..=num_blocks {
5037 let mut builder = BundleState::builder(block_num..=block_num);
5038
5039 for acct_idx in 0..accounts_per_block {
5040 let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5041 let info = AccountInfo {
5042 nonce: block_num,
5043 balance: U256::from(block_num * 100 + acct_idx as u64),
5044 ..Default::default()
5045 };
5046
5047 let storage: HashMap<U256, (U256, U256), FbBuildHasher<32>> = (1..=
5048 slots_per_account as u64)
5049 .map(|s| {
5050 (
5051 U256::from(s + acct_idx as u64 * 100),
5052 (U256::ZERO, U256::from(block_num * 1000 + s)),
5053 )
5054 })
5055 .collect();
5056
5057 let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
5058 .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
5059 .collect();
5060
5061 builder = builder
5062 .state_present_account_info(address, info)
5063 .revert_account_info(block_num, address, Some(None))
5064 .state_storage(address, storage)
5065 .revert_storage(block_num, address, revert_storage);
5066 }
5067
5068 let bundle = builder.build();
5069
5070 let hashed_state =
5071 HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5072
5073 let header = Header {
5074 number: block_num,
5075 parent_hash,
5076 difficulty: U256::from(1),
5077 ..Default::default()
5078 };
5079 let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
5080 header,
5081 Default::default(),
5082 );
5083 parent_hash = block.hash();
5084
5085 let executed = ExecutedBlock::new(
5086 Arc::new(block.try_recover().unwrap()),
5087 Arc::new(BlockExecutionOutput {
5088 result: BlockExecutionResult {
5089 receipts: vec![],
5090 requests: Default::default(),
5091 gas_used: 0,
5092 blob_gas_used: 0,
5093 },
5094 state: bundle,
5095 }),
5096 ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
5097 );
5098 blocks.push(executed);
5099 }
5100
5101 let provider_rw = factory.provider_rw().unwrap();
5102 provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
5103 provider_rw.commit().unwrap();
5104
5105 let provider = factory.provider().unwrap();
5106
5107 for block_num in 1..=num_blocks {
5108 for acct_idx in 0..accounts_per_block {
5109 let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5110 let hashed_address = keccak256(address);
5111
5112 let ha_entry = provider
5113 .tx_ref()
5114 .cursor_read::<tables::HashedAccounts>()
5115 .unwrap()
5116 .seek_exact(hashed_address)
5117 .unwrap();
5118 assert!(
5119 ha_entry.is_some(),
5120 "HashedAccounts missing for block {block_num} acct {acct_idx}"
5121 );
5122
5123 for s in 1..=slots_per_account as u64 {
5124 let slot = U256::from(s + acct_idx as u64 * 100);
5125 let slot_key = B256::from(slot);
5126 let hashed_slot = keccak256(slot_key);
5127
5128 let hs_entry = provider
5129 .tx_ref()
5130 .cursor_dup_read::<tables::HashedStorages>()
5131 .unwrap()
5132 .seek_by_key_subkey(hashed_address, hashed_slot)
5133 .unwrap();
5134 assert!(
5135 hs_entry.is_some(),
5136 "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
5137 );
5138 let entry = hs_entry.unwrap();
5139 assert_eq!(entry.key, hashed_slot);
5140 assert_eq!(entry.value, U256::from(block_num * 1000 + s));
5141 }
5142 }
5143 }
5144
5145 for block_num in 1..=num_blocks {
5146 let header = provider.header_by_number(block_num).unwrap();
5147 assert!(header.is_some(), "Header missing for block {block_num}");
5148
5149 let indices = provider.block_body_indices(block_num).unwrap();
5150 assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
5151 }
5152
5153 let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
5154 let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();
5155
5156 if mode == StorageMode::V2 {
5157 assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5158 assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5159
5160 let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5161 assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");
5162
5163 let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5164 assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");
5165
5166 provider.static_file_provider().commit().unwrap();
5167 let sf = factory.static_file_provider();
5168
5169 for block_num in 1..=num_blocks {
5170 let account_cs = sf.account_block_changeset(block_num).unwrap();
5171 assert!(
5172 !account_cs.is_empty(),
5173 "v2: static file AccountChangeSets should exist for block {block_num}"
5174 );
5175
5176 let storage_cs = sf.storage_changeset(block_num).unwrap();
5177 assert!(
5178 !storage_cs.is_empty(),
5179 "v2: static file StorageChangeSets should exist for block {block_num}"
5180 );
5181
5182 for (_, entry) in &storage_cs {
5183 assert!(
5184 entry.key != keccak256(entry.key),
5185 "v2: static file storage changeset should have plain slot keys"
5186 );
5187 }
5188 }
5189
5190 let rocksdb = factory.rocksdb_provider();
5191 for block_num in 1..=num_blocks {
5192 for acct_idx in 0..accounts_per_block {
5193 let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5194 let shards = rocksdb.account_history_shards(address).unwrap();
5195 assert!(
5196 !shards.is_empty(),
5197 "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
5198 );
5199
5200 for s in 1..=slots_per_account as u64 {
5201 let slot = U256::from(s + acct_idx as u64 * 100);
5202 let slot_key = B256::from(slot);
5203 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5204 assert!(
5205 !shards.is_empty(),
5206 "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
5207 );
5208 }
5209 }
5210 }
5211 } else {
5212 assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
5213 assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");
5214
5215 let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5216 assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");
5217
5218 let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5219 assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");
5220
5221 for block_num in 1..=num_blocks {
5222 let storage_entries: Vec<_> = provider
5223 .tx_ref()
5224 .cursor_dup_read::<tables::StorageChangeSets>()
5225 .unwrap()
5226 .walk_range(BlockNumberAddress::range(block_num..=block_num))
5227 .unwrap()
5228 .collect::<Result<Vec<_>, _>>()
5229 .unwrap();
5230 assert!(
5231 !storage_entries.is_empty(),
5232 "v1: MDBX StorageChangeSets should have entries for block {block_num}"
5233 );
5234
5235 for (_, entry) in &storage_entries {
5236 let slot_key = B256::from(entry.key);
5237 assert!(
5238 slot_key != keccak256(slot_key),
5239 "v1: storage changeset keys should be plain (not hashed)"
5240 );
5241 }
5242 }
5243
5244 let mdbx_account_history =
5245 provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
5246 assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");
5247
5248 let mdbx_storage_history =
5249 provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
5250 assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
5251 }
5252 }
5253
5254 #[test]
5255 fn test_save_blocks_v1_table_assertions() {
5256 run_save_blocks_and_verify(StorageMode::V1);
5257 }
5258
5259 #[test]
5260 fn test_save_blocks_v2_table_assertions() {
5261 run_save_blocks_and_verify(StorageMode::V2);
5262 }
5263
5264 #[test]
5265 fn test_write_and_remove_state_roundtrip_v2() {
5266 let factory = create_test_provider_factory();
5267 let storage_settings = StorageSettings::v2();
5268 assert!(storage_settings.use_hashed_state());
5269 factory.set_storage_settings_cache(storage_settings);
5270
5271 let address = Address::with_last_byte(1);
5272 let hashed_address = keccak256(address);
5273 let slot = U256::from(5);
5274 let slot_key = B256::from(slot);
5275 let hashed_slot = keccak256(slot_key);
5276
5277 {
5278 let sf = factory.static_file_provider();
5279 let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
5280 let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
5281 hw.append_header(&h0, &B256::ZERO).unwrap();
5282 let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
5283 hw.append_header(&h1, &B256::ZERO).unwrap();
5284 hw.commit().unwrap();
5285
5286 let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
5287 aw.append_account_changeset(vec![], 0).unwrap();
5288 aw.commit().unwrap();
5289
5290 let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
5291 sw.append_storage_changeset(vec![], 0).unwrap();
5292 sw.commit().unwrap();
5293 }
5294
5295 {
5296 let provider_rw = factory.provider_rw().unwrap();
5297 provider_rw
5298 .tx
5299 .put::<tables::BlockBodyIndices>(
5300 0,
5301 StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5302 )
5303 .unwrap();
5304 provider_rw
5305 .tx
5306 .put::<tables::BlockBodyIndices>(
5307 1,
5308 StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5309 )
5310 .unwrap();
5311 provider_rw
5312 .tx
5313 .cursor_write::<tables::HashedAccounts>()
5314 .unwrap()
5315 .upsert(
5316 hashed_address,
5317 &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None },
5318 )
5319 .unwrap();
5320 provider_rw.commit().unwrap();
5321 }
5322
5323 let provider_rw = factory.provider_rw().unwrap();
5324
5325 let bundle = BundleState::builder(1..=1)
5326 .state_present_account_info(
5327 address,
5328 AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
5329 )
5330 .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
5331 .revert_account_info(1, address, Some(None))
5332 .revert_storage(1, address, vec![(slot, U256::ZERO)])
5333 .build();
5334
5335 let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
5336
5337 provider_rw
5338 .write_state(
5339 &execution_outcome,
5340 OriginalValuesKnown::Yes,
5341 StateWriteConfig {
5342 write_receipts: false,
5343 write_account_changesets: true,
5344 write_storage_changesets: true,
5345 },
5346 )
5347 .unwrap();
5348
5349 let hashed_state =
5350 HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5351 provider_rw.write_hashed_state(&hashed_state).unwrap();
5352
5353 let hashed_account = provider_rw
5354 .tx
5355 .cursor_read::<tables::HashedAccounts>()
5356 .unwrap()
5357 .seek_exact(hashed_address)
5358 .unwrap()
5359 .unwrap()
5360 .1;
5361 assert_eq!(hashed_account.nonce, 1);
5362
5363 let hashed_entry = provider_rw
5364 .tx
5365 .cursor_dup_read::<tables::HashedStorages>()
5366 .unwrap()
5367 .seek_by_key_subkey(hashed_address, hashed_slot)
5368 .unwrap()
5369 .unwrap();
5370 assert_eq!(hashed_entry.key, hashed_slot);
5371 assert_eq!(hashed_entry.value, U256::from(10));
5372
5373 let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
5374 assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5375
5376 let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
5377 assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5378
5379 provider_rw.static_file_provider().commit().unwrap();
5380
5381 let sf = factory.static_file_provider();
5382 let storage_cs = sf.storage_changeset(1).unwrap();
5383 assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
5384 assert_eq!(storage_cs[0].1.key, slot_key, "v2: changeset key should be plain");
5385
5386 provider_rw.remove_state_above(0).unwrap();
5387
5388 let restored_account = provider_rw
5389 .tx
5390 .cursor_read::<tables::HashedAccounts>()
5391 .unwrap()
5392 .seek_exact(hashed_address)
5393 .unwrap();
5394 assert!(
5395 restored_account.is_none(),
5396 "v2: account should be removed (didn't exist before block 1)"
5397 );
5398
5399 let storage_gone = provider_rw
5400 .tx
5401 .cursor_dup_read::<tables::HashedStorages>()
5402 .unwrap()
5403 .seek_by_key_subkey(hashed_address, hashed_slot)
5404 .unwrap();
5405 assert!(
5406 storage_gone.is_none() || storage_gone.unwrap().key != hashed_slot,
5407 "v2: storage should be reverted (removed or different key)"
5408 );
5409
5410 let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
5411 assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");
5412
5413 let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
5414 assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
5415 }
5416
5417 #[test]
5418 fn test_unwind_storage_history_indices_v2() {
5419 let factory = create_test_provider_factory();
5420 factory.set_storage_settings_cache(StorageSettings::v2());
5421
5422 let address = Address::with_last_byte(1);
5423 let slot_key = B256::from(U256::from(42));
5424
5425 {
5426 let rocksdb = factory.rocksdb_provider();
5427 let mut batch = rocksdb.batch();
5428 batch.append_storage_history_shard(address, slot_key, vec![3u64, 7, 10]).unwrap();
5429 batch.commit().unwrap();
5430
5431 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5432 assert!(!shards.is_empty(), "history should be written to rocksdb");
5433 }
5434
5435 let provider_rw = factory.provider_rw().unwrap();
5436
5437 let changesets = vec![
5438 (
5439 BlockNumberAddress((7, address)),
5440 StorageEntry { key: slot_key, value: U256::from(5) },
5441 ),
5442 (
5443 BlockNumberAddress((10, address)),
5444 StorageEntry { key: slot_key, value: U256::from(8) },
5445 ),
5446 ];
5447
5448 let count = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
5449 assert_eq!(count, 2);
5450
5451 provider_rw.commit().unwrap();
5452
5453 let rocksdb = factory.rocksdb_provider();
5454 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5455
5456 assert!(
5457 !shards.is_empty(),
5458 "history shards should still exist with block 3 after partial unwind"
5459 );
5460
5461 let all_blocks: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
5462 assert!(all_blocks.contains(&3), "block 3 should remain");
5463 assert!(!all_blocks.contains(&7), "block 7 should be unwound");
5464 assert!(!all_blocks.contains(&10), "block 10 should be unwound");
5465 }
5466}