1use crate::{
2 changesets_utils::StorageRevertsIter,
3 providers::{
4 database::{chain::ChainStorage, metrics},
5 rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6 static_file::{StaticFileWriteCtx, StaticFileWriter},
7 NodeTypesForProvider, StaticFileProvider,
8 },
9 to_range,
10 traits::{
11 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12 },
13 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15 DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16 HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17 LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18 PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19 RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20 StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21 TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25 BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29 keccak256,
30 map::{hash_map, AddressSet, B256Map, HashMap},
31 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40 database::Database,
41 models::{
42 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43 BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44 StoredBlockBodyIndices,
45 },
46 table::Table,
47 tables,
48 transaction::{DbTx, DbTxMut},
49 BlockNumberList, PlainAccountState, PlainStorageState,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54 Account, Block as _, BlockBody as _, Bytecode, FastInstant as Instant, RecoveredBlock,
55 SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58 PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63 BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64 NodePrimitivesProvider, StateProvider, StateWriteConfig, StorageChangeSetReader, StoragePath,
65 StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70 HashedPostStateSorted,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
73use revm_database::states::{
74 PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use std::{
77 cmp::Ordering,
78 collections::{BTreeMap, BTreeSet},
79 fmt::Debug,
80 ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
81 path::PathBuf,
82 sync::Arc,
83};
84use tracing::{debug, instrument, trace};
85
/// Ordering mode a [`DatabaseProvider`] is constructed with.
///
/// [`DatabaseProvider::new`] and [`DatabaseProvider::new_rw`] use [`CommitOrder::Normal`],
/// while [`DatabaseProvider::new_unwind_rw`] uses [`CommitOrder::Unwind`]. The code that reads
/// this value is not part of this section of the file.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
    /// The default mode.
    #[default]
    Normal,
    /// The mode selected for providers created through the unwind constructor.
    Unwind,
}

impl CommitOrder {
    /// Returns `true` only for [`CommitOrder::Unwind`].
    pub const fn is_unwind(&self) -> bool {
        match self {
            Self::Unwind => true,
            Self::Normal => false,
        }
    }
}
103
/// A [`DatabaseProvider`] parameterised with the database's `TX` transaction type
/// (presumably the read-only transaction, going by the `RO` suffix — confirm against
/// the `Database` trait).
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
106
/// Newtype around a [`DatabaseProvider`] that uses the database's `TXMut` transaction type.
///
/// The wrapper dereferences to the inner provider (`Deref`/`DerefMut`), exposes it through
/// `AsRef`, and can be converted back into it with `From`.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    /// The wrapped provider.
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
115
116impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
117 type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
118
119 fn deref(&self) -> &Self::Target {
120 &self.0
121 }
122}
123
124impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
125 fn deref_mut(&mut self) -> &mut Self::Target {
126 &mut self.0
127 }
128}
129
130impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
131 for DatabaseProviderRW<DB, N>
132{
133 fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
134 &self.0
135 }
136}
137
138impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
139 pub fn commit(self) -> ProviderResult<()> {
141 self.0.commit()
142 }
143
144 pub fn into_tx(self) -> <DB as Database>::TXMut {
146 self.0.into_tx()
147 }
148
149 #[cfg(any(test, feature = "test-utils"))]
151 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
152 self.0.minimum_pruning_distance = distance;
153 self
154 }
155}
156
157impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
158 for DatabaseProvider<<DB as Database>::TXMut, N>
159{
160 fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
161 provider.0
162 }
163}
164
/// Selects how much data [`DatabaseProvider::save_blocks`] persists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SaveBlocksMode {
    /// Persist block data together with execution state; `with_state()` is `true`.
    Full,
    /// Persist block data only; every step gated on `with_state()` is skipped.
    BlocksOnly,
}

impl SaveBlocksMode {
    /// Returns `true` when state-related data should be written, i.e. for [`Self::Full`].
    pub const fn with_state(self) -> bool {
        match self {
            Self::Full => true,
            Self::BlocksOnly => false,
        }
    }
}
183
/// Provider that bundles a database transaction with the auxiliary storage handles
/// (static files, RocksDB) and configuration used by the methods in this file.
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// The database transaction all table reads/writes go through.
    tx: TX,
    /// Chain specification, shared behind an `Arc`.
    chain_spec: Arc<N::ChainSpec>,
    /// Handle to the static file provider.
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration consulted when deciding what to write.
    prune_modes: PruneModes,
    /// Node-specific storage; its `reader()`/`writer()` are used for block bodies.
    storage: Arc<N::Storage>,
    /// Shared, lock-protected storage settings.
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// Handle to the RocksDB provider.
    rocksdb_provider: RocksDBProvider,
    /// Cache used to obtain trie reverts in `unwind_trie_state_from`.
    changeset_cache: ChangesetCache,
    /// Task runtime; `save_blocks` uses its storage pool for scoped tasks.
    runtime: reth_tasks::Runtime,
    /// Database path (its use is outside this section of the file).
    db_path: PathBuf,
    /// RocksDB batches queued by `set_pending_rocksdb_batch` and drained by
    /// `commit_pending_rocksdb_batches`.
    #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
    pending_rocksdb_batches: PendingRocksDBBatches,
    /// Ordering mode chosen by the constructor (`Normal` or `Unwind`).
    commit_order: CommitOrder,
    /// Distance passed to `PruneMode::Distance` when computing `receipts_prunable`;
    /// initialised to `MINIMUM_UNWIND_SAFE_DISTANCE`.
    minimum_pruning_distance: u64,
    /// Metrics recorder for provider operations.
    metrics: metrics::DatabaseProviderMetrics,
}
217
218impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
219 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220 let mut s = f.debug_struct("DatabaseProvider");
221 s.field("tx", &self.tx)
222 .field("chain_spec", &self.chain_spec)
223 .field("static_file_provider", &self.static_file_provider)
224 .field("prune_modes", &self.prune_modes)
225 .field("storage", &self.storage)
226 .field("storage_settings", &self.storage_settings)
227 .field("rocksdb_provider", &self.rocksdb_provider)
228 .field("changeset_cache", &self.changeset_cache)
229 .field("runtime", &self.runtime)
230 .field("pending_rocksdb_batches", &"<pending batches>")
231 .field("commit_order", &self.commit_order)
232 .field("minimum_pruning_distance", &self.minimum_pruning_distance)
233 .finish()
234 }
235}
236
237impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
238 pub const fn prune_modes_ref(&self) -> &PruneModes {
240 &self.prune_modes
241 }
242}
243
244impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
245 pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
247 trace!(target: "providers::db", "Returning latest state provider");
248 Box::new(LatestStateProviderRef::new(self))
249 }
250
251 pub fn history_by_block_hash<'a>(
253 &'a self,
254 block_hash: BlockHash,
255 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
256 let mut block_number =
257 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
258 if block_number == self.best_block_number().unwrap_or_default() &&
259 block_number == self.last_block_number().unwrap_or_default()
260 {
261 return Ok(Box::new(LatestStateProviderRef::new(self)))
262 }
263
264 block_number += 1;
266
267 let account_history_prune_checkpoint =
268 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
269 let storage_history_prune_checkpoint =
270 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
271
272 let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
273
274 if let Some(prune_checkpoint_block_number) =
277 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
278 {
279 state_provider = state_provider.with_lowest_available_account_history_block_number(
280 prune_checkpoint_block_number + 1,
281 );
282 }
283 if let Some(prune_checkpoint_block_number) =
284 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
285 {
286 state_provider = state_provider.with_lowest_available_storage_history_block_number(
287 prune_checkpoint_block_number + 1,
288 );
289 }
290
291 Ok(Box::new(state_provider))
292 }
293
294 #[cfg(feature = "test-utils")]
295 pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
297 self.prune_modes = prune_modes;
298 }
299}
300
/// Exposes the node's primitive types through the provider.
impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    // Forwarded unchanged from the node type configuration.
    type Primitives = N::Primitives;
}
304
305impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
306 fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
308 self.static_file_provider.clone()
309 }
310
311 fn get_static_file_writer(
312 &self,
313 block: BlockNumber,
314 segment: StaticFileSegment,
315 ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
316 self.static_file_provider.get_writer(block, segment)
317 }
318}
319
320impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
321 fn rocksdb_provider(&self) -> RocksDBProvider {
323 self.rocksdb_provider.clone()
324 }
325
326 #[cfg(all(unix, feature = "rocksdb"))]
327 fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
328 self.pending_rocksdb_batches.lock().push(batch);
329 }
330
331 #[cfg(all(unix, feature = "rocksdb"))]
332 fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
333 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
334 for batch in batches {
335 self.rocksdb_provider.commit_batch(batch)?;
336 }
337 Ok(())
338 }
339}
340
341impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
342 for DatabaseProvider<TX, N>
343{
344 type ChainSpec = N::ChainSpec;
345
346 fn chain_spec(&self) -> Arc<Self::ChainSpec> {
347 self.chain_spec.clone()
348 }
349}
350
351impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
352 #[allow(clippy::too_many_arguments)]
354 fn new_rw_inner(
355 tx: TX,
356 chain_spec: Arc<N::ChainSpec>,
357 static_file_provider: StaticFileProvider<N::Primitives>,
358 prune_modes: PruneModes,
359 storage: Arc<N::Storage>,
360 storage_settings: Arc<RwLock<StorageSettings>>,
361 rocksdb_provider: RocksDBProvider,
362 changeset_cache: ChangesetCache,
363 runtime: reth_tasks::Runtime,
364 db_path: PathBuf,
365 commit_order: CommitOrder,
366 ) -> Self {
367 Self {
368 tx,
369 chain_spec,
370 static_file_provider,
371 prune_modes,
372 storage,
373 storage_settings,
374 rocksdb_provider,
375 changeset_cache,
376 runtime,
377 db_path,
378 pending_rocksdb_batches: Default::default(),
379 commit_order,
380 minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
381 metrics: metrics::DatabaseProviderMetrics::default(),
382 }
383 }
384
385 #[allow(clippy::too_many_arguments)]
387 pub fn new_rw(
388 tx: TX,
389 chain_spec: Arc<N::ChainSpec>,
390 static_file_provider: StaticFileProvider<N::Primitives>,
391 prune_modes: PruneModes,
392 storage: Arc<N::Storage>,
393 storage_settings: Arc<RwLock<StorageSettings>>,
394 rocksdb_provider: RocksDBProvider,
395 changeset_cache: ChangesetCache,
396 runtime: reth_tasks::Runtime,
397 db_path: PathBuf,
398 ) -> Self {
399 Self::new_rw_inner(
400 tx,
401 chain_spec,
402 static_file_provider,
403 prune_modes,
404 storage,
405 storage_settings,
406 rocksdb_provider,
407 changeset_cache,
408 runtime,
409 db_path,
410 CommitOrder::Normal,
411 )
412 }
413
414 #[allow(clippy::too_many_arguments)]
416 pub fn new_unwind_rw(
417 tx: TX,
418 chain_spec: Arc<N::ChainSpec>,
419 static_file_provider: StaticFileProvider<N::Primitives>,
420 prune_modes: PruneModes,
421 storage: Arc<N::Storage>,
422 storage_settings: Arc<RwLock<StorageSettings>>,
423 rocksdb_provider: RocksDBProvider,
424 changeset_cache: ChangesetCache,
425 runtime: reth_tasks::Runtime,
426 db_path: PathBuf,
427 ) -> Self {
428 Self::new_rw_inner(
429 tx,
430 chain_spec,
431 static_file_provider,
432 prune_modes,
433 storage,
434 storage_settings,
435 rocksdb_provider,
436 changeset_cache,
437 runtime,
438 db_path,
439 CommitOrder::Unwind,
440 )
441 }
442}
443
/// Identity `AsRef`, so a `DatabaseProvider` can be passed where `AsRef<DatabaseProvider>`
/// is expected.
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    /// Returns `self` unchanged.
    fn as_ref(&self) -> &Self {
        self
    }
}
449
450impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Runs `f` with a RocksDB batch argument and queues the raw batch it returns, if any.
    ///
    /// On unix with the `rocksdb` feature, `f` receives a fresh batch created from this
    /// provider's RocksDB handle, and a returned `Some(batch)` is stored through
    /// `set_pending_rocksdb_batch`. In every other configuration `f` receives `()` and the
    /// returned raw batch is discarded.
    ///
    /// # Errors
    /// Propagates the error returned by `f`.
    pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
    where
        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
    {
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb = self.rocksdb_provider();
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_batch = rocksdb.batch();
        #[cfg(not(all(unix, feature = "rocksdb")))]
        let rocksdb_batch = ();

        let (result, raw_batch) = f(rocksdb_batch)?;

        #[cfg(all(unix, feature = "rocksdb"))]
        if let Some(batch) = raw_batch {
            self.set_pending_rocksdb_batch(batch);
        }
        // Keeps `raw_batch` referenced when the cfg-gated block above is compiled out.
        let _ = raw_batch;

        Ok(result)
    }
475
476 fn static_file_write_ctx(
478 &self,
479 save_mode: SaveBlocksMode,
480 first_block: BlockNumber,
481 last_block: BlockNumber,
482 ) -> ProviderResult<StaticFileWriteCtx> {
483 let tip = self.last_block_number()?.max(last_block);
484 Ok(StaticFileWriteCtx {
485 write_senders: EitherWriterDestination::senders(self).is_static_file() &&
486 self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
487 write_receipts: save_mode.with_state() &&
488 EitherWriter::receipts_destination(self).is_static_file(),
489 write_account_changesets: save_mode.with_state() &&
490 EitherWriterDestination::account_changesets(self).is_static_file(),
491 write_storage_changesets: save_mode.with_state() &&
492 EitherWriterDestination::storage_changesets(self).is_static_file(),
493 tip,
494 receipts_prune_mode: self.prune_modes.receipts,
495 receipts_prunable: self
497 .static_file_provider
498 .get_highest_static_file_tx(StaticFileSegment::Receipts)
499 .is_none() &&
500 PruneMode::Distance(self.minimum_pruning_distance)
501 .should_prune(first_block, tip),
502 })
503 }
504
505 #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
507 fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
508 RocksDBWriteCtx {
509 first_block_number: first_block,
510 prune_tx_lookup: self.prune_modes.transaction_lookup,
511 storage_settings: self.cached_storage_settings(),
512 pending_batches: self.pending_rocksdb_batches.clone(),
513 }
514 }
515
    /// Persists a batch of executed blocks across the provider's storage backends.
    ///
    /// Static-file data is written on a task spawned into the runtime's storage pool scope;
    /// with RocksDB enabled (`storage_v2`, unix + `rocksdb` feature) a second task writes the
    /// RocksDB data. The remaining database writes run in the scope closure itself:
    /// transaction hash numbers (when not `storage_v2` and lookup is not fully pruned), block
    /// indices, and — only when `save_mode.with_state()` — execution state, merged hashed
    /// state, merged trie updates and history indices. Pipeline stage checkpoints are updated
    /// last. Timings for every step are recorded in the provider metrics.
    ///
    /// An empty `blocks` vector is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    /// Propagates errors from any of the writers. If a spawned task left no result behind,
    /// the static-file case yields `StaticFileWriterError::ThreadPanic` and the RocksDB case a
    /// `DatabaseError::Other("RocksDB thread panicked")`.
    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlock<N::Primitives>>,
        save_mode: SaveBlocksMode,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to write empty block range");
            return Ok(())
        }

        let total_start = Instant::now();
        let block_count = blocks.len() as u64;
        // `blocks` is non-empty here, so `first()`/`last()` cannot be `None`.
        let first_number = blocks.first().unwrap().recovered_block().number();
        let last_block_number = blocks.last().unwrap().recovered_block().number();

        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");

        // Next free transaction number: one past the last key of `TransactionBlocks`, or 0
        // when the table is empty.
        let first_tx_num = self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .last()?
            .map(|(n, _)| n + 1)
            .unwrap_or_default();

        // First transaction number of every block, computed up front because all writers
        // below receive it.
        let tx_nums: Vec<TxNumber> = {
            let mut nums = Vec::with_capacity(blocks.len());
            let mut current = first_tx_num;
            for block in &blocks {
                nums.push(current);
                current += block.recovered_block().body().transaction_count() as u64;
            }
            nums
        };

        let mut timings = metrics::SaveBlocksTimings { block_count, ..Default::default() };

        let sf_provider = &self.static_file_provider;
        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_provider = self.rocksdb_provider.clone();
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;

        // Filled in by the spawned tasks; still `None` afterwards if a task never stored
        // its result.
        let mut sf_result = None;
        #[cfg(all(unix, feature = "rocksdb"))]
        let mut rocksdb_result = None;

        let runtime = &self.runtime;
        // Captured so the spawned tasks can enter the caller's tracing span.
        let span = tracing::Span::current();
        runtime.storage_pool().in_place_scope(|s| {
            // Static file writes.
            s.spawn(|_| {
                let _guard = span.enter();
                let start = Instant::now();
                sf_result = Some(
                    sf_provider
                        .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
                        .map(|()| start.elapsed()),
                );
            });

            // RocksDB writes, only when `storage_v2` is set.
            #[cfg(all(unix, feature = "rocksdb"))]
            if rocksdb_enabled {
                s.spawn(|_| {
                    let _guard = span.enter();
                    let start = Instant::now();
                    rocksdb_result = Some(
                        rocksdb_provider
                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
                            .map(|()| start.elapsed()),
                    );
                });
            }

            // Everything below runs in the scope closure and is timed as `timings.mdbx`.
            let mdbx_start = Instant::now();

            // Transaction hash -> number entries are written here only when `storage_v2` is
            // off and transaction lookup is not fully pruned.
            if !self.cached_storage_settings().storage_v2 &&
                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
            {
                let start = Instant::now();
                let total_tx_count: usize =
                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
                for (i, block) in blocks.iter().enumerate() {
                    let recovered_block = block.recovered_block();
                    for (tx_num, transaction) in
                        (tx_nums[i]..).zip(recovered_block.body().transactions_iter())
                    {
                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
                    }
                }

                // Hashes are sorted before the batch write (presumably so keys are inserted
                // in table order — confirm against the writer).
                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);

                self.with_rocksdb_batch(|batch| {
                    let mut tx_hash_writer =
                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
                    Ok(((), raw_batch))
                })?;
                self.metrics.record_duration(
                    metrics::Action::InsertTransactionHashNumbers,
                    start.elapsed(),
                );
            }

            for (i, block) in blocks.iter().enumerate() {
                let recovered_block = block.recovered_block();

                let start = Instant::now();
                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
                timings.insert_block += start.elapsed();

                if save_mode.with_state() {
                    let execution_output = block.execution_outcome();

                    let start = Instant::now();
                    // Anything the static-file task already writes (per `sf_ctx`) is skipped
                    // here, hence the negated flags.
                    self.write_state(
                        WriteStateInput::Single {
                            outcome: execution_output,
                            block: recovered_block.number(),
                        },
                        OriginalValuesKnown::No,
                        StateWriteConfig {
                            write_receipts: !sf_ctx.write_receipts,
                            write_account_changesets: !sf_ctx.write_account_changesets,
                            write_storage_changesets: !sf_ctx.write_storage_changesets,
                        },
                    )?;
                    timings.write_state += start.elapsed();
                }
            }

            if save_mode.with_state() {
                // Hashed state and trie updates of all blocks are merged and written once.
                // Blocks are passed newest-first; NOTE(review): `merge_batch` presumably
                // expects that order — confirm.
                let start = Instant::now();
                let merged_hashed_state = HashedPostStateSorted::merge_batch(
                    blocks.iter().rev().map(|b| b.trie_data().hashed_state),
                );
                if !merged_hashed_state.is_empty() {
                    self.write_hashed_state(&merged_hashed_state)?;
                }
                timings.write_hashed_state += start.elapsed();

                let start = Instant::now();
                let merged_trie =
                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
                if !merged_trie.is_empty() {
                    self.write_trie_updates_sorted(&merged_trie)?;
                }
                timings.write_trie_updates += start.elapsed();
            }

            if save_mode.with_state() {
                // History indices are updated once for the whole block range.
                let start = Instant::now();
                self.update_history_indices(first_number..=last_block_number)?;
                timings.update_history_indices = start.elapsed();
            }

            // Stage checkpoints are moved to the last written block.
            let start = Instant::now();
            self.update_pipeline_stages(last_block_number, false)?;
            timings.update_pipeline_stages = start.elapsed();

            timings.mdbx = mdbx_start.elapsed();

            Ok::<_, ProviderError>(())
        })?;

        // First `?` handles the missing-result case, the second the writer's own error.
        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;

        #[cfg(all(unix, feature = "rocksdb"))]
        if rocksdb_enabled {
            timings.rocksdb = rocksdb_result.ok_or_else(|| {
                ProviderError::Database(reth_db_api::DatabaseError::Other(
                    "RocksDB thread panicked".into(),
                ))
            })??;
        }

        timings.total = total_start.elapsed();

        self.metrics.record_save_blocks(&timings);
        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");

        Ok(())
    }
732
733 #[instrument(level = "debug", target = "providers::db", skip_all)]
737 fn insert_block_mdbx_only(
738 &self,
739 block: &RecoveredBlock<BlockTy<N>>,
740 first_tx_num: TxNumber,
741 ) -> ProviderResult<StoredBlockBodyIndices> {
742 if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
743 EitherWriterDestination::senders(self).is_database()
744 {
745 let start = Instant::now();
746 let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
747 let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
748 for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
749 cursor.append(tx_num, &sender)?;
750 }
751 self.metrics
752 .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
753 }
754
755 let block_number = block.number();
756 let tx_count = block.body().transaction_count() as u64;
757
758 let start = Instant::now();
759 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
760 self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
761
762 self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
763
764 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
765 }
766
767 fn write_block_body_indices(
770 &self,
771 block_number: BlockNumber,
772 body: &BodyTy<N>,
773 first_tx_num: TxNumber,
774 tx_count: u64,
775 ) -> ProviderResult<()> {
776 let start = Instant::now();
778 self.tx
779 .cursor_write::<tables::BlockBodyIndices>()?
780 .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
781 self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
782
783 if tx_count > 0 {
785 let start = Instant::now();
786 self.tx
787 .cursor_write::<tables::TransactionBlocks>()?
788 .append(first_tx_num + tx_count - 1, &block_number)?;
789 self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
790 }
791
792 self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
794
795 Ok(())
796 }
797
798 pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
803 let changed_accounts = self.account_changesets_range(from..)?;
804
805 self.unwind_account_hashing(changed_accounts.iter())?;
807
808 self.unwind_account_history_indices(changed_accounts.iter())?;
810
811 let changed_storages = self.storage_changesets_range(from..)?;
812
813 self.unwind_storage_hashing(changed_storages.iter().copied())?;
815
816 self.unwind_storage_history_indices(changed_storages.iter().copied())?;
818
819 let db_tip_block = self
822 .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
823 .as_ref()
824 .map(|chk| chk.block_number)
825 .ok_or_else(|| ProviderError::InsufficientChangesets {
826 requested: from,
827 available: 0..=0,
828 })?;
829
830 let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
831 self.write_trie_updates_sorted(&trie_revert)?;
832
833 Ok(())
834 }
835
836 fn remove_receipts_from(
838 &self,
839 from_tx: TxNumber,
840 last_block: BlockNumber,
841 ) -> ProviderResult<()> {
842 self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
844
845 if EitherWriter::receipts_destination(self).is_static_file() {
846 let static_file_receipt_num =
847 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
848
849 let to_delete = static_file_receipt_num
850 .map(|static_num| (static_num + 1).saturating_sub(from_tx))
851 .unwrap_or_default();
852
853 self.static_file_provider
854 .latest_writer(StaticFileSegment::Receipts)?
855 .prune_receipts(to_delete, last_block)?;
856 }
857
858 Ok(())
859 }
860
861 fn write_bytecodes(
863 &self,
864 bytecodes: impl IntoIterator<Item = (B256, Bytecode)>,
865 ) -> ProviderResult<()> {
866 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
867 for (hash, bytecode) in bytecodes {
868 bytecodes_cursor.upsert(hash, &bytecode)?;
869 }
870 Ok(())
871 }
872}
873
874impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
875 fn try_into_history_at_block(
876 self,
877 mut block_number: BlockNumber,
878 ) -> ProviderResult<StateProviderBox> {
879 let best_block = self.best_block_number().unwrap_or_default();
880
881 if block_number > best_block {
883 return Err(ProviderError::BlockNotExecuted {
884 requested: block_number,
885 executed: best_block,
886 });
887 }
888
889 if block_number == best_block {
891 return Ok(Box::new(LatestStateProvider::new(self)));
892 }
893
894 block_number += 1;
896
897 let account_history_prune_checkpoint =
898 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
899 let storage_history_prune_checkpoint =
900 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
901
902 let mut state_provider = HistoricalStateProvider::new(self, block_number);
903
904 if let Some(prune_checkpoint_block_number) =
907 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
908 {
909 state_provider = state_provider.with_lowest_available_account_history_block_number(
910 prune_checkpoint_block_number + 1,
911 );
912 }
913 if let Some(prune_checkpoint_block_number) =
914 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
915 {
916 state_provider = state_provider.with_lowest_available_storage_history_block_number(
917 prune_checkpoint_block_number + 1,
918 );
919 }
920
921 Ok(Box::new(state_provider))
922 }
923}
924
925fn unwind_history_shards<S, T, C>(
940 cursor: &mut C,
941 start_key: T::Key,
942 block_number: BlockNumber,
943 mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
944) -> ProviderResult<Vec<u64>>
945where
946 T: Table<Value = BlockNumberList>,
947 T::Key: AsRef<ShardedKey<S>>,
948 C: DbCursorRO<T> + DbCursorRW<T>,
949{
950 let mut item = cursor.seek_exact(start_key)?;
952 while let Some((sharded_key, list)) = item {
953 if !shard_belongs_to_key(&sharded_key) {
955 break
956 }
957
958 cursor.delete_current()?;
961
962 let first = list.iter().next().expect("List can't be empty");
965
966 if first >= block_number {
969 item = cursor.prev()?;
970 continue
971 }
972 else if block_number <= sharded_key.as_ref().highest_block_number {
975 return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
978 }
979 return Ok(list.iter().collect::<Vec<_>>())
982 }
983
984 Ok(Vec::new())
986}
987
988impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
989 #[allow(clippy::too_many_arguments)]
991 pub fn new(
992 tx: TX,
993 chain_spec: Arc<N::ChainSpec>,
994 static_file_provider: StaticFileProvider<N::Primitives>,
995 prune_modes: PruneModes,
996 storage: Arc<N::Storage>,
997 storage_settings: Arc<RwLock<StorageSettings>>,
998 rocksdb_provider: RocksDBProvider,
999 changeset_cache: ChangesetCache,
1000 runtime: reth_tasks::Runtime,
1001 db_path: PathBuf,
1002 ) -> Self {
1003 Self {
1004 tx,
1005 chain_spec,
1006 static_file_provider,
1007 prune_modes,
1008 storage,
1009 storage_settings,
1010 rocksdb_provider,
1011 changeset_cache,
1012 runtime,
1013 db_path,
1014 pending_rocksdb_batches: Default::default(),
1015 commit_order: CommitOrder::Normal,
1016 minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
1017 metrics: metrics::DatabaseProviderMetrics::default(),
1018 }
1019 }
1020
1021 pub fn into_tx(self) -> TX {
1023 self.tx
1024 }
1025
1026 pub const fn tx_mut(&mut self) -> &mut TX {
1028 &mut self.tx
1029 }
1030
1031 pub const fn tx_ref(&self) -> &TX {
1033 &self.tx
1034 }
1035
1036 pub fn chain_spec(&self) -> &N::ChainSpec {
1038 &self.chain_spec
1039 }
1040}
1041
1042impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
1043 fn recovered_block<H, HF, B, BF>(
1044 &self,
1045 id: BlockHashOrNumber,
1046 _transaction_kind: TransactionVariant,
1047 header_by_number: HF,
1048 construct_block: BF,
1049 ) -> ProviderResult<Option<B>>
1050 where
1051 H: AsRef<HeaderTy<N>>,
1052 HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
1053 BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
1054 {
1055 let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
1056 let Some(header) = header_by_number(block_number)? else { return Ok(None) };
1057
1058 let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
1065
1066 let tx_range = body.tx_num_range();
1067
1068 let transactions = if tx_range.is_empty() {
1069 vec![]
1070 } else {
1071 self.transactions_by_tx_range(tx_range.clone())?
1072 };
1073
1074 let body = self
1075 .storage
1076 .reader()
1077 .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
1078 .pop()
1079 .ok_or(ProviderError::InvalidStorageOutput)?;
1080
1081 let senders = if tx_range.is_empty() {
1082 vec![]
1083 } else {
1084 let known_senders: HashMap<TxNumber, Address> =
1085 EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1086
1087 let mut senders = Vec::with_capacity(body.transactions().len());
1088 for (tx_num, tx) in tx_range.zip(body.transactions()) {
1089 match known_senders.get(&tx_num) {
1090 None => {
1091 let sender = tx.recover_signer_unchecked()?;
1092 senders.push(sender);
1093 }
1094 Some(sender) => senders.push(*sender),
1095 }
1096 }
1097 senders
1098 };
1099
1100 construct_block(header, body, senders)
1101 }
1102
1103 fn block_range<F, H, HF, R>(
1113 &self,
1114 range: RangeInclusive<BlockNumber>,
1115 headers_range: HF,
1116 mut assemble_block: F,
1117 ) -> ProviderResult<Vec<R>>
1118 where
1119 H: AsRef<HeaderTy<N>>,
1120 HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1121 F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
1122 {
1123 if range.is_empty() {
1124 return Ok(Vec::new())
1125 }
1126
1127 let len = range.end().saturating_sub(*range.start()) as usize;
1128 let mut blocks = Vec::with_capacity(len);
1129
1130 let headers = headers_range(range.clone())?;
1131
1132 let present_headers = self
1138 .block_body_indices_range(range)?
1139 .into_iter()
1140 .map(|b| b.tx_num_range())
1141 .zip(headers)
1142 .collect::<Vec<_>>();
1143
1144 let mut inputs = Vec::with_capacity(present_headers.len());
1145 for (tx_range, header) in &present_headers {
1146 let transactions = if tx_range.is_empty() {
1147 Vec::new()
1148 } else {
1149 self.transactions_by_tx_range(tx_range.clone())?
1150 };
1151
1152 inputs.push((header.as_ref(), transactions));
1153 }
1154
1155 let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
1156
1157 for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
1158 blocks.push(assemble_block(header, body, tx_range)?);
1159 }
1160
1161 Ok(blocks)
1162 }
1163
1164 fn block_with_senders_range<H, HF, B, BF>(
1175 &self,
1176 range: RangeInclusive<BlockNumber>,
1177 headers_range: HF,
1178 assemble_block: BF,
1179 ) -> ProviderResult<Vec<B>>
1180 where
1181 H: AsRef<HeaderTy<N>>,
1182 HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1183 BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
1184 {
1185 self.block_range(range, headers_range, |header, body, tx_range| {
1186 let senders = if tx_range.is_empty() {
1187 Vec::new()
1188 } else {
1189 let known_senders: HashMap<TxNumber, Address> =
1190 EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1191
1192 let mut senders = Vec::with_capacity(body.transactions().len());
1193 for (tx_num, tx) in tx_range.zip(body.transactions()) {
1194 match known_senders.get(&tx_num) {
1195 None => {
1196 let sender = tx.recover_signer_unchecked()?;
1198 senders.push(sender);
1199 }
1200 Some(sender) => senders.push(*sender),
1201 }
1202 }
1203
1204 senders
1205 };
1206
1207 assemble_block(header, body, senders)
1208 })
1209 }
1210
1211 fn populate_bundle_state<A, S>(
1215 &self,
1216 account_changeset: Vec<(u64, AccountBeforeTx)>,
1217 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1218 plain_accounts_cursor: &mut A,
1219 plain_storage_cursor: &mut S,
1220 ) -> ProviderResult<(BundleStateInit, RevertsInit)>
1221 where
1222 A: DbCursorRO<PlainAccountState>,
1223 S: DbDupCursorRO<PlainStorageState>,
1224 {
1225 let mut state: BundleStateInit = HashMap::default();
1229
1230 let mut reverts: RevertsInit = HashMap::default();
1236
1237 for (block_number, account_before) in account_changeset.into_iter().rev() {
1239 let AccountBeforeTx { info: old_info, address } = account_before;
1240 match state.entry(address) {
1241 hash_map::Entry::Vacant(entry) => {
1242 let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
1243 entry.insert((old_info, new_info, HashMap::default()));
1244 }
1245 hash_map::Entry::Occupied(mut entry) => {
1246 entry.get_mut().0 = old_info;
1248 }
1249 }
1250 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
1252 }
1253
1254 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
1256 let BlockNumberAddress((block_number, address)) = block_and_address;
1257 let account_state = match state.entry(address) {
1259 hash_map::Entry::Vacant(entry) => {
1260 let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
1261 entry.insert((present_info, present_info, HashMap::default()))
1262 }
1263 hash_map::Entry::Occupied(entry) => entry.into_mut(),
1264 };
1265
1266 match account_state.2.entry(old_storage.key) {
1268 hash_map::Entry::Vacant(entry) => {
1269 let new_storage = plain_storage_cursor
1270 .seek_by_key_subkey(address, old_storage.key)?
1271 .filter(|storage| storage.key == old_storage.key)
1272 .unwrap_or_default();
1273 entry.insert((old_storage.value, new_storage.value));
1274 }
1275 hash_map::Entry::Occupied(mut entry) => {
1276 entry.get_mut().0 = old_storage.value;
1277 }
1278 };
1279
1280 reverts
1281 .entry(block_number)
1282 .or_default()
1283 .entry(address)
1284 .or_default()
1285 .1
1286 .push(old_storage);
1287 }
1288
1289 Ok((state, reverts))
1290 }
1291
1292 fn populate_bundle_state_hashed(
1297 &self,
1298 account_changeset: Vec<(u64, AccountBeforeTx)>,
1299 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1300 hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
1301 hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
1302 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1303 let mut state: BundleStateInit = HashMap::default();
1304 let mut reverts: RevertsInit = HashMap::default();
1305
1306 for (block_number, account_before) in account_changeset.into_iter().rev() {
1308 let AccountBeforeTx { info: old_info, address } = account_before;
1309 match state.entry(address) {
1310 hash_map::Entry::Vacant(entry) => {
1311 let hashed_address = keccak256(address);
1312 let new_info =
1313 hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
1314 entry.insert((old_info, new_info, HashMap::default()));
1315 }
1316 hash_map::Entry::Occupied(mut entry) => {
1317 entry.get_mut().0 = old_info;
1318 }
1319 }
1320 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
1321 }
1322
1323 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
1325 let BlockNumberAddress((block_number, address)) = block_and_address;
1326 let account_state = match state.entry(address) {
1327 hash_map::Entry::Vacant(entry) => {
1328 let hashed_address = keccak256(address);
1329 let present_info =
1330 hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
1331 entry.insert((present_info, present_info, HashMap::default()))
1332 }
1333 hash_map::Entry::Occupied(entry) => entry.into_mut(),
1334 };
1335
1336 let hashed_storage_key = keccak256(old_storage.key);
1338 match account_state.2.entry(old_storage.key) {
1339 hash_map::Entry::Vacant(entry) => {
1340 let hashed_address = keccak256(address);
1341 let new_storage = hashed_storage_cursor
1342 .seek_by_key_subkey(hashed_address, hashed_storage_key)?
1343 .filter(|storage| storage.key == hashed_storage_key)
1344 .unwrap_or_default();
1345 entry.insert((old_storage.value, new_storage.value));
1346 }
1347 hash_map::Entry::Occupied(mut entry) => {
1348 entry.get_mut().0 = old_storage.value;
1349 }
1350 };
1351
1352 reverts
1353 .entry(block_number)
1354 .or_default()
1355 .entry(address)
1356 .or_default()
1357 .1
1358 .push(old_storage);
1359 }
1360
1361 Ok((state, reverts))
1362 }
1363}
1364
1365impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
1366 fn append_history_index<P, T>(
1374 &self,
1375 index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
1376 mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
1377 ) -> ProviderResult<()>
1378 where
1379 P: Copy,
1380 T: Table<Value = BlockNumberList>,
1381 {
1382 assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
1385
1386 let mut cursor = self.tx.cursor_write::<T>()?;
1387
1388 for (partial_key, indices) in index_updates {
1389 let last_key = sharded_key_factory(partial_key, u64::MAX);
1390 let mut last_shard = cursor
1391 .seek_exact(last_key.clone())?
1392 .map(|(_, list)| list)
1393 .unwrap_or_else(BlockNumberList::empty);
1394
1395 last_shard.append(indices).map_err(ProviderError::other)?;
1396
1397 if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
1399 cursor.upsert(last_key, &last_shard)?;
1400 continue;
1401 }
1402
1403 let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
1405 let mut chunks_peekable = chunks.into_iter().peekable();
1406
1407 while let Some(chunk) = chunks_peekable.next() {
1408 let shard = BlockNumberList::new_pre_sorted(chunk);
1409 let highest_block_number = if chunks_peekable.peek().is_some() {
1410 shard.iter().next_back().expect("`chunks` does not return empty list")
1411 } else {
1412 u64::MAX
1414 };
1415
1416 cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
1417 }
1418 }
1419
1420 Ok(())
1421 }
1422}
1423
1424impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1425 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1426 if self.cached_storage_settings().use_hashed_state() {
1427 let hashed_address = keccak256(address);
1428 Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1429 } else {
1430 Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1431 }
1432 }
1433}
1434
1435impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1436 fn changed_accounts_with_range(
1437 &self,
1438 range: RangeInclusive<BlockNumber>,
1439 ) -> ProviderResult<BTreeSet<Address>> {
1440 let mut reader = EitherReader::new_account_changesets(self)?;
1441
1442 reader.changed_accounts_with_range(range)
1443 }
1444
1445 fn basic_accounts(
1446 &self,
1447 iter: impl IntoIterator<Item = Address>,
1448 ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1449 if self.cached_storage_settings().use_hashed_state() {
1450 let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
1451 Ok(iter
1452 .into_iter()
1453 .map(|address| {
1454 let hashed_address = keccak256(address);
1455 hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
1456 })
1457 .collect::<Result<Vec<_>, _>>()?)
1458 } else {
1459 let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1460 Ok(iter
1461 .into_iter()
1462 .map(|address| {
1463 plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
1464 })
1465 .collect::<Result<Vec<_>, _>>()?)
1466 }
1467 }
1468
1469 fn changed_accounts_and_blocks_with_range(
1470 &self,
1471 range: RangeInclusive<BlockNumber>,
1472 ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1473 let highest_static_block = self
1474 .static_file_provider
1475 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1476
1477 if let Some(highest) = highest_static_block &&
1478 self.cached_storage_settings().storage_v2
1479 {
1480 let start = *range.start();
1481 let static_end = (*range.end()).min(highest);
1482
1483 let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1484 if start <= static_end {
1485 for block in start..=static_end {
1486 let block_changesets = self.account_block_changeset(block)?;
1487 for changeset in block_changesets {
1488 changed_accounts_and_blocks
1489 .entry(changeset.address)
1490 .or_default()
1491 .push(block);
1492 }
1493 }
1494 }
1495
1496 Ok(changed_accounts_and_blocks)
1497 } else {
1498 let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1499
1500 let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1501 BTreeMap::new(),
1502 |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1503 let (index, account) = entry?;
1504 accounts.entry(account.address).or_default().push(index);
1505 Ok(accounts)
1506 },
1507 )?;
1508
1509 Ok(account_transitions)
1510 }
1511 }
1512}
1513
1514impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1515 fn storage_changeset(
1516 &self,
1517 block_number: BlockNumber,
1518 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1519 if self.cached_storage_settings().storage_v2 {
1520 self.static_file_provider.storage_changeset(block_number)
1521 } else {
1522 let range = block_number..=block_number;
1523 let storage_range = BlockNumberAddress::range(range);
1524 self.tx
1525 .cursor_dup_read::<tables::StorageChangeSets>()?
1526 .walk_range(storage_range)?
1527 .map(|r| {
1528 let (bna, entry) = r?;
1529 Ok((bna, entry))
1530 })
1531 .collect()
1532 }
1533 }
1534
1535 fn get_storage_before_block(
1536 &self,
1537 block_number: BlockNumber,
1538 address: Address,
1539 storage_key: B256,
1540 ) -> ProviderResult<Option<StorageEntry>> {
1541 if self.cached_storage_settings().storage_v2 {
1542 self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1543 } else {
1544 Ok(self
1545 .tx
1546 .cursor_dup_read::<tables::StorageChangeSets>()?
1547 .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1548 .filter(|entry| entry.key == storage_key))
1549 }
1550 }
1551
1552 fn storage_changesets_range(
1553 &self,
1554 range: impl RangeBounds<BlockNumber>,
1555 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1556 if self.cached_storage_settings().storage_v2 {
1557 self.static_file_provider.storage_changesets_range(range)
1558 } else {
1559 self.tx
1560 .cursor_dup_read::<tables::StorageChangeSets>()?
1561 .walk_range(BlockNumberAddressRange::from(range))?
1562 .map(|r| {
1563 let (bna, entry) = r?;
1564 Ok((bna, entry))
1565 })
1566 .collect()
1567 }
1568 }
1569
1570 fn storage_changeset_count(&self) -> ProviderResult<usize> {
1571 if self.cached_storage_settings().storage_v2 {
1572 self.static_file_provider.storage_changeset_count()
1573 } else {
1574 Ok(self.tx.entries::<tables::StorageChangeSets>()?)
1575 }
1576 }
1577}
1578
1579impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1580 fn account_block_changeset(
1581 &self,
1582 block_number: BlockNumber,
1583 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1584 if self.cached_storage_settings().storage_v2 {
1585 let static_changesets =
1586 self.static_file_provider.account_block_changeset(block_number)?;
1587 Ok(static_changesets)
1588 } else {
1589 let range = block_number..=block_number;
1590 self.tx
1591 .cursor_read::<tables::AccountChangeSets>()?
1592 .walk_range(range)?
1593 .map(|result| -> ProviderResult<_> {
1594 let (_, account_before) = result?;
1595 Ok(account_before)
1596 })
1597 .collect()
1598 }
1599 }
1600
1601 fn get_account_before_block(
1602 &self,
1603 block_number: BlockNumber,
1604 address: Address,
1605 ) -> ProviderResult<Option<AccountBeforeTx>> {
1606 if self.cached_storage_settings().storage_v2 {
1607 Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1608 } else {
1609 self.tx
1610 .cursor_dup_read::<tables::AccountChangeSets>()?
1611 .seek_by_key_subkey(block_number, address)?
1612 .filter(|acc| acc.address == address)
1613 .map(Ok)
1614 .transpose()
1615 }
1616 }
1617
1618 fn account_changesets_range(
1619 &self,
1620 range: impl core::ops::RangeBounds<BlockNumber>,
1621 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1622 if self.cached_storage_settings().storage_v2 {
1623 self.static_file_provider.account_changesets_range(range)
1624 } else {
1625 self.tx
1626 .cursor_read::<tables::AccountChangeSets>()?
1627 .walk_range(to_range(range))?
1628 .map(|r| r.map_err(Into::into))
1629 .collect()
1630 }
1631 }
1632
1633 fn account_changeset_count(&self) -> ProviderResult<usize> {
1634 if self.cached_storage_settings().storage_v2 {
1637 self.static_file_provider.account_changeset_count()
1638 } else {
1639 Ok(self.tx.entries::<tables::AccountChangeSets>()?)
1640 }
1641 }
1642}
1643
1644impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
1645 for DatabaseProvider<TX, N>
1646{
1647 type Header = HeaderTy<N>;
1648
1649 fn local_tip_header(
1650 &self,
1651 highest_uninterrupted_block: BlockNumber,
1652 ) -> ProviderResult<SealedHeader<Self::Header>> {
1653 let static_file_provider = self.static_file_provider();
1654
1655 let next_static_file_block_num = static_file_provider
1658 .get_highest_static_file_block(StaticFileSegment::Headers)
1659 .map(|id| id + 1)
1660 .unwrap_or_default();
1661 let next_block = highest_uninterrupted_block + 1;
1662
1663 match next_static_file_block_num.cmp(&next_block) {
1664 Ordering::Greater => {
1667 let mut static_file_producer =
1668 static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1669 static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1670 static_file_producer.commit()?
1673 }
1674 Ordering::Less => {
1675 return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1677 }
1678 Ordering::Equal => {}
1679 }
1680
1681 let local_head = static_file_provider
1682 .sealed_header(highest_uninterrupted_block)?
1683 .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1684
1685 Ok(local_head)
1686 }
1687}
1688
1689impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1690 type Header = HeaderTy<N>;
1691
1692 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1693 if let Some(num) = self.block_number(block_hash)? {
1694 Ok(self.header_by_number(num)?)
1695 } else {
1696 Ok(None)
1697 }
1698 }
1699
1700 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1701 self.static_file_provider.header_by_number(num)
1702 }
1703
1704 fn headers_range(
1705 &self,
1706 range: impl RangeBounds<BlockNumber>,
1707 ) -> ProviderResult<Vec<Self::Header>> {
1708 self.static_file_provider.headers_range(range)
1709 }
1710
1711 fn sealed_header(
1712 &self,
1713 number: BlockNumber,
1714 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1715 self.static_file_provider.sealed_header(number)
1716 }
1717
1718 fn sealed_headers_while(
1719 &self,
1720 range: impl RangeBounds<BlockNumber>,
1721 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1722 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1723 self.static_file_provider.sealed_headers_while(range, predicate)
1724 }
1725}
1726
1727impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1728 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1729 self.static_file_provider.block_hash(number)
1730 }
1731
1732 fn canonical_hashes_range(
1733 &self,
1734 start: BlockNumber,
1735 end: BlockNumber,
1736 ) -> ProviderResult<Vec<B256>> {
1737 self.static_file_provider.canonical_hashes_range(start, end)
1738 }
1739}
1740
1741impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1742 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1743 let best_number = self.best_block_number()?;
1744 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1745 Ok(ChainInfo { best_hash, best_number })
1746 }
1747
1748 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1749 Ok(self
1752 .get_stage_checkpoint(StageId::Finish)?
1753 .map(|checkpoint| checkpoint.block_number)
1754 .unwrap_or_default())
1755 }
1756
1757 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1758 self.static_file_provider.last_block_number()
1759 }
1760
1761 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1762 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1763 }
1764}
1765
1766impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1767 type Block = BlockTy<N>;
1768
1769 fn find_block_by_hash(
1770 &self,
1771 hash: B256,
1772 source: BlockSource,
1773 ) -> ProviderResult<Option<Self::Block>> {
1774 if source.is_canonical() {
1775 self.block(hash.into())
1776 } else {
1777 Ok(None)
1778 }
1779 }
1780
1781 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1789 if let Some(number) = self.convert_hash_or_number(id)? {
1790 let earliest_available = self.static_file_provider.earliest_history_height();
1791 if number < earliest_available {
1792 return Err(ProviderError::BlockExpired { requested: number, earliest_available })
1793 }
1794
1795 let Some(header) = self.header_by_number(number)? else { return Ok(None) };
1796
1797 let Some(transactions) = self.transactions_by_block(number.into())? else {
1802 return Ok(None)
1803 };
1804
1805 let body = self
1806 .storage
1807 .reader()
1808 .read_block_bodies(self, vec![(&header, transactions)])?
1809 .pop()
1810 .ok_or(ProviderError::InvalidStorageOutput)?;
1811
1812 return Ok(Some(Self::Block::new(header, body)))
1813 }
1814
1815 Ok(None)
1816 }
1817
1818 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1819 Ok(None)
1820 }
1821
1822 fn pending_block_and_receipts(
1823 &self,
1824 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1825 Ok(None)
1826 }
1827
1828 fn recovered_block(
1837 &self,
1838 id: BlockHashOrNumber,
1839 transaction_kind: TransactionVariant,
1840 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1841 self.recovered_block(
1842 id,
1843 transaction_kind,
1844 |block_number| self.header_by_number(block_number),
1845 |header, body, senders| {
1846 Self::Block::new(header, body)
1847 .try_into_recovered_unchecked(senders)
1851 .map(Some)
1852 .map_err(|_| ProviderError::SenderRecoveryError)
1853 },
1854 )
1855 }
1856
1857 fn sealed_block_with_senders(
1858 &self,
1859 id: BlockHashOrNumber,
1860 transaction_kind: TransactionVariant,
1861 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1862 self.recovered_block(
1863 id,
1864 transaction_kind,
1865 |block_number| self.sealed_header(block_number),
1866 |header, body, senders| {
1867 Self::Block::new_sealed(header, body)
1868 .try_with_senders_unchecked(senders)
1872 .map(Some)
1873 .map_err(|_| ProviderError::SenderRecoveryError)
1874 },
1875 )
1876 }
1877
1878 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1879 self.block_range(
1880 range,
1881 |range| self.headers_range(range),
1882 |header, body, _| Ok(Self::Block::new(header, body)),
1883 )
1884 }
1885
1886 fn block_with_senders_range(
1887 &self,
1888 range: RangeInclusive<BlockNumber>,
1889 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1890 self.block_with_senders_range(
1891 range,
1892 |range| self.headers_range(range),
1893 |header, body, senders| {
1894 Self::Block::new(header, body)
1895 .try_into_recovered_unchecked(senders)
1896 .map_err(|_| ProviderError::SenderRecoveryError)
1897 },
1898 )
1899 }
1900
1901 fn recovered_block_range(
1902 &self,
1903 range: RangeInclusive<BlockNumber>,
1904 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1905 self.block_with_senders_range(
1906 range,
1907 |range| self.sealed_headers_range(range),
1908 |header, body, senders| {
1909 Self::Block::new_sealed(header, body)
1910 .try_with_senders(senders)
1911 .map_err(|_| ProviderError::SenderRecoveryError)
1912 },
1913 )
1914 }
1915
1916 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1917 Ok(self
1918 .tx
1919 .cursor_read::<tables::TransactionBlocks>()?
1920 .seek(id)
1921 .map(|b| b.map(|(_, bn)| bn))?)
1922 }
1923}
1924
1925impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1926 for DatabaseProvider<TX, N>
1927{
1928 fn transaction_hashes_by_range(
1931 &self,
1932 tx_range: Range<TxNumber>,
1933 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1934 self.static_file_provider.transaction_hashes_by_range(tx_range)
1935 }
1936}
1937
1938impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1940 type Transaction = TxTy<N>;
1941
1942 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1943 self.with_rocksdb_tx(|tx_ref| {
1944 let mut reader = EitherReader::new_transaction_hash_numbers(self, tx_ref)?;
1945 reader.get_transaction_hash_number(tx_hash)
1946 })
1947 }
1948
1949 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1950 self.static_file_provider.transaction_by_id(id)
1951 }
1952
1953 fn transaction_by_id_unhashed(
1954 &self,
1955 id: TxNumber,
1956 ) -> ProviderResult<Option<Self::Transaction>> {
1957 self.static_file_provider.transaction_by_id_unhashed(id)
1958 }
1959
1960 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1961 if let Some(id) = self.transaction_id(hash)? {
1962 Ok(self.transaction_by_id_unhashed(id)?)
1963 } else {
1964 Ok(None)
1965 }
1966 }
1967
1968 fn transaction_by_hash_with_meta(
1969 &self,
1970 tx_hash: TxHash,
1971 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1972 if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1973 let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1974 let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1975 let Some(sealed_header) = self.sealed_header(block_number)?
1976 {
1977 let (header, block_hash) = sealed_header.split();
1978 if let Some(block_body) = self.block_body_indices(block_number)? {
1979 let index = transaction_id - block_body.first_tx_num();
1984
1985 let meta = TransactionMeta {
1986 tx_hash,
1987 index,
1988 block_hash,
1989 block_number,
1990 base_fee: header.base_fee_per_gas(),
1991 excess_blob_gas: header.excess_blob_gas(),
1992 timestamp: header.timestamp(),
1993 };
1994
1995 return Ok(Some((transaction, meta)))
1996 }
1997 }
1998
1999 Ok(None)
2000 }
2001
2002 fn transactions_by_block(
2003 &self,
2004 id: BlockHashOrNumber,
2005 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2006 if let Some(block_number) = self.convert_hash_or_number(id)? &&
2007 let Some(body) = self.block_body_indices(block_number)?
2008 {
2009 let tx_range = body.tx_num_range();
2010 return if tx_range.is_empty() {
2011 Ok(Some(Vec::new()))
2012 } else {
2013 self.transactions_by_tx_range(tx_range).map(Some)
2014 }
2015 }
2016 Ok(None)
2017 }
2018
2019 fn transactions_by_block_range(
2020 &self,
2021 range: impl RangeBounds<BlockNumber>,
2022 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2023 let range = to_range(range);
2024
2025 self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
2026 .into_iter()
2027 .map(|body| {
2028 let tx_num_range = body.tx_num_range();
2029 if tx_num_range.is_empty() {
2030 Ok(Vec::new())
2031 } else {
2032 self.transactions_by_tx_range(tx_num_range)
2033 }
2034 })
2035 .collect()
2036 }
2037
2038 fn transactions_by_tx_range(
2039 &self,
2040 range: impl RangeBounds<TxNumber>,
2041 ) -> ProviderResult<Vec<Self::Transaction>> {
2042 self.static_file_provider.transactions_by_tx_range(range)
2043 }
2044
2045 fn senders_by_tx_range(
2046 &self,
2047 range: impl RangeBounds<TxNumber>,
2048 ) -> ProviderResult<Vec<Address>> {
2049 if EitherWriterDestination::senders(self).is_static_file() {
2050 self.static_file_provider.senders_by_tx_range(range)
2051 } else {
2052 self.cursor_read_collect::<tables::TransactionSenders>(range)
2053 }
2054 }
2055
2056 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2057 if EitherWriterDestination::senders(self).is_static_file() {
2058 self.static_file_provider.transaction_sender(id)
2059 } else {
2060 Ok(self.tx.get::<tables::TransactionSenders>(id)?)
2061 }
2062 }
2063}
2064
2065impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
2066 type Receipt = ReceiptTy<N>;
2067
2068 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2069 self.static_file_provider.get_with_static_file_or_database(
2070 StaticFileSegment::Receipts,
2071 id,
2072 |static_file| static_file.receipt(id),
2073 || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
2074 )
2075 }
2076
2077 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2078 if let Some(id) = self.transaction_id(hash)? {
2079 self.receipt(id)
2080 } else {
2081 Ok(None)
2082 }
2083 }
2084
2085 fn receipts_by_block(
2086 &self,
2087 block: BlockHashOrNumber,
2088 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2089 if let Some(number) = self.convert_hash_or_number(block)? &&
2090 let Some(body) = self.block_body_indices(number)?
2091 {
2092 let tx_range = body.tx_num_range();
2093 return if tx_range.is_empty() {
2094 Ok(Some(Vec::new()))
2095 } else {
2096 self.receipts_by_tx_range(tx_range).map(Some)
2097 }
2098 }
2099 Ok(None)
2100 }
2101
2102 fn receipts_by_tx_range(
2103 &self,
2104 range: impl RangeBounds<TxNumber>,
2105 ) -> ProviderResult<Vec<Self::Receipt>> {
2106 self.static_file_provider.get_range_with_static_file_or_database(
2107 StaticFileSegment::Receipts,
2108 to_range(range),
2109 |static_file, range, _| static_file.receipts_by_tx_range(range),
2110 |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
2111 |_| true,
2112 )
2113 }
2114
2115 fn receipts_by_block_range(
2116 &self,
2117 block_range: RangeInclusive<BlockNumber>,
2118 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2119 if block_range.is_empty() {
2120 return Ok(Vec::new());
2121 }
2122
2123 let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
2125 let mut block_body_indices = Vec::with_capacity(range_len);
2126 for block_num in block_range {
2127 if let Some(indices) = self.block_body_indices(block_num)? {
2128 block_body_indices.push(indices);
2129 } else {
2130 block_body_indices.push(StoredBlockBodyIndices::default());
2132 }
2133 }
2134
2135 if block_body_indices.is_empty() {
2136 return Ok(Vec::new());
2137 }
2138
2139 let non_empty_blocks: Vec<_> =
2141 block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
2142
2143 if non_empty_blocks.is_empty() {
2144 return Ok(vec![Vec::new(); block_body_indices.len()]);
2146 }
2147
2148 let first_tx = non_empty_blocks[0].first_tx_num();
2150 let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
2151
2152 let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
2154 let mut receipts_iter = all_receipts.into_iter();
2155
2156 let mut result = Vec::with_capacity(block_body_indices.len());
2158 for indices in &block_body_indices {
2159 if indices.tx_count == 0 {
2160 result.push(Vec::new());
2161 } else {
2162 let block_receipts =
2163 receipts_iter.by_ref().take(indices.tx_count as usize).collect();
2164 result.push(block_receipts);
2165 }
2166 }
2167
2168 Ok(result)
2169 }
2170}
2171
2172impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
2173 for DatabaseProvider<TX, N>
2174{
2175 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2176 Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
2177 }
2178
2179 fn block_body_indices_range(
2180 &self,
2181 range: RangeInclusive<BlockNumber>,
2182 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2183 self.cursor_read_collect::<tables::BlockBodyIndices>(range)
2184 }
2185}
2186
2187impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2188 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2189 Ok(if let Some(encoded) = id.get_pre_encoded() {
2190 self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2191 } else {
2192 self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2193 })
2194 }
2195
2196 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2198 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2199 }
2200
2201 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2202 self.tx
2203 .cursor_read::<tables::StageCheckpoints>()?
2204 .walk(None)?
2205 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2206 .map_err(ProviderError::Database)
2207 }
2208}
2209
2210impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2211 fn save_stage_checkpoint(
2213 &self,
2214 id: StageId,
2215 checkpoint: StageCheckpoint,
2216 ) -> ProviderResult<()> {
2217 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2218 }
2219
2220 fn save_stage_checkpoint_progress(
2222 &self,
2223 id: StageId,
2224 checkpoint: Vec<u8>,
2225 ) -> ProviderResult<()> {
2226 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2227 }
2228
2229 #[instrument(level = "debug", target = "providers::db", skip_all)]
2230 fn update_pipeline_stages(
2231 &self,
2232 block_number: BlockNumber,
2233 drop_stage_checkpoint: bool,
2234 ) -> ProviderResult<()> {
2235 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2237 for stage_id in StageId::ALL {
2238 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2239 cursor.upsert(
2240 stage_id.to_string(),
2241 &StageCheckpoint {
2242 block_number,
2243 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2244 },
2245 )?;
2246 }
2247
2248 Ok(())
2249 }
2250}
2251
2252impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
2253 fn plain_state_storages(
2254 &self,
2255 addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
2256 ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
2257 if self.cached_storage_settings().use_hashed_state() {
2258 let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;
2259
2260 addresses_with_keys
2261 .into_iter()
2262 .map(|(address, storage)| {
2263 let hashed_address = keccak256(address);
2264 storage
2265 .into_iter()
2266 .map(|key| -> ProviderResult<_> {
2267 let hashed_key = keccak256(key);
2268 let value = hashed_storage
2269 .seek_by_key_subkey(hashed_address, hashed_key)?
2270 .filter(|v| v.key == hashed_key)
2271 .map(|v| v.value)
2272 .unwrap_or_default();
2273 Ok(StorageEntry { key, value })
2274 })
2275 .collect::<ProviderResult<Vec<_>>>()
2276 .map(|storage| (address, storage))
2277 })
2278 .collect::<ProviderResult<Vec<(_, _)>>>()
2279 } else {
2280 let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
2281
2282 addresses_with_keys
2283 .into_iter()
2284 .map(|(address, storage)| {
2285 storage
2286 .into_iter()
2287 .map(|key| -> ProviderResult<_> {
2288 Ok(plain_storage
2289 .seek_by_key_subkey(address, key)?
2290 .filter(|v| v.key == key)
2291 .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
2292 })
2293 .collect::<ProviderResult<Vec<_>>>()
2294 .map(|storage| (address, storage))
2295 })
2296 .collect::<ProviderResult<Vec<(_, _)>>>()
2297 }
2298 }
2299
2300 fn changed_storages_with_range(
2301 &self,
2302 range: RangeInclusive<BlockNumber>,
2303 ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2304 if self.cached_storage_settings().storage_v2 {
2305 self.storage_changesets_range(range)?.into_iter().try_fold(
2306 BTreeMap::new(),
2307 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2308 let (BlockNumberAddress((_, address)), storage_entry) = entry;
2309 accounts.entry(address).or_default().insert(storage_entry.key);
2310 Ok(accounts)
2311 },
2312 )
2313 } else {
2314 self.tx
2315 .cursor_read::<tables::StorageChangeSets>()?
2316 .walk_range(BlockNumberAddress::range(range))?
2317 .try_fold(
2320 BTreeMap::new(),
2321 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2322 let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2323 accounts.entry(address).or_default().insert(storage_entry.key);
2324 Ok(accounts)
2325 },
2326 )
2327 }
2328 }
2329
2330 fn changed_storages_and_blocks_with_range(
2331 &self,
2332 range: RangeInclusive<BlockNumber>,
2333 ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2334 if self.cached_storage_settings().storage_v2 {
2335 self.storage_changesets_range(range)?.into_iter().try_fold(
2336 BTreeMap::new(),
2337 |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2338 storages
2339 .entry((index.address(), storage.key))
2340 .or_default()
2341 .push(index.block_number());
2342 Ok(storages)
2343 },
2344 )
2345 } else {
2346 let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2347
2348 let storage_changeset_lists =
2349 changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2350 BTreeMap::new(),
2351 |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2352 entry|
2353 -> ProviderResult<_> {
2354 let (index, storage) = entry?;
2355 storages
2356 .entry((index.address(), storage.key))
2357 .or_default()
2358 .push(index.block_number());
2359 Ok(storages)
2360 },
2361 )?;
2362
2363 Ok(storage_changeset_lists)
2364 }
2365 }
2366}
2367
2368impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2369 for DatabaseProvider<TX, N>
2370{
2371 type Receipt = ReceiptTy<N>;
2372
/// Writes the state changes, reverts and (optionally) receipts of an execution outcome.
///
/// Steps, in order:
/// 1. If hashed state is in use and `config` requests neither receipts nor any changesets,
///    only the contract bytecodes are written and the function returns.
/// 2. Otherwise the bundle state is converted into plain state plus reverts, which are handed
///    to `write_state_reverts` and `write_state_changes`.
/// 3. If `config.write_receipts` is set, receipts are appended block by block, subject to the
///    configured receipt pruning and contract log filter.
///
/// # Errors
///
/// Returns [`ProviderError::BlockBodyIndicesNotFound`] when fewer block body indices are
/// stored than the outcome has blocks, and propagates any error from the underlying writers.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_state<'a>(
    &self,
    execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
    is_value_known: OriginalValuesKnown,
    config: StateWriteConfig,
) -> ProviderResult<()> {
    let execution_outcome = execution_outcome.into();

    // Fast path: nothing but bytecodes is requested from this call, so the conversion to plain
    // state and reverts below is skipped entirely.
    if self.cached_storage_settings().use_hashed_state() &&
        !config.write_receipts &&
        !config.write_account_changesets &&
        !config.write_storage_changesets
    {
        self.write_bytecodes(
            execution_outcome.state().contracts.iter().map(|(h, b)| (*h, Bytecode(b.clone()))),
        )?;
        return Ok(());
    }

    let first_block = execution_outcome.first_block();
    let (plain_state, reverts) =
        execution_outcome.state().to_plain_state_and_reverts(is_value_known);

    self.write_state_reverts(reverts, first_block, config)?;
    self.write_state_changes(plain_state)?;

    if !config.write_receipts {
        return Ok(());
    }

    let block_count = execution_outcome.len() as u64;
    let last_block = execution_outcome.last_block();
    let block_range = first_block..=last_block;

    // The tip is the higher of the last stored block and the last block of this outcome.
    let tip = self.last_block_number()?.max(last_block);

    // First transaction number of every block in the range.
    let block_indices: Vec<_> = self
        .block_body_indices_range(block_range)?
        .into_iter()
        .map(|b| b.first_tx_num)
        .collect();

    // Every block of the outcome needs stored body indices; report the first block for which
    // they are missing.
    if block_indices.len() < block_count as usize {
        let missing_blocks = block_count - block_indices.len() as u64;
        return Err(ProviderError::BlockBodyIndicesNotFound(
            last_block.saturating_sub(missing_blocks - 1),
        ));
    }

    let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;

    let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
    let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;

    // Receipts may only be skipped when they are destined for the database or no receipt has
    // been written to static files yet, and `first_block` is prunable under the minimum
    // pruning distance relative to `tip`.
    let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
        self.static_file_provider()
            .get_highest_static_file_tx(StaticFileSegment::Receipts)
            .is_none()) &&
        PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);

    // Seed the set of addresses whose logs are kept with all filter entries keyed below
    // `first_block`.
    let mut allowed_addresses: AddressSet = AddressSet::default();
    for (_, addresses) in contract_log_pruner.range(..first_block) {
        allowed_addresses.extend(addresses.iter().copied());
    }

    for (idx, (receipts, first_tx_index)) in
        execution_outcome.receipts().zip(block_indices).enumerate()
    {
        let block_number = first_block + idx as u64;

        // The writer is advanced for every block, including blocks whose receipts are skipped.
        receipts_writer.increment_block(block_number)?;

        // Skip the whole block if the receipts prune mode says it is prunable.
        if prunable_receipts &&
            self.prune_modes
                .receipts
                .is_some_and(|mode| mode.should_prune(block_number, tip))
        {
            continue
        }

        // Addresses registered for this block start being kept from here on.
        if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
            allowed_addresses.extend(new_addresses.iter().copied());
        }

        for (idx, receipt) in receipts.iter().enumerate() {
            let receipt_idx = first_tx_index + idx as u64;
            // With a contract log filter configured, drop receipts that have no log from an
            // allowed address.
            if prunable_receipts &&
                has_contract_log_filter &&
                !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
            {
                continue
            }

            receipts_writer.append_receipt(receipt_idx, receipt)?;
        }
    }

    Ok(())
}
2491
/// Writes storage and account changesets derived from `reverts`.
///
/// `reverts.storage` and `reverts.accounts` are consumed one element per block: element `i`
/// is written for block `first_block + i`. Storage changesets are only written when
/// `config.write_storage_changesets` is set, account changesets only when
/// `config.write_account_changesets` is set.
fn write_state_reverts(
    &self,
    reverts: PlainStateReverts,
    first_block: BlockNumber,
    config: StateWriteConfig,
) -> ProviderResult<()> {
    if config.write_storage_changesets {
        tracing::trace!("Writing storage changes");
        let mut storages_cursor =
            self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;

            tracing::trace!(block_number, "Writing block change");
            // Order the block's changes by address.
            storage_changes.par_sort_unstable_by_key(|a| a.address);
            // Lower bound for the changeset size; wiped storage may add further entries.
            let total_changes =
                storage_changes.iter().map(|change| change.storage_revert.len()).sum();
            let mut changeset = Vec::with_capacity(total_changes);
            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
                let mut storage = storage_revert
                    .into_iter()
                    .map(|(k, v)| (B256::from(k.to_be_bytes()), v))
                    .collect::<Vec<_>>();
                // Order the reverted slots by key.
                storage.par_sort_unstable_by_key(|a| a.0);

                // For a wiped account, every slot currently stored in `PlainStorageState`
                // for the address is read so it can be recorded alongside the reverts.
                let mut wiped_storage = Vec::new();
                if wiped {
                    tracing::trace!(?address, "Wiping storage");
                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
                        wiped_storage.push((entry.key, entry.value));
                        while let Some(entry) = storages_cursor.next_dup_val()? {
                            wiped_storage.push((entry.key, entry.value))
                        }
                    }
                }

                tracing::trace!(?address, ?storage, "Writing storage reverts");
                // NOTE(review): `StorageRevertsIter` presumably merges the two key-sorted
                // inputs into one stream — confirm against its definition.
                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
                    changeset.push(StorageBeforeTx { address, key, value });
                }
            }

            // A fresh writer is created for every block.
            let mut storage_changesets_writer =
                EitherWriter::new_storage_changesets(self, block_number)?;
            storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
        }
    }

    if !config.write_account_changesets {
        return Ok(());
    }

    tracing::trace!(?first_block, "Writing account changes");
    for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
        let block_number = first_block + block_index as BlockNumber;
        // Convert each reverted account into the changeset entry type.
        let changeset = account_block_reverts
            .into_iter()
            .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
            .collect::<Vec<_>>();
        let mut account_changesets_writer =
            EitherWriter::new_account_changesets(self, block_number)?;

        account_changesets_writer.append_account_changeset(block_number, changeset)?;
    }

    Ok(())
}
2570
2571 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
2572 changes.accounts.par_sort_by_key(|a| a.0);
2575 changes.storage.par_sort_by_key(|a| a.address);
2576 changes.contracts.par_sort_by_key(|a| a.0);
2577
2578 if !self.cached_storage_settings().use_hashed_state() {
2579 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
2581 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
2582 for (address, account) in changes.accounts {
2584 if let Some(account) = account {
2585 tracing::trace!(?address, "Updating plain state account");
2586 accounts_cursor.upsert(address, &account.into())?;
2587 } else if accounts_cursor.seek_exact(address)?.is_some() {
2588 tracing::trace!(?address, "Deleting plain state account");
2589 accounts_cursor.delete_current()?;
2590 }
2591 }
2592
2593 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
2595 let mut storages_cursor =
2596 self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2597 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2598 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2600 storages_cursor.delete_current_duplicates()?;
2601 }
2602 let mut storage = storage
2604 .into_iter()
2605 .map(|(k, value)| StorageEntry { key: k.into(), value })
2606 .collect::<Vec<_>>();
2607 storage.par_sort_unstable_by_key(|a| a.key);
2609
2610 for entry in storage {
2611 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2612 if let Some(db_entry) =
2613 storages_cursor.seek_by_key_subkey(address, entry.key)? &&
2614 db_entry.key == entry.key
2615 {
2616 storages_cursor.delete_current()?;
2617 }
2618
2619 if !entry.value.is_zero() {
2620 storages_cursor.upsert(address, &entry)?;
2621 }
2622 }
2623 }
2624 }
2625
2626 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
2628 self.write_bytecodes(
2629 changes.contracts.into_iter().map(|(hash, bytecode)| (hash, Bytecode(bytecode))),
2630 )?;
2631
2632 Ok(())
2633 }
2634
2635 #[instrument(level = "debug", target = "providers::db", skip_all)]
2636 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2637 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2639 for (hashed_address, account) in hashed_state.accounts() {
2640 if let Some(account) = account {
2641 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2642 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2643 hashed_accounts_cursor.delete_current()?;
2644 }
2645 }
2646
2647 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2649 let mut hashed_storage_cursor =
2650 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2651 for (hashed_address, storage) in sorted_storages {
2652 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2653 hashed_storage_cursor.delete_current_duplicates()?;
2654 }
2655
2656 for (hashed_slot, value) in storage.storage_slots_ref() {
2657 let entry = StorageEntry { key: *hashed_slot, value: *value };
2658
2659 if let Some(db_entry) =
2660 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
2661 db_entry.key == entry.key
2662 {
2663 hashed_storage_cursor.delete_current()?;
2664 }
2665
2666 if !entry.value.is_zero() {
2667 hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2668 }
2669 }
2670 }
2671
2672 Ok(())
2673 }
2674
2675 fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
2697 let range = block + 1..=self.last_block_number()?;
2698
2699 if range.is_empty() {
2700 return Ok(());
2701 }
2702
2703 let block_bodies = self.block_body_indices_range(range.clone())?;
2705
2706 let from_transaction_num =
2708 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2709
2710 let storage_range = BlockNumberAddress::range(range.clone());
2711 let storage_changeset = if self.cached_storage_settings().storage_v2 {
2712 let changesets = self.storage_changesets_range(range.clone())?;
2713 let mut changeset_writer =
2714 self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2715 changeset_writer.prune_storage_changesets(block)?;
2716 changesets
2717 } else {
2718 self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2719 };
2720 let account_changeset = if self.cached_storage_settings().storage_v2 {
2721 let changesets = self.account_changesets_range(range)?;
2722 let mut changeset_writer =
2723 self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2724 changeset_writer.prune_account_changesets(block)?;
2725 changesets
2726 } else {
2727 self.take::<tables::AccountChangeSets>(range)?
2728 };
2729
2730 if self.cached_storage_settings().use_hashed_state() {
2731 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2732 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2733
2734 let (state, _) = self.populate_bundle_state_hashed(
2735 account_changeset,
2736 storage_changeset,
2737 &mut hashed_accounts_cursor,
2738 &mut hashed_storage_cursor,
2739 )?;
2740
2741 for (address, (old_account, new_account, storage)) in &state {
2742 if old_account != new_account {
2743 let hashed_address = keccak256(address);
2744 let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2745 if let Some(account) = old_account {
2746 hashed_accounts_cursor.upsert(hashed_address, account)?;
2747 } else if existing_entry.is_some() {
2748 hashed_accounts_cursor.delete_current()?;
2749 }
2750 }
2751
2752 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2753 let hashed_address = keccak256(address);
2754 let hashed_storage_key = keccak256(storage_key);
2755 let storage_entry =
2756 StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2757 if hashed_storage_cursor
2758 .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2759 .is_some_and(|s| s.key == hashed_storage_key)
2760 {
2761 hashed_storage_cursor.delete_current()?
2762 }
2763
2764 if !old_storage_value.is_zero() {
2765 hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2766 }
2767 }
2768 }
2769 } else {
2770 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2775 let mut plain_storage_cursor =
2776 self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2777
2778 let (state, _) = self.populate_bundle_state(
2779 account_changeset,
2780 storage_changeset,
2781 &mut plain_accounts_cursor,
2782 &mut plain_storage_cursor,
2783 )?;
2784
2785 for (address, (old_account, new_account, storage)) in &state {
2786 if old_account != new_account {
2787 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2788 if let Some(account) = old_account {
2789 plain_accounts_cursor.upsert(*address, account)?;
2790 } else if existing_entry.is_some() {
2791 plain_accounts_cursor.delete_current()?;
2792 }
2793 }
2794
2795 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2796 let storage_entry =
2797 StorageEntry { key: *storage_key, value: *old_storage_value };
2798 if plain_storage_cursor
2799 .seek_by_key_subkey(*address, *storage_key)?
2800 .is_some_and(|s| s.key == *storage_key)
2801 {
2802 plain_storage_cursor.delete_current()?
2803 }
2804
2805 if !old_storage_value.is_zero() {
2806 plain_storage_cursor.upsert(*address, &storage_entry)?;
2807 }
2808 }
2809 }
2810 }
2811
2812 self.remove_receipts_from(from_transaction_num, block)?;
2813
2814 Ok(())
2815 }
2816
2817 fn take_state_above(
2839 &self,
2840 block: BlockNumber,
2841 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2842 let range = block + 1..=self.last_block_number()?;
2843
2844 if range.is_empty() {
2845 return Ok(ExecutionOutcome::default())
2846 }
2847 let start_block_number = *range.start();
2848
2849 let block_bodies = self.block_body_indices_range(range.clone())?;
2851
2852 let from_transaction_num =
2854 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2855 let to_transaction_num =
2856 block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2857
2858 let storage_range = BlockNumberAddress::range(range.clone());
2859 let storage_changeset = if let Some(highest_block) = self
2860 .static_file_provider
2861 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2862 self.cached_storage_settings().storage_v2
2863 {
2864 let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
2865 let mut changeset_writer =
2866 self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2867 changeset_writer.prune_storage_changesets(block)?;
2868 changesets
2869 } else {
2870 self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2871 };
2872
2873 let highest_changeset_block = self
2875 .static_file_provider
2876 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2877 let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2878 self.cached_storage_settings().storage_v2
2879 {
2880 let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2882 let mut changeset_writer =
2883 self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2884 changeset_writer.prune_account_changesets(block)?;
2885
2886 changesets
2887 } else {
2888 self.take::<tables::AccountChangeSets>(range)?
2891 };
2892
2893 let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
2894 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2895 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2896
2897 let (state, reverts) = self.populate_bundle_state_hashed(
2898 account_changeset,
2899 storage_changeset,
2900 &mut hashed_accounts_cursor,
2901 &mut hashed_storage_cursor,
2902 )?;
2903
2904 for (address, (old_account, new_account, storage)) in &state {
2905 if old_account != new_account {
2906 let hashed_address = keccak256(address);
2907 let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2908 if let Some(account) = old_account {
2909 hashed_accounts_cursor.upsert(hashed_address, account)?;
2910 } else if existing_entry.is_some() {
2911 hashed_accounts_cursor.delete_current()?;
2912 }
2913 }
2914
2915 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2916 let hashed_address = keccak256(address);
2917 let hashed_storage_key = keccak256(storage_key);
2918 let storage_entry =
2919 StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2920 if hashed_storage_cursor
2921 .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2922 .is_some_and(|s| s.key == hashed_storage_key)
2923 {
2924 hashed_storage_cursor.delete_current()?
2925 }
2926
2927 if !old_storage_value.is_zero() {
2928 hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2929 }
2930 }
2931 }
2932
2933 (state, reverts)
2934 } else {
2935 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2940 let mut plain_storage_cursor =
2941 self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2942
2943 let (state, reverts) = self.populate_bundle_state(
2944 account_changeset,
2945 storage_changeset,
2946 &mut plain_accounts_cursor,
2947 &mut plain_storage_cursor,
2948 )?;
2949
2950 for (address, (old_account, new_account, storage)) in &state {
2951 if old_account != new_account {
2952 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2953 if let Some(account) = old_account {
2954 plain_accounts_cursor.upsert(*address, account)?;
2955 } else if existing_entry.is_some() {
2956 plain_accounts_cursor.delete_current()?;
2957 }
2958 }
2959
2960 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2961 let storage_entry =
2962 StorageEntry { key: *storage_key, value: *old_storage_value };
2963 if plain_storage_cursor
2964 .seek_by_key_subkey(*address, *storage_key)?
2965 .is_some_and(|s| s.key == *storage_key)
2966 {
2967 plain_storage_cursor.delete_current()?
2968 }
2969
2970 if !old_storage_value.is_zero() {
2971 plain_storage_cursor.upsert(*address, &storage_entry)?;
2972 }
2973 }
2974 }
2975
2976 (state, reverts)
2977 };
2978
2979 let mut receipts_iter = self
2981 .static_file_provider
2982 .get_range_with_static_file_or_database(
2983 StaticFileSegment::Receipts,
2984 from_transaction_num..to_transaction_num + 1,
2985 |static_file, range, _| {
2986 static_file
2987 .receipts_by_tx_range(range.clone())
2988 .map(|r| range.into_iter().zip(r).collect())
2989 },
2990 |range, _| {
2991 self.tx
2992 .cursor_read::<tables::Receipts<Self::Receipt>>()?
2993 .walk_range(range)?
2994 .map(|r| r.map_err(Into::into))
2995 .collect()
2996 },
2997 |_| true,
2998 )?
2999 .into_iter()
3000 .peekable();
3001
3002 let mut receipts = Vec::with_capacity(block_bodies.len());
3003 for block_body in block_bodies {
3005 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
3006 for num in block_body.tx_num_range() {
3007 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
3008 block_receipts.push(receipts_iter.next().unwrap().1);
3009 }
3010 }
3011 receipts.push(block_receipts);
3012 }
3013
3014 self.remove_receipts_from(from_transaction_num, block)?;
3015
3016 Ok(ExecutionOutcome::new_init(
3017 state,
3018 reverts,
3019 Vec::new(),
3020 receipts,
3021 start_block_number,
3022 Vec::new(),
3023 ))
3024 }
3025}
3026
3027impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
3028 fn write_account_trie_updates<A: TrieTableAdapter>(
3029 tx: &TX,
3030 trie_updates: &TrieUpdatesSorted,
3031 num_entries: &mut usize,
3032 ) -> ProviderResult<()>
3033 where
3034 TX: DbTxMut,
3035 {
3036 let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
3037 for (key, updated_node) in trie_updates.account_nodes_ref() {
3039 let nibbles = A::AccountKey::from(*key);
3040 match updated_node {
3041 Some(node) => {
3042 if !key.is_empty() {
3043 *num_entries += 1;
3044 account_trie_cursor.upsert(nibbles, node)?;
3045 }
3046 }
3047 None => {
3048 *num_entries += 1;
3049 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
3050 account_trie_cursor.delete_current()?;
3051 }
3052 }
3053 }
3054 }
3055 Ok(())
3056 }
3057
3058 fn write_storage_tries<A: TrieTableAdapter>(
3059 tx: &TX,
3060 storage_tries: Vec<(&B256, &StorageTrieUpdatesSorted)>,
3061 num_entries: &mut usize,
3062 ) -> ProviderResult<()>
3063 where
3064 TX: DbTxMut,
3065 {
3066 let mut cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
3067 for (hashed_address, storage_trie_updates) in storage_tries {
3068 let mut db_storage_trie_cursor: DatabaseStorageTrieCursor<_, A> =
3069 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
3070 *num_entries +=
3071 db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
3072 cursor = db_storage_trie_cursor.cursor;
3073 }
3074 Ok(())
3075 }
3076}
3077
3078impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
3079 #[instrument(level = "debug", target = "providers::db", skip_all)]
3083 fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
3084 if trie_updates.is_empty() {
3085 return Ok(0)
3086 }
3087
3088 let mut num_entries = 0;
3090
3091 reth_trie_db::with_adapter!(self, |A| {
3092 Self::write_account_trie_updates::<A>(self.tx_ref(), trie_updates, &mut num_entries)?;
3093 });
3094
3095 num_entries +=
3096 self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
3097
3098 Ok(num_entries)
3099 }
3100}
3101
3102impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3103 fn write_storage_trie_updates_sorted<'a>(
3109 &self,
3110 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3111 ) -> ProviderResult<usize> {
3112 let mut num_entries = 0;
3113 let mut storage_tries = storage_tries.collect::<Vec<_>>();
3114 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3115 reth_trie_db::with_adapter!(self, |A| {
3116 Self::write_storage_tries::<A>(self.tx_ref(), storage_tries, &mut num_entries)?;
3117 });
3118 Ok(num_entries)
3119 }
3120}
3121
3122impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
3123 fn unwind_account_hashing<'a>(
3124 &self,
3125 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3126 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3127 let hashed_accounts = changesets
3131 .into_iter()
3132 .map(|(_, e)| (keccak256(e.address), e.info))
3133 .collect::<Vec<_>>()
3134 .into_iter()
3135 .rev()
3136 .collect::<BTreeMap<_, _>>();
3137
3138 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3140 for (hashed_address, account) in &hashed_accounts {
3141 if let Some(account) = account {
3142 hashed_accounts_cursor.upsert(*hashed_address, account)?;
3143 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3144 hashed_accounts_cursor.delete_current()?;
3145 }
3146 }
3147
3148 Ok(hashed_accounts)
3149 }
3150
3151 fn unwind_account_hashing_range(
3152 &self,
3153 range: impl RangeBounds<BlockNumber>,
3154 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3155 let changesets = self.account_changesets_range(range)?;
3156 self.unwind_account_hashing(changesets.iter())
3157 }
3158
3159 fn insert_account_for_hashing(
3160 &self,
3161 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
3162 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3163 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3164 let hashed_accounts =
3165 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
3166 for (hashed_address, account) in &hashed_accounts {
3167 if let Some(account) = account {
3168 hashed_accounts_cursor.upsert(*hashed_address, account)?;
3169 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3170 hashed_accounts_cursor.delete_current()?;
3171 }
3172 }
3173 Ok(hashed_accounts)
3174 }
3175
3176 fn unwind_storage_hashing(
3177 &self,
3178 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3179 ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3180 let mut hashed_storages = changesets
3182 .into_iter()
3183 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
3184 let hashed_key = keccak256(storage_entry.key);
3185 (keccak256(address), hashed_key, storage_entry.value)
3186 })
3187 .collect::<Vec<_>>();
3188 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
3189
3190 let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
3192 B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
3193 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3194 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
3195 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
3196
3197 if hashed_storage
3198 .seek_by_key_subkey(hashed_address, key)?
3199 .is_some_and(|entry| entry.key == key)
3200 {
3201 hashed_storage.delete_current()?;
3202 }
3203
3204 if !value.is_zero() {
3205 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
3206 }
3207 }
3208 Ok(hashed_storage_keys)
3209 }
3210
3211 fn unwind_storage_hashing_range(
3212 &self,
3213 range: impl RangeBounds<BlockNumber>,
3214 ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3215 let changesets = self.storage_changesets_range(range)?;
3216 self.unwind_storage_hashing(changesets.into_iter())
3217 }
3218
3219 fn insert_storage_for_hashing(
3220 &self,
3221 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
3222 ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3223 let hashed_storages =
3225 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
3226 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
3227 map.insert(keccak256(entry.key), entry.value);
3228 map
3229 });
3230 map.insert(keccak256(address), storage);
3231 map
3232 });
3233
3234 let hashed_storage_keys = hashed_storages
3235 .iter()
3236 .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
3237 .collect();
3238
3239 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3240 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
3243 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
3244 if hashed_storage_cursor
3245 .seek_by_key_subkey(hashed_address, key)?
3246 .is_some_and(|entry| entry.key == key)
3247 {
3248 hashed_storage_cursor.delete_current()?;
3249 }
3250
3251 if !value.is_zero() {
3252 hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
3253 }
3254 Ok(())
3255 })
3256 })?;
3257
3258 Ok(hashed_storage_keys)
3259 }
3260}
3261
3262impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
3263 fn unwind_account_history_indices<'a>(
3264 &self,
3265 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3266 ) -> ProviderResult<usize> {
3267 let mut last_indices = changesets
3268 .into_iter()
3269 .map(|(index, account)| (account.address, *index))
3270 .collect::<Vec<_>>();
3271 last_indices.sort_unstable_by_key(|(a, _)| *a);
3272
3273 if self.cached_storage_settings().storage_v2 {
3274 #[cfg(all(unix, feature = "rocksdb"))]
3275 {
3276 let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
3277 self.pending_rocksdb_batches.lock().push(batch);
3278 }
3279 } else {
3280 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
3282 for &(address, rem_index) in &last_indices {
3283 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
3284 &mut cursor,
3285 ShardedKey::last(address),
3286 rem_index,
3287 |sharded_key| sharded_key.key == address,
3288 )?;
3289
3290 if !partial_shard.is_empty() {
3293 cursor.insert(
3294 ShardedKey::last(address),
3295 &BlockNumberList::new_pre_sorted(partial_shard),
3296 )?;
3297 }
3298 }
3299 }
3300
3301 let changesets = last_indices.len();
3302 Ok(changesets)
3303 }
3304
3305 fn unwind_account_history_indices_range(
3306 &self,
3307 range: impl RangeBounds<BlockNumber>,
3308 ) -> ProviderResult<usize> {
3309 let changesets = self.account_changesets_range(range)?;
3310 self.unwind_account_history_indices(changesets.iter())
3311 }
3312
3313 fn insert_account_history_index(
3314 &self,
3315 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3316 ) -> ProviderResult<()> {
3317 self.append_history_index::<_, tables::AccountsHistory>(
3318 account_transitions,
3319 ShardedKey::new,
3320 )
3321 }
3322
3323 fn unwind_storage_history_indices(
3324 &self,
3325 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3326 ) -> ProviderResult<usize> {
3327 let mut storage_changesets = changesets
3328 .into_iter()
3329 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
3330 .collect::<Vec<_>>();
3331 storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
3332
3333 if self.cached_storage_settings().storage_v2 {
3334 #[cfg(all(unix, feature = "rocksdb"))]
3335 {
3336 let batch =
3337 self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
3338 self.pending_rocksdb_batches.lock().push(batch);
3339 }
3340 } else {
3341 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
3343 for &(address, storage_key, rem_index) in &storage_changesets {
3344 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
3345 &mut cursor,
3346 StorageShardedKey::last(address, storage_key),
3347 rem_index,
3348 |storage_sharded_key| {
3349 storage_sharded_key.address == address &&
3350 storage_sharded_key.sharded_key.key == storage_key
3351 },
3352 )?;
3353
3354 if !partial_shard.is_empty() {
3357 cursor.insert(
3358 StorageShardedKey::last(address, storage_key),
3359 &BlockNumberList::new_pre_sorted(partial_shard),
3360 )?;
3361 }
3362 }
3363 }
3364
3365 let changesets = storage_changesets.len();
3366 Ok(changesets)
3367 }
3368
3369 fn unwind_storage_history_indices_range(
3370 &self,
3371 range: impl RangeBounds<BlockNumber>,
3372 ) -> ProviderResult<usize> {
3373 let changesets = self.storage_changesets_range(range)?;
3374 self.unwind_storage_history_indices(changesets.into_iter())
3375 }
3376
3377 fn insert_storage_history_index(
3378 &self,
3379 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3380 ) -> ProviderResult<()> {
3381 self.append_history_index::<_, tables::StoragesHistory>(
3382 storage_transitions,
3383 |(address, storage_key), highest_block_number| {
3384 StorageShardedKey::new(address, storage_key, highest_block_number)
3385 },
3386 )
3387 }
3388
3389 #[instrument(level = "debug", target = "providers::db", skip_all)]
3390 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3391 let storage_settings = self.cached_storage_settings();
3392 if !storage_settings.storage_v2 {
3393 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3394 self.insert_account_history_index(indices)?;
3395 }
3396
3397 if !storage_settings.storage_v2 {
3398 let indices = self.changed_storages_and_blocks_with_range(range)?;
3399 self.insert_storage_history_index(indices)?;
3400 }
3401
3402 Ok(())
3403 }
3404}
3405
3406impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
3407 for DatabaseProvider<TX, N>
3408{
3409 fn take_block_and_execution_above(
3410 &self,
3411 block: BlockNumber,
3412 ) -> ProviderResult<Chain<Self::Primitives>> {
3413 let range = block + 1..=self.last_block_number()?;
3414
3415 self.unwind_trie_state_from(block + 1)?;
3416
3417 let execution_state = self.take_state_above(block)?;
3419
3420 let blocks = self.recovered_block_range(range)?;
3421
3422 self.remove_blocks_above(block)?;
3425
3426 self.update_pipeline_stages(block, true)?;
3428
3429 Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
3430 }
3431
3432 fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
3433 self.unwind_trie_state_from(block + 1)?;
3434
3435 self.remove_state_above(block)?;
3437
3438 self.remove_blocks_above(block)?;
3441
3442 self.update_pipeline_stages(block, true)?;
3444
3445 Ok(())
3446 }
3447}
3448
3449impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
3450 for DatabaseProvider<TX, N>
3451{
3452 type Block = BlockTy<N>;
3453 type Receipt = ReceiptTy<N>;
3454
3455 fn insert_block(
3460 &self,
3461 block: &RecoveredBlock<Self::Block>,
3462 ) -> ProviderResult<StoredBlockBodyIndices> {
3463 let block_number = block.number();
3464
3465 let executed_block = ExecutedBlock::new(
3467 Arc::new(block.clone()),
3468 Arc::new(BlockExecutionOutput {
3469 result: BlockExecutionResult {
3470 receipts: Default::default(),
3471 requests: Default::default(),
3472 gas_used: 0,
3473 blob_gas_used: 0,
3474 },
3475 state: Default::default(),
3476 }),
3477 ComputedTrieData::default(),
3478 );
3479
3480 self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
3482
3483 self.block_body_indices(block_number)?
3485 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
3486 }
3487
3488 fn append_block_bodies(
3489 &self,
3490 bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
3491 ) -> ProviderResult<()> {
3492 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
3493
3494 let mut tx_writer =
3496 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
3497
3498 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
3499 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
3500
3501 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
3503
3504 for (block_number, body) in &bodies {
3505 tx_writer.increment_block(*block_number)?;
3507
3508 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
3509 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
3510
3511 let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3512
3513 block_indices_cursor.append(*block_number, &block_indices)?;
3515
3516 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
3517
3518 let Some(body) = body else { continue };
3519
3520 if !body.transactions().is_empty() {
3522 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
3523 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
3524 }
3525
3526 for transaction in body.transactions() {
3528 tx_writer.append_transaction(next_tx_num, transaction)?;
3529
3530 next_tx_num += 1;
3532 }
3533 }
3534
3535 self.storage.writer().write_block_bodies(self, bodies)?;
3536
3537 Ok(())
3538 }
3539
3540 fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
3541 let last_block_number = self.last_block_number()?;
3542 for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
3544 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
3545 }
3546
3547 let highest_static_file_block = self
3549 .static_file_provider()
3550 .get_highest_static_file_block(StaticFileSegment::Headers)
3551 .expect("todo: error handling, headers should exist");
3552
3553 debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
3559 self.static_file_provider()
3560 .get_writer(block, StaticFileSegment::Headers)?
3561 .prune_headers(highest_static_file_block.saturating_sub(block))?;
3562
3563 let unwind_tx_from = self
3565 .block_body_indices(block)?
3566 .map(|b| b.next_tx_num())
3567 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3568
3569 let unwind_tx_to = self
3571 .tx
3572 .cursor_read::<tables::BlockBodyIndices>()?
3573 .last()?
3574 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3576 .1
3577 .last_tx_num();
3578
3579 if unwind_tx_from <= unwind_tx_to {
3580 let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
3581 self.with_rocksdb_batch(|batch| {
3582 let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
3583 for (hash, _) in hashes {
3584 writer.delete_transaction_hash_number(hash)?;
3585 }
3586 Ok(((), writer.into_raw_rocksdb_batch()))
3587 })?;
3588 }
3589
3590 if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) {
3593 EitherWriter::new_senders(self, last_block_number)?
3594 .prune_senders(unwind_tx_from, block)?;
3595 }
3596
3597 self.remove_bodies_above(block)?;
3598
3599 Ok(())
3600 }
3601
3602 fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3603 self.storage.writer().remove_block_bodies_above(self, block)?;
3604
3605 let unwind_tx_from = self
3607 .block_body_indices(block)?
3608 .map(|b| b.next_tx_num())
3609 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3610
3611 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3612 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3613
3614 let static_file_tx_num =
3615 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3616
3617 let to_delete = static_file_tx_num
3618 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3619 .unwrap_or_default();
3620
3621 self.static_file_provider
3622 .latest_writer(StaticFileSegment::Transactions)?
3623 .prune_transactions(to_delete, block)?;
3624
3625 Ok(())
3626 }
3627
    /// Appends `blocks` together with their execution outcome and hashed state.
    ///
    /// In order, this inserts every block, writes the state, writes the hashed state, writes the
    /// account and storage history indices and finally updates the pipeline stages to the last
    /// appended block number. Durations of the individual steps are recorded and logged.
    ///
    /// Returns early with `Ok(())` when `blocks` is empty.
    fn append_blocks_with_state(
        &self,
        blocks: Vec<RecoveredBlock<Self::Block>>,
        execution_outcome: &ExecutionOutcome<Self::Receipt>,
        hashed_state: HashedPostStateSorted,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to append empty block range");
            return Ok(())
        }

        // Indexing is safe here: `blocks` was just checked to be non-empty.
        let first_number = blocks[0].number();

        let last_block_number = blocks[blocks.len() - 1].number();

        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

        // Collect, per account and per storage slot, the block numbers in which they changed.
        // The i-th entry of `bundle.reverts` is attributed to block `first_number + i`.
        let (account_transitions, storage_transitions) = {
            let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
            let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
            for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
                let block_number = first_number + block_idx as u64;
                for (address, account_revert) in block_reverts {
                    account_transitions.entry(*address).or_default().push(block_number);
                    for storage_key in account_revert.storage.keys() {
                        let key = B256::from(storage_key.to_be_bytes());
                        storage_transitions.entry((*address, key)).or_default().push(block_number);
                    }
                }
            }
            (account_transitions, storage_transitions)
        };

        for block in blocks {
            self.insert_block(&block)?;
            durations_recorder.record_relative(metrics::Action::InsertBlock);
        }

        self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
        durations_recorder.record_relative(metrics::Action::InsertState);

        self.write_hashed_state(&hashed_state)?;
        durations_recorder.record_relative(metrics::Action::InsertHashes);

        let storage_settings = self.cached_storage_settings();
        // With `storage_v2` the history shards go into a RocksDB batch; that statement is only
        // compiled on unix targets with the `rocksdb` feature. Otherwise the database history
        // index is used.
        if storage_settings.storage_v2 {
            #[cfg(all(unix, feature = "rocksdb"))]
            self.with_rocksdb_batch(|mut batch| {
                for (address, blocks) in account_transitions {
                    batch.append_account_history_shard(address, blocks)?;
                }
                Ok(((), Some(batch.into_inner())))
            })?;
        } else {
            self.insert_account_history_index(account_transitions)?;
        }
        // Same split for the storage history.
        if storage_settings.storage_v2 {
            #[cfg(all(unix, feature = "rocksdb"))]
            self.with_rocksdb_batch(|mut batch| {
                for ((address, key), blocks) in storage_transitions {
                    batch.append_storage_history_shard(address, key, blocks)?;
                }
                Ok(((), Some(batch.into_inner())))
            })?;
        } else {
            self.insert_storage_history_index(storage_transitions)?;
        }
        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);

        self.update_pipeline_stages(last_block_number, false)?;
        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);

        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");

        Ok(())
    }
3727}
3728
3729impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3730 fn get_prune_checkpoint(
3731 &self,
3732 segment: PruneSegment,
3733 ) -> ProviderResult<Option<PruneCheckpoint>> {
3734 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3735 }
3736
3737 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3738 Ok(PruneSegment::variants()
3739 .filter_map(|segment| {
3740 self.tx
3741 .get::<tables::PruneCheckpoints>(segment)
3742 .transpose()
3743 .map(|chk| chk.map(|chk| (segment, chk)))
3744 })
3745 .collect::<Result<_, _>>()?)
3746 }
3747}
3748
3749impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3750 fn save_prune_checkpoint(
3751 &self,
3752 segment: PruneSegment,
3753 checkpoint: PruneCheckpoint,
3754 ) -> ProviderResult<()> {
3755 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3756 }
3757}
3758
3759impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3760 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3761 let db_entries = self.tx.entries::<T>()?;
3762 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3763 Ok(entries) => entries,
3764 Err(ProviderError::UnsupportedProvider) => 0,
3765 Err(err) => return Err(err),
3766 };
3767
3768 Ok(db_entries + static_file_entries)
3769 }
3770}
3771
3772impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3773 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3774 let mut finalized_blocks = self
3775 .tx
3776 .cursor_read::<tables::ChainState>()?
3777 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3778 .take(1)
3779 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3780
3781 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3782 Ok(last_finalized_block_number)
3783 }
3784
3785 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3786 let mut finalized_blocks = self
3787 .tx
3788 .cursor_read::<tables::ChainState>()?
3789 .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3790 .take(1)
3791 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3792
3793 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3794 Ok(last_finalized_block_number)
3795 }
3796}
3797
3798impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3799 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3800 Ok(self
3801 .tx
3802 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3803 }
3804
3805 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3806 Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3807 }
3808}
3809
impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
    type Tx = TX;

    /// Returns a shared reference to the underlying database transaction.
    fn tx_ref(&self) -> &Self::Tx {
        &self.tx
    }

    /// Returns a mutable reference to the underlying database transaction.
    fn tx_mut(&mut self) -> &mut Self::Tx {
        &mut self.tx
    }

    /// Consumes the provider and returns the underlying database transaction.
    fn into_tx(self) -> Self::Tx {
        self.tx
    }

    /// Returns the configured prune modes.
    // NOTE(review): presumably this resolves to an inherent `prune_modes_ref` defined elsewhere
    // on `DatabaseProvider`; if no such inherent method exists this call recurses — confirm.
    fn prune_modes_ref(&self) -> &PruneModes {
        self.prune_modes_ref()
    }

    /// Commits all pending changes of this provider.
    ///
    /// The commit order depends on the kind of operation:
    /// - when a static file unwind is queued or `commit_order` is an unwind, the database
    ///   transaction is committed first, then the pending RocksDB batches, then static files;
    /// - otherwise static files are finalized first, then the pending RocksDB batches are
    ///   committed, then the database transaction, and the timings of each step are recorded.
    ///
    /// The RocksDB steps are only compiled on unix targets with the `rocksdb` feature.
    #[instrument(
        name = "DatabaseProvider::commit",
        level = "debug",
        target = "providers::db",
        skip_all
    )]
    fn commit(self) -> ProviderResult<()> {
        if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
            // Unwind path: database transaction first.
            self.tx.commit()?;

            #[cfg(all(unix, feature = "rocksdb"))]
            {
                // Drain the queue so every pending batch is committed exactly once.
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
            }

            self.static_file_provider.commit()?;
        } else {
            // Regular path: static files first, database transaction last.
            let mut timings = metrics::CommitTimings::default();

            let start = Instant::now();
            self.static_file_provider.finalize()?;
            timings.sf = start.elapsed();

            #[cfg(all(unix, feature = "rocksdb"))]
            {
                let start = Instant::now();
                // Drain the queue so every pending batch is committed exactly once.
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
                timings.rocksdb = start.elapsed();
            }

            let start = Instant::now();
            self.tx.commit()?;
            timings.mdbx = start.elapsed();

            self.metrics.record_commit(&timings);
        }

        Ok(())
    }
}
3881
3882impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3883 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3884 self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3885 }
3886}
3887
3888impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3889 fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3890 self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3891 }
3892}
3893
3894impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3895 fn cached_storage_settings(&self) -> StorageSettings {
3896 *self.storage_settings.read()
3897 }
3898
3899 fn set_storage_settings_cache(&self, settings: StorageSettings) {
3900 *self.storage_settings.write() = settings;
3901 }
3902}
3903
3904impl<TX: Send, N: NodeTypes> StoragePath for DatabaseProvider<TX, N> {
3905 fn storage_path(&self) -> PathBuf {
3906 self.db_path.clone()
3907 }
3908}
3909
3910#[cfg(test)]
3911mod tests {
3912 use super::*;
3913 use crate::{
3914 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3915 BlockWriter,
3916 };
3917 use alloy_consensus::Header;
3918 use alloy_primitives::{
3919 map::{AddressMap, B256Map},
3920 U256,
3921 };
3922 use reth_chain_state::ExecutedBlock;
3923 use reth_ethereum_primitives::Receipt;
3924 use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3925 use reth_primitives_traits::SealedBlock;
3926 use reth_testing_utils::generators::{self, random_block, BlockParams};
3927 use reth_trie::{
3928 HashedPostState, KeccakKeyHasher, Nibbles, StoredNibbles, StoredNibblesSubKey,
3929 };
3930 use revm_database::BundleState;
3931 use revm_state::AccountInfo;
3932
3933 #[test]
3934 fn test_receipts_by_block_range_empty_range() {
3935 let factory = create_test_provider_factory();
3936 let provider = factory.provider().unwrap();
3937
3938 let start = 10u64;
3940 let end = 9u64;
3941 let result = provider.receipts_by_block_range(start..=end).unwrap();
3942 assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3943 }
3944
3945 #[test]
3946 fn test_receipts_by_block_range_nonexistent_blocks() {
3947 let factory = create_test_provider_factory();
3948 let provider = factory.provider().unwrap();
3949
3950 let result = provider.receipts_by_block_range(10..=12).unwrap();
3952 assert_eq!(result, vec![vec![], vec![], vec![]]);
3953 }
3954
3955 #[test]
3956 fn test_receipts_by_block_range_single_block() {
3957 let factory = create_test_provider_factory();
3958 let data = BlockchainTestData::default();
3959
3960 let provider_rw = factory.provider_rw().unwrap();
3961 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
3962 provider_rw
3963 .write_state(
3964 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3965 crate::OriginalValuesKnown::No,
3966 StateWriteConfig::default(),
3967 )
3968 .unwrap();
3969 provider_rw.insert_block(&data.blocks[0].0).unwrap();
3970 provider_rw
3971 .write_state(
3972 &data.blocks[0].1,
3973 crate::OriginalValuesKnown::No,
3974 StateWriteConfig::default(),
3975 )
3976 .unwrap();
3977 provider_rw.commit().unwrap();
3978
3979 let provider = factory.provider().unwrap();
3980 let result = provider.receipts_by_block_range(1..=1).unwrap();
3981
3982 assert_eq!(result.len(), 1);
3984 assert_eq!(result[0].len(), 1);
3985 assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3986 }
3987
3988 #[test]
3989 fn test_receipts_by_block_range_multiple_blocks() {
3990 let factory = create_test_provider_factory();
3991 let data = BlockchainTestData::default();
3992
3993 let provider_rw = factory.provider_rw().unwrap();
3994 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
3995 provider_rw
3996 .write_state(
3997 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3998 crate::OriginalValuesKnown::No,
3999 StateWriteConfig::default(),
4000 )
4001 .unwrap();
4002 for i in 0..3 {
4003 provider_rw.insert_block(&data.blocks[i].0).unwrap();
4004 provider_rw
4005 .write_state(
4006 &data.blocks[i].1,
4007 crate::OriginalValuesKnown::No,
4008 StateWriteConfig::default(),
4009 )
4010 .unwrap();
4011 }
4012 provider_rw.commit().unwrap();
4013
4014 let provider = factory.provider().unwrap();
4015 let result = provider.receipts_by_block_range(1..=3).unwrap();
4016
4017 assert_eq!(result.len(), 3);
4019 for (i, block_receipts) in result.iter().enumerate() {
4020 assert_eq!(block_receipts.len(), 1);
4021 assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
4022 }
4023 }
4024
4025 #[test]
4026 fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
4027 let factory = create_test_provider_factory();
4028 let data = BlockchainTestData::default();
4029
4030 let provider_rw = factory.provider_rw().unwrap();
4031 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4032 provider_rw
4033 .write_state(
4034 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4035 crate::OriginalValuesKnown::No,
4036 StateWriteConfig::default(),
4037 )
4038 .unwrap();
4039
4040 for i in 0..3 {
4042 provider_rw.insert_block(&data.blocks[i].0).unwrap();
4043 provider_rw
4044 .write_state(
4045 &data.blocks[i].1,
4046 crate::OriginalValuesKnown::No,
4047 StateWriteConfig::default(),
4048 )
4049 .unwrap();
4050 }
4051 provider_rw.commit().unwrap();
4052
4053 let provider = factory.provider().unwrap();
4054 let result = provider.receipts_by_block_range(1..=3).unwrap();
4055
4056 assert_eq!(result.len(), 3);
4058 for block_receipts in &result {
4059 assert_eq!(block_receipts.len(), 1);
4060 }
4061 }
4062
4063 #[test]
4064 fn test_receipts_by_block_range_partial_range() {
4065 let factory = create_test_provider_factory();
4066 let data = BlockchainTestData::default();
4067
4068 let provider_rw = factory.provider_rw().unwrap();
4069 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4070 provider_rw
4071 .write_state(
4072 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4073 crate::OriginalValuesKnown::No,
4074 StateWriteConfig::default(),
4075 )
4076 .unwrap();
4077 for i in 0..3 {
4078 provider_rw.insert_block(&data.blocks[i].0).unwrap();
4079 provider_rw
4080 .write_state(
4081 &data.blocks[i].1,
4082 crate::OriginalValuesKnown::No,
4083 StateWriteConfig::default(),
4084 )
4085 .unwrap();
4086 }
4087 provider_rw.commit().unwrap();
4088
4089 let provider = factory.provider().unwrap();
4090
4091 let result = provider.receipts_by_block_range(2..=5).unwrap();
4093 assert_eq!(result.len(), 4);
4094
4095 assert_eq!(result[0].len(), 1); assert_eq!(result[1].len(), 1); assert_eq!(result[2].len(), 0); assert_eq!(result[3].len(), 0); assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
4102 assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
4103 }
4104
4105 #[test]
4106 fn test_receipts_by_block_range_all_empty_blocks() {
4107 let factory = create_test_provider_factory();
4108 let mut rng = generators::rng();
4109
4110 let mut blocks = Vec::new();
4112 for i in 0..3 {
4113 let block =
4114 random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
4115 blocks.push(block);
4116 }
4117
4118 let provider_rw = factory.provider_rw().unwrap();
4119 for block in blocks {
4120 provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4121 }
4122 provider_rw.commit().unwrap();
4123
4124 let provider = factory.provider().unwrap();
4125 let result = provider.receipts_by_block_range(1..=3).unwrap();
4126
4127 assert_eq!(result.len(), 3);
4128 for block_receipts in result {
4129 assert_eq!(block_receipts.len(), 0);
4130 }
4131 }
4132
4133 #[test]
4134 fn test_receipts_by_block_range_consistency_with_individual_calls() {
4135 let factory = create_test_provider_factory();
4136 let data = BlockchainTestData::default();
4137
4138 let provider_rw = factory.provider_rw().unwrap();
4139 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4140 provider_rw
4141 .write_state(
4142 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4143 crate::OriginalValuesKnown::No,
4144 StateWriteConfig::default(),
4145 )
4146 .unwrap();
4147 for i in 0..3 {
4148 provider_rw.insert_block(&data.blocks[i].0).unwrap();
4149 provider_rw
4150 .write_state(
4151 &data.blocks[i].1,
4152 crate::OriginalValuesKnown::No,
4153 StateWriteConfig::default(),
4154 )
4155 .unwrap();
4156 }
4157 provider_rw.commit().unwrap();
4158
4159 let provider = factory.provider().unwrap();
4160
4161 let range_result = provider.receipts_by_block_range(1..=3).unwrap();
4163
4164 let mut individual_results = Vec::new();
4166 for block_num in 1..=3 {
4167 let receipts =
4168 provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
4169 individual_results.push(receipts);
4170 }
4171
4172 assert_eq!(range_result, individual_results);
4173 }
4174
4175 #[test]
4176 fn test_write_trie_updates_sorted() {
4177 use reth_trie::{
4178 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
4179 BranchNodeCompact, StorageTrieEntry,
4180 };
4181
4182 let factory = create_test_provider_factory();
4183 let provider_rw = factory.provider_rw().unwrap();
4184
4185 {
4187 let tx = provider_rw.tx_ref();
4188 let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
4189
4190 let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4192 cursor
4193 .upsert(
4194 to_delete,
4195 &BranchNodeCompact::new(
4196 0b1010_1010_1010_1010, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4200 None,
4201 ),
4202 )
4203 .unwrap();
4204
4205 let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4207 cursor
4208 .upsert(
4209 to_update,
4210 &BranchNodeCompact::new(
4211 0b0101_0101_0101_0101, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4215 None,
4216 ),
4217 )
4218 .unwrap();
4219 }
4220
4221 let storage_address1 = B256::from([1u8; 32]);
4223 let storage_address2 = B256::from([2u8; 32]);
4224 {
4225 let tx = provider_rw.tx_ref();
4226 let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
4227
4228 storage_cursor
4230 .upsert(
4231 storage_address1,
4232 &StorageTrieEntry {
4233 nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
4234 node: BranchNodeCompact::new(
4235 0b0011_0011_0011_0011, 0b0000_0000_0000_0000,
4237 0b0000_0000_0000_0000,
4238 vec![],
4239 None,
4240 ),
4241 },
4242 )
4243 .unwrap();
4244
4245 storage_cursor
4247 .upsert(
4248 storage_address2,
4249 &StorageTrieEntry {
4250 nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
4251 node: BranchNodeCompact::new(
4252 0b1100_1100_1100_1100, 0b0000_0000_0000_0000,
4254 0b0000_0000_0000_0000,
4255 vec![],
4256 None,
4257 ),
4258 },
4259 )
4260 .unwrap();
4261 storage_cursor
4262 .upsert(
4263 storage_address2,
4264 &StorageTrieEntry {
4265 nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
4266 node: BranchNodeCompact::new(
4267 0b0011_1100_0011_1100, 0b0000_0000_0000_0000,
4269 0b0000_0000_0000_0000,
4270 vec![],
4271 None,
4272 ),
4273 },
4274 )
4275 .unwrap();
4276 }
4277
4278 let account_nodes = vec![
4280 (
4281 Nibbles::from_nibbles([0x1, 0x2]),
4282 Some(BranchNodeCompact::new(
4283 0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4287 None,
4288 )),
4289 ),
4290 (Nibbles::from_nibbles([0x3, 0x4]), None), (
4292 Nibbles::from_nibbles([0x5, 0x6]),
4293 Some(BranchNodeCompact::new(
4294 0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4298 None,
4299 )),
4300 ),
4301 ];
4302
4303 let storage_trie1 = StorageTrieUpdatesSorted {
4305 is_deleted: false,
4306 storage_nodes: vec![
4307 (
4308 Nibbles::from_nibbles([0x1, 0x0]),
4309 Some(BranchNodeCompact::new(
4310 0b1111_0000_0000_0000, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4314 None,
4315 )),
4316 ),
4317 (Nibbles::from_nibbles([0x2, 0x0]), None), ],
4319 };
4320
4321 let storage_trie2 = StorageTrieUpdatesSorted {
4322 is_deleted: true, storage_nodes: vec![],
4324 };
4325
4326 let mut storage_tries = B256Map::default();
4327 storage_tries.insert(storage_address1, storage_trie1);
4328 storage_tries.insert(storage_address2, storage_trie2);
4329
4330 let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
4331
4332 let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
4334
4335 assert_eq!(num_entries, 5);
4338
4339 let tx = provider_rw.tx_ref();
4341 let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
4342
4343 let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4345 let entry1 = cursor.seek_exact(nibbles1).unwrap();
4346 assert!(entry1.is_some(), "Updated account node should exist");
4347 let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
4348 assert_eq!(
4349 entry1.unwrap().1.state_mask,
4350 expected_mask,
4351 "Account node should have updated state_mask"
4352 );
4353
4354 let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4356 let entry2 = cursor.seek_exact(nibbles2).unwrap();
4357 assert!(entry2.is_none(), "Deleted account node should not exist");
4358
4359 let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
4361 let entry3 = cursor.seek_exact(nibbles3).unwrap();
4362 assert!(entry3.is_some(), "New account node should exist");
4363
4364 let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
4366
4367 let storage_entries1: Vec<_> = storage_cursor
4369 .walk_dup(Some(storage_address1), None)
4370 .unwrap()
4371 .collect::<Result<Vec<_>, _>>()
4372 .unwrap();
4373 assert_eq!(
4374 storage_entries1.len(),
4375 1,
4376 "Storage address1 should have 1 entry after deletion"
4377 );
4378 assert_eq!(
4379 storage_entries1[0].1.nibbles.0,
4380 Nibbles::from_nibbles([0x1, 0x0]),
4381 "Remaining entry should be [0x1, 0x0]"
4382 );
4383
4384 let storage_entries2: Vec<_> = storage_cursor
4386 .walk_dup(Some(storage_address2), None)
4387 .unwrap()
4388 .collect::<Result<Vec<_>, _>>()
4389 .unwrap();
4390 assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
4391
4392 provider_rw.commit().unwrap();
4393 }
4394
/// Checks which receipts survive `write_state` when receipt pruning is configured, once with
/// `StorageSettings::v1()` and once with `StorageSettings::v2()`.
///
/// Every inserted block carries exactly one transaction and every receipt stores the number of
/// the block it was written for in `cumulative_gas_used`, so a fetched receipt can be matched
/// back to its block.
#[test]
fn test_prunable_receipts_logic() {
    // Inserts randomly generated blocks `0..=tip_block`, each with `tx_count` transactions.
    let insert_blocks =
        |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
            let mut rng = generators::rng();
            for block_num in 0..=tip_block {
                let block = random_block(
                    &mut rng,
                    block_num,
                    BlockParams { tx_count: Some(tx_count), ..Default::default() },
                );
                provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
            }
        };

    // Writes an execution outcome holding a single receipt for `block`, then commits the
    // provider (which is consumed).
    let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
        let outcome = ExecutionOutcome {
            first_block: block,
            receipts: vec![vec![Receipt {
                tx_type: Default::default(),
                success: true,
                // The block number doubles as an identifier the assertions below can check.
                cumulative_gas_used: block, logs: vec![],
            }]],
            ..Default::default()
        };
        provider_rw
            .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
        provider_rw.commit().unwrap();
    };

    // Scenario 1: v1 storage settings with `PruneMode::Before(100)` for receipts.
    {
        let factory = create_test_provider_factory();
        let storage_settings = StorageSettings::v1();
        factory.set_storage_settings_cache(storage_settings);
        let factory = factory.with_prune_modes(PruneModes {
            receipts: Some(PruneMode::Before(100)),
            ..Default::default()
        });

        let tip_block = 200u64;
        let first_block = 1u64;

        // Build the chain first; receipts are written afterwards with separate providers.
        let provider_rw = factory.provider_rw().unwrap();
        insert_blocks(&provider_rw, tip_block, 1);
        provider_rw.commit().unwrap();

        // One receipt below the `Before(100)` boundary and one close to the tip.
        write_receipts(
            factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
            first_block,
        );
        write_receipts(
            factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
            tip_block - 1,
        );

        let provider = factory.provider().unwrap();

        // Block 0 never had a receipt written; the receipt for `tip_block - 1` must be present.
        // NOTE(review): the receipt for `first_block` is presumably skipped by the prune mode,
        // but this scenario does not assert it.
        for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
            assert!(provider
                .receipts_by_block(block.into())
                .unwrap()
                .is_some_and(|r| r.len() == num_receipts));
        }
    }

    // Scenario 2: v2 storage settings with `PruneMode::Before(2)` for receipts.
    {
        let factory = create_test_provider_factory();
        let storage_settings = StorageSettings::v2();
        factory.set_storage_settings_cache(storage_settings);
        let factory = factory.with_prune_modes(PruneModes {
            receipts: Some(PruneMode::Before(2)),
            ..Default::default()
        });

        let tip_block = 200u64;

        let provider_rw = factory.provider_rw().unwrap();
        insert_blocks(&provider_rw, tip_block, 1);
        provider_rw.commit().unwrap();

        // Blocks 0 and 1 lie below the `Before(2)` boundary.
        write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
        write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);

        // No receipt reached the static file segment, yet its highest block advanced to 1.
        assert!(factory
            .static_file_provider()
            .get_highest_static_file_tx(StaticFileSegment::Receipts)
            .is_none(),);
        assert!(factory
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::Receipts)
            .is_some_and(|b| b == 1),);

        // Block 2 is the first block at the boundary, so its receipt must be written.
        write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
        assert!(factory
            .static_file_provider()
            .get_highest_static_file_tx(StaticFileSegment::Receipts)
            .is_some_and(|num| num == 2),);

        // Switch to a prune configuration under which block 3 would be prunable. The loop
        // below still expects its receipt to exist.
        // NOTE(review): presumably because the receipts segment cannot contain gaps once a
        // receipt has been written — confirm against the writer implementation.
        let factory = factory.with_prune_modes(PruneModes {
            receipts: Some(PruneMode::Before(100)),
            ..Default::default()
        });
        let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
        assert!(PruneMode::Distance(1).should_prune(3, tip_block));
        write_receipts(provider_rw, 3);

        let provider = factory.provider().unwrap();
        assert!(EitherWriter::receipts_destination(&provider).is_static_file());
        // Only blocks 2 and 3 are expected to have a receipt.
        for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
            assert!(provider
                .receipts_by_block(num.into())
                .unwrap()
                .is_some_and(|r| r.len() == num_receipts));

            // `num` is reused as the id passed to `receipt`; presumably a transaction number,
            // which coincides with the block number at one transaction per block.
            let receipt = provider.receipt(num).unwrap();
            if num_receipts > 0 {
                assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
            } else {
                assert!(receipt.is_none());
            }
        }
    }
}
4534
/// Historical state must be served for a block whose state was written and must be refused
/// with `ProviderError::BlockNotExecuted` for a block that was never executed.
#[test]
fn test_try_into_history_rejects_unexecuted_blocks() {
    use reth_storage_api::TryIntoHistoricalStateProvider;

    let factory = create_test_provider_factory();
    let test_data = BlockchainTestData::default();

    // Persist the genesis block together with an empty execution outcome for block 0.
    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };
    let writer = factory.provider_rw().unwrap();
    writer.insert_block(&test_data.genesis.try_recover().unwrap()).unwrap();
    writer
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();
    writer.commit().unwrap();

    // Block 0 has state, so a historical provider can be built for it.
    let genesis_reader = factory.provider().unwrap();
    let at_genesis = genesis_reader.try_into_history_at_block(0);
    assert!(at_genesis.is_ok(), "Block 0 should be available");

    // Block 100 was never written; only the dedicated error variant is acceptable.
    let future_reader = factory.provider().unwrap();
    match future_reader.try_into_history_at_block(100) {
        Ok(_) => panic!("Expected error, got Ok"),
        Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
        Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
    }
}
4572
/// Unwinds a single storage changeset entry with v2 storage settings and checks that the
/// hashed storage table is restored to the changeset's old value.
///
/// The changeset carries the plain slot key; the returned map and the `HashedStorages` entry
/// are expected to be keyed by the keccak hashes of the address and the slot.
#[test]
fn test_unwind_storage_hashing_with_hashed_state() {
    let factory = create_test_provider_factory();
    let storage_settings = StorageSettings::v2();
    // Guard the premise of this test, mirroring the legacy twin which asserts the opposite.
    assert!(storage_settings.use_hashed_state());
    factory.set_storage_settings_cache(storage_settings);

    let address = Address::random();
    let hashed_address = keccak256(address);

    let plain_slot = B256::random();
    let hashed_slot = keccak256(plain_slot);

    let current_value = U256::from(100);
    let old_value = U256::from(42);

    // Seed the hashed storage table with the value that is about to be unwound.
    let provider_rw = factory.provider_rw().unwrap();
    provider_rw
        .tx
        .cursor_dup_write::<tables::HashedStorages>()
        .unwrap()
        .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
        .unwrap();

    // A single changeset entry for block 1 recording the slot's previous value.
    let changesets = vec![(
        BlockNumberAddress((1, address)),
        StorageEntry { key: plain_slot, value: old_value },
    )];

    let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();

    // Exactly one hashed address with the hashed slot must be reported as touched.
    assert_eq!(result.len(), 1);
    assert!(result.contains_key(&hashed_address));
    assert!(result[&hashed_address].contains(&hashed_slot));

    // The hashed table now holds the pre-change value.
    let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
    let entry = cursor
        .seek_by_key_subkey(hashed_address, hashed_slot)
        .unwrap()
        .expect("entry should exist");
    assert_eq!(entry.key, hashed_slot);
    assert_eq!(entry.value, old_value);
}
4615
/// Round-trips state through `write_state` / `write_hashed_state` and `remove_state_above`
/// with v1 storage settings, checking the plain, hashed and changeset tables before and after
/// the unwind.
#[test]
fn test_write_and_remove_state_roundtrip_legacy() {
    let factory = create_test_provider_factory();
    let settings = StorageSettings::v1();
    assert!(!settings.use_hashed_state());
    factory.set_storage_settings_cache(settings);

    let address = Address::with_last_byte(1);
    let hashed_address = keccak256(address);
    let slot_key = B256::from(U256::from(5));
    let hashed_slot = keccak256(slot_key);

    // Every account in this test differs only by its nonce.
    let account_with_nonce = |nonce| Account { nonce, balance: U256::ZERO, bytecode_hash: None };

    let mut rng = generators::rng();
    let genesis_block =
        random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
    let first_block =
        random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });

    // Seed two empty blocks and the pre-state account (nonce 0), then commit.
    {
        let seed = factory.provider_rw().unwrap();
        for block in [genesis_block, first_block] {
            seed.insert_block(&block.try_recover().unwrap()).unwrap();
        }
        seed.tx
            .cursor_write::<tables::PlainAccountState>()
            .unwrap()
            .upsert(address, &account_with_nonce(0))
            .unwrap();
        seed.commit().unwrap();
    }

    let provider_rw = factory.provider_rw().unwrap();

    // Small readers for the tables that are inspected both before and after the unwind.
    let plain_account_nonce = || {
        provider_rw
            .tx
            .cursor_read::<tables::PlainAccountState>()
            .unwrap()
            .seek_exact(address)
            .unwrap()
            .unwrap()
            .1
            .nonce
    };
    let plain_slot_entry = || {
        provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .seek_by_key_subkey(address, slot_key)
            .unwrap()
    };
    let account_changesets = || {
        provider_rw
            .tx
            .cursor_dup_read::<tables::AccountChangeSets>()
            .unwrap()
            .walk(Some(1))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    };
    let storage_changesets = || {
        provider_rw
            .tx
            .cursor_read::<tables::StorageChangeSets>()
            .unwrap()
            .walk(Some(BlockNumberAddress((1, address))))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    };

    // Post-state of block 1: nonce 0 -> 1 and slot value 0 -> 10.
    let mut slot_changes: B256Map<(U256, U256)> = B256Map::default();
    slot_changes.insert(slot_key, (U256::ZERO, U256::from(10)));
    let mut state_init: BundleStateInit = AddressMap::default();
    state_init.insert(
        address,
        (Some(account_with_nonce(0)), Some(account_with_nonce(1)), slot_changes),
    );

    // Reverts of block 1: back to nonce 0 and a zeroed slot.
    let mut block_one_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
    block_one_reverts.insert(
        address,
        (
            Some(Some(account_with_nonce(0))),
            vec![StorageEntry { key: slot_key, value: U256::ZERO }],
        ),
    );
    let mut reverts_init: RevertsInit = HashMap::default();
    reverts_init.insert(1, block_one_reverts);

    let execution_outcome =
        ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);

    let write_config = StateWriteConfig {
        write_receipts: false,
        write_account_changesets: true,
        write_storage_changesets: true,
    };
    provider_rw.write_state(&execution_outcome, OriginalValuesKnown::Yes, write_config).unwrap();

    let hashed_state =
        execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
    provider_rw.write_hashed_state(&hashed_state).unwrap();

    // After the write: plain and hashed tables hold the new values.
    assert_eq!(plain_account_nonce(), 1);

    let written_slot = plain_slot_entry().unwrap();
    assert_eq!(written_slot.key, slot_key);
    assert_eq!(written_slot.value, U256::from(10));

    let written_hashed_slot = provider_rw
        .tx
        .cursor_dup_read::<tables::HashedStorages>()
        .unwrap()
        .seek_by_key_subkey(hashed_address, hashed_slot)
        .unwrap()
        .unwrap();
    assert_eq!(written_hashed_slot.key, hashed_slot);
    assert_eq!(written_hashed_slot.value, U256::from(10));

    // ... and both changeset tables contain entries for block 1, keyed by the plain slot.
    assert!(!account_changesets().is_empty());

    let storage_cs_entries = storage_changesets();
    assert!(!storage_cs_entries.is_empty());
    assert_eq!(storage_cs_entries[0].1.key, slot_key);

    // Unwind everything above block 0.
    provider_rw.remove_state_above(0).unwrap();

    // The account is back at nonce 0 and the slot no longer resolves to our key.
    assert_eq!(plain_account_nonce(), 0);
    assert!(!plain_slot_entry().is_some_and(|entry| entry.key == slot_key));

    // The changesets of block 1 are gone as well.
    assert!(account_changesets().is_empty());
    assert!(storage_changesets().is_empty());
}
4783
/// Unwinds a single storage changeset entry with v1 storage settings and checks that the
/// hashed storage entry is rolled back to the value recorded in the changeset.
#[test]
fn test_unwind_storage_hashing_legacy() {
    let factory = create_test_provider_factory();
    let settings = StorageSettings::v1();
    assert!(!settings.use_hashed_state());
    factory.set_storage_settings_cache(settings);

    let address = Address::random();
    let plain_slot = B256::random();
    let hashed_address = keccak256(address);
    let hashed_slot = keccak256(plain_slot);

    let value_before_unwind = U256::from(100);
    let value_after_unwind = U256::from(42);

    // Seed the hashed table with the value that is going to be replaced.
    let provider_rw = factory.provider_rw().unwrap();
    let seeded = StorageEntry { key: hashed_slot, value: value_before_unwind };
    provider_rw
        .tx
        .cursor_dup_write::<tables::HashedStorages>()
        .unwrap()
        .upsert(hashed_address, &seeded)
        .unwrap();

    // One changeset entry for block 1, keyed by the plain slot.
    let reverted = StorageEntry { key: plain_slot, value: value_after_unwind };
    let changesets = vec![(BlockNumberAddress((1, address)), reverted)];

    let touched = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();

    // Exactly our hashed address/slot pair must be reported.
    assert_eq!(touched.len(), 1);
    assert!(touched.contains_key(&hashed_address));
    assert!(touched[&hashed_address].contains(&hashed_slot));

    // The stored entry now carries the old value.
    let mut hashed_storage = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
    let restored = hashed_storage
        .seek_by_key_subkey(hashed_address, hashed_slot)
        .unwrap()
        .expect("entry should exist");
    assert_eq!(restored.key, hashed_slot);
    assert_eq!(restored.value, value_after_unwind);
}
4827
/// Writes one block of state with v2 storage settings and verifies that storage lands in the
/// hashed table only, that changesets are readable from static files, and that a historical
/// read at block 0 yields `None` for the slot.
#[test]
fn test_write_state_and_historical_read_hashed() {
    use reth_storage_api::StateProvider;
    use reth_trie::{HashedPostState, KeccakKeyHasher};
    use revm_database::BundleState;
    use revm_state::AccountInfo;

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let address = Address::with_last_byte(1);
    let hashed_address = keccak256(address);
    let slot = U256::from(5);
    let slot_key = B256::from(slot);
    let hashed_slot = keccak256(slot_key);

    // Prepare static files: headers 0 and 1 plus an empty changeset entry per changeset segment.
    {
        let static_files = factory.static_file_provider();

        let mut headers = static_files.latest_writer(StaticFileSegment::Headers).unwrap();
        for number in 0..=1 {
            let header = alloy_consensus::Header { number, ..Default::default() };
            headers.append_header(&header, &B256::ZERO).unwrap();
        }
        headers.commit().unwrap();

        let mut account_changes =
            static_files.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
        account_changes.append_account_changeset(vec![], 0).unwrap();
        account_changes.commit().unwrap();

        let mut storage_changes =
            static_files.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
        storage_changes.append_storage_changeset(vec![], 0).unwrap();
        storage_changes.commit().unwrap();
    }

    let provider_rw = factory.provider_rw().unwrap();

    // Block 1 sets nonce 1 / balance 10 and moves the slot from 0 to 10; the reverts record
    // `Some(None)` for the account and a zero slot value.
    let account_after = AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() };
    let bundle = BundleState::builder(1..=1)
        .state_present_account_info(address, account_after)
        .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
        .revert_account_info(1, address, Some(None))
        .revert_storage(1, address, vec![(slot, U256::ZERO)])
        .build();

    let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());

    // Block 1 has no transactions.
    let empty_body = StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 };
    provider_rw.tx.put::<tables::BlockBodyIndices>(1, empty_body).unwrap();

    let write_config = StateWriteConfig {
        write_receipts: false,
        write_account_changesets: true,
        write_storage_changesets: true,
    };
    provider_rw.write_state(&execution_outcome, OriginalValuesKnown::Yes, write_config).unwrap();

    let hashed_state =
        HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
    provider_rw.write_hashed_state(&hashed_state).unwrap();

    // Nothing may have been written to the plain storage table.
    let plain_rows = provider_rw
        .tx
        .cursor_dup_read::<tables::PlainStorageState>()
        .unwrap()
        .walk(None)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert!(plain_rows.is_empty());

    // The hashed table holds the new slot value.
    let hashed_row = provider_rw
        .tx
        .cursor_dup_read::<tables::HashedStorages>()
        .unwrap()
        .seek_by_key_subkey(hashed_address, hashed_slot)
        .unwrap()
        .unwrap();
    assert_eq!(hashed_row.key, hashed_slot);
    assert_eq!(hashed_row.value, U256::from(10));

    // Flush static files before reading the changesets back through the factory.
    provider_rw.static_file_provider().commit().unwrap();
    let static_files = factory.static_file_provider();

    let storage_changes = static_files.storage_changeset(1).unwrap();
    assert!(!storage_changes.is_empty());
    assert_eq!(storage_changes[0].1.key, slot_key);

    let account_changes = static_files.account_block_changeset(1).unwrap();
    assert!(!account_changes.is_empty());
    assert_eq!(account_changes[0].address, address);

    // Historical read at block 0 for the slot written in block 1.
    let value_at_genesis =
        HistoricalStateProviderRef::new(&*provider_rw, 0).storage(address, slot_key).unwrap();
    assert_eq!(value_at_genesis, None);
}
4935
/// Storage layout selector for the parameterized save-blocks test.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum StorageMode {
    /// Run with `StorageSettings::v1()`.
    V1,
    /// Run with `StorageSettings::v2()`.
    V2,
}
4941
4942 fn run_save_blocks_and_verify(mode: StorageMode) {
4943 use alloy_primitives::map::{FbBuildHasher, HashMap};
4944
4945 let factory = create_test_provider_factory();
4946
4947 match mode {
4948 StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
4949 StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
4950 }
4951
4952 let num_blocks = 3u64;
4953 let accounts_per_block = 5usize;
4954 let slots_per_account = 3usize;
4955
4956 let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
4957 SealedHeader::new(
4958 Header { number: 0, difficulty: U256::from(1), ..Default::default() },
4959 B256::ZERO,
4960 ),
4961 Default::default(),
4962 );
4963
4964 let genesis_executed = ExecutedBlock::new(
4965 Arc::new(genesis.try_recover().unwrap()),
4966 Arc::new(BlockExecutionOutput {
4967 result: BlockExecutionResult {
4968 receipts: vec![],
4969 requests: Default::default(),
4970 gas_used: 0,
4971 blob_gas_used: 0,
4972 },
4973 state: Default::default(),
4974 }),
4975 ComputedTrieData::default(),
4976 );
4977 let provider_rw = factory.provider_rw().unwrap();
4978 provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
4979 provider_rw.commit().unwrap();
4980
4981 let mut blocks: Vec<ExecutedBlock> = Vec::new();
4982 let mut parent_hash = B256::ZERO;
4983
4984 for block_num in 1..=num_blocks {
4985 let mut builder = BundleState::builder(block_num..=block_num);
4986
4987 for acct_idx in 0..accounts_per_block {
4988 let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
4989 let info = AccountInfo {
4990 nonce: block_num,
4991 balance: U256::from(block_num * 100 + acct_idx as u64),
4992 ..Default::default()
4993 };
4994
4995 let storage: HashMap<U256, (U256, U256), FbBuildHasher<32>> = (1..=
4996 slots_per_account as u64)
4997 .map(|s| {
4998 (
4999 U256::from(s + acct_idx as u64 * 100),
5000 (U256::ZERO, U256::from(block_num * 1000 + s)),
5001 )
5002 })
5003 .collect();
5004
5005 let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
5006 .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
5007 .collect();
5008
5009 builder = builder
5010 .state_present_account_info(address, info)
5011 .revert_account_info(block_num, address, Some(None))
5012 .state_storage(address, storage)
5013 .revert_storage(block_num, address, revert_storage);
5014 }
5015
5016 let bundle = builder.build();
5017
5018 let hashed_state =
5019 HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5020
5021 let header = Header {
5022 number: block_num,
5023 parent_hash,
5024 difficulty: U256::from(1),
5025 ..Default::default()
5026 };
5027 let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
5028 header,
5029 Default::default(),
5030 );
5031 parent_hash = block.hash();
5032
5033 let executed = ExecutedBlock::new(
5034 Arc::new(block.try_recover().unwrap()),
5035 Arc::new(BlockExecutionOutput {
5036 result: BlockExecutionResult {
5037 receipts: vec![],
5038 requests: Default::default(),
5039 gas_used: 0,
5040 blob_gas_used: 0,
5041 },
5042 state: bundle,
5043 }),
5044 ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
5045 );
5046 blocks.push(executed);
5047 }
5048
5049 let provider_rw = factory.provider_rw().unwrap();
5050 provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
5051 provider_rw.commit().unwrap();
5052
5053 let provider = factory.provider().unwrap();
5054
5055 for block_num in 1..=num_blocks {
5056 for acct_idx in 0..accounts_per_block {
5057 let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5058 let hashed_address = keccak256(address);
5059
5060 let ha_entry = provider
5061 .tx_ref()
5062 .cursor_read::<tables::HashedAccounts>()
5063 .unwrap()
5064 .seek_exact(hashed_address)
5065 .unwrap();
5066 assert!(
5067 ha_entry.is_some(),
5068 "HashedAccounts missing for block {block_num} acct {acct_idx}"
5069 );
5070
5071 for s in 1..=slots_per_account as u64 {
5072 let slot = U256::from(s + acct_idx as u64 * 100);
5073 let slot_key = B256::from(slot);
5074 let hashed_slot = keccak256(slot_key);
5075
5076 let hs_entry = provider
5077 .tx_ref()
5078 .cursor_dup_read::<tables::HashedStorages>()
5079 .unwrap()
5080 .seek_by_key_subkey(hashed_address, hashed_slot)
5081 .unwrap();
5082 assert!(
5083 hs_entry.is_some(),
5084 "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
5085 );
5086 let entry = hs_entry.unwrap();
5087 assert_eq!(entry.key, hashed_slot);
5088 assert_eq!(entry.value, U256::from(block_num * 1000 + s));
5089 }
5090 }
5091 }
5092
5093 for block_num in 1..=num_blocks {
5094 let header = provider.header_by_number(block_num).unwrap();
5095 assert!(header.is_some(), "Header missing for block {block_num}");
5096
5097 let indices = provider.block_body_indices(block_num).unwrap();
5098 assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
5099 }
5100
5101 let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
5102 let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();
5103
5104 if mode == StorageMode::V2 {
5105 assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5106 assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5107
5108 let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5109 assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");
5110
5111 let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5112 assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");
5113
5114 provider.static_file_provider().commit().unwrap();
5115 let sf = factory.static_file_provider();
5116
5117 for block_num in 1..=num_blocks {
5118 let account_cs = sf.account_block_changeset(block_num).unwrap();
5119 assert!(
5120 !account_cs.is_empty(),
5121 "v2: static file AccountChangeSets should exist for block {block_num}"
5122 );
5123
5124 let storage_cs = sf.storage_changeset(block_num).unwrap();
5125 assert!(
5126 !storage_cs.is_empty(),
5127 "v2: static file StorageChangeSets should exist for block {block_num}"
5128 );
5129
5130 for (_, entry) in &storage_cs {
5131 assert!(
5132 entry.key != keccak256(entry.key),
5133 "v2: static file storage changeset should have plain slot keys"
5134 );
5135 }
5136 }
5137
5138 #[cfg(all(unix, feature = "rocksdb"))]
5139 {
5140 let rocksdb = factory.rocksdb_provider();
5141 for block_num in 1..=num_blocks {
5142 for acct_idx in 0..accounts_per_block {
5143 let address =
5144 Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5145 let shards = rocksdb.account_history_shards(address).unwrap();
5146 assert!(
5147 !shards.is_empty(),
5148 "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
5149 );
5150
5151 for s in 1..=slots_per_account as u64 {
5152 let slot = U256::from(s + acct_idx as u64 * 100);
5153 let slot_key = B256::from(slot);
5154 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5155 assert!(
5156 !shards.is_empty(),
5157 "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
5158 );
5159 }
5160 }
5161 }
5162 }
5163 } else {
5164 assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
5165 assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");
5166
5167 let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5168 assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");
5169
5170 let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5171 assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");
5172
5173 for block_num in 1..=num_blocks {
5174 let storage_entries: Vec<_> = provider
5175 .tx_ref()
5176 .cursor_dup_read::<tables::StorageChangeSets>()
5177 .unwrap()
5178 .walk_range(BlockNumberAddress::range(block_num..=block_num))
5179 .unwrap()
5180 .collect::<Result<Vec<_>, _>>()
5181 .unwrap();
5182 assert!(
5183 !storage_entries.is_empty(),
5184 "v1: MDBX StorageChangeSets should have entries for block {block_num}"
5185 );
5186
5187 for (_, entry) in &storage_entries {
5188 let slot_key = B256::from(entry.key);
5189 assert!(
5190 slot_key != keccak256(slot_key),
5191 "v1: storage changeset keys should be plain (not hashed)"
5192 );
5193 }
5194 }
5195
5196 let mdbx_account_history =
5197 provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
5198 assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");
5199
5200 let mdbx_storage_history =
5201 provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
5202 assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
5203 }
5204 }
5205
/// Runs the shared save-blocks scenario with the v1 storage mode.
#[test]
fn test_save_blocks_v1_table_assertions() {
    let mode = StorageMode::V1;
    run_save_blocks_and_verify(mode);
}
5210
/// Runs the shared save-blocks scenario with the v2 storage mode.
#[test]
fn test_save_blocks_v2_table_assertions() {
    let mode = StorageMode::V2;
    run_save_blocks_and_verify(mode);
}
5215
/// Round-trips state through `write_state` / `write_hashed_state` and `remove_state_above`
/// with v2 storage settings.
///
/// After the write, the hashed tables must hold the new account and slot, the plain-state
/// tables must be empty and the storage changeset for block 1 must be readable from static
/// files. After the unwind, the hashed account and slot must be gone and the MDBX changeset
/// tables must still be empty.
#[test]
fn test_write_and_remove_state_roundtrip_v2() {
    let factory = create_test_provider_factory();
    let storage_settings = StorageSettings::v2();
    assert!(storage_settings.use_hashed_state());
    factory.set_storage_settings_cache(storage_settings);

    let address = Address::with_last_byte(1);
    let hashed_address = keccak256(address);
    let slot = U256::from(5);
    let slot_key = B256::from(slot);
    let hashed_slot = keccak256(slot_key);

    // Seed static files: headers 0 and 1, plus one empty entry in each changeset segment
    // (the `0` argument is presumably the block number — confirm against the writer API).
    {
        let sf = factory.static_file_provider();
        let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
        let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
        hw.append_header(&h0, &B256::ZERO).unwrap();
        let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
        hw.append_header(&h1, &B256::ZERO).unwrap();
        hw.commit().unwrap();

        let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
        aw.append_account_changeset(vec![], 0).unwrap();
        aw.commit().unwrap();

        let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
        sw.append_storage_changeset(vec![], 0).unwrap();
        sw.commit().unwrap();
    }

    // Seed MDBX: empty body indices for blocks 0 and 1 and a hashed account with nonce 0.
    {
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw
            .tx
            .put::<tables::BlockBodyIndices>(
                0,
                StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
            )
            .unwrap();
        provider_rw
            .tx
            .put::<tables::BlockBodyIndices>(
                1,
                StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
            )
            .unwrap();
        provider_rw
            .tx
            .cursor_write::<tables::HashedAccounts>()
            .unwrap()
            .upsert(
                hashed_address,
                &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None },
            )
            .unwrap();
        provider_rw.commit().unwrap();
    }

    let provider_rw = factory.provider_rw().unwrap();

    // Block 1: account gets nonce 1 / balance 10 and the slot moves from 0 to 10. The revert
    // records `Some(None)` for the account, which the post-unwind assertion below treats as
    // "did not exist before block 1".
    let bundle = BundleState::builder(1..=1)
        .state_present_account_info(
            address,
            AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
        )
        .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
        .revert_account_info(1, address, Some(None))
        .revert_storage(1, address, vec![(slot, U256::ZERO)])
        .build();

    let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());

    // Receipts are not written; both changeset kinds are.
    provider_rw
        .write_state(
            &execution_outcome,
            OriginalValuesKnown::Yes,
            StateWriteConfig {
                write_receipts: false,
                write_account_changesets: true,
                write_storage_changesets: true,
            },
        )
        .unwrap();

    let hashed_state =
        HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
    provider_rw.write_hashed_state(&hashed_state).unwrap();

    // The hashed tables reflect the post-state of block 1.
    let hashed_account = provider_rw
        .tx
        .cursor_read::<tables::HashedAccounts>()
        .unwrap()
        .seek_exact(hashed_address)
        .unwrap()
        .unwrap()
        .1;
    assert_eq!(hashed_account.nonce, 1);

    let hashed_entry = provider_rw
        .tx
        .cursor_dup_read::<tables::HashedStorages>()
        .unwrap()
        .seek_by_key_subkey(hashed_address, hashed_slot)
        .unwrap()
        .unwrap();
    assert_eq!(hashed_entry.key, hashed_slot);
    assert_eq!(hashed_entry.value, U256::from(10));

    // Nothing may have been written to the plain-state tables.
    let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
    assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");

    let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
    assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");

    // Commit static files before reading the changeset back through the factory.
    provider_rw.static_file_provider().commit().unwrap();

    let sf = factory.static_file_provider();
    let storage_cs = sf.storage_changeset(1).unwrap();
    assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
    assert_eq!(storage_cs[0].1.key, slot_key, "v2: changeset key should be plain");

    // Unwind everything above block 0.
    provider_rw.remove_state_above(0).unwrap();

    // The seeded nonce-0 account is not restored: the revert removes the account entirely.
    let restored_account = provider_rw
        .tx
        .cursor_read::<tables::HashedAccounts>()
        .unwrap()
        .seek_exact(hashed_address)
        .unwrap();
    assert!(
        restored_account.is_none(),
        "v2: account should be removed (didn't exist before block 1)"
    );

    // The seek may land on a different subkey, so a hit with another key also counts as gone.
    let storage_gone = provider_rw
        .tx
        .cursor_dup_read::<tables::HashedStorages>()
        .unwrap()
        .seek_by_key_subkey(hashed_address, hashed_slot)
        .unwrap();
    assert!(
        storage_gone.is_none() || storage_gone.unwrap().key != hashed_slot,
        "v2: storage should be reverted (removed or different key)"
    );

    // The MDBX changeset tables were never used in this mode.
    let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
    assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");

    let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
    assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
}
5368
/// Partially unwinds a RocksDB storage history shard with v2 storage settings: of the indexed
/// blocks 3, 7 and 10, the changesets for 7 and 10 are unwound and only block 3 may remain.
#[test]
#[cfg(all(unix, feature = "rocksdb"))]
fn test_unwind_storage_history_indices_v2() {
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let address = Address::with_last_byte(1);
    let slot_key = B256::from(U256::from(42));

    // Seed the history index directly in RocksDB and confirm that it landed.
    {
        let history_db = factory.rocksdb_provider();
        let mut seed_batch = history_db.batch();
        seed_batch.append_storage_history_shard(address, slot_key, vec![3u64, 7, 10]).unwrap();
        seed_batch.commit().unwrap();

        let seeded_shards = history_db.storage_history_shards(address, slot_key).unwrap();
        assert!(!seeded_shards.is_empty(), "history should be written to rocksdb");
    }

    let provider_rw = factory.provider_rw().unwrap();

    // Changesets for the two blocks that get unwound.
    let at_block_seven = (
        BlockNumberAddress((7, address)),
        StorageEntry { key: slot_key, value: U256::from(5) },
    );
    let at_block_ten = (
        BlockNumberAddress((10, address)),
        StorageEntry { key: slot_key, value: U256::from(8) },
    );
    let changesets = vec![at_block_seven, at_block_ten];

    let unwound = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
    assert_eq!(unwound, 2);

    provider_rw.commit().unwrap();

    // Re-read the shards after the commit.
    let history_db = factory.rocksdb_provider();
    let remaining_shards = history_db.storage_history_shards(address, slot_key).unwrap();

    assert!(
        !remaining_shards.is_empty(),
        "history shards should still exist with block 3 after partial unwind"
    );

    let remaining_blocks: Vec<u64> =
        remaining_shards.iter().flat_map(|(_, list)| list.iter()).collect();
    assert!(remaining_blocks.contains(&3), "block 3 should remain");
    assert!(!remaining_blocks.contains(&7), "block 7 should be unwound");
    assert!(!remaining_blocks.contains(&10), "block 10 should be unwound");
}
5419}