1use crate::{
2 changesets_utils::StorageRevertsIter,
3 providers::{
4 database::{chain::ChainStorage, metrics},
5 rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6 static_file::{StaticFileWriteCtx, StaticFileWriter},
7 NodeTypesForProvider, StaticFileProvider,
8 },
9 to_range,
10 traits::{
11 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12 },
13 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15 DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16 HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17 LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18 PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19 RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20 StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21 TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25 BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29 keccak256,
30 map::{hash_map, AddressSet, B256Map, HashMap},
31 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40 database::Database,
41 models::{
42 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43 BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44 StoredBlockBodyIndices,
45 },
46 table::Table,
47 tables,
48 transaction::{DbTx, DbTxMut},
49 BlockNumberList, PlainAccountState, PlainStorageState,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54 Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
55 StorageSlotKey,
56};
57use reth_prune_types::{
58 PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63 BlockBodyIndicesProvider, BlockBodyReader, ChangesetEntry, MetadataProvider, MetadataWriter,
64 NodePrimitivesProvider, StateProvider, StateWriteConfig, StorageChangeSetReader,
65 StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70 HashedPostStateSorted, StoredNibbles,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor};
73use revm_database::states::{
74 PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use std::{
77 cmp::Ordering,
78 collections::{BTreeMap, BTreeSet},
79 fmt::Debug,
80 ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
81 sync::Arc,
82 time::Instant,
83};
84use tracing::{debug, instrument, trace};
85
/// Marker describing the intent of a read-write provider: a regular (forward)
/// write or an unwind. Constructors pick the variant (`new_rw` vs
/// `new_unwind_rw`); presumably this steers commit ordering across the storage
/// backends — confirm against the commit path.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
    /// Regular forward-write commit order (the default).
    #[default]
    Normal,
    /// Commit order used while unwinding blocks.
    Unwind,
}
96
97impl CommitOrder {
98 pub const fn is_unwind(&self) -> bool {
100 matches!(self, Self::Unwind)
101 }
102}
103
104pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
106
/// Newtype over a read-write [`DatabaseProvider`], i.e. one holding the
/// database's mutable transaction type.
///
/// Derefs (mutably) to the inner provider so all of its methods are available
/// directly.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
115
impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;

    /// Borrows the wrapped read-write provider.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
123
impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
    /// Mutably borrows the wrapped read-write provider.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
129
impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
    for DatabaseProviderRW<DB, N>
{
    /// Borrows the wrapped read-write provider.
    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
        &self.0
    }
}
137
impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
    /// Commits the underlying provider, consuming `self`.
    pub fn commit(self) -> ProviderResult<()> {
        self.0.commit()
    }

    /// Consumes the provider and returns the underlying mutable transaction
    /// without committing it.
    pub fn into_tx(self) -> <DB as Database>::TXMut {
        self.0.into_tx()
    }

    /// Overrides the provider's minimum pruning distance (test-only).
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
        self.0.minimum_pruning_distance = distance;
        self
    }
}
156
impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
    for DatabaseProvider<<DB as Database>::TXMut, N>
{
    /// Unwraps the newtype into the inner read-write provider.
    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
        provider.0
    }
}
164
/// Controls how much data [`DatabaseProvider::save_blocks`] persists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SaveBlocksMode {
    /// Write blocks together with execution state (receipts, changesets,
    /// hashed state, trie updates, history indices).
    Full,
    /// Write block data only, skipping all execution state.
    BlocksOnly,
}
176
177impl SaveBlocksMode {
178 pub const fn with_state(self) -> bool {
180 matches!(self, Self::Full)
181 }
182}
183
/// Provider over a database transaction, giving access to chain data stored
/// across the database (MDBX), static files and, when enabled, RocksDB.
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// The underlying database transaction (read-only or read-write).
    tx: TX,
    /// The chain specification.
    chain_spec: Arc<N::ChainSpec>,
    /// Provider for data stored in static files.
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Active pruning configuration.
    prune_modes: PruneModes,
    /// Node storage implementation used to read/write block bodies.
    storage: Arc<N::Storage>,
    /// Shared, cached storage settings.
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// Provider for data stored in RocksDB.
    rocksdb_provider: RocksDBProvider,
    /// Cache of trie changesets used when unwinding trie state.
    changeset_cache: ChangesetCache,
    /// Task runtime used to parallelize storage writes.
    runtime: reth_tasks::Runtime,
    /// RocksDB write batches staged to be committed together with this
    /// provider. Unused when RocksDB support is compiled out.
    #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
    pending_rocksdb_batches: PendingRocksDBBatches,
    /// Whether this provider was opened for a normal write or an unwind.
    commit_order: CommitOrder,
    /// Minimum distance from the tip considered safe for pruning decisions.
    minimum_pruning_distance: u64,
    /// Metrics recorder for provider operations.
    metrics: metrics::DatabaseProviderMetrics,
}
215
216impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 let mut s = f.debug_struct("DatabaseProvider");
219 s.field("tx", &self.tx)
220 .field("chain_spec", &self.chain_spec)
221 .field("static_file_provider", &self.static_file_provider)
222 .field("prune_modes", &self.prune_modes)
223 .field("storage", &self.storage)
224 .field("storage_settings", &self.storage_settings)
225 .field("rocksdb_provider", &self.rocksdb_provider)
226 .field("changeset_cache", &self.changeset_cache)
227 .field("runtime", &self.runtime)
228 .field("pending_rocksdb_batches", &"<pending batches>")
229 .field("commit_order", &self.commit_order)
230 .field("minimum_pruning_distance", &self.minimum_pruning_distance)
231 .finish()
232 }
233}
234
impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Returns a reference to the active pruning configuration.
    pub const fn prune_modes_ref(&self) -> &PruneModes {
        &self.prune_modes
    }
}
241
impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Returns a state provider for the latest (plain) state.
    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
        trace!(target: "providers::db", "Returning latest state provider");
        Box::new(LatestStateProviderRef::new(self))
    }

    /// Returns a historical state provider for the state after the block with
    /// the given hash was applied.
    ///
    /// Short-circuits to the latest state provider when the block is both the
    /// best and the last stored block. Errors with
    /// [`ProviderError::BlockHashNotFound`] if the hash is unknown.
    pub fn history_by_block_hash<'a>(
        &'a self,
        block_hash: BlockHash,
    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
        let mut block_number =
            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
        if block_number == self.best_block_number().unwrap_or_default() &&
            block_number == self.last_block_number().unwrap_or_default()
        {
            return Ok(Box::new(LatestStateProviderRef::new(self)))
        }

        // Shift by one: the historical provider is constructed for the next
        // block so it serves the post-execution state of `block_number` —
        // mirrors `try_into_history_at_block` below.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);

        // History at or below the prune checkpoints has been removed; cap the
        // lowest block numbers the provider will serve accordingly.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }

    /// Overrides the pruning configuration (test-only).
    #[cfg(feature = "test-utils")]
    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
}
298
impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    /// The node's primitive types are taken straight from the [`NodeTypes`].
    type Primitives = N::Primitives;
}
302
impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
    /// Returns a handle to the static file provider.
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }

    /// Returns a writer for the given static file segment, positioned for the
    /// given block.
    fn get_static_file_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
        self.static_file_provider.get_writer(block, segment)
    }
}
317
impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
    /// Returns a handle to the RocksDB provider.
    fn rocksdb_provider(&self) -> RocksDBProvider {
        self.rocksdb_provider.clone()
    }

    /// Stages a RocksDB write batch to be committed later via
    /// [`Self::commit_pending_rocksdb_batches`].
    #[cfg(all(unix, feature = "rocksdb"))]
    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
        self.pending_rocksdb_batches.lock().push(batch);
    }

    /// Commits all staged RocksDB batches, draining the pending list.
    #[cfg(all(unix, feature = "rocksdb"))]
    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
        // Take the whole list under the lock, then commit outside of it.
        let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
        for batch in batches {
            self.rocksdb_provider.commit_batch(batch)?;
        }
        Ok(())
    }
}
338
impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
    for DatabaseProvider<TX, N>
{
    type ChainSpec = N::ChainSpec;

    /// Returns a shared handle to the chain specification.
    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
        self.chain_spec.clone()
    }
}
348
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Shared constructor for read-write providers; callers choose the
    /// [`CommitOrder`].
    #[allow(clippy::too_many_arguments)]
    fn new_rw_inner(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        commit_order: CommitOrder,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            pending_rocksdb_batches: Default::default(),
            commit_order,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
        }
    }

    /// Creates a read-write provider for regular (forward) writes
    /// ([`CommitOrder::Normal`]).
    #[allow(clippy::too_many_arguments)]
    pub fn new_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
    ) -> Self {
        Self::new_rw_inner(
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            CommitOrder::Normal,
        )
    }

    /// Creates a read-write provider for unwinds ([`CommitOrder::Unwind`]).
    #[allow(clippy::too_many_arguments)]
    pub fn new_unwind_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
    ) -> Self {
        Self::new_rw_inner(
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            CommitOrder::Unwind,
        )
    }
}
435
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    /// Identity `AsRef`, allowing APIs generic over `AsRef<DatabaseProvider>`
    /// to accept the provider directly.
    fn as_ref(&self) -> &Self {
        self
    }
}
441
442impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
443 pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
447 where
448 F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
449 {
450 #[cfg(all(unix, feature = "rocksdb"))]
451 let rocksdb = self.rocksdb_provider();
452 #[cfg(all(unix, feature = "rocksdb"))]
453 let rocksdb_batch = rocksdb.batch();
454 #[cfg(not(all(unix, feature = "rocksdb")))]
455 let rocksdb_batch = ();
456
457 let (result, raw_batch) = f(rocksdb_batch)?;
458
459 #[cfg(all(unix, feature = "rocksdb"))]
460 if let Some(batch) = raw_batch {
461 self.set_pending_rocksdb_batch(batch);
462 }
463 let _ = raw_batch; Ok(result)
466 }
467
    /// Builds the context describing which data categories should go to static
    /// files for the given save mode and block range.
    fn static_file_write_ctx(
        &self,
        save_mode: SaveBlocksMode,
        first_block: BlockNumber,
        last_block: BlockNumber,
    ) -> ProviderResult<StaticFileWriteCtx> {
        // Use the incoming range's end if it is ahead of the stored tip.
        let tip = self.last_block_number()?.max(last_block);
        Ok(StaticFileWriteCtx {
            // Senders go to static files only when that is their configured
            // destination and sender recovery is not fully pruned.
            write_senders: EitherWriterDestination::senders(self).is_static_file() &&
                self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
            // Receipts/changesets are only written at all when execution state
            // is being saved (`SaveBlocksMode::Full`).
            write_receipts: save_mode.with_state() &&
                EitherWriter::receipts_destination(self).is_static_file(),
            write_account_changesets: save_mode.with_state() &&
                EitherWriterDestination::account_changesets(self).is_static_file(),
            write_storage_changesets: save_mode.with_state() &&
                EitherWriterDestination::storage_changesets(self).is_static_file(),
            tip,
            receipts_prune_mode: self.prune_modes.receipts,
            // Receipts may be skipped only if none were ever written to static
            // files and the range is far enough behind the tip to be
            // unwind-safe.
            receipts_prunable: self
                .static_file_provider
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none() &&
                PruneMode::Distance(self.minimum_pruning_distance)
                    .should_prune(first_block, tip),
        })
    }
496
    /// Builds the context for parallel RocksDB block-data writes starting at
    /// `first_block`.
    #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
    fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
        RocksDBWriteCtx {
            first_block_number: first_block,
            prune_tx_lookup: self.prune_modes.transaction_lookup,
            storage_settings: self.cached_storage_settings(),
            // Share the provider's pending-batch list so RocksDB writes commit
            // together with this provider.
            pending_batches: self.pending_rocksdb_batches.clone(),
        }
    }
507
    /// Writes a batch of executed blocks — and, depending on `save_mode`,
    /// their execution state — to all configured storage backends.
    ///
    /// Static-file writes (and RocksDB writes, when enabled and storage v2 is
    /// active) run on the storage thread pool concurrently with the MDBX
    /// writes performed on the current thread inside the same scope. Per-phase
    /// timings are recorded into provider metrics.
    ///
    /// No-op for an empty `blocks` vector.
    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlock<N::Primitives>>,
        save_mode: SaveBlocksMode,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to write empty block range");
            return Ok(())
        }

        let total_start = Instant::now();
        let block_count = blocks.len() as u64;
        // Non-empty checked above, so `unwrap` is safe.
        let first_number = blocks.first().unwrap().recovered_block().number();
        let last_block_number = blocks.last().unwrap().recovered_block().number();

        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");

        // First tx number of the batch: one past the highest tx number already
        // indexed, or 0 for an empty database.
        let first_tx_num = self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .last()?
            .map(|(n, _)| n + 1)
            .unwrap_or_default();

        // Starting tx number for each block, computed by running a cumulative
        // count over the batch.
        let tx_nums: Vec<TxNumber> = {
            let mut nums = Vec::with_capacity(blocks.len());
            let mut current = first_tx_num;
            for block in &blocks {
                nums.push(current);
                current += block.recovered_block().body().transaction_count() as u64;
            }
            nums
        };

        let mut timings = metrics::SaveBlocksTimings { block_count, ..Default::default() };

        let sf_provider = &self.static_file_provider;
        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_provider = self.rocksdb_provider.clone();
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;

        // Results from the spawned writer tasks; `None` afterwards means the
        // task panicked before storing a result.
        let mut sf_result = None;
        #[cfg(all(unix, feature = "rocksdb"))]
        let mut rocksdb_result = None;

        let runtime = &self.runtime;
        runtime.storage_pool().in_place_scope(|s| {
            // Static-file writes run concurrently with the MDBX writes below.
            s.spawn(|_| {
                let start = Instant::now();
                sf_result = Some(
                    sf_provider
                        .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
                        .map(|()| start.elapsed()),
                );
            });

            // RocksDB writes, only when storage v2 is active.
            #[cfg(all(unix, feature = "rocksdb"))]
            if rocksdb_enabled {
                s.spawn(|_| {
                    let start = Instant::now();
                    rocksdb_result = Some(
                        rocksdb_provider
                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
                            .map(|()| start.elapsed()),
                    );
                });
            }

            let mdbx_start = Instant::now();

            // Legacy (pre-v2) storage keeps the tx-hash → tx-number lookup
            // here; skip entirely when the lookup is fully pruned.
            if !self.cached_storage_settings().storage_v2 &&
                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
            {
                let start = Instant::now();
                let total_tx_count: usize =
                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
                for (i, block) in blocks.iter().enumerate() {
                    let recovered_block = block.recovered_block();
                    let mut tx_num = tx_nums[i];
                    for transaction in recovered_block.body().transactions_iter() {
                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
                        tx_num += 1;
                    }
                }

                // Sort by hash for ordered insertion into the lookup table.
                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);

                self.with_rocksdb_batch(|batch| {
                    let mut tx_hash_writer =
                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
                    Ok(((), raw_batch))
                })?;
                self.metrics.record_duration(
                    metrics::Action::InsertTransactionHashNumbers,
                    start.elapsed(),
                );
            }

            // Per-block MDBX writes: block indices and (optionally) state.
            for (i, block) in blocks.iter().enumerate() {
                let recovered_block = block.recovered_block();

                let start = Instant::now();
                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
                timings.insert_block += start.elapsed();

                if save_mode.with_state() {
                    let execution_output = block.execution_outcome();

                    let start = Instant::now();
                    // Categories already handled by the static-file task are
                    // excluded here to avoid double writes.
                    self.write_state(
                        WriteStateInput::Single {
                            outcome: execution_output,
                            block: recovered_block.number(),
                        },
                        OriginalValuesKnown::No,
                        StateWriteConfig {
                            write_receipts: !sf_ctx.write_receipts,
                            write_account_changesets: !sf_ctx.write_account_changesets,
                            write_storage_changesets: !sf_ctx.write_storage_changesets,
                        },
                    )?;
                    timings.write_state += start.elapsed();
                }
            }

            if save_mode.with_state() {
                // Merge hashed state over the whole batch (newest first, so
                // later blocks take precedence) and write it in one pass.
                let start = Instant::now();
                let merged_hashed_state = HashedPostStateSorted::merge_batch(
                    blocks.iter().rev().map(|b| b.trie_data().hashed_state),
                );
                if !merged_hashed_state.is_empty() {
                    self.write_hashed_state(&merged_hashed_state)?;
                }
                timings.write_hashed_state += start.elapsed();

                // Same for the trie updates.
                let start = Instant::now();
                let merged_trie =
                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
                if !merged_trie.is_empty() {
                    self.write_trie_updates_sorted(&merged_trie)?;
                }
                timings.write_trie_updates += start.elapsed();
            }

            if save_mode.with_state() {
                let start = Instant::now();
                self.update_history_indices(first_number..=last_block_number)?;
                timings.update_history_indices = start.elapsed();
            }

            let start = Instant::now();
            self.update_pipeline_stages(last_block_number, false)?;
            timings.update_pipeline_stages = start.elapsed();

            timings.mdbx = mdbx_start.elapsed();

            Ok::<_, ProviderError>(())
        })?;

        // A missing result means the static-file task panicked.
        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;

        #[cfg(all(unix, feature = "rocksdb"))]
        if rocksdb_enabled {
            timings.rocksdb = rocksdb_result.ok_or_else(|| {
                ProviderError::Database(reth_db_api::DatabaseError::Other(
                    "RocksDB thread panicked".into(),
                ))
            })??;
        }

        timings.total = total_start.elapsed();

        self.metrics.record_save_blocks(&timings);
        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");

        Ok(())
    }
719
    /// Writes the MDBX-only parts of a block: transaction senders (when the
    /// database is their destination), the block-hash → number index, and the
    /// block body indices.
    ///
    /// Returns the body indices (`first_tx_num` plus the block's tx count).
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn insert_block_mdbx_only(
        &self,
        block: &RecoveredBlock<BlockTy<N>>,
        first_tx_num: TxNumber,
    ) -> ProviderResult<StoredBlockBodyIndices> {
        // Senders are written here only when sender recovery is not fully
        // pruned and their configured destination is the database (otherwise
        // the static-file task handles them).
        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
            EitherWriterDestination::senders(self).is_database()
        {
            let start = Instant::now();
            // Consecutive tx numbers starting at `first_tx_num`, zipped with
            // the block's recovered senders.
            let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
            let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
            for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
                cursor.append(tx_num, &sender)?;
            }
            self.metrics
                .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
        }

        let block_number = block.number();
        let tx_count = block.body().transaction_count() as u64;

        let start = Instant::now();
        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
        self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());

        self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;

        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
    }
753
    /// Writes the body indices for a block: `BlockBodyIndices`, the
    /// tx-number → block mapping (for non-empty blocks), and the body itself
    /// via the node storage implementation.
    fn write_block_body_indices(
        &self,
        block_number: BlockNumber,
        body: &BodyTy<N>,
        first_tx_num: TxNumber,
        tx_count: u64,
    ) -> ProviderResult<()> {
        let start = Instant::now();
        self.tx
            .cursor_write::<tables::BlockBodyIndices>()?
            .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
        self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());

        if tx_count > 0 {
            // `TransactionBlocks` maps the *last* tx number of the block to
            // the block number, so only one entry is needed per block.
            let start = Instant::now();
            self.tx
                .cursor_write::<tables::TransactionBlocks>()?
                .append(first_tx_num + tx_count - 1, &block_number)?;
            self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
        }

        self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;

        Ok(())
    }
784
    /// Unwinds hashed state, history indices and trie state for all blocks at
    /// and above `from`, using the account and storage changesets to revert.
    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
        let changed_accounts = self.account_changesets_range(from..)?;

        // Revert hashed account state and account history indices.
        self.unwind_account_hashing(changed_accounts.iter())?;

        self.unwind_account_history_indices(changed_accounts.iter())?;

        let changed_storages = self.storage_changesets_range(from..)?;

        // Revert hashed storage state and storage history indices.
        self.unwind_storage_hashing(changed_storages.iter().copied())?;

        self.unwind_storage_history_indices(changed_storages.iter().copied())?;

        // The trie revert is computed against the last fully persisted block,
        // taken from the `Finish` stage checkpoint. Without a checkpoint there
        // is nothing to revert against.
        let db_tip_block = self
            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
            .as_ref()
            .map(|chk| chk.block_number)
            .ok_or_else(|| ProviderError::InsufficientChangesets {
                requested: from,
                available: 0..=0,
            })?;

        let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
        self.write_trie_updates_sorted(&trie_revert)?;

        Ok(())
    }
822
    /// Removes all receipts with tx number `from_tx` and above, from both the
    /// database table and (when receipts live there) the static files.
    fn remove_receipts_from(
        &self,
        from_tx: TxNumber,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;

        if EitherWriter::receipts_destination(self).is_static_file() {
            let static_file_receipt_num =
                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);

            // Number of static-file receipts at or above `from_tx`; saturates
            // to 0 when the static files end before `from_tx`.
            let to_delete = static_file_receipt_num
                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
                .unwrap_or_default();

            self.static_file_provider
                .latest_writer(StaticFileSegment::Receipts)?
                .prune_receipts(to_delete, last_block)?;
        }

        Ok(())
    }
847}
848
impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
    /// Converts the provider into a state provider for the state after
    /// `block_number` was applied.
    ///
    /// Errors with [`ProviderError::BlockNotExecuted`] for blocks above the
    /// best (executed) block; returns the latest state provider when the
    /// requested block *is* the best block.
    fn try_into_history_at_block(
        self,
        mut block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        let best_block = self.best_block_number().unwrap_or_default();

        // State beyond the best executed block does not exist yet.
        if block_number > best_block {
            return Err(ProviderError::BlockNotExecuted {
                requested: block_number,
                executed: best_block,
            });
        }

        // Post-state of the best block is simply the latest state.
        if block_number == best_block {
            return Ok(Box::new(LatestStateProvider::new(self)));
        }

        // Shift by one so the historical provider serves the post-execution
        // state of `block_number` — mirrors `history_by_block_hash` above.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProvider::new(self, block_number);

        // History at or below the prune checkpoints has been removed; cap the
        // lowest block numbers the provider will serve accordingly.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }
}
899
/// Unwinds history index shards for a single key, deleting every shard at or
/// above the unwind point and returning the block numbers (all strictly below
/// `block_number`) that survive from the first partially affected shard.
///
/// The walk starts at `start_key` (expected to be the key's highest shard) and
/// moves backwards; `shard_belongs_to_key` stops the walk once the cursor has
/// crossed into another key's shards. Every visited shard is deleted — the
/// caller is expected to re-insert the returned, still-valid block numbers.
fn unwind_history_shards<S, T, C>(
    cursor: &mut C,
    start_key: T::Key,
    block_number: BlockNumber,
    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<S>>,
    C: DbCursorRO<T> + DbCursorRW<T>,
{
    let mut item = cursor.seek_exact(start_key)?;
    while let Some((sharded_key, list)) = item {
        // Stop once the cursor leaves this key's shards.
        if !shard_belongs_to_key(&sharded_key) {
            break
        }

        // Every visited shard is removed; surviving entries are returned so
        // the caller can re-insert them.
        cursor.delete_current()?;

        // Shards are never stored empty, so the list has a first element.
        let first = list.iter().next().expect("List can't be empty");

        // Entire shard is at/above the unwind point: drop it and keep walking
        // backwards to the previous shard.
        if first >= block_number {
            item = cursor.prev()?;
            continue
        }
        // Shard straddles the unwind point: keep only the entries below it.
        else if block_number <= sharded_key.as_ref().highest_block_number {
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        // Shard lies entirely below the unwind point: keep it whole.
        return Ok(list.iter().collect::<Vec<_>>())
    }

    Ok(Vec::new())
}
962
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Creates a new provider over the given transaction and storage handles,
    /// with [`CommitOrder::Normal`] and the default minimum pruning distance.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            pending_rocksdb_batches: Default::default(),
            commit_order: CommitOrder::Normal,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
        }
    }

    /// Consumes the provider and returns the underlying transaction.
    pub fn into_tx(self) -> TX {
        self.tx
    }

    /// Returns a mutable reference to the underlying transaction.
    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    /// Returns a shared reference to the underlying transaction.
    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    /// Returns a reference to the chain specification.
    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}
1014
1015impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Assembles a single block (header + body + senders) for the given hash
    /// or number, delegating header lookup and final construction to the
    /// provided closures.
    ///
    /// Returns `Ok(None)` when the block number, header, or body indices are
    /// missing. Senders absent from storage are recovered from the
    /// transaction signatures (unchecked recovery).
    fn recovered_block<H, HF, B, BF>(
        &self,
        id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
        header_by_number: HF,
        construct_block: BF,
    ) -> ProviderResult<Option<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
    {
        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
        let Some(header) = header_by_number(block_number)? else { return Ok(None) };

        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };

        let tx_range = body.tx_num_range();

        let transactions = if tx_range.is_empty() {
            vec![]
        } else {
            self.transactions_by_tx_range(tx_range.clone())?
        };

        // Reassemble the full body through the node's storage implementation.
        let body = self
            .storage
            .reader()
            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
            .pop()
            .ok_or(ProviderError::InvalidStorageOutput)?;

        let senders = if tx_range.is_empty() {
            vec![]
        } else {
            let known_senders: HashMap<TxNumber, Address> =
                EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;

            let mut senders = Vec::with_capacity(body.transactions().len());
            for (tx_num, tx) in tx_range.zip(body.transactions()) {
                match known_senders.get(&tx_num) {
                    None => {
                        // Sender not stored (e.g. pruned) — recover it from
                        // the signature.
                        let sender = tx.recover_signer_unchecked()?;
                        senders.push(sender);
                    }
                    Some(sender) => senders.push(*sender),
                }
            }
            senders
        };

        construct_block(header, body, senders)
    }
1075
    /// Assembles all blocks in the inclusive `range` by pairing headers (from
    /// `headers_range`) with their body indices, fetching transactions, and
    /// handing each (header, body, tx range) triple to `assemble_block`.
    ///
    /// Returns an empty vector for an empty range.
    fn block_range<F, H, HF, R>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        mut assemble_block: F,
    ) -> ProviderResult<Vec<R>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
    {
        if range.is_empty() {
            return Ok(Vec::new())
        }

        // NOTE(review): capacity is one short for an inclusive range
        // (len = end - start + 1); harmless since Vec grows as needed.
        let len = range.end().saturating_sub(*range.start()) as usize;
        let mut blocks = Vec::with_capacity(len);

        let headers = headers_range(range.clone())?;

        // Pair each block's tx-number range with its header; blocks without
        // body indices are implicitly dropped by the zip.
        let present_headers = self
            .block_body_indices_range(range)?
            .into_iter()
            .map(|b| b.tx_num_range())
            .zip(headers)
            .collect::<Vec<_>>();

        let mut inputs = Vec::with_capacity(present_headers.len());
        for (tx_range, header) in &present_headers {
            let transactions = if tx_range.is_empty() {
                Vec::new()
            } else {
                self.transactions_by_tx_range(tx_range.clone())?
            };

            inputs.push((header.as_ref(), transactions));
        }

        // Reassemble all bodies in one storage call.
        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;

        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
            blocks.push(assemble_block(header, body, tx_range)?);
        }

        Ok(blocks)
    }
1136
    /// Like [`Self::block_range`], but additionally resolves each block's
    /// transaction senders (falling back to signature recovery for senders
    /// missing from storage) before invoking `assemble_block`.
    fn block_with_senders_range<H, HF, B, BF>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        assemble_block: BF,
    ) -> ProviderResult<Vec<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
    {
        self.block_range(range, headers_range, |header, body, tx_range| {
            let senders = if tx_range.is_empty() {
                Vec::new()
            } else {
                let known_senders: HashMap<TxNumber, Address> =
                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;

                let mut senders = Vec::with_capacity(body.transactions().len());
                for (tx_num, tx) in tx_range.zip(body.transactions()) {
                    match known_senders.get(&tx_num) {
                        None => {
                            // Sender not stored (e.g. pruned) — recover it
                            // from the signature.
                            let sender = tx.recover_signer_unchecked()?;
                            senders.push(sender);
                        }
                        Some(sender) => senders.push(*sender),
                    }
                }

                senders
            };

            assemble_block(header, body, senders)
        })
    }
1183
    /// Builds bundle-state and revert structures from plain-state changesets.
    ///
    /// Changesets are iterated newest-to-oldest (`.rev()`), so for each account/slot the
    /// *oldest* pre-change value in the range ends up as the "old" side of the state
    /// tuple, while the "new" side is the current value read from the plain-state
    /// cursors on first encounter.
    ///
    /// Returns `(state, reverts)`: `state` maps each touched address to
    /// `(old account info, current account info, touched storage slots)`, and `reverts`
    /// records, per block, the pre-change values needed to undo that block.
    fn populate_bundle_state<A, S>(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, ChangesetEntry)>,
        plain_accounts_cursor: &mut A,
        plain_storage_cursor: &mut S,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
    where
        A: DbCursorRO<PlainAccountState>,
        S: DbDupCursorRO<PlainStorageState>,
    {
        let mut state: BundleStateInit = HashMap::default();
        let mut reverts: RevertsInit = HashMap::default();

        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // First (newest) sighting: capture current state as the "new" value.
                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // Older entry wins for the "old" value (iteration is reversed).
                    entry.get_mut().0 = old_info;
                }
            }
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            // Account may not have an account-changeset entry; seed it from current state.
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            let storage_key = old_storage.key.as_b256();
            match account_state.2.entry(storage_key) {
                hash_map::Entry::Vacant(entry) => {
                    // Dup cursor may land on a later subkey — only an exact key match counts;
                    // absent slots read as the default (zero) value.
                    let new_storage = plain_storage_cursor
                        .seek_by_key_subkey(address, storage_key)?
                        .filter(|storage| storage.key == storage_key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage.into_storage_entry());
        }

        Ok((state, reverts))
    }
1265
    /// Hashed-state counterpart of `populate_bundle_state`.
    ///
    /// Identical newest-to-oldest changeset processing, except current values are read
    /// from the hashed tables: account lookups key by `keccak256(address)`. Storage
    /// lookups use the changeset's slot key as the subkey as-is (NOTE(review): assumes
    /// the changeset key is already in the form the hashed table expects — confirm).
    ///
    /// Returns `(state, reverts)` with the same shape as `populate_bundle_state`.
    fn populate_bundle_state_hashed(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, ChangesetEntry)>,
        hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
        hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
        let mut state: BundleStateInit = HashMap::default();
        let mut reverts: RevertsInit = HashMap::default();

        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // First (newest) sighting: current value from the hashed table.
                    let hashed_address = keccak256(address);
                    let new_info =
                        hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // Older entry wins for the "old" value (iteration is reversed).
                    entry.get_mut().0 = old_info;
                }
            }
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            // Seed accounts that only appear via storage changes from current state.
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let hashed_address = keccak256(address);
                    let present_info =
                        hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            let storage_key = old_storage.key.as_b256();
            match account_state.2.entry(storage_key) {
                hash_map::Entry::Vacant(entry) => {
                    let hashed_address = keccak256(address);
                    // Only an exact subkey match counts; absent slots read as default.
                    let new_storage = hashed_storage_cursor
                        .seek_by_key_subkey(hashed_address, storage_key)?
                        .filter(|storage| storage.key == storage_key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage.into_storage_entry());
        }

        Ok((state, reverts))
    }
1336}
1337
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Appends new block-number indices to a sharded history table `T`.
    ///
    /// For each `partial_key`, the open "last" shard — stored under the sharded key built
    /// with the `u64::MAX` sentinel — is loaded, the new indices are appended, and the
    /// result is written back. If the merged shard exceeds
    /// `sharded_key::NUM_OF_INDICES_IN_SHARD`, it is split: every full chunk is keyed by
    /// its highest block number, and only the final chunk keeps the `u64::MAX` key so
    /// later appends can find and extend it.
    fn append_history_index<P, T>(
        &self,
        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
    ) -> ProviderResult<()>
    where
        P: Copy,
        T: Table<Value = BlockNumberList>,
    {
        // Shard rewriting below relies on plain upsert semantics, which DUPSORT breaks.
        assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");

        let mut cursor = self.tx.cursor_write::<T>()?;

        for (partial_key, indices) in index_updates {
            // The open shard always lives under the `u64::MAX`-suffixed key.
            let last_key = sharded_key_factory(partial_key, u64::MAX);
            let mut last_shard = cursor
                .seek_exact(last_key.clone())?
                .map(|(_, list)| list)
                .unwrap_or_else(BlockNumberList::empty);

            last_shard.append(indices).map_err(ProviderError::other)?;

            if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
                // Still fits in one shard: write back in place.
                cursor.upsert(last_key, &last_shard)?;
                continue;
            }

            // Overflow: split into fixed-size chunks and rewrite them all.
            let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
            let mut chunks_peekable = chunks.into_iter().peekable();

            while let Some(chunk) = chunks_peekable.next() {
                let shard = BlockNumberList::new_pre_sorted(chunk);
                let highest_block_number = if chunks_peekable.peek().is_some() {
                    // Full shard: keyed by its highest contained block number.
                    shard.iter().next_back().expect("`chunks` does not return empty list")
                } else {
                    // Final shard stays open under the sentinel key.
                    u64::MAX
                };

                cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
            }
        }

        Ok(())
    }
}
1396
1397impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1398 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1399 if self.cached_storage_settings().use_hashed_state() {
1400 let hashed_address = keccak256(address);
1401 Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1402 } else {
1403 Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1404 }
1405 }
1406}
1407
1408impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1409 fn changed_accounts_with_range(
1410 &self,
1411 range: RangeInclusive<BlockNumber>,
1412 ) -> ProviderResult<BTreeSet<Address>> {
1413 let mut reader = EitherReader::new_account_changesets(self)?;
1414
1415 reader.changed_accounts_with_range(range)
1416 }
1417
1418 fn basic_accounts(
1419 &self,
1420 iter: impl IntoIterator<Item = Address>,
1421 ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1422 if self.cached_storage_settings().use_hashed_state() {
1423 let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
1424 Ok(iter
1425 .into_iter()
1426 .map(|address| {
1427 let hashed_address = keccak256(address);
1428 hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
1429 })
1430 .collect::<Result<Vec<_>, _>>()?)
1431 } else {
1432 let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1433 Ok(iter
1434 .into_iter()
1435 .map(|address| {
1436 plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
1437 })
1438 .collect::<Result<Vec<_>, _>>()?)
1439 }
1440 }
1441
1442 fn changed_accounts_and_blocks_with_range(
1443 &self,
1444 range: RangeInclusive<BlockNumber>,
1445 ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1446 let highest_static_block = self
1447 .static_file_provider
1448 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1449
1450 if let Some(highest) = highest_static_block &&
1451 self.cached_storage_settings().storage_v2
1452 {
1453 let start = *range.start();
1454 let static_end = (*range.end()).min(highest);
1455
1456 let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1457 if start <= static_end {
1458 for block in start..=static_end {
1459 let block_changesets = self.account_block_changeset(block)?;
1460 for changeset in block_changesets {
1461 changed_accounts_and_blocks
1462 .entry(changeset.address)
1463 .or_default()
1464 .push(block);
1465 }
1466 }
1467 }
1468
1469 Ok(changed_accounts_and_blocks)
1470 } else {
1471 let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1472
1473 let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1474 BTreeMap::new(),
1475 |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1476 let (index, account) = entry?;
1477 accounts.entry(account.address).or_default().push(index);
1478 Ok(accounts)
1479 },
1480 )?;
1481
1482 Ok(account_transitions)
1483 }
1484 }
1485}
1486
1487impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1488 fn storage_changeset(
1489 &self,
1490 block_number: BlockNumber,
1491 ) -> ProviderResult<Vec<(BlockNumberAddress, ChangesetEntry)>> {
1492 if self.cached_storage_settings().storage_v2 {
1493 self.static_file_provider.storage_changeset(block_number)
1494 } else {
1495 let range = block_number..=block_number;
1496 let storage_range = BlockNumberAddress::range(range);
1497 self.tx
1498 .cursor_dup_read::<tables::StorageChangeSets>()?
1499 .walk_range(storage_range)?
1500 .map(|r| {
1501 let (bna, entry) = r?;
1502 Ok((
1503 bna,
1504 ChangesetEntry {
1505 key: StorageSlotKey::plain(entry.key),
1506 value: entry.value,
1507 },
1508 ))
1509 })
1510 .collect()
1511 }
1512 }
1513
1514 fn get_storage_before_block(
1515 &self,
1516 block_number: BlockNumber,
1517 address: Address,
1518 storage_key: B256,
1519 ) -> ProviderResult<Option<ChangesetEntry>> {
1520 if self.cached_storage_settings().storage_v2 {
1521 self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1522 } else {
1523 Ok(self
1524 .tx
1525 .cursor_dup_read::<tables::StorageChangeSets>()?
1526 .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1527 .filter(|entry| entry.key == storage_key)
1528 .map(|entry| ChangesetEntry {
1529 key: StorageSlotKey::plain(entry.key),
1530 value: entry.value,
1531 }))
1532 }
1533 }
1534
1535 fn storage_changesets_range(
1536 &self,
1537 range: impl RangeBounds<BlockNumber>,
1538 ) -> ProviderResult<Vec<(BlockNumberAddress, ChangesetEntry)>> {
1539 if self.cached_storage_settings().storage_v2 {
1540 self.static_file_provider.storage_changesets_range(range)
1541 } else {
1542 self.tx
1543 .cursor_dup_read::<tables::StorageChangeSets>()?
1544 .walk_range(BlockNumberAddressRange::from(range))?
1545 .map(|r| {
1546 let (bna, entry) = r?;
1547 Ok((
1548 bna,
1549 ChangesetEntry {
1550 key: StorageSlotKey::plain(entry.key),
1551 value: entry.value,
1552 },
1553 ))
1554 })
1555 .collect()
1556 }
1557 }
1558
1559 fn storage_changeset_count(&self) -> ProviderResult<usize> {
1560 if self.cached_storage_settings().storage_v2 {
1561 self.static_file_provider.storage_changeset_count()
1562 } else {
1563 Ok(self.tx.entries::<tables::StorageChangeSets>()?)
1564 }
1565 }
1566}
1567
1568impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1569 fn account_block_changeset(
1570 &self,
1571 block_number: BlockNumber,
1572 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1573 if self.cached_storage_settings().storage_v2 {
1574 let static_changesets =
1575 self.static_file_provider.account_block_changeset(block_number)?;
1576 Ok(static_changesets)
1577 } else {
1578 let range = block_number..=block_number;
1579 self.tx
1580 .cursor_read::<tables::AccountChangeSets>()?
1581 .walk_range(range)?
1582 .map(|result| -> ProviderResult<_> {
1583 let (_, account_before) = result?;
1584 Ok(account_before)
1585 })
1586 .collect()
1587 }
1588 }
1589
1590 fn get_account_before_block(
1591 &self,
1592 block_number: BlockNumber,
1593 address: Address,
1594 ) -> ProviderResult<Option<AccountBeforeTx>> {
1595 if self.cached_storage_settings().storage_v2 {
1596 Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1597 } else {
1598 self.tx
1599 .cursor_dup_read::<tables::AccountChangeSets>()?
1600 .seek_by_key_subkey(block_number, address)?
1601 .filter(|acc| acc.address == address)
1602 .map(Ok)
1603 .transpose()
1604 }
1605 }
1606
1607 fn account_changesets_range(
1608 &self,
1609 range: impl core::ops::RangeBounds<BlockNumber>,
1610 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1611 if self.cached_storage_settings().storage_v2 {
1612 self.static_file_provider.account_changesets_range(range)
1613 } else {
1614 self.tx
1615 .cursor_read::<tables::AccountChangeSets>()?
1616 .walk_range(to_range(range))?
1617 .map(|r| r.map_err(Into::into))
1618 .collect()
1619 }
1620 }
1621
1622 fn account_changeset_count(&self) -> ProviderResult<usize> {
1623 if self.cached_storage_settings().storage_v2 {
1626 self.static_file_provider.account_changeset_count()
1627 } else {
1628 Ok(self.tx.entries::<tables::AccountChangeSets>()?)
1629 }
1630 }
1631}
1632
impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
    for DatabaseProvider<TX, N>
{
    type Header = HeaderTy<N>;

    /// Returns the sealed header at `highest_uninterrupted_block`, first reconciling the
    /// headers static file with that height.
    ///
    /// If the static file holds headers beyond the expected next block, the surplus is
    /// pruned and the truncation committed. If it is behind, the tip cannot be served
    /// and a [`ProviderError::HeaderNotFound`] for the first missing block is returned.
    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        let static_file_provider = self.static_file_provider();

        // Next block the static files expect (highest stored + 1; 0 when empty).
        let next_static_file_block_num = static_file_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .map(|id| id + 1)
            .unwrap_or_default();
        let next_block = highest_uninterrupted_block + 1;

        match next_static_file_block_num.cmp(&next_block) {
            // Static files are ahead: drop the surplus headers so both stores agree,
            // then persist the truncation.
            Ordering::Greater => {
                let mut static_file_producer =
                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
                static_file_producer.commit()?
            }
            // Static files are behind: the requested tip is not available.
            Ordering::Less => {
                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
            }
            Ordering::Equal => {}
        }

        let local_head = static_file_provider
            .sealed_header(highest_uninterrupted_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;

        Ok(local_head)
    }
}
1677
1678impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1679 type Header = HeaderTy<N>;
1680
1681 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1682 if let Some(num) = self.block_number(block_hash)? {
1683 Ok(self.header_by_number(num)?)
1684 } else {
1685 Ok(None)
1686 }
1687 }
1688
1689 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1690 self.static_file_provider.header_by_number(num)
1691 }
1692
1693 fn headers_range(
1694 &self,
1695 range: impl RangeBounds<BlockNumber>,
1696 ) -> ProviderResult<Vec<Self::Header>> {
1697 self.static_file_provider.headers_range(range)
1698 }
1699
1700 fn sealed_header(
1701 &self,
1702 number: BlockNumber,
1703 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1704 self.static_file_provider.sealed_header(number)
1705 }
1706
1707 fn sealed_headers_while(
1708 &self,
1709 range: impl RangeBounds<BlockNumber>,
1710 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1711 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1712 self.static_file_provider.sealed_headers_while(range, predicate)
1713 }
1714}
1715
impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
    /// Canonical block hashes are stored in static files; delegate the lookup.
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.block_hash(number)
    }

    /// Returns the canonical hashes for the given block-number bounds, from static files.
    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.canonical_hashes_range(start, end)
    }
}
1729
1730impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1731 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1732 let best_number = self.best_block_number()?;
1733 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1734 Ok(ChainInfo { best_hash, best_number })
1735 }
1736
1737 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1738 Ok(self
1741 .get_stage_checkpoint(StageId::Finish)?
1742 .map(|checkpoint| checkpoint.block_number)
1743 .unwrap_or_default())
1744 }
1745
1746 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1747 self.static_file_provider.last_block_number()
1748 }
1749
1750 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1751 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1752 }
1753}
1754
impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
    type Block = BlockTy<N>;

    /// Only canonical blocks are stored here, so any non-canonical `source` yields
    /// `Ok(None)`.
    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        if source.is_canonical() {
            self.block(hash.into())
        } else {
            Ok(None)
        }
    }

    /// Assembles the full block for `id` from its header, transactions, and
    /// storage-specific body parts.
    ///
    /// Returns [`ProviderError::BlockExpired`] when the block predates the earliest
    /// history height retained by the static files, and `Ok(None)` when any component
    /// is missing.
    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        if let Some(number) = self.convert_hash_or_number(id)? {
            let earliest_available = self.static_file_provider.earliest_history_height();
            if number < earliest_available {
                return Err(ProviderError::BlockExpired { requested: number, earliest_available })
            }

            let Some(header) = self.header_by_number(number)? else { return Ok(None) };

            let Some(transactions) = self.transactions_by_block(number.into())? else {
                return Ok(None)
            };

            // The chain's block-body storage reconstructs the body from the header and
            // transaction list; exactly one body is expected back for one input.
            let body = self
                .storage
                .reader()
                .read_block_bodies(self, vec![(&header, transactions)])?
                .pop()
                .ok_or(ProviderError::InvalidStorageOutput)?;

            return Ok(Some(Self::Block::new(header, body)))
        }

        Ok(None)
    }

    /// The database never holds a pending (unconfirmed) block.
    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(None)
    }

    /// No pending block means no pending receipts either.
    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(None)
    }

    /// Builds a sender-recovered block via the same-named generic inherent helper
    /// (different arity), using unsealed headers fetched by number.
    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.header_by_number(block_number),
            |header, body, senders| {
                Self::Block::new(header, body)
                    // `_unchecked`: senders were recovered/stored previously, so the
                    // count-match is trusted rather than re-validated.
                    .try_into_recovered_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Like [`Self::recovered_block`], but assembles from sealed headers so the block
    /// hash is preserved.
    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.sealed_header(block_number),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Assembles plain blocks for `range` via the generic inherent `block_range` helper.
    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        self.block_range(
            range,
            |range| self.headers_range(range),
            |header, body, _| Ok(Self::Block::new(header, body)),
        )
    }

    /// Assembles sender-recovered blocks for `range` (unsealed headers).
    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.headers_range(range),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Assembles sender-recovered blocks for `range` from sealed headers; uses the
    /// checked sender attachment (`try_with_senders`), unlike the unsealed variant.
    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.sealed_headers_range(range),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Maps a transaction number to its containing block via the `TransactionBlocks`
    /// index (seek returns the first entry at or after `id`).
    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Ok(self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .seek(id)
            .map(|b| b.map(|(_, bn)| bn))?)
    }
}
1913
impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
    for DatabaseProvider<TX, N>
{
    /// Returns `(hash, transaction number)` pairs for `tx_range`; transaction data
    /// lives in static files, so the lookup is delegated there.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        self.static_file_provider.transaction_hashes_by_range(tx_range)
    }
}
1926
1927impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1929 type Transaction = TxTy<N>;
1930
1931 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1932 self.with_rocksdb_tx(|tx_ref| {
1933 let mut reader = EitherReader::new_transaction_hash_numbers(self, tx_ref)?;
1934 reader.get_transaction_hash_number(tx_hash)
1935 })
1936 }
1937
1938 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1939 self.static_file_provider.transaction_by_id(id)
1940 }
1941
1942 fn transaction_by_id_unhashed(
1943 &self,
1944 id: TxNumber,
1945 ) -> ProviderResult<Option<Self::Transaction>> {
1946 self.static_file_provider.transaction_by_id_unhashed(id)
1947 }
1948
1949 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1950 if let Some(id) = self.transaction_id(hash)? {
1951 Ok(self.transaction_by_id_unhashed(id)?)
1952 } else {
1953 Ok(None)
1954 }
1955 }
1956
1957 fn transaction_by_hash_with_meta(
1958 &self,
1959 tx_hash: TxHash,
1960 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1961 if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1962 let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1963 let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1964 let Some(sealed_header) = self.sealed_header(block_number)?
1965 {
1966 let (header, block_hash) = sealed_header.split();
1967 if let Some(block_body) = self.block_body_indices(block_number)? {
1968 let index = transaction_id - block_body.first_tx_num();
1973
1974 let meta = TransactionMeta {
1975 tx_hash,
1976 index,
1977 block_hash,
1978 block_number,
1979 base_fee: header.base_fee_per_gas(),
1980 excess_blob_gas: header.excess_blob_gas(),
1981 timestamp: header.timestamp(),
1982 };
1983
1984 return Ok(Some((transaction, meta)))
1985 }
1986 }
1987
1988 Ok(None)
1989 }
1990
1991 fn transactions_by_block(
1992 &self,
1993 id: BlockHashOrNumber,
1994 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1995 if let Some(block_number) = self.convert_hash_or_number(id)? &&
1996 let Some(body) = self.block_body_indices(block_number)?
1997 {
1998 let tx_range = body.tx_num_range();
1999 return if tx_range.is_empty() {
2000 Ok(Some(Vec::new()))
2001 } else {
2002 self.transactions_by_tx_range(tx_range).map(Some)
2003 }
2004 }
2005 Ok(None)
2006 }
2007
2008 fn transactions_by_block_range(
2009 &self,
2010 range: impl RangeBounds<BlockNumber>,
2011 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2012 let range = to_range(range);
2013
2014 self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
2015 .into_iter()
2016 .map(|body| {
2017 let tx_num_range = body.tx_num_range();
2018 if tx_num_range.is_empty() {
2019 Ok(Vec::new())
2020 } else {
2021 self.transactions_by_tx_range(tx_num_range)
2022 }
2023 })
2024 .collect()
2025 }
2026
2027 fn transactions_by_tx_range(
2028 &self,
2029 range: impl RangeBounds<TxNumber>,
2030 ) -> ProviderResult<Vec<Self::Transaction>> {
2031 self.static_file_provider.transactions_by_tx_range(range)
2032 }
2033
2034 fn senders_by_tx_range(
2035 &self,
2036 range: impl RangeBounds<TxNumber>,
2037 ) -> ProviderResult<Vec<Address>> {
2038 if EitherWriterDestination::senders(self).is_static_file() {
2039 self.static_file_provider.senders_by_tx_range(range)
2040 } else {
2041 self.cursor_read_collect::<tables::TransactionSenders>(range)
2042 }
2043 }
2044
2045 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2046 if EitherWriterDestination::senders(self).is_static_file() {
2047 self.static_file_provider.transaction_sender(id)
2048 } else {
2049 Ok(self.tx.get::<tables::TransactionSenders>(id)?)
2050 }
2051 }
2052}
2053
2054impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
2055 type Receipt = ReceiptTy<N>;
2056
2057 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2058 self.static_file_provider.get_with_static_file_or_database(
2059 StaticFileSegment::Receipts,
2060 id,
2061 |static_file| static_file.receipt(id),
2062 || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
2063 )
2064 }
2065
2066 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2067 if let Some(id) = self.transaction_id(hash)? {
2068 self.receipt(id)
2069 } else {
2070 Ok(None)
2071 }
2072 }
2073
2074 fn receipts_by_block(
2075 &self,
2076 block: BlockHashOrNumber,
2077 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2078 if let Some(number) = self.convert_hash_or_number(block)? &&
2079 let Some(body) = self.block_body_indices(number)?
2080 {
2081 let tx_range = body.tx_num_range();
2082 return if tx_range.is_empty() {
2083 Ok(Some(Vec::new()))
2084 } else {
2085 self.receipts_by_tx_range(tx_range).map(Some)
2086 }
2087 }
2088 Ok(None)
2089 }
2090
2091 fn receipts_by_tx_range(
2092 &self,
2093 range: impl RangeBounds<TxNumber>,
2094 ) -> ProviderResult<Vec<Self::Receipt>> {
2095 self.static_file_provider.get_range_with_static_file_or_database(
2096 StaticFileSegment::Receipts,
2097 to_range(range),
2098 |static_file, range, _| static_file.receipts_by_tx_range(range),
2099 |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
2100 |_| true,
2101 )
2102 }
2103
2104 fn receipts_by_block_range(
2105 &self,
2106 block_range: RangeInclusive<BlockNumber>,
2107 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2108 if block_range.is_empty() {
2109 return Ok(Vec::new());
2110 }
2111
2112 let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
2114 let mut block_body_indices = Vec::with_capacity(range_len);
2115 for block_num in block_range {
2116 if let Some(indices) = self.block_body_indices(block_num)? {
2117 block_body_indices.push(indices);
2118 } else {
2119 block_body_indices.push(StoredBlockBodyIndices::default());
2121 }
2122 }
2123
2124 if block_body_indices.is_empty() {
2125 return Ok(Vec::new());
2126 }
2127
2128 let non_empty_blocks: Vec<_> =
2130 block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
2131
2132 if non_empty_blocks.is_empty() {
2133 return Ok(vec![Vec::new(); block_body_indices.len()]);
2135 }
2136
2137 let first_tx = non_empty_blocks[0].first_tx_num();
2139 let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
2140
2141 let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
2143 let mut receipts_iter = all_receipts.into_iter();
2144
2145 let mut result = Vec::with_capacity(block_body_indices.len());
2147 for indices in &block_body_indices {
2148 if indices.tx_count == 0 {
2149 result.push(Vec::new());
2150 } else {
2151 let block_receipts =
2152 receipts_iter.by_ref().take(indices.tx_count as usize).collect();
2153 result.push(block_receipts);
2154 }
2155 }
2156
2157 Ok(result)
2158 }
2159}
2160
2161impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
2162 for DatabaseProvider<TX, N>
2163{
2164 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2165 Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
2166 }
2167
2168 fn block_body_indices_range(
2169 &self,
2170 range: RangeInclusive<BlockNumber>,
2171 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2172 self.cursor_read_collect::<tables::BlockBodyIndices>(range)
2173 }
2174}
2175
2176impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2177 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2178 Ok(if let Some(encoded) = id.get_pre_encoded() {
2179 self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2180 } else {
2181 self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2182 })
2183 }
2184
2185 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2187 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2188 }
2189
2190 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2191 self.tx
2192 .cursor_read::<tables::StageCheckpoints>()?
2193 .walk(None)?
2194 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2195 .map_err(ProviderError::Database)
2196 }
2197}
2198
2199impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2200 fn save_stage_checkpoint(
2202 &self,
2203 id: StageId,
2204 checkpoint: StageCheckpoint,
2205 ) -> ProviderResult<()> {
2206 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2207 }
2208
2209 fn save_stage_checkpoint_progress(
2211 &self,
2212 id: StageId,
2213 checkpoint: Vec<u8>,
2214 ) -> ProviderResult<()> {
2215 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2216 }
2217
2218 #[instrument(level = "debug", target = "providers::db", skip_all)]
2219 fn update_pipeline_stages(
2220 &self,
2221 block_number: BlockNumber,
2222 drop_stage_checkpoint: bool,
2223 ) -> ProviderResult<()> {
2224 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2226 for stage_id in StageId::ALL {
2227 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2228 cursor.upsert(
2229 stage_id.to_string(),
2230 &StageCheckpoint {
2231 block_number,
2232 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2233 },
2234 )?;
2235 }
2236
2237 Ok(())
2238 }
2239}
2240
2241impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
    /// Fetches the requested storage slots for each address from current state.
    ///
    /// Depending on the storage settings this reads the hashed tables (address and slot
    /// keys run through `keccak256`) or the plain tables. Missing slots are reported
    /// with a default (zero) value rather than omitted, and returned entries always
    /// carry the caller's original, unhashed slot key.
    fn plain_state_storages(
        &self,
        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
        if self.cached_storage_settings().use_hashed_state() {
            let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;

            addresses_with_keys
                .into_iter()
                .map(|(address, storage)| {
                    let hashed_address = keccak256(address);
                    storage
                        .into_iter()
                        .map(|key| -> ProviderResult<_> {
                            let hashed_key = keccak256(key);
                            // Dup cursor may land on a later subkey; only an exact
                            // match counts, otherwise the slot reads as zero.
                            let value = hashed_storage
                                .seek_by_key_subkey(hashed_address, hashed_key)?
                                .filter(|v| v.key == hashed_key)
                                .map(|v| v.value)
                                .unwrap_or_default();
                            // Report under the caller's original (unhashed) key.
                            Ok(StorageEntry { key, value })
                        })
                        .collect::<ProviderResult<Vec<_>>>()
                        .map(|storage| (address, storage))
                })
                .collect::<ProviderResult<Vec<(_, _)>>>()
        } else {
            let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;

            addresses_with_keys
                .into_iter()
                .map(|(address, storage)| {
                    storage
                        .into_iter()
                        .map(|key| -> ProviderResult<_> {
                            // Missing slots surface as zero-valued entries.
                            Ok(plain_storage
                                .seek_by_key_subkey(address, key)?
                                .filter(|v| v.key == key)
                                .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
                        })
                        .collect::<ProviderResult<Vec<_>>>()
                        .map(|storage| (address, storage))
                })
                .collect::<ProviderResult<Vec<(_, _)>>>()
        }
    }
2288
2289 fn changed_storages_with_range(
2290 &self,
2291 range: RangeInclusive<BlockNumber>,
2292 ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2293 if self.cached_storage_settings().storage_v2 {
2294 self.storage_changesets_range(range)?.into_iter().try_fold(
2295 BTreeMap::new(),
2296 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2297 let (BlockNumberAddress((_, address)), storage_entry) = entry;
2298 accounts.entry(address).or_default().insert(storage_entry.key.as_b256());
2299 Ok(accounts)
2300 },
2301 )
2302 } else {
2303 self.tx
2304 .cursor_read::<tables::StorageChangeSets>()?
2305 .walk_range(BlockNumberAddress::range(range))?
2306 .try_fold(
2309 BTreeMap::new(),
2310 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2311 let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2312 accounts.entry(address).or_default().insert(storage_entry.key);
2313 Ok(accounts)
2314 },
2315 )
2316 }
2317 }
2318
2319 fn changed_storages_and_blocks_with_range(
2320 &self,
2321 range: RangeInclusive<BlockNumber>,
2322 ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2323 if self.cached_storage_settings().storage_v2 {
2324 self.storage_changesets_range(range)?.into_iter().try_fold(
2325 BTreeMap::new(),
2326 |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2327 storages
2328 .entry((index.address(), storage.key.as_b256()))
2329 .or_default()
2330 .push(index.block_number());
2331 Ok(storages)
2332 },
2333 )
2334 } else {
2335 let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2336
2337 let storage_changeset_lists =
2338 changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2339 BTreeMap::new(),
2340 |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2341 entry|
2342 -> ProviderResult<_> {
2343 let (index, storage) = entry?;
2344 storages
2345 .entry((index.address(), storage.key))
2346 .or_default()
2347 .push(index.block_number());
2348 Ok(storages)
2349 },
2350 )?;
2351
2352 Ok(storage_changeset_lists)
2353 }
2354 }
2355}
2356
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
    for DatabaseProvider<TX, N>
{
    type Receipt = ReceiptTy<N>;

    /// Writes an execution outcome to storage: state reverts, plain-state changes and,
    /// unless disabled via `config`, receipts (subject to the configured prune modes).
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_state<'a>(
        &self,
        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
        is_value_known: OriginalValuesKnown,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        let execution_outcome = execution_outcome.into();
        let first_block = execution_outcome.first_block();

        let (plain_state, reverts) =
            execution_outcome.state().to_plain_state_and_reverts(is_value_known);

        self.write_state_reverts(reverts, first_block, config)?;
        self.write_state_changes(plain_state)?;

        // Everything below deals only with receipts.
        if !config.write_receipts {
            return Ok(());
        }

        let block_count = execution_outcome.len() as u64;
        let last_block = execution_outcome.last_block();
        let block_range = first_block..=last_block;

        // Tip used for pruning decisions: never behind the blocks being written.
        let tip = self.last_block_number()?.max(last_block);

        // First tx number of each block in the range; used to compute receipt indices.
        let block_indices: Vec<_> = self
            .block_body_indices_range(block_range)?
            .into_iter()
            .map(|b| b.first_tx_num)
            .collect();

        // A missing body index means receipts cannot be mapped to tx numbers for that block.
        if block_indices.len() < block_count as usize {
            let missing_blocks = block_count - block_indices.len() as u64;
            return Err(ProviderError::BlockBodyIndicesNotFound(
                last_block.saturating_sub(missing_blocks - 1),
            ));
        }

        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;

        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;

        // Receipts may only be pruned when they are written to the database (or no receipt
        // static files exist yet) AND the blocks are beyond the minimum pruning distance.
        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
            self.static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none()) &&
            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);

        // Addresses whose logs must be kept, seeded from filters active before `first_block`.
        let mut allowed_addresses: AddressSet = AddressSet::default();
        for (_, addresses) in contract_log_pruner.range(..first_block) {
            allowed_addresses.extend(addresses.iter().copied());
        }

        for (idx, (receipts, first_tx_index)) in
            execution_outcome.receipts().zip(block_indices).enumerate()
        {
            let block_number = first_block + idx as u64;

            // Advance the writer even for blocks whose receipts end up pruned.
            receipts_writer.increment_block(block_number)?;

            // Skip the whole block if the receipts prune mode says so.
            if prunable_receipts &&
                self.prune_modes
                    .receipts
                    .is_some_and(|mode| mode.should_prune(block_number, tip))
            {
                continue
            }

            // Fold in filter addresses that become active at this block.
            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
                allowed_addresses.extend(new_addresses.iter().copied());
            }

            for (idx, receipt) in receipts.iter().enumerate() {
                let receipt_idx = first_tx_index + idx as u64;
                // With a log filter active, drop receipts with no log from an allowed address.
                if prunable_receipts &&
                    has_contract_log_filter &&
                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
                {
                    continue
                }

                receipts_writer.append_receipt(receipt_idx, receipt)?;
            }
        }

        Ok(())
    }

    /// Writes storage and account changesets (reverts) for consecutive blocks starting at
    /// `first_block`, as enabled by `config`.
    fn write_state_reverts(
        &self,
        reverts: PlainStateReverts,
        first_block: BlockNumber,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        let use_hashed_state = self.cached_storage_settings().use_hashed_state();

        if config.write_storage_changesets {
            tracing::trace!("Writing storage changes");
            let mut storages_cursor =
                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
            let mut hashed_storages_cursor =
                self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
            // `reverts.storage` holds one entry per block, in order from `first_block`.
            for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
                let block_number = first_block + block_index as BlockNumber;

                tracing::trace!(block_number, "Writing block change");
                // Sort by address so the changeset is emitted in key order.
                storage_changes.par_sort_unstable_by_key(|a| a.address);
                let total_changes =
                    storage_changes.iter().map(|change| change.storage_revert.len()).sum();
                let mut changeset = Vec::with_capacity(total_changes);
                for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
                    // Convert slot keys to their changeset representation (hashed or plain).
                    let mut storage = storage_revert
                        .into_iter()
                        .map(|(k, v)| {
                            (StorageSlotKey::from_u256(k).to_changeset_key(use_hashed_state), v)
                        })
                        .collect::<Vec<_>>();
                    storage.par_sort_unstable_by_key(|a| a.0);

                    // If the account's storage was wiped, snapshot its full current storage
                    // from state so the wipe can be reverted from the changeset.
                    let mut wiped_storage = Vec::new();
                    if wiped {
                        tracing::trace!(?address, "Wiping storage");
                        if use_hashed_state {
                            let hashed_address = keccak256(address);
                            if let Some((_, entry)) =
                                hashed_storages_cursor.seek_exact(hashed_address)?
                            {
                                wiped_storage.push((entry.key, entry.value));
                                while let Some(entry) = hashed_storages_cursor.next_dup_val()? {
                                    wiped_storage.push((entry.key, entry.value))
                                }
                            }
                        } else if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
                            wiped_storage.push((entry.key, entry.value));
                            while let Some(entry) = storages_cursor.next_dup_val()? {
                                wiped_storage.push((entry.key, entry.value))
                            }
                        }
                    }

                    tracing::trace!(?address, ?storage, "Writing storage reverts");
                    // Merge explicit reverts with the wiped-storage snapshot.
                    for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
                        changeset.push(StorageBeforeTx { address, key, value });
                    }
                }

                let mut storage_changesets_writer =
                    EitherWriter::new_storage_changesets(self, block_number)?;
                storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
            }
        }

        if !config.write_account_changesets {
            return Ok(());
        }

        tracing::trace!(?first_block, "Writing account changes");
        // `reverts.accounts` holds one entry per block, in order from `first_block`.
        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;
            let changeset = account_block_reverts
                .into_iter()
                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
                .collect::<Vec<_>>();
            let mut account_changesets_writer =
                EitherWriter::new_account_changesets(self, block_number)?;

            account_changesets_writer.append_account_changeset(block_number, changeset)?;
        }

        Ok(())
    }

    /// Applies a plain state changeset: accounts, storage slots and contract bytecodes.
    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
        // Sort all change lists so writes happen in key order.
        changes.accounts.par_sort_by_key(|a| a.0);
        changes.storage.par_sort_by_key(|a| a.address);
        changes.contracts.par_sort_by_key(|a| a.0);

        // Plain-state tables are only maintained when hashed state is not canonical.
        if !self.cached_storage_settings().use_hashed_state() {
            tracing::trace!(len = changes.accounts.len(), "Writing new account state");
            let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
            for (address, account) in changes.accounts {
                if let Some(account) = account {
                    tracing::trace!(?address, "Updating plain state account");
                    accounts_cursor.upsert(address, &account.into())?;
                } else if accounts_cursor.seek_exact(address)?.is_some() {
                    tracing::trace!(?address, "Deleting plain state account");
                    accounts_cursor.delete_current()?;
                }
            }

            tracing::trace!(len = changes.storage.len(), "Writing new storage state");
            let mut storages_cursor =
                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
            for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
                // A wipe removes every duplicate entry for the address before rewriting.
                if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
                    storages_cursor.delete_current_duplicates()?;
                }
                let mut storage = storage
                    .into_iter()
                    .map(|(k, value)| StorageEntry { key: k.into(), value })
                    .collect::<Vec<_>>();
                storage.par_sort_unstable_by_key(|a| a.key);

                for entry in storage {
                    tracing::trace!(?address, ?entry.key, "Updating plain state storage");
                    // Replace any existing entry for the slot; zero values are not stored.
                    if let Some(db_entry) =
                        storages_cursor.seek_by_key_subkey(address, entry.key)? &&
                        db_entry.key == entry.key
                    {
                        storages_cursor.delete_current()?;
                    }

                    if !entry.value.is_zero() {
                        storages_cursor.upsert(address, &entry)?;
                    }
                }
            }
        }

        // Bytecodes are written regardless of the state representation.
        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
        for (hash, bytecode) in changes.contracts {
            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
        }

        Ok(())
    }

    /// Writes a sorted hashed post-state to the hashed account and storage tables.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
        // Accounts: upsert live accounts, delete destroyed ones.
        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in hashed_state.accounts() {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }

        // Storages: process addresses in sorted order for sequential cursor access.
        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
        let mut hashed_storage_cursor =
            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
        for (hashed_address, storage) in sorted_storages {
            // A wiped storage clears all existing entries before new values are written.
            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_storage_cursor.delete_current_duplicates()?;
            }

            for (hashed_slot, value) in storage.storage_slots_ref() {
                let entry = StorageEntry { key: *hashed_slot, value: *value };

                // Replace any existing entry for the slot; zero values are not stored.
                if let Some(db_entry) =
                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
                    db_entry.key == entry.key
                {
                    hashed_storage_cursor.delete_current()?;
                }

                if !entry.value.is_zero() {
                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
                }
            }
        }

        Ok(())
    }

    /// Unwinds state to `block` by replaying changesets in reverse, discarding the reverted
    /// data. Receipts and changesets above `block` are removed as well.
    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
        let range = block + 1..=self.last_block_number()?;

        if range.is_empty() {
            return Ok(());
        }

        let block_bodies = self.block_body_indices_range(range.clone())?;

        // First tx of the first removed block; receipts from here on are deleted below.
        let from_transaction_num =
            block_bodies.first().expect("already checked if there are blocks").first_tx_num();

        let storage_range = BlockNumberAddress::range(range.clone());
        // v2 reads changesets from static files and prunes them there; the legacy path
        // takes (reads and removes) them from the database tables.
        let storage_changeset = if self.cached_storage_settings().storage_v2 {
            let changesets = self.storage_changesets_range(range.clone())?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
            changeset_writer.prune_storage_changesets(block)?;
            changesets
        } else {
            self.take::<tables::StorageChangeSets>(storage_range)?
                .into_iter()
                .map(|(k, v)| {
                    (k, ChangesetEntry { key: StorageSlotKey::plain(v.key), value: v.value })
                })
                .collect()
        };
        let account_changeset = if self.cached_storage_settings().storage_v2 {
            let changesets = self.account_changesets_range(range)?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
            changeset_writer.prune_account_changesets(block)?;
            changesets
        } else {
            self.take::<tables::AccountChangeSets>(range)?
        };

        if self.cached_storage_settings().use_hashed_state() {
            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;

            // Reconstruct old/new account and storage values from the changesets.
            let (state, _) = self.populate_bundle_state_hashed(
                account_changeset,
                storage_changeset,
                &mut hashed_accounts_cursor,
                &mut hashed_storage_cursor,
            )?;

            for (address, (old_account, new_account, storage)) in &state {
                // Restore the account's old value, or delete it if it did not exist before.
                if old_account != new_account {
                    let hashed_address = keccak256(address);
                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
                    if let Some(account) = old_account {
                        hashed_accounts_cursor.upsert(hashed_address, account)?;
                    } else if existing_entry.is_some() {
                        hashed_accounts_cursor.delete_current()?;
                    }
                }

                // Restore each slot's old value; zero values are not stored.
                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                    let hashed_address = keccak256(address);
                    let storage_entry =
                        StorageEntry { key: *storage_key, value: *old_storage_value };
                    if hashed_storage_cursor
                        .seek_by_key_subkey(hashed_address, *storage_key)?
                        .filter(|s| s.key == *storage_key)
                        .is_some()
                    {
                        hashed_storage_cursor.delete_current()?
                    }

                    if !old_storage_value.is_zero() {
                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
                    }
                }
            }
        } else {
            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
            let mut plain_storage_cursor =
                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;

            let (state, _) = self.populate_bundle_state(
                account_changeset,
                storage_changeset,
                &mut plain_accounts_cursor,
                &mut plain_storage_cursor,
            )?;

            for (address, (old_account, new_account, storage)) in &state {
                if old_account != new_account {
                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
                    if let Some(account) = old_account {
                        plain_accounts_cursor.upsert(*address, account)?;
                    } else if existing_entry.is_some() {
                        plain_accounts_cursor.delete_current()?;
                    }
                }

                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                    let storage_entry =
                        StorageEntry { key: *storage_key, value: *old_storage_value };
                    if plain_storage_cursor
                        .seek_by_key_subkey(*address, *storage_key)?
                        .filter(|s| s.key == *storage_key)
                        .is_some()
                    {
                        plain_storage_cursor.delete_current()?
                    }

                    if !old_storage_value.is_zero() {
                        plain_storage_cursor.upsert(*address, &storage_entry)?;
                    }
                }
            }
        }

        self.remove_receipts_from(from_transaction_num, block)?;

        Ok(())
    }

    /// Like [`Self::remove_state_above`], but returns the removed state and receipts as an
    /// [`ExecutionOutcome`] so the caller can inspect or re-apply them.
    fn take_state_above(
        &self,
        block: BlockNumber,
    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
        let range = block + 1..=self.last_block_number()?;

        if range.is_empty() {
            return Ok(ExecutionOutcome::default())
        }
        let start_block_number = *range.start();

        let block_bodies = self.block_body_indices_range(range.clone())?;

        // Transaction bounds of the removed blocks; used to collect their receipts below.
        let from_transaction_num =
            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
        let to_transaction_num =
            block_bodies.last().expect("already checked if there are blocks").last_tx_num();

        let storage_range = BlockNumberAddress::range(range.clone());
        // v2 with populated static files reads changesets from there (then prunes them);
        // otherwise they are taken from the database tables.
        let storage_changeset = if let Some(highest_block) = self
            .static_file_provider
            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
            self.cached_storage_settings().storage_v2
        {
            let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
            changeset_writer.prune_storage_changesets(block)?;
            changesets
        } else {
            self.take::<tables::StorageChangeSets>(storage_range)?
                .into_iter()
                .map(|(k, v)| {
                    (k, ChangesetEntry { key: StorageSlotKey::plain(v.key), value: v.value })
                })
                .collect()
        };

        let highest_changeset_block = self
            .static_file_provider
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
        let account_changeset = if let Some(highest_block) = highest_changeset_block &&
            self.cached_storage_settings().storage_v2
        {
            let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
            changeset_writer.prune_account_changesets(block)?;

            changesets
        } else {
            self.take::<tables::AccountChangeSets>(range)?
        };

        // Roll state back to `block`, collecting the bundle state and reverts on the way.
        let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;

            let (state, reverts) = self.populate_bundle_state_hashed(
                account_changeset,
                storage_changeset,
                &mut hashed_accounts_cursor,
                &mut hashed_storage_cursor,
            )?;

            for (address, (old_account, new_account, storage)) in &state {
                // Restore the account's old value, or delete it if it did not exist before.
                if old_account != new_account {
                    let hashed_address = keccak256(address);
                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
                    if let Some(account) = old_account {
                        hashed_accounts_cursor.upsert(hashed_address, account)?;
                    } else if existing_entry.is_some() {
                        hashed_accounts_cursor.delete_current()?;
                    }
                }

                // Restore each slot's old value; zero values are not stored.
                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                    let hashed_address = keccak256(address);
                    let storage_entry =
                        StorageEntry { key: *storage_key, value: *old_storage_value };
                    if hashed_storage_cursor
                        .seek_by_key_subkey(hashed_address, *storage_key)?
                        .filter(|s| s.key == *storage_key)
                        .is_some()
                    {
                        hashed_storage_cursor.delete_current()?
                    }

                    if !old_storage_value.is_zero() {
                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
                    }
                }
            }

            (state, reverts)
        } else {
            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
            let mut plain_storage_cursor =
                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;

            let (state, reverts) = self.populate_bundle_state(
                account_changeset,
                storage_changeset,
                &mut plain_accounts_cursor,
                &mut plain_storage_cursor,
            )?;

            for (address, (old_account, new_account, storage)) in &state {
                if old_account != new_account {
                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
                    if let Some(account) = old_account {
                        plain_accounts_cursor.upsert(*address, account)?;
                    } else if existing_entry.is_some() {
                        plain_accounts_cursor.delete_current()?;
                    }
                }

                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                    let storage_entry =
                        StorageEntry { key: *storage_key, value: *old_storage_value };
                    if plain_storage_cursor
                        .seek_by_key_subkey(*address, *storage_key)?
                        .filter(|s| s.key == *storage_key)
                        .is_some()
                    {
                        plain_storage_cursor.delete_current()?
                    }

                    if !old_storage_value.is_zero() {
                        plain_storage_cursor.upsert(*address, &storage_entry)?;
                    }
                }
            }

            (state, reverts)
        };

        // Receipts may live in static files or the database; stitch the range together as
        // (tx_number, receipt) pairs.
        let mut receipts_iter = self
            .static_file_provider
            .get_range_with_static_file_or_database(
                StaticFileSegment::Receipts,
                from_transaction_num..to_transaction_num + 1,
                |static_file, range, _| {
                    static_file
                        .receipts_by_tx_range(range.clone())
                        .map(|r| range.into_iter().zip(r).collect())
                },
                |range, _| {
                    self.tx
                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
                        .walk_range(range)?
                        .map(|r| r.map_err(Into::into))
                        .collect()
                },
                |_| true,
            )?
            .into_iter()
            .peekable();

        // Group receipts per block; gaps (e.g. pruned receipts) simply yield shorter vecs.
        let mut receipts = Vec::with_capacity(block_bodies.len());
        for block_body in block_bodies {
            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
            for num in block_body.tx_num_range() {
                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
                    block_receipts.push(receipts_iter.next().unwrap().1);
                }
            }
            receipts.push(block_receipts);
        }

        self.remove_receipts_from(from_transaction_num, block)?;

        Ok(ExecutionOutcome::new_init(
            state,
            reverts,
            Vec::new(),
            receipts,
            start_block_number,
            Vec::new(),
        ))
    }
}
3039
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
    /// Applies sorted account- and storage-trie node updates to the database, returning the
    /// number of entries processed (upserts plus removals).
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
        if trie_updates.is_empty() {
            return Ok(0)
        }

        let mut num_entries = 0;

        let tx = self.tx_ref();
        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;

        for (key, updated_node) in trie_updates.account_nodes_ref() {
            let nibbles = StoredNibbles(*key);
            match updated_node {
                Some(node) => {
                    // Nodes with an empty path are not stored in the trie table.
                    if !nibbles.0.is_empty() {
                        num_entries += 1;
                        account_trie_cursor.upsert(nibbles, node)?;
                    }
                }
                None => {
                    // `None` marks a removed node; counted even if it is already absent.
                    num_entries += 1;
                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
                        account_trie_cursor.delete_current()?;
                    }
                }
            }
        }

        // Storage trie updates are applied per hashed address by the storage-trie writer.
        num_entries +=
            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;

        Ok(num_entries)
    }
}
3081
3082impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3083 fn write_storage_trie_updates_sorted<'a>(
3089 &self,
3090 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3091 ) -> ProviderResult<usize> {
3092 let mut num_entries = 0;
3093 let mut storage_tries = storage_tries.collect::<Vec<_>>();
3094 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3095 let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
3096 for (hashed_address, storage_trie_updates) in storage_tries {
3097 let mut db_storage_trie_cursor =
3098 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
3099 num_entries +=
3100 db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
3101 cursor = db_storage_trie_cursor.cursor;
3102 }
3103
3104 Ok(num_entries)
3105 }
3106}
3107
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
    /// Reverts hashed-account state using the given changesets, restoring each account to
    /// its pre-change value. Returns the hashed addresses and the values written.
    fn unwind_account_hashing<'a>(
        &self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        // Collect in reverse so that, for duplicate addresses, the earliest changeset entry
        // overwrites later ones in the map (its pre-state value wins).
        let hashed_accounts = changesets
            .into_iter()
            .map(|(_, e)| (keccak256(e.address), e.info))
            .collect::<Vec<_>>()
            .into_iter()
            .rev()
            .collect::<BTreeMap<_, _>>();

        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in &hashed_accounts {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                // The account did not exist before the change: remove it.
                hashed_accounts_cursor.delete_current()?;
            }
        }

        Ok(hashed_accounts)
    }

    /// Unwinds account hashing for all account changesets within `range`.
    fn unwind_account_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        let changesets = self.account_changesets_range(range)?;
        self.unwind_account_hashing(changesets.iter())
    }

    /// Hashes the given accounts and writes them to `HashedAccounts`, deleting entries for
    /// `None` accounts. Returns the hashed addresses and values written.
    fn insert_account_for_hashing(
        &self,
        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
        let hashed_accounts =
            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
        for (hashed_address, account) in &hashed_accounts {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }
        Ok(hashed_accounts)
    }

    /// Reverts hashed storage using the given changesets, restoring each slot to its
    /// pre-change value. Returns the hashed slots touched, grouped by hashed address.
    fn unwind_storage_hashing(
        &self,
        changesets: impl Iterator<Item = (BlockNumberAddress, ChangesetEntry)>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        // Hash addresses/slots, then stable-sort by (address, slot); the reverse pass below
        // then applies the earliest input entry for each slot last, so its value wins.
        let mut hashed_storages = changesets
            .into_iter()
            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
                let hashed_key = storage_entry.key.to_hashed();
                (keccak256(address), hashed_key, storage_entry.value)
            })
            .collect::<Vec<_>>();
        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));

        let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
            B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
            hashed_storage_keys.entry(hashed_address).or_default().insert(key);

            // Replace any existing entry for the slot; zero values are not stored.
            if hashed_storage
                .seek_by_key_subkey(hashed_address, key)?
                .filter(|entry| entry.key == key)
                .is_some()
            {
                hashed_storage.delete_current()?;
            }

            if !value.is_zero() {
                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
            }
        }
        Ok(hashed_storage_keys)
    }

    /// Unwinds storage hashing for all storage changesets within `range`.
    fn unwind_storage_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        let changesets = self.storage_changesets_range(range)?;
        self.unwind_storage_hashing(changesets.into_iter())
    }

    /// Hashes the given storages and writes them to `HashedStorages`. Returns the hashed
    /// slots written, grouped by hashed address.
    fn insert_storage_for_hashing(
        &self,
        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        // Deduplicate and order via nested BTreeMaps; note a repeated address replaces the
        // previously collected storage for that address wholesale.
        let hashed_storages =
            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
                    map.insert(keccak256(entry.key), entry.value);
                    map
                });
                map.insert(keccak256(address), storage);
                map
            });

        let hashed_storage_keys = hashed_storages
            .iter()
            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
            .collect();

        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
                // Replace any existing entry for the slot; zero values are not stored.
                if hashed_storage_cursor
                    .seek_by_key_subkey(hashed_address, key)?
                    .filter(|entry| entry.key == key)
                    .is_some()
                {
                    hashed_storage_cursor.delete_current()?;
                }

                if !value.is_zero() {
                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
                }
                Ok(())
            })
        })?;

        Ok(hashed_storage_keys)
    }
}
3249
3250impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
    /// Unwinds the account history index for the given account changesets, returning the
    /// number of changesets processed.
    fn unwind_account_history_indices<'a>(
        &self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
    ) -> ProviderResult<usize> {
        // Pair each touched address with the block number to unwind from, sorted by address
        // so shard lookups proceed in key order.
        let mut last_indices = changesets
            .into_iter()
            .map(|(index, account)| (account.address, *index))
            .collect::<Vec<_>>();
        last_indices.sort_unstable_by_key(|(a, _)| *a);

        if self.cached_storage_settings().storage_v2 {
            // v2 keeps history indices in rocksdb; queue the unwind batch for later commit.
            #[cfg(all(unix, feature = "rocksdb"))]
            {
                let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
                self.pending_rocksdb_batches.lock().push(batch);
            }
        } else {
            let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
            for &(address, rem_index) in &last_indices {
                // Truncate the shard chain for this address at `rem_index`;
                // `unwind_history_shards` returns the block numbers that survive.
                let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
                    &mut cursor,
                    ShardedKey::last(address),
                    rem_index,
                    |sharded_key| sharded_key.key == address,
                )?;

                // Reinsert the surviving prefix as the new "last" shard, if anything remains.
                if !partial_shard.is_empty() {
                    cursor.insert(
                        ShardedKey::last(address),
                        &BlockNumberList::new_pre_sorted(partial_shard),
                    )?;
                }
            }
        }

        let changesets = last_indices.len();
        Ok(changesets)
    }
3292
3293 fn unwind_account_history_indices_range(
3294 &self,
3295 range: impl RangeBounds<BlockNumber>,
3296 ) -> ProviderResult<usize> {
3297 let changesets = self.account_changesets_range(range)?;
3298 self.unwind_account_history_indices(changesets.iter())
3299 }
3300
3301 fn insert_account_history_index(
3302 &self,
3303 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3304 ) -> ProviderResult<()> {
3305 self.append_history_index::<_, tables::AccountsHistory>(
3306 account_transitions,
3307 ShardedKey::new,
3308 )
3309 }
3310
    /// Unwinds the storage history index for the given storage changesets, returning the
    /// number of changesets processed.
    fn unwind_storage_history_indices(
        &self,
        changesets: impl Iterator<Item = (BlockNumberAddress, ChangesetEntry)>,
    ) -> ProviderResult<usize> {
        // Flatten to (address, slot, block) and sort by (address, slot) so shard lookups
        // proceed in key order.
        let mut storage_changesets = changesets
            .into_iter()
            .map(|(BlockNumberAddress((bn, address)), storage)| {
                (address, storage.key.as_b256(), bn)
            })
            .collect::<Vec<_>>();
        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));

        if self.cached_storage_settings().storage_v2 {
            // v2 keeps history indices in rocksdb; queue the unwind batch for later commit.
            #[cfg(all(unix, feature = "rocksdb"))]
            {
                let batch =
                    self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
                self.pending_rocksdb_batches.lock().push(batch);
            }
        } else {
            let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
            for &(address, storage_key, rem_index) in &storage_changesets {
                // Truncate the shard chain for this (address, slot) at `rem_index`;
                // `unwind_history_shards` returns the block numbers that survive.
                let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
                    &mut cursor,
                    StorageShardedKey::last(address, storage_key),
                    rem_index,
                    |storage_sharded_key| {
                        storage_sharded_key.address == address &&
                            storage_sharded_key.sharded_key.key == storage_key
                    },
                )?;

                // Reinsert the surviving prefix as the new "last" shard, if anything remains.
                if !partial_shard.is_empty() {
                    cursor.insert(
                        StorageShardedKey::last(address, storage_key),
                        &BlockNumberList::new_pre_sorted(partial_shard),
                    )?;
                }
            }
        }

        let changesets = storage_changesets.len();
        Ok(changesets)
    }
3358
3359 fn unwind_storage_history_indices_range(
3360 &self,
3361 range: impl RangeBounds<BlockNumber>,
3362 ) -> ProviderResult<usize> {
3363 let changesets = self.storage_changesets_range(range)?;
3364 self.unwind_storage_history_indices(changesets.into_iter())
3365 }
3366
3367 fn insert_storage_history_index(
3368 &self,
3369 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3370 ) -> ProviderResult<()> {
3371 self.append_history_index::<_, tables::StoragesHistory>(
3372 storage_transitions,
3373 |(address, storage_key), highest_block_number| {
3374 StorageShardedKey::new(address, storage_key, highest_block_number)
3375 },
3376 )
3377 }
3378
3379 #[instrument(level = "debug", target = "providers::db", skip_all)]
3380 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3381 let storage_settings = self.cached_storage_settings();
3382 if !storage_settings.storage_v2 {
3383 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3384 self.insert_account_history_index(indices)?;
3385 }
3386
3387 if !storage_settings.storage_v2 {
3388 let indices = self.changed_storages_and_blocks_with_range(range)?;
3389 self.insert_storage_history_index(indices)?;
3390 }
3391
3392 Ok(())
3393 }
3394}
3395
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
    for DatabaseProvider<TX, N>
{
    /// Removes all blocks above `block` and returns them, together with their
    /// execution state, as a [`Chain`].
    ///
    /// Ordering matters: trie state is unwound first, then plain state is taken,
    /// the blocks are read back *before* they are deleted, and finally the
    /// pipeline stage checkpoints are rolled back.
    fn take_block_and_execution_above(
        &self,
        block: BlockNumber,
    ) -> ProviderResult<Chain<Self::Primitives>> {
        let range = block + 1..=self.last_block_number()?;

        self.unwind_trie_state_from(block + 1)?;

        let execution_state = self.take_state_above(block)?;

        // Read the blocks while they still exist.
        let blocks = self.recovered_block_range(range)?;

        self.remove_blocks_above(block)?;

        self.update_pipeline_stages(block, true)?;

        Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
    }

    /// Same as [`Self::take_block_and_execution_above`], but discards the
    /// removed blocks and state instead of returning them.
    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
        self.unwind_trie_state_from(block + 1)?;

        self.remove_state_above(block)?;

        self.remove_blocks_above(block)?;

        self.update_pipeline_stages(block, true)?;

        Ok(())
    }
}
3438
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
    for DatabaseProvider<TX, N>
{
    type Block = BlockTy<N>;
    type Receipt = ReceiptTy<N>;

    /// Inserts a single recovered block (no execution output) and returns the
    /// body indices written for it.
    ///
    /// Returns [`ProviderError::BlockBodyIndicesNotFound`] if `save_blocks` did
    /// not produce indices for the block.
    fn insert_block(
        &self,
        block: &RecoveredBlock<Self::Block>,
    ) -> ProviderResult<StoredBlockBodyIndices> {
        let block_number = block.number();

        // Wrap the block in an `ExecutedBlock` with an empty execution output and
        // empty trie data so the common `save_blocks` path can be reused; only
        // block data is persisted (`SaveBlocksMode::BlocksOnly`).
        let executed_block = ExecutedBlock::new(
            Arc::new(block.clone()),
            Arc::new(BlockExecutionOutput {
                result: BlockExecutionResult {
                    receipts: Default::default(),
                    requests: Default::default(),
                    gas_used: 0,
                    blob_gas_used: 0,
                },
                state: Default::default(),
            }),
            ComputedTrieData::default(),
        );

        self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;

        // `save_blocks` is expected to have written the body indices just above.
        self.block_body_indices(block_number)?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
    }

    /// Appends block bodies (transactions and per-block indices) starting at the
    /// first block number in `bodies`.
    ///
    /// `None` bodies still advance the static-file block marker and get a
    /// zero-transaction index entry.
    fn append_block_bodies(
        &self,
        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
    ) -> ProviderResult<()> {
        // Nothing to do for an empty batch.
        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };

        // Transactions themselves are written to static files.
        let mut tx_writer =
            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;

        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;

        // Continue the global transaction numbering after the last known tx.
        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();

        for (block_number, body) in &bodies {
            // Advance the static-file block marker even for missing bodies.
            tx_writer.increment_block(*block_number)?;

            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };

            // NOTE(review): the duration recorder is created per block iteration —
            // confirm this is the intended granularity for these metrics.
            let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

            block_indices_cursor.append(*block_number, &block_indices)?;

            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);

            let Some(body) = body else { continue };

            // Map the block's last tx number to the block for reverse lookups.
            if !body.transactions().is_empty() {
                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
            }

            for transaction in body.transactions() {
                tx_writer.append_transaction(next_tx_num, transaction)?;

                next_tx_num += 1;
            }
        }

        // Let the chain-specific storage writer persist any additional body data.
        self.storage.writer().write_block_bodies(self, bodies)?;

        Ok(())
    }

    /// Removes all blocks above `block`: hash→number mappings, headers static
    /// files, transaction hash indices, senders and finally the bodies.
    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
        let last_block_number = self.last_block_number()?;
        // Drop the hash -> number mapping for every unwound block.
        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
        }

        let highest_static_file_block = self
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .expect("todo: error handling, headers should exist");

        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
        self.static_file_provider()
            .get_writer(block, StaticFileSegment::Headers)?
            .prune_headers(highest_static_file_block.saturating_sub(block))?;

        // First transaction belonging to the unwound region.
        let unwind_tx_from = self
            .block_body_indices(block)?
            .map(|b| b.next_tx_num())
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;

        // Last transaction currently stored.
        let unwind_tx_to = self
            .tx
            .cursor_read::<tables::BlockBodyIndices>()?
            .last()?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
            .1
            .last_tx_num();

        // Delete hash -> tx-number entries for all unwound transactions; the
        // writer routes to MDBX or a staged RocksDB batch per storage settings.
        if unwind_tx_from <= unwind_tx_to {
            let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
            self.with_rocksdb_batch(|batch| {
                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
                for (hash, _) in hashes {
                    writer.delete_transaction_hash_number(hash)?;
                }
                Ok(((), writer.into_raw_rocksdb_batch()))
            })?;
        }

        EitherWriter::new_senders(self, last_block_number)?.prune_senders(unwind_tx_from, block)?;

        self.remove_bodies_above(block)?;

        Ok(())
    }

    /// Removes block bodies above `block`: chain-specific body data, body
    /// indices, the tx→block mapping and the transactions static files.
    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
        self.storage.writer().remove_block_bodies_above(self, block)?;

        // First transaction belonging to the unwound region.
        let unwind_tx_from = self
            .block_body_indices(block)?
            .map(|b| b.next_tx_num())
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;

        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;

        // Prune the transactions static files back down to `unwind_tx_from`.
        let static_file_tx_num =
            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);

        let to_delete = static_file_tx_num
            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
            .unwrap_or_default();

        self.static_file_provider
            .latest_writer(StaticFileSegment::Transactions)?
            .prune_transactions(to_delete, block)?;

        Ok(())
    }

    /// Appends a batch of blocks with their execution state, hashed state and
    /// history indices, then advances the pipeline stage checkpoints.
    fn append_blocks_with_state(
        &self,
        blocks: Vec<RecoveredBlock<Self::Block>>,
        execution_outcome: &ExecutionOutcome<Self::Receipt>,
        hashed_state: HashedPostStateSorted,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to append empty block range");
            return Ok(())
        }

        let first_number = blocks[0].number();

        let last_block_number = blocks[blocks.len() - 1].number();

        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

        // Derive per-address (and per-slot) lists of touched block numbers from
        // the bundle reverts; reverts are ordered per block from `first_number`.
        let use_hashed = self.cached_storage_settings().use_hashed_state();
        let (account_transitions, storage_transitions) = {
            let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
            let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
            for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
                let block_number = first_number + block_idx as u64;
                for (address, account_revert) in block_reverts {
                    account_transitions.entry(*address).or_default().push(block_number);
                    for storage_key in account_revert.storage.keys() {
                        // Changeset keys are stored hashed or plain depending on
                        // the configured hashed-state mode.
                        let key =
                            StorageSlotKey::from_u256(*storage_key).to_changeset_key(use_hashed);
                        storage_transitions.entry((*address, key)).or_default().push(block_number);
                    }
                }
            }
            (account_transitions, storage_transitions)
        };

        for block in blocks {
            self.insert_block(&block)?;
            durations_recorder.record_relative(metrics::Action::InsertBlock);
        }

        self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
        durations_recorder.record_relative(metrics::Action::InsertState);

        self.write_hashed_state(&hashed_state)?;
        durations_recorder.record_relative(metrics::Action::InsertHashes);

        // History indices: RocksDB shards on storage v2 (no-op if the rocksdb
        // feature is disabled), MDBX shard tables on v1.
        let storage_settings = self.cached_storage_settings();
        if storage_settings.storage_v2 {
            #[cfg(all(unix, feature = "rocksdb"))]
            self.with_rocksdb_batch(|mut batch| {
                for (address, blocks) in account_transitions {
                    batch.append_account_history_shard(address, blocks)?;
                }
                Ok(((), Some(batch.into_inner())))
            })?;
        } else {
            self.insert_account_history_index(account_transitions)?;
        }
        if storage_settings.storage_v2 {
            #[cfg(all(unix, feature = "rocksdb"))]
            self.with_rocksdb_batch(|mut batch| {
                for ((address, key), blocks) in storage_transitions {
                    batch.append_storage_history_shard(address, key, blocks)?;
                }
                Ok(((), Some(batch.into_inner())))
            })?;
        } else {
            self.insert_storage_history_index(storage_transitions)?;
        }
        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);

        self.update_pipeline_stages(last_block_number, false)?;
        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);

        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");

        Ok(())
    }
}
3715
3716impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3717 fn get_prune_checkpoint(
3718 &self,
3719 segment: PruneSegment,
3720 ) -> ProviderResult<Option<PruneCheckpoint>> {
3721 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3722 }
3723
3724 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3725 Ok(PruneSegment::variants()
3726 .filter_map(|segment| {
3727 self.tx
3728 .get::<tables::PruneCheckpoints>(segment)
3729 .transpose()
3730 .map(|chk| chk.map(|chk| (segment, chk)))
3731 })
3732 .collect::<Result<_, _>>()?)
3733 }
3734}
3735
3736impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3737 fn save_prune_checkpoint(
3738 &self,
3739 segment: PruneSegment,
3740 checkpoint: PruneCheckpoint,
3741 ) -> ProviderResult<()> {
3742 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3743 }
3744}
3745
3746impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3747 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3748 let db_entries = self.tx.entries::<T>()?;
3749 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3750 Ok(entries) => entries,
3751 Err(ProviderError::UnsupportedProvider) => 0,
3752 Err(err) => return Err(err),
3753 };
3754
3755 Ok(db_entries + static_file_entries)
3756 }
3757}
3758
3759impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3760 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3761 let mut finalized_blocks = self
3762 .tx
3763 .cursor_read::<tables::ChainState>()?
3764 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3765 .take(1)
3766 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3767
3768 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3769 Ok(last_finalized_block_number)
3770 }
3771
3772 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3773 let mut finalized_blocks = self
3774 .tx
3775 .cursor_read::<tables::ChainState>()?
3776 .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3777 .take(1)
3778 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3779
3780 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3781 Ok(last_finalized_block_number)
3782 }
3783}
3784
3785impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3786 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3787 Ok(self
3788 .tx
3789 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3790 }
3791
3792 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3793 Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3794 }
3795}
3796
impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
    type Tx = TX;

    /// Returns a reference to the underlying database transaction.
    fn tx_ref(&self) -> &Self::Tx {
        &self.tx
    }

    /// Returns a mutable reference to the underlying database transaction.
    fn tx_mut(&mut self) -> &mut Self::Tx {
        &mut self.tx
    }

    /// Consumes the provider and returns the underlying transaction.
    fn into_tx(self) -> Self::Tx {
        self.tx
    }

    /// Returns the configured prune modes.
    ///
    /// NOTE(review): this resolves to the *inherent* `prune_modes_ref` on
    /// `DatabaseProvider` (inherent methods take precedence over trait methods
    /// in Rust method resolution), so it is delegation rather than infinite
    /// recursion — confirm the inherent method exists before refactoring.
    fn prune_modes_ref(&self) -> &PruneModes {
        self.prune_modes_ref()
    }

    #[instrument(
        name = "DatabaseProvider::commit",
        level = "debug",
        target = "providers::db",
        skip_all
    )]
    fn commit(self) -> ProviderResult<()> {
        // The commit order differs between unwinds and appends — presumably so a
        // crash mid-commit never leaves data the other store depends on missing;
        // confirm against the storage consistency invariants.
        if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
            // Unwind path: MDBX first, then staged RocksDB batches, then static files.
            self.tx.commit()?;

            #[cfg(all(unix, feature = "rocksdb"))]
            {
                // Flush every batch staged during this transaction.
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
            }

            self.static_file_provider.commit()?;
        } else {
            // Append path: static files first, MDBX last; each phase is timed and
            // reported through the commit metrics.
            let mut timings = metrics::CommitTimings::default();

            let start = Instant::now();
            self.static_file_provider.finalize()?;
            timings.sf = start.elapsed();

            #[cfg(all(unix, feature = "rocksdb"))]
            {
                let start = Instant::now();
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
                timings.rocksdb = start.elapsed();
            }

            let start = Instant::now();
            self.tx.commit()?;
            timings.mdbx = start.elapsed();

            self.metrics.record_commit(&timings);
        }

        Ok(())
    }
}
3868
3869impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3870 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3871 self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3872 }
3873}
3874
3875impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3876 fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3877 self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3878 }
3879}
3880
3881impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3882 fn cached_storage_settings(&self) -> StorageSettings {
3883 *self.storage_settings.read()
3884 }
3885
3886 fn set_storage_settings_cache(&self, settings: StorageSettings) {
3887 *self.storage_settings.write() = settings;
3888 }
3889}
3890
3891#[cfg(test)]
3892mod tests {
3893 use super::*;
3894 use crate::{
3895 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3896 BlockWriter,
3897 };
3898 use alloy_consensus::Header;
3899 use alloy_primitives::{
3900 map::{AddressMap, B256Map},
3901 U256,
3902 };
3903 use reth_chain_state::ExecutedBlock;
3904 use reth_ethereum_primitives::Receipt;
3905 use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3906 use reth_primitives_traits::SealedBlock;
3907 use reth_testing_utils::generators::{self, random_block, BlockParams};
3908 use reth_trie::{HashedPostState, KeccakKeyHasher, Nibbles, StoredNibblesSubKey};
3909 use revm_database::BundleState;
3910 use revm_state::AccountInfo;
3911
3912 #[test]
3913 fn test_receipts_by_block_range_empty_range() {
3914 let factory = create_test_provider_factory();
3915 let provider = factory.provider().unwrap();
3916
3917 let start = 10u64;
3919 let end = 9u64;
3920 let result = provider.receipts_by_block_range(start..=end).unwrap();
3921 assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3922 }
3923
3924 #[test]
3925 fn test_receipts_by_block_range_nonexistent_blocks() {
3926 let factory = create_test_provider_factory();
3927 let provider = factory.provider().unwrap();
3928
3929 let result = provider.receipts_by_block_range(10..=12).unwrap();
3931 assert_eq!(result, vec![vec![], vec![], vec![]]);
3932 }
3933
    /// Writes genesis plus one block with a single transaction and checks that
    /// `receipts_by_block_range` returns exactly that receipt.
    #[test]
    fn test_receipts_by_block_range_single_block() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        let provider_rw = factory.provider_rw().unwrap();
        // Genesis gets an empty receipt list so the chain starts at block 0.
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        // Block 1 from the fixture, with its execution outcome.
        provider_rw.insert_block(&data.blocks[0].0).unwrap();
        provider_rw
            .write_state(
                &data.blocks[0].1,
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=1).unwrap();

        // One block in range, one receipt in it, matching the fixture data.
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].len(), 1);
        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
    }
3966
    /// Writes genesis plus three blocks (one receipt each) and checks that the
    /// full-range query returns every block's receipt in order.
    #[test]
    fn test_receipts_by_block_range_multiple_blocks() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        let provider_rw = factory.provider_rw().unwrap();
        // Genesis gets an empty receipt list so the chain starts at block 0.
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        // Blocks 1..=3 from the fixture, each with its execution outcome.
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=3).unwrap();

        // Each of the three blocks contributes exactly one matching receipt.
        assert_eq!(result.len(), 3);
        for (i, block_receipts) in result.iter().enumerate() {
            assert_eq!(block_receipts.len(), 1);
            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
        }
    }
4003
    /// Writes genesis plus three fixture blocks and checks the per-block receipt
    /// counts returned by the range query.
    #[test]
    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        let provider_rw = factory.provider_rw().unwrap();
        // Genesis gets an empty receipt list so the chain starts at block 0.
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();

        // Blocks 1..=3 from the fixture, each with its execution outcome.
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=3).unwrap();

        // Every fixture block carries exactly one receipt.
        assert_eq!(result.len(), 3);
        for block_receipts in &result {
            assert_eq!(block_receipts.len(), 1);
        }
    }
4041
    /// Queries a range extending past the last written block (2..=5 with only
    /// blocks 1..=3 present) and checks missing blocks yield empty lists.
    #[test]
    fn test_receipts_by_block_range_partial_range() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        let provider_rw = factory.provider_rw().unwrap();
        // Genesis gets an empty receipt list so the chain starts at block 0.
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        // Blocks 1..=3 from the fixture, each with its execution outcome.
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();

        // Blocks 2 and 3 exist; 4 and 5 do not.
        let result = provider.receipts_by_block_range(2..=5).unwrap();
        assert_eq!(result.len(), 4);

        assert_eq!(result[0].len(), 1);
        assert_eq!(result[1].len(), 1);
        assert_eq!(result[2].len(), 0);
        assert_eq!(result[3].len(), 0);
        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
    }
4083
4084 #[test]
4085 fn test_receipts_by_block_range_all_empty_blocks() {
4086 let factory = create_test_provider_factory();
4087 let mut rng = generators::rng();
4088
4089 let mut blocks = Vec::new();
4091 for i in 0..3 {
4092 let block =
4093 random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
4094 blocks.push(block);
4095 }
4096
4097 let provider_rw = factory.provider_rw().unwrap();
4098 for block in blocks {
4099 provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4100 }
4101 provider_rw.commit().unwrap();
4102
4103 let provider = factory.provider().unwrap();
4104 let result = provider.receipts_by_block_range(1..=3).unwrap();
4105
4106 assert_eq!(result.len(), 3);
4107 for block_receipts in result {
4108 assert_eq!(block_receipts.len(), 0);
4109 }
4110 }
4111
    /// The batched range query must agree with calling `receipts_by_block` for
    /// each block individually.
    #[test]
    fn test_receipts_by_block_range_consistency_with_individual_calls() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        let provider_rw = factory.provider_rw().unwrap();
        // Genesis gets an empty receipt list so the chain starts at block 0.
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        // Blocks 1..=3 from the fixture, each with its execution outcome.
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();

        let range_result = provider.receipts_by_block_range(1..=3).unwrap();

        // Build the same data block-by-block through the single-block API.
        let mut individual_results = Vec::new();
        for block_num in 1..=3 {
            let receipts =
                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
            individual_results.push(receipts);
        }

        assert_eq!(range_result, individual_results);
    }
4153
    /// Exercises `write_trie_updates_sorted`: pre-populates account and storage
    /// trie nodes, applies a sorted update set containing an overwrite, a
    /// deletion, an insert and a full storage-trie wipe, then verifies the
    /// resulting table contents.
    #[test]
    fn test_write_trie_updates_sorted() {
        use reth_trie::{
            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
            BranchNodeCompact, StorageTrieEntry,
        };

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        // Seed the accounts trie with one node the update set will delete
        // ([0x3, 0x4]) and one it will overwrite ([0x1, 0x2]).
        {
            let tx = provider_rw.tx_ref();
            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();

            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
            cursor
                .upsert(
                    to_delete,
                    &BranchNodeCompact::new(
                        0b1010_1010_1010_1010,
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    ),
                )
                .unwrap();

            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
            cursor
                .upsert(
                    to_update,
                    &BranchNodeCompact::new(
                        0b0101_0101_0101_0101,
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    ),
                )
                .unwrap();
        }

        // Seed the storages trie: address1 gets one node (to be deleted by the
        // update set), address2 gets two nodes (to be wiped entirely).
        let storage_address1 = B256::from([1u8; 32]);
        let storage_address2 = B256::from([2u8; 32]);
        {
            let tx = provider_rw.tx_ref();
            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();

            storage_cursor
                .upsert(
                    storage_address1,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
                        node: BranchNodeCompact::new(
                            0b0011_0011_0011_0011,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();

            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
                        node: BranchNodeCompact::new(
                            0b1100_1100_1100_1100,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
                        node: BranchNodeCompact::new(
                            0b0011_1100_0011_1100,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
        }

        // Account updates: overwrite [0x1, 0x2], delete [0x3, 0x4] (`None`),
        // insert a brand-new node at [0x5, 0x6].
        let account_nodes = vec![
            (
                Nibbles::from_nibbles([0x1, 0x2]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111,
                    0b0000_0000_0000_0000,
                    0b0000_0000_0000_0000,
                    vec![],
                    None,
                )),
            ),
            (Nibbles::from_nibbles([0x3, 0x4]), None),
            (
                Nibbles::from_nibbles([0x5, 0x6]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111,
                    0b0000_0000_0000_0000,
                    0b0000_0000_0000_0000,
                    vec![],
                    None,
                )),
            ),
        ];

        // address1: insert [0x1, 0x0] and delete [0x2, 0x0].
        let storage_trie1 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (
                    Nibbles::from_nibbles([0x1, 0x0]),
                    Some(BranchNodeCompact::new(
                        0b1111_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    )),
                ),
                (Nibbles::from_nibbles([0x2, 0x0]), None),
            ],
        };

        // address2: the whole storage trie is marked deleted (wiped).
        let storage_trie2 = StorageTrieUpdatesSorted {
            is_deleted: true,
            storage_nodes: vec![],
        };

        let mut storage_tries = B256Map::default();
        storage_tries.insert(storage_address1, storage_trie1);
        storage_tries.insert(storage_address2, storage_trie2);

        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();

        // 3 account updates + 2 storage updates for address1 = 5; the address2
        // wipe apparently contributes no counted entries.
        assert_eq!(num_entries, 5);

        let tx = provider_rw.tx_ref();
        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();

        // [0x1, 0x2] must exist with the new (overwritten) state mask.
        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
        let entry1 = cursor.seek_exact(nibbles1).unwrap();
        assert!(entry1.is_some(), "Updated account node should exist");
        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
        assert_eq!(
            entry1.unwrap().1.state_mask,
            expected_mask,
            "Account node should have updated state_mask"
        );

        // [0x3, 0x4] was deleted.
        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
        let entry2 = cursor.seek_exact(nibbles2).unwrap();
        assert!(entry2.is_none(), "Deleted account node should not exist");

        // [0x5, 0x6] was newly inserted.
        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
        let entry3 = cursor.seek_exact(nibbles3).unwrap();
        assert!(entry3.is_some(), "New account node should exist");

        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();

        // address1 keeps only the newly inserted [0x1, 0x0] entry.
        let storage_entries1: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address1), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            storage_entries1.len(),
            1,
            "Storage address1 should have 1 entry after deletion"
        );
        assert_eq!(
            storage_entries1[0].1.nibbles.0,
            Nibbles::from_nibbles([0x1, 0x0]),
            "Remaining entry should be [0x1, 0x0]"
        );

        // address2 was wiped entirely.
        let storage_entries2: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address2), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");

        provider_rw.commit().unwrap();
    }
4373
    /// Exercises receipt pruning during `write_state`: receipts for blocks below
    /// the configured prune threshold are skipped, newer receipts are persisted,
    /// and (for v2) the static-file block marker still advances for pruned
    /// blocks even when no receipt data is written.
    #[test]
    fn test_prunable_receipts_logic() {
        // Inserts blocks 0..=tip_block, each with `tx_count` transactions.
        let insert_blocks =
            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
                let mut rng = generators::rng();
                for block_num in 0..=tip_block {
                    let block = random_block(
                        &mut rng,
                        block_num,
                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
                    );
                    provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
                }
            };

        // Writes a single receipt for `block`; `cumulative_gas_used` is set to
        // the block number so reads below can be matched back to the writer.
        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
            let outcome = ExecutionOutcome {
                first_block: block,
                receipts: vec![vec![Receipt {
                    tx_type: Default::default(),
                    success: true,
                    cumulative_gas_used: block,
                    logs: vec![],
                }]],
                ..Default::default()
            };
            provider_rw
                .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
                .unwrap();
            provider_rw.commit().unwrap();
        };

        // Case 1: v1 storage, receipts pruned before block 100. The receipt
        // written for block 1 is dropped; the one near the tip is kept.
        {
            let factory = create_test_provider_factory();
            let storage_settings = StorageSettings::v1();
            factory.set_storage_settings_cache(storage_settings);
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(100)),
                ..Default::default()
            });

            let tip_block = 200u64;
            let first_block = 1u64;

            let provider_rw = factory.provider_rw().unwrap();
            insert_blocks(&provider_rw, tip_block, 1);
            provider_rw.commit().unwrap();

            write_receipts(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                first_block,
            );
            write_receipts(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                tip_block - 1,
            );

            let provider = factory.provider().unwrap();

            // Block 0 ends up with no receipts; the near-tip block keeps its one.
            for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
                assert!(provider
                    .receipts_by_block(block.into())
                    .unwrap()
                    .is_some_and(|r| r.len() == num_receipts));
            }
        }

        // Case 2: v2 storage, receipts pruned before block 2.
        {
            let factory = create_test_provider_factory();
            let storage_settings = StorageSettings::v2();
            factory.set_storage_settings_cache(storage_settings);
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(2)),
                ..Default::default()
            });

            let tip_block = 200u64;

            let provider_rw = factory.provider_rw().unwrap();
            insert_blocks(&provider_rw, tip_block, 1);
            provider_rw.commit().unwrap();

            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);

            // Blocks 0 and 1 were pruned: no receipt transactions were written,
            // but the highest static-file *block* marker still advanced to 1.
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none(),);
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_block(StaticFileSegment::Receipts)
                .is_some_and(|b| b == 1),);

            // Block 2 is at the prune threshold, so its receipt is persisted.
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_some_and(|num| num == 2),);

            // Raise the prune threshold to Before(100) with a minimum pruning
            // distance of 1; the assertions below expect block 3's receipt to be
            // written anyway. NOTE(review): presumably the minimum pruning
            // distance overrides the Before(100) mode here — confirm against
            // `with_minimum_pruning_distance` semantics.
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(100)),
                ..Default::default()
            });
            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
            write_receipts(provider_rw, 3);

            let provider = factory.provider().unwrap();
            // Under v2, receipts are routed to static files.
            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
            for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
                assert!(provider
                    .receipts_by_block(num.into())
                    .unwrap()
                    .is_some_and(|r| r.len() == num_receipts));

                let receipt = provider.receipt(num).unwrap();
                if num_receipts > 0 {
                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
                } else {
                    assert!(receipt.is_none());
                }
            }
        }
    }
4513
4514 #[test]
4515 fn test_try_into_history_rejects_unexecuted_blocks() {
4516 use reth_storage_api::TryIntoHistoricalStateProvider;
4517
4518 let factory = create_test_provider_factory();
4519
4520 let data = BlockchainTestData::default();
4522 let provider_rw = factory.provider_rw().unwrap();
4523 provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4524 provider_rw
4525 .write_state(
4526 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4527 crate::OriginalValuesKnown::No,
4528 StateWriteConfig::default(),
4529 )
4530 .unwrap();
4531 provider_rw.commit().unwrap();
4532
4533 let provider = factory.provider().unwrap();
4535
4536 let result = provider.try_into_history_at_block(0);
4538 assert!(result.is_ok(), "Block 0 should be available");
4539
4540 let provider = factory.provider().unwrap();
4542 let result = provider.try_into_history_at_block(100);
4543
4544 match result {
4546 Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
4547 Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
4548 Ok(_) => panic!("Expected error, got Ok"),
4549 }
4550 }
4551
4552 #[test]
4553 fn test_unwind_storage_hashing_with_hashed_state() {
4554 let factory = create_test_provider_factory();
4555 let storage_settings = StorageSettings::v2();
4556 factory.set_storage_settings_cache(storage_settings);
4557
4558 let address = Address::random();
4559 let hashed_address = keccak256(address);
4560
4561 let slot_key_already_hashed = B256::random();
4562
4563 let current_value = U256::from(100);
4564 let old_value = U256::from(42);
4565
4566 let provider_rw = factory.provider_rw().unwrap();
4567 provider_rw
4568 .tx
4569 .cursor_dup_write::<tables::HashedStorages>()
4570 .unwrap()
4571 .upsert(
4572 hashed_address,
4573 &StorageEntry { key: slot_key_already_hashed, value: current_value },
4574 )
4575 .unwrap();
4576
4577 let changesets = vec![(
4578 BlockNumberAddress((1, address)),
4579 ChangesetEntry {
4580 key: StorageSlotKey::Hashed(slot_key_already_hashed),
4581 value: old_value,
4582 },
4583 )];
4584
4585 let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4586
4587 assert_eq!(result.len(), 1);
4588 assert!(result.contains_key(&hashed_address));
4589 assert!(result[&hashed_address].contains(&slot_key_already_hashed));
4590
4591 let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4592 let entry = cursor
4593 .seek_by_key_subkey(hashed_address, slot_key_already_hashed)
4594 .unwrap()
4595 .expect("entry should exist");
4596 assert_eq!(entry.key, slot_key_already_hashed);
4597 assert_eq!(entry.value, old_value);
4598 }
4599
4600 #[test]
4601 fn test_write_and_remove_state_roundtrip_legacy() {
4602 let factory = create_test_provider_factory();
4603 let storage_settings = StorageSettings::v1();
4604 assert!(!storage_settings.use_hashed_state());
4605 factory.set_storage_settings_cache(storage_settings);
4606
4607 let address = Address::with_last_byte(1);
4608 let hashed_address = keccak256(address);
4609 let slot = U256::from(5);
4610 let slot_key = B256::from(slot);
4611 let hashed_slot = keccak256(slot_key);
4612
4613 let mut rng = generators::rng();
4614 let block0 =
4615 random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
4616 let block1 =
4617 random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });
4618
4619 {
4620 let provider_rw = factory.provider_rw().unwrap();
4621 provider_rw.insert_block(&block0.try_recover().unwrap()).unwrap();
4622 provider_rw.insert_block(&block1.try_recover().unwrap()).unwrap();
4623 provider_rw
4624 .tx
4625 .cursor_write::<tables::PlainAccountState>()
4626 .unwrap()
4627 .upsert(address, &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })
4628 .unwrap();
4629 provider_rw.commit().unwrap();
4630 }
4631
4632 let provider_rw = factory.provider_rw().unwrap();
4633
4634 let mut state_init: BundleStateInit = AddressMap::default();
4635 let mut storage_map: B256Map<(U256, U256)> = B256Map::default();
4636 storage_map.insert(slot_key, (U256::ZERO, U256::from(10)));
4637 state_init.insert(
4638 address,
4639 (
4640 Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
4641 Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }),
4642 storage_map,
4643 ),
4644 );
4645
4646 let mut reverts_init: RevertsInit = HashMap::default();
4647 let mut block_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
4648 block_reverts.insert(
4649 address,
4650 (
4651 Some(Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })),
4652 vec![StorageEntry { key: slot_key, value: U256::ZERO }],
4653 ),
4654 );
4655 reverts_init.insert(1, block_reverts);
4656
4657 let execution_outcome =
4658 ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);
4659
4660 provider_rw
4661 .write_state(
4662 &execution_outcome,
4663 OriginalValuesKnown::Yes,
4664 StateWriteConfig {
4665 write_receipts: false,
4666 write_account_changesets: true,
4667 write_storage_changesets: true,
4668 },
4669 )
4670 .unwrap();
4671
4672 let hashed_state =
4673 execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
4674 provider_rw.write_hashed_state(&hashed_state).unwrap();
4675
4676 let account = provider_rw
4677 .tx
4678 .cursor_read::<tables::PlainAccountState>()
4679 .unwrap()
4680 .seek_exact(address)
4681 .unwrap()
4682 .unwrap()
4683 .1;
4684 assert_eq!(account.nonce, 1);
4685
4686 let storage_entry = provider_rw
4687 .tx
4688 .cursor_dup_read::<tables::PlainStorageState>()
4689 .unwrap()
4690 .seek_by_key_subkey(address, slot_key)
4691 .unwrap()
4692 .unwrap();
4693 assert_eq!(storage_entry.key, slot_key);
4694 assert_eq!(storage_entry.value, U256::from(10));
4695
4696 let hashed_entry = provider_rw
4697 .tx
4698 .cursor_dup_read::<tables::HashedStorages>()
4699 .unwrap()
4700 .seek_by_key_subkey(hashed_address, hashed_slot)
4701 .unwrap()
4702 .unwrap();
4703 assert_eq!(hashed_entry.key, hashed_slot);
4704 assert_eq!(hashed_entry.value, U256::from(10));
4705
4706 let account_cs_entries = provider_rw
4707 .tx
4708 .cursor_dup_read::<tables::AccountChangeSets>()
4709 .unwrap()
4710 .walk(Some(1))
4711 .unwrap()
4712 .collect::<Result<Vec<_>, _>>()
4713 .unwrap();
4714 assert!(!account_cs_entries.is_empty());
4715
4716 let storage_cs_entries = provider_rw
4717 .tx
4718 .cursor_read::<tables::StorageChangeSets>()
4719 .unwrap()
4720 .walk(Some(BlockNumberAddress((1, address))))
4721 .unwrap()
4722 .collect::<Result<Vec<_>, _>>()
4723 .unwrap();
4724 assert!(!storage_cs_entries.is_empty());
4725 assert_eq!(storage_cs_entries[0].1.key, slot_key);
4726
4727 provider_rw.remove_state_above(0).unwrap();
4728
4729 let restored_account = provider_rw
4730 .tx
4731 .cursor_read::<tables::PlainAccountState>()
4732 .unwrap()
4733 .seek_exact(address)
4734 .unwrap()
4735 .unwrap()
4736 .1;
4737 assert_eq!(restored_account.nonce, 0);
4738
4739 let storage_gone = provider_rw
4740 .tx
4741 .cursor_dup_read::<tables::PlainStorageState>()
4742 .unwrap()
4743 .seek_by_key_subkey(address, slot_key)
4744 .unwrap();
4745 assert!(storage_gone.is_none() || storage_gone.unwrap().key != slot_key);
4746
4747 let account_cs_after = provider_rw
4748 .tx
4749 .cursor_dup_read::<tables::AccountChangeSets>()
4750 .unwrap()
4751 .walk(Some(1))
4752 .unwrap()
4753 .collect::<Result<Vec<_>, _>>()
4754 .unwrap();
4755 assert!(account_cs_after.is_empty());
4756
4757 let storage_cs_after = provider_rw
4758 .tx
4759 .cursor_read::<tables::StorageChangeSets>()
4760 .unwrap()
4761 .walk(Some(BlockNumberAddress((1, address))))
4762 .unwrap()
4763 .collect::<Result<Vec<_>, _>>()
4764 .unwrap();
4765 assert!(storage_cs_after.is_empty());
4766 }
4767
4768 #[test]
4769 fn test_unwind_storage_hashing_legacy() {
4770 let factory = create_test_provider_factory();
4771 let storage_settings = StorageSettings::v1();
4772 assert!(!storage_settings.use_hashed_state());
4773 factory.set_storage_settings_cache(storage_settings);
4774
4775 let address = Address::random();
4776 let hashed_address = keccak256(address);
4777
4778 let plain_slot = B256::random();
4779 let hashed_slot = keccak256(plain_slot);
4780
4781 let current_value = U256::from(100);
4782 let old_value = U256::from(42);
4783
4784 let provider_rw = factory.provider_rw().unwrap();
4785 provider_rw
4786 .tx
4787 .cursor_dup_write::<tables::HashedStorages>()
4788 .unwrap()
4789 .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4790 .unwrap();
4791
4792 let changesets = vec![(
4793 BlockNumberAddress((1, address)),
4794 ChangesetEntry { key: StorageSlotKey::Plain(plain_slot), value: old_value },
4795 )];
4796
4797 let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4798
4799 assert_eq!(result.len(), 1);
4800 assert!(result.contains_key(&hashed_address));
4801 assert!(result[&hashed_address].contains(&hashed_slot));
4802
4803 let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4804 let entry = cursor
4805 .seek_by_key_subkey(hashed_address, hashed_slot)
4806 .unwrap()
4807 .expect("entry should exist");
4808 assert_eq!(entry.key, hashed_slot);
4809 assert_eq!(entry.value, old_value);
4810 }
4811
    /// End-to-end check of v2 (hashed-state) writes: plain tables stay empty,
    /// state lands in the hashed MDBX tables, changesets land in static files
    /// with hashed slot keys, and a historical read at block 0 sees the
    /// pre-state (no value).
    #[test]
    fn test_write_state_and_historical_read_hashed() {
        use reth_storage_api::StateProvider;
        use reth_trie::{HashedPostState, KeccakKeyHasher};
        use revm_database::BundleState;
        use revm_state::AccountInfo;

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        let address = Address::with_last_byte(1);
        let slot = U256::from(5);
        let slot_key = B256::from(slot);
        let hashed_address = keccak256(address);
        let hashed_slot = keccak256(slot_key);

        // Seed static files: headers for blocks 0/1 plus empty block-0
        // changesets so block-1 changesets can be appended contiguously.
        {
            let sf = factory.static_file_provider();
            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
            hw.append_header(&h0, &B256::ZERO).unwrap();
            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
            hw.append_header(&h1, &B256::ZERO).unwrap();
            hw.commit().unwrap();

            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            aw.append_account_changeset(vec![], 0).unwrap();
            aw.commit().unwrap();

            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
            sw.append_storage_changeset(vec![], 0).unwrap();
            sw.commit().unwrap();
        }

        let provider_rw = factory.provider_rw().unwrap();

        // Block-1 execution: new account (nonce 1, balance 10) and slot 5 set
        // to 10; reverts record that neither existed before block 1.
        let bundle = BundleState::builder(1..=1)
            .state_present_account_info(
                address,
                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
            )
            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
            .revert_account_info(1, address, Some(None))
            .revert_storage(1, address, vec![(slot, U256::ZERO)])
            .build();

        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());

        provider_rw
            .tx
            .put::<tables::BlockBodyIndices>(
                1,
                StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
            )
            .unwrap();

        provider_rw
            .write_state(
                &execution_outcome,
                OriginalValuesKnown::Yes,
                StateWriteConfig {
                    write_receipts: false,
                    write_account_changesets: true,
                    write_storage_changesets: true,
                },
            )
            .unwrap();

        let hashed_state =
            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
        provider_rw.write_hashed_state(&hashed_state).unwrap();

        // v2 must not populate the plain storage table.
        let plain_storage_entries = provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .walk(None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(plain_storage_entries.is_empty());

        // The hashed storage table holds the new value under hashed keys.
        let hashed_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::HashedStorages>()
            .unwrap()
            .seek_by_key_subkey(hashed_address, hashed_slot)
            .unwrap()
            .unwrap();
        assert_eq!(hashed_entry.key, hashed_slot);
        assert_eq!(hashed_entry.value, U256::from(10));

        provider_rw.static_file_provider().commit().unwrap();

        // Changesets for block 1 are in static files, keyed by the hashed slot.
        let sf = factory.static_file_provider();
        let storage_cs = sf.storage_changeset(1).unwrap();
        assert!(!storage_cs.is_empty());
        assert_eq!(storage_cs[0].1.key.as_b256(), hashed_slot);

        let account_cs = sf.account_block_changeset(1).unwrap();
        assert!(!account_cs.is_empty());
        assert_eq!(account_cs[0].address, address);

        // A historical read at block 0 (before the write) sees no value.
        let historical_value =
            HistoricalStateProviderRef::new(&*provider_rw, 0).storage(address, slot_key).unwrap();
        assert_eq!(historical_value, None);
    }
4919
    /// Storage schema flavor under test, selecting between
    /// `StorageSettings::v1()` and `StorageSettings::v2()` in
    /// `run_save_blocks_and_verify`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum StorageMode {
        // Legacy layout: plain state tables and MDBX changesets/history.
        V1,
        // Hashed-state layout: hashed tables, static-file changesets.
        V2,
    }
4925
    /// Saves a genesis block plus `num_blocks` synthetic blocks via
    /// `save_blocks` and verifies the resulting table layout for `mode`:
    /// hashed tables are populated in both modes, while the location of plain
    /// state, changesets and history differs (MDBX for v1; static files — and,
    /// when the `rocksdb` feature is on, RocksDB history — for v2).
    fn run_save_blocks_and_verify(mode: StorageMode) {
        use alloy_primitives::map::HashMap;

        let factory = create_test_provider_factory();

        match mode {
            StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
            StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
        }

        let num_blocks = 3u64;
        let accounts_per_block = 5usize;
        let slots_per_account = 3usize;

        // Genesis with an empty execution output so block 1 has a parent.
        let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
            SealedHeader::new(
                Header { number: 0, difficulty: U256::from(1), ..Default::default() },
                B256::ZERO,
            ),
            Default::default(),
        );

        let genesis_executed = ExecutedBlock::new(
            Arc::new(genesis.try_recover().unwrap()),
            Arc::new(BlockExecutionOutput {
                result: BlockExecutionResult {
                    receipts: vec![],
                    requests: Default::default(),
                    gas_used: 0,
                    blob_gas_used: 0,
                },
                state: Default::default(),
            }),
            ComputedTrieData::default(),
        );
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
        provider_rw.commit().unwrap();

        let mut blocks: Vec<ExecutedBlock> = Vec::new();
        let mut parent_hash = B256::ZERO;

        // Build blocks 1..=num_blocks. Addresses/slots are derived from the
        // block/account indices so the assertions below can recompute them, and
        // slot values encode the block number for later verification.
        for block_num in 1..=num_blocks {
            let mut builder = BundleState::builder(block_num..=block_num);

            for acct_idx in 0..accounts_per_block {
                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
                let info = AccountInfo {
                    nonce: block_num,
                    balance: U256::from(block_num * 100 + acct_idx as u64),
                    ..Default::default()
                };

                let storage: HashMap<U256, (U256, U256)> = (1..=slots_per_account as u64)
                    .map(|s| {
                        (
                            U256::from(s + acct_idx as u64 * 100),
                            (U256::ZERO, U256::from(block_num * 1000 + s)),
                        )
                    })
                    .collect();

                // Reverts record that every slot was zero before this block.
                let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
                    .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
                    .collect();

                builder = builder
                    .state_present_account_info(address, info)
                    .revert_account_info(block_num, address, Some(None))
                    .state_storage(address, storage)
                    .revert_storage(block_num, address, revert_storage);
            }

            let bundle = builder.build();

            let hashed_state =
                HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();

            let header = Header {
                number: block_num,
                parent_hash,
                difficulty: U256::from(1),
                ..Default::default()
            };
            let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
                header,
                Default::default(),
            );
            parent_hash = block.hash();

            let executed = ExecutedBlock::new(
                Arc::new(block.try_recover().unwrap()),
                Arc::new(BlockExecutionOutput {
                    result: BlockExecutionResult {
                        receipts: vec![],
                        requests: Default::default(),
                        gas_used: 0,
                        blob_gas_used: 0,
                    },
                    state: bundle,
                }),
                ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
            );
            blocks.push(executed);
        }

        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();

        // Hashed tables must contain every account and slot in both modes.
        for block_num in 1..=num_blocks {
            for acct_idx in 0..accounts_per_block {
                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
                let hashed_address = keccak256(address);

                let ha_entry = provider
                    .tx_ref()
                    .cursor_read::<tables::HashedAccounts>()
                    .unwrap()
                    .seek_exact(hashed_address)
                    .unwrap();
                assert!(
                    ha_entry.is_some(),
                    "HashedAccounts missing for block {block_num} acct {acct_idx}"
                );

                for s in 1..=slots_per_account as u64 {
                    let slot = U256::from(s + acct_idx as u64 * 100);
                    let slot_key = B256::from(slot);
                    let hashed_slot = keccak256(slot_key);

                    let hs_entry = provider
                        .tx_ref()
                        .cursor_dup_read::<tables::HashedStorages>()
                        .unwrap()
                        .seek_by_key_subkey(hashed_address, hashed_slot)
                        .unwrap();
                    assert!(
                        hs_entry.is_some(),
                        "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
                    );
                    let entry = hs_entry.unwrap();
                    assert_eq!(entry.key, hashed_slot);
                    assert_eq!(entry.value, U256::from(block_num * 1000 + s));
                }
            }
        }

        // Headers and body indices must exist for every saved block.
        for block_num in 1..=num_blocks {
            let header = provider.header_by_number(block_num).unwrap();
            assert!(header.is_some(), "Header missing for block {block_num}");

            let indices = provider.block_body_indices(block_num).unwrap();
            assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
        }

        let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
        let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();

        if mode == StorageMode::V2 {
            // v2: no plain state and no MDBX changesets ...
            assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
            assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");

            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
            assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");

            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
            assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");

            // ... changesets live in static files with hashed slot keys.
            provider.static_file_provider().commit().unwrap();
            let sf = factory.static_file_provider();

            for block_num in 1..=num_blocks {
                let account_cs = sf.account_block_changeset(block_num).unwrap();
                assert!(
                    !account_cs.is_empty(),
                    "v2: static file AccountChangeSets should exist for block {block_num}"
                );

                let storage_cs = sf.storage_changeset(block_num).unwrap();
                assert!(
                    !storage_cs.is_empty(),
                    "v2: static file StorageChangeSets should exist for block {block_num}"
                );

                for (_, entry) in &storage_cs {
                    assert!(
                        entry.key.is_hashed(),
                        "v2: static file storage changeset should have hashed slot keys"
                    );
                }
            }

            // History indices are kept in RocksDB when the feature is enabled.
            #[cfg(all(unix, feature = "rocksdb"))]
            {
                let rocksdb = factory.rocksdb_provider();
                for block_num in 1..=num_blocks {
                    for acct_idx in 0..accounts_per_block {
                        let address =
                            Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
                        let shards = rocksdb.account_history_shards(address).unwrap();
                        assert!(
                            !shards.is_empty(),
                            "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
                        );

                        for s in 1..=slots_per_account as u64 {
                            let slot = U256::from(s + acct_idx as u64 * 100);
                            let slot_key = B256::from(slot);
                            let hashed_slot = keccak256(slot_key);

                            let shards =
                                rocksdb.storage_history_shards(address, hashed_slot).unwrap();
                            assert!(
                                !shards.is_empty(),
                                "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
                            );
                        }
                    }
                }
            }
        } else {
            // v1: plain state, changesets and history are all in MDBX.
            assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
            assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");

            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
            assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");

            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
            assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");

            for block_num in 1..=num_blocks {
                let storage_entries: Vec<_> = provider
                    .tx_ref()
                    .cursor_dup_read::<tables::StorageChangeSets>()
                    .unwrap()
                    .walk_range(BlockNumberAddress::range(block_num..=block_num))
                    .unwrap()
                    .collect::<Result<Vec<_>, _>>()
                    .unwrap();
                assert!(
                    !storage_entries.is_empty(),
                    "v1: MDBX StorageChangeSets should have entries for block {block_num}"
                );

                for (_, entry) in &storage_entries {
                    // NOTE(review): `x != keccak256(x)` is true for virtually any
                    // value, so this is a weak plain-key check — consider
                    // asserting against the known plain slot keys instead.
                    let slot_key = B256::from(entry.key);
                    assert!(
                        slot_key != keccak256(slot_key),
                        "v1: storage changeset keys should be plain (not hashed)"
                    );
                }
            }

            let mdbx_account_history =
                provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
            assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");

            let mdbx_storage_history =
                provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
            assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
        }
    }
5191
    /// Runs the save-blocks table-layout checks against the legacy (v1) schema.
    #[test]
    fn test_save_blocks_v1_table_assertions() {
        run_save_blocks_and_verify(StorageMode::V1);
    }
5196
    /// Runs the save-blocks table-layout checks against the hashed (v2) schema.
    #[test]
    fn test_save_blocks_v2_table_assertions() {
        run_save_blocks_and_verify(StorageMode::V2);
    }
5201
5202 #[test]
5203 fn test_write_and_remove_state_roundtrip_v2() {
5204 let factory = create_test_provider_factory();
5205 let storage_settings = StorageSettings::v2();
5206 assert!(storage_settings.use_hashed_state());
5207 factory.set_storage_settings_cache(storage_settings);
5208
5209 let address = Address::with_last_byte(1);
5210 let hashed_address = keccak256(address);
5211 let slot = U256::from(5);
5212 let slot_key = B256::from(slot);
5213 let hashed_slot = keccak256(slot_key);
5214
5215 {
5216 let sf = factory.static_file_provider();
5217 let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
5218 let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
5219 hw.append_header(&h0, &B256::ZERO).unwrap();
5220 let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
5221 hw.append_header(&h1, &B256::ZERO).unwrap();
5222 hw.commit().unwrap();
5223
5224 let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
5225 aw.append_account_changeset(vec![], 0).unwrap();
5226 aw.commit().unwrap();
5227
5228 let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
5229 sw.append_storage_changeset(vec![], 0).unwrap();
5230 sw.commit().unwrap();
5231 }
5232
5233 {
5234 let provider_rw = factory.provider_rw().unwrap();
5235 provider_rw
5236 .tx
5237 .put::<tables::BlockBodyIndices>(
5238 0,
5239 StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5240 )
5241 .unwrap();
5242 provider_rw
5243 .tx
5244 .put::<tables::BlockBodyIndices>(
5245 1,
5246 StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5247 )
5248 .unwrap();
5249 provider_rw
5250 .tx
5251 .cursor_write::<tables::HashedAccounts>()
5252 .unwrap()
5253 .upsert(
5254 hashed_address,
5255 &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None },
5256 )
5257 .unwrap();
5258 provider_rw.commit().unwrap();
5259 }
5260
5261 let provider_rw = factory.provider_rw().unwrap();
5262
5263 let bundle = BundleState::builder(1..=1)
5264 .state_present_account_info(
5265 address,
5266 AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
5267 )
5268 .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
5269 .revert_account_info(1, address, Some(None))
5270 .revert_storage(1, address, vec![(slot, U256::ZERO)])
5271 .build();
5272
5273 let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
5274
5275 provider_rw
5276 .write_state(
5277 &execution_outcome,
5278 OriginalValuesKnown::Yes,
5279 StateWriteConfig {
5280 write_receipts: false,
5281 write_account_changesets: true,
5282 write_storage_changesets: true,
5283 },
5284 )
5285 .unwrap();
5286
5287 let hashed_state =
5288 HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5289 provider_rw.write_hashed_state(&hashed_state).unwrap();
5290
5291 let hashed_account = provider_rw
5292 .tx
5293 .cursor_read::<tables::HashedAccounts>()
5294 .unwrap()
5295 .seek_exact(hashed_address)
5296 .unwrap()
5297 .unwrap()
5298 .1;
5299 assert_eq!(hashed_account.nonce, 1);
5300
5301 let hashed_entry = provider_rw
5302 .tx
5303 .cursor_dup_read::<tables::HashedStorages>()
5304 .unwrap()
5305 .seek_by_key_subkey(hashed_address, hashed_slot)
5306 .unwrap()
5307 .unwrap();
5308 assert_eq!(hashed_entry.key, hashed_slot);
5309 assert_eq!(hashed_entry.value, U256::from(10));
5310
5311 let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
5312 assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5313
5314 let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
5315 assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5316
5317 provider_rw.static_file_provider().commit().unwrap();
5318
5319 let sf = factory.static_file_provider();
5320 let storage_cs = sf.storage_changeset(1).unwrap();
5321 assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
5322 assert_eq!(
5323 storage_cs[0].1.key.as_b256(),
5324 hashed_slot,
5325 "v2: changeset key should be hashed"
5326 );
5327
5328 provider_rw.remove_state_above(0).unwrap();
5329
5330 let restored_account = provider_rw
5331 .tx
5332 .cursor_read::<tables::HashedAccounts>()
5333 .unwrap()
5334 .seek_exact(hashed_address)
5335 .unwrap();
5336 assert!(
5337 restored_account.is_none(),
5338 "v2: account should be removed (didn't exist before block 1)"
5339 );
5340
5341 let storage_gone = provider_rw
5342 .tx
5343 .cursor_dup_read::<tables::HashedStorages>()
5344 .unwrap()
5345 .seek_by_key_subkey(hashed_address, hashed_slot)
5346 .unwrap();
5347 assert!(
5348 storage_gone.is_none() || storage_gone.unwrap().key != hashed_slot,
5349 "v2: storage should be reverted (removed or different key)"
5350 );
5351
5352 let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
5353 assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");
5354
5355 let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
5356 assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
5357 }
5358
    /// `populate_bundle_state_hashed` builds bundle state and reverts from
    /// hashed-key changesets: state entries are keyed by the plain address but
    /// storage maps and storage reverts use the *hashed* slot key, with current
    /// values read back from the hashed tables.
    #[test]
    fn test_populate_bundle_state_hashed_with_hashed_keys() {
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        let address = Address::with_last_byte(1);
        let hashed_address = keccak256(address);
        let slot_key = B256::from(U256::from(42));
        let hashed_slot = keccak256(slot_key);
        let current_value = U256::from(100);
        let old_value = U256::from(50);

        let provider_rw = factory.provider_rw().unwrap();

        // Seed the hashed tables with the *current* account and slot state.
        provider_rw
            .tx
            .cursor_write::<tables::HashedAccounts>()
            .unwrap()
            .upsert(hashed_address, &Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None })
            .unwrap();
        provider_rw
            .tx
            .cursor_dup_write::<tables::HashedStorages>()
            .unwrap()
            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
            .unwrap();

        // Changesets record the *old* values at block 1; the storage key is
        // already hashed (v2 layout).
        let storage_changeset = vec![(
            BlockNumberAddress((1, address)),
            ChangesetEntry { key: StorageSlotKey::Hashed(hashed_slot), value: old_value },
        )];

        let account_changeset = vec![(
            1u64,
            AccountBeforeTx {
                address,
                info: Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
            },
        )];

        let mut hashed_accounts_cursor =
            provider_rw.tx.cursor_read::<tables::HashedAccounts>().unwrap();
        let mut hashed_storage_cursor =
            provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();

        let (state, reverts) = provider_rw
            .populate_bundle_state_hashed(
                account_changeset,
                storage_changeset,
                &mut hashed_accounts_cursor,
                &mut hashed_storage_cursor,
            )
            .unwrap();

        // State is keyed by plain address; the "new" info is the current one.
        let (_, new_account, storage_map) =
            state.get(&address).expect("address should be in state");
        assert!(new_account.is_some());
        assert_eq!(new_account.unwrap().nonce, 1);

        // Storage map is keyed by the hashed slot: (old, current) value pair.
        let (old_val, new_val) =
            storage_map.get(&hashed_slot).expect("hashed slot should be in storage map");
        assert_eq!(*old_val, old_value);
        assert_eq!(*new_val, current_value);

        // Reverts for block 1 carry the hashed slot key and the old value.
        let block_reverts = reverts.get(&1).expect("block 1 should have reverts");
        let (_, storage_reverts) =
            block_reverts.get(&address).expect("address should have reverts");
        assert_eq!(storage_reverts.len(), 1);
        assert_eq!(storage_reverts[0].key, hashed_slot);
        assert_eq!(storage_reverts[0].value, old_value);
    }
5430
5431 #[test]
5432 #[cfg(all(unix, feature = "rocksdb"))]
5433 fn test_unwind_storage_history_indices_v2() {
5434 let factory = create_test_provider_factory();
5435 factory.set_storage_settings_cache(StorageSettings::v2());
5436
5437 let address = Address::with_last_byte(1);
5438 let slot_key = B256::from(U256::from(42));
5439 let hashed_slot = keccak256(slot_key);
5440
5441 {
5442 let rocksdb = factory.rocksdb_provider();
5443 let mut batch = rocksdb.batch();
5444 batch.append_storage_history_shard(address, hashed_slot, vec![3u64, 7, 10]).unwrap();
5445 batch.commit().unwrap();
5446
5447 let shards = rocksdb.storage_history_shards(address, hashed_slot).unwrap();
5448 assert!(!shards.is_empty(), "history should be written to rocksdb");
5449 }
5450
5451 let provider_rw = factory.provider_rw().unwrap();
5452
5453 let changesets = vec![
5454 (
5455 BlockNumberAddress((7, address)),
5456 ChangesetEntry { key: StorageSlotKey::Hashed(hashed_slot), value: U256::from(5) },
5457 ),
5458 (
5459 BlockNumberAddress((10, address)),
5460 ChangesetEntry { key: StorageSlotKey::Hashed(hashed_slot), value: U256::from(8) },
5461 ),
5462 ];
5463
5464 let count = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
5465 assert_eq!(count, 2);
5466
5467 provider_rw.commit().unwrap();
5468
5469 let rocksdb = factory.rocksdb_provider();
5470 let shards = rocksdb.storage_history_shards(address, hashed_slot).unwrap();
5471
5472 assert!(
5473 !shards.is_empty(),
5474 "history shards should still exist with block 3 after partial unwind"
5475 );
5476
5477 let all_blocks: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
5478 assert!(all_blocks.contains(&3), "block 3 should remain");
5479 assert!(!all_blocks.contains(&7), "block 7 should be unwound");
5480 assert!(!all_blocks.contains(&10), "block 10 should be unwound");
5481 }
5482}