1use crate::{
2 changesets_utils::StorageRevertsIter,
3 providers::{
4 database::{chain::ChainStorage, metrics},
5 rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6 static_file::{StaticFileWriteCtx, StaticFileWriter},
7 NodeTypesForProvider, StaticFileProvider,
8 },
9 to_range,
10 traits::{
11 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12 },
13 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15 DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16 HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17 LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18 PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19 RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20 StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21 TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25 BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29 keccak256,
30 map::{hash_map, HashMap, HashSet},
31 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40 database::Database,
41 models::{
42 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43 ShardedKey, StorageBeforeTx, StorageSettings, StoredBlockBodyIndices,
44 },
45 table::Table,
46 tables,
47 transaction::{DbTx, DbTxMut},
48 BlockNumberList, PlainAccountState, PlainStorageState,
49};
50use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
51use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
52use reth_primitives_traits::{
53 Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
54};
55use reth_prune_types::{
56 PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
57};
58use reth_stages_types::{StageCheckpoint, StageId};
59use reth_static_file_types::StaticFileSegment;
60use reth_storage_api::{
61 BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
62 NodePrimitivesProvider, StateProvider, StateWriteConfig, StorageChangeSetReader,
63 StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
64};
65use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
66use reth_trie::{
67 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
68 HashedPostStateSorted, StoredNibbles,
69};
70use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor};
71use revm_database::states::{
72 PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
73};
74use std::{
75 cmp::Ordering,
76 collections::{BTreeMap, BTreeSet},
77 fmt::Debug,
78 ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
79 sync::Arc,
80 thread,
81 time::Instant,
82};
83use tracing::{debug, instrument, trace};
84
/// Ordering mode a read-write [`DatabaseProvider`] is constructed with.
///
/// [`DatabaseProvider::new_rw`] selects [`CommitOrder::Normal`], while
/// [`DatabaseProvider::new_unwind_rw`] selects [`CommitOrder::Unwind`].
// NOTE(review): the commit path that consumes this value is not part of this excerpt — confirm
// the exact ordering each variant implies before documenting it further.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
    /// The default mode.
    #[default]
    Normal,
    /// The mode used by providers created for unwinding.
    Unwind,
}

impl CommitOrder {
    /// Returns `true` only when this is [`CommitOrder::Unwind`].
    pub const fn is_unwind(&self) -> bool {
        match self {
            Self::Unwind => true,
            Self::Normal => false,
        }
    }
}
102
/// A [`DatabaseProvider`] over the database's `TX` transaction type (as opposed to `TXMut`,
/// which is what [`DatabaseProviderRW`] wraps).
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
105
/// Newtype around a [`DatabaseProvider`] that holds the database's `TXMut` transaction type.
///
/// The wrapper dereferences to the inner provider and can be converted back into it with
/// `From`/`Into`.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    /// The wrapped provider.
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
114
115impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
116 type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
117
118 fn deref(&self) -> &Self::Target {
119 &self.0
120 }
121}
122
123impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
124 fn deref_mut(&mut self) -> &mut Self::Target {
125 &mut self.0
126 }
127}
128
129impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
130 for DatabaseProviderRW<DB, N>
131{
132 fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
133 &self.0
134 }
135}
136
137impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
138 pub fn commit(self) -> ProviderResult<()> {
140 self.0.commit()
141 }
142
143 pub fn into_tx(self) -> <DB as Database>::TXMut {
145 self.0.into_tx()
146 }
147
148 #[cfg(any(test, feature = "test-utils"))]
150 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
151 self.0.minimum_pruning_distance = distance;
152 self
153 }
154}
155
156impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
157 for DatabaseProvider<<DB as Database>::TXMut, N>
158{
159 fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
160 provider.0
161 }
162}
163
/// Selects how much data `save_blocks` persists for each block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SaveBlocksMode {
    /// Persist block data together with execution state
    /// ([`SaveBlocksMode::with_state`] returns `true`).
    Full,
    /// Persist block data only ([`SaveBlocksMode::with_state`] returns `false`).
    BlocksOnly,
}

impl SaveBlocksMode {
    /// Returns `true` when state should be written, i.e. only for [`SaveBlocksMode::Full`].
    pub const fn with_state(self) -> bool {
        match self {
            Self::Full => true,
            Self::BlocksOnly => false,
        }
    }
}
182
/// Provider that bundles a database transaction with the handles needed to read and write
/// chain data (static files, RocksDB, storage settings, pruning configuration).
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// The database transaction all table reads/writes go through.
    tx: TX,
    /// Chain specification, handed out by `ChainSpecProvider::chain_spec`.
    chain_spec: Arc<N::ChainSpec>,
    /// Handle used for static-file reads and writers.
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration; consulted when deciding which data to write
    /// (e.g. `sender_recovery`, `transaction_lookup`, `receipts`).
    prune_modes: PruneModes,
    /// Node-specific storage; its `reader()` / `writer()` are used for block bodies.
    storage: Arc<N::Storage>,
    /// Shared storage settings behind a lock.
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// Handle to the RocksDB provider.
    rocksdb_provider: RocksDBProvider,
    /// Cache queried for trie reverts when unwinding trie state.
    changeset_cache: ChangesetCache,
    /// RocksDB batches registered through `set_pending_rocksdb_batch`.
    // NOTE(review): presumably flushed on commit — the commit path is outside this excerpt.
    #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
    pending_rocksdb_batches: PendingRocksDBBatches,
    /// Commit ordering mode chosen at construction time.
    commit_order: CommitOrder,
    /// Distance used for the receipts "prunable" check; initialised to
    /// `MINIMUM_PRUNING_DISTANCE` by the constructors.
    minimum_pruning_distance: u64,
    /// Metrics recorder for provider operations.
    metrics: metrics::DatabaseProviderMetrics,
}
212
213impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
214 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215 let mut s = f.debug_struct("DatabaseProvider");
216 s.field("tx", &self.tx)
217 .field("chain_spec", &self.chain_spec)
218 .field("static_file_provider", &self.static_file_provider)
219 .field("prune_modes", &self.prune_modes)
220 .field("storage", &self.storage)
221 .field("storage_settings", &self.storage_settings)
222 .field("rocksdb_provider", &self.rocksdb_provider)
223 .field("changeset_cache", &self.changeset_cache)
224 .field("pending_rocksdb_batches", &"<pending batches>")
225 .field("commit_order", &self.commit_order)
226 .field("minimum_pruning_distance", &self.minimum_pruning_distance)
227 .finish()
228 }
229}
230
231impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
232 pub const fn prune_modes_ref(&self) -> &PruneModes {
234 &self.prune_modes
235 }
236}
237
238impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
239 pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
241 trace!(target: "providers::db", "Returning latest state provider");
242 Box::new(LatestStateProviderRef::new(self))
243 }
244
245 pub fn history_by_block_hash<'a>(
247 &'a self,
248 block_hash: BlockHash,
249 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
250 let mut block_number =
251 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
252 if block_number == self.best_block_number().unwrap_or_default() &&
253 block_number == self.last_block_number().unwrap_or_default()
254 {
255 return Ok(Box::new(LatestStateProviderRef::new(self)))
256 }
257
258 block_number += 1;
260
261 let account_history_prune_checkpoint =
262 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
263 let storage_history_prune_checkpoint =
264 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
265
266 let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
267
268 if let Some(prune_checkpoint_block_number) =
271 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
272 {
273 state_provider = state_provider.with_lowest_available_account_history_block_number(
274 prune_checkpoint_block_number + 1,
275 );
276 }
277 if let Some(prune_checkpoint_block_number) =
278 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
279 {
280 state_provider = state_provider.with_lowest_available_storage_history_block_number(
281 prune_checkpoint_block_number + 1,
282 );
283 }
284
285 Ok(Box::new(state_provider))
286 }
287
288 #[cfg(feature = "test-utils")]
289 pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
291 self.prune_modes = prune_modes;
292 }
293}
294
impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    /// The primitives are taken from the node type configuration `N`.
    type Primitives = N::Primitives;
}
298
299impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
300 fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
302 self.static_file_provider.clone()
303 }
304
305 fn get_static_file_writer(
306 &self,
307 block: BlockNumber,
308 segment: StaticFileSegment,
309 ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
310 self.static_file_provider.get_writer(block, segment)
311 }
312}
313
314impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
315 fn rocksdb_provider(&self) -> RocksDBProvider {
317 self.rocksdb_provider.clone()
318 }
319
320 #[cfg(all(unix, feature = "rocksdb"))]
321 fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
322 self.pending_rocksdb_batches.lock().push(batch);
323 }
324}
325
326impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
327 for DatabaseProvider<TX, N>
328{
329 type ChainSpec = N::ChainSpec;
330
331 fn chain_spec(&self) -> Arc<Self::ChainSpec> {
332 self.chain_spec.clone()
333 }
334}
335
336impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
337 #[allow(clippy::too_many_arguments)]
339 fn new_rw_inner(
340 tx: TX,
341 chain_spec: Arc<N::ChainSpec>,
342 static_file_provider: StaticFileProvider<N::Primitives>,
343 prune_modes: PruneModes,
344 storage: Arc<N::Storage>,
345 storage_settings: Arc<RwLock<StorageSettings>>,
346 rocksdb_provider: RocksDBProvider,
347 changeset_cache: ChangesetCache,
348 commit_order: CommitOrder,
349 ) -> Self {
350 Self {
351 tx,
352 chain_spec,
353 static_file_provider,
354 prune_modes,
355 storage,
356 storage_settings,
357 rocksdb_provider,
358 changeset_cache,
359 pending_rocksdb_batches: Default::default(),
360 commit_order,
361 minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
362 metrics: metrics::DatabaseProviderMetrics::default(),
363 }
364 }
365
366 #[allow(clippy::too_many_arguments)]
368 pub fn new_rw(
369 tx: TX,
370 chain_spec: Arc<N::ChainSpec>,
371 static_file_provider: StaticFileProvider<N::Primitives>,
372 prune_modes: PruneModes,
373 storage: Arc<N::Storage>,
374 storage_settings: Arc<RwLock<StorageSettings>>,
375 rocksdb_provider: RocksDBProvider,
376 changeset_cache: ChangesetCache,
377 ) -> Self {
378 Self::new_rw_inner(
379 tx,
380 chain_spec,
381 static_file_provider,
382 prune_modes,
383 storage,
384 storage_settings,
385 rocksdb_provider,
386 changeset_cache,
387 CommitOrder::Normal,
388 )
389 }
390
391 #[allow(clippy::too_many_arguments)]
393 pub fn new_unwind_rw(
394 tx: TX,
395 chain_spec: Arc<N::ChainSpec>,
396 static_file_provider: StaticFileProvider<N::Primitives>,
397 prune_modes: PruneModes,
398 storage: Arc<N::Storage>,
399 storage_settings: Arc<RwLock<StorageSettings>>,
400 rocksdb_provider: RocksDBProvider,
401 changeset_cache: ChangesetCache,
402 ) -> Self {
403 Self::new_rw_inner(
404 tx,
405 chain_spec,
406 static_file_provider,
407 prune_modes,
408 storage,
409 storage_settings,
410 rocksdb_provider,
411 changeset_cache,
412 CommitOrder::Unwind,
413 )
414 }
415}
416
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    /// Identity conversion: returns `self`, so a `DatabaseProvider` can be passed wherever an
    /// `AsRef<DatabaseProvider<..>>` is accepted.
    fn as_ref(&self) -> &Self {
        self
    }
}
422
423impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Runs `f` with a RocksDB batch argument and registers the raw batch it hands back.
    ///
    /// With the `rocksdb` feature on unix, a fresh batch is created from the RocksDB provider
    /// and passed to `f`; if `f` returns `Some(raw_batch)`, that batch is queued via
    /// `set_pending_rocksdb_batch`. Without the feature, `f` receives `()` and the returned
    /// raw batch is ignored.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `f`.
    pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
    where
        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
    {
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb = self.rocksdb_provider();
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_batch = rocksdb.batch();
        #[cfg(not(all(unix, feature = "rocksdb")))]
        let rocksdb_batch = ();

        let (result, raw_batch) = f(rocksdb_batch)?;

        #[cfg(all(unix, feature = "rocksdb"))]
        if let Some(batch) = raw_batch {
            self.set_pending_rocksdb_batch(batch);
        }
        // Keeps `raw_batch` "used" when the cfg-gated block above is compiled out.
        let _ = raw_batch;
        Ok(result)
    }
448
449 fn static_file_write_ctx(
451 &self,
452 save_mode: SaveBlocksMode,
453 first_block: BlockNumber,
454 last_block: BlockNumber,
455 ) -> ProviderResult<StaticFileWriteCtx> {
456 let tip = self.last_block_number()?.max(last_block);
457 Ok(StaticFileWriteCtx {
458 write_senders: EitherWriterDestination::senders(self).is_static_file() &&
459 self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
460 write_receipts: save_mode.with_state() &&
461 EitherWriter::receipts_destination(self).is_static_file(),
462 write_account_changesets: save_mode.with_state() &&
463 EitherWriterDestination::account_changesets(self).is_static_file(),
464 write_storage_changesets: save_mode.with_state() &&
465 EitherWriterDestination::storage_changesets(self).is_static_file(),
466 tip,
467 receipts_prune_mode: self.prune_modes.receipts,
468 receipts_prunable: self
470 .static_file_provider
471 .get_highest_static_file_tx(StaticFileSegment::Receipts)
472 .is_none() &&
473 PruneMode::Distance(self.minimum_pruning_distance)
474 .should_prune(first_block, tip),
475 })
476 }
477
478 #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
480 fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
481 RocksDBWriteCtx {
482 first_block_number: first_block,
483 prune_tx_lookup: self.prune_modes.transaction_lookup,
484 storage_settings: self.cached_storage_settings(),
485 pending_batches: self.pending_rocksdb_batches.clone(),
486 }
487 }
488
    /// Writes a batch of executed blocks to storage.
    ///
    /// The work is split three ways inside a [`thread::scope`]:
    /// - a spawned thread writes the static-file data (`write_blocks_data`),
    /// - with the `rocksdb` feature on unix and when `any_in_rocksdb()` is true, a second
    ///   spawned thread writes the RocksDB data,
    /// - the calling thread performs the database writes through `self.tx`: transaction
    ///   hash → number entries, per-block indices, and, when `save_mode.with_state()`, state,
    ///   hashed state, merged trie updates and history indices. Pipeline stage checkpoints are
    ///   updated last.
    ///
    /// An empty `blocks` vector is a no-op. This function itself does not call `commit`.
    ///
    /// # Errors
    ///
    /// Propagates any [`ProviderError`] from the individual writes. A panic in the static-file
    /// thread is reported as `StaticFileWriterError::ThreadPanic`.
    ///
    /// # Panics
    ///
    /// Panics if the RocksDB writer thread panicked.
    // NOTE(review): the RocksDB join uses `expect` while the static-file join maps a panic to
    // an error — consider aligning the two.
    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlock<N::Primitives>>,
        save_mode: SaveBlocksMode,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to write empty block range");
            return Ok(())
        }

        let total_start = Instant::now();
        let block_count = blocks.len() as u64;
        // `blocks` is non-empty (checked above), so `first()` / `last()` cannot fail.
        let first_number = blocks.first().unwrap().recovered_block().number();
        let last_block_number = blocks.last().unwrap().recovered_block().number();

        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");

        // The next transaction number is one past the last key in `TransactionBlocks`, or 0
        // when the table is empty.
        let first_tx_num = self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .last()?
            .map(|(n, _)| n + 1)
            .unwrap_or_default();

        // First transaction number of every block, in block order.
        let tx_nums: Vec<TxNumber> = {
            let mut nums = Vec::with_capacity(blocks.len());
            let mut current = first_tx_num;
            for block in &blocks {
                nums.push(current);
                current += block.recovered_block().body().transaction_count() as u64;
            }
            nums
        };

        let mut timings = metrics::SaveBlocksTimings { block_count, ..Default::default() };

        // Prepare everything the writer threads need before entering the scope.
        let sf_provider = &self.static_file_provider;
        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_provider = self.rocksdb_provider.clone();
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);

        thread::scope(|s| {
            // Static-file writer thread; returns how long the write took.
            let sf_handle = s.spawn(|| {
                let start = Instant::now();
                sf_provider.write_blocks_data(&blocks, &tx_nums, sf_ctx)?;
                Ok::<_, ProviderError>(start.elapsed())
            });

            // RocksDB writer thread, spawned only when some table lives in RocksDB.
            #[cfg(all(unix, feature = "rocksdb"))]
            let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| {
                s.spawn(|| {
                    let start = Instant::now();
                    rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?;
                    Ok::<_, ProviderError>(start.elapsed())
                })
            });

            // Everything below runs on the calling thread.
            let mdbx_start = Instant::now();

            // Transaction hash → number entries are written here only when they are not stored
            // in RocksDB and transaction lookup is not fully pruned.
            if !self.cached_storage_settings().transaction_hash_numbers_in_rocksdb &&
                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
            {
                let start = Instant::now();
                let total_tx_count: usize =
                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
                for (i, block) in blocks.iter().enumerate() {
                    let recovered_block = block.recovered_block();
                    let mut tx_num = tx_nums[i];
                    for transaction in recovered_block.body().transactions_iter() {
                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
                        tx_num += 1;
                    }
                }

                // Sorted by hash before the batched insert.
                // NOTE(review): presumably so keys are inserted in table order — confirm.
                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);

                self.with_rocksdb_batch(|batch| {
                    let mut tx_hash_writer =
                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
                    Ok(((), raw_batch))
                })?;
                self.metrics.record_duration(
                    metrics::Action::InsertTransactionHashNumbers,
                    start.elapsed(),
                );
            }

            for (i, block) in blocks.iter().enumerate() {
                let recovered_block = block.recovered_block();

                let start = Instant::now();
                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
                timings.insert_block += start.elapsed();

                if save_mode.with_state() {
                    let execution_output = block.execution_outcome();

                    let start = Instant::now();
                    // Receipts and account changesets are written here only when the
                    // static-file thread is not writing them.
                    self.write_state(
                        WriteStateInput::Single {
                            outcome: execution_output,
                            block: recovered_block.number(),
                        },
                        OriginalValuesKnown::No,
                        StateWriteConfig {
                            write_receipts: !sf_ctx.write_receipts,
                            write_account_changesets: !sf_ctx.write_account_changesets,
                        },
                    )?;
                    timings.write_state += start.elapsed();

                    let trie_data = block.trie_data();

                    let start = Instant::now();
                    self.write_hashed_state(&trie_data.hashed_state)?;
                    timings.write_hashed_state += start.elapsed();
                }
            }

            if save_mode.with_state() {
                let start = Instant::now();

                // Trie updates of all blocks are merged and written once; the blocks are
                // handed to `merge_batch` in reverse order.
                // NOTE(review): presumably `merge_batch` expects newest-to-oldest — confirm.
                let merged =
                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));

                if !merged.is_empty() {
                    self.write_trie_updates_sorted(&merged)?;
                }
                timings.write_trie_updates += start.elapsed();
            }

            if save_mode.with_state() {
                let start = Instant::now();
                self.update_history_indices(first_number..=last_block_number)?;
                timings.update_history_indices = start.elapsed();
            }

            let start = Instant::now();
            self.update_pipeline_stages(last_block_number, false)?;
            timings.update_pipeline_stages = start.elapsed();

            timings.mdbx = mdbx_start.elapsed();

            // Outer `?`: thread panic mapped to an error; inner `?`: the thread's own result.
            timings.sf = sf_handle
                .join()
                .map_err(|_| StaticFileWriterError::ThreadPanic("static file"))??;

            #[cfg(all(unix, feature = "rocksdb"))]
            if let Some(handle) = rocksdb_handle {
                timings.rocksdb = handle.join().expect("RocksDB thread panicked")?;
            }

            timings.total = total_start.elapsed();

            self.metrics.record_save_blocks(&timings);
            debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");

            Ok(())
        })
    }
682
683 #[instrument(level = "debug", target = "providers::db", skip_all)]
687 fn insert_block_mdbx_only(
688 &self,
689 block: &RecoveredBlock<BlockTy<N>>,
690 first_tx_num: TxNumber,
691 ) -> ProviderResult<StoredBlockBodyIndices> {
692 if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
693 EitherWriterDestination::senders(self).is_database()
694 {
695 let start = Instant::now();
696 let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
697 let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
698 for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
699 cursor.append(tx_num, &sender)?;
700 }
701 self.metrics
702 .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
703 }
704
705 let block_number = block.number();
706 let tx_count = block.body().transaction_count() as u64;
707
708 let start = Instant::now();
709 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
710 self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
711
712 self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
713
714 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
715 }
716
717 fn write_block_body_indices(
720 &self,
721 block_number: BlockNumber,
722 body: &BodyTy<N>,
723 first_tx_num: TxNumber,
724 tx_count: u64,
725 ) -> ProviderResult<()> {
726 let start = Instant::now();
728 self.tx
729 .cursor_write::<tables::BlockBodyIndices>()?
730 .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
731 self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
732
733 if tx_count > 0 {
735 let start = Instant::now();
736 self.tx
737 .cursor_write::<tables::TransactionBlocks>()?
738 .append(first_tx_num + tx_count - 1, &block_number)?;
739 self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
740 }
741
742 self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
744
745 Ok(())
746 }
747
748 pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
753 let changed_accounts = self
754 .tx
755 .cursor_read::<tables::AccountChangeSets>()?
756 .walk_range(from..)?
757 .collect::<Result<Vec<_>, _>>()?;
758
759 self.unwind_account_hashing(changed_accounts.iter())?;
761
762 self.unwind_account_history_indices(changed_accounts.iter())?;
764
765 let storage_start = BlockNumberAddress((from, Address::ZERO));
766 let changed_storages = self
767 .tx
768 .cursor_read::<tables::StorageChangeSets>()?
769 .walk_range(storage_start..)?
770 .collect::<Result<Vec<_>, _>>()?;
771
772 self.unwind_storage_hashing(changed_storages.iter().copied())?;
774
775 self.unwind_storage_history_indices(changed_storages.iter().copied())?;
777
778 let db_tip_block = self
781 .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
782 .as_ref()
783 .map(|chk| chk.block_number)
784 .ok_or_else(|| ProviderError::InsufficientChangesets {
785 requested: from,
786 available: 0..=0,
787 })?;
788
789 let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
790 self.write_trie_updates_sorted(&trie_revert)?;
791
792 Ok(())
793 }
794
795 fn remove_receipts_from(
797 &self,
798 from_tx: TxNumber,
799 last_block: BlockNumber,
800 ) -> ProviderResult<()> {
801 self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
803
804 if EitherWriter::receipts_destination(self).is_static_file() {
805 let static_file_receipt_num =
806 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
807
808 let to_delete = static_file_receipt_num
809 .map(|static_num| (static_num + 1).saturating_sub(from_tx))
810 .unwrap_or_default();
811
812 self.static_file_provider
813 .latest_writer(StaticFileSegment::Receipts)?
814 .prune_receipts(to_delete, last_block)?;
815 }
816
817 Ok(())
818 }
819}
820
821impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
822 fn try_into_history_at_block(
823 self,
824 mut block_number: BlockNumber,
825 ) -> ProviderResult<StateProviderBox> {
826 if block_number == self.best_block_number().unwrap_or_default() {
829 return Ok(Box::new(LatestStateProvider::new(self)))
830 }
831
832 block_number += 1;
834
835 let account_history_prune_checkpoint =
836 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
837 let storage_history_prune_checkpoint =
838 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
839
840 let mut state_provider = HistoricalStateProvider::new(self, block_number);
841
842 if let Some(prune_checkpoint_block_number) =
845 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
846 {
847 state_provider = state_provider.with_lowest_available_account_history_block_number(
848 prune_checkpoint_block_number + 1,
849 );
850 }
851 if let Some(prune_checkpoint_block_number) =
852 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
853 {
854 state_provider = state_provider.with_lowest_available_storage_history_block_number(
855 prune_checkpoint_block_number + 1,
856 );
857 }
858
859 Ok(Box::new(state_provider))
860 }
861}
862
863fn unwind_history_shards<S, T, C>(
878 cursor: &mut C,
879 start_key: T::Key,
880 block_number: BlockNumber,
881 mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
882) -> ProviderResult<Vec<u64>>
883where
884 T: Table<Value = BlockNumberList>,
885 T::Key: AsRef<ShardedKey<S>>,
886 C: DbCursorRO<T> + DbCursorRW<T>,
887{
888 let mut item = cursor.seek_exact(start_key)?;
890 while let Some((sharded_key, list)) = item {
891 if !shard_belongs_to_key(&sharded_key) {
893 break
894 }
895
896 cursor.delete_current()?;
899
900 let first = list.iter().next().expect("List can't be empty");
903
904 if first >= block_number {
907 item = cursor.prev()?;
908 continue
909 }
910 else if block_number <= sharded_key.as_ref().highest_block_number {
913 return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
916 }
917 return Ok(list.iter().collect::<Vec<_>>())
920 }
921
922 Ok(Vec::new())
924}
925
926impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
927 #[allow(clippy::too_many_arguments)]
929 pub fn new(
930 tx: TX,
931 chain_spec: Arc<N::ChainSpec>,
932 static_file_provider: StaticFileProvider<N::Primitives>,
933 prune_modes: PruneModes,
934 storage: Arc<N::Storage>,
935 storage_settings: Arc<RwLock<StorageSettings>>,
936 rocksdb_provider: RocksDBProvider,
937 changeset_cache: ChangesetCache,
938 ) -> Self {
939 Self {
940 tx,
941 chain_spec,
942 static_file_provider,
943 prune_modes,
944 storage,
945 storage_settings,
946 rocksdb_provider,
947 changeset_cache,
948 pending_rocksdb_batches: Default::default(),
949 commit_order: CommitOrder::Normal,
950 minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
951 metrics: metrics::DatabaseProviderMetrics::default(),
952 }
953 }
954
955 pub fn into_tx(self) -> TX {
957 self.tx
958 }
959
960 pub const fn tx_mut(&mut self) -> &mut TX {
962 &mut self.tx
963 }
964
965 pub const fn tx_ref(&self) -> &TX {
967 &self.tx
968 }
969
970 pub fn chain_spec(&self) -> &N::ChainSpec {
972 &self.chain_spec
973 }
974}
975
976impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
977 fn recovered_block<H, HF, B, BF>(
978 &self,
979 id: BlockHashOrNumber,
980 _transaction_kind: TransactionVariant,
981 header_by_number: HF,
982 construct_block: BF,
983 ) -> ProviderResult<Option<B>>
984 where
985 H: AsRef<HeaderTy<N>>,
986 HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
987 BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
988 {
989 let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
990 let Some(header) = header_by_number(block_number)? else { return Ok(None) };
991
992 let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
999
1000 let tx_range = body.tx_num_range();
1001
1002 let (transactions, senders) = if tx_range.is_empty() {
1003 (vec![], vec![])
1004 } else {
1005 (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
1006 };
1007
1008 let body = self
1009 .storage
1010 .reader()
1011 .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
1012 .pop()
1013 .ok_or(ProviderError::InvalidStorageOutput)?;
1014
1015 construct_block(header, body, senders)
1016 }
1017
1018 fn block_range<F, H, HF, R>(
1028 &self,
1029 range: RangeInclusive<BlockNumber>,
1030 headers_range: HF,
1031 mut assemble_block: F,
1032 ) -> ProviderResult<Vec<R>>
1033 where
1034 H: AsRef<HeaderTy<N>>,
1035 HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1036 F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
1037 {
1038 if range.is_empty() {
1039 return Ok(Vec::new())
1040 }
1041
1042 let len = range.end().saturating_sub(*range.start()) as usize;
1043 let mut blocks = Vec::with_capacity(len);
1044
1045 let headers = headers_range(range.clone())?;
1046
1047 let present_headers = self
1053 .block_body_indices_range(range)?
1054 .into_iter()
1055 .map(|b| b.tx_num_range())
1056 .zip(headers)
1057 .collect::<Vec<_>>();
1058
1059 let mut inputs = Vec::with_capacity(present_headers.len());
1060 for (tx_range, header) in &present_headers {
1061 let transactions = if tx_range.is_empty() {
1062 Vec::new()
1063 } else {
1064 self.transactions_by_tx_range(tx_range.clone())?
1065 };
1066
1067 inputs.push((header.as_ref(), transactions));
1068 }
1069
1070 let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
1071
1072 for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
1073 blocks.push(assemble_block(header, body, tx_range)?);
1074 }
1075
1076 Ok(blocks)
1077 }
1078
1079 fn block_with_senders_range<H, HF, B, BF>(
1090 &self,
1091 range: RangeInclusive<BlockNumber>,
1092 headers_range: HF,
1093 assemble_block: BF,
1094 ) -> ProviderResult<Vec<B>>
1095 where
1096 H: AsRef<HeaderTy<N>>,
1097 HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1098 BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
1099 {
1100 self.block_range(range, headers_range, |header, body, tx_range| {
1101 let senders = if tx_range.is_empty() {
1102 Vec::new()
1103 } else {
1104 let known_senders: HashMap<TxNumber, Address> =
1105 EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1106
1107 let mut senders = Vec::with_capacity(body.transactions().len());
1108 for (tx_num, tx) in tx_range.zip(body.transactions()) {
1109 match known_senders.get(&tx_num) {
1110 None => {
1111 let sender = tx.recover_signer_unchecked()?;
1113 senders.push(sender);
1114 }
1115 Some(sender) => senders.push(*sender),
1116 }
1117 }
1118
1119 senders
1120 };
1121
1122 assemble_block(header, body, senders)
1123 })
1124 }
1125
    /// Builds the bundle-state and reverts initialisers from account and storage changesets.
    ///
    /// Returns a pair of maps:
    /// - `state`: address → `(old account info, present account info, storage)`, where
    ///   `storage` maps a slot key to `(old value, present value)`;
    /// - `reverts`: block number → address → `(Some(old account info), old storage entries)`.
    ///
    /// Both changesets are walked in reverse. The first time an address or slot is seen, its
    /// present value is read through the plain-state cursors; every later occurrence (an
    /// earlier changeset entry) overwrites the "old" value, so the old value ends up being the
    /// one from the earliest entry in the input.
    ///
    /// # Errors
    ///
    /// Propagates cursor errors.
    fn populate_bundle_state<A, S>(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        plain_accounts_cursor: &mut A,
        plain_storage_cursor: &mut S,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
    where
        A: DbCursorRO<PlainAccountState>,
        S: DbDupCursorRO<PlainStorageState>,
    {
        // address → (old info, present info, slot key → (old value, present value))
        let mut state: BundleStateInit = HashMap::default();

        // block number → address → (old account info, old storage entries)
        let mut reverts: RevertsInit = HashMap::default();

        // Account changes, walked from the last entry to the first.
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // First sighting: read the present account from plain state.
                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // An earlier changeset entry supersedes the recorded old info.
                    entry.get_mut().0 = old_info;
                }
            }
            // Record the old info for this block's revert.
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        // Storage changes, walked from the last entry to the first.
        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // The account itself was not in the account changeset, so its old and
                    // present info are both the present account.
                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    // First sighting of the slot: read its present value. The seek may land on
                    // a different subkey, hence the filter; a missing slot uses the default.
                    let new_storage = plain_storage_cursor
                        .seek_by_key_subkey(address, old_storage.key)?
                        .filter(|storage| storage.key == old_storage.key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // An earlier changeset entry supersedes the recorded old value.
                    entry.get_mut().0 = old_storage.value;
                }
            };

            // Record the old storage entry for this block's revert.
            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
1206}
1207
1208impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
1209 fn append_history_index<P, T>(
1217 &self,
1218 index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
1219 mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
1220 ) -> ProviderResult<()>
1221 where
1222 P: Copy,
1223 T: Table<Value = BlockNumberList>,
1224 {
1225 assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
1228
1229 let mut cursor = self.tx.cursor_write::<T>()?;
1230
1231 for (partial_key, indices) in index_updates {
1232 let last_key = sharded_key_factory(partial_key, u64::MAX);
1233 let mut last_shard = cursor
1234 .seek_exact(last_key.clone())?
1235 .map(|(_, list)| list)
1236 .unwrap_or_else(BlockNumberList::empty);
1237
1238 last_shard.append(indices).map_err(ProviderError::other)?;
1239
1240 if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
1242 cursor.upsert(last_key, &last_shard)?;
1243 continue;
1244 }
1245
1246 let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
1248 let mut chunks_peekable = chunks.into_iter().peekable();
1249
1250 while let Some(chunk) = chunks_peekable.next() {
1251 let shard = BlockNumberList::new_pre_sorted(chunk);
1252 let highest_block_number = if chunks_peekable.peek().is_some() {
1253 shard.iter().next_back().expect("`chunks` does not return empty list")
1254 } else {
1255 u64::MAX
1257 };
1258
1259 cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
1260 }
1261 }
1262
1263 Ok(())
1264 }
1265}
1266
1267impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1268 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1269 Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1270 }
1271}
1272
1273impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1274 fn changed_accounts_with_range(
1275 &self,
1276 range: RangeInclusive<BlockNumber>,
1277 ) -> ProviderResult<BTreeSet<Address>> {
1278 let mut reader = EitherReader::new_account_changesets(self)?;
1279
1280 reader.changed_accounts_with_range(range)
1281 }
1282
1283 fn basic_accounts(
1284 &self,
1285 iter: impl IntoIterator<Item = Address>,
1286 ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1287 let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1288 Ok(iter
1289 .into_iter()
1290 .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
1291 .collect::<Result<Vec<_>, _>>()?)
1292 }
1293
1294 fn changed_accounts_and_blocks_with_range(
1295 &self,
1296 range: RangeInclusive<BlockNumber>,
1297 ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1298 let highest_static_block = self
1299 .static_file_provider
1300 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1301
1302 if let Some(highest) = highest_static_block &&
1303 self.cached_storage_settings().account_changesets_in_static_files
1304 {
1305 let start = *range.start();
1306 let static_end = (*range.end()).min(highest + 1);
1307
1308 let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1309 if start <= static_end {
1310 for block in start..=static_end {
1311 let block_changesets = self.account_block_changeset(block)?;
1312 for changeset in block_changesets {
1313 changed_accounts_and_blocks
1314 .entry(changeset.address)
1315 .or_default()
1316 .push(block);
1317 }
1318 }
1319 }
1320
1321 Ok(changed_accounts_and_blocks)
1322 } else {
1323 let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1324
1325 let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1326 BTreeMap::new(),
1327 |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1328 let (index, account) = entry?;
1329 accounts.entry(account.address).or_default().push(index);
1330 Ok(accounts)
1331 },
1332 )?;
1333
1334 Ok(account_transitions)
1335 }
1336 }
1337}
1338
1339impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1340 fn storage_changeset(
1341 &self,
1342 block_number: BlockNumber,
1343 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1344 if self.cached_storage_settings().storage_changesets_in_static_files {
1345 self.static_file_provider.storage_changeset(block_number)
1346 } else {
1347 let range = block_number..=block_number;
1348 let storage_range = BlockNumberAddress::range(range);
1349 self.tx
1350 .cursor_dup_read::<tables::StorageChangeSets>()?
1351 .walk_range(storage_range)?
1352 .map(|result| -> ProviderResult<_> { Ok(result?) })
1353 .collect()
1354 }
1355 }
1356
1357 fn get_storage_before_block(
1358 &self,
1359 block_number: BlockNumber,
1360 address: Address,
1361 storage_key: B256,
1362 ) -> ProviderResult<Option<StorageEntry>> {
1363 if self.cached_storage_settings().storage_changesets_in_static_files {
1364 self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1365 } else {
1366 self.tx
1367 .cursor_dup_read::<tables::StorageChangeSets>()?
1368 .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1369 .filter(|entry| entry.key == storage_key)
1370 .map(Ok)
1371 .transpose()
1372 }
1373 }
1374
1375 fn storage_changesets_range(
1376 &self,
1377 range: RangeInclusive<BlockNumber>,
1378 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1379 if self.cached_storage_settings().storage_changesets_in_static_files {
1380 self.static_file_provider.storage_changesets_range(range)
1381 } else {
1382 self.tx
1383 .cursor_dup_read::<tables::StorageChangeSets>()?
1384 .walk_range(BlockNumberAddress::range(range))?
1385 .map(|result| -> ProviderResult<_> { Ok(result?) })
1386 .collect()
1387 }
1388 }
1389
1390 fn storage_changeset_count(&self) -> ProviderResult<usize> {
1391 if self.cached_storage_settings().storage_changesets_in_static_files {
1392 self.static_file_provider.storage_changeset_count()
1393 } else {
1394 Ok(self.tx.entries::<tables::StorageChangeSets>()?)
1395 }
1396 }
1397}
1398
1399impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1400 fn account_block_changeset(
1401 &self,
1402 block_number: BlockNumber,
1403 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1404 if self.cached_storage_settings().account_changesets_in_static_files {
1405 let static_changesets =
1406 self.static_file_provider.account_block_changeset(block_number)?;
1407 Ok(static_changesets)
1408 } else {
1409 let range = block_number..=block_number;
1410 self.tx
1411 .cursor_read::<tables::AccountChangeSets>()?
1412 .walk_range(range)?
1413 .map(|result| -> ProviderResult<_> {
1414 let (_, account_before) = result?;
1415 Ok(account_before)
1416 })
1417 .collect()
1418 }
1419 }
1420
1421 fn get_account_before_block(
1422 &self,
1423 block_number: BlockNumber,
1424 address: Address,
1425 ) -> ProviderResult<Option<AccountBeforeTx>> {
1426 if self.cached_storage_settings().account_changesets_in_static_files {
1427 Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1428 } else {
1429 self.tx
1430 .cursor_dup_read::<tables::AccountChangeSets>()?
1431 .seek_by_key_subkey(block_number, address)?
1432 .filter(|acc| acc.address == address)
1433 .map(Ok)
1434 .transpose()
1435 }
1436 }
1437
1438 fn account_changesets_range(
1439 &self,
1440 range: impl core::ops::RangeBounds<BlockNumber>,
1441 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1442 let range = to_range(range);
1443 let mut changesets = Vec::new();
1444 if self.cached_storage_settings().account_changesets_in_static_files &&
1445 let Some(highest) = self
1446 .static_file_provider
1447 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1448 {
1449 let static_end = range.end.min(highest + 1);
1450 if range.start < static_end {
1451 for block in range.start..static_end {
1452 let block_changesets = self.account_block_changeset(block)?;
1453 for changeset in block_changesets {
1454 changesets.push((block, changeset));
1455 }
1456 }
1457 }
1458 } else {
1459 let mut cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1461 for entry in cursor.walk_range(range)? {
1462 let (block_num, account_before) = entry?;
1463 changesets.push((block_num, account_before));
1464 }
1465 }
1466
1467 Ok(changesets)
1468 }
1469
1470 fn account_changeset_count(&self) -> ProviderResult<usize> {
1471 if self.cached_storage_settings().account_changesets_in_static_files {
1474 self.static_file_provider.account_changeset_count()
1475 } else {
1476 Ok(self.tx.entries::<tables::AccountChangeSets>()?)
1477 }
1478 }
1479}
1480
1481impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
1482 for DatabaseProvider<TX, N>
1483{
1484 type Header = HeaderTy<N>;
1485
1486 fn local_tip_header(
1487 &self,
1488 highest_uninterrupted_block: BlockNumber,
1489 ) -> ProviderResult<SealedHeader<Self::Header>> {
1490 let static_file_provider = self.static_file_provider();
1491
1492 let next_static_file_block_num = static_file_provider
1495 .get_highest_static_file_block(StaticFileSegment::Headers)
1496 .map(|id| id + 1)
1497 .unwrap_or_default();
1498 let next_block = highest_uninterrupted_block + 1;
1499
1500 match next_static_file_block_num.cmp(&next_block) {
1501 Ordering::Greater => {
1504 let mut static_file_producer =
1505 static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1506 static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1507 static_file_producer.commit()?
1510 }
1511 Ordering::Less => {
1512 return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1514 }
1515 Ordering::Equal => {}
1516 }
1517
1518 let local_head = static_file_provider
1519 .sealed_header(highest_uninterrupted_block)?
1520 .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1521
1522 Ok(local_head)
1523 }
1524}
1525
1526impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1527 type Header = HeaderTy<N>;
1528
1529 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1530 if let Some(num) = self.block_number(block_hash)? {
1531 Ok(self.header_by_number(num)?)
1532 } else {
1533 Ok(None)
1534 }
1535 }
1536
1537 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1538 self.static_file_provider.header_by_number(num)
1539 }
1540
1541 fn headers_range(
1542 &self,
1543 range: impl RangeBounds<BlockNumber>,
1544 ) -> ProviderResult<Vec<Self::Header>> {
1545 self.static_file_provider.headers_range(range)
1546 }
1547
1548 fn sealed_header(
1549 &self,
1550 number: BlockNumber,
1551 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1552 self.static_file_provider.sealed_header(number)
1553 }
1554
1555 fn sealed_headers_while(
1556 &self,
1557 range: impl RangeBounds<BlockNumber>,
1558 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1559 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1560 self.static_file_provider.sealed_headers_while(range, predicate)
1561 }
1562}
1563
1564impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1565 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1566 self.static_file_provider.block_hash(number)
1567 }
1568
1569 fn canonical_hashes_range(
1570 &self,
1571 start: BlockNumber,
1572 end: BlockNumber,
1573 ) -> ProviderResult<Vec<B256>> {
1574 self.static_file_provider.canonical_hashes_range(start, end)
1575 }
1576}
1577
1578impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1579 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1580 let best_number = self.best_block_number()?;
1581 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1582 Ok(ChainInfo { best_hash, best_number })
1583 }
1584
1585 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1586 Ok(self
1589 .get_stage_checkpoint(StageId::Finish)?
1590 .map(|checkpoint| checkpoint.block_number)
1591 .unwrap_or_default())
1592 }
1593
1594 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1595 self.static_file_provider.last_block_number()
1596 }
1597
1598 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1599 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1600 }
1601}
1602
1603impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1604 type Block = BlockTy<N>;
1605
1606 fn find_block_by_hash(
1607 &self,
1608 hash: B256,
1609 source: BlockSource,
1610 ) -> ProviderResult<Option<Self::Block>> {
1611 if source.is_canonical() {
1612 self.block(hash.into())
1613 } else {
1614 Ok(None)
1615 }
1616 }
1617
1618 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1626 if let Some(number) = self.convert_hash_or_number(id)? {
1627 let earliest_available = self.static_file_provider.earliest_history_height();
1628 if number < earliest_available {
1629 return Err(ProviderError::BlockExpired { requested: number, earliest_available })
1630 }
1631
1632 let Some(header) = self.header_by_number(number)? else { return Ok(None) };
1633
1634 let Some(transactions) = self.transactions_by_block(number.into())? else {
1639 return Ok(None)
1640 };
1641
1642 let body = self
1643 .storage
1644 .reader()
1645 .read_block_bodies(self, vec![(&header, transactions)])?
1646 .pop()
1647 .ok_or(ProviderError::InvalidStorageOutput)?;
1648
1649 return Ok(Some(Self::Block::new(header, body)))
1650 }
1651
1652 Ok(None)
1653 }
1654
1655 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1656 Ok(None)
1657 }
1658
1659 fn pending_block_and_receipts(
1660 &self,
1661 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1662 Ok(None)
1663 }
1664
1665 fn recovered_block(
1674 &self,
1675 id: BlockHashOrNumber,
1676 transaction_kind: TransactionVariant,
1677 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1678 self.recovered_block(
1679 id,
1680 transaction_kind,
1681 |block_number| self.header_by_number(block_number),
1682 |header, body, senders| {
1683 Self::Block::new(header, body)
1684 .try_into_recovered_unchecked(senders)
1688 .map(Some)
1689 .map_err(|_| ProviderError::SenderRecoveryError)
1690 },
1691 )
1692 }
1693
1694 fn sealed_block_with_senders(
1695 &self,
1696 id: BlockHashOrNumber,
1697 transaction_kind: TransactionVariant,
1698 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1699 self.recovered_block(
1700 id,
1701 transaction_kind,
1702 |block_number| self.sealed_header(block_number),
1703 |header, body, senders| {
1704 Self::Block::new_sealed(header, body)
1705 .try_with_senders_unchecked(senders)
1709 .map(Some)
1710 .map_err(|_| ProviderError::SenderRecoveryError)
1711 },
1712 )
1713 }
1714
1715 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1716 self.block_range(
1717 range,
1718 |range| self.headers_range(range),
1719 |header, body, _| Ok(Self::Block::new(header, body)),
1720 )
1721 }
1722
1723 fn block_with_senders_range(
1724 &self,
1725 range: RangeInclusive<BlockNumber>,
1726 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1727 self.block_with_senders_range(
1728 range,
1729 |range| self.headers_range(range),
1730 |header, body, senders| {
1731 Self::Block::new(header, body)
1732 .try_into_recovered_unchecked(senders)
1733 .map_err(|_| ProviderError::SenderRecoveryError)
1734 },
1735 )
1736 }
1737
1738 fn recovered_block_range(
1739 &self,
1740 range: RangeInclusive<BlockNumber>,
1741 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1742 self.block_with_senders_range(
1743 range,
1744 |range| self.sealed_headers_range(range),
1745 |header, body, senders| {
1746 Self::Block::new_sealed(header, body)
1747 .try_with_senders(senders)
1748 .map_err(|_| ProviderError::SenderRecoveryError)
1749 },
1750 )
1751 }
1752
1753 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1754 Ok(self
1755 .tx
1756 .cursor_read::<tables::TransactionBlocks>()?
1757 .seek(id)
1758 .map(|b| b.map(|(_, bn)| bn))?)
1759 }
1760}
1761
1762impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1763 for DatabaseProvider<TX, N>
1764{
1765 fn transaction_hashes_by_range(
1768 &self,
1769 tx_range: Range<TxNumber>,
1770 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1771 self.static_file_provider.transaction_hashes_by_range(tx_range)
1772 }
1773}
1774
1775impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1777 type Transaction = TxTy<N>;
1778
1779 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1780 self.with_rocksdb_tx(|tx_ref| {
1781 let mut reader = EitherReader::new_transaction_hash_numbers(self, tx_ref)?;
1782 reader.get_transaction_hash_number(tx_hash)
1783 })
1784 }
1785
1786 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1787 self.static_file_provider.transaction_by_id(id)
1788 }
1789
1790 fn transaction_by_id_unhashed(
1791 &self,
1792 id: TxNumber,
1793 ) -> ProviderResult<Option<Self::Transaction>> {
1794 self.static_file_provider.transaction_by_id_unhashed(id)
1795 }
1796
1797 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1798 if let Some(id) = self.transaction_id(hash)? {
1799 Ok(self.transaction_by_id_unhashed(id)?)
1800 } else {
1801 Ok(None)
1802 }
1803 }
1804
1805 fn transaction_by_hash_with_meta(
1806 &self,
1807 tx_hash: TxHash,
1808 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1809 if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1810 let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1811 let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1812 let Some(sealed_header) = self.sealed_header(block_number)?
1813 {
1814 let (header, block_hash) = sealed_header.split();
1815 if let Some(block_body) = self.block_body_indices(block_number)? {
1816 let index = transaction_id - block_body.first_tx_num();
1821
1822 let meta = TransactionMeta {
1823 tx_hash,
1824 index,
1825 block_hash,
1826 block_number,
1827 base_fee: header.base_fee_per_gas(),
1828 excess_blob_gas: header.excess_blob_gas(),
1829 timestamp: header.timestamp(),
1830 };
1831
1832 return Ok(Some((transaction, meta)))
1833 }
1834 }
1835
1836 Ok(None)
1837 }
1838
1839 fn transactions_by_block(
1840 &self,
1841 id: BlockHashOrNumber,
1842 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1843 if let Some(block_number) = self.convert_hash_or_number(id)? &&
1844 let Some(body) = self.block_body_indices(block_number)?
1845 {
1846 let tx_range = body.tx_num_range();
1847 return if tx_range.is_empty() {
1848 Ok(Some(Vec::new()))
1849 } else {
1850 self.transactions_by_tx_range(tx_range).map(Some)
1851 }
1852 }
1853 Ok(None)
1854 }
1855
1856 fn transactions_by_block_range(
1857 &self,
1858 range: impl RangeBounds<BlockNumber>,
1859 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1860 let range = to_range(range);
1861
1862 self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1863 .into_iter()
1864 .map(|body| {
1865 let tx_num_range = body.tx_num_range();
1866 if tx_num_range.is_empty() {
1867 Ok(Vec::new())
1868 } else {
1869 self.transactions_by_tx_range(tx_num_range)
1870 }
1871 })
1872 .collect()
1873 }
1874
1875 fn transactions_by_tx_range(
1876 &self,
1877 range: impl RangeBounds<TxNumber>,
1878 ) -> ProviderResult<Vec<Self::Transaction>> {
1879 self.static_file_provider.transactions_by_tx_range(range)
1880 }
1881
1882 fn senders_by_tx_range(
1883 &self,
1884 range: impl RangeBounds<TxNumber>,
1885 ) -> ProviderResult<Vec<Address>> {
1886 if EitherWriterDestination::senders(self).is_static_file() {
1887 self.static_file_provider.senders_by_tx_range(range)
1888 } else {
1889 self.cursor_read_collect::<tables::TransactionSenders>(range)
1890 }
1891 }
1892
1893 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1894 if EitherWriterDestination::senders(self).is_static_file() {
1895 self.static_file_provider.transaction_sender(id)
1896 } else {
1897 Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1898 }
1899 }
1900}
1901
1902impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1903 type Receipt = ReceiptTy<N>;
1904
1905 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1906 self.static_file_provider.get_with_static_file_or_database(
1907 StaticFileSegment::Receipts,
1908 id,
1909 |static_file| static_file.receipt(id),
1910 || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1911 )
1912 }
1913
1914 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1915 if let Some(id) = self.transaction_id(hash)? {
1916 self.receipt(id)
1917 } else {
1918 Ok(None)
1919 }
1920 }
1921
1922 fn receipts_by_block(
1923 &self,
1924 block: BlockHashOrNumber,
1925 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1926 if let Some(number) = self.convert_hash_or_number(block)? &&
1927 let Some(body) = self.block_body_indices(number)?
1928 {
1929 let tx_range = body.tx_num_range();
1930 return if tx_range.is_empty() {
1931 Ok(Some(Vec::new()))
1932 } else {
1933 self.receipts_by_tx_range(tx_range).map(Some)
1934 }
1935 }
1936 Ok(None)
1937 }
1938
1939 fn receipts_by_tx_range(
1940 &self,
1941 range: impl RangeBounds<TxNumber>,
1942 ) -> ProviderResult<Vec<Self::Receipt>> {
1943 self.static_file_provider.get_range_with_static_file_or_database(
1944 StaticFileSegment::Receipts,
1945 to_range(range),
1946 |static_file, range, _| static_file.receipts_by_tx_range(range),
1947 |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1948 |_| true,
1949 )
1950 }
1951
1952 fn receipts_by_block_range(
1953 &self,
1954 block_range: RangeInclusive<BlockNumber>,
1955 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1956 if block_range.is_empty() {
1957 return Ok(Vec::new());
1958 }
1959
1960 let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
1962 let mut block_body_indices = Vec::with_capacity(range_len);
1963 for block_num in block_range {
1964 if let Some(indices) = self.block_body_indices(block_num)? {
1965 block_body_indices.push(indices);
1966 } else {
1967 block_body_indices.push(StoredBlockBodyIndices::default());
1969 }
1970 }
1971
1972 if block_body_indices.is_empty() {
1973 return Ok(Vec::new());
1974 }
1975
1976 let non_empty_blocks: Vec<_> =
1978 block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
1979
1980 if non_empty_blocks.is_empty() {
1981 return Ok(vec![Vec::new(); block_body_indices.len()]);
1983 }
1984
1985 let first_tx = non_empty_blocks[0].first_tx_num();
1987 let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
1988
1989 let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
1991 let mut receipts_iter = all_receipts.into_iter();
1992
1993 let mut result = Vec::with_capacity(block_body_indices.len());
1995 for indices in &block_body_indices {
1996 if indices.tx_count == 0 {
1997 result.push(Vec::new());
1998 } else {
1999 let block_receipts =
2000 receipts_iter.by_ref().take(indices.tx_count as usize).collect();
2001 result.push(block_receipts);
2002 }
2003 }
2004
2005 Ok(result)
2006 }
2007}
2008
2009impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
2010 for DatabaseProvider<TX, N>
2011{
2012 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2013 Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
2014 }
2015
2016 fn block_body_indices_range(
2017 &self,
2018 range: RangeInclusive<BlockNumber>,
2019 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2020 self.cursor_read_collect::<tables::BlockBodyIndices>(range)
2021 }
2022}
2023
2024impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2025 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2026 Ok(if let Some(encoded) = id.get_pre_encoded() {
2027 self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2028 } else {
2029 self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2030 })
2031 }
2032
2033 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2035 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2036 }
2037
2038 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2039 self.tx
2040 .cursor_read::<tables::StageCheckpoints>()?
2041 .walk(None)?
2042 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2043 .map_err(ProviderError::Database)
2044 }
2045}
2046
2047impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2048 fn save_stage_checkpoint(
2050 &self,
2051 id: StageId,
2052 checkpoint: StageCheckpoint,
2053 ) -> ProviderResult<()> {
2054 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2055 }
2056
2057 fn save_stage_checkpoint_progress(
2059 &self,
2060 id: StageId,
2061 checkpoint: Vec<u8>,
2062 ) -> ProviderResult<()> {
2063 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2064 }
2065
2066 #[instrument(level = "debug", target = "providers::db", skip_all)]
2067 fn update_pipeline_stages(
2068 &self,
2069 block_number: BlockNumber,
2070 drop_stage_checkpoint: bool,
2071 ) -> ProviderResult<()> {
2072 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2074 for stage_id in StageId::ALL {
2075 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2076 cursor.upsert(
2077 stage_id.to_string(),
2078 &StageCheckpoint {
2079 block_number,
2080 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2081 },
2082 )?;
2083 }
2084
2085 Ok(())
2086 }
2087}
2088
2089impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
2090 fn plain_state_storages(
2091 &self,
2092 addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
2093 ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
2094 let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
2095
2096 addresses_with_keys
2097 .into_iter()
2098 .map(|(address, storage)| {
2099 storage
2100 .into_iter()
2101 .map(|key| -> ProviderResult<_> {
2102 Ok(plain_storage
2103 .seek_by_key_subkey(address, key)?
2104 .filter(|v| v.key == key)
2105 .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
2106 })
2107 .collect::<ProviderResult<Vec<_>>>()
2108 .map(|storage| (address, storage))
2109 })
2110 .collect::<ProviderResult<Vec<(_, _)>>>()
2111 }
2112
2113 fn changed_storages_with_range(
2114 &self,
2115 range: RangeInclusive<BlockNumber>,
2116 ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2117 if self.cached_storage_settings().storage_changesets_in_static_files {
2118 self.storage_changesets_range(range)?.into_iter().try_fold(
2119 BTreeMap::new(),
2120 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2121 let (BlockNumberAddress((_, address)), storage_entry) = entry;
2122 accounts.entry(address).or_default().insert(storage_entry.key);
2123 Ok(accounts)
2124 },
2125 )
2126 } else {
2127 self.tx
2128 .cursor_read::<tables::StorageChangeSets>()?
2129 .walk_range(BlockNumberAddress::range(range))?
2130 .try_fold(
2133 BTreeMap::new(),
2134 |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2135 let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2136 accounts.entry(address).or_default().insert(storage_entry.key);
2137 Ok(accounts)
2138 },
2139 )
2140 }
2141 }
2142
2143 fn changed_storages_and_blocks_with_range(
2144 &self,
2145 range: RangeInclusive<BlockNumber>,
2146 ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2147 if self.cached_storage_settings().storage_changesets_in_static_files {
2148 self.storage_changesets_range(range)?.into_iter().try_fold(
2149 BTreeMap::new(),
2150 |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2151 storages
2152 .entry((index.address(), storage.key))
2153 .or_default()
2154 .push(index.block_number());
2155 Ok(storages)
2156 },
2157 )
2158 } else {
2159 let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2160
2161 let storage_changeset_lists =
2162 changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2163 BTreeMap::new(),
2164 |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2165 entry|
2166 -> ProviderResult<_> {
2167 let (index, storage) = entry?;
2168 storages
2169 .entry((index.address(), storage.key))
2170 .or_default()
2171 .push(index.block_number());
2172 Ok(storages)
2173 },
2174 )?;
2175
2176 Ok(storage_changeset_lists)
2177 }
2178 }
2179}
2180
2181impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2182 for DatabaseProvider<TX, N>
2183{
2184 type Receipt = ReceiptTy<N>;
2185
    /// Writes the state changes, state reverts and (optionally) receipts of an execution outcome.
    ///
    /// The reverts are written first via `write_state_reverts`, then the plain state changes via
    /// `write_state_changes`. If `config.write_receipts` is `false` the function returns right
    /// after that and no receipt is touched.
    ///
    /// # Errors
    ///
    /// Returns [`ProviderError::BlockBodyIndicesNotFound`] when fewer block body indices are
    /// stored than the outcome has blocks, and propagates any error from the underlying writers.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_state<'a>(
        &self,
        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
        is_value_known: OriginalValuesKnown,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        let execution_outcome = execution_outcome.into();
        let first_block = execution_outcome.first_block();

        let (plain_state, reverts) =
            execution_outcome.state().to_plain_state_and_reverts(is_value_known);

        // Reverts go first: `write_state_reverts` reads `PlainStorageState` for wiped accounts,
        // and `write_state_changes` modifies that same table.
        self.write_state_reverts(reverts, first_block, config)?;
        self.write_state_changes(plain_state)?;

        if !config.write_receipts {
            return Ok(());
        }

        let block_count = execution_outcome.len() as u64;
        let last_block = execution_outcome.last_block();
        let block_range = first_block..=last_block;

        // The tip used for prune decisions is whichever is higher: the stored tip or the last
        // block of this outcome.
        let tip = self.last_block_number()?.max(last_block);

        // First transaction number of every block in the range; receipts are keyed by tx number.
        let block_indices: Vec<_> = self
            .block_body_indices_range(block_range)?
            .into_iter()
            .map(|b| b.first_tx_num)
            .collect();

        // Every block of the outcome needs stored body indices; report the first missing block
        // counted back from `last_block`.
        if block_indices.len() < block_count as usize {
            let missing_blocks = block_count - block_indices.len() as u64;
            return Err(ProviderError::BlockBodyIndicesNotFound(
                last_block.saturating_sub(missing_blocks - 1),
            ));
        }

        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;

        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;

        // Receipts may only be skipped on write when they are destined for the database or no
        // receipts static file exists yet, AND `first_block` is already beyond the minimum
        // pruning distance from `tip`.
        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
            self.static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none()) &&
            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);

        // Seed the allow-list with every address whose filter entry is keyed below `first_block`.
        let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
        for (_, addresses) in contract_log_pruner.range(..first_block) {
            allowed_addresses.extend(addresses.iter().copied());
        }

        for (idx, (receipts, first_tx_index)) in
            execution_outcome.receipts().zip(block_indices).enumerate()
        {
            let block_number = first_block + idx as u64;

            // The writer is advanced for every block, including blocks whose receipts are
            // skipped below.
            receipts_writer.increment_block(block_number)?;

            // Skip all receipts of this block if the configured receipts prune mode says so.
            if prunable_receipts &&
                self.prune_modes
                    .receipts
                    .is_some_and(|mode| mode.should_prune(block_number, tip))
            {
                continue
            }

            // Addresses whose filter entry is keyed at this block join the allow-list from here on.
            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
                allowed_addresses.extend(new_addresses.iter().copied());
            }

            for (idx, receipt) in receipts.iter().enumerate() {
                let receipt_idx = first_tx_index + idx as u64;
                // With a contract log filter configured, keep only receipts that have at least
                // one log emitted by an allowed address.
                if prunable_receipts &&
                    has_contract_log_filter &&
                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
                {
                    continue
                }

                receipts_writer.append_receipt(receipt_idx, receipt)?;
            }
        }

        Ok(())
    }
2290
    /// Writes the per-block state reverts (changesets) of an execution outcome.
    ///
    /// `reverts.storage` and `reverts.accounts` are consumed positionally: entry `i` is written
    /// for block `first_block + i`. Storage changesets are always written; account changesets
    /// are skipped when `config.write_account_changesets` is `false`.
    fn write_state_reverts(
        &self,
        reverts: PlainStateReverts,
        first_block: BlockNumber,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        tracing::trace!("Writing storage changes");
        // Used only to read the currently stored slots of accounts whose storage was wiped.
        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;

            tracing::trace!(block_number, "Writing block change");
            // Order the block's reverts by address.
            storage_changes.par_sort_unstable_by_key(|a| a.address);
            // Pre-size for the explicit reverts; slots pulled in by a wipe can exceed this.
            let total_changes =
                storage_changes.iter().map(|change| change.storage_revert.len()).sum();
            let mut changeset = Vec::with_capacity(total_changes);
            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
                // Convert the slot keys to `B256` and order them within the account.
                let mut storage = storage_revert
                    .into_iter()
                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
                    .collect::<Vec<_>>();
                storage.par_sort_unstable_by_key(|a| a.0);

                // For a wiped account, collect every slot currently stored for it in
                // `PlainStorageState`. `write_state` calls this function before
                // `write_state_changes`, so these are the pre-change values.
                let mut wiped_storage = Vec::new();
                if wiped {
                    tracing::trace!(?address, "Wiping storage");
                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
                        wiped_storage.push((entry.key, entry.value));
                        while let Some(entry) = storages_cursor.next_dup_val()? {
                            wiped_storage.push((entry.key, entry.value))
                        }
                    }
                }

                tracing::trace!(?address, ?storage, "Writing storage reverts");
                // `StorageRevertsIter` combines the explicit reverts with the wiped slots; its
                // exact merge rules are defined in `changesets_utils` (not visible here).
                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
                    changeset.push(StorageBeforeTx { address, key, value });
                }
            }

            // One writer per block; `EitherWriter` decides the destination.
            let mut storage_changesets_writer =
                EitherWriter::new_storage_changesets(self, block_number)?;
            storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
        }

        if !config.write_account_changesets {
            return Ok(());
        }

        tracing::trace!(?first_block, "Writing account changes");
        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;
            // Each revert becomes an `AccountBeforeTx` holding the account's previous info.
            let changeset = account_block_reverts
                .into_iter()
                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
                .collect::<Vec<_>>();
            let mut account_changesets_writer =
                EitherWriter::new_account_changesets(self, block_number)?;

            account_changesets_writer.append_account_changeset(block_number, changeset)?;
        }

        Ok(())
    }
2366
2367 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
2368 changes.accounts.par_sort_by_key(|a| a.0);
2371 changes.storage.par_sort_by_key(|a| a.address);
2372 changes.contracts.par_sort_by_key(|a| a.0);
2373
2374 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
2376 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
2377 for (address, account) in changes.accounts {
2379 if let Some(account) = account {
2380 tracing::trace!(?address, "Updating plain state account");
2381 accounts_cursor.upsert(address, &account.into())?;
2382 } else if accounts_cursor.seek_exact(address)?.is_some() {
2383 tracing::trace!(?address, "Deleting plain state account");
2384 accounts_cursor.delete_current()?;
2385 }
2386 }
2387
2388 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
2390 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
2391 for (hash, bytecode) in changes.contracts {
2392 bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
2393 }
2394
2395 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
2397 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2398 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2399 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2401 storages_cursor.delete_current_duplicates()?;
2402 }
2403 let mut storage = storage
2405 .into_iter()
2406 .map(|(k, value)| StorageEntry { key: k.into(), value })
2407 .collect::<Vec<_>>();
2408 storage.par_sort_unstable_by_key(|a| a.key);
2410
2411 for entry in storage {
2412 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2413 if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? &&
2414 db_entry.key == entry.key
2415 {
2416 storages_cursor.delete_current()?;
2417 }
2418
2419 if !entry.value.is_zero() {
2420 storages_cursor.upsert(address, &entry)?;
2421 }
2422 }
2423 }
2424
2425 Ok(())
2426 }
2427
2428 #[instrument(level = "debug", target = "providers::db", skip_all)]
2429 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2430 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2432 for (hashed_address, account) in hashed_state.accounts() {
2433 if let Some(account) = account {
2434 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2435 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2436 hashed_accounts_cursor.delete_current()?;
2437 }
2438 }
2439
2440 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2442 let mut hashed_storage_cursor =
2443 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2444 for (hashed_address, storage) in sorted_storages {
2445 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2446 hashed_storage_cursor.delete_current_duplicates()?;
2447 }
2448
2449 for (hashed_slot, value) in storage.storage_slots_ref() {
2450 let entry = StorageEntry { key: *hashed_slot, value: *value };
2451
2452 if let Some(db_entry) =
2453 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
2454 db_entry.key == entry.key
2455 {
2456 hashed_storage_cursor.delete_current()?;
2457 }
2458
2459 if !entry.value.is_zero() {
2460 hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2461 }
2462 }
2463 }
2464
2465 Ok(())
2466 }
2467
2468 fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
2490 let range = block + 1..=self.last_block_number()?;
2491
2492 if range.is_empty() {
2493 return Ok(());
2494 }
2495
2496 let block_bodies = self.block_body_indices_range(range.clone())?;
2498
2499 let from_transaction_num =
2501 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2502
2503 let storage_range = BlockNumberAddress::range(range.clone());
2504 let storage_changeset = if let Some(_highest_block) = self
2505 .static_file_provider
2506 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2507 self.cached_storage_settings().storage_changesets_in_static_files
2508 {
2509 let changesets = self.storage_changesets_range(range.clone())?;
2510 let mut changeset_writer =
2511 self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2512 changeset_writer.prune_storage_changesets(block)?;
2513 changesets
2514 } else {
2515 self.take::<tables::StorageChangeSets>(storage_range)?
2516 };
2517 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2518
2519 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2524 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2525
2526 let (state, _) = self.populate_bundle_state(
2527 account_changeset,
2528 storage_changeset,
2529 &mut plain_accounts_cursor,
2530 &mut plain_storage_cursor,
2531 )?;
2532
2533 for (address, (old_account, new_account, storage)) in &state {
2535 if old_account != new_account {
2537 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2538 if let Some(account) = old_account {
2539 plain_accounts_cursor.upsert(*address, account)?;
2540 } else if existing_entry.is_some() {
2541 plain_accounts_cursor.delete_current()?;
2542 }
2543 }
2544
2545 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2547 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2548 if plain_storage_cursor
2550 .seek_by_key_subkey(*address, *storage_key)?
2551 .filter(|s| s.key == *storage_key)
2552 .is_some()
2553 {
2554 plain_storage_cursor.delete_current()?
2555 }
2556
2557 if !old_storage_value.is_zero() {
2559 plain_storage_cursor.upsert(*address, &storage_entry)?;
2560 }
2561 }
2562 }
2563
2564 self.remove_receipts_from(from_transaction_num, block)?;
2565
2566 Ok(())
2567 }
2568
    /// Reverts the plain state to what it was at `block`, removes the changesets and receipts of
    /// all blocks above it, and returns the removed data as an [`ExecutionOutcome`].
    ///
    /// Storage and account changesets are read from (and pruned in) static files when the
    /// corresponding storage setting is enabled and a static file exists for the segment;
    /// otherwise they are taken from the database tables.
    ///
    /// Returns a default (empty) outcome when there is no block above `block`.
    ///
    /// # Panics
    ///
    /// Panics if the range is non-empty but no block body indices are stored for it.
    fn take_state_above(
        &self,
        block: BlockNumber,
    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
        let range = block + 1..=self.last_block_number()?;

        if range.is_empty() {
            return Ok(ExecutionOutcome::default())
        }
        let start_block_number = *range.start();

        // Block body indices are read (not removed) because they locate the receipts.
        let block_bodies = self.block_body_indices_range(range.clone())?;

        // Transaction number bounds (inclusive) of the receipts to return and remove.
        let from_transaction_num =
            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
        let to_transaction_num =
            block_bodies.last().expect("already checked if there are blocks").last_tx_num();

        let storage_range = BlockNumberAddress::range(range.clone());
        let storage_changeset = if let Some(highest_block) = self
            .static_file_provider
            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
            self.cached_storage_settings().storage_changesets_in_static_files
        {
            // Read the changesets first, then prune them from the static file.
            let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
            changeset_writer.prune_storage_changesets(block)?;
            changesets
        } else {
            self.take::<tables::StorageChangeSets>(storage_range)?
        };

        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;

        let highest_changeset_block = self
            .static_file_provider
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
        let account_changeset = if let Some(highest_block) = highest_changeset_block &&
            self.cached_storage_settings().account_changesets_in_static_files
        {
            // Same read-then-prune sequence as for storage changesets.
            let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
            changeset_writer.prune_account_changesets(block)?;

            changesets
        } else {
            self.take::<tables::AccountChangeSets>(range)?
        };

        // `state` holds the old/new value pairs per address; `reverts` is passed through to the
        // returned outcome.
        let (state, reverts) = self.populate_bundle_state(
            account_changeset,
            storage_changeset,
            &mut plain_accounts_cursor,
            &mut plain_storage_cursor,
        )?;

        for (address, (old_account, new_account, storage)) in &state {
            // Restore the account only if it actually changed.
            if old_account != new_account {
                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
                if let Some(account) = old_account {
                    plain_accounts_cursor.upsert(*address, account)?;
                } else if existing_entry.is_some() {
                    plain_accounts_cursor.delete_current()?;
                }
            }

            // Restore every changed slot to its old value; zero values are not stored.
            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
                if plain_storage_cursor
                    .seek_by_key_subkey(*address, *storage_key)?
                    .filter(|s| s.key == *storage_key)
                    .is_some()
                {
                    plain_storage_cursor.delete_current()?
                }

                if !old_storage_value.is_zero() {
                    plain_storage_cursor.upsert(*address, &storage_entry)?;
                }
            }
        }

        // Collect `(tx_number, receipt)` pairs for the whole transaction range, from static
        // files or from the `Receipts` table.
        let mut receipts_iter = self
            .static_file_provider
            .get_range_with_static_file_or_database(
                StaticFileSegment::Receipts,
                from_transaction_num..to_transaction_num + 1,
                |static_file, range, _| {
                    static_file
                        .receipts_by_tx_range(range.clone())
                        .map(|r| range.into_iter().zip(r).collect())
                },
                |range, _| {
                    self.tx
                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
                        .walk_range(range)?
                        .map(|r| r.map_err(Into::into))
                        .collect()
                },
                |_| true,
            )?
            .into_iter()
            .peekable();

        // Regroup the flat receipt list per block. A transaction number without a matching
        // receipt is skipped, so a block may end up with fewer receipts than transactions.
        let mut receipts = Vec::with_capacity(block_bodies.len());
        for block_body in block_bodies {
            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
            for num in block_body.tx_num_range() {
                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
                    block_receipts.push(receipts_iter.next().unwrap().1);
                }
            }
            receipts.push(block_receipts);
        }

        self.remove_receipts_from(from_transaction_num, block)?;

        Ok(ExecutionOutcome::new_init(
            state,
            reverts,
            Vec::new(),
            receipts,
            start_block_number,
            Vec::new(),
        ))
    }
2738}
2739
2740impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2741 #[instrument(level = "debug", target = "providers::db", skip_all)]
2745 fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
2746 if trie_updates.is_empty() {
2747 return Ok(0)
2748 }
2749
2750 let mut num_entries = 0;
2752
2753 let tx = self.tx_ref();
2754 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2755
2756 for (key, updated_node) in trie_updates.account_nodes_ref() {
2758 let nibbles = StoredNibbles(*key);
2759 match updated_node {
2760 Some(node) => {
2761 if !nibbles.0.is_empty() {
2762 num_entries += 1;
2763 account_trie_cursor.upsert(nibbles, node)?;
2764 }
2765 }
2766 None => {
2767 num_entries += 1;
2768 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2769 account_trie_cursor.delete_current()?;
2770 }
2771 }
2772 }
2773 }
2774
2775 num_entries +=
2776 self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
2777
2778 Ok(num_entries)
2779 }
2780}
2781
2782impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2783 fn write_storage_trie_updates_sorted<'a>(
2789 &self,
2790 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2791 ) -> ProviderResult<usize> {
2792 let mut num_entries = 0;
2793 let mut storage_tries = storage_tries.collect::<Vec<_>>();
2794 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2795 let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2796 for (hashed_address, storage_trie_updates) in storage_tries {
2797 let mut db_storage_trie_cursor =
2798 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2799 num_entries +=
2800 db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
2801 cursor = db_storage_trie_cursor.cursor;
2802 }
2803
2804 Ok(num_entries)
2805 }
2806}
2807
2808impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2809 fn unwind_account_hashing<'a>(
2810 &self,
2811 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2812 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2813 let hashed_accounts = changesets
2817 .into_iter()
2818 .map(|(_, e)| (keccak256(e.address), e.info))
2819 .collect::<Vec<_>>()
2820 .into_iter()
2821 .rev()
2822 .collect::<BTreeMap<_, _>>();
2823
2824 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2826 for (hashed_address, account) in &hashed_accounts {
2827 if let Some(account) = account {
2828 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2829 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2830 hashed_accounts_cursor.delete_current()?;
2831 }
2832 }
2833
2834 Ok(hashed_accounts)
2835 }
2836
2837 fn unwind_account_hashing_range(
2838 &self,
2839 range: impl RangeBounds<BlockNumber>,
2840 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2841 let changesets = self
2842 .tx
2843 .cursor_read::<tables::AccountChangeSets>()?
2844 .walk_range(range)?
2845 .collect::<Result<Vec<_>, _>>()?;
2846 self.unwind_account_hashing(changesets.iter())
2847 }
2848
2849 fn insert_account_for_hashing(
2850 &self,
2851 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2852 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2853 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2854 let hashed_accounts =
2855 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2856 for (hashed_address, account) in &hashed_accounts {
2857 if let Some(account) = account {
2858 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2859 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2860 hashed_accounts_cursor.delete_current()?;
2861 }
2862 }
2863 Ok(hashed_accounts)
2864 }
2865
2866 fn unwind_storage_hashing(
2867 &self,
2868 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2869 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2870 let mut hashed_storages = changesets
2872 .into_iter()
2873 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2874 (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2875 })
2876 .collect::<Vec<_>>();
2877 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2878
2879 let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2881 HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2882 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2883 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2884 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2885
2886 if hashed_storage
2887 .seek_by_key_subkey(hashed_address, key)?
2888 .filter(|entry| entry.key == key)
2889 .is_some()
2890 {
2891 hashed_storage.delete_current()?;
2892 }
2893
2894 if !value.is_zero() {
2895 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2896 }
2897 }
2898 Ok(hashed_storage_keys)
2899 }
2900
2901 fn unwind_storage_hashing_range(
2902 &self,
2903 range: impl RangeBounds<BlockNumberAddress>,
2904 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2905 let changesets = self
2906 .tx
2907 .cursor_read::<tables::StorageChangeSets>()?
2908 .walk_range(range)?
2909 .collect::<Result<Vec<_>, _>>()?;
2910 self.unwind_storage_hashing(changesets.into_iter())
2911 }
2912
2913 fn insert_storage_for_hashing(
2914 &self,
2915 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2916 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2917 let hashed_storages =
2919 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2920 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2921 map.insert(keccak256(entry.key), entry.value);
2922 map
2923 });
2924 map.insert(keccak256(address), storage);
2925 map
2926 });
2927
2928 let hashed_storage_keys = hashed_storages
2929 .iter()
2930 .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2931 .collect();
2932
2933 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2934 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2937 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2938 if hashed_storage_cursor
2939 .seek_by_key_subkey(hashed_address, key)?
2940 .filter(|entry| entry.key == key)
2941 .is_some()
2942 {
2943 hashed_storage_cursor.delete_current()?;
2944 }
2945
2946 if !value.is_zero() {
2947 hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2948 }
2949 Ok(())
2950 })
2951 })?;
2952
2953 Ok(hashed_storage_keys)
2954 }
2955}
2956
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
    /// Unwinds the account history indices for the given account changesets.
    ///
    /// Returns the number of changeset entries that were passed in.
    fn unwind_account_history_indices<'a>(
        &self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
    ) -> ProviderResult<usize> {
        // Reduce each changeset entry to `(address, block number)` and group by address.
        let mut last_indices = changesets
            .into_iter()
            .map(|(index, account)| (account.address, *index))
            .collect::<Vec<_>>();
        last_indices.sort_unstable_by_key(|(a, _)| *a);

        if self.cached_storage_settings().account_history_in_rocksdb {
            // NOTE(review): when this setting is enabled but the build is not
            // `unix` + `rocksdb`, this branch compiles to nothing and no index is unwound.
            #[cfg(all(unix, feature = "rocksdb"))]
            {
                // The unwind is queued as a pending batch, not applied here.
                let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
                self.pending_rocksdb_batches.lock().push(batch);
            }
        } else {
            let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
            for &(address, rem_index) in &last_indices {
                // `unwind_history_shards` is defined elsewhere; presumably it removes the
                // address's shards from `rem_index` onwards and returns the block numbers of
                // the boundary shard that must be kept.
                let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
                    &mut cursor,
                    ShardedKey::last(address),
                    rem_index,
                    |sharded_key| sharded_key.key == address,
                )?;

                // Re-insert the kept block numbers under the address's last-shard key.
                if !partial_shard.is_empty() {
                    cursor.insert(
                        ShardedKey::last(address),
                        &BlockNumberList::new_pre_sorted(partial_shard),
                    )?;
                }
            }
        }

        let changesets = last_indices.len();
        Ok(changesets)
    }

    /// Loads the `AccountChangeSets` entries in `range` from the database and unwinds the
    /// account history indices with them.
    fn unwind_account_history_indices_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<usize> {
        let changesets = self
            .tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .collect::<Result<Vec<_>, _>>()?;
        self.unwind_account_history_indices(changesets.iter())
    }

    /// Appends the given block numbers to the `AccountsHistory` index of each address.
    fn insert_account_history_index(
        &self,
        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
    ) -> ProviderResult<()> {
        self.append_history_index::<_, tables::AccountsHistory>(
            account_transitions,
            ShardedKey::new,
        )
    }

    /// Unwinds the storage history indices for the given storage changesets.
    ///
    /// Returns the number of changeset entries that were passed in.
    fn unwind_storage_history_indices(
        &self,
        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
    ) -> ProviderResult<usize> {
        // Reduce each entry to `(address, slot key, block number)` and group by address and slot.
        let mut storage_changesets = changesets
            .into_iter()
            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
            .collect::<Vec<_>>();
        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));

        if self.cached_storage_settings().storages_history_in_rocksdb {
            // NOTE(review): when this setting is enabled but the build is not
            // `unix` + `rocksdb`, this branch compiles to nothing and no index is unwound.
            #[cfg(all(unix, feature = "rocksdb"))]
            {
                // The unwind is queued as a pending batch, not applied here.
                let batch =
                    self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
                self.pending_rocksdb_batches.lock().push(batch);
            }
        } else {
            let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
            for &(address, storage_key, rem_index) in &storage_changesets {
                // `unwind_history_shards` is defined elsewhere; presumably it removes the
                // slot's shards from `rem_index` onwards and returns the block numbers of the
                // boundary shard that must be kept.
                let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
                    &mut cursor,
                    StorageShardedKey::last(address, storage_key),
                    rem_index,
                    |storage_sharded_key| {
                        storage_sharded_key.address == address &&
                            storage_sharded_key.sharded_key.key == storage_key
                    },
                )?;

                // Re-insert the kept block numbers under the slot's last-shard key.
                if !partial_shard.is_empty() {
                    cursor.insert(
                        StorageShardedKey::last(address, storage_key),
                        &BlockNumberList::new_pre_sorted(partial_shard),
                    )?;
                }
            }
        }

        let changesets = storage_changesets.len();
        Ok(changesets)
    }

    /// Loads the `StorageChangeSets` entries in `range` from the database and unwinds the
    /// storage history indices with them.
    fn unwind_storage_history_indices_range(
        &self,
        range: impl RangeBounds<BlockNumberAddress>,
    ) -> ProviderResult<usize> {
        let changesets = self
            .tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(range)?
            .collect::<Result<Vec<_>, _>>()?;
        self.unwind_storage_history_indices(changesets.into_iter())
    }

    /// Appends the given block numbers to the `StoragesHistory` index of each
    /// `(address, storage key)` pair.
    fn insert_storage_history_index(
        &self,
        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
    ) -> ProviderResult<()> {
        self.append_history_index::<_, tables::StoragesHistory>(
            storage_transitions,
            |(address, storage_key), highest_block_number| {
                StorageShardedKey::new(address, storage_key, highest_block_number)
            },
        )
    }

    /// Updates the account and storage history indices for the blocks in `range`.
    ///
    /// A history kind that the storage settings place in RocksDB is skipped here.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
        let storage_settings = self.cached_storage_settings();
        if !storage_settings.account_history_in_rocksdb {
            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
            self.insert_account_history_index(indices)?;
        }

        if !storage_settings.storages_history_in_rocksdb {
            let indices = self.changed_storages_and_blocks_with_range(range)?;
            self.insert_storage_history_index(indices)?;
        }

        Ok(())
    }
}
3108
3109impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
3110 for DatabaseProvider<TX, N>
3111{
3112 fn take_block_and_execution_above(
3113 &self,
3114 block: BlockNumber,
3115 ) -> ProviderResult<Chain<Self::Primitives>> {
3116 let range = block + 1..=self.last_block_number()?;
3117
3118 self.unwind_trie_state_from(block + 1)?;
3119
3120 let execution_state = self.take_state_above(block)?;
3122
3123 let blocks = self.recovered_block_range(range)?;
3124
3125 self.remove_blocks_above(block)?;
3128
3129 self.update_pipeline_stages(block, true)?;
3131
3132 Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
3133 }
3134
3135 fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
3136 self.unwind_trie_state_from(block + 1)?;
3137
3138 self.remove_state_above(block)?;
3140
3141 self.remove_blocks_above(block)?;
3144
3145 self.update_pipeline_stages(block, true)?;
3147
3148 Ok(())
3149 }
3150}
3151
3152impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
3153 for DatabaseProvider<TX, N>
3154{
3155 type Block = BlockTy<N>;
3156 type Receipt = ReceiptTy<N>;
3157
3158 fn insert_block(
3163 &self,
3164 block: &RecoveredBlock<Self::Block>,
3165 ) -> ProviderResult<StoredBlockBodyIndices> {
3166 let block_number = block.number();
3167
3168 let executed_block = ExecutedBlock::new(
3170 Arc::new(block.clone()),
3171 Arc::new(BlockExecutionOutput {
3172 result: BlockExecutionResult {
3173 receipts: Default::default(),
3174 requests: Default::default(),
3175 gas_used: 0,
3176 blob_gas_used: 0,
3177 },
3178 state: Default::default(),
3179 }),
3180 ComputedTrieData::default(),
3181 );
3182
3183 self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
3185
3186 self.block_body_indices(block_number)?
3188 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
3189 }
3190
    /// Appends block body indices and transactions for the given blocks.
    ///
    /// For every `(block_number, body)` pair the body indices are appended to
    /// `BlockBodyIndices`. For blocks with a body, the transactions are appended to the
    /// `Transactions` static file segment and, if the body is non-empty, the last transaction
    /// number is mapped to the block in `TransactionBlocks`. Finally the bodies are handed to
    /// the chain storage writer.
    ///
    /// Does nothing when `bodies` is empty.
    fn append_block_bodies(
        &self,
        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
    ) -> ProviderResult<()> {
        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };

        // Static file writer for transactions, obtained for the first block of the batch.
        let mut tx_writer =
            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;

        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;

        // Continue numbering after the highest transaction number in `TransactionBlocks`, or
        // start at 0 when the table is empty.
        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();

        for (block_number, body) in &bodies {
            // The writer is advanced for every block, including blocks without a body.
            tx_writer.increment_block(*block_number)?;

            // A missing body counts as zero transactions.
            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };

            let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

            block_indices_cursor.append(*block_number, &block_indices)?;

            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);

            let Some(body) = body else { continue };

            // Only blocks with transactions get a `TransactionBlocks` entry, keyed by the
            // block's last transaction number.
            if !body.transactions().is_empty() {
                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
            }

            for transaction in body.transactions() {
                tx_writer.append_transaction(next_tx_num, transaction)?;

                next_tx_num += 1;
            }
        }

        self.storage.writer().write_block_bodies(self, bodies)?;

        Ok(())
    }
3242
    /// Removes all block data above `block`: header number mappings, static file headers,
    /// transaction hash mappings, senders and (via `remove_bodies_above`) bodies.
    ///
    /// # Errors
    ///
    /// Returns [`ProviderError::BlockBodyIndicesNotFound`] if `block` has no stored body indices
    /// or `BlockBodyIndices` is empty.
    ///
    /// # Panics
    ///
    /// Panics if the `Headers` static file segment reports no highest block.
    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
        let last_block_number = self.last_block_number()?;
        // Drop the hash -> number mapping of every canonical block above `block`.
        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
        }

        let highest_static_file_block = self
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .expect("todo: error handling, headers should exist");

        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
        // The prune argument is the number of headers between `block` and the highest static
        // file block; `saturating_sub` makes it 0 when the static file ends at or below `block`.
        self.static_file_provider()
            .get_writer(block, StaticFileSegment::Headers)?
            .prune_headers(highest_static_file_block.saturating_sub(block))?;

        // First transaction number that belongs to a block above `block`.
        let unwind_tx_from = self
            .block_body_indices(block)?
            .map(|b| b.next_tx_num())
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;

        // Last transaction number of the highest block that has stored body indices.
        let unwind_tx_to = self
            .tx
            .cursor_read::<tables::BlockBodyIndices>()?
            .last()?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
            .1
            .last_tx_num();

        // Delete the hash -> number mapping of every transaction in the removed range.
        if unwind_tx_from <= unwind_tx_to {
            let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
            self.with_rocksdb_batch(|batch| {
                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
                for (hash, _) in hashes {
                    writer.delete_transaction_hash_number(hash)?;
                }
                Ok(((), writer.into_raw_rocksdb_batch()))
            })?;
        }

        EitherWriter::new_senders(self, last_block_number)?.prune_senders(unwind_tx_from, block)?;

        // Bodies go last: the steps above read the body indices that this call removes.
        self.remove_bodies_above(block)?;

        Ok(())
    }
3299
3300 fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3301 self.storage.writer().remove_block_bodies_above(self, block)?;
3302
3303 let unwind_tx_from = self
3305 .block_body_indices(block)?
3306 .map(|b| b.next_tx_num())
3307 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3308
3309 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3310 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3311
3312 let static_file_tx_num =
3313 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3314
3315 let to_delete = static_file_tx_num
3316 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3317 .unwrap_or_default();
3318
3319 self.static_file_provider
3320 .latest_writer(StaticFileSegment::Transactions)?
3321 .prune_transactions(to_delete, block)?;
3322
3323 Ok(())
3324 }
3325
3326 fn append_blocks_with_state(
3328 &self,
3329 blocks: Vec<RecoveredBlock<Self::Block>>,
3330 execution_outcome: &ExecutionOutcome<Self::Receipt>,
3331 hashed_state: HashedPostStateSorted,
3332 ) -> ProviderResult<()> {
3333 if blocks.is_empty() {
3334 debug!(target: "providers::db", "Attempted to append empty block range");
3335 return Ok(())
3336 }
3337
3338 let first_number = blocks[0].number();
3341
3342 let last_block_number = blocks[blocks.len() - 1].number();
3345
3346 let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3347
3348 let (account_transitions, storage_transitions) = {
3353 let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
3354 let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
3355 for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
3356 let block_number = first_number + block_idx as u64;
3357 for (address, account_revert) in block_reverts {
3358 account_transitions.entry(*address).or_default().push(block_number);
3359 for storage_key in account_revert.storage.keys() {
3360 let key = B256::new(storage_key.to_be_bytes());
3361 storage_transitions.entry((*address, key)).or_default().push(block_number);
3362 }
3363 }
3364 }
3365 (account_transitions, storage_transitions)
3366 };
3367
3368 for block in blocks {
3370 self.insert_block(&block)?;
3371 durations_recorder.record_relative(metrics::Action::InsertBlock);
3372 }
3373
3374 self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
3375 durations_recorder.record_relative(metrics::Action::InsertState);
3376
3377 self.write_hashed_state(&hashed_state)?;
3379 durations_recorder.record_relative(metrics::Action::InsertHashes);
3380
3381 self.insert_account_history_index(account_transitions)?;
3384 self.insert_storage_history_index(storage_transitions)?;
3385 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3386
3387 self.update_pipeline_stages(last_block_number, false)?;
3389 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3390
3391 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3392
3393 Ok(())
3394 }
3395}
3396
3397impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3398 fn get_prune_checkpoint(
3399 &self,
3400 segment: PruneSegment,
3401 ) -> ProviderResult<Option<PruneCheckpoint>> {
3402 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3403 }
3404
3405 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3406 Ok(PruneSegment::variants()
3407 .filter_map(|segment| {
3408 self.tx
3409 .get::<tables::PruneCheckpoints>(segment)
3410 .transpose()
3411 .map(|chk| chk.map(|chk| (segment, chk)))
3412 })
3413 .collect::<Result<_, _>>()?)
3414 }
3415}
3416
3417impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3418 fn save_prune_checkpoint(
3419 &self,
3420 segment: PruneSegment,
3421 checkpoint: PruneCheckpoint,
3422 ) -> ProviderResult<()> {
3423 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3424 }
3425}
3426
3427impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3428 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3429 let db_entries = self.tx.entries::<T>()?;
3430 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3431 Ok(entries) => entries,
3432 Err(ProviderError::UnsupportedProvider) => 0,
3433 Err(err) => return Err(err),
3434 };
3435
3436 Ok(db_entries + static_file_entries)
3437 }
3438}
3439
3440impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3441 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3442 let mut finalized_blocks = self
3443 .tx
3444 .cursor_read::<tables::ChainState>()?
3445 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3446 .take(1)
3447 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3448
3449 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3450 Ok(last_finalized_block_number)
3451 }
3452
3453 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3454 let mut finalized_blocks = self
3455 .tx
3456 .cursor_read::<tables::ChainState>()?
3457 .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3458 .take(1)
3459 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3460
3461 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3462 Ok(last_finalized_block_number)
3463 }
3464}
3465
3466impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3467 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3468 Ok(self
3469 .tx
3470 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3471 }
3472
3473 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3474 Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3475 }
3476}
3477
3478impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3479 type Tx = TX;
3480
3481 fn tx_ref(&self) -> &Self::Tx {
3482 &self.tx
3483 }
3484
3485 fn tx_mut(&mut self) -> &mut Self::Tx {
3486 &mut self.tx
3487 }
3488
3489 fn into_tx(self) -> Self::Tx {
3490 self.tx
3491 }
3492
3493 fn prune_modes_ref(&self) -> &PruneModes {
3494 self.prune_modes_ref()
3495 }
3496
3497 fn commit(self) -> ProviderResult<()> {
3499 if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
3504 self.tx.commit()?;
3505
3506 #[cfg(all(unix, feature = "rocksdb"))]
3507 {
3508 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3509 for batch in batches {
3510 self.rocksdb_provider.commit_batch(batch)?;
3511 }
3512 }
3513
3514 self.static_file_provider.commit()?;
3515 } else {
3516 let mut timings = metrics::CommitTimings::default();
3518
3519 let start = Instant::now();
3520 self.static_file_provider.finalize()?;
3521 timings.sf = start.elapsed();
3522
3523 #[cfg(all(unix, feature = "rocksdb"))]
3524 {
3525 let start = Instant::now();
3526 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3527 for batch in batches {
3528 self.rocksdb_provider.commit_batch(batch)?;
3529 }
3530 timings.rocksdb = start.elapsed();
3531 }
3532
3533 let start = Instant::now();
3534 self.tx.commit()?;
3535 timings.mdbx = start.elapsed();
3536
3537 self.metrics.record_commit(&timings);
3538 }
3539
3540 Ok(())
3541 }
3542}
3543
3544impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3545 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3546 self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3547 }
3548}
3549
3550impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3551 fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3552 self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3553 }
3554}
3555
3556impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3557 fn cached_storage_settings(&self) -> StorageSettings {
3558 *self.storage_settings.read()
3559 }
3560
3561 fn set_storage_settings_cache(&self, settings: StorageSettings) {
3562 *self.storage_settings.write() = settings;
3563 }
3564}
3565
3566#[cfg(test)]
3567mod tests {
3568 use super::*;
3569 use crate::{
3570 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3571 BlockWriter,
3572 };
3573 use alloy_primitives::map::B256Map;
3574 use reth_ethereum_primitives::Receipt;
3575 use reth_testing_utils::generators::{self, random_block, BlockParams};
3576 use reth_trie::{Nibbles, StoredNibblesSubKey};
3577
/// An inverted range (start greater than end) produces no per-block entries at all.
#[test]
fn test_receipts_by_block_range_empty_range() {
    let factory = create_test_provider_factory();
    let provider = factory.provider().unwrap();

    let inverted_range = 10u64..=9u64;
    let receipts = provider.receipts_by_block_range(inverted_range).unwrap();

    let expected: Vec<Vec<reth_ethereum_primitives::Receipt>> = Vec::new();
    assert_eq!(receipts, expected);
}
3589
/// Blocks that were never inserted each contribute an empty receipt list.
#[test]
fn test_receipts_by_block_range_nonexistent_blocks() {
    let factory = create_test_provider_factory();
    let provider = factory.provider().unwrap();

    let receipts = provider.receipts_by_block_range(10..=12).unwrap();

    // One (empty) entry per requested block number.
    let expected: Vec<Vec<reth_ethereum_primitives::Receipt>> =
        vec![Vec::new(), Vec::new(), Vec::new()];
    assert_eq!(receipts, expected);
}
3599
/// A one-block range returns exactly that block's single receipt.
#[test]
fn test_receipts_by_block_range_single_block() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let provider_rw = factory.provider_rw().unwrap();

    // Genesis is written with an empty receipt list.
    let genesis = data.genesis.clone().try_recover().unwrap();
    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };
    provider_rw.insert_block(&genesis).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();

    // Block 1 comes from the shared test data, together with its execution outcome.
    let first_block = &data.blocks[0].0;
    let first_outcome = &data.blocks[0].1;
    provider_rw.insert_block(first_block).unwrap();
    provider_rw
        .write_state(first_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();
    let receipts = provider.receipts_by_block_range(1..=1).unwrap();

    assert_eq!(receipts.len(), 1);
    assert_eq!(receipts[0].len(), 1);
    assert_eq!(receipts[0][0], data.blocks[0].1.receipts()[0][0]);
}
3632
/// A three-block range returns one receipt per block, in block order.
#[test]
fn test_receipts_by_block_range_multiple_blocks() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let provider_rw = factory.provider_rw().unwrap();

    // Genesis is written with an empty receipt list.
    let genesis = data.genesis.clone().try_recover().unwrap();
    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };
    provider_rw.insert_block(&genesis).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();

    // Blocks 1..=3 are the first three entries of the shared test data.
    for entry in &data.blocks[..3] {
        provider_rw.insert_block(&entry.0).unwrap();
        provider_rw
            .write_state(&entry.1, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();
    let receipts = provider.receipts_by_block_range(1..=3).unwrap();

    assert_eq!(receipts.len(), 3);
    for (block_receipts, entry) in receipts.iter().zip(&data.blocks) {
        assert_eq!(block_receipts.len(), 1);
        assert_eq!(block_receipts[0], entry.1.receipts()[0][0]);
    }
}
3669
/// Checks only the per-block receipt counts for blocks 1..=3 of the shared test data; each
/// of them is asserted to carry exactly one receipt.
#[test]
fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let provider_rw = factory.provider_rw().unwrap();

    // Genesis is written with an empty receipt list.
    let genesis = data.genesis.clone().try_recover().unwrap();
    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };
    provider_rw.insert_block(&genesis).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();

    for entry in &data.blocks[..3] {
        provider_rw.insert_block(&entry.0).unwrap();
        provider_rw
            .write_state(&entry.1, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();
    let receipts = provider.receipts_by_block_range(1..=3).unwrap();

    assert_eq!(receipts.len(), 3);
    for per_block in &receipts {
        assert_eq!(per_block.len(), 1);
    }
}
3707
/// A range that extends past the stored blocks yields real receipts for the blocks that
/// exist and empty lists for the ones that do not.
#[test]
fn test_receipts_by_block_range_partial_range() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let provider_rw = factory.provider_rw().unwrap();

    // Genesis is written with an empty receipt list.
    let genesis = data.genesis.clone().try_recover().unwrap();
    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };
    provider_rw.insert_block(&genesis).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();

    // Only blocks 1..=3 are stored.
    for entry in &data.blocks[..3] {
        provider_rw.insert_block(&entry.0).unwrap();
        provider_rw
            .write_state(&entry.1, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();

    // Blocks 2 and 3 exist, blocks 4 and 5 do not.
    let receipts = provider.receipts_by_block_range(2..=5).unwrap();
    assert_eq!(receipts.len(), 4);

    for (idx, expected_len) in [1usize, 1, 0, 0].into_iter().enumerate() {
        assert_eq!(receipts[idx].len(), expected_len);
    }

    assert_eq!(receipts[0][0], data.blocks[1].1.receipts()[0][0]);
    assert_eq!(receipts[1][0], data.blocks[2].1.receipts()[0][0]);
}
3749
/// Transaction-less blocks produce empty receipt lists.
///
/// Blocks 0, 1 and 2 are inserted without transactions; the queried range is `1..=3`, so
/// its last block was never inserted.
#[test]
fn test_receipts_by_block_range_all_empty_blocks() {
    let factory = create_test_provider_factory();
    let mut rng = generators::rng();

    let empty_blocks: Vec<_> = (0..3)
        .map(|number| {
            random_block(
                &mut rng,
                number,
                BlockParams { tx_count: Some(0), ..Default::default() },
            )
        })
        .collect();

    let provider_rw = factory.provider_rw().unwrap();
    for block in empty_blocks {
        let recovered = block.try_recover().unwrap();
        provider_rw.insert_block(&recovered).unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();
    let receipts = provider.receipts_by_block_range(1..=3).unwrap();

    assert_eq!(receipts.len(), 3);
    for per_block in &receipts {
        assert_eq!(per_block.len(), 0);
    }
}
3777
/// The range query must agree with querying each block individually via
/// `receipts_by_block`.
#[test]
fn test_receipts_by_block_range_consistency_with_individual_calls() {
    let factory = create_test_provider_factory();
    let data = BlockchainTestData::default();

    let provider_rw = factory.provider_rw().unwrap();

    // Genesis is written with an empty receipt list.
    let genesis = data.genesis.clone().try_recover().unwrap();
    let genesis_outcome =
        ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() };
    provider_rw.insert_block(&genesis).unwrap();
    provider_rw
        .write_state(&genesis_outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
        .unwrap();

    for entry in &data.blocks[..3] {
        provider_rw.insert_block(&entry.0).unwrap();
        provider_rw
            .write_state(&entry.1, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
    }
    provider_rw.commit().unwrap();

    let provider = factory.provider().unwrap();

    let from_range = provider.receipts_by_block_range(1..=3).unwrap();

    // A block without stored receipts counts as an empty list.
    let one_by_one: Vec<_> = (1u64..=3)
        .map(|number| provider.receipts_by_block(number.into()).unwrap().unwrap_or_default())
        .collect();

    assert_eq!(from_range, one_by_one);
}
3819
/// Seeds the account and storage trie tables, applies a sorted batch of trie updates, and
/// verifies that updates, insertions, single-node deletions and a whole-trie wipe all
/// landed in the tables.
#[test]
fn test_write_trie_updates_sorted() {
    use reth_trie::{
        updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
        BranchNodeCompact, StorageTrieEntry,
    };

    // Branch node carrying only a state mask: empty tree/hash masks, no hashes, no root hash.
    let branch =
        |state_mask: u16| BranchNodeCompact::new(state_mask, 0u16, 0u16, vec![], None);

    let factory = create_test_provider_factory();
    let provider_rw = factory.provider_rw().unwrap();

    // Pre-populate the account trie: [0x3, 0x4] will be deleted and [0x1, 0x2] overwritten.
    {
        let tx = provider_rw.tx_ref();
        let mut account_cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();

        let seeded_accounts =
            [([0x3u8, 0x4], 0b1010_1010_1010_1010u16), ([0x1, 0x2], 0b0101_0101_0101_0101)];
        for (path, state_mask) in seeded_accounts {
            account_cursor
                .upsert(StoredNibbles(Nibbles::from_nibbles(path)), &branch(state_mask))
                .unwrap();
        }
    }

    // Pre-populate the storage tries: one node under the first address (to be deleted) and
    // two under the second address (whose whole trie will be wiped).
    let storage_address1 = B256::from([1u8; 32]);
    let storage_address2 = B256::from([2u8; 32]);
    {
        let tx = provider_rw.tx_ref();
        let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();

        let seeded_storage = [
            (storage_address1, [0x2u8, 0x0], 0b0011_0011_0011_0011u16),
            (storage_address2, [0xa, 0xb], 0b1100_1100_1100_1100),
            (storage_address2, [0xc, 0xd], 0b0011_1100_0011_1100),
        ];
        for (address, path, state_mask) in seeded_storage {
            let entry = StorageTrieEntry {
                nibbles: StoredNibblesSubKey(Nibbles::from_nibbles(path)),
                node: branch(state_mask),
            };
            storage_cursor.upsert(address, &entry).unwrap();
        }
    }

    // Account updates: overwrite [0x1, 0x2], delete [0x3, 0x4], insert [0x5, 0x6].
    let account_nodes = vec![
        (Nibbles::from_nibbles([0x1, 0x2]), Some(branch(0b1111_1111_1111_1111))),
        (Nibbles::from_nibbles([0x3, 0x4]), None),
        (Nibbles::from_nibbles([0x5, 0x6]), Some(branch(0b1111_1111_1111_1111))),
    ];

    // First address: insert [0x1, 0x0] and delete the seeded [0x2, 0x0].
    let storage_trie1 = StorageTrieUpdatesSorted {
        is_deleted: false,
        storage_nodes: vec![
            (Nibbles::from_nibbles([0x1, 0x0]), Some(branch(0b1111_0000_0000_0000))),
            (Nibbles::from_nibbles([0x2, 0x0]), None),
        ],
    };

    // Second address: wipe the whole storage trie.
    let storage_trie2 = StorageTrieUpdatesSorted { is_deleted: true, storage_nodes: vec![] };

    let storage_tries: B256Map<_> =
        [(storage_address1, storage_trie1), (storage_address2, storage_trie2)]
            .into_iter()
            .collect();

    let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

    let written = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();

    // The writer is expected to report five entries for this batch.
    assert_eq!(written, 5);

    // Account trie checks.
    let tx = provider_rw.tx_ref();
    let mut account_cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();

    let updated =
        account_cursor.seek_exact(StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]))).unwrap();
    assert!(updated.is_some(), "Updated account node should exist");
    let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
    assert_eq!(
        updated.unwrap().1.state_mask,
        expected_mask,
        "Account node should have updated state_mask"
    );

    let deleted =
        account_cursor.seek_exact(StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]))).unwrap();
    assert!(deleted.is_none(), "Deleted account node should not exist");

    let inserted =
        account_cursor.seek_exact(StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]))).unwrap();
    assert!(inserted.is_some(), "New account node should exist");

    // Storage trie checks.
    let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();

    let remaining_for_address1: Vec<_> = storage_cursor
        .walk_dup(Some(storage_address1), None)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(
        remaining_for_address1.len(),
        1,
        "Storage address1 should have 1 entry after deletion"
    );
    assert_eq!(
        remaining_for_address1[0].1.nibbles.0,
        Nibbles::from_nibbles([0x1, 0x0]),
        "Remaining entry should be [0x1, 0x0]"
    );

    let remaining_for_address2: Vec<_> = storage_cursor
        .walk_dup(Some(storage_address2), None)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(remaining_for_address2.len(), 0, "Storage address2 should be empty after wipe");

    provider_rw.commit().unwrap();
}
4039
/// Exercises receipt writes under different prune modes, once with legacy storage settings
/// and once with receipts stored in static files.
#[test]
fn test_prunable_receipts_logic() {
    // Inserts random blocks `0..=tip_block`, each with `tx_count` transactions.
    let seed_blocks = |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
        let mut rng = generators::rng();
        for number in 0..=tip_block {
            let params = BlockParams { tx_count: Some(tx_count), ..Default::default() };
            let recovered = random_block(&mut rng, number, params).try_recover().unwrap();
            provider_rw.insert_block(&recovered).unwrap();
        }
    };

    // Writes one successful receipt for `block` and commits. The block number doubles as
    // `cumulative_gas_used`, which the final assertions use to recognise the receipt.
    let commit_receipt = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
        let receipt = Receipt {
            tx_type: Default::default(),
            success: true,
            cumulative_gas_used: block,
            logs: vec![],
        };
        let outcome = ExecutionOutcome {
            first_block: block,
            receipts: vec![vec![receipt]],
            ..Default::default()
        };
        provider_rw
            .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
            .unwrap();
        provider_rw.commit().unwrap();
    };

    // Scenario 1: legacy storage settings, receipts pruned before block 100.
    {
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::legacy());
        let factory = factory.with_prune_modes(PruneModes {
            receipts: Some(PruneMode::Before(100)),
            ..Default::default()
        });

        let tip_block = 200u64;
        let first_block = 1u64;

        let seeding_provider = factory.provider_rw().unwrap();
        seed_blocks(&seeding_provider, tip_block, 1);
        seeding_provider.commit().unwrap();

        for block in [first_block, tip_block - 1] {
            commit_receipt(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                block,
            );
        }

        let provider = factory.provider().unwrap();

        // Block 0 is expected to report no receipts, the block below the tip exactly one.
        for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
            assert!(provider
                .receipts_by_block(block.into())
                .unwrap()
                .is_some_and(|r| r.len() == num_receipts));
        }
    }

    // Scenario 2: receipts in static files, initially pruned before block 2.
    {
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_receipts_in_static_files(true),
        );
        let factory = factory.with_prune_modes(PruneModes {
            receipts: Some(PruneMode::Before(2)),
            ..Default::default()
        });

        let tip_block = 200u64;

        let seeding_provider = factory.provider_rw().unwrap();
        seed_blocks(&seeding_provider, tip_block, 1);
        seeding_provider.commit().unwrap();

        for block in [0, 1] {
            commit_receipt(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                block,
            );
        }

        // After blocks 0 and 1 the receipts segment reports block 1 as its highest block but
        // no transaction at all.
        assert!(factory
            .static_file_provider()
            .get_highest_static_file_tx(StaticFileSegment::Receipts)
            .is_none());
        assert!(factory
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::Receipts)
            .is_some_and(|b| b == 1));

        // Once block 2 is written the segment reports transaction 2 as its highest.
        commit_receipt(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
        assert!(factory
            .static_file_provider()
            .get_highest_static_file_tx(StaticFileSegment::Receipts)
            .is_some_and(|num| num == 2));

        // Switch to a prune mode that covers block 3 and write it with a minimum pruning
        // distance of 1; the receipt is still expected to be readable afterwards.
        let factory = factory.with_prune_modes(PruneModes {
            receipts: Some(PruneMode::Before(100)),
            ..Default::default()
        });
        let pruning_provider = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
        assert!(PruneMode::Distance(1).should_prune(3, tip_block));
        commit_receipt(pruning_provider, 3);

        let provider = factory.provider().unwrap();
        assert!(EitherWriter::receipts_destination(&provider).is_static_file());
        for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
            assert!(provider
                .receipts_by_block(num.into())
                .unwrap()
                .is_some_and(|r| r.len() == num_receipts));

            let receipt = provider.receipt(num).unwrap();
            if num_receipts == 0 {
                assert!(receipt.is_none());
            } else {
                assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
            }
        }
    }
}
4179}