use crate::{
    bundle_state::StorageRevertsIter,
    providers::{
        database::{chain::ChainStorage, metrics},
        static_file::StaticFileWriter,
        NodeTypesForProvider, StaticFileProvider,
    },
    to_range,
    traits::{
        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
    },
    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
    DBProvider, HashingWriter, HeaderProvider, HeaderSyncGapProvider, HistoricalStateProvider,
    HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef,
    OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit,
    StageCheckpointReader, StateCommitmentProvider, StateProviderBox, StateWriter,
    StaticFileProviderFactory, StatsReader, StorageLocation, StorageReader, StorageTrieWriter,
    TransactionVariant, TransactionsProvider, TransactionsProviderExt, TrieWriter,
    WithdrawalsProvider,
};
use alloy_consensus::{transaction::TransactionMeta, BlockHeader, Header, TxReceipt};
use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
use alloy_primitives::{
    keccak256,
    map::{hash_map, B256Map, HashMap, HashSet},
    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
};
use itertools::Itertools;
use rayon::slice::ParallelSliceMut;
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
    database::Database,
    models::{
        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
        ShardedKey, StoredBlockBodyIndices,
    },
    table::Table,
    tables,
    transaction::{DbTx, DbTxMut},
    BlockNumberList, DatabaseError, PlainAccountState, PlainStorageState,
};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
use reth_primitives_traits::{
    Account, Block as _, BlockBody as _, Bytecode, GotExpected, NodePrimitives, RecoveredBlock,
    SealedBlock, SealedHeader, SignedTransaction, StorageEntry,
};
use reth_prune_types::{
    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
    BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, OmmersProvider,
    StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
use reth_trie::{
    prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
    updates::{StorageTrieUpdates, TrieUpdates},
    HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
};
use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
use revm_database::states::{
    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
};
use std::{
    cmp::Ordering,
    collections::{BTreeMap, BTreeSet},
    fmt::Debug,
    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
    sync::{mpsc, Arc},
};
use tracing::{debug, trace};

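/// A [`DatabaseProvider`] that holds a read-only database transaction.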
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;

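/// A [`DatabaseProvider`] that holds a read-write database transaction.
///
/// It dereferences to the inner [`DatabaseProvider`], so all of the read-only provider methods
/// are available on it as well.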
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);

impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
    for DatabaseProviderRW<DB, N>
{
    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
        &self.0
    }
}

impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
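    /// Commits the underlying database transaction.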
    pub fn commit(self) -> ProviderResult<bool> {
        self.0.commit()
    }

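    /// Consumes the provider and returns the underlying read-write database transaction.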
    pub fn into_tx(self) -> <DB as Database>::TXMut {
        self.0.into_tx()
    }
}

impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
    for DatabaseProvider<<DB as Database>::TXMut, N>
{
    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
        provider.0
    }
}

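/// A provider struct that fetches data from the database.
///
/// It wraps a database transaction together with the chain spec, the static file provider, the
/// prune configuration and the node storage handler.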
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Database transaction.
    tx: TX,
    /// Chain spec.
    chain_spec: Arc<N::ChainSpec>,
    /// Static file provider.
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration.
    prune_modes: PruneModes,
    /// Node storage handler.
    storage: Arc<N::Storage>,
}

impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
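    /// Returns a reference to the prune modes.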
    pub const fn prune_modes_ref(&self) -> &PruneModes {
        &self.prune_modes
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
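    /// Returns a state provider for the latest state.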
    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
        trace!(target: "providers::db", "Returning latest state provider");
        Box::new(LatestStateProviderRef::new(self))
    }

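    /// Returns a state provider for the state after the block with the given hash.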
    pub fn history_by_block_hash<'a>(
        &'a self,
        block_hash: BlockHash,
    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
        let mut block_number =
            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
        if block_number == self.best_block_number().unwrap_or_default() &&
            block_number == self.last_block_number().unwrap_or_default()
        {
            return Ok(Box::new(LatestStateProviderRef::new(self)))
        }

        // +1 as the changeset that we want is the one that was applied after this block.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);

        // If we pruned account or storage history, we can't return state for every historical
        // block. Instead, cap it at the latest prune checkpoint of the corresponding segment.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }

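    /// Sets the prune modes for the provider.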
    #[cfg(feature = "test-utils")]
    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
}

impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    type Primitives = N::Primitives;
}

impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }
}

impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
    for DatabaseProvider<TX, N>
{
    type ChainSpec = N::ChainSpec;

    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
        self.chain_spec.clone()
    }
}

impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
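    /// Creates a provider with an inner read-write transaction.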
    pub const fn new_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
    ) -> Self {
        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
    }
}

impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    fn as_ref(&self) -> &Self {
        self
    }
}

impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
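    /// Unwinds trie state for the given block range.
    ///
    /// This recomputes the state root after the unwind and verifies that it matches the state
    /// root of the new tip (the parent of the unwound range).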
    pub fn unwind_trie_state_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<()> {
        let changed_accounts = self
            .tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range.clone())?
            .collect::<Result<Vec<_>, _>>()?;

        // Unwind account hashes and add changed accounts to the prefix set.
        let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
        let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
        let mut destroyed_accounts = HashSet::default();
        for (hashed_address, account) in hashed_addresses {
            account_prefix_set.insert(Nibbles::unpack(hashed_address));
            if account.is_none() {
                destroyed_accounts.insert(hashed_address);
            }
        }

        // Unwind account history indices.
        self.unwind_account_history_indices(changed_accounts.iter())?;
        let storage_range = BlockNumberAddress::range(range.clone());

        let changed_storages = self
            .tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(storage_range)?
            .collect::<Result<Vec<_>, _>>()?;

        // Unwind storage hashes and add changed account and slot keys to the prefix sets.
        let mut storage_prefix_sets = B256Map::<PrefixSet>::default();
        let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
        for (hashed_address, hashed_slots) in storage_entries {
            account_prefix_set.insert(Nibbles::unpack(hashed_address));
            let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
            for slot in hashed_slots {
                storage_prefix_set.insert(Nibbles::unpack(slot));
            }
            storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
        }

        // Unwind storage history indices.
        self.unwind_storage_history_indices(changed_storages.iter().copied())?;

        // Calculate the reverted merkle root as a sanity check: after the unwind the state root
        // must match the state root of the new tip.
        let prefix_sets = TriePrefixSets {
            account_prefix_set: account_prefix_set.freeze(),
            storage_prefix_sets,
            destroyed_accounts,
        };
        let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
            .with_prefix_sets(prefix_sets)
            .root_with_updates()
            .map_err(reth_db_api::DatabaseError::from)?;

        let parent_number = range.start().saturating_sub(1);
        let parent_state_root = self
            .header_by_number(parent_number)?
            .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
            .state_root();

        if new_state_root != parent_state_root {
            let parent_hash = self
                .block_hash(parent_number)?
                .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
            return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
                root: GotExpected { got: new_state_root, expected: parent_state_root },
                block_number: parent_number,
                block_hash: parent_hash,
            })))
        }
        self.write_trie_updates(&trie_updates)?;

        Ok(())
    }

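    /// Removes receipts for all transactions from the given transaction number (inclusive).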
    fn remove_receipts_from(
        &self,
        from_tx: TxNumber,
        last_block: BlockNumber,
        remove_from: StorageLocation,
    ) -> ProviderResult<()> {
        if remove_from.database() {
            self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
        }

        if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
            let static_file_receipt_num =
                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);

            let to_delete = static_file_receipt_num
                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
                .unwrap_or_default();

            self.static_file_provider
                .latest_writer(StaticFileSegment::Receipts)?
                .prune_receipts(to_delete, last_block)?;
        }

        Ok(())
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
    fn try_into_history_at_block(
        self,
        mut block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        // If the block number matches the best block on disk, we can use the latest state
        // provider directly.
        if block_number == self.best_block_number().unwrap_or_default() {
            return Ok(Box::new(LatestStateProvider::new(self)))
        }

        // +1 as the changeset that we want is the one that was applied after this block.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProvider::new(self, block_number);

        // If we pruned account or storage history, we can't return state for every historical
        // block. Instead, cap it at the latest prune checkpoint of the corresponding segment.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> StateCommitmentProvider for DatabaseProvider<TX, N> {
    type StateCommitment = N::StateCommitment;
}

impl<
        Tx: DbTx + DbTxMut + 'static,
        N: NodeTypesForProvider<Primitives: NodePrimitives<BlockHeader = Header>>,
    > DatabaseProvider<Tx, N>
{
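    /// Inserts a historical block.
    ///
    /// If the headers static file is still empty, placeholder headers are appended for all
    /// ancestor heights first so that this block's header lands at the correct offset. The block
    /// itself is then written to the database.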
    pub fn insert_historical_block(
        &self,
        block: RecoveredBlock<<Self as BlockWriter>::Block>,
    ) -> ProviderResult<StoredBlockBodyIndices> {
        let ttd = if block.number() == 0 {
            block.header().difficulty()
        } else {
            let parent_block_number = block.number() - 1;
            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
            parent_ttd + block.header().difficulty()
        };

        let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;

        // If the headers static file is still empty, backfill placeholder headers for all
        // ancestor heights so this block can be appended at the correct offset.
        let segment_header = writer.user_header();
        if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
            for block_number in 0..block.number() {
                let mut prev = block.clone_header();
                prev.number = block_number;
                writer.append_header(&prev, U256::ZERO, &B256::ZERO)?;
            }
        }

        writer.append_header(block.header(), ttd, &block.hash())?;

        self.insert_block(block, StorageLocation::Database)
    }
}

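/// Unwinds history shards for the given key, deleting every shard whose entries are at or above
/// `block_number`.
///
/// The boundary shard (the one that still contains entries below `block_number`) is deleted as
/// well, and its still-valid entries are returned so the caller can re-insert them.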
fn unwind_history_shards<S, T, C>(
    cursor: &mut C,
    start_key: T::Key,
    block_number: BlockNumber,
    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<S>>,
    C: DbCursorRO<T> + DbCursorRW<T>,
{
    let mut item = cursor.seek_exact(start_key)?;
    while let Some((sharded_key, list)) = item {
        // If the shard does not belong to the key, stop.
        if !shard_belongs_to_key(&sharded_key) {
            break
        }
        cursor.delete_current()?;

        // Check the first item: if it is greater than or equal to the block number, the whole
        // shard is unwound and we move on to the previous one. Otherwise this is the boundary
        // shard and only the still-valid part of it is returned.
        let first = list.iter().next().expect("List can't be empty");
        if first >= block_number {
            item = cursor.prev()?;
            continue
        } else if block_number <= sharded_key.as_ref().highest_block_number {
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        return Ok(list.iter().collect::<Vec<_>>())
    }

    Ok(Vec::new())
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
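    /// Creates a provider with an inner read-only transaction.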
    pub const fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
    ) -> Self {
        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
    }

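    /// Consumes the provider and returns the underlying database transaction.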
    pub fn into_tx(self) -> TX {
        self.tx
    }

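    /// Returns a mutable reference to the database transaction.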
    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

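    /// Returns a reference to the database transaction.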
    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

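    /// Returns a reference to the chain specification.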
    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
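    /// Returns the transactions for the given transaction number range, reading from static
    /// files where available and falling back to the provided database cursor otherwise.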
    fn transactions_by_tx_range_with_cursor<C>(
        &self,
        range: impl RangeBounds<TxNumber>,
        cursor: &mut C,
    ) -> ProviderResult<Vec<TxTy<N>>>
    where
        C: DbCursorRO<tables::Transactions<TxTy<N>>>,
    {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Transactions,
            to_range(range),
            |static_file, range, _| static_file.transactions_by_tx_range(range),
            |range, _| self.cursor_collect(cursor, range),
            |_| true,
        )
    }

    fn recovered_block<H, HF, B, BF>(
        &self,
        id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
        header_by_number: HF,
        construct_block: BF,
    ) -> ProviderResult<Option<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
    {
        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
        let Some(header) = header_by_number(block_number)? else { return Ok(None) };

        // If the body indices are not found, the transactions either do not exist in the
        // database yet or are not indexed; in both cases we cannot assemble the block, so
        // return `None`.
        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };

        let tx_range = body.tx_num_range();

        let (transactions, senders) = if tx_range.is_empty() {
            (vec![], vec![])
        } else {
            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
        };

        let body = self
            .storage
            .reader()
            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
            .pop()
            .ok_or(ProviderError::InvalidStorageOutput)?;

        construct_block(header, body, senders)
    }

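    /// Returns a range of blocks from the database.
    ///
    /// Uses the provided `headers_range` to fetch the headers of the range and `assemble_block`
    /// to build each block from its header, its body and the range of transaction numbers that
    /// belongs to it.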
    fn block_range<F, H, HF, R>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        mut assemble_block: F,
    ) -> ProviderResult<Vec<R>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
    {
        if range.is_empty() {
            return Ok(Vec::new())
        }

        let len = range.end().saturating_sub(*range.start()) as usize;
        let mut blocks = Vec::with_capacity(len);

        let headers = headers_range(range.clone())?;
        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;

        let present_headers = self
            .block_body_indices_range(range)?
            .into_iter()
            .map(|b| b.tx_num_range())
            .zip(headers)
            .collect::<Vec<_>>();

        let mut inputs = Vec::new();
        for (tx_range, header) in &present_headers {
            let transactions = if tx_range.is_empty() {
                Vec::new()
            } else {
                self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
            };

            inputs.push((header.as_ref(), transactions));
        }

        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;

        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
            blocks.push(assemble_block(header, body, tx_range)?);
        }

        Ok(blocks)
    }

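    /// Returns a range of blocks together with their senders.
    ///
    /// Senders are looked up in the `TransactionSenders` table; any sender missing there is
    /// recovered from the transaction signature.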
    fn block_with_senders_range<H, HF, B, BF>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        assemble_block: BF,
    ) -> ProviderResult<Vec<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
    {
        let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;

        self.block_range(range, headers_range, |header, body, tx_range| {
            let senders = if tx_range.is_empty() {
                Vec::new()
            } else {
                // Collect the senders that have already been recovered and stored for this range
                // of transactions.
                let known_senders =
                    senders_cursor
                        .walk_range(tx_range.clone())?
                        .collect::<Result<HashMap<_, _>, _>>()?;

                let mut senders = Vec::with_capacity(body.transactions().len());
                for (tx_num, tx) in tx_range.zip(body.transactions()) {
                    match known_senders.get(&tx_num) {
                        None => {
                            // Recover the sender from the signature if it is not in the database.
                            let sender = tx.recover_signer_unchecked()?;
                            senders.push(sender);
                        }
                        Some(sender) => senders.push(*sender),
                    }
                }

                senders
            };

            assemble_block(header, body, senders)
        })
    }

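    /// Populates a [`BundleStateInit`] and [`RevertsInit`] from the given account and storage
    /// changesets, using the plain state cursors to look up the current (post-change) values.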
    fn populate_bundle_state<A, S>(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        plain_accounts_cursor: &mut A,
        plain_storage_cursor: &mut S,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
    where
        A: DbCursorRO<PlainAccountState>,
        S: DbDupCursorRO<PlainStorageState>,
    {
        // Plain state: address => (old info, new info, storage: slot => (old value, new value)).
        let mut state: BundleStateInit = HashMap::default();

        // Reverts: block number => address => (old info, old storage entries).
        let mut reverts: RevertsInit = HashMap::default();

        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // Iterating in reverse means the earliest value is the one to keep.
                    entry.get_mut().0 = old_info;
                }
            }
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    let new_storage = plain_storage_cursor
                        .seek_by_key_subkey(address, old_storage.key)?
                        .filter(|storage| storage.key == old_storage.key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
}

impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
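    /// Commits the underlying database transaction.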
    pub fn commit(self) -> ProviderResult<bool> {
        Ok(self.tx.commit()?)
    }

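    /// Removes the shard at the given key from the database and returns its block number list,
    /// so the caller can merge new indices into it and re-insert it.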
    fn take_shard<T>(
        &self,
        cursor: &mut <TX as DbTxMut>::CursorMut<T>,
        key: T::Key,
    ) -> ProviderResult<Vec<u64>>
    where
        T: Table<Value = BlockNumberList>,
    {
        if let Some((_, list)) = cursor.seek_exact(key)? {
            // Delete the shard so it can be re-inserted with updated indices.
            cursor.delete_current()?;
            let list = list.iter().collect::<Vec<_>>();
            return Ok(list)
        }
        Ok(Vec::new())
    }

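    /// Appends history indices for the given partial keys and the block numbers that changed
    /// them.
    ///
    /// The last shard of each key is loaded, extended with the new indices and re-split into
    /// shards of at most [`sharded_key::NUM_OF_INDICES_IN_SHARD`] entries. Every shard except the
    /// last is keyed by its highest block number; the last shard is keyed by `u64::MAX`.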
    fn append_history_index<P, T>(
        &self,
        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
    ) -> ProviderResult<()>
    where
        P: Copy,
        T: Table<Value = BlockNumberList>,
    {
        let mut cursor = self.tx.cursor_write::<T>()?;
        for (partial_key, indices) in index_updates {
            let mut last_shard =
                self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
            last_shard.extend(indices);
            // Chunk the indices and insert them in shards of bounded size.
            let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
            while let Some(list) = chunks.next() {
                let highest_block_number = if chunks.peek().is_some() {
                    *list.last().expect("`chunks` does not return empty list")
                } else {
                    // The last shard is always keyed by `u64::MAX`.
                    u64::MAX
                };
                cursor.insert(
                    sharded_key_factory(partial_key, highest_block_number),
                    &BlockNumberList::new_pre_sorted(list.iter().copied()),
                )?;
            }
        }
        Ok(())
    }
}

impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
    }
}

impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
    fn changed_accounts_with_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<BTreeSet<Address>> {
        self.tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .map(|entry| {
                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
            })
            .collect()
    }

    fn basic_accounts(
        &self,
        iter: impl IntoIterator<Item = Address>,
    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
        Ok(iter
            .into_iter()
            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
            .collect::<Result<Vec<_>, _>>()?)
    }

    fn changed_accounts_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;

        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
            BTreeMap::new(),
            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
                let (index, account) = entry?;
                accounts.entry(account.address).or_default().push(index);
                Ok(accounts)
            },
        )?;

        Ok(account_transitions)
    }
}

impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
    fn storage_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
        let range = block_number..=block_number;
        let storage_range = BlockNumberAddress::range(range);
        self.tx
            .cursor_dup_read::<tables::StorageChangeSets>()?
            .walk_range(storage_range)?
            .map(|result| -> ProviderResult<_> { Ok(result?) })
            .collect()
    }
}

impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
    fn account_block_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<AccountBeforeTx>> {
        let range = block_number..=block_number;
        self.tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .map(|result| -> ProviderResult<_> {
                let (_, account_before) = result?;
                Ok(account_before)
            })
            .collect()
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
    for DatabaseProvider<TX, N>
{
    type Header = HeaderTy<N>;

    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        let static_file_provider = self.static_file_provider();

        // Make sure the headers static file is at the same height. If it is further ahead, a
        // previous run was interrupted and the static file needs to be unwound.
        let next_static_file_block_num = static_file_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .map(|id| id + 1)
            .unwrap_or_default();
        let next_block = highest_uninterrupted_block + 1;

        match next_static_file_block_num.cmp(&next_block) {
            // The static files are ahead of the database, so prune the extra headers.
            Ordering::Greater => {
                let mut static_file_producer =
                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
                // Since this is a database <-> static file inconsistency, commit the prune right
                // away.
                static_file_producer.commit()?
            }
            Ordering::Less => {
                // There are either missing or corrupted files.
                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
            }
            Ordering::Equal => {}
        }

        let local_head = static_file_provider
            .sealed_header(highest_uninterrupted_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;

        Ok(local_head)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
    type Header = HeaderTy<N>;

    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
        if let Some(num) = self.block_number(*block_hash)? {
            Ok(self.header_by_number(num)?)
        } else {
            Ok(None)
        }
    }

    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Headers,
            num,
            |static_file| static_file.header_by_number(num),
            || Ok(self.tx.get::<tables::Headers<Self::Header>>(num)?),
        )
    }

    fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
        if let Some(num) = self.block_number(*block_hash)? {
            self.header_td_by_number(num)
        } else {
            Ok(None)
        }
    }

    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
        if self.chain_spec.is_paris_active_at_block(number) {
            if let Some(td) = self.chain_spec.final_paris_total_difficulty() {
                return Ok(Some(td))
            }
        }

        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Headers,
            number,
            |static_file| static_file.header_td_by_number(number),
            || Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
        )
    }

    fn headers_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Self::Header>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Headers,
            to_range(range),
            |static_file, range, _| static_file.headers_range(range),
            |range, _| self.cursor_read_collect::<tables::Headers<Self::Header>>(range),
            |_| true,
        )
    }

    fn sealed_header(
        &self,
        number: BlockNumber,
    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Headers,
            number,
            |static_file| static_file.sealed_header(number),
            || {
                if let Some(header) = self.header_by_number(number)? {
                    let hash = self
                        .block_hash(number)?
                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
                    Ok(Some(SealedHeader::new(header, hash)))
                } else {
                    Ok(None)
                }
            },
        )
    }

    fn sealed_headers_while(
        &self,
        range: impl RangeBounds<BlockNumber>,
        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Headers,
            to_range(range),
            |static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
            |range, mut predicate| {
                let mut headers = vec![];
                for entry in
                    self.tx.cursor_read::<tables::Headers<Self::Header>>()?.walk_range(range)?
                {
                    let (number, header) = entry?;
                    let hash = self
                        .block_hash(number)?
                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
                    let sealed = SealedHeader::new(header, hash);
                    if !predicate(&sealed) {
                        break
                    }
                    headers.push(sealed);
                }
                Ok(headers)
            },
            predicate,
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Headers,
            number,
            |static_file| static_file.block_hash(number),
            || Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
        )
    }

    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Headers,
            start..end,
            |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
            |range, _| self.cursor_read_collect::<tables::CanonicalHeaders>(range),
            |_| true,
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
    fn chain_info(&self) -> ProviderResult<ChainInfo> {
        let best_number = self.best_block_number()?;
        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
        Ok(ChainInfo { best_hash, best_number })
    }

    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
        Ok(self
            .get_stage_checkpoint(StageId::Finish)?
            .map(|checkpoint| checkpoint.block_number)
            .unwrap_or_default())
    }

    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
        Ok(self
            .tx
            .cursor_read::<tables::CanonicalHeaders>()?
            .last()?
            .map(|(num, _)| num)
            .max(
                self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
            )
            .unwrap_or_default())
    }

    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
    type Block = BlockTy<N>;

    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        if source.is_canonical() {
            self.block(hash.into())
        } else {
            Ok(None)
        }
    }

    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        if let Some(number) = self.convert_hash_or_number(id)? {
            if let Some(header) = self.header_by_number(number)? {
                let Some(transactions) = self.transactions_by_block(number.into())? else {
                    return Ok(None)
                };

                let body = self
                    .storage
                    .reader()
                    .read_block_bodies(self, vec![(&header, transactions)])?
                    .pop()
                    .ok_or(ProviderError::InvalidStorageOutput)?;

                return Ok(Some(Self::Block::new(header, body)))
            }
        }

        Ok(None)
    }

    fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
        Ok(None)
    }

    fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(None)
    }

    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(None)
    }

    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.header_by_number(block_number),
            |header, body, senders| {
                Self::Block::new(header, body)
                    // `unchecked` is fine here: the transactions were validated when the block
                    // was inserted, and pre EIP-2 transactions are allowed.
                    .try_into_recovered_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.sealed_header(block_number),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    // `unchecked` is fine here: the transactions were validated when the block
                    // was inserted, and pre EIP-2 transactions are allowed.
                    .try_with_senders_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        self.block_range(
            range,
            |range| self.headers_range(range),
            |header, body, _| Ok(Self::Block::new(header, body)),
        )
    }

    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.headers_range(range),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.sealed_headers_range(range),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
    for DatabaseProvider<TX, N>
{
    /// Recovers transaction hashes by walking through the `Transactions` table and calculating
    /// them in parallel. The result is returned unsorted.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Transactions,
            tx_range,
            |static_file, range, _| static_file.transaction_hashes_by_range(range),
            |tx_range, _| {
                let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
                let tx_range_size = tx_range.clone().count();
                let tx_walker = tx_cursor.walk_range(tx_range)?;

                let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
                let mut channels = Vec::with_capacity(chunk_size);
                let mut transaction_count = 0;

                #[inline]
                fn calculate_hash<T>(
                    entry: Result<(TxNumber, T), DatabaseError>,
                    rlp_buf: &mut Vec<u8>,
                ) -> Result<(B256, TxNumber), Box<ProviderError>>
                where
                    T: Encodable2718,
                {
                    let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
                    tx.encode_2718(rlp_buf);
                    Ok((keccak256(rlp_buf), tx_id))
                }

                for chunk in &tx_walker.chunks(chunk_size) {
                    let (tx, rx) = mpsc::channel();
                    channels.push(rx);

                    // Note: chunks are not guaranteed to be of equal size.
                    let chunk: Vec<_> = chunk.collect();
                    transaction_count += chunk.len();

                    // Spawn the hashing work onto the rayon pool; results are sent back through
                    // the channel.
                    rayon::spawn(move || {
                        let mut rlp_buf = Vec::with_capacity(128);
                        for entry in chunk {
                            rlp_buf.clear();
                            let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
                        }
                    });
                }
                let mut tx_list = Vec::with_capacity(transaction_count);

                // Iterate over the channels and append the tx hashes unsorted.
                for channel in channels {
                    while let Ok(tx) = channel.recv() {
                        let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
                        tx_list.push((tx_hash, tx_id));
                    }
                }

                Ok(tx_list)
            },
            |_| true,
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider
    for DatabaseProvider<TX, N>
{
    type Transaction = TxTy<N>;

    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
    }

    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Transactions,
            id,
            |static_file| static_file.transaction_by_id(id),
            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
        )
    }

    fn transaction_by_id_unhashed(
        &self,
        id: TxNumber,
    ) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Transactions,
            id,
            |static_file| static_file.transaction_by_id_unhashed(id),
            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
        )
    }

    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
        if let Some(id) = self.transaction_id(hash)? {
            Ok(self.transaction_by_id_unhashed(id)?)
        } else {
            Ok(None)
        }
    }

    fn transaction_by_hash_with_meta(
        &self,
        tx_hash: TxHash,
    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
        let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
        if let Some(transaction_id) = self.transaction_id(tx_hash)? {
            if let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? {
                if let Some(block_number) =
                    transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
                {
                    if let Some(sealed_header) = self.sealed_header(block_number)? {
                        let (header, block_hash) = sealed_header.split();
                        if let Some(block_body) = self.block_body_indices(block_number)? {
                            // The index of the tx in the block is its offset from the block's
                            // first transaction number.
                            let index = transaction_id - block_body.first_tx_num();

                            let meta = TransactionMeta {
                                tx_hash,
                                index,
                                block_hash,
                                block_number,
                                base_fee: header.base_fee_per_gas(),
                                excess_blob_gas: header.excess_blob_gas(),
                                timestamp: header.timestamp(),
                            };

                            return Ok(Some((transaction, meta)))
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
        Ok(cursor.seek(id)?.map(|(_, bn)| bn))
    }

    fn transactions_by_block(
        &self,
        id: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;

        if let Some(block_number) = self.convert_hash_or_number(id)? {
            if let Some(body) = self.block_body_indices(block_number)? {
                let tx_range = body.tx_num_range();
                return if tx_range.is_empty() {
                    Ok(Some(Vec::new()))
                } else {
                    Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
                }
            }
        }
        Ok(None)
    }

    fn transactions_by_block_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
        let range = to_range(range);
        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;

        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
            .into_iter()
            .map(|body| {
                let tx_num_range = body.tx_num_range();
                if tx_num_range.is_empty() {
                    Ok(Vec::new())
                } else {
                    Ok(self
                        .transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
                        .into_iter()
                        .collect())
                }
            })
            .collect()
    }

    fn transactions_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Transaction>> {
        self.transactions_by_tx_range_with_cursor(
            range,
            &mut self.tx.cursor_read::<tables::Transactions<_>>()?,
        )
    }

    fn senders_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Address>> {
        self.cursor_read_collect::<tables::TransactionSenders>(range)
    }

    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
        Ok(self.tx.get::<tables::TransactionSenders>(id)?)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
    type Receipt = ReceiptTy<N>;

    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Receipts,
            id,
            |static_file| static_file.receipt(id),
            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
        )
    }

    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
        if let Some(id) = self.transaction_id(hash)? {
            self.receipt(id)
        } else {
            Ok(None)
        }
    }

    fn receipts_by_block(
        &self,
        block: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
        if let Some(number) = self.convert_hash_or_number(block)? {
            if let Some(body) = self.block_body_indices(number)? {
                let tx_range = body.tx_num_range();
                return if tx_range.is_empty() {
                    Ok(Some(Vec::new()))
                } else {
                    self.receipts_by_tx_range(tx_range).map(Some)
                }
            }
        }
        Ok(None)
    }

    fn receipts_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Receipt>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Receipts,
            to_range(range),
            |static_file, range, _| static_file.receipts_by_tx_range(range),
            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
            |_| true,
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> WithdrawalsProvider
    for DatabaseProvider<TX, N>
{
    fn withdrawals_by_block(
        &self,
        id: BlockHashOrNumber,
        timestamp: u64,
    ) -> ProviderResult<Option<Withdrawals>> {
        if self.chain_spec.is_shanghai_active_at_timestamp(timestamp) {
            if let Some(number) = self.convert_hash_or_number(id)? {
                return self.static_file_provider.get_with_static_file_or_database(
                    StaticFileSegment::BlockMeta,
                    number,
                    |static_file| static_file.withdrawals_by_block(number.into(), timestamp),
                    || {
                        // Past Shanghai, all blocks should have a withdrawal list, even if empty.
                        let withdrawals = self
                            .tx
                            .get::<tables::BlockWithdrawals>(number)
                            .map(|w| w.map(|w| w.withdrawals))?
                            .unwrap_or_default();
                        Ok(Some(withdrawals))
                    },
                )
            }
        }
        Ok(None)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> OmmersProvider for DatabaseProvider<TX, N> {
    /// Returns the ommers for the block with matching id from the database.
    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
        if let Some(number) = self.convert_hash_or_number(id)? {
            // If the Paris (Merge) hardfork is active at this block, it has no ommers.
            if self.chain_spec.is_paris_active_at_block(number) {
                return Ok(Some(Vec::new()))
            }

            return self.static_file_provider.get_with_static_file_or_database(
                StaticFileSegment::BlockMeta,
                number,
                |static_file| static_file.ommers(id),
                || Ok(self.tx.get::<tables::BlockOmmers<Self::Header>>(number)?.map(|o| o.ommers)),
            )
        }

        Ok(None)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
    for DatabaseProvider<TX, N>
{
    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::BlockMeta,
            num,
            |static_file| static_file.block_body_indices(num),
            || Ok(self.tx.get::<tables::BlockBodyIndices>(num)?),
        )
    }

    fn block_body_indices_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::BlockMeta,
            *range.start()..*range.end() + 1,
            |static_file, range, _| {
                static_file.block_body_indices_range(range.start..=range.end.saturating_sub(1))
            },
            |range, _| self.cursor_read_collect::<tables::BlockBodyIndices>(range),
            |_| true,
        )
    }
}

impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
        Ok(self.tx.get::<tables::StageCheckpoints>(id.to_string())?)
    }

    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
    }

    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
        self.tx
            .cursor_read::<tables::StageCheckpoints>()?
            .walk(None)?
            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
            .map_err(ProviderError::Database)
    }
}

impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
    /// Save stage checkpoint.
    fn save_stage_checkpoint(
        &self,
        id: StageId,
        checkpoint: StageCheckpoint,
    ) -> ProviderResult<()> {
        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
    }

    /// Save stage checkpoint progress.
    fn save_stage_checkpoint_progress(
        &self,
        id: StageId,
        checkpoint: Vec<u8>,
    ) -> ProviderResult<()> {
        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
    }

    fn update_pipeline_stages(
        &self,
        block_number: BlockNumber,
        drop_stage_checkpoint: bool,
    ) -> ProviderResult<()> {
        // Iterate over all stages in the pipeline and update their checkpoints.
        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
        for stage_id in StageId::ALL {
            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
            cursor.upsert(
                stage_id.to_string(),
                &StageCheckpoint {
                    block_number,
                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
                },
            )?;
        }

        Ok(())
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
    fn plain_state_storages(
        &self,
        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;

        addresses_with_keys
            .into_iter()
            .map(|(address, storage)| {
                storage
                    .into_iter()
                    .map(|key| -> ProviderResult<_> {
                        Ok(plain_storage
                            .seek_by_key_subkey(address, key)?
                            .filter(|v| v.key == key)
                            .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
                    })
                    .collect::<ProviderResult<Vec<_>>>()
                    .map(|storage| (address, storage))
            })
            .collect::<ProviderResult<Vec<(_, _)>>>()
    }

    fn changed_storages_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
        self.tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(BlockNumberAddress::range(range))?
            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
                accounts.entry(address).or_default().insert(storage_entry.key);
                Ok(accounts)
            })
    }

    fn changed_storages_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;

        let storage_changeset_lists =
            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
                BTreeMap::new(),
                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
                    let (index, storage) = entry?;
                    storages
                        .entry((index.address(), storage.key))
                        .or_default()
                        .push(index.block_number());
                    Ok(storages)
                },
            )?;

        Ok(storage_changeset_lists)
    }
}

impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
    for DatabaseProvider<TX, N>
{
    type Receipt = ReceiptTy<N>;

    fn write_state(
        &self,
        execution_outcome: &ExecutionOutcome<Self::Receipt>,
        is_value_known: OriginalValuesKnown,
        write_receipts_to: StorageLocation,
    ) -> ProviderResult<()> {
        let first_block = execution_outcome.first_block();
        let block_count = execution_outcome.len() as u64;
        let last_block = execution_outcome.last_block();
        let block_range = first_block..=last_block;

        let tip = self.last_block_number()?.max(last_block);

        let (plain_state, reverts) =
            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);

        self.write_state_reverts(reverts, first_block)?;
        self.write_state_changes(plain_state)?;

        // Fetch the first transaction number of each block in the range.
        let block_indices: Vec<_> = self
            .block_body_indices_range(block_range)?
            .into_iter()
            .map(|b| b.first_tx_num)
            .collect();

        // Ensure all expected blocks are present.
        if block_indices.len() < block_count as usize {
            let missing_blocks = block_count - block_indices.len() as u64;
            return Err(ProviderError::BlockBodyIndicesNotFound(
                last_block.saturating_sub(missing_blocks - 1),
            ));
        }

        let has_receipts_pruning = self.prune_modes.has_receipts_pruning();

        // Prepare a receipts cursor if we are going to write receipts to the database, i.e. if it
        // was requested or if any kind of receipt pruning is configured.
        let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
            .then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
            .transpose()?;

        // Prepare a receipts static file writer if we are going to write receipts to static
        // files, i.e. if it was requested and no receipt pruning is configured.
        let mut receipts_static_writer = (write_receipts_to.static_files() &&
            !has_receipts_pruning)
            .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
            .transpose()?;

        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;

        // Receipts within the minimum pruning distance of the tip must always be written.
        let prunable_receipts =
            PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);

        // Prepare the set of addresses whose logs should not be pruned.
        let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
        for (_, addresses) in contract_log_pruner.range(..first_block) {
            allowed_addresses.extend(addresses.iter().copied());
        }

        for (idx, (receipts, first_tx_index)) in
            execution_outcome.receipts.iter().zip(block_indices).enumerate()
        {
            let block_number = first_block + idx as u64;

            // Increment the block number for the receipts static file writer.
            if let Some(writer) = receipts_static_writer.as_mut() {
                writer.increment_block(block_number)?;
            }

            // Skip writing receipts if the pruning configuration requires it.
            if prunable_receipts &&
                self.prune_modes
                    .receipts
                    .is_some_and(|mode| mode.should_prune(block_number, tip))
            {
                continue
            }

            // If there are new addresses to retain after this block number, track them.
            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
                allowed_addresses.extend(new_addresses.iter().copied());
            }

            for (idx, receipt) in receipts.iter().enumerate() {
                let receipt_idx = first_tx_index + idx as u64;
                // Skip the receipt if the log filter is active and it has no logs to retain.
                if prunable_receipts &&
                    has_contract_log_filter &&
                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
                {
                    continue
                }

                if let Some(writer) = &mut receipts_static_writer {
                    writer.append_receipt(receipt_idx, receipt)?;
                }

                if let Some(cursor) = &mut receipts_cursor {
                    cursor.append(receipt_idx, receipt)?;
                }
            }
        }

        Ok(())
    }

    fn write_state_reverts(
        &self,
        reverts: PlainStateReverts,
        first_block: BlockNumber,
    ) -> ProviderResult<()> {
        // Write storage changes.
        tracing::trace!("Writing storage changes");
        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
        let mut storage_changeset_cursor =
            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;

            tracing::trace!(block_number, "Writing block change");
            // Sort changes by address.
            storage_changes.par_sort_unstable_by_key(|a| a.address);
            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
                let storage_id = BlockNumberAddress((block_number, address));

                let mut storage = storage_revert
                    .into_iter()
                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
                    .collect::<Vec<_>>();
                // Sort storage slots by key.
                storage.par_sort_unstable_by_key(|a| a.0);

                // If the account was wiped, collect its whole plain storage so the changeset also
                // contains the pre-wipe values.
                let mut wiped_storage = Vec::new();
                if wiped {
                    tracing::trace!(?address, "Wiping storage");
                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
                        wiped_storage.push((entry.key, entry.value));
                        while let Some(entry) = storages_cursor.next_dup_val()? {
                            wiped_storage.push((entry.key, entry.value))
                        }
                    }
                }

                tracing::trace!(?address, ?storage, "Writing storage reverts");
                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
                    storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
                }
            }
        }

        // Write account changes.
        tracing::trace!("Writing account changes");
        let mut account_changeset_cursor =
            self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;

        for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;
            account_block_reverts.par_sort_by_key(|a| a.0);

            for (address, info) in account_block_reverts {
                account_changeset_cursor.append_dup(
                    block_number,
                    AccountBeforeTx { address, info: info.map(Into::into) },
                )?;
            }
        }

        Ok(())
    }

1958 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1959 changes.accounts.par_sort_by_key(|a| a.0);
1962 changes.storage.par_sort_by_key(|a| a.address);
1963 changes.contracts.par_sort_by_key(|a| a.0);
1964
1965 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1967 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1968 for (address, account) in changes.accounts {
1970 if let Some(account) = account {
1971 tracing::trace!(?address, "Updating plain state account");
1972 accounts_cursor.upsert(address, &account.into())?;
1973 } else if accounts_cursor.seek_exact(address)?.is_some() {
1974 tracing::trace!(?address, "Deleting plain state account");
1975 accounts_cursor.delete_current()?;
1976 }
1977 }
1978
1979 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1981 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1982 for (hash, bytecode) in changes.contracts {
1983 bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1984 }
1985
1986 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1988 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1989 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1990 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1992 storages_cursor.delete_current_duplicates()?;
1993 }
1994 let mut storage = storage
1996 .into_iter()
1997 .map(|(k, value)| StorageEntry { key: k.into(), value })
1998 .collect::<Vec<_>>();
1999 storage.par_sort_unstable_by_key(|a| a.key);
2001
2002 for entry in storage {
2003 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2004 if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
2005 if db_entry.key == entry.key {
2006 storages_cursor.delete_current()?;
2007 }
2008 }
2009
2010 if !entry.value.is_zero() {
2011 storages_cursor.upsert(address, &entry)?;
2012 }
2013 }
2014 }
2015
2016 Ok(())
2017 }
2018
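    /// Writes the sorted hashed post-state to `HashedAccounts` and `HashedStorages`.
    ///
    /// Wiped storages are cleared before their slots are re-inserted; destroyed accounts and
    /// zero-valued slots are deleted instead of written.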
2019 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2020 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2022 for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
2023 if let Some(account) = account {
2024 hashed_accounts_cursor.upsert(hashed_address, &account)?;
2025 } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
2026 hashed_accounts_cursor.delete_current()?;
2027 }
2028 }
2029
2030 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2032 let mut hashed_storage_cursor =
2033 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2034 for (hashed_address, storage) in sorted_storages {
2035 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2036 hashed_storage_cursor.delete_current_duplicates()?;
2037 }
2038
2039 for (hashed_slot, value) in storage.storage_slots_sorted() {
2040 let entry = StorageEntry { key: hashed_slot, value };
2041 if let Some(db_entry) =
2042 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
2043 {
2044 if db_entry.key == entry.key {
2045 hashed_storage_cursor.delete_current()?;
2046 }
2047 }
2048
2049 if !entry.value.is_zero() {
2050 hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2051 }
2052 }
2053 }
2054
2055 Ok(())
2056 }
2057
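    /// Removes all state above the given block.
    ///
    /// The account and storage changesets for `block + 1..=latest` are taken from the
    /// database and replayed onto the plain state to restore the pre-block values, and the
    /// receipts of the removed blocks are pruned from `remove_receipts_from`. Unlike
    /// `take_state_above`, nothing is returned to the caller.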
2058 fn remove_state_above(
2080 &self,
2081 block: BlockNumber,
2082 remove_receipts_from: StorageLocation,
2083 ) -> ProviderResult<()> {
2084 let range = block + 1..=self.last_block_number()?;
2085
2086 if range.is_empty() {
2087 return Ok(());
2088 }
2089
2090 let block_bodies = self.block_body_indices_range(range.clone())?;
2092
2093 let from_transaction_num =
2095 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2096
2097 let storage_range = BlockNumberAddress::range(range.clone());
2098
2099 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2100 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2101
2102 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2107 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2108
2109 let (state, _) = self.populate_bundle_state(
2110 account_changeset,
2111 storage_changeset,
2112 &mut plain_accounts_cursor,
2113 &mut plain_storage_cursor,
2114 )?;
2115
2116 for (address, (old_account, new_account, storage)) in &state {
2118 if old_account != new_account {
2120 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2121 if let Some(account) = old_account {
2122 plain_accounts_cursor.upsert(*address, account)?;
2123 } else if existing_entry.is_some() {
2124 plain_accounts_cursor.delete_current()?;
2125 }
2126 }
2127
2128 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2130 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2131 if plain_storage_cursor
2134 .seek_by_key_subkey(*address, *storage_key)?
2135 .filter(|s| s.key == *storage_key)
2136 .is_some()
2137 {
2138 plain_storage_cursor.delete_current()?
2139 }
2140
2141 if !old_storage_value.is_zero() {
2143 plain_storage_cursor.upsert(*address, &storage_entry)?;
2144 }
2145 }
2146 }
2147
2148 self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2149
2150 Ok(())
2151 }
2152
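    /// Unwinds the state above the given block and returns it as an [`ExecutionOutcome`].
    ///
    /// Works like `remove_state_above`, but additionally collects the receipts of the
    /// unwound blocks (from static files or the database, whichever holds them) so the
    /// caller receives the full reverted execution state.
    ///
    /// A minimal usage sketch (not compiled here), assuming `provider` is a read-write
    /// `DatabaseProvider` and `target` is the block to unwind to:
    ///
    /// ```rust,ignore
    /// let outcome = provider.take_state_above(target, StorageLocation::Database)?;
    /// // `outcome` holds the reverted accounts, storages and receipts above `target`.
    /// ```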
2153 fn take_state_above(
2175 &self,
2176 block: BlockNumber,
2177 remove_receipts_from: StorageLocation,
2178 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2179 let range = block + 1..=self.last_block_number()?;
2180
2181 if range.is_empty() {
2182 return Ok(ExecutionOutcome::default())
2183 }
2184 let start_block_number = *range.start();
2185
2186 let block_bodies = self.block_body_indices_range(range.clone())?;
2188
2189 let from_transaction_num =
2191 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2192 let to_transaction_num =
2193 block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2194
2195 let storage_range = BlockNumberAddress::range(range.clone());
2196
2197 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2198 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2199
2200 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2205 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2206
2207 let (state, reverts) = self.populate_bundle_state(
2210 account_changeset,
2211 storage_changeset,
2212 &mut plain_accounts_cursor,
2213 &mut plain_storage_cursor,
2214 )?;
2215
2216 for (address, (old_account, new_account, storage)) in &state {
2218 if old_account != new_account {
2220 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2221 if let Some(account) = old_account {
2222 plain_accounts_cursor.upsert(*address, account)?;
2223 } else if existing_entry.is_some() {
2224 plain_accounts_cursor.delete_current()?;
2225 }
2226 }
2227
2228 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2230 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2231 if plain_storage_cursor
2234 .seek_by_key_subkey(*address, *storage_key)?
2235 .filter(|s| s.key == *storage_key)
2236 .is_some()
2237 {
2238 plain_storage_cursor.delete_current()?
2239 }
2240
2241 if !old_storage_value.is_zero() {
2243 plain_storage_cursor.upsert(*address, &storage_entry)?;
2244 }
2245 }
2246 }
2247
2248 let mut receipts_iter = self
2250 .static_file_provider
2251 .get_range_with_static_file_or_database(
2252 StaticFileSegment::Receipts,
2253 from_transaction_num..to_transaction_num + 1,
2254 |static_file, range, _| {
2255 static_file
2256 .receipts_by_tx_range(range.clone())
2257 .map(|r| range.into_iter().zip(r).collect())
2258 },
2259 |range, _| {
2260 self.tx
2261 .cursor_read::<tables::Receipts<Self::Receipt>>()?
2262 .walk_range(range)?
2263 .map(|r| r.map_err(Into::into))
2264 .collect()
2265 },
2266 |_| true,
2267 )?
2268 .into_iter()
2269 .peekable();
2270
2271 let mut receipts = Vec::with_capacity(block_bodies.len());
2272 for block_body in block_bodies {
2274 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2275 for num in block_body.tx_num_range() {
2276 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2277 block_receipts.push(receipts_iter.next().unwrap().1);
2278 }
2279 }
2280 receipts.push(block_receipts);
2281 }
2282
2283 self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2284
2285 Ok(ExecutionOutcome::new_init(
2286 state,
2287 reverts,
2288 Vec::new(),
2289 receipts,
2290 start_block_number,
2291 Vec::new(),
2292 ))
2293 }
2294}
2295
2296impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
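    /// Writes account trie updates to `AccountsTrie` and forwards storage trie updates to
    /// `write_storage_trie_updates`, returning the total number of inserted or removed
    /// entries.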
2297 fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2299 if trie_updates.is_empty() {
2300 return Ok(0)
2301 }
2302
2303 let mut num_entries = 0;
2305
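        // Merge removed and updated account nodes into one sorted list; a node that is both
        // removed and updated keeps its updated value.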
2306 let mut account_updates = trie_updates
2308 .removed_nodes_ref()
2309 .iter()
2310 .filter_map(|n| {
2311 (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2312 })
2313 .collect::<Vec<_>>();
2314 account_updates.extend(
2315 trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2316 );
2317 account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2319
2320 let tx = self.tx_ref();
2321 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2322 for (key, updated_node) in account_updates {
2323 let nibbles = StoredNibbles(key.clone());
2324 match updated_node {
2325 Some(node) => {
2326 if !nibbles.0.is_empty() {
2327 num_entries += 1;
2328 account_trie_cursor.upsert(nibbles, node)?;
2329 }
2330 }
2331 None => {
2332 num_entries += 1;
2333 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2334 account_trie_cursor.delete_current()?;
2335 }
2336 }
2337 }
2338 }
2339
2340 num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
2341
2342 Ok(num_entries)
2343 }
2344}
2345
2346impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
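    /// Writes storage trie updates for each hashed address, sorted by address so the dup
    /// cursor over `StoragesTrie` is walked in order. Returns the number of updated entries.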
2347 fn write_storage_trie_updates(
2350 &self,
2351 storage_tries: &B256Map<StorageTrieUpdates>,
2352 ) -> ProviderResult<usize> {
2353 let mut num_entries = 0;
2354 let mut storage_tries = Vec::from_iter(storage_tries);
2355 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2356 let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2357 for (hashed_address, storage_trie_updates) in storage_tries {
2358 let mut db_storage_trie_cursor =
2359 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2360 num_entries +=
2361 db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2362 cursor = db_storage_trie_cursor.cursor;
2363 }
2364
2365 Ok(num_entries)
2366 }
2367
2368 fn write_individual_storage_trie_updates(
2369 &self,
2370 hashed_address: B256,
2371 updates: &StorageTrieUpdates,
2372 ) -> ProviderResult<usize> {
2373 if updates.is_empty() {
2374 return Ok(0)
2375 }
2376
2377 let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2378 let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
2379 Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
2380 }
2381}
2382
2383impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2384 fn unwind_account_hashing<'a>(
2385 &self,
2386 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2387 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
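        // Collect and reverse before building the map so that, for an address touched by
        // several changesets, the earliest (pre-change) account value wins.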
2388 let hashed_accounts = changesets
2392 .into_iter()
2393 .map(|(_, e)| (keccak256(e.address), e.info))
2394 .collect::<Vec<_>>()
2395 .into_iter()
2396 .rev()
2397 .collect::<BTreeMap<_, _>>();
2398
2399 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2401 for (hashed_address, account) in &hashed_accounts {
2402 if let Some(account) = account {
2403 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2404 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2405 hashed_accounts_cursor.delete_current()?;
2406 }
2407 }
2408
2409 Ok(hashed_accounts)
2410 }
2411
2412 fn unwind_account_hashing_range(
2413 &self,
2414 range: impl RangeBounds<BlockNumber>,
2415 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2416 let changesets = self
2417 .tx
2418 .cursor_read::<tables::AccountChangeSets>()?
2419 .walk_range(range)?
2420 .collect::<Result<Vec<_>, _>>()?;
2421 self.unwind_account_hashing(changesets.iter())
2422 }
2423
2424 fn insert_account_for_hashing(
2425 &self,
2426 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2427 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2428 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2429 let hashed_accounts =
2430 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2431 for (hashed_address, account) in &hashed_accounts {
2432 if let Some(account) = account {
2433 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2434 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2435 hashed_accounts_cursor.delete_current()?;
2436 }
2437 }
2438 Ok(hashed_accounts)
2439 }
2440
2441 fn unwind_storage_hashing(
2442 &self,
2443 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2444 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2445 let mut hashed_storages = changesets
2447 .into_iter()
2448 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2449 (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2450 })
2451 .collect::<Vec<_>>();
2452 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2453
2454 let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2456 HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2457 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2458 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2459 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2460
2461 if hashed_storage
2462 .seek_by_key_subkey(hashed_address, key)?
2463 .filter(|entry| entry.key == key)
2464 .is_some()
2465 {
2466 hashed_storage.delete_current()?;
2467 }
2468
2469 if !value.is_zero() {
2470 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2471 }
2472 }
2473 Ok(hashed_storage_keys)
2474 }
2475
2476 fn unwind_storage_hashing_range(
2477 &self,
2478 range: impl RangeBounds<BlockNumberAddress>,
2479 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2480 let changesets = self
2481 .tx
2482 .cursor_read::<tables::StorageChangeSets>()?
2483 .walk_range(range)?
2484 .collect::<Result<Vec<_>, _>>()?;
2485 self.unwind_storage_hashing(changesets.into_iter())
2486 }
2487
2488 fn insert_storage_for_hashing(
2489 &self,
2490 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2491 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2492 let hashed_storages =
2494 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2495 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2496 map.insert(keccak256(entry.key), entry.value);
2497 map
2498 });
2499 map.insert(keccak256(address), storage);
2500 map
2501 });
2502
2503 let hashed_storage_keys = hashed_storages
2504 .iter()
2505 .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2506 .collect();
2507
2508 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2509 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2512 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2513 if hashed_storage_cursor
2514 .seek_by_key_subkey(hashed_address, key)?
2515 .filter(|entry| entry.key == key)
2516 .is_some()
2517 {
2518 hashed_storage_cursor.delete_current()?;
2519 }
2520
2521 if !value.is_zero() {
2522 hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2523 }
2524 Ok(())
2525 })
2526 })?;
2527
2528 Ok(hashed_storage_keys)
2529 }
2530
2531 fn insert_hashes(
2532 &self,
2533 range: RangeInclusive<BlockNumber>,
2534 end_block_hash: B256,
2535 expected_state_root: B256,
2536 ) -> ProviderResult<()> {
2537 let mut account_prefix_set = PrefixSetMut::default();
2539 let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
2540 let mut destroyed_accounts = HashSet::default();
2541
2542 let mut durations_recorder = metrics::DurationsRecorder::default();
2543
2544 {
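            // Storage hashing: re-hash the storages changed in the range and record the
            // touched hashed slots for the prefix sets below.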
2546 let lists = self.changed_storages_with_range(range.clone())?;
2547 let storages = self.plain_state_storages(lists)?;
2548 let storage_entries = self.insert_storage_for_hashing(storages)?;
2549 for (hashed_address, hashed_slots) in storage_entries {
2550 account_prefix_set.insert(Nibbles::unpack(hashed_address));
2551 for slot in hashed_slots {
2552 storage_prefix_sets
2553 .entry(hashed_address)
2554 .or_default()
2555 .insert(Nibbles::unpack(slot));
2556 }
2557 }
2558 }
2559 durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
2560
2561 {
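            // Account hashing: re-hash the accounts changed in the range and track destroyed
            // accounts for the prefix sets below.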
2563 let lists = self.changed_accounts_with_range(range.clone())?;
2564 let accounts = self.basic_accounts(lists)?;
2565 let hashed_addresses = self.insert_account_for_hashing(accounts)?;
2566 for (hashed_address, account) in hashed_addresses {
2567 account_prefix_set.insert(Nibbles::unpack(hashed_address));
2568 if account.is_none() {
2569 destroyed_accounts.insert(hashed_address);
2570 }
2571 }
2572 }
2573 durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
2574
2575 {
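            // Merkle: recompute the state root over the collected prefix sets, verify it
            // against the expected root, then persist the trie updates.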
2577 let prefix_sets = TriePrefixSets {
2580 account_prefix_set: account_prefix_set.freeze(),
2581 storage_prefix_sets: storage_prefix_sets
2582 .into_iter()
2583 .map(|(k, v)| (k, v.freeze()))
2584 .collect(),
2585 destroyed_accounts,
2586 };
2587 let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
2588 .with_prefix_sets(prefix_sets)
2589 .root_with_updates()
2590 .map_err(reth_db_api::DatabaseError::from)?;
2591 if state_root != expected_state_root {
2592 return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
2593 root: GotExpected { got: state_root, expected: expected_state_root },
2594 block_number: *range.end(),
2595 block_hash: end_block_hash,
2596 })))
2597 }
2598 self.write_trie_updates(&trie_updates)?;
2599 }
2600 durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
2601
2602 debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
2603
2604 Ok(())
2605 }
2606}
2607
2608impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2609 fn unwind_account_history_indices<'a>(
2610 &self,
2611 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2612 ) -> ProviderResult<usize> {
2613 let mut last_indices = changesets
2614 .into_iter()
2615 .map(|(index, account)| (account.address, *index))
2616 .collect::<Vec<_>>();
2617 last_indices.sort_by_key(|(a, _)| *a);
2618
2619 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
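        // For each address, drop the history index entries of the unwound blocks and
        // re-insert the remaining partial shard, if any.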
2621 for &(address, rem_index) in &last_indices {
2622 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2623 &mut cursor,
2624 ShardedKey::last(address),
2625 rem_index,
2626 |sharded_key| sharded_key.key == address,
2627 )?;
2628
2629 if !partial_shard.is_empty() {
2632 cursor.insert(
2633 ShardedKey::last(address),
2634 &BlockNumberList::new_pre_sorted(partial_shard),
2635 )?;
2636 }
2637 }
2638
        Ok(last_indices.len())
2641 }
2642
2643 fn unwind_account_history_indices_range(
2644 &self,
2645 range: impl RangeBounds<BlockNumber>,
2646 ) -> ProviderResult<usize> {
2647 let changesets = self
2648 .tx
2649 .cursor_read::<tables::AccountChangeSets>()?
2650 .walk_range(range)?
2651 .collect::<Result<Vec<_>, _>>()?;
2652 self.unwind_account_history_indices(changesets.iter())
2653 }
2654
2655 fn insert_account_history_index(
2656 &self,
2657 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2658 ) -> ProviderResult<()> {
2659 self.append_history_index::<_, tables::AccountsHistory>(
2660 account_transitions,
2661 ShardedKey::new,
2662 )
2663 }
2664
2665 fn unwind_storage_history_indices(
2666 &self,
2667 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2668 ) -> ProviderResult<usize> {
2669 let mut storage_changesets = changesets
2670 .into_iter()
2671 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2672 .collect::<Vec<_>>();
2673 storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2674
2675 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2676 for &(address, storage_key, rem_index) in &storage_changesets {
2677 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2678 &mut cursor,
2679 StorageShardedKey::last(address, storage_key),
2680 rem_index,
2681 |storage_sharded_key| {
2682 storage_sharded_key.address == address &&
2683 storage_sharded_key.sharded_key.key == storage_key
2684 },
2685 )?;
2686
2687 if !partial_shard.is_empty() {
2690 cursor.insert(
2691 StorageShardedKey::last(address, storage_key),
2692 &BlockNumberList::new_pre_sorted(partial_shard),
2693 )?;
2694 }
2695 }
2696
        Ok(storage_changesets.len())
2699 }
2700
2701 fn unwind_storage_history_indices_range(
2702 &self,
2703 range: impl RangeBounds<BlockNumberAddress>,
2704 ) -> ProviderResult<usize> {
2705 let changesets = self
2706 .tx
2707 .cursor_read::<tables::StorageChangeSets>()?
2708 .walk_range(range)?
2709 .collect::<Result<Vec<_>, _>>()?;
2710 self.unwind_storage_history_indices(changesets.into_iter())
2711 }
2712
2713 fn insert_storage_history_index(
2714 &self,
2715 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2716 ) -> ProviderResult<()> {
2717 self.append_history_index::<_, tables::StoragesHistory>(
2718 storage_transitions,
2719 |(address, storage_key), highest_block_number| {
2720 StorageShardedKey::new(address, storage_key, highest_block_number)
2721 },
2722 )
2723 }
2724
2725 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2726 {
2728 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2729 self.insert_account_history_index(indices)?;
2730 }
2731
2732 {
2734 let indices = self.changed_storages_and_blocks_with_range(range)?;
2735 self.insert_storage_history_index(indices)?;
2736 }
2737
2738 Ok(())
2739 }
2740}
2741
2742impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2743 for DatabaseProvider<TX, N>
2744{
2745 fn take_block_and_execution_above(
2746 &self,
2747 block: BlockNumber,
2748 remove_from: StorageLocation,
2749 ) -> ProviderResult<Chain<Self::Primitives>> {
2750 let range = block + 1..=self.last_block_number()?;
2751
2752 self.unwind_trie_state_range(range.clone())?;
2753
2754 let execution_state = self.take_state_above(block, remove_from)?;
2756
2757 let blocks = self.recovered_block_range(range)?;
2758
2759 self.remove_blocks_above(block, remove_from)?;
2762
2763 self.update_pipeline_stages(block, true)?;
2765
2766 Ok(Chain::new(blocks, execution_state, None))
2767 }
2768
2769 fn remove_block_and_execution_above(
2770 &self,
2771 block: BlockNumber,
2772 remove_from: StorageLocation,
2773 ) -> ProviderResult<()> {
2774 let range = block + 1..=self.last_block_number()?;
2775
2776 self.unwind_trie_state_range(range)?;
2777
2778 self.remove_state_above(block, remove_from)?;
2780
2781 self.remove_blocks_above(block, remove_from)?;
2784
2785 self.update_pipeline_stages(block, true)?;
2787
2788 Ok(())
2789 }
2790}
2791
2792impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2793 for DatabaseProvider<TX, N>
2794{
2795 type Block = BlockTy<N>;
2796 type Receipt = ReceiptTy<N>;
2797
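    /// Inserts a single recovered block.
    ///
    /// Writes the canonical hash, header, total difficulty, transaction senders and
    /// transaction-hash lookups (subject to the configured prune modes), then appends the
    /// block body via `append_block_bodies`. Headers are written to the database and/or
    /// static files depending on `write_to`.
    ///
    /// A minimal usage sketch (not compiled here), assuming `provider` is a read-write
    /// `DatabaseProvider` and `block` is a `RecoveredBlock`:
    ///
    /// ```rust,ignore
    /// let indices = provider.insert_block(block, StorageLocation::Database)?;
    /// // `indices` records where the block's transactions landed in the global tx range.
    /// ```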
2798 fn insert_block(
2819 &self,
2820 block: RecoveredBlock<Self::Block>,
2821 write_to: StorageLocation,
2822 ) -> ProviderResult<StoredBlockBodyIndices> {
2823 let block_number = block.number();
2824
2825 let mut durations_recorder = metrics::DurationsRecorder::default();
2826
2827 let ttd = if block_number == 0 {
2829 block.header().difficulty()
2830 } else {
2831 let parent_block_number = block_number - 1;
2832 let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2833 durations_recorder.record_relative(metrics::Action::GetParentTD);
2834 parent_ttd + block.header().difficulty()
2835 };
2836
2837 if write_to.database() {
2838 self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
2839 durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
2840
2841 self.tx.put::<tables::Headers<HeaderTy<N>>>(block_number, block.header().clone())?;
2843 durations_recorder.record_relative(metrics::Action::InsertHeaders);
2844
2845 self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
2846 durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
2847 }
2848
2849 if write_to.static_files() {
2850 let mut writer =
2851 self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?;
2852 writer.append_header(block.header(), ttd, &block.hash())?;
2853 }
2854
2855 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2856 durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2857
2858 let mut next_tx_num = self
2859 .tx
2860 .cursor_read::<tables::TransactionBlocks>()?
2861 .last()?
2862 .map(|(n, _)| n + 1)
2863 .unwrap_or_default();
2864 durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2865 let first_tx_num = next_tx_num;
2866
2867 let tx_count = block.body().transaction_count() as u64;
2868
2869 for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2871 let hash = transaction.tx_hash();
2872
2873 if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2874 self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2875 }
2876
2877 if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2878 self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2879 }
2880 next_tx_num += 1;
2881 }
2882
2883 self.append_block_bodies(vec![(block_number, Some(block.into_body()))], write_to)?;
2884
2885 debug!(
2886 target: "providers::db",
2887 ?block_number,
2888 actions = ?durations_recorder.actions,
2889 "Inserted block"
2890 );
2891
2892 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2893 }
2894
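    /// Appends block bodies for consecutive block numbers, writing transactions to the
    /// database and/or static files depending on `write_to` and recording the corresponding
    /// `BlockBodyIndices` and `TransactionBlocks` entries. Chain-specific body contents are
    /// delegated to the configured storage writer.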
2895 fn append_block_bodies(
2896 &self,
2897 bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2898 write_to: StorageLocation,
2899 ) -> ProviderResult<()> {
2900 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2901
2902 let mut tx_static_writer = write_to
2904 .static_files()
2905 .then(|| {
2906 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)
2907 })
2908 .transpose()?;
2909
2910 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2911 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2912
2913 let mut tx_cursor = write_to
2915 .database()
2916 .then(|| self.tx.cursor_write::<tables::Transactions<TxTy<N>>>())
2917 .transpose()?;
2918
2919 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2921
2922 for (block_number, body) in &bodies {
2923 if let Some(writer) = tx_static_writer.as_mut() {
2925 writer.increment_block(*block_number)?;
2926 }
2927
2928 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2929 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2930
2931 let mut durations_recorder = metrics::DurationsRecorder::default();
2932
2933 block_indices_cursor.append(*block_number, &block_indices)?;
2935
2936 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2937
2938 let Some(body) = body else { continue };
2939
2940 if !body.transactions().is_empty() {
2942 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2943 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2944 }
2945
2946 for transaction in body.transactions() {
2948 if let Some(writer) = tx_static_writer.as_mut() {
2949 writer.append_transaction(next_tx_num, transaction)?;
2950 }
2951 if let Some(cursor) = tx_cursor.as_mut() {
2952 cursor.append(next_tx_num, transaction)?;
2953 }
2954
2955 next_tx_num += 1;
2957 }
2958
2959 debug!(
2960 target: "providers::db",
2961 ?block_number,
2962 actions = ?durations_recorder.actions,
2963 "Inserted block body"
2964 );
2965 }
2966
2967 self.storage.writer().write_block_bodies(self, bodies, write_to)?;
2968
2969 Ok(())
2970 }
2971
2972 fn remove_blocks_above(
2973 &self,
2974 block: BlockNumber,
2975 remove_from: StorageLocation,
2976 ) -> ProviderResult<()> {
2977 for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
2978 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2979 }
2980
2981 self.remove::<tables::CanonicalHeaders>(block + 1..)?;
2984 self.remove::<tables::Headers<HeaderTy<N>>>(block + 1..)?;
2985 self.remove::<tables::HeaderTerminalDifficulties>(block + 1..)?;
2986
2987 let unwind_tx_from = self
2989 .block_body_indices(block)?
2990 .map(|b| b.next_tx_num())
2991 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2992
2993 let unwind_tx_to = self
2995 .tx
2996 .cursor_read::<tables::BlockBodyIndices>()?
2997 .last()?
2998 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3000 .1
3001 .last_tx_num();
3002
3003 if unwind_tx_from <= unwind_tx_to {
3004 for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
3005 self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
3006 }
3007 }
3008
3009 self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
3010
3011 self.remove_bodies_above(block, remove_from)?;
3012
3013 Ok(())
3014 }
3015
3016 fn remove_bodies_above(
3017 &self,
3018 block: BlockNumber,
3019 remove_from: StorageLocation,
3020 ) -> ProviderResult<()> {
3021 self.storage.writer().remove_block_bodies_above(self, block, remove_from)?;
3022
3023 let unwind_tx_from = self
3025 .block_body_indices(block)?
3026 .map(|b| b.next_tx_num())
3027 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3028
3029 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3030 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3031
3032 if remove_from.database() {
3033 self.remove::<tables::Transactions<TxTy<N>>>(unwind_tx_from..)?;
3034 }
3035
3036 if remove_from.static_files() {
3037 let static_file_tx_num = self
3038 .static_file_provider
3039 .get_highest_static_file_tx(StaticFileSegment::Transactions);
3040
3041 let to_delete = static_file_tx_num
3042 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3043 .unwrap_or_default();
3044
3045 self.static_file_provider
3046 .latest_writer(StaticFileSegment::Transactions)?
3047 .prune_transactions(to_delete, block)?;
3048 }
3049
3050 Ok(())
3051 }
3052
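    /// Appends a batch of recovered blocks together with their execution output, hashed
    /// post-state and trie updates, then refreshes the history indices and pipeline stage
    /// checkpoints up to the last appended block. The blocks are expected to extend the
    /// currently stored chain.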
3053 fn append_blocks_with_state(
3055 &self,
3056 blocks: Vec<RecoveredBlock<Self::Block>>,
3057 execution_outcome: &ExecutionOutcome<Self::Receipt>,
3058 hashed_state: HashedPostStateSorted,
3059 trie_updates: TrieUpdates,
3060 ) -> ProviderResult<()> {
3061 if blocks.is_empty() {
3062 debug!(target: "providers::db", "Attempted to append empty block range");
3063 return Ok(())
3064 }
3065
3066 let first_number = blocks.first().unwrap().number();
3067
3068 let last = blocks.last().unwrap();
3069 let last_block_number = last.number();
3070
3071 let mut durations_recorder = metrics::DurationsRecorder::default();
3072
3073 for block in blocks {
3075 self.insert_block(block, StorageLocation::Database)?;
3076 durations_recorder.record_relative(metrics::Action::InsertBlock);
3077 }
3078
3079 self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?;
3080 durations_recorder.record_relative(metrics::Action::InsertState);
3081
3082 self.write_hashed_state(&hashed_state)?;
3084 self.write_trie_updates(&trie_updates)?;
3085 durations_recorder.record_relative(metrics::Action::InsertHashes);
3086
3087 self.update_history_indices(first_number..=last_block_number)?;
3088 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3089
3090 self.update_pipeline_stages(last_block_number, false)?;
3092 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3093
3094 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3095
3096 Ok(())
3097 }
3098}
3099
3100impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3101 fn get_prune_checkpoint(
3102 &self,
3103 segment: PruneSegment,
3104 ) -> ProviderResult<Option<PruneCheckpoint>> {
3105 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3106 }
3107
3108 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3109 Ok(self
3110 .tx
3111 .cursor_read::<tables::PruneCheckpoints>()?
3112 .walk(None)?
3113 .collect::<Result<_, _>>()?)
3114 }
3115}
3116
3117impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3118 fn save_prune_checkpoint(
3119 &self,
3120 segment: PruneSegment,
3121 checkpoint: PruneCheckpoint,
3122 ) -> ProviderResult<()> {
3123 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3124 }
3125}
3126
3127impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3128 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3129 let db_entries = self.tx.entries::<T>()?;
3130 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3131 Ok(entries) => entries,
3132 Err(ProviderError::UnsupportedProvider) => 0,
3133 Err(err) => return Err(err),
3134 };
3135
3136 Ok(db_entries + static_file_entries)
3137 }
3138}
3139
3140impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3141 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3142 let mut finalized_blocks = self
3143 .tx
3144 .cursor_read::<tables::ChainState>()?
3145 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3146 .take(1)
3147 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3148
3149 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3150 Ok(last_finalized_block_number)
3151 }
3152
    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
        let mut safe_blocks = self
            .tx
            .cursor_read::<tables::ChainState>()?
            .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
            .take(1)
            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;

        let last_safe_block_number = safe_blocks.pop_first().map(|pair| pair.1);
        Ok(last_safe_block_number)
    }
3164}
3165
3166impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3167 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3168 Ok(self
3169 .tx
3170 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3171 }
3172
3173 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3174 Ok(self
3175 .tx
3176 .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
3177 }
3178}
3179
3180impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3181 type Tx = TX;
3182
3183 fn tx_ref(&self) -> &Self::Tx {
3184 &self.tx
3185 }
3186
3187 fn tx_mut(&mut self) -> &mut Self::Tx {
3188 &mut self.tx
3189 }
3190
3191 fn into_tx(self) -> Self::Tx {
3192 self.tx
3193 }
3194
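    // This resolves to the inherent `prune_modes_ref` accessor on `DatabaseProvider`
    // (inherent methods take precedence over trait methods during resolution), so the call
    // is not self-recursive.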
3195 fn prune_modes_ref(&self) -> &PruneModes {
3196 self.prune_modes_ref()
3197 }
3198}