1use crate::{
2 changesets_utils::StorageRevertsIter,
3 providers::{
4 database::{chain::ChainStorage, metrics},
5 rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6 static_file::{StaticFileWriteCtx, StaticFileWriter},
7 NodeTypesForProvider, StaticFileProvider,
8 },
9 to_range,
10 traits::{
11 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12 },
13 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15 DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16 HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17 LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18 PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19 RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20 StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21 TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25 BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29 keccak256,
30 map::{hash_map, AddressSet, B256Map, HashMap},
31 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40 database::Database,
41 models::{
42 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43 BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44 StoredBlockBodyIndices,
45 },
46 table::Table,
47 tables,
48 transaction::{DbTx, DbTxMut},
49 BlockNumberList, PlainAccountState, PlainStorageState,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54 Account, Block as _, BlockBody as _, Bytecode, FastInstant as Instant, RecoveredBlock,
55 SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58 PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63 BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64 NodePrimitivesProvider, StateProvider, StateWriteConfig, StorageChangeSetReader, StoragePath,
65 StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70 HashedPostStateSorted,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
73use revm_database::states::{
74 PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use std::{
77 cmp::Ordering,
78 collections::{BTreeMap, BTreeSet},
79 fmt::Debug,
80 ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
81 path::PathBuf,
82 sync::Arc,
83};
84use tracing::{debug, instrument, trace};
85
/// Ordering mode used when committing a provider's staged changes.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
    /// Regular forward-sync commit ordering.
    #[default]
    Normal,
    /// Commit ordering used while unwinding previously persisted blocks.
    Unwind,
}

impl CommitOrder {
    /// Returns `true` if this is the unwind commit ordering.
    pub const fn is_unwind(&self) -> bool {
        match self {
            Self::Unwind => true,
            Self::Normal => false,
        }
    }
}
103
104pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
106
/// Newtype wrapper around a [`DatabaseProvider`] holding the database's mutable
/// transaction, so all staged changes can be committed or discarded as one unit.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
115
impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;

    /// Dereferences to the inner read-write [`DatabaseProvider`].
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
123
impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
    /// Mutably dereferences to the inner read-write [`DatabaseProvider`].
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
129
impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
    for DatabaseProviderRW<DB, N>
{
    /// Borrows the inner read-write [`DatabaseProvider`].
    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
        &self.0
    }
}
137
impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
    /// Commits the inner provider's transaction, persisting all staged changes.
    pub fn commit(self) -> ProviderResult<()> {
        self.0.commit()
    }

    /// Consumes the wrapper and returns the underlying mutable database transaction.
    pub fn into_tx(self) -> <DB as Database>::TXMut {
        self.0.into_tx()
    }

    /// Overrides the minimum pruning distance (test-only builder helper).
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
        self.0.minimum_pruning_distance = distance;
        self
    }
}
156
impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
    for DatabaseProvider<<DB as Database>::TXMut, N>
{
    /// Unwraps the newtype, yielding the inner read-write provider.
    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
        provider.0
    }
}
164
/// Controls how much data [`DatabaseProvider::save_blocks`] persists for a batch of blocks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SaveBlocksMode {
    /// Persist block data together with execution state.
    Full,
    /// Persist block data only, skipping execution state.
    BlocksOnly,
}

impl SaveBlocksMode {
    /// Returns `true` when execution state should be written alongside the blocks.
    pub const fn with_state(self) -> bool {
        match self {
            Self::Full => true,
            Self::BlocksOnly => false,
        }
    }
}
183
/// Provider over a single database transaction, bundling the auxiliary storages
/// (static files, RocksDB) and chain metadata needed to read and write chain data.
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// The underlying database transaction.
    tx: TX,
    /// The chain spec of the node.
    chain_spec: Arc<N::ChainSpec>,
    /// Provider for static file segments (headers, transactions, receipts, ...).
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Active prune modes; consulted before writing prunable data (senders, receipts, lookups).
    prune_modes: PruneModes,
    /// Node storage handler, used for reading/writing block bodies.
    storage: Arc<N::Storage>,
    /// Shared, mutable storage settings (e.g. the `storage_v2` flag).
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// RocksDB-backed provider used for auxiliary tables.
    rocksdb_provider: RocksDBProvider,
    /// Cache of trie changesets used when unwinding trie state.
    changeset_cache: ChangesetCache,
    /// Task runtime used to parallelize storage writes.
    runtime: reth_tasks::Runtime,
    /// Path to the database directory.
    db_path: PathBuf,
    /// RocksDB write batches staged for commit alongside this transaction.
    pending_rocksdb_batches: PendingRocksDBBatches,
    /// Whether this provider commits in normal or unwind order.
    commit_order: CommitOrder,
    /// Minimum distance from the tip considered safe for pruning decisions.
    minimum_pruning_distance: u64,
    /// Metrics recorder for provider operations.
    metrics: metrics::DatabaseProviderMetrics,
}
216
217impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219 let mut s = f.debug_struct("DatabaseProvider");
220 s.field("tx", &self.tx)
221 .field("chain_spec", &self.chain_spec)
222 .field("static_file_provider", &self.static_file_provider)
223 .field("prune_modes", &self.prune_modes)
224 .field("storage", &self.storage)
225 .field("storage_settings", &self.storage_settings)
226 .field("rocksdb_provider", &self.rocksdb_provider)
227 .field("changeset_cache", &self.changeset_cache)
228 .field("runtime", &self.runtime)
229 .field("pending_rocksdb_batches", &"<pending batches>")
230 .field("commit_order", &self.commit_order)
231 .field("minimum_pruning_distance", &self.minimum_pruning_distance)
232 .finish()
233 }
234}
235
impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Returns a reference to the active prune modes.
    pub const fn prune_modes_ref(&self) -> &PruneModes {
        &self.prune_modes
    }

    /// Builder-style override for the minimum pruning distance.
    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
        self.minimum_pruning_distance = distance;
        self
    }
}
248
impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Returns a state provider over the latest state backed by this transaction.
    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
        trace!(target: "providers::db", "Returning latest state provider");
        Box::new(LatestStateProviderRef::new(self))
    }

    /// Returns a historical state provider for the block with the given hash.
    ///
    /// Falls back to the latest-state provider when the block is both the best
    /// and the last known block. Prune checkpoints (if any) bound the lowest
    /// available history for accounts and storages.
    pub fn history_by_block_hash<'a>(
        &'a self,
        block_hash: BlockHash,
    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
        let mut block_number =
            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
        if block_number == self.best_block_number().unwrap_or_default() &&
            block_number == self.last_block_number().unwrap_or_default()
        {
            return Ok(Box::new(LatestStateProviderRef::new(self)))
        }

        // +1: the state *after* this block is recorded in the changesets of the next block.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);

        // History before the prune checkpoint is unavailable; lower-bound the provider
        // at the first block after each checkpoint.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }

    /// Replaces the prune modes (test-only).
    #[cfg(feature = "test-utils")]
    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
}
305
impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    // Primitives are taken directly from the node type configuration.
    type Primitives = N::Primitives;
}
309
impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
    /// Returns a clone of the static file provider handle.
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }

    /// Returns a writer for the given static file segment, positioned at `block`.
    fn get_static_file_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
        self.static_file_provider.get_writer(block, segment)
    }
}
324
325impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
326 fn rocksdb_provider(&self) -> RocksDBProvider {
328 self.rocksdb_provider.clone()
329 }
330
331 fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
332 self.pending_rocksdb_batches.lock().push(batch);
333 }
334
335 fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
336 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
337 for batch in batches {
338 self.rocksdb_provider.commit_batch(batch)?;
339 }
340 Ok(())
341 }
342}
343
impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
    for DatabaseProvider<TX, N>
{
    type ChainSpec = N::ChainSpec;

    /// Returns a shared handle to the chain spec.
    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
        self.chain_spec.clone()
    }
}
353
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Shared constructor for read-write providers; callers pick the [`CommitOrder`].
    #[allow(clippy::too_many_arguments)]
    fn new_rw_inner(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
        commit_order: CommitOrder,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            pending_rocksdb_batches: Default::default(),
            commit_order,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
        }
    }

    /// Creates a read-write provider that commits in [`CommitOrder::Normal`].
    #[allow(clippy::too_many_arguments)]
    pub fn new_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self::new_rw_inner(
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            CommitOrder::Normal,
        )
    }

    /// Creates a read-write provider that commits in [`CommitOrder::Unwind`].
    #[allow(clippy::too_many_arguments)]
    pub fn new_unwind_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self::new_rw_inner(
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            CommitOrder::Unwind,
        )
    }
}
446
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    /// Identity `AsRef`, letting generic code accept providers by reference uniformly.
    fn as_ref(&self) -> &Self {
        self
    }
}
452
453impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
454 pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
458 where
459 F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
460 {
461 let rocksdb = self.rocksdb_provider();
462 let rocksdb_batch = rocksdb.batch();
463
464 let (result, raw_batch) = f(rocksdb_batch)?;
465
466 if let Some(batch) = raw_batch {
467 self.set_pending_rocksdb_batch(batch);
468 }
469 let _ = raw_batch; Ok(result)
472 }
473
    /// Builds the context describing which data categories should be written to
    /// static files for a `save_blocks` call covering `first_block..=last_block`.
    fn static_file_write_ctx(
        &self,
        save_mode: SaveBlocksMode,
        first_block: BlockNumber,
        last_block: BlockNumber,
    ) -> ProviderResult<StaticFileWriteCtx> {
        // The effective tip is at least the last block being written.
        let tip = self.last_block_number()?.max(last_block);
        Ok(StaticFileWriteCtx {
            // Senders go to static files unless sender recovery is fully pruned.
            write_senders: EitherWriterDestination::senders(self).is_static_file() &&
                self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
            // State-derived categories are only written when the mode includes state.
            write_receipts: save_mode.with_state() &&
                EitherWriter::receipts_destination(self).is_static_file(),
            write_account_changesets: save_mode.with_state() &&
                EitherWriterDestination::account_changesets(self).is_static_file(),
            write_storage_changesets: save_mode.with_state() &&
                EitherWriterDestination::storage_changesets(self).is_static_file(),
            tip,
            receipts_prune_mode: self.prune_modes.receipts,
            // Receipts are prunable only if none exist in static files yet and the
            // first written block is far enough behind the tip to be unwind-safe.
            receipts_prunable: self
                .static_file_provider
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none() &&
                PruneMode::Distance(self.minimum_pruning_distance)
                    .should_prune(first_block, tip),
        })
    }
502
    /// Builds the context for the parallel RocksDB block-data write starting at `first_block`.
    fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
        RocksDBWriteCtx {
            first_block_number: first_block,
            prune_tx_lookup: self.prune_modes.transaction_lookup,
            storage_settings: self.cached_storage_settings(),
            // Shared so the RocksDB writer can stage batches for this provider's commit.
            pending_batches: self.pending_rocksdb_batches.clone(),
        }
    }
512
    /// Persists a batch of executed blocks across all storage backends.
    ///
    /// Static-file and (when `storage_v2`) RocksDB writes run on worker threads
    /// while MDBX writes proceed on the current thread inside the same rayon
    /// scope; the scope joins all three before returning. Per-phase timings are
    /// recorded in metrics. `save_mode` controls whether execution state
    /// (receipts, changesets, hashed state, trie updates, history indices) is
    /// written in addition to the block data itself.
    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlock<N::Primitives>>,
        save_mode: SaveBlocksMode,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to write empty block range");
            return Ok(())
        }

        let total_start = Instant::now();
        let block_count = blocks.len() as u64;
        // Safe unwraps: emptiness was checked above.
        let first_number = blocks.first().unwrap().recovered_block().number();
        let last_block_number = blocks.last().unwrap().recovered_block().number();

        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");

        // First tx number of the batch = one past the highest tx number stored so far.
        let first_tx_num = self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .last()?
            .map(|(n, _)| n + 1)
            .unwrap_or_default();

        // Starting tx number for each block, computed by prefix-summing tx counts.
        let tx_nums: Vec<TxNumber> = {
            let mut nums = Vec::with_capacity(blocks.len());
            let mut current = first_tx_num;
            for block in &blocks {
                nums.push(current);
                current += block.recovered_block().body().transaction_count() as u64;
            }
            nums
        };

        let mut timings =
            metrics::SaveBlocksTimings { batch_size: block_count, ..Default::default() };

        let sf_provider = &self.static_file_provider;
        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
        let rocksdb_provider = self.rocksdb_provider.clone();
        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
        let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;

        let mut sf_result = None;
        let mut rocksdb_result = None;

        let runtime = &self.runtime;
        // Propagate the current tracing span into the spawned worker tasks.
        let span = tracing::Span::current();
        runtime.storage_pool().in_place_scope(|s| {
            // Static-file writes run on a worker; result carries the elapsed time.
            s.spawn(|_| {
                let _guard = span.enter();
                let start = Instant::now();
                sf_result = Some(
                    sf_provider
                        .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
                        .map(|()| start.elapsed()),
                );
            });

            // RocksDB writes run in parallel only when storage_v2 is enabled.
            if rocksdb_enabled {
                s.spawn(|_| {
                    let _guard = span.enter();
                    let start = Instant::now();
                    rocksdb_result = Some(
                        rocksdb_provider
                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
                            .map(|()| start.elapsed()),
                    );
                });
            }

            // MDBX writes proceed on the current thread while workers run.
            let mdbx_start = Instant::now();

            // Legacy (non-storage_v2) path: write the tx-hash -> tx-number lookup here,
            // unless the transaction lookup is fully pruned.
            if !self.cached_storage_settings().storage_v2 &&
                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
            {
                let start = Instant::now();
                let total_tx_count: usize =
                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
                for (i, block) in blocks.iter().enumerate() {
                    let recovered_block = block.recovered_block();
                    for (tx_num, transaction) in
                        (tx_nums[i]..).zip(recovered_block.body().transactions_iter())
                    {
                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
                    }
                }

                // Sort by hash for efficient batched insertion.
                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);

                self.with_rocksdb_batch(|batch| {
                    let mut tx_hash_writer =
                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
                    Ok(((), raw_batch))
                })?;
                self.metrics.record_duration(
                    metrics::Action::InsertTransactionHashNumbers,
                    start.elapsed(),
                );
            }

            // Per-block MDBX writes: block indices plus (optionally) execution state.
            for (i, block) in blocks.iter().enumerate() {
                let recovered_block = block.recovered_block();

                let start = Instant::now();
                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
                timings.insert_block += start.elapsed();

                if save_mode.with_state() {
                    let execution_output = block.execution_outcome();

                    let start = Instant::now();
                    // Skip categories already handled by the static-file writer.
                    self.write_state(
                        WriteStateInput::Single {
                            outcome: execution_output,
                            block: recovered_block.number(),
                        },
                        OriginalValuesKnown::No,
                        StateWriteConfig {
                            write_receipts: !sf_ctx.write_receipts,
                            write_account_changesets: !sf_ctx.write_account_changesets,
                            write_storage_changesets: !sf_ctx.write_storage_changesets,
                        },
                    )?;
                    timings.write_state += start.elapsed();
                }
            }

            if save_mode.with_state() {
                // Merge per-block sorted state; iterated in reverse so later blocks win
                // on conflicts — TODO confirm merge_batch's precedence convention.
                let start = Instant::now();
                let merged_hashed_state = HashedPostStateSorted::merge_batch(
                    blocks.iter().rev().map(|b| b.trie_data().hashed_state),
                );
                if !merged_hashed_state.is_empty() {
                    self.write_hashed_state(&merged_hashed_state)?;
                }
                timings.write_hashed_state += start.elapsed();

                let start = Instant::now();
                let merged_trie =
                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
                if !merged_trie.is_empty() {
                    self.write_trie_updates_sorted(&merged_trie)?;
                }
                timings.write_trie_updates += start.elapsed();
            }

            if save_mode.with_state() {
                let start = Instant::now();
                self.update_history_indices(first_number..=last_block_number)?;
                timings.update_history_indices = start.elapsed();
            }

            let start = Instant::now();
            self.update_pipeline_stages(last_block_number, false)?;
            timings.update_pipeline_stages = start.elapsed();

            timings.mdbx = mdbx_start.elapsed();

            Ok::<_, ProviderError>(())
        })?;

        // A missing result means the worker panicked before storing anything.
        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;

        if rocksdb_enabled {
            timings.rocksdb = rocksdb_result.ok_or_else(|| {
                ProviderError::Database(reth_db_api::DatabaseError::Other(
                    "RocksDB thread panicked".into(),
                ))
            })??;
        }

        timings.total = total_start.elapsed();

        self.metrics.record_save_blocks(&timings);
        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");

        Ok(())
    }
724
    /// Writes the MDBX-only portions of a single block: transaction senders (when
    /// destined for the database and not fully pruned), the hash -> number index,
    /// and the block body indices. Returns the body indices for the block.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn insert_block_mdbx_only(
        &self,
        block: &RecoveredBlock<BlockTy<N>>,
        first_tx_num: TxNumber,
    ) -> ProviderResult<StoredBlockBodyIndices> {
        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
            EitherWriterDestination::senders(self).is_database()
        {
            let start = Instant::now();
            // Consecutive tx numbers starting at `first_tx_num`, paired with senders.
            let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
            let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
            for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
                cursor.append(tx_num, &sender)?;
            }
            self.metrics
                .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
        }

        let block_number = block.number();
        let tx_count = block.body().transaction_count() as u64;

        let start = Instant::now();
        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
        self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());

        self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;

        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
    }
758
    /// Appends the block's body indices, the tx-number -> block index (for non-empty
    /// blocks), and delegates body storage to the configured block body writer.
    fn write_block_body_indices(
        &self,
        block_number: BlockNumber,
        body: &BodyTy<N>,
        first_tx_num: TxNumber,
        tx_count: u64,
    ) -> ProviderResult<()> {
        let start = Instant::now();
        self.tx
            .cursor_write::<tables::BlockBodyIndices>()?
            .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
        self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());

        // Only blocks with transactions get a TransactionBlocks entry,
        // keyed by the block's last tx number.
        if tx_count > 0 {
            let start = Instant::now();
            self.tx
                .cursor_write::<tables::TransactionBlocks>()?
                .append(first_tx_num + tx_count - 1, &block_number)?;
            self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
        }

        self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;

        Ok(())
    }
789
    /// Unwinds hashed state, history indices, and trie state for all blocks from
    /// `from` (inclusive) up to the database tip, using the recorded changesets.
    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
        // Revert hashed accounts and their history indices from the account changesets.
        let changed_accounts = self.account_changesets_range(from..)?;

        self.unwind_account_hashing(changed_accounts.iter())?;

        self.unwind_account_history_indices(changed_accounts.iter())?;

        // Same for storages, from the storage changesets.
        let changed_storages = self.storage_changesets_range(from..)?;

        self.unwind_storage_hashing(changed_storages.iter().copied())?;

        self.unwind_storage_history_indices(changed_storages.iter().copied())?;

        // The Finish stage checkpoint marks the database tip; without it we cannot
        // know how far the changesets extend, so the unwind cannot proceed.
        let db_tip_block = self
            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
            .as_ref()
            .map(|chk| chk.block_number)
            .ok_or_else(|| ProviderError::InsufficientChangesets {
                requested: from,
                available: 0..=0,
            })?;

        // Apply the (cached or freshly computed) trie reverts for the unwound range.
        let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
        self.write_trie_updates_sorted(&trie_revert)?;

        Ok(())
    }
827
    /// Removes receipts from transaction number `from_tx` onward, in both the
    /// database table and (if receipts live there) the static files.
    fn remove_receipts_from(
        &self,
        from_tx: TxNumber,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;

        if EitherWriter::receipts_destination(self).is_static_file() {
            let static_file_receipt_num =
                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);

            // Number of receipts in static files at or above `from_tx`; zero if the
            // static files end before `from_tx` (saturating guards underflow).
            let to_delete = static_file_receipt_num
                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
                .unwrap_or_default();

            self.static_file_provider
                .latest_writer(StaticFileSegment::Receipts)?
                .prune_receipts(to_delete, last_block)?;
        }

        Ok(())
    }
852
853 fn write_bytecodes(
855 &self,
856 bytecodes: impl IntoIterator<Item = (B256, Bytecode)>,
857 ) -> ProviderResult<()> {
858 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
859 for (hash, bytecode) in bytecodes {
860 bytecodes_cursor.upsert(hash, &bytecode)?;
861 }
862 Ok(())
863 }
864}
865
impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
    /// Converts this provider into a state provider at the given block.
    ///
    /// Errors if the block has not been executed yet; returns the latest-state
    /// provider when the block is the current best block. Prune checkpoints
    /// lower-bound the available account/storage history.
    fn try_into_history_at_block(
        self,
        mut block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        let best_block = self.best_block_number().unwrap_or_default();

        // State beyond the best executed block does not exist yet.
        if block_number > best_block {
            return Err(ProviderError::BlockNotExecuted {
                requested: block_number,
                executed: best_block,
            });
        }

        // At the tip, the latest-state provider is cheaper and equivalent.
        if block_number == best_block {
            return Ok(Box::new(LatestStateProvider::new(self)));
        }

        // +1: the state *after* this block is recorded in the changesets of the next block.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProvider::new(self, block_number);

        // History before the prune checkpoint is unavailable; lower-bound the provider
        // at the first block after each checkpoint.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }
}
916
/// Unwinds a set of history index shards for a single key, deleting every shard
/// at or after `start_key` that still belongs to the key (per
/// `shard_belongs_to_key`) and returning the block numbers strictly below
/// `block_number` that should be re-inserted as the surviving shard.
///
/// Returns an empty `Vec` when nothing survives the unwind.
fn unwind_history_shards<S, T, C>(
    cursor: &mut C,
    start_key: T::Key,
    block_number: BlockNumber,
    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<S>>,
    C: DbCursorRO<T> + DbCursorRW<T>,
{
    // Walk backwards from `start_key` through the key's shards.
    let mut item = cursor.seek_exact(start_key)?;
    while let Some((sharded_key, list)) = item {
        // Stop once the cursor leaves this key's shard range.
        if !shard_belongs_to_key(&sharded_key) {
            break
        }

        // Every visited shard is deleted; surviving entries (if any) are
        // returned to the caller for re-insertion.
        cursor.delete_current()?;

        // Shards are never empty by construction.
        let first = list.iter().next().expect("List can't be empty");

        // Whole shard is at/after the unwind point: nothing survives, keep walking back.
        if first >= block_number {
            item = cursor.prev()?;
            continue
        }
        // Unwind point falls inside this shard: keep only entries below it.
        else if block_number <= sharded_key.as_ref().highest_block_number {
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        // Shard lies entirely before the unwind point: keep it whole.
        return Ok(list.iter().collect::<Vec<_>>())
    }

    Ok(Vec::new())
}
979
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Creates a provider over the given transaction with [`CommitOrder::Normal`].
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            pending_rocksdb_batches: Default::default(),
            commit_order: CommitOrder::Normal,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
        }
    }

    /// Consumes the provider and returns the underlying transaction.
    pub fn into_tx(self) -> TX {
        self.tx
    }

    /// Returns a mutable reference to the underlying transaction.
    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    /// Returns a shared reference to the underlying transaction.
    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    /// Returns a reference to the chain spec.
    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}
1033
1034impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Generic helper assembling a single block with recovered senders.
    ///
    /// Resolves `id` to a block number, fetches the header via `header_by_number`,
    /// reads the body and senders (recovering any missing senders from signatures),
    /// and hands all parts to `construct_block`. Returns `Ok(None)` when the
    /// block, header, or body indices are not found.
    fn recovered_block<H, HF, B, BF>(
        &self,
        id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
        header_by_number: HF,
        construct_block: BF,
    ) -> ProviderResult<Option<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
    {
        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
        let Some(header) = header_by_number(block_number)? else { return Ok(None) };

        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };

        let tx_range = body.tx_num_range();

        let transactions = if tx_range.is_empty() {
            vec![]
        } else {
            self.transactions_by_tx_range(tx_range.clone())?
        };

        // Rebuild the structured body from header + raw transactions via node storage.
        let body = self
            .storage
            .reader()
            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
            .pop()
            .ok_or(ProviderError::InvalidStorageOutput)?;

        let senders = if tx_range.is_empty() {
            vec![]
        } else {
            // Stored senders, keyed by tx number; gaps are recovered from signatures.
            let known_senders: HashMap<TxNumber, Address> =
                EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;

            let mut senders = Vec::with_capacity(body.transactions().len());
            for (tx_num, tx) in tx_range.zip(body.transactions()) {
                match known_senders.get(&tx_num) {
                    None => {
                        // Sender not stored: recover it from the signature (unchecked).
                        let sender = tx.recover_signer_unchecked()?;
                        senders.push(sender);
                    }
                    Some(sender) => senders.push(*sender),
                }
            }
            senders
        };

        construct_block(header, body, senders)
    }
1094
    /// Generic helper assembling a range of blocks.
    ///
    /// Fetches headers via `headers_range`, pairs them with each block's tx-number
    /// range, bulk-reads the bodies via node storage, then calls `assemble_block`
    /// for each `(header, body, tx_range)` triple in order.
    fn block_range<F, H, HF, R>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        mut assemble_block: F,
    ) -> ProviderResult<Vec<R>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
    {
        if range.is_empty() {
            return Ok(Vec::new())
        }

        // Capacity hint only (end - start); exact sizing is not required here.
        let len = range.end().saturating_sub(*range.start()) as usize;
        let mut blocks = Vec::with_capacity(len);

        let headers = headers_range(range.clone())?;

        // Pair each block's tx-number range with its header, keeping range order.
        let present_headers = self
            .block_body_indices_range(range)?
            .into_iter()
            .map(|b| b.tx_num_range())
            .zip(headers)
            .collect::<Vec<_>>();

        let mut inputs = Vec::with_capacity(present_headers.len());
        for (tx_range, header) in &present_headers {
            let transactions = if tx_range.is_empty() {
                Vec::new()
            } else {
                self.transactions_by_tx_range(tx_range.clone())?
            };

            inputs.push((header.as_ref(), transactions));
        }

        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;

        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
            blocks.push(assemble_block(header, body, tx_range)?);
        }

        Ok(blocks)
    }
1155
    /// Generic helper assembling a range of blocks with recovered senders.
    ///
    /// Builds on [`Self::block_range`], additionally resolving each block's
    /// senders from storage and recovering any missing ones from signatures
    /// before calling `assemble_block`.
    fn block_with_senders_range<H, HF, B, BF>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        assemble_block: BF,
    ) -> ProviderResult<Vec<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
    {
        self.block_range(range, headers_range, |header, body, tx_range| {
            let senders = if tx_range.is_empty() {
                Vec::new()
            } else {
                // Stored senders, keyed by tx number; gaps are recovered from signatures.
                let known_senders: HashMap<TxNumber, Address> =
                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;

                let mut senders = Vec::with_capacity(body.transactions().len());
                for (tx_num, tx) in tx_range.zip(body.transactions()) {
                    match known_senders.get(&tx_num) {
                        None => {
                            // Sender not stored: recover it from the signature (unchecked).
                            let sender = tx.recover_signer_unchecked()?;
                            senders.push(sender);
                        }
                        Some(sender) => senders.push(*sender),
                    }
                }

                senders
            };

            assemble_block(header, body, senders)
        })
    }
1202
    /// Builds a [`BundleStateInit`] and [`RevertsInit`] from plain-state changesets.
    ///
    /// Both changesets are iterated newest-to-oldest (`.rev()`), so by the end of the loop the
    /// "old" value for each address/slot is its pre-state in the *oldest* block of the range,
    /// while the "new" value is read once from the current plain state via the cursors.
    /// Reverts are grouped per block number and per address.
    fn populate_bundle_state<A, S>(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        plain_accounts_cursor: &mut A,
        plain_storage_cursor: &mut S,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
    where
        A: DbCursorRO<PlainAccountState>,
        S: DbDupCursorRO<PlainStorageState>,
    {
        // address -> (old account, current account, per-slot storage map)
        let mut state: BundleStateInit = HashMap::default();

        // block number -> address -> (old account, reverted storage entries)
        let mut reverts: RevertsInit = HashMap::default();

        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // First sighting of this address: read its current plain-state value once.
                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // Walking backwards, so an earlier block's pre-state overrides "old".
                    entry.get_mut().0 = old_info;
                }
            }
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // Storage changed but the account itself did not: old == current info.
                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    // The dup cursor may land on a later subkey; the filter enforces an exact
                    // match, defaulting to a zero value when the slot is absent.
                    let new_storage = plain_storage_cursor
                        .seek_by_key_subkey(address, old_storage.key)?
                        .filter(|storage| storage.key == old_storage.key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
1283
    /// Hashed-state counterpart of `populate_bundle_state`.
    ///
    /// Same reverse-iteration traversal, but current values are read from the hashed tables, so
    /// addresses and storage slots are hashed with `keccak256` before each cursor lookup. The
    /// returned maps are still keyed by the *plain* address and slot key from the changesets.
    fn populate_bundle_state_hashed(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
        hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
        // address -> (old account, current account, per-slot storage map)
        let mut state: BundleStateInit = HashMap::default();
        // block number -> address -> (old account, reverted storage entries)
        let mut reverts: RevertsInit = HashMap::default();

        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // Hashed table is keyed by keccak256 of the address.
                    let hashed_address = keccak256(address);
                    let new_info =
                        hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // Reverse iteration: an earlier block's pre-state overrides "old".
                    entry.get_mut().0 = old_info;
                }
            }
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let hashed_address = keccak256(address);
                    let present_info =
                        hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            let hashed_storage_key = keccak256(old_storage.key);
            // NOTE(review): the in-memory map is keyed by the plain slot key (matching the
            // plain-state variant); only the cursor lookup below uses the hashed key.
            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    let hashed_address = keccak256(address);
                    // Dup cursor may land on a later subkey; filter enforces an exact match.
                    let new_storage = hashed_storage_cursor
                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
                        .filter(|storage| storage.key == hashed_storage_key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
1355}
1356
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Appends new block-number indices to a sharded history index table.
    ///
    /// For each `partial_key`, the open-ended last shard (stored under the `u64::MAX` sentinel
    /// highest-block key) is loaded, the new indices are appended, and — if the shard overflows
    /// [`sharded_key::NUM_OF_INDICES_IN_SHARD`] — the whole list is re-chunked. Every full chunk
    /// is keyed by its highest contained block number; the final (possibly partial) chunk keeps
    /// the `u64::MAX` sentinel so it can be found and extended on the next call.
    ///
    /// NOTE(review): `append`/`new_pre_sorted` suggest indices must be pre-sorted and strictly
    /// after anything already stored — confirm with callers.
    fn append_history_index<P, T>(
        &self,
        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
    ) -> ProviderResult<()>
    where
        P: Copy,
        T: Table<Value = BlockNumberList>,
    {
        // DUPSORT tables store multiple values per key, which would break the
        // one-shard-per-sharded-key assumption below.
        assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");

        let mut cursor = self.tx.cursor_write::<T>()?;

        for (partial_key, indices) in index_updates {
            // The open-ended shard lives under the sentinel `u64::MAX` block number.
            let last_key = sharded_key_factory(partial_key, u64::MAX);
            let mut last_shard = cursor
                .seek_exact(last_key.clone())?
                .map(|(_, list)| list)
                .unwrap_or_else(BlockNumberList::empty);

            last_shard.append(indices).map_err(ProviderError::other)?;

            if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
                // Still fits in a single shard: write it back under the sentinel key.
                cursor.upsert(last_key, &last_shard)?;
                continue;
            }

            // Shard overflowed: re-chunk and write each chunk under its own key.
            let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
            let mut chunks_peekable = chunks.into_iter().peekable();

            while let Some(chunk) = chunks_peekable.next() {
                let shard = BlockNumberList::new_pre_sorted(chunk);
                let highest_block_number = if chunks_peekable.peek().is_some() {
                    // Full intermediate shard: key it by its highest block number.
                    shard.iter().next_back().expect("`chunks` does not return empty list")
                } else {
                    // Last chunk stays open-ended under the sentinel key.
                    u64::MAX
                };

                cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
            }
        }

        Ok(())
    }
}
1415
1416impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1417 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1418 if self.cached_storage_settings().use_hashed_state() {
1419 let hashed_address = keccak256(address);
1420 Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1421 } else {
1422 Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1423 }
1424 }
1425}
1426
impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
    /// Returns the set of addresses touched by account changesets within `range`.
    fn changed_accounts_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeSet<Address>> {
        // `EitherReader` picks the database or static-file backend as appropriate.
        let mut reader = EitherReader::new_account_changesets(self)?;

        reader.changed_accounts_with_range(range)
    }

    /// Looks up the current account for each address, preserving input order.
    fn basic_accounts(
        &self,
        iter: impl IntoIterator<Item = Address>,
    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
        if self.cached_storage_settings().use_hashed_state() {
            // Hashed-state layout: table keys are keccak256(address).
            let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
            Ok(iter
                .into_iter()
                .map(|address| {
                    let hashed_address = keccak256(address);
                    hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
                })
                .collect::<Result<Vec<_>, _>>()?)
        } else {
            let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
            Ok(iter
                .into_iter()
                .map(|address| {
                    plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
                })
                .collect::<Result<Vec<_>, _>>()?)
        }
    }

    /// Maps each changed account to the blocks (within `range`) in which it changed.
    fn changed_accounts_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
        let highest_static_block = self
            .static_file_provider
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);

        if let Some(highest) = highest_static_block &&
            self.cached_storage_settings().storage_v2
        {
            // v2 layout: changesets live in static files. Read them block by block, capping the
            // scan at the highest block the static files actually contain.
            let start = *range.start();
            let static_end = (*range.end()).min(highest);

            let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
            if start <= static_end {
                for block in start..=static_end {
                    let block_changesets = self.account_block_changeset(block)?;
                    for changeset in block_changesets {
                        changed_accounts_and_blocks
                            .entry(changeset.address)
                            .or_default()
                            .push(block);
                    }
                }
            }

            Ok(changed_accounts_and_blocks)
        } else {
            // Legacy layout: walk the database changeset table directly.
            let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;

            let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
                BTreeMap::new(),
                |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
                    let (index, account) = entry?;
                    accounts.entry(account.address).or_default().push(index);
                    Ok(accounts)
                },
            )?;

            Ok(account_transitions)
        }
    }
}
1505
1506impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1507 fn storage_changeset(
1508 &self,
1509 block_number: BlockNumber,
1510 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1511 if self.cached_storage_settings().storage_v2 {
1512 self.static_file_provider.storage_changeset(block_number)
1513 } else {
1514 let range = block_number..=block_number;
1515 let storage_range = BlockNumberAddress::range(range);
1516 self.tx
1517 .cursor_dup_read::<tables::StorageChangeSets>()?
1518 .walk_range(storage_range)?
1519 .map(|r| {
1520 let (bna, entry) = r?;
1521 Ok((bna, entry))
1522 })
1523 .collect()
1524 }
1525 }
1526
1527 fn get_storage_before_block(
1528 &self,
1529 block_number: BlockNumber,
1530 address: Address,
1531 storage_key: B256,
1532 ) -> ProviderResult<Option<StorageEntry>> {
1533 if self.cached_storage_settings().storage_v2 {
1534 self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1535 } else {
1536 Ok(self
1537 .tx
1538 .cursor_dup_read::<tables::StorageChangeSets>()?
1539 .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1540 .filter(|entry| entry.key == storage_key))
1541 }
1542 }
1543
1544 fn storage_changesets_range(
1545 &self,
1546 range: impl RangeBounds<BlockNumber>,
1547 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1548 if self.cached_storage_settings().storage_v2 {
1549 self.static_file_provider.storage_changesets_range(range)
1550 } else {
1551 self.tx
1552 .cursor_dup_read::<tables::StorageChangeSets>()?
1553 .walk_range(BlockNumberAddressRange::from(range))?
1554 .map(|r| {
1555 let (bna, entry) = r?;
1556 Ok((bna, entry))
1557 })
1558 .collect()
1559 }
1560 }
1561
1562 fn storage_changeset_count(&self) -> ProviderResult<usize> {
1563 if self.cached_storage_settings().storage_v2 {
1564 self.static_file_provider.storage_changeset_count()
1565 } else {
1566 Ok(self.tx.entries::<tables::StorageChangeSets>()?)
1567 }
1568 }
1569}
1570
1571impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1572 fn account_block_changeset(
1573 &self,
1574 block_number: BlockNumber,
1575 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1576 if self.cached_storage_settings().storage_v2 {
1577 let static_changesets =
1578 self.static_file_provider.account_block_changeset(block_number)?;
1579 Ok(static_changesets)
1580 } else {
1581 let range = block_number..=block_number;
1582 self.tx
1583 .cursor_read::<tables::AccountChangeSets>()?
1584 .walk_range(range)?
1585 .map(|result| -> ProviderResult<_> {
1586 let (_, account_before) = result?;
1587 Ok(account_before)
1588 })
1589 .collect()
1590 }
1591 }
1592
1593 fn get_account_before_block(
1594 &self,
1595 block_number: BlockNumber,
1596 address: Address,
1597 ) -> ProviderResult<Option<AccountBeforeTx>> {
1598 if self.cached_storage_settings().storage_v2 {
1599 Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1600 } else {
1601 self.tx
1602 .cursor_dup_read::<tables::AccountChangeSets>()?
1603 .seek_by_key_subkey(block_number, address)?
1604 .filter(|acc| acc.address == address)
1605 .map(Ok)
1606 .transpose()
1607 }
1608 }
1609
1610 fn account_changesets_range(
1611 &self,
1612 range: impl core::ops::RangeBounds<BlockNumber>,
1613 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1614 if self.cached_storage_settings().storage_v2 {
1615 self.static_file_provider.account_changesets_range(range)
1616 } else {
1617 self.tx
1618 .cursor_read::<tables::AccountChangeSets>()?
1619 .walk_range(to_range(range))?
1620 .map(|r| r.map_err(Into::into))
1621 .collect()
1622 }
1623 }
1624
1625 fn account_changeset_count(&self) -> ProviderResult<usize> {
1626 if self.cached_storage_settings().storage_v2 {
1629 self.static_file_provider.account_changeset_count()
1630 } else {
1631 Ok(self.tx.entries::<tables::AccountChangeSets>()?)
1632 }
1633 }
1634}
1635
impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
    for DatabaseProvider<TX, N>
{
    type Header = HeaderTy<N>;

    /// Returns the local tip header at `highest_uninterrupted_block`, first reconciling the
    /// static-file header height with it.
    ///
    /// If static files contain headers *beyond* the uninterrupted block, the surplus headers are
    /// pruned and the prune is committed as a side effect. If static files are *behind*, the gap
    /// cannot be repaired here and a `HeaderNotFound` error is returned.
    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        let static_file_provider = self.static_file_provider();

        // First block number *not* yet in static files (0 when there are none at all).
        let next_static_file_block_num = static_file_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .map(|id| id + 1)
            .unwrap_or_default();
        let next_block = highest_uninterrupted_block + 1;

        match next_static_file_block_num.cmp(&next_block) {
            Ordering::Greater => {
                // Static files are ahead: drop the surplus headers and persist the prune.
                let mut static_file_producer =
                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
                static_file_producer.commit()?
            }
            Ordering::Less => {
                // Static files are missing headers below the requested tip.
                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
            }
            Ordering::Equal => {}
        }

        let local_head = static_file_provider
            .sealed_header(highest_uninterrupted_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;

        Ok(local_head)
    }
}
1680
1681impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1682 type Header = HeaderTy<N>;
1683
1684 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1685 if let Some(num) = self.block_number(block_hash)? {
1686 Ok(self.header_by_number(num)?)
1687 } else {
1688 Ok(None)
1689 }
1690 }
1691
1692 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1693 self.static_file_provider.header_by_number(num)
1694 }
1695
1696 fn headers_range(
1697 &self,
1698 range: impl RangeBounds<BlockNumber>,
1699 ) -> ProviderResult<Vec<Self::Header>> {
1700 self.static_file_provider.headers_range(range)
1701 }
1702
1703 fn sealed_header(
1704 &self,
1705 number: BlockNumber,
1706 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1707 self.static_file_provider.sealed_header(number)
1708 }
1709
1710 fn sealed_headers_while(
1711 &self,
1712 range: impl RangeBounds<BlockNumber>,
1713 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1714 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1715 self.static_file_provider.sealed_headers_while(range, predicate)
1716 }
1717}
1718
1719impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1720 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1721 self.static_file_provider.block_hash(number)
1722 }
1723
1724 fn canonical_hashes_range(
1725 &self,
1726 start: BlockNumber,
1727 end: BlockNumber,
1728 ) -> ProviderResult<Vec<B256>> {
1729 self.static_file_provider.canonical_hashes_range(start, end)
1730 }
1731}
1732
1733impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1734 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1735 let best_number = self.best_block_number()?;
1736 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1737 Ok(ChainInfo { best_hash, best_number })
1738 }
1739
1740 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1741 Ok(self
1744 .get_stage_checkpoint(StageId::Finish)?
1745 .map(|checkpoint| checkpoint.block_number)
1746 .unwrap_or_default())
1747 }
1748
1749 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1750 self.static_file_provider.last_block_number()
1751 }
1752
1753 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1754 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1755 }
1756}
1757
impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
    type Block = BlockTy<N>;

    /// Looks a block up by hash. This provider only stores the canonical chain, so any
    /// non-canonical `source` yields `None`.
    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        if source.is_canonical() {
            self.block(hash.into())
        } else {
            Ok(None)
        }
    }

    /// Assembles the full block (header + body) for `id`.
    ///
    /// Returns [`ProviderError::BlockExpired`] for blocks below the earliest retained history
    /// height, and `Ok(None)` when the hash/number, header, or transactions are unknown.
    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        if let Some(number) = self.convert_hash_or_number(id)? {
            let earliest_available = self.static_file_provider.earliest_history_height();
            if number < earliest_available {
                // Pruned away: distinguish "expired" from a plain "not found".
                return Err(ProviderError::BlockExpired { requested: number, earliest_available })
            }

            let Some(header) = self.header_by_number(number)? else { return Ok(None) };

            let Some(transactions) = self.transactions_by_block(number.into())? else {
                return Ok(None)
            };

            // Body construction is delegated to the configured chain storage, which knows how
            // to combine a header and its transactions into the body type.
            let body = self
                .storage
                .reader()
                .read_block_bodies(self, vec![(&header, transactions)])?
                .pop()
                .ok_or(ProviderError::InvalidStorageOutput)?;

            return Ok(Some(Self::Block::new(header, body)))
        }

        Ok(None)
    }

    /// Pending blocks are not tracked by the database provider.
    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(None)
    }

    /// Pending blocks are not tracked by the database provider.
    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(None)
    }

    /// Returns the block for `id` with senders recovered (unchecked recovery).
    ///
    /// Delegates to the inherent `recovered_block` helper — note the extra closure arguments;
    /// this is not a recursive call into the trait method.
    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.header_by_number(block_number),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Like [`Self::recovered_block`], but keeps the seal by building from the sealed header,
    /// attaching the given senders (unchecked).
    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.sealed_header(block_number),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Returns all blocks in `range` without sender recovery.
    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        self.block_range(
            range,
            |range| self.headers_range(range),
            |header, body, _| Ok(Self::Block::new(header, body)),
        )
    }

    /// Returns all blocks in `range` with senders recovered (unchecked).
    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.headers_range(range),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Returns all blocks in `range` as sealed blocks with senders (checked variant).
    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.sealed_headers_range(range),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Maps a transaction number to the block containing it via the `TransactionBlocks` index
    /// (`seek` finds the first index entry at or after `id`).
    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Ok(self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .seek(id)
            .map(|b| b.map(|(_, bn)| bn))?)
    }
}
1916
1917impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1918 for DatabaseProvider<TX, N>
1919{
1920 fn transaction_hashes_by_range(
1923 &self,
1924 tx_range: Range<TxNumber>,
1925 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1926 self.static_file_provider.transaction_hashes_by_range(tx_range)
1927 }
1928}
1929
impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
    type Transaction = TxTy<N>;

    /// Resolves a transaction hash to its sequential transaction number.
    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
        // The hash -> number index may live in RocksDB; read through a consistent snapshot.
        self.with_rocksdb_snapshot(|rocksdb_ref| {
            let mut reader = EitherReader::new_transaction_hash_numbers(self, rocksdb_ref)?;
            reader.get_transaction_hash_number(tx_hash)
        })
    }

    /// Transaction by number, served from static files.
    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id(id)
    }

    /// Transaction by number without hash computation, served from static files.
    fn transaction_by_id_unhashed(
        &self,
        id: TxNumber,
    ) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id_unhashed(id)
    }

    /// Transaction by hash: resolve the hash to a number, then fetch by number.
    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
        if let Some(id) = self.transaction_id(hash)? {
            Ok(self.transaction_by_id_unhashed(id)?)
        } else {
            Ok(None)
        }
    }

    /// Transaction by hash together with its block metadata (block hash/number, index within
    /// the block, and header-derived fields). `None` if any link in the lookup chain is missing.
    fn transaction_by_hash_with_meta(
        &self,
        tx_hash: TxHash,
    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
            let Some(sealed_header) = self.sealed_header(block_number)?
        {
            let (header, block_hash) = sealed_header.split();
            if let Some(block_body) = self.block_body_indices(block_number)? {
                // Position of the transaction within its block.
                let index = transaction_id - block_body.first_tx_num();

                let meta = TransactionMeta {
                    tx_hash,
                    index,
                    block_hash,
                    block_number,
                    base_fee: header.base_fee_per_gas(),
                    excess_blob_gas: header.excess_blob_gas(),
                    timestamp: header.timestamp(),
                };

                return Ok(Some((transaction, meta)))
            }
        }

        Ok(None)
    }

    /// All transactions of a block; `Some(vec![])` for a known empty block, `None` when the
    /// block or its body indices are unknown.
    fn transactions_by_block(
        &self,
        id: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
        if let Some(block_number) = self.convert_hash_or_number(id)? &&
            let Some(body) = self.block_body_indices(block_number)?
        {
            let tx_range = body.tx_num_range();
            return if tx_range.is_empty() {
                Ok(Some(Vec::new()))
            } else {
                self.transactions_by_tx_range(tx_range).map(Some)
            }
        }
        Ok(None)
    }

    /// Per-block transaction lists for a block range (empty vec for empty blocks).
    fn transactions_by_block_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
        // Normalize to a half-open range, then convert to the inclusive form the body-indices
        // reader expects.
        let range = to_range(range);

        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
            .into_iter()
            .map(|body| {
                let tx_num_range = body.tx_num_range();
                if tx_num_range.is_empty() {
                    Ok(Vec::new())
                } else {
                    self.transactions_by_tx_range(tx_num_range)
                }
            })
            .collect()
    }

    /// Transactions for a raw tx-number range, served from static files.
    fn transactions_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Transaction>> {
        self.static_file_provider.transactions_by_tx_range(range)
    }

    /// Stored senders for a tx-number range, from static files or the database depending on the
    /// configured sender destination.
    fn senders_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Address>> {
        if EitherWriterDestination::senders(self).is_static_file() {
            self.static_file_provider.senders_by_tx_range(range)
        } else {
            self.cursor_read_collect::<tables::TransactionSenders>(range)
        }
    }

    /// Stored sender of a single transaction, from static files or the database depending on
    /// the configured sender destination.
    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
        if EitherWriterDestination::senders(self).is_static_file() {
            self.static_file_provider.transaction_sender(id)
        } else {
            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
        }
    }
}
2056
impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
    type Receipt = ReceiptTy<N>;

    /// Receipt by transaction number, preferring static files and falling back to the database.
    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Receipts,
            id,
            |static_file| static_file.receipt(id),
            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
        )
    }

    /// Receipt by transaction hash: resolve the hash to a number, then fetch by number.
    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
        if let Some(id) = self.transaction_id(hash)? {
            self.receipt(id)
        } else {
            Ok(None)
        }
    }

    /// All receipts of a block; `Some(vec![])` for a known empty block, `None` when the block
    /// or its body indices are unknown.
    fn receipts_by_block(
        &self,
        block: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
        if let Some(number) = self.convert_hash_or_number(block)? &&
            let Some(body) = self.block_body_indices(number)?
        {
            let tx_range = body.tx_num_range();
            return if tx_range.is_empty() {
                Ok(Some(Vec::new()))
            } else {
                self.receipts_by_tx_range(tx_range).map(Some)
            }
        }
        Ok(None)
    }

    /// Receipts for a tx-number range, preferring static files and falling back to the database.
    fn receipts_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Receipt>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Receipts,
            to_range(range),
            |static_file, range, _| static_file.receipts_by_tx_range(range),
            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
            |_| true,
        )
    }

    /// Per-block receipt lists for an inclusive block range.
    ///
    /// Reads body indices for every block (defaulting for missing ones), fetches all receipts
    /// covering the first to last transaction in one range query, then slices that flat list
    /// back into per-block vectors by each block's `tx_count`.
    fn receipts_by_block_range(
        &self,
        block_range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
        if block_range.is_empty() {
            return Ok(Vec::new());
        }

        let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
        let mut block_body_indices = Vec::with_capacity(range_len);
        for block_num in block_range {
            if let Some(indices) = self.block_body_indices(block_num)? {
                block_body_indices.push(indices);
            } else {
                // Unknown block: treat as empty so output length still matches the range.
                block_body_indices.push(StoredBlockBodyIndices::default());
            }
        }

        if block_body_indices.is_empty() {
            // NOTE(review): unreachable — the loop above pushes an entry for every block of the
            // (non-empty) range, including defaults for missing indices.
            return Ok(Vec::new());
        }

        let non_empty_blocks: Vec<_> =
            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();

        if non_empty_blocks.is_empty() {
            // Only empty blocks in range: one empty receipt list per block.
            return Ok(vec![Vec::new(); block_body_indices.len()]);
        }

        // Single range query spanning all transactions in the range.
        let first_tx = non_empty_blocks[0].first_tx_num();
        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();

        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
        let mut receipts_iter = all_receipts.into_iter();

        // Re-slice the flat receipt list into per-block chunks by tx count.
        let mut result = Vec::with_capacity(block_body_indices.len());
        for indices in &block_body_indices {
            if indices.tx_count == 0 {
                result.push(Vec::new());
            } else {
                let block_receipts =
                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
                result.push(block_receipts);
            }
        }

        Ok(result)
    }
}
2163
2164impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
2165 for DatabaseProvider<TX, N>
2166{
2167 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2168 Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
2169 }
2170
2171 fn block_body_indices_range(
2172 &self,
2173 range: RangeInclusive<BlockNumber>,
2174 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2175 self.cursor_read_collect::<tables::BlockBodyIndices>(range)
2176 }
2177}
2178
2179impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2180 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2181 Ok(if let Some(encoded) = id.get_pre_encoded() {
2182 self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2183 } else {
2184 self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2185 })
2186 }
2187
2188 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2190 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2191 }
2192
2193 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2194 self.tx
2195 .cursor_read::<tables::StageCheckpoints>()?
2196 .walk(None)?
2197 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2198 .map_err(ProviderError::Database)
2199 }
2200}
2201
2202impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2203 fn save_stage_checkpoint(
2205 &self,
2206 id: StageId,
2207 checkpoint: StageCheckpoint,
2208 ) -> ProviderResult<()> {
2209 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2210 }
2211
2212 fn save_stage_checkpoint_progress(
2214 &self,
2215 id: StageId,
2216 checkpoint: Vec<u8>,
2217 ) -> ProviderResult<()> {
2218 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2219 }
2220
2221 #[instrument(level = "debug", target = "providers::db", skip_all)]
2222 fn update_pipeline_stages(
2223 &self,
2224 block_number: BlockNumber,
2225 drop_stage_checkpoint: bool,
2226 ) -> ProviderResult<()> {
2227 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2229 for stage_id in StageId::ALL {
2230 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2231 cursor.upsert(
2232 stage_id.to_string(),
2233 &StageCheckpoint {
2234 block_number,
2235 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2236 },
2237 )?;
2238 }
2239
2240 Ok(())
2241 }
2242}
2243
2244impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
    /// Reads the requested storage slots for each address from the current state.
    ///
    /// Depending on the storage settings, values come from the hashed tables (address and slot
    /// hashed with keccak256 before lookup) or from the plain tables. Returned entries always
    /// carry the *plain* slot key, and output order mirrors the input.
    fn plain_state_storages(
        &self,
        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
        if self.cached_storage_settings().use_hashed_state() {
            let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;

            addresses_with_keys
                .into_iter()
                .map(|(address, storage)| {
                    let hashed_address = keccak256(address);
                    storage
                        .into_iter()
                        .map(|key| -> ProviderResult<_> {
                            let hashed_key = keccak256(key);
                            // Dup cursor may land on a later subkey; the filter enforces an
                            // exact match, defaulting to a zero value for absent slots.
                            let value = hashed_storage
                                .seek_by_key_subkey(hashed_address, hashed_key)?
                                .filter(|v| v.key == hashed_key)
                                .map(|v| v.value)
                                .unwrap_or_default();
                            Ok(StorageEntry { key, value })
                        })
                        .collect::<ProviderResult<Vec<_>>>()
                        .map(|storage| (address, storage))
                })
                .collect::<ProviderResult<Vec<(_, _)>>>()
        } else {
            let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;

            addresses_with_keys
                .into_iter()
                .map(|(address, storage)| {
                    storage
                        .into_iter()
                        .map(|key| -> ProviderResult<_> {
                            // Exact-match filter as above; absent slots yield a zero value.
                            Ok(plain_storage
                                .seek_by_key_subkey(address, key)?
                                .filter(|v| v.key == key)
                                .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
                        })
                        .collect::<ProviderResult<Vec<_>>>()
                        .map(|storage| (address, storage))
                })
                .collect::<ProviderResult<Vec<(_, _)>>>()
        }
    }
2291
2292 fn changed_storages_with_range(
2293 &self,
2294 range: RangeInclusive<BlockNumber>,
2295 ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2296 if self.cached_storage_settings().storage_v2 {
2297 self.storage_changesets_range(range)?.into_iter().try_fold(
2298 BTreeMap::new(),
2299 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2300 let (BlockNumberAddress((_, address)), storage_entry) = entry;
2301 accounts.entry(address).or_default().insert(storage_entry.key);
2302 Ok(accounts)
2303 },
2304 )
2305 } else {
2306 self.tx
2307 .cursor_read::<tables::StorageChangeSets>()?
2308 .walk_range(BlockNumberAddress::range(range))?
2309 .try_fold(
2312 BTreeMap::new(),
2313 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2314 let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2315 accounts.entry(address).or_default().insert(storage_entry.key);
2316 Ok(accounts)
2317 },
2318 )
2319 }
2320 }
2321
2322 fn changed_storages_and_blocks_with_range(
2323 &self,
2324 range: RangeInclusive<BlockNumber>,
2325 ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2326 if self.cached_storage_settings().storage_v2 {
2327 self.storage_changesets_range(range)?.into_iter().try_fold(
2328 BTreeMap::new(),
2329 |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2330 storages
2331 .entry((index.address(), storage.key))
2332 .or_default()
2333 .push(index.block_number());
2334 Ok(storages)
2335 },
2336 )
2337 } else {
2338 let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2339
2340 let storage_changeset_lists =
2341 changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2342 BTreeMap::new(),
2343 |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2344 entry|
2345 -> ProviderResult<_> {
2346 let (index, storage) = entry?;
2347 storages
2348 .entry((index.address(), storage.key))
2349 .or_default()
2350 .push(index.block_number());
2351 Ok(storages)
2352 },
2353 )?;
2354
2355 Ok(storage_changeset_lists)
2356 }
2357 }
2358}
2359
2360impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2361 for DatabaseProvider<TX, N>
2362{
2363 type Receipt = ReceiptTy<N>;
2364
    /// Writes an execution outcome to storage: state reverts, plain-state changes, bytecodes
    /// and (optionally) receipts, honoring the prune configuration.
    ///
    /// Fast path: with hashed-state storage enabled and no receipts/changesets requested,
    /// only the contract bytecodes are persisted and everything else is skipped
    /// (plain state is presumably written elsewhere in that mode — NOTE(review): confirm).
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_state<'a>(
        &self,
        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
        is_value_known: OriginalValuesKnown,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        let execution_outcome = execution_outcome.into();

        if self.cached_storage_settings().use_hashed_state() &&
            !config.write_receipts &&
            !config.write_account_changesets &&
            !config.write_storage_changesets
        {
            self.write_bytecodes(
                execution_outcome.state().contracts.iter().map(|(h, b)| (*h, Bytecode(b.clone()))),
            )?;
            return Ok(());
        }

        let first_block = execution_outcome.first_block();
        let (plain_state, reverts) =
            execution_outcome.state().to_plain_state_and_reverts(is_value_known);

        self.write_state_reverts(reverts, first_block, config)?;
        self.write_state_changes(plain_state)?;

        if !config.write_receipts {
            return Ok(());
        }

        let block_count = execution_outcome.len() as u64;
        let last_block = execution_outcome.last_block();
        let block_range = first_block..=last_block;

        // Tip used for prune-distance decisions; at least the last written block.
        let tip = self.last_block_number()?.max(last_block);

        // First tx number of every block in the outcome's range.
        let block_indices: Vec<_> = self
            .block_body_indices_range(block_range)?
            .into_iter()
            .map(|b| b.first_tx_num)
            .collect();

        // Every block must have body indices, otherwise receipts can't be numbered.
        if block_indices.len() < block_count as usize {
            let missing_blocks = block_count - block_indices.len() as u64;
            return Err(ProviderError::BlockBodyIndicesNotFound(
                last_block.saturating_sub(missing_blocks - 1),
            ));
        }

        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;

        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;

        // Receipts may only be pruned when they go to the database (or no receipt static
        // files exist yet) AND the blocks are older than the minimum pruning distance.
        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
            self.static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none()) &&
            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);

        // Addresses whose logs must be retained, seeded from filters before `first_block`.
        let mut allowed_addresses: AddressSet = AddressSet::default();
        for (_, addresses) in contract_log_pruner.range(..first_block) {
            allowed_addresses.extend(addresses.iter().copied());
        }

        for (idx, (receipts, first_tx_index)) in
            execution_outcome.receipts().zip(block_indices).enumerate()
        {
            let block_number = first_block + idx as u64;

            // Advance the writer even for blocks whose receipts end up pruned.
            receipts_writer.increment_block(block_number)?;

            // Skip the whole block if the receipts prune mode says so.
            if prunable_receipts &&
                self.prune_modes
                    .receipts
                    .is_some_and(|mode| mode.should_prune(block_number, tip))
            {
                continue
            }

            // Filters activating at this block extend the allow-list.
            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
                allowed_addresses.extend(new_addresses.iter().copied());
            }

            for (idx, receipt) in receipts.iter().enumerate() {
                let receipt_idx = first_tx_index + idx as u64;
                // With a log filter active, drop receipts that touch none of the
                // allowed addresses.
                if prunable_receipts &&
                    has_contract_log_filter &&
                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
                {
                    continue
                }

                receipts_writer.append_receipt(receipt_idx, receipt)?;
            }
        }

        Ok(())
    }
2483
    /// Writes storage and account changesets (reverts) for the blocks starting at
    /// `first_block`, as selected by `config`.
    ///
    /// `reverts.storage` / `reverts.accounts` are indexed per block relative to
    /// `first_block`; each per-block batch is appended through an [`EitherWriter`].
    fn write_state_reverts(
        &self,
        reverts: PlainStateReverts,
        first_block: BlockNumber,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        if config.write_storage_changesets {
            tracing::trace!("Writing storage changes");
            let mut storages_cursor =
                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
            for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
                let block_number = first_block + block_index as BlockNumber;

                tracing::trace!(block_number, "Writing block change");
                // Sort by address so changeset entries are appended in key order.
                storage_changes.par_sort_unstable_by_key(|a| a.address);
                let total_changes =
                    storage_changes.iter().map(|change| change.storage_revert.len()).sum();
                let mut changeset = Vec::with_capacity(total_changes);
                for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
                    // Normalize slot keys to B256 and sort them for the merge below.
                    let mut storage = storage_revert
                        .into_iter()
                        .map(|(k, v)| (B256::from(k.to_be_bytes()), v))
                        .collect::<Vec<_>>();
                    storage.par_sort_unstable_by_key(|a| a.0);

                    // If the account's storage was wiped, capture the entire pre-wipe
                    // storage from the plain state so it can be reverted.
                    let mut wiped_storage = Vec::new();
                    if wiped {
                        tracing::trace!(?address, "Wiping storage");
                        if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
                            wiped_storage.push((entry.key, entry.value));
                            while let Some(entry) = storages_cursor.next_dup_val()? {
                                wiped_storage.push((entry.key, entry.value))
                            }
                        }
                    }

                    tracing::trace!(?address, ?storage, "Writing storage reverts");
                    // Merge explicit reverts with wiped storage (both sorted by key).
                    for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
                        changeset.push(StorageBeforeTx { address, key, value });
                    }
                }

                let mut storage_changesets_writer =
                    EitherWriter::new_storage_changesets(self, block_number)?;
                storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
            }
        }

        if !config.write_account_changesets {
            return Ok(());
        }

        tracing::trace!(?first_block, "Writing account changes");
        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;
            let changeset = account_block_reverts
                .into_iter()
                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
                .collect::<Vec<_>>();
            let mut account_changesets_writer =
                EitherWriter::new_account_changesets(self, block_number)?;

            account_changesets_writer.append_account_changeset(block_number, changeset)?;
        }

        Ok(())
    }
2562
    /// Applies a plain-state changeset: account upserts/deletions, storage wipes and slot
    /// updates (skipped entirely when hashed-state storage is active), plus bytecodes.
    ///
    /// Inputs are sorted first so database writes happen in key order.
    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
        changes.accounts.par_sort_by_key(|a| a.0);
        changes.storage.par_sort_by_key(|a| a.address);
        changes.contracts.par_sort_by_key(|a| a.0);

        if !self.cached_storage_settings().use_hashed_state() {
            tracing::trace!(len = changes.accounts.len(), "Writing new account state");
            let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
            for (address, account) in changes.accounts {
                // `None` means the account was destroyed: delete it if present.
                if let Some(account) = account {
                    tracing::trace!(?address, "Updating plain state account");
                    accounts_cursor.upsert(address, &account.into())?;
                } else if accounts_cursor.seek_exact(address)?.is_some() {
                    tracing::trace!(?address, "Deleting plain state account");
                    accounts_cursor.delete_current()?;
                }
            }

            tracing::trace!(len = changes.storage.len(), "Writing new storage state");
            let mut storages_cursor =
                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
            for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
                // Wipe removes all dup entries for the address before re-inserting.
                if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
                    storages_cursor.delete_current_duplicates()?;
                }
                let mut storage = storage
                    .into_iter()
                    .map(|(k, value)| StorageEntry { key: k.into(), value })
                    .collect::<Vec<_>>();
                storage.par_sort_unstable_by_key(|a| a.key);

                for entry in storage {
                    tracing::trace!(?address, ?entry.key, "Updating plain state storage");
                    // Delete any existing entry for this exact slot, then re-insert the
                    // new value unless it's zero (zero slots are not stored).
                    if let Some(db_entry) =
                        storages_cursor.seek_by_key_subkey(address, entry.key)? &&
                        db_entry.key == entry.key
                    {
                        storages_cursor.delete_current()?;
                    }

                    if !entry.value.is_zero() {
                        storages_cursor.upsert(address, &entry)?;
                    }
                }
            }
        }

        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
        self.write_bytecodes(
            changes.contracts.into_iter().map(|(hash, bytecode)| (hash, Bytecode(bytecode))),
        )?;

        Ok(())
    }
2626
    /// Writes a pre-sorted hashed post-state to the `HashedAccounts` / `HashedStorages`
    /// tables: account upserts/deletions, storage wipes, and slot delete-then-upsert
    /// (zero values are deleted, never stored).
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in hashed_state.accounts() {
            // `None` marks a destroyed account.
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }

        // Iterate storages in hashed-address order for sequential cursor access.
        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
        let mut hashed_storage_cursor =
            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
        for (hashed_address, storage) in sorted_storages {
            // A wiped storage drops all dup entries for the address first.
            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_storage_cursor.delete_current_duplicates()?;
            }

            for (hashed_slot, value) in storage.storage_slots_ref() {
                let entry = StorageEntry { key: *hashed_slot, value: *value };

                // Remove any existing entry for this exact slot before re-inserting.
                if let Some(db_entry) =
                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
                    db_entry.key == entry.key
                {
                    hashed_storage_cursor.delete_current()?;
                }

                if !entry.value.is_zero() {
                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
                }
            }
        }

        Ok(())
    }
2666
2667 fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
2689 let range = block + 1..=self.last_block_number()?;
2690
2691 if range.is_empty() {
2692 return Ok(());
2693 }
2694
2695 let block_bodies = self.block_body_indices_range(range.clone())?;
2697
2698 let from_transaction_num =
2700 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2701
2702 let storage_range = BlockNumberAddress::range(range.clone());
2703 let storage_changeset = if self.cached_storage_settings().storage_v2 {
2704 let changesets = self.storage_changesets_range(range.clone())?;
2705 let mut changeset_writer =
2706 self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2707 changeset_writer.prune_storage_changesets(block)?;
2708 changesets
2709 } else {
2710 self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2711 };
2712 let account_changeset = if self.cached_storage_settings().storage_v2 {
2713 let changesets = self.account_changesets_range(range)?;
2714 let mut changeset_writer =
2715 self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2716 changeset_writer.prune_account_changesets(block)?;
2717 changesets
2718 } else {
2719 self.take::<tables::AccountChangeSets>(range)?
2720 };
2721
2722 if self.cached_storage_settings().use_hashed_state() {
2723 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2724 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2725
2726 let (state, _) = self.populate_bundle_state_hashed(
2727 account_changeset,
2728 storage_changeset,
2729 &mut hashed_accounts_cursor,
2730 &mut hashed_storage_cursor,
2731 )?;
2732
2733 for (address, (old_account, new_account, storage)) in &state {
2734 if old_account != new_account {
2735 let hashed_address = keccak256(address);
2736 let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2737 if let Some(account) = old_account {
2738 hashed_accounts_cursor.upsert(hashed_address, account)?;
2739 } else if existing_entry.is_some() {
2740 hashed_accounts_cursor.delete_current()?;
2741 }
2742 }
2743
2744 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2745 let hashed_address = keccak256(address);
2746 let hashed_storage_key = keccak256(storage_key);
2747 let storage_entry =
2748 StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2749 if hashed_storage_cursor
2750 .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2751 .is_some_and(|s| s.key == hashed_storage_key)
2752 {
2753 hashed_storage_cursor.delete_current()?
2754 }
2755
2756 if !old_storage_value.is_zero() {
2757 hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2758 }
2759 }
2760 }
2761 } else {
2762 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2767 let mut plain_storage_cursor =
2768 self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2769
2770 let (state, _) = self.populate_bundle_state(
2771 account_changeset,
2772 storage_changeset,
2773 &mut plain_accounts_cursor,
2774 &mut plain_storage_cursor,
2775 )?;
2776
2777 for (address, (old_account, new_account, storage)) in &state {
2778 if old_account != new_account {
2779 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2780 if let Some(account) = old_account {
2781 plain_accounts_cursor.upsert(*address, account)?;
2782 } else if existing_entry.is_some() {
2783 plain_accounts_cursor.delete_current()?;
2784 }
2785 }
2786
2787 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2788 let storage_entry =
2789 StorageEntry { key: *storage_key, value: *old_storage_value };
2790 if plain_storage_cursor
2791 .seek_by_key_subkey(*address, *storage_key)?
2792 .is_some_and(|s| s.key == *storage_key)
2793 {
2794 plain_storage_cursor.delete_current()?
2795 }
2796
2797 if !old_storage_value.is_zero() {
2798 plain_storage_cursor.upsert(*address, &storage_entry)?;
2799 }
2800 }
2801 }
2802 }
2803
2804 self.remove_receipts_from(from_transaction_num, block)?;
2805
2806 Ok(())
2807 }
2808
2809 fn take_state_above(
2831 &self,
2832 block: BlockNumber,
2833 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2834 let range = block + 1..=self.last_block_number()?;
2835
2836 if range.is_empty() {
2837 return Ok(ExecutionOutcome::default())
2838 }
2839 let start_block_number = *range.start();
2840
2841 let block_bodies = self.block_body_indices_range(range.clone())?;
2843
2844 let from_transaction_num =
2846 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2847 let to_transaction_num =
2848 block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2849
2850 let storage_range = BlockNumberAddress::range(range.clone());
2851 let storage_changeset = if let Some(highest_block) = self
2852 .static_file_provider
2853 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2854 self.cached_storage_settings().storage_v2
2855 {
2856 let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
2857 let mut changeset_writer =
2858 self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2859 changeset_writer.prune_storage_changesets(block)?;
2860 changesets
2861 } else {
2862 self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2863 };
2864
2865 let highest_changeset_block = self
2867 .static_file_provider
2868 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2869 let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2870 self.cached_storage_settings().storage_v2
2871 {
2872 let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2874 let mut changeset_writer =
2875 self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2876 changeset_writer.prune_account_changesets(block)?;
2877
2878 changesets
2879 } else {
2880 self.take::<tables::AccountChangeSets>(range)?
2883 };
2884
2885 let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
2886 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2887 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2888
2889 let (state, reverts) = self.populate_bundle_state_hashed(
2890 account_changeset,
2891 storage_changeset,
2892 &mut hashed_accounts_cursor,
2893 &mut hashed_storage_cursor,
2894 )?;
2895
2896 for (address, (old_account, new_account, storage)) in &state {
2897 if old_account != new_account {
2898 let hashed_address = keccak256(address);
2899 let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2900 if let Some(account) = old_account {
2901 hashed_accounts_cursor.upsert(hashed_address, account)?;
2902 } else if existing_entry.is_some() {
2903 hashed_accounts_cursor.delete_current()?;
2904 }
2905 }
2906
2907 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2908 let hashed_address = keccak256(address);
2909 let hashed_storage_key = keccak256(storage_key);
2910 let storage_entry =
2911 StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2912 if hashed_storage_cursor
2913 .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2914 .is_some_and(|s| s.key == hashed_storage_key)
2915 {
2916 hashed_storage_cursor.delete_current()?
2917 }
2918
2919 if !old_storage_value.is_zero() {
2920 hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2921 }
2922 }
2923 }
2924
2925 (state, reverts)
2926 } else {
2927 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2932 let mut plain_storage_cursor =
2933 self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2934
2935 let (state, reverts) = self.populate_bundle_state(
2936 account_changeset,
2937 storage_changeset,
2938 &mut plain_accounts_cursor,
2939 &mut plain_storage_cursor,
2940 )?;
2941
2942 for (address, (old_account, new_account, storage)) in &state {
2943 if old_account != new_account {
2944 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2945 if let Some(account) = old_account {
2946 plain_accounts_cursor.upsert(*address, account)?;
2947 } else if existing_entry.is_some() {
2948 plain_accounts_cursor.delete_current()?;
2949 }
2950 }
2951
2952 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2953 let storage_entry =
2954 StorageEntry { key: *storage_key, value: *old_storage_value };
2955 if plain_storage_cursor
2956 .seek_by_key_subkey(*address, *storage_key)?
2957 .is_some_and(|s| s.key == *storage_key)
2958 {
2959 plain_storage_cursor.delete_current()?
2960 }
2961
2962 if !old_storage_value.is_zero() {
2963 plain_storage_cursor.upsert(*address, &storage_entry)?;
2964 }
2965 }
2966 }
2967
2968 (state, reverts)
2969 };
2970
2971 let mut receipts_iter = self
2973 .static_file_provider
2974 .get_range_with_static_file_or_database(
2975 StaticFileSegment::Receipts,
2976 from_transaction_num..to_transaction_num + 1,
2977 |static_file, range, _| {
2978 static_file
2979 .receipts_by_tx_range(range.clone())
2980 .map(|r| range.into_iter().zip(r).collect())
2981 },
2982 |range, _| {
2983 self.tx
2984 .cursor_read::<tables::Receipts<Self::Receipt>>()?
2985 .walk_range(range)?
2986 .map(|r| r.map_err(Into::into))
2987 .collect()
2988 },
2989 |_| true,
2990 )?
2991 .into_iter()
2992 .peekable();
2993
2994 let mut receipts = Vec::with_capacity(block_bodies.len());
2995 for block_body in block_bodies {
2997 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2998 for num in block_body.tx_num_range() {
2999 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
3000 block_receipts.push(receipts_iter.next().unwrap().1);
3001 }
3002 }
3003 receipts.push(block_receipts);
3004 }
3005
3006 self.remove_receipts_from(from_transaction_num, block)?;
3007
3008 Ok(ExecutionOutcome::new_init(
3009 state,
3010 reverts,
3011 Vec::new(),
3012 receipts,
3013 start_block_number,
3014 Vec::new(),
3015 ))
3016 }
3017}
3018
3019impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
3020 fn write_account_trie_updates<A: TrieTableAdapter>(
3021 tx: &TX,
3022 trie_updates: &TrieUpdatesSorted,
3023 num_entries: &mut usize,
3024 ) -> ProviderResult<()>
3025 where
3026 TX: DbTxMut,
3027 {
3028 let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
3029 for (key, updated_node) in trie_updates.account_nodes_ref() {
3031 let nibbles = A::AccountKey::from(*key);
3032 match updated_node {
3033 Some(node) => {
3034 if !key.is_empty() {
3035 *num_entries += 1;
3036 account_trie_cursor.upsert(nibbles, node)?;
3037 }
3038 }
3039 None => {
3040 *num_entries += 1;
3041 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
3042 account_trie_cursor.delete_current()?;
3043 }
3044 }
3045 }
3046 }
3047 Ok(())
3048 }
3049
3050 fn write_storage_tries<A: TrieTableAdapter>(
3051 tx: &TX,
3052 storage_tries: Vec<(&B256, &StorageTrieUpdatesSorted)>,
3053 num_entries: &mut usize,
3054 ) -> ProviderResult<()>
3055 where
3056 TX: DbTxMut,
3057 {
3058 let mut cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
3059 for (hashed_address, storage_trie_updates) in storage_tries {
3060 let mut db_storage_trie_cursor: DatabaseStorageTrieCursor<_, A> =
3061 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
3062 *num_entries +=
3063 db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
3064 cursor = db_storage_trie_cursor.cursor;
3065 }
3066 Ok(())
3067 }
3068}
3069
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
    /// Writes sorted account- and storage-trie updates to the database, returning the
    /// total number of entries written or deleted. Returns `0` early for empty updates.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
        if trie_updates.is_empty() {
            return Ok(0)
        }

        let mut num_entries = 0;

        // The adapter macro selects the concrete trie table types for this provider.
        reth_trie_db::with_adapter!(self, |A| {
            Self::write_account_trie_updates::<A>(self.tx_ref(), trie_updates, &mut num_entries)?;
        });

        num_entries +=
            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;

        Ok(num_entries)
    }
}
3093
3094impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3095 fn write_storage_trie_updates_sorted<'a>(
3101 &self,
3102 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3103 ) -> ProviderResult<usize> {
3104 let mut num_entries = 0;
3105 let mut storage_tries = storage_tries.collect::<Vec<_>>();
3106 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3107 reth_trie_db::with_adapter!(self, |A| {
3108 Self::write_storage_tries::<A>(self.tx_ref(), storage_tries, &mut num_entries)?;
3109 });
3110 Ok(num_entries)
3111 }
3112}
3113
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
    /// Unwinds `HashedAccounts` to the pre-state captured in the given account changesets
    /// and returns the affected hashed addresses with their restored values.
    fn unwind_account_hashing<'a>(
        &self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        // Collect then reverse before building the map so that, for addresses changed in
        // multiple blocks, the EARLIEST (oldest) pre-state wins the map insertion.
        let hashed_accounts = changesets
            .into_iter()
            .map(|(_, e)| (keccak256(e.address), e.info))
            .collect::<Vec<_>>()
            .into_iter()
            .rev()
            .collect::<BTreeMap<_, _>>();

        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in &hashed_accounts {
            // `None` means the account did not exist before: remove it if present.
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }

        Ok(hashed_accounts)
    }

    /// Unwinds account hashing for all changesets in the given block range.
    fn unwind_account_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        let changesets = self.account_changesets_range(range)?;
        self.unwind_account_hashing(changesets.iter())
    }

    /// Hashes the given plain account states and upserts/deletes them in
    /// `HashedAccounts`; returns the map keyed by hashed address.
    fn insert_account_for_hashing(
        &self,
        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
        let hashed_accounts =
            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
        for (hashed_address, account) in &hashed_accounts {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }
        Ok(hashed_accounts)
    }

    /// Unwinds `HashedStorages` to the pre-state captured in the given storage changesets
    /// and returns, per hashed address, the set of hashed slots that were touched.
    fn unwind_storage_hashing(
        &self,
        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        // Hash all (address, slot) pairs up front, then sort by (address, slot).
        let mut hashed_storages = changesets
            .into_iter()
            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
                let hashed_key = keccak256(storage_entry.key);
                (keccak256(address), hashed_key, storage_entry.value)
            })
            .collect::<Vec<_>>();
        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));

        let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
            B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
        // Apply in reverse so the earliest pre-state value for a slot is written last.
        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
            hashed_storage_keys.entry(hashed_address).or_default().insert(key);

            // Delete the current entry for this exact slot, if any…
            if hashed_storage
                .seek_by_key_subkey(hashed_address, key)?
                .is_some_and(|entry| entry.key == key)
            {
                hashed_storage.delete_current()?;
            }

            // …then restore the old value unless it's zero (zero slots are not stored).
            if !value.is_zero() {
                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
            }
        }
        Ok(hashed_storage_keys)
    }

    /// Unwinds storage hashing for all changesets in the given block range.
    fn unwind_storage_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        let changesets = self.storage_changesets_range(range)?;
        self.unwind_storage_hashing(changesets.into_iter())
    }

    /// Hashes the given plain storages and writes them to `HashedStorages`; returns, per
    /// hashed address, the set of hashed slots that were written.
    fn insert_storage_for_hashing(
        &self,
        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        // Nested BTreeMaps deduplicate entries and fix iteration order by hash.
        let hashed_storages =
            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
                    map.insert(keccak256(entry.key), entry.value);
                    map
                });
                map.insert(keccak256(address), storage);
                map
            });

        let hashed_storage_keys = hashed_storages
            .iter()
            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
            .collect();

        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
                // Delete-then-upsert per slot; zero values are deleted only.
                if hashed_storage_cursor
                    .seek_by_key_subkey(hashed_address, key)?
                    .is_some_and(|entry| entry.key == key)
                {
                    hashed_storage_cursor.delete_current()?;
                }

                if !value.is_zero() {
                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
                }
                Ok(())
            })
        })?;

        Ok(hashed_storage_keys)
    }
}
3253
3254impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
    /// Removes block numbers at or above each changeset's block from the account history
    /// index (RocksDB batch for storage v2, `AccountsHistory` shards otherwise).
    ///
    /// Returns the number of changesets processed.
    fn unwind_account_history_indices<'a>(
        &self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
    ) -> ProviderResult<usize> {
        let mut last_indices = changesets
            .into_iter()
            .map(|(index, account)| (account.address, *index))
            .collect::<Vec<_>>();
        last_indices.sort_unstable_by_key(|(a, _)| *a);

        if self.cached_storage_settings().storage_v2 {
            // v2: queue the unwind as a pending RocksDB batch; it's applied on commit.
            let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
            self.pending_rocksdb_batches.lock().push(batch);
        } else {
            let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
            for &(address, rem_index) in &last_indices {
                // Strip blocks >= rem_index from the address's shards; the remainder
                // (blocks below rem_index) comes back as a partial shard.
                let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
                    &mut cursor,
                    ShardedKey::last(address),
                    rem_index,
                    |sharded_key| sharded_key.key == address,
                )?;

                // Re-insert the surviving blocks as the address's last shard.
                if !partial_shard.is_empty() {
                    cursor.insert(
                        ShardedKey::last(address),
                        &BlockNumberList::new_pre_sorted(partial_shard),
                    )?;
                }
            }
        }

        let changesets = last_indices.len();
        Ok(changesets)
    }
3293
3294 fn unwind_account_history_indices_range(
3295 &self,
3296 range: impl RangeBounds<BlockNumber>,
3297 ) -> ProviderResult<usize> {
3298 let changesets = self.account_changesets_range(range)?;
3299 self.unwind_account_history_indices(changesets.iter())
3300 }
3301
3302 fn insert_account_history_index(
3303 &self,
3304 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3305 ) -> ProviderResult<()> {
3306 self.append_history_index::<_, tables::AccountsHistory>(
3307 account_transitions,
3308 ShardedKey::new,
3309 )
3310 }
3311
    /// Removes block numbers at or above each changeset's block from the storage history
    /// index (RocksDB batch for storage v2, `StoragesHistory` shards otherwise).
    ///
    /// Returns the number of changesets processed.
    fn unwind_storage_history_indices(
        &self,
        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
    ) -> ProviderResult<usize> {
        let mut storage_changesets = changesets
            .into_iter()
            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
            .collect::<Vec<_>>();
        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));

        if self.cached_storage_settings().storage_v2 {
            // v2: queue the unwind as a pending RocksDB batch; it's applied on commit.
            let batch =
                self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
            self.pending_rocksdb_batches.lock().push(batch);
        } else {
            let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
            for &(address, storage_key, rem_index) in &storage_changesets {
                // Strip blocks >= rem_index from the (address, slot) shards; the
                // remainder (blocks below rem_index) comes back as a partial shard.
                let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
                    &mut cursor,
                    StorageShardedKey::last(address, storage_key),
                    rem_index,
                    |storage_sharded_key| {
                        storage_sharded_key.address == address &&
                            storage_sharded_key.sharded_key.key == storage_key
                    },
                )?;

                // Re-insert the surviving blocks as the pair's last shard.
                if !partial_shard.is_empty() {
                    cursor.insert(
                        StorageShardedKey::last(address, storage_key),
                        &BlockNumberList::new_pre_sorted(partial_shard),
                    )?;
                }
            }
        }

        let changesets = storage_changesets.len();
        Ok(changesets)
    }
3354
3355 fn unwind_storage_history_indices_range(
3356 &self,
3357 range: impl RangeBounds<BlockNumber>,
3358 ) -> ProviderResult<usize> {
3359 let changesets = self.storage_changesets_range(range)?;
3360 self.unwind_storage_history_indices(changesets.into_iter())
3361 }
3362
3363 fn insert_storage_history_index(
3364 &self,
3365 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3366 ) -> ProviderResult<()> {
3367 self.append_history_index::<_, tables::StoragesHistory>(
3368 storage_transitions,
3369 |(address, storage_key), highest_block_number| {
3370 StorageShardedKey::new(address, storage_key, highest_block_number)
3371 },
3372 )
3373 }
3374
3375 #[instrument(level = "debug", target = "providers::db", skip_all)]
3376 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3377 let storage_settings = self.cached_storage_settings();
3378 if !storage_settings.storage_v2 {
3379 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3380 self.insert_account_history_index(indices)?;
3381 }
3382
3383 if !storage_settings.storage_v2 {
3384 let indices = self.changed_storages_and_blocks_with_range(range)?;
3385 self.insert_storage_history_index(indices)?;
3386 }
3387
3388 Ok(())
3389 }
3390}
3391
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
    for DatabaseProvider<TX, N>
{
    /// Unwinds everything strictly above `block` and returns the removed segment
    /// as a [`Chain`] (blocks + execution state).
    fn take_block_and_execution_above(
        &self,
        block: BlockNumber,
    ) -> ProviderResult<Chain<Self::Primitives>> {
        // Blocks to be taken: everything above `block` up to the current tip.
        let range = block + 1..=self.last_block_number()?;

        // Roll the trie state back first so it matches the post-unwind tip.
        self.unwind_trie_state_from(block + 1)?;

        // Detach the execution state above `block`; it becomes part of the result.
        let execution_state = self.take_state_above(block)?;

        // Read the blocks BEFORE deleting them below — they form the returned chain.
        let blocks = self.recovered_block_range(range)?;

        self.remove_blocks_above(block)?;

        // NOTE(review): the `true` flag presumably marks this checkpoint update as
        // an unwind — confirm against `update_pipeline_stages`.
        self.update_pipeline_stages(block, true)?;

        Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
    }

    /// Same unwind as [`Self::take_block_and_execution_above`], but the removed
    /// blocks and state are discarded instead of returned.
    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
        self.unwind_trie_state_from(block + 1)?;

        self.remove_state_above(block)?;

        self.remove_blocks_above(block)?;

        self.update_pipeline_stages(block, true)?;

        Ok(())
    }
}
3434
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
    for DatabaseProvider<TX, N>
{
    type Block = BlockTy<N>;
    type Receipt = ReceiptTy<N>;

    /// Inserts a single recovered block without execution output.
    ///
    /// The block is wrapped in an [`ExecutedBlock`] carrying an empty
    /// [`BlockExecutionOutput`] and default trie data, then persisted through
    /// `save_blocks` in `BlocksOnly` mode — no state changes are written here.
    ///
    /// Returns the stored body indices for the block, or
    /// [`ProviderError::BlockBodyIndicesNotFound`] if they cannot be read back.
    fn insert_block(
        &self,
        block: &RecoveredBlock<Self::Block>,
    ) -> ProviderResult<StoredBlockBodyIndices> {
        let block_number = block.number();

        // Empty execution output: this entry point persists only block data.
        let executed_block = ExecutedBlock::new(
            Arc::new(block.clone()),
            Arc::new(BlockExecutionOutput {
                result: BlockExecutionResult {
                    receipts: Default::default(),
                    requests: Default::default(),
                    gas_used: 0,
                    blob_gas_used: 0,
                },
                state: Default::default(),
            }),
            ComputedTrieData::default(),
        );

        self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;

        // Read back the indices `save_blocks` should have written.
        self.block_body_indices(block_number)?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
    }

    /// Appends block bodies starting at the first block number in `bodies`.
    ///
    /// Transactions go to the static-file writer; `BlockBodyIndices` and
    /// `TransactionBlocks` entries go to MDBX. A `None` body still advances the
    /// static-file block counter but contributes no transactions.
    fn append_block_bodies(
        &self,
        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
    ) -> ProviderResult<()> {
        // Nothing to do for an empty batch.
        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };

        let mut tx_writer =
            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;

        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;

        // Continue numbering after the highest known transaction, or from 0.
        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();

        for (block_number, body) in &bodies {
            // Advance the static-file writer to this block even if the body is empty.
            tx_writer.increment_block(*block_number)?;

            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };

            // NOTE(review): a fresh recorder is created on every loop iteration, so
            // durations are tracked per block rather than across the batch — confirm
            // this is intended.
            let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

            block_indices_cursor.append(*block_number, &block_indices)?;

            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);

            let Some(body) = body else { continue };

            if !body.transactions().is_empty() {
                // Map the block's last tx number to the block for reverse lookups.
                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
            }

            for transaction in body.transactions() {
                tx_writer.append_transaction(next_tx_num, transaction)?;

                next_tx_num += 1;
            }
        }

        // Let the configured chain storage persist any additional body data.
        self.storage.writer().write_block_bodies(self, bodies)?;

        Ok(())
    }

    /// Removes all blocks strictly above `block`: canonical hash mappings, headers
    /// (static files), transaction hash indices, senders, and finally the bodies.
    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
        let last_block_number = self.last_block_number()?;
        // Drop hash -> number mappings for every removed canonical block.
        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
        }

        // NOTE(review): panics if the Headers segment is missing; the message
        // itself marks error handling as a todo.
        let highest_static_file_block = self
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .expect("todo: error handling, headers should exist");

        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
        self.static_file_provider()
            .get_writer(block, StaticFileSegment::Headers)?
            .prune_headers(highest_static_file_block.saturating_sub(block))?;

        // First transaction number belonging to a removed block.
        let unwind_tx_from = self
            .block_body_indices(block)?
            .map(|b| b.next_tx_num())
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;

        // Highest transaction number currently stored in the body indices.
        let unwind_tx_to = self
            .tx
            .cursor_read::<tables::BlockBodyIndices>()?
            .last()?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
            .1
            .last_tx_num();

        if unwind_tx_from <= unwind_tx_to {
            // Remove hash -> tx-number entries for every unwound transaction.
            let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
            self.with_rocksdb_batch(|batch| {
                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
                for (hash, _) in hashes {
                    writer.delete_transaction_hash_number(hash)?;
                }
                Ok(((), writer.into_raw_rocksdb_batch()))
            })?;
        }

        // Senders are only stored (and thus prunable) when sender-recovery
        // pruning is not configured as "full".
        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) {
            EitherWriter::new_senders(self, last_block_number)?
                .prune_senders(unwind_tx_from, block)?;
        }

        self.remove_bodies_above(block)?;

        Ok(())
    }

    /// Removes block bodies strictly above `block` from both MDBX and the
    /// transactions static-file segment.
    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
        self.storage.writer().remove_block_bodies_above(self, block)?;

        // First transaction number belonging to a removed block.
        let unwind_tx_from = self
            .block_body_indices(block)?
            .map(|b| b.next_tx_num())
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;

        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;

        let static_file_tx_num =
            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);

        // Number of static-file transactions at or above `unwind_tx_from`;
        // zero when no static-file transactions exist.
        let to_delete = static_file_tx_num
            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
            .unwrap_or_default();

        self.static_file_provider
            .latest_writer(StaticFileSegment::Transactions)?
            .prune_transactions(to_delete, block)?;

        Ok(())
    }

    /// Appends a batch of recovered blocks together with their execution state,
    /// hashed post-state, and history index entries. No-op for an empty batch.
    fn append_blocks_with_state(
        &self,
        blocks: Vec<RecoveredBlock<Self::Block>>,
        execution_outcome: &ExecutionOutcome<Self::Receipt>,
        hashed_state: HashedPostStateSorted,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to append empty block range");
            return Ok(())
        }

        let first_number = blocks[0].number();

        let last_block_number = blocks[blocks.len() - 1].number();

        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

        // Derive history index entries (address -> blocks, (address, slot) -> blocks)
        // from the bundle's per-block reverts; revert index `i` corresponds to
        // block `first_number + i`.
        let (account_transitions, storage_transitions) = {
            let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
            let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
            for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
                let block_number = first_number + block_idx as u64;
                for (address, account_revert) in block_reverts {
                    account_transitions.entry(*address).or_default().push(block_number);
                    for storage_key in account_revert.storage.keys() {
                        let key = B256::from(storage_key.to_be_bytes());
                        storage_transitions.entry((*address, key)).or_default().push(block_number);
                    }
                }
            }
            (account_transitions, storage_transitions)
        };

        for block in blocks {
            self.insert_block(&block)?;
            durations_recorder.record_relative(metrics::Action::InsertBlock);
        }

        self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
        durations_recorder.record_relative(metrics::Action::InsertState);

        self.write_hashed_state(&hashed_state)?;
        durations_recorder.record_relative(metrics::Action::InsertHashes);

        // v2 storage appends history shards via RocksDB batches; v1 goes through
        // the sharded MDBX index writers.
        let storage_settings = self.cached_storage_settings();
        if storage_settings.storage_v2 {
            self.with_rocksdb_batch(|mut batch| {
                for (address, blocks) in account_transitions {
                    batch.append_account_history_shard(address, blocks)?;
                }
                Ok(((), Some(batch.into_inner())))
            })?;
        } else {
            self.insert_account_history_index(account_transitions)?;
        }
        if storage_settings.storage_v2 {
            self.with_rocksdb_batch(|mut batch| {
                for ((address, key), blocks) in storage_transitions {
                    batch.append_storage_history_shard(address, key, blocks)?;
                }
                Ok(((), Some(batch.into_inner())))
            })?;
        } else {
            self.insert_storage_history_index(storage_transitions)?;
        }
        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);

        // NOTE(review): `false` presumably flags a forward append rather than an
        // unwind (cf. `true` in the unwind paths) — confirm.
        self.update_pipeline_stages(last_block_number, false)?;
        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);

        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");

        Ok(())
    }
}
3712
3713impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3714 fn get_prune_checkpoint(
3715 &self,
3716 segment: PruneSegment,
3717 ) -> ProviderResult<Option<PruneCheckpoint>> {
3718 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3719 }
3720
3721 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3722 Ok(PruneSegment::variants()
3723 .filter_map(|segment| {
3724 self.tx
3725 .get::<tables::PruneCheckpoints>(segment)
3726 .transpose()
3727 .map(|chk| chk.map(|chk| (segment, chk)))
3728 })
3729 .collect::<Result<_, _>>()?)
3730 }
3731}
3732
3733impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3734 fn save_prune_checkpoint(
3735 &self,
3736 segment: PruneSegment,
3737 checkpoint: PruneCheckpoint,
3738 ) -> ProviderResult<()> {
3739 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3740 }
3741}
3742
3743impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3744 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3745 let db_entries = self.tx.entries::<T>()?;
3746 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3747 Ok(entries) => entries,
3748 Err(ProviderError::UnsupportedProvider) => 0,
3749 Err(err) => return Err(err),
3750 };
3751
3752 Ok(db_entries + static_file_entries)
3753 }
3754}
3755
3756impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3757 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3758 let mut finalized_blocks = self
3759 .tx
3760 .cursor_read::<tables::ChainState>()?
3761 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3762 .take(1)
3763 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3764
3765 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3766 Ok(last_finalized_block_number)
3767 }
3768
3769 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3770 let mut finalized_blocks = self
3771 .tx
3772 .cursor_read::<tables::ChainState>()?
3773 .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3774 .take(1)
3775 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3776
3777 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3778 Ok(last_finalized_block_number)
3779 }
3780}
3781
3782impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3783 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3784 Ok(self
3785 .tx
3786 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3787 }
3788
3789 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3790 Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3791 }
3792}
3793
impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
    type Tx = TX;

    /// Shared reference to the underlying database transaction.
    fn tx_ref(&self) -> &Self::Tx {
        &self.tx
    }

    /// Mutable reference to the underlying database transaction.
    fn tx_mut(&mut self) -> &mut Self::Tx {
        &mut self.tx
    }

    /// Consumes the provider and returns the inner transaction.
    fn into_tx(self) -> Self::Tx {
        self.tx
    }

    // NOTE(review): this resolves to the inherent `prune_modes_ref` defined on
    // `DatabaseProvider` elsewhere in the file (inherent methods take precedence
    // over trait methods); if that inherent method were removed, this call would
    // become self-recursive — confirm.
    fn prune_modes_ref(&self) -> &PruneModes {
        self.prune_modes_ref()
    }

    /// Commits the database transaction together with any pending RocksDB batches
    /// and static-file changes.
    ///
    /// Two orderings are used:
    /// - unwind (queued static-file unwind or an unwind commit order): MDBX first,
    ///   then RocksDB batches, then static files;
    /// - normal write: static files are finalized first, then RocksDB batches,
    ///   then MDBX — with each phase timed and recorded in commit metrics.
    #[instrument(
        name = "DatabaseProvider::commit",
        level = "debug",
        target = "providers::db",
        skip_all
    )]
    fn commit(self) -> ProviderResult<()> {
        if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
            // Unwind path: commit the database before touching static files.
            self.tx.commit()?;

            // Flush every RocksDB batch staged during this provider's lifetime.
            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
            for batch in batches {
                self.rocksdb_provider.commit_batch(batch)?;
            }

            self.static_file_provider.commit()?;
        } else {
            let mut timings = metrics::CommitTimings::default();

            // Static files first ...
            let start = Instant::now();
            self.static_file_provider.finalize()?;
            timings.sf = start.elapsed();

            // ... then the staged RocksDB batches ...
            let start = Instant::now();
            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
            for batch in batches {
                self.rocksdb_provider.commit_batch(batch)?;
            }
            timings.rocksdb = start.elapsed();

            // ... and the MDBX transaction last.
            let start = Instant::now();
            self.tx.commit()?;
            timings.mdbx = start.elapsed();

            self.metrics.record_commit(&timings);
        }

        Ok(())
    }
}
3859
3860impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3861 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3862 self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3863 }
3864}
3865
3866impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3867 fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3868 self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3869 }
3870}
3871
3872impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3873 fn cached_storage_settings(&self) -> StorageSettings {
3874 *self.storage_settings.read()
3875 }
3876
3877 fn set_storage_settings_cache(&self, settings: StorageSettings) {
3878 *self.storage_settings.write() = settings;
3879 }
3880}
3881
3882impl<TX: Send, N: NodeTypes> StoragePath for DatabaseProvider<TX, N> {
3883 fn storage_path(&self) -> PathBuf {
3884 self.db_path.clone()
3885 }
3886}
3887
3888#[cfg(test)]
3889mod tests {
3890 use super::*;
3891 use crate::{
3892 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3893 BlockWriter,
3894 };
3895 use alloy_consensus::Header;
3896 use alloy_primitives::{
3897 map::{AddressMap, B256Map},
3898 U256,
3899 };
3900 use reth_chain_state::ExecutedBlock;
3901 use reth_ethereum_primitives::Receipt;
3902 use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3903 use reth_primitives_traits::SealedBlock;
3904 use reth_testing_utils::generators::{self, random_block, BlockParams};
3905 use reth_trie::{
3906 HashedPostState, KeccakKeyHasher, Nibbles, StoredNibbles, StoredNibblesSubKey,
3907 };
3908 use revm_database::BundleState;
3909 use revm_state::AccountInfo;
3910
3911 #[test]
3912 fn test_receipts_by_block_range_empty_range() {
3913 let factory = create_test_provider_factory();
3914 let provider = factory.provider().unwrap();
3915
3916 let start = 10u64;
3918 let end = 9u64;
3919 let result = provider.receipts_by_block_range(start..=end).unwrap();
3920 assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3921 }
3922
3923 #[test]
3924 fn test_receipts_by_block_range_nonexistent_blocks() {
3925 let factory = create_test_provider_factory();
3926 let provider = factory.provider().unwrap();
3927
3928 let result = provider.receipts_by_block_range(10..=12).unwrap();
3930 assert_eq!(result, vec![vec![], vec![], vec![]]);
3931 }
3932
3933 #[test]
3934 fn test_receipts_by_block_range_single_block() {
3935 let factory = create_test_provider_factory();
3936 let data = BlockchainTestData::default();
3937
3938 let provider_rw = factory.provider_rw().unwrap();
3939 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
3940 provider_rw
3941 .write_state(
3942 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3943 crate::OriginalValuesKnown::No,
3944 StateWriteConfig::default(),
3945 )
3946 .unwrap();
3947 provider_rw.insert_block(&data.blocks[0].0).unwrap();
3948 provider_rw
3949 .write_state(
3950 &data.blocks[0].1,
3951 crate::OriginalValuesKnown::No,
3952 StateWriteConfig::default(),
3953 )
3954 .unwrap();
3955 provider_rw.commit().unwrap();
3956
3957 let provider = factory.provider().unwrap();
3958 let result = provider.receipts_by_block_range(1..=1).unwrap();
3959
3960 assert_eq!(result.len(), 1);
3962 assert_eq!(result[0].len(), 1);
3963 assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3964 }
3965
3966 #[test]
3967 fn test_receipts_by_block_range_multiple_blocks() {
3968 let factory = create_test_provider_factory();
3969 let data = BlockchainTestData::default();
3970
3971 let provider_rw = factory.provider_rw().unwrap();
3972 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
3973 provider_rw
3974 .write_state(
3975 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3976 crate::OriginalValuesKnown::No,
3977 StateWriteConfig::default(),
3978 )
3979 .unwrap();
3980 for i in 0..3 {
3981 provider_rw.insert_block(&data.blocks[i].0).unwrap();
3982 provider_rw
3983 .write_state(
3984 &data.blocks[i].1,
3985 crate::OriginalValuesKnown::No,
3986 StateWriteConfig::default(),
3987 )
3988 .unwrap();
3989 }
3990 provider_rw.commit().unwrap();
3991
3992 let provider = factory.provider().unwrap();
3993 let result = provider.receipts_by_block_range(1..=3).unwrap();
3994
3995 assert_eq!(result.len(), 3);
3997 for (i, block_receipts) in result.iter().enumerate() {
3998 assert_eq!(block_receipts.len(), 1);
3999 assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
4000 }
4001 }
4002
4003 #[test]
4004 fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
4005 let factory = create_test_provider_factory();
4006 let data = BlockchainTestData::default();
4007
4008 let provider_rw = factory.provider_rw().unwrap();
4009 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4010 provider_rw
4011 .write_state(
4012 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4013 crate::OriginalValuesKnown::No,
4014 StateWriteConfig::default(),
4015 )
4016 .unwrap();
4017
4018 for i in 0..3 {
4020 provider_rw.insert_block(&data.blocks[i].0).unwrap();
4021 provider_rw
4022 .write_state(
4023 &data.blocks[i].1,
4024 crate::OriginalValuesKnown::No,
4025 StateWriteConfig::default(),
4026 )
4027 .unwrap();
4028 }
4029 provider_rw.commit().unwrap();
4030
4031 let provider = factory.provider().unwrap();
4032 let result = provider.receipts_by_block_range(1..=3).unwrap();
4033
4034 assert_eq!(result.len(), 3);
4036 for block_receipts in &result {
4037 assert_eq!(block_receipts.len(), 1);
4038 }
4039 }
4040
4041 #[test]
4042 fn test_receipts_by_block_range_partial_range() {
4043 let factory = create_test_provider_factory();
4044 let data = BlockchainTestData::default();
4045
4046 let provider_rw = factory.provider_rw().unwrap();
4047 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4048 provider_rw
4049 .write_state(
4050 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4051 crate::OriginalValuesKnown::No,
4052 StateWriteConfig::default(),
4053 )
4054 .unwrap();
4055 for i in 0..3 {
4056 provider_rw.insert_block(&data.blocks[i].0).unwrap();
4057 provider_rw
4058 .write_state(
4059 &data.blocks[i].1,
4060 crate::OriginalValuesKnown::No,
4061 StateWriteConfig::default(),
4062 )
4063 .unwrap();
4064 }
4065 provider_rw.commit().unwrap();
4066
4067 let provider = factory.provider().unwrap();
4068
4069 let result = provider.receipts_by_block_range(2..=5).unwrap();
4071 assert_eq!(result.len(), 4);
4072
4073 assert_eq!(result[0].len(), 1); assert_eq!(result[1].len(), 1); assert_eq!(result[2].len(), 0); assert_eq!(result[3].len(), 0); assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
4080 assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
4081 }
4082
4083 #[test]
4084 fn test_receipts_by_block_range_all_empty_blocks() {
4085 let factory = create_test_provider_factory();
4086 let mut rng = generators::rng();
4087
4088 let mut blocks = Vec::new();
4090 for i in 0..3 {
4091 let block =
4092 random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
4093 blocks.push(block);
4094 }
4095
4096 let provider_rw = factory.provider_rw().unwrap();
4097 for block in blocks {
4098 provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4099 }
4100 provider_rw.commit().unwrap();
4101
4102 let provider = factory.provider().unwrap();
4103 let result = provider.receipts_by_block_range(1..=3).unwrap();
4104
4105 assert_eq!(result.len(), 3);
4106 for block_receipts in result {
4107 assert_eq!(block_receipts.len(), 0);
4108 }
4109 }
4110
4111 #[test]
4112 fn test_receipts_by_block_range_consistency_with_individual_calls() {
4113 let factory = create_test_provider_factory();
4114 let data = BlockchainTestData::default();
4115
4116 let provider_rw = factory.provider_rw().unwrap();
4117 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4118 provider_rw
4119 .write_state(
4120 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4121 crate::OriginalValuesKnown::No,
4122 StateWriteConfig::default(),
4123 )
4124 .unwrap();
4125 for i in 0..3 {
4126 provider_rw.insert_block(&data.blocks[i].0).unwrap();
4127 provider_rw
4128 .write_state(
4129 &data.blocks[i].1,
4130 crate::OriginalValuesKnown::No,
4131 StateWriteConfig::default(),
4132 )
4133 .unwrap();
4134 }
4135 provider_rw.commit().unwrap();
4136
4137 let provider = factory.provider().unwrap();
4138
4139 let range_result = provider.receipts_by_block_range(1..=3).unwrap();
4141
4142 let mut individual_results = Vec::new();
4144 for block_num in 1..=3 {
4145 let receipts =
4146 provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
4147 individual_results.push(receipts);
4148 }
4149
4150 assert_eq!(range_result, individual_results);
4151 }
4152
    #[test]
    fn test_write_trie_updates_sorted() {
        use reth_trie::{
            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
            BranchNodeCompact, StorageTrieEntry,
        };

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        // Seed the accounts trie with one node that the update will delete
        // ([0x3, 0x4]) and one it will overwrite ([0x1, 0x2]).
        {
            let tx = provider_rw.tx_ref();
            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();

            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
            cursor
                .upsert(
                    to_delete,
                    &BranchNodeCompact::new(
                        0b1010_1010_1010_1010,
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    ),
                )
                .unwrap();

            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
            cursor
                .upsert(
                    to_update,
                    &BranchNodeCompact::new(
                        0b0101_0101_0101_0101,
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    ),
                )
                .unwrap();
        }

        // Seed the storages trie: address1 gets one node (to be deleted by the
        // update), address2 gets two nodes (to be wiped via `is_deleted`).
        let storage_address1 = B256::from([1u8; 32]);
        let storage_address2 = B256::from([2u8; 32]);
        {
            let tx = provider_rw.tx_ref();
            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();

            storage_cursor
                .upsert(
                    storage_address1,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
                        node: BranchNodeCompact::new(
                            0b0011_0011_0011_0011,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();

            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
                        node: BranchNodeCompact::new(
                            0b1100_1100_1100_1100,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
                        node: BranchNodeCompact::new(
                            0b0011_1100_0011_1100,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
        }

        // The sorted update: overwrite [0x1, 0x2], delete [0x3, 0x4] (`None`),
        // and insert a brand-new node at [0x5, 0x6].
        let account_nodes = vec![
            (
                Nibbles::from_nibbles([0x1, 0x2]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111,
                    0b0000_0000_0000_0000,
                    0b0000_0000_0000_0000,
                    vec![],
                    None,
                )),
            ),
            (Nibbles::from_nibbles([0x3, 0x4]), None),
            (
                Nibbles::from_nibbles([0x5, 0x6]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111,
                    0b0000_0000_0000_0000,
                    0b0000_0000_0000_0000,
                    vec![],
                    None,
                )),
            ),
        ];

        // address1: insert [0x1, 0x0] and delete the pre-seeded [0x2, 0x0].
        let storage_trie1 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (
                    Nibbles::from_nibbles([0x1, 0x0]),
                    Some(BranchNodeCompact::new(
                        0b1111_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    )),
                ),
                (Nibbles::from_nibbles([0x2, 0x0]), None),
            ],
        };

        // address2: wipe the whole storage trie.
        let storage_trie2 = StorageTrieUpdatesSorted { is_deleted: true, storage_nodes: vec![] };

        let mut storage_tries = B256Map::default();
        storage_tries.insert(storage_address1, storage_trie1);
        storage_tries.insert(storage_address2, storage_trie2);

        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();

        // 3 account-node ops + 2 storage-node ops for address1.
        assert_eq!(num_entries, 5);

        let tx = provider_rw.tx_ref();
        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();

        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
        let entry1 = cursor.seek_exact(nibbles1).unwrap();
        assert!(entry1.is_some(), "Updated account node should exist");
        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
        assert_eq!(
            entry1.unwrap().1.state_mask,
            expected_mask,
            "Account node should have updated state_mask"
        );

        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
        let entry2 = cursor.seek_exact(nibbles2).unwrap();
        assert!(entry2.is_none(), "Deleted account node should not exist");

        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
        let entry3 = cursor.seek_exact(nibbles3).unwrap();
        assert!(entry3.is_some(), "New account node should exist");

        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();

        let storage_entries1: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address1), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            storage_entries1.len(),
            1,
            "Storage address1 should have 1 entry after deletion"
        );
        assert_eq!(
            storage_entries1[0].1.nibbles.0,
            Nibbles::from_nibbles([0x1, 0x0]),
            "Remaining entry should be [0x1, 0x0]"
        );

        let storage_entries2: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address2), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");

        provider_rw.commit().unwrap();
    }
4372
    /// Exercises how receipt writes interact with receipt pruning for both
    /// storage-settings versions:
    /// - v1: receipts live in MDBX and prunable blocks get no receipt rows;
    /// - v2: receipts target static files, where pruned blocks still advance
    ///   the static-file block marker without storing any receipt/tx data.
    #[test]
    fn test_prunable_receipts_logic() {
        // Inserts blocks 0..=tip_block, each carrying `tx_count` transactions.
        let insert_blocks =
            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
                let mut rng = generators::rng();
                for block_num in 0..=tip_block {
                    let block = random_block(
                        &mut rng,
                        block_num,
                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
                    );
                    provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
                }
            };

        // Writes one receipt for `block` and commits. `cumulative_gas_used` is
        // set to the block number so later reads can tell receipts apart.
        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
            let outcome = ExecutionOutcome {
                first_block: block,
                receipts: vec![vec![Receipt {
                    tx_type: Default::default(),
                    success: true,
                    cumulative_gas_used: block, logs: vec![],
                }]],
                ..Default::default()
            };
            provider_rw
                .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
                .unwrap();
            provider_rw.commit().unwrap();
        };

        // --- v1: receipts before block 100 are prunable and skipped ---
        {
            let factory = create_test_provider_factory();
            let storage_settings = StorageSettings::v1();
            factory.set_storage_settings_cache(storage_settings);
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(100)),
                ..Default::default()
            });

            let tip_block = 200u64;
            let first_block = 1u64;

            let provider_rw = factory.provider_rw().unwrap();
            insert_blocks(&provider_rw, tip_block, 1);
            provider_rw.commit().unwrap();

            // Block 1 is below the `Before(100)` threshold: its receipt is
            // prunable. Block 199 is above it and must be persisted.
            write_receipts(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                first_block,
            );
            write_receipts(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                tip_block - 1,
            );

            let provider = factory.provider().unwrap();

            // Pruned range -> empty receipt list; unpruned block -> one receipt.
            for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
                assert!(provider
                    .receipts_by_block(block.into())
                    .unwrap()
                    .is_some_and(|r| r.len() == num_receipts));
            }
        }

        // --- v2: receipts in static files; pruned blocks advance the block
        // marker but leave no tx entries behind ---
        {
            let factory = create_test_provider_factory();
            let storage_settings = StorageSettings::v2();
            factory.set_storage_settings_cache(storage_settings);
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(2)),
                ..Default::default()
            });

            let tip_block = 200u64;

            let provider_rw = factory.provider_rw().unwrap();
            insert_blocks(&provider_rw, tip_block, 1);
            provider_rw.commit().unwrap();

            // Blocks 0 and 1 fall under `Before(2)`: no receipt data written...
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);

            // ...so there is no highest tx, yet the static-file block range
            // still advanced to block 1.
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none(),);
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_block(StaticFileSegment::Receipts)
                .is_some_and(|b| b == 1),);

            // Block 2 is past the prune threshold: its receipt (tx num 2, one
            // tx per block) lands in the static file.
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_some_and(|num| num == 2),);

            // Raise the prune threshold so block 3 would be prunable by the
            // configured mode, and shrink the minimum pruning distance.
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(100)),
                ..Default::default()
            });
            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
            // Sanity: a distance-1 rule would consider block 3 prunable here.
            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
            write_receipts(provider_rw, 3);

            let provider = factory.provider().unwrap();
            // Receipts now target static files, and block 3's receipt was
            // persisted despite the prune mode (asserted below via (3, 1)).
            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
            for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
                assert!(provider
                    .receipts_by_block(num.into())
                    .unwrap()
                    .is_some_and(|r| r.len() == num_receipts));

                // One tx per block, so tx number == block number; the stored
                // receipt's cumulative_gas_used equals the block it was
                // written for (see `write_receipts`).
                let receipt = provider.receipt(num).unwrap();
                if num_receipts > 0 {
                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
                } else {
                    assert!(receipt.is_none());
                }
            }
        }
    }
4512
4513 #[test]
4514 fn test_try_into_history_rejects_unexecuted_blocks() {
4515 use reth_storage_api::TryIntoHistoricalStateProvider;
4516
4517 let factory = create_test_provider_factory();
4518
4519 let data = BlockchainTestData::default();
4521 let provider_rw = factory.provider_rw().unwrap();
4522 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4523 provider_rw
4524 .write_state(
4525 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4526 crate::OriginalValuesKnown::No,
4527 StateWriteConfig::default(),
4528 )
4529 .unwrap();
4530 provider_rw.commit().unwrap();
4531
4532 let provider = factory.provider().unwrap();
4534
4535 let result = provider.try_into_history_at_block(0);
4537 assert!(result.is_ok(), "Block 0 should be available");
4538
4539 let provider = factory.provider().unwrap();
4541 let result = provider.try_into_history_at_block(100);
4542
4543 match result {
4545 Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
4546 Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
4547 Ok(_) => panic!("Expected error, got Ok"),
4548 }
4549 }
4550
4551 #[test]
4552 fn test_unwind_storage_hashing_with_hashed_state() {
4553 let factory = create_test_provider_factory();
4554 let storage_settings = StorageSettings::v2();
4555 factory.set_storage_settings_cache(storage_settings);
4556
4557 let address = Address::random();
4558 let hashed_address = keccak256(address);
4559
4560 let plain_slot = B256::random();
4561 let hashed_slot = keccak256(plain_slot);
4562
4563 let current_value = U256::from(100);
4564 let old_value = U256::from(42);
4565
4566 let provider_rw = factory.provider_rw().unwrap();
4567 provider_rw
4568 .tx
4569 .cursor_dup_write::<tables::HashedStorages>()
4570 .unwrap()
4571 .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4572 .unwrap();
4573
4574 let changesets = vec![(
4575 BlockNumberAddress((1, address)),
4576 StorageEntry { key: plain_slot, value: old_value },
4577 )];
4578
4579 let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4580
4581 assert_eq!(result.len(), 1);
4582 assert!(result.contains_key(&hashed_address));
4583 assert!(result[&hashed_address].contains(&hashed_slot));
4584
4585 let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4586 let entry = cursor
4587 .seek_by_key_subkey(hashed_address, hashed_slot)
4588 .unwrap()
4589 .expect("entry should exist");
4590 assert_eq!(entry.key, hashed_slot);
4591 assert_eq!(entry.value, old_value);
4592 }
4593
    /// Round-trips `write_state` / `remove_state_above` under legacy (v1)
    /// settings: state lands in the plain tables (mirrored into the hashed
    /// tables via `write_hashed_state`), changesets land in MDBX, and
    /// unwinding above block 0 restores the pre-block-1 values and clears the
    /// changesets again.
    #[test]
    fn test_write_and_remove_state_roundtrip_legacy() {
        let factory = create_test_provider_factory();
        let storage_settings = StorageSettings::v1();
        // Sanity check: v1 does not use the hashed-state layout.
        assert!(!storage_settings.use_hashed_state());
        factory.set_storage_settings_cache(storage_settings);

        let address = Address::with_last_byte(1);
        let hashed_address = keccak256(address);
        let slot = U256::from(5);
        let slot_key = B256::from(slot);
        let hashed_slot = keccak256(slot_key);

        let mut rng = generators::rng();
        let block0 =
            random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
        let block1 =
            random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });

        // Persist two empty blocks and the account's pre-state (nonce 0).
        {
            let provider_rw = factory.provider_rw().unwrap();
            provider_rw.insert_block(&block0.try_recover().unwrap()).unwrap();
            provider_rw.insert_block(&block1.try_recover().unwrap()).unwrap();
            provider_rw
                .tx
                .cursor_write::<tables::PlainAccountState>()
                .unwrap()
                .upsert(address, &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })
                .unwrap();
            provider_rw.commit().unwrap();
        }

        let provider_rw = factory.provider_rw().unwrap();

        // Block-1 state change: nonce 0 -> 1, slot 5: 0 -> 10.
        let mut state_init: BundleStateInit = AddressMap::default();
        let mut storage_map: B256Map<(U256, U256)> = B256Map::default();
        storage_map.insert(slot_key, (U256::ZERO, U256::from(10)));
        state_init.insert(
            address,
            (
                Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
                Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }),
                storage_map,
            ),
        );

        // Matching revert info for block 1 so the change can be unwound later.
        let mut reverts_init: RevertsInit = HashMap::default();
        let mut block_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
        block_reverts.insert(
            address,
            (
                Some(Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })),
                vec![StorageEntry { key: slot_key, value: U256::ZERO }],
            ),
        );
        reverts_init.insert(1, block_reverts);

        let execution_outcome =
            ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);

        // Write state plus both changeset kinds (no receipts).
        provider_rw
            .write_state(
                &execution_outcome,
                OriginalValuesKnown::Yes,
                StateWriteConfig {
                    write_receipts: false,
                    write_account_changesets: true,
                    write_storage_changesets: true,
                },
            )
            .unwrap();

        // Mirror the post-state into the hashed tables as well.
        let hashed_state =
            execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
        provider_rw.write_hashed_state(&hashed_state).unwrap();

        // Plain account now reflects the post-state nonce.
        let account = provider_rw
            .tx
            .cursor_read::<tables::PlainAccountState>()
            .unwrap()
            .seek_exact(address)
            .unwrap()
            .unwrap()
            .1;
        assert_eq!(account.nonce, 1);

        // Plain storage holds the new slot value...
        let storage_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .seek_by_key_subkey(address, slot_key)
            .unwrap()
            .unwrap();
        assert_eq!(storage_entry.key, slot_key);
        assert_eq!(storage_entry.value, U256::from(10));

        // ...as does hashed storage, under the hashed address/slot.
        let hashed_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::HashedStorages>()
            .unwrap()
            .seek_by_key_subkey(hashed_address, hashed_slot)
            .unwrap()
            .unwrap();
        assert_eq!(hashed_entry.key, hashed_slot);
        assert_eq!(hashed_entry.value, U256::from(10));

        // v1 keeps changesets in MDBX: both tables must have block-1 entries.
        let account_cs_entries = provider_rw
            .tx
            .cursor_dup_read::<tables::AccountChangeSets>()
            .unwrap()
            .walk(Some(1))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(!account_cs_entries.is_empty());

        let storage_cs_entries = provider_rw
            .tx
            .cursor_read::<tables::StorageChangeSets>()
            .unwrap()
            .walk(Some(BlockNumberAddress((1, address))))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(!storage_cs_entries.is_empty());
        // Changeset keys are plain (unhashed) slot keys in v1.
        assert_eq!(storage_cs_entries[0].1.key, slot_key);

        // Unwind everything above block 0, reverting the block-1 change.
        provider_rw.remove_state_above(0).unwrap();

        // Account is back at its pre-state nonce.
        let restored_account = provider_rw
            .tx
            .cursor_read::<tables::PlainAccountState>()
            .unwrap()
            .seek_exact(address)
            .unwrap()
            .unwrap()
            .1;
        assert_eq!(restored_account.nonce, 0);

        // The slot reverts to zero, i.e. the entry is gone (or a different key).
        let storage_gone = provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .seek_by_key_subkey(address, slot_key)
            .unwrap();
        assert!(storage_gone.is_none() || storage_gone.unwrap().key != slot_key);

        // Both changeset tables are empty again after the unwind.
        let account_cs_after = provider_rw
            .tx
            .cursor_dup_read::<tables::AccountChangeSets>()
            .unwrap()
            .walk(Some(1))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(account_cs_after.is_empty());

        let storage_cs_after = provider_rw
            .tx
            .cursor_read::<tables::StorageChangeSets>()
            .unwrap()
            .walk(Some(BlockNumberAddress((1, address))))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(storage_cs_after.is_empty());
    }
4761
4762 #[test]
4763 fn test_unwind_storage_hashing_legacy() {
4764 let factory = create_test_provider_factory();
4765 let storage_settings = StorageSettings::v1();
4766 assert!(!storage_settings.use_hashed_state());
4767 factory.set_storage_settings_cache(storage_settings);
4768
4769 let address = Address::random();
4770 let hashed_address = keccak256(address);
4771
4772 let plain_slot = B256::random();
4773 let hashed_slot = keccak256(plain_slot);
4774
4775 let current_value = U256::from(100);
4776 let old_value = U256::from(42);
4777
4778 let provider_rw = factory.provider_rw().unwrap();
4779 provider_rw
4780 .tx
4781 .cursor_dup_write::<tables::HashedStorages>()
4782 .unwrap()
4783 .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4784 .unwrap();
4785
4786 let changesets = vec![(
4787 BlockNumberAddress((1, address)),
4788 StorageEntry { key: plain_slot, value: old_value },
4789 )];
4790
4791 let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4792
4793 assert_eq!(result.len(), 1);
4794 assert!(result.contains_key(&hashed_address));
4795 assert!(result[&hashed_address].contains(&hashed_slot));
4796
4797 let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4798 let entry = cursor
4799 .seek_by_key_subkey(hashed_address, hashed_slot)
4800 .unwrap()
4801 .expect("entry should exist");
4802 assert_eq!(entry.key, hashed_slot);
4803 assert_eq!(entry.value, old_value);
4804 }
4805
    /// Writes state under v2 settings and verifies the placement rules: plain
    /// storage stays empty, hashed storage is populated, changesets land in
    /// static files with *plain* slot keys, and a historical read at block 0
    /// sees the pre-change (zero) slot value as `None`.
    #[test]
    fn test_write_state_and_historical_read_hashed() {
        use reth_storage_api::StateProvider;
        use reth_trie::{HashedPostState, KeccakKeyHasher};
        use revm_database::BundleState;
        use revm_state::AccountInfo;

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        let address = Address::with_last_byte(1);
        let slot = U256::from(5);
        let slot_key = B256::from(slot);
        let hashed_address = keccak256(address);
        let hashed_slot = keccak256(slot_key);

        // Seed static files: headers for blocks 0/1 plus empty block-0
        // changesets, so block-1 changesets can be appended contiguously.
        {
            let sf = factory.static_file_provider();
            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
            hw.append_header(&h0, &B256::ZERO).unwrap();
            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
            hw.append_header(&h1, &B256::ZERO).unwrap();
            hw.commit().unwrap();

            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            aw.append_account_changeset(vec![], 0).unwrap();
            aw.commit().unwrap();

            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
            sw.append_storage_changeset(vec![], 0).unwrap();
            sw.commit().unwrap();
        }

        let provider_rw = factory.provider_rw().unwrap();

        // Block-1 bundle: account created (nonce 1, balance 10), slot 5: 0 -> 10,
        // with matching reverts (account absent before, slot zero before).
        let bundle = BundleState::builder(1..=1)
            .state_present_account_info(
                address,
                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
            )
            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
            .revert_account_info(1, address, Some(None))
            .revert_storage(1, address, vec![(slot, U256::ZERO)])
            .build();

        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());

        // Empty body indices for block 1 so block-level lookups resolve.
        provider_rw
            .tx
            .put::<tables::BlockBodyIndices>(
                1,
                StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
            )
            .unwrap();

        // Write state with both changeset kinds enabled (no receipts).
        provider_rw
            .write_state(
                &execution_outcome,
                OriginalValuesKnown::Yes,
                StateWriteConfig {
                    write_receipts: false,
                    write_account_changesets: true,
                    write_storage_changesets: true,
                },
            )
            .unwrap();

        // Populate the hashed tables from the bundle's post-state.
        let hashed_state =
            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
        provider_rw.write_hashed_state(&hashed_state).unwrap();

        // v2 must not touch the plain storage table at all.
        let plain_storage_entries = provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .walk(None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(plain_storage_entries.is_empty());

        // Hashed storage carries the new slot value under hashed keys.
        let hashed_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::HashedStorages>()
            .unwrap()
            .seek_by_key_subkey(hashed_address, hashed_slot)
            .unwrap()
            .unwrap();
        assert_eq!(hashed_entry.key, hashed_slot);
        assert_eq!(hashed_entry.value, U256::from(10));

        // Flush pending static-file writes so readers can see them.
        provider_rw.static_file_provider().commit().unwrap();

        // Static-file changesets exist for block 1 and use the plain slot key.
        let sf = factory.static_file_provider();
        let storage_cs = sf.storage_changeset(1).unwrap();
        assert!(!storage_cs.is_empty());
        assert_eq!(storage_cs[0].1.key, slot_key);

        let account_cs = sf.account_block_changeset(1).unwrap();
        assert!(!account_cs.is_empty());
        assert_eq!(account_cs[0].address, address);

        // Before block 1 the slot was zero, so a historical read at block 0
        // reports `None`.
        let historical_value =
            HistoricalStateProviderRef::new(&*provider_rw, 0).storage(address, slot_key).unwrap();
        assert_eq!(historical_value, None);
    }
4913
    /// Storage layout under test in [`run_save_blocks_and_verify`]: the legacy
    /// plain-state layout (v1) or the hashed-state layout (v2).
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum StorageMode {
        // Legacy layout: plain state plus MDBX changesets/history.
        V1,
        // Hashed layout: hashed state, static-file changesets, RocksDB history.
        V2,
    }
4919
    /// Drives `save_blocks` over a genesis block plus three generated blocks
    /// (5 accounts x 3 slots each) and verifies table placement for `mode`:
    /// - both modes: hashed tables, headers and body indices are populated;
    /// - v2: plain tables and MDBX changesets/history stay empty, changesets
    ///   live in static files with plain slot keys, and history indices live
    ///   in RocksDB;
    /// - v1: plain tables, MDBX changesets (plain keys) and MDBX history are
    ///   all populated.
    fn run_save_blocks_and_verify(mode: StorageMode) {
        use alloy_primitives::map::{FbBuildHasher, HashMap};

        let factory = create_test_provider_factory();

        match mode {
            StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
            StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
        }

        let num_blocks = 3u64;
        let accounts_per_block = 5usize;
        let slots_per_account = 3usize;

        // --- Save an empty genesis block first ---
        let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
            SealedHeader::new(
                Header { number: 0, difficulty: U256::from(1), ..Default::default() },
                B256::ZERO,
            ),
            Default::default(),
        );

        let genesis_executed = ExecutedBlock::new(
            Arc::new(genesis.try_recover().unwrap()),
            Arc::new(BlockExecutionOutput {
                result: BlockExecutionResult {
                    receipts: vec![],
                    requests: Default::default(),
                    gas_used: 0,
                    blob_gas_used: 0,
                },
                state: Default::default(),
            }),
            ComputedTrieData::default(),
        );
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
        provider_rw.commit().unwrap();

        // --- Build blocks 1..=num_blocks with deterministic state changes ---
        let mut blocks: Vec<ExecutedBlock> = Vec::new();
        let mut parent_hash = B256::ZERO;

        for block_num in 1..=num_blocks {
            let mut builder = BundleState::builder(block_num..=block_num);

            for acct_idx in 0..accounts_per_block {
                // Unique address per (block, account); values encode their
                // origin so the verification loop can recompute them.
                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
                let info = AccountInfo {
                    nonce: block_num,
                    balance: U256::from(block_num * 100 + acct_idx as u64),
                    ..Default::default()
                };

                // Slot s + acct_idx*100 gets value block_num*1000 + s.
                let storage: HashMap<U256, (U256, U256), FbBuildHasher<32>> = (1..=
                    slots_per_account as u64)
                    .map(|s| {
                        (
                            U256::from(s + acct_idx as u64 * 100),
                            (U256::ZERO, U256::from(block_num * 1000 + s)),
                        )
                    })
                    .collect();

                // Reverts record the prior (zero) value for each slot.
                let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
                    .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
                    .collect();

                builder = builder
                    .state_present_account_info(address, info)
                    .revert_account_info(block_num, address, Some(None))
                    .state_storage(address, storage)
                    .revert_storage(block_num, address, revert_storage);
            }

            let bundle = builder.build();

            let hashed_state =
                HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();

            // Chain the headers via parent_hash so the blocks form a chain.
            let header = Header {
                number: block_num,
                parent_hash,
                difficulty: U256::from(1),
                ..Default::default()
            };
            let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
                header,
                Default::default(),
            );
            parent_hash = block.hash();

            let executed = ExecutedBlock::new(
                Arc::new(block.try_recover().unwrap()),
                Arc::new(BlockExecutionOutput {
                    result: BlockExecutionResult {
                        receipts: vec![],
                        requests: Default::default(),
                        gas_used: 0,
                        blob_gas_used: 0,
                    },
                    state: bundle,
                }),
                ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
            );
            blocks.push(executed);
        }

        // --- Persist all blocks in one call ---
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();

        // --- Hashed tables must be populated in both modes ---
        for block_num in 1..=num_blocks {
            for acct_idx in 0..accounts_per_block {
                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
                let hashed_address = keccak256(address);

                let ha_entry = provider
                    .tx_ref()
                    .cursor_read::<tables::HashedAccounts>()
                    .unwrap()
                    .seek_exact(hashed_address)
                    .unwrap();
                assert!(
                    ha_entry.is_some(),
                    "HashedAccounts missing for block {block_num} acct {acct_idx}"
                );

                for s in 1..=slots_per_account as u64 {
                    let slot = U256::from(s + acct_idx as u64 * 100);
                    let slot_key = B256::from(slot);
                    let hashed_slot = keccak256(slot_key);

                    let hs_entry = provider
                        .tx_ref()
                        .cursor_dup_read::<tables::HashedStorages>()
                        .unwrap()
                        .seek_by_key_subkey(hashed_address, hashed_slot)
                        .unwrap();
                    assert!(
                        hs_entry.is_some(),
                        "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
                    );
                    let entry = hs_entry.unwrap();
                    assert_eq!(entry.key, hashed_slot);
                    // Value as constructed in the builder loop above.
                    assert_eq!(entry.value, U256::from(block_num * 1000 + s));
                }
            }
        }

        // --- Headers and body indices exist for every block ---
        for block_num in 1..=num_blocks {
            let header = provider.header_by_number(block_num).unwrap();
            assert!(header.is_some(), "Header missing for block {block_num}");

            let indices = provider.block_body_indices(block_num).unwrap();
            assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
        }

        let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
        let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();

        if mode == StorageMode::V2 {
            // v2: nothing in the plain tables or MDBX changeset tables.
            assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
            assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");

            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
            assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");

            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
            assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");

            // Flush pending static-file writes, then check the changesets.
            provider.static_file_provider().commit().unwrap();
            let sf = factory.static_file_provider();

            for block_num in 1..=num_blocks {
                let account_cs = sf.account_block_changeset(block_num).unwrap();
                assert!(
                    !account_cs.is_empty(),
                    "v2: static file AccountChangeSets should exist for block {block_num}"
                );

                let storage_cs = sf.storage_changeset(block_num).unwrap();
                assert!(
                    !storage_cs.is_empty(),
                    "v2: static file StorageChangeSets should exist for block {block_num}"
                );

                // Keys must be plain slots: hashing them again must change them.
                for (_, entry) in &storage_cs {
                    assert!(
                        entry.key != keccak256(entry.key),
                        "v2: static file storage changeset should have plain slot keys"
                    );
                }
            }

            // v2 history indices live in RocksDB shards.
            let rocksdb = factory.rocksdb_provider();
            for block_num in 1..=num_blocks {
                for acct_idx in 0..accounts_per_block {
                    let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
                    let shards = rocksdb.account_history_shards(address).unwrap();
                    assert!(
                        !shards.is_empty(),
                        "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
                    );

                    for s in 1..=slots_per_account as u64 {
                        let slot = U256::from(s + acct_idx as u64 * 100);
                        let slot_key = B256::from(slot);
                        let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
                        assert!(
                            !shards.is_empty(),
                            "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
                        );
                    }
                }
            }
        } else {
            // v1: plain tables and MDBX changesets/history must be populated.
            assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
            assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");

            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
            assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");

            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
            assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");

            for block_num in 1..=num_blocks {
                let storage_entries: Vec<_> = provider
                    .tx_ref()
                    .cursor_dup_read::<tables::StorageChangeSets>()
                    .unwrap()
                    .walk_range(BlockNumberAddress::range(block_num..=block_num))
                    .unwrap()
                    .collect::<Result<Vec<_>, _>>()
                    .unwrap();
                assert!(
                    !storage_entries.is_empty(),
                    "v1: MDBX StorageChangeSets should have entries for block {block_num}"
                );

                // v1 changeset keys are plain slots too.
                for (_, entry) in &storage_entries {
                    let slot_key = B256::from(entry.key);
                    assert!(
                        slot_key != keccak256(slot_key),
                        "v1: storage changeset keys should be plain (not hashed)"
                    );
                }
            }

            let mdbx_account_history =
                provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
            assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");

            let mdbx_storage_history =
                provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
            assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
        }
    }
5179
    /// `save_blocks` under the legacy (v1) layout: plain tables and MDBX
    /// changesets/history must be populated.
    #[test]
    fn test_save_blocks_v1_table_assertions() {
        run_save_blocks_and_verify(StorageMode::V1);
    }
5184
    /// `save_blocks` under the hashed (v2) layout: plain tables stay empty,
    /// changesets go to static files, history indices go to RocksDB.
    #[test]
    fn test_save_blocks_v2_table_assertions() {
        run_save_blocks_and_verify(StorageMode::V2);
    }
5189
    /// v2 counterpart of the legacy roundtrip test: `write_state` must touch
    /// only the hashed tables and static-file changesets, and
    /// `remove_state_above(0)` must revert both account and storage while the
    /// MDBX changeset tables stay empty throughout.
    #[test]
    fn test_write_and_remove_state_roundtrip_v2() {
        let factory = create_test_provider_factory();
        let storage_settings = StorageSettings::v2();
        // Sanity check: v2 enables the hashed-state layout.
        assert!(storage_settings.use_hashed_state());
        factory.set_storage_settings_cache(storage_settings);

        let address = Address::with_last_byte(1);
        let hashed_address = keccak256(address);
        let slot = U256::from(5);
        let slot_key = B256::from(slot);
        let hashed_slot = keccak256(slot_key);

        // Seed static files: headers for blocks 0/1 plus empty block-0
        // changesets, so block-1 changesets can be appended contiguously.
        {
            let sf = factory.static_file_provider();
            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
            hw.append_header(&h0, &B256::ZERO).unwrap();
            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
            hw.append_header(&h1, &B256::ZERO).unwrap();
            hw.commit().unwrap();

            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            aw.append_account_changeset(vec![], 0).unwrap();
            aw.commit().unwrap();

            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
            sw.append_storage_changeset(vec![], 0).unwrap();
            sw.commit().unwrap();
        }

        // Body indices for blocks 0/1 and the account's hashed pre-state
        // (nonce 0) so there is something to revert to.
        {
            let provider_rw = factory.provider_rw().unwrap();
            provider_rw
                .tx
                .put::<tables::BlockBodyIndices>(
                    0,
                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
                )
                .unwrap();
            provider_rw
                .tx
                .put::<tables::BlockBodyIndices>(
                    1,
                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
                )
                .unwrap();
            provider_rw
                .tx
                .cursor_write::<tables::HashedAccounts>()
                .unwrap()
                .upsert(
                    hashed_address,
                    &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None },
                )
                .unwrap();
            provider_rw.commit().unwrap();
        }

        let provider_rw = factory.provider_rw().unwrap();

        // Block-1 bundle: account nonce 1 / balance 10, slot 5: 0 -> 10, with
        // reverts recording that the account did not exist before block 1.
        let bundle = BundleState::builder(1..=1)
            .state_present_account_info(
                address,
                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
            )
            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
            .revert_account_info(1, address, Some(None))
            .revert_storage(1, address, vec![(slot, U256::ZERO)])
            .build();

        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());

        provider_rw
            .write_state(
                &execution_outcome,
                OriginalValuesKnown::Yes,
                StateWriteConfig {
                    write_receipts: false,
                    write_account_changesets: true,
                    write_storage_changesets: true,
                },
            )
            .unwrap();

        // Populate the hashed tables from the bundle's post-state.
        let hashed_state =
            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
        provider_rw.write_hashed_state(&hashed_state).unwrap();

        // Hashed account reflects the post-state nonce...
        let hashed_account = provider_rw
            .tx
            .cursor_read::<tables::HashedAccounts>()
            .unwrap()
            .seek_exact(hashed_address)
            .unwrap()
            .unwrap()
            .1;
        assert_eq!(hashed_account.nonce, 1);

        // ...and hashed storage the new slot value.
        let hashed_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::HashedStorages>()
            .unwrap()
            .seek_by_key_subkey(hashed_address, hashed_slot)
            .unwrap()
            .unwrap();
        assert_eq!(hashed_entry.key, hashed_slot);
        assert_eq!(hashed_entry.value, U256::from(10));

        // v2 never writes the plain tables.
        let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
        assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");

        let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
        assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");

        // Flush pending static-file writes so the changesets are readable.
        provider_rw.static_file_provider().commit().unwrap();

        let sf = factory.static_file_provider();
        let storage_cs = sf.storage_changeset(1).unwrap();
        assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
        assert_eq!(storage_cs[0].1.key, slot_key, "v2: changeset key should be plain");

        // Unwind everything above block 0, reverting the block-1 change.
        provider_rw.remove_state_above(0).unwrap();

        // The account did not exist before block 1, so it must be gone now.
        let restored_account = provider_rw
            .tx
            .cursor_read::<tables::HashedAccounts>()
            .unwrap()
            .seek_exact(hashed_address)
            .unwrap();
        assert!(
            restored_account.is_none(),
            "v2: account should be removed (didn't exist before block 1)"
        );

        // The slot reverts to zero, i.e. removed (or a different key).
        let storage_gone = provider_rw
            .tx
            .cursor_dup_read::<tables::HashedStorages>()
            .unwrap()
            .seek_by_key_subkey(hashed_address, hashed_slot)
            .unwrap();
        assert!(
            storage_gone.is_none() || storage_gone.unwrap().key != hashed_slot,
            "v2: storage should be reverted (removed or different key)"
        );

        // MDBX changeset tables must never have been touched in v2.
        let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
        assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");

        let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
        assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
    }
5342
5343 #[test]
5344 fn test_unwind_storage_history_indices_v2() {
5345 let factory = create_test_provider_factory();
5346 factory.set_storage_settings_cache(StorageSettings::v2());
5347
5348 let address = Address::with_last_byte(1);
5349 let slot_key = B256::from(U256::from(42));
5350
5351 {
5352 let rocksdb = factory.rocksdb_provider();
5353 let mut batch = rocksdb.batch();
5354 batch.append_storage_history_shard(address, slot_key, vec![3u64, 7, 10]).unwrap();
5355 batch.commit().unwrap();
5356
5357 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5358 assert!(!shards.is_empty(), "history should be written to rocksdb");
5359 }
5360
5361 let provider_rw = factory.provider_rw().unwrap();
5362
5363 let changesets = vec![
5364 (
5365 BlockNumberAddress((7, address)),
5366 StorageEntry { key: slot_key, value: U256::from(5) },
5367 ),
5368 (
5369 BlockNumberAddress((10, address)),
5370 StorageEntry { key: slot_key, value: U256::from(8) },
5371 ),
5372 ];
5373
5374 let count = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
5375 assert_eq!(count, 2);
5376
5377 provider_rw.commit().unwrap();
5378
5379 let rocksdb = factory.rocksdb_provider();
5380 let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5381
5382 assert!(
5383 !shards.is_empty(),
5384 "history shards should still exist with block 3 after partial unwind"
5385 );
5386
5387 let all_blocks: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
5388 assert!(all_blocks.contains(&3), "block 3 should remain");
5389 assert!(!all_blocks.contains(&7), "block 7 should be unwound");
5390 assert!(!all_blocks.contains(&10), "block 10 should be unwound");
5391 }
5392}