use crate::{
    changesets_utils::{
        storage_trie_wiped_changeset_iter, StorageRevertsIter, StorageTrieCurrentValuesIter,
    },
    providers::{
        database::{chain::ChainStorage, metrics},
        static_file::StaticFileWriter,
        NodeTypesForProvider, StaticFileProvider,
    },
    to_range,
    traits::{
        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
    },
    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
    DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
    HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
    LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
    PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, StageCheckpointReader,
    StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader, StorageReader,
    StorageTrieWriter, TransactionVariant, TransactionsProvider, TransactionsProviderExt,
    TrieReader, TrieWriter,
};
use alloy_consensus::{
    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
    BlockHeader, TxReceipt,
};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{
    keccak256,
    map::{hash_map, B256Map, HashMap, HashSet},
    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
};
use itertools::Itertools;
use parking_lot::RwLock;
use rayon::slice::ParallelSliceMut;
use reth_chain_state::ExecutedBlock;
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
    database::Database,
    models::{
        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
        BlockNumberHashedAddress, ShardedKey, StorageSettings, StoredBlockBodyIndices,
    },
    table::Table,
    tables,
    transaction::{DbTx, DbTxMut},
    BlockNumberList, PlainAccountState, PlainStorageState,
};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
use reth_primitives_traits::{
    Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
};
use reth_prune_types::{
    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
    BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
    NodePrimitivesProvider, StateProvider, StorageChangeSetReader, StorageSettingsCache,
    TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{
    trie_cursor::{
        InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory,
        TrieCursorIter,
    },
    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
    HashedPostStateSorted, StoredNibbles, StoredNibblesSubKey, TrieChangeSetsEntry,
};
use reth_trie_db::{
    DatabaseAccountTrieCursor, DatabaseStorageTrieCursor, DatabaseTrieCursorFactory,
};
use revm_database::states::{
    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
};
use std::{
    cmp::Ordering,
    collections::{BTreeMap, BTreeSet},
    fmt::Debug,
    ops::{Deref, DerefMut, Range, RangeBounds, RangeFrom, RangeInclusive},
    sync::Arc,
};
use tracing::{debug, trace};

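/// A [`DatabaseProvider`] that holds a read-only database transaction.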
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;

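/// A [`DatabaseProvider`] that holds a read-write database transaction.
///
/// It dereferences to the inner [`DatabaseProvider`], so all read APIs remain available on it.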
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);

impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
    for DatabaseProviderRW<DB, N>
{
    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
        &self.0
    }
}

impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
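    /// Commits the inner [`DatabaseProvider`], persisting the database transaction.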
    pub fn commit(self) -> ProviderResult<bool> {
        self.0.commit()
    }

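    /// Consumes the provider and returns the underlying read-write database transaction.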
    pub fn into_tx(self) -> <DB as Database>::TXMut {
        self.0.into_tx()
    }

    #[cfg(any(test, feature = "test-utils"))]
    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
        self.0.minimum_pruning_distance = distance;
        self
    }
}

impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
    for DatabaseProvider<<DB as Database>::TXMut, N>
{
    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
        provider.0
    }
}

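/// A provider struct that fetches data from the database.
///
/// Wraps a [`DbTx`] database transaction together with the chain spec, the static file provider
/// and the configured prune modes and storage settings.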
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
    tx: TX,
    chain_spec: Arc<N::ChainSpec>,
    static_file_provider: StaticFileProvider<N::Primitives>,
    prune_modes: PruneModes,
    storage: Arc<N::Storage>,
    storage_settings: Arc<RwLock<StorageSettings>>,
    minimum_pruning_distance: u64,
}

impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
    pub const fn prune_modes_ref(&self) -> &PruneModes {
        &self.prune_modes
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
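    /// Returns a [`StateProvider`] for the latest state, backed by this provider's transaction.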
    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
        trace!(target: "providers::db", "Returning latest state provider");
        Box::new(LatestStateProviderRef::new(self))
    }

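    /// Returns a historical [`StateProvider`] for the state after the given block hash.
    ///
    /// If the requested block is the tip of the chain, the latest state provider is returned
    /// instead. The historical provider is capped by the lowest block numbers still available
    /// after account and storage history pruning.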
    pub fn history_by_block_hash<'a>(
        &'a self,
        block_hash: BlockHash,
    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
        let mut block_number =
            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
        if block_number == self.best_block_number().unwrap_or_default() &&
            block_number == self.last_block_number().unwrap_or_default()
        {
            return Ok(Box::new(LatestStateProviderRef::new(self)))
        }

        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);

        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }

    #[cfg(feature = "test-utils")]
    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
}

impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    type Primitives = N::Primitives;
}

impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }

    fn get_static_file_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
        self.static_file_provider.get_writer(block, segment)
    }
}

impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
    for DatabaseProvider<TX, N>
{
    type ChainSpec = N::ChainSpec;

    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
        self.chain_spec.clone()
    }
}

impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
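    /// Creates a provider around a read-write database transaction.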
    pub const fn new_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
        }
    }
}

impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    fn as_ref(&self) -> &Self {
        self
    }
}

impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
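    /// Writes the given executed blocks, along with their execution outcomes, hashed state and
    /// trie updates, to the database, then updates the history indices and pipeline stage
    /// checkpoints for the written range.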
    pub fn save_blocks(&self, blocks: Vec<ExecutedBlock<N::Primitives>>) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to write empty block range");
            return Ok(())
        }

        let first_block = blocks.first().unwrap().recovered_block();

        let last_block = blocks.last().unwrap().recovered_block();
        let first_number = first_block.number();
        let last_block_number = last_block.number();

        debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage");

        for ExecutedBlock { recovered_block, execution_output, hashed_state, trie_updates } in
            blocks
        {
            let block_number = recovered_block.number();
            self.insert_block(Arc::unwrap_or_clone(recovered_block))?;

            self.write_state(&execution_output, OriginalValuesKnown::No)?;

            self.write_hashed_state(&hashed_state)?;

            self.write_trie_changesets(block_number, &trie_updates, None)?;
            self.write_trie_updates_sorted(&trie_updates)?;
        }

        self.update_history_indices(first_number..=last_block_number)?;

        self.update_pipeline_stages(last_block_number, false)?;

        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");

        Ok(())
    }

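    /// Unwinds hashed state, history indices and trie state for all blocks at and above `from`,
    /// based on the account and storage changesets recorded for that range.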
    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
        let changed_accounts = self
            .tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(from..)?
            .collect::<Result<Vec<_>, _>>()?;

        self.unwind_account_hashing(changed_accounts.iter())?;

        self.unwind_account_history_indices(changed_accounts.iter())?;

        let storage_start = BlockNumberAddress((from, Address::ZERO));
        let changed_storages = self
            .tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(storage_start..)?
            .collect::<Result<Vec<_>, _>>()?;

        self.unwind_storage_hashing(changed_storages.iter().copied())?;

        self.unwind_storage_history_indices(changed_storages.iter().copied())?;

        let trie_revert = self.trie_reverts(from)?;
        self.write_trie_updates_sorted(&trie_revert)?;

        self.clear_trie_changesets_from(from)?;

        Ok(())
    }

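    /// Removes receipts starting at the given transaction number from the database and, if
    /// receipts are written to static files, prunes the corresponding static file entries.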
    fn remove_receipts_from(
        &self,
        from_tx: TxNumber,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;

        if EitherWriter::receipts_destination(self).is_static_file() {
            let static_file_receipt_num =
                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);

            let to_delete = static_file_receipt_num
                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
                .unwrap_or_default();

            self.static_file_provider
                .latest_writer(StaticFileSegment::Receipts)?
                .prune_receipts(to_delete, last_block)?;
        }

        Ok(())
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
    fn try_into_history_at_block(
        self,
        mut block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        if block_number == self.best_block_number().unwrap_or_default() {
            return Ok(Box::new(LatestStateProvider::new(self)))
        }

        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProvider::new(self, block_number);

        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }
}

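/// Unwinds history index shards that belong to the same partial key, starting at `start_key` and
/// walking backwards through the table.
///
/// Every visited shard that still belongs to the key is deleted. As soon as a shard containing
/// block numbers below `block_number` is found, its entries strictly below `block_number` are
/// returned so the caller can reinsert them as the new highest shard. An empty vector means there
/// is nothing to reinsert.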
fn unwind_history_shards<S, T, C>(
    cursor: &mut C,
    start_key: T::Key,
    block_number: BlockNumber,
    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<S>>,
    C: DbCursorRO<T> + DbCursorRW<T>,
{
    let mut item = cursor.seek_exact(start_key)?;
    while let Some((sharded_key, list)) = item {
        if !shard_belongs_to_key(&sharded_key) {
            break
        }

        cursor.delete_current()?;

        let first = list.iter().next().expect("List can't be empty");

        if first >= block_number {
            item = cursor.prev()?;
            continue
        } else if block_number <= sharded_key.as_ref().highest_block_number {
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        return Ok(list.iter().collect::<Vec<_>>())
    }

    Ok(Vec::new())
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
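    /// Creates a provider around the given database transaction.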
    pub const fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
        }
    }

    pub fn into_tx(self) -> TX {
        self.tx
    }

    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
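    /// Looks up a block by hash or number and assembles it together with its senders, using the
    /// provided header lookup and block construction closures.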
    fn recovered_block<H, HF, B, BF>(
        &self,
        id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
        header_by_number: HF,
        construct_block: BF,
    ) -> ProviderResult<Option<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
    {
        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
        let Some(header) = header_by_number(block_number)? else { return Ok(None) };

        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };

        let tx_range = body.tx_num_range();

        let (transactions, senders) = if tx_range.is_empty() {
            (vec![], vec![])
        } else {
            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
        };

        let body = self
            .storage
            .reader()
            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
            .pop()
            .ok_or(ProviderError::InvalidStorageOutput)?;

        construct_block(header, body, senders)
    }

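    /// Assembles blocks for the given block number range, reading headers via `headers_range` and
    /// passing each header, body and transaction number range to `assemble_block`.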
    fn block_range<F, H, HF, R>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        mut assemble_block: F,
    ) -> ProviderResult<Vec<R>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
    {
        if range.is_empty() {
            return Ok(Vec::new())
        }

        let len = range.end().saturating_sub(*range.start()) as usize;
        let mut blocks = Vec::with_capacity(len);

        let headers = headers_range(range.clone())?;

        let present_headers = self
            .block_body_indices_range(range)?
            .into_iter()
            .map(|b| b.tx_num_range())
            .zip(headers)
            .collect::<Vec<_>>();

        let mut inputs = Vec::new();
        for (tx_range, header) in &present_headers {
            let transactions = if tx_range.is_empty() {
                Vec::new()
            } else {
                self.transactions_by_tx_range(tx_range.clone())?
            };

            inputs.push((header.as_ref(), transactions));
        }

        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;

        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
            blocks.push(assemble_block(header, body, tx_range)?);
        }

        Ok(blocks)
    }

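    /// Assembles blocks together with their senders for the given range, recovering senders from
    /// the transactions when they are not already stored.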
    fn block_with_senders_range<H, HF, B, BF>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        assemble_block: BF,
    ) -> ProviderResult<Vec<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
    {
        self.block_range(range, headers_range, |header, body, tx_range| {
            let senders = if tx_range.is_empty() {
                Vec::new()
            } else {
                let known_senders: HashMap<TxNumber, Address> =
                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;

                let mut senders = Vec::with_capacity(body.transactions().len());
                for (tx_num, tx) in tx_range.zip(body.transactions()) {
                    match known_senders.get(&tx_num) {
                        None => {
                            let sender = tx.recover_signer_unchecked()?;
                            senders.push(sender);
                        }
                        Some(sender) => senders.push(*sender),
                    }
                }

                senders
            };

            assemble_block(header, body, senders)
        })
    }

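    /// Rebuilds the plain state and reverts for the given account and storage changesets by
    /// pairing each old value from the changesets with the current value read from the plain
    /// state cursors.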
    fn populate_bundle_state<A, S>(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        plain_accounts_cursor: &mut A,
        plain_storage_cursor: &mut S,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
    where
        A: DbCursorRO<PlainAccountState>,
        S: DbDupCursorRO<PlainStorageState>,
    {
        let mut state: BundleStateInit = HashMap::default();

        let mut reverts: RevertsInit = HashMap::default();

        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_info;
                }
            }
            reverts.entry(block_number).or_default().entry(address).or_default().0 =
                Some(old_info);
        }

        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    let new_storage = plain_storage_cursor
                        .seek_by_key_subkey(address, old_storage.key)?
                        .filter(|storage| storage.key == old_storage.key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
}

impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
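    /// Removes and returns the shard stored at the given key, if any, as a plain list of block
    /// numbers.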
    fn take_shard<T>(
        &self,
        cursor: &mut <TX as DbTxMut>::CursorMut<T>,
        key: T::Key,
    ) -> ProviderResult<Vec<u64>>
    where
        T: Table<Value = BlockNumberList>,
    {
        if let Some((_, list)) = cursor.seek_exact(key)? {
            cursor.delete_current()?;
            let list = list.iter().collect::<Vec<_>>();
            return Ok(list)
        }
        Ok(Vec::new())
    }

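    /// Appends new block number indices to the history shards of each partial key: the last
    /// (open-ended) shard is loaded and extended, then re-chunked into shards of at most
    /// `NUM_OF_INDICES_IN_SHARD` entries, with the final shard keyed by `u64::MAX`.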
    fn append_history_index<P, T>(
        &self,
        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
    ) -> ProviderResult<()>
    where
        P: Copy,
        T: Table<Value = BlockNumberList>,
    {
        let mut cursor = self.tx.cursor_write::<T>()?;
        for (partial_key, indices) in index_updates {
            let mut last_shard =
                self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
            last_shard.extend(indices);
            let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
            while let Some(list) = chunks.next() {
                let highest_block_number = if chunks.peek().is_some() {
                    *list.last().expect("`chunks` does not return empty list")
                } else {
                    u64::MAX
                };
                cursor.insert(
                    sharded_key_factory(partial_key, highest_block_number),
                    &BlockNumberList::new_pre_sorted(list.iter().copied()),
                )?;
            }
        }
        Ok(())
    }
}

impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
    }
}

impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
    fn changed_accounts_with_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<BTreeSet<Address>> {
        self.tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .map(|entry| {
                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
            })
            .collect()
    }

    fn basic_accounts(
        &self,
        iter: impl IntoIterator<Item = Address>,
    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
        Ok(iter
            .into_iter()
            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
            .collect::<Result<Vec<_>, _>>()?)
    }

    fn changed_accounts_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;

        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
            BTreeMap::new(),
            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
                let (index, account) = entry?;
                accounts.entry(account.address).or_default().push(index);
                Ok(accounts)
            },
        )?;

        Ok(account_transitions)
    }
}

impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
    fn storage_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
        let range = block_number..=block_number;
        let storage_range = BlockNumberAddress::range(range);
        self.tx
            .cursor_dup_read::<tables::StorageChangeSets>()?
            .walk_range(storage_range)?
            .map(|result| -> ProviderResult<_> { Ok(result?) })
            .collect()
    }
}

impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
    fn account_block_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<AccountBeforeTx>> {
        let range = block_number..=block_number;
        self.tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .map(|result| -> ProviderResult<_> {
                let (_, account_before) = result?;
                Ok(account_before)
            })
            .collect()
    }

    fn get_account_before_block(
        &self,
        block_number: BlockNumber,
        address: Address,
    ) -> ProviderResult<Option<AccountBeforeTx>> {
        self.tx
            .cursor_dup_read::<tables::AccountChangeSets>()?
            .seek_by_key_subkey(block_number, address)?
            .filter(|acc| acc.address == address)
            .map(Ok)
            .transpose()
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
    for DatabaseProvider<TX, N>
{
    type Header = HeaderTy<N>;

    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        let static_file_provider = self.static_file_provider();

        let next_static_file_block_num = static_file_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .map(|id| id + 1)
            .unwrap_or_default();
        let next_block = highest_uninterrupted_block + 1;

        match next_static_file_block_num.cmp(&next_block) {
            Ordering::Greater => {
                let mut static_file_producer =
                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
                static_file_producer.commit()?
            }
            Ordering::Less => {
                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
            }
            Ordering::Equal => {}
        }

        let local_head = static_file_provider
            .sealed_header(highest_uninterrupted_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;

        Ok(local_head)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
    type Header = HeaderTy<N>;

    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
        if let Some(num) = self.block_number(block_hash)? {
            Ok(self.header_by_number(num)?)
        } else {
            Ok(None)
        }
    }

    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
        self.static_file_provider.header_by_number(num)
    }

    fn headers_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Self::Header>> {
        self.static_file_provider.headers_range(range)
    }

    fn sealed_header(
        &self,
        number: BlockNumber,
    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_header(number)
    }

    fn sealed_headers_while(
        &self,
        range: impl RangeBounds<BlockNumber>,
        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_headers_while(range, predicate)
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.block_hash(number)
    }

    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.canonical_hashes_range(start, end)
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
    fn chain_info(&self) -> ProviderResult<ChainInfo> {
        let best_number = self.best_block_number()?;
        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
        Ok(ChainInfo { best_hash, best_number })
    }

    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
        Ok(self
            .get_stage_checkpoint(StageId::Finish)?
            .map(|checkpoint| checkpoint.block_number)
            .unwrap_or_default())
    }

    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
        self.static_file_provider.last_block_number()
    }

    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
    type Block = BlockTy<N>;

    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        if source.is_canonical() {
            self.block(hash.into())
        } else {
            Ok(None)
        }
    }

    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        if let Some(number) = self.convert_hash_or_number(id)? &&
            let Some(header) = self.header_by_number(number)?
        {
            let Some(transactions) = self.transactions_by_block(number.into())? else {
                return Ok(None)
            };

            let body = self
                .storage
                .reader()
                .read_block_bodies(self, vec![(&header, transactions)])?
                .pop()
                .ok_or(ProviderError::InvalidStorageOutput)?;

            return Ok(Some(Self::Block::new(header, body)))
        }

        Ok(None)
    }

    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(None)
    }

    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(None)
    }

    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.header_by_number(block_number),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.sealed_header(block_number),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        self.block_range(
            range,
            |range| self.headers_range(range),
            |header, body, _| Ok(Self::Block::new(header, body)),
        )
    }

    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.headers_range(range),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.sealed_headers_range(range),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Ok(self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .seek(id)
            .map(|b| b.map(|(_, bn)| bn))?)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
    for DatabaseProvider<TX, N>
{
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        self.static_file_provider.transaction_hashes_by_range(tx_range)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
    type Transaction = TxTy<N>;

    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
    }

    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id(id)
    }

    fn transaction_by_id_unhashed(
        &self,
        id: TxNumber,
    ) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id_unhashed(id)
    }

    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
        if let Some(id) = self.transaction_id(hash)? {
            Ok(self.transaction_by_id_unhashed(id)?)
        } else {
            Ok(None)
        }
    }

    fn transaction_by_hash_with_meta(
        &self,
        tx_hash: TxHash,
    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
            let Some(sealed_header) = self.sealed_header(block_number)?
        {
            let (header, block_hash) = sealed_header.split();
            if let Some(block_body) = self.block_body_indices(block_number)? {
                let index = transaction_id - block_body.first_tx_num();

                let meta = TransactionMeta {
                    tx_hash,
                    index,
                    block_hash,
                    block_number,
                    base_fee: header.base_fee_per_gas(),
                    excess_blob_gas: header.excess_blob_gas(),
                    timestamp: header.timestamp(),
                };

                return Ok(Some((transaction, meta)))
            }
        }

        Ok(None)
    }

    fn transactions_by_block(
        &self,
        id: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
        if let Some(block_number) = self.convert_hash_or_number(id)? &&
            let Some(body) = self.block_body_indices(block_number)?
        {
            let tx_range = body.tx_num_range();
            return if tx_range.is_empty() {
                Ok(Some(Vec::new()))
            } else {
                self.transactions_by_tx_range(tx_range).map(Some)
            }
        }
        Ok(None)
    }

    fn transactions_by_block_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
        let range = to_range(range);

        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
            .into_iter()
            .map(|body| {
                let tx_num_range = body.tx_num_range();
                if tx_num_range.is_empty() {
                    Ok(Vec::new())
                } else {
                    self.transactions_by_tx_range(tx_num_range)
                }
            })
            .collect()
    }

    fn transactions_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Transaction>> {
        self.static_file_provider.transactions_by_tx_range(range)
    }

    fn senders_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Address>> {
        if EitherWriterDestination::senders(self).is_static_file() {
            self.static_file_provider.senders_by_tx_range(range)
        } else {
            self.cursor_read_collect::<tables::TransactionSenders>(range)
        }
    }

    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
        if EitherWriterDestination::senders(self).is_static_file() {
            self.static_file_provider.transaction_sender(id)
        } else {
            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
        }
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
    type Receipt = ReceiptTy<N>;

    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Receipts,
            id,
            |static_file| static_file.receipt(id),
            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
        )
    }

    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
        if let Some(id) = self.transaction_id(hash)? {
            self.receipt(id)
        } else {
            Ok(None)
        }
    }

    fn receipts_by_block(
        &self,
        block: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
        if let Some(number) = self.convert_hash_or_number(block)? &&
            let Some(body) = self.block_body_indices(number)?
        {
            let tx_range = body.tx_num_range();
            return if tx_range.is_empty() {
                Ok(Some(Vec::new()))
            } else {
                self.receipts_by_tx_range(tx_range).map(Some)
            }
        }
        Ok(None)
    }

    fn receipts_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Receipt>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Receipts,
            to_range(range),
            |static_file, range, _| static_file.receipts_by_tx_range(range),
            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
            |_| true,
        )
    }

    fn receipts_by_block_range(
        &self,
        block_range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
        if block_range.is_empty() {
            return Ok(Vec::new());
        }

        let mut block_body_indices = Vec::new();
        for block_num in block_range {
            if let Some(indices) = self.block_body_indices(block_num)? {
                block_body_indices.push(indices);
            } else {
                block_body_indices.push(StoredBlockBodyIndices::default());
            }
        }

        if block_body_indices.is_empty() {
            return Ok(Vec::new());
        }

        let non_empty_blocks: Vec<_> =
            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();

        if non_empty_blocks.is_empty() {
            return Ok(vec![Vec::new(); block_body_indices.len()]);
        }

        let first_tx = non_empty_blocks[0].first_tx_num();
        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();

        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
        let mut receipts_iter = all_receipts.into_iter();

        let mut result = Vec::with_capacity(block_body_indices.len());
        for indices in &block_body_indices {
            if indices.tx_count == 0 {
                result.push(Vec::new());
            } else {
                let block_receipts =
                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
                result.push(block_receipts);
            }
        }

        Ok(result)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
    for DatabaseProvider<TX, N>
{
    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
    }

    fn block_body_indices_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
    }
}

impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
        Ok(if let Some(encoded) = id.get_pre_encoded() {
            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
        } else {
            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
        })
    }

    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
    }

    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
        self.tx
            .cursor_read::<tables::StageCheckpoints>()?
            .walk(None)?
            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
            .map_err(ProviderError::Database)
    }
}

impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
    fn save_stage_checkpoint(
        &self,
        id: StageId,
        checkpoint: StageCheckpoint,
    ) -> ProviderResult<()> {
        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
    }

    fn save_stage_checkpoint_progress(
        &self,
        id: StageId,
        checkpoint: Vec<u8>,
    ) -> ProviderResult<()> {
        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
    }

    fn update_pipeline_stages(
        &self,
        block_number: BlockNumber,
        drop_stage_checkpoint: bool,
    ) -> ProviderResult<()> {
        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
        for stage_id in StageId::ALL {
            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
            cursor.upsert(
                stage_id.to_string(),
                &StageCheckpoint {
                    block_number,
                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
                },
            )?;
        }

        Ok(())
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
    fn plain_state_storages(
        &self,
        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;

        addresses_with_keys
            .into_iter()
            .map(|(address, storage)| {
                storage
                    .into_iter()
                    .map(|key| -> ProviderResult<_> {
                        Ok(plain_storage
                            .seek_by_key_subkey(address, key)?
                            .filter(|v| v.key == key)
                            .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
                    })
                    .collect::<ProviderResult<Vec<_>>>()
                    .map(|storage| (address, storage))
            })
            .collect::<ProviderResult<Vec<(_, _)>>>()
    }

    fn changed_storages_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
        self.tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(BlockNumberAddress::range(range))?
            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
                accounts.entry(address).or_default().insert(storage_entry.key);
                Ok(accounts)
            })
    }

    fn changed_storages_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;

        let storage_changeset_lists =
            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
                BTreeMap::new(),
                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
                    let (index, storage) = entry?;
                    storages
                        .entry((index.address(), storage.key))
                        .or_default()
                        .push(index.block_number());
                    Ok(storages)
                },
            )?;

        Ok(storage_changeset_lists)
    }
}

impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
    for DatabaseProvider<TX, N>
{
    type Receipt = ReceiptTy<N>;

    fn write_state(
        &self,
        execution_outcome: &ExecutionOutcome<Self::Receipt>,
        is_value_known: OriginalValuesKnown,
    ) -> ProviderResult<()> {
        let first_block = execution_outcome.first_block();
        let block_count = execution_outcome.len() as u64;
        let last_block = execution_outcome.last_block();
        let block_range = first_block..=last_block;

        let tip = self.last_block_number()?.max(last_block);

        let (plain_state, reverts) =
            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);

        self.write_state_reverts(reverts, first_block)?;
        self.write_state_changes(plain_state)?;

        let block_indices: Vec<_> = self
            .block_body_indices_range(block_range)?
            .into_iter()
            .map(|b| b.first_tx_num)
            .collect();

        if block_indices.len() < block_count as usize {
            let missing_blocks = block_count - block_indices.len() as u64;
            return Err(ProviderError::BlockBodyIndicesNotFound(
                last_block.saturating_sub(missing_blocks - 1),
            ));
        }

        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;

        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;

        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
            self.static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none()) &&
            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);

        let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
        for (_, addresses) in contract_log_pruner.range(..first_block) {
            allowed_addresses.extend(addresses.iter().copied());
        }

        for (idx, (receipts, first_tx_index)) in
            execution_outcome.receipts.iter().zip(block_indices).enumerate()
        {
            let block_number = first_block + idx as u64;

            receipts_writer.increment_block(block_number)?;

            if prunable_receipts &&
                self.prune_modes
                    .receipts
                    .is_some_and(|mode| mode.should_prune(block_number, tip))
            {
                continue
            }

            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
                allowed_addresses.extend(new_addresses.iter().copied());
            }

            for (idx, receipt) in receipts.iter().enumerate() {
                let receipt_idx = first_tx_index + idx as u64;
                if prunable_receipts &&
                    has_contract_log_filter &&
                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
                {
                    continue
                }

                receipts_writer.append_receipt(receipt_idx, receipt)?;
            }
        }

        Ok(())
    }

    fn write_state_reverts(
        &self,
        reverts: PlainStateReverts,
        first_block: BlockNumber,
    ) -> ProviderResult<()> {
        tracing::trace!("Writing storage changes");
        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
        let mut storage_changeset_cursor =
            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;

            tracing::trace!(block_number, "Writing block change");
            storage_changes.par_sort_unstable_by_key(|a| a.address);
            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
                let storage_id = BlockNumberAddress((block_number, address));

                let mut storage = storage_revert
                    .into_iter()
                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
                    .collect::<Vec<_>>();
                storage.par_sort_unstable_by_key(|a| a.0);

                let mut wiped_storage = Vec::new();
                if wiped {
                    tracing::trace!(?address, "Wiping storage");
                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
                        wiped_storage.push((entry.key, entry.value));
                        while let Some(entry) = storages_cursor.next_dup_val()? {
                            wiped_storage.push((entry.key, entry.value))
                        }
                    }
                }

                tracing::trace!(?address, ?storage, "Writing storage reverts");
                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
                    storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
                }
            }
        }

        tracing::trace!("Writing account changes");
        let mut account_changeset_cursor =
            self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;

        for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;
            account_block_reverts.par_sort_by_key(|a| a.0);

            for (address, info) in account_block_reverts {
                account_changeset_cursor.append_dup(
                    block_number,
                    AccountBeforeTx { address, info: info.map(Into::into) },
                )?;
            }
        }

        Ok(())
    }

    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
        changes.accounts.par_sort_by_key(|a| a.0);
        changes.storage.par_sort_by_key(|a| a.address);
        changes.contracts.par_sort_by_key(|a| a.0);

        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
        for (address, account) in changes.accounts {
            if let Some(account) = account {
                tracing::trace!(?address, "Updating plain state account");
                accounts_cursor.upsert(address, &account.into())?;
            } else if accounts_cursor.seek_exact(address)?.is_some() {
                tracing::trace!(?address, "Deleting plain state account");
                accounts_cursor.delete_current()?;
            }
        }

        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
        for (hash, bytecode) in changes.contracts {
            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
        }

        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
                storages_cursor.delete_current_duplicates()?;
            }
            let mut storage = storage
                .into_iter()
                .map(|(k, value)| StorageEntry { key: k.into(), value })
                .collect::<Vec<_>>();
            storage.par_sort_unstable_by_key(|a| a.key);

            for entry in storage {
                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? &&
                    db_entry.key == entry.key
                {
                    storages_cursor.delete_current()?;
                }

                if !entry.value.is_zero() {
                    storages_cursor.upsert(address, &entry)?;
                }
            }
        }

        Ok(())
    }

    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in hashed_state.accounts() {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }

        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
        let mut hashed_storage_cursor =
            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
        for (hashed_address, storage) in sorted_storages {
            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_storage_cursor.delete_current_duplicates()?;
            }

            for (hashed_slot, value) in storage.storage_slots_ref() {
                let entry = StorageEntry { key: *hashed_slot, value: *value };

                if let Some(db_entry) =
                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
                    db_entry.key == entry.key
                {
                    hashed_storage_cursor.delete_current()?;
                }

                if !entry.value.is_zero() {
                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
                }
            }
        }

        Ok(())
    }

    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
        let range = block + 1..=self.last_block_number()?;

        if range.is_empty() {
            return Ok(());
        }

        let block_bodies = self.block_body_indices_range(range.clone())?;

        let from_transaction_num =
            block_bodies.first().expect("already checked if there are blocks").first_tx_num();

        let storage_range = BlockNumberAddress::range(range.clone());

        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;

        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;

        let (state, _) = self.populate_bundle_state(
            account_changeset,
            storage_changeset,
            &mut plain_accounts_cursor,
            &mut plain_storage_cursor,
        )?;

        for (address, (old_account, new_account, storage)) in &state {
            if old_account != new_account {
                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
                if let Some(account) = old_account {
                    plain_accounts_cursor.upsert(*address, account)?;
                } else if existing_entry.is_some() {
                    plain_accounts_cursor.delete_current()?;
                }
            }

            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
                if plain_storage_cursor
                    .seek_by_key_subkey(*address, *storage_key)?
                    .filter(|s| s.key == *storage_key)
                    .is_some()
                {
                    plain_storage_cursor.delete_current()?
                }

                if !old_storage_value.is_zero() {
                    plain_storage_cursor.upsert(*address, &storage_entry)?;
                }
            }
        }

        self.remove_receipts_from(from_transaction_num, block)?;

        Ok(())
    }

    fn take_state_above(
        &self,
        block: BlockNumber,
    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
        let range = block + 1..=self.last_block_number()?;

        if range.is_empty() {
            return Ok(ExecutionOutcome::default())
        }
        let start_block_number = *range.start();

        let block_bodies = self.block_body_indices_range(range.clone())?;

        let from_transaction_num =
            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
        let to_transaction_num =
            block_bodies.last().expect("already checked if there are blocks").last_tx_num();

        let storage_range = BlockNumberAddress::range(range.clone());

        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;

        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;

        let (state, reverts) = self.populate_bundle_state(
            account_changeset,
            storage_changeset,
            &mut plain_accounts_cursor,
            &mut plain_storage_cursor,
        )?;

        for (address, (old_account, new_account, storage)) in &state {
            if old_account != new_account {
                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
                if let Some(account) = old_account {
                    plain_accounts_cursor.upsert(*address, account)?;
                } else if existing_entry.is_some() {
                    plain_accounts_cursor.delete_current()?;
                }
            }

            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
                if plain_storage_cursor
                    .seek_by_key_subkey(*address, *storage_key)?
                    .filter(|s| s.key == *storage_key)
                    .is_some()
                {
                    plain_storage_cursor.delete_current()?
                }

                if !old_storage_value.is_zero() {
                    plain_storage_cursor.upsert(*address, &storage_entry)?;
                }
            }
        }

        let mut receipts_iter = self
            .static_file_provider
            .get_range_with_static_file_or_database(
                StaticFileSegment::Receipts,
                from_transaction_num..to_transaction_num + 1,
                |static_file, range, _| {
                    static_file
                        .receipts_by_tx_range(range.clone())
                        .map(|r| range.into_iter().zip(r).collect())
                },
                |range, _| {
                    self.tx
                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
                        .walk_range(range)?
                        .map(|r| r.map_err(Into::into))
                        .collect()
                },
                |_| true,
            )?
            .into_iter()
            .peekable();

        let mut receipts = Vec::with_capacity(block_bodies.len());
        for block_body in block_bodies {
            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
            for num in block_body.tx_num_range() {
                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
                    block_receipts.push(receipts_iter.next().unwrap().1);
                }
            }
            receipts.push(block_receipts);
        }

        self.remove_receipts_from(from_transaction_num, block)?;

        Ok(ExecutionOutcome::new_init(
            state,
            reverts,
            Vec::new(),
            receipts,
            start_block_number,
            Vec::new(),
        ))
    }
}

2110impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2111 fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
2115 if trie_updates.is_empty() {
2116 return Ok(0)
2117 }
2118
2119 let mut num_entries = 0;
2121
2122 let tx = self.tx_ref();
2123 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2124
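        // `Some(node)` is an updated node that gets upserted, `None` marks a removed node that
        // gets deleted. The empty path (the trie root) is skipped on upsert.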
2125 for (key, updated_node) in trie_updates.account_nodes_ref() {
2127 let nibbles = StoredNibbles(*key);
2128 match updated_node {
2129 Some(node) => {
2130 if !nibbles.0.is_empty() {
2131 num_entries += 1;
2132 account_trie_cursor.upsert(nibbles, node)?;
2133 }
2134 }
2135 None => {
2136 num_entries += 1;
2137 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2138 account_trie_cursor.delete_current()?;
2139 }
2140 }
2141 }
2142 }
2143
2144 num_entries +=
2145 self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
2146
2147 Ok(num_entries)
2148 }
2149
2150 fn write_trie_changesets(
2158 &self,
2159 block_number: BlockNumber,
2160 trie_updates: &TrieUpdatesSorted,
2161 updates_overlay: Option<&TrieUpdatesSorted>,
2162 ) -> ProviderResult<usize> {
2163 let mut num_entries = 0;
2164
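        // For every account node touched by `trie_updates`, record its *current* value (as seen
        // through the database plus the optional in-memory overlay) under `block_number`, so
        // that the update can later be reverted.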
2165 let mut changeset_cursor =
2166 self.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2167 let curr_values_cursor = self.tx_ref().cursor_read::<tables::AccountsTrie>()?;
2168
2169 let mut db_account_cursor = DatabaseAccountTrieCursor::new(curr_values_cursor);
2171
2172 let empty_updates = TrieUpdatesSorted::default();
2174 let overlay = updates_overlay.unwrap_or(&empty_updates);
2175
2176 let mut in_memory_account_cursor =
2178 InMemoryTrieCursor::new_account(&mut db_account_cursor, overlay);
2179
2180 for (path, _) in trie_updates.account_nodes_ref() {
2181 num_entries += 1;
2182 let node = in_memory_account_cursor.seek_exact(*path)?.map(|(_, node)| node);
2183 changeset_cursor.append_dup(
2184 block_number,
2185 TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(*path), node },
2186 )?;
2187 }
2188
2189 let mut storage_updates = trie_updates.storage_tries_ref().iter().collect::<Vec<_>>();
2190 storage_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2191
2192 num_entries += self.write_storage_trie_changesets(
2193 block_number,
2194 storage_updates.into_iter(),
2195 updates_overlay,
2196 )?;
2197
2198 Ok(num_entries)
2199 }
2200
2201 fn clear_trie_changesets(&self) -> ProviderResult<()> {
2202 let tx = self.tx_ref();
2203 tx.clear::<tables::AccountsTrieChangeSets>()?;
2204 tx.clear::<tables::StoragesTrieChangeSets>()?;
2205 Ok(())
2206 }
2207
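    /// Clears all account and storage trie changeset entries for blocks at and above `from`.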
2208 fn clear_trie_changesets_from(&self, from: BlockNumber) -> ProviderResult<()> {
2209 let tx = self.tx_ref();
2210 {
2211 let range = from..;
2212 let mut cursor = tx.cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2213 let mut walker = cursor.walk_range(range)?;
2214
2215 while walker.next().transpose()?.is_some() {
2216 walker.delete_current()?;
2217 }
2218 }
2219
2220 {
2221 let range: RangeFrom<BlockNumberHashedAddress> = (from, B256::ZERO).into()..;
2222 let mut cursor = tx.cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
2223 let mut walker = cursor.walk_range(range)?;
2224
2225 while walker.next().transpose()?.is_some() {
2226 walker.delete_current()?;
2227 }
2228 }
2229
2230 Ok(())
2231 }
2232}
2233
2234impl<TX: DbTx + 'static, N: NodeTypes> TrieReader for DatabaseProvider<TX, N> {
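    /// Collects, for every trie path recorded in the changesets at or above `from`, the node
    /// value it had before block `from` was applied, i.e. the updates needed to revert the trie
    /// tables to their pre-`from` state.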
2235 fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
2236 let tx = self.tx_ref();
2237
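        // Changesets are walked in ascending block order, so the first value seen for a path is
        // the one it had just before block `from`; later duplicates are skipped.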
2238 let mut account_nodes = Vec::new();
2241 let mut seen_account_keys = HashSet::new();
2242 let mut accounts_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2243
2244 for entry in accounts_cursor.walk_range(from..)? {
2245 let (_, TrieChangeSetsEntry { nibbles, node }) = entry?;
2246 if seen_account_keys.insert(nibbles.0) {
2248 account_nodes.push((nibbles.0, node));
2249 }
2250 }
2251
2252 account_nodes.sort_by_key(|(path, _)| *path);
2253
2254 let mut storage_tries = B256Map::<Vec<_>>::default();
2258 let mut seen_storage_keys = HashSet::new();
2259 let mut storages_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2260
2261 let storage_range_start = BlockNumberHashedAddress((from, B256::ZERO));
2263
2264 for entry in storages_cursor.walk_range(storage_range_start..)? {
2265 let (
2266 BlockNumberHashedAddress((_, hashed_address)),
2267 TrieChangeSetsEntry { nibbles, node },
2268 ) = entry?;
2269
2270 if seen_storage_keys.insert((hashed_address, nibbles.0)) {
2272 storage_tries.entry(hashed_address).or_default().push((nibbles.0, node));
2273 }
2274 }
2275
2276 let storage_tries = storage_tries
2278 .into_iter()
2279 .map(|(address, mut nodes)| {
2280 nodes.sort_by_key(|(path, _)| *path);
2281 (address, StorageTrieUpdatesSorted { storage_nodes: nodes, is_deleted: false })
2282 })
2283 .collect();
2284
2285 Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2286 }
2287
2288 fn get_block_trie_updates(
2289 &self,
2290 block_number: BlockNumber,
2291 ) -> ProviderResult<TrieUpdatesSorted> {
2292 let tx = self.tx_ref();
2293
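        // Overlay the reverts of every block above `block_number` on top of the current database
        // state to reconstruct the tries as they looked right after `block_number`, then read
        // back the value of each path that block touched.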
2294 let reverts = self.trie_reverts(block_number + 1)?;
2296
2297 let db_cursor_factory = DatabaseTrieCursorFactory::new(tx);
2300 let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);
2301
2302 let mut account_nodes = Vec::new();
2304
2305 let mut accounts_trie_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2307 let mut account_cursor = cursor_factory.account_trie_cursor()?;
2308
2309 for entry in accounts_trie_cursor.walk_dup(Some(block_number), None)? {
2310 let (_, TrieChangeSetsEntry { nibbles, .. }) = entry?;
2311 let node_value = account_cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2313 account_nodes.push((nibbles.0, node_value));
2314 }
2315
2316 let mut storage_tries = B256Map::default();
2318 let mut storages_trie_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2319 let storage_range_start = BlockNumberHashedAddress((block_number, B256::ZERO));
2320 let storage_range_end = BlockNumberHashedAddress((block_number + 1, B256::ZERO));
2321
2322 let mut current_hashed_address = None;
2323 let mut storage_cursor = None;
2324
2325 for entry in storages_trie_cursor.walk_range(storage_range_start..storage_range_end)? {
2326 let (
2327 BlockNumberHashedAddress((_, hashed_address)),
2328 TrieChangeSetsEntry { nibbles, .. },
2329 ) = entry?;
2330
2331 if current_hashed_address != Some(hashed_address) {
2333 storage_cursor = Some(cursor_factory.storage_trie_cursor(hashed_address)?);
2334 current_hashed_address = Some(hashed_address);
2335 }
2336
2337 let cursor =
2339 storage_cursor.as_mut().expect("storage_cursor was just initialized above");
2340 let node_value = cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2341 storage_tries
2342 .entry(hashed_address)
2343 .or_insert_with(|| StorageTrieUpdatesSorted {
2344 storage_nodes: Vec::new(),
2345 is_deleted: false,
2346 })
2347 .storage_nodes
2348 .push((nibbles.0, node_value));
2349 }
2350
2351 Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2352 }
2353}
2354
2355impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2356 fn write_storage_trie_updates_sorted<'a>(
2362 &self,
2363 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2364 ) -> ProviderResult<usize> {
2365 let mut num_entries = 0;
2366 let mut storage_tries = storage_tries.collect::<Vec<_>>();
2367 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2368 let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2369 for (hashed_address, storage_trie_updates) in storage_tries {
2370 let mut db_storage_trie_cursor =
2371 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2372 num_entries +=
2373 db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
2374 cursor = db_storage_trie_cursor.cursor;
2375 }
2376
2377 Ok(num_entries)
2378 }
2379
2380 fn write_storage_trie_changesets<'a>(
2388 &self,
2389 block_number: BlockNumber,
2390 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2391 updates_overlay: Option<&TrieUpdatesSorted>,
2392 ) -> ProviderResult<usize> {
2393 let mut num_written = 0;
2394
2395 let mut changeset_cursor =
2396 self.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
2397
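        // Two cursors over the same table: one to look up the current values of changed paths,
        // one to walk all remaining nodes of a storage trie that is being wiped.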
2398 let changed_curr_values_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
2402 let wiped_nodes_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
2403
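        // The cursors are created with a placeholder address and re-pointed at the relevant
        // hashed address on every loop iteration below.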
        let mut changed_curr_values_cursor =
            DatabaseStorageTrieCursor::new(changed_curr_values_cursor, B256::default());
        let mut wiped_nodes_cursor =
            DatabaseStorageTrieCursor::new(wiped_nodes_cursor, B256::default());
2415
2416 let empty_updates = TrieUpdatesSorted::default();
2418
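        // For a wiped storage trie every remaining node must be recorded in the changeset; for a
        // regular update only the nodes whose paths appear in the update set are recorded.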
2419 for (hashed_address, storage_trie_updates) in storage_tries {
2420 let changeset_key = BlockNumberHashedAddress((block_number, *hashed_address));
2421
2422 changed_curr_values_cursor =
2424 DatabaseStorageTrieCursor::new(changed_curr_values_cursor.cursor, *hashed_address);
2425
2426 let overlay = updates_overlay.unwrap_or(&empty_updates);
2428
2429 let mut in_memory_changed_cursor = InMemoryTrieCursor::new_storage(
2431 &mut changed_curr_values_cursor,
2432 overlay,
2433 *hashed_address,
2434 );
2435
2436 let curr_values_of_changed = StorageTrieCurrentValuesIter::new(
2439 storage_trie_updates.storage_nodes.iter().map(|e| e.0),
2440 &mut in_memory_changed_cursor,
2441 )?;
2442
2443 if storage_trie_updates.is_deleted() {
2444 wiped_nodes_cursor =
2447 DatabaseStorageTrieCursor::new(wiped_nodes_cursor.cursor, *hashed_address);
2448
2449 let mut in_memory_wiped_cursor = InMemoryTrieCursor::new_storage(
2451 &mut wiped_nodes_cursor,
2452 overlay,
2453 *hashed_address,
2454 );
2455
2456 let all_nodes = TrieCursorIter::new(&mut in_memory_wiped_cursor);
2457
2458 for wiped in storage_trie_wiped_changeset_iter(curr_values_of_changed, all_nodes)? {
2459 let (path, node) = wiped?;
2460 num_written += 1;
2461 changeset_cursor.append_dup(
2462 changeset_key,
2463 TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
2464 )?;
2465 }
2466 } else {
2467 for curr_value in curr_values_of_changed {
2468 let (path, node) = curr_value?;
2469 num_written += 1;
2470 changeset_cursor.append_dup(
2471 changeset_key,
2472 TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
2473 )?;
2474 }
2475 }
2476 }
2477
2478 Ok(num_written)
2479 }
2480}
2481
2482impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2483 fn unwind_account_hashing<'a>(
2484 &self,
2485 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2486 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
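        // Collect the pre-change account values; the reversed iteration order means that, when
        // an account changed several times in the range, the earliest (pre-range) value wins.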
2487 let hashed_accounts = changesets
2491 .into_iter()
2492 .map(|(_, e)| (keccak256(e.address), e.info))
2493 .collect::<Vec<_>>()
2494 .into_iter()
2495 .rev()
2496 .collect::<BTreeMap<_, _>>();
2497
2498 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2500 for (hashed_address, account) in &hashed_accounts {
2501 if let Some(account) = account {
2502 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2503 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2504 hashed_accounts_cursor.delete_current()?;
2505 }
2506 }
2507
2508 Ok(hashed_accounts)
2509 }
2510
2511 fn unwind_account_hashing_range(
2512 &self,
2513 range: impl RangeBounds<BlockNumber>,
2514 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2515 let changesets = self
2516 .tx
2517 .cursor_read::<tables::AccountChangeSets>()?
2518 .walk_range(range)?
2519 .collect::<Result<Vec<_>, _>>()?;
2520 self.unwind_account_hashing(changesets.iter())
2521 }
2522
2523 fn insert_account_for_hashing(
2524 &self,
2525 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2526 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2527 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2528 let hashed_accounts =
2529 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2530 for (hashed_address, account) in &hashed_accounts {
2531 if let Some(account) = account {
2532 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2533 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2534 hashed_accounts_cursor.delete_current()?;
2535 }
2536 }
2537 Ok(hashed_accounts)
2538 }
2539
2540 fn unwind_storage_hashing(
2541 &self,
2542 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2543 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2544 let mut hashed_storages = changesets
2546 .into_iter()
2547 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2548 (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2549 })
2550 .collect::<Vec<_>>();
2551 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2552
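        // Apply the changesets in reverse so that, for slots changed multiple times in the
        // range, the earliest (pre-range) value is the one left in `HashedStorages`.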
2553 let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2555 HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2556 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2557 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2558 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2559
2560 if hashed_storage
2561 .seek_by_key_subkey(hashed_address, key)?
2562 .filter(|entry| entry.key == key)
2563 .is_some()
2564 {
2565 hashed_storage.delete_current()?;
2566 }
2567
2568 if !value.is_zero() {
2569 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2570 }
2571 }
2572 Ok(hashed_storage_keys)
2573 }
2574
2575 fn unwind_storage_hashing_range(
2576 &self,
2577 range: impl RangeBounds<BlockNumberAddress>,
2578 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2579 let changesets = self
2580 .tx
2581 .cursor_read::<tables::StorageChangeSets>()?
2582 .walk_range(range)?
2583 .collect::<Result<Vec<_>, _>>()?;
2584 self.unwind_storage_hashing(changesets.into_iter())
2585 }
2586
2587 fn insert_storage_for_hashing(
2588 &self,
2589 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2590 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2591 let hashed_storages =
2593 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2594 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2595 map.insert(keccak256(entry.key), entry.value);
2596 map
2597 });
2598 map.insert(keccak256(address), storage);
2599 map
2600 });
2601
2602 let hashed_storage_keys = hashed_storages
2603 .iter()
2604 .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2605 .collect();
2606
2607 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2608 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2611 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2612 if hashed_storage_cursor
2613 .seek_by_key_subkey(hashed_address, key)?
2614 .filter(|entry| entry.key == key)
2615 .is_some()
2616 {
2617 hashed_storage_cursor.delete_current()?;
2618 }
2619
2620 if !value.is_zero() {
2621 hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2622 }
2623 Ok(())
2624 })
2625 })?;
2626
2627 Ok(hashed_storage_keys)
2628 }
2629}
2630
2631impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2632 fn unwind_account_history_indices<'a>(
2633 &self,
2634 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2635 ) -> ProviderResult<usize> {
2636 let mut last_indices = changesets
2637 .into_iter()
2638 .map(|(index, account)| (account.address, *index))
2639 .collect::<Vec<_>>();
2640 last_indices.sort_by_key(|(a, _)| *a);
2641
2642 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
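        // For each touched address, drop the indexed block numbers from the unwind point onwards
        // and re-insert whatever remains of the last shard, if anything.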
2644 for &(address, rem_index) in &last_indices {
2645 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2646 &mut cursor,
2647 ShardedKey::last(address),
2648 rem_index,
2649 |sharded_key| sharded_key.key == address,
2650 )?;
2651
2652 if !partial_shard.is_empty() {
2655 cursor.insert(
2656 ShardedKey::last(address),
2657 &BlockNumberList::new_pre_sorted(partial_shard),
2658 )?;
2659 }
2660 }
2661
2662 let changesets = last_indices.len();
2663 Ok(changesets)
2664 }
2665
2666 fn unwind_account_history_indices_range(
2667 &self,
2668 range: impl RangeBounds<BlockNumber>,
2669 ) -> ProviderResult<usize> {
2670 let changesets = self
2671 .tx
2672 .cursor_read::<tables::AccountChangeSets>()?
2673 .walk_range(range)?
2674 .collect::<Result<Vec<_>, _>>()?;
2675 self.unwind_account_history_indices(changesets.iter())
2676 }
2677
2678 fn insert_account_history_index(
2679 &self,
2680 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2681 ) -> ProviderResult<()> {
2682 self.append_history_index::<_, tables::AccountsHistory>(
2683 account_transitions,
2684 ShardedKey::new,
2685 )
2686 }
2687
2688 fn unwind_storage_history_indices(
2689 &self,
2690 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2691 ) -> ProviderResult<usize> {
2692 let mut storage_changesets = changesets
2693 .into_iter()
2694 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2695 .collect::<Vec<_>>();
2696 storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2697
2698 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2699 for &(address, storage_key, rem_index) in &storage_changesets {
2700 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2701 &mut cursor,
2702 StorageShardedKey::last(address, storage_key),
2703 rem_index,
2704 |storage_sharded_key| {
2705 storage_sharded_key.address == address &&
2706 storage_sharded_key.sharded_key.key == storage_key
2707 },
2708 )?;
2709
2710 if !partial_shard.is_empty() {
2713 cursor.insert(
2714 StorageShardedKey::last(address, storage_key),
2715 &BlockNumberList::new_pre_sorted(partial_shard),
2716 )?;
2717 }
2718 }
2719
2720 let changesets = storage_changesets.len();
2721 Ok(changesets)
2722 }
2723
2724 fn unwind_storage_history_indices_range(
2725 &self,
2726 range: impl RangeBounds<BlockNumberAddress>,
2727 ) -> ProviderResult<usize> {
2728 let changesets = self
2729 .tx
2730 .cursor_read::<tables::StorageChangeSets>()?
2731 .walk_range(range)?
2732 .collect::<Result<Vec<_>, _>>()?;
2733 self.unwind_storage_history_indices(changesets.into_iter())
2734 }
2735
2736 fn insert_storage_history_index(
2737 &self,
2738 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2739 ) -> ProviderResult<()> {
2740 self.append_history_index::<_, tables::StoragesHistory>(
2741 storage_transitions,
2742 |(address, storage_key), highest_block_number| {
2743 StorageShardedKey::new(address, storage_key, highest_block_number)
2744 },
2745 )
2746 }
2747
2748 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2749 {
2751 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2752 self.insert_account_history_index(indices)?;
2753 }
2754
2755 {
2757 let indices = self.changed_storages_and_blocks_with_range(range)?;
2758 self.insert_storage_history_index(indices)?;
2759 }
2760
2761 Ok(())
2762 }
2763}
2764
2765impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2766 for DatabaseProvider<TX, N>
2767{
2768 fn take_block_and_execution_above(
2769 &self,
2770 block: BlockNumber,
2771 ) -> ProviderResult<Chain<Self::Primitives>> {
2772 let range = block + 1..=self.last_block_number()?;
2773
2774 self.unwind_trie_state_from(block + 1)?;
2775
2776 let execution_state = self.take_state_above(block)?;
2778
2779 let blocks = self.recovered_block_range(range)?;
2780
2781 self.remove_blocks_above(block)?;
2784
2785 self.update_pipeline_stages(block, true)?;
2787
2788 Ok(Chain::new(blocks, execution_state, None))
2789 }
2790
2791 fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
2792 self.unwind_trie_state_from(block + 1)?;
2793
2794 self.remove_state_above(block)?;
2796
2797 self.remove_blocks_above(block)?;
2800
2801 self.update_pipeline_stages(block, true)?;
2803
2804 Ok(())
2805 }
2806}
2807
2808impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2809 for DatabaseProvider<TX, N>
2810{
2811 type Block = BlockTy<N>;
2812 type Receipt = ReceiptTy<N>;
2813
2814 fn insert_block(
2836 &self,
2837 block: RecoveredBlock<Self::Block>,
2838 ) -> ProviderResult<StoredBlockBodyIndices> {
2839 let block_number = block.number();
2840 let tx_count = block.body().transaction_count() as u64;
2841
2842 let mut durations_recorder = metrics::DurationsRecorder::default();
2843
2844 self.static_file_provider
2845 .get_writer(block_number, StaticFileSegment::Headers)?
2846 .append_header(block.header(), &block.hash())?;
2847
2848 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2849 durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2850
2851 let first_tx_num = self
2852 .tx
2853 .cursor_read::<tables::TransactionBlocks>()?
2854 .last()?
2855 .map(|(n, _)| n + 1)
2856 .unwrap_or_default();
2857 durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2858
2859 let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
2860
2861 if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2862 let mut senders_writer = EitherWriter::new_senders(self, block.number())?;
2863 senders_writer.increment_block(block.number())?;
2864 senders_writer
2865 .append_senders(tx_nums_iter.clone().zip(block.senders_iter().copied()))?;
2866 durations_recorder.record_relative(metrics::Action::InsertTransactionSenders);
2867 }
2868
2869 if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2870 for (tx_num, transaction) in tx_nums_iter.zip(block.body().transactions_iter()) {
2871 let hash = transaction.tx_hash();
2872 self.tx.put::<tables::TransactionHashNumbers>(*hash, tx_num)?;
2873 }
2874 durations_recorder.record_relative(metrics::Action::InsertTransactionHashNumbers);
2875 }
2876
2877 self.append_block_bodies(vec![(block_number, Some(block.into_body()))])?;
2878
2879 debug!(
2880 target: "providers::db",
2881 ?block_number,
2882 actions = ?durations_recorder.actions,
2883 "Inserted block"
2884 );
2885
2886 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2887 }
2888
2889 fn append_block_bodies(
2890 &self,
2891 bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2892 ) -> ProviderResult<()> {
2893 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2894
2895 let mut tx_writer =
2897 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
2898
2899 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2900 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2901
2902 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2904
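        // Append body indices and transactions block by block, keeping the static-file writer's
        // block counter in sync even for blocks without a body.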
2905 for (block_number, body) in &bodies {
2906 tx_writer.increment_block(*block_number)?;
2908
2909 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2910 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2911
2912 let mut durations_recorder = metrics::DurationsRecorder::default();
2913
2914 block_indices_cursor.append(*block_number, &block_indices)?;
2916
2917 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2918
2919 let Some(body) = body else { continue };
2920
2921 if !body.transactions().is_empty() {
2923 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2924 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2925 }
2926
2927 for transaction in body.transactions() {
2929 tx_writer.append_transaction(next_tx_num, transaction)?;
2930
2931 next_tx_num += 1;
2933 }
2934 }
2935
2936 self.storage.writer().write_block_bodies(self, bodies)?;
2937
2938 Ok(())
2939 }
2940
2941 fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
2942 let last_block_number = self.last_block_number()?;
2943 for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
2945 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2946 }
2947
2948 let highest_static_file_block = self
2950 .static_file_provider()
2951 .get_highest_static_file_block(StaticFileSegment::Headers)
2952 .expect("todo: error handling, headers should exist");
2953
2954 debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
2960 self.static_file_provider()
2961 .get_writer(block, StaticFileSegment::Headers)?
2962 .prune_headers(highest_static_file_block.saturating_sub(block))?;
2963
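        // Work out the range of transaction numbers that belong to the removed blocks so that
        // their hash mappings and senders can be unwound together with the bodies.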
2964 let unwind_tx_from = self
2966 .block_body_indices(block)?
2967 .map(|b| b.next_tx_num())
2968 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2969
2970 let unwind_tx_to = self
2972 .tx
2973 .cursor_read::<tables::BlockBodyIndices>()?
2974 .last()?
2975 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
2977 .1
2978 .last_tx_num();
2979
2980 if unwind_tx_from <= unwind_tx_to {
2981 for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
2982 self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
2983 }
2984 }
2985
2986 EitherWriter::new_senders(self, last_block_number)?.prune_senders(unwind_tx_from, block)?;
2987
2988 self.remove_bodies_above(block)?;
2989
2990 Ok(())
2991 }
2992
2993 fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
2994 self.storage.writer().remove_block_bodies_above(self, block)?;
2995
2996 let unwind_tx_from = self
2998 .block_body_indices(block)?
2999 .map(|b| b.next_tx_num())
3000 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3001
3002 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3003 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3004
3005 let static_file_tx_num =
3006 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3007
3008 let to_delete = static_file_tx_num
3009 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3010 .unwrap_or_default();
3011
3012 self.static_file_provider
3013 .latest_writer(StaticFileSegment::Transactions)?
3014 .prune_transactions(to_delete, block)?;
3015
3016 Ok(())
3017 }
3018
3019 fn append_blocks_with_state(
3021 &self,
3022 blocks: Vec<RecoveredBlock<Self::Block>>,
3023 execution_outcome: &ExecutionOutcome<Self::Receipt>,
3024 hashed_state: HashedPostStateSorted,
3025 ) -> ProviderResult<()> {
3026 if blocks.is_empty() {
3027 debug!(target: "providers::db", "Attempted to append empty block range");
3028 return Ok(())
3029 }
3030
3031 let first_number = blocks[0].number();
3034
3035 let last_block_number = blocks[blocks.len() - 1].number();
3038
3039 let mut durations_recorder = metrics::DurationsRecorder::default();
3040
3041 for block in blocks {
3043 self.insert_block(block)?;
3044 durations_recorder.record_relative(metrics::Action::InsertBlock);
3045 }
3046
3047 self.write_state(execution_outcome, OriginalValuesKnown::No)?;
3048 durations_recorder.record_relative(metrics::Action::InsertState);
3049
3050 self.write_hashed_state(&hashed_state)?;
3052 durations_recorder.record_relative(metrics::Action::InsertHashes);
3053
3054 self.update_history_indices(first_number..=last_block_number)?;
3055 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3056
3057 self.update_pipeline_stages(last_block_number, false)?;
3059 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3060
3061 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3062
3063 Ok(())
3064 }
3065}
3066
3067impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3068 fn get_prune_checkpoint(
3069 &self,
3070 segment: PruneSegment,
3071 ) -> ProviderResult<Option<PruneCheckpoint>> {
3072 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3073 }
3074
3075 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3076 Ok(PruneSegment::variants()
3077 .filter_map(|segment| {
3078 self.tx
3079 .get::<tables::PruneCheckpoints>(segment)
3080 .transpose()
3081 .map(|chk| chk.map(|chk| (segment, chk)))
3082 })
3083 .collect::<Result<_, _>>()?)
3084 }
3085}
3086
3087impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3088 fn save_prune_checkpoint(
3089 &self,
3090 segment: PruneSegment,
3091 checkpoint: PruneCheckpoint,
3092 ) -> ProviderResult<()> {
3093 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3094 }
3095}
3096
3097impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3098 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3099 let db_entries = self.tx.entries::<T>()?;
3100 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3101 Ok(entries) => entries,
3102 Err(ProviderError::UnsupportedProvider) => 0,
3103 Err(err) => return Err(err),
3104 };
3105
3106 Ok(db_entries + static_file_entries)
3107 }
3108}
3109
3110impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3111 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3112 let mut finalized_blocks = self
3113 .tx
3114 .cursor_read::<tables::ChainState>()?
3115 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3116 .take(1)
3117 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3118
3119 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3120 Ok(last_finalized_block_number)
3121 }
3122
3123 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3124 let mut finalized_blocks = self
3125 .tx
3126 .cursor_read::<tables::ChainState>()?
3127 .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3128 .take(1)
3129 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3130
3131 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3132 Ok(last_finalized_block_number)
3133 }
3134}
3135
3136impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3137 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3138 Ok(self
3139 .tx
3140 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3141 }
3142
3143 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3144 Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3145 }
3146}
3147
3148impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3149 type Tx = TX;
3150
3151 fn tx_ref(&self) -> &Self::Tx {
3152 &self.tx
3153 }
3154
3155 fn tx_mut(&mut self) -> &mut Self::Tx {
3156 &mut self.tx
3157 }
3158
3159 fn into_tx(self) -> Self::Tx {
3160 self.tx
3161 }
3162
3163 fn prune_modes_ref(&self) -> &PruneModes {
3164 self.prune_modes_ref()
3165 }
3166
3167 fn commit(self) -> ProviderResult<bool> {
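        // Commit order matters: with an unwind queued, the database transaction is committed
        // before the static files, otherwise the static files are committed first. Presumably
        // this ensures that a crash between the two commits never leaves the static files
        // missing data that the committed database state still references.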
3169 if self.static_file_provider.has_unwind_queued() {
3174 self.tx.commit()?;
3175 self.static_file_provider.commit()?;
3176 } else {
3177 self.static_file_provider.commit()?;
3178 self.tx.commit()?;
3179 }
3180
3181 Ok(true)
3182 }
3183}
3184
3185impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3186 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3187 self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3188 }
3189}
3190
3191impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3192 fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3193 self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3194 }
3195}
3196
3197impl<TX: Send + Sync, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3198 fn cached_storage_settings(&self) -> StorageSettings {
3199 *self.storage_settings.read()
3200 }
3201
3202 fn set_storage_settings_cache(&self, settings: StorageSettings) {
3203 *self.storage_settings.write() = settings;
3204 }
3205}
3206
3207#[cfg(test)]
3208mod tests {
3209 use super::*;
3210 use crate::{
3211 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3212 BlockWriter,
3213 };
3214 use reth_ethereum_primitives::Receipt;
3215 use reth_testing_utils::generators::{self, random_block, BlockParams};
3216 use reth_trie::Nibbles;
3217
3218 #[test]
3219 fn test_receipts_by_block_range_empty_range() {
3220 let factory = create_test_provider_factory();
3221 let provider = factory.provider().unwrap();
3222
3223 let start = 10u64;
3225 let end = 9u64;
3226 let result = provider.receipts_by_block_range(start..=end).unwrap();
3227 assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3228 }
3229
3230 #[test]
3231 fn test_receipts_by_block_range_nonexistent_blocks() {
3232 let factory = create_test_provider_factory();
3233 let provider = factory.provider().unwrap();
3234
3235 let result = provider.receipts_by_block_range(10..=12).unwrap();
3237 assert_eq!(result, vec![vec![], vec![], vec![]]);
3238 }
3239
3240 #[test]
3241 fn test_receipts_by_block_range_single_block() {
3242 let factory = create_test_provider_factory();
3243 let data = BlockchainTestData::default();
3244
3245 let provider_rw = factory.provider_rw().unwrap();
3246 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3247 provider_rw
3248 .write_state(
3249 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3250 crate::OriginalValuesKnown::No,
3251 )
3252 .unwrap();
3253 provider_rw.insert_block(data.blocks[0].0.clone()).unwrap();
3254 provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap();
3255 provider_rw.commit().unwrap();
3256
3257 let provider = factory.provider().unwrap();
3258 let result = provider.receipts_by_block_range(1..=1).unwrap();
3259
3260 assert_eq!(result.len(), 1);
3262 assert_eq!(result[0].len(), 1);
3263 assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3264 }
3265
3266 #[test]
3267 fn test_receipts_by_block_range_multiple_blocks() {
3268 let factory = create_test_provider_factory();
3269 let data = BlockchainTestData::default();
3270
3271 let provider_rw = factory.provider_rw().unwrap();
3272 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3273 provider_rw
3274 .write_state(
3275 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3276 crate::OriginalValuesKnown::No,
3277 )
3278 .unwrap();
3279 for i in 0..3 {
3280 provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3281 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3282 }
3283 provider_rw.commit().unwrap();
3284
3285 let provider = factory.provider().unwrap();
3286 let result = provider.receipts_by_block_range(1..=3).unwrap();
3287
3288 assert_eq!(result.len(), 3);
3290 for (i, block_receipts) in result.iter().enumerate() {
3291 assert_eq!(block_receipts.len(), 1);
3292 assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
3293 }
3294 }
3295
3296 #[test]
3297 fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
3298 let factory = create_test_provider_factory();
3299 let data = BlockchainTestData::default();
3300
3301 let provider_rw = factory.provider_rw().unwrap();
3302 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3303 provider_rw
3304 .write_state(
3305 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3306 crate::OriginalValuesKnown::No,
3307 )
3308 .unwrap();
3309
3310 for i in 0..3 {
3312 provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3313 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3314 }
3315 provider_rw.commit().unwrap();
3316
3317 let provider = factory.provider().unwrap();
3318 let result = provider.receipts_by_block_range(1..=3).unwrap();
3319
3320 assert_eq!(result.len(), 3);
3322 for block_receipts in &result {
3323 assert_eq!(block_receipts.len(), 1);
3324 }
3325 }
3326
3327 #[test]
3328 fn test_receipts_by_block_range_partial_range() {
3329 let factory = create_test_provider_factory();
3330 let data = BlockchainTestData::default();
3331
3332 let provider_rw = factory.provider_rw().unwrap();
3333 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3334 provider_rw
3335 .write_state(
3336 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3337 crate::OriginalValuesKnown::No,
3338 )
3339 .unwrap();
3340 for i in 0..3 {
3341 provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3342 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3343 }
3344 provider_rw.commit().unwrap();
3345
3346 let provider = factory.provider().unwrap();
3347
3348 let result = provider.receipts_by_block_range(2..=5).unwrap();
3350 assert_eq!(result.len(), 4);
3351
        assert_eq!(result[0].len(), 1);
        assert_eq!(result[1].len(), 1);
        assert_eq!(result[2].len(), 0);
        assert_eq!(result[3].len(), 0);
        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
3359 assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
3360 }
3361
3362 #[test]
3363 fn test_receipts_by_block_range_all_empty_blocks() {
3364 let factory = create_test_provider_factory();
3365 let mut rng = generators::rng();
3366
3367 let mut blocks = Vec::new();
3369 for i in 0..3 {
3370 let block =
3371 random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
3372 blocks.push(block);
3373 }
3374
3375 let provider_rw = factory.provider_rw().unwrap();
3376 for block in blocks {
3377 provider_rw.insert_block(block.try_recover().unwrap()).unwrap();
3378 }
3379 provider_rw.commit().unwrap();
3380
3381 let provider = factory.provider().unwrap();
3382 let result = provider.receipts_by_block_range(1..=3).unwrap();
3383
3384 assert_eq!(result.len(), 3);
3385 for block_receipts in result {
3386 assert_eq!(block_receipts.len(), 0);
3387 }
3388 }
3389
3390 #[test]
3391 fn test_receipts_by_block_range_consistency_with_individual_calls() {
3392 let factory = create_test_provider_factory();
3393 let data = BlockchainTestData::default();
3394
3395 let provider_rw = factory.provider_rw().unwrap();
3396 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3397 provider_rw
3398 .write_state(
3399 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3400 crate::OriginalValuesKnown::No,
3401 )
3402 .unwrap();
3403 for i in 0..3 {
3404 provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3405 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3406 }
3407 provider_rw.commit().unwrap();
3408
3409 let provider = factory.provider().unwrap();
3410
3411 let range_result = provider.receipts_by_block_range(1..=3).unwrap();
3413
3414 let mut individual_results = Vec::new();
3416 for block_num in 1..=3 {
3417 let receipts =
3418 provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
3419 individual_results.push(receipts);
3420 }
3421
3422 assert_eq!(range_result, individual_results);
3423 }
3424
3425 #[test]
3426 fn test_write_trie_changesets() {
3427 use reth_db_api::models::BlockNumberHashedAddress;
3428 use reth_trie::{BranchNodeCompact, StorageTrieEntry};
3429
3430 let factory = create_test_provider_factory();
3431 let provider_rw = factory.provider_rw().unwrap();
3432
3433 let block_number = 1u64;
3434
3435 let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
3437 let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
3438
3439 let node1 = BranchNodeCompact::new(
            0b1111_1111_1111_1111,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );
3446
3447 {
3449 let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
3450 cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
3451 }
3452
3453 let account_nodes = vec![
            (account_nibbles1, Some(node1.clone())),
            (account_nibbles2, None),
        ];
3458
        let storage_address1 = B256::from([1u8; 32]);
        let storage_address2 = B256::from([2u8; 32]);
        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
3464 let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
3465 let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);
3466
3467 let storage_node1 = BranchNodeCompact::new(
3468 0b1111_0000_0000_0000,
3469 0b0000_0000_0000_0000,
3470 0b0000_0000_0000_0000,
3471 vec![],
3472 None,
3473 );
3474
3475 let storage_node2 = BranchNodeCompact::new(
3476 0b0000_1111_0000_0000,
3477 0b0000_0000_0000_0000,
3478 0b0000_0000_0000_0000,
3479 vec![],
3480 None,
3481 );
3482
3483 let storage_node1_old = BranchNodeCompact::new(
            0b1010_0000_0000_0000,
            0b0000_0000_0000_0000,
3487 0b0000_0000_0000_0000,
3488 vec![],
3489 None,
3490 );
3491
3492 {
3494 let mut cursor =
3495 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
3496 let entry = StorageTrieEntry {
3498 nibbles: StoredNibblesSubKey(storage_nibbles1),
3499 node: storage_node1_old.clone(),
3500 };
3501 cursor.upsert(storage_address1, &entry).unwrap();
3502 }
3503
3504 {
3506 let mut cursor =
3507 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
3508 let entry1 = StorageTrieEntry {
3510 nibbles: StoredNibblesSubKey(storage_nibbles1),
3511 node: storage_node1.clone(),
3512 };
3513 cursor.upsert(storage_address2, &entry1).unwrap();
3514 let entry3 = StorageTrieEntry {
3516 nibbles: StoredNibblesSubKey(storage_nibbles3),
3517 node: storage_node2.clone(),
3518 };
3519 cursor.upsert(storage_address2, &entry3).unwrap();
3520 }
3521
3522 let storage_trie1 = StorageTrieUpdatesSorted {
3524 is_deleted: false,
3525 storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())),
                (storage_nibbles2, None),
            ],
3529 };
3530
3531 let storage_trie2 = StorageTrieUpdatesSorted {
3533 is_deleted: true,
3534 storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())),
                (storage_nibbles2, Some(storage_node2.clone())),
            ],
3540 };
3541
3542 let mut storage_tries = B256Map::default();
3543 storage_tries.insert(storage_address1, storage_trie1);
3544 storage_tries.insert(storage_address2, storage_trie2);
3545
3546 let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
3547
3548 let num_written =
3550 provider_rw.write_trie_changesets(block_number, &trie_updates, None).unwrap();
3551
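        // 2 account entries + 2 entries for storage_address1 + 3 entries for the wiped
        // storage_address2 (its two changed paths plus the pre-existing node at a third path).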
3552 assert_eq!(num_written, 7);
3559
3560 {
3562 let mut cursor =
3563 provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
3564
3565 let all_entries = cursor
3567 .walk_dup(Some(block_number), None)
3568 .unwrap()
3569 .collect::<Result<Vec<_>, _>>()
3570 .unwrap();
3571
3572 assert_eq!(
3574 all_entries,
3575 vec![
3576 (
3577 block_number,
3578 TrieChangeSetsEntry {
3579 nibbles: StoredNibblesSubKey(account_nibbles1),
3580 node: Some(node1),
3581 }
3582 ),
3583 (
3584 block_number,
3585 TrieChangeSetsEntry {
3586 nibbles: StoredNibblesSubKey(account_nibbles2),
3587 node: None,
3588 }
3589 ),
3590 ]
3591 );
3592 }
3593
3594 {
3596 let mut cursor =
3597 provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
3598
3599 let key1 = BlockNumberHashedAddress((block_number, storage_address1));
3601 let entries1 =
3602 cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3603
3604 assert_eq!(
3605 entries1,
3606 vec![
3607 (
3608 key1,
3609 TrieChangeSetsEntry {
3610 nibbles: StoredNibblesSubKey(storage_nibbles1),
                            node: Some(storage_node1_old),
                        }
3613 ),
3614 (
3615 key1,
3616 TrieChangeSetsEntry {
3617 nibbles: StoredNibblesSubKey(storage_nibbles2),
                            node: None,
                        }
3620 ),
3621 ]
3622 );
3623
3624 let key2 = BlockNumberHashedAddress((block_number, storage_address2));
3626 let entries2 =
3627 cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3628
3629 assert_eq!(
3630 entries2,
3631 vec![
3632 (
3633 key2,
3634 TrieChangeSetsEntry {
3635 nibbles: StoredNibblesSubKey(storage_nibbles1),
                            node: Some(storage_node1),
                        }
3638 ),
3639 (
3640 key2,
3641 TrieChangeSetsEntry {
3642 nibbles: StoredNibblesSubKey(storage_nibbles2),
                            node: None,
                        }
3645 ),
3646 (
3647 key2,
3648 TrieChangeSetsEntry {
3649 nibbles: StoredNibblesSubKey(storage_nibbles3),
                            node: Some(storage_node2),
                        }
3652 ),
3653 ]
3654 );
3655 }
3656
3657 provider_rw.commit().unwrap();
3658 }
3659
3660 #[test]
3661 fn test_write_trie_changesets_with_overlay() {
3662 use reth_db_api::models::BlockNumberHashedAddress;
3663 use reth_trie::BranchNodeCompact;
3664
3665 let factory = create_test_provider_factory();
3666 let provider_rw = factory.provider_rw().unwrap();
3667
3668 let block_number = 1u64;
3669
3670 let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
3672 let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
3673
3674 let node1 = BranchNodeCompact::new(
            0b1111_1111_1111_1111,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );
3681
3682 let node1_old = BranchNodeCompact::new(
            0b1010_1010_1010_1010,
            0b0000_0000_0000_0000,
3689 0b0000_0000_0000_0000,
3690 vec![],
3691 None,
3692 );
3693
3694 let overlay_account_nodes = vec![
            (account_nibbles1, Some(node1_old.clone())),
        ];
3698
3699 let account_nodes = vec![
            (account_nibbles1, Some(node1)),
            (account_nibbles2, None),
        ];
3704
        let storage_address1 = B256::from([1u8; 32]);
        let storage_address2 = B256::from([2u8; 32]);
        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
3710 let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
3711 let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);
3712
3713 let storage_node1 = BranchNodeCompact::new(
3714 0b1111_0000_0000_0000,
3715 0b0000_0000_0000_0000,
3716 0b0000_0000_0000_0000,
3717 vec![],
3718 None,
3719 );
3720
3721 let storage_node2 = BranchNodeCompact::new(
3722 0b0000_1111_0000_0000,
3723 0b0000_0000_0000_0000,
3724 0b0000_0000_0000_0000,
3725 vec![],
3726 None,
3727 );
3728
3729 let storage_node1_old = BranchNodeCompact::new(
            0b1010_0000_0000_0000,
            0b0000_0000_0000_0000,
3733 0b0000_0000_0000_0000,
3734 vec![],
3735 None,
3736 );
3737
3738 let mut overlay_storage_tries = B256Map::default();
3740
3741 let overlay_storage_trie1 = StorageTrieUpdatesSorted {
3743 is_deleted: false,
3744 storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1_old.clone())),
            ],
3748 };
3749
3750 let overlay_storage_trie2 = StorageTrieUpdatesSorted {
3752 is_deleted: false,
3753 storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())),
                (storage_nibbles3, Some(storage_node2.clone())),
            ],
3757 };
3758
3759 overlay_storage_tries.insert(storage_address1, overlay_storage_trie1);
3760 overlay_storage_tries.insert(storage_address2, overlay_storage_trie2);
3761
3762 let overlay = TrieUpdatesSorted::new(overlay_account_nodes, overlay_storage_tries);
3763
3764 let storage_trie1 = StorageTrieUpdatesSorted {
3766 is_deleted: false,
3767 storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())),
                (storage_nibbles2, None),
            ],
3771 };
3772
3773 let storage_trie2 = StorageTrieUpdatesSorted {
3775 is_deleted: true,
3776 storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())),
                (storage_nibbles2, Some(storage_node2.clone())),
            ],
3783 };
3784
3785 let mut storage_tries = B256Map::default();
3786 storage_tries.insert(storage_address1, storage_trie1);
3787 storage_tries.insert(storage_address2, storage_trie2);
3788
3789 let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
3790
3791 let num_written =
3793 provider_rw.write_trie_changesets(block_number, &trie_updates, Some(&overlay)).unwrap();
3794
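        // Same breakdown as in the previous test, except that the current values are resolved
        // through the in-memory overlay rather than the database.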
3795 assert_eq!(num_written, 7);
3802
3803 {
3805 let mut cursor =
3806 provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
3807
3808 let all_entries = cursor
3810 .walk_dup(Some(block_number), None)
3811 .unwrap()
3812 .collect::<Result<Vec<_>, _>>()
3813 .unwrap();
3814
3815 assert_eq!(
3817 all_entries,
3818 vec![
3819 (
3820 block_number,
3821 TrieChangeSetsEntry {
3822 nibbles: StoredNibblesSubKey(account_nibbles1),
                            node: Some(node1_old),
                        }
3825 ),
3826 (
3827 block_number,
3828 TrieChangeSetsEntry {
3829 nibbles: StoredNibblesSubKey(account_nibbles2),
3830 node: None,
3831 }
3832 ),
3833 ]
3834 );
3835 }
3836
3837 {
3839 let mut cursor =
3840 provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
3841
3842 let key1 = BlockNumberHashedAddress((block_number, storage_address1));
3844 let entries1 =
3845 cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3846
3847 assert_eq!(
3848 entries1,
3849 vec![
3850 (
3851 key1,
3852 TrieChangeSetsEntry {
3853 nibbles: StoredNibblesSubKey(storage_nibbles1),
                            node: Some(storage_node1_old),
                        }
3856 ),
3857 (
3858 key1,
3859 TrieChangeSetsEntry {
3860 nibbles: StoredNibblesSubKey(storage_nibbles2),
                            node: None,
                        }
3863 ),
3864 ]
3865 );
3866
3867 let key2 = BlockNumberHashedAddress((block_number, storage_address2));
3869 let entries2 =
3870 cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3871
3872 assert_eq!(
3873 entries2,
3874 vec![
3875 (
3876 key2,
3877 TrieChangeSetsEntry {
3878 nibbles: StoredNibblesSubKey(storage_nibbles1),
                            node: Some(storage_node1),
                        }
3881 ),
3882 (
3883 key2,
3884 TrieChangeSetsEntry {
3885 nibbles: StoredNibblesSubKey(storage_nibbles2),
                            node: None,
                        }
3888 ),
3889 (
3890 key2,
3891 TrieChangeSetsEntry {
3892 nibbles: StoredNibblesSubKey(storage_nibbles3),
                            node: Some(storage_node2),
                        }
3896 ),
3897 ]
3898 );
3899 }
3900
3901 provider_rw.commit().unwrap();
3902 }
3903
3904 #[test]
3905 fn test_clear_trie_changesets_from() {
3906 use alloy_primitives::hex_literal::hex;
3907 use reth_db_api::models::BlockNumberHashedAddress;
3908 use reth_trie::{BranchNodeCompact, StoredNibblesSubKey, TrieChangeSetsEntry};
3909
3910 let factory = create_test_provider_factory();
3911
3912 let block1 = 100u64;
3914 let block2 = 101u64;
3915 let block3 = 102u64;
3916 let block4 = 103u64;
3917 let block5 = 104u64;
3918
3919 let storage_address1 =
3921 B256::from(hex!("1111111111111111111111111111111111111111111111111111111111111111"));
3922 let storage_address2 =
3923 B256::from(hex!("2222222222222222222222222222222222222222222222222222222222222222"));
3924
3925 let nibbles1 = StoredNibblesSubKey(Nibbles::from_nibbles([0x1, 0x2, 0x3]));
3927 let nibbles2 = StoredNibblesSubKey(Nibbles::from_nibbles([0x4, 0x5, 0x6]));
3928 let nibbles3 = StoredNibblesSubKey(Nibbles::from_nibbles([0x7, 0x8, 0x9]));
3929
3930 let node1 = BranchNodeCompact::new(
3932 0b1111_1111_1111_1111,
3933 0b1111_1111_1111_1111,
3934 0b0000_0000_0000_0001,
3935 vec![B256::from(hex!(
3936 "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
3937 ))],
3938 None,
3939 );
3940 let node2 = BranchNodeCompact::new(
3941 0b1111_1111_1111_1110,
3942 0b1111_1111_1111_1110,
3943 0b0000_0000_0000_0010,
3944 vec![B256::from(hex!(
3945 "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
3946 ))],
3947 Some(B256::from(hex!(
3948 "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
3949 ))),
3950 );
3951
3952 {
3954 let provider_rw = factory.provider_rw().unwrap();
3955 let mut cursor =
3956 provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
3957
3958 cursor
3960 .upsert(
3961 block1,
3962 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
3963 )
3964 .unwrap();
3965 cursor
3966 .upsert(block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
3967 .unwrap();
3968
3969 cursor
3971 .upsert(
3972 block2,
3973 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
3974 )
3975 .unwrap();
3976 cursor
3977 .upsert(
3978 block2,
3979 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
3980 )
                .unwrap();
            cursor
3983 .upsert(block2, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
3984 .unwrap();
3985
3986 cursor
3988 .upsert(
3989 block3,
3990 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
3991 )
3992 .unwrap();
3993 cursor
3994 .upsert(
3995 block3,
3996 &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node2.clone()) },
3997 )
3998 .unwrap();
3999
4000 cursor
4002 .upsert(block4, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
4003 .unwrap();
4004
4005 cursor
4007 .upsert(
4008 block5,
4009 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
4010 )
4011 .unwrap();
4012 cursor
4013 .upsert(block5, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4014 .unwrap();
4015
4016 provider_rw.commit().unwrap();
4017 }
4018
4019 {
4021 let provider_rw = factory.provider_rw().unwrap();
4022 let mut cursor =
4023 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4024
4025 let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
4027 cursor
4028 .upsert(
4029 key1_block1,
4030 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4031 )
4032 .unwrap();
4033 cursor
4034 .upsert(key1_block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
4035 .unwrap();
4036
4037 let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
4040 cursor
4041 .upsert(
4042 key1_block2,
4043 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
4044 )
4045 .unwrap();
4046 cursor
4047 .upsert(key1_block2, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
                .unwrap();
            cursor
4050 .upsert(
4051 key1_block2,
4052 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
4053 )
4054 .unwrap();
4055
4056 let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
4058 cursor
4059 .upsert(
4060 key2_block3,
4061 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
4062 )
4063 .unwrap();
4064 cursor
4065 .upsert(key2_block3, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4066 .unwrap();
4067
4068 let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
4070 cursor
4071 .upsert(
4072 key1_block4,
4073 &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node1) },
4074 )
4075 .unwrap();
4076 cursor
4077 .upsert(
4078 key1_block4,
4079 &TrieChangeSetsEntry { nibbles: nibbles3, node: Some(node2.clone()) },
4080 )
                .unwrap();

            let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4085 cursor
4086 .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles1, node: None })
4087 .unwrap();
4088 cursor
4089 .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles2, node: Some(node2) })
4090 .unwrap();
4091
4092 provider_rw.commit().unwrap();
4093 }
4094
4095 {
4097 let provider_rw = factory.provider_rw().unwrap();
4098 provider_rw.clear_trie_changesets_from(block2).unwrap();
4099 provider_rw.commit().unwrap();
4100 }
4101
        {
            let provider = factory.provider().unwrap();
            let mut cursor =
                provider.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();

            let block1_entries = cursor
                .walk_dup(Some(block1), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(block1_entries.len(), 2, "Block 100 entries should be preserved");
            assert_eq!(block1_entries[0].0, block1);
            assert_eq!(block1_entries[1].0, block1);

            let block2_entries = cursor
                .walk_dup(Some(block2), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert!(block2_entries.is_empty(), "Block 101 entries should be deleted");

            let block3_entries = cursor
                .walk_dup(Some(block3), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert!(block3_entries.is_empty(), "Block 102 entries should be deleted");

            let block4_entries = cursor
                .walk_dup(Some(block4), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert!(block4_entries.is_empty(), "Block 103 entries should be deleted");

            let block5_entries = cursor
                .walk_dup(Some(block5), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert!(block5_entries.is_empty(), "Block 104 entries should be deleted");
        }

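        // Storage trie changesets: only the (block1, storage_address1) entries remain.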
        {
            let provider = factory.provider().unwrap();
            let mut cursor =
                provider.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();

            let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
            let block1_entries = cursor
                .walk_dup(Some(key1_block1), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(block1_entries.len(), 2, "Block 100 storage entries should be preserved");

            let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
            let block2_entries = cursor
                .walk_dup(Some(key1_block2), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert!(block2_entries.is_empty(), "Block 101 storage entries should be deleted");

            let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
            let block3_entries = cursor
                .walk_dup(Some(key2_block3), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert!(block3_entries.is_empty(), "Block 102 storage entries should be deleted");

            let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
            let block4_entries = cursor
                .walk_dup(Some(key1_block4), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert!(block4_entries.is_empty(), "Block 103 storage entries should be deleted");

            let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
            let block5_entries = cursor
                .walk_dup(Some(key2_block5), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert!(block5_entries.is_empty(), "Block 104 storage entries should be deleted");
        }
    }

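    /// Checks that `write_trie_updates_sorted` updates and removes account trie nodes and
    /// applies storage trie updates, including wiping storage tries marked as deleted.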
    #[test]
    fn test_write_trie_updates_sorted() {
        use reth_trie::{
            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
            BranchNodeCompact, StorageTrieEntry,
        };

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

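        // Pre-populate the accounts trie with a node to update and a node to delete.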
        {
            let tx = provider_rw.tx_ref();
            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();

            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
            cursor
                .upsert(
                    to_delete,
                    &BranchNodeCompact::new(
                        0b1010_1010_1010_1010,
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    ),
                )
                .unwrap();

            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
            cursor
                .upsert(
                    to_update,
                    &BranchNodeCompact::new(
                        0b0101_0101_0101_0101,
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    ),
                )
                .unwrap();
        }

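        // Pre-populate storage tries for two hashed addresses.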
        let storage_address1 = B256::from([1u8; 32]);
        let storage_address2 = B256::from([2u8; 32]);
        {
            let tx = provider_rw.tx_ref();
            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();

            storage_cursor
                .upsert(
                    storage_address1,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
                        node: BranchNodeCompact::new(
                            0b0011_0011_0011_0011,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();

            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
                        node: BranchNodeCompact::new(
                            0b1100_1100_1100_1100,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
                        node: BranchNodeCompact::new(
                            0b0011_1100_0011_1100,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
        }

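        // Account updates: modify [0x1, 0x2], delete [0x3, 0x4], insert [0x5, 0x6].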
        let account_nodes = vec![
            (
                Nibbles::from_nibbles([0x1, 0x2]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111,
                    0b0000_0000_0000_0000,
                    0b0000_0000_0000_0000,
                    vec![],
                    None,
                )),
            ),
            (Nibbles::from_nibbles([0x3, 0x4]), None),
            (
                Nibbles::from_nibbles([0x5, 0x6]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111,
                    0b0000_0000_0000_0000,
                    0b0000_0000_0000_0000,
                    vec![],
                    None,
                )),
            ),
        ];

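        // Storage updates: address1 updates one node and deletes another; address2 is wiped.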
        let storage_trie1 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (
                    Nibbles::from_nibbles([0x1, 0x0]),
                    Some(BranchNodeCompact::new(
                        0b1111_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    )),
                ),
                (Nibbles::from_nibbles([0x2, 0x0]), None),
            ],
        };

        let storage_trie2 = StorageTrieUpdatesSorted { is_deleted: true, storage_nodes: vec![] };

        let mut storage_tries = B256Map::default();
        storage_tries.insert(storage_address1, storage_trie1);
        storage_tries.insert(storage_address2, storage_trie2);

        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();

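        // 3 account node changes + 2 storage node changes.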
        assert_eq!(num_entries, 5);

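        // The accounts trie should reflect the update, the deletion, and the insertion.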
        let tx = provider_rw.tx_ref();
        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();

        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
        let entry1 = cursor.seek_exact(nibbles1).unwrap();
        assert!(entry1.is_some(), "Updated account node should exist");
        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
        assert_eq!(
            entry1.unwrap().1.state_mask,
            expected_mask,
            "Account node should have updated state_mask"
        );

        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
        let entry2 = cursor.seek_exact(nibbles2).unwrap();
        assert!(entry2.is_none(), "Deleted account node should not exist");

        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
        let entry3 = cursor.seek_exact(nibbles3).unwrap();
        assert!(entry3.is_some(), "New account node should exist");

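        // Storage tries: address1 keeps a single node, address2 was wiped.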
        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();

        let storage_entries1: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address1), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            storage_entries1.len(),
            1,
            "Storage address1 should have 1 entry after deletion"
        );
        assert_eq!(
            storage_entries1[0].1.nibbles.0,
            Nibbles::from_nibbles([0x1, 0x0]),
            "Remaining entry should be [0x1, 0x0]"
        );

        let storage_entries2: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address2), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");

        provider_rw.commit().unwrap();
    }

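    /// Checks that `get_block_trie_updates` returns the trie updates introduced by a single
    /// block, combining current trie values with the changesets of later blocks to recover
    /// nodes that changed again afterwards.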
    #[test]
    fn test_get_block_trie_updates() {
        use reth_db_api::models::BlockNumberHashedAddress;
        use reth_trie::{BranchNodeCompact, StorageTrieEntry};

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        let target_block = 2u64;
        let next_block = 3u64;

        let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
        let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
        let account_nibbles3 = Nibbles::from_nibbles([0x9, 0xa, 0xb, 0xc]);

        let node1 = BranchNodeCompact::new(
            0b1111_1111_0000_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        let node2 = BranchNodeCompact::new(
            0b0000_0000_1111_1111,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        let node3 = BranchNodeCompact::new(
            0b1010_1010_1010_1010,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

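        // Current accounts trie: nibbles1 -> node1, nibbles2 -> node2.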
        {
            let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
            cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
            cursor.insert(StoredNibbles(account_nibbles2), &node2).unwrap();
        }

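        // Changesets at target_block: nibbles1 had a prior value, nibbles2 was created.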
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
            cursor
                .append_dup(
                    target_block,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(account_nibbles1),
                        node: Some(BranchNodeCompact::new(
                            0b1111_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        )),
                    },
                )
                .unwrap();
            cursor
                .append_dup(
                    target_block,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(account_nibbles2),
                        node: None,
                    },
                )
                .unwrap();
        }

        // Changeset recorded at next_block: nibbles3 changed after the target block.
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
            cursor
                .append_dup(
                    next_block,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(account_nibbles3),
                        node: Some(node3),
                    },
                )
                .unwrap();
        }

        let storage_address1 = B256::from([1u8; 32]);
        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
        let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);

        let storage_node1 = BranchNodeCompact::new(
            0b1111_1111_1111_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        let storage_node2 = BranchNodeCompact::new(
            0b0101_0101_0101_0101,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

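        // Current storage trie for address1: storage_nibbles1 -> storage_node1.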
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
            cursor
                .upsert(
                    storage_address1,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(storage_nibbles1),
                        node: storage_node1.clone(),
                    },
                )
                .unwrap();
        }

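        // Storage changesets at target_block: nibbles1 had a prior node, nibbles2 was created.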
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
            let key = BlockNumberHashedAddress((target_block, storage_address1));

            cursor
                .append_dup(
                    key,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(storage_nibbles1),
                        node: Some(BranchNodeCompact::new(
                            0b0000_0000_1111_1111,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        )),
                    },
                )
                .unwrap();

            cursor
                .append_dup(
                    key,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(storage_nibbles2),
                        node: None,
                    },
                )
                .unwrap();
        }

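        // At next_block, storage_nibbles2 changed again; its recorded pre-state (storage_node2)
        // is the value it held at target_block.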
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
            let key = BlockNumberHashedAddress((next_block, storage_address1));

            cursor
                .append_dup(
                    key,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(storage_nibbles2),
                        node: Some(BranchNodeCompact::new(
                            0b0101_0101_0101_0101,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        )),
                    },
                )
                .unwrap();
        }

        provider_rw.commit().unwrap();

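        // Reconstruct the trie updates introduced by target_block.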
        let provider = factory.provider().unwrap();
        let result = provider.get_block_trie_updates(target_block).unwrap();

        assert_eq!(result.account_nodes_ref().len(), 2, "Should have 2 account trie updates");

        let nibbles1_update = result
            .account_nodes_ref()
            .iter()
            .find(|(n, _)| n == &account_nibbles1)
            .expect("Should find nibbles1");
        assert!(nibbles1_update.1.is_some(), "nibbles1 should have a value");
        assert_eq!(
            nibbles1_update.1.as_ref().unwrap().state_mask,
            node1.state_mask,
            "nibbles1 should have current value"
        );

        let nibbles2_update = result
            .account_nodes_ref()
            .iter()
            .find(|(n, _)| n == &account_nibbles2)
            .expect("Should find nibbles2");
        assert!(nibbles2_update.1.is_some(), "nibbles2 should have a value");
        assert_eq!(
            nibbles2_update.1.as_ref().unwrap().state_mask,
            node2.state_mask,
            "nibbles2 should have current value"
        );

        assert!(
            !result.account_nodes_ref().iter().any(|(n, _)| n == &account_nibbles3),
            "nibbles3 should not be in target_block updates"
        );

        assert_eq!(result.storage_tries_ref().len(), 1, "Should have 1 storage trie");
        let storage_updates = result
            .storage_tries_ref()
            .get(&storage_address1)
            .expect("Should have storage updates for address1");

        assert_eq!(storage_updates.storage_nodes.len(), 2, "Should have 2 storage node updates");

        let storage1_update = storage_updates
            .storage_nodes
            .iter()
            .find(|(n, _)| n == &storage_nibbles1)
            .expect("Should find storage_nibbles1");
        assert!(storage1_update.1.is_some(), "storage_nibbles1 should have a value");
        assert_eq!(
            storage1_update.1.as_ref().unwrap().state_mask,
            storage_node1.state_mask,
            "storage_nibbles1 should have current value"
        );

        let storage2_update = storage_updates
            .storage_nodes
            .iter()
            .find(|(n, _)| n == &storage_nibbles2)
            .expect("Should find storage_nibbles2");
        assert!(
            storage2_update.1.is_some(),
            "storage_nibbles2 should have a value (the node that will be deleted in next block)"
        );
        assert_eq!(
            storage2_update.1.as_ref().unwrap().state_mask,
            storage_node2.state_mask,
            "storage_nibbles2 should have the value that was created and will be deleted"
        );
    }

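    /// Checks how receipt prune modes interact with the configured receipt storage, for both
    /// legacy database receipts and receipts written to static files.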
    #[test]
    fn test_prunable_receipts_logic() {
        // Inserts blocks 0..=tip_block, each carrying `tx_count` transactions.
        let insert_blocks =
            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
                let mut rng = generators::rng();
                for block_num in 0..=tip_block {
                    let block = random_block(
                        &mut rng,
                        block_num,
                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
                    );
                    provider_rw.insert_block(block.try_recover().unwrap()).unwrap();
                }
            };

        // Writes a single receipt for `block`; `cumulative_gas_used` doubles as a marker for
        // the block number so the receipt can be identified later.
        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
            let outcome = ExecutionOutcome {
                first_block: block,
                receipts: vec![vec![Receipt {
                    tx_type: Default::default(),
                    success: true,
                    cumulative_gas_used: block,
                    logs: vec![],
                }]],
                ..Default::default()
            };
            provider_rw.write_state(&outcome, crate::OriginalValuesKnown::No).unwrap();
            provider_rw.commit().unwrap();
        };

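        // Scenario 1: legacy (database) receipt storage with receipts pruned before block 100.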
        {
            let factory = create_test_provider_factory();
            let storage_settings = StorageSettings::legacy();
            factory.set_storage_settings_cache(storage_settings);
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(100)),
                ..Default::default()
            });

            let tip_block = 200u64;
            let first_block = 1u64;

            let provider_rw = factory.provider_rw().unwrap();
            insert_blocks(&provider_rw, tip_block, 1);
            provider_rw.commit().unwrap();

            write_receipts(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                first_block,
            );
            write_receipts(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                tip_block - 1,
            );

            let provider = factory.provider().unwrap();

            // Only the receipt written near the tip should be readable.
            for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
                assert!(provider
                    .receipts_by_block(block.into())
                    .unwrap()
                    .is_some_and(|r| r.len() == num_receipts));
            }
        }

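        // Scenario 2: receipts stored in static files, pruned before block 2.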
        {
            let factory = create_test_provider_factory();
            let storage_settings = StorageSettings::legacy().with_receipts_in_static_files(true);
            factory.set_storage_settings_cache(storage_settings);
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(2)),
                ..Default::default()
            });

            let tip_block = 200u64;

            let provider_rw = factory.provider_rw().unwrap();
            insert_blocks(&provider_rw, tip_block, 1);
            provider_rw.commit().unwrap();

            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);

            // Blocks 0 and 1 are prunable, so no receipts land in the static files, but the
            // static file block range still advances to block 1.
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none());
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_block(StaticFileSegment::Receipts)
                .is_some_and(|b| b == 1));

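            // Block 2 is not prunable, so its receipt (tx number 2) is written to static files.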
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_some_and(|num| num == 2));

            // Even though block 3 would be prunable under the new mode and distance, receipts
            // already live in static files, so its receipt is still written there.
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(100)),
                ..Default::default()
            });
            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
            write_receipts(provider_rw, 3);

            // Receipts keep going to static files; only the non-pruned blocks have readable
            // receipts, identified by their `cumulative_gas_used` marker.
            let provider = factory.provider().unwrap();
            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
            for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
                assert!(provider
                    .receipts_by_block(num.into())
                    .unwrap()
                    .is_some_and(|r| r.len() == num_receipts));

                let receipt = provider.receipt(num).unwrap();
                if num_receipts > 0 {
                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
                } else {
                    assert!(receipt.is_none());
                }
            }
        }
    }
}