1use crate::{
2 changesets_utils::StorageRevertsIter,
3 providers::{
4 database::{chain::ChainStorage, metrics},
5 rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6 static_file::{StaticFileWriteCtx, StaticFileWriter},
7 NodeTypesForProvider, StaticFileProvider,
8 },
9 to_range,
10 traits::{
11 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12 },
13 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15 DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16 HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17 LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18 PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19 RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20 StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21 TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25 BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29 keccak256,
30 map::{hash_map, AddressSet, B256Map, HashMap},
31 Address, BlockHash, BlockNumber, StorageKey, StorageValue, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40 database::{Database, ReaderTxnTracker},
41 models::{
42 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43 BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44 StoredBlockBodyIndices,
45 },
46 table::Table,
47 tables,
48 transaction::{DbTx, DbTxMut},
49 BlockNumberList,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54 Account, Block as _, BlockBody as _, Bytecode, FastInstant as Instant, RecoveredBlock,
55 SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58 PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63 BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64 NodePrimitivesProvider, StateProvider, StateReader, StateWriteConfig, StorageChangeSetReader,
65 StoragePath, StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70 HashedPostStateSorted,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
73use revm_database::states::{
74 PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use smallvec::SmallVec;
77use std::{
78 cmp::Ordering,
79 collections::{BTreeMap, BTreeSet},
80 fmt::Debug,
81 ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
82 path::PathBuf,
83 sync::Arc,
84};
85use tracing::{debug, instrument, trace};
86
/// Ordering a provider uses when committing its pending writes.
///
/// [`CommitOrder::Normal`] is the default variant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
    /// The regular commit ordering.
    #[default]
    Normal,
    /// The commit ordering used for unwinds.
    Unwind,
}

impl CommitOrder {
    /// Returns `true` if this is [`CommitOrder::Unwind`].
    pub const fn is_unwind(&self) -> bool {
        match self {
            Self::Unwind => true,
            Self::Normal => false,
        }
    }
}
104
/// Alias for a [`DatabaseProvider`] parameterized with the database's `TX` transaction type
/// (as opposed to `TXMut`, which [`DatabaseProviderRW`] wraps).
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
107
/// Newtype around a [`DatabaseProvider`] that holds the database's `TXMut` transaction type.
///
/// The wrapped provider is publicly accessible as field `0`.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    /// The wrapped provider.
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
116
117impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
118 type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
119
120 fn deref(&self) -> &Self::Target {
121 &self.0
122 }
123}
124
125impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
126 fn deref_mut(&mut self) -> &mut Self::Target {
127 &mut self.0
128 }
129}
130
131impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
132 for DatabaseProviderRW<DB, N>
133{
134 fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
135 &self.0
136 }
137}
138
139impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
140 pub fn commit(self) -> ProviderResult<()> {
142 self.0.commit()
143 }
144
145 pub fn into_tx(self) -> <DB as Database>::TXMut {
147 self.0.into_tx()
148 }
149
150 #[cfg(any(test, feature = "test-utils"))]
152 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
153 self.0.minimum_pruning_distance = distance;
154 self
155 }
156}
157
158impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
159 for DatabaseProvider<<DB as Database>::TXMut, N>
160{
161 fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
162 provider.0
163 }
164}
165
/// Selects how much data a block-saving call persists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SaveBlocksMode {
    /// Persist block data together with state; [`SaveBlocksMode::with_state`] is `true`.
    Full,
    /// Persist block data only; [`SaveBlocksMode::with_state`] is `false`.
    BlocksOnly,
}

impl SaveBlocksMode {
    /// Returns `true` when state should be written alongside the blocks, i.e. for
    /// [`SaveBlocksMode::Full`].
    pub const fn with_state(self) -> bool {
        match self {
            Self::Full => true,
            Self::BlocksOnly => false,
        }
    }
}
184
/// Provider that bundles a database transaction with the auxiliary storage backends
/// (static files, `RocksDB`) and the configuration needed to read and write chain data.
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Database transaction this provider operates on.
    tx: TX,
    /// Chain specification.
    chain_spec: Arc<N::ChainSpec>,
    /// Static file provider.
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Configured prune modes.
    prune_modes: PruneModes,
    /// Node-specific storage handler; its `reader()`/`writer()` are used for block bodies.
    storage: Arc<N::Storage>,
    /// Shared storage settings.
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// `RocksDB` provider.
    rocksdb_provider: RocksDBProvider,
    /// Changeset cache handed to historical state providers and used to compute trie reverts.
    changeset_cache: ChangesetCache,
    /// Task runtime; `save_blocks` runs its static-file and `RocksDB` writers on its storage
    /// pool.
    runtime: reth_tasks::Runtime,
    /// Database path.
    // NOTE(review): not read anywhere in the visible part of this file — confirm its users.
    db_path: PathBuf,
    /// `RocksDB` batches queued for a later commit.
    pending_rocksdb_batches: PendingRocksDBBatches,
    /// Commit ordering selected at construction ([`CommitOrder::Normal`] or
    /// [`CommitOrder::Unwind`]).
    commit_order: CommitOrder,
    /// Minimum pruning distance; initialized to `MINIMUM_UNWIND_SAFE_DISTANCE` by the
    /// constructors.
    minimum_pruning_distance: u64,
    /// Metrics recorded by this provider.
    metrics: metrics::DatabaseProviderMetrics,
    /// Optional reader-transaction tracker; `None` unless installed via
    /// `with_reader_txn_tracker`.
    reader_txn_tracker: Option<Arc<dyn ReaderTxnTracker>>,
}
219
220impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 let mut s = f.debug_struct("DatabaseProvider");
223 s.field("tx", &self.tx)
224 .field("chain_spec", &self.chain_spec)
225 .field("static_file_provider", &self.static_file_provider)
226 .field("prune_modes", &self.prune_modes)
227 .field("storage", &self.storage)
228 .field("storage_settings", &self.storage_settings)
229 .field("rocksdb_provider", &self.rocksdb_provider)
230 .field("changeset_cache", &self.changeset_cache)
231 .field("runtime", &self.runtime)
232 .field("pending_rocksdb_batches", &"<pending batches>")
233 .field("commit_order", &self.commit_order)
234 .field("minimum_pruning_distance", &self.minimum_pruning_distance)
235 .field("reader_txn_tracker", &"<reader txn tracker>")
236 .finish()
237 }
238}
239
240impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
241 pub const fn prune_modes_ref(&self) -> &PruneModes {
243 &self.prune_modes
244 }
245
246 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
248 self.minimum_pruning_distance = distance;
249 self
250 }
251
252 pub(crate) fn with_reader_txn_tracker<T>(mut self, reader_txn_tracker: T) -> Self
254 where
255 T: ReaderTxnTracker + 'static,
256 {
257 self.reader_txn_tracker = Some(Arc::new(reader_txn_tracker));
258 self
259 }
260}
261
262impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Commits this provider's writes in a fixed sequence:
    ///
    /// 1. the database transaction,
    /// 2. a wait for pre-commit readers, if a reader-transaction tracker is installed,
    /// 3. the pending `RocksDB` batches, only when `storage_v2` is enabled,
    /// 4. the static file provider.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered; later steps are not attempted after a failure.
    fn commit_unwind(self) -> ProviderResult<()> {
        // Read the settings and clone the tracker before committing the transaction
        // (presumably because `commit()` consumes `self.tx`, after which `self` can no longer
        // be borrowed as a whole).
        let storage_v2 = self.cached_storage_settings().storage_v2;
        let reader_txn_tracker = self.reader_txn_tracker.clone();
        self.tx.commit()?;

        if let Some(reader_txn_tracker) = reader_txn_tracker.as_ref() {
            reader_txn_tracker.wait_for_pre_commit_readers();
        }

        if storage_v2 {
            // Drain the queue under the lock, then commit the batches in queue order.
            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
            for batch in batches {
                self.rocksdb_provider.commit_batch(batch)?;
            }
        }

        self.static_file_provider.commit()?;
        Ok(())
    }
292
293 pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
295 trace!(target: "providers::db", "Returning latest state provider");
296 Box::new(LatestStateProviderRef::new(self))
297 }
298
299 pub fn history_by_block_hash<'a>(
301 &'a self,
302 block_hash: BlockHash,
303 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
304 let block_number =
305 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
306 self.history_by_block_number(block_number)
307 }
308
309 pub fn history_by_block_number<'a>(
311 &'a self,
312 mut block_number: BlockNumber,
313 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
314 if block_number == self.best_block_number().unwrap_or_default() &&
315 block_number == self.last_block_number().unwrap_or_default()
316 {
317 return Ok(Box::new(LatestStateProviderRef::new(self)))
318 }
319
320 block_number += 1;
322
323 let account_history_prune_checkpoint =
324 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
325 let storage_history_prune_checkpoint =
326 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
327
328 let mut state_provider =
329 HistoricalStateProviderRef::new(self, block_number, self.changeset_cache.clone());
330 if let Some(prune_checkpoint_block_number) =
333 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
334 {
335 state_provider = state_provider.with_lowest_available_account_history_block_number(
336 prune_checkpoint_block_number + 1,
337 );
338 }
339 if let Some(prune_checkpoint_block_number) =
340 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
341 {
342 state_provider = state_provider.with_lowest_available_storage_history_block_number(
343 prune_checkpoint_block_number + 1,
344 );
345 }
346
347 Ok(Box::new(state_provider))
348 }
349
    /// Replaces the provider's prune modes. Only available with the `test-utils` feature.
    #[cfg(feature = "test-utils")]
    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
355}
356
impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    /// The primitives are taken from the node types `N`.
    type Primitives = N::Primitives;
}
360
impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
    /// Returns a clone of the static file provider held by this provider.
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }

    /// Requests a writer for `block` in `segment` from the static file provider.
    fn get_static_file_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
        self.static_file_provider.get_writer(block, segment)
    }
}
375
376impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
377 fn rocksdb_provider(&self) -> RocksDBProvider {
379 self.rocksdb_provider.clone()
380 }
381
382 fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
383 self.pending_rocksdb_batches.lock().push(batch);
384 }
385
386 fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
387 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
388 for batch in batches {
389 self.rocksdb_provider.commit_batch(batch)?;
390 }
391 Ok(())
392 }
393}
394
395impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
396 for DatabaseProvider<TX, N>
397{
398 type ChainSpec = N::ChainSpec;
399
400 fn chain_spec(&self) -> Arc<Self::ChainSpec> {
401 self.chain_spec.clone()
402 }
403}
404
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Shared constructor behind [`Self::new_rw`] and [`Self::new_unwind_rw`].
    ///
    /// Stores the given components and `commit_order`; starts with no pending `RocksDB`
    /// batches, `MINIMUM_UNWIND_SAFE_DISTANCE` as the minimum pruning distance, default
    /// metrics, and no reader-transaction tracker.
    #[expect(clippy::too_many_arguments)]
    fn new_rw_inner(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
        commit_order: CommitOrder,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            pending_rocksdb_batches: Default::default(),
            commit_order,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
            reader_txn_tracker: None,
        }
    }

    /// Creates a provider over a mutable transaction that commits with
    /// [`CommitOrder::Normal`].
    #[expect(clippy::too_many_arguments)]
    pub fn new_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self::new_rw_inner(
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            CommitOrder::Normal,
        )
    }

    /// Creates a provider over a mutable transaction that commits with
    /// [`CommitOrder::Unwind`].
    #[expect(clippy::too_many_arguments)]
    pub fn new_unwind_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self::new_rw_inner(
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            CommitOrder::Unwind,
        )
    }
}
498
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    /// Identity conversion: returns `self`.
    fn as_ref(&self) -> &Self {
        self
    }
}
504
505impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
506 pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
510 where
511 F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
512 {
513 let rocksdb = self.rocksdb_provider();
514 let rocksdb_batch = rocksdb.batch();
515
516 let (result, raw_batch) = f(rocksdb_batch)?;
517
518 if let Some(batch) = raw_batch {
519 self.set_pending_rocksdb_batch(batch);
520 }
521 let _ = raw_batch; Ok(result)
524 }
525
526 fn static_file_write_ctx(
528 &self,
529 save_mode: SaveBlocksMode,
530 first_block: BlockNumber,
531 last_block: BlockNumber,
532 ) -> ProviderResult<StaticFileWriteCtx> {
533 let tip = self.last_block_number()?.max(last_block);
534 Ok(StaticFileWriteCtx {
535 write_senders: EitherWriterDestination::senders(self).is_static_file() &&
536 self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
537 write_receipts: save_mode.with_state() &&
538 EitherWriter::receipts_destination(self).is_static_file(),
539 write_account_changesets: save_mode.with_state() &&
540 EitherWriterDestination::account_changesets(self).is_static_file(),
541 write_storage_changesets: save_mode.with_state() &&
542 EitherWriterDestination::storage_changesets(self).is_static_file(),
543 tip,
544 receipts_prune_mode: self.prune_modes.receipts,
545 receipts_prunable: self
547 .static_file_provider
548 .get_highest_static_file_tx(StaticFileSegment::Receipts)
549 .is_none() &&
550 PruneMode::Distance(self.minimum_pruning_distance)
551 .should_prune(first_block, tip),
552 })
553 }
554
    /// Builds the [`RocksDBWriteCtx`] for a batch whose first block is `first_block`.
    ///
    /// The context carries the transaction-lookup prune mode, the cached storage settings and
    /// a clone of the pending-batches handle.
    fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
        RocksDBWriteCtx {
            first_block_number: first_block,
            prune_tx_lookup: self.prune_modes.transaction_lookup,
            storage_settings: self.cached_storage_settings(),
            pending_batches: self.pending_rocksdb_batches.clone(),
        }
    }
564
    /// Persists a batch of executed blocks.
    ///
    /// The work is split three ways: a static-file writer task, a `RocksDB` writer task (only
    /// when `storage_v2` is enabled), and the database-transaction writes performed by the
    /// calling closure. An empty `blocks` vector is a no-op.
    ///
    /// With a `save_mode` that includes state ([`SaveBlocksMode::Full`]) the per-block
    /// execution state, the merged hashed state, the merged trie updates and the history
    /// indices are written as well; otherwise those steps are skipped. Pipeline stage
    /// checkpoints are updated to the last block in both modes.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by the database writes, then by the static-file task,
    /// then by the `RocksDB` task. A task that left no result behind is reported as a thread
    /// panic.
    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlock<N::Primitives>>,
        save_mode: SaveBlocksMode,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to write empty block range");
            return Ok(())
        }

        let total_start = Instant::now();
        let block_count = blocks.len() as u64;
        // `blocks` is non-empty here, so `first()`/`last()` cannot be `None`.
        let first_number = blocks.first().unwrap().recovered_block().number();
        let last_block_number = blocks.last().unwrap().recovered_block().number();

        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");

        // Next free transaction number: one past the last key of `TransactionBlocks`, or the
        // default (0) when the table is empty.
        let first_tx_num = self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .last()?
            .map(|(n, _)| n + 1)
            .unwrap_or_default();

        // First transaction number of every block, in the order of `blocks`.
        let tx_nums: SmallVec<[TxNumber; 4]> = {
            let mut nums = SmallVec::with_capacity(blocks.len());
            let mut current = first_tx_num;
            for block in &blocks {
                nums.push(current);
                current += block.recovered_block().body().transaction_count() as u64;
            }
            nums
        };

        let mut timings =
            metrics::SaveBlocksTimings { batch_size: block_count, ..Default::default() };

        let sf_provider = &self.static_file_provider;
        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
        let rocksdb_provider = self.rocksdb_provider.clone();
        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
        let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;

        // Filled in by the spawned tasks; still `None` afterwards if a task never stored a
        // result.
        let mut sf_result = None;
        let mut rocksdb_result = None;

        let runtime = &self.runtime;
        // Captured so the spawned tasks can enter the current tracing span.
        let span = tracing::Span::current();
        runtime.storage_pool().in_place_scope(|s| {
            // Static-file writes.
            s.spawn(|_| {
                let _guard = span.enter();
                let start = Instant::now();
                sf_result = Some(
                    sf_provider
                        .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
                        .map(|()| start.elapsed()),
                );
            });

            // RocksDB writes, only with `storage_v2`.
            if rocksdb_enabled {
                s.spawn(|_| {
                    let _guard = span.enter();
                    let start = Instant::now();
                    rocksdb_result = Some(
                        rocksdb_provider
                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
                            .map(|()| start.elapsed()),
                    );
                });
            }

            // Everything below in this closure is the database-transaction part.
            let mdbx_start = Instant::now();

            // Transaction hash -> number entries are written here only when `storage_v2` is
            // disabled and transaction-lookup pruning is not full.
            if !self.cached_storage_settings().storage_v2 &&
                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
            {
                let start = Instant::now();
                let total_tx_count: usize =
                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
                for (i, block) in blocks.iter().enumerate() {
                    let recovered_block = block.recovered_block();
                    for (tx_num, transaction) in
                        (tx_nums[i]..).zip(recovered_block.body().transactions_iter())
                    {
                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
                    }
                }

                // Sort by hash before handing the whole batch to the writer.
                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);

                self.with_rocksdb_batch(|batch| {
                    let mut tx_hash_writer =
                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
                    Ok(((), raw_batch))
                })?;
                self.metrics.record_duration(
                    metrics::Action::InsertTransactionHashNumbers,
                    start.elapsed(),
                );
            }

            for (i, block) in blocks.iter().enumerate() {
                let recovered_block = block.recovered_block();

                let start = Instant::now();
                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
                timings.insert_block += start.elapsed();

                if save_mode.with_state() {
                    let execution_output = block.execution_outcome();

                    let start = Instant::now();
                    // `write_state` only writes what the static-file task is not already
                    // writing (each flag is the negation of the static-file context's flag).
                    self.write_state(
                        WriteStateInput::Single {
                            outcome: execution_output,
                            block: recovered_block.number(),
                        },
                        OriginalValuesKnown::No,
                        StateWriteConfig {
                            write_receipts: !sf_ctx.write_receipts,
                            write_account_changesets: !sf_ctx.write_account_changesets,
                            write_storage_changesets: !sf_ctx.write_storage_changesets,
                        },
                    )?;
                    timings.write_state += start.elapsed();
                }
            }

            if save_mode.with_state() {
                // Hashed state and trie updates of all blocks are merged (blocks are fed in
                // reverse order) and written once each, skipping empty results.
                let start = Instant::now();
                let merged_hashed_state = HashedPostStateSorted::merge_batch(
                    blocks.iter().rev().map(|b| b.trie_data().hashed_state),
                );
                if !merged_hashed_state.is_empty() {
                    self.write_hashed_state(&merged_hashed_state)?;
                }
                timings.write_hashed_state += start.elapsed();

                let start = Instant::now();
                let merged_trie =
                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
                if !merged_trie.is_empty() {
                    self.write_trie_updates_sorted(&merged_trie)?;
                }
                timings.write_trie_updates += start.elapsed();
            }

            if save_mode.with_state() {
                let start = Instant::now();
                self.update_history_indices(first_number..=last_block_number)?;
                timings.update_history_indices = start.elapsed();
            }

            let start = Instant::now();
            self.update_pipeline_stages(last_block_number, false)?;
            timings.update_pipeline_stages = start.elapsed();

            timings.mdbx = mdbx_start.elapsed();

            Ok::<_, ProviderError>(())
        })?;

        // Outer `?`: the task never stored a result. Inner `?`: the task's own error.
        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;

        if rocksdb_enabled {
            timings.rocksdb = rocksdb_result.ok_or_else(|| {
                ProviderError::Database(reth_db_api::DatabaseError::Other(
                    "RocksDB thread panicked".into(),
                ))
            })??;
        }

        timings.total = total_start.elapsed();

        self.metrics.record_save_blocks(&timings);
        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");

        Ok(())
    }
776
777 #[instrument(level = "debug", target = "providers::db", skip_all)]
781 fn insert_block_mdbx_only(
782 &self,
783 block: &RecoveredBlock<BlockTy<N>>,
784 first_tx_num: TxNumber,
785 ) -> ProviderResult<StoredBlockBodyIndices> {
786 if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
787 EitherWriterDestination::senders(self).is_database()
788 {
789 let start = Instant::now();
790 let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
791 let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
792 for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
793 cursor.append(tx_num, &sender)?;
794 }
795 self.metrics
796 .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
797 }
798
799 let block_number = block.number();
800 let tx_count = block.body().transaction_count() as u64;
801
802 let start = Instant::now();
803 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
804 self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
805
806 self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
807
808 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
809 }
810
811 fn write_block_body_indices(
814 &self,
815 block_number: BlockNumber,
816 body: &BodyTy<N>,
817 first_tx_num: TxNumber,
818 tx_count: u64,
819 ) -> ProviderResult<()> {
820 let start = Instant::now();
822 self.tx
823 .cursor_write::<tables::BlockBodyIndices>()?
824 .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
825 self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
826
827 if tx_count > 0 {
829 let start = Instant::now();
830 self.tx
831 .cursor_write::<tables::TransactionBlocks>()?
832 .append(first_tx_num + tx_count - 1, &block_number)?;
833 self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
834 }
835
836 self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
838
839 Ok(())
840 }
841
842 pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
847 let changed_accounts = self.account_changesets_range(from..)?;
848
849 self.unwind_account_hashing(changed_accounts.iter())?;
851
852 self.unwind_account_history_indices(changed_accounts.iter())?;
854
855 let changed_storages = self.storage_changesets_range(from..)?;
856
857 self.unwind_storage_hashing(changed_storages.iter().copied())?;
859
860 self.unwind_storage_history_indices(changed_storages.iter().copied())?;
862
863 let db_tip_block = self
866 .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
867 .as_ref()
868 .map(|chk| chk.block_number)
869 .ok_or_else(|| ProviderError::InsufficientChangesets {
870 requested: from,
871 available: 0..=0,
872 })?;
873
874 let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
875 self.write_trie_updates_sorted(&trie_revert)?;
876
877 Ok(())
878 }
879
880 fn remove_receipts_from(
882 &self,
883 from_tx: TxNumber,
884 last_block: BlockNumber,
885 ) -> ProviderResult<()> {
886 self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
888
889 if EitherWriter::receipts_destination(self).is_static_file() {
890 let static_file_receipt_num =
891 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
892
893 let to_delete = static_file_receipt_num
894 .map(|static_num| (static_num + 1).saturating_sub(from_tx))
895 .unwrap_or_default();
896
897 self.static_file_provider
898 .latest_writer(StaticFileSegment::Receipts)?
899 .prune_receipts(to_delete, last_block)?;
900 }
901
902 Ok(())
903 }
904
905 fn write_bytecodes(
907 &self,
908 bytecodes: impl IntoIterator<Item = (B256, Bytecode)>,
909 ) -> ProviderResult<()> {
910 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
911 for (hash, bytecode) in bytecodes {
912 bytecodes_cursor.upsert(hash, &bytecode)?;
913 }
914 Ok(())
915 }
916}
917
918impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
919 fn try_into_history_at_block(
920 self,
921 mut block_number: BlockNumber,
922 ) -> ProviderResult<StateProviderBox> {
923 let best_block = self.best_block_number().unwrap_or_default();
924
925 if block_number > best_block {
927 return Err(ProviderError::BlockNotExecuted {
928 requested: block_number,
929 executed: best_block,
930 });
931 }
932
933 if block_number == best_block {
935 return Ok(Box::new(LatestStateProvider::new(self)));
936 }
937
938 block_number += 1;
940
941 let account_history_prune_checkpoint =
942 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
943 let storage_history_prune_checkpoint =
944 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
945 let changeset_cache = self.changeset_cache.clone();
946
947 let mut state_provider = HistoricalStateProvider::new(self, block_number, changeset_cache);
948
949 if let Some(prune_checkpoint_block_number) =
952 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
953 {
954 state_provider = state_provider.with_lowest_available_account_history_block_number(
955 prune_checkpoint_block_number + 1,
956 );
957 }
958 if let Some(prune_checkpoint_block_number) =
959 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
960 {
961 state_provider = state_provider.with_lowest_available_storage_history_block_number(
962 prune_checkpoint_block_number + 1,
963 );
964 }
965
966 Ok(Box::new(state_provider))
967 }
968}
969
970fn unwind_history_shards<S, T, C>(
985 cursor: &mut C,
986 start_key: T::Key,
987 block_number: BlockNumber,
988 mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
989) -> ProviderResult<Vec<u64>>
990where
991 T: Table<Value = BlockNumberList>,
992 T::Key: AsRef<ShardedKey<S>>,
993 C: DbCursorRO<T> + DbCursorRW<T>,
994{
995 let mut item = cursor.seek_exact(start_key)?;
997 while let Some((sharded_key, list)) = item {
998 if !shard_belongs_to_key(&sharded_key) {
1000 break
1001 }
1002
1003 cursor.delete_current()?;
1006
1007 let first = list.iter().next().expect("List can't be empty");
1010
1011 if first >= block_number {
1014 item = cursor.prev()?;
1015 continue
1016 }
1017 else if block_number <= sharded_key.as_ref().highest_block_number {
1020 return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
1023 }
1024 return Ok(list.iter().collect::<Vec<_>>())
1027 }
1028
1029 Ok(Vec::new())
1031}
1032
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Creates a provider from its components.
    ///
    /// The provider starts with [`CommitOrder::Normal`], no pending `RocksDB` batches,
    /// `MINIMUM_UNWIND_SAFE_DISTANCE` as the minimum pruning distance, default metrics, and no
    /// reader-transaction tracker.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            pending_rocksdb_batches: Default::default(),
            commit_order: CommitOrder::Normal,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
            reader_txn_tracker: None,
        }
    }

    /// Consumes the provider and returns its transaction.
    pub fn into_tx(self) -> TX {
        self.tx
    }

    /// Returns a mutable reference to the transaction.
    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    /// Returns a shared reference to the transaction.
    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    /// Returns a reference to the chain specification.
    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}
1087
1088impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
1089 fn recovered_block<H, HF, B, BF>(
1090 &self,
1091 id: BlockHashOrNumber,
1092 _transaction_kind: TransactionVariant,
1093 header_by_number: HF,
1094 construct_block: BF,
1095 ) -> ProviderResult<Option<B>>
1096 where
1097 H: AsRef<HeaderTy<N>>,
1098 HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
1099 BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
1100 {
1101 let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
1102 let earliest_available = self.static_file_provider.earliest_history_height();
1103 if block_number < earliest_available {
1104 return Err(ProviderError::BlockExpired { requested: block_number, earliest_available })
1105 }
1106 let Some(header) = header_by_number(block_number)? else { return Ok(None) };
1107
1108 let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
1115
1116 let tx_range = body.tx_num_range();
1117
1118 let transactions = if tx_range.is_empty() {
1119 vec![]
1120 } else {
1121 self.transactions_by_tx_range(tx_range.clone())?
1122 };
1123
1124 let body = self
1125 .storage
1126 .reader()
1127 .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
1128 .pop()
1129 .ok_or(ProviderError::InvalidStorageOutput)?;
1130
1131 let senders = if tx_range.is_empty() {
1132 vec![]
1133 } else {
1134 let known_senders: HashMap<TxNumber, Address> =
1135 EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1136
1137 let mut senders = Vec::with_capacity(body.transactions().len());
1138 for (tx_num, tx) in tx_range.zip(body.transactions()) {
1139 match known_senders.get(&tx_num) {
1140 None => {
1141 let sender = tx.recover_signer_unchecked()?;
1142 senders.push(sender);
1143 }
1144 Some(sender) => senders.push(*sender),
1145 }
1146 }
1147 senders
1148 };
1149
1150 construct_block(header, body, senders)
1151 }
1152
1153 fn block_range<F, H, HF, R>(
1163 &self,
1164 range: RangeInclusive<BlockNumber>,
1165 headers_range: HF,
1166 mut assemble_block: F,
1167 ) -> ProviderResult<Vec<R>>
1168 where
1169 H: AsRef<HeaderTy<N>>,
1170 HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1171 F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
1172 {
1173 if range.is_empty() {
1174 return Ok(Vec::new())
1175 }
1176
1177 let len = range.end().saturating_sub(*range.start()) as usize + 1;
1178 let mut blocks = Vec::with_capacity(len);
1179
1180 let headers = headers_range(range.clone())?;
1181
1182 let present_headers = self
1188 .block_body_indices_range(range)?
1189 .into_iter()
1190 .map(|b| b.tx_num_range())
1191 .zip(headers)
1192 .collect::<Vec<_>>();
1193
1194 let mut inputs = Vec::with_capacity(present_headers.len());
1195 for (tx_range, header) in &present_headers {
1196 let transactions = if tx_range.is_empty() {
1197 Vec::new()
1198 } else {
1199 self.transactions_by_tx_range(tx_range.clone())?
1200 };
1201
1202 inputs.push((header.as_ref(), transactions));
1203 }
1204
1205 let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
1206
1207 for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
1208 blocks.push(assemble_block(header, body, tx_range)?);
1209 }
1210
1211 Ok(blocks)
1212 }
1213
1214 fn block_with_senders_range<H, HF, B, BF>(
1225 &self,
1226 range: RangeInclusive<BlockNumber>,
1227 headers_range: HF,
1228 assemble_block: BF,
1229 ) -> ProviderResult<Vec<B>>
1230 where
1231 H: AsRef<HeaderTy<N>>,
1232 HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1233 BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
1234 {
1235 self.block_range(range, headers_range, |header, body, tx_range| {
1236 let senders = if tx_range.is_empty() {
1237 Vec::new()
1238 } else {
1239 let known_senders: HashMap<TxNumber, Address> =
1240 EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1241
1242 let mut senders = Vec::with_capacity(body.transactions().len());
1243 for (tx_num, tx) in tx_range.zip(body.transactions()) {
1244 match known_senders.get(&tx_num) {
1245 None => {
1246 let sender = tx.recover_signer_unchecked()?;
1248 senders.push(sender);
1249 }
1250 Some(sender) => senders.push(*sender),
1251 }
1252 }
1253
1254 senders
1255 };
1256
1257 assemble_block(header, body, senders)
1258 })
1259 }
1260
    /// Builds the initial bundle state and the per-block reverts from account and storage
    /// changesets.
    ///
    /// Both changesets are walked in reverse order. For each touched address the "old" value is
    /// taken from the changeset entry, while the "new" value is fetched once, on first
    /// encounter, through `get_account` / `get_storage`. Later (i.e. earlier in the original
    /// order) entries for the same address or slot only overwrite the "old" value.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `get_account` or `get_storage`.
    fn populate_bundle_state(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        mut get_account: impl FnMut(Address) -> ProviderResult<Option<Account>>,
        mut get_storage: impl FnMut(Address, StorageKey) -> ProviderResult<Option<StorageValue>>,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
        // Per address: (old account info, new account info, per-slot (old value, new value)).
        let mut state: BundleStateInit = HashMap::default();

        // Per block number and address: (old account info, old storage entries).
        let mut reverts: RevertsInit = HashMap::default();

        // Account changes.
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // First time this address is seen: look up its new info.
                    let new_info = get_account(address)?;
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // Already seen: only the old account info is overwritten.
                    entry.get_mut().0 = old_info;
                }
            }
            // Record the old info in the reverts of this block.
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        // Storage changes.
        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            // Reuse the account entry, or create one whose old and new info are both the looked
            // up value.
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let present_info = get_account(address)?;
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            // Same scheme per storage slot: fetch the new value once, then only overwrite the old
            // value.
            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    // A slot missing from the lookup is treated as the default value.
                    let new_storage = get_storage(address, old_storage.key)?.unwrap_or_default();
                    entry.insert((old_storage.value, new_storage));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            // Record the old storage entry in the reverts of this block.
            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
1334
1335 fn populate_bundle_state_plain(
1338 &self,
1339 account_changeset: Vec<(u64, AccountBeforeTx)>,
1340 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1341 plain_accounts_cursor: &mut impl DbCursorRO<tables::PlainAccountState>,
1342 plain_storage_cursor: &mut impl DbDupCursorRO<tables::PlainStorageState>,
1343 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1344 self.populate_bundle_state(
1345 account_changeset,
1346 storage_changeset,
1347 |address| Ok(plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1)),
1348 |address, storage_key| {
1349 Ok(plain_storage_cursor
1350 .seek_by_key_subkey(address, storage_key)?
1351 .filter(|s| s.key == storage_key)
1352 .map(|s| s.value))
1353 },
1354 )
1355 }
1356
1357 fn populate_bundle_state_hashed(
1362 &self,
1363 account_changeset: Vec<(u64, AccountBeforeTx)>,
1364 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1365 hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
1366 hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
1367 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1368 self.populate_bundle_state(
1369 account_changeset,
1370 storage_changeset,
1371 |address| Ok(hashed_accounts_cursor.seek_exact(keccak256(address))?.map(|kv| kv.1)),
1372 |address, storage_key| {
1373 let hashed_storage_key = keccak256(storage_key);
1374 Ok(hashed_storage_cursor
1375 .seek_by_key_subkey(keccak256(address), hashed_storage_key)?
1376 .filter(|s| s.key == hashed_storage_key)
1377 .map(|s| s.value))
1378 },
1379 )
1380 }
1381
1382 fn populate_bundle_state_with_provider(
1383 &self,
1384 account_changeset: Vec<(u64, AccountBeforeTx)>,
1385 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1386 state_provider: impl StateProvider,
1387 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1388 self.populate_bundle_state(
1389 account_changeset,
1390 storage_changeset,
1391 |address| state_provider.basic_account(&address),
1392 |address, storage_key| state_provider.storage(address, storage_key),
1393 )
1394 }
1395}
1396
1397impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
1398 fn append_history_index<P, T>(
1406 &self,
1407 index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
1408 mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
1409 ) -> ProviderResult<()>
1410 where
1411 P: Copy,
1412 T: Table<Value = BlockNumberList>,
1413 {
1414 assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
1417
1418 let mut cursor = self.tx.cursor_write::<T>()?;
1419
1420 for (partial_key, indices) in index_updates {
1421 let last_key = sharded_key_factory(partial_key, u64::MAX);
1422 let mut last_shard = cursor
1423 .seek_exact(last_key.clone())?
1424 .map(|(_, list)| list)
1425 .unwrap_or_else(BlockNumberList::empty);
1426
1427 last_shard.append(indices).map_err(ProviderError::other)?;
1428
1429 if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
1431 cursor.upsert(last_key, &last_shard)?;
1432 continue;
1433 }
1434
1435 let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
1437 let mut chunks_peekable = chunks.into_iter().peekable();
1438
1439 while let Some(chunk) = chunks_peekable.next() {
1440 let shard = BlockNumberList::new_pre_sorted(chunk);
1441 let highest_block_number = if chunks_peekable.peek().is_some() {
1442 shard.iter().next_back().expect("`chunks` does not return empty list")
1443 } else {
1444 u64::MAX
1446 };
1447
1448 cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
1449 }
1450 }
1451
1452 Ok(())
1453 }
1454}
1455
1456impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1457 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1458 if self.cached_storage_settings().use_hashed_state() {
1459 let hashed_address = keccak256(address);
1460 Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1461 } else {
1462 Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1463 }
1464 }
1465}
1466
1467impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1468 fn changed_accounts_with_range(
1469 &self,
1470 range: RangeInclusive<BlockNumber>,
1471 ) -> ProviderResult<BTreeSet<Address>> {
1472 let mut reader = EitherReader::new_account_changesets(self)?;
1473
1474 reader.changed_accounts_with_range(range)
1475 }
1476
1477 fn basic_accounts(
1478 &self,
1479 iter: impl IntoIterator<Item = Address>,
1480 ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1481 if self.cached_storage_settings().use_hashed_state() {
1482 let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
1483 Ok(iter
1484 .into_iter()
1485 .map(|address| {
1486 let hashed_address = keccak256(address);
1487 hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
1488 })
1489 .collect::<Result<Vec<_>, _>>()?)
1490 } else {
1491 let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1492 Ok(iter
1493 .into_iter()
1494 .map(|address| {
1495 plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
1496 })
1497 .collect::<Result<Vec<_>, _>>()?)
1498 }
1499 }
1500
1501 fn changed_accounts_and_blocks_with_range(
1502 &self,
1503 range: RangeInclusive<BlockNumber>,
1504 ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1505 let highest_static_block = self
1506 .static_file_provider
1507 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1508
1509 if let Some(highest) = highest_static_block &&
1510 self.cached_storage_settings().storage_v2
1511 {
1512 let start = *range.start();
1513 let static_end = (*range.end()).min(highest);
1514
1515 let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1516 if start <= static_end {
1517 for block in start..=static_end {
1518 let block_changesets = self.account_block_changeset(block)?;
1519 for changeset in block_changesets {
1520 changed_accounts_and_blocks
1521 .entry(changeset.address)
1522 .or_default()
1523 .push(block);
1524 }
1525 }
1526 }
1527
1528 Ok(changed_accounts_and_blocks)
1529 } else {
1530 let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1531
1532 let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1533 BTreeMap::new(),
1534 |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1535 let (index, account) = entry?;
1536 accounts.entry(account.address).or_default().push(index);
1537 Ok(accounts)
1538 },
1539 )?;
1540
1541 Ok(account_transitions)
1542 }
1543 }
1544}
1545
1546impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1547 fn storage_changeset(
1548 &self,
1549 block_number: BlockNumber,
1550 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1551 if self.cached_storage_settings().storage_v2 {
1552 self.static_file_provider.storage_changeset(block_number)
1553 } else {
1554 let range = block_number..=block_number;
1555 let storage_range = BlockNumberAddress::range(range);
1556 self.tx
1557 .cursor_dup_read::<tables::StorageChangeSets>()?
1558 .walk_range(storage_range)?
1559 .map(|r| {
1560 let (bna, entry) = r?;
1561 Ok((bna, entry))
1562 })
1563 .collect()
1564 }
1565 }
1566
1567 fn get_storage_before_block(
1568 &self,
1569 block_number: BlockNumber,
1570 address: Address,
1571 storage_key: B256,
1572 ) -> ProviderResult<Option<StorageEntry>> {
1573 if self.cached_storage_settings().storage_v2 {
1574 self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1575 } else {
1576 Ok(self
1577 .tx
1578 .cursor_dup_read::<tables::StorageChangeSets>()?
1579 .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1580 .filter(|entry| entry.key == storage_key))
1581 }
1582 }
1583
1584 fn storage_changesets_range(
1585 &self,
1586 range: impl RangeBounds<BlockNumber>,
1587 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1588 if self.cached_storage_settings().storage_v2 {
1589 self.static_file_provider.storage_changesets_range(range)
1590 } else {
1591 self.tx
1592 .cursor_dup_read::<tables::StorageChangeSets>()?
1593 .walk_range(BlockNumberAddressRange::from(range))?
1594 .map(|r| {
1595 let (bna, entry) = r?;
1596 Ok((bna, entry))
1597 })
1598 .collect()
1599 }
1600 }
1601}
1602
1603impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1604 fn account_block_changeset(
1605 &self,
1606 block_number: BlockNumber,
1607 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1608 if self.cached_storage_settings().storage_v2 {
1609 let static_changesets =
1610 self.static_file_provider.account_block_changeset(block_number)?;
1611 Ok(static_changesets)
1612 } else {
1613 let range = block_number..=block_number;
1614 self.tx
1615 .cursor_read::<tables::AccountChangeSets>()?
1616 .walk_range(range)?
1617 .map(|result| -> ProviderResult<_> {
1618 let (_, account_before) = result?;
1619 Ok(account_before)
1620 })
1621 .collect()
1622 }
1623 }
1624
1625 fn get_account_before_block(
1626 &self,
1627 block_number: BlockNumber,
1628 address: Address,
1629 ) -> ProviderResult<Option<AccountBeforeTx>> {
1630 if self.cached_storage_settings().storage_v2 {
1631 Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1632 } else {
1633 self.tx
1634 .cursor_dup_read::<tables::AccountChangeSets>()?
1635 .seek_by_key_subkey(block_number, address)?
1636 .filter(|acc| acc.address == address)
1637 .map(Ok)
1638 .transpose()
1639 }
1640 }
1641
1642 fn account_changesets_range(
1643 &self,
1644 range: impl core::ops::RangeBounds<BlockNumber>,
1645 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1646 if self.cached_storage_settings().storage_v2 {
1647 self.static_file_provider.account_changesets_range(range)
1648 } else {
1649 self.tx
1650 .cursor_read::<tables::AccountChangeSets>()?
1651 .walk_range(to_range(range))?
1652 .map(|r| r.map_err(Into::into))
1653 .collect()
1654 }
1655 }
1656}
1657
1658impl<Tx: DbTx + 'static, N: NodeTypesForProvider> StateReader for DatabaseProvider<Tx, N> {
1659 type Receipt = ReceiptTy<N>;
1660
1661 fn get_state(
1662 &self,
1663 block: BlockNumber,
1664 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1665 let Some(block_body) = self.block_body_indices(block)? else { return Ok(None) };
1666
1667 let from_transaction_num = block_body.first_tx_num();
1668 let to_transaction_num = block_body.last_tx_num();
1669
1670 let account_changeset = self.account_changesets_range(block..=block)?;
1671 let storage_changeset = self.storage_changeset(block)?;
1672
1673 let Some(block_hash) = self.block_hash(block)? else { return Ok(None) };
1674 let state_provider = self.history_by_block_hash(block_hash)?;
1675 let (state, reverts) = self.populate_bundle_state_with_provider(
1676 account_changeset,
1677 storage_changeset,
1678 state_provider,
1679 )?;
1680
1681 let receipts = self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?;
1682
1683 Ok(Some(ExecutionOutcome::new_init(
1684 state,
1685 reverts,
1686 Vec::new(),
1688 vec![receipts],
1689 block,
1690 Vec::new(),
1691 )))
1692 }
1693}
1694
1695impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
1696 for DatabaseProvider<TX, N>
1697{
1698 type Header = HeaderTy<N>;
1699
1700 fn local_tip_header(
1701 &self,
1702 highest_uninterrupted_block: BlockNumber,
1703 ) -> ProviderResult<SealedHeader<Self::Header>> {
1704 let static_file_provider = self.static_file_provider();
1705
1706 let next_static_file_block_num = static_file_provider
1709 .get_highest_static_file_block(StaticFileSegment::Headers)
1710 .map(|id| id + 1)
1711 .unwrap_or_default();
1712 let next_block = highest_uninterrupted_block + 1;
1713
1714 match next_static_file_block_num.cmp(&next_block) {
1715 Ordering::Greater => {
1718 let mut static_file_producer =
1719 static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1720 static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1721 static_file_producer.commit()?
1724 }
1725 Ordering::Less => {
1726 return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1728 }
1729 Ordering::Equal => {}
1730 }
1731
1732 let local_head = static_file_provider
1733 .sealed_header(highest_uninterrupted_block)?
1734 .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1735
1736 Ok(local_head)
1737 }
1738}
1739
impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
    type Header = HeaderTy<N>;

    /// Returns the header with the given hash.
    ///
    /// The hash is first resolved to a block number; `Ok(None)` is returned when no number is
    /// known for it.
    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
        if let Some(num) = self.block_number(block_hash)? {
            Ok(self.header_by_number(num)?)
        } else {
            Ok(None)
        }
    }

    /// Returns the header of block `num`, as reported by the static file provider.
    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
        self.static_file_provider.header_by_number(num)
    }

    /// Returns the headers of the blocks in `range`, as reported by the static file provider.
    fn headers_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Self::Header>> {
        self.static_file_provider.headers_range(range)
    }

    /// Returns the sealed header of block `number`, as reported by the static file provider.
    fn sealed_header(
        &self,
        number: BlockNumber,
    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_header(number)
    }

    /// Forwards `range` and `predicate` to the static file provider's `sealed_headers_while`
    /// and returns its result unchanged.
    fn sealed_headers_while(
        &self,
        range: impl RangeBounds<BlockNumber>,
        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_headers_while(range, predicate)
    }
}
1777
impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
    /// Returns the hash of block `number`, as reported by the static file provider.
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.block_hash(number)
    }

    /// Forwards `start` and `end` to the static file provider's `canonical_hashes_range` and
    /// returns its result unchanged.
    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.canonical_hashes_range(start, end)
    }
}
1791
1792impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1793 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1794 let best_number = self.best_block_number()?;
1795 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1796 Ok(ChainInfo { best_hash, best_number })
1797 }
1798
1799 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1800 Ok(self
1803 .get_stage_checkpoint(StageId::Finish)?
1804 .map(|checkpoint| checkpoint.block_number)
1805 .unwrap_or_default())
1806 }
1807
1808 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1809 self.static_file_provider.last_block_number()
1810 }
1811
1812 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1813 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1814 }
1815}
1816
1817impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1818 type Block = BlockTy<N>;
1819
1820 fn find_block_by_hash(
1821 &self,
1822 hash: B256,
1823 source: BlockSource,
1824 ) -> ProviderResult<Option<Self::Block>> {
1825 if source.is_canonical() {
1826 self.block(hash.into())
1827 } else {
1828 Ok(None)
1829 }
1830 }
1831
1832 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1840 if let Some(number) = self.convert_hash_or_number(id)? {
1841 let earliest_available = self.static_file_provider.earliest_history_height();
1842 if number < earliest_available {
1843 return Err(ProviderError::BlockExpired { requested: number, earliest_available })
1844 }
1845
1846 let Some(header) = self.header_by_number(number)? else { return Ok(None) };
1847
1848 let Some(transactions) = self.transactions_by_block(number.into())? else {
1853 return Ok(None)
1854 };
1855
1856 let body = self
1857 .storage
1858 .reader()
1859 .read_block_bodies(self, vec![(&header, transactions)])?
1860 .pop()
1861 .ok_or(ProviderError::InvalidStorageOutput)?;
1862
1863 return Ok(Some(Self::Block::new(header, body)))
1864 }
1865
1866 Ok(None)
1867 }
1868
1869 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1870 Ok(None)
1871 }
1872
1873 fn pending_block_and_receipts(
1874 &self,
1875 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1876 Ok(None)
1877 }
1878
1879 fn recovered_block(
1888 &self,
1889 id: BlockHashOrNumber,
1890 transaction_kind: TransactionVariant,
1891 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1892 self.recovered_block(
1893 id,
1894 transaction_kind,
1895 |block_number| self.header_by_number(block_number),
1896 |header, body, senders| {
1897 Self::Block::new(header, body)
1898 .try_into_recovered_unchecked(senders)
1902 .map(Some)
1903 .map_err(|_| ProviderError::SenderRecoveryError)
1904 },
1905 )
1906 }
1907
1908 fn sealed_block_with_senders(
1909 &self,
1910 id: BlockHashOrNumber,
1911 transaction_kind: TransactionVariant,
1912 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1913 self.recovered_block(
1914 id,
1915 transaction_kind,
1916 |block_number| self.sealed_header(block_number),
1917 |header, body, senders| {
1918 Self::Block::new_sealed(header, body)
1919 .try_with_senders_unchecked(senders)
1923 .map(Some)
1924 .map_err(|_| ProviderError::SenderRecoveryError)
1925 },
1926 )
1927 }
1928
1929 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1930 self.block_range(
1931 range,
1932 |range| self.headers_range(range),
1933 |header, body, _| Ok(Self::Block::new(header, body)),
1934 )
1935 }
1936
1937 fn block_with_senders_range(
1938 &self,
1939 range: RangeInclusive<BlockNumber>,
1940 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1941 self.block_with_senders_range(
1942 range,
1943 |range| self.headers_range(range),
1944 |header, body, senders| {
1945 Self::Block::new(header, body)
1946 .try_into_recovered_unchecked(senders)
1947 .map_err(|_| ProviderError::SenderRecoveryError)
1948 },
1949 )
1950 }
1951
1952 fn recovered_block_range(
1953 &self,
1954 range: RangeInclusive<BlockNumber>,
1955 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1956 self.block_with_senders_range(
1957 range,
1958 |range| self.sealed_headers_range(range),
1959 |header, body, senders| {
1960 Self::Block::new_sealed(header, body)
1961 .try_with_senders(senders)
1962 .map_err(|_| ProviderError::SenderRecoveryError)
1963 },
1964 )
1965 }
1966
1967 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1968 Ok(self
1969 .tx
1970 .cursor_read::<tables::TransactionBlocks>()?
1971 .seek(id)
1972 .map(|b| b.map(|(_, bn)| bn))?)
1973 }
1974}
1975
impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
    for DatabaseProvider<TX, N>
{
    /// Forwards `tx_range` to the static file provider's `transaction_hashes_by_range` and
    /// returns the resulting `(hash, transaction number)` pairs unchanged.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        self.static_file_provider.transaction_hashes_by_range(tx_range)
    }
}
1988
1989impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1991 type Transaction = TxTy<N>;
1992
1993 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1994 self.with_rocksdb_snapshot(|rocksdb_ref| {
1995 let mut reader = EitherReader::new_transaction_hash_numbers(self, rocksdb_ref)?;
1996 reader.get_transaction_hash_number(tx_hash)
1997 })
1998 }
1999
2000 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
2001 self.static_file_provider.transaction_by_id(id)
2002 }
2003
2004 fn transaction_by_id_unhashed(
2005 &self,
2006 id: TxNumber,
2007 ) -> ProviderResult<Option<Self::Transaction>> {
2008 self.static_file_provider.transaction_by_id_unhashed(id)
2009 }
2010
2011 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2012 if let Some(id) = self.transaction_id(hash)? {
2013 Ok(self.transaction_by_id_unhashed(id)?)
2014 } else {
2015 Ok(None)
2016 }
2017 }
2018
2019 fn transaction_by_hash_with_meta(
2020 &self,
2021 tx_hash: TxHash,
2022 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2023 if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
2024 let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
2025 let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
2026 let Some(sealed_header) = self.sealed_header(block_number)?
2027 {
2028 let (header, block_hash) = sealed_header.split();
2029 if let Some(block_body) = self.block_body_indices(block_number)? {
2030 let index = transaction_id - block_body.first_tx_num();
2035
2036 let meta = TransactionMeta {
2037 tx_hash,
2038 index,
2039 block_hash,
2040 block_number,
2041 base_fee: header.base_fee_per_gas(),
2042 excess_blob_gas: header.excess_blob_gas(),
2043 timestamp: header.timestamp(),
2044 };
2045
2046 return Ok(Some((transaction, meta)))
2047 }
2048 }
2049
2050 Ok(None)
2051 }
2052
2053 fn transactions_by_block(
2054 &self,
2055 id: BlockHashOrNumber,
2056 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2057 if let Some(block_number) = self.convert_hash_or_number(id)? &&
2058 let Some(body) = self.block_body_indices(block_number)?
2059 {
2060 let tx_range = body.tx_num_range();
2061 return if tx_range.is_empty() {
2062 Ok(Some(Vec::new()))
2063 } else {
2064 self.transactions_by_tx_range(tx_range).map(Some)
2065 }
2066 }
2067 Ok(None)
2068 }
2069
2070 fn transactions_by_block_range(
2071 &self,
2072 range: impl RangeBounds<BlockNumber>,
2073 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2074 let range = to_range(range);
2075
2076 self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
2077 .into_iter()
2078 .map(|body| {
2079 let tx_num_range = body.tx_num_range();
2080 if tx_num_range.is_empty() {
2081 Ok(Vec::new())
2082 } else {
2083 self.transactions_by_tx_range(tx_num_range)
2084 }
2085 })
2086 .collect()
2087 }
2088
2089 fn transactions_by_tx_range(
2090 &self,
2091 range: impl RangeBounds<TxNumber>,
2092 ) -> ProviderResult<Vec<Self::Transaction>> {
2093 self.static_file_provider.transactions_by_tx_range(range)
2094 }
2095
2096 fn senders_by_tx_range(
2097 &self,
2098 range: impl RangeBounds<TxNumber>,
2099 ) -> ProviderResult<Vec<Address>> {
2100 if EitherWriterDestination::senders(self).is_static_file() {
2101 self.static_file_provider.senders_by_tx_range(range)
2102 } else {
2103 self.cursor_read_collect::<tables::TransactionSenders>(range)
2104 }
2105 }
2106
2107 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2108 if EitherWriterDestination::senders(self).is_static_file() {
2109 self.static_file_provider.transaction_sender(id)
2110 } else {
2111 Ok(self.tx.get::<tables::TransactionSenders>(id)?)
2112 }
2113 }
2114}
2115
2116impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
2117 type Receipt = ReceiptTy<N>;
2118
2119 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2120 self.static_file_provider.get_with_static_file_or_database(
2121 StaticFileSegment::Receipts,
2122 id,
2123 |static_file| static_file.receipt(id),
2124 || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
2125 )
2126 }
2127
2128 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2129 if let Some(id) = self.transaction_id(hash)? {
2130 self.receipt(id)
2131 } else {
2132 Ok(None)
2133 }
2134 }
2135
2136 fn receipts_by_block(
2137 &self,
2138 block: BlockHashOrNumber,
2139 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2140 if let Some(number) = self.convert_hash_or_number(block)? &&
2141 let Some(body) = self.block_body_indices(number)?
2142 {
2143 let tx_range = body.tx_num_range();
2144 return if tx_range.is_empty() {
2145 Ok(Some(Vec::new()))
2146 } else {
2147 let receipts = self.receipts_by_tx_range(tx_range)?;
2148
2149 if receipts.len() != body.tx_count as usize {
2150 return Ok(None)
2151 }
2152
2153 Ok(Some(receipts))
2154 }
2155 }
2156 Ok(None)
2157 }
2158
2159 fn receipts_by_tx_range(
2160 &self,
2161 range: impl RangeBounds<TxNumber>,
2162 ) -> ProviderResult<Vec<Self::Receipt>> {
2163 self.static_file_provider.get_range_with_static_file_or_database(
2164 StaticFileSegment::Receipts,
2165 to_range(range),
2166 |static_file, range, _| static_file.receipts_by_tx_range(range),
2167 |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
2168 |_| true,
2169 )
2170 }
2171
2172 fn receipts_by_block_range(
2173 &self,
2174 block_range: RangeInclusive<BlockNumber>,
2175 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2176 if block_range.is_empty() {
2177 return Ok(Vec::new());
2178 }
2179
2180 let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
2182 let mut block_body_indices = Vec::with_capacity(range_len);
2183 for block_num in block_range {
2184 if let Some(indices) = self.block_body_indices(block_num)? {
2185 block_body_indices.push(indices);
2186 } else {
2187 block_body_indices.push(StoredBlockBodyIndices::default());
2189 }
2190 }
2191
2192 if block_body_indices.is_empty() {
2193 return Ok(Vec::new());
2194 }
2195
2196 let non_empty_blocks: Vec<_> =
2198 block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
2199
2200 if non_empty_blocks.is_empty() {
2201 return Ok(vec![Vec::new(); block_body_indices.len()]);
2203 }
2204
2205 let first_tx = non_empty_blocks[0].first_tx_num();
2207 let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
2208
2209 let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
2211 let mut receipts_iter = all_receipts.into_iter();
2212
2213 let mut result = Vec::with_capacity(block_body_indices.len());
2215 for indices in &block_body_indices {
2216 if indices.tx_count == 0 {
2217 result.push(Vec::new());
2218 } else {
2219 let block_receipts =
2220 receipts_iter.by_ref().take(indices.tx_count as usize).collect();
2221 result.push(block_receipts);
2222 }
2223 }
2224
2225 Ok(result)
2226 }
2227}
2228
impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
    for DatabaseProvider<TX, N>
{
    /// Reads the body indices of block `num` from the `BlockBodyIndices` table.
    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
    }

    /// Collects the `BlockBodyIndices` table entries for the blocks in `range` via
    /// `cursor_read_collect`.
    fn block_body_indices_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
    }
}
2243
2244impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2245 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2246 Ok(if let Some(encoded) = id.get_pre_encoded() {
2247 self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2248 } else {
2249 self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2250 })
2251 }
2252
2253 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2255 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2256 }
2257
2258 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2259 self.tx
2260 .cursor_read::<tables::StageCheckpoints>()?
2261 .walk(None)?
2262 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2263 .map_err(ProviderError::Database)
2264 }
2265}
2266
2267impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2268 fn save_stage_checkpoint(
2270 &self,
2271 id: StageId,
2272 checkpoint: StageCheckpoint,
2273 ) -> ProviderResult<()> {
2274 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2275 }
2276
2277 fn save_stage_checkpoint_progress(
2279 &self,
2280 id: StageId,
2281 checkpoint: Vec<u8>,
2282 ) -> ProviderResult<()> {
2283 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2284 }
2285
2286 #[instrument(level = "debug", target = "providers::db", skip_all)]
2287 fn update_pipeline_stages(
2288 &self,
2289 block_number: BlockNumber,
2290 drop_stage_checkpoint: bool,
2291 ) -> ProviderResult<()> {
2292 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2294 for stage_id in StageId::ALL {
2295 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2296 cursor.upsert(
2297 stage_id.to_string(),
2298 &StageCheckpoint {
2299 block_number,
2300 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2301 },
2302 )?;
2303 }
2304
2305 Ok(())
2306 }
2307}
2308
2309impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
2310 fn plain_state_storages(
2311 &self,
2312 addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
2313 ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
2314 if self.cached_storage_settings().use_hashed_state() {
2315 let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;
2316
2317 addresses_with_keys
2318 .into_iter()
2319 .map(|(address, storage)| {
2320 let hashed_address = keccak256(address);
2321 storage
2322 .into_iter()
2323 .map(|key| -> ProviderResult<_> {
2324 let hashed_key = keccak256(key);
2325 let value = hashed_storage
2326 .seek_by_key_subkey(hashed_address, hashed_key)?
2327 .filter(|v| v.key == hashed_key)
2328 .map(|v| v.value)
2329 .unwrap_or_default();
2330 Ok(StorageEntry { key, value })
2331 })
2332 .collect::<ProviderResult<Vec<_>>>()
2333 .map(|storage| (address, storage))
2334 })
2335 .collect::<ProviderResult<Vec<(_, _)>>>()
2336 } else {
2337 let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
2338
2339 addresses_with_keys
2340 .into_iter()
2341 .map(|(address, storage)| {
2342 storage
2343 .into_iter()
2344 .map(|key| -> ProviderResult<_> {
2345 Ok(plain_storage
2346 .seek_by_key_subkey(address, key)?
2347 .filter(|v| v.key == key)
2348 .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
2349 })
2350 .collect::<ProviderResult<Vec<_>>>()
2351 .map(|storage| (address, storage))
2352 })
2353 .collect::<ProviderResult<Vec<(_, _)>>>()
2354 }
2355 }
2356
2357 fn changed_storages_with_range(
2358 &self,
2359 range: RangeInclusive<BlockNumber>,
2360 ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2361 if self.cached_storage_settings().storage_v2 {
2362 self.storage_changesets_range(range)?.into_iter().try_fold(
2363 BTreeMap::new(),
2364 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2365 let (BlockNumberAddress((_, address)), storage_entry) = entry;
2366 accounts.entry(address).or_default().insert(storage_entry.key);
2367 Ok(accounts)
2368 },
2369 )
2370 } else {
2371 self.tx
2372 .cursor_read::<tables::StorageChangeSets>()?
2373 .walk_range(BlockNumberAddress::range(range))?
2374 .try_fold(
2377 BTreeMap::new(),
2378 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2379 let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2380 accounts.entry(address).or_default().insert(storage_entry.key);
2381 Ok(accounts)
2382 },
2383 )
2384 }
2385 }
2386
2387 fn changed_storages_and_blocks_with_range(
2388 &self,
2389 range: RangeInclusive<BlockNumber>,
2390 ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2391 if self.cached_storage_settings().storage_v2 {
2392 self.storage_changesets_range(range)?.into_iter().try_fold(
2393 BTreeMap::new(),
2394 |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2395 storages
2396 .entry((index.address(), storage.key))
2397 .or_default()
2398 .push(index.block_number());
2399 Ok(storages)
2400 },
2401 )
2402 } else {
2403 let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2404
2405 let storage_changeset_lists =
2406 changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2407 BTreeMap::new(),
2408 |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2409 entry|
2410 -> ProviderResult<_> {
2411 let (index, storage) = entry?;
2412 storages
2413 .entry((index.address(), storage.key))
2414 .or_default()
2415 .push(index.block_number());
2416 Ok(storages)
2417 },
2418 )?;
2419
2420 Ok(storage_changeset_lists)
2421 }
2422 }
2423}
2424
2425impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2426 for DatabaseProvider<TX, N>
2427{
2428 type Receipt = ReceiptTy<N>;
2429
    /// Writes the state changes, reverts (changesets) and optionally the receipts contained in
    /// `execution_outcome`.
    ///
    /// `config` selects which outputs are written:
    /// - When hashed state is in use and receipts, account changesets and storage changesets are
    ///   all disabled, only the contract bytecodes are written and the function returns early.
    /// - Otherwise reverts are written via `write_state_reverts` and state changes via
    ///   `write_state_changes`.
    /// - Receipts are written only when `config.write_receipts` is set, and may be skipped per
    ///   block or per receipt according to the provider's prune modes.
    ///
    /// # Errors
    ///
    /// Returns [`ProviderError::BlockBodyIndicesNotFound`] when fewer block body indices are
    /// stored for the outcome's block range than the outcome has blocks, and propagates any
    /// error from the underlying writers.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_state<'a>(
        &self,
        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
        is_value_known: OriginalValuesKnown,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        let execution_outcome = execution_outcome.into();

        // Fast path: nothing but bytecodes needs to be written by this call.
        // NOTE(review): presumably plain state and changesets are persisted elsewhere in this
        // configuration — confirm against the callers.
        if self.cached_storage_settings().use_hashed_state() &&
            !config.write_receipts &&
            !config.write_account_changesets &&
            !config.write_storage_changesets
        {
            self.write_bytecodes(
                execution_outcome.state().contracts.iter().map(|(h, b)| (*h, Bytecode(b.clone()))),
            )?;
            return Ok(());
        }

        let first_block = execution_outcome.first_block();
        let (plain_state, reverts) =
            execution_outcome.state().to_plain_state_and_reverts(is_value_known);

        // Reverts are written before the new state is applied.
        self.write_state_reverts(reverts, first_block, config)?;
        self.write_state_changes(plain_state)?;

        if !config.write_receipts {
            return Ok(());
        }

        let block_count = execution_outcome.len() as u64;
        let last_block = execution_outcome.last_block();
        let block_range = first_block..=last_block;

        // The tip used for pruning decisions is the higher of the last stored block and the last
        // block of this outcome.
        let tip = self.last_block_number()?.max(last_block);

        // First transaction number of every block in the range.
        let block_indices: Vec<_> = self
            .block_body_indices_range(block_range)?
            .into_iter()
            .map(|b| b.first_tx_num)
            .collect();

        // Every block of the outcome needs stored body indices; otherwise report the first block
        // number for which they are missing.
        if block_indices.len() < block_count as usize {
            let missing_blocks = block_count - block_indices.len() as u64;
            return Err(ProviderError::BlockBodyIndicesNotFound(
                last_block.saturating_sub(missing_blocks - 1),
            ));
        }

        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;

        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;

        // Receipts may only be skipped when they go to the database, or when no receipt has been
        // written to static files yet, and additionally `first_block` is prunable with respect
        // to the minimum pruning distance from the tip.
        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
            self.static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none()) &&
            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);

        // Seed the set of addresses whose logs must be kept with every filter entry that is keyed
        // by a block below `first_block`.
        let mut allowed_addresses: AddressSet = AddressSet::default();
        for (_, addresses) in contract_log_pruner.range(..first_block) {
            allowed_addresses.extend(addresses.iter().copied());
        }

        for (idx, (receipts, first_tx_index)) in
            execution_outcome.receipts().zip(block_indices).enumerate()
        {
            let block_number = first_block + idx as u64;

            // The writer is advanced for every block, including blocks whose receipts are
            // skipped below.
            receipts_writer.increment_block(block_number)?;

            // Skip the whole block when the receipts prune mode says it should be pruned.
            if prunable_receipts &&
                self.prune_modes
                    .receipts
                    .is_some_and(|mode| mode.should_prune(block_number, tip))
            {
                continue
            }

            // Addresses whose filter entry is keyed by this block are retained from here on.
            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
                allowed_addresses.extend(new_addresses.iter().copied());
            }

            for (idx, receipt) in receipts.iter().enumerate() {
                let receipt_idx = first_tx_index + idx as u64;
                // With an active log filter, a receipt is skipped when none of its logs was
                // emitted by a retained address.
                if prunable_receipts &&
                    has_contract_log_filter &&
                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
                {
                    continue
                }

                receipts_writer.append_receipt(receipt_idx, receipt)?;
            }
        }

        Ok(())
    }
2548
    /// Writes the storage and account changesets described by `reverts`.
    ///
    /// `reverts.storage` and `reverts.accounts` are indexed by block: the entry at position `i`
    /// belongs to block `first_block + i`. Storage changesets are written only when
    /// `config.write_storage_changesets` is set, account changesets only when
    /// `config.write_account_changesets` is set.
    fn write_state_reverts(
        &self,
        reverts: PlainStateReverts,
        first_block: BlockNumber,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        if config.write_storage_changesets {
            tracing::trace!("Writing storage changes");
            // Cursor over the current plain storage, used to read the pre-existing slots of
            // accounts whose storage is wiped.
            let mut storages_cursor =
                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
            for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
                let block_number = first_block + block_index as BlockNumber;

                tracing::trace!(block_number, "Writing block change");
                // Order the block's changes by address.
                storage_changes.par_sort_unstable_by_key(|a| a.address);
                let total_changes =
                    storage_changes.iter().map(|change| change.storage_revert.len()).sum();
                let mut changeset = Vec::with_capacity(total_changes);
                for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
                    // Convert the slot keys to big-endian `B256` and order them by key.
                    let mut storage = storage_revert
                        .into_iter()
                        .map(|(k, v)| (B256::from(k.to_be_bytes()), v))
                        .collect::<Vec<_>>();
                    storage.par_sort_unstable_by_key(|a| a.0);

                    // For a wiped account, collect every slot currently stored for it so the
                    // pre-wipe values end up in the changeset.
                    let mut wiped_storage = Vec::new();
                    if wiped {
                        tracing::trace!(?address, "Wiping storage");
                        if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
                            wiped_storage.push((entry.key, entry.value));
                            while let Some(entry) = storages_cursor.next_dup_val()? {
                                wiped_storage.push((entry.key, entry.value))
                            }
                        }
                    }

                    tracing::trace!(?address, ?storage, "Writing storage reverts");
                    // NOTE(review): `StorageRevertsIter` presumably merges the explicit reverts
                    // with the wiped slots by key — confirm in `changesets_utils`.
                    for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
                        changeset.push(StorageBeforeTx { address, key, value });
                    }
                }

                // A fresh writer is created for every block.
                let mut storage_changesets_writer =
                    EitherWriter::new_storage_changesets(self, block_number)?;
                storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
            }
        }

        if !config.write_account_changesets {
            return Ok(());
        }

        tracing::trace!(?first_block, "Writing account changes");
        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;
            // Each revert records the account info as it was before the block (`None` when the
            // revert carries no info).
            let changeset = account_block_reverts
                .into_iter()
                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
                .collect::<Vec<_>>();
            let mut account_changesets_writer =
                EitherWriter::new_account_changesets(self, block_number)?;

            account_changesets_writer.append_account_changeset(block_number, changeset)?;
        }

        Ok(())
    }
2627
2628 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
2629 changes.accounts.par_sort_by_key(|a| a.0);
2632 changes.storage.par_sort_by_key(|a| a.address);
2633 changes.contracts.par_sort_by_key(|a| a.0);
2634
2635 if !self.cached_storage_settings().use_hashed_state() {
2636 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
2638 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
2639 for (address, account) in changes.accounts {
2641 if let Some(account) = account {
2642 tracing::trace!(?address, "Updating plain state account");
2643 accounts_cursor.upsert(address, &account.into())?;
2644 } else if accounts_cursor.seek_exact(address)?.is_some() {
2645 tracing::trace!(?address, "Deleting plain state account");
2646 accounts_cursor.delete_current()?;
2647 }
2648 }
2649
2650 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
2652 let mut storages_cursor =
2653 self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2654 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2655 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2657 storages_cursor.delete_current_duplicates()?;
2658 }
2659 let mut storage = storage
2661 .into_iter()
2662 .map(|(k, value)| StorageEntry { key: k.into(), value })
2663 .collect::<Vec<_>>();
2664 storage.par_sort_unstable_by_key(|a| a.key);
2666
2667 for entry in storage {
2668 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2669 if let Some(db_entry) =
2670 storages_cursor.seek_by_key_subkey(address, entry.key)? &&
2671 db_entry.key == entry.key
2672 {
2673 storages_cursor.delete_current()?;
2674 }
2675
2676 if !entry.value.is_zero() {
2677 storages_cursor.upsert(address, &entry)?;
2678 }
2679 }
2680 }
2681 }
2682
2683 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
2685 self.write_bytecodes(
2686 changes.contracts.into_iter().map(|(hash, bytecode)| (hash, Bytecode(bytecode))),
2687 )?;
2688
2689 Ok(())
2690 }
2691
2692 #[instrument(level = "debug", target = "providers::db", skip_all)]
2693 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2694 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2696 for (hashed_address, account) in hashed_state.accounts() {
2697 if let Some(account) = account {
2698 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2699 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2700 hashed_accounts_cursor.delete_current()?;
2701 }
2702 }
2703
2704 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2706 let mut hashed_storage_cursor =
2707 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2708 for (hashed_address, storage) in sorted_storages {
2709 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2710 hashed_storage_cursor.delete_current_duplicates()?;
2711 }
2712
2713 for (hashed_slot, value) in storage.storage_slots_ref() {
2714 let entry = StorageEntry { key: *hashed_slot, value: *value };
2715
2716 if let Some(db_entry) =
2717 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
2718 db_entry.key == entry.key
2719 {
2720 hashed_storage_cursor.delete_current()?;
2721 }
2722
2723 if !entry.value.is_zero() {
2724 hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2725 }
2726 }
2727 }
2728
2729 Ok(())
2730 }
2731
2732 fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
2754 let range = block + 1..=self.last_block_number()?;
2755
2756 if range.is_empty() {
2757 return Ok(());
2758 }
2759
2760 let block_bodies = self.block_body_indices_range(range.clone())?;
2762
2763 let from_transaction_num =
2765 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2766
2767 let storage_range = BlockNumberAddress::range(range.clone());
2768 let storage_changeset = if self.cached_storage_settings().storage_v2 {
2769 let changesets = self.storage_changesets_range(range.clone())?;
2770 let mut changeset_writer =
2771 self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2772 changeset_writer.prune_storage_changesets(block)?;
2773 changesets
2774 } else {
2775 self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2776 };
2777 let account_changeset = if self.cached_storage_settings().storage_v2 {
2778 let changesets = self.account_changesets_range(range)?;
2779 let mut changeset_writer =
2780 self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2781 changeset_writer.prune_account_changesets(block)?;
2782 changesets
2783 } else {
2784 self.take::<tables::AccountChangeSets>(range)?
2785 };
2786
2787 if self.cached_storage_settings().use_hashed_state() {
2788 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2789 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2790
2791 let (state, _) = self.populate_bundle_state_hashed(
2792 account_changeset,
2793 storage_changeset,
2794 &mut hashed_accounts_cursor,
2795 &mut hashed_storage_cursor,
2796 )?;
2797
2798 for (address, (old_account, new_account, storage)) in &state {
2799 if old_account != new_account {
2800 let hashed_address = keccak256(address);
2801 let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2802 if let Some(account) = old_account {
2803 hashed_accounts_cursor.upsert(hashed_address, account)?;
2804 } else if existing_entry.is_some() {
2805 hashed_accounts_cursor.delete_current()?;
2806 }
2807 }
2808
2809 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2810 let hashed_address = keccak256(address);
2811 let hashed_storage_key = keccak256(storage_key);
2812 let storage_entry =
2813 StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2814 if hashed_storage_cursor
2815 .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2816 .is_some_and(|s| s.key == hashed_storage_key)
2817 {
2818 hashed_storage_cursor.delete_current()?
2819 }
2820
2821 if !old_storage_value.is_zero() {
2822 hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2823 }
2824 }
2825 }
2826 } else {
2827 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2832 let mut plain_storage_cursor =
2833 self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2834
2835 let (state, _) = self.populate_bundle_state_plain(
2836 account_changeset,
2837 storage_changeset,
2838 &mut plain_accounts_cursor,
2839 &mut plain_storage_cursor,
2840 )?;
2841
2842 for (address, (old_account, new_account, storage)) in &state {
2843 if old_account != new_account {
2844 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2845 if let Some(account) = old_account {
2846 plain_accounts_cursor.upsert(*address, account)?;
2847 } else if existing_entry.is_some() {
2848 plain_accounts_cursor.delete_current()?;
2849 }
2850 }
2851
2852 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2853 let storage_entry =
2854 StorageEntry { key: *storage_key, value: *old_storage_value };
2855 if plain_storage_cursor
2856 .seek_by_key_subkey(*address, *storage_key)?
2857 .is_some_and(|s| s.key == *storage_key)
2858 {
2859 plain_storage_cursor.delete_current()?
2860 }
2861
2862 if !old_storage_value.is_zero() {
2863 plain_storage_cursor.upsert(*address, &storage_entry)?;
2864 }
2865 }
2866 }
2867 }
2868
2869 self.remove_receipts_from(from_transaction_num, block)?;
2870
2871 Ok(())
2872 }
2873
2874 fn take_state_above(
2896 &self,
2897 block: BlockNumber,
2898 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2899 let range = block + 1..=self.last_block_number()?;
2900
2901 if range.is_empty() {
2902 return Ok(ExecutionOutcome::default())
2903 }
2904 let start_block_number = *range.start();
2905
2906 let block_bodies = self.block_body_indices_range(range.clone())?;
2908
2909 let from_transaction_num =
2911 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2912 let to_transaction_num =
2913 block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2914
2915 let storage_range = BlockNumberAddress::range(range.clone());
2916 let storage_changeset = if let Some(highest_block) = self
2917 .static_file_provider
2918 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2919 self.cached_storage_settings().storage_v2
2920 {
2921 let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
2922 let mut changeset_writer =
2923 self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2924 changeset_writer.prune_storage_changesets(block)?;
2925 changesets
2926 } else {
2927 self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2928 };
2929
2930 let highest_changeset_block = self
2932 .static_file_provider
2933 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2934 let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2935 self.cached_storage_settings().storage_v2
2936 {
2937 let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2939 let mut changeset_writer =
2940 self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2941 changeset_writer.prune_account_changesets(block)?;
2942
2943 changesets
2944 } else {
2945 self.take::<tables::AccountChangeSets>(range)?
2948 };
2949
2950 let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
2951 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2952 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2953
2954 let (state, reverts) = self.populate_bundle_state_hashed(
2955 account_changeset,
2956 storage_changeset,
2957 &mut hashed_accounts_cursor,
2958 &mut hashed_storage_cursor,
2959 )?;
2960
2961 for (address, (old_account, new_account, storage)) in &state {
2962 if old_account != new_account {
2963 let hashed_address = keccak256(address);
2964 let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2965 if let Some(account) = old_account {
2966 hashed_accounts_cursor.upsert(hashed_address, account)?;
2967 } else if existing_entry.is_some() {
2968 hashed_accounts_cursor.delete_current()?;
2969 }
2970 }
2971
2972 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2973 let hashed_address = keccak256(address);
2974 let hashed_storage_key = keccak256(storage_key);
2975 let storage_entry =
2976 StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2977 if hashed_storage_cursor
2978 .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2979 .is_some_and(|s| s.key == hashed_storage_key)
2980 {
2981 hashed_storage_cursor.delete_current()?
2982 }
2983
2984 if !old_storage_value.is_zero() {
2985 hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2986 }
2987 }
2988 }
2989
2990 (state, reverts)
2991 } else {
2992 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2997 let mut plain_storage_cursor =
2998 self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2999
3000 let (state, reverts) = self.populate_bundle_state_plain(
3001 account_changeset,
3002 storage_changeset,
3003 &mut plain_accounts_cursor,
3004 &mut plain_storage_cursor,
3005 )?;
3006
3007 for (address, (old_account, new_account, storage)) in &state {
3008 if old_account != new_account {
3009 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
3010 if let Some(account) = old_account {
3011 plain_accounts_cursor.upsert(*address, account)?;
3012 } else if existing_entry.is_some() {
3013 plain_accounts_cursor.delete_current()?;
3014 }
3015 }
3016
3017 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
3018 let storage_entry =
3019 StorageEntry { key: *storage_key, value: *old_storage_value };
3020 if plain_storage_cursor
3021 .seek_by_key_subkey(*address, *storage_key)?
3022 .is_some_and(|s| s.key == *storage_key)
3023 {
3024 plain_storage_cursor.delete_current()?
3025 }
3026
3027 if !old_storage_value.is_zero() {
3028 plain_storage_cursor.upsert(*address, &storage_entry)?;
3029 }
3030 }
3031 }
3032
3033 (state, reverts)
3034 };
3035
3036 let mut receipts_iter = self
3038 .static_file_provider
3039 .get_range_with_static_file_or_database(
3040 StaticFileSegment::Receipts,
3041 from_transaction_num..to_transaction_num + 1,
3042 |static_file, range, _| {
3043 static_file
3044 .receipts_by_tx_range(range.clone())
3045 .map(|r| range.into_iter().zip(r).collect())
3046 },
3047 |range, _| {
3048 self.tx
3049 .cursor_read::<tables::Receipts<Self::Receipt>>()?
3050 .walk_range(range)?
3051 .map(|r| r.map_err(Into::into))
3052 .collect()
3053 },
3054 |_| true,
3055 )?
3056 .into_iter()
3057 .peekable();
3058
3059 let mut receipts = Vec::with_capacity(block_bodies.len());
3060 for block_body in block_bodies {
3062 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
3063 for num in block_body.tx_num_range() {
3064 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
3065 block_receipts.push(receipts_iter.next().unwrap().1);
3066 }
3067 }
3068 receipts.push(block_receipts);
3069 }
3070
3071 self.remove_receipts_from(from_transaction_num, block)?;
3072
3073 Ok(ExecutionOutcome::new_init(
3074 state,
3075 reverts,
3076 Vec::new(),
3077 receipts,
3078 start_block_number,
3079 Vec::new(),
3080 ))
3081 }
3082}
3083
3084impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
3085 fn write_account_trie_updates<A: TrieTableAdapter>(
3086 tx: &TX,
3087 trie_updates: &TrieUpdatesSorted,
3088 num_entries: &mut usize,
3089 ) -> ProviderResult<()>
3090 where
3091 TX: DbTxMut,
3092 {
3093 let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
3094 for (key, updated_node) in trie_updates.account_nodes_ref() {
3096 let nibbles = A::AccountKey::from(*key);
3097 match updated_node {
3098 Some(node) => {
3099 if !key.is_empty() {
3100 *num_entries += 1;
3101 account_trie_cursor.upsert(nibbles, node)?;
3102 }
3103 }
3104 None => {
3105 *num_entries += 1;
3106 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
3107 account_trie_cursor.delete_current()?;
3108 }
3109 }
3110 }
3111 }
3112 Ok(())
3113 }
3114
3115 fn write_storage_tries<A: TrieTableAdapter>(
3116 tx: &TX,
3117 storage_tries: Vec<(&B256, &StorageTrieUpdatesSorted)>,
3118 num_entries: &mut usize,
3119 ) -> ProviderResult<()>
3120 where
3121 TX: DbTxMut,
3122 {
3123 let mut cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
3124 for (hashed_address, storage_trie_updates) in storage_tries {
3125 let mut db_storage_trie_cursor: DatabaseStorageTrieCursor<_, A> =
3126 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
3127 *num_entries +=
3128 db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
3129 cursor = db_storage_trie_cursor.cursor;
3130 }
3131 Ok(())
3132 }
3133}
3134
3135impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
3136 #[instrument(level = "debug", target = "providers::db", skip_all)]
3140 fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
3141 if trie_updates.is_empty() {
3142 return Ok(0)
3143 }
3144
3145 let mut num_entries = 0;
3147
3148 reth_trie_db::with_adapter!(self, |A| {
3149 Self::write_account_trie_updates::<A>(self.tx_ref(), trie_updates, &mut num_entries)?;
3150 });
3151
3152 num_entries +=
3153 self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
3154
3155 Ok(num_entries)
3156 }
3157}
3158
3159impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3160 fn write_storage_trie_updates_sorted<'a>(
3166 &self,
3167 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3168 ) -> ProviderResult<usize> {
3169 let mut num_entries = 0;
3170 let mut storage_tries = storage_tries.collect::<Vec<_>>();
3171 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3172 reth_trie_db::with_adapter!(self, |A| {
3173 Self::write_storage_tries::<A>(self.tx_ref(), storage_tries, &mut num_entries)?;
3174 });
3175 Ok(num_entries)
3176 }
3177}
3178
3179impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
3180 fn unwind_account_hashing<'a>(
3181 &self,
3182 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3183 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3184 let hashed_accounts = changesets
3188 .into_iter()
3189 .map(|(_, e)| (keccak256(e.address), e.info))
3190 .collect::<Vec<_>>()
3191 .into_iter()
3192 .rev()
3193 .collect::<BTreeMap<_, _>>();
3194
3195 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3197 for (hashed_address, account) in &hashed_accounts {
3198 if let Some(account) = account {
3199 hashed_accounts_cursor.upsert(*hashed_address, account)?;
3200 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3201 hashed_accounts_cursor.delete_current()?;
3202 }
3203 }
3204
3205 Ok(hashed_accounts)
3206 }
3207
3208 fn unwind_account_hashing_range(
3209 &self,
3210 range: impl RangeBounds<BlockNumber>,
3211 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3212 let changesets = self.account_changesets_range(range)?;
3213 self.unwind_account_hashing(changesets.iter())
3214 }
3215
3216 fn insert_account_for_hashing(
3217 &self,
3218 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
3219 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3220 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3221 let hashed_accounts =
3222 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
3223 for (hashed_address, account) in &hashed_accounts {
3224 if let Some(account) = account {
3225 hashed_accounts_cursor.upsert(*hashed_address, account)?;
3226 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3227 hashed_accounts_cursor.delete_current()?;
3228 }
3229 }
3230 Ok(hashed_accounts)
3231 }
3232
3233 fn unwind_storage_hashing(
3234 &self,
3235 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3236 ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3237 let mut hashed_storages = changesets
3239 .into_iter()
3240 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
3241 let hashed_key = keccak256(storage_entry.key);
3242 (keccak256(address), hashed_key, storage_entry.value)
3243 })
3244 .collect::<Vec<_>>();
3245 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
3246
3247 let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
3249 B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
3250 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3251 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
3252 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
3253
3254 if hashed_storage
3255 .seek_by_key_subkey(hashed_address, key)?
3256 .is_some_and(|entry| entry.key == key)
3257 {
3258 hashed_storage.delete_current()?;
3259 }
3260
3261 if !value.is_zero() {
3262 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
3263 }
3264 }
3265 Ok(hashed_storage_keys)
3266 }
3267
3268 fn unwind_storage_hashing_range(
3269 &self,
3270 range: impl RangeBounds<BlockNumber>,
3271 ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3272 let changesets = self.storage_changesets_range(range)?;
3273 self.unwind_storage_hashing(changesets.into_iter())
3274 }
3275
3276 fn insert_storage_for_hashing(
3277 &self,
3278 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
3279 ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3280 let hashed_storages =
3282 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
3283 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
3284 map.insert(keccak256(entry.key), entry.value);
3285 map
3286 });
3287 map.insert(keccak256(address), storage);
3288 map
3289 });
3290
3291 let hashed_storage_keys = hashed_storages
3292 .iter()
3293 .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
3294 .collect();
3295
3296 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3297 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
3300 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
3301 if hashed_storage_cursor
3302 .seek_by_key_subkey(hashed_address, key)?
3303 .is_some_and(|entry| entry.key == key)
3304 {
3305 hashed_storage_cursor.delete_current()?;
3306 }
3307
3308 if !value.is_zero() {
3309 hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
3310 }
3311 Ok(())
3312 })
3313 })?;
3314
3315 Ok(hashed_storage_keys)
3316 }
3317}
3318
3319impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
3320 fn unwind_account_history_indices<'a>(
3321 &self,
3322 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3323 ) -> ProviderResult<usize> {
3324 let mut last_indices = changesets
3325 .into_iter()
3326 .map(|(index, account)| (account.address, *index))
3327 .collect::<Vec<_>>();
3328 last_indices.sort_unstable_by_key(|(a, _)| *a);
3329
3330 if self.cached_storage_settings().storage_v2 {
3331 let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
3332 self.pending_rocksdb_batches.lock().push(batch);
3333 } else {
3334 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
3336 for &(address, rem_index) in &last_indices {
3337 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
3338 &mut cursor,
3339 ShardedKey::last(address),
3340 rem_index,
3341 |sharded_key| sharded_key.key == address,
3342 )?;
3343
3344 if !partial_shard.is_empty() {
3347 cursor.insert(
3348 ShardedKey::last(address),
3349 &BlockNumberList::new_pre_sorted(partial_shard),
3350 )?;
3351 }
3352 }
3353 }
3354
3355 let changesets = last_indices.len();
3356 Ok(changesets)
3357 }
3358
3359 fn unwind_account_history_indices_range(
3360 &self,
3361 range: impl RangeBounds<BlockNumber>,
3362 ) -> ProviderResult<usize> {
3363 let changesets = self.account_changesets_range(range)?;
3364 self.unwind_account_history_indices(changesets.iter())
3365 }
3366
3367 fn insert_account_history_index(
3368 &self,
3369 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3370 ) -> ProviderResult<()> {
3371 self.append_history_index::<_, tables::AccountsHistory>(
3372 account_transitions,
3373 ShardedKey::new,
3374 )
3375 }
3376
3377 fn unwind_storage_history_indices(
3378 &self,
3379 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3380 ) -> ProviderResult<usize> {
3381 let mut storage_changesets = changesets
3382 .into_iter()
3383 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
3384 .collect::<Vec<_>>();
3385 storage_changesets.sort_unstable_by_key(|(address, key, _)| (*address, *key));
3386
3387 if self.cached_storage_settings().storage_v2 {
3388 let batch =
3389 self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
3390 self.pending_rocksdb_batches.lock().push(batch);
3391 } else {
3392 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
3394 for &(address, storage_key, rem_index) in &storage_changesets {
3395 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
3396 &mut cursor,
3397 StorageShardedKey::last(address, storage_key),
3398 rem_index,
3399 |storage_sharded_key| {
3400 storage_sharded_key.address == address &&
3401 storage_sharded_key.sharded_key.key == storage_key
3402 },
3403 )?;
3404
3405 if !partial_shard.is_empty() {
3408 cursor.insert(
3409 StorageShardedKey::last(address, storage_key),
3410 &BlockNumberList::new_pre_sorted(partial_shard),
3411 )?;
3412 }
3413 }
3414 }
3415
3416 let changesets = storage_changesets.len();
3417 Ok(changesets)
3418 }
3419
3420 fn unwind_storage_history_indices_range(
3421 &self,
3422 range: impl RangeBounds<BlockNumber>,
3423 ) -> ProviderResult<usize> {
3424 let changesets = self.storage_changesets_range(range)?;
3425 self.unwind_storage_history_indices(changesets.into_iter())
3426 }
3427
3428 fn insert_storage_history_index(
3429 &self,
3430 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3431 ) -> ProviderResult<()> {
3432 self.append_history_index::<_, tables::StoragesHistory>(
3433 storage_transitions,
3434 |(address, storage_key), highest_block_number| {
3435 StorageShardedKey::new(address, storage_key, highest_block_number)
3436 },
3437 )
3438 }
3439
3440 #[instrument(level = "debug", target = "providers::db", skip_all)]
3441 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3442 let storage_settings = self.cached_storage_settings();
3443 if !storage_settings.storage_v2 {
3444 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3445 self.insert_account_history_index(indices)?;
3446 }
3447
3448 if !storage_settings.storage_v2 {
3449 let indices = self.changed_storages_and_blocks_with_range(range)?;
3450 self.insert_storage_history_index(indices)?;
3451 }
3452
3453 Ok(())
3454 }
3455}
3456
3457impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
3458 for DatabaseProvider<TX, N>
3459{
3460 fn take_block_and_execution_above(
3461 &self,
3462 block: BlockNumber,
3463 ) -> ProviderResult<Chain<Self::Primitives>> {
3464 let range = block + 1..=self.last_block_number()?;
3465
3466 self.unwind_trie_state_from(block + 1)?;
3467
3468 let execution_state = self.take_state_above(block)?;
3470
3471 let blocks = self.recovered_block_range(range)?;
3472
3473 self.remove_blocks_above(block)?;
3476
3477 self.update_pipeline_stages(block, true)?;
3479
3480 Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
3481 }
3482
3483 fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
3484 self.unwind_trie_state_from(block + 1)?;
3485
3486 self.remove_state_above(block)?;
3488
3489 self.remove_blocks_above(block)?;
3492
3493 self.update_pipeline_stages(block, true)?;
3495
3496 Ok(())
3497 }
3498}
3499
3500impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
3501 for DatabaseProvider<TX, N>
3502{
3503 type Block = BlockTy<N>;
3504 type Receipt = ReceiptTy<N>;
3505
3506 fn insert_block(
3511 &self,
3512 block: &RecoveredBlock<Self::Block>,
3513 ) -> ProviderResult<StoredBlockBodyIndices> {
3514 let block_number = block.number();
3515
3516 let executed_block = ExecutedBlock::new(
3518 Arc::new(block.clone()),
3519 Arc::new(BlockExecutionOutput {
3520 result: BlockExecutionResult {
3521 receipts: Default::default(),
3522 requests: Default::default(),
3523 gas_used: 0,
3524 blob_gas_used: 0,
3525 },
3526 state: Default::default(),
3527 }),
3528 ComputedTrieData::default(),
3529 );
3530
3531 self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
3533
3534 self.block_body_indices(block_number)?
3536 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
3537 }
3538
3539 fn append_block_bodies(
3540 &self,
3541 bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
3542 ) -> ProviderResult<()> {
3543 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
3544
3545 let mut tx_writer =
3547 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
3548
3549 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
3550 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
3551
3552 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
3554
3555 for (block_number, body) in &bodies {
3556 tx_writer.increment_block(*block_number)?;
3558
3559 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
3560 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
3561
3562 let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3563
3564 block_indices_cursor.append(*block_number, &block_indices)?;
3566
3567 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
3568
3569 let Some(body) = body else { continue };
3570
3571 if !body.transactions().is_empty() {
3573 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
3574 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
3575 }
3576
3577 for transaction in body.transactions() {
3579 tx_writer.append_transaction(next_tx_num, transaction)?;
3580
3581 next_tx_num += 1;
3583 }
3584 }
3585
3586 self.storage.writer().write_block_bodies(self, bodies)?;
3587
3588 Ok(())
3589 }
3590
3591 fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
3592 let last_block_number = self.last_block_number()?;
3593 for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
3595 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
3596 }
3597
3598 let highest_static_file_block = self
3600 .static_file_provider()
3601 .get_highest_static_file_block(StaticFileSegment::Headers)
3602 .expect("todo: error handling, headers should exist");
3603
3604 debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
3610 self.static_file_provider()
3611 .get_writer(block, StaticFileSegment::Headers)?
3612 .prune_headers(highest_static_file_block.saturating_sub(block))?;
3613
3614 let unwind_tx_from = self
3616 .block_body_indices(block)?
3617 .map(|b| b.next_tx_num())
3618 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3619
3620 let unwind_tx_to = self
3622 .tx
3623 .cursor_read::<tables::BlockBodyIndices>()?
3624 .last()?
3625 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3627 .1
3628 .last_tx_num();
3629
3630 if unwind_tx_from <= unwind_tx_to {
3631 let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
3632 self.with_rocksdb_batch(|batch| {
3633 let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
3634 for (hash, _) in hashes {
3635 writer.delete_transaction_hash_number(hash)?;
3636 }
3637 Ok(((), writer.into_raw_rocksdb_batch()))
3638 })?;
3639 }
3640
3641 if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) {
3644 EitherWriter::new_senders(self, last_block_number)?
3645 .prune_senders(unwind_tx_from, block)?;
3646 }
3647
3648 self.remove_bodies_above(block)?;
3649
3650 Ok(())
3651 }
3652
3653 fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3654 self.storage.writer().remove_block_bodies_above(self, block)?;
3655
3656 let unwind_tx_from = self
3658 .block_body_indices(block)?
3659 .map(|b| b.next_tx_num())
3660 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3661
3662 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3663 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3664
3665 let static_file_tx_num =
3666 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3667
3668 let to_delete = static_file_tx_num
3669 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3670 .unwrap_or_default();
3671
3672 self.static_file_provider
3673 .latest_writer(StaticFileSegment::Transactions)?
3674 .prune_transactions(to_delete, block)?;
3675
3676 Ok(())
3677 }
3678
3679 fn append_blocks_with_state(
3688 &self,
3689 blocks: Vec<RecoveredBlock<Self::Block>>,
3690 execution_outcome: &ExecutionOutcome<Self::Receipt>,
3691 hashed_state: HashedPostStateSorted,
3692 ) -> ProviderResult<()> {
3693 if blocks.is_empty() {
3694 debug!(target: "providers::db", "Attempted to append empty block range");
3695 return Ok(())
3696 }
3697
3698 let first_number = blocks[0].number();
3701
3702 let last_block_number = blocks[blocks.len() - 1].number();
3705
3706 let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3707
3708 let (account_transitions, storage_transitions) = {
3713 let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
3714 let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
3715 for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
3716 let block_number = first_number + block_idx as u64;
3717 for (address, account_revert) in block_reverts {
3718 account_transitions.entry(*address).or_default().push(block_number);
3719 for storage_key in account_revert.storage.keys() {
3720 let key = B256::from(storage_key.to_be_bytes());
3721 storage_transitions.entry((*address, key)).or_default().push(block_number);
3722 }
3723 }
3724 }
3725 (account_transitions, storage_transitions)
3726 };
3727
3728 for block in blocks {
3730 self.insert_block(&block)?;
3731 durations_recorder.record_relative(metrics::Action::InsertBlock);
3732 }
3733
3734 self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
3735 durations_recorder.record_relative(metrics::Action::InsertState);
3736
3737 self.write_hashed_state(&hashed_state)?;
3739 durations_recorder.record_relative(metrics::Action::InsertHashes);
3740
3741 let storage_settings = self.cached_storage_settings();
3746 if storage_settings.storage_v2 {
3747 self.with_rocksdb_batch(|mut batch| {
3748 for (address, blocks) in account_transitions {
3749 batch.append_account_history_shard(address, blocks)?;
3750 }
3751 Ok(((), Some(batch.into_inner())))
3752 })?;
3753 } else {
3754 self.insert_account_history_index(account_transitions)?;
3755 }
3756 if storage_settings.storage_v2 {
3757 self.with_rocksdb_batch(|mut batch| {
3758 for ((address, key), blocks) in storage_transitions {
3759 batch.append_storage_history_shard(address, key, blocks)?;
3760 }
3761 Ok(((), Some(batch.into_inner())))
3762 })?;
3763 } else {
3764 self.insert_storage_history_index(storage_transitions)?;
3765 }
3766 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3767
3768 self.update_pipeline_stages(last_block_number, false)?;
3770 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3771
3772 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3773
3774 Ok(())
3775 }
3776}
3777
3778impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3779 fn get_prune_checkpoint(
3780 &self,
3781 segment: PruneSegment,
3782 ) -> ProviderResult<Option<PruneCheckpoint>> {
3783 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3784 }
3785
3786 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3787 Ok(PruneSegment::variants()
3788 .filter_map(|segment| {
3789 self.tx
3790 .get::<tables::PruneCheckpoints>(segment)
3791 .transpose()
3792 .map(|chk| chk.map(|chk| (segment, chk)))
3793 })
3794 .collect::<Result<_, _>>()?)
3795 }
3796}
3797
3798impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3799 fn save_prune_checkpoint(
3800 &self,
3801 segment: PruneSegment,
3802 checkpoint: PruneCheckpoint,
3803 ) -> ProviderResult<()> {
3804 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3805 }
3806}
3807
3808impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3809 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3810 let db_entries = self.tx.entries::<T>()?;
3811 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3812 Ok(entries) => entries,
3813 Err(ProviderError::UnsupportedProvider) => 0,
3814 Err(err) => return Err(err),
3815 };
3816
3817 Ok(db_entries + static_file_entries)
3818 }
3819}
3820
3821impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3822 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3823 let mut finalized_blocks = self
3824 .tx
3825 .cursor_read::<tables::ChainState>()?
3826 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3827 .take(1)
3828 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3829
3830 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3831 Ok(last_finalized_block_number)
3832 }
3833
3834 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3835 let mut finalized_blocks = self
3836 .tx
3837 .cursor_read::<tables::ChainState>()?
3838 .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3839 .take(1)
3840 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3841
3842 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3843 Ok(last_finalized_block_number)
3844 }
3845}
3846
3847impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3848 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3849 Ok(self
3850 .tx
3851 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3852 }
3853
3854 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3855 Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3856 }
3857}
3858
3859impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3860 type Tx = TX;
3861
3862 fn tx_ref(&self) -> &Self::Tx {
3863 &self.tx
3864 }
3865
3866 fn tx_mut(&mut self) -> &mut Self::Tx {
3867 &mut self.tx
3868 }
3869
3870 fn into_tx(self) -> Self::Tx {
3871 self.tx
3872 }
3873
3874 fn prune_modes_ref(&self) -> &PruneModes {
3875 self.prune_modes_ref()
3876 }
3877
3878 #[instrument(
3880 name = "DatabaseProvider::commit",
3881 level = "debug",
3882 target = "providers::db",
3883 skip_all
3884 )]
3885 fn commit(self) -> ProviderResult<()> {
3886 if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
3887 self.commit_unwind()?;
3888 } else {
3889 let mut timings = metrics::CommitTimings::default();
3891
3892 let start = Instant::now();
3893 self.static_file_provider.finalize()?;
3894 timings.sf = start.elapsed();
3895
3896 let start = Instant::now();
3897 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3898 for batch in batches {
3899 self.rocksdb_provider.commit_batch(batch)?;
3900 }
3901 timings.rocksdb = start.elapsed();
3902
3903 let start = Instant::now();
3904 self.tx.commit()?;
3905 timings.mdbx = start.elapsed();
3906
3907 self.metrics.record_commit(&timings);
3908 }
3909
3910 Ok(())
3911 }
3912}
3913
3914impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3915 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3916 self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3917 }
3918}
3919
3920impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3921 fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3922 self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3923 }
3924}
3925
3926impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3927 fn cached_storage_settings(&self) -> StorageSettings {
3928 *self.storage_settings.read()
3929 }
3930
3931 fn set_storage_settings_cache(&self, settings: StorageSettings) {
3932 *self.storage_settings.write() = settings;
3933 }
3934}
3935
3936impl<TX: Send, N: NodeTypes> StoragePath for DatabaseProvider<TX, N> {
3937 fn storage_path(&self) -> PathBuf {
3938 self.db_path.clone()
3939 }
3940}
3941
3942#[cfg(test)]
3943mod tests {
3944 use super::*;
3945 use crate::{
3946 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3947 BlockWriter,
3948 };
3949 use alloy_consensus::Header;
3950 use alloy_primitives::{
3951 map::{AddressMap, B256Map},
3952 U256,
3953 };
3954 use reth_chain_state::ExecutedBlock;
3955 use reth_db_api::models::StorageSettings;
3956 use reth_ethereum_primitives::Receipt;
3957 use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3958 use reth_primitives_traits::SealedBlock;
3959 use reth_storage_api::MetadataWriter;
3960 use reth_testing_utils::generators::{self, random_block, BlockParams};
3961 use reth_trie::{
3962 HashedPostState, KeccakKeyHasher, Nibbles, StoredNibbles, StoredNibblesSubKey,
3963 };
3964 use revm_database::BundleState;
3965 use revm_state::AccountInfo;
3966 use std::{sync::mpsc, time::Duration};
3967
#[test]
fn test_receipts_by_block_range_empty_range() {
    let factory = create_test_provider_factory();
    let provider = factory.provider().unwrap();

    // An inclusive range whose start lies past its end contains no blocks.
    let inverted_range = 10u64..=9u64;
    let receipts = provider.receipts_by_block_range(inverted_range).unwrap();

    assert_eq!(receipts, Vec::<Vec<Receipt>>::new());
}
3979
#[test]
fn unwind_commit_waits_for_pre_commit_readers() {
    let factory = create_test_provider_factory();

    // The read provider is opened before the unwind provider and kept alive for now.
    let open_reader = factory.provider().unwrap();
    let unwind_provider = factory.unwind_provider_rw().unwrap();
    unwind_provider.write_metadata("unwind-wait-test", vec![1]).unwrap();

    // Commit on a separate thread and report the outcome through a channel.
    let (result_tx, result_rx) = mpsc::channel();
    let committer = std::thread::spawn(move || {
        result_tx.send(unwind_provider.commit()).unwrap();
    });

    let early_outcome = result_rx.recv_timeout(Duration::from_millis(50));
    assert!(
        early_outcome.is_err(),
        "unwind commit should wait while an older read transaction is still open"
    );

    // Once the reader is gone the commit is expected to finish successfully.
    drop(open_reader);

    let commit_result = result_rx.recv_timeout(Duration::from_secs(1)).unwrap();
    commit_result.unwrap();
    committer.join().unwrap();
}
4004
#[test]
fn test_receipts_by_block_range_nonexistent_blocks() {
    let factory = create_test_provider_factory();
    let provider = factory.provider().unwrap();

    // Nothing was inserted, so every requested block yields an empty receipt list.
    let receipts = provider.receipts_by_block_range(10..=12).unwrap();

    assert_eq!(receipts, vec![vec![], vec![], vec![]]);
}
4014
#[test]
fn test_receipts_by_block_range_single_block() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let provider_rw = factory.provider_rw().unwrap();

    // Genesis block with an empty receipt list.
    let genesis = data.genesis.try_recover().unwrap();
    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };
    provider_rw.insert_block(&genesis).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();

    // First test block together with its execution outcome.
    let (first_block, first_outcome) = &data.blocks[0];
    provider_rw.insert_block(first_block).unwrap();
    provider_rw
        .write_state(first_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();
    let receipts = provider.receipts_by_block_range(1..=1).unwrap();

    assert_eq!(receipts.len(), 1);
    assert_eq!(receipts[0].len(), 1);
    assert_eq!(receipts[0][0], first_outcome.receipts()[0][0]);
}
4047
#[test]
fn test_receipts_by_block_range_multiple_blocks() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let provider_rw = factory.provider_rw().unwrap();

    // Genesis block with an empty receipt list.
    let genesis = data.genesis.try_recover().unwrap();
    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };
    provider_rw.insert_block(&genesis).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();

    // The first three test blocks with their execution outcomes.
    for (block, outcome) in &data.blocks[..3] {
        provider_rw.insert_block(block).unwrap();
        provider_rw
            .write_state(outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();
    let receipts = provider.receipts_by_block_range(1..=3).unwrap();

    assert_eq!(receipts.len(), 3);
    for (block_receipts, (_, outcome)) in receipts.iter().zip(&data.blocks) {
        assert_eq!(block_receipts.len(), 1);
        assert_eq!(block_receipts[0], outcome.receipts()[0][0]);
    }
}
4084
#[test]
fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let provider_rw = factory.provider_rw().unwrap();

    // Genesis block with an empty receipt list.
    let genesis = data.genesis.try_recover().unwrap();
    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };
    provider_rw.insert_block(&genesis).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();

    // The first three test blocks with their execution outcomes.
    for (block, outcome) in &data.blocks[..3] {
        provider_rw.insert_block(block).unwrap();
        provider_rw
            .write_state(outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();
    let receipts = provider.receipts_by_block_range(1..=3).unwrap();

    assert_eq!(receipts.len(), 3);
    for block_receipts in receipts.iter() {
        assert_eq!(block_receipts.len(), 1);
    }
}
4122
#[test]
fn test_receipts_by_block_range_partial_range() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let provider_rw = factory.provider_rw().unwrap();

    // Genesis block with an empty receipt list.
    let genesis = data.genesis.try_recover().unwrap();
    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };
    provider_rw.insert_block(&genesis).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();

    // Only blocks 1 through 3 are stored.
    for (block, outcome) in &data.blocks[..3] {
        provider_rw.insert_block(block).unwrap();
        provider_rw
            .write_state(outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();

    // Blocks 2 and 3 were written above; blocks 4 and 5 were not.
    let receipts = provider.receipts_by_block_range(2..=5).unwrap();
    assert_eq!(receipts.len(), 4);

    let receipts_per_block: Vec<usize> =
        receipts.iter().map(|block_receipts| block_receipts.len()).collect();
    assert_eq!(receipts_per_block, [1, 1, 0, 0]);

    assert_eq!(receipts[0][0], data.blocks[1].1.receipts()[0][0]);
    assert_eq!(receipts[1][0], data.blocks[2].1.receipts()[0][0]);
}
4164
#[test]
fn test_receipts_by_block_range_all_empty_blocks() {
    let factory = create_test_provider_factory();
    let mut rng = generators::rng();

    // Three random blocks (numbers 0, 1 and 2) without any transactions.
    let empty_blocks: Vec<_> = (0..3)
        .map(|number| {
            random_block(
                &mut rng,
                number,
                BlockParams { tx_count: Some(0), ..Default::default() },
            )
        })
        .collect();

    let provider_rw = factory.provider_rw().unwrap();
    for block in empty_blocks {
        let recovered = block.try_recover().unwrap();
        provider_rw.insert_block(&recovered).unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();
    let receipts = provider.receipts_by_block_range(1..=3).unwrap();

    assert_eq!(receipts.len(), 3);
    for block_receipts in receipts {
        assert_eq!(block_receipts.len(), 0);
    }
}
4192
#[test]
fn test_receipts_by_block_range_consistency_with_individual_calls() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let provider_rw = factory.provider_rw().unwrap();

    // Genesis block with an empty receipt list.
    let genesis = data.genesis.try_recover().unwrap();
    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };
    provider_rw.insert_block(&genesis).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();

    // The first three test blocks with their execution outcomes.
    for (block, outcome) in &data.blocks[..3] {
        provider_rw.insert_block(block).unwrap();
        provider_rw
            .write_state(outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();

    // One ranged query ...
    let range_result = provider.receipts_by_block_range(1..=3).unwrap();

    // ... must agree with one query per block.
    let individual_results: Vec<_> = (1u64..=3)
        .map(|block_num| provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default())
        .collect();

    assert_eq!(range_result, individual_results);
}
4234
#[test]
fn test_receipts_by_block_returns_none_for_missing_unpruned_receipts() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    // Insert the blocks only; no state (and therefore no receipts) is written.
    let provider_rw = factory.provider_rw().unwrap();
    let genesis = data.genesis.try_recover().unwrap();
    provider_rw.insert_block(&genesis).unwrap();
    provider_rw.insert_block(&data.blocks[0].0).unwrap();
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();
    let receipts = provider.receipts_by_block(1.into()).unwrap();
    assert!(receipts.is_none());
}
4248
#[test]
fn test_write_trie_updates_sorted() {
    use reth_trie::{
        updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
        BranchNodeCompact, StorageTrieEntry,
    };

    // Branch node carrying only a state mask; tree mask, hash mask, hashes and root hash are
    // all empty.
    let branch = |state_mask: u16| BranchNodeCompact::new(state_mask, 0u16, 0u16, vec![], None);
    // Two-nibble trie path.
    let path = |nibbles: [u8; 2]| Nibbles::from_nibbles(nibbles);

    let factory = create_test_provider_factory();
    let provider_rw = factory.provider_rw().unwrap();

    // Seed the account trie with one node that will be deleted and one that will be updated.
    {
        let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();

        let to_delete = StoredNibbles(path([0x3, 0x4]));
        cursor.upsert(to_delete, &branch(0b1010_1010_1010_1010)).unwrap();

        let to_update = StoredNibbles(path([0x1, 0x2]));
        cursor.upsert(to_update, &branch(0b0101_0101_0101_0101)).unwrap();
    }

    // Seed the storage tries: one node for the first address, two for the second.
    let storage_address1 = B256::from([1u8; 32]);
    let storage_address2 = B256::from([2u8; 32]);
    {
        let mut storage_cursor =
            provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();

        let seeds = [
            (storage_address1, [0x2, 0x0], 0b0011_0011_0011_0011),
            (storage_address2, [0xa, 0xb], 0b1100_1100_1100_1100),
            (storage_address2, [0xc, 0xd], 0b0011_1100_0011_1100),
        ];
        for (hashed_address, nibbles, state_mask) in seeds {
            let entry = StorageTrieEntry {
                nibbles: StoredNibblesSubKey(path(nibbles)),
                node: branch(state_mask),
            };
            storage_cursor.upsert(hashed_address, &entry).unwrap();
        }
    }

    // Account updates: overwrite [1, 2], delete [3, 4], add [5, 6].
    let account_nodes = vec![
        (path([0x1, 0x2]), Some(branch(0b1111_1111_1111_1111))),
        (path([0x3, 0x4]), None),
        (path([0x5, 0x6]), Some(branch(0b1111_1111_1111_1111))),
    ];

    // Storage updates for the first address: add [1, 0], delete [2, 0].
    let storage_trie1 = StorageTrieUpdatesSorted {
        is_deleted: false,
        storage_nodes: vec![
            (path([0x1, 0x0]), Some(branch(0b1111_0000_0000_0000))),
            (path([0x2, 0x0]), None),
        ],
    };

    // The second address has its whole storage trie marked as deleted.
    let storage_trie2 = StorageTrieUpdatesSorted { is_deleted: true, storage_nodes: vec![] };

    let storage_tries: B256Map<_> =
        [(storage_address1, storage_trie1), (storage_address2, storage_trie2)]
            .into_iter()
            .collect();

    let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

    let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
    assert_eq!(num_entries, 5);

    // Check the account trie.
    let tx = provider_rw.tx_ref();
    let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();

    let updated_node = cursor.seek_exact(StoredNibbles(path([0x1, 0x2]))).unwrap();
    assert!(updated_node.is_some(), "Updated account node should exist");
    let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
    assert_eq!(
        updated_node.unwrap().1.state_mask,
        expected_mask,
        "Account node should have updated state_mask"
    );

    let deleted_node = cursor.seek_exact(StoredNibbles(path([0x3, 0x4]))).unwrap();
    assert!(deleted_node.is_none(), "Deleted account node should not exist");

    let new_node = cursor.seek_exact(StoredNibbles(path([0x5, 0x6]))).unwrap();
    assert!(new_node.is_some(), "New account node should exist");

    // Check the storage tries.
    let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();

    let storage_entries1 = storage_cursor
        .walk_dup(Some(storage_address1), None)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(
        storage_entries1.len(),
        1,
        "Storage address1 should have 1 entry after deletion"
    );
    assert_eq!(
        storage_entries1[0].1.nibbles.0,
        path([0x1, 0x0]),
        "Remaining entry should be [0x1, 0x0]"
    );

    let storage_entries2 = storage_cursor
        .walk_dup(Some(storage_address2), None)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");

    provider_rw.commit().unwrap();
}
4468
/// Exercises receipt pruning at write time under both `StorageSettings::v1()` and
/// `StorageSettings::v2()`.
///
/// Each written receipt stores its block number in `cumulative_gas_used`, which the final
/// assertions use to recognise it.
#[test]
fn test_prunable_receipts_logic() {
    // Inserts blocks `0..=last_block`, each generated with `txs_per_block` transactions.
    let populate_chain =
        |writer: &DatabaseProviderRW<_, _>, last_block: u64, txs_per_block: u8| {
            let mut rng = generators::rng();
            for number in 0..=last_block {
                let params = BlockParams { tx_count: Some(txs_per_block), ..Default::default() };
                let generated = random_block(&mut rng, number, params);
                writer.insert_block(&generated.try_recover().unwrap()).unwrap();
            }
        };

    // Writes a single-receipt execution outcome for `block` and commits the provider.
    let store_receipt = |writer: DatabaseProviderRW<_, _>, block: u64| {
        let receipt = Receipt {
            tx_type: Default::default(),
            success: true,
            cumulative_gas_used: block,
            logs: vec![],
        };
        let outcome = ExecutionOutcome {
            first_block: block,
            receipts: vec![vec![receipt]],
            ..Default::default()
        };
        writer
            .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
        writer.commit().unwrap();
    };

    // Scenario 1: v1 storage settings with `PruneMode::Before(100)`.
    {
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v1());
        let factory = factory.with_prune_modes(PruneModes {
            receipts: Some(PruneMode::Before(100)),
            ..Default::default()
        });

        let chain_tip = 200u64;
        let earliest = 1u64;

        let chain_writer = factory.provider_rw().unwrap();
        populate_chain(&chain_writer, chain_tip, 1);
        chain_writer.commit().unwrap();

        // One receipt close to genesis, one right below the tip.
        for target in [earliest, chain_tip - 1] {
            store_receipt(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                target,
            );
        }

        let provider = factory.provider().unwrap();

        let genesis_receipts = provider.receipts_by_block(0.into()).unwrap();
        assert!(genesis_receipts.is_none());

        let near_tip_receipts = provider.receipts_by_block((chain_tip - 1).into()).unwrap();
        assert!(near_tip_receipts.is_some_and(|r| r.len() == 1));
    }

    // Scenario 2: v2 storage settings with `PruneMode::Before(2)`.
    {
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());
        let factory = factory.with_prune_modes(PruneModes {
            receipts: Some(PruneMode::Before(2)),
            ..Default::default()
        });

        let tip_block = 200u64;

        let chain_writer = factory.provider_rw().unwrap();
        populate_chain(&chain_writer, tip_block, 1);
        chain_writer.commit().unwrap();

        for target in [0, 1] {
            store_receipt(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                target,
            );
        }

        // After blocks 0 and 1: no receipt transaction recorded, but the block height is.
        let highest_tx = factory
            .static_file_provider()
            .get_highest_static_file_tx(StaticFileSegment::Receipts);
        assert!(highest_tx.is_none());
        let highest_block = factory
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::Receipts);
        assert!(highest_block.is_some_and(|b| b == 1));

        store_receipt(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
        let highest_tx = factory
            .static_file_provider()
            .get_highest_static_file_tx(StaticFileSegment::Receipts);
        assert!(highest_tx.is_some_and(|num| num == 2));

        // Switch the prune target to `Before(100)` and shrink the minimum pruning distance
        // before writing block 3.
        let factory = factory.with_prune_modes(PruneModes {
            receipts: Some(PruneMode::Before(100)),
            ..Default::default()
        });
        let late_writer = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
        assert!(PruneMode::Distance(1).should_prune(3, tip_block));
        store_receipt(late_writer, 3);

        let provider = factory.provider().unwrap();
        assert!(EitherWriter::receipts_destination(&provider).is_static_file());

        let expectations = [(0, false), (1, false), (2, true), (3, true)];
        for (number, expect_receipt) in expectations {
            let by_block = provider.receipts_by_block(number.into()).unwrap();
            if expect_receipt {
                assert!(by_block.is_some_and(|r| r.len() == 1));
            } else {
                assert!(by_block.is_none());
            }

            let by_tx = provider.receipt(number).unwrap();
            if expect_receipt {
                assert!(by_tx.is_some_and(|r| r.cumulative_gas_used == number));
            } else {
                assert!(by_tx.is_none());
            }
        }
    }
}
4609
/// Historical state for block 0 (written below) must be obtainable, while asking for
/// block 100 must fail with `ProviderError::BlockNotExecuted`.
#[test]
fn test_try_into_history_rejects_unexecuted_blocks() {
    use reth_storage_api::TryIntoHistoricalStateProvider;

    let factory = create_test_provider_factory();
    let test_data = BlockchainTestData::default();

    // Persist the genesis block together with an empty execution outcome for block 0.
    let writer = factory.provider_rw().unwrap();
    let genesis = test_data.genesis.try_recover().unwrap();
    writer.insert_block(&genesis).unwrap();
    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };
    writer
        .write_state(
            &genesis_outcome,
            crate::OriginalValuesKnown::No,
            StateWriteConfig::default(),
        )
        .unwrap();
    writer.commit().unwrap();

    let executed = factory.provider().unwrap().try_into_history_at_block(0);
    assert!(executed.is_ok(), "Block 0 should be available");

    let unexecuted = factory.provider().unwrap().try_into_history_at_block(100);
    match unexecuted {
        Ok(_) => panic!("Expected error, got Ok"),
        Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
        Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
    }
}
4647
/// Under `StorageSettings::v2()`, unwinding a storage changeset keyed by the plain slot
/// must report the hashed slot as touched and put the changeset value back into
/// `HashedStorages`.
#[test]
fn test_unwind_storage_hashing_with_hashed_state() {
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let address = Address::random();
    let plain_slot = B256::random();
    let hashed_address = keccak256(address);
    let hashed_slot = keccak256(plain_slot);

    let value_now = U256::from(100);
    let value_before = U256::from(42);

    let provider_rw = factory.provider_rw().unwrap();

    // Seed the hashed storage table with the value that is about to be unwound.
    {
        let mut seed_cursor =
            provider_rw.tx.cursor_dup_write::<tables::HashedStorages>().unwrap();
        seed_cursor
            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: value_now })
            .unwrap();
    }

    let changesets = vec![(
        BlockNumberAddress((1, address)),
        StorageEntry { key: plain_slot, value: value_before },
    )];
    let touched = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();

    assert_eq!(touched.len(), 1);
    assert!(touched.contains_key(&hashed_address));
    assert!(touched[&hashed_address].contains(&hashed_slot));

    let mut read_cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
    let StorageEntry { key, value, .. } = read_cursor
        .seek_by_key_subkey(hashed_address, hashed_slot)
        .unwrap()
        .expect("entry should exist");
    assert_eq!(key, hashed_slot);
    assert_eq!(value, value_before);
}
4690
/// Round-trips one account and one storage slot through `write_state` /
/// `write_hashed_state` and then `remove_state_above(0)` under `StorageSettings::v1()`,
/// checking the plain tables, hashed tables and MDBX changesets before and after removal.
#[test]
fn test_write_and_remove_state_roundtrip_legacy() {
    let factory = create_test_provider_factory();
    let settings = StorageSettings::v1();
    assert!(!settings.use_hashed_state());
    factory.set_storage_settings_cache(settings);

    let address = Address::with_last_byte(1);
    let hashed_address = keccak256(address);
    let slot_key = B256::from(U256::from(5));
    let hashed_slot = keccak256(slot_key);

    // The test account only ever differs by its nonce.
    let account_at = |nonce: u64| Account { nonce, balance: U256::ZERO, bytecode_hash: None };

    let mut rng = generators::rng();
    let block0 =
        random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
    let block1 =
        random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });

    // Setup: two empty blocks plus the account in its pre-execution form.
    {
        let setup = factory.provider_rw().unwrap();
        for block in [block0, block1] {
            setup.insert_block(&block.try_recover().unwrap()).unwrap();
        }
        setup
            .tx
            .cursor_write::<tables::PlainAccountState>()
            .unwrap()
            .upsert(address, &account_at(0))
            .unwrap();
        setup.commit().unwrap();
    }

    let provider_rw = factory.provider_rw().unwrap();

    // Block 1 bumps the nonce to 1 and sets the slot from zero to 10.
    let mut slot_changes: B256Map<(U256, U256)> = B256Map::default();
    slot_changes.insert(slot_key, (U256::ZERO, U256::from(10)));
    let mut state_init: BundleStateInit = AddressMap::default();
    state_init.insert(address, (Some(account_at(0)), Some(account_at(1)), slot_changes));

    let mut block_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
    block_reverts.insert(
        address,
        (Some(Some(account_at(0))), vec![StorageEntry { key: slot_key, value: U256::ZERO }]),
    );
    let mut reverts_init: RevertsInit = HashMap::default();
    reverts_init.insert(1, block_reverts);

    let execution_outcome =
        ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);

    let write_config = StateWriteConfig {
        write_receipts: false,
        write_account_changesets: true,
        write_storage_changesets: true,
    };
    provider_rw.write_state(&execution_outcome, OriginalValuesKnown::Yes, write_config).unwrap();

    let hashed_state =
        execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
    provider_rw.write_hashed_state(&hashed_state).unwrap();

    // Readers for the block-1 changesets, used before and after the removal.
    let read_account_changesets = || {
        provider_rw
            .tx
            .cursor_dup_read::<tables::AccountChangeSets>()
            .unwrap()
            .walk(Some(1))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    };
    let read_storage_changesets = || {
        provider_rw
            .tx
            .cursor_read::<tables::StorageChangeSets>()
            .unwrap()
            .walk(Some(BlockNumberAddress((1, address))))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    };

    // --- State after the write ---
    let (_, written_account) = provider_rw
        .tx
        .cursor_read::<tables::PlainAccountState>()
        .unwrap()
        .seek_exact(address)
        .unwrap()
        .unwrap();
    assert_eq!(written_account.nonce, 1);

    let plain_entry = provider_rw
        .tx
        .cursor_dup_read::<tables::PlainStorageState>()
        .unwrap()
        .seek_by_key_subkey(address, slot_key)
        .unwrap()
        .unwrap();
    assert_eq!(plain_entry.key, slot_key);
    assert_eq!(plain_entry.value, U256::from(10));

    let hashed_entry = provider_rw
        .tx
        .cursor_dup_read::<tables::HashedStorages>()
        .unwrap()
        .seek_by_key_subkey(hashed_address, hashed_slot)
        .unwrap()
        .unwrap();
    assert_eq!(hashed_entry.key, hashed_slot);
    assert_eq!(hashed_entry.value, U256::from(10));

    let account_cs_entries = read_account_changesets();
    assert!(!account_cs_entries.is_empty());

    let storage_cs_entries = read_storage_changesets();
    assert!(!storage_cs_entries.is_empty());
    assert_eq!(storage_cs_entries[0].1.key, slot_key);

    // --- Remove everything above block 0 and re-check ---
    provider_rw.remove_state_above(0).unwrap();

    let (_, restored_account) = provider_rw
        .tx
        .cursor_read::<tables::PlainAccountState>()
        .unwrap()
        .seek_exact(address)
        .unwrap()
        .unwrap();
    assert_eq!(restored_account.nonce, 0);

    // The seek may land on another subkey; only an exact hit on `slot_key` is a failure.
    let storage_gone = provider_rw
        .tx
        .cursor_dup_read::<tables::PlainStorageState>()
        .unwrap()
        .seek_by_key_subkey(address, slot_key)
        .unwrap();
    assert!(!storage_gone.is_some_and(|entry| entry.key == slot_key));

    let account_cs_after = read_account_changesets();
    assert!(account_cs_after.is_empty());

    let storage_cs_after = read_storage_changesets();
    assert!(storage_cs_after.is_empty());
}
4858
/// Under `StorageSettings::v1()` (hashed state disabled), unwinding a storage changeset
/// keyed by the plain slot must report the hashed slot as touched and put the changeset
/// value back into `HashedStorages`.
#[test]
fn test_unwind_storage_hashing_legacy() {
    let factory = create_test_provider_factory();
    let settings = StorageSettings::v1();
    assert!(!settings.use_hashed_state());
    factory.set_storage_settings_cache(settings);

    let address = Address::random();
    let plain_slot = B256::random();
    let hashed_address = keccak256(address);
    let hashed_slot = keccak256(plain_slot);

    let value_now = U256::from(100);
    let value_before = U256::from(42);

    let provider_rw = factory.provider_rw().unwrap();

    // Seed the hashed storage table with the value that is about to be unwound.
    {
        let mut seed_cursor =
            provider_rw.tx.cursor_dup_write::<tables::HashedStorages>().unwrap();
        let seeded = StorageEntry { key: hashed_slot, value: value_now };
        seed_cursor.upsert(hashed_address, &seeded).unwrap();
    }

    let changesets = vec![(
        BlockNumberAddress((1, address)),
        StorageEntry { key: plain_slot, value: value_before },
    )];
    let touched = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();

    assert_eq!(touched.len(), 1);
    assert!(touched.contains_key(&hashed_address));
    assert!(touched[&hashed_address].contains(&hashed_slot));

    let mut read_cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
    let restored = read_cursor
        .seek_by_key_subkey(hashed_address, hashed_slot)
        .unwrap()
        .expect("entry should exist");
    assert_eq!(restored.key, hashed_slot);
    assert_eq!(restored.value, value_before);
}
4902
/// Writes one block of state under `StorageSettings::v2()` and checks that storage lands in
/// `HashedStorages` (not `PlainStorageState`), that changesets are readable from static
/// files, and that a historical read at block 0 yields no value for the slot.
#[test]
fn test_write_state_and_historical_read_hashed() {
    use reth_storage_api::StateProvider;
    use reth_trie::{HashedPostState, KeccakKeyHasher};
    use revm_database::BundleState;
    use revm_state::AccountInfo;

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let address = Address::with_last_byte(1);
    let slot = U256::from(5);
    let slot_key = B256::from(slot);
    let hashed_address = keccak256(address);
    let hashed_slot = keccak256(slot_key);

    // Seed static files: headers 0 and 1, plus empty changesets for block 0.
    {
        let static_files = factory.static_file_provider();

        let mut headers = static_files.latest_writer(StaticFileSegment::Headers).unwrap();
        for number in [0, 1] {
            let header = alloy_consensus::Header { number, ..Default::default() };
            headers.append_header(&header, &B256::ZERO).unwrap();
        }
        headers.commit().unwrap();

        let mut account_changes =
            static_files.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
        account_changes.append_account_changeset(vec![], 0).unwrap();
        account_changes.commit().unwrap();

        let mut storage_changes =
            static_files.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
        storage_changes.append_storage_changeset(vec![], 0).unwrap();
        storage_changes.commit().unwrap();
    }

    let provider_rw = factory.provider_rw().unwrap();

    // Block 1 creates the account and sets the slot from zero to 10.
    let created_account =
        AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() };
    let bundle = BundleState::builder(1..=1)
        .state_present_account_info(address, created_account)
        .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
        .revert_account_info(1, address, Some(None))
        .revert_storage(1, address, vec![(slot, U256::ZERO)])
        .build();

    let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());

    let empty_body = StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 };
    provider_rw.tx.put::<tables::BlockBodyIndices>(1, empty_body).unwrap();

    let write_config = StateWriteConfig {
        write_receipts: false,
        write_account_changesets: true,
        write_storage_changesets: true,
    };
    provider_rw.write_state(&execution_outcome, OriginalValuesKnown::Yes, write_config).unwrap();

    let hashed_state =
        HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
    provider_rw.write_hashed_state(&hashed_state).unwrap();

    // Nothing may have been written to the plain storage table.
    let plain_storage_entries = provider_rw
        .tx
        .cursor_dup_read::<tables::PlainStorageState>()
        .unwrap()
        .walk(None)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert!(plain_storage_entries.is_empty());

    let hashed_entry = provider_rw
        .tx
        .cursor_dup_read::<tables::HashedStorages>()
        .unwrap()
        .seek_by_key_subkey(hashed_address, hashed_slot)
        .unwrap()
        .unwrap();
    assert_eq!(hashed_entry.key, hashed_slot);
    assert_eq!(hashed_entry.value, U256::from(10));

    provider_rw.static_file_provider().commit().unwrap();

    // Changesets for block 1 are read back from static files.
    let static_files = factory.static_file_provider();
    let storage_cs = static_files.storage_changeset(1).unwrap();
    assert!(!storage_cs.is_empty());
    assert_eq!(storage_cs[0].1.key, slot_key);

    let account_cs = static_files.account_block_changeset(1).unwrap();
    assert!(!account_cs.is_empty());
    assert_eq!(account_cs[0].address, address);

    let historical = HistoricalStateProviderRef::new(&*provider_rw, 0, ChangesetCache::new());
    let historical_value = historical.storage(address, slot_key).unwrap();
    assert_eq!(historical_value, None);
}
5012
/// Selects which storage settings a test run configures on the provider factory.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum StorageMode {
    /// Run with `StorageSettings::v1()`.
    V1,
    /// Run with `StorageSettings::v2()`.
    V2,
}
5018
5019 fn run_save_blocks_and_verify(mode: StorageMode) {
5020 use alloy_primitives::map::{FbBuildHasher, HashMap};
5021
5022 let factory = create_test_provider_factory();
5023
5024 match mode {
5025 StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
5026 StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
5027 }
5028
5029 let num_blocks = 3u64;
5030 let accounts_per_block = 5usize;
5031 let slots_per_account = 3usize;
5032
5033 let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
5034 SealedHeader::new(
5035 Header { number: 0, difficulty: U256::from(1), ..Default::default() },
5036 B256::ZERO,
5037 ),
5038 Default::default(),
5039 );
5040
5041 let genesis_executed = ExecutedBlock::new(
5042 Arc::new(genesis.try_recover().unwrap()),
5043 Arc::new(BlockExecutionOutput {
5044 result: BlockExecutionResult {
5045 receipts: vec![],
5046 requests: Default::default(),
5047 gas_used: 0,
5048 blob_gas_used: 0,
5049 },
5050 state: Default::default(),
5051 }),
5052 ComputedTrieData::default(),
5053 );
5054 let provider_rw = factory.provider_rw().unwrap();
5055 provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
5056 provider_rw.commit().unwrap();
5057
5058 let mut blocks: Vec<ExecutedBlock> = Vec::new();
5059 let mut parent_hash = B256::ZERO;
5060
5061 for block_num in 1..=num_blocks {
5062 let mut builder = BundleState::builder(block_num..=block_num);
5063
5064 for acct_idx in 0..accounts_per_block {
5065 let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5066 let info = AccountInfo {
5067 nonce: block_num,
5068 balance: U256::from(block_num * 100 + acct_idx as u64),
5069 ..Default::default()
5070 };
5071
5072 let storage: HashMap<U256, (U256, U256), FbBuildHasher<32>> = (1..=
5073 slots_per_account as u64)
5074 .map(|s| {
5075 (
5076 U256::from(s + acct_idx as u64 * 100),
5077 (U256::ZERO, U256::from(block_num * 1000 + s)),
5078 )
5079 })
5080 .collect();
5081
5082 let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
5083 .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
5084 .collect();
5085
5086 builder = builder
5087 .state_present_account_info(address, info)
5088 .revert_account_info(block_num, address, Some(None))
5089 .state_storage(address, storage)
5090 .revert_storage(block_num, address, revert_storage);
5091 }
5092
5093 let bundle = builder.build();
5094
5095 let hashed_state =
5096 HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5097
5098 let header = Header {
5099 number: block_num,
5100 parent_hash,
5101 difficulty: U256::from(1),
5102 ..Default::default()
5103 };
5104 let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
5105 header,
5106 Default::default(),
5107 );
5108 parent_hash = block.hash();
5109
5110 let executed = ExecutedBlock::new(
5111 Arc::new(block.try_recover().unwrap()),
5112 Arc::new(BlockExecutionOutput {
5113 result: BlockExecutionResult {
5114 receipts: vec![],
5115 requests: Default::default(),
5116 gas_used: 0,
5117 blob_gas_used: 0,
5118 },
5119 state: bundle,
5120 }),
5121 ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
5122 );
5123 blocks.push(executed);
5124 }
5125
5126 let provider_rw = factory.provider_rw().unwrap();
5127 provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
5128 provider_rw.commit().unwrap();
5129
5130 let provider = factory.provider().unwrap();
5131
5132 for block_num in 1..=num_blocks {
5133 for acct_idx in 0..accounts_per_block {
5134 let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5135 let hashed_address = keccak256(address);
5136
5137 let ha_entry = provider
5138 .tx_ref()
5139 .cursor_read::<tables::HashedAccounts>()
5140 .unwrap()
5141 .seek_exact(hashed_address)
5142 .unwrap();
5143 assert!(
5144 ha_entry.is_some(),
5145 "HashedAccounts missing for block {block_num} acct {acct_idx}"
5146 );
5147
5148 for s in 1..=slots_per_account as u64 {
5149 let slot = U256::from(s + acct_idx as u64 * 100);
5150 let slot_key = B256::from(slot);
5151 let hashed_slot = keccak256(slot_key);
5152
5153 let hs_entry = provider
5154 .tx_ref()
5155 .cursor_dup_read::<tables::HashedStorages>()
5156 .unwrap()
5157 .seek_by_key_subkey(hashed_address, hashed_slot)
5158 .unwrap();
5159 assert!(
5160 hs_entry.is_some(),
5161 "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
5162 );
5163 let entry = hs_entry.unwrap();
5164 assert_eq!(entry.key, hashed_slot);
5165 assert_eq!(entry.value, U256::from(block_num * 1000 + s));
5166 }
5167 }
5168 }
5169
5170 for block_num in 1..=num_blocks {
5171 let header = provider.header_by_number(block_num).unwrap();
5172 assert!(header.is_some(), "Header missing for block {block_num}");
5173
5174 let indices = provider.block_body_indices(block_num).unwrap();
5175 assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
5176 }
5177
5178 let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
5179 let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();
5180
5181 if mode == StorageMode::V2 {
5182 assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5183 assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5184
5185 let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5186 assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");
5187
5188 let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5189 assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");
5190
5191 provider.static_file_provider().commit().unwrap();
5192 let sf = factory.static_file_provider();
5193
5194 for block_num in 1..=num_blocks {
5195 let account_cs = sf.account_block_changeset(block_num).unwrap();
5196 assert!(
5197 !account_cs.is_empty(),
5198 "v2: static file AccountChangeSets should exist for block {block_num}"
5199 );
5200
5201 let storage_cs = sf.storage_changeset(block_num).unwrap();
5202 assert!(
5203 !storage_cs.is_empty(),
5204 "v2: static file StorageChangeSets should exist for block {block_num}"
5205 );
5206
5207 for (_, entry) in &storage_cs {
5208 assert!(
5209 entry.key != keccak256(entry.key),
5210 "v2: static file storage changeset should have plain slot keys"
5211 );
5212 }
5213 }
5214
5215 let rocksdb = factory.rocksdb_provider();
5216 for block_num in 1..=num_blocks {
5217 for acct_idx in 0..accounts_per_block {
5218 let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5219 let shards = rocksdb.account_history_shards(address).unwrap();
5220 assert!(
5221 !shards.is_empty(),
5222 "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
5223 );
5224
5225 for s in 1..=slots_per_account as u64 {
5226 let slot = U256::from(s + acct_idx as u64 * 100);
5227 let slot_key = B256::from(slot);
5228 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5229 assert!(
5230 !shards.is_empty(),
5231 "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
5232 );
5233 }
5234 }
5235 }
5236 } else {
5237 assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
5238 assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");
5239
5240 let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5241 assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");
5242
5243 let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5244 assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");
5245
5246 for block_num in 1..=num_blocks {
5247 let storage_entries: Vec<_> = provider
5248 .tx_ref()
5249 .cursor_dup_read::<tables::StorageChangeSets>()
5250 .unwrap()
5251 .walk_range(BlockNumberAddress::range(block_num..=block_num))
5252 .unwrap()
5253 .collect::<Result<Vec<_>, _>>()
5254 .unwrap();
5255 assert!(
5256 !storage_entries.is_empty(),
5257 "v1: MDBX StorageChangeSets should have entries for block {block_num}"
5258 );
5259
5260 for (_, entry) in &storage_entries {
5261 let slot_key = B256::from(entry.key);
5262 assert!(
5263 slot_key != keccak256(slot_key),
5264 "v1: storage changeset keys should be plain (not hashed)"
5265 );
5266 }
5267 }
5268
5269 let mdbx_account_history =
5270 provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
5271 assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");
5272
5273 let mdbx_storage_history =
5274 provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
5275 assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
5276 }
5277 }
5278
/// Runs the shared save-blocks verification with `StorageMode::V1`.
#[test]
fn test_save_blocks_v1_table_assertions() {
    let mode = StorageMode::V1;
    run_save_blocks_and_verify(mode);
}
5283
/// Runs the shared save-blocks verification with `StorageMode::V2`.
#[test]
fn test_save_blocks_v2_table_assertions() {
    let mode = StorageMode::V2;
    run_save_blocks_and_verify(mode);
}
5288
/// Round-trips one account and one storage slot through `write_state` /
/// `write_hashed_state` and then `remove_state_above(0)` under `StorageSettings::v2()`,
/// checking the hashed tables, the plain tables, static-file changesets and the MDBX
/// changeset tables.
#[test]
fn test_write_and_remove_state_roundtrip_v2() {
    let factory = create_test_provider_factory();
    let settings = StorageSettings::v2();
    assert!(settings.use_hashed_state());
    factory.set_storage_settings_cache(settings);

    let address = Address::with_last_byte(1);
    let hashed_address = keccak256(address);
    let slot = U256::from(5);
    let slot_key = B256::from(slot);
    let hashed_slot = keccak256(slot_key);

    // Seed static files: headers 0 and 1, plus empty changesets for block 0.
    {
        let static_files = factory.static_file_provider();

        let mut headers = static_files.latest_writer(StaticFileSegment::Headers).unwrap();
        for number in [0, 1] {
            let header = alloy_consensus::Header { number, ..Default::default() };
            headers.append_header(&header, &B256::ZERO).unwrap();
        }
        headers.commit().unwrap();

        let mut account_changes =
            static_files.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
        account_changes.append_account_changeset(vec![], 0).unwrap();
        account_changes.commit().unwrap();

        let mut storage_changes =
            static_files.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
        storage_changes.append_storage_changeset(vec![], 0).unwrap();
        storage_changes.commit().unwrap();
    }

    // Seed MDBX: empty body indices for blocks 0 and 1 and a pre-existing hashed account.
    {
        let setup = factory.provider_rw().unwrap();
        for number in [0, 1] {
            setup
                .tx
                .put::<tables::BlockBodyIndices>(
                    number,
                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
                )
                .unwrap();
        }
        let initial_account = Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None };
        setup
            .tx
            .cursor_write::<tables::HashedAccounts>()
            .unwrap()
            .upsert(hashed_address, &initial_account)
            .unwrap();
        setup.commit().unwrap();
    }

    let provider_rw = factory.provider_rw().unwrap();

    // Block 1 sets the account info and moves the slot from zero to 10; the recorded
    // revert for the account is `Some(None)`.
    let updated_account =
        AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() };
    let bundle = BundleState::builder(1..=1)
        .state_present_account_info(address, updated_account)
        .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
        .revert_account_info(1, address, Some(None))
        .revert_storage(1, address, vec![(slot, U256::ZERO)])
        .build();

    let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());

    let write_config = StateWriteConfig {
        write_receipts: false,
        write_account_changesets: true,
        write_storage_changesets: true,
    };
    provider_rw.write_state(&execution_outcome, OriginalValuesKnown::Yes, write_config).unwrap();

    let hashed_state =
        HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
    provider_rw.write_hashed_state(&hashed_state).unwrap();

    // --- State after the write ---
    let (_, hashed_account) = provider_rw
        .tx
        .cursor_read::<tables::HashedAccounts>()
        .unwrap()
        .seek_exact(hashed_address)
        .unwrap()
        .unwrap();
    assert_eq!(hashed_account.nonce, 1);

    let hashed_entry = provider_rw
        .tx
        .cursor_dup_read::<tables::HashedStorages>()
        .unwrap()
        .seek_by_key_subkey(hashed_address, hashed_slot)
        .unwrap()
        .unwrap();
    assert_eq!(hashed_entry.key, hashed_slot);
    assert_eq!(hashed_entry.value, U256::from(10));

    let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
    assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");

    let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
    assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");

    provider_rw.static_file_provider().commit().unwrap();

    let static_files = factory.static_file_provider();
    let storage_cs = static_files.storage_changeset(1).unwrap();
    assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
    assert_eq!(storage_cs[0].1.key, slot_key, "v2: changeset key should be plain");

    // --- Remove everything above block 0 and re-check ---
    provider_rw.remove_state_above(0).unwrap();

    let restored_account = provider_rw
        .tx
        .cursor_read::<tables::HashedAccounts>()
        .unwrap()
        .seek_exact(hashed_address)
        .unwrap();
    assert!(
        restored_account.is_none(),
        "v2: account should be removed (didn't exist before block 1)"
    );

    // The seek may land on another subkey; only an exact hit on `hashed_slot` is a failure.
    let storage_gone = provider_rw
        .tx
        .cursor_dup_read::<tables::HashedStorages>()
        .unwrap()
        .seek_by_key_subkey(hashed_address, hashed_slot)
        .unwrap();
    assert!(
        !storage_gone.is_some_and(|entry| entry.key == hashed_slot),
        "v2: storage should be reverted (removed or different key)"
    );

    let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
    assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");

    let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
    assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
}
5441
5442 #[test]
5443 fn test_unwind_storage_history_indices_v2() {
5444 let factory = create_test_provider_factory();
5445 factory.set_storage_settings_cache(StorageSettings::v2());
5446
5447 let address = Address::with_last_byte(1);
5448 let slot_key = B256::from(U256::from(42));
5449
5450 {
5451 let rocksdb = factory.rocksdb_provider();
5452 let mut batch = rocksdb.batch();
5453 batch.append_storage_history_shard(address, slot_key, vec![3u64, 7, 10]).unwrap();
5454 batch.commit().unwrap();
5455
5456 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5457 assert!(!shards.is_empty(), "history should be written to rocksdb");
5458 }
5459
5460 let provider_rw = factory.provider_rw().unwrap();
5461
5462 let changesets = vec![
5463 (
5464 BlockNumberAddress((7, address)),
5465 StorageEntry { key: slot_key, value: U256::from(5) },
5466 ),
5467 (
5468 BlockNumberAddress((10, address)),
5469 StorageEntry { key: slot_key, value: U256::from(8) },
5470 ),
5471 ];
5472
5473 let count = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
5474 assert_eq!(count, 2);
5475
5476 provider_rw.commit().unwrap();
5477
5478 let rocksdb = factory.rocksdb_provider();
5479 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5480
5481 assert!(
5482 !shards.is_empty(),
5483 "history shards should still exist with block 3 after partial unwind"
5484 );
5485
5486 let all_blocks: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
5487 assert!(all_blocks.contains(&3), "block 3 should remain");
5488 assert!(!all_blocks.contains(&7), "block 7 should be unwound");
5489 assert!(!all_blocks.contains(&10), "block 10 should be unwound");
5490 }
5491}