1use crate::{
2 changesets_utils::{
3 storage_trie_wiped_changeset_iter, StorageRevertsIter, StorageTrieCurrentValuesIter,
4 },
5 providers::{
6 database::{chain::ChainStorage, metrics},
7 rocksdb::RocksDBProvider,
8 static_file::StaticFileWriter,
9 NodeTypesForProvider, StaticFileProvider,
10 },
11 to_range,
12 traits::{
13 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
14 },
15 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
16 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
17 DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
18 HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
19 LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
20 PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, RocksDBProviderFactory,
21 StageCheckpointReader, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader,
22 StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
23 TransactionsProviderExt, TrieReader, TrieWriter,
24};
25use alloy_consensus::{
26 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
27 BlockHeader, TxReceipt,
28};
29use alloy_eips::BlockHashOrNumber;
30use alloy_primitives::{
31 keccak256,
32 map::{hash_map, B256Map, HashMap, HashSet},
33 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
34};
35use itertools::Itertools;
36use parking_lot::RwLock;
37use rayon::slice::ParallelSliceMut;
38use reth_chain_state::ExecutedBlock;
39use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
40use reth_db_api::{
41 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
42 database::Database,
43 models::{
44 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
45 BlockNumberHashedAddress, ShardedKey, StorageSettings, StoredBlockBodyIndices,
46 },
47 table::Table,
48 tables,
49 transaction::{DbTx, DbTxMut},
50 BlockNumberList, PlainAccountState, PlainStorageState,
51};
52use reth_execution_types::{Chain, ExecutionOutcome};
53use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
54use reth_primitives_traits::{
55 Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58 PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63 BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64 NodePrimitivesProvider, StateProvider, StorageChangeSetReader, StorageSettingsCache,
65 TryIntoHistoricalStateProvider,
66};
67use reth_storage_errors::provider::ProviderResult;
68use reth_trie::{
69 trie_cursor::{
70 InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory,
71 TrieCursorIter,
72 },
73 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
74 HashedPostStateSorted, StoredNibbles, StoredNibblesSubKey, TrieChangeSetsEntry,
75};
76use reth_trie_db::{
77 DatabaseAccountTrieCursor, DatabaseStorageTrieCursor, DatabaseTrieCursorFactory,
78};
79use revm_database::states::{
80 PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
81};
82use std::{
83 cmp::Ordering,
84 collections::{BTreeMap, BTreeSet},
85 fmt::Debug,
86 ops::{Deref, DerefMut, Range, RangeBounds, RangeFrom, RangeInclusive},
87 sync::Arc,
88};
89use tracing::{debug, trace};
90
91pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
93
94#[derive(Debug)]
99pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
100 pub DatabaseProvider<<DB as Database>::TXMut, N>,
101);
102
103impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
104 type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
105
106 fn deref(&self) -> &Self::Target {
107 &self.0
108 }
109}
110
111impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
112 fn deref_mut(&mut self) -> &mut Self::Target {
113 &mut self.0
114 }
115}
116
117impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
118 for DatabaseProviderRW<DB, N>
119{
120 fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
121 &self.0
122 }
123}
124
125impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
126 pub fn commit(self) -> ProviderResult<bool> {
128 self.0.commit()
129 }
130
131 pub fn into_tx(self) -> <DB as Database>::TXMut {
133 self.0.into_tx()
134 }
135
136 #[cfg(any(test, feature = "test-utils"))]
138 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
139 self.0.minimum_pruning_distance = distance;
140 self
141 }
142}
143
144impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
145 for DatabaseProvider<<DB as Database>::TXMut, N>
146{
147 fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
148 provider.0
149 }
150}
151
152pub struct DatabaseProvider<TX, N: NodeTypes> {
155 tx: TX,
157 chain_spec: Arc<N::ChainSpec>,
159 static_file_provider: StaticFileProvider<N::Primitives>,
161 prune_modes: PruneModes,
163 storage: Arc<N::Storage>,
165 storage_settings: Arc<RwLock<StorageSettings>>,
167 rocksdb_provider: RocksDBProvider,
169 #[cfg(all(unix, feature = "rocksdb"))]
171 pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
172 minimum_pruning_distance: u64,
174}
175
176impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 let mut s = f.debug_struct("DatabaseProvider");
179 s.field("tx", &self.tx)
180 .field("chain_spec", &self.chain_spec)
181 .field("static_file_provider", &self.static_file_provider)
182 .field("prune_modes", &self.prune_modes)
183 .field("storage", &self.storage)
184 .field("storage_settings", &self.storage_settings)
185 .field("rocksdb_provider", &self.rocksdb_provider);
186 #[cfg(all(unix, feature = "rocksdb"))]
187 s.field("pending_rocksdb_batches", &"<pending batches>");
188 s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
189 }
190}
191
192impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
193 pub const fn prune_modes_ref(&self) -> &PruneModes {
195 &self.prune_modes
196 }
197}
198
199impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
200 pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
202 trace!(target: "providers::db", "Returning latest state provider");
203 Box::new(LatestStateProviderRef::new(self))
204 }
205
206 pub fn history_by_block_hash<'a>(
208 &'a self,
209 block_hash: BlockHash,
210 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
211 let mut block_number =
212 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
213 if block_number == self.best_block_number().unwrap_or_default() &&
214 block_number == self.last_block_number().unwrap_or_default()
215 {
216 return Ok(Box::new(LatestStateProviderRef::new(self)))
217 }
218
219 block_number += 1;
221
222 let account_history_prune_checkpoint =
223 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
224 let storage_history_prune_checkpoint =
225 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
226
227 let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
228
229 if let Some(prune_checkpoint_block_number) =
232 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
233 {
234 state_provider = state_provider.with_lowest_available_account_history_block_number(
235 prune_checkpoint_block_number + 1,
236 );
237 }
238 if let Some(prune_checkpoint_block_number) =
239 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
240 {
241 state_provider = state_provider.with_lowest_available_storage_history_block_number(
242 prune_checkpoint_block_number + 1,
243 );
244 }
245
246 Ok(Box::new(state_provider))
247 }
248
249 #[cfg(feature = "test-utils")]
250 pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
252 self.prune_modes = prune_modes;
253 }
254}
255
256impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
257 type Primitives = N::Primitives;
258}
259
260impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
261 fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
263 self.static_file_provider.clone()
264 }
265
266 fn get_static_file_writer(
267 &self,
268 block: BlockNumber,
269 segment: StaticFileSegment,
270 ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
271 self.static_file_provider.get_writer(block, segment)
272 }
273}
274
275impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
276 fn rocksdb_provider(&self) -> RocksDBProvider {
278 self.rocksdb_provider.clone()
279 }
280
281 #[cfg(all(unix, feature = "rocksdb"))]
282 fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
283 self.pending_rocksdb_batches.lock().push(batch);
284 }
285}
286
287impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
288 for DatabaseProvider<TX, N>
289{
290 type ChainSpec = N::ChainSpec;
291
292 fn chain_spec(&self) -> Arc<Self::ChainSpec> {
293 self.chain_spec.clone()
294 }
295}
296
297impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
298 pub const fn new_rw(
300 tx: TX,
301 chain_spec: Arc<N::ChainSpec>,
302 static_file_provider: StaticFileProvider<N::Primitives>,
303 prune_modes: PruneModes,
304 storage: Arc<N::Storage>,
305 storage_settings: Arc<RwLock<StorageSettings>>,
306 rocksdb_provider: RocksDBProvider,
307 ) -> Self {
308 Self {
309 tx,
310 chain_spec,
311 static_file_provider,
312 prune_modes,
313 storage,
314 storage_settings,
315 rocksdb_provider,
316 #[cfg(all(unix, feature = "rocksdb"))]
317 pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
318 minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
319 }
320 }
321}
322
323impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
324 fn as_ref(&self) -> &Self {
325 self
326 }
327}
328
329impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
330 pub fn save_blocks(&self, blocks: Vec<ExecutedBlock<N::Primitives>>) -> ProviderResult<()> {
332 if blocks.is_empty() {
333 debug!(target: "providers::db", "Attempted to write empty block range");
334 return Ok(())
335 }
336
337 let first_block = blocks.first().unwrap().recovered_block();
339
340 let last_block = blocks.last().unwrap().recovered_block();
341 let first_number = first_block.number();
342 let last_block_number = last_block.number();
343
344 debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage");
345
346 for block in blocks {
356 let trie_data = block.trie_data();
357 let ExecutedBlock { recovered_block, execution_output, .. } = block;
358 let block_number = recovered_block.number();
359 self.insert_block(&recovered_block)?;
360
361 self.write_state(&execution_output, OriginalValuesKnown::No)?;
364
365 self.write_hashed_state(&trie_data.hashed_state)?;
367
368 self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?;
369 self.write_trie_updates_sorted(&trie_data.trie_updates)?;
370 }
371
372 self.update_history_indices(first_number..=last_block_number)?;
374
375 self.update_pipeline_stages(last_block_number, false)?;
377
378 debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
379
380 Ok(())
381 }
382
383 pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
388 let changed_accounts = self
389 .tx
390 .cursor_read::<tables::AccountChangeSets>()?
391 .walk_range(from..)?
392 .collect::<Result<Vec<_>, _>>()?;
393
394 self.unwind_account_hashing(changed_accounts.iter())?;
396
397 self.unwind_account_history_indices(changed_accounts.iter())?;
399
400 let storage_start = BlockNumberAddress((from, Address::ZERO));
401 let changed_storages = self
402 .tx
403 .cursor_read::<tables::StorageChangeSets>()?
404 .walk_range(storage_start..)?
405 .collect::<Result<Vec<_>, _>>()?;
406
407 self.unwind_storage_hashing(changed_storages.iter().copied())?;
409
410 self.unwind_storage_history_indices(changed_storages.iter().copied())?;
412
413 let trie_revert = self.trie_reverts(from)?;
415 self.write_trie_updates_sorted(&trie_revert)?;
416
417 self.clear_trie_changesets_from(from)?;
419
420 Ok(())
421 }
422
423 fn remove_receipts_from(
425 &self,
426 from_tx: TxNumber,
427 last_block: BlockNumber,
428 ) -> ProviderResult<()> {
429 self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
431
432 if EitherWriter::receipts_destination(self).is_static_file() {
433 let static_file_receipt_num =
434 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
435
436 let to_delete = static_file_receipt_num
437 .map(|static_num| (static_num + 1).saturating_sub(from_tx))
438 .unwrap_or_default();
439
440 self.static_file_provider
441 .latest_writer(StaticFileSegment::Receipts)?
442 .prune_receipts(to_delete, last_block)?;
443 }
444
445 Ok(())
446 }
447}
448
449impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
450 fn try_into_history_at_block(
451 self,
452 mut block_number: BlockNumber,
453 ) -> ProviderResult<StateProviderBox> {
454 if block_number == self.best_block_number().unwrap_or_default() {
457 return Ok(Box::new(LatestStateProvider::new(self)))
458 }
459
460 block_number += 1;
462
463 let account_history_prune_checkpoint =
464 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
465 let storage_history_prune_checkpoint =
466 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
467
468 let mut state_provider = HistoricalStateProvider::new(self, block_number);
469
470 if let Some(prune_checkpoint_block_number) =
473 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
474 {
475 state_provider = state_provider.with_lowest_available_account_history_block_number(
476 prune_checkpoint_block_number + 1,
477 );
478 }
479 if let Some(prune_checkpoint_block_number) =
480 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
481 {
482 state_provider = state_provider.with_lowest_available_storage_history_block_number(
483 prune_checkpoint_block_number + 1,
484 );
485 }
486
487 Ok(Box::new(state_provider))
488 }
489}
490
491fn unwind_history_shards<S, T, C>(
506 cursor: &mut C,
507 start_key: T::Key,
508 block_number: BlockNumber,
509 mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
510) -> ProviderResult<Vec<u64>>
511where
512 T: Table<Value = BlockNumberList>,
513 T::Key: AsRef<ShardedKey<S>>,
514 C: DbCursorRO<T> + DbCursorRW<T>,
515{
516 let mut item = cursor.seek_exact(start_key)?;
518 while let Some((sharded_key, list)) = item {
519 if !shard_belongs_to_key(&sharded_key) {
521 break
522 }
523
524 cursor.delete_current()?;
527
528 let first = list.iter().next().expect("List can't be empty");
531
532 if first >= block_number {
535 item = cursor.prev()?;
536 continue
537 }
538 else if block_number <= sharded_key.as_ref().highest_block_number {
541 return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
544 }
545 return Ok(list.iter().collect::<Vec<_>>())
548 }
549
550 Ok(Vec::new())
552}
553
554impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
555 pub const fn new(
557 tx: TX,
558 chain_spec: Arc<N::ChainSpec>,
559 static_file_provider: StaticFileProvider<N::Primitives>,
560 prune_modes: PruneModes,
561 storage: Arc<N::Storage>,
562 storage_settings: Arc<RwLock<StorageSettings>>,
563 rocksdb_provider: RocksDBProvider,
564 ) -> Self {
565 Self {
566 tx,
567 chain_spec,
568 static_file_provider,
569 prune_modes,
570 storage,
571 storage_settings,
572 rocksdb_provider,
573 #[cfg(all(unix, feature = "rocksdb"))]
574 pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
575 minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
576 }
577 }
578
579 pub fn into_tx(self) -> TX {
581 self.tx
582 }
583
584 pub const fn tx_mut(&mut self) -> &mut TX {
586 &mut self.tx
587 }
588
589 pub const fn tx_ref(&self) -> &TX {
591 &self.tx
592 }
593
594 pub fn chain_spec(&self) -> &N::ChainSpec {
596 &self.chain_spec
597 }
598}
599
600impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
601 fn recovered_block<H, HF, B, BF>(
602 &self,
603 id: BlockHashOrNumber,
604 _transaction_kind: TransactionVariant,
605 header_by_number: HF,
606 construct_block: BF,
607 ) -> ProviderResult<Option<B>>
608 where
609 H: AsRef<HeaderTy<N>>,
610 HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
611 BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
612 {
613 let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
614 let Some(header) = header_by_number(block_number)? else { return Ok(None) };
615
616 let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
623
624 let tx_range = body.tx_num_range();
625
626 let (transactions, senders) = if tx_range.is_empty() {
627 (vec![], vec![])
628 } else {
629 (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
630 };
631
632 let body = self
633 .storage
634 .reader()
635 .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
636 .pop()
637 .ok_or(ProviderError::InvalidStorageOutput)?;
638
639 construct_block(header, body, senders)
640 }
641
642 fn block_range<F, H, HF, R>(
652 &self,
653 range: RangeInclusive<BlockNumber>,
654 headers_range: HF,
655 mut assemble_block: F,
656 ) -> ProviderResult<Vec<R>>
657 where
658 H: AsRef<HeaderTy<N>>,
659 HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
660 F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
661 {
662 if range.is_empty() {
663 return Ok(Vec::new())
664 }
665
666 let len = range.end().saturating_sub(*range.start()) as usize;
667 let mut blocks = Vec::with_capacity(len);
668
669 let headers = headers_range(range.clone())?;
670
671 let present_headers = self
677 .block_body_indices_range(range)?
678 .into_iter()
679 .map(|b| b.tx_num_range())
680 .zip(headers)
681 .collect::<Vec<_>>();
682
683 let mut inputs = Vec::new();
684 for (tx_range, header) in &present_headers {
685 let transactions = if tx_range.is_empty() {
686 Vec::new()
687 } else {
688 self.transactions_by_tx_range(tx_range.clone())?
689 };
690
691 inputs.push((header.as_ref(), transactions));
692 }
693
694 let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
695
696 for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
697 blocks.push(assemble_block(header, body, tx_range)?);
698 }
699
700 Ok(blocks)
701 }
702
703 fn block_with_senders_range<H, HF, B, BF>(
714 &self,
715 range: RangeInclusive<BlockNumber>,
716 headers_range: HF,
717 assemble_block: BF,
718 ) -> ProviderResult<Vec<B>>
719 where
720 H: AsRef<HeaderTy<N>>,
721 HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
722 BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
723 {
724 self.block_range(range, headers_range, |header, body, tx_range| {
725 let senders = if tx_range.is_empty() {
726 Vec::new()
727 } else {
728 let known_senders: HashMap<TxNumber, Address> =
729 EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
730
731 let mut senders = Vec::with_capacity(body.transactions().len());
732 for (tx_num, tx) in tx_range.zip(body.transactions()) {
733 match known_senders.get(&tx_num) {
734 None => {
735 let sender = tx.recover_signer_unchecked()?;
737 senders.push(sender);
738 }
739 Some(sender) => senders.push(*sender),
740 }
741 }
742
743 senders
744 };
745
746 assemble_block(header, body, senders)
747 })
748 }
749
750 fn populate_bundle_state<A, S>(
754 &self,
755 account_changeset: Vec<(u64, AccountBeforeTx)>,
756 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
757 plain_accounts_cursor: &mut A,
758 plain_storage_cursor: &mut S,
759 ) -> ProviderResult<(BundleStateInit, RevertsInit)>
760 where
761 A: DbCursorRO<PlainAccountState>,
762 S: DbDupCursorRO<PlainStorageState>,
763 {
764 let mut state: BundleStateInit = HashMap::default();
768
769 let mut reverts: RevertsInit = HashMap::default();
775
776 for (block_number, account_before) in account_changeset.into_iter().rev() {
778 let AccountBeforeTx { info: old_info, address } = account_before;
779 match state.entry(address) {
780 hash_map::Entry::Vacant(entry) => {
781 let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
782 entry.insert((old_info, new_info, HashMap::default()));
783 }
784 hash_map::Entry::Occupied(mut entry) => {
785 entry.get_mut().0 = old_info;
787 }
788 }
789 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
791 }
792
793 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
795 let BlockNumberAddress((block_number, address)) = block_and_address;
796 let account_state = match state.entry(address) {
798 hash_map::Entry::Vacant(entry) => {
799 let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
800 entry.insert((present_info, present_info, HashMap::default()))
801 }
802 hash_map::Entry::Occupied(entry) => entry.into_mut(),
803 };
804
805 match account_state.2.entry(old_storage.key) {
807 hash_map::Entry::Vacant(entry) => {
808 let new_storage = plain_storage_cursor
809 .seek_by_key_subkey(address, old_storage.key)?
810 .filter(|storage| storage.key == old_storage.key)
811 .unwrap_or_default();
812 entry.insert((old_storage.value, new_storage.value));
813 }
814 hash_map::Entry::Occupied(mut entry) => {
815 entry.get_mut().0 = old_storage.value;
816 }
817 };
818
819 reverts
820 .entry(block_number)
821 .or_default()
822 .entry(address)
823 .or_default()
824 .1
825 .push(old_storage);
826 }
827
828 Ok((state, reverts))
829 }
830}
831
832impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
833 fn append_history_index<P, T>(
841 &self,
842 index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
843 mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
844 ) -> ProviderResult<()>
845 where
846 P: Copy,
847 T: Table<Value = BlockNumberList>,
848 {
849 assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
852
853 let mut cursor = self.tx.cursor_write::<T>()?;
854
855 for (partial_key, indices) in index_updates {
856 let last_key = sharded_key_factory(partial_key, u64::MAX);
857 let mut last_shard = cursor
858 .seek_exact(last_key.clone())?
859 .map(|(_, list)| list)
860 .unwrap_or_else(BlockNumberList::empty);
861
862 last_shard.append(indices).map_err(ProviderError::other)?;
863
864 if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
866 cursor.upsert(last_key, &last_shard)?;
867 continue;
868 }
869
870 let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
872 let mut chunks_peekable = chunks.into_iter().peekable();
873
874 while let Some(chunk) = chunks_peekable.next() {
875 let shard = BlockNumberList::new_pre_sorted(chunk);
876 let highest_block_number = if chunks_peekable.peek().is_some() {
877 shard.iter().next_back().expect("`chunks` does not return empty list")
878 } else {
879 u64::MAX
881 };
882
883 cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
884 }
885 }
886
887 Ok(())
888 }
889}
890
891impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
892 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
893 Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
894 }
895}
896
897impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
898 fn changed_accounts_with_range(
899 &self,
900 range: impl RangeBounds<BlockNumber>,
901 ) -> ProviderResult<BTreeSet<Address>> {
902 self.tx
903 .cursor_read::<tables::AccountChangeSets>()?
904 .walk_range(range)?
905 .map(|entry| {
906 entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
907 })
908 .collect()
909 }
910
911 fn basic_accounts(
912 &self,
913 iter: impl IntoIterator<Item = Address>,
914 ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
915 let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
916 Ok(iter
917 .into_iter()
918 .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
919 .collect::<Result<Vec<_>, _>>()?)
920 }
921
922 fn changed_accounts_and_blocks_with_range(
923 &self,
924 range: RangeInclusive<BlockNumber>,
925 ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
926 let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
927
928 let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
929 BTreeMap::new(),
930 |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
931 let (index, account) = entry?;
932 accounts.entry(account.address).or_default().push(index);
933 Ok(accounts)
934 },
935 )?;
936
937 Ok(account_transitions)
938 }
939}
940
941impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
942 fn storage_changeset(
943 &self,
944 block_number: BlockNumber,
945 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
946 let range = block_number..=block_number;
947 let storage_range = BlockNumberAddress::range(range);
948 self.tx
949 .cursor_dup_read::<tables::StorageChangeSets>()?
950 .walk_range(storage_range)?
951 .map(|result| -> ProviderResult<_> { Ok(result?) })
952 .collect()
953 }
954}
955
956impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
957 fn account_block_changeset(
958 &self,
959 block_number: BlockNumber,
960 ) -> ProviderResult<Vec<AccountBeforeTx>> {
961 let range = block_number..=block_number;
962 self.tx
963 .cursor_read::<tables::AccountChangeSets>()?
964 .walk_range(range)?
965 .map(|result| -> ProviderResult<_> {
966 let (_, account_before) = result?;
967 Ok(account_before)
968 })
969 .collect()
970 }
971
972 fn get_account_before_block(
973 &self,
974 block_number: BlockNumber,
975 address: Address,
976 ) -> ProviderResult<Option<AccountBeforeTx>> {
977 self.tx
978 .cursor_dup_read::<tables::AccountChangeSets>()?
979 .seek_by_key_subkey(block_number, address)?
980 .filter(|acc| acc.address == address)
981 .map(Ok)
982 .transpose()
983 }
984}
985
986impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
987 for DatabaseProvider<TX, N>
988{
989 type Header = HeaderTy<N>;
990
991 fn local_tip_header(
992 &self,
993 highest_uninterrupted_block: BlockNumber,
994 ) -> ProviderResult<SealedHeader<Self::Header>> {
995 let static_file_provider = self.static_file_provider();
996
997 let next_static_file_block_num = static_file_provider
1000 .get_highest_static_file_block(StaticFileSegment::Headers)
1001 .map(|id| id + 1)
1002 .unwrap_or_default();
1003 let next_block = highest_uninterrupted_block + 1;
1004
1005 match next_static_file_block_num.cmp(&next_block) {
1006 Ordering::Greater => {
1009 let mut static_file_producer =
1010 static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1011 static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1012 static_file_producer.commit()?
1015 }
1016 Ordering::Less => {
1017 return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1019 }
1020 Ordering::Equal => {}
1021 }
1022
1023 let local_head = static_file_provider
1024 .sealed_header(highest_uninterrupted_block)?
1025 .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1026
1027 Ok(local_head)
1028 }
1029}
1030
1031impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1032 type Header = HeaderTy<N>;
1033
1034 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1035 if let Some(num) = self.block_number(block_hash)? {
1036 Ok(self.header_by_number(num)?)
1037 } else {
1038 Ok(None)
1039 }
1040 }
1041
1042 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1043 self.static_file_provider.header_by_number(num)
1044 }
1045
1046 fn headers_range(
1047 &self,
1048 range: impl RangeBounds<BlockNumber>,
1049 ) -> ProviderResult<Vec<Self::Header>> {
1050 self.static_file_provider.headers_range(range)
1051 }
1052
1053 fn sealed_header(
1054 &self,
1055 number: BlockNumber,
1056 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1057 self.static_file_provider.sealed_header(number)
1058 }
1059
1060 fn sealed_headers_while(
1061 &self,
1062 range: impl RangeBounds<BlockNumber>,
1063 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1064 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1065 self.static_file_provider.sealed_headers_while(range, predicate)
1066 }
1067}
1068
1069impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1070 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1071 self.static_file_provider.block_hash(number)
1072 }
1073
1074 fn canonical_hashes_range(
1075 &self,
1076 start: BlockNumber,
1077 end: BlockNumber,
1078 ) -> ProviderResult<Vec<B256>> {
1079 self.static_file_provider.canonical_hashes_range(start, end)
1080 }
1081}
1082
1083impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1084 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1085 let best_number = self.best_block_number()?;
1086 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1087 Ok(ChainInfo { best_hash, best_number })
1088 }
1089
1090 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1091 Ok(self
1094 .get_stage_checkpoint(StageId::Finish)?
1095 .map(|checkpoint| checkpoint.block_number)
1096 .unwrap_or_default())
1097 }
1098
1099 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1100 self.static_file_provider.last_block_number()
1101 }
1102
1103 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1104 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1105 }
1106}
1107
1108impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1109 type Block = BlockTy<N>;
1110
1111 fn find_block_by_hash(
1112 &self,
1113 hash: B256,
1114 source: BlockSource,
1115 ) -> ProviderResult<Option<Self::Block>> {
1116 if source.is_canonical() {
1117 self.block(hash.into())
1118 } else {
1119 Ok(None)
1120 }
1121 }
1122
1123 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1129 if let Some(number) = self.convert_hash_or_number(id)? &&
1130 let Some(header) = self.header_by_number(number)?
1131 {
1132 let Some(transactions) = self.transactions_by_block(number.into())? else {
1137 return Ok(None)
1138 };
1139
1140 let body = self
1141 .storage
1142 .reader()
1143 .read_block_bodies(self, vec![(&header, transactions)])?
1144 .pop()
1145 .ok_or(ProviderError::InvalidStorageOutput)?;
1146
1147 return Ok(Some(Self::Block::new(header, body)))
1148 }
1149
1150 Ok(None)
1151 }
1152
1153 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1154 Ok(None)
1155 }
1156
1157 fn pending_block_and_receipts(
1158 &self,
1159 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1160 Ok(None)
1161 }
1162
1163 fn recovered_block(
1172 &self,
1173 id: BlockHashOrNumber,
1174 transaction_kind: TransactionVariant,
1175 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1176 self.recovered_block(
1177 id,
1178 transaction_kind,
1179 |block_number| self.header_by_number(block_number),
1180 |header, body, senders| {
1181 Self::Block::new(header, body)
1182 .try_into_recovered_unchecked(senders)
1186 .map(Some)
1187 .map_err(|_| ProviderError::SenderRecoveryError)
1188 },
1189 )
1190 }
1191
1192 fn sealed_block_with_senders(
1193 &self,
1194 id: BlockHashOrNumber,
1195 transaction_kind: TransactionVariant,
1196 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1197 self.recovered_block(
1198 id,
1199 transaction_kind,
1200 |block_number| self.sealed_header(block_number),
1201 |header, body, senders| {
1202 Self::Block::new_sealed(header, body)
1203 .try_with_senders_unchecked(senders)
1207 .map(Some)
1208 .map_err(|_| ProviderError::SenderRecoveryError)
1209 },
1210 )
1211 }
1212
1213 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1214 self.block_range(
1215 range,
1216 |range| self.headers_range(range),
1217 |header, body, _| Ok(Self::Block::new(header, body)),
1218 )
1219 }
1220
1221 fn block_with_senders_range(
1222 &self,
1223 range: RangeInclusive<BlockNumber>,
1224 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1225 self.block_with_senders_range(
1226 range,
1227 |range| self.headers_range(range),
1228 |header, body, senders| {
1229 Self::Block::new(header, body)
1230 .try_into_recovered_unchecked(senders)
1231 .map_err(|_| ProviderError::SenderRecoveryError)
1232 },
1233 )
1234 }
1235
1236 fn recovered_block_range(
1237 &self,
1238 range: RangeInclusive<BlockNumber>,
1239 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1240 self.block_with_senders_range(
1241 range,
1242 |range| self.sealed_headers_range(range),
1243 |header, body, senders| {
1244 Self::Block::new_sealed(header, body)
1245 .try_with_senders(senders)
1246 .map_err(|_| ProviderError::SenderRecoveryError)
1247 },
1248 )
1249 }
1250
1251 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1252 Ok(self
1253 .tx
1254 .cursor_read::<tables::TransactionBlocks>()?
1255 .seek(id)
1256 .map(|b| b.map(|(_, bn)| bn))?)
1257 }
1258}
1259
1260impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1261 for DatabaseProvider<TX, N>
1262{
1263 fn transaction_hashes_by_range(
1266 &self,
1267 tx_range: Range<TxNumber>,
1268 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1269 self.static_file_provider.transaction_hashes_by_range(tx_range)
1270 }
1271}
1272
1273impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1275 type Transaction = TxTy<N>;
1276
1277 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1278 Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1279 }
1280
1281 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1282 self.static_file_provider.transaction_by_id(id)
1283 }
1284
1285 fn transaction_by_id_unhashed(
1286 &self,
1287 id: TxNumber,
1288 ) -> ProviderResult<Option<Self::Transaction>> {
1289 self.static_file_provider.transaction_by_id_unhashed(id)
1290 }
1291
1292 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1293 if let Some(id) = self.transaction_id(hash)? {
1294 Ok(self.transaction_by_id_unhashed(id)?)
1295 } else {
1296 Ok(None)
1297 }
1298 }
1299
1300 fn transaction_by_hash_with_meta(
1301 &self,
1302 tx_hash: TxHash,
1303 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1304 if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1305 let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1306 let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1307 let Some(sealed_header) = self.sealed_header(block_number)?
1308 {
1309 let (header, block_hash) = sealed_header.split();
1310 if let Some(block_body) = self.block_body_indices(block_number)? {
1311 let index = transaction_id - block_body.first_tx_num();
1316
1317 let meta = TransactionMeta {
1318 tx_hash,
1319 index,
1320 block_hash,
1321 block_number,
1322 base_fee: header.base_fee_per_gas(),
1323 excess_blob_gas: header.excess_blob_gas(),
1324 timestamp: header.timestamp(),
1325 };
1326
1327 return Ok(Some((transaction, meta)))
1328 }
1329 }
1330
1331 Ok(None)
1332 }
1333
1334 fn transactions_by_block(
1335 &self,
1336 id: BlockHashOrNumber,
1337 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1338 if let Some(block_number) = self.convert_hash_or_number(id)? &&
1339 let Some(body) = self.block_body_indices(block_number)?
1340 {
1341 let tx_range = body.tx_num_range();
1342 return if tx_range.is_empty() {
1343 Ok(Some(Vec::new()))
1344 } else {
1345 self.transactions_by_tx_range(tx_range).map(Some)
1346 }
1347 }
1348 Ok(None)
1349 }
1350
1351 fn transactions_by_block_range(
1352 &self,
1353 range: impl RangeBounds<BlockNumber>,
1354 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1355 let range = to_range(range);
1356
1357 self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1358 .into_iter()
1359 .map(|body| {
1360 let tx_num_range = body.tx_num_range();
1361 if tx_num_range.is_empty() {
1362 Ok(Vec::new())
1363 } else {
1364 self.transactions_by_tx_range(tx_num_range)
1365 }
1366 })
1367 .collect()
1368 }
1369
1370 fn transactions_by_tx_range(
1371 &self,
1372 range: impl RangeBounds<TxNumber>,
1373 ) -> ProviderResult<Vec<Self::Transaction>> {
1374 self.static_file_provider.transactions_by_tx_range(range)
1375 }
1376
1377 fn senders_by_tx_range(
1378 &self,
1379 range: impl RangeBounds<TxNumber>,
1380 ) -> ProviderResult<Vec<Address>> {
1381 if EitherWriterDestination::senders(self).is_static_file() {
1382 self.static_file_provider.senders_by_tx_range(range)
1383 } else {
1384 self.cursor_read_collect::<tables::TransactionSenders>(range)
1385 }
1386 }
1387
1388 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1389 if EitherWriterDestination::senders(self).is_static_file() {
1390 self.static_file_provider.transaction_sender(id)
1391 } else {
1392 Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1393 }
1394 }
1395}
1396
1397impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1398 type Receipt = ReceiptTy<N>;
1399
1400 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1401 self.static_file_provider.get_with_static_file_or_database(
1402 StaticFileSegment::Receipts,
1403 id,
1404 |static_file| static_file.receipt(id),
1405 || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1406 )
1407 }
1408
1409 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1410 if let Some(id) = self.transaction_id(hash)? {
1411 self.receipt(id)
1412 } else {
1413 Ok(None)
1414 }
1415 }
1416
1417 fn receipts_by_block(
1418 &self,
1419 block: BlockHashOrNumber,
1420 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1421 if let Some(number) = self.convert_hash_or_number(block)? &&
1422 let Some(body) = self.block_body_indices(number)?
1423 {
1424 let tx_range = body.tx_num_range();
1425 return if tx_range.is_empty() {
1426 Ok(Some(Vec::new()))
1427 } else {
1428 self.receipts_by_tx_range(tx_range).map(Some)
1429 }
1430 }
1431 Ok(None)
1432 }
1433
1434 fn receipts_by_tx_range(
1435 &self,
1436 range: impl RangeBounds<TxNumber>,
1437 ) -> ProviderResult<Vec<Self::Receipt>> {
1438 self.static_file_provider.get_range_with_static_file_or_database(
1439 StaticFileSegment::Receipts,
1440 to_range(range),
1441 |static_file, range, _| static_file.receipts_by_tx_range(range),
1442 |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1443 |_| true,
1444 )
1445 }
1446
1447 fn receipts_by_block_range(
1448 &self,
1449 block_range: RangeInclusive<BlockNumber>,
1450 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1451 if block_range.is_empty() {
1452 return Ok(Vec::new());
1453 }
1454
1455 let mut block_body_indices = Vec::new();
1457 for block_num in block_range {
1458 if let Some(indices) = self.block_body_indices(block_num)? {
1459 block_body_indices.push(indices);
1460 } else {
1461 block_body_indices.push(StoredBlockBodyIndices::default());
1463 }
1464 }
1465
1466 if block_body_indices.is_empty() {
1467 return Ok(Vec::new());
1468 }
1469
1470 let non_empty_blocks: Vec<_> =
1472 block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
1473
1474 if non_empty_blocks.is_empty() {
1475 return Ok(vec![Vec::new(); block_body_indices.len()]);
1477 }
1478
1479 let first_tx = non_empty_blocks[0].first_tx_num();
1481 let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
1482
1483 let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
1485 let mut receipts_iter = all_receipts.into_iter();
1486
1487 let mut result = Vec::with_capacity(block_body_indices.len());
1489 for indices in &block_body_indices {
1490 if indices.tx_count == 0 {
1491 result.push(Vec::new());
1492 } else {
1493 let block_receipts =
1494 receipts_iter.by_ref().take(indices.tx_count as usize).collect();
1495 result.push(block_receipts);
1496 }
1497 }
1498
1499 Ok(result)
1500 }
1501}
1502
1503impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1504 for DatabaseProvider<TX, N>
1505{
1506 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1507 Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
1508 }
1509
1510 fn block_body_indices_range(
1511 &self,
1512 range: RangeInclusive<BlockNumber>,
1513 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1514 self.cursor_read_collect::<tables::BlockBodyIndices>(range)
1515 }
1516}
1517
1518impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1519 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1520 Ok(if let Some(encoded) = id.get_pre_encoded() {
1521 self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
1522 } else {
1523 self.tx.get::<tables::StageCheckpoints>(id.to_string())?
1524 })
1525 }
1526
1527 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1529 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1530 }
1531
1532 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1533 self.tx
1534 .cursor_read::<tables::StageCheckpoints>()?
1535 .walk(None)?
1536 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1537 .map_err(ProviderError::Database)
1538 }
1539}
1540
1541impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1542 fn save_stage_checkpoint(
1544 &self,
1545 id: StageId,
1546 checkpoint: StageCheckpoint,
1547 ) -> ProviderResult<()> {
1548 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1549 }
1550
1551 fn save_stage_checkpoint_progress(
1553 &self,
1554 id: StageId,
1555 checkpoint: Vec<u8>,
1556 ) -> ProviderResult<()> {
1557 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1558 }
1559
1560 fn update_pipeline_stages(
1561 &self,
1562 block_number: BlockNumber,
1563 drop_stage_checkpoint: bool,
1564 ) -> ProviderResult<()> {
1565 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1567 for stage_id in StageId::ALL {
1568 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1569 cursor.upsert(
1570 stage_id.to_string(),
1571 &StageCheckpoint {
1572 block_number,
1573 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1574 },
1575 )?;
1576 }
1577
1578 Ok(())
1579 }
1580}
1581
1582impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1583 fn plain_state_storages(
1584 &self,
1585 addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1586 ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1587 let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1588
1589 addresses_with_keys
1590 .into_iter()
1591 .map(|(address, storage)| {
1592 storage
1593 .into_iter()
1594 .map(|key| -> ProviderResult<_> {
1595 Ok(plain_storage
1596 .seek_by_key_subkey(address, key)?
1597 .filter(|v| v.key == key)
1598 .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
1599 })
1600 .collect::<ProviderResult<Vec<_>>>()
1601 .map(|storage| (address, storage))
1602 })
1603 .collect::<ProviderResult<Vec<(_, _)>>>()
1604 }
1605
1606 fn changed_storages_with_range(
1607 &self,
1608 range: RangeInclusive<BlockNumber>,
1609 ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1610 self.tx
1611 .cursor_read::<tables::StorageChangeSets>()?
1612 .walk_range(BlockNumberAddress::range(range))?
1613 .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1616 let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1617 accounts.entry(address).or_default().insert(storage_entry.key);
1618 Ok(accounts)
1619 })
1620 }
1621
1622 fn changed_storages_and_blocks_with_range(
1623 &self,
1624 range: RangeInclusive<BlockNumber>,
1625 ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1626 let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1627
1628 let storage_changeset_lists =
1629 changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1630 BTreeMap::new(),
1631 |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1632 let (index, storage) = entry?;
1633 storages
1634 .entry((index.address(), storage.key))
1635 .or_default()
1636 .push(index.block_number());
1637 Ok(storages)
1638 },
1639 )?;
1640
1641 Ok(storage_changeset_lists)
1642 }
1643}
1644
1645impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1646 for DatabaseProvider<TX, N>
1647{
1648 type Receipt = ReceiptTy<N>;
1649
1650 fn write_state(
1651 &self,
1652 execution_outcome: &ExecutionOutcome<Self::Receipt>,
1653 is_value_known: OriginalValuesKnown,
1654 ) -> ProviderResult<()> {
1655 let first_block = execution_outcome.first_block();
1656 let block_count = execution_outcome.len() as u64;
1657 let last_block = execution_outcome.last_block();
1658 let block_range = first_block..=last_block;
1659
1660 let tip = self.last_block_number()?.max(last_block);
1661
1662 let (plain_state, reverts) =
1663 execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1664
1665 self.write_state_reverts(reverts, first_block)?;
1666 self.write_state_changes(plain_state)?;
1667
1668 let block_indices: Vec<_> = self
1670 .block_body_indices_range(block_range)?
1671 .into_iter()
1672 .map(|b| b.first_tx_num)
1673 .collect();
1674
1675 if block_indices.len() < block_count as usize {
1677 let missing_blocks = block_count - block_indices.len() as u64;
1678 return Err(ProviderError::BlockBodyIndicesNotFound(
1679 last_block.saturating_sub(missing_blocks - 1),
1680 ));
1681 }
1682
1683 let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;
1684
1685 let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1686 let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1687
1688 let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
1696 self.static_file_provider()
1697 .get_highest_static_file_tx(StaticFileSegment::Receipts)
1698 .is_none()) &&
1699 PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);
1700
1701 let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1703 for (_, addresses) in contract_log_pruner.range(..first_block) {
1704 allowed_addresses.extend(addresses.iter().copied());
1705 }
1706
1707 for (idx, (receipts, first_tx_index)) in
1708 execution_outcome.receipts.iter().zip(block_indices).enumerate()
1709 {
1710 let block_number = first_block + idx as u64;
1711
1712 receipts_writer.increment_block(block_number)?;
1714
1715 if prunable_receipts &&
1717 self.prune_modes
1718 .receipts
1719 .is_some_and(|mode| mode.should_prune(block_number, tip))
1720 {
1721 continue
1722 }
1723
1724 if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1726 allowed_addresses.extend(new_addresses.iter().copied());
1727 }
1728
1729 for (idx, receipt) in receipts.iter().enumerate() {
1730 let receipt_idx = first_tx_index + idx as u64;
1731 if prunable_receipts &&
1734 has_contract_log_filter &&
1735 !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1736 {
1737 continue
1738 }
1739
1740 receipts_writer.append_receipt(receipt_idx, receipt)?;
1741 }
1742 }
1743
1744 Ok(())
1745 }
1746
1747 fn write_state_reverts(
1748 &self,
1749 reverts: PlainStateReverts,
1750 first_block: BlockNumber,
1751 ) -> ProviderResult<()> {
1752 tracing::trace!("Writing storage changes");
1754 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1755 let mut storage_changeset_cursor =
1756 self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1757 for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1758 let block_number = first_block + block_index as BlockNumber;
1759
1760 tracing::trace!(block_number, "Writing block change");
1761 storage_changes.par_sort_unstable_by_key(|a| a.address);
1763 for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1764 let storage_id = BlockNumberAddress((block_number, address));
1765
1766 let mut storage = storage_revert
1767 .into_iter()
1768 .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1769 .collect::<Vec<_>>();
1770 storage.par_sort_unstable_by_key(|a| a.0);
1772
1773 let mut wiped_storage = Vec::new();
1781 if wiped {
1782 tracing::trace!(?address, "Wiping storage");
1783 if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1784 wiped_storage.push((entry.key, entry.value));
1785 while let Some(entry) = storages_cursor.next_dup_val()? {
1786 wiped_storage.push((entry.key, entry.value))
1787 }
1788 }
1789 }
1790
1791 tracing::trace!(?address, ?storage, "Writing storage reverts");
1792 for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1793 storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1794 }
1795 }
1796 }
1797
1798 tracing::trace!("Writing account changes");
1800 let mut account_changeset_cursor =
1801 self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1802
1803 for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1804 let block_number = first_block + block_index as BlockNumber;
1805 account_block_reverts.par_sort_by_key(|a| a.0);
1807
1808 for (address, info) in account_block_reverts {
1809 account_changeset_cursor.append_dup(
1810 block_number,
1811 AccountBeforeTx { address, info: info.map(Into::into) },
1812 )?;
1813 }
1814 }
1815
1816 Ok(())
1817 }
1818
1819 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1820 changes.accounts.par_sort_by_key(|a| a.0);
1823 changes.storage.par_sort_by_key(|a| a.address);
1824 changes.contracts.par_sort_by_key(|a| a.0);
1825
1826 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1828 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1829 for (address, account) in changes.accounts {
1831 if let Some(account) = account {
1832 tracing::trace!(?address, "Updating plain state account");
1833 accounts_cursor.upsert(address, &account.into())?;
1834 } else if accounts_cursor.seek_exact(address)?.is_some() {
1835 tracing::trace!(?address, "Deleting plain state account");
1836 accounts_cursor.delete_current()?;
1837 }
1838 }
1839
1840 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1842 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1843 for (hash, bytecode) in changes.contracts {
1844 bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1845 }
1846
1847 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1849 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1850 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1851 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1853 storages_cursor.delete_current_duplicates()?;
1854 }
1855 let mut storage = storage
1857 .into_iter()
1858 .map(|(k, value)| StorageEntry { key: k.into(), value })
1859 .collect::<Vec<_>>();
1860 storage.par_sort_unstable_by_key(|a| a.key);
1862
1863 for entry in storage {
1864 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
1865 if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? &&
1866 db_entry.key == entry.key
1867 {
1868 storages_cursor.delete_current()?;
1869 }
1870
1871 if !entry.value.is_zero() {
1872 storages_cursor.upsert(address, &entry)?;
1873 }
1874 }
1875 }
1876
1877 Ok(())
1878 }
1879
1880 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
1881 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
1883 for (hashed_address, account) in hashed_state.accounts() {
1884 if let Some(account) = account {
1885 hashed_accounts_cursor.upsert(*hashed_address, account)?;
1886 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
1887 hashed_accounts_cursor.delete_current()?;
1888 }
1889 }
1890
1891 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
1893 let mut hashed_storage_cursor =
1894 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
1895 for (hashed_address, storage) in sorted_storages {
1896 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
1897 hashed_storage_cursor.delete_current_duplicates()?;
1898 }
1899
1900 for (hashed_slot, value) in storage.storage_slots_ref() {
1901 let entry = StorageEntry { key: *hashed_slot, value: *value };
1902
1903 if let Some(db_entry) =
1904 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
1905 db_entry.key == entry.key
1906 {
1907 hashed_storage_cursor.delete_current()?;
1908 }
1909
1910 if !entry.value.is_zero() {
1911 hashed_storage_cursor.upsert(*hashed_address, &entry)?;
1912 }
1913 }
1914 }
1915
1916 Ok(())
1917 }
1918
1919 fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
1941 let range = block + 1..=self.last_block_number()?;
1942
1943 if range.is_empty() {
1944 return Ok(());
1945 }
1946
1947 let block_bodies = self.block_body_indices_range(range.clone())?;
1949
1950 let from_transaction_num =
1952 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
1953
1954 let storage_range = BlockNumberAddress::range(range.clone());
1955
1956 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
1957 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
1958
1959 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
1964 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
1965
1966 let (state, _) = self.populate_bundle_state(
1967 account_changeset,
1968 storage_changeset,
1969 &mut plain_accounts_cursor,
1970 &mut plain_storage_cursor,
1971 )?;
1972
1973 for (address, (old_account, new_account, storage)) in &state {
1975 if old_account != new_account {
1977 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
1978 if let Some(account) = old_account {
1979 plain_accounts_cursor.upsert(*address, account)?;
1980 } else if existing_entry.is_some() {
1981 plain_accounts_cursor.delete_current()?;
1982 }
1983 }
1984
1985 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
1987 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
1988 if plain_storage_cursor
1990 .seek_by_key_subkey(*address, *storage_key)?
1991 .filter(|s| s.key == *storage_key)
1992 .is_some()
1993 {
1994 plain_storage_cursor.delete_current()?
1995 }
1996
1997 if !old_storage_value.is_zero() {
1999 plain_storage_cursor.upsert(*address, &storage_entry)?;
2000 }
2001 }
2002 }
2003
2004 self.remove_receipts_from(from_transaction_num, block)?;
2005
2006 Ok(())
2007 }
2008
2009 fn take_state_above(
2031 &self,
2032 block: BlockNumber,
2033 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2034 let range = block + 1..=self.last_block_number()?;
2035
2036 if range.is_empty() {
2037 return Ok(ExecutionOutcome::default())
2038 }
2039 let start_block_number = *range.start();
2040
2041 let block_bodies = self.block_body_indices_range(range.clone())?;
2043
2044 let from_transaction_num =
2046 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2047 let to_transaction_num =
2048 block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2049
2050 let storage_range = BlockNumberAddress::range(range.clone());
2051
2052 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2053 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2054
2055 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2060 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2061
2062 let (state, reverts) = self.populate_bundle_state(
2065 account_changeset,
2066 storage_changeset,
2067 &mut plain_accounts_cursor,
2068 &mut plain_storage_cursor,
2069 )?;
2070
2071 for (address, (old_account, new_account, storage)) in &state {
2073 if old_account != new_account {
2075 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2076 if let Some(account) = old_account {
2077 plain_accounts_cursor.upsert(*address, account)?;
2078 } else if existing_entry.is_some() {
2079 plain_accounts_cursor.delete_current()?;
2080 }
2081 }
2082
2083 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2085 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2086 if plain_storage_cursor
2088 .seek_by_key_subkey(*address, *storage_key)?
2089 .filter(|s| s.key == *storage_key)
2090 .is_some()
2091 {
2092 plain_storage_cursor.delete_current()?
2093 }
2094
2095 if !old_storage_value.is_zero() {
2097 plain_storage_cursor.upsert(*address, &storage_entry)?;
2098 }
2099 }
2100 }
2101
2102 let mut receipts_iter = self
2104 .static_file_provider
2105 .get_range_with_static_file_or_database(
2106 StaticFileSegment::Receipts,
2107 from_transaction_num..to_transaction_num + 1,
2108 |static_file, range, _| {
2109 static_file
2110 .receipts_by_tx_range(range.clone())
2111 .map(|r| range.into_iter().zip(r).collect())
2112 },
2113 |range, _| {
2114 self.tx
2115 .cursor_read::<tables::Receipts<Self::Receipt>>()?
2116 .walk_range(range)?
2117 .map(|r| r.map_err(Into::into))
2118 .collect()
2119 },
2120 |_| true,
2121 )?
2122 .into_iter()
2123 .peekable();
2124
2125 let mut receipts = Vec::with_capacity(block_bodies.len());
2126 for block_body in block_bodies {
2128 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2129 for num in block_body.tx_num_range() {
2130 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2131 block_receipts.push(receipts_iter.next().unwrap().1);
2132 }
2133 }
2134 receipts.push(block_receipts);
2135 }
2136
2137 self.remove_receipts_from(from_transaction_num, block)?;
2138
2139 Ok(ExecutionOutcome::new_init(
2140 state,
2141 reverts,
2142 Vec::new(),
2143 receipts,
2144 start_block_number,
2145 Vec::new(),
2146 ))
2147 }
2148}
2149
2150impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2151 fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
2155 if trie_updates.is_empty() {
2156 return Ok(0)
2157 }
2158
2159 let mut num_entries = 0;
2161
2162 let tx = self.tx_ref();
2163 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2164
2165 for (key, updated_node) in trie_updates.account_nodes_ref() {
2167 let nibbles = StoredNibbles(*key);
2168 match updated_node {
2169 Some(node) => {
2170 if !nibbles.0.is_empty() {
2171 num_entries += 1;
2172 account_trie_cursor.upsert(nibbles, node)?;
2173 }
2174 }
2175 None => {
2176 num_entries += 1;
2177 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2178 account_trie_cursor.delete_current()?;
2179 }
2180 }
2181 }
2182 }
2183
2184 num_entries +=
2185 self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
2186
2187 Ok(num_entries)
2188 }
2189
2190 fn write_trie_changesets(
2198 &self,
2199 block_number: BlockNumber,
2200 trie_updates: &TrieUpdatesSorted,
2201 updates_overlay: Option<&TrieUpdatesSorted>,
2202 ) -> ProviderResult<usize> {
2203 let mut num_entries = 0;
2204
2205 let mut changeset_cursor =
2206 self.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2207 let curr_values_cursor = self.tx_ref().cursor_read::<tables::AccountsTrie>()?;
2208
2209 let mut db_account_cursor = DatabaseAccountTrieCursor::new(curr_values_cursor);
2211
2212 let empty_updates = TrieUpdatesSorted::default();
2214 let overlay = updates_overlay.unwrap_or(&empty_updates);
2215
2216 let mut in_memory_account_cursor =
2218 InMemoryTrieCursor::new_account(&mut db_account_cursor, overlay);
2219
2220 for (path, _) in trie_updates.account_nodes_ref() {
2221 num_entries += 1;
2222 let node = in_memory_account_cursor.seek_exact(*path)?.map(|(_, node)| node);
2223 changeset_cursor.append_dup(
2224 block_number,
2225 TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(*path), node },
2226 )?;
2227 }
2228
2229 let mut storage_updates = trie_updates.storage_tries_ref().iter().collect::<Vec<_>>();
2230 storage_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2231
2232 num_entries += self.write_storage_trie_changesets(
2233 block_number,
2234 storage_updates.into_iter(),
2235 updates_overlay,
2236 )?;
2237
2238 Ok(num_entries)
2239 }
2240
2241 fn clear_trie_changesets(&self) -> ProviderResult<()> {
2242 let tx = self.tx_ref();
2243 tx.clear::<tables::AccountsTrieChangeSets>()?;
2244 tx.clear::<tables::StoragesTrieChangeSets>()?;
2245 Ok(())
2246 }
2247
2248 fn clear_trie_changesets_from(&self, from: BlockNumber) -> ProviderResult<()> {
2249 let tx = self.tx_ref();
2250 {
2251 let range = from..;
2252 let mut cursor = tx.cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2253 let mut walker = cursor.walk_range(range)?;
2254
2255 while walker.next().transpose()?.is_some() {
2256 walker.delete_current()?;
2257 }
2258 }
2259
2260 {
2261 let range: RangeFrom<BlockNumberHashedAddress> = (from, B256::ZERO).into()..;
2262 let mut cursor = tx.cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
2263 let mut walker = cursor.walk_range(range)?;
2264
2265 while walker.next().transpose()?.is_some() {
2266 walker.delete_current()?;
2267 }
2268 }
2269
2270 Ok(())
2271 }
2272}
2273
2274impl<TX: DbTx + 'static, N: NodeTypes> TrieReader for DatabaseProvider<TX, N> {
2275 fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
2276 let tx = self.tx_ref();
2277
2278 let mut account_nodes = Vec::new();
2281 let mut seen_account_keys = HashSet::new();
2282 let mut accounts_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2283
2284 for entry in accounts_cursor.walk_range(from..)? {
2285 let (_, TrieChangeSetsEntry { nibbles, node }) = entry?;
2286 if seen_account_keys.insert(nibbles.0) {
2288 account_nodes.push((nibbles.0, node));
2289 }
2290 }
2291
2292 account_nodes.sort_by_key(|(path, _)| *path);
2293
2294 let mut storage_tries = B256Map::<Vec<_>>::default();
2298 let mut seen_storage_keys = HashSet::new();
2299 let mut storages_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2300
2301 let storage_range_start = BlockNumberHashedAddress((from, B256::ZERO));
2303
2304 for entry in storages_cursor.walk_range(storage_range_start..)? {
2305 let (
2306 BlockNumberHashedAddress((_, hashed_address)),
2307 TrieChangeSetsEntry { nibbles, node },
2308 ) = entry?;
2309
2310 if seen_storage_keys.insert((hashed_address, nibbles.0)) {
2312 storage_tries.entry(hashed_address).or_default().push((nibbles.0, node));
2313 }
2314 }
2315
2316 let storage_tries = storage_tries
2318 .into_iter()
2319 .map(|(address, mut nodes)| {
2320 nodes.sort_by_key(|(path, _)| *path);
2321 (address, StorageTrieUpdatesSorted { storage_nodes: nodes, is_deleted: false })
2322 })
2323 .collect();
2324
2325 Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2326 }
2327
2328 fn get_block_trie_updates(
2329 &self,
2330 block_number: BlockNumber,
2331 ) -> ProviderResult<TrieUpdatesSorted> {
2332 let tx = self.tx_ref();
2333
2334 let reverts = self.trie_reverts(block_number + 1)?;
2336
2337 let db_cursor_factory = DatabaseTrieCursorFactory::new(tx);
2340 let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);
2341
2342 let mut account_nodes = Vec::new();
2344
2345 let mut accounts_trie_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2347 let mut account_cursor = cursor_factory.account_trie_cursor()?;
2348
2349 for entry in accounts_trie_cursor.walk_dup(Some(block_number), None)? {
2350 let (_, TrieChangeSetsEntry { nibbles, .. }) = entry?;
2351 let node_value = account_cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2353 account_nodes.push((nibbles.0, node_value));
2354 }
2355
2356 let mut storage_tries = B256Map::default();
2358 let mut storages_trie_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2359 let storage_range_start = BlockNumberHashedAddress((block_number, B256::ZERO));
2360 let storage_range_end = BlockNumberHashedAddress((block_number + 1, B256::ZERO));
2361
2362 let mut current_hashed_address = None;
2363 let mut storage_cursor = None;
2364
2365 for entry in storages_trie_cursor.walk_range(storage_range_start..storage_range_end)? {
2366 let (
2367 BlockNumberHashedAddress((_, hashed_address)),
2368 TrieChangeSetsEntry { nibbles, .. },
2369 ) = entry?;
2370
2371 if current_hashed_address != Some(hashed_address) {
2373 storage_cursor = Some(cursor_factory.storage_trie_cursor(hashed_address)?);
2374 current_hashed_address = Some(hashed_address);
2375 }
2376
2377 let cursor =
2379 storage_cursor.as_mut().expect("storage_cursor was just initialized above");
2380 let node_value = cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2381 storage_tries
2382 .entry(hashed_address)
2383 .or_insert_with(|| StorageTrieUpdatesSorted {
2384 storage_nodes: Vec::new(),
2385 is_deleted: false,
2386 })
2387 .storage_nodes
2388 .push((nibbles.0, node_value));
2389 }
2390
2391 Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2392 }
2393}
2394
2395impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2396 fn write_storage_trie_updates_sorted<'a>(
2402 &self,
2403 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2404 ) -> ProviderResult<usize> {
2405 let mut num_entries = 0;
2406 let mut storage_tries = storage_tries.collect::<Vec<_>>();
2407 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2408 let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2409 for (hashed_address, storage_trie_updates) in storage_tries {
2410 let mut db_storage_trie_cursor =
2411 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2412 num_entries +=
2413 db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
2414 cursor = db_storage_trie_cursor.cursor;
2415 }
2416
2417 Ok(num_entries)
2418 }
2419
    /// Appends storage trie changeset entries for `block_number` to
    /// [`tables::StoragesTrieChangeSets`] and returns how many entries were appended.
    ///
    /// For each `(hashed_address, updates)` pair, the current value of every path listed in
    /// `updates.storage_nodes` is read through a [`tables::StoragesTrie`] cursor overlaid with
    /// `updates_overlay` (or an empty overlay when `None`). When the storage trie is marked as
    /// deleted, those values are combined with an iteration over all nodes of the account's
    /// overlaid trie via `storage_trie_wiped_changeset_iter`.
    ///
    /// Entries are written with `append_dup`.
    // NOTE(review): `append_dup` presumably requires `storage_tries` to be yielded in ascending
    // hashed-address order; confirm against callers.
    fn write_storage_trie_changesets<'a>(
        &self,
        block_number: BlockNumber,
        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
        updates_overlay: Option<&TrieUpdatesSorted>,
    ) -> ProviderResult<usize> {
        let mut num_written = 0;

        let mut changeset_cursor =
            self.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>()?;

        // Two independent read cursors over the storage trie table: one for looking up the
        // changed paths, one for iterating all nodes of a wiped trie.
        let changed_curr_values_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
        let wiped_nodes_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;

        // The default address is only a placeholder; both wrappers are rebuilt with the real
        // hashed address inside the loop before they are used.
        let mut changed_curr_values_cursor = DatabaseStorageTrieCursor::new(
            changed_curr_values_cursor,
            B256::default(),
        );
        let mut wiped_nodes_cursor = DatabaseStorageTrieCursor::new(
            wiped_nodes_cursor,
            B256::default(),
        );

        // Stand-in overlay used when the caller passes no `updates_overlay`.
        let empty_updates = TrieUpdatesSorted::default();

        for (hashed_address, storage_trie_updates) in storage_tries {
            let changeset_key = BlockNumberHashedAddress((block_number, *hashed_address));

            // Re-target the lookup cursor at this account, reusing the underlying raw cursor.
            changed_curr_values_cursor =
                DatabaseStorageTrieCursor::new(changed_curr_values_cursor.cursor, *hashed_address);

            let overlay = updates_overlay.unwrap_or(&empty_updates);

            let mut in_memory_changed_cursor = InMemoryTrieCursor::new_storage(
                &mut changed_curr_values_cursor,
                overlay,
                *hashed_address,
            );

            // Yields the current value for each path touched by this account's updates.
            let curr_values_of_changed = StorageTrieCurrentValuesIter::new(
                storage_trie_updates.storage_nodes.iter().map(|e| e.0),
                &mut in_memory_changed_cursor,
            )?;

            if storage_trie_updates.is_deleted() {
                // Re-target the second cursor at this account so every node of the trie can
                // be iterated.
                wiped_nodes_cursor =
                    DatabaseStorageTrieCursor::new(wiped_nodes_cursor.cursor, *hashed_address);

                let mut in_memory_wiped_cursor = InMemoryTrieCursor::new_storage(
                    &mut wiped_nodes_cursor,
                    overlay,
                    *hashed_address,
                );

                let all_nodes = TrieCursorIter::new(&mut in_memory_wiped_cursor);

                // NOTE(review): the helper presumably merges both iterators into one ordered
                // stream of (path, node) pairs; confirm in `changesets_utils`.
                for wiped in storage_trie_wiped_changeset_iter(curr_values_of_changed, all_nodes)? {
                    let (path, node) = wiped?;
                    num_written += 1;
                    changeset_cursor.append_dup(
                        changeset_key,
                        TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
                    )?;
                }
            } else {
                // Trie not wiped: only the touched paths are recorded.
                for curr_value in curr_values_of_changed {
                    let (path, node) = curr_value?;
                    num_written += 1;
                    changeset_cursor.append_dup(
                        changeset_key,
                        TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
                    )?;
                }
            }
        }

        Ok(num_written)
    }
2520}
2521
2522impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2523 fn unwind_account_hashing<'a>(
2524 &self,
2525 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2526 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2527 let hashed_accounts = changesets
2531 .into_iter()
2532 .map(|(_, e)| (keccak256(e.address), e.info))
2533 .collect::<Vec<_>>()
2534 .into_iter()
2535 .rev()
2536 .collect::<BTreeMap<_, _>>();
2537
2538 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2540 for (hashed_address, account) in &hashed_accounts {
2541 if let Some(account) = account {
2542 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2543 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2544 hashed_accounts_cursor.delete_current()?;
2545 }
2546 }
2547
2548 Ok(hashed_accounts)
2549 }
2550
2551 fn unwind_account_hashing_range(
2552 &self,
2553 range: impl RangeBounds<BlockNumber>,
2554 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2555 let changesets = self
2556 .tx
2557 .cursor_read::<tables::AccountChangeSets>()?
2558 .walk_range(range)?
2559 .collect::<Result<Vec<_>, _>>()?;
2560 self.unwind_account_hashing(changesets.iter())
2561 }
2562
2563 fn insert_account_for_hashing(
2564 &self,
2565 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2566 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2567 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2568 let hashed_accounts =
2569 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2570 for (hashed_address, account) in &hashed_accounts {
2571 if let Some(account) = account {
2572 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2573 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2574 hashed_accounts_cursor.delete_current()?;
2575 }
2576 }
2577 Ok(hashed_accounts)
2578 }
2579
2580 fn unwind_storage_hashing(
2581 &self,
2582 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2583 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2584 let mut hashed_storages = changesets
2586 .into_iter()
2587 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2588 (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2589 })
2590 .collect::<Vec<_>>();
2591 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2592
2593 let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2595 HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2596 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2597 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2598 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2599
2600 if hashed_storage
2601 .seek_by_key_subkey(hashed_address, key)?
2602 .filter(|entry| entry.key == key)
2603 .is_some()
2604 {
2605 hashed_storage.delete_current()?;
2606 }
2607
2608 if !value.is_zero() {
2609 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2610 }
2611 }
2612 Ok(hashed_storage_keys)
2613 }
2614
2615 fn unwind_storage_hashing_range(
2616 &self,
2617 range: impl RangeBounds<BlockNumberAddress>,
2618 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2619 let changesets = self
2620 .tx
2621 .cursor_read::<tables::StorageChangeSets>()?
2622 .walk_range(range)?
2623 .collect::<Result<Vec<_>, _>>()?;
2624 self.unwind_storage_hashing(changesets.into_iter())
2625 }
2626
2627 fn insert_storage_for_hashing(
2628 &self,
2629 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2630 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2631 let hashed_storages =
2633 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2634 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2635 map.insert(keccak256(entry.key), entry.value);
2636 map
2637 });
2638 map.insert(keccak256(address), storage);
2639 map
2640 });
2641
2642 let hashed_storage_keys = hashed_storages
2643 .iter()
2644 .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2645 .collect();
2646
2647 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2648 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2651 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2652 if hashed_storage_cursor
2653 .seek_by_key_subkey(hashed_address, key)?
2654 .filter(|entry| entry.key == key)
2655 .is_some()
2656 {
2657 hashed_storage_cursor.delete_current()?;
2658 }
2659
2660 if !value.is_zero() {
2661 hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2662 }
2663 Ok(())
2664 })
2665 })?;
2666
2667 Ok(hashed_storage_keys)
2668 }
2669}
2670
2671impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2672 fn unwind_account_history_indices<'a>(
2673 &self,
2674 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2675 ) -> ProviderResult<usize> {
2676 let mut last_indices = changesets
2677 .into_iter()
2678 .map(|(index, account)| (account.address, *index))
2679 .collect::<Vec<_>>();
2680 last_indices.sort_by_key(|(a, _)| *a);
2681
2682 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2684 for &(address, rem_index) in &last_indices {
2685 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2686 &mut cursor,
2687 ShardedKey::last(address),
2688 rem_index,
2689 |sharded_key| sharded_key.key == address,
2690 )?;
2691
2692 if !partial_shard.is_empty() {
2695 cursor.insert(
2696 ShardedKey::last(address),
2697 &BlockNumberList::new_pre_sorted(partial_shard),
2698 )?;
2699 }
2700 }
2701
2702 let changesets = last_indices.len();
2703 Ok(changesets)
2704 }
2705
2706 fn unwind_account_history_indices_range(
2707 &self,
2708 range: impl RangeBounds<BlockNumber>,
2709 ) -> ProviderResult<usize> {
2710 let changesets = self
2711 .tx
2712 .cursor_read::<tables::AccountChangeSets>()?
2713 .walk_range(range)?
2714 .collect::<Result<Vec<_>, _>>()?;
2715 self.unwind_account_history_indices(changesets.iter())
2716 }
2717
2718 fn insert_account_history_index(
2719 &self,
2720 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2721 ) -> ProviderResult<()> {
2722 self.append_history_index::<_, tables::AccountsHistory>(
2723 account_transitions,
2724 ShardedKey::new,
2725 )
2726 }
2727
2728 fn unwind_storage_history_indices(
2729 &self,
2730 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2731 ) -> ProviderResult<usize> {
2732 let mut storage_changesets = changesets
2733 .into_iter()
2734 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2735 .collect::<Vec<_>>();
2736 storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2737
2738 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2739 for &(address, storage_key, rem_index) in &storage_changesets {
2740 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2741 &mut cursor,
2742 StorageShardedKey::last(address, storage_key),
2743 rem_index,
2744 |storage_sharded_key| {
2745 storage_sharded_key.address == address &&
2746 storage_sharded_key.sharded_key.key == storage_key
2747 },
2748 )?;
2749
2750 if !partial_shard.is_empty() {
2753 cursor.insert(
2754 StorageShardedKey::last(address, storage_key),
2755 &BlockNumberList::new_pre_sorted(partial_shard),
2756 )?;
2757 }
2758 }
2759
2760 let changesets = storage_changesets.len();
2761 Ok(changesets)
2762 }
2763
2764 fn unwind_storage_history_indices_range(
2765 &self,
2766 range: impl RangeBounds<BlockNumberAddress>,
2767 ) -> ProviderResult<usize> {
2768 let changesets = self
2769 .tx
2770 .cursor_read::<tables::StorageChangeSets>()?
2771 .walk_range(range)?
2772 .collect::<Result<Vec<_>, _>>()?;
2773 self.unwind_storage_history_indices(changesets.into_iter())
2774 }
2775
2776 fn insert_storage_history_index(
2777 &self,
2778 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2779 ) -> ProviderResult<()> {
2780 self.append_history_index::<_, tables::StoragesHistory>(
2781 storage_transitions,
2782 |(address, storage_key), highest_block_number| {
2783 StorageShardedKey::new(address, storage_key, highest_block_number)
2784 },
2785 )
2786 }
2787
2788 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2789 {
2791 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2792 self.insert_account_history_index(indices)?;
2793 }
2794
2795 {
2797 let indices = self.changed_storages_and_blocks_with_range(range)?;
2798 self.insert_storage_history_index(indices)?;
2799 }
2800
2801 Ok(())
2802 }
2803}
2804
/// Unwinding of blocks together with their execution results.
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
    for DatabaseProvider<TX, N>
{
    /// Removes everything above `block` and returns the removed blocks together with their
    /// execution state as a [`Chain`].
    ///
    /// The steps run in this order: trie state is unwound from `block + 1`, the state above
    /// `block` is taken, the blocks in `block + 1..=last_block_number` are read, the blocks
    /// are removed, and the pipeline stages are updated to `block`.
    ///
    /// # Errors
    ///
    /// Propagates the first error from any of the steps above.
    fn take_block_and_execution_above(
        &self,
        block: BlockNumber,
    ) -> ProviderResult<Chain<Self::Primitives>> {
        // Captured up front, before anything is removed.
        let range = block + 1..=self.last_block_number()?;

        self.unwind_trie_state_from(block + 1)?;

        let execution_state = self.take_state_above(block)?;

        // The blocks are read here, before `remove_blocks_above` deletes them below.
        let blocks = self.recovered_block_range(range)?;

        self.remove_blocks_above(block)?;

        self.update_pipeline_stages(block, true)?;

        Ok(Chain::new(blocks, execution_state, None))
    }

    /// Removes everything above `block` without returning the removed data.
    ///
    /// Runs the same sequence as [`Self::take_block_and_execution_above`], except that state
    /// is removed rather than taken and no blocks are read back.
    ///
    /// # Errors
    ///
    /// Propagates the first error from any of the steps.
    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
        self.unwind_trie_state_from(block + 1)?;

        self.remove_state_above(block)?;

        self.remove_blocks_above(block)?;

        self.update_pipeline_stages(block, true)?;

        Ok(())
    }
}
2847
2848impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2849 for DatabaseProvider<TX, N>
2850{
2851 type Block = BlockTy<N>;
2852 type Receipt = ReceiptTy<N>;
2853
    /// Inserts a single recovered block and returns its [`StoredBlockBodyIndices`].
    ///
    /// In order, this:
    /// 1. appends the header to the `Headers` static file segment,
    /// 2. records the block hash to number mapping in [`tables::HeaderNumbers`],
    /// 3. derives the block's first transaction number from the last key of
    ///    [`tables::TransactionBlocks`] plus one (zero when the table is empty),
    /// 4. writes transaction senders unless the sender-recovery prune mode is full,
    /// 5. writes transaction hash to number mappings unless the transaction-lookup prune
    ///    mode is full,
    /// 6. appends the block body via [`Self::append_block_bodies`].
    ///
    /// # Errors
    ///
    /// Propagates the first error from any static file or database write.
    fn insert_block(
        &self,
        block: &RecoveredBlock<Self::Block>,
    ) -> ProviderResult<StoredBlockBodyIndices> {
        let block_number = block.number();
        let tx_count = block.body().transaction_count() as u64;

        // Collects per-action timings that are emitted with the debug log at the end.
        let mut durations_recorder = metrics::DurationsRecorder::default();

        self.static_file_provider
            .get_writer(block_number, StaticFileSegment::Headers)?
            .append_header(block.header(), &block.hash())?;

        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
        durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);

        // Next free transaction number: one past the highest key in `TransactionBlocks`.
        let first_tx_num = self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .last()?
            .map(|(n, _)| n + 1)
            .unwrap_or_default();
        durations_recorder.record_relative(metrics::Action::GetNextTxNum);

        // Unbounded counter starting at `first_tx_num`; zipping with the block's senders and
        // transactions below bounds it.
        let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));

        // Senders are skipped only when sender recovery is configured to be fully pruned.
        if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
            let mut senders_writer = EitherWriter::new_senders(self, block.number())?;
            senders_writer.increment_block(block.number())?;
            senders_writer
                .append_senders(tx_nums_iter.clone().zip(block.senders_iter().copied()))?;
            durations_recorder.record_relative(metrics::Action::InsertTransactionSenders);
        }

        // The hash lookup index is skipped only when it is configured to be fully pruned.
        if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
            for (tx_num, transaction) in tx_nums_iter.zip(block.body().transactions_iter()) {
                let hash = transaction.tx_hash();
                self.tx.put::<tables::TransactionHashNumbers>(*hash, tx_num)?;
            }
            durations_recorder.record_relative(metrics::Action::InsertTransactionHashNumbers);
        }

        self.append_block_bodies(vec![(block_number, Some(block.body()))])?;

        debug!(
            target: "providers::db",
            ?block_number,
            actions = ?durations_recorder.actions,
            "Inserted block"
        );

        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
    }
2928
2929 fn append_block_bodies(
2930 &self,
2931 bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
2932 ) -> ProviderResult<()> {
2933 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2934
2935 let mut tx_writer =
2937 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
2938
2939 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2940 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2941
2942 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2944
2945 for (block_number, body) in &bodies {
2946 tx_writer.increment_block(*block_number)?;
2948
2949 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2950 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2951
2952 let mut durations_recorder = metrics::DurationsRecorder::default();
2953
2954 block_indices_cursor.append(*block_number, &block_indices)?;
2956
2957 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2958
2959 let Some(body) = body else { continue };
2960
2961 if !body.transactions().is_empty() {
2963 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2964 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2965 }
2966
2967 for transaction in body.transactions() {
2969 tx_writer.append_transaction(next_tx_num, transaction)?;
2970
2971 next_tx_num += 1;
2973 }
2974 }
2975
2976 self.storage.writer().write_block_bodies(self, bodies)?;
2977
2978 Ok(())
2979 }
2980
2981 fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
2982 let last_block_number = self.last_block_number()?;
2983 for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
2985 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2986 }
2987
2988 let highest_static_file_block = self
2990 .static_file_provider()
2991 .get_highest_static_file_block(StaticFileSegment::Headers)
2992 .expect("todo: error handling, headers should exist");
2993
2994 debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
3000 self.static_file_provider()
3001 .get_writer(block, StaticFileSegment::Headers)?
3002 .prune_headers(highest_static_file_block.saturating_sub(block))?;
3003
3004 let unwind_tx_from = self
3006 .block_body_indices(block)?
3007 .map(|b| b.next_tx_num())
3008 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3009
3010 let unwind_tx_to = self
3012 .tx
3013 .cursor_read::<tables::BlockBodyIndices>()?
3014 .last()?
3015 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3017 .1
3018 .last_tx_num();
3019
3020 if unwind_tx_from <= unwind_tx_to {
3021 for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
3022 self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
3023 }
3024 }
3025
3026 EitherWriter::new_senders(self, last_block_number)?.prune_senders(unwind_tx_from, block)?;
3027
3028 self.remove_bodies_above(block)?;
3029
3030 Ok(())
3031 }
3032
3033 fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3034 self.storage.writer().remove_block_bodies_above(self, block)?;
3035
3036 let unwind_tx_from = self
3038 .block_body_indices(block)?
3039 .map(|b| b.next_tx_num())
3040 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3041
3042 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3043 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3044
3045 let static_file_tx_num =
3046 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3047
3048 let to_delete = static_file_tx_num
3049 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3050 .unwrap_or_default();
3051
3052 self.static_file_provider
3053 .latest_writer(StaticFileSegment::Transactions)?
3054 .prune_transactions(to_delete, block)?;
3055
3056 Ok(())
3057 }
3058
3059 fn append_blocks_with_state(
3061 &self,
3062 blocks: Vec<RecoveredBlock<Self::Block>>,
3063 execution_outcome: &ExecutionOutcome<Self::Receipt>,
3064 hashed_state: HashedPostStateSorted,
3065 ) -> ProviderResult<()> {
3066 if blocks.is_empty() {
3067 debug!(target: "providers::db", "Attempted to append empty block range");
3068 return Ok(())
3069 }
3070
3071 let first_number = blocks[0].number();
3074
3075 let last_block_number = blocks[blocks.len() - 1].number();
3078
3079 let mut durations_recorder = metrics::DurationsRecorder::default();
3080
3081 for block in blocks {
3083 self.insert_block(&block)?;
3084 durations_recorder.record_relative(metrics::Action::InsertBlock);
3085 }
3086
3087 self.write_state(execution_outcome, OriginalValuesKnown::No)?;
3088 durations_recorder.record_relative(metrics::Action::InsertState);
3089
3090 self.write_hashed_state(&hashed_state)?;
3092 durations_recorder.record_relative(metrics::Action::InsertHashes);
3093
3094 self.update_history_indices(first_number..=last_block_number)?;
3095 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3096
3097 self.update_pipeline_stages(last_block_number, false)?;
3099 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3100
3101 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3102
3103 Ok(())
3104 }
3105}
3106
3107impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3108 fn get_prune_checkpoint(
3109 &self,
3110 segment: PruneSegment,
3111 ) -> ProviderResult<Option<PruneCheckpoint>> {
3112 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3113 }
3114
3115 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3116 Ok(PruneSegment::variants()
3117 .filter_map(|segment| {
3118 self.tx
3119 .get::<tables::PruneCheckpoints>(segment)
3120 .transpose()
3121 .map(|chk| chk.map(|chk| (segment, chk)))
3122 })
3123 .collect::<Result<_, _>>()?)
3124 }
3125}
3126
3127impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3128 fn save_prune_checkpoint(
3129 &self,
3130 segment: PruneSegment,
3131 checkpoint: PruneCheckpoint,
3132 ) -> ProviderResult<()> {
3133 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3134 }
3135}
3136
3137impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3138 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3139 let db_entries = self.tx.entries::<T>()?;
3140 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3141 Ok(entries) => entries,
3142 Err(ProviderError::UnsupportedProvider) => 0,
3143 Err(err) => return Err(err),
3144 };
3145
3146 Ok(db_entries + static_file_entries)
3147 }
3148}
3149
3150impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3151 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3152 let mut finalized_blocks = self
3153 .tx
3154 .cursor_read::<tables::ChainState>()?
3155 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3156 .take(1)
3157 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3158
3159 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3160 Ok(last_finalized_block_number)
3161 }
3162
3163 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3164 let mut finalized_blocks = self
3165 .tx
3166 .cursor_read::<tables::ChainState>()?
3167 .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3168 .take(1)
3169 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3170
3171 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3172 Ok(last_finalized_block_number)
3173 }
3174}
3175
3176impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3177 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3178 Ok(self
3179 .tx
3180 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3181 }
3182
3183 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3184 Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3185 }
3186}
3187
/// Access to the underlying database transaction and the commit logic.
impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
    type Tx = TX;

    /// Returns a shared reference to the database transaction.
    fn tx_ref(&self) -> &Self::Tx {
        &self.tx
    }

    /// Returns a mutable reference to the database transaction.
    fn tx_mut(&mut self) -> &mut Self::Tx {
        &mut self.tx
    }

    /// Consumes the provider and returns the database transaction.
    fn into_tx(self) -> Self::Tx {
        self.tx
    }

    /// Returns the configured prune modes.
    // NOTE(review): this presumably resolves to an inherent `prune_modes_ref` defined outside
    // this section (inherent methods take precedence over trait methods); if no such method
    // exists this call would recurse. Confirm.
    fn prune_modes_ref(&self) -> &PruneModes {
        self.prune_modes_ref()
    }

    /// Commits the database transaction, the static files and, on unix builds with the
    /// `rocksdb` feature, the pending `RocksDB` batches. Returns `Ok(true)` on success.
    ///
    /// The order depends on whether the static file provider has an unwind queued:
    /// - unwind queued: database, then `RocksDB` batches, then static files;
    /// - otherwise: static files, then `RocksDB` batches, then database.
    ///
    /// # Errors
    ///
    /// Returns the first error from any commit step; later steps are then not executed.
    fn commit(self) -> ProviderResult<bool> {
        if self.static_file_provider.has_unwind_queued() {
            // Unwind path: the database is committed first.
            self.tx.commit()?;

            #[cfg(all(unix, feature = "rocksdb"))]
            {
                // Take the queued batches out from under the lock, then commit each one.
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
            }

            self.static_file_provider.commit()?;
        } else {
            // Regular path: static files are committed first, the database last.
            self.static_file_provider.commit()?;

            #[cfg(all(unix, feature = "rocksdb"))]
            {
                // Take the queued batches out from under the lock, then commit each one.
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
            }

            self.tx.commit()?;
        }

        Ok(true)
    }
}
3242
3243impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3244 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3245 self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3246 }
3247}
3248
3249impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3250 fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3251 self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3252 }
3253}
3254
3255impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3256 fn cached_storage_settings(&self) -> StorageSettings {
3257 *self.storage_settings.read()
3258 }
3259
3260 fn set_storage_settings_cache(&self, settings: StorageSettings) {
3261 *self.storage_settings.write() = settings;
3262 }
3263}
3264
3265#[cfg(test)]
3266mod tests {
3267 use super::*;
3268 use crate::{
3269 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3270 BlockWriter,
3271 };
3272 use reth_ethereum_primitives::Receipt;
3273 use reth_testing_utils::generators::{self, random_block, BlockParams};
3274 use reth_trie::Nibbles;
3275
3276 #[test]
3277 fn test_receipts_by_block_range_empty_range() {
3278 let factory = create_test_provider_factory();
3279 let provider = factory.provider().unwrap();
3280
3281 let start = 10u64;
3283 let end = 9u64;
3284 let result = provider.receipts_by_block_range(start..=end).unwrap();
3285 assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3286 }
3287
3288 #[test]
3289 fn test_receipts_by_block_range_nonexistent_blocks() {
3290 let factory = create_test_provider_factory();
3291 let provider = factory.provider().unwrap();
3292
3293 let result = provider.receipts_by_block_range(10..=12).unwrap();
3295 assert_eq!(result, vec![vec![], vec![], vec![]]);
3296 }
3297
3298 #[test]
3299 fn test_receipts_by_block_range_single_block() {
3300 let factory = create_test_provider_factory();
3301 let data = BlockchainTestData::default();
3302
3303 let provider_rw = factory.provider_rw().unwrap();
3304 provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3305 provider_rw
3306 .write_state(
3307 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3308 crate::OriginalValuesKnown::No,
3309 )
3310 .unwrap();
3311 provider_rw.insert_block(&data.blocks[0].0).unwrap();
3312 provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap();
3313 provider_rw.commit().unwrap();
3314
3315 let provider = factory.provider().unwrap();
3316 let result = provider.receipts_by_block_range(1..=1).unwrap();
3317
3318 assert_eq!(result.len(), 1);
3320 assert_eq!(result[0].len(), 1);
3321 assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3322 }
3323
3324 #[test]
3325 fn test_receipts_by_block_range_multiple_blocks() {
3326 let factory = create_test_provider_factory();
3327 let data = BlockchainTestData::default();
3328
3329 let provider_rw = factory.provider_rw().unwrap();
3330 provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3331 provider_rw
3332 .write_state(
3333 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3334 crate::OriginalValuesKnown::No,
3335 )
3336 .unwrap();
3337 for i in 0..3 {
3338 provider_rw.insert_block(&data.blocks[i].0).unwrap();
3339 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3340 }
3341 provider_rw.commit().unwrap();
3342
3343 let provider = factory.provider().unwrap();
3344 let result = provider.receipts_by_block_range(1..=3).unwrap();
3345
3346 assert_eq!(result.len(), 3);
3348 for (i, block_receipts) in result.iter().enumerate() {
3349 assert_eq!(block_receipts.len(), 1);
3350 assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
3351 }
3352 }
3353
3354 #[test]
3355 fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
3356 let factory = create_test_provider_factory();
3357 let data = BlockchainTestData::default();
3358
3359 let provider_rw = factory.provider_rw().unwrap();
3360 provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3361 provider_rw
3362 .write_state(
3363 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3364 crate::OriginalValuesKnown::No,
3365 )
3366 .unwrap();
3367
3368 for i in 0..3 {
3370 provider_rw.insert_block(&data.blocks[i].0).unwrap();
3371 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3372 }
3373 provider_rw.commit().unwrap();
3374
3375 let provider = factory.provider().unwrap();
3376 let result = provider.receipts_by_block_range(1..=3).unwrap();
3377
3378 assert_eq!(result.len(), 3);
3380 for block_receipts in &result {
3381 assert_eq!(block_receipts.len(), 1);
3382 }
3383 }
3384
3385 #[test]
3386 fn test_receipts_by_block_range_partial_range() {
3387 let factory = create_test_provider_factory();
3388 let data = BlockchainTestData::default();
3389
3390 let provider_rw = factory.provider_rw().unwrap();
3391 provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3392 provider_rw
3393 .write_state(
3394 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3395 crate::OriginalValuesKnown::No,
3396 )
3397 .unwrap();
3398 for i in 0..3 {
3399 provider_rw.insert_block(&data.blocks[i].0).unwrap();
3400 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3401 }
3402 provider_rw.commit().unwrap();
3403
3404 let provider = factory.provider().unwrap();
3405
3406 let result = provider.receipts_by_block_range(2..=5).unwrap();
3408 assert_eq!(result.len(), 4);
3409
3410 assert_eq!(result[0].len(), 1); assert_eq!(result[1].len(), 1); assert_eq!(result[2].len(), 0); assert_eq!(result[3].len(), 0); assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
3417 assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
3418 }
3419
3420 #[test]
3421 fn test_receipts_by_block_range_all_empty_blocks() {
3422 let factory = create_test_provider_factory();
3423 let mut rng = generators::rng();
3424
3425 let mut blocks = Vec::new();
3427 for i in 0..3 {
3428 let block =
3429 random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
3430 blocks.push(block);
3431 }
3432
3433 let provider_rw = factory.provider_rw().unwrap();
3434 for block in blocks {
3435 provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
3436 }
3437 provider_rw.commit().unwrap();
3438
3439 let provider = factory.provider().unwrap();
3440 let result = provider.receipts_by_block_range(1..=3).unwrap();
3441
3442 assert_eq!(result.len(), 3);
3443 for block_receipts in result {
3444 assert_eq!(block_receipts.len(), 0);
3445 }
3446 }
3447
3448 #[test]
3449 fn test_receipts_by_block_range_consistency_with_individual_calls() {
3450 let factory = create_test_provider_factory();
3451 let data = BlockchainTestData::default();
3452
3453 let provider_rw = factory.provider_rw().unwrap();
3454 provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3455 provider_rw
3456 .write_state(
3457 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3458 crate::OriginalValuesKnown::No,
3459 )
3460 .unwrap();
3461 for i in 0..3 {
3462 provider_rw.insert_block(&data.blocks[i].0).unwrap();
3463 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3464 }
3465 provider_rw.commit().unwrap();
3466
3467 let provider = factory.provider().unwrap();
3468
3469 let range_result = provider.receipts_by_block_range(1..=3).unwrap();
3471
3472 let mut individual_results = Vec::new();
3474 for block_num in 1..=3 {
3475 let receipts =
3476 provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
3477 individual_results.push(receipts);
3478 }
3479
3480 assert_eq!(range_result, individual_results);
3481 }
3482
3483 #[test]
3484 fn test_write_trie_changesets() {
3485 use reth_db_api::models::BlockNumberHashedAddress;
3486 use reth_trie::{BranchNodeCompact, StorageTrieEntry};
3487
3488 let factory = create_test_provider_factory();
3489 let provider_rw = factory.provider_rw().unwrap();
3490
3491 let block_number = 1u64;
3492
3493 let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
3495 let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
3496
3497 let node1 = BranchNodeCompact::new(
3498 0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![], None, );
3504
3505 {
3507 let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
3508 cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
3509 }
3510
3511 let account_nodes = vec![
3513 (account_nibbles1, Some(node1.clone())), (account_nibbles2, None), ];
3516
3517 let storage_address1 = B256::from([1u8; 32]); let storage_address2 = B256::from([2u8; 32]); let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
3522 let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
3523 let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);
3524
3525 let storage_node1 = BranchNodeCompact::new(
3526 0b1111_0000_0000_0000,
3527 0b0000_0000_0000_0000,
3528 0b0000_0000_0000_0000,
3529 vec![],
3530 None,
3531 );
3532
3533 let storage_node2 = BranchNodeCompact::new(
3534 0b0000_1111_0000_0000,
3535 0b0000_0000_0000_0000,
3536 0b0000_0000_0000_0000,
3537 vec![],
3538 None,
3539 );
3540
3541 let storage_node1_old = BranchNodeCompact::new(
3543 0b1010_0000_0000_0000, 0b0000_0000_0000_0000,
3545 0b0000_0000_0000_0000,
3546 vec![],
3547 None,
3548 );
3549
3550 {
3552 let mut cursor =
3553 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
3554 let entry = StorageTrieEntry {
3556 nibbles: StoredNibblesSubKey(storage_nibbles1),
3557 node: storage_node1_old.clone(),
3558 };
3559 cursor.upsert(storage_address1, &entry).unwrap();
3560 }
3561
3562 {
3564 let mut cursor =
3565 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
3566 let entry1 = StorageTrieEntry {
3568 nibbles: StoredNibblesSubKey(storage_nibbles1),
3569 node: storage_node1.clone(),
3570 };
3571 cursor.upsert(storage_address2, &entry1).unwrap();
3572 let entry3 = StorageTrieEntry {
3574 nibbles: StoredNibblesSubKey(storage_nibbles3),
3575 node: storage_node2.clone(),
3576 };
3577 cursor.upsert(storage_address2, &entry3).unwrap();
3578 }
3579
3580 let storage_trie1 = StorageTrieUpdatesSorted {
3582 is_deleted: false,
3583 storage_nodes: vec![
3584 (storage_nibbles1, Some(storage_node1.clone())), (storage_nibbles2, None), ],
3587 };
3588
3589 let storage_trie2 = StorageTrieUpdatesSorted {
3591 is_deleted: true,
3592 storage_nodes: vec![
3593 (storage_nibbles1, Some(storage_node1.clone())), (storage_nibbles2, Some(storage_node2.clone())), ],
3598 };
3599
3600 let mut storage_tries = B256Map::default();
3601 storage_tries.insert(storage_address1, storage_trie1);
3602 storage_tries.insert(storage_address2, storage_trie2);
3603
3604 let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
3605
3606 let num_written =
3608 provider_rw.write_trie_changesets(block_number, &trie_updates, None).unwrap();
3609
3610 assert_eq!(num_written, 7);
3617
3618 {
3620 let mut cursor =
3621 provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
3622
3623 let all_entries = cursor
3625 .walk_dup(Some(block_number), None)
3626 .unwrap()
3627 .collect::<Result<Vec<_>, _>>()
3628 .unwrap();
3629
3630 assert_eq!(
3632 all_entries,
3633 vec![
3634 (
3635 block_number,
3636 TrieChangeSetsEntry {
3637 nibbles: StoredNibblesSubKey(account_nibbles1),
3638 node: Some(node1),
3639 }
3640 ),
3641 (
3642 block_number,
3643 TrieChangeSetsEntry {
3644 nibbles: StoredNibblesSubKey(account_nibbles2),
3645 node: None,
3646 }
3647 ),
3648 ]
3649 );
3650 }
3651
3652 {
3654 let mut cursor =
3655 provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
3656
3657 let key1 = BlockNumberHashedAddress((block_number, storage_address1));
3659 let entries1 =
3660 cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3661
3662 assert_eq!(
3663 entries1,
3664 vec![
3665 (
3666 key1,
3667 TrieChangeSetsEntry {
3668 nibbles: StoredNibblesSubKey(storage_nibbles1),
3669 node: Some(storage_node1_old), }
3671 ),
3672 (
3673 key1,
3674 TrieChangeSetsEntry {
3675 nibbles: StoredNibblesSubKey(storage_nibbles2),
3676 node: None, }
3678 ),
3679 ]
3680 );
3681
3682 let key2 = BlockNumberHashedAddress((block_number, storage_address2));
3684 let entries2 =
3685 cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3686
3687 assert_eq!(
3688 entries2,
3689 vec![
3690 (
3691 key2,
3692 TrieChangeSetsEntry {
3693 nibbles: StoredNibblesSubKey(storage_nibbles1),
3694 node: Some(storage_node1), }
3696 ),
3697 (
3698 key2,
3699 TrieChangeSetsEntry {
3700 nibbles: StoredNibblesSubKey(storage_nibbles2),
3701 node: None, }
3703 ),
3704 (
3705 key2,
3706 TrieChangeSetsEntry {
3707 nibbles: StoredNibblesSubKey(storage_nibbles3),
3708 node: Some(storage_node2), }
3710 ),
3711 ]
3712 );
3713 }
3714
3715 provider_rw.commit().unwrap();
3716 }
3717
3718 #[test]
3719 fn test_write_trie_changesets_with_overlay() {
3720 use reth_db_api::models::BlockNumberHashedAddress;
3721 use reth_trie::BranchNodeCompact;
3722
3723 let factory = create_test_provider_factory();
3724 let provider_rw = factory.provider_rw().unwrap();
3725
3726 let block_number = 1u64;
3727
3728 let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
3730 let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
3731
3732 let node1 = BranchNodeCompact::new(
3733 0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![], None, );
3739
3740 let node1_old = BranchNodeCompact::new(
3745 0b1010_1010_1010_1010, 0b0000_0000_0000_0000,
3747 0b0000_0000_0000_0000,
3748 vec![],
3749 None,
3750 );
3751
3752 let overlay_account_nodes = vec![
3754 (account_nibbles1, Some(node1_old.clone())), ];
3756
3757 let account_nodes = vec![
3759 (account_nibbles1, Some(node1)), (account_nibbles2, None), ];
3762
3763 let storage_address1 = B256::from([1u8; 32]); let storage_address2 = B256::from([2u8; 32]); let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
3768 let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
3769 let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);
3770
3771 let storage_node1 = BranchNodeCompact::new(
3772 0b1111_0000_0000_0000,
3773 0b0000_0000_0000_0000,
3774 0b0000_0000_0000_0000,
3775 vec![],
3776 None,
3777 );
3778
3779 let storage_node2 = BranchNodeCompact::new(
3780 0b0000_1111_0000_0000,
3781 0b0000_0000_0000_0000,
3782 0b0000_0000_0000_0000,
3783 vec![],
3784 None,
3785 );
3786
3787 let storage_node1_old = BranchNodeCompact::new(
3789 0b1010_0000_0000_0000, 0b0000_0000_0000_0000,
3791 0b0000_0000_0000_0000,
3792 vec![],
3793 None,
3794 );
3795
3796 let mut overlay_storage_tries = B256Map::default();
3798
3799 let overlay_storage_trie1 = StorageTrieUpdatesSorted {
3801 is_deleted: false,
3802 storage_nodes: vec![
3803 (storage_nibbles1, Some(storage_node1_old.clone())), ],
3806 };
3807
3808 let overlay_storage_trie2 = StorageTrieUpdatesSorted {
3810 is_deleted: false,
3811 storage_nodes: vec![
3812 (storage_nibbles1, Some(storage_node1.clone())), (storage_nibbles3, Some(storage_node2.clone())), ],
3815 };
3816
3817 overlay_storage_tries.insert(storage_address1, overlay_storage_trie1);
3818 overlay_storage_tries.insert(storage_address2, overlay_storage_trie2);
3819
3820 let overlay = TrieUpdatesSorted::new(overlay_account_nodes, overlay_storage_tries);
3821
3822 let storage_trie1 = StorageTrieUpdatesSorted {
3824 is_deleted: false,
3825 storage_nodes: vec![
3826 (storage_nibbles1, Some(storage_node1.clone())), (storage_nibbles2, None), ],
3829 };
3830
3831 let storage_trie2 = StorageTrieUpdatesSorted {
3833 is_deleted: true,
3834 storage_nodes: vec![
3835 (storage_nibbles1, Some(storage_node1.clone())), (storage_nibbles2, Some(storage_node2.clone())), ],
3841 };
3842
3843 let mut storage_tries = B256Map::default();
3844 storage_tries.insert(storage_address1, storage_trie1);
3845 storage_tries.insert(storage_address2, storage_trie2);
3846
3847 let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
3848
3849 let num_written =
3851 provider_rw.write_trie_changesets(block_number, &trie_updates, Some(&overlay)).unwrap();
3852
3853 assert_eq!(num_written, 7);
3860
3861 {
3863 let mut cursor =
3864 provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
3865
3866 let all_entries = cursor
3868 .walk_dup(Some(block_number), None)
3869 .unwrap()
3870 .collect::<Result<Vec<_>, _>>()
3871 .unwrap();
3872
3873 assert_eq!(
3875 all_entries,
3876 vec![
3877 (
3878 block_number,
3879 TrieChangeSetsEntry {
3880 nibbles: StoredNibblesSubKey(account_nibbles1),
3881 node: Some(node1_old), }
3883 ),
3884 (
3885 block_number,
3886 TrieChangeSetsEntry {
3887 nibbles: StoredNibblesSubKey(account_nibbles2),
3888 node: None,
3889 }
3890 ),
3891 ]
3892 );
3893 }
3894
3895 {
3897 let mut cursor =
3898 provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
3899
3900 let key1 = BlockNumberHashedAddress((block_number, storage_address1));
3902 let entries1 =
3903 cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3904
3905 assert_eq!(
3906 entries1,
3907 vec![
3908 (
3909 key1,
3910 TrieChangeSetsEntry {
3911 nibbles: StoredNibblesSubKey(storage_nibbles1),
3912 node: Some(storage_node1_old), }
3914 ),
3915 (
3916 key1,
3917 TrieChangeSetsEntry {
3918 nibbles: StoredNibblesSubKey(storage_nibbles2),
3919 node: None, }
3921 ),
3922 ]
3923 );
3924
3925 let key2 = BlockNumberHashedAddress((block_number, storage_address2));
3927 let entries2 =
3928 cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3929
3930 assert_eq!(
3931 entries2,
3932 vec![
3933 (
3934 key2,
3935 TrieChangeSetsEntry {
3936 nibbles: StoredNibblesSubKey(storage_nibbles1),
3937 node: Some(storage_node1), }
3939 ),
3940 (
3941 key2,
3942 TrieChangeSetsEntry {
3943 nibbles: StoredNibblesSubKey(storage_nibbles2),
3944 node: None, }
3946 ),
3947 (
3948 key2,
3949 TrieChangeSetsEntry {
3950 nibbles: StoredNibblesSubKey(storage_nibbles3),
3951 node: Some(storage_node2), }
3954 ),
3955 ]
3956 );
3957 }
3958
3959 provider_rw.commit().unwrap();
3960 }
3961
3962 #[test]
3963 fn test_clear_trie_changesets_from() {
3964 use alloy_primitives::hex_literal::hex;
3965 use reth_db_api::models::BlockNumberHashedAddress;
3966 use reth_trie::{BranchNodeCompact, StoredNibblesSubKey, TrieChangeSetsEntry};
3967
3968 let factory = create_test_provider_factory();
3969
3970 let block1 = 100u64;
3972 let block2 = 101u64;
3973 let block3 = 102u64;
3974 let block4 = 103u64;
3975 let block5 = 104u64;
3976
3977 let storage_address1 =
3979 B256::from(hex!("1111111111111111111111111111111111111111111111111111111111111111"));
3980 let storage_address2 =
3981 B256::from(hex!("2222222222222222222222222222222222222222222222222222222222222222"));
3982
3983 let nibbles1 = StoredNibblesSubKey(Nibbles::from_nibbles([0x1, 0x2, 0x3]));
3985 let nibbles2 = StoredNibblesSubKey(Nibbles::from_nibbles([0x4, 0x5, 0x6]));
3986 let nibbles3 = StoredNibblesSubKey(Nibbles::from_nibbles([0x7, 0x8, 0x9]));
3987
3988 let node1 = BranchNodeCompact::new(
3990 0b1111_1111_1111_1111,
3991 0b1111_1111_1111_1111,
3992 0b0000_0000_0000_0001,
3993 vec![B256::from(hex!(
3994 "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
3995 ))],
3996 None,
3997 );
3998 let node2 = BranchNodeCompact::new(
3999 0b1111_1111_1111_1110,
4000 0b1111_1111_1111_1110,
4001 0b0000_0000_0000_0010,
4002 vec![B256::from(hex!(
4003 "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
4004 ))],
4005 Some(B256::from(hex!(
4006 "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
4007 ))),
4008 );
4009
4010 {
4012 let provider_rw = factory.provider_rw().unwrap();
4013 let mut cursor =
4014 provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
4015
4016 cursor
4018 .upsert(
4019 block1,
4020 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4021 )
4022 .unwrap();
4023 cursor
4024 .upsert(block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
4025 .unwrap();
4026
4027 cursor
4029 .upsert(
4030 block2,
4031 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
4032 )
4033 .unwrap();
4034 cursor
4035 .upsert(
4036 block2,
4037 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4038 )
4039 .unwrap(); cursor
4041 .upsert(block2, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4042 .unwrap();
4043
4044 cursor
4046 .upsert(
4047 block3,
4048 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
4049 )
4050 .unwrap();
4051 cursor
4052 .upsert(
4053 block3,
4054 &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node2.clone()) },
4055 )
4056 .unwrap();
4057
4058 cursor
4060 .upsert(block4, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
4061 .unwrap();
4062
4063 cursor
4065 .upsert(
4066 block5,
4067 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
4068 )
4069 .unwrap();
4070 cursor
4071 .upsert(block5, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4072 .unwrap();
4073
4074 provider_rw.commit().unwrap();
4075 }
4076
4077 {
4079 let provider_rw = factory.provider_rw().unwrap();
4080 let mut cursor =
4081 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4082
4083 let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
4085 cursor
4086 .upsert(
4087 key1_block1,
4088 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4089 )
4090 .unwrap();
4091 cursor
4092 .upsert(key1_block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
4093 .unwrap();
4094
4095 let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
4098 cursor
4099 .upsert(
4100 key1_block2,
4101 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
4102 )
4103 .unwrap();
4104 cursor
4105 .upsert(key1_block2, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
4106 .unwrap(); cursor
4108 .upsert(
4109 key1_block2,
4110 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
4111 )
4112 .unwrap();
4113
4114 let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
4116 cursor
4117 .upsert(
4118 key2_block3,
4119 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
4120 )
4121 .unwrap();
4122 cursor
4123 .upsert(key2_block3, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4124 .unwrap();
4125
4126 let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
4128 cursor
4129 .upsert(
4130 key1_block4,
4131 &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node1) },
4132 )
4133 .unwrap();
4134 cursor
4135 .upsert(
4136 key1_block4,
4137 &TrieChangeSetsEntry { nibbles: nibbles3, node: Some(node2.clone()) },
4138 )
4139 .unwrap(); let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4143 cursor
4144 .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles1, node: None })
4145 .unwrap();
4146 cursor
4147 .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles2, node: Some(node2) })
4148 .unwrap();
4149
4150 provider_rw.commit().unwrap();
4151 }
4152
4153 {
4155 let provider_rw = factory.provider_rw().unwrap();
4156 provider_rw.clear_trie_changesets_from(block2).unwrap();
4157 provider_rw.commit().unwrap();
4158 }
4159
4160 {
4162 let provider = factory.provider().unwrap();
4163 let mut cursor =
4164 provider.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
4165
4166 let block1_entries = cursor
4168 .walk_dup(Some(block1), None)
4169 .unwrap()
4170 .collect::<Result<Vec<_>, _>>()
4171 .unwrap();
4172 assert_eq!(block1_entries.len(), 2, "Block 100 entries should be preserved");
4173 assert_eq!(block1_entries[0].0, block1);
4174 assert_eq!(block1_entries[1].0, block1);
4175
4176 let block2_entries = cursor
4178 .walk_dup(Some(block2), None)
4179 .unwrap()
4180 .collect::<Result<Vec<_>, _>>()
4181 .unwrap();
4182 assert!(block2_entries.is_empty(), "Block 101 entries should be deleted");
4183
4184 let block3_entries = cursor
4185 .walk_dup(Some(block3), None)
4186 .unwrap()
4187 .collect::<Result<Vec<_>, _>>()
4188 .unwrap();
4189 assert!(block3_entries.is_empty(), "Block 102 entries should be deleted");
4190
4191 let block4_entries = cursor
4192 .walk_dup(Some(block4), None)
4193 .unwrap()
4194 .collect::<Result<Vec<_>, _>>()
4195 .unwrap();
4196 assert!(block4_entries.is_empty(), "Block 103 entries should be deleted");
4197
4198 let block5_entries = cursor
4200 .walk_dup(Some(block5), None)
4201 .unwrap()
4202 .collect::<Result<Vec<_>, _>>()
4203 .unwrap();
4204 assert!(block5_entries.is_empty(), "Block 104 entries should be deleted");
4205 }
4206
4207 {
4209 let provider = factory.provider().unwrap();
4210 let mut cursor =
4211 provider.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
4212
4213 let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
4215 let block1_entries = cursor
4216 .walk_dup(Some(key1_block1), None)
4217 .unwrap()
4218 .collect::<Result<Vec<_>, _>>()
4219 .unwrap();
4220 assert_eq!(block1_entries.len(), 2, "Block 100 storage entries should be preserved");
4221
4222 let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
4224 let block2_entries = cursor
4225 .walk_dup(Some(key1_block2), None)
4226 .unwrap()
4227 .collect::<Result<Vec<_>, _>>()
4228 .unwrap();
4229 assert!(block2_entries.is_empty(), "Block 101 storage entries should be deleted");
4230
4231 let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
4232 let block3_entries = cursor
4233 .walk_dup(Some(key2_block3), None)
4234 .unwrap()
4235 .collect::<Result<Vec<_>, _>>()
4236 .unwrap();
4237 assert!(block3_entries.is_empty(), "Block 102 storage entries should be deleted");
4238
4239 let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
4240 let block4_entries = cursor
4241 .walk_dup(Some(key1_block4), None)
4242 .unwrap()
4243 .collect::<Result<Vec<_>, _>>()
4244 .unwrap();
4245 assert!(block4_entries.is_empty(), "Block 103 storage entries should be deleted");
4246
4247 let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4249 let block5_entries = cursor
4250 .walk_dup(Some(key2_block5), None)
4251 .unwrap()
4252 .collect::<Result<Vec<_>, _>>()
4253 .unwrap();
4254 assert!(block5_entries.is_empty(), "Block 104 storage entries should be deleted");
4255 }
4256 }
4257
4258 #[test]
4259 fn test_write_trie_updates_sorted() {
4260 use reth_trie::{
4261 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
4262 BranchNodeCompact, StorageTrieEntry,
4263 };
4264
4265 let factory = create_test_provider_factory();
4266 let provider_rw = factory.provider_rw().unwrap();
4267
4268 {
4270 let tx = provider_rw.tx_ref();
4271 let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
4272
4273 let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4275 cursor
4276 .upsert(
4277 to_delete,
4278 &BranchNodeCompact::new(
4279 0b1010_1010_1010_1010, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4283 None,
4284 ),
4285 )
4286 .unwrap();
4287
4288 let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4290 cursor
4291 .upsert(
4292 to_update,
4293 &BranchNodeCompact::new(
4294 0b0101_0101_0101_0101, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4298 None,
4299 ),
4300 )
4301 .unwrap();
4302 }
4303
4304 let storage_address1 = B256::from([1u8; 32]);
4306 let storage_address2 = B256::from([2u8; 32]);
4307 {
4308 let tx = provider_rw.tx_ref();
4309 let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
4310
4311 storage_cursor
4313 .upsert(
4314 storage_address1,
4315 &StorageTrieEntry {
4316 nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
4317 node: BranchNodeCompact::new(
4318 0b0011_0011_0011_0011, 0b0000_0000_0000_0000,
4320 0b0000_0000_0000_0000,
4321 vec![],
4322 None,
4323 ),
4324 },
4325 )
4326 .unwrap();
4327
4328 storage_cursor
4330 .upsert(
4331 storage_address2,
4332 &StorageTrieEntry {
4333 nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
4334 node: BranchNodeCompact::new(
4335 0b1100_1100_1100_1100, 0b0000_0000_0000_0000,
4337 0b0000_0000_0000_0000,
4338 vec![],
4339 None,
4340 ),
4341 },
4342 )
4343 .unwrap();
4344 storage_cursor
4345 .upsert(
4346 storage_address2,
4347 &StorageTrieEntry {
4348 nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
4349 node: BranchNodeCompact::new(
4350 0b0011_1100_0011_1100, 0b0000_0000_0000_0000,
4352 0b0000_0000_0000_0000,
4353 vec![],
4354 None,
4355 ),
4356 },
4357 )
4358 .unwrap();
4359 }
4360
4361 let account_nodes = vec![
4363 (
4364 Nibbles::from_nibbles([0x1, 0x2]),
4365 Some(BranchNodeCompact::new(
4366 0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4370 None,
4371 )),
4372 ),
4373 (Nibbles::from_nibbles([0x3, 0x4]), None), (
4375 Nibbles::from_nibbles([0x5, 0x6]),
4376 Some(BranchNodeCompact::new(
4377 0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4381 None,
4382 )),
4383 ),
4384 ];
4385
4386 let storage_trie1 = StorageTrieUpdatesSorted {
4388 is_deleted: false,
4389 storage_nodes: vec![
4390 (
4391 Nibbles::from_nibbles([0x1, 0x0]),
4392 Some(BranchNodeCompact::new(
4393 0b1111_0000_0000_0000, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4397 None,
4398 )),
4399 ),
4400 (Nibbles::from_nibbles([0x2, 0x0]), None), ],
4402 };
4403
4404 let storage_trie2 = StorageTrieUpdatesSorted {
4405 is_deleted: true, storage_nodes: vec![],
4407 };
4408
4409 let mut storage_tries = B256Map::default();
4410 storage_tries.insert(storage_address1, storage_trie1);
4411 storage_tries.insert(storage_address2, storage_trie2);
4412
4413 let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
4414
4415 let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
4417
4418 assert_eq!(num_entries, 5);
4421
4422 let tx = provider_rw.tx_ref();
4424 let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
4425
4426 let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4428 let entry1 = cursor.seek_exact(nibbles1).unwrap();
4429 assert!(entry1.is_some(), "Updated account node should exist");
4430 let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
4431 assert_eq!(
4432 entry1.unwrap().1.state_mask,
4433 expected_mask,
4434 "Account node should have updated state_mask"
4435 );
4436
4437 let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4439 let entry2 = cursor.seek_exact(nibbles2).unwrap();
4440 assert!(entry2.is_none(), "Deleted account node should not exist");
4441
4442 let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
4444 let entry3 = cursor.seek_exact(nibbles3).unwrap();
4445 assert!(entry3.is_some(), "New account node should exist");
4446
4447 let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
4449
4450 let storage_entries1: Vec<_> = storage_cursor
4452 .walk_dup(Some(storage_address1), None)
4453 .unwrap()
4454 .collect::<Result<Vec<_>, _>>()
4455 .unwrap();
4456 assert_eq!(
4457 storage_entries1.len(),
4458 1,
4459 "Storage address1 should have 1 entry after deletion"
4460 );
4461 assert_eq!(
4462 storage_entries1[0].1.nibbles.0,
4463 Nibbles::from_nibbles([0x1, 0x0]),
4464 "Remaining entry should be [0x1, 0x0]"
4465 );
4466
4467 let storage_entries2: Vec<_> = storage_cursor
4469 .walk_dup(Some(storage_address2), None)
4470 .unwrap()
4471 .collect::<Result<Vec<_>, _>>()
4472 .unwrap();
4473 assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
4474
4475 provider_rw.commit().unwrap();
4476 }
4477
4478 #[test]
4479 fn test_get_block_trie_updates() {
4480 use reth_db_api::models::BlockNumberHashedAddress;
4481 use reth_trie::{BranchNodeCompact, StorageTrieEntry};
4482
4483 let factory = create_test_provider_factory();
4484 let provider_rw = factory.provider_rw().unwrap();
4485
4486 let target_block = 2u64;
4487 let next_block = 3u64;
4488
4489 let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
4491 let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
4492 let account_nibbles3 = Nibbles::from_nibbles([0x9, 0xa, 0xb, 0xc]);
4493
4494 let node1 = BranchNodeCompact::new(
4495 0b1111_1111_0000_0000,
4496 0b0000_0000_0000_0000,
4497 0b0000_0000_0000_0000,
4498 vec![],
4499 None,
4500 );
4501
4502 let node2 = BranchNodeCompact::new(
4503 0b0000_0000_1111_1111,
4504 0b0000_0000_0000_0000,
4505 0b0000_0000_0000_0000,
4506 vec![],
4507 None,
4508 );
4509
4510 let node3 = BranchNodeCompact::new(
4511 0b1010_1010_1010_1010,
4512 0b0000_0000_0000_0000,
4513 0b0000_0000_0000_0000,
4514 vec![],
4515 None,
4516 );
4517
4518 {
4520 let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
4521 cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
4522 cursor.insert(StoredNibbles(account_nibbles2), &node2).unwrap();
4523 }
4525
4526 {
4528 let mut cursor =
4529 provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
4530 cursor
4532 .append_dup(
4533 target_block,
4534 TrieChangeSetsEntry {
4535 nibbles: StoredNibblesSubKey(account_nibbles1),
4536 node: Some(BranchNodeCompact::new(
4537 0b1111_0000_0000_0000, 0b0000_0000_0000_0000,
4539 0b0000_0000_0000_0000,
4540 vec![],
4541 None,
4542 )),
4543 },
4544 )
4545 .unwrap();
4546 cursor
4548 .append_dup(
4549 target_block,
4550 TrieChangeSetsEntry {
4551 nibbles: StoredNibblesSubKey(account_nibbles2),
4552 node: None,
4553 },
4554 )
4555 .unwrap();
4556 }
4557
4558 {
4560 let mut cursor =
4561 provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
4562 cursor
4564 .append_dup(
4565 next_block,
4566 TrieChangeSetsEntry {
4567 nibbles: StoredNibblesSubKey(account_nibbles3),
4568 node: Some(node3),
4569 },
4570 )
4571 .unwrap();
4572 }
4573
4574 let storage_address1 = B256::from([1u8; 32]);
4576 let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
4577 let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
4578
4579 let storage_node1 = BranchNodeCompact::new(
4580 0b1111_1111_1111_0000,
4581 0b0000_0000_0000_0000,
4582 0b0000_0000_0000_0000,
4583 vec![],
4584 None,
4585 );
4586
4587 let storage_node2 = BranchNodeCompact::new(
4588 0b0101_0101_0101_0101,
4589 0b0000_0000_0000_0000,
4590 0b0000_0000_0000_0000,
4591 vec![],
4592 None,
4593 );
4594
4595 {
4597 let mut cursor =
4598 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
4599 cursor
4600 .upsert(
4601 storage_address1,
4602 &StorageTrieEntry {
4603 nibbles: StoredNibblesSubKey(storage_nibbles1),
4604 node: storage_node1.clone(),
4605 },
4606 )
4607 .unwrap();
4608 }
4610
4611 {
4613 let mut cursor =
4614 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4615 let key = BlockNumberHashedAddress((target_block, storage_address1));
4616
4617 cursor
4619 .append_dup(
4620 key,
4621 TrieChangeSetsEntry {
4622 nibbles: StoredNibblesSubKey(storage_nibbles1),
4623 node: Some(BranchNodeCompact::new(
4624 0b0000_0000_1111_1111, 0b0000_0000_0000_0000,
4626 0b0000_0000_0000_0000,
4627 vec![],
4628 None,
4629 )),
4630 },
4631 )
4632 .unwrap();
4633
4634 cursor
4636 .append_dup(
4637 key,
4638 TrieChangeSetsEntry {
4639 nibbles: StoredNibblesSubKey(storage_nibbles2),
4640 node: None,
4641 },
4642 )
4643 .unwrap();
4644 }
4645
4646 {
4648 let mut cursor =
4649 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4650 let key = BlockNumberHashedAddress((next_block, storage_address1));
4651
4652 cursor
4654 .append_dup(
4655 key,
4656 TrieChangeSetsEntry {
4657 nibbles: StoredNibblesSubKey(storage_nibbles2),
4658 node: Some(BranchNodeCompact::new(
4659 0b0101_0101_0101_0101, 0b0000_0000_0000_0000,
4661 0b0000_0000_0000_0000,
4662 vec![],
4663 None,
4664 )),
4665 },
4666 )
4667 .unwrap();
4668 }
4669
4670 provider_rw.commit().unwrap();
4671
4672 let provider = factory.provider().unwrap();
4674 let result = provider.get_block_trie_updates(target_block).unwrap();
4675
4676 assert_eq!(result.account_nodes_ref().len(), 2, "Should have 2 account trie updates");
4678
4679 let nibbles1_update = result
4681 .account_nodes_ref()
4682 .iter()
4683 .find(|(n, _)| n == &account_nibbles1)
4684 .expect("Should find nibbles1");
4685 assert!(nibbles1_update.1.is_some(), "nibbles1 should have a value");
4686 assert_eq!(
4687 nibbles1_update.1.as_ref().unwrap().state_mask,
4688 node1.state_mask,
4689 "nibbles1 should have current value"
4690 );
4691
4692 let nibbles2_update = result
4694 .account_nodes_ref()
4695 .iter()
4696 .find(|(n, _)| n == &account_nibbles2)
4697 .expect("Should find nibbles2");
4698 assert!(nibbles2_update.1.is_some(), "nibbles2 should have a value");
4699 assert_eq!(
4700 nibbles2_update.1.as_ref().unwrap().state_mask,
4701 node2.state_mask,
4702 "nibbles2 should have current value"
4703 );
4704
4705 assert!(
4707 !result.account_nodes_ref().iter().any(|(n, _)| n == &account_nibbles3),
4708 "nibbles3 should not be in target_block updates"
4709 );
4710
4711 assert_eq!(result.storage_tries_ref().len(), 1, "Should have 1 storage trie");
4713 let storage_updates = result
4714 .storage_tries_ref()
4715 .get(&storage_address1)
4716 .expect("Should have storage updates for address1");
4717
4718 assert_eq!(storage_updates.storage_nodes.len(), 2, "Should have 2 storage node updates");
4719
4720 let storage1_update = storage_updates
4722 .storage_nodes
4723 .iter()
4724 .find(|(n, _)| n == &storage_nibbles1)
4725 .expect("Should find storage_nibbles1");
4726 assert!(storage1_update.1.is_some(), "storage_nibbles1 should have a value");
4727 assert_eq!(
4728 storage1_update.1.as_ref().unwrap().state_mask,
4729 storage_node1.state_mask,
4730 "storage_nibbles1 should have current value"
4731 );
4732
4733 let storage2_update = storage_updates
4736 .storage_nodes
4737 .iter()
4738 .find(|(n, _)| n == &storage_nibbles2)
4739 .expect("Should find storage_nibbles2");
4740 assert!(
4741 storage2_update.1.is_some(),
4742 "storage_nibbles2 should have a value (the node that will be deleted in next block)"
4743 );
4744 assert_eq!(
4745 storage2_update.1.as_ref().unwrap().state_mask,
4746 storage_node2.state_mask,
4747 "storage_nibbles2 should have the value that was created and will be deleted"
4748 );
4749 }
4750
/// Exercises how receipt writes interact with the configured receipt prune mode, once with
/// the legacy storage settings and once with receipts routed to static files.
#[test]
fn test_prunable_receipts_logic() {
    // Prune configuration used throughout: only the receipts segment is set, to
    // `Before(block)`; every other segment keeps its default.
    let receipts_before = |block| PruneModes {
        receipts: Some(PruneMode::Before(block)),
        ..Default::default()
    };

    // Inserts random blocks `0..=last_block`, each generated with `txs_per_block`
    // transactions, through the given read-write provider. Committing is left to the caller.
    let seed_chain =
        |provider_rw: &DatabaseProviderRW<_, _>, last_block: u64, txs_per_block: u8| {
            let mut rng = generators::rng();
            for number in 0..=last_block {
                let params = BlockParams { tx_count: Some(txs_per_block), ..Default::default() };
                let recovered = random_block(&mut rng, number, params).try_recover().unwrap();
                provider_rw.insert_block(&recovered).unwrap();
            }
        };

    // Writes and commits an execution outcome holding exactly one receipt for `block`.
    // `cumulative_gas_used` is set to the block number so the receipt can be recognised when
    // it is read back later.
    let store_receipt = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
        let tagged_receipt = Receipt {
            tx_type: Default::default(),
            success: true,
            cumulative_gas_used: block,
            logs: vec![],
        };
        let outcome = ExecutionOutcome {
            first_block: block,
            receipts: vec![vec![tagged_receipt]],
            ..Default::default()
        };
        provider_rw.write_state(&outcome, crate::OriginalValuesKnown::No).unwrap();
        provider_rw.commit().unwrap();
    };

    // Scenario 1: `StorageSettings::legacy()` with receipts configured as `Before(100)`.
    {
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::legacy());
        let factory = factory.with_prune_modes(receipts_before(100));

        let tip_block = 200u64;
        let first_block = 1u64;

        // Build and commit the chain before any receipts are written.
        let chain_rw = factory.provider_rw().unwrap();
        seed_chain(&chain_rw, tip_block, 1);
        chain_rw.commit().unwrap();

        // One receipt near the start of the chain, one just below the tip; each write uses
        // its own read-write provider with a minimum pruning distance of 100.
        for block in [first_block, tip_block - 1] {
            let receipts_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(100);
            store_receipt(receipts_rw, block);
        }

        let provider = factory.provider().unwrap();

        // Block 0 must report an empty receipt list; the block below the tip must report one.
        for (block, expected_len) in [(0, 0), (tip_block - 1, 1)] {
            let receipts = provider.receipts_by_block(block.into()).unwrap();
            assert!(receipts.is_some_and(|r| r.len() == expected_len));
        }
    }

    // Scenario 2: legacy settings with receipts routed to static files, starting with
    // receipts configured as `Before(2)`.
    {
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_receipts_in_static_files(true),
        );
        let factory = factory.with_prune_modes(receipts_before(2));

        let tip_block = 200u64;

        // Build and commit the chain before any receipts are written.
        let chain_rw = factory.provider_rw().unwrap();
        seed_chain(&chain_rw, tip_block, 1);
        chain_rw.commit().unwrap();

        // Blocks 0 and 1 are below the `Before(2)` threshold.
        for block in [0, 1] {
            let receipts_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(100);
            store_receipt(receipts_rw, block);
        }

        // After those writes the receipts segment is expected to report block 1 as its
        // highest block while still holding no transaction at all.
        let highest_tx = factory
            .static_file_provider()
            .get_highest_static_file_tx(StaticFileSegment::Receipts);
        assert!(highest_tx.is_none());
        let highest_block = factory
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::Receipts);
        assert!(highest_block.is_some_and(|b| b == 1));

        // Block 2 is the first block not covered by `Before(2)`; once it is written the
        // segment's highest transaction number must be 2.
        store_receipt(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
        let highest_tx = factory
            .static_file_provider()
            .get_highest_static_file_tx(StaticFileSegment::Receipts);
        assert!(highest_tx.is_some_and(|num| num == 2));

        // Reconfigure to `Before(100)` with a minimum pruning distance of 1 and write block 3.
        // The standalone `should_prune` assertion confirms that block 3 counts as prunable
        // for a distance of 1 at this tip; the checks below nevertheless expect its receipt
        // to be readable.
        // NOTE(review): presumably the write is kept because the segment already holds a
        // receipt and cannot skip entries after that — confirm against the writer.
        let factory = factory.with_prune_modes(receipts_before(100));
        let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
        assert!(PruneMode::Distance(1).should_prune(3, tip_block));
        store_receipt(provider_rw, 3);

        // Only blocks 2 and 3 may expose a receipt. `num` is used both as a block number and
        // as a transaction number, which relies on every block having been seeded with a
        // single transaction; the receipt is identified via its `cumulative_gas_used` tag.
        let provider = factory.provider().unwrap();
        assert!(EitherWriter::receipts_destination(&provider).is_static_file());
        for (num, expected_len) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
            let by_block = provider.receipts_by_block(num.into()).unwrap();
            assert!(by_block.is_some_and(|r| r.len() == expected_len));

            let receipt = provider.receipt(num).unwrap();
            match expected_len {
                0 => assert!(receipt.is_none()),
                _ => assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num)),
            }
        }
    }
}
4888}