1use crate::{
2 bundle_state::StorageRevertsIter,
3 providers::{
4 database::{chain::ChainStorage, metrics},
5 static_file::StaticFileWriter,
6 NodeTypesForProvider, StaticFileProvider,
7 },
8 to_range,
9 traits::{
10 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
11 },
12 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
13 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
14 DBProvider, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
15 HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter, LatestStateProvider,
16 LatestStateProviderRef, OriginalValuesKnown, ProviderError, PruneCheckpointReader,
17 PruneCheckpointWriter, RevertsInit, StageCheckpointReader, StateCommitmentProvider,
18 StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader, StorageLocation,
19 StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
20 TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
21};
22use alloy_consensus::{transaction::TransactionMeta, BlockHeader, Header, TxReceipt};
23use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
24use alloy_primitives::{
25 keccak256,
26 map::{hash_map, B256Map, HashMap, HashSet},
27 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
28};
29use itertools::Itertools;
30use rayon::slice::ParallelSliceMut;
31use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
32use reth_db_api::{
33 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
34 database::Database,
35 models::{
36 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
37 ShardedKey, StoredBlockBodyIndices,
38 },
39 table::Table,
40 tables,
41 transaction::{DbTx, DbTxMut},
42 BlockNumberList, DatabaseError, PlainAccountState, PlainStorageState,
43};
44use reth_execution_types::{Chain, ExecutionOutcome};
45use reth_network_p2p::headers::downloader::SyncTarget;
46use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
47use reth_primitives::{
48 Account, Bytecode, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
49 StaticFileSegment, StorageEntry,
50};
51use reth_primitives_traits::{Block as _, BlockBody as _, SignedTransaction};
52use reth_prune_types::{
53 PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
54};
55use reth_stages_types::{StageCheckpoint, StageId};
56use reth_storage_api::{
57 BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, OmmersProvider,
58 StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider,
59};
60use reth_storage_errors::provider::{ProviderResult, RootMismatch};
61use reth_trie::{
62 prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
63 updates::{StorageTrieUpdates, TrieUpdates},
64 HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
65};
66use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
67use revm_database::states::{
68 PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
69};
70use std::{
71 cmp::Ordering,
72 collections::{BTreeMap, BTreeSet},
73 fmt::Debug,
74 ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
75 sync::{mpsc, Arc},
76};
77use tokio::sync::watch;
78use tracing::{debug, trace};
79
/// A [`DatabaseProvider`] parameterised with the database's `TX` transaction type.
///
/// NOTE(review): `TX` is presumably the read-only transaction type (as opposed to `TXMut`) —
/// confirm against the `Database` trait.
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
82
/// Newtype around a [`DatabaseProvider`] parameterised with the database's `TXMut` transaction
/// type.
///
/// The single public field is the wrapped provider.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
91
92impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
93 type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
94
95 fn deref(&self) -> &Self::Target {
96 &self.0
97 }
98}
99
100impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
101 fn deref_mut(&mut self) -> &mut Self::Target {
102 &mut self.0
103 }
104}
105
106impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
107 for DatabaseProviderRW<DB, N>
108{
109 fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
110 &self.0
111 }
112}
113
114impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
115 pub fn commit(self) -> ProviderResult<bool> {
117 self.0.commit()
118 }
119
120 pub fn into_tx(self) -> <DB as Database>::TXMut {
122 self.0.into_tx()
123 }
124}
125
126impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
127 for DatabaseProvider<<DB as Database>::TXMut, N>
128{
129 fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
130 provider.0
131 }
132}
133
/// Provider that reads (and, with a mutable transaction, writes) data through a database
/// transaction and a static file provider.
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Database transaction.
    tx: TX,
    /// Chain spec, shared via `Arc`.
    chain_spec: Arc<N::ChainSpec>,
    /// Static file provider; consulted alongside the database by the header/hash readers.
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration.
    prune_modes: PruneModes,
    /// Node storage handler; its `reader()` is used to read block bodies.
    storage: Arc<N::Storage>,
}
149
150impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
151 pub const fn prune_modes_ref(&self) -> &PruneModes {
153 &self.prune_modes
154 }
155}
156
157impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
158 pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
160 trace!(target: "providers::db", "Returning latest state provider");
161 Box::new(LatestStateProviderRef::new(self))
162 }
163
164 pub fn history_by_block_hash<'a>(
166 &'a self,
167 block_hash: BlockHash,
168 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
169 let mut block_number =
170 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
171 if block_number == self.best_block_number().unwrap_or_default() &&
172 block_number == self.last_block_number().unwrap_or_default()
173 {
174 return Ok(Box::new(LatestStateProviderRef::new(self)))
175 }
176
177 block_number += 1;
179
180 let account_history_prune_checkpoint =
181 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
182 let storage_history_prune_checkpoint =
183 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
184
185 let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
186
187 if let Some(prune_checkpoint_block_number) =
190 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
191 {
192 state_provider = state_provider.with_lowest_available_account_history_block_number(
193 prune_checkpoint_block_number + 1,
194 );
195 }
196 if let Some(prune_checkpoint_block_number) =
197 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
198 {
199 state_provider = state_provider.with_lowest_available_storage_history_block_number(
200 prune_checkpoint_block_number + 1,
201 );
202 }
203
204 Ok(Box::new(state_provider))
205 }
206
207 #[cfg(feature = "test-utils")]
208 pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
210 self.prune_modes = prune_modes;
211 }
212}
213
214impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
215 type Primitives = N::Primitives;
216}
217
218impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
219 fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
221 self.static_file_provider.clone()
222 }
223}
224
225impl<TX: Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
226 for DatabaseProvider<TX, N>
227{
228 type ChainSpec = N::ChainSpec;
229
230 fn chain_spec(&self) -> Arc<Self::ChainSpec> {
231 self.chain_spec.clone()
232 }
233}
234
235impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
236 pub const fn new_rw(
238 tx: TX,
239 chain_spec: Arc<N::ChainSpec>,
240 static_file_provider: StaticFileProvider<N::Primitives>,
241 prune_modes: PruneModes,
242 storage: Arc<N::Storage>,
243 ) -> Self {
244 Self { tx, chain_spec, static_file_provider, prune_modes, storage }
245 }
246}
247
248impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
249 fn as_ref(&self) -> &Self {
250 self
251 }
252}
253
254impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
255 pub fn unwind_trie_state_range(
260 &self,
261 range: RangeInclusive<BlockNumber>,
262 ) -> ProviderResult<()> {
263 let changed_accounts = self
264 .tx
265 .cursor_read::<tables::AccountChangeSets>()?
266 .walk_range(range.clone())?
267 .collect::<Result<Vec<_>, _>>()?;
268
269 let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
271 let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
272 let mut destroyed_accounts = HashSet::default();
273 for (hashed_address, account) in hashed_addresses {
274 account_prefix_set.insert(Nibbles::unpack(hashed_address));
275 if account.is_none() {
276 destroyed_accounts.insert(hashed_address);
277 }
278 }
279
280 self.unwind_account_history_indices(changed_accounts.iter())?;
282 let storage_range = BlockNumberAddress::range(range.clone());
283
284 let changed_storages = self
285 .tx
286 .cursor_read::<tables::StorageChangeSets>()?
287 .walk_range(storage_range)?
288 .collect::<Result<Vec<_>, _>>()?;
289
290 let mut storage_prefix_sets = B256Map::<PrefixSet>::default();
293 let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
294 for (hashed_address, hashed_slots) in storage_entries {
295 account_prefix_set.insert(Nibbles::unpack(hashed_address));
296 let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
297 for slot in hashed_slots {
298 storage_prefix_set.insert(Nibbles::unpack(slot));
299 }
300 storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
301 }
302
303 self.unwind_storage_history_indices(changed_storages.iter().copied())?;
305
306 let prefix_sets = TriePrefixSets {
310 account_prefix_set: account_prefix_set.freeze(),
311 storage_prefix_sets,
312 destroyed_accounts,
313 };
314 let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
315 .with_prefix_sets(prefix_sets)
316 .root_with_updates()
317 .map_err(reth_db_api::DatabaseError::from)?;
318
319 let parent_number = range.start().saturating_sub(1);
320 let parent_state_root = self
321 .header_by_number(parent_number)?
322 .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
323 .state_root();
324
325 if new_state_root != parent_state_root {
328 let parent_hash = self
329 .block_hash(parent_number)?
330 .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
331 return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
332 root: GotExpected { got: new_state_root, expected: parent_state_root },
333 block_number: parent_number,
334 block_hash: parent_hash,
335 })))
336 }
337 self.write_trie_updates(&trie_updates)?;
338
339 Ok(())
340 }
341
342 fn remove_receipts_from(
344 &self,
345 from_tx: TxNumber,
346 last_block: BlockNumber,
347 remove_from: StorageLocation,
348 ) -> ProviderResult<()> {
349 if remove_from.database() {
350 self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
352 }
353
354 if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
355 let static_file_receipt_num =
356 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
357
358 let to_delete = static_file_receipt_num
359 .map(|static_num| (static_num + 1).saturating_sub(from_tx))
360 .unwrap_or_default();
361
362 self.static_file_provider
363 .latest_writer(StaticFileSegment::Receipts)?
364 .prune_receipts(to_delete, last_block)?;
365 }
366
367 Ok(())
368 }
369}
370
371impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
372 fn try_into_history_at_block(
373 self,
374 mut block_number: BlockNumber,
375 ) -> ProviderResult<StateProviderBox> {
376 if block_number == self.best_block_number().unwrap_or_default() {
379 return Ok(Box::new(LatestStateProvider::new(self)))
380 }
381
382 block_number += 1;
384
385 let account_history_prune_checkpoint =
386 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
387 let storage_history_prune_checkpoint =
388 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
389
390 let mut state_provider = HistoricalStateProvider::new(self, block_number);
391
392 if let Some(prune_checkpoint_block_number) =
395 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
396 {
397 state_provider = state_provider.with_lowest_available_account_history_block_number(
398 prune_checkpoint_block_number + 1,
399 );
400 }
401 if let Some(prune_checkpoint_block_number) =
402 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
403 {
404 state_provider = state_provider.with_lowest_available_storage_history_block_number(
405 prune_checkpoint_block_number + 1,
406 );
407 }
408
409 Ok(Box::new(state_provider))
410 }
411}
412
413impl<TX: DbTx + 'static, N: NodeTypes> StateCommitmentProvider for DatabaseProvider<TX, N> {
414 type StateCommitment = N::StateCommitment;
415}
416
417impl<
418 Tx: DbTx + DbTxMut + 'static,
419 N: NodeTypesForProvider<Primitives: NodePrimitives<BlockHeader = Header>>,
420 > DatabaseProvider<Tx, N>
421{
422 pub fn insert_historical_block(
426 &self,
427 block: RecoveredBlock<<Self as BlockWriter>::Block>,
428 ) -> ProviderResult<StoredBlockBodyIndices> {
429 let ttd = if block.number() == 0 {
430 block.header().difficulty()
431 } else {
432 let parent_block_number = block.number() - 1;
433 let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
434 parent_ttd + block.header().difficulty()
435 };
436
437 let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;
438
439 let segment_header = writer.user_header();
441 if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
442 for block_number in 0..block.number() {
443 let mut prev = block.clone_header();
444 prev.number = block_number;
445 writer.append_header(&prev, U256::ZERO, &B256::ZERO)?;
446 }
447 }
448
449 writer.append_header(block.header(), ttd, &block.hash())?;
450
451 self.insert_block(block, StorageLocation::Database)
452 }
453}
454
455fn unwind_history_shards<S, T, C>(
468 cursor: &mut C,
469 start_key: T::Key,
470 block_number: BlockNumber,
471 mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
472) -> ProviderResult<Vec<u64>>
473where
474 T: Table<Value = BlockNumberList>,
475 T::Key: AsRef<ShardedKey<S>>,
476 C: DbCursorRO<T> + DbCursorRW<T>,
477{
478 let mut item = cursor.seek_exact(start_key)?;
479 while let Some((sharded_key, list)) = item {
480 if !shard_belongs_to_key(&sharded_key) {
482 break
483 }
484 cursor.delete_current()?;
485
486 let first = list.iter().next().expect("List can't be empty");
489 if first >= block_number {
490 item = cursor.prev()?;
491 continue
492 } else if block_number <= sharded_key.as_ref().highest_block_number {
493 return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
495 }
496 return Ok(list.iter().collect::<Vec<_>>())
497 }
498
499 Ok(Vec::new())
500}
501
502impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
503 pub const fn new(
505 tx: TX,
506 chain_spec: Arc<N::ChainSpec>,
507 static_file_provider: StaticFileProvider<N::Primitives>,
508 prune_modes: PruneModes,
509 storage: Arc<N::Storage>,
510 ) -> Self {
511 Self { tx, chain_spec, static_file_provider, prune_modes, storage }
512 }
513
514 pub fn into_tx(self) -> TX {
516 self.tx
517 }
518
519 pub fn tx_mut(&mut self) -> &mut TX {
521 &mut self.tx
522 }
523
524 pub const fn tx_ref(&self) -> &TX {
526 &self.tx
527 }
528
529 pub fn chain_spec(&self) -> &N::ChainSpec {
531 &self.chain_spec
532 }
533}
534
535impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
536 fn transactions_by_tx_range_with_cursor<C>(
537 &self,
538 range: impl RangeBounds<TxNumber>,
539 cursor: &mut C,
540 ) -> ProviderResult<Vec<TxTy<N>>>
541 where
542 C: DbCursorRO<tables::Transactions<TxTy<N>>>,
543 {
544 self.static_file_provider.get_range_with_static_file_or_database(
545 StaticFileSegment::Transactions,
546 to_range(range),
547 |static_file, range, _| static_file.transactions_by_tx_range(range),
548 |range, _| self.cursor_collect(cursor, range),
549 |_| true,
550 )
551 }
552
553 fn recovered_block<H, HF, B, BF>(
554 &self,
555 id: BlockHashOrNumber,
556 _transaction_kind: TransactionVariant,
557 header_by_number: HF,
558 construct_block: BF,
559 ) -> ProviderResult<Option<B>>
560 where
561 H: AsRef<HeaderTy<N>>,
562 HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
563 BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
564 {
565 let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
566 let Some(header) = header_by_number(block_number)? else { return Ok(None) };
567
568 let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
575
576 let tx_range = body.tx_num_range();
577
578 let (transactions, senders) = if tx_range.is_empty() {
579 (vec![], vec![])
580 } else {
581 (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
582 };
583
584 let body = self
585 .storage
586 .reader()
587 .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
588 .pop()
589 .ok_or(ProviderError::InvalidStorageOutput)?;
590
591 construct_block(header, body, senders)
592 }
593
594 fn block_range<F, H, HF, R>(
604 &self,
605 range: RangeInclusive<BlockNumber>,
606 headers_range: HF,
607 mut assemble_block: F,
608 ) -> ProviderResult<Vec<R>>
609 where
610 H: AsRef<HeaderTy<N>>,
611 HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
612 F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
613 {
614 if range.is_empty() {
615 return Ok(Vec::new())
616 }
617
618 let len = range.end().saturating_sub(*range.start()) as usize;
619 let mut blocks = Vec::with_capacity(len);
620
621 let headers = headers_range(range.clone())?;
622 let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
623
624 let present_headers = self
630 .block_body_indices_range(range)?
631 .into_iter()
632 .map(|b| b.tx_num_range())
633 .zip(headers)
634 .collect::<Vec<_>>();
635
636 let mut inputs = Vec::new();
637 for (tx_range, header) in &present_headers {
638 let transactions = if tx_range.is_empty() {
639 Vec::new()
640 } else {
641 self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
642 };
643
644 inputs.push((header.as_ref(), transactions));
645 }
646
647 let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
648
649 for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
650 blocks.push(assemble_block(header, body, tx_range)?);
651 }
652
653 Ok(blocks)
654 }
655
656 fn block_with_senders_range<H, HF, B, BF>(
667 &self,
668 range: RangeInclusive<BlockNumber>,
669 headers_range: HF,
670 assemble_block: BF,
671 ) -> ProviderResult<Vec<B>>
672 where
673 H: AsRef<HeaderTy<N>>,
674 HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
675 BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
676 {
677 let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;
678
679 self.block_range(range, headers_range, |header, body, tx_range| {
680 let senders = if tx_range.is_empty() {
681 Vec::new()
682 } else {
683 let known_senders =
685 senders_cursor
686 .walk_range(tx_range.clone())?
687 .collect::<Result<HashMap<_, _>, _>>()?;
688
689 let mut senders = Vec::with_capacity(body.transactions().len());
690 for (tx_num, tx) in tx_range.zip(body.transactions()) {
691 match known_senders.get(&tx_num) {
692 None => {
693 let sender = tx.recover_signer_unchecked()?;
695 senders.push(sender);
696 }
697 Some(sender) => senders.push(*sender),
698 }
699 }
700
701 senders
702 };
703
704 assemble_block(header, body, senders)
705 })
706 }
707
708 fn populate_bundle_state<A, S>(
712 &self,
713 account_changeset: Vec<(u64, AccountBeforeTx)>,
714 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
715 plain_accounts_cursor: &mut A,
716 plain_storage_cursor: &mut S,
717 ) -> ProviderResult<(BundleStateInit, RevertsInit)>
718 where
719 A: DbCursorRO<PlainAccountState>,
720 S: DbDupCursorRO<PlainStorageState>,
721 {
722 let mut state: BundleStateInit = HashMap::default();
726
727 let mut reverts: RevertsInit = HashMap::default();
733
734 for (block_number, account_before) in account_changeset.into_iter().rev() {
736 let AccountBeforeTx { info: old_info, address } = account_before;
737 match state.entry(address) {
738 hash_map::Entry::Vacant(entry) => {
739 let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
740 entry.insert((old_info, new_info, HashMap::default()));
741 }
742 hash_map::Entry::Occupied(mut entry) => {
743 entry.get_mut().0 = old_info;
745 }
746 }
747 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
749 }
750
751 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
753 let BlockNumberAddress((block_number, address)) = block_and_address;
754 let account_state = match state.entry(address) {
756 hash_map::Entry::Vacant(entry) => {
757 let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
758 entry.insert((present_info, present_info, HashMap::default()))
759 }
760 hash_map::Entry::Occupied(entry) => entry.into_mut(),
761 };
762
763 match account_state.2.entry(old_storage.key) {
765 hash_map::Entry::Vacant(entry) => {
766 let new_storage = plain_storage_cursor
767 .seek_by_key_subkey(address, old_storage.key)?
768 .filter(|storage| storage.key == old_storage.key)
769 .unwrap_or_default();
770 entry.insert((old_storage.value, new_storage.value));
771 }
772 hash_map::Entry::Occupied(mut entry) => {
773 entry.get_mut().0 = old_storage.value;
774 }
775 };
776
777 reverts
778 .entry(block_number)
779 .or_default()
780 .entry(address)
781 .or_default()
782 .1
783 .push(old_storage);
784 }
785
786 Ok((state, reverts))
787 }
788}
789
790impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
791 pub fn commit(self) -> ProviderResult<bool> {
793 Ok(self.tx.commit()?)
794 }
795
796 fn take_shard<T>(
799 &self,
800 cursor: &mut <TX as DbTxMut>::CursorMut<T>,
801 key: T::Key,
802 ) -> ProviderResult<Vec<u64>>
803 where
804 T: Table<Value = BlockNumberList>,
805 {
806 if let Some((_, list)) = cursor.seek_exact(key)? {
807 cursor.delete_current()?;
809 let list = list.iter().collect::<Vec<_>>();
810 return Ok(list)
811 }
812 Ok(Vec::new())
813 }
814
815 fn append_history_index<P, T>(
823 &self,
824 index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
825 mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
826 ) -> ProviderResult<()>
827 where
828 P: Copy,
829 T: Table<Value = BlockNumberList>,
830 {
831 let mut cursor = self.tx.cursor_write::<T>()?;
832 for (partial_key, indices) in index_updates {
833 let mut last_shard =
834 self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
835 last_shard.extend(indices);
836 let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
838 while let Some(list) = chunks.next() {
839 let highest_block_number = if chunks.peek().is_some() {
840 *list.last().expect("`chunks` does not return empty list")
841 } else {
842 u64::MAX
844 };
845 cursor.insert(
846 sharded_key_factory(partial_key, highest_block_number),
847 &BlockNumberList::new_pre_sorted(list.iter().copied()),
848 )?;
849 }
850 }
851 Ok(())
852 }
853}
854
855impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
856 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
857 Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
858 }
859}
860
861impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
862 fn changed_accounts_with_range(
863 &self,
864 range: impl RangeBounds<BlockNumber>,
865 ) -> ProviderResult<BTreeSet<Address>> {
866 self.tx
867 .cursor_read::<tables::AccountChangeSets>()?
868 .walk_range(range)?
869 .map(|entry| {
870 entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
871 })
872 .collect()
873 }
874
875 fn basic_accounts(
876 &self,
877 iter: impl IntoIterator<Item = Address>,
878 ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
879 let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
880 Ok(iter
881 .into_iter()
882 .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
883 .collect::<Result<Vec<_>, _>>()?)
884 }
885
886 fn changed_accounts_and_blocks_with_range(
887 &self,
888 range: RangeInclusive<BlockNumber>,
889 ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
890 let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
891
892 let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
893 BTreeMap::new(),
894 |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
895 let (index, account) = entry?;
896 accounts.entry(account.address).or_default().push(index);
897 Ok(accounts)
898 },
899 )?;
900
901 Ok(account_transitions)
902 }
903}
904
905impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
906 fn storage_changeset(
907 &self,
908 block_number: BlockNumber,
909 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
910 let range = block_number..=block_number;
911 let storage_range = BlockNumberAddress::range(range);
912 self.tx
913 .cursor_dup_read::<tables::StorageChangeSets>()?
914 .walk_range(storage_range)?
915 .map(|result| -> ProviderResult<_> { Ok(result?) })
916 .collect()
917 }
918}
919
920impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
921 fn account_block_changeset(
922 &self,
923 block_number: BlockNumber,
924 ) -> ProviderResult<Vec<AccountBeforeTx>> {
925 let range = block_number..=block_number;
926 self.tx
927 .cursor_read::<tables::AccountChangeSets>()?
928 .walk_range(range)?
929 .map(|result| -> ProviderResult<_> {
930 let (_, account_before) = result?;
931 Ok(account_before)
932 })
933 .collect()
934 }
935}
936
937impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
938 for DatabaseProvider<TX, N>
939{
940 type Header = HeaderTy<N>;
941
942 fn sync_gap(
943 &self,
944 tip: watch::Receiver<B256>,
945 highest_uninterrupted_block: BlockNumber,
946 ) -> ProviderResult<HeaderSyncGap<Self::Header>> {
947 let static_file_provider = self.static_file_provider();
948
949 let next_static_file_block_num = static_file_provider
952 .get_highest_static_file_block(StaticFileSegment::Headers)
953 .map(|id| id + 1)
954 .unwrap_or_default();
955 let next_block = highest_uninterrupted_block + 1;
956
957 match next_static_file_block_num.cmp(&next_block) {
958 Ordering::Greater => {
961 let mut static_file_producer =
962 static_file_provider.latest_writer(StaticFileSegment::Headers)?;
963 static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
964 static_file_producer.commit()?
967 }
968 Ordering::Less => {
969 return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
971 }
972 Ordering::Equal => {}
973 }
974
975 let local_head = static_file_provider
976 .sealed_header(highest_uninterrupted_block)?
977 .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
978
979 let target = SyncTarget::Tip(*tip.borrow());
980
981 Ok(HeaderSyncGap { local_head, target })
982 }
983}
984
985impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
986 type Header = HeaderTy<N>;
987
988 fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
989 if let Some(num) = self.block_number(*block_hash)? {
990 Ok(self.header_by_number(num)?)
991 } else {
992 Ok(None)
993 }
994 }
995
996 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
997 self.static_file_provider.get_with_static_file_or_database(
998 StaticFileSegment::Headers,
999 num,
1000 |static_file| static_file.header_by_number(num),
1001 || Ok(self.tx.get::<tables::Headers<Self::Header>>(num)?),
1002 )
1003 }
1004
1005 fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1006 if let Some(num) = self.block_number(*block_hash)? {
1007 self.header_td_by_number(num)
1008 } else {
1009 Ok(None)
1010 }
1011 }
1012
1013 fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
1014 if self.chain_spec.is_paris_active_at_block(number) {
1015 if let Some(td) = self.chain_spec.final_paris_total_difficulty() {
1016 return Ok(Some(td))
1019 }
1020 }
1021
1022 self.static_file_provider.get_with_static_file_or_database(
1023 StaticFileSegment::Headers,
1024 number,
1025 |static_file| static_file.header_td_by_number(number),
1026 || Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
1027 )
1028 }
1029
1030 fn headers_range(
1031 &self,
1032 range: impl RangeBounds<BlockNumber>,
1033 ) -> ProviderResult<Vec<Self::Header>> {
1034 self.static_file_provider.get_range_with_static_file_or_database(
1035 StaticFileSegment::Headers,
1036 to_range(range),
1037 |static_file, range, _| static_file.headers_range(range),
1038 |range, _| self.cursor_read_collect::<tables::Headers<Self::Header>>(range),
1039 |_| true,
1040 )
1041 }
1042
1043 fn sealed_header(
1044 &self,
1045 number: BlockNumber,
1046 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1047 self.static_file_provider.get_with_static_file_or_database(
1048 StaticFileSegment::Headers,
1049 number,
1050 |static_file| static_file.sealed_header(number),
1051 || {
1052 if let Some(header) = self.header_by_number(number)? {
1053 let hash = self
1054 .block_hash(number)?
1055 .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1056 Ok(Some(SealedHeader::new(header, hash)))
1057 } else {
1058 Ok(None)
1059 }
1060 },
1061 )
1062 }
1063
1064 fn sealed_headers_while(
1065 &self,
1066 range: impl RangeBounds<BlockNumber>,
1067 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1068 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1069 self.static_file_provider.get_range_with_static_file_or_database(
1070 StaticFileSegment::Headers,
1071 to_range(range),
1072 |static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
1073 |range, mut predicate| {
1074 let mut headers = vec![];
1075 for entry in
1076 self.tx.cursor_read::<tables::Headers<Self::Header>>()?.walk_range(range)?
1077 {
1078 let (number, header) = entry?;
1079 let hash = self
1080 .block_hash(number)?
1081 .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1082 let sealed = SealedHeader::new(header, hash);
1083 if !predicate(&sealed) {
1084 break
1085 }
1086 headers.push(sealed);
1087 }
1088 Ok(headers)
1089 },
1090 predicate,
1091 )
1092 }
1093}
1094
1095impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1096 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1097 self.static_file_provider.get_with_static_file_or_database(
1098 StaticFileSegment::Headers,
1099 number,
1100 |static_file| static_file.block_hash(number),
1101 || Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
1102 )
1103 }
1104
1105 fn canonical_hashes_range(
1106 &self,
1107 start: BlockNumber,
1108 end: BlockNumber,
1109 ) -> ProviderResult<Vec<B256>> {
1110 self.static_file_provider.get_range_with_static_file_or_database(
1111 StaticFileSegment::Headers,
1112 start..end,
1113 |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
1114 |range, _| self.cursor_read_collect::<tables::CanonicalHeaders>(range),
1115 |_| true,
1116 )
1117 }
1118}
1119
1120impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1121 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1122 let best_number = self.best_block_number()?;
1123 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1124 Ok(ChainInfo { best_hash, best_number })
1125 }
1126
1127 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1128 Ok(self
1131 .get_stage_checkpoint(StageId::Finish)?
1132 .map(|checkpoint| checkpoint.block_number)
1133 .unwrap_or_default())
1134 }
1135
1136 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1137 Ok(self
1138 .tx
1139 .cursor_read::<tables::CanonicalHeaders>()?
1140 .last()?
1141 .map(|(num, _)| num)
1142 .max(
1143 self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
1144 )
1145 .unwrap_or_default())
1146 }
1147
1148 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1149 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1150 }
1151}
1152
1153impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1154 type Block = BlockTy<N>;
1155
1156 fn find_block_by_hash(
1157 &self,
1158 hash: B256,
1159 source: BlockSource,
1160 ) -> ProviderResult<Option<Self::Block>> {
1161 if source.is_canonical() {
1162 self.block(hash.into())
1163 } else {
1164 Ok(None)
1165 }
1166 }
1167
1168 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1174 if let Some(number) = self.convert_hash_or_number(id)? {
1175 if let Some(header) = self.header_by_number(number)? {
1176 let Some(transactions) = self.transactions_by_block(number.into())? else {
1181 return Ok(None)
1182 };
1183
1184 let body = self
1185 .storage
1186 .reader()
1187 .read_block_bodies(self, vec![(&header, transactions)])?
1188 .pop()
1189 .ok_or(ProviderError::InvalidStorageOutput)?;
1190
1191 return Ok(Some(Self::Block::new(header, body)))
1192 }
1193 }
1194
1195 Ok(None)
1196 }
1197
1198 fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
1199 Ok(None)
1200 }
1201
1202 fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1203 Ok(None)
1204 }
1205
1206 fn pending_block_and_receipts(
1207 &self,
1208 ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
1209 Ok(None)
1210 }
1211
1212 fn recovered_block(
1221 &self,
1222 id: BlockHashOrNumber,
1223 transaction_kind: TransactionVariant,
1224 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1225 self.recovered_block(
1226 id,
1227 transaction_kind,
1228 |block_number| self.header_by_number(block_number),
1229 |header, body, senders| {
1230 Self::Block::new(header, body)
1231 .try_into_recovered_unchecked(senders)
1235 .map(Some)
1236 .map_err(|_| ProviderError::SenderRecoveryError)
1237 },
1238 )
1239 }
1240
1241 fn sealed_block_with_senders(
1242 &self,
1243 id: BlockHashOrNumber,
1244 transaction_kind: TransactionVariant,
1245 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1246 self.recovered_block(
1247 id,
1248 transaction_kind,
1249 |block_number| self.sealed_header(block_number),
1250 |header, body, senders| {
1251 Self::Block::new_sealed(header, body)
1252 .try_with_senders_unchecked(senders)
1256 .map(Some)
1257 .map_err(|_| ProviderError::SenderRecoveryError)
1258 },
1259 )
1260 }
1261
1262 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1263 self.block_range(
1264 range,
1265 |range| self.headers_range(range),
1266 |header, body, _| Ok(Self::Block::new(header, body)),
1267 )
1268 }
1269
1270 fn block_with_senders_range(
1271 &self,
1272 range: RangeInclusive<BlockNumber>,
1273 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1274 self.block_with_senders_range(
1275 range,
1276 |range| self.headers_range(range),
1277 |header, body, senders| {
1278 Self::Block::new(header, body)
1279 .try_into_recovered_unchecked(senders)
1280 .map_err(|_| ProviderError::SenderRecoveryError)
1281 },
1282 )
1283 }
1284
1285 fn recovered_block_range(
1286 &self,
1287 range: RangeInclusive<BlockNumber>,
1288 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1289 self.block_with_senders_range(
1290 range,
1291 |range| self.sealed_headers_range(range),
1292 |header, body, senders| {
1293 Self::Block::new_sealed(header, body)
1294 .try_with_senders(senders)
1295 .map_err(|_| ProviderError::SenderRecoveryError)
1296 },
1297 )
1298 }
1299}
1300
1301impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1302 for DatabaseProvider<TX, N>
1303{
1304 fn transaction_hashes_by_range(
1307 &self,
1308 tx_range: Range<TxNumber>,
1309 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1310 self.static_file_provider.get_range_with_static_file_or_database(
1311 StaticFileSegment::Transactions,
1312 tx_range,
1313 |static_file, range, _| static_file.transaction_hashes_by_range(range),
1314 |tx_range, _| {
1315 let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
1316 let tx_range_size = tx_range.clone().count();
1317 let tx_walker = tx_cursor.walk_range(tx_range)?;
1318
1319 let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
1320 let mut channels = Vec::with_capacity(chunk_size);
1321 let mut transaction_count = 0;
1322
1323 #[inline]
1324 fn calculate_hash<T>(
1325 entry: Result<(TxNumber, T), DatabaseError>,
1326 rlp_buf: &mut Vec<u8>,
1327 ) -> Result<(B256, TxNumber), Box<ProviderError>>
1328 where
1329 T: Encodable2718,
1330 {
1331 let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
1332 tx.encode_2718(rlp_buf);
1333 Ok((keccak256(rlp_buf), tx_id))
1334 }
1335
1336 for chunk in &tx_walker.chunks(chunk_size) {
1337 let (tx, rx) = mpsc::channel();
1338 channels.push(rx);
1339
1340 let chunk: Vec<_> = chunk.collect();
1343 transaction_count += chunk.len();
1344
1345 rayon::spawn(move || {
1349 let mut rlp_buf = Vec::with_capacity(128);
1350 for entry in chunk {
1351 rlp_buf.clear();
1352 let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
1353 }
1354 });
1355 }
1356 let mut tx_list = Vec::with_capacity(transaction_count);
1357
1358 for channel in channels {
1360 while let Ok(tx) = channel.recv() {
1361 let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1362 tx_list.push((tx_hash, tx_id));
1363 }
1364 }
1365
1366 Ok(tx_list)
1367 },
1368 |_| true,
1369 )
1370 }
1371}
1372
1373impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1375 type Transaction = TxTy<N>;
1376
1377 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1378 Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1379 }
1380
1381 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1382 self.static_file_provider.get_with_static_file_or_database(
1383 StaticFileSegment::Transactions,
1384 id,
1385 |static_file| static_file.transaction_by_id(id),
1386 || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1387 )
1388 }
1389
1390 fn transaction_by_id_unhashed(
1391 &self,
1392 id: TxNumber,
1393 ) -> ProviderResult<Option<Self::Transaction>> {
1394 self.static_file_provider.get_with_static_file_or_database(
1395 StaticFileSegment::Transactions,
1396 id,
1397 |static_file| static_file.transaction_by_id_unhashed(id),
1398 || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1399 )
1400 }
1401
1402 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1403 if let Some(id) = self.transaction_id(hash)? {
1404 Ok(self.transaction_by_id_unhashed(id)?)
1405 } else {
1406 Ok(None)
1407 }
1408 }
1409
1410 fn transaction_by_hash_with_meta(
1411 &self,
1412 tx_hash: TxHash,
1413 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1414 let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1415 if let Some(transaction_id) = self.transaction_id(tx_hash)? {
1416 if let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? {
1417 if let Some(block_number) =
1418 transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
1419 {
1420 if let Some(sealed_header) = self.sealed_header(block_number)? {
1421 let (header, block_hash) = sealed_header.split();
1422 if let Some(block_body) = self.block_body_indices(block_number)? {
1423 let index = transaction_id - block_body.first_tx_num();
1428
1429 let meta = TransactionMeta {
1430 tx_hash,
1431 index,
1432 block_hash,
1433 block_number,
1434 base_fee: header.base_fee_per_gas(),
1435 excess_blob_gas: header.excess_blob_gas(),
1436 timestamp: header.timestamp(),
1437 };
1438
1439 return Ok(Some((transaction, meta)))
1440 }
1441 }
1442 }
1443 }
1444 }
1445
1446 Ok(None)
1447 }
1448
1449 fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1450 let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1451 Ok(cursor.seek(id)?.map(|(_, bn)| bn))
1452 }
1453
1454 fn transactions_by_block(
1455 &self,
1456 id: BlockHashOrNumber,
1457 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1458 let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1459
1460 if let Some(block_number) = self.convert_hash_or_number(id)? {
1461 if let Some(body) = self.block_body_indices(block_number)? {
1462 let tx_range = body.tx_num_range();
1463 return if tx_range.is_empty() {
1464 Ok(Some(Vec::new()))
1465 } else {
1466 Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
1467 }
1468 }
1469 }
1470 Ok(None)
1471 }
1472
1473 fn transactions_by_block_range(
1474 &self,
1475 range: impl RangeBounds<BlockNumber>,
1476 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1477 let range = to_range(range);
1478 let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1479
1480 self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1481 .into_iter()
1482 .map(|body| {
1483 let tx_num_range = body.tx_num_range();
1484 if tx_num_range.is_empty() {
1485 Ok(Vec::new())
1486 } else {
1487 Ok(self
1488 .transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
1489 .into_iter()
1490 .collect())
1491 }
1492 })
1493 .collect()
1494 }
1495
1496 fn transactions_by_tx_range(
1497 &self,
1498 range: impl RangeBounds<TxNumber>,
1499 ) -> ProviderResult<Vec<Self::Transaction>> {
1500 self.transactions_by_tx_range_with_cursor(
1501 range,
1502 &mut self.tx.cursor_read::<tables::Transactions<_>>()?,
1503 )
1504 }
1505
1506 fn senders_by_tx_range(
1507 &self,
1508 range: impl RangeBounds<TxNumber>,
1509 ) -> ProviderResult<Vec<Address>> {
1510 self.cursor_read_collect::<tables::TransactionSenders>(range)
1511 }
1512
1513 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1514 Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1515 }
1516}
1517
1518impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1519 type Receipt = ReceiptTy<N>;
1520
1521 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1522 self.static_file_provider.get_with_static_file_or_database(
1523 StaticFileSegment::Receipts,
1524 id,
1525 |static_file| static_file.receipt(id),
1526 || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1527 )
1528 }
1529
1530 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1531 if let Some(id) = self.transaction_id(hash)? {
1532 self.receipt(id)
1533 } else {
1534 Ok(None)
1535 }
1536 }
1537
1538 fn receipts_by_block(
1539 &self,
1540 block: BlockHashOrNumber,
1541 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1542 if let Some(number) = self.convert_hash_or_number(block)? {
1543 if let Some(body) = self.block_body_indices(number)? {
1544 let tx_range = body.tx_num_range();
1545 return if tx_range.is_empty() {
1546 Ok(Some(Vec::new()))
1547 } else {
1548 self.receipts_by_tx_range(tx_range).map(Some)
1549 }
1550 }
1551 }
1552 Ok(None)
1553 }
1554
1555 fn receipts_by_tx_range(
1556 &self,
1557 range: impl RangeBounds<TxNumber>,
1558 ) -> ProviderResult<Vec<Self::Receipt>> {
1559 self.static_file_provider.get_range_with_static_file_or_database(
1560 StaticFileSegment::Receipts,
1561 to_range(range),
1562 |static_file, range, _| static_file.receipts_by_tx_range(range),
1563 |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1564 |_| true,
1565 )
1566 }
1567}
1568
1569impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> WithdrawalsProvider
1570 for DatabaseProvider<TX, N>
1571{
1572 fn withdrawals_by_block(
1573 &self,
1574 id: BlockHashOrNumber,
1575 timestamp: u64,
1576 ) -> ProviderResult<Option<Withdrawals>> {
1577 if self.chain_spec.is_shanghai_active_at_timestamp(timestamp) {
1578 if let Some(number) = self.convert_hash_or_number(id)? {
1579 return self.static_file_provider.get_with_static_file_or_database(
1580 StaticFileSegment::BlockMeta,
1581 number,
1582 |static_file| static_file.withdrawals_by_block(number.into(), timestamp),
1583 || {
1584 let withdrawals = self
1587 .tx
1588 .get::<tables::BlockWithdrawals>(number)
1589 .map(|w| w.map(|w| w.withdrawals))?
1590 .unwrap_or_default();
1591 Ok(Some(withdrawals))
1592 },
1593 )
1594 }
1595 }
1596 Ok(None)
1597 }
1598}
1599
1600impl<TX: DbTx + 'static, N: NodeTypesForProvider> OmmersProvider for DatabaseProvider<TX, N> {
1601 fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
1606 if let Some(number) = self.convert_hash_or_number(id)? {
1607 if self.chain_spec.is_paris_active_at_block(number) {
1610 return Ok(Some(Vec::new()))
1611 }
1612
1613 return self.static_file_provider.get_with_static_file_or_database(
1614 StaticFileSegment::BlockMeta,
1615 number,
1616 |static_file| static_file.ommers(id),
1617 || Ok(self.tx.get::<tables::BlockOmmers<Self::Header>>(number)?.map(|o| o.ommers)),
1618 )
1619 }
1620
1621 Ok(None)
1622 }
1623}
1624
1625impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1626 for DatabaseProvider<TX, N>
1627{
1628 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1629 self.static_file_provider.get_with_static_file_or_database(
1630 StaticFileSegment::BlockMeta,
1631 num,
1632 |static_file| static_file.block_body_indices(num),
1633 || Ok(self.tx.get::<tables::BlockBodyIndices>(num)?),
1634 )
1635 }
1636
1637 fn block_body_indices_range(
1638 &self,
1639 range: RangeInclusive<BlockNumber>,
1640 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1641 self.static_file_provider.get_range_with_static_file_or_database(
1642 StaticFileSegment::BlockMeta,
1643 *range.start()..*range.end() + 1,
1644 |static_file, range, _| {
1645 static_file.block_body_indices_range(range.start..=range.end.saturating_sub(1))
1646 },
1647 |range, _| self.cursor_read_collect::<tables::BlockBodyIndices>(range),
1648 |_| true,
1649 )
1650 }
1651}
1652
1653impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1654 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1655 Ok(self.tx.get::<tables::StageCheckpoints>(id.to_string())?)
1656 }
1657
1658 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1660 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1661 }
1662
1663 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1664 self.tx
1665 .cursor_read::<tables::StageCheckpoints>()?
1666 .walk(None)?
1667 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1668 .map_err(ProviderError::Database)
1669 }
1670}
1671
1672impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1673 fn save_stage_checkpoint(
1675 &self,
1676 id: StageId,
1677 checkpoint: StageCheckpoint,
1678 ) -> ProviderResult<()> {
1679 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1680 }
1681
1682 fn save_stage_checkpoint_progress(
1684 &self,
1685 id: StageId,
1686 checkpoint: Vec<u8>,
1687 ) -> ProviderResult<()> {
1688 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1689 }
1690
1691 fn update_pipeline_stages(
1692 &self,
1693 block_number: BlockNumber,
1694 drop_stage_checkpoint: bool,
1695 ) -> ProviderResult<()> {
1696 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1698 for stage_id in StageId::ALL {
1699 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1700 cursor.upsert(
1701 stage_id.to_string(),
1702 &StageCheckpoint {
1703 block_number,
1704 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1705 },
1706 )?;
1707 }
1708
1709 Ok(())
1710 }
1711}
1712
1713impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1714 fn plain_state_storages(
1715 &self,
1716 addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1717 ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1718 let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1719
1720 addresses_with_keys
1721 .into_iter()
1722 .map(|(address, storage)| {
1723 storage
1724 .into_iter()
1725 .map(|key| -> ProviderResult<_> {
1726 Ok(plain_storage
1727 .seek_by_key_subkey(address, key)?
1728 .filter(|v| v.key == key)
1729 .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
1730 })
1731 .collect::<ProviderResult<Vec<_>>>()
1732 .map(|storage| (address, storage))
1733 })
1734 .collect::<ProviderResult<Vec<(_, _)>>>()
1735 }
1736
1737 fn changed_storages_with_range(
1738 &self,
1739 range: RangeInclusive<BlockNumber>,
1740 ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1741 self.tx
1742 .cursor_read::<tables::StorageChangeSets>()?
1743 .walk_range(BlockNumberAddress::range(range))?
1744 .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1747 let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1748 accounts.entry(address).or_default().insert(storage_entry.key);
1749 Ok(accounts)
1750 })
1751 }
1752
1753 fn changed_storages_and_blocks_with_range(
1754 &self,
1755 range: RangeInclusive<BlockNumber>,
1756 ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1757 let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1758
1759 let storage_changeset_lists =
1760 changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1761 BTreeMap::new(),
1762 |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1763 let (index, storage) = entry?;
1764 storages
1765 .entry((index.address(), storage.key))
1766 .or_default()
1767 .push(index.block_number());
1768 Ok(storages)
1769 },
1770 )?;
1771
1772 Ok(storage_changeset_lists)
1773 }
1774}
1775
1776impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1777 for DatabaseProvider<TX, N>
1778{
1779 type Receipt = ReceiptTy<N>;
1780
1781 fn write_state(
1782 &self,
1783 execution_outcome: &ExecutionOutcome<Self::Receipt>,
1784 is_value_known: OriginalValuesKnown,
1785 write_receipts_to: StorageLocation,
1786 ) -> ProviderResult<()> {
1787 let first_block = execution_outcome.first_block();
1788 let block_count = execution_outcome.len() as u64;
1789 let last_block = execution_outcome.last_block();
1790 let block_range = first_block..=last_block;
1791
1792 let tip = self.last_block_number()?.max(last_block);
1793
1794 let (plain_state, reverts) =
1795 execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1796
1797 self.write_state_reverts(reverts, first_block)?;
1798 self.write_state_changes(plain_state)?;
1799
1800 let block_indices: Vec<_> = self
1802 .block_body_indices_range(block_range)?
1803 .into_iter()
1804 .map(|b| b.first_tx_num)
1805 .collect();
1806
1807 if block_indices.len() < block_count as usize {
1809 let missing_blocks = block_count - block_indices.len() as u64;
1810 return Err(ProviderError::BlockBodyIndicesNotFound(
1811 last_block.saturating_sub(missing_blocks - 1),
1812 ));
1813 }
1814
1815 let has_receipts_pruning = self.prune_modes.has_receipts_pruning();
1816
1817 let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
1822 .then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
1823 .transpose()?;
1824
1825 let mut receipts_static_writer = (write_receipts_to.static_files() &&
1829 !has_receipts_pruning)
1830 .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
1831 .transpose()?;
1832
1833 let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1834 let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1835
1836 let prunable_receipts =
1839 PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);
1840
1841 let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1843 for (_, addresses) in contract_log_pruner.range(..first_block) {
1844 allowed_addresses.extend(addresses.iter().copied());
1845 }
1846
1847 for (idx, (receipts, first_tx_index)) in
1848 execution_outcome.receipts.iter().zip(block_indices).enumerate()
1849 {
1850 let block_number = first_block + idx as u64;
1851
1852 if let Some(writer) = receipts_static_writer.as_mut() {
1854 writer.increment_block(block_number)?;
1855 }
1856
1857 if prunable_receipts &&
1859 self.prune_modes
1860 .receipts
1861 .is_some_and(|mode| mode.should_prune(block_number, tip))
1862 {
1863 continue
1864 }
1865
1866 if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1868 allowed_addresses.extend(new_addresses.iter().copied());
1869 }
1870
1871 for (idx, receipt) in receipts.iter().enumerate() {
1872 let receipt_idx = first_tx_index + idx as u64;
1873 if prunable_receipts &&
1876 has_contract_log_filter &&
1877 !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1878 {
1879 continue
1880 }
1881
1882 if let Some(writer) = &mut receipts_static_writer {
1883 writer.append_receipt(receipt_idx, receipt)?;
1884 }
1885
1886 if let Some(cursor) = &mut receipts_cursor {
1887 cursor.append(receipt_idx, receipt)?;
1888 }
1889 }
1890 }
1891
1892 Ok(())
1893 }
1894
1895 fn write_state_reverts(
1896 &self,
1897 reverts: PlainStateReverts,
1898 first_block: BlockNumber,
1899 ) -> ProviderResult<()> {
1900 tracing::trace!("Writing storage changes");
1902 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1903 let mut storage_changeset_cursor =
1904 self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1905 for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1906 let block_number = first_block + block_index as BlockNumber;
1907
1908 tracing::trace!(block_number, "Writing block change");
1909 storage_changes.par_sort_unstable_by_key(|a| a.address);
1911 for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1912 let storage_id = BlockNumberAddress((block_number, address));
1913
1914 let mut storage = storage_revert
1915 .into_iter()
1916 .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1917 .collect::<Vec<_>>();
1918 storage.par_sort_unstable_by_key(|a| a.0);
1920
1921 let mut wiped_storage = Vec::new();
1925 if wiped {
1926 tracing::trace!(?address, "Wiping storage");
1927 if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1928 wiped_storage.push((entry.key, entry.value));
1929 while let Some(entry) = storages_cursor.next_dup_val()? {
1930 wiped_storage.push((entry.key, entry.value))
1931 }
1932 }
1933 }
1934
1935 tracing::trace!(?address, ?storage, "Writing storage reverts");
1936 for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1937 storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1938 }
1939 }
1940 }
1941
1942 tracing::trace!("Writing account changes");
1944 let mut account_changeset_cursor =
1945 self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1946
1947 for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1948 let block_number = first_block + block_index as BlockNumber;
1949 account_block_reverts.par_sort_by_key(|a| a.0);
1951
1952 for (address, info) in account_block_reverts {
1953 account_changeset_cursor.append_dup(
1954 block_number,
1955 AccountBeforeTx { address, info: info.map(Into::into) },
1956 )?;
1957 }
1958 }
1959
1960 Ok(())
1961 }
1962
1963 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1964 changes.accounts.par_sort_by_key(|a| a.0);
1967 changes.storage.par_sort_by_key(|a| a.address);
1968 changes.contracts.par_sort_by_key(|a| a.0);
1969
1970 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1972 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1973 for (address, account) in changes.accounts {
1975 if let Some(account) = account {
1976 tracing::trace!(?address, "Updating plain state account");
1977 accounts_cursor.upsert(address, &account.into())?;
1978 } else if accounts_cursor.seek_exact(address)?.is_some() {
1979 tracing::trace!(?address, "Deleting plain state account");
1980 accounts_cursor.delete_current()?;
1981 }
1982 }
1983
1984 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1986 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1987 for (hash, bytecode) in changes.contracts {
1988 bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1989 }
1990
1991 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1993 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1994 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1995 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1997 storages_cursor.delete_current_duplicates()?;
1998 }
1999 let mut storage = storage
2001 .into_iter()
2002 .map(|(k, value)| StorageEntry { key: k.into(), value })
2003 .collect::<Vec<_>>();
2004 storage.par_sort_unstable_by_key(|a| a.key);
2006
2007 for entry in storage {
2008 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2009 if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
2010 if db_entry.key == entry.key {
2011 storages_cursor.delete_current()?;
2012 }
2013 }
2014
2015 if !entry.value.is_zero() {
2016 storages_cursor.upsert(address, &entry)?;
2017 }
2018 }
2019 }
2020
2021 Ok(())
2022 }
2023
2024 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2025 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2027 for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
2028 if let Some(account) = account {
2029 hashed_accounts_cursor.upsert(hashed_address, &account)?;
2030 } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
2031 hashed_accounts_cursor.delete_current()?;
2032 }
2033 }
2034
2035 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2037 let mut hashed_storage_cursor =
2038 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2039 for (hashed_address, storage) in sorted_storages {
2040 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2041 hashed_storage_cursor.delete_current_duplicates()?;
2042 }
2043
2044 for (hashed_slot, value) in storage.storage_slots_sorted() {
2045 let entry = StorageEntry { key: hashed_slot, value };
2046 if let Some(db_entry) =
2047 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
2048 {
2049 if db_entry.key == entry.key {
2050 hashed_storage_cursor.delete_current()?;
2051 }
2052 }
2053
2054 if !entry.value.is_zero() {
2055 hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2056 }
2057 }
2058 }
2059
2060 Ok(())
2061 }
2062
2063 fn remove_state_above(
2085 &self,
2086 block: BlockNumber,
2087 remove_receipts_from: StorageLocation,
2088 ) -> ProviderResult<()> {
2089 let range = block + 1..=self.last_block_number()?;
2090
2091 if range.is_empty() {
2092 return Ok(());
2093 }
2094
2095 let block_bodies = self.block_body_indices_range(range.clone())?;
2097
2098 let from_transaction_num =
2100 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2101
2102 let storage_range = BlockNumberAddress::range(range.clone());
2103
2104 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2105 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2106
2107 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2112 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2113
2114 let (state, _) = self.populate_bundle_state(
2115 account_changeset,
2116 storage_changeset,
2117 &mut plain_accounts_cursor,
2118 &mut plain_storage_cursor,
2119 )?;
2120
2121 for (address, (old_account, new_account, storage)) in &state {
2123 if old_account != new_account {
2125 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2126 if let Some(account) = old_account {
2127 plain_accounts_cursor.upsert(*address, account)?;
2128 } else if existing_entry.is_some() {
2129 plain_accounts_cursor.delete_current()?;
2130 }
2131 }
2132
2133 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2135 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2136 if plain_storage_cursor
2139 .seek_by_key_subkey(*address, *storage_key)?
2140 .filter(|s| s.key == *storage_key)
2141 .is_some()
2142 {
2143 plain_storage_cursor.delete_current()?
2144 }
2145
2146 if !old_storage_value.is_zero() {
2148 plain_storage_cursor.upsert(*address, &storage_entry)?;
2149 }
2150 }
2151 }
2152
2153 self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2154
2155 Ok(())
2156 }
2157
2158 fn take_state_above(
2180 &self,
2181 block: BlockNumber,
2182 remove_receipts_from: StorageLocation,
2183 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2184 let range = block + 1..=self.last_block_number()?;
2185
2186 if range.is_empty() {
2187 return Ok(ExecutionOutcome::default())
2188 }
2189 let start_block_number = *range.start();
2190
2191 let block_bodies = self.block_body_indices_range(range.clone())?;
2193
2194 let from_transaction_num =
2196 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2197 let to_transaction_num =
2198 block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2199
2200 let storage_range = BlockNumberAddress::range(range.clone());
2201
2202 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2203 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2204
2205 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2210 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2211
2212 let (state, reverts) = self.populate_bundle_state(
2215 account_changeset,
2216 storage_changeset,
2217 &mut plain_accounts_cursor,
2218 &mut plain_storage_cursor,
2219 )?;
2220
2221 for (address, (old_account, new_account, storage)) in &state {
2223 if old_account != new_account {
2225 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2226 if let Some(account) = old_account {
2227 plain_accounts_cursor.upsert(*address, account)?;
2228 } else if existing_entry.is_some() {
2229 plain_accounts_cursor.delete_current()?;
2230 }
2231 }
2232
2233 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2235 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2236 if plain_storage_cursor
2239 .seek_by_key_subkey(*address, *storage_key)?
2240 .filter(|s| s.key == *storage_key)
2241 .is_some()
2242 {
2243 plain_storage_cursor.delete_current()?
2244 }
2245
2246 if !old_storage_value.is_zero() {
2248 plain_storage_cursor.upsert(*address, &storage_entry)?;
2249 }
2250 }
2251 }
2252
2253 let mut receipts_iter = self
2255 .static_file_provider
2256 .get_range_with_static_file_or_database(
2257 StaticFileSegment::Receipts,
2258 from_transaction_num..to_transaction_num + 1,
2259 |static_file, range, _| {
2260 static_file
2261 .receipts_by_tx_range(range.clone())
2262 .map(|r| range.into_iter().zip(r).collect())
2263 },
2264 |range, _| {
2265 self.tx
2266 .cursor_read::<tables::Receipts<Self::Receipt>>()?
2267 .walk_range(range)?
2268 .map(|r| r.map_err(Into::into))
2269 .collect()
2270 },
2271 |_| true,
2272 )?
2273 .into_iter()
2274 .peekable();
2275
2276 let mut receipts = Vec::with_capacity(block_bodies.len());
2277 for block_body in block_bodies {
2279 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2280 for num in block_body.tx_num_range() {
2281 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2282 block_receipts.push(receipts_iter.next().unwrap().1);
2283 }
2284 }
2285 receipts.push(block_receipts);
2286 }
2287
2288 self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2289
2290 Ok(ExecutionOutcome::new_init(
2291 state,
2292 reverts,
2293 Vec::new(),
2294 receipts,
2295 start_block_number,
2296 Vec::new(),
2297 ))
2298 }
2299}
2300
2301impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2302 fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2304 if trie_updates.is_empty() {
2305 return Ok(0)
2306 }
2307
2308 let mut num_entries = 0;
2310
2311 let mut account_updates = trie_updates
2313 .removed_nodes_ref()
2314 .iter()
2315 .filter_map(|n| {
2316 (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2317 })
2318 .collect::<Vec<_>>();
2319 account_updates.extend(
2320 trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2321 );
2322 account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2324
2325 let tx = self.tx_ref();
2326 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2327 for (key, updated_node) in account_updates {
2328 let nibbles = StoredNibbles(key.clone());
2329 match updated_node {
2330 Some(node) => {
2331 if !nibbles.0.is_empty() {
2332 num_entries += 1;
2333 account_trie_cursor.upsert(nibbles, node)?;
2334 }
2335 }
2336 None => {
2337 num_entries += 1;
2338 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2339 account_trie_cursor.delete_current()?;
2340 }
2341 }
2342 }
2343 }
2344
2345 num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
2346
2347 Ok(num_entries)
2348 }
2349}
2350
2351impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2352 fn write_storage_trie_updates(
2355 &self,
2356 storage_tries: &B256Map<StorageTrieUpdates>,
2357 ) -> ProviderResult<usize> {
2358 let mut num_entries = 0;
2359 let mut storage_tries = Vec::from_iter(storage_tries);
2360 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2361 let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2362 for (hashed_address, storage_trie_updates) in storage_tries {
2363 let mut db_storage_trie_cursor =
2364 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2365 num_entries +=
2366 db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2367 cursor = db_storage_trie_cursor.cursor;
2368 }
2369
2370 Ok(num_entries)
2371 }
2372
2373 fn write_individual_storage_trie_updates(
2374 &self,
2375 hashed_address: B256,
2376 updates: &StorageTrieUpdates,
2377 ) -> ProviderResult<usize> {
2378 if updates.is_empty() {
2379 return Ok(0)
2380 }
2381
2382 let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2383 let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
2384 Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
2385 }
2386}
2387
2388impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2389 fn unwind_account_hashing<'a>(
2390 &self,
2391 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2392 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2393 let hashed_accounts = changesets
2397 .into_iter()
2398 .map(|(_, e)| (keccak256(e.address), e.info))
2399 .collect::<Vec<_>>()
2400 .into_iter()
2401 .rev()
2402 .collect::<BTreeMap<_, _>>();
2403
2404 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2406 for (hashed_address, account) in &hashed_accounts {
2407 if let Some(account) = account {
2408 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2409 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2410 hashed_accounts_cursor.delete_current()?;
2411 }
2412 }
2413
2414 Ok(hashed_accounts)
2415 }
2416
2417 fn unwind_account_hashing_range(
2418 &self,
2419 range: impl RangeBounds<BlockNumber>,
2420 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2421 let changesets = self
2422 .tx
2423 .cursor_read::<tables::AccountChangeSets>()?
2424 .walk_range(range)?
2425 .collect::<Result<Vec<_>, _>>()?;
2426 self.unwind_account_hashing(changesets.iter())
2427 }
2428
2429 fn insert_account_for_hashing(
2430 &self,
2431 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2432 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2433 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2434 let hashed_accounts =
2435 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2436 for (hashed_address, account) in &hashed_accounts {
2437 if let Some(account) = account {
2438 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2439 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2440 hashed_accounts_cursor.delete_current()?;
2441 }
2442 }
2443 Ok(hashed_accounts)
2444 }
2445
2446 fn unwind_storage_hashing(
2447 &self,
2448 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2449 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2450 let mut hashed_storages = changesets
2452 .into_iter()
2453 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2454 (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2455 })
2456 .collect::<Vec<_>>();
2457 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2458
2459 let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2461 HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2462 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2463 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2464 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2465
2466 if hashed_storage
2467 .seek_by_key_subkey(hashed_address, key)?
2468 .filter(|entry| entry.key == key)
2469 .is_some()
2470 {
2471 hashed_storage.delete_current()?;
2472 }
2473
2474 if !value.is_zero() {
2475 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2476 }
2477 }
2478 Ok(hashed_storage_keys)
2479 }
2480
2481 fn unwind_storage_hashing_range(
2482 &self,
2483 range: impl RangeBounds<BlockNumberAddress>,
2484 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2485 let changesets = self
2486 .tx
2487 .cursor_read::<tables::StorageChangeSets>()?
2488 .walk_range(range)?
2489 .collect::<Result<Vec<_>, _>>()?;
2490 self.unwind_storage_hashing(changesets.into_iter())
2491 }
2492
2493 fn insert_storage_for_hashing(
2494 &self,
2495 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2496 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2497 let hashed_storages =
2499 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2500 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2501 map.insert(keccak256(entry.key), entry.value);
2502 map
2503 });
2504 map.insert(keccak256(address), storage);
2505 map
2506 });
2507
2508 let hashed_storage_keys = hashed_storages
2509 .iter()
2510 .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2511 .collect();
2512
2513 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2514 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2517 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2518 if hashed_storage_cursor
2519 .seek_by_key_subkey(hashed_address, key)?
2520 .filter(|entry| entry.key == key)
2521 .is_some()
2522 {
2523 hashed_storage_cursor.delete_current()?;
2524 }
2525
2526 if !value.is_zero() {
2527 hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2528 }
2529 Ok(())
2530 })
2531 })?;
2532
2533 Ok(hashed_storage_keys)
2534 }
2535
2536 fn insert_hashes(
2537 &self,
2538 range: RangeInclusive<BlockNumber>,
2539 end_block_hash: B256,
2540 expected_state_root: B256,
2541 ) -> ProviderResult<()> {
2542 let mut account_prefix_set = PrefixSetMut::default();
2544 let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
2545 let mut destroyed_accounts = HashSet::default();
2546
2547 let mut durations_recorder = metrics::DurationsRecorder::default();
2548
2549 {
2551 let lists = self.changed_storages_with_range(range.clone())?;
2552 let storages = self.plain_state_storages(lists)?;
2553 let storage_entries = self.insert_storage_for_hashing(storages)?;
2554 for (hashed_address, hashed_slots) in storage_entries {
2555 account_prefix_set.insert(Nibbles::unpack(hashed_address));
2556 for slot in hashed_slots {
2557 storage_prefix_sets
2558 .entry(hashed_address)
2559 .or_default()
2560 .insert(Nibbles::unpack(slot));
2561 }
2562 }
2563 }
2564 durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
2565
2566 {
2568 let lists = self.changed_accounts_with_range(range.clone())?;
2569 let accounts = self.basic_accounts(lists)?;
2570 let hashed_addresses = self.insert_account_for_hashing(accounts)?;
2571 for (hashed_address, account) in hashed_addresses {
2572 account_prefix_set.insert(Nibbles::unpack(hashed_address));
2573 if account.is_none() {
2574 destroyed_accounts.insert(hashed_address);
2575 }
2576 }
2577 }
2578 durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
2579
2580 {
2582 let prefix_sets = TriePrefixSets {
2585 account_prefix_set: account_prefix_set.freeze(),
2586 storage_prefix_sets: storage_prefix_sets
2587 .into_iter()
2588 .map(|(k, v)| (k, v.freeze()))
2589 .collect(),
2590 destroyed_accounts,
2591 };
2592 let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
2593 .with_prefix_sets(prefix_sets)
2594 .root_with_updates()
2595 .map_err(reth_db_api::DatabaseError::from)?;
2596 if state_root != expected_state_root {
2597 return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
2598 root: GotExpected { got: state_root, expected: expected_state_root },
2599 block_number: *range.end(),
2600 block_hash: end_block_hash,
2601 })))
2602 }
2603 self.write_trie_updates(&trie_updates)?;
2604 }
2605 durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
2606
2607 debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
2608
2609 Ok(())
2610 }
2611}
2612
2613impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2614 fn unwind_account_history_indices<'a>(
2615 &self,
2616 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2617 ) -> ProviderResult<usize> {
2618 let mut last_indices = changesets
2619 .into_iter()
2620 .map(|(index, account)| (account.address, *index))
2621 .collect::<Vec<_>>();
2622 last_indices.sort_by_key(|(a, _)| *a);
2623
2624 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2626 for &(address, rem_index) in &last_indices {
2627 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2628 &mut cursor,
2629 ShardedKey::last(address),
2630 rem_index,
2631 |sharded_key| sharded_key.key == address,
2632 )?;
2633
2634 if !partial_shard.is_empty() {
2637 cursor.insert(
2638 ShardedKey::last(address),
2639 &BlockNumberList::new_pre_sorted(partial_shard),
2640 )?;
2641 }
2642 }
2643
2644 let changesets = last_indices.len();
2645 Ok(changesets)
2646 }
2647
2648 fn unwind_account_history_indices_range(
2649 &self,
2650 range: impl RangeBounds<BlockNumber>,
2651 ) -> ProviderResult<usize> {
2652 let changesets = self
2653 .tx
2654 .cursor_read::<tables::AccountChangeSets>()?
2655 .walk_range(range)?
2656 .collect::<Result<Vec<_>, _>>()?;
2657 self.unwind_account_history_indices(changesets.iter())
2658 }
2659
2660 fn insert_account_history_index(
2661 &self,
2662 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2663 ) -> ProviderResult<()> {
2664 self.append_history_index::<_, tables::AccountsHistory>(
2665 account_transitions,
2666 ShardedKey::new,
2667 )
2668 }
2669
2670 fn unwind_storage_history_indices(
2671 &self,
2672 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2673 ) -> ProviderResult<usize> {
2674 let mut storage_changesets = changesets
2675 .into_iter()
2676 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2677 .collect::<Vec<_>>();
2678 storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2679
2680 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2681 for &(address, storage_key, rem_index) in &storage_changesets {
2682 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2683 &mut cursor,
2684 StorageShardedKey::last(address, storage_key),
2685 rem_index,
2686 |storage_sharded_key| {
2687 storage_sharded_key.address == address &&
2688 storage_sharded_key.sharded_key.key == storage_key
2689 },
2690 )?;
2691
2692 if !partial_shard.is_empty() {
2695 cursor.insert(
2696 StorageShardedKey::last(address, storage_key),
2697 &BlockNumberList::new_pre_sorted(partial_shard),
2698 )?;
2699 }
2700 }
2701
2702 let changesets = storage_changesets.len();
2703 Ok(changesets)
2704 }
2705
2706 fn unwind_storage_history_indices_range(
2707 &self,
2708 range: impl RangeBounds<BlockNumberAddress>,
2709 ) -> ProviderResult<usize> {
2710 let changesets = self
2711 .tx
2712 .cursor_read::<tables::StorageChangeSets>()?
2713 .walk_range(range)?
2714 .collect::<Result<Vec<_>, _>>()?;
2715 self.unwind_storage_history_indices(changesets.into_iter())
2716 }
2717
2718 fn insert_storage_history_index(
2719 &self,
2720 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2721 ) -> ProviderResult<()> {
2722 self.append_history_index::<_, tables::StoragesHistory>(
2723 storage_transitions,
2724 |(address, storage_key), highest_block_number| {
2725 StorageShardedKey::new(address, storage_key, highest_block_number)
2726 },
2727 )
2728 }
2729
2730 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2731 {
2733 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2734 self.insert_account_history_index(indices)?;
2735 }
2736
2737 {
2739 let indices = self.changed_storages_and_blocks_with_range(range)?;
2740 self.insert_storage_history_index(indices)?;
2741 }
2742
2743 Ok(())
2744 }
2745}
2746
2747impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2748 for DatabaseProvider<TX, N>
2749{
2750 fn take_block_and_execution_above(
2751 &self,
2752 block: BlockNumber,
2753 remove_from: StorageLocation,
2754 ) -> ProviderResult<Chain<Self::Primitives>> {
2755 let range = block + 1..=self.last_block_number()?;
2756
2757 self.unwind_trie_state_range(range.clone())?;
2758
2759 let execution_state = self.take_state_above(block, remove_from)?;
2761
2762 let blocks = self.recovered_block_range(range)?;
2763
2764 self.remove_blocks_above(block, remove_from)?;
2767
2768 self.update_pipeline_stages(block, true)?;
2770
2771 Ok(Chain::new(blocks, execution_state, None))
2772 }
2773
2774 fn remove_block_and_execution_above(
2775 &self,
2776 block: BlockNumber,
2777 remove_from: StorageLocation,
2778 ) -> ProviderResult<()> {
2779 let range = block + 1..=self.last_block_number()?;
2780
2781 self.unwind_trie_state_range(range)?;
2782
2783 self.remove_state_above(block, remove_from)?;
2785
2786 self.remove_blocks_above(block, remove_from)?;
2789
2790 self.update_pipeline_stages(block, true)?;
2792
2793 Ok(())
2794 }
2795}
2796
2797impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2798 for DatabaseProvider<TX, N>
2799{
2800 type Block = BlockTy<N>;
2801 type Receipt = ReceiptTy<N>;
2802
2803 fn insert_block(
2824 &self,
2825 block: RecoveredBlock<Self::Block>,
2826 write_to: StorageLocation,
2827 ) -> ProviderResult<StoredBlockBodyIndices> {
2828 let block_number = block.number();
2829
2830 let mut durations_recorder = metrics::DurationsRecorder::default();
2831
2832 let ttd = if block_number == 0 {
2834 block.header().difficulty()
2835 } else {
2836 let parent_block_number = block_number - 1;
2837 let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2838 durations_recorder.record_relative(metrics::Action::GetParentTD);
2839 parent_ttd + block.header().difficulty()
2840 };
2841
2842 if write_to.database() {
2843 self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
2844 durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
2845
2846 self.tx.put::<tables::Headers<HeaderTy<N>>>(block_number, block.header().clone())?;
2848 durations_recorder.record_relative(metrics::Action::InsertHeaders);
2849
2850 self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
2851 durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
2852 }
2853
2854 if write_to.static_files() {
2855 let mut writer =
2856 self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?;
2857 writer.append_header(block.header(), ttd, &block.hash())?;
2858 }
2859
2860 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2861 durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2862
2863 let mut next_tx_num = self
2864 .tx
2865 .cursor_read::<tables::TransactionBlocks>()?
2866 .last()?
2867 .map(|(n, _)| n + 1)
2868 .unwrap_or_default();
2869 durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2870 let first_tx_num = next_tx_num;
2871
2872 let tx_count = block.body().transaction_count() as u64;
2873
2874 for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2876 let hash = transaction.tx_hash();
2877
2878 if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2879 self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2880 }
2881
2882 if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2883 self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2884 }
2885 next_tx_num += 1;
2886 }
2887
2888 self.append_block_bodies(vec![(block_number, Some(block.into_body()))], write_to)?;
2889
2890 debug!(
2891 target: "providers::db",
2892 ?block_number,
2893 actions = ?durations_recorder.actions,
2894 "Inserted block"
2895 );
2896
2897 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2898 }
2899
2900 fn append_block_bodies(
2901 &self,
2902 bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2903 write_to: StorageLocation,
2904 ) -> ProviderResult<()> {
2905 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2906
2907 let mut tx_static_writer = write_to
2909 .static_files()
2910 .then(|| {
2911 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)
2912 })
2913 .transpose()?;
2914
2915 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2916 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2917
2918 let mut tx_cursor = write_to
2920 .database()
2921 .then(|| self.tx.cursor_write::<tables::Transactions<TxTy<N>>>())
2922 .transpose()?;
2923
2924 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2926
2927 for (block_number, body) in &bodies {
2928 if let Some(writer) = tx_static_writer.as_mut() {
2930 writer.increment_block(*block_number)?;
2931 }
2932
2933 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2934 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2935
2936 let mut durations_recorder = metrics::DurationsRecorder::default();
2937
2938 block_indices_cursor.append(*block_number, &block_indices)?;
2940
2941 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2942
2943 let Some(body) = body else { continue };
2944
2945 if !body.transactions().is_empty() {
2947 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2948 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2949 }
2950
2951 for transaction in body.transactions() {
2953 if let Some(writer) = tx_static_writer.as_mut() {
2954 writer.append_transaction(next_tx_num, transaction)?;
2955 }
2956 if let Some(cursor) = tx_cursor.as_mut() {
2957 cursor.append(next_tx_num, transaction)?;
2958 }
2959
2960 next_tx_num += 1;
2962 }
2963
2964 debug!(
2965 target: "providers::db",
2966 ?block_number,
2967 actions = ?durations_recorder.actions,
2968 "Inserted block body"
2969 );
2970 }
2971
2972 self.storage.writer().write_block_bodies(self, bodies, write_to)?;
2973
2974 Ok(())
2975 }
2976
2977 fn remove_blocks_above(
2978 &self,
2979 block: BlockNumber,
2980 remove_from: StorageLocation,
2981 ) -> ProviderResult<()> {
2982 for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
2983 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2984 }
2985
2986 self.remove::<tables::CanonicalHeaders>(block + 1..)?;
2989 self.remove::<tables::Headers<HeaderTy<N>>>(block + 1..)?;
2990 self.remove::<tables::HeaderTerminalDifficulties>(block + 1..)?;
2991
2992 let unwind_tx_from = self
2994 .block_body_indices(block)?
2995 .map(|b| b.next_tx_num())
2996 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2997
2998 let unwind_tx_to = self
3000 .tx
3001 .cursor_read::<tables::BlockBodyIndices>()?
3002 .last()?
3003 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3005 .1
3006 .last_tx_num();
3007
3008 if unwind_tx_from <= unwind_tx_to {
3009 for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
3010 self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
3011 }
3012 }
3013
3014 self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
3015
3016 self.remove_bodies_above(block, remove_from)?;
3017
3018 Ok(())
3019 }
3020
3021 fn remove_bodies_above(
3022 &self,
3023 block: BlockNumber,
3024 remove_from: StorageLocation,
3025 ) -> ProviderResult<()> {
3026 self.storage.writer().remove_block_bodies_above(self, block, remove_from)?;
3027
3028 let unwind_tx_from = self
3030 .block_body_indices(block)?
3031 .map(|b| b.next_tx_num())
3032 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3033
3034 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3035 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3036
3037 if remove_from.database() {
3038 self.remove::<tables::Transactions<TxTy<N>>>(unwind_tx_from..)?;
3039 }
3040
3041 if remove_from.static_files() {
3042 let static_file_tx_num = self
3043 .static_file_provider
3044 .get_highest_static_file_tx(StaticFileSegment::Transactions);
3045
3046 let to_delete = static_file_tx_num
3047 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3048 .unwrap_or_default();
3049
3050 self.static_file_provider
3051 .latest_writer(StaticFileSegment::Transactions)?
3052 .prune_transactions(to_delete, block)?;
3053 }
3054
3055 Ok(())
3056 }
3057
3058 fn append_blocks_with_state(
3060 &self,
3061 blocks: Vec<RecoveredBlock<Self::Block>>,
3062 execution_outcome: &ExecutionOutcome<Self::Receipt>,
3063 hashed_state: HashedPostStateSorted,
3064 trie_updates: TrieUpdates,
3065 ) -> ProviderResult<()> {
3066 if blocks.is_empty() {
3067 debug!(target: "providers::db", "Attempted to append empty block range");
3068 return Ok(())
3069 }
3070
3071 let first_number = blocks.first().unwrap().number();
3072
3073 let last = blocks.last().unwrap();
3074 let last_block_number = last.number();
3075
3076 let mut durations_recorder = metrics::DurationsRecorder::default();
3077
3078 for block in blocks {
3080 self.insert_block(block, StorageLocation::Database)?;
3081 durations_recorder.record_relative(metrics::Action::InsertBlock);
3082 }
3083
3084 self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?;
3085 durations_recorder.record_relative(metrics::Action::InsertState);
3086
3087 self.write_hashed_state(&hashed_state)?;
3089 self.write_trie_updates(&trie_updates)?;
3090 durations_recorder.record_relative(metrics::Action::InsertHashes);
3091
3092 self.update_history_indices(first_number..=last_block_number)?;
3093 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3094
3095 self.update_pipeline_stages(last_block_number, false)?;
3097 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3098
3099 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3100
3101 Ok(())
3102 }
3103}
3104
3105impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3106 fn get_prune_checkpoint(
3107 &self,
3108 segment: PruneSegment,
3109 ) -> ProviderResult<Option<PruneCheckpoint>> {
3110 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3111 }
3112
3113 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3114 Ok(self
3115 .tx
3116 .cursor_read::<tables::PruneCheckpoints>()?
3117 .walk(None)?
3118 .collect::<Result<_, _>>()?)
3119 }
3120}
3121
3122impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3123 fn save_prune_checkpoint(
3124 &self,
3125 segment: PruneSegment,
3126 checkpoint: PruneCheckpoint,
3127 ) -> ProviderResult<()> {
3128 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3129 }
3130}
3131
3132impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3133 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3134 let db_entries = self.tx.entries::<T>()?;
3135 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3136 Ok(entries) => entries,
3137 Err(ProviderError::UnsupportedProvider) => 0,
3138 Err(err) => return Err(err),
3139 };
3140
3141 Ok(db_entries + static_file_entries)
3142 }
3143}
3144
3145impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3146 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3147 let mut finalized_blocks = self
3148 .tx
3149 .cursor_read::<tables::ChainState>()?
3150 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3151 .take(1)
3152 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3153
3154 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3155 Ok(last_finalized_block_number)
3156 }
3157
3158 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3159 let mut finalized_blocks = self
3160 .tx
3161 .cursor_read::<tables::ChainState>()?
3162 .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
3163 .take(1)
3164 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3165
3166 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3167 Ok(last_finalized_block_number)
3168 }
3169}
3170
3171impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3172 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3173 Ok(self
3174 .tx
3175 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3176 }
3177
3178 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3179 Ok(self
3180 .tx
3181 .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
3182 }
3183}
3184
3185impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3186 type Tx = TX;
3187
3188 fn tx_ref(&self) -> &Self::Tx {
3189 &self.tx
3190 }
3191
3192 fn tx_mut(&mut self) -> &mut Self::Tx {
3193 &mut self.tx
3194 }
3195
3196 fn into_tx(self) -> Self::Tx {
3197 self.tx
3198 }
3199
3200 fn prune_modes_ref(&self) -> &PruneModes {
3201 self.prune_modes_ref()
3202 }
3203}