use crate::{
    changesets_utils::{
        storage_trie_wiped_changeset_iter, StorageRevertsIter, StorageTrieCurrentValuesIter,
    },
    providers::{
        database::{chain::ChainStorage, metrics},
        static_file::StaticFileWriter,
        NodeTypesForProvider, StaticFileProvider,
    },
    to_range,
    traits::{
        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
    },
    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
    DBProvider, HashingWriter, HeaderProvider, HeaderSyncGapProvider, HistoricalStateProvider,
    HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef,
    OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit,
    StageCheckpointReader, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader,
    StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
    TransactionsProviderExt, TrieReader, TrieWriter,
};
use alloy_consensus::{
    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
    BlockHeader,
};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{
    keccak256,
    map::{hash_map, B256Map, HashMap, HashSet},
    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
};
use itertools::Itertools;
use rayon::slice::ParallelSliceMut;
use reth_chain_state::ExecutedBlock;
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
    database::Database,
    models::{
        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
        BlockNumberHashedAddress, ShardedKey, StoredBlockBodyIndices,
    },
    table::Table,
    tables,
    transaction::{DbTx, DbTxMut},
    BlockNumberList, PlainAccountState, PlainStorageState,
};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
use reth_primitives_traits::{
    Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
};
use reth_prune_types::{
    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE, PRUNE_SEGMENTS,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
    BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, StateProvider,
    StorageChangeSetReader, TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{
    trie_cursor::{
        InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory,
        TrieCursorIter,
    },
    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
    BranchNodeCompact, HashedPostStateSorted, Nibbles, StoredNibbles, StoredNibblesSubKey,
    TrieChangeSetsEntry,
};
use reth_trie_db::{
    DatabaseAccountTrieCursor, DatabaseStorageTrieCursor, DatabaseTrieCursorFactory,
};
use revm_database::states::{
    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
};
use std::{
    cmp::Ordering,
    collections::{BTreeMap, BTreeSet},
    fmt::Debug,
    ops::{Deref, DerefMut, Not, Range, RangeBounds, RangeFrom, RangeInclusive},
    sync::Arc,
};
use tracing::{debug, trace};

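/// A [`DatabaseProvider`] that holds a read-only database transaction.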
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;

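/// A [`DatabaseProvider`] that holds a read-write database transaction.
///
/// It dereferences to the inner [`DatabaseProvider`], so all read-only provider methods are
/// available on it as well.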
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);

impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
    for DatabaseProviderRW<DB, N>
{
    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
        &self.0
    }
}

impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
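    /// Commits the underlying database transaction, persisting all pending changes.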
    pub fn commit(self) -> ProviderResult<bool> {
        self.0.commit()
    }

    /// Consumes the provider and returns the underlying read-write database transaction.
    pub fn into_tx(self) -> <DB as Database>::TXMut {
        self.0.into_tx()
    }
}

impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
    for DatabaseProvider<<DB as Database>::TXMut, N>
{
    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
        provider.0
    }
}

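/// A provider struct that fetches data from the database.
///
/// It wraps a database transaction together with the chain spec, the static file provider, the
/// prune configuration and the node storage handler.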
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Database transaction.
    tx: TX,
    /// Chain spec.
    chain_spec: Arc<N::ChainSpec>,
    /// Static file provider.
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration.
    prune_modes: PruneModes,
    /// Node storage handler.
    storage: Arc<N::Storage>,
}

impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
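    /// Returns a reference to the configured prune modes.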
    pub const fn prune_modes_ref(&self) -> &PruneModes {
        &self.prune_modes
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
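    /// Returns a state provider for the latest state, backed by this provider's transaction.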
    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
        trace!(target: "providers::db", "Returning latest state provider");
        Box::new(LatestStateProviderRef::new(self))
    }

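    /// Returns a historical state provider for the state after the given block hash.
    ///
    /// If the block is the current tip, the latest state provider is returned instead. Account
    /// and storage history prune checkpoints bound the lowest block for which history is
    /// available.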
    pub fn history_by_block_hash<'a>(
        &'a self,
        block_hash: BlockHash,
    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
        let mut block_number =
            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
        if block_number == self.best_block_number().unwrap_or_default() &&
            block_number == self.last_block_number().unwrap_or_default()
        {
            return Ok(Box::new(LatestStateProviderRef::new(self)))
        }

        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);

        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }

    #[cfg(feature = "test-utils")]
    pub const fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
}

impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    type Primitives = N::Primitives;
}

impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }
}

impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
    for DatabaseProvider<TX, N>
{
    type ChainSpec = N::ChainSpec;

    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
        self.chain_spec.clone()
    }
}

impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
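    /// Creates a provider with an inner read-write transaction.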
    pub const fn new_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
    ) -> Self {
        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
    }
}

impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    fn as_ref(&self) -> &Self {
        self
    }
}

impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
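    /// Writes executed blocks, their execution output, hashed state and trie updates to the
    /// database, then updates the history indices and pipeline stage checkpoints up to the last
    /// written block. No-op if `blocks` is empty.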
    pub fn save_blocks(&self, blocks: Vec<ExecutedBlock<N::Primitives>>) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to write empty block range");
            return Ok(())
        }

        let first_block = blocks.first().unwrap().recovered_block();

        let last_block = blocks.last().unwrap().recovered_block();
        let first_number = first_block.number();
        let last_block_number = last_block.number();

        debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage");

        for ExecutedBlock { recovered_block, execution_output, hashed_state, trie_updates } in
            blocks
        {
            let block_number = recovered_block.number();
            self.insert_block(Arc::unwrap_or_clone(recovered_block))?;

            self.write_state(&execution_output, OriginalValuesKnown::No)?;

            self.write_hashed_state(&Arc::unwrap_or_clone(hashed_state).into_sorted())?;

            let trie_updates_sorted = (*trie_updates).clone().into_sorted();
            self.write_trie_changesets(block_number, &trie_updates_sorted, None)?;
            self.write_trie_updates_sorted(&trie_updates_sorted)?;
        }

        self.update_history_indices(first_number..=last_block_number)?;

        self.update_pipeline_stages(last_block_number, false)?;

        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");

        Ok(())
    }

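    /// Unwinds hashed state, history indices and trie data for all blocks from `from` (inclusive)
    /// onwards, writing the trie reverts back to the trie tables and clearing the corresponding
    /// trie changesets.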
    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
        let changed_accounts = self
            .tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(from..)?
            .collect::<Result<Vec<_>, _>>()?;

        self.unwind_account_hashing(changed_accounts.iter())?;

        self.unwind_account_history_indices(changed_accounts.iter())?;

        let storage_start = BlockNumberAddress((from, Address::ZERO));
        let changed_storages = self
            .tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(storage_start..)?
            .collect::<Result<Vec<_>, _>>()?;

        self.unwind_storage_hashing(changed_storages.iter().copied())?;

        self.unwind_storage_history_indices(changed_storages.iter().copied())?;

        let trie_revert = self.trie_reverts(from)?;
        self.write_trie_updates_sorted(&trie_revert)?;

        self.clear_trie_changesets_from(from)?;

        Ok(())
    }

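    /// Removes receipts for all transactions starting at `from_tx`, both from the database table
    /// and, when receipt pruning is disabled, from the receipts static files.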
    fn remove_receipts_from(
        &self,
        from_tx: TxNumber,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;

        if !self.prune_modes.has_receipts_pruning() {
            let static_file_receipt_num =
                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);

            let to_delete = static_file_receipt_num
                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
                .unwrap_or_default();

            self.static_file_provider
                .latest_writer(StaticFileSegment::Receipts)?
                .prune_receipts(to_delete, last_block)?;
        }

        Ok(())
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
    fn try_into_history_at_block(
        self,
        mut block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        if block_number == self.best_block_number().unwrap_or_default() {
            return Ok(Box::new(LatestStateProvider::new(self)))
        }

        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProvider::new(self, block_number);

        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }
}

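/// Unwinds history shards for a key, starting at the shard that `start_key` points to.
///
/// Every visited shard is deleted. The block numbers lower than `block_number` from the first
/// shard that still contains such entries are returned so the caller can re-insert them; if all
/// visited shards are entirely unwound, an empty list is returned.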
fn unwind_history_shards<S, T, C>(
    cursor: &mut C,
    start_key: T::Key,
    block_number: BlockNumber,
    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<S>>,
    C: DbCursorRO<T> + DbCursorRW<T>,
{
    let mut item = cursor.seek_exact(start_key)?;
    while let Some((sharded_key, list)) = item {
        if !shard_belongs_to_key(&sharded_key) {
            break
        }

        cursor.delete_current()?;

        let first = list.iter().next().expect("List can't be empty");

        if first >= block_number {
            item = cursor.prev()?;
            continue
        } else if block_number <= sharded_key.as_ref().highest_block_number {
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        return Ok(list.iter().collect::<Vec<_>>())
    }

    Ok(Vec::new())
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
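    /// Creates a provider with an inner read-only transaction.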
    pub const fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
    ) -> Self {
        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
    }

    /// Consumes the provider and returns the underlying database transaction.
    pub fn into_tx(self) -> TX {
        self.tx
    }

    /// Returns a mutable reference to the underlying database transaction.
    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    /// Returns a reference to the underlying database transaction.
    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    /// Returns a reference to the chain specification.
    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
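    /// Fetches a block body and its senders for the given block id and assembles the block via
    /// the caller-provided header lookup and constructor closures.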
    fn recovered_block<H, HF, B, BF>(
        &self,
        id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
        header_by_number: HF,
        construct_block: BF,
    ) -> ProviderResult<Option<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
    {
        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
        let Some(header) = header_by_number(block_number)? else { return Ok(None) };

        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };

        let tx_range = body.tx_num_range();

        let (transactions, senders) = if tx_range.is_empty() {
            (vec![], vec![])
        } else {
            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
        };

        let body = self
            .storage
            .reader()
            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
            .pop()
            .ok_or(ProviderError::InvalidStorageOutput)?;

        construct_block(header, body, senders)
    }

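    /// Assembles blocks for the given range by zipping headers with their block body indices,
    /// loading the transactions of each block and delegating body construction to the node
    /// storage reader.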
    fn block_range<F, H, HF, R>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        mut assemble_block: F,
    ) -> ProviderResult<Vec<R>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
    {
        if range.is_empty() {
            return Ok(Vec::new())
        }

        let len = range.end().saturating_sub(*range.start()) as usize;
        let mut blocks = Vec::with_capacity(len);

        let headers = headers_range(range.clone())?;

        let present_headers = self
            .block_body_indices_range(range)?
            .into_iter()
            .map(|b| b.tx_num_range())
            .zip(headers)
            .collect::<Vec<_>>();

        let mut inputs = Vec::new();
        for (tx_range, header) in &present_headers {
            let transactions = if tx_range.is_empty() {
                Vec::new()
            } else {
                self.transactions_by_tx_range(tx_range.clone())?
            };

            inputs.push((header.as_ref(), transactions));
        }

        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;

        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
            blocks.push(assemble_block(header, body, tx_range)?);
        }

        Ok(blocks)
    }

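    /// Like the block range helper above, but additionally resolves senders per block, preferring
    /// the ones stored in [`tables::TransactionSenders`] and falling back to recovering them from
    /// the transactions themselves.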
    fn block_with_senders_range<H, HF, B, BF>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        assemble_block: BF,
    ) -> ProviderResult<Vec<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
    {
        let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;

        self.block_range(range, headers_range, |header, body, tx_range| {
            let senders = if tx_range.is_empty() {
                Vec::new()
            } else {
                let known_senders = senders_cursor
                    .walk_range(tx_range.clone())?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                let mut senders = Vec::with_capacity(body.transactions().len());
                for (tx_num, tx) in tx_range.zip(body.transactions()) {
                    match known_senders.get(&tx_num) {
                        None => {
                            let sender = tx.recover_signer_unchecked()?;
                            senders.push(sender);
                        }
                        Some(sender) => senders.push(*sender),
                    }
                }

                senders
            };

            assemble_block(header, body, senders)
        })
    }

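    /// Rebuilds the bundle state and reverts for the given account and storage changesets by
    /// pairing the old values from the changesets with the current values read through the plain
    /// state cursors.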
    fn populate_bundle_state<A, S>(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        plain_accounts_cursor: &mut A,
        plain_storage_cursor: &mut S,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
    where
        A: DbCursorRO<PlainAccountState>,
        S: DbDupCursorRO<PlainStorageState>,
    {
        let mut state: BundleStateInit = HashMap::default();

        let mut reverts: RevertsInit = HashMap::default();

        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_info;
                }
            }
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    let new_storage = plain_storage_cursor
                        .seek_by_key_subkey(address, old_storage.key)?
                        .filter(|storage| storage.key == old_storage.key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
}

impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
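    /// Loads the shard under the given key, removes it from the database and returns its block
    /// number list, or an empty list if the shard does not exist.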
    fn take_shard<T>(
        &self,
        cursor: &mut <TX as DbTxMut>::CursorMut<T>,
        key: T::Key,
    ) -> ProviderResult<Vec<u64>>
    where
        T: Table<Value = BlockNumberList>,
    {
        if let Some((_, list)) = cursor.seek_exact(key)? {
            cursor.delete_current()?;
            let list = list.iter().collect::<Vec<_>>();
            return Ok(list)
        }
        Ok(Vec::new())
    }

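    /// Appends history index entries: new block numbers are merged into the last (open) shard for
    /// each partial key and re-chunked into shards of at most `NUM_OF_INDICES_IN_SHARD` entries,
    /// with the final shard keyed by `u64::MAX`.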
    fn append_history_index<P, T>(
        &self,
        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
    ) -> ProviderResult<()>
    where
        P: Copy,
        T: Table<Value = BlockNumberList>,
    {
        let mut cursor = self.tx.cursor_write::<T>()?;
        for (partial_key, indices) in index_updates {
            let mut last_shard =
                self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
            last_shard.extend(indices);
            let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
            while let Some(list) = chunks.next() {
                let highest_block_number = if chunks.peek().is_some() {
                    *list.last().expect("`chunks` does not return empty list")
                } else {
                    u64::MAX
                };
                cursor.insert(
                    sharded_key_factory(partial_key, highest_block_number),
                    &BlockNumberList::new_pre_sorted(list.iter().copied()),
                )?;
            }
        }
        Ok(())
    }
}

impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
    }
}

impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
    fn changed_accounts_with_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<BTreeSet<Address>> {
        self.tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .map(|entry| {
                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
            })
            .collect()
    }

    fn basic_accounts(
        &self,
        iter: impl IntoIterator<Item = Address>,
    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
        Ok(iter
            .into_iter()
            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
            .collect::<Result<Vec<_>, _>>()?)
    }

    fn changed_accounts_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;

        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
            BTreeMap::new(),
            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
                let (index, account) = entry?;
                accounts.entry(account.address).or_default().push(index);
                Ok(accounts)
            },
        )?;

        Ok(account_transitions)
    }
}

impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
    fn storage_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
        let range = block_number..=block_number;
        let storage_range = BlockNumberAddress::range(range);
        self.tx
            .cursor_dup_read::<tables::StorageChangeSets>()?
            .walk_range(storage_range)?
            .map(|result| -> ProviderResult<_> { Ok(result?) })
            .collect()
    }
}

impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
    fn account_block_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<AccountBeforeTx>> {
        let range = block_number..=block_number;
        self.tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .map(|result| -> ProviderResult<_> {
                let (_, account_before) = result?;
                Ok(account_before)
            })
            .collect()
    }

    fn get_account_before_block(
        &self,
        block_number: BlockNumber,
        address: Address,
    ) -> ProviderResult<Option<AccountBeforeTx>> {
        self.tx
            .cursor_dup_read::<tables::AccountChangeSets>()?
            .seek_by_key_subkey(block_number, address)?
            .filter(|acc| acc.address == address)
            .map(Ok)
            .transpose()
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
    for DatabaseProvider<TX, N>
{
    type Header = HeaderTy<N>;

    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        let static_file_provider = self.static_file_provider();

        let next_static_file_block_num = static_file_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .map(|id| id + 1)
            .unwrap_or_default();
        let next_block = highest_uninterrupted_block + 1;

        match next_static_file_block_num.cmp(&next_block) {
            Ordering::Greater => {
                let mut static_file_producer =
                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
                static_file_producer.commit()?
            }
            Ordering::Less => {
                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
            }
            Ordering::Equal => {}
        }

        let local_head = static_file_provider
            .sealed_header(highest_uninterrupted_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;

        Ok(local_head)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
    type Header = HeaderTy<N>;

    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
        if let Some(num) = self.block_number(block_hash)? {
            Ok(self.header_by_number(num)?)
        } else {
            Ok(None)
        }
    }

    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
        self.static_file_provider.header_by_number(num)
    }

    fn headers_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Self::Header>> {
        self.static_file_provider.headers_range(range)
    }

    fn sealed_header(
        &self,
        number: BlockNumber,
    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_header(number)
    }

    fn sealed_headers_while(
        &self,
        range: impl RangeBounds<BlockNumber>,
        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_headers_while(range, predicate)
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.block_hash(number)
    }

    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.canonical_hashes_range(start, end)
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
    fn chain_info(&self) -> ProviderResult<ChainInfo> {
        let best_number = self.best_block_number()?;
        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
        Ok(ChainInfo { best_hash, best_number })
    }

    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
        Ok(self
            .get_stage_checkpoint(StageId::Finish)?
            .map(|checkpoint| checkpoint.block_number)
            .unwrap_or_default())
    }

    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
        self.static_file_provider.last_block_number()
    }

    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
    type Block = BlockTy<N>;

    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        if source.is_canonical() {
            self.block(hash.into())
        } else {
            Ok(None)
        }
    }

    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        if let Some(number) = self.convert_hash_or_number(id)? &&
            let Some(header) = self.header_by_number(number)?
        {
            let Some(transactions) = self.transactions_by_block(number.into())? else {
                return Ok(None)
            };

            let body = self
                .storage
                .reader()
                .read_block_bodies(self, vec![(&header, transactions)])?
                .pop()
                .ok_or(ProviderError::InvalidStorageOutput)?;

            return Ok(Some(Self::Block::new(header, body)))
        }

        Ok(None)
    }

    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(None)
    }

    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(None)
    }

    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.header_by_number(block_number),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.sealed_header(block_number),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        self.block_range(
            range,
            |range| self.headers_range(range),
            |header, body, _| Ok(Self::Block::new(header, body)),
        )
    }

    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.headers_range(range),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.sealed_headers_range(range),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Ok(self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .seek(id)
            .map(|b| b.map(|(_, bn)| bn))?)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
    for DatabaseProvider<TX, N>
{
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        self.static_file_provider.transaction_hashes_by_range(tx_range)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
    type Transaction = TxTy<N>;

    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
    }

    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id(id)
    }

    fn transaction_by_id_unhashed(
        &self,
        id: TxNumber,
    ) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id_unhashed(id)
    }

    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
        if let Some(id) = self.transaction_id(hash)? {
            Ok(self.transaction_by_id_unhashed(id)?)
        } else {
            Ok(None)
        }
    }

    fn transaction_by_hash_with_meta(
        &self,
        tx_hash: TxHash,
    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
            let Some(sealed_header) = self.sealed_header(block_number)?
        {
            let (header, block_hash) = sealed_header.split();
            if let Some(block_body) = self.block_body_indices(block_number)? {
                let index = transaction_id - block_body.first_tx_num();

                let meta = TransactionMeta {
                    tx_hash,
                    index,
                    block_hash,
                    block_number,
                    base_fee: header.base_fee_per_gas(),
                    excess_blob_gas: header.excess_blob_gas(),
                    timestamp: header.timestamp(),
                };

                return Ok(Some((transaction, meta)))
            }
        }

        Ok(None)
    }

    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
        Ok(cursor.seek(id)?.map(|(_, bn)| bn))
    }

    fn transactions_by_block(
        &self,
        id: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
        if let Some(block_number) = self.convert_hash_or_number(id)? &&
            let Some(body) = self.block_body_indices(block_number)?
        {
            let tx_range = body.tx_num_range();
            return if tx_range.is_empty() {
                Ok(Some(Vec::new()))
            } else {
                self.transactions_by_tx_range(tx_range).map(Some)
            }
        }
        Ok(None)
    }

    fn transactions_by_block_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
        let range = to_range(range);

        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
            .into_iter()
            .map(|body| {
                let tx_num_range = body.tx_num_range();
                if tx_num_range.is_empty() {
                    Ok(Vec::new())
                } else {
                    self.transactions_by_tx_range(tx_num_range)
                }
            })
            .collect()
    }

    fn transactions_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Transaction>> {
        self.static_file_provider.transactions_by_tx_range(range)
    }

    fn senders_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Address>> {
        self.cursor_read_collect::<tables::TransactionSenders>(range)
    }

    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
        Ok(self.tx.get::<tables::TransactionSenders>(id)?)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
    type Receipt = ReceiptTy<N>;

    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Receipts,
            id,
            |static_file| static_file.receipt(id),
            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
        )
    }

    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
        if let Some(id) = self.transaction_id(hash)? {
            self.receipt(id)
        } else {
            Ok(None)
        }
    }

    fn receipts_by_block(
        &self,
        block: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
        if let Some(number) = self.convert_hash_or_number(block)? &&
            let Some(body) = self.block_body_indices(number)?
        {
            let tx_range = body.tx_num_range();
            return if tx_range.is_empty() {
                Ok(Some(Vec::new()))
            } else {
                self.receipts_by_tx_range(tx_range).map(Some)
            }
        }
        Ok(None)
    }

    fn receipts_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Receipt>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Receipts,
            to_range(range),
            |static_file, range, _| static_file.receipts_by_tx_range(range),
            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
            |_| true,
        )
    }

    fn receipts_by_block_range(
        &self,
        block_range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
        if block_range.is_empty() {
            return Ok(Vec::new());
        }

        let mut block_body_indices = Vec::new();
        for block_num in block_range {
            if let Some(indices) = self.block_body_indices(block_num)? {
                block_body_indices.push(indices);
            } else {
                block_body_indices.push(StoredBlockBodyIndices::default());
            }
        }

        if block_body_indices.is_empty() {
            return Ok(Vec::new());
        }

        let non_empty_blocks: Vec<_> =
            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();

        if non_empty_blocks.is_empty() {
            return Ok(vec![Vec::new(); block_body_indices.len()]);
        }

        let first_tx = non_empty_blocks[0].first_tx_num();
        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();

        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
        let mut receipts_iter = all_receipts.into_iter();

        let mut result = Vec::with_capacity(block_body_indices.len());
        for indices in &block_body_indices {
            if indices.tx_count == 0 {
                result.push(Vec::new());
            } else {
                let block_receipts =
                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
                result.push(block_receipts);
            }
        }

        Ok(result)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
    for DatabaseProvider<TX, N>
{
    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
    }

    fn block_body_indices_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
    }
}

impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
        Ok(if let Some(encoded) = id.get_pre_encoded() {
            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
        } else {
            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
        })
    }

    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
    }

    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
        self.tx
            .cursor_read::<tables::StageCheckpoints>()?
            .walk(None)?
            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
            .map_err(ProviderError::Database)
    }
}

impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
    fn save_stage_checkpoint(
        &self,
        id: StageId,
        checkpoint: StageCheckpoint,
    ) -> ProviderResult<()> {
        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
    }

    fn save_stage_checkpoint_progress(
        &self,
        id: StageId,
        checkpoint: Vec<u8>,
    ) -> ProviderResult<()> {
        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
    }

    fn update_pipeline_stages(
        &self,
        block_number: BlockNumber,
        drop_stage_checkpoint: bool,
    ) -> ProviderResult<()> {
        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
        for stage_id in StageId::ALL {
            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
            cursor.upsert(
                stage_id.to_string(),
                &StageCheckpoint {
                    block_number,
                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
                },
            )?;
        }

        Ok(())
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
    fn plain_state_storages(
        &self,
        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;

        addresses_with_keys
            .into_iter()
            .map(|(address, storage)| {
                storage
                    .into_iter()
                    .map(|key| -> ProviderResult<_> {
                        Ok(plain_storage
                            .seek_by_key_subkey(address, key)?
                            .filter(|v| v.key == key)
                            .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
                    })
                    .collect::<ProviderResult<Vec<_>>>()
                    .map(|storage| (address, storage))
            })
            .collect::<ProviderResult<Vec<(_, _)>>>()
    }

    fn changed_storages_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
        self.tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(BlockNumberAddress::range(range))?
            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
                accounts.entry(address).or_default().insert(storage_entry.key);
                Ok(accounts)
            })
    }

    fn changed_storages_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;

        let storage_changeset_lists =
            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
                BTreeMap::new(),
                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
                    let (index, storage) = entry?;
                    storages
                        .entry((index.address(), storage.key))
                        .or_default()
                        .push(index.block_number());
                    Ok(storages)
                },
            )?;

        Ok(storage_changeset_lists)
    }
}

impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
    for DatabaseProvider<TX, N>
{
    type Receipt = ReceiptTy<N>;

    fn write_state(
        &self,
        execution_outcome: &ExecutionOutcome<Self::Receipt>,
        is_value_known: OriginalValuesKnown,
    ) -> ProviderResult<()> {
        let first_block = execution_outcome.first_block();
        let block_count = execution_outcome.len() as u64;
        let last_block = execution_outcome.last_block();
        let block_range = first_block..=last_block;

        let tip = self.last_block_number()?.max(last_block);

        let (plain_state, reverts) =
            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);

        self.write_state_reverts(reverts, first_block)?;
        self.write_state_changes(plain_state)?;

        let block_indices: Vec<_> = self
            .block_body_indices_range(block_range)?
            .into_iter()
            .map(|b| b.first_tx_num)
            .collect();

        if block_indices.len() < block_count as usize {
            let missing_blocks = block_count - block_indices.len() as u64;
            return Err(ProviderError::BlockBodyIndicesNotFound(
                last_block.saturating_sub(missing_blocks - 1),
            ));
        }

        let has_receipts_pruning = self.prune_modes.has_receipts_pruning();

        let mut receipts_cursor = self.tx.cursor_write::<tables::Receipts<Self::Receipt>>()?;

        let mut receipts_static_writer = has_receipts_pruning
            .not()
            .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
            .transpose()?;

        let prunable_receipts =
            PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);

        for (idx, (receipts, first_tx_index)) in
            execution_outcome.receipts.iter().zip(block_indices).enumerate()
        {
            let block_number = first_block + idx as u64;

            if let Some(writer) = receipts_static_writer.as_mut() {
                writer.increment_block(block_number)?;
            }

            if prunable_receipts &&
                self.prune_modes
                    .receipts
                    .is_some_and(|mode| mode.should_prune(block_number, tip))
            {
                continue
            }

            for (idx, receipt) in receipts.iter().enumerate() {
                let receipt_idx = first_tx_index + idx as u64;

                if let Some(writer) = &mut receipts_static_writer {
                    writer.append_receipt(receipt_idx, receipt)?;
                } else {
                    receipts_cursor.append(receipt_idx, receipt)?;
                }
            }
        }

        Ok(())
    }

    fn write_state_reverts(
        &self,
        reverts: PlainStateReverts,
        first_block: BlockNumber,
    ) -> ProviderResult<()> {
        tracing::trace!("Writing storage changes");
        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
        let mut storage_changeset_cursor =
            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;

            tracing::trace!(block_number, "Writing block change");
            storage_changes.par_sort_unstable_by_key(|a| a.address);
            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
                let storage_id = BlockNumberAddress((block_number, address));

                let mut storage = storage_revert
                    .into_iter()
                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
                    .collect::<Vec<_>>();
                storage.par_sort_unstable_by_key(|a| a.0);

                let mut wiped_storage = Vec::new();
                if wiped {
                    tracing::trace!(?address, "Wiping storage");
                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
                        wiped_storage.push((entry.key, entry.value));
                        while let Some(entry) = storages_cursor.next_dup_val()? {
                            wiped_storage.push((entry.key, entry.value))
                        }
                    }
                }

                tracing::trace!(?address, ?storage, "Writing storage reverts");
                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
                    storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
                }
            }
        }

        tracing::trace!("Writing account changes");
        let mut account_changeset_cursor =
            self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;

        for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;
            account_block_reverts.par_sort_by_key(|a| a.0);

            for (address, info) in account_block_reverts {
                account_changeset_cursor.append_dup(
                    block_number,
                    AccountBeforeTx { address, info: info.map(Into::into) },
                )?;
            }
        }

        Ok(())
    }

    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
        changes.accounts.par_sort_by_key(|a| a.0);
        changes.storage.par_sort_by_key(|a| a.address);
        changes.contracts.par_sort_by_key(|a| a.0);

        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
        for (address, account) in changes.accounts {
            if let Some(account) = account {
                tracing::trace!(?address, "Updating plain state account");
                accounts_cursor.upsert(address, &account.into())?;
            } else if accounts_cursor.seek_exact(address)?.is_some() {
                tracing::trace!(?address, "Deleting plain state account");
                accounts_cursor.delete_current()?;
            }
        }

        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
        for (hash, bytecode) in changes.contracts {
            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
        }

        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
                storages_cursor.delete_current_duplicates()?;
            }
            let mut storage = storage
                .into_iter()
                .map(|(k, value)| StorageEntry { key: k.into(), value })
                .collect::<Vec<_>>();
            storage.par_sort_unstable_by_key(|a| a.key);

            for entry in storage {
                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? &&
                    db_entry.key == entry.key
                {
                    storages_cursor.delete_current()?;
                }

                if !entry.value.is_zero() {
                    storages_cursor.upsert(address, &entry)?;
                }
            }
        }

        Ok(())
    }

    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(hashed_address, &account)?;
            } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }

        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
        let mut hashed_storage_cursor =
            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
        for (hashed_address, storage) in sorted_storages {
            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_storage_cursor.delete_current_duplicates()?;
            }

            for (hashed_slot, value) in storage.storage_slots_sorted() {
                let entry = StorageEntry { key: hashed_slot, value };
                if let Some(db_entry) =
                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
                    db_entry.key == entry.key
                {
                    hashed_storage_cursor.delete_current()?;
                }

                if !entry.value.is_zero() {
                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
                }
            }
        }

        Ok(())
    }

    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
        let range = block + 1..=self.last_block_number()?;

        if range.is_empty() {
            return Ok(());
        }

        let block_bodies = self.block_body_indices_range(range.clone())?;

        let from_transaction_num =
            block_bodies.first().expect("already checked if there are blocks").first_tx_num();

        let storage_range = BlockNumberAddress::range(range.clone());

        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;

        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;

        let (state, _) = self.populate_bundle_state(
            account_changeset,
            storage_changeset,
            &mut plain_accounts_cursor,
            &mut plain_storage_cursor,
        )?;

        for (address, (old_account, new_account, storage)) in &state {
            if old_account != new_account {
                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
                if let Some(account) = old_account {
                    plain_accounts_cursor.upsert(*address, account)?;
                } else if existing_entry.is_some() {
                    plain_accounts_cursor.delete_current()?;
                }
            }

            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
                if plain_storage_cursor
                    .seek_by_key_subkey(*address, *storage_key)?
                    .filter(|s| s.key == *storage_key)
                    .is_some()
                {
                    plain_storage_cursor.delete_current()?
                }

                if !old_storage_value.is_zero() {
                    plain_storage_cursor.upsert(*address, &storage_entry)?;
                }
            }
        }

        self.remove_receipts_from(from_transaction_num, block)?;

        Ok(())
    }

    fn take_state_above(
        &self,
        block: BlockNumber,
    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
        let range = block + 1..=self.last_block_number()?;

        if range.is_empty() {
            return Ok(ExecutionOutcome::default())
        }
        let start_block_number = *range.start();

        let block_bodies = self.block_body_indices_range(range.clone())?;

        let from_transaction_num =
            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
        let to_transaction_num =
            block_bodies.last().expect("already checked if there are blocks").last_tx_num();

        let storage_range = BlockNumberAddress::range(range.clone());

        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;

        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;

        let (state, reverts) = self.populate_bundle_state(
            account_changeset,
            storage_changeset,
            &mut plain_accounts_cursor,
            &mut plain_storage_cursor,
        )?;

        for (address, (old_account, new_account, storage)) in &state {
            if old_account != new_account {
                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
                if let Some(account) = old_account {
                    plain_accounts_cursor.upsert(*address, account)?;
                } else if existing_entry.is_some() {
                    plain_accounts_cursor.delete_current()?;
                }
            }

            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
                if plain_storage_cursor
                    .seek_by_key_subkey(*address, *storage_key)?
                    .filter(|s| s.key == *storage_key)
                    .is_some()
                {
                    plain_storage_cursor.delete_current()?
                }

                if !old_storage_value.is_zero() {
                    plain_storage_cursor.upsert(*address, &storage_entry)?;
                }
            }
        }

        let mut receipts_iter = self
            .static_file_provider
            .get_range_with_static_file_or_database(
                StaticFileSegment::Receipts,
                from_transaction_num..to_transaction_num + 1,
                |static_file, range, _| {
                    static_file
                        .receipts_by_tx_range(range.clone())
                        .map(|r| range.into_iter().zip(r).collect())
                },
                |range, _| {
                    self.tx
                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
                        .walk_range(range)?
                        .map(|r| r.map_err(Into::into))
                        .collect()
                },
                |_| true,
            )?
            .into_iter()
            .peekable();

        let mut receipts = Vec::with_capacity(block_bodies.len());
        for block_body in block_bodies {
            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
            for num in block_body.tx_num_range() {
                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
                    block_receipts.push(receipts_iter.next().unwrap().1);
                }
            }
            receipts.push(block_receipts);
        }

        self.remove_receipts_from(from_transaction_num, block)?;

        Ok(ExecutionOutcome::new_init(
            state,
            reverts,
            Vec::new(),
            receipts,
            start_block_number,
            Vec::new(),
        ))
    }
}

2064impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2065 fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
2069 if trie_updates.is_empty() {
2070 return Ok(0)
2071 }
2072
2073 let mut num_entries = 0;
2075
2076 let tx = self.tx_ref();
2077 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2078
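        // Apply the sorted account trie updates: upsert changed nodes (skipping the empty root
        // path, which is never stored) and delete nodes whose update is `None`.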
2079 for (key, updated_node) in trie_updates.account_nodes_ref() {
2081 let nibbles = StoredNibbles(*key);
2082 match updated_node {
2083 Some(node) => {
2084 if !nibbles.0.is_empty() {
2085 num_entries += 1;
2086 account_trie_cursor.upsert(nibbles, node)?;
2087 }
2088 }
2089 None => {
2090 num_entries += 1;
2091 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2092 account_trie_cursor.delete_current()?;
2093 }
2094 }
2095 }
2096 }
2097
2098 num_entries +=
2099 self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
2100
2101 Ok(num_entries)
2102 }
2103
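    /// For every trie path touched by `trie_updates`, this records the node value currently
    /// visible for that path (database state, optionally overlaid with `updates_overlay`) into
    /// the trie changeset tables under `block_number`, covering both the account trie and the
    /// storage tries. Returns the number of changeset entries written.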
2104 fn write_trie_changesets(
2112 &self,
2113 block_number: BlockNumber,
2114 trie_updates: &TrieUpdatesSorted,
2115 updates_overlay: Option<&TrieUpdatesSorted>,
2116 ) -> ProviderResult<usize> {
2117 let mut num_entries = 0;
2118
2119 let mut changeset_cursor =
2120 self.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2121 let curr_values_cursor = self.tx_ref().cursor_read::<tables::AccountsTrie>()?;
2122
2123 let mut db_account_cursor = DatabaseAccountTrieCursor::new(curr_values_cursor);
2125
2126 static EMPTY_ACCOUNT_UPDATES: Vec<(Nibbles, Option<BranchNodeCompact>)> = Vec::new();
2128
2129 let account_overlay_updates = updates_overlay
2131 .map(|overlay| overlay.account_nodes_ref())
2132 .unwrap_or(&EMPTY_ACCOUNT_UPDATES);
2133
2134 let mut in_memory_account_cursor =
2136 InMemoryTrieCursor::new(Some(&mut db_account_cursor), account_overlay_updates);
2137
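        // For each updated account trie path, look up its current value through the overlay-aware
        // cursor and append it to this block's changeset (`None` means the path did not exist
        // before).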
2138 for (path, _) in trie_updates.account_nodes_ref() {
2139 num_entries += 1;
2140 let node = in_memory_account_cursor.seek_exact(*path)?.map(|(_, node)| node);
2141 changeset_cursor.append_dup(
2142 block_number,
2143 TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(*path), node },
2144 )?;
2145 }
2146
2147 let mut storage_updates = trie_updates.storage_tries_ref().iter().collect::<Vec<_>>();
2148 storage_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2149
2150 num_entries += self.write_storage_trie_changesets(
2151 block_number,
2152 storage_updates.into_iter(),
2153 updates_overlay,
2154 )?;
2155
2156 Ok(num_entries)
2157 }
2158
2159 fn clear_trie_changesets(&self) -> ProviderResult<()> {
2160 let tx = self.tx_ref();
2161 tx.clear::<tables::AccountsTrieChangeSets>()?;
2162 tx.clear::<tables::StoragesTrieChangeSets>()?;
2163 Ok(())
2164 }
2165
2166 fn clear_trie_changesets_from(&self, from: BlockNumber) -> ProviderResult<()> {
2167 let tx = self.tx_ref();
2168 {
2169 let range = from..;
2170 let mut cursor = tx.cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2171 let mut walker = cursor.walk_range(range)?;
2172
2173 while walker.next().transpose()?.is_some() {
2174 walker.delete_current()?;
2175 }
2176 }
2177
2178 {
2179 let range: RangeFrom<BlockNumberHashedAddress> = (from, B256::ZERO).into()..;
2180 let mut cursor = tx.cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
2181 let mut walker = cursor.walk_range(range)?;
2182
2183 while walker.next().transpose()?.is_some() {
2184 walker.delete_current()?;
2185 }
2186 }
2187
2188 Ok(())
2189 }
2190}
2191
2192impl<TX: DbTx + 'static, N: NodeTypes> TrieReader for DatabaseProvider<TX, N> {
2193 fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
2194 let tx = self.tx_ref();
2195
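        // Walk the account trie changesets from `from` onwards, keeping only the first (i.e.
        // oldest) recorded value per path so the result describes the trie as it was just before
        // block `from`.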
2196 let mut account_nodes = Vec::new();
2199 let mut seen_account_keys = HashSet::new();
2200 let mut accounts_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2201
2202 for entry in accounts_cursor.walk_range(from..)? {
2203 let (_, TrieChangeSetsEntry { nibbles, node }) = entry?;
2204 if seen_account_keys.insert(nibbles.0) {
2206 account_nodes.push((nibbles.0, node));
2207 }
2208 }
2209
2210 account_nodes.sort_by_key(|(path, _)| *path);
2211
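        // Do the same for the storage trie changesets, grouping the reverted nodes per hashed
        // address.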
2212 let mut storage_tries = B256Map::<Vec<_>>::default();
2216 let mut seen_storage_keys = HashSet::new();
2217 let mut storages_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2218
2219 let storage_range_start = BlockNumberHashedAddress((from, B256::ZERO));
2221
2222 for entry in storages_cursor.walk_range(storage_range_start..)? {
2223 let (
2224 BlockNumberHashedAddress((_, hashed_address)),
2225 TrieChangeSetsEntry { nibbles, node },
2226 ) = entry?;
2227
2228 if seen_storage_keys.insert((hashed_address, nibbles.0)) {
2230 storage_tries.entry(hashed_address).or_default().push((nibbles.0, node));
2231 }
2232 }
2233
2234 let storage_tries = storage_tries
2236 .into_iter()
2237 .map(|(address, mut nodes)| {
2238 nodes.sort_by_key(|(path, _)| *path);
2239 (address, StorageTrieUpdatesSorted { storage_nodes: nodes, is_deleted: false })
2240 })
2241 .collect();
2242
2243 Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2244 }
2245
2246 fn get_block_trie_updates(
2247 &self,
2248 block_number: BlockNumber,
2249 ) -> ProviderResult<TrieUpdatesSorted> {
2250 let tx = self.tx_ref();
2251
2252 let reverts = self.trie_reverts(block_number + 1)?;
2254
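        // Overlaying the reverts of all later blocks on top of the current database state yields
        // the trie exactly as it looked after `block_number` was applied.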
2255 let db_cursor_factory = DatabaseTrieCursorFactory::new(tx);
2258 let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);
2259
2260 let mut account_nodes = Vec::new();
2262
2263 let mut accounts_trie_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2265 let mut account_cursor = cursor_factory.account_trie_cursor()?;
2266
2267 for entry in accounts_trie_cursor.walk_dup(Some(block_number), None)? {
2268 let (_, TrieChangeSetsEntry { nibbles, .. }) = entry?;
2269 let node_value = account_cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2271 account_nodes.push((nibbles.0, node_value));
2272 }
2273
2274 let mut storage_tries = B256Map::default();
2276 let mut storages_trie_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2277 let storage_range_start = BlockNumberHashedAddress((block_number, B256::ZERO));
2278 let storage_range_end = BlockNumberHashedAddress((block_number + 1, B256::ZERO));
2279
2280 let mut current_hashed_address = None;
2281 let mut storage_cursor = None;
2282
2283 for entry in storages_trie_cursor.walk_range(storage_range_start..storage_range_end)? {
2284 let (
2285 BlockNumberHashedAddress((_, hashed_address)),
2286 TrieChangeSetsEntry { nibbles, .. },
2287 ) = entry?;
2288
2289 if current_hashed_address != Some(hashed_address) {
2291 storage_cursor = Some(cursor_factory.storage_trie_cursor(hashed_address)?);
2292 current_hashed_address = Some(hashed_address);
2293 }
2294
2295 let cursor =
2297 storage_cursor.as_mut().expect("storage_cursor was just initialized above");
2298 let node_value = cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2299 storage_tries
2300 .entry(hashed_address)
2301 .or_insert_with(|| StorageTrieUpdatesSorted {
2302 storage_nodes: Vec::new(),
2303 is_deleted: false,
2304 })
2305 .storage_nodes
2306 .push((nibbles.0, node_value));
2307 }
2308
2309 Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2310 }
2311}
2312
2313impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2314 fn write_storage_trie_updates_sorted<'a>(
2320 &self,
2321 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2322 ) -> ProviderResult<usize> {
2323 let mut num_entries = 0;
2324 let mut storage_tries = storage_tries.collect::<Vec<_>>();
2325 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2326 let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2327 for (hashed_address, storage_trie_updates) in storage_tries {
2328 let mut db_storage_trie_cursor =
2329 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2330 num_entries +=
2331 db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
2332 cursor = db_storage_trie_cursor.cursor;
2333 }
2334
2335 Ok(num_entries)
2336 }
2337
2338 fn write_storage_trie_changesets<'a>(
2346 &self,
2347 block_number: BlockNumber,
2348 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2349 updates_overlay: Option<&TrieUpdatesSorted>,
2350 ) -> ProviderResult<usize> {
2351 let mut num_written = 0;
2352
2353 let mut changeset_cursor =
2354 self.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
2355
2356 let changed_curr_values_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
2360 let wiped_nodes_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
2361
2362 let mut changed_curr_values_cursor = DatabaseStorageTrieCursor::new(
2366 changed_curr_values_cursor,
2367             B256::default(),
        );
2369 let mut wiped_nodes_cursor = DatabaseStorageTrieCursor::new(
2370 wiped_nodes_cursor,
2371             B256::default(),
        );
2373
2374 static EMPTY_UPDATES: Vec<(Nibbles, Option<BranchNodeCompact>)> = Vec::new();
2376
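        // For each storage trie: if it was wiped, record every node that existed before (the
        // changed paths merged with a full walk of the trie as seen through the database plus
        // overlay); otherwise record only the previous values of the changed paths.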
2377 for (hashed_address, storage_trie_updates) in storage_tries {
2378 let changeset_key = BlockNumberHashedAddress((block_number, *hashed_address));
2379
2380 changed_curr_values_cursor =
2382 DatabaseStorageTrieCursor::new(changed_curr_values_cursor.cursor, *hashed_address);
2383
2384 let overlay_updates = updates_overlay
2386 .and_then(|overlay| overlay.storage_tries_ref().get(hashed_address))
2387 .map(|updates| updates.storage_nodes_ref())
2388 .unwrap_or(&EMPTY_UPDATES);
2389
2390 let mut in_memory_changed_cursor =
2392 InMemoryTrieCursor::new(Some(&mut changed_curr_values_cursor), overlay_updates);
2393
2394 let curr_values_of_changed = StorageTrieCurrentValuesIter::new(
2397 storage_trie_updates.storage_nodes.iter().map(|e| e.0),
2398 &mut in_memory_changed_cursor,
2399 )?;
2400
2401 if storage_trie_updates.is_deleted() {
2402 wiped_nodes_cursor =
2405 DatabaseStorageTrieCursor::new(wiped_nodes_cursor.cursor, *hashed_address);
2406
2407 let mut in_memory_wiped_cursor =
2409 InMemoryTrieCursor::new(Some(&mut wiped_nodes_cursor), overlay_updates);
2410
2411 let all_nodes = TrieCursorIter::new(&mut in_memory_wiped_cursor);
2412
2413 for wiped in storage_trie_wiped_changeset_iter(curr_values_of_changed, all_nodes)? {
2414 let (path, node) = wiped?;
2415 num_written += 1;
2416 changeset_cursor.append_dup(
2417 changeset_key,
2418 TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
2419 )?;
2420 }
2421 } else {
2422 for curr_value in curr_values_of_changed {
2423 let (path, node) = curr_value?;
2424 num_written += 1;
2425 changeset_cursor.append_dup(
2426 changeset_key,
2427 TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
2428 )?;
2429 }
2430 }
2431 }
2432
2433 Ok(num_written)
2434 }
2435}
2436
2437impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2438 fn unwind_account_hashing<'a>(
2439 &self,
2440 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2441 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
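        // Collect the changesets in reverse order so that, for addresses touched multiple times,
        // the oldest (pre-range) account info is the one that ends up in the map.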
2442 let hashed_accounts = changesets
2446 .into_iter()
2447 .map(|(_, e)| (keccak256(e.address), e.info))
2448 .collect::<Vec<_>>()
2449 .into_iter()
2450 .rev()
2451 .collect::<BTreeMap<_, _>>();
2452
2453 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2455 for (hashed_address, account) in &hashed_accounts {
2456 if let Some(account) = account {
2457 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2458 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2459 hashed_accounts_cursor.delete_current()?;
2460 }
2461 }
2462
2463 Ok(hashed_accounts)
2464 }
2465
2466 fn unwind_account_hashing_range(
2467 &self,
2468 range: impl RangeBounds<BlockNumber>,
2469 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2470 let changesets = self
2471 .tx
2472 .cursor_read::<tables::AccountChangeSets>()?
2473 .walk_range(range)?
2474 .collect::<Result<Vec<_>, _>>()?;
2475 self.unwind_account_hashing(changesets.iter())
2476 }
2477
2478 fn insert_account_for_hashing(
2479 &self,
2480 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2481 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2482 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2483 let hashed_accounts =
2484 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2485 for (hashed_address, account) in &hashed_accounts {
2486 if let Some(account) = account {
2487 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2488 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2489 hashed_accounts_cursor.delete_current()?;
2490 }
2491 }
2492 Ok(hashed_accounts)
2493 }
2494
2495 fn unwind_storage_hashing(
2496 &self,
2497 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2498 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
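        // Hash and sort the changed (address, key) pairs, then apply them in reverse so the
        // oldest value for each slot is the one written back to `HashedStorages`.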
2499 let mut hashed_storages = changesets
2501 .into_iter()
2502 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2503 (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2504 })
2505 .collect::<Vec<_>>();
2506 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2507
2508 let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2510 HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2511 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2512 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2513 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2514
2515 if hashed_storage
2516 .seek_by_key_subkey(hashed_address, key)?
2517 .filter(|entry| entry.key == key)
2518 .is_some()
2519 {
2520 hashed_storage.delete_current()?;
2521 }
2522
2523 if !value.is_zero() {
2524 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2525 }
2526 }
2527 Ok(hashed_storage_keys)
2528 }
2529
2530 fn unwind_storage_hashing_range(
2531 &self,
2532 range: impl RangeBounds<BlockNumberAddress>,
2533 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2534 let changesets = self
2535 .tx
2536 .cursor_read::<tables::StorageChangeSets>()?
2537 .walk_range(range)?
2538 .collect::<Result<Vec<_>, _>>()?;
2539 self.unwind_storage_hashing(changesets.into_iter())
2540 }
2541
2542 fn insert_storage_for_hashing(
2543 &self,
2544 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2545 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2546 let hashed_storages =
2548 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2549 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2550 map.insert(keccak256(entry.key), entry.value);
2551 map
2552 });
2553 map.insert(keccak256(address), storage);
2554 map
2555 });
2556
2557 let hashed_storage_keys = hashed_storages
2558 .iter()
2559 .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2560 .collect();
2561
2562 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2563 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2566 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2567 if hashed_storage_cursor
2568 .seek_by_key_subkey(hashed_address, key)?
2569 .filter(|entry| entry.key == key)
2570 .is_some()
2571 {
2572 hashed_storage_cursor.delete_current()?;
2573 }
2574
2575 if !value.is_zero() {
2576 hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2577 }
2578 Ok(())
2579 })
2580 })?;
2581
2582 Ok(hashed_storage_keys)
2583 }
2584}
2585
2586impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2587 fn unwind_account_history_indices<'a>(
2588 &self,
2589 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2590 ) -> ProviderResult<usize> {
2591 let mut last_indices = changesets
2592 .into_iter()
2593 .map(|(index, account)| (account.address, *index))
2594 .collect::<Vec<_>>();
2595 last_indices.sort_by_key(|(a, _)| *a);
2596
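        // For each (address, unwind block) pair, `unwind_history_shards` strips the affected
        // shard entries and returns the block numbers that survive the unwind, which are then
        // re-inserted as the address's last shard (if any remain).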
2597 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2599 for &(address, rem_index) in &last_indices {
2600 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2601 &mut cursor,
2602 ShardedKey::last(address),
2603 rem_index,
2604 |sharded_key| sharded_key.key == address,
2605 )?;
2606
2607 if !partial_shard.is_empty() {
2610 cursor.insert(
2611 ShardedKey::last(address),
2612 &BlockNumberList::new_pre_sorted(partial_shard),
2613 )?;
2614 }
2615 }
2616
2617 let changesets = last_indices.len();
2618 Ok(changesets)
2619 }
2620
2621 fn unwind_account_history_indices_range(
2622 &self,
2623 range: impl RangeBounds<BlockNumber>,
2624 ) -> ProviderResult<usize> {
2625 let changesets = self
2626 .tx
2627 .cursor_read::<tables::AccountChangeSets>()?
2628 .walk_range(range)?
2629 .collect::<Result<Vec<_>, _>>()?;
2630 self.unwind_account_history_indices(changesets.iter())
2631 }
2632
2633 fn insert_account_history_index(
2634 &self,
2635 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2636 ) -> ProviderResult<()> {
2637 self.append_history_index::<_, tables::AccountsHistory>(
2638 account_transitions,
2639 ShardedKey::new,
2640 )
2641 }
2642
2643 fn unwind_storage_history_indices(
2644 &self,
2645 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2646 ) -> ProviderResult<usize> {
2647 let mut storage_changesets = changesets
2648 .into_iter()
2649 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2650 .collect::<Vec<_>>();
2651 storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2652
2653 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2654 for &(address, storage_key, rem_index) in &storage_changesets {
2655 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2656 &mut cursor,
2657 StorageShardedKey::last(address, storage_key),
2658 rem_index,
2659 |storage_sharded_key| {
2660 storage_sharded_key.address == address &&
2661 storage_sharded_key.sharded_key.key == storage_key
2662 },
2663 )?;
2664
2665 if !partial_shard.is_empty() {
2668 cursor.insert(
2669 StorageShardedKey::last(address, storage_key),
2670 &BlockNumberList::new_pre_sorted(partial_shard),
2671 )?;
2672 }
2673 }
2674
2675 let changesets = storage_changesets.len();
2676 Ok(changesets)
2677 }
2678
2679 fn unwind_storage_history_indices_range(
2680 &self,
2681 range: impl RangeBounds<BlockNumberAddress>,
2682 ) -> ProviderResult<usize> {
2683 let changesets = self
2684 .tx
2685 .cursor_read::<tables::StorageChangeSets>()?
2686 .walk_range(range)?
2687 .collect::<Result<Vec<_>, _>>()?;
2688 self.unwind_storage_history_indices(changesets.into_iter())
2689 }
2690
2691 fn insert_storage_history_index(
2692 &self,
2693 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2694 ) -> ProviderResult<()> {
2695 self.append_history_index::<_, tables::StoragesHistory>(
2696 storage_transitions,
2697 |(address, storage_key), highest_block_number| {
2698 StorageShardedKey::new(address, storage_key, highest_block_number)
2699 },
2700 )
2701 }
2702
2703 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2704 {
2706 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2707 self.insert_account_history_index(indices)?;
2708 }
2709
2710 {
2712 let indices = self.changed_storages_and_blocks_with_range(range)?;
2713 self.insert_storage_history_index(indices)?;
2714 }
2715
2716 Ok(())
2717 }
2718}
2719
2720impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2721 for DatabaseProvider<TX, N>
2722{
2723 fn take_block_and_execution_above(
2724 &self,
2725 block: BlockNumber,
2726 ) -> ProviderResult<Chain<Self::Primitives>> {
2727 let range = block + 1..=self.last_block_number()?;
2728
2729 self.unwind_trie_state_from(block + 1)?;
2730
2731 let execution_state = self.take_state_above(block)?;
2733
2734 let blocks = self.recovered_block_range(range)?;
2735
2736 self.remove_blocks_above(block)?;
2739
2740 self.update_pipeline_stages(block, true)?;
2742
2743 Ok(Chain::new(blocks, execution_state, None))
2744 }
2745
2746 fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
2747 self.unwind_trie_state_from(block + 1)?;
2748
2749 self.remove_state_above(block)?;
2751
2752 self.remove_blocks_above(block)?;
2755
2756 self.update_pipeline_stages(block, true)?;
2758
2759 Ok(())
2760 }
2761}
2762
2763impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2764 for DatabaseProvider<TX, N>
2765{
2766 type Block = BlockTy<N>;
2767 type Receipt = ReceiptTy<N>;
2768
2769 fn insert_block(
2790 &self,
2791 block: RecoveredBlock<Self::Block>,
2792 ) -> ProviderResult<StoredBlockBodyIndices> {
2793 let block_number = block.number();
2794
2795 let mut durations_recorder = metrics::DurationsRecorder::default();
2796
2797 self.static_file_provider
2798 .get_writer(block_number, StaticFileSegment::Headers)?
2799 .append_header(block.header(), &block.hash())?;
2800
2801 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2802 durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2803
2804 let mut next_tx_num = self
2805 .tx
2806 .cursor_read::<tables::TransactionBlocks>()?
2807 .last()?
2808 .map(|(n, _)| n + 1)
2809 .unwrap_or_default();
2810 durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2811 let first_tx_num = next_tx_num;
2812
2813 let tx_count = block.body().transaction_count() as u64;
2814
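        // Write the per-transaction lookup tables, skipping senders and hash-to-number entries
        // when the corresponding prune mode is set to full.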
2815 for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2817 let hash = transaction.tx_hash();
2818
2819 if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2820 self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2821 }
2822
2823 if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2824 self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2825 }
2826 next_tx_num += 1;
2827 }
2828
2829 self.append_block_bodies(vec![(block_number, Some(block.into_body()))])?;
2830
2831 debug!(
2832 target: "providers::db",
2833 ?block_number,
2834 actions = ?durations_recorder.actions,
2835 "Inserted block"
2836 );
2837
2838 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2839 }
2840
2841 fn append_block_bodies(
2842 &self,
2843 bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2844 ) -> ProviderResult<()> {
2845 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2846
2847 let mut tx_writer =
2849 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
2850
2851 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2852 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2853
2854 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2856
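        // Every block gets a `BlockBodyIndices` entry, even when its body is missing; transactions
        // are appended to the static file writer and, for non-empty bodies, the block's last tx
        // number is indexed in `TransactionBlocks`.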
2857 for (block_number, body) in &bodies {
2858 tx_writer.increment_block(*block_number)?;
2860
2861 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2862 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2863
2864 let mut durations_recorder = metrics::DurationsRecorder::default();
2865
2866 block_indices_cursor.append(*block_number, &block_indices)?;
2868
2869 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2870
2871 let Some(body) = body else { continue };
2872
2873 if !body.transactions().is_empty() {
2875 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2876 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2877 }
2878
2879 for transaction in body.transactions() {
2881 tx_writer.append_transaction(next_tx_num, transaction)?;
2882
2883 next_tx_num += 1;
2885 }
2886 }
2887
2888 self.storage.writer().write_block_bodies(self, bodies)?;
2889
2890 Ok(())
2891 }
2892
2893 fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
2894 for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
2896 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2897 }
2898
2899 let highest_static_file_block = self
2901 .static_file_provider()
2902 .get_highest_static_file_block(StaticFileSegment::Headers)
2903 .expect("todo: error handling, headers should exist");
2904
2905 debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
2911 self.static_file_provider()
2912 .get_writer(block, StaticFileSegment::Headers)?
2913 .prune_headers(highest_static_file_block.saturating_sub(block))?;
2914
2915 let unwind_tx_from = self
2917 .block_body_indices(block)?
2918 .map(|b| b.next_tx_num())
2919 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2920
2921 let unwind_tx_to = self
2923 .tx
2924 .cursor_read::<tables::BlockBodyIndices>()?
2925 .last()?
2926 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
2928 .1
2929 .last_tx_num();
2930
2931 if unwind_tx_from <= unwind_tx_to {
2932 for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
2933 self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
2934 }
2935 }
2936
2937 self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
2938
2939 self.remove_bodies_above(block)?;
2940
2941 Ok(())
2942 }
2943
2944 fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
2945 self.storage.writer().remove_block_bodies_above(self, block)?;
2946
2947 let unwind_tx_from = self
2949 .block_body_indices(block)?
2950 .map(|b| b.next_tx_num())
2951 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2952
2953 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
2954 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
2955
2956 let static_file_tx_num =
2957 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
2958
2959 let to_delete = static_file_tx_num
2960 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
2961 .unwrap_or_default();
2962
2963 self.static_file_provider
2964 .latest_writer(StaticFileSegment::Transactions)?
2965 .prune_transactions(to_delete, block)?;
2966
2967 Ok(())
2968 }
2969
2970 fn append_blocks_with_state(
2972 &self,
2973 blocks: Vec<RecoveredBlock<Self::Block>>,
2974 execution_outcome: &ExecutionOutcome<Self::Receipt>,
2975 hashed_state: HashedPostStateSorted,
2976 ) -> ProviderResult<()> {
2977 if blocks.is_empty() {
2978 debug!(target: "providers::db", "Attempted to append empty block range");
2979 return Ok(())
2980 }
2981
2982 let first_number = blocks[0].number();
2985
2986 let last_block_number = blocks[blocks.len() - 1].number();
2989
2990 let mut durations_recorder = metrics::DurationsRecorder::default();
2991
2992 for block in blocks {
2994 self.insert_block(block)?;
2995 durations_recorder.record_relative(metrics::Action::InsertBlock);
2996 }
2997
2998 self.write_state(execution_outcome, OriginalValuesKnown::No)?;
2999 durations_recorder.record_relative(metrics::Action::InsertState);
3000
3001 self.write_hashed_state(&hashed_state)?;
3003 durations_recorder.record_relative(metrics::Action::InsertHashes);
3004
3005 self.update_history_indices(first_number..=last_block_number)?;
3006 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3007
3008 self.update_pipeline_stages(last_block_number, false)?;
3010 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3011
3012 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3013
3014 Ok(())
3015 }
3016}
3017
3018impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3019 fn get_prune_checkpoint(
3020 &self,
3021 segment: PruneSegment,
3022 ) -> ProviderResult<Option<PruneCheckpoint>> {
3023 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3024 }
3025
3026 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3027 Ok(PRUNE_SEGMENTS
3028 .iter()
3029 .filter_map(|segment| {
3030 self.tx
3031 .get::<tables::PruneCheckpoints>(*segment)
3032 .transpose()
3033 .map(|chk| chk.map(|chk| (*segment, chk)))
3034 })
3035 .collect::<Result<_, _>>()?)
3036 }
3037}
3038
3039impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3040 fn save_prune_checkpoint(
3041 &self,
3042 segment: PruneSegment,
3043 checkpoint: PruneCheckpoint,
3044 ) -> ProviderResult<()> {
3045 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3046 }
3047}
3048
3049impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3050 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3051 let db_entries = self.tx.entries::<T>()?;
3052 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3053 Ok(entries) => entries,
3054 Err(ProviderError::UnsupportedProvider) => 0,
3055 Err(err) => return Err(err),
3056 };
3057
3058 Ok(db_entries + static_file_entries)
3059 }
3060}
3061
3062impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3063 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3064 let mut finalized_blocks = self
3065 .tx
3066 .cursor_read::<tables::ChainState>()?
3067 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3068 .take(1)
3069 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3070
3071 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3072 Ok(last_finalized_block_number)
3073 }
3074
3075 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3076         let mut safe_blocks = self
3077 .tx
3078 .cursor_read::<tables::ChainState>()?
3079 .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3080 .take(1)
3081 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3082
3083         let last_safe_block_number = safe_blocks.pop_first().map(|pair| pair.1);
3084         Ok(last_safe_block_number)
3085 }
3086}
3087
3088impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3089 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3090 Ok(self
3091 .tx
3092 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3093 }
3094
3095 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3096 Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3097 }
3098}
3099
3100impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3101 type Tx = TX;
3102
3103 fn tx_ref(&self) -> &Self::Tx {
3104 &self.tx
3105 }
3106
3107 fn tx_mut(&mut self) -> &mut Self::Tx {
3108 &mut self.tx
3109 }
3110
3111 fn into_tx(self) -> Self::Tx {
3112 self.tx
3113 }
3114
3115 fn prune_modes_ref(&self) -> &PruneModes {
3116 self.prune_modes_ref()
3117 }
3118
3119 fn commit(self) -> ProviderResult<bool> {
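        // Commit ordering: when a static file unwind is queued, the database transaction is
        // committed before the static files; otherwise the static files are committed first.
        // This presumably keeps the two stores consistent if the process crashes between the
        // two commits.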
3121 if self.static_file_provider.has_unwind_queued() {
3126 self.tx.commit()?;
3127 self.static_file_provider.commit()?;
3128 } else {
3129 self.static_file_provider.commit()?;
3130 self.tx.commit()?;
3131 }
3132
3133 Ok(true)
3134 }
3135}
3136
3137#[cfg(test)]
3138mod tests {
3139 use super::*;
3140 use crate::{
3141 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3142 BlockWriter,
3143 };
3144 use reth_testing_utils::generators::{self, random_block, BlockParams};
3145
3146 #[test]
3147 fn test_receipts_by_block_range_empty_range() {
3148 let factory = create_test_provider_factory();
3149 let provider = factory.provider().unwrap();
3150
3151 let start = 10u64;
3153 let end = 9u64;
3154 let result = provider.receipts_by_block_range(start..=end).unwrap();
3155 assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3156 }
3157
3158 #[test]
3159 fn test_receipts_by_block_range_nonexistent_blocks() {
3160 let factory = create_test_provider_factory();
3161 let provider = factory.provider().unwrap();
3162
3163 let result = provider.receipts_by_block_range(10..=12).unwrap();
3165 assert_eq!(result, vec![vec![], vec![], vec![]]);
3166 }
3167
3168 #[test]
3169 fn test_receipts_by_block_range_single_block() {
3170 let factory = create_test_provider_factory();
3171 let data = BlockchainTestData::default();
3172
3173 let provider_rw = factory.provider_rw().unwrap();
3174 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3175 provider_rw
3176 .write_state(
3177 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3178 crate::OriginalValuesKnown::No,
3179 )
3180 .unwrap();
3181 provider_rw.insert_block(data.blocks[0].0.clone()).unwrap();
3182 provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap();
3183 provider_rw.commit().unwrap();
3184
3185 let provider = factory.provider().unwrap();
3186 let result = provider.receipts_by_block_range(1..=1).unwrap();
3187
3188 assert_eq!(result.len(), 1);
3190 assert_eq!(result[0].len(), 1);
3191 assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3192 }
3193
3194 #[test]
3195 fn test_receipts_by_block_range_multiple_blocks() {
3196 let factory = create_test_provider_factory();
3197 let data = BlockchainTestData::default();
3198
3199 let provider_rw = factory.provider_rw().unwrap();
3200 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3201 provider_rw
3202 .write_state(
3203 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3204 crate::OriginalValuesKnown::No,
3205 )
3206 .unwrap();
3207 for i in 0..3 {
3208 provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3209 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3210 }
3211 provider_rw.commit().unwrap();
3212
3213 let provider = factory.provider().unwrap();
3214 let result = provider.receipts_by_block_range(1..=3).unwrap();
3215
3216 assert_eq!(result.len(), 3);
3218 for (i, block_receipts) in result.iter().enumerate() {
3219 assert_eq!(block_receipts.len(), 1);
3220 assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
3221 }
3222 }
3223
3224 #[test]
3225 fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
3226 let factory = create_test_provider_factory();
3227 let data = BlockchainTestData::default();
3228
3229 let provider_rw = factory.provider_rw().unwrap();
3230 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3231 provider_rw
3232 .write_state(
3233 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3234 crate::OriginalValuesKnown::No,
3235 )
3236 .unwrap();
3237
3238 for i in 0..3 {
3240 provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3241 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3242 }
3243 provider_rw.commit().unwrap();
3244
3245 let provider = factory.provider().unwrap();
3246 let result = provider.receipts_by_block_range(1..=3).unwrap();
3247
3248 assert_eq!(result.len(), 3);
3250 for block_receipts in &result {
3251 assert_eq!(block_receipts.len(), 1);
3252 }
3253 }
3254
3255 #[test]
3256 fn test_receipts_by_block_range_partial_range() {
3257 let factory = create_test_provider_factory();
3258 let data = BlockchainTestData::default();
3259
3260 let provider_rw = factory.provider_rw().unwrap();
3261 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3262 provider_rw
3263 .write_state(
3264 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3265 crate::OriginalValuesKnown::No,
3266 )
3267 .unwrap();
3268 for i in 0..3 {
3269 provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3270 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3271 }
3272 provider_rw.commit().unwrap();
3273
3274 let provider = factory.provider().unwrap();
3275
3276 let result = provider.receipts_by_block_range(2..=5).unwrap();
3278 assert_eq!(result.len(), 4);
3279
3280         assert_eq!(result[0].len(), 1);
        assert_eq!(result[1].len(), 1);
        assert_eq!(result[2].len(), 0);
        assert_eq!(result[3].len(), 0);

        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
3287 assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
3288 }
3289
3290 #[test]
3291 fn test_receipts_by_block_range_all_empty_blocks() {
3292 let factory = create_test_provider_factory();
3293 let mut rng = generators::rng();
3294
3295 let mut blocks = Vec::new();
3297 for i in 0..3 {
3298 let block =
3299 random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
3300 blocks.push(block);
3301 }
3302
3303 let provider_rw = factory.provider_rw().unwrap();
3304 for block in blocks {
3305 provider_rw.insert_block(block.try_recover().unwrap()).unwrap();
3306 }
3307 provider_rw.commit().unwrap();
3308
3309 let provider = factory.provider().unwrap();
3310 let result = provider.receipts_by_block_range(1..=3).unwrap();
3311
3312 assert_eq!(result.len(), 3);
3313 for block_receipts in result {
3314 assert_eq!(block_receipts.len(), 0);
3315 }
3316 }
3317
3318 #[test]
3319 fn test_receipts_by_block_range_consistency_with_individual_calls() {
3320 let factory = create_test_provider_factory();
3321 let data = BlockchainTestData::default();
3322
3323 let provider_rw = factory.provider_rw().unwrap();
3324 provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3325 provider_rw
3326 .write_state(
3327 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3328 crate::OriginalValuesKnown::No,
3329 )
3330 .unwrap();
3331 for i in 0..3 {
3332 provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3333 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3334 }
3335 provider_rw.commit().unwrap();
3336
3337 let provider = factory.provider().unwrap();
3338
3339 let range_result = provider.receipts_by_block_range(1..=3).unwrap();
3341
3342 let mut individual_results = Vec::new();
3344 for block_num in 1..=3 {
3345 let receipts =
3346 provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
3347 individual_results.push(receipts);
3348 }
3349
3350 assert_eq!(range_result, individual_results);
3351 }
3352
3353 #[test]
3354 fn test_write_trie_changesets() {
3355 use reth_db_api::models::BlockNumberHashedAddress;
3356 use reth_trie::{BranchNodeCompact, StorageTrieEntry};
3357
3358 let factory = create_test_provider_factory();
3359 let provider_rw = factory.provider_rw().unwrap();
3360
3361 let block_number = 1u64;
3362
3363 let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
3365 let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
3366
3367 let node1 = BranchNodeCompact::new(
3368             0b1111_1111_1111_1111,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );
3374
3375 {
3377 let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
3378 cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
3379 }
3380
3381 let account_nodes = vec![
3383             (account_nibbles1, Some(node1.clone())),
            (account_nibbles2, None),
        ];
3386
3387         let storage_address1 = B256::from([1u8; 32]);
        let storage_address2 = B256::from([2u8; 32]);
        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
3392 let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
3393 let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);
3394
3395 let storage_node1 = BranchNodeCompact::new(
3396 0b1111_0000_0000_0000,
3397 0b0000_0000_0000_0000,
3398 0b0000_0000_0000_0000,
3399 vec![],
3400 None,
3401 );
3402
3403 let storage_node2 = BranchNodeCompact::new(
3404 0b0000_1111_0000_0000,
3405 0b0000_0000_0000_0000,
3406 0b0000_0000_0000_0000,
3407 vec![],
3408 None,
3409 );
3410
3411 let storage_node1_old = BranchNodeCompact::new(
3413             0b1010_0000_0000_0000,
            0b0000_0000_0000_0000,
3415 0b0000_0000_0000_0000,
3416 vec![],
3417 None,
3418 );
3419
3420 {
3422 let mut cursor =
3423 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
3424 let entry = StorageTrieEntry {
3426 nibbles: StoredNibblesSubKey(storage_nibbles1),
3427 node: storage_node1_old.clone(),
3428 };
3429 cursor.upsert(storage_address1, &entry).unwrap();
3430 }
3431
3432 {
3434 let mut cursor =
3435 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
3436 let entry1 = StorageTrieEntry {
3438 nibbles: StoredNibblesSubKey(storage_nibbles1),
3439 node: storage_node1.clone(),
3440 };
3441 cursor.upsert(storage_address2, &entry1).unwrap();
3442 let entry3 = StorageTrieEntry {
3444 nibbles: StoredNibblesSubKey(storage_nibbles3),
3445 node: storage_node2.clone(),
3446 };
3447 cursor.upsert(storage_address2, &entry3).unwrap();
3448 }
3449
3450 let storage_trie1 = StorageTrieUpdatesSorted {
3452 is_deleted: false,
3453 storage_nodes: vec![
3454                 (storage_nibbles1, Some(storage_node1.clone())),
                (storage_nibbles2, None),
            ],
3457 };
3458
3459 let storage_trie2 = StorageTrieUpdatesSorted {
3461 is_deleted: true,
3462 storage_nodes: vec![
3463                 (storage_nibbles1, Some(storage_node1.clone())),
                (storage_nibbles2, Some(storage_node2.clone())),
            ],
3468 };
3469
3470 let mut storage_tries = B256Map::default();
3471 storage_tries.insert(storage_address1, storage_trie1);
3472 storage_tries.insert(storage_address2, storage_trie2);
3473
3474 let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
3475
3476 let num_written =
3478 provider_rw.write_trie_changesets(block_number, &trie_updates, None).unwrap();
3479
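        // Expected entries: 2 for the account trie changeset, 2 for storage_address1 (changed
        // paths only), and 3 for storage_address2 (wiped, so every pre-existing node is recorded).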
3480 assert_eq!(num_written, 7);
3487
3488 {
3490 let mut cursor =
3491 provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
3492
3493 let all_entries = cursor
3495 .walk_dup(Some(block_number), None)
3496 .unwrap()
3497 .collect::<Result<Vec<_>, _>>()
3498 .unwrap();
3499
3500 assert_eq!(
3502 all_entries,
3503 vec![
3504 (
3505 block_number,
3506 TrieChangeSetsEntry {
3507 nibbles: StoredNibblesSubKey(account_nibbles1),
3508 node: Some(node1),
3509 }
3510 ),
3511 (
3512 block_number,
3513 TrieChangeSetsEntry {
3514 nibbles: StoredNibblesSubKey(account_nibbles2),
3515 node: None,
3516 }
3517 ),
3518 ]
3519 );
3520 }
3521
3522 {
3524 let mut cursor =
3525 provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
3526
3527 let key1 = BlockNumberHashedAddress((block_number, storage_address1));
3529 let entries1 =
3530 cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3531
3532 assert_eq!(
3533 entries1,
3534 vec![
3535 (
3536 key1,
3537 TrieChangeSetsEntry {
3538 nibbles: StoredNibblesSubKey(storage_nibbles1),
3539                             node: Some(storage_node1_old),
                        }
3541 ),
3542 (
3543 key1,
3544 TrieChangeSetsEntry {
3545 nibbles: StoredNibblesSubKey(storage_nibbles2),
3546                             node: None,
                        }
3548 ),
3549 ]
3550 );
3551
3552 let key2 = BlockNumberHashedAddress((block_number, storage_address2));
3554 let entries2 =
3555 cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3556
3557 assert_eq!(
3558 entries2,
3559 vec![
3560 (
3561 key2,
3562 TrieChangeSetsEntry {
3563 nibbles: StoredNibblesSubKey(storage_nibbles1),
3564                             node: Some(storage_node1),
                        }
3566 ),
3567 (
3568 key2,
3569 TrieChangeSetsEntry {
3570 nibbles: StoredNibblesSubKey(storage_nibbles2),
3571                             node: None,
                        }
3573 ),
3574 (
3575 key2,
3576 TrieChangeSetsEntry {
3577 nibbles: StoredNibblesSubKey(storage_nibbles3),
3578                             node: Some(storage_node2),
                        }
3580 ),
3581 ]
3582 );
3583 }
3584
3585 provider_rw.commit().unwrap();
3586 }
3587
3588 #[test]
3589 fn test_write_trie_changesets_with_overlay() {
3590 use reth_db_api::models::BlockNumberHashedAddress;
3591 use reth_trie::BranchNodeCompact;
3592
3593 let factory = create_test_provider_factory();
3594 let provider_rw = factory.provider_rw().unwrap();
3595
3596 let block_number = 1u64;
3597
3598 let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
3600 let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
3601
3602 let node1 = BranchNodeCompact::new(
3603             0b1111_1111_1111_1111,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

3609
3610 let node1_old = BranchNodeCompact::new(
3615             0b1010_1010_1010_1010,
            0b0000_0000_0000_0000,
3617 0b0000_0000_0000_0000,
3618 vec![],
3619 None,
3620 );
3621
3622 let overlay_account_nodes = vec![
3624             (account_nibbles1, Some(node1_old.clone())),
        ];
3626
3627 let account_nodes = vec![
3629             (account_nibbles1, Some(node1)),
            (account_nibbles2, None),
        ];
3632
3633         let storage_address1 = B256::from([1u8; 32]);
        let storage_address2 = B256::from([2u8; 32]);
        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
3638 let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
3639 let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);
3640
3641 let storage_node1 = BranchNodeCompact::new(
3642 0b1111_0000_0000_0000,
3643 0b0000_0000_0000_0000,
3644 0b0000_0000_0000_0000,
3645 vec![],
3646 None,
3647 );
3648
3649 let storage_node2 = BranchNodeCompact::new(
3650 0b0000_1111_0000_0000,
3651 0b0000_0000_0000_0000,
3652 0b0000_0000_0000_0000,
3653 vec![],
3654 None,
3655 );
3656
3657 let storage_node1_old = BranchNodeCompact::new(
3659             0b1010_0000_0000_0000,
            0b0000_0000_0000_0000,
3661 0b0000_0000_0000_0000,
3662 vec![],
3663 None,
3664 );
3665
3666 let mut overlay_storage_tries = B256Map::default();
3668
3669 let overlay_storage_trie1 = StorageTrieUpdatesSorted {
3671 is_deleted: false,
3672 storage_nodes: vec![
3673                 (storage_nibbles1, Some(storage_node1_old.clone())),
            ],
3676 };
3677
3678 let overlay_storage_trie2 = StorageTrieUpdatesSorted {
3680 is_deleted: false,
3681 storage_nodes: vec![
3682                 (storage_nibbles1, Some(storage_node1.clone())),
                (storage_nibbles3, Some(storage_node2.clone())),
            ],
3685 };
3686
3687 overlay_storage_tries.insert(storage_address1, overlay_storage_trie1);
3688 overlay_storage_tries.insert(storage_address2, overlay_storage_trie2);
3689
3690 let overlay = TrieUpdatesSorted::new(overlay_account_nodes, overlay_storage_tries);
3691
3692 let storage_trie1 = StorageTrieUpdatesSorted {
3694 is_deleted: false,
3695 storage_nodes: vec![
3696                 (storage_nibbles1, Some(storage_node1.clone())),
                (storage_nibbles2, None),
            ],
3699 };
3700
3701 let storage_trie2 = StorageTrieUpdatesSorted {
3703 is_deleted: true,
3704 storage_nodes: vec![
3705                 (storage_nibbles1, Some(storage_node1.clone())),
                (storage_nibbles2, Some(storage_node2.clone())),
            ],
3711 };
3712
3713 let mut storage_tries = B256Map::default();
3714 storage_tries.insert(storage_address1, storage_trie1);
3715 storage_tries.insert(storage_address2, storage_trie2);
3716
3717 let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
3718
3719 let num_written =
3721 provider_rw.write_trie_changesets(block_number, &trie_updates, Some(&overlay)).unwrap();
3722
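        // Expected entries: 2 for the account trie changeset, 2 for storage_address1, and 3 for
        // storage_address2 (wiped, so the union of its changed paths and the overlay nodes is
        // recorded).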
3723 assert_eq!(num_written, 7);
3730
3731 {
3733 let mut cursor =
3734 provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
3735
3736 let all_entries = cursor
3738 .walk_dup(Some(block_number), None)
3739 .unwrap()
3740 .collect::<Result<Vec<_>, _>>()
3741 .unwrap();
3742
3743 assert_eq!(
3745 all_entries,
3746 vec![
3747 (
3748 block_number,
3749 TrieChangeSetsEntry {
3750 nibbles: StoredNibblesSubKey(account_nibbles1),
3751                             node: Some(node1_old),
                        }
3753 ),
3754 (
3755 block_number,
3756 TrieChangeSetsEntry {
3757 nibbles: StoredNibblesSubKey(account_nibbles2),
3758 node: None,
3759 }
3760 ),
3761 ]
3762 );
3763 }
3764
3765 {
3767 let mut cursor =
3768 provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
3769
3770 let key1 = BlockNumberHashedAddress((block_number, storage_address1));
3772 let entries1 =
3773 cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3774
3775 assert_eq!(
3776 entries1,
3777 vec![
3778 (
3779 key1,
3780 TrieChangeSetsEntry {
3781 nibbles: StoredNibblesSubKey(storage_nibbles1),
3782                             node: Some(storage_node1_old),
                        }
3784 ),
3785 (
3786 key1,
3787 TrieChangeSetsEntry {
3788 nibbles: StoredNibblesSubKey(storage_nibbles2),
3789                             node: None,
                        }
3791 ),
3792 ]
3793 );
3794
3795 let key2 = BlockNumberHashedAddress((block_number, storage_address2));
3797 let entries2 =
3798 cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3799
3800 assert_eq!(
3801 entries2,
3802 vec![
3803 (
3804 key2,
3805 TrieChangeSetsEntry {
3806 nibbles: StoredNibblesSubKey(storage_nibbles1),
3807                             node: Some(storage_node1),
                        }
3809 ),
3810 (
3811 key2,
3812 TrieChangeSetsEntry {
3813 nibbles: StoredNibblesSubKey(storage_nibbles2),
3814                             node: None,
                        }
3816 ),
3817 (
3818 key2,
3819 TrieChangeSetsEntry {
3820 nibbles: StoredNibblesSubKey(storage_nibbles3),
3821                             node: Some(storage_node2),
                        }
3824 ),
3825 ]
3826 );
3827 }
3828
3829 provider_rw.commit().unwrap();
3830 }
3831
3832 #[test]
3833 fn test_clear_trie_changesets_from() {
3834 use alloy_primitives::hex_literal::hex;
3835 use reth_db_api::models::BlockNumberHashedAddress;
3836 use reth_trie::{BranchNodeCompact, StoredNibblesSubKey, TrieChangeSetsEntry};
3837
3838 let factory = create_test_provider_factory();
3839
3840 let block1 = 100u64;
3842 let block2 = 101u64;
3843 let block3 = 102u64;
3844 let block4 = 103u64;
3845 let block5 = 104u64;
3846
3847 let storage_address1 =
3849 B256::from(hex!("1111111111111111111111111111111111111111111111111111111111111111"));
3850 let storage_address2 =
3851 B256::from(hex!("2222222222222222222222222222222222222222222222222222222222222222"));
3852
3853 let nibbles1 = StoredNibblesSubKey(Nibbles::from_nibbles([0x1, 0x2, 0x3]));
3855 let nibbles2 = StoredNibblesSubKey(Nibbles::from_nibbles([0x4, 0x5, 0x6]));
3856 let nibbles3 = StoredNibblesSubKey(Nibbles::from_nibbles([0x7, 0x8, 0x9]));
3857
3858 let node1 = BranchNodeCompact::new(
3860 0b1111_1111_1111_1111,
3861 0b1111_1111_1111_1111,
3862 0b0000_0000_0000_0001,
3863 vec![B256::from(hex!(
3864 "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
3865 ))],
3866 None,
3867 );
3868 let node2 = BranchNodeCompact::new(
3869 0b1111_1111_1111_1110,
3870 0b1111_1111_1111_1110,
3871 0b0000_0000_0000_0010,
3872 vec![B256::from(hex!(
3873 "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
3874 ))],
3875 Some(B256::from(hex!(
3876 "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
3877 ))),
3878 );
3879
3880 {
3882 let provider_rw = factory.provider_rw().unwrap();
3883 let mut cursor =
3884 provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
3885
3886 cursor
3888 .upsert(
3889 block1,
3890 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
3891 )
3892 .unwrap();
3893 cursor
3894 .upsert(block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
3895 .unwrap();
3896
3897 cursor
3899 .upsert(
3900 block2,
3901 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
3902 )
3903 .unwrap();
3904 cursor
3905 .upsert(
3906 block2,
3907 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
3908 )
3909                 .unwrap();
            cursor
3911 .upsert(block2, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
3912 .unwrap();
3913
3914 cursor
3916 .upsert(
3917 block3,
3918 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
3919 )
3920 .unwrap();
3921 cursor
3922 .upsert(
3923 block3,
3924 &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node2.clone()) },
3925 )
3926 .unwrap();
3927
3928 cursor
3930 .upsert(block4, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
3931 .unwrap();
3932
3933 cursor
3935 .upsert(
3936 block5,
3937 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
3938 )
3939 .unwrap();
3940 cursor
3941 .upsert(block5, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
3942 .unwrap();
3943
3944 provider_rw.commit().unwrap();
3945 }
3946
3947 {
3949 let provider_rw = factory.provider_rw().unwrap();
3950 let mut cursor =
3951 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
3952
3953 let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
3955 cursor
3956 .upsert(
3957 key1_block1,
3958 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
3959 )
3960 .unwrap();
3961 cursor
3962 .upsert(key1_block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
3963 .unwrap();
3964
3965 let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
3968 cursor
3969 .upsert(
3970 key1_block2,
3971 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
3972 )
3973 .unwrap();
3974 cursor
3975 .upsert(key1_block2, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
3976                 .unwrap();
            cursor
3978 .upsert(
3979 key1_block2,
3980 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
3981 )
3982 .unwrap();
3983
3984 let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
3986 cursor
3987 .upsert(
3988 key2_block3,
3989 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
3990 )
3991 .unwrap();
3992 cursor
3993 .upsert(key2_block3, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
3994 .unwrap();
3995
3996 let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
3998 cursor
3999 .upsert(
4000 key1_block4,
4001 &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node1) },
4002 )
4003 .unwrap();
4004 cursor
4005 .upsert(
4006 key1_block4,
4007 &TrieChangeSetsEntry { nibbles: nibbles3, node: Some(node2.clone()) },
4008 )
4009                 .unwrap();

            let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4013 cursor
4014 .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles1, node: None })
4015 .unwrap();
4016 cursor
4017 .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles2, node: Some(node2) })
4018 .unwrap();
4019
4020 provider_rw.commit().unwrap();
4021 }
4022
4023 {
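            // Clear all trie changesets from block2 (101) onwards; block1 (100) must remain.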
4025 let provider_rw = factory.provider_rw().unwrap();
4026 provider_rw.clear_trie_changesets_from(block2).unwrap();
4027 provider_rw.commit().unwrap();
4028 }
4029
4030 {
4032 let provider = factory.provider().unwrap();
4033 let mut cursor =
4034 provider.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
4035
4036 let block1_entries = cursor
4038 .walk_dup(Some(block1), None)
4039 .unwrap()
4040 .collect::<Result<Vec<_>, _>>()
4041 .unwrap();
4042 assert_eq!(block1_entries.len(), 2, "Block 100 entries should be preserved");
4043 assert_eq!(block1_entries[0].0, block1);
4044 assert_eq!(block1_entries[1].0, block1);
4045
4046 let block2_entries = cursor
4048 .walk_dup(Some(block2), None)
4049 .unwrap()
4050 .collect::<Result<Vec<_>, _>>()
4051 .unwrap();
4052 assert!(block2_entries.is_empty(), "Block 101 entries should be deleted");
4053
4054 let block3_entries = cursor
4055 .walk_dup(Some(block3), None)
4056 .unwrap()
4057 .collect::<Result<Vec<_>, _>>()
4058 .unwrap();
4059 assert!(block3_entries.is_empty(), "Block 102 entries should be deleted");
4060
4061 let block4_entries = cursor
4062 .walk_dup(Some(block4), None)
4063 .unwrap()
4064 .collect::<Result<Vec<_>, _>>()
4065 .unwrap();
4066 assert!(block4_entries.is_empty(), "Block 103 entries should be deleted");
4067
4068 let block5_entries = cursor
4070 .walk_dup(Some(block5), None)
4071 .unwrap()
4072 .collect::<Result<Vec<_>, _>>()
4073 .unwrap();
4074 assert!(block5_entries.is_empty(), "Block 104 entries should be deleted");
4075 }
4076
4077 {
            let provider = factory.provider().unwrap();
            let mut cursor =
                provider.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();

            let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
            let block1_entries = cursor
                .walk_dup(Some(key1_block1), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(block1_entries.len(), 2, "Block 100 storage entries should be preserved");

            let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
            let block2_entries = cursor
                .walk_dup(Some(key1_block2), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert!(block2_entries.is_empty(), "Block 101 storage entries should be deleted");

            let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
            let block3_entries = cursor
                .walk_dup(Some(key2_block3), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert!(block3_entries.is_empty(), "Block 102 storage entries should be deleted");

            let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
            let block4_entries = cursor
                .walk_dup(Some(key1_block4), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert!(block4_entries.is_empty(), "Block 103 storage entries should be deleted");

            let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
            let block5_entries = cursor
                .walk_dup(Some(key2_block5), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert!(block5_entries.is_empty(), "Block 104 storage entries should be deleted");
        }
    }

    #[test]
    fn test_write_trie_updates_sorted() {
        use reth_trie::{
            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
            BranchNodeCompact, StorageTrieEntry,
        };

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        {
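            // Seed the accounts trie with one node to update and one node to delete.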
            let tx = provider_rw.tx_ref();
            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();

            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
            cursor
                .upsert(
                    to_delete,
                    &BranchNodeCompact::new(
                        0b1010_1010_1010_1010,
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    ),
                )
                .unwrap();

            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
            cursor
                .upsert(
                    to_update,
                    &BranchNodeCompact::new(
                        0b0101_0101_0101_0101,
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    ),
                )
                .unwrap();
        }

        let storage_address1 = B256::from([1u8; 32]);
        let storage_address2 = B256::from([2u8; 32]);
        {
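            // Seed the storages trie with pre-existing entries for both storage addresses.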
            let tx = provider_rw.tx_ref();
            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();

            storage_cursor
                .upsert(
                    storage_address1,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
                        node: BranchNodeCompact::new(
                            0b0011_0011_0011_0011,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();

            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
                        node: BranchNodeCompact::new(
                            0b1100_1100_1100_1100,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
                        node: BranchNodeCompact::new(
                            0b0011_1100_0011_1100,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
        }

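        // Account node updates: update [0x1, 0x2], delete [0x3, 0x4], insert [0x5, 0x6].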
        let account_nodes = vec![
            (
                Nibbles::from_nibbles([0x1, 0x2]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111,
                    0b0000_0000_0000_0000,
                    0b0000_0000_0000_0000,
                    vec![],
                    None,
                )),
            ),
            (Nibbles::from_nibbles([0x3, 0x4]), None),
            (
                Nibbles::from_nibbles([0x5, 0x6]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111,
                    0b0000_0000_0000_0000,
                    0b0000_0000_0000_0000,
                    vec![],
                    None,
                )),
            ),
        ];

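        // Storage updates: address1 updates one node and deletes another; address2 is wiped.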
        let storage_trie1 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (
                    Nibbles::from_nibbles([0x1, 0x0]),
                    Some(BranchNodeCompact::new(
                        0b1111_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    )),
                ),
                (Nibbles::from_nibbles([0x2, 0x0]), None),
            ],
        };

        let storage_trie2 = StorageTrieUpdatesSorted {
            is_deleted: true,
            storage_nodes: vec![],
        };

        let mut storage_tries = B256Map::default();
        storage_tries.insert(storage_address1, storage_trie1);
        storage_tries.insert(storage_address2, storage_trie2);

        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

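        // Write the sorted updates: 3 account nodes + 2 storage nodes = 5 entries.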
        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();

        assert_eq!(num_entries, 5);

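        // Verify the accounts trie reflects the update, the deletion, and the insertion.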
        let tx = provider_rw.tx_ref();
        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();

        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
        let entry1 = cursor.seek_exact(nibbles1).unwrap();
        assert!(entry1.is_some(), "Updated account node should exist");
        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
        assert_eq!(
            entry1.unwrap().1.state_mask,
            expected_mask,
            "Account node should have updated state_mask"
        );

        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
        let entry2 = cursor.seek_exact(nibbles2).unwrap();
        assert!(entry2.is_none(), "Deleted account node should not exist");

        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
        let entry3 = cursor.seek_exact(nibbles3).unwrap();
        assert!(entry3.is_some(), "New account node should exist");

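        // Verify the storage tries: address1 keeps a single entry, address2 was wiped.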
        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();

        let storage_entries1: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address1), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            storage_entries1.len(),
            1,
            "Storage address1 should have 1 entry after deletion"
        );
        assert_eq!(
            storage_entries1[0].1.nibbles.0,
            Nibbles::from_nibbles([0x1, 0x0]),
            "Remaining entry should be [0x1, 0x0]"
        );

        let storage_entries2: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address2), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");

        provider_rw.commit().unwrap();
    }

    #[test]
    fn test_get_block_trie_updates() {
        use reth_db_api::models::BlockNumberHashedAddress;
        use reth_trie::{BranchNodeCompact, StorageTrieEntry};

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        let target_block = 2u64;
        let next_block = 3u64;

        let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
        let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
        let account_nibbles3 = Nibbles::from_nibbles([0x9, 0xa, 0xb, 0xc]);

        let node1 = BranchNodeCompact::new(
            0b1111_1111_0000_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        let node2 = BranchNodeCompact::new(
            0b0000_0000_1111_1111,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        let node3 = BranchNodeCompact::new(
            0b1010_1010_1010_1010,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        {
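            // Seed the current accounts trie: nibbles1 -> node1, nibbles2 -> node2.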
            let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
            cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
            cursor.insert(StoredNibbles(account_nibbles2), &node2).unwrap();
        }

        {
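            // Changesets at target_block: nibbles1 had a prior node, nibbles2 was created.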
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
            cursor
                .append_dup(
                    target_block,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(account_nibbles1),
                        node: Some(BranchNodeCompact::new(
                            0b1111_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        )),
                    },
                )
                .unwrap();
            cursor
                .append_dup(
                    target_block,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(account_nibbles2),
                        node: None,
                    },
                )
                .unwrap();
        }

        {
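            // Changeset at next_block only: nibbles3 changed after the target block, not in it.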
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
            cursor
                .append_dup(
                    next_block,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(account_nibbles3),
                        node: Some(node3),
                    },
                )
                .unwrap();
        }

        let storage_address1 = B256::from([1u8; 32]);
        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
        let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);

        let storage_node1 = BranchNodeCompact::new(
            0b1111_1111_1111_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        let storage_node2 = BranchNodeCompact::new(
            0b0101_0101_0101_0101,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        {
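            // Seed the current storages trie: storage_nibbles1 -> storage_node1.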
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
            cursor
                .upsert(
                    storage_address1,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(storage_nibbles1),
                        node: storage_node1.clone(),
                    },
                )
                .unwrap();
        }

        {
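            // Storage changesets at target_block for storage_address1.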
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
            let key = BlockNumberHashedAddress((target_block, storage_address1));

            cursor
                .append_dup(
                    key,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(storage_nibbles1),
                        node: Some(BranchNodeCompact::new(
                            0b0000_0000_1111_1111,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        )),
                    },
                )
                .unwrap();

            cursor
                .append_dup(
                    key,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(storage_nibbles2),
                        node: None,
                    },
                )
                .unwrap();
        }

        {
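            // Storage changeset at next_block: storage_nibbles2 changes after the target block.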
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
            let key = BlockNumberHashedAddress((next_block, storage_address1));

            cursor
                .append_dup(
                    key,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(storage_nibbles2),
                        node: Some(BranchNodeCompact::new(
                            0b0101_0101_0101_0101,
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        )),
                    },
                )
                .unwrap();
        }

        provider_rw.commit().unwrap();

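        // Fetch the trie updates that belong to the target block.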
        let provider = factory.provider().unwrap();
        let result = provider.get_block_trie_updates(target_block).unwrap();

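        // Account updates for the target block: nibbles1 and nibbles2 only.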
        assert_eq!(result.account_nodes_ref().len(), 2, "Should have 2 account trie updates");

        let nibbles1_update = result
            .account_nodes_ref()
            .iter()
            .find(|(n, _)| n == &account_nibbles1)
            .expect("Should find nibbles1");
        assert!(nibbles1_update.1.is_some(), "nibbles1 should have a value");
        assert_eq!(
            nibbles1_update.1.as_ref().unwrap().state_mask,
            node1.state_mask,
            "nibbles1 should have current value"
        );

        let nibbles2_update = result
            .account_nodes_ref()
            .iter()
            .find(|(n, _)| n == &account_nibbles2)
            .expect("Should find nibbles2");
        assert!(nibbles2_update.1.is_some(), "nibbles2 should have a value");
        assert_eq!(
            nibbles2_update.1.as_ref().unwrap().state_mask,
            node2.state_mask,
            "nibbles2 should have current value"
        );

        assert!(
            !result.account_nodes_ref().iter().any(|(n, _)| n == &account_nibbles3),
            "nibbles3 should not be in target_block updates"
        );

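        // Storage updates for the target block: both nibbles under storage_address1.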
        assert_eq!(result.storage_tries_ref().len(), 1, "Should have 1 storage trie");
        let storage_updates = result
            .storage_tries_ref()
            .get(&storage_address1)
            .expect("Should have storage updates for address1");

        assert_eq!(storage_updates.storage_nodes.len(), 2, "Should have 2 storage node updates");

        let storage1_update = storage_updates
            .storage_nodes
            .iter()
            .find(|(n, _)| n == &storage_nibbles1)
            .expect("Should find storage_nibbles1");
        assert!(storage1_update.1.is_some(), "storage_nibbles1 should have a value");
        assert_eq!(
            storage1_update.1.as_ref().unwrap().state_mask,
            storage_node1.state_mask,
            "storage_nibbles1 should have current value"
        );

        let storage2_update = storage_updates
            .storage_nodes
            .iter()
            .find(|(n, _)| n == &storage_nibbles2)
            .expect("Should find storage_nibbles2");
        assert!(
            storage2_update.1.is_some(),
            "storage_nibbles2 should have a value (the node that will be deleted in next block)"
        );
        assert_eq!(
            storage2_update.1.as_ref().unwrap().state_mask,
            storage_node2.state_mask,
            "storage_nibbles2 should have the value that was created and will be deleted"
        );
    }
}