1use crate::{
2 bundle_state::StorageRevertsIter,
3 providers::{
4 database::{chain::ChainStorage, metrics},
5 static_file::StaticFileWriter,
6 NodeTypesForProvider, StaticFileProvider,
7 },
8 to_range,
9 traits::{
10 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
11 },
12 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
13 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
14 DBProvider, HashingWriter, HeaderProvider, HeaderSyncGapProvider, HistoricalStateProvider,
15 HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef,
16 OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit,
17 StageCheckpointReader, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader,
18 StorageLocation, StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
19 TransactionsProviderExt, TrieWriter,
20};
21use alloy_consensus::{
22 transaction::{SignerRecoverable, TransactionMeta},
23 BlockHeader, Header, TxReceipt,
24};
25use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
26use alloy_primitives::{
27 keccak256,
28 map::{hash_map, B256Map, HashMap, HashSet},
29 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
30};
31use itertools::Itertools;
32use rayon::slice::ParallelSliceMut;
33use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
34use reth_db_api::{
35 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
36 database::Database,
37 models::{
38 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
39 ShardedKey, StoredBlockBodyIndices,
40 },
41 table::Table,
42 tables,
43 transaction::{DbTx, DbTxMut},
44 BlockNumberList, DatabaseError, PlainAccountState, PlainStorageState,
45};
46use reth_execution_types::{Chain, ExecutionOutcome};
47use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
48use reth_primitives_traits::{
49 Account, Block as _, BlockBody as _, Bytecode, GotExpected, NodePrimitives, RecoveredBlock,
50 SealedHeader, SignedTransaction, StorageEntry,
51};
52use reth_prune_types::{
53 PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
54};
55use reth_stages_types::{StageCheckpoint, StageId};
56use reth_static_file_types::StaticFileSegment;
57use reth_storage_api::{
58 BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, StateProvider,
59 StorageChangeSetReader, TryIntoHistoricalStateProvider,
60};
61use reth_storage_errors::provider::{ProviderResult, RootMismatch};
62use reth_trie::{
63 prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
64 updates::{StorageTrieUpdates, TrieUpdates},
65 HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
66};
67use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
68use revm_database::states::{
69 PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
70};
71use std::{
72 cmp::Ordering,
73 collections::{BTreeMap, BTreeSet},
74 fmt::Debug,
75 ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
76 sync::{mpsc, Arc},
77};
78use tracing::{debug, trace};
79
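/// A [`DatabaseProvider`] that operates on a read-only database transaction (`DB::TX`).
///
/// # Example
///
/// Illustrative only; assumes a provider factory instance named `factory`:
///
/// ```ignore
/// let provider: DatabaseProviderRO<_, _> = factory.provider()?;
/// let header = provider.header_by_number(100)?;
/// ```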
80pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
82
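/// A [`DatabaseProvider`] that operates on a read-write database transaction (`DB::TXMut`).
///
/// Dereferences to the wrapped [`DatabaseProvider`].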
83#[derive(Debug)]
88pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
89 pub DatabaseProvider<<DB as Database>::TXMut, N>,
90);
91
92impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
93 type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
94
95 fn deref(&self) -> &Self::Target {
96 &self.0
97 }
98}
99
100impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
101 fn deref_mut(&mut self) -> &mut Self::Target {
102 &mut self.0
103 }
104}
105
106impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
107 for DatabaseProviderRW<DB, N>
108{
109 fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
110 &self.0
111 }
112}
113
114impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
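    /// Commits the underlying database transaction, consuming the provider.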
115 pub fn commit(self) -> ProviderResult<bool> {
117 self.0.commit()
118 }
119
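    /// Consumes the provider and returns the underlying read-write transaction.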
120 pub fn into_tx(self) -> <DB as Database>::TXMut {
122 self.0.into_tx()
123 }
124}
125
126impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
127 for DatabaseProvider<<DB as Database>::TXMut, N>
128{
129 fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
130 provider.0
131 }
132}
133
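/// A provider that reads and writes chain data through a database transaction `TX`, falling back
/// to static files for segments that have been moved out of the database.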
134#[derive(Debug)]
137pub struct DatabaseProvider<TX, N: NodeTypes> {
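    /// The underlying database transaction.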
138 tx: TX,
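    /// The chain specification.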
140 chain_spec: Arc<N::ChainSpec>,
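    /// Provider for data stored in static files.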
142 static_file_provider: StaticFileProvider<N::Primitives>,
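    /// The active pruning configuration.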
144 prune_modes: PruneModes,
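    /// Node-specific storage implementation.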
146 storage: Arc<N::Storage>,
148}
149
150impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
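    /// Returns a reference to the configured prune modes.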
151 pub const fn prune_modes_ref(&self) -> &PruneModes {
153 &self.prune_modes
154 }
155}
156
157impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
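    /// Returns a state provider for the latest state.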
158 pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
160 trace!(target: "providers::db", "Returning latest state provider");
161 Box::new(LatestStateProviderRef::new(self))
162 }
163
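    /// Returns a state provider for the state after the block with the given hash.
    ///
    /// If the block is the current tip, the latest state provider is returned instead. The
    /// historical provider is bounded below by the account and storage history prune checkpoints,
    /// if any.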
164 pub fn history_by_block_hash<'a>(
166 &'a self,
167 block_hash: BlockHash,
168 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
169 let mut block_number =
170 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
171 if block_number == self.best_block_number().unwrap_or_default() &&
172 block_number == self.last_block_number().unwrap_or_default()
173 {
174 return Ok(Box::new(LatestStateProviderRef::new(self)))
175 }
176
177 block_number += 1;
179
180 let account_history_prune_checkpoint =
181 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
182 let storage_history_prune_checkpoint =
183 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
184
185 let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
186
187 if let Some(prune_checkpoint_block_number) =
190 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
191 {
192 state_provider = state_provider.with_lowest_available_account_history_block_number(
193 prune_checkpoint_block_number + 1,
194 );
195 }
196 if let Some(prune_checkpoint_block_number) =
197 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
198 {
199 state_provider = state_provider.with_lowest_available_storage_history_block_number(
200 prune_checkpoint_block_number + 1,
201 );
202 }
203
204 Ok(Box::new(state_provider))
205 }
206
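    /// Overrides the configured prune modes; only available with the `test-utils` feature.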
207 #[cfg(feature = "test-utils")]
208 pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
210 self.prune_modes = prune_modes;
211 }
212}
213
214impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
215 type Primitives = N::Primitives;
216}
217
218impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
219 fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
221 self.static_file_provider.clone()
222 }
223}
224
225impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
226 for DatabaseProvider<TX, N>
227{
228 type ChainSpec = N::ChainSpec;
229
230 fn chain_spec(&self) -> Arc<Self::ChainSpec> {
231 self.chain_spec.clone()
232 }
233}
234
235impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
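    /// Creates a new [`DatabaseProvider`] backed by a read-write transaction.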
236 pub const fn new_rw(
238 tx: TX,
239 chain_spec: Arc<N::ChainSpec>,
240 static_file_provider: StaticFileProvider<N::Primitives>,
241 prune_modes: PruneModes,
242 storage: Arc<N::Storage>,
243 ) -> Self {
244 Self { tx, chain_spec, static_file_provider, prune_modes, storage }
245 }
246}
247
248impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
249 fn as_ref(&self) -> &Self {
250 self
251 }
252}
253
254impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
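    /// Unwinds the trie state for the given block range.
    ///
    /// Collects the account and storage change sets for the range, unwinds the hashed state and
    /// the history indices, recomputes the state root from the resulting prefix sets and verifies
    /// it against the state root of the block preceding the range before writing the trie updates.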
255 pub fn unwind_trie_state_range(
260 &self,
261 range: RangeInclusive<BlockNumber>,
262 ) -> ProviderResult<()> {
263 let changed_accounts = self
264 .tx
265 .cursor_read::<tables::AccountChangeSets>()?
266 .walk_range(range.clone())?
267 .collect::<Result<Vec<_>, _>>()?;
268
269 let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
271 let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
272 let mut destroyed_accounts = HashSet::default();
273 for (hashed_address, account) in hashed_addresses {
274 account_prefix_set.insert(Nibbles::unpack(hashed_address));
275 if account.is_none() {
276 destroyed_accounts.insert(hashed_address);
277 }
278 }
279
280 self.unwind_account_history_indices(changed_accounts.iter())?;
282 let storage_range = BlockNumberAddress::range(range.clone());
283
284 let changed_storages = self
285 .tx
286 .cursor_read::<tables::StorageChangeSets>()?
287 .walk_range(storage_range)?
288 .collect::<Result<Vec<_>, _>>()?;
289
290 let mut storage_prefix_sets = B256Map::<PrefixSet>::default();
293 let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
294 for (hashed_address, hashed_slots) in storage_entries {
295 account_prefix_set.insert(Nibbles::unpack(hashed_address));
296 let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
297 for slot in hashed_slots {
298 storage_prefix_set.insert(Nibbles::unpack(slot));
299 }
300 storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
301 }
302
303 self.unwind_storage_history_indices(changed_storages.iter().copied())?;
305
306 let prefix_sets = TriePrefixSets {
310 account_prefix_set: account_prefix_set.freeze(),
311 storage_prefix_sets,
312 destroyed_accounts,
313 };
314 let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
315 .with_prefix_sets(prefix_sets)
316 .root_with_updates()
317 .map_err(reth_db_api::DatabaseError::from)?;
318
319 let parent_number = range.start().saturating_sub(1);
320 let parent_state_root = self
321 .header_by_number(parent_number)?
322 .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
323 .state_root();
324
325 if new_state_root != parent_state_root {
328 let parent_hash = self
329 .block_hash(parent_number)?
330 .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
331 return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
332 root: GotExpected { got: new_state_root, expected: parent_state_root },
333 block_number: parent_number,
334 block_hash: parent_hash,
335 })))
336 }
337 self.write_trie_updates(&trie_updates)?;
338
339 Ok(())
340 }
341
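    /// Removes receipts for all transactions starting at `from_tx`, from the database and/or the
    /// static files, depending on `remove_from`.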
342 fn remove_receipts_from(
344 &self,
345 from_tx: TxNumber,
346 last_block: BlockNumber,
347 remove_from: StorageLocation,
348 ) -> ProviderResult<()> {
349 if remove_from.database() {
350 self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
352 }
353
354 if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
355 let static_file_receipt_num =
356 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
357
358 let to_delete = static_file_receipt_num
359 .map(|static_num| (static_num + 1).saturating_sub(from_tx))
360 .unwrap_or_default();
361
362 self.static_file_provider
363 .latest_writer(StaticFileSegment::Receipts)?
364 .prune_receipts(to_delete, last_block)?;
365 }
366
367 Ok(())
368 }
369}
370
371impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
372 fn try_into_history_at_block(
373 self,
374 mut block_number: BlockNumber,
375 ) -> ProviderResult<StateProviderBox> {
376 if block_number == self.best_block_number().unwrap_or_default() {
379 return Ok(Box::new(LatestStateProvider::new(self)))
380 }
381
382 block_number += 1;
384
385 let account_history_prune_checkpoint =
386 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
387 let storage_history_prune_checkpoint =
388 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
389
390 let mut state_provider = HistoricalStateProvider::new(self, block_number);
391
392 if let Some(prune_checkpoint_block_number) =
395 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
396 {
397 state_provider = state_provider.with_lowest_available_account_history_block_number(
398 prune_checkpoint_block_number + 1,
399 );
400 }
401 if let Some(prune_checkpoint_block_number) =
402 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
403 {
404 state_provider = state_provider.with_lowest_available_storage_history_block_number(
405 prune_checkpoint_block_number + 1,
406 );
407 }
408
409 Ok(Box::new(state_provider))
410 }
411}
412
413impl<
414 Tx: DbTx + DbTxMut + 'static,
415 N: NodeTypesForProvider<Primitives: NodePrimitives<BlockHeader = Header>>,
416 > DatabaseProvider<Tx, N>
417{
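    /// Inserts a historical block, appending its header to the `Headers` static file segment
    /// (including placeholder headers for any missing ancestors when the segment is empty) and
    /// writing the block itself to the database.
    ///
    /// The total difficulty is derived from the parent's total difficulty plus the block's own
    /// difficulty.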
418 pub fn insert_historical_block(
422 &self,
423 block: RecoveredBlock<<Self as BlockWriter>::Block>,
424 ) -> ProviderResult<StoredBlockBodyIndices> {
425 let ttd = if block.number() == 0 {
426 block.header().difficulty()
427 } else {
428 let parent_block_number = block.number() - 1;
429 let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
430 parent_ttd + block.header().difficulty()
431 };
432
433 let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;
434
435 let segment_header = writer.user_header();
437 if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
438 for block_number in 0..block.number() {
439 let mut prev = block.clone_header();
440 prev.number = block_number;
441 writer.append_header(&prev, U256::ZERO, &B256::ZERO)?;
442 }
443 }
444
445 writer.append_header(block.header(), ttd, &block.hash())?;
446
447 self.insert_block(block, StorageLocation::Database)
448 }
449}
450
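/// Unwinds history index shards for a single key, starting at the shard referenced by `start_key`
/// and walking backwards while `shard_belongs_to_key` returns `true`.
///
/// Every visited shard is deleted. The returned list contains the block numbers of the boundary
/// shard that are still below `block_number` and therefore need to be re-inserted by the caller;
/// it is empty if all visited shards only contained unwound blocks.
///
/// For example (illustrative layout), if an address has shards keyed `100`, `200` and `u64::MAX`,
/// unwinding to block `150` deletes the last two shards and returns the entries of the `200`
/// shard that are below `150`.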
451fn unwind_history_shards<S, T, C>(
466 cursor: &mut C,
467 start_key: T::Key,
468 block_number: BlockNumber,
469 mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
470) -> ProviderResult<Vec<u64>>
471where
472 T: Table<Value = BlockNumberList>,
473 T::Key: AsRef<ShardedKey<S>>,
474 C: DbCursorRO<T> + DbCursorRW<T>,
475{
476 let mut item = cursor.seek_exact(start_key)?;
478 while let Some((sharded_key, list)) = item {
479 if !shard_belongs_to_key(&sharded_key) {
481 break
482 }
483
484 cursor.delete_current()?;
487
488 let first = list.iter().next().expect("List can't be empty");
491
        if first >= block_number {
            item = cursor.prev()?;
            continue
        } else if block_number <= sharded_key.as_ref().highest_block_number {
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        return Ok(list.iter().collect::<Vec<_>>())
508 }
509
510 Ok(Vec::new())
512}
513
514impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
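    /// Creates a new [`DatabaseProvider`] backed by a read-only transaction.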
515 pub const fn new(
517 tx: TX,
518 chain_spec: Arc<N::ChainSpec>,
519 static_file_provider: StaticFileProvider<N::Primitives>,
520 prune_modes: PruneModes,
521 storage: Arc<N::Storage>,
522 ) -> Self {
523 Self { tx, chain_spec, static_file_provider, prune_modes, storage }
524 }
525
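    /// Consumes the provider and returns the underlying transaction.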
526 pub fn into_tx(self) -> TX {
528 self.tx
529 }
530
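    /// Returns a mutable reference to the underlying transaction.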
531 pub const fn tx_mut(&mut self) -> &mut TX {
533 &mut self.tx
534 }
535
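    /// Returns a shared reference to the underlying transaction.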
536 pub const fn tx_ref(&self) -> &TX {
538 &self.tx
539 }
540
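    /// Returns a reference to the chain specification.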
541 pub fn chain_spec(&self) -> &N::ChainSpec {
543 &self.chain_spec
544 }
545}
546
547impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
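    /// Returns transactions for the given transaction number range, reading from static files
    /// first and falling back to the provided database cursor.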
548 fn transactions_by_tx_range_with_cursor<C>(
549 &self,
550 range: impl RangeBounds<TxNumber>,
551 cursor: &mut C,
552 ) -> ProviderResult<Vec<TxTy<N>>>
553 where
554 C: DbCursorRO<tables::Transactions<TxTy<N>>>,
555 {
556 self.static_file_provider.get_range_with_static_file_or_database(
557 StaticFileSegment::Transactions,
558 to_range(range),
559 |static_file, range, _| static_file.transactions_by_tx_range(range),
560 |range, _| self.cursor_collect(cursor, range),
561 |_| true,
562 )
563 }
564
565 fn recovered_block<H, HF, B, BF>(
566 &self,
567 id: BlockHashOrNumber,
568 _transaction_kind: TransactionVariant,
569 header_by_number: HF,
570 construct_block: BF,
571 ) -> ProviderResult<Option<B>>
572 where
573 H: AsRef<HeaderTy<N>>,
574 HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
575 BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
576 {
577 let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
578 let Some(header) = header_by_number(block_number)? else { return Ok(None) };
579
580 let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
587
588 let tx_range = body.tx_num_range();
589
590 let (transactions, senders) = if tx_range.is_empty() {
591 (vec![], vec![])
592 } else {
593 (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
594 };
595
596 let body = self
597 .storage
598 .reader()
599 .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
600 .pop()
601 .ok_or(ProviderError::InvalidStorageOutput)?;
602
603 construct_block(header, body, senders)
604 }
605
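    /// Assembles a range of blocks from headers, reconstructed bodies and transaction ranges.
    ///
    /// `headers_range` fetches the headers for the range, and `assemble_block` builds each block
    /// from its header, its body (read through the node's block body reader) and its transaction
    /// number range.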
606 fn block_range<F, H, HF, R>(
616 &self,
617 range: RangeInclusive<BlockNumber>,
618 headers_range: HF,
619 mut assemble_block: F,
620 ) -> ProviderResult<Vec<R>>
621 where
622 H: AsRef<HeaderTy<N>>,
623 HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
624 F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
625 {
626 if range.is_empty() {
627 return Ok(Vec::new())
628 }
629
630 let len = range.end().saturating_sub(*range.start()) as usize;
631 let mut blocks = Vec::with_capacity(len);
632
633 let headers = headers_range(range.clone())?;
634 let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
635
636 let present_headers = self
642 .block_body_indices_range(range)?
643 .into_iter()
644 .map(|b| b.tx_num_range())
645 .zip(headers)
646 .collect::<Vec<_>>();
647
648 let mut inputs = Vec::new();
649 for (tx_range, header) in &present_headers {
650 let transactions = if tx_range.is_empty() {
651 Vec::new()
652 } else {
653 self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
654 };
655
656 inputs.push((header.as_ref(), transactions));
657 }
658
659 let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
660
661 for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
662 blocks.push(assemble_block(header, body, tx_range)?);
663 }
664
665 Ok(blocks)
666 }
667
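    /// Returns a range of blocks together with their sender addresses, recovering any senders
    /// missing from [`tables::TransactionSenders`] from the transaction signatures.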
668 fn block_with_senders_range<H, HF, B, BF>(
679 &self,
680 range: RangeInclusive<BlockNumber>,
681 headers_range: HF,
682 assemble_block: BF,
683 ) -> ProviderResult<Vec<B>>
684 where
685 H: AsRef<HeaderTy<N>>,
686 HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
687 BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
688 {
689 let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;
690
691 self.block_range(range, headers_range, |header, body, tx_range| {
692 let senders = if tx_range.is_empty() {
693 Vec::new()
694 } else {
695 let known_senders =
697 senders_cursor
698 .walk_range(tx_range.clone())?
699 .collect::<Result<HashMap<_, _>, _>>()?;
700
701 let mut senders = Vec::with_capacity(body.transactions().len());
702 for (tx_num, tx) in tx_range.zip(body.transactions()) {
703 match known_senders.get(&tx_num) {
704 None => {
705 let sender = tx.recover_signer_unchecked()?;
707 senders.push(sender);
708 }
709 Some(sender) => senders.push(*sender),
710 }
711 }
712
713 senders
714 };
715
716 assemble_block(header, body, senders)
717 })
718 }
719
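    /// Rebuilds the plain state and reverts ([`BundleStateInit`] / [`RevertsInit`]) for a block
    /// range from the given account and storage change sets, looking up post-state values through
    /// the plain state cursors.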
720 fn populate_bundle_state<A, S>(
724 &self,
725 account_changeset: Vec<(u64, AccountBeforeTx)>,
726 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
727 plain_accounts_cursor: &mut A,
728 plain_storage_cursor: &mut S,
729 ) -> ProviderResult<(BundleStateInit, RevertsInit)>
730 where
731 A: DbCursorRO<PlainAccountState>,
732 S: DbDupCursorRO<PlainStorageState>,
733 {
734 let mut state: BundleStateInit = HashMap::default();
738
739 let mut reverts: RevertsInit = HashMap::default();
745
746 for (block_number, account_before) in account_changeset.into_iter().rev() {
748 let AccountBeforeTx { info: old_info, address } = account_before;
749 match state.entry(address) {
750 hash_map::Entry::Vacant(entry) => {
751 let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
752 entry.insert((old_info, new_info, HashMap::default()));
753 }
754 hash_map::Entry::Occupied(mut entry) => {
755 entry.get_mut().0 = old_info;
757 }
758 }
759 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
761 }
762
763 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
765 let BlockNumberAddress((block_number, address)) = block_and_address;
766 let account_state = match state.entry(address) {
768 hash_map::Entry::Vacant(entry) => {
769 let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
770 entry.insert((present_info, present_info, HashMap::default()))
771 }
772 hash_map::Entry::Occupied(entry) => entry.into_mut(),
773 };
774
775 match account_state.2.entry(old_storage.key) {
777 hash_map::Entry::Vacant(entry) => {
778 let new_storage = plain_storage_cursor
779 .seek_by_key_subkey(address, old_storage.key)?
780 .filter(|storage| storage.key == old_storage.key)
781 .unwrap_or_default();
782 entry.insert((old_storage.value, new_storage.value));
783 }
784 hash_map::Entry::Occupied(mut entry) => {
785 entry.get_mut().0 = old_storage.value;
786 }
787 };
788
789 reverts
790 .entry(block_number)
791 .or_default()
792 .entry(address)
793 .or_default()
794 .1
795 .push(old_storage);
796 }
797
798 Ok((state, reverts))
799 }
800}
801
802impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
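    /// Commits the underlying database transaction, consuming the provider.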
803 pub fn commit(self) -> ProviderResult<bool> {
805 Ok(self.tx.commit()?)
806 }
807
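    /// Removes the shard with the given key from the table and returns its block number list, or
    /// an empty list if no such shard exists.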
808 fn take_shard<T>(
811 &self,
812 cursor: &mut <TX as DbTxMut>::CursorMut<T>,
813 key: T::Key,
814 ) -> ProviderResult<Vec<u64>>
815 where
816 T: Table<Value = BlockNumberList>,
817 {
818 if let Some((_, list)) = cursor.seek_exact(key)? {
819 cursor.delete_current()?;
821 let list = list.iter().collect::<Vec<_>>();
822 return Ok(list)
823 }
824 Ok(Vec::new())
825 }
826
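    /// Appends new block-number indices to the history index shards of each `partial_key`.
    ///
    /// The last (open) shard of a key is taken via [`Self::take_shard`], extended with the new
    /// indices and re-split into shards of at most [`sharded_key::NUM_OF_INDICES_IN_SHARD`]
    /// entries. Full shards are keyed by their highest block number, while the final shard is
    /// keyed by `u64::MAX` so it can keep growing.
    ///
    /// For example (assuming a shard size of 3), appending `[7, 8]` to an open shard `[5, 6]`
    /// yields the shards `[5, 6, 7]` keyed `7` and `[8]` keyed `u64::MAX`.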
827 fn append_history_index<P, T>(
835 &self,
836 index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
837 mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
838 ) -> ProviderResult<()>
839 where
840 P: Copy,
841 T: Table<Value = BlockNumberList>,
842 {
843 let mut cursor = self.tx.cursor_write::<T>()?;
844 for (partial_key, indices) in index_updates {
845 let mut last_shard =
846 self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
847 last_shard.extend(indices);
848 let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
850 while let Some(list) = chunks.next() {
851 let highest_block_number = if chunks.peek().is_some() {
852 *list.last().expect("`chunks` does not return empty list")
853 } else {
854 u64::MAX
856 };
857 cursor.insert(
858 sharded_key_factory(partial_key, highest_block_number),
859 &BlockNumberList::new_pre_sorted(list.iter().copied()),
860 )?;
861 }
862 }
863 Ok(())
864 }
865}
866
867impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
868 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
869 Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
870 }
871}
872
873impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
874 fn changed_accounts_with_range(
875 &self,
876 range: impl RangeBounds<BlockNumber>,
877 ) -> ProviderResult<BTreeSet<Address>> {
878 self.tx
879 .cursor_read::<tables::AccountChangeSets>()?
880 .walk_range(range)?
881 .map(|entry| {
882 entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
883 })
884 .collect()
885 }
886
887 fn basic_accounts(
888 &self,
889 iter: impl IntoIterator<Item = Address>,
890 ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
891 let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
892 Ok(iter
893 .into_iter()
894 .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
895 .collect::<Result<Vec<_>, _>>()?)
896 }
897
898 fn changed_accounts_and_blocks_with_range(
899 &self,
900 range: RangeInclusive<BlockNumber>,
901 ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
902 let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
903
904 let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
905 BTreeMap::new(),
906 |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
907 let (index, account) = entry?;
908 accounts.entry(account.address).or_default().push(index);
909 Ok(accounts)
910 },
911 )?;
912
913 Ok(account_transitions)
914 }
915}
916
917impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
918 fn storage_changeset(
919 &self,
920 block_number: BlockNumber,
921 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
922 let range = block_number..=block_number;
923 let storage_range = BlockNumberAddress::range(range);
924 self.tx
925 .cursor_dup_read::<tables::StorageChangeSets>()?
926 .walk_range(storage_range)?
927 .map(|result| -> ProviderResult<_> { Ok(result?) })
928 .collect()
929 }
930}
931
932impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
933 fn account_block_changeset(
934 &self,
935 block_number: BlockNumber,
936 ) -> ProviderResult<Vec<AccountBeforeTx>> {
937 let range = block_number..=block_number;
938 self.tx
939 .cursor_read::<tables::AccountChangeSets>()?
940 .walk_range(range)?
941 .map(|result| -> ProviderResult<_> {
942 let (_, account_before) = result?;
943 Ok(account_before)
944 })
945 .collect()
946 }
947}
948
949impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
950 for DatabaseProvider<TX, N>
951{
952 type Header = HeaderTy<N>;
953
954 fn local_tip_header(
955 &self,
956 highest_uninterrupted_block: BlockNumber,
957 ) -> ProviderResult<SealedHeader<Self::Header>> {
958 let static_file_provider = self.static_file_provider();
959
960 let next_static_file_block_num = static_file_provider
963 .get_highest_static_file_block(StaticFileSegment::Headers)
964 .map(|id| id + 1)
965 .unwrap_or_default();
966 let next_block = highest_uninterrupted_block + 1;
967
968 match next_static_file_block_num.cmp(&next_block) {
969 Ordering::Greater => {
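                // The static file segment is ahead of the database, e.g. the node stopped between
                // the static file commit and the database commit, so prune the extra headers.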
972 let mut static_file_producer =
973 static_file_provider.latest_writer(StaticFileSegment::Headers)?;
974 static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
975 static_file_producer.commit()?
978 }
979 Ordering::Less => {
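                // The static files are missing headers below the expected tip; they are either
                // incomplete or corrupted.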
980 return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
982 }
983 Ordering::Equal => {}
984 }
985
986 let local_head = static_file_provider
987 .sealed_header(highest_uninterrupted_block)?
988 .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
989
990 Ok(local_head)
991 }
992}
993
994impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
995 type Header = HeaderTy<N>;
996
997 fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
998 if let Some(num) = self.block_number(*block_hash)? {
999 Ok(self.header_by_number(num)?)
1000 } else {
1001 Ok(None)
1002 }
1003 }
1004
1005 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1006 self.static_file_provider.get_with_static_file_or_database(
1007 StaticFileSegment::Headers,
1008 num,
1009 |static_file| static_file.header_by_number(num),
1010 || Ok(self.tx.get::<tables::Headers<Self::Header>>(num)?),
1011 )
1012 }
1013
1014 fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1015 if let Some(num) = self.block_number(*block_hash)? {
1016 self.header_td_by_number(num)
1017 } else {
1018 Ok(None)
1019 }
1020 }
1021
1022 fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
1023 if self.chain_spec.is_paris_active_at_block(number) {
1024 if let Some(td) = self.chain_spec.final_paris_total_difficulty() {
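                // After the merge, the total difficulty of every block is fixed at the final
                // Paris (merge) total difficulty.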
1025 return Ok(Some(td))
1028 }
1029 }
1030
1031 self.static_file_provider.get_with_static_file_or_database(
1032 StaticFileSegment::Headers,
1033 number,
1034 |static_file| static_file.header_td_by_number(number),
1035 || Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
1036 )
1037 }
1038
1039 fn headers_range(
1040 &self,
1041 range: impl RangeBounds<BlockNumber>,
1042 ) -> ProviderResult<Vec<Self::Header>> {
1043 self.static_file_provider.get_range_with_static_file_or_database(
1044 StaticFileSegment::Headers,
1045 to_range(range),
1046 |static_file, range, _| static_file.headers_range(range),
1047 |range, _| self.cursor_read_collect::<tables::Headers<Self::Header>>(range),
1048 |_| true,
1049 )
1050 }
1051
1052 fn sealed_header(
1053 &self,
1054 number: BlockNumber,
1055 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1056 self.static_file_provider.get_with_static_file_or_database(
1057 StaticFileSegment::Headers,
1058 number,
1059 |static_file| static_file.sealed_header(number),
1060 || {
1061 if let Some(header) = self.header_by_number(number)? {
1062 let hash = self
1063 .block_hash(number)?
1064 .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1065 Ok(Some(SealedHeader::new(header, hash)))
1066 } else {
1067 Ok(None)
1068 }
1069 },
1070 )
1071 }
1072
1073 fn sealed_headers_while(
1074 &self,
1075 range: impl RangeBounds<BlockNumber>,
1076 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1077 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1078 self.static_file_provider.get_range_with_static_file_or_database(
1079 StaticFileSegment::Headers,
1080 to_range(range),
1081 |static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
1082 |range, mut predicate| {
1083 let mut headers = vec![];
1084 for entry in
1085 self.tx.cursor_read::<tables::Headers<Self::Header>>()?.walk_range(range)?
1086 {
1087 let (number, header) = entry?;
1088 let hash = self
1089 .block_hash(number)?
1090 .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1091 let sealed = SealedHeader::new(header, hash);
1092 if !predicate(&sealed) {
1093 break
1094 }
1095 headers.push(sealed);
1096 }
1097 Ok(headers)
1098 },
1099 predicate,
1100 )
1101 }
1102}
1103
1104impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1105 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1106 self.static_file_provider.get_with_static_file_or_database(
1107 StaticFileSegment::Headers,
1108 number,
1109 |static_file| static_file.block_hash(number),
1110 || Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
1111 )
1112 }
1113
1114 fn canonical_hashes_range(
1115 &self,
1116 start: BlockNumber,
1117 end: BlockNumber,
1118 ) -> ProviderResult<Vec<B256>> {
1119 self.static_file_provider.get_range_with_static_file_or_database(
1120 StaticFileSegment::Headers,
1121 start..end,
1122 |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
1123 |range, _| self.cursor_read_collect::<tables::CanonicalHeaders>(range),
1124 |_| true,
1125 )
1126 }
1127}
1128
1129impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1130 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1131 let best_number = self.best_block_number()?;
1132 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1133 Ok(ChainInfo { best_hash, best_number })
1134 }
1135
1136 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
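        // The best block number is tracked by the checkpoint of the `Finish` stage, which is
        // updated in the same transaction that commits new blocks.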
1137 Ok(self
1140 .get_stage_checkpoint(StageId::Finish)?
1141 .map(|checkpoint| checkpoint.block_number)
1142 .unwrap_or_default())
1143 }
1144
1145 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1146 Ok(self
1147 .tx
1148 .cursor_read::<tables::CanonicalHeaders>()?
1149 .last()?
1150 .map(|(num, _)| num)
1151 .max(
1152 self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
1153 )
1154 .unwrap_or_default())
1155 }
1156
1157 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1158 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1159 }
1160}
1161
1162impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1163 type Block = BlockTy<N>;
1164
1165 fn find_block_by_hash(
1166 &self,
1167 hash: B256,
1168 source: BlockSource,
1169 ) -> ProviderResult<Option<Self::Block>> {
1170 if source.is_canonical() {
1171 self.block(hash.into())
1172 } else {
1173 Ok(None)
1174 }
1175 }
1176
1177 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1183 if let Some(number) = self.convert_hash_or_number(id)? {
1184 if let Some(header) = self.header_by_number(number)? {
1185 let Some(transactions) = self.transactions_by_block(number.into())? else {
1190 return Ok(None)
1191 };
1192
1193 let body = self
1194 .storage
1195 .reader()
1196 .read_block_bodies(self, vec![(&header, transactions)])?
1197 .pop()
1198 .ok_or(ProviderError::InvalidStorageOutput)?;
1199
1200 return Ok(Some(Self::Block::new(header, body)))
1201 }
1202 }
1203
1204 Ok(None)
1205 }

    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1207 Ok(None)
1208 }
1209
1210 fn pending_block_and_receipts(
1211 &self,
1212 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1213 Ok(None)
1214 }
1215
1216 fn recovered_block(
1225 &self,
1226 id: BlockHashOrNumber,
1227 transaction_kind: TransactionVariant,
1228 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1229 self.recovered_block(
1230 id,
1231 transaction_kind,
1232 |block_number| self.header_by_number(block_number),
1233 |header, body, senders| {
1234 Self::Block::new(header, body)
1235 .try_into_recovered_unchecked(senders)
1239 .map(Some)
1240 .map_err(|_| ProviderError::SenderRecoveryError)
1241 },
1242 )
1243 }
1244
1245 fn sealed_block_with_senders(
1246 &self,
1247 id: BlockHashOrNumber,
1248 transaction_kind: TransactionVariant,
1249 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1250 self.recovered_block(
1251 id,
1252 transaction_kind,
1253 |block_number| self.sealed_header(block_number),
1254 |header, body, senders| {
1255 Self::Block::new_sealed(header, body)
1256 .try_with_senders_unchecked(senders)
1260 .map(Some)
1261 .map_err(|_| ProviderError::SenderRecoveryError)
1262 },
1263 )
1264 }
1265
1266 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1267 self.block_range(
1268 range,
1269 |range| self.headers_range(range),
1270 |header, body, _| Ok(Self::Block::new(header, body)),
1271 )
1272 }
1273
1274 fn block_with_senders_range(
1275 &self,
1276 range: RangeInclusive<BlockNumber>,
1277 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1278 self.block_with_senders_range(
1279 range,
1280 |range| self.headers_range(range),
1281 |header, body, senders| {
1282 Self::Block::new(header, body)
1283 .try_into_recovered_unchecked(senders)
1284 .map_err(|_| ProviderError::SenderRecoveryError)
1285 },
1286 )
1287 }
1288
1289 fn recovered_block_range(
1290 &self,
1291 range: RangeInclusive<BlockNumber>,
1292 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1293 self.block_with_senders_range(
1294 range,
1295 |range| self.sealed_headers_range(range),
1296 |header, body, senders| {
1297 Self::Block::new_sealed(header, body)
1298 .try_with_senders(senders)
1299 .map_err(|_| ProviderError::SenderRecoveryError)
1300 },
1301 )
1302 }
1303}
1304
1305impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1306 for DatabaseProvider<TX, N>
1307{
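    /// Calculates the transaction hashes for the given transaction range, hashing chunks of
    /// transactions in parallel on the rayon thread pool.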
1308 fn transaction_hashes_by_range(
1311 &self,
1312 tx_range: Range<TxNumber>,
1313 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1314 self.static_file_provider.get_range_with_static_file_or_database(
1315 StaticFileSegment::Transactions,
1316 tx_range,
1317 |static_file, range, _| static_file.transaction_hashes_by_range(range),
1318 |tx_range, _| {
1319 let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
1320 let tx_range_size = tx_range.clone().count();
1321 let tx_walker = tx_cursor.walk_range(tx_range)?;
1322
1323 let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
1324 let mut channels = Vec::with_capacity(chunk_size);
1325 let mut transaction_count = 0;
1326
1327 #[inline]
1328 fn calculate_hash<T>(
1329 entry: Result<(TxNumber, T), DatabaseError>,
1330 rlp_buf: &mut Vec<u8>,
1331 ) -> Result<(B256, TxNumber), Box<ProviderError>>
1332 where
1333 T: Encodable2718,
1334 {
1335 let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
1336 tx.encode_2718(rlp_buf);
1337 Ok((keccak256(rlp_buf), tx_id))
1338 }
1339
1340 for chunk in &tx_walker.chunks(chunk_size) {
1341 let (tx, rx) = mpsc::channel();
1342 channels.push(rx);
1343
1344 let chunk: Vec<_> = chunk.collect();
1347 transaction_count += chunk.len();
1348
1349 rayon::spawn(move || {
1353 let mut rlp_buf = Vec::with_capacity(128);
1354 for entry in chunk {
1355 rlp_buf.clear();
1356 let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
1357 }
1358 });
1359 }
1360 let mut tx_list = Vec::with_capacity(transaction_count);
1361
1362 for channel in channels {
1364 while let Ok(tx) = channel.recv() {
1365 let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1366 tx_list.push((tx_hash, tx_id));
1367 }
1368 }
1369
1370 Ok(tx_list)
1371 },
1372 |_| true,
1373 )
1374 }
1375}
1376
1377impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1379 type Transaction = TxTy<N>;
1380
1381 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1382 Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1383 }
1384
1385 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1386 self.static_file_provider.get_with_static_file_or_database(
1387 StaticFileSegment::Transactions,
1388 id,
1389 |static_file| static_file.transaction_by_id(id),
1390 || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1391 )
1392 }
1393
1394 fn transaction_by_id_unhashed(
1395 &self,
1396 id: TxNumber,
1397 ) -> ProviderResult<Option<Self::Transaction>> {
1398 self.static_file_provider.get_with_static_file_or_database(
1399 StaticFileSegment::Transactions,
1400 id,
1401 |static_file| static_file.transaction_by_id_unhashed(id),
1402 || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1403 )
1404 }
1405
1406 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1407 if let Some(id) = self.transaction_id(hash)? {
1408 Ok(self.transaction_by_id_unhashed(id)?)
1409 } else {
1410 Ok(None)
1411 }
1412 }
1413
1414 fn transaction_by_hash_with_meta(
1415 &self,
1416 tx_hash: TxHash,
1417 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1418 let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1419 if let Some(transaction_id) = self.transaction_id(tx_hash)? {
1420 if let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? {
1421 if let Some(block_number) =
1422 transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
1423 {
1424 if let Some(sealed_header) = self.sealed_header(block_number)? {
1425 let (header, block_hash) = sealed_header.split();
1426 if let Some(block_body) = self.block_body_indices(block_number)? {
1427 let index = transaction_id - block_body.first_tx_num();
1432
1433 let meta = TransactionMeta {
1434 tx_hash,
1435 index,
1436 block_hash,
1437 block_number,
1438 base_fee: header.base_fee_per_gas(),
1439 excess_blob_gas: header.excess_blob_gas(),
1440 timestamp: header.timestamp(),
1441 };
1442
1443 return Ok(Some((transaction, meta)))
1444 }
1445 }
1446 }
1447 }
1448 }
1449
1450 Ok(None)
1451 }
1452
1453 fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1454 let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1455 Ok(cursor.seek(id)?.map(|(_, bn)| bn))
1456 }
1457
1458 fn transactions_by_block(
1459 &self,
1460 id: BlockHashOrNumber,
1461 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1462 let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1463
1464 if let Some(block_number) = self.convert_hash_or_number(id)? {
1465 if let Some(body) = self.block_body_indices(block_number)? {
1466 let tx_range = body.tx_num_range();
1467 return if tx_range.is_empty() {
1468 Ok(Some(Vec::new()))
1469 } else {
1470 Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
1471 }
1472 }
1473 }
1474 Ok(None)
1475 }
1476
1477 fn transactions_by_block_range(
1478 &self,
1479 range: impl RangeBounds<BlockNumber>,
1480 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1481 let range = to_range(range);
1482 let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1483
1484 self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1485 .into_iter()
1486 .map(|body| {
1487 let tx_num_range = body.tx_num_range();
1488 if tx_num_range.is_empty() {
1489 Ok(Vec::new())
1490 } else {
1491 Ok(self
1492 .transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
1493 .into_iter()
1494 .collect())
1495 }
1496 })
1497 .collect()
1498 }
1499
1500 fn transactions_by_tx_range(
1501 &self,
1502 range: impl RangeBounds<TxNumber>,
1503 ) -> ProviderResult<Vec<Self::Transaction>> {
1504 self.transactions_by_tx_range_with_cursor(
1505 range,
1506 &mut self.tx.cursor_read::<tables::Transactions<_>>()?,
1507 )
1508 }
1509
1510 fn senders_by_tx_range(
1511 &self,
1512 range: impl RangeBounds<TxNumber>,
1513 ) -> ProviderResult<Vec<Address>> {
1514 self.cursor_read_collect::<tables::TransactionSenders>(range)
1515 }
1516
1517 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1518 Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1519 }
1520}
1521
1522impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1523 type Receipt = ReceiptTy<N>;
1524
1525 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1526 self.static_file_provider.get_with_static_file_or_database(
1527 StaticFileSegment::Receipts,
1528 id,
1529 |static_file| static_file.receipt(id),
1530 || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1531 )
1532 }
1533
1534 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1535 if let Some(id) = self.transaction_id(hash)? {
1536 self.receipt(id)
1537 } else {
1538 Ok(None)
1539 }
1540 }
1541
1542 fn receipts_by_block(
1543 &self,
1544 block: BlockHashOrNumber,
1545 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1546 if let Some(number) = self.convert_hash_or_number(block)? {
1547 if let Some(body) = self.block_body_indices(number)? {
1548 let tx_range = body.tx_num_range();
1549 return if tx_range.is_empty() {
1550 Ok(Some(Vec::new()))
1551 } else {
1552 self.receipts_by_tx_range(tx_range).map(Some)
1553 }
1554 }
1555 }
1556 Ok(None)
1557 }
1558
1559 fn receipts_by_tx_range(
1560 &self,
1561 range: impl RangeBounds<TxNumber>,
1562 ) -> ProviderResult<Vec<Self::Receipt>> {
1563 self.static_file_provider.get_range_with_static_file_or_database(
1564 StaticFileSegment::Receipts,
1565 to_range(range),
1566 |static_file, range, _| static_file.receipts_by_tx_range(range),
1567 |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1568 |_| true,
1569 )
1570 }
1571
1572 fn receipts_by_block_range(
1573 &self,
1574 block_range: RangeInclusive<BlockNumber>,
1575 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1576 if block_range.is_empty() {
1577 return Ok(Vec::new());
1578 }
1579
1580 let mut block_body_indices = Vec::new();
1582 for block_num in block_range {
1583 if let Some(indices) = self.block_body_indices(block_num)? {
1584 block_body_indices.push(indices);
1585 } else {
1586 block_body_indices.push(StoredBlockBodyIndices::default());
1588 }
1589 }
1590
1591 if block_body_indices.is_empty() {
1592 return Ok(Vec::new());
1593 }
1594
1595 let non_empty_blocks: Vec<_> =
1597 block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
1598
1599 if non_empty_blocks.is_empty() {
1600 return Ok(vec![Vec::new(); block_body_indices.len()]);
1602 }
1603
1604 let first_tx = non_empty_blocks[0].first_tx_num();
1606 let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
1607
1608 let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
1610 let mut receipts_iter = all_receipts.into_iter();
1611
1612 let mut result = Vec::with_capacity(block_body_indices.len());
1614 for indices in &block_body_indices {
1615 if indices.tx_count == 0 {
1616 result.push(Vec::new());
1617 } else {
1618 let block_receipts =
1619 receipts_iter.by_ref().take(indices.tx_count as usize).collect();
1620 result.push(block_receipts);
1621 }
1622 }
1623
1624 Ok(result)
1625 }
1626}
1627
1628impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1629 for DatabaseProvider<TX, N>
1630{
1631 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1632 Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
1633 }
1634
1635 fn block_body_indices_range(
1636 &self,
1637 range: RangeInclusive<BlockNumber>,
1638 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1639 self.cursor_read_collect::<tables::BlockBodyIndices>(range)
1640 }
1641}
1642
1643impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1644 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1645 Ok(self.tx.get::<tables::StageCheckpoints>(id.to_string())?)
1646 }
1647
1648 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1650 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1651 }
1652
1653 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1654 self.tx
1655 .cursor_read::<tables::StageCheckpoints>()?
1656 .walk(None)?
1657 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1658 .map_err(ProviderError::Database)
1659 }
1660}
1661
1662impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1663 fn save_stage_checkpoint(
1665 &self,
1666 id: StageId,
1667 checkpoint: StageCheckpoint,
1668 ) -> ProviderResult<()> {
1669 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1670 }
1671
1672 fn save_stage_checkpoint_progress(
1674 &self,
1675 id: StageId,
1676 checkpoint: Vec<u8>,
1677 ) -> ProviderResult<()> {
1678 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1679 }
1680
1681 fn update_pipeline_stages(
1682 &self,
1683 block_number: BlockNumber,
1684 drop_stage_checkpoint: bool,
1685 ) -> ProviderResult<()> {
1686 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1688 for stage_id in StageId::ALL {
1689 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1690 cursor.upsert(
1691 stage_id.to_string(),
1692 &StageCheckpoint {
1693 block_number,
1694 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1695 },
1696 )?;
1697 }
1698
1699 Ok(())
1700 }
1701}
1702
1703impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1704 fn plain_state_storages(
1705 &self,
1706 addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1707 ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1708 let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1709
1710 addresses_with_keys
1711 .into_iter()
1712 .map(|(address, storage)| {
1713 storage
1714 .into_iter()
1715 .map(|key| -> ProviderResult<_> {
1716 Ok(plain_storage
1717 .seek_by_key_subkey(address, key)?
1718 .filter(|v| v.key == key)
1719 .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
1720 })
1721 .collect::<ProviderResult<Vec<_>>>()
1722 .map(|storage| (address, storage))
1723 })
1724 .collect::<ProviderResult<Vec<(_, _)>>>()
1725 }
1726
1727 fn changed_storages_with_range(
1728 &self,
1729 range: RangeInclusive<BlockNumber>,
1730 ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1731 self.tx
1732 .cursor_read::<tables::StorageChangeSets>()?
1733 .walk_range(BlockNumberAddress::range(range))?
1734 .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1737 let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1738 accounts.entry(address).or_default().insert(storage_entry.key);
1739 Ok(accounts)
1740 })
1741 }
1742
1743 fn changed_storages_and_blocks_with_range(
1744 &self,
1745 range: RangeInclusive<BlockNumber>,
1746 ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1747 let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1748
1749 let storage_changeset_lists =
1750 changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1751 BTreeMap::new(),
1752 |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1753 let (index, storage) = entry?;
1754 storages
1755 .entry((index.address(), storage.key))
1756 .or_default()
1757 .push(index.block_number());
1758 Ok(storages)
1759 },
1760 )?;
1761
1762 Ok(storage_changeset_lists)
1763 }
1764}
1765
1766impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1767 for DatabaseProvider<TX, N>
1768{
1769 type Receipt = ReceiptTy<N>;
1770
1771 fn write_state(
1772 &self,
1773 execution_outcome: &ExecutionOutcome<Self::Receipt>,
1774 is_value_known: OriginalValuesKnown,
1775 write_receipts_to: StorageLocation,
1776 ) -> ProviderResult<()> {
1777 let first_block = execution_outcome.first_block();
1778 let block_count = execution_outcome.len() as u64;
1779 let last_block = execution_outcome.last_block();
1780 let block_range = first_block..=last_block;
1781
1782 let tip = self.last_block_number()?.max(last_block);
1783
1784 let (plain_state, reverts) =
1785 execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1786
1787 self.write_state_reverts(reverts, first_block)?;
1788 self.write_state_changes(plain_state)?;
1789
1790 let block_indices: Vec<_> = self
1792 .block_body_indices_range(block_range)?
1793 .into_iter()
1794 .map(|b| b.first_tx_num)
1795 .collect();
1796
1797 if block_indices.len() < block_count as usize {
1799 let missing_blocks = block_count - block_indices.len() as u64;
1800 return Err(ProviderError::BlockBodyIndicesNotFound(
1801 last_block.saturating_sub(missing_blocks - 1),
1802 ));
1803 }
1804
1805 let has_receipts_pruning = self.prune_modes.has_receipts_pruning();
1806
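        // If receipts can be pruned, they must be written to the database so the pruner can
        // delete them later; otherwise they go wherever `write_receipts_to` points.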
1807 let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
1812 .then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
1813 .transpose()?;
1814
1815 let mut receipts_static_writer = (write_receipts_to.static_files() &&
1819 !has_receipts_pruning)
1820 .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
1821 .transpose()?;
1822
1823 let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1824 let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1825
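        // Receipts in this range are only eligible for pruning if `first_block` is at least
        // `MINIMUM_PRUNING_DISTANCE` blocks behind the tip.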
1826 let prunable_receipts =
1829 PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);
1830
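        // Collect the addresses from the receipts log filter that are already active before
        // `first_block`; receipts containing logs from these addresses are kept.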
1831 let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1833 for (_, addresses) in contract_log_pruner.range(..first_block) {
1834 allowed_addresses.extend(addresses.iter().copied());
1835 }
1836
1837 for (idx, (receipts, first_tx_index)) in
1838 execution_outcome.receipts.iter().zip(block_indices).enumerate()
1839 {
1840 let block_number = first_block + idx as u64;
1841
1842 if let Some(writer) = receipts_static_writer.as_mut() {
1844 writer.increment_block(block_number)?;
1845 }
1846
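            // Skip writing receipts for this block if they are already prunable under the
            // configured receipts prune mode.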
1847 if prunable_receipts &&
1849 self.prune_modes
1850 .receipts
1851 .is_some_and(|mode| mode.should_prune(block_number, tip))
1852 {
1853 continue
1854 }
1855
1856 if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1858 allowed_addresses.extend(new_addresses.iter().copied());
1859 }
1860
1861 for (idx, receipt) in receipts.iter().enumerate() {
1862 let receipt_idx = first_tx_index + idx as u64;
1863 if prunable_receipts &&
1866 has_contract_log_filter &&
1867 !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1868 {
1869 continue
1870 }
1871
1872 if let Some(writer) = &mut receipts_static_writer {
1873 writer.append_receipt(receipt_idx, receipt)?;
1874 }
1875
1876 if let Some(cursor) = &mut receipts_cursor {
1877 cursor.append(receipt_idx, receipt)?;
1878 }
1879 }
1880 }
1881
1882 Ok(())
1883 }
1884
1885 fn write_state_reverts(
1886 &self,
1887 reverts: PlainStateReverts,
1888 first_block: BlockNumber,
1889 ) -> ProviderResult<()> {
1890 tracing::trace!("Writing storage changes");
1892 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1893 let mut storage_changeset_cursor =
1894 self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1895 for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1896 let block_number = first_block + block_index as BlockNumber;
1897
1898 tracing::trace!(block_number, "Writing block change");
1899 storage_changes.par_sort_unstable_by_key(|a| a.address);
1901 for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1902 let storage_id = BlockNumberAddress((block_number, address));
1903
1904 let mut storage = storage_revert
1905 .into_iter()
1906 .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1907 .collect::<Vec<_>>();
1908 storage.par_sort_unstable_by_key(|a| a.0);
1910
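                // If the account's storage was wiped, collect its remaining plain state entries so
                // that they are recorded in the storage change set as well.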
1911 let mut wiped_storage = Vec::new();
1915 if wiped {
1916 tracing::trace!(?address, "Wiping storage");
1917 if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1918 wiped_storage.push((entry.key, entry.value));
1919 while let Some(entry) = storages_cursor.next_dup_val()? {
1920 wiped_storage.push((entry.key, entry.value))
1921 }
1922 }
1923 }
1924
1925 tracing::trace!(?address, ?storage, "Writing storage reverts");
1926 for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1927 storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1928 }
1929 }
1930 }
1931
1932 tracing::trace!("Writing account changes");
1934 let mut account_changeset_cursor =
1935 self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1936
1937 for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1938 let block_number = first_block + block_index as BlockNumber;
1939 account_block_reverts.par_sort_by_key(|a| a.0);
1941
1942 for (address, info) in account_block_reverts {
1943 account_changeset_cursor.append_dup(
1944 block_number,
1945 AccountBeforeTx { address, info: info.map(Into::into) },
1946 )?;
1947 }
1948 }
1949
1950 Ok(())
1951 }
1952
1953 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1954 changes.accounts.par_sort_by_key(|a| a.0);
1957 changes.storage.par_sort_by_key(|a| a.address);
1958 changes.contracts.par_sort_by_key(|a| a.0);
1959
1960 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1962 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1963 for (address, account) in changes.accounts {
1965 if let Some(account) = account {
1966 tracing::trace!(?address, "Updating plain state account");
1967 accounts_cursor.upsert(address, &account.into())?;
1968 } else if accounts_cursor.seek_exact(address)?.is_some() {
1969 tracing::trace!(?address, "Deleting plain state account");
1970 accounts_cursor.delete_current()?;
1971 }
1972 }
1973
1974 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1976 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1977 for (hash, bytecode) in changes.contracts {
1978 bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1979 }
1980
1981 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1983 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1984 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1985 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1987 storages_cursor.delete_current_duplicates()?;
1988 }
1989 let mut storage = storage
1991 .into_iter()
1992 .map(|(k, value)| StorageEntry { key: k.into(), value })
1993 .collect::<Vec<_>>();
1994 storage.par_sort_unstable_by_key(|a| a.key);
1996
1997 for entry in storage {
1998 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
1999 if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
2000 if db_entry.key == entry.key {
2001 storages_cursor.delete_current()?;
2002 }
2003 }
2004
2005 if !entry.value.is_zero() {
2006 storages_cursor.upsert(address, &entry)?;
2007 }
2008 }
2009 }
2010
2011 Ok(())
2012 }
2013
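/// Writes the sorted hashed post state (hashed accounts and hashed storages) to the database.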
2014 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2015 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
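// Write hashed account updates; accounts without info are deleted from the table.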
2017 for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
2018 if let Some(account) = account {
2019 hashed_accounts_cursor.upsert(hashed_address, &account)?;
2020 } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
2021 hashed_accounts_cursor.delete_current()?;
2022 }
2023 }
2024
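// Sort storages by hashed address and write the hashed storage updates, clearing all
// entries of wiped storages first.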
2025 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2027 let mut hashed_storage_cursor =
2028 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2029 for (hashed_address, storage) in sorted_storages {
2030 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2031 hashed_storage_cursor.delete_current_duplicates()?;
2032 }
2033
2034 for (hashed_slot, value) in storage.storage_slots_sorted() {
2035 let entry = StorageEntry { key: hashed_slot, value };
2036 if let Some(db_entry) =
2037 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
2038 {
2039 if db_entry.key == entry.key {
2040 hashed_storage_cursor.delete_current()?;
2041 }
2042 }
2043
2044 if !entry.value.is_zero() {
2045 hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2046 }
2047 }
2048 }
2049
2050 Ok(())
2051 }
2052
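/// Removes all execution state above the given block. The plain account and storage state is
/// rolled back to the values recorded in the changesets, and receipts above the block are
/// removed from the requested storage location.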
2053 fn remove_state_above(
2075 &self,
2076 block: BlockNumber,
2077 remove_receipts_from: StorageLocation,
2078 ) -> ProviderResult<()> {
2079 let range = block + 1..=self.last_block_number()?;
2080
2081 if range.is_empty() {
2082 return Ok(());
2083 }
2084
2085 let block_bodies = self.block_body_indices_range(range.clone())?;
2087
2088 let from_transaction_num =
2090 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2091
2092 let storage_range = BlockNumberAddress::range(range.clone());
2093
2094 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2095 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2096
2097 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2102 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2103
2104 let (state, _) = self.populate_bundle_state(
2105 account_changeset,
2106 storage_changeset,
2107 &mut plain_accounts_cursor,
2108 &mut plain_storage_cursor,
2109 )?;
2110
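// Roll the plain account and storage state back to the pre-range values recorded in the
// changesets.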
2111 for (address, (old_account, new_account, storage)) in &state {
2113 if old_account != new_account {
2115 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2116 if let Some(account) = old_account {
2117 plain_accounts_cursor.upsert(*address, account)?;
2118 } else if existing_entry.is_some() {
2119 plain_accounts_cursor.delete_current()?;
2120 }
2121 }
2122
2123 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2125 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2126 if plain_storage_cursor
2129 .seek_by_key_subkey(*address, *storage_key)?
2130 .filter(|s| s.key == *storage_key)
2131 .is_some()
2132 {
2133 plain_storage_cursor.delete_current()?
2134 }
2135
2136 if !old_storage_value.is_zero() {
2138 plain_storage_cursor.upsert(*address, &storage_entry)?;
2139 }
2140 }
2141 }
2142
2143 self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2144
2145 Ok(())
2146 }
2147
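/// Unwinds the execution state above the given block like `remove_state_above`, but also
/// collects the reverted state and the removed receipts into the returned [`ExecutionOutcome`].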
2148 fn take_state_above(
2170 &self,
2171 block: BlockNumber,
2172 remove_receipts_from: StorageLocation,
2173 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2174 let range = block + 1..=self.last_block_number()?;
2175
2176 if range.is_empty() {
2177 return Ok(ExecutionOutcome::default())
2178 }
2179 let start_block_number = *range.start();
2180
2181 let block_bodies = self.block_body_indices_range(range.clone())?;
2183
2184 let from_transaction_num =
2186 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2187 let to_transaction_num =
2188 block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2189
2190 let storage_range = BlockNumberAddress::range(range.clone());
2191
2192 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2193 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2194
2195 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2200 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2201
2202 let (state, reverts) = self.populate_bundle_state(
2205 account_changeset,
2206 storage_changeset,
2207 &mut plain_accounts_cursor,
2208 &mut plain_storage_cursor,
2209 )?;
2210
2211 for (address, (old_account, new_account, storage)) in &state {
2213 if old_account != new_account {
2215 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2216 if let Some(account) = old_account {
2217 plain_accounts_cursor.upsert(*address, account)?;
2218 } else if existing_entry.is_some() {
2219 plain_accounts_cursor.delete_current()?;
2220 }
2221 }
2222
2223 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2225 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2226 if plain_storage_cursor
2229 .seek_by_key_subkey(*address, *storage_key)?
2230 .filter(|s| s.key == *storage_key)
2231 .is_some()
2232 {
2233 plain_storage_cursor.delete_current()?
2234 }
2235
2236 if !old_storage_value.is_zero() {
2238 plain_storage_cursor.upsert(*address, &storage_entry)?;
2239 }
2240 }
2241 }
2242
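// Collect the receipts of the unwound transactions from static files or the database.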
2243 let mut receipts_iter = self
2245 .static_file_provider
2246 .get_range_with_static_file_or_database(
2247 StaticFileSegment::Receipts,
2248 from_transaction_num..to_transaction_num + 1,
2249 |static_file, range, _| {
2250 static_file
2251 .receipts_by_tx_range(range.clone())
2252 .map(|r| range.into_iter().zip(r).collect())
2253 },
2254 |range, _| {
2255 self.tx
2256 .cursor_read::<tables::Receipts<Self::Receipt>>()?
2257 .walk_range(range)?
2258 .map(|r| r.map_err(Into::into))
2259 .collect()
2260 },
2261 |_| true,
2262 )?
2263 .into_iter()
2264 .peekable();
2265
2266 let mut receipts = Vec::with_capacity(block_bodies.len());
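// Group the collected receipts by block, keeping an empty list for blocks without receipts.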
2267 for block_body in block_bodies {
2269 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2270 for num in block_body.tx_num_range() {
2271 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2272 block_receipts.push(receipts_iter.next().unwrap().1);
2273 }
2274 }
2275 receipts.push(block_receipts);
2276 }
2277
2278 self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2279
2280 Ok(ExecutionOutcome::new_init(
2281 state,
2282 reverts,
2283 Vec::new(),
2284 receipts,
2285 start_block_number,
2286 Vec::new(),
2287 ))
2288 }
2289}
2290
2291impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
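/// Writes the account trie updates (insertions and removals) followed by the nested storage
/// trie updates, returning the number of entries touched.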
2292 fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2294 if trie_updates.is_empty() {
2295 return Ok(0)
2296 }
2297
2298 let mut num_entries = 0;
2300
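// Merge removed and updated account trie nodes; an update takes precedence over a removal
// of the same key.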
2301 let mut account_updates = trie_updates
2303 .removed_nodes_ref()
2304 .iter()
2305 .filter_map(|n| {
2306 (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2307 })
2308 .collect::<Vec<_>>();
2309 account_updates.extend(
2310 trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2311 );
2312 account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2314
2315 let tx = self.tx_ref();
2316 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2317 for (key, updated_node) in account_updates {
2318 let nibbles = StoredNibbles(*key);
2319 match updated_node {
2320 Some(node) => {
2321 if !nibbles.0.is_empty() {
2322 num_entries += 1;
2323 account_trie_cursor.upsert(nibbles, node)?;
2324 }
2325 }
2326 None => {
2327 num_entries += 1;
2328 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2329 account_trie_cursor.delete_current()?;
2330 }
2331 }
2332 }
2333 }
2334
2335 num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
2336
2337 Ok(num_entries)
2338 }
2339}
2340
2341impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
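/// Writes the storage trie updates for each hashed address in sorted order and returns the
/// total number of entries written.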
2342 fn write_storage_trie_updates(
2345 &self,
2346 storage_tries: &B256Map<StorageTrieUpdates>,
2347 ) -> ProviderResult<usize> {
2348 let mut num_entries = 0;
2349 let mut storage_tries = Vec::from_iter(storage_tries);
2350 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2351 let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2352 for (hashed_address, storage_trie_updates) in storage_tries {
2353 let mut db_storage_trie_cursor =
2354 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2355 num_entries +=
2356 db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2357 cursor = db_storage_trie_cursor.cursor;
2358 }
2359
2360 Ok(num_entries)
2361 }
2362
2363 fn write_individual_storage_trie_updates(
2364 &self,
2365 hashed_address: B256,
2366 updates: &StorageTrieUpdates,
2367 ) -> ProviderResult<usize> {
2368 if updates.is_empty() {
2369 return Ok(0)
2370 }
2371
2372 let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2373 let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
2374 Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
2375 }
2376}
2377
2378impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2379 fn unwind_account_hashing<'a>(
2380 &self,
2381 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2382 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2383 let hashed_accounts = changesets
2387 .into_iter()
2388 .map(|(_, e)| (keccak256(e.address), e.info))
2389 .collect::<Vec<_>>()
2390 .into_iter()
2391 .rev()
2392 .collect::<BTreeMap<_, _>>();
2393
2394 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2396 for (hashed_address, account) in &hashed_accounts {
2397 if let Some(account) = account {
2398 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2399 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2400 hashed_accounts_cursor.delete_current()?;
2401 }
2402 }
2403
2404 Ok(hashed_accounts)
2405 }
2406
2407 fn unwind_account_hashing_range(
2408 &self,
2409 range: impl RangeBounds<BlockNumber>,
2410 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2411 let changesets = self
2412 .tx
2413 .cursor_read::<tables::AccountChangeSets>()?
2414 .walk_range(range)?
2415 .collect::<Result<Vec<_>, _>>()?;
2416 self.unwind_account_hashing(changesets.iter())
2417 }
2418
2419 fn insert_account_for_hashing(
2420 &self,
2421 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2422 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2423 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2424 let hashed_accounts =
2425 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2426 for (hashed_address, account) in &hashed_accounts {
2427 if let Some(account) = account {
2428 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2429 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2430 hashed_accounts_cursor.delete_current()?;
2431 }
2432 }
2433 Ok(hashed_accounts)
2434 }
2435
2436 fn unwind_storage_hashing(
2437 &self,
2438 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2439 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2440 let mut hashed_storages = changesets
2442 .into_iter()
2443 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2444 (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2445 })
2446 .collect::<Vec<_>>();
2447 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2448
2449 let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2451 HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2452 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2453 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2454 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2455
2456 if hashed_storage
2457 .seek_by_key_subkey(hashed_address, key)?
2458 .filter(|entry| entry.key == key)
2459 .is_some()
2460 {
2461 hashed_storage.delete_current()?;
2462 }
2463
2464 if !value.is_zero() {
2465 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2466 }
2467 }
2468 Ok(hashed_storage_keys)
2469 }
2470
2471 fn unwind_storage_hashing_range(
2472 &self,
2473 range: impl RangeBounds<BlockNumberAddress>,
2474 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2475 let changesets = self
2476 .tx
2477 .cursor_read::<tables::StorageChangeSets>()?
2478 .walk_range(range)?
2479 .collect::<Result<Vec<_>, _>>()?;
2480 self.unwind_storage_hashing(changesets.into_iter())
2481 }
2482
2483 fn insert_storage_for_hashing(
2484 &self,
2485 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2486 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2487 let hashed_storages =
2489 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2490 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2491 map.insert(keccak256(entry.key), entry.value);
2492 map
2493 });
2494 map.insert(keccak256(address), storage);
2495 map
2496 });
2497
2498 let hashed_storage_keys = hashed_storages
2499 .iter()
2500 .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2501 .collect();
2502
2503 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2504 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2507 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2508 if hashed_storage_cursor
2509 .seek_by_key_subkey(hashed_address, key)?
2510 .filter(|entry| entry.key == key)
2511 .is_some()
2512 {
2513 hashed_storage_cursor.delete_current()?;
2514 }
2515
2516 if !value.is_zero() {
2517 hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2518 }
2519 Ok(())
2520 })
2521 })?;
2522
2523 Ok(hashed_storage_keys)
2524 }
2525
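/// Hashes the accounts and storages changed in the given block range, recomputes the state
/// root, verifies it against `expected_state_root`, and persists the resulting trie updates.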
2526 fn insert_hashes(
2527 &self,
2528 range: RangeInclusive<BlockNumber>,
2529 end_block_hash: B256,
2530 expected_state_root: B256,
2531 ) -> ProviderResult<()> {
2532 let mut account_prefix_set = PrefixSetMut::default();
2534 let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
2535 let mut destroyed_accounts = HashSet::default();
2536
2537 let mut durations_recorder = metrics::DurationsRecorder::default();
2538
2539 {
2541 let lists = self.changed_storages_with_range(range.clone())?;
2542 let storages = self.plain_state_storages(lists)?;
2543 let storage_entries = self.insert_storage_for_hashing(storages)?;
2544 for (hashed_address, hashed_slots) in storage_entries {
2545 account_prefix_set.insert(Nibbles::unpack(hashed_address));
2546 for slot in hashed_slots {
2547 storage_prefix_sets
2548 .entry(hashed_address)
2549 .or_default()
2550 .insert(Nibbles::unpack(slot));
2551 }
2552 }
2553 }
2554 durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
2555
2556 {
2558 let lists = self.changed_accounts_with_range(range.clone())?;
2559 let accounts = self.basic_accounts(lists)?;
2560 let hashed_addresses = self.insert_account_for_hashing(accounts)?;
2561 for (hashed_address, account) in hashed_addresses {
2562 account_prefix_set.insert(Nibbles::unpack(hashed_address));
2563 if account.is_none() {
2564 destroyed_accounts.insert(hashed_address);
2565 }
2566 }
2567 }
2568 durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
2569
2570 {
2572 let prefix_sets = TriePrefixSets {
2575 account_prefix_set: account_prefix_set.freeze(),
2576 storage_prefix_sets: storage_prefix_sets
2577 .into_iter()
2578 .map(|(k, v)| (k, v.freeze()))
2579 .collect(),
2580 destroyed_accounts,
2581 };
2582 let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
2583 .with_prefix_sets(prefix_sets)
2584 .root_with_updates()
2585 .map_err(reth_db_api::DatabaseError::from)?;
2586 if state_root != expected_state_root {
2587 return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
2588 root: GotExpected { got: state_root, expected: expected_state_root },
2589 block_number: *range.end(),
2590 block_hash: end_block_hash,
2591 })))
2592 }
2593 self.write_trie_updates(&trie_updates)?;
2594 }
2595 durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
2596
2597 debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
2598
2599 Ok(())
2600 }
2601}
2602
2603impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2604 fn unwind_account_history_indices<'a>(
2605 &self,
2606 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2607 ) -> ProviderResult<usize> {
2608 let mut last_indices = changesets
2609 .into_iter()
2610 .map(|(index, account)| (account.address, *index))
2611 .collect::<Vec<_>>();
2612 last_indices.sort_by_key(|(a, _)| *a);
2613
2614 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2616 for &(address, rem_index) in &last_indices {
2617 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2618 &mut cursor,
2619 ShardedKey::last(address),
2620 rem_index,
2621 |sharded_key| sharded_key.key == address,
2622 )?;
2623
2624 if !partial_shard.is_empty() {
2627 cursor.insert(
2628 ShardedKey::last(address),
2629 &BlockNumberList::new_pre_sorted(partial_shard),
2630 )?;
2631 }
2632 }
2633
2634 let changesets = last_indices.len();
2635 Ok(changesets)
2636 }
2637
2638 fn unwind_account_history_indices_range(
2639 &self,
2640 range: impl RangeBounds<BlockNumber>,
2641 ) -> ProviderResult<usize> {
2642 let changesets = self
2643 .tx
2644 .cursor_read::<tables::AccountChangeSets>()?
2645 .walk_range(range)?
2646 .collect::<Result<Vec<_>, _>>()?;
2647 self.unwind_account_history_indices(changesets.iter())
2648 }
2649
2650 fn insert_account_history_index(
2651 &self,
2652 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2653 ) -> ProviderResult<()> {
2654 self.append_history_index::<_, tables::AccountsHistory>(
2655 account_transitions,
2656 ShardedKey::new,
2657 )
2658 }
2659
2660 fn unwind_storage_history_indices(
2661 &self,
2662 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2663 ) -> ProviderResult<usize> {
2664 let mut storage_changesets = changesets
2665 .into_iter()
2666 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2667 .collect::<Vec<_>>();
2668 storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2669
2670 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2671 for &(address, storage_key, rem_index) in &storage_changesets {
2672 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2673 &mut cursor,
2674 StorageShardedKey::last(address, storage_key),
2675 rem_index,
2676 |storage_sharded_key| {
2677 storage_sharded_key.address == address &&
2678 storage_sharded_key.sharded_key.key == storage_key
2679 },
2680 )?;
2681
2682 if !partial_shard.is_empty() {
2685 cursor.insert(
2686 StorageShardedKey::last(address, storage_key),
2687 &BlockNumberList::new_pre_sorted(partial_shard),
2688 )?;
2689 }
2690 }
2691
2692 let changesets = storage_changesets.len();
2693 Ok(changesets)
2694 }
2695
2696 fn unwind_storage_history_indices_range(
2697 &self,
2698 range: impl RangeBounds<BlockNumberAddress>,
2699 ) -> ProviderResult<usize> {
2700 let changesets = self
2701 .tx
2702 .cursor_read::<tables::StorageChangeSets>()?
2703 .walk_range(range)?
2704 .collect::<Result<Vec<_>, _>>()?;
2705 self.unwind_storage_history_indices(changesets.into_iter())
2706 }
2707
2708 fn insert_storage_history_index(
2709 &self,
2710 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2711 ) -> ProviderResult<()> {
2712 self.append_history_index::<_, tables::StoragesHistory>(
2713 storage_transitions,
2714 |(address, storage_key), highest_block_number| {
2715 StorageShardedKey::new(address, storage_key, highest_block_number)
2716 },
2717 )
2718 }
2719
2720 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2721 {
2723 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2724 self.insert_account_history_index(indices)?;
2725 }
2726
2727 {
2729 let indices = self.changed_storages_and_blocks_with_range(range)?;
2730 self.insert_storage_history_index(indices)?;
2731 }
2732
2733 Ok(())
2734 }
2735}
2736
2737impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2738 for DatabaseProvider<TX, N>
2739{
2740 fn take_block_and_execution_above(
2741 &self,
2742 block: BlockNumber,
2743 remove_from: StorageLocation,
2744 ) -> ProviderResult<Chain<Self::Primitives>> {
2745 let range = block + 1..=self.last_block_number()?;
2746
2747 self.unwind_trie_state_range(range.clone())?;
2748
2749 let execution_state = self.take_state_above(block, remove_from)?;
2751
2752 let blocks = self.recovered_block_range(range)?;
2753
2754 self.remove_blocks_above(block, remove_from)?;
2757
2758 self.update_pipeline_stages(block, true)?;
2760
2761 Ok(Chain::new(blocks, execution_state, None))
2762 }
2763
2764 fn remove_block_and_execution_above(
2765 &self,
2766 block: BlockNumber,
2767 remove_from: StorageLocation,
2768 ) -> ProviderResult<()> {
2769 let range = block + 1..=self.last_block_number()?;
2770
2771 self.unwind_trie_state_range(range)?;
2772
2773 self.remove_state_above(block, remove_from)?;
2775
2776 self.remove_blocks_above(block, remove_from)?;
2779
2780 self.update_pipeline_stages(block, true)?;
2782
2783 Ok(())
2784 }
2785}
2786
2787impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2788 for DatabaseProvider<TX, N>
2789{
2790 type Block = BlockTy<N>;
2791 type Receipt = ReceiptTy<N>;
2792
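/// Inserts a single recovered block: the canonical header, total difficulty, and header number
/// index are written, transaction senders and hash lookups are recorded (subject to the
/// configured prune modes), and the block body is appended via `append_block_bodies`.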
2793 fn insert_block(
2814 &self,
2815 block: RecoveredBlock<Self::Block>,
2816 write_to: StorageLocation,
2817 ) -> ProviderResult<StoredBlockBodyIndices> {
2818 let block_number = block.number();
2819
2820 let mut durations_recorder = metrics::DurationsRecorder::default();
2821
2822 let ttd = if block_number == 0 {
2824 block.header().difficulty()
2825 } else {
2826 let parent_block_number = block_number - 1;
2827 let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2828 durations_recorder.record_relative(metrics::Action::GetParentTD);
2829 parent_ttd + block.header().difficulty()
2830 };
2831
2832 if write_to.database() {
2833 self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
2834 durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
2835
2836 self.tx.put::<tables::Headers<HeaderTy<N>>>(block_number, block.header().clone())?;
2838 durations_recorder.record_relative(metrics::Action::InsertHeaders);
2839
2840 self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
2841 durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
2842 }
2843
2844 if write_to.static_files() {
2845 let mut writer =
2846 self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?;
2847 writer.append_header(block.header(), ttd, &block.hash())?;
2848 }
2849
2850 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2851 durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2852
2853 let mut next_tx_num = self
2854 .tx
2855 .cursor_read::<tables::TransactionBlocks>()?
2856 .last()?
2857 .map(|(n, _)| n + 1)
2858 .unwrap_or_default();
2859 durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2860 let first_tx_num = next_tx_num;
2861
2862 let tx_count = block.body().transaction_count() as u64;
2863
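// Record the sender and transaction hash lookup for every transaction, unless pruning
// disables them.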
2864 for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2866 let hash = transaction.tx_hash();
2867
2868 if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2869 self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2870 }
2871
2872 if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2873 self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2874 }
2875 next_tx_num += 1;
2876 }
2877
2878 self.append_block_bodies(vec![(block_number, Some(block.into_body()))], write_to)?;
2879
2880 debug!(
2881 target: "providers::db",
2882 ?block_number,
2883 actions = ?durations_recorder.actions,
2884 "Inserted block"
2885 );
2886
2887 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2888 }
2889
2890 fn append_block_bodies(
2891 &self,
2892 bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2893 write_to: StorageLocation,
2894 ) -> ProviderResult<()> {
2895 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2896
2897 let mut tx_static_writer = write_to
2899 .static_files()
2900 .then(|| {
2901 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)
2902 })
2903 .transpose()?;
2904
2905 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2906 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2907
2908 let mut tx_cursor = write_to
2910 .database()
2911 .then(|| self.tx.cursor_write::<tables::Transactions<TxTy<N>>>())
2912 .transpose()?;
2913
2914 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2916
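// For each body: advance the static-file writer, append the body indices, the
// transaction-block mapping, and the transactions themselves.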
2917 for (block_number, body) in &bodies {
2918 if let Some(writer) = tx_static_writer.as_mut() {
2920 writer.increment_block(*block_number)?;
2921 }
2922
2923 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2924 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2925
2926 let mut durations_recorder = metrics::DurationsRecorder::default();
2927
2928 block_indices_cursor.append(*block_number, &block_indices)?;
2930
2931 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2932
2933 let Some(body) = body else { continue };
2934
2935 if !body.transactions().is_empty() {
2937 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2938 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2939 }
2940
2941 for transaction in body.transactions() {
2943 if let Some(writer) = tx_static_writer.as_mut() {
2944 writer.append_transaction(next_tx_num, transaction)?;
2945 }
2946 if let Some(cursor) = tx_cursor.as_mut() {
2947 cursor.append(next_tx_num, transaction)?;
2948 }
2949
2950 next_tx_num += 1;
2952 }
2953 }
2954
2955 self.storage.writer().write_block_bodies(self, bodies, write_to)?;
2956
2957 Ok(())
2958 }
2959
2960 fn remove_blocks_above(
2961 &self,
2962 block: BlockNumber,
2963 remove_from: StorageLocation,
2964 ) -> ProviderResult<()> {
2965 for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
2966 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2967 }
2968
2969 self.remove::<tables::CanonicalHeaders>(block + 1..)?;
2972 self.remove::<tables::Headers<HeaderTy<N>>>(block + 1..)?;
2973 self.remove::<tables::HeaderTerminalDifficulties>(block + 1..)?;
2974
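// Unwind the transaction hash lookup and sender tables for all transactions above the
// target block.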
2975 let unwind_tx_from = self
2977 .block_body_indices(block)?
2978 .map(|b| b.next_tx_num())
2979 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2980
2981 let unwind_tx_to = self
2983 .tx
2984 .cursor_read::<tables::BlockBodyIndices>()?
2985 .last()?
2986 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
2988 .1
2989 .last_tx_num();
2990
2991 if unwind_tx_from <= unwind_tx_to {
2992 for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
2993 self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
2994 }
2995 }
2996
2997 self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
2998
2999 self.remove_bodies_above(block, remove_from)?;
3000
3001 Ok(())
3002 }
3003
3004 fn remove_bodies_above(
3005 &self,
3006 block: BlockNumber,
3007 remove_from: StorageLocation,
3008 ) -> ProviderResult<()> {
3009 self.storage.writer().remove_block_bodies_above(self, block, remove_from)?;
3010
3011 let unwind_tx_from = self
3013 .block_body_indices(block)?
3014 .map(|b| b.next_tx_num())
3015 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3016
3017 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3018 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3019
3020 if remove_from.database() {
3021 self.remove::<tables::Transactions<TxTy<N>>>(unwind_tx_from..)?;
3022 }
3023
3024 if remove_from.static_files() {
3025 let static_file_tx_num = self
3026 .static_file_provider
3027 .get_highest_static_file_tx(StaticFileSegment::Transactions);
3028
3029 let to_delete = static_file_tx_num
3030 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3031 .unwrap_or_default();
3032
3033 self.static_file_provider
3034 .latest_writer(StaticFileSegment::Transactions)?
3035 .prune_transactions(to_delete, block)?;
3036 }
3037
3038 Ok(())
3039 }
3040
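/// Appends the given recovered blocks together with their execution outcome, hashed state,
/// and trie updates, then updates the history indices and pipeline stage checkpoints.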
3041 fn append_blocks_with_state(
3043 &self,
3044 blocks: Vec<RecoveredBlock<Self::Block>>,
3045 execution_outcome: &ExecutionOutcome<Self::Receipt>,
3046 hashed_state: HashedPostStateSorted,
3047 trie_updates: TrieUpdates,
3048 ) -> ProviderResult<()> {
3049 if blocks.is_empty() {
3050 debug!(target: "providers::db", "Attempted to append empty block range");
3051 return Ok(())
3052 }
3053
3054 let first_number = blocks[0].number();
3057
3058 let last_block_number = blocks[blocks.len() - 1].number();
3061
3062 let mut durations_recorder = metrics::DurationsRecorder::default();
3063
3064 for block in blocks {
3066 self.insert_block(block, StorageLocation::Database)?;
3067 durations_recorder.record_relative(metrics::Action::InsertBlock);
3068 }
3069
3070 self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?;
3071 durations_recorder.record_relative(metrics::Action::InsertState);
3072
3073 self.write_hashed_state(&hashed_state)?;
3075 self.write_trie_updates(&trie_updates)?;
3076 durations_recorder.record_relative(metrics::Action::InsertHashes);
3077
3078 self.update_history_indices(first_number..=last_block_number)?;
3079 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3080
3081 self.update_pipeline_stages(last_block_number, false)?;
3083 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3084
3085 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3086
3087 Ok(())
3088 }
3089}
3090
3091impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3092 fn get_prune_checkpoint(
3093 &self,
3094 segment: PruneSegment,
3095 ) -> ProviderResult<Option<PruneCheckpoint>> {
3096 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3097 }
3098
3099 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3100 Ok(self
3101 .tx
3102 .cursor_read::<tables::PruneCheckpoints>()?
3103 .walk(None)?
3104 .collect::<Result<_, _>>()?)
3105 }
3106}
3107
3108impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3109 fn save_prune_checkpoint(
3110 &self,
3111 segment: PruneSegment,
3112 checkpoint: PruneCheckpoint,
3113 ) -> ProviderResult<()> {
3114 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3115 }
3116}
3117
3118impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3119 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3120 let db_entries = self.tx.entries::<T>()?;
3121 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3122 Ok(entries) => entries,
3123 Err(ProviderError::UnsupportedProvider) => 0,
3124 Err(err) => return Err(err),
3125 };
3126
3127 Ok(db_entries + static_file_entries)
3128 }
3129}
3130
3131impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3132 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3133 let mut finalized_blocks = self
3134 .tx
3135 .cursor_read::<tables::ChainState>()?
3136 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3137 .take(1)
3138 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3139
3140 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3141 Ok(last_finalized_block_number)
3142 }
3143
3144 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3145 let mut safe_blocks = self
3146 .tx
3147 .cursor_read::<tables::ChainState>()?
3148 .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
3149 .take(1)
3150 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3151
3152 let last_safe_block_number = safe_blocks.pop_first().map(|pair| pair.1);
3153 Ok(last_safe_block_number)
3154 }
3155}
3156
3157impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3158 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3159 Ok(self
3160 .tx
3161 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3162 }
3163
3164 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3165 Ok(self
3166 .tx
3167 .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
3168 }
3169}
3170
3171impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3172 type Tx = TX;
3173
3174 fn tx_ref(&self) -> &Self::Tx {
3175 &self.tx
3176 }
3177
3178 fn tx_mut(&mut self) -> &mut Self::Tx {
3179 &mut self.tx
3180 }
3181
3182 fn into_tx(self) -> Self::Tx {
3183 self.tx
3184 }
3185
3186 fn prune_modes_ref(&self) -> &PruneModes {
3187 self.prune_modes_ref()
3188 }
3189}
3190
3191#[cfg(test)]
3192mod tests {
3193 use super::*;
3194 use crate::{
3195 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3196 BlockWriter,
3197 };
3198 use reth_testing_utils::generators::{self, random_block, BlockParams};
3199
3200 #[test]
3201 fn test_receipts_by_block_range_empty_range() {
3202 let factory = create_test_provider_factory();
3203 let provider = factory.provider().unwrap();
3204
3205 let start = 10u64;
3207 let end = 9u64;
3208 let result = provider.receipts_by_block_range(start..=end).unwrap();
3209 assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3210 }
3211
3212 #[test]
3213 fn test_receipts_by_block_range_nonexistent_blocks() {
3214 let factory = create_test_provider_factory();
3215 let provider = factory.provider().unwrap();
3216
3217 let result = provider.receipts_by_block_range(10..=12).unwrap();
3219 assert_eq!(result, vec![vec![], vec![], vec![]]);
3220 }
3221
3222 #[test]
3223 fn test_receipts_by_block_range_single_block() {
3224 let factory = create_test_provider_factory();
3225 let data = BlockchainTestData::default();
3226
3227 let provider_rw = factory.provider_rw().unwrap();
3228 provider_rw
3229 .insert_block(
3230 data.genesis.clone().try_recover().unwrap(),
3231 crate::StorageLocation::Database,
3232 )
3233 .unwrap();
3234 provider_rw
3235 .insert_block(data.blocks[0].0.clone(), crate::StorageLocation::Database)
3236 .unwrap();
3237 provider_rw
3238 .write_state(
3239 &data.blocks[0].1,
3240 crate::OriginalValuesKnown::No,
3241 crate::StorageLocation::Database,
3242 )
3243 .unwrap();
3244 provider_rw.commit().unwrap();
3245
3246 let provider = factory.provider().unwrap();
3247 let result = provider.receipts_by_block_range(1..=1).unwrap();
3248
3249 assert_eq!(result.len(), 1);
3251 assert_eq!(result[0].len(), 1);
3252 assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3253 }
3254
3255 #[test]
3256 fn test_receipts_by_block_range_multiple_blocks() {
3257 let factory = create_test_provider_factory();
3258 let data = BlockchainTestData::default();
3259
3260 let provider_rw = factory.provider_rw().unwrap();
3261 provider_rw
3262 .insert_block(
3263 data.genesis.clone().try_recover().unwrap(),
3264 crate::StorageLocation::Database,
3265 )
3266 .unwrap();
3267 for i in 0..3 {
3268 provider_rw
3269 .insert_block(data.blocks[i].0.clone(), crate::StorageLocation::Database)
3270 .unwrap();
3271 provider_rw
3272 .write_state(
3273 &data.blocks[i].1,
3274 crate::OriginalValuesKnown::No,
3275 crate::StorageLocation::Database,
3276 )
3277 .unwrap();
3278 }
3279 provider_rw.commit().unwrap();
3280
3281 let provider = factory.provider().unwrap();
3282 let result = provider.receipts_by_block_range(1..=3).unwrap();
3283
3284 assert_eq!(result.len(), 3);
3286 for (i, block_receipts) in result.iter().enumerate() {
3287 assert_eq!(block_receipts.len(), 1);
3288 assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
3289 }
3290 }
3291
3292 #[test]
3293 fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
3294 let factory = create_test_provider_factory();
3295 let data = BlockchainTestData::default();
3296
3297 let provider_rw = factory.provider_rw().unwrap();
3298 provider_rw
3299 .insert_block(
3300 data.genesis.clone().try_recover().unwrap(),
3301 crate::StorageLocation::Database,
3302 )
3303 .unwrap();
3304
3305 for i in 0..3 {
3307 provider_rw
3308 .insert_block(data.blocks[i].0.clone(), crate::StorageLocation::Database)
3309 .unwrap();
3310 provider_rw
3311 .write_state(
3312 &data.blocks[i].1,
3313 crate::OriginalValuesKnown::No,
3314 crate::StorageLocation::Database,
3315 )
3316 .unwrap();
3317 }
3318 provider_rw.commit().unwrap();
3319
3320 let provider = factory.provider().unwrap();
3321 let result = provider.receipts_by_block_range(1..=3).unwrap();
3322
3323 assert_eq!(result.len(), 3);
3325 for block_receipts in &result {
3326 assert_eq!(block_receipts.len(), 1);
3327 }
3328 }
3329
3330 #[test]
3331 fn test_receipts_by_block_range_partial_range() {
3332 let factory = create_test_provider_factory();
3333 let data = BlockchainTestData::default();
3334
3335 let provider_rw = factory.provider_rw().unwrap();
3336 provider_rw
3337 .insert_block(
3338 data.genesis.clone().try_recover().unwrap(),
3339 crate::StorageLocation::Database,
3340 )
3341 .unwrap();
3342 for i in 0..3 {
3343 provider_rw
3344 .insert_block(data.blocks[i].0.clone(), crate::StorageLocation::Database)
3345 .unwrap();
3346 provider_rw
3347 .write_state(
3348 &data.blocks[i].1,
3349 crate::OriginalValuesKnown::No,
3350 crate::StorageLocation::Database,
3351 )
3352 .unwrap();
3353 }
3354 provider_rw.commit().unwrap();
3355
3356 let provider = factory.provider().unwrap();
3357
3358 let result = provider.receipts_by_block_range(2..=5).unwrap();
3360 assert_eq!(result.len(), 4);
3361
3362 assert_eq!(result[0].len(), 1);
3363 assert_eq!(result[1].len(), 1);
3364 assert_eq!(result[2].len(), 0);
3365 assert_eq!(result[3].len(), 0);
3366 assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
3369 assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
3370 }
3371
3372 #[test]
3373 fn test_receipts_by_block_range_all_empty_blocks() {
3374 let factory = create_test_provider_factory();
3375 let mut rng = generators::rng();
3376
3377 let mut blocks = Vec::new();
3379 for i in 1..=3 {
3380 let block =
3381 random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
3382 blocks.push(block);
3383 }
3384
3385 let provider_rw = factory.provider_rw().unwrap();
3386 for block in blocks {
3387 provider_rw
3388 .insert_block(block.try_recover().unwrap(), crate::StorageLocation::Database)
3389 .unwrap();
3390 }
3391 provider_rw.commit().unwrap();
3392
3393 let provider = factory.provider().unwrap();
3394 let result = provider.receipts_by_block_range(1..=3).unwrap();
3395
3396 assert_eq!(result.len(), 3);
3397 for block_receipts in result {
3398 assert_eq!(block_receipts.len(), 0);
3399 }
3400 }
3401
3402 #[test]
3403 fn test_receipts_by_block_range_consistency_with_individual_calls() {
3404 let factory = create_test_provider_factory();
3405 let data = BlockchainTestData::default();
3406
3407 let provider_rw = factory.provider_rw().unwrap();
3408 provider_rw
3409 .insert_block(
3410 data.genesis.clone().try_recover().unwrap(),
3411 crate::StorageLocation::Database,
3412 )
3413 .unwrap();
3414 for i in 0..3 {
3415 provider_rw
3416 .insert_block(data.blocks[i].0.clone(), crate::StorageLocation::Database)
3417 .unwrap();
3418 provider_rw
3419 .write_state(
3420 &data.blocks[i].1,
3421 crate::OriginalValuesKnown::No,
3422 crate::StorageLocation::Database,
3423 )
3424 .unwrap();
3425 }
3426 provider_rw.commit().unwrap();
3427
3428 let provider = factory.provider().unwrap();
3429
3430 let range_result = provider.receipts_by_block_range(1..=3).unwrap();
3432
3433 let mut individual_results = Vec::new();
3435 for block_num in 1..=3 {
3436 let receipts =
3437 provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
3438 individual_results.push(receipts);
3439 }
3440
3441 assert_eq!(range_result, individual_results);
3442 }
3443}