1use crate::{
2 changesets_utils::{
3 storage_trie_wiped_changeset_iter, StorageRevertsIter, StorageTrieCurrentValuesIter,
4 },
5 providers::{
6 database::{chain::ChainStorage, metrics},
7 rocksdb::RocksDBProvider,
8 static_file::StaticFileWriter,
9 NodeTypesForProvider, StaticFileProvider,
10 },
11 to_range,
12 traits::{
13 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
14 },
15 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
16 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
17 DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
18 HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
19 LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
20 PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
21 RocksDBProviderFactory, RocksTxRefArg, StageCheckpointReader, StateProviderBox, StateWriter,
22 StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
23 TransactionsProvider, TransactionsProviderExt, TrieReader, TrieWriter,
24};
25use alloy_consensus::{
26 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
27 BlockHeader, TxReceipt,
28};
29use alloy_eips::BlockHashOrNumber;
30use alloy_primitives::{
31 keccak256,
32 map::{hash_map, B256Map, HashMap, HashSet},
33 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
34};
35use itertools::Itertools;
36use parking_lot::RwLock;
37use rayon::slice::ParallelSliceMut;
38use reth_chain_state::ExecutedBlock;
39use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
40use reth_db_api::{
41 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
42 database::Database,
43 models::{
44 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
45 BlockNumberHashedAddress, ShardedKey, StorageSettings, StoredBlockBodyIndices,
46 },
47 table::Table,
48 tables,
49 transaction::{DbTx, DbTxMut},
50 BlockNumberList, PlainAccountState, PlainStorageState,
51};
52use reth_execution_types::{Chain, ExecutionOutcome};
53use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
54use reth_primitives_traits::{
55 Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58 PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63 BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64 NodePrimitivesProvider, StateProvider, StorageChangeSetReader, StorageSettingsCache,
65 TryIntoHistoricalStateProvider,
66};
67use reth_storage_errors::provider::ProviderResult;
68use reth_trie::{
69 trie_cursor::{
70 InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory,
71 TrieCursorIter,
72 },
73 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
74 HashedPostStateSorted, StoredNibbles, StoredNibblesSubKey, TrieChangeSetsEntry,
75};
76use reth_trie_db::{
77 DatabaseAccountTrieCursor, DatabaseStorageTrieCursor, DatabaseTrieCursorFactory,
78};
79use revm_database::states::{
80 PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
81};
82use std::{
83 cmp::Ordering,
84 collections::{BTreeMap, BTreeSet},
85 fmt::Debug,
86 ops::{Deref, DerefMut, Range, RangeBounds, RangeFrom, RangeInclusive},
87 sync::Arc,
88 time::{Duration, Instant},
89};
90use tracing::{debug, trace};
91
92pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
94
95#[derive(Debug)]
100pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
101 pub DatabaseProvider<<DB as Database>::TXMut, N>,
102);
103
104impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
105 type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
106
107 fn deref(&self) -> &Self::Target {
108 &self.0
109 }
110}
111
112impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
113 fn deref_mut(&mut self) -> &mut Self::Target {
114 &mut self.0
115 }
116}
117
118impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
119 for DatabaseProviderRW<DB, N>
120{
121 fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
122 &self.0
123 }
124}
125
126impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
127 pub fn commit(self) -> ProviderResult<bool> {
129 self.0.commit()
130 }
131
132 pub fn into_tx(self) -> <DB as Database>::TXMut {
134 self.0.into_tx()
135 }
136
137 #[cfg(any(test, feature = "test-utils"))]
139 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
140 self.0.minimum_pruning_distance = distance;
141 self
142 }
143}
144
145impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
146 for DatabaseProvider<<DB as Database>::TXMut, N>
147{
148 fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
149 provider.0
150 }
151}
152
153pub struct DatabaseProvider<TX, N: NodeTypes> {
156 tx: TX,
158 chain_spec: Arc<N::ChainSpec>,
160 static_file_provider: StaticFileProvider<N::Primitives>,
162 prune_modes: PruneModes,
164 storage: Arc<N::Storage>,
166 storage_settings: Arc<RwLock<StorageSettings>>,
168 rocksdb_provider: RocksDBProvider,
170 #[cfg(all(unix, feature = "rocksdb"))]
172 pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
173 minimum_pruning_distance: u64,
175 metrics: metrics::DatabaseProviderMetrics,
177}
178
179impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 let mut s = f.debug_struct("DatabaseProvider");
182 s.field("tx", &self.tx)
183 .field("chain_spec", &self.chain_spec)
184 .field("static_file_provider", &self.static_file_provider)
185 .field("prune_modes", &self.prune_modes)
186 .field("storage", &self.storage)
187 .field("storage_settings", &self.storage_settings)
188 .field("rocksdb_provider", &self.rocksdb_provider);
189 #[cfg(all(unix, feature = "rocksdb"))]
190 s.field("pending_rocksdb_batches", &"<pending batches>");
191 s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
192 }
193}
194
195impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
196 pub const fn prune_modes_ref(&self) -> &PruneModes {
198 &self.prune_modes
199 }
200}
201
202impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
203 pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
205 trace!(target: "providers::db", "Returning latest state provider");
206 Box::new(LatestStateProviderRef::new(self))
207 }
208
209 pub fn history_by_block_hash<'a>(
211 &'a self,
212 block_hash: BlockHash,
213 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
214 let mut block_number =
215 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
216 if block_number == self.best_block_number().unwrap_or_default() &&
217 block_number == self.last_block_number().unwrap_or_default()
218 {
219 return Ok(Box::new(LatestStateProviderRef::new(self)))
220 }
221
222 block_number += 1;
224
225 let account_history_prune_checkpoint =
226 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
227 let storage_history_prune_checkpoint =
228 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
229
230 let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
231
232 if let Some(prune_checkpoint_block_number) =
235 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
236 {
237 state_provider = state_provider.with_lowest_available_account_history_block_number(
238 prune_checkpoint_block_number + 1,
239 );
240 }
241 if let Some(prune_checkpoint_block_number) =
242 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
243 {
244 state_provider = state_provider.with_lowest_available_storage_history_block_number(
245 prune_checkpoint_block_number + 1,
246 );
247 }
248
249 Ok(Box::new(state_provider))
250 }
251
252 #[cfg(feature = "test-utils")]
253 pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
255 self.prune_modes = prune_modes;
256 }
257}
258
259impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
260 type Primitives = N::Primitives;
261}
262
263impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
264 fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
266 self.static_file_provider.clone()
267 }
268
269 fn get_static_file_writer(
270 &self,
271 block: BlockNumber,
272 segment: StaticFileSegment,
273 ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
274 self.static_file_provider.get_writer(block, segment)
275 }
276}
277
278impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
279 fn rocksdb_provider(&self) -> RocksDBProvider {
281 self.rocksdb_provider.clone()
282 }
283
284 #[cfg(all(unix, feature = "rocksdb"))]
285 fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
286 self.pending_rocksdb_batches.lock().push(batch);
287 }
288}
289
290impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
291 for DatabaseProvider<TX, N>
292{
293 type ChainSpec = N::ChainSpec;
294
295 fn chain_spec(&self) -> Arc<Self::ChainSpec> {
296 self.chain_spec.clone()
297 }
298}
299
300impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
301 pub fn new_rw(
303 tx: TX,
304 chain_spec: Arc<N::ChainSpec>,
305 static_file_provider: StaticFileProvider<N::Primitives>,
306 prune_modes: PruneModes,
307 storage: Arc<N::Storage>,
308 storage_settings: Arc<RwLock<StorageSettings>>,
309 rocksdb_provider: RocksDBProvider,
310 ) -> Self {
311 Self {
312 tx,
313 chain_spec,
314 static_file_provider,
315 prune_modes,
316 storage,
317 storage_settings,
318 rocksdb_provider,
319 #[cfg(all(unix, feature = "rocksdb"))]
320 pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
321 minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
322 metrics: metrics::DatabaseProviderMetrics::default(),
323 }
324 }
325}
326
327impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
328 fn as_ref(&self) -> &Self {
329 self
330 }
331}
332
333impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
334 pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
338 where
339 F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
340 {
341 #[cfg(all(unix, feature = "rocksdb"))]
342 let rocksdb = self.rocksdb_provider();
343 #[cfg(all(unix, feature = "rocksdb"))]
344 let rocksdb_batch = rocksdb.batch();
345 #[cfg(not(all(unix, feature = "rocksdb")))]
346 let rocksdb_batch = ();
347
348 let (result, raw_batch) = f(rocksdb_batch)?;
349
350 #[cfg(all(unix, feature = "rocksdb"))]
351 if let Some(batch) = raw_batch {
352 self.set_pending_rocksdb_batch(batch);
353 }
354 let _ = raw_batch; Ok(result)
357 }
358
359 pub fn save_blocks(&self, blocks: Vec<ExecutedBlock<N::Primitives>>) -> ProviderResult<()> {
361 if blocks.is_empty() {
362 debug!(target: "providers::db", "Attempted to write empty block range");
363 return Ok(())
364 }
365
366 let first_block = blocks.first().unwrap().recovered_block();
368
369 let last_block = blocks.last().unwrap().recovered_block();
370 let first_number = first_block.number();
371 let last_block_number = last_block.number();
372
373 debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage");
374
375 let mut total_insert_block = Duration::ZERO;
377 let mut total_write_state = Duration::ZERO;
378 let mut total_write_hashed_state = Duration::ZERO;
379 let mut total_write_trie_changesets = Duration::ZERO;
380 let mut total_write_trie_updates = Duration::ZERO;
381
382 for block in blocks {
392 let trie_data = block.trie_data();
393 let ExecutedBlock { recovered_block, execution_output, .. } = block;
394 let block_number = recovered_block.number();
395
396 let start = Instant::now();
397 self.insert_block(&recovered_block)?;
398 total_insert_block += start.elapsed();
399
400 let start = Instant::now();
403 self.write_state(&execution_output, OriginalValuesKnown::No)?;
404 total_write_state += start.elapsed();
405
406 let start = Instant::now();
408 self.write_hashed_state(&trie_data.hashed_state)?;
409 total_write_hashed_state += start.elapsed();
410
411 let start = Instant::now();
412 self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?;
413 total_write_trie_changesets += start.elapsed();
414
415 let start = Instant::now();
416 self.write_trie_updates_sorted(&trie_data.trie_updates)?;
417 total_write_trie_updates += start.elapsed();
418 }
419
420 let start = Instant::now();
422 self.update_history_indices(first_number..=last_block_number)?;
423 let duration_update_history_indices = start.elapsed();
424
425 let start = Instant::now();
427 self.update_pipeline_stages(last_block_number, false)?;
428 let duration_update_pipeline_stages = start.elapsed();
429
430 self.metrics.record_duration(metrics::Action::SaveBlocksInsertBlock, total_insert_block);
432 self.metrics.record_duration(metrics::Action::SaveBlocksWriteState, total_write_state);
433 self.metrics
434 .record_duration(metrics::Action::SaveBlocksWriteHashedState, total_write_hashed_state);
435 self.metrics.record_duration(
436 metrics::Action::SaveBlocksWriteTrieChangesets,
437 total_write_trie_changesets,
438 );
439 self.metrics
440 .record_duration(metrics::Action::SaveBlocksWriteTrieUpdates, total_write_trie_updates);
441 self.metrics.record_duration(
442 metrics::Action::SaveBlocksUpdateHistoryIndices,
443 duration_update_history_indices,
444 );
445 self.metrics.record_duration(
446 metrics::Action::SaveBlocksUpdatePipelineStages,
447 duration_update_pipeline_stages,
448 );
449
450 debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
451
452 Ok(())
453 }
454
455 pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
460 let changed_accounts = self
461 .tx
462 .cursor_read::<tables::AccountChangeSets>()?
463 .walk_range(from..)?
464 .collect::<Result<Vec<_>, _>>()?;
465
466 self.unwind_account_hashing(changed_accounts.iter())?;
468
469 self.unwind_account_history_indices(changed_accounts.iter())?;
471
472 let storage_start = BlockNumberAddress((from, Address::ZERO));
473 let changed_storages = self
474 .tx
475 .cursor_read::<tables::StorageChangeSets>()?
476 .walk_range(storage_start..)?
477 .collect::<Result<Vec<_>, _>>()?;
478
479 self.unwind_storage_hashing(changed_storages.iter().copied())?;
481
482 self.unwind_storage_history_indices(changed_storages.iter().copied())?;
484
485 let trie_revert = self.trie_reverts(from)?;
487 self.write_trie_updates_sorted(&trie_revert)?;
488
489 self.clear_trie_changesets_from(from)?;
491
492 Ok(())
493 }
494
495 fn remove_receipts_from(
497 &self,
498 from_tx: TxNumber,
499 last_block: BlockNumber,
500 ) -> ProviderResult<()> {
501 self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
503
504 if EitherWriter::receipts_destination(self).is_static_file() {
505 let static_file_receipt_num =
506 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
507
508 let to_delete = static_file_receipt_num
509 .map(|static_num| (static_num + 1).saturating_sub(from_tx))
510 .unwrap_or_default();
511
512 self.static_file_provider
513 .latest_writer(StaticFileSegment::Receipts)?
514 .prune_receipts(to_delete, last_block)?;
515 }
516
517 Ok(())
518 }
519}
520
521impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
522 fn try_into_history_at_block(
523 self,
524 mut block_number: BlockNumber,
525 ) -> ProviderResult<StateProviderBox> {
526 if block_number == self.best_block_number().unwrap_or_default() {
529 return Ok(Box::new(LatestStateProvider::new(self)))
530 }
531
532 block_number += 1;
534
535 let account_history_prune_checkpoint =
536 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
537 let storage_history_prune_checkpoint =
538 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
539
540 let mut state_provider = HistoricalStateProvider::new(self, block_number);
541
542 if let Some(prune_checkpoint_block_number) =
545 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
546 {
547 state_provider = state_provider.with_lowest_available_account_history_block_number(
548 prune_checkpoint_block_number + 1,
549 );
550 }
551 if let Some(prune_checkpoint_block_number) =
552 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
553 {
554 state_provider = state_provider.with_lowest_available_storage_history_block_number(
555 prune_checkpoint_block_number + 1,
556 );
557 }
558
559 Ok(Box::new(state_provider))
560 }
561}
562
563fn unwind_history_shards<S, T, C>(
578 cursor: &mut C,
579 start_key: T::Key,
580 block_number: BlockNumber,
581 mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
582) -> ProviderResult<Vec<u64>>
583where
584 T: Table<Value = BlockNumberList>,
585 T::Key: AsRef<ShardedKey<S>>,
586 C: DbCursorRO<T> + DbCursorRW<T>,
587{
588 let mut item = cursor.seek_exact(start_key)?;
590 while let Some((sharded_key, list)) = item {
591 if !shard_belongs_to_key(&sharded_key) {
593 break
594 }
595
596 cursor.delete_current()?;
599
600 let first = list.iter().next().expect("List can't be empty");
603
604 if first >= block_number {
607 item = cursor.prev()?;
608 continue
609 }
610 else if block_number <= sharded_key.as_ref().highest_block_number {
613 return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
616 }
617 return Ok(list.iter().collect::<Vec<_>>())
620 }
621
622 Ok(Vec::new())
624}
625
626impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
627 pub fn new(
629 tx: TX,
630 chain_spec: Arc<N::ChainSpec>,
631 static_file_provider: StaticFileProvider<N::Primitives>,
632 prune_modes: PruneModes,
633 storage: Arc<N::Storage>,
634 storage_settings: Arc<RwLock<StorageSettings>>,
635 rocksdb_provider: RocksDBProvider,
636 ) -> Self {
637 Self {
638 tx,
639 chain_spec,
640 static_file_provider,
641 prune_modes,
642 storage,
643 storage_settings,
644 rocksdb_provider,
645 #[cfg(all(unix, feature = "rocksdb"))]
646 pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
647 minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
648 metrics: metrics::DatabaseProviderMetrics::default(),
649 }
650 }
651
652 pub fn into_tx(self) -> TX {
654 self.tx
655 }
656
657 pub const fn tx_mut(&mut self) -> &mut TX {
659 &mut self.tx
660 }
661
662 pub const fn tx_ref(&self) -> &TX {
664 &self.tx
665 }
666
667 pub fn chain_spec(&self) -> &N::ChainSpec {
669 &self.chain_spec
670 }
671
672 fn with_rocksdb_tx<F, R>(&self, f: F) -> ProviderResult<R>
676 where
677 F: FnOnce(RocksTxRefArg<'_>) -> ProviderResult<R>,
678 {
679 #[cfg(all(unix, feature = "rocksdb"))]
680 let rocksdb = self.rocksdb_provider();
681 #[cfg(all(unix, feature = "rocksdb"))]
682 let rocksdb_tx = rocksdb.tx();
683 #[cfg(all(unix, feature = "rocksdb"))]
684 let rocksdb_tx_ref = &rocksdb_tx;
685 #[cfg(not(all(unix, feature = "rocksdb")))]
686 let rocksdb_tx_ref = ();
687
688 f(rocksdb_tx_ref)
689 }
690}
691
692impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
693 fn recovered_block<H, HF, B, BF>(
694 &self,
695 id: BlockHashOrNumber,
696 _transaction_kind: TransactionVariant,
697 header_by_number: HF,
698 construct_block: BF,
699 ) -> ProviderResult<Option<B>>
700 where
701 H: AsRef<HeaderTy<N>>,
702 HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
703 BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
704 {
705 let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
706 let Some(header) = header_by_number(block_number)? else { return Ok(None) };
707
708 let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
715
716 let tx_range = body.tx_num_range();
717
718 let (transactions, senders) = if tx_range.is_empty() {
719 (vec![], vec![])
720 } else {
721 (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
722 };
723
724 let body = self
725 .storage
726 .reader()
727 .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
728 .pop()
729 .ok_or(ProviderError::InvalidStorageOutput)?;
730
731 construct_block(header, body, senders)
732 }
733
734 fn block_range<F, H, HF, R>(
744 &self,
745 range: RangeInclusive<BlockNumber>,
746 headers_range: HF,
747 mut assemble_block: F,
748 ) -> ProviderResult<Vec<R>>
749 where
750 H: AsRef<HeaderTy<N>>,
751 HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
752 F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
753 {
754 if range.is_empty() {
755 return Ok(Vec::new())
756 }
757
758 let len = range.end().saturating_sub(*range.start()) as usize;
759 let mut blocks = Vec::with_capacity(len);
760
761 let headers = headers_range(range.clone())?;
762
763 let present_headers = self
769 .block_body_indices_range(range)?
770 .into_iter()
771 .map(|b| b.tx_num_range())
772 .zip(headers)
773 .collect::<Vec<_>>();
774
775 let mut inputs = Vec::with_capacity(present_headers.len());
776 for (tx_range, header) in &present_headers {
777 let transactions = if tx_range.is_empty() {
778 Vec::new()
779 } else {
780 self.transactions_by_tx_range(tx_range.clone())?
781 };
782
783 inputs.push((header.as_ref(), transactions));
784 }
785
786 let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
787
788 for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
789 blocks.push(assemble_block(header, body, tx_range)?);
790 }
791
792 Ok(blocks)
793 }
794
795 fn block_with_senders_range<H, HF, B, BF>(
806 &self,
807 range: RangeInclusive<BlockNumber>,
808 headers_range: HF,
809 assemble_block: BF,
810 ) -> ProviderResult<Vec<B>>
811 where
812 H: AsRef<HeaderTy<N>>,
813 HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
814 BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
815 {
816 self.block_range(range, headers_range, |header, body, tx_range| {
817 let senders = if tx_range.is_empty() {
818 Vec::new()
819 } else {
820 let known_senders: HashMap<TxNumber, Address> =
821 EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
822
823 let mut senders = Vec::with_capacity(body.transactions().len());
824 for (tx_num, tx) in tx_range.zip(body.transactions()) {
825 match known_senders.get(&tx_num) {
826 None => {
827 let sender = tx.recover_signer_unchecked()?;
829 senders.push(sender);
830 }
831 Some(sender) => senders.push(*sender),
832 }
833 }
834
835 senders
836 };
837
838 assemble_block(header, body, senders)
839 })
840 }
841
842 fn populate_bundle_state<A, S>(
846 &self,
847 account_changeset: Vec<(u64, AccountBeforeTx)>,
848 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
849 plain_accounts_cursor: &mut A,
850 plain_storage_cursor: &mut S,
851 ) -> ProviderResult<(BundleStateInit, RevertsInit)>
852 where
853 A: DbCursorRO<PlainAccountState>,
854 S: DbDupCursorRO<PlainStorageState>,
855 {
856 let mut state: BundleStateInit = HashMap::default();
860
861 let mut reverts: RevertsInit = HashMap::default();
867
868 for (block_number, account_before) in account_changeset.into_iter().rev() {
870 let AccountBeforeTx { info: old_info, address } = account_before;
871 match state.entry(address) {
872 hash_map::Entry::Vacant(entry) => {
873 let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
874 entry.insert((old_info, new_info, HashMap::default()));
875 }
876 hash_map::Entry::Occupied(mut entry) => {
877 entry.get_mut().0 = old_info;
879 }
880 }
881 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
883 }
884
885 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
887 let BlockNumberAddress((block_number, address)) = block_and_address;
888 let account_state = match state.entry(address) {
890 hash_map::Entry::Vacant(entry) => {
891 let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
892 entry.insert((present_info, present_info, HashMap::default()))
893 }
894 hash_map::Entry::Occupied(entry) => entry.into_mut(),
895 };
896
897 match account_state.2.entry(old_storage.key) {
899 hash_map::Entry::Vacant(entry) => {
900 let new_storage = plain_storage_cursor
901 .seek_by_key_subkey(address, old_storage.key)?
902 .filter(|storage| storage.key == old_storage.key)
903 .unwrap_or_default();
904 entry.insert((old_storage.value, new_storage.value));
905 }
906 hash_map::Entry::Occupied(mut entry) => {
907 entry.get_mut().0 = old_storage.value;
908 }
909 };
910
911 reverts
912 .entry(block_number)
913 .or_default()
914 .entry(address)
915 .or_default()
916 .1
917 .push(old_storage);
918 }
919
920 Ok((state, reverts))
921 }
922}
923
924impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
925 fn append_history_index<P, T>(
933 &self,
934 index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
935 mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
936 ) -> ProviderResult<()>
937 where
938 P: Copy,
939 T: Table<Value = BlockNumberList>,
940 {
941 assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
944
945 let mut cursor = self.tx.cursor_write::<T>()?;
946
947 for (partial_key, indices) in index_updates {
948 let last_key = sharded_key_factory(partial_key, u64::MAX);
949 let mut last_shard = cursor
950 .seek_exact(last_key.clone())?
951 .map(|(_, list)| list)
952 .unwrap_or_else(BlockNumberList::empty);
953
954 last_shard.append(indices).map_err(ProviderError::other)?;
955
956 if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
958 cursor.upsert(last_key, &last_shard)?;
959 continue;
960 }
961
962 let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
964 let mut chunks_peekable = chunks.into_iter().peekable();
965
966 while let Some(chunk) = chunks_peekable.next() {
967 let shard = BlockNumberList::new_pre_sorted(chunk);
968 let highest_block_number = if chunks_peekable.peek().is_some() {
969 shard.iter().next_back().expect("`chunks` does not return empty list")
970 } else {
971 u64::MAX
973 };
974
975 cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
976 }
977 }
978
979 Ok(())
980 }
981}
982
983impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
984 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
985 Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
986 }
987}
988
989impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
990 fn changed_accounts_with_range(
991 &self,
992 range: RangeInclusive<BlockNumber>,
993 ) -> ProviderResult<BTreeSet<Address>> {
994 let mut reader = EitherReader::new_account_changesets(self)?;
995
996 reader.changed_accounts_with_range(range)
997 }
998
999 fn basic_accounts(
1000 &self,
1001 iter: impl IntoIterator<Item = Address>,
1002 ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1003 let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1004 Ok(iter
1005 .into_iter()
1006 .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
1007 .collect::<Result<Vec<_>, _>>()?)
1008 }
1009
1010 fn changed_accounts_and_blocks_with_range(
1011 &self,
1012 range: RangeInclusive<BlockNumber>,
1013 ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1014 let highest_static_block = self
1015 .static_file_provider
1016 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1017
1018 if let Some(highest) = highest_static_block &&
1019 self.cached_storage_settings().account_changesets_in_static_files
1020 {
1021 let start = *range.start();
1022 let static_end = (*range.end()).min(highest + 1);
1023
1024 let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1025 if start <= static_end {
1026 for block in start..=static_end {
1027 let block_changesets = self.account_block_changeset(block)?;
1028 for changeset in block_changesets {
1029 changed_accounts_and_blocks
1030 .entry(changeset.address)
1031 .or_default()
1032 .push(block);
1033 }
1034 }
1035 }
1036
1037 Ok(changed_accounts_and_blocks)
1038 } else {
1039 let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1040
1041 let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1042 BTreeMap::new(),
1043 |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1044 let (index, account) = entry?;
1045 accounts.entry(account.address).or_default().push(index);
1046 Ok(accounts)
1047 },
1048 )?;
1049
1050 Ok(account_transitions)
1051 }
1052 }
1053}
1054
1055impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1056 fn storage_changeset(
1057 &self,
1058 block_number: BlockNumber,
1059 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1060 let range = block_number..=block_number;
1061 let storage_range = BlockNumberAddress::range(range);
1062 self.tx
1063 .cursor_dup_read::<tables::StorageChangeSets>()?
1064 .walk_range(storage_range)?
1065 .map(|result| -> ProviderResult<_> { Ok(result?) })
1066 .collect()
1067 }
1068}
1069
1070impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1071 fn account_block_changeset(
1072 &self,
1073 block_number: BlockNumber,
1074 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1075 if self.cached_storage_settings().account_changesets_in_static_files {
1076 let static_changesets =
1077 self.static_file_provider.account_block_changeset(block_number)?;
1078 Ok(static_changesets)
1079 } else {
1080 let range = block_number..=block_number;
1081 self.tx
1082 .cursor_read::<tables::AccountChangeSets>()?
1083 .walk_range(range)?
1084 .map(|result| -> ProviderResult<_> {
1085 let (_, account_before) = result?;
1086 Ok(account_before)
1087 })
1088 .collect()
1089 }
1090 }
1091
1092 fn get_account_before_block(
1093 &self,
1094 block_number: BlockNumber,
1095 address: Address,
1096 ) -> ProviderResult<Option<AccountBeforeTx>> {
1097 if self.cached_storage_settings().account_changesets_in_static_files {
1098 Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1099 } else {
1100 self.tx
1101 .cursor_dup_read::<tables::AccountChangeSets>()?
1102 .seek_by_key_subkey(block_number, address)?
1103 .filter(|acc| acc.address == address)
1104 .map(Ok)
1105 .transpose()
1106 }
1107 }
1108
1109 fn account_changesets_range(
1110 &self,
1111 range: impl core::ops::RangeBounds<BlockNumber>,
1112 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1113 let range = to_range(range);
1114 let mut changesets = Vec::new();
1115 if self.cached_storage_settings().account_changesets_in_static_files &&
1116 let Some(highest) = self
1117 .static_file_provider
1118 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1119 {
1120 let static_end = range.end.min(highest + 1);
1121 if range.start < static_end {
1122 for block in range.start..static_end {
1123 let block_changesets = self.account_block_changeset(block)?;
1124 for changeset in block_changesets {
1125 changesets.push((block, changeset));
1126 }
1127 }
1128 }
1129 } else {
1130 let mut cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1132 for entry in cursor.walk_range(range)? {
1133 let (block_num, account_before) = entry?;
1134 changesets.push((block_num, account_before));
1135 }
1136 }
1137
1138 Ok(changesets)
1139 }
1140
1141 fn account_changeset_count(&self) -> ProviderResult<usize> {
1142 if self.cached_storage_settings().account_changesets_in_static_files {
1145 self.static_file_provider.account_changeset_count()
1146 } else {
1147 Ok(self.tx.entries::<tables::AccountChangeSets>()?)
1148 }
1149 }
1150}
1151
1152impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
1153 for DatabaseProvider<TX, N>
1154{
1155 type Header = HeaderTy<N>;
1156
1157 fn local_tip_header(
1158 &self,
1159 highest_uninterrupted_block: BlockNumber,
1160 ) -> ProviderResult<SealedHeader<Self::Header>> {
1161 let static_file_provider = self.static_file_provider();
1162
1163 let next_static_file_block_num = static_file_provider
1166 .get_highest_static_file_block(StaticFileSegment::Headers)
1167 .map(|id| id + 1)
1168 .unwrap_or_default();
1169 let next_block = highest_uninterrupted_block + 1;
1170
1171 match next_static_file_block_num.cmp(&next_block) {
1172 Ordering::Greater => {
1175 let mut static_file_producer =
1176 static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1177 static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1178 static_file_producer.commit()?
1181 }
1182 Ordering::Less => {
1183 return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1185 }
1186 Ordering::Equal => {}
1187 }
1188
1189 let local_head = static_file_provider
1190 .sealed_header(highest_uninterrupted_block)?
1191 .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1192
1193 Ok(local_head)
1194 }
1195}
1196
1197impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1198 type Header = HeaderTy<N>;
1199
1200 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1201 if let Some(num) = self.block_number(block_hash)? {
1202 Ok(self.header_by_number(num)?)
1203 } else {
1204 Ok(None)
1205 }
1206 }
1207
1208 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1209 self.static_file_provider.header_by_number(num)
1210 }
1211
1212 fn headers_range(
1213 &self,
1214 range: impl RangeBounds<BlockNumber>,
1215 ) -> ProviderResult<Vec<Self::Header>> {
1216 self.static_file_provider.headers_range(range)
1217 }
1218
1219 fn sealed_header(
1220 &self,
1221 number: BlockNumber,
1222 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1223 self.static_file_provider.sealed_header(number)
1224 }
1225
1226 fn sealed_headers_while(
1227 &self,
1228 range: impl RangeBounds<BlockNumber>,
1229 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1230 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1231 self.static_file_provider.sealed_headers_while(range, predicate)
1232 }
1233}
1234
1235impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1236 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1237 self.static_file_provider.block_hash(number)
1238 }
1239
1240 fn canonical_hashes_range(
1241 &self,
1242 start: BlockNumber,
1243 end: BlockNumber,
1244 ) -> ProviderResult<Vec<B256>> {
1245 self.static_file_provider.canonical_hashes_range(start, end)
1246 }
1247}
1248
1249impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1250 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1251 let best_number = self.best_block_number()?;
1252 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1253 Ok(ChainInfo { best_hash, best_number })
1254 }
1255
1256 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1257 Ok(self
1260 .get_stage_checkpoint(StageId::Finish)?
1261 .map(|checkpoint| checkpoint.block_number)
1262 .unwrap_or_default())
1263 }
1264
1265 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1266 self.static_file_provider.last_block_number()
1267 }
1268
1269 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1270 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1271 }
1272}
1273
1274impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1275 type Block = BlockTy<N>;
1276
1277 fn find_block_by_hash(
1278 &self,
1279 hash: B256,
1280 source: BlockSource,
1281 ) -> ProviderResult<Option<Self::Block>> {
1282 if source.is_canonical() {
1283 self.block(hash.into())
1284 } else {
1285 Ok(None)
1286 }
1287 }
1288
1289 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1295 if let Some(number) = self.convert_hash_or_number(id)? &&
1296 let Some(header) = self.header_by_number(number)?
1297 {
1298 let Some(transactions) = self.transactions_by_block(number.into())? else {
1303 return Ok(None)
1304 };
1305
1306 let body = self
1307 .storage
1308 .reader()
1309 .read_block_bodies(self, vec![(&header, transactions)])?
1310 .pop()
1311 .ok_or(ProviderError::InvalidStorageOutput)?;
1312
1313 return Ok(Some(Self::Block::new(header, body)))
1314 }
1315
1316 Ok(None)
1317 }
1318
1319 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1320 Ok(None)
1321 }
1322
1323 fn pending_block_and_receipts(
1324 &self,
1325 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1326 Ok(None)
1327 }
1328
1329 fn recovered_block(
1338 &self,
1339 id: BlockHashOrNumber,
1340 transaction_kind: TransactionVariant,
1341 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1342 self.recovered_block(
1343 id,
1344 transaction_kind,
1345 |block_number| self.header_by_number(block_number),
1346 |header, body, senders| {
1347 Self::Block::new(header, body)
1348 .try_into_recovered_unchecked(senders)
1352 .map(Some)
1353 .map_err(|_| ProviderError::SenderRecoveryError)
1354 },
1355 )
1356 }
1357
1358 fn sealed_block_with_senders(
1359 &self,
1360 id: BlockHashOrNumber,
1361 transaction_kind: TransactionVariant,
1362 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1363 self.recovered_block(
1364 id,
1365 transaction_kind,
1366 |block_number| self.sealed_header(block_number),
1367 |header, body, senders| {
1368 Self::Block::new_sealed(header, body)
1369 .try_with_senders_unchecked(senders)
1373 .map(Some)
1374 .map_err(|_| ProviderError::SenderRecoveryError)
1375 },
1376 )
1377 }
1378
1379 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1380 self.block_range(
1381 range,
1382 |range| self.headers_range(range),
1383 |header, body, _| Ok(Self::Block::new(header, body)),
1384 )
1385 }
1386
1387 fn block_with_senders_range(
1388 &self,
1389 range: RangeInclusive<BlockNumber>,
1390 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1391 self.block_with_senders_range(
1392 range,
1393 |range| self.headers_range(range),
1394 |header, body, senders| {
1395 Self::Block::new(header, body)
1396 .try_into_recovered_unchecked(senders)
1397 .map_err(|_| ProviderError::SenderRecoveryError)
1398 },
1399 )
1400 }
1401
1402 fn recovered_block_range(
1403 &self,
1404 range: RangeInclusive<BlockNumber>,
1405 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1406 self.block_with_senders_range(
1407 range,
1408 |range| self.sealed_headers_range(range),
1409 |header, body, senders| {
1410 Self::Block::new_sealed(header, body)
1411 .try_with_senders(senders)
1412 .map_err(|_| ProviderError::SenderRecoveryError)
1413 },
1414 )
1415 }
1416
1417 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1418 Ok(self
1419 .tx
1420 .cursor_read::<tables::TransactionBlocks>()?
1421 .seek(id)
1422 .map(|b| b.map(|(_, bn)| bn))?)
1423 }
1424}
1425
1426impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1427 for DatabaseProvider<TX, N>
1428{
1429 fn transaction_hashes_by_range(
1432 &self,
1433 tx_range: Range<TxNumber>,
1434 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1435 self.static_file_provider.transaction_hashes_by_range(tx_range)
1436 }
1437}
1438
1439impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1441 type Transaction = TxTy<N>;
1442
1443 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1444 self.with_rocksdb_tx(|tx_ref| {
1445 let mut reader = EitherReader::new_transaction_hash_numbers(self, tx_ref)?;
1446 reader.get_transaction_hash_number(tx_hash)
1447 })
1448 }
1449
1450 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1451 self.static_file_provider.transaction_by_id(id)
1452 }
1453
1454 fn transaction_by_id_unhashed(
1455 &self,
1456 id: TxNumber,
1457 ) -> ProviderResult<Option<Self::Transaction>> {
1458 self.static_file_provider.transaction_by_id_unhashed(id)
1459 }
1460
1461 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1462 if let Some(id) = self.transaction_id(hash)? {
1463 Ok(self.transaction_by_id_unhashed(id)?)
1464 } else {
1465 Ok(None)
1466 }
1467 }
1468
1469 fn transaction_by_hash_with_meta(
1470 &self,
1471 tx_hash: TxHash,
1472 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1473 if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1474 let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1475 let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1476 let Some(sealed_header) = self.sealed_header(block_number)?
1477 {
1478 let (header, block_hash) = sealed_header.split();
1479 if let Some(block_body) = self.block_body_indices(block_number)? {
1480 let index = transaction_id - block_body.first_tx_num();
1485
1486 let meta = TransactionMeta {
1487 tx_hash,
1488 index,
1489 block_hash,
1490 block_number,
1491 base_fee: header.base_fee_per_gas(),
1492 excess_blob_gas: header.excess_blob_gas(),
1493 timestamp: header.timestamp(),
1494 };
1495
1496 return Ok(Some((transaction, meta)))
1497 }
1498 }
1499
1500 Ok(None)
1501 }
1502
1503 fn transactions_by_block(
1504 &self,
1505 id: BlockHashOrNumber,
1506 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1507 if let Some(block_number) = self.convert_hash_or_number(id)? &&
1508 let Some(body) = self.block_body_indices(block_number)?
1509 {
1510 let tx_range = body.tx_num_range();
1511 return if tx_range.is_empty() {
1512 Ok(Some(Vec::new()))
1513 } else {
1514 self.transactions_by_tx_range(tx_range).map(Some)
1515 }
1516 }
1517 Ok(None)
1518 }
1519
1520 fn transactions_by_block_range(
1521 &self,
1522 range: impl RangeBounds<BlockNumber>,
1523 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1524 let range = to_range(range);
1525
1526 self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1527 .into_iter()
1528 .map(|body| {
1529 let tx_num_range = body.tx_num_range();
1530 if tx_num_range.is_empty() {
1531 Ok(Vec::new())
1532 } else {
1533 self.transactions_by_tx_range(tx_num_range)
1534 }
1535 })
1536 .collect()
1537 }
1538
1539 fn transactions_by_tx_range(
1540 &self,
1541 range: impl RangeBounds<TxNumber>,
1542 ) -> ProviderResult<Vec<Self::Transaction>> {
1543 self.static_file_provider.transactions_by_tx_range(range)
1544 }
1545
1546 fn senders_by_tx_range(
1547 &self,
1548 range: impl RangeBounds<TxNumber>,
1549 ) -> ProviderResult<Vec<Address>> {
1550 if EitherWriterDestination::senders(self).is_static_file() {
1551 self.static_file_provider.senders_by_tx_range(range)
1552 } else {
1553 self.cursor_read_collect::<tables::TransactionSenders>(range)
1554 }
1555 }
1556
1557 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1558 if EitherWriterDestination::senders(self).is_static_file() {
1559 self.static_file_provider.transaction_sender(id)
1560 } else {
1561 Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1562 }
1563 }
1564}
1565
1566impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1567 type Receipt = ReceiptTy<N>;
1568
1569 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1570 self.static_file_provider.get_with_static_file_or_database(
1571 StaticFileSegment::Receipts,
1572 id,
1573 |static_file| static_file.receipt(id),
1574 || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1575 )
1576 }
1577
1578 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1579 if let Some(id) = self.transaction_id(hash)? {
1580 self.receipt(id)
1581 } else {
1582 Ok(None)
1583 }
1584 }
1585
1586 fn receipts_by_block(
1587 &self,
1588 block: BlockHashOrNumber,
1589 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1590 if let Some(number) = self.convert_hash_or_number(block)? &&
1591 let Some(body) = self.block_body_indices(number)?
1592 {
1593 let tx_range = body.tx_num_range();
1594 return if tx_range.is_empty() {
1595 Ok(Some(Vec::new()))
1596 } else {
1597 self.receipts_by_tx_range(tx_range).map(Some)
1598 }
1599 }
1600 Ok(None)
1601 }
1602
1603 fn receipts_by_tx_range(
1604 &self,
1605 range: impl RangeBounds<TxNumber>,
1606 ) -> ProviderResult<Vec<Self::Receipt>> {
1607 self.static_file_provider.get_range_with_static_file_or_database(
1608 StaticFileSegment::Receipts,
1609 to_range(range),
1610 |static_file, range, _| static_file.receipts_by_tx_range(range),
1611 |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1612 |_| true,
1613 )
1614 }
1615
1616 fn receipts_by_block_range(
1617 &self,
1618 block_range: RangeInclusive<BlockNumber>,
1619 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1620 if block_range.is_empty() {
1621 return Ok(Vec::new());
1622 }
1623
1624 let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
1626 let mut block_body_indices = Vec::with_capacity(range_len);
1627 for block_num in block_range {
1628 if let Some(indices) = self.block_body_indices(block_num)? {
1629 block_body_indices.push(indices);
1630 } else {
1631 block_body_indices.push(StoredBlockBodyIndices::default());
1633 }
1634 }
1635
1636 if block_body_indices.is_empty() {
1637 return Ok(Vec::new());
1638 }
1639
1640 let non_empty_blocks: Vec<_> =
1642 block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
1643
1644 if non_empty_blocks.is_empty() {
1645 return Ok(vec![Vec::new(); block_body_indices.len()]);
1647 }
1648
1649 let first_tx = non_empty_blocks[0].first_tx_num();
1651 let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
1652
1653 let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
1655 let mut receipts_iter = all_receipts.into_iter();
1656
1657 let mut result = Vec::with_capacity(block_body_indices.len());
1659 for indices in &block_body_indices {
1660 if indices.tx_count == 0 {
1661 result.push(Vec::new());
1662 } else {
1663 let block_receipts =
1664 receipts_iter.by_ref().take(indices.tx_count as usize).collect();
1665 result.push(block_receipts);
1666 }
1667 }
1668
1669 Ok(result)
1670 }
1671}
1672
1673impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1674 for DatabaseProvider<TX, N>
1675{
1676 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1677 Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
1678 }
1679
1680 fn block_body_indices_range(
1681 &self,
1682 range: RangeInclusive<BlockNumber>,
1683 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1684 self.cursor_read_collect::<tables::BlockBodyIndices>(range)
1685 }
1686}
1687
1688impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1689 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1690 Ok(if let Some(encoded) = id.get_pre_encoded() {
1691 self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
1692 } else {
1693 self.tx.get::<tables::StageCheckpoints>(id.to_string())?
1694 })
1695 }
1696
1697 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1699 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1700 }
1701
1702 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1703 self.tx
1704 .cursor_read::<tables::StageCheckpoints>()?
1705 .walk(None)?
1706 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1707 .map_err(ProviderError::Database)
1708 }
1709}
1710
1711impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1712 fn save_stage_checkpoint(
1714 &self,
1715 id: StageId,
1716 checkpoint: StageCheckpoint,
1717 ) -> ProviderResult<()> {
1718 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1719 }
1720
1721 fn save_stage_checkpoint_progress(
1723 &self,
1724 id: StageId,
1725 checkpoint: Vec<u8>,
1726 ) -> ProviderResult<()> {
1727 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1728 }
1729
1730 fn update_pipeline_stages(
1731 &self,
1732 block_number: BlockNumber,
1733 drop_stage_checkpoint: bool,
1734 ) -> ProviderResult<()> {
1735 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1737 for stage_id in StageId::ALL {
1738 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1739 cursor.upsert(
1740 stage_id.to_string(),
1741 &StageCheckpoint {
1742 block_number,
1743 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1744 },
1745 )?;
1746 }
1747
1748 Ok(())
1749 }
1750}
1751
1752impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1753 fn plain_state_storages(
1754 &self,
1755 addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1756 ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1757 let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1758
1759 addresses_with_keys
1760 .into_iter()
1761 .map(|(address, storage)| {
1762 storage
1763 .into_iter()
1764 .map(|key| -> ProviderResult<_> {
1765 Ok(plain_storage
1766 .seek_by_key_subkey(address, key)?
1767 .filter(|v| v.key == key)
1768 .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
1769 })
1770 .collect::<ProviderResult<Vec<_>>>()
1771 .map(|storage| (address, storage))
1772 })
1773 .collect::<ProviderResult<Vec<(_, _)>>>()
1774 }
1775
1776 fn changed_storages_with_range(
1777 &self,
1778 range: RangeInclusive<BlockNumber>,
1779 ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1780 self.tx
1781 .cursor_read::<tables::StorageChangeSets>()?
1782 .walk_range(BlockNumberAddress::range(range))?
1783 .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1786 let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1787 accounts.entry(address).or_default().insert(storage_entry.key);
1788 Ok(accounts)
1789 })
1790 }
1791
1792 fn changed_storages_and_blocks_with_range(
1793 &self,
1794 range: RangeInclusive<BlockNumber>,
1795 ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1796 let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1797
1798 let storage_changeset_lists =
1799 changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1800 BTreeMap::new(),
1801 |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1802 let (index, storage) = entry?;
1803 storages
1804 .entry((index.address(), storage.key))
1805 .or_default()
1806 .push(index.block_number());
1807 Ok(storages)
1808 },
1809 )?;
1810
1811 Ok(storage_changeset_lists)
1812 }
1813}
1814
1815impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1816 for DatabaseProvider<TX, N>
1817{
1818 type Receipt = ReceiptTy<N>;
1819
1820 fn write_state(
1821 &self,
1822 execution_outcome: &ExecutionOutcome<Self::Receipt>,
1823 is_value_known: OriginalValuesKnown,
1824 ) -> ProviderResult<()> {
1825 let first_block = execution_outcome.first_block();
1826 let block_count = execution_outcome.len() as u64;
1827 let last_block = execution_outcome.last_block();
1828 let block_range = first_block..=last_block;
1829
1830 let tip = self.last_block_number()?.max(last_block);
1831
1832 let (plain_state, reverts) =
1833 execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1834
1835 self.write_state_reverts(reverts, first_block)?;
1836 self.write_state_changes(plain_state)?;
1837
1838 let block_indices: Vec<_> = self
1840 .block_body_indices_range(block_range)?
1841 .into_iter()
1842 .map(|b| b.first_tx_num)
1843 .collect();
1844
1845 if block_indices.len() < block_count as usize {
1847 let missing_blocks = block_count - block_indices.len() as u64;
1848 return Err(ProviderError::BlockBodyIndicesNotFound(
1849 last_block.saturating_sub(missing_blocks - 1),
1850 ));
1851 }
1852
1853 let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;
1854
1855 let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1856 let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1857
1858 let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
1866 self.static_file_provider()
1867 .get_highest_static_file_tx(StaticFileSegment::Receipts)
1868 .is_none()) &&
1869 PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);
1870
1871 let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1873 for (_, addresses) in contract_log_pruner.range(..first_block) {
1874 allowed_addresses.extend(addresses.iter().copied());
1875 }
1876
1877 for (idx, (receipts, first_tx_index)) in
1878 execution_outcome.receipts.iter().zip(block_indices).enumerate()
1879 {
1880 let block_number = first_block + idx as u64;
1881
1882 receipts_writer.increment_block(block_number)?;
1884
1885 if prunable_receipts &&
1887 self.prune_modes
1888 .receipts
1889 .is_some_and(|mode| mode.should_prune(block_number, tip))
1890 {
1891 continue
1892 }
1893
1894 if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1896 allowed_addresses.extend(new_addresses.iter().copied());
1897 }
1898
1899 for (idx, receipt) in receipts.iter().enumerate() {
1900 let receipt_idx = first_tx_index + idx as u64;
1901 if prunable_receipts &&
1904 has_contract_log_filter &&
1905 !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1906 {
1907 continue
1908 }
1909
1910 receipts_writer.append_receipt(receipt_idx, receipt)?;
1911 }
1912 }
1913
1914 Ok(())
1915 }
1916
1917 fn write_state_reverts(
1918 &self,
1919 reverts: PlainStateReverts,
1920 first_block: BlockNumber,
1921 ) -> ProviderResult<()> {
1922 tracing::trace!("Writing storage changes");
1924 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1925 let mut storage_changeset_cursor =
1926 self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1927 for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1928 let block_number = first_block + block_index as BlockNumber;
1929
1930 tracing::trace!(block_number, "Writing block change");
1931 storage_changes.par_sort_unstable_by_key(|a| a.address);
1933 for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1934 let storage_id = BlockNumberAddress((block_number, address));
1935
1936 let mut storage = storage_revert
1937 .into_iter()
1938 .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1939 .collect::<Vec<_>>();
1940 storage.par_sort_unstable_by_key(|a| a.0);
1942
1943 let mut wiped_storage = Vec::new();
1951 if wiped {
1952 tracing::trace!(?address, "Wiping storage");
1953 if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1954 wiped_storage.push((entry.key, entry.value));
1955 while let Some(entry) = storages_cursor.next_dup_val()? {
1956 wiped_storage.push((entry.key, entry.value))
1957 }
1958 }
1959 }
1960
1961 tracing::trace!(?address, ?storage, "Writing storage reverts");
1962 for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1963 storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1964 }
1965 }
1966 }
1967
1968 tracing::debug!(target: "sync::stages::merkle_changesets", ?first_block, "Writing account changes");
1970 for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1971 let block_number = first_block + block_index as BlockNumber;
1972 let changeset = account_block_reverts
1973 .into_iter()
1974 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1975 .collect::<Vec<_>>();
1976 let mut account_changesets_writer =
1977 EitherWriter::new_account_changesets(self, block_number)?;
1978
1979 account_changesets_writer.append_account_changeset(block_number, changeset)?;
1980 }
1981
1982 Ok(())
1983 }
1984
1985 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1986 changes.accounts.par_sort_by_key(|a| a.0);
1989 changes.storage.par_sort_by_key(|a| a.address);
1990 changes.contracts.par_sort_by_key(|a| a.0);
1991
1992 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1994 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1995 for (address, account) in changes.accounts {
1997 if let Some(account) = account {
1998 tracing::trace!(?address, "Updating plain state account");
1999 accounts_cursor.upsert(address, &account.into())?;
2000 } else if accounts_cursor.seek_exact(address)?.is_some() {
2001 tracing::trace!(?address, "Deleting plain state account");
2002 accounts_cursor.delete_current()?;
2003 }
2004 }
2005
2006 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
2008 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
2009 for (hash, bytecode) in changes.contracts {
2010 bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
2011 }
2012
2013 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
2015 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2016 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2017 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2019 storages_cursor.delete_current_duplicates()?;
2020 }
2021 let mut storage = storage
2023 .into_iter()
2024 .map(|(k, value)| StorageEntry { key: k.into(), value })
2025 .collect::<Vec<_>>();
2026 storage.par_sort_unstable_by_key(|a| a.key);
2028
2029 for entry in storage {
2030 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2031 if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? &&
2032 db_entry.key == entry.key
2033 {
2034 storages_cursor.delete_current()?;
2035 }
2036
2037 if !entry.value.is_zero() {
2038 storages_cursor.upsert(address, &entry)?;
2039 }
2040 }
2041 }
2042
2043 Ok(())
2044 }
2045
2046 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2047 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2049 for (hashed_address, account) in hashed_state.accounts() {
2050 if let Some(account) = account {
2051 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2052 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2053 hashed_accounts_cursor.delete_current()?;
2054 }
2055 }
2056
2057 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2059 let mut hashed_storage_cursor =
2060 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2061 for (hashed_address, storage) in sorted_storages {
2062 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2063 hashed_storage_cursor.delete_current_duplicates()?;
2064 }
2065
2066 for (hashed_slot, value) in storage.storage_slots_ref() {
2067 let entry = StorageEntry { key: *hashed_slot, value: *value };
2068
2069 if let Some(db_entry) =
2070 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
2071 db_entry.key == entry.key
2072 {
2073 hashed_storage_cursor.delete_current()?;
2074 }
2075
2076 if !entry.value.is_zero() {
2077 hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2078 }
2079 }
2080 }
2081
2082 Ok(())
2083 }
2084
2085 fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
2107 let range = block + 1..=self.last_block_number()?;
2108
2109 if range.is_empty() {
2110 return Ok(());
2111 }
2112
2113 let block_bodies = self.block_body_indices_range(range.clone())?;
2115
2116 let from_transaction_num =
2118 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2119
2120 let storage_range = BlockNumberAddress::range(range.clone());
2121
2122 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2123 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2124
2125 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2130 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2131
2132 let (state, _) = self.populate_bundle_state(
2133 account_changeset,
2134 storage_changeset,
2135 &mut plain_accounts_cursor,
2136 &mut plain_storage_cursor,
2137 )?;
2138
2139 for (address, (old_account, new_account, storage)) in &state {
2141 if old_account != new_account {
2143 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2144 if let Some(account) = old_account {
2145 plain_accounts_cursor.upsert(*address, account)?;
2146 } else if existing_entry.is_some() {
2147 plain_accounts_cursor.delete_current()?;
2148 }
2149 }
2150
2151 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2153 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2154 if plain_storage_cursor
2156 .seek_by_key_subkey(*address, *storage_key)?
2157 .filter(|s| s.key == *storage_key)
2158 .is_some()
2159 {
2160 plain_storage_cursor.delete_current()?
2161 }
2162
2163 if !old_storage_value.is_zero() {
2165 plain_storage_cursor.upsert(*address, &storage_entry)?;
2166 }
2167 }
2168 }
2169
2170 self.remove_receipts_from(from_transaction_num, block)?;
2171
2172 Ok(())
2173 }
2174
2175 fn take_state_above(
2197 &self,
2198 block: BlockNumber,
2199 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2200 let range = block + 1..=self.last_block_number()?;
2201
2202 if range.is_empty() {
2203 return Ok(ExecutionOutcome::default())
2204 }
2205 let start_block_number = *range.start();
2206
2207 let block_bodies = self.block_body_indices_range(range.clone())?;
2209
2210 let from_transaction_num =
2212 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2213 let to_transaction_num =
2214 block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2215
2216 let storage_range = BlockNumberAddress::range(range.clone());
2217
2218 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2219
2220 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2225 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2226
2227 let highest_changeset_block = self
2229 .static_file_provider
2230 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2231 let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2232 self.cached_storage_settings().account_changesets_in_static_files
2233 {
2234 let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2236 let mut changeset_writer =
2237 self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2238 changeset_writer.prune_account_changesets(block)?;
2239
2240 changesets
2241 } else {
2242 self.take::<tables::AccountChangeSets>(range)?
2245 };
2246
2247 let (state, reverts) = self.populate_bundle_state(
2250 account_changeset,
2251 storage_changeset,
2252 &mut plain_accounts_cursor,
2253 &mut plain_storage_cursor,
2254 )?;
2255
2256 for (address, (old_account, new_account, storage)) in &state {
2258 if old_account != new_account {
2260 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2261 if let Some(account) = old_account {
2262 plain_accounts_cursor.upsert(*address, account)?;
2263 } else if existing_entry.is_some() {
2264 plain_accounts_cursor.delete_current()?;
2265 }
2266 }
2267
2268 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2270 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2271 if plain_storage_cursor
2273 .seek_by_key_subkey(*address, *storage_key)?
2274 .filter(|s| s.key == *storage_key)
2275 .is_some()
2276 {
2277 plain_storage_cursor.delete_current()?
2278 }
2279
2280 if !old_storage_value.is_zero() {
2282 plain_storage_cursor.upsert(*address, &storage_entry)?;
2283 }
2284 }
2285 }
2286
2287 let mut receipts_iter = self
2289 .static_file_provider
2290 .get_range_with_static_file_or_database(
2291 StaticFileSegment::Receipts,
2292 from_transaction_num..to_transaction_num + 1,
2293 |static_file, range, _| {
2294 static_file
2295 .receipts_by_tx_range(range.clone())
2296 .map(|r| range.into_iter().zip(r).collect())
2297 },
2298 |range, _| {
2299 self.tx
2300 .cursor_read::<tables::Receipts<Self::Receipt>>()?
2301 .walk_range(range)?
2302 .map(|r| r.map_err(Into::into))
2303 .collect()
2304 },
2305 |_| true,
2306 )?
2307 .into_iter()
2308 .peekable();
2309
2310 let mut receipts = Vec::with_capacity(block_bodies.len());
2311 for block_body in block_bodies {
2313 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2314 for num in block_body.tx_num_range() {
2315 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2316 block_receipts.push(receipts_iter.next().unwrap().1);
2317 }
2318 }
2319 receipts.push(block_receipts);
2320 }
2321
2322 self.remove_receipts_from(from_transaction_num, block)?;
2323
2324 Ok(ExecutionOutcome::new_init(
2325 state,
2326 reverts,
2327 Vec::new(),
2328 receipts,
2329 start_block_number,
2330 Vec::new(),
2331 ))
2332 }
2333}
2334
2335impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2336 fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
2340 if trie_updates.is_empty() {
2341 return Ok(0)
2342 }
2343
2344 let mut num_entries = 0;
2346
2347 let tx = self.tx_ref();
2348 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2349
2350 for (key, updated_node) in trie_updates.account_nodes_ref() {
2352 let nibbles = StoredNibbles(*key);
2353 match updated_node {
2354 Some(node) => {
2355 if !nibbles.0.is_empty() {
2356 num_entries += 1;
2357 account_trie_cursor.upsert(nibbles, node)?;
2358 }
2359 }
2360 None => {
2361 num_entries += 1;
2362 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2363 account_trie_cursor.delete_current()?;
2364 }
2365 }
2366 }
2367 }
2368
2369 num_entries +=
2370 self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
2371
2372 Ok(num_entries)
2373 }
2374
2375 fn write_trie_changesets(
2383 &self,
2384 block_number: BlockNumber,
2385 trie_updates: &TrieUpdatesSorted,
2386 updates_overlay: Option<&TrieUpdatesSorted>,
2387 ) -> ProviderResult<usize> {
2388 let mut num_entries = 0;
2389
2390 let mut changeset_cursor =
2391 self.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2392 let curr_values_cursor = self.tx_ref().cursor_read::<tables::AccountsTrie>()?;
2393
2394 let mut db_account_cursor = DatabaseAccountTrieCursor::new(curr_values_cursor);
2396
2397 let empty_updates = TrieUpdatesSorted::default();
2399 let overlay = updates_overlay.unwrap_or(&empty_updates);
2400
2401 let mut in_memory_account_cursor =
2403 InMemoryTrieCursor::new_account(&mut db_account_cursor, overlay);
2404
2405 for (path, _) in trie_updates.account_nodes_ref() {
2406 num_entries += 1;
2407 let node = in_memory_account_cursor.seek_exact(*path)?.map(|(_, node)| node);
2408 changeset_cursor.append_dup(
2409 block_number,
2410 TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(*path), node },
2411 )?;
2412 }
2413
2414 let mut storage_updates = trie_updates.storage_tries_ref().iter().collect::<Vec<_>>();
2415 storage_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2416
2417 num_entries += self.write_storage_trie_changesets(
2418 block_number,
2419 storage_updates.into_iter(),
2420 updates_overlay,
2421 )?;
2422
2423 Ok(num_entries)
2424 }
2425
2426 fn clear_trie_changesets(&self) -> ProviderResult<()> {
2427 let tx = self.tx_ref();
2428 tx.clear::<tables::AccountsTrieChangeSets>()?;
2429 tx.clear::<tables::StoragesTrieChangeSets>()?;
2430 Ok(())
2431 }
2432
2433 fn clear_trie_changesets_from(&self, from: BlockNumber) -> ProviderResult<()> {
2434 let tx = self.tx_ref();
2435 {
2436 let range = from..;
2437 let mut cursor = tx.cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2438 let mut walker = cursor.walk_range(range)?;
2439
2440 while walker.next().transpose()?.is_some() {
2441 walker.delete_current()?;
2442 }
2443 }
2444
2445 {
2446 let range: RangeFrom<BlockNumberHashedAddress> = (from, B256::ZERO).into()..;
2447 let mut cursor = tx.cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
2448 let mut walker = cursor.walk_range(range)?;
2449
2450 while walker.next().transpose()?.is_some() {
2451 walker.delete_current()?;
2452 }
2453 }
2454
2455 Ok(())
2456 }
2457}
2458
2459impl<TX: DbTx + 'static, N: NodeTypes> TrieReader for DatabaseProvider<TX, N> {
2460 fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
2461 let tx = self.tx_ref();
2462
2463 let mut account_nodes = Vec::new();
2466 let mut seen_account_keys = HashSet::new();
2467 let mut accounts_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2468
2469 for entry in accounts_cursor.walk_range(from..)? {
2470 let (_, TrieChangeSetsEntry { nibbles, node }) = entry?;
2471 if seen_account_keys.insert(nibbles.0) {
2473 account_nodes.push((nibbles.0, node));
2474 }
2475 }
2476
2477 account_nodes.sort_by_key(|(path, _)| *path);
2478
2479 let mut storage_tries = B256Map::<Vec<_>>::default();
2483 let mut seen_storage_keys = HashSet::new();
2484 let mut storages_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2485
2486 let storage_range_start = BlockNumberHashedAddress((from, B256::ZERO));
2488
2489 for entry in storages_cursor.walk_range(storage_range_start..)? {
2490 let (
2491 BlockNumberHashedAddress((_, hashed_address)),
2492 TrieChangeSetsEntry { nibbles, node },
2493 ) = entry?;
2494
2495 if seen_storage_keys.insert((hashed_address, nibbles.0)) {
2497 storage_tries.entry(hashed_address).or_default().push((nibbles.0, node));
2498 }
2499 }
2500
2501 let storage_tries = storage_tries
2503 .into_iter()
2504 .map(|(address, mut nodes)| {
2505 nodes.sort_by_key(|(path, _)| *path);
2506 (address, StorageTrieUpdatesSorted { storage_nodes: nodes, is_deleted: false })
2507 })
2508 .collect();
2509
2510 Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2511 }
2512
2513 fn get_block_trie_updates(
2514 &self,
2515 block_number: BlockNumber,
2516 ) -> ProviderResult<TrieUpdatesSorted> {
2517 let tx = self.tx_ref();
2518
2519 let reverts = self.trie_reverts(block_number + 1)?;
2521
2522 let db_cursor_factory = DatabaseTrieCursorFactory::new(tx);
2525 let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);
2526
2527 let mut account_nodes = Vec::new();
2529
2530 let mut accounts_trie_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2532 let mut account_cursor = cursor_factory.account_trie_cursor()?;
2533
2534 for entry in accounts_trie_cursor.walk_dup(Some(block_number), None)? {
2535 let (_, TrieChangeSetsEntry { nibbles, .. }) = entry?;
2536 let node_value = account_cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2538 account_nodes.push((nibbles.0, node_value));
2539 }
2540
2541 let mut storage_tries = B256Map::default();
2543 let mut storages_trie_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2544 let storage_range_start = BlockNumberHashedAddress((block_number, B256::ZERO));
2545 let storage_range_end = BlockNumberHashedAddress((block_number + 1, B256::ZERO));
2546
2547 let mut current_hashed_address = None;
2548 let mut storage_cursor = None;
2549
2550 for entry in storages_trie_cursor.walk_range(storage_range_start..storage_range_end)? {
2551 let (
2552 BlockNumberHashedAddress((_, hashed_address)),
2553 TrieChangeSetsEntry { nibbles, .. },
2554 ) = entry?;
2555
2556 if current_hashed_address != Some(hashed_address) {
2558 storage_cursor = Some(cursor_factory.storage_trie_cursor(hashed_address)?);
2559 current_hashed_address = Some(hashed_address);
2560 }
2561
2562 let cursor =
2564 storage_cursor.as_mut().expect("storage_cursor was just initialized above");
2565 let node_value = cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2566 storage_tries
2567 .entry(hashed_address)
2568 .or_insert_with(|| StorageTrieUpdatesSorted {
2569 storage_nodes: Vec::new(),
2570 is_deleted: false,
2571 })
2572 .storage_nodes
2573 .push((nibbles.0, node_value));
2574 }
2575
2576 Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2577 }
2578}
2579
2580impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2581 fn write_storage_trie_updates_sorted<'a>(
2587 &self,
2588 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2589 ) -> ProviderResult<usize> {
2590 let mut num_entries = 0;
2591 let mut storage_tries = storage_tries.collect::<Vec<_>>();
2592 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2593 let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2594 for (hashed_address, storage_trie_updates) in storage_tries {
2595 let mut db_storage_trie_cursor =
2596 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2597 num_entries +=
2598 db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
2599 cursor = db_storage_trie_cursor.cursor;
2600 }
2601
2602 Ok(num_entries)
2603 }
2604
2605 fn write_storage_trie_changesets<'a>(
2613 &self,
2614 block_number: BlockNumber,
2615 storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2616 updates_overlay: Option<&TrieUpdatesSorted>,
2617 ) -> ProviderResult<usize> {
2618 let mut num_written = 0;
2619
2620 let mut changeset_cursor =
2621 self.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
2622
2623 let changed_curr_values_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
2627 let wiped_nodes_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
2628
2629 let mut changed_curr_values_cursor = DatabaseStorageTrieCursor::new(
2633 changed_curr_values_cursor,
2634 B256::default(), );
2636 let mut wiped_nodes_cursor = DatabaseStorageTrieCursor::new(
2637 wiped_nodes_cursor,
2638 B256::default(), );
2640
2641 let empty_updates = TrieUpdatesSorted::default();
2643
2644 for (hashed_address, storage_trie_updates) in storage_tries {
2645 let changeset_key = BlockNumberHashedAddress((block_number, *hashed_address));
2646
2647 changed_curr_values_cursor =
2649 DatabaseStorageTrieCursor::new(changed_curr_values_cursor.cursor, *hashed_address);
2650
2651 let overlay = updates_overlay.unwrap_or(&empty_updates);
2653
2654 let mut in_memory_changed_cursor = InMemoryTrieCursor::new_storage(
2656 &mut changed_curr_values_cursor,
2657 overlay,
2658 *hashed_address,
2659 );
2660
2661 let curr_values_of_changed = StorageTrieCurrentValuesIter::new(
2664 storage_trie_updates.storage_nodes.iter().map(|e| e.0),
2665 &mut in_memory_changed_cursor,
2666 )?;
2667
2668 if storage_trie_updates.is_deleted() {
2669 wiped_nodes_cursor =
2672 DatabaseStorageTrieCursor::new(wiped_nodes_cursor.cursor, *hashed_address);
2673
2674 let mut in_memory_wiped_cursor = InMemoryTrieCursor::new_storage(
2676 &mut wiped_nodes_cursor,
2677 overlay,
2678 *hashed_address,
2679 );
2680
2681 let all_nodes = TrieCursorIter::new(&mut in_memory_wiped_cursor);
2682
2683 for wiped in storage_trie_wiped_changeset_iter(curr_values_of_changed, all_nodes)? {
2684 let (path, node) = wiped?;
2685 num_written += 1;
2686 changeset_cursor.append_dup(
2687 changeset_key,
2688 TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
2689 )?;
2690 }
2691 } else {
2692 for curr_value in curr_values_of_changed {
2693 let (path, node) = curr_value?;
2694 num_written += 1;
2695 changeset_cursor.append_dup(
2696 changeset_key,
2697 TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
2698 )?;
2699 }
2700 }
2701 }
2702
2703 Ok(num_written)
2704 }
2705}
2706
2707impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2708 fn unwind_account_hashing<'a>(
2709 &self,
2710 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2711 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2712 let hashed_accounts = changesets
2716 .into_iter()
2717 .map(|(_, e)| (keccak256(e.address), e.info))
2718 .collect::<Vec<_>>()
2719 .into_iter()
2720 .rev()
2721 .collect::<BTreeMap<_, _>>();
2722
2723 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2725 for (hashed_address, account) in &hashed_accounts {
2726 if let Some(account) = account {
2727 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2728 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2729 hashed_accounts_cursor.delete_current()?;
2730 }
2731 }
2732
2733 Ok(hashed_accounts)
2734 }
2735
2736 fn unwind_account_hashing_range(
2737 &self,
2738 range: impl RangeBounds<BlockNumber>,
2739 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2740 let changesets = self
2741 .tx
2742 .cursor_read::<tables::AccountChangeSets>()?
2743 .walk_range(range)?
2744 .collect::<Result<Vec<_>, _>>()?;
2745 self.unwind_account_hashing(changesets.iter())
2746 }
2747
2748 fn insert_account_for_hashing(
2749 &self,
2750 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2751 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2752 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2753 let hashed_accounts =
2754 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2755 for (hashed_address, account) in &hashed_accounts {
2756 if let Some(account) = account {
2757 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2758 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2759 hashed_accounts_cursor.delete_current()?;
2760 }
2761 }
2762 Ok(hashed_accounts)
2763 }
2764
2765 fn unwind_storage_hashing(
2766 &self,
2767 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2768 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2769 let mut hashed_storages = changesets
2771 .into_iter()
2772 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2773 (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2774 })
2775 .collect::<Vec<_>>();
2776 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2777
2778 let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2780 HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2781 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2782 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2783 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2784
2785 if hashed_storage
2786 .seek_by_key_subkey(hashed_address, key)?
2787 .filter(|entry| entry.key == key)
2788 .is_some()
2789 {
2790 hashed_storage.delete_current()?;
2791 }
2792
2793 if !value.is_zero() {
2794 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2795 }
2796 }
2797 Ok(hashed_storage_keys)
2798 }
2799
2800 fn unwind_storage_hashing_range(
2801 &self,
2802 range: impl RangeBounds<BlockNumberAddress>,
2803 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2804 let changesets = self
2805 .tx
2806 .cursor_read::<tables::StorageChangeSets>()?
2807 .walk_range(range)?
2808 .collect::<Result<Vec<_>, _>>()?;
2809 self.unwind_storage_hashing(changesets.into_iter())
2810 }
2811
2812 fn insert_storage_for_hashing(
2813 &self,
2814 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2815 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2816 let hashed_storages =
2818 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2819 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2820 map.insert(keccak256(entry.key), entry.value);
2821 map
2822 });
2823 map.insert(keccak256(address), storage);
2824 map
2825 });
2826
2827 let hashed_storage_keys = hashed_storages
2828 .iter()
2829 .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2830 .collect();
2831
2832 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2833 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2836 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2837 if hashed_storage_cursor
2838 .seek_by_key_subkey(hashed_address, key)?
2839 .filter(|entry| entry.key == key)
2840 .is_some()
2841 {
2842 hashed_storage_cursor.delete_current()?;
2843 }
2844
2845 if !value.is_zero() {
2846 hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2847 }
2848 Ok(())
2849 })
2850 })?;
2851
2852 Ok(hashed_storage_keys)
2853 }
2854}
2855
2856impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2857 fn unwind_account_history_indices<'a>(
2858 &self,
2859 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2860 ) -> ProviderResult<usize> {
2861 let mut last_indices = changesets
2862 .into_iter()
2863 .map(|(index, account)| (account.address, *index))
2864 .collect::<Vec<_>>();
2865 last_indices.sort_by_key(|(a, _)| *a);
2866
2867 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2869 for &(address, rem_index) in &last_indices {
2870 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2871 &mut cursor,
2872 ShardedKey::last(address),
2873 rem_index,
2874 |sharded_key| sharded_key.key == address,
2875 )?;
2876
2877 if !partial_shard.is_empty() {
2880 cursor.insert(
2881 ShardedKey::last(address),
2882 &BlockNumberList::new_pre_sorted(partial_shard),
2883 )?;
2884 }
2885 }
2886
2887 let changesets = last_indices.len();
2888 Ok(changesets)
2889 }
2890
2891 fn unwind_account_history_indices_range(
2892 &self,
2893 range: impl RangeBounds<BlockNumber>,
2894 ) -> ProviderResult<usize> {
2895 let changesets = self
2896 .tx
2897 .cursor_read::<tables::AccountChangeSets>()?
2898 .walk_range(range)?
2899 .collect::<Result<Vec<_>, _>>()?;
2900 self.unwind_account_history_indices(changesets.iter())
2901 }
2902
2903 fn insert_account_history_index(
2904 &self,
2905 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2906 ) -> ProviderResult<()> {
2907 self.append_history_index::<_, tables::AccountsHistory>(
2908 account_transitions,
2909 ShardedKey::new,
2910 )
2911 }
2912
2913 fn unwind_storage_history_indices(
2914 &self,
2915 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2916 ) -> ProviderResult<usize> {
2917 let mut storage_changesets = changesets
2918 .into_iter()
2919 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2920 .collect::<Vec<_>>();
2921 storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2922
2923 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2924 for &(address, storage_key, rem_index) in &storage_changesets {
2925 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2926 &mut cursor,
2927 StorageShardedKey::last(address, storage_key),
2928 rem_index,
2929 |storage_sharded_key| {
2930 storage_sharded_key.address == address &&
2931 storage_sharded_key.sharded_key.key == storage_key
2932 },
2933 )?;
2934
2935 if !partial_shard.is_empty() {
2938 cursor.insert(
2939 StorageShardedKey::last(address, storage_key),
2940 &BlockNumberList::new_pre_sorted(partial_shard),
2941 )?;
2942 }
2943 }
2944
2945 let changesets = storage_changesets.len();
2946 Ok(changesets)
2947 }
2948
2949 fn unwind_storage_history_indices_range(
2950 &self,
2951 range: impl RangeBounds<BlockNumberAddress>,
2952 ) -> ProviderResult<usize> {
2953 let changesets = self
2954 .tx
2955 .cursor_read::<tables::StorageChangeSets>()?
2956 .walk_range(range)?
2957 .collect::<Result<Vec<_>, _>>()?;
2958 self.unwind_storage_history_indices(changesets.into_iter())
2959 }
2960
2961 fn insert_storage_history_index(
2962 &self,
2963 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2964 ) -> ProviderResult<()> {
2965 self.append_history_index::<_, tables::StoragesHistory>(
2966 storage_transitions,
2967 |(address, storage_key), highest_block_number| {
2968 StorageShardedKey::new(address, storage_key, highest_block_number)
2969 },
2970 )
2971 }
2972
2973 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2974 {
2976 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2977 self.insert_account_history_index(indices)?;
2978 }
2979
2980 {
2982 let indices = self.changed_storages_and_blocks_with_range(range)?;
2983 self.insert_storage_history_index(indices)?;
2984 }
2985
2986 Ok(())
2987 }
2988}
2989
2990impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2991 for DatabaseProvider<TX, N>
2992{
2993 fn take_block_and_execution_above(
2994 &self,
2995 block: BlockNumber,
2996 ) -> ProviderResult<Chain<Self::Primitives>> {
2997 let range = block + 1..=self.last_block_number()?;
2998
2999 self.unwind_trie_state_from(block + 1)?;
3000
3001 let execution_state = self.take_state_above(block)?;
3003
3004 let blocks = self.recovered_block_range(range)?;
3005
3006 self.remove_blocks_above(block)?;
3009
3010 self.update_pipeline_stages(block, true)?;
3012
3013 Ok(Chain::new(blocks, execution_state, BTreeMap::new(), BTreeMap::new()))
3014 }
3015
3016 fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
3017 self.unwind_trie_state_from(block + 1)?;
3018
3019 self.remove_state_above(block)?;
3021
3022 self.remove_blocks_above(block)?;
3025
3026 self.update_pipeline_stages(block, true)?;
3028
3029 Ok(())
3030 }
3031}
3032
3033impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
3034 for DatabaseProvider<TX, N>
3035{
3036 type Block = BlockTy<N>;
3037 type Receipt = ReceiptTy<N>;
3038
3039 fn insert_block(
3061 &self,
3062 block: &RecoveredBlock<Self::Block>,
3063 ) -> ProviderResult<StoredBlockBodyIndices> {
3064 let block_number = block.number();
3065 let tx_count = block.body().transaction_count() as u64;
3066
3067 let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3068
3069 self.static_file_provider
3070 .get_writer(block_number, StaticFileSegment::Headers)?
3071 .append_header(block.header(), &block.hash())?;
3072
3073 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
3074 durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
3075
3076 let first_tx_num = self
3077 .tx
3078 .cursor_read::<tables::TransactionBlocks>()?
3079 .last()?
3080 .map(|(n, _)| n + 1)
3081 .unwrap_or_default();
3082 durations_recorder.record_relative(metrics::Action::GetNextTxNum);
3083
3084 let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
3085
3086 if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
3087 let mut senders_writer = EitherWriter::new_senders(self, block.number())?;
3088 senders_writer.increment_block(block.number())?;
3089 senders_writer
3090 .append_senders(tx_nums_iter.clone().zip(block.senders_iter().copied()))?;
3091 durations_recorder.record_relative(metrics::Action::InsertTransactionSenders);
3092 }
3093
3094 if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
3095 self.with_rocksdb_batch(|batch| {
3096 let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
3097 for (tx_num, transaction) in tx_nums_iter.zip(block.body().transactions_iter()) {
3098 let hash = transaction.tx_hash();
3099 writer.put_transaction_hash_number(*hash, tx_num, false)?;
3100 }
3101 Ok(((), writer.into_raw_rocksdb_batch()))
3102 })?;
3103 durations_recorder.record_relative(metrics::Action::InsertTransactionHashNumbers);
3104 }
3105
3106 self.append_block_bodies(vec![(block_number, Some(block.body()))])?;
3107
3108 debug!(
3109 target: "providers::db",
3110 ?block_number,
3111 actions = ?durations_recorder.actions,
3112 "Inserted block"
3113 );
3114
3115 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
3116 }
3117
3118 fn append_block_bodies(
3119 &self,
3120 bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
3121 ) -> ProviderResult<()> {
3122 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
3123
3124 let mut tx_writer =
3126 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
3127
3128 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
3129 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
3130
3131 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
3133
3134 for (block_number, body) in &bodies {
3135 tx_writer.increment_block(*block_number)?;
3137
3138 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
3139 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
3140
3141 let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3142
3143 block_indices_cursor.append(*block_number, &block_indices)?;
3145
3146 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
3147
3148 let Some(body) = body else { continue };
3149
3150 if !body.transactions().is_empty() {
3152 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
3153 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
3154 }
3155
3156 for transaction in body.transactions() {
3158 tx_writer.append_transaction(next_tx_num, transaction)?;
3159
3160 next_tx_num += 1;
3162 }
3163 }
3164
3165 self.storage.writer().write_block_bodies(self, bodies)?;
3166
3167 Ok(())
3168 }
3169
3170 fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
3171 let last_block_number = self.last_block_number()?;
3172 for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
3174 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
3175 }
3176
3177 let highest_static_file_block = self
3179 .static_file_provider()
3180 .get_highest_static_file_block(StaticFileSegment::Headers)
3181 .expect("todo: error handling, headers should exist");
3182
3183 debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
3189 self.static_file_provider()
3190 .get_writer(block, StaticFileSegment::Headers)?
3191 .prune_headers(highest_static_file_block.saturating_sub(block))?;
3192
3193 let unwind_tx_from = self
3195 .block_body_indices(block)?
3196 .map(|b| b.next_tx_num())
3197 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3198
3199 let unwind_tx_to = self
3201 .tx
3202 .cursor_read::<tables::BlockBodyIndices>()?
3203 .last()?
3204 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3206 .1
3207 .last_tx_num();
3208
3209 if unwind_tx_from <= unwind_tx_to {
3210 let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
3211 self.with_rocksdb_batch(|batch| {
3212 let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
3213 for (hash, _) in hashes {
3214 writer.delete_transaction_hash_number(hash)?;
3215 }
3216 Ok(((), writer.into_raw_rocksdb_batch()))
3217 })?;
3218 }
3219
3220 EitherWriter::new_senders(self, last_block_number)?.prune_senders(unwind_tx_from, block)?;
3221
3222 self.remove_bodies_above(block)?;
3223
3224 Ok(())
3225 }
3226
3227 fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3228 self.storage.writer().remove_block_bodies_above(self, block)?;
3229
3230 let unwind_tx_from = self
3232 .block_body_indices(block)?
3233 .map(|b| b.next_tx_num())
3234 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3235
3236 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3237 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3238
3239 let static_file_tx_num =
3240 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3241
3242 let to_delete = static_file_tx_num
3243 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3244 .unwrap_or_default();
3245
3246 self.static_file_provider
3247 .latest_writer(StaticFileSegment::Transactions)?
3248 .prune_transactions(to_delete, block)?;
3249
3250 Ok(())
3251 }
3252
3253 fn append_blocks_with_state(
3255 &self,
3256 blocks: Vec<RecoveredBlock<Self::Block>>,
3257 execution_outcome: &ExecutionOutcome<Self::Receipt>,
3258 hashed_state: HashedPostStateSorted,
3259 ) -> ProviderResult<()> {
3260 if blocks.is_empty() {
3261 debug!(target: "providers::db", "Attempted to append empty block range");
3262 return Ok(())
3263 }
3264
3265 let first_number = blocks[0].number();
3268
3269 let last_block_number = blocks[blocks.len() - 1].number();
3272
3273 let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3274
3275 let (account_transitions, storage_transitions) = {
3280 let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
3281 let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
3282 for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
3283 let block_number = first_number + block_idx as u64;
3284 for (address, account_revert) in block_reverts {
3285 account_transitions.entry(*address).or_default().push(block_number);
3286 for storage_key in account_revert.storage.keys() {
3287 let key = B256::new(storage_key.to_be_bytes());
3288 storage_transitions.entry((*address, key)).or_default().push(block_number);
3289 }
3290 }
3291 }
3292 (account_transitions, storage_transitions)
3293 };
3294
3295 for block in blocks {
3297 self.insert_block(&block)?;
3298 durations_recorder.record_relative(metrics::Action::InsertBlock);
3299 }
3300
3301 self.write_state(execution_outcome, OriginalValuesKnown::No)?;
3302 durations_recorder.record_relative(metrics::Action::InsertState);
3303
3304 self.write_hashed_state(&hashed_state)?;
3306 durations_recorder.record_relative(metrics::Action::InsertHashes);
3307
3308 self.insert_account_history_index(account_transitions)?;
3311 self.insert_storage_history_index(storage_transitions)?;
3312 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3313
3314 self.update_pipeline_stages(last_block_number, false)?;
3316 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3317
3318 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3319
3320 Ok(())
3321 }
3322}
3323
3324impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3325 fn get_prune_checkpoint(
3326 &self,
3327 segment: PruneSegment,
3328 ) -> ProviderResult<Option<PruneCheckpoint>> {
3329 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3330 }
3331
3332 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3333 Ok(PruneSegment::variants()
3334 .filter_map(|segment| {
3335 self.tx
3336 .get::<tables::PruneCheckpoints>(segment)
3337 .transpose()
3338 .map(|chk| chk.map(|chk| (segment, chk)))
3339 })
3340 .collect::<Result<_, _>>()?)
3341 }
3342}
3343
3344impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3345 fn save_prune_checkpoint(
3346 &self,
3347 segment: PruneSegment,
3348 checkpoint: PruneCheckpoint,
3349 ) -> ProviderResult<()> {
3350 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3351 }
3352}
3353
3354impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3355 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3356 let db_entries = self.tx.entries::<T>()?;
3357 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3358 Ok(entries) => entries,
3359 Err(ProviderError::UnsupportedProvider) => 0,
3360 Err(err) => return Err(err),
3361 };
3362
3363 Ok(db_entries + static_file_entries)
3364 }
3365}
3366
3367impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3368 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3369 let mut finalized_blocks = self
3370 .tx
3371 .cursor_read::<tables::ChainState>()?
3372 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3373 .take(1)
3374 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3375
3376 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3377 Ok(last_finalized_block_number)
3378 }
3379
3380 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3381 let mut finalized_blocks = self
3382 .tx
3383 .cursor_read::<tables::ChainState>()?
3384 .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3385 .take(1)
3386 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3387
3388 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3389 Ok(last_finalized_block_number)
3390 }
3391}
3392
3393impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3394 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3395 Ok(self
3396 .tx
3397 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3398 }
3399
3400 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3401 Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3402 }
3403}
3404
3405impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3406 type Tx = TX;
3407
3408 fn tx_ref(&self) -> &Self::Tx {
3409 &self.tx
3410 }
3411
3412 fn tx_mut(&mut self) -> &mut Self::Tx {
3413 &mut self.tx
3414 }
3415
3416 fn into_tx(self) -> Self::Tx {
3417 self.tx
3418 }
3419
3420 fn prune_modes_ref(&self) -> &PruneModes {
3421 self.prune_modes_ref()
3422 }
3423
3424 fn commit(self) -> ProviderResult<bool> {
3426 if self.static_file_provider.has_unwind_queued() {
3431 self.tx.commit()?;
3432
3433 #[cfg(all(unix, feature = "rocksdb"))]
3434 {
3435 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3436 for batch in batches {
3437 self.rocksdb_provider.commit_batch(batch)?;
3438 }
3439 }
3440
3441 self.static_file_provider.commit()?;
3442 } else {
3443 self.static_file_provider.commit()?;
3444
3445 #[cfg(all(unix, feature = "rocksdb"))]
3446 {
3447 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3448 for batch in batches {
3449 self.rocksdb_provider.commit_batch(batch)?;
3450 }
3451 }
3452
3453 self.tx.commit()?;
3454 }
3455
3456 Ok(true)
3457 }
3458}
3459
3460impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3461 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3462 self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3463 }
3464}
3465
3466impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3467 fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3468 self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3469 }
3470}
3471
3472impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3473 fn cached_storage_settings(&self) -> StorageSettings {
3474 *self.storage_settings.read()
3475 }
3476
3477 fn set_storage_settings_cache(&self, settings: StorageSettings) {
3478 *self.storage_settings.write() = settings;
3479 }
3480}
3481
3482#[cfg(test)]
3483mod tests {
3484 use super::*;
3485 use crate::{
3486 test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3487 BlockWriter,
3488 };
3489 use reth_ethereum_primitives::Receipt;
3490 use reth_testing_utils::generators::{self, random_block, BlockParams};
3491 use reth_trie::Nibbles;
3492
3493 #[test]
3494 fn test_receipts_by_block_range_empty_range() {
3495 let factory = create_test_provider_factory();
3496 let provider = factory.provider().unwrap();
3497
3498 let start = 10u64;
3500 let end = 9u64;
3501 let result = provider.receipts_by_block_range(start..=end).unwrap();
3502 assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3503 }
3504
3505 #[test]
3506 fn test_receipts_by_block_range_nonexistent_blocks() {
3507 let factory = create_test_provider_factory();
3508 let provider = factory.provider().unwrap();
3509
3510 let result = provider.receipts_by_block_range(10..=12).unwrap();
3512 assert_eq!(result, vec![vec![], vec![], vec![]]);
3513 }
3514
3515 #[test]
3516 fn test_receipts_by_block_range_single_block() {
3517 let factory = create_test_provider_factory();
3518 let data = BlockchainTestData::default();
3519
3520 let provider_rw = factory.provider_rw().unwrap();
3521 provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3522 provider_rw
3523 .write_state(
3524 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3525 crate::OriginalValuesKnown::No,
3526 )
3527 .unwrap();
3528 provider_rw.insert_block(&data.blocks[0].0).unwrap();
3529 provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap();
3530 provider_rw.commit().unwrap();
3531
3532 let provider = factory.provider().unwrap();
3533 let result = provider.receipts_by_block_range(1..=1).unwrap();
3534
3535 assert_eq!(result.len(), 1);
3537 assert_eq!(result[0].len(), 1);
3538 assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3539 }
3540
3541 #[test]
3542 fn test_receipts_by_block_range_multiple_blocks() {
3543 let factory = create_test_provider_factory();
3544 let data = BlockchainTestData::default();
3545
3546 let provider_rw = factory.provider_rw().unwrap();
3547 provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3548 provider_rw
3549 .write_state(
3550 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3551 crate::OriginalValuesKnown::No,
3552 )
3553 .unwrap();
3554 for i in 0..3 {
3555 provider_rw.insert_block(&data.blocks[i].0).unwrap();
3556 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3557 }
3558 provider_rw.commit().unwrap();
3559
3560 let provider = factory.provider().unwrap();
3561 let result = provider.receipts_by_block_range(1..=3).unwrap();
3562
3563 assert_eq!(result.len(), 3);
3565 for (i, block_receipts) in result.iter().enumerate() {
3566 assert_eq!(block_receipts.len(), 1);
3567 assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
3568 }
3569 }
3570
3571 #[test]
3572 fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
3573 let factory = create_test_provider_factory();
3574 let data = BlockchainTestData::default();
3575
3576 let provider_rw = factory.provider_rw().unwrap();
3577 provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3578 provider_rw
3579 .write_state(
3580 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3581 crate::OriginalValuesKnown::No,
3582 )
3583 .unwrap();
3584
3585 for i in 0..3 {
3587 provider_rw.insert_block(&data.blocks[i].0).unwrap();
3588 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3589 }
3590 provider_rw.commit().unwrap();
3591
3592 let provider = factory.provider().unwrap();
3593 let result = provider.receipts_by_block_range(1..=3).unwrap();
3594
3595 assert_eq!(result.len(), 3);
3597 for block_receipts in &result {
3598 assert_eq!(block_receipts.len(), 1);
3599 }
3600 }
3601
3602 #[test]
3603 fn test_receipts_by_block_range_partial_range() {
3604 let factory = create_test_provider_factory();
3605 let data = BlockchainTestData::default();
3606
3607 let provider_rw = factory.provider_rw().unwrap();
3608 provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3609 provider_rw
3610 .write_state(
3611 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3612 crate::OriginalValuesKnown::No,
3613 )
3614 .unwrap();
3615 for i in 0..3 {
3616 provider_rw.insert_block(&data.blocks[i].0).unwrap();
3617 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3618 }
3619 provider_rw.commit().unwrap();
3620
3621 let provider = factory.provider().unwrap();
3622
3623 let result = provider.receipts_by_block_range(2..=5).unwrap();
3625 assert_eq!(result.len(), 4);
3626
3627 assert_eq!(result[0].len(), 1); assert_eq!(result[1].len(), 1); assert_eq!(result[2].len(), 0); assert_eq!(result[3].len(), 0); assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
3634 assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
3635 }
3636
3637 #[test]
3638 fn test_receipts_by_block_range_all_empty_blocks() {
3639 let factory = create_test_provider_factory();
3640 let mut rng = generators::rng();
3641
3642 let mut blocks = Vec::new();
3644 for i in 0..3 {
3645 let block =
3646 random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
3647 blocks.push(block);
3648 }
3649
3650 let provider_rw = factory.provider_rw().unwrap();
3651 for block in blocks {
3652 provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
3653 }
3654 provider_rw.commit().unwrap();
3655
3656 let provider = factory.provider().unwrap();
3657 let result = provider.receipts_by_block_range(1..=3).unwrap();
3658
3659 assert_eq!(result.len(), 3);
3660 for block_receipts in result {
3661 assert_eq!(block_receipts.len(), 0);
3662 }
3663 }
3664
3665 #[test]
3666 fn test_receipts_by_block_range_consistency_with_individual_calls() {
3667 let factory = create_test_provider_factory();
3668 let data = BlockchainTestData::default();
3669
3670 let provider_rw = factory.provider_rw().unwrap();
3671 provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3672 provider_rw
3673 .write_state(
3674 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3675 crate::OriginalValuesKnown::No,
3676 )
3677 .unwrap();
3678 for i in 0..3 {
3679 provider_rw.insert_block(&data.blocks[i].0).unwrap();
3680 provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3681 }
3682 provider_rw.commit().unwrap();
3683
3684 let provider = factory.provider().unwrap();
3685
3686 let range_result = provider.receipts_by_block_range(1..=3).unwrap();
3688
3689 let mut individual_results = Vec::new();
3691 for block_num in 1..=3 {
3692 let receipts =
3693 provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
3694 individual_results.push(receipts);
3695 }
3696
3697 assert_eq!(range_result, individual_results);
3698 }
3699
3700 #[test]
3701 fn test_write_trie_changesets() {
3702 use reth_db_api::models::BlockNumberHashedAddress;
3703 use reth_trie::{BranchNodeCompact, StorageTrieEntry};
3704
3705 let factory = create_test_provider_factory();
3706 let provider_rw = factory.provider_rw().unwrap();
3707
3708 let block_number = 1u64;
3709
3710 let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
3712 let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
3713
3714 let node1 = BranchNodeCompact::new(
3715 0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![], None, );
3721
3722 {
3724 let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
3725 cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
3726 }
3727
3728 let account_nodes = vec![
3730 (account_nibbles1, Some(node1.clone())), (account_nibbles2, None), ];
3733
3734 let storage_address1 = B256::from([1u8; 32]); let storage_address2 = B256::from([2u8; 32]); let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
3739 let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
3740 let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);
3741
3742 let storage_node1 = BranchNodeCompact::new(
3743 0b1111_0000_0000_0000,
3744 0b0000_0000_0000_0000,
3745 0b0000_0000_0000_0000,
3746 vec![],
3747 None,
3748 );
3749
3750 let storage_node2 = BranchNodeCompact::new(
3751 0b0000_1111_0000_0000,
3752 0b0000_0000_0000_0000,
3753 0b0000_0000_0000_0000,
3754 vec![],
3755 None,
3756 );
3757
3758 let storage_node1_old = BranchNodeCompact::new(
3760 0b1010_0000_0000_0000, 0b0000_0000_0000_0000,
3762 0b0000_0000_0000_0000,
3763 vec![],
3764 None,
3765 );
3766
3767 {
3769 let mut cursor =
3770 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
3771 let entry = StorageTrieEntry {
3773 nibbles: StoredNibblesSubKey(storage_nibbles1),
3774 node: storage_node1_old.clone(),
3775 };
3776 cursor.upsert(storage_address1, &entry).unwrap();
3777 }
3778
3779 {
3781 let mut cursor =
3782 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
3783 let entry1 = StorageTrieEntry {
3785 nibbles: StoredNibblesSubKey(storage_nibbles1),
3786 node: storage_node1.clone(),
3787 };
3788 cursor.upsert(storage_address2, &entry1).unwrap();
3789 let entry3 = StorageTrieEntry {
3791 nibbles: StoredNibblesSubKey(storage_nibbles3),
3792 node: storage_node2.clone(),
3793 };
3794 cursor.upsert(storage_address2, &entry3).unwrap();
3795 }
3796
3797 let storage_trie1 = StorageTrieUpdatesSorted {
3799 is_deleted: false,
3800 storage_nodes: vec![
3801 (storage_nibbles1, Some(storage_node1.clone())), (storage_nibbles2, None), ],
3804 };
3805
3806 let storage_trie2 = StorageTrieUpdatesSorted {
3808 is_deleted: true,
3809 storage_nodes: vec![
3810 (storage_nibbles1, Some(storage_node1.clone())), (storage_nibbles2, Some(storage_node2.clone())), ],
3815 };
3816
3817 let mut storage_tries = B256Map::default();
3818 storage_tries.insert(storage_address1, storage_trie1);
3819 storage_tries.insert(storage_address2, storage_trie2);
3820
3821 let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
3822
3823 let num_written =
3825 provider_rw.write_trie_changesets(block_number, &trie_updates, None).unwrap();
3826
3827 assert_eq!(num_written, 7);
3834
3835 {
3837 let mut cursor =
3838 provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
3839
3840 let all_entries = cursor
3842 .walk_dup(Some(block_number), None)
3843 .unwrap()
3844 .collect::<Result<Vec<_>, _>>()
3845 .unwrap();
3846
3847 assert_eq!(
3849 all_entries,
3850 vec![
3851 (
3852 block_number,
3853 TrieChangeSetsEntry {
3854 nibbles: StoredNibblesSubKey(account_nibbles1),
3855 node: Some(node1),
3856 }
3857 ),
3858 (
3859 block_number,
3860 TrieChangeSetsEntry {
3861 nibbles: StoredNibblesSubKey(account_nibbles2),
3862 node: None,
3863 }
3864 ),
3865 ]
3866 );
3867 }
3868
3869 {
3871 let mut cursor =
3872 provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
3873
3874 let key1 = BlockNumberHashedAddress((block_number, storage_address1));
3876 let entries1 =
3877 cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3878
3879 assert_eq!(
3880 entries1,
3881 vec![
3882 (
3883 key1,
3884 TrieChangeSetsEntry {
3885 nibbles: StoredNibblesSubKey(storage_nibbles1),
3886 node: Some(storage_node1_old), }
3888 ),
3889 (
3890 key1,
3891 TrieChangeSetsEntry {
3892 nibbles: StoredNibblesSubKey(storage_nibbles2),
3893 node: None, }
3895 ),
3896 ]
3897 );
3898
3899 let key2 = BlockNumberHashedAddress((block_number, storage_address2));
3901 let entries2 =
3902 cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3903
3904 assert_eq!(
3905 entries2,
3906 vec![
3907 (
3908 key2,
3909 TrieChangeSetsEntry {
3910 nibbles: StoredNibblesSubKey(storage_nibbles1),
3911 node: Some(storage_node1), }
3913 ),
3914 (
3915 key2,
3916 TrieChangeSetsEntry {
3917 nibbles: StoredNibblesSubKey(storage_nibbles2),
3918 node: None, }
3920 ),
3921 (
3922 key2,
3923 TrieChangeSetsEntry {
3924 nibbles: StoredNibblesSubKey(storage_nibbles3),
3925 node: Some(storage_node2), }
3927 ),
3928 ]
3929 );
3930 }
3931
3932 provider_rw.commit().unwrap();
3933 }
3934
3935 #[test]
3936 fn test_write_trie_changesets_with_overlay() {
3937 use reth_db_api::models::BlockNumberHashedAddress;
3938 use reth_trie::BranchNodeCompact;
3939
3940 let factory = create_test_provider_factory();
3941 let provider_rw = factory.provider_rw().unwrap();
3942
3943 let block_number = 1u64;
3944
3945 let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
3947 let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
3948
3949 let node1 = BranchNodeCompact::new(
3950 0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![], None, );
3956
3957 let node1_old = BranchNodeCompact::new(
3962 0b1010_1010_1010_1010, 0b0000_0000_0000_0000,
3964 0b0000_0000_0000_0000,
3965 vec![],
3966 None,
3967 );
3968
3969 let overlay_account_nodes = vec![
3971 (account_nibbles1, Some(node1_old.clone())), ];
3973
3974 let account_nodes = vec![
3976 (account_nibbles1, Some(node1)), (account_nibbles2, None), ];
3979
3980 let storage_address1 = B256::from([1u8; 32]); let storage_address2 = B256::from([2u8; 32]); let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
3985 let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
3986 let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);
3987
3988 let storage_node1 = BranchNodeCompact::new(
3989 0b1111_0000_0000_0000,
3990 0b0000_0000_0000_0000,
3991 0b0000_0000_0000_0000,
3992 vec![],
3993 None,
3994 );
3995
3996 let storage_node2 = BranchNodeCompact::new(
3997 0b0000_1111_0000_0000,
3998 0b0000_0000_0000_0000,
3999 0b0000_0000_0000_0000,
4000 vec![],
4001 None,
4002 );
4003
4004 let storage_node1_old = BranchNodeCompact::new(
4006 0b1010_0000_0000_0000, 0b0000_0000_0000_0000,
4008 0b0000_0000_0000_0000,
4009 vec![],
4010 None,
4011 );
4012
4013 let mut overlay_storage_tries = B256Map::default();
4015
4016 let overlay_storage_trie1 = StorageTrieUpdatesSorted {
4018 is_deleted: false,
4019 storage_nodes: vec![
4020 (storage_nibbles1, Some(storage_node1_old.clone())), ],
4023 };
4024
4025 let overlay_storage_trie2 = StorageTrieUpdatesSorted {
4027 is_deleted: false,
4028 storage_nodes: vec![
4029 (storage_nibbles1, Some(storage_node1.clone())), (storage_nibbles3, Some(storage_node2.clone())), ],
4032 };
4033
4034 overlay_storage_tries.insert(storage_address1, overlay_storage_trie1);
4035 overlay_storage_tries.insert(storage_address2, overlay_storage_trie2);
4036
4037 let overlay = TrieUpdatesSorted::new(overlay_account_nodes, overlay_storage_tries);
4038
4039 let storage_trie1 = StorageTrieUpdatesSorted {
4041 is_deleted: false,
4042 storage_nodes: vec![
4043 (storage_nibbles1, Some(storage_node1.clone())), (storage_nibbles2, None), ],
4046 };
4047
4048 let storage_trie2 = StorageTrieUpdatesSorted {
4050 is_deleted: true,
4051 storage_nodes: vec![
4052 (storage_nibbles1, Some(storage_node1.clone())), (storage_nibbles2, Some(storage_node2.clone())), ],
4058 };
4059
4060 let mut storage_tries = B256Map::default();
4061 storage_tries.insert(storage_address1, storage_trie1);
4062 storage_tries.insert(storage_address2, storage_trie2);
4063
4064 let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
4065
4066 let num_written =
4068 provider_rw.write_trie_changesets(block_number, &trie_updates, Some(&overlay)).unwrap();
4069
4070 assert_eq!(num_written, 7);
4077
4078 {
4080 let mut cursor =
4081 provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
4082
4083 let all_entries = cursor
4085 .walk_dup(Some(block_number), None)
4086 .unwrap()
4087 .collect::<Result<Vec<_>, _>>()
4088 .unwrap();
4089
4090 assert_eq!(
4092 all_entries,
4093 vec![
4094 (
4095 block_number,
4096 TrieChangeSetsEntry {
4097 nibbles: StoredNibblesSubKey(account_nibbles1),
4098 node: Some(node1_old), }
4100 ),
4101 (
4102 block_number,
4103 TrieChangeSetsEntry {
4104 nibbles: StoredNibblesSubKey(account_nibbles2),
4105 node: None,
4106 }
4107 ),
4108 ]
4109 );
4110 }
4111
4112 {
4114 let mut cursor =
4115 provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
4116
4117 let key1 = BlockNumberHashedAddress((block_number, storage_address1));
4119 let entries1 =
4120 cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
4121
4122 assert_eq!(
4123 entries1,
4124 vec![
4125 (
4126 key1,
4127 TrieChangeSetsEntry {
4128 nibbles: StoredNibblesSubKey(storage_nibbles1),
4129 node: Some(storage_node1_old), }
4131 ),
4132 (
4133 key1,
4134 TrieChangeSetsEntry {
4135 nibbles: StoredNibblesSubKey(storage_nibbles2),
4136 node: None, }
4138 ),
4139 ]
4140 );
4141
4142 let key2 = BlockNumberHashedAddress((block_number, storage_address2));
4144 let entries2 =
4145 cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
4146
4147 assert_eq!(
4148 entries2,
4149 vec![
4150 (
4151 key2,
4152 TrieChangeSetsEntry {
4153 nibbles: StoredNibblesSubKey(storage_nibbles1),
4154 node: Some(storage_node1), }
4156 ),
4157 (
4158 key2,
4159 TrieChangeSetsEntry {
4160 nibbles: StoredNibblesSubKey(storage_nibbles2),
4161 node: None, }
4163 ),
4164 (
4165 key2,
4166 TrieChangeSetsEntry {
4167 nibbles: StoredNibblesSubKey(storage_nibbles3),
4168 node: Some(storage_node2), }
4171 ),
4172 ]
4173 );
4174 }
4175
4176 provider_rw.commit().unwrap();
4177 }
4178
4179 #[test]
4180 fn test_clear_trie_changesets_from() {
4181 use alloy_primitives::hex_literal::hex;
4182 use reth_db_api::models::BlockNumberHashedAddress;
4183 use reth_trie::{BranchNodeCompact, StoredNibblesSubKey, TrieChangeSetsEntry};
4184
4185 let factory = create_test_provider_factory();
4186
4187 let block1 = 100u64;
4189 let block2 = 101u64;
4190 let block3 = 102u64;
4191 let block4 = 103u64;
4192 let block5 = 104u64;
4193
4194 let storage_address1 =
4196 B256::from(hex!("1111111111111111111111111111111111111111111111111111111111111111"));
4197 let storage_address2 =
4198 B256::from(hex!("2222222222222222222222222222222222222222222222222222222222222222"));
4199
4200 let nibbles1 = StoredNibblesSubKey(Nibbles::from_nibbles([0x1, 0x2, 0x3]));
4202 let nibbles2 = StoredNibblesSubKey(Nibbles::from_nibbles([0x4, 0x5, 0x6]));
4203 let nibbles3 = StoredNibblesSubKey(Nibbles::from_nibbles([0x7, 0x8, 0x9]));
4204
4205 let node1 = BranchNodeCompact::new(
4207 0b1111_1111_1111_1111,
4208 0b1111_1111_1111_1111,
4209 0b0000_0000_0000_0001,
4210 vec![B256::from(hex!(
4211 "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
4212 ))],
4213 None,
4214 );
4215 let node2 = BranchNodeCompact::new(
4216 0b1111_1111_1111_1110,
4217 0b1111_1111_1111_1110,
4218 0b0000_0000_0000_0010,
4219 vec![B256::from(hex!(
4220 "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
4221 ))],
4222 Some(B256::from(hex!(
4223 "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
4224 ))),
4225 );
4226
4227 {
4229 let provider_rw = factory.provider_rw().unwrap();
4230 let mut cursor =
4231 provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
4232
4233 cursor
4235 .upsert(
4236 block1,
4237 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4238 )
4239 .unwrap();
4240 cursor
4241 .upsert(block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
4242 .unwrap();
4243
4244 cursor
4246 .upsert(
4247 block2,
4248 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
4249 )
4250 .unwrap();
4251 cursor
4252 .upsert(
4253 block2,
4254 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4255 )
4256 .unwrap(); cursor
4258 .upsert(block2, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4259 .unwrap();
4260
4261 cursor
4263 .upsert(
4264 block3,
4265 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
4266 )
4267 .unwrap();
4268 cursor
4269 .upsert(
4270 block3,
4271 &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node2.clone()) },
4272 )
4273 .unwrap();
4274
4275 cursor
4277 .upsert(block4, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
4278 .unwrap();
4279
4280 cursor
4282 .upsert(
4283 block5,
4284 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
4285 )
4286 .unwrap();
4287 cursor
4288 .upsert(block5, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4289 .unwrap();
4290
4291 provider_rw.commit().unwrap();
4292 }
4293
4294 {
4296 let provider_rw = factory.provider_rw().unwrap();
4297 let mut cursor =
4298 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4299
4300 let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
4302 cursor
4303 .upsert(
4304 key1_block1,
4305 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4306 )
4307 .unwrap();
4308 cursor
4309 .upsert(key1_block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
4310 .unwrap();
4311
4312 let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
4315 cursor
4316 .upsert(
4317 key1_block2,
4318 &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
4319 )
4320 .unwrap();
4321 cursor
4322 .upsert(key1_block2, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
4323 .unwrap(); cursor
4325 .upsert(
4326 key1_block2,
4327 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
4328 )
4329 .unwrap();
4330
4331 let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
4333 cursor
4334 .upsert(
4335 key2_block3,
4336 &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
4337 )
4338 .unwrap();
4339 cursor
4340 .upsert(key2_block3, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4341 .unwrap();
4342
4343 let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
4345 cursor
4346 .upsert(
4347 key1_block4,
4348 &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node1) },
4349 )
4350 .unwrap();
4351 cursor
4352 .upsert(
4353 key1_block4,
4354 &TrieChangeSetsEntry { nibbles: nibbles3, node: Some(node2.clone()) },
4355 )
4356 .unwrap(); let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4360 cursor
4361 .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles1, node: None })
4362 .unwrap();
4363 cursor
4364 .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles2, node: Some(node2) })
4365 .unwrap();
4366
4367 provider_rw.commit().unwrap();
4368 }
4369
4370 {
4372 let provider_rw = factory.provider_rw().unwrap();
4373 provider_rw.clear_trie_changesets_from(block2).unwrap();
4374 provider_rw.commit().unwrap();
4375 }
4376
4377 {
4379 let provider = factory.provider().unwrap();
4380 let mut cursor =
4381 provider.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
4382
4383 let block1_entries = cursor
4385 .walk_dup(Some(block1), None)
4386 .unwrap()
4387 .collect::<Result<Vec<_>, _>>()
4388 .unwrap();
4389 assert_eq!(block1_entries.len(), 2, "Block 100 entries should be preserved");
4390 assert_eq!(block1_entries[0].0, block1);
4391 assert_eq!(block1_entries[1].0, block1);
4392
4393 let block2_entries = cursor
4395 .walk_dup(Some(block2), None)
4396 .unwrap()
4397 .collect::<Result<Vec<_>, _>>()
4398 .unwrap();
4399 assert!(block2_entries.is_empty(), "Block 101 entries should be deleted");
4400
4401 let block3_entries = cursor
4402 .walk_dup(Some(block3), None)
4403 .unwrap()
4404 .collect::<Result<Vec<_>, _>>()
4405 .unwrap();
4406 assert!(block3_entries.is_empty(), "Block 102 entries should be deleted");
4407
4408 let block4_entries = cursor
4409 .walk_dup(Some(block4), None)
4410 .unwrap()
4411 .collect::<Result<Vec<_>, _>>()
4412 .unwrap();
4413 assert!(block4_entries.is_empty(), "Block 103 entries should be deleted");
4414
4415 let block5_entries = cursor
4417 .walk_dup(Some(block5), None)
4418 .unwrap()
4419 .collect::<Result<Vec<_>, _>>()
4420 .unwrap();
4421 assert!(block5_entries.is_empty(), "Block 104 entries should be deleted");
4422 }
4423
4424 {
4426 let provider = factory.provider().unwrap();
4427 let mut cursor =
4428 provider.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
4429
4430 let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
4432 let block1_entries = cursor
4433 .walk_dup(Some(key1_block1), None)
4434 .unwrap()
4435 .collect::<Result<Vec<_>, _>>()
4436 .unwrap();
4437 assert_eq!(block1_entries.len(), 2, "Block 100 storage entries should be preserved");
4438
4439 let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
4441 let block2_entries = cursor
4442 .walk_dup(Some(key1_block2), None)
4443 .unwrap()
4444 .collect::<Result<Vec<_>, _>>()
4445 .unwrap();
4446 assert!(block2_entries.is_empty(), "Block 101 storage entries should be deleted");
4447
4448 let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
4449 let block3_entries = cursor
4450 .walk_dup(Some(key2_block3), None)
4451 .unwrap()
4452 .collect::<Result<Vec<_>, _>>()
4453 .unwrap();
4454 assert!(block3_entries.is_empty(), "Block 102 storage entries should be deleted");
4455
4456 let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
4457 let block4_entries = cursor
4458 .walk_dup(Some(key1_block4), None)
4459 .unwrap()
4460 .collect::<Result<Vec<_>, _>>()
4461 .unwrap();
4462 assert!(block4_entries.is_empty(), "Block 103 storage entries should be deleted");
4463
4464 let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4466 let block5_entries = cursor
4467 .walk_dup(Some(key2_block5), None)
4468 .unwrap()
4469 .collect::<Result<Vec<_>, _>>()
4470 .unwrap();
4471 assert!(block5_entries.is_empty(), "Block 104 storage entries should be deleted");
4472 }
4473 }
4474
4475 #[test]
4476 fn test_write_trie_updates_sorted() {
4477 use reth_trie::{
4478 updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
4479 BranchNodeCompact, StorageTrieEntry,
4480 };
4481
4482 let factory = create_test_provider_factory();
4483 let provider_rw = factory.provider_rw().unwrap();
4484
4485 {
4487 let tx = provider_rw.tx_ref();
4488 let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
4489
4490 let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4492 cursor
4493 .upsert(
4494 to_delete,
4495 &BranchNodeCompact::new(
4496 0b1010_1010_1010_1010, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4500 None,
4501 ),
4502 )
4503 .unwrap();
4504
4505 let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4507 cursor
4508 .upsert(
4509 to_update,
4510 &BranchNodeCompact::new(
4511 0b0101_0101_0101_0101, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4515 None,
4516 ),
4517 )
4518 .unwrap();
4519 }
4520
4521 let storage_address1 = B256::from([1u8; 32]);
4523 let storage_address2 = B256::from([2u8; 32]);
4524 {
4525 let tx = provider_rw.tx_ref();
4526 let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
4527
4528 storage_cursor
4530 .upsert(
4531 storage_address1,
4532 &StorageTrieEntry {
4533 nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
4534 node: BranchNodeCompact::new(
4535 0b0011_0011_0011_0011, 0b0000_0000_0000_0000,
4537 0b0000_0000_0000_0000,
4538 vec![],
4539 None,
4540 ),
4541 },
4542 )
4543 .unwrap();
4544
4545 storage_cursor
4547 .upsert(
4548 storage_address2,
4549 &StorageTrieEntry {
4550 nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
4551 node: BranchNodeCompact::new(
4552 0b1100_1100_1100_1100, 0b0000_0000_0000_0000,
4554 0b0000_0000_0000_0000,
4555 vec![],
4556 None,
4557 ),
4558 },
4559 )
4560 .unwrap();
4561 storage_cursor
4562 .upsert(
4563 storage_address2,
4564 &StorageTrieEntry {
4565 nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
4566 node: BranchNodeCompact::new(
4567 0b0011_1100_0011_1100, 0b0000_0000_0000_0000,
4569 0b0000_0000_0000_0000,
4570 vec![],
4571 None,
4572 ),
4573 },
4574 )
4575 .unwrap();
4576 }
4577
4578 let account_nodes = vec![
4580 (
4581 Nibbles::from_nibbles([0x1, 0x2]),
4582 Some(BranchNodeCompact::new(
4583 0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4587 None,
4588 )),
4589 ),
4590 (Nibbles::from_nibbles([0x3, 0x4]), None), (
4592 Nibbles::from_nibbles([0x5, 0x6]),
4593 Some(BranchNodeCompact::new(
4594 0b1111_1111_1111_1111, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4598 None,
4599 )),
4600 ),
4601 ];
4602
4603 let storage_trie1 = StorageTrieUpdatesSorted {
4605 is_deleted: false,
4606 storage_nodes: vec![
4607 (
4608 Nibbles::from_nibbles([0x1, 0x0]),
4609 Some(BranchNodeCompact::new(
4610 0b1111_0000_0000_0000, 0b0000_0000_0000_0000, 0b0000_0000_0000_0000, vec![],
4614 None,
4615 )),
4616 ),
4617 (Nibbles::from_nibbles([0x2, 0x0]), None), ],
4619 };
4620
4621 let storage_trie2 = StorageTrieUpdatesSorted {
4622 is_deleted: true, storage_nodes: vec![],
4624 };
4625
4626 let mut storage_tries = B256Map::default();
4627 storage_tries.insert(storage_address1, storage_trie1);
4628 storage_tries.insert(storage_address2, storage_trie2);
4629
4630 let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
4631
4632 let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
4634
4635 assert_eq!(num_entries, 5);
4638
4639 let tx = provider_rw.tx_ref();
4641 let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
4642
4643 let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4645 let entry1 = cursor.seek_exact(nibbles1).unwrap();
4646 assert!(entry1.is_some(), "Updated account node should exist");
4647 let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
4648 assert_eq!(
4649 entry1.unwrap().1.state_mask,
4650 expected_mask,
4651 "Account node should have updated state_mask"
4652 );
4653
4654 let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4656 let entry2 = cursor.seek_exact(nibbles2).unwrap();
4657 assert!(entry2.is_none(), "Deleted account node should not exist");
4658
4659 let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
4661 let entry3 = cursor.seek_exact(nibbles3).unwrap();
4662 assert!(entry3.is_some(), "New account node should exist");
4663
4664 let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
4666
4667 let storage_entries1: Vec<_> = storage_cursor
4669 .walk_dup(Some(storage_address1), None)
4670 .unwrap()
4671 .collect::<Result<Vec<_>, _>>()
4672 .unwrap();
4673 assert_eq!(
4674 storage_entries1.len(),
4675 1,
4676 "Storage address1 should have 1 entry after deletion"
4677 );
4678 assert_eq!(
4679 storage_entries1[0].1.nibbles.0,
4680 Nibbles::from_nibbles([0x1, 0x0]),
4681 "Remaining entry should be [0x1, 0x0]"
4682 );
4683
4684 let storage_entries2: Vec<_> = storage_cursor
4686 .walk_dup(Some(storage_address2), None)
4687 .unwrap()
4688 .collect::<Result<Vec<_>, _>>()
4689 .unwrap();
4690 assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
4691
4692 provider_rw.commit().unwrap();
4693 }
4694
4695 #[test]
4696 fn test_get_block_trie_updates() {
4697 use reth_db_api::models::BlockNumberHashedAddress;
4698 use reth_trie::{BranchNodeCompact, StorageTrieEntry};
4699
4700 let factory = create_test_provider_factory();
4701 let provider_rw = factory.provider_rw().unwrap();
4702
4703 let target_block = 2u64;
4704 let next_block = 3u64;
4705
4706 let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
4708 let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
4709 let account_nibbles3 = Nibbles::from_nibbles([0x9, 0xa, 0xb, 0xc]);
4710
4711 let node1 = BranchNodeCompact::new(
4712 0b1111_1111_0000_0000,
4713 0b0000_0000_0000_0000,
4714 0b0000_0000_0000_0000,
4715 vec![],
4716 None,
4717 );
4718
4719 let node2 = BranchNodeCompact::new(
4720 0b0000_0000_1111_1111,
4721 0b0000_0000_0000_0000,
4722 0b0000_0000_0000_0000,
4723 vec![],
4724 None,
4725 );
4726
4727 let node3 = BranchNodeCompact::new(
4728 0b1010_1010_1010_1010,
4729 0b0000_0000_0000_0000,
4730 0b0000_0000_0000_0000,
4731 vec![],
4732 None,
4733 );
4734
4735 {
4737 let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
4738 cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
4739 cursor.insert(StoredNibbles(account_nibbles2), &node2).unwrap();
4740 }
4742
4743 {
4745 let mut cursor =
4746 provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
4747 cursor
4749 .append_dup(
4750 target_block,
4751 TrieChangeSetsEntry {
4752 nibbles: StoredNibblesSubKey(account_nibbles1),
4753 node: Some(BranchNodeCompact::new(
4754 0b1111_0000_0000_0000, 0b0000_0000_0000_0000,
4756 0b0000_0000_0000_0000,
4757 vec![],
4758 None,
4759 )),
4760 },
4761 )
4762 .unwrap();
4763 cursor
4765 .append_dup(
4766 target_block,
4767 TrieChangeSetsEntry {
4768 nibbles: StoredNibblesSubKey(account_nibbles2),
4769 node: None,
4770 },
4771 )
4772 .unwrap();
4773 }
4774
4775 {
4777 let mut cursor =
4778 provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
4779 cursor
4781 .append_dup(
4782 next_block,
4783 TrieChangeSetsEntry {
4784 nibbles: StoredNibblesSubKey(account_nibbles3),
4785 node: Some(node3),
4786 },
4787 )
4788 .unwrap();
4789 }
4790
4791 let storage_address1 = B256::from([1u8; 32]);
4793 let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
4794 let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
4795
4796 let storage_node1 = BranchNodeCompact::new(
4797 0b1111_1111_1111_0000,
4798 0b0000_0000_0000_0000,
4799 0b0000_0000_0000_0000,
4800 vec![],
4801 None,
4802 );
4803
4804 let storage_node2 = BranchNodeCompact::new(
4805 0b0101_0101_0101_0101,
4806 0b0000_0000_0000_0000,
4807 0b0000_0000_0000_0000,
4808 vec![],
4809 None,
4810 );
4811
4812 {
4814 let mut cursor =
4815 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
4816 cursor
4817 .upsert(
4818 storage_address1,
4819 &StorageTrieEntry {
4820 nibbles: StoredNibblesSubKey(storage_nibbles1),
4821 node: storage_node1.clone(),
4822 },
4823 )
4824 .unwrap();
4825 }
4827
4828 {
4830 let mut cursor =
4831 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4832 let key = BlockNumberHashedAddress((target_block, storage_address1));
4833
4834 cursor
4836 .append_dup(
4837 key,
4838 TrieChangeSetsEntry {
4839 nibbles: StoredNibblesSubKey(storage_nibbles1),
4840 node: Some(BranchNodeCompact::new(
4841 0b0000_0000_1111_1111, 0b0000_0000_0000_0000,
4843 0b0000_0000_0000_0000,
4844 vec![],
4845 None,
4846 )),
4847 },
4848 )
4849 .unwrap();
4850
4851 cursor
4853 .append_dup(
4854 key,
4855 TrieChangeSetsEntry {
4856 nibbles: StoredNibblesSubKey(storage_nibbles2),
4857 node: None,
4858 },
4859 )
4860 .unwrap();
4861 }
4862
4863 {
4865 let mut cursor =
4866 provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4867 let key = BlockNumberHashedAddress((next_block, storage_address1));
4868
4869 cursor
4871 .append_dup(
4872 key,
4873 TrieChangeSetsEntry {
4874 nibbles: StoredNibblesSubKey(storage_nibbles2),
4875 node: Some(BranchNodeCompact::new(
4876 0b0101_0101_0101_0101, 0b0000_0000_0000_0000,
4878 0b0000_0000_0000_0000,
4879 vec![],
4880 None,
4881 )),
4882 },
4883 )
4884 .unwrap();
4885 }
4886
4887 provider_rw.commit().unwrap();
4888
4889 let provider = factory.provider().unwrap();
4891 let result = provider.get_block_trie_updates(target_block).unwrap();
4892
4893 assert_eq!(result.account_nodes_ref().len(), 2, "Should have 2 account trie updates");
4895
4896 let nibbles1_update = result
4898 .account_nodes_ref()
4899 .iter()
4900 .find(|(n, _)| n == &account_nibbles1)
4901 .expect("Should find nibbles1");
4902 assert!(nibbles1_update.1.is_some(), "nibbles1 should have a value");
4903 assert_eq!(
4904 nibbles1_update.1.as_ref().unwrap().state_mask,
4905 node1.state_mask,
4906 "nibbles1 should have current value"
4907 );
4908
4909 let nibbles2_update = result
4911 .account_nodes_ref()
4912 .iter()
4913 .find(|(n, _)| n == &account_nibbles2)
4914 .expect("Should find nibbles2");
4915 assert!(nibbles2_update.1.is_some(), "nibbles2 should have a value");
4916 assert_eq!(
4917 nibbles2_update.1.as_ref().unwrap().state_mask,
4918 node2.state_mask,
4919 "nibbles2 should have current value"
4920 );
4921
4922 assert!(
4924 !result.account_nodes_ref().iter().any(|(n, _)| n == &account_nibbles3),
4925 "nibbles3 should not be in target_block updates"
4926 );
4927
4928 assert_eq!(result.storage_tries_ref().len(), 1, "Should have 1 storage trie");
4930 let storage_updates = result
4931 .storage_tries_ref()
4932 .get(&storage_address1)
4933 .expect("Should have storage updates for address1");
4934
4935 assert_eq!(storage_updates.storage_nodes.len(), 2, "Should have 2 storage node updates");
4936
4937 let storage1_update = storage_updates
4939 .storage_nodes
4940 .iter()
4941 .find(|(n, _)| n == &storage_nibbles1)
4942 .expect("Should find storage_nibbles1");
4943 assert!(storage1_update.1.is_some(), "storage_nibbles1 should have a value");
4944 assert_eq!(
4945 storage1_update.1.as_ref().unwrap().state_mask,
4946 storage_node1.state_mask,
4947 "storage_nibbles1 should have current value"
4948 );
4949
4950 let storage2_update = storage_updates
4953 .storage_nodes
4954 .iter()
4955 .find(|(n, _)| n == &storage_nibbles2)
4956 .expect("Should find storage_nibbles2");
4957 assert!(
4958 storage2_update.1.is_some(),
4959 "storage_nibbles2 should have a value (the node that will be deleted in next block)"
4960 );
4961 assert_eq!(
4962 storage2_update.1.as_ref().unwrap().state_mask,
4963 storage_node2.state_mask,
4964 "storage_nibbles2 should have the value that was created and will be deleted"
4965 );
4966 }
4967
4968 #[test]
4969 fn test_prunable_receipts_logic() {
4970 let insert_blocks =
4971 |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
4972 let mut rng = generators::rng();
4973 for block_num in 0..=tip_block {
4974 let block = random_block(
4975 &mut rng,
4976 block_num,
4977 BlockParams { tx_count: Some(tx_count), ..Default::default() },
4978 );
4979 provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4980 }
4981 };
4982
4983 let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
4984 let outcome = ExecutionOutcome {
4985 first_block: block,
4986 receipts: vec![vec![Receipt {
4987 tx_type: Default::default(),
4988 success: true,
4989 cumulative_gas_used: block, logs: vec![],
4991 }]],
4992 ..Default::default()
4993 };
4994 provider_rw.write_state(&outcome, crate::OriginalValuesKnown::No).unwrap();
4995 provider_rw.commit().unwrap();
4996 };
4997
4998 {
5000 let factory = create_test_provider_factory();
5001 let storage_settings = StorageSettings::legacy();
5002 factory.set_storage_settings_cache(storage_settings);
5003 let factory = factory.with_prune_modes(PruneModes {
5004 receipts: Some(PruneMode::Before(100)),
5005 ..Default::default()
5006 });
5007
5008 let tip_block = 200u64;
5009 let first_block = 1u64;
5010
5011 let provider_rw = factory.provider_rw().unwrap();
5013 insert_blocks(&provider_rw, tip_block, 1);
5014 provider_rw.commit().unwrap();
5015
5016 write_receipts(
5017 factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
5018 first_block,
5019 );
5020 write_receipts(
5021 factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
5022 tip_block - 1,
5023 );
5024
5025 let provider = factory.provider().unwrap();
5026
5027 for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
5028 assert!(provider
5029 .receipts_by_block(block.into())
5030 .unwrap()
5031 .is_some_and(|r| r.len() == num_receipts));
5032 }
5033 }
5034
5035 {
5037 let factory = create_test_provider_factory();
5038 let storage_settings = StorageSettings::legacy().with_receipts_in_static_files(true);
5039 factory.set_storage_settings_cache(storage_settings);
5040 let factory = factory.with_prune_modes(PruneModes {
5041 receipts: Some(PruneMode::Before(2)),
5042 ..Default::default()
5043 });
5044
5045 let tip_block = 200u64;
5046
5047 let provider_rw = factory.provider_rw().unwrap();
5049 insert_blocks(&provider_rw, tip_block, 1);
5050 provider_rw.commit().unwrap();
5051
5052 write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
5054 write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);
5055
5056 assert!(factory
5057 .static_file_provider()
5058 .get_highest_static_file_tx(StaticFileSegment::Receipts)
5059 .is_none(),);
5060 assert!(factory
5061 .static_file_provider()
5062 .get_highest_static_file_block(StaticFileSegment::Receipts)
5063 .is_some_and(|b| b == 1),);
5064
5065 write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
5068 assert!(factory
5069 .static_file_provider()
5070 .get_highest_static_file_tx(StaticFileSegment::Receipts)
5071 .is_some_and(|num| num == 2),);
5072
5073 let factory = factory.with_prune_modes(PruneModes {
5077 receipts: Some(PruneMode::Before(100)),
5078 ..Default::default()
5079 });
5080 let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
5081 assert!(PruneMode::Distance(1).should_prune(3, tip_block));
5082 write_receipts(provider_rw, 3);
5083
5084 let provider = factory.provider().unwrap();
5089 assert!(EitherWriter::receipts_destination(&provider).is_static_file());
5090 for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
5091 assert!(provider
5092 .receipts_by_block(num.into())
5093 .unwrap()
5094 .is_some_and(|r| r.len() == num_receipts));
5095
5096 let receipt = provider.receipt(num).unwrap();
5097 if num_receipts > 0 {
5098 assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
5099 } else {
5100 assert!(receipt.is_none());
5101 }
5102 }
5103 }
5104 }
5105}