1use crate::{
2 bundle_state::StorageRevertsIter,
3 providers::{
4 database::{chain::ChainStorage, metrics},
5 static_file::StaticFileWriter,
6 NodeTypesForProvider, StaticFileProvider,
7 },
8 to_range,
9 traits::{
10 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
11 },
12 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
13 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
14 DBProvider, HashingWriter, HeaderProvider, HeaderSyncGapProvider, HistoricalStateProvider,
15 HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef,
16 OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit,
17 StageCheckpointReader, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader,
18 StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
19 TransactionsProviderExt, TrieWriter,
20};
21use alloy_consensus::{
22 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
23 BlockHeader, TxReceipt,
24};
25use alloy_eips::BlockHashOrNumber;
26use alloy_primitives::{
27 keccak256,
28 map::{hash_map, B256Map, HashMap, HashSet},
29 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
30};
31use itertools::Itertools;
32use rayon::slice::ParallelSliceMut;
33use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates};
34use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
35use reth_db_api::{
36 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
37 database::Database,
38 models::{
39 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
40 ShardedKey, StoredBlockBodyIndices,
41 },
42 table::Table,
43 tables,
44 transaction::{DbTx, DbTxMut},
45 BlockNumberList, PlainAccountState, PlainStorageState,
46};
47use reth_execution_types::{Chain, ExecutionOutcome};
48use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
49use reth_primitives_traits::{
50 Account, Block as _, BlockBody as _, Bytecode, GotExpected, RecoveredBlock, SealedHeader,
51 StorageEntry,
52};
53use reth_prune_types::{
54 PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
55};
56use reth_stages_types::{StageCheckpoint, StageId};
57use reth_static_file_types::StaticFileSegment;
58use reth_storage_api::{
59 BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, StateProvider,
60 StorageChangeSetReader, TryIntoHistoricalStateProvider,
61};
62use reth_storage_errors::provider::{ProviderResult, RootMismatch};
63use reth_trie::{
64 prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
65 updates::{StorageTrieUpdates, TrieUpdates},
66 HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
67};
68use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
69use revm_database::states::{
70 PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
71};
72use std::{
73 cmp::Ordering,
74 collections::{BTreeMap, BTreeSet},
75 fmt::Debug,
76 ops::{Deref, DerefMut, Not, Range, RangeBounds, RangeInclusive},
77 sync::Arc,
78};
79use tracing::{debug, trace};
80
81pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
83
84#[derive(Debug)]
89pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
90 pub DatabaseProvider<<DB as Database>::TXMut, N>,
91);
92
93impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
94 type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
95
96 fn deref(&self) -> &Self::Target {
97 &self.0
98 }
99}
100
101impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
102 fn deref_mut(&mut self) -> &mut Self::Target {
103 &mut self.0
104 }
105}
106
107impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
108 for DatabaseProviderRW<DB, N>
109{
110 fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
111 &self.0
112 }
113}
114
115impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
116 pub fn commit(self) -> ProviderResult<bool> {
118 self.0.commit()
119 }
120
121 pub fn into_tx(self) -> <DB as Database>::TXMut {
123 self.0.into_tx()
124 }
125}
126
127impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
128 for DatabaseProvider<<DB as Database>::TXMut, N>
129{
130 fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
131 provider.0
132 }
133}
134
135#[derive(Debug)]
138pub struct DatabaseProvider<TX, N: NodeTypes> {
139 tx: TX,
141 chain_spec: Arc<N::ChainSpec>,
143 static_file_provider: StaticFileProvider<N::Primitives>,
145 prune_modes: PruneModes,
147 storage: Arc<N::Storage>,
149}
150
151impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
152 pub const fn prune_modes_ref(&self) -> &PruneModes {
154 &self.prune_modes
155 }
156}
157
158impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
159 pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
161 trace!(target: "providers::db", "Returning latest state provider");
162 Box::new(LatestStateProviderRef::new(self))
163 }
164
165 pub fn history_by_block_hash<'a>(
167 &'a self,
168 block_hash: BlockHash,
169 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
170 let mut block_number =
171 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
172 if block_number == self.best_block_number().unwrap_or_default() &&
173 block_number == self.last_block_number().unwrap_or_default()
174 {
175 return Ok(Box::new(LatestStateProviderRef::new(self)))
176 }
177
178 block_number += 1;
180
181 let account_history_prune_checkpoint =
182 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
183 let storage_history_prune_checkpoint =
184 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
185
186 let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
187
188 if let Some(prune_checkpoint_block_number) =
191 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
192 {
193 state_provider = state_provider.with_lowest_available_account_history_block_number(
194 prune_checkpoint_block_number + 1,
195 );
196 }
197 if let Some(prune_checkpoint_block_number) =
198 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
199 {
200 state_provider = state_provider.with_lowest_available_storage_history_block_number(
201 prune_checkpoint_block_number + 1,
202 );
203 }
204
205 Ok(Box::new(state_provider))
206 }
207
208 #[cfg(feature = "test-utils")]
209 pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
211 self.prune_modes = prune_modes;
212 }
213}
214
215impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
216 type Primitives = N::Primitives;
217}
218
219impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
220 fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
222 self.static_file_provider.clone()
223 }
224}
225
226impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
227 for DatabaseProvider<TX, N>
228{
229 type ChainSpec = N::ChainSpec;
230
231 fn chain_spec(&self) -> Arc<Self::ChainSpec> {
232 self.chain_spec.clone()
233 }
234}
235
236impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
237 pub const fn new_rw(
239 tx: TX,
240 chain_spec: Arc<N::ChainSpec>,
241 static_file_provider: StaticFileProvider<N::Primitives>,
242 prune_modes: PruneModes,
243 storage: Arc<N::Storage>,
244 ) -> Self {
245 Self { tx, chain_spec, static_file_provider, prune_modes, storage }
246 }
247}
248
249impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
250 fn as_ref(&self) -> &Self {
251 self
252 }
253}
254
255impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
256 pub fn save_blocks(
258 &self,
259 blocks: Vec<ExecutedBlockWithTrieUpdates<N::Primitives>>,
260 ) -> ProviderResult<()> {
261 if blocks.is_empty() {
262 debug!(target: "providers::db", "Attempted to write empty block range");
263 return Ok(())
264 }
265
266 let first_block = blocks.first().unwrap().recovered_block();
268
269 let last_block = blocks.last().unwrap().recovered_block();
270 let first_number = first_block.number();
271 let last_block_number = last_block.number();
272
273 debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage");
274
275 for ExecutedBlockWithTrieUpdates {
285 block: ExecutedBlock { recovered_block, execution_output, hashed_state },
286 trie,
287 } in blocks
288 {
289 let block_hash = recovered_block.hash();
290 self.insert_block(Arc::unwrap_or_clone(recovered_block))?;
291
292 self.write_state(&execution_output, OriginalValuesKnown::No)?;
295
296 self.write_hashed_state(&Arc::unwrap_or_clone(hashed_state).into_sorted())?;
298 self.write_trie_updates(
299 trie.as_ref().ok_or(ProviderError::MissingTrieUpdates(block_hash))?,
300 )?;
301 }
302
303 self.update_history_indices(first_number..=last_block_number)?;
305
306 self.update_pipeline_stages(last_block_number, false)?;
308
309 debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
310
311 Ok(())
312 }
313
314 pub fn unwind_trie_state_range(
319 &self,
320 range: RangeInclusive<BlockNumber>,
321 ) -> ProviderResult<()> {
322 let changed_accounts = self
323 .tx
324 .cursor_read::<tables::AccountChangeSets>()?
325 .walk_range(range.clone())?
326 .collect::<Result<Vec<_>, _>>()?;
327
328 let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
330 let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
331 let mut destroyed_accounts = HashSet::default();
332 for (hashed_address, account) in hashed_addresses {
333 account_prefix_set.insert(Nibbles::unpack(hashed_address));
334 if account.is_none() {
335 destroyed_accounts.insert(hashed_address);
336 }
337 }
338
339 self.unwind_account_history_indices(changed_accounts.iter())?;
341 let storage_range = BlockNumberAddress::range(range.clone());
342
343 let changed_storages = self
344 .tx
345 .cursor_read::<tables::StorageChangeSets>()?
346 .walk_range(storage_range)?
347 .collect::<Result<Vec<_>, _>>()?;
348
349 let mut storage_prefix_sets = B256Map::<PrefixSet>::default();
352 let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
353 for (hashed_address, hashed_slots) in storage_entries {
354 account_prefix_set.insert(Nibbles::unpack(hashed_address));
355 let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
356 for slot in hashed_slots {
357 storage_prefix_set.insert(Nibbles::unpack(slot));
358 }
359 storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
360 }
361
362 self.unwind_storage_history_indices(changed_storages.iter().copied())?;
364
365 let prefix_sets = TriePrefixSets {
369 account_prefix_set: account_prefix_set.freeze(),
370 storage_prefix_sets,
371 destroyed_accounts,
372 };
373 let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
374 .with_prefix_sets(prefix_sets)
375 .root_with_updates()
376 .map_err(reth_db_api::DatabaseError::from)?;
377
378 let parent_number = range.start().saturating_sub(1);
379 let parent_state_root = self
380 .header_by_number(parent_number)?
381 .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
382 .state_root();
383
384 if new_state_root != parent_state_root {
387 let parent_hash = self
388 .block_hash(parent_number)?
389 .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
390 return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
391 root: GotExpected { got: new_state_root, expected: parent_state_root },
392 block_number: parent_number,
393 block_hash: parent_hash,
394 })))
395 }
396 self.write_trie_updates(&trie_updates)?;
397
398 Ok(())
399 }
400
401 fn remove_receipts_from(
403 &self,
404 from_tx: TxNumber,
405 last_block: BlockNumber,
406 ) -> ProviderResult<()> {
407 self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
409
410 if !self.prune_modes.has_receipts_pruning() {
411 let static_file_receipt_num =
412 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
413
414 let to_delete = static_file_receipt_num
415 .map(|static_num| (static_num + 1).saturating_sub(from_tx))
416 .unwrap_or_default();
417
418 self.static_file_provider
419 .latest_writer(StaticFileSegment::Receipts)?
420 .prune_receipts(to_delete, last_block)?;
421 }
422
423 Ok(())
424 }
425}
426
427impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
428 fn try_into_history_at_block(
429 self,
430 mut block_number: BlockNumber,
431 ) -> ProviderResult<StateProviderBox> {
432 if block_number == self.best_block_number().unwrap_or_default() {
435 return Ok(Box::new(LatestStateProvider::new(self)))
436 }
437
438 block_number += 1;
440
441 let account_history_prune_checkpoint =
442 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
443 let storage_history_prune_checkpoint =
444 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
445
446 let mut state_provider = HistoricalStateProvider::new(self, block_number);
447
448 if let Some(prune_checkpoint_block_number) =
451 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
452 {
453 state_provider = state_provider.with_lowest_available_account_history_block_number(
454 prune_checkpoint_block_number + 1,
455 );
456 }
457 if let Some(prune_checkpoint_block_number) =
458 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
459 {
460 state_provider = state_provider.with_lowest_available_storage_history_block_number(
461 prune_checkpoint_block_number + 1,
462 );
463 }
464
465 Ok(Box::new(state_provider))
466 }
467}
468
469fn unwind_history_shards<S, T, C>(
484 cursor: &mut C,
485 start_key: T::Key,
486 block_number: BlockNumber,
487 mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
488) -> ProviderResult<Vec<u64>>
489where
490 T: Table<Value = BlockNumberList>,
491 T::Key: AsRef<ShardedKey<S>>,
492 C: DbCursorRO<T> + DbCursorRW<T>,
493{
494 let mut item = cursor.seek_exact(start_key)?;
496 while let Some((sharded_key, list)) = item {
497 if !shard_belongs_to_key(&sharded_key) {
499 break
500 }
501
502 cursor.delete_current()?;
505
506 let first = list.iter().next().expect("List can't be empty");
509
510 if first >= block_number {
513 item = cursor.prev()?;
514 continue
515 }
516 else if block_number <= sharded_key.as_ref().highest_block_number {
519 return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
522 }
523 return Ok(list.iter().collect::<Vec<_>>())
526 }
527
528 Ok(Vec::new())
530}
531
532impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
533 pub const fn new(
535 tx: TX,
536 chain_spec: Arc<N::ChainSpec>,
537 static_file_provider: StaticFileProvider<N::Primitives>,
538 prune_modes: PruneModes,
539 storage: Arc<N::Storage>,
540 ) -> Self {
541 Self { tx, chain_spec, static_file_provider, prune_modes, storage }
542 }
543
544 pub fn into_tx(self) -> TX {
546 self.tx
547 }
548
549 pub const fn tx_mut(&mut self) -> &mut TX {
551 &mut self.tx
552 }
553
554 pub const fn tx_ref(&self) -> &TX {
556 &self.tx
557 }
558
559 pub fn chain_spec(&self) -> &N::ChainSpec {
561 &self.chain_spec
562 }
563}
564
565impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
566 fn recovered_block<H, HF, B, BF>(
567 &self,
568 id: BlockHashOrNumber,
569 _transaction_kind: TransactionVariant,
570 header_by_number: HF,
571 construct_block: BF,
572 ) -> ProviderResult<Option<B>>
573 where
574 H: AsRef<HeaderTy<N>>,
575 HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
576 BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
577 {
578 let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
579 let Some(header) = header_by_number(block_number)? else { return Ok(None) };
580
581 let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
588
589 let tx_range = body.tx_num_range();
590
591 let (transactions, senders) = if tx_range.is_empty() {
592 (vec![], vec![])
593 } else {
594 (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
595 };
596
597 let body = self
598 .storage
599 .reader()
600 .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
601 .pop()
602 .ok_or(ProviderError::InvalidStorageOutput)?;
603
604 construct_block(header, body, senders)
605 }
606
607 fn block_range<F, H, HF, R>(
617 &self,
618 range: RangeInclusive<BlockNumber>,
619 headers_range: HF,
620 mut assemble_block: F,
621 ) -> ProviderResult<Vec<R>>
622 where
623 H: AsRef<HeaderTy<N>>,
624 HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
625 F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
626 {
627 if range.is_empty() {
628 return Ok(Vec::new())
629 }
630
631 let len = range.end().saturating_sub(*range.start()) as usize;
632 let mut blocks = Vec::with_capacity(len);
633
634 let headers = headers_range(range.clone())?;
635
636 let present_headers = self
642 .block_body_indices_range(range)?
643 .into_iter()
644 .map(|b| b.tx_num_range())
645 .zip(headers)
646 .collect::<Vec<_>>();
647
648 let mut inputs = Vec::new();
649 for (tx_range, header) in &present_headers {
650 let transactions = if tx_range.is_empty() {
651 Vec::new()
652 } else {
653 self.transactions_by_tx_range(tx_range.clone())?
654 };
655
656 inputs.push((header.as_ref(), transactions));
657 }
658
659 let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
660
661 for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
662 blocks.push(assemble_block(header, body, tx_range)?);
663 }
664
665 Ok(blocks)
666 }
667
668 fn block_with_senders_range<H, HF, B, BF>(
679 &self,
680 range: RangeInclusive<BlockNumber>,
681 headers_range: HF,
682 assemble_block: BF,
683 ) -> ProviderResult<Vec<B>>
684 where
685 H: AsRef<HeaderTy<N>>,
686 HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
687 BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
688 {
689 let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;
690
691 self.block_range(range, headers_range, |header, body, tx_range| {
692 let senders = if tx_range.is_empty() {
693 Vec::new()
694 } else {
695 let known_senders =
697 senders_cursor
698 .walk_range(tx_range.clone())?
699 .collect::<Result<HashMap<_, _>, _>>()?;
700
701 let mut senders = Vec::with_capacity(body.transactions().len());
702 for (tx_num, tx) in tx_range.zip(body.transactions()) {
703 match known_senders.get(&tx_num) {
704 None => {
705 let sender = tx.recover_signer_unchecked()?;
707 senders.push(sender);
708 }
709 Some(sender) => senders.push(*sender),
710 }
711 }
712
713 senders
714 };
715
716 assemble_block(header, body, senders)
717 })
718 }
719
720 fn populate_bundle_state<A, S>(
724 &self,
725 account_changeset: Vec<(u64, AccountBeforeTx)>,
726 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
727 plain_accounts_cursor: &mut A,
728 plain_storage_cursor: &mut S,
729 ) -> ProviderResult<(BundleStateInit, RevertsInit)>
730 where
731 A: DbCursorRO<PlainAccountState>,
732 S: DbDupCursorRO<PlainStorageState>,
733 {
734 let mut state: BundleStateInit = HashMap::default();
738
739 let mut reverts: RevertsInit = HashMap::default();
745
746 for (block_number, account_before) in account_changeset.into_iter().rev() {
748 let AccountBeforeTx { info: old_info, address } = account_before;
749 match state.entry(address) {
750 hash_map::Entry::Vacant(entry) => {
751 let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
752 entry.insert((old_info, new_info, HashMap::default()));
753 }
754 hash_map::Entry::Occupied(mut entry) => {
755 entry.get_mut().0 = old_info;
757 }
758 }
759 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
761 }
762
763 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
765 let BlockNumberAddress((block_number, address)) = block_and_address;
766 let account_state = match state.entry(address) {
768 hash_map::Entry::Vacant(entry) => {
769 let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
770 entry.insert((present_info, present_info, HashMap::default()))
771 }
772 hash_map::Entry::Occupied(entry) => entry.into_mut(),
773 };
774
775 match account_state.2.entry(old_storage.key) {
777 hash_map::Entry::Vacant(entry) => {
778 let new_storage = plain_storage_cursor
779 .seek_by_key_subkey(address, old_storage.key)?
780 .filter(|storage| storage.key == old_storage.key)
781 .unwrap_or_default();
782 entry.insert((old_storage.value, new_storage.value));
783 }
784 hash_map::Entry::Occupied(mut entry) => {
785 entry.get_mut().0 = old_storage.value;
786 }
787 };
788
789 reverts
790 .entry(block_number)
791 .or_default()
792 .entry(address)
793 .or_default()
794 .1
795 .push(old_storage);
796 }
797
798 Ok((state, reverts))
799 }
800}
801
802impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
803 fn take_shard<T>(
806 &self,
807 cursor: &mut <TX as DbTxMut>::CursorMut<T>,
808 key: T::Key,
809 ) -> ProviderResult<Vec<u64>>
810 where
811 T: Table<Value = BlockNumberList>,
812 {
813 if let Some((_, list)) = cursor.seek_exact(key)? {
814 cursor.delete_current()?;
816 let list = list.iter().collect::<Vec<_>>();
817 return Ok(list)
818 }
819 Ok(Vec::new())
820 }
821
822 fn append_history_index<P, T>(
830 &self,
831 index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
832 mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
833 ) -> ProviderResult<()>
834 where
835 P: Copy,
836 T: Table<Value = BlockNumberList>,
837 {
838 let mut cursor = self.tx.cursor_write::<T>()?;
839 for (partial_key, indices) in index_updates {
840 let mut last_shard =
841 self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
842 last_shard.extend(indices);
843 let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
845 while let Some(list) = chunks.next() {
846 let highest_block_number = if chunks.peek().is_some() {
847 *list.last().expect("`chunks` does not return empty list")
848 } else {
849 u64::MAX
851 };
852 cursor.insert(
853 sharded_key_factory(partial_key, highest_block_number),
854 &BlockNumberList::new_pre_sorted(list.iter().copied()),
855 )?;
856 }
857 }
858 Ok(())
859 }
860}
861
862impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
863 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
864 Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
865 }
866}
867
868impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
869 fn changed_accounts_with_range(
870 &self,
871 range: impl RangeBounds<BlockNumber>,
872 ) -> ProviderResult<BTreeSet<Address>> {
873 self.tx
874 .cursor_read::<tables::AccountChangeSets>()?
875 .walk_range(range)?
876 .map(|entry| {
877 entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
878 })
879 .collect()
880 }
881
882 fn basic_accounts(
883 &self,
884 iter: impl IntoIterator<Item = Address>,
885 ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
886 let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
887 Ok(iter
888 .into_iter()
889 .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
890 .collect::<Result<Vec<_>, _>>()?)
891 }
892
893 fn changed_accounts_and_blocks_with_range(
894 &self,
895 range: RangeInclusive<BlockNumber>,
896 ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
897 let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
898
899 let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
900 BTreeMap::new(),
901 |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
902 let (index, account) = entry?;
903 accounts.entry(account.address).or_default().push(index);
904 Ok(accounts)
905 },
906 )?;
907
908 Ok(account_transitions)
909 }
910}
911
912impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
913 fn storage_changeset(
914 &self,
915 block_number: BlockNumber,
916 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
917 let range = block_number..=block_number;
918 let storage_range = BlockNumberAddress::range(range);
919 self.tx
920 .cursor_dup_read::<tables::StorageChangeSets>()?
921 .walk_range(storage_range)?
922 .map(|result| -> ProviderResult<_> { Ok(result?) })
923 .collect()
924 }
925}
926
927impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
928 fn account_block_changeset(
929 &self,
930 block_number: BlockNumber,
931 ) -> ProviderResult<Vec<AccountBeforeTx>> {
932 let range = block_number..=block_number;
933 self.tx
934 .cursor_read::<tables::AccountChangeSets>()?
935 .walk_range(range)?
936 .map(|result| -> ProviderResult<_> {
937 let (_, account_before) = result?;
938 Ok(account_before)
939 })
940 .collect()
941 }
942
943 fn get_account_before_block(
944 &self,
945 block_number: BlockNumber,
946 address: Address,
947 ) -> ProviderResult<Option<AccountBeforeTx>> {
948 self.tx
949 .cursor_dup_read::<tables::AccountChangeSets>()?
950 .seek_by_key_subkey(block_number, address)?
951 .filter(|acc| acc.address == address)
952 .map(Ok)
953 .transpose()
954 }
955}
956
957impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
958 for DatabaseProvider<TX, N>
959{
960 type Header = HeaderTy<N>;
961
962 fn local_tip_header(
963 &self,
964 highest_uninterrupted_block: BlockNumber,
965 ) -> ProviderResult<SealedHeader<Self::Header>> {
966 let static_file_provider = self.static_file_provider();
967
968 let next_static_file_block_num = static_file_provider
971 .get_highest_static_file_block(StaticFileSegment::Headers)
972 .map(|id| id + 1)
973 .unwrap_or_default();
974 let next_block = highest_uninterrupted_block + 1;
975
976 match next_static_file_block_num.cmp(&next_block) {
977 Ordering::Greater => {
980 let mut static_file_producer =
981 static_file_provider.latest_writer(StaticFileSegment::Headers)?;
982 static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
983 static_file_producer.commit()?
986 }
987 Ordering::Less => {
988 return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
990 }
991 Ordering::Equal => {}
992 }
993
994 let local_head = static_file_provider
995 .sealed_header(highest_uninterrupted_block)?
996 .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
997
998 Ok(local_head)
999 }
1000}
1001
1002impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1003 type Header = HeaderTy<N>;
1004
1005 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1006 if let Some(num) = self.block_number(block_hash)? {
1007 Ok(self.header_by_number(num)?)
1008 } else {
1009 Ok(None)
1010 }
1011 }
1012
1013 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1014 self.static_file_provider.header_by_number(num)
1015 }
1016
1017 fn header_td(&self, block_hash: BlockHash) -> ProviderResult<Option<U256>> {
1018 if let Some(num) = self.block_number(block_hash)? {
1019 self.header_td_by_number(num)
1020 } else {
1021 Ok(None)
1022 }
1023 }
1024
1025 fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
1026 if self.chain_spec.is_paris_active_at_block(number) &&
1027 let Some(td) = self.chain_spec.final_paris_total_difficulty()
1028 {
1029 return Ok(Some(td))
1032 }
1033
1034 self.static_file_provider.header_td_by_number(number)
1035 }
1036
1037 fn headers_range(
1038 &self,
1039 range: impl RangeBounds<BlockNumber>,
1040 ) -> ProviderResult<Vec<Self::Header>> {
1041 self.static_file_provider.headers_range(range)
1042 }
1043
1044 fn sealed_header(
1045 &self,
1046 number: BlockNumber,
1047 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1048 self.static_file_provider.sealed_header(number)
1049 }
1050
1051 fn sealed_headers_while(
1052 &self,
1053 range: impl RangeBounds<BlockNumber>,
1054 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1055 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1056 self.static_file_provider.sealed_headers_while(range, predicate)
1057 }
1058}
1059
1060impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1061 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1062 self.static_file_provider.block_hash(number)
1063 }
1064
1065 fn canonical_hashes_range(
1066 &self,
1067 start: BlockNumber,
1068 end: BlockNumber,
1069 ) -> ProviderResult<Vec<B256>> {
1070 self.static_file_provider.canonical_hashes_range(start, end)
1071 }
1072}
1073
1074impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1075 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1076 let best_number = self.best_block_number()?;
1077 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1078 Ok(ChainInfo { best_hash, best_number })
1079 }
1080
1081 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1082 Ok(self
1085 .get_stage_checkpoint(StageId::Finish)?
1086 .map(|checkpoint| checkpoint.block_number)
1087 .unwrap_or_default())
1088 }
1089
1090 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1091 self.static_file_provider.last_block_number()
1092 }
1093
1094 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1095 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1096 }
1097}
1098
1099impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1100 type Block = BlockTy<N>;
1101
1102 fn find_block_by_hash(
1103 &self,
1104 hash: B256,
1105 source: BlockSource,
1106 ) -> ProviderResult<Option<Self::Block>> {
1107 if source.is_canonical() {
1108 self.block(hash.into())
1109 } else {
1110 Ok(None)
1111 }
1112 }
1113
1114 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1120 if let Some(number) = self.convert_hash_or_number(id)? &&
1121 let Some(header) = self.header_by_number(number)?
1122 {
1123 let Some(transactions) = self.transactions_by_block(number.into())? else {
1128 return Ok(None)
1129 };
1130
1131 let body = self
1132 .storage
1133 .reader()
1134 .read_block_bodies(self, vec![(&header, transactions)])?
1135 .pop()
1136 .ok_or(ProviderError::InvalidStorageOutput)?;
1137
1138 return Ok(Some(Self::Block::new(header, body)))
1139 }
1140
1141 Ok(None)
1142 }
1143
1144 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1145 Ok(None)
1146 }
1147
1148 fn pending_block_and_receipts(
1149 &self,
1150 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1151 Ok(None)
1152 }
1153
1154 fn recovered_block(
1163 &self,
1164 id: BlockHashOrNumber,
1165 transaction_kind: TransactionVariant,
1166 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1167 self.recovered_block(
1168 id,
1169 transaction_kind,
1170 |block_number| self.header_by_number(block_number),
1171 |header, body, senders| {
1172 Self::Block::new(header, body)
1173 .try_into_recovered_unchecked(senders)
1177 .map(Some)
1178 .map_err(|_| ProviderError::SenderRecoveryError)
1179 },
1180 )
1181 }
1182
1183 fn sealed_block_with_senders(
1184 &self,
1185 id: BlockHashOrNumber,
1186 transaction_kind: TransactionVariant,
1187 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1188 self.recovered_block(
1189 id,
1190 transaction_kind,
1191 |block_number| self.sealed_header(block_number),
1192 |header, body, senders| {
1193 Self::Block::new_sealed(header, body)
1194 .try_with_senders_unchecked(senders)
1198 .map(Some)
1199 .map_err(|_| ProviderError::SenderRecoveryError)
1200 },
1201 )
1202 }
1203
1204 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1205 self.block_range(
1206 range,
1207 |range| self.headers_range(range),
1208 |header, body, _| Ok(Self::Block::new(header, body)),
1209 )
1210 }
1211
1212 fn block_with_senders_range(
1213 &self,
1214 range: RangeInclusive<BlockNumber>,
1215 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1216 self.block_with_senders_range(
1217 range,
1218 |range| self.headers_range(range),
1219 |header, body, senders| {
1220 Self::Block::new(header, body)
1221 .try_into_recovered_unchecked(senders)
1222 .map_err(|_| ProviderError::SenderRecoveryError)
1223 },
1224 )
1225 }
1226
1227 fn recovered_block_range(
1228 &self,
1229 range: RangeInclusive<BlockNumber>,
1230 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1231 self.block_with_senders_range(
1232 range,
1233 |range| self.sealed_headers_range(range),
1234 |header, body, senders| {
1235 Self::Block::new_sealed(header, body)
1236 .try_with_senders(senders)
1237 .map_err(|_| ProviderError::SenderRecoveryError)
1238 },
1239 )
1240 }
1241
1242 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1243 Ok(self
1244 .tx
1245 .cursor_read::<tables::TransactionBlocks>()?
1246 .seek(id)
1247 .map(|b| b.map(|(_, bn)| bn))?)
1248 }
1249}
1250
1251impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1252 for DatabaseProvider<TX, N>
1253{
1254 fn transaction_hashes_by_range(
1257 &self,
1258 tx_range: Range<TxNumber>,
1259 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1260 self.static_file_provider.transaction_hashes_by_range(tx_range)
1261 }
1262}
1263
1264impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1266 type Transaction = TxTy<N>;
1267
1268 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1269 Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1270 }
1271
1272 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1273 self.static_file_provider.transaction_by_id(id)
1274 }
1275
1276 fn transaction_by_id_unhashed(
1277 &self,
1278 id: TxNumber,
1279 ) -> ProviderResult<Option<Self::Transaction>> {
1280 self.static_file_provider.transaction_by_id_unhashed(id)
1281 }
1282
1283 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1284 if let Some(id) = self.transaction_id(hash)? {
1285 Ok(self.transaction_by_id_unhashed(id)?)
1286 } else {
1287 Ok(None)
1288 }
1289 }
1290
1291 fn transaction_by_hash_with_meta(
1292 &self,
1293 tx_hash: TxHash,
1294 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1295 if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1296 let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1297 let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1298 let Some(sealed_header) = self.sealed_header(block_number)?
1299 {
1300 let (header, block_hash) = sealed_header.split();
1301 if let Some(block_body) = self.block_body_indices(block_number)? {
1302 let index = transaction_id - block_body.first_tx_num();
1307
1308 let meta = TransactionMeta {
1309 tx_hash,
1310 index,
1311 block_hash,
1312 block_number,
1313 base_fee: header.base_fee_per_gas(),
1314 excess_blob_gas: header.excess_blob_gas(),
1315 timestamp: header.timestamp(),
1316 };
1317
1318 return Ok(Some((transaction, meta)))
1319 }
1320 }
1321
1322 Ok(None)
1323 }
1324
1325 fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1326 let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1327 Ok(cursor.seek(id)?.map(|(_, bn)| bn))
1328 }
1329
1330 fn transactions_by_block(
1331 &self,
1332 id: BlockHashOrNumber,
1333 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1334 if let Some(block_number) = self.convert_hash_or_number(id)? &&
1335 let Some(body) = self.block_body_indices(block_number)?
1336 {
1337 let tx_range = body.tx_num_range();
1338 return if tx_range.is_empty() {
1339 Ok(Some(Vec::new()))
1340 } else {
1341 self.transactions_by_tx_range(tx_range).map(Some)
1342 }
1343 }
1344 Ok(None)
1345 }
1346
1347 fn transactions_by_block_range(
1348 &self,
1349 range: impl RangeBounds<BlockNumber>,
1350 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1351 let range = to_range(range);
1352
1353 self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1354 .into_iter()
1355 .map(|body| {
1356 let tx_num_range = body.tx_num_range();
1357 if tx_num_range.is_empty() {
1358 Ok(Vec::new())
1359 } else {
1360 self.transactions_by_tx_range(tx_num_range)
1361 }
1362 })
1363 .collect()
1364 }
1365
1366 fn transactions_by_tx_range(
1367 &self,
1368 range: impl RangeBounds<TxNumber>,
1369 ) -> ProviderResult<Vec<Self::Transaction>> {
1370 self.static_file_provider.transactions_by_tx_range(range)
1371 }
1372
1373 fn senders_by_tx_range(
1374 &self,
1375 range: impl RangeBounds<TxNumber>,
1376 ) -> ProviderResult<Vec<Address>> {
1377 self.cursor_read_collect::<tables::TransactionSenders>(range)
1378 }
1379
1380 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1381 Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1382 }
1383}
1384
1385impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1386 type Receipt = ReceiptTy<N>;
1387
1388 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1389 self.static_file_provider.get_with_static_file_or_database(
1390 StaticFileSegment::Receipts,
1391 id,
1392 |static_file| static_file.receipt(id),
1393 || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1394 )
1395 }
1396
1397 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1398 if let Some(id) = self.transaction_id(hash)? {
1399 self.receipt(id)
1400 } else {
1401 Ok(None)
1402 }
1403 }
1404
1405 fn receipts_by_block(
1406 &self,
1407 block: BlockHashOrNumber,
1408 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1409 if let Some(number) = self.convert_hash_or_number(block)? &&
1410 let Some(body) = self.block_body_indices(number)?
1411 {
1412 let tx_range = body.tx_num_range();
1413 return if tx_range.is_empty() {
1414 Ok(Some(Vec::new()))
1415 } else {
1416 self.receipts_by_tx_range(tx_range).map(Some)
1417 }
1418 }
1419 Ok(None)
1420 }
1421
1422 fn receipts_by_tx_range(
1423 &self,
1424 range: impl RangeBounds<TxNumber>,
1425 ) -> ProviderResult<Vec<Self::Receipt>> {
1426 self.static_file_provider.get_range_with_static_file_or_database(
1427 StaticFileSegment::Receipts,
1428 to_range(range),
1429 |static_file, range, _| static_file.receipts_by_tx_range(range),
1430 |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1431 |_| true,
1432 )
1433 }
1434
1435 fn receipts_by_block_range(
1436 &self,
1437 block_range: RangeInclusive<BlockNumber>,
1438 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1439 if block_range.is_empty() {
1440 return Ok(Vec::new());
1441 }
1442
1443 let mut block_body_indices = Vec::new();
1445 for block_num in block_range {
1446 if let Some(indices) = self.block_body_indices(block_num)? {
1447 block_body_indices.push(indices);
1448 } else {
1449 block_body_indices.push(StoredBlockBodyIndices::default());
1451 }
1452 }
1453
1454 if block_body_indices.is_empty() {
1455 return Ok(Vec::new());
1456 }
1457
1458 let non_empty_blocks: Vec<_> =
1460 block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
1461
1462 if non_empty_blocks.is_empty() {
1463 return Ok(vec![Vec::new(); block_body_indices.len()]);
1465 }
1466
1467 let first_tx = non_empty_blocks[0].first_tx_num();
1469 let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
1470
1471 let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
1473 let mut receipts_iter = all_receipts.into_iter();
1474
1475 let mut result = Vec::with_capacity(block_body_indices.len());
1477 for indices in &block_body_indices {
1478 if indices.tx_count == 0 {
1479 result.push(Vec::new());
1480 } else {
1481 let block_receipts =
1482 receipts_iter.by_ref().take(indices.tx_count as usize).collect();
1483 result.push(block_receipts);
1484 }
1485 }
1486
1487 Ok(result)
1488 }
1489}
1490
1491impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1492 for DatabaseProvider<TX, N>
1493{
1494 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1495 Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
1496 }
1497
1498 fn block_body_indices_range(
1499 &self,
1500 range: RangeInclusive<BlockNumber>,
1501 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1502 self.cursor_read_collect::<tables::BlockBodyIndices>(range)
1503 }
1504}
1505
1506impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1507 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1508 Ok(if let Some(encoded) = id.get_pre_encoded() {
1509 self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
1510 } else {
1511 self.tx.get::<tables::StageCheckpoints>(id.to_string())?
1512 })
1513 }
1514
1515 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1517 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1518 }
1519
1520 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1521 self.tx
1522 .cursor_read::<tables::StageCheckpoints>()?
1523 .walk(None)?
1524 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1525 .map_err(ProviderError::Database)
1526 }
1527}
1528
1529impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1530 fn save_stage_checkpoint(
1532 &self,
1533 id: StageId,
1534 checkpoint: StageCheckpoint,
1535 ) -> ProviderResult<()> {
1536 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1537 }
1538
1539 fn save_stage_checkpoint_progress(
1541 &self,
1542 id: StageId,
1543 checkpoint: Vec<u8>,
1544 ) -> ProviderResult<()> {
1545 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1546 }
1547
1548 fn update_pipeline_stages(
1549 &self,
1550 block_number: BlockNumber,
1551 drop_stage_checkpoint: bool,
1552 ) -> ProviderResult<()> {
1553 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1555 for stage_id in StageId::ALL {
1556 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1557 cursor.upsert(
1558 stage_id.to_string(),
1559 &StageCheckpoint {
1560 block_number,
1561 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1562 },
1563 )?;
1564 }
1565
1566 Ok(())
1567 }
1568}
1569
1570impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1571 fn plain_state_storages(
1572 &self,
1573 addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1574 ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1575 let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1576
1577 addresses_with_keys
1578 .into_iter()
1579 .map(|(address, storage)| {
1580 storage
1581 .into_iter()
1582 .map(|key| -> ProviderResult<_> {
1583 Ok(plain_storage
1584 .seek_by_key_subkey(address, key)?
1585 .filter(|v| v.key == key)
1586 .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
1587 })
1588 .collect::<ProviderResult<Vec<_>>>()
1589 .map(|storage| (address, storage))
1590 })
1591 .collect::<ProviderResult<Vec<(_, _)>>>()
1592 }
1593
1594 fn changed_storages_with_range(
1595 &self,
1596 range: RangeInclusive<BlockNumber>,
1597 ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1598 self.tx
1599 .cursor_read::<tables::StorageChangeSets>()?
1600 .walk_range(BlockNumberAddress::range(range))?
1601 .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1604 let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1605 accounts.entry(address).or_default().insert(storage_entry.key);
1606 Ok(accounts)
1607 })
1608 }
1609
1610 fn changed_storages_and_blocks_with_range(
1611 &self,
1612 range: RangeInclusive<BlockNumber>,
1613 ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1614 let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1615
1616 let storage_changeset_lists =
1617 changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1618 BTreeMap::new(),
1619 |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1620 let (index, storage) = entry?;
1621 storages
1622 .entry((index.address(), storage.key))
1623 .or_default()
1624 .push(index.block_number());
1625 Ok(storages)
1626 },
1627 )?;
1628
1629 Ok(storage_changeset_lists)
1630 }
1631}
1632
1633impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1634 for DatabaseProvider<TX, N>
1635{
1636 type Receipt = ReceiptTy<N>;
1637
1638 fn write_state(
1639 &self,
1640 execution_outcome: &ExecutionOutcome<Self::Receipt>,
1641 is_value_known: OriginalValuesKnown,
1642 ) -> ProviderResult<()> {
1643 let first_block = execution_outcome.first_block();
1644 let block_count = execution_outcome.len() as u64;
1645 let last_block = execution_outcome.last_block();
1646 let block_range = first_block..=last_block;
1647
1648 let tip = self.last_block_number()?.max(last_block);
1649
1650 let (plain_state, reverts) =
1651 execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1652
1653 self.write_state_reverts(reverts, first_block)?;
1654 self.write_state_changes(plain_state)?;
1655
1656 let block_indices: Vec<_> = self
1658 .block_body_indices_range(block_range)?
1659 .into_iter()
1660 .map(|b| b.first_tx_num)
1661 .collect();
1662
1663 if block_indices.len() < block_count as usize {
1665 let missing_blocks = block_count - block_indices.len() as u64;
1666 return Err(ProviderError::BlockBodyIndicesNotFound(
1667 last_block.saturating_sub(missing_blocks - 1),
1668 ));
1669 }
1670
1671 let has_receipts_pruning = self.prune_modes.has_receipts_pruning();
1672
1673 let mut receipts_cursor = self.tx.cursor_write::<tables::Receipts<Self::Receipt>>()?;
1678
1679 let mut receipts_static_writer = has_receipts_pruning
1683 .not()
1684 .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
1685 .transpose()?;
1686
1687 let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1688 let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1689
1690 let prunable_receipts =
1693 PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);
1694
1695 let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1697 for (_, addresses) in contract_log_pruner.range(..first_block) {
1698 allowed_addresses.extend(addresses.iter().copied());
1699 }
1700
1701 for (idx, (receipts, first_tx_index)) in
1702 execution_outcome.receipts.iter().zip(block_indices).enumerate()
1703 {
1704 let block_number = first_block + idx as u64;
1705
1706 if let Some(writer) = receipts_static_writer.as_mut() {
1708 writer.increment_block(block_number)?;
1709 }
1710
1711 if prunable_receipts &&
1713 self.prune_modes
1714 .receipts
1715 .is_some_and(|mode| mode.should_prune(block_number, tip))
1716 {
1717 continue
1718 }
1719
1720 if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1722 allowed_addresses.extend(new_addresses.iter().copied());
1723 }
1724
1725 for (idx, receipt) in receipts.iter().enumerate() {
1726 let receipt_idx = first_tx_index + idx as u64;
1727 if prunable_receipts &&
1730 has_contract_log_filter &&
1731 !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1732 {
1733 continue
1734 }
1735
1736 if let Some(writer) = &mut receipts_static_writer {
1737 writer.append_receipt(receipt_idx, receipt)?;
1738 }
1739
1740 receipts_cursor.append(receipt_idx, receipt)?;
1741 }
1742 }
1743
1744 Ok(())
1745 }
1746
1747 fn write_state_reverts(
1748 &self,
1749 reverts: PlainStateReverts,
1750 first_block: BlockNumber,
1751 ) -> ProviderResult<()> {
1752 tracing::trace!("Writing storage changes");
1754 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1755 let mut storage_changeset_cursor =
1756 self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1757 for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1758 let block_number = first_block + block_index as BlockNumber;
1759
1760 tracing::trace!(block_number, "Writing block change");
1761 storage_changes.par_sort_unstable_by_key(|a| a.address);
1763 for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1764 let storage_id = BlockNumberAddress((block_number, address));
1765
1766 let mut storage = storage_revert
1767 .into_iter()
1768 .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1769 .collect::<Vec<_>>();
1770 storage.par_sort_unstable_by_key(|a| a.0);
1772
1773 let mut wiped_storage = Vec::new();
1777 if wiped {
1778 tracing::trace!(?address, "Wiping storage");
1779 if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1780 wiped_storage.push((entry.key, entry.value));
1781 while let Some(entry) = storages_cursor.next_dup_val()? {
1782 wiped_storage.push((entry.key, entry.value))
1783 }
1784 }
1785 }
1786
1787 tracing::trace!(?address, ?storage, "Writing storage reverts");
1788 for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1789 storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1790 }
1791 }
1792 }
1793
1794 tracing::trace!("Writing account changes");
1796 let mut account_changeset_cursor =
1797 self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1798
1799 for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1800 let block_number = first_block + block_index as BlockNumber;
1801 account_block_reverts.par_sort_by_key(|a| a.0);
1803
1804 for (address, info) in account_block_reverts {
1805 account_changeset_cursor.append_dup(
1806 block_number,
1807 AccountBeforeTx { address, info: info.map(Into::into) },
1808 )?;
1809 }
1810 }
1811
1812 Ok(())
1813 }
1814
1815 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1816 changes.accounts.par_sort_by_key(|a| a.0);
1819 changes.storage.par_sort_by_key(|a| a.address);
1820 changes.contracts.par_sort_by_key(|a| a.0);
1821
1822 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1824 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1825 for (address, account) in changes.accounts {
1827 if let Some(account) = account {
1828 tracing::trace!(?address, "Updating plain state account");
1829 accounts_cursor.upsert(address, &account.into())?;
1830 } else if accounts_cursor.seek_exact(address)?.is_some() {
1831 tracing::trace!(?address, "Deleting plain state account");
1832 accounts_cursor.delete_current()?;
1833 }
1834 }
1835
1836 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1838 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1839 for (hash, bytecode) in changes.contracts {
1840 bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1841 }
1842
1843 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1845 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1846 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1847 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1849 storages_cursor.delete_current_duplicates()?;
1850 }
1851 let mut storage = storage
1853 .into_iter()
1854 .map(|(k, value)| StorageEntry { key: k.into(), value })
1855 .collect::<Vec<_>>();
1856 storage.par_sort_unstable_by_key(|a| a.key);
1858
1859 for entry in storage {
1860 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
1861 if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? &&
1862 db_entry.key == entry.key
1863 {
1864 storages_cursor.delete_current()?;
1865 }
1866
1867 if !entry.value.is_zero() {
1868 storages_cursor.upsert(address, &entry)?;
1869 }
1870 }
1871 }
1872
1873 Ok(())
1874 }
1875
1876 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
1877 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
1879 for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
1880 if let Some(account) = account {
1881 hashed_accounts_cursor.upsert(hashed_address, &account)?;
1882 } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
1883 hashed_accounts_cursor.delete_current()?;
1884 }
1885 }
1886
1887 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
1889 let mut hashed_storage_cursor =
1890 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
1891 for (hashed_address, storage) in sorted_storages {
1892 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
1893 hashed_storage_cursor.delete_current_duplicates()?;
1894 }
1895
1896 for (hashed_slot, value) in storage.storage_slots_sorted() {
1897 let entry = StorageEntry { key: hashed_slot, value };
1898 if let Some(db_entry) =
1899 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
1900 db_entry.key == entry.key
1901 {
1902 hashed_storage_cursor.delete_current()?;
1903 }
1904
1905 if !entry.value.is_zero() {
1906 hashed_storage_cursor.upsert(*hashed_address, &entry)?;
1907 }
1908 }
1909 }
1910
1911 Ok(())
1912 }
1913
1914 fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
1936 let range = block + 1..=self.last_block_number()?;
1937
1938 if range.is_empty() {
1939 return Ok(());
1940 }
1941
1942 let block_bodies = self.block_body_indices_range(range.clone())?;
1944
1945 let from_transaction_num =
1947 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
1948
1949 let storage_range = BlockNumberAddress::range(range.clone());
1950
1951 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
1952 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
1953
1954 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
1959 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
1960
1961 let (state, _) = self.populate_bundle_state(
1962 account_changeset,
1963 storage_changeset,
1964 &mut plain_accounts_cursor,
1965 &mut plain_storage_cursor,
1966 )?;
1967
1968 for (address, (old_account, new_account, storage)) in &state {
1970 if old_account != new_account {
1972 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
1973 if let Some(account) = old_account {
1974 plain_accounts_cursor.upsert(*address, account)?;
1975 } else if existing_entry.is_some() {
1976 plain_accounts_cursor.delete_current()?;
1977 }
1978 }
1979
1980 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
1982 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
1983 if plain_storage_cursor
1986 .seek_by_key_subkey(*address, *storage_key)?
1987 .filter(|s| s.key == *storage_key)
1988 .is_some()
1989 {
1990 plain_storage_cursor.delete_current()?
1991 }
1992
1993 if !old_storage_value.is_zero() {
1995 plain_storage_cursor.upsert(*address, &storage_entry)?;
1996 }
1997 }
1998 }
1999
2000 self.remove_receipts_from(from_transaction_num, block)?;
2001
2002 Ok(())
2003 }
2004
2005 fn take_state_above(
2027 &self,
2028 block: BlockNumber,
2029 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2030 let range = block + 1..=self.last_block_number()?;
2031
2032 if range.is_empty() {
2033 return Ok(ExecutionOutcome::default())
2034 }
2035 let start_block_number = *range.start();
2036
2037 let block_bodies = self.block_body_indices_range(range.clone())?;
2039
2040 let from_transaction_num =
2042 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2043 let to_transaction_num =
2044 block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2045
2046 let storage_range = BlockNumberAddress::range(range.clone());
2047
2048 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2049 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2050
2051 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2056 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2057
2058 let (state, reverts) = self.populate_bundle_state(
2061 account_changeset,
2062 storage_changeset,
2063 &mut plain_accounts_cursor,
2064 &mut plain_storage_cursor,
2065 )?;
2066
2067 for (address, (old_account, new_account, storage)) in &state {
2069 if old_account != new_account {
2071 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2072 if let Some(account) = old_account {
2073 plain_accounts_cursor.upsert(*address, account)?;
2074 } else if existing_entry.is_some() {
2075 plain_accounts_cursor.delete_current()?;
2076 }
2077 }
2078
2079 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2081 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2082 if plain_storage_cursor
2085 .seek_by_key_subkey(*address, *storage_key)?
2086 .filter(|s| s.key == *storage_key)
2087 .is_some()
2088 {
2089 plain_storage_cursor.delete_current()?
2090 }
2091
2092 if !old_storage_value.is_zero() {
2094 plain_storage_cursor.upsert(*address, &storage_entry)?;
2095 }
2096 }
2097 }
2098
2099 let mut receipts_iter = self
2101 .static_file_provider
2102 .get_range_with_static_file_or_database(
2103 StaticFileSegment::Receipts,
2104 from_transaction_num..to_transaction_num + 1,
2105 |static_file, range, _| {
2106 static_file
2107 .receipts_by_tx_range(range.clone())
2108 .map(|r| range.into_iter().zip(r).collect())
2109 },
2110 |range, _| {
2111 self.tx
2112 .cursor_read::<tables::Receipts<Self::Receipt>>()?
2113 .walk_range(range)?
2114 .map(|r| r.map_err(Into::into))
2115 .collect()
2116 },
2117 |_| true,
2118 )?
2119 .into_iter()
2120 .peekable();
2121
2122 let mut receipts = Vec::with_capacity(block_bodies.len());
2123 for block_body in block_bodies {
2125 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2126 for num in block_body.tx_num_range() {
2127 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2128 block_receipts.push(receipts_iter.next().unwrap().1);
2129 }
2130 }
2131 receipts.push(block_receipts);
2132 }
2133
2134 self.remove_receipts_from(from_transaction_num, block)?;
2135
2136 Ok(ExecutionOutcome::new_init(
2137 state,
2138 reverts,
2139 Vec::new(),
2140 receipts,
2141 start_block_number,
2142 Vec::new(),
2143 ))
2144 }
2145}
2146
2147impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2148 fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2150 if trie_updates.is_empty() {
2151 return Ok(0)
2152 }
2153
2154 let mut num_entries = 0;
2156
2157 let mut account_updates = trie_updates
2159 .removed_nodes_ref()
2160 .iter()
2161 .filter_map(|n| {
2162 (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2163 })
2164 .collect::<Vec<_>>();
2165 account_updates.extend(
2166 trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2167 );
2168 account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2170
2171 let tx = self.tx_ref();
2172 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2173 for (key, updated_node) in account_updates {
2174 let nibbles = StoredNibbles(*key);
2175 match updated_node {
2176 Some(node) => {
2177 if !nibbles.0.is_empty() {
2178 num_entries += 1;
2179 account_trie_cursor.upsert(nibbles, node)?;
2180 }
2181 }
2182 None => {
2183 num_entries += 1;
2184 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2185 account_trie_cursor.delete_current()?;
2186 }
2187 }
2188 }
2189 }
2190
2191 num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref().iter())?;
2192
2193 Ok(num_entries)
2194 }
2195}
2196
2197impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2198 fn write_storage_trie_updates<'a>(
2201 &self,
2202 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdates)>,
2203 ) -> ProviderResult<usize> {
2204 let mut num_entries = 0;
2205 let mut storage_tries = storage_tries.collect::<Vec<_>>();
2206 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2207 let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2208 for (hashed_address, storage_trie_updates) in storage_tries {
2209 let mut db_storage_trie_cursor =
2210 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2211 num_entries +=
2212 db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2213 cursor = db_storage_trie_cursor.cursor;
2214 }
2215
2216 Ok(num_entries)
2217 }
2218}
2219
2220impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2221 fn unwind_account_hashing<'a>(
2222 &self,
2223 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2224 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2225 let hashed_accounts = changesets
2229 .into_iter()
2230 .map(|(_, e)| (keccak256(e.address), e.info))
2231 .collect::<Vec<_>>()
2232 .into_iter()
2233 .rev()
2234 .collect::<BTreeMap<_, _>>();
2235
2236 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2238 for (hashed_address, account) in &hashed_accounts {
2239 if let Some(account) = account {
2240 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2241 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2242 hashed_accounts_cursor.delete_current()?;
2243 }
2244 }
2245
2246 Ok(hashed_accounts)
2247 }
2248
2249 fn unwind_account_hashing_range(
2250 &self,
2251 range: impl RangeBounds<BlockNumber>,
2252 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2253 let changesets = self
2254 .tx
2255 .cursor_read::<tables::AccountChangeSets>()?
2256 .walk_range(range)?
2257 .collect::<Result<Vec<_>, _>>()?;
2258 self.unwind_account_hashing(changesets.iter())
2259 }
2260
2261 fn insert_account_for_hashing(
2262 &self,
2263 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2264 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2265 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2266 let hashed_accounts =
2267 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2268 for (hashed_address, account) in &hashed_accounts {
2269 if let Some(account) = account {
2270 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2271 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2272 hashed_accounts_cursor.delete_current()?;
2273 }
2274 }
2275 Ok(hashed_accounts)
2276 }
2277
2278 fn unwind_storage_hashing(
2279 &self,
2280 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2281 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2282 let mut hashed_storages = changesets
2284 .into_iter()
2285 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2286 (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2287 })
2288 .collect::<Vec<_>>();
2289 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2290
2291 let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2293 HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2294 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2295 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2296 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2297
2298 if hashed_storage
2299 .seek_by_key_subkey(hashed_address, key)?
2300 .filter(|entry| entry.key == key)
2301 .is_some()
2302 {
2303 hashed_storage.delete_current()?;
2304 }
2305
2306 if !value.is_zero() {
2307 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2308 }
2309 }
2310 Ok(hashed_storage_keys)
2311 }
2312
2313 fn unwind_storage_hashing_range(
2314 &self,
2315 range: impl RangeBounds<BlockNumberAddress>,
2316 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2317 let changesets = self
2318 .tx
2319 .cursor_read::<tables::StorageChangeSets>()?
2320 .walk_range(range)?
2321 .collect::<Result<Vec<_>, _>>()?;
2322 self.unwind_storage_hashing(changesets.into_iter())
2323 }
2324
2325 fn insert_storage_for_hashing(
2326 &self,
2327 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2328 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2329 let hashed_storages =
2331 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2332 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2333 map.insert(keccak256(entry.key), entry.value);
2334 map
2335 });
2336 map.insert(keccak256(address), storage);
2337 map
2338 });
2339
2340 let hashed_storage_keys = hashed_storages
2341 .iter()
2342 .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2343 .collect();
2344
2345 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2346 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2349 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2350 if hashed_storage_cursor
2351 .seek_by_key_subkey(hashed_address, key)?
2352 .filter(|entry| entry.key == key)
2353 .is_some()
2354 {
2355 hashed_storage_cursor.delete_current()?;
2356 }
2357
2358 if !value.is_zero() {
2359 hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2360 }
2361 Ok(())
2362 })
2363 })?;
2364
2365 Ok(hashed_storage_keys)
2366 }
2367}
2368
2369impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2370 fn unwind_account_history_indices<'a>(
2371 &self,
2372 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2373 ) -> ProviderResult<usize> {
2374 let mut last_indices = changesets
2375 .into_iter()
2376 .map(|(index, account)| (account.address, *index))
2377 .collect::<Vec<_>>();
2378 last_indices.sort_by_key(|(a, _)| *a);
2379
2380 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2382 for &(address, rem_index) in &last_indices {
2383 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2384 &mut cursor,
2385 ShardedKey::last(address),
2386 rem_index,
2387 |sharded_key| sharded_key.key == address,
2388 )?;
2389
2390 if !partial_shard.is_empty() {
2393 cursor.insert(
2394 ShardedKey::last(address),
2395 &BlockNumberList::new_pre_sorted(partial_shard),
2396 )?;
2397 }
2398 }
2399
2400 let changesets = last_indices.len();
2401 Ok(changesets)
2402 }
2403
2404 fn unwind_account_history_indices_range(
2405 &self,
2406 range: impl RangeBounds<BlockNumber>,
2407 ) -> ProviderResult<usize> {
2408 let changesets = self
2409 .tx
2410 .cursor_read::<tables::AccountChangeSets>()?
2411 .walk_range(range)?
2412 .collect::<Result<Vec<_>, _>>()?;
2413 self.unwind_account_history_indices(changesets.iter())
2414 }
2415
2416 fn insert_account_history_index(
2417 &self,
2418 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2419 ) -> ProviderResult<()> {
2420 self.append_history_index::<_, tables::AccountsHistory>(
2421 account_transitions,
2422 ShardedKey::new,
2423 )
2424 }
2425
2426 fn unwind_storage_history_indices(
2427 &self,
2428 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2429 ) -> ProviderResult<usize> {
2430 let mut storage_changesets = changesets
2431 .into_iter()
2432 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2433 .collect::<Vec<_>>();
2434 storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2435
2436 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2437 for &(address, storage_key, rem_index) in &storage_changesets {
2438 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2439 &mut cursor,
2440 StorageShardedKey::last(address, storage_key),
2441 rem_index,
2442 |storage_sharded_key| {
2443 storage_sharded_key.address == address &&
2444 storage_sharded_key.sharded_key.key == storage_key
2445 },
2446 )?;
2447
2448 if !partial_shard.is_empty() {
2451 cursor.insert(
2452 StorageShardedKey::last(address, storage_key),
2453 &BlockNumberList::new_pre_sorted(partial_shard),
2454 )?;
2455 }
2456 }
2457
2458 let changesets = storage_changesets.len();
2459 Ok(changesets)
2460 }
2461
2462 fn unwind_storage_history_indices_range(
2463 &self,
2464 range: impl RangeBounds<BlockNumberAddress>,
2465 ) -> ProviderResult<usize> {
2466 let changesets = self
2467 .tx
2468 .cursor_read::<tables::StorageChangeSets>()?
2469 .walk_range(range)?
2470 .collect::<Result<Vec<_>, _>>()?;
2471 self.unwind_storage_history_indices(changesets.into_iter())
2472 }
2473
2474 fn insert_storage_history_index(
2475 &self,
2476 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2477 ) -> ProviderResult<()> {
2478 self.append_history_index::<_, tables::StoragesHistory>(
2479 storage_transitions,
2480 |(address, storage_key), highest_block_number| {
2481 StorageShardedKey::new(address, storage_key, highest_block_number)
2482 },
2483 )
2484 }
2485
2486 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2487 {
2489 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2490 self.insert_account_history_index(indices)?;
2491 }
2492
2493 {
2495 let indices = self.changed_storages_and_blocks_with_range(range)?;
2496 self.insert_storage_history_index(indices)?;
2497 }
2498
2499 Ok(())
2500 }
2501}
2502
2503impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2504 for DatabaseProvider<TX, N>
2505{
2506 fn take_block_and_execution_above(
2507 &self,
2508 block: BlockNumber,
2509 ) -> ProviderResult<Chain<Self::Primitives>> {
2510 let range = block + 1..=self.last_block_number()?;
2511
2512 self.unwind_trie_state_range(range.clone())?;
2513
2514 let execution_state = self.take_state_above(block)?;
2516
2517 let blocks = self.recovered_block_range(range)?;
2518
2519 self.remove_blocks_above(block)?;
2522
2523 self.update_pipeline_stages(block, true)?;
2525
2526 Ok(Chain::new(blocks, execution_state, None))
2527 }
2528
2529 fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
2530 let range = block + 1..=self.last_block_number()?;
2531
2532 self.unwind_trie_state_range(range)?;
2533
2534 self.remove_state_above(block)?;
2536
2537 self.remove_blocks_above(block)?;
2540
2541 self.update_pipeline_stages(block, true)?;
2543
2544 Ok(())
2545 }
2546}
2547
2548impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2549 for DatabaseProvider<TX, N>
2550{
2551 type Block = BlockTy<N>;
2552 type Receipt = ReceiptTy<N>;
2553
2554 fn insert_block(
2576 &self,
2577 block: RecoveredBlock<Self::Block>,
2578 ) -> ProviderResult<StoredBlockBodyIndices> {
2579 let block_number = block.number();
2580
2581 let mut durations_recorder = metrics::DurationsRecorder::default();
2582
2583 let ttd = if block_number == 0 {
2585 block.header().difficulty()
2586 } else {
2587 let parent_block_number = block_number - 1;
2588 let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2589 durations_recorder.record_relative(metrics::Action::GetParentTD);
2590 parent_ttd + block.header().difficulty()
2591 };
2592
2593 self.static_file_provider
2594 .get_writer(block_number, StaticFileSegment::Headers)?
2595 .append_header(block.header(), ttd, &block.hash())?;
2596
2597 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2598 durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2599
2600 let mut next_tx_num = self
2601 .tx
2602 .cursor_read::<tables::TransactionBlocks>()?
2603 .last()?
2604 .map(|(n, _)| n + 1)
2605 .unwrap_or_default();
2606 durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2607 let first_tx_num = next_tx_num;
2608
2609 let tx_count = block.body().transaction_count() as u64;
2610
2611 for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2613 let hash = transaction.tx_hash();
2614
2615 if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2616 self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2617 }
2618
2619 if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2620 self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2621 }
2622 next_tx_num += 1;
2623 }
2624
2625 self.append_block_bodies(vec![(block_number, Some(block.into_body()))])?;
2626
2627 debug!(
2628 target: "providers::db",
2629 ?block_number,
2630 actions = ?durations_recorder.actions,
2631 "Inserted block"
2632 );
2633
2634 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2635 }
2636
2637 fn append_block_bodies(
2638 &self,
2639 bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2640 ) -> ProviderResult<()> {
2641 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2642
2643 let mut tx_writer =
2645 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
2646
2647 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2648 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2649
2650 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2652
2653 for (block_number, body) in &bodies {
2654 tx_writer.increment_block(*block_number)?;
2656
2657 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2658 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2659
2660 let mut durations_recorder = metrics::DurationsRecorder::default();
2661
2662 block_indices_cursor.append(*block_number, &block_indices)?;
2664
2665 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2666
2667 let Some(body) = body else { continue };
2668
2669 if !body.transactions().is_empty() {
2671 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2672 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2673 }
2674
2675 for transaction in body.transactions() {
2677 tx_writer.append_transaction(next_tx_num, transaction)?;
2678
2679 next_tx_num += 1;
2681 }
2682 }
2683
2684 self.storage.writer().write_block_bodies(self, bodies)?;
2685
2686 Ok(())
2687 }
2688
2689 fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
2690 for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
2692 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2693 }
2694
2695 let highest_static_file_block = self
2697 .static_file_provider()
2698 .get_highest_static_file_block(StaticFileSegment::Headers)
2699 .expect("todo: error handling, headers should exist");
2700
2701 debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
2707 self.static_file_provider()
2708 .get_writer(block, StaticFileSegment::Headers)?
2709 .prune_headers(highest_static_file_block.saturating_sub(block))?;
2710
2711 let unwind_tx_from = self
2713 .block_body_indices(block)?
2714 .map(|b| b.next_tx_num())
2715 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2716
2717 let unwind_tx_to = self
2719 .tx
2720 .cursor_read::<tables::BlockBodyIndices>()?
2721 .last()?
2722 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
2724 .1
2725 .last_tx_num();
2726
2727 if unwind_tx_from <= unwind_tx_to {
2728 for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
2729 self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
2730 }
2731 }
2732
2733 self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
2734
2735 self.remove_bodies_above(block)?;
2736
2737 Ok(())
2738 }
2739
2740 fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
2741 self.storage.writer().remove_block_bodies_above(self, block)?;
2742
2743 let unwind_tx_from = self
2745 .block_body_indices(block)?
2746 .map(|b| b.next_tx_num())
2747 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2748
2749 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
2750 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
2751
2752 let static_file_tx_num =
2753 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
2754
2755 let to_delete = static_file_tx_num
2756 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
2757 .unwrap_or_default();
2758
2759 self.static_file_provider
2760 .latest_writer(StaticFileSegment::Transactions)?
2761 .prune_transactions(to_delete, block)?;
2762
2763 Ok(())
2764 }
2765
2766 fn append_blocks_with_state(
2768 &self,
2769 blocks: Vec<RecoveredBlock<Self::Block>>,
2770 execution_outcome: &ExecutionOutcome<Self::Receipt>,
2771 hashed_state: HashedPostStateSorted,
2772 ) -> ProviderResult<()> {
2773 if blocks.is_empty() {
2774 debug!(target: "providers::db", "Attempted to append empty block range");
2775 return Ok(())
2776 }
2777
2778 let first_number = blocks[0].number();
2781
2782 let last_block_number = blocks[blocks.len() - 1].number();
2785
2786 let mut durations_recorder = metrics::DurationsRecorder::default();
2787
2788 for block in blocks {
2790 self.insert_block(block)?;
2791 durations_recorder.record_relative(metrics::Action::InsertBlock);
2792 }
2793
2794 self.write_state(execution_outcome, OriginalValuesKnown::No)?;
2795 durations_recorder.record_relative(metrics::Action::InsertState);
2796
2797 self.write_hashed_state(&hashed_state)?;
2799 durations_recorder.record_relative(metrics::Action::InsertHashes);
2800
2801 self.update_history_indices(first_number..=last_block_number)?;
2802 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
2803
2804 self.update_pipeline_stages(last_block_number, false)?;
2806 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
2807
2808 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
2809
2810 Ok(())
2811 }
2812}
2813
2814impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
2815 fn get_prune_checkpoint(
2816 &self,
2817 segment: PruneSegment,
2818 ) -> ProviderResult<Option<PruneCheckpoint>> {
2819 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
2820 }
2821
2822 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
2823 Ok(self
2824 .tx
2825 .cursor_read::<tables::PruneCheckpoints>()?
2826 .walk(None)?
2827 .collect::<Result<_, _>>()?)
2828 }
2829}
2830
2831impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
2832 fn save_prune_checkpoint(
2833 &self,
2834 segment: PruneSegment,
2835 checkpoint: PruneCheckpoint,
2836 ) -> ProviderResult<()> {
2837 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
2838 }
2839}
2840
2841impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
2842 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2843 let db_entries = self.tx.entries::<T>()?;
2844 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
2845 Ok(entries) => entries,
2846 Err(ProviderError::UnsupportedProvider) => 0,
2847 Err(err) => return Err(err),
2848 };
2849
2850 Ok(db_entries + static_file_entries)
2851 }
2852}
2853
2854impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
2855 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
2856 let mut finalized_blocks = self
2857 .tx
2858 .cursor_read::<tables::ChainState>()?
2859 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
2860 .take(1)
2861 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
2862
2863 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
2864 Ok(last_finalized_block_number)
2865 }
2866
2867 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
2868 let mut finalized_blocks = self
2869 .tx
2870 .cursor_read::<tables::ChainState>()?
2871 .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
2872 .take(1)
2873 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
2874
2875 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
2876 Ok(last_finalized_block_number)
2877 }
2878}
2879
2880impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
2881 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
2882 Ok(self
2883 .tx
2884 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
2885 }
2886
2887 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
2888 Ok(self
2889 .tx
2890 .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
2891 }
2892}
2893
2894impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
2895 type Tx = TX;
2896
2897 fn tx_ref(&self) -> &Self::Tx {
2898 &self.tx
2899 }
2900
2901 fn tx_mut(&mut self) -> &mut Self::Tx {
2902 &mut self.tx
2903 }
2904
2905 fn into_tx(self) -> Self::Tx {
2906 self.tx
2907 }
2908
2909 fn prune_modes_ref(&self) -> &PruneModes {
2910 self.prune_modes_ref()
2911 }
2912
2913 fn commit(self) -> ProviderResult<bool> {
2915 if self.static_file_provider.has_unwind_queued() {
2920 self.tx.commit()?;
2921 self.static_file_provider.commit()?;
2922 } else {
2923 self.static_file_provider.commit()?;
2924 self.tx.commit()?;
2925 }
2926
2927 Ok(true)
2928 }
2929}
2930
2931#[cfg(test)]
2932mod tests {
2933 use super::*;
2934 use crate::{
2935 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
2936 BlockWriter,
2937 };
2938 use reth_testing_utils::generators::{self, random_block, BlockParams};
2939
2940 #[test]
2941 fn test_receipts_by_block_range_empty_range() {
2942 let factory = create_test_provider_factory();
2943 let provider = factory.provider().unwrap();
2944
2945 let start = 10u64;
2947 let end = 9u64;
2948 let result = provider.receipts_by_block_range(start..=end).unwrap();
2949 assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
2950 }
2951
2952 #[test]
2953 fn test_receipts_by_block_range_nonexistent_blocks() {
2954 let factory = create_test_provider_factory();
2955 let provider = factory.provider().unwrap();
2956
2957 let result = provider.receipts_by_block_range(10..=12).unwrap();
2959 assert_eq!(result, vec![vec![], vec![], vec![]]);
2960 }
2961
2962 #[test]
2963 fn test_receipts_by_block_range_single_block() {
2964 let factory = create_test_provider_factory();
2965 let data = BlockchainTestData::default();
2966
2967 let provider_rw = factory.provider_rw().unwrap();
2968 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
2969 provider_rw
2970 .write_state(
2971 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
2972 crate::OriginalValuesKnown::No,
2973 )
2974 .unwrap();
2975 provider_rw.insert_block(data.blocks[0].0.clone()).unwrap();
2976 provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap();
2977 provider_rw.commit().unwrap();
2978
2979 let provider = factory.provider().unwrap();
2980 let result = provider.receipts_by_block_range(1..=1).unwrap();
2981
2982 assert_eq!(result.len(), 1);
2984 assert_eq!(result[0].len(), 1);
2985 assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
2986 }
2987
2988 #[test]
2989 fn test_receipts_by_block_range_multiple_blocks() {
2990 let factory = create_test_provider_factory();
2991 let data = BlockchainTestData::default();
2992
2993 let provider_rw = factory.provider_rw().unwrap();
2994 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
2995 provider_rw
2996 .write_state(
2997 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
2998 crate::OriginalValuesKnown::No,
2999 )
3000 .unwrap();
3001 for i in 0..3 {
3002 provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3003 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3004 }
3005 provider_rw.commit().unwrap();
3006
3007 let provider = factory.provider().unwrap();
3008 let result = provider.receipts_by_block_range(1..=3).unwrap();
3009
3010 assert_eq!(result.len(), 3);
3012 for (i, block_receipts) in result.iter().enumerate() {
3013 assert_eq!(block_receipts.len(), 1);
3014 assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
3015 }
3016 }
3017
3018 #[test]
3019 fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
3020 let factory = create_test_provider_factory();
3021 let data = BlockchainTestData::default();
3022
3023 let provider_rw = factory.provider_rw().unwrap();
3024 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3025 provider_rw
3026 .write_state(
3027 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3028 crate::OriginalValuesKnown::No,
3029 )
3030 .unwrap();
3031
3032 for i in 0..3 {
3034 provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3035 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3036 }
3037 provider_rw.commit().unwrap();
3038
3039 let provider = factory.provider().unwrap();
3040 let result = provider.receipts_by_block_range(1..=3).unwrap();
3041
3042 assert_eq!(result.len(), 3);
3044 for block_receipts in &result {
3045 assert_eq!(block_receipts.len(), 1);
3046 }
3047 }
3048
3049 #[test]
3050 fn test_receipts_by_block_range_partial_range() {
3051 let factory = create_test_provider_factory();
3052 let data = BlockchainTestData::default();
3053
3054 let provider_rw = factory.provider_rw().unwrap();
3055 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3056 provider_rw
3057 .write_state(
3058 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3059 crate::OriginalValuesKnown::No,
3060 )
3061 .unwrap();
3062 for i in 0..3 {
3063 provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3064 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3065 }
3066 provider_rw.commit().unwrap();
3067
3068 let provider = factory.provider().unwrap();
3069
3070 let result = provider.receipts_by_block_range(2..=5).unwrap();
3072 assert_eq!(result.len(), 4);
3073
3074 assert_eq!(result[0].len(), 1); assert_eq!(result[1].len(), 1); assert_eq!(result[2].len(), 0); assert_eq!(result[3].len(), 0); assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
3081 assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
3082 }
3083
3084 #[test]
3085 fn test_receipts_by_block_range_all_empty_blocks() {
3086 let factory = create_test_provider_factory();
3087 let mut rng = generators::rng();
3088
3089 let mut blocks = Vec::new();
3091 for i in 0..3 {
3092 let block =
3093 random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
3094 blocks.push(block);
3095 }
3096
3097 let provider_rw = factory.provider_rw().unwrap();
3098 for block in blocks {
3099 provider_rw.insert_block(block.try_recover().unwrap()).unwrap();
3100 }
3101 provider_rw.commit().unwrap();
3102
3103 let provider = factory.provider().unwrap();
3104 let result = provider.receipts_by_block_range(1..=3).unwrap();
3105
3106 assert_eq!(result.len(), 3);
3107 for block_receipts in result {
3108 assert_eq!(block_receipts.len(), 0);
3109 }
3110 }
3111
3112 #[test]
3113 fn test_receipts_by_block_range_consistency_with_individual_calls() {
3114 let factory = create_test_provider_factory();
3115 let data = BlockchainTestData::default();
3116
3117 let provider_rw = factory.provider_rw().unwrap();
3118 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3119 provider_rw
3120 .write_state(
3121 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3122 crate::OriginalValuesKnown::No,
3123 )
3124 .unwrap();
3125 for i in 0..3 {
3126 provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3127 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3128 }
3129 provider_rw.commit().unwrap();
3130
3131 let provider = factory.provider().unwrap();
3132
3133 let range_result = provider.receipts_by_block_range(1..=3).unwrap();
3135
3136 let mut individual_results = Vec::new();
3138 for block_num in 1..=3 {
3139 let receipts =
3140 provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
3141 individual_results.push(receipts);
3142 }
3143
3144 assert_eq!(range_result, individual_results);
3145 }
3146}